diff -r 000000000000 -r 6474c204b198 xpcom/io/nsStreamUtils.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/xpcom/io/nsStreamUtils.cpp Wed Dec 31 06:09:35 2014 +0100 @@ -0,0 +1,815 @@ +/* vim:set ts=4 sw=4 sts=4 et cin: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "mozilla/Mutex.h" +#include "mozilla/Attributes.h" +#include "nsStreamUtils.h" +#include "nsAutoPtr.h" +#include "nsCOMPtr.h" +#include "nsIPipe.h" +#include "nsIEventTarget.h" +#include "nsIRunnable.h" +#include "nsISafeOutputStream.h" +#include "nsString.h" +#include "nsIAsyncInputStream.h" +#include "nsIAsyncOutputStream.h" +#include "nsIBufferedStreams.h" + +using namespace mozilla; + +//----------------------------------------------------------------------------- + +class nsInputStreamReadyEvent MOZ_FINAL : public nsIRunnable + , public nsIInputStreamCallback +{ +public: + NS_DECL_THREADSAFE_ISUPPORTS + + nsInputStreamReadyEvent(nsIInputStreamCallback *callback, + nsIEventTarget *target) + : mCallback(callback) + , mTarget(target) + { + } + +private: + ~nsInputStreamReadyEvent() + { + if (!mCallback) + return; + // + // whoa!! looks like we never posted this event. take care to + // release mCallback on the correct thread. if mTarget lives on the + // calling thread, then we are ok. otherwise, we have to try to + // proxy the Release over the right thread. if that thread is dead, + // then there's nothing we can do... better to leak than crash. + // + bool val; + nsresult rv = mTarget->IsOnCurrentThread(&val); + if (NS_FAILED(rv) || !val) { + nsCOMPtr event = + NS_NewInputStreamReadyEvent(mCallback, mTarget); + mCallback = nullptr; + if (event) { + rv = event->OnInputStreamReady(nullptr); + if (NS_FAILED(rv)) { + NS_NOTREACHED("leaking stream event"); + nsISupports *sup = event; + NS_ADDREF(sup); + } + } + } + } + +public: + NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream *stream) + { + mStream = stream; + + nsresult rv = + mTarget->Dispatch(this, NS_DISPATCH_NORMAL); + if (NS_FAILED(rv)) { + NS_WARNING("Dispatch failed"); + return NS_ERROR_FAILURE; + } + + return NS_OK; + } + + NS_IMETHOD Run() + { + if (mCallback) { + if (mStream) + mCallback->OnInputStreamReady(mStream); + mCallback = nullptr; + } + return NS_OK; + } + +private: + nsCOMPtr mStream; + nsCOMPtr mCallback; + nsCOMPtr mTarget; +}; + +NS_IMPL_ISUPPORTS(nsInputStreamReadyEvent, nsIRunnable, + nsIInputStreamCallback) + +//----------------------------------------------------------------------------- + +class nsOutputStreamReadyEvent MOZ_FINAL : public nsIRunnable + , public nsIOutputStreamCallback +{ +public: + NS_DECL_THREADSAFE_ISUPPORTS + + nsOutputStreamReadyEvent(nsIOutputStreamCallback *callback, + nsIEventTarget *target) + : mCallback(callback) + , mTarget(target) + { + } + +private: + ~nsOutputStreamReadyEvent() + { + if (!mCallback) + return; + // + // whoa!! looks like we never posted this event. take care to + // release mCallback on the correct thread. if mTarget lives on the + // calling thread, then we are ok. otherwise, we have to try to + // proxy the Release over the right thread. if that thread is dead, + // then there's nothing we can do... better to leak than crash. + // + bool val; + nsresult rv = mTarget->IsOnCurrentThread(&val); + if (NS_FAILED(rv) || !val) { + nsCOMPtr event = + NS_NewOutputStreamReadyEvent(mCallback, mTarget); + mCallback = nullptr; + if (event) { + rv = event->OnOutputStreamReady(nullptr); + if (NS_FAILED(rv)) { + NS_NOTREACHED("leaking stream event"); + nsISupports *sup = event; + NS_ADDREF(sup); + } + } + } + } + +public: + NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream *stream) + { + mStream = stream; + + nsresult rv = + mTarget->Dispatch(this, NS_DISPATCH_NORMAL); + if (NS_FAILED(rv)) { + NS_WARNING("PostEvent failed"); + return NS_ERROR_FAILURE; + } + + return NS_OK; + } + + NS_IMETHOD Run() + { + if (mCallback) { + if (mStream) + mCallback->OnOutputStreamReady(mStream); + mCallback = nullptr; + } + return NS_OK; + } + +private: + nsCOMPtr mStream; + nsCOMPtr mCallback; + nsCOMPtr mTarget; +}; + +NS_IMPL_ISUPPORTS(nsOutputStreamReadyEvent, nsIRunnable, + nsIOutputStreamCallback) + +//----------------------------------------------------------------------------- + +already_AddRefed +NS_NewInputStreamReadyEvent(nsIInputStreamCallback *callback, + nsIEventTarget *target) +{ + NS_ASSERTION(callback, "null callback"); + NS_ASSERTION(target, "null target"); + nsRefPtr ev = + new nsInputStreamReadyEvent(callback, target); + return ev.forget(); +} + +already_AddRefed +NS_NewOutputStreamReadyEvent(nsIOutputStreamCallback *callback, + nsIEventTarget *target) +{ + NS_ASSERTION(callback, "null callback"); + NS_ASSERTION(target, "null target"); + nsRefPtr ev = + new nsOutputStreamReadyEvent(callback, target); + return ev.forget(); +} + +//----------------------------------------------------------------------------- +// NS_AsyncCopy implementation + +// abstract stream copier... +class nsAStreamCopier : public nsIInputStreamCallback + , public nsIOutputStreamCallback + , public nsIRunnable +{ +public: + NS_DECL_THREADSAFE_ISUPPORTS + + nsAStreamCopier() + : mLock("nsAStreamCopier.mLock") + , mCallback(nullptr) + , mProgressCallback(nullptr) + , mClosure(nullptr) + , mChunkSize(0) + , mEventInProcess(false) + , mEventIsPending(false) + , mCloseSource(true) + , mCloseSink(true) + , mCanceled(false) + , mCancelStatus(NS_OK) + { + } + + // virtual since subclasses call superclass Release() + virtual ~nsAStreamCopier() + { + } + + // kick off the async copy... + nsresult Start(nsIInputStream *source, + nsIOutputStream *sink, + nsIEventTarget *target, + nsAsyncCopyCallbackFun callback, + void *closure, + uint32_t chunksize, + bool closeSource, + bool closeSink, + nsAsyncCopyProgressFun progressCallback) + { + mSource = source; + mSink = sink; + mTarget = target; + mCallback = callback; + mClosure = closure; + mChunkSize = chunksize; + mCloseSource = closeSource; + mCloseSink = closeSink; + mProgressCallback = progressCallback; + + mAsyncSource = do_QueryInterface(mSource); + mAsyncSink = do_QueryInterface(mSink); + + return PostContinuationEvent(); + } + + // implemented by subclasses, returns number of bytes copied and + // sets source and sink condition before returning. + virtual uint32_t DoCopy(nsresult *sourceCondition, nsresult *sinkCondition) = 0; + + void Process() + { + if (!mSource || !mSink) + return; + + nsresult sourceCondition, sinkCondition; + nsresult cancelStatus; + bool canceled; + { + MutexAutoLock lock(mLock); + canceled = mCanceled; + cancelStatus = mCancelStatus; + } + + // Copy data from the source to the sink until we hit failure or have + // copied all the data. + for (;;) { + // Note: copyFailed will be true if the source or the sink have + // reported an error, or if we failed to write any bytes + // because we have consumed all of our data. + bool copyFailed = false; + if (!canceled) { + uint32_t n = DoCopy(&sourceCondition, &sinkCondition); + if (n > 0 && mProgressCallback) { + mProgressCallback(mClosure, n); + } + copyFailed = NS_FAILED(sourceCondition) || + NS_FAILED(sinkCondition) || n == 0; + + MutexAutoLock lock(mLock); + canceled = mCanceled; + cancelStatus = mCancelStatus; + } + if (copyFailed && !canceled) { + if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) { + // need to wait for more data from source. while waiting for + // more source data, be sure to observe failures on output end. + mAsyncSource->AsyncWait(this, 0, 0, nullptr); + + if (mAsyncSink) + mAsyncSink->AsyncWait(this, + nsIAsyncOutputStream::WAIT_CLOSURE_ONLY, + 0, nullptr); + break; + } + else if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) { + // need to wait for more room in the sink. while waiting for + // more room in the sink, be sure to observer failures on the + // input end. + mAsyncSink->AsyncWait(this, 0, 0, nullptr); + + if (mAsyncSource) + mAsyncSource->AsyncWait(this, + nsIAsyncInputStream::WAIT_CLOSURE_ONLY, + 0, nullptr); + break; + } + } + if (copyFailed || canceled) { + if (mCloseSource) { + // close source + if (mAsyncSource) + mAsyncSource->CloseWithStatus(canceled ? cancelStatus : + sinkCondition); + else + mSource->Close(); + } + mAsyncSource = nullptr; + mSource = nullptr; + + if (mCloseSink) { + // close sink + if (mAsyncSink) + mAsyncSink->CloseWithStatus(canceled ? cancelStatus : + sourceCondition); + else { + // If we have an nsISafeOutputStream, and our + // sourceCondition and sinkCondition are not set to a + // failure state, finish writing. + nsCOMPtr sostream = + do_QueryInterface(mSink); + if (sostream && NS_SUCCEEDED(sourceCondition) && + NS_SUCCEEDED(sinkCondition)) + sostream->Finish(); + else + mSink->Close(); + } + } + mAsyncSink = nullptr; + mSink = nullptr; + + // notify state complete... + if (mCallback) { + nsresult status; + if (!canceled) { + status = sourceCondition; + if (NS_SUCCEEDED(status)) + status = sinkCondition; + if (status == NS_BASE_STREAM_CLOSED) + status = NS_OK; + } else { + status = cancelStatus; + } + mCallback(mClosure, status); + } + break; + } + } + } + + nsresult Cancel(nsresult aReason) + { + MutexAutoLock lock(mLock); + if (mCanceled) + return NS_ERROR_FAILURE; + + if (NS_SUCCEEDED(aReason)) { + NS_WARNING("cancel with non-failure status code"); + aReason = NS_BASE_STREAM_CLOSED; + } + + mCanceled = true; + mCancelStatus = aReason; + return NS_OK; + } + + NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream *source) + { + PostContinuationEvent(); + return NS_OK; + } + + NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream *sink) + { + PostContinuationEvent(); + return NS_OK; + } + + // continuation event handler + NS_IMETHOD Run() + { + Process(); + + // clear "in process" flag and post any pending continuation event + MutexAutoLock lock(mLock); + mEventInProcess = false; + if (mEventIsPending) { + mEventIsPending = false; + PostContinuationEvent_Locked(); + } + + return NS_OK; + } + + nsresult PostContinuationEvent() + { + // we cannot post a continuation event if there is currently + // an event in process. doing so could result in Process being + // run simultaneously on multiple threads, so we mark the event + // as pending, and if an event is already in process then we + // just let that existing event take care of posting the real + // continuation event. + + MutexAutoLock lock(mLock); + return PostContinuationEvent_Locked(); + } + + nsresult PostContinuationEvent_Locked() + { + nsresult rv = NS_OK; + if (mEventInProcess) + mEventIsPending = true; + else { + rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL); + if (NS_SUCCEEDED(rv)) + mEventInProcess = true; + else + NS_WARNING("unable to post continuation event"); + } + return rv; + } + +protected: + nsCOMPtr mSource; + nsCOMPtr mSink; + nsCOMPtr mAsyncSource; + nsCOMPtr mAsyncSink; + nsCOMPtr mTarget; + Mutex mLock; + nsAsyncCopyCallbackFun mCallback; + nsAsyncCopyProgressFun mProgressCallback; + void *mClosure; + uint32_t mChunkSize; + bool mEventInProcess; + bool mEventIsPending; + bool mCloseSource; + bool mCloseSink; + bool mCanceled; + nsresult mCancelStatus; +}; + +NS_IMPL_ISUPPORTS(nsAStreamCopier, + nsIInputStreamCallback, + nsIOutputStreamCallback, + nsIRunnable) + +class nsStreamCopierIB MOZ_FINAL : public nsAStreamCopier +{ +public: + nsStreamCopierIB() : nsAStreamCopier() {} + virtual ~nsStreamCopierIB() {} + + struct ReadSegmentsState { + nsIOutputStream *mSink; + nsresult mSinkCondition; + }; + + static NS_METHOD ConsumeInputBuffer(nsIInputStream *inStr, + void *closure, + const char *buffer, + uint32_t offset, + uint32_t count, + uint32_t *countWritten) + { + ReadSegmentsState *state = (ReadSegmentsState *) closure; + + nsresult rv = state->mSink->Write(buffer, count, countWritten); + if (NS_FAILED(rv)) + state->mSinkCondition = rv; + else if (*countWritten == 0) + state->mSinkCondition = NS_BASE_STREAM_CLOSED; + + return state->mSinkCondition; + } + + uint32_t DoCopy(nsresult *sourceCondition, nsresult *sinkCondition) + { + ReadSegmentsState state; + state.mSink = mSink; + state.mSinkCondition = NS_OK; + + uint32_t n; + *sourceCondition = + mSource->ReadSegments(ConsumeInputBuffer, &state, mChunkSize, &n); + *sinkCondition = state.mSinkCondition; + return n; + } +}; + +class nsStreamCopierOB MOZ_FINAL : public nsAStreamCopier +{ +public: + nsStreamCopierOB() : nsAStreamCopier() {} + virtual ~nsStreamCopierOB() {} + + struct WriteSegmentsState { + nsIInputStream *mSource; + nsresult mSourceCondition; + }; + + static NS_METHOD FillOutputBuffer(nsIOutputStream *outStr, + void *closure, + char *buffer, + uint32_t offset, + uint32_t count, + uint32_t *countRead) + { + WriteSegmentsState *state = (WriteSegmentsState *) closure; + + nsresult rv = state->mSource->Read(buffer, count, countRead); + if (NS_FAILED(rv)) + state->mSourceCondition = rv; + else if (*countRead == 0) + state->mSourceCondition = NS_BASE_STREAM_CLOSED; + + return state->mSourceCondition; + } + + uint32_t DoCopy(nsresult *sourceCondition, nsresult *sinkCondition) + { + WriteSegmentsState state; + state.mSource = mSource; + state.mSourceCondition = NS_OK; + + uint32_t n; + *sinkCondition = + mSink->WriteSegments(FillOutputBuffer, &state, mChunkSize, &n); + *sourceCondition = state.mSourceCondition; + return n; + } +}; + +//----------------------------------------------------------------------------- + +nsresult +NS_AsyncCopy(nsIInputStream *source, + nsIOutputStream *sink, + nsIEventTarget *target, + nsAsyncCopyMode mode, + uint32_t chunkSize, + nsAsyncCopyCallbackFun callback, + void *closure, + bool closeSource, + bool closeSink, + nsISupports **aCopierCtx, + nsAsyncCopyProgressFun progressCallback) +{ + NS_ASSERTION(target, "non-null target required"); + + nsresult rv; + nsAStreamCopier *copier; + + if (mode == NS_ASYNCCOPY_VIA_READSEGMENTS) + copier = new nsStreamCopierIB(); + else + copier = new nsStreamCopierOB(); + + if (!copier) + return NS_ERROR_OUT_OF_MEMORY; + + // Start() takes an owning ref to the copier... + NS_ADDREF(copier); + rv = copier->Start(source, sink, target, callback, closure, chunkSize, + closeSource, closeSink, progressCallback); + + if (aCopierCtx) { + *aCopierCtx = static_cast( + static_cast(copier)); + NS_ADDREF(*aCopierCtx); + } + NS_RELEASE(copier); + + return rv; +} + +//----------------------------------------------------------------------------- + +nsresult +NS_CancelAsyncCopy(nsISupports *aCopierCtx, nsresult aReason) +{ + nsAStreamCopier *copier = static_cast( + static_cast(aCopierCtx)); + return copier->Cancel(aReason); +} + +//----------------------------------------------------------------------------- + +nsresult +NS_ConsumeStream(nsIInputStream *stream, uint32_t maxCount, nsACString &result) +{ + nsresult rv = NS_OK; + result.Truncate(); + + while (maxCount) { + uint64_t avail64; + rv = stream->Available(&avail64); + if (NS_FAILED(rv)) { + if (rv == NS_BASE_STREAM_CLOSED) + rv = NS_OK; + break; + } + if (avail64 == 0) + break; + + uint32_t avail = (uint32_t)XPCOM_MIN(avail64, maxCount); + + // resize result buffer + uint32_t length = result.Length(); + if (avail > UINT32_MAX - length) + return NS_ERROR_FILE_TOO_BIG; + + result.SetLength(length + avail); + if (result.Length() != (length + avail)) + return NS_ERROR_OUT_OF_MEMORY; + char *buf = result.BeginWriting() + length; + + uint32_t n; + rv = stream->Read(buf, avail, &n); + if (NS_FAILED(rv)) + break; + if (n != avail) + result.SetLength(length + n); + if (n == 0) + break; + maxCount -= n; + } + + return rv; +} + +//----------------------------------------------------------------------------- + +static NS_METHOD +TestInputStream(nsIInputStream *inStr, + void *closure, + const char *buffer, + uint32_t offset, + uint32_t count, + uint32_t *countWritten) +{ + bool *result = static_cast(closure); + *result = true; + return NS_ERROR_ABORT; // don't call me anymore +} + +bool +NS_InputStreamIsBuffered(nsIInputStream *stream) +{ + nsCOMPtr bufferedIn = do_QueryInterface(stream); + if (bufferedIn) { + return true; + } + + bool result = false; + uint32_t n; + nsresult rv = stream->ReadSegments(TestInputStream, + &result, 1, &n); + return result || NS_SUCCEEDED(rv); +} + +static NS_METHOD +TestOutputStream(nsIOutputStream *outStr, + void *closure, + char *buffer, + uint32_t offset, + uint32_t count, + uint32_t *countRead) +{ + bool *result = static_cast(closure); + *result = true; + return NS_ERROR_ABORT; // don't call me anymore +} + +bool +NS_OutputStreamIsBuffered(nsIOutputStream *stream) +{ + nsCOMPtr bufferedOut = do_QueryInterface(stream); + if (bufferedOut) { + return true; + } + + bool result = false; + uint32_t n; + stream->WriteSegments(TestOutputStream, &result, 1, &n); + return result; +} + +//----------------------------------------------------------------------------- + +NS_METHOD +NS_CopySegmentToStream(nsIInputStream *inStr, + void *closure, + const char *buffer, + uint32_t offset, + uint32_t count, + uint32_t *countWritten) +{ + nsIOutputStream *outStr = static_cast(closure); + *countWritten = 0; + while (count) { + uint32_t n; + nsresult rv = outStr->Write(buffer, count, &n); + if (NS_FAILED(rv)) + return rv; + buffer += n; + count -= n; + *countWritten += n; + } + return NS_OK; +} + +NS_METHOD +NS_CopySegmentToBuffer(nsIInputStream *inStr, + void *closure, + const char *buffer, + uint32_t offset, + uint32_t count, + uint32_t *countWritten) +{ + char *toBuf = static_cast(closure); + memcpy(&toBuf[offset], buffer, count); + *countWritten = count; + return NS_OK; +} + +NS_METHOD +NS_CopySegmentToBuffer(nsIOutputStream *outStr, + void *closure, + char *buffer, + uint32_t offset, + uint32_t count, + uint32_t *countRead) +{ + const char* fromBuf = static_cast(closure); + memcpy(buffer, &fromBuf[offset], count); + *countRead = count; + return NS_OK; +} + +NS_METHOD +NS_DiscardSegment(nsIInputStream *inStr, + void *closure, + const char *buffer, + uint32_t offset, + uint32_t count, + uint32_t *countWritten) +{ + *countWritten = count; + return NS_OK; +} + +//----------------------------------------------------------------------------- + +NS_METHOD +NS_WriteSegmentThunk(nsIInputStream *inStr, + void *closure, + const char *buffer, + uint32_t offset, + uint32_t count, + uint32_t *countWritten) +{ + nsWriteSegmentThunk *thunk = static_cast(closure); + return thunk->mFun(thunk->mStream, thunk->mClosure, buffer, offset, count, + countWritten); +} + +NS_METHOD +NS_FillArray(FallibleTArray& aDest, nsIInputStream *aInput, + uint32_t aKeep, uint32_t *aNewBytes) +{ + MOZ_ASSERT(aInput, "null stream"); + MOZ_ASSERT(aKeep <= aDest.Length(), "illegal keep count"); + + char* aBuffer = aDest.Elements(); + int64_t keepOffset = int64_t(aDest.Length()) - aKeep; + if (0 != aKeep && keepOffset > 0) { + memmove(aBuffer, aBuffer + keepOffset, aKeep); + } + + nsresult rv = + aInput->Read(aBuffer + aKeep, aDest.Capacity() - aKeep, aNewBytes); + if (NS_FAILED(rv)) { + *aNewBytes = 0; + } + // NOTE: we rely on the fact that the new slots are NOT initialized by + // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct() + // in nsTArray.h: + aDest.SetLengthAndRetainStorage(aKeep + *aNewBytes); + + MOZ_ASSERT(aDest.Length() <= aDest.Capacity(), "buffer overflow"); + return rv; +}