ipc/glue/MessageChannel.cpp

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/ipc/glue/MessageChannel.cpp	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,1756 @@
     1.4 +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
     1.5 + * vim: sw=4 ts=4 et :
     1.6 + */
     1.7 +/* This Source Code Form is subject to the terms of the Mozilla Public
     1.8 + * License, v. 2.0. If a copy of the MPL was not distributed with this
     1.9 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
    1.10 +
    1.11 +#include "mozilla/ipc/MessageChannel.h"
    1.12 +#include "mozilla/ipc/ProtocolUtils.h"
    1.13 +
    1.14 +#include "mozilla/Assertions.h"
    1.15 +#include "mozilla/DebugOnly.h"
    1.16 +#include "mozilla/Move.h"
    1.17 +#include "nsDebug.h"
    1.18 +#include "nsISupportsImpl.h"
    1.19 +
    1.20 +// Undo the damage done by mozzconf.h
    1.21 +#undef compress
    1.22 +
    1.23 +using namespace mozilla;
    1.24 +using namespace std;
    1.25 +
    1.26 +using mozilla::MonitorAutoLock;
    1.27 +using mozilla::MonitorAutoUnlock;
    1.28 +
    1.29 +template<>
    1.30 +struct RunnableMethodTraits<mozilla::ipc::MessageChannel>
    1.31 +{
    1.32 +    static void RetainCallee(mozilla::ipc::MessageChannel* obj) { }
    1.33 +    static void ReleaseCallee(mozilla::ipc::MessageChannel* obj) { }
    1.34 +};
    1.35 +
    1.36 +#define IPC_ASSERT(_cond, ...)                                      \
    1.37 +    do {                                                            \
    1.38 +        if (!(_cond))                                               \
    1.39 +            DebugAbort(__FILE__, __LINE__, #_cond,## __VA_ARGS__);  \
    1.40 +    } while (0)
    1.41 +
    1.42 +namespace mozilla {
    1.43 +namespace ipc {
    1.44 +
    1.45 +const int32_t MessageChannel::kNoTimeout = INT32_MIN;
    1.46 +
    1.47 +// static
    1.48 +bool MessageChannel::sIsPumpingMessages = false;
    1.49 +
    1.50 +enum Direction
    1.51 +{
    1.52 +    IN_MESSAGE,
    1.53 +    OUT_MESSAGE
    1.54 +};
    1.55 +
    1.56 +
    1.57 +class MessageChannel::InterruptFrame
    1.58 +{
    1.59 +private:
    1.60 +    enum Semantics
    1.61 +    {
    1.62 +        INTR_SEMS,
    1.63 +        SYNC_SEMS,
    1.64 +        ASYNC_SEMS
    1.65 +    };
    1.66 +
    1.67 +public:
    1.68 +    InterruptFrame(Direction direction, const Message* msg)
    1.69 +      : mMessageName(strdup(msg->name())),
    1.70 +        mMessageRoutingId(msg->routing_id()),
    1.71 +        mMesageSemantics(msg->is_interrupt() ? INTR_SEMS :
    1.72 +                          msg->is_sync() ? SYNC_SEMS :
    1.73 +                          ASYNC_SEMS),
    1.74 +        mDirection(direction),
    1.75 +        mMoved(false)
    1.76 +    {
    1.77 +        MOZ_ASSERT(mMessageName);
    1.78 +    }
    1.79 +
    1.80 +    InterruptFrame(InterruptFrame&& aOther)
    1.81 +    {
    1.82 +        MOZ_ASSERT(aOther.mMessageName);
    1.83 +        mMessageName = aOther.mMessageName;
    1.84 +        aOther.mMessageName = nullptr;
    1.85 +        aOther.mMoved = true;
    1.86 +
    1.87 +        mMessageRoutingId = aOther.mMessageRoutingId;
    1.88 +        mMesageSemantics = aOther.mMesageSemantics;
    1.89 +        mDirection = aOther.mDirection;
    1.90 +    }
    1.91 +
    1.92 +    ~InterruptFrame()
    1.93 +    {
    1.94 +        MOZ_ASSERT_IF(!mMessageName, mMoved);
    1.95 +
    1.96 +        if (mMessageName)
    1.97 +            free(const_cast<char*>(mMessageName));
    1.98 +    }
    1.99 +
   1.100 +    InterruptFrame& operator=(InterruptFrame&& aOther)
   1.101 +    {
   1.102 +        MOZ_ASSERT(&aOther != this);
   1.103 +        this->~InterruptFrame();
   1.104 +        new (this) InterruptFrame(mozilla::Move(aOther));
   1.105 +        return *this;
   1.106 +    }
   1.107 +
   1.108 +    bool IsInterruptIncall() const
   1.109 +    {
   1.110 +        return INTR_SEMS == mMesageSemantics && IN_MESSAGE == mDirection;
   1.111 +    }
   1.112 +
   1.113 +    bool IsInterruptOutcall() const
   1.114 +    {
   1.115 +        return INTR_SEMS == mMesageSemantics && OUT_MESSAGE == mDirection;
   1.116 +    }
   1.117 +
   1.118 +    void Describe(int32_t* id, const char** dir, const char** sems,
   1.119 +                  const char** name) const
   1.120 +    {
   1.121 +        *id = mMessageRoutingId;
   1.122 +        *dir = (IN_MESSAGE == mDirection) ? "in" : "out";
   1.123 +        *sems = (INTR_SEMS == mMesageSemantics) ? "intr" :
   1.124 +                (SYNC_SEMS == mMesageSemantics) ? "sync" :
   1.125 +                "async";
   1.126 +        *name = mMessageName;
   1.127 +    }
   1.128 +
   1.129 +private:
   1.130 +    const char* mMessageName;
   1.131 +    int32_t mMessageRoutingId;
   1.132 +    Semantics mMesageSemantics;
   1.133 +    Direction mDirection;
   1.134 +    DebugOnly<bool> mMoved;
   1.135 +
   1.136 +    // Disable harmful methods.
   1.137 +    InterruptFrame(const InterruptFrame& aOther) MOZ_DELETE;
   1.138 +    InterruptFrame& operator=(const InterruptFrame&) MOZ_DELETE;
   1.139 +};
   1.140 +
   1.141 +class MOZ_STACK_CLASS MessageChannel::CxxStackFrame
   1.142 +{
   1.143 +public:
   1.144 +    CxxStackFrame(MessageChannel& that, Direction direction, const Message* msg)
   1.145 +      : mThat(that)
   1.146 +    {
   1.147 +        mThat.AssertWorkerThread();
   1.148 +
   1.149 +        if (mThat.mCxxStackFrames.empty())
   1.150 +            mThat.EnteredCxxStack();
   1.151 +
   1.152 +        mThat.mCxxStackFrames.append(InterruptFrame(direction, msg));
   1.153 +
   1.154 +        const InterruptFrame& frame = mThat.mCxxStackFrames.back();
   1.155 +
   1.156 +        if (frame.IsInterruptIncall())
   1.157 +            mThat.EnteredCall();
   1.158 +
   1.159 +        mThat.mSawInterruptOutMsg |= frame.IsInterruptOutcall();
   1.160 +    }
   1.161 +
   1.162 +    ~CxxStackFrame() {
   1.163 +        mThat.AssertWorkerThread();
   1.164 +
   1.165 +        MOZ_ASSERT(!mThat.mCxxStackFrames.empty());
   1.166 +
   1.167 +        bool exitingCall = mThat.mCxxStackFrames.back().IsInterruptIncall();
   1.168 +        mThat.mCxxStackFrames.shrinkBy(1);
   1.169 +
   1.170 +        bool exitingStack = mThat.mCxxStackFrames.empty();
   1.171 +
   1.172 +        // mListener could have gone away if Close() was called while
   1.173 +        // MessageChannel code was still on the stack
   1.174 +        if (!mThat.mListener)
   1.175 +            return;
   1.176 +
   1.177 +        if (exitingCall)
   1.178 +            mThat.ExitedCall();
   1.179 +
   1.180 +        if (exitingStack)
   1.181 +            mThat.ExitedCxxStack();
   1.182 +    }
   1.183 +private:
   1.184 +    MessageChannel& mThat;
   1.185 +
   1.186 +    // Disable harmful methods.
   1.187 +    CxxStackFrame() MOZ_DELETE;
   1.188 +    CxxStackFrame(const CxxStackFrame&) MOZ_DELETE;
   1.189 +    CxxStackFrame& operator=(const CxxStackFrame&) MOZ_DELETE;
   1.190 +};
   1.191 +
   1.192 +MessageChannel::MessageChannel(MessageListener *aListener)
   1.193 +  : mListener(aListener->asWeakPtr()),
   1.194 +    mChannelState(ChannelClosed),
   1.195 +    mSide(UnknownSide),
   1.196 +    mLink(nullptr),
   1.197 +    mWorkerLoop(nullptr),
   1.198 +    mChannelErrorTask(nullptr),
   1.199 +    mWorkerLoopID(-1),
   1.200 +    mTimeoutMs(kNoTimeout),
   1.201 +    mInTimeoutSecondHalf(false),
   1.202 +    mNextSeqno(0),
   1.203 +    mPendingSyncReplies(0),
   1.204 +    mPendingUrgentReplies(0),
   1.205 +    mPendingRPCReplies(0),
   1.206 +    mCurrentRPCTransaction(0),
   1.207 +    mDispatchingSyncMessage(false),
   1.208 +    mDispatchingUrgentMessageCount(0),
   1.209 +    mRemoteStackDepthGuess(false),
   1.210 +    mSawInterruptOutMsg(false),
   1.211 +    mAbortOnError(false),
   1.212 +    mFlags(REQUIRE_DEFAULT)
   1.213 +{
   1.214 +    MOZ_COUNT_CTOR(ipc::MessageChannel);
   1.215 +
   1.216 +#ifdef OS_WIN
   1.217 +    mTopFrame = nullptr;
   1.218 +    mIsSyncWaitingOnNonMainThread = false;
   1.219 +#endif
   1.220 +
   1.221 +    mDequeueOneTask = new RefCountedTask(NewRunnableMethod(
   1.222 +                                                 this,
   1.223 +                                                 &MessageChannel::OnMaybeDequeueOne));
   1.224 +
   1.225 +#ifdef OS_WIN
   1.226 +    mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
   1.227 +    NS_ASSERTION(mEvent, "CreateEvent failed! Nothing is going to work!");
   1.228 +#endif
   1.229 +}
   1.230 +
   1.231 +MessageChannel::~MessageChannel()
   1.232 +{
   1.233 +    MOZ_COUNT_DTOR(ipc::MessageChannel);
   1.234 +    IPC_ASSERT(mCxxStackFrames.empty(), "mismatched CxxStackFrame ctor/dtors");
   1.235 +#ifdef OS_WIN
   1.236 +    DebugOnly<BOOL> ok = CloseHandle(mEvent);
   1.237 +    MOZ_ASSERT(ok);
   1.238 +#endif
   1.239 +    Clear();
   1.240 +}
   1.241 +
   1.242 +static void
   1.243 +PrintErrorMessage(Side side, const char* channelName, const char* msg)
   1.244 +{
   1.245 +    const char *from = (side == ChildSide)
   1.246 +                       ? "Child"
   1.247 +                       : ((side == ParentSide) ? "Parent" : "Unknown");
   1.248 +    printf_stderr("\n###!!! [%s][%s] Error: %s\n\n", from, channelName, msg);
   1.249 +}
   1.250 +
   1.251 +bool
   1.252 +MessageChannel::Connected() const
   1.253 +{
   1.254 +    mMonitor->AssertCurrentThreadOwns();
   1.255 +
   1.256 +    // The transport layer allows us to send messages before
   1.257 +    // receiving the "connected" ack from the remote side.
   1.258 +    return (ChannelOpening == mChannelState || ChannelConnected == mChannelState);
   1.259 +}
   1.260 +
   1.261 +bool
   1.262 +MessageChannel::CanSend() const
   1.263 +{
   1.264 +    MonitorAutoLock lock(*mMonitor);
   1.265 +    return Connected();
   1.266 +}
   1.267 +
   1.268 +void
   1.269 +MessageChannel::Clear()
   1.270 +{
   1.271 +    // Don't clear mWorkerLoopID; we use it in AssertLinkThread() and
   1.272 +    // AssertWorkerThread().
   1.273 +    //
   1.274 +    // Also don't clear mListener.  If we clear it, then sending a message
   1.275 +    // through this channel after it's Clear()'ed can cause this process to
   1.276 +    // crash.
   1.277 +    //
   1.278 +    // In practice, mListener owns the channel, so the channel gets deleted
   1.279 +    // before mListener.  But just to be safe, mListener is a weak pointer.
   1.280 +
   1.281 +    mDequeueOneTask->Cancel();
   1.282 +
   1.283 +    mWorkerLoop = nullptr;
   1.284 +    delete mLink;
   1.285 +    mLink = nullptr;
   1.286 +
   1.287 +    if (mChannelErrorTask) {
   1.288 +        mChannelErrorTask->Cancel();
   1.289 +        mChannelErrorTask = nullptr;
   1.290 +    }
   1.291 +
   1.292 +    // Free up any memory used by pending messages.
   1.293 +    mPending.clear();
   1.294 +    mPendingUrgentRequest = nullptr;
   1.295 +    mPendingRPCCall = nullptr;
   1.296 +    mOutOfTurnReplies.clear();
   1.297 +    while (!mDeferred.empty()) {
   1.298 +        mDeferred.pop();
   1.299 +    }
   1.300 +}
   1.301 +
   1.302 +bool
   1.303 +MessageChannel::Open(Transport* aTransport, MessageLoop* aIOLoop, Side aSide)
   1.304 +{
   1.305 +    NS_PRECONDITION(!mLink, "Open() called > once");
   1.306 +
   1.307 +    mMonitor = new RefCountedMonitor();
   1.308 +    mWorkerLoop = MessageLoop::current();
   1.309 +    mWorkerLoopID = mWorkerLoop->id();
   1.310 +
   1.311 +    ProcessLink *link = new ProcessLink(this);
   1.312 +    link->Open(aTransport, aIOLoop, aSide); // :TODO: n.b.: sets mChild
   1.313 +    mLink = link;
   1.314 +    return true;
   1.315 +}
   1.316 +
   1.317 +bool
   1.318 +MessageChannel::Open(MessageChannel *aTargetChan, MessageLoop *aTargetLoop, Side aSide)
   1.319 +{
   1.320 +    // Opens a connection to another thread in the same process.
   1.321 +
   1.322 +    //  This handshake proceeds as follows:
   1.323 +    //  - Let A be the thread initiating the process (either child or parent)
   1.324 +    //    and B be the other thread.
   1.325 +    //  - A spawns thread for B, obtaining B's message loop
   1.326 +    //  - A creates ProtocolChild and ProtocolParent instances.
   1.327 +    //    Let PA be the one appropriate to A and PB the side for B.
   1.328 +    //  - A invokes PA->Open(PB, ...):
   1.329 +    //    - set state to mChannelOpening
   1.330 +    //    - this will place a work item in B's worker loop (see next bullet)
   1.331 +    //      and then spins until PB->mChannelState becomes mChannelConnected
   1.332 +    //    - meanwhile, on PB's worker loop, the work item is removed and:
   1.333 +    //      - invokes PB->SlaveOpen(PA, ...):
   1.334 +    //        - sets its state and that of PA to Connected
   1.335 +    NS_PRECONDITION(aTargetChan, "Need a target channel");
   1.336 +    NS_PRECONDITION(ChannelClosed == mChannelState, "Not currently closed");
   1.337 +
   1.338 +    CommonThreadOpenInit(aTargetChan, aSide);
   1.339 +
   1.340 +    Side oppSide = UnknownSide;
   1.341 +    switch(aSide) {
   1.342 +      case ChildSide: oppSide = ParentSide; break;
   1.343 +      case ParentSide: oppSide = ChildSide; break;
   1.344 +      case UnknownSide: break;
   1.345 +    }
   1.346 +
   1.347 +    mMonitor = new RefCountedMonitor();
   1.348 +
   1.349 +    MonitorAutoLock lock(*mMonitor);
   1.350 +    mChannelState = ChannelOpening;
   1.351 +    aTargetLoop->PostTask(
   1.352 +        FROM_HERE,
   1.353 +        NewRunnableMethod(aTargetChan, &MessageChannel::OnOpenAsSlave, this, oppSide));
   1.354 +
   1.355 +    while (ChannelOpening == mChannelState)
   1.356 +        mMonitor->Wait();
   1.357 +    NS_ASSERTION(ChannelConnected == mChannelState, "not connected when awoken");
   1.358 +    return (ChannelConnected == mChannelState);
   1.359 +}
   1.360 +
   1.361 +void
   1.362 +MessageChannel::OnOpenAsSlave(MessageChannel *aTargetChan, Side aSide)
   1.363 +{
   1.364 +    // Invoked when the other side has begun the open.
   1.365 +    NS_PRECONDITION(ChannelClosed == mChannelState,
   1.366 +                    "Not currently closed");
   1.367 +    NS_PRECONDITION(ChannelOpening == aTargetChan->mChannelState,
   1.368 +                    "Target channel not in the process of opening");
   1.369 +
   1.370 +    CommonThreadOpenInit(aTargetChan, aSide);
   1.371 +    mMonitor = aTargetChan->mMonitor;
   1.372 +
   1.373 +    MonitorAutoLock lock(*mMonitor);
   1.374 +    NS_ASSERTION(ChannelOpening == aTargetChan->mChannelState,
   1.375 +                 "Target channel not in the process of opening");
   1.376 +    mChannelState = ChannelConnected;
   1.377 +    aTargetChan->mChannelState = ChannelConnected;
   1.378 +    aTargetChan->mMonitor->Notify();
   1.379 +}
   1.380 +
   1.381 +void
   1.382 +MessageChannel::CommonThreadOpenInit(MessageChannel *aTargetChan, Side aSide)
   1.383 +{
   1.384 +    mWorkerLoop = MessageLoop::current();
   1.385 +    mWorkerLoopID = mWorkerLoop->id();
   1.386 +    mLink = new ThreadLink(this, aTargetChan);
   1.387 +    mSide = aSide;
   1.388 +}
   1.389 +
   1.390 +bool
   1.391 +MessageChannel::Echo(Message* aMsg)
   1.392 +{
   1.393 +    nsAutoPtr<Message> msg(aMsg);
   1.394 +    AssertWorkerThread();
   1.395 +    mMonitor->AssertNotCurrentThreadOwns();
   1.396 +    if (MSG_ROUTING_NONE == msg->routing_id()) {
   1.397 +        ReportMessageRouteError("MessageChannel::Echo");
   1.398 +        return false;
   1.399 +    }
   1.400 +
   1.401 +    MonitorAutoLock lock(*mMonitor);
   1.402 +
   1.403 +    if (!Connected()) {
   1.404 +        ReportConnectionError("MessageChannel");
   1.405 +        return false;
   1.406 +    }
   1.407 +
   1.408 +    mLink->EchoMessage(msg.forget());
   1.409 +    return true;
   1.410 +}
   1.411 +
   1.412 +bool
   1.413 +MessageChannel::Send(Message* aMsg)
   1.414 +{
   1.415 +    CxxStackFrame frame(*this, OUT_MESSAGE, aMsg);
   1.416 +
   1.417 +    nsAutoPtr<Message> msg(aMsg);
   1.418 +    AssertWorkerThread();
   1.419 +    mMonitor->AssertNotCurrentThreadOwns();
   1.420 +    if (MSG_ROUTING_NONE == msg->routing_id()) {
   1.421 +        ReportMessageRouteError("MessageChannel::Send");
   1.422 +        return false;
   1.423 +    }
   1.424 +
   1.425 +    MonitorAutoLock lock(*mMonitor);
   1.426 +    if (!Connected()) {
   1.427 +        ReportConnectionError("MessageChannel");
   1.428 +        return false;
   1.429 +    }
   1.430 +    mLink->SendMessage(msg.forget());
   1.431 +    return true;
   1.432 +}
   1.433 +
   1.434 +bool
   1.435 +MessageChannel::MaybeInterceptSpecialIOMessage(const Message& aMsg)
   1.436 +{
   1.437 +    AssertLinkThread();
   1.438 +    mMonitor->AssertCurrentThreadOwns();
   1.439 +
   1.440 +    if (MSG_ROUTING_NONE == aMsg.routing_id() &&
   1.441 +        GOODBYE_MESSAGE_TYPE == aMsg.type())
   1.442 +    {
   1.443 +        // :TODO: Sort out Close() on this side racing with Close() on the
   1.444 +        // other side
   1.445 +        mChannelState = ChannelClosing;
   1.446 +        if (LoggingEnabled()) {
   1.447 +            printf("NOTE: %s process received `Goodbye', closing down\n",
   1.448 +                   (mSide == ChildSide) ? "child" : "parent");
   1.449 +        }
   1.450 +        return true;
   1.451 +    }
   1.452 +    return false;
   1.453 +}
   1.454 +
   1.455 +void
   1.456 +MessageChannel::OnMessageReceivedFromLink(const Message& aMsg)
   1.457 +{
   1.458 +    AssertLinkThread();
   1.459 +    mMonitor->AssertCurrentThreadOwns();
   1.460 +
   1.461 +    if (MaybeInterceptSpecialIOMessage(aMsg))
   1.462 +        return;
   1.463 +
   1.464 +    // Regardless of the Interrupt stack, if we're awaiting a sync or urgent reply,
   1.465 +    // we know that it needs to be immediately handled to unblock us.
   1.466 +    if ((AwaitingSyncReply() && aMsg.is_sync()) ||
   1.467 +        (AwaitingUrgentReply() && aMsg.is_urgent()) ||
   1.468 +        (AwaitingRPCReply() && aMsg.is_rpc()))
   1.469 +    {
   1.470 +        mRecvd = new Message(aMsg);
   1.471 +        NotifyWorkerThread();
   1.472 +        return;
   1.473 +    }
   1.474 +
   1.475 +    // Urgent messages cannot be compressed.
   1.476 +    MOZ_ASSERT(!aMsg.compress() || !aMsg.is_urgent());
   1.477 +
   1.478 +    bool compress = (aMsg.compress() && !mPending.empty() &&
   1.479 +                     mPending.back().type() == aMsg.type() &&
   1.480 +                     mPending.back().routing_id() == aMsg.routing_id());
   1.481 +    if (compress) {
   1.482 +        // This message type has compression enabled, and the back of the
   1.483 +        // queue was the same message type and routed to the same destination.
   1.484 +        // Replace it with the newer message.
   1.485 +        MOZ_ASSERT(mPending.back().compress());
   1.486 +        mPending.pop_back();
   1.487 +    }
   1.488 +
   1.489 +    bool shouldWakeUp = AwaitingInterruptReply() ||
   1.490 +                        // Allow incoming RPCs to be processed inside an urgent message.
   1.491 +                        (AwaitingUrgentReply() && aMsg.is_rpc()) ||
   1.492 +                        // Always process urgent messages while blocked.
   1.493 +                        ((AwaitingSyncReply() || AwaitingRPCReply()) && aMsg.is_urgent());
   1.494 +
   1.495 +    // There are four cases we're concerned about, relating to the state of the
   1.496 +    // main thread:
   1.497 +    //
   1.498 +    // (1) We are waiting on a sync|rpc reply - main thread is blocked on the
   1.499 +    //     IPC monitor.
   1.500 +    //   - If the message is high priority, we wake up the main thread to
   1.501 +    //     deliver the message. Otherwise, we leave it in the mPending queue,
   1.502 +    //     posting a task to the main event loop, where it will be processed
   1.503 +    //     once the synchronous reply has been received.
   1.504 +    //
   1.505 +    // (2) We are waiting on an Interrupt reply - main thread is blocked on the
   1.506 +    //     IPC monitor.
   1.507 +    //   - Always notify and wake up the main thread.
   1.508 +    //
   1.509 +    // (3) We are not waiting on a reply.
   1.510 +    //   - We post a task to the main event loop.
   1.511 +    //
   1.512 +    // Note that, we may notify the main thread even though the monitor is not
   1.513 +    // blocked. This is okay, since we always check for pending events before
   1.514 +    // blocking again.
   1.515 +
   1.516 +    if (shouldWakeUp && (AwaitingUrgentReply() && aMsg.is_rpc())) {
   1.517 +        // If we're receiving an RPC message while blocked on an urgent message,
   1.518 +        // we must defer any messages that were not sent as part of the child
   1.519 +        // answering the urgent message.
   1.520 +        //
   1.521 +        // We must also be sure that we will not accidentally defer any RPC
   1.522 +        // message that was sent while answering an urgent message. Otherwise,
   1.523 +        // we will deadlock.
   1.524 +        //
   1.525 +        // On the parent side, the current transaction can only transition from 0
   1.526 +        // to an ID, either by us issuing an urgent request while not blocked, or
   1.527 +        // by receiving an RPC request while not blocked. When we unblock, the
   1.528 +        // current transaction is reset to 0.
   1.529 +        //
   1.530 +        // When the child side receives an urgent message, any RPC messages sent
   1.531 +        // before issuing the urgent reply will carry the urgent message's
   1.532 +        // transaction ID.
   1.533 +        //
   1.534 +        // Since AwaitingUrgentReply() implies we are blocked, it also implies
   1.535 +        // that we are within a transaction that will not change until we are
   1.536 +        // completely unblocked (i.e, the transaction has completed).
   1.537 +        if (aMsg.transaction_id() != mCurrentRPCTransaction)
   1.538 +            shouldWakeUp = false;
   1.539 +    }
   1.540 +
   1.541 +    if (aMsg.is_urgent()) {
   1.542 +        MOZ_ASSERT(!mPendingUrgentRequest);
   1.543 +        mPendingUrgentRequest = new Message(aMsg);
   1.544 +    } else if (aMsg.is_rpc() && shouldWakeUp) {
   1.545 +        // Only use this slot if we need to wake up for an RPC call. Otherwise
   1.546 +        // we treat it like a normal async or sync message.
   1.547 +        MOZ_ASSERT(!mPendingRPCCall);
   1.548 +        mPendingRPCCall = new Message(aMsg);
   1.549 +    } else {
   1.550 +        mPending.push_back(aMsg);
   1.551 +    }
   1.552 +
   1.553 +    if (shouldWakeUp) {
   1.554 +        // Always wake up Interrupt waiters, sync waiters for urgent messages,
   1.555 +        // RPC waiters for urgent messages, and urgent waiters for RPCs in the
   1.556 +        // same transaction.
   1.557 +        NotifyWorkerThread();
   1.558 +    } else {
   1.559 +        // Worker thread is either not blocked on a reply, or this is an
   1.560 +        // incoming Interrupt that raced with outgoing sync, and needs to be
   1.561 +        // deferred to a later event-loop iteration.
   1.562 +        if (!compress) {
   1.563 +            // If we compressed away the previous message, we'll re-use
   1.564 +            // its pending task.
   1.565 +            mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
   1.566 +        }
   1.567 +    }
   1.568 +}
   1.569 +
   1.570 +bool
   1.571 +MessageChannel::Send(Message* aMsg, Message* aReply)
   1.572 +{
   1.573 +    // Sanity checks.
   1.574 +    AssertWorkerThread();
   1.575 +    mMonitor->AssertNotCurrentThreadOwns();
   1.576 +
   1.577 +#ifdef OS_WIN
   1.578 +    SyncStackFrame frame(this, false);
   1.579 +#endif
   1.580 +
   1.581 +    CxxStackFrame f(*this, OUT_MESSAGE, aMsg);
   1.582 +
   1.583 +    MonitorAutoLock lock(*mMonitor);
   1.584 +
   1.585 +    IPC_ASSERT(aMsg->is_sync(), "can only Send() sync messages here");
   1.586 +    IPC_ASSERT(!DispatchingSyncMessage(), "violation of sync handler invariant");
   1.587 +    IPC_ASSERT(!DispatchingUrgentMessage(), "sync messages forbidden while handling urgent message");
   1.588 +    IPC_ASSERT(!AwaitingSyncReply(), "nested sync messages are not supported");
   1.589 +
   1.590 +    AutoEnterPendingReply replies(mPendingSyncReplies);
   1.591 +    if (!SendAndWait(aMsg, aReply))
   1.592 +        return false;
   1.593 +
   1.594 +    NS_ABORT_IF_FALSE(aReply->is_sync(), "reply is not sync");
   1.595 +    return true;
   1.596 +}
   1.597 +
   1.598 +bool
   1.599 +MessageChannel::UrgentCall(Message* aMsg, Message* aReply)
   1.600 +{
   1.601 +    AssertWorkerThread();
   1.602 +    mMonitor->AssertNotCurrentThreadOwns();
   1.603 +    IPC_ASSERT(mSide == ParentSide, "cannot send urgent requests from child");
   1.604 +
   1.605 +#ifdef OS_WIN
   1.606 +    SyncStackFrame frame(this, false);
   1.607 +#endif
   1.608 +
   1.609 +    CxxStackFrame f(*this, OUT_MESSAGE, aMsg);
   1.610 +
   1.611 +    MonitorAutoLock lock(*mMonitor);
   1.612 +
   1.613 +    IPC_ASSERT(!AwaitingInterruptReply(), "urgent calls cannot be issued within Interrupt calls");
   1.614 +    IPC_ASSERT(!AwaitingSyncReply(), "urgent calls cannot be issued within sync sends");
   1.615 +
   1.616 +    AutoEnterRPCTransaction transact(this);
   1.617 +    aMsg->set_transaction_id(mCurrentRPCTransaction);
   1.618 +
   1.619 +    AutoEnterPendingReply replies(mPendingUrgentReplies);
   1.620 +    if (!SendAndWait(aMsg, aReply))
   1.621 +        return false;
   1.622 +
   1.623 +    NS_ABORT_IF_FALSE(aReply->is_urgent(), "reply is not urgent");
   1.624 +    return true;
   1.625 +}
   1.626 +
   1.627 +bool
   1.628 +MessageChannel::RPCCall(Message* aMsg, Message* aReply)
   1.629 +{
   1.630 +    AssertWorkerThread();
   1.631 +    mMonitor->AssertNotCurrentThreadOwns();
   1.632 +    IPC_ASSERT(mSide == ChildSide, "cannot send rpc messages from parent");
   1.633 +
   1.634 +#ifdef OS_WIN
   1.635 +    SyncStackFrame frame(this, false);
   1.636 +#endif
   1.637 +
   1.638 +    CxxStackFrame f(*this, OUT_MESSAGE, aMsg);
   1.639 +
   1.640 +    MonitorAutoLock lock(*mMonitor);
   1.641 +
   1.642 +    AutoEnterRPCTransaction transact(this);
   1.643 +    aMsg->set_transaction_id(mCurrentRPCTransaction);
   1.644 +
   1.645 +    AutoEnterPendingReply replies(mPendingRPCReplies);
   1.646 +    if (!SendAndWait(aMsg, aReply))
   1.647 +        return false;
   1.648 +
   1.649 +    NS_ABORT_IF_FALSE(aReply->is_rpc(), "expected rpc reply");
   1.650 +    return true;
   1.651 +}
   1.652 +
   1.653 +bool
   1.654 +MessageChannel::SendAndWait(Message* aMsg, Message* aReply)
   1.655 +{
   1.656 +    mMonitor->AssertCurrentThreadOwns();
   1.657 +
   1.658 +    nsAutoPtr<Message> msg(aMsg);
   1.659 +
   1.660 +    if (!Connected()) {
   1.661 +        ReportConnectionError("MessageChannel::SendAndWait");
   1.662 +        return false;
   1.663 +    }
   1.664 +
   1.665 +    msg->set_seqno(NextSeqno());
   1.666 +
   1.667 +    DebugOnly<int32_t> replySeqno = msg->seqno();
   1.668 +    DebugOnly<msgid_t> replyType = msg->type() + 1;
   1.669 +
   1.670 +    mLink->SendMessage(msg.forget());
   1.671 +
   1.672 +    while (true) {
   1.673 +        // Wait for an event to occur.
   1.674 +        while (true) {
   1.675 +            if (mRecvd || mPendingUrgentRequest || mPendingRPCCall)
   1.676 +                break;
   1.677 +
   1.678 +            bool maybeTimedOut = !WaitForSyncNotify();
   1.679 +
   1.680 +            if (!Connected()) {
   1.681 +                ReportConnectionError("MessageChannel::SendAndWait");
   1.682 +                return false;
   1.683 +            }
   1.684 +
   1.685 +            if (maybeTimedOut && !ShouldContinueFromTimeout())
   1.686 +                return false;
   1.687 +        }
   1.688 +
   1.689 +        if (mPendingUrgentRequest && !ProcessPendingUrgentRequest())
   1.690 +            return false;
   1.691 +
   1.692 +        if (mPendingRPCCall && !ProcessPendingRPCCall())
   1.693 +            return false;
   1.694 +
   1.695 +        if (mRecvd) {
   1.696 +            NS_ABORT_IF_FALSE(mRecvd->is_reply(), "expected reply");
   1.697 +
   1.698 +            if (mRecvd->is_reply_error()) {
   1.699 +                mRecvd = nullptr;
   1.700 +                return false;
   1.701 +            }
   1.702 +
   1.703 +            NS_ABORT_IF_FALSE(mRecvd->type() == replyType, "wrong reply type");
   1.704 +            NS_ABORT_IF_FALSE(mRecvd->seqno() == replySeqno, "wrong sequence number");
   1.705 +
   1.706 +            *aReply = *mRecvd;
   1.707 +            mRecvd = nullptr;
   1.708 +            return true;
   1.709 +        }
   1.710 +    }
   1.711 +
   1.712 +    return true;
   1.713 +}
   1.714 +
   1.715 +bool
   1.716 +MessageChannel::Call(Message* aMsg, Message* aReply)
   1.717 +{
   1.718 +    if (aMsg->is_urgent())
   1.719 +        return UrgentCall(aMsg, aReply);
   1.720 +    if (aMsg->is_rpc())
   1.721 +        return RPCCall(aMsg, aReply);
   1.722 +    return InterruptCall(aMsg, aReply);
   1.723 +}
   1.724 +
   1.725 +bool
   1.726 +MessageChannel::InterruptCall(Message* aMsg, Message* aReply)
   1.727 +{
   1.728 +    AssertWorkerThread();
   1.729 +    mMonitor->AssertNotCurrentThreadOwns();
   1.730 +
   1.731 +#ifdef OS_WIN
   1.732 +    SyncStackFrame frame(this, true);
   1.733 +#endif
   1.734 +
   1.735 +    // This must come before MonitorAutoLock, as its destructor acquires the
   1.736 +    // monitor lock.
   1.737 +    CxxStackFrame cxxframe(*this, OUT_MESSAGE, aMsg);
   1.738 +
   1.739 +    MonitorAutoLock lock(*mMonitor);
   1.740 +    if (!Connected()) {
   1.741 +        ReportConnectionError("MessageChannel::Call");
   1.742 +        return false;
   1.743 +    }
   1.744 +
   1.745 +    // Sanity checks.
   1.746 +    IPC_ASSERT(!AwaitingSyncReply() && !AwaitingUrgentReply(),
   1.747 +               "cannot issue Interrupt call whiel blocked on sync or urgent");
   1.748 +    IPC_ASSERT(!DispatchingSyncMessage() || aMsg->priority() == IPC::Message::PRIORITY_HIGH,
   1.749 +               "violation of sync handler invariant");
   1.750 +    IPC_ASSERT(aMsg->is_interrupt(), "can only Call() Interrupt messages here");
   1.751 +
   1.752 +
   1.753 +    nsAutoPtr<Message> msg(aMsg);
   1.754 +
   1.755 +    msg->set_seqno(NextSeqno());
   1.756 +    msg->set_interrupt_remote_stack_depth_guess(mRemoteStackDepthGuess);
   1.757 +    msg->set_interrupt_local_stack_depth(1 + InterruptStackDepth());
   1.758 +    mInterruptStack.push(*msg);
   1.759 +    mLink->SendMessage(msg.forget());
   1.760 +
   1.761 +    while (true) {
   1.762 +        // if a handler invoked by *Dispatch*() spun a nested event
   1.763 +        // loop, and the connection was broken during that loop, we
   1.764 +        // might have already processed the OnError event. if so,
   1.765 +        // trying another loop iteration will be futile because
   1.766 +        // channel state will have been cleared
   1.767 +        if (!Connected()) {
   1.768 +            ReportConnectionError("MessageChannel::InterruptCall");
   1.769 +            return false;
   1.770 +        }
   1.771 +
   1.772 +        // Now might be the time to process a message deferred because of race
   1.773 +        // resolution.
   1.774 +        MaybeUndeferIncall();
   1.775 +
   1.776 +        // Wait for an event to occur.
   1.777 +        while (!InterruptEventOccurred()) {
   1.778 +            bool maybeTimedOut = !WaitForInterruptNotify();
   1.779 +
   1.780 +            // We might have received a "subtly deferred" message in a nested
   1.781 +            // loop that it's now time to process.
   1.782 +            if (InterruptEventOccurred() ||
   1.783 +                (!maybeTimedOut && (!mDeferred.empty() || !mOutOfTurnReplies.empty())))
   1.784 +            {
   1.785 +                break;
   1.786 +            }
   1.787 +
   1.788 +            if (maybeTimedOut && !ShouldContinueFromTimeout())
   1.789 +                return false;
   1.790 +        }
   1.791 +
   1.792 +        Message recvd;
   1.793 +        MessageMap::iterator it;
   1.794 +
   1.795 +        if (mPendingUrgentRequest) {
   1.796 +            recvd = *mPendingUrgentRequest;
   1.797 +            mPendingUrgentRequest = nullptr;
   1.798 +        } else if (mPendingRPCCall) {
   1.799 +            recvd = *mPendingRPCCall;
   1.800 +            mPendingRPCCall = nullptr;
   1.801 +        } else if ((it = mOutOfTurnReplies.find(mInterruptStack.top().seqno()))
   1.802 +                    != mOutOfTurnReplies.end())
   1.803 +        {
   1.804 +            recvd = it->second;
   1.805 +            mOutOfTurnReplies.erase(it);
   1.806 +        } else if (!mPending.empty()) {
   1.807 +            recvd = mPending.front();
   1.808 +            mPending.pop_front();
   1.809 +        } else {
   1.810 +            // because of subtleties with nested event loops, it's possible
   1.811 +            // that we got here and nothing happened.  or, we might have a
   1.812 +            // deferred in-call that needs to be processed.  either way, we
   1.813 +            // won't break the inner while loop again until something new
   1.814 +            // happens.
   1.815 +            continue;
   1.816 +        }
   1.817 +
   1.818 +        // If the message is not Interrupt, we can dispatch it as normal.
   1.819 +        if (!recvd.is_interrupt()) {
   1.820 +            // Other side should be blocked.
   1.821 +            IPC_ASSERT(!recvd.is_sync() || mPending.empty(), "other side should be blocked");
   1.822 +
   1.823 +            {
   1.824 +                AutoEnterRPCTransaction transaction(this, &recvd);
   1.825 +                MonitorAutoUnlock unlock(*mMonitor);
   1.826 +                CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
   1.827 +                DispatchMessage(recvd);
   1.828 +            }
   1.829 +            if (!Connected()) {
   1.830 +                ReportConnectionError("MessageChannel::DispatchMessage");
   1.831 +                return false;
   1.832 +            }
   1.833 +            continue;
   1.834 +        }
   1.835 +
   1.836 +        // If the message is an Interrupt reply, either process it as a reply to our
   1.837 +        // call, or add it to the list of out-of-turn replies we've received.
   1.838 +        if (recvd.is_reply()) {
   1.839 +            IPC_ASSERT(!mInterruptStack.empty(), "invalid Interrupt stack");
   1.840 +
   1.841 +            // If this is not a reply the call we've initiated, add it to our
   1.842 +            // out-of-turn replies and keep polling for events.
   1.843 +            {
   1.844 +                const Message &outcall = mInterruptStack.top();
   1.845 +
   1.846 +                // Note, In the parent, sequence numbers increase from 0, and
   1.847 +                // in the child, they decrease from 0.
   1.848 +                if ((mSide == ChildSide && recvd.seqno() > outcall.seqno()) ||
   1.849 +                    (mSide != ChildSide && recvd.seqno() < outcall.seqno()))
   1.850 +                {
   1.851 +                    mOutOfTurnReplies[recvd.seqno()] = recvd;
   1.852 +                    continue;
   1.853 +                }
   1.854 +
   1.855 +                IPC_ASSERT(recvd.is_reply_error() ||
   1.856 +                           (recvd.type() == (outcall.type() + 1) &&
   1.857 +                            recvd.seqno() == outcall.seqno()),
   1.858 +                           "somebody's misbehavin'", true);
   1.859 +            }
   1.860 +
   1.861 +            // We received a reply to our most recent outstanding call. Pop
   1.862 +            // this frame and return the reply.
   1.863 +            mInterruptStack.pop();
   1.864 +
   1.865 +            if (!recvd.is_reply_error()) {
   1.866 +                *aReply = recvd;
   1.867 +            }
   1.868 +
   1.869 +            // If we have no more pending out calls waiting on replies, then
   1.870 +            // the reply queue should be empty.
   1.871 +            IPC_ASSERT(!mInterruptStack.empty() || mOutOfTurnReplies.empty(),
   1.872 +                       "still have pending replies with no pending out-calls",
   1.873 +                       true);
   1.874 +
   1.875 +            return !recvd.is_reply_error();
   1.876 +        }
   1.877 +
   1.878 +        // Dispatch an Interrupt in-call. Snapshot the current stack depth while we
   1.879 +        // own the monitor.
   1.880 +        size_t stackDepth = InterruptStackDepth();
   1.881 +        {
   1.882 +            MonitorAutoUnlock unlock(*mMonitor);
   1.883 +
   1.884 +            CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
   1.885 +            DispatchInterruptMessage(recvd, stackDepth);
   1.886 +        }
   1.887 +        if (!Connected()) {
   1.888 +            ReportConnectionError("MessageChannel::DispatchInterruptMessage");
   1.889 +            return false;
   1.890 +        }
   1.891 +    }
   1.892 +
   1.893 +    return true;
   1.894 +}
   1.895 +
   1.896 +bool
   1.897 +MessageChannel::InterruptEventOccurred()
   1.898 +{
   1.899 +    AssertWorkerThread();
   1.900 +    mMonitor->AssertCurrentThreadOwns();
   1.901 +    IPC_ASSERT(InterruptStackDepth() > 0, "not in wait loop");
   1.902 +
   1.903 +    return (!Connected() ||
   1.904 +            !mPending.empty() ||
   1.905 +            mPendingUrgentRequest ||
   1.906 +            mPendingRPCCall ||
   1.907 +            (!mOutOfTurnReplies.empty() &&
   1.908 +             mOutOfTurnReplies.find(mInterruptStack.top().seqno()) !=
   1.909 +             mOutOfTurnReplies.end()));
   1.910 +}
   1.911 +
   1.912 +bool
   1.913 +MessageChannel::ProcessPendingUrgentRequest()
   1.914 +{
   1.915 +    AssertWorkerThread();
   1.916 +    mMonitor->AssertCurrentThreadOwns();
   1.917 +
   1.918 +    // Note that it is possible we could have sent a sync message at
   1.919 +    // the same time the parent process sent an urgent message, and
   1.920 +    // therefore mPendingUrgentRequest is set *and* mRecvd is set as
   1.921 +    // well, because the link thread received both before the worker
   1.922 +    // thread woke up.
   1.923 +    //
   1.924 +    // In this case, we process the urgent message first, but we need
   1.925 +    // to save the reply.
   1.926 +    nsAutoPtr<Message> savedReply(mRecvd.forget());
   1.927 +
   1.928 +    // We're the child process. We should not be receiving RPC calls.
   1.929 +    IPC_ASSERT(!mPendingRPCCall, "unexpected RPC call");
   1.930 +
   1.931 +    nsAutoPtr<Message> recvd(mPendingUrgentRequest.forget());
   1.932 +    {
   1.933 +        // In order to send the parent RPC messages and guarantee it will
   1.934 +        // wake up, we must re-use its transaction.
   1.935 +        AutoEnterRPCTransaction transaction(this, recvd);
   1.936 +
   1.937 +        MonitorAutoUnlock unlock(*mMonitor);
   1.938 +        DispatchUrgentMessage(*recvd);
   1.939 +    }
   1.940 +    if (!Connected()) {
   1.941 +        ReportConnectionError("MessageChannel::DispatchUrgentMessage");
   1.942 +        return false;
   1.943 +    }
   1.944 +
   1.945 +    // In between having dispatched our reply to the parent process, and
   1.946 +    // re-acquiring the monitor, the parent process could have already
   1.947 +    // processed that reply and sent the reply to our sync message. If so,
   1.948 +    // our saved reply should be empty.
   1.949 +    IPC_ASSERT(!mRecvd || !savedReply, "unknown reply");
   1.950 +    if (!mRecvd)
   1.951 +        mRecvd = savedReply.forget();
   1.952 +    return true;
   1.953 +}
   1.954 +
   1.955 +bool
   1.956 +MessageChannel::ProcessPendingRPCCall()
   1.957 +{
   1.958 +    AssertWorkerThread();
   1.959 +    mMonitor->AssertCurrentThreadOwns();
   1.960 +
   1.961 +    // See comment above re: mRecvd replies and incoming calls.
   1.962 +    nsAutoPtr<Message> savedReply(mRecvd.forget());
   1.963 +
   1.964 +    IPC_ASSERT(!mPendingUrgentRequest, "unexpected urgent message");
   1.965 +
   1.966 +    nsAutoPtr<Message> recvd(mPendingRPCCall.forget());
   1.967 +    {
   1.968 +        // If we are not currently in a transaction, this will begin one,
   1.969 +        // and the link thread will not wake us up for any RPC messages not
   1.970 +        // apart of this transaction. If we are already in a transaction,
   1.971 +        // then this will assert that we're still in the same transaction.
   1.972 +        AutoEnterRPCTransaction transaction(this, recvd);
   1.973 +
   1.974 +        MonitorAutoUnlock unlock(*mMonitor);
   1.975 +        DispatchRPCMessage(*recvd);
   1.976 +    }
   1.977 +    if (!Connected()) {
   1.978 +        ReportConnectionError("MessageChannel::DispatchRPCMessage");
   1.979 +        return false;
   1.980 +    }
   1.981 +
   1.982 +    // In between having dispatched our reply to the parent process, and
   1.983 +    // re-acquiring the monitor, the parent process could have already
   1.984 +    // processed that reply and sent the reply to our sync message. If so,
   1.985 +    // our saved reply should be empty.
   1.986 +    IPC_ASSERT(!mRecvd || !savedReply, "unknown reply");
   1.987 +    if (!mRecvd)
   1.988 +        mRecvd = savedReply.forget();
   1.989 +    return true;
   1.990 +}
   1.991 +
   1.992 +bool
   1.993 +MessageChannel::DequeueOne(Message *recvd)
   1.994 +{
   1.995 +    AssertWorkerThread();
   1.996 +    mMonitor->AssertCurrentThreadOwns();
   1.997 +
   1.998 +    if (!Connected()) {
   1.999 +        ReportConnectionError("OnMaybeDequeueOne");
  1.1000 +        return false;
  1.1001 +    }
  1.1002 +
  1.1003 +    if (mPendingUrgentRequest) {
  1.1004 +        *recvd = *mPendingUrgentRequest;
  1.1005 +        mPendingUrgentRequest = nullptr;
  1.1006 +        return true;
  1.1007 +    }
  1.1008 +
  1.1009 +    if (mPendingRPCCall) {
  1.1010 +        *recvd = *mPendingRPCCall;
  1.1011 +        mPendingRPCCall = nullptr;
  1.1012 +        return true;
  1.1013 +    }
  1.1014 +
  1.1015 +    if (!mDeferred.empty())
  1.1016 +        MaybeUndeferIncall();
  1.1017 +
  1.1018 +    if (mPending.empty())
  1.1019 +        return false;
  1.1020 +
  1.1021 +    *recvd = mPending.front();
  1.1022 +    mPending.pop_front();
  1.1023 +    return true;
  1.1024 +}
  1.1025 +
  1.1026 +bool
  1.1027 +MessageChannel::OnMaybeDequeueOne()
  1.1028 +{
  1.1029 +    AssertWorkerThread();
  1.1030 +    mMonitor->AssertNotCurrentThreadOwns();
  1.1031 +
  1.1032 +    Message recvd;
  1.1033 +
  1.1034 +    MonitorAutoLock lock(*mMonitor);
  1.1035 +    if (!DequeueOne(&recvd))
  1.1036 +        return false;
  1.1037 +
  1.1038 +    if (IsOnCxxStack() && recvd.is_interrupt() && recvd.is_reply()) {
  1.1039 +        // We probably just received a reply in a nested loop for an
  1.1040 +        // Interrupt call sent before entering that loop.
  1.1041 +        mOutOfTurnReplies[recvd.seqno()] = recvd;
  1.1042 +        return false;
  1.1043 +    }
  1.1044 +
  1.1045 +    {
  1.1046 +        // We should not be in a transaction yet if we're not blocked.
  1.1047 +        MOZ_ASSERT(mCurrentRPCTransaction == 0);
  1.1048 +        AutoEnterRPCTransaction transaction(this, &recvd);
  1.1049 +
  1.1050 +        MonitorAutoUnlock unlock(*mMonitor);
  1.1051 +
  1.1052 +        CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
  1.1053 +        DispatchMessage(recvd);
  1.1054 +    }
  1.1055 +    return true;
  1.1056 +}
  1.1057 +
  1.1058 +void
  1.1059 +MessageChannel::DispatchMessage(const Message &aMsg)
  1.1060 +{
  1.1061 +    if (aMsg.is_sync())
  1.1062 +        DispatchSyncMessage(aMsg);
  1.1063 +    else if (aMsg.is_urgent())
  1.1064 +        DispatchUrgentMessage(aMsg);
  1.1065 +    else if (aMsg.is_interrupt())
  1.1066 +        DispatchInterruptMessage(aMsg, 0);
  1.1067 +    else if (aMsg.is_rpc())
  1.1068 +        DispatchRPCMessage(aMsg);
  1.1069 +    else
  1.1070 +        DispatchAsyncMessage(aMsg);
  1.1071 +}
  1.1072 +
  1.1073 +void
  1.1074 +MessageChannel::DispatchSyncMessage(const Message& aMsg)
  1.1075 +{
  1.1076 +    AssertWorkerThread();
  1.1077 +
  1.1078 +    Message *reply = nullptr;
  1.1079 +
  1.1080 +    mDispatchingSyncMessage = true;
  1.1081 +    Result rv = mListener->OnMessageReceived(aMsg, reply);
  1.1082 +    mDispatchingSyncMessage = false;
  1.1083 +
  1.1084 +    if (!MaybeHandleError(rv, "DispatchSyncMessage")) {
  1.1085 +        delete reply;
  1.1086 +        reply = new Message();
  1.1087 +        reply->set_sync();
  1.1088 +        reply->set_reply();
  1.1089 +        reply->set_reply_error();
  1.1090 +    }
  1.1091 +    reply->set_seqno(aMsg.seqno());
  1.1092 +
  1.1093 +    MonitorAutoLock lock(*mMonitor);
  1.1094 +    if (ChannelConnected == mChannelState)
  1.1095 +        mLink->SendMessage(reply);
  1.1096 +}
  1.1097 +
  1.1098 +void
  1.1099 +MessageChannel::DispatchUrgentMessage(const Message& aMsg)
  1.1100 +{
  1.1101 +    AssertWorkerThread();
  1.1102 +    MOZ_ASSERT(aMsg.is_urgent());
  1.1103 +
  1.1104 +    Message *reply = nullptr;
  1.1105 +
  1.1106 +    mDispatchingUrgentMessageCount++;
  1.1107 +    Result rv = mListener->OnCallReceived(aMsg, reply);
  1.1108 +    mDispatchingUrgentMessageCount--;
  1.1109 +
  1.1110 +    if (!MaybeHandleError(rv, "DispatchUrgentMessage")) {
  1.1111 +        delete reply;
  1.1112 +        reply = new Message();
  1.1113 +        reply->set_urgent();
  1.1114 +        reply->set_reply();
  1.1115 +        reply->set_reply_error();
  1.1116 +    }
  1.1117 +    reply->set_seqno(aMsg.seqno());
  1.1118 +
  1.1119 +    MonitorAutoLock lock(*mMonitor);
  1.1120 +    if (ChannelConnected == mChannelState)
  1.1121 +        mLink->SendMessage(reply);
  1.1122 +}
  1.1123 +
  1.1124 +void
  1.1125 +MessageChannel::DispatchRPCMessage(const Message& aMsg)
  1.1126 +{
  1.1127 +    AssertWorkerThread();
  1.1128 +    MOZ_ASSERT(aMsg.is_rpc());
  1.1129 +
  1.1130 +    Message *reply = nullptr;
  1.1131 +
  1.1132 +    if (!MaybeHandleError(mListener->OnCallReceived(aMsg, reply), "DispatchRPCMessage")) {
  1.1133 +        delete reply;
  1.1134 +        reply = new Message();
  1.1135 +        reply->set_rpc();
  1.1136 +        reply->set_reply();
  1.1137 +        reply->set_reply_error();
  1.1138 +    }
  1.1139 +    reply->set_seqno(aMsg.seqno());
  1.1140 +    
  1.1141 +    MonitorAutoLock lock(*mMonitor);
  1.1142 +    if (ChannelConnected == mChannelState)
  1.1143 +        mLink->SendMessage(reply);
  1.1144 +}
  1.1145 +
  1.1146 +void
  1.1147 +MessageChannel::DispatchAsyncMessage(const Message& aMsg)
  1.1148 +{
  1.1149 +    AssertWorkerThread();
  1.1150 +    MOZ_ASSERT(!aMsg.is_interrupt() && !aMsg.is_sync() && !aMsg.is_urgent());
  1.1151 +
  1.1152 +    if (aMsg.routing_id() == MSG_ROUTING_NONE) {
  1.1153 +        NS_RUNTIMEABORT("unhandled special message!");
  1.1154 +    }
  1.1155 +
  1.1156 +    MaybeHandleError(mListener->OnMessageReceived(aMsg), "DispatchAsyncMessage");
  1.1157 +}
  1.1158 +
  1.1159 +void
  1.1160 +MessageChannel::DispatchInterruptMessage(const Message& aMsg, size_t stackDepth)
  1.1161 +{
  1.1162 +    AssertWorkerThread();
  1.1163 +    mMonitor->AssertNotCurrentThreadOwns();
  1.1164 +
  1.1165 +    IPC_ASSERT(aMsg.is_interrupt() && !aMsg.is_reply(), "wrong message type");
  1.1166 +
  1.1167 +    // Race detection: see the long comment near mRemoteStackDepthGuess in
  1.1168 +    // MessageChannel.h. "Remote" stack depth means our side, and "local" means
  1.1169 +    // the other side.
  1.1170 +    if (aMsg.interrupt_remote_stack_depth_guess() != RemoteViewOfStackDepth(stackDepth)) {
  1.1171 +        // Interrupt in-calls have raced. The winner, if there is one, gets to defer
  1.1172 +        // processing of the other side's in-call.
  1.1173 +        bool defer;
  1.1174 +        const char* winner;
  1.1175 +        switch (mListener->MediateInterruptRace((mSide == ChildSide) ? aMsg : mInterruptStack.top(),
  1.1176 +                                          (mSide != ChildSide) ? mInterruptStack.top() : aMsg))
  1.1177 +        {
  1.1178 +          case RIPChildWins:
  1.1179 +            winner = "child";
  1.1180 +            defer = (mSide == ChildSide);
  1.1181 +            break;
  1.1182 +          case RIPParentWins:
  1.1183 +            winner = "parent";
  1.1184 +            defer = (mSide != ChildSide);
  1.1185 +            break;
  1.1186 +          case RIPError:
  1.1187 +            NS_RUNTIMEABORT("NYI: 'Error' Interrupt race policy");
  1.1188 +            return;
  1.1189 +          default:
  1.1190 +            NS_RUNTIMEABORT("not reached");
  1.1191 +            return;
  1.1192 +        }
  1.1193 +
  1.1194 +        if (LoggingEnabled()) {
  1.1195 +            printf_stderr("  (%s: %s won, so we're%sdeferring)\n",
  1.1196 +                          (mSide == ChildSide) ? "child" : "parent",
  1.1197 +                          winner,
  1.1198 +                          defer ? " " : " not ");
  1.1199 +        }
  1.1200 +
  1.1201 +        if (defer) {
  1.1202 +            // We now know the other side's stack has one more frame
  1.1203 +            // than we thought.
  1.1204 +            ++mRemoteStackDepthGuess; // decremented in MaybeProcessDeferred()
  1.1205 +            mDeferred.push(aMsg);
  1.1206 +            return;
  1.1207 +        }
  1.1208 +
  1.1209 +        // We "lost" and need to process the other side's in-call. Don't need
  1.1210 +        // to fix up the mRemoteStackDepthGuess here, because we're just about
  1.1211 +        // to increment it in DispatchCall(), which will make it correct again.
  1.1212 +    }
  1.1213 +
  1.1214 +#ifdef OS_WIN
  1.1215 +    SyncStackFrame frame(this, true);
  1.1216 +#endif
  1.1217 +
  1.1218 +    Message* reply = nullptr;
  1.1219 +
  1.1220 +    ++mRemoteStackDepthGuess;
  1.1221 +    Result rv = mListener->OnCallReceived(aMsg, reply);
  1.1222 +    --mRemoteStackDepthGuess;
  1.1223 +
  1.1224 +    if (!MaybeHandleError(rv, "DispatchInterruptMessage")) {
  1.1225 +        delete reply;
  1.1226 +        reply = new Message();
  1.1227 +        reply->set_interrupt();
  1.1228 +        reply->set_reply();
  1.1229 +        reply->set_reply_error();
  1.1230 +    }
  1.1231 +    reply->set_seqno(aMsg.seqno());
  1.1232 +
  1.1233 +    MonitorAutoLock lock(*mMonitor);
  1.1234 +    if (ChannelConnected == mChannelState)
  1.1235 +        mLink->SendMessage(reply);
  1.1236 +}
  1.1237 +
  1.1238 +void
  1.1239 +MessageChannel::MaybeUndeferIncall()
  1.1240 +{
  1.1241 +    AssertWorkerThread();
  1.1242 +    mMonitor->AssertCurrentThreadOwns();
  1.1243 +
  1.1244 +    if (mDeferred.empty())
  1.1245 +        return;
  1.1246 +
  1.1247 +    size_t stackDepth = InterruptStackDepth();
  1.1248 +
  1.1249 +    // the other side can only *under*-estimate our actual stack depth
  1.1250 +    IPC_ASSERT(mDeferred.top().interrupt_remote_stack_depth_guess() <= stackDepth,
  1.1251 +               "fatal logic error");
  1.1252 +
  1.1253 +    if (mDeferred.top().interrupt_remote_stack_depth_guess() < RemoteViewOfStackDepth(stackDepth))
  1.1254 +        return;
  1.1255 +
  1.1256 +    // maybe time to process this message
  1.1257 +    Message call = mDeferred.top();
  1.1258 +    mDeferred.pop();
  1.1259 +
  1.1260 +    // fix up fudge factor we added to account for race
  1.1261 +    IPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error");
  1.1262 +    --mRemoteStackDepthGuess;
  1.1263 +
  1.1264 +    mPending.push_back(call);
  1.1265 +}
  1.1266 +
  1.1267 +void
  1.1268 +MessageChannel::FlushPendingInterruptQueue()
  1.1269 +{
  1.1270 +    AssertWorkerThread();
  1.1271 +    mMonitor->AssertNotCurrentThreadOwns();
  1.1272 +
  1.1273 +    {
  1.1274 +        MonitorAutoLock lock(*mMonitor);
  1.1275 +
  1.1276 +        if (mDeferred.empty()) {
  1.1277 +            if (mPending.empty())
  1.1278 +                return;
  1.1279 +
  1.1280 +            const Message& last = mPending.back();
  1.1281 +            if (!last.is_interrupt() || last.is_reply())
  1.1282 +                return;
  1.1283 +        }
  1.1284 +    }
  1.1285 +
  1.1286 +    while (OnMaybeDequeueOne());
  1.1287 +}
  1.1288 +
  1.1289 +void
  1.1290 +MessageChannel::ExitedCxxStack()
  1.1291 +{
  1.1292 +    mListener->OnExitedCxxStack();
  1.1293 +    if (mSawInterruptOutMsg) {
  1.1294 +        MonitorAutoLock lock(*mMonitor);
  1.1295 +        // see long comment in OnMaybeDequeueOne()
  1.1296 +        EnqueuePendingMessages();
  1.1297 +        mSawInterruptOutMsg = false;
  1.1298 +    }
  1.1299 +}
  1.1300 +
  1.1301 +void
  1.1302 +MessageChannel::EnqueuePendingMessages()
  1.1303 +{
  1.1304 +    AssertWorkerThread();
  1.1305 +    mMonitor->AssertCurrentThreadOwns();
  1.1306 +
  1.1307 +    MaybeUndeferIncall();
  1.1308 +
  1.1309 +    for (size_t i = 0; i < mDeferred.size(); ++i) {
  1.1310 +        mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
  1.1311 +    }
  1.1312 +
  1.1313 +    // XXX performance tuning knob: could process all or k pending
  1.1314 +    // messages here, rather than enqueuing for later processing
  1.1315 +
  1.1316 +    for (size_t i = 0; i < mPending.size(); ++i) {
  1.1317 +        mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
  1.1318 +    }
  1.1319 +}
  1.1320 +
  1.1321 +static inline bool
  1.1322 +IsTimeoutExpired(PRIntervalTime aStart, PRIntervalTime aTimeout)
  1.1323 +{
  1.1324 +    return (aTimeout != PR_INTERVAL_NO_TIMEOUT) &&
  1.1325 +           (aTimeout <= (PR_IntervalNow() - aStart));
  1.1326 +}
  1.1327 +
  1.1328 +bool
  1.1329 +MessageChannel::WaitResponse(bool aWaitTimedOut)
  1.1330 +{
  1.1331 +    if (aWaitTimedOut) {
  1.1332 +        if (mInTimeoutSecondHalf) {
  1.1333 +            // We've really timed out this time.
  1.1334 +            return false;
  1.1335 +        }
  1.1336 +        // Try a second time.
  1.1337 +        mInTimeoutSecondHalf = true;
  1.1338 +    } else {
  1.1339 +        mInTimeoutSecondHalf = false;
  1.1340 +    }
  1.1341 +    return true;
  1.1342 +}
  1.1343 +
  1.1344 +#ifndef OS_WIN
  1.1345 +bool
  1.1346 +MessageChannel::WaitForSyncNotify()
  1.1347 +{
  1.1348 +    PRIntervalTime timeout = (kNoTimeout == mTimeoutMs) ?
  1.1349 +                             PR_INTERVAL_NO_TIMEOUT :
  1.1350 +                             PR_MillisecondsToInterval(mTimeoutMs);
  1.1351 +    // XXX could optimize away this syscall for "no timeout" case if desired
  1.1352 +    PRIntervalTime waitStart = PR_IntervalNow();
  1.1353 +
  1.1354 +    mMonitor->Wait(timeout);
  1.1355 +
  1.1356 +    // If the timeout didn't expire, we know we received an event. The
  1.1357 +    // converse is not true.
  1.1358 +    return WaitResponse(IsTimeoutExpired(waitStart, timeout));
  1.1359 +}
  1.1360 +
  1.1361 +bool
  1.1362 +MessageChannel::WaitForInterruptNotify()
  1.1363 +{
  1.1364 +    return WaitForSyncNotify();
  1.1365 +}
  1.1366 +
  1.1367 +void
  1.1368 +MessageChannel::NotifyWorkerThread()
  1.1369 +{
  1.1370 +    mMonitor->Notify();
  1.1371 +}
  1.1372 +#endif
  1.1373 +
  1.1374 +bool
  1.1375 +MessageChannel::ShouldContinueFromTimeout()
  1.1376 +{
  1.1377 +    AssertWorkerThread();
  1.1378 +    mMonitor->AssertCurrentThreadOwns();
  1.1379 +
  1.1380 +    bool cont;
  1.1381 +    {
  1.1382 +        MonitorAutoUnlock unlock(*mMonitor);
  1.1383 +        cont = mListener->OnReplyTimeout();
  1.1384 +    }
  1.1385 +
  1.1386 +    static enum { UNKNOWN, NOT_DEBUGGING, DEBUGGING } sDebuggingChildren = UNKNOWN;
  1.1387 +
  1.1388 +    if (sDebuggingChildren == UNKNOWN) {
  1.1389 +        sDebuggingChildren = getenv("MOZ_DEBUG_CHILD_PROCESS") ? DEBUGGING : NOT_DEBUGGING;
  1.1390 +    }
  1.1391 +    if (sDebuggingChildren == DEBUGGING) {
  1.1392 +        return true;
  1.1393 +    }
  1.1394 +
  1.1395 +    if (!cont) {
  1.1396 +        // NB: there's a sublety here.  If parents were allowed to send sync
  1.1397 +        // messages to children, then it would be possible for this
  1.1398 +        // synchronous close-on-timeout to race with async |OnMessageReceived|
  1.1399 +        // tasks arriving from the child, posted to the worker thread's event
  1.1400 +        // loop.  This would complicate cleanup of the *Channel.  But since
  1.1401 +        // IPDL forbids this (and since it doesn't support children timing out
  1.1402 +        // on parents), the parent can only block on interrupt messages to the child,
  1.1403 +        // and in that case arriving async messages are enqueued to the interrupt 
  1.1404 +        // channel's special queue.  They're then ignored because the channel
  1.1405 +        // state changes to ChannelTimeout (i.e. !Connected).
  1.1406 +        SynchronouslyClose();
  1.1407 +        mChannelState = ChannelTimeout;
  1.1408 +    }
  1.1409 +
  1.1410 +    return cont;
  1.1411 +}
  1.1412 +
  1.1413 +void
  1.1414 +MessageChannel::SetReplyTimeoutMs(int32_t aTimeoutMs)
  1.1415 +{
  1.1416 +    // Set channel timeout value. Since this is broken up into
  1.1417 +    // two period, the minimum timeout value is 2ms.
  1.1418 +    AssertWorkerThread();
  1.1419 +    mTimeoutMs = (aTimeoutMs <= 0)
  1.1420 +                 ? kNoTimeout
  1.1421 +                 : (int32_t)ceil((double)aTimeoutMs / 2.0);
  1.1422 +}
  1.1423 +
  1.1424 +void
  1.1425 +MessageChannel::OnChannelConnected(int32_t peer_id)
  1.1426 +{
  1.1427 +    mWorkerLoop->PostTask(
  1.1428 +        FROM_HERE,
  1.1429 +        NewRunnableMethod(this,
  1.1430 +                          &MessageChannel::DispatchOnChannelConnected,
  1.1431 +                          peer_id));
  1.1432 +}
  1.1433 +
  1.1434 +void
  1.1435 +MessageChannel::DispatchOnChannelConnected(int32_t peer_pid)
  1.1436 +{
  1.1437 +    AssertWorkerThread();
  1.1438 +    if (mListener)
  1.1439 +        mListener->OnChannelConnected(peer_pid);
  1.1440 +}
  1.1441 +
  1.1442 +void
  1.1443 +MessageChannel::ReportMessageRouteError(const char* channelName) const
  1.1444 +{
  1.1445 +    PrintErrorMessage(mSide, channelName, "Need a route");
  1.1446 +    mListener->OnProcessingError(MsgRouteError);
  1.1447 +}
  1.1448 +
  1.1449 +void
  1.1450 +MessageChannel::ReportConnectionError(const char* aChannelName) const
  1.1451 +{
  1.1452 +    AssertWorkerThread();
  1.1453 +    mMonitor->AssertCurrentThreadOwns();
  1.1454 +
  1.1455 +    const char* errorMsg = nullptr;
  1.1456 +    switch (mChannelState) {
  1.1457 +      case ChannelClosed:
  1.1458 +        errorMsg = "Closed channel: cannot send/recv";
  1.1459 +        break;
  1.1460 +      case ChannelOpening:
  1.1461 +        errorMsg = "Opening channel: not yet ready for send/recv";
  1.1462 +        break;
  1.1463 +      case ChannelTimeout:
  1.1464 +        errorMsg = "Channel timeout: cannot send/recv";
  1.1465 +        break;
  1.1466 +      case ChannelClosing:
  1.1467 +        errorMsg = "Channel closing: too late to send/recv, messages will be lost";
  1.1468 +        break;
  1.1469 +      case ChannelError:
  1.1470 +        errorMsg = "Channel error: cannot send/recv";
  1.1471 +        break;
  1.1472 +
  1.1473 +      default:
  1.1474 +        NS_RUNTIMEABORT("unreached");
  1.1475 +    }
  1.1476 +
  1.1477 +    PrintErrorMessage(mSide, aChannelName, errorMsg);
  1.1478 +
  1.1479 +    MonitorAutoUnlock unlock(*mMonitor);
  1.1480 +    mListener->OnProcessingError(MsgDropped);
  1.1481 +}
  1.1482 +
  1.1483 +bool
  1.1484 +MessageChannel::MaybeHandleError(Result code, const char* channelName)
  1.1485 +{
  1.1486 +    if (MsgProcessed == code)
  1.1487 +        return true;
  1.1488 +
  1.1489 +    const char* errorMsg = nullptr;
  1.1490 +    switch (code) {
  1.1491 +      case MsgNotKnown:
  1.1492 +        errorMsg = "Unknown message: not processed";
  1.1493 +        break;
  1.1494 +      case MsgNotAllowed:
  1.1495 +        errorMsg = "Message not allowed: cannot be sent/recvd in this state";
  1.1496 +        break;
  1.1497 +      case MsgPayloadError:
  1.1498 +        errorMsg = "Payload error: message could not be deserialized";
  1.1499 +        break;
  1.1500 +      case MsgProcessingError:
  1.1501 +        errorMsg = "Processing error: message was deserialized, but the handler returned false (indicating failure)";
  1.1502 +        break;
  1.1503 +      case MsgRouteError:
  1.1504 +        errorMsg = "Route error: message sent to unknown actor ID";
  1.1505 +        break;
  1.1506 +      case MsgValueError:
  1.1507 +        errorMsg = "Value error: message was deserialized, but contained an illegal value";
  1.1508 +        break;
  1.1509 +
  1.1510 +    default:
  1.1511 +        NS_RUNTIMEABORT("unknown Result code");
  1.1512 +        return false;
  1.1513 +    }
  1.1514 +
  1.1515 +    PrintErrorMessage(mSide, channelName, errorMsg);
  1.1516 +
  1.1517 +    mListener->OnProcessingError(code);
  1.1518 +
  1.1519 +    return false;
  1.1520 +}
  1.1521 +
  1.1522 +void
  1.1523 +MessageChannel::OnChannelErrorFromLink()
  1.1524 +{
  1.1525 +    AssertLinkThread();
  1.1526 +    mMonitor->AssertCurrentThreadOwns();
  1.1527 +
  1.1528 +    if (InterruptStackDepth() > 0)
  1.1529 +        NotifyWorkerThread();
  1.1530 +
  1.1531 +    if (AwaitingSyncReply() || AwaitingRPCReply() || AwaitingUrgentReply())
  1.1532 +        NotifyWorkerThread();
  1.1533 +
  1.1534 +    if (ChannelClosing != mChannelState) {
  1.1535 +        if (mAbortOnError) {
  1.1536 +            NS_RUNTIMEABORT("Aborting on channel error.");
  1.1537 +        }
  1.1538 +        mChannelState = ChannelError;
  1.1539 +        mMonitor->Notify();
  1.1540 +    }
  1.1541 +
  1.1542 +    PostErrorNotifyTask();
  1.1543 +}
  1.1544 +
  1.1545 +void
  1.1546 +MessageChannel::NotifyMaybeChannelError()
  1.1547 +{
  1.1548 +    mMonitor->AssertNotCurrentThreadOwns();
  1.1549 +
  1.1550 +    // TODO sort out Close() on this side racing with Close() on the other side
  1.1551 +    if (ChannelClosing == mChannelState) {
  1.1552 +        // the channel closed, but we received a "Goodbye" message warning us
  1.1553 +        // about it. no worries
  1.1554 +        mChannelState = ChannelClosed;
  1.1555 +        NotifyChannelClosed();
  1.1556 +        return;
  1.1557 +    }
  1.1558 +
  1.1559 +    // Oops, error!  Let the listener know about it.
  1.1560 +    mChannelState = ChannelError;
  1.1561 +    mListener->OnChannelError();
  1.1562 +    Clear();
  1.1563 +}
  1.1564 +
  1.1565 +void
  1.1566 +MessageChannel::OnNotifyMaybeChannelError()
  1.1567 +{
  1.1568 +    AssertWorkerThread();
  1.1569 +    mMonitor->AssertNotCurrentThreadOwns();
  1.1570 +
  1.1571 +    mChannelErrorTask = nullptr;
  1.1572 +
  1.1573 +    // OnChannelError holds mMonitor when it posts this task and this
  1.1574 +    // task cannot be allowed to run until OnChannelError has
  1.1575 +    // exited. We enforce that order by grabbing the mutex here which
  1.1576 +    // should only continue once OnChannelError has completed.
  1.1577 +    {
  1.1578 +        MonitorAutoLock lock(*mMonitor);
  1.1579 +        // nothing to do here
  1.1580 +    }
  1.1581 +
  1.1582 +    if (IsOnCxxStack()) {
  1.1583 +        mChannelErrorTask =
  1.1584 +            NewRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError);
  1.1585 +        // 10 ms delay is completely arbitrary
  1.1586 +        mWorkerLoop->PostDelayedTask(FROM_HERE, mChannelErrorTask, 10);
  1.1587 +        return;
  1.1588 +    }
  1.1589 +
  1.1590 +    NotifyMaybeChannelError();
  1.1591 +}
  1.1592 +
  1.1593 +void
  1.1594 +MessageChannel::PostErrorNotifyTask()
  1.1595 +{
  1.1596 +    mMonitor->AssertCurrentThreadOwns();
  1.1597 +
  1.1598 +    if (mChannelErrorTask)
  1.1599 +        return;
  1.1600 +
  1.1601 +    // This must be the last code that runs on this thread!
  1.1602 +    mChannelErrorTask =
  1.1603 +        NewRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError);
  1.1604 +    mWorkerLoop->PostTask(FROM_HERE, mChannelErrorTask);
  1.1605 +}
  1.1606 +
  1.1607 +// Special async message.
  1.1608 +class GoodbyeMessage : public IPC::Message
  1.1609 +{
  1.1610 +public:
  1.1611 +    GoodbyeMessage() :
  1.1612 +        IPC::Message(MSG_ROUTING_NONE, GOODBYE_MESSAGE_TYPE, PRIORITY_NORMAL)
  1.1613 +    {
  1.1614 +    }
  1.1615 +    static bool Read(const Message* msg) {
  1.1616 +        return true;
  1.1617 +    }
  1.1618 +    void Log(const std::string& aPrefix, FILE* aOutf) const {
  1.1619 +        fputs("(special `Goodbye' message)", aOutf);
  1.1620 +    }
  1.1621 +};
  1.1622 +
  1.1623 +void
  1.1624 +MessageChannel::SynchronouslyClose()
  1.1625 +{
  1.1626 +    AssertWorkerThread();
  1.1627 +    mMonitor->AssertCurrentThreadOwns();
  1.1628 +    mLink->SendClose();
  1.1629 +    while (ChannelClosed != mChannelState)
  1.1630 +        mMonitor->Wait();
  1.1631 +}
  1.1632 +
  1.1633 +void
  1.1634 +MessageChannel::CloseWithError()
  1.1635 +{
  1.1636 +    AssertWorkerThread();
  1.1637 +
  1.1638 +    MonitorAutoLock lock(*mMonitor);
  1.1639 +    if (ChannelConnected != mChannelState) {
  1.1640 +        return;
  1.1641 +    }
  1.1642 +    SynchronouslyClose();
  1.1643 +    mChannelState = ChannelError;
  1.1644 +    PostErrorNotifyTask();
  1.1645 +}
  1.1646 +
  1.1647 +void
  1.1648 +MessageChannel::Close()
  1.1649 +{
  1.1650 +    AssertWorkerThread();
  1.1651 +
  1.1652 +    {
  1.1653 +        MonitorAutoLock lock(*mMonitor);
  1.1654 +
  1.1655 +        if (ChannelError == mChannelState || ChannelTimeout == mChannelState) {
  1.1656 +            // See bug 538586: if the listener gets deleted while the
  1.1657 +            // IO thread's NotifyChannelError event is still enqueued
  1.1658 +            // and subsequently deletes us, then the error event will
  1.1659 +            // also be deleted and the listener will never be notified
  1.1660 +            // of the channel error.
  1.1661 +            if (mListener) {
  1.1662 +                MonitorAutoUnlock unlock(*mMonitor);
  1.1663 +                NotifyMaybeChannelError();
  1.1664 +            }
  1.1665 +            return;
  1.1666 +        }
  1.1667 +
  1.1668 +        if (ChannelOpening == mChannelState) {
  1.1669 +            // Mimic CloseWithError().
  1.1670 +            SynchronouslyClose();
  1.1671 +            mChannelState = ChannelError;
  1.1672 +            PostErrorNotifyTask();
  1.1673 +            return;
  1.1674 +        }
  1.1675 +
  1.1676 +        if (ChannelConnected != mChannelState) {
  1.1677 +            // XXX be strict about this until there's a compelling reason
  1.1678 +            // to relax
  1.1679 +            NS_RUNTIMEABORT("Close() called on closed channel!");
  1.1680 +        }
  1.1681 +
  1.1682 +        // notify the other side that we're about to close our socket
  1.1683 +        mLink->SendMessage(new GoodbyeMessage());
  1.1684 +        SynchronouslyClose();
  1.1685 +    }
  1.1686 +
  1.1687 +    NotifyChannelClosed();
  1.1688 +}
  1.1689 +
  1.1690 +void
  1.1691 +MessageChannel::NotifyChannelClosed()
  1.1692 +{
  1.1693 +    mMonitor->AssertNotCurrentThreadOwns();
  1.1694 +
  1.1695 +    if (ChannelClosed != mChannelState)
  1.1696 +        NS_RUNTIMEABORT("channel should have been closed!");
  1.1697 +
  1.1698 +    // OK, the IO thread just closed the channel normally.  Let the
  1.1699 +    // listener know about it.
  1.1700 +    mListener->OnChannelClose();
  1.1701 +
  1.1702 +    Clear();
  1.1703 +}
  1.1704 +
  1.1705 +void
  1.1706 +MessageChannel::DebugAbort(const char* file, int line, const char* cond,
  1.1707 +                           const char* why,
  1.1708 +                           bool reply) const
  1.1709 +{
  1.1710 +    printf_stderr("###!!! [MessageChannel][%s][%s:%d] "
  1.1711 +                  "Assertion (%s) failed.  %s %s\n",
  1.1712 +                  mSide == ChildSide ? "Child" : "Parent",
  1.1713 +                  file, line, cond,
  1.1714 +                  why,
  1.1715 +                  reply ? "(reply)" : "");
  1.1716 +    // technically we need the mutex for this, but we're dying anyway
  1.1717 +    DumpInterruptStack("  ");
  1.1718 +    printf_stderr("  remote Interrupt stack guess: %lu\n",
  1.1719 +                  mRemoteStackDepthGuess);
  1.1720 +    printf_stderr("  deferred stack size: %lu\n",
  1.1721 +                  mDeferred.size());
  1.1722 +    printf_stderr("  out-of-turn Interrupt replies stack size: %lu\n",
  1.1723 +                  mOutOfTurnReplies.size());
  1.1724 +    printf_stderr("  Pending queue size: %lu, front to back:\n",
  1.1725 +                  mPending.size());
  1.1726 +
  1.1727 +    MessageQueue pending = mPending;
  1.1728 +    while (!pending.empty()) {
  1.1729 +        printf_stderr("    [ %s%s ]\n",
  1.1730 +                      pending.front().is_interrupt() ? "intr" :
  1.1731 +                      (pending.front().is_sync() ? "sync" : "async"),
  1.1732 +                      pending.front().is_reply() ? "reply" : "");
  1.1733 +        pending.pop_front();
  1.1734 +    }
  1.1735 +
  1.1736 +    NS_RUNTIMEABORT(why);
  1.1737 +}
  1.1738 +
  1.1739 +void
  1.1740 +MessageChannel::DumpInterruptStack(const char* const pfx) const
  1.1741 +{
  1.1742 +    NS_WARN_IF_FALSE(MessageLoop::current() != mWorkerLoop,
  1.1743 +                     "The worker thread had better be paused in a debugger!");
  1.1744 +
  1.1745 +    printf_stderr("%sMessageChannel 'backtrace':\n", pfx);
  1.1746 +
  1.1747 +    // print a python-style backtrace, first frame to last
  1.1748 +    for (uint32_t i = 0; i < mCxxStackFrames.length(); ++i) {
  1.1749 +        int32_t id;
  1.1750 +        const char* dir, *sems, *name;
  1.1751 +        mCxxStackFrames[i].Describe(&id, &dir, &sems, &name);
  1.1752 +
  1.1753 +        printf_stderr("%s[(%u) %s %s %s(actor=%d) ]\n", pfx,
  1.1754 +                      i, dir, sems, name, id);
  1.1755 +    }
  1.1756 +}
  1.1757 +
  1.1758 +} // ipc
  1.1759 +} // mozilla

mercurial