ipc/chromium/src/chrome/common/ipc_sync_channel.cc

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

     1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
     2 // Use of this source code is governed by a BSD-style license that can be
     3 // found in the LICENSE file.
     5 #include "chrome/common/ipc_sync_channel.h"
     7 #include "base/lazy_instance.h"
     8 #include "base/logging.h"
     9 #include "base/thread_local.h"
    10 #include "base/message_loop.h"
    11 #include "base/waitable_event.h"
    12 #include "base/waitable_event_watcher.h"
    13 #include "chrome/common/ipc_sync_message.h"
    15 using base::TimeDelta;
    16 using base::TimeTicks;
    17 using base::WaitableEvent;
    19 namespace IPC {
    20 // When we're blocked in a Send(), we need to process incoming synchronous
    21 // messages right away because it could be blocking our reply (either
    22 // directly from the same object we're calling, or indirectly through one or
    23 // more other channels).  That means that in SyncContext's OnMessageReceived,
    24 // we need to process sync message right away if we're blocked.  However a
    25 // simple check isn't sufficient, because the listener thread can be in the
    26 // process of calling Send.
    27 // To work around this, when SyncChannel filters a sync message, it sets
    28 // an event that the listener thread waits on during its Send() call.  This
    29 // allows us to dispatch incoming sync messages when blocked.  The race
    30 // condition is handled because if Send is in the process of being called, it
    31 // will check the event.  In case the listener thread isn't sending a message,
    32 // we queue a task on the listener thread to dispatch the received messages.
    33 // The messages are stored in this queue object that's shared among all
    34 // SyncChannel objects on the same thread (since one object can receive a
    35 // sync message while another one is blocked).
    37 class SyncChannel::ReceivedSyncMsgQueue :
    38     public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
    39  public:
    40   // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
    41   // if necessary.  Call RemoveContext on the same thread when done.
    42   static ReceivedSyncMsgQueue* AddContext() {
    43     // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
    44     // SyncChannel objects can block the same thread).
    45     ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
    46     if (!rv) {
    47       rv = new ReceivedSyncMsgQueue();
    48       ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
    49     }
    50     rv->listener_count_++;
    51     return rv;
    52   }
    54   ~ReceivedSyncMsgQueue() {
    55   }
    57   // Called on IPC thread when a synchronous message or reply arrives.
    58   void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
    59     bool was_task_pending;
    60     {
    61       AutoLock auto_lock(message_lock_);
    63       was_task_pending = task_pending_;
    64       task_pending_ = true;
    66       // We set the event in case the listener thread is blocked (or is about
    67       // to). In case it's not, the PostTask dispatches the messages.
    68       message_queue_.push_back(QueuedMessage(new Message(msg), context));
    69     }
    71     dispatch_event_.Signal();
    72     if (!was_task_pending) {
    73       listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
    74           this, &ReceivedSyncMsgQueue::DispatchMessagesTask));
    75     }
    76   }
    78   void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
    79     received_replies_.push_back(QueuedMessage(new Message(msg), context));
    80   }
    82   // Called on the listener's thread to process any queues synchronous
    83   // messages.
    84   void DispatchMessagesTask() {
    85     {
    86       AutoLock auto_lock(message_lock_);
    87       task_pending_ = false;
    88     }
    89     DispatchMessages();
    90   }
    92   void DispatchMessages() {
    93     while (true) {
    94       Message* message;
    95       scoped_refptr<SyncChannel::SyncContext> context;
    96       {
    97         AutoLock auto_lock(message_lock_);
    98         if (message_queue_.empty())
    99           break;
   101         message = message_queue_.front().message;
   102         context = message_queue_.front().context;
   103         message_queue_.pop_front();
   104       }
   106       context->OnDispatchMessage(*message);
   107       delete message;
   108     }
   109   }
   111   // SyncChannel calls this in its destructor.
   112   void RemoveContext(SyncContext* context) {
   113     AutoLock auto_lock(message_lock_);
   115     SyncMessageQueue::iterator iter = message_queue_.begin();
   116     while (iter != message_queue_.end()) {
   117       if (iter->context == context) {
   118         delete iter->message;
   119         iter = message_queue_.erase(iter);
   120       } else {
   121         iter++;
   122       }
   123     }
   125     if (--listener_count_ == 0) {
   126       DCHECK(lazy_tls_ptr_.Pointer()->Get());
   127       lazy_tls_ptr_.Pointer()->Set(NULL);
   128     }
   129   }
   131   WaitableEvent* dispatch_event() { return &dispatch_event_; }
   132   MessageLoop* listener_message_loop() { return listener_message_loop_; }
   134   // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
   135   static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
   136       lazy_tls_ptr_;
   138   // Called on the ipc thread to check if we can unblock any current Send()
   139   // calls based on a queued reply.
   140   void DispatchReplies() {
   141     for (size_t i = 0; i < received_replies_.size(); ++i) {
   142       Message* message = received_replies_[i].message;
   143       if (received_replies_[i].context->TryToUnblockListener(message)) {
   144         delete message;
   145         received_replies_.erase(received_replies_.begin() + i);
   146         return;
   147       }
   148     }
   149   }
   151  private:
   152   // See the comment in SyncChannel::SyncChannel for why this event is created
   153   // as manual reset.
   154   ReceivedSyncMsgQueue() :
   155       dispatch_event_(true, false),
   156       listener_message_loop_(MessageLoop::current()),
   157       task_pending_(false),
   158       listener_count_(0) {
   159   }
   161   // Holds information about a queued synchronous message or reply.
   162   struct QueuedMessage {
   163     QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
   164     Message* message;
   165     scoped_refptr<SyncChannel::SyncContext> context;
   166   };
   168   typedef std::deque<QueuedMessage> SyncMessageQueue;
   169   SyncMessageQueue message_queue_;
   171   std::vector<QueuedMessage> received_replies_;
   173   // Set when we got a synchronous message that we must respond to as the
   174   // sender needs its reply before it can reply to our original synchronous
   175   // message.
   176   WaitableEvent dispatch_event_;
   177   MessageLoop* listener_message_loop_;
   178   Lock message_lock_;
   179   bool task_pending_;
   180   int listener_count_;
   181 };
   183 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
   184     SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED);
   186 SyncChannel::SyncContext::SyncContext(
   187     Channel::Listener* listener,
   188     MessageFilter* filter,
   189     MessageLoop* ipc_thread,
   190     WaitableEvent* shutdown_event)
   191     : ChannelProxy::Context(listener, filter, ipc_thread),
   192       received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
   193       shutdown_event_(shutdown_event) {
   194 }
   196 SyncChannel::SyncContext::~SyncContext() {
   197   while (!deserializers_.empty())
   198     Pop();
   199 }
   201 // Adds information about an outgoing sync message to the context so that
   202 // we know how to deserialize the reply.  Returns a handle that's set when
   203 // the reply has arrived.
   204 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
   205   // The event is created as manual reset because in between Signal and
   206   // OnObjectSignalled, another Send can happen which would stop the watcher
   207   // from being called.  The event would get watched later, when the nested
   208   // Send completes, so the event will need to remain set.
   209   PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
   210                          sync_msg->GetReplyDeserializer(),
   211                          new WaitableEvent(true, false));
   212   AutoLock auto_lock(deserializers_lock_);
   213   deserializers_.push_back(pending);
   214 }
   216 bool SyncChannel::SyncContext::Pop() {
   217   bool result;
   218   {
   219     AutoLock auto_lock(deserializers_lock_);
   220     PendingSyncMsg msg = deserializers_.back();
   221     delete msg.deserializer;
   222     delete msg.done_event;
   223     msg.done_event = NULL;
   224     deserializers_.pop_back();
   225     result = msg.send_result;
   226   }
   228   // We got a reply to a synchronous Send() call that's blocking the listener
   229   // thread.  However, further down the call stack there could be another
   230   // blocking Send() call, whose reply we received after we made this last
   231   // Send() call.  So check if we have any queued replies available that
   232   // can now unblock the listener thread.
   233   ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
   234       received_sync_msgs_.get(), &ReceivedSyncMsgQueue::DispatchReplies));
   236   return result;
   237 }
   239 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
   240   AutoLock auto_lock(deserializers_lock_);
   241   return deserializers_.back().done_event;
   242 }
   244 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
   245   return received_sync_msgs_->dispatch_event();
   246 }
   248 void SyncChannel::SyncContext::DispatchMessages() {
   249   received_sync_msgs_->DispatchMessages();
   250 }
   252 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
   253   AutoLock auto_lock(deserializers_lock_);
   254   if (deserializers_.empty() ||
   255       !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
   256     return false;
   257   }
   259   if (!msg->is_reply_error()) {
   260     deserializers_.back().send_result = deserializers_.back().deserializer->
   261         SerializeOutputParameters(*msg);
   262   }
   263   deserializers_.back().done_event->Signal();
   265   return true;
   266 }
   268 void SyncChannel::SyncContext::Clear() {
   269   CancelPendingSends();
   270   received_sync_msgs_->RemoveContext(this);
   272   Context::Clear();
   273 }
   275 void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
   276   // Give the filters a chance at processing this message.
   277   if (TryFilters(msg))
   278     return;
   280   if (TryToUnblockListener(&msg))
   281     return;
   283   if (msg.should_unblock()) {
   284     received_sync_msgs_->QueueMessage(msg, this);
   285     return;
   286   }
   288   if (msg.is_reply()) {
   289     received_sync_msgs_->QueueReply(msg, this);
   290     return;
   291   }
   293   return Context::OnMessageReceivedNoFilter(msg);
   294 }
   296 void SyncChannel::SyncContext::OnChannelError() {
   297   CancelPendingSends();
   298   shutdown_watcher_.StopWatching();
   299   Context::OnChannelError();
   300 }
   302 void SyncChannel::SyncContext::OnChannelOpened() {
   303   shutdown_watcher_.StartWatching(shutdown_event_, this);
   304   Context::OnChannelOpened();
   305 }
   307 void SyncChannel::SyncContext::OnChannelClosed() {
   308   shutdown_watcher_.StopWatching();
   309   Context::OnChannelClosed();
   310 }
   312 void SyncChannel::SyncContext::OnSendTimeout(int message_id) {
   313   AutoLock auto_lock(deserializers_lock_);
   314   PendingSyncMessageQueue::iterator iter;
   315   for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
   316     if (iter->id == message_id) {
   317       iter->done_event->Signal();
   318       break;
   319     }
   320   }
   321 }
   323 void SyncChannel::SyncContext::CancelPendingSends() {
   324   AutoLock auto_lock(deserializers_lock_);
   325   PendingSyncMessageQueue::iterator iter;
   326   for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++)
   327     iter->done_event->Signal();
   328 }
   330 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) {
   331   DCHECK(event == shutdown_event_);
   332   // Process shut down before we can get a reply to a synchronous message.
   333   // Cancel pending Send calls, which will end up setting the send done event.
   334   CancelPendingSends();
   335 }
   338 SyncChannel::SyncChannel(
   339     const std::wstring& channel_id, Channel::Mode mode,
   340     Channel::Listener* listener, MessageFilter* filter,
   341     MessageLoop* ipc_message_loop, bool create_pipe_now,
   342     WaitableEvent* shutdown_event)
   343     : ChannelProxy(
   344           channel_id, mode, ipc_message_loop,
   345           new SyncContext(listener, filter, ipc_message_loop, shutdown_event),
   346           create_pipe_now),
   347       sync_messages_with_no_timeout_allowed_(true) {
   348   // Ideally we only want to watch this object when running a nested message
   349   // loop.  However, we don't know when it exits if there's another nested
   350   // message loop running under it or not, so we wouldn't know whether to
   351   // stop or keep watching.  So we always watch it, and create the event as
   352   // manual reset since the object watcher might otherwise reset the event
   353   // when we're doing a WaitMany.
   354   dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this);
   355 }
   357 SyncChannel::~SyncChannel() {
   358 }
   360 bool SyncChannel::Send(Message* message) {
   361   return SendWithTimeout(message, base::kNoTimeout);
   362 }
   364 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
   365   if (!message->is_sync()) {
   366     ChannelProxy::Send(message);
   367     return true;
   368   }
   370   // *this* might get deleted in WaitForReply.
   371   scoped_refptr<SyncContext> context(sync_context());
   372   if (context->shutdown_event()->IsSignaled()) {
   373     delete message;
   374     return false;
   375   }
   377   DCHECK(sync_messages_with_no_timeout_allowed_ ||
   378          timeout_ms != base::kNoTimeout);
   379   SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
   380   context->Push(sync_msg);
   381   int message_id = SyncMessage::GetMessageId(*sync_msg);
   382   WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
   384   ChannelProxy::Send(message);
   386   if (timeout_ms != base::kNoTimeout) {
   387     // We use the sync message id so that when a message times out, we don't
   388     // confuse it with another send that is either above/below this Send in
   389     // the call stack.
   390     context->ipc_message_loop()->PostDelayedTask(FROM_HERE,
   391         NewRunnableMethod(context.get(),
   392             &SyncContext::OnSendTimeout, message_id), timeout_ms);
   393   }
   395   // Wait for reply, or for any other incoming synchronous messages.
   396   WaitForReply(pump_messages_event);
   398   return context->Pop();
   399 }
   401 void SyncChannel::WaitForReply(WaitableEvent* pump_messages_event) {
   402   while (true) {
   403     WaitableEvent* objects[] = {
   404       sync_context()->GetDispatchEvent(),
   405       sync_context()->GetSendDoneEvent(),
   406       pump_messages_event
   407     };
   409     unsigned count = pump_messages_event ? 3: 2;
   410     unsigned result = WaitableEvent::WaitMany(objects, count);
   411     if (result == 0 /* dispatch event */) {
   412       // We're waiting for a reply, but we received a blocking synchronous
   413       // call.  We must process it or otherwise a deadlock might occur.
   414       sync_context()->GetDispatchEvent()->Reset();
   415       sync_context()->DispatchMessages();
   416       continue;
   417     }
   419     if (result == 2 /* pump_messages_event */)
   420       WaitForReplyWithNestedMessageLoop();  // Start a nested message loop.
   422     break;
   423   }
   424 }
   426 void SyncChannel::WaitForReplyWithNestedMessageLoop() {
   427   WaitableEvent* old_done_event = send_done_watcher_.GetWatchedEvent();
   428   send_done_watcher_.StopWatching();
   429   send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this);
   430   bool old_state = MessageLoop::current()->NestableTasksAllowed();
   431   MessageLoop::current()->SetNestableTasksAllowed(true);
   432   MessageLoop::current()->Run();
   433   MessageLoop::current()->SetNestableTasksAllowed(old_state);
   434   if (old_done_event)
   435     send_done_watcher_.StartWatching(old_done_event, this);
   436 }
   438 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) {
   439   WaitableEvent* dispatch_event = sync_context()->GetDispatchEvent();
   440   if (event == dispatch_event) {
   441     // The call to DispatchMessages might delete this object, so reregister
   442     // the object watcher first.
   443     dispatch_event->Reset();
   444     dispatch_watcher_.StartWatching(dispatch_event, this);
   445     sync_context()->DispatchMessages();
   446   } else {
   447     // We got the reply, timed out or the process shutdown.
   448     DCHECK(event == sync_context()->GetSendDoneEvent());
   449     MessageLoop::current()->Quit();
   450   }
   451 }
   453 }  // namespace IPC

mercurial