michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: #include "nsAsyncStreamCopier.h" michael@0: #include "nsIOService.h" michael@0: #include "nsIEventTarget.h" michael@0: #include "nsStreamUtils.h" michael@0: #include "nsThreadUtils.h" michael@0: #include "nsNetUtil.h" michael@0: #include "prlog.h" michael@0: michael@0: using namespace mozilla; michael@0: michael@0: #undef LOG michael@0: #if defined(PR_LOGGING) michael@0: // michael@0: // NSPR_LOG_MODULES=nsStreamCopier:5 michael@0: // michael@0: static PRLogModuleInfo *gStreamCopierLog = nullptr; michael@0: #endif michael@0: #define LOG(args) PR_LOG(gStreamCopierLog, PR_LOG_DEBUG, args) michael@0: michael@0: /** michael@0: * An event used to perform initialization off the main thread. michael@0: */ michael@0: class AsyncApplyBufferingPolicyEvent MOZ_FINAL: public nsRunnable michael@0: { michael@0: public: michael@0: /** michael@0: * @param aCopier michael@0: * The nsAsyncStreamCopier requesting the information. michael@0: */ michael@0: AsyncApplyBufferingPolicyEvent(nsAsyncStreamCopier* aCopier) michael@0: : mCopier(aCopier) michael@0: , mTarget(NS_GetCurrentThread()) michael@0: { } michael@0: NS_METHOD Run() michael@0: { michael@0: nsresult rv = mCopier->ApplyBufferingPolicy(); michael@0: if (NS_FAILED(rv)) { michael@0: mCopier->Cancel(rv); michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsCOMPtr event = NS_NewRunnableMethod(mCopier, &nsAsyncStreamCopier::AsyncCopyInternal); michael@0: rv = mTarget->Dispatch(event, NS_DISPATCH_NORMAL); michael@0: MOZ_ASSERT(NS_SUCCEEDED(rv)); michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: mCopier->Cancel(rv); michael@0: } michael@0: return NS_OK; michael@0: } michael@0: private: michael@0: nsRefPtr mCopier; michael@0: nsCOMPtr mTarget; michael@0: }; michael@0: michael@0: michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: nsAsyncStreamCopier::nsAsyncStreamCopier() michael@0: : mLock("nsAsyncStreamCopier.mLock") michael@0: , mMode(NS_ASYNCCOPY_VIA_READSEGMENTS) michael@0: , mChunkSize(nsIOService::gDefaultSegmentSize) michael@0: , mStatus(NS_OK) michael@0: , mIsPending(false) michael@0: , mShouldSniffBuffering(false) michael@0: { michael@0: #if defined(PR_LOGGING) michael@0: if (!gStreamCopierLog) michael@0: gStreamCopierLog = PR_NewLogModule("nsStreamCopier"); michael@0: #endif michael@0: LOG(("Creating nsAsyncStreamCopier @%x\n", this)); michael@0: } michael@0: michael@0: nsAsyncStreamCopier::~nsAsyncStreamCopier() michael@0: { michael@0: LOG(("Destroying nsAsyncStreamCopier @%x\n", this)); michael@0: } michael@0: michael@0: bool michael@0: nsAsyncStreamCopier::IsComplete(nsresult *status) michael@0: { michael@0: MutexAutoLock lock(mLock); michael@0: if (status) michael@0: *status = mStatus; michael@0: return !mIsPending; michael@0: } michael@0: michael@0: nsIRequest* michael@0: nsAsyncStreamCopier::AsRequest() michael@0: { michael@0: return static_cast(static_cast(this)); michael@0: } michael@0: michael@0: void michael@0: nsAsyncStreamCopier::Complete(nsresult status) michael@0: { michael@0: LOG(("nsAsyncStreamCopier::Complete [this=%p status=%x]\n", this, status)); michael@0: michael@0: nsCOMPtr observer; michael@0: nsCOMPtr ctx; michael@0: { michael@0: MutexAutoLock lock(mLock); michael@0: mCopierCtx = nullptr; michael@0: michael@0: if (mIsPending) { michael@0: mIsPending = false; michael@0: mStatus = status; michael@0: michael@0: // setup OnStopRequest callback and release references... michael@0: observer = mObserver; michael@0: mObserver = nullptr; michael@0: } michael@0: } michael@0: michael@0: if (observer) { michael@0: LOG((" calling OnStopRequest [status=%x]\n", status)); michael@0: observer->OnStopRequest(AsRequest(), ctx, status); michael@0: } michael@0: } michael@0: michael@0: void michael@0: nsAsyncStreamCopier::OnAsyncCopyComplete(void *closure, nsresult status) michael@0: { michael@0: nsAsyncStreamCopier *self = (nsAsyncStreamCopier *) closure; michael@0: self->Complete(status); michael@0: NS_RELEASE(self); // addref'd in AsyncCopy michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsISupports michael@0: michael@0: // We cannot use simply NS_IMPL_ISUPPORTSx as both michael@0: // nsIAsyncStreamCopier and nsIAsyncStreamCopier2 implement nsIRequest michael@0: michael@0: NS_IMPL_ADDREF(nsAsyncStreamCopier) michael@0: NS_IMPL_RELEASE(nsAsyncStreamCopier) michael@0: NS_INTERFACE_TABLE_HEAD(nsAsyncStreamCopier) michael@0: NS_INTERFACE_TABLE_BEGIN michael@0: NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier, nsIAsyncStreamCopier) michael@0: NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier, nsIAsyncStreamCopier2) michael@0: NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier, nsIRequest, nsIAsyncStreamCopier) michael@0: NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier, nsISupports, nsIAsyncStreamCopier) michael@0: NS_INTERFACE_TABLE_END michael@0: NS_INTERFACE_TABLE_TAIL michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsIRequest michael@0: michael@0: NS_IMETHODIMP michael@0: nsAsyncStreamCopier::GetName(nsACString &name) michael@0: { michael@0: name.Truncate(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsAsyncStreamCopier::IsPending(bool *result) michael@0: { michael@0: *result = !IsComplete(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsAsyncStreamCopier::GetStatus(nsresult *status) michael@0: { michael@0: IsComplete(status); michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsAsyncStreamCopier::Cancel(nsresult status) michael@0: { michael@0: nsCOMPtr copierCtx; michael@0: { michael@0: MutexAutoLock lock(mLock); michael@0: if (!mIsPending) michael@0: return NS_OK; michael@0: copierCtx.swap(mCopierCtx); michael@0: } michael@0: michael@0: if (NS_SUCCEEDED(status)) { michael@0: NS_WARNING("cancel with non-failure status code"); michael@0: status = NS_BASE_STREAM_CLOSED; michael@0: } michael@0: michael@0: if (copierCtx) michael@0: NS_CancelAsyncCopy(copierCtx, status); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsAsyncStreamCopier::Suspend() michael@0: { michael@0: NS_NOTREACHED("nsAsyncStreamCopier::Suspend"); michael@0: return NS_ERROR_NOT_IMPLEMENTED; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsAsyncStreamCopier::Resume() michael@0: { michael@0: NS_NOTREACHED("nsAsyncStreamCopier::Resume"); michael@0: return NS_ERROR_NOT_IMPLEMENTED; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsAsyncStreamCopier::GetLoadFlags(nsLoadFlags *aLoadFlags) michael@0: { michael@0: *aLoadFlags = LOAD_NORMAL; michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsAsyncStreamCopier::SetLoadFlags(nsLoadFlags aLoadFlags) michael@0: { michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsAsyncStreamCopier::GetLoadGroup(nsILoadGroup **aLoadGroup) michael@0: { michael@0: *aLoadGroup = nullptr; michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsAsyncStreamCopier::SetLoadGroup(nsILoadGroup *aLoadGroup) michael@0: { michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult michael@0: nsAsyncStreamCopier::InitInternal(nsIInputStream *source, michael@0: nsIOutputStream *sink, michael@0: nsIEventTarget *target, michael@0: uint32_t chunkSize, michael@0: bool closeSource, michael@0: bool closeSink) michael@0: { michael@0: NS_ASSERTION(!mSource && !mSink, "Init() called more than once"); michael@0: if (chunkSize == 0) { michael@0: chunkSize = nsIOService::gDefaultSegmentSize; michael@0: } michael@0: mChunkSize = chunkSize; michael@0: michael@0: mSource = source; michael@0: mSink = sink; michael@0: mCloseSource = closeSource; michael@0: mCloseSink = closeSink; michael@0: michael@0: if (target) { michael@0: mTarget = target; michael@0: } else { michael@0: nsresult rv; michael@0: mTarget = do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv); michael@0: if (NS_FAILED(rv)) { michael@0: return rv; michael@0: } michael@0: } michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsIAsyncStreamCopier michael@0: michael@0: NS_IMETHODIMP michael@0: nsAsyncStreamCopier::Init(nsIInputStream *source, michael@0: nsIOutputStream *sink, michael@0: nsIEventTarget *target, michael@0: bool sourceBuffered, michael@0: bool sinkBuffered, michael@0: uint32_t chunkSize, michael@0: bool closeSource, michael@0: bool closeSink) michael@0: { michael@0: NS_ASSERTION(sourceBuffered || sinkBuffered, "at least one stream must be buffered"); michael@0: mMode = sourceBuffered ? NS_ASYNCCOPY_VIA_READSEGMENTS michael@0: : NS_ASYNCCOPY_VIA_WRITESEGMENTS; michael@0: michael@0: return InitInternal(source, sink, target, chunkSize, closeSource, closeSink); michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsIAsyncStreamCopier2 michael@0: michael@0: NS_IMETHODIMP michael@0: nsAsyncStreamCopier::Init(nsIInputStream *source, michael@0: nsIOutputStream *sink, michael@0: nsIEventTarget *target, michael@0: uint32_t chunkSize, michael@0: bool closeSource, michael@0: bool closeSink) michael@0: { michael@0: mShouldSniffBuffering = true; michael@0: michael@0: return InitInternal(source, sink, target, chunkSize, closeSource, closeSink); michael@0: } michael@0: michael@0: /** michael@0: * Detect whether the input or the output stream is buffered, michael@0: * bufferize one of them if neither is buffered. michael@0: */ michael@0: nsresult michael@0: nsAsyncStreamCopier::ApplyBufferingPolicy() michael@0: { michael@0: // This function causes I/O, it must not be executed on the main michael@0: // thread. michael@0: MOZ_ASSERT(!NS_IsMainThread()); michael@0: michael@0: if (NS_OutputStreamIsBuffered(mSink)) { michael@0: // Sink is buffered, no need to perform additional buffering michael@0: mMode = NS_ASYNCCOPY_VIA_WRITESEGMENTS; michael@0: return NS_OK; michael@0: } michael@0: if (NS_InputStreamIsBuffered(mSource)) { michael@0: // Source is buffered, no need to perform additional buffering michael@0: mMode = NS_ASYNCCOPY_VIA_READSEGMENTS; michael@0: return NS_OK; michael@0: } michael@0: michael@0: // No buffering, let's buffer the sink michael@0: nsresult rv; michael@0: nsCOMPtr sink = michael@0: do_CreateInstance(NS_BUFFEREDOUTPUTSTREAM_CONTRACTID, &rv); michael@0: if (NS_FAILED(rv)) { michael@0: return rv; michael@0: } michael@0: michael@0: rv = sink->Init(mSink, mChunkSize); michael@0: if (NS_FAILED(rv)) { michael@0: return rv; michael@0: } michael@0: michael@0: mMode = NS_ASYNCCOPY_VIA_WRITESEGMENTS; michael@0: mSink = sink; michael@0: return NS_OK; michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // Both nsIAsyncStreamCopier and nsIAsyncStreamCopier2 michael@0: michael@0: NS_IMETHODIMP michael@0: nsAsyncStreamCopier::AsyncCopy(nsIRequestObserver *observer, nsISupports *ctx) michael@0: { michael@0: LOG(("nsAsyncStreamCopier::AsyncCopy [this=%p observer=%x]\n", this, observer)); michael@0: michael@0: NS_ASSERTION(mSource && mSink, "not initialized"); michael@0: nsresult rv; michael@0: michael@0: if (observer) { michael@0: // build proxy for observer events michael@0: rv = NS_NewRequestObserverProxy(getter_AddRefs(mObserver), observer, ctx); michael@0: if (NS_FAILED(rv)) return rv; michael@0: } michael@0: michael@0: // from this point forward, AsyncCopy is going to return NS_OK. any errors michael@0: // will be reported via OnStopRequest. michael@0: mIsPending = true; michael@0: michael@0: if (mObserver) { michael@0: rv = mObserver->OnStartRequest(AsRequest(), nullptr); michael@0: if (NS_FAILED(rv)) michael@0: Cancel(rv); michael@0: } michael@0: michael@0: if (!mShouldSniffBuffering) { michael@0: // No buffer sniffing required, let's proceed michael@0: AsyncCopyInternal(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: if (NS_IsMainThread()) { michael@0: // Don't perform buffer sniffing on the main thread michael@0: nsCOMPtr event michael@0: = new AsyncApplyBufferingPolicyEvent(this); michael@0: rv = mTarget->Dispatch(event, NS_DISPATCH_NORMAL); michael@0: if (NS_FAILED(rv)) { michael@0: Cancel(rv); michael@0: } michael@0: return NS_OK; michael@0: } michael@0: michael@0: // We're not going to block the main thread, so let's sniff here michael@0: rv = ApplyBufferingPolicy(); michael@0: if (NS_FAILED(rv)) { michael@0: Cancel(rv); michael@0: } michael@0: AsyncCopyInternal(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: // Launch async copy. michael@0: // All errors are reported through the observer. michael@0: void michael@0: nsAsyncStreamCopier::AsyncCopyInternal() michael@0: { michael@0: MOZ_ASSERT(mMode == NS_ASYNCCOPY_VIA_READSEGMENTS michael@0: || mMode == NS_ASYNCCOPY_VIA_WRITESEGMENTS); michael@0: michael@0: nsresult rv; michael@0: // we want to receive progress notifications; release happens in michael@0: // OnAsyncCopyComplete. michael@0: NS_ADDREF_THIS(); michael@0: { michael@0: MutexAutoLock lock(mLock); michael@0: rv = NS_AsyncCopy(mSource, mSink, mTarget, mMode, mChunkSize, michael@0: OnAsyncCopyComplete, this, mCloseSource, mCloseSink, michael@0: getter_AddRefs(mCopierCtx)); michael@0: } michael@0: if (NS_FAILED(rv)) { michael@0: NS_RELEASE_THIS(); michael@0: Cancel(rv); michael@0: } michael@0: } michael@0: michael@0: