testing/xpcshell/node-http2/node_modules/http2-protocol/lib/endpoint.js

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

michael@0 1 var assert = require('assert');
michael@0 2
michael@0 3 var Serializer = require('./framer').Serializer;
michael@0 4 var Deserializer = require('./framer').Deserializer;
michael@0 5 var Compressor = require('./compressor').Compressor;
michael@0 6 var Decompressor = require('./compressor').Decompressor;
michael@0 7 var Connection = require('./connection').Connection;
michael@0 8 var Duplex = require('stream').Duplex;
michael@0 9 var Transform = require('stream').Transform;
michael@0 10
michael@0 11 exports.Endpoint = Endpoint;
michael@0 12
michael@0 13 // The Endpoint class
michael@0 14 // ==================
michael@0 15
michael@0 16 // Public API
michael@0 17 // ----------
michael@0 18
michael@0 19 // - **new Endpoint(log, role, settings, filters)**: create a new Endpoint.
michael@0 20 //
michael@0 21 // - `log`: bunyan logger of the parent
michael@0 22 // - `role`: 'CLIENT' or 'SERVER'
michael@0 23 // - `settings`: initial HTTP/2 settings
michael@0 24 // - `filters`: a map of functions that filter the traffic between components (for debugging or
michael@0 25 // intentional failure injection).
michael@0 26 //
michael@0 27 // Filter functions get three arguments:
michael@0 28 // 1. `frame`: the current frame
michael@0 29 // 2. `forward(frame)`: function that can be used to forward a frame to the next component
michael@0 30 // 3. `done()`: callback to signal the end of the filter process
michael@0 31 //
michael@0 32 // Valid filter names and their position in the stack:
michael@0 33 // - `beforeSerialization`: after compression, before serialization
michael@0 34 // - `beforeCompression`: after multiplexing, before compression
michael@0 35 // - `afterDeserialization`: after deserialization, before decompression
michael@0 36 // - `afterDecompression`: after decompression, before multiplexing
michael@0 37 //
michael@0 38 // * **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection
michael@0 39 //
michael@0 40 // * **Event: 'error' (type)**: signals an error
michael@0 41 //
michael@0 42 // * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection)
michael@0 43 //
michael@0 44 // * **close([error])**: close the connection with an error code
michael@0 45
michael@0 46 // Constructor
michael@0 47 // -----------
michael@0 48
michael@0 49 // The process of initialization:
// Builds an Endpoint, which is a Duplex stream. The setup steps run in a fixed order:
//
// - `log`: parent logger; the endpoint keeps a child of it, created with `log.child()`
// - `role`: must be 'CLIENT' or 'SERVER', anything else fails the assertion
// - `settings`: handed unchanged to `_initializeDataFlow()`
// - `filters`: optional filter map; an empty object is substituted when it is falsy
function Endpoint(log, role, settings, filters) {
  Duplex.call(this);

  // * Logging: the child logger is tagged with the component name and this endpoint (`e`).
  this._log = log.child({ component: 'endpoint', e: this });

  // * Handshake, first part: a client sends the connection header prelude, a server expects
  //   to receive it.
  var isClient = (role === 'CLIENT');
  assert(isClient || role === 'SERVER');
  if (isClient) {
    this._writePrelude();
  } else {
    this._readPrelude();
  }

  // * Components. According to the original notes this also covers the second part of the
  //   handshake: the connection class sends the first SETTINGS frame right after it is
  //   initialized.
  this._initializeDataFlow(role, settings, filters || {});

  // * Management code.
  this._initializeManagement();

  // * Error handling.
  this._initializeErrorHandling();
}

