xpcom/io/nsMultiplexInputStream.cpp

Thu, 22 Jan 2015 13:21:57 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Thu, 22 Jan 2015 13:21:57 +0100
branch
TOR_BUG_9701
changeset 15
b8a032363ba2
permissions
-rw-r--r--

Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6

michael@0 1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
michael@0 2 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 3 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 5
michael@0 6 /**
michael@0 7 * The multiplex stream concatenates a list of input streams into a single
michael@0 8 * stream.
michael@0 9 */
michael@0 10
michael@0 11 #include "mozilla/Attributes.h"
michael@0 12 #include "mozilla/MathAlgorithms.h"
michael@0 13
michael@0 14 #include "base/basictypes.h"
michael@0 15
michael@0 16 #include "nsMultiplexInputStream.h"
michael@0 17 #include "nsIMultiplexInputStream.h"
michael@0 18 #include "nsISeekableStream.h"
michael@0 19 #include "nsCOMPtr.h"
michael@0 20 #include "nsCOMArray.h"
michael@0 21 #include "nsIClassInfoImpl.h"
michael@0 22 #include "nsIIPCSerializableInputStream.h"
michael@0 23 #include "mozilla/ipc/InputStreamUtils.h"
michael@0 24
michael@0 25 using namespace mozilla::ipc;
michael@0 26
michael@0 27 using mozilla::DeprecatedAbs;
michael@0 28
michael@0 29 class nsMultiplexInputStream MOZ_FINAL : public nsIMultiplexInputStream,
michael@0 30 public nsISeekableStream,
michael@0 31 public nsIIPCSerializableInputStream
michael@0 32 {
michael@0 33 public:
michael@0 34 nsMultiplexInputStream();
michael@0 35
michael@0 36 NS_DECL_THREADSAFE_ISUPPORTS
michael@0 37 NS_DECL_NSIINPUTSTREAM
michael@0 38 NS_DECL_NSIMULTIPLEXINPUTSTREAM
michael@0 39 NS_DECL_NSISEEKABLESTREAM
michael@0 40 NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
michael@0 41
michael@0 42 private:
michael@0 43 ~nsMultiplexInputStream() {}
michael@0 44
michael@0 45 struct ReadSegmentsState {
michael@0 46 nsIInputStream* mThisStream;
michael@0 47 uint32_t mOffset;
michael@0 48 nsWriteSegmentFun mWriter;
michael@0 49 void* mClosure;
michael@0 50 bool mDone;
michael@0 51 };
michael@0 52
michael@0 53 static NS_METHOD ReadSegCb(nsIInputStream* aIn, void* aClosure,
michael@0 54 const char* aFromRawSegment, uint32_t aToOffset,
michael@0 55 uint32_t aCount, uint32_t *aWriteCount);
michael@0 56
michael@0 57 nsTArray<nsCOMPtr<nsIInputStream> > mStreams;
michael@0 58 uint32_t mCurrentStream;
michael@0 59 bool mStartedReadingCurrent;
michael@0 60 nsresult mStatus;
michael@0 61 };
michael@0 62
michael@0 63 NS_IMPL_ADDREF(nsMultiplexInputStream)
michael@0 64 NS_IMPL_RELEASE(nsMultiplexInputStream)
michael@0 65
michael@0 66 NS_IMPL_CLASSINFO(nsMultiplexInputStream, nullptr, nsIClassInfo::THREADSAFE,
michael@0 67 NS_MULTIPLEXINPUTSTREAM_CID)
michael@0 68
michael@0 69 NS_IMPL_QUERY_INTERFACE_CI(nsMultiplexInputStream,
michael@0 70 nsIMultiplexInputStream,
michael@0 71 nsIInputStream,
michael@0 72 nsISeekableStream,
michael@0 73 nsIIPCSerializableInputStream)
michael@0 74 NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream,
michael@0 75 nsIMultiplexInputStream,
michael@0 76 nsIInputStream,
michael@0 77 nsISeekableStream)
michael@0 78
michael@0 79 nsMultiplexInputStream::nsMultiplexInputStream()
michael@0 80 : mCurrentStream(0),
michael@0 81 mStartedReadingCurrent(false),
michael@0 82 mStatus(NS_OK)
michael@0 83 {
michael@0 84 }
michael@0 85
michael@0 86 /* readonly attribute unsigned long count; */
michael@0 87 NS_IMETHODIMP
michael@0 88 nsMultiplexInputStream::GetCount(uint32_t *aCount)
michael@0 89 {
michael@0 90 *aCount = mStreams.Length();
michael@0 91 return NS_OK;
michael@0 92 }
michael@0 93
#ifdef DEBUG
/**
 * Debug-only helper backing the assertions in AppendStream/InsertStream.
 *
 * Returns false only when aStream is seekable, its Tell() succeeds, and the
 * reported position is non-zero. Streams that are not seekable, or whose
 * Tell() fails, are given the benefit of the doubt and yield true.
 */
