|
1 var assert = require('assert'); |
|
2 |
|
3 // The Connection class |
|
4 // ==================== |
|
5 |
|
6 // The Connection class manages HTTP/2 connections. Each instance corresponds to one transport |
|
7 // stream (TCP stream). It operates by sending and receiving frames and is implemented as a |
|
8 // [Flow](flow.html) subclass. |
|
9 |
|
10 var Flow = require('./flow').Flow; |
|
11 |
|
12 exports.Connection = Connection; |
|
13 |
|
14 // Public API |
|
15 // ---------- |
|
16 |
|
17 // * **new Connection(log, firstStreamId, settings)**: create a new Connection |
|
18 // |
|
19 // * **Event: 'error' (type)**: signals a connection level error made by the other end |
|
20 // |
|
21 // * **Event: 'peerError' (type)**: signals the receipt of a GOAWAY frame that contains an error |
|
22 // code other than NO_ERROR |
|
23 // |
|
24 // * **Event: 'stream' (stream)**: signals that there's an incoming stream |
|
25 // |
|
26 // * **createStream(): stream**: initiate a new stream |
|
27 // |
|
28 // * **set(settings, callback)**: change the value of one or more settings according to the |
|
29 // key-value pairs of `settings`. The callback is called after the peer acknowledged the changes. |
|
30 // |
|
31 // * **ping([callback])**: send a ping and call callback when the answer arrives |
|
32 // |
|
33 // * **close([error])**: close the connection with an error code |
|
34 |
|
35 // Constructor |
|
36 // ----------- |
|
37 |
|
// Creates a connection on top of the Flow base class and sets up each of its subsystems.
//
// * `log`: logger object; a child logger tagged with `component: 'connection'` is derived from it
// * `firstStreamId`: the ID that will be given to the first locally initiated stream
// * `settings`: initial settings, sent to the peer in the first SETTINGS frame
function Connection(log, firstStreamId, settings) {
  // Base class initialization (the connection itself is the flow with ID 0)
  Flow.call(this, 0);

  // Logging: every method uses this common logger object
  this._log = log.child({ component: 'connection' });

  // Stream management
  this._initializeStreamManagement(firstStreamId);

  // Lifecycle management
  this._initializeLifecycleManagement();

  // Flow control (reads `_streamIds[0]`, which stream management has just created)
  this._initializeFlowControl();

  // Settings management (sends out the first SETTINGS frame)
  this._initializeSettingsManagement(settings);

  // Multiplexing
  this._initializeMultiplexing();
}

