|
1 var assert = require('assert'); |
|
2 |
|
3 // The Stream class |
|
4 // ================ |
|
5 |
|
6 // Stream is a [Duplex stream](http://nodejs.org/api/stream.html#stream_class_stream_duplex) |
|
7 // subclass that implements the [HTTP/2 Stream](http://http2.github.io/http2-spec/#rfc.section.3.4) |
|
8 // concept. It has two 'sides': one that is used by the user to send/receive data (the `stream` |
|
9 // object itself) and one that is used by a Connection to read/write frames to/from the other peer |
|
10 // (`stream.upstream`). |
|
11 |
|
12 var Duplex = require('stream').Duplex; |
|
13 |
|
14 exports.Stream = Stream; |
|
15 |
|
16 // Public API |
|
17 // ---------- |
|
18 |
|
19 // * **new Stream(log)**: create a new Stream |
|
20 // |
|
21 // * **Event: 'headers' (headers)**: signals incoming headers |
|
22 // |
|
23 // * **Event: 'promise' (stream, headers)**: signals an incoming push promise |
|
24 // |
|
25 // * **Event: 'priority' (priority)**: signals a priority change. `priority` is a number between 0 |
|
26 // (highest priority) and 2^31-1 (lowest priority). Default value is 2^30. |
|
27 // |
|
28 // * **Event: 'error' (type)**: signals an error |
|
29 // |
|
30 // * **headers(headers)**: send headers |
|
31 // |
|
32 // * **promise(headers): Stream**: promise a stream |
|
33 // |
|
34 // * **priority(priority)**: set the priority of the stream. Priority can be changed by the peer |
|
35 // too, but once it is set locally, it can not be changed remotely. |
|
36 // |
|
37 // * **reset(error)**: reset the stream with an error code |
|
38 // |
|
39 // * **upstream**: a [Flow](flow.js) that is used by the parent connection to write/read frames |
|
40 // that are to be sent/arrived to/from the peer and are related to this stream. |
|
41 // |
|
42 // Headers are always in the [regular node.js header format][1]. |
|
43 // [1]: http://nodejs.org/api/http.html#http_message_headers |
|
44 |
|
45 // Constructor |
|
46 // ----------- |
|
47 |
|
48 // The main aspects of managing the stream are: |
|
49 function Stream(log) { |
|
50 Duplex.call(this); |
|
51 |
|
52 // * logging |
|
53 this._log = log.child({ component: 'stream', s: this }); |
|
54 |
|
55 // * receiving and sending stream management commands |
|
56 this._initializeManagement(); |
|
57 |
|
58 // * sending and receiving frames to/from the upstream connection |
|
59 this._initializeDataFlow(); |
|
60 |
|
61 // * maintaining the state of the stream (idle, open, closed, etc.) and error detection |
|
62 this._initializeState(); |
|
63 } |
|
64 |
|
65 Stream.prototype = Object.create(Duplex.prototype, { constructor: { value: Stream } }); |
|
66 |
|
67 // Managing the stream |
|
68 // ------------------- |
|
69 |
|
70 // the default stream priority is 2^30 |
|
71 var DEFAULT_PRIORITY = Math.pow(2, 30); |
|
72 var MAX_PRIORITY = Math.pow(2, 31) - 1; |
|
73 |
|
74 // PUSH_PROMISE and HEADERS are forwarded to the user through events. |
|
75 Stream.prototype._initializeManagement = function _initializeManagement() { |
|
76 this._resetSent = false; |
|
77 this._priority = DEFAULT_PRIORITY; |
|
78 this._letPeerPrioritize = true; |
|
79 }; |
|
80 |
|
81 Stream.prototype.promise = function promise(headers) { |
|
82 var stream = new Stream(this._log); |
|
83 stream._priority = Math.min(this._priority + 1, MAX_PRIORITY); |
|
84 this._pushUpstream({ |
|
85 type: 'PUSH_PROMISE', |
|
86 flags: {}, |
|
87 stream: this.id, |
|
88 promised_stream: stream, |
|
89 headers: headers |
|
90 }); |
|
91 return stream; |
|
92 }; |
|
93 |
|
94 Stream.prototype._onPromise = function _onPromise(frame) { |
|
95 this.emit('promise', frame.promised_stream, frame.headers); |
|
96 }; |
|
97 |
|
98 Stream.prototype.headers = function headers(headers) { |
|
99 this._pushUpstream({ |
|
100 type: 'HEADERS', |
|
101 flags: {}, |
|
102 stream: this.id, |
|
103 headers: headers |
|
104 }); |
|
105 }; |
|
106 |
|
107 Stream.prototype._onHeaders = function _onHeaders(frame) { |
|
108 if (frame.priority !== undefined) { |
|
109 this.priority(frame.priority, true); |
|
110 } |
|
111 this.emit('headers', frame.headers); |
|
112 }; |
|
113 |
|
114 Stream.prototype.priority = function priority(priority, peer) { |
|
115 if ((peer && this._letPeerPrioritize) || !peer) { |
|
116 if (!peer) { |
|
117 this._letPeerPrioritize = false; |
|
118 |
|
119 var lastFrame = this.upstream.getLastQueuedFrame(); |
|
120 if (lastFrame && ((lastFrame.type === 'HEADERS') || (lastFrame.type === 'PRIORITY'))) { |
|
121 lastFrame.priority = priority; |
|
122 } else { |
|
123 this._pushUpstream({ |
|
124 type: 'PRIORITY', |
|
125 flags: {}, |
|
126 stream: this.id, |
|
127 priority: priority |
|
128 }); |
|
129 } |
|
130 } |
|
131 |
|
132 this._log.debug({ priority: priority }, 'Changing priority'); |
|
133 this.emit('priority', priority); |
|
134 this._priority = priority; |
|
135 } |
|
136 }; |
|
137 |
|
138 Stream.prototype._onPriority = function _onPriority(frame) { |
|
139 this.priority(frame.priority, true); |
|
140 }; |
|
141 |
|
142 // Resetting the stream. Normally, an endpoint SHOULD NOT send more than one RST_STREAM frame for |
|
143 // any stream. |
|
144 Stream.prototype.reset = function reset(error) { |
|
145 if (!this._resetSent) { |
|
146 this._resetSent = true; |
|
147 this._pushUpstream({ |
|
148 type: 'RST_STREAM', |
|
149 flags: {}, |
|
150 stream: this.id, |
|
151 error: error |
|
152 }); |
|
153 } |
|
154 }; |
|
155 |
|
156 // Data flow |
|
157 // --------- |
|
158 |
|
159 // The incoming and the generated outgoing frames are received/transmitted on the `this.upstream` |
|
160 // [Flow](flow.html). The [Connection](connection.html) object instantiating the stream will read |
|
161 // and write frames to/from it. The stream itself is a regular [Duplex stream][1], and is used by |
|
162 // the user to write or read the body of the request. |
|
163 // [1]: http://nodejs.org/api/stream.html#stream_class_stream_duplex |
|
164 |
|
165 // upstream side stream user side |
|
166 // |
|
167 // +------------------------------------+ |
|
168 // | | |
|
169 // +------------------+ | |
|
170 // | upstream | | |
|
171 // | | | |
|
172 // +--+ | +--| |
|
173 // read() | | _send() | _write() | | write(buf) |
|
174 // <--------------|B |<--------------|--------------| B|<------------ |
|
175 // | | | | | |
|
176 // frames +--+ | +--| buffers |
|
177 // | | | | | |
|
178 // -------------->|B |---------------|------------->| B|------------> |
|
179 // write(frame) | | _receive() | _read() | | read() |
|
180 // +--+ | +--| |
|
181 // | | | |
|
182 // | | | |
|
183 // +------------------+ | |
|
184 // | | |
|
185 // +------------------------------------+ |
|
186 // |
|
187 // B: input or output buffer |
|
188 |
|
189 var Flow = require('./flow').Flow; |
|
190 |
|
191 Stream.prototype._initializeDataFlow = function _initializeDataFlow() { |
|
192 this.id = undefined; |
|
193 |
|
194 this._ended = false; |
|
195 |
|
196 this.upstream = new Flow(); |
|
197 this.upstream._log = this._log; |
|
198 this.upstream._send = this._send.bind(this); |
|
199 this.upstream._receive = this._receive.bind(this); |
|
200 this.upstream.write = this._writeUpstream.bind(this); |
|
201 this.upstream.on('error', this.emit.bind(this, 'error')); |
|
202 |
|
203 this.on('finish', this._finishing); |
|
204 }; |
|
205 |
|
206 Stream.prototype._pushUpstream = function _pushUpstream(frame) { |
|
207 this.upstream.push(frame); |
|
208 this._transition(true, frame); |
|
209 }; |
|
210 |
|
211 // Overriding the upstream's `write` allows us to act immediately instead of waiting for the input |
|
212 // queue to empty. This is important in case of control frames. |
|
213 Stream.prototype._writeUpstream = function _writeUpstream(frame) { |
|
214 this._log.debug({ frame: frame }, 'Receiving frame'); |
|
215 |
|
216 var moreNeeded = Flow.prototype.write.call(this.upstream, frame); |
|
217 |
|
218 // * Transition to a new state if that's the effect of receiving the frame |
|
219 this._transition(false, frame); |
|
220 |
|
221 // * If it's a control frame. Call the appropriate handler method. |
|
222 if (frame.type === 'HEADERS') { |
|
223 this._onHeaders(frame); |
|
224 } else if (frame.type === 'PUSH_PROMISE') { |
|
225 this._onPromise(frame); |
|
226 } else if (frame.type === 'PRIORITY') { |
|
227 this._onPriority(frame); |
|
228 } |
|
229 |
|
230 // * If it's an invalid stream level frame, emit error |
|
231 else if ((frame.type !== 'DATA') && |
|
232 (frame.type !== 'WINDOW_UPDATE') && |
|
233 (frame.type !== 'RST_STREAM')) { |
|
234 this._log.error({ frame: frame }, 'Invalid stream level frame'); |
|
235 this.emit('error', 'PROTOCOL_ERROR'); |
|
236 } |
|
237 |
|
238 return moreNeeded; |
|
239 }; |
|
240 |
|
241 // The `_receive` method (= `upstream._receive`) gets called when there's an incoming frame. |
|
242 Stream.prototype._receive = function _receive(frame, ready) { |
|
243 // * If it's a DATA frame, then push the payload into the output buffer on the other side. |
|
244 // Call ready when the other side is ready to receive more. |
|
245 if (!this._ended && (frame.type === 'DATA')) { |
|
246 var moreNeeded = this.push(frame.data); |
|
247 if (!moreNeeded) { |
|
248 this._receiveMore = ready; |
|
249 } |
|
250 } |
|
251 |
|
252 // * Any frame may signal the end of the stream with the END_STREAM flag |
|
253 if (!this._ended && (frame.flags.END_STREAM || (frame.type === 'RST_STREAM'))) { |
|
254 this.push(null); |
|
255 this._ended = true; |
|
256 } |
|
257 |
|
258 // * Postpone calling `ready` if `push()` returned a falsy value |
|
259 if (this._receiveMore !== ready) { |
|
260 ready(); |
|
261 } |
|
262 }; |
|
263 |
|
264 // The `_read` method is called when the user side is ready to receive more data. If there's a |
|
265 // pending write on the upstream, then call its pending ready callback to receive more frames. |
|
266 Stream.prototype._read = function _read() { |
|
267 if (this._receiveMore) { |
|
268 var receiveMore = this._receiveMore; |
|
269 delete this._receiveMore; |
|
270 receiveMore(); |
|
271 } |
|
272 }; |
|
273 |
|
274 // The `write` method gets called when there's a write request from the user. |
|
275 Stream.prototype._write = function _write(buffer, encoding, ready) { |
|
276 // * Chunking is done by the upstream Flow. |
|
277 var moreNeeded = this._pushUpstream({ |
|
278 type: 'DATA', |
|
279 flags: {}, |
|
280 stream: this.id, |
|
281 data: buffer |
|
282 }); |
|
283 |
|
284 // * Call ready when upstream is ready to receive more frames. |
|
285 if (moreNeeded) { |
|
286 ready(); |
|
287 } else { |
|
288 this._sendMore = ready; |
|
289 } |
|
290 }; |
|
291 |
|
292 // The `_send` (= `upstream._send`) method is called when upstream is ready to receive more frames. |
|
293 // If there's a pending write on the user side, then call its pending ready callback to receive more |
|
294 // writes. |
|
295 Stream.prototype._send = function _send() { |
|
296 if (this._sendMore) { |
|
297 var sendMore = this._sendMore; |
|
298 delete this._sendMore; |
|
299 sendMore(); |
|
300 } |
|
301 }; |
|
302 |
|
303 // When the stream is finishing (the user calls `end()` on it), then we have to set the `END_STREAM` |
|
304 // flag on the last frame. If there's no frame in the queue, or if it doesn't support this flag, |
|
305 // then we create a 0 length DATA frame. We could do this all the time, but putting the flag on an |
|
306 // existing frame is a nice optimization. |
|
307 var emptyBuffer = new Buffer(0); |
|
308 Stream.prototype._finishing = function _finishing() { |
|
309 var endFrame = { |
|
310 type: 'DATA', |
|
311 flags: { END_STREAM: true }, |
|
312 stream: this.id, |
|
313 data: emptyBuffer |
|
314 }; |
|
315 var lastFrame = this.upstream.getLastQueuedFrame(); |
|
316 if (lastFrame && ((lastFrame.type === 'DATA') || (lastFrame.type === 'HEADERS'))) { |
|
317 this._log.debug({ frame: lastFrame }, 'Marking last frame with END_STREAM flag.'); |
|
318 lastFrame.flags.END_STREAM = true; |
|
319 this._transition(true, endFrame); |
|
320 } else { |
|
321 this._pushUpstream(endFrame); |
|
322 } |
|
323 }; |
|
324 |
|
325 // [Stream States](http://tools.ietf.org/html/draft-ietf-httpbis-http2-10#section-5.1) |
|
326 // ---------------- |
|
327 // |
|
328 // +--------+ |
|
329 // PP | | PP |
|
330 // ,--------| idle |--------. |
|
331 // / | | \ |
|
332 // v +--------+ v |
|
333 // +----------+ | +----------+ |
|
334 // | | | H | | |
|
335 // ,---| reserved | | | reserved |---. |
|
336 // | | (local) | v | (remote) | | |
|
337 // | +----------+ +--------+ +----------+ | |
|
338 // | | ES | | ES | | |
|
339 // | | H ,-------| open |-------. | H | |
|
340 // | | / | | \ | | |
|
341 // | v v +--------+ v v | |
|
342 // | +----------+ | +----------+ | |
|
343 // | | half | | | half | | |
|
344 // | | closed | | R | closed | | |
|
345 // | | (remote) | | | (local) | | |
|
346 // | +----------+ | +----------+ | |
|
347 // | | v | | |
|
348 // | | ES / R +--------+ ES / R | | |
|
349 // | `----------->| |<-----------' | |
|
350 // | R | closed | R | |
|
351 // `-------------------->| |<--------------------' |
|
352 // +--------+ |
|
353 |
|
354 // Streams begin in the IDLE state and transitions happen when there's an incoming or outgoing frame |
|
355 Stream.prototype._initializeState = function _initializeState() { |
|
356 this.state = 'IDLE'; |
|
357 this._initiated = undefined; |
|
358 this._closedByUs = undefined; |
|
359 this._closedWithRst = undefined; |
|
360 }; |
|
361 |
|
362 // Only `_setState` should change `this.state` directly. It also logs the state change and notifies |
|
363 // interested parties using the 'state' event. |
|
364 Stream.prototype._setState = function transition(state) { |
|
365 assert(this.state !== state); |
|
366 this._log.debug({ from: this.state, to: state }, 'State transition'); |
|
367 this.state = state; |
|
368 this.emit('state', state); |
|
369 }; |
|
370 |
|
371 // A state is 'active' if the stream in that state counts towards the concurrency limit. Streams |
|
372 // that are in the "open" state, or either of the "half closed" states count toward this limit. |
|
373 function activeState(state) { |
|
374 return ((state === 'HALF_CLOSED_LOCAL') || (state === 'HALF_CLOSED_REMOTE') || (state === 'OPEN')); |
|
375 } |
|
376 |
|
377 // `_transition` is called every time there's an incoming or outgoing frame. It manages state |
|
378 // transitions, and detects stream errors. A stream error is always caused by a frame that is not |
|
379 // allowed in the current state. |
|
380 Stream.prototype._transition = function transition(sending, frame) { |
|
381 var receiving = !sending; |
|
382 var error = undefined; |
|
383 |
|
384 var DATA = false, HEADERS = false, PRIORITY = false; |
|
385 var RST_STREAM = false, PUSH_PROMISE = false, WINDOW_UPDATE = false; |
|
386 switch(frame.type) { |
|
387 case 'DATA' : DATA = true; break; |
|
388 case 'HEADERS' : HEADERS = true; break; |
|
389 case 'PRIORITY' : PRIORITY = true; break; |
|
390 case 'RST_STREAM' : RST_STREAM = true; break; |
|
391 case 'PUSH_PROMISE' : PUSH_PROMISE = true; break; |
|
392 case 'WINDOW_UPDATE': WINDOW_UPDATE = true; break; |
|
393 } |
|
394 |
|
395 var previousState = this.state; |
|
396 |
|
397 switch (this.state) { |
|
398 // All streams start in the **idle** state. In this state, no frames have been exchanged. |
|
399 // |
|
400 // * Sending or receiving a HEADERS frame causes the stream to become "open". |
|
401 // |
|
402 // When the HEADERS frame contains the END_STREAM flags, then two state transitions happen. |
|
403 case 'IDLE': |
|
404 if (HEADERS) { |
|
405 this._setState('OPEN'); |
|
406 if (frame.flags.END_STREAM) { |
|
407 this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE'); |
|
408 } |
|
409 this._initiated = sending; |
|
410 } else if (sending && RST_STREAM) { |
|
411 this._setState('CLOSED'); |
|
412 } else { |
|
413 error = 'PROTOCOL_ERROR'; |
|
414 } |
|
415 break; |
|
416 |
|
417 // A stream in the **reserved (local)** state is one that has been promised by sending a |
|
418 // PUSH_PROMISE frame. |
|
419 // |
|
420 // * The endpoint can send a HEADERS frame. This causes the stream to open in a "half closed |
|
421 // (remote)" state. |
|
422 // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This |
|
423 // releases the stream reservation. |
|
424 // * An endpoint may receive PRIORITY frame in this state. |
|
425 // * An endpoint MUST NOT send any other type of frame in this state. |
|
426 case 'RESERVED_LOCAL': |
|
427 if (sending && HEADERS) { |
|
428 this._setState('HALF_CLOSED_REMOTE'); |
|
429 } else if (RST_STREAM) { |
|
430 this._setState('CLOSED'); |
|
431 } else if (receiving && PRIORITY) { |
|
432 /* No state change */ |
|
433 } else { |
|
434 error = 'PROTOCOL_ERROR'; |
|
435 } |
|
436 break; |
|
437 |
|
438 // A stream in the **reserved (remote)** state has been reserved by a remote peer. |
|
439 // |
|
440 // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This |
|
441 // releases the stream reservation. |
|
442 // * Receiving a HEADERS frame causes the stream to transition to "half closed (local)". |
|
443 // * An endpoint MAY send PRIORITY frames in this state to reprioritize the stream. |
|
444 // * Receiving any other type of frame MUST be treated as a stream error of type PROTOCOL_ERROR. |
|
445 case 'RESERVED_REMOTE': |
|
446 if (RST_STREAM) { |
|
447 this._setState('CLOSED'); |
|
448 } else if (receiving && HEADERS) { |
|
449 this._setState('HALF_CLOSED_LOCAL'); |
|
450 } else if (sending && PRIORITY) { |
|
451 /* No state change */ |
|
452 } else { |
|
453 error = 'PROTOCOL_ERROR'; |
|
454 } |
|
455 break; |
|
456 |
|
457 // The **open** state is where both peers can send frames. In this state, sending peers observe |
|
458 // advertised stream level flow control limits. |
|
459 // |
|
460 // * From this state either endpoint can send a frame with a END_STREAM flag set, which causes |
|
461 // the stream to transition into one of the "half closed" states: an endpoint sending a |
|
462 // END_STREAM flag causes the stream state to become "half closed (local)"; an endpoint |
|
463 // receiving a END_STREAM flag causes the stream state to become "half closed (remote)". |
|
464 // * Either endpoint can send a RST_STREAM frame from this state, causing it to transition |
|
465 // immediately to "closed". |
|
466 case 'OPEN': |
|
467 if (frame.flags.END_STREAM) { |
|
468 this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE'); |
|
469 } else if (RST_STREAM) { |
|
470 this._setState('CLOSED'); |
|
471 } else { |
|
472 /* No state change */ |
|
473 } |
|
474 break; |
|
475 |
|
476 // A stream that is **half closed (local)** cannot be used for sending frames. |
|
477 // |
|
478 // * A stream transitions from this state to "closed" when a frame that contains a END_STREAM |
|
479 // flag is received, or when either peer sends a RST_STREAM frame. |
|
480 // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream. |
|
481 // * WINDOW_UPDATE can be sent by a peer that has sent a frame bearing the END_STREAM flag. |
|
482 case 'HALF_CLOSED_LOCAL': |
|
483 if (RST_STREAM || (receiving && frame.flags.END_STREAM)) { |
|
484 this._setState('CLOSED'); |
|
485 } else if (receiving || (sending && (PRIORITY || WINDOW_UPDATE))) { |
|
486 /* No state change */ |
|
487 } else { |
|
488 error = 'PROTOCOL_ERROR'; |
|
489 } |
|
490 break; |
|
491 |
|
492 // A stream that is **half closed (remote)** is no longer being used by the peer to send frames. |
|
493 // In this state, an endpoint is no longer obligated to maintain a receiver flow control window |
|
494 // if it performs flow control. |
|
495 // |
|
496 // * If an endpoint receives additional frames for a stream that is in this state it MUST |
|
497 // respond with a stream error of type STREAM_CLOSED. |
|
498 // * A stream can transition from this state to "closed" by sending a frame that contains a |
|
499 // END_STREAM flag, or when either peer sends a RST_STREAM frame. |
|
500 // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream. |
|
501 // * A receiver MAY receive a WINDOW_UPDATE frame on a "half closed (remote)" stream. |
|
502 case 'HALF_CLOSED_REMOTE': |
|
503 if (RST_STREAM || (sending && frame.flags.END_STREAM)) { |
|
504 this._setState('CLOSED'); |
|
505 } else if (sending || (receiving && (WINDOW_UPDATE || PRIORITY))) { |
|
506 /* No state change */ |
|
507 } else { |
|
508 error = 'PROTOCOL_ERROR'; |
|
509 } |
|
510 break; |
|
511 |
|
512 // The **closed** state is the terminal state. |
|
513 // |
|
514 // * An endpoint MUST NOT send frames on a closed stream. An endpoint that receives a frame |
|
515 // after receiving a RST_STREAM or a frame containing a END_STREAM flag on that stream MUST |
|
516 // treat that as a stream error of type STREAM_CLOSED. |
|
517 // * WINDOW_UPDATE, PRIORITY or RST_STREAM frames can be received in this state for a short |
|
518 // period after a frame containing an END_STREAM flag is sent. Until the remote peer receives |
|
519 // and processes the frame bearing the END_STREAM flag, it might send either frame type. |
|
520 // Endpoints MUST ignore WINDOW_UPDATE frames received in this state, though endpoints MAY |
|
521 // choose to treat WINDOW_UPDATE frames that arrive a significant time after sending |
|
522 // END_STREAM as a connection error of type PROTOCOL_ERROR. |
|
523 // * If this state is reached as a result of sending a RST_STREAM frame, the peer that receives |
|
524 // the RST_STREAM might have already sent - or enqueued for sending - frames on the stream |
|
525 // that cannot be withdrawn. An endpoint that sends a RST_STREAM frame MUST ignore frames that |
|
526 // it receives on closed streams after it has sent a RST_STREAM frame. An endpoint MAY choose |
|
527 // to limit the period over which it ignores frames and treat frames that arrive after this |
|
528 // time as being in error. |
|
529 // * An endpoint might receive a PUSH_PROMISE frame after it sends RST_STREAM. PUSH_PROMISE |
|
530 // causes a stream to become "reserved". If promised streams are not desired, a RST_STREAM |
|
531 // can be used to close any of those streams. |
|
532 case 'CLOSED': |
|
533 if ((sending && RST_STREAM) || |
|
534 (receiving && this._closedByUs && |
|
535 (this._closedWithRst || WINDOW_UPDATE || PRIORITY || RST_STREAM))) { |
|
536 /* No state change */ |
|
537 } else { |
|
538 error = 'STREAM_CLOSED'; |
|
539 } |
|
540 break; |
|
541 } |
|
542 |
|
543 // Noting that the connection was closed by the other endpoint. It may be important in edge cases. |
|
544 // For example, when the peer tries to cancel a promised stream, but we already sent every data |
|
545 // on it, then the stream is in CLOSED state, yet we want to ignore the incoming RST_STREAM. |
|
546 if ((this.state === 'CLOSED') && (previousState !== 'CLOSED')) { |
|
547 this._closedByUs = sending; |
|
548 this._closedWithRst = RST_STREAM; |
|
549 } |
|
550 |
|
551 // Sending/receiving a PUSH_PROMISE |
|
552 // |
|
553 // * Sending a PUSH_PROMISE frame marks the associated stream for later use. The stream state |
|
554 // for the reserved stream transitions to "reserved (local)". |
|
555 // * Receiving a PUSH_PROMISE frame marks the associated stream as reserved by the remote peer. |
|
556 // The state of the stream becomes "reserved (remote)". |
|
557 if (PUSH_PROMISE && !error) { |
|
558 /* This assertion must hold, because _transition is called immediately when a frame is written |
|
559 to the stream. If it would be called when a frame gets out of the input queue, the state |
|
560 of the reserved could have been changed by then. */ |
|
561 assert(frame.promised_stream.state === 'IDLE', frame.promised_stream.state); |
|
562 frame.promised_stream._setState(sending ? 'RESERVED_LOCAL' : 'RESERVED_REMOTE'); |
|
563 frame.promised_stream._initiated = sending; |
|
564 } |
|
565 |
|
566 // Signaling how sending/receiving this frame changes the active stream count (-1, 0 or +1) |
|
567 if (this._initiated) { |
|
568 var change = (activeState(this.state) - activeState(previousState)); |
|
569 if (sending) { |
|
570 frame.count_change = change; |
|
571 } else { |
|
572 frame.count_change(change); |
|
573 } |
|
574 } else if (sending) { |
|
575 frame.count_change = 0; |
|
576 } |
|
577 |
|
578 // Common error handling. |
|
579 if (error) { |
|
580 var info = { |
|
581 error: error, |
|
582 frame: frame, |
|
583 state: this.state, |
|
584 closedByUs: this._closedByUs, |
|
585 closedWithRst: this._closedWithRst |
|
586 }; |
|
587 |
|
588 // * When sending something invalid, throwing an exception, since it is probably a bug. |
|
589 if (sending) { |
|
590 this._log.error(info, 'Sending illegal frame.'); |
|
591 throw new Error('Sending illegal frame (' + frame.type + ') in ' + this.state + ' state.'); |
|
592 } |
|
593 |
|
594 // * When receiving something invalid, sending an RST_STREAM using the `reset` method. |
|
595 // This will automatically cause a transition to the CLOSED state. |
|
596 else { |
|
597 this._log.error(info, 'Received illegal frame.'); |
|
598 this.emit('error', error); |
|
599 } |
|
600 } |
|
601 }; |
|
602 |
|
603 // Bunyan serializers |
|
604 // ------------------ |
|
605 |
|
606 exports.serializers = {}; |
|
607 |
|
608 var nextId = 0; |
|
609 exports.serializers.s = function(stream) { |
|
610 if (!('_id' in stream)) { |
|
611 stream._id = nextId; |
|
612 nextId += 1; |
|
613 } |
|
614 return stream._id; |
|
615 }; |