var assert = require('assert');

// The Connection class
// ====================

// The Connection class manages HTTP/2 connections. Each instance corresponds to one transport
// stream (TCP stream). It operates by sending and receiving frames and is implemented as a
// [Flow](flow.html) subclass.

var Flow = require('./flow').Flow;

// `Connection` is a function declaration further down in this file, so it is already hoisted here.
exports.Connection = Connection;

// Public API
// ----------

// * **new Connection(log, firstStreamId, settings)**: create a new Connection
//
// * **Event: 'error' (type)**: signals a connection level error made by the other end
//
// * **Event: 'peerError' (type)**: signals the receipt of a GOAWAY frame that contains an error
//   code other than NO_ERROR
//
// * **Event: 'stream' (stream)**: signals that there's an incoming stream
//
// * **createStream(): stream**: initiate a new stream
//
// * **set(settings, callback)**: change the value of one or more settings according to the
//   key-value pairs of `settings`. The callback is called after the peer acknowledged the changes.
michael@0: // michael@0: // * **ping([callback])**: send a ping and call callback when the answer arrives michael@0: // michael@0: // * **close([error])**: close the stream with an error code michael@0: michael@0: // Constructor michael@0: // ----------- michael@0: michael@0: // The main aspects of managing the connection are: michael@0: function Connection(log, firstStreamId, settings) { michael@0: // * initializing the base class michael@0: Flow.call(this, 0); michael@0: michael@0: // * logging: every method uses the common logger object michael@0: this._log = log.child({ component: 'connection' }); michael@0: michael@0: // * stream management michael@0: this._initializeStreamManagement(firstStreamId); michael@0: michael@0: // * lifecycle management michael@0: this._initializeLifecycleManagement(); michael@0: michael@0: // * flow control michael@0: this._initializeFlowControl(); michael@0: michael@0: // * settings management michael@0: this._initializeSettingsManagement(settings); michael@0: michael@0: // * multiplexing michael@0: this._initializeMultiplexing(); michael@0: } michael@0: Connection.prototype = Object.create(Flow.prototype, { constructor: { value: Connection } }); michael@0: michael@0: // Overview michael@0: // -------- michael@0: michael@0: // | ^ | ^ michael@0: // v | v | michael@0: // +--------------+ +--------------+ michael@0: // +---| stream1 |---| stream2 |---- .... ---+ michael@0: // | | +----------+ | | +----------+ | | michael@0: // | | | stream1. | | | | stream2. | | | michael@0: // | +-| upstream |-+ +-| upstream |-+ | michael@0: // | +----------+ +----------+ | michael@0: // | | ^ | ^ | michael@0: // | v | v | | michael@0: // | +-----+-------------+-----+-------- .... 
| michael@0: // | ^ | | | | michael@0: // | | v | | | michael@0: // | +--------------+ | | | michael@0: // | | stream0 | | | | michael@0: // | | connection | | | | michael@0: // | | management | multiplexing | michael@0: // | +--------------+ flow control | michael@0: // | | ^ | michael@0: // | _read() | | _write() | michael@0: // | v | | michael@0: // | +------------+ +-----------+ | michael@0: // | |output queue| |input queue| | michael@0: // +----------------+------------+-+-----------+-----------------+ michael@0: // | ^ michael@0: // read() | | write() michael@0: // v | michael@0: michael@0: // Stream management michael@0: // ----------------- michael@0: michael@0: var Stream = require('./stream').Stream; michael@0: michael@0: // Initialization: michael@0: Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) { michael@0: // * streams are stored in two data structures: michael@0: // * `_streamIds` is an id -> stream map of the streams that are allowed to receive frames. michael@0: // * `_streamPriorities` is a priority -> [stream] map of stream that allowed to send frames. michael@0: this._streamIds = []; michael@0: this._streamPriorities = []; michael@0: michael@0: // * The next outbound stream ID and the last inbound stream id michael@0: this._nextStreamId = firstStreamId; michael@0: this._lastIncomingStream = 0; michael@0: michael@0: // * Calling `_writeControlFrame` when there's an incoming stream with 0 as stream ID michael@0: this._streamIds[0] = { upstream: { write: this._writeControlFrame.bind(this) } }; michael@0: michael@0: // * By default, the number of concurrent outbound streams is not limited. The `_streamLimit` can michael@0: // be set by the SETTINGS_MAX_CONCURRENT_STREAMS setting. 
michael@0: this._streamSlotsFree = Infinity; michael@0: this._streamLimit = Infinity; michael@0: this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit); michael@0: }; michael@0: michael@0: // `_writeControlFrame` is called when there's an incoming frame in the `_control` stream. It michael@0: // broadcasts the message by creating an event on it. michael@0: Connection.prototype._writeControlFrame = function _writeControlFrame(frame) { michael@0: if ((frame.type === 'SETTINGS') || (frame.type === 'PING') || michael@0: (frame.type === 'GOAWAY') || (frame.type === 'WINDOW_UPDATE')) { michael@0: this._log.debug({ frame: frame }, 'Receiving connection level frame'); michael@0: this.emit(frame.type, frame); michael@0: } else { michael@0: this._log.error({ frame: frame }, 'Invalid connection level frame'); michael@0: this.emit('error', 'PROTOCOL_ERROR'); michael@0: } michael@0: }; michael@0: michael@0: // Methods to manage the stream slot pool: michael@0: Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) { michael@0: var wakeup = (this._streamSlotsFree === 0) && (newStreamLimit > this._streamLimit); michael@0: this._streamSlotsFree += newStreamLimit - this._streamLimit; michael@0: this._streamLimit = newStreamLimit; michael@0: if (wakeup) { michael@0: this.emit('wakeup'); michael@0: } michael@0: }; michael@0: michael@0: Connection.prototype._changeStreamCount = function _changeStreamCount(change) { michael@0: if (change) { michael@0: this._log.trace({ free: this._streamSlotsFree, change: change }, michael@0: 'Changing active stream count.'); michael@0: var wakeup = (this._streamSlotsFree === 0) && (change < 0); michael@0: this._streamSlotsFree -= change; michael@0: if (wakeup) { michael@0: this.emit('wakeup'); michael@0: } michael@0: } michael@0: }; michael@0: michael@0: // Creating a new *inbound or outbound* stream with the given `id` (which is undefined in case of michael@0: // an outbound stream) consists of 
// three steps:
//
// 1. var stream = new Stream(this._log);
// 2. this._allocateId(stream, id);
// 3. this._allocatePriority(stream);

