testing/xpcshell/node-http2/node_modules/http2-protocol/lib/connection.js

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

     1 var assert = require('assert');
     3 // The Connection class
     4 // ====================
     6 // The Connection class manages HTTP/2 connections. Each instance corresponds to one transport
     7 // stream (TCP stream). It operates by sending and receiving frames and is implemented as a
     8 // [Flow](flow.html) subclass.
    10 var Flow = require('./flow').Flow;
    12 exports.Connection = Connection;
    14 // Public API
    15 // ----------
    17 // * **new Connection(log, firstStreamId, settings)**: create a new Connection
    18 //
    19 // * **Event: 'error' (type)**: signals a connection level error made by the other end
    20 //
    21 // * **Event: 'peerError' (type)**: signals the receipt of a GOAWAY frame that contains an error
    22 //   code other than NO_ERROR
    23 //
    24 // * **Event: 'stream' (stream)**: signals that there's an incoming stream
    25 //
    26 // * **createStream(): stream**: initiate a new stream
    27 //
    28 // * **set(settings, callback)**: change the value of one or more settings according to the
    29 //   key-value pairs of `settings`. The callback is called after the peer acknowledged the changes.
    30 //
    31 // * **ping([callback])**: send a ping and call callback when the answer arrives
    32 //
    33 // * **close([error])**: close the connection with an error code
    35 // Constructor
    36 // -----------
    38 // The main aspects of managing the connection are:
    39 function Connection(log, firstStreamId, settings) {
    40   // * initializing the base class
    41   Flow.call(this, 0);
    43   // * logging: every method uses the common logger object
    44   this._log = log.child({ component: 'connection' });
    46   // * stream management
    47   this._initializeStreamManagement(firstStreamId);
    49   // * lifecycle management
    50   this._initializeLifecycleManagement();
    52   // * flow control
    53   this._initializeFlowControl();
    55   // * settings management
    56   this._initializeSettingsManagement(settings);
    58   // * multiplexing
    59   this._initializeMultiplexing();
    60 }
    61 Connection.prototype = Object.create(Flow.prototype, { constructor: { value: Connection } });
    63 // Overview
    64 // --------
    66 //              |    ^             |    ^
    67 //              v    |             v    |
    68 //         +--------------+   +--------------+
    69 //     +---|   stream1    |---|   stream2    |----      ....      ---+
    70 //     |   | +----------+ |   | +----------+ |                       |
    71 //     |   | | stream1. | |   | | stream2. | |                       |
    72 //     |   +-| upstream |-+   +-| upstream |-+                       |
    73 //     |     +----------+       +----------+                         |
    74 //     |       |     ^             |     ^                           |
    75 //     |       v     |             v     |                           |
    76 //     |       +-----+-------------+-----+--------      ....         |
    77 //     |       ^     |             |     |                           |
    78 //     |       |     v             |     |                           |
    79 //     |   +--------------+        |     |                           |
    80 //     |   |   stream0    |        |     |                           |
    81 //     |   |  connection  |        |     |                           |
    82 //     |   |  management  |     multiplexing                         |
    83 //     |   +--------------+     flow control                         |
    84 //     |                           |     ^                           |
    85 //     |                   _read() |     | _write()                  |
    86 //     |                           v     |                           |
    87 //     |                +------------+ +-----------+                 |
    88 //     |                |output queue| |input queue|                 |
    89 //     +----------------+------------+-+-----------+-----------------+
    90 //                                 |     ^
    91 //                          read() |     | write()
    92 //                                 v     |
    94 // Stream management
    95 // -----------------
    97 var Stream  = require('./stream').Stream;
    99 // Initialization:
   100 Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) {
   101   // * streams are stored in two data structures:
   102   //   * `_streamIds` is an id -> stream map of the streams that are allowed to receive frames.
   103   //   * `_streamPriorities` is a priority -> [stream] map of stream that allowed to send frames.
   104   this._streamIds = [];
   105   this._streamPriorities = [];
   107   // * The next outbound stream ID and the last inbound stream id
   108   this._nextStreamId = firstStreamId;
   109   this._lastIncomingStream = 0;
   111   // * Calling `_writeControlFrame` when there's an incoming stream with 0 as stream ID
   112   this._streamIds[0] = { upstream: { write: this._writeControlFrame.bind(this) } };
   114   // * By default, the number of concurrent outbound streams is not limited. The `_streamLimit` can
   115   //   be set by the SETTINGS_MAX_CONCURRENT_STREAMS setting.
   116   this._streamSlotsFree = Infinity;
   117   this._streamLimit = Infinity;
   118   this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit);
   119 };
   121 // `_writeControlFrame` is called when there's an incoming frame in the `_control` stream. It
   122 // broadcasts the message by creating an event on it.
   123 Connection.prototype._writeControlFrame = function _writeControlFrame(frame) {
   124   if ((frame.type === 'SETTINGS') || (frame.type === 'PING') ||
   125       (frame.type === 'GOAWAY') || (frame.type === 'WINDOW_UPDATE')) {
   126     this._log.debug({ frame: frame }, 'Receiving connection level frame');
   127     this.emit(frame.type, frame);
   128   } else {
   129     this._log.error({ frame: frame }, 'Invalid connection level frame');
   130     this.emit('error', 'PROTOCOL_ERROR');
   131   }
   132 };
   134 // Methods to manage the stream slot pool:
   135 Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) {
   136   var wakeup = (this._streamSlotsFree === 0) && (newStreamLimit > this._streamLimit);
   137   this._streamSlotsFree += newStreamLimit - this._streamLimit;
   138   this._streamLimit = newStreamLimit;
   139   if (wakeup) {
   140     this.emit('wakeup');
   141   }
   142 };
   144 Connection.prototype._changeStreamCount = function _changeStreamCount(change) {
   145   if (change) {
   146     this._log.trace({ free: this._streamSlotsFree, change: change },
   147                     'Changing active stream count.');
   148     var wakeup = (this._streamSlotsFree === 0) && (change < 0);
   149     this._streamSlotsFree -= change;
   150     if (wakeup) {
   151       this.emit('wakeup');
   152     }
   153   }
   154 };
   156 // Creating a new *inbound or outbound* stream with the given `id` (which is undefined in case of
   157 // an outbound stream) consists of three steps:
   158 //
   159 // 1. var stream = new Stream(this._log);
   160 // 2. this._allocateId(stream, id);
   161 // 3. this._allocatePriority(stream);
   163 // Allocating an ID to a stream
   164 Connection.prototype._allocateId = function _allocateId(stream, id) {
   165   // * initiated stream without definite ID
   166   if (id === undefined) {
   167     id = this._nextStreamId;
   168     this._nextStreamId += 2;
   169   }
   171   // * incoming stream with a legitim ID (larger than any previous and different parity than ours)
   172   else if ((id > this._lastIncomingStream) && ((id - this._nextStreamId) % 2 !== 0)) {
   173     this._lastIncomingStream = id;
   174   }
   176   // * incoming stream with invalid ID
   177   else {
   178     this._log.error({ stream_id: id, lastIncomingStream: this._lastIncomingStream },
   179                     'Invalid incoming stream ID.');
   180     this.emit('error', 'PROTOCOL_ERROR');
   181     return undefined;
   182   }
   184   assert(!(id in this._streamIds));
   186   // * adding to `this._streamIds`
   187   this._log.trace({ s: stream, stream_id: id }, 'Allocating ID for stream.');
   188   this._streamIds[id] = stream;
   189   stream.id = id;
   190   this.emit('new_stream', stream, id);
   192   // * handling stream errors as connection errors
   193   stream.on('error', this.emit.bind(this, 'error'));
   195   return id;
   196 };
   198 // Allocating a priority to a stream, and managing priority changes
   199 Connection.prototype._allocatePriority = function _allocatePriority(stream) {
   200   this._log.trace({ s: stream }, 'Allocating priority for stream.');
   201   this._insert(stream, stream._priority);
   202   stream.on('priority', this._reprioritize.bind(this, stream));
   203   stream.upstream.on('readable', this.emit.bind(this, 'wakeup'));
   204   this.emit('wakeup');
   205 };
   207 Connection.prototype._insert = function _insert(stream, priority) {
   208   if (priority in this._streamPriorities) {
   209     this._streamPriorities[priority].push(stream);
   210   } else {
   211     this._streamPriorities[priority] = [stream];
   212   }
   213 };
   215 Connection.prototype._reprioritize = function _reprioritize(stream, priority) {
   216   var bucket = this._streamPriorities[stream._priority];
   217   var index = bucket.indexOf(stream);
   218   assert(index !== -1);
   219   bucket.splice(index, 1);
   220   if (bucket.length === 0) {
   221     delete this._streamPriorities[stream._priority];
   222   }
   224   this._insert(stream, priority);
   225 };
   227 // Creating an *inbound* stream with the given ID. It is called when there's an incoming frame to
   228 // a previously nonexistent stream.
   229 Connection.prototype._createIncomingStream = function _createIncomingStream(id) {
   230   this._log.debug({ stream_id: id }, 'New incoming stream.');
   232   var stream = new Stream(this._log);
   233   this._allocateId(stream, id);
   234   this._allocatePriority(stream);
   235   this.emit('stream', stream, id);
   237   return stream;
   238 };
   240 // Creating an *outbound* stream
   241 Connection.prototype.createStream = function createStream() {
   242   this._log.trace('Creating new outbound stream.');
   244   // * Receiving is enabled immediately, and an ID gets assigned to the stream
   245   var stream = new Stream(this._log);
   246   this._allocatePriority(stream);
   248   return stream;
   249 };
   251 // Multiplexing
   252 // ------------
   254 Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() {
   255   this.on('window_update', this.emit.bind(this, 'wakeup'));
   256   this._sendScheduled = false;
   257   this._firstFrameReceived = false;
   258 };
   260 // The `_send` method is a virtual method of the [Flow class](flow.html) that has to be implemented
   261 // by child classes. It reads frames from streams and pushes them to the output buffer.
   262 Connection.prototype._send = function _send(immediate) {
   263   // * Do not do anything if the connection is already closed
   264   if (this._closed) {
   265     return;
   266   }
   268   // * Collapsing multiple calls in a turn into a single deferred call
   269   if (immediate) {
   270     this._sendScheduled = false;
   271   } else {
   272     if (!this._sendScheduled) {
   273       this._sendScheduled = true;
   274       setImmediate(this._send.bind(this, true));
   275     }
   276     return;
   277   }
   279   this._log.trace('Starting forwarding frames from streams.');
   281   // * Looping through priority `bucket`s in priority order.
   282 priority_loop:
   283   for (var priority in this._streamPriorities) {
   284     var bucket = this._streamPriorities[priority];
   285     var nextBucket = [];
   287     // * Forwarding frames from buckets with round-robin scheduling.
   288     //   1. pulling out frame
   289     //   2. if there's no frame, skip this stream
   290     //   3. if forwarding this frame would make `streamCount` greater than `streamLimit`, skip
   291     //      this stream
   292     //   4. adding stream to the bucket of the next round
   293     //   5. assigning an ID to the frame (allocating an ID to the stream if there isn't already)
   294     //   6. if forwarding a PUSH_PROMISE, allocate ID to the promised stream
   295     //   7. forwarding the frame, changing `streamCount` as appropriate
   296     //   8. stepping to the next stream if there's still more frame needed in the output buffer
   297     //   9. switching to the bucket of the next round
   298     while (bucket.length > 0) {
   299       for (var index = 0; index < bucket.length; index++) {
   300         var stream = bucket[index];
   301         var frame = stream.upstream.read((this._window > 0) ? this._window : -1);
   303         if (!frame) {
   304           continue;
   305         } else if (frame.count_change > this._streamSlotsFree) {
   306           stream.upstream.unshift(frame);
   307           continue;
   308         }
   310         nextBucket.push(stream);
   312         if (frame.stream === undefined) {
   313           frame.stream = stream.id || this._allocateId(stream);
   314         }
   316         if (frame.type === 'PUSH_PROMISE') {
   317           this._allocatePriority(frame.promised_stream);
   318           frame.promised_stream = this._allocateId(frame.promised_stream);
   319         }
   321         this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame');
   322         var moreNeeded = this.push(frame);
   323         this._changeStreamCount(frame.count_change);
   325         assert(moreNeeded !== null); // The frame shouldn't be unforwarded
   326         if (moreNeeded === false) {
   327           break priority_loop;
   328         }
   329       }
   331       bucket = nextBucket;
   332       nextBucket = [];
   333     }
   334   }
   336   // * if we couldn't forward any frame, then sleep until window update, or some other wakeup event
   337   if (moreNeeded === undefined) {
   338     this.once('wakeup', this._send.bind(this));
   339   }
   341   this._log.trace({ moreNeeded: moreNeeded }, 'Stopping forwarding frames from streams.');
   342 };
   344 // The `_receive` method is another virtual method of the [Flow class](flow.html) that has to be
   345 // implemented by child classes. It forwards the given frame to the appropriate stream:
   346 Connection.prototype._receive = function _receive(frame, done) {
   347   this._log.trace({ frame: frame }, 'Forwarding incoming frame');
   349   // * first frame needs to be checked by the `_onFirstFrameReceived` method
   350   if (!this._firstFrameReceived) {
   351     this._firstFrameReceived = true;
   352     this._onFirstFrameReceived(frame);
   353   }
   355   // * gets the appropriate stream from the stream registry
   356   var stream = this._streamIds[frame.stream];
   358   // * or creates one if it's not in `this.streams`
   359   if (!stream) {
   360     stream = this._createIncomingStream(frame.stream);
   361   }
   363   // * in case of PUSH_PROMISE, replaces the promised stream id with a new incoming stream
   364   if (frame.type === 'PUSH_PROMISE') {
   365     frame.promised_stream = this._createIncomingStream(frame.promised_stream);
   366   }
   368   frame.count_change = this._changeStreamCount.bind(this);
   370   // * and writes it to the `stream`'s `upstream`
   371   stream.upstream.write(frame);
   373   done();
   374 };
   376 // Settings management
   377 // -------------------
   379 var defaultSettings = {
   380 };
   382 // Settings management initialization:
   383 Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) {
   384   // * Setting up the callback queue for setting acknowledgements
   385   this._settingsAckCallbacks = [];
   387   // * Sending the initial settings.
   388   this._log.debug({ settings: settings },
   389                   'Sending the first SETTINGS frame as part of the connection header.');
   390   this.set(settings || defaultSettings);
   392   // * Forwarding SETTINGS frames to the `_receiveSettings` method
   393   this.on('SETTINGS', this._receiveSettings);
   394 };
   396 // * Checking that the first frame the other endpoint sends is SETTINGS
   397 Connection.prototype._onFirstFrameReceived = function _onFirstFrameReceived(frame) {
   398   if ((frame.stream === 0) && (frame.type === 'SETTINGS')) {
   399     this._log.debug('Receiving the first SETTINGS frame as part of the connection header.');
   400   } else {
   401     this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.');
   402     this.emit('error');
   403   }
   404 };
   406 // Handling of incoming SETTINGS frames.
   407 Connection.prototype._receiveSettings = function _receiveSettings(frame) {
   408   // * If it's an ACK, call the appropriate callback
   409   if (frame.flags.ACK) {
   410     var callback = this._settingsAckCallbacks.shift();
   411     if (callback) {
   412       callback();
   413     }
   414   }
   416   // * If it's a setting change request, then send an ACK and change the appropriate settings
   417   else {
   418     if (!this._closed) {
   419       this.push({
   420         type: 'SETTINGS',
   421         flags: { ACK: true },
   422         stream: 0,
   423         settings: {}
   424       });
   425     }
   426     for (var name in frame.settings) {
   427       this.emit('RECEIVING_' + name, frame.settings[name]);
   428     }
   429   }
   430 };
   432 // Changing one or more settings value and sending out a SETTINGS frame
   433 Connection.prototype.set = function set(settings, callback) {
   434   // * Calling the callback and emitting event when the change is acknowledges
   435   callback = callback || function noop() {};
   436   var self = this;
   437   this._settingsAckCallbacks.push(function() {
   438     for (var name in settings) {
   439       self.emit('ACKNOWLEDGED_' + name, settings[name]);
   440     }
   441     callback();
   442   });
   444   // * Sending out the SETTINGS frame
   445   this.push({
   446     type: 'SETTINGS',
   447     flags: { ACK: false },
   448     stream: 0,
   449     settings: settings
   450   });
   451   for (var name in settings) {
   452     this.emit('SENDING_' + name, settings[name]);
   453   }
   454 };
   456 // Lifecycle management
   457 // --------------------
   459 // The main responsibilities of lifecycle management code:
   460 //
   461 // * keeping the connection alive by
   462 //   * sending PINGs when the connection is idle
   463 //   * answering PINGs
   464 // * ending the connection
   466 Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() {
   467   this._pings = {};
   468   this.on('PING', this._receivePing);
   469   this.on('GOAWAY', this._receiveGoaway);
   470   this._closed = false;
   471 };
   473 // Generating a string of length 16 with random hexadecimal digits
   474 Connection.prototype._generatePingId = function _generatePingId() {
   475   do {
   476     var id = '';
   477     for (var i = 0; i < 16; i++) {
   478       id += Math.floor(Math.random()*16).toString(16);
   479     }
   480   } while(id in this._pings);
   481   return id;
   482 };
   484 // Sending a ping and calling `callback` when the answer arrives
   485 Connection.prototype.ping = function ping(callback) {
   486   var id = this._generatePingId();
   487   var data = new Buffer(id, 'hex');
   488   this._pings[id] = callback;
   490   this._log.debug({ data: data }, 'Sending PING.');
   491   this.push({
   492     type: 'PING',
   493     flags: {
   494       ACK: false
   495     },
   496     stream: 0,
   497     data: data
   498   });
   499 };
   501 // Answering pings
   502 Connection.prototype._receivePing = function _receivePing(frame) {
   503   if (frame.flags.ACK) {
   504     var id = frame.data.toString('hex');
   505     if (id in this._pings) {
   506       this._log.debug({ data: frame.data }, 'Receiving answer for a PING.');
   507       var callback = this._pings[id];
   508       if (callback) {
   509         callback();
   510       }
   511       delete this._pings[id];
   512     } else {
   513       this._log.warn({ data: frame.data }, 'Unsolicited PING answer.');
   514     }
   516   } else {
   517     this._log.debug({ data: frame.data }, 'Answering PING.');
   518     this.push({
   519       type: 'PING',
   520       flags: {
   521         ACK: true
   522       },
   523       stream: 0,
   524       data: frame.data
   525     });
   526   }
   527 };
   529 // Terminating the connection
   530 Connection.prototype.close = function close(error) {
   531   if (this._closed) {
   532     this._log.warn('Trying to close an already closed connection');
   533     return;
   534   }
   536   this._log.debug({ error: error }, 'Closing the connection');
   537   this.push({
   538     type: 'GOAWAY',
   539     flags: {},
   540     stream: 0,
   541     last_stream: this._lastIncomingStream,
   542     error: error || 'NO_ERROR'
   543   });
   544   this.push(null);
   545   this._closed = true;
   546 };
   548 Connection.prototype._receiveGoaway = function _receiveGoaway(frame) {
   549   this._log.debug({ error: frame.error }, 'Other end closed the connection');
   550   this.push(null);
   551   this._closed = true;
   552   if (frame.error !== 'NO_ERROR') {
   553     this.emit('peerError', frame.error);
   554   }
   555 };
   557 // Flow control
   558 // ------------
   560 Connection.prototype._initializeFlowControl = function _initializeFlowControl() {
   561   // Handling of initial window size of individual streams.
   562   this._initialStreamWindowSize = INITIAL_STREAM_WINDOW_SIZE;
   563   this.on('new_stream', function(stream) {
   564     stream.upstream.setInitialWindow(this._initialStreamWindowSize);
   565   });
   566   this.on('RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE', this._setInitialStreamWindowSize);
   567   this._streamIds[0].upstream.setInitialWindow = function noop() {};
   568 };
   570 // The initial connection flow control window is 65535 bytes.
   571 var INITIAL_STREAM_WINDOW_SIZE = 65535;
   573 // A SETTINGS frame can alter the initial flow control window size for all current streams. When the
   574 // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the window size of all
   575 // stream by calling the `setInitialStreamWindowSize` method. The window size has to be modified by
   576 // the difference between the new value and the old value.
   577 Connection.prototype._setInitialStreamWindowSize = function _setInitialStreamWindowSize(size) {
   578   if ((this._initialStreamWindowSize === Infinity) && (size !== Infinity)) {
   579     this._log.error('Trying to manipulate initial flow control window size after flow control was turned off.');
   580     this.emit('error', 'FLOW_CONTROL_ERROR');
   581   } else {
   582     this._log.debug({ size: size }, 'Changing stream initial window size.');
   583     this._initialStreamWindowSize = size;
   584     this._streamIds.forEach(function(stream) {
   585       stream.upstream.setInitialWindow(size);
   586     });
   587   }
   588 };

mercurial