michael@0: // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. michael@0: // Use of this source code is governed by a BSD-style license that can be michael@0: // found in the LICENSE file. michael@0: michael@0: #include "chrome/common/ipc_sync_channel.h" michael@0: michael@0: #include "base/lazy_instance.h" michael@0: #include "base/logging.h" michael@0: #include "base/thread_local.h" michael@0: #include "base/message_loop.h" michael@0: #include "base/waitable_event.h" michael@0: #include "base/waitable_event_watcher.h" michael@0: #include "chrome/common/ipc_sync_message.h" michael@0: michael@0: using base::TimeDelta; michael@0: using base::TimeTicks; michael@0: using base::WaitableEvent; michael@0: michael@0: namespace IPC { michael@0: // When we're blocked in a Send(), we need to process incoming synchronous michael@0: // messages right away because it could be blocking our reply (either michael@0: // directly from the same object we're calling, or indirectly through one or michael@0: // more other channels). That means that in SyncContext's OnMessageReceived, michael@0: // we need to process sync message right away if we're blocked. However a michael@0: // simple check isn't sufficient, because the listener thread can be in the michael@0: // process of calling Send. michael@0: // To work around this, when SyncChannel filters a sync message, it sets michael@0: // an event that the listener thread waits on during its Send() call. This michael@0: // allows us to dispatch incoming sync messages when blocked. The race michael@0: // condition is handled because if Send is in the process of being called, it michael@0: // will check the event. In case the listener thread isn't sending a message, michael@0: // we queue a task on the listener thread to dispatch the received messages. michael@0: // The messages are stored in this queue object that's shared among all michael@0: // SyncChannel objects on the same thread (since one object can receive a michael@0: // sync message while another one is blocked). michael@0: michael@0: class SyncChannel::ReceivedSyncMsgQueue : michael@0: public base::RefCountedThreadSafe { michael@0: public: michael@0: // Returns the ReceivedSyncMsgQueue instance for this thread, creating one michael@0: // if necessary. Call RemoveContext on the same thread when done. michael@0: static ReceivedSyncMsgQueue* AddContext() { michael@0: // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple michael@0: // SyncChannel objects can block the same thread). michael@0: ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); michael@0: if (!rv) { michael@0: rv = new ReceivedSyncMsgQueue(); michael@0: ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); michael@0: } michael@0: rv->listener_count_++; michael@0: return rv; michael@0: } michael@0: michael@0: ~ReceivedSyncMsgQueue() { michael@0: } michael@0: michael@0: // Called on IPC thread when a synchronous message or reply arrives. michael@0: void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) { michael@0: bool was_task_pending; michael@0: { michael@0: AutoLock auto_lock(message_lock_); michael@0: michael@0: was_task_pending = task_pending_; michael@0: task_pending_ = true; michael@0: michael@0: // We set the event in case the listener thread is blocked (or is about michael@0: // to). In case it's not, the PostTask dispatches the messages. michael@0: message_queue_.push_back(QueuedMessage(new Message(msg), context)); michael@0: } michael@0: michael@0: dispatch_event_.Signal(); michael@0: if (!was_task_pending) { michael@0: listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( michael@0: this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); michael@0: } michael@0: } michael@0: michael@0: void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { michael@0: received_replies_.push_back(QueuedMessage(new Message(msg), context)); michael@0: } michael@0: michael@0: // Called on the listener's thread to process any queues synchronous michael@0: // messages. michael@0: void DispatchMessagesTask() { michael@0: { michael@0: AutoLock auto_lock(message_lock_); michael@0: task_pending_ = false; michael@0: } michael@0: DispatchMessages(); michael@0: } michael@0: michael@0: void DispatchMessages() { michael@0: while (true) { michael@0: Message* message; michael@0: scoped_refptr context; michael@0: { michael@0: AutoLock auto_lock(message_lock_); michael@0: if (message_queue_.empty()) michael@0: break; michael@0: michael@0: message = message_queue_.front().message; michael@0: context = message_queue_.front().context; michael@0: message_queue_.pop_front(); michael@0: } michael@0: michael@0: context->OnDispatchMessage(*message); michael@0: delete message; michael@0: } michael@0: } michael@0: michael@0: // SyncChannel calls this in its destructor. michael@0: void RemoveContext(SyncContext* context) { michael@0: AutoLock auto_lock(message_lock_); michael@0: michael@0: SyncMessageQueue::iterator iter = message_queue_.begin(); michael@0: while (iter != message_queue_.end()) { michael@0: if (iter->context == context) { michael@0: delete iter->message; michael@0: iter = message_queue_.erase(iter); michael@0: } else { michael@0: iter++; michael@0: } michael@0: } michael@0: michael@0: if (--listener_count_ == 0) { michael@0: DCHECK(lazy_tls_ptr_.Pointer()->Get()); michael@0: lazy_tls_ptr_.Pointer()->Set(NULL); michael@0: } michael@0: } michael@0: michael@0: WaitableEvent* dispatch_event() { return &dispatch_event_; } michael@0: MessageLoop* listener_message_loop() { return listener_message_loop_; } michael@0: michael@0: // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. michael@0: static base::LazyInstance > michael@0: lazy_tls_ptr_; michael@0: michael@0: // Called on the ipc thread to check if we can unblock any current Send() michael@0: // calls based on a queued reply. michael@0: void DispatchReplies() { michael@0: for (size_t i = 0; i < received_replies_.size(); ++i) { michael@0: Message* message = received_replies_[i].message; michael@0: if (received_replies_[i].context->TryToUnblockListener(message)) { michael@0: delete message; michael@0: received_replies_.erase(received_replies_.begin() + i); michael@0: return; michael@0: } michael@0: } michael@0: } michael@0: michael@0: private: michael@0: // See the comment in SyncChannel::SyncChannel for why this event is created michael@0: // as manual reset. michael@0: ReceivedSyncMsgQueue() : michael@0: dispatch_event_(true, false), michael@0: listener_message_loop_(MessageLoop::current()), michael@0: task_pending_(false), michael@0: listener_count_(0) { michael@0: } michael@0: michael@0: // Holds information about a queued synchronous message or reply. michael@0: struct QueuedMessage { michael@0: QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } michael@0: Message* message; michael@0: scoped_refptr context; michael@0: }; michael@0: michael@0: typedef std::deque SyncMessageQueue; michael@0: SyncMessageQueue message_queue_; michael@0: michael@0: std::vector received_replies_; michael@0: michael@0: // Set when we got a synchronous message that we must respond to as the michael@0: // sender needs its reply before it can reply to our original synchronous michael@0: // message. michael@0: WaitableEvent dispatch_event_; michael@0: MessageLoop* listener_message_loop_; michael@0: Lock message_lock_; michael@0: bool task_pending_; michael@0: int listener_count_; michael@0: }; michael@0: michael@0: base::LazyInstance > michael@0: SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED); michael@0: michael@0: SyncChannel::SyncContext::SyncContext( michael@0: Channel::Listener* listener, michael@0: MessageFilter* filter, michael@0: MessageLoop* ipc_thread, michael@0: WaitableEvent* shutdown_event) michael@0: : ChannelProxy::Context(listener, filter, ipc_thread), michael@0: received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), michael@0: shutdown_event_(shutdown_event) { michael@0: } michael@0: michael@0: SyncChannel::SyncContext::~SyncContext() { michael@0: while (!deserializers_.empty()) michael@0: Pop(); michael@0: } michael@0: michael@0: // Adds information about an outgoing sync message to the context so that michael@0: // we know how to deserialize the reply. Returns a handle that's set when michael@0: // the reply has arrived. michael@0: void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { michael@0: // The event is created as manual reset because in between Signal and michael@0: // OnObjectSignalled, another Send can happen which would stop the watcher michael@0: // from being called. The event would get watched later, when the nested michael@0: // Send completes, so the event will need to remain set. michael@0: PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg), michael@0: sync_msg->GetReplyDeserializer(), michael@0: new WaitableEvent(true, false)); michael@0: AutoLock auto_lock(deserializers_lock_); michael@0: deserializers_.push_back(pending); michael@0: } michael@0: michael@0: bool SyncChannel::SyncContext::Pop() { michael@0: bool result; michael@0: { michael@0: AutoLock auto_lock(deserializers_lock_); michael@0: PendingSyncMsg msg = deserializers_.back(); michael@0: delete msg.deserializer; michael@0: delete msg.done_event; michael@0: msg.done_event = NULL; michael@0: deserializers_.pop_back(); michael@0: result = msg.send_result; michael@0: } michael@0: michael@0: // We got a reply to a synchronous Send() call that's blocking the listener michael@0: // thread. However, further down the call stack there could be another michael@0: // blocking Send() call, whose reply we received after we made this last michael@0: // Send() call. So check if we have any queued replies available that michael@0: // can now unblock the listener thread. michael@0: ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( michael@0: received_sync_msgs_.get(), &ReceivedSyncMsgQueue::DispatchReplies)); michael@0: michael@0: return result; michael@0: } michael@0: michael@0: WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { michael@0: AutoLock auto_lock(deserializers_lock_); michael@0: return deserializers_.back().done_event; michael@0: } michael@0: michael@0: WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { michael@0: return received_sync_msgs_->dispatch_event(); michael@0: } michael@0: michael@0: void SyncChannel::SyncContext::DispatchMessages() { michael@0: received_sync_msgs_->DispatchMessages(); michael@0: } michael@0: michael@0: bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { michael@0: AutoLock auto_lock(deserializers_lock_); michael@0: if (deserializers_.empty() || michael@0: !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { michael@0: return false; michael@0: } michael@0: michael@0: if (!msg->is_reply_error()) { michael@0: deserializers_.back().send_result = deserializers_.back().deserializer-> michael@0: SerializeOutputParameters(*msg); michael@0: } michael@0: deserializers_.back().done_event->Signal(); michael@0: michael@0: return true; michael@0: } michael@0: michael@0: void SyncChannel::SyncContext::Clear() { michael@0: CancelPendingSends(); michael@0: received_sync_msgs_->RemoveContext(this); michael@0: michael@0: Context::Clear(); michael@0: } michael@0: michael@0: void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) { michael@0: // Give the filters a chance at processing this message. michael@0: if (TryFilters(msg)) michael@0: return; michael@0: michael@0: if (TryToUnblockListener(&msg)) michael@0: return; michael@0: michael@0: if (msg.should_unblock()) { michael@0: received_sync_msgs_->QueueMessage(msg, this); michael@0: return; michael@0: } michael@0: michael@0: if (msg.is_reply()) { michael@0: received_sync_msgs_->QueueReply(msg, this); michael@0: return; michael@0: } michael@0: michael@0: return Context::OnMessageReceivedNoFilter(msg); michael@0: } michael@0: michael@0: void SyncChannel::SyncContext::OnChannelError() { michael@0: CancelPendingSends(); michael@0: shutdown_watcher_.StopWatching(); michael@0: Context::OnChannelError(); michael@0: } michael@0: michael@0: void SyncChannel::SyncContext::OnChannelOpened() { michael@0: shutdown_watcher_.StartWatching(shutdown_event_, this); michael@0: Context::OnChannelOpened(); michael@0: } michael@0: michael@0: void SyncChannel::SyncContext::OnChannelClosed() { michael@0: shutdown_watcher_.StopWatching(); michael@0: Context::OnChannelClosed(); michael@0: } michael@0: michael@0: void SyncChannel::SyncContext::OnSendTimeout(int message_id) { michael@0: AutoLock auto_lock(deserializers_lock_); michael@0: PendingSyncMessageQueue::iterator iter; michael@0: for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { michael@0: if (iter->id == message_id) { michael@0: iter->done_event->Signal(); michael@0: break; michael@0: } michael@0: } michael@0: } michael@0: michael@0: void SyncChannel::SyncContext::CancelPendingSends() { michael@0: AutoLock auto_lock(deserializers_lock_); michael@0: PendingSyncMessageQueue::iterator iter; michael@0: for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) michael@0: iter->done_event->Signal(); michael@0: } michael@0: michael@0: void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) { michael@0: DCHECK(event == shutdown_event_); michael@0: // Process shut down before we can get a reply to a synchronous message. michael@0: // Cancel pending Send calls, which will end up setting the send done event. michael@0: CancelPendingSends(); michael@0: } michael@0: michael@0: michael@0: SyncChannel::SyncChannel( michael@0: const std::wstring& channel_id, Channel::Mode mode, michael@0: Channel::Listener* listener, MessageFilter* filter, michael@0: MessageLoop* ipc_message_loop, bool create_pipe_now, michael@0: WaitableEvent* shutdown_event) michael@0: : ChannelProxy( michael@0: channel_id, mode, ipc_message_loop, michael@0: new SyncContext(listener, filter, ipc_message_loop, shutdown_event), michael@0: create_pipe_now), michael@0: sync_messages_with_no_timeout_allowed_(true) { michael@0: // Ideally we only want to watch this object when running a nested message michael@0: // loop. However, we don't know when it exits if there's another nested michael@0: // message loop running under it or not, so we wouldn't know whether to michael@0: // stop or keep watching. So we always watch it, and create the event as michael@0: // manual reset since the object watcher might otherwise reset the event michael@0: // when we're doing a WaitMany. michael@0: dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); michael@0: } michael@0: michael@0: SyncChannel::~SyncChannel() { michael@0: } michael@0: michael@0: bool SyncChannel::Send(Message* message) { michael@0: return SendWithTimeout(message, base::kNoTimeout); michael@0: } michael@0: michael@0: bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { michael@0: if (!message->is_sync()) { michael@0: ChannelProxy::Send(message); michael@0: return true; michael@0: } michael@0: michael@0: // *this* might get deleted in WaitForReply. michael@0: scoped_refptr context(sync_context()); michael@0: if (context->shutdown_event()->IsSignaled()) { michael@0: delete message; michael@0: return false; michael@0: } michael@0: michael@0: DCHECK(sync_messages_with_no_timeout_allowed_ || michael@0: timeout_ms != base::kNoTimeout); michael@0: SyncMessage* sync_msg = static_cast(message); michael@0: context->Push(sync_msg); michael@0: int message_id = SyncMessage::GetMessageId(*sync_msg); michael@0: WaitableEvent* pump_messages_event = sync_msg->pump_messages_event(); michael@0: michael@0: ChannelProxy::Send(message); michael@0: michael@0: if (timeout_ms != base::kNoTimeout) { michael@0: // We use the sync message id so that when a message times out, we don't michael@0: // confuse it with another send that is either above/below this Send in michael@0: // the call stack. michael@0: context->ipc_message_loop()->PostDelayedTask(FROM_HERE, michael@0: NewRunnableMethod(context.get(), michael@0: &SyncContext::OnSendTimeout, message_id), timeout_ms); michael@0: } michael@0: michael@0: // Wait for reply, or for any other incoming synchronous messages. michael@0: WaitForReply(pump_messages_event); michael@0: michael@0: return context->Pop(); michael@0: } michael@0: michael@0: void SyncChannel::WaitForReply(WaitableEvent* pump_messages_event) { michael@0: while (true) { michael@0: WaitableEvent* objects[] = { michael@0: sync_context()->GetDispatchEvent(), michael@0: sync_context()->GetSendDoneEvent(), michael@0: pump_messages_event michael@0: }; michael@0: michael@0: unsigned count = pump_messages_event ? 3: 2; michael@0: unsigned result = WaitableEvent::WaitMany(objects, count); michael@0: if (result == 0 /* dispatch event */) { michael@0: // We're waiting for a reply, but we received a blocking synchronous michael@0: // call. We must process it or otherwise a deadlock might occur. michael@0: sync_context()->GetDispatchEvent()->Reset(); michael@0: sync_context()->DispatchMessages(); michael@0: continue; michael@0: } michael@0: michael@0: if (result == 2 /* pump_messages_event */) michael@0: WaitForReplyWithNestedMessageLoop(); // Start a nested message loop. michael@0: michael@0: break; michael@0: } michael@0: } michael@0: michael@0: void SyncChannel::WaitForReplyWithNestedMessageLoop() { michael@0: WaitableEvent* old_done_event = send_done_watcher_.GetWatchedEvent(); michael@0: send_done_watcher_.StopWatching(); michael@0: send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this); michael@0: bool old_state = MessageLoop::current()->NestableTasksAllowed(); michael@0: MessageLoop::current()->SetNestableTasksAllowed(true); michael@0: MessageLoop::current()->Run(); michael@0: MessageLoop::current()->SetNestableTasksAllowed(old_state); michael@0: if (old_done_event) michael@0: send_done_watcher_.StartWatching(old_done_event, this); michael@0: } michael@0: michael@0: void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { michael@0: WaitableEvent* dispatch_event = sync_context()->GetDispatchEvent(); michael@0: if (event == dispatch_event) { michael@0: // The call to DispatchMessages might delete this object, so reregister michael@0: // the object watcher first. michael@0: dispatch_event->Reset(); michael@0: dispatch_watcher_.StartWatching(dispatch_event, this); michael@0: sync_context()->DispatchMessages(); michael@0: } else { michael@0: // We got the reply, timed out or the process shutdown. michael@0: DCHECK(event == sync_context()->GetSendDoneEvent()); michael@0: MessageLoop::current()->Quit(); michael@0: } michael@0: } michael@0: michael@0: } // namespace IPC