// Allocating an ID to a stream. Returns the allocated ID, or `undefined` (after emitting a
// PROTOCOL_ERROR) if `id` is not acceptable for an incoming stream.
Connection.prototype._allocateId = function _allocateId(stream, id) {
  // * initiated stream without definite ID
  if (id === undefined) {
    id = this._nextStreamId;
    this._nextStreamId += 2;
  }

  // * incoming stream with a legitimate ID (larger than any previous and different parity than ours)
  else if ((id > this._lastIncomingStream) && ((id - this._nextStreamId) % 2 !== 0)) {
    this._lastIncomingStream = id;
  }

  // * incoming stream with invalid ID
  else {
    this._log.error({ stream_id: id, lastIncomingStream: this._lastIncomingStream },
                    'Invalid incoming stream ID.');
    this.emit('error', 'PROTOCOL_ERROR');
    return undefined;
  }

  assert(!(id in this._streamIds));

  // * adding to `this._streamIds`
  this._log.trace({ s: stream, stream_id: id }, 'Allocating ID for stream.');
  this._streamIds[id] = stream;
  stream.id = id;
  this.emit('new_stream', stream, id);

  // * handling stream errors as connection errors
  stream.on('error', this.emit.bind(this, 'error'));

  return id;
};

// Allocating a priority to a stream, and managing priority changes
Connection.prototype._allocatePriority = function _allocatePriority(stream) {
  this._log.trace({ s: stream }, 'Allocating priority for stream.');
  this._insert(stream, stream._priority);
  stream.on('priority', this._reprioritize.bind(this, stream));
  stream.upstream.on('readable', this.emit.bind(this, 'wakeup'));
  this.emit('wakeup');
};

// Adding `stream` to the bucket of `priority`, creating the bucket if it does not exist yet
Connection.prototype._insert = function _insert(stream, priority) {
  if (priority in this._streamPriorities) {
    this._streamPriorities[priority].push(stream);
  } else {
    this._streamPriorities[priority] = [stream];
  }
};