// Connection inherits from Flow; `constructor` is restored as a non-enumerable property.
Connection.prototype = Object.create(Flow.prototype, {
  constructor: { value: Connection }
});
|
62 |
|
63 // Overview |
|
64 // -------- |
|
65 |
|
66 // | ^ | ^ |
|
67 // v | v | |
|
68 // +--------------+ +--------------+ |
|
69 // +---| stream1 |---| stream2 |---- .... ---+ |
|
70 // | | +----------+ | | +----------+ | | |
|
71 // | | | stream1. | | | | stream2. | | | |
|
72 // | +-| upstream |-+ +-| upstream |-+ | |
|
73 // | +----------+ +----------+ | |
|
74 // | | ^ | ^ | |
|
75 // | v | v | | |
|
76 // | +-----+-------------+-----+-------- .... | |
|
77 // | ^ | | | | |
|
78 // | | v | | | |
|
79 // | +--------------+ | | | |
|
80 // | | stream0 | | | | |
|
81 // | | connection | | | | |
|
82 // | | management | multiplexing | |
|
83 // | +--------------+ flow control | |
|
84 // | | ^ | |
|
85 // | _read() | | _write() | |
|
86 // | v | | |
|
87 // | +------------+ +-----------+ | |
|
88 // | |output queue| |input queue| | |
|
89 // +----------------+------------+-+-----------+-----------------+ |
|
90 // | ^ |
|
91 // read() | | write() |
|
92 // v | |
|
93 |
|
94 // Stream management |
|
95 // ----------------- |
|
96 |
|
97 var Stream = require('./stream').Stream; |
|
98 |
|
// Initialization of stream management: stream registries, stream ID counters, the connection
// level pseudo-stream and the pool of outbound stream slots.
Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) {
  // * The last inbound stream ID seen so far and the next outbound stream ID to hand out
  this._lastIncomingStream = 0;
  this._nextStreamId = firstStreamId;

  // * Streams are stored in two data structures:
  //   * `_streamPriorities` is a priority -> [stream] map of streams that are allowed to send frames
  //   * `_streamIds` is an id -> stream map of the streams that are allowed to receive frames
  this._streamPriorities = [];
  this._streamIds = [];

  // * Stream ID 0 is served by a stand-in object whose `upstream.write` is `_writeControlFrame`,
  //   so incoming frames addressed to stream 0 end up in that method
  var controlUpstream = { write: this._writeControlFrame.bind(this) };
  this._streamIds[0] = { upstream: controlUpstream };

  // * By default, the number of concurrent outbound streams is not limited. The `_streamLimit` can
  //   be changed by the peer with the SETTINGS_MAX_CONCURRENT_STREAMS setting.
  this._streamLimit = Infinity;
  this._streamSlotsFree = Infinity;
  this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit);
};
|
120 |
|
// `_writeControlFrame` receives the incoming frames that are addressed to stream 0. Frames of a
// connection level type are re-broadcast as an event named after the frame type; anything else is
// reported as a PROTOCOL_ERROR.
Connection.prototype._writeControlFrame = function _writeControlFrame(frame) {
  var connectionLevelTypes = ['SETTINGS', 'PING', 'GOAWAY', 'WINDOW_UPDATE'];

  if (connectionLevelTypes.indexOf(frame.type) === -1) {
    this._log.error({ frame: frame }, 'Invalid connection level frame');
    this.emit('error', 'PROTOCOL_ERROR');
    return;
  }

  this._log.debug({ frame: frame }, 'Receiving connection level frame');
  this.emit(frame.type, frame);
};
|
133 |
|
// Methods to manage the stream slot pool:

// `_updateStreamLimit` applies a new concurrent outbound stream limit (`newStreamLimit`) and
// adjusts the number of free slots accordingly. A 'wakeup' event is emitted when the pool was
// exhausted and the limit has been raised, so that blocked streams get another chance to send.
Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) {
  var oldStreamLimit = this._streamLimit;
  var wakeup = (this._streamSlotsFree === 0) && (newStreamLimit > oldStreamLimit);

  if (oldStreamLimit === Infinity) {
    // While the limit is unlimited, the free slot count stays at Infinity, so the number of streams
    // already in use cannot be recovered from it. Computing `Infinity + (n - Infinity)` would give
    // NaN, which silently disables every later limit check. Restart the pool from the new limit.
    // NOTE(review): streams opened before the first finite limit are not counted against it.
    if (newStreamLimit !== Infinity) {
      this._streamSlotsFree = newStreamLimit;
    }
  } else {
    // Finite old limit: shift the free slot count by the difference (may legitimately go negative
    // when the limit is lowered below the number of streams in use, or to Infinity when lifted).
    this._streamSlotsFree += newStreamLimit - oldStreamLimit;
  }

  this._streamLimit = newStreamLimit;

  if (wakeup) {
    this.emit('wakeup');
  }
};
|
143 |
|
// `_changeStreamCount` records that the number of active streams changed by `change` (positive:
// slots taken, negative: slots released). Nothing happens for a falsy `change`. A 'wakeup' event
// is emitted when the pool was exhausted and slots have just been released.
Connection.prototype._changeStreamCount = function _changeStreamCount(change) {
  if (!change) {
    return;
  }

  this._log.trace({ free: this._streamSlotsFree, change: change },
                  'Changing active stream count.');

  var poolWasExhausted = (this._streamSlotsFree === 0);
  this._streamSlotsFree -= change;

  if (poolWasExhausted && (change < 0)) {
    this.emit('wakeup');
  }
};
|
155 |
|
156 // Creating a new *inbound or outbound* stream with the given `id` (which is undefined in case of |
|
157 // an outbound stream) consists of three steps: |
|
158 // |
|
159 // 1. var stream = new Stream(this._log); |
|
160 // 2. this._allocateId(stream, id); |
|
161 // 3. this._allocatePriority(stream); |
|
162 |
|
// Allocating an ID to a stream and registering it in `_streamIds`.
//
// * `id === undefined` means a locally initiated stream: it gets the next outbound ID.
// * otherwise `id` is the ID chosen by the peer, which is validated first.
//
// Returns the allocated ID, or `undefined` (after emitting a PROTOCOL_ERROR) if the incoming ID
// is not acceptable.
Connection.prototype._allocateId = function _allocateId(stream, id) {
  if (id === undefined) {
    // * initiated stream without definite ID: our IDs advance in steps of two
    id = this._nextStreamId;
    this._nextStreamId += 2;
  } else {
    // * incoming stream: a legitimate ID is larger than any previous incoming ID and has
    //   different parity than ours
    var isNewer = id > this._lastIncomingStream;
    var hasPeerParity = ((id - this._nextStreamId) % 2 !== 0);

    if (!(isNewer && hasPeerParity)) {
      // * incoming stream with invalid ID
      this._log.error({ stream_id: id, lastIncomingStream: this._lastIncomingStream },
                      'Invalid incoming stream ID.');
      this.emit('error', 'PROTOCOL_ERROR');
      return undefined;
    }

    this._lastIncomingStream = id;
  }

  // The chosen ID must not be taken yet
  assert(!(id in this._streamIds));

  // * adding to `this._streamIds` and announcing the stream to the other subsystems
  this._log.trace({ s: stream, stream_id: id }, 'Allocating ID for stream.');
  this._streamIds[id] = stream;
  stream.id = id;
  this.emit('new_stream', stream, id);

  // * handling stream errors as connection errors
  var reportAsConnectionError = this.emit.bind(this, 'error');
  stream.on('error', reportAsConnectionError);

  return id;
};
|
197 |
|
// Allocating a priority to a stream, and managing priority changes: the stream is put into the
// bucket of its current priority, moved whenever it emits 'priority', and the connection is woken
// up whenever the stream's upstream becomes readable (and once right away).
Connection.prototype._allocatePriority = function _allocatePriority(stream) {
  this._log.trace({ s: stream }, 'Allocating priority for stream.');

  var wakeUp = this.emit.bind(this, 'wakeup');
  var moveToNewBucket = this._reprioritize.bind(this, stream);

  this._insert(stream, stream._priority);
  stream.on('priority', moveToNewBucket);
  stream.upstream.on('readable', wakeUp);

  this.emit('wakeup');
};
|
206 |
|
// `_insert` appends `stream` to the bucket of the given `priority`, creating the bucket first if
// there is none yet.
Connection.prototype._insert = function _insert(stream, priority) {
  var buckets = this._streamPriorities;

  if (!(priority in buckets)) {
    buckets[priority] = [];
  }

  buckets[priority].push(stream);
};
|
214 |
|
// `_reprioritize` moves `stream` from the bucket of its current priority (`stream._priority`) to
// the bucket of the new `priority`. A bucket that becomes empty is removed from the map.
Connection.prototype._reprioritize = function _reprioritize(stream, priority) {
  var oldPriority = stream._priority;
  var oldBucket = this._streamPriorities[oldPriority];

  // The stream must be registered under its current priority
  var position = oldBucket.indexOf(stream);
  assert(position !== -1);

  oldBucket.splice(position, 1);
  if (oldBucket.length === 0) {
    delete this._streamPriorities[oldPriority];
  }

  this._insert(stream, priority);
};
|
226 |
|
// Creating an *inbound* stream with the given ID. It is called when there's an incoming frame to
// a previously nonexistent stream. The new stream gets the ID and a priority bucket, is announced
// with a 'stream' event, and is returned to the caller.
Connection.prototype._createIncomingStream = function _createIncomingStream(id) {
  this._log.debug({ stream_id: id }, 'New incoming stream.');

  var incoming = new Stream(this._log);

  this._allocateId(incoming, id);
  this._allocatePriority(incoming);

  this.emit('stream', incoming, id);
  return incoming;
};
|
239 |
|
// Creating an *outbound* stream. The stream is only put into a priority bucket here; no ID is
// assigned in this method (`_send` allocates one when it forwards a frame of a stream without ID).
Connection.prototype.createStream = function createStream() {
  this._log.trace('Creating new outbound stream.');

  var outgoing = new Stream(this._log);
  this._allocatePriority(outgoing);
  return outgoing;
};
|
250 |
|
251 // Multiplexing |
|
252 // ------------ |
|
253 |
|
// Initialization of multiplexing: resets the scheduling and connection header flags, and turns
// every 'window_update' event of the connection into a 'wakeup'.
Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() {
  // * no deferred `_send` is pending, and no frame has arrived from the peer yet
  this._sendScheduled = false;
  this._firstFrameReceived = false;

  // * a window update may unblock sending
  var wakeUp = this.emit.bind(this, 'wakeup');
  this.on('window_update', wakeUp);
};
|
259 |
|
// The `_send` method is a virtual method of the [Flow class](flow.html) that has to be implemented
// by child classes. It reads frames from streams and pushes them to the output buffer.
//
// Called without a truthy `immediate`, it only schedules a single deferred run for the current
// turn; the actual forwarding happens when it is called with `immediate === true`.
Connection.prototype._send = function _send(immediate) {
  // * Do not do anything if the connection is already closed
  if (this._closed) {
    return;
  }

  // * Collapsing multiple calls in a turn into a single deferred call
  if (!immediate) {
    if (!this._sendScheduled) {
      this._sendScheduled = true;
      setImmediate(this._send.bind(this, true));
    }
    return;
  }
  this._sendScheduled = false;

  this._log.trace('Starting forwarding frames from streams.');

  // Result of the last `push()`; stays `undefined` if no frame could be forwarded at all
  var moreNeeded;

  // * Looping through priority buckets in priority order.
  priority_loop:
  for (var priority in this._streamPriorities) {
    var currentRound = this._streamPriorities[priority];

    // * Forwarding frames from buckets with round-robin scheduling. Every pass of the `while`
    //   loop is one round; only streams that yielded a frame take part in the next round.
    while (currentRound.length > 0) {
      var nextRound = [];

      for (var i = 0; i < currentRound.length; i++) {
        var stream = currentRound[i];

        // 1. pulling out a frame; the connection window is passed to `read` while it is
        //    positive, otherwise -1 is passed
        var readLimit = (this._window > 0) ? this._window : -1;
        var frame = stream.upstream.read(readLimit);

        // 2. if there's no frame, skip this stream
        if (!frame) {
          continue;
        }

        // 3. if forwarding this frame would need more stream slots than there are free, put
        //    the frame back and skip this stream
        if (frame.count_change > this._streamSlotsFree) {
          stream.upstream.unshift(frame);
          continue;
        }

        // 4. adding stream to the bucket of the next round
        nextRound.push(stream);

        // 5. assigning an ID to the frame (allocating an ID to the stream if there isn't already)
        if (frame.stream === undefined) {
          frame.stream = stream.id || this._allocateId(stream);
        }

        // 6. if forwarding a PUSH_PROMISE, register the promised stream and replace it in the
        //    frame with its freshly allocated ID
        if (frame.type === 'PUSH_PROMISE') {
          this._allocatePriority(frame.promised_stream);
          frame.promised_stream = this._allocateId(frame.promised_stream);
        }

        // 7. forwarding the frame, changing the stream count as appropriate
        this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame');
        moreNeeded = this.push(frame);
        this._changeStreamCount(frame.count_change);

        // 8. stopping altogether if no more frames are needed in the output buffer
        assert(moreNeeded !== null);  // The frame shouldn't be unforwarded
        if (moreNeeded === false) {
          break priority_loop;
        }
      }

      // 9. switching to the bucket of the next round
      currentRound = nextRound;
    }
  }

  // * if we couldn't forward any frame, then sleep until window update, or some other wakeup event
  if (moreNeeded === undefined) {
    this.once('wakeup', this._send.bind(this));
  }

  this._log.trace({ moreNeeded: moreNeeded }, 'Stopping forwarding frames from streams.');
};
|
343 |
|
// The `_receive` method is another virtual method of the [Flow class](flow.html) that has to be
// implemented by child classes. It forwards the given frame to the appropriate stream and calls
// `done` afterwards.
Connection.prototype._receive = function _receive(frame, done) {
  this._log.trace({ frame: frame }, 'Forwarding incoming frame');

  // * the very first frame is additionally checked by the `_onFirstFrameReceived` method
  if (!this._firstFrameReceived) {
    this._firstFrameReceived = true;
    this._onFirstFrameReceived(frame);
  }

  // * looking up the target stream in the stream registry, or creating a new incoming stream if
  //   it is not registered in `this._streamIds`
  var target = this._streamIds[frame.stream] || this._createIncomingStream(frame.stream);

  // * in case of PUSH_PROMISE, replaces the promised stream id with a new incoming stream
  if (frame.type === 'PUSH_PROMISE') {
    frame.promised_stream = this._createIncomingStream(frame.promised_stream);
  }

  // * on incoming frames `count_change` is a function: `_changeStreamCount` bound to this
  //   connection
  frame.count_change = this._changeStreamCount.bind(this);

  // * and finally writes the frame to the `upstream` of the target stream
  target.upstream.write(frame);

  done();
};
|
375 |
|
376 // Settings management |
|
377 // ------------------- |
|
378 |
|
// Settings sent in the first SETTINGS frame when the caller did not provide any (currently none).
var defaultSettings = {};

