1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/testing/xpcshell/node-http2/node_modules/http2-protocol/lib/connection.js Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,588 @@ 1.4 +var assert = require('assert'); 1.5 + 1.6 +// The Connection class 1.7 +// ==================== 1.8 + 1.9 +// The Connection class manages HTTP/2 connections. Each instance corresponds to one transport 1.10 +// stream (TCP stream). It operates by sending and receiving frames and is implemented as a 1.11 +// [Flow](flow.html) subclass. 1.12 + 1.13 +var Flow = require('./flow').Flow; 1.14 + 1.15 +exports.Connection = Connection; 1.16 + 1.17 +// Public API 1.18 +// ---------- 1.19 + 1.20 +// * **new Connection(log, firstStreamId, settings)**: create a new Connection 1.21 +// 1.22 +// * **Event: 'error' (type)**: signals a connection level error made by the other end 1.23 +// 1.24 +// * **Event: 'peerError' (type)**: signals the receipt of a GOAWAY frame that contains an error 1.25 +// code other than NO_ERROR 1.26 +// 1.27 +// * **Event: 'stream' (stream)**: signals that there's an incoming stream 1.28 +// 1.29 +// * **createStream(): stream**: initiate a new stream 1.30 +// 1.31 +// * **set(settings, callback)**: change the value of one or more settings according to the 1.32 +// key-value pairs of `settings`. The callback is called after the peer acknowledged the changes. 1.33 +// 1.34 +// * **ping([callback])**: send a ping and call callback when the answer arrives 1.35 +// 1.36 +// * **close([error])**: close the stream with an error code 1.37 + 1.38 +// Constructor 1.39 +// ----------- 1.40 + 1.41 +// The main aspects of managing the connection are: 1.42 +function Connection(log, firstStreamId, settings) { 1.43 + // * initializing the base class 1.44 + Flow.call(this, 0); 1.45 + 1.46 + // * logging: every method uses the common logger object 1.47 + this._log = log.child({ component: 'connection' }); 1.48 + 1.49 + // * stream management 1.50 + this._initializeStreamManagement(firstStreamId); 1.51 + 1.52 + // * lifecycle management 1.53 + this._initializeLifecycleManagement(); 1.54 + 1.55 + // * flow control 1.56 + this._initializeFlowControl(); 1.57 + 1.58 + // * settings management 1.59 + this._initializeSettingsManagement(settings); 1.60 + 1.61 + // * multiplexing 1.62 + this._initializeMultiplexing(); 1.63 +} 1.64 +Connection.prototype = Object.create(Flow.prototype, { constructor: { value: Connection } }); 1.65 + 1.66 +// Overview 1.67 +// -------- 1.68 + 1.69 +// | ^ | ^ 1.70 +// v | v | 1.71 +// +--------------+ +--------------+ 1.72 +// +---| stream1 |---| stream2 |---- .... ---+ 1.73 +// | | +----------+ | | +----------+ | | 1.74 +// | | | stream1. | | | | stream2. | | | 1.75 +// | +-| upstream |-+ +-| upstream |-+ | 1.76 +// | +----------+ +----------+ | 1.77 +// | | ^ | ^ | 1.78 +// | v | v | | 1.79 +// | +-----+-------------+-----+-------- .... | 1.80 +// | ^ | | | | 1.81 +// | | v | | | 1.82 +// | +--------------+ | | | 1.83 +// | | stream0 | | | | 1.84 +// | | connection | | | | 1.85 +// | | management | multiplexing | 1.86 +// | +--------------+ flow control | 1.87 +// | | ^ | 1.88 +// | _read() | | _write() | 1.89 +// | v | | 1.90 +// | +------------+ +-----------+ | 1.91 +// | |output queue| |input queue| | 1.92 +// +----------------+------------+-+-----------+-----------------+ 1.93 +// | ^ 1.94 +// read() | | write() 1.95 +// v | 1.96 + 1.97 +// Stream management 1.98 +// ----------------- 1.99 + 1.100 +var Stream = require('./stream').Stream; 1.101 + 1.102 +// Initialization: 1.103 +Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) { 1.104 + // * streams are stored in two data structures: 1.105 + // * `_streamIds` is an id -> stream map of the streams that are allowed to receive frames. 1.106 + // * `_streamPriorities` is a priority -> [stream] map of stream that allowed to send frames. 1.107 + this._streamIds = []; 1.108 + this._streamPriorities = []; 1.109 + 1.110 + // * The next outbound stream ID and the last inbound stream id 1.111 + this._nextStreamId = firstStreamId; 1.112 + this._lastIncomingStream = 0; 1.113 + 1.114 + // * Calling `_writeControlFrame` when there's an incoming stream with 0 as stream ID 1.115 + this._streamIds[0] = { upstream: { write: this._writeControlFrame.bind(this) } }; 1.116 + 1.117 + // * By default, the number of concurrent outbound streams is not limited. The `_streamLimit` can 1.118 + // be set by the SETTINGS_MAX_CONCURRENT_STREAMS setting. 1.119 + this._streamSlotsFree = Infinity; 1.120 + this._streamLimit = Infinity; 1.121 + this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit); 1.122 +}; 1.123 + 1.124 +// `_writeControlFrame` is called when there's an incoming frame in the `_control` stream. It 1.125 +// broadcasts the message by creating an event on it. 1.126 +Connection.prototype._writeControlFrame = function _writeControlFrame(frame) { 1.127 + if ((frame.type === 'SETTINGS') || (frame.type === 'PING') || 1.128 + (frame.type === 'GOAWAY') || (frame.type === 'WINDOW_UPDATE')) { 1.129 + this._log.debug({ frame: frame }, 'Receiving connection level frame'); 1.130 + this.emit(frame.type, frame); 1.131 + } else { 1.132 + this._log.error({ frame: frame }, 'Invalid connection level frame'); 1.133 + this.emit('error', 'PROTOCOL_ERROR'); 1.134 + } 1.135 +}; 1.136 + 1.137 +// Methods to manage the stream slot pool: 1.138 +Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) { 1.139 + var wakeup = (this._streamSlotsFree === 0) && (newStreamLimit > this._streamLimit); 1.140 + this._streamSlotsFree += newStreamLimit - this._streamLimit; 1.141 + this._streamLimit = newStreamLimit; 1.142 + if (wakeup) { 1.143 + this.emit('wakeup'); 1.144 + } 1.145 +}; 1.146 + 1.147 +Connection.prototype._changeStreamCount = function _changeStreamCount(change) { 1.148 + if (change) { 1.149 + this._log.trace({ free: this._streamSlotsFree, change: change }, 1.150 + 'Changing active stream count.'); 1.151 + var wakeup = (this._streamSlotsFree === 0) && (change < 0); 1.152 + this._streamSlotsFree -= change; 1.153 + if (wakeup) { 1.154 + this.emit('wakeup'); 1.155 + } 1.156 + } 1.157 +}; 1.158 + 1.159 +// Creating a new *inbound or outbound* stream with the given `id` (which is undefined in case of 1.160 +// an outbound stream) consists of three steps: 1.161 +// 1.162 +// 1. var stream = new Stream(this._log); 1.163 +// 2. this._allocateId(stream, id); 1.164 +// 2. this._allocatePriority(stream); 1.165 + 1.166 +// Allocating an ID to a stream 1.167 +Connection.prototype._allocateId = function _allocateId(stream, id) { 1.168 + // * initiated stream without definite ID 1.169 + if (id === undefined) { 1.170 + id = this._nextStreamId; 1.171 + this._nextStreamId += 2; 1.172 + } 1.173 + 1.174 + // * incoming stream with a legitim ID (larger than any previous and different parity than ours) 1.175 + else if ((id > this._lastIncomingStream) && ((id - this._nextStreamId) % 2 !== 0)) { 1.176 + this._lastIncomingStream = id; 1.177 + } 1.178 + 1.179 + // * incoming stream with invalid ID 1.180 + else { 1.181 + this._log.error({ stream_id: id, lastIncomingStream: this._lastIncomingStream }, 1.182 + 'Invalid incoming stream ID.'); 1.183 + this.emit('error', 'PROTOCOL_ERROR'); 1.184 + return undefined; 1.185 + } 1.186 + 1.187 + assert(!(id in this._streamIds)); 1.188 + 1.189 + // * adding to `this._streamIds` 1.190 + this._log.trace({ s: stream, stream_id: id }, 'Allocating ID for stream.'); 1.191 + this._streamIds[id] = stream; 1.192 + stream.id = id; 1.193 + this.emit('new_stream', stream, id); 1.194 + 1.195 + // * handling stream errors as connection errors 1.196 + stream.on('error', this.emit.bind(this, 'error')); 1.197 + 1.198 + return id; 1.199 +}; 1.200 + 1.201 +// Allocating a priority to a stream, and managing priority changes 1.202 +Connection.prototype._allocatePriority = function _allocatePriority(stream) { 1.203 + this._log.trace({ s: stream }, 'Allocating priority for stream.'); 1.204 + this._insert(stream, stream._priority); 1.205 + stream.on('priority', this._reprioritize.bind(this, stream)); 1.206 + stream.upstream.on('readable', this.emit.bind(this, 'wakeup')); 1.207 + this.emit('wakeup'); 1.208 +}; 1.209 + 1.210 +Connection.prototype._insert = function _insert(stream, priority) { 1.211 + if (priority in this._streamPriorities) { 1.212 + this._streamPriorities[priority].push(stream); 1.213 + } else { 1.214 + this._streamPriorities[priority] = [stream]; 1.215 + } 1.216 +}; 1.217 + 1.218 +Connection.prototype._reprioritize = function _reprioritize(stream, priority) { 1.219 + var bucket = this._streamPriorities[stream._priority]; 1.220 + var index = bucket.indexOf(stream); 1.221 + assert(index !== -1); 1.222 + bucket.splice(index, 1); 1.223 + if (bucket.length === 0) { 1.224 + delete this._streamPriorities[stream._priority]; 1.225 + } 1.226 + 1.227 + this._insert(stream, priority); 1.228 +}; 1.229 + 1.230 +// Creating an *inbound* stream with the given ID. It is called when there's an incoming frame to 1.231 +// a previously nonexistent stream. 1.232 +Connection.prototype._createIncomingStream = function _createIncomingStream(id) { 1.233 + this._log.debug({ stream_id: id }, 'New incoming stream.'); 1.234 + 1.235 + var stream = new Stream(this._log); 1.236 + this._allocateId(stream, id); 1.237 + this._allocatePriority(stream); 1.238 + this.emit('stream', stream, id); 1.239 + 1.240 + return stream; 1.241 +}; 1.242 + 1.243 +// Creating an *outbound* stream 1.244 +Connection.prototype.createStream = function createStream() { 1.245 + this._log.trace('Creating new outbound stream.'); 1.246 + 1.247 + // * Receiving is enabled immediately, and an ID gets assigned to the stream 1.248 + var stream = new Stream(this._log); 1.249 + this._allocatePriority(stream); 1.250 + 1.251 + return stream; 1.252 +}; 1.253 + 1.254 +// Multiplexing 1.255 +// ------------ 1.256 + 1.257 +Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() { 1.258 + this.on('window_update', this.emit.bind(this, 'wakeup')); 1.259 + this._sendScheduled = false; 1.260 + this._firstFrameReceived = false; 1.261 +}; 1.262 + 1.263 +// The `_send` method is a virtual method of the [Flow class](flow.html) that has to be implemented 1.264 +// by child classes. It reads frames from streams and pushes them to the output buffer. 1.265 +Connection.prototype._send = function _send(immediate) { 1.266 + // * Do not do anything if the connection is already closed 1.267 + if (this._closed) { 1.268 + return; 1.269 + } 1.270 + 1.271 + // * Collapsing multiple calls in a turn into a single deferred call 1.272 + if (immediate) { 1.273 + this._sendScheduled = false; 1.274 + } else { 1.275 + if (!this._sendScheduled) { 1.276 + this._sendScheduled = true; 1.277 + setImmediate(this._send.bind(this, true)); 1.278 + } 1.279 + return; 1.280 + } 1.281 + 1.282 + this._log.trace('Starting forwarding frames from streams.'); 1.283 + 1.284 + // * Looping through priority `bucket`s in priority order. 1.285 +priority_loop: 1.286 + for (var priority in this._streamPriorities) { 1.287 + var bucket = this._streamPriorities[priority]; 1.288 + var nextBucket = []; 1.289 + 1.290 + // * Forwarding frames from buckets with round-robin scheduling. 1.291 + // 1. pulling out frame 1.292 + // 2. if there's no frame, skip this stream 1.293 + // 3. if forwarding this frame would make `streamCount` greater than `streamLimit`, skip 1.294 + // this stream 1.295 + // 4. adding stream to the bucket of the next round 1.296 + // 5. assigning an ID to the frame (allocating an ID to the stream if there isn't already) 1.297 + // 6. if forwarding a PUSH_PROMISE, allocate ID to the promised stream 1.298 + // 7. forwarding the frame, changing `streamCount` as appropriate 1.299 + // 8. stepping to the next stream if there's still more frame needed in the output buffer 1.300 + // 9. switching to the bucket of the next round 1.301 + while (bucket.length > 0) { 1.302 + for (var index = 0; index < bucket.length; index++) { 1.303 + var stream = bucket[index]; 1.304 + var frame = stream.upstream.read((this._window > 0) ? this._window : -1); 1.305 + 1.306 + if (!frame) { 1.307 + continue; 1.308 + } else if (frame.count_change > this._streamSlotsFree) { 1.309 + stream.upstream.unshift(frame); 1.310 + continue; 1.311 + } 1.312 + 1.313 + nextBucket.push(stream); 1.314 + 1.315 + if (frame.stream === undefined) { 1.316 + frame.stream = stream.id || this._allocateId(stream); 1.317 + } 1.318 + 1.319 + if (frame.type === 'PUSH_PROMISE') { 1.320 + this._allocatePriority(frame.promised_stream); 1.321 + frame.promised_stream = this._allocateId(frame.promised_stream); 1.322 + } 1.323 + 1.324 + this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame'); 1.325 + var moreNeeded = this.push(frame); 1.326 + this._changeStreamCount(frame.count_change); 1.327 + 1.328 + assert(moreNeeded !== null); // The frame shouldn't be unforwarded 1.329 + if (moreNeeded === false) { 1.330 + break priority_loop; 1.331 + } 1.332 + } 1.333 + 1.334 + bucket = nextBucket; 1.335 + nextBucket = []; 1.336 + } 1.337 + } 1.338 + 1.339 + // * if we couldn't forward any frame, then sleep until window update, or some other wakeup event 1.340 + if (moreNeeded === undefined) { 1.341 + this.once('wakeup', this._send.bind(this)); 1.342 + } 1.343 + 1.344 + this._log.trace({ moreNeeded: moreNeeded }, 'Stopping forwarding frames from streams.'); 1.345 +}; 1.346 + 1.347 +// The `_receive` method is another virtual method of the [Flow class](flow.html) that has to be 1.348 +// implemented by child classes. It forwards the given frame to the appropriate stream: 1.349 +Connection.prototype._receive = function _receive(frame, done) { 1.350 + this._log.trace({ frame: frame }, 'Forwarding incoming frame'); 1.351 + 1.352 + // * first frame needs to be checked by the `_onFirstFrameReceived` method 1.353 + if (!this._firstFrameReceived) { 1.354 + this._firstFrameReceived = true; 1.355 + this._onFirstFrameReceived(frame); 1.356 + } 1.357 + 1.358 + // * gets the appropriate stream from the stream registry 1.359 + var stream = this._streamIds[frame.stream]; 1.360 + 1.361 + // * or creates one if it's not in `this.streams` 1.362 + if (!stream) { 1.363 + stream = this._createIncomingStream(frame.stream); 1.364 + } 1.365 + 1.366 + // * in case of PUSH_PROMISE, replaces the promised stream id with a new incoming stream 1.367 + if (frame.type === 'PUSH_PROMISE') { 1.368 + frame.promised_stream = this._createIncomingStream(frame.promised_stream); 1.369 + } 1.370 + 1.371 + frame.count_change = this._changeStreamCount.bind(this); 1.372 + 1.373 + // * and writes it to the `stream`'s `upstream` 1.374 + stream.upstream.write(frame); 1.375 + 1.376 + done(); 1.377 +}; 1.378 + 1.379 +// Settings management 1.380 +// ------------------- 1.381 + 1.382 +var defaultSettings = { 1.383 +}; 1.384 + 1.385 +// Settings management initialization: 1.386 +Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) { 1.387 + // * Setting up the callback queue for setting acknowledgements 1.388 + this._settingsAckCallbacks = []; 1.389 + 1.390 + // * Sending the initial settings. 1.391 + this._log.debug({ settings: settings }, 1.392 + 'Sending the first SETTINGS frame as part of the connection header.'); 1.393 + this.set(settings || defaultSettings); 1.394 + 1.395 + // * Forwarding SETTINGS frames to the `_receiveSettings` method 1.396 + this.on('SETTINGS', this._receiveSettings); 1.397 +}; 1.398 + 1.399 +// * Checking that the first frame the other endpoint sends is SETTINGS 1.400 +Connection.prototype._onFirstFrameReceived = function _onFirstFrameReceived(frame) { 1.401 + if ((frame.stream === 0) && (frame.type === 'SETTINGS')) { 1.402 + this._log.debug('Receiving the first SETTINGS frame as part of the connection header.'); 1.403 + } else { 1.404 + this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.'); 1.405 + this.emit('error'); 1.406 + } 1.407 +}; 1.408 + 1.409 +// Handling of incoming SETTINGS frames. 1.410 +Connection.prototype._receiveSettings = function _receiveSettings(frame) { 1.411 + // * If it's an ACK, call the appropriate callback 1.412 + if (frame.flags.ACK) { 1.413 + var callback = this._settingsAckCallbacks.shift(); 1.414 + if (callback) { 1.415 + callback(); 1.416 + } 1.417 + } 1.418 + 1.419 + // * If it's a setting change request, then send an ACK and change the appropriate settings 1.420 + else { 1.421 + if (!this._closed) { 1.422 + this.push({ 1.423 + type: 'SETTINGS', 1.424 + flags: { ACK: true }, 1.425 + stream: 0, 1.426 + settings: {} 1.427 + }); 1.428 + } 1.429 + for (var name in frame.settings) { 1.430 + this.emit('RECEIVING_' + name, frame.settings[name]); 1.431 + } 1.432 + } 1.433 +}; 1.434 + 1.435 +// Changing one or more settings value and sending out a SETTINGS frame 1.436 +Connection.prototype.set = function set(settings, callback) { 1.437 + // * Calling the callback and emitting event when the change is acknowledges 1.438 + callback = callback || function noop() {}; 1.439 + var self = this; 1.440 + this._settingsAckCallbacks.push(function() { 1.441 + for (var name in settings) { 1.442 + self.emit('ACKNOWLEDGED_' + name, settings[name]); 1.443 + } 1.444 + callback(); 1.445 + }); 1.446 + 1.447 + // * Sending out the SETTINGS frame 1.448 + this.push({ 1.449 + type: 'SETTINGS', 1.450 + flags: { ACK: false }, 1.451 + stream: 0, 1.452 + settings: settings 1.453 + }); 1.454 + for (var name in settings) { 1.455 + this.emit('SENDING_' + name, settings[name]); 1.456 + } 1.457 +}; 1.458 + 1.459 +// Lifecycle management 1.460 +// -------------------- 1.461 + 1.462 +// The main responsibilities of lifecycle management code: 1.463 +// 1.464 +// * keeping the connection alive by 1.465 +// * sending PINGs when the connection is idle 1.466 +// * answering PINGs 1.467 +// * ending the connection 1.468 + 1.469 +Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() { 1.470 + this._pings = {}; 1.471 + this.on('PING', this._receivePing); 1.472 + this.on('GOAWAY', this._receiveGoaway); 1.473 + this._closed = false; 1.474 +}; 1.475 + 1.476 +// Generating a string of length 16 with random hexadecimal digits 1.477 +Connection.prototype._generatePingId = function _generatePingId() { 1.478 + do { 1.479 + var id = ''; 1.480 + for (var i = 0; i < 16; i++) { 1.481 + id += Math.floor(Math.random()*16).toString(16); 1.482 + } 1.483 + } while(id in this._pings); 1.484 + return id; 1.485 +}; 1.486 + 1.487 +// Sending a ping and calling `callback` when the answer arrives 1.488 +Connection.prototype.ping = function ping(callback) { 1.489 + var id = this._generatePingId(); 1.490 + var data = new Buffer(id, 'hex'); 1.491 + this._pings[id] = callback; 1.492 + 1.493 + this._log.debug({ data: data }, 'Sending PING.'); 1.494 + this.push({ 1.495 + type: 'PING', 1.496 + flags: { 1.497 + ACK: false 1.498 + }, 1.499 + stream: 0, 1.500 + data: data 1.501 + }); 1.502 +}; 1.503 + 1.504 +// Answering pings 1.505 +Connection.prototype._receivePing = function _receivePing(frame) { 1.506 + if (frame.flags.ACK) { 1.507 + var id = frame.data.toString('hex'); 1.508 + if (id in this._pings) { 1.509 + this._log.debug({ data: frame.data }, 'Receiving answer for a PING.'); 1.510 + var callback = this._pings[id]; 1.511 + if (callback) { 1.512 + callback(); 1.513 + } 1.514 + delete this._pings[id]; 1.515 + } else { 1.516 + this._log.warn({ data: frame.data }, 'Unsolicited PING answer.'); 1.517 + } 1.518 + 1.519 + } else { 1.520 + this._log.debug({ data: frame.data }, 'Answering PING.'); 1.521 + this.push({ 1.522 + type: 'PING', 1.523 + flags: { 1.524 + ACK: true 1.525 + }, 1.526 + stream: 0, 1.527 + data: frame.data 1.528 + }); 1.529 + } 1.530 +}; 1.531 + 1.532 +// Terminating the connection 1.533 +Connection.prototype.close = function close(error) { 1.534 + if (this._closed) { 1.535 + this._log.warn('Trying to close an already closed connection'); 1.536 + return; 1.537 + } 1.538 + 1.539 + this._log.debug({ error: error }, 'Closing the connection'); 1.540 + this.push({ 1.541 + type: 'GOAWAY', 1.542 + flags: {}, 1.543 + stream: 0, 1.544 + last_stream: this._lastIncomingStream, 1.545 + error: error || 'NO_ERROR' 1.546 + }); 1.547 + this.push(null); 1.548 + this._closed = true; 1.549 +}; 1.550 + 1.551 +Connection.prototype._receiveGoaway = function _receiveGoaway(frame) { 1.552 + this._log.debug({ error: frame.error }, 'Other end closed the connection'); 1.553 + this.push(null); 1.554 + this._closed = true; 1.555 + if (frame.error !== 'NO_ERROR') { 1.556 + this.emit('peerError', frame.error); 1.557 + } 1.558 +}; 1.559 + 1.560 +// Flow control 1.561 +// ------------ 1.562 + 1.563 +Connection.prototype._initializeFlowControl = function _initializeFlowControl() { 1.564 + // Handling of initial window size of individual streams. 1.565 + this._initialStreamWindowSize = INITIAL_STREAM_WINDOW_SIZE; 1.566 + this.on('new_stream', function(stream) { 1.567 + stream.upstream.setInitialWindow(this._initialStreamWindowSize); 1.568 + }); 1.569 + this.on('RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE', this._setInitialStreamWindowSize); 1.570 + this._streamIds[0].upstream.setInitialWindow = function noop() {}; 1.571 +}; 1.572 + 1.573 +// The initial connection flow control window is 65535 bytes. 1.574 +var INITIAL_STREAM_WINDOW_SIZE = 65535; 1.575 + 1.576 +// A SETTINGS frame can alter the initial flow control window size for all current streams. When the 1.577 +// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the window size of all 1.578 +// stream by calling the `setInitialStreamWindowSize` method. The window size has to be modified by 1.579 +// the difference between the new value and the old value. 1.580 +Connection.prototype._setInitialStreamWindowSize = function _setInitialStreamWindowSize(size) { 1.581 + if ((this._initialStreamWindowSize === Infinity) && (size !== Infinity)) { 1.582 + this._log.error('Trying to manipulate initial flow control window size after flow control was turned off.'); 1.583 + this.emit('error', 'FLOW_CONTROL_ERROR'); 1.584 + } else { 1.585 + this._log.debug({ size: size }, 'Changing stream initial window size.'); 1.586 + this._initialStreamWindowSize = size; 1.587 + this._streamIds.forEach(function(stream) { 1.588 + stream.upstream.setInitialWindow(size); 1.589 + }); 1.590 + } 1.591 +};