1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/xpcom/io/nsPipe3.cpp Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,1310 @@ 1.4 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.5 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.6 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.7 + 1.8 +#include "mozilla/Attributes.h" 1.9 +#include "mozilla/ReentrantMonitor.h" 1.10 +#include "nsIPipe.h" 1.11 +#include "nsIEventTarget.h" 1.12 +#include "nsISeekableStream.h" 1.13 +#include "nsIProgrammingLanguage.h" 1.14 +#include "nsSegmentedBuffer.h" 1.15 +#include "nsStreamUtils.h" 1.16 +#include "nsCOMPtr.h" 1.17 +#include "nsCRT.h" 1.18 +#include "prlog.h" 1.19 +#include "nsIClassInfoImpl.h" 1.20 +#include "nsAlgorithm.h" 1.21 +#include "nsMemory.h" 1.22 +#include "nsIAsyncInputStream.h" 1.23 +#include "nsIAsyncOutputStream.h" 1.24 + 1.25 +using namespace mozilla; 1.26 + 1.27 +#ifdef LOG 1.28 +#undef LOG 1.29 +#endif 1.30 +#if defined(PR_LOGGING) 1.31 +// 1.32 +// set NSPR_LOG_MODULES=nsPipe:5 1.33 +// 1.34 +static PRLogModuleInfo * 1.35 +GetPipeLog() 1.36 +{ 1.37 + static PRLogModuleInfo *sLog; 1.38 + if (!sLog) 1.39 + sLog = PR_NewLogModule("nsPipe"); 1.40 + return sLog; 1.41 +} 1.42 +#define LOG(args) PR_LOG(GetPipeLog(), PR_LOG_DEBUG, args) 1.43 +#else 1.44 +#define LOG(args) 1.45 +#endif 1.46 + 1.47 +#define DEFAULT_SEGMENT_SIZE 4096 1.48 +#define DEFAULT_SEGMENT_COUNT 16 1.49 + 1.50 +class nsPipe; 1.51 +class nsPipeEvents; 1.52 +class nsPipeInputStream; 1.53 +class nsPipeOutputStream; 1.54 + 1.55 +//----------------------------------------------------------------------------- 1.56 + 1.57 +// this class is used to delay notifications until the end of a particular 1.58 +// scope. it helps avoid the complexity of issuing callbacks while inside 1.59 +// a critical section. 1.60 +class nsPipeEvents 1.61 +{ 1.62 +public: 1.63 + nsPipeEvents() { } 1.64 + ~nsPipeEvents(); 1.65 + 1.66 + inline void NotifyInputReady(nsIAsyncInputStream *stream, 1.67 + nsIInputStreamCallback *callback) 1.68 + { 1.69 + NS_ASSERTION(!mInputCallback, "already have an input event"); 1.70 + mInputStream = stream; 1.71 + mInputCallback = callback; 1.72 + } 1.73 + 1.74 + inline void NotifyOutputReady(nsIAsyncOutputStream *stream, 1.75 + nsIOutputStreamCallback *callback) 1.76 + { 1.77 + NS_ASSERTION(!mOutputCallback, "already have an output event"); 1.78 + mOutputStream = stream; 1.79 + mOutputCallback = callback; 1.80 + } 1.81 + 1.82 +private: 1.83 + nsCOMPtr<nsIAsyncInputStream> mInputStream; 1.84 + nsCOMPtr<nsIInputStreamCallback> mInputCallback; 1.85 + nsCOMPtr<nsIAsyncOutputStream> mOutputStream; 1.86 + nsCOMPtr<nsIOutputStreamCallback> mOutputCallback; 1.87 +}; 1.88 + 1.89 +//----------------------------------------------------------------------------- 1.90 + 1.91 +// the input end of a pipe (allocated as a member of the pipe). 1.92 +class nsPipeInputStream : public nsIAsyncInputStream 1.93 + , public nsISeekableStream 1.94 + , public nsISearchableInputStream 1.95 + , public nsIClassInfo 1.96 +{ 1.97 +public: 1.98 + // since this class will be allocated as a member of the pipe, we do not 1.99 + // need our own ref count. instead, we share the lifetime (the ref count) 1.100 + // of the entire pipe. this macro is just convenience since it does not 1.101 + // declare a mRefCount variable; however, don't let the name fool you... 1.102 + // we are not inheriting from nsPipe ;-) 1.103 + NS_DECL_ISUPPORTS_INHERITED 1.104 + 1.105 + NS_DECL_NSIINPUTSTREAM 1.106 + NS_DECL_NSIASYNCINPUTSTREAM 1.107 + NS_DECL_NSISEEKABLESTREAM 1.108 + NS_DECL_NSISEARCHABLEINPUTSTREAM 1.109 + NS_DECL_NSICLASSINFO 1.110 + 1.111 + nsPipeInputStream(nsPipe *pipe) 1.112 + : mPipe(pipe) 1.113 + , mReaderRefCnt(0) 1.114 + , mLogicalOffset(0) 1.115 + , mBlocking(true) 1.116 + , mBlocked(false) 1.117 + , mAvailable(0) 1.118 + , mCallbackFlags(0) 1.119 + { } 1.120 + 1.121 + nsresult Fill(); 1.122 + void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; } 1.123 + 1.124 + uint32_t Available() { return mAvailable; } 1.125 + void ReduceAvailable(uint32_t avail) { mAvailable -= avail; } 1.126 + 1.127 + // synchronously wait for the pipe to become readable. 1.128 + nsresult Wait(); 1.129 + 1.130 + // these functions return true to indicate that the pipe's monitor should 1.131 + // be notified, to wake up a blocked reader if any. 1.132 + bool OnInputReadable(uint32_t bytesWritten, nsPipeEvents &); 1.133 + bool OnInputException(nsresult, nsPipeEvents &); 1.134 + 1.135 +private: 1.136 + nsPipe *mPipe; 1.137 + 1.138 + // separate refcnt so that we know when to close the consumer 1.139 + mozilla::ThreadSafeAutoRefCnt mReaderRefCnt; 1.140 + int64_t mLogicalOffset; 1.141 + bool mBlocking; 1.142 + 1.143 + // these variables can only be accessed while inside the pipe's monitor 1.144 + bool mBlocked; 1.145 + uint32_t mAvailable; 1.146 + nsCOMPtr<nsIInputStreamCallback> mCallback; 1.147 + uint32_t mCallbackFlags; 1.148 +}; 1.149 + 1.150 +//----------------------------------------------------------------------------- 1.151 + 1.152 +// the output end of a pipe (allocated as a member of the pipe). 1.153 +class nsPipeOutputStream : public nsIAsyncOutputStream 1.154 + , public nsIClassInfo 1.155 +{ 1.156 +public: 1.157 + // since this class will be allocated as a member of the pipe, we do not 1.158 + // need our own ref count. instead, we share the lifetime (the ref count) 1.159 + // of the entire pipe. this macro is just convenience since it does not 1.160 + // declare a mRefCount variable; however, don't let the name fool you... 1.161 + // we are not inheriting from nsPipe ;-) 1.162 + NS_DECL_ISUPPORTS_INHERITED 1.163 + 1.164 + NS_DECL_NSIOUTPUTSTREAM 1.165 + NS_DECL_NSIASYNCOUTPUTSTREAM 1.166 + NS_DECL_NSICLASSINFO 1.167 + 1.168 + nsPipeOutputStream(nsPipe *pipe) 1.169 + : mPipe(pipe) 1.170 + , mWriterRefCnt(0) 1.171 + , mLogicalOffset(0) 1.172 + , mBlocking(true) 1.173 + , mBlocked(false) 1.174 + , mWritable(true) 1.175 + , mCallbackFlags(0) 1.176 + { } 1.177 + 1.178 + void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; } 1.179 + void SetWritable(bool writable) { mWritable = writable; } 1.180 + 1.181 + // synchronously wait for the pipe to become writable. 1.182 + nsresult Wait(); 1.183 + 1.184 + // these functions return true to indicate that the pipe's monitor should 1.185 + // be notified, to wake up a blocked writer if any. 1.186 + bool OnOutputWritable(nsPipeEvents &); 1.187 + bool OnOutputException(nsresult, nsPipeEvents &); 1.188 + 1.189 +private: 1.190 + nsPipe *mPipe; 1.191 + 1.192 + // separate refcnt so that we know when to close the producer 1.193 + mozilla::ThreadSafeAutoRefCnt mWriterRefCnt; 1.194 + int64_t mLogicalOffset; 1.195 + bool mBlocking; 1.196 + 1.197 + // these variables can only be accessed while inside the pipe's monitor 1.198 + bool mBlocked; 1.199 + bool mWritable; 1.200 + nsCOMPtr<nsIOutputStreamCallback> mCallback; 1.201 + uint32_t mCallbackFlags; 1.202 +}; 1.203 + 1.204 +//----------------------------------------------------------------------------- 1.205 + 1.206 +class nsPipe MOZ_FINAL : public nsIPipe 1.207 +{ 1.208 +public: 1.209 + friend class nsPipeInputStream; 1.210 + friend class nsPipeOutputStream; 1.211 + 1.212 + NS_DECL_THREADSAFE_ISUPPORTS 1.213 + NS_DECL_NSIPIPE 1.214 + 1.215 + // nsPipe methods: 1.216 + nsPipe(); 1.217 + 1.218 +private: 1.219 + ~nsPipe(); 1.220 + 1.221 +public: 1.222 + // 1.223 + // methods below may only be called while inside the pipe's monitor 1.224 + // 1.225 + 1.226 + void PeekSegment(uint32_t n, char *&cursor, char *&limit); 1.227 + 1.228 + // 1.229 + // methods below may be called while outside the pipe's monitor 1.230 + // 1.231 + 1.232 + nsresult GetReadSegment(const char *&segment, uint32_t &segmentLen); 1.233 + void AdvanceReadCursor(uint32_t count); 1.234 + 1.235 + nsresult GetWriteSegment(char *&segment, uint32_t &segmentLen); 1.236 + void AdvanceWriteCursor(uint32_t count); 1.237 + 1.238 + void OnPipeException(nsresult reason, bool outputOnly = false); 1.239 + 1.240 +protected: 1.241 + // We can't inherit from both nsIInputStream and nsIOutputStream 1.242 + // because they collide on their Close method. Consequently we nest their 1.243 + // implementations to avoid the extra object allocation. 1.244 + nsPipeInputStream mInput; 1.245 + nsPipeOutputStream mOutput; 1.246 + 1.247 + ReentrantMonitor mReentrantMonitor; 1.248 + nsSegmentedBuffer mBuffer; 1.249 + 1.250 + char* mReadCursor; 1.251 + char* mReadLimit; 1.252 + 1.253 + int32_t mWriteSegment; 1.254 + char* mWriteCursor; 1.255 + char* mWriteLimit; 1.256 + 1.257 + nsresult mStatus; 1.258 + bool mInited; 1.259 +}; 1.260 + 1.261 +// 1.262 +// NOTES on buffer architecture: 1.263 +// 1.264 +// +-----------------+ - - mBuffer.GetSegment(0) 1.265 +// | | 1.266 +// + - - - - - - - - + - - mReadCursor 1.267 +// |/////////////////| 1.268 +// |/////////////////| 1.269 +// |/////////////////| 1.270 +// |/////////////////| 1.271 +// +-----------------+ - - mReadLimit 1.272 +// | 1.273 +// +-----------------+ 1.274 +// |/////////////////| 1.275 +// |/////////////////| 1.276 +// |/////////////////| 1.277 +// |/////////////////| 1.278 +// |/////////////////| 1.279 +// |/////////////////| 1.280 +// +-----------------+ 1.281 +// | 1.282 +// +-----------------+ - - mBuffer.GetSegment(mWriteSegment) 1.283 +// |/////////////////| 1.284 +// |/////////////////| 1.285 +// |/////////////////| 1.286 +// + - - - - - - - - + - - mWriteCursor 1.287 +// | | 1.288 +// | | 1.289 +// +-----------------+ - - mWriteLimit 1.290 +// 1.291 +// (shaded region contains data) 1.292 +// 1.293 +// NOTE: on some systems (notably OS/2), the heap allocator uses an arena for 1.294 +// small allocations (e.g., 64 byte allocations). this means that buffers may 1.295 +// be allocated back-to-back. in the diagram above, for example, mReadLimit 1.296 +// would actually be pointing at the beginning of the next segment. when 1.297 +// making changes to this file, please keep this fact in mind. 1.298 +// 1.299 + 1.300 +//----------------------------------------------------------------------------- 1.301 +// nsPipe methods: 1.302 +//----------------------------------------------------------------------------- 1.303 + 1.304 +nsPipe::nsPipe() 1.305 + : mInput(MOZ_THIS_IN_INITIALIZER_LIST()) 1.306 + , mOutput(MOZ_THIS_IN_INITIALIZER_LIST()) 1.307 + , mReentrantMonitor("nsPipe.mReentrantMonitor") 1.308 + , mReadCursor(nullptr) 1.309 + , mReadLimit(nullptr) 1.310 + , mWriteSegment(-1) 1.311 + , mWriteCursor(nullptr) 1.312 + , mWriteLimit(nullptr) 1.313 + , mStatus(NS_OK) 1.314 + , mInited(false) 1.315 +{ 1.316 +} 1.317 + 1.318 +nsPipe::~nsPipe() 1.319 +{ 1.320 +} 1.321 + 1.322 +NS_IMPL_ISUPPORTS(nsPipe, nsIPipe) 1.323 + 1.324 +NS_IMETHODIMP 1.325 +nsPipe::Init(bool nonBlockingIn, 1.326 + bool nonBlockingOut, 1.327 + uint32_t segmentSize, 1.328 + uint32_t segmentCount) 1.329 +{ 1.330 + mInited = true; 1.331 + 1.332 + if (segmentSize == 0) 1.333 + segmentSize = DEFAULT_SEGMENT_SIZE; 1.334 + if (segmentCount == 0) 1.335 + segmentCount = DEFAULT_SEGMENT_COUNT; 1.336 + 1.337 + // protect against overflow 1.338 + uint32_t maxCount = uint32_t(-1) / segmentSize; 1.339 + if (segmentCount > maxCount) 1.340 + segmentCount = maxCount; 1.341 + 1.342 + nsresult rv = mBuffer.Init(segmentSize, segmentSize * segmentCount); 1.343 + if (NS_FAILED(rv)) 1.344 + return rv; 1.345 + 1.346 + mInput.SetNonBlocking(nonBlockingIn); 1.347 + mOutput.SetNonBlocking(nonBlockingOut); 1.348 + return NS_OK; 1.349 +} 1.350 + 1.351 +NS_IMETHODIMP 1.352 +nsPipe::GetInputStream(nsIAsyncInputStream **aInputStream) 1.353 +{ 1.354 + NS_ADDREF(*aInputStream = &mInput); 1.355 + return NS_OK; 1.356 +} 1.357 + 1.358 +NS_IMETHODIMP 1.359 +nsPipe::GetOutputStream(nsIAsyncOutputStream **aOutputStream) 1.360 +{ 1.361 + if (NS_WARN_IF(!mInited)) 1.362 + return NS_ERROR_NOT_INITIALIZED; 1.363 + NS_ADDREF(*aOutputStream = &mOutput); 1.364 + return NS_OK; 1.365 +} 1.366 + 1.367 +void 1.368 +nsPipe::PeekSegment(uint32_t index, char *&cursor, char *&limit) 1.369 +{ 1.370 + if (index == 0) { 1.371 + NS_ASSERTION(!mReadCursor || mBuffer.GetSegmentCount(), "unexpected state"); 1.372 + cursor = mReadCursor; 1.373 + limit = mReadLimit; 1.374 + } 1.375 + else { 1.376 + uint32_t numSegments = mBuffer.GetSegmentCount(); 1.377 + if (index >= numSegments) 1.378 + cursor = limit = nullptr; 1.379 + else { 1.380 + cursor = mBuffer.GetSegment(index); 1.381 + if (mWriteSegment == (int32_t) index) 1.382 + limit = mWriteCursor; 1.383 + else 1.384 + limit = cursor + mBuffer.GetSegmentSize(); 1.385 + } 1.386 + } 1.387 +} 1.388 + 1.389 +nsresult 1.390 +nsPipe::GetReadSegment(const char *&segment, uint32_t &segmentLen) 1.391 +{ 1.392 + ReentrantMonitorAutoEnter mon(mReentrantMonitor); 1.393 + 1.394 + if (mReadCursor == mReadLimit) 1.395 + return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK; 1.396 + 1.397 + segment = mReadCursor; 1.398 + segmentLen = mReadLimit - mReadCursor; 1.399 + return NS_OK; 1.400 +} 1.401 + 1.402 +void 1.403 +nsPipe::AdvanceReadCursor(uint32_t bytesRead) 1.404 +{ 1.405 + NS_ASSERTION(bytesRead, "don't call if no bytes read"); 1.406 + 1.407 + nsPipeEvents events; 1.408 + { 1.409 + ReentrantMonitorAutoEnter mon(mReentrantMonitor); 1.410 + 1.411 + LOG(("III advancing read cursor by %u\n", bytesRead)); 1.412 + NS_ASSERTION(bytesRead <= mBuffer.GetSegmentSize(), "read too much"); 1.413 + 1.414 + mReadCursor += bytesRead; 1.415 + NS_ASSERTION(mReadCursor <= mReadLimit, "read cursor exceeds limit"); 1.416 + 1.417 + mInput.ReduceAvailable(bytesRead); 1.418 + 1.419 + if (mReadCursor == mReadLimit) { 1.420 + // we've reached the limit of how much we can read from this segment. 1.421 + // if at the end of this segment, then we must discard this segment. 1.422 + 1.423 + // if still writing in this segment then bail because we're not done 1.424 + // with the segment and have to wait for now... 1.425 + if (mWriteSegment == 0 && mWriteLimit > mWriteCursor) { 1.426 + NS_ASSERTION(mReadLimit == mWriteCursor, "unexpected state"); 1.427 + return; 1.428 + } 1.429 + 1.430 + // shift write segment index (-1 indicates an empty buffer). 1.431 + --mWriteSegment; 1.432 + 1.433 + // done with this segment 1.434 + mBuffer.DeleteFirstSegment(); 1.435 + LOG(("III deleting first segment\n")); 1.436 + 1.437 + if (mWriteSegment == -1) { 1.438 + // buffer is completely empty 1.439 + mReadCursor = nullptr; 1.440 + mReadLimit = nullptr; 1.441 + mWriteCursor = nullptr; 1.442 + mWriteLimit = nullptr; 1.443 + } 1.444 + else { 1.445 + // advance read cursor and limit to next buffer segment 1.446 + mReadCursor = mBuffer.GetSegment(0); 1.447 + if (mWriteSegment == 0) 1.448 + mReadLimit = mWriteCursor; 1.449 + else 1.450 + mReadLimit = mReadCursor + mBuffer.GetSegmentSize(); 1.451 + } 1.452 + 1.453 + // we've free'd up a segment, so notify output stream that pipe has 1.454 + // room for a new segment. 1.455 + if (mOutput.OnOutputWritable(events)) 1.456 + mon.Notify(); 1.457 + } 1.458 + } 1.459 +} 1.460 + 1.461 +nsresult 1.462 +nsPipe::GetWriteSegment(char *&segment, uint32_t &segmentLen) 1.463 +{ 1.464 + ReentrantMonitorAutoEnter mon(mReentrantMonitor); 1.465 + 1.466 + if (NS_FAILED(mStatus)) 1.467 + return mStatus; 1.468 + 1.469 + // write cursor and limit may both be null indicating an empty buffer. 1.470 + if (mWriteCursor == mWriteLimit) { 1.471 + char *seg = mBuffer.AppendNewSegment(); 1.472 + // pipe is full 1.473 + if (seg == nullptr) 1.474 + return NS_BASE_STREAM_WOULD_BLOCK; 1.475 + LOG(("OOO appended new segment\n")); 1.476 + mWriteCursor = seg; 1.477 + mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize(); 1.478 + ++mWriteSegment; 1.479 + } 1.480 + 1.481 + // make sure read cursor is initialized 1.482 + if (mReadCursor == nullptr) { 1.483 + NS_ASSERTION(mWriteSegment == 0, "unexpected null read cursor"); 1.484 + mReadCursor = mReadLimit = mWriteCursor; 1.485 + } 1.486 + 1.487 + // check to see if we can roll-back our read and write cursors to the 1.488 + // beginning of the current/first segment. this is purely an optimization. 1.489 + if (mReadCursor == mWriteCursor && mWriteSegment == 0) { 1.490 + char *head = mBuffer.GetSegment(0); 1.491 + LOG(("OOO rolling back write cursor %u bytes\n", mWriteCursor - head)); 1.492 + mWriteCursor = mReadCursor = mReadLimit = head; 1.493 + } 1.494 + 1.495 + segment = mWriteCursor; 1.496 + segmentLen = mWriteLimit - mWriteCursor; 1.497 + return NS_OK; 1.498 +} 1.499 + 1.500 +void 1.501 +nsPipe::AdvanceWriteCursor(uint32_t bytesWritten) 1.502 +{ 1.503 + NS_ASSERTION(bytesWritten, "don't call if no bytes written"); 1.504 + 1.505 + nsPipeEvents events; 1.506 + { 1.507 + ReentrantMonitorAutoEnter mon(mReentrantMonitor); 1.508 + 1.509 + LOG(("OOO advancing write cursor by %u\n", bytesWritten)); 1.510 + 1.511 + char *newWriteCursor = mWriteCursor + bytesWritten; 1.512 + NS_ASSERTION(newWriteCursor <= mWriteLimit, "write cursor exceeds limit"); 1.513 + 1.514 + // update read limit if reading in the same segment 1.515 + if (mWriteSegment == 0 && mReadLimit == mWriteCursor) 1.516 + mReadLimit = newWriteCursor; 1.517 + 1.518 + mWriteCursor = newWriteCursor; 1.519 + 1.520 + // The only way mReadCursor == mWriteCursor is if: 1.521 + // 1.522 + // - mReadCursor is at the start of a segment (which, based on how 1.523 + // nsSegmentedBuffer works, means that this segment is the "first" 1.524 + // segment) 1.525 + // - mWriteCursor points at the location past the end of the current 1.526 + // write segment (so the current write filled the current write 1.527 + // segment, so we've incremented mWriteCursor to point past the end 1.528 + // of it) 1.529 + // - the segment to which data has just been written is located 1.530 + // exactly one segment's worth of bytes before the first segment 1.531 + // where mReadCursor is located 1.532 + // 1.533 + // Consequently, the byte immediately after the end of the current 1.534 + // write segment is the first byte of the first segment, so 1.535 + // mReadCursor == mWriteCursor. (Another way to think about this is 1.536 + // to consider the buffer architecture diagram above, but consider it 1.537 + // with an arena allocator which allocates from the *end* of the 1.538 + // arena to the *beginning* of the arena.) 1.539 + NS_ASSERTION(mReadCursor != mWriteCursor || 1.540 + (mBuffer.GetSegment(0) == mReadCursor && 1.541 + mWriteCursor == mWriteLimit), 1.542 + "read cursor is bad"); 1.543 + 1.544 + // update the writable flag on the output stream 1.545 + if (mWriteCursor == mWriteLimit) { 1.546 + if (mBuffer.GetSize() >= mBuffer.GetMaxSize()) 1.547 + mOutput.SetWritable(false); 1.548 + } 1.549 + 1.550 + // notify input stream that pipe now contains additional data 1.551 + if (mInput.OnInputReadable(bytesWritten, events)) 1.552 + mon.Notify(); 1.553 + } 1.554 +} 1.555 + 1.556 +void 1.557 +nsPipe::OnPipeException(nsresult reason, bool outputOnly) 1.558 +{ 1.559 + LOG(("PPP nsPipe::OnPipeException [reason=%x output-only=%d]\n", 1.560 + reason, outputOnly)); 1.561 + 1.562 + nsPipeEvents events; 1.563 + { 1.564 + ReentrantMonitorAutoEnter mon(mReentrantMonitor); 1.565 + 1.566 + // if we've already hit an exception, then ignore this one. 1.567 + if (NS_FAILED(mStatus)) 1.568 + return; 1.569 + 1.570 + mStatus = reason; 1.571 + 1.572 + // an output-only exception applies to the input end if the pipe has 1.573 + // zero bytes available. 1.574 + if (outputOnly && !mInput.Available()) 1.575 + outputOnly = false; 1.576 + 1.577 + if (!outputOnly) 1.578 + if (mInput.OnInputException(reason, events)) 1.579 + mon.Notify(); 1.580 + 1.581 + if (mOutput.OnOutputException(reason, events)) 1.582 + mon.Notify(); 1.583 + } 1.584 +} 1.585 + 1.586 +//----------------------------------------------------------------------------- 1.587 +// nsPipeEvents methods: 1.588 +//----------------------------------------------------------------------------- 1.589 + 1.590 +nsPipeEvents::~nsPipeEvents() 1.591 +{ 1.592 + // dispatch any pending events 1.593 + 1.594 + if (mInputCallback) { 1.595 + mInputCallback->OnInputStreamReady(mInputStream); 1.596 + mInputCallback = 0; 1.597 + mInputStream = 0; 1.598 + } 1.599 + if (mOutputCallback) { 1.600 + mOutputCallback->OnOutputStreamReady(mOutputStream); 1.601 + mOutputCallback = 0; 1.602 + mOutputStream = 0; 1.603 + } 1.604 +} 1.605 + 1.606 +//----------------------------------------------------------------------------- 1.607 +// nsPipeInputStream methods: 1.608 +//----------------------------------------------------------------------------- 1.609 + 1.610 +NS_IMPL_QUERY_INTERFACE(nsPipeInputStream, 1.611 + nsIInputStream, 1.612 + nsIAsyncInputStream, 1.613 + nsISeekableStream, 1.614 + nsISearchableInputStream, 1.615 + nsIClassInfo) 1.616 + 1.617 +NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream, 1.618 + nsIInputStream, 1.619 + nsIAsyncInputStream, 1.620 + nsISeekableStream, 1.621 + nsISearchableInputStream) 1.622 + 1.623 +NS_IMPL_THREADSAFE_CI(nsPipeInputStream) 1.624 + 1.625 +nsresult 1.626 +nsPipeInputStream::Wait() 1.627 +{ 1.628 + NS_ASSERTION(mBlocking, "wait on non-blocking pipe input stream"); 1.629 + 1.630 + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); 1.631 + 1.632 + while (NS_SUCCEEDED(mPipe->mStatus) && (mAvailable == 0)) { 1.633 + LOG(("III pipe input: waiting for data\n")); 1.634 + 1.635 + mBlocked = true; 1.636 + mon.Wait(); 1.637 + mBlocked = false; 1.638 + 1.639 + LOG(("III pipe input: woke up [pipe-status=%x available=%u]\n", 1.640 + mPipe->mStatus, mAvailable)); 1.641 + } 1.642 + 1.643 + return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus; 1.644 +} 1.645 + 1.646 +bool 1.647 +nsPipeInputStream::OnInputReadable(uint32_t bytesWritten, nsPipeEvents &events) 1.648 +{ 1.649 + bool result = false; 1.650 + 1.651 + mAvailable += bytesWritten; 1.652 + 1.653 + if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) { 1.654 + events.NotifyInputReady(this, mCallback); 1.655 + mCallback = 0; 1.656 + mCallbackFlags = 0; 1.657 + } 1.658 + else if (mBlocked) 1.659 + result = true; 1.660 + 1.661 + return result; 1.662 +} 1.663 + 1.664 +bool 1.665 +nsPipeInputStream::OnInputException(nsresult reason, nsPipeEvents &events) 1.666 +{ 1.667 + LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n", 1.668 + this, reason)); 1.669 + 1.670 + bool result = false; 1.671 + 1.672 + NS_ASSERTION(NS_FAILED(reason), "huh? successful exception"); 1.673 + 1.674 + // force count of available bytes to zero. 1.675 + mAvailable = 0; 1.676 + 1.677 + if (mCallback) { 1.678 + events.NotifyInputReady(this, mCallback); 1.679 + mCallback = 0; 1.680 + mCallbackFlags = 0; 1.681 + } 1.682 + else if (mBlocked) 1.683 + result = true; 1.684 + 1.685 + return result; 1.686 +} 1.687 + 1.688 +NS_IMETHODIMP_(MozExternalRefCountType) 1.689 +nsPipeInputStream::AddRef(void) 1.690 +{ 1.691 + ++mReaderRefCnt; 1.692 + return mPipe->AddRef(); 1.693 +} 1.694 + 1.695 +NS_IMETHODIMP_(MozExternalRefCountType) 1.696 +nsPipeInputStream::Release(void) 1.697 +{ 1.698 + if (--mReaderRefCnt == 0) 1.699 + Close(); 1.700 + return mPipe->Release(); 1.701 +} 1.702 + 1.703 +NS_IMETHODIMP 1.704 +nsPipeInputStream::CloseWithStatus(nsresult reason) 1.705 +{ 1.706 + LOG(("III CloseWithStatus [this=%x reason=%x]\n", this, reason)); 1.707 + 1.708 + if (NS_SUCCEEDED(reason)) 1.709 + reason = NS_BASE_STREAM_CLOSED; 1.710 + 1.711 + mPipe->OnPipeException(reason); 1.712 + return NS_OK; 1.713 +} 1.714 + 1.715 +NS_IMETHODIMP 1.716 +nsPipeInputStream::Close() 1.717 +{ 1.718 + return CloseWithStatus(NS_BASE_STREAM_CLOSED); 1.719 +} 1.720 + 1.721 +NS_IMETHODIMP 1.722 +nsPipeInputStream::Available(uint64_t *result) 1.723 +{ 1.724 + // nsPipeInputStream supports under 4GB stream only 1.725 + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); 1.726 + 1.727 + // return error if pipe closed 1.728 + if (!mAvailable && NS_FAILED(mPipe->mStatus)) 1.729 + return mPipe->mStatus; 1.730 + 1.731 + *result = (uint64_t)mAvailable; 1.732 + return NS_OK; 1.733 +} 1.734 + 1.735 +NS_IMETHODIMP 1.736 +nsPipeInputStream::ReadSegments(nsWriteSegmentFun writer, 1.737 + void *closure, 1.738 + uint32_t count, 1.739 + uint32_t *readCount) 1.740 +{ 1.741 + LOG(("III ReadSegments [this=%x count=%u]\n", this, count)); 1.742 + 1.743 + nsresult rv = NS_OK; 1.744 + 1.745 + const char *segment; 1.746 + uint32_t segmentLen; 1.747 + 1.748 + *readCount = 0; 1.749 + while (count) { 1.750 + rv = mPipe->GetReadSegment(segment, segmentLen); 1.751 + if (NS_FAILED(rv)) { 1.752 + // ignore this error if we've already read something. 1.753 + if (*readCount > 0) { 1.754 + rv = NS_OK; 1.755 + break; 1.756 + } 1.757 + if (rv == NS_BASE_STREAM_WOULD_BLOCK) { 1.758 + // pipe is empty 1.759 + if (!mBlocking) 1.760 + break; 1.761 + // wait for some data to be written to the pipe 1.762 + rv = Wait(); 1.763 + if (NS_SUCCEEDED(rv)) 1.764 + continue; 1.765 + } 1.766 + // ignore this error, just return. 1.767 + if (rv == NS_BASE_STREAM_CLOSED) { 1.768 + rv = NS_OK; 1.769 + break; 1.770 + } 1.771 + mPipe->OnPipeException(rv); 1.772 + break; 1.773 + } 1.774 + 1.775 + // read no more than count 1.776 + if (segmentLen > count) 1.777 + segmentLen = count; 1.778 + 1.779 + uint32_t writeCount, originalLen = segmentLen; 1.780 + while (segmentLen) { 1.781 + writeCount = 0; 1.782 + 1.783 + rv = writer(this, closure, segment, *readCount, segmentLen, &writeCount); 1.784 + 1.785 + if (NS_FAILED(rv) || writeCount == 0) { 1.786 + count = 0; 1.787 + // any errors returned from the writer end here: do not 1.788 + // propagate to the caller of ReadSegments. 1.789 + rv = NS_OK; 1.790 + break; 1.791 + } 1.792 + 1.793 + NS_ASSERTION(writeCount <= segmentLen, "wrote more than expected"); 1.794 + segment += writeCount; 1.795 + segmentLen -= writeCount; 1.796 + count -= writeCount; 1.797 + *readCount += writeCount; 1.798 + mLogicalOffset += writeCount; 1.799 + } 1.800 + 1.801 + if (segmentLen < originalLen) 1.802 + mPipe->AdvanceReadCursor(originalLen - segmentLen); 1.803 + } 1.804 + 1.805 + return rv; 1.806 +} 1.807 + 1.808 +NS_IMETHODIMP 1.809 +nsPipeInputStream::Read(char* toBuf, uint32_t bufLen, uint32_t *readCount) 1.810 +{ 1.811 + return ReadSegments(NS_CopySegmentToBuffer, toBuf, bufLen, readCount); 1.812 +} 1.813 + 1.814 +NS_IMETHODIMP 1.815 +nsPipeInputStream::IsNonBlocking(bool *aNonBlocking) 1.816 +{ 1.817 + *aNonBlocking = !mBlocking; 1.818 + return NS_OK; 1.819 +} 1.820 + 1.821 +NS_IMETHODIMP 1.822 +nsPipeInputStream::AsyncWait(nsIInputStreamCallback *callback, 1.823 + uint32_t flags, 1.824 + uint32_t requestedCount, 1.825 + nsIEventTarget *target) 1.826 +{ 1.827 + LOG(("III AsyncWait [this=%x]\n", this)); 1.828 + 1.829 + nsPipeEvents pipeEvents; 1.830 + { 1.831 + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); 1.832 + 1.833 + // replace a pending callback 1.834 + mCallback = 0; 1.835 + mCallbackFlags = 0; 1.836 + 1.837 + if (!callback) 1.838 + return NS_OK; 1.839 + 1.840 + nsCOMPtr<nsIInputStreamCallback> proxy; 1.841 + if (target) { 1.842 + proxy = NS_NewInputStreamReadyEvent(callback, target); 1.843 + callback = proxy; 1.844 + } 1.845 + 1.846 + if (NS_FAILED(mPipe->mStatus) || 1.847 + (mAvailable && !(flags & WAIT_CLOSURE_ONLY))) { 1.848 + // stream is already closed or readable; post event. 1.849 + pipeEvents.NotifyInputReady(this, callback); 1.850 + } 1.851 + else { 1.852 + // queue up callback object to be notified when data becomes available 1.853 + mCallback = callback; 1.854 + mCallbackFlags = flags; 1.855 + } 1.856 + } 1.857 + return NS_OK; 1.858 +} 1.859 + 1.860 +NS_IMETHODIMP 1.861 +nsPipeInputStream::Seek(int32_t whence, int64_t offset) 1.862 +{ 1.863 + NS_NOTREACHED("nsPipeInputStream::Seek"); 1.864 + return NS_ERROR_NOT_IMPLEMENTED; 1.865 +} 1.866 + 1.867 +NS_IMETHODIMP 1.868 +nsPipeInputStream::Tell(int64_t *offset) 1.869 +{ 1.870 + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); 1.871 + 1.872 + // return error if pipe closed 1.873 + if (!mAvailable && NS_FAILED(mPipe->mStatus)) 1.874 + return mPipe->mStatus; 1.875 + 1.876 + *offset = mLogicalOffset; 1.877 + return NS_OK; 1.878 +} 1.879 + 1.880 +NS_IMETHODIMP 1.881 +nsPipeInputStream::SetEOF() 1.882 +{ 1.883 + NS_NOTREACHED("nsPipeInputStream::SetEOF"); 1.884 + return NS_ERROR_NOT_IMPLEMENTED; 1.885 +} 1.886 + 1.887 +#define COMPARE(s1, s2, i) \ 1.888 + (ignoreCase \ 1.889 + ? nsCRT::strncasecmp((const char *)s1, (const char *)s2, (uint32_t)i) \ 1.890 + : nsCRT::strncmp((const char *)s1, (const char *)s2, (uint32_t)i)) 1.891 + 1.892 +NS_IMETHODIMP 1.893 +nsPipeInputStream::Search(const char *forString, 1.894 + bool ignoreCase, 1.895 + bool *found, 1.896 + uint32_t *offsetSearchedTo) 1.897 +{ 1.898 + LOG(("III Search [for=%s ic=%u]\n", forString, ignoreCase)); 1.899 + 1.900 + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); 1.901 + 1.902 + char *cursor1, *limit1; 1.903 + uint32_t index = 0, offset = 0; 1.904 + uint32_t strLen = strlen(forString); 1.905 + 1.906 + mPipe->PeekSegment(0, cursor1, limit1); 1.907 + if (cursor1 == limit1) { 1.908 + *found = false; 1.909 + *offsetSearchedTo = 0; 1.910 + LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); 1.911 + return NS_OK; 1.912 + } 1.913 + 1.914 + while (true) { 1.915 + uint32_t i, len1 = limit1 - cursor1; 1.916 + 1.917 + // check if the string is in the buffer segment 1.918 + for (i = 0; i < len1 - strLen + 1; i++) { 1.919 + if (COMPARE(&cursor1[i], forString, strLen) == 0) { 1.920 + *found = true; 1.921 + *offsetSearchedTo = offset + i; 1.922 + LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); 1.923 + return NS_OK; 1.924 + } 1.925 + } 1.926 + 1.927 + // get the next segment 1.928 + char *cursor2, *limit2; 1.929 + uint32_t len2; 1.930 + 1.931 + index++; 1.932 + offset += len1; 1.933 + 1.934 + mPipe->PeekSegment(index, cursor2, limit2); 1.935 + if (cursor2 == limit2) { 1.936 + *found = false; 1.937 + *offsetSearchedTo = offset - strLen + 1; 1.938 + LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); 1.939 + return NS_OK; 1.940 + } 1.941 + len2 = limit2 - cursor2; 1.942 + 1.943 + // check if the string is straddling the next buffer segment 1.944 + uint32_t lim = XPCOM_MIN(strLen, len2 + 1); 1.945 + for (i = 0; i < lim; ++i) { 1.946 + uint32_t strPart1Len = strLen - i - 1; 1.947 + uint32_t strPart2Len = strLen - strPart1Len; 1.948 + const char* strPart2 = &forString[strLen - strPart2Len]; 1.949 + uint32_t bufSeg1Offset = len1 - strPart1Len; 1.950 + if (COMPARE(&cursor1[bufSeg1Offset], forString, strPart1Len) == 0 && 1.951 + COMPARE(cursor2, strPart2, strPart2Len) == 0) { 1.952 + *found = true; 1.953 + *offsetSearchedTo = offset - strPart1Len; 1.954 + LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); 1.955 + return NS_OK; 1.956 + } 1.957 + } 1.958 + 1.959 + // finally continue with the next buffer 1.960 + cursor1 = cursor2; 1.961 + limit1 = limit2; 1.962 + } 1.963 + 1.964 + NS_NOTREACHED("can't get here"); 1.965 + return NS_ERROR_UNEXPECTED; // keep compiler happy 1.966 +} 1.967 + 1.968 +//----------------------------------------------------------------------------- 1.969 +// nsPipeOutputStream methods: 1.970 +//----------------------------------------------------------------------------- 1.971 + 1.972 +NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream, 1.973 + nsIOutputStream, 1.974 + nsIAsyncOutputStream, 1.975 + nsIClassInfo) 1.976 + 1.977 +NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream, 1.978 + nsIOutputStream, 1.979 + nsIAsyncOutputStream) 1.980 + 1.981 +NS_IMPL_THREADSAFE_CI(nsPipeOutputStream) 1.982 + 1.983 +nsresult 1.984 +nsPipeOutputStream::Wait() 1.985 +{ 1.986 + NS_ASSERTION(mBlocking, "wait on non-blocking pipe output stream"); 1.987 + 1.988 + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); 1.989 + 1.990 + if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) { 1.991 + LOG(("OOO pipe output: waiting for space\n")); 1.992 + mBlocked = true; 1.993 + mon.Wait(); 1.994 + mBlocked = false; 1.995 + LOG(("OOO pipe output: woke up [pipe-status=%x writable=%u]\n", 1.996 + mPipe->mStatus, mWritable)); 1.997 + } 1.998 + 1.999 + return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus; 1.1000 +} 1.1001 + 1.1002 +bool 1.1003 +nsPipeOutputStream::OnOutputWritable(nsPipeEvents &events) 1.1004 +{ 1.1005 + bool result = false; 1.1006 + 1.1007 + mWritable = true; 1.1008 + 1.1009 + if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) { 1.1010 + events.NotifyOutputReady(this, mCallback); 1.1011 + mCallback = 0; 1.1012 + mCallbackFlags = 0; 1.1013 + } 1.1014 + else if (mBlocked) 1.1015 + result = true; 1.1016 + 1.1017 + return result; 1.1018 +} 1.1019 + 1.1020 +bool 1.1021 +nsPipeOutputStream::OnOutputException(nsresult reason, nsPipeEvents &events) 1.1022 +{ 1.1023 + LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n", 1.1024 + this, reason)); 1.1025 + 1.1026 + bool result = false; 1.1027 + 1.1028 + NS_ASSERTION(NS_FAILED(reason), "huh? successful exception"); 1.1029 + mWritable = false; 1.1030 + 1.1031 + if (mCallback) { 1.1032 + events.NotifyOutputReady(this, mCallback); 1.1033 + mCallback = 0; 1.1034 + mCallbackFlags = 0; 1.1035 + } 1.1036 + else if (mBlocked) 1.1037 + result = true; 1.1038 + 1.1039 + return result; 1.1040 +} 1.1041 + 1.1042 + 1.1043 +NS_IMETHODIMP_(MozExternalRefCountType) 1.1044 +nsPipeOutputStream::AddRef() 1.1045 +{ 1.1046 + ++mWriterRefCnt; 1.1047 + return mPipe->AddRef(); 1.1048 +} 1.1049 + 1.1050 +NS_IMETHODIMP_(MozExternalRefCountType) 1.1051 +nsPipeOutputStream::Release() 1.1052 +{ 1.1053 + if (--mWriterRefCnt == 0) 1.1054 + Close(); 1.1055 + return mPipe->Release(); 1.1056 +} 1.1057 + 1.1058 +NS_IMETHODIMP 1.1059 +nsPipeOutputStream::CloseWithStatus(nsresult reason) 1.1060 +{ 1.1061 + LOG(("OOO CloseWithStatus [this=%x reason=%x]\n", this, reason)); 1.1062 + 1.1063 + if (NS_SUCCEEDED(reason)) 1.1064 + reason = NS_BASE_STREAM_CLOSED; 1.1065 + 1.1066 + // input stream may remain open 1.1067 + mPipe->OnPipeException(reason, true); 1.1068 + return NS_OK; 1.1069 +} 1.1070 + 1.1071 +NS_IMETHODIMP 1.1072 +nsPipeOutputStream::Close() 1.1073 +{ 1.1074 + return CloseWithStatus(NS_BASE_STREAM_CLOSED); 1.1075 +} 1.1076 + 1.1077 +NS_IMETHODIMP 1.1078 +nsPipeOutputStream::WriteSegments(nsReadSegmentFun reader, 1.1079 + void* closure, 1.1080 + uint32_t count, 1.1081 + uint32_t *writeCount) 1.1082 +{ 1.1083 + LOG(("OOO WriteSegments [this=%x count=%u]\n", this, count)); 1.1084 + 1.1085 + nsresult rv = NS_OK; 1.1086 + 1.1087 + char *segment; 1.1088 + uint32_t segmentLen; 1.1089 + 1.1090 + *writeCount = 0; 1.1091 + while (count) { 1.1092 + rv = mPipe->GetWriteSegment(segment, segmentLen); 1.1093 + if (NS_FAILED(rv)) { 1.1094 + if (rv == NS_BASE_STREAM_WOULD_BLOCK) { 1.1095 + // pipe is full 1.1096 + if (!mBlocking) { 1.1097 + // ignore this error if we've already written something 1.1098 + if (*writeCount > 0) 1.1099 + rv = NS_OK; 1.1100 + break; 1.1101 + } 1.1102 + // wait for the pipe to have an empty segment. 1.1103 + rv = Wait(); 1.1104 + if (NS_SUCCEEDED(rv)) 1.1105 + continue; 1.1106 + } 1.1107 + mPipe->OnPipeException(rv); 1.1108 + break; 1.1109 + } 1.1110 + 1.1111 + // write no more than count 1.1112 + if (segmentLen > count) 1.1113 + segmentLen = count; 1.1114 + 1.1115 + uint32_t readCount, originalLen = segmentLen; 1.1116 + while (segmentLen) { 1.1117 + readCount = 0; 1.1118 + 1.1119 + rv = reader(this, closure, segment, *writeCount, segmentLen, &readCount); 1.1120 + 1.1121 + if (NS_FAILED(rv) || readCount == 0) { 1.1122 + count = 0; 1.1123 + // any errors returned from the reader end here: do not 1.1124 + // propagate to the caller of WriteSegments. 1.1125 + rv = NS_OK; 1.1126 + break; 1.1127 + } 1.1128 + 1.1129 + NS_ASSERTION(readCount <= segmentLen, "read more than expected"); 1.1130 + segment += readCount; 1.1131 + segmentLen -= readCount; 1.1132 + count -= readCount; 1.1133 + *writeCount += readCount; 1.1134 + mLogicalOffset += readCount; 1.1135 + } 1.1136 + 1.1137 + if (segmentLen < originalLen) 1.1138 + mPipe->AdvanceWriteCursor(originalLen - segmentLen); 1.1139 + } 1.1140 + 1.1141 + return rv; 1.1142 +} 1.1143 + 1.1144 +static NS_METHOD 1.1145 +nsReadFromRawBuffer(nsIOutputStream* outStr, 1.1146 + void* closure, 1.1147 + char* toRawSegment, 1.1148 + uint32_t offset, 1.1149 + uint32_t count, 1.1150 + uint32_t *readCount) 1.1151 +{ 1.1152 + const char* fromBuf = (const char*)closure; 1.1153 + memcpy(toRawSegment, &fromBuf[offset], count); 1.1154 + *readCount = count; 1.1155 + return NS_OK; 1.1156 +} 1.1157 + 1.1158 +NS_IMETHODIMP 1.1159 +nsPipeOutputStream::Write(const char* fromBuf, 1.1160 + uint32_t bufLen, 1.1161 + uint32_t *writeCount) 1.1162 +{ 1.1163 + return WriteSegments(nsReadFromRawBuffer, (void*)fromBuf, bufLen, writeCount); 1.1164 +} 1.1165 + 1.1166 +NS_IMETHODIMP 1.1167 +nsPipeOutputStream::Flush(void) 1.1168 +{ 1.1169 + // nothing to do 1.1170 + return NS_OK; 1.1171 +} 1.1172 + 1.1173 +static NS_METHOD 1.1174 +nsReadFromInputStream(nsIOutputStream* outStr, 1.1175 + void* closure, 1.1176 + char* toRawSegment, 1.1177 + uint32_t offset, 1.1178 + uint32_t count, 1.1179 + uint32_t *readCount) 1.1180 +{ 1.1181 + nsIInputStream* fromStream = (nsIInputStream*)closure; 1.1182 + return fromStream->Read(toRawSegment, count, readCount); 1.1183 +} 1.1184 + 1.1185 +NS_IMETHODIMP 1.1186 +nsPipeOutputStream::WriteFrom(nsIInputStream* fromStream, 1.1187 + uint32_t count, 1.1188 + uint32_t *writeCount) 1.1189 +{ 1.1190 + return WriteSegments(nsReadFromInputStream, fromStream, count, writeCount); 1.1191 +} 1.1192 + 1.1193 +NS_IMETHODIMP 1.1194 +nsPipeOutputStream::IsNonBlocking(bool *aNonBlocking) 1.1195 +{ 1.1196 + *aNonBlocking = !mBlocking; 1.1197 + return NS_OK; 1.1198 +} 1.1199 + 1.1200 +NS_IMETHODIMP 1.1201 +nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback *callback, 1.1202 + uint32_t flags, 1.1203 + uint32_t requestedCount, 1.1204 + nsIEventTarget *target) 1.1205 +{ 1.1206 + LOG(("OOO AsyncWait [this=%x]\n", this)); 1.1207 + 1.1208 + nsPipeEvents pipeEvents; 1.1209 + { 1.1210 + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); 1.1211 + 1.1212 + // replace a pending callback 1.1213 + mCallback = 0; 1.1214 + mCallbackFlags = 0; 1.1215 + 1.1216 + if (!callback) 1.1217 + return NS_OK; 1.1218 + 1.1219 + nsCOMPtr<nsIOutputStreamCallback> proxy; 1.1220 + if (target) { 1.1221 + proxy = NS_NewOutputStreamReadyEvent(callback, target); 1.1222 + callback = proxy; 1.1223 + } 1.1224 + 1.1225 + if (NS_FAILED(mPipe->mStatus) || 1.1226 + (mWritable && !(flags & WAIT_CLOSURE_ONLY))) { 1.1227 + // stream is already closed or writable; post event. 1.1228 + pipeEvents.NotifyOutputReady(this, callback); 1.1229 + } 1.1230 + else { 1.1231 + // queue up callback object to be notified when data becomes available 1.1232 + mCallback = callback; 1.1233 + mCallbackFlags = flags; 1.1234 + } 1.1235 + } 1.1236 + return NS_OK; 1.1237 +} 1.1238 + 1.1239 +//////////////////////////////////////////////////////////////////////////////// 1.1240 + 1.1241 +nsresult 1.1242 +NS_NewPipe(nsIInputStream **pipeIn, 1.1243 + nsIOutputStream **pipeOut, 1.1244 + uint32_t segmentSize, 1.1245 + uint32_t maxSize, 1.1246 + bool nonBlockingInput, 1.1247 + bool nonBlockingOutput) 1.1248 +{ 1.1249 + if (segmentSize == 0) 1.1250 + segmentSize = DEFAULT_SEGMENT_SIZE; 1.1251 + 1.1252 + // Handle maxSize of UINT32_MAX as a special case 1.1253 + uint32_t segmentCount; 1.1254 + if (maxSize == UINT32_MAX) 1.1255 + segmentCount = UINT32_MAX; 1.1256 + else 1.1257 + segmentCount = maxSize / segmentSize; 1.1258 + 1.1259 + nsIAsyncInputStream *in; 1.1260 + nsIAsyncOutputStream *out; 1.1261 + nsresult rv = NS_NewPipe2(&in, &out, nonBlockingInput, nonBlockingOutput, 1.1262 + segmentSize, segmentCount); 1.1263 + if (NS_FAILED(rv)) return rv; 1.1264 + 1.1265 + *pipeIn = in; 1.1266 + *pipeOut = out; 1.1267 + return NS_OK; 1.1268 +} 1.1269 + 1.1270 +nsresult 1.1271 +NS_NewPipe2(nsIAsyncInputStream **pipeIn, 1.1272 + nsIAsyncOutputStream **pipeOut, 1.1273 + bool nonBlockingInput, 1.1274 + bool nonBlockingOutput, 1.1275 + uint32_t segmentSize, 1.1276 + uint32_t segmentCount) 1.1277 +{ 1.1278 + nsresult rv; 1.1279 + 1.1280 + nsPipe *pipe = new nsPipe(); 1.1281 + if (!pipe) 1.1282 + return NS_ERROR_OUT_OF_MEMORY; 1.1283 + 1.1284 + rv = pipe->Init(nonBlockingInput, 1.1285 + nonBlockingOutput, 1.1286 + segmentSize, 1.1287 + segmentCount); 1.1288 + if (NS_FAILED(rv)) { 1.1289 + NS_ADDREF(pipe); 1.1290 + NS_RELEASE(pipe); 1.1291 + return rv; 1.1292 + } 1.1293 + 1.1294 + pipe->GetInputStream(pipeIn); 1.1295 + pipe->GetOutputStream(pipeOut); 1.1296 + return NS_OK; 1.1297 +} 1.1298 + 1.1299 +nsresult 1.1300 +nsPipeConstructor(nsISupports *outer, REFNSIID iid, void **result) 1.1301 +{ 1.1302 + if (outer) 1.1303 + return NS_ERROR_NO_AGGREGATION; 1.1304 + nsPipe *pipe = new nsPipe(); 1.1305 + if (!pipe) 1.1306 + return NS_ERROR_OUT_OF_MEMORY; 1.1307 + NS_ADDREF(pipe); 1.1308 + nsresult rv = pipe->QueryInterface(iid, result); 1.1309 + NS_RELEASE(pipe); 1.1310 + return rv; 1.1311 +} 1.1312 + 1.1313 +////////////////////////////////////////////////////////////////////////////////