1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/testing/xpcshell/node-http2/node_modules/http2-protocol/lib/flow.js Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,346 @@ 1.4 +var assert = require('assert'); 1.5 + 1.6 +// The Flow class 1.7 +// ============== 1.8 + 1.9 +// Flow is a [Duplex stream][1] subclass which implements HTTP/2 flow control. It is designed to be 1.10 +// subclassed by [Connection](connection.html) and the `upstream` component of [Stream](stream.html). 1.11 +// [1]: http://nodejs.org/api/stream.html#stream_class_stream_duplex 1.12 + 1.13 +var Duplex = require('stream').Duplex; 1.14 + 1.15 +exports.Flow = Flow; 1.16 + 1.17 +// Public API 1.18 +// ---------- 1.19 + 1.20 +// * **Event: 'error' (type)**: signals an error 1.21 +// 1.22 +// * **setInitialWindow(size)**: the initial flow control window size can be changed *any time* 1.23 +// ([as described in the standard][1]) using this method 1.24 +// 1.25 +// [1]: http://tools.ietf.org/html/draft-ietf-httpbis-http2-10#section-6.9.2 1.26 + 1.27 +// API for child classes 1.28 +// --------------------- 1.29 + 1.30 +// * **new Flow([flowControlId])**: creating a new flow that will listen for WINDOW_UPDATES frames 1.31 +// with the given `flowControlId` (or every update frame if not given) 1.32 +// 1.33 +// * **_send()**: called when more frames should be pushed. The child class is expected to override 1.34 +// this (instead of the `_read` method of the Duplex class). 1.35 +// 1.36 +// * **_receive(frame, readyCallback)**: called when there's an incoming frame. The child class is 1.37 +// expected to override this (instead of the `_write` method of the Duplex class). 1.38 +// 1.39 +// * **push(frame): bool**: schedules `frame` for sending. 1.40 +// 1.41 +// Returns `true` if it needs more frames in the output queue, `false` if the output queue is 1.42 +// full, and `null` if did not push the frame into the output queue (instead, it pushed it into 1.43 +// the flow control queue). 1.44 +// 1.45 +// * **read(limit): frame**: like the regular `read`, but the 'flow control size' (0 for non-DATA 1.46 +// frames, length of the payload for DATA frames) of the returned frame will be under `limit`. 1.47 +// Small exception: pass -1 as `limit` if the max. flow control size is 0. `read(0)` means the 1.48 +// same thing as [in the original API](http://nodejs.org/api/stream.html#stream_stream_read_0). 1.49 +// 1.50 +// * **getLastQueuedFrame(): frame**: returns the last frame in output buffers 1.51 +// 1.52 +// * **_log**: the Flow class uses the `_log` object of the parent 1.53 + 1.54 +// Constructor 1.55 +// ----------- 1.56 + 1.57 +// When a HTTP/2.0 connection is first established, new streams are created with an initial flow 1.58 +// control window size of 65535 bytes. 1.59 +var INITIAL_WINDOW_SIZE = 65535; 1.60 + 1.61 +// `flowControlId` is needed if only specific WINDOW_UPDATEs should be watched. 1.62 +function Flow(flowControlId) { 1.63 + Duplex.call(this, { objectMode: true }); 1.64 + 1.65 + this._window = this._initialWindow = INITIAL_WINDOW_SIZE; 1.66 + this._flowControlId = flowControlId; 1.67 + this._queue = []; 1.68 + this._ended = false; 1.69 + this._received = 0; 1.70 +} 1.71 +Flow.prototype = Object.create(Duplex.prototype, { constructor: { value: Flow } }); 1.72 + 1.73 +// Incoming frames 1.74 +// --------------- 1.75 + 1.76 +// `_receive` is called when there's an incoming frame. 1.77 +Flow.prototype._receive = function _receive(frame, callback) { 1.78 + throw new Error('The _receive(frame, callback) method has to be overridden by the child class!'); 1.79 +}; 1.80 + 1.81 +// `_receive` is called by `_write` which in turn is [called by Duplex][1] when someone `write()`s 1.82 +// to the flow. It emits the 'receiving' event and notifies the window size tracking code if the 1.83 +// incoming frame is a WINDOW_UPDATE. 1.84 +// [1]: http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1 1.85 +Flow.prototype._write = function _write(frame, encoding, callback) { 1.86 + if (frame.flags.END_STREAM || (frame.type === 'RST_STREAM')) { 1.87 + this._ended = true; 1.88 + } 1.89 + 1.90 + if ((frame.type === 'DATA') && (frame.data.length > 0)) { 1.91 + this._receive(frame, function() { 1.92 + this._received += frame.data.length; 1.93 + if (!this._restoreWindowTimer) { 1.94 + this._restoreWindowTimer = setImmediate(this._restoreWindow.bind(this)); 1.95 + } 1.96 + callback(); 1.97 + }.bind(this)); 1.98 + } 1.99 + 1.100 + else { 1.101 + this._receive(frame, callback); 1.102 + } 1.103 + 1.104 + if ((frame.type === 'WINDOW_UPDATE') && 1.105 + ((this._flowControlId === undefined) || (frame.stream === this._flowControlId))) { 1.106 + this._updateWindow(frame); 1.107 + } 1.108 +}; 1.109 + 1.110 +// `_restoreWindow` basically acknowledges the DATA frames received since it's last call. It sends 1.111 +// a WINDOW_UPDATE that restores the flow control window of the remote end. 1.112 +Flow.prototype._restoreWindow = function _restoreWindow() { 1.113 + delete this._restoreWindowTimer; 1.114 + if (!this._ended && (this._received > 0)) { 1.115 + this.push({ 1.116 + type: 'WINDOW_UPDATE', 1.117 + flags: {}, 1.118 + stream: this._flowControlId, 1.119 + window_size: this._received 1.120 + }); 1.121 + this._received = 0; 1.122 + } 1.123 +}; 1.124 + 1.125 +// Outgoing frames - sending procedure 1.126 +// ----------------------------------- 1.127 + 1.128 +// flow 1.129 +// +-------------------------------------------------+ 1.130 +// | | 1.131 +// +--------+ +---------+ | 1.132 +// read() | output | _read() | flow | _send() | 1.133 +// <----------| |<----------| control |<------------- | 1.134 +// | buffer | | buffer | | 1.135 +// +--------+ +---------+ | 1.136 +// | input | | 1.137 +// ---------->| |-----------------------------------> | 1.138 +// write() | buffer | _write() _receive() | 1.139 +// +--------+ | 1.140 +// | | 1.141 +// +-------------------------------------------------+ 1.142 + 1.143 +// `_send` is called when more frames should be pushed to the output buffer. 1.144 +Flow.prototype._send = function _send() { 1.145 + throw new Error('The _send() method has to be overridden by the child class!'); 1.146 +}; 1.147 + 1.148 +// `_send` is called by `_read` which is in turn [called by Duplex][1] when it wants to have more 1.149 +// items in the output queue. 1.150 +// [1]: http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1 1.151 +Flow.prototype._read = function _read() { 1.152 + // * if the flow control queue is empty, then let the user push more frames 1.153 + if (this._queue.length === 0) { 1.154 + this._send(); 1.155 + } 1.156 + 1.157 + // * if there are items in the flow control queue, then let's put them into the output queue (to 1.158 + // the extent it is possible with respect to the window size and output queue feedback) 1.159 + else if (this._window > 0) { 1.160 + this._readableState.sync = true; // to avoid reentrant calls 1.161 + do { 1.162 + var moreNeeded = this._push(this._queue[0]); 1.163 + if (moreNeeded !== null) { 1.164 + this._queue.shift(); 1.165 + } 1.166 + } while (moreNeeded && (this._queue.length > 0)); 1.167 + this._readableState.sync = false; 1.168 + 1.169 + assert((moreNeeded == false) || // * output queue is full 1.170 + (this._queue.length === 0) || // * flow control queue is empty 1.171 + (!this._window && (this._queue[0].type === 'DATA'))); // * waiting for window update 1.172 + } 1.173 + 1.174 + // * otherwise, come back when the flow control window is positive 1.175 + else { 1.176 + this.once('window_update', this._read); 1.177 + } 1.178 +}; 1.179 + 1.180 +var MAX_PAYLOAD_SIZE = 4096; // Must not be greater than MAX_HTTP_PAYLOAD_SIZE which is 16383 1.181 + 1.182 +// `read(limit)` is like the `read` of the Readable class, but it guarantess that the 'flow control 1.183 +// size' (0 for non-DATA frames, length of the payload for DATA frames) of the returned frame will 1.184 +// be under `limit`. 1.185 +Flow.prototype.read = function read(limit) { 1.186 + if (limit === 0) { 1.187 + return Duplex.prototype.read.call(this, 0); 1.188 + } else if (limit === -1) { 1.189 + limit = 0; 1.190 + } else if ((limit === undefined) || (limit > MAX_PAYLOAD_SIZE)) { 1.191 + limit = MAX_PAYLOAD_SIZE; 1.192 + } 1.193 + 1.194 + // * Looking at the first frame in the queue without pulling it out if possible. This will save 1.195 + // a costly unshift if the frame proves to be too large to return. 1.196 + var firstInQueue = this._readableState.buffer[0]; 1.197 + var frame = firstInQueue || Duplex.prototype.read.call(this); 1.198 + 1.199 + if ((frame === null) || (frame.type !== 'DATA') || (frame.data.length <= limit)) { 1.200 + if (firstInQueue) { 1.201 + Duplex.prototype.read.call(this); 1.202 + } 1.203 + return frame; 1.204 + } 1.205 + 1.206 + else if (limit <= 0) { 1.207 + if (!firstInQueue) { 1.208 + this.unshift(frame); 1.209 + } 1.210 + return null; 1.211 + } 1.212 + 1.213 + else { 1.214 + this._log.trace({ frame: frame, size: frame.data.length, forwardable: limit }, 1.215 + 'Splitting out forwardable part of a DATA frame.'); 1.216 + var forwardable = { 1.217 + type: 'DATA', 1.218 + flags: {}, 1.219 + stream: frame.stream, 1.220 + data: frame.data.slice(0, limit) 1.221 + }; 1.222 + frame.data = frame.data.slice(limit); 1.223 + 1.224 + if (!firstInQueue) { 1.225 + this.unshift(frame); 1.226 + } 1.227 + return forwardable; 1.228 + } 1.229 +}; 1.230 + 1.231 +// `_parentPush` pushes the given `frame` into the output queue 1.232 +Flow.prototype._parentPush = function _parentPush(frame) { 1.233 + this._log.trace({ frame: frame }, 'Pushing frame into the output queue'); 1.234 + 1.235 + if (frame && (frame.type === 'DATA') && (this._window !== Infinity)) { 1.236 + this._log.trace({ window: this._window, by: frame.data.length }, 1.237 + 'Decreasing flow control window size.'); 1.238 + this._window -= frame.data.length; 1.239 + assert(this._window >= 0); 1.240 + } 1.241 + 1.242 + return Duplex.prototype.push.call(this, frame); 1.243 +}; 1.244 + 1.245 +// `_push(frame)` pushes `frame` into the output queue and decreases the flow control window size. 1.246 +// It is capable of splitting DATA frames into smaller parts, if the window size is not enough to 1.247 +// push the whole frame. The return value is similar to `push` except that it returns `null` if it 1.248 +// did not push the whole frame to the output queue (but maybe it did push part of the frame). 1.249 +Flow.prototype._push = function _push(frame) { 1.250 + var data = frame && (frame.type === 'DATA') && frame.data; 1.251 + 1.252 + if (!data || (data.length <= this._window)) { 1.253 + return this._parentPush(frame); 1.254 + } 1.255 + 1.256 + else if (this._window <= 0) { 1.257 + return null; 1.258 + } 1.259 + 1.260 + else { 1.261 + this._log.trace({ frame: frame, size: frame.data.length, forwardable: this._window }, 1.262 + 'Splitting out forwardable part of a DATA frame.'); 1.263 + frame.data = data.slice(this._window); 1.264 + this._parentPush({ 1.265 + type: 'DATA', 1.266 + flags: {}, 1.267 + stream: frame.stream, 1.268 + data: data.slice(0, this._window) 1.269 + }); 1.270 + return null; 1.271 + } 1.272 +}; 1.273 + 1.274 +// Push `frame` into the flow control queue, or if it's empty, then directly into the output queue 1.275 +Flow.prototype.push = function push(frame) { 1.276 + if (frame === null) { 1.277 + this._log.debug('Enqueueing outgoing End Of Stream'); 1.278 + } else { 1.279 + this._log.debug({ frame: frame }, 'Enqueueing outgoing frame'); 1.280 + } 1.281 + 1.282 + var moreNeeded = null; 1.283 + if (this._queue.length === 0) { 1.284 + moreNeeded = this._push(frame); 1.285 + } 1.286 + 1.287 + if (moreNeeded === null) { 1.288 + this._queue.push(frame); 1.289 + } 1.290 + 1.291 + return moreNeeded; 1.292 +}; 1.293 + 1.294 +// `getLastQueuedFrame` returns the last frame in output buffers. This is primarily used by the 1.295 +// [Stream](stream.html) class to mark the last frame with END_STREAM flag. 1.296 +Flow.prototype.getLastQueuedFrame = function getLastQueuedFrame() { 1.297 + var readableQueue = this._readableState.buffer; 1.298 + return this._queue[this._queue.length - 1] || readableQueue[readableQueue.length - 1]; 1.299 +}; 1.300 + 1.301 +// Outgoing frames - managing the window size 1.302 +// ------------------------------------------ 1.303 + 1.304 +// Flow control window size is manipulated using the `_increaseWindow` method. 1.305 +// 1.306 +// * Invoking it with `Infinite` means turning off flow control. Flow control cannot be enabled 1.307 +// again once disabled. Any attempt to re-enable flow control MUST be rejected with a 1.308 +// FLOW_CONTROL_ERROR error code. 1.309 +// * A sender MUST NOT allow a flow control window to exceed 2^31 - 1 bytes. The action taken 1.310 +// depends on it being a stream or the connection itself. 1.311 + 1.312 +var WINDOW_SIZE_LIMIT = Math.pow(2, 31) - 1; 1.313 + 1.314 +Flow.prototype._increaseWindow = function _increaseWindow(size) { 1.315 + if ((this._window === Infinity) && (size !== Infinity)) { 1.316 + this._log.error('Trying to increase flow control window after flow control was turned off.'); 1.317 + this.emit('error', 'FLOW_CONTROL_ERROR'); 1.318 + } else { 1.319 + this._log.trace({ window: this._window, by: size }, 'Increasing flow control window size.'); 1.320 + this._window += size; 1.321 + if ((this._window !== Infinity) && (this._window > WINDOW_SIZE_LIMIT)) { 1.322 + this._log.error('Flow control window grew too large.'); 1.323 + this.emit('error', 'FLOW_CONTROL_ERROR'); 1.324 + } else { 1.325 + this.emit('window_update'); 1.326 + } 1.327 + } 1.328 +}; 1.329 + 1.330 +// The `_updateWindow` method gets called every time there's an incoming WINDOW_UPDATE frame. It 1.331 +// modifies the flow control window: 1.332 +// 1.333 +// * Flow control can be disabled for an individual stream by sending a WINDOW_UPDATE with the 1.334 +// END_FLOW_CONTROL flag set. The payload of a WINDOW_UPDATE frame that has the END_FLOW_CONTROL 1.335 +// flag set is ignored. 1.336 +// * A sender that receives a WINDOW_UPDATE frame updates the corresponding window by the amount 1.337 +// specified in the frame. 1.338 +Flow.prototype._updateWindow = function _updateWindow(frame) { 1.339 + this._increaseWindow(frame.flags.END_FLOW_CONTROL ? Infinity : frame.window_size); 1.340 +}; 1.341 + 1.342 +// A SETTINGS frame can alter the initial flow control window size for all current streams. When the 1.343 +// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream by 1.344 +// calling the `setInitialWindow` method. The window size has to be modified by the difference 1.345 +// between the new value and the old value. 1.346 +Flow.prototype.setInitialWindow = function setInitialWindow(initialWindow) { 1.347 + this._increaseWindow(initialWindow - this._initialWindow); 1.348 + this._initialWindow = initialWindow; 1.349 +};