testing/xpcshell/node-http2/node_modules/http2-protocol/lib/connection.js

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

michael@0 1 var assert = require('assert');
michael@0 2
michael@0 3 // The Connection class
michael@0 4 // ====================
michael@0 5
michael@0 6 // The Connection class manages HTTP/2 connections. Each instance corresponds to one transport
michael@0 7 // stream (TCP stream). It operates by sending and receiving frames and is implemented as a
michael@0 8 // [Flow](flow.html) subclass.
michael@0 9
michael@0 10 var Flow = require('./flow').Flow;
michael@0 11
michael@0 12 exports.Connection = Connection;
michael@0 13
michael@0 14 // Public API
michael@0 15 // ----------
michael@0 16
michael@0 17 // * **new Connection(log, firstStreamId, settings)**: create a new Connection
michael@0 18 //
michael@0 19 // * **Event: 'error' (type)**: signals a connection level error made by the other end
michael@0 20 //
michael@0 21 // * **Event: 'peerError' (type)**: signals the receipt of a GOAWAY frame that contains an error
michael@0 22 // code other than NO_ERROR
michael@0 23 //
michael@0 24 // * **Event: 'stream' (stream)**: signals that there's an incoming stream
michael@0 25 //
michael@0 26 // * **createStream(): stream**: initiate a new stream
michael@0 27 //
michael@0 28 // * **set(settings, callback)**: change the value of one or more settings according to the
michael@0 29 // key-value pairs of `settings`. The callback is called after the peer acknowledged the changes.
michael@0 30 //
michael@0 31 // * **ping([callback])**: send a ping and call callback when the answer arrives
michael@0 32 //
michael@0 33 // * **close([error])**: close the connection with an error code
michael@0 34
michael@0 35 // Constructor
michael@0 36 // -----------
michael@0 37
// Creates a connection: a Flow with ID 0 that is set up by running every initializer below.
//
// * `log`: logger object; a child logger tagged with `component: 'connection'` is stored
// * `firstStreamId`: the ID the first locally initiated stream will get
// * `settings`: initial settings, handed to settings management
//
// The order of the initializers matters: flow control touches `this._streamIds[0]`, which is
// created by stream management, and settings management sends a SETTINGS frame right away.
function Connection(log, firstStreamId, settings) {
  // Base class
  Flow.call(this, 0);

  // Logging
  this._log = log.child({ component: 'connection' });

  // Stream management, lifecycle management, flow control, settings management, multiplexing
  this._initializeStreamManagement(firstStreamId);
  this._initializeLifecycleManagement();
  this._initializeFlowControl();
  this._initializeSettingsManagement(settings);
  this._initializeMultiplexing();
}

