netwerk/base/src/nsInputStreamPump.cpp

branch
TOR_BUG_9701
changeset 15
b8a032363ba2
equal deleted inserted replaced
-1:000000000000 0:f7a57f356f6d
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /* vim:set ts=4 sts=4 sw=4 et cin: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7 #include "nsIOService.h"
8 #include "nsInputStreamPump.h"
9 #include "nsIStreamTransportService.h"
10 #include "nsISeekableStream.h"
11 #include "nsITransport.h"
12 #include "nsIThreadRetargetableStreamListener.h"
13 #include "nsThreadUtils.h"
14 #include "nsCOMPtr.h"
15 #include "prlog.h"
16 #include "GeckoProfiler.h"
17 #include "nsIStreamListener.h"
18 #include "nsILoadGroup.h"
19 #include "nsNetCID.h"
20 #include <algorithm>
21
22 static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
23
24 #if defined(PR_LOGGING)
25 //
26 // NSPR_LOG_MODULES=nsStreamPump:5
27 //
28 static PRLogModuleInfo *gStreamPumpLog = nullptr;
29 #endif
30 #undef LOG
31 #define LOG(args) PR_LOG(gStreamPumpLog, PR_LOG_DEBUG, args)
32
33 //-----------------------------------------------------------------------------
34 // nsInputStreamPump methods
35 //-----------------------------------------------------------------------------
36
37 nsInputStreamPump::nsInputStreamPump()
38 : mState(STATE_IDLE)
39 , mStreamOffset(0)
40 , mStreamLength(UINT64_MAX)
41 , mStatus(NS_OK)
42 , mSuspendCount(0)
43 , mLoadFlags(LOAD_NORMAL)
44 , mProcessingCallbacks(false)
45 , mWaitingForInputStreamReady(false)
46 , mCloseWhenDone(false)
47 , mRetargeting(false)
48 , mMonitor("nsInputStreamPump")
49 {
50 #if defined(PR_LOGGING)
51 if (!gStreamPumpLog)
52 gStreamPumpLog = PR_NewLogModule("nsStreamPump");
53 #endif
54 }
55
56 nsInputStreamPump::~nsInputStreamPump()
57 {
58 }
59
60 nsresult
61 nsInputStreamPump::Create(nsInputStreamPump **result,
62 nsIInputStream *stream,
63 int64_t streamPos,
64 int64_t streamLen,
65 uint32_t segsize,
66 uint32_t segcount,
67 bool closeWhenDone)
68 {
69 nsresult rv = NS_ERROR_OUT_OF_MEMORY;
70 nsRefPtr<nsInputStreamPump> pump = new nsInputStreamPump();
71 if (pump) {
72 rv = pump->Init(stream, streamPos, streamLen,
73 segsize, segcount, closeWhenDone);
74 if (NS_SUCCEEDED(rv)) {
75 *result = nullptr;
76 pump.swap(*result);
77 }
78 }
79 return rv;
80 }
81
82 struct PeekData {
83 PeekData(nsInputStreamPump::PeekSegmentFun fun, void* closure)
84 : mFunc(fun), mClosure(closure) {}
85
86 nsInputStreamPump::PeekSegmentFun mFunc;
87 void* mClosure;
88 };
89
90 static NS_METHOD
91 CallPeekFunc(nsIInputStream *aInStream, void *aClosure,
92 const char *aFromSegment, uint32_t aToOffset, uint32_t aCount,
93 uint32_t *aWriteCount)
94 {
95 NS_ASSERTION(aToOffset == 0, "Called more than once?");
96 NS_ASSERTION(aCount > 0, "Called without data?");
97
98 PeekData* data = static_cast<PeekData*>(aClosure);
99 data->mFunc(data->mClosure,
100 reinterpret_cast<const uint8_t*>(aFromSegment), aCount);
101 return NS_BINDING_ABORTED;
102 }
103
104 nsresult
105 nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure)
106 {
107 ReentrantMonitorAutoEnter mon(mMonitor);
108
109 NS_ASSERTION(mAsyncStream, "PeekStream called without stream");
110
111 // See if the pipe is closed by checking the return of Available.
112 uint64_t dummy64;
113 nsresult rv = mAsyncStream->Available(&dummy64);
114 if (NS_FAILED(rv))
115 return rv;
116 uint32_t dummy = (uint32_t)std::min(dummy64, (uint64_t)UINT32_MAX);
117
118 PeekData data(callback, closure);
119 return mAsyncStream->ReadSegments(CallPeekFunc,
120 &data,
121 nsIOService::gDefaultSegmentSize,
122 &dummy);
123 }
124
125 nsresult
126 nsInputStreamPump::EnsureWaiting()
127 {
128 mMonitor.AssertCurrentThreadIn();
129
130 // no need to worry about multiple threads... an input stream pump lives
131 // on only one thread at a time.
132 MOZ_ASSERT(mAsyncStream);
133 if (!mWaitingForInputStreamReady && !mProcessingCallbacks) {
134 // Ensure OnStateStop is called on the main thread.
135 if (mState == STATE_STOP) {
136 nsCOMPtr<nsIThread> mainThread = do_GetMainThread();
137 if (mTargetThread != mainThread) {
138 mTargetThread = do_QueryInterface(mainThread);
139 }
140 }
141 MOZ_ASSERT(mTargetThread);
142 nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread);
143 if (NS_FAILED(rv)) {
144 NS_ERROR("AsyncWait failed");
145 return rv;
146 }
147 // Any retargeting during STATE_START or START_TRANSFER is complete
148 // after the call to AsyncWait; next callback wil be on mTargetThread.
149 mRetargeting = false;
150 mWaitingForInputStreamReady = true;
151 }
152 return NS_OK;
153 }
154
155 //-----------------------------------------------------------------------------
156 // nsInputStreamPump::nsISupports
157 //-----------------------------------------------------------------------------
158
159 // although this class can only be accessed from one thread at a time, we do
160 // allow its ownership to move from thread to thread, assuming the consumer
161 // understands the limitations of this.
162 NS_IMPL_ISUPPORTS(nsInputStreamPump,
163 nsIRequest,
164 nsIThreadRetargetableRequest,
165 nsIInputStreamCallback,
166 nsIInputStreamPump)
167
168 //-----------------------------------------------------------------------------
169 // nsInputStreamPump::nsIRequest
170 //-----------------------------------------------------------------------------
171
172 NS_IMETHODIMP
173 nsInputStreamPump::GetName(nsACString &result)
174 {
175 ReentrantMonitorAutoEnter mon(mMonitor);
176
177 result.Truncate();
178 return NS_OK;
179 }
180
181 NS_IMETHODIMP
182 nsInputStreamPump::IsPending(bool *result)
183 {
184 ReentrantMonitorAutoEnter mon(mMonitor);
185
186 *result = (mState != STATE_IDLE);
187 return NS_OK;
188 }
189
190 NS_IMETHODIMP
191 nsInputStreamPump::GetStatus(nsresult *status)
192 {
193 ReentrantMonitorAutoEnter mon(mMonitor);
194
195 *status = mStatus;
196 return NS_OK;
197 }
198
199 NS_IMETHODIMP
200 nsInputStreamPump::Cancel(nsresult status)
201 {
202 MOZ_ASSERT(NS_IsMainThread());
203
204 ReentrantMonitorAutoEnter mon(mMonitor);
205
206 LOG(("nsInputStreamPump::Cancel [this=%p status=%x]\n",
207 this, status));
208
209 if (NS_FAILED(mStatus)) {
210 LOG((" already canceled\n"));
211 return NS_OK;
212 }
213
214 NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code");
215 mStatus = status;
216
217 // close input stream
218 if (mAsyncStream) {
219 mAsyncStream->CloseWithStatus(status);
220 if (mSuspendCount == 0)
221 EnsureWaiting();
222 // Otherwise, EnsureWaiting will be called by Resume().
223 // Note that while suspended, OnInputStreamReady will
224 // not do anything, and also note that calling asyncWait
225 // on a closed stream works and will dispatch an event immediately.
226 }
227 return NS_OK;
228 }
229
230 NS_IMETHODIMP
231 nsInputStreamPump::Suspend()
232 {
233 ReentrantMonitorAutoEnter mon(mMonitor);
234
235 LOG(("nsInputStreamPump::Suspend [this=%p]\n", this));
236 NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
237 ++mSuspendCount;
238 return NS_OK;
239 }
240
241 NS_IMETHODIMP
242 nsInputStreamPump::Resume()
243 {
244 ReentrantMonitorAutoEnter mon(mMonitor);
245
246 LOG(("nsInputStreamPump::Resume [this=%p]\n", this));
247 NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED);
248 NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
249
250 if (--mSuspendCount == 0)
251 EnsureWaiting();
252 return NS_OK;
253 }
254
255 NS_IMETHODIMP
256 nsInputStreamPump::GetLoadFlags(nsLoadFlags *aLoadFlags)
257 {
258 ReentrantMonitorAutoEnter mon(mMonitor);
259
260 *aLoadFlags = mLoadFlags;
261 return NS_OK;
262 }
263
264 NS_IMETHODIMP
265 nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags)
266 {
267 ReentrantMonitorAutoEnter mon(mMonitor);
268
269 mLoadFlags = aLoadFlags;
270 return NS_OK;
271 }
272
273 NS_IMETHODIMP
274 nsInputStreamPump::GetLoadGroup(nsILoadGroup **aLoadGroup)
275 {
276 ReentrantMonitorAutoEnter mon(mMonitor);
277
278 NS_IF_ADDREF(*aLoadGroup = mLoadGroup);
279 return NS_OK;
280 }
281
282 NS_IMETHODIMP
283 nsInputStreamPump::SetLoadGroup(nsILoadGroup *aLoadGroup)
284 {
285 ReentrantMonitorAutoEnter mon(mMonitor);
286
287 mLoadGroup = aLoadGroup;
288 return NS_OK;
289 }
290
291 //-----------------------------------------------------------------------------
292 // nsInputStreamPump::nsIInputStreamPump implementation
293 //-----------------------------------------------------------------------------
294
295 NS_IMETHODIMP
296 nsInputStreamPump::Init(nsIInputStream *stream,
297 int64_t streamPos, int64_t streamLen,
298 uint32_t segsize, uint32_t segcount,
299 bool closeWhenDone)
300 {
301 NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
302
303 mStreamOffset = uint64_t(streamPos);
304 if (int64_t(streamLen) >= int64_t(0))
305 mStreamLength = uint64_t(streamLen);
306 mStream = stream;
307 mSegSize = segsize;
308 mSegCount = segcount;
309 mCloseWhenDone = closeWhenDone;
310
311 return NS_OK;
312 }
313
314 NS_IMETHODIMP
315 nsInputStreamPump::AsyncRead(nsIStreamListener *listener, nsISupports *ctxt)
316 {
317 ReentrantMonitorAutoEnter mon(mMonitor);
318
319 NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
320 NS_ENSURE_ARG_POINTER(listener);
321 MOZ_ASSERT(NS_IsMainThread(), "nsInputStreamPump should be read from the "
322 "main thread only.");
323
324 //
325 // OK, we need to use the stream transport service if
326 //
327 // (1) the stream is blocking
328 // (2) the stream does not support nsIAsyncInputStream
329 //
330
331 bool nonBlocking;
332 nsresult rv = mStream->IsNonBlocking(&nonBlocking);
333 if (NS_FAILED(rv)) return rv;
334
335 if (nonBlocking) {
336 mAsyncStream = do_QueryInterface(mStream);
337 //
338 // if the stream supports nsIAsyncInputStream, and if we need to seek
339 // to a starting offset, then we must do so here. in the non-async
340 // stream case, the stream transport service will take care of seeking
341 // for us.
342 //
343 if (mAsyncStream && (mStreamOffset != UINT64_MAX)) {
344 nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mStream);
345 if (seekable)
346 seekable->Seek(nsISeekableStream::NS_SEEK_SET, mStreamOffset);
347 }
348 }
349
350 if (!mAsyncStream) {
351 // ok, let's use the stream transport service to read this stream.
352 nsCOMPtr<nsIStreamTransportService> sts =
353 do_GetService(kStreamTransportServiceCID, &rv);
354 if (NS_FAILED(rv)) return rv;
355
356 nsCOMPtr<nsITransport> transport;
357 rv = sts->CreateInputTransport(mStream, mStreamOffset, mStreamLength,
358 mCloseWhenDone, getter_AddRefs(transport));
359 if (NS_FAILED(rv)) return rv;
360
361 nsCOMPtr<nsIInputStream> wrapper;
362 rv = transport->OpenInputStream(0, mSegSize, mSegCount, getter_AddRefs(wrapper));
363 if (NS_FAILED(rv)) return rv;
364
365 mAsyncStream = do_QueryInterface(wrapper, &rv);
366 if (NS_FAILED(rv)) return rv;
367 }
368
369 // release our reference to the original stream. from this point forward,
370 // we only reference the "stream" via mAsyncStream.
371 mStream = 0;
372
373 // mStreamOffset now holds the number of bytes currently read. we use this
374 // to enforce the mStreamLength restriction.
375 mStreamOffset = 0;
376
377 // grab event queue (we must do this here by contract, since all notifications
378 // must go to the thread which called AsyncRead)
379 mTargetThread = do_GetCurrentThread();
380 NS_ENSURE_STATE(mTargetThread);
381
382 rv = EnsureWaiting();
383 if (NS_FAILED(rv)) return rv;
384
385 if (mLoadGroup)
386 mLoadGroup->AddRequest(this, nullptr);
387
388 mState = STATE_START;
389 mListener = listener;
390 mListenerContext = ctxt;
391 return NS_OK;
392 }
393
394 //-----------------------------------------------------------------------------
395 // nsInputStreamPump::nsIInputStreamCallback implementation
396 //-----------------------------------------------------------------------------
397
398 NS_IMETHODIMP
399 nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream *stream)
400 {
401 LOG(("nsInputStreamPump::OnInputStreamReady [this=%p]\n", this));
402
403 PROFILER_LABEL("Input", "nsInputStreamPump::OnInputStreamReady");
404 // this function has been called from a PLEvent, so we can safely call
405 // any listener or progress sink methods directly from here.
406
407 for (;;) {
408 // There should only be one iteration of this loop happening at a time.
409 // To prevent AsyncWait() (called during callbacks or on other threads)
410 // from creating a parallel OnInputStreamReady(), we use:
411 // -- a monitor; and
412 // -- a boolean mProcessingCallbacks to detect parallel loops
413 // when exiting the monitor for callbacks.
414 ReentrantMonitorAutoEnter lock(mMonitor);
415
416 // Prevent parallel execution during callbacks, while out of monitor.
417 if (mProcessingCallbacks) {
418 MOZ_ASSERT(!mProcessingCallbacks);
419 break;
420 }
421 mProcessingCallbacks = true;
422 if (mSuspendCount || mState == STATE_IDLE) {
423 mWaitingForInputStreamReady = false;
424 mProcessingCallbacks = false;
425 break;
426 }
427
428 uint32_t nextState;
429 switch (mState) {
430 case STATE_START:
431 nextState = OnStateStart();
432 break;
433 case STATE_TRANSFER:
434 nextState = OnStateTransfer();
435 break;
436 case STATE_STOP:
437 mRetargeting = false;
438 nextState = OnStateStop();
439 break;
440 default:
441 nextState = 0;
442 NS_NOTREACHED("Unknown enum value.");
443 return NS_ERROR_UNEXPECTED;
444 }
445
446 bool stillTransferring = (mState == STATE_TRANSFER &&
447 nextState == STATE_TRANSFER);
448 if (stillTransferring) {
449 NS_ASSERTION(NS_SUCCEEDED(mStatus),
450 "Should not have failed status for ongoing transfer");
451 } else {
452 NS_ASSERTION(mState != nextState,
453 "Only OnStateTransfer can be called more than once.");
454 }
455 if (mRetargeting) {
456 NS_ASSERTION(mState != STATE_STOP,
457 "Retargeting should not happen during OnStateStop.");
458 }
459
460 // Set mRetargeting so EnsureWaiting will be called. It ensures that
461 // OnStateStop is called on the main thread.
462 if (nextState == STATE_STOP && !NS_IsMainThread()) {
463 mRetargeting = true;
464 }
465
466 // Unset mProcessingCallbacks here (while we have lock) so our own call to
467 // EnsureWaiting isn't blocked by it.
468 mProcessingCallbacks = false;
469
470 // Wait asynchronously if there is still data to transfer, or we're
471 // switching event delivery to another thread.
472 if (!mSuspendCount && (stillTransferring || mRetargeting)) {
473 mState = nextState;
474 mWaitingForInputStreamReady = false;
475 nsresult rv = EnsureWaiting();
476 if (NS_SUCCEEDED(rv))
477 break;
478
479 // Failure to start asynchronous wait: stop transfer.
480 // Do not set mStatus if it was previously set to report a failure.
481 if (NS_SUCCEEDED(mStatus)) {
482 mStatus = rv;
483 }
484 nextState = STATE_STOP;
485 }
486
487 mState = nextState;
488 }
489 return NS_OK;
490 }
491
492 uint32_t
493 nsInputStreamPump::OnStateStart()
494 {
495 mMonitor.AssertCurrentThreadIn();
496
497 PROFILER_LABEL("nsInputStreamPump", "OnStateStart");
498 LOG((" OnStateStart [this=%p]\n", this));
499
500 nsresult rv;
501
502 // need to check the reason why the stream is ready. this is required
503 // so our listener can check our status from OnStartRequest.
504 // XXX async streams should have a GetStatus method!
505 if (NS_SUCCEEDED(mStatus)) {
506 uint64_t avail;
507 rv = mAsyncStream->Available(&avail);
508 if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED)
509 mStatus = rv;
510 }
511
512 {
513 // Note: Must exit monitor for call to OnStartRequest to avoid
514 // deadlocks when calls to RetargetDeliveryTo for multiple
515 // nsInputStreamPumps are needed (e.g. nsHttpChannel).
516 mMonitor.Exit();
517 rv = mListener->OnStartRequest(this, mListenerContext);
518 mMonitor.Enter();
519 }
520
521 // an error returned from OnStartRequest should cause us to abort; however,
522 // we must not stomp on mStatus if already canceled.
523 if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus))
524 mStatus = rv;
525
526 return NS_SUCCEEDED(mStatus) ? STATE_TRANSFER : STATE_STOP;
527 }
528
529 uint32_t
530 nsInputStreamPump::OnStateTransfer()
531 {
532 mMonitor.AssertCurrentThreadIn();
533
534 PROFILER_LABEL("Input", "nsInputStreamPump::OnStateTransfer");
535 LOG((" OnStateTransfer [this=%p]\n", this));
536
537 // if canceled, go directly to STATE_STOP...
538 if (NS_FAILED(mStatus))
539 return STATE_STOP;
540
541 nsresult rv;
542
543 uint64_t avail;
544 rv = mAsyncStream->Available(&avail);
545 LOG((" Available returned [stream=%x rv=%x avail=%llu]\n", mAsyncStream.get(), rv, avail));
546
547 if (rv == NS_BASE_STREAM_CLOSED) {
548 rv = NS_OK;
549 avail = 0;
550 }
551 else if (NS_SUCCEEDED(rv) && avail) {
552 // figure out how much data to report (XXX detect overflow??)
553 if (avail > mStreamLength - mStreamOffset)
554 avail = mStreamLength - mStreamOffset;
555
556 if (avail) {
557 // we used to limit avail to 16K - we were afraid some ODA handlers
558 // might assume they wouldn't get more than 16K at once
559 // we're removing that limit since it speeds up local file access.
560 // Now there's an implicit 64K limit of 4 16K segments
561 // NOTE: ok, so the story is as follows. OnDataAvailable impls
562 // are by contract supposed to consume exactly |avail| bytes.
563 // however, many do not... mailnews... stream converters...
564 // cough, cough. the input stream pump is fairly tolerant
565 // in this regard; however, if an ODA does not consume any
566 // data from the stream, then we could potentially end up in
567 // an infinite loop. we do our best here to try to catch
568 // such an error. (see bug 189672)
569
570 // in most cases this QI will succeed (mAsyncStream is almost always
571 // a nsPipeInputStream, which implements nsISeekableStream::Tell).
572 int64_t offsetBefore;
573 nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mAsyncStream);
574 if (seekable && NS_FAILED(seekable->Tell(&offsetBefore))) {
575 NS_NOTREACHED("Tell failed on readable stream");
576 offsetBefore = 0;
577 }
578
579 uint32_t odaAvail =
580 avail > UINT32_MAX ?
581 UINT32_MAX : uint32_t(avail);
582
583 LOG((" calling OnDataAvailable [offset=%llu count=%llu(%u)]\n",
584 mStreamOffset, avail, odaAvail));
585
586 {
587 // Note: Must exit monitor for call to OnStartRequest to avoid
588 // deadlocks when calls to RetargetDeliveryTo for multiple
589 // nsInputStreamPumps are needed (e.g. nsHttpChannel).
590 mMonitor.Exit();
591 rv = mListener->OnDataAvailable(this, mListenerContext,
592 mAsyncStream, mStreamOffset,
593 odaAvail);
594 mMonitor.Enter();
595 }
596
597 // don't enter this code if ODA failed or called Cancel
598 if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) {
599 // test to see if this ODA failed to consume data
600 if (seekable) {
601 // NOTE: if Tell fails, which can happen if the stream is
602 // now closed, then we assume that everything was read.
603 int64_t offsetAfter;
604 if (NS_FAILED(seekable->Tell(&offsetAfter)))
605 offsetAfter = offsetBefore + odaAvail;
606 if (offsetAfter > offsetBefore)
607 mStreamOffset += (offsetAfter - offsetBefore);
608 else if (mSuspendCount == 0) {
609 //
610 // possible infinite loop if we continue pumping data!
611 //
612 // NOTE: although not allowed by nsIStreamListener, we
613 // will allow the ODA impl to Suspend the pump. IMAP
614 // does this :-(
615 //
616 NS_ERROR("OnDataAvailable implementation consumed no data");
617 mStatus = NS_ERROR_UNEXPECTED;
618 }
619 }
620 else
621 mStreamOffset += odaAvail; // assume ODA behaved well
622 }
623 }
624 }
625
626 // an error returned from Available or OnDataAvailable should cause us to
627 // abort; however, we must not stomp on mStatus if already canceled.
628
629 if (NS_SUCCEEDED(mStatus)) {
630 if (NS_FAILED(rv))
631 mStatus = rv;
632 else if (avail) {
633 // if stream is now closed, advance to STATE_STOP right away.
634 // Available may return 0 bytes available at the moment; that
635 // would not mean that we are done.
636 // XXX async streams should have a GetStatus method!
637 rv = mAsyncStream->Available(&avail);
638 if (NS_SUCCEEDED(rv))
639 return STATE_TRANSFER;
640 if (rv != NS_BASE_STREAM_CLOSED)
641 mStatus = rv;
642 }
643 }
644 return STATE_STOP;
645 }
646
647 nsresult
648 nsInputStreamPump::CallOnStateStop()
649 {
650 ReentrantMonitorAutoEnter mon(mMonitor);
651
652 MOZ_ASSERT(NS_IsMainThread(),
653 "CallOnStateStop should only be called on the main thread.");
654
655 mState = OnStateStop();
656 return NS_OK;
657 }
658
659 uint32_t
660 nsInputStreamPump::OnStateStop()
661 {
662 mMonitor.AssertCurrentThreadIn();
663
664 if (!NS_IsMainThread()) {
665 // Hopefully temporary hack: OnStateStop should only run on the main
666 // thread, but we're seeing some rare off-main-thread calls. For now
667 // just redispatch to the main thread in release builds, and crash in
668 // debug builds.
669 MOZ_ASSERT(NS_IsMainThread(),
670 "OnStateStop should only be called on the main thread.");
671 nsresult rv = NS_DispatchToMainThread(
672 NS_NewRunnableMethod(this, &nsInputStreamPump::CallOnStateStop));
673 NS_ENSURE_SUCCESS(rv, STATE_IDLE);
674 return STATE_IDLE;
675 }
676
677 PROFILER_LABEL("Input", "nsInputStreamPump::OnStateTransfer");
678 LOG((" OnStateStop [this=%p status=%x]\n", this, mStatus));
679
680 // if an error occurred, we must be sure to pass the error onto the async
681 // stream. in some cases, this is redundant, but since close is idempotent,
682 // this is OK. otherwise, be sure to honor the "close-when-done" option.
683
684 if (!mAsyncStream || !mListener) {
685 MOZ_ASSERT(mAsyncStream, "null mAsyncStream: OnStateStop called twice?");
686 MOZ_ASSERT(mListener, "null mListener: OnStateStop called twice?");
687 return STATE_IDLE;
688 }
689
690 if (NS_FAILED(mStatus))
691 mAsyncStream->CloseWithStatus(mStatus);
692 else if (mCloseWhenDone)
693 mAsyncStream->Close();
694
695 mAsyncStream = 0;
696 mTargetThread = 0;
697 mIsPending = false;
698 {
699 // Note: Must exit monitor for call to OnStartRequest to avoid
700 // deadlocks when calls to RetargetDeliveryTo for multiple
701 // nsInputStreamPumps are needed (e.g. nsHttpChannel).
702 mMonitor.Exit();
703 mListener->OnStopRequest(this, mListenerContext, mStatus);
704 mMonitor.Enter();
705 }
706 mListener = 0;
707 mListenerContext = 0;
708
709 if (mLoadGroup)
710 mLoadGroup->RemoveRequest(this, nullptr, mStatus);
711
712 return STATE_IDLE;
713 }
714
715 //-----------------------------------------------------------------------------
716 // nsIThreadRetargetableRequest
717 //-----------------------------------------------------------------------------
718
719 NS_IMETHODIMP
720 nsInputStreamPump::RetargetDeliveryTo(nsIEventTarget* aNewTarget)
721 {
722 ReentrantMonitorAutoEnter mon(mMonitor);
723
724 NS_ENSURE_ARG(aNewTarget);
725 NS_ENSURE_TRUE(mState == STATE_START || mState == STATE_TRANSFER,
726 NS_ERROR_UNEXPECTED);
727
728 // If canceled, do not retarget. Return with canceled status.
729 if (NS_FAILED(mStatus)) {
730 return mStatus;
731 }
732
733 if (aNewTarget == mTargetThread) {
734 NS_WARNING("Retargeting delivery to same thread");
735 return NS_OK;
736 }
737
738 // Ensure that |mListener| and any subsequent listeners can be retargeted
739 // to another thread.
740 nsresult rv = NS_OK;
741 nsCOMPtr<nsIThreadRetargetableStreamListener> retargetableListener =
742 do_QueryInterface(mListener, &rv);
743 if (NS_SUCCEEDED(rv) && retargetableListener) {
744 rv = retargetableListener->CheckListenerChain();
745 if (NS_SUCCEEDED(rv)) {
746 mTargetThread = aNewTarget;
747 mRetargeting = true;
748 }
749 }
750 LOG(("nsInputStreamPump::RetargetDeliveryTo [this=%x aNewTarget=%p] "
751 "%s listener [%p] rv[%x]",
752 this, aNewTarget, (mTargetThread == aNewTarget ? "success" : "failure"),
753 (nsIStreamListener*)mListener, rv));
754 return rv;
755 }

mercurial