Wed, 31 Dec 2014 06:09:35 +0100
Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purposes.
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 /**
7 * The multiplex stream concatenates a list of input streams into a single
8 * stream.
9 */
11 #include "mozilla/Attributes.h"
12 #include "mozilla/MathAlgorithms.h"
14 #include "base/basictypes.h"
16 #include "nsMultiplexInputStream.h"
17 #include "nsIMultiplexInputStream.h"
18 #include "nsISeekableStream.h"
19 #include "nsCOMPtr.h"
20 #include "nsCOMArray.h"
21 #include "nsIClassInfoImpl.h"
22 #include "nsIIPCSerializableInputStream.h"
23 #include "mozilla/ipc/InputStreamUtils.h"
25 using namespace mozilla::ipc;
27 using mozilla::DeprecatedAbs;
29 class nsMultiplexInputStream MOZ_FINAL : public nsIMultiplexInputStream,
30 public nsISeekableStream,
31 public nsIIPCSerializableInputStream
32 {
33 public:
34 nsMultiplexInputStream();
36 NS_DECL_THREADSAFE_ISUPPORTS
37 NS_DECL_NSIINPUTSTREAM
38 NS_DECL_NSIMULTIPLEXINPUTSTREAM
39 NS_DECL_NSISEEKABLESTREAM
40 NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
42 private:
43 ~nsMultiplexInputStream() {}
45 struct ReadSegmentsState {
46 nsIInputStream* mThisStream;
47 uint32_t mOffset;
48 nsWriteSegmentFun mWriter;
49 void* mClosure;
50 bool mDone;
51 };
53 static NS_METHOD ReadSegCb(nsIInputStream* aIn, void* aClosure,
54 const char* aFromRawSegment, uint32_t aToOffset,
55 uint32_t aCount, uint32_t *aWriteCount);
57 nsTArray<nsCOMPtr<nsIInputStream> > mStreams;
58 uint32_t mCurrentStream;
59 bool mStartedReadingCurrent;
60 nsresult mStatus;
61 };
63 NS_IMPL_ADDREF(nsMultiplexInputStream)
64 NS_IMPL_RELEASE(nsMultiplexInputStream)
66 NS_IMPL_CLASSINFO(nsMultiplexInputStream, nullptr, nsIClassInfo::THREADSAFE,
67 NS_MULTIPLEXINPUTSTREAM_CID)
69 NS_IMPL_QUERY_INTERFACE_CI(nsMultiplexInputStream,
70 nsIMultiplexInputStream,
71 nsIInputStream,
72 nsISeekableStream,
73 nsIIPCSerializableInputStream)
74 NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream,
75 nsIMultiplexInputStream,
76 nsIInputStream,
77 nsISeekableStream)
79 nsMultiplexInputStream::nsMultiplexInputStream()
80 : mCurrentStream(0),
81 mStartedReadingCurrent(false),
82 mStatus(NS_OK)
83 {
84 }
86 /* readonly attribute unsigned long count; */
87 NS_IMETHODIMP
88 nsMultiplexInputStream::GetCount(uint32_t *aCount)
89 {
90 *aCount = mStreams.Length();
91 return NS_OK;
92 }
#ifdef DEBUG
// Debug-only helper for the assertions in AppendStream/InsertStream.
// Returns false only when aStream is seekable, its Tell() succeeds, and the
// reported position is non-zero; every other case counts as "at beginning".
static bool
SeekableStreamAtBeginning(nsIInputStream *aStream)
{
    nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(aStream);
    if (!seekable) {
        // Not seekable: nothing to verify.
        return true;
    }

    int64_t position;
    if (NS_FAILED(seekable->Tell(&position))) {
        // Position unknown: give the stream the benefit of the doubt.
        return true;
    }

    return position == 0;
}
#endif
107 /* void appendStream (in nsIInputStream stream); */
108 NS_IMETHODIMP
109 nsMultiplexInputStream::AppendStream(nsIInputStream *aStream)
110 {
111 NS_ASSERTION(SeekableStreamAtBeginning(aStream), "Appended stream not at beginning.");
112 return mStreams.AppendElement(aStream) ? NS_OK : NS_ERROR_OUT_OF_MEMORY;
113 }
115 /* void insertStream (in nsIInputStream stream, in unsigned long index); */
116 NS_IMETHODIMP
117 nsMultiplexInputStream::InsertStream(nsIInputStream *aStream, uint32_t aIndex)
118 {
119 NS_ASSERTION(SeekableStreamAtBeginning(aStream), "Inserted stream not at beginning.");
120 mStreams.InsertElementAt(aIndex, aStream);
121 if (mCurrentStream > aIndex ||
122 (mCurrentStream == aIndex && mStartedReadingCurrent))
123 ++mCurrentStream;
124 return NS_OK;
125 }
127 /* void removeStream (in unsigned long index); */
128 NS_IMETHODIMP
129 nsMultiplexInputStream::RemoveStream(uint32_t aIndex)
130 {
131 mStreams.RemoveElementAt(aIndex);
132 if (mCurrentStream > aIndex)
133 --mCurrentStream;
134 else if (mCurrentStream == aIndex)
135 mStartedReadingCurrent = false;
137 return NS_OK;
138 }
140 /* nsIInputStream getStream (in unsigned long index); */
141 NS_IMETHODIMP
142 nsMultiplexInputStream::GetStream(uint32_t aIndex, nsIInputStream **_retval)
143 {
144 *_retval = mStreams.SafeElementAt(aIndex, nullptr);
145 if (NS_WARN_IF(!*_retval))
146 return NS_ERROR_NOT_AVAILABLE;
148 NS_ADDREF(*_retval);
149 return NS_OK;
150 }
152 /* void close (); */
153 NS_IMETHODIMP
154 nsMultiplexInputStream::Close()
155 {
156 mStatus = NS_BASE_STREAM_CLOSED;
158 nsresult rv = NS_OK;
160 uint32_t len = mStreams.Length();
161 for (uint32_t i = 0; i < len; ++i) {
162 nsresult rv2 = mStreams[i]->Close();
163 // We still want to close all streams, but we should return an error
164 if (NS_FAILED(rv2))
165 rv = rv2;
166 }
167 return rv;
168 }
170 /* unsigned long long available (); */
171 NS_IMETHODIMP
172 nsMultiplexInputStream::Available(uint64_t *_retval)
173 {
174 if (NS_FAILED(mStatus))
175 return mStatus;
177 nsresult rv;
178 uint64_t avail = 0;
180 uint32_t len = mStreams.Length();
181 for (uint32_t i = mCurrentStream; i < len; i++) {
182 uint64_t streamAvail;
183 rv = mStreams[i]->Available(&streamAvail);
184 if (NS_WARN_IF(NS_FAILED(rv)))
185 return rv;
186 avail += streamAvail;
187 }
188 *_retval = avail;
189 return NS_OK;
190 }
192 /* [noscript] unsigned long read (in charPtr buf, in unsigned long count); */
193 NS_IMETHODIMP
194 nsMultiplexInputStream::Read(char * aBuf, uint32_t aCount, uint32_t *_retval)
195 {
196 // It is tempting to implement this method in terms of ReadSegments, but
197 // that would prevent this class from being used with streams that only
198 // implement Read (e.g., file streams).
200 *_retval = 0;
202 if (mStatus == NS_BASE_STREAM_CLOSED)
203 return NS_OK;
204 if (NS_FAILED(mStatus))
205 return mStatus;
207 nsresult rv = NS_OK;
209 uint32_t len = mStreams.Length();
210 while (mCurrentStream < len && aCount) {
211 uint32_t read;
212 rv = mStreams[mCurrentStream]->Read(aBuf, aCount, &read);
214 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
215 // (This is a bug in those stream implementations)
216 if (rv == NS_BASE_STREAM_CLOSED) {
217 NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED");
218 rv = NS_OK;
219 read = 0;
220 }
221 else if (NS_FAILED(rv))
222 break;
224 if (read == 0) {
225 ++mCurrentStream;
226 mStartedReadingCurrent = false;
227 }
228 else {
229 NS_ASSERTION(aCount >= read, "Read more than requested");
230 *_retval += read;
231 aCount -= read;
232 aBuf += read;
233 mStartedReadingCurrent = true;
234 }
235 }
236 return *_retval ? NS_OK : rv;
237 }
239 /* [noscript] unsigned long readSegments (in nsWriteSegmentFun writer,
240 * in voidPtr closure,
241 * in unsigned long count); */
242 NS_IMETHODIMP
243 nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter, void *aClosure,
244 uint32_t aCount, uint32_t *_retval)
245 {
246 if (mStatus == NS_BASE_STREAM_CLOSED) {
247 *_retval = 0;
248 return NS_OK;
249 }
250 if (NS_FAILED(mStatus))
251 return mStatus;
253 NS_ASSERTION(aWriter, "missing aWriter");
255 nsresult rv = NS_OK;
256 ReadSegmentsState state;
257 state.mThisStream = this;
258 state.mOffset = 0;
259 state.mWriter = aWriter;
260 state.mClosure = aClosure;
261 state.mDone = false;
263 uint32_t len = mStreams.Length();
264 while (mCurrentStream < len && aCount) {
265 uint32_t read;
266 rv = mStreams[mCurrentStream]->ReadSegments(ReadSegCb, &state, aCount, &read);
268 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
269 // (This is a bug in those stream implementations)
270 if (rv == NS_BASE_STREAM_CLOSED) {
271 NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED");
272 rv = NS_OK;
273 read = 0;
274 }
276 // if |aWriter| decided to stop reading segments...
277 if (state.mDone || NS_FAILED(rv))
278 break;
280 // if stream is empty, then advance to the next stream.
281 if (read == 0) {
282 ++mCurrentStream;
283 mStartedReadingCurrent = false;
284 }
285 else {
286 NS_ASSERTION(aCount >= read, "Read more than requested");
287 state.mOffset += read;
288 aCount -= read;
289 mStartedReadingCurrent = true;
290 }
291 }
293 // if we successfully read some data, then this call succeeded.
294 *_retval = state.mOffset;
295 return state.mOffset ? NS_OK : rv;
296 }
298 NS_METHOD
299 nsMultiplexInputStream::ReadSegCb(nsIInputStream* aIn, void* aClosure,
300 const char* aFromRawSegment,
301 uint32_t aToOffset, uint32_t aCount,
302 uint32_t *aWriteCount)
303 {
304 nsresult rv;
305 ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
306 rv = (state->mWriter)(state->mThisStream,
307 state->mClosure,
308 aFromRawSegment,
309 aToOffset + state->mOffset,
310 aCount,
311 aWriteCount);
312 if (NS_FAILED(rv))
313 state->mDone = true;
314 return rv;
315 }
317 /* readonly attribute boolean nonBlocking; */
318 NS_IMETHODIMP
319 nsMultiplexInputStream::IsNonBlocking(bool *aNonBlocking)
320 {
321 uint32_t len = mStreams.Length();
322 if (len == 0) {
323 // Claim to be non-blocking, since we won't block the caller.
324 // On the other hand we'll never return NS_BASE_STREAM_WOULD_BLOCK,
325 // so maybe we should claim to be blocking? It probably doesn't
326 // matter in practice.
327 *aNonBlocking = true;
328 return NS_OK;
329 }
330 for (uint32_t i = 0; i < len; ++i) {
331 nsresult rv = mStreams[i]->IsNonBlocking(aNonBlocking);
332 if (NS_WARN_IF(NS_FAILED(rv)))
333 return rv;
334 // If one is non-blocking the entire stream becomes non-blocking
335 // (except that we don't implement nsIAsyncInputStream, so there's
336 // not much for the caller to do if Read returns "would block")
337 if (*aNonBlocking)
338 return NS_OK;
339 }
340 return NS_OK;
341 }
343 /* void seek (in int32_t whence, in int32_t offset); */
344 NS_IMETHODIMP
345 nsMultiplexInputStream::Seek(int32_t aWhence, int64_t aOffset)
346 {
347 if (NS_FAILED(mStatus))
348 return mStatus;
350 nsresult rv;
352 uint32_t oldCurrentStream = mCurrentStream;
353 bool oldStartedReadingCurrent = mStartedReadingCurrent;
355 if (aWhence == NS_SEEK_SET) {
356 int64_t remaining = aOffset;
357 if (aOffset == 0) {
358 mCurrentStream = 0;
359 }
360 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
361 nsCOMPtr<nsISeekableStream> stream =
362 do_QueryInterface(mStreams[i]);
363 if (!stream) {
364 return NS_ERROR_FAILURE;
365 }
367 // See if all remaining streams should be rewound
368 if (remaining == 0) {
369 if (i < oldCurrentStream ||
370 (i == oldCurrentStream && oldStartedReadingCurrent)) {
371 rv = stream->Seek(NS_SEEK_SET, 0);
372 if (NS_WARN_IF(NS_FAILED(rv)))
373 return rv;
374 continue;
375 }
376 else {
377 break;
378 }
379 }
381 // Get position in current stream
382 int64_t streamPos;
383 if (i > oldCurrentStream ||
384 (i == oldCurrentStream && !oldStartedReadingCurrent)) {
385 streamPos = 0;
386 }
387 else {
388 rv = stream->Tell(&streamPos);
389 if (NS_WARN_IF(NS_FAILED(rv)))
390 return rv;
391 }
393 // See if we need to seek current stream forward or backward
394 if (remaining < streamPos) {
395 rv = stream->Seek(NS_SEEK_SET, remaining);
396 if (NS_WARN_IF(NS_FAILED(rv)))
397 return rv;
399 mCurrentStream = i;
400 mStartedReadingCurrent = remaining != 0;
402 remaining = 0;
403 }
404 else if (remaining > streamPos) {
405 if (i < oldCurrentStream) {
406 // We're already at end so no need to seek this stream
407 remaining -= streamPos;
408 NS_ASSERTION(remaining >= 0, "Remaining invalid");
409 }
410 else {
411 uint64_t avail;
412 rv = mStreams[i]->Available(&avail);
413 if (NS_WARN_IF(NS_FAILED(rv)))
414 return rv;
416 int64_t newPos = XPCOM_MIN(remaining, streamPos + (int64_t)avail);
418 rv = stream->Seek(NS_SEEK_SET, newPos);
419 if (NS_WARN_IF(NS_FAILED(rv)))
420 return rv;
422 mCurrentStream = i;
423 mStartedReadingCurrent = true;
425 remaining -= newPos;
426 NS_ASSERTION(remaining >= 0, "Remaining invalid");
427 }
428 }
429 else {
430 NS_ASSERTION(remaining == streamPos, "Huh?");
431 remaining = 0;
432 }
433 }
435 return NS_OK;
436 }
438 if (aWhence == NS_SEEK_CUR && aOffset > 0) {
439 int64_t remaining = aOffset;
440 for (uint32_t i = mCurrentStream; remaining && i < mStreams.Length(); ++i) {
441 nsCOMPtr<nsISeekableStream> stream =
442 do_QueryInterface(mStreams[i]);
444 uint64_t avail;
445 rv = mStreams[i]->Available(&avail);
446 if (NS_WARN_IF(NS_FAILED(rv)))
447 return rv;
449 int64_t seek = XPCOM_MIN((int64_t)avail, remaining);
451 rv = stream->Seek(NS_SEEK_CUR, seek);
452 if (NS_WARN_IF(NS_FAILED(rv)))
453 return rv;
455 mCurrentStream = i;
456 mStartedReadingCurrent = true;
458 remaining -= seek;
459 }
461 return NS_OK;
462 }
464 if (aWhence == NS_SEEK_CUR && aOffset < 0) {
465 int64_t remaining = -aOffset;
466 for (uint32_t i = mCurrentStream; remaining && i != (uint32_t)-1; --i) {
467 nsCOMPtr<nsISeekableStream> stream =
468 do_QueryInterface(mStreams[i]);
470 int64_t pos;
471 rv = stream->Tell(&pos);
472 if (NS_WARN_IF(NS_FAILED(rv)))
473 return rv;
475 int64_t seek = XPCOM_MIN(pos, remaining);
477 rv = stream->Seek(NS_SEEK_CUR, -seek);
478 if (NS_WARN_IF(NS_FAILED(rv)))
479 return rv;
481 mCurrentStream = i;
482 mStartedReadingCurrent = seek != -pos;
484 remaining -= seek;
485 }
487 return NS_OK;
488 }
490 if (aWhence == NS_SEEK_CUR) {
491 NS_ASSERTION(aOffset == 0, "Should have handled all non-zero values");
493 return NS_OK;
494 }
496 if (aWhence == NS_SEEK_END) {
497 if (aOffset > 0) {
498 return NS_ERROR_INVALID_ARG;
499 }
500 int64_t remaining = aOffset;
501 for (uint32_t i = mStreams.Length() - 1; i != (uint32_t)-1; --i) {
502 nsCOMPtr<nsISeekableStream> stream =
503 do_QueryInterface(mStreams[i]);
505 // See if all remaining streams should be seeked to end
506 if (remaining == 0) {
507 if (i >= oldCurrentStream) {
508 rv = stream->Seek(NS_SEEK_END, 0);
509 if (NS_WARN_IF(NS_FAILED(rv)))
510 return rv;
511 }
512 else {
513 break;
514 }
515 }
517 // Get position in current stream
518 int64_t streamPos;
519 if (i < oldCurrentStream) {
520 streamPos = 0;
521 } else {
522 uint64_t avail;
523 rv = mStreams[i]->Available(&avail);
524 if (NS_WARN_IF(NS_FAILED(rv)))
525 return rv;
527 streamPos = avail;
528 }
530 // See if we have enough data in the current stream.
531 if (DeprecatedAbs(remaining) < streamPos) {
532 rv = stream->Seek(NS_SEEK_END, remaining);
533 if (NS_WARN_IF(NS_FAILED(rv)))
534 return rv;
536 mCurrentStream = i;
537 mStartedReadingCurrent = true;
539 remaining = 0;
540 } else if (DeprecatedAbs(remaining) > streamPos) {
541 if (i > oldCurrentStream ||
542 (i == oldCurrentStream && !oldStartedReadingCurrent)) {
543 // We're already at start so no need to seek this stream
544 remaining += streamPos;
545 } else {
546 int64_t avail;
547 rv = stream->Tell(&avail);
548 if (NS_WARN_IF(NS_FAILED(rv)))
549 return rv;
551 int64_t newPos = streamPos + XPCOM_MIN(avail, DeprecatedAbs(remaining));
553 rv = stream->Seek(NS_SEEK_END, -newPos);
554 if (NS_WARN_IF(NS_FAILED(rv)))
555 return rv;
557 mCurrentStream = i;
558 mStartedReadingCurrent = true;
560 remaining += newPos;
561 }
562 }
563 else {
564 NS_ASSERTION(remaining == streamPos, "Huh?");
565 remaining = 0;
566 }
567 }
569 return NS_OK;
570 }
572 // other Seeks not implemented yet
573 return NS_ERROR_NOT_IMPLEMENTED;
574 }
576 /* uint32_t tell (); */
577 NS_IMETHODIMP
578 nsMultiplexInputStream::Tell(int64_t *_retval)
579 {
580 if (NS_FAILED(mStatus))
581 return mStatus;
583 nsresult rv;
584 int64_t ret64 = 0;
585 uint32_t i, last;
586 last = mStartedReadingCurrent ? mCurrentStream+1 : mCurrentStream;
587 for (i = 0; i < last; ++i) {
588 nsCOMPtr<nsISeekableStream> stream = do_QueryInterface(mStreams[i]);
589 if (NS_WARN_IF(!stream))
590 return NS_ERROR_NO_INTERFACE;
592 int64_t pos;
593 rv = stream->Tell(&pos);
594 if (NS_WARN_IF(NS_FAILED(rv)))
595 return rv;
596 ret64 += pos;
597 }
598 *_retval = ret64;
600 return NS_OK;
601 }
603 /* void setEOF (); */
604 NS_IMETHODIMP
605 nsMultiplexInputStream::SetEOF()
606 {
607 return NS_ERROR_NOT_IMPLEMENTED;
608 }
610 nsresult
611 nsMultiplexInputStreamConstructor(nsISupports *outer,
612 REFNSIID iid,
613 void **result)
614 {
615 *result = nullptr;
617 if (outer)
618 return NS_ERROR_NO_AGGREGATION;
620 nsMultiplexInputStream *inst = new nsMultiplexInputStream();
621 if (!inst)
622 return NS_ERROR_OUT_OF_MEMORY;
624 NS_ADDREF(inst);
625 nsresult rv = inst->QueryInterface(iid, result);
626 NS_RELEASE(inst);
628 return rv;
629 }
631 void
632 nsMultiplexInputStream::Serialize(InputStreamParams& aParams,
633 FileDescriptorArray& aFileDescriptors)
634 {
635 MultiplexInputStreamParams params;
637 uint32_t streamCount = mStreams.Length();
639 if (streamCount) {
640 InfallibleTArray<InputStreamParams>& streams = params.streams();
642 streams.SetCapacity(streamCount);
643 for (uint32_t index = 0; index < streamCount; index++) {
644 InputStreamParams childStreamParams;
645 SerializeInputStream(mStreams[index], childStreamParams,
646 aFileDescriptors);
648 streams.AppendElement(childStreamParams);
649 }
650 }
652 params.currentStream() = mCurrentStream;
653 params.status() = mStatus;
654 params.startedReadingCurrent() = mStartedReadingCurrent;
656 aParams = params;
657 }
659 bool
660 nsMultiplexInputStream::Deserialize(const InputStreamParams& aParams,
661 const FileDescriptorArray& aFileDescriptors)
662 {
663 if (aParams.type() !=
664 InputStreamParams::TMultiplexInputStreamParams) {
665 NS_ERROR("Received unknown parameters from the other process!");
666 return false;
667 }
669 const MultiplexInputStreamParams& params =
670 aParams.get_MultiplexInputStreamParams();
672 const InfallibleTArray<InputStreamParams>& streams = params.streams();
674 uint32_t streamCount = streams.Length();
675 for (uint32_t index = 0; index < streamCount; index++) {
676 nsCOMPtr<nsIInputStream> stream =
677 DeserializeInputStream(streams[index], aFileDescriptors);
678 if (!stream) {
679 NS_WARNING("Deserialize failed!");
680 return false;
681 }
683 if (NS_FAILED(AppendStream(stream))) {
684 NS_WARNING("AppendStream failed!");
685 return false;
686 }
687 }
689 mCurrentStream = params.currentStream();
690 mStatus = params.status();
691 mStartedReadingCurrent = params.startedReadingCurrent();
693 return true;
694 }