testing/xpcshell/node-http2/node_modules/http2-protocol/lib/flow.js

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

michael@0 1 var assert = require('assert');
michael@0 2
michael@0 3 // The Flow class
michael@0 4 // ==============
michael@0 5
michael@0 6 // Flow is a [Duplex stream][1] subclass which implements HTTP/2 flow control. It is designed to be
michael@0 7 // subclassed by [Connection](connection.html) and the `upstream` component of [Stream](stream.html).
michael@0 8 // [1]: http://nodejs.org/api/stream.html#stream_class_stream_duplex
michael@0 9
michael@0 10 var Duplex = require('stream').Duplex;
michael@0 11
michael@0 12 exports.Flow = Flow;
michael@0 13
michael@0 14 // Public API
michael@0 15 // ----------
michael@0 16
michael@0 17 // * **Event: 'error' (type)**: signals an error
michael@0 18 //
michael@0 19 // * **setInitialWindow(size)**: the initial flow control window size can be changed *any time*
michael@0 20 // ([as described in the standard][1]) using this method
michael@0 21 //
michael@0 22 // [1]: http://tools.ietf.org/html/draft-ietf-httpbis-http2-10#section-6.9.2
michael@0 23
michael@0 24 // API for child classes
michael@0 25 // ---------------------
michael@0 26
michael@0 27 // * **new Flow([flowControlId])**: creating a new flow that will listen for WINDOW_UPDATE frames
michael@0 28 // with the given `flowControlId` (or every update frame if not given)
michael@0 29 //
michael@0 30 // * **_send()**: called when more frames should be pushed. The child class is expected to override
michael@0 31 // this (instead of the `_read` method of the Duplex class).
michael@0 32 //
michael@0 33 // * **_receive(frame, readyCallback)**: called when there's an incoming frame. The child class is
michael@0 34 // expected to override this (instead of the `_write` method of the Duplex class).
michael@0 35 //
michael@0 36 // * **push(frame): bool**: schedules `frame` for sending.
michael@0 37 //
michael@0 38 // Returns `true` if it needs more frames in the output queue, `false` if the output queue is
michael@0 39 // full, and `null` if it did not push the frame into the output queue (instead, it pushed it into
michael@0 40 // the flow control queue).
michael@0 41 //
michael@0 42 // * **read(limit): frame**: like the regular `read`, but the 'flow control size' (0 for non-DATA
michael@0 43 // frames, length of the payload for DATA frames) of the returned frame will be at most `limit`.
michael@0 44 // Small exception: pass -1 as `limit` if the max. flow control size is 0. `read(0)` means the
michael@0 45 // same thing as [in the original API](http://nodejs.org/api/stream.html#stream_stream_read_0).
michael@0 46 //
michael@0 47 // * **getLastQueuedFrame(): frame**: returns the last frame in output buffers
michael@0 48 //
michael@0 49 // * **_log**: the Flow class uses the `_log` object of the parent
michael@0 50
michael@0 51 // Constructor
michael@0 52 // -----------
michael@0 53
// Every flow starts out with a flow control window of 65535 bytes; this value seeds both the
// current window and the remembered initial window of a freshly constructed flow.
var INITIAL_WINDOW_SIZE = 65535;

// Creates an object-mode Duplex stream with flow control bookkeeping attached.
//
// * `flowControlId` (optional): when given, only WINDOW_UPDATE frames whose `stream` field equals
//   it are applied to this flow's window (see `_write`); when `undefined`, every WINDOW_UPDATE is.
function Flow(flowControlId) {
  Duplex.call(this, { objectMode: true });

  // Remembered so that `setInitialWindow` can apply the difference later on.
  this._initialWindow = INITIAL_WINDOW_SIZE;
  // Number of DATA payload bytes that may still be moved into the output queue.
  this._window = this._initialWindow;
  this._flowControlId = flowControlId;
  // Flow control queue: frames waiting for window space or for room in the output queue.
  this._queue = [];
  // Set once an END_STREAM flagged frame or a RST_STREAM frame was written to the flow.
  this._ended = false;
  // DATA payload bytes received but not yet acknowledged with a WINDOW_UPDATE.
  this._received = 0;
}

