diff -r 000000000000 -r 6474c204b198 ipc/glue/MessageChannel.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/ipc/glue/MessageChannel.cpp Wed Dec 31 06:09:35 2014 +0100 @@ -0,0 +1,1756 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- + * vim: sw=4 ts=4 et : + */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "mozilla/ipc/MessageChannel.h" +#include "mozilla/ipc/ProtocolUtils.h" + +#include "mozilla/Assertions.h" +#include "mozilla/DebugOnly.h" +#include "mozilla/Move.h" +#include "nsDebug.h" +#include "nsISupportsImpl.h" + +// Undo the damage done by mozzconf.h +#undef compress + +using namespace mozilla; +using namespace std; + +using mozilla::MonitorAutoLock; +using mozilla::MonitorAutoUnlock; + +template<> +struct RunnableMethodTraits +{ + static void RetainCallee(mozilla::ipc::MessageChannel* obj) { } + static void ReleaseCallee(mozilla::ipc::MessageChannel* obj) { } +}; + +#define IPC_ASSERT(_cond, ...) \ + do { \ + if (!(_cond)) \ + DebugAbort(__FILE__, __LINE__, #_cond,## __VA_ARGS__); \ + } while (0) + +namespace mozilla { +namespace ipc { + +const int32_t MessageChannel::kNoTimeout = INT32_MIN; + +// static +bool MessageChannel::sIsPumpingMessages = false; + +enum Direction +{ + IN_MESSAGE, + OUT_MESSAGE +}; + + +class MessageChannel::InterruptFrame +{ +private: + enum Semantics + { + INTR_SEMS, + SYNC_SEMS, + ASYNC_SEMS + }; + +public: + InterruptFrame(Direction direction, const Message* msg) + : mMessageName(strdup(msg->name())), + mMessageRoutingId(msg->routing_id()), + mMesageSemantics(msg->is_interrupt() ? INTR_SEMS : + msg->is_sync() ? SYNC_SEMS : + ASYNC_SEMS), + mDirection(direction), + mMoved(false) + { + MOZ_ASSERT(mMessageName); + } + + InterruptFrame(InterruptFrame&& aOther) + { + MOZ_ASSERT(aOther.mMessageName); + mMessageName = aOther.mMessageName; + aOther.mMessageName = nullptr; + aOther.mMoved = true; + + mMessageRoutingId = aOther.mMessageRoutingId; + mMesageSemantics = aOther.mMesageSemantics; + mDirection = aOther.mDirection; + } + + ~InterruptFrame() + { + MOZ_ASSERT_IF(!mMessageName, mMoved); + + if (mMessageName) + free(const_cast(mMessageName)); + } + + InterruptFrame& operator=(InterruptFrame&& aOther) + { + MOZ_ASSERT(&aOther != this); + this->~InterruptFrame(); + new (this) InterruptFrame(mozilla::Move(aOther)); + return *this; + } + + bool IsInterruptIncall() const + { + return INTR_SEMS == mMesageSemantics && IN_MESSAGE == mDirection; + } + + bool IsInterruptOutcall() const + { + return INTR_SEMS == mMesageSemantics && OUT_MESSAGE == mDirection; + } + + void Describe(int32_t* id, const char** dir, const char** sems, + const char** name) const + { + *id = mMessageRoutingId; + *dir = (IN_MESSAGE == mDirection) ? "in" : "out"; + *sems = (INTR_SEMS == mMesageSemantics) ? "intr" : + (SYNC_SEMS == mMesageSemantics) ? "sync" : + "async"; + *name = mMessageName; + } + +private: + const char* mMessageName; + int32_t mMessageRoutingId; + Semantics mMesageSemantics; + Direction mDirection; + DebugOnly mMoved; + + // Disable harmful methods. + InterruptFrame(const InterruptFrame& aOther) MOZ_DELETE; + InterruptFrame& operator=(const InterruptFrame&) MOZ_DELETE; +}; + +class MOZ_STACK_CLASS MessageChannel::CxxStackFrame +{ +public: + CxxStackFrame(MessageChannel& that, Direction direction, const Message* msg) + : mThat(that) + { + mThat.AssertWorkerThread(); + + if (mThat.mCxxStackFrames.empty()) + mThat.EnteredCxxStack(); + + mThat.mCxxStackFrames.append(InterruptFrame(direction, msg)); + + const InterruptFrame& frame = mThat.mCxxStackFrames.back(); + + if (frame.IsInterruptIncall()) + mThat.EnteredCall(); + + mThat.mSawInterruptOutMsg |= frame.IsInterruptOutcall(); + } + + ~CxxStackFrame() { + mThat.AssertWorkerThread(); + + MOZ_ASSERT(!mThat.mCxxStackFrames.empty()); + + bool exitingCall = mThat.mCxxStackFrames.back().IsInterruptIncall(); + mThat.mCxxStackFrames.shrinkBy(1); + + bool exitingStack = mThat.mCxxStackFrames.empty(); + + // mListener could have gone away if Close() was called while + // MessageChannel code was still on the stack + if (!mThat.mListener) + return; + + if (exitingCall) + mThat.ExitedCall(); + + if (exitingStack) + mThat.ExitedCxxStack(); + } +private: + MessageChannel& mThat; + + // Disable harmful methods. + CxxStackFrame() MOZ_DELETE; + CxxStackFrame(const CxxStackFrame&) MOZ_DELETE; + CxxStackFrame& operator=(const CxxStackFrame&) MOZ_DELETE; +}; + +MessageChannel::MessageChannel(MessageListener *aListener) + : mListener(aListener->asWeakPtr()), + mChannelState(ChannelClosed), + mSide(UnknownSide), + mLink(nullptr), + mWorkerLoop(nullptr), + mChannelErrorTask(nullptr), + mWorkerLoopID(-1), + mTimeoutMs(kNoTimeout), + mInTimeoutSecondHalf(false), + mNextSeqno(0), + mPendingSyncReplies(0), + mPendingUrgentReplies(0), + mPendingRPCReplies(0), + mCurrentRPCTransaction(0), + mDispatchingSyncMessage(false), + mDispatchingUrgentMessageCount(0), + mRemoteStackDepthGuess(false), + mSawInterruptOutMsg(false), + mAbortOnError(false), + mFlags(REQUIRE_DEFAULT) +{ + MOZ_COUNT_CTOR(ipc::MessageChannel); + +#ifdef OS_WIN + mTopFrame = nullptr; + mIsSyncWaitingOnNonMainThread = false; +#endif + + mDequeueOneTask = new RefCountedTask(NewRunnableMethod( + this, + &MessageChannel::OnMaybeDequeueOne)); + +#ifdef OS_WIN + mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr); + NS_ASSERTION(mEvent, "CreateEvent failed! Nothing is going to work!"); +#endif +} + +MessageChannel::~MessageChannel() +{ + MOZ_COUNT_DTOR(ipc::MessageChannel); + IPC_ASSERT(mCxxStackFrames.empty(), "mismatched CxxStackFrame ctor/dtors"); +#ifdef OS_WIN + DebugOnly ok = CloseHandle(mEvent); + MOZ_ASSERT(ok); +#endif + Clear(); +} + +static void +PrintErrorMessage(Side side, const char* channelName, const char* msg) +{ + const char *from = (side == ChildSide) + ? "Child" + : ((side == ParentSide) ? "Parent" : "Unknown"); + printf_stderr("\n###!!! [%s][%s] Error: %s\n\n", from, channelName, msg); +} + +bool +MessageChannel::Connected() const +{ + mMonitor->AssertCurrentThreadOwns(); + + // The transport layer allows us to send messages before + // receiving the "connected" ack from the remote side. + return (ChannelOpening == mChannelState || ChannelConnected == mChannelState); +} + +bool +MessageChannel::CanSend() const +{ + MonitorAutoLock lock(*mMonitor); + return Connected(); +} + +void +MessageChannel::Clear() +{ + // Don't clear mWorkerLoopID; we use it in AssertLinkThread() and + // AssertWorkerThread(). + // + // Also don't clear mListener. If we clear it, then sending a message + // through this channel after it's Clear()'ed can cause this process to + // crash. + // + // In practice, mListener owns the channel, so the channel gets deleted + // before mListener. But just to be safe, mListener is a weak pointer. + + mDequeueOneTask->Cancel(); + + mWorkerLoop = nullptr; + delete mLink; + mLink = nullptr; + + if (mChannelErrorTask) { + mChannelErrorTask->Cancel(); + mChannelErrorTask = nullptr; + } + + // Free up any memory used by pending messages. + mPending.clear(); + mPendingUrgentRequest = nullptr; + mPendingRPCCall = nullptr; + mOutOfTurnReplies.clear(); + while (!mDeferred.empty()) { + mDeferred.pop(); + } +} + +bool +MessageChannel::Open(Transport* aTransport, MessageLoop* aIOLoop, Side aSide) +{ + NS_PRECONDITION(!mLink, "Open() called > once"); + + mMonitor = new RefCountedMonitor(); + mWorkerLoop = MessageLoop::current(); + mWorkerLoopID = mWorkerLoop->id(); + + ProcessLink *link = new ProcessLink(this); + link->Open(aTransport, aIOLoop, aSide); // :TODO: n.b.: sets mChild + mLink = link; + return true; +} + +bool +MessageChannel::Open(MessageChannel *aTargetChan, MessageLoop *aTargetLoop, Side aSide) +{ + // Opens a connection to another thread in the same process. + + // This handshake proceeds as follows: + // - Let A be the thread initiating the process (either child or parent) + // and B be the other thread. + // - A spawns thread for B, obtaining B's message loop + // - A creates ProtocolChild and ProtocolParent instances. + // Let PA be the one appropriate to A and PB the side for B. + // - A invokes PA->Open(PB, ...): + // - set state to mChannelOpening + // - this will place a work item in B's worker loop (see next bullet) + // and then spins until PB->mChannelState becomes mChannelConnected + // - meanwhile, on PB's worker loop, the work item is removed and: + // - invokes PB->SlaveOpen(PA, ...): + // - sets its state and that of PA to Connected + NS_PRECONDITION(aTargetChan, "Need a target channel"); + NS_PRECONDITION(ChannelClosed == mChannelState, "Not currently closed"); + + CommonThreadOpenInit(aTargetChan, aSide); + + Side oppSide = UnknownSide; + switch(aSide) { + case ChildSide: oppSide = ParentSide; break; + case ParentSide: oppSide = ChildSide; break; + case UnknownSide: break; + } + + mMonitor = new RefCountedMonitor(); + + MonitorAutoLock lock(*mMonitor); + mChannelState = ChannelOpening; + aTargetLoop->PostTask( + FROM_HERE, + NewRunnableMethod(aTargetChan, &MessageChannel::OnOpenAsSlave, this, oppSide)); + + while (ChannelOpening == mChannelState) + mMonitor->Wait(); + NS_ASSERTION(ChannelConnected == mChannelState, "not connected when awoken"); + return (ChannelConnected == mChannelState); +} + +void +MessageChannel::OnOpenAsSlave(MessageChannel *aTargetChan, Side aSide) +{ + // Invoked when the other side has begun the open. + NS_PRECONDITION(ChannelClosed == mChannelState, + "Not currently closed"); + NS_PRECONDITION(ChannelOpening == aTargetChan->mChannelState, + "Target channel not in the process of opening"); + + CommonThreadOpenInit(aTargetChan, aSide); + mMonitor = aTargetChan->mMonitor; + + MonitorAutoLock lock(*mMonitor); + NS_ASSERTION(ChannelOpening == aTargetChan->mChannelState, + "Target channel not in the process of opening"); + mChannelState = ChannelConnected; + aTargetChan->mChannelState = ChannelConnected; + aTargetChan->mMonitor->Notify(); +} + +void +MessageChannel::CommonThreadOpenInit(MessageChannel *aTargetChan, Side aSide) +{ + mWorkerLoop = MessageLoop::current(); + mWorkerLoopID = mWorkerLoop->id(); + mLink = new ThreadLink(this, aTargetChan); + mSide = aSide; +} + +bool +MessageChannel::Echo(Message* aMsg) +{ + nsAutoPtr msg(aMsg); + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + if (MSG_ROUTING_NONE == msg->routing_id()) { + ReportMessageRouteError("MessageChannel::Echo"); + return false; + } + + MonitorAutoLock lock(*mMonitor); + + if (!Connected()) { + ReportConnectionError("MessageChannel"); + return false; + } + + mLink->EchoMessage(msg.forget()); + return true; +} + +bool +MessageChannel::Send(Message* aMsg) +{ + CxxStackFrame frame(*this, OUT_MESSAGE, aMsg); + + nsAutoPtr msg(aMsg); + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + if (MSG_ROUTING_NONE == msg->routing_id()) { + ReportMessageRouteError("MessageChannel::Send"); + return false; + } + + MonitorAutoLock lock(*mMonitor); + if (!Connected()) { + ReportConnectionError("MessageChannel"); + return false; + } + mLink->SendMessage(msg.forget()); + return true; +} + +bool +MessageChannel::MaybeInterceptSpecialIOMessage(const Message& aMsg) +{ + AssertLinkThread(); + mMonitor->AssertCurrentThreadOwns(); + + if (MSG_ROUTING_NONE == aMsg.routing_id() && + GOODBYE_MESSAGE_TYPE == aMsg.type()) + { + // :TODO: Sort out Close() on this side racing with Close() on the + // other side + mChannelState = ChannelClosing; + if (LoggingEnabled()) { + printf("NOTE: %s process received `Goodbye', closing down\n", + (mSide == ChildSide) ? "child" : "parent"); + } + return true; + } + return false; +} + +void +MessageChannel::OnMessageReceivedFromLink(const Message& aMsg) +{ + AssertLinkThread(); + mMonitor->AssertCurrentThreadOwns(); + + if (MaybeInterceptSpecialIOMessage(aMsg)) + return; + + // Regardless of the Interrupt stack, if we're awaiting a sync or urgent reply, + // we know that it needs to be immediately handled to unblock us. + if ((AwaitingSyncReply() && aMsg.is_sync()) || + (AwaitingUrgentReply() && aMsg.is_urgent()) || + (AwaitingRPCReply() && aMsg.is_rpc())) + { + mRecvd = new Message(aMsg); + NotifyWorkerThread(); + return; + } + + // Urgent messages cannot be compressed. + MOZ_ASSERT(!aMsg.compress() || !aMsg.is_urgent()); + + bool compress = (aMsg.compress() && !mPending.empty() && + mPending.back().type() == aMsg.type() && + mPending.back().routing_id() == aMsg.routing_id()); + if (compress) { + // This message type has compression enabled, and the back of the + // queue was the same message type and routed to the same destination. + // Replace it with the newer message. + MOZ_ASSERT(mPending.back().compress()); + mPending.pop_back(); + } + + bool shouldWakeUp = AwaitingInterruptReply() || + // Allow incoming RPCs to be processed inside an urgent message. + (AwaitingUrgentReply() && aMsg.is_rpc()) || + // Always process urgent messages while blocked. + ((AwaitingSyncReply() || AwaitingRPCReply()) && aMsg.is_urgent()); + + // There are four cases we're concerned about, relating to the state of the + // main thread: + // + // (1) We are waiting on a sync|rpc reply - main thread is blocked on the + // IPC monitor. + // - If the message is high priority, we wake up the main thread to + // deliver the message. Otherwise, we leave it in the mPending queue, + // posting a task to the main event loop, where it will be processed + // once the synchronous reply has been received. + // + // (2) We are waiting on an Interrupt reply - main thread is blocked on the + // IPC monitor. + // - Always notify and wake up the main thread. + // + // (3) We are not waiting on a reply. + // - We post a task to the main event loop. + // + // Note that, we may notify the main thread even though the monitor is not + // blocked. This is okay, since we always check for pending events before + // blocking again. + + if (shouldWakeUp && (AwaitingUrgentReply() && aMsg.is_rpc())) { + // If we're receiving an RPC message while blocked on an urgent message, + // we must defer any messages that were not sent as part of the child + // answering the urgent message. + // + // We must also be sure that we will not accidentally defer any RPC + // message that was sent while answering an urgent message. Otherwise, + // we will deadlock. + // + // On the parent side, the current transaction can only transition from 0 + // to an ID, either by us issuing an urgent request while not blocked, or + // by receiving an RPC request while not blocked. When we unblock, the + // current transaction is reset to 0. + // + // When the child side receives an urgent message, any RPC messages sent + // before issuing the urgent reply will carry the urgent message's + // transaction ID. + // + // Since AwaitingUrgentReply() implies we are blocked, it also implies + // that we are within a transaction that will not change until we are + // completely unblocked (i.e, the transaction has completed). + if (aMsg.transaction_id() != mCurrentRPCTransaction) + shouldWakeUp = false; + } + + if (aMsg.is_urgent()) { + MOZ_ASSERT(!mPendingUrgentRequest); + mPendingUrgentRequest = new Message(aMsg); + } else if (aMsg.is_rpc() && shouldWakeUp) { + // Only use this slot if we need to wake up for an RPC call. Otherwise + // we treat it like a normal async or sync message. + MOZ_ASSERT(!mPendingRPCCall); + mPendingRPCCall = new Message(aMsg); + } else { + mPending.push_back(aMsg); + } + + if (shouldWakeUp) { + // Always wake up Interrupt waiters, sync waiters for urgent messages, + // RPC waiters for urgent messages, and urgent waiters for RPCs in the + // same transaction. + NotifyWorkerThread(); + } else { + // Worker thread is either not blocked on a reply, or this is an + // incoming Interrupt that raced with outgoing sync, and needs to be + // deferred to a later event-loop iteration. + if (!compress) { + // If we compressed away the previous message, we'll re-use + // its pending task. + mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask)); + } + } +} + +bool +MessageChannel::Send(Message* aMsg, Message* aReply) +{ + // Sanity checks. + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + +#ifdef OS_WIN + SyncStackFrame frame(this, false); +#endif + + CxxStackFrame f(*this, OUT_MESSAGE, aMsg); + + MonitorAutoLock lock(*mMonitor); + + IPC_ASSERT(aMsg->is_sync(), "can only Send() sync messages here"); + IPC_ASSERT(!DispatchingSyncMessage(), "violation of sync handler invariant"); + IPC_ASSERT(!DispatchingUrgentMessage(), "sync messages forbidden while handling urgent message"); + IPC_ASSERT(!AwaitingSyncReply(), "nested sync messages are not supported"); + + AutoEnterPendingReply replies(mPendingSyncReplies); + if (!SendAndWait(aMsg, aReply)) + return false; + + NS_ABORT_IF_FALSE(aReply->is_sync(), "reply is not sync"); + return true; +} + +bool +MessageChannel::UrgentCall(Message* aMsg, Message* aReply) +{ + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + IPC_ASSERT(mSide == ParentSide, "cannot send urgent requests from child"); + +#ifdef OS_WIN + SyncStackFrame frame(this, false); +#endif + + CxxStackFrame f(*this, OUT_MESSAGE, aMsg); + + MonitorAutoLock lock(*mMonitor); + + IPC_ASSERT(!AwaitingInterruptReply(), "urgent calls cannot be issued within Interrupt calls"); + IPC_ASSERT(!AwaitingSyncReply(), "urgent calls cannot be issued within sync sends"); + + AutoEnterRPCTransaction transact(this); + aMsg->set_transaction_id(mCurrentRPCTransaction); + + AutoEnterPendingReply replies(mPendingUrgentReplies); + if (!SendAndWait(aMsg, aReply)) + return false; + + NS_ABORT_IF_FALSE(aReply->is_urgent(), "reply is not urgent"); + return true; +} + +bool +MessageChannel::RPCCall(Message* aMsg, Message* aReply) +{ + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + IPC_ASSERT(mSide == ChildSide, "cannot send rpc messages from parent"); + +#ifdef OS_WIN + SyncStackFrame frame(this, false); +#endif + + CxxStackFrame f(*this, OUT_MESSAGE, aMsg); + + MonitorAutoLock lock(*mMonitor); + + AutoEnterRPCTransaction transact(this); + aMsg->set_transaction_id(mCurrentRPCTransaction); + + AutoEnterPendingReply replies(mPendingRPCReplies); + if (!SendAndWait(aMsg, aReply)) + return false; + + NS_ABORT_IF_FALSE(aReply->is_rpc(), "expected rpc reply"); + return true; +} + +bool +MessageChannel::SendAndWait(Message* aMsg, Message* aReply) +{ + mMonitor->AssertCurrentThreadOwns(); + + nsAutoPtr msg(aMsg); + + if (!Connected()) { + ReportConnectionError("MessageChannel::SendAndWait"); + return false; + } + + msg->set_seqno(NextSeqno()); + + DebugOnly replySeqno = msg->seqno(); + DebugOnly replyType = msg->type() + 1; + + mLink->SendMessage(msg.forget()); + + while (true) { + // Wait for an event to occur. + while (true) { + if (mRecvd || mPendingUrgentRequest || mPendingRPCCall) + break; + + bool maybeTimedOut = !WaitForSyncNotify(); + + if (!Connected()) { + ReportConnectionError("MessageChannel::SendAndWait"); + return false; + } + + if (maybeTimedOut && !ShouldContinueFromTimeout()) + return false; + } + + if (mPendingUrgentRequest && !ProcessPendingUrgentRequest()) + return false; + + if (mPendingRPCCall && !ProcessPendingRPCCall()) + return false; + + if (mRecvd) { + NS_ABORT_IF_FALSE(mRecvd->is_reply(), "expected reply"); + + if (mRecvd->is_reply_error()) { + mRecvd = nullptr; + return false; + } + + NS_ABORT_IF_FALSE(mRecvd->type() == replyType, "wrong reply type"); + NS_ABORT_IF_FALSE(mRecvd->seqno() == replySeqno, "wrong sequence number"); + + *aReply = *mRecvd; + mRecvd = nullptr; + return true; + } + } + + return true; +} + +bool +MessageChannel::Call(Message* aMsg, Message* aReply) +{ + if (aMsg->is_urgent()) + return UrgentCall(aMsg, aReply); + if (aMsg->is_rpc()) + return RPCCall(aMsg, aReply); + return InterruptCall(aMsg, aReply); +} + +bool +MessageChannel::InterruptCall(Message* aMsg, Message* aReply) +{ + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + +#ifdef OS_WIN + SyncStackFrame frame(this, true); +#endif + + // This must come before MonitorAutoLock, as its destructor acquires the + // monitor lock. + CxxStackFrame cxxframe(*this, OUT_MESSAGE, aMsg); + + MonitorAutoLock lock(*mMonitor); + if (!Connected()) { + ReportConnectionError("MessageChannel::Call"); + return false; + } + + // Sanity checks. + IPC_ASSERT(!AwaitingSyncReply() && !AwaitingUrgentReply(), + "cannot issue Interrupt call whiel blocked on sync or urgent"); + IPC_ASSERT(!DispatchingSyncMessage() || aMsg->priority() == IPC::Message::PRIORITY_HIGH, + "violation of sync handler invariant"); + IPC_ASSERT(aMsg->is_interrupt(), "can only Call() Interrupt messages here"); + + + nsAutoPtr msg(aMsg); + + msg->set_seqno(NextSeqno()); + msg->set_interrupt_remote_stack_depth_guess(mRemoteStackDepthGuess); + msg->set_interrupt_local_stack_depth(1 + InterruptStackDepth()); + mInterruptStack.push(*msg); + mLink->SendMessage(msg.forget()); + + while (true) { + // if a handler invoked by *Dispatch*() spun a nested event + // loop, and the connection was broken during that loop, we + // might have already processed the OnError event. if so, + // trying another loop iteration will be futile because + // channel state will have been cleared + if (!Connected()) { + ReportConnectionError("MessageChannel::InterruptCall"); + return false; + } + + // Now might be the time to process a message deferred because of race + // resolution. + MaybeUndeferIncall(); + + // Wait for an event to occur. + while (!InterruptEventOccurred()) { + bool maybeTimedOut = !WaitForInterruptNotify(); + + // We might have received a "subtly deferred" message in a nested + // loop that it's now time to process. + if (InterruptEventOccurred() || + (!maybeTimedOut && (!mDeferred.empty() || !mOutOfTurnReplies.empty()))) + { + break; + } + + if (maybeTimedOut && !ShouldContinueFromTimeout()) + return false; + } + + Message recvd; + MessageMap::iterator it; + + if (mPendingUrgentRequest) { + recvd = *mPendingUrgentRequest; + mPendingUrgentRequest = nullptr; + } else if (mPendingRPCCall) { + recvd = *mPendingRPCCall; + mPendingRPCCall = nullptr; + } else if ((it = mOutOfTurnReplies.find(mInterruptStack.top().seqno())) + != mOutOfTurnReplies.end()) + { + recvd = it->second; + mOutOfTurnReplies.erase(it); + } else if (!mPending.empty()) { + recvd = mPending.front(); + mPending.pop_front(); + } else { + // because of subtleties with nested event loops, it's possible + // that we got here and nothing happened. or, we might have a + // deferred in-call that needs to be processed. either way, we + // won't break the inner while loop again until something new + // happens. + continue; + } + + // If the message is not Interrupt, we can dispatch it as normal. + if (!recvd.is_interrupt()) { + // Other side should be blocked. + IPC_ASSERT(!recvd.is_sync() || mPending.empty(), "other side should be blocked"); + + { + AutoEnterRPCTransaction transaction(this, &recvd); + MonitorAutoUnlock unlock(*mMonitor); + CxxStackFrame frame(*this, IN_MESSAGE, &recvd); + DispatchMessage(recvd); + } + if (!Connected()) { + ReportConnectionError("MessageChannel::DispatchMessage"); + return false; + } + continue; + } + + // If the message is an Interrupt reply, either process it as a reply to our + // call, or add it to the list of out-of-turn replies we've received. + if (recvd.is_reply()) { + IPC_ASSERT(!mInterruptStack.empty(), "invalid Interrupt stack"); + + // If this is not a reply the call we've initiated, add it to our + // out-of-turn replies and keep polling for events. + { + const Message &outcall = mInterruptStack.top(); + + // Note, In the parent, sequence numbers increase from 0, and + // in the child, they decrease from 0. + if ((mSide == ChildSide && recvd.seqno() > outcall.seqno()) || + (mSide != ChildSide && recvd.seqno() < outcall.seqno())) + { + mOutOfTurnReplies[recvd.seqno()] = recvd; + continue; + } + + IPC_ASSERT(recvd.is_reply_error() || + (recvd.type() == (outcall.type() + 1) && + recvd.seqno() == outcall.seqno()), + "somebody's misbehavin'", true); + } + + // We received a reply to our most recent outstanding call. Pop + // this frame and return the reply. + mInterruptStack.pop(); + + if (!recvd.is_reply_error()) { + *aReply = recvd; + } + + // If we have no more pending out calls waiting on replies, then + // the reply queue should be empty. + IPC_ASSERT(!mInterruptStack.empty() || mOutOfTurnReplies.empty(), + "still have pending replies with no pending out-calls", + true); + + return !recvd.is_reply_error(); + } + + // Dispatch an Interrupt in-call. Snapshot the current stack depth while we + // own the monitor. + size_t stackDepth = InterruptStackDepth(); + { + MonitorAutoUnlock unlock(*mMonitor); + + CxxStackFrame frame(*this, IN_MESSAGE, &recvd); + DispatchInterruptMessage(recvd, stackDepth); + } + if (!Connected()) { + ReportConnectionError("MessageChannel::DispatchInterruptMessage"); + return false; + } + } + + return true; +} + +bool +MessageChannel::InterruptEventOccurred() +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + IPC_ASSERT(InterruptStackDepth() > 0, "not in wait loop"); + + return (!Connected() || + !mPending.empty() || + mPendingUrgentRequest || + mPendingRPCCall || + (!mOutOfTurnReplies.empty() && + mOutOfTurnReplies.find(mInterruptStack.top().seqno()) != + mOutOfTurnReplies.end())); +} + +bool +MessageChannel::ProcessPendingUrgentRequest() +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + // Note that it is possible we could have sent a sync message at + // the same time the parent process sent an urgent message, and + // therefore mPendingUrgentRequest is set *and* mRecvd is set as + // well, because the link thread received both before the worker + // thread woke up. + // + // In this case, we process the urgent message first, but we need + // to save the reply. + nsAutoPtr savedReply(mRecvd.forget()); + + // We're the child process. We should not be receiving RPC calls. + IPC_ASSERT(!mPendingRPCCall, "unexpected RPC call"); + + nsAutoPtr recvd(mPendingUrgentRequest.forget()); + { + // In order to send the parent RPC messages and guarantee it will + // wake up, we must re-use its transaction. + AutoEnterRPCTransaction transaction(this, recvd); + + MonitorAutoUnlock unlock(*mMonitor); + DispatchUrgentMessage(*recvd); + } + if (!Connected()) { + ReportConnectionError("MessageChannel::DispatchUrgentMessage"); + return false; + } + + // In between having dispatched our reply to the parent process, and + // re-acquiring the monitor, the parent process could have already + // processed that reply and sent the reply to our sync message. If so, + // our saved reply should be empty. + IPC_ASSERT(!mRecvd || !savedReply, "unknown reply"); + if (!mRecvd) + mRecvd = savedReply.forget(); + return true; +} + +bool +MessageChannel::ProcessPendingRPCCall() +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + // See comment above re: mRecvd replies and incoming calls. + nsAutoPtr savedReply(mRecvd.forget()); + + IPC_ASSERT(!mPendingUrgentRequest, "unexpected urgent message"); + + nsAutoPtr recvd(mPendingRPCCall.forget()); + { + // If we are not currently in a transaction, this will begin one, + // and the link thread will not wake us up for any RPC messages not + // apart of this transaction. If we are already in a transaction, + // then this will assert that we're still in the same transaction. + AutoEnterRPCTransaction transaction(this, recvd); + + MonitorAutoUnlock unlock(*mMonitor); + DispatchRPCMessage(*recvd); + } + if (!Connected()) { + ReportConnectionError("MessageChannel::DispatchRPCMessage"); + return false; + } + + // In between having dispatched our reply to the parent process, and + // re-acquiring the monitor, the parent process could have already + // processed that reply and sent the reply to our sync message. If so, + // our saved reply should be empty. + IPC_ASSERT(!mRecvd || !savedReply, "unknown reply"); + if (!mRecvd) + mRecvd = savedReply.forget(); + return true; +} + +bool +MessageChannel::DequeueOne(Message *recvd) +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + if (!Connected()) { + ReportConnectionError("OnMaybeDequeueOne"); + return false; + } + + if (mPendingUrgentRequest) { + *recvd = *mPendingUrgentRequest; + mPendingUrgentRequest = nullptr; + return true; + } + + if (mPendingRPCCall) { + *recvd = *mPendingRPCCall; + mPendingRPCCall = nullptr; + return true; + } + + if (!mDeferred.empty()) + MaybeUndeferIncall(); + + if (mPending.empty()) + return false; + + *recvd = mPending.front(); + mPending.pop_front(); + return true; +} + +bool +MessageChannel::OnMaybeDequeueOne() +{ + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + + Message recvd; + + MonitorAutoLock lock(*mMonitor); + if (!DequeueOne(&recvd)) + return false; + + if (IsOnCxxStack() && recvd.is_interrupt() && recvd.is_reply()) { + // We probably just received a reply in a nested loop for an + // Interrupt call sent before entering that loop. + mOutOfTurnReplies[recvd.seqno()] = recvd; + return false; + } + + { + // We should not be in a transaction yet if we're not blocked. + MOZ_ASSERT(mCurrentRPCTransaction == 0); + AutoEnterRPCTransaction transaction(this, &recvd); + + MonitorAutoUnlock unlock(*mMonitor); + + CxxStackFrame frame(*this, IN_MESSAGE, &recvd); + DispatchMessage(recvd); + } + return true; +} + +void +MessageChannel::DispatchMessage(const Message &aMsg) +{ + if (aMsg.is_sync()) + DispatchSyncMessage(aMsg); + else if (aMsg.is_urgent()) + DispatchUrgentMessage(aMsg); + else if (aMsg.is_interrupt()) + DispatchInterruptMessage(aMsg, 0); + else if (aMsg.is_rpc()) + DispatchRPCMessage(aMsg); + else + DispatchAsyncMessage(aMsg); +} + +void +MessageChannel::DispatchSyncMessage(const Message& aMsg) +{ + AssertWorkerThread(); + + Message *reply = nullptr; + + mDispatchingSyncMessage = true; + Result rv = mListener->OnMessageReceived(aMsg, reply); + mDispatchingSyncMessage = false; + + if (!MaybeHandleError(rv, "DispatchSyncMessage")) { + delete reply; + reply = new Message(); + reply->set_sync(); + reply->set_reply(); + reply->set_reply_error(); + } + reply->set_seqno(aMsg.seqno()); + + MonitorAutoLock lock(*mMonitor); + if (ChannelConnected == mChannelState) + mLink->SendMessage(reply); +} + +void +MessageChannel::DispatchUrgentMessage(const Message& aMsg) +{ + AssertWorkerThread(); + MOZ_ASSERT(aMsg.is_urgent()); + + Message *reply = nullptr; + + mDispatchingUrgentMessageCount++; + Result rv = mListener->OnCallReceived(aMsg, reply); + mDispatchingUrgentMessageCount--; + + if (!MaybeHandleError(rv, "DispatchUrgentMessage")) { + delete reply; + reply = new Message(); + reply->set_urgent(); + reply->set_reply(); + reply->set_reply_error(); + } + reply->set_seqno(aMsg.seqno()); + + MonitorAutoLock lock(*mMonitor); + if (ChannelConnected == mChannelState) + mLink->SendMessage(reply); +} + +void +MessageChannel::DispatchRPCMessage(const Message& aMsg) +{ + AssertWorkerThread(); + MOZ_ASSERT(aMsg.is_rpc()); + + Message *reply = nullptr; + + if (!MaybeHandleError(mListener->OnCallReceived(aMsg, reply), "DispatchRPCMessage")) { + delete reply; + reply = new Message(); + reply->set_rpc(); + reply->set_reply(); + reply->set_reply_error(); + } + reply->set_seqno(aMsg.seqno()); + + MonitorAutoLock lock(*mMonitor); + if (ChannelConnected == mChannelState) + mLink->SendMessage(reply); +} + +void +MessageChannel::DispatchAsyncMessage(const Message& aMsg) +{ + AssertWorkerThread(); + MOZ_ASSERT(!aMsg.is_interrupt() && !aMsg.is_sync() && !aMsg.is_urgent()); + + if (aMsg.routing_id() == MSG_ROUTING_NONE) { + NS_RUNTIMEABORT("unhandled special message!"); + } + + MaybeHandleError(mListener->OnMessageReceived(aMsg), "DispatchAsyncMessage"); +} + +void +MessageChannel::DispatchInterruptMessage(const Message& aMsg, size_t stackDepth) +{ + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + + IPC_ASSERT(aMsg.is_interrupt() && !aMsg.is_reply(), "wrong message type"); + + // Race detection: see the long comment near mRemoteStackDepthGuess in + // MessageChannel.h. "Remote" stack depth means our side, and "local" means + // the other side. + if (aMsg.interrupt_remote_stack_depth_guess() != RemoteViewOfStackDepth(stackDepth)) { + // Interrupt in-calls have raced. The winner, if there is one, gets to defer + // processing of the other side's in-call. + bool defer; + const char* winner; + switch (mListener->MediateInterruptRace((mSide == ChildSide) ? aMsg : mInterruptStack.top(), + (mSide != ChildSide) ? mInterruptStack.top() : aMsg)) + { + case RIPChildWins: + winner = "child"; + defer = (mSide == ChildSide); + break; + case RIPParentWins: + winner = "parent"; + defer = (mSide != ChildSide); + break; + case RIPError: + NS_RUNTIMEABORT("NYI: 'Error' Interrupt race policy"); + return; + default: + NS_RUNTIMEABORT("not reached"); + return; + } + + if (LoggingEnabled()) { + printf_stderr(" (%s: %s won, so we're%sdeferring)\n", + (mSide == ChildSide) ? "child" : "parent", + winner, + defer ? " " : " not "); + } + + if (defer) { + // We now know the other side's stack has one more frame + // than we thought. + ++mRemoteStackDepthGuess; // decremented in MaybeProcessDeferred() + mDeferred.push(aMsg); + return; + } + + // We "lost" and need to process the other side's in-call. Don't need + // to fix up the mRemoteStackDepthGuess here, because we're just about + // to increment it in DispatchCall(), which will make it correct again. + } + +#ifdef OS_WIN + SyncStackFrame frame(this, true); +#endif + + Message* reply = nullptr; + + ++mRemoteStackDepthGuess; + Result rv = mListener->OnCallReceived(aMsg, reply); + --mRemoteStackDepthGuess; + + if (!MaybeHandleError(rv, "DispatchInterruptMessage")) { + delete reply; + reply = new Message(); + reply->set_interrupt(); + reply->set_reply(); + reply->set_reply_error(); + } + reply->set_seqno(aMsg.seqno()); + + MonitorAutoLock lock(*mMonitor); + if (ChannelConnected == mChannelState) + mLink->SendMessage(reply); +} + +void +MessageChannel::MaybeUndeferIncall() +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + if (mDeferred.empty()) + return; + + size_t stackDepth = InterruptStackDepth(); + + // the other side can only *under*-estimate our actual stack depth + IPC_ASSERT(mDeferred.top().interrupt_remote_stack_depth_guess() <= stackDepth, + "fatal logic error"); + + if (mDeferred.top().interrupt_remote_stack_depth_guess() < RemoteViewOfStackDepth(stackDepth)) + return; + + // maybe time to process this message + Message call = mDeferred.top(); + mDeferred.pop(); + + // fix up fudge factor we added to account for race + IPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error"); + --mRemoteStackDepthGuess; + + mPending.push_back(call); +} + +void +MessageChannel::FlushPendingInterruptQueue() +{ + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + + { + MonitorAutoLock lock(*mMonitor); + + if (mDeferred.empty()) { + if (mPending.empty()) + return; + + const Message& last = mPending.back(); + if (!last.is_interrupt() || last.is_reply()) + return; + } + } + + while (OnMaybeDequeueOne()); +} + +void +MessageChannel::ExitedCxxStack() +{ + mListener->OnExitedCxxStack(); + if (mSawInterruptOutMsg) { + MonitorAutoLock lock(*mMonitor); + // see long comment in OnMaybeDequeueOne() + EnqueuePendingMessages(); + mSawInterruptOutMsg = false; + } +} + +void +MessageChannel::EnqueuePendingMessages() +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + MaybeUndeferIncall(); + + for (size_t i = 0; i < mDeferred.size(); ++i) { + mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask)); + } + + // XXX performance tuning knob: could process all or k pending + // messages here, rather than enqueuing for later processing + + for (size_t i = 0; i < mPending.size(); ++i) { + mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask)); + } +} + +static inline bool +IsTimeoutExpired(PRIntervalTime aStart, PRIntervalTime aTimeout) +{ + return (aTimeout != PR_INTERVAL_NO_TIMEOUT) && + (aTimeout <= (PR_IntervalNow() - aStart)); +} + +bool +MessageChannel::WaitResponse(bool aWaitTimedOut) +{ + if (aWaitTimedOut) { + if (mInTimeoutSecondHalf) { + // We've really timed out this time. + return false; + } + // Try a second time. + mInTimeoutSecondHalf = true; + } else { + mInTimeoutSecondHalf = false; + } + return true; +} + +#ifndef OS_WIN +bool +MessageChannel::WaitForSyncNotify() +{ + PRIntervalTime timeout = (kNoTimeout == mTimeoutMs) ? + PR_INTERVAL_NO_TIMEOUT : + PR_MillisecondsToInterval(mTimeoutMs); + // XXX could optimize away this syscall for "no timeout" case if desired + PRIntervalTime waitStart = PR_IntervalNow(); + + mMonitor->Wait(timeout); + + // If the timeout didn't expire, we know we received an event. The + // converse is not true. + return WaitResponse(IsTimeoutExpired(waitStart, timeout)); +} + +bool +MessageChannel::WaitForInterruptNotify() +{ + return WaitForSyncNotify(); +} + +void +MessageChannel::NotifyWorkerThread() +{ + mMonitor->Notify(); +} +#endif + +bool +MessageChannel::ShouldContinueFromTimeout() +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + bool cont; + { + MonitorAutoUnlock unlock(*mMonitor); + cont = mListener->OnReplyTimeout(); + } + + static enum { UNKNOWN, NOT_DEBUGGING, DEBUGGING } sDebuggingChildren = UNKNOWN; + + if (sDebuggingChildren == UNKNOWN) { + sDebuggingChildren = getenv("MOZ_DEBUG_CHILD_PROCESS") ? DEBUGGING : NOT_DEBUGGING; + } + if (sDebuggingChildren == DEBUGGING) { + return true; + } + + if (!cont) { + // NB: there's a sublety here. If parents were allowed to send sync + // messages to children, then it would be possible for this + // synchronous close-on-timeout to race with async |OnMessageReceived| + // tasks arriving from the child, posted to the worker thread's event + // loop. This would complicate cleanup of the *Channel. But since + // IPDL forbids this (and since it doesn't support children timing out + // on parents), the parent can only block on interrupt messages to the child, + // and in that case arriving async messages are enqueued to the interrupt + // channel's special queue. They're then ignored because the channel + // state changes to ChannelTimeout (i.e. !Connected). + SynchronouslyClose(); + mChannelState = ChannelTimeout; + } + + return cont; +} + +void +MessageChannel::SetReplyTimeoutMs(int32_t aTimeoutMs) +{ + // Set channel timeout value. Since this is broken up into + // two period, the minimum timeout value is 2ms. + AssertWorkerThread(); + mTimeoutMs = (aTimeoutMs <= 0) + ? kNoTimeout + : (int32_t)ceil((double)aTimeoutMs / 2.0); +} + +void +MessageChannel::OnChannelConnected(int32_t peer_id) +{ + mWorkerLoop->PostTask( + FROM_HERE, + NewRunnableMethod(this, + &MessageChannel::DispatchOnChannelConnected, + peer_id)); +} + +void +MessageChannel::DispatchOnChannelConnected(int32_t peer_pid) +{ + AssertWorkerThread(); + if (mListener) + mListener->OnChannelConnected(peer_pid); +} + +void +MessageChannel::ReportMessageRouteError(const char* channelName) const +{ + PrintErrorMessage(mSide, channelName, "Need a route"); + mListener->OnProcessingError(MsgRouteError); +} + +void +MessageChannel::ReportConnectionError(const char* aChannelName) const +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + const char* errorMsg = nullptr; + switch (mChannelState) { + case ChannelClosed: + errorMsg = "Closed channel: cannot send/recv"; + break; + case ChannelOpening: + errorMsg = "Opening channel: not yet ready for send/recv"; + break; + case ChannelTimeout: + errorMsg = "Channel timeout: cannot send/recv"; + break; + case ChannelClosing: + errorMsg = "Channel closing: too late to send/recv, messages will be lost"; + break; + case ChannelError: + errorMsg = "Channel error: cannot send/recv"; + break; + + default: + NS_RUNTIMEABORT("unreached"); + } + + PrintErrorMessage(mSide, aChannelName, errorMsg); + + MonitorAutoUnlock unlock(*mMonitor); + mListener->OnProcessingError(MsgDropped); +} + +bool +MessageChannel::MaybeHandleError(Result code, const char* channelName) +{ + if (MsgProcessed == code) + return true; + + const char* errorMsg = nullptr; + switch (code) { + case MsgNotKnown: + errorMsg = "Unknown message: not processed"; + break; + case MsgNotAllowed: + errorMsg = "Message not allowed: cannot be sent/recvd in this state"; + break; + case MsgPayloadError: + errorMsg = "Payload error: message could not be deserialized"; + break; + case MsgProcessingError: + errorMsg = "Processing error: message was deserialized, but the handler returned false (indicating failure)"; + break; + case MsgRouteError: + errorMsg = "Route error: message sent to unknown actor ID"; + break; + case MsgValueError: + errorMsg = "Value error: message was deserialized, but contained an illegal value"; + break; + + default: + NS_RUNTIMEABORT("unknown Result code"); + return false; + } + + PrintErrorMessage(mSide, channelName, errorMsg); + + mListener->OnProcessingError(code); + + return false; +} + +void +MessageChannel::OnChannelErrorFromLink() +{ + AssertLinkThread(); + mMonitor->AssertCurrentThreadOwns(); + + if (InterruptStackDepth() > 0) + NotifyWorkerThread(); + + if (AwaitingSyncReply() || AwaitingRPCReply() || AwaitingUrgentReply()) + NotifyWorkerThread(); + + if (ChannelClosing != mChannelState) { + if (mAbortOnError) { + NS_RUNTIMEABORT("Aborting on channel error."); + } + mChannelState = ChannelError; + mMonitor->Notify(); + } + + PostErrorNotifyTask(); +} + +void +MessageChannel::NotifyMaybeChannelError() +{ + mMonitor->AssertNotCurrentThreadOwns(); + + // TODO sort out Close() on this side racing with Close() on the other side + if (ChannelClosing == mChannelState) { + // the channel closed, but we received a "Goodbye" message warning us + // about it. no worries + mChannelState = ChannelClosed; + NotifyChannelClosed(); + return; + } + + // Oops, error! Let the listener know about it. + mChannelState = ChannelError; + mListener->OnChannelError(); + Clear(); +} + +void +MessageChannel::OnNotifyMaybeChannelError() +{ + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + + mChannelErrorTask = nullptr; + + // OnChannelError holds mMonitor when it posts this task and this + // task cannot be allowed to run until OnChannelError has + // exited. We enforce that order by grabbing the mutex here which + // should only continue once OnChannelError has completed. + { + MonitorAutoLock lock(*mMonitor); + // nothing to do here + } + + if (IsOnCxxStack()) { + mChannelErrorTask = + NewRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError); + // 10 ms delay is completely arbitrary + mWorkerLoop->PostDelayedTask(FROM_HERE, mChannelErrorTask, 10); + return; + } + + NotifyMaybeChannelError(); +} + +void +MessageChannel::PostErrorNotifyTask() +{ + mMonitor->AssertCurrentThreadOwns(); + + if (mChannelErrorTask) + return; + + // This must be the last code that runs on this thread! + mChannelErrorTask = + NewRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError); + mWorkerLoop->PostTask(FROM_HERE, mChannelErrorTask); +} + +// Special async message. +class GoodbyeMessage : public IPC::Message +{ +public: + GoodbyeMessage() : + IPC::Message(MSG_ROUTING_NONE, GOODBYE_MESSAGE_TYPE, PRIORITY_NORMAL) + { + } + static bool Read(const Message* msg) { + return true; + } + void Log(const std::string& aPrefix, FILE* aOutf) const { + fputs("(special `Goodbye' message)", aOutf); + } +}; + +void +MessageChannel::SynchronouslyClose() +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + mLink->SendClose(); + while (ChannelClosed != mChannelState) + mMonitor->Wait(); +} + +void +MessageChannel::CloseWithError() +{ + AssertWorkerThread(); + + MonitorAutoLock lock(*mMonitor); + if (ChannelConnected != mChannelState) { + return; + } + SynchronouslyClose(); + mChannelState = ChannelError; + PostErrorNotifyTask(); +} + +void +MessageChannel::Close() +{ + AssertWorkerThread(); + + { + MonitorAutoLock lock(*mMonitor); + + if (ChannelError == mChannelState || ChannelTimeout == mChannelState) { + // See bug 538586: if the listener gets deleted while the + // IO thread's NotifyChannelError event is still enqueued + // and subsequently deletes us, then the error event will + // also be deleted and the listener will never be notified + // of the channel error. + if (mListener) { + MonitorAutoUnlock unlock(*mMonitor); + NotifyMaybeChannelError(); + } + return; + } + + if (ChannelOpening == mChannelState) { + // Mimic CloseWithError(). + SynchronouslyClose(); + mChannelState = ChannelError; + PostErrorNotifyTask(); + return; + } + + if (ChannelConnected != mChannelState) { + // XXX be strict about this until there's a compelling reason + // to relax + NS_RUNTIMEABORT("Close() called on closed channel!"); + } + + // notify the other side that we're about to close our socket + mLink->SendMessage(new GoodbyeMessage()); + SynchronouslyClose(); + } + + NotifyChannelClosed(); +} + +void +MessageChannel::NotifyChannelClosed() +{ + mMonitor->AssertNotCurrentThreadOwns(); + + if (ChannelClosed != mChannelState) + NS_RUNTIMEABORT("channel should have been closed!"); + + // OK, the IO thread just closed the channel normally. Let the + // listener know about it. + mListener->OnChannelClose(); + + Clear(); +} + +void +MessageChannel::DebugAbort(const char* file, int line, const char* cond, + const char* why, + bool reply) const +{ + printf_stderr("###!!! [MessageChannel][%s][%s:%d] " + "Assertion (%s) failed. %s %s\n", + mSide == ChildSide ? "Child" : "Parent", + file, line, cond, + why, + reply ? "(reply)" : ""); + // technically we need the mutex for this, but we're dying anyway + DumpInterruptStack(" "); + printf_stderr(" remote Interrupt stack guess: %lu\n", + mRemoteStackDepthGuess); + printf_stderr(" deferred stack size: %lu\n", + mDeferred.size()); + printf_stderr(" out-of-turn Interrupt replies stack size: %lu\n", + mOutOfTurnReplies.size()); + printf_stderr(" Pending queue size: %lu, front to back:\n", + mPending.size()); + + MessageQueue pending = mPending; + while (!pending.empty()) { + printf_stderr(" [ %s%s ]\n", + pending.front().is_interrupt() ? "intr" : + (pending.front().is_sync() ? "sync" : "async"), + pending.front().is_reply() ? "reply" : ""); + pending.pop_front(); + } + + NS_RUNTIMEABORT(why); +} + +void +MessageChannel::DumpInterruptStack(const char* const pfx) const +{ + NS_WARN_IF_FALSE(MessageLoop::current() != mWorkerLoop, + "The worker thread had better be paused in a debugger!"); + + printf_stderr("%sMessageChannel 'backtrace':\n", pfx); + + // print a python-style backtrace, first frame to last + for (uint32_t i = 0; i < mCxxStackFrames.length(); ++i) { + int32_t id; + const char* dir, *sems, *name; + mCxxStackFrames[i].Describe(&id, &dir, &sems, &name); + + printf_stderr("%s[(%u) %s %s %s(actor=%d) ]\n", pfx, + i, dir, sems, name, id); + } +} + +} // ipc +} // mozilla