addon-sdk/source/lib/sdk/io/stream.js

changeset 0
6474c204b198
equal deleted inserted replaced
-1:000000000000 0:13e71684a540
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 "use strict";
5
6 module.metadata = {
7 "stability": "experimental"
8 };
9
10 const { CC, Cc, Ci, Cu, Cr, components } = require("chrome");
11 const { EventTarget } = require("../event/target");
12 const { emit } = require("../event/core");
13 const { Buffer } = require("./buffer");
14 const { Class } = require("../core/heritage");
15 const { setTimeout } = require("../timers");
16
17
18 const MultiplexInputStream = CC("@mozilla.org/io/multiplex-input-stream;1",
19 "nsIMultiplexInputStream");
20 const AsyncStreamCopier = CC("@mozilla.org/network/async-stream-copier;1",
21 "nsIAsyncStreamCopier", "init");
22 const StringInputStream = CC("@mozilla.org/io/string-input-stream;1",
23 "nsIStringInputStream");
24 const ArrayBufferInputStream = CC("@mozilla.org/io/arraybuffer-input-stream;1",
25 "nsIArrayBufferInputStream");
26
27 const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1",
28 "nsIBinaryInputStream", "setInputStream");
29 const InputStreamPump = CC("@mozilla.org/network/input-stream-pump;1",
30 "nsIInputStreamPump", "init");
31
32 const threadManager = Cc["@mozilla.org/thread-manager;1"].
33 getService(Ci.nsIThreadManager);
34
35 const eventTarget = Cc["@mozilla.org/network/socket-transport-service;1"].
36 getService(Ci.nsIEventTarget);
37
38 let isFunction = value => typeof(value) === "function"
39
40 function accessor() {
41 let map = new WeakMap();
42 return function(target, value) {
43 if (value)
44 map.set(target, value);
45 return map.get(target);
46 }
47 }
48
49 const Stream = Class({
50 extends: EventTarget,
51 initialize: function() {
52 this.readable = false;
53 this.writable = false;
54 this.encoding = null;
55 },
56 setEncoding: function setEncoding(encoding) {
57 this.encoding = String(encoding).toUpperCase();
58 },
59 pipe: function pipe(target, options) {
60 let source = this;
61 function onData(chunk) {
62 if (target.writable) {
63 if (false === target.write(chunk))
64 source.pause();
65 }
66 }
67 function onDrain() {
68 if (source.readable)
69 source.resume();
70 }
71 function onEnd() {
72 target.end();
73 }
74 function onPause() {
75 source.pause();
76 }
77 function onResume() {
78 if (source.readable)
79 source.resume();
80 }
81
82 function cleanup() {
83 source.removeListener("data", onData);
84 target.removeListener("drain", onDrain);
85 source.removeListener("end", onEnd);
86
87 target.removeListener("pause", onPause);
88 target.removeListener("resume", onResume);
89
90 source.removeListener("end", cleanup);
91 source.removeListener("close", cleanup);
92
93 target.removeListener("end", cleanup);
94 target.removeListener("close", cleanup);
95 }
96
97 if (!options || options.end !== false)
98 target.on("end", onEnd);
99
100 source.on("data", onData);
101 target.on("drain", onDrain);
102 target.on("resume", onResume);
103 target.on("pause", onPause);
104
105 source.on("end", cleanup);
106 source.on("close", cleanup);
107
108 target.on("end", cleanup);
109 target.on("close", cleanup);
110
111 emit(target, "pipe", source);
112 },
113 pause: function pause() {
114 emit(this, "pause");
115 },
116 resume: function resume() {
117 emit(this, "resume");
118 },
119 destroySoon: function destroySoon() {
120 this.destroy();
121 }
122 });
123 exports.Stream = Stream;
124
125
126 let nsIStreamListener = accessor();
127 let nsIInputStreamPump = accessor();
128 let nsIAsyncInputStream = accessor();
129 let nsIBinaryInputStream = accessor();
130
131 const StreamListener = Class({
132 initialize: function(stream) {
133 this.stream = stream;
134 },
135
136 // Next three methods are part of `nsIStreamListener` interface and are
137 // invoked by `nsIInputStreamPump.asyncRead`.
138 onDataAvailable: function(request, context, input, offset, count) {
139 let stream = this.stream;
140 let buffer = new ArrayBuffer(count);
141 nsIBinaryInputStream(stream).readArrayBuffer(count, buffer);
142 emit(stream, "data", new Buffer(buffer));
143 },
144
145 // Next two methods implement `nsIRequestObserver` interface and are invoked
146 // by `nsIInputStreamPump.asyncRead`.
147 onStartRequest: function() {},
148 // Called to signify the end of an asynchronous request. We only care to
149 // discover errors.
150 onStopRequest: function(request, context, status) {
151 let stream = this.stream;
152 stream.readable = false;
153 if (!components.isSuccessCode(status))
154 emit(stream, "error", status);
155 else
156 emit(stream, "end");
157 }
158 });
159
160
161 const InputStream = Class({
162 extends: Stream,
163 readable: false,
164 paused: false,
165 initialize: function initialize(options) {
166 let { asyncInputStream } = options;
167
168 this.readable = true;
169
170 let binaryInputStream = new BinaryInputStream(asyncInputStream);
171 let inputStreamPump = new InputStreamPump(asyncInputStream,
172 -1, -1, 0, 0, false);
173 let streamListener = new StreamListener(this);
174
175 nsIAsyncInputStream(this, asyncInputStream);
176 nsIInputStreamPump(this, inputStreamPump);
177 nsIBinaryInputStream(this, binaryInputStream);
178 nsIStreamListener(this, streamListener);
179
180 this.asyncInputStream = asyncInputStream;
181 this.inputStreamPump = inputStreamPump;
182 this.binaryInputStream = binaryInputStream;
183 },
184 get status() nsIInputStreamPump(this).status,
185 read: function() {
186 nsIInputStreamPump(this).asyncRead(nsIStreamListener(this), null);
187 },
188 pause: function pause() {
189 this.paused = true;
190 nsIInputStreamPump(this).suspend();
191 emit(this, "paused");
192 },
193 resume: function resume() {
194 this.paused = false;
195 nsIInputStreamPump(this).resume();
196 emit(this, "resume");
197 },
198 close: function close() {
199 this.readable = false;
200 nsIInputStreamPump(this).cancel(Cr.NS_OK);
201 nsIBinaryInputStream(this).close();
202 nsIAsyncInputStream(this).close();
203 },
204 destroy: function destroy() {
205 this.close();
206
207 nsIInputStreamPump(this);
208 nsIAsyncInputStream(this);
209 nsIBinaryInputStream(this);
210 nsIStreamListener(this);
211 }
212 });
213 exports.InputStream = InputStream;
214
215
216
217 let nsIRequestObserver = accessor();
218 let nsIAsyncOutputStream = accessor();
219 let nsIAsyncStreamCopier = accessor();
220 let nsIMultiplexInputStream = accessor();
221
222 const RequestObserver = Class({
223 initialize: function(stream) {
224 this.stream = stream;
225 },
226 // Method is part of `nsIRequestObserver` interface that is
227 // invoked by `nsIAsyncStreamCopier.asyncCopy`.
228 onStartRequest: function() {},
229 // Method is part of `nsIRequestObserver` interface that is
230 // invoked by `nsIAsyncStreamCopier.asyncCopy`.
231 onStopRequest: function(request, context, status) {
232 let stream = this.stream;
233 stream.drained = true;
234
235 // Remove copied chunk.
236 let multiplexInputStream = nsIMultiplexInputStream(stream);
237 multiplexInputStream.removeStream(0);
238
239 // If there was an error report.
240 if (!components.isSuccessCode(status))
241 emit(stream, "error", status);
242
243 // If there more chunks in queue then flush them.
244 else if (multiplexInputStream.count)
245 stream.flush();
246
247 // If stream is still writable notify that queue has drained.
248 else if (stream.writable)
249 emit(stream, "drain");
250
251 // If stream is no longer writable close it.
252 else {
253 nsIAsyncStreamCopier(stream).cancel(Cr.NS_OK);
254 nsIMultiplexInputStream(stream).close();
255 nsIAsyncOutputStream(stream).close();
256 nsIAsyncOutputStream(stream).flush();
257 }
258 }
259 });
260
261 const OutputStreamCallback = Class({
262 initialize: function(stream) {
263 this.stream = stream;
264 },
265 // Method is part of `nsIOutputStreamCallback` interface that
266 // is invoked by `nsIAsyncOutputStream.asyncWait`. It is registered
267 // with `WAIT_CLOSURE_ONLY` flag that overrides the default behavior,
268 // causing the `onOutputStreamReady` notification to be suppressed until
269 // the stream becomes closed.
270 onOutputStreamReady: function(nsIAsyncOutputStream) {
271 emit(this.stream, "finish");
272 }
273 });
274
275 const OutputStream = Class({
276 extends: Stream,
277 writable: false,
278 drained: true,
279 get bufferSize() {
280 let multiplexInputStream = nsIMultiplexInputStream(this);
281 return multiplexInputStream && multiplexInputStream.available();
282 },
283 initialize: function initialize(options) {
284 let { asyncOutputStream, output } = options;
285 this.writable = true;
286
287 // Ensure that `nsIAsyncOutputStream` was provided.
288 asyncOutputStream.QueryInterface(Ci.nsIAsyncOutputStream);
289
290 // Create a `nsIMultiplexInputStream` and `nsIAsyncStreamCopier`. Former
291 // is used to queue written data chunks that `asyncStreamCopier` will
292 // asynchronously drain into `asyncOutputStream`.
293 let multiplexInputStream = MultiplexInputStream();
294 let asyncStreamCopier = AsyncStreamCopier(multiplexInputStream,
295 output || asyncOutputStream,
296 eventTarget,
297 // nsIMultiplexInputStream
298 // implemnts .readSegments()
299 true,
300 // nsIOutputStream may or
301 // may not implemnet
302 // .writeSegments().
303 false,
304 // Use default buffer size.
305 null,
306 // Should not close an input.
307 false,
308 // Should not close an output.
309 false);
310
311 // Create `requestObserver` implementing `nsIRequestObserver` interface
312 // in the constructor that's gonna be reused across several flushes.
313 let requestObserver = RequestObserver(this);
314
315
316 // Create observer that implements `nsIOutputStreamCallback` and register
317 // using `WAIT_CLOSURE_ONLY` flag. That way it will be notfied once
318 // `nsIAsyncOutputStream` is closed.
319 asyncOutputStream.asyncWait(OutputStreamCallback(this),
320 asyncOutputStream.WAIT_CLOSURE_ONLY,
321 0,
322 threadManager.currentThread);
323
324 nsIRequestObserver(this, requestObserver);
325 nsIAsyncOutputStream(this, asyncOutputStream);
326 nsIMultiplexInputStream(this, multiplexInputStream);
327 nsIAsyncStreamCopier(this, asyncStreamCopier);
328
329 this.asyncOutputStream = asyncOutputStream;
330 this.multiplexInputStream = multiplexInputStream;
331 this.asyncStreamCopier = asyncStreamCopier;
332 },
333 write: function write(content, encoding, callback) {
334 if (isFunction(encoding)) {
335 callback = encoding;
336 encoding = callback;
337 }
338
339 // If stream is not writable we throw an error.
340 if (!this.writable) throw Error("stream is not writable");
341
342 let chunk = null;
343
344 // If content is not a buffer then we create one out of it.
345 if (Buffer.isBuffer(content)) {
346 chunk = new ArrayBufferInputStream();
347 chunk.setData(content.buffer, 0, content.length);
348 }
349 else {
350 chunk = new StringInputStream();
351 chunk.setData(content, content.length);
352 }
353
354 if (callback)
355 this.once("drain", callback);
356
357 // Queue up chunk to be copied to output sync.
358 nsIMultiplexInputStream(this).appendStream(chunk);
359 this.flush();
360
361 return this.drained;
362 },
363 flush: function() {
364 if (this.drained) {
365 this.drained = false;
366 nsIAsyncStreamCopier(this).asyncCopy(nsIRequestObserver(this), null);
367 }
368 },
369 end: function end(content, encoding, callback) {
370 if (isFunction(content)) {
371 callback = content
372 content = callback
373 }
374 if (isFunction(encoding)) {
375 callback = encoding
376 encoding = callback
377 }
378
379 // Setting a listener to "finish" event if passed.
380 if (isFunction(callback))
381 this.once("finish", callback);
382
383
384 if (content)
385 this.write(content, encoding);
386 this.writable = false;
387
388 // Close `asyncOutputStream` only if output has drained. If it's
389 // not drained than `asyncStreamCopier` is busy writing, so let
390 // it finish. Note that since `this.writable` is false copier will
391 // close `asyncOutputStream` once output drains.
392 if (this.drained)
393 nsIAsyncOutputStream(this).close();
394 },
395 destroy: function destroy() {
396 nsIAsyncOutputStream(this).close();
397 nsIAsyncOutputStream(this);
398 nsIMultiplexInputStream(this);
399 nsIAsyncStreamCopier(this);
400 nsIRequestObserver(this);
401 }
402 });
403 exports.OutputStream = OutputStream;
404
405 const DuplexStream = Class({
406 extends: Stream,
407 implements: [InputStream, OutputStream],
408 allowHalfOpen: true,
409 initialize: function initialize(options) {
410 options = options || {};
411 let { readable, writable, allowHalfOpen } = options;
412
413 InputStream.prototype.initialize.call(this, options);
414 OutputStream.prototype.initialize.call(this, options);
415
416 if (readable === false)
417 this.readable = false;
418
419 if (writable === false)
420 this.writable = false;
421
422 if (allowHalfOpen === false)
423 this.allowHalfOpen = false;
424
425 // If in a half open state and it's disabled enforce end.
426 this.once("end", () => {
427 if (!this.allowHalfOpen && (!this.readable || !this.writable))
428 this.end();
429 });
430 },
431 destroy: function destroy(error) {
432 InputStream.prototype.destroy.call(this);
433 OutputStream.prototype.destroy.call(this);
434 }
435 });
436 exports.DuplexStream = DuplexStream;

mercurial