|
1 var assert = require('assert'); |
|
2 |
|
3 // The Flow class |
|
4 // ============== |
|
5 |
|
6 // Flow is a [Duplex stream][1] subclass which implements HTTP/2 flow control. It is designed to be |
|
7 // subclassed by [Connection](connection.html) and the `upstream` component of [Stream](stream.html). |
|
8 // [1]: http://nodejs.org/api/stream.html#stream_class_stream_duplex |
|
9 |
|
10 var Duplex = require('stream').Duplex; |
|
11 |
|
12 exports.Flow = Flow; |
|
13 |
|
14 // Public API |
|
15 // ---------- |
|
16 |
|
17 // * **Event: 'error' (type)**: signals an error |
|
18 // |
|
19 // * **setInitialWindow(size)**: the initial flow control window size can be changed *any time* |
|
20 // ([as described in the standard][1]) using this method |
|
21 // |
|
22 // [1]: http://tools.ietf.org/html/draft-ietf-httpbis-http2-10#section-6.9.2 |
|
23 |
|
24 // API for child classes |
|
25 // --------------------- |
|
26 |
|
27 // * **new Flow([flowControlId])**: creating a new flow that will listen for WINDOW_UPDATES frames |
|
28 // with the given `flowControlId` (or every update frame if not given) |
|
29 // |
|
30 // * **_send()**: called when more frames should be pushed. The child class is expected to override |
|
31 // this (instead of the `_read` method of the Duplex class). |
|
32 // |
|
33 // * **_receive(frame, readyCallback)**: called when there's an incoming frame. The child class is |
|
34 // expected to override this (instead of the `_write` method of the Duplex class). |
|
35 // |
|
36 // * **push(frame): bool**: schedules `frame` for sending. |
|
37 // |
|
38 // Returns `true` if it needs more frames in the output queue, `false` if the output queue is |
|
39 // full, and `null` if did not push the frame into the output queue (instead, it pushed it into |
|
40 // the flow control queue). |
|
41 // |
|
42 // * **read(limit): frame**: like the regular `read`, but the 'flow control size' (0 for non-DATA |
|
43 // frames, length of the payload for DATA frames) of the returned frame will be under `limit`. |
|
44 // Small exception: pass -1 as `limit` if the max. flow control size is 0. `read(0)` means the |
|
45 // same thing as [in the original API](http://nodejs.org/api/stream.html#stream_stream_read_0). |
|
46 // |
|
47 // * **getLastQueuedFrame(): frame**: returns the last frame in output buffers |
|
48 // |
|
49 // * **_log**: the Flow class uses the `_log` object of the parent |
|
50 |
|
51 // Constructor |
|
52 // ----------- |
|
53 |
|
54 // When a HTTP/2.0 connection is first established, new streams are created with an initial flow |
|
55 // control window size of 65535 bytes. |
|
56 var INITIAL_WINDOW_SIZE = 65535; |
|
57 |
|
58 // `flowControlId` is needed if only specific WINDOW_UPDATEs should be watched. |
|
59 function Flow(flowControlId) { |
|
60 Duplex.call(this, { objectMode: true }); |
|
61 |
|
62 this._window = this._initialWindow = INITIAL_WINDOW_SIZE; |
|
63 this._flowControlId = flowControlId; |
|
64 this._queue = []; |
|
65 this._ended = false; |
|
66 this._received = 0; |
|
67 } |
|
68 Flow.prototype = Object.create(Duplex.prototype, { constructor: { value: Flow } }); |
|
69 |
|
70 // Incoming frames |
|
71 // --------------- |
|
72 |
|
73 // `_receive` is called when there's an incoming frame. |
|
74 Flow.prototype._receive = function _receive(frame, callback) { |
|
75 throw new Error('The _receive(frame, callback) method has to be overridden by the child class!'); |
|
76 }; |
|
77 |
|
78 // `_receive` is called by `_write` which in turn is [called by Duplex][1] when someone `write()`s |
|
79 // to the flow. It emits the 'receiving' event and notifies the window size tracking code if the |
|
80 // incoming frame is a WINDOW_UPDATE. |
|
81 // [1]: http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1 |
|
82 Flow.prototype._write = function _write(frame, encoding, callback) { |
|
83 if (frame.flags.END_STREAM || (frame.type === 'RST_STREAM')) { |
|
84 this._ended = true; |
|
85 } |
|
86 |
|
87 if ((frame.type === 'DATA') && (frame.data.length > 0)) { |
|
88 this._receive(frame, function() { |
|
89 this._received += frame.data.length; |
|
90 if (!this._restoreWindowTimer) { |
|
91 this._restoreWindowTimer = setImmediate(this._restoreWindow.bind(this)); |
|
92 } |
|
93 callback(); |
|
94 }.bind(this)); |
|
95 } |
|
96 |
|
97 else { |
|
98 this._receive(frame, callback); |
|
99 } |
|
100 |
|
101 if ((frame.type === 'WINDOW_UPDATE') && |
|
102 ((this._flowControlId === undefined) || (frame.stream === this._flowControlId))) { |
|
103 this._updateWindow(frame); |
|
104 } |
|
105 }; |
|
106 |
|
107 // `_restoreWindow` basically acknowledges the DATA frames received since it's last call. It sends |
|
108 // a WINDOW_UPDATE that restores the flow control window of the remote end. |
|
109 Flow.prototype._restoreWindow = function _restoreWindow() { |
|
110 delete this._restoreWindowTimer; |
|
111 if (!this._ended && (this._received > 0)) { |
|
112 this.push({ |
|
113 type: 'WINDOW_UPDATE', |
|
114 flags: {}, |
|
115 stream: this._flowControlId, |
|
116 window_size: this._received |
|
117 }); |
|
118 this._received = 0; |
|
119 } |
|
120 }; |
|
121 |
|
122 // Outgoing frames - sending procedure |
|
123 // ----------------------------------- |
|
124 |
|
125 // flow |
|
126 // +-------------------------------------------------+ |
|
127 // | | |
|
128 // +--------+ +---------+ | |
|
129 // read() | output | _read() | flow | _send() | |
|
130 // <----------| |<----------| control |<------------- | |
|
131 // | buffer | | buffer | | |
|
132 // +--------+ +---------+ | |
|
133 // | input | | |
|
134 // ---------->| |-----------------------------------> | |
|
135 // write() | buffer | _write() _receive() | |
|
136 // +--------+ | |
|
137 // | | |
|
138 // +-------------------------------------------------+ |
|
139 |
|
140 // `_send` is called when more frames should be pushed to the output buffer. |
|
141 Flow.prototype._send = function _send() { |
|
142 throw new Error('The _send() method has to be overridden by the child class!'); |
|
143 }; |
|
144 |
|
145 // `_send` is called by `_read` which is in turn [called by Duplex][1] when it wants to have more |
|
146 // items in the output queue. |
|
147 // [1]: http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1 |
|
148 Flow.prototype._read = function _read() { |
|
149 // * if the flow control queue is empty, then let the user push more frames |
|
150 if (this._queue.length === 0) { |
|
151 this._send(); |
|
152 } |
|
153 |
|
154 // * if there are items in the flow control queue, then let's put them into the output queue (to |
|
155 // the extent it is possible with respect to the window size and output queue feedback) |
|
156 else if (this._window > 0) { |
|
157 this._readableState.sync = true; // to avoid reentrant calls |
|
158 do { |
|
159 var moreNeeded = this._push(this._queue[0]); |
|
160 if (moreNeeded !== null) { |
|
161 this._queue.shift(); |
|
162 } |
|
163 } while (moreNeeded && (this._queue.length > 0)); |
|
164 this._readableState.sync = false; |
|
165 |
|
166 assert((moreNeeded == false) || // * output queue is full |
|
167 (this._queue.length === 0) || // * flow control queue is empty |
|
168 (!this._window && (this._queue[0].type === 'DATA'))); // * waiting for window update |
|
169 } |
|
170 |
|
171 // * otherwise, come back when the flow control window is positive |
|
172 else { |
|
173 this.once('window_update', this._read); |
|
174 } |
|
175 }; |
|
176 |
|
177 var MAX_PAYLOAD_SIZE = 4096; // Must not be greater than MAX_HTTP_PAYLOAD_SIZE which is 16383 |
|
178 |
|
179 // `read(limit)` is like the `read` of the Readable class, but it guarantess that the 'flow control |
|
180 // size' (0 for non-DATA frames, length of the payload for DATA frames) of the returned frame will |
|
181 // be under `limit`. |
|
182 Flow.prototype.read = function read(limit) { |
|
183 if (limit === 0) { |
|
184 return Duplex.prototype.read.call(this, 0); |
|
185 } else if (limit === -1) { |
|
186 limit = 0; |
|
187 } else if ((limit === undefined) || (limit > MAX_PAYLOAD_SIZE)) { |
|
188 limit = MAX_PAYLOAD_SIZE; |
|
189 } |
|
190 |
|
191 // * Looking at the first frame in the queue without pulling it out if possible. This will save |
|
192 // a costly unshift if the frame proves to be too large to return. |
|
193 var firstInQueue = this._readableState.buffer[0]; |
|
194 var frame = firstInQueue || Duplex.prototype.read.call(this); |
|
195 |
|
196 if ((frame === null) || (frame.type !== 'DATA') || (frame.data.length <= limit)) { |
|
197 if (firstInQueue) { |
|
198 Duplex.prototype.read.call(this); |
|
199 } |
|
200 return frame; |
|
201 } |
|
202 |
|
203 else if (limit <= 0) { |
|
204 if (!firstInQueue) { |
|
205 this.unshift(frame); |
|
206 } |
|
207 return null; |
|
208 } |
|
209 |
|
210 else { |
|
211 this._log.trace({ frame: frame, size: frame.data.length, forwardable: limit }, |
|
212 'Splitting out forwardable part of a DATA frame.'); |
|
213 var forwardable = { |
|
214 type: 'DATA', |
|
215 flags: {}, |
|
216 stream: frame.stream, |
|
217 data: frame.data.slice(0, limit) |
|
218 }; |
|
219 frame.data = frame.data.slice(limit); |
|
220 |
|
221 if (!firstInQueue) { |
|
222 this.unshift(frame); |
|
223 } |
|
224 return forwardable; |
|
225 } |
|
226 }; |
|
227 |
|
228 // `_parentPush` pushes the given `frame` into the output queue |
|
229 Flow.prototype._parentPush = function _parentPush(frame) { |
|
230 this._log.trace({ frame: frame }, 'Pushing frame into the output queue'); |
|
231 |
|
232 if (frame && (frame.type === 'DATA') && (this._window !== Infinity)) { |
|
233 this._log.trace({ window: this._window, by: frame.data.length }, |
|
234 'Decreasing flow control window size.'); |
|
235 this._window -= frame.data.length; |
|
236 assert(this._window >= 0); |
|
237 } |
|
238 |
|
239 return Duplex.prototype.push.call(this, frame); |
|
240 }; |
|
241 |
|
242 // `_push(frame)` pushes `frame` into the output queue and decreases the flow control window size. |
|
243 // It is capable of splitting DATA frames into smaller parts, if the window size is not enough to |
|
244 // push the whole frame. The return value is similar to `push` except that it returns `null` if it |
|
245 // did not push the whole frame to the output queue (but maybe it did push part of the frame). |
|
246 Flow.prototype._push = function _push(frame) { |
|
247 var data = frame && (frame.type === 'DATA') && frame.data; |
|
248 |
|
249 if (!data || (data.length <= this._window)) { |
|
250 return this._parentPush(frame); |
|
251 } |
|
252 |
|
253 else if (this._window <= 0) { |
|
254 return null; |
|
255 } |
|
256 |
|
257 else { |
|
258 this._log.trace({ frame: frame, size: frame.data.length, forwardable: this._window }, |
|
259 'Splitting out forwardable part of a DATA frame.'); |
|
260 frame.data = data.slice(this._window); |
|
261 this._parentPush({ |
|
262 type: 'DATA', |
|
263 flags: {}, |
|
264 stream: frame.stream, |
|
265 data: data.slice(0, this._window) |
|
266 }); |
|
267 return null; |
|
268 } |
|
269 }; |
|
270 |
|
271 // Push `frame` into the flow control queue, or if it's empty, then directly into the output queue |
|
272 Flow.prototype.push = function push(frame) { |
|
273 if (frame === null) { |
|
274 this._log.debug('Enqueueing outgoing End Of Stream'); |
|
275 } else { |
|
276 this._log.debug({ frame: frame }, 'Enqueueing outgoing frame'); |
|
277 } |
|
278 |
|
279 var moreNeeded = null; |
|
280 if (this._queue.length === 0) { |
|
281 moreNeeded = this._push(frame); |
|
282 } |
|
283 |
|
284 if (moreNeeded === null) { |
|
285 this._queue.push(frame); |
|
286 } |
|
287 |
|
288 return moreNeeded; |
|
289 }; |
|
290 |
|
291 // `getLastQueuedFrame` returns the last frame in output buffers. This is primarily used by the |
|
292 // [Stream](stream.html) class to mark the last frame with END_STREAM flag. |
|
293 Flow.prototype.getLastQueuedFrame = function getLastQueuedFrame() { |
|
294 var readableQueue = this._readableState.buffer; |
|
295 return this._queue[this._queue.length - 1] || readableQueue[readableQueue.length - 1]; |
|
296 }; |
|
297 |
|
298 // Outgoing frames - managing the window size |
|
299 // ------------------------------------------ |
|
300 |
|
301 // Flow control window size is manipulated using the `_increaseWindow` method. |
|
302 // |
|
303 // * Invoking it with `Infinite` means turning off flow control. Flow control cannot be enabled |
|
304 // again once disabled. Any attempt to re-enable flow control MUST be rejected with a |
|
305 // FLOW_CONTROL_ERROR error code. |
|
306 // * A sender MUST NOT allow a flow control window to exceed 2^31 - 1 bytes. The action taken |
|
307 // depends on it being a stream or the connection itself. |
|
308 |
|
309 var WINDOW_SIZE_LIMIT = Math.pow(2, 31) - 1; |
|
310 |
|
311 Flow.prototype._increaseWindow = function _increaseWindow(size) { |
|
312 if ((this._window === Infinity) && (size !== Infinity)) { |
|
313 this._log.error('Trying to increase flow control window after flow control was turned off.'); |
|
314 this.emit('error', 'FLOW_CONTROL_ERROR'); |
|
315 } else { |
|
316 this._log.trace({ window: this._window, by: size }, 'Increasing flow control window size.'); |
|
317 this._window += size; |
|
318 if ((this._window !== Infinity) && (this._window > WINDOW_SIZE_LIMIT)) { |
|
319 this._log.error('Flow control window grew too large.'); |
|
320 this.emit('error', 'FLOW_CONTROL_ERROR'); |
|
321 } else { |
|
322 this.emit('window_update'); |
|
323 } |
|
324 } |
|
325 }; |
|
326 |
|
327 // The `_updateWindow` method gets called every time there's an incoming WINDOW_UPDATE frame. It |
|
328 // modifies the flow control window: |
|
329 // |
|
330 // * Flow control can be disabled for an individual stream by sending a WINDOW_UPDATE with the |
|
331 // END_FLOW_CONTROL flag set. The payload of a WINDOW_UPDATE frame that has the END_FLOW_CONTROL |
|
332 // flag set is ignored. |
|
333 // * A sender that receives a WINDOW_UPDATE frame updates the corresponding window by the amount |
|
334 // specified in the frame. |
|
335 Flow.prototype._updateWindow = function _updateWindow(frame) { |
|
336 this._increaseWindow(frame.flags.END_FLOW_CONTROL ? Infinity : frame.window_size); |
|
337 }; |
|
338 |
|
339 // A SETTINGS frame can alter the initial flow control window size for all current streams. When the |
|
340 // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream by |
|
341 // calling the `setInitialWindow` method. The window size has to be modified by the difference |
|
342 // between the new value and the old value. |
|
343 Flow.prototype.setInitialWindow = function setInitialWindow(initialWindow) { |
|
344 this._increaseWindow(initialWindow - this._initialWindow); |
|
345 this._initialWindow = initialWindow; |
|
346 }; |