/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";

module.metadata = {
  "stability": "experimental"
};

const { CC, Cc, Ci, Cu, Cr, components } = require("chrome");
const { EventTarget } = require("../event/target");
const { emit } = require("../event/core");
const { Buffer } = require("./buffer");
const { Class } = require("../core/heritage");
const { setTimeout } = require("../timers");


const MultiplexInputStream = CC("@mozilla.org/io/multiplex-input-stream;1",
                                "nsIMultiplexInputStream");
const AsyncStreamCopier = CC("@mozilla.org/network/async-stream-copier;1",
                             "nsIAsyncStreamCopier", "init");
const StringInputStream = CC("@mozilla.org/io/string-input-stream;1",
                             "nsIStringInputStream");
const ArrayBufferInputStream = CC("@mozilla.org/io/arraybuffer-input-stream;1",
                                  "nsIArrayBufferInputStream");

const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1",
                             "nsIBinaryInputStream", "setInputStream");
const InputStreamPump = CC("@mozilla.org/network/input-stream-pump;1",
                           "nsIInputStreamPump", "init");

const threadManager = Cc["@mozilla.org/thread-manager;1"].
                      getService(Ci.nsIThreadManager);

const eventTarget = Cc["@mozilla.org/network/socket-transport-service;1"].
                    getService(Ci.nsIEventTarget);

// Returns `true` when `value` is callable.
let isFunction = value => typeof(value) === "function";

/**
 * Creates a private-field accessor backed by a `WeakMap`.
 *
 * The returned function stores `value` for `target` when `value` is truthy
 * and always returns the value currently associated with `target`. Calling
 * it without a value (or with a falsy one) only reads; it never removes an
 * existing entry.
 */
function accessor() {
  let map = new WeakMap();
  return function(target, value) {
    if (value)
      map.set(target, value);
    return map.get(target);
  };
}

/**
 * Base class for the streams of this module. It is an `EventTarget` that
 * carries the `readable` / `writable` / `encoding` fields and implements
 * `pipe`. `pause` and `resume` here only emit the corresponding events.
 */
const Stream = Class({
  extends: EventTarget,
  initialize: function() {
    this.readable = false;
    this.writable = false;
    this.encoding = null;
  },
  // Stores the upper-cased string form of `encoding` on the stream.
  setEncoding: function setEncoding(encoding) {
    this.encoding = String(encoding).toUpperCase();
  },
  /**
   * Forwards every "data" chunk of this stream to `target.write`, pausing
   * this stream when `target.write` returns `false` and resuming it when
   * `target` emits "drain".
   *
   * @param target
   *        Stream to write chunks to.
   * @param {Object} [options]
   *        Unless `options.end` is `false`, `target.end()` is called once
   *        this stream emits "end".
   */
  pipe: function pipe(target, options) {
    let source = this;
    function onData(chunk) {
      if (target.writable) {
        if (false === target.write(chunk))
          source.pause();
      }
    }
    function onDrain() {
      if (source.readable)
        source.resume();
    }
    function onEnd() {
      target.end();
    }
    function onPause() {
      source.pause();
    }
    function onResume() {
      if (source.readable)
        source.resume();
    }

    // Detaches every listener installed by this `pipe` call.
    function cleanup() {
      source.removeListener("data", onData);
      target.removeListener("drain", onDrain);
      source.removeListener("end", onEnd);

      target.removeListener("pause", onPause);
      target.removeListener("resume", onResume);

      source.removeListener("end", cleanup);
      source.removeListener("close", cleanup);

      target.removeListener("end", cleanup);
      target.removeListener("close", cleanup);
    }

    // The target has to be ended when the *source* ends, so the listener
    // goes on `source` (this is also where `cleanup` removes it from). It is
    // registered before `cleanup` so it runs before being detached.
    if (!options || options.end !== false)
      source.on("end", onEnd);

    source.on("data", onData);
    target.on("drain", onDrain);
    target.on("resume", onResume);
    target.on("pause", onPause);

    source.on("end", cleanup);
    source.on("close", cleanup);

    target.on("end", cleanup);
    target.on("close", cleanup);

    emit(target, "pipe", source);
  },
  pause: function pause() {
    emit(this, "pause");
  },
  resume: function resume() {
    emit(this, "resume");
  },
  // Delegates to `destroy`, which concrete streams are expected to define.
  destroySoon: function destroySoon() {
    this.destroy();
  }
});
exports.Stream = Stream;