// Settings management initialization:
Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) {
  // * Setting up the callback queue for setting acknowledgements. It has to exist before `set` is
  //   called, because `set` pushes a callback into it.
  this._settingsAckCallbacks = [];

  // * Sending the initial settings.
  this._log.debug({ settings: settings },
                  'Sending the first SETTINGS frame as part of the connection header.');
  var initialSettings = settings || defaultSettings;
  this.set(initialSettings);

  // * Forwarding SETTINGS frames to the `_receiveSettings` method
  this.on('SETTINGS', this._receiveSettings);
};
|
395 |
|
// Checking that the first frame the other endpoint sends is a SETTINGS frame on stream 0.
// Anything else is an invalid connection header and is reported as a connection level
// PROTOCOL_ERROR. The 'error' event always carries an error type, like every other 'error'
// emitted by the connection.
Connection.prototype._onFirstFrameReceived = function _onFirstFrameReceived(frame) {
  if ((frame.stream === 0) && (frame.type === 'SETTINGS')) {
    this._log.debug('Receiving the first SETTINGS frame as part of the connection header.');
  } else {
    this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.');
    this.emit('error', 'PROTOCOL_ERROR');
  }
};
|
405 |
|
// Handling of incoming SETTINGS frames.
Connection.prototype._receiveSettings = function _receiveSettings(frame) {
  // * If it's an ACK, call the oldest pending acknowledgement callback (if there is one)
  if (frame.flags.ACK) {
    var onAcknowledged = this._settingsAckCallbacks.shift();
    if (onAcknowledged) {
      onAcknowledged();
    }
    return;
  }

  // * If it's a setting change request, then send an ACK (unless the connection is closed) ...
  if (!this._closed) {
    var acknowledgement = {
      type: 'SETTINGS',
      flags: { ACK: true },
      stream: 0,
      settings: {}
    };
    this.push(acknowledgement);
  }

  // * ... and announce every received setting with a `RECEIVING_<name>` event
  var received = frame.settings;
  for (var settingName in received) {
    this.emit('RECEIVING_' + settingName, received[settingName]);
  }
};
|
431 |
|
// Changing one or more settings values and sending out a SETTINGS frame.
//
// * `settings`: key-value pairs of the settings to change
// * `callback`: optional, called after the peer acknowledged the change
Connection.prototype.set = function set(settings, callback) {
  var connection = this;
  var onAcknowledged = callback || function noop() {};

  // Emits one `<prefix><name>` event per setting, with the value of the setting as argument
  function announce(prefix) {
    for (var settingName in settings) {
      connection.emit(prefix + settingName, settings[settingName]);
    }
  }

  // * Emitting `ACKNOWLEDGED_<name>` events and calling the callback when the change is
  //   acknowledged
  this._settingsAckCallbacks.push(function() {
    announce('ACKNOWLEDGED_');
    onAcknowledged();
  });

  // * Sending out the SETTINGS frame
  this.push({
    type: 'SETTINGS',
    flags: { ACK: false },
    stream: 0,
    settings: settings
  });

  // * Announcing the outgoing settings with `SENDING_<name>` events
  announce('SENDING_');
};
|
455 |
|
456 // Lifecycle management |
|
457 // -------------------- |
|
458 |
|
459 // The main responsibilities of lifecycle management code: |
|
460 // |
|
461 // * keeping the connection alive by |
|
462 // * sending PINGs when the connection is idle |
|
463 // * answering PINGs |
|
464 // * ending the connection |
|
465 |
|
// Initialization of lifecycle management: the connection starts out open, with no outstanding
// pings, and handles the PING and GOAWAY frames arriving on stream 0.
Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() {
  // * connection state
  this._closed = false;

  // * ping ID -> callback map of the pings that are waiting for an answer
  this._pings = {};

  // * connection level frame handlers
  this.on('PING', this._receivePing);
  this.on('GOAWAY', this._receiveGoaway);
};
|
472 |
|
// Generating a string of length 16 with random hexadecimal digits. The generation is repeated
// until the result is not the ID of an outstanding ping.
Connection.prototype._generatePingId = function _generatePingId() {
  var id;

  do {
    id = '';
    // every step appends exactly one hexadecimal digit
    while (id.length < 16) {
      id += Math.floor(Math.random() * 16).toString(16);
    }
  } while (id in this._pings);

  return id;
};
|
483 |
|
// Sending a ping and calling `callback` (optional) when the answer arrives.
//
// The ping is identified by a random 16 hex digit ID; the same ID, decoded into 8 bytes, is the
// payload of the PING frame, so that `_receivePing` can find the callback from the echoed payload.
Connection.prototype.ping = function ping(callback) {
  var id = this._generatePingId();

  // `new Buffer(string, encoding)` is deprecated in Node.js, so `Buffer.from` is used wherever it
  // is the real Buffer factory. On old runtimes it is either missing or just the inherited
  // `Uint8Array.from`, and the constructor is still the right call there.
  var hasBufferFrom = (typeof Buffer.from === 'function') && (Buffer.from !== Uint8Array.from);
  var data = hasBufferFrom ? Buffer.from(id, 'hex') : new Buffer(id, 'hex');

  // Registered even without a callback, so that the answer is recognized as solicited
  this._pings[id] = callback;

  this._log.debug({ data: data }, 'Sending PING.');
  this.push({
    type: 'PING',
    flags: {
      ACK: false
    },
    stream: 0,
    data: data
  });
};
|
500 |
|
// Handling of incoming PING frames: pings of the peer are answered, answers to our own pings
// trigger the callback registered by `ping`.
Connection.prototype._receivePing = function _receivePing(frame) {
  // * A ping from the peer: echo its payload back with the ACK flag set
  if (!frame.flags.ACK) {
    this._log.debug({ data: frame.data }, 'Answering PING.');
    this.push({
      type: 'PING',
      flags: {
        ACK: true
      },
      stream: 0,
      data: frame.data
    });
    return;
  }

  // * An answer: the hex form of the payload is the ID of the ping it belongs to
  var id = frame.data.toString('hex');

  if (!(id in this._pings)) {
    this._log.warn({ data: frame.data }, 'Unsolicited PING answer.');
    return;
  }

  this._log.debug({ data: frame.data }, 'Receiving answer for a PING.');

  // * The callback is optional; the ping stops being outstanding after it has been called
  var onAnswer = this._pings[id];
  if (onAnswer) {
    onAnswer();
  }
  delete this._pings[id];
};
|
528 |
|
// Terminating the connection: sends a GOAWAY frame carrying `error` (or 'NO_ERROR' if there's
// none) and the ID of the last incoming stream, then ends the output. Closing an already closed
// connection only logs a warning.
Connection.prototype.close = function close(error) {
  if (this._closed) {
    this._log.warn('Trying to close an already closed connection');
    return;
  }

  this._log.debug({ error: error }, 'Closing the connection');

  var goaway = {
    type: 'GOAWAY',
    flags: {},
    stream: 0,
    last_stream: this._lastIncomingStream,
    error: error || 'NO_ERROR'
  };
  this.push(goaway);

  // No more frames after the GOAWAY
  this.push(null);
  this._closed = true;
};
|
547 |
|
// Handling of an incoming GOAWAY frame: the output is ended, the connection is marked as closed,
// and a 'peerError' event is emitted if the frame carries an error code other than 'NO_ERROR'.
Connection.prototype._receiveGoaway = function _receiveGoaway(frame) {
  var peerError = frame.error;

  this._log.debug({ error: peerError }, 'Other end closed the connection');

  this.push(null);
  this._closed = true;

  if (peerError === 'NO_ERROR') {
    return;
  }
  this.emit('peerError', peerError);
};
|
556 |
|
557 // Flow control |
|
558 // ------------ |
|
559 |
|
// The initial flow control window of the streams is 65535 bytes until a SETTINGS frame changes it.
var INITIAL_STREAM_WINDOW_SIZE = 65535;