// Connection inherits from Flow; `constructor` is restored as a non-enumerable property.
Connection.prototype = Object.create(Flow.prototype);
Object.defineProperty(Connection.prototype, 'constructor', { value: Connection });
michael@0 62
michael@0 63 // Overview
michael@0 64 // --------
michael@0 65
michael@0 66 // | ^ | ^
michael@0 67 // v | v |
michael@0 68 // +--------------+ +--------------+
michael@0 69 // +---| stream1 |---| stream2 |---- .... ---+
michael@0 70 // | | +----------+ | | +----------+ | |
michael@0 71 // | | | stream1. | | | | stream2. | | |
michael@0 72 // | +-| upstream |-+ +-| upstream |-+ |
michael@0 73 // | +----------+ +----------+ |
michael@0 74 // | | ^ | ^ |
michael@0 75 // | v | v | |
michael@0 76 // | +-----+-------------+-----+-------- .... |
michael@0 77 // | ^ | | | |
michael@0 78 // | | v | | |
michael@0 79 // | +--------------+ | | |
michael@0 80 // | | stream0 | | | |
michael@0 81 // | | connection | | | |
michael@0 82 // | | management | multiplexing |
michael@0 83 // | +--------------+ flow control |
michael@0 84 // | | ^ |
michael@0 85 // | _read() | | _write() |
michael@0 86 // | v | |
michael@0 87 // | +------------+ +-----------+ |
michael@0 88 // | |output queue| |input queue| |
michael@0 89 // +----------------+------------+-+-----------+-----------------+
michael@0 90 // | ^
michael@0 91 // read() | | write()
michael@0 92 // v |
michael@0 93
michael@0 94 // Stream management
michael@0 95 // -----------------
michael@0 96
michael@0 97 var Stream = require('./stream').Stream;
michael@0 98
// Initialization of the stream registry and of the stream slot pool.
//
// * `firstStreamId`: the ID that will be given to the first outbound stream
Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) {
  var controlWriter = this._writeControlFrame.bind(this);

  // * `_streamIds` maps stream IDs to streams, `_streamPriorities` maps priorities to arrays of
  //   streams (see `_allocateId` and `_insert`)
  this._streamIds = [];
  this._streamPriorities = [];

  // * ID bookkeeping: next outbound ID, and the highest inbound ID seen so far
  this._nextStreamId = firstStreamId;
  this._lastIncomingStream = 0;

  // * Slot 0 is a stand-in object whose `upstream.write` hands frames to `_writeControlFrame`
  this._streamIds[0] = { upstream: { write: controlWriter } };

  // * No limit on concurrent streams until a SETTINGS_MAX_CONCURRENT_STREAMS value is received
  this._streamLimit = Infinity;
  this._streamSlotsFree = Infinity;
  this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit);
};
michael@0 120
// `_writeControlFrame` receives the frames addressed to stream 0. Frames of a connection level
// type are re-emitted as an event named after the frame type; anything else is reported as a
// PROTOCOL_ERROR.
Connection.prototype._writeControlFrame = function _writeControlFrame(frame) {
  var connectionLevelTypes = ['SETTINGS', 'PING', 'GOAWAY', 'WINDOW_UPDATE'];

  if (connectionLevelTypes.indexOf(frame.type) === -1) {
    this._log.error({ frame: frame }, 'Invalid connection level frame');
    this.emit('error', 'PROTOCOL_ERROR');
    return;
  }

  this._log.debug({ frame: frame }, 'Receiving connection level frame');
  this.emit(frame.type, frame);
};
michael@0 133
// Methods to manage the stream slot pool.
//
// The number of active streams is tracked explicitly in `_streamCount`, and `_streamSlotsFree`
// is always derived as `_streamLimit - _streamCount`. Adjusting `_streamSlotsFree` by deltas
// does not work while the limit is `Infinity`: `Infinity + (n - Infinity)` is `NaN`, and a `NaN`
// slot count silently disables both the limit check in `_send` and the wakeup logic.
// `_streamCount` is initialized lazily here, so no other method has to know about it.

// Applying a new concurrent stream limit (`newStreamLimit` may be `Infinity`). Emits 'wakeup'
// if there were no free slots before the change and there is at least one after it.
Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) {
  var wasExhausted = this._streamSlotsFree <= 0;

  this._streamLimit = newStreamLimit;
  this._streamSlotsFree = newStreamLimit - (this._streamCount || 0);

  if (wasExhausted && (this._streamSlotsFree > 0)) {
    this.emit('wakeup');
  }
};

