xpcom/io/nsMultiplexInputStream.cpp

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/xpcom/io/nsMultiplexInputStream.cpp	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,694 @@
     1.4 +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
     1.5 +/* This Source Code Form is subject to the terms of the Mozilla Public
     1.6 + * License, v. 2.0. If a copy of the MPL was not distributed with this
     1.7 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     1.8 +
     1.9 +/**
    1.10 + * The multiplex stream concatenates a list of input streams into a single
    1.11 + * stream.
    1.12 + */
    1.13 +
    1.14 +#include "mozilla/Attributes.h"
    1.15 +#include "mozilla/MathAlgorithms.h"
    1.16 +
    1.17 +#include "base/basictypes.h"
    1.18 +
    1.19 +#include "nsMultiplexInputStream.h"
    1.20 +#include "nsIMultiplexInputStream.h"
    1.21 +#include "nsISeekableStream.h"
    1.22 +#include "nsCOMPtr.h"
    1.23 +#include "nsCOMArray.h"
    1.24 +#include "nsIClassInfoImpl.h"
    1.25 +#include "nsIIPCSerializableInputStream.h"
    1.26 +#include "mozilla/ipc/InputStreamUtils.h"
    1.27 +
    1.28 +using namespace mozilla::ipc;
    1.29 +
    1.30 +using mozilla::DeprecatedAbs;
    1.31 +
    1.32 +class nsMultiplexInputStream MOZ_FINAL : public nsIMultiplexInputStream,
    1.33 +                                         public nsISeekableStream,
    1.34 +                                         public nsIIPCSerializableInputStream
    1.35 +{
    1.36 +public:
    1.37 +    nsMultiplexInputStream();
    1.38 +
    1.39 +    NS_DECL_THREADSAFE_ISUPPORTS
    1.40 +    NS_DECL_NSIINPUTSTREAM
    1.41 +    NS_DECL_NSIMULTIPLEXINPUTSTREAM
    1.42 +    NS_DECL_NSISEEKABLESTREAM
    1.43 +    NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
    1.44 +
    1.45 +private:
    1.46 +    ~nsMultiplexInputStream() {}
    1.47 +
    1.48 +    struct ReadSegmentsState {
    1.49 +        nsIInputStream* mThisStream;
    1.50 +        uint32_t mOffset;
    1.51 +        nsWriteSegmentFun mWriter;
    1.52 +        void* mClosure;
    1.53 +        bool mDone;
    1.54 +    };
    1.55 +
    1.56 +    static NS_METHOD ReadSegCb(nsIInputStream* aIn, void* aClosure,
    1.57 +                               const char* aFromRawSegment, uint32_t aToOffset,
    1.58 +                               uint32_t aCount, uint32_t *aWriteCount);
    1.59 +    
    1.60 +    nsTArray<nsCOMPtr<nsIInputStream> > mStreams;
    1.61 +    uint32_t mCurrentStream;
    1.62 +    bool mStartedReadingCurrent;
    1.63 +    nsresult mStatus;
    1.64 +};
    1.65 +
    1.66 +NS_IMPL_ADDREF(nsMultiplexInputStream)
    1.67 +NS_IMPL_RELEASE(nsMultiplexInputStream)
    1.68 +
    1.69 +NS_IMPL_CLASSINFO(nsMultiplexInputStream, nullptr, nsIClassInfo::THREADSAFE,
    1.70 +                  NS_MULTIPLEXINPUTSTREAM_CID)
    1.71 +
    1.72 +NS_IMPL_QUERY_INTERFACE_CI(nsMultiplexInputStream,
    1.73 +                           nsIMultiplexInputStream,
    1.74 +                           nsIInputStream,
    1.75 +                           nsISeekableStream,
    1.76 +                           nsIIPCSerializableInputStream)
    1.77 +NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream,
    1.78 +                            nsIMultiplexInputStream,
    1.79 +                            nsIInputStream,
    1.80 +                            nsISeekableStream)
    1.81 +
    1.82 +nsMultiplexInputStream::nsMultiplexInputStream()
    1.83 +    : mCurrentStream(0),
    1.84 +      mStartedReadingCurrent(false),
    1.85 +      mStatus(NS_OK)
    1.86 +{
    1.87 +}
    1.88 +
    1.89 +/* readonly attribute unsigned long count; */
    1.90 +NS_IMETHODIMP
    1.91 +nsMultiplexInputStream::GetCount(uint32_t *aCount)
    1.92 +{
    1.93 +    *aCount = mStreams.Length();
    1.94 +    return NS_OK;
    1.95 +}
    1.96 +
    1.97 +#ifdef DEBUG
    1.98 +static bool
    1.99 +SeekableStreamAtBeginning(nsIInputStream *aStream)
   1.100 +{
   1.101 +    int64_t streamPos;
   1.102 +    nsCOMPtr<nsISeekableStream> stream = do_QueryInterface(aStream);
   1.103 +    if (stream && NS_SUCCEEDED(stream->Tell(&streamPos)) && streamPos != 0) {
   1.104 +        return false;
   1.105 +    }
   1.106 +    return true;
   1.107 +}
   1.108 +#endif
   1.109 +
   1.110 +/* void appendStream (in nsIInputStream stream); */
   1.111 +NS_IMETHODIMP
   1.112 +nsMultiplexInputStream::AppendStream(nsIInputStream *aStream)
   1.113 +{
   1.114 +    NS_ASSERTION(SeekableStreamAtBeginning(aStream), "Appended stream not at beginning.");
   1.115 +    return mStreams.AppendElement(aStream) ? NS_OK : NS_ERROR_OUT_OF_MEMORY;
   1.116 +}
   1.117 +
   1.118 +/* void insertStream (in nsIInputStream stream, in unsigned long index); */
   1.119 +NS_IMETHODIMP
   1.120 +nsMultiplexInputStream::InsertStream(nsIInputStream *aStream, uint32_t aIndex)
   1.121 +{
   1.122 +    NS_ASSERTION(SeekableStreamAtBeginning(aStream), "Inserted stream not at beginning.");
   1.123 +    mStreams.InsertElementAt(aIndex, aStream);
   1.124 +    if (mCurrentStream > aIndex ||
   1.125 +        (mCurrentStream == aIndex && mStartedReadingCurrent))
   1.126 +        ++mCurrentStream;
   1.127 +    return NS_OK;
   1.128 +}
   1.129 +
   1.130 +/* void removeStream (in unsigned long index); */
   1.131 +NS_IMETHODIMP
   1.132 +nsMultiplexInputStream::RemoveStream(uint32_t aIndex)
   1.133 +{
   1.134 +    mStreams.RemoveElementAt(aIndex);
   1.135 +    if (mCurrentStream > aIndex)
   1.136 +        --mCurrentStream;
   1.137 +    else if (mCurrentStream == aIndex)
   1.138 +        mStartedReadingCurrent = false;
   1.139 +
   1.140 +    return NS_OK;
   1.141 +}
   1.142 +
   1.143 +/* nsIInputStream getStream (in unsigned long index); */
   1.144 +NS_IMETHODIMP
   1.145 +nsMultiplexInputStream::GetStream(uint32_t aIndex, nsIInputStream **_retval)
   1.146 +{
   1.147 +    *_retval = mStreams.SafeElementAt(aIndex, nullptr);
   1.148 +    if (NS_WARN_IF(!*_retval))
   1.149 +        return NS_ERROR_NOT_AVAILABLE;
   1.150 +
   1.151 +    NS_ADDREF(*_retval);
   1.152 +    return NS_OK;
   1.153 +}
   1.154 +
   1.155 +/* void close (); */
   1.156 +NS_IMETHODIMP
   1.157 +nsMultiplexInputStream::Close()
   1.158 +{
   1.159 +    mStatus = NS_BASE_STREAM_CLOSED;
   1.160 +
   1.161 +    nsresult rv = NS_OK;
   1.162 +
   1.163 +    uint32_t len = mStreams.Length();
   1.164 +    for (uint32_t i = 0; i < len; ++i) {
   1.165 +        nsresult rv2 = mStreams[i]->Close();
   1.166 +        // We still want to close all streams, but we should return an error
   1.167 +        if (NS_FAILED(rv2))
   1.168 +            rv = rv2;
   1.169 +    }
   1.170 +    return rv;
   1.171 +}
   1.172 +
   1.173 +/* unsigned long long available (); */
   1.174 +NS_IMETHODIMP
   1.175 +nsMultiplexInputStream::Available(uint64_t *_retval)
   1.176 +{
   1.177 +    if (NS_FAILED(mStatus))
   1.178 +        return mStatus;
   1.179 +
   1.180 +    nsresult rv;
   1.181 +    uint64_t avail = 0;
   1.182 +
   1.183 +    uint32_t len = mStreams.Length();
   1.184 +    for (uint32_t i = mCurrentStream; i < len; i++) {
   1.185 +        uint64_t streamAvail;
   1.186 +        rv = mStreams[i]->Available(&streamAvail);
   1.187 +        if (NS_WARN_IF(NS_FAILED(rv)))
   1.188 +            return rv;
   1.189 +        avail += streamAvail;
   1.190 +    }
   1.191 +    *_retval = avail;
   1.192 +    return NS_OK;
   1.193 +}
   1.194 +
   1.195 +/* [noscript] unsigned long read (in charPtr buf, in unsigned long count); */
   1.196 +NS_IMETHODIMP
   1.197 +nsMultiplexInputStream::Read(char * aBuf, uint32_t aCount, uint32_t *_retval)
   1.198 +{
   1.199 +    // It is tempting to implement this method in terms of ReadSegments, but
   1.200 +    // that would prevent this class from being used with streams that only
   1.201 +    // implement Read (e.g., file streams).
   1.202 + 
   1.203 +    *_retval = 0;
   1.204 +
   1.205 +    if (mStatus == NS_BASE_STREAM_CLOSED)
   1.206 +        return NS_OK;
   1.207 +    if (NS_FAILED(mStatus))
   1.208 +        return mStatus;
   1.209 + 
   1.210 +    nsresult rv = NS_OK;
   1.211 +
   1.212 +    uint32_t len = mStreams.Length();
   1.213 +    while (mCurrentStream < len && aCount) {
   1.214 +        uint32_t read;
   1.215 +        rv = mStreams[mCurrentStream]->Read(aBuf, aCount, &read);
   1.216 +
   1.217 +        // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
   1.218 +        // (This is a bug in those stream implementations)
   1.219 +        if (rv == NS_BASE_STREAM_CLOSED) {
   1.220 +            NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED");
   1.221 +            rv = NS_OK;
   1.222 +            read = 0;
   1.223 +        }
   1.224 +        else if (NS_FAILED(rv))
   1.225 +            break;
   1.226 +
   1.227 +        if (read == 0) {
   1.228 +            ++mCurrentStream;
   1.229 +            mStartedReadingCurrent = false;
   1.230 +        }
   1.231 +        else {
   1.232 +            NS_ASSERTION(aCount >= read, "Read more than requested");
   1.233 +            *_retval += read;
   1.234 +            aCount -= read;
   1.235 +            aBuf += read;
   1.236 +            mStartedReadingCurrent = true;
   1.237 +        }
   1.238 +    }
   1.239 +    return *_retval ? NS_OK : rv;
   1.240 +}
   1.241 +
   1.242 +/* [noscript] unsigned long readSegments (in nsWriteSegmentFun writer,
   1.243 + *                                        in voidPtr closure,
   1.244 + *                                        in unsigned long count); */
   1.245 +NS_IMETHODIMP
   1.246 +nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter, void *aClosure,
   1.247 +                                     uint32_t aCount, uint32_t *_retval)
   1.248 +{
   1.249 +    if (mStatus == NS_BASE_STREAM_CLOSED) {
   1.250 +        *_retval = 0;
   1.251 +        return NS_OK;
   1.252 +    }
   1.253 +    if (NS_FAILED(mStatus))
   1.254 +        return mStatus;
   1.255 +
   1.256 +    NS_ASSERTION(aWriter, "missing aWriter");
   1.257 +
   1.258 +    nsresult rv = NS_OK;
   1.259 +    ReadSegmentsState state;
   1.260 +    state.mThisStream = this;
   1.261 +    state.mOffset = 0;
   1.262 +    state.mWriter = aWriter;
   1.263 +    state.mClosure = aClosure;
   1.264 +    state.mDone = false;
   1.265 +    
   1.266 +    uint32_t len = mStreams.Length();
   1.267 +    while (mCurrentStream < len && aCount) {
   1.268 +        uint32_t read;
   1.269 +        rv = mStreams[mCurrentStream]->ReadSegments(ReadSegCb, &state, aCount, &read);
   1.270 +
   1.271 +        // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
   1.272 +        // (This is a bug in those stream implementations)
   1.273 +        if (rv == NS_BASE_STREAM_CLOSED) {
   1.274 +            NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED");
   1.275 +            rv = NS_OK;
   1.276 +            read = 0;
   1.277 +        }
   1.278 +
   1.279 +        // if |aWriter| decided to stop reading segments...
   1.280 +        if (state.mDone || NS_FAILED(rv))
   1.281 +            break;
   1.282 +
   1.283 +        // if stream is empty, then advance to the next stream.
   1.284 +        if (read == 0) {
   1.285 +            ++mCurrentStream;
   1.286 +            mStartedReadingCurrent = false;
   1.287 +        }
   1.288 +        else {
   1.289 +            NS_ASSERTION(aCount >= read, "Read more than requested");
   1.290 +            state.mOffset += read;
   1.291 +            aCount -= read;
   1.292 +            mStartedReadingCurrent = true;
   1.293 +        }
   1.294 +    }
   1.295 +
   1.296 +    // if we successfully read some data, then this call succeeded.
   1.297 +    *_retval = state.mOffset;
   1.298 +    return state.mOffset ? NS_OK : rv;
   1.299 +}
   1.300 +
   1.301 +NS_METHOD
   1.302 +nsMultiplexInputStream::ReadSegCb(nsIInputStream* aIn, void* aClosure,
   1.303 +                                  const char* aFromRawSegment,
   1.304 +                                  uint32_t aToOffset, uint32_t aCount,
   1.305 +                                  uint32_t *aWriteCount)
   1.306 +{
   1.307 +    nsresult rv;
   1.308 +    ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
   1.309 +    rv = (state->mWriter)(state->mThisStream,
   1.310 +                          state->mClosure,
   1.311 +                          aFromRawSegment,
   1.312 +                          aToOffset + state->mOffset,
   1.313 +                          aCount,
   1.314 +                          aWriteCount);
   1.315 +    if (NS_FAILED(rv))
   1.316 +        state->mDone = true;
   1.317 +    return rv;
   1.318 +}
   1.319 +
   1.320 +/* readonly attribute boolean nonBlocking; */
   1.321 +NS_IMETHODIMP
   1.322 +nsMultiplexInputStream::IsNonBlocking(bool *aNonBlocking)
   1.323 +{
   1.324 +    uint32_t len = mStreams.Length();
   1.325 +    if (len == 0) {
   1.326 +        // Claim to be non-blocking, since we won't block the caller.
   1.327 +        // On the other hand we'll never return NS_BASE_STREAM_WOULD_BLOCK,
   1.328 +        // so maybe we should claim to be blocking?  It probably doesn't
   1.329 +        // matter in practice.
   1.330 +        *aNonBlocking = true;
   1.331 +        return NS_OK;
   1.332 +    }
   1.333 +    for (uint32_t i = 0; i < len; ++i) {
   1.334 +        nsresult rv = mStreams[i]->IsNonBlocking(aNonBlocking);
   1.335 +        if (NS_WARN_IF(NS_FAILED(rv)))
   1.336 +            return rv;
   1.337 +        // If one is non-blocking the entire stream becomes non-blocking
   1.338 +        // (except that we don't implement nsIAsyncInputStream, so there's
   1.339 +        //  not much for the caller to do if Read returns "would block")
   1.340 +        if (*aNonBlocking)
   1.341 +            return NS_OK;
   1.342 +    }
   1.343 +    return NS_OK;
   1.344 +}
   1.345 +
   1.346 +/* void seek (in int32_t whence, in int32_t offset); */
   1.347 +NS_IMETHODIMP
   1.348 +nsMultiplexInputStream::Seek(int32_t aWhence, int64_t aOffset)
   1.349 +{
   1.350 +    if (NS_FAILED(mStatus))
   1.351 +        return mStatus;
   1.352 +
   1.353 +    nsresult rv;
   1.354 +
   1.355 +    uint32_t oldCurrentStream = mCurrentStream;
   1.356 +    bool oldStartedReadingCurrent = mStartedReadingCurrent;
   1.357 +
   1.358 +    if (aWhence == NS_SEEK_SET) {
   1.359 +        int64_t remaining = aOffset;
   1.360 +        if (aOffset == 0) {
   1.361 +            mCurrentStream = 0;
   1.362 +        }
   1.363 +        for (uint32_t i = 0; i < mStreams.Length(); ++i) {
   1.364 +            nsCOMPtr<nsISeekableStream> stream =
   1.365 +                do_QueryInterface(mStreams[i]);
   1.366 +            if (!stream) {
   1.367 +              return NS_ERROR_FAILURE;
   1.368 +            }
   1.369 +
   1.370 +            // See if all remaining streams should be rewound
   1.371 +            if (remaining == 0) {
   1.372 +                if (i < oldCurrentStream ||
   1.373 +                    (i == oldCurrentStream && oldStartedReadingCurrent)) {
   1.374 +                    rv = stream->Seek(NS_SEEK_SET, 0);
   1.375 +                    if (NS_WARN_IF(NS_FAILED(rv)))
   1.376 +                        return rv;
   1.377 +                    continue;
   1.378 +                }
   1.379 +                else {
   1.380 +                    break;
   1.381 +                }
   1.382 +            }
   1.383 +
   1.384 +            // Get position in current stream
   1.385 +            int64_t streamPos;
   1.386 +            if (i > oldCurrentStream ||
   1.387 +                (i == oldCurrentStream && !oldStartedReadingCurrent)) {
   1.388 +                streamPos = 0;
   1.389 +            }
   1.390 +            else {
   1.391 +                rv = stream->Tell(&streamPos);
   1.392 +                if (NS_WARN_IF(NS_FAILED(rv)))
   1.393 +                    return rv;
   1.394 +            }
   1.395 +
   1.396 +            // See if we need to seek current stream forward or backward
   1.397 +            if (remaining < streamPos) {
   1.398 +                rv = stream->Seek(NS_SEEK_SET, remaining);
   1.399 +                if (NS_WARN_IF(NS_FAILED(rv)))
   1.400 +                    return rv;
   1.401 +
   1.402 +                mCurrentStream = i;
   1.403 +                mStartedReadingCurrent = remaining != 0;
   1.404 +
   1.405 +                remaining = 0;
   1.406 +            }
   1.407 +            else if (remaining > streamPos) {
   1.408 +                if (i < oldCurrentStream) {
   1.409 +                    // We're already at end so no need to seek this stream
   1.410 +                    remaining -= streamPos;
   1.411 +                    NS_ASSERTION(remaining >= 0, "Remaining invalid");
   1.412 +                }
   1.413 +                else {
   1.414 +                    uint64_t avail;
   1.415 +                    rv = mStreams[i]->Available(&avail);
   1.416 +                    if (NS_WARN_IF(NS_FAILED(rv)))
   1.417 +                        return rv;
   1.418 +
   1.419 +                    int64_t newPos = XPCOM_MIN(remaining, streamPos + (int64_t)avail);
   1.420 +
   1.421 +                    rv = stream->Seek(NS_SEEK_SET, newPos);
   1.422 +                    if (NS_WARN_IF(NS_FAILED(rv)))
   1.423 +                        return rv;
   1.424 +
   1.425 +                    mCurrentStream = i;
   1.426 +                    mStartedReadingCurrent = true;
   1.427 +
   1.428 +                    remaining -= newPos;
   1.429 +                    NS_ASSERTION(remaining >= 0, "Remaining invalid");
   1.430 +                }
   1.431 +            }
   1.432 +            else {
   1.433 +                NS_ASSERTION(remaining == streamPos, "Huh?");
   1.434 +                remaining = 0;
   1.435 +            }
   1.436 +        }
   1.437 +
   1.438 +        return NS_OK;
   1.439 +    }
   1.440 +
   1.441 +    if (aWhence == NS_SEEK_CUR && aOffset > 0) {
   1.442 +        int64_t remaining = aOffset;
   1.443 +        for (uint32_t i = mCurrentStream; remaining && i < mStreams.Length(); ++i) {
   1.444 +            nsCOMPtr<nsISeekableStream> stream =
   1.445 +                do_QueryInterface(mStreams[i]);
   1.446 +
   1.447 +            uint64_t avail;
   1.448 +            rv = mStreams[i]->Available(&avail);
   1.449 +            if (NS_WARN_IF(NS_FAILED(rv)))
   1.450 +                return rv;
   1.451 +
   1.452 +            int64_t seek = XPCOM_MIN((int64_t)avail, remaining);
   1.453 +
   1.454 +            rv = stream->Seek(NS_SEEK_CUR, seek);
   1.455 +            if (NS_WARN_IF(NS_FAILED(rv)))
   1.456 +                return rv;
   1.457 +
   1.458 +            mCurrentStream = i;
   1.459 +            mStartedReadingCurrent = true;
   1.460 +
   1.461 +            remaining -= seek;
   1.462 +        }
   1.463 +
   1.464 +        return NS_OK;
   1.465 +    }
   1.466 +
   1.467 +    if (aWhence == NS_SEEK_CUR && aOffset < 0) {
   1.468 +        int64_t remaining = -aOffset;
   1.469 +        for (uint32_t i = mCurrentStream; remaining && i != (uint32_t)-1; --i) {
   1.470 +            nsCOMPtr<nsISeekableStream> stream =
   1.471 +                do_QueryInterface(mStreams[i]);
   1.472 +
   1.473 +            int64_t pos;
   1.474 +            rv = stream->Tell(&pos);
   1.475 +            if (NS_WARN_IF(NS_FAILED(rv)))
   1.476 +                return rv;
   1.477 +
   1.478 +            int64_t seek = XPCOM_MIN(pos, remaining);
   1.479 +
   1.480 +            rv = stream->Seek(NS_SEEK_CUR, -seek);
   1.481 +            if (NS_WARN_IF(NS_FAILED(rv)))
   1.482 +                return rv;
   1.483 +
   1.484 +            mCurrentStream = i;
   1.485 +            mStartedReadingCurrent = seek != -pos;
   1.486 +
   1.487 +            remaining -= seek;
   1.488 +        }
   1.489 +
   1.490 +        return NS_OK;
   1.491 +    }
   1.492 +
   1.493 +    if (aWhence == NS_SEEK_CUR) {
   1.494 +        NS_ASSERTION(aOffset == 0, "Should have handled all non-zero values");
   1.495 +
   1.496 +        return NS_OK;
   1.497 +    }
   1.498 +
   1.499 +    if (aWhence == NS_SEEK_END) {
   1.500 +        if (aOffset > 0) {
   1.501 +          return NS_ERROR_INVALID_ARG;
   1.502 +        }
   1.503 +        int64_t remaining = aOffset;
   1.504 +        for (uint32_t i = mStreams.Length() - 1; i != (uint32_t)-1; --i) {
   1.505 +            nsCOMPtr<nsISeekableStream> stream =
   1.506 +                do_QueryInterface(mStreams[i]);
   1.507 +
   1.508 +            // See if all remaining streams should be seeked to end
   1.509 +            if (remaining == 0) {
   1.510 +                if (i >= oldCurrentStream) {
   1.511 +                    rv = stream->Seek(NS_SEEK_END, 0);
   1.512 +                    if (NS_WARN_IF(NS_FAILED(rv)))
   1.513 +                        return rv;
   1.514 +                }
   1.515 +                else {
   1.516 +                    break;
   1.517 +                }
   1.518 +            }
   1.519 +
   1.520 +            // Get position in current stream
   1.521 +            int64_t streamPos;
   1.522 +            if (i < oldCurrentStream) {
   1.523 +                streamPos = 0;
   1.524 +            } else {
   1.525 +                uint64_t avail;
   1.526 +                rv = mStreams[i]->Available(&avail);
   1.527 +                if (NS_WARN_IF(NS_FAILED(rv)))
   1.528 +                    return rv;
   1.529 +
   1.530 +                streamPos = avail;
   1.531 +            }
   1.532 +
   1.533 +            // See if we have enough data in the current stream.
   1.534 +            if (DeprecatedAbs(remaining) < streamPos) {
   1.535 +                rv = stream->Seek(NS_SEEK_END, remaining);
   1.536 +                if (NS_WARN_IF(NS_FAILED(rv)))
   1.537 +                    return rv;
   1.538 +
   1.539 +                mCurrentStream = i;
   1.540 +                mStartedReadingCurrent = true;
   1.541 +
   1.542 +                remaining = 0;
   1.543 +            } else if (DeprecatedAbs(remaining) > streamPos) {
   1.544 +                if (i > oldCurrentStream ||
   1.545 +                    (i == oldCurrentStream && !oldStartedReadingCurrent)) {
   1.546 +                    // We're already at start so no need to seek this stream
   1.547 +                    remaining += streamPos;
   1.548 +                } else {
   1.549 +                    int64_t avail;
   1.550 +                    rv = stream->Tell(&avail);
   1.551 +                    if (NS_WARN_IF(NS_FAILED(rv)))
   1.552 +                        return rv;
   1.553 +
   1.554 +                    int64_t newPos = streamPos + XPCOM_MIN(avail, DeprecatedAbs(remaining));
   1.555 +
   1.556 +                    rv = stream->Seek(NS_SEEK_END, -newPos);
   1.557 +                    if (NS_WARN_IF(NS_FAILED(rv)))
   1.558 +                        return rv;
   1.559 +
   1.560 +                    mCurrentStream = i;
   1.561 +                    mStartedReadingCurrent = true;
   1.562 +
   1.563 +                    remaining += newPos;
   1.564 +                }
   1.565 +            }
   1.566 +            else {
   1.567 +                NS_ASSERTION(remaining == streamPos, "Huh?");
   1.568 +                remaining = 0;
   1.569 +            }
   1.570 +        }
   1.571 +
   1.572 +        return NS_OK;
   1.573 +    }
   1.574 +
   1.575 +    // other Seeks not implemented yet
   1.576 +    return NS_ERROR_NOT_IMPLEMENTED;
   1.577 +}
   1.578 +
   1.579 +/* uint32_t tell (); */
   1.580 +NS_IMETHODIMP
   1.581 +nsMultiplexInputStream::Tell(int64_t *_retval)
   1.582 +{
   1.583 +    if (NS_FAILED(mStatus))
   1.584 +        return mStatus;
   1.585 +
   1.586 +    nsresult rv;
   1.587 +    int64_t ret64 = 0;
   1.588 +    uint32_t i, last;
   1.589 +    last = mStartedReadingCurrent ? mCurrentStream+1 : mCurrentStream;
   1.590 +    for (i = 0; i < last; ++i) {
   1.591 +        nsCOMPtr<nsISeekableStream> stream = do_QueryInterface(mStreams[i]);
   1.592 +        if (NS_WARN_IF(!stream))
   1.593 +            return NS_ERROR_NO_INTERFACE;
   1.594 +
   1.595 +        int64_t pos;
   1.596 +        rv = stream->Tell(&pos);
   1.597 +        if (NS_WARN_IF(NS_FAILED(rv)))
   1.598 +            return rv;
   1.599 +        ret64 += pos;
   1.600 +    }
   1.601 +    *_retval =  ret64;
   1.602 +
   1.603 +    return NS_OK;
   1.604 +}
   1.605 +
   1.606 +/* void setEOF (); */
   1.607 +NS_IMETHODIMP
   1.608 +nsMultiplexInputStream::SetEOF()
   1.609 +{
   1.610 +    return NS_ERROR_NOT_IMPLEMENTED;
   1.611 +}
   1.612 +
   1.613 +nsresult
   1.614 +nsMultiplexInputStreamConstructor(nsISupports *outer,
   1.615 +                                  REFNSIID iid,
   1.616 +                                  void **result)
   1.617 +{
   1.618 +    *result = nullptr;
   1.619 +
   1.620 +    if (outer)
   1.621 +        return NS_ERROR_NO_AGGREGATION;
   1.622 +
   1.623 +    nsMultiplexInputStream *inst = new nsMultiplexInputStream();
   1.624 +    if (!inst)
   1.625 +        return NS_ERROR_OUT_OF_MEMORY;
   1.626 +
   1.627 +    NS_ADDREF(inst);
   1.628 +    nsresult rv = inst->QueryInterface(iid, result);
   1.629 +    NS_RELEASE(inst);
   1.630 +
   1.631 +    return rv;
   1.632 +}
   1.633 +
   1.634 +void
   1.635 +nsMultiplexInputStream::Serialize(InputStreamParams& aParams,
   1.636 +                                  FileDescriptorArray& aFileDescriptors)
   1.637 +{
   1.638 +    MultiplexInputStreamParams params;
   1.639 +
   1.640 +    uint32_t streamCount = mStreams.Length();
   1.641 +
   1.642 +    if (streamCount) {
   1.643 +        InfallibleTArray<InputStreamParams>& streams = params.streams();
   1.644 +
   1.645 +        streams.SetCapacity(streamCount);
   1.646 +        for (uint32_t index = 0; index < streamCount; index++) {
   1.647 +            InputStreamParams childStreamParams;
   1.648 +            SerializeInputStream(mStreams[index], childStreamParams,
   1.649 +                                 aFileDescriptors);
   1.650 +
   1.651 +            streams.AppendElement(childStreamParams);
   1.652 +        }
   1.653 +    }
   1.654 +
   1.655 +    params.currentStream() = mCurrentStream;
   1.656 +    params.status() = mStatus;
   1.657 +    params.startedReadingCurrent() = mStartedReadingCurrent;
   1.658 +
   1.659 +    aParams = params;
   1.660 +}
   1.661 +
   1.662 +bool
   1.663 +nsMultiplexInputStream::Deserialize(const InputStreamParams& aParams,
   1.664 +                                    const FileDescriptorArray& aFileDescriptors)
   1.665 +{
   1.666 +    if (aParams.type() !=
   1.667 +            InputStreamParams::TMultiplexInputStreamParams) {
   1.668 +        NS_ERROR("Received unknown parameters from the other process!");
   1.669 +        return false;
   1.670 +    }
   1.671 +
   1.672 +    const MultiplexInputStreamParams& params =
   1.673 +        aParams.get_MultiplexInputStreamParams();
   1.674 +
   1.675 +    const InfallibleTArray<InputStreamParams>& streams = params.streams();
   1.676 +
   1.677 +    uint32_t streamCount = streams.Length();
   1.678 +    for (uint32_t index = 0; index < streamCount; index++) {
   1.679 +        nsCOMPtr<nsIInputStream> stream =
   1.680 +            DeserializeInputStream(streams[index], aFileDescriptors);
   1.681 +        if (!stream) {
   1.682 +            NS_WARNING("Deserialize failed!");
   1.683 +            return false;
   1.684 +        }
   1.685 +
   1.686 +        if (NS_FAILED(AppendStream(stream))) {
   1.687 +            NS_WARNING("AppendStream failed!");
   1.688 +            return false;
   1.689 +        }
   1.690 +    }
   1.691 +
   1.692 +    mCurrentStream = params.currentStream();
   1.693 +    mStatus = params.status();
   1.694 +    mStartedReadingCurrent = params.startedReadingCurrent();
   1.695 +
   1.696 +    return true;
   1.697 +}

mercurial