// Inherit from Duplex and restore a non-enumerable `constructor` reference.
Flow.prototype = Object.create(Duplex.prototype);
Object.defineProperty(Flow.prototype, 'constructor', { value: Flow });
michael@0 69
michael@0 70 // Incoming frames
michael@0 71 // ---------------
michael@0 72
// `_receive(frame, callback)` is the hook that `_write` hands every incoming frame to. This base
// implementation only throws: child classes have to provide their own version.
Flow.prototype._receive = function _receive(frame, callback) {
  var reason = 'The _receive(frame, callback) method has to be overridden by the child class!';
  throw new Error(reason);
};
michael@0 77
// `_write` is [called by Duplex][1] when someone `write()`s a frame to the flow. It:
//
// * marks the flow as ended when the frame carries the END_STREAM flag or is a RST_STREAM,
// * forwards the frame to `_receive`; for DATA frames with a non-empty payload the payload length
//   is added to `_received` once `_receive` reports completion, and a `_restoreWindow` call is
//   scheduled with `setImmediate` unless one is already pending,
// * passes WINDOW_UPDATE frames to `_updateWindow` when no `flowControlId` was given or when the
//   frame's `stream` matches it.
//
// `encoding` is unused (the stream operates in object mode).
// [1]: http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1
Flow.prototype._write = function _write(frame, encoding, callback) {
  var self = this;

  if (frame.flags.END_STREAM || (frame.type === 'RST_STREAM')) {
    self._ended = true;
  }

  // Non-DATA frames and empty DATA frames do not count against the window, so the original
  // callback is handed through untouched for them.
  var onReceived = callback;
  if ((frame.type === 'DATA') && (frame.data.length > 0)) {
    onReceived = function() {
      self._received += frame.data.length;
      // A single pending timer acknowledges everything received until it fires.
      if (!self._restoreWindowTimer) {
        self._restoreWindowTimer = setImmediate(self._restoreWindow.bind(self));
      }
      callback();
    };
  }
  self._receive(frame, onReceived);

  if (frame.type !== 'WINDOW_UPDATE') {
    return;
  }
  if ((self._flowControlId === undefined) || (frame.stream === self._flowControlId)) {
    self._updateWindow(frame);
  }
};
michael@0 106
// `_restoreWindow` acknowledges the DATA payload bytes counted in `_received` since its last call:
// it pushes a WINDOW_UPDATE frame carrying that amount and resets the counter. Nothing is sent if
// the flow has already ended or if nothing was received. The pending-timer marker is removed in
// every case, so that `_write` can schedule the next run.
Flow.prototype._restoreWindow = function _restoreWindow() {
  delete this._restoreWindowTimer;

  if (this._ended || !(this._received > 0)) {
    return;
  }

  var acknowledgement = {
    type: 'WINDOW_UPDATE',
    flags: {},
    stream: this._flowControlId,
    window_size: this._received
  };
  this.push(acknowledgement);
  this._received = 0;
};
michael@0 121
michael@0 122 // Outgoing frames - sending procedure
michael@0 123 // -----------------------------------
michael@0 124
michael@0 125 // flow
michael@0 126 // +-------------------------------------------------+
michael@0 127 // | |
michael@0 128 // +--------+ +---------+ |
michael@0 129 // read() | output | _read() | flow | _send() |
michael@0 130 // <----------| |<----------| control |<------------- |
michael@0 131 // | buffer | | buffer | |
michael@0 132 // +--------+ +---------+ |
michael@0 133 // | input | |
michael@0 134 // ---------->| |-----------------------------------> |
michael@0 135 // write() | buffer | _write() _receive() |
michael@0 136 // +--------+ |
michael@0 137 // | |
michael@0 138 // +-------------------------------------------------+
michael@0 139
// `_send` is invoked by `_read` when the flow control queue is empty and more frames should be
// pushed. This base implementation only throws: child classes have to provide their own version.
Flow.prototype._send = function _send() {
  var reason = 'The _send() method has to be overridden by the child class!';
  throw new Error(reason);
};
michael@0 144
// `_read` is [called by Duplex][1] when it wants to have more items in the output queue. One of
// three things happens:
//
// * the flow control queue is empty: `_send` is called so that the child class can push frames,
// * the window is not positive: `_read` is re-armed for the next 'window_update' event,
// * otherwise: queued frames are moved into the output queue for as long as the window size and
//   the output queue feedback allow.
// [1]: http://nodejs.org/api/stream.html#stream_readable_read_size_1
Flow.prototype._read = function _read() {
  // * nothing is waiting in the flow control queue, so let the user push more frames
  if (this._queue.length === 0) {
    this._send();
    return;
  }

  // * frames are waiting but the window is exhausted: come back when it is positive again
  if (!(this._window > 0)) {
    this.once('window_update', this._read);
    return;
  }

  // * drain the flow control queue into the output queue
  this._readableState.sync = true; // to avoid reentrant calls
  var moreNeeded;
  for (;;) {
    moreNeeded = this._push(this._queue[0]);
    // `null` means the frame was not (or only partially) forwarded, so it stays at the front.
    if (moreNeeded !== null) {
      this._queue.shift();
    }
    if (!moreNeeded || !(this._queue.length > 0)) {
      break;
    }
  }
  this._readableState.sync = false;

  // The loop may only stop for one of these reasons. The loose comparison is deliberate:
  // `null == false` is `false`, so a `null` result has to be explained by the other conditions.
  var stoppedLegitimately =
    (moreNeeded == false) ||                              // * output queue is full
    (this._queue.length === 0) ||                         // * flow control queue is empty
    (!this._window && (this._queue[0].type === 'DATA'));  // * waiting for window update
  assert(stoppedLegitimately);
};
michael@0 176
var MAX_PAYLOAD_SIZE = 4096; // Must not be greater than MAX_HTTP_PAYLOAD_SIZE which is 16383

