Wed, 31 Dec 2014 06:09:35 +0100
Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 * vim: sw=4 ts=4 et :
3 */
4 /* This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
8 #ifndef ipc_glue_MessageChannel_h
9 #define ipc_glue_MessageChannel_h 1
11 #include "base/basictypes.h"
12 #include "base/message_loop.h"
14 #include "mozilla/Monitor.h"
15 #include "mozilla/Vector.h"
16 #include "mozilla/WeakPtr.h"
17 #include "mozilla/ipc/Transport.h"
18 #include "MessageLink.h"
19 #include "nsAutoPtr.h"
21 #include <deque>
22 #include <stack>
23 #include <math.h>
25 namespace mozilla {
26 namespace ipc {
28 class MessageChannel;
30 class RefCountedMonitor : public Monitor
31 {
32 public:
33 RefCountedMonitor()
34 : Monitor("mozilla.ipc.MessageChannel.mMonitor")
35 {}
37 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedMonitor)
38 };
40 class MessageChannel : HasResultCodes
41 {
42 friend class ProcessLink;
43 friend class ThreadLink;
44 friend class AutoEnterRPCTransaction;
46 class CxxStackFrame;
47 class InterruptFrame;
49 typedef mozilla::Monitor Monitor;
51 public:
52 static const int32_t kNoTimeout;
54 typedef IPC::Message Message;
55 typedef mozilla::ipc::Transport Transport;
57 MessageChannel(MessageListener *aListener);
58 ~MessageChannel();
60 // "Open" from the perspective of the transport layer; the underlying
61 // socketpair/pipe should already be created.
62 //
63 // Returns true if the transport layer was successfully connected,
64 // i.e., mChannelState == ChannelConnected.
65 bool Open(Transport* aTransport, MessageLoop* aIOLoop=0, Side aSide=UnknownSide);
67 // "Open" a connection to another thread in the same process.
68 //
69 // Returns true if the transport layer was successfully connected,
70 // i.e., mChannelState == ChannelConnected.
71 //
72 // For more details on the process of opening a channel between
73 // threads, see the extended comment on this function
74 // in MessageChannel.cpp.
75 bool Open(MessageChannel *aTargetChan, MessageLoop *aTargetLoop, Side aSide);
77 // Close the underlying transport channel.
78 void Close();
80 // Force the channel to behave as if a channel error occurred. Valid
81 // for process links only, not thread links.
82 void CloseWithError();
84 void SetAbortOnError(bool abort)
85 {
86 mAbortOnError = true;
87 }
// Optional behavioral traits that a consumer may request for this channel.
enum ChannelFlags {
    // No special behavior requested.
    REQUIRE_DEFAULT = 0,

    // Windows: if this channel operates on the UI thread, indicates
    // WindowsMessageLoop code should enable deferred native message
    // handling to prevent deadlocks. Should only be used for protocols
    // that manage child processes which might create native UI, like
    // plugins.
    REQUIRE_DEFERRED_MESSAGE_PROTECTION = 1 << 0
};
99 void SetChannelFlags(ChannelFlags aFlags) { mFlags = aFlags; }
100 ChannelFlags GetChannelFlags() { return mFlags; }
102 // Asynchronously send a message to the other side of the channel
103 bool Send(Message* aMsg);
105 // Asynchronously deliver a message back to this side of the
106 // channel
107 bool Echo(Message* aMsg);
109 // Synchronously send |msg| (i.e., wait for |reply|)
110 bool Send(Message* aMsg, Message* aReply);
112 // Make an Interrupt call to the other side of the channel
113 bool Call(Message* aMsg, Message* aReply);
115 bool CanSend() const;
117 void SetReplyTimeoutMs(int32_t aTimeoutMs);
119 bool IsOnCxxStack() const {
120 return !mCxxStackFrames.empty();
121 }
123 void FlushPendingInterruptQueue();
125 // Unsound_IsClosed and Unsound_NumQueuedMessages are safe to call from any
126 // thread, but they make no guarantees about whether you'll get an
127 // up-to-date value; the values are written on one thread and read without
128 // locking, on potentially different threads. Thus you should only use
129 // them when you don't particularly care about getting a recent value (e.g.
130 // in a memory report).
131 bool Unsound_IsClosed() const {
132 return mLink ? mLink->Unsound_IsClosed() : true;
133 }
134 uint32_t Unsound_NumQueuedMessages() const {
135 return mLink ? mLink->Unsound_NumQueuedMessages() : 0;
136 }
138 static bool IsPumpingMessages() {
139 return sIsPumpingMessages;
140 }
141 static void SetIsPumpingMessages(bool aIsPumping) {
142 sIsPumpingMessages = aIsPumping;
143 }
145 #ifdef OS_WIN
146 struct MOZ_STACK_CLASS SyncStackFrame
147 {
148 SyncStackFrame(MessageChannel* channel, bool interrupt);
149 ~SyncStackFrame();
151 bool mInterrupt;
152 bool mSpinNestedEvents;
153 bool mListenerNotified;
154 MessageChannel* mChannel;
156 // The previous stack frame for this channel.
157 SyncStackFrame* mPrev;
159 // The previous stack frame on any channel.
160 SyncStackFrame* mStaticPrev;
161 };
162 friend struct MessageChannel::SyncStackFrame;
164 static bool IsSpinLoopActive() {
165 for (SyncStackFrame* frame = sStaticTopFrame; frame; frame = frame->mPrev) {
166 if (frame->mSpinNestedEvents)
167 return true;
168 }
169 return false;
170 }
172 protected:
173 // The deepest sync stack frame for this channel.
174 SyncStackFrame* mTopFrame;
176 bool mIsSyncWaitingOnNonMainThread;
178 // The deepest sync stack frame on any channel.
179 static SyncStackFrame* sStaticTopFrame;
181 public:
182 void ProcessNativeEventsInInterruptCall();
183 static void NotifyGeckoEventDispatch();
185 private:
186 void SpinInternalEventLoop();
187 #endif
189 private:
190 void CommonThreadOpenInit(MessageChannel *aTargetChan, Side aSide);
191 void OnOpenAsSlave(MessageChannel *aTargetChan, Side aSide);
193 void PostErrorNotifyTask();
194 void OnNotifyMaybeChannelError();
195 void ReportConnectionError(const char* aChannelName) const;
196 void ReportMessageRouteError(const char* channelName) const;
197 bool MaybeHandleError(Result code, const char* channelName);
199 void Clear();
201 // Send OnChannelConnected notification to listeners.
202 void DispatchOnChannelConnected(int32_t peer_pid);
204 // Any protocol that requires blocking until a reply arrives, will send its
205 // outgoing message through this function. Currently, two protocols do this:
206 //
207 // sync, which can only initiate messages from child to parent.
208 // urgent, which can only initiate messages from parent to child.
209 //
210 // SendAndWait() expects that the worker thread owns the monitor, and that
211 // the message has been prepared to be sent over the link. It returns as
212 // soon as a reply has been received, or an error has occurred.
213 //
214 // Note that while the child is blocked waiting for a sync reply, it can wake
215 // up to process urgent calls from the parent.
216 bool SendAndWait(Message* aMsg, Message* aReply);
218 bool RPCCall(Message* aMsg, Message* aReply);
219 bool InterruptCall(Message* aMsg, Message* aReply);
220 bool UrgentCall(Message* aMsg, Message* aReply);
222 bool InterruptEventOccurred();
224 bool ProcessPendingUrgentRequest();
225 bool ProcessPendingRPCCall();
227 void MaybeUndeferIncall();
228 void EnqueuePendingMessages();
230 // Executed on the worker thread. Dequeues one pending message.
231 bool OnMaybeDequeueOne();
232 bool DequeueOne(Message *recvd);
234 // Dispatches an incoming message to its appropriate handler.
235 void DispatchMessage(const Message &aMsg);
237 // DispatchMessage will route to one of these functions depending on the
238 // protocol type of the message.
239 void DispatchSyncMessage(const Message &aMsg);
240 void DispatchUrgentMessage(const Message &aMsg);
241 void DispatchAsyncMessage(const Message &aMsg);
242 void DispatchRPCMessage(const Message &aMsg);
243 void DispatchInterruptMessage(const Message &aMsg, size_t aStackDepth);
245 // Return true if the wait ended because a notification was received.
246 //
247 // Return false if the time elapsed from when we started the process of
248 // waiting until afterwards exceeded the currently allotted timeout.
249 // That *DOES NOT* mean false => "no event" (== timeout); there are many
250 // circumstances that could cause the measured elapsed time to exceed the
251 // timeout EVEN WHEN we were notified.
252 //
253 // So in sum: true is a meaningful return value; false isn't,
254 // necessarily.
255 bool WaitForSyncNotify();
256 bool WaitForInterruptNotify();
258 bool WaitResponse(bool aWaitTimedOut);
260 bool ShouldContinueFromTimeout();
262 // The "remote view of stack depth" can be different than the
263 // actual stack depth when there are out-of-turn replies. When we
264 // receive one, our actual Interrupt stack depth doesn't decrease, but
265 // the other side (that sent the reply) thinks it has. So, the
266 // "view" returned here is |stackDepth| minus the number of
267 // out-of-turn replies.
268 //
269 // Only called from the worker thread.
270 size_t RemoteViewOfStackDepth(size_t stackDepth) const {
271 AssertWorkerThread();
272 return stackDepth - mOutOfTurnReplies.size();
273 }
275 int32_t NextSeqno() {
276 AssertWorkerThread();
277 return (mSide == ChildSide) ? --mNextSeqno : ++mNextSeqno;
278 }
280 // This helper class manages mCxxStackDepth on behalf of MessageChannel.
281 // When the stack depth is incremented from zero to non-zero, it invokes
282 // a callback, and similarly for when the depth goes from non-zero to zero.
283 void EnteredCxxStack() {
284 mListener->OnEnteredCxxStack();
285 }
287 void ExitedCxxStack();
289 void EnteredCall() {
290 mListener->OnEnteredCall();
291 }
293 void ExitedCall() {
294 mListener->OnExitedCall();
295 }
297 MessageListener *Listener() const {
298 return mListener.get();
299 }
301 void DebugAbort(const char* file, int line, const char* cond,
302 const char* why,
303 bool reply=false) const;
305 // This method is only safe to call on the worker thread, or in a
306 // debugger with all threads paused.
307 void DumpInterruptStack(const char* const pfx="") const;
309 private:
310 // Called from both threads
311 size_t InterruptStackDepth() const {
312 mMonitor->AssertCurrentThreadOwns();
313 return mInterruptStack.size();
314 }
316 // Returns true if we're blocking waiting for a reply.
317 bool AwaitingSyncReply() const {
318 mMonitor->AssertCurrentThreadOwns();
319 return mPendingSyncReplies > 0;
320 }
321 bool AwaitingUrgentReply() const {
322 mMonitor->AssertCurrentThreadOwns();
323 return mPendingUrgentReplies > 0;
324 }
325 bool AwaitingRPCReply() const {
326 mMonitor->AssertCurrentThreadOwns();
327 return mPendingRPCReplies > 0;
328 }
329 bool AwaitingInterruptReply() const {
330 mMonitor->AssertCurrentThreadOwns();
331 return !mInterruptStack.empty();
332 }
334 // Returns true if we're dispatching a sync message's callback.
335 bool DispatchingSyncMessage() const {
336 return mDispatchingSyncMessage;
337 }
339 // Returns true if we're dispatching an urgent message's callback.
340 bool DispatchingUrgentMessage() const {
341 return mDispatchingUrgentMessageCount > 0;
342 }
344 bool Connected() const;
346 private:
347 // Executed on the IO thread.
348 void NotifyWorkerThread();
350 // Return true if |aMsg| is a special message targeted at the IO
351 // thread, in which case it shouldn't be delivered to the worker.
352 bool MaybeInterceptSpecialIOMessage(const Message& aMsg);
354 void OnChannelConnected(int32_t peer_id);
356 // Tell the IO thread to close the channel and wait for it to ACK.
357 void SynchronouslyClose();
359 void OnMessageReceivedFromLink(const Message& aMsg);
360 void OnChannelErrorFromLink();
362 private:
363 // Run on the not current thread.
364 void NotifyChannelClosed();
365 void NotifyMaybeChannelError();
367 private:
368 // Can be run on either thread
369 void AssertWorkerThread() const
370 {
371 NS_ABORT_IF_FALSE(mWorkerLoopID == MessageLoop::current()->id(),
372 "not on worker thread!");
373 }
375 // The "link" thread is either the I/O thread (ProcessLink) or the
376 // other actor's work thread (ThreadLink). In either case, it is
377 // NOT our worker thread.
378 void AssertLinkThread() const
379 {
380 NS_ABORT_IF_FALSE(mWorkerLoopID != MessageLoop::current()->id(),
381 "on worker thread but should not be!");
382 }
384 private:
385 typedef IPC::Message::msgid_t msgid_t;
386 typedef std::deque<Message> MessageQueue;
387 typedef std::map<size_t, Message> MessageMap;
389 // All dequeuing tasks require a single point of cancellation,
390 // which is handled via a reference-counted task.
391 class RefCountedTask
392 {
393 public:
394 RefCountedTask(CancelableTask* aTask)
395 : mTask(aTask)
396 { }
397 ~RefCountedTask() { delete mTask; }
398 void Run() { mTask->Run(); }
399 void Cancel() { mTask->Cancel(); }
401 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedTask)
403 private:
404 CancelableTask* mTask;
405 };
407 // Wrap an existing task which can be cancelled at any time
408 // without the wrapper's knowledge.
409 class DequeueTask : public Task
410 {
411 public:
412 DequeueTask(RefCountedTask* aTask)
413 : mTask(aTask)
414 { }
415 void Run() { mTask->Run(); }
417 private:
418 nsRefPtr<RefCountedTask> mTask;
419 };
421 private:
422 mozilla::WeakPtr<MessageListener> mListener;
423 ChannelState mChannelState;
424 nsRefPtr<RefCountedMonitor> mMonitor;
425 Side mSide;
426 MessageLink* mLink;
427 MessageLoop* mWorkerLoop; // thread where work is done
428 CancelableTask* mChannelErrorTask; // NotifyMaybeChannelError runnable
430 // id() of mWorkerLoop. This persists even after mWorkerLoop is cleared
431 // during channel shutdown.
432 int mWorkerLoopID;
434 // A task encapsulating dequeuing one pending message.
435 nsRefPtr<RefCountedTask> mDequeueOneTask;
437 // Timeout periods are broken up in two to prevent system suspension from
438 // triggering an abort. This method (called by WaitForEvent with a 'did
439 // timeout' flag) decides if we should wait again for half of mTimeoutMs
440 // or give up.
441 int32_t mTimeoutMs;
442 bool mInTimeoutSecondHalf;
444 // Worker-thread only; sequence numbers for messages that require
445 // synchronous replies.
446 int32_t mNextSeqno;
448 static bool sIsPumpingMessages;
// Scope guard around a pending-reply counter: bumps the referenced counter
// by one on construction and drops it by one on destruction.
class AutoEnterPendingReply
{
public:
    AutoEnterPendingReply(size_t &replyVar) : mReplyVar(replyVar)
    {
        ++mReplyVar;
    }

    ~AutoEnterPendingReply()
    {
        --mReplyVar;
    }

private:
    // The counter being guarded; owned by the caller.
    size_t& mReplyVar;
};
464 // Worker-thread only; number of sync out-messages for which we are still
465 // awaiting a reply. This will never be greater than 1.
466 size_t mPendingSyncReplies;
468 // Worker-thread only; Number of urgent and rpc replies we're waiting on.
469 // These are mutually exclusive since one channel cannot have outcalls of
470 // both kinds.
471 size_t mPendingUrgentReplies;
472 size_t mPendingRPCReplies;
474 // When we send an urgent request from the parent process, we could race
475 // with an RPC message that was issued by the child beforehand. In this
476 // case, if the parent were to wake up while waiting for the urgent reply,
477 // and process the RPC, it could send an additional urgent message. The
478 // child would wake up to process the urgent message (as it always will),
479 // then send a reply, which could be received by the parent out-of-order
480 // with respect to the first urgent reply.
481 //
482 // To address this problem, urgent or RPC requests are associated with a
483 // "transaction". Whenever one side of the channel wishes to start a
484 // chain of RPC/urgent messages, it allocates a new transaction ID. Any
485 // messages the parent receives, not a part of this transaction, are
486 // deferred. When issuing RPC/urgent requests on top of a started
487 // transaction, the initiating transaction ID is used.
488 //
489 // To ensure IDs are unique, we use sequence numbers for transaction IDs,
490 // which grow in opposite directions from child to parent.
492 // The current transaction ID.
493 int32_t mCurrentRPCTransaction;
495 class AutoEnterRPCTransaction
496 {
497 public:
498 AutoEnterRPCTransaction(MessageChannel *aChan)
499 : mChan(aChan),
500 mOldTransaction(mChan->mCurrentRPCTransaction)
501 {
502 mChan->mMonitor->AssertCurrentThreadOwns();
503 if (mChan->mCurrentRPCTransaction == 0)
504 mChan->mCurrentRPCTransaction = mChan->NextSeqno();
505 }
506 AutoEnterRPCTransaction(MessageChannel *aChan, Message *message)
507 : mChan(aChan),
508 mOldTransaction(mChan->mCurrentRPCTransaction)
509 {
510 mChan->mMonitor->AssertCurrentThreadOwns();
512 if (!message->is_rpc() && !message->is_urgent())
513 return;
515 MOZ_ASSERT_IF(mChan->mSide == ParentSide,
516 !mOldTransaction || mOldTransaction == message->transaction_id());
517 mChan->mCurrentRPCTransaction = message->transaction_id();
518 }
519 ~AutoEnterRPCTransaction() {
520 mChan->mMonitor->AssertCurrentThreadOwns();
521 mChan->mCurrentRPCTransaction = mOldTransaction;
522 }
524 private:
525 MessageChannel *mChan;
526 int32_t mOldTransaction;
527 };
529 // If waiting for the reply to a sync out-message, it will be saved here
530 // on the I/O thread and then read and cleared by the worker thread.
531 nsAutoPtr<Message> mRecvd;
533 // Set while we are dispatching a synchronous message.
534 bool mDispatchingSyncMessage;
536 // Count of the recursion depth of dispatching urgent messages.
537 size_t mDispatchingUrgentMessageCount;
539 // Queue of all incoming messages, except for replies to sync and urgent
540 // messages, which are delivered directly to mRecvd, and any pending urgent
541 // incall, which is stored in mPendingUrgentRequest.
542 //
543 // If both this side and the other side are functioning correctly, the queue
544 // can only be in certain configurations. Let
545 //
546 // |A<| be an async in-message,
547 // |S<| be a sync in-message,
548 // |C<| be an Interrupt in-call,
549 // |R<| be an Interrupt reply.
550 //
551 // The queue can only match this configuration
552 //
553 // A<* (S< | C< | R< (?{mStack.size() == 1} A<* (S< | C<)))
554 //
555 // The other side can send as many async messages |A<*| as it wants before
556 // sending us a blocking message.
557 //
558 // The first case is |S<|, a sync in-msg. The other side must be blocked,
559 // and thus can't send us any more messages until we process the sync
560 // in-msg.
561 //
562 // The second case is |C<|, an Interrupt in-call; the other side must be blocked.
563 // (There's a subtlety here: this in-call might have raced with an
564 // out-call, but we detect that with the mechanism below,
565 // |mRemoteStackDepth|, and races don't matter to the queue.)
566 //
567 // Final case, the other side replied to our most recent out-call |R<|.
568 // If that was the *only* out-call on our stack, |?{mStack.size() == 1}|,
569 // then other side "finished with us," and went back to its own business.
570 // That business might have included sending any number of async message
571 // |A<*| until sending a blocking message |(S< | C<)|. If we had more than
572 // one Interrupt call on our stack, the other side *better* not have sent us
573 // another blocking message, because it's blocked on a reply from us.
574 //
575 MessageQueue mPending;
577 // Note that these two pointers are mutually exclusive. One channel cannot
578 // send both urgent requests (parent -> child) and RPC calls (child->parent).
579 // Also note that since initiating either requires blocking, they cannot
580 // queue up on the other side. One message slot is enough.
581 //
582 // Normally, all other message types are deferred into mPending, and
583 // only these two types have special treatment (since they wake up blocked
584 // requests). However, when an RPC in-call races with an urgent out-call,
585 // the RPC message will be put into mPending instead of its slot below.
586 nsAutoPtr<Message> mPendingUrgentRequest;
587 nsAutoPtr<Message> mPendingRPCCall;
589 // Stack of all the out-calls on which this channel is awaiting responses.
590 // Each stack refers to a different protocol and the stacks are mutually
591 // exclusive: multiple outcalls of the same kind cannot be initiated while
592 // another is active.
593 std::stack<Message> mInterruptStack;
595 // This is what we think the Interrupt stack depth is on the "other side" of this
596 // Interrupt channel. We maintain this variable so that we can detect racy Interrupt
597 // calls. With each Interrupt out-call sent, we send along what *we* think the
598 // stack depth of the remote side is *before* it will receive the Interrupt call.
599 //
600 // After sending the out-call, our stack depth is "incremented" by pushing
601 // that pending message onto mPending.
602 //
603 // Then when processing an in-call |c|, it must be true that
604 //
605 // mStack.size() == c.remoteDepth
606 //
607 // I.e., my depth is actually the same as what the other side thought it
608 // was when it sent in-call |c|. If this fails to hold, we have detected
609 // racy Interrupt calls.
610 //
611 // We then increment mRemoteStackDepth *just before* processing the
612 // in-call, since we know the other side is waiting on it, and decrement
613 // it *just after* finishing processing that in-call, since our response
614 // will pop the top of the other side's |mPending|.
615 //
616 // One nice aspect of this race detection is that it is symmetric; if one
617 // side detects a race, then the other side must also detect the same race.
618 size_t mRemoteStackDepthGuess;
620 // Approximation of code frames on the C++ stack. It can only be
621 // interpreted as the implication:
622 //
623 // !mCxxStackFrames.empty() => MessageChannel code on C++ stack
624 //
625 // This member is only accessed on the worker thread, and so is not
626 // protected by mMonitor. It is managed exclusively by the helper
627 // |class CxxStackFrame|.
628 mozilla::Vector<InterruptFrame> mCxxStackFrames;
630 // Did we process an Interrupt out-call during this stack? Only meaningful in
631 // ExitedCxxStack(), from which this variable is reset.
632 bool mSawInterruptOutMsg;
634 // Map of replies received "out of turn", because of Interrupt
635 // in-calls racing with replies to outstanding in-calls. See
636 // https://bugzilla.mozilla.org/show_bug.cgi?id=521929.
637 MessageMap mOutOfTurnReplies;
639 // Stack of Interrupt in-calls that were deferred because of race
640 // conditions.
641 std::stack<Message> mDeferred;
643 #ifdef OS_WIN
644 HANDLE mEvent;
645 #endif
647 // Should the channel abort the process from the I/O thread when
648 // a channel error occurs?
649 bool mAbortOnError;
651 // See SetChannelFlags
652 ChannelFlags mFlags;
653 };
655 } // namespace ipc
656 } // namespace mozilla
658 #endif // ifndef ipc_glue_MessageChannel_h