// Changing the number of active streams by `change` (positive: streams were opened, negative:
// streams were closed). A falsy `change` (0 or undefined) is a no-op. Emits 'wakeup' when the
// change frees up a slot in a previously exhausted pool.
Connection.prototype._changeStreamCount = function _changeStreamCount(change) {
  if (!change) {
    return;
  }

  this._log.trace({ free: this._streamSlotsFree, change: change },
                  'Changing active stream count.');

  var wasExhausted = this._streamSlotsFree <= 0;

  this._streamCount = (this._streamCount || 0) + change;
  this._streamSlotsFree = this._streamLimit - this._streamCount;

  if (wasExhausted && (this._streamSlotsFree > 0)) {
    this.emit('wakeup');
  }
};
michael@0 155
michael@0 156 // Creating a new *inbound or outbound* stream with the given `id` (which is undefined in case of
michael@0 157 // an outbound stream) consists of three steps:
michael@0 158 //
michael@0 159 // 1. var stream = new Stream(this._log);
michael@0 160 // 2. this._allocateId(stream, id);
michael@0 161 // 3. this._allocatePriority(stream);
michael@0 162
// Allocating an ID to a stream and registering the stream in `_streamIds`.
//
// * `stream`: the stream to register
// * `id`: the ID requested by the peer, or undefined for a locally initiated stream
//
// Returns the allocated ID. If the peer's ID is not acceptable, a PROTOCOL_ERROR is emitted,
// nothing is registered and undefined is returned.
Connection.prototype._allocateId = function _allocateId(stream, id) {
  if (id === undefined) {
    // * locally initiated stream: take the next ID of our own sequence
    id = this._nextStreamId;
    this._nextStreamId += 2;
  } else {
    // * incoming stream: the ID must be larger than any previous incoming ID and must have a
    //   different parity than our own IDs
    var isNewest = id > this._lastIncomingStream;
    var hasPeerParity = (id - this._nextStreamId) % 2 !== 0;

    if (!(isNewest && hasPeerParity)) {
      this._log.error({ stream_id: id, lastIncomingStream: this._lastIncomingStream },
                      'Invalid incoming stream ID.');
      this.emit('error', 'PROTOCOL_ERROR');
      return undefined;
    }

    this._lastIncomingStream = id;
  }

  assert(!(id in this._streamIds));

  // * registering the stream and announcing it
  this._log.trace({ s: stream, stream_id: id }, 'Allocating ID for stream.');
  this._streamIds[id] = stream;
  stream.id = id;
  this.emit('new_stream', stream, id);

  // * errors of the stream are re-emitted as errors of the connection
  stream.on('error', this.emit.bind(this, 'error'));

  return id;
};
michael@0 197
// Putting a stream into the priority buckets and following its later priority changes. A
// 'wakeup' is emitted right away, and again whenever the stream's `upstream` becomes readable.
Connection.prototype._allocatePriority = function _allocatePriority(stream) {
  var wakeup = this.emit.bind(this, 'wakeup');

  this._log.trace({ s: stream }, 'Allocating priority for stream.');
  this._insert(stream, stream._priority);
  stream.on('priority', this._reprioritize.bind(this, stream));
  stream.upstream.on('readable', wakeup);
  wakeup();
};
michael@0 206
// Appending `stream` to the bucket of `priority`, creating the bucket if it does not exist yet.
Connection.prototype._insert = function _insert(stream, priority) {
  var buckets = this._streamPriorities;

  if (!(priority in buckets)) {
    buckets[priority] = [stream];
    return;
  }

  buckets[priority].push(stream);
};
michael@0 214
// Moving `stream` from the bucket of its current `_priority` to the bucket of `priority`. A
// bucket that becomes empty is removed. The stream must be present in its current bucket.
Connection.prototype._reprioritize = function _reprioritize(stream, priority) {
  var oldPriority = stream._priority;
  var oldBucket = this._streamPriorities[oldPriority];
  var position = oldBucket.indexOf(stream);

  assert(position !== -1);
  oldBucket.splice(position, 1);

  if (oldBucket.length === 0) {
    delete this._streamPriorities[oldPriority];
  }

  this._insert(stream, priority);
};
michael@0 226
// Creating an *inbound* stream for the given ID: the stream gets registered under `id`, gets a
// priority, and is announced with a 'stream' event. `_receive` calls this when a frame refers to
// a stream that is not in the registry. Returns the new stream.
Connection.prototype._createIncomingStream = function _createIncomingStream(id) {
  this._log.debug({ stream_id: id }, 'New incoming stream.');

  var incoming = new Stream(this._log);

  this._allocateId(incoming, id);
  this._allocatePriority(incoming);
  this.emit('stream', incoming, id);

  return incoming;
};
michael@0 239
// Creating an *outbound* stream. Only a priority is allocated here; the stream gets its ID in
// `_send`, when its first frame is forwarded. Returns the new stream.
Connection.prototype.createStream = function createStream() {
  this._log.trace('Creating new outbound stream.');

  var outgoing = new Stream(this._log);
  this._allocatePriority(outgoing);

  return outgoing;
};
michael@0 250
michael@0 251 // Multiplexing
michael@0 252 // ------------
michael@0 253
// Multiplexing initialization: resets the scheduling and first-frame flags, and turns every
// 'window_update' event into a 'wakeup' event.
Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() {
  var wakeup = this.emit.bind(this, 'wakeup');

  this.on('window_update', wakeup);
  this._sendScheduled = false;
  this._firstFrameReceived = false;
};
michael@0 259
// The `_send` method is a virtual method of the [Flow class](flow.html) that has to be
// implemented by child classes. It pulls frames out of the streams and pushes them to the output.
//
// * `immediate`: when falsy, the call only schedules one deferred `_send(true)` (several calls
//   in the same turn collapse into a single one); when truthy, the forwarding is done right now
Connection.prototype._send = function _send(immediate) {
  var moreNeeded, priority, queue, nextRound, position, stream, frame, readLimit;

  // * Nothing to do on a closed connection
  if (this._closed) {
    return;
  }

  // * Deferred mode: schedule the real work unless it is already scheduled
  if (!immediate) {
    if (!this._sendScheduled) {
      this._sendScheduled = true;
      setImmediate(this._send.bind(this, true));
    }
    return;
  }
  this._sendScheduled = false;

  this._log.trace('Starting forwarding frames from streams.');

  // * Walking through the priority buckets in the enumeration order of `_streamPriorities`
  priority_loop:
  for (priority in this._streamPriorities) {
    queue = this._streamPriorities[priority];
    nextRound = [];

    // * Round-robin inside a bucket: every stream that yielded a frame in this round takes part
    //   in the next round, until a round forwards nothing
    while (queue.length > 0) {
      for (position = 0; position < queue.length; position++) {
        stream = queue[position];

        // 1. pulling out a frame, limited by `this._window` when that is positive
        readLimit = (this._window > 0) ? this._window : -1;
        frame = stream.upstream.read(readLimit);

        // 2. no frame: this stream is left out of the next round
        if (!frame) {
          continue;
        }

        // 3. the frame needs more stream slots than there are free: put it back and skip
        if (frame.count_change > this._streamSlotsFree) {
          stream.upstream.unshift(frame);
          continue;
        }

        // 4. the stream takes part in the next round
        nextRound.push(stream);

        // 5. giving a stream ID to the frame (allocating one for the stream if it has none)
        if (frame.stream === undefined) {
          frame.stream = stream.id || this._allocateId(stream);
        }

        // 6. the promised stream of a PUSH_PROMISE gets a priority and an ID, and the frame
        //    carries the ID instead of the stream object from here on
        if (frame.type === 'PUSH_PROMISE') {
          this._allocatePriority(frame.promised_stream);
          frame.promised_stream = this._allocateId(frame.promised_stream);
        }

        // 7. forwarding the frame and updating the stream count
        this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame');
        moreNeeded = this.push(frame);
        this._changeStreamCount(frame.count_change);

        // 8. stopping completely when `push` reports that no more frames are needed
        assert(moreNeeded !== null); // The frame shouldn't be unforwarded
        if (moreNeeded === false) {
          break priority_loop;
        }
      }

      // 9. switching to the bucket of the next round
      queue = nextRound;
      nextRound = [];
    }
  }

  // * if no frame could be forwarded at all, sleep until the next 'wakeup' event
  if (moreNeeded === undefined) {
    this.once('wakeup', this._send.bind(this));
  }

  this._log.trace({ moreNeeded: moreNeeded }, 'Stopping forwarding frames from streams.');
};
michael@0 343
// The `_receive` method is another virtual method of the [Flow class](flow.html) that has to be
// implemented by child classes. It hands the incoming `frame` to the stream it belongs to and
// then calls `done`.
Connection.prototype._receive = function _receive(frame, done) {
  this._log.trace({ frame: frame }, 'Forwarding incoming frame');

  // * the very first frame is additionally checked by `_onFirstFrameReceived`
  if (!this._firstFrameReceived) {
    this._firstFrameReceived = true;
    this._onFirstFrameReceived(frame);
  }

  // * looking up the target stream in the registry, or creating it as a new incoming stream
  var target = this._streamIds[frame.stream] || this._createIncomingStream(frame.stream);

  // * for PUSH_PROMISE, the promised stream ID is replaced with a new incoming stream
  if (frame.type === 'PUSH_PROMISE') {
    frame.promised_stream = this._createIncomingStream(frame.promised_stream);
  }

  // * on incoming frames, `count_change` is a bound `_changeStreamCount`
  frame.count_change = this._changeStreamCount.bind(this);

  // * delivering the frame to the `upstream` of the target
  target.upstream.write(frame);

  done();
};
michael@0 375
michael@0 376 // Settings management
michael@0 377 // -------------------
michael@0 378
// Settings that are sent when the constructor gets no `settings` argument: none at all.
var defaultSettings = {
};