let nsIStreamListener = accessor();
let nsIInputStreamPump = accessor();
let nsIAsyncInputStream = accessor();
let nsIBinaryInputStream = accessor();

/**
 * Listener handed to `nsIInputStreamPump.asyncRead` that turns pump
 * notifications into "data", "end" and "error" events on `stream`.
 */
const StreamListener = Class({
  initialize: function(stream) {
    this.stream = stream;
  },

  // Next three methods are part of `nsIStreamListener` interface and are
  // invoked by `nsIInputStreamPump.asyncRead`.
  onDataAvailable: function(request, context, input, offset, count) {
    let stream = this.stream;
    // Read the `count` available bytes through the stream's binary input
    // stream and hand them out wrapped in a `Buffer`.
    let buffer = new ArrayBuffer(count);
    nsIBinaryInputStream(stream).readArrayBuffer(count, buffer);
    emit(stream, "data", new Buffer(buffer));
  },

  // Next two methods implement `nsIRequestObserver` interface and are invoked
  // by `nsIInputStreamPump.asyncRead`.
  onStartRequest: function() {},
  // Called to signify the end of an asynchronous request. Marks the stream
  // as no longer readable and emits "error" with the status on failure or
  // "end" on success.
  onStopRequest: function(request, context, status) {
    let stream = this.stream;
    stream.readable = false;
    if (!components.isSuccessCode(status))
      emit(stream, "error", status);
    else
      emit(stream, "end");
  }
});


/**
 * Readable stream on top of the `asyncInputStream` passed in `options`.
 * Data is pumped through an `nsIInputStreamPump` once `read` is called.
 */
const InputStream = Class({
  extends: Stream,
  readable: false,
  paused: false,
  initialize: function initialize(options) {
    let { asyncInputStream } = options;

    this.readable = true;

    let binaryInputStream = new BinaryInputStream(asyncInputStream);
    let inputStreamPump = new InputStreamPump(asyncInputStream,
                                              -1, -1, 0, 0, false);
    let streamListener = new StreamListener(this);

    nsIAsyncInputStream(this, asyncInputStream);
    nsIInputStreamPump(this, inputStreamPump);
    nsIBinaryInputStream(this, binaryInputStream);
    nsIStreamListener(this, streamListener);

    this.asyncInputStream = asyncInputStream;
    this.inputStreamPump = inputStreamPump;
    this.binaryInputStream = binaryInputStream;
  },
  // Status of the underlying input stream pump.
  get status() {
    return nsIInputStreamPump(this).status;
  },
  // Starts the asynchronous read; events are emitted by the stream listener.
  read: function() {
    nsIInputStreamPump(this).asyncRead(nsIStreamListener(this), null);
  },
  pause: function pause() {
    this.paused = true;
    nsIInputStreamPump(this).suspend();
    // NOTE(review): this emits "paused" whereas `Stream.pause` emits
    // "pause" - confirm the difference is intentional.
    emit(this, "paused");
  },
  resume: function resume() {
    this.paused = false;
    nsIInputStreamPump(this).resume();
    emit(this, "resume");
  },
  // Marks the stream unreadable, cancels the pump and closes both the binary
  // and the underlying async input stream.
  close: function close() {
    this.readable = false;
    nsIInputStreamPump(this).cancel(Cr.NS_OK);
    nsIBinaryInputStream(this).close();
    nsIAsyncInputStream(this).close();
  },
  destroy: function destroy() {
    this.close();

    // These calls pass no value, so they only read the accessors and leave
    // the stored references in place.
    nsIInputStreamPump(this);
    nsIAsyncInputStream(this);
    nsIBinaryInputStream(this);
    nsIStreamListener(this);
  }
});
exports.InputStream = InputStream;



let nsIRequestObserver = accessor();
let nsIAsyncOutputStream = accessor();
let nsIAsyncStreamCopier = accessor();
let nsIMultiplexInputStream = accessor();

/**
 * Observer handed to `nsIAsyncStreamCopier.asyncCopy`. After each copy it
 * removes the copied chunk from the queue and decides whether to report an
 * error, flush the next chunk, emit "drain" or close the output.
 */
