michael@0: /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ michael@0: /* vim:set ts=4 sts=4 sw=4 et cin: */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: #include "nsIOService.h" michael@0: #include "nsInputStreamPump.h" michael@0: #include "nsIStreamTransportService.h" michael@0: #include "nsISeekableStream.h" michael@0: #include "nsITransport.h" michael@0: #include "nsIThreadRetargetableStreamListener.h" michael@0: #include "nsThreadUtils.h" michael@0: #include "nsCOMPtr.h" michael@0: #include "prlog.h" michael@0: #include "GeckoProfiler.h" michael@0: #include "nsIStreamListener.h" michael@0: #include "nsILoadGroup.h" michael@0: #include "nsNetCID.h" michael@0: #include michael@0: michael@0: static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID); michael@0: michael@0: #if defined(PR_LOGGING) michael@0: // michael@0: // NSPR_LOG_MODULES=nsStreamPump:5 michael@0: // michael@0: static PRLogModuleInfo *gStreamPumpLog = nullptr; michael@0: #endif michael@0: #undef LOG michael@0: #define LOG(args) PR_LOG(gStreamPumpLog, PR_LOG_DEBUG, args) michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsInputStreamPump methods michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: nsInputStreamPump::nsInputStreamPump() michael@0: : mState(STATE_IDLE) michael@0: , mStreamOffset(0) michael@0: , mStreamLength(UINT64_MAX) michael@0: , mStatus(NS_OK) michael@0: , mSuspendCount(0) michael@0: , mLoadFlags(LOAD_NORMAL) michael@0: , mProcessingCallbacks(false) michael@0: , mWaitingForInputStreamReady(false) michael@0: , mCloseWhenDone(false) michael@0: , mRetargeting(false) michael@0: , mMonitor("nsInputStreamPump") michael@0: { michael@0: #if defined(PR_LOGGING) michael@0: if (!gStreamPumpLog) michael@0: gStreamPumpLog = PR_NewLogModule("nsStreamPump"); michael@0: #endif michael@0: } michael@0: michael@0: nsInputStreamPump::~nsInputStreamPump() michael@0: { michael@0: } michael@0: michael@0: nsresult michael@0: nsInputStreamPump::Create(nsInputStreamPump **result, michael@0: nsIInputStream *stream, michael@0: int64_t streamPos, michael@0: int64_t streamLen, michael@0: uint32_t segsize, michael@0: uint32_t segcount, michael@0: bool closeWhenDone) michael@0: { michael@0: nsresult rv = NS_ERROR_OUT_OF_MEMORY; michael@0: nsRefPtr pump = new nsInputStreamPump(); michael@0: if (pump) { michael@0: rv = pump->Init(stream, streamPos, streamLen, michael@0: segsize, segcount, closeWhenDone); michael@0: if (NS_SUCCEEDED(rv)) { michael@0: *result = nullptr; michael@0: pump.swap(*result); michael@0: } michael@0: } michael@0: return rv; michael@0: } michael@0: michael@0: struct PeekData { michael@0: PeekData(nsInputStreamPump::PeekSegmentFun fun, void* closure) michael@0: : mFunc(fun), mClosure(closure) {} michael@0: michael@0: nsInputStreamPump::PeekSegmentFun mFunc; michael@0: void* mClosure; michael@0: }; michael@0: michael@0: static NS_METHOD michael@0: CallPeekFunc(nsIInputStream *aInStream, void *aClosure, michael@0: const char *aFromSegment, uint32_t aToOffset, uint32_t aCount, michael@0: uint32_t *aWriteCount) michael@0: { michael@0: NS_ASSERTION(aToOffset == 0, "Called more than once?"); michael@0: NS_ASSERTION(aCount > 0, "Called without data?"); michael@0: michael@0: PeekData* data = static_cast(aClosure); michael@0: data->mFunc(data->mClosure, michael@0: reinterpret_cast(aFromSegment), aCount); michael@0: return NS_BINDING_ABORTED; michael@0: } michael@0: michael@0: nsresult michael@0: nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure) michael@0: { michael@0: ReentrantMonitorAutoEnter mon(mMonitor); michael@0: michael@0: NS_ASSERTION(mAsyncStream, "PeekStream called without stream"); michael@0: michael@0: // See if the pipe is closed by checking the return of Available. michael@0: uint64_t dummy64; michael@0: nsresult rv = mAsyncStream->Available(&dummy64); michael@0: if (NS_FAILED(rv)) michael@0: return rv; michael@0: uint32_t dummy = (uint32_t)std::min(dummy64, (uint64_t)UINT32_MAX); michael@0: michael@0: PeekData data(callback, closure); michael@0: return mAsyncStream->ReadSegments(CallPeekFunc, michael@0: &data, michael@0: nsIOService::gDefaultSegmentSize, michael@0: &dummy); michael@0: } michael@0: michael@0: nsresult michael@0: nsInputStreamPump::EnsureWaiting() michael@0: { michael@0: mMonitor.AssertCurrentThreadIn(); michael@0: michael@0: // no need to worry about multiple threads... an input stream pump lives michael@0: // on only one thread at a time. michael@0: MOZ_ASSERT(mAsyncStream); michael@0: if (!mWaitingForInputStreamReady && !mProcessingCallbacks) { michael@0: // Ensure OnStateStop is called on the main thread. michael@0: if (mState == STATE_STOP) { michael@0: nsCOMPtr mainThread = do_GetMainThread(); michael@0: if (mTargetThread != mainThread) { michael@0: mTargetThread = do_QueryInterface(mainThread); michael@0: } michael@0: } michael@0: MOZ_ASSERT(mTargetThread); michael@0: nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread); michael@0: if (NS_FAILED(rv)) { michael@0: NS_ERROR("AsyncWait failed"); michael@0: return rv; michael@0: } michael@0: // Any retargeting during STATE_START or START_TRANSFER is complete michael@0: // after the call to AsyncWait; next callback wil be on mTargetThread. michael@0: mRetargeting = false; michael@0: mWaitingForInputStreamReady = true; michael@0: } michael@0: return NS_OK; michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsInputStreamPump::nsISupports michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: // although this class can only be accessed from one thread at a time, we do michael@0: // allow its ownership to move from thread to thread, assuming the consumer michael@0: // understands the limitations of this. michael@0: NS_IMPL_ISUPPORTS(nsInputStreamPump, michael@0: nsIRequest, michael@0: nsIThreadRetargetableRequest, michael@0: nsIInputStreamCallback, michael@0: nsIInputStreamPump) michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsInputStreamPump::nsIRequest michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamPump::GetName(nsACString &result) michael@0: { michael@0: ReentrantMonitorAutoEnter mon(mMonitor); michael@0: michael@0: result.Truncate(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamPump::IsPending(bool *result) michael@0: { michael@0: ReentrantMonitorAutoEnter mon(mMonitor); michael@0: michael@0: *result = (mState != STATE_IDLE); michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamPump::GetStatus(nsresult *status) michael@0: { michael@0: ReentrantMonitorAutoEnter mon(mMonitor); michael@0: michael@0: *status = mStatus; michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamPump::Cancel(nsresult status) michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: michael@0: ReentrantMonitorAutoEnter mon(mMonitor); michael@0: michael@0: LOG(("nsInputStreamPump::Cancel [this=%p status=%x]\n", michael@0: this, status)); michael@0: michael@0: if (NS_FAILED(mStatus)) { michael@0: LOG((" already canceled\n")); michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code"); michael@0: mStatus = status; michael@0: michael@0: // close input stream michael@0: if (mAsyncStream) { michael@0: mAsyncStream->CloseWithStatus(status); michael@0: if (mSuspendCount == 0) michael@0: EnsureWaiting(); michael@0: // Otherwise, EnsureWaiting will be called by Resume(). michael@0: // Note that while suspended, OnInputStreamReady will michael@0: // not do anything, and also note that calling asyncWait michael@0: // on a closed stream works and will dispatch an event immediately. michael@0: } michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamPump::Suspend() michael@0: { michael@0: ReentrantMonitorAutoEnter mon(mMonitor); michael@0: michael@0: LOG(("nsInputStreamPump::Suspend [this=%p]\n", this)); michael@0: NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED); michael@0: ++mSuspendCount; michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamPump::Resume() michael@0: { michael@0: ReentrantMonitorAutoEnter mon(mMonitor); michael@0: michael@0: LOG(("nsInputStreamPump::Resume [this=%p]\n", this)); michael@0: NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED); michael@0: NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED); michael@0: michael@0: if (--mSuspendCount == 0) michael@0: EnsureWaiting(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamPump::GetLoadFlags(nsLoadFlags *aLoadFlags) michael@0: { michael@0: ReentrantMonitorAutoEnter mon(mMonitor); michael@0: michael@0: *aLoadFlags = mLoadFlags; michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags) michael@0: { michael@0: ReentrantMonitorAutoEnter mon(mMonitor); michael@0: michael@0: mLoadFlags = aLoadFlags; michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamPump::GetLoadGroup(nsILoadGroup **aLoadGroup) michael@0: { michael@0: ReentrantMonitorAutoEnter mon(mMonitor); michael@0: michael@0: NS_IF_ADDREF(*aLoadGroup = mLoadGroup); michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamPump::SetLoadGroup(nsILoadGroup *aLoadGroup) michael@0: { michael@0: ReentrantMonitorAutoEnter mon(mMonitor); michael@0: michael@0: mLoadGroup = aLoadGroup; michael@0: return NS_OK; michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsInputStreamPump::nsIInputStreamPump implementation michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamPump::Init(nsIInputStream *stream, michael@0: int64_t streamPos, int64_t streamLen, michael@0: uint32_t segsize, uint32_t segcount, michael@0: bool closeWhenDone) michael@0: { michael@0: NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS); michael@0: michael@0: mStreamOffset = uint64_t(streamPos); michael@0: if (int64_t(streamLen) >= int64_t(0)) michael@0: mStreamLength = uint64_t(streamLen); michael@0: mStream = stream; michael@0: mSegSize = segsize; michael@0: mSegCount = segcount; michael@0: mCloseWhenDone = closeWhenDone; michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamPump::AsyncRead(nsIStreamListener *listener, nsISupports *ctxt) michael@0: { michael@0: ReentrantMonitorAutoEnter mon(mMonitor); michael@0: michael@0: NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS); michael@0: NS_ENSURE_ARG_POINTER(listener); michael@0: MOZ_ASSERT(NS_IsMainThread(), "nsInputStreamPump should be read from the " michael@0: "main thread only."); michael@0: michael@0: // michael@0: // OK, we need to use the stream transport service if michael@0: // michael@0: // (1) the stream is blocking michael@0: // (2) the stream does not support nsIAsyncInputStream michael@0: // michael@0: michael@0: bool nonBlocking; michael@0: nsresult rv = mStream->IsNonBlocking(&nonBlocking); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: if (nonBlocking) { michael@0: mAsyncStream = do_QueryInterface(mStream); michael@0: // michael@0: // if the stream supports nsIAsyncInputStream, and if we need to seek michael@0: // to a starting offset, then we must do so here. in the non-async michael@0: // stream case, the stream transport service will take care of seeking michael@0: // for us. michael@0: // michael@0: if (mAsyncStream && (mStreamOffset != UINT64_MAX)) { michael@0: nsCOMPtr seekable = do_QueryInterface(mStream); michael@0: if (seekable) michael@0: seekable->Seek(nsISeekableStream::NS_SEEK_SET, mStreamOffset); michael@0: } michael@0: } michael@0: michael@0: if (!mAsyncStream) { michael@0: // ok, let's use the stream transport service to read this stream. michael@0: nsCOMPtr sts = michael@0: do_GetService(kStreamTransportServiceCID, &rv); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: nsCOMPtr transport; michael@0: rv = sts->CreateInputTransport(mStream, mStreamOffset, mStreamLength, michael@0: mCloseWhenDone, getter_AddRefs(transport)); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: nsCOMPtr wrapper; michael@0: rv = transport->OpenInputStream(0, mSegSize, mSegCount, getter_AddRefs(wrapper)); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: mAsyncStream = do_QueryInterface(wrapper, &rv); michael@0: if (NS_FAILED(rv)) return rv; michael@0: } michael@0: michael@0: // release our reference to the original stream. from this point forward, michael@0: // we only reference the "stream" via mAsyncStream. michael@0: mStream = 0; michael@0: michael@0: // mStreamOffset now holds the number of bytes currently read. we use this michael@0: // to enforce the mStreamLength restriction. michael@0: mStreamOffset = 0; michael@0: michael@0: // grab event queue (we must do this here by contract, since all notifications michael@0: // must go to the thread which called AsyncRead) michael@0: mTargetThread = do_GetCurrentThread(); michael@0: NS_ENSURE_STATE(mTargetThread); michael@0: michael@0: rv = EnsureWaiting(); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: if (mLoadGroup) michael@0: mLoadGroup->AddRequest(this, nullptr); michael@0: michael@0: mState = STATE_START; michael@0: mListener = listener; michael@0: mListenerContext = ctxt; michael@0: return NS_OK; michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsInputStreamPump::nsIInputStreamCallback implementation michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream *stream) michael@0: { michael@0: LOG(("nsInputStreamPump::OnInputStreamReady [this=%p]\n", this)); michael@0: michael@0: PROFILER_LABEL("Input", "nsInputStreamPump::OnInputStreamReady"); michael@0: // this function has been called from a PLEvent, so we can safely call michael@0: // any listener or progress sink methods directly from here. michael@0: michael@0: for (;;) { michael@0: // There should only be one iteration of this loop happening at a time. michael@0: // To prevent AsyncWait() (called during callbacks or on other threads) michael@0: // from creating a parallel OnInputStreamReady(), we use: michael@0: // -- a monitor; and michael@0: // -- a boolean mProcessingCallbacks to detect parallel loops michael@0: // when exiting the monitor for callbacks. michael@0: ReentrantMonitorAutoEnter lock(mMonitor); michael@0: michael@0: // Prevent parallel execution during callbacks, while out of monitor. michael@0: if (mProcessingCallbacks) { michael@0: MOZ_ASSERT(!mProcessingCallbacks); michael@0: break; michael@0: } michael@0: mProcessingCallbacks = true; michael@0: if (mSuspendCount || mState == STATE_IDLE) { michael@0: mWaitingForInputStreamReady = false; michael@0: mProcessingCallbacks = false; michael@0: break; michael@0: } michael@0: michael@0: uint32_t nextState; michael@0: switch (mState) { michael@0: case STATE_START: michael@0: nextState = OnStateStart(); michael@0: break; michael@0: case STATE_TRANSFER: michael@0: nextState = OnStateTransfer(); michael@0: break; michael@0: case STATE_STOP: michael@0: mRetargeting = false; michael@0: nextState = OnStateStop(); michael@0: break; michael@0: default: michael@0: nextState = 0; michael@0: NS_NOTREACHED("Unknown enum value."); michael@0: return NS_ERROR_UNEXPECTED; michael@0: } michael@0: michael@0: bool stillTransferring = (mState == STATE_TRANSFER && michael@0: nextState == STATE_TRANSFER); michael@0: if (stillTransferring) { michael@0: NS_ASSERTION(NS_SUCCEEDED(mStatus), michael@0: "Should not have failed status for ongoing transfer"); michael@0: } else { michael@0: NS_ASSERTION(mState != nextState, michael@0: "Only OnStateTransfer can be called more than once."); michael@0: } michael@0: if (mRetargeting) { michael@0: NS_ASSERTION(mState != STATE_STOP, michael@0: "Retargeting should not happen during OnStateStop."); michael@0: } michael@0: michael@0: // Set mRetargeting so EnsureWaiting will be called. It ensures that michael@0: // OnStateStop is called on the main thread. michael@0: if (nextState == STATE_STOP && !NS_IsMainThread()) { michael@0: mRetargeting = true; michael@0: } michael@0: michael@0: // Unset mProcessingCallbacks here (while we have lock) so our own call to michael@0: // EnsureWaiting isn't blocked by it. michael@0: mProcessingCallbacks = false; michael@0: michael@0: // Wait asynchronously if there is still data to transfer, or we're michael@0: // switching event delivery to another thread. michael@0: if (!mSuspendCount && (stillTransferring || mRetargeting)) { michael@0: mState = nextState; michael@0: mWaitingForInputStreamReady = false; michael@0: nsresult rv = EnsureWaiting(); michael@0: if (NS_SUCCEEDED(rv)) michael@0: break; michael@0: michael@0: // Failure to start asynchronous wait: stop transfer. michael@0: // Do not set mStatus if it was previously set to report a failure. michael@0: if (NS_SUCCEEDED(mStatus)) { michael@0: mStatus = rv; michael@0: } michael@0: nextState = STATE_STOP; michael@0: } michael@0: michael@0: mState = nextState; michael@0: } michael@0: return NS_OK; michael@0: } michael@0: michael@0: uint32_t michael@0: nsInputStreamPump::OnStateStart() michael@0: { michael@0: mMonitor.AssertCurrentThreadIn(); michael@0: michael@0: PROFILER_LABEL("nsInputStreamPump", "OnStateStart"); michael@0: LOG((" OnStateStart [this=%p]\n", this)); michael@0: michael@0: nsresult rv; michael@0: michael@0: // need to check the reason why the stream is ready. this is required michael@0: // so our listener can check our status from OnStartRequest. michael@0: // XXX async streams should have a GetStatus method! michael@0: if (NS_SUCCEEDED(mStatus)) { michael@0: uint64_t avail; michael@0: rv = mAsyncStream->Available(&avail); michael@0: if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED) michael@0: mStatus = rv; michael@0: } michael@0: michael@0: { michael@0: // Note: Must exit monitor for call to OnStartRequest to avoid michael@0: // deadlocks when calls to RetargetDeliveryTo for multiple michael@0: // nsInputStreamPumps are needed (e.g. nsHttpChannel). michael@0: mMonitor.Exit(); michael@0: rv = mListener->OnStartRequest(this, mListenerContext); michael@0: mMonitor.Enter(); michael@0: } michael@0: michael@0: // an error returned from OnStartRequest should cause us to abort; however, michael@0: // we must not stomp on mStatus if already canceled. michael@0: if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus)) michael@0: mStatus = rv; michael@0: michael@0: return NS_SUCCEEDED(mStatus) ? STATE_TRANSFER : STATE_STOP; michael@0: } michael@0: michael@0: uint32_t michael@0: nsInputStreamPump::OnStateTransfer() michael@0: { michael@0: mMonitor.AssertCurrentThreadIn(); michael@0: michael@0: PROFILER_LABEL("Input", "nsInputStreamPump::OnStateTransfer"); michael@0: LOG((" OnStateTransfer [this=%p]\n", this)); michael@0: michael@0: // if canceled, go directly to STATE_STOP... michael@0: if (NS_FAILED(mStatus)) michael@0: return STATE_STOP; michael@0: michael@0: nsresult rv; michael@0: michael@0: uint64_t avail; michael@0: rv = mAsyncStream->Available(&avail); michael@0: LOG((" Available returned [stream=%x rv=%x avail=%llu]\n", mAsyncStream.get(), rv, avail)); michael@0: michael@0: if (rv == NS_BASE_STREAM_CLOSED) { michael@0: rv = NS_OK; michael@0: avail = 0; michael@0: } michael@0: else if (NS_SUCCEEDED(rv) && avail) { michael@0: // figure out how much data to report (XXX detect overflow??) michael@0: if (avail > mStreamLength - mStreamOffset) michael@0: avail = mStreamLength - mStreamOffset; michael@0: michael@0: if (avail) { michael@0: // we used to limit avail to 16K - we were afraid some ODA handlers michael@0: // might assume they wouldn't get more than 16K at once michael@0: // we're removing that limit since it speeds up local file access. michael@0: // Now there's an implicit 64K limit of 4 16K segments michael@0: // NOTE: ok, so the story is as follows. OnDataAvailable impls michael@0: // are by contract supposed to consume exactly |avail| bytes. michael@0: // however, many do not... mailnews... stream converters... michael@0: // cough, cough. the input stream pump is fairly tolerant michael@0: // in this regard; however, if an ODA does not consume any michael@0: // data from the stream, then we could potentially end up in michael@0: // an infinite loop. we do our best here to try to catch michael@0: // such an error. (see bug 189672) michael@0: michael@0: // in most cases this QI will succeed (mAsyncStream is almost always michael@0: // a nsPipeInputStream, which implements nsISeekableStream::Tell). michael@0: int64_t offsetBefore; michael@0: nsCOMPtr seekable = do_QueryInterface(mAsyncStream); michael@0: if (seekable && NS_FAILED(seekable->Tell(&offsetBefore))) { michael@0: NS_NOTREACHED("Tell failed on readable stream"); michael@0: offsetBefore = 0; michael@0: } michael@0: michael@0: uint32_t odaAvail = michael@0: avail > UINT32_MAX ? michael@0: UINT32_MAX : uint32_t(avail); michael@0: michael@0: LOG((" calling OnDataAvailable [offset=%llu count=%llu(%u)]\n", michael@0: mStreamOffset, avail, odaAvail)); michael@0: michael@0: { michael@0: // Note: Must exit monitor for call to OnStartRequest to avoid michael@0: // deadlocks when calls to RetargetDeliveryTo for multiple michael@0: // nsInputStreamPumps are needed (e.g. nsHttpChannel). michael@0: mMonitor.Exit(); michael@0: rv = mListener->OnDataAvailable(this, mListenerContext, michael@0: mAsyncStream, mStreamOffset, michael@0: odaAvail); michael@0: mMonitor.Enter(); michael@0: } michael@0: michael@0: // don't enter this code if ODA failed or called Cancel michael@0: if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) { michael@0: // test to see if this ODA failed to consume data michael@0: if (seekable) { michael@0: // NOTE: if Tell fails, which can happen if the stream is michael@0: // now closed, then we assume that everything was read. michael@0: int64_t offsetAfter; michael@0: if (NS_FAILED(seekable->Tell(&offsetAfter))) michael@0: offsetAfter = offsetBefore + odaAvail; michael@0: if (offsetAfter > offsetBefore) michael@0: mStreamOffset += (offsetAfter - offsetBefore); michael@0: else if (mSuspendCount == 0) { michael@0: // michael@0: // possible infinite loop if we continue pumping data! michael@0: // michael@0: // NOTE: although not allowed by nsIStreamListener, we michael@0: // will allow the ODA impl to Suspend the pump. IMAP michael@0: // does this :-( michael@0: // michael@0: NS_ERROR("OnDataAvailable implementation consumed no data"); michael@0: mStatus = NS_ERROR_UNEXPECTED; michael@0: } michael@0: } michael@0: else michael@0: mStreamOffset += odaAvail; // assume ODA behaved well michael@0: } michael@0: } michael@0: } michael@0: michael@0: // an error returned from Available or OnDataAvailable should cause us to michael@0: // abort; however, we must not stomp on mStatus if already canceled. michael@0: michael@0: if (NS_SUCCEEDED(mStatus)) { michael@0: if (NS_FAILED(rv)) michael@0: mStatus = rv; michael@0: else if (avail) { michael@0: // if stream is now closed, advance to STATE_STOP right away. michael@0: // Available may return 0 bytes available at the moment; that michael@0: // would not mean that we are done. michael@0: // XXX async streams should have a GetStatus method! michael@0: rv = mAsyncStream->Available(&avail); michael@0: if (NS_SUCCEEDED(rv)) michael@0: return STATE_TRANSFER; michael@0: if (rv != NS_BASE_STREAM_CLOSED) michael@0: mStatus = rv; michael@0: } michael@0: } michael@0: return STATE_STOP; michael@0: } michael@0: michael@0: nsresult michael@0: nsInputStreamPump::CallOnStateStop() michael@0: { michael@0: ReentrantMonitorAutoEnter mon(mMonitor); michael@0: michael@0: MOZ_ASSERT(NS_IsMainThread(), michael@0: "CallOnStateStop should only be called on the main thread."); michael@0: michael@0: mState = OnStateStop(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: uint32_t michael@0: nsInputStreamPump::OnStateStop() michael@0: { michael@0: mMonitor.AssertCurrentThreadIn(); michael@0: michael@0: if (!NS_IsMainThread()) { michael@0: // Hopefully temporary hack: OnStateStop should only run on the main michael@0: // thread, but we're seeing some rare off-main-thread calls. For now michael@0: // just redispatch to the main thread in release builds, and crash in michael@0: // debug builds. michael@0: MOZ_ASSERT(NS_IsMainThread(), michael@0: "OnStateStop should only be called on the main thread."); michael@0: nsresult rv = NS_DispatchToMainThread( michael@0: NS_NewRunnableMethod(this, &nsInputStreamPump::CallOnStateStop)); michael@0: NS_ENSURE_SUCCESS(rv, STATE_IDLE); michael@0: return STATE_IDLE; michael@0: } michael@0: michael@0: PROFILER_LABEL("Input", "nsInputStreamPump::OnStateTransfer"); michael@0: LOG((" OnStateStop [this=%p status=%x]\n", this, mStatus)); michael@0: michael@0: // if an error occurred, we must be sure to pass the error onto the async michael@0: // stream. in some cases, this is redundant, but since close is idempotent, michael@0: // this is OK. otherwise, be sure to honor the "close-when-done" option. michael@0: michael@0: if (!mAsyncStream || !mListener) { michael@0: MOZ_ASSERT(mAsyncStream, "null mAsyncStream: OnStateStop called twice?"); michael@0: MOZ_ASSERT(mListener, "null mListener: OnStateStop called twice?"); michael@0: return STATE_IDLE; michael@0: } michael@0: michael@0: if (NS_FAILED(mStatus)) michael@0: mAsyncStream->CloseWithStatus(mStatus); michael@0: else if (mCloseWhenDone) michael@0: mAsyncStream->Close(); michael@0: michael@0: mAsyncStream = 0; michael@0: mTargetThread = 0; michael@0: mIsPending = false; michael@0: { michael@0: // Note: Must exit monitor for call to OnStartRequest to avoid michael@0: // deadlocks when calls to RetargetDeliveryTo for multiple michael@0: // nsInputStreamPumps are needed (e.g. nsHttpChannel). michael@0: mMonitor.Exit(); michael@0: mListener->OnStopRequest(this, mListenerContext, mStatus); michael@0: mMonitor.Enter(); michael@0: } michael@0: mListener = 0; michael@0: mListenerContext = 0; michael@0: michael@0: if (mLoadGroup) michael@0: mLoadGroup->RemoveRequest(this, nullptr, mStatus); michael@0: michael@0: return STATE_IDLE; michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsIThreadRetargetableRequest michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamPump::RetargetDeliveryTo(nsIEventTarget* aNewTarget) michael@0: { michael@0: ReentrantMonitorAutoEnter mon(mMonitor); michael@0: michael@0: NS_ENSURE_ARG(aNewTarget); michael@0: NS_ENSURE_TRUE(mState == STATE_START || mState == STATE_TRANSFER, michael@0: NS_ERROR_UNEXPECTED); michael@0: michael@0: // If canceled, do not retarget. Return with canceled status. michael@0: if (NS_FAILED(mStatus)) { michael@0: return mStatus; michael@0: } michael@0: michael@0: if (aNewTarget == mTargetThread) { michael@0: NS_WARNING("Retargeting delivery to same thread"); michael@0: return NS_OK; michael@0: } michael@0: michael@0: // Ensure that |mListener| and any subsequent listeners can be retargeted michael@0: // to another thread. michael@0: nsresult rv = NS_OK; michael@0: nsCOMPtr retargetableListener = michael@0: do_QueryInterface(mListener, &rv); michael@0: if (NS_SUCCEEDED(rv) && retargetableListener) { michael@0: rv = retargetableListener->CheckListenerChain(); michael@0: if (NS_SUCCEEDED(rv)) { michael@0: mTargetThread = aNewTarget; michael@0: mRetargeting = true; michael@0: } michael@0: } michael@0: LOG(("nsInputStreamPump::RetargetDeliveryTo [this=%x aNewTarget=%p] " michael@0: "%s listener [%p] rv[%x]", michael@0: this, aNewTarget, (mTargetThread == aNewTarget ? "success" : "failure"), michael@0: (nsIStreamListener*)mListener, rv)); michael@0: return rv; michael@0: }