Wed, 31 Dec 2014 06:09:35 +0100
Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.
michael@0 | 1 | var assert = require('assert'); |
michael@0 | 2 | |
michael@0 | 3 | // The Flow class |
michael@0 | 4 | // ============== |
michael@0 | 5 | |
michael@0 | 6 | // Flow is a [Duplex stream][1] subclass which implements HTTP/2 flow control. It is designed to be |
michael@0 | 7 | // subclassed by [Connection](connection.html) and the `upstream` component of [Stream](stream.html). |
michael@0 | 8 | // [1]: http://nodejs.org/api/stream.html#stream_class_stream_duplex |
michael@0 | 9 | |
michael@0 | 10 | var Duplex = require('stream').Duplex; |
michael@0 | 11 | |
michael@0 | 12 | exports.Flow = Flow; |
michael@0 | 13 | |
michael@0 | 14 | // Public API |
michael@0 | 15 | // ---------- |
michael@0 | 16 | |
michael@0 | 17 | // * **Event: 'error' (type)**: signals an error |
michael@0 | 18 | // |
michael@0 | 19 | // * **setInitialWindow(size)**: the initial flow control window size can be changed *any time* |
michael@0 | 20 | // ([as described in the standard][1]) using this method |
michael@0 | 21 | // |
michael@0 | 22 | // [1]: http://tools.ietf.org/html/draft-ietf-httpbis-http2-10#section-6.9.2 |
michael@0 | 23 | |
michael@0 | 24 | // API for child classes |
michael@0 | 25 | // --------------------- |
michael@0 | 26 | |
michael@0 | 27 | // * **new Flow([flowControlId])**: creating a new flow that will listen for WINDOW_UPDATES frames |
michael@0 | 28 | // with the given `flowControlId` (or every update frame if not given) |
michael@0 | 29 | // |
michael@0 | 30 | // * **_send()**: called when more frames should be pushed. The child class is expected to override |
michael@0 | 31 | // this (instead of the `_read` method of the Duplex class). |
michael@0 | 32 | // |
michael@0 | 33 | // * **_receive(frame, readyCallback)**: called when there's an incoming frame. The child class is |
michael@0 | 34 | // expected to override this (instead of the `_write` method of the Duplex class). |
michael@0 | 35 | // |
michael@0 | 36 | // * **push(frame): bool**: schedules `frame` for sending. |
michael@0 | 37 | // |
michael@0 | 38 | // Returns `true` if it needs more frames in the output queue, `false` if the output queue is |
michael@0 | 39 | // full, and `null` if did not push the frame into the output queue (instead, it pushed it into |
michael@0 | 40 | // the flow control queue). |
michael@0 | 41 | // |
michael@0 | 42 | // * **read(limit): frame**: like the regular `read`, but the 'flow control size' (0 for non-DATA |
michael@0 | 43 | // frames, length of the payload for DATA frames) of the returned frame will be under `limit`. |
michael@0 | 44 | // Small exception: pass -1 as `limit` if the max. flow control size is 0. `read(0)` means the |
michael@0 | 45 | // same thing as [in the original API](http://nodejs.org/api/stream.html#stream_stream_read_0). |
michael@0 | 46 | // |
michael@0 | 47 | // * **getLastQueuedFrame(): frame**: returns the last frame in output buffers |
michael@0 | 48 | // |
michael@0 | 49 | // * **_log**: the Flow class uses the `_log` object of the parent |
michael@0 | 50 | |
michael@0 | 51 | // Constructor |
michael@0 | 52 | // ----------- |
michael@0 | 53 | |
michael@0 | 54 | // When a HTTP/2.0 connection is first established, new streams are created with an initial flow |
michael@0 | 55 | // control window size of 65535 bytes. |
michael@0 | 56 | var INITIAL_WINDOW_SIZE = 65535; |
michael@0 | 57 | |
michael@0 | 58 | // `flowControlId` is needed if only specific WINDOW_UPDATEs should be watched. |
michael@0 | 59 | function Flow(flowControlId) { |
michael@0 | 60 | Duplex.call(this, { objectMode: true }); |
michael@0 | 61 | |
michael@0 | 62 | this._window = this._initialWindow = INITIAL_WINDOW_SIZE; |
michael@0 | 63 | this._flowControlId = flowControlId; |
michael@0 | 64 | this._queue = []; |
michael@0 | 65 | this._ended = false; |
michael@0 | 66 | this._received = 0; |
michael@0 | 67 | } |
michael@0 | 68 | Flow.prototype = Object.create(Duplex.prototype, { constructor: { value: Flow } }); |
michael@0 | 69 | |
michael@0 | 70 | // Incoming frames |
michael@0 | 71 | // --------------- |
michael@0 | 72 | |
michael@0 | 73 | // `_receive` is called when there's an incoming frame. |
michael@0 | 74 | Flow.prototype._receive = function _receive(frame, callback) { |
michael@0 | 75 | throw new Error('The _receive(frame, callback) method has to be overridden by the child class!'); |
michael@0 | 76 | }; |
michael@0 | 77 | |
michael@0 | 78 | // `_receive` is called by `_write` which in turn is [called by Duplex][1] when someone `write()`s |
michael@0 | 79 | // to the flow. It emits the 'receiving' event and notifies the window size tracking code if the |
michael@0 | 80 | // incoming frame is a WINDOW_UPDATE. |
michael@0 | 81 | // [1]: http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1 |
michael@0 | 82 | Flow.prototype._write = function _write(frame, encoding, callback) { |
michael@0 | 83 | if (frame.flags.END_STREAM || (frame.type === 'RST_STREAM')) { |
michael@0 | 84 | this._ended = true; |
michael@0 | 85 | } |
michael@0 | 86 | |
michael@0 | 87 | if ((frame.type === 'DATA') && (frame.data.length > 0)) { |
michael@0 | 88 | this._receive(frame, function() { |
michael@0 | 89 | this._received += frame.data.length; |
michael@0 | 90 | if (!this._restoreWindowTimer) { |
michael@0 | 91 | this._restoreWindowTimer = setImmediate(this._restoreWindow.bind(this)); |
michael@0 | 92 | } |
michael@0 | 93 | callback(); |
michael@0 | 94 | }.bind(this)); |
michael@0 | 95 | } |
michael@0 | 96 | |
michael@0 | 97 | else { |
michael@0 | 98 | this._receive(frame, callback); |
michael@0 | 99 | } |
michael@0 | 100 | |
michael@0 | 101 | if ((frame.type === 'WINDOW_UPDATE') && |
michael@0 | 102 | ((this._flowControlId === undefined) || (frame.stream === this._flowControlId))) { |
michael@0 | 103 | this._updateWindow(frame); |
michael@0 | 104 | } |
michael@0 | 105 | }; |
michael@0 | 106 | |
michael@0 | 107 | // `_restoreWindow` basically acknowledges the DATA frames received since it's last call. It sends |
michael@0 | 108 | // a WINDOW_UPDATE that restores the flow control window of the remote end. |
michael@0 | 109 | Flow.prototype._restoreWindow = function _restoreWindow() { |
michael@0 | 110 | delete this._restoreWindowTimer; |
michael@0 | 111 | if (!this._ended && (this._received > 0)) { |
michael@0 | 112 | this.push({ |
michael@0 | 113 | type: 'WINDOW_UPDATE', |
michael@0 | 114 | flags: {}, |
michael@0 | 115 | stream: this._flowControlId, |
michael@0 | 116 | window_size: this._received |
michael@0 | 117 | }); |
michael@0 | 118 | this._received = 0; |
michael@0 | 119 | } |
michael@0 | 120 | }; |
michael@0 | 121 | |
michael@0 | 122 | // Outgoing frames - sending procedure |
michael@0 | 123 | // ----------------------------------- |
michael@0 | 124 | |
michael@0 | 125 | // flow |
michael@0 | 126 | // +-------------------------------------------------+ |
michael@0 | 127 | // | | |
michael@0 | 128 | // +--------+ +---------+ | |
michael@0 | 129 | // read() | output | _read() | flow | _send() | |
michael@0 | 130 | // <----------| |<----------| control |<------------- | |
michael@0 | 131 | // | buffer | | buffer | | |
michael@0 | 132 | // +--------+ +---------+ | |
michael@0 | 133 | // | input | | |
michael@0 | 134 | // ---------->| |-----------------------------------> | |
michael@0 | 135 | // write() | buffer | _write() _receive() | |
michael@0 | 136 | // +--------+ | |
michael@0 | 137 | // | | |
michael@0 | 138 | // +-------------------------------------------------+ |
michael@0 | 139 | |
michael@0 | 140 | // `_send` is called when more frames should be pushed to the output buffer. |
michael@0 | 141 | Flow.prototype._send = function _send() { |
michael@0 | 142 | throw new Error('The _send() method has to be overridden by the child class!'); |
michael@0 | 143 | }; |
michael@0 | 144 | |
michael@0 | 145 | // `_send` is called by `_read` which is in turn [called by Duplex][1] when it wants to have more |
michael@0 | 146 | // items in the output queue. |
michael@0 | 147 | // [1]: http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1 |
michael@0 | 148 | Flow.prototype._read = function _read() { |
michael@0 | 149 | // * if the flow control queue is empty, then let the user push more frames |
michael@0 | 150 | if (this._queue.length === 0) { |
michael@0 | 151 | this._send(); |
michael@0 | 152 | } |
michael@0 | 153 | |
michael@0 | 154 | // * if there are items in the flow control queue, then let's put them into the output queue (to |
michael@0 | 155 | // the extent it is possible with respect to the window size and output queue feedback) |
michael@0 | 156 | else if (this._window > 0) { |
michael@0 | 157 | this._readableState.sync = true; // to avoid reentrant calls |
michael@0 | 158 | do { |
michael@0 | 159 | var moreNeeded = this._push(this._queue[0]); |
michael@0 | 160 | if (moreNeeded !== null) { |
michael@0 | 161 | this._queue.shift(); |
michael@0 | 162 | } |
michael@0 | 163 | } while (moreNeeded && (this._queue.length > 0)); |
michael@0 | 164 | this._readableState.sync = false; |
michael@0 | 165 | |
michael@0 | 166 | assert((moreNeeded == false) || // * output queue is full |
michael@0 | 167 | (this._queue.length === 0) || // * flow control queue is empty |
michael@0 | 168 | (!this._window && (this._queue[0].type === 'DATA'))); // * waiting for window update |
michael@0 | 169 | } |
michael@0 | 170 | |
michael@0 | 171 | // * otherwise, come back when the flow control window is positive |
michael@0 | 172 | else { |
michael@0 | 173 | this.once('window_update', this._read); |
michael@0 | 174 | } |
michael@0 | 175 | }; |
michael@0 | 176 | |
michael@0 | 177 | var MAX_PAYLOAD_SIZE = 4096; // Must not be greater than MAX_HTTP_PAYLOAD_SIZE which is 16383 |
michael@0 | 178 | |
michael@0 | 179 | // `read(limit)` is like the `read` of the Readable class, but it guarantess that the 'flow control |
michael@0 | 180 | // size' (0 for non-DATA frames, length of the payload for DATA frames) of the returned frame will |
michael@0 | 181 | // be under `limit`. |
michael@0 | 182 | Flow.prototype.read = function read(limit) { |
michael@0 | 183 | if (limit === 0) { |
michael@0 | 184 | return Duplex.prototype.read.call(this, 0); |
michael@0 | 185 | } else if (limit === -1) { |
michael@0 | 186 | limit = 0; |
michael@0 | 187 | } else if ((limit === undefined) || (limit > MAX_PAYLOAD_SIZE)) { |
michael@0 | 188 | limit = MAX_PAYLOAD_SIZE; |
michael@0 | 189 | } |
michael@0 | 190 | |
michael@0 | 191 | // * Looking at the first frame in the queue without pulling it out if possible. This will save |
michael@0 | 192 | // a costly unshift if the frame proves to be too large to return. |
michael@0 | 193 | var firstInQueue = this._readableState.buffer[0]; |
michael@0 | 194 | var frame = firstInQueue || Duplex.prototype.read.call(this); |
michael@0 | 195 | |
michael@0 | 196 | if ((frame === null) || (frame.type !== 'DATA') || (frame.data.length <= limit)) { |
michael@0 | 197 | if (firstInQueue) { |
michael@0 | 198 | Duplex.prototype.read.call(this); |
michael@0 | 199 | } |
michael@0 | 200 | return frame; |
michael@0 | 201 | } |
michael@0 | 202 | |
michael@0 | 203 | else if (limit <= 0) { |
michael@0 | 204 | if (!firstInQueue) { |
michael@0 | 205 | this.unshift(frame); |
michael@0 | 206 | } |
michael@0 | 207 | return null; |
michael@0 | 208 | } |
michael@0 | 209 | |
michael@0 | 210 | else { |
michael@0 | 211 | this._log.trace({ frame: frame, size: frame.data.length, forwardable: limit }, |
michael@0 | 212 | 'Splitting out forwardable part of a DATA frame.'); |
michael@0 | 213 | var forwardable = { |
michael@0 | 214 | type: 'DATA', |
michael@0 | 215 | flags: {}, |
michael@0 | 216 | stream: frame.stream, |
michael@0 | 217 | data: frame.data.slice(0, limit) |
michael@0 | 218 | }; |
michael@0 | 219 | frame.data = frame.data.slice(limit); |
michael@0 | 220 | |
michael@0 | 221 | if (!firstInQueue) { |
michael@0 | 222 | this.unshift(frame); |
michael@0 | 223 | } |
michael@0 | 224 | return forwardable; |
michael@0 | 225 | } |
michael@0 | 226 | }; |
michael@0 | 227 | |
michael@0 | 228 | // `_parentPush` pushes the given `frame` into the output queue |
michael@0 | 229 | Flow.prototype._parentPush = function _parentPush(frame) { |
michael@0 | 230 | this._log.trace({ frame: frame }, 'Pushing frame into the output queue'); |
michael@0 | 231 | |
michael@0 | 232 | if (frame && (frame.type === 'DATA') && (this._window !== Infinity)) { |
michael@0 | 233 | this._log.trace({ window: this._window, by: frame.data.length }, |
michael@0 | 234 | 'Decreasing flow control window size.'); |
michael@0 | 235 | this._window -= frame.data.length; |
michael@0 | 236 | assert(this._window >= 0); |
michael@0 | 237 | } |
michael@0 | 238 | |
michael@0 | 239 | return Duplex.prototype.push.call(this, frame); |
michael@0 | 240 | }; |
michael@0 | 241 | |
michael@0 | 242 | // `_push(frame)` pushes `frame` into the output queue and decreases the flow control window size. |
michael@0 | 243 | // It is capable of splitting DATA frames into smaller parts, if the window size is not enough to |
michael@0 | 244 | // push the whole frame. The return value is similar to `push` except that it returns `null` if it |
michael@0 | 245 | // did not push the whole frame to the output queue (but maybe it did push part of the frame). |
michael@0 | 246 | Flow.prototype._push = function _push(frame) { |
michael@0 | 247 | var data = frame && (frame.type === 'DATA') && frame.data; |
michael@0 | 248 | |
michael@0 | 249 | if (!data || (data.length <= this._window)) { |
michael@0 | 250 | return this._parentPush(frame); |
michael@0 | 251 | } |
michael@0 | 252 | |
michael@0 | 253 | else if (this._window <= 0) { |
michael@0 | 254 | return null; |
michael@0 | 255 | } |
michael@0 | 256 | |
michael@0 | 257 | else { |
michael@0 | 258 | this._log.trace({ frame: frame, size: frame.data.length, forwardable: this._window }, |
michael@0 | 259 | 'Splitting out forwardable part of a DATA frame.'); |
michael@0 | 260 | frame.data = data.slice(this._window); |
michael@0 | 261 | this._parentPush({ |
michael@0 | 262 | type: 'DATA', |
michael@0 | 263 | flags: {}, |
michael@0 | 264 | stream: frame.stream, |
michael@0 | 265 | data: data.slice(0, this._window) |
michael@0 | 266 | }); |
michael@0 | 267 | return null; |
michael@0 | 268 | } |
michael@0 | 269 | }; |
michael@0 | 270 | |
michael@0 | 271 | // Push `frame` into the flow control queue, or if it's empty, then directly into the output queue |
michael@0 | 272 | Flow.prototype.push = function push(frame) { |
michael@0 | 273 | if (frame === null) { |
michael@0 | 274 | this._log.debug('Enqueueing outgoing End Of Stream'); |
michael@0 | 275 | } else { |
michael@0 | 276 | this._log.debug({ frame: frame }, 'Enqueueing outgoing frame'); |
michael@0 | 277 | } |
michael@0 | 278 | |
michael@0 | 279 | var moreNeeded = null; |
michael@0 | 280 | if (this._queue.length === 0) { |
michael@0 | 281 | moreNeeded = this._push(frame); |
michael@0 | 282 | } |
michael@0 | 283 | |
michael@0 | 284 | if (moreNeeded === null) { |
michael@0 | 285 | this._queue.push(frame); |
michael@0 | 286 | } |
michael@0 | 287 | |
michael@0 | 288 | return moreNeeded; |
michael@0 | 289 | }; |
michael@0 | 290 | |
michael@0 | 291 | // `getLastQueuedFrame` returns the last frame in output buffers. This is primarily used by the |
michael@0 | 292 | // [Stream](stream.html) class to mark the last frame with END_STREAM flag. |
michael@0 | 293 | Flow.prototype.getLastQueuedFrame = function getLastQueuedFrame() { |
michael@0 | 294 | var readableQueue = this._readableState.buffer; |
michael@0 | 295 | return this._queue[this._queue.length - 1] || readableQueue[readableQueue.length - 1]; |
michael@0 | 296 | }; |
michael@0 | 297 | |
michael@0 | 298 | // Outgoing frames - managing the window size |
michael@0 | 299 | // ------------------------------------------ |
michael@0 | 300 | |
michael@0 | 301 | // Flow control window size is manipulated using the `_increaseWindow` method. |
michael@0 | 302 | // |
michael@0 | 303 | // * Invoking it with `Infinite` means turning off flow control. Flow control cannot be enabled |
michael@0 | 304 | // again once disabled. Any attempt to re-enable flow control MUST be rejected with a |
michael@0 | 305 | // FLOW_CONTROL_ERROR error code. |
michael@0 | 306 | // * A sender MUST NOT allow a flow control window to exceed 2^31 - 1 bytes. The action taken |
michael@0 | 307 | // depends on it being a stream or the connection itself. |
michael@0 | 308 | |
michael@0 | 309 | var WINDOW_SIZE_LIMIT = Math.pow(2, 31) - 1; |
michael@0 | 310 | |
michael@0 | 311 | Flow.prototype._increaseWindow = function _increaseWindow(size) { |
michael@0 | 312 | if ((this._window === Infinity) && (size !== Infinity)) { |
michael@0 | 313 | this._log.error('Trying to increase flow control window after flow control was turned off.'); |
michael@0 | 314 | this.emit('error', 'FLOW_CONTROL_ERROR'); |
michael@0 | 315 | } else { |
michael@0 | 316 | this._log.trace({ window: this._window, by: size }, 'Increasing flow control window size.'); |
michael@0 | 317 | this._window += size; |
michael@0 | 318 | if ((this._window !== Infinity) && (this._window > WINDOW_SIZE_LIMIT)) { |
michael@0 | 319 | this._log.error('Flow control window grew too large.'); |
michael@0 | 320 | this.emit('error', 'FLOW_CONTROL_ERROR'); |
michael@0 | 321 | } else { |
michael@0 | 322 | this.emit('window_update'); |
michael@0 | 323 | } |
michael@0 | 324 | } |
michael@0 | 325 | }; |
michael@0 | 326 | |
michael@0 | 327 | // The `_updateWindow` method gets called every time there's an incoming WINDOW_UPDATE frame. It |
michael@0 | 328 | // modifies the flow control window: |
michael@0 | 329 | // |
michael@0 | 330 | // * Flow control can be disabled for an individual stream by sending a WINDOW_UPDATE with the |
michael@0 | 331 | // END_FLOW_CONTROL flag set. The payload of a WINDOW_UPDATE frame that has the END_FLOW_CONTROL |
michael@0 | 332 | // flag set is ignored. |
michael@0 | 333 | // * A sender that receives a WINDOW_UPDATE frame updates the corresponding window by the amount |
michael@0 | 334 | // specified in the frame. |
michael@0 | 335 | Flow.prototype._updateWindow = function _updateWindow(frame) { |
michael@0 | 336 | this._increaseWindow(frame.flags.END_FLOW_CONTROL ? Infinity : frame.window_size); |
michael@0 | 337 | }; |
michael@0 | 338 | |
michael@0 | 339 | // A SETTINGS frame can alter the initial flow control window size for all current streams. When the |
michael@0 | 340 | // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream by |
michael@0 | 341 | // calling the `setInitialWindow` method. The window size has to be modified by the difference |
michael@0 | 342 | // between the new value and the old value. |
michael@0 | 343 | Flow.prototype.setInitialWindow = function setInitialWindow(initialWindow) { |
michael@0 | 344 | this._increaseWindow(initialWindow - this._initialWindow); |
michael@0 | 345 | this._initialWindow = initialWindow; |
michael@0 | 346 | }; |