netwerk/base/src/nsInputStreamPump.cpp

Thu, 22 Jan 2015 13:21:57 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Thu, 22 Jan 2015 13:21:57 +0100
branch
TOR_BUG_9701
changeset 15
b8a032363ba2
permissions
-rw-r--r--

Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6

michael@0 1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
michael@0 2 /* vim:set ts=4 sts=4 sw=4 et cin: */
michael@0 3 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 4 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 6
michael@0 7 #include "nsIOService.h"
michael@0 8 #include "nsInputStreamPump.h"
michael@0 9 #include "nsIStreamTransportService.h"
michael@0 10 #include "nsISeekableStream.h"
michael@0 11 #include "nsITransport.h"
michael@0 12 #include "nsIThreadRetargetableStreamListener.h"
michael@0 13 #include "nsThreadUtils.h"
michael@0 14 #include "nsCOMPtr.h"
michael@0 15 #include "prlog.h"
michael@0 16 #include "GeckoProfiler.h"
michael@0 17 #include "nsIStreamListener.h"
michael@0 18 #include "nsILoadGroup.h"
michael@0 19 #include "nsNetCID.h"
michael@0 20 #include <algorithm>
michael@0 21
michael@0 22 static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
michael@0 23
#if defined(PR_LOGGING)
// Log module used by every LOG() call in this file. It is created lazily by
// the nsInputStreamPump constructor. Enable its output with:
//   NSPR_LOG_MODULES=nsStreamPump:5
static PRLogModuleInfo* gStreamPumpLog = nullptr;
#endif

