testing/xpcshell/node-http2/node_modules/http2-protocol/lib/endpoint.js

changeset 0
6474c204b198
equal deleted inserted replaced
-1:000000000000 0:3fffe34bbd05
1 var assert = require('assert');
2
3 var Serializer = require('./framer').Serializer;
4 var Deserializer = require('./framer').Deserializer;
5 var Compressor = require('./compressor').Compressor;
6 var Decompressor = require('./compressor').Decompressor;
7 var Connection = require('./connection').Connection;
8 var Duplex = require('stream').Duplex;
9 var Transform = require('stream').Transform;
10
11 exports.Endpoint = Endpoint;
12
13 // The Endpoint class
14 // ==================
15
16 // Public API
17 // ----------
18
19 // - **new Endpoint(log, role, settings, filters)**: create a new Endpoint.
20 //
21 // - `log`: bunyan logger of the parent
22 // - `role`: 'CLIENT' or 'SERVER'
23 // - `settings`: initial HTTP/2 settings
24 // - `filters`: a map of functions that filter the traffic between components (for debugging or
25 // intentional failure injection).
26 //
27 // Filter functions get three arguments:
28 // 1. `frame`: the current frame
29 // 2. `forward(frame)`: function that can be used to forward a frame to the next component
30 // 3. `done()`: callback to signal the end of the filter process
31 //
32 // Valid filter names and their position in the stack:
33 // - `beforeSerialization`: after compression, before serialization
34 // - `beforeCompression`: after multiplexing, before compression
35 // - `afterDeserialization`: after deserialization, before decompression
36 // - `afterDecompression`: after decompression, before multiplexing
37 //
38 // * **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection
39 //
40 // * **Event: 'error' (type)**: signals an error
41 //
42 // * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection)
43 //
44 // * **close([error])**: close the connection with an error code
45
46 // Constructor
47 // -----------
48
49 // The process of initialization:
function Endpoint(log, role, settings, filters) {
  // Set up the Duplex base class first: the endpoint is the byte-level face of the connection.
  Duplex.call(this);

  // * Logging: a child logger that tags every record with this endpoint
  this._log = log.child({ component: 'endpoint', e: this });

  // * Handshake, first part: a client sends the connection header prelude, a server waits for
  //   it. Any other role is a programming error.
  assert((role === 'CLIENT') || role === 'SERVER');
  var beginHandshake = (role === 'CLIENT') ? this._writePrelude : this._readPrelude;
  beginHandshake.call(this);

  // * Component stack. The second part of the handshake (sending the first SETTINGS frame) is
  //   carried out by the connection class right after it is initialized.
  var activeFilters = filters ? filters : {};
  this._initializeDataFlow(role, settings, activeFilters);

  // * Management code
  this._initializeManagement();

  // * Error handling
  this._initializeErrorHandling();
}

