testing/xpcshell/node-http2/node_modules/http2-protocol/lib/stream.js

Fri, 16 Jan 2015 18:13:44 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Fri, 16 Jan 2015 18:13:44 +0100
branch
TOR_BUG_9701
changeset 14
925c144e1f1f
permissions
-rw-r--r--

Integrate suggestion from review to improve consistency with existing code.

michael@0 1 var assert = require('assert');
michael@0 2
michael@0 3 // The Stream class
michael@0 4 // ================
michael@0 5
michael@0 6 // Stream is a [Duplex stream](http://nodejs.org/api/stream.html#stream_class_stream_duplex)
michael@0 7 // subclass that implements the [HTTP/2 Stream](http://http2.github.io/http2-spec/#rfc.section.3.4)
michael@0 8 // concept. It has two 'sides': one that is used by the user to send/receive data (the `stream`
michael@0 9 // object itself) and one that is used by a Connection to read/write frames to/from the other peer
michael@0 10 // (`stream.upstream`).
michael@0 11
michael@0 12 var Duplex = require('stream').Duplex;
michael@0 13
michael@0 14 exports.Stream = Stream;
michael@0 15
michael@0 16 // Public API
michael@0 17 // ----------
michael@0 18
michael@0 19 // * **new Stream(log)**: create a new Stream
michael@0 20 //
michael@0 21 // * **Event: 'headers' (headers)**: signals incoming headers
michael@0 22 //
michael@0 23 // * **Event: 'promise' (stream, headers)**: signals an incoming push promise
michael@0 24 //
michael@0 25 // * **Event: 'priority' (priority)**: signals a priority change. `priority` is a number between 0
michael@0 26 // (highest priority) and 2^31-1 (lowest priority). Default value is 2^30.
michael@0 27 //
michael@0 28 // * **Event: 'error' (type)**: signals an error
michael@0 29 //
michael@0 30 // * **headers(headers)**: send headers
michael@0 31 //
michael@0 32 // * **promise(headers): Stream**: promise a stream
michael@0 33 //
michael@0 34 // * **priority(priority)**: set the priority of the stream. Priority can be changed by the peer
michael@0 35 // too, but once it is set locally, it can not be changed remotely.
michael@0 36 //
michael@0 37 // * **reset(error)**: reset the stream with an error code
michael@0 38 //
michael@0 39 // * **upstream**: a [Flow](flow.js) that is used by the parent connection to write/read frames
michael@0 40 // that have arrived from, or are to be sent to, the peer and are related to this stream.
michael@0 41 //
michael@0 42 // Headers are always in the [regular node.js header format][1].
michael@0 43 // [1]: http://nodejs.org/api/http.html#http_message_headers
michael@0 44
michael@0 45 // Constructor
michael@0 46 // -----------
michael@0 47
// Creates a stream. `log` must offer a `child(fields)` method; the logger it returns is kept
// as `this._log`. Setting up a stream consists of these steps, in this order:
function Stream(log) {
  Duplex.call(this);

  // * logging: every record is tagged with the component name and with the stream itself
  var logFields = { component: 'stream', s: this };
  this._log = log.child(logFields);

  // * receiving and sending stream management commands
  this._initializeManagement();

  // * sending and receiving frames to/from the upstream connection
  this._initializeDataFlow();

  // * maintaining the state of the stream (idle, open, closed, etc.) and error detection
  this._initializeState();
}

