var assert = require('assert');

// The Stream class
// ================

// Stream is a [Duplex stream](http://nodejs.org/api/stream.html#stream_class_stream_duplex)
// subclass that implements the [HTTP/2 Stream](http://http2.github.io/http2-spec/#rfc.section.3.4)
// concept. It has two 'sides': one that is used by the user to send/receive data (the `stream`
// object itself) and one that is used by a Connection to read/write frames to/from the other peer
// (`stream.upstream`).

var Duplex = require('stream').Duplex;

exports.Stream = Stream;

// Public API
// ----------

// * **new Stream(log)**: create a new Stream
//
// * **Event: 'headers' (headers)**: signals incoming headers
//
// * **Event: 'promise' (stream, headers)**: signals an incoming push promise
//
// * **Event: 'priority' (priority)**: signals a priority change. `priority` is a number between 0
//   (highest priority) and 2^31-1 (lowest priority). The default value is 2^30.
//
// * **Event: 'error' (type)**: signals an error
//
// * **headers(headers)**: send headers
//
// * **promise(headers): Stream**: promise a stream
//
// * **priority(priority)**: set the priority of the stream. Priority can be changed by the peer
//   too, but once it is set locally, it cannot be changed remotely.
//
// * **reset(error)**: reset the stream with an error code
//
// * **upstream**: a [Flow](flow.js) that is used by the parent connection to write/read frames
//   that are to be sent to, or have arrived from, the peer and are related to this stream.
//
// Headers are always in the [regular node.js header format][1].
// [1]: http://nodejs.org/api/http.html#http_message_headers

// Constructor
// -----------

// The main aspects of managing the stream are:
function Stream(log) {
  Duplex.call(this);

  // * logging
  this._log = log.child({ component: 'stream', s: this });

  // * receiving and sending stream management commands
  this._initializeManagement();

  // * sending and receiving frames to/from the upstream connection
  this._initializeDataFlow();

  // * maintaining the state of the stream (idle, open, closed, etc.) and error detection
  this._initializeState();
}

Stream.prototype = Object.create(Duplex.prototype, { constructor: { value: Stream } });

// Managing the stream
// -------------------

// The default stream priority is 2^30.
var DEFAULT_PRIORITY = Math.pow(2, 30);
var MAX_PRIORITY = Math.pow(2, 31) - 1;

// PUSH_PROMISE and HEADERS are forwarded to the user through events.
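// For instance, a typical user of the stream might interact with it like this (an illustrative
// sketch only, not part of the module; `log` is assumed to be a bunyan-style logger, and the
// stream is assumed to be attached to a connection that assigns `stream.id` and drains
// `stream.upstream`):
//
//     var stream = new Stream(log);
//     stream.on('headers', function(headers) {
//       console.log('incoming headers', headers);
//     });
//     stream.headers({ ':method': 'GET', ':path': '/' });  // send our own headers
//     stream.end();                                         // no body; sends END_STREAM
//     stream.pipe(process.stdout);                          // incoming DATA is read here
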
Stream.prototype._initializeManagement = function _initializeManagement() {
  this._resetSent = false;
  this._priority = DEFAULT_PRIORITY;
  this._letPeerPrioritize = true;
};

Stream.prototype.promise = function promise(headers) {
  var stream = new Stream(this._log);
  stream._priority = Math.min(this._priority + 1, MAX_PRIORITY);
  this._pushUpstream({
    type: 'PUSH_PROMISE',
    flags: {},
    stream: this.id,
    promised_stream: stream,
    headers: headers
  });
  return stream;
};

Stream.prototype._onPromise = function _onPromise(frame) {
  this.emit('promise', frame.promised_stream, frame.headers);
};

Stream.prototype.headers = function headers(headers) {
  this._pushUpstream({
    type: 'HEADERS',
    flags: {},
    stream: this.id,
    headers: headers
  });
};

Stream.prototype._onHeaders = function _onHeaders(frame) {
  if (frame.priority !== undefined) {
    this.priority(frame.priority, true);
  }
  this.emit('headers', frame.headers);
};

Stream.prototype.priority = function priority(priority, peer) {
  if ((peer && this._letPeerPrioritize) || !peer) {
    if (!peer) {
      this._letPeerPrioritize = false;

      var lastFrame = this.upstream.getLastQueuedFrame();
      if (lastFrame && ((lastFrame.type === 'HEADERS') || (lastFrame.type === 'PRIORITY'))) {
        lastFrame.priority = priority;
      } else {
        this._pushUpstream({
          type: 'PRIORITY',
          flags: {},
          stream: this.id,
          priority: priority
        });
      }
    }

    this._log.debug({ priority: priority }, 'Changing priority');
    this.emit('priority', priority);
    this._priority = priority;
  }
};

Stream.prototype._onPriority = function _onPriority(frame) {
  this.priority(frame.priority, true);
};

// Resetting the stream. Normally, an endpoint SHOULD NOT send more than one RST_STREAM frame for
// any stream.
Stream.prototype.reset = function reset(error) {
  if (!this._resetSent) {
    this._resetSent = true;
    this._pushUpstream({
      type: 'RST_STREAM',
      flags: {},
      stream: this.id,
      error: error
    });
  }
};
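// As an illustrative sketch of the promise/priority API above (not part of the module; `response`
// is assumed to be an already open server-side Stream, and `cssBuffer` a Buffer holding the
// pushed resource):
//
//     var pushed = response.promise({ ':method': 'GET', ':path': '/style.css' });
//     pushed.headers({ ':status': '200' });
//     pushed.end(cssBuffer);                // body of the promised response
//
//     response.priority(Math.pow(2, 20));   // a smaller value means a higher priority
//     // pushed.reset('CANCEL');            // alternatively, a promised stream can be abandoned
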
// Data flow
// ---------

// The incoming and the generated outgoing frames are received/transmitted on the `this.upstream`
// [Flow](flow.html). The [Connection](connection.html) object instantiating the stream will read
// and write frames to/from it. The stream itself is a regular [Duplex stream][1], and is used by
// the user to write or read the body of the request.
// [1]: http://nodejs.org/api/stream.html#stream_class_stream_duplex

//     upstream side                   stream                   user side
//
//                      +-------------------------------------+
//                      |                                     |
//                      +------------------+                  |
//                      |     upstream     |                  |
//                      |                  |                  |
//                   +--+                  |                  +--+
//            read() |  |     _send()      |     _write()     |  |  write(buf)
//     <-------------|B |<-----------------|------------------| B|<------------
//                   |  |                  |                  |  |
//            frames +--+                  |                  +--+  buffers
//                   |  |                  |                  |  |
//     ------------->|B |----------------->|----------------->| B|------------>
//      write(frame) |  |    _receive()    |     _read()      |  |  read()
//                   +--+                  |                  +--+
//                      |                  |                  |
//                      |                  |                  |
//                      +------------------+                  |
//                      |                                     |
//                      +-------------------------------------+
//
//     B: input or output buffer

var Flow = require('./flow').Flow;

Stream.prototype._initializeDataFlow = function _initializeDataFlow() {
  this.id = undefined;

  this._ended = false;

  this.upstream = new Flow();
  this.upstream._log = this._log;
  this.upstream._send = this._send.bind(this);
  this.upstream._receive = this._receive.bind(this);
  this.upstream.write = this._writeUpstream.bind(this);
  this.upstream.on('error', this.emit.bind(this, 'error'));

  this.on('finish', this._finishing);
};

Stream.prototype._pushUpstream = function _pushUpstream(frame) {
  this.upstream.push(frame);
  this._transition(true, frame);
};

// Overriding the upstream's `write` allows us to act immediately instead of waiting for the input
// queue to empty. This is important in case of control frames.
Stream.prototype._writeUpstream = function _writeUpstream(frame) {
  this._log.debug({ frame: frame }, 'Receiving frame');

  var moreNeeded = Flow.prototype.write.call(this.upstream, frame);

  // * Transition to a new state if that's the effect of receiving the frame
  this._transition(false, frame);

  // * If it's a control frame, call the appropriate handler method.
  if (frame.type === 'HEADERS') {
    this._onHeaders(frame);
  } else if (frame.type === 'PUSH_PROMISE') {
    this._onPromise(frame);
  } else if (frame.type === 'PRIORITY') {
    this._onPriority(frame);
  }

  // * If it's an invalid stream level frame, emit an error
  else if ((frame.type !== 'DATA') &&
           (frame.type !== 'WINDOW_UPDATE') &&
           (frame.type !== 'RST_STREAM')) {
    this._log.error({ frame: frame }, 'Invalid stream level frame');
    this.emit('error', 'PROTOCOL_ERROR');
  }

  return moreNeeded;
};
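// From the connection's point of view, `upstream` is used roughly like this (an informal sketch,
// not the actual Connection code; `incomingFrame` stands for a parsed frame that belongs to this
// stream):
//
//     stream.upstream.write(incomingFrame);        // control frames take effect immediately
//     var outgoingFrame = stream.upstream.read();  // frame to be serialized and sent, or null
//     if (outgoingFrame) {
//       // hand the frame to the framer / socket
//     }
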
// The `_receive` method (= `upstream._receive`) gets called when there's an incoming frame.
Stream.prototype._receive = function _receive(frame, ready) {
  // * If it's a DATA frame, then push the payload into the output buffer on the other side.
  //   Call `ready` when the other side is ready to receive more.
  if (!this._ended && (frame.type === 'DATA')) {
    var moreNeeded = this.push(frame.data);
    if (!moreNeeded) {
      this._receiveMore = ready;
    }
  }

  // * Any frame may signal the end of the stream with the END_STREAM flag
  if (!this._ended && (frame.flags.END_STREAM || (frame.type === 'RST_STREAM'))) {
    this.push(null);
    this._ended = true;
  }

  // * Postpone calling `ready` if `push()` returned a falsy value
  if (this._receiveMore !== ready) {
    ready();
  }
};

// The `_read` method is called when the user side is ready to receive more data. If there's a
// pending write on the upstream, then call its pending ready callback to receive more frames.
Stream.prototype._read = function _read() {
  if (this._receiveMore) {
    var receiveMore = this._receiveMore;
    delete this._receiveMore;
    receiveMore();
  }
};

// The `_write` method gets called when there's a write request from the user.
Stream.prototype._write = function _write(buffer, encoding, ready) {
  // * Chunking is done by the upstream Flow.
  var moreNeeded = this._pushUpstream({
    type: 'DATA',
    flags: {},
    stream: this.id,
    data: buffer
  });

  // * Call `ready` when upstream is ready to receive more frames.
  if (moreNeeded) {
    ready();
  } else {
    this._sendMore = ready;
  }
};

// The `_send` (= `upstream._send`) method is called when upstream is ready to receive more frames.
// If there's a pending write on the user side, then call its pending ready callback to receive
// more writes.
Stream.prototype._send = function _send() {
  if (this._sendMore) {
    var sendMore = this._sendMore;
    delete this._sendMore;
    sendMore();
  }
};

// When the stream is finishing (the user calls `end()` on it), then we have to set the `END_STREAM`
// flag on the last frame. If there's no frame in the queue, or if it doesn't support this flag,
// then we create a 0 length DATA frame. We could do this all the time, but putting the flag on an
// existing frame is a nice optimization.
var emptyBuffer = new Buffer(0);
Stream.prototype._finishing = function _finishing() {
  var endFrame = {
    type: 'DATA',
    flags: { END_STREAM: true },
    stream: this.id,
    data: emptyBuffer
  };
  var lastFrame = this.upstream.getLastQueuedFrame();
  if (lastFrame && ((lastFrame.type === 'DATA') || (lastFrame.type === 'HEADERS'))) {
    this._log.debug({ frame: lastFrame }, 'Marking last frame with END_STREAM flag.');
    lastFrame.flags.END_STREAM = true;
    this._transition(true, endFrame);
  } else {
    this._pushUpstream(endFrame);
  }
};
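// To illustrate the write path above (an informal sketch, not part of the module; `log` is an
// assumed bunyan-style logger): data written on the Duplex side becomes DATA frames on `upstream`,
// and ending the stream sets END_STREAM on the last queued frame whenever possible.
//
//     var stream = new Stream(log);
//     stream.headers({ ':status': '200' });
//     stream.write('Hello, ');
//     stream.end('world!');   // the last queued DATA frame is flagged with END_STREAM
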
// [Stream States](http://tools.ietf.org/html/draft-ietf-httpbis-http2-10#section-5.1)
// ----------------
//
//                         +--------+
//                   PP    |        |    PP
//                ,--------|  idle  |--------.
//               /         |        |         \
//              v          +--------+          v
//       +----------+          |           +----------+
//       |          |          | H         |          |
//   ,---| reserved |          |           | reserved |---.
//   |   | (local)  |          v           | (remote) |   |
//   |   +----------+      +--------+      +----------+   |
//   |       |          ES |        | ES          |       |
//   |       | H    ,------|  open  |------.      | H     |
//   |       |     /       |        |       \     |       |
//   |       v    v        +--------+        v    v       |
//   |   +----------+          |           +----------+   |
//   |   |   half   |          |           |   half   |   |
//   |   |  closed  |          | R         |  closed  |   |
//   |   | (remote) |          |           |  (local) |   |
//   |   +----------+          |           +----------+   |
//   |        |                v                 |        |
//   |        |  ES / R    +--------+    ES / R  |        |
//   |        `----------->|        |<-----------'        |
//   |           R         | closed |         R           |
//   `-------------------->|        |<--------------------'
//                         +--------+

// Streams begin in the IDLE state, and transitions happen when there's an incoming or outgoing
// frame.
Stream.prototype._initializeState = function _initializeState() {
  this.state = 'IDLE';
  this._initiated = undefined;
  this._closedByUs = undefined;
  this._closedWithRst = undefined;
};

// Only `_setState` should change `this.state` directly. It also logs the state change and notifies
// interested parties using the 'state' event.
Stream.prototype._setState = function _setState(state) {
  assert(this.state !== state);
  this._log.debug({ from: this.state, to: state }, 'State transition');
  this.state = state;
  this.emit('state', state);
};

// A state is 'active' if the stream in that state counts towards the concurrency limit. Streams
// that are in the "open" state, or in either of the "half closed" states, count toward this limit.
function activeState(state) {
  return ((state === 'HALF_CLOSED_LOCAL') || (state === 'HALF_CLOSED_REMOTE') || (state === 'OPEN'));
}

// `_transition` is called every time there's an incoming or outgoing frame. It manages state
// transitions, and detects stream errors. A stream error is always caused by a frame that is not
// allowed in the current state.
Stream.prototype._transition = function transition(sending, frame) {
  var receiving = !sending;
  var error = undefined;

  var DATA = false, HEADERS = false, PRIORITY = false;
  var RST_STREAM = false, PUSH_PROMISE = false, WINDOW_UPDATE = false;
  switch (frame.type) {
    case 'DATA'         : DATA          = true; break;
    case 'HEADERS'      : HEADERS       = true; break;
    case 'PRIORITY'     : PRIORITY      = true; break;
    case 'RST_STREAM'   : RST_STREAM    = true; break;
    case 'PUSH_PROMISE' : PUSH_PROMISE  = true; break;
    case 'WINDOW_UPDATE': WINDOW_UPDATE = true; break;
  }

  var previousState = this.state;

  switch (this.state) {
    // All streams start in the **idle** state. In this state, no frames have been exchanged.
    //
    // * Sending or receiving a HEADERS frame causes the stream to become "open".
    //
    // When the HEADERS frame contains the END_STREAM flag, then two state transitions happen.
    case 'IDLE':
      if (HEADERS) {
        this._setState('OPEN');
        if (frame.flags.END_STREAM) {
          this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE');
        }
        this._initiated = sending;
      } else if (sending && RST_STREAM) {
        this._setState('CLOSED');
      } else {
        error = 'PROTOCOL_ERROR';
      }
      break;

    // A stream in the **reserved (local)** state is one that has been promised by sending a
    // PUSH_PROMISE frame.
    //
    // * The endpoint can send a HEADERS frame. This causes the stream to open in a "half closed
    //   (remote)" state.
    // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This
    //   releases the stream reservation.
    // * An endpoint may receive a PRIORITY frame in this state.
    // * An endpoint MUST NOT send any other type of frame in this state.
    case 'RESERVED_LOCAL':
      if (sending && HEADERS) {
        this._setState('HALF_CLOSED_REMOTE');
      } else if (RST_STREAM) {
        this._setState('CLOSED');
      } else if (receiving && PRIORITY) {
        /* No state change */
      } else {
        error = 'PROTOCOL_ERROR';
      }
      break;

    // A stream in the **reserved (remote)** state has been reserved by a remote peer.
    //
    // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This
    //   releases the stream reservation.
    // * Receiving a HEADERS frame causes the stream to transition to "half closed (local)".
    // * An endpoint MAY send PRIORITY frames in this state to reprioritize the stream.
    // * Receiving any other type of frame MUST be treated as a stream error of type PROTOCOL_ERROR.
    case 'RESERVED_REMOTE':
      if (RST_STREAM) {
        this._setState('CLOSED');
      } else if (receiving && HEADERS) {
        this._setState('HALF_CLOSED_LOCAL');
      } else if (sending && PRIORITY) {
        /* No state change */
      } else {
        error = 'PROTOCOL_ERROR';
      }
      break;

    // The **open** state is where both peers can send frames. In this state, sending peers observe
    // advertised stream level flow control limits.
    //
    // * From this state either endpoint can send a frame with an END_STREAM flag set, which causes
    //   the stream to transition into one of the "half closed" states: an endpoint sending an
    //   END_STREAM flag causes the stream state to become "half closed (local)"; an endpoint
    //   receiving an END_STREAM flag causes the stream state to become "half closed (remote)".
    // * Either endpoint can send a RST_STREAM frame from this state, causing it to transition
    //   immediately to "closed".
    case 'OPEN':
      if (frame.flags.END_STREAM) {
        this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE');
      } else if (RST_STREAM) {
        this._setState('CLOSED');
      } else {
        /* No state change */
      }
      break;

    // A stream that is **half closed (local)** cannot be used for sending frames.
    //
    // * A stream transitions from this state to "closed" when a frame that contains an END_STREAM
    //   flag is received, or when either peer sends a RST_STREAM frame.
    // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream.
    // * WINDOW_UPDATE can be sent by a peer that has sent a frame bearing the END_STREAM flag.
    case 'HALF_CLOSED_LOCAL':
      if (RST_STREAM || (receiving && frame.flags.END_STREAM)) {
        this._setState('CLOSED');
      } else if (receiving || (sending && (PRIORITY || WINDOW_UPDATE))) {
        /* No state change */
      } else {
        error = 'PROTOCOL_ERROR';
      }
      break;

    // A stream that is **half closed (remote)** is no longer being used by the peer to send frames.
    // In this state, an endpoint is no longer obligated to maintain a receiver flow control window
    // if it performs flow control.
    //
    // * If an endpoint receives additional frames for a stream that is in this state it MUST
    //   respond with a stream error of type STREAM_CLOSED.
    // * A stream can transition from this state to "closed" by sending a frame that contains an
    //   END_STREAM flag, or when either peer sends a RST_STREAM frame.
    // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream.
    // * A receiver MAY receive a WINDOW_UPDATE frame on a "half closed (remote)" stream.
    case 'HALF_CLOSED_REMOTE':
      if (RST_STREAM || (sending && frame.flags.END_STREAM)) {
        this._setState('CLOSED');
      } else if (sending || (receiving && (WINDOW_UPDATE || PRIORITY))) {
        /* No state change */
      } else {
        error = 'PROTOCOL_ERROR';
      }
      break;

    // The **closed** state is the terminal state.
    //
    // * An endpoint MUST NOT send frames on a closed stream. An endpoint that receives a frame
    //   after receiving a RST_STREAM or a frame containing an END_STREAM flag on that stream MUST
    //   treat that as a stream error of type STREAM_CLOSED.
    // * WINDOW_UPDATE, PRIORITY or RST_STREAM frames can be received in this state for a short
    //   period after a frame containing an END_STREAM flag is sent. Until the remote peer receives
    //   and processes the frame bearing the END_STREAM flag, it might send either frame type.
    //   Endpoints MUST ignore WINDOW_UPDATE frames received in this state, though endpoints MAY
    //   choose to treat WINDOW_UPDATE frames that arrive a significant time after sending
    //   END_STREAM as a connection error of type PROTOCOL_ERROR.
    // * If this state is reached as a result of sending a RST_STREAM frame, the peer that receives
    //   the RST_STREAM might have already sent - or enqueued for sending - frames on the stream
    //   that cannot be withdrawn. An endpoint that sends a RST_STREAM frame MUST ignore frames that
    //   it receives on closed streams after it has sent a RST_STREAM frame. An endpoint MAY choose
    //   to limit the period over which it ignores frames and treat frames that arrive after this
    //   time as being in error.
    // * An endpoint might receive a PUSH_PROMISE frame after it sends RST_STREAM.
    //   PUSH_PROMISE causes a stream to become "reserved". If promised streams are not desired, a
    //   RST_STREAM can be used to close any of those streams.
    case 'CLOSED':
      if ((sending && RST_STREAM) ||
          (receiving && this._closedByUs &&
           (this._closedWithRst || WINDOW_UPDATE || PRIORITY || RST_STREAM))) {
        /* No state change */
      } else {
        error = 'STREAM_CLOSED';
      }
      break;
  }

  // Noting whether it was us who closed the stream, and whether it was closed with RST_STREAM.
  // This may be important in edge cases. For example, when the peer tries to cancel a promised
  // stream, but we have already sent all the data on it, the stream is in the CLOSED state, yet we
  // want to ignore the incoming RST_STREAM.
  if ((this.state === 'CLOSED') && (previousState !== 'CLOSED')) {
    this._closedByUs = sending;
    this._closedWithRst = RST_STREAM;
  }

  // Sending/receiving a PUSH_PROMISE
  //
  // * Sending a PUSH_PROMISE frame marks the associated stream for later use. The stream state
  //   for the reserved stream transitions to "reserved (local)".
  // * Receiving a PUSH_PROMISE frame marks the associated stream as reserved by the remote peer.
  //   The state of the stream becomes "reserved (remote)".
  if (PUSH_PROMISE && !error) {
    /* This assertion must hold, because _transition is called immediately when a frame is written
       to the stream. If it were called when a frame gets out of the input queue, the state of the
       reserved stream could have changed by then. */
    assert(frame.promised_stream.state === 'IDLE', frame.promised_stream.state);
    frame.promised_stream._setState(sending ? 'RESERVED_LOCAL' : 'RESERVED_REMOTE');
    frame.promised_stream._initiated = sending;
  }

  // Signaling how sending/receiving this frame changes the active stream count (-1, 0 or +1)
  if (this._initiated) {
    var change = (activeState(this.state) - activeState(previousState));
    if (sending) {
      frame.count_change = change;
    } else {
      frame.count_change(change);
    }
  } else if (sending) {
    frame.count_change = 0;
  }

  // Common error handling.
  if (error) {
    var info = {
      error: error,
      frame: frame,
      state: this.state,
      closedByUs: this._closedByUs,
      closedWithRst: this._closedWithRst
    };

    // * When sending something invalid, throwing an exception, since it is probably a bug.
    if (sending) {
      this._log.error(info, 'Sending illegal frame.');
      throw new Error('Sending illegal frame (' + frame.type + ') in ' + this.state + ' state.');
    }

    // * When receiving something invalid, signaling the error with an 'error' event. Resetting the
    //   stream with an RST_STREAM in response will cause a transition to the CLOSED state.
    else {
      this._log.error(info, 'Received illegal frame.');
      this.emit('error', error);
    }
  }
};
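// For a client-initiated GET with no request body, the transitions above play out as
// IDLE -> OPEN -> HALF_CLOSED_LOCAL (HEADERS with END_STREAM sent) and finally CLOSED (response
// finished by the peer). They can be observed through the 'state' event (illustrative sketch;
// `stream` is assumed to be an existing Stream instance):
//
//     stream.on('state', function(state) {
//       console.log('stream state is now', state);
//     });
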
// Bunyan serializers
// ------------------

exports.serializers = {};

var nextId = 0;
exports.serializers.s = function(stream) {
  if (!('_id' in stream)) {
    stream._id = nextId;
    nextId += 1;
  }
  return stream._id;
};
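// As a usage sketch (illustrative only; assumes bunyan is installed and that this module is
// required as `./stream`), the serializer lets log entries refer to streams by a short numeric id
// instead of dumping the whole object:
//
//     var bunyan = require('bunyan');
//     var stream2 = require('./stream');    // this module, path assumed
//     var log = bunyan.createLogger({
//       name: 'stream-demo',
//       serializers: stream2.serializers
//     });
//     var stream = new stream2.Stream(log); // the `s` field bound in the constructor is logged as a number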