testing/xpcshell/node-http2/node_modules/http2-protocol/lib/endpoint.js

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

     1 var assert = require('assert');
     3 var Serializer   = require('./framer').Serializer;
     4 var Deserializer = require('./framer').Deserializer;
     5 var Compressor   = require('./compressor').Compressor;
     6 var Decompressor = require('./compressor').Decompressor;
     7 var Connection   = require('./connection').Connection;
     8 var Duplex       = require('stream').Duplex;
     9 var Transform    = require('stream').Transform;
    11 exports.Endpoint = Endpoint;
    13 // The Endpoint class
    14 // ==================
    16 // Public API
    17 // ----------
    19 // - **new Endpoint(log, role, settings, filters)**: create a new Endpoint.
    20 //
    21 //   - `log`: bunyan logger of the parent
    22 //   - `role`: 'CLIENT' or 'SERVER'
    23 //   - `settings`: initial HTTP/2 settings
    24 //   - `filters`: a map of functions that filter the traffic between components (for debugging or
    25 //     intentional failure injection).
    26 //
    27 //     Filter functions get three arguments:
    28 //     1. `frame`: the current frame
    29 //     2. `forward(frame)`: function that can be used to forward a frame to the next component
    30 //     3. `done()`: callback to signal the end of the filter process
    31 //
    32 //     Valid filter names and their position in the stack:
    33 //     - `beforeSerialization`: after compression, before serialization
    34 //     - `beforeCompression`: after multiplexing, before compression
    35 //     - `afterDeserialization`: after deserialization, before decompression
    36 //     - `afterDecompression`: after decompression, before multiplexing
    37 //
    38 // * **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection
    39 //
    40 // * **Event: 'error' (type)**: signals an error
    41 //
    42 // * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection)
    43 //
    44 // * **close([error])**: close the connection with an error code
    46 // Constructor
    47 // -----------
    49 // The process of initialization:
    50 function Endpoint(log, role, settings, filters) {
    51   Duplex.call(this);
    53   // * Initializing logging infrastructure
    54   this._log = log.child({ component: 'endpoint', e: this });
    56   // * First part of the handshake process: sending and receiving the client connection header
    57   //   prelude.
    58   assert((role === 'CLIENT') || role === 'SERVER');
    59   if (role === 'CLIENT') {
    60     this._writePrelude();
    61   } else {
    62     this._readPrelude();
    63   }
    65   // * Initialization of component. This includes the second part of the handshake process:
    66   //   sending the first SETTINGS frame. This is done by the connection class right after
    67   //   initialization.
    68   this._initializeDataFlow(role, settings, filters || {});
    70   // * Initialization of management code.
    71   this._initializeManagement();
    73   // * Initializing error handling.
    74   this._initializeErrorHandling();
    75 }
    76 Endpoint.prototype = Object.create(Duplex.prototype, { constructor: { value: Endpoint } });
    78 // Handshake
    79 // ---------
    81 var CLIENT_PRELUDE = new Buffer('PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n');
    83 // Writing the client header is simple and synchronous.
    84 Endpoint.prototype._writePrelude = function _writePrelude() {
    85   this._log.debug('Sending the client connection header prelude.');
    86   this.push(CLIENT_PRELUDE);
    87 };
    89 // The asynchronous process of reading the client header:
    90 Endpoint.prototype._readPrelude = function _readPrelude() {
    91   // * progress in the header is tracker using a `cursor`
    92   var cursor = 0;
    94   // * `_write` is temporarily replaced by the comparator function
    95   this._write = function _temporalWrite(chunk, encoding, done) {
    96     // * which compares the stored header with the current `chunk` byte by byte and emits the
    97     //   'error' event if there's a byte that doesn't match
    98     var offset = cursor;
    99     while(cursor < CLIENT_PRELUDE.length && (cursor - offset) < chunk.length) {
   100       if (CLIENT_PRELUDE[cursor] !== chunk[cursor - offset]) {
   101         this._log.fatal({ cursor: cursor, offset: offset, chunk: chunk },
   102                         'Client connection header prelude does not match.');
   103         this._error('handshake', 'PROTOCOL_ERROR');
   104         return;
   105       }
   106       cursor += 1;
   107     }
   109     // * if the whole header is over, and there were no error then restore the original `_write`
   110     //   and call it with the remaining part of the current chunk
   111     if (cursor === CLIENT_PRELUDE.length) {
   112       this._log.debug('Successfully received the client connection header prelude.');
   113       delete this._write;
   114       chunk = chunk.slice(cursor - offset);
   115       this._write(chunk, encoding, done);
   116     }
   117   };
   118 };
   120 // Data flow
   121 // ---------
   123 //     +---------------------------------------------+
   124 //     |                                             |
   125 //     |   +-------------------------------------+   |
   126 //     |   | +---------+ +---------+ +---------+ |   |
   127 //     |   | | stream1 | | stream2 | |   ...   | |   |
   128 //     |   | +---------+ +---------+ +---------+ |   |
   129 //     |   |             connection              |   |
   130 //     |   +-------------------------------------+   |
   131 //     |             |                 ^             |
   132 //     |        pipe |                 | pipe        |
   133 //     |             v                 |             |
   134 //     |   +------------------+------------------+   |
   135 //     |   |    compressor    |   decompressor   |   |
   136 //     |   +------------------+------------------+   |
   137 //     |             |                 ^             |
   138 //     |        pipe |                 | pipe        |
   139 //     |             v                 |             |
   140 //     |   +------------------+------------------+   |
   141 //     |   |    serializer    |   deserializer   |   |
   142 //     |   +------------------+------------------+   |
   143 //     |             |                 ^             |
   144 //     |     _read() |                 | _write()    |
   145 //     |             v                 |             |
   146 //     |      +------------+     +-----------+       |
   147 //     |      |output queue|     |input queue|       |
   148 //     +------+------------+-----+-----------+-------+
   149 //                   |                 ^
   150 //            read() |                 | write()
   151 //                   v                 |
   153 function createTransformStream(filter) {
   154   var transform = new Transform({ objectMode: true });
   155   var push = transform.push.bind(transform);
   156   transform._transform = function(frame, encoding, done) {
   157     filter(frame, push, done);
   158   };
   159   return transform;
   160 }
   162 function pipeAndFilter(stream1, stream2, filter) {
   163   if (filter) {
   164     stream1.pipe(createTransformStream(filter)).pipe(stream2);
   165   } else {
   166     stream1.pipe(stream2);
   167   }
   168 }
   170 var MAX_HTTP_PAYLOAD_SIZE = 16383;
   172 Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) {
   173   var firstStreamId, compressorRole, decompressorRole;
   174   if (role === 'CLIENT') {
   175     firstStreamId = 1;
   176     compressorRole = 'REQUEST';
   177     decompressorRole = 'RESPONSE';
   178   } else {
   179     firstStreamId = 2;
   180     compressorRole = 'RESPONSE';
   181     decompressorRole = 'REQUEST';
   182   }
   184   this._serializer   = new Serializer(this._log, MAX_HTTP_PAYLOAD_SIZE);
   185   this._deserializer = new Deserializer(this._log, MAX_HTTP_PAYLOAD_SIZE);
   186   this._compressor   = new Compressor(this._log, compressorRole);
   187   this._decompressor = new Decompressor(this._log, decompressorRole);
   188   this._connection   = new Connection(this._log, firstStreamId, settings);
   190   pipeAndFilter(this._connection, this._compressor, filters.beforeCompression);
   191   pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization);
   192   pipeAndFilter(this._deserializer, this._decompressor, filters.afterDeserialization);
   193   pipeAndFilter(this._decompressor, this._connection, filters.afterDecompression);
   195   this._connection.on('ACKNOWLEDGED_SETTINGS_HEADER_TABLE_SIZE',
   196                       this._decompressor.setTableSizeLimit.bind(this._decompressor))
   197   this._connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE',
   198                       this._compressor.setTableSizeLimit.bind(this._compressor))
   199 };
   201 var noread = {};
   202 Endpoint.prototype._read = function _read() {
   203   this._readableState.sync = true;
   204   var moreNeeded = noread, chunk;
   205   while (moreNeeded && (chunk = this._serializer.read())) {
   206     moreNeeded = this.push(chunk);
   207   }
   208   if (moreNeeded === noread) {
   209     this._serializer.once('readable', this._read.bind(this));
   210   }
   211   this._readableState.sync = false;
   212 };
   214 Endpoint.prototype._write = function _write(chunk, encoding, done) {
   215   this._deserializer.write(chunk, encoding, done);
   216 };
   218 // Management
   219 // --------------
   221 Endpoint.prototype._initializeManagement = function _initializeManagement() {
   222   this._connection.on('stream', this.emit.bind(this, 'stream'));
   223 };
   225 Endpoint.prototype.createStream = function createStream() {
   226   return this._connection.createStream();
   227 };
   229 // Error handling
   230 // --------------
   232 Endpoint.prototype._initializeErrorHandling = function _initializeErrorHandling() {
   233   this._serializer.on('error', this._error.bind(this, 'serializer'));
   234   this._deserializer.on('error', this._error.bind(this, 'deserializer'));
   235   this._compressor.on('error', this._error.bind(this, 'compressor'));
   236   this._decompressor.on('error', this._error.bind(this, 'decompressor'));
   237   this._connection.on('error', this._error.bind(this, 'connection'));
   239   this._connection.on('peerError', this.emit.bind(this, 'peerError'));
   240 };
   242 Endpoint.prototype._error = function _error(component, error) {
   243   this._log.fatal({ source: component, message: error }, 'Fatal error, closing connection');
   244   this.close(error);
   245   setImmediate(this.emit.bind(this, 'error', error));
   246 };
   248 Endpoint.prototype.close = function close(error) {
   249   this._connection.close(error);
   250 };
   252 // Bunyan serializers
   253 // ------------------
   255 exports.serializers = {};
   257 var nextId = 0;
   258 exports.serializers.e = function(endpoint) {
   259   if (!('id' in endpoint)) {
   260     endpoint.id = nextId;
   261     nextId += 1;
   262   }
   263   return endpoint.id;
   264 };

mercurial