var assert = require('assert');

var Serializer = require('./framer').Serializer;
var Deserializer = require('./framer').Deserializer;
var Compressor = require('./compressor').Compressor;
var Decompressor = require('./compressor').Decompressor;
var Connection = require('./connection').Connection;
var Duplex = require('stream').Duplex;
var Transform = require('stream').Transform;

exports.Endpoint = Endpoint;

// The Endpoint class
// ==================

// Public API
// ----------

// - **new Endpoint(log, role, settings, filters)**: create a new Endpoint.
//
//   - `log`: bunyan logger of the parent
//   - `role`: 'CLIENT' or 'SERVER'
//   - `settings`: initial HTTP/2 settings
//   - `filters`: a map of functions that filter the traffic between components (for debugging or
//     intentional failure injection).
//
//     Filter functions get three arguments:
//     1. `frame`: the current frame
//     2. `forward(frame)`: function that can be used to forward a frame to the next component
//     3. `done()`: callback to signal the end of the filter process
//
//     Valid filter names and their position in the stack:
//     - `beforeSerialization`: after compression, before serialization
//     - `beforeCompression`: after multiplexing, before compression
//     - `afterDeserialization`: after deserialization, before decompression
//     - `afterDecompression`: after decompression, before multiplexing
//
// * **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection
//
// * **Event: 'peerError' (...)**: 'peerError' event forwarded from the underlying Connection
//
// * **Event: 'error' (type)**: signals an error
//
// * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection)
//
// * **close([error])**: close the connection, optionally with an error code

// Constructor
// -----------

// The process of initialization:
function Endpoint(log, role, settings, filters) {
  Duplex.call(this);

  // * Initializing logging infrastructure
  this._log = log.child({ component: 'endpoint', e: this });

  // * First part of the handshake process: sending and receiving the client connection header
  //   prelude.
  assert((role === 'CLIENT') || role === 'SERVER');
  if (role === 'CLIENT') {
    this._writePrelude();
  } else {
    this._readPrelude();
  }

  // * Initialization of components. This includes the second part of the handshake process:
  //   sending the first SETTINGS frame. This is done by the connection class right after
  //   initialization.
  this._initializeDataFlow(role, settings, filters || {});

  // * Initialization of management code.
  this._initializeManagement();

  // * Initializing error handling.
  this._initializeErrorHandling();
}
Endpoint.prototype = Object.create(Duplex.prototype, { constructor: { value: Endpoint } });

// Handshake
// ---------

// NOTE(review): `new Buffer(string)` is deprecated in current Node.js releases; it is kept here
// for compatibility with the runtime this file targets. Consider `Buffer.from` once the minimum
// supported Node.js version allows it.
var CLIENT_PRELUDE = new Buffer('PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n');

// Writing the client header is simple and synchronous.
Endpoint.prototype._writePrelude = function _writePrelude() {
  this._log.debug('Sending the client connection header prelude.');
  this.push(CLIENT_PRELUDE);
};

// The asynchronous process of reading the client header:
Endpoint.prototype._readPrelude = function _readPrelude() {
  // * progress in the header is tracked using a `cursor`
  var cursor = 0;

  // * `_write` is temporarily replaced by the comparator function (an own property that shadows
  //   the prototype's `_write`)
  this._write = function _temporalWrite(chunk, encoding, done) {
    // * which compares the stored header with the current `chunk` byte by byte and emits the
    //   'error' event if there's a byte that doesn't match. `done` is deliberately not called in
    //   this case, so no further input is processed after the failed handshake.
    var offset = cursor;
    while (cursor < CLIENT_PRELUDE.length && (cursor - offset) < chunk.length) {
      if (CLIENT_PRELUDE[cursor] !== chunk[cursor - offset]) {
        this._log.fatal({ cursor: cursor, offset: offset, chunk: chunk },
                        'Client connection header prelude does not match.');
        this._error('handshake', 'PROTOCOL_ERROR');
        return;
      }
      cursor += 1;
    }

    // * if the whole header is over, and there was no error, then restore the original `_write`
    //   (by deleting the shadowing own property) and call it with the remaining part of the
    //   current chunk
    if (cursor === CLIENT_PRELUDE.length) {
      this._log.debug('Successfully received the client connection header prelude.');
      delete this._write;
      chunk = chunk.slice(cursor - offset);
      this._write(chunk, encoding, done);
    }

    // * otherwise the chunk ended in the middle of the header: it is fully consumed, so signal
    //   completion to receive the next chunk. Without this the stream would never deliver the
    //   rest of a header that arrives split across several chunks.
    else {
      done();
    }
  };
};

// Data flow
// ---------

//     +---------------------------------------------+
//     |                                             |
//     |   +-------------------------------------+   |
//     |   | +---------+ +---------+ +---------+ |   |
//     |   | | stream1 | | stream2 | |   ...   | |   |
//     |   | +---------+ +---------+ +---------+ |   |
//     |   |             connection              |   |
//     |   +-------------------------------------+   |
//     |             |                 ^             |
//     |        pipe |                 | pipe        |
//     |             v                 |             |
//     |   +------------------+------------------+   |
//     |   |    compressor    |   decompressor   |   |
//     |   +------------------+------------------+   |
//     |             |                 ^             |
//     |        pipe |                 | pipe        |
//     |             v                 |             |
//     |   +------------------+------------------+   |
//     |   |    serializer    |   deserializer   |   |
//     |   +------------------+------------------+   |
//     |             |                 ^             |
//     |     _read() |                 | _write()    |
//     |             v                 |             |
//     |      +------------+     +-----------+       |
//     |      |output queue|     |input queue|       |
//     +------+------------+-----+-----------+-------+
//                   |                 ^
//            read() |                 | write()
//                   v                 |

