michael@0: /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- michael@0: * vim: sw=4 ts=4 et : michael@0: */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: #include "mozilla/ipc/MessageChannel.h" michael@0: #include "mozilla/ipc/ProtocolUtils.h" michael@0: michael@0: #include "mozilla/Assertions.h" michael@0: #include "mozilla/DebugOnly.h" michael@0: #include "mozilla/Move.h" michael@0: #include "nsDebug.h" michael@0: #include "nsISupportsImpl.h" michael@0: michael@0: // Undo the damage done by mozzconf.h michael@0: #undef compress michael@0: michael@0: using namespace mozilla; michael@0: using namespace std; michael@0: michael@0: using mozilla::MonitorAutoLock; michael@0: using mozilla::MonitorAutoUnlock; michael@0: michael@0: template<> michael@0: struct RunnableMethodTraits michael@0: { michael@0: static void RetainCallee(mozilla::ipc::MessageChannel* obj) { } michael@0: static void ReleaseCallee(mozilla::ipc::MessageChannel* obj) { } michael@0: }; michael@0: michael@0: #define IPC_ASSERT(_cond, ...) \ michael@0: do { \ michael@0: if (!(_cond)) \ michael@0: DebugAbort(__FILE__, __LINE__, #_cond,## __VA_ARGS__); \ michael@0: } while (0) michael@0: michael@0: namespace mozilla { michael@0: namespace ipc { michael@0: michael@0: const int32_t MessageChannel::kNoTimeout = INT32_MIN; michael@0: michael@0: // static michael@0: bool MessageChannel::sIsPumpingMessages = false; michael@0: michael@0: enum Direction michael@0: { michael@0: IN_MESSAGE, michael@0: OUT_MESSAGE michael@0: }; michael@0: michael@0: michael@0: class MessageChannel::InterruptFrame michael@0: { michael@0: private: michael@0: enum Semantics michael@0: { michael@0: INTR_SEMS, michael@0: SYNC_SEMS, michael@0: ASYNC_SEMS michael@0: }; michael@0: michael@0: public: michael@0: InterruptFrame(Direction direction, const Message* msg) michael@0: : mMessageName(strdup(msg->name())), michael@0: mMessageRoutingId(msg->routing_id()), michael@0: mMesageSemantics(msg->is_interrupt() ? INTR_SEMS : michael@0: msg->is_sync() ? SYNC_SEMS : michael@0: ASYNC_SEMS), michael@0: mDirection(direction), michael@0: mMoved(false) michael@0: { michael@0: MOZ_ASSERT(mMessageName); michael@0: } michael@0: michael@0: InterruptFrame(InterruptFrame&& aOther) michael@0: { michael@0: MOZ_ASSERT(aOther.mMessageName); michael@0: mMessageName = aOther.mMessageName; michael@0: aOther.mMessageName = nullptr; michael@0: aOther.mMoved = true; michael@0: michael@0: mMessageRoutingId = aOther.mMessageRoutingId; michael@0: mMesageSemantics = aOther.mMesageSemantics; michael@0: mDirection = aOther.mDirection; michael@0: } michael@0: michael@0: ~InterruptFrame() michael@0: { michael@0: MOZ_ASSERT_IF(!mMessageName, mMoved); michael@0: michael@0: if (mMessageName) michael@0: free(const_cast(mMessageName)); michael@0: } michael@0: michael@0: InterruptFrame& operator=(InterruptFrame&& aOther) michael@0: { michael@0: MOZ_ASSERT(&aOther != this); michael@0: this->~InterruptFrame(); michael@0: new (this) InterruptFrame(mozilla::Move(aOther)); michael@0: return *this; michael@0: } michael@0: michael@0: bool IsInterruptIncall() const michael@0: { michael@0: return INTR_SEMS == mMesageSemantics && IN_MESSAGE == mDirection; michael@0: } michael@0: michael@0: bool IsInterruptOutcall() const michael@0: { michael@0: return INTR_SEMS == mMesageSemantics && OUT_MESSAGE == mDirection; michael@0: } michael@0: michael@0: void Describe(int32_t* id, const char** dir, const char** sems, michael@0: const char** name) const michael@0: { michael@0: *id = mMessageRoutingId; michael@0: *dir = (IN_MESSAGE == mDirection) ? "in" : "out"; michael@0: *sems = (INTR_SEMS == mMesageSemantics) ? "intr" : michael@0: (SYNC_SEMS == mMesageSemantics) ? "sync" : michael@0: "async"; michael@0: *name = mMessageName; michael@0: } michael@0: michael@0: private: michael@0: const char* mMessageName; michael@0: int32_t mMessageRoutingId; michael@0: Semantics mMesageSemantics; michael@0: Direction mDirection; michael@0: DebugOnly mMoved; michael@0: michael@0: // Disable harmful methods. michael@0: InterruptFrame(const InterruptFrame& aOther) MOZ_DELETE; michael@0: InterruptFrame& operator=(const InterruptFrame&) MOZ_DELETE; michael@0: }; michael@0: michael@0: class MOZ_STACK_CLASS MessageChannel::CxxStackFrame michael@0: { michael@0: public: michael@0: CxxStackFrame(MessageChannel& that, Direction direction, const Message* msg) michael@0: : mThat(that) michael@0: { michael@0: mThat.AssertWorkerThread(); michael@0: michael@0: if (mThat.mCxxStackFrames.empty()) michael@0: mThat.EnteredCxxStack(); michael@0: michael@0: mThat.mCxxStackFrames.append(InterruptFrame(direction, msg)); michael@0: michael@0: const InterruptFrame& frame = mThat.mCxxStackFrames.back(); michael@0: michael@0: if (frame.IsInterruptIncall()) michael@0: mThat.EnteredCall(); michael@0: michael@0: mThat.mSawInterruptOutMsg |= frame.IsInterruptOutcall(); michael@0: } michael@0: michael@0: ~CxxStackFrame() { michael@0: mThat.AssertWorkerThread(); michael@0: michael@0: MOZ_ASSERT(!mThat.mCxxStackFrames.empty()); michael@0: michael@0: bool exitingCall = mThat.mCxxStackFrames.back().IsInterruptIncall(); michael@0: mThat.mCxxStackFrames.shrinkBy(1); michael@0: michael@0: bool exitingStack = mThat.mCxxStackFrames.empty(); michael@0: michael@0: // mListener could have gone away if Close() was called while michael@0: // MessageChannel code was still on the stack michael@0: if (!mThat.mListener) michael@0: return; michael@0: michael@0: if (exitingCall) michael@0: mThat.ExitedCall(); michael@0: michael@0: if (exitingStack) michael@0: mThat.ExitedCxxStack(); michael@0: } michael@0: private: michael@0: MessageChannel& mThat; michael@0: michael@0: // Disable harmful methods. michael@0: CxxStackFrame() MOZ_DELETE; michael@0: CxxStackFrame(const CxxStackFrame&) MOZ_DELETE; michael@0: CxxStackFrame& operator=(const CxxStackFrame&) MOZ_DELETE; michael@0: }; michael@0: michael@0: MessageChannel::MessageChannel(MessageListener *aListener) michael@0: : mListener(aListener->asWeakPtr()), michael@0: mChannelState(ChannelClosed), michael@0: mSide(UnknownSide), michael@0: mLink(nullptr), michael@0: mWorkerLoop(nullptr), michael@0: mChannelErrorTask(nullptr), michael@0: mWorkerLoopID(-1), michael@0: mTimeoutMs(kNoTimeout), michael@0: mInTimeoutSecondHalf(false), michael@0: mNextSeqno(0), michael@0: mPendingSyncReplies(0), michael@0: mPendingUrgentReplies(0), michael@0: mPendingRPCReplies(0), michael@0: mCurrentRPCTransaction(0), michael@0: mDispatchingSyncMessage(false), michael@0: mDispatchingUrgentMessageCount(0), michael@0: mRemoteStackDepthGuess(false), michael@0: mSawInterruptOutMsg(false), michael@0: mAbortOnError(false), michael@0: mFlags(REQUIRE_DEFAULT) michael@0: { michael@0: MOZ_COUNT_CTOR(ipc::MessageChannel); michael@0: michael@0: #ifdef OS_WIN michael@0: mTopFrame = nullptr; michael@0: mIsSyncWaitingOnNonMainThread = false; michael@0: #endif michael@0: michael@0: mDequeueOneTask = new RefCountedTask(NewRunnableMethod( michael@0: this, michael@0: &MessageChannel::OnMaybeDequeueOne)); michael@0: michael@0: #ifdef OS_WIN michael@0: mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr); michael@0: NS_ASSERTION(mEvent, "CreateEvent failed! Nothing is going to work!"); michael@0: #endif michael@0: } michael@0: michael@0: MessageChannel::~MessageChannel() michael@0: { michael@0: MOZ_COUNT_DTOR(ipc::MessageChannel); michael@0: IPC_ASSERT(mCxxStackFrames.empty(), "mismatched CxxStackFrame ctor/dtors"); michael@0: #ifdef OS_WIN michael@0: DebugOnly ok = CloseHandle(mEvent); michael@0: MOZ_ASSERT(ok); michael@0: #endif michael@0: Clear(); michael@0: } michael@0: michael@0: static void michael@0: PrintErrorMessage(Side side, const char* channelName, const char* msg) michael@0: { michael@0: const char *from = (side == ChildSide) michael@0: ? "Child" michael@0: : ((side == ParentSide) ? "Parent" : "Unknown"); michael@0: printf_stderr("\n###!!! [%s][%s] Error: %s\n\n", from, channelName, msg); michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::Connected() const michael@0: { michael@0: mMonitor->AssertCurrentThreadOwns(); michael@0: michael@0: // The transport layer allows us to send messages before michael@0: // receiving the "connected" ack from the remote side. michael@0: return (ChannelOpening == mChannelState || ChannelConnected == mChannelState); michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::CanSend() const michael@0: { michael@0: MonitorAutoLock lock(*mMonitor); michael@0: return Connected(); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::Clear() michael@0: { michael@0: // Don't clear mWorkerLoopID; we use it in AssertLinkThread() and michael@0: // AssertWorkerThread(). michael@0: // michael@0: // Also don't clear mListener. If we clear it, then sending a message michael@0: // through this channel after it's Clear()'ed can cause this process to michael@0: // crash. michael@0: // michael@0: // In practice, mListener owns the channel, so the channel gets deleted michael@0: // before mListener. But just to be safe, mListener is a weak pointer. michael@0: michael@0: mDequeueOneTask->Cancel(); michael@0: michael@0: mWorkerLoop = nullptr; michael@0: delete mLink; michael@0: mLink = nullptr; michael@0: michael@0: if (mChannelErrorTask) { michael@0: mChannelErrorTask->Cancel(); michael@0: mChannelErrorTask = nullptr; michael@0: } michael@0: michael@0: // Free up any memory used by pending messages. michael@0: mPending.clear(); michael@0: mPendingUrgentRequest = nullptr; michael@0: mPendingRPCCall = nullptr; michael@0: mOutOfTurnReplies.clear(); michael@0: while (!mDeferred.empty()) { michael@0: mDeferred.pop(); michael@0: } michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::Open(Transport* aTransport, MessageLoop* aIOLoop, Side aSide) michael@0: { michael@0: NS_PRECONDITION(!mLink, "Open() called > once"); michael@0: michael@0: mMonitor = new RefCountedMonitor(); michael@0: mWorkerLoop = MessageLoop::current(); michael@0: mWorkerLoopID = mWorkerLoop->id(); michael@0: michael@0: ProcessLink *link = new ProcessLink(this); michael@0: link->Open(aTransport, aIOLoop, aSide); // :TODO: n.b.: sets mChild michael@0: mLink = link; michael@0: return true; michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::Open(MessageChannel *aTargetChan, MessageLoop *aTargetLoop, Side aSide) michael@0: { michael@0: // Opens a connection to another thread in the same process. michael@0: michael@0: // This handshake proceeds as follows: michael@0: // - Let A be the thread initiating the process (either child or parent) michael@0: // and B be the other thread. michael@0: // - A spawns thread for B, obtaining B's message loop michael@0: // - A creates ProtocolChild and ProtocolParent instances. michael@0: // Let PA be the one appropriate to A and PB the side for B. michael@0: // - A invokes PA->Open(PB, ...): michael@0: // - set state to mChannelOpening michael@0: // - this will place a work item in B's worker loop (see next bullet) michael@0: // and then spins until PB->mChannelState becomes mChannelConnected michael@0: // - meanwhile, on PB's worker loop, the work item is removed and: michael@0: // - invokes PB->SlaveOpen(PA, ...): michael@0: // - sets its state and that of PA to Connected michael@0: NS_PRECONDITION(aTargetChan, "Need a target channel"); michael@0: NS_PRECONDITION(ChannelClosed == mChannelState, "Not currently closed"); michael@0: michael@0: CommonThreadOpenInit(aTargetChan, aSide); michael@0: michael@0: Side oppSide = UnknownSide; michael@0: switch(aSide) { michael@0: case ChildSide: oppSide = ParentSide; break; michael@0: case ParentSide: oppSide = ChildSide; break; michael@0: case UnknownSide: break; michael@0: } michael@0: michael@0: mMonitor = new RefCountedMonitor(); michael@0: michael@0: MonitorAutoLock lock(*mMonitor); michael@0: mChannelState = ChannelOpening; michael@0: aTargetLoop->PostTask( michael@0: FROM_HERE, michael@0: NewRunnableMethod(aTargetChan, &MessageChannel::OnOpenAsSlave, this, oppSide)); michael@0: michael@0: while (ChannelOpening == mChannelState) michael@0: mMonitor->Wait(); michael@0: NS_ASSERTION(ChannelConnected == mChannelState, "not connected when awoken"); michael@0: return (ChannelConnected == mChannelState); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::OnOpenAsSlave(MessageChannel *aTargetChan, Side aSide) michael@0: { michael@0: // Invoked when the other side has begun the open. michael@0: NS_PRECONDITION(ChannelClosed == mChannelState, michael@0: "Not currently closed"); michael@0: NS_PRECONDITION(ChannelOpening == aTargetChan->mChannelState, michael@0: "Target channel not in the process of opening"); michael@0: michael@0: CommonThreadOpenInit(aTargetChan, aSide); michael@0: mMonitor = aTargetChan->mMonitor; michael@0: michael@0: MonitorAutoLock lock(*mMonitor); michael@0: NS_ASSERTION(ChannelOpening == aTargetChan->mChannelState, michael@0: "Target channel not in the process of opening"); michael@0: mChannelState = ChannelConnected; michael@0: aTargetChan->mChannelState = ChannelConnected; michael@0: aTargetChan->mMonitor->Notify(); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::CommonThreadOpenInit(MessageChannel *aTargetChan, Side aSide) michael@0: { michael@0: mWorkerLoop = MessageLoop::current(); michael@0: mWorkerLoopID = mWorkerLoop->id(); michael@0: mLink = new ThreadLink(this, aTargetChan); michael@0: mSide = aSide; michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::Echo(Message* aMsg) michael@0: { michael@0: nsAutoPtr msg(aMsg); michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertNotCurrentThreadOwns(); michael@0: if (MSG_ROUTING_NONE == msg->routing_id()) { michael@0: ReportMessageRouteError("MessageChannel::Echo"); michael@0: return false; michael@0: } michael@0: michael@0: MonitorAutoLock lock(*mMonitor); michael@0: michael@0: if (!Connected()) { michael@0: ReportConnectionError("MessageChannel"); michael@0: return false; michael@0: } michael@0: michael@0: mLink->EchoMessage(msg.forget()); michael@0: return true; michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::Send(Message* aMsg) michael@0: { michael@0: CxxStackFrame frame(*this, OUT_MESSAGE, aMsg); michael@0: michael@0: nsAutoPtr msg(aMsg); michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertNotCurrentThreadOwns(); michael@0: if (MSG_ROUTING_NONE == msg->routing_id()) { michael@0: ReportMessageRouteError("MessageChannel::Send"); michael@0: return false; michael@0: } michael@0: michael@0: MonitorAutoLock lock(*mMonitor); michael@0: if (!Connected()) { michael@0: ReportConnectionError("MessageChannel"); michael@0: return false; michael@0: } michael@0: mLink->SendMessage(msg.forget()); michael@0: return true; michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::MaybeInterceptSpecialIOMessage(const Message& aMsg) michael@0: { michael@0: AssertLinkThread(); michael@0: mMonitor->AssertCurrentThreadOwns(); michael@0: michael@0: if (MSG_ROUTING_NONE == aMsg.routing_id() && michael@0: GOODBYE_MESSAGE_TYPE == aMsg.type()) michael@0: { michael@0: // :TODO: Sort out Close() on this side racing with Close() on the michael@0: // other side michael@0: mChannelState = ChannelClosing; michael@0: if (LoggingEnabled()) { michael@0: printf("NOTE: %s process received `Goodbye', closing down\n", michael@0: (mSide == ChildSide) ? "child" : "parent"); michael@0: } michael@0: return true; michael@0: } michael@0: return false; michael@0: } michael@0: michael@0: void michael@0: MessageChannel::OnMessageReceivedFromLink(const Message& aMsg) michael@0: { michael@0: AssertLinkThread(); michael@0: mMonitor->AssertCurrentThreadOwns(); michael@0: michael@0: if (MaybeInterceptSpecialIOMessage(aMsg)) michael@0: return; michael@0: michael@0: // Regardless of the Interrupt stack, if we're awaiting a sync or urgent reply, michael@0: // we know that it needs to be immediately handled to unblock us. michael@0: if ((AwaitingSyncReply() && aMsg.is_sync()) || michael@0: (AwaitingUrgentReply() && aMsg.is_urgent()) || michael@0: (AwaitingRPCReply() && aMsg.is_rpc())) michael@0: { michael@0: mRecvd = new Message(aMsg); michael@0: NotifyWorkerThread(); michael@0: return; michael@0: } michael@0: michael@0: // Urgent messages cannot be compressed. michael@0: MOZ_ASSERT(!aMsg.compress() || !aMsg.is_urgent()); michael@0: michael@0: bool compress = (aMsg.compress() && !mPending.empty() && michael@0: mPending.back().type() == aMsg.type() && michael@0: mPending.back().routing_id() == aMsg.routing_id()); michael@0: if (compress) { michael@0: // This message type has compression enabled, and the back of the michael@0: // queue was the same message type and routed to the same destination. michael@0: // Replace it with the newer message. michael@0: MOZ_ASSERT(mPending.back().compress()); michael@0: mPending.pop_back(); michael@0: } michael@0: michael@0: bool shouldWakeUp = AwaitingInterruptReply() || michael@0: // Allow incoming RPCs to be processed inside an urgent message. michael@0: (AwaitingUrgentReply() && aMsg.is_rpc()) || michael@0: // Always process urgent messages while blocked. michael@0: ((AwaitingSyncReply() || AwaitingRPCReply()) && aMsg.is_urgent()); michael@0: michael@0: // There are four cases we're concerned about, relating to the state of the michael@0: // main thread: michael@0: // michael@0: // (1) We are waiting on a sync|rpc reply - main thread is blocked on the michael@0: // IPC monitor. michael@0: // - If the message is high priority, we wake up the main thread to michael@0: // deliver the message. Otherwise, we leave it in the mPending queue, michael@0: // posting a task to the main event loop, where it will be processed michael@0: // once the synchronous reply has been received. michael@0: // michael@0: // (2) We are waiting on an Interrupt reply - main thread is blocked on the michael@0: // IPC monitor. michael@0: // - Always notify and wake up the main thread. michael@0: // michael@0: // (3) We are not waiting on a reply. michael@0: // - We post a task to the main event loop. michael@0: // michael@0: // Note that, we may notify the main thread even though the monitor is not michael@0: // blocked. This is okay, since we always check for pending events before michael@0: // blocking again. michael@0: michael@0: if (shouldWakeUp && (AwaitingUrgentReply() && aMsg.is_rpc())) { michael@0: // If we're receiving an RPC message while blocked on an urgent message, michael@0: // we must defer any messages that were not sent as part of the child michael@0: // answering the urgent message. michael@0: // michael@0: // We must also be sure that we will not accidentally defer any RPC michael@0: // message that was sent while answering an urgent message. Otherwise, michael@0: // we will deadlock. michael@0: // michael@0: // On the parent side, the current transaction can only transition from 0 michael@0: // to an ID, either by us issuing an urgent request while not blocked, or michael@0: // by receiving an RPC request while not blocked. When we unblock, the michael@0: // current transaction is reset to 0. michael@0: // michael@0: // When the child side receives an urgent message, any RPC messages sent michael@0: // before issuing the urgent reply will carry the urgent message's michael@0: // transaction ID. michael@0: // michael@0: // Since AwaitingUrgentReply() implies we are blocked, it also implies michael@0: // that we are within a transaction that will not change until we are michael@0: // completely unblocked (i.e, the transaction has completed). michael@0: if (aMsg.transaction_id() != mCurrentRPCTransaction) michael@0: shouldWakeUp = false; michael@0: } michael@0: michael@0: if (aMsg.is_urgent()) { michael@0: MOZ_ASSERT(!mPendingUrgentRequest); michael@0: mPendingUrgentRequest = new Message(aMsg); michael@0: } else if (aMsg.is_rpc() && shouldWakeUp) { michael@0: // Only use this slot if we need to wake up for an RPC call. Otherwise michael@0: // we treat it like a normal async or sync message. michael@0: MOZ_ASSERT(!mPendingRPCCall); michael@0: mPendingRPCCall = new Message(aMsg); michael@0: } else { michael@0: mPending.push_back(aMsg); michael@0: } michael@0: michael@0: if (shouldWakeUp) { michael@0: // Always wake up Interrupt waiters, sync waiters for urgent messages, michael@0: // RPC waiters for urgent messages, and urgent waiters for RPCs in the michael@0: // same transaction. michael@0: NotifyWorkerThread(); michael@0: } else { michael@0: // Worker thread is either not blocked on a reply, or this is an michael@0: // incoming Interrupt that raced with outgoing sync, and needs to be michael@0: // deferred to a later event-loop iteration. michael@0: if (!compress) { michael@0: // If we compressed away the previous message, we'll re-use michael@0: // its pending task. michael@0: mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask)); michael@0: } michael@0: } michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::Send(Message* aMsg, Message* aReply) michael@0: { michael@0: // Sanity checks. michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertNotCurrentThreadOwns(); michael@0: michael@0: #ifdef OS_WIN michael@0: SyncStackFrame frame(this, false); michael@0: #endif michael@0: michael@0: CxxStackFrame f(*this, OUT_MESSAGE, aMsg); michael@0: michael@0: MonitorAutoLock lock(*mMonitor); michael@0: michael@0: IPC_ASSERT(aMsg->is_sync(), "can only Send() sync messages here"); michael@0: IPC_ASSERT(!DispatchingSyncMessage(), "violation of sync handler invariant"); michael@0: IPC_ASSERT(!DispatchingUrgentMessage(), "sync messages forbidden while handling urgent message"); michael@0: IPC_ASSERT(!AwaitingSyncReply(), "nested sync messages are not supported"); michael@0: michael@0: AutoEnterPendingReply replies(mPendingSyncReplies); michael@0: if (!SendAndWait(aMsg, aReply)) michael@0: return false; michael@0: michael@0: NS_ABORT_IF_FALSE(aReply->is_sync(), "reply is not sync"); michael@0: return true; michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::UrgentCall(Message* aMsg, Message* aReply) michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertNotCurrentThreadOwns(); michael@0: IPC_ASSERT(mSide == ParentSide, "cannot send urgent requests from child"); michael@0: michael@0: #ifdef OS_WIN michael@0: SyncStackFrame frame(this, false); michael@0: #endif michael@0: michael@0: CxxStackFrame f(*this, OUT_MESSAGE, aMsg); michael@0: michael@0: MonitorAutoLock lock(*mMonitor); michael@0: michael@0: IPC_ASSERT(!AwaitingInterruptReply(), "urgent calls cannot be issued within Interrupt calls"); michael@0: IPC_ASSERT(!AwaitingSyncReply(), "urgent calls cannot be issued within sync sends"); michael@0: michael@0: AutoEnterRPCTransaction transact(this); michael@0: aMsg->set_transaction_id(mCurrentRPCTransaction); michael@0: michael@0: AutoEnterPendingReply replies(mPendingUrgentReplies); michael@0: if (!SendAndWait(aMsg, aReply)) michael@0: return false; michael@0: michael@0: NS_ABORT_IF_FALSE(aReply->is_urgent(), "reply is not urgent"); michael@0: return true; michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::RPCCall(Message* aMsg, Message* aReply) michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertNotCurrentThreadOwns(); michael@0: IPC_ASSERT(mSide == ChildSide, "cannot send rpc messages from parent"); michael@0: michael@0: #ifdef OS_WIN michael@0: SyncStackFrame frame(this, false); michael@0: #endif michael@0: michael@0: CxxStackFrame f(*this, OUT_MESSAGE, aMsg); michael@0: michael@0: MonitorAutoLock lock(*mMonitor); michael@0: michael@0: AutoEnterRPCTransaction transact(this); michael@0: aMsg->set_transaction_id(mCurrentRPCTransaction); michael@0: michael@0: AutoEnterPendingReply replies(mPendingRPCReplies); michael@0: if (!SendAndWait(aMsg, aReply)) michael@0: return false; michael@0: michael@0: NS_ABORT_IF_FALSE(aReply->is_rpc(), "expected rpc reply"); michael@0: return true; michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::SendAndWait(Message* aMsg, Message* aReply) michael@0: { michael@0: mMonitor->AssertCurrentThreadOwns(); michael@0: michael@0: nsAutoPtr msg(aMsg); michael@0: michael@0: if (!Connected()) { michael@0: ReportConnectionError("MessageChannel::SendAndWait"); michael@0: return false; michael@0: } michael@0: michael@0: msg->set_seqno(NextSeqno()); michael@0: michael@0: DebugOnly replySeqno = msg->seqno(); michael@0: DebugOnly replyType = msg->type() + 1; michael@0: michael@0: mLink->SendMessage(msg.forget()); michael@0: michael@0: while (true) { michael@0: // Wait for an event to occur. michael@0: while (true) { michael@0: if (mRecvd || mPendingUrgentRequest || mPendingRPCCall) michael@0: break; michael@0: michael@0: bool maybeTimedOut = !WaitForSyncNotify(); michael@0: michael@0: if (!Connected()) { michael@0: ReportConnectionError("MessageChannel::SendAndWait"); michael@0: return false; michael@0: } michael@0: michael@0: if (maybeTimedOut && !ShouldContinueFromTimeout()) michael@0: return false; michael@0: } michael@0: michael@0: if (mPendingUrgentRequest && !ProcessPendingUrgentRequest()) michael@0: return false; michael@0: michael@0: if (mPendingRPCCall && !ProcessPendingRPCCall()) michael@0: return false; michael@0: michael@0: if (mRecvd) { michael@0: NS_ABORT_IF_FALSE(mRecvd->is_reply(), "expected reply"); michael@0: michael@0: if (mRecvd->is_reply_error()) { michael@0: mRecvd = nullptr; michael@0: return false; michael@0: } michael@0: michael@0: NS_ABORT_IF_FALSE(mRecvd->type() == replyType, "wrong reply type"); michael@0: NS_ABORT_IF_FALSE(mRecvd->seqno() == replySeqno, "wrong sequence number"); michael@0: michael@0: *aReply = *mRecvd; michael@0: mRecvd = nullptr; michael@0: return true; michael@0: } michael@0: } michael@0: michael@0: return true; michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::Call(Message* aMsg, Message* aReply) michael@0: { michael@0: if (aMsg->is_urgent()) michael@0: return UrgentCall(aMsg, aReply); michael@0: if (aMsg->is_rpc()) michael@0: return RPCCall(aMsg, aReply); michael@0: return InterruptCall(aMsg, aReply); michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::InterruptCall(Message* aMsg, Message* aReply) michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertNotCurrentThreadOwns(); michael@0: michael@0: #ifdef OS_WIN michael@0: SyncStackFrame frame(this, true); michael@0: #endif michael@0: michael@0: // This must come before MonitorAutoLock, as its destructor acquires the michael@0: // monitor lock. michael@0: CxxStackFrame cxxframe(*this, OUT_MESSAGE, aMsg); michael@0: michael@0: MonitorAutoLock lock(*mMonitor); michael@0: if (!Connected()) { michael@0: ReportConnectionError("MessageChannel::Call"); michael@0: return false; michael@0: } michael@0: michael@0: // Sanity checks. michael@0: IPC_ASSERT(!AwaitingSyncReply() && !AwaitingUrgentReply(), michael@0: "cannot issue Interrupt call whiel blocked on sync or urgent"); michael@0: IPC_ASSERT(!DispatchingSyncMessage() || aMsg->priority() == IPC::Message::PRIORITY_HIGH, michael@0: "violation of sync handler invariant"); michael@0: IPC_ASSERT(aMsg->is_interrupt(), "can only Call() Interrupt messages here"); michael@0: michael@0: michael@0: nsAutoPtr msg(aMsg); michael@0: michael@0: msg->set_seqno(NextSeqno()); michael@0: msg->set_interrupt_remote_stack_depth_guess(mRemoteStackDepthGuess); michael@0: msg->set_interrupt_local_stack_depth(1 + InterruptStackDepth()); michael@0: mInterruptStack.push(*msg); michael@0: mLink->SendMessage(msg.forget()); michael@0: michael@0: while (true) { michael@0: // if a handler invoked by *Dispatch*() spun a nested event michael@0: // loop, and the connection was broken during that loop, we michael@0: // might have already processed the OnError event. if so, michael@0: // trying another loop iteration will be futile because michael@0: // channel state will have been cleared michael@0: if (!Connected()) { michael@0: ReportConnectionError("MessageChannel::InterruptCall"); michael@0: return false; michael@0: } michael@0: michael@0: // Now might be the time to process a message deferred because of race michael@0: // resolution. michael@0: MaybeUndeferIncall(); michael@0: michael@0: // Wait for an event to occur. michael@0: while (!InterruptEventOccurred()) { michael@0: bool maybeTimedOut = !WaitForInterruptNotify(); michael@0: michael@0: // We might have received a "subtly deferred" message in a nested michael@0: // loop that it's now time to process. michael@0: if (InterruptEventOccurred() || michael@0: (!maybeTimedOut && (!mDeferred.empty() || !mOutOfTurnReplies.empty()))) michael@0: { michael@0: break; michael@0: } michael@0: michael@0: if (maybeTimedOut && !ShouldContinueFromTimeout()) michael@0: return false; michael@0: } michael@0: michael@0: Message recvd; michael@0: MessageMap::iterator it; michael@0: michael@0: if (mPendingUrgentRequest) { michael@0: recvd = *mPendingUrgentRequest; michael@0: mPendingUrgentRequest = nullptr; michael@0: } else if (mPendingRPCCall) { michael@0: recvd = *mPendingRPCCall; michael@0: mPendingRPCCall = nullptr; michael@0: } else if ((it = mOutOfTurnReplies.find(mInterruptStack.top().seqno())) michael@0: != mOutOfTurnReplies.end()) michael@0: { michael@0: recvd = it->second; michael@0: mOutOfTurnReplies.erase(it); michael@0: } else if (!mPending.empty()) { michael@0: recvd = mPending.front(); michael@0: mPending.pop_front(); michael@0: } else { michael@0: // because of subtleties with nested event loops, it's possible michael@0: // that we got here and nothing happened. or, we might have a michael@0: // deferred in-call that needs to be processed. either way, we michael@0: // won't break the inner while loop again until something new michael@0: // happens. michael@0: continue; michael@0: } michael@0: michael@0: // If the message is not Interrupt, we can dispatch it as normal. michael@0: if (!recvd.is_interrupt()) { michael@0: // Other side should be blocked. michael@0: IPC_ASSERT(!recvd.is_sync() || mPending.empty(), "other side should be blocked"); michael@0: michael@0: { michael@0: AutoEnterRPCTransaction transaction(this, &recvd); michael@0: MonitorAutoUnlock unlock(*mMonitor); michael@0: CxxStackFrame frame(*this, IN_MESSAGE, &recvd); michael@0: DispatchMessage(recvd); michael@0: } michael@0: if (!Connected()) { michael@0: ReportConnectionError("MessageChannel::DispatchMessage"); michael@0: return false; michael@0: } michael@0: continue; michael@0: } michael@0: michael@0: // If the message is an Interrupt reply, either process it as a reply to our michael@0: // call, or add it to the list of out-of-turn replies we've received. michael@0: if (recvd.is_reply()) { michael@0: IPC_ASSERT(!mInterruptStack.empty(), "invalid Interrupt stack"); michael@0: michael@0: // If this is not a reply the call we've initiated, add it to our michael@0: // out-of-turn replies and keep polling for events. michael@0: { michael@0: const Message &outcall = mInterruptStack.top(); michael@0: michael@0: // Note, In the parent, sequence numbers increase from 0, and michael@0: // in the child, they decrease from 0. michael@0: if ((mSide == ChildSide && recvd.seqno() > outcall.seqno()) || michael@0: (mSide != ChildSide && recvd.seqno() < outcall.seqno())) michael@0: { michael@0: mOutOfTurnReplies[recvd.seqno()] = recvd; michael@0: continue; michael@0: } michael@0: michael@0: IPC_ASSERT(recvd.is_reply_error() || michael@0: (recvd.type() == (outcall.type() + 1) && michael@0: recvd.seqno() == outcall.seqno()), michael@0: "somebody's misbehavin'", true); michael@0: } michael@0: michael@0: // We received a reply to our most recent outstanding call. Pop michael@0: // this frame and return the reply. michael@0: mInterruptStack.pop(); michael@0: michael@0: if (!recvd.is_reply_error()) { michael@0: *aReply = recvd; michael@0: } michael@0: michael@0: // If we have no more pending out calls waiting on replies, then michael@0: // the reply queue should be empty. michael@0: IPC_ASSERT(!mInterruptStack.empty() || mOutOfTurnReplies.empty(), michael@0: "still have pending replies with no pending out-calls", michael@0: true); michael@0: michael@0: return !recvd.is_reply_error(); michael@0: } michael@0: michael@0: // Dispatch an Interrupt in-call. Snapshot the current stack depth while we michael@0: // own the monitor. michael@0: size_t stackDepth = InterruptStackDepth(); michael@0: { michael@0: MonitorAutoUnlock unlock(*mMonitor); michael@0: michael@0: CxxStackFrame frame(*this, IN_MESSAGE, &recvd); michael@0: DispatchInterruptMessage(recvd, stackDepth); michael@0: } michael@0: if (!Connected()) { michael@0: ReportConnectionError("MessageChannel::DispatchInterruptMessage"); michael@0: return false; michael@0: } michael@0: } michael@0: michael@0: return true; michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::InterruptEventOccurred() michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertCurrentThreadOwns(); michael@0: IPC_ASSERT(InterruptStackDepth() > 0, "not in wait loop"); michael@0: michael@0: return (!Connected() || michael@0: !mPending.empty() || michael@0: mPendingUrgentRequest || michael@0: mPendingRPCCall || michael@0: (!mOutOfTurnReplies.empty() && michael@0: mOutOfTurnReplies.find(mInterruptStack.top().seqno()) != michael@0: mOutOfTurnReplies.end())); michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::ProcessPendingUrgentRequest() michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertCurrentThreadOwns(); michael@0: michael@0: // Note that it is possible we could have sent a sync message at michael@0: // the same time the parent process sent an urgent message, and michael@0: // therefore mPendingUrgentRequest is set *and* mRecvd is set as michael@0: // well, because the link thread received both before the worker michael@0: // thread woke up. michael@0: // michael@0: // In this case, we process the urgent message first, but we need michael@0: // to save the reply. michael@0: nsAutoPtr savedReply(mRecvd.forget()); michael@0: michael@0: // We're the child process. We should not be receiving RPC calls. michael@0: IPC_ASSERT(!mPendingRPCCall, "unexpected RPC call"); michael@0: michael@0: nsAutoPtr recvd(mPendingUrgentRequest.forget()); michael@0: { michael@0: // In order to send the parent RPC messages and guarantee it will michael@0: // wake up, we must re-use its transaction. michael@0: AutoEnterRPCTransaction transaction(this, recvd); michael@0: michael@0: MonitorAutoUnlock unlock(*mMonitor); michael@0: DispatchUrgentMessage(*recvd); michael@0: } michael@0: if (!Connected()) { michael@0: ReportConnectionError("MessageChannel::DispatchUrgentMessage"); michael@0: return false; michael@0: } michael@0: michael@0: // In between having dispatched our reply to the parent process, and michael@0: // re-acquiring the monitor, the parent process could have already michael@0: // processed that reply and sent the reply to our sync message. If so, michael@0: // our saved reply should be empty. michael@0: IPC_ASSERT(!mRecvd || !savedReply, "unknown reply"); michael@0: if (!mRecvd) michael@0: mRecvd = savedReply.forget(); michael@0: return true; michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::ProcessPendingRPCCall() michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertCurrentThreadOwns(); michael@0: michael@0: // See comment above re: mRecvd replies and incoming calls. michael@0: nsAutoPtr savedReply(mRecvd.forget()); michael@0: michael@0: IPC_ASSERT(!mPendingUrgentRequest, "unexpected urgent message"); michael@0: michael@0: nsAutoPtr recvd(mPendingRPCCall.forget()); michael@0: { michael@0: // If we are not currently in a transaction, this will begin one, michael@0: // and the link thread will not wake us up for any RPC messages not michael@0: // apart of this transaction. If we are already in a transaction, michael@0: // then this will assert that we're still in the same transaction. michael@0: AutoEnterRPCTransaction transaction(this, recvd); michael@0: michael@0: MonitorAutoUnlock unlock(*mMonitor); michael@0: DispatchRPCMessage(*recvd); michael@0: } michael@0: if (!Connected()) { michael@0: ReportConnectionError("MessageChannel::DispatchRPCMessage"); michael@0: return false; michael@0: } michael@0: michael@0: // In between having dispatched our reply to the parent process, and michael@0: // re-acquiring the monitor, the parent process could have already michael@0: // processed that reply and sent the reply to our sync message. If so, michael@0: // our saved reply should be empty. michael@0: IPC_ASSERT(!mRecvd || !savedReply, "unknown reply"); michael@0: if (!mRecvd) michael@0: mRecvd = savedReply.forget(); michael@0: return true; michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::DequeueOne(Message *recvd) michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertCurrentThreadOwns(); michael@0: michael@0: if (!Connected()) { michael@0: ReportConnectionError("OnMaybeDequeueOne"); michael@0: return false; michael@0: } michael@0: michael@0: if (mPendingUrgentRequest) { michael@0: *recvd = *mPendingUrgentRequest; michael@0: mPendingUrgentRequest = nullptr; michael@0: return true; michael@0: } michael@0: michael@0: if (mPendingRPCCall) { michael@0: *recvd = *mPendingRPCCall; michael@0: mPendingRPCCall = nullptr; michael@0: return true; michael@0: } michael@0: michael@0: if (!mDeferred.empty()) michael@0: MaybeUndeferIncall(); michael@0: michael@0: if (mPending.empty()) michael@0: return false; michael@0: michael@0: *recvd = mPending.front(); michael@0: mPending.pop_front(); michael@0: return true; michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::OnMaybeDequeueOne() michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertNotCurrentThreadOwns(); michael@0: michael@0: Message recvd; michael@0: michael@0: MonitorAutoLock lock(*mMonitor); michael@0: if (!DequeueOne(&recvd)) michael@0: return false; michael@0: michael@0: if (IsOnCxxStack() && recvd.is_interrupt() && recvd.is_reply()) { michael@0: // We probably just received a reply in a nested loop for an michael@0: // Interrupt call sent before entering that loop. michael@0: mOutOfTurnReplies[recvd.seqno()] = recvd; michael@0: return false; michael@0: } michael@0: michael@0: { michael@0: // We should not be in a transaction yet if we're not blocked. michael@0: MOZ_ASSERT(mCurrentRPCTransaction == 0); michael@0: AutoEnterRPCTransaction transaction(this, &recvd); michael@0: michael@0: MonitorAutoUnlock unlock(*mMonitor); michael@0: michael@0: CxxStackFrame frame(*this, IN_MESSAGE, &recvd); michael@0: DispatchMessage(recvd); michael@0: } michael@0: return true; michael@0: } michael@0: michael@0: void michael@0: MessageChannel::DispatchMessage(const Message &aMsg) michael@0: { michael@0: if (aMsg.is_sync()) michael@0: DispatchSyncMessage(aMsg); michael@0: else if (aMsg.is_urgent()) michael@0: DispatchUrgentMessage(aMsg); michael@0: else if (aMsg.is_interrupt()) michael@0: DispatchInterruptMessage(aMsg, 0); michael@0: else if (aMsg.is_rpc()) michael@0: DispatchRPCMessage(aMsg); michael@0: else michael@0: DispatchAsyncMessage(aMsg); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::DispatchSyncMessage(const Message& aMsg) michael@0: { michael@0: AssertWorkerThread(); michael@0: michael@0: Message *reply = nullptr; michael@0: michael@0: mDispatchingSyncMessage = true; michael@0: Result rv = mListener->OnMessageReceived(aMsg, reply); michael@0: mDispatchingSyncMessage = false; michael@0: michael@0: if (!MaybeHandleError(rv, "DispatchSyncMessage")) { michael@0: delete reply; michael@0: reply = new Message(); michael@0: reply->set_sync(); michael@0: reply->set_reply(); michael@0: reply->set_reply_error(); michael@0: } michael@0: reply->set_seqno(aMsg.seqno()); michael@0: michael@0: MonitorAutoLock lock(*mMonitor); michael@0: if (ChannelConnected == mChannelState) michael@0: mLink->SendMessage(reply); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::DispatchUrgentMessage(const Message& aMsg) michael@0: { michael@0: AssertWorkerThread(); michael@0: MOZ_ASSERT(aMsg.is_urgent()); michael@0: michael@0: Message *reply = nullptr; michael@0: michael@0: mDispatchingUrgentMessageCount++; michael@0: Result rv = mListener->OnCallReceived(aMsg, reply); michael@0: mDispatchingUrgentMessageCount--; michael@0: michael@0: if (!MaybeHandleError(rv, "DispatchUrgentMessage")) { michael@0: delete reply; michael@0: reply = new Message(); michael@0: reply->set_urgent(); michael@0: reply->set_reply(); michael@0: reply->set_reply_error(); michael@0: } michael@0: reply->set_seqno(aMsg.seqno()); michael@0: michael@0: MonitorAutoLock lock(*mMonitor); michael@0: if (ChannelConnected == mChannelState) michael@0: mLink->SendMessage(reply); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::DispatchRPCMessage(const Message& aMsg) michael@0: { michael@0: AssertWorkerThread(); michael@0: MOZ_ASSERT(aMsg.is_rpc()); michael@0: michael@0: Message *reply = nullptr; michael@0: michael@0: if (!MaybeHandleError(mListener->OnCallReceived(aMsg, reply), "DispatchRPCMessage")) { michael@0: delete reply; michael@0: reply = new Message(); michael@0: reply->set_rpc(); michael@0: reply->set_reply(); michael@0: reply->set_reply_error(); michael@0: } michael@0: reply->set_seqno(aMsg.seqno()); michael@0: michael@0: MonitorAutoLock lock(*mMonitor); michael@0: if (ChannelConnected == mChannelState) michael@0: mLink->SendMessage(reply); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::DispatchAsyncMessage(const Message& aMsg) michael@0: { michael@0: AssertWorkerThread(); michael@0: MOZ_ASSERT(!aMsg.is_interrupt() && !aMsg.is_sync() && !aMsg.is_urgent()); michael@0: michael@0: if (aMsg.routing_id() == MSG_ROUTING_NONE) { michael@0: NS_RUNTIMEABORT("unhandled special message!"); michael@0: } michael@0: michael@0: MaybeHandleError(mListener->OnMessageReceived(aMsg), "DispatchAsyncMessage"); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::DispatchInterruptMessage(const Message& aMsg, size_t stackDepth) michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertNotCurrentThreadOwns(); michael@0: michael@0: IPC_ASSERT(aMsg.is_interrupt() && !aMsg.is_reply(), "wrong message type"); michael@0: michael@0: // Race detection: see the long comment near mRemoteStackDepthGuess in michael@0: // MessageChannel.h. "Remote" stack depth means our side, and "local" means michael@0: // the other side. michael@0: if (aMsg.interrupt_remote_stack_depth_guess() != RemoteViewOfStackDepth(stackDepth)) { michael@0: // Interrupt in-calls have raced. The winner, if there is one, gets to defer michael@0: // processing of the other side's in-call. michael@0: bool defer; michael@0: const char* winner; michael@0: switch (mListener->MediateInterruptRace((mSide == ChildSide) ? aMsg : mInterruptStack.top(), michael@0: (mSide != ChildSide) ? mInterruptStack.top() : aMsg)) michael@0: { michael@0: case RIPChildWins: michael@0: winner = "child"; michael@0: defer = (mSide == ChildSide); michael@0: break; michael@0: case RIPParentWins: michael@0: winner = "parent"; michael@0: defer = (mSide != ChildSide); michael@0: break; michael@0: case RIPError: michael@0: NS_RUNTIMEABORT("NYI: 'Error' Interrupt race policy"); michael@0: return; michael@0: default: michael@0: NS_RUNTIMEABORT("not reached"); michael@0: return; michael@0: } michael@0: michael@0: if (LoggingEnabled()) { michael@0: printf_stderr(" (%s: %s won, so we're%sdeferring)\n", michael@0: (mSide == ChildSide) ? "child" : "parent", michael@0: winner, michael@0: defer ? " " : " not "); michael@0: } michael@0: michael@0: if (defer) { michael@0: // We now know the other side's stack has one more frame michael@0: // than we thought. michael@0: ++mRemoteStackDepthGuess; // decremented in MaybeProcessDeferred() michael@0: mDeferred.push(aMsg); michael@0: return; michael@0: } michael@0: michael@0: // We "lost" and need to process the other side's in-call. Don't need michael@0: // to fix up the mRemoteStackDepthGuess here, because we're just about michael@0: // to increment it in DispatchCall(), which will make it correct again. michael@0: } michael@0: michael@0: #ifdef OS_WIN michael@0: SyncStackFrame frame(this, true); michael@0: #endif michael@0: michael@0: Message* reply = nullptr; michael@0: michael@0: ++mRemoteStackDepthGuess; michael@0: Result rv = mListener->OnCallReceived(aMsg, reply); michael@0: --mRemoteStackDepthGuess; michael@0: michael@0: if (!MaybeHandleError(rv, "DispatchInterruptMessage")) { michael@0: delete reply; michael@0: reply = new Message(); michael@0: reply->set_interrupt(); michael@0: reply->set_reply(); michael@0: reply->set_reply_error(); michael@0: } michael@0: reply->set_seqno(aMsg.seqno()); michael@0: michael@0: MonitorAutoLock lock(*mMonitor); michael@0: if (ChannelConnected == mChannelState) michael@0: mLink->SendMessage(reply); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::MaybeUndeferIncall() michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertCurrentThreadOwns(); michael@0: michael@0: if (mDeferred.empty()) michael@0: return; michael@0: michael@0: size_t stackDepth = InterruptStackDepth(); michael@0: michael@0: // the other side can only *under*-estimate our actual stack depth michael@0: IPC_ASSERT(mDeferred.top().interrupt_remote_stack_depth_guess() <= stackDepth, michael@0: "fatal logic error"); michael@0: michael@0: if (mDeferred.top().interrupt_remote_stack_depth_guess() < RemoteViewOfStackDepth(stackDepth)) michael@0: return; michael@0: michael@0: // maybe time to process this message michael@0: Message call = mDeferred.top(); michael@0: mDeferred.pop(); michael@0: michael@0: // fix up fudge factor we added to account for race michael@0: IPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error"); michael@0: --mRemoteStackDepthGuess; michael@0: michael@0: mPending.push_back(call); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::FlushPendingInterruptQueue() michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertNotCurrentThreadOwns(); michael@0: michael@0: { michael@0: MonitorAutoLock lock(*mMonitor); michael@0: michael@0: if (mDeferred.empty()) { michael@0: if (mPending.empty()) michael@0: return; michael@0: michael@0: const Message& last = mPending.back(); michael@0: if (!last.is_interrupt() || last.is_reply()) michael@0: return; michael@0: } michael@0: } michael@0: michael@0: while (OnMaybeDequeueOne()); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::ExitedCxxStack() michael@0: { michael@0: mListener->OnExitedCxxStack(); michael@0: if (mSawInterruptOutMsg) { michael@0: MonitorAutoLock lock(*mMonitor); michael@0: // see long comment in OnMaybeDequeueOne() michael@0: EnqueuePendingMessages(); michael@0: mSawInterruptOutMsg = false; michael@0: } michael@0: } michael@0: michael@0: void michael@0: MessageChannel::EnqueuePendingMessages() michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertCurrentThreadOwns(); michael@0: michael@0: MaybeUndeferIncall(); michael@0: michael@0: for (size_t i = 0; i < mDeferred.size(); ++i) { michael@0: mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask)); michael@0: } michael@0: michael@0: // XXX performance tuning knob: could process all or k pending michael@0: // messages here, rather than enqueuing for later processing michael@0: michael@0: for (size_t i = 0; i < mPending.size(); ++i) { michael@0: mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask)); michael@0: } michael@0: } michael@0: michael@0: static inline bool michael@0: IsTimeoutExpired(PRIntervalTime aStart, PRIntervalTime aTimeout) michael@0: { michael@0: return (aTimeout != PR_INTERVAL_NO_TIMEOUT) && michael@0: (aTimeout <= (PR_IntervalNow() - aStart)); michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::WaitResponse(bool aWaitTimedOut) michael@0: { michael@0: if (aWaitTimedOut) { michael@0: if (mInTimeoutSecondHalf) { michael@0: // We've really timed out this time. michael@0: return false; michael@0: } michael@0: // Try a second time. michael@0: mInTimeoutSecondHalf = true; michael@0: } else { michael@0: mInTimeoutSecondHalf = false; michael@0: } michael@0: return true; michael@0: } michael@0: michael@0: #ifndef OS_WIN michael@0: bool michael@0: MessageChannel::WaitForSyncNotify() michael@0: { michael@0: PRIntervalTime timeout = (kNoTimeout == mTimeoutMs) ? michael@0: PR_INTERVAL_NO_TIMEOUT : michael@0: PR_MillisecondsToInterval(mTimeoutMs); michael@0: // XXX could optimize away this syscall for "no timeout" case if desired michael@0: PRIntervalTime waitStart = PR_IntervalNow(); michael@0: michael@0: mMonitor->Wait(timeout); michael@0: michael@0: // If the timeout didn't expire, we know we received an event. The michael@0: // converse is not true. michael@0: return WaitResponse(IsTimeoutExpired(waitStart, timeout)); michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::WaitForInterruptNotify() michael@0: { michael@0: return WaitForSyncNotify(); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::NotifyWorkerThread() michael@0: { michael@0: mMonitor->Notify(); michael@0: } michael@0: #endif michael@0: michael@0: bool michael@0: MessageChannel::ShouldContinueFromTimeout() michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertCurrentThreadOwns(); michael@0: michael@0: bool cont; michael@0: { michael@0: MonitorAutoUnlock unlock(*mMonitor); michael@0: cont = mListener->OnReplyTimeout(); michael@0: } michael@0: michael@0: static enum { UNKNOWN, NOT_DEBUGGING, DEBUGGING } sDebuggingChildren = UNKNOWN; michael@0: michael@0: if (sDebuggingChildren == UNKNOWN) { michael@0: sDebuggingChildren = getenv("MOZ_DEBUG_CHILD_PROCESS") ? DEBUGGING : NOT_DEBUGGING; michael@0: } michael@0: if (sDebuggingChildren == DEBUGGING) { michael@0: return true; michael@0: } michael@0: michael@0: if (!cont) { michael@0: // NB: there's a sublety here. If parents were allowed to send sync michael@0: // messages to children, then it would be possible for this michael@0: // synchronous close-on-timeout to race with async |OnMessageReceived| michael@0: // tasks arriving from the child, posted to the worker thread's event michael@0: // loop. This would complicate cleanup of the *Channel. But since michael@0: // IPDL forbids this (and since it doesn't support children timing out michael@0: // on parents), the parent can only block on interrupt messages to the child, michael@0: // and in that case arriving async messages are enqueued to the interrupt michael@0: // channel's special queue. They're then ignored because the channel michael@0: // state changes to ChannelTimeout (i.e. !Connected). michael@0: SynchronouslyClose(); michael@0: mChannelState = ChannelTimeout; michael@0: } michael@0: michael@0: return cont; michael@0: } michael@0: michael@0: void michael@0: MessageChannel::SetReplyTimeoutMs(int32_t aTimeoutMs) michael@0: { michael@0: // Set channel timeout value. Since this is broken up into michael@0: // two period, the minimum timeout value is 2ms. michael@0: AssertWorkerThread(); michael@0: mTimeoutMs = (aTimeoutMs <= 0) michael@0: ? kNoTimeout michael@0: : (int32_t)ceil((double)aTimeoutMs / 2.0); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::OnChannelConnected(int32_t peer_id) michael@0: { michael@0: mWorkerLoop->PostTask( michael@0: FROM_HERE, michael@0: NewRunnableMethod(this, michael@0: &MessageChannel::DispatchOnChannelConnected, michael@0: peer_id)); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::DispatchOnChannelConnected(int32_t peer_pid) michael@0: { michael@0: AssertWorkerThread(); michael@0: if (mListener) michael@0: mListener->OnChannelConnected(peer_pid); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::ReportMessageRouteError(const char* channelName) const michael@0: { michael@0: PrintErrorMessage(mSide, channelName, "Need a route"); michael@0: mListener->OnProcessingError(MsgRouteError); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::ReportConnectionError(const char* aChannelName) const michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertCurrentThreadOwns(); michael@0: michael@0: const char* errorMsg = nullptr; michael@0: switch (mChannelState) { michael@0: case ChannelClosed: michael@0: errorMsg = "Closed channel: cannot send/recv"; michael@0: break; michael@0: case ChannelOpening: michael@0: errorMsg = "Opening channel: not yet ready for send/recv"; michael@0: break; michael@0: case ChannelTimeout: michael@0: errorMsg = "Channel timeout: cannot send/recv"; michael@0: break; michael@0: case ChannelClosing: michael@0: errorMsg = "Channel closing: too late to send/recv, messages will be lost"; michael@0: break; michael@0: case ChannelError: michael@0: errorMsg = "Channel error: cannot send/recv"; michael@0: break; michael@0: michael@0: default: michael@0: NS_RUNTIMEABORT("unreached"); michael@0: } michael@0: michael@0: PrintErrorMessage(mSide, aChannelName, errorMsg); michael@0: michael@0: MonitorAutoUnlock unlock(*mMonitor); michael@0: mListener->OnProcessingError(MsgDropped); michael@0: } michael@0: michael@0: bool michael@0: MessageChannel::MaybeHandleError(Result code, const char* channelName) michael@0: { michael@0: if (MsgProcessed == code) michael@0: return true; michael@0: michael@0: const char* errorMsg = nullptr; michael@0: switch (code) { michael@0: case MsgNotKnown: michael@0: errorMsg = "Unknown message: not processed"; michael@0: break; michael@0: case MsgNotAllowed: michael@0: errorMsg = "Message not allowed: cannot be sent/recvd in this state"; michael@0: break; michael@0: case MsgPayloadError: michael@0: errorMsg = "Payload error: message could not be deserialized"; michael@0: break; michael@0: case MsgProcessingError: michael@0: errorMsg = "Processing error: message was deserialized, but the handler returned false (indicating failure)"; michael@0: break; michael@0: case MsgRouteError: michael@0: errorMsg = "Route error: message sent to unknown actor ID"; michael@0: break; michael@0: case MsgValueError: michael@0: errorMsg = "Value error: message was deserialized, but contained an illegal value"; michael@0: break; michael@0: michael@0: default: michael@0: NS_RUNTIMEABORT("unknown Result code"); michael@0: return false; michael@0: } michael@0: michael@0: PrintErrorMessage(mSide, channelName, errorMsg); michael@0: michael@0: mListener->OnProcessingError(code); michael@0: michael@0: return false; michael@0: } michael@0: michael@0: void michael@0: MessageChannel::OnChannelErrorFromLink() michael@0: { michael@0: AssertLinkThread(); michael@0: mMonitor->AssertCurrentThreadOwns(); michael@0: michael@0: if (InterruptStackDepth() > 0) michael@0: NotifyWorkerThread(); michael@0: michael@0: if (AwaitingSyncReply() || AwaitingRPCReply() || AwaitingUrgentReply()) michael@0: NotifyWorkerThread(); michael@0: michael@0: if (ChannelClosing != mChannelState) { michael@0: if (mAbortOnError) { michael@0: NS_RUNTIMEABORT("Aborting on channel error."); michael@0: } michael@0: mChannelState = ChannelError; michael@0: mMonitor->Notify(); michael@0: } michael@0: michael@0: PostErrorNotifyTask(); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::NotifyMaybeChannelError() michael@0: { michael@0: mMonitor->AssertNotCurrentThreadOwns(); michael@0: michael@0: // TODO sort out Close() on this side racing with Close() on the other side michael@0: if (ChannelClosing == mChannelState) { michael@0: // the channel closed, but we received a "Goodbye" message warning us michael@0: // about it. no worries michael@0: mChannelState = ChannelClosed; michael@0: NotifyChannelClosed(); michael@0: return; michael@0: } michael@0: michael@0: // Oops, error! Let the listener know about it. michael@0: mChannelState = ChannelError; michael@0: mListener->OnChannelError(); michael@0: Clear(); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::OnNotifyMaybeChannelError() michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertNotCurrentThreadOwns(); michael@0: michael@0: mChannelErrorTask = nullptr; michael@0: michael@0: // OnChannelError holds mMonitor when it posts this task and this michael@0: // task cannot be allowed to run until OnChannelError has michael@0: // exited. We enforce that order by grabbing the mutex here which michael@0: // should only continue once OnChannelError has completed. michael@0: { michael@0: MonitorAutoLock lock(*mMonitor); michael@0: // nothing to do here michael@0: } michael@0: michael@0: if (IsOnCxxStack()) { michael@0: mChannelErrorTask = michael@0: NewRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError); michael@0: // 10 ms delay is completely arbitrary michael@0: mWorkerLoop->PostDelayedTask(FROM_HERE, mChannelErrorTask, 10); michael@0: return; michael@0: } michael@0: michael@0: NotifyMaybeChannelError(); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::PostErrorNotifyTask() michael@0: { michael@0: mMonitor->AssertCurrentThreadOwns(); michael@0: michael@0: if (mChannelErrorTask) michael@0: return; michael@0: michael@0: // This must be the last code that runs on this thread! michael@0: mChannelErrorTask = michael@0: NewRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError); michael@0: mWorkerLoop->PostTask(FROM_HERE, mChannelErrorTask); michael@0: } michael@0: michael@0: // Special async message. michael@0: class GoodbyeMessage : public IPC::Message michael@0: { michael@0: public: michael@0: GoodbyeMessage() : michael@0: IPC::Message(MSG_ROUTING_NONE, GOODBYE_MESSAGE_TYPE, PRIORITY_NORMAL) michael@0: { michael@0: } michael@0: static bool Read(const Message* msg) { michael@0: return true; michael@0: } michael@0: void Log(const std::string& aPrefix, FILE* aOutf) const { michael@0: fputs("(special `Goodbye' message)", aOutf); michael@0: } michael@0: }; michael@0: michael@0: void michael@0: MessageChannel::SynchronouslyClose() michael@0: { michael@0: AssertWorkerThread(); michael@0: mMonitor->AssertCurrentThreadOwns(); michael@0: mLink->SendClose(); michael@0: while (ChannelClosed != mChannelState) michael@0: mMonitor->Wait(); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::CloseWithError() michael@0: { michael@0: AssertWorkerThread(); michael@0: michael@0: MonitorAutoLock lock(*mMonitor); michael@0: if (ChannelConnected != mChannelState) { michael@0: return; michael@0: } michael@0: SynchronouslyClose(); michael@0: mChannelState = ChannelError; michael@0: PostErrorNotifyTask(); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::Close() michael@0: { michael@0: AssertWorkerThread(); michael@0: michael@0: { michael@0: MonitorAutoLock lock(*mMonitor); michael@0: michael@0: if (ChannelError == mChannelState || ChannelTimeout == mChannelState) { michael@0: // See bug 538586: if the listener gets deleted while the michael@0: // IO thread's NotifyChannelError event is still enqueued michael@0: // and subsequently deletes us, then the error event will michael@0: // also be deleted and the listener will never be notified michael@0: // of the channel error. michael@0: if (mListener) { michael@0: MonitorAutoUnlock unlock(*mMonitor); michael@0: NotifyMaybeChannelError(); michael@0: } michael@0: return; michael@0: } michael@0: michael@0: if (ChannelOpening == mChannelState) { michael@0: // Mimic CloseWithError(). michael@0: SynchronouslyClose(); michael@0: mChannelState = ChannelError; michael@0: PostErrorNotifyTask(); michael@0: return; michael@0: } michael@0: michael@0: if (ChannelConnected != mChannelState) { michael@0: // XXX be strict about this until there's a compelling reason michael@0: // to relax michael@0: NS_RUNTIMEABORT("Close() called on closed channel!"); michael@0: } michael@0: michael@0: // notify the other side that we're about to close our socket michael@0: mLink->SendMessage(new GoodbyeMessage()); michael@0: SynchronouslyClose(); michael@0: } michael@0: michael@0: NotifyChannelClosed(); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::NotifyChannelClosed() michael@0: { michael@0: mMonitor->AssertNotCurrentThreadOwns(); michael@0: michael@0: if (ChannelClosed != mChannelState) michael@0: NS_RUNTIMEABORT("channel should have been closed!"); michael@0: michael@0: // OK, the IO thread just closed the channel normally. Let the michael@0: // listener know about it. michael@0: mListener->OnChannelClose(); michael@0: michael@0: Clear(); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::DebugAbort(const char* file, int line, const char* cond, michael@0: const char* why, michael@0: bool reply) const michael@0: { michael@0: printf_stderr("###!!! [MessageChannel][%s][%s:%d] " michael@0: "Assertion (%s) failed. %s %s\n", michael@0: mSide == ChildSide ? "Child" : "Parent", michael@0: file, line, cond, michael@0: why, michael@0: reply ? "(reply)" : ""); michael@0: // technically we need the mutex for this, but we're dying anyway michael@0: DumpInterruptStack(" "); michael@0: printf_stderr(" remote Interrupt stack guess: %lu\n", michael@0: mRemoteStackDepthGuess); michael@0: printf_stderr(" deferred stack size: %lu\n", michael@0: mDeferred.size()); michael@0: printf_stderr(" out-of-turn Interrupt replies stack size: %lu\n", michael@0: mOutOfTurnReplies.size()); michael@0: printf_stderr(" Pending queue size: %lu, front to back:\n", michael@0: mPending.size()); michael@0: michael@0: MessageQueue pending = mPending; michael@0: while (!pending.empty()) { michael@0: printf_stderr(" [ %s%s ]\n", michael@0: pending.front().is_interrupt() ? "intr" : michael@0: (pending.front().is_sync() ? "sync" : "async"), michael@0: pending.front().is_reply() ? "reply" : ""); michael@0: pending.pop_front(); michael@0: } michael@0: michael@0: NS_RUNTIMEABORT(why); michael@0: } michael@0: michael@0: void michael@0: MessageChannel::DumpInterruptStack(const char* const pfx) const michael@0: { michael@0: NS_WARN_IF_FALSE(MessageLoop::current() != mWorkerLoop, michael@0: "The worker thread had better be paused in a debugger!"); michael@0: michael@0: printf_stderr("%sMessageChannel 'backtrace':\n", pfx); michael@0: michael@0: // print a python-style backtrace, first frame to last michael@0: for (uint32_t i = 0; i < mCxxStackFrames.length(); ++i) { michael@0: int32_t id; michael@0: const char* dir, *sems, *name; michael@0: mCxxStackFrames[i].Describe(&id, &dir, &sems, &name); michael@0: michael@0: printf_stderr("%s[(%u) %s %s %s(actor=%d) ]\n", pfx, michael@0: i, dir, sems, name, id); michael@0: } michael@0: } michael@0: michael@0: } // ipc michael@0: } // mozilla