ipc/glue/MessageChannel.cpp

changeset 0
6474c204b198
equal deleted inserted replaced
-1:000000000000 0:e4e500495428
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 * vim: sw=4 ts=4 et :
3 */
4 /* This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7
8 #include "mozilla/ipc/MessageChannel.h"
9 #include "mozilla/ipc/ProtocolUtils.h"
10
11 #include "mozilla/Assertions.h"
12 #include "mozilla/DebugOnly.h"
13 #include "mozilla/Move.h"
14 #include "nsDebug.h"
15 #include "nsISupportsImpl.h"
16
17 // Undo the damage done by mozzconf.h
18 #undef compress
19
20 using namespace mozilla;
21 using namespace std;
22
23 using mozilla::MonitorAutoLock;
24 using mozilla::MonitorAutoUnlock;
25
26 template<>
27 struct RunnableMethodTraits<mozilla::ipc::MessageChannel>
28 {
29 static void RetainCallee(mozilla::ipc::MessageChannel* obj) { }
30 static void ReleaseCallee(mozilla::ipc::MessageChannel* obj) { }
31 };
32
// Evaluates |_cond| and, when it is false, calls DebugAbort() with the
// current file, line, the stringified condition and any extra arguments.
// Wrapped in do/while(0) so it behaves as a single statement.
#define IPC_ASSERT(_cond, ...)                                       \
    do {                                                             \
        if (!(_cond)) {                                              \
            DebugAbort(__FILE__, __LINE__, #_cond,## __VA_ARGS__);   \
        }                                                            \
    } while (0)
38
39 namespace mozilla {
40 namespace ipc {
41
42 const int32_t MessageChannel::kNoTimeout = INT32_MIN;
43
44 // static
45 bool MessageChannel::sIsPumpingMessages = false;
46
// Whether a message tracked on the C++ stack was received (IN_MESSAGE) or is
// being sent (OUT_MESSAGE).
enum Direction
{
    IN_MESSAGE,     // message being dispatched to this side
    OUT_MESSAGE     // message being sent by this side
};
52
53
54 class MessageChannel::InterruptFrame
55 {
56 private:
57 enum Semantics
58 {
59 INTR_SEMS,
60 SYNC_SEMS,
61 ASYNC_SEMS
62 };
63
64 public:
65 InterruptFrame(Direction direction, const Message* msg)
66 : mMessageName(strdup(msg->name())),
67 mMessageRoutingId(msg->routing_id()),
68 mMesageSemantics(msg->is_interrupt() ? INTR_SEMS :
69 msg->is_sync() ? SYNC_SEMS :
70 ASYNC_SEMS),
71 mDirection(direction),
72 mMoved(false)
73 {
74 MOZ_ASSERT(mMessageName);
75 }
76
77 InterruptFrame(InterruptFrame&& aOther)
78 {
79 MOZ_ASSERT(aOther.mMessageName);
80 mMessageName = aOther.mMessageName;
81 aOther.mMessageName = nullptr;
82 aOther.mMoved = true;
83
84 mMessageRoutingId = aOther.mMessageRoutingId;
85 mMesageSemantics = aOther.mMesageSemantics;
86 mDirection = aOther.mDirection;
87 }
88
89 ~InterruptFrame()
90 {
91 MOZ_ASSERT_IF(!mMessageName, mMoved);
92
93 if (mMessageName)
94 free(const_cast<char*>(mMessageName));
95 }
96
97 InterruptFrame& operator=(InterruptFrame&& aOther)
98 {
99 MOZ_ASSERT(&aOther != this);
100 this->~InterruptFrame();
101 new (this) InterruptFrame(mozilla::Move(aOther));
102 return *this;
103 }
104
105 bool IsInterruptIncall() const
106 {
107 return INTR_SEMS == mMesageSemantics && IN_MESSAGE == mDirection;
108 }
109
110 bool IsInterruptOutcall() const
111 {
112 return INTR_SEMS == mMesageSemantics && OUT_MESSAGE == mDirection;
113 }
114
115 void Describe(int32_t* id, const char** dir, const char** sems,
116 const char** name) const
117 {
118 *id = mMessageRoutingId;
119 *dir = (IN_MESSAGE == mDirection) ? "in" : "out";
120 *sems = (INTR_SEMS == mMesageSemantics) ? "intr" :
121 (SYNC_SEMS == mMesageSemantics) ? "sync" :
122 "async";
123 *name = mMessageName;
124 }
125
126 private:
127 const char* mMessageName;
128 int32_t mMessageRoutingId;
129 Semantics mMesageSemantics;
130 Direction mDirection;
131 DebugOnly<bool> mMoved;
132
133 // Disable harmful methods.
134 InterruptFrame(const InterruptFrame& aOther) MOZ_DELETE;
135 InterruptFrame& operator=(const InterruptFrame&) MOZ_DELETE;
136 };
137
138 class MOZ_STACK_CLASS MessageChannel::CxxStackFrame
139 {
140 public:
141 CxxStackFrame(MessageChannel& that, Direction direction, const Message* msg)
142 : mThat(that)
143 {
144 mThat.AssertWorkerThread();
145
146 if (mThat.mCxxStackFrames.empty())
147 mThat.EnteredCxxStack();
148
149 mThat.mCxxStackFrames.append(InterruptFrame(direction, msg));
150
151 const InterruptFrame& frame = mThat.mCxxStackFrames.back();
152
153 if (frame.IsInterruptIncall())
154 mThat.EnteredCall();
155
156 mThat.mSawInterruptOutMsg |= frame.IsInterruptOutcall();
157 }
158
159 ~CxxStackFrame() {
160 mThat.AssertWorkerThread();
161
162 MOZ_ASSERT(!mThat.mCxxStackFrames.empty());
163
164 bool exitingCall = mThat.mCxxStackFrames.back().IsInterruptIncall();
165 mThat.mCxxStackFrames.shrinkBy(1);
166
167 bool exitingStack = mThat.mCxxStackFrames.empty();
168
169 // mListener could have gone away if Close() was called while
170 // MessageChannel code was still on the stack
171 if (!mThat.mListener)
172 return;
173
174 if (exitingCall)
175 mThat.ExitedCall();
176
177 if (exitingStack)
178 mThat.ExitedCxxStack();
179 }
180 private:
181 MessageChannel& mThat;
182
183 // Disable harmful methods.
184 CxxStackFrame() MOZ_DELETE;
185 CxxStackFrame(const CxxStackFrame&) MOZ_DELETE;
186 CxxStackFrame& operator=(const CxxStackFrame&) MOZ_DELETE;
187 };
188
189 MessageChannel::MessageChannel(MessageListener *aListener)
190 : mListener(aListener->asWeakPtr()),
191 mChannelState(ChannelClosed),
192 mSide(UnknownSide),
193 mLink(nullptr),
194 mWorkerLoop(nullptr),
195 mChannelErrorTask(nullptr),
196 mWorkerLoopID(-1),
197 mTimeoutMs(kNoTimeout),
198 mInTimeoutSecondHalf(false),
199 mNextSeqno(0),
200 mPendingSyncReplies(0),
201 mPendingUrgentReplies(0),
202 mPendingRPCReplies(0),
203 mCurrentRPCTransaction(0),
204 mDispatchingSyncMessage(false),
205 mDispatchingUrgentMessageCount(0),
206 mRemoteStackDepthGuess(false),
207 mSawInterruptOutMsg(false),
208 mAbortOnError(false),
209 mFlags(REQUIRE_DEFAULT)
210 {
211 MOZ_COUNT_CTOR(ipc::MessageChannel);
212
213 #ifdef OS_WIN
214 mTopFrame = nullptr;
215 mIsSyncWaitingOnNonMainThread = false;
216 #endif
217
218 mDequeueOneTask = new RefCountedTask(NewRunnableMethod(
219 this,
220 &MessageChannel::OnMaybeDequeueOne));
221
222 #ifdef OS_WIN
223 mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
224 NS_ASSERTION(mEvent, "CreateEvent failed! Nothing is going to work!");
225 #endif
226 }
227
228 MessageChannel::~MessageChannel()
229 {
230 MOZ_COUNT_DTOR(ipc::MessageChannel);
231 IPC_ASSERT(mCxxStackFrames.empty(), "mismatched CxxStackFrame ctor/dtors");
232 #ifdef OS_WIN
233 DebugOnly<BOOL> ok = CloseHandle(mEvent);
234 MOZ_ASSERT(ok);
235 #endif
236 Clear();
237 }
238
239 static void
240 PrintErrorMessage(Side side, const char* channelName, const char* msg)
241 {
242 const char *from = (side == ChildSide)
243 ? "Child"
244 : ((side == ParentSide) ? "Parent" : "Unknown");
245 printf_stderr("\n###!!! [%s][%s] Error: %s\n\n", from, channelName, msg);
246 }
247
248 bool
249 MessageChannel::Connected() const
250 {
251 mMonitor->AssertCurrentThreadOwns();
252
253 // The transport layer allows us to send messages before
254 // receiving the "connected" ack from the remote side.
255 return (ChannelOpening == mChannelState || ChannelConnected == mChannelState);
256 }
257
258 bool
259 MessageChannel::CanSend() const
260 {
261 MonitorAutoLock lock(*mMonitor);
262 return Connected();
263 }
264
265 void
266 MessageChannel::Clear()
267 {
268 // Don't clear mWorkerLoopID; we use it in AssertLinkThread() and
269 // AssertWorkerThread().
270 //
271 // Also don't clear mListener. If we clear it, then sending a message
272 // through this channel after it's Clear()'ed can cause this process to
273 // crash.
274 //
275 // In practice, mListener owns the channel, so the channel gets deleted
276 // before mListener. But just to be safe, mListener is a weak pointer.
277
278 mDequeueOneTask->Cancel();
279
280 mWorkerLoop = nullptr;
281 delete mLink;
282 mLink = nullptr;
283
284 if (mChannelErrorTask) {
285 mChannelErrorTask->Cancel();
286 mChannelErrorTask = nullptr;
287 }
288
289 // Free up any memory used by pending messages.
290 mPending.clear();
291 mPendingUrgentRequest = nullptr;
292 mPendingRPCCall = nullptr;
293 mOutOfTurnReplies.clear();
294 while (!mDeferred.empty()) {
295 mDeferred.pop();
296 }
297 }
298
299 bool
300 MessageChannel::Open(Transport* aTransport, MessageLoop* aIOLoop, Side aSide)
301 {
302 NS_PRECONDITION(!mLink, "Open() called > once");
303
304 mMonitor = new RefCountedMonitor();
305 mWorkerLoop = MessageLoop::current();
306 mWorkerLoopID = mWorkerLoop->id();
307
308 ProcessLink *link = new ProcessLink(this);
309 link->Open(aTransport, aIOLoop, aSide); // :TODO: n.b.: sets mChild
310 mLink = link;
311 return true;
312 }
313
314 bool
315 MessageChannel::Open(MessageChannel *aTargetChan, MessageLoop *aTargetLoop, Side aSide)
316 {
317 // Opens a connection to another thread in the same process.
318
319 // This handshake proceeds as follows:
320 // - Let A be the thread initiating the process (either child or parent)
321 // and B be the other thread.
322 // - A spawns thread for B, obtaining B's message loop
323 // - A creates ProtocolChild and ProtocolParent instances.
324 // Let PA be the one appropriate to A and PB the side for B.
325 // - A invokes PA->Open(PB, ...):
326 // - set state to mChannelOpening
327 // - this will place a work item in B's worker loop (see next bullet)
328 // and then spins until PB->mChannelState becomes mChannelConnected
329 // - meanwhile, on PB's worker loop, the work item is removed and:
330 // - invokes PB->SlaveOpen(PA, ...):
331 // - sets its state and that of PA to Connected
332 NS_PRECONDITION(aTargetChan, "Need a target channel");
333 NS_PRECONDITION(ChannelClosed == mChannelState, "Not currently closed");
334
335 CommonThreadOpenInit(aTargetChan, aSide);
336
337 Side oppSide = UnknownSide;
338 switch(aSide) {
339 case ChildSide: oppSide = ParentSide; break;
340 case ParentSide: oppSide = ChildSide; break;
341 case UnknownSide: break;
342 }
343
344 mMonitor = new RefCountedMonitor();
345
346 MonitorAutoLock lock(*mMonitor);
347 mChannelState = ChannelOpening;
348 aTargetLoop->PostTask(
349 FROM_HERE,
350 NewRunnableMethod(aTargetChan, &MessageChannel::OnOpenAsSlave, this, oppSide));
351
352 while (ChannelOpening == mChannelState)
353 mMonitor->Wait();
354 NS_ASSERTION(ChannelConnected == mChannelState, "not connected when awoken");
355 return (ChannelConnected == mChannelState);
356 }
357
358 void
359 MessageChannel::OnOpenAsSlave(MessageChannel *aTargetChan, Side aSide)
360 {
361 // Invoked when the other side has begun the open.
362 NS_PRECONDITION(ChannelClosed == mChannelState,
363 "Not currently closed");
364 NS_PRECONDITION(ChannelOpening == aTargetChan->mChannelState,
365 "Target channel not in the process of opening");
366
367 CommonThreadOpenInit(aTargetChan, aSide);
368 mMonitor = aTargetChan->mMonitor;
369
370 MonitorAutoLock lock(*mMonitor);
371 NS_ASSERTION(ChannelOpening == aTargetChan->mChannelState,
372 "Target channel not in the process of opening");
373 mChannelState = ChannelConnected;
374 aTargetChan->mChannelState = ChannelConnected;
375 aTargetChan->mMonitor->Notify();
376 }
377
378 void
379 MessageChannel::CommonThreadOpenInit(MessageChannel *aTargetChan, Side aSide)
380 {
381 mWorkerLoop = MessageLoop::current();
382 mWorkerLoopID = mWorkerLoop->id();
383 mLink = new ThreadLink(this, aTargetChan);
384 mSide = aSide;
385 }
386
387 bool
388 MessageChannel::Echo(Message* aMsg)
389 {
390 nsAutoPtr<Message> msg(aMsg);
391 AssertWorkerThread();
392 mMonitor->AssertNotCurrentThreadOwns();
393 if (MSG_ROUTING_NONE == msg->routing_id()) {
394 ReportMessageRouteError("MessageChannel::Echo");
395 return false;
396 }
397
398 MonitorAutoLock lock(*mMonitor);
399
400 if (!Connected()) {
401 ReportConnectionError("MessageChannel");
402 return false;
403 }
404
405 mLink->EchoMessage(msg.forget());
406 return true;
407 }
408
409 bool
410 MessageChannel::Send(Message* aMsg)
411 {
412 CxxStackFrame frame(*this, OUT_MESSAGE, aMsg);
413
414 nsAutoPtr<Message> msg(aMsg);
415 AssertWorkerThread();
416 mMonitor->AssertNotCurrentThreadOwns();
417 if (MSG_ROUTING_NONE == msg->routing_id()) {
418 ReportMessageRouteError("MessageChannel::Send");
419 return false;
420 }
421
422 MonitorAutoLock lock(*mMonitor);
423 if (!Connected()) {
424 ReportConnectionError("MessageChannel");
425 return false;
426 }
427 mLink->SendMessage(msg.forget());
428 return true;
429 }
430
431 bool
432 MessageChannel::MaybeInterceptSpecialIOMessage(const Message& aMsg)
433 {
434 AssertLinkThread();
435 mMonitor->AssertCurrentThreadOwns();
436
437 if (MSG_ROUTING_NONE == aMsg.routing_id() &&
438 GOODBYE_MESSAGE_TYPE == aMsg.type())
439 {
440 // :TODO: Sort out Close() on this side racing with Close() on the
441 // other side
442 mChannelState = ChannelClosing;
443 if (LoggingEnabled()) {
444 printf("NOTE: %s process received `Goodbye', closing down\n",
445 (mSide == ChildSide) ? "child" : "parent");
446 }
447 return true;
448 }
449 return false;
450 }
451
452 void
453 MessageChannel::OnMessageReceivedFromLink(const Message& aMsg)
454 {
455 AssertLinkThread();
456 mMonitor->AssertCurrentThreadOwns();
457
458 if (MaybeInterceptSpecialIOMessage(aMsg))
459 return;
460
461 // Regardless of the Interrupt stack, if we're awaiting a sync or urgent reply,
462 // we know that it needs to be immediately handled to unblock us.
463 if ((AwaitingSyncReply() && aMsg.is_sync()) ||
464 (AwaitingUrgentReply() && aMsg.is_urgent()) ||
465 (AwaitingRPCReply() && aMsg.is_rpc()))
466 {
467 mRecvd = new Message(aMsg);
468 NotifyWorkerThread();
469 return;
470 }
471
472 // Urgent messages cannot be compressed.
473 MOZ_ASSERT(!aMsg.compress() || !aMsg.is_urgent());
474
475 bool compress = (aMsg.compress() && !mPending.empty() &&
476 mPending.back().type() == aMsg.type() &&
477 mPending.back().routing_id() == aMsg.routing_id());
478 if (compress) {
479 // This message type has compression enabled, and the back of the
480 // queue was the same message type and routed to the same destination.
481 // Replace it with the newer message.
482 MOZ_ASSERT(mPending.back().compress());
483 mPending.pop_back();
484 }
485
486 bool shouldWakeUp = AwaitingInterruptReply() ||
487 // Allow incoming RPCs to be processed inside an urgent message.
488 (AwaitingUrgentReply() && aMsg.is_rpc()) ||
489 // Always process urgent messages while blocked.
490 ((AwaitingSyncReply() || AwaitingRPCReply()) && aMsg.is_urgent());
491
492 // There are four cases we're concerned about, relating to the state of the
493 // main thread:
494 //
495 // (1) We are waiting on a sync|rpc reply - main thread is blocked on the
496 // IPC monitor.
497 // - If the message is high priority, we wake up the main thread to
498 // deliver the message. Otherwise, we leave it in the mPending queue,
499 // posting a task to the main event loop, where it will be processed
500 // once the synchronous reply has been received.
501 //
502 // (2) We are waiting on an Interrupt reply - main thread is blocked on the
503 // IPC monitor.
504 // - Always notify and wake up the main thread.
505 //
506 // (3) We are not waiting on a reply.
507 // - We post a task to the main event loop.
508 //
509 // Note that, we may notify the main thread even though the monitor is not
510 // blocked. This is okay, since we always check for pending events before
511 // blocking again.
512
513 if (shouldWakeUp && (AwaitingUrgentReply() && aMsg.is_rpc())) {
514 // If we're receiving an RPC message while blocked on an urgent message,
515 // we must defer any messages that were not sent as part of the child
516 // answering the urgent message.
517 //
518 // We must also be sure that we will not accidentally defer any RPC
519 // message that was sent while answering an urgent message. Otherwise,
520 // we will deadlock.
521 //
522 // On the parent side, the current transaction can only transition from 0
523 // to an ID, either by us issuing an urgent request while not blocked, or
524 // by receiving an RPC request while not blocked. When we unblock, the
525 // current transaction is reset to 0.
526 //
527 // When the child side receives an urgent message, any RPC messages sent
528 // before issuing the urgent reply will carry the urgent message's
529 // transaction ID.
530 //
531 // Since AwaitingUrgentReply() implies we are blocked, it also implies
532 // that we are within a transaction that will not change until we are
533 // completely unblocked (i.e, the transaction has completed).
534 if (aMsg.transaction_id() != mCurrentRPCTransaction)
535 shouldWakeUp = false;
536 }
537
538 if (aMsg.is_urgent()) {
539 MOZ_ASSERT(!mPendingUrgentRequest);
540 mPendingUrgentRequest = new Message(aMsg);
541 } else if (aMsg.is_rpc() && shouldWakeUp) {
542 // Only use this slot if we need to wake up for an RPC call. Otherwise
543 // we treat it like a normal async or sync message.
544 MOZ_ASSERT(!mPendingRPCCall);
545 mPendingRPCCall = new Message(aMsg);
546 } else {
547 mPending.push_back(aMsg);
548 }
549
550 if (shouldWakeUp) {
551 // Always wake up Interrupt waiters, sync waiters for urgent messages,
552 // RPC waiters for urgent messages, and urgent waiters for RPCs in the
553 // same transaction.
554 NotifyWorkerThread();
555 } else {
556 // Worker thread is either not blocked on a reply, or this is an
557 // incoming Interrupt that raced with outgoing sync, and needs to be
558 // deferred to a later event-loop iteration.
559 if (!compress) {
560 // If we compressed away the previous message, we'll re-use
561 // its pending task.
562 mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
563 }
564 }
565 }
566
567 bool
568 MessageChannel::Send(Message* aMsg, Message* aReply)
569 {
570 // Sanity checks.
571 AssertWorkerThread();
572 mMonitor->AssertNotCurrentThreadOwns();
573
574 #ifdef OS_WIN
575 SyncStackFrame frame(this, false);
576 #endif
577
578 CxxStackFrame f(*this, OUT_MESSAGE, aMsg);
579
580 MonitorAutoLock lock(*mMonitor);
581
582 IPC_ASSERT(aMsg->is_sync(), "can only Send() sync messages here");
583 IPC_ASSERT(!DispatchingSyncMessage(), "violation of sync handler invariant");
584 IPC_ASSERT(!DispatchingUrgentMessage(), "sync messages forbidden while handling urgent message");
585 IPC_ASSERT(!AwaitingSyncReply(), "nested sync messages are not supported");
586
587 AutoEnterPendingReply replies(mPendingSyncReplies);
588 if (!SendAndWait(aMsg, aReply))
589 return false;
590
591 NS_ABORT_IF_FALSE(aReply->is_sync(), "reply is not sync");
592 return true;
593 }
594
595 bool
596 MessageChannel::UrgentCall(Message* aMsg, Message* aReply)
597 {
598 AssertWorkerThread();
599 mMonitor->AssertNotCurrentThreadOwns();
600 IPC_ASSERT(mSide == ParentSide, "cannot send urgent requests from child");
601
602 #ifdef OS_WIN
603 SyncStackFrame frame(this, false);
604 #endif
605
606 CxxStackFrame f(*this, OUT_MESSAGE, aMsg);
607
608 MonitorAutoLock lock(*mMonitor);
609
610 IPC_ASSERT(!AwaitingInterruptReply(), "urgent calls cannot be issued within Interrupt calls");
611 IPC_ASSERT(!AwaitingSyncReply(), "urgent calls cannot be issued within sync sends");
612
613 AutoEnterRPCTransaction transact(this);
614 aMsg->set_transaction_id(mCurrentRPCTransaction);
615
616 AutoEnterPendingReply replies(mPendingUrgentReplies);
617 if (!SendAndWait(aMsg, aReply))
618 return false;
619
620 NS_ABORT_IF_FALSE(aReply->is_urgent(), "reply is not urgent");
621 return true;
622 }
623
624 bool
625 MessageChannel::RPCCall(Message* aMsg, Message* aReply)
626 {
627 AssertWorkerThread();
628 mMonitor->AssertNotCurrentThreadOwns();
629 IPC_ASSERT(mSide == ChildSide, "cannot send rpc messages from parent");
630
631 #ifdef OS_WIN
632 SyncStackFrame frame(this, false);
633 #endif
634
635 CxxStackFrame f(*this, OUT_MESSAGE, aMsg);
636
637 MonitorAutoLock lock(*mMonitor);
638
639 AutoEnterRPCTransaction transact(this);
640 aMsg->set_transaction_id(mCurrentRPCTransaction);
641
642 AutoEnterPendingReply replies(mPendingRPCReplies);
643 if (!SendAndWait(aMsg, aReply))
644 return false;
645
646 NS_ABORT_IF_FALSE(aReply->is_rpc(), "expected rpc reply");
647 return true;
648 }
649
650 bool
651 MessageChannel::SendAndWait(Message* aMsg, Message* aReply)
652 {
653 mMonitor->AssertCurrentThreadOwns();
654
655 nsAutoPtr<Message> msg(aMsg);
656
657 if (!Connected()) {
658 ReportConnectionError("MessageChannel::SendAndWait");
659 return false;
660 }
661
662 msg->set_seqno(NextSeqno());
663
664 DebugOnly<int32_t> replySeqno = msg->seqno();
665 DebugOnly<msgid_t> replyType = msg->type() + 1;
666
667 mLink->SendMessage(msg.forget());
668
669 while (true) {
670 // Wait for an event to occur.
671 while (true) {
672 if (mRecvd || mPendingUrgentRequest || mPendingRPCCall)
673 break;
674
675 bool maybeTimedOut = !WaitForSyncNotify();
676
677 if (!Connected()) {
678 ReportConnectionError("MessageChannel::SendAndWait");
679 return false;
680 }
681
682 if (maybeTimedOut && !ShouldContinueFromTimeout())
683 return false;
684 }
685
686 if (mPendingUrgentRequest && !ProcessPendingUrgentRequest())
687 return false;
688
689 if (mPendingRPCCall && !ProcessPendingRPCCall())
690 return false;
691
692 if (mRecvd) {
693 NS_ABORT_IF_FALSE(mRecvd->is_reply(), "expected reply");
694
695 if (mRecvd->is_reply_error()) {
696 mRecvd = nullptr;
697 return false;
698 }
699
700 NS_ABORT_IF_FALSE(mRecvd->type() == replyType, "wrong reply type");
701 NS_ABORT_IF_FALSE(mRecvd->seqno() == replySeqno, "wrong sequence number");
702
703 *aReply = *mRecvd;
704 mRecvd = nullptr;
705 return true;
706 }
707 }
708
709 return true;
710 }
711
712 bool
713 MessageChannel::Call(Message* aMsg, Message* aReply)
714 {
715 if (aMsg->is_urgent())
716 return UrgentCall(aMsg, aReply);
717 if (aMsg->is_rpc())
718 return RPCCall(aMsg, aReply);
719 return InterruptCall(aMsg, aReply);
720 }
721
722 bool
723 MessageChannel::InterruptCall(Message* aMsg, Message* aReply)
724 {
725 AssertWorkerThread();
726 mMonitor->AssertNotCurrentThreadOwns();
727
728 #ifdef OS_WIN
729 SyncStackFrame frame(this, true);
730 #endif
731
732 // This must come before MonitorAutoLock, as its destructor acquires the
733 // monitor lock.
734 CxxStackFrame cxxframe(*this, OUT_MESSAGE, aMsg);
735
736 MonitorAutoLock lock(*mMonitor);
737 if (!Connected()) {
738 ReportConnectionError("MessageChannel::Call");
739 return false;
740 }
741
742 // Sanity checks.
743 IPC_ASSERT(!AwaitingSyncReply() && !AwaitingUrgentReply(),
744 "cannot issue Interrupt call whiel blocked on sync or urgent");
745 IPC_ASSERT(!DispatchingSyncMessage() || aMsg->priority() == IPC::Message::PRIORITY_HIGH,
746 "violation of sync handler invariant");
747 IPC_ASSERT(aMsg->is_interrupt(), "can only Call() Interrupt messages here");
748
749
750 nsAutoPtr<Message> msg(aMsg);
751
752 msg->set_seqno(NextSeqno());
753 msg->set_interrupt_remote_stack_depth_guess(mRemoteStackDepthGuess);
754 msg->set_interrupt_local_stack_depth(1 + InterruptStackDepth());
755 mInterruptStack.push(*msg);
756 mLink->SendMessage(msg.forget());
757
758 while (true) {
759 // if a handler invoked by *Dispatch*() spun a nested event
760 // loop, and the connection was broken during that loop, we
761 // might have already processed the OnError event. if so,
762 // trying another loop iteration will be futile because
763 // channel state will have been cleared
764 if (!Connected()) {
765 ReportConnectionError("MessageChannel::InterruptCall");
766 return false;
767 }
768
769 // Now might be the time to process a message deferred because of race
770 // resolution.
771 MaybeUndeferIncall();
772
773 // Wait for an event to occur.
774 while (!InterruptEventOccurred()) {
775 bool maybeTimedOut = !WaitForInterruptNotify();
776
777 // We might have received a "subtly deferred" message in a nested
778 // loop that it's now time to process.
779 if (InterruptEventOccurred() ||
780 (!maybeTimedOut && (!mDeferred.empty() || !mOutOfTurnReplies.empty())))
781 {
782 break;
783 }
784
785 if (maybeTimedOut && !ShouldContinueFromTimeout())
786 return false;
787 }
788
789 Message recvd;
790 MessageMap::iterator it;
791
792 if (mPendingUrgentRequest) {
793 recvd = *mPendingUrgentRequest;
794 mPendingUrgentRequest = nullptr;
795 } else if (mPendingRPCCall) {
796 recvd = *mPendingRPCCall;
797 mPendingRPCCall = nullptr;
798 } else if ((it = mOutOfTurnReplies.find(mInterruptStack.top().seqno()))
799 != mOutOfTurnReplies.end())
800 {
801 recvd = it->second;
802 mOutOfTurnReplies.erase(it);
803 } else if (!mPending.empty()) {
804 recvd = mPending.front();
805 mPending.pop_front();
806 } else {
807 // because of subtleties with nested event loops, it's possible
808 // that we got here and nothing happened. or, we might have a
809 // deferred in-call that needs to be processed. either way, we
810 // won't break the inner while loop again until something new
811 // happens.
812 continue;
813 }
814
815 // If the message is not Interrupt, we can dispatch it as normal.
816 if (!recvd.is_interrupt()) {
817 // Other side should be blocked.
818 IPC_ASSERT(!recvd.is_sync() || mPending.empty(), "other side should be blocked");
819
820 {
821 AutoEnterRPCTransaction transaction(this, &recvd);
822 MonitorAutoUnlock unlock(*mMonitor);
823 CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
824 DispatchMessage(recvd);
825 }
826 if (!Connected()) {
827 ReportConnectionError("MessageChannel::DispatchMessage");
828 return false;
829 }
830 continue;
831 }
832
833 // If the message is an Interrupt reply, either process it as a reply to our
834 // call, or add it to the list of out-of-turn replies we've received.
835 if (recvd.is_reply()) {
836 IPC_ASSERT(!mInterruptStack.empty(), "invalid Interrupt stack");
837
838 // If this is not a reply the call we've initiated, add it to our
839 // out-of-turn replies and keep polling for events.
840 {
841 const Message &outcall = mInterruptStack.top();
842
843 // Note, In the parent, sequence numbers increase from 0, and
844 // in the child, they decrease from 0.
845 if ((mSide == ChildSide && recvd.seqno() > outcall.seqno()) ||
846 (mSide != ChildSide && recvd.seqno() < outcall.seqno()))
847 {
848 mOutOfTurnReplies[recvd.seqno()] = recvd;
849 continue;
850 }
851
852 IPC_ASSERT(recvd.is_reply_error() ||
853 (recvd.type() == (outcall.type() + 1) &&
854 recvd.seqno() == outcall.seqno()),
855 "somebody's misbehavin'", true);
856 }
857
858 // We received a reply to our most recent outstanding call. Pop
859 // this frame and return the reply.
860 mInterruptStack.pop();
861
862 if (!recvd.is_reply_error()) {
863 *aReply = recvd;
864 }
865
866 // If we have no more pending out calls waiting on replies, then
867 // the reply queue should be empty.
868 IPC_ASSERT(!mInterruptStack.empty() || mOutOfTurnReplies.empty(),
869 "still have pending replies with no pending out-calls",
870 true);
871
872 return !recvd.is_reply_error();
873 }
874
875 // Dispatch an Interrupt in-call. Snapshot the current stack depth while we
876 // own the monitor.
877 size_t stackDepth = InterruptStackDepth();
878 {
879 MonitorAutoUnlock unlock(*mMonitor);
880
881 CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
882 DispatchInterruptMessage(recvd, stackDepth);
883 }
884 if (!Connected()) {
885 ReportConnectionError("MessageChannel::DispatchInterruptMessage");
886 return false;
887 }
888 }
889
890 return true;
891 }
892
893 bool
894 MessageChannel::InterruptEventOccurred()
895 {
896 AssertWorkerThread();
897 mMonitor->AssertCurrentThreadOwns();
898 IPC_ASSERT(InterruptStackDepth() > 0, "not in wait loop");
899
900 return (!Connected() ||
901 !mPending.empty() ||
902 mPendingUrgentRequest ||
903 mPendingRPCCall ||
904 (!mOutOfTurnReplies.empty() &&
905 mOutOfTurnReplies.find(mInterruptStack.top().seqno()) !=
906 mOutOfTurnReplies.end()));
907 }
908
909 bool
910 MessageChannel::ProcessPendingUrgentRequest()
911 {
912 AssertWorkerThread();
913 mMonitor->AssertCurrentThreadOwns();
914
915 // Note that it is possible we could have sent a sync message at
916 // the same time the parent process sent an urgent message, and
917 // therefore mPendingUrgentRequest is set *and* mRecvd is set as
918 // well, because the link thread received both before the worker
919 // thread woke up.
920 //
921 // In this case, we process the urgent message first, but we need
922 // to save the reply.
923 nsAutoPtr<Message> savedReply(mRecvd.forget());
924
925 // We're the child process. We should not be receiving RPC calls.
926 IPC_ASSERT(!mPendingRPCCall, "unexpected RPC call");
927
928 nsAutoPtr<Message> recvd(mPendingUrgentRequest.forget());
929 {
930 // In order to send the parent RPC messages and guarantee it will
931 // wake up, we must re-use its transaction.
932 AutoEnterRPCTransaction transaction(this, recvd);
933
934 MonitorAutoUnlock unlock(*mMonitor);
935 DispatchUrgentMessage(*recvd);
936 }
937 if (!Connected()) {
938 ReportConnectionError("MessageChannel::DispatchUrgentMessage");
939 return false;
940 }
941
942 // In between having dispatched our reply to the parent process, and
943 // re-acquiring the monitor, the parent process could have already
944 // processed that reply and sent the reply to our sync message. If so,
945 // our saved reply should be empty.
946 IPC_ASSERT(!mRecvd || !savedReply, "unknown reply");
947 if (!mRecvd)
948 mRecvd = savedReply.forget();
949 return true;
950 }
951
952 bool
953 MessageChannel::ProcessPendingRPCCall()
954 {
955 AssertWorkerThread();
956 mMonitor->AssertCurrentThreadOwns();
957
958 // See comment above re: mRecvd replies and incoming calls.
959 nsAutoPtr<Message> savedReply(mRecvd.forget());
960
961 IPC_ASSERT(!mPendingUrgentRequest, "unexpected urgent message");
962
963 nsAutoPtr<Message> recvd(mPendingRPCCall.forget());
964 {
965 // If we are not currently in a transaction, this will begin one,
966 // and the link thread will not wake us up for any RPC messages not
967 // apart of this transaction. If we are already in a transaction,
968 // then this will assert that we're still in the same transaction.
969 AutoEnterRPCTransaction transaction(this, recvd);
970
971 MonitorAutoUnlock unlock(*mMonitor);
972 DispatchRPCMessage(*recvd);
973 }
974 if (!Connected()) {
975 ReportConnectionError("MessageChannel::DispatchRPCMessage");
976 return false;
977 }
978
979 // In between having dispatched our reply to the parent process, and
980 // re-acquiring the monitor, the parent process could have already
981 // processed that reply and sent the reply to our sync message. If so,
982 // our saved reply should be empty.
983 IPC_ASSERT(!mRecvd || !savedReply, "unknown reply");
984 if (!mRecvd)
985 mRecvd = savedReply.forget();
986 return true;
987 }
988
989 bool
990 MessageChannel::DequeueOne(Message *recvd)
991 {
992 AssertWorkerThread();
993 mMonitor->AssertCurrentThreadOwns();
994
995 if (!Connected()) {
996 ReportConnectionError("OnMaybeDequeueOne");
997 return false;
998 }
999
1000 if (mPendingUrgentRequest) {
1001 *recvd = *mPendingUrgentRequest;
1002 mPendingUrgentRequest = nullptr;
1003 return true;
1004 }
1005
1006 if (mPendingRPCCall) {
1007 *recvd = *mPendingRPCCall;
1008 mPendingRPCCall = nullptr;
1009 return true;
1010 }
1011
1012 if (!mDeferred.empty())
1013 MaybeUndeferIncall();
1014
1015 if (mPending.empty())
1016 return false;
1017
1018 *recvd = mPending.front();
1019 mPending.pop_front();
1020 return true;
1021 }
1022
1023 bool
1024 MessageChannel::OnMaybeDequeueOne()
1025 {
1026 AssertWorkerThread();
1027 mMonitor->AssertNotCurrentThreadOwns();
1028
1029 Message recvd;
1030
1031 MonitorAutoLock lock(*mMonitor);
1032 if (!DequeueOne(&recvd))
1033 return false;
1034
1035 if (IsOnCxxStack() && recvd.is_interrupt() && recvd.is_reply()) {
1036 // We probably just received a reply in a nested loop for an
1037 // Interrupt call sent before entering that loop.
1038 mOutOfTurnReplies[recvd.seqno()] = recvd;
1039 return false;
1040 }
1041
1042 {
1043 // We should not be in a transaction yet if we're not blocked.
1044 MOZ_ASSERT(mCurrentRPCTransaction == 0);
1045 AutoEnterRPCTransaction transaction(this, &recvd);
1046
1047 MonitorAutoUnlock unlock(*mMonitor);
1048
1049 CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
1050 DispatchMessage(recvd);
1051 }
1052 return true;
1053 }
1054
1055 void
1056 MessageChannel::DispatchMessage(const Message &aMsg)
1057 {
1058 if (aMsg.is_sync())
1059 DispatchSyncMessage(aMsg);
1060 else if (aMsg.is_urgent())
1061 DispatchUrgentMessage(aMsg);
1062 else if (aMsg.is_interrupt())
1063 DispatchInterruptMessage(aMsg, 0);
1064 else if (aMsg.is_rpc())
1065 DispatchRPCMessage(aMsg);
1066 else
1067 DispatchAsyncMessage(aMsg);
1068 }
1069
1070 void
1071 MessageChannel::DispatchSyncMessage(const Message& aMsg)
1072 {
1073 AssertWorkerThread();
1074
1075 Message *reply = nullptr;
1076
1077 mDispatchingSyncMessage = true;
1078 Result rv = mListener->OnMessageReceived(aMsg, reply);
1079 mDispatchingSyncMessage = false;
1080
1081 if (!MaybeHandleError(rv, "DispatchSyncMessage")) {
1082 delete reply;
1083 reply = new Message();
1084 reply->set_sync();
1085 reply->set_reply();
1086 reply->set_reply_error();
1087 }
1088 reply->set_seqno(aMsg.seqno());
1089
1090 MonitorAutoLock lock(*mMonitor);
1091 if (ChannelConnected == mChannelState)
1092 mLink->SendMessage(reply);
1093 }
1094
1095 void
1096 MessageChannel::DispatchUrgentMessage(const Message& aMsg)
1097 {
1098 AssertWorkerThread();
1099 MOZ_ASSERT(aMsg.is_urgent());
1100
1101 Message *reply = nullptr;
1102
1103 mDispatchingUrgentMessageCount++;
1104 Result rv = mListener->OnCallReceived(aMsg, reply);
1105 mDispatchingUrgentMessageCount--;
1106
1107 if (!MaybeHandleError(rv, "DispatchUrgentMessage")) {
1108 delete reply;
1109 reply = new Message();
1110 reply->set_urgent();
1111 reply->set_reply();
1112 reply->set_reply_error();
1113 }
1114 reply->set_seqno(aMsg.seqno());
1115
1116 MonitorAutoLock lock(*mMonitor);
1117 if (ChannelConnected == mChannelState)
1118 mLink->SendMessage(reply);
1119 }
1120
1121 void
1122 MessageChannel::DispatchRPCMessage(const Message& aMsg)
1123 {
1124 AssertWorkerThread();
1125 MOZ_ASSERT(aMsg.is_rpc());
1126
1127 Message *reply = nullptr;
1128
1129 if (!MaybeHandleError(mListener->OnCallReceived(aMsg, reply), "DispatchRPCMessage")) {
1130 delete reply;
1131 reply = new Message();
1132 reply->set_rpc();
1133 reply->set_reply();
1134 reply->set_reply_error();
1135 }
1136 reply->set_seqno(aMsg.seqno());
1137
1138 MonitorAutoLock lock(*mMonitor);
1139 if (ChannelConnected == mChannelState)
1140 mLink->SendMessage(reply);
1141 }
1142
1143 void
1144 MessageChannel::DispatchAsyncMessage(const Message& aMsg)
1145 {
1146 AssertWorkerThread();
1147 MOZ_ASSERT(!aMsg.is_interrupt() && !aMsg.is_sync() && !aMsg.is_urgent());
1148
1149 if (aMsg.routing_id() == MSG_ROUTING_NONE) {
1150 NS_RUNTIMEABORT("unhandled special message!");
1151 }
1152
1153 MaybeHandleError(mListener->OnMessageReceived(aMsg), "DispatchAsyncMessage");
1154 }
1155
1156 void
1157 MessageChannel::DispatchInterruptMessage(const Message& aMsg, size_t stackDepth)
1158 {
1159 AssertWorkerThread();
1160 mMonitor->AssertNotCurrentThreadOwns();
1161
1162 IPC_ASSERT(aMsg.is_interrupt() && !aMsg.is_reply(), "wrong message type");
1163
1164 // Race detection: see the long comment near mRemoteStackDepthGuess in
1165 // MessageChannel.h. "Remote" stack depth means our side, and "local" means
1166 // the other side.
1167 if (aMsg.interrupt_remote_stack_depth_guess() != RemoteViewOfStackDepth(stackDepth)) {
1168 // Interrupt in-calls have raced. The winner, if there is one, gets to defer
1169 // processing of the other side's in-call.
1170 bool defer;
1171 const char* winner;
1172 switch (mListener->MediateInterruptRace((mSide == ChildSide) ? aMsg : mInterruptStack.top(),
1173 (mSide != ChildSide) ? mInterruptStack.top() : aMsg))
1174 {
1175 case RIPChildWins:
1176 winner = "child";
1177 defer = (mSide == ChildSide);
1178 break;
1179 case RIPParentWins:
1180 winner = "parent";
1181 defer = (mSide != ChildSide);
1182 break;
1183 case RIPError:
1184 NS_RUNTIMEABORT("NYI: 'Error' Interrupt race policy");
1185 return;
1186 default:
1187 NS_RUNTIMEABORT("not reached");
1188 return;
1189 }
1190
1191 if (LoggingEnabled()) {
1192 printf_stderr(" (%s: %s won, so we're%sdeferring)\n",
1193 (mSide == ChildSide) ? "child" : "parent",
1194 winner,
1195 defer ? " " : " not ");
1196 }
1197
1198 if (defer) {
1199 // We now know the other side's stack has one more frame
1200 // than we thought.
1201 ++mRemoteStackDepthGuess; // decremented in MaybeProcessDeferred()
1202 mDeferred.push(aMsg);
1203 return;
1204 }
1205
1206 // We "lost" and need to process the other side's in-call. Don't need
1207 // to fix up the mRemoteStackDepthGuess here, because we're just about
1208 // to increment it in DispatchCall(), which will make it correct again.
1209 }
1210
1211 #ifdef OS_WIN
1212 SyncStackFrame frame(this, true);
1213 #endif
1214
1215 Message* reply = nullptr;
1216
1217 ++mRemoteStackDepthGuess;
1218 Result rv = mListener->OnCallReceived(aMsg, reply);
1219 --mRemoteStackDepthGuess;
1220
1221 if (!MaybeHandleError(rv, "DispatchInterruptMessage")) {
1222 delete reply;
1223 reply = new Message();
1224 reply->set_interrupt();
1225 reply->set_reply();
1226 reply->set_reply_error();
1227 }
1228 reply->set_seqno(aMsg.seqno());
1229
1230 MonitorAutoLock lock(*mMonitor);
1231 if (ChannelConnected == mChannelState)
1232 mLink->SendMessage(reply);
1233 }
1234
1235 void
1236 MessageChannel::MaybeUndeferIncall()
1237 {
1238 AssertWorkerThread();
1239 mMonitor->AssertCurrentThreadOwns();
1240
1241 if (mDeferred.empty())
1242 return;
1243
1244 size_t stackDepth = InterruptStackDepth();
1245
1246 // the other side can only *under*-estimate our actual stack depth
1247 IPC_ASSERT(mDeferred.top().interrupt_remote_stack_depth_guess() <= stackDepth,
1248 "fatal logic error");
1249
1250 if (mDeferred.top().interrupt_remote_stack_depth_guess() < RemoteViewOfStackDepth(stackDepth))
1251 return;
1252
1253 // maybe time to process this message
1254 Message call = mDeferred.top();
1255 mDeferred.pop();
1256
1257 // fix up fudge factor we added to account for race
1258 IPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error");
1259 --mRemoteStackDepthGuess;
1260
1261 mPending.push_back(call);
1262 }
1263
1264 void
1265 MessageChannel::FlushPendingInterruptQueue()
1266 {
1267 AssertWorkerThread();
1268 mMonitor->AssertNotCurrentThreadOwns();
1269
1270 {
1271 MonitorAutoLock lock(*mMonitor);
1272
1273 if (mDeferred.empty()) {
1274 if (mPending.empty())
1275 return;
1276
1277 const Message& last = mPending.back();
1278 if (!last.is_interrupt() || last.is_reply())
1279 return;
1280 }
1281 }
1282
1283 while (OnMaybeDequeueOne());
1284 }
1285
1286 void
1287 MessageChannel::ExitedCxxStack()
1288 {
1289 mListener->OnExitedCxxStack();
1290 if (mSawInterruptOutMsg) {
1291 MonitorAutoLock lock(*mMonitor);
1292 // see long comment in OnMaybeDequeueOne()
1293 EnqueuePendingMessages();
1294 mSawInterruptOutMsg = false;
1295 }
1296 }
1297
1298 void
1299 MessageChannel::EnqueuePendingMessages()
1300 {
1301 AssertWorkerThread();
1302 mMonitor->AssertCurrentThreadOwns();
1303
1304 MaybeUndeferIncall();
1305
1306 for (size_t i = 0; i < mDeferred.size(); ++i) {
1307 mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
1308 }
1309
1310 // XXX performance tuning knob: could process all or k pending
1311 // messages here, rather than enqueuing for later processing
1312
1313 for (size_t i = 0; i < mPending.size(); ++i) {
1314 mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
1315 }
1316 }
1317
1318 static inline bool
1319 IsTimeoutExpired(PRIntervalTime aStart, PRIntervalTime aTimeout)
1320 {
1321 return (aTimeout != PR_INTERVAL_NO_TIMEOUT) &&
1322 (aTimeout <= (PR_IntervalNow() - aStart));
1323 }
1324
1325 bool
1326 MessageChannel::WaitResponse(bool aWaitTimedOut)
1327 {
1328 if (aWaitTimedOut) {
1329 if (mInTimeoutSecondHalf) {
1330 // We've really timed out this time.
1331 return false;
1332 }
1333 // Try a second time.
1334 mInTimeoutSecondHalf = true;
1335 } else {
1336 mInTimeoutSecondHalf = false;
1337 }
1338 return true;
1339 }
1340
1341 #ifndef OS_WIN
1342 bool
1343 MessageChannel::WaitForSyncNotify()
1344 {
1345 PRIntervalTime timeout = (kNoTimeout == mTimeoutMs) ?
1346 PR_INTERVAL_NO_TIMEOUT :
1347 PR_MillisecondsToInterval(mTimeoutMs);
1348 // XXX could optimize away this syscall for "no timeout" case if desired
1349 PRIntervalTime waitStart = PR_IntervalNow();
1350
1351 mMonitor->Wait(timeout);
1352
1353 // If the timeout didn't expire, we know we received an event. The
1354 // converse is not true.
1355 return WaitResponse(IsTimeoutExpired(waitStart, timeout));
1356 }
1357
1358 bool
1359 MessageChannel::WaitForInterruptNotify()
1360 {
1361 return WaitForSyncNotify();
1362 }
1363
1364 void
1365 MessageChannel::NotifyWorkerThread()
1366 {
1367 mMonitor->Notify();
1368 }
1369 #endif
1370
1371 bool
1372 MessageChannel::ShouldContinueFromTimeout()
1373 {
1374 AssertWorkerThread();
1375 mMonitor->AssertCurrentThreadOwns();
1376
1377 bool cont;
1378 {
1379 MonitorAutoUnlock unlock(*mMonitor);
1380 cont = mListener->OnReplyTimeout();
1381 }
1382
1383 static enum { UNKNOWN, NOT_DEBUGGING, DEBUGGING } sDebuggingChildren = UNKNOWN;
1384
1385 if (sDebuggingChildren == UNKNOWN) {
1386 sDebuggingChildren = getenv("MOZ_DEBUG_CHILD_PROCESS") ? DEBUGGING : NOT_DEBUGGING;
1387 }
1388 if (sDebuggingChildren == DEBUGGING) {
1389 return true;
1390 }
1391
1392 if (!cont) {
1393 // NB: there's a sublety here. If parents were allowed to send sync
1394 // messages to children, then it would be possible for this
1395 // synchronous close-on-timeout to race with async |OnMessageReceived|
1396 // tasks arriving from the child, posted to the worker thread's event
1397 // loop. This would complicate cleanup of the *Channel. But since
1398 // IPDL forbids this (and since it doesn't support children timing out
1399 // on parents), the parent can only block on interrupt messages to the child,
1400 // and in that case arriving async messages are enqueued to the interrupt
1401 // channel's special queue. They're then ignored because the channel
1402 // state changes to ChannelTimeout (i.e. !Connected).
1403 SynchronouslyClose();
1404 mChannelState = ChannelTimeout;
1405 }
1406
1407 return cont;
1408 }
1409
1410 void
1411 MessageChannel::SetReplyTimeoutMs(int32_t aTimeoutMs)
1412 {
1413 // Set channel timeout value. Since this is broken up into
1414 // two period, the minimum timeout value is 2ms.
1415 AssertWorkerThread();
1416 mTimeoutMs = (aTimeoutMs <= 0)
1417 ? kNoTimeout
1418 : (int32_t)ceil((double)aTimeoutMs / 2.0);
1419 }
1420
1421 void
1422 MessageChannel::OnChannelConnected(int32_t peer_id)
1423 {
1424 mWorkerLoop->PostTask(
1425 FROM_HERE,
1426 NewRunnableMethod(this,
1427 &MessageChannel::DispatchOnChannelConnected,
1428 peer_id));
1429 }
1430
1431 void
1432 MessageChannel::DispatchOnChannelConnected(int32_t peer_pid)
1433 {
1434 AssertWorkerThread();
1435 if (mListener)
1436 mListener->OnChannelConnected(peer_pid);
1437 }
1438
1439 void
1440 MessageChannel::ReportMessageRouteError(const char* channelName) const
1441 {
1442 PrintErrorMessage(mSide, channelName, "Need a route");
1443 mListener->OnProcessingError(MsgRouteError);
1444 }
1445
1446 void
1447 MessageChannel::ReportConnectionError(const char* aChannelName) const
1448 {
1449 AssertWorkerThread();
1450 mMonitor->AssertCurrentThreadOwns();
1451
1452 const char* errorMsg = nullptr;
1453 switch (mChannelState) {
1454 case ChannelClosed:
1455 errorMsg = "Closed channel: cannot send/recv";
1456 break;
1457 case ChannelOpening:
1458 errorMsg = "Opening channel: not yet ready for send/recv";
1459 break;
1460 case ChannelTimeout:
1461 errorMsg = "Channel timeout: cannot send/recv";
1462 break;
1463 case ChannelClosing:
1464 errorMsg = "Channel closing: too late to send/recv, messages will be lost";
1465 break;
1466 case ChannelError:
1467 errorMsg = "Channel error: cannot send/recv";
1468 break;
1469
1470 default:
1471 NS_RUNTIMEABORT("unreached");
1472 }
1473
1474 PrintErrorMessage(mSide, aChannelName, errorMsg);
1475
1476 MonitorAutoUnlock unlock(*mMonitor);
1477 mListener->OnProcessingError(MsgDropped);
1478 }
1479
1480 bool
1481 MessageChannel::MaybeHandleError(Result code, const char* channelName)
1482 {
1483 if (MsgProcessed == code)
1484 return true;
1485
1486 const char* errorMsg = nullptr;
1487 switch (code) {
1488 case MsgNotKnown:
1489 errorMsg = "Unknown message: not processed";
1490 break;
1491 case MsgNotAllowed:
1492 errorMsg = "Message not allowed: cannot be sent/recvd in this state";
1493 break;
1494 case MsgPayloadError:
1495 errorMsg = "Payload error: message could not be deserialized";
1496 break;
1497 case MsgProcessingError:
1498 errorMsg = "Processing error: message was deserialized, but the handler returned false (indicating failure)";
1499 break;
1500 case MsgRouteError:
1501 errorMsg = "Route error: message sent to unknown actor ID";
1502 break;
1503 case MsgValueError:
1504 errorMsg = "Value error: message was deserialized, but contained an illegal value";
1505 break;
1506
1507 default:
1508 NS_RUNTIMEABORT("unknown Result code");
1509 return false;
1510 }
1511
1512 PrintErrorMessage(mSide, channelName, errorMsg);
1513
1514 mListener->OnProcessingError(code);
1515
1516 return false;
1517 }
1518
1519 void
1520 MessageChannel::OnChannelErrorFromLink()
1521 {
1522 AssertLinkThread();
1523 mMonitor->AssertCurrentThreadOwns();
1524
1525 if (InterruptStackDepth() > 0)
1526 NotifyWorkerThread();
1527
1528 if (AwaitingSyncReply() || AwaitingRPCReply() || AwaitingUrgentReply())
1529 NotifyWorkerThread();
1530
1531 if (ChannelClosing != mChannelState) {
1532 if (mAbortOnError) {
1533 NS_RUNTIMEABORT("Aborting on channel error.");
1534 }
1535 mChannelState = ChannelError;
1536 mMonitor->Notify();
1537 }
1538
1539 PostErrorNotifyTask();
1540 }
1541
1542 void
1543 MessageChannel::NotifyMaybeChannelError()
1544 {
1545 mMonitor->AssertNotCurrentThreadOwns();
1546
1547 // TODO sort out Close() on this side racing with Close() on the other side
1548 if (ChannelClosing == mChannelState) {
1549 // the channel closed, but we received a "Goodbye" message warning us
1550 // about it. no worries
1551 mChannelState = ChannelClosed;
1552 NotifyChannelClosed();
1553 return;
1554 }
1555
1556 // Oops, error! Let the listener know about it.
1557 mChannelState = ChannelError;
1558 mListener->OnChannelError();
1559 Clear();
1560 }
1561
1562 void
1563 MessageChannel::OnNotifyMaybeChannelError()
1564 {
1565 AssertWorkerThread();
1566 mMonitor->AssertNotCurrentThreadOwns();
1567
1568 mChannelErrorTask = nullptr;
1569
1570 // OnChannelError holds mMonitor when it posts this task and this
1571 // task cannot be allowed to run until OnChannelError has
1572 // exited. We enforce that order by grabbing the mutex here which
1573 // should only continue once OnChannelError has completed.
1574 {
1575 MonitorAutoLock lock(*mMonitor);
1576 // nothing to do here
1577 }
1578
1579 if (IsOnCxxStack()) {
1580 mChannelErrorTask =
1581 NewRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError);
1582 // 10 ms delay is completely arbitrary
1583 mWorkerLoop->PostDelayedTask(FROM_HERE, mChannelErrorTask, 10);
1584 return;
1585 }
1586
1587 NotifyMaybeChannelError();
1588 }
1589
1590 void
1591 MessageChannel::PostErrorNotifyTask()
1592 {
1593 mMonitor->AssertCurrentThreadOwns();
1594
1595 if (mChannelErrorTask)
1596 return;
1597
1598 // This must be the last code that runs on this thread!
1599 mChannelErrorTask =
1600 NewRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError);
1601 mWorkerLoop->PostTask(FROM_HERE, mChannelErrorTask);
1602 }
1603
1604 // Special async message.
1605 class GoodbyeMessage : public IPC::Message
1606 {
1607 public:
1608 GoodbyeMessage() :
1609 IPC::Message(MSG_ROUTING_NONE, GOODBYE_MESSAGE_TYPE, PRIORITY_NORMAL)
1610 {
1611 }
1612 static bool Read(const Message* msg) {
1613 return true;
1614 }
1615 void Log(const std::string& aPrefix, FILE* aOutf) const {
1616 fputs("(special `Goodbye' message)", aOutf);
1617 }
1618 };
1619
1620 void
1621 MessageChannel::SynchronouslyClose()
1622 {
1623 AssertWorkerThread();
1624 mMonitor->AssertCurrentThreadOwns();
1625 mLink->SendClose();
1626 while (ChannelClosed != mChannelState)
1627 mMonitor->Wait();
1628 }
1629
1630 void
1631 MessageChannel::CloseWithError()
1632 {
1633 AssertWorkerThread();
1634
1635 MonitorAutoLock lock(*mMonitor);
1636 if (ChannelConnected != mChannelState) {
1637 return;
1638 }
1639 SynchronouslyClose();
1640 mChannelState = ChannelError;
1641 PostErrorNotifyTask();
1642 }
1643
1644 void
1645 MessageChannel::Close()
1646 {
1647 AssertWorkerThread();
1648
1649 {
1650 MonitorAutoLock lock(*mMonitor);
1651
1652 if (ChannelError == mChannelState || ChannelTimeout == mChannelState) {
1653 // See bug 538586: if the listener gets deleted while the
1654 // IO thread's NotifyChannelError event is still enqueued
1655 // and subsequently deletes us, then the error event will
1656 // also be deleted and the listener will never be notified
1657 // of the channel error.
1658 if (mListener) {
1659 MonitorAutoUnlock unlock(*mMonitor);
1660 NotifyMaybeChannelError();
1661 }
1662 return;
1663 }
1664
1665 if (ChannelOpening == mChannelState) {
1666 // Mimic CloseWithError().
1667 SynchronouslyClose();
1668 mChannelState = ChannelError;
1669 PostErrorNotifyTask();
1670 return;
1671 }
1672
1673 if (ChannelConnected != mChannelState) {
1674 // XXX be strict about this until there's a compelling reason
1675 // to relax
1676 NS_RUNTIMEABORT("Close() called on closed channel!");
1677 }
1678
1679 // notify the other side that we're about to close our socket
1680 mLink->SendMessage(new GoodbyeMessage());
1681 SynchronouslyClose();
1682 }
1683
1684 NotifyChannelClosed();
1685 }
1686
1687 void
1688 MessageChannel::NotifyChannelClosed()
1689 {
1690 mMonitor->AssertNotCurrentThreadOwns();
1691
1692 if (ChannelClosed != mChannelState)
1693 NS_RUNTIMEABORT("channel should have been closed!");
1694
1695 // OK, the IO thread just closed the channel normally. Let the
1696 // listener know about it.
1697 mListener->OnChannelClose();
1698
1699 Clear();
1700 }
1701
1702 void
1703 MessageChannel::DebugAbort(const char* file, int line, const char* cond,
1704 const char* why,
1705 bool reply) const
1706 {
1707 printf_stderr("###!!! [MessageChannel][%s][%s:%d] "
1708 "Assertion (%s) failed. %s %s\n",
1709 mSide == ChildSide ? "Child" : "Parent",
1710 file, line, cond,
1711 why,
1712 reply ? "(reply)" : "");
1713 // technically we need the mutex for this, but we're dying anyway
1714 DumpInterruptStack(" ");
1715 printf_stderr(" remote Interrupt stack guess: %lu\n",
1716 mRemoteStackDepthGuess);
1717 printf_stderr(" deferred stack size: %lu\n",
1718 mDeferred.size());
1719 printf_stderr(" out-of-turn Interrupt replies stack size: %lu\n",
1720 mOutOfTurnReplies.size());
1721 printf_stderr(" Pending queue size: %lu, front to back:\n",
1722 mPending.size());
1723
1724 MessageQueue pending = mPending;
1725 while (!pending.empty()) {
1726 printf_stderr(" [ %s%s ]\n",
1727 pending.front().is_interrupt() ? "intr" :
1728 (pending.front().is_sync() ? "sync" : "async"),
1729 pending.front().is_reply() ? "reply" : "");
1730 pending.pop_front();
1731 }
1732
1733 NS_RUNTIMEABORT(why);
1734 }
1735
1736 void
1737 MessageChannel::DumpInterruptStack(const char* const pfx) const
1738 {
1739 NS_WARN_IF_FALSE(MessageLoop::current() != mWorkerLoop,
1740 "The worker thread had better be paused in a debugger!");
1741
1742 printf_stderr("%sMessageChannel 'backtrace':\n", pfx);
1743
1744 // print a python-style backtrace, first frame to last
1745 for (uint32_t i = 0; i < mCxxStackFrames.length(); ++i) {
1746 int32_t id;
1747 const char* dir, *sems, *name;
1748 mCxxStackFrames[i].Describe(&id, &dir, &sems, &name);
1749
1750 printf_stderr("%s[(%u) %s %s %s(actor=%d) ]\n", pfx,
1751 i, dir, sems, name, id);
1752 }
1753 }
1754
1755 } // ipc
1756 } // mozilla

mercurial