// Settings management initialization. Sends the first SETTINGS frame with `settings` (or with
// `defaultSettings` if `settings` is falsy), then starts listening for incoming SETTINGS frames.
Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) {
  var initialSettings = settings || defaultSettings;

  // * Queue of the callbacks that wait for an acknowledgement, in sending order
  this._settingsAckCallbacks = [];

  // * The initial SETTINGS frame
  this._log.debug({ settings: settings },
                  'Sending the first SETTINGS frame as part of the connection header.');
  this.set(initialSettings);

  // * Incoming SETTINGS frames are handled by `_receiveSettings`
  this.on('SETTINGS', this._receiveSettings);
};
michael@0 395
// Checking that the first frame the other endpoint sends is a SETTINGS frame on stream 0.
// Anything else is an invalid connection header and is reported as a connection error of type
// PROTOCOL_ERROR, so that 'error' listeners always receive an error type, as they do for every
// other connection error.
Connection.prototype._onFirstFrameReceived = function _onFirstFrameReceived(frame) {
  if ((frame.stream === 0) && (frame.type === 'SETTINGS')) {
    this._log.debug('Receiving the first SETTINGS frame as part of the connection header.');
  } else {
    this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.');
    this.emit('error', 'PROTOCOL_ERROR');
  }
};
michael@0 405
// Handling of incoming SETTINGS frames.
Connection.prototype._receiveSettings = function _receiveSettings(frame) {
  // * An ACK belongs to the oldest `set()` call that is still waiting: run its callback
  if (frame.flags.ACK) {
    var acknowledged = this._settingsAckCallbacks.shift();
    if (acknowledged) {
      acknowledged();
    }
    return;
  }

  // * A settings change request is acknowledged (unless the connection is closed) ...
  if (!this._closed) {
    this.push({
      type: 'SETTINGS',
      flags: { ACK: true },
      stream: 0,
      settings: {}
    });
  }

  // * ... and every setting in it is announced with a `RECEIVING_<name>` event
  var received = frame.settings;
  for (var name in received) {
    this.emit('RECEIVING_' + name, received[name]);
  }
};
michael@0 431
// Changing the value of one or more settings by sending out a SETTINGS frame.
//
// * `settings`: key-value pairs of the settings to change
// * `callback`: optional, called once the peer has acknowledged the frame
//
// A `SENDING_<name>` event is emitted for every setting right after the frame is pushed, and an
// `ACKNOWLEDGED_<name>` event for every setting when the acknowledgement arrives.
Connection.prototype.set = function set(settings, callback) {
  var self = this;
  var done = callback || function noop() {};

  // Emits `<prefix><name>` with the value of the setting, for every setting
  function announce(prefix) {
    for (var name in settings) {
      self.emit(prefix + name, settings[name]);
    }
  }

  // * What has to happen when the change is acknowledged
  this._settingsAckCallbacks.push(function() {
    announce('ACKNOWLEDGED_');
    done();
  });

  // * Sending out the SETTINGS frame
  this.push({
    type: 'SETTINGS',
    flags: { ACK: false },
    stream: 0,
    settings: settings
  });
  announce('SENDING_');
};
michael@0 455
michael@0 456 // Lifecycle management
michael@0 457 // --------------------
michael@0 458
michael@0 459 // The main responsibilities of lifecycle management code:
michael@0 460 //
michael@0 461 // * keeping the connection alive by
michael@0 462 // * sending PINGs when the connection is idle
michael@0 463 // * answering PINGs
michael@0 464 // * ending the connection
michael@0 465
// Lifecycle management initialization: the connection starts open, with no pending pings, and
// with PING and GOAWAY frames routed to their handlers.
Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() {
  // * ping ID -> callback map of the pings that are waiting for an answer
  this._pings = {};

  this.on('PING', this._receivePing);
  this.on('GOAWAY', this._receiveGoaway);

  this._closed = false;
};
michael@0 472
// Generating a string of 16 random hexadecimal digits that is not a key of `_pings` yet
Connection.prototype._generatePingId = function _generatePingId() {
  var id;

  do {
    var digits = [];
    for (var count = 0; count < 16; count++) {
      digits.push(Math.floor(Math.random() * 16).toString(16));
    }
    id = digits.join('');
  } while (id in this._pings);

  return id;
};
michael@0 483
// Sending a PING frame with a freshly generated ID as payload. The optional `callback` is stored
// under that ID and is called by `_receivePing` when the answer arrives.
Connection.prototype.ping = function ping(callback) {
  var id = this._generatePingId();
  var payload = new Buffer(id, 'hex');

  this._pings[id] = callback;

  this._log.debug({ data: payload }, 'Sending PING.');
  this.push({ type: 'PING', flags: { ACK: false }, stream: 0, data: payload });
};
michael@0 500
// Handling of incoming PING frames.
//
// * A frame with the ACK flag is the answer to one of our pings: the callback registered by
//   `ping()` (if any) is called. The pending entry is removed *before* the callback runs, so
//   that a throwing callback cannot leave a stale entry in `_pings`.
// * A frame without the ACK flag is answered with a PING that has the ACK flag set and carries
//   the same data. Nothing is sent on a closed connection, because no frame may be pushed after
//   the end of the output was signalled (`_receiveSettings` guards its ACK the same way).
Connection.prototype._receivePing = function _receivePing(frame) {
  if (frame.flags.ACK) {
    var id = frame.data.toString('hex');

    if (!(id in this._pings)) {
      this._log.warn({ data: frame.data }, 'Unsolicited PING answer.');
      return;
    }

    this._log.debug({ data: frame.data }, 'Receiving answer for a PING.');
    var callback = this._pings[id];
    delete this._pings[id];
    if (callback) {
      callback();
    }
    return;
  }

  if (this._closed) {
    this._log.debug({ data: frame.data }, 'Not answering PING: the connection is closed.');
    return;
  }

  this._log.debug({ data: frame.data }, 'Answering PING.');
  this.push({
    type: 'PING',
    flags: {
      ACK: true
    },
    stream: 0,
    data: frame.data
  });
};
michael@0 528
// Terminating the connection: a GOAWAY frame is sent with the given error code (`NO_ERROR` if
// `error` is falsy), then the end of the output is signalled. Closing an already closed
// connection only logs a warning.
Connection.prototype.close = function close(error) {
  if (this._closed) {
    this._log.warn('Trying to close an already closed connection');
    return;
  }

  this._log.debug({ error: error }, 'Closing the connection');

  var goaway = {
    type: 'GOAWAY',
    flags: {},
    stream: 0,
    last_stream: this._lastIncomingStream,
    error: error || 'NO_ERROR'
  };
  this.push(goaway);
  this.push(null);

  this._closed = true;
};
michael@0 547
// Handling of an incoming GOAWAY frame: the output is ended, the connection is marked as closed,
// and a 'peerError' event is emitted if the error code of the frame is not `NO_ERROR`.
Connection.prototype._receiveGoaway = function _receiveGoaway(frame) {
  var peerError = frame.error;

  this._log.debug({ error: peerError }, 'Other end closed the connection');
  this.push(null);
  this._closed = true;

  if (peerError === 'NO_ERROR') {
    return;
  }
  this.emit('peerError', frame.error);
};
michael@0 556
michael@0 557 // Flow control
michael@0 558 // ------------
michael@0 559
// The initial flow control window size handed to every new stream is 65535 bytes.
var INITIAL_STREAM_WINDOW_SIZE = 65535;

