testing/xpcshell/node-http2/node_modules/http2-protocol/lib/flow.js

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

     1 var assert = require('assert');
     3 // The Flow class
     4 // ==============
     6 // Flow is a [Duplex stream][1] subclass which implements HTTP/2 flow control. It is designed to be
     7 // subclassed by [Connection](connection.html) and the `upstream` component of [Stream](stream.html).
     8 // [1]: http://nodejs.org/api/stream.html#stream_class_stream_duplex
    10 var Duplex  = require('stream').Duplex;
    12 exports.Flow = Flow;
    14 // Public API
    15 // ----------
    17 // * **Event: 'error' (type)**: signals an error
    18 //
    19 // * **setInitialWindow(size)**: the initial flow control window size can be changed *any time*
    20 //   ([as described in the standard][1]) using this method
    21 //
    22 // [1]: http://tools.ietf.org/html/draft-ietf-httpbis-http2-10#section-6.9.2
    24 // API for child classes
    25 // ---------------------
    27 // * **new Flow([flowControlId])**: creating a new flow that will listen for WINDOW_UPDATE frames
    28 //   with the given `flowControlId` (or every update frame if not given)
    29 //
    30 // * **_send()**: called when more frames should be pushed. The child class is expected to override
    31 //   this (instead of the `_read` method of the Duplex class).
    32 //
    33 // * **_receive(frame, readyCallback)**: called when there's an incoming frame. The child class is
    34 //   expected to override this (instead of the `_write` method of the Duplex class).
    35 //
    36 // * **push(frame): bool**: schedules `frame` for sending.
    37 //
    38 //   Returns `true` if it needs more frames in the output queue, `false` if the output queue is
    39 //   full, and `null` if it did not push the frame into the output queue (instead, it pushed it into
    40 //   the flow control queue).
    41 //
    42 // * **read(limit): frame**: like the regular `read`, but the 'flow control size' (0 for non-DATA
    43 //   frames, length of the payload for DATA frames) of the returned frame will be at most `limit`.
    44 //   Small exception: pass -1 as `limit` if the max. flow control size is 0. `read(0)` means the
    45 //   same thing as [in the original API](http://nodejs.org/api/stream.html#stream_stream_read_0).
    46 //
    47 // * **getLastQueuedFrame(): frame**: returns the last frame in output buffers
    48 //
    49 // * **_log**: the Flow class uses the `_log` object of the parent
    51 // Constructor
    52 // -----------
    54 // When a HTTP/2.0 connection is first established, new streams are created with an initial flow
    55 // control window size of 65535 bytes.
    56 var INITIAL_WINDOW_SIZE = 65535;
    58 // `flowControlId` is needed if only specific WINDOW_UPDATEs should be watched.
    59 function Flow(flowControlId) {
    60   Duplex.call(this, { objectMode: true });
    62   this._window = this._initialWindow = INITIAL_WINDOW_SIZE;
    63   this._flowControlId = flowControlId;
    64   this._queue = [];
    65   this._ended = false;
    66   this._received = 0;
    67 }
    68 Flow.prototype = Object.create(Duplex.prototype, { constructor: { value: Flow } });
    70 // Incoming frames
    71 // ---------------
    73 // `_receive` is called when there's an incoming frame.
    74 Flow.prototype._receive = function _receive(frame, callback) {
    75   throw new Error('The _receive(frame, callback) method has to be overridden by the child class!');
    76 };
    78 // `_receive` is called by `_write` which in turn is [called by Duplex][1] when someone `write()`s
    79 // to the flow. It emits the 'receiving' event and notifies the window size tracking code if the
    80 // incoming frame is a WINDOW_UPDATE.
    81 // [1]: http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1
    82 Flow.prototype._write = function _write(frame, encoding, callback) {
    83   if (frame.flags.END_STREAM || (frame.type === 'RST_STREAM')) {
    84     this._ended = true;
    85   }
    87   if ((frame.type === 'DATA') && (frame.data.length > 0)) {
    88     this._receive(frame, function() {
    89       this._received += frame.data.length;
    90       if (!this._restoreWindowTimer) {
    91         this._restoreWindowTimer = setImmediate(this._restoreWindow.bind(this));
    92       }
    93       callback();
    94     }.bind(this));
    95   }
    97   else {
    98     this._receive(frame, callback);
    99   }
   101   if ((frame.type === 'WINDOW_UPDATE') &&
   102       ((this._flowControlId === undefined) || (frame.stream === this._flowControlId))) {
   103     this._updateWindow(frame);
   104   }
   105 };
   107 // `_restoreWindow` basically acknowledges the DATA frames received since it's last call. It sends
   108 // a WINDOW_UPDATE that restores the flow control window of the remote end.
   109 Flow.prototype._restoreWindow = function _restoreWindow() {
   110   delete this._restoreWindowTimer;
   111   if (!this._ended && (this._received > 0)) {
   112     this.push({
   113       type: 'WINDOW_UPDATE',
   114       flags: {},
   115       stream: this._flowControlId,
   116       window_size: this._received
   117     });
   118     this._received = 0;
   119   }
   120 };
   122 // Outgoing frames - sending procedure
   123 // -----------------------------------
   125 //                                         flow
   126 //                +-------------------------------------------------+
   127 //                |                                                 |
   128 //                +--------+           +---------+                  |
   129 //        read()  | output |  _read()  | flow    |  _send()         |
   130 //     <----------|        |<----------| control |<-------------    |
   131 //                | buffer |           | buffer  |                  |
   132 //                +--------+           +---------+                  |
   133 //                | input  |                                        |
   134 //     ---------->|        |----------------------------------->    |
   135 //       write()  | buffer |  _write()              _receive()      |
   136 //                +--------+                                        |
   137 //                |                                                 |
   138 //                +-------------------------------------------------+
   140 // `_send` is called when more frames should be pushed to the output buffer.
   141 Flow.prototype._send = function _send() {
   142   throw new Error('The _send() method has to be overridden by the child class!');
   143 };
   145 // `_send` is called by `_read` which is in turn [called by Duplex][1] when it wants to have more
   146 // items in the output queue.
   147 // [1]: http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1
   148 Flow.prototype._read = function _read() {
   149   // * if the flow control queue is empty, then let the user push more frames
   150   if (this._queue.length === 0) {
   151     this._send();
   152   }
   154   // * if there are items in the flow control queue, then let's put them into the output queue (to
   155   //   the extent it is possible with respect to the window size and output queue feedback)
   156   else if (this._window > 0) {
   157     this._readableState.sync = true; // to avoid reentrant calls
   158     do {
   159       var moreNeeded = this._push(this._queue[0]);
   160       if (moreNeeded !== null) {
   161         this._queue.shift();
   162       }
   163     } while (moreNeeded && (this._queue.length > 0));
   164     this._readableState.sync = false;
   166     assert((moreNeeded == false) ||                              // * output queue is full
   167            (this._queue.length === 0) ||                         // * flow control queue is empty
   168            (!this._window && (this._queue[0].type === 'DATA'))); // * waiting for window update
   169   }
   171   // * otherwise, come back when the flow control window is positive
   172   else {
   173     this.once('window_update', this._read);
   174   }
   175 };
   177 var MAX_PAYLOAD_SIZE = 4096; // Must not be greater than MAX_HTTP_PAYLOAD_SIZE which is 16383
   179 // `read(limit)` is like the `read` of the Readable class, but it guarantess that the 'flow control
   180 // size' (0 for non-DATA frames, length of the payload for DATA frames) of the returned frame will
   181 // be under `limit`.
   182 Flow.prototype.read = function read(limit) {
   183   if (limit === 0) {
   184     return Duplex.prototype.read.call(this, 0);
   185   } else if (limit === -1) {
   186     limit = 0;
   187   } else if ((limit === undefined) || (limit > MAX_PAYLOAD_SIZE)) {
   188     limit = MAX_PAYLOAD_SIZE;
   189   }
   191   // * Looking at the first frame in the queue without pulling it out if possible. This will save
   192   //   a costly unshift if the frame proves to be too large to return.
   193   var firstInQueue = this._readableState.buffer[0];
   194   var frame = firstInQueue || Duplex.prototype.read.call(this);
   196   if ((frame === null) || (frame.type !== 'DATA') || (frame.data.length <= limit)) {
   197     if (firstInQueue) {
   198       Duplex.prototype.read.call(this);
   199     }
   200     return frame;
   201   }
   203   else if (limit <= 0) {
   204     if (!firstInQueue) {
   205       this.unshift(frame);
   206     }
   207     return null;
   208   }
   210   else {
   211     this._log.trace({ frame: frame, size: frame.data.length, forwardable: limit },
   212                     'Splitting out forwardable part of a DATA frame.');
   213     var forwardable = {
   214       type: 'DATA',
   215       flags: {},
   216       stream: frame.stream,
   217       data: frame.data.slice(0, limit)
   218     };
   219     frame.data = frame.data.slice(limit);
   221     if (!firstInQueue) {
   222       this.unshift(frame);
   223     }
   224     return forwardable;
   225   }
   226 };
   228 // `_parentPush` pushes the given `frame` into the output queue
   229 Flow.prototype._parentPush = function _parentPush(frame) {
   230   this._log.trace({ frame: frame }, 'Pushing frame into the output queue');
   232   if (frame && (frame.type === 'DATA') && (this._window !== Infinity)) {
   233     this._log.trace({ window: this._window, by: frame.data.length },
   234                     'Decreasing flow control window size.');
   235     this._window -= frame.data.length;
   236     assert(this._window >= 0);
   237   }
   239   return Duplex.prototype.push.call(this, frame);
   240 };
   242 // `_push(frame)` pushes `frame` into the output queue and decreases the flow control window size.
   243 // It is capable of splitting DATA frames into smaller parts, if the window size is not enough to
   244 // push the whole frame. The return value is similar to `push` except that it returns `null` if it
   245 // did not push the whole frame to the output queue (but maybe it did push part of the frame).
   246 Flow.prototype._push = function _push(frame) {
   247   var data = frame && (frame.type === 'DATA') && frame.data;
   249   if (!data || (data.length <= this._window)) {
   250     return this._parentPush(frame);
   251   }
   253   else if (this._window <= 0) {
   254     return null;
   255   }
   257   else {
   258     this._log.trace({ frame: frame, size: frame.data.length, forwardable: this._window },
   259                     'Splitting out forwardable part of a DATA frame.');
   260     frame.data = data.slice(this._window);
   261     this._parentPush({
   262       type: 'DATA',
   263       flags: {},
   264       stream: frame.stream,
   265       data: data.slice(0, this._window)
   266     });
   267     return null;
   268   }
   269 };
   271 // Push `frame` into the flow control queue, or if it's empty, then directly into the output queue
   272 Flow.prototype.push = function push(frame) {
   273   if (frame === null) {
   274     this._log.debug('Enqueueing outgoing End Of Stream');
   275   } else {
   276     this._log.debug({ frame: frame }, 'Enqueueing outgoing frame');
   277   }
   279   var moreNeeded = null;
   280   if (this._queue.length === 0) {
   281     moreNeeded = this._push(frame);
   282   }
   284   if (moreNeeded === null) {
   285     this._queue.push(frame);
   286   }
   288   return moreNeeded;
   289 };
   291 // `getLastQueuedFrame` returns the last frame in output buffers. This is primarily used by the
   292 // [Stream](stream.html) class to mark the last frame with END_STREAM flag.
   293 Flow.prototype.getLastQueuedFrame = function getLastQueuedFrame() {
   294   var readableQueue = this._readableState.buffer;
   295   return this._queue[this._queue.length - 1] || readableQueue[readableQueue.length - 1];
   296 };
   298 // Outgoing frames - managing the window size
   299 // ------------------------------------------
   301 // Flow control window size is manipulated using the `_increaseWindow` method.
   302 //
   303 // * Invoking it with `Infinite` means turning off flow control. Flow control cannot be enabled
   304 //   again once disabled. Any attempt to re-enable flow control MUST be rejected with a
   305 //   FLOW_CONTROL_ERROR error code.
   306 // * A sender MUST NOT allow a flow control window to exceed 2^31 - 1 bytes. The action taken
   307 //   depends on it being a stream or the connection itself.
   309 var WINDOW_SIZE_LIMIT = Math.pow(2, 31) - 1;
   311 Flow.prototype._increaseWindow = function _increaseWindow(size) {
   312   if ((this._window === Infinity) && (size !== Infinity)) {
   313     this._log.error('Trying to increase flow control window after flow control was turned off.');
   314     this.emit('error', 'FLOW_CONTROL_ERROR');
   315   } else {
   316     this._log.trace({ window: this._window, by: size }, 'Increasing flow control window size.');
   317     this._window += size;
   318     if ((this._window !== Infinity) && (this._window > WINDOW_SIZE_LIMIT)) {
   319       this._log.error('Flow control window grew too large.');
   320       this.emit('error', 'FLOW_CONTROL_ERROR');
   321     } else {
   322       this.emit('window_update');
   323     }
   324   }
   325 };
   327 // The `_updateWindow` method gets called every time there's an incoming WINDOW_UPDATE frame. It
   328 // modifies the flow control window:
   329 //
   330 // * Flow control can be disabled for an individual stream by sending a WINDOW_UPDATE with the
   331 //   END_FLOW_CONTROL flag set. The payload of a WINDOW_UPDATE frame that has the END_FLOW_CONTROL
   332 //   flag set is ignored.
   333 // * A sender that receives a WINDOW_UPDATE frame updates the corresponding window by the amount
   334 //   specified in the frame.
   335 Flow.prototype._updateWindow = function _updateWindow(frame) {
   336   this._increaseWindow(frame.flags.END_FLOW_CONTROL ? Infinity : frame.window_size);
   337 };
   339 // A SETTINGS frame can alter the initial flow control window size for all current streams. When the
   340 // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream by
   341 // calling the `setInitialWindow` method. The window size has to be modified by the difference
   342 // between the new value and the old value.
   343 Flow.prototype.setInitialWindow = function setInitialWindow(initialWindow) {
   344   this._increaseWindow(initialWindow - this._initialWindow);
   345   this._initialWindow = initialWindow;
   346 };

mercurial