1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/ipc/chromium/src/chrome/common/ipc_sync_channel.cc Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,453 @@ 1.4 +// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. 1.5 +// Use of this source code is governed by a BSD-style license that can be 1.6 +// found in the LICENSE file. 1.7 + 1.8 +#include "chrome/common/ipc_sync_channel.h" 1.9 + 1.10 +#include "base/lazy_instance.h" 1.11 +#include "base/logging.h" 1.12 +#include "base/thread_local.h" 1.13 +#include "base/message_loop.h" 1.14 +#include "base/waitable_event.h" 1.15 +#include "base/waitable_event_watcher.h" 1.16 +#include "chrome/common/ipc_sync_message.h" 1.17 + 1.18 +using base::TimeDelta; 1.19 +using base::TimeTicks; 1.20 +using base::WaitableEvent; 1.21 + 1.22 +namespace IPC { 1.23 +// When we're blocked in a Send(), we need to process incoming synchronous 1.24 +// messages right away because it could be blocking our reply (either 1.25 +// directly from the same object we're calling, or indirectly through one or 1.26 +// more other channels). That means that in SyncContext's OnMessageReceived, 1.27 +// we need to process sync message right away if we're blocked. However a 1.28 +// simple check isn't sufficient, because the listener thread can be in the 1.29 +// process of calling Send. 1.30 +// To work around this, when SyncChannel filters a sync message, it sets 1.31 +// an event that the listener thread waits on during its Send() call. This 1.32 +// allows us to dispatch incoming sync messages when blocked. The race 1.33 +// condition is handled because if Send is in the process of being called, it 1.34 +// will check the event. In case the listener thread isn't sending a message, 1.35 +// we queue a task on the listener thread to dispatch the received messages. 1.36 +// The messages are stored in this queue object that's shared among all 1.37 +// SyncChannel objects on the same thread (since one object can receive a 1.38 +// sync message while another one is blocked). 1.39 + 1.40 +class SyncChannel::ReceivedSyncMsgQueue : 1.41 + public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { 1.42 + public: 1.43 + // Returns the ReceivedSyncMsgQueue instance for this thread, creating one 1.44 + // if necessary. Call RemoveContext on the same thread when done. 1.45 + static ReceivedSyncMsgQueue* AddContext() { 1.46 + // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple 1.47 + // SyncChannel objects can block the same thread). 1.48 + ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); 1.49 + if (!rv) { 1.50 + rv = new ReceivedSyncMsgQueue(); 1.51 + ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); 1.52 + } 1.53 + rv->listener_count_++; 1.54 + return rv; 1.55 + } 1.56 + 1.57 + ~ReceivedSyncMsgQueue() { 1.58 + } 1.59 + 1.60 + // Called on IPC thread when a synchronous message or reply arrives. 1.61 + void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) { 1.62 + bool was_task_pending; 1.63 + { 1.64 + AutoLock auto_lock(message_lock_); 1.65 + 1.66 + was_task_pending = task_pending_; 1.67 + task_pending_ = true; 1.68 + 1.69 + // We set the event in case the listener thread is blocked (or is about 1.70 + // to). In case it's not, the PostTask dispatches the messages. 1.71 + message_queue_.push_back(QueuedMessage(new Message(msg), context)); 1.72 + } 1.73 + 1.74 + dispatch_event_.Signal(); 1.75 + if (!was_task_pending) { 1.76 + listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( 1.77 + this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); 1.78 + } 1.79 + } 1.80 + 1.81 + void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { 1.82 + received_replies_.push_back(QueuedMessage(new Message(msg), context)); 1.83 + } 1.84 + 1.85 + // Called on the listener's thread to process any queues synchronous 1.86 + // messages. 1.87 + void DispatchMessagesTask() { 1.88 + { 1.89 + AutoLock auto_lock(message_lock_); 1.90 + task_pending_ = false; 1.91 + } 1.92 + DispatchMessages(); 1.93 + } 1.94 + 1.95 + void DispatchMessages() { 1.96 + while (true) { 1.97 + Message* message; 1.98 + scoped_refptr<SyncChannel::SyncContext> context; 1.99 + { 1.100 + AutoLock auto_lock(message_lock_); 1.101 + if (message_queue_.empty()) 1.102 + break; 1.103 + 1.104 + message = message_queue_.front().message; 1.105 + context = message_queue_.front().context; 1.106 + message_queue_.pop_front(); 1.107 + } 1.108 + 1.109 + context->OnDispatchMessage(*message); 1.110 + delete message; 1.111 + } 1.112 + } 1.113 + 1.114 + // SyncChannel calls this in its destructor. 1.115 + void RemoveContext(SyncContext* context) { 1.116 + AutoLock auto_lock(message_lock_); 1.117 + 1.118 + SyncMessageQueue::iterator iter = message_queue_.begin(); 1.119 + while (iter != message_queue_.end()) { 1.120 + if (iter->context == context) { 1.121 + delete iter->message; 1.122 + iter = message_queue_.erase(iter); 1.123 + } else { 1.124 + iter++; 1.125 + } 1.126 + } 1.127 + 1.128 + if (--listener_count_ == 0) { 1.129 + DCHECK(lazy_tls_ptr_.Pointer()->Get()); 1.130 + lazy_tls_ptr_.Pointer()->Set(NULL); 1.131 + } 1.132 + } 1.133 + 1.134 + WaitableEvent* dispatch_event() { return &dispatch_event_; } 1.135 + MessageLoop* listener_message_loop() { return listener_message_loop_; } 1.136 + 1.137 + // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. 1.138 + static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > 1.139 + lazy_tls_ptr_; 1.140 + 1.141 + // Called on the ipc thread to check if we can unblock any current Send() 1.142 + // calls based on a queued reply. 1.143 + void DispatchReplies() { 1.144 + for (size_t i = 0; i < received_replies_.size(); ++i) { 1.145 + Message* message = received_replies_[i].message; 1.146 + if (received_replies_[i].context->TryToUnblockListener(message)) { 1.147 + delete message; 1.148 + received_replies_.erase(received_replies_.begin() + i); 1.149 + return; 1.150 + } 1.151 + } 1.152 + } 1.153 + 1.154 + private: 1.155 + // See the comment in SyncChannel::SyncChannel for why this event is created 1.156 + // as manual reset. 1.157 + ReceivedSyncMsgQueue() : 1.158 + dispatch_event_(true, false), 1.159 + listener_message_loop_(MessageLoop::current()), 1.160 + task_pending_(false), 1.161 + listener_count_(0) { 1.162 + } 1.163 + 1.164 + // Holds information about a queued synchronous message or reply. 1.165 + struct QueuedMessage { 1.166 + QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } 1.167 + Message* message; 1.168 + scoped_refptr<SyncChannel::SyncContext> context; 1.169 + }; 1.170 + 1.171 + typedef std::deque<QueuedMessage> SyncMessageQueue; 1.172 + SyncMessageQueue message_queue_; 1.173 + 1.174 + std::vector<QueuedMessage> received_replies_; 1.175 + 1.176 + // Set when we got a synchronous message that we must respond to as the 1.177 + // sender needs its reply before it can reply to our original synchronous 1.178 + // message. 1.179 + WaitableEvent dispatch_event_; 1.180 + MessageLoop* listener_message_loop_; 1.181 + Lock message_lock_; 1.182 + bool task_pending_; 1.183 + int listener_count_; 1.184 +}; 1.185 + 1.186 +base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > 1.187 + SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED); 1.188 + 1.189 +SyncChannel::SyncContext::SyncContext( 1.190 + Channel::Listener* listener, 1.191 + MessageFilter* filter, 1.192 + MessageLoop* ipc_thread, 1.193 + WaitableEvent* shutdown_event) 1.194 + : ChannelProxy::Context(listener, filter, ipc_thread), 1.195 + received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), 1.196 + shutdown_event_(shutdown_event) { 1.197 +} 1.198 + 1.199 +SyncChannel::SyncContext::~SyncContext() { 1.200 + while (!deserializers_.empty()) 1.201 + Pop(); 1.202 +} 1.203 + 1.204 +// Adds information about an outgoing sync message to the context so that 1.205 +// we know how to deserialize the reply. Returns a handle that's set when 1.206 +// the reply has arrived. 1.207 +void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { 1.208 + // The event is created as manual reset because in between Signal and 1.209 + // OnObjectSignalled, another Send can happen which would stop the watcher 1.210 + // from being called. The event would get watched later, when the nested 1.211 + // Send completes, so the event will need to remain set. 1.212 + PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg), 1.213 + sync_msg->GetReplyDeserializer(), 1.214 + new WaitableEvent(true, false)); 1.215 + AutoLock auto_lock(deserializers_lock_); 1.216 + deserializers_.push_back(pending); 1.217 +} 1.218 + 1.219 +bool SyncChannel::SyncContext::Pop() { 1.220 + bool result; 1.221 + { 1.222 + AutoLock auto_lock(deserializers_lock_); 1.223 + PendingSyncMsg msg = deserializers_.back(); 1.224 + delete msg.deserializer; 1.225 + delete msg.done_event; 1.226 + msg.done_event = NULL; 1.227 + deserializers_.pop_back(); 1.228 + result = msg.send_result; 1.229 + } 1.230 + 1.231 + // We got a reply to a synchronous Send() call that's blocking the listener 1.232 + // thread. However, further down the call stack there could be another 1.233 + // blocking Send() call, whose reply we received after we made this last 1.234 + // Send() call. So check if we have any queued replies available that 1.235 + // can now unblock the listener thread. 1.236 + ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 1.237 + received_sync_msgs_.get(), &ReceivedSyncMsgQueue::DispatchReplies)); 1.238 + 1.239 + return result; 1.240 +} 1.241 + 1.242 +WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { 1.243 + AutoLock auto_lock(deserializers_lock_); 1.244 + return deserializers_.back().done_event; 1.245 +} 1.246 + 1.247 +WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { 1.248 + return received_sync_msgs_->dispatch_event(); 1.249 +} 1.250 + 1.251 +void SyncChannel::SyncContext::DispatchMessages() { 1.252 + received_sync_msgs_->DispatchMessages(); 1.253 +} 1.254 + 1.255 +bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { 1.256 + AutoLock auto_lock(deserializers_lock_); 1.257 + if (deserializers_.empty() || 1.258 + !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { 1.259 + return false; 1.260 + } 1.261 + 1.262 + if (!msg->is_reply_error()) { 1.263 + deserializers_.back().send_result = deserializers_.back().deserializer-> 1.264 + SerializeOutputParameters(*msg); 1.265 + } 1.266 + deserializers_.back().done_event->Signal(); 1.267 + 1.268 + return true; 1.269 +} 1.270 + 1.271 +void SyncChannel::SyncContext::Clear() { 1.272 + CancelPendingSends(); 1.273 + received_sync_msgs_->RemoveContext(this); 1.274 + 1.275 + Context::Clear(); 1.276 +} 1.277 + 1.278 +void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) { 1.279 + // Give the filters a chance at processing this message. 1.280 + if (TryFilters(msg)) 1.281 + return; 1.282 + 1.283 + if (TryToUnblockListener(&msg)) 1.284 + return; 1.285 + 1.286 + if (msg.should_unblock()) { 1.287 + received_sync_msgs_->QueueMessage(msg, this); 1.288 + return; 1.289 + } 1.290 + 1.291 + if (msg.is_reply()) { 1.292 + received_sync_msgs_->QueueReply(msg, this); 1.293 + return; 1.294 + } 1.295 + 1.296 + return Context::OnMessageReceivedNoFilter(msg); 1.297 +} 1.298 + 1.299 +void SyncChannel::SyncContext::OnChannelError() { 1.300 + CancelPendingSends(); 1.301 + shutdown_watcher_.StopWatching(); 1.302 + Context::OnChannelError(); 1.303 +} 1.304 + 1.305 +void SyncChannel::SyncContext::OnChannelOpened() { 1.306 + shutdown_watcher_.StartWatching(shutdown_event_, this); 1.307 + Context::OnChannelOpened(); 1.308 +} 1.309 + 1.310 +void SyncChannel::SyncContext::OnChannelClosed() { 1.311 + shutdown_watcher_.StopWatching(); 1.312 + Context::OnChannelClosed(); 1.313 +} 1.314 + 1.315 +void SyncChannel::SyncContext::OnSendTimeout(int message_id) { 1.316 + AutoLock auto_lock(deserializers_lock_); 1.317 + PendingSyncMessageQueue::iterator iter; 1.318 + for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { 1.319 + if (iter->id == message_id) { 1.320 + iter->done_event->Signal(); 1.321 + break; 1.322 + } 1.323 + } 1.324 +} 1.325 + 1.326 +void SyncChannel::SyncContext::CancelPendingSends() { 1.327 + AutoLock auto_lock(deserializers_lock_); 1.328 + PendingSyncMessageQueue::iterator iter; 1.329 + for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) 1.330 + iter->done_event->Signal(); 1.331 +} 1.332 + 1.333 +void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) { 1.334 + DCHECK(event == shutdown_event_); 1.335 + // Process shut down before we can get a reply to a synchronous message. 1.336 + // Cancel pending Send calls, which will end up setting the send done event. 1.337 + CancelPendingSends(); 1.338 +} 1.339 + 1.340 + 1.341 +SyncChannel::SyncChannel( 1.342 + const std::wstring& channel_id, Channel::Mode mode, 1.343 + Channel::Listener* listener, MessageFilter* filter, 1.344 + MessageLoop* ipc_message_loop, bool create_pipe_now, 1.345 + WaitableEvent* shutdown_event) 1.346 + : ChannelProxy( 1.347 + channel_id, mode, ipc_message_loop, 1.348 + new SyncContext(listener, filter, ipc_message_loop, shutdown_event), 1.349 + create_pipe_now), 1.350 + sync_messages_with_no_timeout_allowed_(true) { 1.351 + // Ideally we only want to watch this object when running a nested message 1.352 + // loop. However, we don't know when it exits if there's another nested 1.353 + // message loop running under it or not, so we wouldn't know whether to 1.354 + // stop or keep watching. So we always watch it, and create the event as 1.355 + // manual reset since the object watcher might otherwise reset the event 1.356 + // when we're doing a WaitMany. 1.357 + dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); 1.358 +} 1.359 + 1.360 +SyncChannel::~SyncChannel() { 1.361 +} 1.362 + 1.363 +bool SyncChannel::Send(Message* message) { 1.364 + return SendWithTimeout(message, base::kNoTimeout); 1.365 +} 1.366 + 1.367 +bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { 1.368 + if (!message->is_sync()) { 1.369 + ChannelProxy::Send(message); 1.370 + return true; 1.371 + } 1.372 + 1.373 + // *this* might get deleted in WaitForReply. 1.374 + scoped_refptr<SyncContext> context(sync_context()); 1.375 + if (context->shutdown_event()->IsSignaled()) { 1.376 + delete message; 1.377 + return false; 1.378 + } 1.379 + 1.380 + DCHECK(sync_messages_with_no_timeout_allowed_ || 1.381 + timeout_ms != base::kNoTimeout); 1.382 + SyncMessage* sync_msg = static_cast<SyncMessage*>(message); 1.383 + context->Push(sync_msg); 1.384 + int message_id = SyncMessage::GetMessageId(*sync_msg); 1.385 + WaitableEvent* pump_messages_event = sync_msg->pump_messages_event(); 1.386 + 1.387 + ChannelProxy::Send(message); 1.388 + 1.389 + if (timeout_ms != base::kNoTimeout) { 1.390 + // We use the sync message id so that when a message times out, we don't 1.391 + // confuse it with another send that is either above/below this Send in 1.392 + // the call stack. 1.393 + context->ipc_message_loop()->PostDelayedTask(FROM_HERE, 1.394 + NewRunnableMethod(context.get(), 1.395 + &SyncContext::OnSendTimeout, message_id), timeout_ms); 1.396 + } 1.397 + 1.398 + // Wait for reply, or for any other incoming synchronous messages. 1.399 + WaitForReply(pump_messages_event); 1.400 + 1.401 + return context->Pop(); 1.402 +} 1.403 + 1.404 +void SyncChannel::WaitForReply(WaitableEvent* pump_messages_event) { 1.405 + while (true) { 1.406 + WaitableEvent* objects[] = { 1.407 + sync_context()->GetDispatchEvent(), 1.408 + sync_context()->GetSendDoneEvent(), 1.409 + pump_messages_event 1.410 + }; 1.411 + 1.412 + unsigned count = pump_messages_event ? 3: 2; 1.413 + unsigned result = WaitableEvent::WaitMany(objects, count); 1.414 + if (result == 0 /* dispatch event */) { 1.415 + // We're waiting for a reply, but we received a blocking synchronous 1.416 + // call. We must process it or otherwise a deadlock might occur. 1.417 + sync_context()->GetDispatchEvent()->Reset(); 1.418 + sync_context()->DispatchMessages(); 1.419 + continue; 1.420 + } 1.421 + 1.422 + if (result == 2 /* pump_messages_event */) 1.423 + WaitForReplyWithNestedMessageLoop(); // Start a nested message loop. 1.424 + 1.425 + break; 1.426 + } 1.427 +} 1.428 + 1.429 +void SyncChannel::WaitForReplyWithNestedMessageLoop() { 1.430 + WaitableEvent* old_done_event = send_done_watcher_.GetWatchedEvent(); 1.431 + send_done_watcher_.StopWatching(); 1.432 + send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this); 1.433 + bool old_state = MessageLoop::current()->NestableTasksAllowed(); 1.434 + MessageLoop::current()->SetNestableTasksAllowed(true); 1.435 + MessageLoop::current()->Run(); 1.436 + MessageLoop::current()->SetNestableTasksAllowed(old_state); 1.437 + if (old_done_event) 1.438 + send_done_watcher_.StartWatching(old_done_event, this); 1.439 +} 1.440 + 1.441 +void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { 1.442 + WaitableEvent* dispatch_event = sync_context()->GetDispatchEvent(); 1.443 + if (event == dispatch_event) { 1.444 + // The call to DispatchMessages might delete this object, so reregister 1.445 + // the object watcher first. 1.446 + dispatch_event->Reset(); 1.447 + dispatch_watcher_.StartWatching(dispatch_event, this); 1.448 + sync_context()->DispatchMessages(); 1.449 + } else { 1.450 + // We got the reply, timed out or the process shutdown. 1.451 + DCHECK(event == sync_context()->GetSendDoneEvent()); 1.452 + MessageLoop::current()->Quit(); 1.453 + } 1.454 +} 1.455 + 1.456 +} // namespace IPC