|
1 var assert = require('assert'); |
|
2 |
|
3 var Serializer = require('./framer').Serializer; |
|
4 var Deserializer = require('./framer').Deserializer; |
|
5 var Compressor = require('./compressor').Compressor; |
|
6 var Decompressor = require('./compressor').Decompressor; |
|
7 var Connection = require('./connection').Connection; |
|
8 var Duplex = require('stream').Duplex; |
|
9 var Transform = require('stream').Transform; |
|
10 |
|
11 exports.Endpoint = Endpoint; |
|
12 |
|
// The Endpoint class
// ==================

// Public API
// ----------

// - **new Endpoint(log, role, settings, filters)**: create a new Endpoint.
//
//   - `log`: bunyan logger of the parent
//   - `role`: 'CLIENT' or 'SERVER'
//   - `settings`: initial HTTP/2 settings
//   - `filters`: a map of functions that filter the traffic between components (for debugging or
//     intentional failure injection).
//
//     Filter functions get three arguments:
//     1. `frame`: the current frame
//     2. `forward(frame)`: function that can be used to forward a frame to the next component
//     3. `done()`: callback to signal the end of the filter process
//
//     Valid filter names and their position in the stack:
//     - `beforeSerialization`: after compression, before serialization
//     - `beforeCompression`: after multiplexing, before compression
//     - `afterDeserialization`: after deserialization, before decompression
//     - `afterDecompression`: after decompression, before multiplexing
//
// - **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection
//
// - **Event: 'error' (type)**: signals an error
//
// - **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection)
//
// - **close([error])**: close the connection with an error code
|
45 |
|
46 // Constructor |
|
47 // ----------- |
|
48 |
|
// Creates an endpoint: a Duplex stream that owns the whole processing stack of one side of a
// connection.
//
// - `log`: parent logger; only its `child()` method is used here
// - `role`: 'CLIENT' or 'SERVER'; any other value fails an assertion
// - `settings`: handed over unchanged to `_initializeDataFlow`
// - `filters`: optional map of filter functions; an empty map is used when it is omitted
//
// The process of initialization:
function Endpoint(log, role, settings, filters) {
  Duplex.call(this);

  // * Initializing logging infrastructure
  this._log = log.child({ component: 'endpoint', e: this });

  // * First part of the handshake process: sending and receiving the client connection header
  //   prelude.
  assert(role === 'CLIENT' || role === 'SERVER', "Endpoint role must be 'CLIENT' or 'SERVER'");
  if (role === 'CLIENT') {
    this._writePrelude();
  } else {
    this._readPrelude();
  }

  // * Initialization of component. This includes the second part of the handshake process:
  //   sending the first SETTINGS frame. This is done by the connection class right after
  //   initialization.
  this._initializeDataFlow(role, settings, filters || {});

  // * Initialization of management code.
  this._initializeManagement();

  // * Initializing error handling.
  this._initializeErrorHandling();
}

// Endpoint inherits from Duplex; `constructor` is restored (non-enumerable) on the new prototype.
Endpoint.prototype = Object.create(Duplex.prototype, { constructor: { value: Endpoint } });
|
77 |
|
78 // Handshake |
|
79 // --------- |
|
80 |
|
// The client connection header prelude: the fixed byte sequence that `_writePrelude` sends and
// `_readPrelude` compares the incoming bytes against.
//
// `new Buffer(string)` is deprecated, so `Buffer.from` is used when it is Buffer's own
// implementation. On runtimes where `Buffer.from` is missing or merely inherited from
// `Uint8Array.from` (which does not decode strings), the legacy constructor is still used.
var CLIENT_PRELUDE = (function createClientPrelude() {
  var text = 'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n';
  var hasBufferFrom = typeof Buffer.from === 'function' && Buffer.from !== Uint8Array.from;
  return hasBufferFrom ? Buffer.from(text) : new Buffer(text);
})();
|
82 |
|
// Writing the client header is simple and synchronous: the prelude is logged and then pushed
// into this stream's readable side in a single call.
Endpoint.prototype._writePrelude = function _writePrelude() {
  this._log.debug('Sending the client connection header prelude.');
  this.push(CLIENT_PRELUDE);
};
|
88 |
|
// The asynchronous process of reading the client header:
Endpoint.prototype._readPrelude = function _readPrelude() {
  // * progress in the header is tracked using a `cursor`
  var cursor = 0;

  // * `_write` is temporarily replaced by the comparator function (an own property that shadows
  //   the prototype's `_write`)
  this._write = function _temporalWrite(chunk, encoding, done) {
    // * which compares the stored header with the current `chunk` byte by byte and emits the
    //   'error' event if there's a byte that doesn't match
    var offset = cursor;
    while (cursor < CLIENT_PRELUDE.length && (cursor - offset) < chunk.length) {
      if (CLIENT_PRELUDE[cursor] !== chunk[cursor - offset]) {
        this._log.fatal({ cursor: cursor, offset: offset, chunk: chunk },
                        'Client connection header prelude does not match.');
        this._error('handshake', 'PROTOCOL_ERROR');
        // NOTE(review): `done` is not invoked on this path, so no further input is processed
        // after a failed handshake - presumably intentional; confirm.
        return;
      }
      cursor += 1;
    }

    // * if the whole header is over, and there were no error then restore the original `_write`
    //   and call it with the remaining part of the current chunk
    if (cursor === CLIENT_PRELUDE.length) {
      this._log.debug('Successfully received the client connection header prelude.');
      delete this._write;
      chunk = chunk.slice(cursor - offset);
      this._write(chunk, encoding, done);
    }

    // * otherwise the chunk ended in the middle of the header: acknowledge it, so that the
    //   stream hands over the next chunk (without this, a prelude that arrives split across
    //   several chunks would stall the stream forever)
    else {
      done();
    }
  };
};
|
119 |
|
// Data flow
// ---------