// Endpoint inherits from Duplex. `constructor` is defined through a descriptor that only
// sets `value`, so it is non-enumerable, non-writable and non-configurable.
Endpoint.prototype = Object.create(Duplex.prototype);
Object.defineProperty(Endpoint.prototype, 'constructor', { value: Endpoint });
michael@0 77
michael@0 78 // Handshake
michael@0 79 // ---------
michael@0 80
// The 24 bytes that open every client connection. The client pushes them out first and the
// server compares the first incoming bytes against them.
//
// `new Buffer(string)` is deprecated in current Node.js, so `Buffer.from` is used whenever it
// is a real Buffer factory. Runtimes where it is missing, or is only the `from` inherited
// from `Uint8Array`, keep using the constructor.
var CLIENT_PRELUDE = (function buildClientPrelude() {
  var text = 'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n';
  var hasBufferFrom = (typeof Buffer.from === 'function') && (Buffer.from !== Uint8Array.from);
  return hasBufferFrom ? Buffer.from(text) : new Buffer(text);
}());
michael@0 82
michael@0 83 // Writing the client header is simple and synchronous.
// Client half of the handshake: record the step in the debug log, then queue the prelude
// bytes on the readable side of the endpoint with `push()`.
Endpoint.prototype._writePrelude = function _writePrelude() {
  var logger = this._log;
  logger.debug('Sending the client connection header prelude.');

  this.push(CLIENT_PRELUDE);
};
michael@0 88
michael@0 89 // The asynchronous process of reading the client header:
// Server half of the handshake: checks that the first bytes written to the endpoint are
// exactly `CLIENT_PRELUDE`. The prelude may arrive in any number of chunks.
Endpoint.prototype._readPrelude = function _readPrelude() {
  // * progress in the header is tracked using a `cursor` that survives between chunks
  var cursor = 0;

  // * `_write` is temporarily shadowed on the instance by the comparator function
  this._write = function _temporalWrite(chunk, encoding, done) {
    // * which compares the stored header with the current `chunk` byte by byte and reports a
    //   'PROTOCOL_ERROR' through `_error()` at the first byte that doesn't match. `done` is
    //   deliberately not called on that path, so nothing more is consumed from the peer.
    var offset = cursor;
    while (cursor < CLIENT_PRELUDE.length && (cursor - offset) < chunk.length) {
      if (CLIENT_PRELUDE[cursor] !== chunk[cursor - offset]) {
        this._log.fatal({ cursor: cursor, offset: offset, chunk: chunk },
                        'Client connection header prelude does not match.');
        this._error('handshake', 'PROTOCOL_ERROR');
        return;
      }
      cursor += 1;
    }

    if (cursor === CLIENT_PRELUDE.length) {
      // * the whole header matched: remove the instance-level override so that the prototype's
      //   `_write` is visible again, and hand it whatever follows the prelude in this chunk
      this._log.debug('Successfully received the client connection header prelude.');
      delete this._write;
      var remainder = chunk.slice(cursor - offset);
      this._write(remainder, encoding, done);
    } else {
      // * the chunk ended in the middle of the header: acknowledge it so that the stream
      //   delivers the next chunk. Without this call the write side would stall forever.
      done();
    }
  };
};
michael@0 119
michael@0 120 // Data flow
michael@0 121 // ---------
michael@0 122
michael@0 123 // +---------------------------------------------+
michael@0 124 // | |
michael@0 125 // | +-------------------------------------+ |
michael@0 126 // | | +---------+ +---------+ +---------+ | |
michael@0 127 // | | | stream1 | | stream2 | | ... | | |
michael@0 128 // | | +---------+ +---------+ +---------+ | |
michael@0 129 // | | connection | |
michael@0 130 // | +-------------------------------------+ |
michael@0 131 // | | ^ |
michael@0 132 // | pipe | | pipe |
michael@0 133 // | v | |
michael@0 134 // | +------------------+------------------+ |
michael@0 135 // | | compressor | decompressor | |
michael@0 136 // | +------------------+------------------+ |
michael@0 137 // | | ^ |
michael@0 138 // | pipe | | pipe |
michael@0 139 // | v | |
michael@0 140 // | +------------------+------------------+ |
michael@0 141 // | | serializer | deserializer | |
michael@0 142 // | +------------------+------------------+ |
michael@0 143 // | | ^ |
michael@0 144 // | _read() | | _write() |
michael@0 145 // | v | |
michael@0 146 // | +------------+ +-----------+ |
michael@0 147 // | |output queue| |input queue| |
michael@0 148 // +------+------------+-----+-----------+-------+
michael@0 149 // | ^
michael@0 150 // read() | | write()
michael@0 151 // v |
michael@0 152
// Wraps `filter` in an object-mode Transform stream.
//
// Each frame written to the stream is handed to `filter(frame, forward, done)`:
// `forward` is the stream's own `push()` bound to the stream, and `done` is the callback the
// stream passed to `_transform`. The filter decides what gets forwarded and when it is done.
function createTransformStream(filter) {
  var stage = new Transform({ objectMode: true });
  var forward = stage.push.bind(stage);

  function applyFilter(frame, encoding, done) {
    filter(frame, forward, done);
  }
  stage._transform = applyFilter;

  return stage;
}
michael@0 161
// Connects `stream1` to `stream2` with `pipe()`. When a `filter` is given, a transform stream
// built from it by `createTransformStream()` is inserted between the two.
function pipeAndFilter(stream1, stream2, filter) {
  var upstream = filter ? stream1.pipe(createTransformStream(filter)) : stream1;
  upstream.pipe(stream2);
}
michael@0 169
// Payload size limit passed to both the Serializer and the Deserializer.
var MAX_HTTP_PAYLOAD_SIZE = 16383;