// Discard any previous LOG definition and route LOG() to the stream pump
// module at debug level.
#undef LOG
#define LOG(args) PR_LOG(gStreamPumpLog, PR_LOG_DEBUG, args)
michael@0 32
michael@0 33 //-----------------------------------------------------------------------------
michael@0 34 // nsInputStreamPump methods
michael@0 35 //-----------------------------------------------------------------------------
michael@0 36
michael@0 37 nsInputStreamPump::nsInputStreamPump()
michael@0 38 : mState(STATE_IDLE)
michael@0 39 , mStreamOffset(0)
michael@0 40 , mStreamLength(UINT64_MAX)
michael@0 41 , mStatus(NS_OK)
michael@0 42 , mSuspendCount(0)
michael@0 43 , mLoadFlags(LOAD_NORMAL)
michael@0 44 , mProcessingCallbacks(false)
michael@0 45 , mWaitingForInputStreamReady(false)
michael@0 46 , mCloseWhenDone(false)
michael@0 47 , mRetargeting(false)
michael@0 48 , mMonitor("nsInputStreamPump")
michael@0 49 {
michael@0 50 #if defined(PR_LOGGING)
michael@0 51 if (!gStreamPumpLog)
michael@0 52 gStreamPumpLog = PR_NewLogModule("nsStreamPump");
michael@0 53 #endif
michael@0 54 }
michael@0 55
michael@0 56 nsInputStreamPump::~nsInputStreamPump()
michael@0 57 {
michael@0 58 }
michael@0 59
michael@0 60 nsresult
michael@0 61 nsInputStreamPump::Create(nsInputStreamPump **result,
michael@0 62 nsIInputStream *stream,
michael@0 63 int64_t streamPos,
michael@0 64 int64_t streamLen,
michael@0 65 uint32_t segsize,
michael@0 66 uint32_t segcount,
michael@0 67 bool closeWhenDone)
michael@0 68 {
michael@0 69 nsresult rv = NS_ERROR_OUT_OF_MEMORY;
michael@0 70 nsRefPtr<nsInputStreamPump> pump = new nsInputStreamPump();
michael@0 71 if (pump) {
michael@0 72 rv = pump->Init(stream, streamPos, streamLen,
michael@0 73 segsize, segcount, closeWhenDone);
michael@0 74 if (NS_SUCCEEDED(rv)) {
michael@0 75 *result = nullptr;
michael@0 76 pump.swap(*result);
michael@0 77 }
michael@0 78 }
michael@0 79 return rv;
michael@0 80 }
michael@0 81
michael@0 82 struct PeekData {
michael@0 83 PeekData(nsInputStreamPump::PeekSegmentFun fun, void* closure)
michael@0 84 : mFunc(fun), mClosure(closure) {}
michael@0 85
michael@0 86 nsInputStreamPump::PeekSegmentFun mFunc;
michael@0 87 void* mClosure;
michael@0 88 };
michael@0 89
michael@0 90 static NS_METHOD
michael@0 91 CallPeekFunc(nsIInputStream *aInStream, void *aClosure,
michael@0 92 const char *aFromSegment, uint32_t aToOffset, uint32_t aCount,
michael@0 93 uint32_t *aWriteCount)
michael@0 94 {
michael@0 95 NS_ASSERTION(aToOffset == 0, "Called more than once?");
michael@0 96 NS_ASSERTION(aCount > 0, "Called without data?");
michael@0 97
michael@0 98 PeekData* data = static_cast<PeekData*>(aClosure);
michael@0 99 data->mFunc(data->mClosure,
michael@0 100 reinterpret_cast<const uint8_t*>(aFromSegment), aCount);
michael@0 101 return NS_BINDING_ABORTED;
michael@0 102 }
michael@0 103
michael@0 104 nsresult
michael@0 105 nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure)
michael@0 106 {
michael@0 107 ReentrantMonitorAutoEnter mon(mMonitor);
michael@0 108
michael@0 109 NS_ASSERTION(mAsyncStream, "PeekStream called without stream");
michael@0 110
michael@0 111 // See if the pipe is closed by checking the return of Available.
michael@0 112 uint64_t dummy64;
michael@0 113 nsresult rv = mAsyncStream->Available(&dummy64);
michael@0 114 if (NS_FAILED(rv))
michael@0 115 return rv;
michael@0 116 uint32_t dummy = (uint32_t)std::min(dummy64, (uint64_t)UINT32_MAX);
michael@0 117
michael@0 118 PeekData data(callback, closure);
michael@0 119 return mAsyncStream->ReadSegments(CallPeekFunc,
michael@0 120 &data,
michael@0 121 nsIOService::gDefaultSegmentSize,
michael@0 122 &dummy);
michael@0 123 }
michael@0 124
michael@0 125 nsresult
michael@0 126 nsInputStreamPump::EnsureWaiting()
michael@0 127 {
michael@0 128 mMonitor.AssertCurrentThreadIn();
michael@0 129
michael@0 130 // no need to worry about multiple threads... an input stream pump lives
michael@0 131 // on only one thread at a time.
michael@0 132 MOZ_ASSERT(mAsyncStream);
michael@0 133 if (!mWaitingForInputStreamReady && !mProcessingCallbacks) {
michael@0 134 // Ensure OnStateStop is called on the main thread.
michael@0 135 if (mState == STATE_STOP) {
michael@0 136 nsCOMPtr<nsIThread> mainThread = do_GetMainThread();
michael@0 137 if (mTargetThread != mainThread) {
michael@0 138 mTargetThread = do_QueryInterface(mainThread);
michael@0 139 }
michael@0 140 }
michael@0 141 MOZ_ASSERT(mTargetThread);
michael@0 142 nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread);
michael@0 143 if (NS_FAILED(rv)) {
michael@0 144 NS_ERROR("AsyncWait failed");
michael@0 145 return rv;
michael@0 146 }
michael@0 147 // Any retargeting during STATE_START or START_TRANSFER is complete
michael@0 148 // after the call to AsyncWait; next callback wil be on mTargetThread.
michael@0 149 mRetargeting = false;
michael@0 150 mWaitingForInputStreamReady = true;
michael@0 151 }
michael@0 152 return NS_OK;
michael@0 153 }
michael@0 154
michael@0 155 //-----------------------------------------------------------------------------
michael@0 156 // nsInputStreamPump::nsISupports
michael@0 157 //-----------------------------------------------------------------------------
michael@0 158
michael@0 159 // although this class can only be accessed from one thread at a time, we do
michael@0 160 // allow its ownership to move from thread to thread, assuming the consumer
michael@0 161 // understands the limitations of this.
michael@0 162 NS_IMPL_ISUPPORTS(nsInputStreamPump,
michael@0 163 nsIRequest,
michael@0 164 nsIThreadRetargetableRequest,
michael@0 165 nsIInputStreamCallback,
michael@0 166 nsIInputStreamPump)
michael@0 167
michael@0 168 //-----------------------------------------------------------------------------
michael@0 169 // nsInputStreamPump::nsIRequest
michael@0 170 //-----------------------------------------------------------------------------
michael@0 171
michael@0 172 NS_IMETHODIMP
michael@0 173 nsInputStreamPump::GetName(nsACString &result)
michael@0 174 {
michael@0 175 ReentrantMonitorAutoEnter mon(mMonitor);
michael@0 176
michael@0 177 result.Truncate();
michael@0 178 return NS_OK;
michael@0 179 }
michael@0 180
michael@0 181 NS_IMETHODIMP
michael@0 182 nsInputStreamPump::IsPending(bool *result)
michael@0 183 {
michael@0 184 ReentrantMonitorAutoEnter mon(mMonitor);
michael@0 185
michael@0 186 *result = (mState != STATE_IDLE);
michael@0 187 return NS_OK;
michael@0 188 }
michael@0 189
michael@0 190 NS_IMETHODIMP
michael@0 191 nsInputStreamPump::GetStatus(nsresult *status)
michael@0 192 {
michael@0 193 ReentrantMonitorAutoEnter mon(mMonitor);
michael@0 194
michael@0 195 *status = mStatus;
michael@0 196 return NS_OK;
michael@0 197 }
michael@0 198
michael@0 199 NS_IMETHODIMP
michael@0 200 nsInputStreamPump::Cancel(nsresult status)
michael@0 201 {
michael@0 202 MOZ_ASSERT(NS_IsMainThread());
michael@0 203
michael@0 204 ReentrantMonitorAutoEnter mon(mMonitor);
michael@0 205
michael@0 206 LOG(("nsInputStreamPump::Cancel [this=%p status=%x]\n",
michael@0 207 this, status));
michael@0 208
michael@0 209 if (NS_FAILED(mStatus)) {
michael@0 210 LOG((" already canceled\n"));
michael@0 211 return NS_OK;
michael@0 212 }
michael@0 213
michael@0 214 NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code");
michael@0 215 mStatus = status;
michael@0 216
michael@0 217 // close input stream
michael@0 218 if (mAsyncStream) {
michael@0 219 mAsyncStream->CloseWithStatus(status);
michael@0 220 if (mSuspendCount == 0)
michael@0 221 EnsureWaiting();
michael@0 222 // Otherwise, EnsureWaiting will be called by Resume().
michael@0 223 // Note that while suspended, OnInputStreamReady will
michael@0 224 // not do anything, and also note that calling asyncWait
michael@0 225 // on a closed stream works and will dispatch an event immediately.
michael@0 226 }
michael@0 227 return NS_OK;
michael@0 228 }
michael@0 229
michael@0 230 NS_IMETHODIMP
michael@0 231 nsInputStreamPump::Suspend()
michael@0 232 {
michael@0 233 ReentrantMonitorAutoEnter mon(mMonitor);
michael@0 234
michael@0 235 LOG(("nsInputStreamPump::Suspend [this=%p]\n", this));
michael@0 236 NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
michael@0 237 ++mSuspendCount;
michael@0 238 return NS_OK;
michael@0 239 }
michael@0 240
michael@0 241 NS_IMETHODIMP
michael@0 242 nsInputStreamPump::Resume()
michael@0 243 {
michael@0 244 ReentrantMonitorAutoEnter mon(mMonitor);
michael@0 245
michael@0 246 LOG(("nsInputStreamPump::Resume [this=%p]\n", this));
michael@0 247 NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED);
michael@0 248 NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
michael@0 249
michael@0 250 if (--mSuspendCount == 0)
michael@0 251 EnsureWaiting();
michael@0 252 return NS_OK;
michael@0 253 }
michael@0 254
michael@0 255 NS_IMETHODIMP
michael@0 256 nsInputStreamPump::GetLoadFlags(nsLoadFlags *aLoadFlags)
michael@0 257 {
michael@0 258 ReentrantMonitorAutoEnter mon(mMonitor);
michael@0 259
michael@0 260 *aLoadFlags = mLoadFlags;
michael@0 261 return NS_OK;
michael@0 262 }
michael@0 263
michael@0 264 NS_IMETHODIMP
michael@0 265 nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags)
michael@0 266 {
michael@0 267 ReentrantMonitorAutoEnter mon(mMonitor);
michael@0 268
michael@0 269 mLoadFlags = aLoadFlags;
michael@0 270 return NS_OK;
michael@0 271 }
michael@0 272
michael@0 273 NS_IMETHODIMP
michael@0 274 nsInputStreamPump::GetLoadGroup(nsILoadGroup **aLoadGroup)
michael@0 275 {
michael@0 276 ReentrantMonitorAutoEnter mon(mMonitor);
michael@0 277
michael@0 278 NS_IF_ADDREF(*aLoadGroup = mLoadGroup);
michael@0 279 return NS_OK;
michael@0 280 }
michael@0 281
michael@0 282 NS_IMETHODIMP
michael@0 283 nsInputStreamPump::SetLoadGroup(nsILoadGroup *aLoadGroup)
michael@0 284 {
michael@0 285 ReentrantMonitorAutoEnter mon(mMonitor);
michael@0 286
michael@0 287 mLoadGroup = aLoadGroup;
michael@0 288 return NS_OK;
michael@0 289 }
michael@0 290
michael@0 291 //-----------------------------------------------------------------------------
michael@0 292 // nsInputStreamPump::nsIInputStreamPump implementation
michael@0 293 //-----------------------------------------------------------------------------
michael@0 294
michael@0 295 NS_IMETHODIMP
michael@0 296 nsInputStreamPump::Init(nsIInputStream *stream,
michael@0 297 int64_t streamPos, int64_t streamLen,
michael@0 298 uint32_t segsize, uint32_t segcount,
michael@0 299 bool closeWhenDone)
michael@0 300 {
michael@0 301 NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
michael@0 302
michael@0 303 mStreamOffset = uint64_t(streamPos);
michael@0 304 if (int64_t(streamLen) >= int64_t(0))
michael@0 305 mStreamLength = uint64_t(streamLen);
michael@0 306 mStream = stream;
michael@0 307 mSegSize = segsize;
michael@0 308 mSegCount = segcount;
michael@0 309 mCloseWhenDone = closeWhenDone;
michael@0 310
michael@0 311 return NS_OK;
michael@0 312 }
michael@0 313
michael@0 314 NS_IMETHODIMP
michael@0 315 nsInputStreamPump::AsyncRead(nsIStreamListener *listener, nsISupports *ctxt)
michael@0 316 {
michael@0 317 ReentrantMonitorAutoEnter mon(mMonitor);
michael@0 318
michael@0 319 NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
michael@0 320 NS_ENSURE_ARG_POINTER(listener);
michael@0 321 MOZ_ASSERT(NS_IsMainThread(), "nsInputStreamPump should be read from the "
michael@0 322 "main thread only.");
michael@0 323
michael@0 324 //
michael@0 325 // OK, we need to use the stream transport service if
michael@0 326 //
michael@0 327 // (1) the stream is blocking
michael@0 328 // (2) the stream does not support nsIAsyncInputStream
michael@0 329 //
michael@0 330
michael@0 331 bool nonBlocking;
michael@0 332 nsresult rv = mStream->IsNonBlocking(&nonBlocking);
michael@0 333 if (NS_FAILED(rv)) return rv;
michael@0 334
michael@0 335 if (nonBlocking) {
michael@0 336 mAsyncStream = do_QueryInterface(mStream);
michael@0 337 //
michael@0 338 // if the stream supports nsIAsyncInputStream, and if we need to seek
michael@0 339 // to a starting offset, then we must do so here. in the non-async
michael@0 340 // stream case, the stream transport service will take care of seeking
michael@0 341 // for us.
michael@0 342 //
michael@0 343 if (mAsyncStream && (mStreamOffset != UINT64_MAX)) {
michael@0 344 nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mStream);
michael@0 345 if (seekable)
michael@0 346 seekable->Seek(nsISeekableStream::NS_SEEK_SET, mStreamOffset);
michael@0 347 }
michael@0 348 }
michael@0 349
michael@0 350 if (!mAsyncStream) {
michael@0 351 // ok, let's use the stream transport service to read this stream.
michael@0 352 nsCOMPtr<nsIStreamTransportService> sts =
michael@0 353 do_GetService(kStreamTransportServiceCID, &rv);
michael@0 354 if (NS_FAILED(rv)) return rv;
michael@0 355
michael@0 356 nsCOMPtr<nsITransport> transport;
michael@0 357 rv = sts->CreateInputTransport(mStream, mStreamOffset, mStreamLength,
michael@0 358 mCloseWhenDone, getter_AddRefs(transport));
michael@0 359 if (NS_FAILED(rv)) return rv;
michael@0 360
michael@0 361 nsCOMPtr<nsIInputStream> wrapper;
michael@0 362 rv = transport->OpenInputStream(0, mSegSize, mSegCount, getter_AddRefs(wrapper));
michael@0 363 if (NS_FAILED(rv)) return rv;
michael@0 364
michael@0 365 mAsyncStream = do_QueryInterface(wrapper, &rv);
michael@0 366 if (NS_FAILED(rv)) return rv;
michael@0 367 }
michael@0 368
michael@0 369 // release our reference to the original stream. from this point forward,
michael@0 370 // we only reference the "stream" via mAsyncStream.
michael@0 371 mStream = 0;
michael@0 372
michael@0 373 // mStreamOffset now holds the number of bytes currently read. we use this
michael@0 374 // to enforce the mStreamLength restriction.
michael@0 375 mStreamOffset = 0;
michael@0 376
michael@0 377 // grab event queue (we must do this here by contract, since all notifications
michael@0 378 // must go to the thread which called AsyncRead)
michael@0 379 mTargetThread = do_GetCurrentThread();
michael@0 380 NS_ENSURE_STATE(mTargetThread);
michael@0 381
michael@0 382 rv = EnsureWaiting();
michael@0 383 if (NS_FAILED(rv)) return rv;
michael@0 384
michael@0 385 if (mLoadGroup)
michael@0 386 mLoadGroup->AddRequest(this, nullptr);
michael@0 387
michael@0 388 mState = STATE_START;
michael@0 389 mListener = listener;
michael@0 390 mListenerContext = ctxt;
michael@0 391 return NS_OK;
michael@0 392 }
michael@0 393
michael@0 394 //-----------------------------------------------------------------------------
michael@0 395 // nsInputStreamPump::nsIInputStreamCallback implementation
michael@0 396 //-----------------------------------------------------------------------------
michael@0 397
michael@0 398 NS_IMETHODIMP
michael@0 399 nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream *stream)
michael@0 400 {
michael@0 401 LOG(("nsInputStreamPump::OnInputStreamReady [this=%p]\n", this));
michael@0 402
michael@0 403 PROFILER_LABEL("Input", "nsInputStreamPump::OnInputStreamReady");
michael@0 404 // this function has been called from a PLEvent, so we can safely call
michael@0 405 // any listener or progress sink methods directly from here.
michael@0 406
michael@0 407 for (;;) {
michael@0 408 // There should only be one iteration of this loop happening at a time.
michael@0 409 // To prevent AsyncWait() (called during callbacks or on other threads)
michael@0 410 // from creating a parallel OnInputStreamReady(), we use:
michael@0 411 // -- a monitor; and
michael@0 412 // -- a boolean mProcessingCallbacks to detect parallel loops
michael@0 413 // when exiting the monitor for callbacks.
michael@0 414 ReentrantMonitorAutoEnter lock(mMonitor);
michael@0 415
michael@0 416 // Prevent parallel execution during callbacks, while out of monitor.
michael@0 417 if (mProcessingCallbacks) {
michael@0 418 MOZ_ASSERT(!mProcessingCallbacks);
michael@0 419 break;
michael@0 420 }
michael@0 421 mProcessingCallbacks = true;
michael@0 422 if (mSuspendCount || mState == STATE_IDLE) {
michael@0 423 mWaitingForInputStreamReady = false;
michael@0 424 mProcessingCallbacks = false;
michael@0 425 break;
michael@0 426 }
michael@0 427
michael@0 428 uint32_t nextState;
michael@0 429 switch (mState) {
michael@0 430 case STATE_START:
michael@0 431 nextState = OnStateStart();
michael@0 432 break;
michael@0 433 case STATE_TRANSFER:
michael@0 434 nextState = OnStateTransfer();
michael@0 435 break;
michael@0 436 case STATE_STOP:
michael@0 437 mRetargeting = false;
michael@0 438 nextState = OnStateStop();
michael@0 439 break;
michael@0 440 default:
michael@0 441 nextState = 0;
michael@0 442 NS_NOTREACHED("Unknown enum value.");
michael@0 443 return NS_ERROR_UNEXPECTED;
michael@0 444 }
michael@0 445
michael@0 446 bool stillTransferring = (mState == STATE_TRANSFER &&
michael@0 447 nextState == STATE_TRANSFER);
michael@0 448 if (stillTransferring) {
michael@0 449 NS_ASSERTION(NS_SUCCEEDED(mStatus),
michael@0 450 "Should not have failed status for ongoing transfer");
michael@0 451 } else {
michael@0 452 NS_ASSERTION(mState != nextState,
michael@0 453 "Only OnStateTransfer can be called more than once.");
michael@0 454 }
michael@0 455 if (mRetargeting) {
michael@0 456 NS_ASSERTION(mState != STATE_STOP,
michael@0 457 "Retargeting should not happen during OnStateStop.");
michael@0 458 }
michael@0 459
michael@0 460 // Set mRetargeting so EnsureWaiting will be called. It ensures that
michael@0 461 // OnStateStop is called on the main thread.
michael@0 462 if (nextState == STATE_STOP && !NS_IsMainThread()) {
michael@0 463 mRetargeting = true;
michael@0 464 }
michael@0 465
michael@0 466 // Unset mProcessingCallbacks here (while we have lock) so our own call to
michael@0 467 // EnsureWaiting isn't blocked by it.
michael@0 468 mProcessingCallbacks = false;
michael@0 469
michael@0 470 // Wait asynchronously if there is still data to transfer, or we're
michael@0 471 // switching event delivery to another thread.
michael@0 472 if (!mSuspendCount && (stillTransferring || mRetargeting)) {
michael@0 473 mState = nextState;
michael@0 474 mWaitingForInputStreamReady = false;
michael@0 475 nsresult rv = EnsureWaiting();
michael@0 476 if (NS_SUCCEEDED(rv))
michael@0 477 break;
michael@0 478
michael@0 479 // Failure to start asynchronous wait: stop transfer.
michael@0 480 // Do not set mStatus if it was previously set to report a failure.
michael@0 481 if (NS_SUCCEEDED(mStatus)) {
michael@0 482 mStatus = rv;
michael@0 483 }
michael@0 484 nextState = STATE_STOP;
michael@0 485 }
michael@0 486
michael@0 487 mState = nextState;
michael@0 488 }
michael@0 489 return NS_OK;
michael@0 490 }
michael@0 491
michael@0 492 uint32_t
michael@0 493 nsInputStreamPump::OnStateStart()
michael@0 494 {
michael@0 495 mMonitor.AssertCurrentThreadIn();
michael@0 496
michael@0 497 PROFILER_LABEL("nsInputStreamPump", "OnStateStart");
michael@0 498 LOG((" OnStateStart [this=%p]\n", this));
michael@0 499
michael@0 500 nsresult rv;
michael@0 501
michael@0 502 // need to check the reason why the stream is ready. this is required
michael@0 503 // so our listener can check our status from OnStartRequest.
michael@0 504 // XXX async streams should have a GetStatus method!
michael@0 505 if (NS_SUCCEEDED(mStatus)) {
michael@0 506 uint64_t avail;
michael@0 507 rv = mAsyncStream->Available(&avail);
michael@0 508 if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED)
michael@0 509 mStatus = rv;
michael@0 510 }
michael@0 511
michael@0 512 {
michael@0 513 // Note: Must exit monitor for call to OnStartRequest to avoid
michael@0 514 // deadlocks when calls to RetargetDeliveryTo for multiple
michael@0 515 // nsInputStreamPumps are needed (e.g. nsHttpChannel).
michael@0 516 mMonitor.Exit();
michael@0 517 rv = mListener->OnStartRequest(this, mListenerContext);
michael@0 518 mMonitor.Enter();
michael@0 519 }
michael@0 520
michael@0 521 // an error returned from OnStartRequest should cause us to abort; however,
michael@0 522 // we must not stomp on mStatus if already canceled.
michael@0 523 if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus))
michael@0 524 mStatus = rv;
michael@0 525
michael@0 526 return NS_SUCCEEDED(mStatus) ? STATE_TRANSFER : STATE_STOP;
michael@0 527 }
michael@0 528
michael@0 529 uint32_t
michael@0 530 nsInputStreamPump::OnStateTransfer()
michael@0 531 {
michael@0 532 mMonitor.AssertCurrentThreadIn();
michael@0 533
michael@0 534 PROFILER_LABEL("Input", "nsInputStreamPump::OnStateTransfer");
michael@0 535 LOG((" OnStateTransfer [this=%p]\n", this));
michael@0 536
michael@0 537 // if canceled, go directly to STATE_STOP...
michael@0 538 if (NS_FAILED(mStatus))
michael@0 539 return STATE_STOP;
michael@0 540
michael@0 541 nsresult rv;
michael@0 542
michael@0 543 uint64_t avail;
michael@0 544 rv = mAsyncStream->Available(&avail);
michael@0 545 LOG((" Available returned [stream=%x rv=%x avail=%llu]\n", mAsyncStream.get(), rv, avail));
michael@0 546
michael@0 547 if (rv == NS_BASE_STREAM_CLOSED) {
michael@0 548 rv = NS_OK;
michael@0 549 avail = 0;
michael@0 550 }
michael@0 551 else if (NS_SUCCEEDED(rv) && avail) {
michael@0 552 // figure out how much data to report (XXX detect overflow??)
michael@0 553 if (avail > mStreamLength - mStreamOffset)
michael@0 554 avail = mStreamLength - mStreamOffset;
michael@0 555
michael@0 556 if (avail) {
michael@0 557 // we used to limit avail to 16K - we were afraid some ODA handlers
michael@0 558 // might assume they wouldn't get more than 16K at once
michael@0 559 // we're removing that limit since it speeds up local file access.
michael@0 560 // Now there's an implicit 64K limit of 4 16K segments
michael@0 561 // NOTE: ok, so the story is as follows. OnDataAvailable impls
michael@0 562 // are by contract supposed to consume exactly |avail| bytes.
michael@0 563 // however, many do not... mailnews... stream converters...
michael@0 564 // cough, cough. the input stream pump is fairly tolerant
michael@0 565 // in this regard; however, if an ODA does not consume any
michael@0 566 // data from the stream, then we could potentially end up in
michael@0 567 // an infinite loop. we do our best here to try to catch
michael@0 568 // such an error. (see bug 189672)
michael@0 569
michael@0 570 // in most cases this QI will succeed (mAsyncStream is almost always
michael@0 571 // a nsPipeInputStream, which implements nsISeekableStream::Tell).
michael@0 572 int64_t offsetBefore;
michael@0 573 nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mAsyncStream);
michael@0 574 if (seekable && NS_FAILED(seekable->Tell(&offsetBefore))) {
michael@0 575 NS_NOTREACHED("Tell failed on readable stream");
michael@0 576 offsetBefore = 0;
michael@0 577 }
michael@0 578
michael@0 579 uint32_t odaAvail =
michael@0 580 avail > UINT32_MAX ?
michael@0 581 UINT32_MAX : uint32_t(avail);
michael@0 582
michael@0 583 LOG((" calling OnDataAvailable [offset=%llu count=%llu(%u)]\n",
michael@0 584 mStreamOffset, avail, odaAvail));
michael@0 585
michael@0 586 {
michael@0 587 // Note: Must exit monitor for call to OnStartRequest to avoid
michael@0 588 // deadlocks when calls to RetargetDeliveryTo for multiple
michael@0 589 // nsInputStreamPumps are needed (e.g. nsHttpChannel).
michael@0 590 mMonitor.Exit();
michael@0 591 rv = mListener->OnDataAvailable(this, mListenerContext,
michael@0 592 mAsyncStream, mStreamOffset,
michael@0 593 odaAvail);
michael@0 594 mMonitor.Enter();
michael@0 595 }
michael@0 596
michael@0 597 // don't enter this code if ODA failed or called Cancel
michael@0 598 if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) {
michael@0 599 // test to see if this ODA failed to consume data
michael@0 600 if (seekable) {
michael@0 601 // NOTE: if Tell fails, which can happen if the stream is
michael@0 602 // now closed, then we assume that everything was read.
michael@0 603 int64_t offsetAfter;
michael@0 604 if (NS_FAILED(seekable->Tell(&offsetAfter)))
michael@0 605 offsetAfter = offsetBefore + odaAvail;
michael@0 606 if (offsetAfter > offsetBefore)
michael@0 607 mStreamOffset += (offsetAfter - offsetBefore);
michael@0 608 else if (mSuspendCount == 0) {
michael@0 609 //
michael@0 610 // possible infinite loop if we continue pumping data!
michael@0 611 //
michael@0 612 // NOTE: although not allowed by nsIStreamListener, we
michael@0 613 // will allow the ODA impl to Suspend the pump. IMAP
michael@0 614 // does this :-(
michael@0 615 //
michael@0 616 NS_ERROR("OnDataAvailable implementation consumed no data");
michael@0 617 mStatus = NS_ERROR_UNEXPECTED;
michael@0 618 }
michael@0 619 }
michael@0 620 else
michael@0 621 mStreamOffset += odaAvail; // assume ODA behaved well
michael@0 622 }
michael@0 623 }
michael@0 624 }
michael@0 625
michael@0 626 // an error returned from Available or OnDataAvailable should cause us to
michael@0 627 // abort; however, we must not stomp on mStatus if already canceled.
michael@0 628
michael@0 629 if (NS_SUCCEEDED(mStatus)) {
michael@0 630 if (NS_FAILED(rv))
michael@0 631 mStatus = rv;
michael@0 632 else if (avail) {
michael@0 633 // if stream is now closed, advance to STATE_STOP right away.
michael@0 634 // Available may return 0 bytes available at the moment; that
michael@0 635 // would not mean that we are done.
michael@0 636 // XXX async streams should have a GetStatus method!
michael@0 637 rv = mAsyncStream->Available(&avail);
michael@0 638 if (NS_SUCCEEDED(rv))
michael@0 639 return STATE_TRANSFER;
michael@0 640 if (rv != NS_BASE_STREAM_CLOSED)
michael@0 641 mStatus = rv;
michael@0 642 }
michael@0 643 }
michael@0 644 return STATE_STOP;
michael@0 645 }
michael@0 646
michael@0 647 nsresult
michael@0 648 nsInputStreamPump::CallOnStateStop()
michael@0 649 {
michael@0 650 ReentrantMonitorAutoEnter mon(mMonitor);
michael@0 651
michael@0 652 MOZ_ASSERT(NS_IsMainThread(),
michael@0 653 "CallOnStateStop should only be called on the main thread.");
michael@0 654
michael@0 655 mState = OnStateStop();
michael@0 656 return NS_OK;
michael@0 657 }
michael@0 658
michael@0 659 uint32_t
michael@0 660 nsInputStreamPump::OnStateStop()
michael@0 661 {
michael@0 662 mMonitor.AssertCurrentThreadIn();
michael@0 663
michael@0 664 if (!NS_IsMainThread()) {
michael@0 665 // Hopefully temporary hack: OnStateStop should only run on the main
michael@0 666 // thread, but we're seeing some rare off-main-thread calls. For now
michael@0 667 // just redispatch to the main thread in release builds, and crash in
michael@0 668 // debug builds.
michael@0 669 MOZ_ASSERT(NS_IsMainThread(),
michael@0 670 "OnStateStop should only be called on the main thread.");
michael@0 671 nsresult rv = NS_DispatchToMainThread(
michael@0 672 NS_NewRunnableMethod(this, &nsInputStreamPump::CallOnStateStop));
michael@0 673 NS_ENSURE_SUCCESS(rv, STATE_IDLE);
michael@0 674 return STATE_IDLE;
michael@0 675 }
michael@0 676
michael@0 677 PROFILER_LABEL("Input", "nsInputStreamPump::OnStateTransfer");
michael@0 678 LOG((" OnStateStop [this=%p status=%x]\n", this, mStatus));
michael@0 679
michael@0 680 // if an error occurred, we must be sure to pass the error onto the async
michael@0 681 // stream. in some cases, this is redundant, but since close is idempotent,
michael@0 682 // this is OK. otherwise, be sure to honor the "close-when-done" option.
michael@0 683
michael@0 684 if (!mAsyncStream || !mListener) {
michael@0 685 MOZ_ASSERT(mAsyncStream, "null mAsyncStream: OnStateStop called twice?");
michael@0 686 MOZ_ASSERT(mListener, "null mListener: OnStateStop called twice?");
michael@0 687 return STATE_IDLE;
michael@0 688 }
michael@0 689
michael@0 690 if (NS_FAILED(mStatus))
michael@0 691 mAsyncStream->CloseWithStatus(mStatus);
michael@0 692 else if (mCloseWhenDone)
michael@0 693 mAsyncStream->Close();
michael@0 694
michael@0 695 mAsyncStream = 0;
michael@0 696 mTargetThread = 0;
michael@0 697 mIsPending = false;
michael@0 698 {
michael@0 699 // Note: Must exit monitor for call to OnStartRequest to avoid
michael@0 700 // deadlocks when calls to RetargetDeliveryTo for multiple
michael@0 701 // nsInputStreamPumps are needed (e.g. nsHttpChannel).
michael@0 702 mMonitor.Exit();
michael@0 703 mListener->OnStopRequest(this, mListenerContext, mStatus);
michael@0 704 mMonitor.Enter();
michael@0 705 }
michael@0 706 mListener = 0;
michael@0 707 mListenerContext = 0;
michael@0 708
michael@0 709 if (mLoadGroup)
michael@0 710 mLoadGroup->RemoveRequest(this, nullptr, mStatus);
michael@0 711
michael@0 712 return STATE_IDLE;
michael@0 713 }
michael@0 714
michael@0 715 //-----------------------------------------------------------------------------
michael@0 716 // nsIThreadRetargetableRequest
michael@0 717 //-----------------------------------------------------------------------------
michael@0 718
michael@0 719 NS_IMETHODIMP
michael@0 720 nsInputStreamPump::RetargetDeliveryTo(nsIEventTarget* aNewTarget)
michael@0 721 {
michael@0 722 ReentrantMonitorAutoEnter mon(mMonitor);
michael@0 723
michael@0 724 NS_ENSURE_ARG(aNewTarget);
michael@0 725 NS_ENSURE_TRUE(mState == STATE_START || mState == STATE_TRANSFER,
michael@0 726 NS_ERROR_UNEXPECTED);
michael@0 727
michael@0 728 // If canceled, do not retarget. Return with canceled status.
michael@0 729 if (NS_FAILED(mStatus)) {
michael@0 730 return mStatus;
michael@0 731 }
michael@0 732
michael@0 733 if (aNewTarget == mTargetThread) {
michael@0 734 NS_WARNING("Retargeting delivery to same thread");
michael@0 735 return NS_OK;
michael@0 736 }
michael@0 737
michael@0 738 // Ensure that |mListener| and any subsequent listeners can be retargeted
michael@0 739 // to another thread.
michael@0 740 nsresult rv = NS_OK;
michael@0 741 nsCOMPtr<nsIThreadRetargetableStreamListener> retargetableListener =
michael@0 742 do_QueryInterface(mListener, &rv);
michael@0 743 if (NS_SUCCEEDED(rv) && retargetableListener) {
michael@0 744 rv = retargetableListener->CheckListenerChain();
michael@0 745 if (NS_SUCCEEDED(rv)) {
michael@0 746 mTargetThread = aNewTarget;
michael@0 747 mRetargeting = true;
michael@0 748 }
michael@0 749 }
michael@0 750 LOG(("nsInputStreamPump::RetargetDeliveryTo [this=%x aNewTarget=%p] "
michael@0 751 "%s listener [%p] rv[%x]",
michael@0 752 this, aNewTarget, (mTargetThread == aNewTarget ? "success" : "failure"),
michael@0 753 (nsIStreamListener*)mListener, rv));
michael@0 754 return rv;
michael@0 755 }

mercurial