testing/xpcshell/node-http2/node_modules/http2-protocol/lib/connection.js

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/testing/xpcshell/node-http2/node_modules/http2-protocol/lib/connection.js	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,588 @@
     1.4 +var assert = require('assert');
     1.5 +
     1.6 +// The Connection class
     1.7 +// ====================
     1.8 +
     1.9 +// The Connection class manages HTTP/2 connections. Each instance corresponds to one transport
    1.10 +// stream (TCP stream). It operates by sending and receiving frames and is implemented as a
    1.11 +// [Flow](flow.html) subclass.
    1.12 +
    1.13 +var Flow = require('./flow').Flow;
    1.14 +
    1.15 +exports.Connection = Connection;
    1.16 +
    1.17 +// Public API
    1.18 +// ----------
    1.19 +
    1.20 +// * **new Connection(log, firstStreamId, settings)**: create a new Connection
    1.21 +//
    1.22 +// * **Event: 'error' (type)**: signals a connection level error made by the other end
    1.23 +//
    1.24 +// * **Event: 'peerError' (type)**: signals the receipt of a GOAWAY frame that contains an error
    1.25 +//   code other than NO_ERROR
    1.26 +//
    1.27 +// * **Event: 'stream' (stream)**: signals that there's an incoming stream
    1.28 +//
    1.29 +// * **createStream(): stream**: initiate a new stream
    1.30 +//
    1.31 +// * **set(settings, callback)**: change the value of one or more settings according to the
    1.32 +//   key-value pairs of `settings`. The callback is called after the peer acknowledged the changes.
    1.33 +//
    1.34 +// * **ping([callback])**: send a ping and call callback when the answer arrives
    1.35 +//
    1.36 +// * **close([error])**: close the stream with an error code
    1.37 +
    1.38 +// Constructor
    1.39 +// -----------
    1.40 +
    1.41 +// The main aspects of managing the connection are:
    1.42 +function Connection(log, firstStreamId, settings) {
    1.43 +  // * initializing the base class
    1.44 +  Flow.call(this, 0);
    1.45 +
    1.46 +  // * logging: every method uses the common logger object
    1.47 +  this._log = log.child({ component: 'connection' });
    1.48 +
    1.49 +  // * stream management
    1.50 +  this._initializeStreamManagement(firstStreamId);
    1.51 +
    1.52 +  // * lifecycle management
    1.53 +  this._initializeLifecycleManagement();
    1.54 +
    1.55 +  // * flow control
    1.56 +  this._initializeFlowControl();
    1.57 +
    1.58 +  // * settings management
    1.59 +  this._initializeSettingsManagement(settings);
    1.60 +
    1.61 +  // * multiplexing
    1.62 +  this._initializeMultiplexing();
    1.63 +}
    1.64 +Connection.prototype = Object.create(Flow.prototype, { constructor: { value: Connection } });
    1.65 +
    1.66 +// Overview
    1.67 +// --------
    1.68 +
    1.69 +//              |    ^             |    ^
    1.70 +//              v    |             v    |
    1.71 +//         +--------------+   +--------------+
    1.72 +//     +---|   stream1    |---|   stream2    |----      ....      ---+
    1.73 +//     |   | +----------+ |   | +----------+ |                       |
    1.74 +//     |   | | stream1. | |   | | stream2. | |                       |
    1.75 +//     |   +-| upstream |-+   +-| upstream |-+                       |
    1.76 +//     |     +----------+       +----------+                         |
    1.77 +//     |       |     ^             |     ^                           |
    1.78 +//     |       v     |             v     |                           |
    1.79 +//     |       +-----+-------------+-----+--------      ....         |
    1.80 +//     |       ^     |             |     |                           |
    1.81 +//     |       |     v             |     |                           |
    1.82 +//     |   +--------------+        |     |                           |
    1.83 +//     |   |   stream0    |        |     |                           |
    1.84 +//     |   |  connection  |        |     |                           |
    1.85 +//     |   |  management  |     multiplexing                         |
    1.86 +//     |   +--------------+     flow control                         |
    1.87 +//     |                           |     ^                           |
    1.88 +//     |                   _read() |     | _write()                  |
    1.89 +//     |                           v     |                           |
    1.90 +//     |                +------------+ +-----------+                 |
    1.91 +//     |                |output queue| |input queue|                 |
    1.92 +//     +----------------+------------+-+-----------+-----------------+
    1.93 +//                                 |     ^
    1.94 +//                          read() |     | write()
    1.95 +//                                 v     |
    1.96 +
    1.97 +// Stream management
    1.98 +// -----------------
    1.99 +
   1.100 +var Stream  = require('./stream').Stream;
   1.101 +
   1.102 +// Initialization:
   1.103 +Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) {
   1.104 +  // * streams are stored in two data structures:
   1.105 +  //   * `_streamIds` is an id -> stream map of the streams that are allowed to receive frames.
   1.106 +  //   * `_streamPriorities` is a priority -> [stream] map of stream that allowed to send frames.
   1.107 +  this._streamIds = [];
   1.108 +  this._streamPriorities = [];
   1.109 +
   1.110 +  // * The next outbound stream ID and the last inbound stream id
   1.111 +  this._nextStreamId = firstStreamId;
   1.112 +  this._lastIncomingStream = 0;
   1.113 +
   1.114 +  // * Calling `_writeControlFrame` when there's an incoming stream with 0 as stream ID
   1.115 +  this._streamIds[0] = { upstream: { write: this._writeControlFrame.bind(this) } };
   1.116 +
   1.117 +  // * By default, the number of concurrent outbound streams is not limited. The `_streamLimit` can
   1.118 +  //   be set by the SETTINGS_MAX_CONCURRENT_STREAMS setting.
   1.119 +  this._streamSlotsFree = Infinity;
   1.120 +  this._streamLimit = Infinity;
   1.121 +  this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit);
   1.122 +};
   1.123 +
   1.124 +// `_writeControlFrame` is called when there's an incoming frame in the `_control` stream. It
   1.125 +// broadcasts the message by creating an event on it.
   1.126 +Connection.prototype._writeControlFrame = function _writeControlFrame(frame) {
   1.127 +  if ((frame.type === 'SETTINGS') || (frame.type === 'PING') ||
   1.128 +      (frame.type === 'GOAWAY') || (frame.type === 'WINDOW_UPDATE')) {
   1.129 +    this._log.debug({ frame: frame }, 'Receiving connection level frame');
   1.130 +    this.emit(frame.type, frame);
   1.131 +  } else {
   1.132 +    this._log.error({ frame: frame }, 'Invalid connection level frame');
   1.133 +    this.emit('error', 'PROTOCOL_ERROR');
   1.134 +  }
   1.135 +};
   1.136 +
   1.137 +// Methods to manage the stream slot pool:
   1.138 +Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) {
   1.139 +  var wakeup = (this._streamSlotsFree === 0) && (newStreamLimit > this._streamLimit);
   1.140 +  this._streamSlotsFree += newStreamLimit - this._streamLimit;
   1.141 +  this._streamLimit = newStreamLimit;
   1.142 +  if (wakeup) {
   1.143 +    this.emit('wakeup');
   1.144 +  }
   1.145 +};
   1.146 +
   1.147 +Connection.prototype._changeStreamCount = function _changeStreamCount(change) {
   1.148 +  if (change) {
   1.149 +    this._log.trace({ free: this._streamSlotsFree, change: change },
   1.150 +                    'Changing active stream count.');
   1.151 +    var wakeup = (this._streamSlotsFree === 0) && (change < 0);
   1.152 +    this._streamSlotsFree -= change;
   1.153 +    if (wakeup) {
   1.154 +      this.emit('wakeup');
   1.155 +    }
   1.156 +  }
   1.157 +};
   1.158 +
   1.159 +// Creating a new *inbound or outbound* stream with the given `id` (which is undefined in case of
   1.160 +// an outbound stream) consists of three steps:
   1.161 +//
   1.162 +// 1. var stream = new Stream(this._log);
   1.163 +// 2. this._allocateId(stream, id);
   1.164 +// 2. this._allocatePriority(stream);
   1.165 +
   1.166 +// Allocating an ID to a stream
   1.167 +Connection.prototype._allocateId = function _allocateId(stream, id) {
   1.168 +  // * initiated stream without definite ID
   1.169 +  if (id === undefined) {
   1.170 +    id = this._nextStreamId;
   1.171 +    this._nextStreamId += 2;
   1.172 +  }
   1.173 +
   1.174 +  // * incoming stream with a legitim ID (larger than any previous and different parity than ours)
   1.175 +  else if ((id > this._lastIncomingStream) && ((id - this._nextStreamId) % 2 !== 0)) {
   1.176 +    this._lastIncomingStream = id;
   1.177 +  }
   1.178 +
   1.179 +  // * incoming stream with invalid ID
   1.180 +  else {
   1.181 +    this._log.error({ stream_id: id, lastIncomingStream: this._lastIncomingStream },
   1.182 +                    'Invalid incoming stream ID.');
   1.183 +    this.emit('error', 'PROTOCOL_ERROR');
   1.184 +    return undefined;
   1.185 +  }
   1.186 +
   1.187 +  assert(!(id in this._streamIds));
   1.188 +
   1.189 +  // * adding to `this._streamIds`
   1.190 +  this._log.trace({ s: stream, stream_id: id }, 'Allocating ID for stream.');
   1.191 +  this._streamIds[id] = stream;
   1.192 +  stream.id = id;
   1.193 +  this.emit('new_stream', stream, id);
   1.194 +
   1.195 +  // * handling stream errors as connection errors
   1.196 +  stream.on('error', this.emit.bind(this, 'error'));
   1.197 +
   1.198 +  return id;
   1.199 +};
   1.200 +
   1.201 +// Allocating a priority to a stream, and managing priority changes
   1.202 +Connection.prototype._allocatePriority = function _allocatePriority(stream) {
   1.203 +  this._log.trace({ s: stream }, 'Allocating priority for stream.');
   1.204 +  this._insert(stream, stream._priority);
   1.205 +  stream.on('priority', this._reprioritize.bind(this, stream));
   1.206 +  stream.upstream.on('readable', this.emit.bind(this, 'wakeup'));
   1.207 +  this.emit('wakeup');
   1.208 +};
   1.209 +
   1.210 +Connection.prototype._insert = function _insert(stream, priority) {
   1.211 +  if (priority in this._streamPriorities) {
   1.212 +    this._streamPriorities[priority].push(stream);
   1.213 +  } else {
   1.214 +    this._streamPriorities[priority] = [stream];
   1.215 +  }
   1.216 +};
   1.217 +
   1.218 +Connection.prototype._reprioritize = function _reprioritize(stream, priority) {
   1.219 +  var bucket = this._streamPriorities[stream._priority];
   1.220 +  var index = bucket.indexOf(stream);
   1.221 +  assert(index !== -1);
   1.222 +  bucket.splice(index, 1);
   1.223 +  if (bucket.length === 0) {
   1.224 +    delete this._streamPriorities[stream._priority];
   1.225 +  }
   1.226 +
   1.227 +  this._insert(stream, priority);
   1.228 +};
   1.229 +
   1.230 +// Creating an *inbound* stream with the given ID. It is called when there's an incoming frame to
   1.231 +// a previously nonexistent stream.
   1.232 +Connection.prototype._createIncomingStream = function _createIncomingStream(id) {
   1.233 +  this._log.debug({ stream_id: id }, 'New incoming stream.');
   1.234 +
   1.235 +  var stream = new Stream(this._log);
   1.236 +  this._allocateId(stream, id);
   1.237 +  this._allocatePriority(stream);
   1.238 +  this.emit('stream', stream, id);
   1.239 +
   1.240 +  return stream;
   1.241 +};
   1.242 +
   1.243 +// Creating an *outbound* stream
   1.244 +Connection.prototype.createStream = function createStream() {
   1.245 +  this._log.trace('Creating new outbound stream.');
   1.246 +
   1.247 +  // * Receiving is enabled immediately, and an ID gets assigned to the stream
   1.248 +  var stream = new Stream(this._log);
   1.249 +  this._allocatePriority(stream);
   1.250 +
   1.251 +  return stream;
   1.252 +};
   1.253 +
   1.254 +// Multiplexing
   1.255 +// ------------
   1.256 +
   1.257 +Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() {
   1.258 +  this.on('window_update', this.emit.bind(this, 'wakeup'));
   1.259 +  this._sendScheduled = false;
   1.260 +  this._firstFrameReceived = false;
   1.261 +};
   1.262 +
   1.263 +// The `_send` method is a virtual method of the [Flow class](flow.html) that has to be implemented
   1.264 +// by child classes. It reads frames from streams and pushes them to the output buffer.
   1.265 +Connection.prototype._send = function _send(immediate) {
   1.266 +  // * Do not do anything if the connection is already closed
   1.267 +  if (this._closed) {
   1.268 +    return;
   1.269 +  }
   1.270 +
   1.271 +  // * Collapsing multiple calls in a turn into a single deferred call
   1.272 +  if (immediate) {
   1.273 +    this._sendScheduled = false;
   1.274 +  } else {
   1.275 +    if (!this._sendScheduled) {
   1.276 +      this._sendScheduled = true;
   1.277 +      setImmediate(this._send.bind(this, true));
   1.278 +    }
   1.279 +    return;
   1.280 +  }
   1.281 +
   1.282 +  this._log.trace('Starting forwarding frames from streams.');
   1.283 +
   1.284 +  // * Looping through priority `bucket`s in priority order.
   1.285 +priority_loop:
   1.286 +  for (var priority in this._streamPriorities) {
   1.287 +    var bucket = this._streamPriorities[priority];
   1.288 +    var nextBucket = [];
   1.289 +
   1.290 +    // * Forwarding frames from buckets with round-robin scheduling.
   1.291 +    //   1. pulling out frame
   1.292 +    //   2. if there's no frame, skip this stream
   1.293 +    //   3. if forwarding this frame would make `streamCount` greater than `streamLimit`, skip
   1.294 +    //      this stream
   1.295 +    //   4. adding stream to the bucket of the next round
   1.296 +    //   5. assigning an ID to the frame (allocating an ID to the stream if there isn't already)
   1.297 +    //   6. if forwarding a PUSH_PROMISE, allocate ID to the promised stream
   1.298 +    //   7. forwarding the frame, changing `streamCount` as appropriate
   1.299 +    //   8. stepping to the next stream if there's still more frame needed in the output buffer
   1.300 +    //   9. switching to the bucket of the next round
   1.301 +    while (bucket.length > 0) {
   1.302 +      for (var index = 0; index < bucket.length; index++) {
   1.303 +        var stream = bucket[index];
   1.304 +        var frame = stream.upstream.read((this._window > 0) ? this._window : -1);
   1.305 +
   1.306 +        if (!frame) {
   1.307 +          continue;
   1.308 +        } else if (frame.count_change > this._streamSlotsFree) {
   1.309 +          stream.upstream.unshift(frame);
   1.310 +          continue;
   1.311 +        }
   1.312 +
   1.313 +        nextBucket.push(stream);
   1.314 +
   1.315 +        if (frame.stream === undefined) {
   1.316 +          frame.stream = stream.id || this._allocateId(stream);
   1.317 +        }
   1.318 +
   1.319 +        if (frame.type === 'PUSH_PROMISE') {
   1.320 +          this._allocatePriority(frame.promised_stream);
   1.321 +          frame.promised_stream = this._allocateId(frame.promised_stream);
   1.322 +        }
   1.323 +
   1.324 +        this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame');
   1.325 +        var moreNeeded = this.push(frame);
   1.326 +        this._changeStreamCount(frame.count_change);
   1.327 +
   1.328 +        assert(moreNeeded !== null); // The frame shouldn't be unforwarded
   1.329 +        if (moreNeeded === false) {
   1.330 +          break priority_loop;
   1.331 +        }
   1.332 +      }
   1.333 +
   1.334 +      bucket = nextBucket;
   1.335 +      nextBucket = [];
   1.336 +    }
   1.337 +  }
   1.338 +
   1.339 +  // * if we couldn't forward any frame, then sleep until window update, or some other wakeup event
   1.340 +  if (moreNeeded === undefined) {
   1.341 +    this.once('wakeup', this._send.bind(this));
   1.342 +  }
   1.343 +
   1.344 +  this._log.trace({ moreNeeded: moreNeeded }, 'Stopping forwarding frames from streams.');
   1.345 +};
   1.346 +
   1.347 +// The `_receive` method is another virtual method of the [Flow class](flow.html) that has to be
   1.348 +// implemented by child classes. It forwards the given frame to the appropriate stream:
   1.349 +Connection.prototype._receive = function _receive(frame, done) {
   1.350 +  this._log.trace({ frame: frame }, 'Forwarding incoming frame');
   1.351 +
   1.352 +  // * first frame needs to be checked by the `_onFirstFrameReceived` method
   1.353 +  if (!this._firstFrameReceived) {
   1.354 +    this._firstFrameReceived = true;
   1.355 +    this._onFirstFrameReceived(frame);
   1.356 +  }
   1.357 +
   1.358 +  // * gets the appropriate stream from the stream registry
   1.359 +  var stream = this._streamIds[frame.stream];
   1.360 +
   1.361 +  // * or creates one if it's not in `this.streams`
   1.362 +  if (!stream) {
   1.363 +    stream = this._createIncomingStream(frame.stream);
   1.364 +  }
   1.365 +
   1.366 +  // * in case of PUSH_PROMISE, replaces the promised stream id with a new incoming stream
   1.367 +  if (frame.type === 'PUSH_PROMISE') {
   1.368 +    frame.promised_stream = this._createIncomingStream(frame.promised_stream);
   1.369 +  }
   1.370 +
   1.371 +  frame.count_change = this._changeStreamCount.bind(this);
   1.372 +
   1.373 +  // * and writes it to the `stream`'s `upstream`
   1.374 +  stream.upstream.write(frame);
   1.375 +
   1.376 +  done();
   1.377 +};
   1.378 +
   1.379 +// Settings management
   1.380 +// -------------------
   1.381 +
   1.382 +var defaultSettings = {
   1.383 +};
   1.384 +
   1.385 +// Settings management initialization:
   1.386 +Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) {
   1.387 +  // * Setting up the callback queue for setting acknowledgements
   1.388 +  this._settingsAckCallbacks = [];
   1.389 +
   1.390 +  // * Sending the initial settings.
   1.391 +  this._log.debug({ settings: settings },
   1.392 +                  'Sending the first SETTINGS frame as part of the connection header.');
   1.393 +  this.set(settings || defaultSettings);
   1.394 +
   1.395 +  // * Forwarding SETTINGS frames to the `_receiveSettings` method
   1.396 +  this.on('SETTINGS', this._receiveSettings);
   1.397 +};
   1.398 +
   1.399 +// * Checking that the first frame the other endpoint sends is SETTINGS
   1.400 +Connection.prototype._onFirstFrameReceived = function _onFirstFrameReceived(frame) {
   1.401 +  if ((frame.stream === 0) && (frame.type === 'SETTINGS')) {
   1.402 +    this._log.debug('Receiving the first SETTINGS frame as part of the connection header.');
   1.403 +  } else {
   1.404 +    this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.');
   1.405 +    this.emit('error');
   1.406 +  }
   1.407 +};
   1.408 +
   1.409 +// Handling of incoming SETTINGS frames.
   1.410 +Connection.prototype._receiveSettings = function _receiveSettings(frame) {
   1.411 +  // * If it's an ACK, call the appropriate callback
   1.412 +  if (frame.flags.ACK) {
   1.413 +    var callback = this._settingsAckCallbacks.shift();
   1.414 +    if (callback) {
   1.415 +      callback();
   1.416 +    }
   1.417 +  }
   1.418 +
   1.419 +  // * If it's a setting change request, then send an ACK and change the appropriate settings
   1.420 +  else {
   1.421 +    if (!this._closed) {
   1.422 +      this.push({
   1.423 +        type: 'SETTINGS',
   1.424 +        flags: { ACK: true },
   1.425 +        stream: 0,
   1.426 +        settings: {}
   1.427 +      });
   1.428 +    }
   1.429 +    for (var name in frame.settings) {
   1.430 +      this.emit('RECEIVING_' + name, frame.settings[name]);
   1.431 +    }
   1.432 +  }
   1.433 +};
   1.434 +
   1.435 +// Changing one or more settings value and sending out a SETTINGS frame
   1.436 +Connection.prototype.set = function set(settings, callback) {
   1.437 +  // * Calling the callback and emitting event when the change is acknowledges
   1.438 +  callback = callback || function noop() {};
   1.439 +  var self = this;
   1.440 +  this._settingsAckCallbacks.push(function() {
   1.441 +    for (var name in settings) {
   1.442 +      self.emit('ACKNOWLEDGED_' + name, settings[name]);
   1.443 +    }
   1.444 +    callback();
   1.445 +  });
   1.446 +
   1.447 +  // * Sending out the SETTINGS frame
   1.448 +  this.push({
   1.449 +    type: 'SETTINGS',
   1.450 +    flags: { ACK: false },
   1.451 +    stream: 0,
   1.452 +    settings: settings
   1.453 +  });
   1.454 +  for (var name in settings) {
   1.455 +    this.emit('SENDING_' + name, settings[name]);
   1.456 +  }
   1.457 +};
   1.458 +
   1.459 +// Lifecycle management
   1.460 +// --------------------
   1.461 +
   1.462 +// The main responsibilities of lifecycle management code:
   1.463 +//
   1.464 +// * keeping the connection alive by
   1.465 +//   * sending PINGs when the connection is idle
   1.466 +//   * answering PINGs
   1.467 +// * ending the connection
   1.468 +
   1.469 +Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() {
   1.470 +  this._pings = {};
   1.471 +  this.on('PING', this._receivePing);
   1.472 +  this.on('GOAWAY', this._receiveGoaway);
   1.473 +  this._closed = false;
   1.474 +};
   1.475 +
   1.476 +// Generating a string of length 16 with random hexadecimal digits
   1.477 +Connection.prototype._generatePingId = function _generatePingId() {
   1.478 +  do {
   1.479 +    var id = '';
   1.480 +    for (var i = 0; i < 16; i++) {
   1.481 +      id += Math.floor(Math.random()*16).toString(16);
   1.482 +    }
   1.483 +  } while(id in this._pings);
   1.484 +  return id;
   1.485 +};
   1.486 +
   1.487 +// Sending a ping and calling `callback` when the answer arrives
   1.488 +Connection.prototype.ping = function ping(callback) {
   1.489 +  var id = this._generatePingId();
   1.490 +  var data = new Buffer(id, 'hex');
   1.491 +  this._pings[id] = callback;
   1.492 +
   1.493 +  this._log.debug({ data: data }, 'Sending PING.');
   1.494 +  this.push({
   1.495 +    type: 'PING',
   1.496 +    flags: {
   1.497 +      ACK: false
   1.498 +    },
   1.499 +    stream: 0,
   1.500 +    data: data
   1.501 +  });
   1.502 +};
   1.503 +
   1.504 +// Answering pings
   1.505 +Connection.prototype._receivePing = function _receivePing(frame) {
   1.506 +  if (frame.flags.ACK) {
   1.507 +    var id = frame.data.toString('hex');
   1.508 +    if (id in this._pings) {
   1.509 +      this._log.debug({ data: frame.data }, 'Receiving answer for a PING.');
   1.510 +      var callback = this._pings[id];
   1.511 +      if (callback) {
   1.512 +        callback();
   1.513 +      }
   1.514 +      delete this._pings[id];
   1.515 +    } else {
   1.516 +      this._log.warn({ data: frame.data }, 'Unsolicited PING answer.');
   1.517 +    }
   1.518 +
   1.519 +  } else {
   1.520 +    this._log.debug({ data: frame.data }, 'Answering PING.');
   1.521 +    this.push({
   1.522 +      type: 'PING',
   1.523 +      flags: {
   1.524 +        ACK: true
   1.525 +      },
   1.526 +      stream: 0,
   1.527 +      data: frame.data
   1.528 +    });
   1.529 +  }
   1.530 +};
   1.531 +
   1.532 +// Terminating the connection
   1.533 +Connection.prototype.close = function close(error) {
   1.534 +  if (this._closed) {
   1.535 +    this._log.warn('Trying to close an already closed connection');
   1.536 +    return;
   1.537 +  }
   1.538 +
   1.539 +  this._log.debug({ error: error }, 'Closing the connection');
   1.540 +  this.push({
   1.541 +    type: 'GOAWAY',
   1.542 +    flags: {},
   1.543 +    stream: 0,
   1.544 +    last_stream: this._lastIncomingStream,
   1.545 +    error: error || 'NO_ERROR'
   1.546 +  });
   1.547 +  this.push(null);
   1.548 +  this._closed = true;
   1.549 +};
   1.550 +
   1.551 +Connection.prototype._receiveGoaway = function _receiveGoaway(frame) {
   1.552 +  this._log.debug({ error: frame.error }, 'Other end closed the connection');
   1.553 +  this.push(null);
   1.554 +  this._closed = true;
   1.555 +  if (frame.error !== 'NO_ERROR') {
   1.556 +    this.emit('peerError', frame.error);
   1.557 +  }
   1.558 +};
   1.559 +
   1.560 +// Flow control
   1.561 +// ------------
   1.562 +
   1.563 +Connection.prototype._initializeFlowControl = function _initializeFlowControl() {
   1.564 +  // Handling of initial window size of individual streams.
   1.565 +  this._initialStreamWindowSize = INITIAL_STREAM_WINDOW_SIZE;
   1.566 +  this.on('new_stream', function(stream) {
   1.567 +    stream.upstream.setInitialWindow(this._initialStreamWindowSize);
   1.568 +  });
   1.569 +  this.on('RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE', this._setInitialStreamWindowSize);
   1.570 +  this._streamIds[0].upstream.setInitialWindow = function noop() {};
   1.571 +};
   1.572 +
   1.573 +// The initial connection flow control window is 65535 bytes.
   1.574 +var INITIAL_STREAM_WINDOW_SIZE = 65535;
   1.575 +
   1.576 +// A SETTINGS frame can alter the initial flow control window size for all current streams. When the
   1.577 +// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the window size of all
   1.578 +// stream by calling the `setInitialStreamWindowSize` method. The window size has to be modified by
   1.579 +// the difference between the new value and the old value.
   1.580 +Connection.prototype._setInitialStreamWindowSize = function _setInitialStreamWindowSize(size) {
   1.581 +  if ((this._initialStreamWindowSize === Infinity) && (size !== Infinity)) {
   1.582 +    this._log.error('Trying to manipulate initial flow control window size after flow control was turned off.');
   1.583 +    this.emit('error', 'FLOW_CONTROL_ERROR');
   1.584 +  } else {
   1.585 +    this._log.debug({ size: size }, 'Changing stream initial window size.');
   1.586 +    this._initialStreamWindowSize = size;
   1.587 +    this._streamIds.forEach(function(stream) {
   1.588 +      stream.upstream.setInitialWindow(size);
   1.589 +    });
   1.590 +  }
   1.591 +};

mercurial