static bool
SeekableStreamAtBeginning(nsIInputStream *aStream)
{
    nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(aStream);
    if (!seekable) {
        return true;
    }

    int64_t offset;
    if (NS_FAILED(seekable->Tell(&offset))) {
        return true;
    }

    return offset == 0;
}
#endif
michael@0 106
michael@0 107 /* void appendStream (in nsIInputStream stream); */
michael@0 108 NS_IMETHODIMP
michael@0 109 nsMultiplexInputStream::AppendStream(nsIInputStream *aStream)
michael@0 110 {
michael@0 111 NS_ASSERTION(SeekableStreamAtBeginning(aStream), "Appended stream not at beginning.");
michael@0 112 return mStreams.AppendElement(aStream) ? NS_OK : NS_ERROR_OUT_OF_MEMORY;
michael@0 113 }
michael@0 114
michael@0 115 /* void insertStream (in nsIInputStream stream, in unsigned long index); */
michael@0 116 NS_IMETHODIMP
michael@0 117 nsMultiplexInputStream::InsertStream(nsIInputStream *aStream, uint32_t aIndex)
michael@0 118 {
michael@0 119 NS_ASSERTION(SeekableStreamAtBeginning(aStream), "Inserted stream not at beginning.");
michael@0 120 mStreams.InsertElementAt(aIndex, aStream);
michael@0 121 if (mCurrentStream > aIndex ||
michael@0 122 (mCurrentStream == aIndex && mStartedReadingCurrent))
michael@0 123 ++mCurrentStream;
michael@0 124 return NS_OK;
michael@0 125 }
michael@0 126
michael@0 127 /* void removeStream (in unsigned long index); */
michael@0 128 NS_IMETHODIMP
michael@0 129 nsMultiplexInputStream::RemoveStream(uint32_t aIndex)
michael@0 130 {
michael@0 131 mStreams.RemoveElementAt(aIndex);
michael@0 132 if (mCurrentStream > aIndex)
michael@0 133 --mCurrentStream;
michael@0 134 else if (mCurrentStream == aIndex)
michael@0 135 mStartedReadingCurrent = false;
michael@0 136
michael@0 137 return NS_OK;
michael@0 138 }
michael@0 139
michael@0 140 /* nsIInputStream getStream (in unsigned long index); */
michael@0 141 NS_IMETHODIMP
michael@0 142 nsMultiplexInputStream::GetStream(uint32_t aIndex, nsIInputStream **_retval)
michael@0 143 {
michael@0 144 *_retval = mStreams.SafeElementAt(aIndex, nullptr);
michael@0 145 if (NS_WARN_IF(!*_retval))
michael@0 146 return NS_ERROR_NOT_AVAILABLE;
michael@0 147
michael@0 148 NS_ADDREF(*_retval);
michael@0 149 return NS_OK;
michael@0 150 }
michael@0 151
michael@0 152 /* void close (); */
michael@0 153 NS_IMETHODIMP
michael@0 154 nsMultiplexInputStream::Close()
michael@0 155 {
michael@0 156 mStatus = NS_BASE_STREAM_CLOSED;
michael@0 157
michael@0 158 nsresult rv = NS_OK;
michael@0 159
michael@0 160 uint32_t len = mStreams.Length();
michael@0 161 for (uint32_t i = 0; i < len; ++i) {
michael@0 162 nsresult rv2 = mStreams[i]->Close();
michael@0 163 // We still want to close all streams, but we should return an error
michael@0 164 if (NS_FAILED(rv2))
michael@0 165 rv = rv2;
michael@0 166 }
michael@0 167 return rv;
michael@0 168 }
michael@0 169
michael@0 170 /* unsigned long long available (); */
michael@0 171 NS_IMETHODIMP
michael@0 172 nsMultiplexInputStream::Available(uint64_t *_retval)
michael@0 173 {
michael@0 174 if (NS_FAILED(mStatus))
michael@0 175 return mStatus;
michael@0 176
michael@0 177 nsresult rv;
michael@0 178 uint64_t avail = 0;
michael@0 179
michael@0 180 uint32_t len = mStreams.Length();
michael@0 181 for (uint32_t i = mCurrentStream; i < len; i++) {
michael@0 182 uint64_t streamAvail;
michael@0 183 rv = mStreams[i]->Available(&streamAvail);
michael@0 184 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 185 return rv;
michael@0 186 avail += streamAvail;
michael@0 187 }
michael@0 188 *_retval = avail;
michael@0 189 return NS_OK;
michael@0 190 }
michael@0 191
michael@0 192 /* [noscript] unsigned long read (in charPtr buf, in unsigned long count); */
michael@0 193 NS_IMETHODIMP
michael@0 194 nsMultiplexInputStream::Read(char * aBuf, uint32_t aCount, uint32_t *_retval)
michael@0 195 {
michael@0 196 // It is tempting to implement this method in terms of ReadSegments, but
michael@0 197 // that would prevent this class from being used with streams that only
michael@0 198 // implement Read (e.g., file streams).
michael@0 199
michael@0 200 *_retval = 0;
michael@0 201
michael@0 202 if (mStatus == NS_BASE_STREAM_CLOSED)
michael@0 203 return NS_OK;
michael@0 204 if (NS_FAILED(mStatus))
michael@0 205 return mStatus;
michael@0 206
michael@0 207 nsresult rv = NS_OK;
michael@0 208
michael@0 209 uint32_t len = mStreams.Length();
michael@0 210 while (mCurrentStream < len && aCount) {
michael@0 211 uint32_t read;
michael@0 212 rv = mStreams[mCurrentStream]->Read(aBuf, aCount, &read);
michael@0 213
michael@0 214 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
michael@0 215 // (This is a bug in those stream implementations)
michael@0 216 if (rv == NS_BASE_STREAM_CLOSED) {
michael@0 217 NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED");
michael@0 218 rv = NS_OK;
michael@0 219 read = 0;
michael@0 220 }
michael@0 221 else if (NS_FAILED(rv))
michael@0 222 break;
michael@0 223
michael@0 224 if (read == 0) {
michael@0 225 ++mCurrentStream;
michael@0 226 mStartedReadingCurrent = false;
michael@0 227 }
michael@0 228 else {
michael@0 229 NS_ASSERTION(aCount >= read, "Read more than requested");
michael@0 230 *_retval += read;
michael@0 231 aCount -= read;
michael@0 232 aBuf += read;
michael@0 233 mStartedReadingCurrent = true;
michael@0 234 }
michael@0 235 }
michael@0 236 return *_retval ? NS_OK : rv;
michael@0 237 }
michael@0 238
michael@0 239 /* [noscript] unsigned long readSegments (in nsWriteSegmentFun writer,
michael@0 240 * in voidPtr closure,
michael@0 241 * in unsigned long count); */
michael@0 242 NS_IMETHODIMP
michael@0 243 nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter, void *aClosure,
michael@0 244 uint32_t aCount, uint32_t *_retval)
michael@0 245 {
michael@0 246 if (mStatus == NS_BASE_STREAM_CLOSED) {
michael@0 247 *_retval = 0;
michael@0 248 return NS_OK;
michael@0 249 }
michael@0 250 if (NS_FAILED(mStatus))
michael@0 251 return mStatus;
michael@0 252
michael@0 253 NS_ASSERTION(aWriter, "missing aWriter");
michael@0 254
michael@0 255 nsresult rv = NS_OK;
michael@0 256 ReadSegmentsState state;
michael@0 257 state.mThisStream = this;
michael@0 258 state.mOffset = 0;
michael@0 259 state.mWriter = aWriter;
michael@0 260 state.mClosure = aClosure;
michael@0 261 state.mDone = false;
michael@0 262
michael@0 263 uint32_t len = mStreams.Length();
michael@0 264 while (mCurrentStream < len && aCount) {
michael@0 265 uint32_t read;
michael@0 266 rv = mStreams[mCurrentStream]->ReadSegments(ReadSegCb, &state, aCount, &read);
michael@0 267
michael@0 268 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
michael@0 269 // (This is a bug in those stream implementations)
michael@0 270 if (rv == NS_BASE_STREAM_CLOSED) {
michael@0 271 NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED");
michael@0 272 rv = NS_OK;
michael@0 273 read = 0;
michael@0 274 }
michael@0 275
michael@0 276 // if |aWriter| decided to stop reading segments...
michael@0 277 if (state.mDone || NS_FAILED(rv))
michael@0 278 break;
michael@0 279
michael@0 280 // if stream is empty, then advance to the next stream.
michael@0 281 if (read == 0) {
michael@0 282 ++mCurrentStream;
michael@0 283 mStartedReadingCurrent = false;
michael@0 284 }
michael@0 285 else {
michael@0 286 NS_ASSERTION(aCount >= read, "Read more than requested");
michael@0 287 state.mOffset += read;
michael@0 288 aCount -= read;
michael@0 289 mStartedReadingCurrent = true;
michael@0 290 }
michael@0 291 }
michael@0 292
michael@0 293 // if we successfully read some data, then this call succeeded.
michael@0 294 *_retval = state.mOffset;
michael@0 295 return state.mOffset ? NS_OK : rv;
michael@0 296 }
michael@0 297
michael@0 298 NS_METHOD
michael@0 299 nsMultiplexInputStream::ReadSegCb(nsIInputStream* aIn, void* aClosure,
michael@0 300 const char* aFromRawSegment,
michael@0 301 uint32_t aToOffset, uint32_t aCount,
michael@0 302 uint32_t *aWriteCount)
michael@0 303 {
michael@0 304 nsresult rv;
michael@0 305 ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
michael@0 306 rv = (state->mWriter)(state->mThisStream,
michael@0 307 state->mClosure,
michael@0 308 aFromRawSegment,
michael@0 309 aToOffset + state->mOffset,
michael@0 310 aCount,
michael@0 311 aWriteCount);
michael@0 312 if (NS_FAILED(rv))
michael@0 313 state->mDone = true;
michael@0 314 return rv;
michael@0 315 }
michael@0 316
michael@0 317 /* readonly attribute boolean nonBlocking; */
michael@0 318 NS_IMETHODIMP
michael@0 319 nsMultiplexInputStream::IsNonBlocking(bool *aNonBlocking)
michael@0 320 {
michael@0 321 uint32_t len = mStreams.Length();
michael@0 322 if (len == 0) {
michael@0 323 // Claim to be non-blocking, since we won't block the caller.
michael@0 324 // On the other hand we'll never return NS_BASE_STREAM_WOULD_BLOCK,
michael@0 325 // so maybe we should claim to be blocking? It probably doesn't
michael@0 326 // matter in practice.
michael@0 327 *aNonBlocking = true;
michael@0 328 return NS_OK;
michael@0 329 }
michael@0 330 for (uint32_t i = 0; i < len; ++i) {
michael@0 331 nsresult rv = mStreams[i]->IsNonBlocking(aNonBlocking);
michael@0 332 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 333 return rv;
michael@0 334 // If one is non-blocking the entire stream becomes non-blocking
michael@0 335 // (except that we don't implement nsIAsyncInputStream, so there's
michael@0 336 // not much for the caller to do if Read returns "would block")
michael@0 337 if (*aNonBlocking)
michael@0 338 return NS_OK;
michael@0 339 }
michael@0 340 return NS_OK;
michael@0 341 }
michael@0 342
michael@0 343 /* void seek (in int32_t whence, in int32_t offset); */
michael@0 344 NS_IMETHODIMP
michael@0 345 nsMultiplexInputStream::Seek(int32_t aWhence, int64_t aOffset)
michael@0 346 {
michael@0 347 if (NS_FAILED(mStatus))
michael@0 348 return mStatus;
michael@0 349
michael@0 350 nsresult rv;
michael@0 351
michael@0 352 uint32_t oldCurrentStream = mCurrentStream;
michael@0 353 bool oldStartedReadingCurrent = mStartedReadingCurrent;
michael@0 354
michael@0 355 if (aWhence == NS_SEEK_SET) {
michael@0 356 int64_t remaining = aOffset;
michael@0 357 if (aOffset == 0) {
michael@0 358 mCurrentStream = 0;
michael@0 359 }
michael@0 360 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
michael@0 361 nsCOMPtr<nsISeekableStream> stream =
michael@0 362 do_QueryInterface(mStreams[i]);
michael@0 363 if (!stream) {
michael@0 364 return NS_ERROR_FAILURE;
michael@0 365 }
michael@0 366
michael@0 367 // See if all remaining streams should be rewound
michael@0 368 if (remaining == 0) {
michael@0 369 if (i < oldCurrentStream ||
michael@0 370 (i == oldCurrentStream && oldStartedReadingCurrent)) {
michael@0 371 rv = stream->Seek(NS_SEEK_SET, 0);
michael@0 372 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 373 return rv;
michael@0 374 continue;
michael@0 375 }
michael@0 376 else {
michael@0 377 break;
michael@0 378 }
michael@0 379 }
michael@0 380
michael@0 381 // Get position in current stream
michael@0 382 int64_t streamPos;
michael@0 383 if (i > oldCurrentStream ||
michael@0 384 (i == oldCurrentStream && !oldStartedReadingCurrent)) {
michael@0 385 streamPos = 0;
michael@0 386 }
michael@0 387 else {
michael@0 388 rv = stream->Tell(&streamPos);
michael@0 389 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 390 return rv;
michael@0 391 }
michael@0 392
michael@0 393 // See if we need to seek current stream forward or backward
michael@0 394 if (remaining < streamPos) {
michael@0 395 rv = stream->Seek(NS_SEEK_SET, remaining);
michael@0 396 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 397 return rv;
michael@0 398
michael@0 399 mCurrentStream = i;
michael@0 400 mStartedReadingCurrent = remaining != 0;
michael@0 401
michael@0 402 remaining = 0;
michael@0 403 }
michael@0 404 else if (remaining > streamPos) {
michael@0 405 if (i < oldCurrentStream) {
michael@0 406 // We're already at end so no need to seek this stream
michael@0 407 remaining -= streamPos;
michael@0 408 NS_ASSERTION(remaining >= 0, "Remaining invalid");
michael@0 409 }
michael@0 410 else {
michael@0 411 uint64_t avail;
michael@0 412 rv = mStreams[i]->Available(&avail);
michael@0 413 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 414 return rv;
michael@0 415
michael@0 416 int64_t newPos = XPCOM_MIN(remaining, streamPos + (int64_t)avail);
michael@0 417
michael@0 418 rv = stream->Seek(NS_SEEK_SET, newPos);
michael@0 419 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 420 return rv;
michael@0 421
michael@0 422 mCurrentStream = i;
michael@0 423 mStartedReadingCurrent = true;
michael@0 424
michael@0 425 remaining -= newPos;
michael@0 426 NS_ASSERTION(remaining >= 0, "Remaining invalid");
michael@0 427 }
michael@0 428 }
michael@0 429 else {
michael@0 430 NS_ASSERTION(remaining == streamPos, "Huh?");
michael@0 431 remaining = 0;
michael@0 432 }
michael@0 433 }
michael@0 434
michael@0 435 return NS_OK;
michael@0 436 }
michael@0 437
michael@0 438 if (aWhence == NS_SEEK_CUR && aOffset > 0) {
michael@0 439 int64_t remaining = aOffset;
michael@0 440 for (uint32_t i = mCurrentStream; remaining && i < mStreams.Length(); ++i) {
michael@0 441 nsCOMPtr<nsISeekableStream> stream =
michael@0 442 do_QueryInterface(mStreams[i]);
michael@0 443
michael@0 444 uint64_t avail;
michael@0 445 rv = mStreams[i]->Available(&avail);
michael@0 446 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 447 return rv;
michael@0 448
michael@0 449 int64_t seek = XPCOM_MIN((int64_t)avail, remaining);
michael@0 450
michael@0 451 rv = stream->Seek(NS_SEEK_CUR, seek);
michael@0 452 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 453 return rv;
michael@0 454
michael@0 455 mCurrentStream = i;
michael@0 456 mStartedReadingCurrent = true;
michael@0 457
michael@0 458 remaining -= seek;
michael@0 459 }
michael@0 460
michael@0 461 return NS_OK;
michael@0 462 }
michael@0 463
michael@0 464 if (aWhence == NS_SEEK_CUR && aOffset < 0) {
michael@0 465 int64_t remaining = -aOffset;
michael@0 466 for (uint32_t i = mCurrentStream; remaining && i != (uint32_t)-1; --i) {
michael@0 467 nsCOMPtr<nsISeekableStream> stream =
michael@0 468 do_QueryInterface(mStreams[i]);
michael@0 469
michael@0 470 int64_t pos;
michael@0 471 rv = stream->Tell(&pos);
michael@0 472 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 473 return rv;
michael@0 474
michael@0 475 int64_t seek = XPCOM_MIN(pos, remaining);
michael@0 476
michael@0 477 rv = stream->Seek(NS_SEEK_CUR, -seek);
michael@0 478 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 479 return rv;
michael@0 480
michael@0 481 mCurrentStream = i;
michael@0 482 mStartedReadingCurrent = seek != -pos;
michael@0 483
michael@0 484 remaining -= seek;
michael@0 485 }
michael@0 486
michael@0 487 return NS_OK;
michael@0 488 }
michael@0 489
michael@0 490 if (aWhence == NS_SEEK_CUR) {
michael@0 491 NS_ASSERTION(aOffset == 0, "Should have handled all non-zero values");
michael@0 492
michael@0 493 return NS_OK;
michael@0 494 }
michael@0 495
michael@0 496 if (aWhence == NS_SEEK_END) {
michael@0 497 if (aOffset > 0) {
michael@0 498 return NS_ERROR_INVALID_ARG;
michael@0 499 }
michael@0 500 int64_t remaining = aOffset;
michael@0 501 for (uint32_t i = mStreams.Length() - 1; i != (uint32_t)-1; --i) {
michael@0 502 nsCOMPtr<nsISeekableStream> stream =
michael@0 503 do_QueryInterface(mStreams[i]);
michael@0 504
michael@0 505 // See if all remaining streams should be seeked to end
michael@0 506 if (remaining == 0) {
michael@0 507 if (i >= oldCurrentStream) {
michael@0 508 rv = stream->Seek(NS_SEEK_END, 0);
michael@0 509 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 510 return rv;
michael@0 511 }
michael@0 512 else {
michael@0 513 break;
michael@0 514 }
michael@0 515 }
michael@0 516
michael@0 517 // Get position in current stream
michael@0 518 int64_t streamPos;
michael@0 519 if (i < oldCurrentStream) {
michael@0 520 streamPos = 0;
michael@0 521 } else {
michael@0 522 uint64_t avail;
michael@0 523 rv = mStreams[i]->Available(&avail);
michael@0 524 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 525 return rv;
michael@0 526
michael@0 527 streamPos = avail;
michael@0 528 }
michael@0 529
michael@0 530 // See if we have enough data in the current stream.
michael@0 531 if (DeprecatedAbs(remaining) < streamPos) {
michael@0 532 rv = stream->Seek(NS_SEEK_END, remaining);
michael@0 533 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 534 return rv;
michael@0 535
michael@0 536 mCurrentStream = i;
michael@0 537 mStartedReadingCurrent = true;
michael@0 538
michael@0 539 remaining = 0;
michael@0 540 } else if (DeprecatedAbs(remaining) > streamPos) {
michael@0 541 if (i > oldCurrentStream ||
michael@0 542 (i == oldCurrentStream && !oldStartedReadingCurrent)) {
michael@0 543 // We're already at start so no need to seek this stream
michael@0 544 remaining += streamPos;
michael@0 545 } else {
michael@0 546 int64_t avail;
michael@0 547 rv = stream->Tell(&avail);
michael@0 548 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 549 return rv;
michael@0 550
michael@0 551 int64_t newPos = streamPos + XPCOM_MIN(avail, DeprecatedAbs(remaining));
michael@0 552
michael@0 553 rv = stream->Seek(NS_SEEK_END, -newPos);
michael@0 554 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 555 return rv;
michael@0 556
michael@0 557 mCurrentStream = i;
michael@0 558 mStartedReadingCurrent = true;
michael@0 559
michael@0 560 remaining += newPos;
michael@0 561 }
michael@0 562 }
michael@0 563 else {
michael@0 564 NS_ASSERTION(remaining == streamPos, "Huh?");
michael@0 565 remaining = 0;
michael@0 566 }
michael@0 567 }
michael@0 568
michael@0 569 return NS_OK;
michael@0 570 }
michael@0 571
michael@0 572 // other Seeks not implemented yet
michael@0 573 return NS_ERROR_NOT_IMPLEMENTED;
michael@0 574 }
michael@0 575
michael@0 576 /* uint32_t tell (); */
michael@0 577 NS_IMETHODIMP
michael@0 578 nsMultiplexInputStream::Tell(int64_t *_retval)
michael@0 579 {
michael@0 580 if (NS_FAILED(mStatus))
michael@0 581 return mStatus;
michael@0 582
michael@0 583 nsresult rv;
michael@0 584 int64_t ret64 = 0;
michael@0 585 uint32_t i, last;
michael@0 586 last = mStartedReadingCurrent ? mCurrentStream+1 : mCurrentStream;
michael@0 587 for (i = 0; i < last; ++i) {
michael@0 588 nsCOMPtr<nsISeekableStream> stream = do_QueryInterface(mStreams[i]);
michael@0 589 if (NS_WARN_IF(!stream))
michael@0 590 return NS_ERROR_NO_INTERFACE;
michael@0 591
michael@0 592 int64_t pos;
michael@0 593 rv = stream->Tell(&pos);
michael@0 594 if (NS_WARN_IF(NS_FAILED(rv)))
michael@0 595 return rv;
michael@0 596 ret64 += pos;
michael@0 597 }
michael@0 598 *_retval = ret64;
michael@0 599
michael@0 600 return NS_OK;
michael@0 601 }
michael@0 602
michael@0 603 /* void setEOF (); */
michael@0 604 NS_IMETHODIMP
michael@0 605 nsMultiplexInputStream::SetEOF()
michael@0 606 {
michael@0 607 return NS_ERROR_NOT_IMPLEMENTED;
michael@0 608 }
michael@0 609
michael@0 610 nsresult
michael@0 611 nsMultiplexInputStreamConstructor(nsISupports *outer,
michael@0 612 REFNSIID iid,
michael@0 613 void **result)
michael@0 614 {
michael@0 615 *result = nullptr;
michael@0 616
michael@0 617 if (outer)
michael@0 618 return NS_ERROR_NO_AGGREGATION;
michael@0 619
michael@0 620 nsMultiplexInputStream *inst = new nsMultiplexInputStream();
michael@0 621 if (!inst)
michael@0 622 return NS_ERROR_OUT_OF_MEMORY;
michael@0 623
michael@0 624 NS_ADDREF(inst);
michael@0 625 nsresult rv = inst->QueryInterface(iid, result);
michael@0 626 NS_RELEASE(inst);
michael@0 627
michael@0 628 return rv;
michael@0 629 }
michael@0 630
michael@0 631 void
michael@0 632 nsMultiplexInputStream::Serialize(InputStreamParams& aParams,
michael@0 633 FileDescriptorArray& aFileDescriptors)
michael@0 634 {
michael@0 635 MultiplexInputStreamParams params;
michael@0 636
michael@0 637 uint32_t streamCount = mStreams.Length();
michael@0 638
michael@0 639 if (streamCount) {
michael@0 640 InfallibleTArray<InputStreamParams>& streams = params.streams();
michael@0 641
michael@0 642 streams.SetCapacity(streamCount);
michael@0 643 for (uint32_t index = 0; index < streamCount; index++) {
michael@0 644 InputStreamParams childStreamParams;
michael@0 645 SerializeInputStream(mStreams[index], childStreamParams,
michael@0 646 aFileDescriptors);
michael@0 647
michael@0 648 streams.AppendElement(childStreamParams);
michael@0 649 }
michael@0 650 }
michael@0 651
michael@0 652 params.currentStream() = mCurrentStream;
michael@0 653 params.status() = mStatus;
michael@0 654 params.startedReadingCurrent() = mStartedReadingCurrent;
michael@0 655
michael@0 656 aParams = params;
michael@0 657 }
michael@0 658
michael@0 659 bool
michael@0 660 nsMultiplexInputStream::Deserialize(const InputStreamParams& aParams,
michael@0 661 const FileDescriptorArray& aFileDescriptors)
michael@0 662 {
michael@0 663 if (aParams.type() !=
michael@0 664 InputStreamParams::TMultiplexInputStreamParams) {
michael@0 665 NS_ERROR("Received unknown parameters from the other process!");
michael@0 666 return false;
michael@0 667 }
michael@0 668
michael@0 669 const MultiplexInputStreamParams& params =
michael@0 670 aParams.get_MultiplexInputStreamParams();
michael@0 671
michael@0 672 const InfallibleTArray<InputStreamParams>& streams = params.streams();
michael@0 673
michael@0 674 uint32_t streamCount = streams.Length();
michael@0 675 for (uint32_t index = 0; index < streamCount; index++) {
michael@0 676 nsCOMPtr<nsIInputStream> stream =
michael@0 677 DeserializeInputStream(streams[index], aFileDescriptors);
michael@0 678 if (!stream) {
michael@0 679 NS_WARNING("Deserialize failed!");
michael@0 680 return false;
michael@0 681 }
michael@0 682
michael@0 683 if (NS_FAILED(AppendStream(stream))) {
michael@0 684 NS_WARNING("AppendStream failed!");
michael@0 685 return false;
michael@0 686 }
michael@0 687 }
michael@0 688
michael@0 689 mCurrentStream = params.currentStream();
michael@0 690 mStatus = params.status();
michael@0 691 mStartedReadingCurrent = params.startedReadingCurrent();
michael@0 692
michael@0 693 return true;
michael@0 694 }

mercurial