// Endpoint inherits from Duplex; `constructor` is restored as a non-enumerable property.
Endpoint.prototype = Object.create(Duplex.prototype, {
  constructor: { value: Endpoint }
});
77
78 // Handshake
79 // ---------
80
// The client connection header prelude: the fixed byte sequence a client sends first and a
// server expects first. `new Buffer(string)` is deprecated (DEP0005), so `Buffer.from` is used
// whenever a genuine implementation is available. Some old runtimes only expose the unrelated
// `Uint8Array.from` under that name, in which case the legacy constructor is still used.
var CLIENT_PRELUDE = (function createClientPrelude() {
  var text = 'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n';
  var hasBufferFrom = (typeof Buffer.from === 'function') && (Buffer.from !== Uint8Array.from);
  return hasBufferFrom ? Buffer.from(text) : new Buffer(text);
})();
82
83 // Writing the client header is simple and synchronous.
Endpoint.prototype._writePrelude = function _writePrelude() {
  // Log first, then queue the fixed prelude bytes on the readable side of the endpoint.
  var endpoint = this;
  endpoint._log.debug('Sending the client connection header prelude.');
  endpoint.push(CLIENT_PRELUDE);
};
88
89 // The asynchronous process of reading the client header:
Endpoint.prototype._readPrelude = function _readPrelude() {
  // * progress in the header is tracked using a `cursor`, which survives between chunks because
  //   the prelude may arrive split across several writes
  var cursor = 0;

  // * `_write` is temporarily replaced (shadowed on the instance) by the comparator function
  this._write = function _temporalWrite(chunk, encoding, done) {
    // * which compares the stored header with the current `chunk` byte by byte and reports a
    //   'PROTOCOL_ERROR' through `_error` if there's a byte that doesn't match. `done` is not
    //   called in that case, so no further input is processed after the failed handshake.
    var offset = cursor;
    while (cursor < CLIENT_PRELUDE.length && (cursor - offset) < chunk.length) {
      if (CLIENT_PRELUDE[cursor] !== chunk[cursor - offset]) {
        this._log.fatal({ cursor: cursor, offset: offset, chunk: chunk },
                        'Client connection header prelude does not match.');
        this._error('handshake', 'PROTOCOL_ERROR');
        return;
      }
      cursor += 1;
    }

    // * if the whole header is over, and there was no error, then restore the original `_write`
    //   (by deleting the instance-level override) and call it with the remaining part of the
    //   current chunk
    if (cursor === CLIENT_PRELUDE.length) {
      this._log.debug('Successfully received the client connection header prelude.');
      delete this._write;
      chunk = chunk.slice(cursor - offset);
      this._write(chunk, encoding, done);
    }

    // * otherwise the chunk ended in the middle of the header: acknowledge it so that the
    //   writable side delivers the next chunk instead of stalling forever
    else {
      done();
    }
  };
};
119
120 // Data flow
121 // ---------
122
123 // +---------------------------------------------+
124 // | |
125 // | +-------------------------------------+ |
126 // | | +---------+ +---------+ +---------+ | |
127 // | | | stream1 | | stream2 | | ... | | |
128 // | | +---------+ +---------+ +---------+ | |
129 // | | connection | |
130 // | +-------------------------------------+ |
131 // | | ^ |
132 // | pipe | | pipe |
133 // | v | |
134 // | +------------------+------------------+ |
135 // | | compressor | decompressor | |
136 // | +------------------+------------------+ |
137 // | | ^ |
138 // | pipe | | pipe |
139 // | v | |
140 // | +------------------+------------------+ |
141 // | | serializer | deserializer | |
142 // | +------------------+------------------+ |
143 // | | ^ |
144 // | _read() | | _write() |
145 // | v | |
146 // | +------------+ +-----------+ |
147 // | |output queue| |input queue| |
148 // +------+------------+-----+-----------+-------+
149 // | ^
150 // read() | | write()
151 // v |
152
// Wraps a filter function into an object mode Transform stream. The filter receives each frame
// together with a `forward` function (which pushes a frame downstream) and the `done` callback.
function createTransformStream(filter) {
  var stream = new Transform({ objectMode: true });
  var pushFrame = stream.push;

  function forward() {
    return pushFrame.apply(stream, arguments);
  }

  stream._transform = function(frame, encoding, done) {
    filter(frame, forward, done);
  };

  return stream;
}
161
// Pipes `stream1` into `stream2`. When a filter is given, a transform stream built from it is
// inserted between the two.
function pipeAndFilter(stream1, stream2, filter) {
  var upstream = stream1;
  if (filter) {
    upstream = stream1.pipe(createTransformStream(filter));
  }
  upstream.pipe(stream2);
}
169
// Payload size limit handed to both the serializer and the deserializer.
var MAX_HTTP_PAYLOAD_SIZE = 16383;