const RequestObserver = Class({
  initialize: function(stream) {
    this.stream = stream;
  },
  // Method is part of `nsIRequestObserver` interface that is
  // invoked by `nsIAsyncStreamCopier.asyncCopy`.
  onStartRequest: function() {},
  // Method is part of `nsIRequestObserver` interface that is
  // invoked by `nsIAsyncStreamCopier.asyncCopy`.
  onStopRequest: function(request, context, status) {
    let stream = this.stream;
    stream.drained = true;

    // Remove copied chunk.
    let multiplexInputStream = nsIMultiplexInputStream(stream);
    multiplexInputStream.removeStream(0);

    // If there was an error report.
    if (!components.isSuccessCode(status))
      emit(stream, "error", status);

    // If there are more chunks in the queue then flush them.
    else if (multiplexInputStream.count)
      stream.flush();

    // If stream is still writable notify that queue has drained.
    else if (stream.writable)
      emit(stream, "drain");

    // If stream is no longer writable close it.
    else {
      nsIAsyncStreamCopier(stream).cancel(Cr.NS_OK);
      nsIMultiplexInputStream(stream).close();
      // NOTE(review): `flush()` is invoked after `close()` - confirm this
      // order is intentional.
      nsIAsyncOutputStream(stream).close();
      nsIAsyncOutputStream(stream).flush();
    }
  }
});

const OutputStreamCallback = Class({
  initialize: function(stream) {
    this.stream = stream;
  },
  // Method is part of `nsIOutputStreamCallback` interface that
  // is invoked by `nsIAsyncOutputStream.asyncWait`. It is registered
  // with `WAIT_CLOSURE_ONLY` flag that overrides the default behavior,
  // causing the `onOutputStreamReady` notification to be suppressed until
  // the stream becomes closed.
  onOutputStreamReady: function(nsIAsyncOutputStream) {
    emit(this.stream, "finish");
  }
});

/**
 * Writable stream on top of the `asyncOutputStream` passed in `options`.
 * Written chunks are queued in an `nsIMultiplexInputStream` and copied to
 * the output asynchronously by an `nsIAsyncStreamCopier`.
 */
