Tue, 06 Jan 2015 21:39:09 +0100
Conditionally force memory storage according to privacy.thirdparty.isolate;
This solves Tor bug #9701, complying with disk avoidance documented in
https://www.torproject.org/projects/torbrowser/design/#disk-avoidance.
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 #include "mozilla/Attributes.h"
6 #include "mozilla/ReentrantMonitor.h"
7 #include "nsIPipe.h"
8 #include "nsIEventTarget.h"
9 #include "nsISeekableStream.h"
10 #include "nsIProgrammingLanguage.h"
11 #include "nsSegmentedBuffer.h"
12 #include "nsStreamUtils.h"
13 #include "nsCOMPtr.h"
14 #include "nsCRT.h"
15 #include "prlog.h"
16 #include "nsIClassInfoImpl.h"
17 #include "nsAlgorithm.h"
18 #include "nsMemory.h"
19 #include "nsIAsyncInputStream.h"
20 #include "nsIAsyncOutputStream.h"
22 using namespace mozilla;
24 #ifdef LOG
25 #undef LOG
26 #endif
27 #if defined(PR_LOGGING)
28 //
29 // set NSPR_LOG_MODULES=nsPipe:5
30 //
31 static PRLogModuleInfo *
32 GetPipeLog()
33 {
34 static PRLogModuleInfo *sLog;
35 if (!sLog)
36 sLog = PR_NewLogModule("nsPipe");
37 return sLog;
38 }
39 #define LOG(args) PR_LOG(GetPipeLog(), PR_LOG_DEBUG, args)
40 #else
41 #define LOG(args)
42 #endif
44 #define DEFAULT_SEGMENT_SIZE 4096
45 #define DEFAULT_SEGMENT_COUNT 16
47 class nsPipe;
48 class nsPipeEvents;
49 class nsPipeInputStream;
50 class nsPipeOutputStream;
52 //-----------------------------------------------------------------------------
54 // this class is used to delay notifications until the end of a particular
55 // scope. it helps avoid the complexity of issuing callbacks while inside
56 // a critical section.
57 class nsPipeEvents
58 {
59 public:
60 nsPipeEvents() { }
61 ~nsPipeEvents();
63 inline void NotifyInputReady(nsIAsyncInputStream *stream,
64 nsIInputStreamCallback *callback)
65 {
66 NS_ASSERTION(!mInputCallback, "already have an input event");
67 mInputStream = stream;
68 mInputCallback = callback;
69 }
71 inline void NotifyOutputReady(nsIAsyncOutputStream *stream,
72 nsIOutputStreamCallback *callback)
73 {
74 NS_ASSERTION(!mOutputCallback, "already have an output event");
75 mOutputStream = stream;
76 mOutputCallback = callback;
77 }
79 private:
80 nsCOMPtr<nsIAsyncInputStream> mInputStream;
81 nsCOMPtr<nsIInputStreamCallback> mInputCallback;
82 nsCOMPtr<nsIAsyncOutputStream> mOutputStream;
83 nsCOMPtr<nsIOutputStreamCallback> mOutputCallback;
84 };
86 //-----------------------------------------------------------------------------
88 // the input end of a pipe (allocated as a member of the pipe).
89 class nsPipeInputStream : public nsIAsyncInputStream
90 , public nsISeekableStream
91 , public nsISearchableInputStream
92 , public nsIClassInfo
93 {
94 public:
95 // since this class will be allocated as a member of the pipe, we do not
96 // need our own ref count. instead, we share the lifetime (the ref count)
97 // of the entire pipe. this macro is just convenience since it does not
98 // declare a mRefCount variable; however, don't let the name fool you...
99 // we are not inheriting from nsPipe ;-)
100 NS_DECL_ISUPPORTS_INHERITED
102 NS_DECL_NSIINPUTSTREAM
103 NS_DECL_NSIASYNCINPUTSTREAM
104 NS_DECL_NSISEEKABLESTREAM
105 NS_DECL_NSISEARCHABLEINPUTSTREAM
106 NS_DECL_NSICLASSINFO
108 nsPipeInputStream(nsPipe *pipe)
109 : mPipe(pipe)
110 , mReaderRefCnt(0)
111 , mLogicalOffset(0)
112 , mBlocking(true)
113 , mBlocked(false)
114 , mAvailable(0)
115 , mCallbackFlags(0)
116 { }
118 nsresult Fill();
119 void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; }
121 uint32_t Available() { return mAvailable; }
122 void ReduceAvailable(uint32_t avail) { mAvailable -= avail; }
124 // synchronously wait for the pipe to become readable.
125 nsresult Wait();
127 // these functions return true to indicate that the pipe's monitor should
128 // be notified, to wake up a blocked reader if any.
129 bool OnInputReadable(uint32_t bytesWritten, nsPipeEvents &);
130 bool OnInputException(nsresult, nsPipeEvents &);
132 private:
133 nsPipe *mPipe;
135 // separate refcnt so that we know when to close the consumer
136 mozilla::ThreadSafeAutoRefCnt mReaderRefCnt;
137 int64_t mLogicalOffset;
138 bool mBlocking;
140 // these variables can only be accessed while inside the pipe's monitor
141 bool mBlocked;
142 uint32_t mAvailable;
143 nsCOMPtr<nsIInputStreamCallback> mCallback;
144 uint32_t mCallbackFlags;
145 };
147 //-----------------------------------------------------------------------------
149 // the output end of a pipe (allocated as a member of the pipe).
150 class nsPipeOutputStream : public nsIAsyncOutputStream
151 , public nsIClassInfo
152 {
153 public:
154 // since this class will be allocated as a member of the pipe, we do not
155 // need our own ref count. instead, we share the lifetime (the ref count)
156 // of the entire pipe. this macro is just convenience since it does not
157 // declare a mRefCount variable; however, don't let the name fool you...
158 // we are not inheriting from nsPipe ;-)
159 NS_DECL_ISUPPORTS_INHERITED
161 NS_DECL_NSIOUTPUTSTREAM
162 NS_DECL_NSIASYNCOUTPUTSTREAM
163 NS_DECL_NSICLASSINFO
165 nsPipeOutputStream(nsPipe *pipe)
166 : mPipe(pipe)
167 , mWriterRefCnt(0)
168 , mLogicalOffset(0)
169 , mBlocking(true)
170 , mBlocked(false)
171 , mWritable(true)
172 , mCallbackFlags(0)
173 { }
175 void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; }
176 void SetWritable(bool writable) { mWritable = writable; }
178 // synchronously wait for the pipe to become writable.
179 nsresult Wait();
181 // these functions return true to indicate that the pipe's monitor should
182 // be notified, to wake up a blocked writer if any.
183 bool OnOutputWritable(nsPipeEvents &);
184 bool OnOutputException(nsresult, nsPipeEvents &);
186 private:
187 nsPipe *mPipe;
189 // separate refcnt so that we know when to close the producer
190 mozilla::ThreadSafeAutoRefCnt mWriterRefCnt;
191 int64_t mLogicalOffset;
192 bool mBlocking;
194 // these variables can only be accessed while inside the pipe's monitor
195 bool mBlocked;
196 bool mWritable;
197 nsCOMPtr<nsIOutputStreamCallback> mCallback;
198 uint32_t mCallbackFlags;
199 };
201 //-----------------------------------------------------------------------------
203 class nsPipe MOZ_FINAL : public nsIPipe
204 {
205 public:
206 friend class nsPipeInputStream;
207 friend class nsPipeOutputStream;
209 NS_DECL_THREADSAFE_ISUPPORTS
210 NS_DECL_NSIPIPE
212 // nsPipe methods:
213 nsPipe();
215 private:
216 ~nsPipe();
218 public:
219 //
220 // methods below may only be called while inside the pipe's monitor
221 //
223 void PeekSegment(uint32_t n, char *&cursor, char *&limit);
225 //
226 // methods below may be called while outside the pipe's monitor
227 //
229 nsresult GetReadSegment(const char *&segment, uint32_t &segmentLen);
230 void AdvanceReadCursor(uint32_t count);
232 nsresult GetWriteSegment(char *&segment, uint32_t &segmentLen);
233 void AdvanceWriteCursor(uint32_t count);
235 void OnPipeException(nsresult reason, bool outputOnly = false);
237 protected:
238 // We can't inherit from both nsIInputStream and nsIOutputStream
239 // because they collide on their Close method. Consequently we nest their
240 // implementations to avoid the extra object allocation.
241 nsPipeInputStream mInput;
242 nsPipeOutputStream mOutput;
244 ReentrantMonitor mReentrantMonitor;
245 nsSegmentedBuffer mBuffer;
247 char* mReadCursor;
248 char* mReadLimit;
250 int32_t mWriteSegment;
251 char* mWriteCursor;
252 char* mWriteLimit;
254 nsresult mStatus;
255 bool mInited;
256 };
258 //
259 // NOTES on buffer architecture:
260 //
261 // +-----------------+ - - mBuffer.GetSegment(0)
262 // | |
263 // + - - - - - - - - + - - mReadCursor
264 // |/////////////////|
265 // |/////////////////|
266 // |/////////////////|
267 // |/////////////////|
268 // +-----------------+ - - mReadLimit
269 // |
270 // +-----------------+
271 // |/////////////////|
272 // |/////////////////|
273 // |/////////////////|
274 // |/////////////////|
275 // |/////////////////|
276 // |/////////////////|
277 // +-----------------+
278 // |
279 // +-----------------+ - - mBuffer.GetSegment(mWriteSegment)
280 // |/////////////////|
281 // |/////////////////|
282 // |/////////////////|
283 // + - - - - - - - - + - - mWriteCursor
284 // | |
285 // | |
286 // +-----------------+ - - mWriteLimit
287 //
288 // (shaded region contains data)
289 //
290 // NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
291 // small allocations (e.g., 64 byte allocations). this means that buffers may
292 // be allocated back-to-back. in the diagram above, for example, mReadLimit
293 // would actually be pointing at the beginning of the next segment. when
294 // making changes to this file, please keep this fact in mind.
295 //
297 //-----------------------------------------------------------------------------
298 // nsPipe methods:
299 //-----------------------------------------------------------------------------
301 nsPipe::nsPipe()
302 : mInput(MOZ_THIS_IN_INITIALIZER_LIST())
303 , mOutput(MOZ_THIS_IN_INITIALIZER_LIST())
304 , mReentrantMonitor("nsPipe.mReentrantMonitor")
305 , mReadCursor(nullptr)
306 , mReadLimit(nullptr)
307 , mWriteSegment(-1)
308 , mWriteCursor(nullptr)
309 , mWriteLimit(nullptr)
310 , mStatus(NS_OK)
311 , mInited(false)
312 {
313 }
315 nsPipe::~nsPipe()
316 {
317 }
319 NS_IMPL_ISUPPORTS(nsPipe, nsIPipe)
321 NS_IMETHODIMP
322 nsPipe::Init(bool nonBlockingIn,
323 bool nonBlockingOut,
324 uint32_t segmentSize,
325 uint32_t segmentCount)
326 {
327 mInited = true;
329 if (segmentSize == 0)
330 segmentSize = DEFAULT_SEGMENT_SIZE;
331 if (segmentCount == 0)
332 segmentCount = DEFAULT_SEGMENT_COUNT;
334 // protect against overflow
335 uint32_t maxCount = uint32_t(-1) / segmentSize;
336 if (segmentCount > maxCount)
337 segmentCount = maxCount;
339 nsresult rv = mBuffer.Init(segmentSize, segmentSize * segmentCount);
340 if (NS_FAILED(rv))
341 return rv;
343 mInput.SetNonBlocking(nonBlockingIn);
344 mOutput.SetNonBlocking(nonBlockingOut);
345 return NS_OK;
346 }
348 NS_IMETHODIMP
349 nsPipe::GetInputStream(nsIAsyncInputStream **aInputStream)
350 {
351 NS_ADDREF(*aInputStream = &mInput);
352 return NS_OK;
353 }
355 NS_IMETHODIMP
356 nsPipe::GetOutputStream(nsIAsyncOutputStream **aOutputStream)
357 {
358 if (NS_WARN_IF(!mInited))
359 return NS_ERROR_NOT_INITIALIZED;
360 NS_ADDREF(*aOutputStream = &mOutput);
361 return NS_OK;
362 }
364 void
365 nsPipe::PeekSegment(uint32_t index, char *&cursor, char *&limit)
366 {
367 if (index == 0) {
368 NS_ASSERTION(!mReadCursor || mBuffer.GetSegmentCount(), "unexpected state");
369 cursor = mReadCursor;
370 limit = mReadLimit;
371 }
372 else {
373 uint32_t numSegments = mBuffer.GetSegmentCount();
374 if (index >= numSegments)
375 cursor = limit = nullptr;
376 else {
377 cursor = mBuffer.GetSegment(index);
378 if (mWriteSegment == (int32_t) index)
379 limit = mWriteCursor;
380 else
381 limit = cursor + mBuffer.GetSegmentSize();
382 }
383 }
384 }
386 nsresult
387 nsPipe::GetReadSegment(const char *&segment, uint32_t &segmentLen)
388 {
389 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
391 if (mReadCursor == mReadLimit)
392 return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK;
394 segment = mReadCursor;
395 segmentLen = mReadLimit - mReadCursor;
396 return NS_OK;
397 }
399 void
400 nsPipe::AdvanceReadCursor(uint32_t bytesRead)
401 {
402 NS_ASSERTION(bytesRead, "don't call if no bytes read");
404 nsPipeEvents events;
405 {
406 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
408 LOG(("III advancing read cursor by %u\n", bytesRead));
409 NS_ASSERTION(bytesRead <= mBuffer.GetSegmentSize(), "read too much");
411 mReadCursor += bytesRead;
412 NS_ASSERTION(mReadCursor <= mReadLimit, "read cursor exceeds limit");
414 mInput.ReduceAvailable(bytesRead);
416 if (mReadCursor == mReadLimit) {
417 // we've reached the limit of how much we can read from this segment.
418 // if at the end of this segment, then we must discard this segment.
420 // if still writing in this segment then bail because we're not done
421 // with the segment and have to wait for now...
422 if (mWriteSegment == 0 && mWriteLimit > mWriteCursor) {
423 NS_ASSERTION(mReadLimit == mWriteCursor, "unexpected state");
424 return;
425 }
427 // shift write segment index (-1 indicates an empty buffer).
428 --mWriteSegment;
430 // done with this segment
431 mBuffer.DeleteFirstSegment();
432 LOG(("III deleting first segment\n"));
434 if (mWriteSegment == -1) {
435 // buffer is completely empty
436 mReadCursor = nullptr;
437 mReadLimit = nullptr;
438 mWriteCursor = nullptr;
439 mWriteLimit = nullptr;
440 }
441 else {
442 // advance read cursor and limit to next buffer segment
443 mReadCursor = mBuffer.GetSegment(0);
444 if (mWriteSegment == 0)
445 mReadLimit = mWriteCursor;
446 else
447 mReadLimit = mReadCursor + mBuffer.GetSegmentSize();
448 }
450 // we've free'd up a segment, so notify output stream that pipe has
451 // room for a new segment.
452 if (mOutput.OnOutputWritable(events))
453 mon.Notify();
454 }
455 }
456 }
458 nsresult
459 nsPipe::GetWriteSegment(char *&segment, uint32_t &segmentLen)
460 {
461 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
463 if (NS_FAILED(mStatus))
464 return mStatus;
466 // write cursor and limit may both be null indicating an empty buffer.
467 if (mWriteCursor == mWriteLimit) {
468 char *seg = mBuffer.AppendNewSegment();
469 // pipe is full
470 if (seg == nullptr)
471 return NS_BASE_STREAM_WOULD_BLOCK;
472 LOG(("OOO appended new segment\n"));
473 mWriteCursor = seg;
474 mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize();
475 ++mWriteSegment;
476 }
478 // make sure read cursor is initialized
479 if (mReadCursor == nullptr) {
480 NS_ASSERTION(mWriteSegment == 0, "unexpected null read cursor");
481 mReadCursor = mReadLimit = mWriteCursor;
482 }
484 // check to see if we can roll-back our read and write cursors to the
485 // beginning of the current/first segment. this is purely an optimization.
486 if (mReadCursor == mWriteCursor && mWriteSegment == 0) {
487 char *head = mBuffer.GetSegment(0);
488 LOG(("OOO rolling back write cursor %u bytes\n", mWriteCursor - head));
489 mWriteCursor = mReadCursor = mReadLimit = head;
490 }
492 segment = mWriteCursor;
493 segmentLen = mWriteLimit - mWriteCursor;
494 return NS_OK;
495 }
497 void
498 nsPipe::AdvanceWriteCursor(uint32_t bytesWritten)
499 {
500 NS_ASSERTION(bytesWritten, "don't call if no bytes written");
502 nsPipeEvents events;
503 {
504 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
506 LOG(("OOO advancing write cursor by %u\n", bytesWritten));
508 char *newWriteCursor = mWriteCursor + bytesWritten;
509 NS_ASSERTION(newWriteCursor <= mWriteLimit, "write cursor exceeds limit");
511 // update read limit if reading in the same segment
512 if (mWriteSegment == 0 && mReadLimit == mWriteCursor)
513 mReadLimit = newWriteCursor;
515 mWriteCursor = newWriteCursor;
517 // The only way mReadCursor == mWriteCursor is if:
518 //
519 // - mReadCursor is at the start of a segment (which, based on how
520 // nsSegmentedBuffer works, means that this segment is the "first"
521 // segment)
522 // - mWriteCursor points at the location past the end of the current
523 // write segment (so the current write filled the current write
524 // segment, so we've incremented mWriteCursor to point past the end
525 // of it)
526 // - the segment to which data has just been written is located
527 // exactly one segment's worth of bytes before the first segment
528 // where mReadCursor is located
529 //
530 // Consequently, the byte immediately after the end of the current
531 // write segment is the first byte of the first segment, so
532 // mReadCursor == mWriteCursor. (Another way to think about this is
533 // to consider the buffer architecture diagram above, but consider it
534 // with an arena allocator which allocates from the *end* of the
535 // arena to the *beginning* of the arena.)
536 NS_ASSERTION(mReadCursor != mWriteCursor ||
537 (mBuffer.GetSegment(0) == mReadCursor &&
538 mWriteCursor == mWriteLimit),
539 "read cursor is bad");
541 // update the writable flag on the output stream
542 if (mWriteCursor == mWriteLimit) {
543 if (mBuffer.GetSize() >= mBuffer.GetMaxSize())
544 mOutput.SetWritable(false);
545 }
547 // notify input stream that pipe now contains additional data
548 if (mInput.OnInputReadable(bytesWritten, events))
549 mon.Notify();
550 }
551 }
553 void
554 nsPipe::OnPipeException(nsresult reason, bool outputOnly)
555 {
556 LOG(("PPP nsPipe::OnPipeException [reason=%x output-only=%d]\n",
557 reason, outputOnly));
559 nsPipeEvents events;
560 {
561 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
563 // if we've already hit an exception, then ignore this one.
564 if (NS_FAILED(mStatus))
565 return;
567 mStatus = reason;
569 // an output-only exception applies to the input end if the pipe has
570 // zero bytes available.
571 if (outputOnly && !mInput.Available())
572 outputOnly = false;
574 if (!outputOnly)
575 if (mInput.OnInputException(reason, events))
576 mon.Notify();
578 if (mOutput.OnOutputException(reason, events))
579 mon.Notify();
580 }
581 }
583 //-----------------------------------------------------------------------------
584 // nsPipeEvents methods:
585 //-----------------------------------------------------------------------------
587 nsPipeEvents::~nsPipeEvents()
588 {
589 // dispatch any pending events
591 if (mInputCallback) {
592 mInputCallback->OnInputStreamReady(mInputStream);
593 mInputCallback = 0;
594 mInputStream = 0;
595 }
596 if (mOutputCallback) {
597 mOutputCallback->OnOutputStreamReady(mOutputStream);
598 mOutputCallback = 0;
599 mOutputStream = 0;
600 }
601 }
603 //-----------------------------------------------------------------------------
604 // nsPipeInputStream methods:
605 //-----------------------------------------------------------------------------
607 NS_IMPL_QUERY_INTERFACE(nsPipeInputStream,
608 nsIInputStream,
609 nsIAsyncInputStream,
610 nsISeekableStream,
611 nsISearchableInputStream,
612 nsIClassInfo)
614 NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream,
615 nsIInputStream,
616 nsIAsyncInputStream,
617 nsISeekableStream,
618 nsISearchableInputStream)
620 NS_IMPL_THREADSAFE_CI(nsPipeInputStream)
622 nsresult
623 nsPipeInputStream::Wait()
624 {
625 NS_ASSERTION(mBlocking, "wait on non-blocking pipe input stream");
627 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
629 while (NS_SUCCEEDED(mPipe->mStatus) && (mAvailable == 0)) {
630 LOG(("III pipe input: waiting for data\n"));
632 mBlocked = true;
633 mon.Wait();
634 mBlocked = false;
636 LOG(("III pipe input: woke up [pipe-status=%x available=%u]\n",
637 mPipe->mStatus, mAvailable));
638 }
640 return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
641 }
643 bool
644 nsPipeInputStream::OnInputReadable(uint32_t bytesWritten, nsPipeEvents &events)
645 {
646 bool result = false;
648 mAvailable += bytesWritten;
650 if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
651 events.NotifyInputReady(this, mCallback);
652 mCallback = 0;
653 mCallbackFlags = 0;
654 }
655 else if (mBlocked)
656 result = true;
658 return result;
659 }
661 bool
662 nsPipeInputStream::OnInputException(nsresult reason, nsPipeEvents &events)
663 {
664 LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n",
665 this, reason));
667 bool result = false;
669 NS_ASSERTION(NS_FAILED(reason), "huh? successful exception");
671 // force count of available bytes to zero.
672 mAvailable = 0;
674 if (mCallback) {
675 events.NotifyInputReady(this, mCallback);
676 mCallback = 0;
677 mCallbackFlags = 0;
678 }
679 else if (mBlocked)
680 result = true;
682 return result;
683 }
685 NS_IMETHODIMP_(MozExternalRefCountType)
686 nsPipeInputStream::AddRef(void)
687 {
688 ++mReaderRefCnt;
689 return mPipe->AddRef();
690 }
692 NS_IMETHODIMP_(MozExternalRefCountType)
693 nsPipeInputStream::Release(void)
694 {
695 if (--mReaderRefCnt == 0)
696 Close();
697 return mPipe->Release();
698 }
700 NS_IMETHODIMP
701 nsPipeInputStream::CloseWithStatus(nsresult reason)
702 {
703 LOG(("III CloseWithStatus [this=%x reason=%x]\n", this, reason));
705 if (NS_SUCCEEDED(reason))
706 reason = NS_BASE_STREAM_CLOSED;
708 mPipe->OnPipeException(reason);
709 return NS_OK;
710 }
712 NS_IMETHODIMP
713 nsPipeInputStream::Close()
714 {
715 return CloseWithStatus(NS_BASE_STREAM_CLOSED);
716 }
718 NS_IMETHODIMP
719 nsPipeInputStream::Available(uint64_t *result)
720 {
721 // nsPipeInputStream supports under 4GB stream only
722 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
724 // return error if pipe closed
725 if (!mAvailable && NS_FAILED(mPipe->mStatus))
726 return mPipe->mStatus;
728 *result = (uint64_t)mAvailable;
729 return NS_OK;
730 }
732 NS_IMETHODIMP
733 nsPipeInputStream::ReadSegments(nsWriteSegmentFun writer,
734 void *closure,
735 uint32_t count,
736 uint32_t *readCount)
737 {
738 LOG(("III ReadSegments [this=%x count=%u]\n", this, count));
740 nsresult rv = NS_OK;
742 const char *segment;
743 uint32_t segmentLen;
745 *readCount = 0;
746 while (count) {
747 rv = mPipe->GetReadSegment(segment, segmentLen);
748 if (NS_FAILED(rv)) {
749 // ignore this error if we've already read something.
750 if (*readCount > 0) {
751 rv = NS_OK;
752 break;
753 }
754 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
755 // pipe is empty
756 if (!mBlocking)
757 break;
758 // wait for some data to be written to the pipe
759 rv = Wait();
760 if (NS_SUCCEEDED(rv))
761 continue;
762 }
763 // ignore this error, just return.
764 if (rv == NS_BASE_STREAM_CLOSED) {
765 rv = NS_OK;
766 break;
767 }
768 mPipe->OnPipeException(rv);
769 break;
770 }
772 // read no more than count
773 if (segmentLen > count)
774 segmentLen = count;
776 uint32_t writeCount, originalLen = segmentLen;
777 while (segmentLen) {
778 writeCount = 0;
780 rv = writer(this, closure, segment, *readCount, segmentLen, &writeCount);
782 if (NS_FAILED(rv) || writeCount == 0) {
783 count = 0;
784 // any errors returned from the writer end here: do not
785 // propagate to the caller of ReadSegments.
786 rv = NS_OK;
787 break;
788 }
790 NS_ASSERTION(writeCount <= segmentLen, "wrote more than expected");
791 segment += writeCount;
792 segmentLen -= writeCount;
793 count -= writeCount;
794 *readCount += writeCount;
795 mLogicalOffset += writeCount;
796 }
798 if (segmentLen < originalLen)
799 mPipe->AdvanceReadCursor(originalLen - segmentLen);
800 }
802 return rv;
803 }
805 NS_IMETHODIMP
806 nsPipeInputStream::Read(char* toBuf, uint32_t bufLen, uint32_t *readCount)
807 {
808 return ReadSegments(NS_CopySegmentToBuffer, toBuf, bufLen, readCount);
809 }
811 NS_IMETHODIMP
812 nsPipeInputStream::IsNonBlocking(bool *aNonBlocking)
813 {
814 *aNonBlocking = !mBlocking;
815 return NS_OK;
816 }
818 NS_IMETHODIMP
819 nsPipeInputStream::AsyncWait(nsIInputStreamCallback *callback,
820 uint32_t flags,
821 uint32_t requestedCount,
822 nsIEventTarget *target)
823 {
824 LOG(("III AsyncWait [this=%x]\n", this));
826 nsPipeEvents pipeEvents;
827 {
828 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
830 // replace a pending callback
831 mCallback = 0;
832 mCallbackFlags = 0;
834 if (!callback)
835 return NS_OK;
837 nsCOMPtr<nsIInputStreamCallback> proxy;
838 if (target) {
839 proxy = NS_NewInputStreamReadyEvent(callback, target);
840 callback = proxy;
841 }
843 if (NS_FAILED(mPipe->mStatus) ||
844 (mAvailable && !(flags & WAIT_CLOSURE_ONLY))) {
845 // stream is already closed or readable; post event.
846 pipeEvents.NotifyInputReady(this, callback);
847 }
848 else {
849 // queue up callback object to be notified when data becomes available
850 mCallback = callback;
851 mCallbackFlags = flags;
852 }
853 }
854 return NS_OK;
855 }
857 NS_IMETHODIMP
858 nsPipeInputStream::Seek(int32_t whence, int64_t offset)
859 {
860 NS_NOTREACHED("nsPipeInputStream::Seek");
861 return NS_ERROR_NOT_IMPLEMENTED;
862 }
864 NS_IMETHODIMP
865 nsPipeInputStream::Tell(int64_t *offset)
866 {
867 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
869 // return error if pipe closed
870 if (!mAvailable && NS_FAILED(mPipe->mStatus))
871 return mPipe->mStatus;
873 *offset = mLogicalOffset;
874 return NS_OK;
875 }
877 NS_IMETHODIMP
878 nsPipeInputStream::SetEOF()
879 {
880 NS_NOTREACHED("nsPipeInputStream::SetEOF");
881 return NS_ERROR_NOT_IMPLEMENTED;
882 }
// Compares the first |i| characters of |s1| and |s2|, case-insensitively
// when a variable named |ignoreCase| in the expanding scope is true.
// Evaluates to 0 on a match. The arguments are parenthesized so that
// arbitrary expressions can be passed safely.
#define COMPARE(s1, s2, i)                                                   \
    (ignoreCase                                                              \
     ? nsCRT::strncasecmp((const char *)(s1), (const char *)(s2), (uint32_t)(i)) \
     : nsCRT::strncmp((const char *)(s1), (const char *)(s2), (uint32_t)(i)))
