|
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
|
2 /* vim:set ts=4 sts=4 sw=4 et cin: */ |
|
3 /* This Source Code Form is subject to the terms of the Mozilla Public |
|
4 * License, v. 2.0. If a copy of the MPL was not distributed with this |
|
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
|
6 |
|
7 #include "nsIOService.h" |
|
8 #include "nsInputStreamPump.h" |
|
9 #include "nsIStreamTransportService.h" |
|
10 #include "nsISeekableStream.h" |
|
11 #include "nsITransport.h" |
|
12 #include "nsIThreadRetargetableStreamListener.h" |
|
13 #include "nsThreadUtils.h" |
|
14 #include "nsCOMPtr.h" |
|
15 #include "prlog.h" |
|
16 #include "GeckoProfiler.h" |
|
17 #include "nsIStreamListener.h" |
|
18 #include "nsILoadGroup.h" |
|
19 #include "nsNetCID.h" |
|
20 #include <algorithm> |
|
21 |
|
22 static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID); |
|
23 |
|
24 #if defined(PR_LOGGING) |
|
25 // |
|
26 // NSPR_LOG_MODULES=nsStreamPump:5 |
|
27 // |
|
28 static PRLogModuleInfo *gStreamPumpLog = nullptr; |
|
29 #endif |
|
30 #undef LOG |
|
31 #define LOG(args) PR_LOG(gStreamPumpLog, PR_LOG_DEBUG, args) |
|
32 |
|
33 //----------------------------------------------------------------------------- |
|
34 // nsInputStreamPump methods |
|
35 //----------------------------------------------------------------------------- |
|
36 |
|
37 nsInputStreamPump::nsInputStreamPump() |
|
38 : mState(STATE_IDLE) |
|
39 , mStreamOffset(0) |
|
40 , mStreamLength(UINT64_MAX) |
|
41 , mStatus(NS_OK) |
|
42 , mSuspendCount(0) |
|
43 , mLoadFlags(LOAD_NORMAL) |
|
44 , mProcessingCallbacks(false) |
|
45 , mWaitingForInputStreamReady(false) |
|
46 , mCloseWhenDone(false) |
|
47 , mRetargeting(false) |
|
48 , mMonitor("nsInputStreamPump") |
|
49 { |
|
50 #if defined(PR_LOGGING) |
|
51 if (!gStreamPumpLog) |
|
52 gStreamPumpLog = PR_NewLogModule("nsStreamPump"); |
|
53 #endif |
|
54 } |
|
55 |
|
56 nsInputStreamPump::~nsInputStreamPump() |
|
57 { |
|
58 } |
|
59 |
|
60 nsresult |
|
61 nsInputStreamPump::Create(nsInputStreamPump **result, |
|
62 nsIInputStream *stream, |
|
63 int64_t streamPos, |
|
64 int64_t streamLen, |
|
65 uint32_t segsize, |
|
66 uint32_t segcount, |
|
67 bool closeWhenDone) |
|
68 { |
|
69 nsresult rv = NS_ERROR_OUT_OF_MEMORY; |
|
70 nsRefPtr<nsInputStreamPump> pump = new nsInputStreamPump(); |
|
71 if (pump) { |
|
72 rv = pump->Init(stream, streamPos, streamLen, |
|
73 segsize, segcount, closeWhenDone); |
|
74 if (NS_SUCCEEDED(rv)) { |
|
75 *result = nullptr; |
|
76 pump.swap(*result); |
|
77 } |
|
78 } |
|
79 return rv; |
|
80 } |
|
81 |
|
82 struct PeekData { |
|
83 PeekData(nsInputStreamPump::PeekSegmentFun fun, void* closure) |
|
84 : mFunc(fun), mClosure(closure) {} |
|
85 |
|
86 nsInputStreamPump::PeekSegmentFun mFunc; |
|
87 void* mClosure; |
|
88 }; |
|
89 |
|
90 static NS_METHOD |
|
91 CallPeekFunc(nsIInputStream *aInStream, void *aClosure, |
|
92 const char *aFromSegment, uint32_t aToOffset, uint32_t aCount, |
|
93 uint32_t *aWriteCount) |
|
94 { |
|
95 NS_ASSERTION(aToOffset == 0, "Called more than once?"); |
|
96 NS_ASSERTION(aCount > 0, "Called without data?"); |
|
97 |
|
98 PeekData* data = static_cast<PeekData*>(aClosure); |
|
99 data->mFunc(data->mClosure, |
|
100 reinterpret_cast<const uint8_t*>(aFromSegment), aCount); |
|
101 return NS_BINDING_ABORTED; |
|
102 } |
|
103 |
|
104 nsresult |
|
105 nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure) |
|
106 { |
|
107 ReentrantMonitorAutoEnter mon(mMonitor); |
|
108 |
|
109 NS_ASSERTION(mAsyncStream, "PeekStream called without stream"); |
|
110 |
|
111 // See if the pipe is closed by checking the return of Available. |
|
112 uint64_t dummy64; |
|
113 nsresult rv = mAsyncStream->Available(&dummy64); |
|
114 if (NS_FAILED(rv)) |
|
115 return rv; |
|
116 uint32_t dummy = (uint32_t)std::min(dummy64, (uint64_t)UINT32_MAX); |
|
117 |
|
118 PeekData data(callback, closure); |
|
119 return mAsyncStream->ReadSegments(CallPeekFunc, |
|
120 &data, |
|
121 nsIOService::gDefaultSegmentSize, |
|
122 &dummy); |
|
123 } |
|
124 |
|
125 nsresult |
|
126 nsInputStreamPump::EnsureWaiting() |
|
127 { |
|
128 mMonitor.AssertCurrentThreadIn(); |
|
129 |
|
130 // no need to worry about multiple threads... an input stream pump lives |
|
131 // on only one thread at a time. |
|
132 MOZ_ASSERT(mAsyncStream); |
|
133 if (!mWaitingForInputStreamReady && !mProcessingCallbacks) { |
|
134 // Ensure OnStateStop is called on the main thread. |
|
135 if (mState == STATE_STOP) { |
|
136 nsCOMPtr<nsIThread> mainThread = do_GetMainThread(); |
|
137 if (mTargetThread != mainThread) { |
|
138 mTargetThread = do_QueryInterface(mainThread); |
|
139 } |
|
140 } |
|
141 MOZ_ASSERT(mTargetThread); |
|
142 nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread); |
|
143 if (NS_FAILED(rv)) { |
|
144 NS_ERROR("AsyncWait failed"); |
|
145 return rv; |
|
146 } |
|
147 // Any retargeting during STATE_START or START_TRANSFER is complete |
|
148 // after the call to AsyncWait; next callback wil be on mTargetThread. |
|
149 mRetargeting = false; |
|
150 mWaitingForInputStreamReady = true; |
|
151 } |
|
152 return NS_OK; |
|
153 } |
|
154 |
|
155 //----------------------------------------------------------------------------- |
|
156 // nsInputStreamPump::nsISupports |
|
157 //----------------------------------------------------------------------------- |
|
158 |
|
159 // although this class can only be accessed from one thread at a time, we do |
|
160 // allow its ownership to move from thread to thread, assuming the consumer |
|
161 // understands the limitations of this. |
|
162 NS_IMPL_ISUPPORTS(nsInputStreamPump, |
|
163 nsIRequest, |
|
164 nsIThreadRetargetableRequest, |
|
165 nsIInputStreamCallback, |
|
166 nsIInputStreamPump) |
|
167 |
|
168 //----------------------------------------------------------------------------- |
|
169 // nsInputStreamPump::nsIRequest |
|
170 //----------------------------------------------------------------------------- |
|
171 |
|
172 NS_IMETHODIMP |
|
173 nsInputStreamPump::GetName(nsACString &result) |
|
174 { |
|
175 ReentrantMonitorAutoEnter mon(mMonitor); |
|
176 |
|
177 result.Truncate(); |
|
178 return NS_OK; |
|
179 } |
|
180 |
|
181 NS_IMETHODIMP |
|
182 nsInputStreamPump::IsPending(bool *result) |
|
183 { |
|
184 ReentrantMonitorAutoEnter mon(mMonitor); |
|
185 |
|
186 *result = (mState != STATE_IDLE); |
|
187 return NS_OK; |
|
188 } |
|
189 |
|
190 NS_IMETHODIMP |
|
191 nsInputStreamPump::GetStatus(nsresult *status) |
|
192 { |
|
193 ReentrantMonitorAutoEnter mon(mMonitor); |
|
194 |
|
195 *status = mStatus; |
|
196 return NS_OK; |
|
197 } |
|
198 |
|
199 NS_IMETHODIMP |
|
200 nsInputStreamPump::Cancel(nsresult status) |
|
201 { |
|
202 MOZ_ASSERT(NS_IsMainThread()); |
|
203 |
|
204 ReentrantMonitorAutoEnter mon(mMonitor); |
|
205 |
|
206 LOG(("nsInputStreamPump::Cancel [this=%p status=%x]\n", |
|
207 this, status)); |
|
208 |
|
209 if (NS_FAILED(mStatus)) { |
|
210 LOG((" already canceled\n")); |
|
211 return NS_OK; |
|
212 } |
|
213 |
|
214 NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code"); |
|
215 mStatus = status; |
|
216 |
|
217 // close input stream |
|
218 if (mAsyncStream) { |
|
219 mAsyncStream->CloseWithStatus(status); |
|
220 if (mSuspendCount == 0) |
|
221 EnsureWaiting(); |
|
222 // Otherwise, EnsureWaiting will be called by Resume(). |
|
223 // Note that while suspended, OnInputStreamReady will |
|
224 // not do anything, and also note that calling asyncWait |
|
225 // on a closed stream works and will dispatch an event immediately. |
|
226 } |
|
227 return NS_OK; |
|
228 } |
|
229 |
|
230 NS_IMETHODIMP |
|
231 nsInputStreamPump::Suspend() |
|
232 { |
|
233 ReentrantMonitorAutoEnter mon(mMonitor); |
|
234 |
|
235 LOG(("nsInputStreamPump::Suspend [this=%p]\n", this)); |
|
236 NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED); |
|
237 ++mSuspendCount; |
|
238 return NS_OK; |
|
239 } |
|
240 |
|
241 NS_IMETHODIMP |
|
242 nsInputStreamPump::Resume() |
|
243 { |
|
244 ReentrantMonitorAutoEnter mon(mMonitor); |
|
245 |
|
246 LOG(("nsInputStreamPump::Resume [this=%p]\n", this)); |
|
247 NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED); |
|
248 NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED); |
|
249 |
|
250 if (--mSuspendCount == 0) |
|
251 EnsureWaiting(); |
|
252 return NS_OK; |
|
253 } |
|
254 |
|
255 NS_IMETHODIMP |
|
256 nsInputStreamPump::GetLoadFlags(nsLoadFlags *aLoadFlags) |
|
257 { |
|
258 ReentrantMonitorAutoEnter mon(mMonitor); |
|
259 |
|
260 *aLoadFlags = mLoadFlags; |
|
261 return NS_OK; |
|
262 } |
|
263 |
|
264 NS_IMETHODIMP |
|
265 nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags) |
|
266 { |
|
267 ReentrantMonitorAutoEnter mon(mMonitor); |
|
268 |
|
269 mLoadFlags = aLoadFlags; |
|
270 return NS_OK; |
|
271 } |
|
272 |
|
273 NS_IMETHODIMP |
|
274 nsInputStreamPump::GetLoadGroup(nsILoadGroup **aLoadGroup) |
|
275 { |
|
276 ReentrantMonitorAutoEnter mon(mMonitor); |
|
277 |
|
278 NS_IF_ADDREF(*aLoadGroup = mLoadGroup); |
|
279 return NS_OK; |
|
280 } |
|
281 |
|
282 NS_IMETHODIMP |
|
283 nsInputStreamPump::SetLoadGroup(nsILoadGroup *aLoadGroup) |
|
284 { |
|
285 ReentrantMonitorAutoEnter mon(mMonitor); |
|
286 |
|
287 mLoadGroup = aLoadGroup; |
|
288 return NS_OK; |
|
289 } |
|
290 |
|
291 //----------------------------------------------------------------------------- |
|
292 // nsInputStreamPump::nsIInputStreamPump implementation |
|
293 //----------------------------------------------------------------------------- |
|
294 |
|
295 NS_IMETHODIMP |
|
296 nsInputStreamPump::Init(nsIInputStream *stream, |
|
297 int64_t streamPos, int64_t streamLen, |
|
298 uint32_t segsize, uint32_t segcount, |
|
299 bool closeWhenDone) |
|
300 { |
|
301 NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS); |
|
302 |
|
303 mStreamOffset = uint64_t(streamPos); |
|
304 if (int64_t(streamLen) >= int64_t(0)) |
|
305 mStreamLength = uint64_t(streamLen); |
|
306 mStream = stream; |
|
307 mSegSize = segsize; |
|
308 mSegCount = segcount; |
|
309 mCloseWhenDone = closeWhenDone; |
|
310 |
|
311 return NS_OK; |
|
312 } |
|
313 |
|
314 NS_IMETHODIMP |
|
315 nsInputStreamPump::AsyncRead(nsIStreamListener *listener, nsISupports *ctxt) |
|
316 { |
|
317 ReentrantMonitorAutoEnter mon(mMonitor); |
|
318 |
|
319 NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS); |
|
320 NS_ENSURE_ARG_POINTER(listener); |
|
321 MOZ_ASSERT(NS_IsMainThread(), "nsInputStreamPump should be read from the " |
|
322 "main thread only."); |
|
323 |
|
324 // |
|
325 // OK, we need to use the stream transport service if |
|
326 // |
|
327 // (1) the stream is blocking |
|
328 // (2) the stream does not support nsIAsyncInputStream |
|
329 // |
|
330 |
|
331 bool nonBlocking; |
|
332 nsresult rv = mStream->IsNonBlocking(&nonBlocking); |
|
333 if (NS_FAILED(rv)) return rv; |
|
334 |
|
335 if (nonBlocking) { |
|
336 mAsyncStream = do_QueryInterface(mStream); |
|
337 // |
|
338 // if the stream supports nsIAsyncInputStream, and if we need to seek |
|
339 // to a starting offset, then we must do so here. in the non-async |
|
340 // stream case, the stream transport service will take care of seeking |
|
341 // for us. |
|
342 // |
|
343 if (mAsyncStream && (mStreamOffset != UINT64_MAX)) { |
|
344 nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mStream); |
|
345 if (seekable) |
|
346 seekable->Seek(nsISeekableStream::NS_SEEK_SET, mStreamOffset); |
|
347 } |
|
348 } |
|
349 |
|
350 if (!mAsyncStream) { |
|
351 // ok, let's use the stream transport service to read this stream. |
|
352 nsCOMPtr<nsIStreamTransportService> sts = |
|
353 do_GetService(kStreamTransportServiceCID, &rv); |
|
354 if (NS_FAILED(rv)) return rv; |
|
355 |
|
356 nsCOMPtr<nsITransport> transport; |
|
357 rv = sts->CreateInputTransport(mStream, mStreamOffset, mStreamLength, |
|
358 mCloseWhenDone, getter_AddRefs(transport)); |
|
359 if (NS_FAILED(rv)) return rv; |
|
360 |
|
361 nsCOMPtr<nsIInputStream> wrapper; |
|
362 rv = transport->OpenInputStream(0, mSegSize, mSegCount, getter_AddRefs(wrapper)); |
|
363 if (NS_FAILED(rv)) return rv; |
|
364 |
|
365 mAsyncStream = do_QueryInterface(wrapper, &rv); |
|
366 if (NS_FAILED(rv)) return rv; |
|
367 } |
|
368 |
|
369 // release our reference to the original stream. from this point forward, |
|
370 // we only reference the "stream" via mAsyncStream. |
|
371 mStream = 0; |
|
372 |
|
373 // mStreamOffset now holds the number of bytes currently read. we use this |
|
374 // to enforce the mStreamLength restriction. |
|
375 mStreamOffset = 0; |
|
376 |
|
377 // grab event queue (we must do this here by contract, since all notifications |
|
378 // must go to the thread which called AsyncRead) |
|
379 mTargetThread = do_GetCurrentThread(); |
|
380 NS_ENSURE_STATE(mTargetThread); |
|
381 |
|
382 rv = EnsureWaiting(); |
|
383 if (NS_FAILED(rv)) return rv; |
|
384 |
|
385 if (mLoadGroup) |
|
386 mLoadGroup->AddRequest(this, nullptr); |
|
387 |
|
388 mState = STATE_START; |
|
389 mListener = listener; |
|
390 mListenerContext = ctxt; |
|
391 return NS_OK; |
|
392 } |
|
393 |
|
394 //----------------------------------------------------------------------------- |
|
395 // nsInputStreamPump::nsIInputStreamCallback implementation |
|
396 //----------------------------------------------------------------------------- |
|
397 |
|
398 NS_IMETHODIMP |
|
399 nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream *stream) |
|
400 { |
|
401 LOG(("nsInputStreamPump::OnInputStreamReady [this=%p]\n", this)); |
|
402 |
|
403 PROFILER_LABEL("Input", "nsInputStreamPump::OnInputStreamReady"); |
|
404 // this function has been called from a PLEvent, so we can safely call |
|
405 // any listener or progress sink methods directly from here. |
|
406 |
|
407 for (;;) { |
|
408 // There should only be one iteration of this loop happening at a time. |
|
409 // To prevent AsyncWait() (called during callbacks or on other threads) |
|
410 // from creating a parallel OnInputStreamReady(), we use: |
|
411 // -- a monitor; and |
|
412 // -- a boolean mProcessingCallbacks to detect parallel loops |
|
413 // when exiting the monitor for callbacks. |
|
414 ReentrantMonitorAutoEnter lock(mMonitor); |
|
415 |
|
416 // Prevent parallel execution during callbacks, while out of monitor. |
|
417 if (mProcessingCallbacks) { |
|
418 MOZ_ASSERT(!mProcessingCallbacks); |
|
419 break; |
|
420 } |
|
421 mProcessingCallbacks = true; |
|
422 if (mSuspendCount || mState == STATE_IDLE) { |
|
423 mWaitingForInputStreamReady = false; |
|
424 mProcessingCallbacks = false; |
|
425 break; |
|
426 } |
|
427 |
|
428 uint32_t nextState; |
|
429 switch (mState) { |
|
430 case STATE_START: |
|
431 nextState = OnStateStart(); |
|
432 break; |
|
433 case STATE_TRANSFER: |
|
434 nextState = OnStateTransfer(); |
|
435 break; |
|
436 case STATE_STOP: |
|
437 mRetargeting = false; |
|
438 nextState = OnStateStop(); |
|
439 break; |
|
440 default: |
|
441 nextState = 0; |
|
442 NS_NOTREACHED("Unknown enum value."); |
|
443 return NS_ERROR_UNEXPECTED; |
|
444 } |
|
445 |
|
446 bool stillTransferring = (mState == STATE_TRANSFER && |
|
447 nextState == STATE_TRANSFER); |
|
448 if (stillTransferring) { |
|
449 NS_ASSERTION(NS_SUCCEEDED(mStatus), |
|
450 "Should not have failed status for ongoing transfer"); |
|
451 } else { |
|
452 NS_ASSERTION(mState != nextState, |
|
453 "Only OnStateTransfer can be called more than once."); |
|
454 } |
|
455 if (mRetargeting) { |
|
456 NS_ASSERTION(mState != STATE_STOP, |
|
457 "Retargeting should not happen during OnStateStop."); |
|
458 } |
|
459 |
|
460 // Set mRetargeting so EnsureWaiting will be called. It ensures that |
|
461 // OnStateStop is called on the main thread. |
|
462 if (nextState == STATE_STOP && !NS_IsMainThread()) { |
|
463 mRetargeting = true; |
|
464 } |
|
465 |
|
466 // Unset mProcessingCallbacks here (while we have lock) so our own call to |
|
467 // EnsureWaiting isn't blocked by it. |
|
468 mProcessingCallbacks = false; |
|
469 |
|
470 // Wait asynchronously if there is still data to transfer, or we're |
|
471 // switching event delivery to another thread. |
|
472 if (!mSuspendCount && (stillTransferring || mRetargeting)) { |
|
473 mState = nextState; |
|
474 mWaitingForInputStreamReady = false; |
|
475 nsresult rv = EnsureWaiting(); |
|
476 if (NS_SUCCEEDED(rv)) |
|
477 break; |
|
478 |
|
479 // Failure to start asynchronous wait: stop transfer. |
|
480 // Do not set mStatus if it was previously set to report a failure. |
|
481 if (NS_SUCCEEDED(mStatus)) { |
|
482 mStatus = rv; |
|
483 } |
|
484 nextState = STATE_STOP; |
|
485 } |
|
486 |
|
487 mState = nextState; |
|
488 } |
|
489 return NS_OK; |
|
490 } |
|
491 |
|
492 uint32_t |
|
493 nsInputStreamPump::OnStateStart() |
|
494 { |
|
495 mMonitor.AssertCurrentThreadIn(); |
|
496 |
|
497 PROFILER_LABEL("nsInputStreamPump", "OnStateStart"); |
|
498 LOG((" OnStateStart [this=%p]\n", this)); |
|
499 |
|
500 nsresult rv; |
|
501 |
|
502 // need to check the reason why the stream is ready. this is required |
|
503 // so our listener can check our status from OnStartRequest. |
|
504 // XXX async streams should have a GetStatus method! |
|
505 if (NS_SUCCEEDED(mStatus)) { |
|
506 uint64_t avail; |
|
507 rv = mAsyncStream->Available(&avail); |
|
508 if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED) |
|
509 mStatus = rv; |
|
510 } |
|
511 |
|
512 { |
|
513 // Note: Must exit monitor for call to OnStartRequest to avoid |
|
514 // deadlocks when calls to RetargetDeliveryTo for multiple |
|
515 // nsInputStreamPumps are needed (e.g. nsHttpChannel). |
|
516 mMonitor.Exit(); |
|
517 rv = mListener->OnStartRequest(this, mListenerContext); |
|
518 mMonitor.Enter(); |
|
519 } |
|
520 |
|
521 // an error returned from OnStartRequest should cause us to abort; however, |
|
522 // we must not stomp on mStatus if already canceled. |
|
523 if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus)) |
|
524 mStatus = rv; |
|
525 |
|
526 return NS_SUCCEEDED(mStatus) ? STATE_TRANSFER : STATE_STOP; |
|
527 } |
|
528 |
|
529 uint32_t |
|
530 nsInputStreamPump::OnStateTransfer() |
|
531 { |
|
532 mMonitor.AssertCurrentThreadIn(); |
|
533 |
|
534 PROFILER_LABEL("Input", "nsInputStreamPump::OnStateTransfer"); |
|
535 LOG((" OnStateTransfer [this=%p]\n", this)); |
|
536 |
|
537 // if canceled, go directly to STATE_STOP... |
|
538 if (NS_FAILED(mStatus)) |
|
539 return STATE_STOP; |
|
540 |
|
541 nsresult rv; |
|
542 |
|
543 uint64_t avail; |
|
544 rv = mAsyncStream->Available(&avail); |
|
545 LOG((" Available returned [stream=%x rv=%x avail=%llu]\n", mAsyncStream.get(), rv, avail)); |
|
546 |
|
547 if (rv == NS_BASE_STREAM_CLOSED) { |
|
548 rv = NS_OK; |
|
549 avail = 0; |
|
550 } |
|
551 else if (NS_SUCCEEDED(rv) && avail) { |
|
552 // figure out how much data to report (XXX detect overflow??) |
|
553 if (avail > mStreamLength - mStreamOffset) |
|
554 avail = mStreamLength - mStreamOffset; |
|
555 |
|
556 if (avail) { |
|
557 // we used to limit avail to 16K - we were afraid some ODA handlers |
|
558 // might assume they wouldn't get more than 16K at once |
|
559 // we're removing that limit since it speeds up local file access. |
|
560 // Now there's an implicit 64K limit of 4 16K segments |
|
561 // NOTE: ok, so the story is as follows. OnDataAvailable impls |
|
562 // are by contract supposed to consume exactly |avail| bytes. |
|
563 // however, many do not... mailnews... stream converters... |
|
564 // cough, cough. the input stream pump is fairly tolerant |
|
565 // in this regard; however, if an ODA does not consume any |
|
566 // data from the stream, then we could potentially end up in |
|
567 // an infinite loop. we do our best here to try to catch |
|
568 // such an error. (see bug 189672) |
|
569 |
|
570 // in most cases this QI will succeed (mAsyncStream is almost always |
|
571 // a nsPipeInputStream, which implements nsISeekableStream::Tell). |
|
572 int64_t offsetBefore; |
|
573 nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mAsyncStream); |
|
574 if (seekable && NS_FAILED(seekable->Tell(&offsetBefore))) { |
|
575 NS_NOTREACHED("Tell failed on readable stream"); |
|
576 offsetBefore = 0; |
|
577 } |
|
578 |
|
579 uint32_t odaAvail = |
|
580 avail > UINT32_MAX ? |
|
581 UINT32_MAX : uint32_t(avail); |
|
582 |
|
583 LOG((" calling OnDataAvailable [offset=%llu count=%llu(%u)]\n", |
|
584 mStreamOffset, avail, odaAvail)); |
|
585 |
|
586 { |
|
587 // Note: Must exit monitor for call to OnStartRequest to avoid |
|
588 // deadlocks when calls to RetargetDeliveryTo for multiple |
|
589 // nsInputStreamPumps are needed (e.g. nsHttpChannel). |
|
590 mMonitor.Exit(); |
|
591 rv = mListener->OnDataAvailable(this, mListenerContext, |
|
592 mAsyncStream, mStreamOffset, |
|
593 odaAvail); |
|
594 mMonitor.Enter(); |
|
595 } |
|
596 |
|
597 // don't enter this code if ODA failed or called Cancel |
|
598 if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) { |
|
599 // test to see if this ODA failed to consume data |
|
600 if (seekable) { |
|
601 // NOTE: if Tell fails, which can happen if the stream is |
|
602 // now closed, then we assume that everything was read. |
|
603 int64_t offsetAfter; |
|
604 if (NS_FAILED(seekable->Tell(&offsetAfter))) |
|
605 offsetAfter = offsetBefore + odaAvail; |
|
606 if (offsetAfter > offsetBefore) |
|
607 mStreamOffset += (offsetAfter - offsetBefore); |
|
608 else if (mSuspendCount == 0) { |
|
609 // |
|
610 // possible infinite loop if we continue pumping data! |
|
611 // |
|
612 // NOTE: although not allowed by nsIStreamListener, we |
|
613 // will allow the ODA impl to Suspend the pump. IMAP |
|
614 // does this :-( |
|
615 // |
|
616 NS_ERROR("OnDataAvailable implementation consumed no data"); |
|
617 mStatus = NS_ERROR_UNEXPECTED; |
|
618 } |
|
619 } |
|
620 else |
|
621 mStreamOffset += odaAvail; // assume ODA behaved well |
|
622 } |
|
623 } |
|
624 } |
|
625 |
|
626 // an error returned from Available or OnDataAvailable should cause us to |
|
627 // abort; however, we must not stomp on mStatus if already canceled. |
|
628 |
|
629 if (NS_SUCCEEDED(mStatus)) { |
|
630 if (NS_FAILED(rv)) |
|
631 mStatus = rv; |
|
632 else if (avail) { |
|
633 // if stream is now closed, advance to STATE_STOP right away. |
|
634 // Available may return 0 bytes available at the moment; that |
|
635 // would not mean that we are done. |
|
636 // XXX async streams should have a GetStatus method! |
|
637 rv = mAsyncStream->Available(&avail); |
|
638 if (NS_SUCCEEDED(rv)) |
|
639 return STATE_TRANSFER; |
|
640 if (rv != NS_BASE_STREAM_CLOSED) |
|
641 mStatus = rv; |
|
642 } |
|
643 } |
|
644 return STATE_STOP; |
|
645 } |
|
646 |
|
647 nsresult |
|
648 nsInputStreamPump::CallOnStateStop() |
|
649 { |
|
650 ReentrantMonitorAutoEnter mon(mMonitor); |
|
651 |
|
652 MOZ_ASSERT(NS_IsMainThread(), |
|
653 "CallOnStateStop should only be called on the main thread."); |
|
654 |
|
655 mState = OnStateStop(); |
|
656 return NS_OK; |
|
657 } |
|
658 |
|
659 uint32_t |
|
660 nsInputStreamPump::OnStateStop() |
|
661 { |
|
662 mMonitor.AssertCurrentThreadIn(); |
|
663 |
|
664 if (!NS_IsMainThread()) { |
|
665 // Hopefully temporary hack: OnStateStop should only run on the main |
|
666 // thread, but we're seeing some rare off-main-thread calls. For now |
|
667 // just redispatch to the main thread in release builds, and crash in |
|
668 // debug builds. |
|
669 MOZ_ASSERT(NS_IsMainThread(), |
|
670 "OnStateStop should only be called on the main thread."); |
|
671 nsresult rv = NS_DispatchToMainThread( |
|
672 NS_NewRunnableMethod(this, &nsInputStreamPump::CallOnStateStop)); |
|
673 NS_ENSURE_SUCCESS(rv, STATE_IDLE); |
|
674 return STATE_IDLE; |
|
675 } |
|
676 |
|
677 PROFILER_LABEL("Input", "nsInputStreamPump::OnStateTransfer"); |
|
678 LOG((" OnStateStop [this=%p status=%x]\n", this, mStatus)); |
|
679 |
|
680 // if an error occurred, we must be sure to pass the error onto the async |
|
681 // stream. in some cases, this is redundant, but since close is idempotent, |
|
682 // this is OK. otherwise, be sure to honor the "close-when-done" option. |
|
683 |
|
684 if (!mAsyncStream || !mListener) { |
|
685 MOZ_ASSERT(mAsyncStream, "null mAsyncStream: OnStateStop called twice?"); |
|
686 MOZ_ASSERT(mListener, "null mListener: OnStateStop called twice?"); |
|
687 return STATE_IDLE; |
|
688 } |
|
689 |
|
690 if (NS_FAILED(mStatus)) |
|
691 mAsyncStream->CloseWithStatus(mStatus); |
|
692 else if (mCloseWhenDone) |
|
693 mAsyncStream->Close(); |
|
694 |
|
695 mAsyncStream = 0; |
|
696 mTargetThread = 0; |
|
697 mIsPending = false; |
|
698 { |
|
699 // Note: Must exit monitor for call to OnStartRequest to avoid |
|
700 // deadlocks when calls to RetargetDeliveryTo for multiple |
|
701 // nsInputStreamPumps are needed (e.g. nsHttpChannel). |
|
702 mMonitor.Exit(); |
|
703 mListener->OnStopRequest(this, mListenerContext, mStatus); |
|
704 mMonitor.Enter(); |
|
705 } |
|
706 mListener = 0; |
|
707 mListenerContext = 0; |
|
708 |
|
709 if (mLoadGroup) |
|
710 mLoadGroup->RemoveRequest(this, nullptr, mStatus); |
|
711 |
|
712 return STATE_IDLE; |
|
713 } |
|
714 |
|
715 //----------------------------------------------------------------------------- |
|
716 // nsIThreadRetargetableRequest |
|
717 //----------------------------------------------------------------------------- |
|
718 |
|
719 NS_IMETHODIMP |
|
720 nsInputStreamPump::RetargetDeliveryTo(nsIEventTarget* aNewTarget) |
|
721 { |
|
722 ReentrantMonitorAutoEnter mon(mMonitor); |
|
723 |
|
724 NS_ENSURE_ARG(aNewTarget); |
|
725 NS_ENSURE_TRUE(mState == STATE_START || mState == STATE_TRANSFER, |
|
726 NS_ERROR_UNEXPECTED); |
|
727 |
|
728 // If canceled, do not retarget. Return with canceled status. |
|
729 if (NS_FAILED(mStatus)) { |
|
730 return mStatus; |
|
731 } |
|
732 |
|
733 if (aNewTarget == mTargetThread) { |
|
734 NS_WARNING("Retargeting delivery to same thread"); |
|
735 return NS_OK; |
|
736 } |
|
737 |
|
738 // Ensure that |mListener| and any subsequent listeners can be retargeted |
|
739 // to another thread. |
|
740 nsresult rv = NS_OK; |
|
741 nsCOMPtr<nsIThreadRetargetableStreamListener> retargetableListener = |
|
742 do_QueryInterface(mListener, &rv); |
|
743 if (NS_SUCCEEDED(rv) && retargetableListener) { |
|
744 rv = retargetableListener->CheckListenerChain(); |
|
745 if (NS_SUCCEEDED(rv)) { |
|
746 mTargetThread = aNewTarget; |
|
747 mRetargeting = true; |
|
748 } |
|
749 } |
|
750 LOG(("nsInputStreamPump::RetargetDeliveryTo [this=%x aNewTarget=%p] " |
|
751 "%s listener [%p] rv[%x]", |
|
752 this, aNewTarget, (mTargetThread == aNewTarget ? "success" : "failure"), |
|
753 (nsIStreamListener*)mListener, rv)); |
|
754 return rv; |
|
755 } |