//     +---------------------------------------------+
//     |                                             |
//     |   +-------------------------------------+   |
//     |   | +---------+ +---------+ +---------+ |   |
//     |   | | stream1 | | stream2 | |   ...   | |   |
//     |   | +---------+ +---------+ +---------+ |   |
//     |   |             connection              |   |
//     |   +-------------------------------------+   |
//     |             |                 ^             |
//     |        pipe |                 | pipe        |
//     |             v                 |             |
//     |   +------------------+------------------+   |
//     |   |    compressor    |   decompressor   |   |
//     |   +------------------+------------------+   |
//     |             |                 ^             |
//     |        pipe |                 | pipe        |
//     |             v                 |             |
//     |   +------------------+------------------+   |
//     |   |    serializer    |   deserializer   |   |
//     |   +------------------+------------------+   |
//     |             |                 ^             |
//     |     _read() |                 | _write()    |
//     |             v                 |             |
//     |      +------------+     +-----------+       |
//     |      |output queue|     |input queue|       |
//     +------+------------+-----+-----------+-------+
//                   |                 ^
//            read() |                 | write()
//                   v                 |
|
152 |
|
// Wraps a filter function into an object mode Transform stream.
//
// - `filter(frame, forward, done)`: called for every frame written to the stream. `forward` is
//   the stream's own `push` (bound to the stream), `done` is the transform callback of the
//   stream, passed through untouched.
//
// Returns the new Transform stream. Throws a TypeError right away if `filter` is not a function,
// instead of failing later when the first frame arrives.
function createTransformStream(filter) {
  if (typeof filter !== 'function') {
    throw new TypeError('filter must be a function');
  }

  var transform = new Transform({ objectMode: true });
  var forward = transform.push.bind(transform);
  transform._transform = function _transform(frame, encoding, done) {
    filter(frame, forward, done);
  };
  return transform;
}
|
161 |
|
// Pipes `stream1` into `stream2`. When a `filter` is given, a transform stream created from it
// by `createTransformStream` is inserted between the two; otherwise they are piped directly.
// Returns nothing.
function pipeAndFilter(stream1, stream2, filter) {
  if (filter) {
    stream1.pipe(createTransformStream(filter)).pipe(stream2);
  } else {
    stream1.pipe(stream2);
  }
}
|
169 |
|
// Size limit handed to both the Serializer and the Deserializer as their second argument.
// NOTE(review): presumably the maximum frame payload size in bytes - confirm in ./framer.
var MAX_HTTP_PAYLOAD_SIZE = 16383;