// Creates the five components of the endpoint and wires them together.
//
// - `role`: 'CLIENT' selects first stream id 1 and a REQUEST compressor / RESPONSE
//   decompressor; any other value selects first stream id 2 and the opposite roles
// - `settings`: passed to the Connection constructor
// - `filters`: map that may contain `beforeCompression`, `beforeSerialization`,
//   `afterDeserialization` and `afterDecompression`; a missing entry means a direct pipe
Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) {
  var firstStreamId, compressorRole, decompressorRole;
  if (role === 'CLIENT') {
    firstStreamId = 1;
    compressorRole = 'REQUEST';
    decompressorRole = 'RESPONSE';
  } else {
    firstStreamId = 2;
    compressorRole = 'RESPONSE';
    decompressorRole = 'REQUEST';
  }

  this._serializer   = new Serializer(this._log, MAX_HTTP_PAYLOAD_SIZE);
  this._deserializer = new Deserializer(this._log, MAX_HTTP_PAYLOAD_SIZE);
  this._compressor   = new Compressor(this._log, compressorRole);
  this._decompressor = new Decompressor(this._log, decompressorRole);
  this._connection   = new Connection(this._log, firstStreamId, settings);

  // Outgoing direction: connection -> compressor -> serializer
  pipeAndFilter(this._connection, this._compressor, filters.beforeCompression);
  pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization);
  // Incoming direction: deserializer -> decompressor -> connection
  pipeAndFilter(this._deserializer, this._decompressor, filters.afterDeserialization);
  pipeAndFilter(this._decompressor, this._connection, filters.afterDecompression);

  // Header table size changes announced by the connection are forwarded to the matching
  // (de)compressor through its `setTableSizeLimit()`.
  this._connection.on('ACKNOWLEDGED_SETTINGS_HEADER_TABLE_SIZE',
                      this._decompressor.setTableSizeLimit.bind(this._decompressor));
  this._connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE',
                      this._compressor.setTableSizeLimit.bind(this._compressor));
};
michael@0 200
// Sentinel: a truthy value that `push()` never returns. As long as the loop variable still
// holds it, nothing has been pushed during the current `_read()` call.
var noread = {};

// Moves serialized chunks from the serializer to the readable side of the endpoint. It stops
// when `push()` returns a falsy value or when the serializer has nothing more to read.
Endpoint.prototype._read = function _read() {
  // NOTE(review): `sync` is an internal field of the readable state; it is raised for the
  // duration of this call and lowered at the end - confirm the intent against the stream
  // implementation in use.
  this._readableState.sync = true;

  var wantsMore = noread;
  while (wantsMore) {
    var chunk = this._serializer.read();
    if (!chunk) {
      break;
    }
    wantsMore = this.push(chunk);
  }

  // Nothing was available: retry once the serializer signals 'readable'.
  if (wantsMore === noread) {
    this._serializer.once('readable', this._read.bind(this));
  }

  this._readableState.sync = false;
};
michael@0 213
// Writable side of the endpoint: every incoming chunk goes straight to the deserializer,
// which is also given the completion callback.
Endpoint.prototype._write = function _write(chunk, encoding, done) {
  var deserializer = this._deserializer;
  deserializer.write(chunk, encoding, done);
};
michael@0 217
michael@0 218 // Management
michael@0 219 // --------------
michael@0 220
// Re-emits every 'stream' event of the underlying connection on the endpoint itself, with
// the same arguments.
Endpoint.prototype._initializeManagement = function _initializeManagement() {
  var forwardStream = this.emit.bind(this, 'stream');
  this._connection.on('stream', forwardStream);
};
michael@0 224
// Initiates a new stream by delegating to the underlying connection and returns whatever
// the connection's `createStream()` returns.
Endpoint.prototype.createStream = function createStream() {
  var connection = this._connection;
  return connection.createStream();
};
michael@0 228
michael@0 229 // Error handling
michael@0 230 // --------------
michael@0 231
// Routes the 'error' event of each component to `_error()`, with the component's name bound
// as the first argument, and re-emits the connection's 'peerError' event on the endpoint.
Endpoint.prototype._initializeErrorHandling = function _initializeErrorHandling() {
  var endpoint = this;
  var components = ['serializer', 'deserializer', 'compressor', 'decompressor', 'connection'];

  // Each component is stored on the endpoint under its name prefixed with an underscore.
  components.forEach(function subscribe(name) {
    endpoint['_' + name].on('error', endpoint._error.bind(endpoint, name));
  });

  this._connection.on('peerError', this.emit.bind(this, 'peerError'));
};
michael@0 241
// Central handler for fatal errors.
//
// - `component`: name of the part that reported the problem (logged as `source`)
// - `error`: the error value; logged, passed to `close()` and emitted with 'error'
//
// The 'error' event is emitted through `setImmediate`, i.e. after this call has returned.
Endpoint.prototype._error = function _error(component, error) {
  var details = { source: component, message: error };
  this._log.fatal(details, 'Fatal error, closing connection');

  this.close(error);

  var emitError = this.emit.bind(this, 'error', error);
  setImmediate(emitError);
};
michael@0 247
// Closes the endpoint by asking the underlying connection to close; the optional `error` is
// passed along unchanged.
Endpoint.prototype.close = function close(error) {
  var connection = this._connection;
  connection.close(error);
};
michael@0 251
michael@0 252 // Bunyan serializers
michael@0 253 // ------------------
michael@0 254
exports.serializers = {};

// Counter that provides the next endpoint id; it starts at 0 and grows by one per new id.
var nextId = 0;

// Serializer for the `e` log field: returns the endpoint's `id`. An endpoint that has no
// `id` property yet is given the next free number first.
exports.serializers.e = function(endpoint) {
  var hasId = 'id' in endpoint;
  if (hasId) {
    return endpoint.id;
  }

  endpoint.id = nextId;
  nextId += 1;
  return endpoint.id;
};

mercurial