michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: #include "nsStreamTransportService.h" michael@0: #include "nsXPCOMCIDInternal.h" michael@0: #include "nsNetSegmentUtils.h" michael@0: #include "nsTransportUtils.h" michael@0: #include "nsStreamUtils.h" michael@0: #include "nsError.h" michael@0: #include "nsNetCID.h" michael@0: michael@0: #include "nsIAsyncInputStream.h" michael@0: #include "nsIAsyncOutputStream.h" michael@0: #include "nsISeekableStream.h" michael@0: #include "nsIPipe.h" michael@0: #include "nsITransport.h" michael@0: #include "nsIObserverService.h" michael@0: #include "nsIThreadPool.h" michael@0: #include "mozilla/Services.h" michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsInputStreamTransport michael@0: // michael@0: // Implements nsIInputStream as a wrapper around the real input stream. This michael@0: // allows the transport to support seeking, range-limiting, progress reporting, michael@0: // and close-when-done semantics while utilizing NS_AsyncCopy. michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: class nsInputStreamTransport : public nsITransport michael@0: , public nsIInputStream michael@0: { michael@0: public: michael@0: NS_DECL_THREADSAFE_ISUPPORTS michael@0: NS_DECL_NSITRANSPORT michael@0: NS_DECL_NSIINPUTSTREAM michael@0: michael@0: nsInputStreamTransport(nsIInputStream *source, michael@0: uint64_t offset, michael@0: uint64_t limit, michael@0: bool closeWhenDone) michael@0: : mSource(source) michael@0: , mOffset(offset) michael@0: , mLimit(limit) michael@0: , mCloseWhenDone(closeWhenDone) michael@0: , mFirstTime(true) michael@0: , mInProgress(false) michael@0: { michael@0: } michael@0: michael@0: virtual ~nsInputStreamTransport() michael@0: { michael@0: } michael@0: michael@0: private: michael@0: nsCOMPtr mPipeIn; michael@0: michael@0: // while the copy is active, these members may only be accessed from the michael@0: // nsIInputStream implementation. michael@0: nsCOMPtr mEventSink; michael@0: nsCOMPtr mSource; michael@0: uint64_t mOffset; michael@0: uint64_t mLimit; michael@0: bool mCloseWhenDone; michael@0: bool mFirstTime; michael@0: michael@0: // this variable serves as a lock to prevent the state of the transport michael@0: // from being modified once the copy is in progress. michael@0: bool mInProgress; michael@0: }; michael@0: michael@0: NS_IMPL_ISUPPORTS(nsInputStreamTransport, michael@0: nsITransport, michael@0: nsIInputStream) michael@0: michael@0: /** nsITransport **/ michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTransport::OpenInputStream(uint32_t flags, michael@0: uint32_t segsize, michael@0: uint32_t segcount, michael@0: nsIInputStream **result) michael@0: { michael@0: NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); michael@0: michael@0: nsresult rv; michael@0: nsCOMPtr target = michael@0: do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: // XXX if the caller requests an unbuffered stream, then perhaps michael@0: // we'd want to simply return mSource; however, then we would michael@0: // not be reading mSource on a background thread. is this ok? michael@0: michael@0: bool nonblocking = !(flags & OPEN_BLOCKING); michael@0: michael@0: net_ResolveSegmentParams(segsize, segcount); michael@0: michael@0: nsCOMPtr pipeOut; michael@0: rv = NS_NewPipe2(getter_AddRefs(mPipeIn), michael@0: getter_AddRefs(pipeOut), michael@0: nonblocking, true, michael@0: segsize, segcount); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: mInProgress = true; michael@0: michael@0: // startup async copy process... michael@0: rv = NS_AsyncCopy(this, pipeOut, target, michael@0: NS_ASYNCCOPY_VIA_WRITESEGMENTS, segsize); michael@0: if (NS_SUCCEEDED(rv)) michael@0: NS_ADDREF(*result = mPipeIn); michael@0: michael@0: return rv; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTransport::OpenOutputStream(uint32_t flags, michael@0: uint32_t segsize, michael@0: uint32_t segcount, michael@0: nsIOutputStream **result) michael@0: { michael@0: // this transport only supports reading! michael@0: NS_NOTREACHED("nsInputStreamTransport::OpenOutputStream"); michael@0: return NS_ERROR_UNEXPECTED; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTransport::Close(nsresult reason) michael@0: { michael@0: if (NS_SUCCEEDED(reason)) michael@0: reason = NS_BASE_STREAM_CLOSED; michael@0: michael@0: return mPipeIn->CloseWithStatus(reason); michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTransport::SetEventSink(nsITransportEventSink *sink, michael@0: nsIEventTarget *target) michael@0: { michael@0: NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); michael@0: michael@0: if (target) michael@0: return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink), michael@0: sink, target); michael@0: michael@0: mEventSink = sink; michael@0: return NS_OK; michael@0: } michael@0: michael@0: /** nsIInputStream **/ michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTransport::Close() michael@0: { michael@0: if (mCloseWhenDone) michael@0: mSource->Close(); michael@0: michael@0: // make additional reads return early... michael@0: mOffset = mLimit = 0; michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTransport::Available(uint64_t *result) michael@0: { michael@0: return NS_ERROR_NOT_IMPLEMENTED; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTransport::Read(char *buf, uint32_t count, uint32_t *result) michael@0: { michael@0: if (mFirstTime) { michael@0: mFirstTime = false; michael@0: if (mOffset != 0) { michael@0: // read from current position if offset equal to max michael@0: if (mOffset != UINT64_MAX) { michael@0: nsCOMPtr seekable = do_QueryInterface(mSource); michael@0: if (seekable) michael@0: seekable->Seek(nsISeekableStream::NS_SEEK_SET, mOffset); michael@0: } michael@0: // reset offset to zero so we can use it to enforce limit michael@0: mOffset = 0; michael@0: } michael@0: } michael@0: michael@0: // limit amount read michael@0: uint64_t max = mLimit - mOffset; michael@0: if (max == 0) { michael@0: *result = 0; michael@0: return NS_OK; michael@0: } michael@0: michael@0: if (count > max) michael@0: count = static_cast(max); michael@0: michael@0: nsresult rv = mSource->Read(buf, count, result); michael@0: michael@0: if (NS_SUCCEEDED(rv)) { michael@0: mOffset += *result; michael@0: if (mEventSink) michael@0: mEventSink->OnTransportStatus(this, NS_NET_STATUS_READING, mOffset, michael@0: mLimit); michael@0: } michael@0: return rv; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer, void *closure, michael@0: uint32_t count, uint32_t *result) michael@0: { michael@0: return NS_ERROR_NOT_IMPLEMENTED; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTransport::IsNonBlocking(bool *result) michael@0: { michael@0: *result = false; michael@0: return NS_OK; michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsOutputStreamTransport michael@0: // michael@0: // Implements nsIOutputStream as a wrapper around the real input stream. This michael@0: // allows the transport to support seeking, range-limiting, progress reporting, michael@0: // and close-when-done semantics while utilizing NS_AsyncCopy. michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: class nsOutputStreamTransport : public nsITransport michael@0: , public nsIOutputStream michael@0: { michael@0: public: michael@0: NS_DECL_THREADSAFE_ISUPPORTS michael@0: NS_DECL_NSITRANSPORT michael@0: NS_DECL_NSIOUTPUTSTREAM michael@0: michael@0: nsOutputStreamTransport(nsIOutputStream *sink, michael@0: uint64_t offset, michael@0: uint64_t limit, michael@0: bool closeWhenDone) michael@0: : mSink(sink) michael@0: , mOffset(offset) michael@0: , mLimit(limit) michael@0: , mCloseWhenDone(closeWhenDone) michael@0: , mFirstTime(true) michael@0: , mInProgress(false) michael@0: { michael@0: } michael@0: michael@0: virtual ~nsOutputStreamTransport() michael@0: { michael@0: } michael@0: michael@0: private: michael@0: nsCOMPtr mPipeOut; michael@0: michael@0: // while the copy is active, these members may only be accessed from the michael@0: // nsIOutputStream implementation. michael@0: nsCOMPtr mEventSink; michael@0: nsCOMPtr mSink; michael@0: uint64_t mOffset; michael@0: uint64_t mLimit; michael@0: bool mCloseWhenDone; michael@0: bool mFirstTime; michael@0: michael@0: // this variable serves as a lock to prevent the state of the transport michael@0: // from being modified once the copy is in progress. michael@0: bool mInProgress; michael@0: }; michael@0: michael@0: NS_IMPL_ISUPPORTS(nsOutputStreamTransport, michael@0: nsITransport, michael@0: nsIOutputStream) michael@0: michael@0: /** nsITransport **/ michael@0: michael@0: NS_IMETHODIMP michael@0: nsOutputStreamTransport::OpenInputStream(uint32_t flags, michael@0: uint32_t segsize, michael@0: uint32_t segcount, michael@0: nsIInputStream **result) michael@0: { michael@0: // this transport only supports writing! michael@0: NS_NOTREACHED("nsOutputStreamTransport::OpenInputStream"); michael@0: return NS_ERROR_UNEXPECTED; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsOutputStreamTransport::OpenOutputStream(uint32_t flags, michael@0: uint32_t segsize, michael@0: uint32_t segcount, michael@0: nsIOutputStream **result) michael@0: { michael@0: NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); michael@0: michael@0: nsresult rv; michael@0: nsCOMPtr target = michael@0: do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: // XXX if the caller requests an unbuffered stream, then perhaps michael@0: // we'd want to simply return mSink; however, then we would michael@0: // not be writing to mSink on a background thread. is this ok? michael@0: michael@0: bool nonblocking = !(flags & OPEN_BLOCKING); michael@0: michael@0: net_ResolveSegmentParams(segsize, segcount); michael@0: michael@0: nsCOMPtr pipeIn; michael@0: rv = NS_NewPipe2(getter_AddRefs(pipeIn), michael@0: getter_AddRefs(mPipeOut), michael@0: true, nonblocking, michael@0: segsize, segcount); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: mInProgress = true; michael@0: michael@0: // startup async copy process... michael@0: rv = NS_AsyncCopy(pipeIn, this, target, michael@0: NS_ASYNCCOPY_VIA_READSEGMENTS, segsize); michael@0: if (NS_SUCCEEDED(rv)) michael@0: NS_ADDREF(*result = mPipeOut); michael@0: michael@0: return rv; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsOutputStreamTransport::Close(nsresult reason) michael@0: { michael@0: if (NS_SUCCEEDED(reason)) michael@0: reason = NS_BASE_STREAM_CLOSED; michael@0: michael@0: return mPipeOut->CloseWithStatus(reason); michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsOutputStreamTransport::SetEventSink(nsITransportEventSink *sink, michael@0: nsIEventTarget *target) michael@0: { michael@0: NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); michael@0: michael@0: if (target) michael@0: return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink), michael@0: sink, target); michael@0: michael@0: mEventSink = sink; michael@0: return NS_OK; michael@0: } michael@0: michael@0: /** nsIOutputStream **/ michael@0: michael@0: NS_IMETHODIMP michael@0: nsOutputStreamTransport::Close() michael@0: { michael@0: if (mCloseWhenDone) michael@0: mSink->Close(); michael@0: michael@0: // make additional writes return early... michael@0: mOffset = mLimit = 0; michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsOutputStreamTransport::Flush() michael@0: { michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsOutputStreamTransport::Write(const char *buf, uint32_t count, uint32_t *result) michael@0: { michael@0: if (mFirstTime) { michael@0: mFirstTime = false; michael@0: if (mOffset != 0) { michael@0: // write to current position if offset equal to max michael@0: if (mOffset != UINT64_MAX) { michael@0: nsCOMPtr seekable = do_QueryInterface(mSink); michael@0: if (seekable) michael@0: seekable->Seek(nsISeekableStream::NS_SEEK_SET, mOffset); michael@0: } michael@0: // reset offset to zero so we can use it to enforce limit michael@0: mOffset = 0; michael@0: } michael@0: } michael@0: michael@0: // limit amount written michael@0: uint64_t max = mLimit - mOffset; michael@0: if (max == 0) { michael@0: *result = 0; michael@0: return NS_OK; michael@0: } michael@0: michael@0: if (count > max) michael@0: count = static_cast(max); michael@0: michael@0: nsresult rv = mSink->Write(buf, count, result); michael@0: michael@0: if (NS_SUCCEEDED(rv)) { michael@0: mOffset += *result; michael@0: if (mEventSink) michael@0: mEventSink->OnTransportStatus(this, NS_NET_STATUS_WRITING, mOffset, michael@0: mLimit); michael@0: } michael@0: return rv; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsOutputStreamTransport::WriteSegments(nsReadSegmentFun reader, void *closure, michael@0: uint32_t count, uint32_t *result) michael@0: { michael@0: return NS_ERROR_NOT_IMPLEMENTED; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsOutputStreamTransport::WriteFrom(nsIInputStream *in, uint32_t count, uint32_t *result) michael@0: { michael@0: return NS_ERROR_NOT_IMPLEMENTED; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsOutputStreamTransport::IsNonBlocking(bool *result) michael@0: { michael@0: *result = false; michael@0: return NS_OK; michael@0: } michael@0: michael@0: #ifdef MOZ_NUWA_PROCESS michael@0: #include "ipc/Nuwa.h" michael@0: michael@0: class STSThreadPoolListener : public nsIThreadPoolListener michael@0: { michael@0: public: michael@0: NS_DECL_THREADSAFE_ISUPPORTS michael@0: NS_DECL_NSITHREADPOOLLISTENER michael@0: michael@0: STSThreadPoolListener() {} michael@0: ~STSThreadPoolListener() {} michael@0: }; michael@0: michael@0: NS_IMPL_ISUPPORTS(STSThreadPoolListener, nsIThreadPoolListener) michael@0: michael@0: NS_IMETHODIMP michael@0: STSThreadPoolListener::OnThreadCreated() michael@0: { michael@0: if (IsNuwaProcess()) { michael@0: NuwaMarkCurrentThread(nullptr, nullptr); michael@0: } michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: STSThreadPoolListener::OnThreadShuttingDown() michael@0: { michael@0: return NS_OK; michael@0: } michael@0: michael@0: #endif // MOZ_NUWA_PROCESS michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsStreamTransportService michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: nsStreamTransportService::~nsStreamTransportService() michael@0: { michael@0: NS_ASSERTION(!mPool, "thread pool wasn't shutdown"); michael@0: } michael@0: michael@0: nsresult michael@0: nsStreamTransportService::Init() michael@0: { michael@0: mPool = do_CreateInstance(NS_THREADPOOL_CONTRACTID); michael@0: NS_ENSURE_STATE(mPool); michael@0: michael@0: // Configure the pool michael@0: mPool->SetName(NS_LITERAL_CSTRING("StreamTrans")); michael@0: mPool->SetThreadLimit(25); michael@0: mPool->SetIdleThreadLimit(1); michael@0: mPool->SetIdleThreadTimeout(PR_SecondsToInterval(30)); michael@0: #ifdef MOZ_NUWA_PROCESS michael@0: if (IsNuwaProcess()) { michael@0: mPool->SetListener(new STSThreadPoolListener()); michael@0: } michael@0: #endif michael@0: michael@0: nsCOMPtr obsSvc = michael@0: mozilla::services::GetObserverService(); michael@0: if (obsSvc) michael@0: obsSvc->AddObserver(this, "xpcom-shutdown-threads", false); michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMPL_ISUPPORTS(nsStreamTransportService, michael@0: nsIStreamTransportService, michael@0: nsIEventTarget, michael@0: nsIObserver) michael@0: michael@0: NS_IMETHODIMP michael@0: nsStreamTransportService::Dispatch(nsIRunnable *task, uint32_t flags) michael@0: { michael@0: NS_ENSURE_TRUE(mPool, NS_ERROR_NOT_INITIALIZED); michael@0: return mPool->Dispatch(task, flags); michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsStreamTransportService::IsOnCurrentThread(bool *result) michael@0: { michael@0: NS_ENSURE_TRUE(mPool, NS_ERROR_NOT_INITIALIZED); michael@0: return mPool->IsOnCurrentThread(result); michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsStreamTransportService::CreateInputTransport(nsIInputStream *stream, michael@0: int64_t offset, michael@0: int64_t limit, michael@0: bool closeWhenDone, michael@0: nsITransport **result) michael@0: { michael@0: nsInputStreamTransport *trans = michael@0: new nsInputStreamTransport(stream, offset, limit, closeWhenDone); michael@0: if (!trans) michael@0: return NS_ERROR_OUT_OF_MEMORY; michael@0: NS_ADDREF(*result = trans); michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsStreamTransportService::CreateOutputTransport(nsIOutputStream *stream, michael@0: int64_t offset, michael@0: int64_t limit, michael@0: bool closeWhenDone, michael@0: nsITransport **result) michael@0: { michael@0: nsOutputStreamTransport *trans = michael@0: new nsOutputStreamTransport(stream, offset, limit, closeWhenDone); michael@0: if (!trans) michael@0: return NS_ERROR_OUT_OF_MEMORY; michael@0: NS_ADDREF(*result = trans); michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsStreamTransportService::Observe(nsISupports *subject, const char *topic, michael@0: const char16_t *data) michael@0: { michael@0: NS_ASSERTION(strcmp(topic, "xpcom-shutdown-threads") == 0, "oops"); michael@0: michael@0: if (mPool) { michael@0: mPool->Shutdown(); michael@0: mPool = nullptr; michael@0: } michael@0: return NS_OK; michael@0: }