// Moving `stream` to the bucket of the new `priority`. `stream._priority` is read as the *old*
// priority here, so this has to run before the stream's own `_priority` field is updated.
Connection.prototype._reprioritize = function _reprioritize(stream, priority) {
  var bucket = this._streamPriorities[stream._priority];
  var index = bucket.indexOf(stream);
  assert(index !== -1);
  bucket.splice(index, 1);
  if (bucket.length === 0) {
    delete this._streamPriorities[stream._priority];
  }

  this._insert(stream, priority);
};

// Creating an *inbound* stream with the given ID. It is called when there's an incoming frame to
// a previously nonexistent stream. The stream object is always returned, but if the ID is
// rejected by `_allocateId` (which already reports the PROTOCOL_ERROR), the stream is neither
// scheduled for sending nor announced with a 'stream' event: it has no ID, and `_send` would
// otherwise assign an *outbound* ID to it when forwarding its first frame.
Connection.prototype._createIncomingStream = function _createIncomingStream(id) {
  this._log.debug({ stream_id: id }, 'New incoming stream.');

  var stream = new Stream(this._log);
  if (this._allocateId(stream, id) === undefined) {
    return stream;
  }
  this._allocatePriority(stream);
  this.emit('stream', stream, id);

  return stream;
};

// Creating an *outbound* stream
Connection.prototype.createStream = function createStream() {
  this._log.trace('Creating new outbound stream.');

  // * Sending is enabled immediately by allocating a priority. The ID is assigned later, by
  //   `_send`, when the first frame of the stream is forwarded.
  var stream = new Stream(this._log);
  this._allocatePriority(stream);

  return stream;
};

// Multiplexing
// ------------

Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() {
  this.on('window_update', this.emit.bind(this, 'wakeup'));
  this._sendScheduled = false;
  this._firstFrameReceived = false;
};

// The `_send` method is a virtual method of the [Flow class](flow.html) that has to be implemented
// by child classes. It reads frames from streams and pushes them to the output buffer.
Connection.prototype._send = function _send(immediate) {
  // * Do not do anything if the connection is already closed
  if (this._closed) {
    return;
  }

  // * Collapsing multiple calls in a turn into a single deferred call
  if (immediate) {
    this._sendScheduled = false;
  } else {
    if (!this._sendScheduled) {
      this._sendScheduled = true;
      setImmediate(this._send.bind(this, true));
    }
    return;
  }

  this._log.trace('Starting forwarding frames from streams.');

  // * Result of the last `push()`. It stays `undefined` if no frame could be forwarded at all.
  var moreNeeded;

  // * Looping through priority `bucket`s in priority order.
  priority_loop:
  for (var priority in this._streamPriorities) {
    var bucket = this._streamPriorities[priority];
    var nextBucket = [];

    // * Forwarding frames from buckets with round-robin scheduling.
    //   1. pulling out frame
    //   2. if there's no frame, skip this stream
    //   3. if forwarding this frame would make `streamCount` greater than `streamLimit`, skip
    //      this stream
    //   4. adding stream to the bucket of the next round
    //   5. assigning an ID to the frame (allocating an ID to the stream if there isn't already)
    //   6. if forwarding a PUSH_PROMISE, allocate ID to the promised stream
    //   7. forwarding the frame, changing `streamCount` as appropriate
    //   8. stepping to the next stream if there's still more frame needed in the output buffer
    //   9. switching to the bucket of the next round
    while (bucket.length > 0) {
      for (var index = 0; index < bucket.length; index++) {
        var stream = bucket[index];
        var frame = stream.upstream.read((this._window > 0) ? this._window : -1);

        if (!frame) {
          continue;
        } else if (frame.count_change > this._streamSlotsFree) {
          stream.upstream.unshift(frame);
          continue;
        }

        nextBucket.push(stream);

        if (frame.stream === undefined) {
          frame.stream = stream.id || this._allocateId(stream);
        }

        if (frame.type === 'PUSH_PROMISE') {
          this._allocatePriority(frame.promised_stream);
          frame.promised_stream = this._allocateId(frame.promised_stream);
        }

        this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame');
        moreNeeded = this.push(frame);
        this._changeStreamCount(frame.count_change);

        assert(moreNeeded !== null); // The frame shouldn't be unforwarded
        if (moreNeeded === false) {
          break priority_loop;
        }
      }

      bucket = nextBucket;
      nextBucket = [];
    }
  }

  // * if we couldn't forward any frame, then sleep until window update, or some other wakeup event
  if (moreNeeded === undefined) {
    this.once('wakeup', this._send.bind(this));
  }

  this._log.trace({ moreNeeded: moreNeeded }, 'Stopping forwarding frames from streams.');
};

