michael@0: /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: #include michael@0: #include "prlog.h" michael@0: michael@0: #include "mozilla/Mutex.h" michael@0: #include "mozilla/Attributes.h" michael@0: #include "nsIInputStreamTee.h" michael@0: #include "nsIInputStream.h" michael@0: #include "nsIOutputStream.h" michael@0: #include "nsCOMPtr.h" michael@0: #include "nsAutoPtr.h" michael@0: #include "nsIEventTarget.h" michael@0: #include "nsThreadUtils.h" michael@0: michael@0: using namespace mozilla; michael@0: michael@0: #ifdef LOG michael@0: #undef LOG michael@0: #endif michael@0: #ifdef PR_LOGGING michael@0: static PRLogModuleInfo* michael@0: GetTeeLog() michael@0: { michael@0: static PRLogModuleInfo *sLog; michael@0: if (!sLog) michael@0: sLog = PR_NewLogModule("nsInputStreamTee"); michael@0: return sLog; michael@0: } michael@0: #define LOG(args) PR_LOG(GetTeeLog(), PR_LOG_DEBUG, args) michael@0: #else michael@0: #define LOG(args) michael@0: #endif michael@0: michael@0: class nsInputStreamTee MOZ_FINAL : public nsIInputStreamTee michael@0: { michael@0: public: michael@0: NS_DECL_THREADSAFE_ISUPPORTS michael@0: NS_DECL_NSIINPUTSTREAM michael@0: NS_DECL_NSIINPUTSTREAMTEE michael@0: michael@0: nsInputStreamTee(); michael@0: bool SinkIsValid(); michael@0: void InvalidateSink(); michael@0: michael@0: private: michael@0: ~nsInputStreamTee() {} michael@0: michael@0: nsresult TeeSegment(const char *buf, uint32_t count); michael@0: michael@0: static NS_METHOD WriteSegmentFun(nsIInputStream *, void *, const char *, michael@0: uint32_t, uint32_t, uint32_t *); michael@0: michael@0: private: michael@0: nsCOMPtr mSource; michael@0: nsCOMPtr mSink; michael@0: nsCOMPtr mEventTarget; michael@0: nsWriteSegmentFun mWriter; // for implementing ReadSegments michael@0: void *mClosure; // for implementing ReadSegments michael@0: nsAutoPtr mLock; // synchronize access to mSinkIsValid michael@0: bool mSinkIsValid; // False if TeeWriteEvent fails michael@0: }; michael@0: michael@0: class nsInputStreamTeeWriteEvent : public nsRunnable { michael@0: public: michael@0: // aTee's lock is held across construction of this object michael@0: nsInputStreamTeeWriteEvent(const char *aBuf, uint32_t aCount, michael@0: nsIOutputStream *aSink, michael@0: nsInputStreamTee *aTee) michael@0: { michael@0: // copy the buffer - will be free'd by dtor michael@0: mBuf = (char *)malloc(aCount); michael@0: if (mBuf) memcpy(mBuf, (char *)aBuf, aCount); michael@0: mCount = aCount; michael@0: mSink = aSink; michael@0: bool isNonBlocking; michael@0: mSink->IsNonBlocking(&isNonBlocking); michael@0: NS_ASSERTION(isNonBlocking == false, "mSink is nonblocking"); michael@0: mTee = aTee; michael@0: } michael@0: michael@0: NS_IMETHOD Run() michael@0: { michael@0: if (!mBuf) { michael@0: NS_WARNING("nsInputStreamTeeWriteEvent::Run() " michael@0: "memory not allocated\n"); michael@0: return NS_OK; michael@0: } michael@0: NS_ABORT_IF_FALSE(mSink, "mSink is null!"); michael@0: michael@0: // The output stream could have been invalidated between when michael@0: // this event was dispatched and now, so check before writing. michael@0: if (!mTee->SinkIsValid()) { michael@0: return NS_OK; michael@0: } michael@0: michael@0: LOG(("nsInputStreamTeeWriteEvent::Run() [%p]" michael@0: "will write %u bytes to %p\n", michael@0: this, mCount, mSink.get())); michael@0: michael@0: uint32_t totalBytesWritten = 0; michael@0: while (mCount) { michael@0: nsresult rv; michael@0: uint32_t bytesWritten = 0; michael@0: rv = mSink->Write(mBuf + totalBytesWritten, mCount, &bytesWritten); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("nsInputStreamTeeWriteEvent::Run[%p] error %x in writing", michael@0: this,rv)); michael@0: mTee->InvalidateSink(); michael@0: break; michael@0: } michael@0: totalBytesWritten += bytesWritten; michael@0: NS_ASSERTION(bytesWritten <= mCount, "wrote too much"); michael@0: mCount -= bytesWritten; michael@0: } michael@0: return NS_OK; michael@0: } michael@0: michael@0: protected: michael@0: virtual ~nsInputStreamTeeWriteEvent() michael@0: { michael@0: if (mBuf) free(mBuf); michael@0: mBuf = nullptr; michael@0: } michael@0: michael@0: private: michael@0: char *mBuf; michael@0: uint32_t mCount; michael@0: nsCOMPtr mSink; michael@0: // back pointer to the tee that created this runnable michael@0: nsRefPtr mTee; michael@0: }; michael@0: michael@0: nsInputStreamTee::nsInputStreamTee(): mLock(nullptr) michael@0: , mSinkIsValid(true) michael@0: { michael@0: } michael@0: michael@0: bool michael@0: nsInputStreamTee::SinkIsValid() michael@0: { michael@0: MutexAutoLock lock(*mLock); michael@0: return mSinkIsValid; michael@0: } michael@0: michael@0: void michael@0: nsInputStreamTee::InvalidateSink() michael@0: { michael@0: MutexAutoLock lock(*mLock); michael@0: mSinkIsValid = false; michael@0: } michael@0: michael@0: nsresult michael@0: nsInputStreamTee::TeeSegment(const char *buf, uint32_t count) michael@0: { michael@0: if (!mSink) return NS_OK; // nothing to do michael@0: if (mLock) { // asynchronous case michael@0: NS_ASSERTION(mEventTarget, "mEventTarget is null, mLock is not null."); michael@0: if (!SinkIsValid()) { michael@0: return NS_OK; // nothing to do michael@0: } michael@0: nsRefPtr event = michael@0: new nsInputStreamTeeWriteEvent(buf, count, mSink, this); michael@0: LOG(("nsInputStreamTee::TeeSegment [%p] dispatching write %u bytes\n", michael@0: this, count)); michael@0: return mEventTarget->Dispatch(event, NS_DISPATCH_NORMAL); michael@0: } else { // synchronous case michael@0: NS_ASSERTION(!mEventTarget, "mEventTarget is not null, mLock is null."); michael@0: nsresult rv; michael@0: uint32_t totalBytesWritten = 0; michael@0: while (count) { michael@0: uint32_t bytesWritten = 0; michael@0: rv = mSink->Write(buf + totalBytesWritten, count, &bytesWritten); michael@0: if (NS_FAILED(rv)) { michael@0: // ok, this is not a fatal error... just drop our reference to mSink michael@0: // and continue on as if nothing happened. michael@0: NS_WARNING("Write failed (non-fatal)"); michael@0: // catch possible misuse of the input stream tee michael@0: NS_ASSERTION(rv != NS_BASE_STREAM_WOULD_BLOCK, "sink must be a blocking stream"); michael@0: mSink = 0; michael@0: break; michael@0: } michael@0: totalBytesWritten += bytesWritten; michael@0: NS_ASSERTION(bytesWritten <= count, "wrote too much"); michael@0: count -= bytesWritten; michael@0: } michael@0: return NS_OK; michael@0: } michael@0: } michael@0: michael@0: NS_METHOD michael@0: nsInputStreamTee::WriteSegmentFun(nsIInputStream *in, void *closure, const char *fromSegment, michael@0: uint32_t offset, uint32_t count, uint32_t *writeCount) michael@0: { michael@0: nsInputStreamTee *tee = reinterpret_cast(closure); michael@0: michael@0: nsresult rv = tee->mWriter(in, tee->mClosure, fromSegment, offset, count, writeCount); michael@0: if (NS_FAILED(rv) || (*writeCount == 0)) { michael@0: NS_ASSERTION((NS_FAILED(rv) ? (*writeCount == 0) : true), michael@0: "writer returned an error with non-zero writeCount"); michael@0: return rv; michael@0: } michael@0: michael@0: return tee->TeeSegment(fromSegment, *writeCount); michael@0: } michael@0: michael@0: NS_IMPL_ISUPPORTS(nsInputStreamTee, michael@0: nsIInputStreamTee, michael@0: nsIInputStream) michael@0: NS_IMETHODIMP michael@0: nsInputStreamTee::Close() michael@0: { michael@0: if (NS_WARN_IF(!mSource)) michael@0: return NS_ERROR_NOT_INITIALIZED; michael@0: nsresult rv = mSource->Close(); michael@0: mSource = 0; michael@0: mSink = 0; michael@0: return rv; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTee::Available(uint64_t *avail) michael@0: { michael@0: if (NS_WARN_IF(!mSource)) michael@0: return NS_ERROR_NOT_INITIALIZED; michael@0: return mSource->Available(avail); michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTee::Read(char *buf, uint32_t count, uint32_t *bytesRead) michael@0: { michael@0: if (NS_WARN_IF(!mSource)) michael@0: return NS_ERROR_NOT_INITIALIZED; michael@0: michael@0: nsresult rv = mSource->Read(buf, count, bytesRead); michael@0: if (NS_FAILED(rv) || (*bytesRead == 0)) michael@0: return rv; michael@0: michael@0: return TeeSegment(buf, *bytesRead); michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTee::ReadSegments(nsWriteSegmentFun writer, michael@0: void *closure, michael@0: uint32_t count, michael@0: uint32_t *bytesRead) michael@0: { michael@0: if (NS_WARN_IF(!mSource)) michael@0: return NS_ERROR_NOT_INITIALIZED; michael@0: michael@0: mWriter = writer; michael@0: mClosure = closure; michael@0: michael@0: return mSource->ReadSegments(WriteSegmentFun, this, count, bytesRead); michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTee::IsNonBlocking(bool *result) michael@0: { michael@0: if (NS_WARN_IF(!mSource)) michael@0: return NS_ERROR_NOT_INITIALIZED; michael@0: return mSource->IsNonBlocking(result); michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTee::SetSource(nsIInputStream *source) michael@0: { michael@0: mSource = source; michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTee::GetSource(nsIInputStream **source) michael@0: { michael@0: NS_IF_ADDREF(*source = mSource); michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTee::SetSink(nsIOutputStream *sink) michael@0: { michael@0: #ifdef DEBUG michael@0: if (sink) { michael@0: bool nonBlocking; michael@0: nsresult rv = sink->IsNonBlocking(&nonBlocking); michael@0: if (NS_FAILED(rv) || nonBlocking) michael@0: NS_ERROR("sink should be a blocking stream"); michael@0: } michael@0: #endif michael@0: mSink = sink; michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTee::GetSink(nsIOutputStream **sink) michael@0: { michael@0: NS_IF_ADDREF(*sink = mSink); michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTee::SetEventTarget(nsIEventTarget *anEventTarget) michael@0: { michael@0: mEventTarget = anEventTarget; michael@0: if (mEventTarget) { michael@0: // Only need synchronization if this is an async tee michael@0: mLock = new Mutex("nsInputStreamTee.mLock"); michael@0: } michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: nsInputStreamTee::GetEventTarget(nsIEventTarget **anEventTarget) michael@0: { michael@0: NS_IF_ADDREF(*anEventTarget = mEventTarget); michael@0: return NS_OK; michael@0: } michael@0: michael@0: michael@0: nsresult michael@0: NS_NewInputStreamTeeAsync(nsIInputStream **result, michael@0: nsIInputStream *source, michael@0: nsIOutputStream *sink, michael@0: nsIEventTarget *anEventTarget) michael@0: { michael@0: nsresult rv; michael@0: michael@0: nsCOMPtr tee = new nsInputStreamTee(); michael@0: if (!tee) michael@0: return NS_ERROR_OUT_OF_MEMORY; michael@0: michael@0: rv = tee->SetSource(source); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: rv = tee->SetSink(sink); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: rv = tee->SetEventTarget(anEventTarget); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: NS_ADDREF(*result = tee); michael@0: return rv; michael@0: } michael@0: michael@0: nsresult michael@0: NS_NewInputStreamTee(nsIInputStream **result, michael@0: nsIInputStream *source, michael@0: nsIOutputStream *sink) michael@0: { michael@0: return NS_NewInputStreamTeeAsync(result, source, sink, nullptr); michael@0: } michael@0: michael@0: #undef LOG