Thu, 15 Jan 2015 21:03:48 +0100
Integrate friendly tips from Tor colleagues to make (or not) 4.5 alpha 3;
This includes removal of overloaded (but unused) methods, and addition of
an overlooked call to DataStruct::SetData(nsISupports, uint32_t, bool).
michael@0 | 1 | var assert = require('assert'); |
michael@0 | 2 | |
michael@0 | 3 | // The Stream class |
michael@0 | 4 | // ================ |
michael@0 | 5 | |
michael@0 | 6 | // Stream is a [Duplex stream](http://nodejs.org/api/stream.html#stream_class_stream_duplex) |
michael@0 | 7 | // subclass that implements the [HTTP/2 Stream](http://http2.github.io/http2-spec/#rfc.section.3.4) |
michael@0 | 8 | // concept. It has two 'sides': one that is used by the user to send/receive data (the `stream` |
michael@0 | 9 | // object itself) and one that is used by a Connection to read/write frames to/from the other peer |
michael@0 | 10 | // (`stream.upstream`). |
michael@0 | 11 | |
michael@0 | 12 | var Duplex = require('stream').Duplex; |
michael@0 | 13 | |
michael@0 | 14 | exports.Stream = Stream; |
michael@0 | 15 | |
michael@0 | 16 | // Public API |
michael@0 | 17 | // ---------- |
michael@0 | 18 | |
michael@0 | 19 | // * **new Stream(log)**: create a new Stream |
michael@0 | 20 | // |
michael@0 | 21 | // * **Event: 'headers' (headers)**: signals incoming headers |
michael@0 | 22 | // |
michael@0 | 23 | // * **Event: 'promise' (stream, headers)**: signals an incoming push promise |
michael@0 | 24 | // |
michael@0 | 25 | // * **Event: 'priority' (priority)**: signals a priority change. `priority` is a number between 0 |
michael@0 | 26 | // (highest priority) and 2^31-1 (lowest priority). Default value is 2^30. |
michael@0 | 27 | // |
michael@0 | 28 | // * **Event: 'error' (type)**: signals an error |
michael@0 | 29 | // |
michael@0 | 30 | // * **headers(headers)**: send headers |
michael@0 | 31 | // |
michael@0 | 32 | // * **promise(headers): Stream**: promise a stream |
michael@0 | 33 | // |
michael@0 | 34 | // * **priority(priority)**: set the priority of the stream. Priority can be changed by the peer |
michael@0 | 35 | // too, but once it is set locally, it can not be changed remotely. |
michael@0 | 36 | // |
michael@0 | 37 | // * **reset(error)**: reset the stream with an error code |
michael@0 | 38 | // |
michael@0 | 39 | // * **upstream**: a [Flow](flow.js) that is used by the parent connection to write/read frames |
michael@0 | 40 | // that are to be sent/arrived to/from the peer and are related to this stream. |
michael@0 | 41 | // |
michael@0 | 42 | // Headers are always in the [regular node.js header format][1]. |
michael@0 | 43 | // [1]: http://nodejs.org/api/http.html#http_message_headers |
michael@0 | 44 | |
michael@0 | 45 | // Constructor |
michael@0 | 46 | // ----------- |
michael@0 | 47 | |
// The main aspects of managing the stream are:
function Stream(log) {
  Duplex.call(this);

  // * logging: a child logger tagged with the component name and with the stream itself (`s`)
  var loggerFields = { component: 'stream', s: this };
  this._log = log.child(loggerFields);

  // * receiving and sending stream management commands
  this._initializeManagement();

  // * sending and receiving frames to/from the upstream connection
  this._initializeDataFlow();

  // * maintaining the state of the stream (idle, open, closed, etc.) and error detection
  this._initializeState();
}