// Stream inherits from Duplex. `constructor` is restored as a non-enumerable property.
Stream.prototype = Object.create(Duplex.prototype, {
  constructor: { value: Stream }
});
michael@0 66
michael@0 67 // Managing the stream
michael@0 68 // -------------------
michael@0 69
// Priority bounds: the default stream priority is 2^30, the largest (i.e. lowest priority)
// value is 2^31 - 1.
var DEFAULT_PRIORITY = 1 << 30;
var MAX_PRIORITY = (DEFAULT_PRIORITY * 2) - 1;
michael@0 73
michael@0 74 // PUSH_PROMISE and HEADERS are forwarded to the user through events.
// Sets up the fields used by the stream management commands (`reset` and `priority`).
Stream.prototype._initializeManagement = function _initializeManagement() {
  // * no RST_STREAM has been sent yet (`reset` sends at most one)
  this._resetSent = false;

  // * the stream starts with the default priority...
  this._priority = DEFAULT_PRIORITY;

  // * ...and the peer may change it until `priority()` is called locally
  this._letPeerPrioritize = true;
};
michael@0 80
// Promises a new stream to the peer by queueing a PUSH_PROMISE frame carrying `headers` on
// this stream. Returns the promised Stream.
Stream.prototype.promise = function promise(headers) {
  // The promised stream reuses this stream's logger and gets a priority value one above this
  // stream's (i.e. a slightly lower priority), capped at MAX_PRIORITY.
  var promised = new Stream(this._log);
  promised._priority = Math.min(this._priority + 1, MAX_PRIORITY);

  var promiseFrame = {
    type: 'PUSH_PROMISE',
    flags: {},
    stream: this.id,
    promised_stream: promised,
    headers: headers
  };
  this._pushUpstream(promiseFrame);

  return promised;
};
michael@0 93
// Handles an incoming PUSH_PROMISE frame: the promised stream and the headers of the frame
// are handed to the user with a 'promise' event.
Stream.prototype._onPromise = function _onPromise(frame) {
  var promised = frame.promised_stream;
  var promisedHeaders = frame.headers;
  this.emit('promise', promised, promisedHeaders);
};
michael@0 97
// Sends `headers` to the peer by queueing a HEADERS frame (without any flag) on this stream.
Stream.prototype.headers = function headers(headers) {
  var headersFrame = {
    type: 'HEADERS',
    flags: {},
    stream: this.id,
    headers: headers
  };
  this._pushUpstream(headersFrame);
};
michael@0 106
// Handles an incoming HEADERS frame. A priority carried by the frame is applied first, as a
// change requested by the peer; then the headers are handed to the user with a 'headers' event.
Stream.prototype._onHeaders = function _onHeaders(frame) {
  var requestedPriority = frame.priority;
  if (requestedPriority !== undefined) {
    this.priority(requestedPriority, true);
  }

  this.emit('headers', frame.headers);
};
michael@0 113
// Changes the priority of the stream.
//
// * `priority`: the new priority value
// * `peer`: truthy when the change was requested by the peer, falsy when it is a local request
//
// A local request is announced to the peer and disables any later change by the peer.
Stream.prototype.priority = function priority(priority, peer) {
  // * A change requested by the peer is ignored once the priority has been set locally
  if (peer && !this._letPeerPrioritize) {
    return;
  }

  if (!peer) {
    this._letPeerPrioritize = false;

    // * If the last queued frame is a HEADERS or PRIORITY frame, the new value travels on it,
    //   otherwise a dedicated PRIORITY frame is queued
    var queued = this.upstream.getLastQueuedFrame();
    var canCarryPriority = queued && ((queued.type === 'HEADERS') || (queued.type === 'PRIORITY'));
    if (canCarryPriority) {
      queued.priority = priority;
    } else {
      this._pushUpstream({
        type: 'PRIORITY',
        flags: {},
        stream: this.id,
        priority: priority
      });
    }
  }

  // * The 'priority' event is emitted before the new value is stored in `_priority`
  this._log.debug({ priority: priority }, 'Changing priority');
  this.emit('priority', priority);
  this._priority = priority;
};
michael@0 137
// Handles an incoming PRIORITY frame: applies its priority as a change requested by the peer.
Stream.prototype._onPriority = function _onPriority(frame) {
  var requestedByPeer = true;
  this.priority(frame.priority, requestedByPeer);
};
michael@0 141
// Resets the stream by queueing an RST_STREAM frame carrying the `error` code. An endpoint
// SHOULD NOT send more than one RST_STREAM frame for any stream, so every call after the first
// one is a no-op.
Stream.prototype.reset = function reset(error) {
  if (this._resetSent) {
    return;
  }

  this._resetSent = true;
  var resetFrame = {
    type: 'RST_STREAM',
    flags: {},
    stream: this.id,
    error: error
  };
  this._pushUpstream(resetFrame);
};
michael@0 155
michael@0 156 // Data flow
michael@0 157 // ---------
michael@0 158
michael@0 159 // The incoming and the generated outgoing frames are received/transmitted on the `this.upstream`
michael@0 160 // [Flow](flow.html). The [Connection](connection.html) object instantiating the stream will read
michael@0 161 // and write frames to/from it. The stream itself is a regular [Duplex stream][1], and is used by
michael@0 162 // the user to write or read the body of the request.
michael@0 163 // [1]: http://nodejs.org/api/stream.html#stream_class_stream_duplex
michael@0 164
michael@0 165 // upstream side stream user side
michael@0 166 //
michael@0 167 // +------------------------------------+
michael@0 168 // | |
michael@0 169 // +------------------+ |
michael@0 170 // | upstream | |
michael@0 171 // | | |
michael@0 172 // +--+ | +--|
michael@0 173 // read() | | _send() | _write() | | write(buf)
michael@0 174 // <--------------|B |<--------------|--------------| B|<------------
michael@0 175 // | | | | |
michael@0 176 // frames +--+ | +--| buffers
michael@0 177 // | | | | |
michael@0 178 // -------------->|B |---------------|------------->| B|------------>
michael@0 179 // write(frame) | | _receive() | _read() | | read()
michael@0 180 // +--+ | +--|
michael@0 181 // | | |
michael@0 182 // | | |
michael@0 183 // +------------------+ |
michael@0 184 // | |
michael@0 185 // +------------------------------------+
michael@0 186 //
michael@0 187 // B: input or output buffer
michael@0 188
michael@0 189 var Flow = require('./flow').Flow;
michael@0 190
// Creates the upstream Flow and wires it to this stream.
Stream.prototype._initializeDataFlow = function _initializeDataFlow() {
  // * The id is not known at this point (presumably it is assigned later by the owner of the
  //   stream - nothing in this file sets it)
  this.id = undefined;

  // * Becomes true once the end of the incoming data was signaled to the user (see `_receive`)
  this._ended = false;

  // * The upstream shares the logger of the stream, its hooks are served by the stream, and
  //   its `write` is replaced so that incoming frames are processed immediately
  var flow = new Flow();
  this.upstream = flow;
  flow._log = this._log;
  flow._send = this._send.bind(this);
  flow._receive = this._receive.bind(this);
  flow.write = this._writeUpstream.bind(this);

  // * Errors of the upstream are re-emitted on the stream
  flow.on('error', this.emit.bind(this, 'error'));

  // * When the user ends the stream, the END_STREAM flag has to be sent
  this.on('finish', this._finishing);
};
michael@0 205
// Queues an outgoing `frame` on the upstream flow, then runs the state machine for it (which
// throws if the frame must not be sent in the current state).
//
// Returns what `upstream.push()` returned, because `_write` uses the result of this method to
// decide whether it can accept the next write right away. Presumably the value is truthy when
// the upstream wants more frames - confirm against Flow's `push`.
Stream.prototype._pushUpstream = function _pushUpstream(frame) {
  var moreNeeded = this.upstream.push(frame);
  this._transition(true, frame);
  return moreNeeded;
};
michael@0 210
// Replacement for the `write` method of the upstream. Overriding it allows us to act
// immediately instead of waiting for the input queue to empty, which is important in case of
// control frames. Returns whatever Flow's own `write` returned for the frame.
Stream.prototype._writeUpstream = function _writeUpstream(frame) {
  this._log.debug({ frame: frame }, 'Receiving frame');

  // * Hand the frame over to the original `write` of Flow first
  var moreNeeded = Flow.prototype.write.call(this.upstream, frame);

  // * Transition to a new state if that's the effect of receiving the frame
  this._transition(false, frame);

  switch (frame.type) {
    // * If it's a control frame, call the appropriate handler method
    case 'HEADERS':
      this._onHeaders(frame);
      break;
    case 'PUSH_PROMISE':
      this._onPromise(frame);
      break;
    case 'PRIORITY':
      this._onPriority(frame);
      break;

    // * These stream level frames need no handler here
    case 'DATA':
    case 'WINDOW_UPDATE':
    case 'RST_STREAM':
      break;

    // * If it's an invalid stream level frame, emit error
    default:
      this._log.error({ frame: frame }, 'Invalid stream level frame');
      this.emit('error', 'PROTOCOL_ERROR');
  }

  return moreNeeded;
};
michael@0 240
// The `_receive` method (= `upstream._receive`) gets called when there's an incoming frame.
// `ready` is the callback that signals that the next frame can be delivered.
Stream.prototype._receive = function _receive(frame, ready) {
  // * The payload of a DATA frame goes into the output buffer on the user side. When `push()`
  //   returns a falsy value, `ready` is parked until the user side asks for more (see `_read`).
  var isData = (frame.type === 'DATA');
  if (isData && !this._ended) {
    if (!this.push(frame.data)) {
      this._receiveMore = ready;
    }
  }

  // * Any frame may signal the end of the stream with the END_STREAM flag, and so does an
  //   RST_STREAM frame. The end is signaled to the user only once.
  if (!this._ended) {
    var endsStream = frame.flags.END_STREAM || (frame.type === 'RST_STREAM');
    if (endsStream) {
      this.push(null);
      this._ended = true;
    }
  }

  // * Call `ready` right away unless this very callback is the one that has been parked
  if (this._receiveMore !== ready) {
    ready();
  }
};
michael@0 263
// The `_read` method is called when the user side is ready to receive more data. If `_receive`
// parked a ready callback, it is removed from the stream and then called, so that the upstream
// delivers more frames.
Stream.prototype._read = function _read() {
  var parkedReady = this._receiveMore;
  if (!parkedReady) {
    return;
  }

  delete this._receiveMore;
  parkedReady();
};
michael@0 273
// The `_write` method gets called when there's a write request from the user. `buffer` becomes
// the payload of a DATA frame, `encoding` is not used, and `ready` is the callback that lets
// the user side continue writing.
Stream.prototype._write = function _write(buffer, encoding, ready) {
  // * Chunking is done by the upstream Flow.
  var dataFrame = {
    type: 'DATA',
    flags: {},
    stream: this.id,
    data: buffer
  };
  var moreNeeded = this._pushUpstream(dataFrame);

  // * When upstream is not ready to receive more frames, park `ready` until `_send` runs
  if (!moreNeeded) {
    this._sendMore = ready;
    return;
  }

  // * Otherwise the next write can come right away
  ready();
};
michael@0 291
// The `_send` (= `upstream._send`) method is called when upstream is ready to receive more
// frames. If `_write` parked a ready callback, it is removed from the stream and then called,
// so that the user side can continue writing.
Stream.prototype._send = function _send() {
  var parkedReady = this._sendMore;
  if (!parkedReady) {
    return;
  }

  delete this._sendMore;
  parkedReady();
};
michael@0 302
// When the stream is finishing (the user calls `end()` on it), then we have to set the
// `END_STREAM` flag on the last frame. If there's no frame in the queue, or if it doesn't
// support this flag, then we create a 0 length DATA frame. We could do this all the time, but
// putting the flag on an existing frame is a nice optimization.