889 NS_IMETHODIMP
890 nsPipeInputStream::Search(const char *forString,
891 bool ignoreCase,
892 bool *found,
893 uint32_t *offsetSearchedTo)
894 {
895 LOG(("III Search [for=%s ic=%u]\n", forString, ignoreCase));
897 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
899 char *cursor1, *limit1;
900 uint32_t index = 0, offset = 0;
901 uint32_t strLen = strlen(forString);
903 mPipe->PeekSegment(0, cursor1, limit1);
904 if (cursor1 == limit1) {
905 *found = false;
906 *offsetSearchedTo = 0;
907 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
908 return NS_OK;
909 }
911 while (true) {
912 uint32_t i, len1 = limit1 - cursor1;
914 // check if the string is in the buffer segment
915 for (i = 0; i < len1 - strLen + 1; i++) {
916 if (COMPARE(&cursor1[i], forString, strLen) == 0) {
917 *found = true;
918 *offsetSearchedTo = offset + i;
919 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
920 return NS_OK;
921 }
922 }
924 // get the next segment
925 char *cursor2, *limit2;
926 uint32_t len2;
928 index++;
929 offset += len1;
931 mPipe->PeekSegment(index, cursor2, limit2);
932 if (cursor2 == limit2) {
933 *found = false;
934 *offsetSearchedTo = offset - strLen + 1;
935 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
936 return NS_OK;
937 }
938 len2 = limit2 - cursor2;
940 // check if the string is straddling the next buffer segment
941 uint32_t lim = XPCOM_MIN(strLen, len2 + 1);
942 for (i = 0; i < lim; ++i) {
943 uint32_t strPart1Len = strLen - i - 1;
944 uint32_t strPart2Len = strLen - strPart1Len;
945 const char* strPart2 = &forString[strLen - strPart2Len];
946 uint32_t bufSeg1Offset = len1 - strPart1Len;
947 if (COMPARE(&cursor1[bufSeg1Offset], forString, strPart1Len) == 0 &&
948 COMPARE(cursor2, strPart2, strPart2Len) == 0) {
949 *found = true;
950 *offsetSearchedTo = offset - strPart1Len;
951 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
952 return NS_OK;
953 }
954 }
956 // finally continue with the next buffer
957 cursor1 = cursor2;
958 limit1 = limit2;
959 }
961 NS_NOTREACHED("can't get here");
962 return NS_ERROR_UNEXPECTED; // keep compiler happy
963 }
965 //-----------------------------------------------------------------------------
966 // nsPipeOutputStream methods:
967 //-----------------------------------------------------------------------------
969 NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream,
970 nsIOutputStream,
971 nsIAsyncOutputStream,
972 nsIClassInfo)
974 NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream,
975 nsIOutputStream,
976 nsIAsyncOutputStream)
978 NS_IMPL_THREADSAFE_CI(nsPipeOutputStream)
980 nsresult
981 nsPipeOutputStream::Wait()
982 {
983 NS_ASSERTION(mBlocking, "wait on non-blocking pipe output stream");
985 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
987 if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) {
988 LOG(("OOO pipe output: waiting for space\n"));
989 mBlocked = true;
990 mon.Wait();
991 mBlocked = false;
992 LOG(("OOO pipe output: woke up [pipe-status=%x writable=%u]\n",
993 mPipe->mStatus, mWritable));
994 }
996 return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
997 }
999 bool
1000 nsPipeOutputStream::OnOutputWritable(nsPipeEvents &events)
1001 {
1002 bool result = false;
1004 mWritable = true;
1006 if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
1007 events.NotifyOutputReady(this, mCallback);
1008 mCallback = 0;
1009 mCallbackFlags = 0;
1010 }
1011 else if (mBlocked)
1012 result = true;
1014 return result;
1015 }
1017 bool
1018 nsPipeOutputStream::OnOutputException(nsresult reason, nsPipeEvents &events)
1019 {
1020 LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n",
1021 this, reason));
1023 bool result = false;
1025 NS_ASSERTION(NS_FAILED(reason), "huh? successful exception");
1026 mWritable = false;
1028 if (mCallback) {
1029 events.NotifyOutputReady(this, mCallback);
1030 mCallback = 0;
1031 mCallbackFlags = 0;
1032 }
1033 else if (mBlocked)
1034 result = true;
1036 return result;
1037 }
1040 NS_IMETHODIMP_(MozExternalRefCountType)
1041 nsPipeOutputStream::AddRef()
1042 {
1043 ++mWriterRefCnt;
1044 return mPipe->AddRef();
1045 }
1047 NS_IMETHODIMP_(MozExternalRefCountType)
1048 nsPipeOutputStream::Release()
1049 {
1050 if (--mWriterRefCnt == 0)
1051 Close();
1052 return mPipe->Release();
1053 }
1055 NS_IMETHODIMP
1056 nsPipeOutputStream::CloseWithStatus(nsresult reason)
1057 {
1058 LOG(("OOO CloseWithStatus [this=%x reason=%x]\n", this, reason));
1060 if (NS_SUCCEEDED(reason))
1061 reason = NS_BASE_STREAM_CLOSED;
1063 // input stream may remain open
1064 mPipe->OnPipeException(reason, true);
1065 return NS_OK;
1066 }
1068 NS_IMETHODIMP
1069 nsPipeOutputStream::Close()
1070 {
1071 return CloseWithStatus(NS_BASE_STREAM_CLOSED);
1072 }
1074 NS_IMETHODIMP
1075 nsPipeOutputStream::WriteSegments(nsReadSegmentFun reader,
1076 void* closure,
1077 uint32_t count,
1078 uint32_t *writeCount)
1079 {
1080 LOG(("OOO WriteSegments [this=%x count=%u]\n", this, count));
1082 nsresult rv = NS_OK;
1084 char *segment;
1085 uint32_t segmentLen;
1087 *writeCount = 0;
1088 while (count) {
1089 rv = mPipe->GetWriteSegment(segment, segmentLen);
1090 if (NS_FAILED(rv)) {
1091 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
1092 // pipe is full
1093 if (!mBlocking) {
1094 // ignore this error if we've already written something
1095 if (*writeCount > 0)
1096 rv = NS_OK;
1097 break;
1098 }
1099 // wait for the pipe to have an empty segment.
1100 rv = Wait();
1101 if (NS_SUCCEEDED(rv))
1102 continue;
1103 }
1104 mPipe->OnPipeException(rv);
1105 break;
1106 }
1108 // write no more than count
1109 if (segmentLen > count)
1110 segmentLen = count;
1112 uint32_t readCount, originalLen = segmentLen;
1113 while (segmentLen) {
1114 readCount = 0;
1116 rv = reader(this, closure, segment, *writeCount, segmentLen, &readCount);
1118 if (NS_FAILED(rv) || readCount == 0) {
1119 count = 0;
1120 // any errors returned from the reader end here: do not
1121 // propagate to the caller of WriteSegments.
1122 rv = NS_OK;
1123 break;
1124 }
1126 NS_ASSERTION(readCount <= segmentLen, "read more than expected");
1127 segment += readCount;
1128 segmentLen -= readCount;
1129 count -= readCount;
1130 *writeCount += readCount;
1131 mLogicalOffset += readCount;
1132 }
1134 if (segmentLen < originalLen)
1135 mPipe->AdvanceWriteCursor(originalLen - segmentLen);
1136 }
1138 return rv;
1139 }
1141 static NS_METHOD
1142 nsReadFromRawBuffer(nsIOutputStream* outStr,
1143 void* closure,
1144 char* toRawSegment,
1145 uint32_t offset,
1146 uint32_t count,
1147 uint32_t *readCount)
1148 {
1149 const char* fromBuf = (const char*)closure;
1150 memcpy(toRawSegment, &fromBuf[offset], count);
1151 *readCount = count;
1152 return NS_OK;
1153 }
1155 NS_IMETHODIMP
1156 nsPipeOutputStream::Write(const char* fromBuf,
1157 uint32_t bufLen,
1158 uint32_t *writeCount)
1159 {
1160 return WriteSegments(nsReadFromRawBuffer, (void*)fromBuf, bufLen, writeCount);
1161 }
1163 NS_IMETHODIMP
1164 nsPipeOutputStream::Flush(void)
1165 {
1166 // nothing to do
1167 return NS_OK;
1168 }
1170 static NS_METHOD
1171 nsReadFromInputStream(nsIOutputStream* outStr,
1172 void* closure,
1173 char* toRawSegment,
1174 uint32_t offset,
1175 uint32_t count,
1176 uint32_t *readCount)
1177 {
1178 nsIInputStream* fromStream = (nsIInputStream*)closure;
1179 return fromStream->Read(toRawSegment, count, readCount);
1180 }
1182 NS_IMETHODIMP
1183 nsPipeOutputStream::WriteFrom(nsIInputStream* fromStream,
1184 uint32_t count,
1185 uint32_t *writeCount)
1186 {
1187 return WriteSegments(nsReadFromInputStream, fromStream, count, writeCount);
1188 }
1190 NS_IMETHODIMP
1191 nsPipeOutputStream::IsNonBlocking(bool *aNonBlocking)
1192 {
1193 *aNonBlocking = !mBlocking;
1194 return NS_OK;
1195 }
1197 NS_IMETHODIMP
1198 nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback *callback,
1199 uint32_t flags,
1200 uint32_t requestedCount,
1201 nsIEventTarget *target)
1202 {
1203 LOG(("OOO AsyncWait [this=%x]\n", this));
1205 nsPipeEvents pipeEvents;
1206 {
1207 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1209 // replace a pending callback
1210 mCallback = 0;
1211 mCallbackFlags = 0;
1213 if (!callback)
1214 return NS_OK;
1216 nsCOMPtr<nsIOutputStreamCallback> proxy;
1217 if (target) {
1218 proxy = NS_NewOutputStreamReadyEvent(callback, target);
1219 callback = proxy;
1220 }
1222 if (NS_FAILED(mPipe->mStatus) ||
1223 (mWritable && !(flags & WAIT_CLOSURE_ONLY))) {
1224 // stream is already closed or writable; post event.
1225 pipeEvents.NotifyOutputReady(this, callback);
1226 }
1227 else {
1228 // queue up callback object to be notified when data becomes available
1229 mCallback = callback;
1230 mCallbackFlags = flags;
1231 }
1232 }
1233 return NS_OK;
1234 }
1236 ////////////////////////////////////////////////////////////////////////////////
1238 nsresult
1239 NS_NewPipe(nsIInputStream **pipeIn,
1240 nsIOutputStream **pipeOut,
1241 uint32_t segmentSize,
1242 uint32_t maxSize,
1243 bool nonBlockingInput,
1244 bool nonBlockingOutput)
1245 {
1246 if (segmentSize == 0)
1247 segmentSize = DEFAULT_SEGMENT_SIZE;
1249 // Handle maxSize of UINT32_MAX as a special case
1250 uint32_t segmentCount;
1251 if (maxSize == UINT32_MAX)
1252 segmentCount = UINT32_MAX;
1253 else
1254 segmentCount = maxSize / segmentSize;
1256 nsIAsyncInputStream *in;
1257 nsIAsyncOutputStream *out;
1258 nsresult rv = NS_NewPipe2(&in, &out, nonBlockingInput, nonBlockingOutput,
1259 segmentSize, segmentCount);
1260 if (NS_FAILED(rv)) return rv;
1262 *pipeIn = in;
1263 *pipeOut = out;
1264 return NS_OK;
1265 }
1267 nsresult
1268 NS_NewPipe2(nsIAsyncInputStream **pipeIn,
1269 nsIAsyncOutputStream **pipeOut,
1270 bool nonBlockingInput,
1271 bool nonBlockingOutput,
1272 uint32_t segmentSize,
1273 uint32_t segmentCount)
1274 {
1275 nsresult rv;
1277 nsPipe *pipe = new nsPipe();
1278 if (!pipe)
1279 return NS_ERROR_OUT_OF_MEMORY;
1281 rv = pipe->Init(nonBlockingInput,
1282 nonBlockingOutput,
1283 segmentSize,
1284 segmentCount);
1285 if (NS_FAILED(rv)) {
1286 NS_ADDREF(pipe);
1287 NS_RELEASE(pipe);
1288 return rv;
1289 }
1291 pipe->GetInputStream(pipeIn);
1292 pipe->GetOutputStream(pipeOut);
1293 return NS_OK;
1294 }
1296 nsresult
1297 nsPipeConstructor(nsISupports *outer, REFNSIID iid, void **result)
1298 {
1299 if (outer)
1300 return NS_ERROR_NO_AGGREGATION;
1301 nsPipe *pipe = new nsPipe();
1302 if (!pipe)
1303 return NS_ERROR_OUT_OF_MEMORY;
1304 NS_ADDREF(pipe);
1305 nsresult rv = pipe->QueryInterface(iid, result);
1306 NS_RELEASE(pipe);
1307 return rv;
1308 }
1310 ////////////////////////////////////////////////////////////////////////////////