testing/xpcshell/node-http2/node_modules/http2-protocol/lib/flow.js

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/testing/xpcshell/node-http2/node_modules/http2-protocol/lib/flow.js	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,346 @@
     1.4 +var assert = require('assert');
     1.5 +
     1.6 +// The Flow class
     1.7 +// ==============
     1.8 +
     1.9 +// Flow is a [Duplex stream][1] subclass which implements HTTP/2 flow control. It is designed to be
    1.10 +// subclassed by [Connection](connection.html) and the `upstream` component of [Stream](stream.html).
    1.11 +// [1]: http://nodejs.org/api/stream.html#stream_class_stream_duplex
    1.12 +
    1.13 +var Duplex  = require('stream').Duplex;
    1.14 +
    1.15 +exports.Flow = Flow;
    1.16 +
    1.17 +// Public API
    1.18 +// ----------
    1.19 +
    1.20 +// * **Event: 'error' (type)**: signals an error
    1.21 +//
    1.22 +// * **setInitialWindow(size)**: the initial flow control window size can be changed *any time*
    1.23 +//   ([as described in the standard][1]) using this method
    1.24 +//
    1.25 +// [1]: http://tools.ietf.org/html/draft-ietf-httpbis-http2-10#section-6.9.2
    1.26 +
    1.27 +// API for child classes
    1.28 +// ---------------------
    1.29 +
    1.30 +// * **new Flow([flowControlId])**: creating a new flow that will listen for WINDOW_UPDATES frames
    1.31 +//   with the given `flowControlId` (or every update frame if not given)
    1.32 +//
    1.33 +// * **_send()**: called when more frames should be pushed. The child class is expected to override
    1.34 +//   this (instead of the `_read` method of the Duplex class).
    1.35 +//
    1.36 +// * **_receive(frame, readyCallback)**: called when there's an incoming frame. The child class is
    1.37 +//   expected to override this (instead of the `_write` method of the Duplex class).
    1.38 +//
    1.39 +// * **push(frame): bool**: schedules `frame` for sending.
    1.40 +//
    1.41 +//   Returns `true` if it needs more frames in the output queue, `false` if the output queue is
    1.42 +//   full, and `null` if did not push the frame into the output queue (instead, it pushed it into
    1.43 +//   the flow control queue).
    1.44 +//
    1.45 +// * **read(limit): frame**: like the regular `read`, but the 'flow control size' (0 for non-DATA
    1.46 +//   frames, length of the payload for DATA frames) of the returned frame will be under `limit`.
    1.47 +//   Small exception: pass -1 as `limit` if the max. flow control size is 0. `read(0)` means the
    1.48 +//   same thing as [in the original API](http://nodejs.org/api/stream.html#stream_stream_read_0).
    1.49 +//
    1.50 +// * **getLastQueuedFrame(): frame**: returns the last frame in output buffers
    1.51 +//
    1.52 +// * **_log**: the Flow class uses the `_log` object of the parent
    1.53 +
    1.54 +// Constructor
    1.55 +// -----------
    1.56 +
    1.57 +// When a HTTP/2.0 connection is first established, new streams are created with an initial flow
    1.58 +// control window size of 65535 bytes.
    1.59 +var INITIAL_WINDOW_SIZE = 65535;
    1.60 +
    1.61 +// `flowControlId` is needed if only specific WINDOW_UPDATEs should be watched.
    1.62 +function Flow(flowControlId) {
    1.63 +  Duplex.call(this, { objectMode: true });
    1.64 +
    1.65 +  this._window = this._initialWindow = INITIAL_WINDOW_SIZE;
    1.66 +  this._flowControlId = flowControlId;
    1.67 +  this._queue = [];
    1.68 +  this._ended = false;
    1.69 +  this._received = 0;
    1.70 +}
    1.71 +Flow.prototype = Object.create(Duplex.prototype, { constructor: { value: Flow } });
    1.72 +
    1.73 +// Incoming frames
    1.74 +// ---------------
    1.75 +
    1.76 +// `_receive` is called when there's an incoming frame.
    1.77 +Flow.prototype._receive = function _receive(frame, callback) {
    1.78 +  throw new Error('The _receive(frame, callback) method has to be overridden by the child class!');
    1.79 +};
    1.80 +
    1.81 +// `_receive` is called by `_write` which in turn is [called by Duplex][1] when someone `write()`s
    1.82 +// to the flow. It emits the 'receiving' event and notifies the window size tracking code if the
    1.83 +// incoming frame is a WINDOW_UPDATE.
    1.84 +// [1]: http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1
    1.85 +Flow.prototype._write = function _write(frame, encoding, callback) {
    1.86 +  if (frame.flags.END_STREAM || (frame.type === 'RST_STREAM')) {
    1.87 +    this._ended = true;
    1.88 +  }
    1.89 +
    1.90 +  if ((frame.type === 'DATA') && (frame.data.length > 0)) {
    1.91 +    this._receive(frame, function() {
    1.92 +      this._received += frame.data.length;
    1.93 +      if (!this._restoreWindowTimer) {
    1.94 +        this._restoreWindowTimer = setImmediate(this._restoreWindow.bind(this));
    1.95 +      }
    1.96 +      callback();
    1.97 +    }.bind(this));
    1.98 +  }
    1.99 +
   1.100 +  else {
   1.101 +    this._receive(frame, callback);
   1.102 +  }
   1.103 +
   1.104 +  if ((frame.type === 'WINDOW_UPDATE') &&
   1.105 +      ((this._flowControlId === undefined) || (frame.stream === this._flowControlId))) {
   1.106 +    this._updateWindow(frame);
   1.107 +  }
   1.108 +};
   1.109 +
   1.110 +// `_restoreWindow` basically acknowledges the DATA frames received since it's last call. It sends
   1.111 +// a WINDOW_UPDATE that restores the flow control window of the remote end.
   1.112 +Flow.prototype._restoreWindow = function _restoreWindow() {
   1.113 +  delete this._restoreWindowTimer;
   1.114 +  if (!this._ended && (this._received > 0)) {
   1.115 +    this.push({
   1.116 +      type: 'WINDOW_UPDATE',
   1.117 +      flags: {},
   1.118 +      stream: this._flowControlId,
   1.119 +      window_size: this._received
   1.120 +    });
   1.121 +    this._received = 0;
   1.122 +  }
   1.123 +};
   1.124 +
   1.125 +// Outgoing frames - sending procedure
   1.126 +// -----------------------------------
   1.127 +
   1.128 +//                                         flow
   1.129 +//                +-------------------------------------------------+
   1.130 +//                |                                                 |
   1.131 +//                +--------+           +---------+                  |
   1.132 +//        read()  | output |  _read()  | flow    |  _send()         |
   1.133 +//     <----------|        |<----------| control |<-------------    |
   1.134 +//                | buffer |           | buffer  |                  |
   1.135 +//                +--------+           +---------+                  |
   1.136 +//                | input  |                                        |
   1.137 +//     ---------->|        |----------------------------------->    |
   1.138 +//       write()  | buffer |  _write()              _receive()      |
   1.139 +//                +--------+                                        |
   1.140 +//                |                                                 |
   1.141 +//                +-------------------------------------------------+
   1.142 +
   1.143 +// `_send` is called when more frames should be pushed to the output buffer.
   1.144 +Flow.prototype._send = function _send() {
   1.145 +  throw new Error('The _send() method has to be overridden by the child class!');
   1.146 +};
   1.147 +
   1.148 +// `_send` is called by `_read` which is in turn [called by Duplex][1] when it wants to have more
   1.149 +// items in the output queue.
   1.150 +// [1]: http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1
   1.151 +Flow.prototype._read = function _read() {
   1.152 +  // * if the flow control queue is empty, then let the user push more frames
   1.153 +  if (this._queue.length === 0) {
   1.154 +    this._send();
   1.155 +  }
   1.156 +
   1.157 +  // * if there are items in the flow control queue, then let's put them into the output queue (to
   1.158 +  //   the extent it is possible with respect to the window size and output queue feedback)
   1.159 +  else if (this._window > 0) {
   1.160 +    this._readableState.sync = true; // to avoid reentrant calls
   1.161 +    do {
   1.162 +      var moreNeeded = this._push(this._queue[0]);
   1.163 +      if (moreNeeded !== null) {
   1.164 +        this._queue.shift();
   1.165 +      }
   1.166 +    } while (moreNeeded && (this._queue.length > 0));
   1.167 +    this._readableState.sync = false;
   1.168 +
   1.169 +    assert((moreNeeded == false) ||                              // * output queue is full
   1.170 +           (this._queue.length === 0) ||                         // * flow control queue is empty
   1.171 +           (!this._window && (this._queue[0].type === 'DATA'))); // * waiting for window update
   1.172 +  }
   1.173 +
   1.174 +  // * otherwise, come back when the flow control window is positive
   1.175 +  else {
   1.176 +    this.once('window_update', this._read);
   1.177 +  }
   1.178 +};
   1.179 +
   1.180 +var MAX_PAYLOAD_SIZE = 4096; // Must not be greater than MAX_HTTP_PAYLOAD_SIZE which is 16383
   1.181 +
   1.182 +// `read(limit)` is like the `read` of the Readable class, but it guarantess that the 'flow control
   1.183 +// size' (0 for non-DATA frames, length of the payload for DATA frames) of the returned frame will
   1.184 +// be under `limit`.
   1.185 +Flow.prototype.read = function read(limit) {
   1.186 +  if (limit === 0) {
   1.187 +    return Duplex.prototype.read.call(this, 0);
   1.188 +  } else if (limit === -1) {
   1.189 +    limit = 0;
   1.190 +  } else if ((limit === undefined) || (limit > MAX_PAYLOAD_SIZE)) {
   1.191 +    limit = MAX_PAYLOAD_SIZE;
   1.192 +  }
   1.193 +
   1.194 +  // * Looking at the first frame in the queue without pulling it out if possible. This will save
   1.195 +  //   a costly unshift if the frame proves to be too large to return.
   1.196 +  var firstInQueue = this._readableState.buffer[0];
   1.197 +  var frame = firstInQueue || Duplex.prototype.read.call(this);
   1.198 +
   1.199 +  if ((frame === null) || (frame.type !== 'DATA') || (frame.data.length <= limit)) {
   1.200 +    if (firstInQueue) {
   1.201 +      Duplex.prototype.read.call(this);
   1.202 +    }
   1.203 +    return frame;
   1.204 +  }
   1.205 +
   1.206 +  else if (limit <= 0) {
   1.207 +    if (!firstInQueue) {
   1.208 +      this.unshift(frame);
   1.209 +    }
   1.210 +    return null;
   1.211 +  }
   1.212 +
   1.213 +  else {
   1.214 +    this._log.trace({ frame: frame, size: frame.data.length, forwardable: limit },
   1.215 +                    'Splitting out forwardable part of a DATA frame.');
   1.216 +    var forwardable = {
   1.217 +      type: 'DATA',
   1.218 +      flags: {},
   1.219 +      stream: frame.stream,
   1.220 +      data: frame.data.slice(0, limit)
   1.221 +    };
   1.222 +    frame.data = frame.data.slice(limit);
   1.223 +
   1.224 +    if (!firstInQueue) {
   1.225 +      this.unshift(frame);
   1.226 +    }
   1.227 +    return forwardable;
   1.228 +  }
   1.229 +};
   1.230 +
   1.231 +// `_parentPush` pushes the given `frame` into the output queue
   1.232 +Flow.prototype._parentPush = function _parentPush(frame) {
   1.233 +  this._log.trace({ frame: frame }, 'Pushing frame into the output queue');
   1.234 +
   1.235 +  if (frame && (frame.type === 'DATA') && (this._window !== Infinity)) {
   1.236 +    this._log.trace({ window: this._window, by: frame.data.length },
   1.237 +                    'Decreasing flow control window size.');
   1.238 +    this._window -= frame.data.length;
   1.239 +    assert(this._window >= 0);
   1.240 +  }
   1.241 +
   1.242 +  return Duplex.prototype.push.call(this, frame);
   1.243 +};
   1.244 +
   1.245 +// `_push(frame)` pushes `frame` into the output queue and decreases the flow control window size.
   1.246 +// It is capable of splitting DATA frames into smaller parts, if the window size is not enough to
   1.247 +// push the whole frame. The return value is similar to `push` except that it returns `null` if it
   1.248 +// did not push the whole frame to the output queue (but maybe it did push part of the frame).
   1.249 +Flow.prototype._push = function _push(frame) {
   1.250 +  var data = frame && (frame.type === 'DATA') && frame.data;
   1.251 +
   1.252 +  if (!data || (data.length <= this._window)) {
   1.253 +    return this._parentPush(frame);
   1.254 +  }
   1.255 +
   1.256 +  else if (this._window <= 0) {
   1.257 +    return null;
   1.258 +  }
   1.259 +
   1.260 +  else {
   1.261 +    this._log.trace({ frame: frame, size: frame.data.length, forwardable: this._window },
   1.262 +                    'Splitting out forwardable part of a DATA frame.');
   1.263 +    frame.data = data.slice(this._window);
   1.264 +    this._parentPush({
   1.265 +      type: 'DATA',
   1.266 +      flags: {},
   1.267 +      stream: frame.stream,
   1.268 +      data: data.slice(0, this._window)
   1.269 +    });
   1.270 +    return null;
   1.271 +  }
   1.272 +};
   1.273 +
   1.274 +// Push `frame` into the flow control queue, or if it's empty, then directly into the output queue
   1.275 +Flow.prototype.push = function push(frame) {
   1.276 +  if (frame === null) {
   1.277 +    this._log.debug('Enqueueing outgoing End Of Stream');
   1.278 +  } else {
   1.279 +    this._log.debug({ frame: frame }, 'Enqueueing outgoing frame');
   1.280 +  }
   1.281 +
   1.282 +  var moreNeeded = null;
   1.283 +  if (this._queue.length === 0) {
   1.284 +    moreNeeded = this._push(frame);
   1.285 +  }
   1.286 +
   1.287 +  if (moreNeeded === null) {
   1.288 +    this._queue.push(frame);
   1.289 +  }
   1.290 +
   1.291 +  return moreNeeded;
   1.292 +};
   1.293 +
   1.294 +// `getLastQueuedFrame` returns the last frame in output buffers. This is primarily used by the
   1.295 +// [Stream](stream.html) class to mark the last frame with END_STREAM flag.
   1.296 +Flow.prototype.getLastQueuedFrame = function getLastQueuedFrame() {
   1.297 +  var readableQueue = this._readableState.buffer;
   1.298 +  return this._queue[this._queue.length - 1] || readableQueue[readableQueue.length - 1];
   1.299 +};
   1.300 +
   1.301 +// Outgoing frames - managing the window size
   1.302 +// ------------------------------------------
   1.303 +
   1.304 +// Flow control window size is manipulated using the `_increaseWindow` method.
   1.305 +//
   1.306 +// * Invoking it with `Infinite` means turning off flow control. Flow control cannot be enabled
   1.307 +//   again once disabled. Any attempt to re-enable flow control MUST be rejected with a
   1.308 +//   FLOW_CONTROL_ERROR error code.
   1.309 +// * A sender MUST NOT allow a flow control window to exceed 2^31 - 1 bytes. The action taken
   1.310 +//   depends on it being a stream or the connection itself.
   1.311 +
   1.312 +var WINDOW_SIZE_LIMIT = Math.pow(2, 31) - 1;
   1.313 +
   1.314 +Flow.prototype._increaseWindow = function _increaseWindow(size) {
   1.315 +  if ((this._window === Infinity) && (size !== Infinity)) {
   1.316 +    this._log.error('Trying to increase flow control window after flow control was turned off.');
   1.317 +    this.emit('error', 'FLOW_CONTROL_ERROR');
   1.318 +  } else {
   1.319 +    this._log.trace({ window: this._window, by: size }, 'Increasing flow control window size.');
   1.320 +    this._window += size;
   1.321 +    if ((this._window !== Infinity) && (this._window > WINDOW_SIZE_LIMIT)) {
   1.322 +      this._log.error('Flow control window grew too large.');
   1.323 +      this.emit('error', 'FLOW_CONTROL_ERROR');
   1.324 +    } else {
   1.325 +      this.emit('window_update');
   1.326 +    }
   1.327 +  }
   1.328 +};
   1.329 +
   1.330 +// The `_updateWindow` method gets called every time there's an incoming WINDOW_UPDATE frame. It
   1.331 +// modifies the flow control window:
   1.332 +//
   1.333 +// * Flow control can be disabled for an individual stream by sending a WINDOW_UPDATE with the
   1.334 +//   END_FLOW_CONTROL flag set. The payload of a WINDOW_UPDATE frame that has the END_FLOW_CONTROL
   1.335 +//   flag set is ignored.
   1.336 +// * A sender that receives a WINDOW_UPDATE frame updates the corresponding window by the amount
   1.337 +//   specified in the frame.
   1.338 +Flow.prototype._updateWindow = function _updateWindow(frame) {
   1.339 +  this._increaseWindow(frame.flags.END_FLOW_CONTROL ? Infinity : frame.window_size);
   1.340 +};
   1.341 +
   1.342 +// A SETTINGS frame can alter the initial flow control window size for all current streams. When the
   1.343 +// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream by
   1.344 +// calling the `setInitialWindow` method. The window size has to be modified by the difference
   1.345 +// between the new value and the old value.
   1.346 +Flow.prototype.setInitialWindow = function setInitialWindow(initialWindow) {
   1.347 +  this._increaseWindow(initialWindow - this._initialWindow);
   1.348 +  this._initialWindow = initialWindow;
   1.349 +};

mercurial