|
1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. |
|
2 // Use of this source code is governed by a BSD-style license that can be |
|
3 // found in the LICENSE file. |
|
4 |
|
5 #include "chrome/common/ipc_channel_win.h" |
|
6 |
|
7 #include <windows.h> |
|
8 #include <sstream> |
|
9 |
|
10 #include "base/compiler_specific.h" |
|
11 #include "base/logging.h" |
|
12 #include "base/non_thread_safe.h" |
|
13 #include "base/stats_counters.h" |
|
14 #include "base/win_util.h" |
|
15 #include "chrome/common/ipc_logging.h" |
|
16 #include "chrome/common/ipc_message_utils.h" |
|
17 #include "mozilla/ipc/ProtocolUtils.h" |
|
18 |
|
19 namespace IPC { |
|
20 //------------------------------------------------------------------------------ |
|
21 |
|
22 Channel::ChannelImpl::State::State(ChannelImpl* channel) : is_pending(false) { |
|
23 memset(&context.overlapped, 0, sizeof(context.overlapped)); |
|
24 context.handler = channel; |
|
25 } |
|
26 |
|
27 Channel::ChannelImpl::State::~State() { |
|
28 COMPILE_ASSERT(!offsetof(Channel::ChannelImpl::State, context), |
|
29 starts_with_io_context); |
|
30 } |
|
31 |
|
32 //------------------------------------------------------------------------------ |
|
33 |
|
34 Channel::ChannelImpl::ChannelImpl(const std::wstring& channel_id, Mode mode, |
|
35 Listener* listener) |
|
36 : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)), |
|
37 ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)), |
|
38 ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) { |
|
39 Init(mode, listener); |
|
40 |
|
41 if (!CreatePipe(channel_id, mode)) { |
|
42 // The pipe may have been closed already. |
|
43 CHROMIUM_LOG(WARNING) << "Unable to create pipe named \"" << channel_id << |
|
44 "\" in " << (mode == 0 ? "server" : "client") << " mode."; |
|
45 } |
|
46 } |
|
47 |
|
48 Channel::ChannelImpl::ChannelImpl(const std::wstring& channel_id, |
|
49 HANDLE server_pipe, |
|
50 Mode mode, Listener* listener) |
|
51 : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)), |
|
52 ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)), |
|
53 ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) { |
|
54 Init(mode, listener); |
|
55 |
|
56 if (mode == MODE_SERVER) { |
|
57 // Use the existing handle that was dup'd to us |
|
58 pipe_ = server_pipe; |
|
59 EnqueueHelloMessage(); |
|
60 } else { |
|
61 // Take the normal init path to connect to the server pipe |
|
62 CreatePipe(channel_id, mode); |
|
63 } |
|
64 } |
|
65 |
|
66 void Channel::ChannelImpl::Init(Mode mode, Listener* listener) { |
|
67 pipe_ = INVALID_HANDLE_VALUE; |
|
68 listener_ = listener; |
|
69 waiting_connect_ = (mode == MODE_SERVER); |
|
70 processing_incoming_ = false; |
|
71 closed_ = false; |
|
72 output_queue_length_ = 0; |
|
73 } |
|
74 |
|
75 void Channel::ChannelImpl::OutputQueuePush(Message* msg) |
|
76 { |
|
77 output_queue_.push(msg); |
|
78 output_queue_length_++; |
|
79 } |
|
80 |
|
81 void Channel::ChannelImpl::OutputQueuePop() |
|
82 { |
|
83 output_queue_.pop(); |
|
84 output_queue_length_--; |
|
85 } |
|
86 |
|
87 HANDLE Channel::ChannelImpl::GetServerPipeHandle() const { |
|
88 return pipe_; |
|
89 } |
|
90 |
|
91 void Channel::ChannelImpl::Close() { |
|
92 if (thread_check_.get()) { |
|
93 DCHECK(thread_check_->CalledOnValidThread()); |
|
94 } |
|
95 |
|
96 bool waited = false; |
|
97 if (input_state_.is_pending || output_state_.is_pending) { |
|
98 CancelIo(pipe_); |
|
99 waited = true; |
|
100 } |
|
101 |
|
102 // Closing the handle at this point prevents us from issuing more requests |
|
103 // form OnIOCompleted(). |
|
104 if (pipe_ != INVALID_HANDLE_VALUE) { |
|
105 CloseHandle(pipe_); |
|
106 pipe_ = INVALID_HANDLE_VALUE; |
|
107 } |
|
108 |
|
109 while (input_state_.is_pending || output_state_.is_pending) { |
|
110 MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); |
|
111 } |
|
112 |
|
113 while (!output_queue_.empty()) { |
|
114 Message* m = output_queue_.front(); |
|
115 OutputQueuePop(); |
|
116 delete m; |
|
117 } |
|
118 |
|
119 if (thread_check_.get()) |
|
120 thread_check_.reset(); |
|
121 |
|
122 closed_ = true; |
|
123 } |
|
124 |
|
125 bool Channel::ChannelImpl::Send(Message* message) { |
|
126 if (thread_check_.get()) { |
|
127 DCHECK(thread_check_->CalledOnValidThread()); |
|
128 } |
|
129 #ifdef IPC_MESSAGE_DEBUG_EXTRA |
|
130 DLOG(INFO) << "sending message @" << message << " on channel @" << this |
|
131 << " with type " << message->type() |
|
132 << " (" << output_queue_.size() << " in queue)"; |
|
133 #endif |
|
134 |
|
135 #ifdef IPC_MESSAGE_LOG_ENABLED |
|
136 Logging::current()->OnSendMessage(message, L""); |
|
137 #endif |
|
138 |
|
139 if (closed_) { |
|
140 if (mozilla::ipc::LoggingEnabled()) { |
|
141 fprintf(stderr, "Can't send message %s, because this channel is closed.\n", |
|
142 message->name()); |
|
143 } |
|
144 delete message; |
|
145 return false; |
|
146 } |
|
147 |
|
148 OutputQueuePush(message); |
|
149 // ensure waiting to write |
|
150 if (!waiting_connect_) { |
|
151 if (!output_state_.is_pending) { |
|
152 if (!ProcessOutgoingMessages(NULL, 0)) |
|
153 return false; |
|
154 } |
|
155 } |
|
156 |
|
157 return true; |
|
158 } |
|
159 |
|
160 const std::wstring Channel::ChannelImpl::PipeName( |
|
161 const std::wstring& channel_id) const { |
|
162 std::wostringstream ss; |
|
163 // XXX(darin): get application name from somewhere else |
|
164 ss << L"\\\\.\\pipe\\chrome." << channel_id; |
|
165 return ss.str(); |
|
166 } |
|
167 |
|
168 bool Channel::ChannelImpl::CreatePipe(const std::wstring& channel_id, |
|
169 Mode mode) { |
|
170 DCHECK(pipe_ == INVALID_HANDLE_VALUE); |
|
171 const std::wstring pipe_name = PipeName(channel_id); |
|
172 if (mode == MODE_SERVER) { |
|
173 pipe_ = CreateNamedPipeW(pipe_name.c_str(), |
|
174 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | |
|
175 FILE_FLAG_FIRST_PIPE_INSTANCE, |
|
176 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, |
|
177 1, // number of pipe instances |
|
178 // output buffer size (XXX tune) |
|
179 Channel::kReadBufferSize, |
|
180 // input buffer size (XXX tune) |
|
181 Channel::kReadBufferSize, |
|
182 5000, // timeout in milliseconds (XXX tune) |
|
183 NULL); |
|
184 } else { |
|
185 pipe_ = CreateFileW(pipe_name.c_str(), |
|
186 GENERIC_READ | GENERIC_WRITE, |
|
187 0, |
|
188 NULL, |
|
189 OPEN_EXISTING, |
|
190 SECURITY_SQOS_PRESENT | SECURITY_IDENTIFICATION | |
|
191 FILE_FLAG_OVERLAPPED, |
|
192 NULL); |
|
193 } |
|
194 if (pipe_ == INVALID_HANDLE_VALUE) { |
|
195 // If this process is being closed, the pipe may be gone already. |
|
196 CHROMIUM_LOG(WARNING) << "failed to create pipe: " << GetLastError(); |
|
197 return false; |
|
198 } |
|
199 |
|
200 // Create the Hello message to be sent when Connect is called |
|
201 return EnqueueHelloMessage(); |
|
202 } |
|
203 |
|
204 bool Channel::ChannelImpl::EnqueueHelloMessage() { |
|
205 scoped_ptr<Message> m(new Message(MSG_ROUTING_NONE, |
|
206 HELLO_MESSAGE_TYPE, |
|
207 IPC::Message::PRIORITY_NORMAL)); |
|
208 if (!m->WriteInt(GetCurrentProcessId())) { |
|
209 CloseHandle(pipe_); |
|
210 pipe_ = INVALID_HANDLE_VALUE; |
|
211 return false; |
|
212 } |
|
213 |
|
214 OutputQueuePush(m.release()); |
|
215 return true; |
|
216 } |
|
217 |
|
218 bool Channel::ChannelImpl::Connect() { |
|
219 if (!thread_check_.get()) |
|
220 thread_check_.reset(new NonThreadSafe()); |
|
221 |
|
222 if (pipe_ == INVALID_HANDLE_VALUE) |
|
223 return false; |
|
224 |
|
225 MessageLoopForIO::current()->RegisterIOHandler(pipe_, this); |
|
226 |
|
227 // Check to see if there is a client connected to our pipe... |
|
228 if (waiting_connect_) |
|
229 ProcessConnection(); |
|
230 |
|
231 if (!input_state_.is_pending) { |
|
232 // Complete setup asynchronously. By not setting input_state_.is_pending |
|
233 // to true, we indicate to OnIOCompleted that this is the special |
|
234 // initialization signal. |
|
235 MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod( |
|
236 &Channel::ChannelImpl::OnIOCompleted, &input_state_.context, 0, 0)); |
|
237 } |
|
238 |
|
239 if (!waiting_connect_) |
|
240 ProcessOutgoingMessages(NULL, 0); |
|
241 return true; |
|
242 } |
|
243 |
|
244 bool Channel::ChannelImpl::ProcessConnection() { |
|
245 DCHECK(thread_check_->CalledOnValidThread()); |
|
246 if (input_state_.is_pending) |
|
247 input_state_.is_pending = false; |
|
248 |
|
249 // Do we have a client connected to our pipe? |
|
250 if (INVALID_HANDLE_VALUE == pipe_) |
|
251 return false; |
|
252 |
|
253 BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped); |
|
254 |
|
255 DWORD err = GetLastError(); |
|
256 if (ok) { |
|
257 // Uhm, the API documentation says that this function should never |
|
258 // return success when used in overlapped mode. |
|
259 NOTREACHED(); |
|
260 return false; |
|
261 } |
|
262 |
|
263 switch (err) { |
|
264 case ERROR_IO_PENDING: |
|
265 input_state_.is_pending = true; |
|
266 break; |
|
267 case ERROR_PIPE_CONNECTED: |
|
268 waiting_connect_ = false; |
|
269 break; |
|
270 default: |
|
271 NOTREACHED(); |
|
272 return false; |
|
273 } |
|
274 |
|
275 return true; |
|
276 } |
|
277 |
|
278 bool Channel::ChannelImpl::ProcessIncomingMessages( |
|
279 MessageLoopForIO::IOContext* context, |
|
280 DWORD bytes_read) { |
|
281 DCHECK(thread_check_->CalledOnValidThread()); |
|
282 if (input_state_.is_pending) { |
|
283 input_state_.is_pending = false; |
|
284 DCHECK(context); |
|
285 |
|
286 if (!context || !bytes_read) |
|
287 return false; |
|
288 } else { |
|
289 // This happens at channel initialization. |
|
290 DCHECK(!bytes_read && context == &input_state_.context); |
|
291 } |
|
292 |
|
293 for (;;) { |
|
294 if (bytes_read == 0) { |
|
295 if (INVALID_HANDLE_VALUE == pipe_) |
|
296 return false; |
|
297 |
|
298 // Read from pipe... |
|
299 BOOL ok = ReadFile(pipe_, |
|
300 input_buf_, |
|
301 Channel::kReadBufferSize, |
|
302 &bytes_read, |
|
303 &input_state_.context.overlapped); |
|
304 if (!ok) { |
|
305 DWORD err = GetLastError(); |
|
306 if (err == ERROR_IO_PENDING) { |
|
307 input_state_.is_pending = true; |
|
308 return true; |
|
309 } |
|
310 CHROMIUM_LOG(ERROR) << "pipe error: " << err; |
|
311 return false; |
|
312 } |
|
313 input_state_.is_pending = true; |
|
314 return true; |
|
315 } |
|
316 DCHECK(bytes_read); |
|
317 |
|
318 // Process messages from input buffer. |
|
319 |
|
320 const char* p, *end; |
|
321 if (input_overflow_buf_.empty()) { |
|
322 p = input_buf_; |
|
323 end = p + bytes_read; |
|
324 } else { |
|
325 if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) { |
|
326 input_overflow_buf_.clear(); |
|
327 CHROMIUM_LOG(ERROR) << "IPC message is too big"; |
|
328 return false; |
|
329 } |
|
330 input_overflow_buf_.append(input_buf_, bytes_read); |
|
331 p = input_overflow_buf_.data(); |
|
332 end = p + input_overflow_buf_.size(); |
|
333 } |
|
334 |
|
335 while (p < end) { |
|
336 const char* message_tail = Message::FindNext(p, end); |
|
337 if (message_tail) { |
|
338 int len = static_cast<int>(message_tail - p); |
|
339 const Message m(p, len); |
|
340 #ifdef IPC_MESSAGE_DEBUG_EXTRA |
|
341 DLOG(INFO) << "received message on channel @" << this << |
|
342 " with type " << m.type(); |
|
343 #endif |
|
344 if (m.routing_id() == MSG_ROUTING_NONE && |
|
345 m.type() == HELLO_MESSAGE_TYPE) { |
|
346 // The Hello message contains only the process id. |
|
347 listener_->OnChannelConnected(MessageIterator(m).NextInt()); |
|
348 } else { |
|
349 listener_->OnMessageReceived(m); |
|
350 } |
|
351 p = message_tail; |
|
352 } else { |
|
353 // Last message is partial. |
|
354 break; |
|
355 } |
|
356 } |
|
357 input_overflow_buf_.assign(p, end - p); |
|
358 |
|
359 bytes_read = 0; // Get more data. |
|
360 } |
|
361 |
|
362 return true; |
|
363 } |
|
364 |
|
365 bool Channel::ChannelImpl::ProcessOutgoingMessages( |
|
366 MessageLoopForIO::IOContext* context, |
|
367 DWORD bytes_written) { |
|
368 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's |
|
369 // no connection? |
|
370 DCHECK(thread_check_->CalledOnValidThread()); |
|
371 |
|
372 if (output_state_.is_pending) { |
|
373 DCHECK(context); |
|
374 output_state_.is_pending = false; |
|
375 if (!context || bytes_written == 0) { |
|
376 DWORD err = GetLastError(); |
|
377 CHROMIUM_LOG(ERROR) << "pipe error: " << err; |
|
378 return false; |
|
379 } |
|
380 // Message was sent. |
|
381 DCHECK(!output_queue_.empty()); |
|
382 Message* m = output_queue_.front(); |
|
383 OutputQueuePop(); |
|
384 delete m; |
|
385 } |
|
386 |
|
387 if (output_queue_.empty()) |
|
388 return true; |
|
389 |
|
390 if (INVALID_HANDLE_VALUE == pipe_) |
|
391 return false; |
|
392 |
|
393 // Write to pipe... |
|
394 Message* m = output_queue_.front(); |
|
395 BOOL ok = WriteFile(pipe_, |
|
396 m->data(), |
|
397 m->size(), |
|
398 &bytes_written, |
|
399 &output_state_.context.overlapped); |
|
400 if (!ok) { |
|
401 DWORD err = GetLastError(); |
|
402 if (err == ERROR_IO_PENDING) { |
|
403 output_state_.is_pending = true; |
|
404 |
|
405 #ifdef IPC_MESSAGE_DEBUG_EXTRA |
|
406 DLOG(INFO) << "sent pending message @" << m << " on channel @" << |
|
407 this << " with type " << m->type(); |
|
408 #endif |
|
409 |
|
410 return true; |
|
411 } |
|
412 CHROMIUM_LOG(ERROR) << "pipe error: " << err; |
|
413 return false; |
|
414 } |
|
415 |
|
416 #ifdef IPC_MESSAGE_DEBUG_EXTRA |
|
417 DLOG(INFO) << "sent message @" << m << " on channel @" << this << |
|
418 " with type " << m->type(); |
|
419 #endif |
|
420 |
|
421 output_state_.is_pending = true; |
|
422 return true; |
|
423 } |
|
424 |
|
425 void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context, |
|
426 DWORD bytes_transfered, DWORD error) { |
|
427 bool ok; |
|
428 DCHECK(thread_check_->CalledOnValidThread()); |
|
429 if (context == &input_state_.context) { |
|
430 if (waiting_connect_) { |
|
431 if (!ProcessConnection()) |
|
432 return; |
|
433 // We may have some messages queued up to send... |
|
434 if (!output_queue_.empty() && !output_state_.is_pending) |
|
435 ProcessOutgoingMessages(NULL, 0); |
|
436 if (input_state_.is_pending) |
|
437 return; |
|
438 // else, fall-through and look for incoming messages... |
|
439 } |
|
440 // we don't support recursion through OnMessageReceived yet! |
|
441 DCHECK(!processing_incoming_); |
|
442 processing_incoming_ = true; |
|
443 ok = ProcessIncomingMessages(context, bytes_transfered); |
|
444 processing_incoming_ = false; |
|
445 } else { |
|
446 DCHECK(context == &output_state_.context); |
|
447 ok = ProcessOutgoingMessages(context, bytes_transfered); |
|
448 } |
|
449 if (!ok && INVALID_HANDLE_VALUE != pipe_) { |
|
450 // We don't want to re-enter Close(). |
|
451 Close(); |
|
452 listener_->OnChannelError(); |
|
453 } |
|
454 } |
|
455 |
|
456 bool Channel::ChannelImpl::Unsound_IsClosed() const |
|
457 { |
|
458 return closed_; |
|
459 } |
|
460 |
|
461 uint32_t Channel::ChannelImpl::Unsound_NumQueuedMessages() const |
|
462 { |
|
463 return output_queue_length_; |
|
464 } |
|
465 |
|
466 //------------------------------------------------------------------------------ |
|
467 // Channel's methods simply call through to ChannelImpl. |
|
468 Channel::Channel(const std::wstring& channel_id, Mode mode, |
|
469 Listener* listener) |
|
470 : channel_impl_(new ChannelImpl(channel_id, mode, listener)) { |
|
471 } |
|
472 |
|
473 Channel::Channel(const std::wstring& channel_id, void* server_pipe, |
|
474 Mode mode, Listener* listener) |
|
475 : channel_impl_(new ChannelImpl(channel_id, server_pipe, mode, listener)) { |
|
476 } |
|
477 |
|
478 Channel::~Channel() { |
|
479 delete channel_impl_; |
|
480 } |
|
481 |
|
482 bool Channel::Connect() { |
|
483 return channel_impl_->Connect(); |
|
484 } |
|
485 |
|
486 void Channel::Close() { |
|
487 channel_impl_->Close(); |
|
488 } |
|
489 |
|
490 void* Channel::GetServerPipeHandle() const { |
|
491 return channel_impl_->GetServerPipeHandle(); |
|
492 } |
|
493 |
|
494 Channel::Listener* Channel::set_listener(Listener* listener) { |
|
495 return channel_impl_->set_listener(listener); |
|
496 } |
|
497 |
|
498 bool Channel::Send(Message* message) { |
|
499 return channel_impl_->Send(message); |
|
500 } |
|
501 |
|
502 bool Channel::Unsound_IsClosed() const { |
|
503 return channel_impl_->Unsound_IsClosed(); |
|
504 } |
|
505 |
|
506 uint32_t Channel::Unsound_NumQueuedMessages() const { |
|
507 return channel_impl_->Unsound_NumQueuedMessages(); |
|
508 } |
|
509 |
|
510 } // namespace IPC |