Wed, 31 Dec 2014 06:09:35 +0100
Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.
1 var assert = require('assert');
3 // The Connection class
4 // ====================
6 // The Connection class manages HTTP/2 connections. Each instance corresponds to one transport
7 // stream (TCP stream). It operates by sending and receiving frames and is implemented as a
8 // [Flow](flow.html) subclass.
10 var Flow = require('./flow').Flow;
12 exports.Connection = Connection;
14 // Public API
15 // ----------
17 // * **new Connection(log, firstStreamId, settings)**: create a new Connection
18 //
19 // * **Event: 'error' (type)**: signals a connection level error made by the other end
20 //
21 // * **Event: 'peerError' (type)**: signals the receipt of a GOAWAY frame that contains an error
22 // code other than NO_ERROR
23 //
24 // * **Event: 'stream' (stream)**: signals that there's an incoming stream
25 //
26 // * **createStream(): stream**: initiate a new stream
27 //
28 // * **set(settings, callback)**: change the value of one or more settings according to the
29 // key-value pairs of `settings`. The callback is called after the peer acknowledged the changes.
30 //
31 // * **ping([callback])**: send a ping and call callback when the answer arrives
32 //
33 // * **close([error])**: close the connection with an error code
35 // Constructor
36 // -----------
// Constructing a Connection sets up every aspect of connection management, in this order:
function Connection(log, firstStreamId, settings) {
  // * the Flow base class, which is called with 0 as its argument
  Flow.call(this, 0);

  // * logging: all methods log through one child logger tagged with the 'connection' component
  this._log = log.child({ component: 'connection' });

  // * stream bookkeeping, lifecycle (PING/GOAWAY) handling and flow control
  this._initializeStreamManagement(firstStreamId);
  this._initializeLifecycleManagement();
  this._initializeFlowControl();

  // * settings management, which also sends out the first SETTINGS frame
  this._initializeSettingsManagement(settings);

  // * multiplexing state
  this._initializeMultiplexing();
}

