1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/testing/xpcshell/node-http2/node_modules/http2-protocol/lib/endpoint.js Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,264 @@ 1.4 +var assert = require('assert'); 1.5 + 1.6 +var Serializer = require('./framer').Serializer; 1.7 +var Deserializer = require('./framer').Deserializer; 1.8 +var Compressor = require('./compressor').Compressor; 1.9 +var Decompressor = require('./compressor').Decompressor; 1.10 +var Connection = require('./connection').Connection; 1.11 +var Duplex = require('stream').Duplex; 1.12 +var Transform = require('stream').Transform; 1.13 + 1.14 +exports.Endpoint = Endpoint; 1.15 + 1.16 +// The Endpoint class 1.17 +// ================== 1.18 + 1.19 +// Public API 1.20 +// ---------- 1.21 + 1.22 +// - **new Endpoint(log, role, settings, filters)**: create a new Endpoint. 1.23 +// 1.24 +// - `log`: bunyan logger of the parent 1.25 +// - `role`: 'CLIENT' or 'SERVER' 1.26 +// - `settings`: initial HTTP/2 settings 1.27 +// - `filters`: a map of functions that filter the traffic between components (for debugging or 1.28 +// intentional failure injection). 1.29 +// 1.30 +// Filter functions get three arguments: 1.31 +// 1. `frame`: the current frame 1.32 +// 2. `forward(frame)`: function that can be used to forward a frame to the next component 1.33 +// 3. `done()`: callback to signal the end of the filter process 1.34 +// 1.35 +// Valid filter names and their position in the stack: 1.36 +// - `beforeSerialization`: after compression, before serialization 1.37 +// - `beforeCompression`: after multiplexing, before compression 1.38 +// - `afterDeserialization`: after deserialization, before decompression 1.39 +// - `afterDecompression`: after decompression, before multiplexing 1.40 +// 1.41 +// * **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection 1.42 +// 1.43 +// * **Event: 'error' (type)**: signals an error 1.44 +// 1.45 +// * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection) 1.46 +// 1.47 +// * **close([error])**: close the connection with an error code 1.48 + 1.49 +// Constructor 1.50 +// ----------- 1.51 + 1.52 +// The process of initialization: 1.53 +function Endpoint(log, role, settings, filters) { 1.54 + Duplex.call(this); 1.55 + 1.56 + // * Initializing logging infrastructure 1.57 + this._log = log.child({ component: 'endpoint', e: this }); 1.58 + 1.59 + // * First part of the handshake process: sending and receiving the client connection header 1.60 + // prelude. 1.61 + assert((role === 'CLIENT') || role === 'SERVER'); 1.62 + if (role === 'CLIENT') { 1.63 + this._writePrelude(); 1.64 + } else { 1.65 + this._readPrelude(); 1.66 + } 1.67 + 1.68 + // * Initialization of component. This includes the second part of the handshake process: 1.69 + // sending the first SETTINGS frame. This is done by the connection class right after 1.70 + // initialization. 1.71 + this._initializeDataFlow(role, settings, filters || {}); 1.72 + 1.73 + // * Initialization of management code. 1.74 + this._initializeManagement(); 1.75 + 1.76 + // * Initializing error handling. 1.77 + this._initializeErrorHandling(); 1.78 +} 1.79 +Endpoint.prototype = Object.create(Duplex.prototype, { constructor: { value: Endpoint } }); 1.80 + 1.81 +// Handshake 1.82 +// --------- 1.83 + 1.84 +var CLIENT_PRELUDE = new Buffer('PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'); 1.85 + 1.86 +// Writing the client header is simple and synchronous. 1.87 +Endpoint.prototype._writePrelude = function _writePrelude() { 1.88 + this._log.debug('Sending the client connection header prelude.'); 1.89 + this.push(CLIENT_PRELUDE); 1.90 +}; 1.91 + 1.92 +// The asynchronous process of reading the client header: 1.93 +Endpoint.prototype._readPrelude = function _readPrelude() { 1.94 + // * progress in the header is tracker using a `cursor` 1.95 + var cursor = 0; 1.96 + 1.97 + // * `_write` is temporarily replaced by the comparator function 1.98 + this._write = function _temporalWrite(chunk, encoding, done) { 1.99 + // * which compares the stored header with the current `chunk` byte by byte and emits the 1.100 + // 'error' event if there's a byte that doesn't match 1.101 + var offset = cursor; 1.102 + while(cursor < CLIENT_PRELUDE.length && (cursor - offset) < chunk.length) { 1.103 + if (CLIENT_PRELUDE[cursor] !== chunk[cursor - offset]) { 1.104 + this._log.fatal({ cursor: cursor, offset: offset, chunk: chunk }, 1.105 + 'Client connection header prelude does not match.'); 1.106 + this._error('handshake', 'PROTOCOL_ERROR'); 1.107 + return; 1.108 + } 1.109 + cursor += 1; 1.110 + } 1.111 + 1.112 + // * if the whole header is over, and there were no error then restore the original `_write` 1.113 + // and call it with the remaining part of the current chunk 1.114 + if (cursor === CLIENT_PRELUDE.length) { 1.115 + this._log.debug('Successfully received the client connection header prelude.'); 1.116 + delete this._write; 1.117 + chunk = chunk.slice(cursor - offset); 1.118 + this._write(chunk, encoding, done); 1.119 + } 1.120 + }; 1.121 +}; 1.122 + 1.123 +// Data flow 1.124 +// --------- 1.125 + 1.126 +// +---------------------------------------------+ 1.127 +// | | 1.128 +// | +-------------------------------------+ | 1.129 +// | | +---------+ +---------+ +---------+ | | 1.130 +// | | | stream1 | | stream2 | | ... | | | 1.131 +// | | +---------+ +---------+ +---------+ | | 1.132 +// | | connection | | 1.133 +// | +-------------------------------------+ | 1.134 +// | | ^ | 1.135 +// | pipe | | pipe | 1.136 +// | v | | 1.137 +// | +------------------+------------------+ | 1.138 +// | | compressor | decompressor | | 1.139 +// | +------------------+------------------+ | 1.140 +// | | ^ | 1.141 +// | pipe | | pipe | 1.142 +// | v | | 1.143 +// | +------------------+------------------+ | 1.144 +// | | serializer | deserializer | | 1.145 +// | +------------------+------------------+ | 1.146 +// | | ^ | 1.147 +// | _read() | | _write() | 1.148 +// | v | | 1.149 +// | +------------+ +-----------+ | 1.150 +// | |output queue| |input queue| | 1.151 +// +------+------------+-----+-----------+-------+ 1.152 +// | ^ 1.153 +// read() | | write() 1.154 +// v | 1.155 + 1.156 +function createTransformStream(filter) { 1.157 + var transform = new Transform({ objectMode: true }); 1.158 + var push = transform.push.bind(transform); 1.159 + transform._transform = function(frame, encoding, done) { 1.160 + filter(frame, push, done); 1.161 + }; 1.162 + return transform; 1.163 +} 1.164 + 1.165 +function pipeAndFilter(stream1, stream2, filter) { 1.166 + if (filter) { 1.167 + stream1.pipe(createTransformStream(filter)).pipe(stream2); 1.168 + } else { 1.169 + stream1.pipe(stream2); 1.170 + } 1.171 +} 1.172 + 1.173 +var MAX_HTTP_PAYLOAD_SIZE = 16383; 1.174 + 1.175 +Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) { 1.176 + var firstStreamId, compressorRole, decompressorRole; 1.177 + if (role === 'CLIENT') { 1.178 + firstStreamId = 1; 1.179 + compressorRole = 'REQUEST'; 1.180 + decompressorRole = 'RESPONSE'; 1.181 + } else { 1.182 + firstStreamId = 2; 1.183 + compressorRole = 'RESPONSE'; 1.184 + decompressorRole = 'REQUEST'; 1.185 + } 1.186 + 1.187 + this._serializer = new Serializer(this._log, MAX_HTTP_PAYLOAD_SIZE); 1.188 + this._deserializer = new Deserializer(this._log, MAX_HTTP_PAYLOAD_SIZE); 1.189 + this._compressor = new Compressor(this._log, compressorRole); 1.190 + this._decompressor = new Decompressor(this._log, decompressorRole); 1.191 + this._connection = new Connection(this._log, firstStreamId, settings); 1.192 + 1.193 + pipeAndFilter(this._connection, this._compressor, filters.beforeCompression); 1.194 + pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization); 1.195 + pipeAndFilter(this._deserializer, this._decompressor, filters.afterDeserialization); 1.196 + pipeAndFilter(this._decompressor, this._connection, filters.afterDecompression); 1.197 + 1.198 + this._connection.on('ACKNOWLEDGED_SETTINGS_HEADER_TABLE_SIZE', 1.199 + this._decompressor.setTableSizeLimit.bind(this._decompressor)) 1.200 + this._connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE', 1.201 + this._compressor.setTableSizeLimit.bind(this._compressor)) 1.202 +}; 1.203 + 1.204 +var noread = {}; 1.205 +Endpoint.prototype._read = function _read() { 1.206 + this._readableState.sync = true; 1.207 + var moreNeeded = noread, chunk; 1.208 + while (moreNeeded && (chunk = this._serializer.read())) { 1.209 + moreNeeded = this.push(chunk); 1.210 + } 1.211 + if (moreNeeded === noread) { 1.212 + this._serializer.once('readable', this._read.bind(this)); 1.213 + } 1.214 + this._readableState.sync = false; 1.215 +}; 1.216 + 1.217 +Endpoint.prototype._write = function _write(chunk, encoding, done) { 1.218 + this._deserializer.write(chunk, encoding, done); 1.219 +}; 1.220 + 1.221 +// Management 1.222 +// -------------- 1.223 + 1.224 +Endpoint.prototype._initializeManagement = function _initializeManagement() { 1.225 + this._connection.on('stream', this.emit.bind(this, 'stream')); 1.226 +}; 1.227 + 1.228 +Endpoint.prototype.createStream = function createStream() { 1.229 + return this._connection.createStream(); 1.230 +}; 1.231 + 1.232 +// Error handling 1.233 +// -------------- 1.234 + 1.235 +Endpoint.prototype._initializeErrorHandling = function _initializeErrorHandling() { 1.236 + this._serializer.on('error', this._error.bind(this, 'serializer')); 1.237 + this._deserializer.on('error', this._error.bind(this, 'deserializer')); 1.238 + this._compressor.on('error', this._error.bind(this, 'compressor')); 1.239 + this._decompressor.on('error', this._error.bind(this, 'decompressor')); 1.240 + this._connection.on('error', this._error.bind(this, 'connection')); 1.241 + 1.242 + this._connection.on('peerError', this.emit.bind(this, 'peerError')); 1.243 +}; 1.244 + 1.245 +Endpoint.prototype._error = function _error(component, error) { 1.246 + this._log.fatal({ source: component, message: error }, 'Fatal error, closing connection'); 1.247 + this.close(error); 1.248 + setImmediate(this.emit.bind(this, 'error', error)); 1.249 +}; 1.250 + 1.251 +Endpoint.prototype.close = function close(error) { 1.252 + this._connection.close(error); 1.253 +}; 1.254 + 1.255 +// Bunyan serializers 1.256 +// ------------------ 1.257 + 1.258 +exports.serializers = {}; 1.259 + 1.260 +var nextId = 0; 1.261 +exports.serializers.e = function(endpoint) { 1.262 + if (!('id' in endpoint)) { 1.263 + endpoint.id = nextId; 1.264 + nextId += 1; 1.265 + } 1.266 + return endpoint.id; 1.267 +};