// `read(limit)` is like the `read` of the Readable class, but it guarantees that the 'flow control
// size' (0 for non-DATA frames, length of the payload for DATA frames) of the returned frame is at
// most `limit`.
//
// * `read(0)` is passed straight through to the Duplex implementation.
// * `read(-1)` stands for a maximum flow control size of 0.
// * A missing `limit`, or one above `MAX_PAYLOAD_SIZE`, is replaced by `MAX_PAYLOAD_SIZE`.
//
// Returns the next frame, the leading part of an oversized DATA frame (the remainder stays at the
// front of the output queue), or `null` when nothing can be returned within the limit.
Flow.prototype.read = function read(limit) {
  var parentRead = Duplex.prototype.read;

  if (limit === 0) {
    return parentRead.call(this, 0);
  }

  var maxSize = limit;
  if (limit === -1) {
    maxSize = 0;
  } else if ((limit === undefined) || (limit > MAX_PAYLOAD_SIZE)) {
    maxSize = MAX_PAYLOAD_SIZE;
  }

  // * Looking at the first frame in the queue without pulling it out if possible. This will save
  //   a costly unshift if the frame proves to be too large to return. When the peek yields nothing
  //   usable, the frame is pulled out with a regular read instead.
  var peeked = this._readableState.buffer[0];
  var frame = peeked || parentRead.call(this);

  // * Nothing to return, or the frame fits: hand it over (consuming it if it was only peeked at).
  var fits = (frame === null) || (frame.type !== 'DATA') || (frame.data.length <= maxSize);
  if (fits) {
    if (peeked) {
      parentRead.call(this);
    }
    return frame;
  }

  // * An oversized DATA frame and no allowance at all: leave it at the front of the queue.
  if (maxSize <= 0) {
    if (!peeked) {
      this.unshift(frame);
    }
    return null;
  }

  // * An oversized DATA frame: cut off the forwardable head and keep the tail in the queue.
  this._log.trace({ frame: frame, size: frame.data.length, forwardable: maxSize },
                  'Splitting out forwardable part of a DATA frame.');
  var head = {
    type: 'DATA',
    flags: {},
    stream: frame.stream,
    data: frame.data.slice(0, maxSize)
  };
  frame.data = frame.data.slice(maxSize);

  if (!peeked) {
    this.unshift(frame);
  }
  return head;
};
michael@0 227
// `_parentPush` hands the given `frame` (or `null` for End Of Stream) to the `push` of the Duplex
// class, i.e. puts it into the output queue. For DATA frames the flow control window is first
// decreased by the payload length, unless flow control is turned off (infinite window). The
// window must never become negative; that is asserted. Returns whatever the Duplex `push` returns.
Flow.prototype._parentPush = function _parentPush(frame) {
  this._log.trace({ frame: frame }, 'Pushing frame into the output queue');

  var consumesWindow = frame && (frame.type === 'DATA') && (this._window !== Infinity);
  if (consumesWindow) {
    var size = frame.data.length;
    this._log.trace({ window: this._window, by: size },
                    'Decreasing flow control window size.');
    this._window = this._window - size;
    assert(this._window >= 0);
  }

  return Duplex.prototype.push.call(this, frame);
};
michael@0 241
// `_push(frame)` moves `frame` into the output queue via `_parentPush` (which also decreases the
// flow control window). A DATA frame larger than the window is split: the part that fits is
// forwarded as a new, flagless DATA frame, and `frame` itself is shrunk to the remainder.
//
// Returns the result of `_parentPush` when the whole frame was forwarded, and `null` when it was
// not (nothing is forwarded if the window is not positive, only the leading part otherwise).
Flow.prototype._push = function _push(frame) {
  // The payload is only of interest for DATA frames; everything else always passes.
  var isData = frame && (frame.type === 'DATA');
  var payload = isData && frame.data;

  if (!payload || (payload.length <= this._window)) {
    return this._parentPush(frame);
  }

  if (this._window <= 0) {
    return null;
  }

  var room = this._window;
  this._log.trace({ frame: frame, size: frame.data.length, forwardable: room },
                  'Splitting out forwardable part of a DATA frame.');
  var head = payload.slice(0, room);
  frame.data = payload.slice(room);
  this._parentPush({
    type: 'DATA',
    flags: {},
    stream: frame.stream,
    data: head
  });
  return null;
};
michael@0 270
// `push(frame)` schedules `frame` (or `null` for End Of Stream) for sending. If the flow control
// queue is empty, the frame is offered to `_push` directly; if frames are already queued, or
// `_push` could not forward the whole frame, it is appended to the flow control queue.
//
// Returns the result of `_push` when the frame went straight to the output queue, `null` when it
// was queued.
Flow.prototype.push = function push(frame) {
  if (frame !== null) {
    this._log.debug({ frame: frame }, 'Enqueueing outgoing frame');
  } else {
    this._log.debug('Enqueueing outgoing End Of Stream');
  }

  // Only bypass the flow control queue when nothing is waiting in it, to keep frame order.
  var moreNeeded = (this._queue.length === 0) ? this._push(frame) : null;

  if (moreNeeded === null) {
    this._queue.push(frame);
  }

  return moreNeeded;
};
michael@0 290
// `getLastQueuedFrame` returns the most recently queued frame: the last entry of the flow control
// queue if there is one, otherwise the last entry of the Duplex output buffer. This is primarily
// used by the [Stream](stream.html) class to mark the last frame with END_STREAM flag.
//
// NOTE: the output buffer is read through the internal `_readableState.buffer` property and is
// indexed like an array.
Flow.prototype.getLastQueuedFrame = function getLastQueuedFrame() {
  var outputBuffer = this._readableState.buffer;
  var flowQueue = this._queue;

  var lastWaiting = flowQueue[flowQueue.length - 1];
  if (lastWaiting) {
    return lastWaiting;
  }
  return outputBuffer[outputBuffer.length - 1];
};
michael@0 297
michael@0 298 // Outgoing frames - managing the window size
michael@0 299 // ------------------------------------------
michael@0 300
// Flow control window size is manipulated using the `_increaseWindow` method.
//
// * Invoking it with `Infinity` means turning off flow control. Flow control cannot be enabled
//   again once disabled: any later attempt to change the window by a finite amount is rejected
//   with a FLOW_CONTROL_ERROR 'error' event.
// * A sender MUST NOT allow a flow control window to exceed 2^31 - 1 bytes. Growing past that
//   limit is reported with a FLOW_CONTROL_ERROR 'error' event as well; the action taken on it
//   depends on the flow being a stream or the connection itself.
// * Every other change is announced with a 'window_update' event.