// Connection inherits from Flow; `constructor` is put back as a non-enumerable property.
Connection.prototype = Object.create(Flow.prototype);
Object.defineProperty(Connection.prototype, 'constructor', { value: Connection });
63 // Overview
64 // --------
66 // | ^ | ^
67 // v | v |
68 // +--------------+ +--------------+
69 // +---| stream1 |---| stream2 |---- .... ---+
70 // | | +----------+ | | +----------+ | |
71 // | | | stream1. | | | | stream2. | | |
72 // | +-| upstream |-+ +-| upstream |-+ |
73 // | +----------+ +----------+ |
74 // | | ^ | ^ |
75 // | v | v | |
76 // | +-----+-------------+-----+-------- .... |
77 // | ^ | | | |
78 // | | v | | |
79 // | +--------------+ | | |
80 // | | stream0 | | | |
81 // | | connection | | | |
82 // | | management | multiplexing |
83 // | +--------------+ flow control |
84 // | | ^ |
85 // | _read() | | _write() |
86 // | v | |
87 // | +------------+ +-----------+ |
88 // | |output queue| |input queue| |
89 // +----------------+------------+-+-----------+-----------------+
90 // | ^
91 // read() | | write()
92 // v |
94 // Stream management
95 // -----------------
97 var Stream = require('./stream').Stream;
// Initializing the stream registries, the ID counters and the outbound stream slot pool:
Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) {
  var controlWrite = this._writeControlFrame.bind(this);

  // * Streams live in two data structures:
  //   * `_streamIds` maps an id to the stream that may receive frames addressed to that id.
  //   * `_streamPriorities` maps a priority to the list of streams that may send frames.
  this._streamIds = [];
  this._streamPriorities = [];

  // * ID counters: the ID for the next outbound stream, and the last inbound stream ID seen
  this._nextStreamId = firstStreamId;
  this._lastIncomingStream = 0;

  // * Frames addressed to stream 0 are handed to `_writeControlFrame` through a placeholder entry
  this._streamIds[0] = { upstream: { write: controlWrite } };

  // * Concurrent outbound streams are unlimited until the peer's SETTINGS_MAX_CONCURRENT_STREAMS
  //   setting arrives and `_updateStreamLimit` is called with its value.
  this._streamLimit = Infinity;
  this._streamSlotsFree = Infinity;
  this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit);
};
// `_writeControlFrame` serves as the `write` method of the stream 0 placeholder's `upstream`.
// SETTINGS, PING, GOAWAY and WINDOW_UPDATE frames are re-emitted on the connection as an event
// named after the frame type; any other frame type is reported as a PROTOCOL_ERROR.
Connection.prototype._writeControlFrame = function _writeControlFrame(frame) {
  var connectionLevelTypes = ['SETTINGS', 'PING', 'GOAWAY', 'WINDOW_UPDATE'];

  if (connectionLevelTypes.indexOf(frame.type) === -1) {
    this._log.error({ frame: frame }, 'Invalid connection level frame');
    this.emit('error', 'PROTOCOL_ERROR');
    return;
  }

  this._log.debug({ frame: frame }, 'Receiving connection level frame');
  this.emit(frame.type, frame);
};
134 // Methods to manage the stream slot pool:
// Applies a new limit on the number of concurrent outbound streams (the value of the peer's
// SETTINGS_MAX_CONCURRENT_STREAMS setting) and adjusts the free slot pool accordingly.
//
// * `newStreamLimit`: the new limit; `Infinity` means unlimited.
//
// A 'wakeup' event is emitted when the pool was exhausted and the limit has been raised, so that
// blocked streams get another chance to send.
Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) {
  var oldStreamLimit = this._streamLimit;
  var wakeup = (this._streamSlotsFree === 0) && (newStreamLimit > oldStreamLimit);

  if (newStreamLimit === Infinity) {
    // * unlimited again: every slot is free
    this._streamSlotsFree = Infinity;
  } else if (oldStreamLimit === Infinity) {
    // * the first finite limit: `Infinity + (newStreamLimit - Infinity)` would be NaN, which
    //   silently disables the limit because every comparison with NaN is false. The number of
    //   streams opened while the pool was unlimited is not tracked, so the pool restarts with
    //   the whole new limit available.
    this._streamSlotsFree = newStreamLimit;
  } else {
    // * finite to finite: shift the pool by the difference between the new and the old limit
    this._streamSlotsFree += newStreamLimit - oldStreamLimit;
  }
  this._streamLimit = newStreamLimit;

  if (wakeup) {
    this.emit('wakeup');
  }
};
// Records a change in the number of active streams: `change` slots are taken from the free pool
// (a negative `change` gives slots back). A falsy `change` is ignored. When slots are returned to
// an exhausted pool, a 'wakeup' event is emitted.
Connection.prototype._changeStreamCount = function _changeStreamCount(change) {
  if (!change) {
    return;
  }

  this._log.trace({ free: this._streamSlotsFree, change: change },
                  'Changing active stream count.');

  var poolWasExhausted = (this._streamSlotsFree === 0);
  this._streamSlotsFree -= change;

  if (poolWasExhausted && (change < 0)) {
    this.emit('wakeup');
  }
};
156 // Creating a new *inbound or outbound* stream with the given `id` (which is undefined in case of
157 // an outbound stream) consists of three steps:
158 //
159 // 1. var stream = new Stream(this._log);
160 // 2. this._allocateId(stream, id);
161 // 3. this._allocatePriority(stream);
// Allocating an ID to a stream and registering the stream under that ID. Returns the allocated
// ID, or `undefined` (after emitting a PROTOCOL_ERROR) when an incoming ID is not acceptable.
Connection.prototype._allocateId = function _allocateId(stream, id) {
  if (id === undefined) {
    // * locally initiated stream: take the next outbound ID and step the counter by two
    id = this._nextStreamId;
    this._nextStreamId += 2;
  } else {
    // * incoming stream: a legitimate ID is larger than any previous incoming ID and has a
    //   different parity than our own IDs
    var isNewest = id > this._lastIncomingStream;
    var isLegitimate = isNewest && ((id - this._nextStreamId) % 2 !== 0);

    if (!isLegitimate) {
      this._log.error({ stream_id: id, lastIncomingStream: this._lastIncomingStream },
                      'Invalid incoming stream ID.');
      this.emit('error', 'PROTOCOL_ERROR');
      return undefined;
    }

    this._lastIncomingStream = id;
  }

  assert(!(id in this._streamIds));

  // * registering the stream in `this._streamIds` and announcing it
  this._log.trace({ s: stream, stream_id: id }, 'Allocating ID for stream.');
  this._streamIds[id] = stream;
  stream.id = id;
  this.emit('new_stream', stream, id);

  // * stream errors are re-emitted as connection errors
  var forwardError = this.emit.bind(this, 'error');
  stream.on('error', forwardError);

  return id;
};
// Allocating a priority to a stream: the stream is filed under its current `_priority`, later
// 'priority' events of the stream are routed to `_reprioritize`, and a 'wakeup' is emitted both
// right away and whenever the stream's `upstream` becomes readable.
Connection.prototype._allocatePriority = function _allocatePriority(stream) {
  this._log.trace({ s: stream }, 'Allocating priority for stream.');
  this._insert(stream, stream._priority);

  var onPriorityChange = this._reprioritize.bind(this, stream);
  stream.on('priority', onPriorityChange);

  var onReadable = this.emit.bind(this, 'wakeup');
  stream.upstream.on('readable', onReadable);

  this.emit('wakeup');
};
// Appends `stream` to the bucket of the given `priority`, creating the bucket when it does not
// exist yet.
Connection.prototype._insert = function _insert(stream, priority) {
  var buckets = this._streamPriorities;

  if (!(priority in buckets)) {
    buckets[priority] = [];
  }
  buckets[priority].push(stream);
};
// Moves `stream` from the bucket of its current `_priority` to the bucket of the new `priority`.
// A bucket that becomes empty is removed from `_streamPriorities`.
Connection.prototype._reprioritize = function _reprioritize(stream, priority) {
  var oldPriority = stream._priority;
  var oldBucket = this._streamPriorities[oldPriority];

  var position = oldBucket.indexOf(stream);
  assert(position !== -1);
  oldBucket.splice(position, 1);

  if (oldBucket.length === 0) {
    delete this._streamPriorities[oldPriority];
  }

  this._insert(stream, priority);
};
// Creating an *inbound* stream with the given ID. `_receive` calls it for frames that refer to a
// stream that is not registered yet. The new stream gets its ID and priority allocated, is
// announced with a 'stream' event, and is returned.
Connection.prototype._createIncomingStream = function _createIncomingStream(id) {
  this._log.debug({ stream_id: id }, 'New incoming stream.');

  var incoming = new Stream(this._log);

  this._allocateId(incoming, id);
  this._allocatePriority(incoming);
  this.emit('stream', incoming, id);

  return incoming;
};
// Creating an *outbound* stream. The stream only gets a priority here; no ID is allocated by this
// method (`_send` allocates one when it forwards a frame of a stream that has no ID yet).
Connection.prototype.createStream = function createStream() {
  this._log.trace('Creating new outbound stream.');

  var outbound = new Stream(this._log);
  this._allocatePriority(outbound);

  return outbound;
};
251 // Multiplexing
252 // ------------
// Initializing the multiplexing state: no deferred `_send` is scheduled, no frame has been
// received yet, and every 'window_update' event is re-emitted as a 'wakeup'.
Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() {
  var wakeup = this.emit.bind(this, 'wakeup');
  this.on('window_update', wakeup);

  this._sendScheduled = false;
  this._firstFrameReceived = false;
};
260 // The `_send` method is a virtual method of the [Flow class](flow.html) that has to be implemented
261 // by child classes. It reads frames from streams and pushes them to the output buffer.
// Parameter `immediate`: when falsy, the call only schedules one deferred `_send(true)` through
// `setImmediate` (unless one is already scheduled) and returns; the forwarding itself runs only
// when `immediate` is truthy.
262 Connection.prototype._send = function _send(immediate) {
263 // * Do not do anything if the connection is already closed
264 if (this._closed) {
265 return;
266 }
268 // * Collapsing multiple calls in a turn into a single deferred call
269 if (immediate) {
270 this._sendScheduled = false;
271 } else {
272 if (!this._sendScheduled) {
273 this._sendScheduled = true;
274 setImmediate(this._send.bind(this, true));
275 }
276 return;
277 }
279 this._log.trace('Starting forwarding frames from streams.');
281 // * Looping through priority `bucket`s in priority order.
// NOTE(review): `for...in` over the `_streamPriorities` array visits only the priorities that
// currently have a bucket; the visiting order relies on the engine's key enumeration order.
282 priority_loop:
283 for (var priority in this._streamPriorities) {
284 var bucket = this._streamPriorities[priority];
285 var nextBucket = [];
287 // * Forwarding frames from buckets with round-robin scheduling.
288 // 1. pulling out frame
289 // 2. if there's no frame, skip this stream
290 // 3. if forwarding this frame would make `streamCount` greater than `streamLimit`, skip
291 // this stream
292 // 4. adding stream to the bucket of the next round
293 // 5. assigning an ID to the frame (allocating an ID to the stream if there isn't already)
294 // 6. if forwarding a PUSH_PROMISE, allocate ID to the promised stream
295 // 7. forwarding the frame, changing `streamCount` as appropriate
296 // 8. stepping to the next stream if there's still more frame needed in the output buffer
297 // 9. switching to the bucket of the next round
298 while (bucket.length > 0) {
299 for (var index = 0; index < bucket.length; index++) {
300 var stream = bucket[index];
// The argument is the connection window when it is positive, otherwise -1; presumably it
// limits the size of the frame that is read — confirm against the Flow implementation.
301 var frame = stream.upstream.read((this._window > 0) ? this._window : -1);
303 if (!frame) {
304 continue;
305 } else if (frame.count_change > this._streamSlotsFree) {
// Not enough free stream slots: the frame is put back and the stream is left out of
// `nextBucket`, so it is not visited again during this call.
306 stream.upstream.unshift(frame);
307 continue;
308 }
310 nextBucket.push(stream);
312 if (frame.stream === undefined) {
313 frame.stream = stream.id || this._allocateId(stream);
314 }
316 if (frame.type === 'PUSH_PROMISE') {
317 this._allocatePriority(frame.promised_stream);
318 frame.promised_stream = this._allocateId(frame.promised_stream);
319 }
321 this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame');
// `moreNeeded` is declared with `var`, so it is function-scoped: it stays `undefined` for the
// check after the loops when no frame has been pushed during this call.
322 var moreNeeded = this.push(frame);
323 this._changeStreamCount(frame.count_change);
325 assert(moreNeeded !== null); // The frame shouldn't be unforwarded
326 if (moreNeeded === false) {
327 break priority_loop;
328 }
329 }
331 bucket = nextBucket;
332 nextBucket = [];
333 }
334 }
336 // * if we couldn't forward any frame, then sleep until window update, or some other wakeup event
337 if (moreNeeded === undefined) {
338 this.once('wakeup', this._send.bind(this));
339 }
341 this._log.trace({ moreNeeded: moreNeeded }, 'Stopping forwarding frames from streams.');
342 };
// The `_receive` method is another virtual method of the [Flow class](flow.html) that child
// classes have to implement. It routes an incoming frame to the stream it belongs to and calls
// `done` afterwards.
Connection.prototype._receive = function _receive(frame, done) {
  this._log.trace({ frame: frame }, 'Forwarding incoming frame');

  // * the very first frame is additionally checked by `_onFirstFrameReceived`
  var isFirstFrame = !this._firstFrameReceived;
  if (isFirstFrame) {
    this._firstFrameReceived = true;
    this._onFirstFrameReceived(frame);
  }

  // * the target is looked up in the stream registry, and created when it is not there yet
  var target = this._streamIds[frame.stream] || this._createIncomingStream(frame.stream);

  // * for PUSH_PROMISE, the promised stream ID is replaced by a newly created incoming stream
  if (frame.type === 'PUSH_PROMISE') {
    frame.promised_stream = this._createIncomingStream(frame.promised_stream);
  }

  // * the frame carries a bound `_changeStreamCount` as its `count_change` property
  frame.count_change = this._changeStreamCount.bind(this);

  // * finally the frame is written to the `upstream` of the target stream
  target.upstream.write(frame);

  done();
};
376 // Settings management
377 // -------------------
// Settings used for the first SETTINGS frame when the constructor is given none: an empty
// object, so that frame carries no setting values.
379 var defaultSettings = {
380 };
// Settings management initialization. `settings` are the values for the first SETTINGS frame;
// `defaultSettings` is sent when it is falsy.
Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) {
  var initialSettings = settings || defaultSettings;

  // * The queue of callbacks waiting for a SETTINGS acknowledgement starts out empty
  this._settingsAckCallbacks = [];

  // * The initial settings are sent right away
  this._log.debug({ settings: settings },
                  'Sending the first SETTINGS frame as part of the connection header.');
  this.set(initialSettings);

  // * Incoming SETTINGS frames are handled by `_receiveSettings`
  this.on('SETTINGS', this._receiveSettings);
};
// * Checking that the first frame the other endpoint sends is a SETTINGS frame on stream 0.
//   Anything else is an invalid connection header: it is logged as fatal and reported as an
//   'error' event carrying the 'PROTOCOL_ERROR' code, like every other connection level protocol
//   violation (the event used to be emitted without any error code).
Connection.prototype._onFirstFrameReceived = function _onFirstFrameReceived(frame) {
  if ((frame.stream === 0) && (frame.type === 'SETTINGS')) {
    this._log.debug('Receiving the first SETTINGS frame as part of the connection header.');
  } else {
    this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.');
    this.emit('error', 'PROTOCOL_ERROR');
  }
};
// Handling of incoming SETTINGS frames.
Connection.prototype._receiveSettings = function _receiveSettings(frame) {
  // * An ACK resolves the oldest pending acknowledgement callback, if there is one
  if (frame.flags.ACK) {
    var pending = this._settingsAckCallbacks.shift();
    if (pending) {
      pending();
    }
    return;
  }

  // * A setting change request is acknowledged (unless the connection is closed) ...
  if (!this._closed) {
    var acknowledgement = {
      type: 'SETTINGS',
      flags: { ACK: true },
      stream: 0,
      settings: {}
    };
    this.push(acknowledgement);
  }

  // * ... and every received setting is announced with a `RECEIVING_<name>` event
  for (var name in frame.settings) {
    this.emit('RECEIVING_' + name, frame.settings[name]);
  }
};
// Changing one or more settings: a SETTINGS frame with the key-value pairs of `settings` is sent
// out, and a `SENDING_<name>` event is emitted for each pair. When the peer acknowledges the
// frame, an `ACKNOWLEDGED_<name>` event is emitted for each pair and then the optional `callback`
// is called.
Connection.prototype.set = function set(settings, callback) {
  var self = this;
  var done = callback || function noop() {};

  // * Queueing what has to happen when the change is acknowledged
  function onAcknowledged() {
    for (var acknowledged in settings) {
      self.emit('ACKNOWLEDGED_' + acknowledged, settings[acknowledged]);
    }
    done();
  }
  this._settingsAckCallbacks.push(onAcknowledged);

  // * Sending out the SETTINGS frame
  var settingsFrame = {
    type: 'SETTINGS',
    flags: { ACK: false },
    stream: 0,
    settings: settings
  };
  this.push(settingsFrame);

  for (var name in settings) {
    this.emit('SENDING_' + name, settings[name]);
  }
};
456 // Lifecycle management
457 // --------------------
459 // The main responsibilities of lifecycle management code:
460 //
461 // * keeping the connection alive by
462 // * sending PINGs when the connection is idle
463 // * answering PINGs
464 // * ending the connection
// Initializing lifecycle management: the connection starts open with no outstanding pings, and
// incoming PING and GOAWAY frames are routed to `_receivePing` and `_receiveGoaway`.
Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() {
  this._closed = false;
  this._pings = {};

  this.on('PING', this._receivePing);
  this.on('GOAWAY', this._receiveGoaway);
};
// Generating a string of 16 random hexadecimal digits that is not yet a key of `this._pings`.
// Generation is repeated for as long as the candidate collides with an outstanding ping.
Connection.prototype._generatePingId = function _generatePingId() {
  var DIGIT_COUNT = 16;
  var candidate;

  do {
    var digits = [];
    while (digits.length < DIGIT_COUNT) {
      digits.push(Math.floor(Math.random() * 16).toString(16));
    }
    candidate = digits.join('');
  } while (candidate in this._pings);

  return candidate;
};
// Sending a PING frame with a freshly generated ID as its payload. The optional `callback` is
// stored under that ID and gets called when the matching answer arrives.
Connection.prototype.ping = function ping(callback) {
  var id = this._generatePingId();
  var payload = new Buffer(id, 'hex');
  this._pings[id] = callback;

  this._log.debug({ data: payload }, 'Sending PING.');

  var pingFrame = {
    type: 'PING',
    flags: { ACK: false },
    stream: 0,
    data: payload
  };
  this.push(pingFrame);
};
// Handling incoming PING frames: a frame without the ACK flag is answered by echoing its payload
// back with the ACK flag set; a frame with the ACK flag completes the outstanding ping whose ID
// matches the payload.
Connection.prototype._receivePing = function _receivePing(frame) {
  if (!frame.flags.ACK) {
    this._log.debug({ data: frame.data }, 'Answering PING.');
    var answer = {
      type: 'PING',
      flags: { ACK: true },
      stream: 0,
      data: frame.data
    };
    this.push(answer);
    return;
  }

  var id = frame.data.toString('hex');
  if (!(id in this._pings)) {
    this._log.warn({ data: frame.data }, 'Unsolicited PING answer.');
    return;
  }

  this._log.debug({ data: frame.data }, 'Receiving answer for a PING.');
  var onAnswer = this._pings[id];
  if (onAnswer) {
    onAnswer();
  }
  delete this._pings[id];
};
// Terminating the connection: a GOAWAY frame carrying the last incoming stream ID and the given
// error code ('NO_ERROR' when `error` is falsy) is sent, the output is ended by pushing `null`,
// and the connection is marked as closed. Closing an already closed connection only logs a
// warning.
Connection.prototype.close = function close(error) {
  if (this._closed) {
    this._log.warn('Trying to close an already closed connection');
    return;
  }

  this._log.debug({ error: error }, 'Closing the connection');

  var goaway = {
    type: 'GOAWAY',
    flags: {},
    stream: 0,
    last_stream: this._lastIncomingStream,
    error: error || 'NO_ERROR'
  };
  this.push(goaway);
  this.push(null);

  this._closed = true;
};
// Handling an incoming GOAWAY frame: the output is ended by pushing `null` and the connection is
// marked as closed. An error code other than 'NO_ERROR' is reported with a 'peerError' event.
Connection.prototype._receiveGoaway = function _receiveGoaway(frame) {
  this._log.debug({ error: frame.error }, 'Other end closed the connection');

  this.push(null);
  this._closed = true;

  var peerReportedError = (frame.error !== 'NO_ERROR');
  if (peerReportedError) {
    this.emit('peerError', frame.error);
  }
};
557 // Flow control
558 // ------------
// Initializing flow control. It reads `this._streamIds[0]`, so stream management has to be
// initialized before it.
Connection.prototype._initializeFlowControl = function _initializeFlowControl() {
  // * Every stream announced with 'new_stream' starts with the current initial window size.
  //   The listener relies on being called with the connection as `this`.
  function applyInitialWindow(stream) {
    stream.upstream.setInitialWindow(this._initialStreamWindowSize);
  }

  this._initialStreamWindowSize = INITIAL_STREAM_WINDOW_SIZE;
  this.on('new_stream', applyInitialWindow);

  // * The peer's SETTINGS_INITIAL_WINDOW_SIZE setting changes the initial window of the streams
  this.on('RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE', this._setInitialStreamWindowSize);

  // * The stream 0 placeholder ignores initial window changes
  this._streamIds[0].upstream.setInitialWindow = function noop() {};
};
570 // The default initial flow control window size of a stream: 65535 bytes. It is the starting
// value of `_initialStreamWindowSize`, which is applied to every newly allocated stream.
571 var INITIAL_STREAM_WINDOW_SIZE = 65535;
// A SETTINGS frame can alter the initial flow control window size for all current streams. When
// the value of SETTINGS_INITIAL_WINDOW_SIZE changes, the new `size` is remembered for future
// streams and passed to `setInitialWindow` of the `upstream` of every registered stream. Once the
// initial window size is `Infinity`, changing it to any other value is a FLOW_CONTROL_ERROR.
Connection.prototype._setInitialStreamWindowSize = function _setInitialStreamWindowSize(size) {
  var flowControlIsOff = (this._initialStreamWindowSize === Infinity);

  if (flowControlIsOff && (size !== Infinity)) {
    this._log.error('Trying to manipulate initial flow control window size after flow control was turned off.');
    this.emit('error', 'FLOW_CONTROL_ERROR');
    return;
  }

  this._log.debug({ size: size }, 'Changing stream initial window size.');
  this._initialStreamWindowSize = size;

  function applyNewSize(stream) {
    stream.upstream.setInitialWindow(size);
  }
  this._streamIds.forEach(applyNewSize);
};