michael@0: var assert = require('assert'); michael@0: michael@0: // The Flow class michael@0: // ============== michael@0: michael@0: // Flow is a [Duplex stream][1] subclass which implements HTTP/2 flow control. It is designed to be michael@0: // subclassed by [Connection](connection.html) and the `upstream` component of [Stream](stream.html). michael@0: // [1]: http://nodejs.org/api/stream.html#stream_class_stream_duplex michael@0: michael@0: var Duplex = require('stream').Duplex; michael@0: michael@0: exports.Flow = Flow; michael@0: michael@0: // Public API michael@0: // ---------- michael@0: michael@0: // * **Event: 'error' (type)**: signals an error michael@0: // michael@0: // * **setInitialWindow(size)**: the initial flow control window size can be changed *any time* michael@0: // ([as described in the standard][1]) using this method michael@0: // michael@0: // [1]: http://tools.ietf.org/html/draft-ietf-httpbis-http2-10#section-6.9.2 michael@0: michael@0: // API for child classes michael@0: // --------------------- michael@0: michael@0: // * **new Flow([flowControlId])**: creating a new flow that will listen for WINDOW_UPDATES frames michael@0: // with the given `flowControlId` (or every update frame if not given) michael@0: // michael@0: // * **_send()**: called when more frames should be pushed. The child class is expected to override michael@0: // this (instead of the `_read` method of the Duplex class). michael@0: // michael@0: // * **_receive(frame, readyCallback)**: called when there's an incoming frame. The child class is michael@0: // expected to override this (instead of the `_write` method of the Duplex class). michael@0: // michael@0: // * **push(frame): bool**: schedules `frame` for sending. michael@0: // michael@0: // Returns `true` if it needs more frames in the output queue, `false` if the output queue is michael@0: // full, and `null` if did not push the frame into the output queue (instead, it pushed it into michael@0: // the flow control queue). michael@0: // michael@0: // * **read(limit): frame**: like the regular `read`, but the 'flow control size' (0 for non-DATA michael@0: // frames, length of the payload for DATA frames) of the returned frame will be under `limit`. michael@0: // Small exception: pass -1 as `limit` if the max. flow control size is 0. `read(0)` means the michael@0: // same thing as [in the original API](http://nodejs.org/api/stream.html#stream_stream_read_0). michael@0: // michael@0: // * **getLastQueuedFrame(): frame**: returns the last frame in output buffers michael@0: // michael@0: // * **_log**: the Flow class uses the `_log` object of the parent michael@0: michael@0: // Constructor michael@0: // ----------- michael@0: michael@0: // When a HTTP/2.0 connection is first established, new streams are created with an initial flow michael@0: // control window size of 65535 bytes. michael@0: var INITIAL_WINDOW_SIZE = 65535; michael@0: michael@0: // `flowControlId` is needed if only specific WINDOW_UPDATEs should be watched. michael@0: function Flow(flowControlId) { michael@0: Duplex.call(this, { objectMode: true }); michael@0: michael@0: this._window = this._initialWindow = INITIAL_WINDOW_SIZE; michael@0: this._flowControlId = flowControlId; michael@0: this._queue = []; michael@0: this._ended = false; michael@0: this._received = 0; michael@0: } michael@0: Flow.prototype = Object.create(Duplex.prototype, { constructor: { value: Flow } }); michael@0: michael@0: // Incoming frames michael@0: // --------------- michael@0: michael@0: // `_receive` is called when there's an incoming frame. michael@0: Flow.prototype._receive = function _receive(frame, callback) { michael@0: throw new Error('The _receive(frame, callback) method has to be overridden by the child class!'); michael@0: }; michael@0: michael@0: // `_receive` is called by `_write` which in turn is [called by Duplex][1] when someone `write()`s michael@0: // to the flow. It emits the 'receiving' event and notifies the window size tracking code if the michael@0: // incoming frame is a WINDOW_UPDATE. michael@0: // [1]: http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1 michael@0: Flow.prototype._write = function _write(frame, encoding, callback) { michael@0: if (frame.flags.END_STREAM || (frame.type === 'RST_STREAM')) { michael@0: this._ended = true; michael@0: } michael@0: michael@0: if ((frame.type === 'DATA') && (frame.data.length > 0)) { michael@0: this._receive(frame, function() { michael@0: this._received += frame.data.length; michael@0: if (!this._restoreWindowTimer) { michael@0: this._restoreWindowTimer = setImmediate(this._restoreWindow.bind(this)); michael@0: } michael@0: callback(); michael@0: }.bind(this)); michael@0: } michael@0: michael@0: else { michael@0: this._receive(frame, callback); michael@0: } michael@0: michael@0: if ((frame.type === 'WINDOW_UPDATE') && michael@0: ((this._flowControlId === undefined) || (frame.stream === this._flowControlId))) { michael@0: this._updateWindow(frame); michael@0: } michael@0: }; michael@0: michael@0: // `_restoreWindow` basically acknowledges the DATA frames received since it's last call. It sends michael@0: // a WINDOW_UPDATE that restores the flow control window of the remote end. michael@0: Flow.prototype._restoreWindow = function _restoreWindow() { michael@0: delete this._restoreWindowTimer; michael@0: if (!this._ended && (this._received > 0)) { michael@0: this.push({ michael@0: type: 'WINDOW_UPDATE', michael@0: flags: {}, michael@0: stream: this._flowControlId, michael@0: window_size: this._received michael@0: }); michael@0: this._received = 0; michael@0: } michael@0: }; michael@0: michael@0: // Outgoing frames - sending procedure michael@0: // ----------------------------------- michael@0: michael@0: // flow michael@0: // +-------------------------------------------------+ michael@0: // | | michael@0: // +--------+ +---------+ | michael@0: // read() | output | _read() | flow | _send() | michael@0: // <----------| |<----------| control |<------------- | michael@0: // | buffer | | buffer | | michael@0: // +--------+ +---------+ | michael@0: // | input | | michael@0: // ---------->| |-----------------------------------> | michael@0: // write() | buffer | _write() _receive() | michael@0: // +--------+ | michael@0: // | | michael@0: // +-------------------------------------------------+ michael@0: michael@0: // `_send` is called when more frames should be pushed to the output buffer. michael@0: Flow.prototype._send = function _send() { michael@0: throw new Error('The _send() method has to be overridden by the child class!'); michael@0: }; michael@0: michael@0: // `_send` is called by `_read` which is in turn [called by Duplex][1] when it wants to have more michael@0: // items in the output queue. michael@0: // [1]: http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1 michael@0: Flow.prototype._read = function _read() { michael@0: // * if the flow control queue is empty, then let the user push more frames michael@0: if (this._queue.length === 0) { michael@0: this._send(); michael@0: } michael@0: michael@0: // * if there are items in the flow control queue, then let's put them into the output queue (to michael@0: // the extent it is possible with respect to the window size and output queue feedback) michael@0: else if (this._window > 0) { michael@0: this._readableState.sync = true; // to avoid reentrant calls michael@0: do { michael@0: var moreNeeded = this._push(this._queue[0]); michael@0: if (moreNeeded !== null) { michael@0: this._queue.shift(); michael@0: } michael@0: } while (moreNeeded && (this._queue.length > 0)); michael@0: this._readableState.sync = false; michael@0: michael@0: assert((moreNeeded == false) || // * output queue is full michael@0: (this._queue.length === 0) || // * flow control queue is empty michael@0: (!this._window && (this._queue[0].type === 'DATA'))); // * waiting for window update michael@0: } michael@0: michael@0: // * otherwise, come back when the flow control window is positive michael@0: else { michael@0: this.once('window_update', this._read); michael@0: } michael@0: }; michael@0: michael@0: var MAX_PAYLOAD_SIZE = 4096; // Must not be greater than MAX_HTTP_PAYLOAD_SIZE which is 16383 michael@0: michael@0: // `read(limit)` is like the `read` of the Readable class, but it guarantess that the 'flow control michael@0: // size' (0 for non-DATA frames, length of the payload for DATA frames) of the returned frame will michael@0: // be under `limit`. michael@0: Flow.prototype.read = function read(limit) { michael@0: if (limit === 0) { michael@0: return Duplex.prototype.read.call(this, 0); michael@0: } else if (limit === -1) { michael@0: limit = 0; michael@0: } else if ((limit === undefined) || (limit > MAX_PAYLOAD_SIZE)) { michael@0: limit = MAX_PAYLOAD_SIZE; michael@0: } michael@0: michael@0: // * Looking at the first frame in the queue without pulling it out if possible. This will save michael@0: // a costly unshift if the frame proves to be too large to return. michael@0: var firstInQueue = this._readableState.buffer[0]; michael@0: var frame = firstInQueue || Duplex.prototype.read.call(this); michael@0: michael@0: if ((frame === null) || (frame.type !== 'DATA') || (frame.data.length <= limit)) { michael@0: if (firstInQueue) { michael@0: Duplex.prototype.read.call(this); michael@0: } michael@0: return frame; michael@0: } michael@0: michael@0: else if (limit <= 0) { michael@0: if (!firstInQueue) { michael@0: this.unshift(frame); michael@0: } michael@0: return null; michael@0: } michael@0: michael@0: else { michael@0: this._log.trace({ frame: frame, size: frame.data.length, forwardable: limit }, michael@0: 'Splitting out forwardable part of a DATA frame.'); michael@0: var forwardable = { michael@0: type: 'DATA', michael@0: flags: {}, michael@0: stream: frame.stream, michael@0: data: frame.data.slice(0, limit) michael@0: }; michael@0: frame.data = frame.data.slice(limit); michael@0: michael@0: if (!firstInQueue) { michael@0: this.unshift(frame); michael@0: } michael@0: return forwardable; michael@0: } michael@0: }; michael@0: michael@0: // `_parentPush` pushes the given `frame` into the output queue michael@0: Flow.prototype._parentPush = function _parentPush(frame) { michael@0: this._log.trace({ frame: frame }, 'Pushing frame into the output queue'); michael@0: michael@0: if (frame && (frame.type === 'DATA') && (this._window !== Infinity)) { michael@0: this._log.trace({ window: this._window, by: frame.data.length }, michael@0: 'Decreasing flow control window size.'); michael@0: this._window -= frame.data.length; michael@0: assert(this._window >= 0); michael@0: } michael@0: michael@0: return Duplex.prototype.push.call(this, frame); michael@0: }; michael@0: michael@0: // `_push(frame)` pushes `frame` into the output queue and decreases the flow control window size. michael@0: // It is capable of splitting DATA frames into smaller parts, if the window size is not enough to michael@0: // push the whole frame. The return value is similar to `push` except that it returns `null` if it michael@0: // did not push the whole frame to the output queue (but maybe it did push part of the frame). michael@0: Flow.prototype._push = function _push(frame) { michael@0: var data = frame && (frame.type === 'DATA') && frame.data; michael@0: michael@0: if (!data || (data.length <= this._window)) { michael@0: return this._parentPush(frame); michael@0: } michael@0: michael@0: else if (this._window <= 0) { michael@0: return null; michael@0: } michael@0: michael@0: else { michael@0: this._log.trace({ frame: frame, size: frame.data.length, forwardable: this._window }, michael@0: 'Splitting out forwardable part of a DATA frame.'); michael@0: frame.data = data.slice(this._window); michael@0: this._parentPush({ michael@0: type: 'DATA', michael@0: flags: {}, michael@0: stream: frame.stream, michael@0: data: data.slice(0, this._window) michael@0: }); michael@0: return null; michael@0: } michael@0: }; michael@0: michael@0: // Push `frame` into the flow control queue, or if it's empty, then directly into the output queue michael@0: Flow.prototype.push = function push(frame) { michael@0: if (frame === null) { michael@0: this._log.debug('Enqueueing outgoing End Of Stream'); michael@0: } else { michael@0: this._log.debug({ frame: frame }, 'Enqueueing outgoing frame'); michael@0: } michael@0: michael@0: var moreNeeded = null; michael@0: if (this._queue.length === 0) { michael@0: moreNeeded = this._push(frame); michael@0: } michael@0: michael@0: if (moreNeeded === null) { michael@0: this._queue.push(frame); michael@0: } michael@0: michael@0: return moreNeeded; michael@0: }; michael@0: michael@0: // `getLastQueuedFrame` returns the last frame in output buffers. This is primarily used by the michael@0: // [Stream](stream.html) class to mark the last frame with END_STREAM flag. michael@0: Flow.prototype.getLastQueuedFrame = function getLastQueuedFrame() { michael@0: var readableQueue = this._readableState.buffer; michael@0: return this._queue[this._queue.length - 1] || readableQueue[readableQueue.length - 1]; michael@0: }; michael@0: michael@0: // Outgoing frames - managing the window size michael@0: // ------------------------------------------ michael@0: michael@0: // Flow control window size is manipulated using the `_increaseWindow` method. michael@0: // michael@0: // * Invoking it with `Infinite` means turning off flow control. Flow control cannot be enabled michael@0: // again once disabled. Any attempt to re-enable flow control MUST be rejected with a michael@0: // FLOW_CONTROL_ERROR error code. michael@0: // * A sender MUST NOT allow a flow control window to exceed 2^31 - 1 bytes. The action taken michael@0: // depends on it being a stream or the connection itself. michael@0: michael@0: var WINDOW_SIZE_LIMIT = Math.pow(2, 31) - 1; michael@0: michael@0: Flow.prototype._increaseWindow = function _increaseWindow(size) { michael@0: if ((this._window === Infinity) && (size !== Infinity)) { michael@0: this._log.error('Trying to increase flow control window after flow control was turned off.'); michael@0: this.emit('error', 'FLOW_CONTROL_ERROR'); michael@0: } else { michael@0: this._log.trace({ window: this._window, by: size }, 'Increasing flow control window size.'); michael@0: this._window += size; michael@0: if ((this._window !== Infinity) && (this._window > WINDOW_SIZE_LIMIT)) { michael@0: this._log.error('Flow control window grew too large.'); michael@0: this.emit('error', 'FLOW_CONTROL_ERROR'); michael@0: } else { michael@0: this.emit('window_update'); michael@0: } michael@0: } michael@0: }; michael@0: michael@0: // The `_updateWindow` method gets called every time there's an incoming WINDOW_UPDATE frame. It michael@0: // modifies the flow control window: michael@0: // michael@0: // * Flow control can be disabled for an individual stream by sending a WINDOW_UPDATE with the michael@0: // END_FLOW_CONTROL flag set. The payload of a WINDOW_UPDATE frame that has the END_FLOW_CONTROL michael@0: // flag set is ignored. michael@0: // * A sender that receives a WINDOW_UPDATE frame updates the corresponding window by the amount michael@0: // specified in the frame. michael@0: Flow.prototype._updateWindow = function _updateWindow(frame) { michael@0: this._increaseWindow(frame.flags.END_FLOW_CONTROL ? Infinity : frame.window_size); michael@0: }; michael@0: michael@0: // A SETTINGS frame can alter the initial flow control window size for all current streams. When the michael@0: // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream by michael@0: // calling the `setInitialWindow` method. The window size has to be modified by the difference michael@0: // between the new value and the old value. michael@0: Flow.prototype.setInitialWindow = function setInitialWindow(initialWindow) { michael@0: this._increaseWindow(initialWindow - this._initialWindow); michael@0: this._initialWindow = initialWindow; michael@0: };