// The `_receive` method is another virtual method of the [Flow class](flow.html) that has to be
// implemented by child classes.
// It forwards the given frame to the appropriate stream:
Connection.prototype._receive = function _receive(frame, done) {
  this._log.trace({ frame: frame }, 'Forwarding incoming frame');

  // * first frame needs to be checked by the `_onFirstFrameReceived` method
  if (!this._firstFrameReceived) {
    this._firstFrameReceived = true;
    this._onFirstFrameReceived(frame);
  }

  // * gets the appropriate stream from the stream registry
  var stream = this._streamIds[frame.stream];

  // * or creates one if it's not in `this._streamIds`
  if (!stream) {
    stream = this._createIncomingStream(frame.stream);
  }

  // * in case of PUSH_PROMISE, replaces the promised stream id with a new incoming stream
  if (frame.type === 'PUSH_PROMISE') {
    frame.promised_stream = this._createIncomingStream(frame.promised_stream);
  }

  // * hands over a bound `_changeStreamCount` on the frame itself
  frame.count_change = this._changeStreamCount.bind(this);

  // * and writes it to the `stream`'s `upstream`
  stream.upstream.write(frame);

  done();
};

// Settings management
// -------------------

var defaultSettings = {
};

// Settings management initialization:
Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) {
  // * Setting up the callback queue for setting acknowledgements
  this._settingsAckCallbacks = [];

  // * Sending the initial settings. The logged value is the one that is actually sent.
  var initialSettings = settings || defaultSettings;
  this._log.debug({ settings: initialSettings },
                  'Sending the first SETTINGS frame as part of the connection header.');
  this.set(initialSettings);

  // * Forwarding SETTINGS frames to the `_receiveSettings` method
  this.on('SETTINGS', this._receiveSettings);
};

// * Checking that the first frame the other endpoint sends is SETTINGS. A violation is reported
//   with the same PROTOCOL_ERROR code as the other connection level protocol violations.
Connection.prototype._onFirstFrameReceived = function _onFirstFrameReceived(frame) {
  if ((frame.stream === 0) && (frame.type === 'SETTINGS')) {
    this._log.debug('Receiving the first SETTINGS frame as part of the connection header.');
  } else {
    this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.');
    this.emit('error', 'PROTOCOL_ERROR');
  }
};

// Handling of incoming SETTINGS frames.
Connection.prototype._receiveSettings = function _receiveSettings(frame) {
  // * If it's an ACK, call the appropriate callback
  if (frame.flags.ACK) {
    var callback = this._settingsAckCallbacks.shift();
    if (callback) {
      callback();
    }
  }

  // * If it's a setting change request, then send an ACK and change the appropriate settings
  else {
    if (!this._closed) {
      this.push({
        type: 'SETTINGS',
        flags: { ACK: true },
        stream: 0,
        settings: {}
      });
    }
    for (var name in frame.settings) {
      this.emit('RECEIVING_' + name, frame.settings[name]);
    }
  }
};

// Changing one or more settings value and sending out a SETTINGS frame
Connection.prototype.set = function set(settings, callback) {
  // * Calling the callback and emitting event when the change is acknowledged
  callback = callback || function noop() {};
  var self = this;
  this._settingsAckCallbacks.push(function() {
    for (var name in settings) {
      self.emit('ACKNOWLEDGED_' + name, settings[name]);
    }
    callback();
  });

  // * Sending out the SETTINGS frame
  this.push({
    type: 'SETTINGS',
    flags: { ACK: false },
    stream: 0,
    settings: settings
  });
  for (var name in settings) {
    this.emit('SENDING_' + name, settings[name]);
  }
};

// Lifecycle management
// --------------------

// The main responsibilities of lifecycle management code:
//
// * keeping the connection alive by
//   * sending PINGs when the connection is idle
//   * answering PINGs
// * ending the connection

Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() {
  // `_pings` maps the hex ID of every outstanding PING to its (optional) callback
  this._pings = {};
  this.on('PING', this._receivePing);
  this.on('GOAWAY', this._receiveGoaway);
  this._closed = false;
};