// Creates the components of the processing stack and wires them together.
//
// - `role`: 'CLIENT' selects first stream ID 1, a 'REQUEST' compressor and a 'RESPONSE'
//   decompressor; any other value selects first stream ID 2 and the opposite (de)compressor roles
// - `settings`: passed to the Connection as is
// - `filters`: map of optional filter functions; must be an object (may be empty)
Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) {
  var firstStreamId, compressorRole, decompressorRole;
  if (role === 'CLIENT') {
    firstStreamId = 1;
    compressorRole = 'REQUEST';
    decompressorRole = 'RESPONSE';
  } else {
    firstStreamId = 2;
    compressorRole = 'RESPONSE';
    decompressorRole = 'REQUEST';
  }

  this._serializer = new Serializer(this._log, MAX_HTTP_PAYLOAD_SIZE);
  this._deserializer = new Deserializer(this._log, MAX_HTTP_PAYLOAD_SIZE);
  this._compressor = new Compressor(this._log, compressorRole);
  this._decompressor = new Decompressor(this._log, decompressorRole);
  this._connection = new Connection(this._log, firstStreamId, settings);

  // * outgoing direction: connection -> compressor -> serializer
  pipeAndFilter(this._connection, this._compressor, filters.beforeCompression);
  pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization);

  // * incoming direction: deserializer -> decompressor -> connection
  pipeAndFilter(this._deserializer, this._decompressor, filters.afterDeserialization);
  pipeAndFilter(this._decompressor, this._connection, filters.afterDecompression);

  // * header table size changes signalled by the connection are applied to the decompressor
  //   (acknowledged setting) and to the compressor (received setting)
  this._connection.on('ACKNOWLEDGED_SETTINGS_HEADER_TABLE_SIZE',
                      this._decompressor.setTableSizeLimit.bind(this._decompressor));
  this._connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE',
                      this._compressor.setTableSizeLimit.bind(this._compressor));
};
|
200 |
|
// Sentinel meaning "`push()` was not called at all during this `_read()`". Being an object it is
// truthy, so it also lets the loop below start.
var noread = {};

// Moves the chunks that are available in the serializer to the readable side of the endpoint.
Endpoint.prototype._read = function _read() {
  // NOTE(review): the stream's internal `sync` flag is forced on while pushing and reset
  // afterwards - presumably to make the pushes count as synchronous; confirm against the
  // stream implementation of the targeted Node version.
  this._readableState.sync = true;

  // * chunks are forwarded until the serializer runs dry or `push()` returns a falsy value
  var moreNeeded = noread, chunk;
  while (moreNeeded && (chunk = this._serializer.read())) {
    moreNeeded = this.push(chunk);
  }

  // * if nothing was available at all, try again once the serializer becomes readable
  if (moreNeeded === noread) {
    this._serializer.once('readable', this._read.bind(this));
  }

  this._readableState.sync = false;
};
|
213 |
|
// Incoming chunks are handed over to the deserializer as they are; `done` is passed along as
// the callback of that write.
Endpoint.prototype._write = function _write(chunk, encoding, done) {
  this._deserializer.write(chunk, encoding, done);
};
|
217 |
|
218 // Management |
|
219 // -------------- |
|
220 |
|
// Re-emits every 'stream' event of the underlying connection on the endpoint itself, with the
// same arguments.
Endpoint.prototype._initializeManagement = function _initializeManagement() {
  this._connection.on('stream', this.emit.bind(this, 'stream'));
};
|
224 |
|
// Initiates a new stream: simply returns whatever `createStream()` of the underlying connection
// returns.
Endpoint.prototype.createStream = function createStream() {
  return this._connection.createStream();
};
|
228 |
|
229 // Error handling |
|
230 // -------------- |
|
231 |
|
// Subscribes to the errors of the components of the stack.
Endpoint.prototype._initializeErrorHandling = function _initializeErrorHandling() {
  // * an 'error' event of any component ends up in `_error`, with the name of the component
  //   bound as the first argument
  this._serializer.on('error', this._error.bind(this, 'serializer'));
  this._deserializer.on('error', this._error.bind(this, 'deserializer'));
  this._compressor.on('error', this._error.bind(this, 'compressor'));
  this._decompressor.on('error', this._error.bind(this, 'decompressor'));
  this._connection.on('error', this._error.bind(this, 'connection'));

  // * 'peerError' events of the connection are re-emitted on the endpoint and do not go
  //   through `_error`
  this._connection.on('peerError', this.emit.bind(this, 'peerError'));
};
|
241 |
|
// Common handler of fatal errors.
//
// - `component`: name of the component the error comes from (only used for logging)
// - `error`: the error; it is logged, passed to `close()` and emitted with the 'error' event
Endpoint.prototype._error = function _error(component, error) {
  this._log.fatal({ source: component, message: error }, 'Fatal error, closing connection');

  // * the connection is closed right away ...
  this.close(error);

  // * ... while the 'error' event is emitted later, from a `setImmediate` callback
  setImmediate(this.emit.bind(this, 'error', error));
};
|
247 |
|
// Closes the endpoint by calling `close()` of the underlying connection with the given (optional)
// `error`.
Endpoint.prototype.close = function close(error) {
  this._connection.close(error);
};
|
251 |
|
252 // Bunyan serializers |
|
253 // ------------------ |
|
254 |
|
255 exports.serializers = {}; |
|
256 |
|
257 var nextId = 0; |
|
258 exports.serializers.e = function(endpoint) { |
|
259 if (!('id' in endpoint)) { |
|
260 endpoint.id = nextId; |
|
261 nextId += 1; |
|
262 } |
|
263 return endpoint.id; |
|
264 }; |