// Stream inherits from Duplex. The property descriptor restores `constructor` on the fresh
// prototype object (as a non-enumerable property, which is the default for descriptors).
Stream.prototype = Object.create(Duplex.prototype, {
  constructor: { value: Stream }
});
michael@0 | 66 | |
michael@0 | 67 | // Managing the stream |
michael@0 | 68 | // ------------------- |
michael@0 | 69 | |
// The default stream priority is 2^30. The largest priority value (which, per the public API
// notes, means the lowest priority) is 2^31-1.
var DEFAULT_PRIORITY = 0x40000000;
var MAX_PRIORITY = 0x7fffffff;

// PUSH_PROMISE and HEADERS are forwarded to the user through events.
//
// `_initializeManagement` sets up the bookkeeping used by the management methods:
//
// * `_resetSent`: whether an RST_STREAM has already been sent by `reset()`
// * `_priority`: the current priority of the stream
// * `_letPeerPrioritize`: whether priority changes coming from the peer are still accepted
Stream.prototype._initializeManagement = function _initializeManagement() {
  this._resetSent = false;
  this._priority = DEFAULT_PRIORITY;
  this._letPeerPrioritize = true;
};
michael@0 | 80 | |
// Promising a stream: creates a new Stream that shares this stream's logger, gives it a priority
// value one greater than this stream's (capped at MAX_PRIORITY), announces it in a PUSH_PROMISE
// frame pushed upstream on this stream, and returns the promised stream.
Stream.prototype.promise = function promise(headers) {
  var promised = new Stream(this._log);
  promised._priority = Math.min(this._priority + 1, MAX_PRIORITY);

  var pushPromiseFrame = {
    type: 'PUSH_PROMISE',
    flags: {},
    stream: this.id,
    promised_stream: promised,
    headers: headers
  };
  this._pushUpstream(pushPromiseFrame);

  return promised;
};
michael@0 | 93 | |
// Incoming PUSH_PROMISE: hand the promised stream and its headers to the user through the
// 'promise' event.
Stream.prototype._onPromise = function _onPromise(frame) {
  var promisedStream = frame.promised_stream;
  var promisedHeaders = frame.headers;
  this.emit('promise', promisedStream, promisedHeaders);
};
michael@0 | 97 | |
// Sending headers: wraps the given header set in a HEADERS frame addressed to this stream and
// pushes it upstream.
Stream.prototype.headers = function headers(headers) {
  var headersFrame = {
    type: 'HEADERS',
    flags: {},
    stream: this.id,
    headers: headers
  };
  this._pushUpstream(headersFrame);
};
michael@0 | 106 | |
// Incoming HEADERS: if the frame carries a priority, apply it as a peer-initiated priority
// change first; then forward the headers to the user through the 'headers' event.
Stream.prototype._onHeaders = function _onHeaders(frame) {
  var peerPriority = frame.priority;
  if (peerPriority !== undefined) {
    this.priority(peerPriority, true);
  }

  this.emit('headers', frame.headers);
};
michael@0 | 113 | |
// Changing the priority. `peer` is truthy when the change was requested by the other endpoint
// (see `_onHeaders` and `_onPriority`), and falsy when the local user asks for it.
Stream.prototype.priority = function priority(priority, peer) {
  // Once the priority has been set locally, changes requested by the peer are ignored.
  if (peer && !this._letPeerPrioritize) {
    return;
  }

  if (!peer) {
    this._letPeerPrioritize = false;

    // A local change has to be announced to the peer. If the last queued frame is a HEADERS or a
    // PRIORITY frame, the new value is simply written onto it; otherwise a dedicated PRIORITY
    // frame is pushed upstream.
    var queuedFrame = this.upstream.getLastQueuedFrame();
    var canCarryPriority = queuedFrame &&
                           ((queuedFrame.type === 'HEADERS') || (queuedFrame.type === 'PRIORITY'));
    if (canCarryPriority) {
      queuedFrame.priority = priority;
    } else {
      this._pushUpstream({
        type: 'PRIORITY',
        flags: {},
        stream: this.id,
        priority: priority
      });
    }
  }

  // Note: the 'priority' event is emitted before `_priority` is updated.
  this._log.debug({ priority: priority }, 'Changing priority');
  this.emit('priority', priority);
  this._priority = priority;
};
michael@0 | 137 | |
// Incoming PRIORITY: apply the priority carried by the frame as a peer-initiated change.
Stream.prototype._onPriority = function _onPriority(frame) {
  var requestedByPeer = true;
  this.priority(frame.priority, requestedByPeer);
};
michael@0 | 141 | |
// Resetting the stream. Normally, an endpoint SHOULD NOT send more than one RST_STREAM frame for
// any stream, so only the first call pushes a frame; later calls do nothing.
Stream.prototype.reset = function reset(error) {
  if (this._resetSent) {
    return;
  }
  this._resetSent = true;

  var rstFrame = {
    type: 'RST_STREAM',
    flags: {},
    stream: this.id,
    error: error
  };
  this._pushUpstream(rstFrame);
};
michael@0 | 155 | |
michael@0 | 156 | // Data flow |
michael@0 | 157 | // --------- |
michael@0 | 158 | |
michael@0 | 159 | // The incoming and the generated outgoing frames are received/transmitted on the `this.upstream` |
michael@0 | 160 | // [Flow](flow.html). The [Connection](connection.html) object instantiating the stream will read |
michael@0 | 161 | // and write frames to/from it. The stream itself is a regular [Duplex stream][1], and is used by |
michael@0 | 162 | // the user to write or read the body of the request. |
michael@0 | 163 | // [1]: http://nodejs.org/api/stream.html#stream_class_stream_duplex |
michael@0 | 164 | |
michael@0 | 165 | // upstream side stream user side |
michael@0 | 166 | // |
michael@0 | 167 | // +------------------------------------+ |
michael@0 | 168 | // | | |
michael@0 | 169 | // +------------------+ | |
michael@0 | 170 | // | upstream | | |
michael@0 | 171 | // | | | |
michael@0 | 172 | // +--+ | +--| |
michael@0 | 173 | // read() | | _send() | _write() | | write(buf) |
michael@0 | 174 | // <--------------|B |<--------------|--------------| B|<------------ |
michael@0 | 175 | // | | | | | |
michael@0 | 176 | // frames +--+ | +--| buffers |
michael@0 | 177 | // | | | | | |
michael@0 | 178 | // -------------->|B |---------------|------------->| B|------------> |
michael@0 | 179 | // write(frame) | | _receive() | _read() | | read() |
michael@0 | 180 | // +--+ | +--| |
michael@0 | 181 | // | | | |
michael@0 | 182 | // | | | |
michael@0 | 183 | // +------------------+ | |
michael@0 | 184 | // | | |
michael@0 | 185 | // +------------------------------------+ |
michael@0 | 186 | // |
michael@0 | 187 | // B: input or output buffer |
michael@0 | 188 | |
michael@0 | 189 | var Flow = require('./flow').Flow; |
michael@0 | 190 | |
// Sets up the data flow side of the stream: the (not yet assigned) stream id, the "ended" marker
// used by `_receive`, and the `upstream` Flow whose hooks are redirected to this stream.
Stream.prototype._initializeDataFlow = function _initializeDataFlow() {
  this.id = undefined;

  this._ended = false;

  var upstream = new Flow();
  this.upstream = upstream;

  // The upstream shares the stream's logger, and its hooks are methods of the stream bound to
  // the stream. `write` is overridden as well (see `_writeUpstream`).
  upstream._log = this._log;
  upstream._send = this._send.bind(this);
  upstream._receive = this._receive.bind(this);
  upstream.write = this._writeUpstream.bind(this);

  // Errors of the upstream are re-emitted on the stream itself.
  upstream.on('error', this.emit.bind(this, 'error'));

  // When the user side is finished, `_finishing` runs.
  this.on('finish', this._finishing);
};
michael@0 | 205 | |
// Queues an outgoing frame on the upstream and immediately runs the state machine for it as a
// *sent* frame (`_transition` throws if the frame is illegal in the current state).
//
// Returns whatever `upstream.push(frame)` returned, so that callers which treat the result as a
// "more needed" signal (`_write` does) actually receive it instead of `undefined`.
// NOTE(review): this assumes `Flow.prototype.push` returns a truthy value when more frames can
// be accepted right away - flow.js is not visible from here, confirm.
Stream.prototype._pushUpstream = function _pushUpstream(frame) {
  var moreNeeded = this.upstream.push(frame);
  this._transition(true, frame);
  return moreNeeded;
};
michael@0 | 210 | |
// Overriding the upstream's `write` allows us to act immediately instead of waiting for the input
// queue to empty. This is important in case of control frames.
//
// Returns the value of the original `Flow.prototype.write`, called on the upstream.
Stream.prototype._writeUpstream = function _writeUpstream(frame) {
  this._log.debug({ frame: frame }, 'Receiving frame');

  var moreNeeded = Flow.prototype.write.call(this.upstream, frame);

  // * Transition to a new state if that's the effect of receiving the frame
  this._transition(false, frame);

  switch (frame.type) {
    // * If it's a control frame, call the appropriate handler method.
    case 'HEADERS':
      this._onHeaders(frame);
      break;
    case 'PUSH_PROMISE':
      this._onPromise(frame);
      break;
    case 'PRIORITY':
      this._onPriority(frame);
      break;

    // * These stream level frames need no immediate handling here.
    case 'DATA':
    case 'WINDOW_UPDATE':
    case 'RST_STREAM':
      break;

    // * If it's an invalid stream level frame, emit error
    default:
      this._log.error({ frame: frame }, 'Invalid stream level frame');
      this.emit('error', 'PROTOCOL_ERROR');
  }

  return moreNeeded;
};
michael@0 | 240 | |
// The `_receive` method (= `upstream._receive`) gets called when there's an incoming frame.
// `ready` is the callback to invoke once the stream can take the next frame.
Stream.prototype._receive = function _receive(frame, ready) {
  // * If it's a DATA frame, then push the payload into the output buffer on the other side.
  //   Remember `ready` for later if the other side reports that it does not want more yet.
  if (!this._ended) {
    if (frame.type === 'DATA') {
      var wantsMore = this.push(frame.data);
      if (!wantsMore) {
        this._receiveMore = ready;
      }
    }
  }

  // * Any frame may signal the end of the stream with the END_STREAM flag; RST_STREAM ends it too
  if (!this._ended) {
    if (frame.flags.END_STREAM || (frame.type === 'RST_STREAM')) {
      this.push(null);
      this._ended = true;
    }
  }

  // * Call `ready` right away, unless it was postponed above because `push()` returned a falsy
  //   value (`_read` will call it in that case)
  if (this._receiveMore !== ready) {
    ready();
  }
};
michael@0 | 263 | |
// The `_read` method is called when the user side is ready to receive more data. If there's a
// pending write on the upstream, then call its pending ready callback to receive more frames.
// The pending callback is removed from the stream before it is invoked.
Stream.prototype._read = function _read() {
  var pendingReady = this._receiveMore;
  if (!pendingReady) {
    return;
  }

  delete this._receiveMore;
  pendingReady();
};
michael@0 | 273 | |
// The `_write` method gets called when there's a write request from the user. The buffer is
// wrapped in a DATA frame and pushed upstream; `encoding` is not used.
Stream.prototype._write = function _write(buffer, encoding, ready) {
  // * Chunking is done by the upstream Flow.
  var dataFrame = {
    type: 'DATA',
    flags: {},
    stream: this.id,
    data: buffer
  };
  var moreNeeded = this._pushUpstream(dataFrame);

  // * If `_pushUpstream` did not report that more is needed, keep `ready` around; `_send` calls
  //   it when upstream is ready to receive more frames.
  if (!moreNeeded) {
    this._sendMore = ready;
    return;
  }

  // * Otherwise the next write can come right away.
  ready();
};
michael@0 | 291 | |
// The `_send` (= `upstream._send`) method is called when upstream is ready to receive more frames.
// If there's a pending write on the user side, then call its pending ready callback to receive more
// writes. The pending callback is removed from the stream before it is invoked.
Stream.prototype._send = function _send() {
  var pendingReady = this._sendMore;
  if (!pendingReady) {
    return;
  }

  delete this._sendMore;
  pendingReady();
};
michael@0 | 302 | |
// When the stream is finishing (the user calls `end()` on it), then we have to set the `END_STREAM`
// flag on the last frame. If there's no frame in the queue, or if it doesn't support this flag,
// then we create a 0 length DATA frame. We could do this all the time, but putting the flag on an
// existing frame is a nice optimization.