// Generating a string of length 16 with random hexadecimal digits that is not the ID of an
// outstanding PING
Connection.prototype._generatePingId = function _generatePingId() {
  do {
    var id = '';
    for (var i = 0; i < 16; i++) {
      id += Math.floor(Math.random()*16).toString(16);
    }
  } while(id in this._pings);
  return id;
};

// Sending a ping and calling `callback` when the answer arrives
Connection.prototype.ping = function ping(callback) {
  var id = this._generatePingId();
  // 16 hex digits decode to the 8 byte PING payload. `Buffer.from` replaces the deprecated
  // `new Buffer(string, encoding)` constructor.
  var data = Buffer.from(id, 'hex');
  this._pings[id] = callback;

  this._log.debug({ data: data }, 'Sending PING.');
  this.push({
    type: 'PING',
    flags: {
      ACK: false
    },
    stream: 0,
    data: data
  });
};

// Answering pings
Connection.prototype._receivePing = function _receivePing(frame) {
  if (frame.flags.ACK) {
    var id = frame.data.toString('hex');
    if (id in this._pings) {
      this._log.debug({ data: frame.data }, 'Receiving answer for a PING.');
      var callback = this._pings[id];
      // Forgetting the PING before running the callback, so that a throwing callback cannot leave
      // a stale entry behind.
      delete this._pings[id];
      if (callback) {
        callback();
      }
    } else {
      this._log.warn({ data: frame.data }, 'Unsolicited PING answer.');
    }

  } else {
    this._log.debug({ data: frame.data }, 'Answering PING.');
    this.push({
      type: 'PING',
      flags: {
        ACK: true
      },
      stream: 0,
      data: frame.data
    });
  }
};

// Terminating the connection: sends a GOAWAY frame carrying `error` (NO_ERROR by default) and
// ends the output. Closing an already closed connection only logs a warning.
Connection.prototype.close = function close(error) {
  if (this._closed) {
    this._log.warn('Trying to close an already closed connection');
    return;
  }

  this._log.debug({ error: error }, 'Closing the connection');
  this.push({
    type: 'GOAWAY',
    flags: {},
    stream: 0,
    last_stream: this._lastIncomingStream,
    error: error || 'NO_ERROR'
  });
  this.push(null);
  this._closed = true;
};

// Handling an incoming GOAWAY: ends the output, marks the connection closed and reports any
// error code other than NO_ERROR as a 'peerError' event.
Connection.prototype._receiveGoaway = function _receiveGoaway(frame) {
  this._log.debug({ error: frame.error }, 'Other end closed the connection');
  this.push(null);
  this._closed = true;
  if (frame.error !== 'NO_ERROR') {
    this.emit('peerError', frame.error);
  }
};

// Flow control
// ------------

Connection.prototype._initializeFlowControl = function _initializeFlowControl() {
  // Handling of initial window size of individual streams.
  this._initialStreamWindowSize = INITIAL_STREAM_WINDOW_SIZE;
  this.on('new_stream', function(stream) {
    stream.upstream.setInitialWindow(this._initialStreamWindowSize);
  });
  this.on('RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE', this._setInitialStreamWindowSize);
  // The stream 0 placeholder gets a no-op so that the `forEach` in `_setInitialStreamWindowSize`
  // can treat every entry of `_streamIds` uniformly.
  this._streamIds[0].upstream.setInitialWindow = function noop() {};
};

// The initial connection flow control window is 65535 bytes.
var INITIAL_STREAM_WINDOW_SIZE = 65535;

// A SETTINGS frame can alter the initial flow control window size for all current streams. When the
// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the window size of all
// stream by calling the `setInitialStreamWindowSize` method. The window size has to be modified by
// the difference between the new value and the old value.
Connection.prototype._setInitialStreamWindowSize = function _setInitialStreamWindowSize(size) {
  if ((this._initialStreamWindowSize === Infinity) && (size !== Infinity)) {
    this._log.error('Trying to manipulate initial flow control window size after flow control was turned off.');
    this.emit('error', 'FLOW_CONTROL_ERROR');
  } else {
    this._log.debug({ size: size }, 'Changing stream initial window size.');
    this._initialStreamWindowSize = size;
    this._streamIds.forEach(function(stream) {
      stream.upstream.setInitialWindow(size);
    });
  }
};