ipc/glue/MessageChannel.cpp

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

     1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
     2  * vim: sw=4 ts=4 et :
     3  */
     4 /* This Source Code Form is subject to the terms of the Mozilla Public
     5  * License, v. 2.0. If a copy of the MPL was not distributed with this
     6  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     8 #include "mozilla/ipc/MessageChannel.h"
     9 #include "mozilla/ipc/ProtocolUtils.h"
    11 #include "mozilla/Assertions.h"
    12 #include "mozilla/DebugOnly.h"
    13 #include "mozilla/Move.h"
    14 #include "nsDebug.h"
    15 #include "nsISupportsImpl.h"
    17 // Undo the damage done by mozzconf.h
    18 #undef compress
    20 using namespace mozilla;
    21 using namespace std;
    23 using mozilla::MonitorAutoLock;
    24 using mozilla::MonitorAutoUnlock;
    26 template<>
    27 struct RunnableMethodTraits<mozilla::ipc::MessageChannel>
    28 {
    29     static void RetainCallee(mozilla::ipc::MessageChannel* obj) { }
    30     static void ReleaseCallee(mozilla::ipc::MessageChannel* obj) { }
    31 };
    33 #define IPC_ASSERT(_cond, ...)                                      \
    34     do {                                                            \
    35         if (!(_cond))                                               \
    36             DebugAbort(__FILE__, __LINE__, #_cond,## __VA_ARGS__);  \
    37     } while (0)
    39 namespace mozilla {
    40 namespace ipc {
    42 const int32_t MessageChannel::kNoTimeout = INT32_MIN;
    44 // static
    45 bool MessageChannel::sIsPumpingMessages = false;
    47 enum Direction
    48 {
    49     IN_MESSAGE,
    50     OUT_MESSAGE
    51 };
    54 class MessageChannel::InterruptFrame
    55 {
    56 private:
    57     enum Semantics
    58     {
    59         INTR_SEMS,
    60         SYNC_SEMS,
    61         ASYNC_SEMS
    62     };
    64 public:
    65     InterruptFrame(Direction direction, const Message* msg)
    66       : mMessageName(strdup(msg->name())),
    67         mMessageRoutingId(msg->routing_id()),
    68         mMesageSemantics(msg->is_interrupt() ? INTR_SEMS :
    69                           msg->is_sync() ? SYNC_SEMS :
    70                           ASYNC_SEMS),
    71         mDirection(direction),
    72         mMoved(false)
    73     {
    74         MOZ_ASSERT(mMessageName);
    75     }
    77     InterruptFrame(InterruptFrame&& aOther)
    78     {
    79         MOZ_ASSERT(aOther.mMessageName);
    80         mMessageName = aOther.mMessageName;
    81         aOther.mMessageName = nullptr;
    82         aOther.mMoved = true;
    84         mMessageRoutingId = aOther.mMessageRoutingId;
    85         mMesageSemantics = aOther.mMesageSemantics;
    86         mDirection = aOther.mDirection;
    87     }
    89     ~InterruptFrame()
    90     {
    91         MOZ_ASSERT_IF(!mMessageName, mMoved);
    93         if (mMessageName)
    94             free(const_cast<char*>(mMessageName));
    95     }
    97     InterruptFrame& operator=(InterruptFrame&& aOther)
    98     {
    99         MOZ_ASSERT(&aOther != this);
   100         this->~InterruptFrame();
   101         new (this) InterruptFrame(mozilla::Move(aOther));
   102         return *this;
   103     }
   105     bool IsInterruptIncall() const
   106     {
   107         return INTR_SEMS == mMesageSemantics && IN_MESSAGE == mDirection;
   108     }
   110     bool IsInterruptOutcall() const
   111     {
   112         return INTR_SEMS == mMesageSemantics && OUT_MESSAGE == mDirection;
   113     }
   115     void Describe(int32_t* id, const char** dir, const char** sems,
   116                   const char** name) const
   117     {
   118         *id = mMessageRoutingId;
   119         *dir = (IN_MESSAGE == mDirection) ? "in" : "out";
   120         *sems = (INTR_SEMS == mMesageSemantics) ? "intr" :
   121                 (SYNC_SEMS == mMesageSemantics) ? "sync" :
   122                 "async";
   123         *name = mMessageName;
   124     }
   126 private:
   127     const char* mMessageName;
   128     int32_t mMessageRoutingId;
   129     Semantics mMesageSemantics;
   130     Direction mDirection;
   131     DebugOnly<bool> mMoved;
   133     // Disable harmful methods.
   134     InterruptFrame(const InterruptFrame& aOther) MOZ_DELETE;
   135     InterruptFrame& operator=(const InterruptFrame&) MOZ_DELETE;
   136 };
   138 class MOZ_STACK_CLASS MessageChannel::CxxStackFrame
   139 {
   140 public:
   141     CxxStackFrame(MessageChannel& that, Direction direction, const Message* msg)
   142       : mThat(that)
   143     {
   144         mThat.AssertWorkerThread();
   146         if (mThat.mCxxStackFrames.empty())
   147             mThat.EnteredCxxStack();
   149         mThat.mCxxStackFrames.append(InterruptFrame(direction, msg));
   151         const InterruptFrame& frame = mThat.mCxxStackFrames.back();
   153         if (frame.IsInterruptIncall())
   154             mThat.EnteredCall();
   156         mThat.mSawInterruptOutMsg |= frame.IsInterruptOutcall();
   157     }
   159     ~CxxStackFrame() {
   160         mThat.AssertWorkerThread();
   162         MOZ_ASSERT(!mThat.mCxxStackFrames.empty());
   164         bool exitingCall = mThat.mCxxStackFrames.back().IsInterruptIncall();
   165         mThat.mCxxStackFrames.shrinkBy(1);
   167         bool exitingStack = mThat.mCxxStackFrames.empty();
   169         // mListener could have gone away if Close() was called while
   170         // MessageChannel code was still on the stack
   171         if (!mThat.mListener)
   172             return;
   174         if (exitingCall)
   175             mThat.ExitedCall();
   177         if (exitingStack)
   178             mThat.ExitedCxxStack();
   179     }
   180 private:
   181     MessageChannel& mThat;
   183     // Disable harmful methods.
   184     CxxStackFrame() MOZ_DELETE;
   185     CxxStackFrame(const CxxStackFrame&) MOZ_DELETE;
   186     CxxStackFrame& operator=(const CxxStackFrame&) MOZ_DELETE;
   187 };
   189 MessageChannel::MessageChannel(MessageListener *aListener)
   190   : mListener(aListener->asWeakPtr()),
   191     mChannelState(ChannelClosed),
   192     mSide(UnknownSide),
   193     mLink(nullptr),
   194     mWorkerLoop(nullptr),
   195     mChannelErrorTask(nullptr),
   196     mWorkerLoopID(-1),
   197     mTimeoutMs(kNoTimeout),
   198     mInTimeoutSecondHalf(false),
   199     mNextSeqno(0),
   200     mPendingSyncReplies(0),
   201     mPendingUrgentReplies(0),
   202     mPendingRPCReplies(0),
   203     mCurrentRPCTransaction(0),
   204     mDispatchingSyncMessage(false),
   205     mDispatchingUrgentMessageCount(0),
   206     mRemoteStackDepthGuess(false),
   207     mSawInterruptOutMsg(false),
   208     mAbortOnError(false),
   209     mFlags(REQUIRE_DEFAULT)
   210 {
   211     MOZ_COUNT_CTOR(ipc::MessageChannel);
   213 #ifdef OS_WIN
   214     mTopFrame = nullptr;
   215     mIsSyncWaitingOnNonMainThread = false;
   216 #endif
   218     mDequeueOneTask = new RefCountedTask(NewRunnableMethod(
   219                                                  this,
   220                                                  &MessageChannel::OnMaybeDequeueOne));
   222 #ifdef OS_WIN
   223     mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
   224     NS_ASSERTION(mEvent, "CreateEvent failed! Nothing is going to work!");
   225 #endif
   226 }
   228 MessageChannel::~MessageChannel()
   229 {
   230     MOZ_COUNT_DTOR(ipc::MessageChannel);
   231     IPC_ASSERT(mCxxStackFrames.empty(), "mismatched CxxStackFrame ctor/dtors");
   232 #ifdef OS_WIN
   233     DebugOnly<BOOL> ok = CloseHandle(mEvent);
   234     MOZ_ASSERT(ok);
   235 #endif
   236     Clear();
   237 }
   239 static void
   240 PrintErrorMessage(Side side, const char* channelName, const char* msg)
   241 {
   242     const char *from = (side == ChildSide)
   243                        ? "Child"
   244                        : ((side == ParentSide) ? "Parent" : "Unknown");
   245     printf_stderr("\n###!!! [%s][%s] Error: %s\n\n", from, channelName, msg);
   246 }
   248 bool
   249 MessageChannel::Connected() const
   250 {
   251     mMonitor->AssertCurrentThreadOwns();
   253     // The transport layer allows us to send messages before
   254     // receiving the "connected" ack from the remote side.
   255     return (ChannelOpening == mChannelState || ChannelConnected == mChannelState);
   256 }
   258 bool
   259 MessageChannel::CanSend() const
   260 {
   261     MonitorAutoLock lock(*mMonitor);
   262     return Connected();
   263 }
   265 void
   266 MessageChannel::Clear()
   267 {
   268     // Don't clear mWorkerLoopID; we use it in AssertLinkThread() and
   269     // AssertWorkerThread().
   270     //
   271     // Also don't clear mListener.  If we clear it, then sending a message
   272     // through this channel after it's Clear()'ed can cause this process to
   273     // crash.
   274     //
   275     // In practice, mListener owns the channel, so the channel gets deleted
   276     // before mListener.  But just to be safe, mListener is a weak pointer.
   278     mDequeueOneTask->Cancel();
   280     mWorkerLoop = nullptr;
   281     delete mLink;
   282     mLink = nullptr;
   284     if (mChannelErrorTask) {
   285         mChannelErrorTask->Cancel();
   286         mChannelErrorTask = nullptr;
   287     }
   289     // Free up any memory used by pending messages.
   290     mPending.clear();
   291     mPendingUrgentRequest = nullptr;
   292     mPendingRPCCall = nullptr;
   293     mOutOfTurnReplies.clear();
   294     while (!mDeferred.empty()) {
   295         mDeferred.pop();
   296     }
   297 }
   299 bool
   300 MessageChannel::Open(Transport* aTransport, MessageLoop* aIOLoop, Side aSide)
   301 {
   302     NS_PRECONDITION(!mLink, "Open() called > once");
   304     mMonitor = new RefCountedMonitor();
   305     mWorkerLoop = MessageLoop::current();
   306     mWorkerLoopID = mWorkerLoop->id();
   308     ProcessLink *link = new ProcessLink(this);
   309     link->Open(aTransport, aIOLoop, aSide); // :TODO: n.b.: sets mChild
   310     mLink = link;
   311     return true;
   312 }
   314 bool
   315 MessageChannel::Open(MessageChannel *aTargetChan, MessageLoop *aTargetLoop, Side aSide)
   316 {
   317     // Opens a connection to another thread in the same process.
   319     //  This handshake proceeds as follows:
   320     //  - Let A be the thread initiating the process (either child or parent)
   321     //    and B be the other thread.
   322     //  - A spawns thread for B, obtaining B's message loop
   323     //  - A creates ProtocolChild and ProtocolParent instances.
   324     //    Let PA be the one appropriate to A and PB the side for B.
   325     //  - A invokes PA->Open(PB, ...):
   326     //    - set state to mChannelOpening
   327     //    - this will place a work item in B's worker loop (see next bullet)
   328     //      and then spins until PB->mChannelState becomes mChannelConnected
   329     //    - meanwhile, on PB's worker loop, the work item is removed and:
   330     //      - invokes PB->SlaveOpen(PA, ...):
   331     //        - sets its state and that of PA to Connected
   332     NS_PRECONDITION(aTargetChan, "Need a target channel");
   333     NS_PRECONDITION(ChannelClosed == mChannelState, "Not currently closed");
   335     CommonThreadOpenInit(aTargetChan, aSide);
   337     Side oppSide = UnknownSide;
   338     switch(aSide) {
   339       case ChildSide: oppSide = ParentSide; break;
   340       case ParentSide: oppSide = ChildSide; break;
   341       case UnknownSide: break;
   342     }
   344     mMonitor = new RefCountedMonitor();
   346     MonitorAutoLock lock(*mMonitor);
   347     mChannelState = ChannelOpening;
   348     aTargetLoop->PostTask(
   349         FROM_HERE,
   350         NewRunnableMethod(aTargetChan, &MessageChannel::OnOpenAsSlave, this, oppSide));
   352     while (ChannelOpening == mChannelState)
   353         mMonitor->Wait();
   354     NS_ASSERTION(ChannelConnected == mChannelState, "not connected when awoken");
   355     return (ChannelConnected == mChannelState);
   356 }
   358 void
   359 MessageChannel::OnOpenAsSlave(MessageChannel *aTargetChan, Side aSide)
   360 {
   361     // Invoked when the other side has begun the open.
   362     NS_PRECONDITION(ChannelClosed == mChannelState,
   363                     "Not currently closed");
   364     NS_PRECONDITION(ChannelOpening == aTargetChan->mChannelState,
   365                     "Target channel not in the process of opening");
   367     CommonThreadOpenInit(aTargetChan, aSide);
   368     mMonitor = aTargetChan->mMonitor;
   370     MonitorAutoLock lock(*mMonitor);
   371     NS_ASSERTION(ChannelOpening == aTargetChan->mChannelState,
   372                  "Target channel not in the process of opening");
   373     mChannelState = ChannelConnected;
   374     aTargetChan->mChannelState = ChannelConnected;
   375     aTargetChan->mMonitor->Notify();
   376 }
   378 void
   379 MessageChannel::CommonThreadOpenInit(MessageChannel *aTargetChan, Side aSide)
   380 {
   381     mWorkerLoop = MessageLoop::current();
   382     mWorkerLoopID = mWorkerLoop->id();
   383     mLink = new ThreadLink(this, aTargetChan);
   384     mSide = aSide;
   385 }
   387 bool
   388 MessageChannel::Echo(Message* aMsg)
   389 {
   390     nsAutoPtr<Message> msg(aMsg);
   391     AssertWorkerThread();
   392     mMonitor->AssertNotCurrentThreadOwns();
   393     if (MSG_ROUTING_NONE == msg->routing_id()) {
   394         ReportMessageRouteError("MessageChannel::Echo");
   395         return false;
   396     }
   398     MonitorAutoLock lock(*mMonitor);
   400     if (!Connected()) {
   401         ReportConnectionError("MessageChannel");
   402         return false;
   403     }
   405     mLink->EchoMessage(msg.forget());
   406     return true;
   407 }
   409 bool
   410 MessageChannel::Send(Message* aMsg)
   411 {
   412     CxxStackFrame frame(*this, OUT_MESSAGE, aMsg);
   414     nsAutoPtr<Message> msg(aMsg);
   415     AssertWorkerThread();
   416     mMonitor->AssertNotCurrentThreadOwns();
   417     if (MSG_ROUTING_NONE == msg->routing_id()) {
   418         ReportMessageRouteError("MessageChannel::Send");
   419         return false;
   420     }
   422     MonitorAutoLock lock(*mMonitor);
   423     if (!Connected()) {
   424         ReportConnectionError("MessageChannel");
   425         return false;
   426     }
   427     mLink->SendMessage(msg.forget());
   428     return true;
   429 }
   431 bool
   432 MessageChannel::MaybeInterceptSpecialIOMessage(const Message& aMsg)
   433 {
   434     AssertLinkThread();
   435     mMonitor->AssertCurrentThreadOwns();
   437     if (MSG_ROUTING_NONE == aMsg.routing_id() &&
   438         GOODBYE_MESSAGE_TYPE == aMsg.type())
   439     {
   440         // :TODO: Sort out Close() on this side racing with Close() on the
   441         // other side
   442         mChannelState = ChannelClosing;
   443         if (LoggingEnabled()) {
   444             printf("NOTE: %s process received `Goodbye', closing down\n",
   445                    (mSide == ChildSide) ? "child" : "parent");
   446         }
   447         return true;
   448     }
   449     return false;
   450 }
   452 void
   453 MessageChannel::OnMessageReceivedFromLink(const Message& aMsg)
   454 {
   455     AssertLinkThread();
   456     mMonitor->AssertCurrentThreadOwns();
   458     if (MaybeInterceptSpecialIOMessage(aMsg))
   459         return;
   461     // Regardless of the Interrupt stack, if we're awaiting a sync or urgent reply,
   462     // we know that it needs to be immediately handled to unblock us.
   463     if ((AwaitingSyncReply() && aMsg.is_sync()) ||
   464         (AwaitingUrgentReply() && aMsg.is_urgent()) ||
   465         (AwaitingRPCReply() && aMsg.is_rpc()))
   466     {
   467         mRecvd = new Message(aMsg);
   468         NotifyWorkerThread();
   469         return;
   470     }
   472     // Urgent messages cannot be compressed.
   473     MOZ_ASSERT(!aMsg.compress() || !aMsg.is_urgent());
   475     bool compress = (aMsg.compress() && !mPending.empty() &&
   476                      mPending.back().type() == aMsg.type() &&
   477                      mPending.back().routing_id() == aMsg.routing_id());
   478     if (compress) {
   479         // This message type has compression enabled, and the back of the
   480         // queue was the same message type and routed to the same destination.
   481         // Replace it with the newer message.
   482         MOZ_ASSERT(mPending.back().compress());
   483         mPending.pop_back();
   484     }
   486     bool shouldWakeUp = AwaitingInterruptReply() ||
   487                         // Allow incoming RPCs to be processed inside an urgent message.
   488                         (AwaitingUrgentReply() && aMsg.is_rpc()) ||
   489                         // Always process urgent messages while blocked.
   490                         ((AwaitingSyncReply() || AwaitingRPCReply()) && aMsg.is_urgent());
   492     // There are four cases we're concerned about, relating to the state of the
   493     // main thread:
   494     //
   495     // (1) We are waiting on a sync|rpc reply - main thread is blocked on the
   496     //     IPC monitor.
   497     //   - If the message is high priority, we wake up the main thread to
   498     //     deliver the message. Otherwise, we leave it in the mPending queue,
   499     //     posting a task to the main event loop, where it will be processed
   500     //     once the synchronous reply has been received.
   501     //
   502     // (2) We are waiting on an Interrupt reply - main thread is blocked on the
   503     //     IPC monitor.
   504     //   - Always notify and wake up the main thread.
   505     //
   506     // (3) We are not waiting on a reply.
   507     //   - We post a task to the main event loop.
   508     //
   509     // Note that, we may notify the main thread even though the monitor is not
   510     // blocked. This is okay, since we always check for pending events before
   511     // blocking again.
   513     if (shouldWakeUp && (AwaitingUrgentReply() && aMsg.is_rpc())) {
   514         // If we're receiving an RPC message while blocked on an urgent message,
   515         // we must defer any messages that were not sent as part of the child
   516         // answering the urgent message.
   517         //
   518         // We must also be sure that we will not accidentally defer any RPC
   519         // message that was sent while answering an urgent message. Otherwise,
   520         // we will deadlock.
   521         //
   522         // On the parent side, the current transaction can only transition from 0
   523         // to an ID, either by us issuing an urgent request while not blocked, or
   524         // by receiving an RPC request while not blocked. When we unblock, the
   525         // current transaction is reset to 0.
   526         //
   527         // When the child side receives an urgent message, any RPC messages sent
   528         // before issuing the urgent reply will carry the urgent message's
   529         // transaction ID.
   530         //
   531         // Since AwaitingUrgentReply() implies we are blocked, it also implies
   532         // that we are within a transaction that will not change until we are
   533         // completely unblocked (i.e, the transaction has completed).
   534         if (aMsg.transaction_id() != mCurrentRPCTransaction)
   535             shouldWakeUp = false;
   536     }
   538     if (aMsg.is_urgent()) {
   539         MOZ_ASSERT(!mPendingUrgentRequest);
   540         mPendingUrgentRequest = new Message(aMsg);
   541     } else if (aMsg.is_rpc() && shouldWakeUp) {
   542         // Only use this slot if we need to wake up for an RPC call. Otherwise
   543         // we treat it like a normal async or sync message.
   544         MOZ_ASSERT(!mPendingRPCCall);
   545         mPendingRPCCall = new Message(aMsg);
   546     } else {
   547         mPending.push_back(aMsg);
   548     }
   550     if (shouldWakeUp) {
   551         // Always wake up Interrupt waiters, sync waiters for urgent messages,
   552         // RPC waiters for urgent messages, and urgent waiters for RPCs in the
   553         // same transaction.
   554         NotifyWorkerThread();
   555     } else {
   556         // Worker thread is either not blocked on a reply, or this is an
   557         // incoming Interrupt that raced with outgoing sync, and needs to be
   558         // deferred to a later event-loop iteration.
   559         if (!compress) {
   560             // If we compressed away the previous message, we'll re-use
   561             // its pending task.
   562             mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
   563         }
   564     }
   565 }
   567 bool
   568 MessageChannel::Send(Message* aMsg, Message* aReply)
   569 {
   570     // Sanity checks.
   571     AssertWorkerThread();
   572     mMonitor->AssertNotCurrentThreadOwns();
   574 #ifdef OS_WIN
   575     SyncStackFrame frame(this, false);
   576 #endif
   578     CxxStackFrame f(*this, OUT_MESSAGE, aMsg);
   580     MonitorAutoLock lock(*mMonitor);
   582     IPC_ASSERT(aMsg->is_sync(), "can only Send() sync messages here");
   583     IPC_ASSERT(!DispatchingSyncMessage(), "violation of sync handler invariant");
   584     IPC_ASSERT(!DispatchingUrgentMessage(), "sync messages forbidden while handling urgent message");
   585     IPC_ASSERT(!AwaitingSyncReply(), "nested sync messages are not supported");
   587     AutoEnterPendingReply replies(mPendingSyncReplies);
   588     if (!SendAndWait(aMsg, aReply))
   589         return false;
   591     NS_ABORT_IF_FALSE(aReply->is_sync(), "reply is not sync");
   592     return true;
   593 }
   595 bool
   596 MessageChannel::UrgentCall(Message* aMsg, Message* aReply)
   597 {
   598     AssertWorkerThread();
   599     mMonitor->AssertNotCurrentThreadOwns();
   600     IPC_ASSERT(mSide == ParentSide, "cannot send urgent requests from child");
   602 #ifdef OS_WIN
   603     SyncStackFrame frame(this, false);
   604 #endif
   606     CxxStackFrame f(*this, OUT_MESSAGE, aMsg);
   608     MonitorAutoLock lock(*mMonitor);
   610     IPC_ASSERT(!AwaitingInterruptReply(), "urgent calls cannot be issued within Interrupt calls");
   611     IPC_ASSERT(!AwaitingSyncReply(), "urgent calls cannot be issued within sync sends");
   613     AutoEnterRPCTransaction transact(this);
   614     aMsg->set_transaction_id(mCurrentRPCTransaction);
   616     AutoEnterPendingReply replies(mPendingUrgentReplies);
   617     if (!SendAndWait(aMsg, aReply))
   618         return false;
   620     NS_ABORT_IF_FALSE(aReply->is_urgent(), "reply is not urgent");
   621     return true;
   622 }
   624 bool
   625 MessageChannel::RPCCall(Message* aMsg, Message* aReply)
   626 {
   627     AssertWorkerThread();
   628     mMonitor->AssertNotCurrentThreadOwns();
   629     IPC_ASSERT(mSide == ChildSide, "cannot send rpc messages from parent");
   631 #ifdef OS_WIN
   632     SyncStackFrame frame(this, false);
   633 #endif
   635     CxxStackFrame f(*this, OUT_MESSAGE, aMsg);
   637     MonitorAutoLock lock(*mMonitor);
   639     AutoEnterRPCTransaction transact(this);
   640     aMsg->set_transaction_id(mCurrentRPCTransaction);
   642     AutoEnterPendingReply replies(mPendingRPCReplies);
   643     if (!SendAndWait(aMsg, aReply))
   644         return false;
   646     NS_ABORT_IF_FALSE(aReply->is_rpc(), "expected rpc reply");
   647     return true;
   648 }
   650 bool
   651 MessageChannel::SendAndWait(Message* aMsg, Message* aReply)
   652 {
   653     mMonitor->AssertCurrentThreadOwns();
   655     nsAutoPtr<Message> msg(aMsg);
   657     if (!Connected()) {
   658         ReportConnectionError("MessageChannel::SendAndWait");
   659         return false;
   660     }
   662     msg->set_seqno(NextSeqno());
   664     DebugOnly<int32_t> replySeqno = msg->seqno();
   665     DebugOnly<msgid_t> replyType = msg->type() + 1;
   667     mLink->SendMessage(msg.forget());
   669     while (true) {
   670         // Wait for an event to occur.
   671         while (true) {
   672             if (mRecvd || mPendingUrgentRequest || mPendingRPCCall)
   673                 break;
   675             bool maybeTimedOut = !WaitForSyncNotify();
   677             if (!Connected()) {
   678                 ReportConnectionError("MessageChannel::SendAndWait");
   679                 return false;
   680             }
   682             if (maybeTimedOut && !ShouldContinueFromTimeout())
   683                 return false;
   684         }
   686         if (mPendingUrgentRequest && !ProcessPendingUrgentRequest())
   687             return false;
   689         if (mPendingRPCCall && !ProcessPendingRPCCall())
   690             return false;
   692         if (mRecvd) {
   693             NS_ABORT_IF_FALSE(mRecvd->is_reply(), "expected reply");
   695             if (mRecvd->is_reply_error()) {
   696                 mRecvd = nullptr;
   697                 return false;
   698             }
   700             NS_ABORT_IF_FALSE(mRecvd->type() == replyType, "wrong reply type");
   701             NS_ABORT_IF_FALSE(mRecvd->seqno() == replySeqno, "wrong sequence number");
   703             *aReply = *mRecvd;
   704             mRecvd = nullptr;
   705             return true;
   706         }
   707     }
   709     return true;
   710 }
   712 bool
   713 MessageChannel::Call(Message* aMsg, Message* aReply)
   714 {
   715     if (aMsg->is_urgent())
   716         return UrgentCall(aMsg, aReply);
   717     if (aMsg->is_rpc())
   718         return RPCCall(aMsg, aReply);
   719     return InterruptCall(aMsg, aReply);
   720 }
   722 bool
   723 MessageChannel::InterruptCall(Message* aMsg, Message* aReply)
   724 {
   725     AssertWorkerThread();
   726     mMonitor->AssertNotCurrentThreadOwns();
   728 #ifdef OS_WIN
   729     SyncStackFrame frame(this, true);
   730 #endif
   732     // This must come before MonitorAutoLock, as its destructor acquires the
   733     // monitor lock.
   734     CxxStackFrame cxxframe(*this, OUT_MESSAGE, aMsg);
   736     MonitorAutoLock lock(*mMonitor);
   737     if (!Connected()) {
   738         ReportConnectionError("MessageChannel::Call");
   739         return false;
   740     }
   742     // Sanity checks.
   743     IPC_ASSERT(!AwaitingSyncReply() && !AwaitingUrgentReply(),
   744                "cannot issue Interrupt call whiel blocked on sync or urgent");
   745     IPC_ASSERT(!DispatchingSyncMessage() || aMsg->priority() == IPC::Message::PRIORITY_HIGH,
   746                "violation of sync handler invariant");
   747     IPC_ASSERT(aMsg->is_interrupt(), "can only Call() Interrupt messages here");
   750     nsAutoPtr<Message> msg(aMsg);
   752     msg->set_seqno(NextSeqno());
   753     msg->set_interrupt_remote_stack_depth_guess(mRemoteStackDepthGuess);
   754     msg->set_interrupt_local_stack_depth(1 + InterruptStackDepth());
   755     mInterruptStack.push(*msg);
   756     mLink->SendMessage(msg.forget());
   758     while (true) {
   759         // if a handler invoked by *Dispatch*() spun a nested event
   760         // loop, and the connection was broken during that loop, we
   761         // might have already processed the OnError event. if so,
   762         // trying another loop iteration will be futile because
   763         // channel state will have been cleared
   764         if (!Connected()) {
   765             ReportConnectionError("MessageChannel::InterruptCall");
   766             return false;
   767         }
   769         // Now might be the time to process a message deferred because of race
   770         // resolution.
   771         MaybeUndeferIncall();
   773         // Wait for an event to occur.
   774         while (!InterruptEventOccurred()) {
   775             bool maybeTimedOut = !WaitForInterruptNotify();
   777             // We might have received a "subtly deferred" message in a nested
   778             // loop that it's now time to process.
   779             if (InterruptEventOccurred() ||
   780                 (!maybeTimedOut && (!mDeferred.empty() || !mOutOfTurnReplies.empty())))
   781             {
   782                 break;
   783             }
   785             if (maybeTimedOut && !ShouldContinueFromTimeout())
   786                 return false;
   787         }
   789         Message recvd;
   790         MessageMap::iterator it;
   792         if (mPendingUrgentRequest) {
   793             recvd = *mPendingUrgentRequest;
   794             mPendingUrgentRequest = nullptr;
   795         } else if (mPendingRPCCall) {
   796             recvd = *mPendingRPCCall;
   797             mPendingRPCCall = nullptr;
   798         } else if ((it = mOutOfTurnReplies.find(mInterruptStack.top().seqno()))
   799                     != mOutOfTurnReplies.end())
   800         {
   801             recvd = it->second;
   802             mOutOfTurnReplies.erase(it);
   803         } else if (!mPending.empty()) {
   804             recvd = mPending.front();
   805             mPending.pop_front();
   806         } else {
   807             // because of subtleties with nested event loops, it's possible
   808             // that we got here and nothing happened.  or, we might have a
   809             // deferred in-call that needs to be processed.  either way, we
   810             // won't break the inner while loop again until something new
   811             // happens.
   812             continue;
   813         }
   815         // If the message is not Interrupt, we can dispatch it as normal.
   816         if (!recvd.is_interrupt()) {
   817             // Other side should be blocked.
   818             IPC_ASSERT(!recvd.is_sync() || mPending.empty(), "other side should be blocked");
   820             {
   821                 AutoEnterRPCTransaction transaction(this, &recvd);
   822                 MonitorAutoUnlock unlock(*mMonitor);
   823                 CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
   824                 DispatchMessage(recvd);
   825             }
   826             if (!Connected()) {
   827                 ReportConnectionError("MessageChannel::DispatchMessage");
   828                 return false;
   829             }
   830             continue;
   831         }
   833         // If the message is an Interrupt reply, either process it as a reply to our
   834         // call, or add it to the list of out-of-turn replies we've received.
   835         if (recvd.is_reply()) {
   836             IPC_ASSERT(!mInterruptStack.empty(), "invalid Interrupt stack");
   838             // If this is not a reply the call we've initiated, add it to our
   839             // out-of-turn replies and keep polling for events.
   840             {
   841                 const Message &outcall = mInterruptStack.top();
   843                 // Note, In the parent, sequence numbers increase from 0, and
   844                 // in the child, they decrease from 0.
   845                 if ((mSide == ChildSide && recvd.seqno() > outcall.seqno()) ||
   846                     (mSide != ChildSide && recvd.seqno() < outcall.seqno()))
   847                 {
   848                     mOutOfTurnReplies[recvd.seqno()] = recvd;
   849                     continue;
   850                 }
   852                 IPC_ASSERT(recvd.is_reply_error() ||
   853                            (recvd.type() == (outcall.type() + 1) &&
   854                             recvd.seqno() == outcall.seqno()),
   855                            "somebody's misbehavin'", true);
   856             }
   858             // We received a reply to our most recent outstanding call. Pop
   859             // this frame and return the reply.
   860             mInterruptStack.pop();
   862             if (!recvd.is_reply_error()) {
   863                 *aReply = recvd;
   864             }
   866             // If we have no more pending out calls waiting on replies, then
   867             // the reply queue should be empty.
   868             IPC_ASSERT(!mInterruptStack.empty() || mOutOfTurnReplies.empty(),
   869                        "still have pending replies with no pending out-calls",
   870                        true);
   872             return !recvd.is_reply_error();
   873         }
   875         // Dispatch an Interrupt in-call. Snapshot the current stack depth while we
   876         // own the monitor.
   877         size_t stackDepth = InterruptStackDepth();
   878         {
   879             MonitorAutoUnlock unlock(*mMonitor);
   881             CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
   882             DispatchInterruptMessage(recvd, stackDepth);
   883         }
   884         if (!Connected()) {
   885             ReportConnectionError("MessageChannel::DispatchInterruptMessage");
   886             return false;
   887         }
   888     }
   890     return true;
   891 }
   893 bool
   894 MessageChannel::InterruptEventOccurred()
   895 {
   896     AssertWorkerThread();
   897     mMonitor->AssertCurrentThreadOwns();
   898     IPC_ASSERT(InterruptStackDepth() > 0, "not in wait loop");
   900     return (!Connected() ||
   901             !mPending.empty() ||
   902             mPendingUrgentRequest ||
   903             mPendingRPCCall ||
   904             (!mOutOfTurnReplies.empty() &&
   905              mOutOfTurnReplies.find(mInterruptStack.top().seqno()) !=
   906              mOutOfTurnReplies.end()));
   907 }
   909 bool
   910 MessageChannel::ProcessPendingUrgentRequest()
   911 {
   912     AssertWorkerThread();
   913     mMonitor->AssertCurrentThreadOwns();
   915     // Note that it is possible we could have sent a sync message at
   916     // the same time the parent process sent an urgent message, and
   917     // therefore mPendingUrgentRequest is set *and* mRecvd is set as
   918     // well, because the link thread received both before the worker
   919     // thread woke up.
   920     //
   921     // In this case, we process the urgent message first, but we need
   922     // to save the reply.
   923     nsAutoPtr<Message> savedReply(mRecvd.forget());
   925     // We're the child process. We should not be receiving RPC calls.
   926     IPC_ASSERT(!mPendingRPCCall, "unexpected RPC call");
   928     nsAutoPtr<Message> recvd(mPendingUrgentRequest.forget());
   929     {
   930         // In order to send the parent RPC messages and guarantee it will
   931         // wake up, we must re-use its transaction.
   932         AutoEnterRPCTransaction transaction(this, recvd);
   934         MonitorAutoUnlock unlock(*mMonitor);
   935         DispatchUrgentMessage(*recvd);
   936     }
   937     if (!Connected()) {
   938         ReportConnectionError("MessageChannel::DispatchUrgentMessage");
   939         return false;
   940     }
   942     // In between having dispatched our reply to the parent process, and
   943     // re-acquiring the monitor, the parent process could have already
   944     // processed that reply and sent the reply to our sync message. If so,
   945     // our saved reply should be empty.
   946     IPC_ASSERT(!mRecvd || !savedReply, "unknown reply");
   947     if (!mRecvd)
   948         mRecvd = savedReply.forget();
   949     return true;
   950 }
   952 bool
   953 MessageChannel::ProcessPendingRPCCall()
   954 {
   955     AssertWorkerThread();
   956     mMonitor->AssertCurrentThreadOwns();
   958     // See comment above re: mRecvd replies and incoming calls.
   959     nsAutoPtr<Message> savedReply(mRecvd.forget());
   961     IPC_ASSERT(!mPendingUrgentRequest, "unexpected urgent message");
   963     nsAutoPtr<Message> recvd(mPendingRPCCall.forget());
   964     {
   965         // If we are not currently in a transaction, this will begin one,
   966         // and the link thread will not wake us up for any RPC messages not
   967         // apart of this transaction. If we are already in a transaction,
   968         // then this will assert that we're still in the same transaction.
   969         AutoEnterRPCTransaction transaction(this, recvd);
   971         MonitorAutoUnlock unlock(*mMonitor);
   972         DispatchRPCMessage(*recvd);
   973     }
   974     if (!Connected()) {
   975         ReportConnectionError("MessageChannel::DispatchRPCMessage");
   976         return false;
   977     }
   979     // In between having dispatched our reply to the parent process, and
   980     // re-acquiring the monitor, the parent process could have already
   981     // processed that reply and sent the reply to our sync message. If so,
   982     // our saved reply should be empty.
   983     IPC_ASSERT(!mRecvd || !savedReply, "unknown reply");
   984     if (!mRecvd)
   985         mRecvd = savedReply.forget();
   986     return true;
   987 }
   989 bool
   990 MessageChannel::DequeueOne(Message *recvd)
   991 {
   992     AssertWorkerThread();
   993     mMonitor->AssertCurrentThreadOwns();
   995     if (!Connected()) {
   996         ReportConnectionError("OnMaybeDequeueOne");
   997         return false;
   998     }
  1000     if (mPendingUrgentRequest) {
  1001         *recvd = *mPendingUrgentRequest;
  1002         mPendingUrgentRequest = nullptr;
  1003         return true;
  1006     if (mPendingRPCCall) {
  1007         *recvd = *mPendingRPCCall;
  1008         mPendingRPCCall = nullptr;
  1009         return true;
  1012     if (!mDeferred.empty())
  1013         MaybeUndeferIncall();
  1015     if (mPending.empty())
  1016         return false;
  1018     *recvd = mPending.front();
  1019     mPending.pop_front();
  1020     return true;
  1023 bool
  1024 MessageChannel::OnMaybeDequeueOne()
  1026     AssertWorkerThread();
  1027     mMonitor->AssertNotCurrentThreadOwns();
  1029     Message recvd;
  1031     MonitorAutoLock lock(*mMonitor);
  1032     if (!DequeueOne(&recvd))
  1033         return false;
  1035     if (IsOnCxxStack() && recvd.is_interrupt() && recvd.is_reply()) {
  1036         // We probably just received a reply in a nested loop for an
  1037         // Interrupt call sent before entering that loop.
  1038         mOutOfTurnReplies[recvd.seqno()] = recvd;
  1039         return false;
  1043         // We should not be in a transaction yet if we're not blocked.
  1044         MOZ_ASSERT(mCurrentRPCTransaction == 0);
  1045         AutoEnterRPCTransaction transaction(this, &recvd);
  1047         MonitorAutoUnlock unlock(*mMonitor);
  1049         CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
  1050         DispatchMessage(recvd);
  1052     return true;
  1055 void
  1056 MessageChannel::DispatchMessage(const Message &aMsg)
  1058     if (aMsg.is_sync())
  1059         DispatchSyncMessage(aMsg);
  1060     else if (aMsg.is_urgent())
  1061         DispatchUrgentMessage(aMsg);
  1062     else if (aMsg.is_interrupt())
  1063         DispatchInterruptMessage(aMsg, 0);
  1064     else if (aMsg.is_rpc())
  1065         DispatchRPCMessage(aMsg);
  1066     else
  1067         DispatchAsyncMessage(aMsg);
  1070 void
  1071 MessageChannel::DispatchSyncMessage(const Message& aMsg)
  1073     AssertWorkerThread();
  1075     Message *reply = nullptr;
  1077     mDispatchingSyncMessage = true;
  1078     Result rv = mListener->OnMessageReceived(aMsg, reply);
  1079     mDispatchingSyncMessage = false;
  1081     if (!MaybeHandleError(rv, "DispatchSyncMessage")) {
  1082         delete reply;
  1083         reply = new Message();
  1084         reply->set_sync();
  1085         reply->set_reply();
  1086         reply->set_reply_error();
  1088     reply->set_seqno(aMsg.seqno());
  1090     MonitorAutoLock lock(*mMonitor);
  1091     if (ChannelConnected == mChannelState)
  1092         mLink->SendMessage(reply);
  1095 void
  1096 MessageChannel::DispatchUrgentMessage(const Message& aMsg)
  1098     AssertWorkerThread();
  1099     MOZ_ASSERT(aMsg.is_urgent());
  1101     Message *reply = nullptr;
  1103     mDispatchingUrgentMessageCount++;
  1104     Result rv = mListener->OnCallReceived(aMsg, reply);
  1105     mDispatchingUrgentMessageCount--;
  1107     if (!MaybeHandleError(rv, "DispatchUrgentMessage")) {
  1108         delete reply;
  1109         reply = new Message();
  1110         reply->set_urgent();
  1111         reply->set_reply();
  1112         reply->set_reply_error();
  1114     reply->set_seqno(aMsg.seqno());
  1116     MonitorAutoLock lock(*mMonitor);
  1117     if (ChannelConnected == mChannelState)
  1118         mLink->SendMessage(reply);
  1121 void
  1122 MessageChannel::DispatchRPCMessage(const Message& aMsg)
  1124     AssertWorkerThread();
  1125     MOZ_ASSERT(aMsg.is_rpc());
  1127     Message *reply = nullptr;
  1129     if (!MaybeHandleError(mListener->OnCallReceived(aMsg, reply), "DispatchRPCMessage")) {
  1130         delete reply;
  1131         reply = new Message();
  1132         reply->set_rpc();
  1133         reply->set_reply();
  1134         reply->set_reply_error();
  1136     reply->set_seqno(aMsg.seqno());
  1138     MonitorAutoLock lock(*mMonitor);
  1139     if (ChannelConnected == mChannelState)
  1140         mLink->SendMessage(reply);
  1143 void
  1144 MessageChannel::DispatchAsyncMessage(const Message& aMsg)
  1146     AssertWorkerThread();
  1147     MOZ_ASSERT(!aMsg.is_interrupt() && !aMsg.is_sync() && !aMsg.is_urgent());
  1149     if (aMsg.routing_id() == MSG_ROUTING_NONE) {
  1150         NS_RUNTIMEABORT("unhandled special message!");
  1153     MaybeHandleError(mListener->OnMessageReceived(aMsg), "DispatchAsyncMessage");
  1156 void
  1157 MessageChannel::DispatchInterruptMessage(const Message& aMsg, size_t stackDepth)
  1159     AssertWorkerThread();
  1160     mMonitor->AssertNotCurrentThreadOwns();
  1162     IPC_ASSERT(aMsg.is_interrupt() && !aMsg.is_reply(), "wrong message type");
  1164     // Race detection: see the long comment near mRemoteStackDepthGuess in
  1165     // MessageChannel.h. "Remote" stack depth means our side, and "local" means
  1166     // the other side.
  1167     if (aMsg.interrupt_remote_stack_depth_guess() != RemoteViewOfStackDepth(stackDepth)) {
  1168         // Interrupt in-calls have raced. The winner, if there is one, gets to defer
  1169         // processing of the other side's in-call.
  1170         bool defer;
  1171         const char* winner;
  1172         switch (mListener->MediateInterruptRace((mSide == ChildSide) ? aMsg : mInterruptStack.top(),
  1173                                           (mSide != ChildSide) ? mInterruptStack.top() : aMsg))
  1175           case RIPChildWins:
  1176             winner = "child";
  1177             defer = (mSide == ChildSide);
  1178             break;
  1179           case RIPParentWins:
  1180             winner = "parent";
  1181             defer = (mSide != ChildSide);
  1182             break;
  1183           case RIPError:
  1184             NS_RUNTIMEABORT("NYI: 'Error' Interrupt race policy");
  1185             return;
  1186           default:
  1187             NS_RUNTIMEABORT("not reached");
  1188             return;
  1191         if (LoggingEnabled()) {
  1192             printf_stderr("  (%s: %s won, so we're%sdeferring)\n",
  1193                           (mSide == ChildSide) ? "child" : "parent",
  1194                           winner,
  1195                           defer ? " " : " not ");
  1198         if (defer) {
  1199             // We now know the other side's stack has one more frame
  1200             // than we thought.
  1201             ++mRemoteStackDepthGuess; // decremented in MaybeProcessDeferred()
  1202             mDeferred.push(aMsg);
  1203             return;
  1206         // We "lost" and need to process the other side's in-call. Don't need
  1207         // to fix up the mRemoteStackDepthGuess here, because we're just about
  1208         // to increment it in DispatchCall(), which will make it correct again.
  1211 #ifdef OS_WIN
  1212     SyncStackFrame frame(this, true);
  1213 #endif
  1215     Message* reply = nullptr;
  1217     ++mRemoteStackDepthGuess;
  1218     Result rv = mListener->OnCallReceived(aMsg, reply);
  1219     --mRemoteStackDepthGuess;
  1221     if (!MaybeHandleError(rv, "DispatchInterruptMessage")) {
  1222         delete reply;
  1223         reply = new Message();
  1224         reply->set_interrupt();
  1225         reply->set_reply();
  1226         reply->set_reply_error();
  1228     reply->set_seqno(aMsg.seqno());
  1230     MonitorAutoLock lock(*mMonitor);
  1231     if (ChannelConnected == mChannelState)
  1232         mLink->SendMessage(reply);
  1235 void
  1236 MessageChannel::MaybeUndeferIncall()
  1238     AssertWorkerThread();
  1239     mMonitor->AssertCurrentThreadOwns();
  1241     if (mDeferred.empty())
  1242         return;
  1244     size_t stackDepth = InterruptStackDepth();
  1246     // the other side can only *under*-estimate our actual stack depth
  1247     IPC_ASSERT(mDeferred.top().interrupt_remote_stack_depth_guess() <= stackDepth,
  1248                "fatal logic error");
  1250     if (mDeferred.top().interrupt_remote_stack_depth_guess() < RemoteViewOfStackDepth(stackDepth))
  1251         return;
  1253     // maybe time to process this message
  1254     Message call = mDeferred.top();
  1255     mDeferred.pop();
  1257     // fix up fudge factor we added to account for race
  1258     IPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error");
  1259     --mRemoteStackDepthGuess;
  1261     mPending.push_back(call);
  1264 void
  1265 MessageChannel::FlushPendingInterruptQueue()
  1267     AssertWorkerThread();
  1268     mMonitor->AssertNotCurrentThreadOwns();
  1271         MonitorAutoLock lock(*mMonitor);
  1273         if (mDeferred.empty()) {
  1274             if (mPending.empty())
  1275                 return;
  1277             const Message& last = mPending.back();
  1278             if (!last.is_interrupt() || last.is_reply())
  1279                 return;
  1283     while (OnMaybeDequeueOne());
  1286 void
  1287 MessageChannel::ExitedCxxStack()
  1289     mListener->OnExitedCxxStack();
  1290     if (mSawInterruptOutMsg) {
  1291         MonitorAutoLock lock(*mMonitor);
  1292         // see long comment in OnMaybeDequeueOne()
  1293         EnqueuePendingMessages();
  1294         mSawInterruptOutMsg = false;
  1298 void
  1299 MessageChannel::EnqueuePendingMessages()
  1301     AssertWorkerThread();
  1302     mMonitor->AssertCurrentThreadOwns();
  1304     MaybeUndeferIncall();
  1306     for (size_t i = 0; i < mDeferred.size(); ++i) {
  1307         mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
  1310     // XXX performance tuning knob: could process all or k pending
  1311     // messages here, rather than enqueuing for later processing
  1313     for (size_t i = 0; i < mPending.size(); ++i) {
  1314         mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
  1318 static inline bool
  1319 IsTimeoutExpired(PRIntervalTime aStart, PRIntervalTime aTimeout)
  1321     return (aTimeout != PR_INTERVAL_NO_TIMEOUT) &&
  1322            (aTimeout <= (PR_IntervalNow() - aStart));
  1325 bool
  1326 MessageChannel::WaitResponse(bool aWaitTimedOut)
  1328     if (aWaitTimedOut) {
  1329         if (mInTimeoutSecondHalf) {
  1330             // We've really timed out this time.
  1331             return false;
  1333         // Try a second time.
  1334         mInTimeoutSecondHalf = true;
  1335     } else {
  1336         mInTimeoutSecondHalf = false;
  1338     return true;
  1341 #ifndef OS_WIN
  1342 bool
  1343 MessageChannel::WaitForSyncNotify()
  1345     PRIntervalTime timeout = (kNoTimeout == mTimeoutMs) ?
  1346                              PR_INTERVAL_NO_TIMEOUT :
  1347                              PR_MillisecondsToInterval(mTimeoutMs);
  1348     // XXX could optimize away this syscall for "no timeout" case if desired
  1349     PRIntervalTime waitStart = PR_IntervalNow();
  1351     mMonitor->Wait(timeout);
  1353     // If the timeout didn't expire, we know we received an event. The
  1354     // converse is not true.
  1355     return WaitResponse(IsTimeoutExpired(waitStart, timeout));
  1358 bool
  1359 MessageChannel::WaitForInterruptNotify()
  1361     return WaitForSyncNotify();
  1364 void
  1365 MessageChannel::NotifyWorkerThread()
  1367     mMonitor->Notify();
  1369 #endif
  1371 bool
  1372 MessageChannel::ShouldContinueFromTimeout()
  1374     AssertWorkerThread();
  1375     mMonitor->AssertCurrentThreadOwns();
  1377     bool cont;
  1379         MonitorAutoUnlock unlock(*mMonitor);
  1380         cont = mListener->OnReplyTimeout();
  1383     static enum { UNKNOWN, NOT_DEBUGGING, DEBUGGING } sDebuggingChildren = UNKNOWN;
  1385     if (sDebuggingChildren == UNKNOWN) {
  1386         sDebuggingChildren = getenv("MOZ_DEBUG_CHILD_PROCESS") ? DEBUGGING : NOT_DEBUGGING;
  1388     if (sDebuggingChildren == DEBUGGING) {
  1389         return true;
  1392     if (!cont) {
  1393         // NB: there's a sublety here.  If parents were allowed to send sync
  1394         // messages to children, then it would be possible for this
  1395         // synchronous close-on-timeout to race with async |OnMessageReceived|
  1396         // tasks arriving from the child, posted to the worker thread's event
  1397         // loop.  This would complicate cleanup of the *Channel.  But since
  1398         // IPDL forbids this (and since it doesn't support children timing out
  1399         // on parents), the parent can only block on interrupt messages to the child,
  1400         // and in that case arriving async messages are enqueued to the interrupt 
  1401         // channel's special queue.  They're then ignored because the channel
  1402         // state changes to ChannelTimeout (i.e. !Connected).
  1403         SynchronouslyClose();
  1404         mChannelState = ChannelTimeout;
  1407     return cont;
  1410 void
  1411 MessageChannel::SetReplyTimeoutMs(int32_t aTimeoutMs)
  1413     // Set channel timeout value. Since this is broken up into
  1414     // two period, the minimum timeout value is 2ms.
  1415     AssertWorkerThread();
  1416     mTimeoutMs = (aTimeoutMs <= 0)
  1417                  ? kNoTimeout
  1418                  : (int32_t)ceil((double)aTimeoutMs / 2.0);
  1421 void
  1422 MessageChannel::OnChannelConnected(int32_t peer_id)
  1424     mWorkerLoop->PostTask(
  1425         FROM_HERE,
  1426         NewRunnableMethod(this,
  1427                           &MessageChannel::DispatchOnChannelConnected,
  1428                           peer_id));
  1431 void
  1432 MessageChannel::DispatchOnChannelConnected(int32_t peer_pid)
  1434     AssertWorkerThread();
  1435     if (mListener)
  1436         mListener->OnChannelConnected(peer_pid);
  1439 void
  1440 MessageChannel::ReportMessageRouteError(const char* channelName) const
  1442     PrintErrorMessage(mSide, channelName, "Need a route");
  1443     mListener->OnProcessingError(MsgRouteError);
  1446 void
  1447 MessageChannel::ReportConnectionError(const char* aChannelName) const
  1449     AssertWorkerThread();
  1450     mMonitor->AssertCurrentThreadOwns();
  1452     const char* errorMsg = nullptr;
  1453     switch (mChannelState) {
  1454       case ChannelClosed:
  1455         errorMsg = "Closed channel: cannot send/recv";
  1456         break;
  1457       case ChannelOpening:
  1458         errorMsg = "Opening channel: not yet ready for send/recv";
  1459         break;
  1460       case ChannelTimeout:
  1461         errorMsg = "Channel timeout: cannot send/recv";
  1462         break;
  1463       case ChannelClosing:
  1464         errorMsg = "Channel closing: too late to send/recv, messages will be lost";
  1465         break;
  1466       case ChannelError:
  1467         errorMsg = "Channel error: cannot send/recv";
  1468         break;
  1470       default:
  1471         NS_RUNTIMEABORT("unreached");
  1474     PrintErrorMessage(mSide, aChannelName, errorMsg);
  1476     MonitorAutoUnlock unlock(*mMonitor);
  1477     mListener->OnProcessingError(MsgDropped);
  1480 bool
  1481 MessageChannel::MaybeHandleError(Result code, const char* channelName)
  1483     if (MsgProcessed == code)
  1484         return true;
  1486     const char* errorMsg = nullptr;
  1487     switch (code) {
  1488       case MsgNotKnown:
  1489         errorMsg = "Unknown message: not processed";
  1490         break;
  1491       case MsgNotAllowed:
  1492         errorMsg = "Message not allowed: cannot be sent/recvd in this state";
  1493         break;
  1494       case MsgPayloadError:
  1495         errorMsg = "Payload error: message could not be deserialized";
  1496         break;
  1497       case MsgProcessingError:
  1498         errorMsg = "Processing error: message was deserialized, but the handler returned false (indicating failure)";
  1499         break;
  1500       case MsgRouteError:
  1501         errorMsg = "Route error: message sent to unknown actor ID";
  1502         break;
  1503       case MsgValueError:
  1504         errorMsg = "Value error: message was deserialized, but contained an illegal value";
  1505         break;
  1507     default:
  1508         NS_RUNTIMEABORT("unknown Result code");
  1509         return false;
  1512     PrintErrorMessage(mSide, channelName, errorMsg);
  1514     mListener->OnProcessingError(code);
  1516     return false;
  1519 void
  1520 MessageChannel::OnChannelErrorFromLink()
  1522     AssertLinkThread();
  1523     mMonitor->AssertCurrentThreadOwns();
  1525     if (InterruptStackDepth() > 0)
  1526         NotifyWorkerThread();
  1528     if (AwaitingSyncReply() || AwaitingRPCReply() || AwaitingUrgentReply())
  1529         NotifyWorkerThread();
  1531     if (ChannelClosing != mChannelState) {
  1532         if (mAbortOnError) {
  1533             NS_RUNTIMEABORT("Aborting on channel error.");
  1535         mChannelState = ChannelError;
  1536         mMonitor->Notify();
  1539     PostErrorNotifyTask();
  1542 void
  1543 MessageChannel::NotifyMaybeChannelError()
  1545     mMonitor->AssertNotCurrentThreadOwns();
  1547     // TODO sort out Close() on this side racing with Close() on the other side
  1548     if (ChannelClosing == mChannelState) {
  1549         // the channel closed, but we received a "Goodbye" message warning us
  1550         // about it. no worries
  1551         mChannelState = ChannelClosed;
  1552         NotifyChannelClosed();
  1553         return;
  1556     // Oops, error!  Let the listener know about it.
  1557     mChannelState = ChannelError;
  1558     mListener->OnChannelError();
  1559     Clear();
  1562 void
  1563 MessageChannel::OnNotifyMaybeChannelError()
  1565     AssertWorkerThread();
  1566     mMonitor->AssertNotCurrentThreadOwns();
  1568     mChannelErrorTask = nullptr;
  1570     // OnChannelError holds mMonitor when it posts this task and this
  1571     // task cannot be allowed to run until OnChannelError has
  1572     // exited. We enforce that order by grabbing the mutex here which
  1573     // should only continue once OnChannelError has completed.
  1575         MonitorAutoLock lock(*mMonitor);
  1576         // nothing to do here
  1579     if (IsOnCxxStack()) {
  1580         mChannelErrorTask =
  1581             NewRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError);
  1582         // 10 ms delay is completely arbitrary
  1583         mWorkerLoop->PostDelayedTask(FROM_HERE, mChannelErrorTask, 10);
  1584         return;
  1587     NotifyMaybeChannelError();
  1590 void
  1591 MessageChannel::PostErrorNotifyTask()
  1593     mMonitor->AssertCurrentThreadOwns();
  1595     if (mChannelErrorTask)
  1596         return;
  1598     // This must be the last code that runs on this thread!
  1599     mChannelErrorTask =
  1600         NewRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError);
  1601     mWorkerLoop->PostTask(FROM_HERE, mChannelErrorTask);
  1604 // Special async message.
  1605 class GoodbyeMessage : public IPC::Message
  1607 public:
  1608     GoodbyeMessage() :
  1609         IPC::Message(MSG_ROUTING_NONE, GOODBYE_MESSAGE_TYPE, PRIORITY_NORMAL)
  1612     static bool Read(const Message* msg) {
  1613         return true;
  1615     void Log(const std::string& aPrefix, FILE* aOutf) const {
  1616         fputs("(special `Goodbye' message)", aOutf);
  1618 };
  1620 void
  1621 MessageChannel::SynchronouslyClose()
  1623     AssertWorkerThread();
  1624     mMonitor->AssertCurrentThreadOwns();
  1625     mLink->SendClose();
  1626     while (ChannelClosed != mChannelState)
  1627         mMonitor->Wait();
  1630 void
  1631 MessageChannel::CloseWithError()
  1633     AssertWorkerThread();
  1635     MonitorAutoLock lock(*mMonitor);
  1636     if (ChannelConnected != mChannelState) {
  1637         return;
  1639     SynchronouslyClose();
  1640     mChannelState = ChannelError;
  1641     PostErrorNotifyTask();
  1644 void
  1645 MessageChannel::Close()
  1647     AssertWorkerThread();
  1650         MonitorAutoLock lock(*mMonitor);
  1652         if (ChannelError == mChannelState || ChannelTimeout == mChannelState) {
  1653             // See bug 538586: if the listener gets deleted while the
  1654             // IO thread's NotifyChannelError event is still enqueued
  1655             // and subsequently deletes us, then the error event will
  1656             // also be deleted and the listener will never be notified
  1657             // of the channel error.
  1658             if (mListener) {
  1659                 MonitorAutoUnlock unlock(*mMonitor);
  1660                 NotifyMaybeChannelError();
  1662             return;
  1665         if (ChannelOpening == mChannelState) {
  1666             // Mimic CloseWithError().
  1667             SynchronouslyClose();
  1668             mChannelState = ChannelError;
  1669             PostErrorNotifyTask();
  1670             return;
  1673         if (ChannelConnected != mChannelState) {
  1674             // XXX be strict about this until there's a compelling reason
  1675             // to relax
  1676             NS_RUNTIMEABORT("Close() called on closed channel!");
  1679         // notify the other side that we're about to close our socket
  1680         mLink->SendMessage(new GoodbyeMessage());
  1681         SynchronouslyClose();
  1684     NotifyChannelClosed();
  1687 void
  1688 MessageChannel::NotifyChannelClosed()
  1690     mMonitor->AssertNotCurrentThreadOwns();
  1692     if (ChannelClosed != mChannelState)
  1693         NS_RUNTIMEABORT("channel should have been closed!");
  1695     // OK, the IO thread just closed the channel normally.  Let the
  1696     // listener know about it.
  1697     mListener->OnChannelClose();
  1699     Clear();
  1702 void
  1703 MessageChannel::DebugAbort(const char* file, int line, const char* cond,
  1704                            const char* why,
  1705                            bool reply) const
  1707     printf_stderr("###!!! [MessageChannel][%s][%s:%d] "
  1708                   "Assertion (%s) failed.  %s %s\n",
  1709                   mSide == ChildSide ? "Child" : "Parent",
  1710                   file, line, cond,
  1711                   why,
  1712                   reply ? "(reply)" : "");
  1713     // technically we need the mutex for this, but we're dying anyway
  1714     DumpInterruptStack("  ");
  1715     printf_stderr("  remote Interrupt stack guess: %lu\n",
  1716                   mRemoteStackDepthGuess);
  1717     printf_stderr("  deferred stack size: %lu\n",
  1718                   mDeferred.size());
  1719     printf_stderr("  out-of-turn Interrupt replies stack size: %lu\n",
  1720                   mOutOfTurnReplies.size());
  1721     printf_stderr("  Pending queue size: %lu, front to back:\n",
  1722                   mPending.size());
  1724     MessageQueue pending = mPending;
  1725     while (!pending.empty()) {
  1726         printf_stderr("    [ %s%s ]\n",
  1727                       pending.front().is_interrupt() ? "intr" :
  1728                       (pending.front().is_sync() ? "sync" : "async"),
  1729                       pending.front().is_reply() ? "reply" : "");
  1730         pending.pop_front();
  1733     NS_RUNTIMEABORT(why);
  1736 void
  1737 MessageChannel::DumpInterruptStack(const char* const pfx) const
  1739     NS_WARN_IF_FALSE(MessageLoop::current() != mWorkerLoop,
  1740                      "The worker thread had better be paused in a debugger!");
  1742     printf_stderr("%sMessageChannel 'backtrace':\n", pfx);
  1744     // print a python-style backtrace, first frame to last
  1745     for (uint32_t i = 0; i < mCxxStackFrames.length(); ++i) {
  1746         int32_t id;
  1747         const char* dir, *sems, *name;
  1748         mCxxStackFrames[i].Describe(&id, &dir, &sems, &name);
  1750         printf_stderr("%s[(%u) %s %s %s(actor=%d) ]\n", pfx,
  1751                       i, dir, sems, name, id);
  1755 } // ipc
  1756 } // mozilla

mercurial