xpcom/io/nsPipe3.cpp

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

michael@0 1 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 2 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 4
michael@0 5 #include "mozilla/Attributes.h"
michael@0 6 #include "mozilla/ReentrantMonitor.h"
michael@0 7 #include "nsIPipe.h"
michael@0 8 #include "nsIEventTarget.h"
michael@0 9 #include "nsISeekableStream.h"
michael@0 10 #include "nsIProgrammingLanguage.h"
michael@0 11 #include "nsSegmentedBuffer.h"
michael@0 12 #include "nsStreamUtils.h"
michael@0 13 #include "nsCOMPtr.h"
michael@0 14 #include "nsCRT.h"
michael@0 15 #include "prlog.h"
michael@0 16 #include "nsIClassInfoImpl.h"
michael@0 17 #include "nsAlgorithm.h"
michael@0 18 #include "nsMemory.h"
michael@0 19 #include "nsIAsyncInputStream.h"
michael@0 20 #include "nsIAsyncOutputStream.h"
michael@0 21
michael@0 22 using namespace mozilla;
michael@0 23
michael@0 24 #ifdef LOG
michael@0 25 #undef LOG
michael@0 26 #endif
michael@0 27 #if defined(PR_LOGGING)
michael@0 28 //
michael@0 29 // set NSPR_LOG_MODULES=nsPipe:5
michael@0 30 //
michael@0 31 static PRLogModuleInfo *
michael@0 32 GetPipeLog()
michael@0 33 {
michael@0 34 static PRLogModuleInfo *sLog;
michael@0 35 if (!sLog)
michael@0 36 sLog = PR_NewLogModule("nsPipe");
michael@0 37 return sLog;
michael@0 38 }
michael@0 39 #define LOG(args) PR_LOG(GetPipeLog(), PR_LOG_DEBUG, args)
michael@0 40 #else
michael@0 41 #define LOG(args)
michael@0 42 #endif
michael@0 43
michael@0 44 #define DEFAULT_SEGMENT_SIZE 4096
michael@0 45 #define DEFAULT_SEGMENT_COUNT 16
michael@0 46
michael@0 47 class nsPipe;
michael@0 48 class nsPipeEvents;
michael@0 49 class nsPipeInputStream;
michael@0 50 class nsPipeOutputStream;
michael@0 51
michael@0 52 //-----------------------------------------------------------------------------
michael@0 53
michael@0 54 // this class is used to delay notifications until the end of a particular
michael@0 55 // scope. it helps avoid the complexity of issuing callbacks while inside
michael@0 56 // a critical section.
michael@0 57 class nsPipeEvents
michael@0 58 {
michael@0 59 public:
michael@0 60 nsPipeEvents() { }
michael@0 61 ~nsPipeEvents();
michael@0 62
michael@0 63 inline void NotifyInputReady(nsIAsyncInputStream *stream,
michael@0 64 nsIInputStreamCallback *callback)
michael@0 65 {
michael@0 66 NS_ASSERTION(!mInputCallback, "already have an input event");
michael@0 67 mInputStream = stream;
michael@0 68 mInputCallback = callback;
michael@0 69 }
michael@0 70
michael@0 71 inline void NotifyOutputReady(nsIAsyncOutputStream *stream,
michael@0 72 nsIOutputStreamCallback *callback)
michael@0 73 {
michael@0 74 NS_ASSERTION(!mOutputCallback, "already have an output event");
michael@0 75 mOutputStream = stream;
michael@0 76 mOutputCallback = callback;
michael@0 77 }
michael@0 78
michael@0 79 private:
michael@0 80 nsCOMPtr<nsIAsyncInputStream> mInputStream;
michael@0 81 nsCOMPtr<nsIInputStreamCallback> mInputCallback;
michael@0 82 nsCOMPtr<nsIAsyncOutputStream> mOutputStream;
michael@0 83 nsCOMPtr<nsIOutputStreamCallback> mOutputCallback;
michael@0 84 };
michael@0 85
michael@0 86 //-----------------------------------------------------------------------------
michael@0 87
michael@0 88 // the input end of a pipe (allocated as a member of the pipe).
michael@0 89 class nsPipeInputStream : public nsIAsyncInputStream
michael@0 90 , public nsISeekableStream
michael@0 91 , public nsISearchableInputStream
michael@0 92 , public nsIClassInfo
michael@0 93 {
michael@0 94 public:
michael@0 95 // since this class will be allocated as a member of the pipe, we do not
michael@0 96 // need our own ref count. instead, we share the lifetime (the ref count)
michael@0 97 // of the entire pipe. this macro is just convenience since it does not
michael@0 98 // declare a mRefCount variable; however, don't let the name fool you...
michael@0 99 // we are not inheriting from nsPipe ;-)
michael@0 100 NS_DECL_ISUPPORTS_INHERITED
michael@0 101
michael@0 102 NS_DECL_NSIINPUTSTREAM
michael@0 103 NS_DECL_NSIASYNCINPUTSTREAM
michael@0 104 NS_DECL_NSISEEKABLESTREAM
michael@0 105 NS_DECL_NSISEARCHABLEINPUTSTREAM
michael@0 106 NS_DECL_NSICLASSINFO
michael@0 107
michael@0 108 nsPipeInputStream(nsPipe *pipe)
michael@0 109 : mPipe(pipe)
michael@0 110 , mReaderRefCnt(0)
michael@0 111 , mLogicalOffset(0)
michael@0 112 , mBlocking(true)
michael@0 113 , mBlocked(false)
michael@0 114 , mAvailable(0)
michael@0 115 , mCallbackFlags(0)
michael@0 116 { }
michael@0 117
michael@0 118 nsresult Fill();
michael@0 119 void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; }
michael@0 120
michael@0 121 uint32_t Available() { return mAvailable; }
michael@0 122 void ReduceAvailable(uint32_t avail) { mAvailable -= avail; }
michael@0 123
michael@0 124 // synchronously wait for the pipe to become readable.
michael@0 125 nsresult Wait();
michael@0 126
michael@0 127 // these functions return true to indicate that the pipe's monitor should
michael@0 128 // be notified, to wake up a blocked reader if any.
michael@0 129 bool OnInputReadable(uint32_t bytesWritten, nsPipeEvents &);
michael@0 130 bool OnInputException(nsresult, nsPipeEvents &);
michael@0 131
michael@0 132 private:
michael@0 133 nsPipe *mPipe;
michael@0 134
michael@0 135 // separate refcnt so that we know when to close the consumer
michael@0 136 mozilla::ThreadSafeAutoRefCnt mReaderRefCnt;
michael@0 137 int64_t mLogicalOffset;
michael@0 138 bool mBlocking;
michael@0 139
michael@0 140 // these variables can only be accessed while inside the pipe's monitor
michael@0 141 bool mBlocked;
michael@0 142 uint32_t mAvailable;
michael@0 143 nsCOMPtr<nsIInputStreamCallback> mCallback;
michael@0 144 uint32_t mCallbackFlags;
michael@0 145 };
michael@0 146
michael@0 147 //-----------------------------------------------------------------------------
michael@0 148
michael@0 149 // the output end of a pipe (allocated as a member of the pipe).
michael@0 150 class nsPipeOutputStream : public nsIAsyncOutputStream
michael@0 151 , public nsIClassInfo
michael@0 152 {
michael@0 153 public:
michael@0 154 // since this class will be allocated as a member of the pipe, we do not
michael@0 155 // need our own ref count. instead, we share the lifetime (the ref count)
michael@0 156 // of the entire pipe. this macro is just convenience since it does not
michael@0 157 // declare a mRefCount variable; however, don't let the name fool you...
michael@0 158 // we are not inheriting from nsPipe ;-)
michael@0 159 NS_DECL_ISUPPORTS_INHERITED
michael@0 160
michael@0 161 NS_DECL_NSIOUTPUTSTREAM
michael@0 162 NS_DECL_NSIASYNCOUTPUTSTREAM
michael@0 163 NS_DECL_NSICLASSINFO
michael@0 164
michael@0 165 nsPipeOutputStream(nsPipe *pipe)
michael@0 166 : mPipe(pipe)
michael@0 167 , mWriterRefCnt(0)
michael@0 168 , mLogicalOffset(0)
michael@0 169 , mBlocking(true)
michael@0 170 , mBlocked(false)
michael@0 171 , mWritable(true)
michael@0 172 , mCallbackFlags(0)
michael@0 173 { }
michael@0 174
michael@0 175 void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; }
michael@0 176 void SetWritable(bool writable) { mWritable = writable; }
michael@0 177
michael@0 178 // synchronously wait for the pipe to become writable.
michael@0 179 nsresult Wait();
michael@0 180
michael@0 181 // these functions return true to indicate that the pipe's monitor should
michael@0 182 // be notified, to wake up a blocked writer if any.
michael@0 183 bool OnOutputWritable(nsPipeEvents &);
michael@0 184 bool OnOutputException(nsresult, nsPipeEvents &);
michael@0 185
michael@0 186 private:
michael@0 187 nsPipe *mPipe;
michael@0 188
michael@0 189 // separate refcnt so that we know when to close the producer
michael@0 190 mozilla::ThreadSafeAutoRefCnt mWriterRefCnt;
michael@0 191 int64_t mLogicalOffset;
michael@0 192 bool mBlocking;
michael@0 193
michael@0 194 // these variables can only be accessed while inside the pipe's monitor
michael@0 195 bool mBlocked;
michael@0 196 bool mWritable;
michael@0 197 nsCOMPtr<nsIOutputStreamCallback> mCallback;
michael@0 198 uint32_t mCallbackFlags;
michael@0 199 };
michael@0 200
michael@0 201 //-----------------------------------------------------------------------------
michael@0 202
michael@0 203 class nsPipe MOZ_FINAL : public nsIPipe
michael@0 204 {
michael@0 205 public:
michael@0 206 friend class nsPipeInputStream;
michael@0 207 friend class nsPipeOutputStream;
michael@0 208
michael@0 209 NS_DECL_THREADSAFE_ISUPPORTS
michael@0 210 NS_DECL_NSIPIPE
michael@0 211
michael@0 212 // nsPipe methods:
michael@0 213 nsPipe();
michael@0 214
michael@0 215 private:
michael@0 216 ~nsPipe();
michael@0 217
michael@0 218 public:
michael@0 219 //
michael@0 220 // methods below may only be called while inside the pipe's monitor
michael@0 221 //
michael@0 222
michael@0 223 void PeekSegment(uint32_t n, char *&cursor, char *&limit);
michael@0 224
michael@0 225 //
michael@0 226 // methods below may be called while outside the pipe's monitor
michael@0 227 //
michael@0 228
michael@0 229 nsresult GetReadSegment(const char *&segment, uint32_t &segmentLen);
michael@0 230 void AdvanceReadCursor(uint32_t count);
michael@0 231
michael@0 232 nsresult GetWriteSegment(char *&segment, uint32_t &segmentLen);
michael@0 233 void AdvanceWriteCursor(uint32_t count);
michael@0 234
michael@0 235 void OnPipeException(nsresult reason, bool outputOnly = false);
michael@0 236
michael@0 237 protected:
michael@0 238 // We can't inherit from both nsIInputStream and nsIOutputStream
michael@0 239 // because they collide on their Close method. Consequently we nest their
michael@0 240 // implementations to avoid the extra object allocation.
michael@0 241 nsPipeInputStream mInput;
michael@0 242 nsPipeOutputStream mOutput;
michael@0 243
michael@0 244 ReentrantMonitor mReentrantMonitor;
michael@0 245 nsSegmentedBuffer mBuffer;
michael@0 246
michael@0 247 char* mReadCursor;
michael@0 248 char* mReadLimit;
michael@0 249
michael@0 250 int32_t mWriteSegment;
michael@0 251 char* mWriteCursor;
michael@0 252 char* mWriteLimit;
michael@0 253
michael@0 254 nsresult mStatus;
michael@0 255 bool mInited;
michael@0 256 };
michael@0 257
michael@0 258 //
michael@0 259 // NOTES on buffer architecture:
michael@0 260 //
michael@0 261 // +-----------------+ - - mBuffer.GetSegment(0)
michael@0 262 // | |
michael@0 263 // + - - - - - - - - + - - mReadCursor
michael@0 264 // |/////////////////|
michael@0 265 // |/////////////////|
michael@0 266 // |/////////////////|
michael@0 267 // |/////////////////|
michael@0 268 // +-----------------+ - - mReadLimit
michael@0 269 // |
michael@0 270 // +-----------------+
michael@0 271 // |/////////////////|
michael@0 272 // |/////////////////|
michael@0 273 // |/////////////////|
michael@0 274 // |/////////////////|
michael@0 275 // |/////////////////|
michael@0 276 // |/////////////////|
michael@0 277 // +-----------------+
michael@0 278 // |
michael@0 279 // +-----------------+ - - mBuffer.GetSegment(mWriteSegment)
michael@0 280 // |/////////////////|
michael@0 281 // |/////////////////|
michael@0 282 // |/////////////////|
michael@0 283 // + - - - - - - - - + - - mWriteCursor
michael@0 284 // | |
michael@0 285 // | |
michael@0 286 // +-----------------+ - - mWriteLimit
michael@0 287 //
michael@0 288 // (shaded region contains data)
michael@0 289 //
michael@0 290 // NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
michael@0 291 // small allocations (e.g., 64 byte allocations). this means that buffers may
michael@0 292 // be allocated back-to-back. in the diagram above, for example, mReadLimit
michael@0 293 // would actually be pointing at the beginning of the next segment. when
michael@0 294 // making changes to this file, please keep this fact in mind.
michael@0 295 //
michael@0 296
michael@0 297 //-----------------------------------------------------------------------------
michael@0 298 // nsPipe methods:
michael@0 299 //-----------------------------------------------------------------------------
michael@0 300
michael@0 301 nsPipe::nsPipe()
michael@0 302 : mInput(MOZ_THIS_IN_INITIALIZER_LIST())
michael@0 303 , mOutput(MOZ_THIS_IN_INITIALIZER_LIST())
michael@0 304 , mReentrantMonitor("nsPipe.mReentrantMonitor")
michael@0 305 , mReadCursor(nullptr)
michael@0 306 , mReadLimit(nullptr)
michael@0 307 , mWriteSegment(-1)
michael@0 308 , mWriteCursor(nullptr)
michael@0 309 , mWriteLimit(nullptr)
michael@0 310 , mStatus(NS_OK)
michael@0 311 , mInited(false)
michael@0 312 {
michael@0 313 }
michael@0 314
michael@0 315 nsPipe::~nsPipe()
michael@0 316 {
michael@0 317 }
michael@0 318
michael@0 319 NS_IMPL_ISUPPORTS(nsPipe, nsIPipe)
michael@0 320
michael@0 321 NS_IMETHODIMP
michael@0 322 nsPipe::Init(bool nonBlockingIn,
michael@0 323 bool nonBlockingOut,
michael@0 324 uint32_t segmentSize,
michael@0 325 uint32_t segmentCount)
michael@0 326 {
michael@0 327 mInited = true;
michael@0 328
michael@0 329 if (segmentSize == 0)
michael@0 330 segmentSize = DEFAULT_SEGMENT_SIZE;
michael@0 331 if (segmentCount == 0)
michael@0 332 segmentCount = DEFAULT_SEGMENT_COUNT;
michael@0 333
michael@0 334 // protect against overflow
michael@0 335 uint32_t maxCount = uint32_t(-1) / segmentSize;
michael@0 336 if (segmentCount > maxCount)
michael@0 337 segmentCount = maxCount;
michael@0 338
michael@0 339 nsresult rv = mBuffer.Init(segmentSize, segmentSize * segmentCount);
michael@0 340 if (NS_FAILED(rv))
michael@0 341 return rv;
michael@0 342
michael@0 343 mInput.SetNonBlocking(nonBlockingIn);
michael@0 344 mOutput.SetNonBlocking(nonBlockingOut);
michael@0 345 return NS_OK;
michael@0 346 }
michael@0 347
michael@0 348 NS_IMETHODIMP
michael@0 349 nsPipe::GetInputStream(nsIAsyncInputStream **aInputStream)
michael@0 350 {
michael@0 351 NS_ADDREF(*aInputStream = &mInput);
michael@0 352 return NS_OK;
michael@0 353 }
michael@0 354
michael@0 355 NS_IMETHODIMP
michael@0 356 nsPipe::GetOutputStream(nsIAsyncOutputStream **aOutputStream)
michael@0 357 {
michael@0 358 if (NS_WARN_IF(!mInited))
michael@0 359 return NS_ERROR_NOT_INITIALIZED;
michael@0 360 NS_ADDREF(*aOutputStream = &mOutput);
michael@0 361 return NS_OK;
michael@0 362 }
michael@0 363
michael@0 364 void
michael@0 365 nsPipe::PeekSegment(uint32_t index, char *&cursor, char *&limit)
michael@0 366 {
michael@0 367 if (index == 0) {
michael@0 368 NS_ASSERTION(!mReadCursor || mBuffer.GetSegmentCount(), "unexpected state");
michael@0 369 cursor = mReadCursor;
michael@0 370 limit = mReadLimit;
michael@0 371 }
michael@0 372 else {
michael@0 373 uint32_t numSegments = mBuffer.GetSegmentCount();
michael@0 374 if (index >= numSegments)
michael@0 375 cursor = limit = nullptr;
michael@0 376 else {
michael@0 377 cursor = mBuffer.GetSegment(index);
michael@0 378 if (mWriteSegment == (int32_t) index)
michael@0 379 limit = mWriteCursor;
michael@0 380 else
michael@0 381 limit = cursor + mBuffer.GetSegmentSize();
michael@0 382 }
michael@0 383 }
michael@0 384 }
michael@0 385
michael@0 386 nsresult
michael@0 387 nsPipe::GetReadSegment(const char *&segment, uint32_t &segmentLen)
michael@0 388 {
michael@0 389 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
michael@0 390
michael@0 391 if (mReadCursor == mReadLimit)
michael@0 392 return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK;
michael@0 393
michael@0 394 segment = mReadCursor;
michael@0 395 segmentLen = mReadLimit - mReadCursor;
michael@0 396 return NS_OK;
michael@0 397 }
michael@0 398
michael@0 399 void
michael@0 400 nsPipe::AdvanceReadCursor(uint32_t bytesRead)
michael@0 401 {
michael@0 402 NS_ASSERTION(bytesRead, "don't call if no bytes read");
michael@0 403
michael@0 404 nsPipeEvents events;
michael@0 405 {
michael@0 406 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
michael@0 407
michael@0 408 LOG(("III advancing read cursor by %u\n", bytesRead));
michael@0 409 NS_ASSERTION(bytesRead <= mBuffer.GetSegmentSize(), "read too much");
michael@0 410
michael@0 411 mReadCursor += bytesRead;
michael@0 412 NS_ASSERTION(mReadCursor <= mReadLimit, "read cursor exceeds limit");
michael@0 413
michael@0 414 mInput.ReduceAvailable(bytesRead);
michael@0 415
michael@0 416 if (mReadCursor == mReadLimit) {
michael@0 417 // we've reached the limit of how much we can read from this segment.
michael@0 418 // if at the end of this segment, then we must discard this segment.
michael@0 419
michael@0 420 // if still writing in this segment then bail because we're not done
michael@0 421 // with the segment and have to wait for now...
michael@0 422 if (mWriteSegment == 0 && mWriteLimit > mWriteCursor) {
michael@0 423 NS_ASSERTION(mReadLimit == mWriteCursor, "unexpected state");
michael@0 424 return;
michael@0 425 }
michael@0 426
michael@0 427 // shift write segment index (-1 indicates an empty buffer).
michael@0 428 --mWriteSegment;
michael@0 429
michael@0 430 // done with this segment
michael@0 431 mBuffer.DeleteFirstSegment();
michael@0 432 LOG(("III deleting first segment\n"));
michael@0 433
michael@0 434 if (mWriteSegment == -1) {
michael@0 435 // buffer is completely empty
michael@0 436 mReadCursor = nullptr;
michael@0 437 mReadLimit = nullptr;
michael@0 438 mWriteCursor = nullptr;
michael@0 439 mWriteLimit = nullptr;
michael@0 440 }
michael@0 441 else {
michael@0 442 // advance read cursor and limit to next buffer segment
michael@0 443 mReadCursor = mBuffer.GetSegment(0);
michael@0 444 if (mWriteSegment == 0)
michael@0 445 mReadLimit = mWriteCursor;
michael@0 446 else
michael@0 447 mReadLimit = mReadCursor + mBuffer.GetSegmentSize();
michael@0 448 }
michael@0 449
michael@0 450 // we've free'd up a segment, so notify output stream that pipe has
michael@0 451 // room for a new segment.
michael@0 452 if (mOutput.OnOutputWritable(events))
michael@0 453 mon.Notify();
michael@0 454 }
michael@0 455 }
michael@0 456 }
michael@0 457
michael@0 458 nsresult
michael@0 459 nsPipe::GetWriteSegment(char *&segment, uint32_t &segmentLen)
michael@0 460 {
michael@0 461 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
michael@0 462
michael@0 463 if (NS_FAILED(mStatus))
michael@0 464 return mStatus;
michael@0 465
michael@0 466 // write cursor and limit may both be null indicating an empty buffer.
michael@0 467 if (mWriteCursor == mWriteLimit) {
michael@0 468 char *seg = mBuffer.AppendNewSegment();
michael@0 469 // pipe is full
michael@0 470 if (seg == nullptr)
michael@0 471 return NS_BASE_STREAM_WOULD_BLOCK;
michael@0 472 LOG(("OOO appended new segment\n"));
michael@0 473 mWriteCursor = seg;
michael@0 474 mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize();
michael@0 475 ++mWriteSegment;
michael@0 476 }
michael@0 477
michael@0 478 // make sure read cursor is initialized
michael@0 479 if (mReadCursor == nullptr) {
michael@0 480 NS_ASSERTION(mWriteSegment == 0, "unexpected null read cursor");
michael@0 481 mReadCursor = mReadLimit = mWriteCursor;
michael@0 482 }
michael@0 483
michael@0 484 // check to see if we can roll-back our read and write cursors to the
michael@0 485 // beginning of the current/first segment. this is purely an optimization.
michael@0 486 if (mReadCursor == mWriteCursor && mWriteSegment == 0) {
michael@0 487 char *head = mBuffer.GetSegment(0);
michael@0 488 LOG(("OOO rolling back write cursor %u bytes\n", mWriteCursor - head));
michael@0 489 mWriteCursor = mReadCursor = mReadLimit = head;
michael@0 490 }
michael@0 491
michael@0 492 segment = mWriteCursor;
michael@0 493 segmentLen = mWriteLimit - mWriteCursor;
michael@0 494 return NS_OK;
michael@0 495 }
michael@0 496
michael@0 497 void
michael@0 498 nsPipe::AdvanceWriteCursor(uint32_t bytesWritten)
michael@0 499 {
michael@0 500 NS_ASSERTION(bytesWritten, "don't call if no bytes written");
michael@0 501
michael@0 502 nsPipeEvents events;
michael@0 503 {
michael@0 504 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
michael@0 505
michael@0 506 LOG(("OOO advancing write cursor by %u\n", bytesWritten));
michael@0 507
michael@0 508 char *newWriteCursor = mWriteCursor + bytesWritten;
michael@0 509 NS_ASSERTION(newWriteCursor <= mWriteLimit, "write cursor exceeds limit");
michael@0 510
michael@0 511 // update read limit if reading in the same segment
michael@0 512 if (mWriteSegment == 0 && mReadLimit == mWriteCursor)
michael@0 513 mReadLimit = newWriteCursor;
michael@0 514
michael@0 515 mWriteCursor = newWriteCursor;
michael@0 516
michael@0 517 // The only way mReadCursor == mWriteCursor is if:
michael@0 518 //
michael@0 519 // - mReadCursor is at the start of a segment (which, based on how
michael@0 520 // nsSegmentedBuffer works, means that this segment is the "first"
michael@0 521 // segment)
michael@0 522 // - mWriteCursor points at the location past the end of the current
michael@0 523 // write segment (so the current write filled the current write
michael@0 524 // segment, so we've incremented mWriteCursor to point past the end
michael@0 525 // of it)
michael@0 526 // - the segment to which data has just been written is located
michael@0 527 // exactly one segment's worth of bytes before the first segment
michael@0 528 // where mReadCursor is located
michael@0 529 //
michael@0 530 // Consequently, the byte immediately after the end of the current
michael@0 531 // write segment is the first byte of the first segment, so
michael@0 532 // mReadCursor == mWriteCursor. (Another way to think about this is
michael@0 533 // to consider the buffer architecture diagram above, but consider it
michael@0 534 // with an arena allocator which allocates from the *end* of the
michael@0 535 // arena to the *beginning* of the arena.)
michael@0 536 NS_ASSERTION(mReadCursor != mWriteCursor ||
michael@0 537 (mBuffer.GetSegment(0) == mReadCursor &&
michael@0 538 mWriteCursor == mWriteLimit),
michael@0 539 "read cursor is bad");
michael@0 540
michael@0 541 // update the writable flag on the output stream
michael@0 542 if (mWriteCursor == mWriteLimit) {
michael@0 543 if (mBuffer.GetSize() >= mBuffer.GetMaxSize())
michael@0 544 mOutput.SetWritable(false);
michael@0 545 }
michael@0 546
michael@0 547 // notify input stream that pipe now contains additional data
michael@0 548 if (mInput.OnInputReadable(bytesWritten, events))
michael@0 549 mon.Notify();
michael@0 550 }
michael@0 551 }
michael@0 552
michael@0 553 void
michael@0 554 nsPipe::OnPipeException(nsresult reason, bool outputOnly)
michael@0 555 {
michael@0 556 LOG(("PPP nsPipe::OnPipeException [reason=%x output-only=%d]\n",
michael@0 557 reason, outputOnly));
michael@0 558
michael@0 559 nsPipeEvents events;
michael@0 560 {
michael@0 561 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
michael@0 562
michael@0 563 // if we've already hit an exception, then ignore this one.
michael@0 564 if (NS_FAILED(mStatus))
michael@0 565 return;
michael@0 566
michael@0 567 mStatus = reason;
michael@0 568
michael@0 569 // an output-only exception applies to the input end if the pipe has
michael@0 570 // zero bytes available.
michael@0 571 if (outputOnly && !mInput.Available())
michael@0 572 outputOnly = false;
michael@0 573
michael@0 574 if (!outputOnly)
michael@0 575 if (mInput.OnInputException(reason, events))
michael@0 576 mon.Notify();
michael@0 577
michael@0 578 if (mOutput.OnOutputException(reason, events))
michael@0 579 mon.Notify();
michael@0 580 }
michael@0 581 }
michael@0 582
michael@0 583 //-----------------------------------------------------------------------------
michael@0 584 // nsPipeEvents methods:
michael@0 585 //-----------------------------------------------------------------------------
michael@0 586
michael@0 587 nsPipeEvents::~nsPipeEvents()
michael@0 588 {
michael@0 589 // dispatch any pending events
michael@0 590
michael@0 591 if (mInputCallback) {
michael@0 592 mInputCallback->OnInputStreamReady(mInputStream);
michael@0 593 mInputCallback = 0;
michael@0 594 mInputStream = 0;
michael@0 595 }
michael@0 596 if (mOutputCallback) {
michael@0 597 mOutputCallback->OnOutputStreamReady(mOutputStream);
michael@0 598 mOutputCallback = 0;
michael@0 599 mOutputStream = 0;
michael@0 600 }
michael@0 601 }
michael@0 602
michael@0 603 //-----------------------------------------------------------------------------
michael@0 604 // nsPipeInputStream methods:
michael@0 605 //-----------------------------------------------------------------------------
michael@0 606
michael@0 607 NS_IMPL_QUERY_INTERFACE(nsPipeInputStream,
michael@0 608 nsIInputStream,
michael@0 609 nsIAsyncInputStream,
michael@0 610 nsISeekableStream,
michael@0 611 nsISearchableInputStream,
michael@0 612 nsIClassInfo)
michael@0 613
michael@0 614 NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream,
michael@0 615 nsIInputStream,
michael@0 616 nsIAsyncInputStream,
michael@0 617 nsISeekableStream,
michael@0 618 nsISearchableInputStream)
michael@0 619
michael@0 620 NS_IMPL_THREADSAFE_CI(nsPipeInputStream)
michael@0 621
michael@0 622 nsresult
michael@0 623 nsPipeInputStream::Wait()
michael@0 624 {
michael@0 625 NS_ASSERTION(mBlocking, "wait on non-blocking pipe input stream");
michael@0 626
michael@0 627 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
michael@0 628
michael@0 629 while (NS_SUCCEEDED(mPipe->mStatus) && (mAvailable == 0)) {
michael@0 630 LOG(("III pipe input: waiting for data\n"));
michael@0 631
michael@0 632 mBlocked = true;
michael@0 633 mon.Wait();
michael@0 634 mBlocked = false;
michael@0 635
michael@0 636 LOG(("III pipe input: woke up [pipe-status=%x available=%u]\n",
michael@0 637 mPipe->mStatus, mAvailable));
michael@0 638 }
michael@0 639
michael@0 640 return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
michael@0 641 }
michael@0 642
michael@0 643 bool
michael@0 644 nsPipeInputStream::OnInputReadable(uint32_t bytesWritten, nsPipeEvents &events)
michael@0 645 {
michael@0 646 bool result = false;
michael@0 647
michael@0 648 mAvailable += bytesWritten;
michael@0 649
michael@0 650 if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
michael@0 651 events.NotifyInputReady(this, mCallback);
michael@0 652 mCallback = 0;
michael@0 653 mCallbackFlags = 0;
michael@0 654 }
michael@0 655 else if (mBlocked)
michael@0 656 result = true;
michael@0 657
michael@0 658 return result;
michael@0 659 }
michael@0 660
michael@0 661 bool
michael@0 662 nsPipeInputStream::OnInputException(nsresult reason, nsPipeEvents &events)
michael@0 663 {
michael@0 664 LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n",
michael@0 665 this, reason));
michael@0 666
michael@0 667 bool result = false;
michael@0 668
michael@0 669 NS_ASSERTION(NS_FAILED(reason), "huh? successful exception");
michael@0 670
michael@0 671 // force count of available bytes to zero.
michael@0 672 mAvailable = 0;
michael@0 673
michael@0 674 if (mCallback) {
michael@0 675 events.NotifyInputReady(this, mCallback);
michael@0 676 mCallback = 0;
michael@0 677 mCallbackFlags = 0;
michael@0 678 }
michael@0 679 else if (mBlocked)
michael@0 680 result = true;
michael@0 681
michael@0 682 return result;
michael@0 683 }
michael@0 684
michael@0 685 NS_IMETHODIMP_(MozExternalRefCountType)
michael@0 686 nsPipeInputStream::AddRef(void)
michael@0 687 {
michael@0 688 ++mReaderRefCnt;
michael@0 689 return mPipe->AddRef();
michael@0 690 }
michael@0 691
michael@0 692 NS_IMETHODIMP_(MozExternalRefCountType)
michael@0 693 nsPipeInputStream::Release(void)
michael@0 694 {
michael@0 695 if (--mReaderRefCnt == 0)
michael@0 696 Close();
michael@0 697 return mPipe->Release();
michael@0 698 }
michael@0 699
michael@0 700 NS_IMETHODIMP
michael@0 701 nsPipeInputStream::CloseWithStatus(nsresult reason)
michael@0 702 {
michael@0 703 LOG(("III CloseWithStatus [this=%x reason=%x]\n", this, reason));
michael@0 704
michael@0 705 if (NS_SUCCEEDED(reason))
michael@0 706 reason = NS_BASE_STREAM_CLOSED;
michael@0 707
michael@0 708 mPipe->OnPipeException(reason);
michael@0 709 return NS_OK;
michael@0 710 }
michael@0 711
michael@0 712 NS_IMETHODIMP
michael@0 713 nsPipeInputStream::Close()
michael@0 714 {
michael@0 715 return CloseWithStatus(NS_BASE_STREAM_CLOSED);
michael@0 716 }
michael@0 717
michael@0 718 NS_IMETHODIMP
michael@0 719 nsPipeInputStream::Available(uint64_t *result)
michael@0 720 {
michael@0 721 // nsPipeInputStream supports under 4GB stream only
michael@0 722 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
michael@0 723
michael@0 724 // return error if pipe closed
michael@0 725 if (!mAvailable && NS_FAILED(mPipe->mStatus))
michael@0 726 return mPipe->mStatus;
michael@0 727
michael@0 728 *result = (uint64_t)mAvailable;
michael@0 729 return NS_OK;
michael@0 730 }
michael@0 731
michael@0 732 NS_IMETHODIMP
michael@0 733 nsPipeInputStream::ReadSegments(nsWriteSegmentFun writer,
michael@0 734 void *closure,
michael@0 735 uint32_t count,
michael@0 736 uint32_t *readCount)
michael@0 737 {
michael@0 738 LOG(("III ReadSegments [this=%x count=%u]\n", this, count));
michael@0 739
michael@0 740 nsresult rv = NS_OK;
michael@0 741
michael@0 742 const char *segment;
michael@0 743 uint32_t segmentLen;
michael@0 744
michael@0 745 *readCount = 0;
michael@0 746 while (count) {
michael@0 747 rv = mPipe->GetReadSegment(segment, segmentLen);
michael@0 748 if (NS_FAILED(rv)) {
michael@0 749 // ignore this error if we've already read something.
michael@0 750 if (*readCount > 0) {
michael@0 751 rv = NS_OK;
michael@0 752 break;
michael@0 753 }
michael@0 754 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
michael@0 755 // pipe is empty
michael@0 756 if (!mBlocking)
michael@0 757 break;
michael@0 758 // wait for some data to be written to the pipe
michael@0 759 rv = Wait();
michael@0 760 if (NS_SUCCEEDED(rv))
michael@0 761 continue;
michael@0 762 }
michael@0 763 // ignore this error, just return.
michael@0 764 if (rv == NS_BASE_STREAM_CLOSED) {
michael@0 765 rv = NS_OK;
michael@0 766 break;
michael@0 767 }
michael@0 768 mPipe->OnPipeException(rv);
michael@0 769 break;
michael@0 770 }
michael@0 771
michael@0 772 // read no more than count
michael@0 773 if (segmentLen > count)
michael@0 774 segmentLen = count;
michael@0 775
michael@0 776 uint32_t writeCount, originalLen = segmentLen;
michael@0 777 while (segmentLen) {
michael@0 778 writeCount = 0;
michael@0 779
michael@0 780 rv = writer(this, closure, segment, *readCount, segmentLen, &writeCount);
michael@0 781
michael@0 782 if (NS_FAILED(rv) || writeCount == 0) {
michael@0 783 count = 0;
michael@0 784 // any errors returned from the writer end here: do not
michael@0 785 // propagate to the caller of ReadSegments.
michael@0 786 rv = NS_OK;
michael@0 787 break;
michael@0 788 }
michael@0 789
michael@0 790 NS_ASSERTION(writeCount <= segmentLen, "wrote more than expected");
michael@0 791 segment += writeCount;
michael@0 792 segmentLen -= writeCount;
michael@0 793 count -= writeCount;
michael@0 794 *readCount += writeCount;
michael@0 795 mLogicalOffset += writeCount;
michael@0 796 }
michael@0 797
michael@0 798 if (segmentLen < originalLen)
michael@0 799 mPipe->AdvanceReadCursor(originalLen - segmentLen);
michael@0 800 }
michael@0 801
michael@0 802 return rv;
michael@0 803 }
michael@0 804
michael@0 805 NS_IMETHODIMP
michael@0 806 nsPipeInputStream::Read(char* toBuf, uint32_t bufLen, uint32_t *readCount)
michael@0 807 {
michael@0 808 return ReadSegments(NS_CopySegmentToBuffer, toBuf, bufLen, readCount);
michael@0 809 }
michael@0 810
michael@0 811 NS_IMETHODIMP
michael@0 812 nsPipeInputStream::IsNonBlocking(bool *aNonBlocking)
michael@0 813 {
michael@0 814 *aNonBlocking = !mBlocking;
michael@0 815 return NS_OK;
michael@0 816 }
michael@0 817
michael@0 818 NS_IMETHODIMP
michael@0 819 nsPipeInputStream::AsyncWait(nsIInputStreamCallback *callback,
michael@0 820 uint32_t flags,
michael@0 821 uint32_t requestedCount,
michael@0 822 nsIEventTarget *target)
michael@0 823 {
michael@0 824 LOG(("III AsyncWait [this=%x]\n", this));
michael@0 825
michael@0 826 nsPipeEvents pipeEvents;
michael@0 827 {
michael@0 828 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
michael@0 829
michael@0 830 // replace a pending callback
michael@0 831 mCallback = 0;
michael@0 832 mCallbackFlags = 0;
michael@0 833
michael@0 834 if (!callback)
michael@0 835 return NS_OK;
michael@0 836
michael@0 837 nsCOMPtr<nsIInputStreamCallback> proxy;
michael@0 838 if (target) {
michael@0 839 proxy = NS_NewInputStreamReadyEvent(callback, target);
michael@0 840 callback = proxy;
michael@0 841 }
michael@0 842
michael@0 843 if (NS_FAILED(mPipe->mStatus) ||
michael@0 844 (mAvailable && !(flags & WAIT_CLOSURE_ONLY))) {
michael@0 845 // stream is already closed or readable; post event.
michael@0 846 pipeEvents.NotifyInputReady(this, callback);
michael@0 847 }
michael@0 848 else {
michael@0 849 // queue up callback object to be notified when data becomes available
michael@0 850 mCallback = callback;
michael@0 851 mCallbackFlags = flags;
michael@0 852 }
michael@0 853 }
michael@0 854 return NS_OK;
michael@0 855 }
michael@0 856
michael@0 857 NS_IMETHODIMP
michael@0 858 nsPipeInputStream::Seek(int32_t whence, int64_t offset)
michael@0 859 {
michael@0 860 NS_NOTREACHED("nsPipeInputStream::Seek");
michael@0 861 return NS_ERROR_NOT_IMPLEMENTED;
michael@0 862 }
michael@0 863
michael@0 864 NS_IMETHODIMP
michael@0 865 nsPipeInputStream::Tell(int64_t *offset)
michael@0 866 {
michael@0 867 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
michael@0 868
michael@0 869 // return error if pipe closed
michael@0 870 if (!mAvailable && NS_FAILED(mPipe->mStatus))
michael@0 871 return mPipe->mStatus;
michael@0 872
michael@0 873 *offset = mLogicalOffset;
michael@0 874 return NS_OK;
michael@0 875 }
michael@0 876
michael@0 877 NS_IMETHODIMP
michael@0 878 nsPipeInputStream::SetEOF()
michael@0 879 {
michael@0 880 NS_NOTREACHED("nsPipeInputStream::SetEOF");
michael@0 881 return NS_ERROR_NOT_IMPLEMENTED;
michael@0 882 }
michael@0 883
michael@0 884 #define COMPARE(s1, s2, i) \
michael@0 885 (ignoreCase \
michael@0 886 ? nsCRT::strncasecmp((const char *)s1, (const char *)s2, (uint32_t)i) \
michael@0 887 : nsCRT::strncmp((const char *)s1, (const char *)s2, (uint32_t)i))
michael@0 888
michael@0 889 NS_IMETHODIMP
michael@0 890 nsPipeInputStream::Search(const char *forString,
michael@0 891 bool ignoreCase,
michael@0 892 bool *found,
michael@0 893 uint32_t *offsetSearchedTo)
michael@0 894 {
michael@0 895 LOG(("III Search [for=%s ic=%u]\n", forString, ignoreCase));
michael@0 896
michael@0 897 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
michael@0 898
michael@0 899 char *cursor1, *limit1;
michael@0 900 uint32_t index = 0, offset = 0;
michael@0 901 uint32_t strLen = strlen(forString);
michael@0 902
michael@0 903 mPipe->PeekSegment(0, cursor1, limit1);
michael@0 904 if (cursor1 == limit1) {
michael@0 905 *found = false;
michael@0 906 *offsetSearchedTo = 0;
michael@0 907 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
michael@0 908 return NS_OK;
michael@0 909 }
michael@0 910
michael@0 911 while (true) {
michael@0 912 uint32_t i, len1 = limit1 - cursor1;
michael@0 913
michael@0 914 // check if the string is in the buffer segment
michael@0 915 for (i = 0; i < len1 - strLen + 1; i++) {
michael@0 916 if (COMPARE(&cursor1[i], forString, strLen) == 0) {
michael@0 917 *found = true;
michael@0 918 *offsetSearchedTo = offset + i;
michael@0 919 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
michael@0 920 return NS_OK;
michael@0 921 }
michael@0 922 }
michael@0 923
michael@0 924 // get the next segment
michael@0 925 char *cursor2, *limit2;
michael@0 926 uint32_t len2;
michael@0 927
michael@0 928 index++;
michael@0 929 offset += len1;
michael@0 930
michael@0 931 mPipe->PeekSegment(index, cursor2, limit2);
michael@0 932 if (cursor2 == limit2) {
michael@0 933 *found = false;
michael@0 934 *offsetSearchedTo = offset - strLen + 1;
michael@0 935 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
michael@0 936 return NS_OK;
michael@0 937 }
michael@0 938 len2 = limit2 - cursor2;
michael@0 939
michael@0 940 // check if the string is straddling the next buffer segment
michael@0 941 uint32_t lim = XPCOM_MIN(strLen, len2 + 1);
michael@0 942 for (i = 0; i < lim; ++i) {
michael@0 943 uint32_t strPart1Len = strLen - i - 1;
michael@0 944 uint32_t strPart2Len = strLen - strPart1Len;
michael@0 945 const char* strPart2 = &forString[strLen - strPart2Len];
michael@0 946 uint32_t bufSeg1Offset = len1 - strPart1Len;
michael@0 947 if (COMPARE(&cursor1[bufSeg1Offset], forString, strPart1Len) == 0 &&
michael@0 948 COMPARE(cursor2, strPart2, strPart2Len) == 0) {
michael@0 949 *found = true;
michael@0 950 *offsetSearchedTo = offset - strPart1Len;
michael@0 951 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
michael@0 952 return NS_OK;
michael@0 953 }
michael@0 954 }
michael@0 955
michael@0 956 // finally continue with the next buffer
michael@0 957 cursor1 = cursor2;
michael@0 958 limit1 = limit2;
michael@0 959 }
michael@0 960
michael@0 961 NS_NOTREACHED("can't get here");
michael@0 962 return NS_ERROR_UNEXPECTED; // keep compiler happy
michael@0 963 }
michael@0 964
michael@0 965 //-----------------------------------------------------------------------------
michael@0 966 // nsPipeOutputStream methods:
michael@0 967 //-----------------------------------------------------------------------------
michael@0 968
michael@0 969 NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream,
michael@0 970 nsIOutputStream,
michael@0 971 nsIAsyncOutputStream,
michael@0 972 nsIClassInfo)
michael@0 973
michael@0 974 NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream,
michael@0 975 nsIOutputStream,
michael@0 976 nsIAsyncOutputStream)
michael@0 977
michael@0 978 NS_IMPL_THREADSAFE_CI(nsPipeOutputStream)
michael@0 979
michael@0 980 nsresult
michael@0 981 nsPipeOutputStream::Wait()
michael@0 982 {
michael@0 983 NS_ASSERTION(mBlocking, "wait on non-blocking pipe output stream");
michael@0 984
michael@0 985 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
michael@0 986
michael@0 987 if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) {
michael@0 988 LOG(("OOO pipe output: waiting for space\n"));
michael@0 989 mBlocked = true;
michael@0 990 mon.Wait();
michael@0 991 mBlocked = false;
michael@0 992 LOG(("OOO pipe output: woke up [pipe-status=%x writable=%u]\n",
michael@0 993 mPipe->mStatus, mWritable));
michael@0 994 }
michael@0 995
michael@0 996 return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
michael@0 997 }
michael@0 998
michael@0 999 bool
michael@0 1000 nsPipeOutputStream::OnOutputWritable(nsPipeEvents &events)
michael@0 1001 {
michael@0 1002 bool result = false;
michael@0 1003
michael@0 1004 mWritable = true;
michael@0 1005
michael@0 1006 if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
michael@0 1007 events.NotifyOutputReady(this, mCallback);
michael@0 1008 mCallback = 0;
michael@0 1009 mCallbackFlags = 0;
michael@0 1010 }
michael@0 1011 else if (mBlocked)
michael@0 1012 result = true;
michael@0 1013
michael@0 1014 return result;
michael@0 1015 }
michael@0 1016
michael@0 1017 bool
michael@0 1018 nsPipeOutputStream::OnOutputException(nsresult reason, nsPipeEvents &events)
michael@0 1019 {
michael@0 1020 LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n",
michael@0 1021 this, reason));
michael@0 1022
michael@0 1023 bool result = false;
michael@0 1024
michael@0 1025 NS_ASSERTION(NS_FAILED(reason), "huh? successful exception");
michael@0 1026 mWritable = false;
michael@0 1027
michael@0 1028 if (mCallback) {
michael@0 1029 events.NotifyOutputReady(this, mCallback);
michael@0 1030 mCallback = 0;
michael@0 1031 mCallbackFlags = 0;
michael@0 1032 }
michael@0 1033 else if (mBlocked)
michael@0 1034 result = true;
michael@0 1035
michael@0 1036 return result;
michael@0 1037 }
michael@0 1038
michael@0 1039
michael@0 1040 NS_IMETHODIMP_(MozExternalRefCountType)
michael@0 1041 nsPipeOutputStream::AddRef()
michael@0 1042 {
michael@0 1043 ++mWriterRefCnt;
michael@0 1044 return mPipe->AddRef();
michael@0 1045 }
michael@0 1046
michael@0 1047 NS_IMETHODIMP_(MozExternalRefCountType)
michael@0 1048 nsPipeOutputStream::Release()
michael@0 1049 {
michael@0 1050 if (--mWriterRefCnt == 0)
michael@0 1051 Close();
michael@0 1052 return mPipe->Release();
michael@0 1053 }
michael@0 1054
michael@0 1055 NS_IMETHODIMP
michael@0 1056 nsPipeOutputStream::CloseWithStatus(nsresult reason)
michael@0 1057 {
michael@0 1058 LOG(("OOO CloseWithStatus [this=%x reason=%x]\n", this, reason));
michael@0 1059
michael@0 1060 if (NS_SUCCEEDED(reason))
michael@0 1061 reason = NS_BASE_STREAM_CLOSED;
michael@0 1062
michael@0 1063 // input stream may remain open
michael@0 1064 mPipe->OnPipeException(reason, true);
michael@0 1065 return NS_OK;
michael@0 1066 }
michael@0 1067
michael@0 1068 NS_IMETHODIMP
michael@0 1069 nsPipeOutputStream::Close()
michael@0 1070 {
michael@0 1071 return CloseWithStatus(NS_BASE_STREAM_CLOSED);
michael@0 1072 }
michael@0 1073
michael@0 1074 NS_IMETHODIMP
michael@0 1075 nsPipeOutputStream::WriteSegments(nsReadSegmentFun reader,
michael@0 1076 void* closure,
michael@0 1077 uint32_t count,
michael@0 1078 uint32_t *writeCount)
michael@0 1079 {
michael@0 1080 LOG(("OOO WriteSegments [this=%x count=%u]\n", this, count));
michael@0 1081
michael@0 1082 nsresult rv = NS_OK;
michael@0 1083
michael@0 1084 char *segment;
michael@0 1085 uint32_t segmentLen;
michael@0 1086
michael@0 1087 *writeCount = 0;
michael@0 1088 while (count) {
michael@0 1089 rv = mPipe->GetWriteSegment(segment, segmentLen);
michael@0 1090 if (NS_FAILED(rv)) {
michael@0 1091 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
michael@0 1092 // pipe is full
michael@0 1093 if (!mBlocking) {
michael@0 1094 // ignore this error if we've already written something
michael@0 1095 if (*writeCount > 0)
michael@0 1096 rv = NS_OK;
michael@0 1097 break;
michael@0 1098 }
michael@0 1099 // wait for the pipe to have an empty segment.
michael@0 1100 rv = Wait();
michael@0 1101 if (NS_SUCCEEDED(rv))
michael@0 1102 continue;
michael@0 1103 }
michael@0 1104 mPipe->OnPipeException(rv);
michael@0 1105 break;
michael@0 1106 }
michael@0 1107
michael@0 1108 // write no more than count
michael@0 1109 if (segmentLen > count)
michael@0 1110 segmentLen = count;
michael@0 1111
michael@0 1112 uint32_t readCount, originalLen = segmentLen;
michael@0 1113 while (segmentLen) {
michael@0 1114 readCount = 0;
michael@0 1115
michael@0 1116 rv = reader(this, closure, segment, *writeCount, segmentLen, &readCount);
michael@0 1117
michael@0 1118 if (NS_FAILED(rv) || readCount == 0) {
michael@0 1119 count = 0;
michael@0 1120 // any errors returned from the reader end here: do not
michael@0 1121 // propagate to the caller of WriteSegments.
michael@0 1122 rv = NS_OK;
michael@0 1123 break;
michael@0 1124 }
michael@0 1125
michael@0 1126 NS_ASSERTION(readCount <= segmentLen, "read more than expected");
michael@0 1127 segment += readCount;
michael@0 1128 segmentLen -= readCount;
michael@0 1129 count -= readCount;
michael@0 1130 *writeCount += readCount;
michael@0 1131 mLogicalOffset += readCount;
michael@0 1132 }
michael@0 1133
michael@0 1134 if (segmentLen < originalLen)
michael@0 1135 mPipe->AdvanceWriteCursor(originalLen - segmentLen);
michael@0 1136 }
michael@0 1137
michael@0 1138 return rv;
michael@0 1139 }
michael@0 1140
michael@0 1141 static NS_METHOD
michael@0 1142 nsReadFromRawBuffer(nsIOutputStream* outStr,
michael@0 1143 void* closure,
michael@0 1144 char* toRawSegment,
michael@0 1145 uint32_t offset,
michael@0 1146 uint32_t count,
michael@0 1147 uint32_t *readCount)
michael@0 1148 {
michael@0 1149 const char* fromBuf = (const char*)closure;
michael@0 1150 memcpy(toRawSegment, &fromBuf[offset], count);
michael@0 1151 *readCount = count;
michael@0 1152 return NS_OK;
michael@0 1153 }
michael@0 1154
michael@0 1155 NS_IMETHODIMP
michael@0 1156 nsPipeOutputStream::Write(const char* fromBuf,
michael@0 1157 uint32_t bufLen,
michael@0 1158 uint32_t *writeCount)
michael@0 1159 {
michael@0 1160 return WriteSegments(nsReadFromRawBuffer, (void*)fromBuf, bufLen, writeCount);
michael@0 1161 }
michael@0 1162
michael@0 1163 NS_IMETHODIMP
michael@0 1164 nsPipeOutputStream::Flush(void)
michael@0 1165 {
michael@0 1166 // nothing to do
michael@0 1167 return NS_OK;
michael@0 1168 }
michael@0 1169
michael@0 1170 static NS_METHOD
michael@0 1171 nsReadFromInputStream(nsIOutputStream* outStr,
michael@0 1172 void* closure,
michael@0 1173 char* toRawSegment,
michael@0 1174 uint32_t offset,
michael@0 1175 uint32_t count,
michael@0 1176 uint32_t *readCount)
michael@0 1177 {
michael@0 1178 nsIInputStream* fromStream = (nsIInputStream*)closure;
michael@0 1179 return fromStream->Read(toRawSegment, count, readCount);
michael@0 1180 }
michael@0 1181
michael@0 1182 NS_IMETHODIMP
michael@0 1183 nsPipeOutputStream::WriteFrom(nsIInputStream* fromStream,
michael@0 1184 uint32_t count,
michael@0 1185 uint32_t *writeCount)
michael@0 1186 {
michael@0 1187 return WriteSegments(nsReadFromInputStream, fromStream, count, writeCount);
michael@0 1188 }
michael@0 1189
michael@0 1190 NS_IMETHODIMP
michael@0 1191 nsPipeOutputStream::IsNonBlocking(bool *aNonBlocking)
michael@0 1192 {
michael@0 1193 *aNonBlocking = !mBlocking;
michael@0 1194 return NS_OK;
michael@0 1195 }
michael@0 1196
michael@0 1197 NS_IMETHODIMP
michael@0 1198 nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback *callback,
michael@0 1199 uint32_t flags,
michael@0 1200 uint32_t requestedCount,
michael@0 1201 nsIEventTarget *target)
michael@0 1202 {
michael@0 1203 LOG(("OOO AsyncWait [this=%x]\n", this));
michael@0 1204
michael@0 1205 nsPipeEvents pipeEvents;
michael@0 1206 {
michael@0 1207 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
michael@0 1208
michael@0 1209 // replace a pending callback
michael@0 1210 mCallback = 0;
michael@0 1211 mCallbackFlags = 0;
michael@0 1212
michael@0 1213 if (!callback)
michael@0 1214 return NS_OK;
michael@0 1215
michael@0 1216 nsCOMPtr<nsIOutputStreamCallback> proxy;
michael@0 1217 if (target) {
michael@0 1218 proxy = NS_NewOutputStreamReadyEvent(callback, target);
michael@0 1219 callback = proxy;
michael@0 1220 }
michael@0 1221
michael@0 1222 if (NS_FAILED(mPipe->mStatus) ||
michael@0 1223 (mWritable && !(flags & WAIT_CLOSURE_ONLY))) {
michael@0 1224 // stream is already closed or writable; post event.
michael@0 1225 pipeEvents.NotifyOutputReady(this, callback);
michael@0 1226 }
michael@0 1227 else {
michael@0 1228 // queue up callback object to be notified when data becomes available
michael@0 1229 mCallback = callback;
michael@0 1230 mCallbackFlags = flags;
michael@0 1231 }
michael@0 1232 }
michael@0 1233 return NS_OK;
michael@0 1234 }
michael@0 1235
michael@0 1236 ////////////////////////////////////////////////////////////////////////////////
michael@0 1237
michael@0 1238 nsresult
michael@0 1239 NS_NewPipe(nsIInputStream **pipeIn,
michael@0 1240 nsIOutputStream **pipeOut,
michael@0 1241 uint32_t segmentSize,
michael@0 1242 uint32_t maxSize,
michael@0 1243 bool nonBlockingInput,
michael@0 1244 bool nonBlockingOutput)
michael@0 1245 {
michael@0 1246 if (segmentSize == 0)
michael@0 1247 segmentSize = DEFAULT_SEGMENT_SIZE;
michael@0 1248
michael@0 1249 // Handle maxSize of UINT32_MAX as a special case
michael@0 1250 uint32_t segmentCount;
michael@0 1251 if (maxSize == UINT32_MAX)
michael@0 1252 segmentCount = UINT32_MAX;
michael@0 1253 else
michael@0 1254 segmentCount = maxSize / segmentSize;
michael@0 1255
michael@0 1256 nsIAsyncInputStream *in;
michael@0 1257 nsIAsyncOutputStream *out;
michael@0 1258 nsresult rv = NS_NewPipe2(&in, &out, nonBlockingInput, nonBlockingOutput,
michael@0 1259 segmentSize, segmentCount);
michael@0 1260 if (NS_FAILED(rv)) return rv;
michael@0 1261
michael@0 1262 *pipeIn = in;
michael@0 1263 *pipeOut = out;
michael@0 1264 return NS_OK;
michael@0 1265 }
michael@0 1266
michael@0 1267 nsresult
michael@0 1268 NS_NewPipe2(nsIAsyncInputStream **pipeIn,
michael@0 1269 nsIAsyncOutputStream **pipeOut,
michael@0 1270 bool nonBlockingInput,
michael@0 1271 bool nonBlockingOutput,
michael@0 1272 uint32_t segmentSize,
michael@0 1273 uint32_t segmentCount)
michael@0 1274 {
michael@0 1275 nsresult rv;
michael@0 1276
michael@0 1277 nsPipe *pipe = new nsPipe();
michael@0 1278 if (!pipe)
michael@0 1279 return NS_ERROR_OUT_OF_MEMORY;
michael@0 1280
michael@0 1281 rv = pipe->Init(nonBlockingInput,
michael@0 1282 nonBlockingOutput,
michael@0 1283 segmentSize,
michael@0 1284 segmentCount);
michael@0 1285 if (NS_FAILED(rv)) {
michael@0 1286 NS_ADDREF(pipe);
michael@0 1287 NS_RELEASE(pipe);
michael@0 1288 return rv;
michael@0 1289 }
michael@0 1290
michael@0 1291 pipe->GetInputStream(pipeIn);
michael@0 1292 pipe->GetOutputStream(pipeOut);
michael@0 1293 return NS_OK;
michael@0 1294 }
michael@0 1295
michael@0 1296 nsresult
michael@0 1297 nsPipeConstructor(nsISupports *outer, REFNSIID iid, void **result)
michael@0 1298 {
michael@0 1299 if (outer)
michael@0 1300 return NS_ERROR_NO_AGGREGATION;
michael@0 1301 nsPipe *pipe = new nsPipe();
michael@0 1302 if (!pipe)
michael@0 1303 return NS_ERROR_OUT_OF_MEMORY;
michael@0 1304 NS_ADDREF(pipe);
michael@0 1305 nsresult rv = pipe->QueryInterface(iid, result);
michael@0 1306 NS_RELEASE(pipe);
michael@0 1307 return rv;
michael@0 1308 }
michael@0 1309
michael@0 1310 ////////////////////////////////////////////////////////////////////////////////

mercurial