// Flow control initialization. Must run after stream management initialization, because it
// patches the stand-in object stored at `this._streamIds[0]`.
Connection.prototype._initializeFlowControl = function _initializeFlowControl() {
  // Listener of 'new_stream'; `this` is the connection when the event is emitted
  function applyInitialWindow(stream) {
    stream.upstream.setInitialWindow(this._initialStreamWindowSize);
  }

  // * Every new stream starts with the current initial window size, which the peer can change
  //   with SETTINGS_INITIAL_WINDOW_SIZE
  this._initialStreamWindowSize = INITIAL_STREAM_WINDOW_SIZE;
  this.on('new_stream', applyInitialWindow);
  this.on('RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE', this._setInitialStreamWindowSize);

  // * The stand-in for stream 0 gets a do-nothing `setInitialWindow`, so that it can be treated
  //   like any other entry of `_streamIds`
  this._streamIds[0].upstream.setInitialWindow = function noop() {};
};
michael@0 572
// A SETTINGS frame can alter the initial flow control window size of all current streams. When
// the value of SETTINGS_INITIAL_WINDOW_SIZE changes, the new `size` is stored for future streams
// and is passed to `setInitialWindow` of the `upstream` of every registered stream.
//
// Once the initial window size is `Infinity`, any other value is rejected with a
// FLOW_CONTROL_ERROR.
Connection.prototype._setInitialStreamWindowSize = function _setInitialStreamWindowSize(size) {
  var flowControlTurnedOff = this._initialStreamWindowSize === Infinity;

  if (flowControlTurnedOff && (size !== Infinity)) {
    this._log.error('Trying to manipulate initial flow control window size after flow control was turned off.');
    this.emit('error', 'FLOW_CONTROL_ERROR');
    return;
  }

  this._log.debug({ size: size }, 'Changing stream initial window size.');
  this._initialStreamWindowSize = size;
  this._streamIds.forEach(function(registered) {
    registered.upstream.setInitialWindow(size);
  });
};

mercurial