// Initialization of flow control: handling of the initial window size of individual streams.
Connection.prototype._initializeFlowControl = function _initializeFlowControl() {
  this._initialStreamWindowSize = INITIAL_STREAM_WINDOW_SIZE;

  // * Every newly registered stream starts with the current initial window size. The listener is
  //   called with the connection as `this`.
  this.on('new_stream', function applyInitialWindow(stream) {
    stream.upstream.setInitialWindow(this._initialStreamWindowSize);
  });

  // * The peer can change the initial window size with SETTINGS_INITIAL_WINDOW_SIZE
  this.on('RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE', this._setInitialStreamWindowSize);

  // * The stand-in for stream 0 is also stored in `_streamIds`, so it needs a `setInitialWindow`
  //   method too; it does nothing.
  this._streamIds[0].upstream.setInitialWindow = function noop() {};
};
|
572 |
|
// A SETTINGS frame can alter the initial flow control window size for all current streams. When
// the value of SETTINGS_INITIAL_WINDOW_SIZE changes, `_setInitialStreamWindowSize` stores the new
// value for future streams and passes it to `setInitialWindow` of every registered stream.
// Switching back to a finite size after the size has been set to Infinity is refused with a
// FLOW_CONTROL_ERROR.
Connection.prototype._setInitialStreamWindowSize = function _setInitialStreamWindowSize(size) {
  var flowControlTurnedOff = (this._initialStreamWindowSize === Infinity);

  if (flowControlTurnedOff && (size !== Infinity)) {
    this._log.error('Trying to manipulate initial flow control window size after flow control was turned off.');
    this.emit('error', 'FLOW_CONTROL_ERROR');
    return;
  }

  this._log.debug({ size: size }, 'Changing stream initial window size.');
  this._initialStreamWindowSize = size;

  this._streamIds.forEach(function applyInitialWindow(stream) {
    stream.upstream.setInitialWindow(size);
  });
};