// `new Buffer(size)` is deprecated in current Node.js, so `Buffer.alloc` is used where it exists;
// the constructor remains as a fallback for runtimes that predate `Buffer.alloc`.
var emptyBuffer = (typeof Buffer.alloc === 'function') ? Buffer.alloc(0) : new Buffer(0);

Stream.prototype._finishing = function _finishing() {
  var endFrame = {
    type: 'DATA',
    flags: { END_STREAM: true },
    stream: this.id,
    data: emptyBuffer
  };

  var lastFrame = this.upstream.getLastQueuedFrame();
  if (lastFrame && ((lastFrame.type === 'DATA') || (lastFrame.type === 'HEADERS'))) {
    // The flag goes on the already queued frame; `endFrame` is never queued and is only used to
    // run the state machine as if a frame carrying END_STREAM had been sent.
    // NOTE(review): whatever `_transition` records on the frame it is given (`count_change`) ends
    // up on `endFrame`, not on `lastFrame` - confirm this is intended.
    this._log.debug({ frame: lastFrame }, 'Marking last frame with END_STREAM flag.');
    lastFrame.flags.END_STREAM = true;
    this._transition(true, endFrame);
  } else {
    this._pushUpstream(endFrame);
  }
};
michael@0 | 324 | |
michael@0 | 325 | // [Stream States](http://tools.ietf.org/html/draft-ietf-httpbis-http2-10#section-5.1) |
michael@0 | 326 | // ---------------- |
michael@0 | 327 | // |
michael@0 | 328 | // +--------+ |
michael@0 | 329 | // PP | | PP |
michael@0 | 330 | // ,--------| idle |--------. |
michael@0 | 331 | // / | | \ |
michael@0 | 332 | // v +--------+ v |
michael@0 | 333 | // +----------+ | +----------+ |
michael@0 | 334 | // | | | H | | |
michael@0 | 335 | // ,---| reserved | | | reserved |---. |
michael@0 | 336 | // | | (local) | v | (remote) | | |
michael@0 | 337 | // | +----------+ +--------+ +----------+ | |
michael@0 | 338 | // | | ES | | ES | | |
michael@0 | 339 | // | | H ,-------| open |-------. | H | |
michael@0 | 340 | // | | / | | \ | | |
michael@0 | 341 | // | v v +--------+ v v | |
michael@0 | 342 | // | +----------+ | +----------+ | |
michael@0 | 343 | // | | half | | | half | | |
michael@0 | 344 | // | | closed | | R | closed | | |
michael@0 | 345 | // | | (remote) | | | (local) | | |
michael@0 | 346 | // | +----------+ | +----------+ | |
michael@0 | 347 | // | | v | | |
michael@0 | 348 | // | | ES / R +--------+ ES / R | | |
michael@0 | 349 | // | `----------->| |<-----------' | |
michael@0 | 350 | // | R | closed | R | |
michael@0 | 351 | // `-------------------->| |<--------------------' |
michael@0 | 352 | // +--------+ |
michael@0 | 353 | |
// Streams begin in the IDLE state and transitions happen when there's an incoming or outgoing frame
//
// Besides the state itself, three facts are tracked; all of them are unknown (`undefined`) at
// first and get filled in by `_transition`:
//
// * `_initiated`: set from the `sending` flag of the frame that opened or reserved the stream
// * `_closedByUs`: set from the `sending` flag of the frame that closed the stream
// * `_closedWithRst`: whether the closing frame was an RST_STREAM
Stream.prototype._initializeState = function _initializeState() {
  var unknown;

  this.state = 'IDLE';
  this._initiated = unknown;
  this._closedByUs = unknown;
  this._closedWithRst = unknown;
};
michael@0 | 361 | |
// Only `_setState` should change `this.state` directly. It also logs the state change and notifies
// interested parties using the 'state' event.
//
// Setting the state the stream is already in is a programming error and trips the assertion.
// The function expression carries the method's own name (it used to be called `transition`), so
// that stack traces point at the right method, like every other method of Stream.
Stream.prototype._setState = function _setState(state) {
  assert(this.state !== state);
  this._log.debug({ from: this.state, to: state }, 'State transition');
  this.state = state;
  this.emit('state', state);
};
michael@0 | 370 | |
// A state is 'active' if the stream in that state counts towards the concurrency limit. Streams
// that are in the "open" state, or either of the "half closed" states count toward this limit.
// Returns a boolean.
function activeState(state) {
  switch (state) {
    case 'HALF_CLOSED_LOCAL':
    case 'HALF_CLOSED_REMOTE':
    case 'OPEN':
      return true;
    default:
      return false;
  }
}
michael@0 | 376 | |
// `_transition` is called every time there's an incoming or outgoing frame. It manages state
// transitions, and detects stream errors. A stream error is always caused by a frame that is not
// allowed in the current state.
//
// * `sending`: truthy for outgoing frames, falsy for incoming ones
// * `frame`: the frame being sent or received. Outgoing frames get a numeric `count_change`
//   property; for incoming frames `frame.count_change` is *called* when this endpoint initiated
//   the stream (it is expected to be a function then - presumably attached by the connection,
//   confirm).
//
// Throws an Error when an illegal frame is being sent; emits 'error' when an illegal frame is
// received.
Stream.prototype._transition = function _transition(sending, frame) {
  var receiving = !sending;
  var error;

  // One flag per frame type the state machine distinguishes.
  var type = frame.type;
  var HEADERS = (type === 'HEADERS');
  var PRIORITY = (type === 'PRIORITY');
  var RST_STREAM = (type === 'RST_STREAM');
  var PUSH_PROMISE = (type === 'PUSH_PROMISE');
  var WINDOW_UPDATE = (type === 'WINDOW_UPDATE');

  var previousState = this.state;

  switch (this.state) {
    // All streams start in the **idle** state. In this state, no frames have been exchanged.
    //
    // * Sending or receiving a HEADERS frame causes the stream to become "open".
    //
    // When the HEADERS frame contains the END_STREAM flags, then two state transitions happen.
    case 'IDLE':
      if (HEADERS) {
        this._setState('OPEN');
        if (frame.flags.END_STREAM) {
          this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE');
        }
        this._initiated = sending;
      } else if (sending && RST_STREAM) {
        this._setState('CLOSED');
      } else {
        error = 'PROTOCOL_ERROR';
      }
      break;

    // A stream in the **reserved (local)** state is one that has been promised by sending a
    // PUSH_PROMISE frame.
    //
    // * The endpoint can send a HEADERS frame. This causes the stream to open in a "half closed
    //   (remote)" state.
    // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This
    //   releases the stream reservation.
    // * An endpoint may receive PRIORITY frame in this state.
    // * An endpoint MUST NOT send any other type of frame in this state.
    case 'RESERVED_LOCAL':
      if (sending && HEADERS) {
        this._setState('HALF_CLOSED_REMOTE');
      } else if (RST_STREAM) {
        this._setState('CLOSED');
      } else if (receiving && PRIORITY) {
        /* No state change */
      } else {
        error = 'PROTOCOL_ERROR';
      }
      break;

    // A stream in the **reserved (remote)** state has been reserved by a remote peer.
    //
    // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This
    //   releases the stream reservation.
    // * Receiving a HEADERS frame causes the stream to transition to "half closed (local)".
    // * An endpoint MAY send PRIORITY frames in this state to reprioritize the stream.
    // * Receiving any other type of frame MUST be treated as a stream error of type PROTOCOL_ERROR.
    case 'RESERVED_REMOTE':
      if (RST_STREAM) {
        this._setState('CLOSED');
      } else if (receiving && HEADERS) {
        this._setState('HALF_CLOSED_LOCAL');
      } else if (sending && PRIORITY) {
        /* No state change */
      } else {
        error = 'PROTOCOL_ERROR';
      }
      break;

    // The **open** state is where both peers can send frames. In this state, sending peers observe
    // advertised stream level flow control limits.
    //
    // * From this state either endpoint can send a frame with a END_STREAM flag set, which causes
    //   the stream to transition into one of the "half closed" states: an endpoint sending a
    //   END_STREAM flag causes the stream state to become "half closed (local)"; an endpoint
    //   receiving a END_STREAM flag causes the stream state to become "half closed (remote)".
    // * Either endpoint can send a RST_STREAM frame from this state, causing it to transition
    //   immediately to "closed".
    case 'OPEN':
      if (frame.flags.END_STREAM) {
        this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE');
      } else if (RST_STREAM) {
        this._setState('CLOSED');
      } else {
        /* No state change */
      }
      break;

    // A stream that is **half closed (local)** cannot be used for sending frames.
    //
    // * A stream transitions from this state to "closed" when a frame that contains a END_STREAM
    //   flag is received, or when either peer sends a RST_STREAM frame.
    // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream.
    // * WINDOW_UPDATE can be sent by a peer that has sent a frame bearing the END_STREAM flag.
    case 'HALF_CLOSED_LOCAL':
      if (RST_STREAM || (receiving && frame.flags.END_STREAM)) {
        this._setState('CLOSED');
      } else if (receiving || (sending && (PRIORITY || WINDOW_UPDATE))) {
        /* No state change */
      } else {
        error = 'PROTOCOL_ERROR';
      }
      break;

    // A stream that is **half closed (remote)** is no longer being used by the peer to send frames.
    // In this state, an endpoint is no longer obligated to maintain a receiver flow control window
    // if it performs flow control.
    //
    // * If an endpoint receives additional frames for a stream that is in this state it MUST
    //   respond with a stream error of type STREAM_CLOSED.
    // * A stream can transition from this state to "closed" by sending a frame that contains a
    //   END_STREAM flag, or when either peer sends a RST_STREAM frame.
    // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream.
    // * A receiver MAY receive a WINDOW_UPDATE frame on a "half closed (remote)" stream.
    case 'HALF_CLOSED_REMOTE':
      if (RST_STREAM || (sending && frame.flags.END_STREAM)) {
        this._setState('CLOSED');
      } else if (sending || (receiving && (WINDOW_UPDATE || PRIORITY))) {
        /* No state change */
      } else {
        error = 'PROTOCOL_ERROR';
      }
      break;

    // The **closed** state is the terminal state.
    //
    // * An endpoint MUST NOT send frames on a closed stream. An endpoint that receives a frame
    //   after receiving a RST_STREAM or a frame containing a END_STREAM flag on that stream MUST
    //   treat that as a stream error of type STREAM_CLOSED.
    // * WINDOW_UPDATE, PRIORITY or RST_STREAM frames can be received in this state for a short
    //   period after a frame containing an END_STREAM flag is sent. Until the remote peer receives
    //   and processes the frame bearing the END_STREAM flag, it might send either frame type.
    //   Endpoints MUST ignore WINDOW_UPDATE frames received in this state, though endpoints MAY
    //   choose to treat WINDOW_UPDATE frames that arrive a significant time after sending
    //   END_STREAM as a connection error of type PROTOCOL_ERROR.
    // * If this state is reached as a result of sending a RST_STREAM frame, the peer that receives
    //   the RST_STREAM might have already sent - or enqueued for sending - frames on the stream
    //   that cannot be withdrawn. An endpoint that sends a RST_STREAM frame MUST ignore frames that
    //   it receives on closed streams after it has sent a RST_STREAM frame. An endpoint MAY choose
    //   to limit the period over which it ignores frames and treat frames that arrive after this
    //   time as being in error.
    // * An endpoint might receive a PUSH_PROMISE frame after it sends RST_STREAM. PUSH_PROMISE
    //   causes a stream to become "reserved". If promised streams are not desired, a RST_STREAM
    //   can be used to close any of those streams.
    case 'CLOSED':
      if ((sending && RST_STREAM) ||
          (receiving && this._closedByUs &&
           (this._closedWithRst || WINDOW_UPDATE || PRIORITY || RST_STREAM))) {
        /* No state change */
      } else {
        error = 'STREAM_CLOSED';
      }
      break;
  }

  // Noting which endpoint closed the stream, and whether it was closed with an RST_STREAM. It may
  // be important in edge cases. For example, when the peer tries to cancel a promised stream, but
  // we already sent every data on it, then the stream is in CLOSED state, yet we want to ignore
  // the incoming RST_STREAM.
  if ((this.state === 'CLOSED') && (previousState !== 'CLOSED')) {
    this._closedByUs = sending;
    this._closedWithRst = RST_STREAM;
  }

  // Sending/receiving a PUSH_PROMISE
  //
  // * Sending a PUSH_PROMISE frame marks the associated stream for later use. The stream state
  //   for the reserved stream transitions to "reserved (local)".
  // * Receiving a PUSH_PROMISE frame marks the associated stream as reserved by the remote peer.
  //   The state of the stream becomes "reserved (remote)".
  if (PUSH_PROMISE && !error) {
    /* This assertion must hold, because _transition is called immediately when a frame is written
       to the stream. If it would be called when a frame gets out of the input queue, the state
       of the reserved could have been changed by then. */
    assert(frame.promised_stream.state === 'IDLE', frame.promised_stream.state);
    frame.promised_stream._setState(sending ? 'RESERVED_LOCAL' : 'RESERVED_REMOTE');
    frame.promised_stream._initiated = sending;
  }

  // Signaling how sending/receiving this frame changes the active stream count (-1, 0 or +1).
  // Only streams initiated by this endpoint report a change; the subtraction turns the two
  // booleans returned by `activeState` into a number.
  if (this._initiated) {
    var change = (activeState(this.state) - activeState(previousState));
    if (sending) {
      frame.count_change = change;
    } else {
      frame.count_change(change);
    }
  } else if (sending) {
    frame.count_change = 0;
  }

  // Common error handling.
  if (error) {
    var info = {
      error: error,
      frame: frame,
      state: this.state,
      closedByUs: this._closedByUs,
      closedWithRst: this._closedWithRst
    };

    // * When sending something invalid, throwing an exception, since it is probably a bug.
    if (sending) {
      this._log.error(info, 'Sending illegal frame.');
      throw new Error('Sending illegal frame (' + frame.type + ') in ' + this.state + ' state.');
    }

    // * When receiving something invalid, the error is logged and reported through the 'error'
    //   event. This method itself neither sends an RST_STREAM nor changes the state.
    else {
      this._log.error(info, 'Received illegal frame.');
      this.emit('error', error);
    }
  }
};
michael@0 | 602 | |
michael@0 | 603 | // Bunyan serializers |
michael@0 | 604 | // ------------------ |
michael@0 | 605 | |
michael@0 | 606 | exports.serializers = {}; |
michael@0 | 607 | |
michael@0 | 608 | var nextId = 0; |
michael@0 | 609 | exports.serializers.s = function(stream) { |
michael@0 | 610 | if (!('_id' in stream)) { |
michael@0 | 611 | stream._id = nextId; |
michael@0 | 612 | nextId += 1; |
michael@0 | 613 | } |
michael@0 | 614 | return stream._id; |
michael@0 | 615 | }; |