testing/xpcshell/node-http2/node_modules/http2-protocol/lib/endpoint.js

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/testing/xpcshell/node-http2/node_modules/http2-protocol/lib/endpoint.js	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,264 @@
     1.4 +var assert = require('assert');
     1.5 +
     1.6 +var Serializer   = require('./framer').Serializer;
     1.7 +var Deserializer = require('./framer').Deserializer;
     1.8 +var Compressor   = require('./compressor').Compressor;
     1.9 +var Decompressor = require('./compressor').Decompressor;
    1.10 +var Connection   = require('./connection').Connection;
    1.11 +var Duplex       = require('stream').Duplex;
    1.12 +var Transform    = require('stream').Transform;
    1.13 +
    1.14 +exports.Endpoint = Endpoint;
    1.15 +
    1.16 +// The Endpoint class
    1.17 +// ==================
    1.18 +
    1.19 +// Public API
    1.20 +// ----------
    1.21 +
    1.22 +// - **new Endpoint(log, role, settings, filters)**: create a new Endpoint.
    1.23 +//
    1.24 +//   - `log`: bunyan logger of the parent
    1.25 +//   - `role`: 'CLIENT' or 'SERVER'
    1.26 +//   - `settings`: initial HTTP/2 settings
    1.27 +//   - `filters`: a map of functions that filter the traffic between components (for debugging or
    1.28 +//     intentional failure injection).
    1.29 +//
    1.30 +//     Filter functions get three arguments:
    1.31 +//     1. `frame`: the current frame
    1.32 +//     2. `forward(frame)`: function that can be used to forward a frame to the next component
    1.33 +//     3. `done()`: callback to signal the end of the filter process
    1.34 +//
    1.35 +//     Valid filter names and their position in the stack:
    1.36 +//     - `beforeSerialization`: after compression, before serialization
    1.37 +//     - `beforeCompression`: after multiplexing, before compression
    1.38 +//     - `afterDeserialization`: after deserialization, before decompression
    1.39 +//     - `afterDecompression`: after decompression, before multiplexing
    1.40 +//
    1.41 +// * **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection
    1.42 +//
    1.43 +// * **Event: 'error' (type)**: signals an error
    1.44 +//
    1.45 +// * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection)
    1.46 +//
    1.47 +// * **close([error])**: close the connection with an error code
    1.48 +
    1.49 +// Constructor
    1.50 +// -----------
    1.51 +
    1.52 +// The process of initialization:
    1.53 +function Endpoint(log, role, settings, filters) {
    1.54 +  Duplex.call(this);
    1.55 +
    1.56 +  // * Initializing logging infrastructure
    1.57 +  this._log = log.child({ component: 'endpoint', e: this });
    1.58 +
    1.59 +  // * First part of the handshake process: sending and receiving the client connection header
    1.60 +  //   prelude.
    1.61 +  assert((role === 'CLIENT') || role === 'SERVER');
    1.62 +  if (role === 'CLIENT') {
    1.63 +    this._writePrelude();
    1.64 +  } else {
    1.65 +    this._readPrelude();
    1.66 +  }
    1.67 +
    1.68 +  // * Initialization of component. This includes the second part of the handshake process:
    1.69 +  //   sending the first SETTINGS frame. This is done by the connection class right after
    1.70 +  //   initialization.
    1.71 +  this._initializeDataFlow(role, settings, filters || {});
    1.72 +
    1.73 +  // * Initialization of management code.
    1.74 +  this._initializeManagement();
    1.75 +
    1.76 +  // * Initializing error handling.
    1.77 +  this._initializeErrorHandling();
    1.78 +}
    1.79 +Endpoint.prototype = Object.create(Duplex.prototype, { constructor: { value: Endpoint } });
    1.80 +
    1.81 +// Handshake
    1.82 +// ---------
    1.83 +
    1.84 +var CLIENT_PRELUDE = new Buffer('PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n');
    1.85 +
    1.86 +// Writing the client header is simple and synchronous.
    1.87 +Endpoint.prototype._writePrelude = function _writePrelude() {
    1.88 +  this._log.debug('Sending the client connection header prelude.');
    1.89 +  this.push(CLIENT_PRELUDE);
    1.90 +};
    1.91 +
    1.92 +// The asynchronous process of reading the client header:
    1.93 +Endpoint.prototype._readPrelude = function _readPrelude() {
    1.94 +  // * progress in the header is tracker using a `cursor`
    1.95 +  var cursor = 0;
    1.96 +
    1.97 +  // * `_write` is temporarily replaced by the comparator function
    1.98 +  this._write = function _temporalWrite(chunk, encoding, done) {
    1.99 +    // * which compares the stored header with the current `chunk` byte by byte and emits the
   1.100 +    //   'error' event if there's a byte that doesn't match
   1.101 +    var offset = cursor;
   1.102 +    while(cursor < CLIENT_PRELUDE.length && (cursor - offset) < chunk.length) {
   1.103 +      if (CLIENT_PRELUDE[cursor] !== chunk[cursor - offset]) {
   1.104 +        this._log.fatal({ cursor: cursor, offset: offset, chunk: chunk },
   1.105 +                        'Client connection header prelude does not match.');
   1.106 +        this._error('handshake', 'PROTOCOL_ERROR');
   1.107 +        return;
   1.108 +      }
   1.109 +      cursor += 1;
   1.110 +    }
   1.111 +
   1.112 +    // * if the whole header is over, and there were no error then restore the original `_write`
   1.113 +    //   and call it with the remaining part of the current chunk
   1.114 +    if (cursor === CLIENT_PRELUDE.length) {
   1.115 +      this._log.debug('Successfully received the client connection header prelude.');
   1.116 +      delete this._write;
   1.117 +      chunk = chunk.slice(cursor - offset);
   1.118 +      this._write(chunk, encoding, done);
   1.119 +    }
   1.120 +  };
   1.121 +};
   1.122 +
   1.123 +// Data flow
   1.124 +// ---------
   1.125 +
   1.126 +//     +---------------------------------------------+
   1.127 +//     |                                             |
   1.128 +//     |   +-------------------------------------+   |
   1.129 +//     |   | +---------+ +---------+ +---------+ |   |
   1.130 +//     |   | | stream1 | | stream2 | |   ...   | |   |
   1.131 +//     |   | +---------+ +---------+ +---------+ |   |
   1.132 +//     |   |             connection              |   |
   1.133 +//     |   +-------------------------------------+   |
   1.134 +//     |             |                 ^             |
   1.135 +//     |        pipe |                 | pipe        |
   1.136 +//     |             v                 |             |
   1.137 +//     |   +------------------+------------------+   |
   1.138 +//     |   |    compressor    |   decompressor   |   |
   1.139 +//     |   +------------------+------------------+   |
   1.140 +//     |             |                 ^             |
   1.141 +//     |        pipe |                 | pipe        |
   1.142 +//     |             v                 |             |
   1.143 +//     |   +------------------+------------------+   |
   1.144 +//     |   |    serializer    |   deserializer   |   |
   1.145 +//     |   +------------------+------------------+   |
   1.146 +//     |             |                 ^             |
   1.147 +//     |     _read() |                 | _write()    |
   1.148 +//     |             v                 |             |
   1.149 +//     |      +------------+     +-----------+       |
   1.150 +//     |      |output queue|     |input queue|       |
   1.151 +//     +------+------------+-----+-----------+-------+
   1.152 +//                   |                 ^
   1.153 +//            read() |                 | write()
   1.154 +//                   v                 |
   1.155 +
   1.156 +function createTransformStream(filter) {
   1.157 +  var transform = new Transform({ objectMode: true });
   1.158 +  var push = transform.push.bind(transform);
   1.159 +  transform._transform = function(frame, encoding, done) {
   1.160 +    filter(frame, push, done);
   1.161 +  };
   1.162 +  return transform;
   1.163 +}
   1.164 +
   1.165 +function pipeAndFilter(stream1, stream2, filter) {
   1.166 +  if (filter) {
   1.167 +    stream1.pipe(createTransformStream(filter)).pipe(stream2);
   1.168 +  } else {
   1.169 +    stream1.pipe(stream2);
   1.170 +  }
   1.171 +}
   1.172 +
   1.173 +var MAX_HTTP_PAYLOAD_SIZE = 16383;
   1.174 +
   1.175 +Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) {
   1.176 +  var firstStreamId, compressorRole, decompressorRole;
   1.177 +  if (role === 'CLIENT') {
   1.178 +    firstStreamId = 1;
   1.179 +    compressorRole = 'REQUEST';
   1.180 +    decompressorRole = 'RESPONSE';
   1.181 +  } else {
   1.182 +    firstStreamId = 2;
   1.183 +    compressorRole = 'RESPONSE';
   1.184 +    decompressorRole = 'REQUEST';
   1.185 +  }
   1.186 +
   1.187 +  this._serializer   = new Serializer(this._log, MAX_HTTP_PAYLOAD_SIZE);
   1.188 +  this._deserializer = new Deserializer(this._log, MAX_HTTP_PAYLOAD_SIZE);
   1.189 +  this._compressor   = new Compressor(this._log, compressorRole);
   1.190 +  this._decompressor = new Decompressor(this._log, decompressorRole);
   1.191 +  this._connection   = new Connection(this._log, firstStreamId, settings);
   1.192 +
   1.193 +  pipeAndFilter(this._connection, this._compressor, filters.beforeCompression);
   1.194 +  pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization);
   1.195 +  pipeAndFilter(this._deserializer, this._decompressor, filters.afterDeserialization);
   1.196 +  pipeAndFilter(this._decompressor, this._connection, filters.afterDecompression);
   1.197 +
   1.198 +  this._connection.on('ACKNOWLEDGED_SETTINGS_HEADER_TABLE_SIZE',
   1.199 +                      this._decompressor.setTableSizeLimit.bind(this._decompressor))
   1.200 +  this._connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE',
   1.201 +                      this._compressor.setTableSizeLimit.bind(this._compressor))
   1.202 +};
   1.203 +
   1.204 +var noread = {};
   1.205 +Endpoint.prototype._read = function _read() {
   1.206 +  this._readableState.sync = true;
   1.207 +  var moreNeeded = noread, chunk;
   1.208 +  while (moreNeeded && (chunk = this._serializer.read())) {
   1.209 +    moreNeeded = this.push(chunk);
   1.210 +  }
   1.211 +  if (moreNeeded === noread) {
   1.212 +    this._serializer.once('readable', this._read.bind(this));
   1.213 +  }
   1.214 +  this._readableState.sync = false;
   1.215 +};
   1.216 +
   1.217 +Endpoint.prototype._write = function _write(chunk, encoding, done) {
   1.218 +  this._deserializer.write(chunk, encoding, done);
   1.219 +};
   1.220 +
   1.221 +// Management
   1.222 +// --------------
   1.223 +
   1.224 +Endpoint.prototype._initializeManagement = function _initializeManagement() {
   1.225 +  this._connection.on('stream', this.emit.bind(this, 'stream'));
   1.226 +};
   1.227 +
   1.228 +Endpoint.prototype.createStream = function createStream() {
   1.229 +  return this._connection.createStream();
   1.230 +};
   1.231 +
   1.232 +// Error handling
   1.233 +// --------------
   1.234 +
   1.235 +Endpoint.prototype._initializeErrorHandling = function _initializeErrorHandling() {
   1.236 +  this._serializer.on('error', this._error.bind(this, 'serializer'));
   1.237 +  this._deserializer.on('error', this._error.bind(this, 'deserializer'));
   1.238 +  this._compressor.on('error', this._error.bind(this, 'compressor'));
   1.239 +  this._decompressor.on('error', this._error.bind(this, 'decompressor'));
   1.240 +  this._connection.on('error', this._error.bind(this, 'connection'));
   1.241 +
   1.242 +  this._connection.on('peerError', this.emit.bind(this, 'peerError'));
   1.243 +};
   1.244 +
   1.245 +Endpoint.prototype._error = function _error(component, error) {
   1.246 +  this._log.fatal({ source: component, message: error }, 'Fatal error, closing connection');
   1.247 +  this.close(error);
   1.248 +  setImmediate(this.emit.bind(this, 'error', error));
   1.249 +};
   1.250 +
   1.251 +Endpoint.prototype.close = function close(error) {
   1.252 +  this._connection.close(error);
   1.253 +};
   1.254 +
   1.255 +// Bunyan serializers
   1.256 +// ------------------
   1.257 +
   1.258 +exports.serializers = {};
   1.259 +
   1.260 +var nextId = 0;
   1.261 +exports.serializers.e = function(endpoint) {
   1.262 +  if (!('id' in endpoint)) {
   1.263 +    endpoint.id = nextId;
   1.264 +    nextId += 1;
   1.265 +  }
   1.266 +  return endpoint.id;
   1.267 +};

mercurial