// Wraps a filter function into an object mode Transform stream. The filter gets the incoming
// frame, a `forward` function (the transform's bound `push`) and the `done` callback.
function createTransformStream(filter) {
  var transform = new Transform({ objectMode: true });
  var push = transform.push.bind(transform);
  transform._transform = function(frame, encoding, done) {
    filter(frame, push, done);
  };
  return transform;
}

// Pipes `stream1` into `stream2`, inserting a filtering Transform stream between them if a
// `filter` function is given.
function pipeAndFilter(stream1, stream2, filter) {
  if (filter) {
    stream1.pipe(createTransformStream(filter)).pipe(stream2);
  } else {
    stream1.pipe(stream2);
  }
}

// Size limit handed to both the serializer and the deserializer.
var MAX_HTTP_PAYLOAD_SIZE = 16383;

// Creating the components of the stack and wiring them together as shown in the diagram above.
Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) {
  // * The role decides the ID of the first locally initiated stream and which header direction
  //   the compressor and the decompressor handle.
  var firstStreamId, compressorRole, decompressorRole;
  if (role === 'CLIENT') {
    firstStreamId = 1;
    compressorRole = 'REQUEST';
    decompressorRole = 'RESPONSE';
  } else {
    firstStreamId = 2;
    compressorRole = 'RESPONSE';
    decompressorRole = 'REQUEST';
  }

  this._serializer = new Serializer(this._log, MAX_HTTP_PAYLOAD_SIZE);
  this._deserializer = new Deserializer(this._log, MAX_HTTP_PAYLOAD_SIZE);
  this._compressor = new Compressor(this._log, compressorRole);
  this._decompressor = new Decompressor(this._log, decompressorRole);
  this._connection = new Connection(this._log, firstStreamId, settings);

  // * Outgoing direction: connection -> compressor -> serializer
  pipeAndFilter(this._connection, this._compressor, filters.beforeCompression);
  pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization);

  // * Incoming direction: deserializer -> decompressor -> connection
  pipeAndFilter(this._deserializer, this._decompressor, filters.afterDeserialization);
  pipeAndFilter(this._decompressor, this._connection, filters.afterDecompression);

  // * Header table size changes signaled by the connection are applied to the decompressor
  //   (acknowledged settings) and to the compressor (received settings).
  this._connection.on('ACKNOWLEDGED_SETTINGS_HEADER_TABLE_SIZE',
                      this._decompressor.setTableSizeLimit.bind(this._decompressor));
  this._connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE',
                      this._compressor.setTableSizeLimit.bind(this._compressor));
};

// `noread` is a sentinel value: `moreNeeded` still being equal to it after the loop means that
// nothing could be read from the serializer (and so nothing was pushed) in this `_read` call.
var noread = {};
Endpoint.prototype._read = function _read() {
  // NOTE(review): this pokes at the internal state of the Readable stream, presumably to make
  // the `push` calls below be treated as synchronous ones - confirm against the targeted
  // Node.js version.
  this._readableState.sync = true;

  // * Forwarding chunks from the serializer until it runs dry or `push` signals to stop.
  var moreNeeded = noread, chunk;
  while (moreNeeded && (chunk = this._serializer.read())) {
    moreNeeded = this.push(chunk);
  }

  // * If there was nothing to forward, try again once the serializer becomes readable.
  if (moreNeeded === noread) {
    this._serializer.once('readable', this._read.bind(this));
  }
  this._readableState.sync = false;
};

// Incoming bytes are handed to the deserializer, which signals completion through `done`.
Endpoint.prototype._write = function _write(chunk, encoding, done) {
  this._deserializer.write(chunk, encoding, done);
};

// Management
// --------------

// Forwarding the 'stream' event of the connection.
Endpoint.prototype._initializeManagement = function _initializeManagement() {
  this._connection.on('stream', this.emit.bind(this, 'stream'));
};

// Initiating a new stream (forwarded to the connection).
Endpoint.prototype.createStream = function createStream() {
  return this._connection.createStream();
};

// Error handling
// --------------

// An 'error' event of any component is handled by `_error`, tagged with the name of the
// component. The 'peerError' event of the connection is simply forwarded.
Endpoint.prototype._initializeErrorHandling = function _initializeErrorHandling() {
  this._serializer.on('error', this._error.bind(this, 'serializer'));
  this._deserializer.on('error', this._error.bind(this, 'deserializer'));
  this._compressor.on('error', this._error.bind(this, 'compressor'));
  this._decompressor.on('error', this._error.bind(this, 'decompressor'));
  this._connection.on('error', this._error.bind(this, 'connection'));

  this._connection.on('peerError', this.emit.bind(this, 'peerError'));
};

// Logging the error, closing the connection, then emitting the 'error' event. The event is
// deferred with `setImmediate`, so it fires after the current call stack has unwound.
Endpoint.prototype._error = function _error(component, error) {
  this._log.fatal({ source: component, message: error }, 'Fatal error, closing connection');
  this.close(error);
  setImmediate(this.emit.bind(this, 'error', error));
};

// Closing the connection (forwarded to the connection).
Endpoint.prototype.close = function close(error) {
  this._connection.close(error);
};

// Bunyan serializers
// ------------------

exports.serializers = {};

// Endpoints are logged as a numeric ID. The ID is assigned (and stored on the endpoint as `id`)
// the first time the endpoint is serialized.
var nextId = 0;
exports.serializers.e = function(endpoint) {
  if (!('id' in endpoint)) {
    endpoint.id = nextId;
    nextId += 1;
  }
  return endpoint.id;
};