addon-sdk/source/lib/sdk/io/stream.js

Sat, 03 Jan 2015 20:18:00 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Sat, 03 Jan 2015 20:18:00 +0100
branch
TOR_BUG_3246
changeset 7
129ffea94266
permissions
-rw-r--r--

Conditionally enable double key logic according to:
private browsing mode or privacy.thirdparty.isolate preference and
implement in GetCookieStringCommon and FindCookie where it counts...
With some reservations of how to convince FindCookie users to test
condition and pass a nullptr when disabling double key logic.

michael@0 1 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 2 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 4 "use strict";
michael@0 5
michael@0 6 module.metadata = {
michael@0 7 "stability": "experimental"
michael@0 8 };
michael@0 9
michael@0 10 const { CC, Cc, Ci, Cu, Cr, components } = require("chrome");
michael@0 11 const { EventTarget } = require("../event/target");
michael@0 12 const { emit } = require("../event/core");
michael@0 13 const { Buffer } = require("./buffer");
michael@0 14 const { Class } = require("../core/heritage");
michael@0 15 const { setTimeout } = require("../timers");
michael@0 16
michael@0 17
michael@0 18 const MultiplexInputStream = CC("@mozilla.org/io/multiplex-input-stream;1",
michael@0 19 "nsIMultiplexInputStream");
michael@0 20 const AsyncStreamCopier = CC("@mozilla.org/network/async-stream-copier;1",
michael@0 21 "nsIAsyncStreamCopier", "init");
michael@0 22 const StringInputStream = CC("@mozilla.org/io/string-input-stream;1",
michael@0 23 "nsIStringInputStream");
michael@0 24 const ArrayBufferInputStream = CC("@mozilla.org/io/arraybuffer-input-stream;1",
michael@0 25 "nsIArrayBufferInputStream");
michael@0 26
michael@0 27 const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1",
michael@0 28 "nsIBinaryInputStream", "setInputStream");
michael@0 29 const InputStreamPump = CC("@mozilla.org/network/input-stream-pump;1",
michael@0 30 "nsIInputStreamPump", "init");
michael@0 31
michael@0 32 const threadManager = Cc["@mozilla.org/thread-manager;1"].
michael@0 33 getService(Ci.nsIThreadManager);
michael@0 34
michael@0 35 const eventTarget = Cc["@mozilla.org/network/socket-transport-service;1"].
michael@0 36 getService(Ci.nsIEventTarget);
michael@0 37
michael@0 38 let isFunction = value => typeof(value) === "function"
michael@0 39
michael@0 40 function accessor() {
michael@0 41 let map = new WeakMap();
michael@0 42 return function(target, value) {
michael@0 43 if (value)
michael@0 44 map.set(target, value);
michael@0 45 return map.get(target);
michael@0 46 }
michael@0 47 }
michael@0 48
michael@0 49 const Stream = Class({
michael@0 50 extends: EventTarget,
michael@0 51 initialize: function() {
michael@0 52 this.readable = false;
michael@0 53 this.writable = false;
michael@0 54 this.encoding = null;
michael@0 55 },
michael@0 56 setEncoding: function setEncoding(encoding) {
michael@0 57 this.encoding = String(encoding).toUpperCase();
michael@0 58 },
michael@0 59 pipe: function pipe(target, options) {
michael@0 60 let source = this;
michael@0 61 function onData(chunk) {
michael@0 62 if (target.writable) {
michael@0 63 if (false === target.write(chunk))
michael@0 64 source.pause();
michael@0 65 }
michael@0 66 }
michael@0 67 function onDrain() {
michael@0 68 if (source.readable)
michael@0 69 source.resume();
michael@0 70 }
michael@0 71 function onEnd() {
michael@0 72 target.end();
michael@0 73 }
michael@0 74 function onPause() {
michael@0 75 source.pause();
michael@0 76 }
michael@0 77 function onResume() {
michael@0 78 if (source.readable)
michael@0 79 source.resume();
michael@0 80 }
michael@0 81
michael@0 82 function cleanup() {
michael@0 83 source.removeListener("data", onData);
michael@0 84 target.removeListener("drain", onDrain);
michael@0 85 source.removeListener("end", onEnd);
michael@0 86
michael@0 87 target.removeListener("pause", onPause);
michael@0 88 target.removeListener("resume", onResume);
michael@0 89
michael@0 90 source.removeListener("end", cleanup);
michael@0 91 source.removeListener("close", cleanup);
michael@0 92
michael@0 93 target.removeListener("end", cleanup);
michael@0 94 target.removeListener("close", cleanup);
michael@0 95 }
michael@0 96
michael@0 97 if (!options || options.end !== false)
michael@0 98 target.on("end", onEnd);
michael@0 99
michael@0 100 source.on("data", onData);
michael@0 101 target.on("drain", onDrain);
michael@0 102 target.on("resume", onResume);
michael@0 103 target.on("pause", onPause);
michael@0 104
michael@0 105 source.on("end", cleanup);
michael@0 106 source.on("close", cleanup);
michael@0 107
michael@0 108 target.on("end", cleanup);
michael@0 109 target.on("close", cleanup);
michael@0 110
michael@0 111 emit(target, "pipe", source);
michael@0 112 },
michael@0 113 pause: function pause() {
michael@0 114 emit(this, "pause");
michael@0 115 },
michael@0 116 resume: function resume() {
michael@0 117 emit(this, "resume");
michael@0 118 },
michael@0 119 destroySoon: function destroySoon() {
michael@0 120 this.destroy();
michael@0 121 }
michael@0 122 });
michael@0 123 exports.Stream = Stream;
michael@0 124
michael@0 125
michael@0 126 let nsIStreamListener = accessor();
michael@0 127 let nsIInputStreamPump = accessor();
michael@0 128 let nsIAsyncInputStream = accessor();
michael@0 129 let nsIBinaryInputStream = accessor();
michael@0 130
michael@0 131 const StreamListener = Class({
michael@0 132 initialize: function(stream) {
michael@0 133 this.stream = stream;
michael@0 134 },
michael@0 135
michael@0 136 // Next three methods are part of `nsIStreamListener` interface and are
michael@0 137 // invoked by `nsIInputStreamPump.asyncRead`.
michael@0 138 onDataAvailable: function(request, context, input, offset, count) {
michael@0 139 let stream = this.stream;
michael@0 140 let buffer = new ArrayBuffer(count);
michael@0 141 nsIBinaryInputStream(stream).readArrayBuffer(count, buffer);
michael@0 142 emit(stream, "data", new Buffer(buffer));
michael@0 143 },
michael@0 144
michael@0 145 // Next two methods implement `nsIRequestObserver` interface and are invoked
michael@0 146 // by `nsIInputStreamPump.asyncRead`.
michael@0 147 onStartRequest: function() {},
michael@0 148 // Called to signify the end of an asynchronous request. We only care to
michael@0 149 // discover errors.
michael@0 150 onStopRequest: function(request, context, status) {
michael@0 151 let stream = this.stream;
michael@0 152 stream.readable = false;
michael@0 153 if (!components.isSuccessCode(status))
michael@0 154 emit(stream, "error", status);
michael@0 155 else
michael@0 156 emit(stream, "end");
michael@0 157 }
michael@0 158 });
michael@0 159
michael@0 160
michael@0 161 const InputStream = Class({
michael@0 162 extends: Stream,
michael@0 163 readable: false,
michael@0 164 paused: false,
michael@0 165 initialize: function initialize(options) {
michael@0 166 let { asyncInputStream } = options;
michael@0 167
michael@0 168 this.readable = true;
michael@0 169
michael@0 170 let binaryInputStream = new BinaryInputStream(asyncInputStream);
michael@0 171 let inputStreamPump = new InputStreamPump(asyncInputStream,
michael@0 172 -1, -1, 0, 0, false);
michael@0 173 let streamListener = new StreamListener(this);
michael@0 174
michael@0 175 nsIAsyncInputStream(this, asyncInputStream);
michael@0 176 nsIInputStreamPump(this, inputStreamPump);
michael@0 177 nsIBinaryInputStream(this, binaryInputStream);
michael@0 178 nsIStreamListener(this, streamListener);
michael@0 179
michael@0 180 this.asyncInputStream = asyncInputStream;
michael@0 181 this.inputStreamPump = inputStreamPump;
michael@0 182 this.binaryInputStream = binaryInputStream;
michael@0 183 },
michael@0 184 get status() nsIInputStreamPump(this).status,
michael@0 185 read: function() {
michael@0 186 nsIInputStreamPump(this).asyncRead(nsIStreamListener(this), null);
michael@0 187 },
michael@0 188 pause: function pause() {
michael@0 189 this.paused = true;
michael@0 190 nsIInputStreamPump(this).suspend();
michael@0 191 emit(this, "paused");
michael@0 192 },
michael@0 193 resume: function resume() {
michael@0 194 this.paused = false;
michael@0 195 nsIInputStreamPump(this).resume();
michael@0 196 emit(this, "resume");
michael@0 197 },
michael@0 198 close: function close() {
michael@0 199 this.readable = false;
michael@0 200 nsIInputStreamPump(this).cancel(Cr.NS_OK);
michael@0 201 nsIBinaryInputStream(this).close();
michael@0 202 nsIAsyncInputStream(this).close();
michael@0 203 },
michael@0 204 destroy: function destroy() {
michael@0 205 this.close();
michael@0 206
michael@0 207 nsIInputStreamPump(this);
michael@0 208 nsIAsyncInputStream(this);
michael@0 209 nsIBinaryInputStream(this);
michael@0 210 nsIStreamListener(this);
michael@0 211 }
michael@0 212 });
michael@0 213 exports.InputStream = InputStream;
michael@0 214
michael@0 215
michael@0 216
michael@0 217 let nsIRequestObserver = accessor();
michael@0 218 let nsIAsyncOutputStream = accessor();
michael@0 219 let nsIAsyncStreamCopier = accessor();
michael@0 220 let nsIMultiplexInputStream = accessor();
michael@0 221
michael@0 222 const RequestObserver = Class({
michael@0 223 initialize: function(stream) {
michael@0 224 this.stream = stream;
michael@0 225 },
michael@0 226 // Method is part of `nsIRequestObserver` interface that is
michael@0 227 // invoked by `nsIAsyncStreamCopier.asyncCopy`.
michael@0 228 onStartRequest: function() {},
michael@0 229 // Method is part of `nsIRequestObserver` interface that is
michael@0 230 // invoked by `nsIAsyncStreamCopier.asyncCopy`.
michael@0 231 onStopRequest: function(request, context, status) {
michael@0 232 let stream = this.stream;
michael@0 233 stream.drained = true;
michael@0 234
michael@0 235 // Remove copied chunk.
michael@0 236 let multiplexInputStream = nsIMultiplexInputStream(stream);
michael@0 237 multiplexInputStream.removeStream(0);
michael@0 238
michael@0 239 // If there was an error report.
michael@0 240 if (!components.isSuccessCode(status))
michael@0 241 emit(stream, "error", status);
michael@0 242
michael@0 243 // If there more chunks in queue then flush them.
michael@0 244 else if (multiplexInputStream.count)
michael@0 245 stream.flush();
michael@0 246
michael@0 247 // If stream is still writable notify that queue has drained.
michael@0 248 else if (stream.writable)
michael@0 249 emit(stream, "drain");
michael@0 250
michael@0 251 // If stream is no longer writable close it.
michael@0 252 else {
michael@0 253 nsIAsyncStreamCopier(stream).cancel(Cr.NS_OK);
michael@0 254 nsIMultiplexInputStream(stream).close();
michael@0 255 nsIAsyncOutputStream(stream).close();
michael@0 256 nsIAsyncOutputStream(stream).flush();
michael@0 257 }
michael@0 258 }
michael@0 259 });
michael@0 260
michael@0 261 const OutputStreamCallback = Class({
michael@0 262 initialize: function(stream) {
michael@0 263 this.stream = stream;
michael@0 264 },
michael@0 265 // Method is part of `nsIOutputStreamCallback` interface that
michael@0 266 // is invoked by `nsIAsyncOutputStream.asyncWait`. It is registered
michael@0 267 // with `WAIT_CLOSURE_ONLY` flag that overrides the default behavior,
michael@0 268 // causing the `onOutputStreamReady` notification to be suppressed until
michael@0 269 // the stream becomes closed.
michael@0 270 onOutputStreamReady: function(nsIAsyncOutputStream) {
michael@0 271 emit(this.stream, "finish");
michael@0 272 }
michael@0 273 });
michael@0 274
michael@0 275 const OutputStream = Class({
michael@0 276 extends: Stream,
michael@0 277 writable: false,
michael@0 278 drained: true,
michael@0 279 get bufferSize() {
michael@0 280 let multiplexInputStream = nsIMultiplexInputStream(this);
michael@0 281 return multiplexInputStream && multiplexInputStream.available();
michael@0 282 },
michael@0 283 initialize: function initialize(options) {
michael@0 284 let { asyncOutputStream, output } = options;
michael@0 285 this.writable = true;
michael@0 286
michael@0 287 // Ensure that `nsIAsyncOutputStream` was provided.
michael@0 288 asyncOutputStream.QueryInterface(Ci.nsIAsyncOutputStream);
michael@0 289
michael@0 290 // Create a `nsIMultiplexInputStream` and `nsIAsyncStreamCopier`. Former
michael@0 291 // is used to queue written data chunks that `asyncStreamCopier` will
michael@0 292 // asynchronously drain into `asyncOutputStream`.
michael@0 293 let multiplexInputStream = MultiplexInputStream();
michael@0 294 let asyncStreamCopier = AsyncStreamCopier(multiplexInputStream,
michael@0 295 output || asyncOutputStream,
michael@0 296 eventTarget,
michael@0 297 // nsIMultiplexInputStream
michael@0 298 // implemnts .readSegments()
michael@0 299 true,
michael@0 300 // nsIOutputStream may or
michael@0 301 // may not implemnet
michael@0 302 // .writeSegments().
michael@0 303 false,
michael@0 304 // Use default buffer size.
michael@0 305 null,
michael@0 306 // Should not close an input.
michael@0 307 false,
michael@0 308 // Should not close an output.
michael@0 309 false);
michael@0 310
michael@0 311 // Create `requestObserver` implementing `nsIRequestObserver` interface
michael@0 312 // in the constructor that's gonna be reused across several flushes.
michael@0 313 let requestObserver = RequestObserver(this);
michael@0 314
michael@0 315
michael@0 316 // Create observer that implements `nsIOutputStreamCallback` and register
michael@0 317 // using `WAIT_CLOSURE_ONLY` flag. That way it will be notfied once
michael@0 318 // `nsIAsyncOutputStream` is closed.
michael@0 319 asyncOutputStream.asyncWait(OutputStreamCallback(this),
michael@0 320 asyncOutputStream.WAIT_CLOSURE_ONLY,
michael@0 321 0,
michael@0 322 threadManager.currentThread);
michael@0 323
michael@0 324 nsIRequestObserver(this, requestObserver);
michael@0 325 nsIAsyncOutputStream(this, asyncOutputStream);
michael@0 326 nsIMultiplexInputStream(this, multiplexInputStream);
michael@0 327 nsIAsyncStreamCopier(this, asyncStreamCopier);
michael@0 328
michael@0 329 this.asyncOutputStream = asyncOutputStream;
michael@0 330 this.multiplexInputStream = multiplexInputStream;
michael@0 331 this.asyncStreamCopier = asyncStreamCopier;
michael@0 332 },
michael@0 333 write: function write(content, encoding, callback) {
michael@0 334 if (isFunction(encoding)) {
michael@0 335 callback = encoding;
michael@0 336 encoding = callback;
michael@0 337 }
michael@0 338
michael@0 339 // If stream is not writable we throw an error.
michael@0 340 if (!this.writable) throw Error("stream is not writable");
michael@0 341
michael@0 342 let chunk = null;
michael@0 343
michael@0 344 // If content is not a buffer then we create one out of it.
michael@0 345 if (Buffer.isBuffer(content)) {
michael@0 346 chunk = new ArrayBufferInputStream();
michael@0 347 chunk.setData(content.buffer, 0, content.length);
michael@0 348 }
michael@0 349 else {
michael@0 350 chunk = new StringInputStream();
michael@0 351 chunk.setData(content, content.length);
michael@0 352 }
michael@0 353
michael@0 354 if (callback)
michael@0 355 this.once("drain", callback);
michael@0 356
michael@0 357 // Queue up chunk to be copied to output sync.
michael@0 358 nsIMultiplexInputStream(this).appendStream(chunk);
michael@0 359 this.flush();
michael@0 360
michael@0 361 return this.drained;
michael@0 362 },
michael@0 363 flush: function() {
michael@0 364 if (this.drained) {
michael@0 365 this.drained = false;
michael@0 366 nsIAsyncStreamCopier(this).asyncCopy(nsIRequestObserver(this), null);
michael@0 367 }
michael@0 368 },
michael@0 369 end: function end(content, encoding, callback) {
michael@0 370 if (isFunction(content)) {
michael@0 371 callback = content
michael@0 372 content = callback
michael@0 373 }
michael@0 374 if (isFunction(encoding)) {
michael@0 375 callback = encoding
michael@0 376 encoding = callback
michael@0 377 }
michael@0 378
michael@0 379 // Setting a listener to "finish" event if passed.
michael@0 380 if (isFunction(callback))
michael@0 381 this.once("finish", callback);
michael@0 382
michael@0 383
michael@0 384 if (content)
michael@0 385 this.write(content, encoding);
michael@0 386 this.writable = false;
michael@0 387
michael@0 388 // Close `asyncOutputStream` only if output has drained. If it's
michael@0 389 // not drained than `asyncStreamCopier` is busy writing, so let
michael@0 390 // it finish. Note that since `this.writable` is false copier will
michael@0 391 // close `asyncOutputStream` once output drains.
michael@0 392 if (this.drained)
michael@0 393 nsIAsyncOutputStream(this).close();
michael@0 394 },
michael@0 395 destroy: function destroy() {
michael@0 396 nsIAsyncOutputStream(this).close();
michael@0 397 nsIAsyncOutputStream(this);
michael@0 398 nsIMultiplexInputStream(this);
michael@0 399 nsIAsyncStreamCopier(this);
michael@0 400 nsIRequestObserver(this);
michael@0 401 }
michael@0 402 });
michael@0 403 exports.OutputStream = OutputStream;
michael@0 404
michael@0 405 const DuplexStream = Class({
michael@0 406 extends: Stream,
michael@0 407 implements: [InputStream, OutputStream],
michael@0 408 allowHalfOpen: true,
michael@0 409 initialize: function initialize(options) {
michael@0 410 options = options || {};
michael@0 411 let { readable, writable, allowHalfOpen } = options;
michael@0 412
michael@0 413 InputStream.prototype.initialize.call(this, options);
michael@0 414 OutputStream.prototype.initialize.call(this, options);
michael@0 415
michael@0 416 if (readable === false)
michael@0 417 this.readable = false;
michael@0 418
michael@0 419 if (writable === false)
michael@0 420 this.writable = false;
michael@0 421
michael@0 422 if (allowHalfOpen === false)
michael@0 423 this.allowHalfOpen = false;
michael@0 424
michael@0 425 // If in a half open state and it's disabled enforce end.
michael@0 426 this.once("end", () => {
michael@0 427 if (!this.allowHalfOpen && (!this.readable || !this.writable))
michael@0 428 this.end();
michael@0 429 });
michael@0 430 },
michael@0 431 destroy: function destroy(error) {
michael@0 432 InputStream.prototype.destroy.call(this);
michael@0 433 OutputStream.prototype.destroy.call(this);
michael@0 434 }
michael@0 435 });
michael@0 436 exports.DuplexStream = DuplexStream;

mercurial