// Builds the component stack shown in the diagram above and wires the components together,
// inserting the optional debugging filters between them.
Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) {
  // * Anything other than 'CLIENT' gets the server side parameters
  var isClient = (role === 'CLIENT');
  var firstStreamId = isClient ? 1 : 2;
  var compressorRole = isClient ? 'REQUEST' : 'RESPONSE';
  var decompressorRole = isClient ? 'RESPONSE' : 'REQUEST';
  var log = this._log;

  // * Creating the components
  this._serializer = new Serializer(log, MAX_HTTP_PAYLOAD_SIZE);
  this._deserializer = new Deserializer(log, MAX_HTTP_PAYLOAD_SIZE);
  this._compressor = new Compressor(log, compressorRole);
  this._decompressor = new Decompressor(log, decompressorRole);
  this._connection = new Connection(log, firstStreamId, settings);

  // * Outgoing direction: connection -> compressor -> serializer
  pipeAndFilter(this._connection, this._compressor, filters.beforeCompression);
  pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization);

  // * Incoming direction: deserializer -> decompressor -> connection
  pipeAndFilter(this._deserializer, this._decompressor, filters.afterDeserialization);
  pipeAndFilter(this._decompressor, this._connection, filters.afterDecompression);

  // * Header table size events of the connection are applied to the matching component
  var applyToDecompressor = this._decompressor.setTableSizeLimit.bind(this._decompressor);
  var applyToCompressor = this._compressor.setTableSizeLimit.bind(this._compressor);
  this._connection.on('ACKNOWLEDGED_SETTINGS_HEADER_TABLE_SIZE', applyToDecompressor);
  this._connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE', applyToCompressor);
};
200
// Sentinel meaning "nothing has been pushed during this `_read` call". It is truthy, so the
// first loop iteration always attempts a read from the serializer.
var noread = {};

// Moves serialized chunks into the endpoint's own read buffer until either the serializer runs
// dry or `push` asks to stop.
Endpoint.prototype._read = function _read() {
  this._readableState.sync = true;

  var wantsMore = noread;
  for (;;) {
    if (!wantsMore) {
      break;
    }
    var chunk = this._serializer.read();
    if (!chunk) {
      break;
    }
    wantsMore = this.push(chunk);
  }

  // Nothing was available at all: retry once the serializer becomes readable.
  if (wantsMore === noread) {
    this._serializer.once('readable', this._read.bind(this));
  }

  this._readableState.sync = false;
};
213
Endpoint.prototype._write = function _write(chunk, encoding, done) {
  // Incoming bytes go straight to the deserializer, which acknowledges them through `done`.
  var deserializer = this._deserializer;
  deserializer.write(chunk, encoding, done);
};
217
218 // Management
219 // --------------
220
Endpoint.prototype._initializeManagement = function _initializeManagement() {
  // Every 'stream' event of the connection is re-emitted on the endpoint with the same arguments.
  var forwardStream = this.emit.bind(this, 'stream');
  this._connection.on('stream', forwardStream);
};
224
Endpoint.prototype.createStream = function createStream() {
  // Stream creation is delegated to the underlying connection; its result is returned as is.
  var connection = this._connection;
  return connection.createStream();
};
228
229 // Error handling
230 // --------------
231
Endpoint.prototype._initializeErrorHandling = function _initializeErrorHandling() {
  // * An 'error' event of any component is routed to `_error`, tagged with the component's name.
  //   Each component is stored on the endpoint under its name prefixed with an underscore.
  var components = ['serializer', 'deserializer', 'compressor', 'decompressor', 'connection'];
  components.forEach(function(name) {
    this['_' + name].on('error', this._error.bind(this, name));
  }, this);

  // * Errors reported by the peer are only re-emitted, they do not go through `_error`.
  var forwardPeerError = this.emit.bind(this, 'peerError');
  this._connection.on('peerError', forwardPeerError);
};
241
Endpoint.prototype._error = function _error(component, error) {
  // Log the failure, close the connection with the error, and only then (on a later tick of
  // the event loop) emit 'error' on the endpoint.
  var details = { source: component, message: error };
  this._log.fatal(details, 'Fatal error, closing connection');
  this.close(error);

  var emitError = this.emit.bind(this, 'error', error);
  setImmediate(emitError);
};
247
Endpoint.prototype.close = function close(error) {
  // Closing the endpoint means closing the underlying connection with the given error.
  var connection = this._connection;
  connection.close(error);
};
251
252 // Bunyan serializers
253 // ------------------
254
255 exports.serializers = {};
256
257 var nextId = 0;
258 exports.serializers.e = function(endpoint) {
259 if (!('id' in endpoint)) {
260 endpoint.id = nextId;
261 nextId += 1;
262 }
263 return endpoint.id;
264 };

mercurial