ipc/glue/MessageChannel.cpp

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

michael@0 1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
michael@0 2 * vim: sw=4 ts=4 et :
michael@0 3 */
michael@0 4 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 5 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 6 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 7
michael@0 8 #include "mozilla/ipc/MessageChannel.h"
michael@0 9 #include "mozilla/ipc/ProtocolUtils.h"
michael@0 10
michael@0 11 #include "mozilla/Assertions.h"
michael@0 12 #include "mozilla/DebugOnly.h"
michael@0 13 #include "mozilla/Move.h"
michael@0 14 #include "nsDebug.h"
michael@0 15 #include "nsISupportsImpl.h"
michael@0 16
michael@0 17 // Undo the damage done by mozzconf.h
michael@0 18 #undef compress
michael@0 19
michael@0 20 using namespace mozilla;
michael@0 21 using namespace std;
michael@0 22
michael@0 23 using mozilla::MonitorAutoLock;
michael@0 24 using mozilla::MonitorAutoUnlock;
michael@0 25
michael@0 26 template<>
michael@0 27 struct RunnableMethodTraits<mozilla::ipc::MessageChannel>
michael@0 28 {
michael@0 29 static void RetainCallee(mozilla::ipc::MessageChannel* obj) { }
michael@0 30 static void ReleaseCallee(mozilla::ipc::MessageChannel* obj) { }
michael@0 31 };
michael@0 32
// Invariant check for IPC code. When |_cond| evaluates to false, calls
// DebugAbort() with the current file, the current line, the stringified
// condition and any extra arguments supplied after the condition.
// Wrapped in do { } while (0) so that it behaves as a single statement.
#define IPC_ASSERT(_cond, ...)                                          \
    do {                                                                \
        if (!(_cond)) {                                                 \
            DebugAbort(__FILE__, __LINE__, #_cond,## __VA_ARGS__);      \
        }                                                               \
    } while (0)
michael@0 38
michael@0 39 namespace mozilla {
michael@0 40 namespace ipc {
michael@0 41
michael@0 42 const int32_t MessageChannel::kNoTimeout = INT32_MIN;
michael@0 43
michael@0 44 // static
michael@0 45 bool MessageChannel::sIsPumpingMessages = false;
michael@0 46
// Direction of a message relative to this endpoint. Frames are created with
// IN_MESSAGE for messages being dispatched locally and with OUT_MESSAGE for
// messages being sent.
enum Direction
{
    IN_MESSAGE = 0,
    OUT_MESSAGE = 1
};
michael@0 52
michael@0 53
michael@0 54 class MessageChannel::InterruptFrame
michael@0 55 {
michael@0 56 private:
michael@0 57 enum Semantics
michael@0 58 {
michael@0 59 INTR_SEMS,
michael@0 60 SYNC_SEMS,
michael@0 61 ASYNC_SEMS
michael@0 62 };
michael@0 63
michael@0 64 public:
michael@0 65 InterruptFrame(Direction direction, const Message* msg)
michael@0 66 : mMessageName(strdup(msg->name())),
michael@0 67 mMessageRoutingId(msg->routing_id()),
michael@0 68 mMesageSemantics(msg->is_interrupt() ? INTR_SEMS :
michael@0 69 msg->is_sync() ? SYNC_SEMS :
michael@0 70 ASYNC_SEMS),
michael@0 71 mDirection(direction),
michael@0 72 mMoved(false)
michael@0 73 {
michael@0 74 MOZ_ASSERT(mMessageName);
michael@0 75 }
michael@0 76
michael@0 77 InterruptFrame(InterruptFrame&& aOther)
michael@0 78 {
michael@0 79 MOZ_ASSERT(aOther.mMessageName);
michael@0 80 mMessageName = aOther.mMessageName;
michael@0 81 aOther.mMessageName = nullptr;
michael@0 82 aOther.mMoved = true;
michael@0 83
michael@0 84 mMessageRoutingId = aOther.mMessageRoutingId;
michael@0 85 mMesageSemantics = aOther.mMesageSemantics;
michael@0 86 mDirection = aOther.mDirection;
michael@0 87 }
michael@0 88
michael@0 89 ~InterruptFrame()
michael@0 90 {
michael@0 91 MOZ_ASSERT_IF(!mMessageName, mMoved);
michael@0 92
michael@0 93 if (mMessageName)
michael@0 94 free(const_cast<char*>(mMessageName));
michael@0 95 }
michael@0 96
michael@0 97 InterruptFrame& operator=(InterruptFrame&& aOther)
michael@0 98 {
michael@0 99 MOZ_ASSERT(&aOther != this);
michael@0 100 this->~InterruptFrame();
michael@0 101 new (this) InterruptFrame(mozilla::Move(aOther));
michael@0 102 return *this;
michael@0 103 }
michael@0 104
michael@0 105 bool IsInterruptIncall() const
michael@0 106 {
michael@0 107 return INTR_SEMS == mMesageSemantics && IN_MESSAGE == mDirection;
michael@0 108 }
michael@0 109
michael@0 110 bool IsInterruptOutcall() const
michael@0 111 {
michael@0 112 return INTR_SEMS == mMesageSemantics && OUT_MESSAGE == mDirection;
michael@0 113 }
michael@0 114
michael@0 115 void Describe(int32_t* id, const char** dir, const char** sems,
michael@0 116 const char** name) const
michael@0 117 {
michael@0 118 *id = mMessageRoutingId;
michael@0 119 *dir = (IN_MESSAGE == mDirection) ? "in" : "out";
michael@0 120 *sems = (INTR_SEMS == mMesageSemantics) ? "intr" :
michael@0 121 (SYNC_SEMS == mMesageSemantics) ? "sync" :
michael@0 122 "async";
michael@0 123 *name = mMessageName;
michael@0 124 }
michael@0 125
michael@0 126 private:
michael@0 127 const char* mMessageName;
michael@0 128 int32_t mMessageRoutingId;
michael@0 129 Semantics mMesageSemantics;
michael@0 130 Direction mDirection;
michael@0 131 DebugOnly<bool> mMoved;
michael@0 132
michael@0 133 // Disable harmful methods.
michael@0 134 InterruptFrame(const InterruptFrame& aOther) MOZ_DELETE;
michael@0 135 InterruptFrame& operator=(const InterruptFrame&) MOZ_DELETE;
michael@0 136 };
michael@0 137
michael@0 138 class MOZ_STACK_CLASS MessageChannel::CxxStackFrame
michael@0 139 {
michael@0 140 public:
michael@0 141 CxxStackFrame(MessageChannel& that, Direction direction, const Message* msg)
michael@0 142 : mThat(that)
michael@0 143 {
michael@0 144 mThat.AssertWorkerThread();
michael@0 145
michael@0 146 if (mThat.mCxxStackFrames.empty())
michael@0 147 mThat.EnteredCxxStack();
michael@0 148
michael@0 149 mThat.mCxxStackFrames.append(InterruptFrame(direction, msg));
michael@0 150
michael@0 151 const InterruptFrame& frame = mThat.mCxxStackFrames.back();
michael@0 152
michael@0 153 if (frame.IsInterruptIncall())
michael@0 154 mThat.EnteredCall();
michael@0 155
michael@0 156 mThat.mSawInterruptOutMsg |= frame.IsInterruptOutcall();
michael@0 157 }
michael@0 158
michael@0 159 ~CxxStackFrame() {
michael@0 160 mThat.AssertWorkerThread();
michael@0 161
michael@0 162 MOZ_ASSERT(!mThat.mCxxStackFrames.empty());
michael@0 163
michael@0 164 bool exitingCall = mThat.mCxxStackFrames.back().IsInterruptIncall();
michael@0 165 mThat.mCxxStackFrames.shrinkBy(1);
michael@0 166
michael@0 167 bool exitingStack = mThat.mCxxStackFrames.empty();
michael@0 168
michael@0 169 // mListener could have gone away if Close() was called while
michael@0 170 // MessageChannel code was still on the stack
michael@0 171 if (!mThat.mListener)
michael@0 172 return;
michael@0 173
michael@0 174 if (exitingCall)
michael@0 175 mThat.ExitedCall();
michael@0 176
michael@0 177 if (exitingStack)
michael@0 178 mThat.ExitedCxxStack();
michael@0 179 }
michael@0 180 private:
michael@0 181 MessageChannel& mThat;
michael@0 182
michael@0 183 // Disable harmful methods.
michael@0 184 CxxStackFrame() MOZ_DELETE;
michael@0 185 CxxStackFrame(const CxxStackFrame&) MOZ_DELETE;
michael@0 186 CxxStackFrame& operator=(const CxxStackFrame&) MOZ_DELETE;
michael@0 187 };
michael@0 188
michael@0 189 MessageChannel::MessageChannel(MessageListener *aListener)
michael@0 190 : mListener(aListener->asWeakPtr()),
michael@0 191 mChannelState(ChannelClosed),
michael@0 192 mSide(UnknownSide),
michael@0 193 mLink(nullptr),
michael@0 194 mWorkerLoop(nullptr),
michael@0 195 mChannelErrorTask(nullptr),
michael@0 196 mWorkerLoopID(-1),
michael@0 197 mTimeoutMs(kNoTimeout),
michael@0 198 mInTimeoutSecondHalf(false),
michael@0 199 mNextSeqno(0),
michael@0 200 mPendingSyncReplies(0),
michael@0 201 mPendingUrgentReplies(0),
michael@0 202 mPendingRPCReplies(0),
michael@0 203 mCurrentRPCTransaction(0),
michael@0 204 mDispatchingSyncMessage(false),
michael@0 205 mDispatchingUrgentMessageCount(0),
michael@0 206 mRemoteStackDepthGuess(false),
michael@0 207 mSawInterruptOutMsg(false),
michael@0 208 mAbortOnError(false),
michael@0 209 mFlags(REQUIRE_DEFAULT)
michael@0 210 {
michael@0 211 MOZ_COUNT_CTOR(ipc::MessageChannel);
michael@0 212
michael@0 213 #ifdef OS_WIN
michael@0 214 mTopFrame = nullptr;
michael@0 215 mIsSyncWaitingOnNonMainThread = false;
michael@0 216 #endif
michael@0 217
michael@0 218 mDequeueOneTask = new RefCountedTask(NewRunnableMethod(
michael@0 219 this,
michael@0 220 &MessageChannel::OnMaybeDequeueOne));
michael@0 221
michael@0 222 #ifdef OS_WIN
michael@0 223 mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
michael@0 224 NS_ASSERTION(mEvent, "CreateEvent failed! Nothing is going to work!");
michael@0 225 #endif
michael@0 226 }
michael@0 227
michael@0 228 MessageChannel::~MessageChannel()
michael@0 229 {
michael@0 230 MOZ_COUNT_DTOR(ipc::MessageChannel);
michael@0 231 IPC_ASSERT(mCxxStackFrames.empty(), "mismatched CxxStackFrame ctor/dtors");
michael@0 232 #ifdef OS_WIN
michael@0 233 DebugOnly<BOOL> ok = CloseHandle(mEvent);
michael@0 234 MOZ_ASSERT(ok);
michael@0 235 #endif
michael@0 236 Clear();
michael@0 237 }
michael@0 238
michael@0 239 static void
michael@0 240 PrintErrorMessage(Side side, const char* channelName, const char* msg)
michael@0 241 {
michael@0 242 const char *from = (side == ChildSide)
michael@0 243 ? "Child"
michael@0 244 : ((side == ParentSide) ? "Parent" : "Unknown");
michael@0 245 printf_stderr("\n###!!! [%s][%s] Error: %s\n\n", from, channelName, msg);
michael@0 246 }
michael@0 247
michael@0 248 bool
michael@0 249 MessageChannel::Connected() const
michael@0 250 {
michael@0 251 mMonitor->AssertCurrentThreadOwns();
michael@0 252
michael@0 253 // The transport layer allows us to send messages before
michael@0 254 // receiving the "connected" ack from the remote side.
michael@0 255 return (ChannelOpening == mChannelState || ChannelConnected == mChannelState);
michael@0 256 }
michael@0 257
michael@0 258 bool
michael@0 259 MessageChannel::CanSend() const
michael@0 260 {
michael@0 261 MonitorAutoLock lock(*mMonitor);
michael@0 262 return Connected();
michael@0 263 }
michael@0 264
michael@0 265 void
michael@0 266 MessageChannel::Clear()
michael@0 267 {
michael@0 268 // Don't clear mWorkerLoopID; we use it in AssertLinkThread() and
michael@0 269 // AssertWorkerThread().
michael@0 270 //
michael@0 271 // Also don't clear mListener. If we clear it, then sending a message
michael@0 272 // through this channel after it's Clear()'ed can cause this process to
michael@0 273 // crash.
michael@0 274 //
michael@0 275 // In practice, mListener owns the channel, so the channel gets deleted
michael@0 276 // before mListener. But just to be safe, mListener is a weak pointer.
michael@0 277
michael@0 278 mDequeueOneTask->Cancel();
michael@0 279
michael@0 280 mWorkerLoop = nullptr;
michael@0 281 delete mLink;
michael@0 282 mLink = nullptr;
michael@0 283
michael@0 284 if (mChannelErrorTask) {
michael@0 285 mChannelErrorTask->Cancel();
michael@0 286 mChannelErrorTask = nullptr;
michael@0 287 }
michael@0 288
michael@0 289 // Free up any memory used by pending messages.
michael@0 290 mPending.clear();
michael@0 291 mPendingUrgentRequest = nullptr;
michael@0 292 mPendingRPCCall = nullptr;
michael@0 293 mOutOfTurnReplies.clear();
michael@0 294 while (!mDeferred.empty()) {
michael@0 295 mDeferred.pop();
michael@0 296 }
michael@0 297 }
michael@0 298
michael@0 299 bool
michael@0 300 MessageChannel::Open(Transport* aTransport, MessageLoop* aIOLoop, Side aSide)
michael@0 301 {
michael@0 302 NS_PRECONDITION(!mLink, "Open() called > once");
michael@0 303
michael@0 304 mMonitor = new RefCountedMonitor();
michael@0 305 mWorkerLoop = MessageLoop::current();
michael@0 306 mWorkerLoopID = mWorkerLoop->id();
michael@0 307
michael@0 308 ProcessLink *link = new ProcessLink(this);
michael@0 309 link->Open(aTransport, aIOLoop, aSide); // :TODO: n.b.: sets mChild
michael@0 310 mLink = link;
michael@0 311 return true;
michael@0 312 }
michael@0 313
michael@0 314 bool
michael@0 315 MessageChannel::Open(MessageChannel *aTargetChan, MessageLoop *aTargetLoop, Side aSide)
michael@0 316 {
michael@0 317 // Opens a connection to another thread in the same process.
michael@0 318
michael@0 319 // This handshake proceeds as follows:
michael@0 320 // - Let A be the thread initiating the process (either child or parent)
michael@0 321 // and B be the other thread.
michael@0 322 // - A spawns thread for B, obtaining B's message loop
michael@0 323 // - A creates ProtocolChild and ProtocolParent instances.
michael@0 324 // Let PA be the one appropriate to A and PB the side for B.
michael@0 325 // - A invokes PA->Open(PB, ...):
michael@0 326 // - set state to mChannelOpening
michael@0 327 // - this will place a work item in B's worker loop (see next bullet)
michael@0 328 // and then spins until PB->mChannelState becomes mChannelConnected
michael@0 329 // - meanwhile, on PB's worker loop, the work item is removed and:
michael@0 330 // - invokes PB->SlaveOpen(PA, ...):
michael@0 331 // - sets its state and that of PA to Connected
michael@0 332 NS_PRECONDITION(aTargetChan, "Need a target channel");
michael@0 333 NS_PRECONDITION(ChannelClosed == mChannelState, "Not currently closed");
michael@0 334
michael@0 335 CommonThreadOpenInit(aTargetChan, aSide);
michael@0 336
michael@0 337 Side oppSide = UnknownSide;
michael@0 338 switch(aSide) {
michael@0 339 case ChildSide: oppSide = ParentSide; break;
michael@0 340 case ParentSide: oppSide = ChildSide; break;
michael@0 341 case UnknownSide: break;
michael@0 342 }
michael@0 343
michael@0 344 mMonitor = new RefCountedMonitor();
michael@0 345
michael@0 346 MonitorAutoLock lock(*mMonitor);
michael@0 347 mChannelState = ChannelOpening;
michael@0 348 aTargetLoop->PostTask(
michael@0 349 FROM_HERE,
michael@0 350 NewRunnableMethod(aTargetChan, &MessageChannel::OnOpenAsSlave, this, oppSide));
michael@0 351
michael@0 352 while (ChannelOpening == mChannelState)
michael@0 353 mMonitor->Wait();
michael@0 354 NS_ASSERTION(ChannelConnected == mChannelState, "not connected when awoken");
michael@0 355 return (ChannelConnected == mChannelState);
michael@0 356 }
michael@0 357
michael@0 358 void
michael@0 359 MessageChannel::OnOpenAsSlave(MessageChannel *aTargetChan, Side aSide)
michael@0 360 {
michael@0 361 // Invoked when the other side has begun the open.
michael@0 362 NS_PRECONDITION(ChannelClosed == mChannelState,
michael@0 363 "Not currently closed");
michael@0 364 NS_PRECONDITION(ChannelOpening == aTargetChan->mChannelState,
michael@0 365 "Target channel not in the process of opening");
michael@0 366
michael@0 367 CommonThreadOpenInit(aTargetChan, aSide);
michael@0 368 mMonitor = aTargetChan->mMonitor;
michael@0 369
michael@0 370 MonitorAutoLock lock(*mMonitor);
michael@0 371 NS_ASSERTION(ChannelOpening == aTargetChan->mChannelState,
michael@0 372 "Target channel not in the process of opening");
michael@0 373 mChannelState = ChannelConnected;
michael@0 374 aTargetChan->mChannelState = ChannelConnected;
michael@0 375 aTargetChan->mMonitor->Notify();
michael@0 376 }
michael@0 377
michael@0 378 void
michael@0 379 MessageChannel::CommonThreadOpenInit(MessageChannel *aTargetChan, Side aSide)
michael@0 380 {
michael@0 381 mWorkerLoop = MessageLoop::current();
michael@0 382 mWorkerLoopID = mWorkerLoop->id();
michael@0 383 mLink = new ThreadLink(this, aTargetChan);
michael@0 384 mSide = aSide;
michael@0 385 }
michael@0 386
michael@0 387 bool
michael@0 388 MessageChannel::Echo(Message* aMsg)
michael@0 389 {
michael@0 390 nsAutoPtr<Message> msg(aMsg);
michael@0 391 AssertWorkerThread();
michael@0 392 mMonitor->AssertNotCurrentThreadOwns();
michael@0 393 if (MSG_ROUTING_NONE == msg->routing_id()) {
michael@0 394 ReportMessageRouteError("MessageChannel::Echo");
michael@0 395 return false;
michael@0 396 }
michael@0 397
michael@0 398 MonitorAutoLock lock(*mMonitor);
michael@0 399
michael@0 400 if (!Connected()) {
michael@0 401 ReportConnectionError("MessageChannel");
michael@0 402 return false;
michael@0 403 }
michael@0 404
michael@0 405 mLink->EchoMessage(msg.forget());
michael@0 406 return true;
michael@0 407 }
michael@0 408
michael@0 409 bool
michael@0 410 MessageChannel::Send(Message* aMsg)
michael@0 411 {
michael@0 412 CxxStackFrame frame(*this, OUT_MESSAGE, aMsg);
michael@0 413
michael@0 414 nsAutoPtr<Message> msg(aMsg);
michael@0 415 AssertWorkerThread();
michael@0 416 mMonitor->AssertNotCurrentThreadOwns();
michael@0 417 if (MSG_ROUTING_NONE == msg->routing_id()) {
michael@0 418 ReportMessageRouteError("MessageChannel::Send");
michael@0 419 return false;
michael@0 420 }
michael@0 421
michael@0 422 MonitorAutoLock lock(*mMonitor);
michael@0 423 if (!Connected()) {
michael@0 424 ReportConnectionError("MessageChannel");
michael@0 425 return false;
michael@0 426 }
michael@0 427 mLink->SendMessage(msg.forget());
michael@0 428 return true;
michael@0 429 }
michael@0 430
michael@0 431 bool
michael@0 432 MessageChannel::MaybeInterceptSpecialIOMessage(const Message& aMsg)
michael@0 433 {
michael@0 434 AssertLinkThread();
michael@0 435 mMonitor->AssertCurrentThreadOwns();
michael@0 436
michael@0 437 if (MSG_ROUTING_NONE == aMsg.routing_id() &&
michael@0 438 GOODBYE_MESSAGE_TYPE == aMsg.type())
michael@0 439 {
michael@0 440 // :TODO: Sort out Close() on this side racing with Close() on the
michael@0 441 // other side
michael@0 442 mChannelState = ChannelClosing;
michael@0 443 if (LoggingEnabled()) {
michael@0 444 printf("NOTE: %s process received `Goodbye', closing down\n",
michael@0 445 (mSide == ChildSide) ? "child" : "parent");
michael@0 446 }
michael@0 447 return true;
michael@0 448 }
michael@0 449 return false;
michael@0 450 }
michael@0 451
michael@0 452 void
michael@0 453 MessageChannel::OnMessageReceivedFromLink(const Message& aMsg)
michael@0 454 {
michael@0 455 AssertLinkThread();
michael@0 456 mMonitor->AssertCurrentThreadOwns();
michael@0 457
michael@0 458 if (MaybeInterceptSpecialIOMessage(aMsg))
michael@0 459 return;
michael@0 460
michael@0 461 // Regardless of the Interrupt stack, if we're awaiting a sync or urgent reply,
michael@0 462 // we know that it needs to be immediately handled to unblock us.
michael@0 463 if ((AwaitingSyncReply() && aMsg.is_sync()) ||
michael@0 464 (AwaitingUrgentReply() && aMsg.is_urgent()) ||
michael@0 465 (AwaitingRPCReply() && aMsg.is_rpc()))
michael@0 466 {
michael@0 467 mRecvd = new Message(aMsg);
michael@0 468 NotifyWorkerThread();
michael@0 469 return;
michael@0 470 }
michael@0 471
michael@0 472 // Urgent messages cannot be compressed.
michael@0 473 MOZ_ASSERT(!aMsg.compress() || !aMsg.is_urgent());
michael@0 474
michael@0 475 bool compress = (aMsg.compress() && !mPending.empty() &&
michael@0 476 mPending.back().type() == aMsg.type() &&
michael@0 477 mPending.back().routing_id() == aMsg.routing_id());
michael@0 478 if (compress) {
michael@0 479 // This message type has compression enabled, and the back of the
michael@0 480 // queue was the same message type and routed to the same destination.
michael@0 481 // Replace it with the newer message.
michael@0 482 MOZ_ASSERT(mPending.back().compress());
michael@0 483 mPending.pop_back();
michael@0 484 }
michael@0 485
michael@0 486 bool shouldWakeUp = AwaitingInterruptReply() ||
michael@0 487 // Allow incoming RPCs to be processed inside an urgent message.
michael@0 488 (AwaitingUrgentReply() && aMsg.is_rpc()) ||
michael@0 489 // Always process urgent messages while blocked.
michael@0 490 ((AwaitingSyncReply() || AwaitingRPCReply()) && aMsg.is_urgent());
michael@0 491
michael@0 492 // There are four cases we're concerned about, relating to the state of the
michael@0 493 // main thread:
michael@0 494 //
michael@0 495 // (1) We are waiting on a sync|rpc reply - main thread is blocked on the
michael@0 496 // IPC monitor.
michael@0 497 // - If the message is high priority, we wake up the main thread to
michael@0 498 // deliver the message. Otherwise, we leave it in the mPending queue,
michael@0 499 // posting a task to the main event loop, where it will be processed
michael@0 500 // once the synchronous reply has been received.
michael@0 501 //
michael@0 502 // (2) We are waiting on an Interrupt reply - main thread is blocked on the
michael@0 503 // IPC monitor.
michael@0 504 // - Always notify and wake up the main thread.
michael@0 505 //
michael@0 506 // (3) We are not waiting on a reply.
michael@0 507 // - We post a task to the main event loop.
michael@0 508 //
michael@0 509 // Note that, we may notify the main thread even though the monitor is not
michael@0 510 // blocked. This is okay, since we always check for pending events before
michael@0 511 // blocking again.
michael@0 512
michael@0 513 if (shouldWakeUp && (AwaitingUrgentReply() && aMsg.is_rpc())) {
michael@0 514 // If we're receiving an RPC message while blocked on an urgent message,
michael@0 515 // we must defer any messages that were not sent as part of the child
michael@0 516 // answering the urgent message.
michael@0 517 //
michael@0 518 // We must also be sure that we will not accidentally defer any RPC
michael@0 519 // message that was sent while answering an urgent message. Otherwise,
michael@0 520 // we will deadlock.
michael@0 521 //
michael@0 522 // On the parent side, the current transaction can only transition from 0
michael@0 523 // to an ID, either by us issuing an urgent request while not blocked, or
michael@0 524 // by receiving an RPC request while not blocked. When we unblock, the
michael@0 525 // current transaction is reset to 0.
michael@0 526 //
michael@0 527 // When the child side receives an urgent message, any RPC messages sent
michael@0 528 // before issuing the urgent reply will carry the urgent message's
michael@0 529 // transaction ID.
michael@0 530 //
michael@0 531 // Since AwaitingUrgentReply() implies we are blocked, it also implies
michael@0 532 // that we are within a transaction that will not change until we are
michael@0 533 // completely unblocked (i.e, the transaction has completed).
michael@0 534 if (aMsg.transaction_id() != mCurrentRPCTransaction)
michael@0 535 shouldWakeUp = false;
michael@0 536 }
michael@0 537
michael@0 538 if (aMsg.is_urgent()) {
michael@0 539 MOZ_ASSERT(!mPendingUrgentRequest);
michael@0 540 mPendingUrgentRequest = new Message(aMsg);
michael@0 541 } else if (aMsg.is_rpc() && shouldWakeUp) {
michael@0 542 // Only use this slot if we need to wake up for an RPC call. Otherwise
michael@0 543 // we treat it like a normal async or sync message.
michael@0 544 MOZ_ASSERT(!mPendingRPCCall);
michael@0 545 mPendingRPCCall = new Message(aMsg);
michael@0 546 } else {
michael@0 547 mPending.push_back(aMsg);
michael@0 548 }
michael@0 549
michael@0 550 if (shouldWakeUp) {
michael@0 551 // Always wake up Interrupt waiters, sync waiters for urgent messages,
michael@0 552 // RPC waiters for urgent messages, and urgent waiters for RPCs in the
michael@0 553 // same transaction.
michael@0 554 NotifyWorkerThread();
michael@0 555 } else {
michael@0 556 // Worker thread is either not blocked on a reply, or this is an
michael@0 557 // incoming Interrupt that raced with outgoing sync, and needs to be
michael@0 558 // deferred to a later event-loop iteration.
michael@0 559 if (!compress) {
michael@0 560 // If we compressed away the previous message, we'll re-use
michael@0 561 // its pending task.
michael@0 562 mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
michael@0 563 }
michael@0 564 }
michael@0 565 }
michael@0 566
michael@0 567 bool
michael@0 568 MessageChannel::Send(Message* aMsg, Message* aReply)
michael@0 569 {
michael@0 570 // Sanity checks.
michael@0 571 AssertWorkerThread();
michael@0 572 mMonitor->AssertNotCurrentThreadOwns();
michael@0 573
michael@0 574 #ifdef OS_WIN
michael@0 575 SyncStackFrame frame(this, false);
michael@0 576 #endif
michael@0 577
michael@0 578 CxxStackFrame f(*this, OUT_MESSAGE, aMsg);
michael@0 579
michael@0 580 MonitorAutoLock lock(*mMonitor);
michael@0 581
michael@0 582 IPC_ASSERT(aMsg->is_sync(), "can only Send() sync messages here");
michael@0 583 IPC_ASSERT(!DispatchingSyncMessage(), "violation of sync handler invariant");
michael@0 584 IPC_ASSERT(!DispatchingUrgentMessage(), "sync messages forbidden while handling urgent message");
michael@0 585 IPC_ASSERT(!AwaitingSyncReply(), "nested sync messages are not supported");
michael@0 586
michael@0 587 AutoEnterPendingReply replies(mPendingSyncReplies);
michael@0 588 if (!SendAndWait(aMsg, aReply))
michael@0 589 return false;
michael@0 590
michael@0 591 NS_ABORT_IF_FALSE(aReply->is_sync(), "reply is not sync");
michael@0 592 return true;
michael@0 593 }
michael@0 594
michael@0 595 bool
michael@0 596 MessageChannel::UrgentCall(Message* aMsg, Message* aReply)
michael@0 597 {
michael@0 598 AssertWorkerThread();
michael@0 599 mMonitor->AssertNotCurrentThreadOwns();
michael@0 600 IPC_ASSERT(mSide == ParentSide, "cannot send urgent requests from child");
michael@0 601
michael@0 602 #ifdef OS_WIN
michael@0 603 SyncStackFrame frame(this, false);
michael@0 604 #endif
michael@0 605
michael@0 606 CxxStackFrame f(*this, OUT_MESSAGE, aMsg);
michael@0 607
michael@0 608 MonitorAutoLock lock(*mMonitor);
michael@0 609
michael@0 610 IPC_ASSERT(!AwaitingInterruptReply(), "urgent calls cannot be issued within Interrupt calls");
michael@0 611 IPC_ASSERT(!AwaitingSyncReply(), "urgent calls cannot be issued within sync sends");
michael@0 612
michael@0 613 AutoEnterRPCTransaction transact(this);
michael@0 614 aMsg->set_transaction_id(mCurrentRPCTransaction);
michael@0 615
michael@0 616 AutoEnterPendingReply replies(mPendingUrgentReplies);
michael@0 617 if (!SendAndWait(aMsg, aReply))
michael@0 618 return false;
michael@0 619
michael@0 620 NS_ABORT_IF_FALSE(aReply->is_urgent(), "reply is not urgent");
michael@0 621 return true;
michael@0 622 }
michael@0 623
michael@0 624 bool
michael@0 625 MessageChannel::RPCCall(Message* aMsg, Message* aReply)
michael@0 626 {
michael@0 627 AssertWorkerThread();
michael@0 628 mMonitor->AssertNotCurrentThreadOwns();
michael@0 629 IPC_ASSERT(mSide == ChildSide, "cannot send rpc messages from parent");
michael@0 630
michael@0 631 #ifdef OS_WIN
michael@0 632 SyncStackFrame frame(this, false);
michael@0 633 #endif
michael@0 634
michael@0 635 CxxStackFrame f(*this, OUT_MESSAGE, aMsg);
michael@0 636
michael@0 637 MonitorAutoLock lock(*mMonitor);
michael@0 638
michael@0 639 AutoEnterRPCTransaction transact(this);
michael@0 640 aMsg->set_transaction_id(mCurrentRPCTransaction);
michael@0 641
michael@0 642 AutoEnterPendingReply replies(mPendingRPCReplies);
michael@0 643 if (!SendAndWait(aMsg, aReply))
michael@0 644 return false;
michael@0 645
michael@0 646 NS_ABORT_IF_FALSE(aReply->is_rpc(), "expected rpc reply");
michael@0 647 return true;
michael@0 648 }
michael@0 649
michael@0 650 bool
michael@0 651 MessageChannel::SendAndWait(Message* aMsg, Message* aReply)
michael@0 652 {
michael@0 653 mMonitor->AssertCurrentThreadOwns();
michael@0 654
michael@0 655 nsAutoPtr<Message> msg(aMsg);
michael@0 656
michael@0 657 if (!Connected()) {
michael@0 658 ReportConnectionError("MessageChannel::SendAndWait");
michael@0 659 return false;
michael@0 660 }
michael@0 661
michael@0 662 msg->set_seqno(NextSeqno());
michael@0 663
michael@0 664 DebugOnly<int32_t> replySeqno = msg->seqno();
michael@0 665 DebugOnly<msgid_t> replyType = msg->type() + 1;
michael@0 666
michael@0 667 mLink->SendMessage(msg.forget());
michael@0 668
michael@0 669 while (true) {
michael@0 670 // Wait for an event to occur.
michael@0 671 while (true) {
michael@0 672 if (mRecvd || mPendingUrgentRequest || mPendingRPCCall)
michael@0 673 break;
michael@0 674
michael@0 675 bool maybeTimedOut = !WaitForSyncNotify();
michael@0 676
michael@0 677 if (!Connected()) {
michael@0 678 ReportConnectionError("MessageChannel::SendAndWait");
michael@0 679 return false;
michael@0 680 }
michael@0 681
michael@0 682 if (maybeTimedOut && !ShouldContinueFromTimeout())
michael@0 683 return false;
michael@0 684 }
michael@0 685
michael@0 686 if (mPendingUrgentRequest && !ProcessPendingUrgentRequest())
michael@0 687 return false;
michael@0 688
michael@0 689 if (mPendingRPCCall && !ProcessPendingRPCCall())
michael@0 690 return false;
michael@0 691
michael@0 692 if (mRecvd) {
michael@0 693 NS_ABORT_IF_FALSE(mRecvd->is_reply(), "expected reply");
michael@0 694
michael@0 695 if (mRecvd->is_reply_error()) {
michael@0 696 mRecvd = nullptr;
michael@0 697 return false;
michael@0 698 }
michael@0 699
michael@0 700 NS_ABORT_IF_FALSE(mRecvd->type() == replyType, "wrong reply type");
michael@0 701 NS_ABORT_IF_FALSE(mRecvd->seqno() == replySeqno, "wrong sequence number");
michael@0 702
michael@0 703 *aReply = *mRecvd;
michael@0 704 mRecvd = nullptr;
michael@0 705 return true;
michael@0 706 }
michael@0 707 }
michael@0 708
michael@0 709 return true;
michael@0 710 }
michael@0 711
michael@0 712 bool
michael@0 713 MessageChannel::Call(Message* aMsg, Message* aReply)
michael@0 714 {
michael@0 715 if (aMsg->is_urgent())
michael@0 716 return UrgentCall(aMsg, aReply);
michael@0 717 if (aMsg->is_rpc())
michael@0 718 return RPCCall(aMsg, aReply);
michael@0 719 return InterruptCall(aMsg, aReply);
michael@0 720 }
michael@0 721
michael@0 722 bool
michael@0 723 MessageChannel::InterruptCall(Message* aMsg, Message* aReply)
michael@0 724 {
michael@0 725 AssertWorkerThread();
michael@0 726 mMonitor->AssertNotCurrentThreadOwns();
michael@0 727
michael@0 728 #ifdef OS_WIN
michael@0 729 SyncStackFrame frame(this, true);
michael@0 730 #endif
michael@0 731
michael@0 732 // This must come before MonitorAutoLock, as its destructor acquires the
michael@0 733 // monitor lock.
michael@0 734 CxxStackFrame cxxframe(*this, OUT_MESSAGE, aMsg);
michael@0 735
michael@0 736 MonitorAutoLock lock(*mMonitor);
michael@0 737 if (!Connected()) {
michael@0 738 ReportConnectionError("MessageChannel::Call");
michael@0 739 return false;
michael@0 740 }
michael@0 741
michael@0 742 // Sanity checks.
michael@0 743 IPC_ASSERT(!AwaitingSyncReply() && !AwaitingUrgentReply(),
michael@0 744 "cannot issue Interrupt call whiel blocked on sync or urgent");
michael@0 745 IPC_ASSERT(!DispatchingSyncMessage() || aMsg->priority() == IPC::Message::PRIORITY_HIGH,
michael@0 746 "violation of sync handler invariant");
michael@0 747 IPC_ASSERT(aMsg->is_interrupt(), "can only Call() Interrupt messages here");
michael@0 748
michael@0 749
michael@0 750 nsAutoPtr<Message> msg(aMsg);
michael@0 751
michael@0 752 msg->set_seqno(NextSeqno());
michael@0 753 msg->set_interrupt_remote_stack_depth_guess(mRemoteStackDepthGuess);
michael@0 754 msg->set_interrupt_local_stack_depth(1 + InterruptStackDepth());
michael@0 755 mInterruptStack.push(*msg);
michael@0 756 mLink->SendMessage(msg.forget());
michael@0 757
michael@0 758 while (true) {
michael@0 759 // if a handler invoked by *Dispatch*() spun a nested event
michael@0 760 // loop, and the connection was broken during that loop, we
michael@0 761 // might have already processed the OnError event. if so,
michael@0 762 // trying another loop iteration will be futile because
michael@0 763 // channel state will have been cleared
michael@0 764 if (!Connected()) {
michael@0 765 ReportConnectionError("MessageChannel::InterruptCall");
michael@0 766 return false;
michael@0 767 }
michael@0 768
michael@0 769 // Now might be the time to process a message deferred because of race
michael@0 770 // resolution.
michael@0 771 MaybeUndeferIncall();
michael@0 772
michael@0 773 // Wait for an event to occur.
michael@0 774 while (!InterruptEventOccurred()) {
michael@0 775 bool maybeTimedOut = !WaitForInterruptNotify();
michael@0 776
michael@0 777 // We might have received a "subtly deferred" message in a nested
michael@0 778 // loop that it's now time to process.
michael@0 779 if (InterruptEventOccurred() ||
michael@0 780 (!maybeTimedOut && (!mDeferred.empty() || !mOutOfTurnReplies.empty())))
michael@0 781 {
michael@0 782 break;
michael@0 783 }
michael@0 784
michael@0 785 if (maybeTimedOut && !ShouldContinueFromTimeout())
michael@0 786 return false;
michael@0 787 }
michael@0 788
michael@0 789 Message recvd;
michael@0 790 MessageMap::iterator it;
michael@0 791
michael@0 792 if (mPendingUrgentRequest) {
michael@0 793 recvd = *mPendingUrgentRequest;
michael@0 794 mPendingUrgentRequest = nullptr;
michael@0 795 } else if (mPendingRPCCall) {
michael@0 796 recvd = *mPendingRPCCall;
michael@0 797 mPendingRPCCall = nullptr;
michael@0 798 } else if ((it = mOutOfTurnReplies.find(mInterruptStack.top().seqno()))
michael@0 799 != mOutOfTurnReplies.end())
michael@0 800 {
michael@0 801 recvd = it->second;
michael@0 802 mOutOfTurnReplies.erase(it);
michael@0 803 } else if (!mPending.empty()) {
michael@0 804 recvd = mPending.front();
michael@0 805 mPending.pop_front();
michael@0 806 } else {
michael@0 807 // because of subtleties with nested event loops, it's possible
michael@0 808 // that we got here and nothing happened. or, we might have a
michael@0 809 // deferred in-call that needs to be processed. either way, we
michael@0 810 // won't break the inner while loop again until something new
michael@0 811 // happens.
michael@0 812 continue;
michael@0 813 }
michael@0 814
michael@0 815 // If the message is not Interrupt, we can dispatch it as normal.
michael@0 816 if (!recvd.is_interrupt()) {
michael@0 817 // Other side should be blocked.
michael@0 818 IPC_ASSERT(!recvd.is_sync() || mPending.empty(), "other side should be blocked");
michael@0 819
michael@0 820 {
michael@0 821 AutoEnterRPCTransaction transaction(this, &recvd);
michael@0 822 MonitorAutoUnlock unlock(*mMonitor);
michael@0 823 CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
michael@0 824 DispatchMessage(recvd);
michael@0 825 }
michael@0 826 if (!Connected()) {
michael@0 827 ReportConnectionError("MessageChannel::DispatchMessage");
michael@0 828 return false;
michael@0 829 }
michael@0 830 continue;
michael@0 831 }
michael@0 832
michael@0 833 // If the message is an Interrupt reply, either process it as a reply to our
michael@0 834 // call, or add it to the list of out-of-turn replies we've received.
michael@0 835 if (recvd.is_reply()) {
michael@0 836 IPC_ASSERT(!mInterruptStack.empty(), "invalid Interrupt stack");
michael@0 837
michael@0 838 // If this is not a reply to the call we've initiated, add it to our
michael@0 839 // out-of-turn replies and keep polling for events.
michael@0 840 {
michael@0 841 const Message &outcall = mInterruptStack.top();
michael@0 842
michael@0 843 // Note, In the parent, sequence numbers increase from 0, and
michael@0 844 // in the child, they decrease from 0.
michael@0 845 if ((mSide == ChildSide && recvd.seqno() > outcall.seqno()) ||
michael@0 846 (mSide != ChildSide && recvd.seqno() < outcall.seqno()))
michael@0 847 {
michael@0 848 mOutOfTurnReplies[recvd.seqno()] = recvd;
michael@0 849 continue;
michael@0 850 }
michael@0 851
michael@0 852 IPC_ASSERT(recvd.is_reply_error() ||
michael@0 853 (recvd.type() == (outcall.type() + 1) &&
michael@0 854 recvd.seqno() == outcall.seqno()),
michael@0 855 "somebody's misbehavin'", true);
michael@0 856 }
michael@0 857
michael@0 858 // We received a reply to our most recent outstanding call. Pop
michael@0 859 // this frame and return the reply.
michael@0 860 mInterruptStack.pop();
michael@0 861
michael@0 862 if (!recvd.is_reply_error()) {
michael@0 863 *aReply = recvd;
michael@0 864 }
michael@0 865
michael@0 866 // If we have no more pending out calls waiting on replies, then
michael@0 867 // the reply queue should be empty.
michael@0 868 IPC_ASSERT(!mInterruptStack.empty() || mOutOfTurnReplies.empty(),
michael@0 869 "still have pending replies with no pending out-calls",
michael@0 870 true);
michael@0 871
michael@0 872 return !recvd.is_reply_error();
michael@0 873 }
michael@0 874
michael@0 875 // Dispatch an Interrupt in-call. Snapshot the current stack depth while we
michael@0 876 // own the monitor.
michael@0 877 size_t stackDepth = InterruptStackDepth();
michael@0 878 {
michael@0 879 MonitorAutoUnlock unlock(*mMonitor);
michael@0 880
michael@0 881 CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
michael@0 882 DispatchInterruptMessage(recvd, stackDepth);
michael@0 883 }
michael@0 884 if (!Connected()) {
michael@0 885 ReportConnectionError("MessageChannel::DispatchInterruptMessage");
michael@0 886 return false;
michael@0 887 }
michael@0 888 }
michael@0 889
michael@0 890 return true;
michael@0 891 }
michael@0 892
michael@0 893 bool
michael@0 894 MessageChannel::InterruptEventOccurred()
michael@0 895 {
michael@0 896 AssertWorkerThread();
michael@0 897 mMonitor->AssertCurrentThreadOwns();
michael@0 898 IPC_ASSERT(InterruptStackDepth() > 0, "not in wait loop");
michael@0 899
michael@0 900 return (!Connected() ||
michael@0 901 !mPending.empty() ||
michael@0 902 mPendingUrgentRequest ||
michael@0 903 mPendingRPCCall ||
michael@0 904 (!mOutOfTurnReplies.empty() &&
michael@0 905 mOutOfTurnReplies.find(mInterruptStack.top().seqno()) !=
michael@0 906 mOutOfTurnReplies.end()));
michael@0 907 }
michael@0 908
michael@0 909 bool
michael@0 910 MessageChannel::ProcessPendingUrgentRequest()
michael@0 911 {
michael@0 912 AssertWorkerThread();
michael@0 913 mMonitor->AssertCurrentThreadOwns();
michael@0 914
michael@0 915 // Note that it is possible we could have sent a sync message at
michael@0 916 // the same time the parent process sent an urgent message, and
michael@0 917 // therefore mPendingUrgentRequest is set *and* mRecvd is set as
michael@0 918 // well, because the link thread received both before the worker
michael@0 919 // thread woke up.
michael@0 920 //
michael@0 921 // In this case, we process the urgent message first, but we need
michael@0 922 // to save the reply.
michael@0 923 nsAutoPtr<Message> savedReply(mRecvd.forget());
michael@0 924
michael@0 925 // We're the child process. We should not be receiving RPC calls.
michael@0 926 IPC_ASSERT(!mPendingRPCCall, "unexpected RPC call");
michael@0 927
michael@0 928 nsAutoPtr<Message> recvd(mPendingUrgentRequest.forget());
michael@0 929 {
michael@0 930 // In order to send the parent RPC messages and guarantee it will
michael@0 931 // wake up, we must re-use its transaction.
michael@0 932 AutoEnterRPCTransaction transaction(this, recvd);
michael@0 933
michael@0 934 MonitorAutoUnlock unlock(*mMonitor);
michael@0 935 DispatchUrgentMessage(*recvd);
michael@0 936 }
michael@0 937 if (!Connected()) {
michael@0 938 ReportConnectionError("MessageChannel::DispatchUrgentMessage");
michael@0 939 return false;
michael@0 940 }
michael@0 941
michael@0 942 // In between having dispatched our reply to the parent process, and
michael@0 943 // re-acquiring the monitor, the parent process could have already
michael@0 944 // processed that reply and sent the reply to our sync message. If so,
michael@0 945 // our saved reply should be empty.
michael@0 946 IPC_ASSERT(!mRecvd || !savedReply, "unknown reply");
michael@0 947 if (!mRecvd)
michael@0 948 mRecvd = savedReply.forget();
michael@0 949 return true;
michael@0 950 }
michael@0 951
michael@0 952 bool
michael@0 953 MessageChannel::ProcessPendingRPCCall()
michael@0 954 {
michael@0 955 AssertWorkerThread();
michael@0 956 mMonitor->AssertCurrentThreadOwns();
michael@0 957
michael@0 958 // See comment above re: mRecvd replies and incoming calls.
michael@0 959 nsAutoPtr<Message> savedReply(mRecvd.forget());
michael@0 960
michael@0 961 IPC_ASSERT(!mPendingUrgentRequest, "unexpected urgent message");
michael@0 962
michael@0 963 nsAutoPtr<Message> recvd(mPendingRPCCall.forget());
michael@0 964 {
michael@0 965 // If we are not currently in a transaction, this will begin one,
michael@0 966 // and the link thread will not wake us up for any RPC messages not
michael@0 967 // apart of this transaction. If we are already in a transaction,
michael@0 968 // then this will assert that we're still in the same transaction.
michael@0 969 AutoEnterRPCTransaction transaction(this, recvd);
michael@0 970
michael@0 971 MonitorAutoUnlock unlock(*mMonitor);
michael@0 972 DispatchRPCMessage(*recvd);
michael@0 973 }
michael@0 974 if (!Connected()) {
michael@0 975 ReportConnectionError("MessageChannel::DispatchRPCMessage");
michael@0 976 return false;
michael@0 977 }
michael@0 978
michael@0 979 // In between having dispatched our reply to the parent process, and
michael@0 980 // re-acquiring the monitor, the parent process could have already
michael@0 981 // processed that reply and sent the reply to our sync message. If so,
michael@0 982 // our saved reply should be empty.
michael@0 983 IPC_ASSERT(!mRecvd || !savedReply, "unknown reply");
michael@0 984 if (!mRecvd)
michael@0 985 mRecvd = savedReply.forget();
michael@0 986 return true;
michael@0 987 }
michael@0 988
michael@0 989 bool
michael@0 990 MessageChannel::DequeueOne(Message *recvd)
michael@0 991 {
michael@0 992 AssertWorkerThread();
michael@0 993 mMonitor->AssertCurrentThreadOwns();
michael@0 994
michael@0 995 if (!Connected()) {
michael@0 996 ReportConnectionError("OnMaybeDequeueOne");
michael@0 997 return false;
michael@0 998 }
michael@0 999
michael@0 1000 if (mPendingUrgentRequest) {
michael@0 1001 *recvd = *mPendingUrgentRequest;
michael@0 1002 mPendingUrgentRequest = nullptr;
michael@0 1003 return true;
michael@0 1004 }
michael@0 1005
michael@0 1006 if (mPendingRPCCall) {
michael@0 1007 *recvd = *mPendingRPCCall;
michael@0 1008 mPendingRPCCall = nullptr;
michael@0 1009 return true;
michael@0 1010 }
michael@0 1011
michael@0 1012 if (!mDeferred.empty())
michael@0 1013 MaybeUndeferIncall();
michael@0 1014
michael@0 1015 if (mPending.empty())
michael@0 1016 return false;
michael@0 1017
michael@0 1018 *recvd = mPending.front();
michael@0 1019 mPending.pop_front();
michael@0 1020 return true;
michael@0 1021 }
michael@0 1022
michael@0 1023 bool
michael@0 1024 MessageChannel::OnMaybeDequeueOne()
michael@0 1025 {
michael@0 1026 AssertWorkerThread();
michael@0 1027 mMonitor->AssertNotCurrentThreadOwns();
michael@0 1028
michael@0 1029 Message recvd;
michael@0 1030
michael@0 1031 MonitorAutoLock lock(*mMonitor);
michael@0 1032 if (!DequeueOne(&recvd))
michael@0 1033 return false;
michael@0 1034
michael@0 1035 if (IsOnCxxStack() && recvd.is_interrupt() && recvd.is_reply()) {
michael@0 1036 // We probably just received a reply in a nested loop for an
michael@0 1037 // Interrupt call sent before entering that loop.
michael@0 1038 mOutOfTurnReplies[recvd.seqno()] = recvd;
michael@0 1039 return false;
michael@0 1040 }
michael@0 1041
michael@0 1042 {
michael@0 1043 // We should not be in a transaction yet if we're not blocked.
michael@0 1044 MOZ_ASSERT(mCurrentRPCTransaction == 0);
michael@0 1045 AutoEnterRPCTransaction transaction(this, &recvd);
michael@0 1046
michael@0 1047 MonitorAutoUnlock unlock(*mMonitor);
michael@0 1048
michael@0 1049 CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
michael@0 1050 DispatchMessage(recvd);
michael@0 1051 }
michael@0 1052 return true;
michael@0 1053 }
michael@0 1054
michael@0 1055 void
michael@0 1056 MessageChannel::DispatchMessage(const Message &aMsg)
michael@0 1057 {
michael@0 1058 if (aMsg.is_sync())
michael@0 1059 DispatchSyncMessage(aMsg);
michael@0 1060 else if (aMsg.is_urgent())
michael@0 1061 DispatchUrgentMessage(aMsg);
michael@0 1062 else if (aMsg.is_interrupt())
michael@0 1063 DispatchInterruptMessage(aMsg, 0);
michael@0 1064 else if (aMsg.is_rpc())
michael@0 1065 DispatchRPCMessage(aMsg);
michael@0 1066 else
michael@0 1067 DispatchAsyncMessage(aMsg);
michael@0 1068 }
michael@0 1069
michael@0 1070 void
michael@0 1071 MessageChannel::DispatchSyncMessage(const Message& aMsg)
michael@0 1072 {
michael@0 1073 AssertWorkerThread();
michael@0 1074
michael@0 1075 Message *reply = nullptr;
michael@0 1076
michael@0 1077 mDispatchingSyncMessage = true;
michael@0 1078 Result rv = mListener->OnMessageReceived(aMsg, reply);
michael@0 1079 mDispatchingSyncMessage = false;
michael@0 1080
michael@0 1081 if (!MaybeHandleError(rv, "DispatchSyncMessage")) {
michael@0 1082 delete reply;
michael@0 1083 reply = new Message();
michael@0 1084 reply->set_sync();
michael@0 1085 reply->set_reply();
michael@0 1086 reply->set_reply_error();
michael@0 1087 }
michael@0 1088 reply->set_seqno(aMsg.seqno());
michael@0 1089
michael@0 1090 MonitorAutoLock lock(*mMonitor);
michael@0 1091 if (ChannelConnected == mChannelState)
michael@0 1092 mLink->SendMessage(reply);
michael@0 1093 }
michael@0 1094
michael@0 1095 void
michael@0 1096 MessageChannel::DispatchUrgentMessage(const Message& aMsg)
michael@0 1097 {
michael@0 1098 AssertWorkerThread();
michael@0 1099 MOZ_ASSERT(aMsg.is_urgent());
michael@0 1100
michael@0 1101 Message *reply = nullptr;
michael@0 1102
michael@0 1103 mDispatchingUrgentMessageCount++;
michael@0 1104 Result rv = mListener->OnCallReceived(aMsg, reply);
michael@0 1105 mDispatchingUrgentMessageCount--;
michael@0 1106
michael@0 1107 if (!MaybeHandleError(rv, "DispatchUrgentMessage")) {
michael@0 1108 delete reply;
michael@0 1109 reply = new Message();
michael@0 1110 reply->set_urgent();
michael@0 1111 reply->set_reply();
michael@0 1112 reply->set_reply_error();
michael@0 1113 }
michael@0 1114 reply->set_seqno(aMsg.seqno());
michael@0 1115
michael@0 1116 MonitorAutoLock lock(*mMonitor);
michael@0 1117 if (ChannelConnected == mChannelState)
michael@0 1118 mLink->SendMessage(reply);
michael@0 1119 }
michael@0 1120
michael@0 1121 void
michael@0 1122 MessageChannel::DispatchRPCMessage(const Message& aMsg)
michael@0 1123 {
michael@0 1124 AssertWorkerThread();
michael@0 1125 MOZ_ASSERT(aMsg.is_rpc());
michael@0 1126
michael@0 1127 Message *reply = nullptr;
michael@0 1128
michael@0 1129 if (!MaybeHandleError(mListener->OnCallReceived(aMsg, reply), "DispatchRPCMessage")) {
michael@0 1130 delete reply;
michael@0 1131 reply = new Message();
michael@0 1132 reply->set_rpc();
michael@0 1133 reply->set_reply();
michael@0 1134 reply->set_reply_error();
michael@0 1135 }
michael@0 1136 reply->set_seqno(aMsg.seqno());
michael@0 1137
michael@0 1138 MonitorAutoLock lock(*mMonitor);
michael@0 1139 if (ChannelConnected == mChannelState)
michael@0 1140 mLink->SendMessage(reply);
michael@0 1141 }
michael@0 1142
michael@0 1143 void
michael@0 1144 MessageChannel::DispatchAsyncMessage(const Message& aMsg)
michael@0 1145 {
michael@0 1146 AssertWorkerThread();
michael@0 1147 MOZ_ASSERT(!aMsg.is_interrupt() && !aMsg.is_sync() && !aMsg.is_urgent());
michael@0 1148
michael@0 1149 if (aMsg.routing_id() == MSG_ROUTING_NONE) {
michael@0 1150 NS_RUNTIMEABORT("unhandled special message!");
michael@0 1151 }
michael@0 1152
michael@0 1153 MaybeHandleError(mListener->OnMessageReceived(aMsg), "DispatchAsyncMessage");
michael@0 1154 }
michael@0 1155
michael@0 1156 void
michael@0 1157 MessageChannel::DispatchInterruptMessage(const Message& aMsg, size_t stackDepth)
michael@0 1158 {
michael@0 1159 AssertWorkerThread();
michael@0 1160 mMonitor->AssertNotCurrentThreadOwns();
michael@0 1161
michael@0 1162 IPC_ASSERT(aMsg.is_interrupt() && !aMsg.is_reply(), "wrong message type");
michael@0 1163
michael@0 1164 // Race detection: see the long comment near mRemoteStackDepthGuess in
michael@0 1165 // MessageChannel.h. "Remote" stack depth means our side, and "local" means
michael@0 1166 // the other side.
michael@0 1167 if (aMsg.interrupt_remote_stack_depth_guess() != RemoteViewOfStackDepth(stackDepth)) {
michael@0 1168 // Interrupt in-calls have raced. The winner, if there is one, gets to defer
michael@0 1169 // processing of the other side's in-call.
michael@0 1170 bool defer;
michael@0 1171 const char* winner;
michael@0 1172 switch (mListener->MediateInterruptRace((mSide == ChildSide) ? aMsg : mInterruptStack.top(),
michael@0 1173 (mSide != ChildSide) ? mInterruptStack.top() : aMsg))
michael@0 1174 {
michael@0 1175 case RIPChildWins:
michael@0 1176 winner = "child";
michael@0 1177 defer = (mSide == ChildSide);
michael@0 1178 break;
michael@0 1179 case RIPParentWins:
michael@0 1180 winner = "parent";
michael@0 1181 defer = (mSide != ChildSide);
michael@0 1182 break;
michael@0 1183 case RIPError:
michael@0 1184 NS_RUNTIMEABORT("NYI: 'Error' Interrupt race policy");
michael@0 1185 return;
michael@0 1186 default:
michael@0 1187 NS_RUNTIMEABORT("not reached");
michael@0 1188 return;
michael@0 1189 }
michael@0 1190
michael@0 1191 if (LoggingEnabled()) {
michael@0 1192 printf_stderr(" (%s: %s won, so we're%sdeferring)\n",
michael@0 1193 (mSide == ChildSide) ? "child" : "parent",
michael@0 1194 winner,
michael@0 1195 defer ? " " : " not ");
michael@0 1196 }
michael@0 1197
michael@0 1198 if (defer) {
michael@0 1199 // We now know the other side's stack has one more frame
michael@0 1200 // than we thought.
michael@0 1201 ++mRemoteStackDepthGuess; // decremented in MaybeProcessDeferred()
michael@0 1202 mDeferred.push(aMsg);
michael@0 1203 return;
michael@0 1204 }
michael@0 1205
michael@0 1206 // We "lost" and need to process the other side's in-call. Don't need
michael@0 1207 // to fix up the mRemoteStackDepthGuess here, because we're just about
michael@0 1208 // to increment it in DispatchCall(), which will make it correct again.
michael@0 1209 }
michael@0 1210
michael@0 1211 #ifdef OS_WIN
michael@0 1212 SyncStackFrame frame(this, true);
michael@0 1213 #endif
michael@0 1214
michael@0 1215 Message* reply = nullptr;
michael@0 1216
michael@0 1217 ++mRemoteStackDepthGuess;
michael@0 1218 Result rv = mListener->OnCallReceived(aMsg, reply);
michael@0 1219 --mRemoteStackDepthGuess;
michael@0 1220
michael@0 1221 if (!MaybeHandleError(rv, "DispatchInterruptMessage")) {
michael@0 1222 delete reply;
michael@0 1223 reply = new Message();
michael@0 1224 reply->set_interrupt();
michael@0 1225 reply->set_reply();
michael@0 1226 reply->set_reply_error();
michael@0 1227 }
michael@0 1228 reply->set_seqno(aMsg.seqno());
michael@0 1229
michael@0 1230 MonitorAutoLock lock(*mMonitor);
michael@0 1231 if (ChannelConnected == mChannelState)
michael@0 1232 mLink->SendMessage(reply);
michael@0 1233 }
michael@0 1234
michael@0 1235 void
michael@0 1236 MessageChannel::MaybeUndeferIncall()
michael@0 1237 {
michael@0 1238 AssertWorkerThread();
michael@0 1239 mMonitor->AssertCurrentThreadOwns();
michael@0 1240
michael@0 1241 if (mDeferred.empty())
michael@0 1242 return;
michael@0 1243
michael@0 1244 size_t stackDepth = InterruptStackDepth();
michael@0 1245
michael@0 1246 // the other side can only *under*-estimate our actual stack depth
michael@0 1247 IPC_ASSERT(mDeferred.top().interrupt_remote_stack_depth_guess() <= stackDepth,
michael@0 1248 "fatal logic error");
michael@0 1249
michael@0 1250 if (mDeferred.top().interrupt_remote_stack_depth_guess() < RemoteViewOfStackDepth(stackDepth))
michael@0 1251 return;
michael@0 1252
michael@0 1253 // maybe time to process this message
michael@0 1254 Message call = mDeferred.top();
michael@0 1255 mDeferred.pop();
michael@0 1256
michael@0 1257 // fix up fudge factor we added to account for race
michael@0 1258 IPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error");
michael@0 1259 --mRemoteStackDepthGuess;
michael@0 1260
michael@0 1261 mPending.push_back(call);
michael@0 1262 }
michael@0 1263
michael@0 1264 void
michael@0 1265 MessageChannel::FlushPendingInterruptQueue()
michael@0 1266 {
michael@0 1267 AssertWorkerThread();
michael@0 1268 mMonitor->AssertNotCurrentThreadOwns();
michael@0 1269
michael@0 1270 {
michael@0 1271 MonitorAutoLock lock(*mMonitor);
michael@0 1272
michael@0 1273 if (mDeferred.empty()) {
michael@0 1274 if (mPending.empty())
michael@0 1275 return;
michael@0 1276
michael@0 1277 const Message& last = mPending.back();
michael@0 1278 if (!last.is_interrupt() || last.is_reply())
michael@0 1279 return;
michael@0 1280 }
michael@0 1281 }
michael@0 1282
michael@0 1283 while (OnMaybeDequeueOne());
michael@0 1284 }
michael@0 1285
michael@0 1286 void
michael@0 1287 MessageChannel::ExitedCxxStack()
michael@0 1288 {
michael@0 1289 mListener->OnExitedCxxStack();
michael@0 1290 if (mSawInterruptOutMsg) {
michael@0 1291 MonitorAutoLock lock(*mMonitor);
michael@0 1292 // see long comment in OnMaybeDequeueOne()
michael@0 1293 EnqueuePendingMessages();
michael@0 1294 mSawInterruptOutMsg = false;
michael@0 1295 }
michael@0 1296 }
michael@0 1297
michael@0 1298 void
michael@0 1299 MessageChannel::EnqueuePendingMessages()
michael@0 1300 {
michael@0 1301 AssertWorkerThread();
michael@0 1302 mMonitor->AssertCurrentThreadOwns();
michael@0 1303
michael@0 1304 MaybeUndeferIncall();
michael@0 1305
michael@0 1306 for (size_t i = 0; i < mDeferred.size(); ++i) {
michael@0 1307 mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
michael@0 1308 }
michael@0 1309
michael@0 1310 // XXX performance tuning knob: could process all or k pending
michael@0 1311 // messages here, rather than enqueuing for later processing
michael@0 1312
michael@0 1313 for (size_t i = 0; i < mPending.size(); ++i) {
michael@0 1314 mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
michael@0 1315 }
michael@0 1316 }
michael@0 1317
michael@0 1318 static inline bool
michael@0 1319 IsTimeoutExpired(PRIntervalTime aStart, PRIntervalTime aTimeout)
michael@0 1320 {
michael@0 1321 return (aTimeout != PR_INTERVAL_NO_TIMEOUT) &&
michael@0 1322 (aTimeout <= (PR_IntervalNow() - aStart));
michael@0 1323 }
michael@0 1324
michael@0 1325 bool
michael@0 1326 MessageChannel::WaitResponse(bool aWaitTimedOut)
michael@0 1327 {
michael@0 1328 if (aWaitTimedOut) {
michael@0 1329 if (mInTimeoutSecondHalf) {
michael@0 1330 // We've really timed out this time.
michael@0 1331 return false;
michael@0 1332 }
michael@0 1333 // Try a second time.
michael@0 1334 mInTimeoutSecondHalf = true;
michael@0 1335 } else {
michael@0 1336 mInTimeoutSecondHalf = false;
michael@0 1337 }
michael@0 1338 return true;
michael@0 1339 }
michael@0 1340
michael@0 1341 #ifndef OS_WIN
michael@0 1342 bool
michael@0 1343 MessageChannel::WaitForSyncNotify()
michael@0 1344 {
michael@0 1345 PRIntervalTime timeout = (kNoTimeout == mTimeoutMs) ?
michael@0 1346 PR_INTERVAL_NO_TIMEOUT :
michael@0 1347 PR_MillisecondsToInterval(mTimeoutMs);
michael@0 1348 // XXX could optimize away this syscall for "no timeout" case if desired
michael@0 1349 PRIntervalTime waitStart = PR_IntervalNow();
michael@0 1350
michael@0 1351 mMonitor->Wait(timeout);
michael@0 1352
michael@0 1353 // If the timeout didn't expire, we know we received an event. The
michael@0 1354 // converse is not true.
michael@0 1355 return WaitResponse(IsTimeoutExpired(waitStart, timeout));
michael@0 1356 }
michael@0 1357
michael@0 1358 bool
michael@0 1359 MessageChannel::WaitForInterruptNotify()
michael@0 1360 {
michael@0 1361 return WaitForSyncNotify();
michael@0 1362 }
michael@0 1363
michael@0 1364 void
michael@0 1365 MessageChannel::NotifyWorkerThread()
michael@0 1366 {
michael@0 1367 mMonitor->Notify();
michael@0 1368 }
michael@0 1369 #endif
michael@0 1370
michael@0 1371 bool
michael@0 1372 MessageChannel::ShouldContinueFromTimeout()
michael@0 1373 {
michael@0 1374 AssertWorkerThread();
michael@0 1375 mMonitor->AssertCurrentThreadOwns();
michael@0 1376
michael@0 1377 bool cont;
michael@0 1378 {
michael@0 1379 MonitorAutoUnlock unlock(*mMonitor);
michael@0 1380 cont = mListener->OnReplyTimeout();
michael@0 1381 }
michael@0 1382
michael@0 1383 static enum { UNKNOWN, NOT_DEBUGGING, DEBUGGING } sDebuggingChildren = UNKNOWN;
michael@0 1384
michael@0 1385 if (sDebuggingChildren == UNKNOWN) {
michael@0 1386 sDebuggingChildren = getenv("MOZ_DEBUG_CHILD_PROCESS") ? DEBUGGING : NOT_DEBUGGING;
michael@0 1387 }
michael@0 1388 if (sDebuggingChildren == DEBUGGING) {
michael@0 1389 return true;
michael@0 1390 }
michael@0 1391
michael@0 1392 if (!cont) {
michael@0 1393 // NB: there's a sublety here. If parents were allowed to send sync
michael@0 1394 // messages to children, then it would be possible for this
michael@0 1395 // synchronous close-on-timeout to race with async |OnMessageReceived|
michael@0 1396 // tasks arriving from the child, posted to the worker thread's event
michael@0 1397 // loop. This would complicate cleanup of the *Channel. But since
michael@0 1398 // IPDL forbids this (and since it doesn't support children timing out
michael@0 1399 // on parents), the parent can only block on interrupt messages to the child,
michael@0 1400 // and in that case arriving async messages are enqueued to the interrupt
michael@0 1401 // channel's special queue. They're then ignored because the channel
michael@0 1402 // state changes to ChannelTimeout (i.e. !Connected).
michael@0 1403 SynchronouslyClose();
michael@0 1404 mChannelState = ChannelTimeout;
michael@0 1405 }
michael@0 1406
michael@0 1407 return cont;
michael@0 1408 }
michael@0 1409
michael@0 1410 void
michael@0 1411 MessageChannel::SetReplyTimeoutMs(int32_t aTimeoutMs)
michael@0 1412 {
michael@0 1413 // Set channel timeout value. Since this is broken up into
michael@0 1414 // two period, the minimum timeout value is 2ms.
michael@0 1415 AssertWorkerThread();
michael@0 1416 mTimeoutMs = (aTimeoutMs <= 0)
michael@0 1417 ? kNoTimeout
michael@0 1418 : (int32_t)ceil((double)aTimeoutMs / 2.0);
michael@0 1419 }
michael@0 1420
michael@0 1421 void
michael@0 1422 MessageChannel::OnChannelConnected(int32_t peer_id)
michael@0 1423 {
michael@0 1424 mWorkerLoop->PostTask(
michael@0 1425 FROM_HERE,
michael@0 1426 NewRunnableMethod(this,
michael@0 1427 &MessageChannel::DispatchOnChannelConnected,
michael@0 1428 peer_id));
michael@0 1429 }
michael@0 1430
michael@0 1431 void
michael@0 1432 MessageChannel::DispatchOnChannelConnected(int32_t peer_pid)
michael@0 1433 {
michael@0 1434 AssertWorkerThread();
michael@0 1435 if (mListener)
michael@0 1436 mListener->OnChannelConnected(peer_pid);
michael@0 1437 }
michael@0 1438
michael@0 1439 void
michael@0 1440 MessageChannel::ReportMessageRouteError(const char* channelName) const
michael@0 1441 {
michael@0 1442 PrintErrorMessage(mSide, channelName, "Need a route");
michael@0 1443 mListener->OnProcessingError(MsgRouteError);
michael@0 1444 }
michael@0 1445
michael@0 1446 void
michael@0 1447 MessageChannel::ReportConnectionError(const char* aChannelName) const
michael@0 1448 {
michael@0 1449 AssertWorkerThread();
michael@0 1450 mMonitor->AssertCurrentThreadOwns();
michael@0 1451
michael@0 1452 const char* errorMsg = nullptr;
michael@0 1453 switch (mChannelState) {
michael@0 1454 case ChannelClosed:
michael@0 1455 errorMsg = "Closed channel: cannot send/recv";
michael@0 1456 break;
michael@0 1457 case ChannelOpening:
michael@0 1458 errorMsg = "Opening channel: not yet ready for send/recv";
michael@0 1459 break;
michael@0 1460 case ChannelTimeout:
michael@0 1461 errorMsg = "Channel timeout: cannot send/recv";
michael@0 1462 break;
michael@0 1463 case ChannelClosing:
michael@0 1464 errorMsg = "Channel closing: too late to send/recv, messages will be lost";
michael@0 1465 break;
michael@0 1466 case ChannelError:
michael@0 1467 errorMsg = "Channel error: cannot send/recv";
michael@0 1468 break;
michael@0 1469
michael@0 1470 default:
michael@0 1471 NS_RUNTIMEABORT("unreached");
michael@0 1472 }
michael@0 1473
michael@0 1474 PrintErrorMessage(mSide, aChannelName, errorMsg);
michael@0 1475
michael@0 1476 MonitorAutoUnlock unlock(*mMonitor);
michael@0 1477 mListener->OnProcessingError(MsgDropped);
michael@0 1478 }
michael@0 1479
michael@0 1480 bool
michael@0 1481 MessageChannel::MaybeHandleError(Result code, const char* channelName)
michael@0 1482 {
michael@0 1483 if (MsgProcessed == code)
michael@0 1484 return true;
michael@0 1485
michael@0 1486 const char* errorMsg = nullptr;
michael@0 1487 switch (code) {
michael@0 1488 case MsgNotKnown:
michael@0 1489 errorMsg = "Unknown message: not processed";
michael@0 1490 break;
michael@0 1491 case MsgNotAllowed:
michael@0 1492 errorMsg = "Message not allowed: cannot be sent/recvd in this state";
michael@0 1493 break;
michael@0 1494 case MsgPayloadError:
michael@0 1495 errorMsg = "Payload error: message could not be deserialized";
michael@0 1496 break;
michael@0 1497 case MsgProcessingError:
michael@0 1498 errorMsg = "Processing error: message was deserialized, but the handler returned false (indicating failure)";
michael@0 1499 break;
michael@0 1500 case MsgRouteError:
michael@0 1501 errorMsg = "Route error: message sent to unknown actor ID";
michael@0 1502 break;
michael@0 1503 case MsgValueError:
michael@0 1504 errorMsg = "Value error: message was deserialized, but contained an illegal value";
michael@0 1505 break;
michael@0 1506
michael@0 1507 default:
michael@0 1508 NS_RUNTIMEABORT("unknown Result code");
michael@0 1509 return false;
michael@0 1510 }
michael@0 1511
michael@0 1512 PrintErrorMessage(mSide, channelName, errorMsg);
michael@0 1513
michael@0 1514 mListener->OnProcessingError(code);
michael@0 1515
michael@0 1516 return false;
michael@0 1517 }
michael@0 1518
michael@0 1519 void
michael@0 1520 MessageChannel::OnChannelErrorFromLink()
michael@0 1521 {
michael@0 1522 AssertLinkThread();
michael@0 1523 mMonitor->AssertCurrentThreadOwns();
michael@0 1524
michael@0 1525 if (InterruptStackDepth() > 0)
michael@0 1526 NotifyWorkerThread();
michael@0 1527
michael@0 1528 if (AwaitingSyncReply() || AwaitingRPCReply() || AwaitingUrgentReply())
michael@0 1529 NotifyWorkerThread();
michael@0 1530
michael@0 1531 if (ChannelClosing != mChannelState) {
michael@0 1532 if (mAbortOnError) {
michael@0 1533 NS_RUNTIMEABORT("Aborting on channel error.");
michael@0 1534 }
michael@0 1535 mChannelState = ChannelError;
michael@0 1536 mMonitor->Notify();
michael@0 1537 }
michael@0 1538
michael@0 1539 PostErrorNotifyTask();
michael@0 1540 }
michael@0 1541
michael@0 1542 void
michael@0 1543 MessageChannel::NotifyMaybeChannelError()
michael@0 1544 {
michael@0 1545 mMonitor->AssertNotCurrentThreadOwns();
michael@0 1546
michael@0 1547 // TODO sort out Close() on this side racing with Close() on the other side
michael@0 1548 if (ChannelClosing == mChannelState) {
michael@0 1549 // the channel closed, but we received a "Goodbye" message warning us
michael@0 1550 // about it. no worries
michael@0 1551 mChannelState = ChannelClosed;
michael@0 1552 NotifyChannelClosed();
michael@0 1553 return;
michael@0 1554 }
michael@0 1555
michael@0 1556 // Oops, error! Let the listener know about it.
michael@0 1557 mChannelState = ChannelError;
michael@0 1558 mListener->OnChannelError();
michael@0 1559 Clear();
michael@0 1560 }
michael@0 1561
michael@0 1562 void
michael@0 1563 MessageChannel::OnNotifyMaybeChannelError()
michael@0 1564 {
michael@0 1565 AssertWorkerThread();
michael@0 1566 mMonitor->AssertNotCurrentThreadOwns();
michael@0 1567
michael@0 1568 mChannelErrorTask = nullptr;
michael@0 1569
michael@0 1570 // OnChannelError holds mMonitor when it posts this task and this
michael@0 1571 // task cannot be allowed to run until OnChannelError has
michael@0 1572 // exited. We enforce that order by grabbing the mutex here which
michael@0 1573 // should only continue once OnChannelError has completed.
michael@0 1574 {
michael@0 1575 MonitorAutoLock lock(*mMonitor);
michael@0 1576 // nothing to do here
michael@0 1577 }
michael@0 1578
michael@0 1579 if (IsOnCxxStack()) {
michael@0 1580 mChannelErrorTask =
michael@0 1581 NewRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError);
michael@0 1582 // 10 ms delay is completely arbitrary
michael@0 1583 mWorkerLoop->PostDelayedTask(FROM_HERE, mChannelErrorTask, 10);
michael@0 1584 return;
michael@0 1585 }
michael@0 1586
michael@0 1587 NotifyMaybeChannelError();
michael@0 1588 }
michael@0 1589
michael@0 1590 void
michael@0 1591 MessageChannel::PostErrorNotifyTask()
michael@0 1592 {
michael@0 1593 mMonitor->AssertCurrentThreadOwns();
michael@0 1594
michael@0 1595 if (mChannelErrorTask)
michael@0 1596 return;
michael@0 1597
michael@0 1598 // This must be the last code that runs on this thread!
michael@0 1599 mChannelErrorTask =
michael@0 1600 NewRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError);
michael@0 1601 mWorkerLoop->PostTask(FROM_HERE, mChannelErrorTask);
michael@0 1602 }
michael@0 1603
michael@0 1604 // Special async message.
michael@0 1605 class GoodbyeMessage : public IPC::Message
michael@0 1606 {
michael@0 1607 public:
michael@0 1608 GoodbyeMessage() :
michael@0 1609 IPC::Message(MSG_ROUTING_NONE, GOODBYE_MESSAGE_TYPE, PRIORITY_NORMAL)
michael@0 1610 {
michael@0 1611 }
michael@0 1612 static bool Read(const Message* msg) {
michael@0 1613 return true;
michael@0 1614 }
michael@0 1615 void Log(const std::string& aPrefix, FILE* aOutf) const {
michael@0 1616 fputs("(special `Goodbye' message)", aOutf);
michael@0 1617 }
michael@0 1618 };
michael@0 1619
michael@0 1620 void
michael@0 1621 MessageChannel::SynchronouslyClose()
michael@0 1622 {
michael@0 1623 AssertWorkerThread();
michael@0 1624 mMonitor->AssertCurrentThreadOwns();
michael@0 1625 mLink->SendClose();
michael@0 1626 while (ChannelClosed != mChannelState)
michael@0 1627 mMonitor->Wait();
michael@0 1628 }
michael@0 1629
michael@0 1630 void
michael@0 1631 MessageChannel::CloseWithError()
michael@0 1632 {
michael@0 1633 AssertWorkerThread();
michael@0 1634
michael@0 1635 MonitorAutoLock lock(*mMonitor);
michael@0 1636 if (ChannelConnected != mChannelState) {
michael@0 1637 return;
michael@0 1638 }
michael@0 1639 SynchronouslyClose();
michael@0 1640 mChannelState = ChannelError;
michael@0 1641 PostErrorNotifyTask();
michael@0 1642 }
michael@0 1643
michael@0 1644 void
michael@0 1645 MessageChannel::Close()
michael@0 1646 {
michael@0 1647 AssertWorkerThread();
michael@0 1648
michael@0 1649 {
michael@0 1650 MonitorAutoLock lock(*mMonitor);
michael@0 1651
michael@0 1652 if (ChannelError == mChannelState || ChannelTimeout == mChannelState) {
michael@0 1653 // See bug 538586: if the listener gets deleted while the
michael@0 1654 // IO thread's NotifyChannelError event is still enqueued
michael@0 1655 // and subsequently deletes us, then the error event will
michael@0 1656 // also be deleted and the listener will never be notified
michael@0 1657 // of the channel error.
michael@0 1658 if (mListener) {
michael@0 1659 MonitorAutoUnlock unlock(*mMonitor);
michael@0 1660 NotifyMaybeChannelError();
michael@0 1661 }
michael@0 1662 return;
michael@0 1663 }
michael@0 1664
michael@0 1665 if (ChannelOpening == mChannelState) {
michael@0 1666 // Mimic CloseWithError().
michael@0 1667 SynchronouslyClose();
michael@0 1668 mChannelState = ChannelError;
michael@0 1669 PostErrorNotifyTask();
michael@0 1670 return;
michael@0 1671 }
michael@0 1672
michael@0 1673 if (ChannelConnected != mChannelState) {
michael@0 1674 // XXX be strict about this until there's a compelling reason
michael@0 1675 // to relax
michael@0 1676 NS_RUNTIMEABORT("Close() called on closed channel!");
michael@0 1677 }
michael@0 1678
michael@0 1679 // notify the other side that we're about to close our socket
michael@0 1680 mLink->SendMessage(new GoodbyeMessage());
michael@0 1681 SynchronouslyClose();
michael@0 1682 }
michael@0 1683
michael@0 1684 NotifyChannelClosed();
michael@0 1685 }
michael@0 1686
michael@0 1687 void
michael@0 1688 MessageChannel::NotifyChannelClosed()
michael@0 1689 {
michael@0 1690 mMonitor->AssertNotCurrentThreadOwns();
michael@0 1691
michael@0 1692 if (ChannelClosed != mChannelState)
michael@0 1693 NS_RUNTIMEABORT("channel should have been closed!");
michael@0 1694
michael@0 1695 // OK, the IO thread just closed the channel normally. Let the
michael@0 1696 // listener know about it.
michael@0 1697 mListener->OnChannelClose();
michael@0 1698
michael@0 1699 Clear();
michael@0 1700 }
michael@0 1701
michael@0 1702 void
michael@0 1703 MessageChannel::DebugAbort(const char* file, int line, const char* cond,
michael@0 1704 const char* why,
michael@0 1705 bool reply) const
michael@0 1706 {
michael@0 1707 printf_stderr("###!!! [MessageChannel][%s][%s:%d] "
michael@0 1708 "Assertion (%s) failed. %s %s\n",
michael@0 1709 mSide == ChildSide ? "Child" : "Parent",
michael@0 1710 file, line, cond,
michael@0 1711 why,
michael@0 1712 reply ? "(reply)" : "");
michael@0 1713 // technically we need the mutex for this, but we're dying anyway
michael@0 1714 DumpInterruptStack(" ");
michael@0 1715 printf_stderr(" remote Interrupt stack guess: %lu\n",
michael@0 1716 mRemoteStackDepthGuess);
michael@0 1717 printf_stderr(" deferred stack size: %lu\n",
michael@0 1718 mDeferred.size());
michael@0 1719 printf_stderr(" out-of-turn Interrupt replies stack size: %lu\n",
michael@0 1720 mOutOfTurnReplies.size());
michael@0 1721 printf_stderr(" Pending queue size: %lu, front to back:\n",
michael@0 1722 mPending.size());
michael@0 1723
michael@0 1724 MessageQueue pending = mPending;
michael@0 1725 while (!pending.empty()) {
michael@0 1726 printf_stderr(" [ %s%s ]\n",
michael@0 1727 pending.front().is_interrupt() ? "intr" :
michael@0 1728 (pending.front().is_sync() ? "sync" : "async"),
michael@0 1729 pending.front().is_reply() ? "reply" : "");
michael@0 1730 pending.pop_front();
michael@0 1731 }
michael@0 1732
michael@0 1733 NS_RUNTIMEABORT(why);
michael@0 1734 }
michael@0 1735
michael@0 1736 void
michael@0 1737 MessageChannel::DumpInterruptStack(const char* const pfx) const
michael@0 1738 {
michael@0 1739 NS_WARN_IF_FALSE(MessageLoop::current() != mWorkerLoop,
michael@0 1740 "The worker thread had better be paused in a debugger!");
michael@0 1741
michael@0 1742 printf_stderr("%sMessageChannel 'backtrace':\n", pfx);
michael@0 1743
michael@0 1744 // print a python-style backtrace, first frame to last
michael@0 1745 for (uint32_t i = 0; i < mCxxStackFrames.length(); ++i) {
michael@0 1746 int32_t id;
michael@0 1747 const char* dir, *sems, *name;
michael@0 1748 mCxxStackFrames[i].Describe(&id, &dir, &sems, &name);
michael@0 1749
michael@0 1750 printf_stderr("%s[(%u) %s %s %s(actor=%d) ]\n", pfx,
michael@0 1751 i, dir, sems, name, id);
michael@0 1752 }
michael@0 1753 }
michael@0 1754
michael@0 1755 } // ipc
michael@0 1756 } // mozilla

mercurial