addon-sdk/source/lib/sdk/io/stream.js

Sat, 03 Jan 2015 20:18:00 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Sat, 03 Jan 2015 20:18:00 +0100
branch
TOR_BUG_3246
changeset 7
129ffea94266
permissions
-rw-r--r--

Conditionally enable double key logic according to:
private browsing mode or privacy.thirdparty.isolate preference and
implement in GetCookieStringCommon and FindCookie where it counts...
With some reservations of how to convince FindCookie users to test
condition and pass a nullptr when disabling double key logic.

     1 /* This Source Code Form is subject to the terms of the Mozilla Public
     2  * License, v. 2.0. If a copy of the MPL was not distributed with this
     3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     4 "use strict";
     6 module.metadata = {
     7   "stability": "experimental"
     8 };
    10 const { CC, Cc, Ci, Cu, Cr, components } = require("chrome");
    11 const { EventTarget } = require("../event/target");
    12 const { emit } = require("../event/core");
    13 const { Buffer } = require("./buffer");
    14 const { Class } = require("../core/heritage");
    15 const { setTimeout } = require("../timers");
    18 const MultiplexInputStream = CC("@mozilla.org/io/multiplex-input-stream;1",
    19                                 "nsIMultiplexInputStream");
    20 const AsyncStreamCopier = CC("@mozilla.org/network/async-stream-copier;1",
    21                              "nsIAsyncStreamCopier", "init");
    22 const StringInputStream = CC("@mozilla.org/io/string-input-stream;1",
    23                              "nsIStringInputStream");
    24 const ArrayBufferInputStream = CC("@mozilla.org/io/arraybuffer-input-stream;1",
    25                                   "nsIArrayBufferInputStream");
    27 const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1",
    28                              "nsIBinaryInputStream", "setInputStream");
    29 const InputStreamPump = CC("@mozilla.org/network/input-stream-pump;1",
    30                            "nsIInputStreamPump", "init");
    32 const threadManager = Cc["@mozilla.org/thread-manager;1"].
    33                       getService(Ci.nsIThreadManager);
    35 const eventTarget = Cc["@mozilla.org/network/socket-transport-service;1"].
    36                     getService(Ci.nsIEventTarget);
    38 let isFunction = value => typeof(value) === "function"
    40 function accessor() {
    41   let map = new WeakMap();
    42   return function(target, value) {
    43     if (value)
    44       map.set(target, value);
    45     return map.get(target);
    46   }
    47 }
    49 const Stream = Class({
    50   extends: EventTarget,
    51   initialize: function() {
    52     this.readable = false;
    53     this.writable = false;
    54     this.encoding = null;
    55   },
    56   setEncoding: function setEncoding(encoding) {
    57     this.encoding = String(encoding).toUpperCase();
    58   },
    59   pipe: function pipe(target, options) {
    60     let source = this;
    61     function onData(chunk) {
    62       if (target.writable) {
    63         if (false === target.write(chunk))
    64           source.pause();
    65       }
    66     }
    67     function onDrain() {
    68       if (source.readable)
    69         source.resume();
    70     }
    71     function onEnd() {
    72       target.end();
    73     }
    74     function onPause() {
    75       source.pause();
    76     }
    77     function onResume() {
    78       if (source.readable)
    79         source.resume();
    80     }
    82     function cleanup() {
    83       source.removeListener("data", onData);
    84       target.removeListener("drain", onDrain);
    85       source.removeListener("end", onEnd);
    87       target.removeListener("pause", onPause);
    88       target.removeListener("resume", onResume);
    90       source.removeListener("end", cleanup);
    91       source.removeListener("close", cleanup);
    93       target.removeListener("end", cleanup);
    94       target.removeListener("close", cleanup);
    95     }
    97     if (!options || options.end !== false)
    98       target.on("end", onEnd);
   100     source.on("data", onData);
   101     target.on("drain", onDrain);
   102     target.on("resume", onResume);
   103     target.on("pause", onPause);
   105     source.on("end", cleanup);
   106     source.on("close", cleanup);
   108     target.on("end", cleanup);
   109     target.on("close", cleanup);
   111     emit(target, "pipe", source);
   112   },
   113   pause: function pause() {
   114     emit(this, "pause");
   115   },
   116   resume: function resume() {
   117     emit(this, "resume");
   118   },
   119   destroySoon: function destroySoon() {
   120     this.destroy();
   121   }
   122 });
   123 exports.Stream = Stream;
   126 let nsIStreamListener = accessor();
   127 let nsIInputStreamPump = accessor();
   128 let nsIAsyncInputStream = accessor();
   129 let nsIBinaryInputStream = accessor();
   131 const StreamListener = Class({
   132   initialize: function(stream) {
   133     this.stream = stream;
   134   },
   136   // Next three methods are part of `nsIStreamListener` interface and are
   137   // invoked by `nsIInputStreamPump.asyncRead`.
   138   onDataAvailable: function(request, context, input, offset, count) {
   139     let stream = this.stream;
   140     let buffer = new ArrayBuffer(count);
   141     nsIBinaryInputStream(stream).readArrayBuffer(count, buffer);
   142     emit(stream, "data", new Buffer(buffer));
   143   },
   145   // Next two methods implement `nsIRequestObserver` interface and are invoked
   146   // by `nsIInputStreamPump.asyncRead`.
   147   onStartRequest: function() {},
   148   // Called to signify the end of an asynchronous request. We only care to
   149   // discover errors.
   150   onStopRequest: function(request, context, status) {
   151     let stream = this.stream;
   152     stream.readable = false;
   153     if (!components.isSuccessCode(status))
   154       emit(stream, "error", status);
   155     else
   156       emit(stream, "end");
   157   }
   158 });
   161 const InputStream = Class({
   162   extends: Stream,
   163   readable: false,
   164   paused: false,
   165   initialize: function initialize(options) {
   166     let { asyncInputStream } = options;
   168     this.readable = true;
   170     let binaryInputStream = new BinaryInputStream(asyncInputStream);
   171     let inputStreamPump = new InputStreamPump(asyncInputStream,
   172                                               -1, -1, 0, 0, false);
   173     let streamListener = new StreamListener(this);
   175     nsIAsyncInputStream(this, asyncInputStream);
   176     nsIInputStreamPump(this, inputStreamPump);
   177     nsIBinaryInputStream(this, binaryInputStream);
   178     nsIStreamListener(this, streamListener);
   180     this.asyncInputStream = asyncInputStream;
   181     this.inputStreamPump = inputStreamPump;
   182     this.binaryInputStream = binaryInputStream;
   183   },
   184   get status() nsIInputStreamPump(this).status,
   185   read: function() {
   186     nsIInputStreamPump(this).asyncRead(nsIStreamListener(this), null);
   187   },
   188   pause: function pause() {
   189     this.paused = true;
   190     nsIInputStreamPump(this).suspend();
   191     emit(this, "paused");
   192   },
   193   resume: function resume() {
   194     this.paused = false;
   195     nsIInputStreamPump(this).resume();
   196     emit(this, "resume");
   197   },
   198   close: function close() {
   199     this.readable = false;
   200     nsIInputStreamPump(this).cancel(Cr.NS_OK);
   201     nsIBinaryInputStream(this).close();
   202     nsIAsyncInputStream(this).close();
   203   },
   204   destroy: function destroy() {
   205     this.close();
   207     nsIInputStreamPump(this);
   208     nsIAsyncInputStream(this);
   209     nsIBinaryInputStream(this);
   210     nsIStreamListener(this);
   211   }
   212 });
   213 exports.InputStream = InputStream;
   217 let nsIRequestObserver = accessor();
   218 let nsIAsyncOutputStream = accessor();
   219 let nsIAsyncStreamCopier = accessor();
   220 let nsIMultiplexInputStream = accessor();
   222 const RequestObserver = Class({
   223   initialize: function(stream) {
   224     this.stream = stream;
   225   },
   226   // Method is part of `nsIRequestObserver` interface that is
   227   // invoked by `nsIAsyncStreamCopier.asyncCopy`.
   228   onStartRequest: function() {},
   229   // Method is part of `nsIRequestObserver` interface that is
   230   // invoked by `nsIAsyncStreamCopier.asyncCopy`.
   231   onStopRequest: function(request, context, status) {
   232     let stream = this.stream;
   233     stream.drained = true;
   235     // Remove copied chunk.
   236     let multiplexInputStream = nsIMultiplexInputStream(stream);
   237     multiplexInputStream.removeStream(0);
   239     // If there was an error report.
   240     if (!components.isSuccessCode(status))
   241       emit(stream, "error", status);
   243     // If there more chunks in queue then flush them.
   244     else if (multiplexInputStream.count)
   245       stream.flush();
   247     // If stream is still writable notify that queue has drained.
   248     else if (stream.writable)
   249       emit(stream, "drain");
   251     // If stream is no longer writable close it.
   252     else {
   253       nsIAsyncStreamCopier(stream).cancel(Cr.NS_OK);
   254       nsIMultiplexInputStream(stream).close();
   255       nsIAsyncOutputStream(stream).close();
   256       nsIAsyncOutputStream(stream).flush();
   257     }
   258   }
   259 });
   261 const OutputStreamCallback = Class({
   262   initialize: function(stream) {
   263     this.stream = stream;
   264   },
   265   // Method is part of `nsIOutputStreamCallback` interface that
   266   // is invoked by `nsIAsyncOutputStream.asyncWait`. It is registered
   267   // with `WAIT_CLOSURE_ONLY` flag that overrides the default behavior,
   268   // causing the `onOutputStreamReady` notification to be suppressed until
   269   // the stream becomes closed.
   270   onOutputStreamReady: function(nsIAsyncOutputStream) {
   271     emit(this.stream, "finish");
   272   }
   273 });
   275 const OutputStream = Class({
   276   extends: Stream,
   277   writable: false,
   278   drained: true,
   279   get bufferSize() {
   280     let multiplexInputStream = nsIMultiplexInputStream(this);
   281     return multiplexInputStream && multiplexInputStream.available();
   282   },
   283   initialize: function initialize(options) {
   284     let { asyncOutputStream, output } = options;
   285     this.writable = true;
   287     // Ensure that `nsIAsyncOutputStream` was provided.
   288     asyncOutputStream.QueryInterface(Ci.nsIAsyncOutputStream);
   290     // Create a `nsIMultiplexInputStream` and `nsIAsyncStreamCopier`. Former
   291     // is used to queue written data chunks that `asyncStreamCopier` will
   292     // asynchronously drain into `asyncOutputStream`.
   293     let multiplexInputStream = MultiplexInputStream();
   294     let asyncStreamCopier = AsyncStreamCopier(multiplexInputStream,
   295                                               output || asyncOutputStream,
   296                                               eventTarget,
   297                                               // nsIMultiplexInputStream
   298                                               // implemnts .readSegments()
   299                                               true,
   300                                               // nsIOutputStream may or
   301                                               // may not implemnet
   302                                               // .writeSegments().
   303                                               false,
   304                                               // Use default buffer size.
   305                                               null,
   306                                               // Should not close an input.
   307                                               false,
   308                                               // Should not close an output.
   309                                               false);
   311     // Create `requestObserver` implementing `nsIRequestObserver` interface
   312     // in the constructor that's gonna be reused across several flushes.
   313     let requestObserver = RequestObserver(this);
   316     // Create observer that implements `nsIOutputStreamCallback` and register
   317     // using `WAIT_CLOSURE_ONLY` flag. That way it will be notfied once
   318     // `nsIAsyncOutputStream` is closed.
   319     asyncOutputStream.asyncWait(OutputStreamCallback(this),
   320                                 asyncOutputStream.WAIT_CLOSURE_ONLY,
   321                                 0,
   322                                 threadManager.currentThread);
   324     nsIRequestObserver(this, requestObserver);
   325     nsIAsyncOutputStream(this, asyncOutputStream);
   326     nsIMultiplexInputStream(this, multiplexInputStream);
   327     nsIAsyncStreamCopier(this, asyncStreamCopier);
   329     this.asyncOutputStream = asyncOutputStream;
   330     this.multiplexInputStream = multiplexInputStream;
   331     this.asyncStreamCopier = asyncStreamCopier;
   332   },
   333   write: function write(content, encoding, callback) {
   334     if (isFunction(encoding)) {
   335       callback = encoding;
   336       encoding = callback;
   337     }
   339     // If stream is not writable we throw an error.
   340     if (!this.writable) throw Error("stream is not writable");
   342     let chunk = null;
   344     // If content is not a buffer then we create one out of it.
   345     if (Buffer.isBuffer(content)) {
   346       chunk = new ArrayBufferInputStream();
   347       chunk.setData(content.buffer, 0, content.length);
   348     }
   349     else {
   350       chunk = new StringInputStream();
   351       chunk.setData(content, content.length);
   352     }
   354     if (callback)
   355       this.once("drain", callback);
   357     // Queue up chunk to be copied to output sync.
   358     nsIMultiplexInputStream(this).appendStream(chunk);
   359     this.flush();
   361     return this.drained;
   362   },
   363   flush: function() {
   364     if (this.drained) {
   365       this.drained = false;
   366       nsIAsyncStreamCopier(this).asyncCopy(nsIRequestObserver(this), null);
   367     }
   368   },
   369   end: function end(content, encoding, callback) {
   370     if (isFunction(content)) {
   371       callback = content
   372       content = callback
   373     }
   374     if (isFunction(encoding)) {
   375       callback = encoding
   376       encoding = callback
   377     }
   379     // Setting a listener to "finish" event if passed.
   380     if (isFunction(callback))
   381       this.once("finish", callback);
   384     if (content)
   385       this.write(content, encoding);
   386     this.writable = false;
   388     // Close `asyncOutputStream` only if output has drained. If it's
   389     // not drained than `asyncStreamCopier` is busy writing, so let
   390     // it finish. Note that since `this.writable` is false copier will
   391     // close `asyncOutputStream` once output drains.
   392     if (this.drained)
   393       nsIAsyncOutputStream(this).close();
   394   },
   395   destroy: function destroy() {
   396     nsIAsyncOutputStream(this).close();
   397     nsIAsyncOutputStream(this);
   398     nsIMultiplexInputStream(this);
   399     nsIAsyncStreamCopier(this);
   400     nsIRequestObserver(this);
   401   }
   402 });
   403 exports.OutputStream = OutputStream;
   405 const DuplexStream = Class({
   406   extends: Stream,
   407   implements: [InputStream, OutputStream],
   408   allowHalfOpen: true,
   409   initialize: function initialize(options) {
   410     options = options || {};
   411     let { readable, writable, allowHalfOpen } = options;
   413     InputStream.prototype.initialize.call(this, options);
   414     OutputStream.prototype.initialize.call(this, options);
   416     if (readable === false)
   417       this.readable = false;
   419     if (writable === false)
   420       this.writable = false;
   422     if (allowHalfOpen === false)
   423       this.allowHalfOpen = false;
   425     // If in a half open state and it's disabled enforce end.
   426     this.once("end", () => {
   427       if (!this.allowHalfOpen && (!this.readable || !this.writable))
   428         this.end();
   429     });
   430   },
   431   destroy: function destroy(error) {
   432     InputStream.prototype.destroy.call(this);
   433     OutputStream.prototype.destroy.call(this);
   434   }
   435 });
   436 exports.DuplexStream = DuplexStream;

mercurial