var WINDOW_SIZE_LIMIT = 0x7fffffff; // 2^31 - 1

Flow.prototype._increaseWindow = function _increaseWindow(size) {
  var flowControlDisabled = (this._window === Infinity);
  if (flowControlDisabled && (size !== Infinity)) {
    this._log.error('Trying to increase flow control window after flow control was turned off.');
    this.emit('error', 'FLOW_CONTROL_ERROR');
    return;
  }

  this._log.trace({ window: this._window, by: size }, 'Increasing flow control window size.');
  this._window += size;

  // An infinite window (flow control turned off) is exempt from the size limit.
  var grewTooLarge = (this._window !== Infinity) && (this._window > WINDOW_SIZE_LIMIT);
  if (grewTooLarge) {
    this._log.error('Flow control window grew too large.');
    this.emit('error', 'FLOW_CONTROL_ERROR');
    return;
  }

  this.emit('window_update');
};
michael@0 326
// The `_updateWindow` method is called by `_write` for every incoming WINDOW_UPDATE frame that
// concerns this flow. It translates the frame into an `_increaseWindow` call:
//
// * A WINDOW_UPDATE with the END_FLOW_CONTROL flag set disables flow control: the window is
//   increased by `Infinity` and the `window_size` payload of the frame is ignored.
// * Otherwise the window is increased by the amount specified in the frame's `window_size`.
Flow.prototype._updateWindow = function _updateWindow(frame) {
  var increment;
  if (frame.flags.END_FLOW_CONTROL) {
    increment = Infinity;
  } else {
    increment = frame.window_size;
  }
  this._increaseWindow(increment);
};
michael@0 338
// A SETTINGS frame can alter the initial flow control window size for all current streams. When
// the value of SETTINGS_INITIAL_WINDOW_SIZE changes, the window of every stream has to be adjusted
// by calling `setInitialWindow`: the current window is modified (through `_increaseWindow`) by the
// difference between the new and the old initial value, and the new value is remembered for the
// next change.
Flow.prototype.setInitialWindow = function setInitialWindow(initialWindow) {
  var difference = initialWindow - this._initialWindow;
  this._increaseWindow(difference);
  this._initialWindow = initialWindow;
};

mercurial