const OutputStream = Class({
  extends: Stream,
  writable: false,
  drained: true,
  // Result of `available()` on the chunk queue (falsy if there is no queue).
  get bufferSize() {
    let multiplexInputStream = nsIMultiplexInputStream(this);
    return multiplexInputStream && multiplexInputStream.available();
  },
  initialize: function initialize(options) {
    let { asyncOutputStream, output } = options;
    this.writable = true;

    // Ensure that `nsIAsyncOutputStream` was provided.
    asyncOutputStream.QueryInterface(Ci.nsIAsyncOutputStream);

    // Create a `nsIMultiplexInputStream` and `nsIAsyncStreamCopier`. Former
    // is used to queue written data chunks that `asyncStreamCopier` will
    // asynchronously drain into `asyncOutputStream`.
    let multiplexInputStream = MultiplexInputStream();
    let asyncStreamCopier = AsyncStreamCopier(multiplexInputStream,
                                              output || asyncOutputStream,
                                              eventTarget,
                                              // nsIMultiplexInputStream
                                              // implements .readSegments()
                                              true,
                                              // nsIOutputStream may or
                                              // may not implement
                                              // .writeSegments().
                                              false,
                                              // Use default buffer size.
                                              null,
                                              // Should not close an input.
                                              false,
                                              // Should not close an output.
                                              false);

    // Create `requestObserver` implementing `nsIRequestObserver` interface
    // in the constructor that's gonna be reused across several flushes.
    let requestObserver = RequestObserver(this);


    // Create observer that implements `nsIOutputStreamCallback` and register
    // using `WAIT_CLOSURE_ONLY` flag. That way it will be notified once
    // `nsIAsyncOutputStream` is closed.
    asyncOutputStream.asyncWait(OutputStreamCallback(this),
                                asyncOutputStream.WAIT_CLOSURE_ONLY,
                                0,
                                threadManager.currentThread);

    nsIRequestObserver(this, requestObserver);
    nsIAsyncOutputStream(this, asyncOutputStream);
    nsIMultiplexInputStream(this, multiplexInputStream);
    nsIAsyncStreamCopier(this, asyncStreamCopier);

    this.asyncOutputStream = asyncOutputStream;
    this.multiplexInputStream = multiplexInputStream;
    this.asyncStreamCopier = asyncStreamCopier;
  },
  /**
   * Queues `content` to be copied to the output.
   *
   * @param content
   *        A `Buffer`, or a value passed as is to `nsIStringInputStream`.
   * @param [encoding]
   *        Currently not used by this method; may be omitted, in which case
   *        a function given in its place is taken as `callback`.
   * @param {Function} [callback]
   *        Registered as a one-off "drain" listener.
   * @returns {Boolean} the `drained` flag after the chunk has been queued.
   * @throws {Error} if the stream is not writable.
   */
  write: function write(content, encoding, callback) {
    if (isFunction(encoding)) {
      callback = encoding;
      // The second argument was the callback, so there is no encoding.
      encoding = null;
    }

    // If stream is not writable we throw an error.
    if (!this.writable) throw Error("stream is not writable");

    let chunk = null;

    // Buffers are fed through an array buffer input stream, anything else
    // through a string input stream.
    if (Buffer.isBuffer(content)) {
      chunk = new ArrayBufferInputStream();
      chunk.setData(content.buffer, 0, content.length);
    }
    else {
      chunk = new StringInputStream();
      chunk.setData(content, content.length);
    }

    if (callback)
      this.once("drain", callback);

    // Queue up chunk to be copied to output sync.
    nsIMultiplexInputStream(this).appendStream(chunk);
    this.flush();

    return this.drained;
  },
  // Starts an asynchronous copy of the queue unless one is already running.
  flush: function() {
    if (this.drained) {
      this.drained = false;
      nsIAsyncStreamCopier(this).asyncCopy(nsIRequestObserver(this), null);
    }
  },
  /**
   * Optionally writes a final chunk and marks the stream as not writable.
   * Accepts `end(callback)`, `end(content, callback)` and
   * `end(content, encoding, callback)`.
   *
   * @param [content]
   *        Final chunk, written with `write` when truthy.
   * @param [encoding]
   *        Passed through to `write`.
   * @param {Function} [callback]
   *        Registered as a one-off "finish" listener.
   */
  end: function end(content, encoding, callback) {
    if (isFunction(content)) {
      // `end(callback)`: there is nothing to write.
      callback = content;
      content = null;
      encoding = null;
    }
    else if (isFunction(encoding)) {
      // `end(content, callback)`: no encoding was given.
      callback = encoding;
      encoding = null;
    }

    // Setting a listener to "finish" event if passed.
    if (isFunction(callback))
      this.once("finish", callback);


    if (content)
      this.write(content, encoding);
    this.writable = false;

    // Close `asyncOutputStream` only if output has drained. If it's
    // not drained then `asyncStreamCopier` is busy writing, so let
    // it finish. Note that since `this.writable` is false copier will
    // close `asyncOutputStream` once output drains.
    if (this.drained)
      nsIAsyncOutputStream(this).close();
  },
  destroy: function destroy() {
    nsIAsyncOutputStream(this).close();
    // These calls pass no value, so they only read the accessors and leave
    // the stored references in place.
    nsIAsyncOutputStream(this);
    nsIMultiplexInputStream(this);
    nsIAsyncStreamCopier(this);
    nsIRequestObserver(this);
  }
});
exports.OutputStream = OutputStream;

/**
 * Stream that is both an `InputStream` and an `OutputStream`; `options` is
 * handed to both initializers. `readable`, `writable` and `allowHalfOpen`
 * can be switched off through `options`.
 */
const DuplexStream = Class({
  extends: Stream,
  implements: [InputStream, OutputStream],
  allowHalfOpen: true,
  initialize: function initialize(options) {
    options = options || {};
    let { readable, writable, allowHalfOpen } = options;

    InputStream.prototype.initialize.call(this, options);
    OutputStream.prototype.initialize.call(this, options);

    if (readable === false)
      this.readable = false;

    if (writable === false)
      this.writable = false;

    if (allowHalfOpen === false)
      this.allowHalfOpen = false;

    // If in a half open state and it's disabled enforce end.
    this.once("end", () => {
      if (!this.allowHalfOpen && (!this.readable || !this.writable))
        this.end();
    });
  },
  destroy: function destroy(error) {
    InputStream.prototype.destroy.call(this);
    OutputStream.prototype.destroy.call(this);
  }
});
exports.DuplexStream = DuplexStream;