ipc/chromium/src/chrome/common/ipc_sync_channel.cc

Wed, 31 Dec 2014 13:27:57 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 13:27:57 +0100
branch
TOR_BUG_3246
changeset 6
8bccb770b82d
permissions
-rw-r--r--

Ignore runtime configuration files generated during quality assurance.

michael@0 1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
michael@0 2 // Use of this source code is governed by a BSD-style license that can be
michael@0 3 // found in the LICENSE file.
michael@0 4
michael@0 5 #include "chrome/common/ipc_sync_channel.h"
michael@0 6
michael@0 7 #include "base/lazy_instance.h"
michael@0 8 #include "base/logging.h"
michael@0 9 #include "base/thread_local.h"
michael@0 10 #include "base/message_loop.h"
michael@0 11 #include "base/waitable_event.h"
michael@0 12 #include "base/waitable_event_watcher.h"
michael@0 13 #include "chrome/common/ipc_sync_message.h"
michael@0 14
michael@0 15 using base::TimeDelta;
michael@0 16 using base::TimeTicks;
michael@0 17 using base::WaitableEvent;
michael@0 18
michael@0 19 namespace IPC {
michael@0 20 // When we're blocked in a Send(), we need to process incoming synchronous
michael@0 21 // messages right away because it could be blocking our reply (either
michael@0 22 // directly from the same object we're calling, or indirectly through one or
michael@0 23 // more other channels). That means that in SyncContext's OnMessageReceived,
michael@0 24 // we need to process sync message right away if we're blocked. However a
michael@0 25 // simple check isn't sufficient, because the listener thread can be in the
michael@0 26 // process of calling Send.
michael@0 27 // To work around this, when SyncChannel filters a sync message, it sets
michael@0 28 // an event that the listener thread waits on during its Send() call. This
michael@0 29 // allows us to dispatch incoming sync messages when blocked. The race
michael@0 30 // condition is handled because if Send is in the process of being called, it
michael@0 31 // will check the event. In case the listener thread isn't sending a message,
michael@0 32 // we queue a task on the listener thread to dispatch the received messages.
michael@0 33 // The messages are stored in this queue object that's shared among all
michael@0 34 // SyncChannel objects on the same thread (since one object can receive a
michael@0 35 // sync message while another one is blocked).
michael@0 36
michael@0 37 class SyncChannel::ReceivedSyncMsgQueue :
michael@0 38 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
michael@0 39 public:
michael@0 40 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
michael@0 41 // if necessary. Call RemoveContext on the same thread when done.
michael@0 42 static ReceivedSyncMsgQueue* AddContext() {
michael@0 43 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
michael@0 44 // SyncChannel objects can block the same thread).
michael@0 45 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
michael@0 46 if (!rv) {
michael@0 47 rv = new ReceivedSyncMsgQueue();
michael@0 48 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
michael@0 49 }
michael@0 50 rv->listener_count_++;
michael@0 51 return rv;
michael@0 52 }
michael@0 53
michael@0 54 ~ReceivedSyncMsgQueue() {
michael@0 55 }
michael@0 56
michael@0 57 // Called on IPC thread when a synchronous message or reply arrives.
michael@0 58 void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
michael@0 59 bool was_task_pending;
michael@0 60 {
michael@0 61 AutoLock auto_lock(message_lock_);
michael@0 62
michael@0 63 was_task_pending = task_pending_;
michael@0 64 task_pending_ = true;
michael@0 65
michael@0 66 // We set the event in case the listener thread is blocked (or is about
michael@0 67 // to). In case it's not, the PostTask dispatches the messages.
michael@0 68 message_queue_.push_back(QueuedMessage(new Message(msg), context));
michael@0 69 }
michael@0 70
michael@0 71 dispatch_event_.Signal();
michael@0 72 if (!was_task_pending) {
michael@0 73 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
michael@0 74 this, &ReceivedSyncMsgQueue::DispatchMessagesTask));
michael@0 75 }
michael@0 76 }
michael@0 77
michael@0 78 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
michael@0 79 received_replies_.push_back(QueuedMessage(new Message(msg), context));
michael@0 80 }
michael@0 81
michael@0 82 // Called on the listener's thread to process any queues synchronous
michael@0 83 // messages.
michael@0 84 void DispatchMessagesTask() {
michael@0 85 {
michael@0 86 AutoLock auto_lock(message_lock_);
michael@0 87 task_pending_ = false;
michael@0 88 }
michael@0 89 DispatchMessages();
michael@0 90 }
michael@0 91
michael@0 92 void DispatchMessages() {
michael@0 93 while (true) {
michael@0 94 Message* message;
michael@0 95 scoped_refptr<SyncChannel::SyncContext> context;
michael@0 96 {
michael@0 97 AutoLock auto_lock(message_lock_);
michael@0 98 if (message_queue_.empty())
michael@0 99 break;
michael@0 100
michael@0 101 message = message_queue_.front().message;
michael@0 102 context = message_queue_.front().context;
michael@0 103 message_queue_.pop_front();
michael@0 104 }
michael@0 105
michael@0 106 context->OnDispatchMessage(*message);
michael@0 107 delete message;
michael@0 108 }
michael@0 109 }
michael@0 110
michael@0 111 // SyncChannel calls this in its destructor.
michael@0 112 void RemoveContext(SyncContext* context) {
michael@0 113 AutoLock auto_lock(message_lock_);
michael@0 114
michael@0 115 SyncMessageQueue::iterator iter = message_queue_.begin();
michael@0 116 while (iter != message_queue_.end()) {
michael@0 117 if (iter->context == context) {
michael@0 118 delete iter->message;
michael@0 119 iter = message_queue_.erase(iter);
michael@0 120 } else {
michael@0 121 iter++;
michael@0 122 }
michael@0 123 }
michael@0 124
michael@0 125 if (--listener_count_ == 0) {
michael@0 126 DCHECK(lazy_tls_ptr_.Pointer()->Get());
michael@0 127 lazy_tls_ptr_.Pointer()->Set(NULL);
michael@0 128 }
michael@0 129 }
michael@0 130
michael@0 131 WaitableEvent* dispatch_event() { return &dispatch_event_; }
michael@0 132 MessageLoop* listener_message_loop() { return listener_message_loop_; }
michael@0 133
michael@0 134 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
michael@0 135 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
michael@0 136 lazy_tls_ptr_;
michael@0 137
michael@0 138 // Called on the ipc thread to check if we can unblock any current Send()
michael@0 139 // calls based on a queued reply.
michael@0 140 void DispatchReplies() {
michael@0 141 for (size_t i = 0; i < received_replies_.size(); ++i) {
michael@0 142 Message* message = received_replies_[i].message;
michael@0 143 if (received_replies_[i].context->TryToUnblockListener(message)) {
michael@0 144 delete message;
michael@0 145 received_replies_.erase(received_replies_.begin() + i);
michael@0 146 return;
michael@0 147 }
michael@0 148 }
michael@0 149 }
michael@0 150
michael@0 151 private:
michael@0 152 // See the comment in SyncChannel::SyncChannel for why this event is created
michael@0 153 // as manual reset.
michael@0 154 ReceivedSyncMsgQueue() :
michael@0 155 dispatch_event_(true, false),
michael@0 156 listener_message_loop_(MessageLoop::current()),
michael@0 157 task_pending_(false),
michael@0 158 listener_count_(0) {
michael@0 159 }
michael@0 160
michael@0 161 // Holds information about a queued synchronous message or reply.
michael@0 162 struct QueuedMessage {
michael@0 163 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
michael@0 164 Message* message;
michael@0 165 scoped_refptr<SyncChannel::SyncContext> context;
michael@0 166 };
michael@0 167
michael@0 168 typedef std::deque<QueuedMessage> SyncMessageQueue;
michael@0 169 SyncMessageQueue message_queue_;
michael@0 170
michael@0 171 std::vector<QueuedMessage> received_replies_;
michael@0 172
michael@0 173 // Set when we got a synchronous message that we must respond to as the
michael@0 174 // sender needs its reply before it can reply to our original synchronous
michael@0 175 // message.
michael@0 176 WaitableEvent dispatch_event_;
michael@0 177 MessageLoop* listener_message_loop_;
michael@0 178 Lock message_lock_;
michael@0 179 bool task_pending_;
michael@0 180 int listener_count_;
michael@0 181 };
michael@0 182
michael@0 183 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
michael@0 184 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED);
michael@0 185
michael@0 186 SyncChannel::SyncContext::SyncContext(
michael@0 187 Channel::Listener* listener,
michael@0 188 MessageFilter* filter,
michael@0 189 MessageLoop* ipc_thread,
michael@0 190 WaitableEvent* shutdown_event)
michael@0 191 : ChannelProxy::Context(listener, filter, ipc_thread),
michael@0 192 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
michael@0 193 shutdown_event_(shutdown_event) {
michael@0 194 }
michael@0 195
michael@0 196 SyncChannel::SyncContext::~SyncContext() {
michael@0 197 while (!deserializers_.empty())
michael@0 198 Pop();
michael@0 199 }
michael@0 200
michael@0 201 // Adds information about an outgoing sync message to the context so that
michael@0 202 // we know how to deserialize the reply. Returns a handle that's set when
michael@0 203 // the reply has arrived.
michael@0 204 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
michael@0 205 // The event is created as manual reset because in between Signal and
michael@0 206 // OnObjectSignalled, another Send can happen which would stop the watcher
michael@0 207 // from being called. The event would get watched later, when the nested
michael@0 208 // Send completes, so the event will need to remain set.
michael@0 209 PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
michael@0 210 sync_msg->GetReplyDeserializer(),
michael@0 211 new WaitableEvent(true, false));
michael@0 212 AutoLock auto_lock(deserializers_lock_);
michael@0 213 deserializers_.push_back(pending);
michael@0 214 }
michael@0 215
michael@0 216 bool SyncChannel::SyncContext::Pop() {
michael@0 217 bool result;
michael@0 218 {
michael@0 219 AutoLock auto_lock(deserializers_lock_);
michael@0 220 PendingSyncMsg msg = deserializers_.back();
michael@0 221 delete msg.deserializer;
michael@0 222 delete msg.done_event;
michael@0 223 msg.done_event = NULL;
michael@0 224 deserializers_.pop_back();
michael@0 225 result = msg.send_result;
michael@0 226 }
michael@0 227
michael@0 228 // We got a reply to a synchronous Send() call that's blocking the listener
michael@0 229 // thread. However, further down the call stack there could be another
michael@0 230 // blocking Send() call, whose reply we received after we made this last
michael@0 231 // Send() call. So check if we have any queued replies available that
michael@0 232 // can now unblock the listener thread.
michael@0 233 ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
michael@0 234 received_sync_msgs_.get(), &ReceivedSyncMsgQueue::DispatchReplies));
michael@0 235
michael@0 236 return result;
michael@0 237 }
michael@0 238
michael@0 239 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
michael@0 240 AutoLock auto_lock(deserializers_lock_);
michael@0 241 return deserializers_.back().done_event;
michael@0 242 }
michael@0 243
michael@0 244 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
michael@0 245 return received_sync_msgs_->dispatch_event();
michael@0 246 }
michael@0 247
michael@0 248 void SyncChannel::SyncContext::DispatchMessages() {
michael@0 249 received_sync_msgs_->DispatchMessages();
michael@0 250 }
michael@0 251
michael@0 252 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
michael@0 253 AutoLock auto_lock(deserializers_lock_);
michael@0 254 if (deserializers_.empty() ||
michael@0 255 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
michael@0 256 return false;
michael@0 257 }
michael@0 258
michael@0 259 if (!msg->is_reply_error()) {
michael@0 260 deserializers_.back().send_result = deserializers_.back().deserializer->
michael@0 261 SerializeOutputParameters(*msg);
michael@0 262 }
michael@0 263 deserializers_.back().done_event->Signal();
michael@0 264
michael@0 265 return true;
michael@0 266 }
michael@0 267
michael@0 268 void SyncChannel::SyncContext::Clear() {
michael@0 269 CancelPendingSends();
michael@0 270 received_sync_msgs_->RemoveContext(this);
michael@0 271
michael@0 272 Context::Clear();
michael@0 273 }
michael@0 274
michael@0 275 void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
michael@0 276 // Give the filters a chance at processing this message.
michael@0 277 if (TryFilters(msg))
michael@0 278 return;
michael@0 279
michael@0 280 if (TryToUnblockListener(&msg))
michael@0 281 return;
michael@0 282
michael@0 283 if (msg.should_unblock()) {
michael@0 284 received_sync_msgs_->QueueMessage(msg, this);
michael@0 285 return;
michael@0 286 }
michael@0 287
michael@0 288 if (msg.is_reply()) {
michael@0 289 received_sync_msgs_->QueueReply(msg, this);
michael@0 290 return;
michael@0 291 }
michael@0 292
michael@0 293 return Context::OnMessageReceivedNoFilter(msg);
michael@0 294 }
michael@0 295
michael@0 296 void SyncChannel::SyncContext::OnChannelError() {
michael@0 297 CancelPendingSends();
michael@0 298 shutdown_watcher_.StopWatching();
michael@0 299 Context::OnChannelError();
michael@0 300 }
michael@0 301
michael@0 302 void SyncChannel::SyncContext::OnChannelOpened() {
michael@0 303 shutdown_watcher_.StartWatching(shutdown_event_, this);
michael@0 304 Context::OnChannelOpened();
michael@0 305 }
michael@0 306
michael@0 307 void SyncChannel::SyncContext::OnChannelClosed() {
michael@0 308 shutdown_watcher_.StopWatching();
michael@0 309 Context::OnChannelClosed();
michael@0 310 }
michael@0 311
michael@0 312 void SyncChannel::SyncContext::OnSendTimeout(int message_id) {
michael@0 313 AutoLock auto_lock(deserializers_lock_);
michael@0 314 PendingSyncMessageQueue::iterator iter;
michael@0 315 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
michael@0 316 if (iter->id == message_id) {
michael@0 317 iter->done_event->Signal();
michael@0 318 break;
michael@0 319 }
michael@0 320 }
michael@0 321 }
michael@0 322
michael@0 323 void SyncChannel::SyncContext::CancelPendingSends() {
michael@0 324 AutoLock auto_lock(deserializers_lock_);
michael@0 325 PendingSyncMessageQueue::iterator iter;
michael@0 326 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++)
michael@0 327 iter->done_event->Signal();
michael@0 328 }
michael@0 329
michael@0 330 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) {
michael@0 331 DCHECK(event == shutdown_event_);
michael@0 332 // Process shut down before we can get a reply to a synchronous message.
michael@0 333 // Cancel pending Send calls, which will end up setting the send done event.
michael@0 334 CancelPendingSends();
michael@0 335 }
michael@0 336
michael@0 337
michael@0 338 SyncChannel::SyncChannel(
michael@0 339 const std::wstring& channel_id, Channel::Mode mode,
michael@0 340 Channel::Listener* listener, MessageFilter* filter,
michael@0 341 MessageLoop* ipc_message_loop, bool create_pipe_now,
michael@0 342 WaitableEvent* shutdown_event)
michael@0 343 : ChannelProxy(
michael@0 344 channel_id, mode, ipc_message_loop,
michael@0 345 new SyncContext(listener, filter, ipc_message_loop, shutdown_event),
michael@0 346 create_pipe_now),
michael@0 347 sync_messages_with_no_timeout_allowed_(true) {
michael@0 348 // Ideally we only want to watch this object when running a nested message
michael@0 349 // loop. However, we don't know when it exits if there's another nested
michael@0 350 // message loop running under it or not, so we wouldn't know whether to
michael@0 351 // stop or keep watching. So we always watch it, and create the event as
michael@0 352 // manual reset since the object watcher might otherwise reset the event
michael@0 353 // when we're doing a WaitMany.
michael@0 354 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this);
michael@0 355 }
michael@0 356
michael@0 357 SyncChannel::~SyncChannel() {
michael@0 358 }
michael@0 359
michael@0 360 bool SyncChannel::Send(Message* message) {
michael@0 361 return SendWithTimeout(message, base::kNoTimeout);
michael@0 362 }
michael@0 363
michael@0 364 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
michael@0 365 if (!message->is_sync()) {
michael@0 366 ChannelProxy::Send(message);
michael@0 367 return true;
michael@0 368 }
michael@0 369
michael@0 370 // *this* might get deleted in WaitForReply.
michael@0 371 scoped_refptr<SyncContext> context(sync_context());
michael@0 372 if (context->shutdown_event()->IsSignaled()) {
michael@0 373 delete message;
michael@0 374 return false;
michael@0 375 }
michael@0 376
michael@0 377 DCHECK(sync_messages_with_no_timeout_allowed_ ||
michael@0 378 timeout_ms != base::kNoTimeout);
michael@0 379 SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
michael@0 380 context->Push(sync_msg);
michael@0 381 int message_id = SyncMessage::GetMessageId(*sync_msg);
michael@0 382 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
michael@0 383
michael@0 384 ChannelProxy::Send(message);
michael@0 385
michael@0 386 if (timeout_ms != base::kNoTimeout) {
michael@0 387 // We use the sync message id so that when a message times out, we don't
michael@0 388 // confuse it with another send that is either above/below this Send in
michael@0 389 // the call stack.
michael@0 390 context->ipc_message_loop()->PostDelayedTask(FROM_HERE,
michael@0 391 NewRunnableMethod(context.get(),
michael@0 392 &SyncContext::OnSendTimeout, message_id), timeout_ms);
michael@0 393 }
michael@0 394
michael@0 395 // Wait for reply, or for any other incoming synchronous messages.
michael@0 396 WaitForReply(pump_messages_event);
michael@0 397
michael@0 398 return context->Pop();
michael@0 399 }
michael@0 400
michael@0 401 void SyncChannel::WaitForReply(WaitableEvent* pump_messages_event) {
michael@0 402 while (true) {
michael@0 403 WaitableEvent* objects[] = {
michael@0 404 sync_context()->GetDispatchEvent(),
michael@0 405 sync_context()->GetSendDoneEvent(),
michael@0 406 pump_messages_event
michael@0 407 };
michael@0 408
michael@0 409 unsigned count = pump_messages_event ? 3: 2;
michael@0 410 unsigned result = WaitableEvent::WaitMany(objects, count);
michael@0 411 if (result == 0 /* dispatch event */) {
michael@0 412 // We're waiting for a reply, but we received a blocking synchronous
michael@0 413 // call. We must process it or otherwise a deadlock might occur.
michael@0 414 sync_context()->GetDispatchEvent()->Reset();
michael@0 415 sync_context()->DispatchMessages();
michael@0 416 continue;
michael@0 417 }
michael@0 418
michael@0 419 if (result == 2 /* pump_messages_event */)
michael@0 420 WaitForReplyWithNestedMessageLoop(); // Start a nested message loop.
michael@0 421
michael@0 422 break;
michael@0 423 }
michael@0 424 }
michael@0 425
michael@0 426 void SyncChannel::WaitForReplyWithNestedMessageLoop() {
michael@0 427 WaitableEvent* old_done_event = send_done_watcher_.GetWatchedEvent();
michael@0 428 send_done_watcher_.StopWatching();
michael@0 429 send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this);
michael@0 430 bool old_state = MessageLoop::current()->NestableTasksAllowed();
michael@0 431 MessageLoop::current()->SetNestableTasksAllowed(true);
michael@0 432 MessageLoop::current()->Run();
michael@0 433 MessageLoop::current()->SetNestableTasksAllowed(old_state);
michael@0 434 if (old_done_event)
michael@0 435 send_done_watcher_.StartWatching(old_done_event, this);
michael@0 436 }
michael@0 437
michael@0 438 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) {
michael@0 439 WaitableEvent* dispatch_event = sync_context()->GetDispatchEvent();
michael@0 440 if (event == dispatch_event) {
michael@0 441 // The call to DispatchMessages might delete this object, so reregister
michael@0 442 // the object watcher first.
michael@0 443 dispatch_event->Reset();
michael@0 444 dispatch_watcher_.StartWatching(dispatch_event, this);
michael@0 445 sync_context()->DispatchMessages();
michael@0 446 } else {
michael@0 447 // We got the reply, timed out or the process shutdown.
michael@0 448 DCHECK(event == sync_context()->GetSendDoneEvent());
michael@0 449 MessageLoop::current()->Quit();
michael@0 450 }
michael@0 451 }
michael@0 452
michael@0 453 } // namespace IPC

mercurial