1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/netwerk/base/src/nsInputStreamPump.cpp Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,755 @@ 1.4 +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ 1.5 +/* vim:set ts=4 sts=4 sw=4 et cin: */ 1.6 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.7 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.8 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.9 + 1.10 +#include "nsIOService.h" 1.11 +#include "nsInputStreamPump.h" 1.12 +#include "nsIStreamTransportService.h" 1.13 +#include "nsISeekableStream.h" 1.14 +#include "nsITransport.h" 1.15 +#include "nsIThreadRetargetableStreamListener.h" 1.16 +#include "nsThreadUtils.h" 1.17 +#include "nsCOMPtr.h" 1.18 +#include "prlog.h" 1.19 +#include "GeckoProfiler.h" 1.20 +#include "nsIStreamListener.h" 1.21 +#include "nsILoadGroup.h" 1.22 +#include "nsNetCID.h" 1.23 +#include <algorithm> 1.24 + 1.25 +static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID); 1.26 + 1.27 +#if defined(PR_LOGGING) 1.28 +// 1.29 +// NSPR_LOG_MODULES=nsStreamPump:5 1.30 +// 1.31 +static PRLogModuleInfo *gStreamPumpLog = nullptr; 1.32 +#endif 1.33 +#undef LOG 1.34 +#define LOG(args) PR_LOG(gStreamPumpLog, PR_LOG_DEBUG, args) 1.35 + 1.36 +//----------------------------------------------------------------------------- 1.37 +// nsInputStreamPump methods 1.38 +//----------------------------------------------------------------------------- 1.39 + 1.40 +nsInputStreamPump::nsInputStreamPump() 1.41 + : mState(STATE_IDLE) 1.42 + , mStreamOffset(0) 1.43 + , mStreamLength(UINT64_MAX) 1.44 + , mStatus(NS_OK) 1.45 + , mSuspendCount(0) 1.46 + , mLoadFlags(LOAD_NORMAL) 1.47 + , mProcessingCallbacks(false) 1.48 + , mWaitingForInputStreamReady(false) 1.49 + , mCloseWhenDone(false) 1.50 + , mRetargeting(false) 1.51 + , mMonitor("nsInputStreamPump") 1.52 +{ 1.53 +#if defined(PR_LOGGING) 1.54 + if (!gStreamPumpLog) 1.55 + gStreamPumpLog = PR_NewLogModule("nsStreamPump"); 1.56 +#endif 1.57 +} 1.58 + 1.59 +nsInputStreamPump::~nsInputStreamPump() 1.60 +{ 1.61 +} 1.62 + 1.63 +nsresult 1.64 +nsInputStreamPump::Create(nsInputStreamPump **result, 1.65 + nsIInputStream *stream, 1.66 + int64_t streamPos, 1.67 + int64_t streamLen, 1.68 + uint32_t segsize, 1.69 + uint32_t segcount, 1.70 + bool closeWhenDone) 1.71 +{ 1.72 + nsresult rv = NS_ERROR_OUT_OF_MEMORY; 1.73 + nsRefPtr<nsInputStreamPump> pump = new nsInputStreamPump(); 1.74 + if (pump) { 1.75 + rv = pump->Init(stream, streamPos, streamLen, 1.76 + segsize, segcount, closeWhenDone); 1.77 + if (NS_SUCCEEDED(rv)) { 1.78 + *result = nullptr; 1.79 + pump.swap(*result); 1.80 + } 1.81 + } 1.82 + return rv; 1.83 +} 1.84 + 1.85 +struct PeekData { 1.86 + PeekData(nsInputStreamPump::PeekSegmentFun fun, void* closure) 1.87 + : mFunc(fun), mClosure(closure) {} 1.88 + 1.89 + nsInputStreamPump::PeekSegmentFun mFunc; 1.90 + void* mClosure; 1.91 +}; 1.92 + 1.93 +static NS_METHOD 1.94 +CallPeekFunc(nsIInputStream *aInStream, void *aClosure, 1.95 + const char *aFromSegment, uint32_t aToOffset, uint32_t aCount, 1.96 + uint32_t *aWriteCount) 1.97 +{ 1.98 + NS_ASSERTION(aToOffset == 0, "Called more than once?"); 1.99 + NS_ASSERTION(aCount > 0, "Called without data?"); 1.100 + 1.101 + PeekData* data = static_cast<PeekData*>(aClosure); 1.102 + data->mFunc(data->mClosure, 1.103 + reinterpret_cast<const uint8_t*>(aFromSegment), aCount); 1.104 + return NS_BINDING_ABORTED; 1.105 +} 1.106 + 1.107 +nsresult 1.108 +nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure) 1.109 +{ 1.110 + ReentrantMonitorAutoEnter mon(mMonitor); 1.111 + 1.112 + NS_ASSERTION(mAsyncStream, "PeekStream called without stream"); 1.113 + 1.114 + // See if the pipe is closed by checking the return of Available. 1.115 + uint64_t dummy64; 1.116 + nsresult rv = mAsyncStream->Available(&dummy64); 1.117 + if (NS_FAILED(rv)) 1.118 + return rv; 1.119 + uint32_t dummy = (uint32_t)std::min(dummy64, (uint64_t)UINT32_MAX); 1.120 + 1.121 + PeekData data(callback, closure); 1.122 + return mAsyncStream->ReadSegments(CallPeekFunc, 1.123 + &data, 1.124 + nsIOService::gDefaultSegmentSize, 1.125 + &dummy); 1.126 +} 1.127 + 1.128 +nsresult 1.129 +nsInputStreamPump::EnsureWaiting() 1.130 +{ 1.131 + mMonitor.AssertCurrentThreadIn(); 1.132 + 1.133 + // no need to worry about multiple threads... an input stream pump lives 1.134 + // on only one thread at a time. 1.135 + MOZ_ASSERT(mAsyncStream); 1.136 + if (!mWaitingForInputStreamReady && !mProcessingCallbacks) { 1.137 + // Ensure OnStateStop is called on the main thread. 1.138 + if (mState == STATE_STOP) { 1.139 + nsCOMPtr<nsIThread> mainThread = do_GetMainThread(); 1.140 + if (mTargetThread != mainThread) { 1.141 + mTargetThread = do_QueryInterface(mainThread); 1.142 + } 1.143 + } 1.144 + MOZ_ASSERT(mTargetThread); 1.145 + nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread); 1.146 + if (NS_FAILED(rv)) { 1.147 + NS_ERROR("AsyncWait failed"); 1.148 + return rv; 1.149 + } 1.150 + // Any retargeting during STATE_START or START_TRANSFER is complete 1.151 + // after the call to AsyncWait; next callback wil be on mTargetThread. 1.152 + mRetargeting = false; 1.153 + mWaitingForInputStreamReady = true; 1.154 + } 1.155 + return NS_OK; 1.156 +} 1.157 + 1.158 +//----------------------------------------------------------------------------- 1.159 +// nsInputStreamPump::nsISupports 1.160 +//----------------------------------------------------------------------------- 1.161 + 1.162 +// although this class can only be accessed from one thread at a time, we do 1.163 +// allow its ownership to move from thread to thread, assuming the consumer 1.164 +// understands the limitations of this. 1.165 +NS_IMPL_ISUPPORTS(nsInputStreamPump, 1.166 + nsIRequest, 1.167 + nsIThreadRetargetableRequest, 1.168 + nsIInputStreamCallback, 1.169 + nsIInputStreamPump) 1.170 + 1.171 +//----------------------------------------------------------------------------- 1.172 +// nsInputStreamPump::nsIRequest 1.173 +//----------------------------------------------------------------------------- 1.174 + 1.175 +NS_IMETHODIMP 1.176 +nsInputStreamPump::GetName(nsACString &result) 1.177 +{ 1.178 + ReentrantMonitorAutoEnter mon(mMonitor); 1.179 + 1.180 + result.Truncate(); 1.181 + return NS_OK; 1.182 +} 1.183 + 1.184 +NS_IMETHODIMP 1.185 +nsInputStreamPump::IsPending(bool *result) 1.186 +{ 1.187 + ReentrantMonitorAutoEnter mon(mMonitor); 1.188 + 1.189 + *result = (mState != STATE_IDLE); 1.190 + return NS_OK; 1.191 +} 1.192 + 1.193 +NS_IMETHODIMP 1.194 +nsInputStreamPump::GetStatus(nsresult *status) 1.195 +{ 1.196 + ReentrantMonitorAutoEnter mon(mMonitor); 1.197 + 1.198 + *status = mStatus; 1.199 + return NS_OK; 1.200 +} 1.201 + 1.202 +NS_IMETHODIMP 1.203 +nsInputStreamPump::Cancel(nsresult status) 1.204 +{ 1.205 + MOZ_ASSERT(NS_IsMainThread()); 1.206 + 1.207 + ReentrantMonitorAutoEnter mon(mMonitor); 1.208 + 1.209 + LOG(("nsInputStreamPump::Cancel [this=%p status=%x]\n", 1.210 + this, status)); 1.211 + 1.212 + if (NS_FAILED(mStatus)) { 1.213 + LOG((" already canceled\n")); 1.214 + return NS_OK; 1.215 + } 1.216 + 1.217 + NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code"); 1.218 + mStatus = status; 1.219 + 1.220 + // close input stream 1.221 + if (mAsyncStream) { 1.222 + mAsyncStream->CloseWithStatus(status); 1.223 + if (mSuspendCount == 0) 1.224 + EnsureWaiting(); 1.225 + // Otherwise, EnsureWaiting will be called by Resume(). 1.226 + // Note that while suspended, OnInputStreamReady will 1.227 + // not do anything, and also note that calling asyncWait 1.228 + // on a closed stream works and will dispatch an event immediately. 1.229 + } 1.230 + return NS_OK; 1.231 +} 1.232 + 1.233 +NS_IMETHODIMP 1.234 +nsInputStreamPump::Suspend() 1.235 +{ 1.236 + ReentrantMonitorAutoEnter mon(mMonitor); 1.237 + 1.238 + LOG(("nsInputStreamPump::Suspend [this=%p]\n", this)); 1.239 + NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED); 1.240 + ++mSuspendCount; 1.241 + return NS_OK; 1.242 +} 1.243 + 1.244 +NS_IMETHODIMP 1.245 +nsInputStreamPump::Resume() 1.246 +{ 1.247 + ReentrantMonitorAutoEnter mon(mMonitor); 1.248 + 1.249 + LOG(("nsInputStreamPump::Resume [this=%p]\n", this)); 1.250 + NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED); 1.251 + NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED); 1.252 + 1.253 + if (--mSuspendCount == 0) 1.254 + EnsureWaiting(); 1.255 + return NS_OK; 1.256 +} 1.257 + 1.258 +NS_IMETHODIMP 1.259 +nsInputStreamPump::GetLoadFlags(nsLoadFlags *aLoadFlags) 1.260 +{ 1.261 + ReentrantMonitorAutoEnter mon(mMonitor); 1.262 + 1.263 + *aLoadFlags = mLoadFlags; 1.264 + return NS_OK; 1.265 +} 1.266 + 1.267 +NS_IMETHODIMP 1.268 +nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags) 1.269 +{ 1.270 + ReentrantMonitorAutoEnter mon(mMonitor); 1.271 + 1.272 + mLoadFlags = aLoadFlags; 1.273 + return NS_OK; 1.274 +} 1.275 + 1.276 +NS_IMETHODIMP 1.277 +nsInputStreamPump::GetLoadGroup(nsILoadGroup **aLoadGroup) 1.278 +{ 1.279 + ReentrantMonitorAutoEnter mon(mMonitor); 1.280 + 1.281 + NS_IF_ADDREF(*aLoadGroup = mLoadGroup); 1.282 + return NS_OK; 1.283 +} 1.284 + 1.285 +NS_IMETHODIMP 1.286 +nsInputStreamPump::SetLoadGroup(nsILoadGroup *aLoadGroup) 1.287 +{ 1.288 + ReentrantMonitorAutoEnter mon(mMonitor); 1.289 + 1.290 + mLoadGroup = aLoadGroup; 1.291 + return NS_OK; 1.292 +} 1.293 + 1.294 +//----------------------------------------------------------------------------- 1.295 +// nsInputStreamPump::nsIInputStreamPump implementation 1.296 +//----------------------------------------------------------------------------- 1.297 + 1.298 +NS_IMETHODIMP 1.299 +nsInputStreamPump::Init(nsIInputStream *stream, 1.300 + int64_t streamPos, int64_t streamLen, 1.301 + uint32_t segsize, uint32_t segcount, 1.302 + bool closeWhenDone) 1.303 +{ 1.304 + NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS); 1.305 + 1.306 + mStreamOffset = uint64_t(streamPos); 1.307 + if (int64_t(streamLen) >= int64_t(0)) 1.308 + mStreamLength = uint64_t(streamLen); 1.309 + mStream = stream; 1.310 + mSegSize = segsize; 1.311 + mSegCount = segcount; 1.312 + mCloseWhenDone = closeWhenDone; 1.313 + 1.314 + return NS_OK; 1.315 +} 1.316 + 1.317 +NS_IMETHODIMP 1.318 +nsInputStreamPump::AsyncRead(nsIStreamListener *listener, nsISupports *ctxt) 1.319 +{ 1.320 + ReentrantMonitorAutoEnter mon(mMonitor); 1.321 + 1.322 + NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS); 1.323 + NS_ENSURE_ARG_POINTER(listener); 1.324 + MOZ_ASSERT(NS_IsMainThread(), "nsInputStreamPump should be read from the " 1.325 + "main thread only."); 1.326 + 1.327 + // 1.328 + // OK, we need to use the stream transport service if 1.329 + // 1.330 + // (1) the stream is blocking 1.331 + // (2) the stream does not support nsIAsyncInputStream 1.332 + // 1.333 + 1.334 + bool nonBlocking; 1.335 + nsresult rv = mStream->IsNonBlocking(&nonBlocking); 1.336 + if (NS_FAILED(rv)) return rv; 1.337 + 1.338 + if (nonBlocking) { 1.339 + mAsyncStream = do_QueryInterface(mStream); 1.340 + // 1.341 + // if the stream supports nsIAsyncInputStream, and if we need to seek 1.342 + // to a starting offset, then we must do so here. in the non-async 1.343 + // stream case, the stream transport service will take care of seeking 1.344 + // for us. 1.345 + // 1.346 + if (mAsyncStream && (mStreamOffset != UINT64_MAX)) { 1.347 + nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mStream); 1.348 + if (seekable) 1.349 + seekable->Seek(nsISeekableStream::NS_SEEK_SET, mStreamOffset); 1.350 + } 1.351 + } 1.352 + 1.353 + if (!mAsyncStream) { 1.354 + // ok, let's use the stream transport service to read this stream. 1.355 + nsCOMPtr<nsIStreamTransportService> sts = 1.356 + do_GetService(kStreamTransportServiceCID, &rv); 1.357 + if (NS_FAILED(rv)) return rv; 1.358 + 1.359 + nsCOMPtr<nsITransport> transport; 1.360 + rv = sts->CreateInputTransport(mStream, mStreamOffset, mStreamLength, 1.361 + mCloseWhenDone, getter_AddRefs(transport)); 1.362 + if (NS_FAILED(rv)) return rv; 1.363 + 1.364 + nsCOMPtr<nsIInputStream> wrapper; 1.365 + rv = transport->OpenInputStream(0, mSegSize, mSegCount, getter_AddRefs(wrapper)); 1.366 + if (NS_FAILED(rv)) return rv; 1.367 + 1.368 + mAsyncStream = do_QueryInterface(wrapper, &rv); 1.369 + if (NS_FAILED(rv)) return rv; 1.370 + } 1.371 + 1.372 + // release our reference to the original stream. from this point forward, 1.373 + // we only reference the "stream" via mAsyncStream. 1.374 + mStream = 0; 1.375 + 1.376 + // mStreamOffset now holds the number of bytes currently read. we use this 1.377 + // to enforce the mStreamLength restriction. 1.378 + mStreamOffset = 0; 1.379 + 1.380 + // grab event queue (we must do this here by contract, since all notifications 1.381 + // must go to the thread which called AsyncRead) 1.382 + mTargetThread = do_GetCurrentThread(); 1.383 + NS_ENSURE_STATE(mTargetThread); 1.384 + 1.385 + rv = EnsureWaiting(); 1.386 + if (NS_FAILED(rv)) return rv; 1.387 + 1.388 + if (mLoadGroup) 1.389 + mLoadGroup->AddRequest(this, nullptr); 1.390 + 1.391 + mState = STATE_START; 1.392 + mListener = listener; 1.393 + mListenerContext = ctxt; 1.394 + return NS_OK; 1.395 +} 1.396 + 1.397 +//----------------------------------------------------------------------------- 1.398 +// nsInputStreamPump::nsIInputStreamCallback implementation 1.399 +//----------------------------------------------------------------------------- 1.400 + 1.401 +NS_IMETHODIMP 1.402 +nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream *stream) 1.403 +{ 1.404 + LOG(("nsInputStreamPump::OnInputStreamReady [this=%p]\n", this)); 1.405 + 1.406 + PROFILER_LABEL("Input", "nsInputStreamPump::OnInputStreamReady"); 1.407 + // this function has been called from a PLEvent, so we can safely call 1.408 + // any listener or progress sink methods directly from here. 1.409 + 1.410 + for (;;) { 1.411 + // There should only be one iteration of this loop happening at a time. 1.412 + // To prevent AsyncWait() (called during callbacks or on other threads) 1.413 + // from creating a parallel OnInputStreamReady(), we use: 1.414 + // -- a monitor; and 1.415 + // -- a boolean mProcessingCallbacks to detect parallel loops 1.416 + // when exiting the monitor for callbacks. 1.417 + ReentrantMonitorAutoEnter lock(mMonitor); 1.418 + 1.419 + // Prevent parallel execution during callbacks, while out of monitor. 1.420 + if (mProcessingCallbacks) { 1.421 + MOZ_ASSERT(!mProcessingCallbacks); 1.422 + break; 1.423 + } 1.424 + mProcessingCallbacks = true; 1.425 + if (mSuspendCount || mState == STATE_IDLE) { 1.426 + mWaitingForInputStreamReady = false; 1.427 + mProcessingCallbacks = false; 1.428 + break; 1.429 + } 1.430 + 1.431 + uint32_t nextState; 1.432 + switch (mState) { 1.433 + case STATE_START: 1.434 + nextState = OnStateStart(); 1.435 + break; 1.436 + case STATE_TRANSFER: 1.437 + nextState = OnStateTransfer(); 1.438 + break; 1.439 + case STATE_STOP: 1.440 + mRetargeting = false; 1.441 + nextState = OnStateStop(); 1.442 + break; 1.443 + default: 1.444 + nextState = 0; 1.445 + NS_NOTREACHED("Unknown enum value."); 1.446 + return NS_ERROR_UNEXPECTED; 1.447 + } 1.448 + 1.449 + bool stillTransferring = (mState == STATE_TRANSFER && 1.450 + nextState == STATE_TRANSFER); 1.451 + if (stillTransferring) { 1.452 + NS_ASSERTION(NS_SUCCEEDED(mStatus), 1.453 + "Should not have failed status for ongoing transfer"); 1.454 + } else { 1.455 + NS_ASSERTION(mState != nextState, 1.456 + "Only OnStateTransfer can be called more than once."); 1.457 + } 1.458 + if (mRetargeting) { 1.459 + NS_ASSERTION(mState != STATE_STOP, 1.460 + "Retargeting should not happen during OnStateStop."); 1.461 + } 1.462 + 1.463 + // Set mRetargeting so EnsureWaiting will be called. It ensures that 1.464 + // OnStateStop is called on the main thread. 1.465 + if (nextState == STATE_STOP && !NS_IsMainThread()) { 1.466 + mRetargeting = true; 1.467 + } 1.468 + 1.469 + // Unset mProcessingCallbacks here (while we have lock) so our own call to 1.470 + // EnsureWaiting isn't blocked by it. 1.471 + mProcessingCallbacks = false; 1.472 + 1.473 + // Wait asynchronously if there is still data to transfer, or we're 1.474 + // switching event delivery to another thread. 1.475 + if (!mSuspendCount && (stillTransferring || mRetargeting)) { 1.476 + mState = nextState; 1.477 + mWaitingForInputStreamReady = false; 1.478 + nsresult rv = EnsureWaiting(); 1.479 + if (NS_SUCCEEDED(rv)) 1.480 + break; 1.481 + 1.482 + // Failure to start asynchronous wait: stop transfer. 1.483 + // Do not set mStatus if it was previously set to report a failure. 1.484 + if (NS_SUCCEEDED(mStatus)) { 1.485 + mStatus = rv; 1.486 + } 1.487 + nextState = STATE_STOP; 1.488 + } 1.489 + 1.490 + mState = nextState; 1.491 + } 1.492 + return NS_OK; 1.493 +} 1.494 + 1.495 +uint32_t 1.496 +nsInputStreamPump::OnStateStart() 1.497 +{ 1.498 + mMonitor.AssertCurrentThreadIn(); 1.499 + 1.500 + PROFILER_LABEL("nsInputStreamPump", "OnStateStart"); 1.501 + LOG((" OnStateStart [this=%p]\n", this)); 1.502 + 1.503 + nsresult rv; 1.504 + 1.505 + // need to check the reason why the stream is ready. this is required 1.506 + // so our listener can check our status from OnStartRequest. 1.507 + // XXX async streams should have a GetStatus method! 1.508 + if (NS_SUCCEEDED(mStatus)) { 1.509 + uint64_t avail; 1.510 + rv = mAsyncStream->Available(&avail); 1.511 + if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED) 1.512 + mStatus = rv; 1.513 + } 1.514 + 1.515 + { 1.516 + // Note: Must exit monitor for call to OnStartRequest to avoid 1.517 + // deadlocks when calls to RetargetDeliveryTo for multiple 1.518 + // nsInputStreamPumps are needed (e.g. nsHttpChannel). 1.519 + mMonitor.Exit(); 1.520 + rv = mListener->OnStartRequest(this, mListenerContext); 1.521 + mMonitor.Enter(); 1.522 + } 1.523 + 1.524 + // an error returned from OnStartRequest should cause us to abort; however, 1.525 + // we must not stomp on mStatus if already canceled. 1.526 + if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus)) 1.527 + mStatus = rv; 1.528 + 1.529 + return NS_SUCCEEDED(mStatus) ? STATE_TRANSFER : STATE_STOP; 1.530 +} 1.531 + 1.532 +uint32_t 1.533 +nsInputStreamPump::OnStateTransfer() 1.534 +{ 1.535 + mMonitor.AssertCurrentThreadIn(); 1.536 + 1.537 + PROFILER_LABEL("Input", "nsInputStreamPump::OnStateTransfer"); 1.538 + LOG((" OnStateTransfer [this=%p]\n", this)); 1.539 + 1.540 + // if canceled, go directly to STATE_STOP... 1.541 + if (NS_FAILED(mStatus)) 1.542 + return STATE_STOP; 1.543 + 1.544 + nsresult rv; 1.545 + 1.546 + uint64_t avail; 1.547 + rv = mAsyncStream->Available(&avail); 1.548 + LOG((" Available returned [stream=%x rv=%x avail=%llu]\n", mAsyncStream.get(), rv, avail)); 1.549 + 1.550 + if (rv == NS_BASE_STREAM_CLOSED) { 1.551 + rv = NS_OK; 1.552 + avail = 0; 1.553 + } 1.554 + else if (NS_SUCCEEDED(rv) && avail) { 1.555 + // figure out how much data to report (XXX detect overflow??) 1.556 + if (avail > mStreamLength - mStreamOffset) 1.557 + avail = mStreamLength - mStreamOffset; 1.558 + 1.559 + if (avail) { 1.560 + // we used to limit avail to 16K - we were afraid some ODA handlers 1.561 + // might assume they wouldn't get more than 16K at once 1.562 + // we're removing that limit since it speeds up local file access. 1.563 + // Now there's an implicit 64K limit of 4 16K segments 1.564 + // NOTE: ok, so the story is as follows. OnDataAvailable impls 1.565 + // are by contract supposed to consume exactly |avail| bytes. 1.566 + // however, many do not... mailnews... stream converters... 1.567 + // cough, cough. the input stream pump is fairly tolerant 1.568 + // in this regard; however, if an ODA does not consume any 1.569 + // data from the stream, then we could potentially end up in 1.570 + // an infinite loop. we do our best here to try to catch 1.571 + // such an error. (see bug 189672) 1.572 + 1.573 + // in most cases this QI will succeed (mAsyncStream is almost always 1.574 + // a nsPipeInputStream, which implements nsISeekableStream::Tell). 1.575 + int64_t offsetBefore; 1.576 + nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mAsyncStream); 1.577 + if (seekable && NS_FAILED(seekable->Tell(&offsetBefore))) { 1.578 + NS_NOTREACHED("Tell failed on readable stream"); 1.579 + offsetBefore = 0; 1.580 + } 1.581 + 1.582 + uint32_t odaAvail = 1.583 + avail > UINT32_MAX ? 1.584 + UINT32_MAX : uint32_t(avail); 1.585 + 1.586 + LOG((" calling OnDataAvailable [offset=%llu count=%llu(%u)]\n", 1.587 + mStreamOffset, avail, odaAvail)); 1.588 + 1.589 + { 1.590 + // Note: Must exit monitor for call to OnStartRequest to avoid 1.591 + // deadlocks when calls to RetargetDeliveryTo for multiple 1.592 + // nsInputStreamPumps are needed (e.g. nsHttpChannel). 1.593 + mMonitor.Exit(); 1.594 + rv = mListener->OnDataAvailable(this, mListenerContext, 1.595 + mAsyncStream, mStreamOffset, 1.596 + odaAvail); 1.597 + mMonitor.Enter(); 1.598 + } 1.599 + 1.600 + // don't enter this code if ODA failed or called Cancel 1.601 + if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) { 1.602 + // test to see if this ODA failed to consume data 1.603 + if (seekable) { 1.604 + // NOTE: if Tell fails, which can happen if the stream is 1.605 + // now closed, then we assume that everything was read. 1.606 + int64_t offsetAfter; 1.607 + if (NS_FAILED(seekable->Tell(&offsetAfter))) 1.608 + offsetAfter = offsetBefore + odaAvail; 1.609 + if (offsetAfter > offsetBefore) 1.610 + mStreamOffset += (offsetAfter - offsetBefore); 1.611 + else if (mSuspendCount == 0) { 1.612 + // 1.613 + // possible infinite loop if we continue pumping data! 1.614 + // 1.615 + // NOTE: although not allowed by nsIStreamListener, we 1.616 + // will allow the ODA impl to Suspend the pump. IMAP 1.617 + // does this :-( 1.618 + // 1.619 + NS_ERROR("OnDataAvailable implementation consumed no data"); 1.620 + mStatus = NS_ERROR_UNEXPECTED; 1.621 + } 1.622 + } 1.623 + else 1.624 + mStreamOffset += odaAvail; // assume ODA behaved well 1.625 + } 1.626 + } 1.627 + } 1.628 + 1.629 + // an error returned from Available or OnDataAvailable should cause us to 1.630 + // abort; however, we must not stomp on mStatus if already canceled. 1.631 + 1.632 + if (NS_SUCCEEDED(mStatus)) { 1.633 + if (NS_FAILED(rv)) 1.634 + mStatus = rv; 1.635 + else if (avail) { 1.636 + // if stream is now closed, advance to STATE_STOP right away. 1.637 + // Available may return 0 bytes available at the moment; that 1.638 + // would not mean that we are done. 1.639 + // XXX async streams should have a GetStatus method! 1.640 + rv = mAsyncStream->Available(&avail); 1.641 + if (NS_SUCCEEDED(rv)) 1.642 + return STATE_TRANSFER; 1.643 + if (rv != NS_BASE_STREAM_CLOSED) 1.644 + mStatus = rv; 1.645 + } 1.646 + } 1.647 + return STATE_STOP; 1.648 +} 1.649 + 1.650 +nsresult 1.651 +nsInputStreamPump::CallOnStateStop() 1.652 +{ 1.653 + ReentrantMonitorAutoEnter mon(mMonitor); 1.654 + 1.655 + MOZ_ASSERT(NS_IsMainThread(), 1.656 + "CallOnStateStop should only be called on the main thread."); 1.657 + 1.658 + mState = OnStateStop(); 1.659 + return NS_OK; 1.660 +} 1.661 + 1.662 +uint32_t 1.663 +nsInputStreamPump::OnStateStop() 1.664 +{ 1.665 + mMonitor.AssertCurrentThreadIn(); 1.666 + 1.667 + if (!NS_IsMainThread()) { 1.668 + // Hopefully temporary hack: OnStateStop should only run on the main 1.669 + // thread, but we're seeing some rare off-main-thread calls. For now 1.670 + // just redispatch to the main thread in release builds, and crash in 1.671 + // debug builds. 1.672 + MOZ_ASSERT(NS_IsMainThread(), 1.673 + "OnStateStop should only be called on the main thread."); 1.674 + nsresult rv = NS_DispatchToMainThread( 1.675 + NS_NewRunnableMethod(this, &nsInputStreamPump::CallOnStateStop)); 1.676 + NS_ENSURE_SUCCESS(rv, STATE_IDLE); 1.677 + return STATE_IDLE; 1.678 + } 1.679 + 1.680 + PROFILER_LABEL("Input", "nsInputStreamPump::OnStateTransfer"); 1.681 + LOG((" OnStateStop [this=%p status=%x]\n", this, mStatus)); 1.682 + 1.683 + // if an error occurred, we must be sure to pass the error onto the async 1.684 + // stream. in some cases, this is redundant, but since close is idempotent, 1.685 + // this is OK. otherwise, be sure to honor the "close-when-done" option. 1.686 + 1.687 + if (!mAsyncStream || !mListener) { 1.688 + MOZ_ASSERT(mAsyncStream, "null mAsyncStream: OnStateStop called twice?"); 1.689 + MOZ_ASSERT(mListener, "null mListener: OnStateStop called twice?"); 1.690 + return STATE_IDLE; 1.691 + } 1.692 + 1.693 + if (NS_FAILED(mStatus)) 1.694 + mAsyncStream->CloseWithStatus(mStatus); 1.695 + else if (mCloseWhenDone) 1.696 + mAsyncStream->Close(); 1.697 + 1.698 + mAsyncStream = 0; 1.699 + mTargetThread = 0; 1.700 + mIsPending = false; 1.701 + { 1.702 + // Note: Must exit monitor for call to OnStartRequest to avoid 1.703 + // deadlocks when calls to RetargetDeliveryTo for multiple 1.704 + // nsInputStreamPumps are needed (e.g. nsHttpChannel). 1.705 + mMonitor.Exit(); 1.706 + mListener->OnStopRequest(this, mListenerContext, mStatus); 1.707 + mMonitor.Enter(); 1.708 + } 1.709 + mListener = 0; 1.710 + mListenerContext = 0; 1.711 + 1.712 + if (mLoadGroup) 1.713 + mLoadGroup->RemoveRequest(this, nullptr, mStatus); 1.714 + 1.715 + return STATE_IDLE; 1.716 +} 1.717 + 1.718 +//----------------------------------------------------------------------------- 1.719 +// nsIThreadRetargetableRequest 1.720 +//----------------------------------------------------------------------------- 1.721 + 1.722 +NS_IMETHODIMP 1.723 +nsInputStreamPump::RetargetDeliveryTo(nsIEventTarget* aNewTarget) 1.724 +{ 1.725 + ReentrantMonitorAutoEnter mon(mMonitor); 1.726 + 1.727 + NS_ENSURE_ARG(aNewTarget); 1.728 + NS_ENSURE_TRUE(mState == STATE_START || mState == STATE_TRANSFER, 1.729 + NS_ERROR_UNEXPECTED); 1.730 + 1.731 + // If canceled, do not retarget. Return with canceled status. 1.732 + if (NS_FAILED(mStatus)) { 1.733 + return mStatus; 1.734 + } 1.735 + 1.736 + if (aNewTarget == mTargetThread) { 1.737 + NS_WARNING("Retargeting delivery to same thread"); 1.738 + return NS_OK; 1.739 + } 1.740 + 1.741 + // Ensure that |mListener| and any subsequent listeners can be retargeted 1.742 + // to another thread. 1.743 + nsresult rv = NS_OK; 1.744 + nsCOMPtr<nsIThreadRetargetableStreamListener> retargetableListener = 1.745 + do_QueryInterface(mListener, &rv); 1.746 + if (NS_SUCCEEDED(rv) && retargetableListener) { 1.747 + rv = retargetableListener->CheckListenerChain(); 1.748 + if (NS_SUCCEEDED(rv)) { 1.749 + mTargetThread = aNewTarget; 1.750 + mRetargeting = true; 1.751 + } 1.752 + } 1.753 + LOG(("nsInputStreamPump::RetargetDeliveryTo [this=%x aNewTarget=%p] " 1.754 + "%s listener [%p] rv[%x]", 1.755 + this, aNewTarget, (mTargetThread == aNewTarget ? "success" : "failure"), 1.756 + (nsIStreamListener*)mListener, rv)); 1.757 + return rv; 1.758 +}