1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/addon-sdk/source/lib/sdk/io/stream.js Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,436 @@ 1.4 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.5 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.6 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.7 +"use strict"; 1.8 + 1.9 +module.metadata = { 1.10 + "stability": "experimental" 1.11 +}; 1.12 + 1.13 +const { CC, Cc, Ci, Cu, Cr, components } = require("chrome"); 1.14 +const { EventTarget } = require("../event/target"); 1.15 +const { emit } = require("../event/core"); 1.16 +const { Buffer } = require("./buffer"); 1.17 +const { Class } = require("../core/heritage"); 1.18 +const { setTimeout } = require("../timers"); 1.19 + 1.20 + 1.21 +const MultiplexInputStream = CC("@mozilla.org/io/multiplex-input-stream;1", 1.22 + "nsIMultiplexInputStream"); 1.23 +const AsyncStreamCopier = CC("@mozilla.org/network/async-stream-copier;1", 1.24 + "nsIAsyncStreamCopier", "init"); 1.25 +const StringInputStream = CC("@mozilla.org/io/string-input-stream;1", 1.26 + "nsIStringInputStream"); 1.27 +const ArrayBufferInputStream = CC("@mozilla.org/io/arraybuffer-input-stream;1", 1.28 + "nsIArrayBufferInputStream"); 1.29 + 1.30 +const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1", 1.31 + "nsIBinaryInputStream", "setInputStream"); 1.32 +const InputStreamPump = CC("@mozilla.org/network/input-stream-pump;1", 1.33 + "nsIInputStreamPump", "init"); 1.34 + 1.35 +const threadManager = Cc["@mozilla.org/thread-manager;1"]. 1.36 + getService(Ci.nsIThreadManager); 1.37 + 1.38 +const eventTarget = Cc["@mozilla.org/network/socket-transport-service;1"]. 1.39 + getService(Ci.nsIEventTarget); 1.40 + 1.41 +let isFunction = value => typeof(value) === "function" 1.42 + 1.43 +function accessor() { 1.44 + let map = new WeakMap(); 1.45 + return function(target, value) { 1.46 + if (value) 1.47 + map.set(target, value); 1.48 + return map.get(target); 1.49 + } 1.50 +} 1.51 + 1.52 +const Stream = Class({ 1.53 + extends: EventTarget, 1.54 + initialize: function() { 1.55 + this.readable = false; 1.56 + this.writable = false; 1.57 + this.encoding = null; 1.58 + }, 1.59 + setEncoding: function setEncoding(encoding) { 1.60 + this.encoding = String(encoding).toUpperCase(); 1.61 + }, 1.62 + pipe: function pipe(target, options) { 1.63 + let source = this; 1.64 + function onData(chunk) { 1.65 + if (target.writable) { 1.66 + if (false === target.write(chunk)) 1.67 + source.pause(); 1.68 + } 1.69 + } 1.70 + function onDrain() { 1.71 + if (source.readable) 1.72 + source.resume(); 1.73 + } 1.74 + function onEnd() { 1.75 + target.end(); 1.76 + } 1.77 + function onPause() { 1.78 + source.pause(); 1.79 + } 1.80 + function onResume() { 1.81 + if (source.readable) 1.82 + source.resume(); 1.83 + } 1.84 + 1.85 + function cleanup() { 1.86 + source.removeListener("data", onData); 1.87 + target.removeListener("drain", onDrain); 1.88 + source.removeListener("end", onEnd); 1.89 + 1.90 + target.removeListener("pause", onPause); 1.91 + target.removeListener("resume", onResume); 1.92 + 1.93 + source.removeListener("end", cleanup); 1.94 + source.removeListener("close", cleanup); 1.95 + 1.96 + target.removeListener("end", cleanup); 1.97 + target.removeListener("close", cleanup); 1.98 + } 1.99 + 1.100 + if (!options || options.end !== false) 1.101 + target.on("end", onEnd); 1.102 + 1.103 + source.on("data", onData); 1.104 + target.on("drain", onDrain); 1.105 + target.on("resume", onResume); 1.106 + target.on("pause", onPause); 1.107 + 1.108 + source.on("end", cleanup); 1.109 + source.on("close", cleanup); 1.110 + 1.111 + target.on("end", cleanup); 1.112 + target.on("close", cleanup); 1.113 + 1.114 + emit(target, "pipe", source); 1.115 + }, 1.116 + pause: function pause() { 1.117 + emit(this, "pause"); 1.118 + }, 1.119 + resume: function resume() { 1.120 + emit(this, "resume"); 1.121 + }, 1.122 + destroySoon: function destroySoon() { 1.123 + this.destroy(); 1.124 + } 1.125 +}); 1.126 +exports.Stream = Stream; 1.127 + 1.128 + 1.129 +let nsIStreamListener = accessor(); 1.130 +let nsIInputStreamPump = accessor(); 1.131 +let nsIAsyncInputStream = accessor(); 1.132 +let nsIBinaryInputStream = accessor(); 1.133 + 1.134 +const StreamListener = Class({ 1.135 + initialize: function(stream) { 1.136 + this.stream = stream; 1.137 + }, 1.138 + 1.139 + // Next three methods are part of `nsIStreamListener` interface and are 1.140 + // invoked by `nsIInputStreamPump.asyncRead`. 1.141 + onDataAvailable: function(request, context, input, offset, count) { 1.142 + let stream = this.stream; 1.143 + let buffer = new ArrayBuffer(count); 1.144 + nsIBinaryInputStream(stream).readArrayBuffer(count, buffer); 1.145 + emit(stream, "data", new Buffer(buffer)); 1.146 + }, 1.147 + 1.148 + // Next two methods implement `nsIRequestObserver` interface and are invoked 1.149 + // by `nsIInputStreamPump.asyncRead`. 1.150 + onStartRequest: function() {}, 1.151 + // Called to signify the end of an asynchronous request. We only care to 1.152 + // discover errors. 1.153 + onStopRequest: function(request, context, status) { 1.154 + let stream = this.stream; 1.155 + stream.readable = false; 1.156 + if (!components.isSuccessCode(status)) 1.157 + emit(stream, "error", status); 1.158 + else 1.159 + emit(stream, "end"); 1.160 + } 1.161 +}); 1.162 + 1.163 + 1.164 +const InputStream = Class({ 1.165 + extends: Stream, 1.166 + readable: false, 1.167 + paused: false, 1.168 + initialize: function initialize(options) { 1.169 + let { asyncInputStream } = options; 1.170 + 1.171 + this.readable = true; 1.172 + 1.173 + let binaryInputStream = new BinaryInputStream(asyncInputStream); 1.174 + let inputStreamPump = new InputStreamPump(asyncInputStream, 1.175 + -1, -1, 0, 0, false); 1.176 + let streamListener = new StreamListener(this); 1.177 + 1.178 + nsIAsyncInputStream(this, asyncInputStream); 1.179 + nsIInputStreamPump(this, inputStreamPump); 1.180 + nsIBinaryInputStream(this, binaryInputStream); 1.181 + nsIStreamListener(this, streamListener); 1.182 + 1.183 + this.asyncInputStream = asyncInputStream; 1.184 + this.inputStreamPump = inputStreamPump; 1.185 + this.binaryInputStream = binaryInputStream; 1.186 + }, 1.187 + get status() nsIInputStreamPump(this).status, 1.188 + read: function() { 1.189 + nsIInputStreamPump(this).asyncRead(nsIStreamListener(this), null); 1.190 + }, 1.191 + pause: function pause() { 1.192 + this.paused = true; 1.193 + nsIInputStreamPump(this).suspend(); 1.194 + emit(this, "paused"); 1.195 + }, 1.196 + resume: function resume() { 1.197 + this.paused = false; 1.198 + nsIInputStreamPump(this).resume(); 1.199 + emit(this, "resume"); 1.200 + }, 1.201 + close: function close() { 1.202 + this.readable = false; 1.203 + nsIInputStreamPump(this).cancel(Cr.NS_OK); 1.204 + nsIBinaryInputStream(this).close(); 1.205 + nsIAsyncInputStream(this).close(); 1.206 + }, 1.207 + destroy: function destroy() { 1.208 + this.close(); 1.209 + 1.210 + nsIInputStreamPump(this); 1.211 + nsIAsyncInputStream(this); 1.212 + nsIBinaryInputStream(this); 1.213 + nsIStreamListener(this); 1.214 + } 1.215 +}); 1.216 +exports.InputStream = InputStream; 1.217 + 1.218 + 1.219 + 1.220 +let nsIRequestObserver = accessor(); 1.221 +let nsIAsyncOutputStream = accessor(); 1.222 +let nsIAsyncStreamCopier = accessor(); 1.223 +let nsIMultiplexInputStream = accessor(); 1.224 + 1.225 +const RequestObserver = Class({ 1.226 + initialize: function(stream) { 1.227 + this.stream = stream; 1.228 + }, 1.229 + // Method is part of `nsIRequestObserver` interface that is 1.230 + // invoked by `nsIAsyncStreamCopier.asyncCopy`. 1.231 + onStartRequest: function() {}, 1.232 + // Method is part of `nsIRequestObserver` interface that is 1.233 + // invoked by `nsIAsyncStreamCopier.asyncCopy`. 1.234 + onStopRequest: function(request, context, status) { 1.235 + let stream = this.stream; 1.236 + stream.drained = true; 1.237 + 1.238 + // Remove copied chunk. 1.239 + let multiplexInputStream = nsIMultiplexInputStream(stream); 1.240 + multiplexInputStream.removeStream(0); 1.241 + 1.242 + // If there was an error report. 1.243 + if (!components.isSuccessCode(status)) 1.244 + emit(stream, "error", status); 1.245 + 1.246 + // If there more chunks in queue then flush them. 1.247 + else if (multiplexInputStream.count) 1.248 + stream.flush(); 1.249 + 1.250 + // If stream is still writable notify that queue has drained. 1.251 + else if (stream.writable) 1.252 + emit(stream, "drain"); 1.253 + 1.254 + // If stream is no longer writable close it. 1.255 + else { 1.256 + nsIAsyncStreamCopier(stream).cancel(Cr.NS_OK); 1.257 + nsIMultiplexInputStream(stream).close(); 1.258 + nsIAsyncOutputStream(stream).close(); 1.259 + nsIAsyncOutputStream(stream).flush(); 1.260 + } 1.261 + } 1.262 +}); 1.263 + 1.264 +const OutputStreamCallback = Class({ 1.265 + initialize: function(stream) { 1.266 + this.stream = stream; 1.267 + }, 1.268 + // Method is part of `nsIOutputStreamCallback` interface that 1.269 + // is invoked by `nsIAsyncOutputStream.asyncWait`. It is registered 1.270 + // with `WAIT_CLOSURE_ONLY` flag that overrides the default behavior, 1.271 + // causing the `onOutputStreamReady` notification to be suppressed until 1.272 + // the stream becomes closed. 1.273 + onOutputStreamReady: function(nsIAsyncOutputStream) { 1.274 + emit(this.stream, "finish"); 1.275 + } 1.276 +}); 1.277 + 1.278 +const OutputStream = Class({ 1.279 + extends: Stream, 1.280 + writable: false, 1.281 + drained: true, 1.282 + get bufferSize() { 1.283 + let multiplexInputStream = nsIMultiplexInputStream(this); 1.284 + return multiplexInputStream && multiplexInputStream.available(); 1.285 + }, 1.286 + initialize: function initialize(options) { 1.287 + let { asyncOutputStream, output } = options; 1.288 + this.writable = true; 1.289 + 1.290 + // Ensure that `nsIAsyncOutputStream` was provided. 1.291 + asyncOutputStream.QueryInterface(Ci.nsIAsyncOutputStream); 1.292 + 1.293 + // Create a `nsIMultiplexInputStream` and `nsIAsyncStreamCopier`. Former 1.294 + // is used to queue written data chunks that `asyncStreamCopier` will 1.295 + // asynchronously drain into `asyncOutputStream`. 1.296 + let multiplexInputStream = MultiplexInputStream(); 1.297 + let asyncStreamCopier = AsyncStreamCopier(multiplexInputStream, 1.298 + output || asyncOutputStream, 1.299 + eventTarget, 1.300 + // nsIMultiplexInputStream 1.301 + // implemnts .readSegments() 1.302 + true, 1.303 + // nsIOutputStream may or 1.304 + // may not implemnet 1.305 + // .writeSegments(). 1.306 + false, 1.307 + // Use default buffer size. 1.308 + null, 1.309 + // Should not close an input. 1.310 + false, 1.311 + // Should not close an output. 1.312 + false); 1.313 + 1.314 + // Create `requestObserver` implementing `nsIRequestObserver` interface 1.315 + // in the constructor that's gonna be reused across several flushes. 1.316 + let requestObserver = RequestObserver(this); 1.317 + 1.318 + 1.319 + // Create observer that implements `nsIOutputStreamCallback` and register 1.320 + // using `WAIT_CLOSURE_ONLY` flag. That way it will be notfied once 1.321 + // `nsIAsyncOutputStream` is closed. 1.322 + asyncOutputStream.asyncWait(OutputStreamCallback(this), 1.323 + asyncOutputStream.WAIT_CLOSURE_ONLY, 1.324 + 0, 1.325 + threadManager.currentThread); 1.326 + 1.327 + nsIRequestObserver(this, requestObserver); 1.328 + nsIAsyncOutputStream(this, asyncOutputStream); 1.329 + nsIMultiplexInputStream(this, multiplexInputStream); 1.330 + nsIAsyncStreamCopier(this, asyncStreamCopier); 1.331 + 1.332 + this.asyncOutputStream = asyncOutputStream; 1.333 + this.multiplexInputStream = multiplexInputStream; 1.334 + this.asyncStreamCopier = asyncStreamCopier; 1.335 + }, 1.336 + write: function write(content, encoding, callback) { 1.337 + if (isFunction(encoding)) { 1.338 + callback = encoding; 1.339 + encoding = callback; 1.340 + } 1.341 + 1.342 + // If stream is not writable we throw an error. 1.343 + if (!this.writable) throw Error("stream is not writable"); 1.344 + 1.345 + let chunk = null; 1.346 + 1.347 + // If content is not a buffer then we create one out of it. 1.348 + if (Buffer.isBuffer(content)) { 1.349 + chunk = new ArrayBufferInputStream(); 1.350 + chunk.setData(content.buffer, 0, content.length); 1.351 + } 1.352 + else { 1.353 + chunk = new StringInputStream(); 1.354 + chunk.setData(content, content.length); 1.355 + } 1.356 + 1.357 + if (callback) 1.358 + this.once("drain", callback); 1.359 + 1.360 + // Queue up chunk to be copied to output sync. 1.361 + nsIMultiplexInputStream(this).appendStream(chunk); 1.362 + this.flush(); 1.363 + 1.364 + return this.drained; 1.365 + }, 1.366 + flush: function() { 1.367 + if (this.drained) { 1.368 + this.drained = false; 1.369 + nsIAsyncStreamCopier(this).asyncCopy(nsIRequestObserver(this), null); 1.370 + } 1.371 + }, 1.372 + end: function end(content, encoding, callback) { 1.373 + if (isFunction(content)) { 1.374 + callback = content 1.375 + content = callback 1.376 + } 1.377 + if (isFunction(encoding)) { 1.378 + callback = encoding 1.379 + encoding = callback 1.380 + } 1.381 + 1.382 + // Setting a listener to "finish" event if passed. 1.383 + if (isFunction(callback)) 1.384 + this.once("finish", callback); 1.385 + 1.386 + 1.387 + if (content) 1.388 + this.write(content, encoding); 1.389 + this.writable = false; 1.390 + 1.391 + // Close `asyncOutputStream` only if output has drained. If it's 1.392 + // not drained than `asyncStreamCopier` is busy writing, so let 1.393 + // it finish. Note that since `this.writable` is false copier will 1.394 + // close `asyncOutputStream` once output drains. 1.395 + if (this.drained) 1.396 + nsIAsyncOutputStream(this).close(); 1.397 + }, 1.398 + destroy: function destroy() { 1.399 + nsIAsyncOutputStream(this).close(); 1.400 + nsIAsyncOutputStream(this); 1.401 + nsIMultiplexInputStream(this); 1.402 + nsIAsyncStreamCopier(this); 1.403 + nsIRequestObserver(this); 1.404 + } 1.405 +}); 1.406 +exports.OutputStream = OutputStream; 1.407 + 1.408 +const DuplexStream = Class({ 1.409 + extends: Stream, 1.410 + implements: [InputStream, OutputStream], 1.411 + allowHalfOpen: true, 1.412 + initialize: function initialize(options) { 1.413 + options = options || {}; 1.414 + let { readable, writable, allowHalfOpen } = options; 1.415 + 1.416 + InputStream.prototype.initialize.call(this, options); 1.417 + OutputStream.prototype.initialize.call(this, options); 1.418 + 1.419 + if (readable === false) 1.420 + this.readable = false; 1.421 + 1.422 + if (writable === false) 1.423 + this.writable = false; 1.424 + 1.425 + if (allowHalfOpen === false) 1.426 + this.allowHalfOpen = false; 1.427 + 1.428 + // If in a half open state and it's disabled enforce end. 1.429 + this.once("end", () => { 1.430 + if (!this.allowHalfOpen && (!this.readable || !this.writable)) 1.431 + this.end(); 1.432 + }); 1.433 + }, 1.434 + destroy: function destroy(error) { 1.435 + InputStream.prototype.destroy.call(this); 1.436 + OutputStream.prototype.destroy.call(this); 1.437 + } 1.438 +}); 1.439 +exports.DuplexStream = DuplexStream;