|
1 /* This Source Code Form is subject to the terms of the Mozilla Public |
|
2 * License, v. 2.0. If a copy of the MPL was not distributed with this |
|
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
|
4 |
|
5 #include "mozilla/Attributes.h" |
|
6 #include "mozilla/ReentrantMonitor.h" |
|
7 #include "nsIPipe.h" |
|
8 #include "nsIEventTarget.h" |
|
9 #include "nsISeekableStream.h" |
|
10 #include "nsIProgrammingLanguage.h" |
|
11 #include "nsSegmentedBuffer.h" |
|
12 #include "nsStreamUtils.h" |
|
13 #include "nsCOMPtr.h" |
|
14 #include "nsCRT.h" |
|
15 #include "prlog.h" |
|
16 #include "nsIClassInfoImpl.h" |
|
17 #include "nsAlgorithm.h" |
|
18 #include "nsMemory.h" |
|
19 #include "nsIAsyncInputStream.h" |
|
20 #include "nsIAsyncOutputStream.h" |
|
21 |
|
22 using namespace mozilla; |
|
23 |
|
24 #ifdef LOG |
|
25 #undef LOG |
|
26 #endif |
|
27 #if defined(PR_LOGGING) |
|
28 // |
|
29 // set NSPR_LOG_MODULES=nsPipe:5 |
|
30 // |
|
31 static PRLogModuleInfo * |
|
32 GetPipeLog() |
|
33 { |
|
34 static PRLogModuleInfo *sLog; |
|
35 if (!sLog) |
|
36 sLog = PR_NewLogModule("nsPipe"); |
|
37 return sLog; |
|
38 } |
|
39 #define LOG(args) PR_LOG(GetPipeLog(), PR_LOG_DEBUG, args) |
|
40 #else |
|
41 #define LOG(args) |
|
42 #endif |
|
43 |
|
44 #define DEFAULT_SEGMENT_SIZE 4096 |
|
45 #define DEFAULT_SEGMENT_COUNT 16 |
|
46 |
|
47 class nsPipe; |
|
48 class nsPipeEvents; |
|
49 class nsPipeInputStream; |
|
50 class nsPipeOutputStream; |
|
51 |
|
52 //----------------------------------------------------------------------------- |
|
53 |
|
54 // this class is used to delay notifications until the end of a particular |
|
55 // scope. it helps avoid the complexity of issuing callbacks while inside |
|
56 // a critical section. |
|
57 class nsPipeEvents |
|
58 { |
|
59 public: |
|
60 nsPipeEvents() { } |
|
61 ~nsPipeEvents(); |
|
62 |
|
63 inline void NotifyInputReady(nsIAsyncInputStream *stream, |
|
64 nsIInputStreamCallback *callback) |
|
65 { |
|
66 NS_ASSERTION(!mInputCallback, "already have an input event"); |
|
67 mInputStream = stream; |
|
68 mInputCallback = callback; |
|
69 } |
|
70 |
|
71 inline void NotifyOutputReady(nsIAsyncOutputStream *stream, |
|
72 nsIOutputStreamCallback *callback) |
|
73 { |
|
74 NS_ASSERTION(!mOutputCallback, "already have an output event"); |
|
75 mOutputStream = stream; |
|
76 mOutputCallback = callback; |
|
77 } |
|
78 |
|
79 private: |
|
80 nsCOMPtr<nsIAsyncInputStream> mInputStream; |
|
81 nsCOMPtr<nsIInputStreamCallback> mInputCallback; |
|
82 nsCOMPtr<nsIAsyncOutputStream> mOutputStream; |
|
83 nsCOMPtr<nsIOutputStreamCallback> mOutputCallback; |
|
84 }; |
|
85 |
|
86 //----------------------------------------------------------------------------- |
|
87 |
|
88 // the input end of a pipe (allocated as a member of the pipe). |
|
89 class nsPipeInputStream : public nsIAsyncInputStream |
|
90 , public nsISeekableStream |
|
91 , public nsISearchableInputStream |
|
92 , public nsIClassInfo |
|
93 { |
|
94 public: |
|
95 // since this class will be allocated as a member of the pipe, we do not |
|
96 // need our own ref count. instead, we share the lifetime (the ref count) |
|
97 // of the entire pipe. this macro is just convenience since it does not |
|
98 // declare a mRefCount variable; however, don't let the name fool you... |
|
99 // we are not inheriting from nsPipe ;-) |
|
100 NS_DECL_ISUPPORTS_INHERITED |
|
101 |
|
102 NS_DECL_NSIINPUTSTREAM |
|
103 NS_DECL_NSIASYNCINPUTSTREAM |
|
104 NS_DECL_NSISEEKABLESTREAM |
|
105 NS_DECL_NSISEARCHABLEINPUTSTREAM |
|
106 NS_DECL_NSICLASSINFO |
|
107 |
|
108 nsPipeInputStream(nsPipe *pipe) |
|
109 : mPipe(pipe) |
|
110 , mReaderRefCnt(0) |
|
111 , mLogicalOffset(0) |
|
112 , mBlocking(true) |
|
113 , mBlocked(false) |
|
114 , mAvailable(0) |
|
115 , mCallbackFlags(0) |
|
116 { } |
|
117 |
|
118 nsresult Fill(); |
|
119 void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; } |
|
120 |
|
121 uint32_t Available() { return mAvailable; } |
|
122 void ReduceAvailable(uint32_t avail) { mAvailable -= avail; } |
|
123 |
|
124 // synchronously wait for the pipe to become readable. |
|
125 nsresult Wait(); |
|
126 |
|
127 // these functions return true to indicate that the pipe's monitor should |
|
128 // be notified, to wake up a blocked reader if any. |
|
129 bool OnInputReadable(uint32_t bytesWritten, nsPipeEvents &); |
|
130 bool OnInputException(nsresult, nsPipeEvents &); |
|
131 |
|
132 private: |
|
133 nsPipe *mPipe; |
|
134 |
|
135 // separate refcnt so that we know when to close the consumer |
|
136 mozilla::ThreadSafeAutoRefCnt mReaderRefCnt; |
|
137 int64_t mLogicalOffset; |
|
138 bool mBlocking; |
|
139 |
|
140 // these variables can only be accessed while inside the pipe's monitor |
|
141 bool mBlocked; |
|
142 uint32_t mAvailable; |
|
143 nsCOMPtr<nsIInputStreamCallback> mCallback; |
|
144 uint32_t mCallbackFlags; |
|
145 }; |
|
146 |
|
147 //----------------------------------------------------------------------------- |
|
148 |
|
149 // the output end of a pipe (allocated as a member of the pipe). |
|
150 class nsPipeOutputStream : public nsIAsyncOutputStream |
|
151 , public nsIClassInfo |
|
152 { |
|
153 public: |
|
154 // since this class will be allocated as a member of the pipe, we do not |
|
155 // need our own ref count. instead, we share the lifetime (the ref count) |
|
156 // of the entire pipe. this macro is just convenience since it does not |
|
157 // declare a mRefCount variable; however, don't let the name fool you... |
|
158 // we are not inheriting from nsPipe ;-) |
|
159 NS_DECL_ISUPPORTS_INHERITED |
|
160 |
|
161 NS_DECL_NSIOUTPUTSTREAM |
|
162 NS_DECL_NSIASYNCOUTPUTSTREAM |
|
163 NS_DECL_NSICLASSINFO |
|
164 |
|
165 nsPipeOutputStream(nsPipe *pipe) |
|
166 : mPipe(pipe) |
|
167 , mWriterRefCnt(0) |
|
168 , mLogicalOffset(0) |
|
169 , mBlocking(true) |
|
170 , mBlocked(false) |
|
171 , mWritable(true) |
|
172 , mCallbackFlags(0) |
|
173 { } |
|
174 |
|
175 void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; } |
|
176 void SetWritable(bool writable) { mWritable = writable; } |
|
177 |
|
178 // synchronously wait for the pipe to become writable. |
|
179 nsresult Wait(); |
|
180 |
|
181 // these functions return true to indicate that the pipe's monitor should |
|
182 // be notified, to wake up a blocked writer if any. |
|
183 bool OnOutputWritable(nsPipeEvents &); |
|
184 bool OnOutputException(nsresult, nsPipeEvents &); |
|
185 |
|
186 private: |
|
187 nsPipe *mPipe; |
|
188 |
|
189 // separate refcnt so that we know when to close the producer |
|
190 mozilla::ThreadSafeAutoRefCnt mWriterRefCnt; |
|
191 int64_t mLogicalOffset; |
|
192 bool mBlocking; |
|
193 |
|
194 // these variables can only be accessed while inside the pipe's monitor |
|
195 bool mBlocked; |
|
196 bool mWritable; |
|
197 nsCOMPtr<nsIOutputStreamCallback> mCallback; |
|
198 uint32_t mCallbackFlags; |
|
199 }; |
|
200 |
|
201 //----------------------------------------------------------------------------- |
|
202 |
|
203 class nsPipe MOZ_FINAL : public nsIPipe |
|
204 { |
|
205 public: |
|
206 friend class nsPipeInputStream; |
|
207 friend class nsPipeOutputStream; |
|
208 |
|
209 NS_DECL_THREADSAFE_ISUPPORTS |
|
210 NS_DECL_NSIPIPE |
|
211 |
|
212 // nsPipe methods: |
|
213 nsPipe(); |
|
214 |
|
215 private: |
|
216 ~nsPipe(); |
|
217 |
|
218 public: |
|
219 // |
|
220 // methods below may only be called while inside the pipe's monitor |
|
221 // |
|
222 |
|
223 void PeekSegment(uint32_t n, char *&cursor, char *&limit); |
|
224 |
|
225 // |
|
226 // methods below may be called while outside the pipe's monitor |
|
227 // |
|
228 |
|
229 nsresult GetReadSegment(const char *&segment, uint32_t &segmentLen); |
|
230 void AdvanceReadCursor(uint32_t count); |
|
231 |
|
232 nsresult GetWriteSegment(char *&segment, uint32_t &segmentLen); |
|
233 void AdvanceWriteCursor(uint32_t count); |
|
234 |
|
235 void OnPipeException(nsresult reason, bool outputOnly = false); |
|
236 |
|
237 protected: |
|
238 // We can't inherit from both nsIInputStream and nsIOutputStream |
|
239 // because they collide on their Close method. Consequently we nest their |
|
240 // implementations to avoid the extra object allocation. |
|
241 nsPipeInputStream mInput; |
|
242 nsPipeOutputStream mOutput; |
|
243 |
|
244 ReentrantMonitor mReentrantMonitor; |
|
245 nsSegmentedBuffer mBuffer; |
|
246 |
|
247 char* mReadCursor; |
|
248 char* mReadLimit; |
|
249 |
|
250 int32_t mWriteSegment; |
|
251 char* mWriteCursor; |
|
252 char* mWriteLimit; |
|
253 |
|
254 nsresult mStatus; |
|
255 bool mInited; |
|
256 }; |
|
257 |
|
258 // |
|
259 // NOTES on buffer architecture: |
|
260 // |
|
261 // +-----------------+ - - mBuffer.GetSegment(0) |
|
262 // | | |
|
263 // + - - - - - - - - + - - mReadCursor |
|
264 // |/////////////////| |
|
265 // |/////////////////| |
|
266 // |/////////////////| |
|
267 // |/////////////////| |
|
268 // +-----------------+ - - mReadLimit |
|
269 // | |
|
270 // +-----------------+ |
|
271 // |/////////////////| |
|
272 // |/////////////////| |
|
273 // |/////////////////| |
|
274 // |/////////////////| |
|
275 // |/////////////////| |
|
276 // |/////////////////| |
|
277 // +-----------------+ |
|
278 // | |
|
279 // +-----------------+ - - mBuffer.GetSegment(mWriteSegment) |
|
280 // |/////////////////| |
|
281 // |/////////////////| |
|
282 // |/////////////////| |
|
283 // + - - - - - - - - + - - mWriteCursor |
|
284 // | | |
|
285 // | | |
|
286 // +-----------------+ - - mWriteLimit |
|
287 // |
|
288 // (shaded region contains data) |
|
289 // |
|
290 // NOTE: on some systems (notably OS/2), the heap allocator uses an arena for |
|
291 // small allocations (e.g., 64 byte allocations). this means that buffers may |
|
292 // be allocated back-to-back. in the diagram above, for example, mReadLimit |
|
293 // would actually be pointing at the beginning of the next segment. when |
|
294 // making changes to this file, please keep this fact in mind. |
|
295 // |
|
296 |
|
297 //----------------------------------------------------------------------------- |
|
298 // nsPipe methods: |
|
299 //----------------------------------------------------------------------------- |
|
300 |
|
301 nsPipe::nsPipe() |
|
302 : mInput(MOZ_THIS_IN_INITIALIZER_LIST()) |
|
303 , mOutput(MOZ_THIS_IN_INITIALIZER_LIST()) |
|
304 , mReentrantMonitor("nsPipe.mReentrantMonitor") |
|
305 , mReadCursor(nullptr) |
|
306 , mReadLimit(nullptr) |
|
307 , mWriteSegment(-1) |
|
308 , mWriteCursor(nullptr) |
|
309 , mWriteLimit(nullptr) |
|
310 , mStatus(NS_OK) |
|
311 , mInited(false) |
|
312 { |
|
313 } |
|
314 |
|
315 nsPipe::~nsPipe() |
|
316 { |
|
317 } |
|
318 |
|
319 NS_IMPL_ISUPPORTS(nsPipe, nsIPipe) |
|
320 |
|
321 NS_IMETHODIMP |
|
322 nsPipe::Init(bool nonBlockingIn, |
|
323 bool nonBlockingOut, |
|
324 uint32_t segmentSize, |
|
325 uint32_t segmentCount) |
|
326 { |
|
327 mInited = true; |
|
328 |
|
329 if (segmentSize == 0) |
|
330 segmentSize = DEFAULT_SEGMENT_SIZE; |
|
331 if (segmentCount == 0) |
|
332 segmentCount = DEFAULT_SEGMENT_COUNT; |
|
333 |
|
334 // protect against overflow |
|
335 uint32_t maxCount = uint32_t(-1) / segmentSize; |
|
336 if (segmentCount > maxCount) |
|
337 segmentCount = maxCount; |
|
338 |
|
339 nsresult rv = mBuffer.Init(segmentSize, segmentSize * segmentCount); |
|
340 if (NS_FAILED(rv)) |
|
341 return rv; |
|
342 |
|
343 mInput.SetNonBlocking(nonBlockingIn); |
|
344 mOutput.SetNonBlocking(nonBlockingOut); |
|
345 return NS_OK; |
|
346 } |
|
347 |
|
348 NS_IMETHODIMP |
|
349 nsPipe::GetInputStream(nsIAsyncInputStream **aInputStream) |
|
350 { |
|
351 NS_ADDREF(*aInputStream = &mInput); |
|
352 return NS_OK; |
|
353 } |
|
354 |
|
355 NS_IMETHODIMP |
|
356 nsPipe::GetOutputStream(nsIAsyncOutputStream **aOutputStream) |
|
357 { |
|
358 if (NS_WARN_IF(!mInited)) |
|
359 return NS_ERROR_NOT_INITIALIZED; |
|
360 NS_ADDREF(*aOutputStream = &mOutput); |
|
361 return NS_OK; |
|
362 } |
|
363 |
|
364 void |
|
365 nsPipe::PeekSegment(uint32_t index, char *&cursor, char *&limit) |
|
366 { |
|
367 if (index == 0) { |
|
368 NS_ASSERTION(!mReadCursor || mBuffer.GetSegmentCount(), "unexpected state"); |
|
369 cursor = mReadCursor; |
|
370 limit = mReadLimit; |
|
371 } |
|
372 else { |
|
373 uint32_t numSegments = mBuffer.GetSegmentCount(); |
|
374 if (index >= numSegments) |
|
375 cursor = limit = nullptr; |
|
376 else { |
|
377 cursor = mBuffer.GetSegment(index); |
|
378 if (mWriteSegment == (int32_t) index) |
|
379 limit = mWriteCursor; |
|
380 else |
|
381 limit = cursor + mBuffer.GetSegmentSize(); |
|
382 } |
|
383 } |
|
384 } |
|
385 |
|
386 nsresult |
|
387 nsPipe::GetReadSegment(const char *&segment, uint32_t &segmentLen) |
|
388 { |
|
389 ReentrantMonitorAutoEnter mon(mReentrantMonitor); |
|
390 |
|
391 if (mReadCursor == mReadLimit) |
|
392 return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK; |
|
393 |
|
394 segment = mReadCursor; |
|
395 segmentLen = mReadLimit - mReadCursor; |
|
396 return NS_OK; |
|
397 } |
|
398 |
|
399 void |
|
400 nsPipe::AdvanceReadCursor(uint32_t bytesRead) |
|
401 { |
|
402 NS_ASSERTION(bytesRead, "don't call if no bytes read"); |
|
403 |
|
404 nsPipeEvents events; |
|
405 { |
|
406 ReentrantMonitorAutoEnter mon(mReentrantMonitor); |
|
407 |
|
408 LOG(("III advancing read cursor by %u\n", bytesRead)); |
|
409 NS_ASSERTION(bytesRead <= mBuffer.GetSegmentSize(), "read too much"); |
|
410 |
|
411 mReadCursor += bytesRead; |
|
412 NS_ASSERTION(mReadCursor <= mReadLimit, "read cursor exceeds limit"); |
|
413 |
|
414 mInput.ReduceAvailable(bytesRead); |
|
415 |
|
416 if (mReadCursor == mReadLimit) { |
|
417 // we've reached the limit of how much we can read from this segment. |
|
418 // if at the end of this segment, then we must discard this segment. |
|
419 |
|
420 // if still writing in this segment then bail because we're not done |
|
421 // with the segment and have to wait for now... |
|
422 if (mWriteSegment == 0 && mWriteLimit > mWriteCursor) { |
|
423 NS_ASSERTION(mReadLimit == mWriteCursor, "unexpected state"); |
|
424 return; |
|
425 } |
|
426 |
|
427 // shift write segment index (-1 indicates an empty buffer). |
|
428 --mWriteSegment; |
|
429 |
|
430 // done with this segment |
|
431 mBuffer.DeleteFirstSegment(); |
|
432 LOG(("III deleting first segment\n")); |
|
433 |
|
434 if (mWriteSegment == -1) { |
|
435 // buffer is completely empty |
|
436 mReadCursor = nullptr; |
|
437 mReadLimit = nullptr; |
|
438 mWriteCursor = nullptr; |
|
439 mWriteLimit = nullptr; |
|
440 } |
|
441 else { |
|
442 // advance read cursor and limit to next buffer segment |
|
443 mReadCursor = mBuffer.GetSegment(0); |
|
444 if (mWriteSegment == 0) |
|
445 mReadLimit = mWriteCursor; |
|
446 else |
|
447 mReadLimit = mReadCursor + mBuffer.GetSegmentSize(); |
|
448 } |
|
449 |
|
450 // we've free'd up a segment, so notify output stream that pipe has |
|
451 // room for a new segment. |
|
452 if (mOutput.OnOutputWritable(events)) |
|
453 mon.Notify(); |
|
454 } |
|
455 } |
|
456 } |
|
457 |
|
458 nsresult |
|
459 nsPipe::GetWriteSegment(char *&segment, uint32_t &segmentLen) |
|
460 { |
|
461 ReentrantMonitorAutoEnter mon(mReentrantMonitor); |
|
462 |
|
463 if (NS_FAILED(mStatus)) |
|
464 return mStatus; |
|
465 |
|
466 // write cursor and limit may both be null indicating an empty buffer. |
|
467 if (mWriteCursor == mWriteLimit) { |
|
468 char *seg = mBuffer.AppendNewSegment(); |
|
469 // pipe is full |
|
470 if (seg == nullptr) |
|
471 return NS_BASE_STREAM_WOULD_BLOCK; |
|
472 LOG(("OOO appended new segment\n")); |
|
473 mWriteCursor = seg; |
|
474 mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize(); |
|
475 ++mWriteSegment; |
|
476 } |
|
477 |
|
478 // make sure read cursor is initialized |
|
479 if (mReadCursor == nullptr) { |
|
480 NS_ASSERTION(mWriteSegment == 0, "unexpected null read cursor"); |
|
481 mReadCursor = mReadLimit = mWriteCursor; |
|
482 } |
|
483 |
|
484 // check to see if we can roll-back our read and write cursors to the |
|
485 // beginning of the current/first segment. this is purely an optimization. |
|
486 if (mReadCursor == mWriteCursor && mWriteSegment == 0) { |
|
487 char *head = mBuffer.GetSegment(0); |
|
488 LOG(("OOO rolling back write cursor %u bytes\n", mWriteCursor - head)); |
|
489 mWriteCursor = mReadCursor = mReadLimit = head; |
|
490 } |
|
491 |
|
492 segment = mWriteCursor; |
|
493 segmentLen = mWriteLimit - mWriteCursor; |
|
494 return NS_OK; |
|
495 } |
|
496 |
|
497 void |
|
498 nsPipe::AdvanceWriteCursor(uint32_t bytesWritten) |
|
499 { |
|
500 NS_ASSERTION(bytesWritten, "don't call if no bytes written"); |
|
501 |
|
502 nsPipeEvents events; |
|
503 { |
|
504 ReentrantMonitorAutoEnter mon(mReentrantMonitor); |
|
505 |
|
506 LOG(("OOO advancing write cursor by %u\n", bytesWritten)); |
|
507 |
|
508 char *newWriteCursor = mWriteCursor + bytesWritten; |
|
509 NS_ASSERTION(newWriteCursor <= mWriteLimit, "write cursor exceeds limit"); |
|
510 |
|
511 // update read limit if reading in the same segment |
|
512 if (mWriteSegment == 0 && mReadLimit == mWriteCursor) |
|
513 mReadLimit = newWriteCursor; |
|
514 |
|
515 mWriteCursor = newWriteCursor; |
|
516 |
|
517 // The only way mReadCursor == mWriteCursor is if: |
|
518 // |
|
519 // - mReadCursor is at the start of a segment (which, based on how |
|
520 // nsSegmentedBuffer works, means that this segment is the "first" |
|
521 // segment) |
|
522 // - mWriteCursor points at the location past the end of the current |
|
523 // write segment (so the current write filled the current write |
|
524 // segment, so we've incremented mWriteCursor to point past the end |
|
525 // of it) |
|
526 // - the segment to which data has just been written is located |
|
527 // exactly one segment's worth of bytes before the first segment |
|
528 // where mReadCursor is located |
|
529 // |
|
530 // Consequently, the byte immediately after the end of the current |
|
531 // write segment is the first byte of the first segment, so |
|
532 // mReadCursor == mWriteCursor. (Another way to think about this is |
|
533 // to consider the buffer architecture diagram above, but consider it |
|
534 // with an arena allocator which allocates from the *end* of the |
|
535 // arena to the *beginning* of the arena.) |
|
536 NS_ASSERTION(mReadCursor != mWriteCursor || |
|
537 (mBuffer.GetSegment(0) == mReadCursor && |
|
538 mWriteCursor == mWriteLimit), |
|
539 "read cursor is bad"); |
|
540 |
|
541 // update the writable flag on the output stream |
|
542 if (mWriteCursor == mWriteLimit) { |
|
543 if (mBuffer.GetSize() >= mBuffer.GetMaxSize()) |
|
544 mOutput.SetWritable(false); |
|
545 } |
|
546 |
|
547 // notify input stream that pipe now contains additional data |
|
548 if (mInput.OnInputReadable(bytesWritten, events)) |
|
549 mon.Notify(); |
|
550 } |
|
551 } |
|
552 |
|
553 void |
|
554 nsPipe::OnPipeException(nsresult reason, bool outputOnly) |
|
555 { |
|
556 LOG(("PPP nsPipe::OnPipeException [reason=%x output-only=%d]\n", |
|
557 reason, outputOnly)); |
|
558 |
|
559 nsPipeEvents events; |
|
560 { |
|
561 ReentrantMonitorAutoEnter mon(mReentrantMonitor); |
|
562 |
|
563 // if we've already hit an exception, then ignore this one. |
|
564 if (NS_FAILED(mStatus)) |
|
565 return; |
|
566 |
|
567 mStatus = reason; |
|
568 |
|
569 // an output-only exception applies to the input end if the pipe has |
|
570 // zero bytes available. |
|
571 if (outputOnly && !mInput.Available()) |
|
572 outputOnly = false; |
|
573 |
|
574 if (!outputOnly) |
|
575 if (mInput.OnInputException(reason, events)) |
|
576 mon.Notify(); |
|
577 |
|
578 if (mOutput.OnOutputException(reason, events)) |
|
579 mon.Notify(); |
|
580 } |
|
581 } |
|
582 |
|
583 //----------------------------------------------------------------------------- |
|
584 // nsPipeEvents methods: |
|
585 //----------------------------------------------------------------------------- |
|
586 |
|
587 nsPipeEvents::~nsPipeEvents() |
|
588 { |
|
589 // dispatch any pending events |
|
590 |
|
591 if (mInputCallback) { |
|
592 mInputCallback->OnInputStreamReady(mInputStream); |
|
593 mInputCallback = 0; |
|
594 mInputStream = 0; |
|
595 } |
|
596 if (mOutputCallback) { |
|
597 mOutputCallback->OnOutputStreamReady(mOutputStream); |
|
598 mOutputCallback = 0; |
|
599 mOutputStream = 0; |
|
600 } |
|
601 } |
|
602 |
|
603 //----------------------------------------------------------------------------- |
|
604 // nsPipeInputStream methods: |
|
605 //----------------------------------------------------------------------------- |
|
606 |
|
607 NS_IMPL_QUERY_INTERFACE(nsPipeInputStream, |
|
608 nsIInputStream, |
|
609 nsIAsyncInputStream, |
|
610 nsISeekableStream, |
|
611 nsISearchableInputStream, |
|
612 nsIClassInfo) |
|
613 |
|
614 NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream, |
|
615 nsIInputStream, |
|
616 nsIAsyncInputStream, |
|
617 nsISeekableStream, |
|
618 nsISearchableInputStream) |
|
619 |
|
620 NS_IMPL_THREADSAFE_CI(nsPipeInputStream) |
|
621 |
|
622 nsresult |
|
623 nsPipeInputStream::Wait() |
|
624 { |
|
625 NS_ASSERTION(mBlocking, "wait on non-blocking pipe input stream"); |
|
626 |
|
627 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
|
628 |
|
629 while (NS_SUCCEEDED(mPipe->mStatus) && (mAvailable == 0)) { |
|
630 LOG(("III pipe input: waiting for data\n")); |
|
631 |
|
632 mBlocked = true; |
|
633 mon.Wait(); |
|
634 mBlocked = false; |
|
635 |
|
636 LOG(("III pipe input: woke up [pipe-status=%x available=%u]\n", |
|
637 mPipe->mStatus, mAvailable)); |
|
638 } |
|
639 |
|
640 return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus; |
|
641 } |
|
642 |
|
643 bool |
|
644 nsPipeInputStream::OnInputReadable(uint32_t bytesWritten, nsPipeEvents &events) |
|
645 { |
|
646 bool result = false; |
|
647 |
|
648 mAvailable += bytesWritten; |
|
649 |
|
650 if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) { |
|
651 events.NotifyInputReady(this, mCallback); |
|
652 mCallback = 0; |
|
653 mCallbackFlags = 0; |
|
654 } |
|
655 else if (mBlocked) |
|
656 result = true; |
|
657 |
|
658 return result; |
|
659 } |
|
660 |
|
661 bool |
|
662 nsPipeInputStream::OnInputException(nsresult reason, nsPipeEvents &events) |
|
663 { |
|
664 LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n", |
|
665 this, reason)); |
|
666 |
|
667 bool result = false; |
|
668 |
|
669 NS_ASSERTION(NS_FAILED(reason), "huh? successful exception"); |
|
670 |
|
671 // force count of available bytes to zero. |
|
672 mAvailable = 0; |
|
673 |
|
674 if (mCallback) { |
|
675 events.NotifyInputReady(this, mCallback); |
|
676 mCallback = 0; |
|
677 mCallbackFlags = 0; |
|
678 } |
|
679 else if (mBlocked) |
|
680 result = true; |
|
681 |
|
682 return result; |
|
683 } |
|
684 |
|
685 NS_IMETHODIMP_(MozExternalRefCountType) |
|
686 nsPipeInputStream::AddRef(void) |
|
687 { |
|
688 ++mReaderRefCnt; |
|
689 return mPipe->AddRef(); |
|
690 } |
|
691 |
|
692 NS_IMETHODIMP_(MozExternalRefCountType) |
|
693 nsPipeInputStream::Release(void) |
|
694 { |
|
695 if (--mReaderRefCnt == 0) |
|
696 Close(); |
|
697 return mPipe->Release(); |
|
698 } |
|
699 |
|
700 NS_IMETHODIMP |
|
701 nsPipeInputStream::CloseWithStatus(nsresult reason) |
|
702 { |
|
703 LOG(("III CloseWithStatus [this=%x reason=%x]\n", this, reason)); |
|
704 |
|
705 if (NS_SUCCEEDED(reason)) |
|
706 reason = NS_BASE_STREAM_CLOSED; |
|
707 |
|
708 mPipe->OnPipeException(reason); |
|
709 return NS_OK; |
|
710 } |
|
711 |
|
712 NS_IMETHODIMP |
|
713 nsPipeInputStream::Close() |
|
714 { |
|
715 return CloseWithStatus(NS_BASE_STREAM_CLOSED); |
|
716 } |
|
717 |
|
718 NS_IMETHODIMP |
|
719 nsPipeInputStream::Available(uint64_t *result) |
|
720 { |
|
721 // nsPipeInputStream supports under 4GB stream only |
|
722 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
|
723 |
|
724 // return error if pipe closed |
|
725 if (!mAvailable && NS_FAILED(mPipe->mStatus)) |
|
726 return mPipe->mStatus; |
|
727 |
|
728 *result = (uint64_t)mAvailable; |
|
729 return NS_OK; |
|
730 } |
|
731 |
|
732 NS_IMETHODIMP |
|
733 nsPipeInputStream::ReadSegments(nsWriteSegmentFun writer, |
|
734 void *closure, |
|
735 uint32_t count, |
|
736 uint32_t *readCount) |
|
737 { |
|
738 LOG(("III ReadSegments [this=%x count=%u]\n", this, count)); |
|
739 |
|
740 nsresult rv = NS_OK; |
|
741 |
|
742 const char *segment; |
|
743 uint32_t segmentLen; |
|
744 |
|
745 *readCount = 0; |
|
746 while (count) { |
|
747 rv = mPipe->GetReadSegment(segment, segmentLen); |
|
748 if (NS_FAILED(rv)) { |
|
749 // ignore this error if we've already read something. |
|
750 if (*readCount > 0) { |
|
751 rv = NS_OK; |
|
752 break; |
|
753 } |
|
754 if (rv == NS_BASE_STREAM_WOULD_BLOCK) { |
|
755 // pipe is empty |
|
756 if (!mBlocking) |
|
757 break; |
|
758 // wait for some data to be written to the pipe |
|
759 rv = Wait(); |
|
760 if (NS_SUCCEEDED(rv)) |
|
761 continue; |
|
762 } |
|
763 // ignore this error, just return. |
|
764 if (rv == NS_BASE_STREAM_CLOSED) { |
|
765 rv = NS_OK; |
|
766 break; |
|
767 } |
|
768 mPipe->OnPipeException(rv); |
|
769 break; |
|
770 } |
|
771 |
|
772 // read no more than count |
|
773 if (segmentLen > count) |
|
774 segmentLen = count; |
|
775 |
|
776 uint32_t writeCount, originalLen = segmentLen; |
|
777 while (segmentLen) { |
|
778 writeCount = 0; |
|
779 |
|
780 rv = writer(this, closure, segment, *readCount, segmentLen, &writeCount); |
|
781 |
|
782 if (NS_FAILED(rv) || writeCount == 0) { |
|
783 count = 0; |
|
784 // any errors returned from the writer end here: do not |
|
785 // propagate to the caller of ReadSegments. |
|
786 rv = NS_OK; |
|
787 break; |
|
788 } |
|
789 |
|
790 NS_ASSERTION(writeCount <= segmentLen, "wrote more than expected"); |
|
791 segment += writeCount; |
|
792 segmentLen -= writeCount; |
|
793 count -= writeCount; |
|
794 *readCount += writeCount; |
|
795 mLogicalOffset += writeCount; |
|
796 } |
|
797 |
|
798 if (segmentLen < originalLen) |
|
799 mPipe->AdvanceReadCursor(originalLen - segmentLen); |
|
800 } |
|
801 |
|
802 return rv; |
|
803 } |
|
804 |
|
805 NS_IMETHODIMP |
|
806 nsPipeInputStream::Read(char* toBuf, uint32_t bufLen, uint32_t *readCount) |
|
807 { |
|
808 return ReadSegments(NS_CopySegmentToBuffer, toBuf, bufLen, readCount); |
|
809 } |
|
810 |
|
811 NS_IMETHODIMP |
|
812 nsPipeInputStream::IsNonBlocking(bool *aNonBlocking) |
|
813 { |
|
814 *aNonBlocking = !mBlocking; |
|
815 return NS_OK; |
|
816 } |
|
817 |
|
818 NS_IMETHODIMP |
|
819 nsPipeInputStream::AsyncWait(nsIInputStreamCallback *callback, |
|
820 uint32_t flags, |
|
821 uint32_t requestedCount, |
|
822 nsIEventTarget *target) |
|
823 { |
|
824 LOG(("III AsyncWait [this=%x]\n", this)); |
|
825 |
|
826 nsPipeEvents pipeEvents; |
|
827 { |
|
828 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
|
829 |
|
830 // replace a pending callback |
|
831 mCallback = 0; |
|
832 mCallbackFlags = 0; |
|
833 |
|
834 if (!callback) |
|
835 return NS_OK; |
|
836 |
|
837 nsCOMPtr<nsIInputStreamCallback> proxy; |
|
838 if (target) { |
|
839 proxy = NS_NewInputStreamReadyEvent(callback, target); |
|
840 callback = proxy; |
|
841 } |
|
842 |
|
843 if (NS_FAILED(mPipe->mStatus) || |
|
844 (mAvailable && !(flags & WAIT_CLOSURE_ONLY))) { |
|
845 // stream is already closed or readable; post event. |
|
846 pipeEvents.NotifyInputReady(this, callback); |
|
847 } |
|
848 else { |
|
849 // queue up callback object to be notified when data becomes available |
|
850 mCallback = callback; |
|
851 mCallbackFlags = flags; |
|
852 } |
|
853 } |
|
854 return NS_OK; |
|
855 } |
|
856 |
|
857 NS_IMETHODIMP |
|
858 nsPipeInputStream::Seek(int32_t whence, int64_t offset) |
|
859 { |
|
860 NS_NOTREACHED("nsPipeInputStream::Seek"); |
|
861 return NS_ERROR_NOT_IMPLEMENTED; |
|
862 } |
|
863 |
|
864 NS_IMETHODIMP |
|
865 nsPipeInputStream::Tell(int64_t *offset) |
|
866 { |
|
867 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
|
868 |
|
869 // return error if pipe closed |
|
870 if (!mAvailable && NS_FAILED(mPipe->mStatus)) |
|
871 return mPipe->mStatus; |
|
872 |
|
873 *offset = mLogicalOffset; |
|
874 return NS_OK; |
|
875 } |
|
876 |
|
877 NS_IMETHODIMP |
|
878 nsPipeInputStream::SetEOF() |
|
879 { |
|
880 NS_NOTREACHED("nsPipeInputStream::SetEOF"); |
|
881 return NS_ERROR_NOT_IMPLEMENTED; |
|
882 } |
|
883 |
|
884 #define COMPARE(s1, s2, i) \ |
|
885 (ignoreCase \ |
|
886 ? nsCRT::strncasecmp((const char *)s1, (const char *)s2, (uint32_t)i) \ |
|
887 : nsCRT::strncmp((const char *)s1, (const char *)s2, (uint32_t)i)) |
|
888 |
|
889 NS_IMETHODIMP |
|
890 nsPipeInputStream::Search(const char *forString, |
|
891 bool ignoreCase, |
|
892 bool *found, |
|
893 uint32_t *offsetSearchedTo) |
|
894 { |
|
895 LOG(("III Search [for=%s ic=%u]\n", forString, ignoreCase)); |
|
896 |
|
897 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
|
898 |
|
899 char *cursor1, *limit1; |
|
900 uint32_t index = 0, offset = 0; |
|
901 uint32_t strLen = strlen(forString); |
|
902 |
|
903 mPipe->PeekSegment(0, cursor1, limit1); |
|
904 if (cursor1 == limit1) { |
|
905 *found = false; |
|
906 *offsetSearchedTo = 0; |
|
907 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); |
|
908 return NS_OK; |
|
909 } |
|
910 |
|
911 while (true) { |
|
912 uint32_t i, len1 = limit1 - cursor1; |
|
913 |
|
914 // check if the string is in the buffer segment |
|
915 for (i = 0; i < len1 - strLen + 1; i++) { |
|
916 if (COMPARE(&cursor1[i], forString, strLen) == 0) { |
|
917 *found = true; |
|
918 *offsetSearchedTo = offset + i; |
|
919 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); |
|
920 return NS_OK; |
|
921 } |
|
922 } |
|
923 |
|
924 // get the next segment |
|
925 char *cursor2, *limit2; |
|
926 uint32_t len2; |
|
927 |
|
928 index++; |
|
929 offset += len1; |
|
930 |
|
931 mPipe->PeekSegment(index, cursor2, limit2); |
|
932 if (cursor2 == limit2) { |
|
933 *found = false; |
|
934 *offsetSearchedTo = offset - strLen + 1; |
|
935 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); |
|
936 return NS_OK; |
|
937 } |
|
938 len2 = limit2 - cursor2; |
|
939 |
|
940 // check if the string is straddling the next buffer segment |
|
941 uint32_t lim = XPCOM_MIN(strLen, len2 + 1); |
|
942 for (i = 0; i < lim; ++i) { |
|
943 uint32_t strPart1Len = strLen - i - 1; |
|
944 uint32_t strPart2Len = strLen - strPart1Len; |
|
945 const char* strPart2 = &forString[strLen - strPart2Len]; |
|
946 uint32_t bufSeg1Offset = len1 - strPart1Len; |
|
947 if (COMPARE(&cursor1[bufSeg1Offset], forString, strPart1Len) == 0 && |
|
948 COMPARE(cursor2, strPart2, strPart2Len) == 0) { |
|
949 *found = true; |
|
950 *offsetSearchedTo = offset - strPart1Len; |
|
951 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); |
|
952 return NS_OK; |
|
953 } |
|
954 } |
|
955 |
|
956 // finally continue with the next buffer |
|
957 cursor1 = cursor2; |
|
958 limit1 = limit2; |
|
959 } |
|
960 |
|
961 NS_NOTREACHED("can't get here"); |
|
962 return NS_ERROR_UNEXPECTED; // keep compiler happy |
|
963 } |
|
964 |
|
965 //----------------------------------------------------------------------------- |
|
966 // nsPipeOutputStream methods: |
|
967 //----------------------------------------------------------------------------- |
|
968 |
|
969 NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream, |
|
970 nsIOutputStream, |
|
971 nsIAsyncOutputStream, |
|
972 nsIClassInfo) |
|
973 |
|
974 NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream, |
|
975 nsIOutputStream, |
|
976 nsIAsyncOutputStream) |
|
977 |
|
978 NS_IMPL_THREADSAFE_CI(nsPipeOutputStream) |
|
979 |
|
980 nsresult |
|
981 nsPipeOutputStream::Wait() |
|
982 { |
|
983 NS_ASSERTION(mBlocking, "wait on non-blocking pipe output stream"); |
|
984 |
|
985 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
|
986 |
|
987 if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) { |
|
988 LOG(("OOO pipe output: waiting for space\n")); |
|
989 mBlocked = true; |
|
990 mon.Wait(); |
|
991 mBlocked = false; |
|
992 LOG(("OOO pipe output: woke up [pipe-status=%x writable=%u]\n", |
|
993 mPipe->mStatus, mWritable)); |
|
994 } |
|
995 |
|
996 return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus; |
|
997 } |
|
998 |
|
999 bool |
|
1000 nsPipeOutputStream::OnOutputWritable(nsPipeEvents &events) |
|
1001 { |
|
1002 bool result = false; |
|
1003 |
|
1004 mWritable = true; |
|
1005 |
|
1006 if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) { |
|
1007 events.NotifyOutputReady(this, mCallback); |
|
1008 mCallback = 0; |
|
1009 mCallbackFlags = 0; |
|
1010 } |
|
1011 else if (mBlocked) |
|
1012 result = true; |
|
1013 |
|
1014 return result; |
|
1015 } |
|
1016 |
|
1017 bool |
|
1018 nsPipeOutputStream::OnOutputException(nsresult reason, nsPipeEvents &events) |
|
1019 { |
|
1020 LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n", |
|
1021 this, reason)); |
|
1022 |
|
1023 bool result = false; |
|
1024 |
|
1025 NS_ASSERTION(NS_FAILED(reason), "huh? successful exception"); |
|
1026 mWritable = false; |
|
1027 |
|
1028 if (mCallback) { |
|
1029 events.NotifyOutputReady(this, mCallback); |
|
1030 mCallback = 0; |
|
1031 mCallbackFlags = 0; |
|
1032 } |
|
1033 else if (mBlocked) |
|
1034 result = true; |
|
1035 |
|
1036 return result; |
|
1037 } |
|
1038 |
|
1039 |
|
1040 NS_IMETHODIMP_(MozExternalRefCountType) |
|
1041 nsPipeOutputStream::AddRef() |
|
1042 { |
|
1043 ++mWriterRefCnt; |
|
1044 return mPipe->AddRef(); |
|
1045 } |
|
1046 |
|
1047 NS_IMETHODIMP_(MozExternalRefCountType) |
|
1048 nsPipeOutputStream::Release() |
|
1049 { |
|
1050 if (--mWriterRefCnt == 0) |
|
1051 Close(); |
|
1052 return mPipe->Release(); |
|
1053 } |
|
1054 |
|
1055 NS_IMETHODIMP |
|
1056 nsPipeOutputStream::CloseWithStatus(nsresult reason) |
|
1057 { |
|
1058 LOG(("OOO CloseWithStatus [this=%x reason=%x]\n", this, reason)); |
|
1059 |
|
1060 if (NS_SUCCEEDED(reason)) |
|
1061 reason = NS_BASE_STREAM_CLOSED; |
|
1062 |
|
1063 // input stream may remain open |
|
1064 mPipe->OnPipeException(reason, true); |
|
1065 return NS_OK; |
|
1066 } |
|
1067 |
|
1068 NS_IMETHODIMP |
|
1069 nsPipeOutputStream::Close() |
|
1070 { |
|
1071 return CloseWithStatus(NS_BASE_STREAM_CLOSED); |
|
1072 } |
|
1073 |
|
1074 NS_IMETHODIMP |
|
1075 nsPipeOutputStream::WriteSegments(nsReadSegmentFun reader, |
|
1076 void* closure, |
|
1077 uint32_t count, |
|
1078 uint32_t *writeCount) |
|
1079 { |
|
1080 LOG(("OOO WriteSegments [this=%x count=%u]\n", this, count)); |
|
1081 |
|
1082 nsresult rv = NS_OK; |
|
1083 |
|
1084 char *segment; |
|
1085 uint32_t segmentLen; |
|
1086 |
|
1087 *writeCount = 0; |
|
1088 while (count) { |
|
1089 rv = mPipe->GetWriteSegment(segment, segmentLen); |
|
1090 if (NS_FAILED(rv)) { |
|
1091 if (rv == NS_BASE_STREAM_WOULD_BLOCK) { |
|
1092 // pipe is full |
|
1093 if (!mBlocking) { |
|
1094 // ignore this error if we've already written something |
|
1095 if (*writeCount > 0) |
|
1096 rv = NS_OK; |
|
1097 break; |
|
1098 } |
|
1099 // wait for the pipe to have an empty segment. |
|
1100 rv = Wait(); |
|
1101 if (NS_SUCCEEDED(rv)) |
|
1102 continue; |
|
1103 } |
|
1104 mPipe->OnPipeException(rv); |
|
1105 break; |
|
1106 } |
|
1107 |
|
1108 // write no more than count |
|
1109 if (segmentLen > count) |
|
1110 segmentLen = count; |
|
1111 |
|
1112 uint32_t readCount, originalLen = segmentLen; |
|
1113 while (segmentLen) { |
|
1114 readCount = 0; |
|
1115 |
|
1116 rv = reader(this, closure, segment, *writeCount, segmentLen, &readCount); |
|
1117 |
|
1118 if (NS_FAILED(rv) || readCount == 0) { |
|
1119 count = 0; |
|
1120 // any errors returned from the reader end here: do not |
|
1121 // propagate to the caller of WriteSegments. |
|
1122 rv = NS_OK; |
|
1123 break; |
|
1124 } |
|
1125 |
|
1126 NS_ASSERTION(readCount <= segmentLen, "read more than expected"); |
|
1127 segment += readCount; |
|
1128 segmentLen -= readCount; |
|
1129 count -= readCount; |
|
1130 *writeCount += readCount; |
|
1131 mLogicalOffset += readCount; |
|
1132 } |
|
1133 |
|
1134 if (segmentLen < originalLen) |
|
1135 mPipe->AdvanceWriteCursor(originalLen - segmentLen); |
|
1136 } |
|
1137 |
|
1138 return rv; |
|
1139 } |
|
1140 |
|
1141 static NS_METHOD |
|
1142 nsReadFromRawBuffer(nsIOutputStream* outStr, |
|
1143 void* closure, |
|
1144 char* toRawSegment, |
|
1145 uint32_t offset, |
|
1146 uint32_t count, |
|
1147 uint32_t *readCount) |
|
1148 { |
|
1149 const char* fromBuf = (const char*)closure; |
|
1150 memcpy(toRawSegment, &fromBuf[offset], count); |
|
1151 *readCount = count; |
|
1152 return NS_OK; |
|
1153 } |
|
1154 |
|
1155 NS_IMETHODIMP |
|
1156 nsPipeOutputStream::Write(const char* fromBuf, |
|
1157 uint32_t bufLen, |
|
1158 uint32_t *writeCount) |
|
1159 { |
|
1160 return WriteSegments(nsReadFromRawBuffer, (void*)fromBuf, bufLen, writeCount); |
|
1161 } |
|
1162 |
|
1163 NS_IMETHODIMP |
|
1164 nsPipeOutputStream::Flush(void) |
|
1165 { |
|
1166 // nothing to do |
|
1167 return NS_OK; |
|
1168 } |
|
1169 |
|
1170 static NS_METHOD |
|
1171 nsReadFromInputStream(nsIOutputStream* outStr, |
|
1172 void* closure, |
|
1173 char* toRawSegment, |
|
1174 uint32_t offset, |
|
1175 uint32_t count, |
|
1176 uint32_t *readCount) |
|
1177 { |
|
1178 nsIInputStream* fromStream = (nsIInputStream*)closure; |
|
1179 return fromStream->Read(toRawSegment, count, readCount); |
|
1180 } |
|
1181 |
|
1182 NS_IMETHODIMP |
|
1183 nsPipeOutputStream::WriteFrom(nsIInputStream* fromStream, |
|
1184 uint32_t count, |
|
1185 uint32_t *writeCount) |
|
1186 { |
|
1187 return WriteSegments(nsReadFromInputStream, fromStream, count, writeCount); |
|
1188 } |
|
1189 |
|
1190 NS_IMETHODIMP |
|
1191 nsPipeOutputStream::IsNonBlocking(bool *aNonBlocking) |
|
1192 { |
|
1193 *aNonBlocking = !mBlocking; |
|
1194 return NS_OK; |
|
1195 } |
|
1196 |
|
1197 NS_IMETHODIMP |
|
1198 nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback *callback, |
|
1199 uint32_t flags, |
|
1200 uint32_t requestedCount, |
|
1201 nsIEventTarget *target) |
|
1202 { |
|
1203 LOG(("OOO AsyncWait [this=%x]\n", this)); |
|
1204 |
|
1205 nsPipeEvents pipeEvents; |
|
1206 { |
|
1207 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
|
1208 |
|
1209 // replace a pending callback |
|
1210 mCallback = 0; |
|
1211 mCallbackFlags = 0; |
|
1212 |
|
1213 if (!callback) |
|
1214 return NS_OK; |
|
1215 |
|
1216 nsCOMPtr<nsIOutputStreamCallback> proxy; |
|
1217 if (target) { |
|
1218 proxy = NS_NewOutputStreamReadyEvent(callback, target); |
|
1219 callback = proxy; |
|
1220 } |
|
1221 |
|
1222 if (NS_FAILED(mPipe->mStatus) || |
|
1223 (mWritable && !(flags & WAIT_CLOSURE_ONLY))) { |
|
1224 // stream is already closed or writable; post event. |
|
1225 pipeEvents.NotifyOutputReady(this, callback); |
|
1226 } |
|
1227 else { |
|
1228 // queue up callback object to be notified when data becomes available |
|
1229 mCallback = callback; |
|
1230 mCallbackFlags = flags; |
|
1231 } |
|
1232 } |
|
1233 return NS_OK; |
|
1234 } |
|
1235 |
|
1236 //////////////////////////////////////////////////////////////////////////////// |
|
1237 |
|
1238 nsresult |
|
1239 NS_NewPipe(nsIInputStream **pipeIn, |
|
1240 nsIOutputStream **pipeOut, |
|
1241 uint32_t segmentSize, |
|
1242 uint32_t maxSize, |
|
1243 bool nonBlockingInput, |
|
1244 bool nonBlockingOutput) |
|
1245 { |
|
1246 if (segmentSize == 0) |
|
1247 segmentSize = DEFAULT_SEGMENT_SIZE; |
|
1248 |
|
1249 // Handle maxSize of UINT32_MAX as a special case |
|
1250 uint32_t segmentCount; |
|
1251 if (maxSize == UINT32_MAX) |
|
1252 segmentCount = UINT32_MAX; |
|
1253 else |
|
1254 segmentCount = maxSize / segmentSize; |
|
1255 |
|
1256 nsIAsyncInputStream *in; |
|
1257 nsIAsyncOutputStream *out; |
|
1258 nsresult rv = NS_NewPipe2(&in, &out, nonBlockingInput, nonBlockingOutput, |
|
1259 segmentSize, segmentCount); |
|
1260 if (NS_FAILED(rv)) return rv; |
|
1261 |
|
1262 *pipeIn = in; |
|
1263 *pipeOut = out; |
|
1264 return NS_OK; |
|
1265 } |
|
1266 |
|
1267 nsresult |
|
1268 NS_NewPipe2(nsIAsyncInputStream **pipeIn, |
|
1269 nsIAsyncOutputStream **pipeOut, |
|
1270 bool nonBlockingInput, |
|
1271 bool nonBlockingOutput, |
|
1272 uint32_t segmentSize, |
|
1273 uint32_t segmentCount) |
|
1274 { |
|
1275 nsresult rv; |
|
1276 |
|
1277 nsPipe *pipe = new nsPipe(); |
|
1278 if (!pipe) |
|
1279 return NS_ERROR_OUT_OF_MEMORY; |
|
1280 |
|
1281 rv = pipe->Init(nonBlockingInput, |
|
1282 nonBlockingOutput, |
|
1283 segmentSize, |
|
1284 segmentCount); |
|
1285 if (NS_FAILED(rv)) { |
|
1286 NS_ADDREF(pipe); |
|
1287 NS_RELEASE(pipe); |
|
1288 return rv; |
|
1289 } |
|
1290 |
|
1291 pipe->GetInputStream(pipeIn); |
|
1292 pipe->GetOutputStream(pipeOut); |
|
1293 return NS_OK; |
|
1294 } |
|
1295 |
|
1296 nsresult |
|
1297 nsPipeConstructor(nsISupports *outer, REFNSIID iid, void **result) |
|
1298 { |
|
1299 if (outer) |
|
1300 return NS_ERROR_NO_AGGREGATION; |
|
1301 nsPipe *pipe = new nsPipe(); |
|
1302 if (!pipe) |
|
1303 return NS_ERROR_OUT_OF_MEMORY; |
|
1304 NS_ADDREF(pipe); |
|
1305 nsresult rv = pipe->QueryInterface(iid, result); |
|
1306 NS_RELEASE(pipe); |
|
1307 return rv; |
|
1308 } |
|
1309 |
|
1310 //////////////////////////////////////////////////////////////////////////////// |