diff -r 000000000000 -r 6474c204b198 xpcom/io/nsPipe3.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/xpcom/io/nsPipe3.cpp Wed Dec 31 06:09:35 2014 +0100 @@ -0,0 +1,1310 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "mozilla/Attributes.h" +#include "mozilla/ReentrantMonitor.h" +#include "nsIPipe.h" +#include "nsIEventTarget.h" +#include "nsISeekableStream.h" +#include "nsIProgrammingLanguage.h" +#include "nsSegmentedBuffer.h" +#include "nsStreamUtils.h" +#include "nsCOMPtr.h" +#include "nsCRT.h" +#include "prlog.h" +#include "nsIClassInfoImpl.h" +#include "nsAlgorithm.h" +#include "nsMemory.h" +#include "nsIAsyncInputStream.h" +#include "nsIAsyncOutputStream.h" + +using namespace mozilla; + +#ifdef LOG +#undef LOG +#endif +#if defined(PR_LOGGING) +// +// set NSPR_LOG_MODULES=nsPipe:5 +// +static PRLogModuleInfo * +GetPipeLog() +{ + static PRLogModuleInfo *sLog; + if (!sLog) + sLog = PR_NewLogModule("nsPipe"); + return sLog; +} +#define LOG(args) PR_LOG(GetPipeLog(), PR_LOG_DEBUG, args) +#else +#define LOG(args) +#endif + +#define DEFAULT_SEGMENT_SIZE 4096 +#define DEFAULT_SEGMENT_COUNT 16 + +class nsPipe; +class nsPipeEvents; +class nsPipeInputStream; +class nsPipeOutputStream; + +//----------------------------------------------------------------------------- + +// this class is used to delay notifications until the end of a particular +// scope. it helps avoid the complexity of issuing callbacks while inside +// a critical section. +class nsPipeEvents +{ +public: + nsPipeEvents() { } + ~nsPipeEvents(); + + inline void NotifyInputReady(nsIAsyncInputStream *stream, + nsIInputStreamCallback *callback) + { + NS_ASSERTION(!mInputCallback, "already have an input event"); + mInputStream = stream; + mInputCallback = callback; + } + + inline void NotifyOutputReady(nsIAsyncOutputStream *stream, + nsIOutputStreamCallback *callback) + { + NS_ASSERTION(!mOutputCallback, "already have an output event"); + mOutputStream = stream; + mOutputCallback = callback; + } + +private: + nsCOMPtr mInputStream; + nsCOMPtr mInputCallback; + nsCOMPtr mOutputStream; + nsCOMPtr mOutputCallback; +}; + +//----------------------------------------------------------------------------- + +// the input end of a pipe (allocated as a member of the pipe). +class nsPipeInputStream : public nsIAsyncInputStream + , public nsISeekableStream + , public nsISearchableInputStream + , public nsIClassInfo +{ +public: + // since this class will be allocated as a member of the pipe, we do not + // need our own ref count. instead, we share the lifetime (the ref count) + // of the entire pipe. this macro is just convenience since it does not + // declare a mRefCount variable; however, don't let the name fool you... + // we are not inheriting from nsPipe ;-) + NS_DECL_ISUPPORTS_INHERITED + + NS_DECL_NSIINPUTSTREAM + NS_DECL_NSIASYNCINPUTSTREAM + NS_DECL_NSISEEKABLESTREAM + NS_DECL_NSISEARCHABLEINPUTSTREAM + NS_DECL_NSICLASSINFO + + nsPipeInputStream(nsPipe *pipe) + : mPipe(pipe) + , mReaderRefCnt(0) + , mLogicalOffset(0) + , mBlocking(true) + , mBlocked(false) + , mAvailable(0) + , mCallbackFlags(0) + { } + + nsresult Fill(); + void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; } + + uint32_t Available() { return mAvailable; } + void ReduceAvailable(uint32_t avail) { mAvailable -= avail; } + + // synchronously wait for the pipe to become readable. + nsresult Wait(); + + // these functions return true to indicate that the pipe's monitor should + // be notified, to wake up a blocked reader if any. + bool OnInputReadable(uint32_t bytesWritten, nsPipeEvents &); + bool OnInputException(nsresult, nsPipeEvents &); + +private: + nsPipe *mPipe; + + // separate refcnt so that we know when to close the consumer + mozilla::ThreadSafeAutoRefCnt mReaderRefCnt; + int64_t mLogicalOffset; + bool mBlocking; + + // these variables can only be accessed while inside the pipe's monitor + bool mBlocked; + uint32_t mAvailable; + nsCOMPtr mCallback; + uint32_t mCallbackFlags; +}; + +//----------------------------------------------------------------------------- + +// the output end of a pipe (allocated as a member of the pipe). +class nsPipeOutputStream : public nsIAsyncOutputStream + , public nsIClassInfo +{ +public: + // since this class will be allocated as a member of the pipe, we do not + // need our own ref count. instead, we share the lifetime (the ref count) + // of the entire pipe. this macro is just convenience since it does not + // declare a mRefCount variable; however, don't let the name fool you... + // we are not inheriting from nsPipe ;-) + NS_DECL_ISUPPORTS_INHERITED + + NS_DECL_NSIOUTPUTSTREAM + NS_DECL_NSIASYNCOUTPUTSTREAM + NS_DECL_NSICLASSINFO + + nsPipeOutputStream(nsPipe *pipe) + : mPipe(pipe) + , mWriterRefCnt(0) + , mLogicalOffset(0) + , mBlocking(true) + , mBlocked(false) + , mWritable(true) + , mCallbackFlags(0) + { } + + void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; } + void SetWritable(bool writable) { mWritable = writable; } + + // synchronously wait for the pipe to become writable. + nsresult Wait(); + + // these functions return true to indicate that the pipe's monitor should + // be notified, to wake up a blocked writer if any. + bool OnOutputWritable(nsPipeEvents &); + bool OnOutputException(nsresult, nsPipeEvents &); + +private: + nsPipe *mPipe; + + // separate refcnt so that we know when to close the producer + mozilla::ThreadSafeAutoRefCnt mWriterRefCnt; + int64_t mLogicalOffset; + bool mBlocking; + + // these variables can only be accessed while inside the pipe's monitor + bool mBlocked; + bool mWritable; + nsCOMPtr mCallback; + uint32_t mCallbackFlags; +}; + +//----------------------------------------------------------------------------- + +class nsPipe MOZ_FINAL : public nsIPipe +{ +public: + friend class nsPipeInputStream; + friend class nsPipeOutputStream; + + NS_DECL_THREADSAFE_ISUPPORTS + NS_DECL_NSIPIPE + + // nsPipe methods: + nsPipe(); + +private: + ~nsPipe(); + +public: + // + // methods below may only be called while inside the pipe's monitor + // + + void PeekSegment(uint32_t n, char *&cursor, char *&limit); + + // + // methods below may be called while outside the pipe's monitor + // + + nsresult GetReadSegment(const char *&segment, uint32_t &segmentLen); + void AdvanceReadCursor(uint32_t count); + + nsresult GetWriteSegment(char *&segment, uint32_t &segmentLen); + void AdvanceWriteCursor(uint32_t count); + + void OnPipeException(nsresult reason, bool outputOnly = false); + +protected: + // We can't inherit from both nsIInputStream and nsIOutputStream + // because they collide on their Close method. Consequently we nest their + // implementations to avoid the extra object allocation. + nsPipeInputStream mInput; + nsPipeOutputStream mOutput; + + ReentrantMonitor mReentrantMonitor; + nsSegmentedBuffer mBuffer; + + char* mReadCursor; + char* mReadLimit; + + int32_t mWriteSegment; + char* mWriteCursor; + char* mWriteLimit; + + nsresult mStatus; + bool mInited; +}; + +// +// NOTES on buffer architecture: +// +// +-----------------+ - - mBuffer.GetSegment(0) +// | | +// + - - - - - - - - + - - mReadCursor +// |/////////////////| +// |/////////////////| +// |/////////////////| +// |/////////////////| +// +-----------------+ - - mReadLimit +// | +// +-----------------+ +// |/////////////////| +// |/////////////////| +// |/////////////////| +// |/////////////////| +// |/////////////////| +// |/////////////////| +// +-----------------+ +// | +// +-----------------+ - - mBuffer.GetSegment(mWriteSegment) +// |/////////////////| +// |/////////////////| +// |/////////////////| +// + - - - - - - - - + - - mWriteCursor +// | | +// | | +// +-----------------+ - - mWriteLimit +// +// (shaded region contains data) +// +// NOTE: on some systems (notably OS/2), the heap allocator uses an arena for +// small allocations (e.g., 64 byte allocations). this means that buffers may +// be allocated back-to-back. in the diagram above, for example, mReadLimit +// would actually be pointing at the beginning of the next segment. when +// making changes to this file, please keep this fact in mind. +// + +//----------------------------------------------------------------------------- +// nsPipe methods: +//----------------------------------------------------------------------------- + +nsPipe::nsPipe() + : mInput(MOZ_THIS_IN_INITIALIZER_LIST()) + , mOutput(MOZ_THIS_IN_INITIALIZER_LIST()) + , mReentrantMonitor("nsPipe.mReentrantMonitor") + , mReadCursor(nullptr) + , mReadLimit(nullptr) + , mWriteSegment(-1) + , mWriteCursor(nullptr) + , mWriteLimit(nullptr) + , mStatus(NS_OK) + , mInited(false) +{ +} + +nsPipe::~nsPipe() +{ +} + +NS_IMPL_ISUPPORTS(nsPipe, nsIPipe) + +NS_IMETHODIMP +nsPipe::Init(bool nonBlockingIn, + bool nonBlockingOut, + uint32_t segmentSize, + uint32_t segmentCount) +{ + mInited = true; + + if (segmentSize == 0) + segmentSize = DEFAULT_SEGMENT_SIZE; + if (segmentCount == 0) + segmentCount = DEFAULT_SEGMENT_COUNT; + + // protect against overflow + uint32_t maxCount = uint32_t(-1) / segmentSize; + if (segmentCount > maxCount) + segmentCount = maxCount; + + nsresult rv = mBuffer.Init(segmentSize, segmentSize * segmentCount); + if (NS_FAILED(rv)) + return rv; + + mInput.SetNonBlocking(nonBlockingIn); + mOutput.SetNonBlocking(nonBlockingOut); + return NS_OK; +} + +NS_IMETHODIMP +nsPipe::GetInputStream(nsIAsyncInputStream **aInputStream) +{ + NS_ADDREF(*aInputStream = &mInput); + return NS_OK; +} + +NS_IMETHODIMP +nsPipe::GetOutputStream(nsIAsyncOutputStream **aOutputStream) +{ + if (NS_WARN_IF(!mInited)) + return NS_ERROR_NOT_INITIALIZED; + NS_ADDREF(*aOutputStream = &mOutput); + return NS_OK; +} + +void +nsPipe::PeekSegment(uint32_t index, char *&cursor, char *&limit) +{ + if (index == 0) { + NS_ASSERTION(!mReadCursor || mBuffer.GetSegmentCount(), "unexpected state"); + cursor = mReadCursor; + limit = mReadLimit; + } + else { + uint32_t numSegments = mBuffer.GetSegmentCount(); + if (index >= numSegments) + cursor = limit = nullptr; + else { + cursor = mBuffer.GetSegment(index); + if (mWriteSegment == (int32_t) index) + limit = mWriteCursor; + else + limit = cursor + mBuffer.GetSegmentSize(); + } + } +} + +nsresult +nsPipe::GetReadSegment(const char *&segment, uint32_t &segmentLen) +{ + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + + if (mReadCursor == mReadLimit) + return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK; + + segment = mReadCursor; + segmentLen = mReadLimit - mReadCursor; + return NS_OK; +} + +void +nsPipe::AdvanceReadCursor(uint32_t bytesRead) +{ + NS_ASSERTION(bytesRead, "don't call if no bytes read"); + + nsPipeEvents events; + { + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + + LOG(("III advancing read cursor by %u\n", bytesRead)); + NS_ASSERTION(bytesRead <= mBuffer.GetSegmentSize(), "read too much"); + + mReadCursor += bytesRead; + NS_ASSERTION(mReadCursor <= mReadLimit, "read cursor exceeds limit"); + + mInput.ReduceAvailable(bytesRead); + + if (mReadCursor == mReadLimit) { + // we've reached the limit of how much we can read from this segment. + // if at the end of this segment, then we must discard this segment. + + // if still writing in this segment then bail because we're not done + // with the segment and have to wait for now... + if (mWriteSegment == 0 && mWriteLimit > mWriteCursor) { + NS_ASSERTION(mReadLimit == mWriteCursor, "unexpected state"); + return; + } + + // shift write segment index (-1 indicates an empty buffer). + --mWriteSegment; + + // done with this segment + mBuffer.DeleteFirstSegment(); + LOG(("III deleting first segment\n")); + + if (mWriteSegment == -1) { + // buffer is completely empty + mReadCursor = nullptr; + mReadLimit = nullptr; + mWriteCursor = nullptr; + mWriteLimit = nullptr; + } + else { + // advance read cursor and limit to next buffer segment + mReadCursor = mBuffer.GetSegment(0); + if (mWriteSegment == 0) + mReadLimit = mWriteCursor; + else + mReadLimit = mReadCursor + mBuffer.GetSegmentSize(); + } + + // we've free'd up a segment, so notify output stream that pipe has + // room for a new segment. + if (mOutput.OnOutputWritable(events)) + mon.Notify(); + } + } +} + +nsresult +nsPipe::GetWriteSegment(char *&segment, uint32_t &segmentLen) +{ + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + + if (NS_FAILED(mStatus)) + return mStatus; + + // write cursor and limit may both be null indicating an empty buffer. + if (mWriteCursor == mWriteLimit) { + char *seg = mBuffer.AppendNewSegment(); + // pipe is full + if (seg == nullptr) + return NS_BASE_STREAM_WOULD_BLOCK; + LOG(("OOO appended new segment\n")); + mWriteCursor = seg; + mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize(); + ++mWriteSegment; + } + + // make sure read cursor is initialized + if (mReadCursor == nullptr) { + NS_ASSERTION(mWriteSegment == 0, "unexpected null read cursor"); + mReadCursor = mReadLimit = mWriteCursor; + } + + // check to see if we can roll-back our read and write cursors to the + // beginning of the current/first segment. this is purely an optimization. + if (mReadCursor == mWriteCursor && mWriteSegment == 0) { + char *head = mBuffer.GetSegment(0); + LOG(("OOO rolling back write cursor %u bytes\n", mWriteCursor - head)); + mWriteCursor = mReadCursor = mReadLimit = head; + } + + segment = mWriteCursor; + segmentLen = mWriteLimit - mWriteCursor; + return NS_OK; +} + +void +nsPipe::AdvanceWriteCursor(uint32_t bytesWritten) +{ + NS_ASSERTION(bytesWritten, "don't call if no bytes written"); + + nsPipeEvents events; + { + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + + LOG(("OOO advancing write cursor by %u\n", bytesWritten)); + + char *newWriteCursor = mWriteCursor + bytesWritten; + NS_ASSERTION(newWriteCursor <= mWriteLimit, "write cursor exceeds limit"); + + // update read limit if reading in the same segment + if (mWriteSegment == 0 && mReadLimit == mWriteCursor) + mReadLimit = newWriteCursor; + + mWriteCursor = newWriteCursor; + + // The only way mReadCursor == mWriteCursor is if: + // + // - mReadCursor is at the start of a segment (which, based on how + // nsSegmentedBuffer works, means that this segment is the "first" + // segment) + // - mWriteCursor points at the location past the end of the current + // write segment (so the current write filled the current write + // segment, so we've incremented mWriteCursor to point past the end + // of it) + // - the segment to which data has just been written is located + // exactly one segment's worth of bytes before the first segment + // where mReadCursor is located + // + // Consequently, the byte immediately after the end of the current + // write segment is the first byte of the first segment, so + // mReadCursor == mWriteCursor. (Another way to think about this is + // to consider the buffer architecture diagram above, but consider it + // with an arena allocator which allocates from the *end* of the + // arena to the *beginning* of the arena.) + NS_ASSERTION(mReadCursor != mWriteCursor || + (mBuffer.GetSegment(0) == mReadCursor && + mWriteCursor == mWriteLimit), + "read cursor is bad"); + + // update the writable flag on the output stream + if (mWriteCursor == mWriteLimit) { + if (mBuffer.GetSize() >= mBuffer.GetMaxSize()) + mOutput.SetWritable(false); + } + + // notify input stream that pipe now contains additional data + if (mInput.OnInputReadable(bytesWritten, events)) + mon.Notify(); + } +} + +void +nsPipe::OnPipeException(nsresult reason, bool outputOnly) +{ + LOG(("PPP nsPipe::OnPipeException [reason=%x output-only=%d]\n", + reason, outputOnly)); + + nsPipeEvents events; + { + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + + // if we've already hit an exception, then ignore this one. + if (NS_FAILED(mStatus)) + return; + + mStatus = reason; + + // an output-only exception applies to the input end if the pipe has + // zero bytes available. + if (outputOnly && !mInput.Available()) + outputOnly = false; + + if (!outputOnly) + if (mInput.OnInputException(reason, events)) + mon.Notify(); + + if (mOutput.OnOutputException(reason, events)) + mon.Notify(); + } +} + +//----------------------------------------------------------------------------- +// nsPipeEvents methods: +//----------------------------------------------------------------------------- + +nsPipeEvents::~nsPipeEvents() +{ + // dispatch any pending events + + if (mInputCallback) { + mInputCallback->OnInputStreamReady(mInputStream); + mInputCallback = 0; + mInputStream = 0; + } + if (mOutputCallback) { + mOutputCallback->OnOutputStreamReady(mOutputStream); + mOutputCallback = 0; + mOutputStream = 0; + } +} + +//----------------------------------------------------------------------------- +// nsPipeInputStream methods: +//----------------------------------------------------------------------------- + +NS_IMPL_QUERY_INTERFACE(nsPipeInputStream, + nsIInputStream, + nsIAsyncInputStream, + nsISeekableStream, + nsISearchableInputStream, + nsIClassInfo) + +NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream, + nsIInputStream, + nsIAsyncInputStream, + nsISeekableStream, + nsISearchableInputStream) + +NS_IMPL_THREADSAFE_CI(nsPipeInputStream) + +nsresult +nsPipeInputStream::Wait() +{ + NS_ASSERTION(mBlocking, "wait on non-blocking pipe input stream"); + + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + + while (NS_SUCCEEDED(mPipe->mStatus) && (mAvailable == 0)) { + LOG(("III pipe input: waiting for data\n")); + + mBlocked = true; + mon.Wait(); + mBlocked = false; + + LOG(("III pipe input: woke up [pipe-status=%x available=%u]\n", + mPipe->mStatus, mAvailable)); + } + + return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus; +} + +bool +nsPipeInputStream::OnInputReadable(uint32_t bytesWritten, nsPipeEvents &events) +{ + bool result = false; + + mAvailable += bytesWritten; + + if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) { + events.NotifyInputReady(this, mCallback); + mCallback = 0; + mCallbackFlags = 0; + } + else if (mBlocked) + result = true; + + return result; +} + +bool +nsPipeInputStream::OnInputException(nsresult reason, nsPipeEvents &events) +{ + LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n", + this, reason)); + + bool result = false; + + NS_ASSERTION(NS_FAILED(reason), "huh? successful exception"); + + // force count of available bytes to zero. + mAvailable = 0; + + if (mCallback) { + events.NotifyInputReady(this, mCallback); + mCallback = 0; + mCallbackFlags = 0; + } + else if (mBlocked) + result = true; + + return result; +} + +NS_IMETHODIMP_(MozExternalRefCountType) +nsPipeInputStream::AddRef(void) +{ + ++mReaderRefCnt; + return mPipe->AddRef(); +} + +NS_IMETHODIMP_(MozExternalRefCountType) +nsPipeInputStream::Release(void) +{ + if (--mReaderRefCnt == 0) + Close(); + return mPipe->Release(); +} + +NS_IMETHODIMP +nsPipeInputStream::CloseWithStatus(nsresult reason) +{ + LOG(("III CloseWithStatus [this=%x reason=%x]\n", this, reason)); + + if (NS_SUCCEEDED(reason)) + reason = NS_BASE_STREAM_CLOSED; + + mPipe->OnPipeException(reason); + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::Close() +{ + return CloseWithStatus(NS_BASE_STREAM_CLOSED); +} + +NS_IMETHODIMP +nsPipeInputStream::Available(uint64_t *result) +{ + // nsPipeInputStream supports under 4GB stream only + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + + // return error if pipe closed + if (!mAvailable && NS_FAILED(mPipe->mStatus)) + return mPipe->mStatus; + + *result = (uint64_t)mAvailable; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::ReadSegments(nsWriteSegmentFun writer, + void *closure, + uint32_t count, + uint32_t *readCount) +{ + LOG(("III ReadSegments [this=%x count=%u]\n", this, count)); + + nsresult rv = NS_OK; + + const char *segment; + uint32_t segmentLen; + + *readCount = 0; + while (count) { + rv = mPipe->GetReadSegment(segment, segmentLen); + if (NS_FAILED(rv)) { + // ignore this error if we've already read something. + if (*readCount > 0) { + rv = NS_OK; + break; + } + if (rv == NS_BASE_STREAM_WOULD_BLOCK) { + // pipe is empty + if (!mBlocking) + break; + // wait for some data to be written to the pipe + rv = Wait(); + if (NS_SUCCEEDED(rv)) + continue; + } + // ignore this error, just return. + if (rv == NS_BASE_STREAM_CLOSED) { + rv = NS_OK; + break; + } + mPipe->OnPipeException(rv); + break; + } + + // read no more than count + if (segmentLen > count) + segmentLen = count; + + uint32_t writeCount, originalLen = segmentLen; + while (segmentLen) { + writeCount = 0; + + rv = writer(this, closure, segment, *readCount, segmentLen, &writeCount); + + if (NS_FAILED(rv) || writeCount == 0) { + count = 0; + // any errors returned from the writer end here: do not + // propagate to the caller of ReadSegments. + rv = NS_OK; + break; + } + + NS_ASSERTION(writeCount <= segmentLen, "wrote more than expected"); + segment += writeCount; + segmentLen -= writeCount; + count -= writeCount; + *readCount += writeCount; + mLogicalOffset += writeCount; + } + + if (segmentLen < originalLen) + mPipe->AdvanceReadCursor(originalLen - segmentLen); + } + + return rv; +} + +NS_IMETHODIMP +nsPipeInputStream::Read(char* toBuf, uint32_t bufLen, uint32_t *readCount) +{ + return ReadSegments(NS_CopySegmentToBuffer, toBuf, bufLen, readCount); +} + +NS_IMETHODIMP +nsPipeInputStream::IsNonBlocking(bool *aNonBlocking) +{ + *aNonBlocking = !mBlocking; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::AsyncWait(nsIInputStreamCallback *callback, + uint32_t flags, + uint32_t requestedCount, + nsIEventTarget *target) +{ + LOG(("III AsyncWait [this=%x]\n", this)); + + nsPipeEvents pipeEvents; + { + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + + // replace a pending callback + mCallback = 0; + mCallbackFlags = 0; + + if (!callback) + return NS_OK; + + nsCOMPtr proxy; + if (target) { + proxy = NS_NewInputStreamReadyEvent(callback, target); + callback = proxy; + } + + if (NS_FAILED(mPipe->mStatus) || + (mAvailable && !(flags & WAIT_CLOSURE_ONLY))) { + // stream is already closed or readable; post event. + pipeEvents.NotifyInputReady(this, callback); + } + else { + // queue up callback object to be notified when data becomes available + mCallback = callback; + mCallbackFlags = flags; + } + } + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::Seek(int32_t whence, int64_t offset) +{ + NS_NOTREACHED("nsPipeInputStream::Seek"); + return NS_ERROR_NOT_IMPLEMENTED; +} + +NS_IMETHODIMP +nsPipeInputStream::Tell(int64_t *offset) +{ + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + + // return error if pipe closed + if (!mAvailable && NS_FAILED(mPipe->mStatus)) + return mPipe->mStatus; + + *offset = mLogicalOffset; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::SetEOF() +{ + NS_NOTREACHED("nsPipeInputStream::SetEOF"); + return NS_ERROR_NOT_IMPLEMENTED; +} + +#define COMPARE(s1, s2, i) \ + (ignoreCase \ + ? nsCRT::strncasecmp((const char *)s1, (const char *)s2, (uint32_t)i) \ + : nsCRT::strncmp((const char *)s1, (const char *)s2, (uint32_t)i)) + +NS_IMETHODIMP +nsPipeInputStream::Search(const char *forString, + bool ignoreCase, + bool *found, + uint32_t *offsetSearchedTo) +{ + LOG(("III Search [for=%s ic=%u]\n", forString, ignoreCase)); + + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + + char *cursor1, *limit1; + uint32_t index = 0, offset = 0; + uint32_t strLen = strlen(forString); + + mPipe->PeekSegment(0, cursor1, limit1); + if (cursor1 == limit1) { + *found = false; + *offsetSearchedTo = 0; + LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); + return NS_OK; + } + + while (true) { + uint32_t i, len1 = limit1 - cursor1; + + // check if the string is in the buffer segment + for (i = 0; i < len1 - strLen + 1; i++) { + if (COMPARE(&cursor1[i], forString, strLen) == 0) { + *found = true; + *offsetSearchedTo = offset + i; + LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); + return NS_OK; + } + } + + // get the next segment + char *cursor2, *limit2; + uint32_t len2; + + index++; + offset += len1; + + mPipe->PeekSegment(index, cursor2, limit2); + if (cursor2 == limit2) { + *found = false; + *offsetSearchedTo = offset - strLen + 1; + LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); + return NS_OK; + } + len2 = limit2 - cursor2; + + // check if the string is straddling the next buffer segment + uint32_t lim = XPCOM_MIN(strLen, len2 + 1); + for (i = 0; i < lim; ++i) { + uint32_t strPart1Len = strLen - i - 1; + uint32_t strPart2Len = strLen - strPart1Len; + const char* strPart2 = &forString[strLen - strPart2Len]; + uint32_t bufSeg1Offset = len1 - strPart1Len; + if (COMPARE(&cursor1[bufSeg1Offset], forString, strPart1Len) == 0 && + COMPARE(cursor2, strPart2, strPart2Len) == 0) { + *found = true; + *offsetSearchedTo = offset - strPart1Len; + LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); + return NS_OK; + } + } + + // finally continue with the next buffer + cursor1 = cursor2; + limit1 = limit2; + } + + NS_NOTREACHED("can't get here"); + return NS_ERROR_UNEXPECTED; // keep compiler happy +} + +//----------------------------------------------------------------------------- +// nsPipeOutputStream methods: +//----------------------------------------------------------------------------- + +NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream, + nsIOutputStream, + nsIAsyncOutputStream, + nsIClassInfo) + +NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream, + nsIOutputStream, + nsIAsyncOutputStream) + +NS_IMPL_THREADSAFE_CI(nsPipeOutputStream) + +nsresult +nsPipeOutputStream::Wait() +{ + NS_ASSERTION(mBlocking, "wait on non-blocking pipe output stream"); + + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + + if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) { + LOG(("OOO pipe output: waiting for space\n")); + mBlocked = true; + mon.Wait(); + mBlocked = false; + LOG(("OOO pipe output: woke up [pipe-status=%x writable=%u]\n", + mPipe->mStatus, mWritable)); + } + + return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus; +} + +bool +nsPipeOutputStream::OnOutputWritable(nsPipeEvents &events) +{ + bool result = false; + + mWritable = true; + + if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) { + events.NotifyOutputReady(this, mCallback); + mCallback = 0; + mCallbackFlags = 0; + } + else if (mBlocked) + result = true; + + return result; +} + +bool +nsPipeOutputStream::OnOutputException(nsresult reason, nsPipeEvents &events) +{ + LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n", + this, reason)); + + bool result = false; + + NS_ASSERTION(NS_FAILED(reason), "huh? successful exception"); + mWritable = false; + + if (mCallback) { + events.NotifyOutputReady(this, mCallback); + mCallback = 0; + mCallbackFlags = 0; + } + else if (mBlocked) + result = true; + + return result; +} + + +NS_IMETHODIMP_(MozExternalRefCountType) +nsPipeOutputStream::AddRef() +{ + ++mWriterRefCnt; + return mPipe->AddRef(); +} + +NS_IMETHODIMP_(MozExternalRefCountType) +nsPipeOutputStream::Release() +{ + if (--mWriterRefCnt == 0) + Close(); + return mPipe->Release(); +} + +NS_IMETHODIMP +nsPipeOutputStream::CloseWithStatus(nsresult reason) +{ + LOG(("OOO CloseWithStatus [this=%x reason=%x]\n", this, reason)); + + if (NS_SUCCEEDED(reason)) + reason = NS_BASE_STREAM_CLOSED; + + // input stream may remain open + mPipe->OnPipeException(reason, true); + return NS_OK; +} + +NS_IMETHODIMP +nsPipeOutputStream::Close() +{ + return CloseWithStatus(NS_BASE_STREAM_CLOSED); +} + +NS_IMETHODIMP +nsPipeOutputStream::WriteSegments(nsReadSegmentFun reader, + void* closure, + uint32_t count, + uint32_t *writeCount) +{ + LOG(("OOO WriteSegments [this=%x count=%u]\n", this, count)); + + nsresult rv = NS_OK; + + char *segment; + uint32_t segmentLen; + + *writeCount = 0; + while (count) { + rv = mPipe->GetWriteSegment(segment, segmentLen); + if (NS_FAILED(rv)) { + if (rv == NS_BASE_STREAM_WOULD_BLOCK) { + // pipe is full + if (!mBlocking) { + // ignore this error if we've already written something + if (*writeCount > 0) + rv = NS_OK; + break; + } + // wait for the pipe to have an empty segment. + rv = Wait(); + if (NS_SUCCEEDED(rv)) + continue; + } + mPipe->OnPipeException(rv); + break; + } + + // write no more than count + if (segmentLen > count) + segmentLen = count; + + uint32_t readCount, originalLen = segmentLen; + while (segmentLen) { + readCount = 0; + + rv = reader(this, closure, segment, *writeCount, segmentLen, &readCount); + + if (NS_FAILED(rv) || readCount == 0) { + count = 0; + // any errors returned from the reader end here: do not + // propagate to the caller of WriteSegments. + rv = NS_OK; + break; + } + + NS_ASSERTION(readCount <= segmentLen, "read more than expected"); + segment += readCount; + segmentLen -= readCount; + count -= readCount; + *writeCount += readCount; + mLogicalOffset += readCount; + } + + if (segmentLen < originalLen) + mPipe->AdvanceWriteCursor(originalLen - segmentLen); + } + + return rv; +} + +static NS_METHOD +nsReadFromRawBuffer(nsIOutputStream* outStr, + void* closure, + char* toRawSegment, + uint32_t offset, + uint32_t count, + uint32_t *readCount) +{ + const char* fromBuf = (const char*)closure; + memcpy(toRawSegment, &fromBuf[offset], count); + *readCount = count; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeOutputStream::Write(const char* fromBuf, + uint32_t bufLen, + uint32_t *writeCount) +{ + return WriteSegments(nsReadFromRawBuffer, (void*)fromBuf, bufLen, writeCount); +} + +NS_IMETHODIMP +nsPipeOutputStream::Flush(void) +{ + // nothing to do + return NS_OK; +} + +static NS_METHOD +nsReadFromInputStream(nsIOutputStream* outStr, + void* closure, + char* toRawSegment, + uint32_t offset, + uint32_t count, + uint32_t *readCount) +{ + nsIInputStream* fromStream = (nsIInputStream*)closure; + return fromStream->Read(toRawSegment, count, readCount); +} + +NS_IMETHODIMP +nsPipeOutputStream::WriteFrom(nsIInputStream* fromStream, + uint32_t count, + uint32_t *writeCount) +{ + return WriteSegments(nsReadFromInputStream, fromStream, count, writeCount); +} + +NS_IMETHODIMP +nsPipeOutputStream::IsNonBlocking(bool *aNonBlocking) +{ + *aNonBlocking = !mBlocking; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback *callback, + uint32_t flags, + uint32_t requestedCount, + nsIEventTarget *target) +{ + LOG(("OOO AsyncWait [this=%x]\n", this)); + + nsPipeEvents pipeEvents; + { + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + + // replace a pending callback + mCallback = 0; + mCallbackFlags = 0; + + if (!callback) + return NS_OK; + + nsCOMPtr proxy; + if (target) { + proxy = NS_NewOutputStreamReadyEvent(callback, target); + callback = proxy; + } + + if (NS_FAILED(mPipe->mStatus) || + (mWritable && !(flags & WAIT_CLOSURE_ONLY))) { + // stream is already closed or writable; post event. + pipeEvents.NotifyOutputReady(this, callback); + } + else { + // queue up callback object to be notified when data becomes available + mCallback = callback; + mCallbackFlags = flags; + } + } + return NS_OK; +} + +//////////////////////////////////////////////////////////////////////////////// + +nsresult +NS_NewPipe(nsIInputStream **pipeIn, + nsIOutputStream **pipeOut, + uint32_t segmentSize, + uint32_t maxSize, + bool nonBlockingInput, + bool nonBlockingOutput) +{ + if (segmentSize == 0) + segmentSize = DEFAULT_SEGMENT_SIZE; + + // Handle maxSize of UINT32_MAX as a special case + uint32_t segmentCount; + if (maxSize == UINT32_MAX) + segmentCount = UINT32_MAX; + else + segmentCount = maxSize / segmentSize; + + nsIAsyncInputStream *in; + nsIAsyncOutputStream *out; + nsresult rv = NS_NewPipe2(&in, &out, nonBlockingInput, nonBlockingOutput, + segmentSize, segmentCount); + if (NS_FAILED(rv)) return rv; + + *pipeIn = in; + *pipeOut = out; + return NS_OK; +} + +nsresult +NS_NewPipe2(nsIAsyncInputStream **pipeIn, + nsIAsyncOutputStream **pipeOut, + bool nonBlockingInput, + bool nonBlockingOutput, + uint32_t segmentSize, + uint32_t segmentCount) +{ + nsresult rv; + + nsPipe *pipe = new nsPipe(); + if (!pipe) + return NS_ERROR_OUT_OF_MEMORY; + + rv = pipe->Init(nonBlockingInput, + nonBlockingOutput, + segmentSize, + segmentCount); + if (NS_FAILED(rv)) { + NS_ADDREF(pipe); + NS_RELEASE(pipe); + return rv; + } + + pipe->GetInputStream(pipeIn); + pipe->GetOutputStream(pipeOut); + return NS_OK; +} + +nsresult +nsPipeConstructor(nsISupports *outer, REFNSIID iid, void **result) +{ + if (outer) + return NS_ERROR_NO_AGGREGATION; + nsPipe *pipe = new nsPipe(); + if (!pipe) + return NS_ERROR_OUT_OF_MEMORY; + NS_ADDREF(pipe); + nsresult rv = pipe->QueryInterface(iid, result); + NS_RELEASE(pipe); + return rv; +} + +////////////////////////////////////////////////////////////////////////////////