// The shared, zero length payload of the closing DATA frames. The `new Buffer(size)`
// constructor is deprecated in node.js, so `Buffer.alloc` is used wherever the runtime has it;
// the old form is only a fallback for old runtimes.
var emptyBuffer = (typeof Buffer.alloc === 'function') ? Buffer.alloc(0) : new Buffer(0);

Stream.prototype._finishing = function _finishing() {
  var endFrame = {
    type: 'DATA',
    flags: { END_STREAM: true },
    stream: this.id,
    data: emptyBuffer
  };

  var lastFrame = this.upstream.getLastQueuedFrame();
  if (lastFrame && ((lastFrame.type === 'DATA') || (lastFrame.type === 'HEADERS'))) {
    // * The flag goes on the queued frame. `endFrame` is not sent in this case, it is only
    //   used to run the state machine as if a frame with END_STREAM had been sent.
    this._log.debug({ frame: lastFrame }, 'Marking last frame with END_STREAM flag.');
    lastFrame.flags.END_STREAM = true;
    this._transition(true, endFrame);
  } else {
    // * There is nothing to put the flag on: send the empty DATA frame
    this._pushUpstream(endFrame);
  }
};
michael@0 324
michael@0 325 // [Stream States](http://tools.ietf.org/html/draft-ietf-httpbis-http2-10#section-5.1)
michael@0 326 // ----------------
michael@0 327 //
michael@0 328 // +--------+
michael@0 329 // PP | | PP
michael@0 330 // ,--------| idle |--------.
michael@0 331 // / | | \
michael@0 332 // v +--------+ v
michael@0 333 // +----------+ | +----------+
michael@0 334 // | | | H | |
michael@0 335 // ,---| reserved | | | reserved |---.
michael@0 336 // | | (local) | v | (remote) | |
michael@0 337 // | +----------+ +--------+ +----------+ |
michael@0 338 // | | ES | | ES | |
michael@0 339 // | | H ,-------| open |-------. | H |
michael@0 340 // | | / | | \ | |
michael@0 341 // | v v +--------+ v v |
michael@0 342 // | +----------+ | +----------+ |
michael@0 343 // | | half | | | half | |
michael@0 344 // | | closed | | R | closed | |
michael@0 345 // | | (remote) | | | (local) | |
michael@0 346 // | +----------+ | +----------+ |
michael@0 347 // | | v | |
michael@0 348 // | | ES / R +--------+ ES / R | |
michael@0 349 // | `----------->| |<-----------' |
michael@0 350 // | R | closed | R |
michael@0 351 // `-------------------->| |<--------------------'
michael@0 352 // +--------+
michael@0 353
// Streams begin in the IDLE state and transitions happen when there's an incoming or outgoing
// frame. The remaining fields are bookkeeping that `_transition` fills in later.
Stream.prototype._initializeState = function _initializeState() {
  this.state = 'IDLE';

  // * whether the frame that opened or reserved the stream was a sent one
  this._initiated = undefined;

  // * whether the frame that closed the stream was a sent one
  this._closedByUs = undefined;

  // * whether the frame that closed the stream was an RST_STREAM
  this._closedWithRst = undefined;
};
michael@0 361
// Only `_setState` should change `this.state` directly. It also logs the state change and
// notifies interested parties using the 'state' event. Setting the state the stream is already
// in is a programming error and trips the assertion.
//
// The function expression carries the name of the method (like every other method in this
// file), so that stack traces point to `_setState`.
Stream.prototype._setState = function _setState(state) {
  assert(this.state !== state);
  this._log.debug({ from: this.state, to: state }, 'State transition');
  this.state = state;
  this.emit('state', state);
};
michael@0 370
// A state is 'active' if the stream in that state counts towards the concurrency limit: these
// are the "open" state and both of the "half closed" states. Returns a boolean.
function activeState(state) {
  switch (state) {
    case 'OPEN':
    case 'HALF_CLOSED_LOCAL':
    case 'HALF_CLOSED_REMOTE':
      return true;
    default:
      return false;
  }
}
michael@0 376
// `_transition` is called every time there's an incoming or outgoing frame. It manages state
// transitions, and detects stream errors. A stream error is always caused by a frame that is not
// allowed in the current state.
//
// * `sending`: true for an outgoing frame, false for a frame received from the peer
// * `frame`: the frame. Its `type` and `flags` drive the state machine. The `promised_stream`
//   of a legal PUSH_PROMISE frame is moved to one of the reserved states. The change of the
//   active stream count is reported through `frame.count_change`: it is assigned on outgoing
//   frames, and called as a function on incoming frames of streams with a truthy `_initiated`.
//
// An illegal outgoing frame makes the method throw an Error, an illegal incoming frame is
// reported with an 'error' event.
Stream.prototype._transition = function _transition(sending, frame) {
  var receiving = !sending;
  var error; // stays undefined as long as the frame is legal in the current state

  // The frame types that the state machine tells apart. Every other type (DATA included)
  // leaves all of these false.
  var frameType = frame.type;
  var HEADERS = (frameType === 'HEADERS');
  var PRIORITY = (frameType === 'PRIORITY');
  var RST_STREAM = (frameType === 'RST_STREAM');
  var PUSH_PROMISE = (frameType === 'PUSH_PROMISE');
  var WINDOW_UPDATE = (frameType === 'WINDOW_UPDATE');

  var previousState = this.state;

  switch (this.state) {
    // All streams start in the **idle** state. In this state, no frames have been exchanged.
    //
    // * Sending or receiving a HEADERS frame causes the stream to become "open".
    //
    // When the HEADERS frame contains the END_STREAM flags, then two state transitions happen.
    case 'IDLE':
      if (HEADERS) {
        this._setState('OPEN');
        if (frame.flags.END_STREAM) {
          this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE');
        }
        this._initiated = sending;
      } else if (sending && RST_STREAM) {
        this._setState('CLOSED');
      } else {
        error = 'PROTOCOL_ERROR';
      }
      break;

    // A stream in the **reserved (local)** state is one that has been promised by sending a
    // PUSH_PROMISE frame.
    //
    // * The endpoint can send a HEADERS frame. This causes the stream to open in a "half closed
    //   (remote)" state.
    // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This
    //   releases the stream reservation.
    // * An endpoint may receive PRIORITY frame in this state.
    // * An endpoint MUST NOT send any other type of frame in this state.
    case 'RESERVED_LOCAL':
      if (sending && HEADERS) {
        this._setState('HALF_CLOSED_REMOTE');
      } else if (RST_STREAM) {
        this._setState('CLOSED');
      } else if (receiving && PRIORITY) {
        /* No state change */
      } else {
        error = 'PROTOCOL_ERROR';
      }
      break;

    // A stream in the **reserved (remote)** state has been reserved by a remote peer.
    //
    // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This
    //   releases the stream reservation.
    // * Receiving a HEADERS frame causes the stream to transition to "half closed (local)".
    // * An endpoint MAY send PRIORITY frames in this state to reprioritize the stream.
    // * Receiving any other type of frame MUST be treated as a stream error of type
    //   PROTOCOL_ERROR.
    case 'RESERVED_REMOTE':
      if (RST_STREAM) {
        this._setState('CLOSED');
      } else if (receiving && HEADERS) {
        this._setState('HALF_CLOSED_LOCAL');
      } else if (sending && PRIORITY) {
        /* No state change */
      } else {
        error = 'PROTOCOL_ERROR';
      }
      break;

    // The **open** state is where both peers can send frames. In this state, sending peers
    // observe advertised stream level flow control limits.
    //
    // * From this state either endpoint can send a frame with a END_STREAM flag set, which causes
    //   the stream to transition into one of the "half closed" states: an endpoint sending a
    //   END_STREAM flag causes the stream state to become "half closed (local)"; an endpoint
    //   receiving a END_STREAM flag causes the stream state to become "half closed (remote)".
    // * Either endpoint can send a RST_STREAM frame from this state, causing it to transition
    //   immediately to "closed".
    case 'OPEN':
      if (frame.flags.END_STREAM) {
        this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE');
      } else if (RST_STREAM) {
        this._setState('CLOSED');
      } else {
        /* No state change */
      }
      break;

    // A stream that is **half closed (local)** cannot be used for sending frames.
    //
    // * A stream transitions from this state to "closed" when a frame that contains a END_STREAM
    //   flag is received, or when either peer sends a RST_STREAM frame.
    // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream.
    // * WINDOW_UPDATE can be sent by a peer that has sent a frame bearing the END_STREAM flag.
    case 'HALF_CLOSED_LOCAL':
      if (RST_STREAM || (receiving && frame.flags.END_STREAM)) {
        this._setState('CLOSED');
      } else if (receiving || (sending && (PRIORITY || WINDOW_UPDATE))) {
        /* No state change */
      } else {
        error = 'PROTOCOL_ERROR';
      }
      break;

    // A stream that is **half closed (remote)** is no longer being used by the peer to send
    // frames. In this state, an endpoint is no longer obligated to maintain a receiver flow
    // control window if it performs flow control.
    //
    // * If an endpoint receives additional frames for a stream that is in this state it MUST
    //   respond with a stream error of type STREAM_CLOSED.
    // * A stream can transition from this state to "closed" by sending a frame that contains a
    //   END_STREAM flag, or when either peer sends a RST_STREAM frame.
    // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream.
    // * A receiver MAY receive a WINDOW_UPDATE frame on a "half closed (remote)" stream.
    case 'HALF_CLOSED_REMOTE':
      if (RST_STREAM || (sending && frame.flags.END_STREAM)) {
        this._setState('CLOSED');
      } else if (sending || (receiving && (WINDOW_UPDATE || PRIORITY))) {
        /* No state change */
      } else {
        error = 'PROTOCOL_ERROR';
      }
      break;

    // The **closed** state is the terminal state.
    //
    // * An endpoint MUST NOT send frames on a closed stream. An endpoint that receives a frame
    //   after receiving a RST_STREAM or a frame containing a END_STREAM flag on that stream MUST
    //   treat that as a stream error of type STREAM_CLOSED.
    // * WINDOW_UPDATE, PRIORITY or RST_STREAM frames can be received in this state for a short
    //   period after a frame containing an END_STREAM flag is sent. Until the remote peer
    //   receives and processes the frame bearing the END_STREAM flag, it might send either frame
    //   type. Endpoints MUST ignore WINDOW_UPDATE frames received in this state, though endpoints
    //   MAY choose to treat WINDOW_UPDATE frames that arrive a significant time after sending
    //   END_STREAM as a connection error of type PROTOCOL_ERROR.
    // * If this state is reached as a result of sending a RST_STREAM frame, the peer that
    //   receives the RST_STREAM might have already sent - or enqueued for sending - frames on the
    //   stream that cannot be withdrawn. An endpoint that sends a RST_STREAM frame MUST ignore
    //   frames that it receives on closed streams after it has sent a RST_STREAM frame. An
    //   endpoint MAY choose to limit the period over which it ignores frames and treat frames
    //   that arrive after this time as being in error.
    // * An endpoint might receive a PUSH_PROMISE frame after it sends RST_STREAM. PUSH_PROMISE
    //   causes a stream to become "reserved". If promised streams are not desired, a RST_STREAM
    //   can be used to close any of those streams.
    case 'CLOSED':
      if ((sending && RST_STREAM) ||
          (receiving && this._closedByUs &&
           (this._closedWithRst || WINDOW_UPDATE || PRIORITY || RST_STREAM))) {
        /* No state change */
      } else {
        error = 'STREAM_CLOSED';
      }
      break;
  }

  // Noting which endpoint closed the stream, and whether it was closed with an RST_STREAM. It
  // may be important in edge cases. For example, when the peer tries to cancel a promised
  // stream, but we already sent every data on it, then the stream is in CLOSED state, yet we
  // want to ignore the incoming RST_STREAM.
  if ((this.state === 'CLOSED') && (previousState !== 'CLOSED')) {
    this._closedByUs = sending;
    this._closedWithRst = RST_STREAM;
  }

  // Sending/receiving a PUSH_PROMISE
  //
  // * Sending a PUSH_PROMISE frame marks the associated stream for later use. The stream state
  //   for the reserved stream transitions to "reserved (local)".
  // * Receiving a PUSH_PROMISE frame marks the associated stream as reserved by the remote peer.
  //   The state of the stream becomes "reserved (remote)".
  if (PUSH_PROMISE && !error) {
    /* This assertion must hold, because _transition is called immediately when a frame is written
       to the stream. If it would be called when a frame gets out of the input queue, the state
       of the reserved could have been changed by then. */
    assert(frame.promised_stream.state === 'IDLE', frame.promised_stream.state);
    frame.promised_stream._setState(sending ? 'RESERVED_LOCAL' : 'RESERVED_REMOTE');
    frame.promised_stream._initiated = sending;
  }

  // Signaling how sending/receiving this frame changes the active stream count (-1, 0 or +1).
  // The booleans returned by `activeState` are converted to numbers by the subtraction.
  if (this._initiated) {
    var change = (activeState(this.state) - activeState(previousState));
    if (sending) {
      frame.count_change = change;
    } else {
      frame.count_change(change);
    }
  } else if (sending) {
    frame.count_change = 0;
  }

  // Common error handling.
  if (error) {
    var info = {
      error: error,
      frame: frame,
      state: this.state,
      closedByUs: this._closedByUs,
      closedWithRst: this._closedWithRst
    };

    // * When sending something invalid, throwing an exception, since it is probably a bug.
    if (sending) {
      this._log.error(info, 'Sending illegal frame.');
      throw new Error('Sending illegal frame (' + frame.type + ') in ' + this.state + ' state.');
    }

    // * When receiving something invalid, the error is logged and signaled with an 'error'
    //   event. No RST_STREAM is sent from here: reacting to the error is left to the listeners
    //   of the event.
    else {
      this._log.error(info, 'Received illegal frame.');
      this.emit('error', error);
    }
  }
};
michael@0 602
michael@0 603 // Bunyan serializers
michael@0 604 // ------------------
michael@0 605
exports.serializers = {};

// The `s` serializer represents a stream with a number. The number is assigned when the stream
// is serialized for the first time (counting up from 0 across all streams) and is remembered
// in the `_id` property of the stream.
var nextId = 0;
exports.serializers.s = function(stream) {
  if ('_id' in stream) {
    return stream._id;
  }

  stream._id = nextId;
  nextId += 1;
  return stream._id;
};

mercurial