testing/xpcshell/node-spdy/lib/spdy/server.js

Thu, 22 Jan 2015 13:21:57 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Thu, 22 Jan 2015 13:21:57 +0100
branch
TOR_BUG_9701
changeset 15
b8a032363ba2
permissions
-rw-r--r--

Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6

michael@0 1 var spdy = require('../spdy'),
michael@0 2 util = require('util'),
michael@0 3 https = require('https'),
michael@0 4 stream = require('stream'),
michael@0 5 Buffer = require('buffer').Buffer;
michael@0 6
michael@0 7 var crlf = new Buffer('\r\n');
michael@0 8 var last_frag = new Buffer('0\r\n\r\n');
michael@0 9
michael@0 10 var legacy = !stream.Duplex;
michael@0 11
michael@0 12 if (legacy) {
michael@0 13 var DuplexStream = stream;
michael@0 14 } else {
michael@0 15 var DuplexStream = stream.Duplex;
michael@0 16 }
michael@0 17
michael@0 18 //
michael@0 19 // ### function instantiate (HTTPSServer)
michael@0 20 // #### @HTTPSServer {https.Server|Function} Base server class
michael@0 21 // Will return constructor for SPDY Server, based on the HTTPSServer class
michael@0 22 //
michael@0 23 function instantiate(HTTPSServer) {
michael@0 24 //
michael@0 25 // ### function Server (options, requestListener)
michael@0 26 // #### @options {Object} tls server options
michael@0 27 // #### @requestListener {Function} (optional) request callback
michael@0 28 // SPDY Server @constructor
michael@0 29 //
michael@0 30 function Server(options, requestListener) {
michael@0 31 // Initialize
michael@0 32 this._init(HTTPSServer, options, requestListener);
michael@0 33
michael@0 34 // Wrap connection handler
michael@0 35 this._wrap();
michael@0 36 };
michael@0 37 util.inherits(Server, HTTPSServer);
michael@0 38
michael@0 39 // Copy prototype methods
michael@0 40 Object.keys(proto).forEach(function(key) {
michael@0 41 this[key] = proto[key];
michael@0 42 }, Server.prototype);
michael@0 43
michael@0 44 return Server;
michael@0 45 }
michael@0 46 exports.instantiate = instantiate;
michael@0 47
michael@0 48 // Common prototype for all servers
michael@0 49 var proto = {};
michael@0 50
michael@0 51 //
michael@0 52 // ### function _init(base, options, listener)
michael@0 53 // #### @base {Function} (optional) base server class (https.Server)
michael@0 54 // #### @options {Object} tls server options
michael@0 55 // #### @handler {Function} (optional) request handler
michael@0 56 // Initializer.
michael@0 57 //
michael@0 58 proto._init = function _init(base, options, handler) {
michael@0 59 var state = {};
michael@0 60 this._spdyState = state;
michael@0 61
michael@0 62 if (!options) options = {};
michael@0 63 if (!options.maxStreams) options.maxStreams = 100;
michael@0 64 if (!options.sinkSize) {
michael@0 65 options.sinkSize = 1 << 16;
michael@0 66 }
michael@0 67 if (!options.windowSize) {
michael@0 68 options.windowSize = 1 << 20; // 1mb
michael@0 69 }
michael@0 70
michael@0 71 options.NPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0'];
michael@0 72 options.ALPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0'];
michael@0 73 state.options = options;
michael@0 74 state.reqHandler = handler;
michael@0 75
michael@0 76 if (options.plain && !options.ssl) {
michael@0 77 base.call(this, handler);
michael@0 78 } else {
michael@0 79 base.call(this, options, handler);
michael@0 80 }
michael@0 81
michael@0 82 // Use https if NPN is not supported
michael@0 83 if (!process.features.tls_npn && !process.features.tls_alpn && !options.debug && !options.plain) {
michael@0 84 return;
michael@0 85 }
michael@0 86 };
michael@0 87
michael@0 88 //
michael@0 89 // ### function _wrap()
michael@0 90 // Wrap connection handler and add logic.
michael@0 91 //
michael@0 92 proto._wrap = function _wrap() {
michael@0 93 var self = this,
michael@0 94 state = this._spdyState;
michael@0 95
michael@0 96 // Wrap connection handler
michael@0 97 var event = state.options.plain && !state.options.ssl ? 'connection' :
michael@0 98 'secureConnection',
michael@0 99 handler = this.listeners(event)[0];
michael@0 100
michael@0 101 state.pool = spdy.zlibpool.create();
michael@0 102 state.handler = handler;
michael@0 103
michael@0 104 this.removeAllListeners(event);
michael@0 105
michael@0 106 // Normal mode, use NPN to fallback to HTTPS
michael@0 107 if (!state.options.plain) {
michael@0 108 return this.on(event, this._onConnection.bind(this));
michael@0 109 }
michael@0 110
michael@0 111 // In case of plain connection, we must fallback to HTTPS if first byte
michael@0 112 // is not equal to 0x80.
michael@0 113 this.on(event, function(socket) {
michael@0 114 var history = [],
michael@0 115 _emit = socket.emit;
michael@0 116
michael@0 117 // Add 'data' listener, otherwise 'data' events won't be emitted
michael@0 118 if (legacy) {
michael@0 119 function ondata() {};
michael@0 120 socket.once('data', ondata);
michael@0 121 }
michael@0 122
michael@0 123 // 2 minutes timeout, as http.js does by default
michael@0 124 socket.setTimeout(self.timeout || 2 * 60 * 1000);
michael@0 125
michael@0 126 socket.emit = function emit(event, data) {
michael@0 127 history.push(Array.prototype.slice.call(arguments));
michael@0 128
michael@0 129 if (event === 'data') {
michael@0 130 // Legacy
michael@0 131 onFirstByte.call(socket, data);
michael@0 132 } else if (event === 'readable') {
michael@0 133 // Streams
michael@0 134 onReadable.call(socket);
michael@0 135 } else if (event === 'end' ||
michael@0 136 event === 'close' ||
michael@0 137 event === 'error' ||
michael@0 138 event === 'timeout') {
michael@0 139 // We shouldn't get there if any data was received
michael@0 140 fail();
michael@0 141 }
michael@0 142 };
michael@0 143
michael@0 144 function fail() {
michael@0 145 socket.emit = _emit;
michael@0 146 history = null;
michael@0 147 try {
michael@0 148 socket.destroy();
michael@0 149 } catch (e) {
michael@0 150 }
michael@0 151 }
michael@0 152
michael@0 153 function restore() {
michael@0 154 var copy = history.slice();
michael@0 155 history = null;
michael@0 156
michael@0 157 if (legacy) socket.removeListener('data', ondata);
michael@0 158 socket.emit = _emit;
michael@0 159 for (var i = 0; i < copy.length; i++) {
michael@0 160 socket.emit.apply(socket, copy[i]);
michael@0 161 if (copy[i][0] === 'end') {
michael@0 162 if (socket.onend) socket.onend();
michael@0 163 }
michael@0 164 }
michael@0 165 }
michael@0 166
michael@0 167 function onFirstByte(data) {
michael@0 168 // Ignore empty packets
michael@0 169 if (data.length === 0) return;
michael@0 170
michael@0 171 if (data[0] === 0x80) {
michael@0 172 self._onConnection(socket);
michael@0 173 } else {
michael@0 174 handler.call(self, socket);
michael@0 175 }
michael@0 176
michael@0 177 // Fire events
michael@0 178 restore();
michael@0 179
michael@0 180 // NOTE: If we came there - .ondata() will be called anyway in this tick,
michael@0 181 // so there're no need to call it manually
michael@0 182 };
michael@0 183
michael@0 184 if (!legacy) {
michael@0 185 // Hack to make streams2 work properly
michael@0 186 socket.on('readable', onReadable);
michael@0 187 }
michael@0 188
michael@0 189 function onReadable() {
michael@0 190 var data = socket.read(1);
michael@0 191
michael@0 192 // Ignore empty packets
michael@0 193 if (!data) return;
michael@0 194 socket.removeListener('readable', onReadable);
michael@0 195
michael@0 196 // `.unshift()` emits `readable` event. Thus `emit` method should
michael@0 197 // be restored before calling it.
michael@0 198 socket.emit = _emit;
michael@0 199
michael@0 200 // Put packet back where it was before
michael@0 201 socket.unshift(data);
michael@0 202
michael@0 203 if (data[0] === 0x80) {
michael@0 204 self._onConnection(socket);
michael@0 205 } else {
michael@0 206 handler.call(self, socket);
michael@0 207 }
michael@0 208
michael@0 209 // Fire events
michael@0 210 restore();
michael@0 211
michael@0 212 if (socket.ondata) {
michael@0 213 data = socket.read(socket._readableState.length);
michael@0 214 if (data) socket.ondata(data, 0, data.length);
michael@0 215 }
michael@0 216 }
michael@0 217 });
michael@0 218 };
michael@0 219
michael@0 220 //
michael@0 221 // ### function _onConnection (socket)
michael@0 222 // #### @socket {Stream} incoming socket
michael@0 223 // Server's connection handler wrapper.
michael@0 224 //
michael@0 225 proto._onConnection = function _onConnection(socket) {
michael@0 226 var self = this,
michael@0 227 state = this._spdyState;
michael@0 228
michael@0 229 // Fallback to HTTPS if needed
michael@0 230 var selectedProtocol = socket.npnProtocol || socket.alpnProtocol;
michael@0 231 if ((!selectedProtocol || !selectedProtocol.match(/spdy/)) &&
michael@0 232 !state.options.debug && !state.options.plain) {
michael@0 233 return state.handler.call(this, socket);
michael@0 234 }
michael@0 235
michael@0 236 // Wrap incoming socket into abstract class
michael@0 237 var connection = new Connection(socket, state.pool, state.options);
michael@0 238
michael@0 239 // Emulate each stream like connection
michael@0 240 connection.on('stream', state.handler);
michael@0 241
michael@0 242 connection.on('connect', function onconnect(req, socket) {
michael@0 243 socket.streamID = req.streamID = req.socket.id;
michael@0 244 socket.isSpdy = req.isSpdy = true;
michael@0 245 socket.spdyVersion = req.spdyVersion = req.socket.version;
michael@0 246
michael@0 247 socket.once('finish', function onfinish() {
michael@0 248 req.connection.end();
michael@0 249 });
michael@0 250
michael@0 251 self.emit('connect', req, socket);
michael@0 252 });
michael@0 253
michael@0 254 connection.on('request', function onrequest(req, res) {
michael@0 255 res._renderHeaders = spdy.response._renderHeaders;
michael@0 256 res.writeHead = spdy.response.writeHead;
michael@0 257 res.push = spdy.response.push;
michael@0 258 res.streamID = req.streamID = req.socket.id;
michael@0 259 res.spdyVersion = req.spdyVersion = req.socket.version;
michael@0 260 res.isSpdy = req.isSpdy = true;
michael@0 261
michael@0 262 // Chunked encoding is not supported in SPDY
michael@0 263 res.useChunkedEncodingByDefault = false;
michael@0 264
michael@0 265 res.once('finish', function onfinish() {
michael@0 266 req.connection.end();
michael@0 267 });
michael@0 268
michael@0 269 self.emit('request', req, res);
michael@0 270 });
michael@0 271
michael@0 272 connection.on('error', function onerror(e) {
michael@0 273 console.log('[secureConnection] error ' + e);
michael@0 274 socket.destroy(e.errno === 'EPIPE' ? undefined : e);
michael@0 275 });
michael@0 276 };
michael@0 277
michael@0 278 // Export Server instantiated from https.Server
michael@0 279 var Server = instantiate(https.Server);
michael@0 280 exports.Server = Server;
michael@0 281
michael@0 282 //
michael@0 283 // ### function create (base, options, requestListener)
michael@0 284 // #### @base {Function} (optional) base server class (https.Server)
michael@0 285 // #### @options {Object} tls server options
michael@0 286 // #### @requestListener {Function} (optional) request callback
michael@0 287 // @constructor wrapper
michael@0 288 //
michael@0 289 exports.create = function create(base, options, requestListener) {
michael@0 290 var server;
michael@0 291 if (typeof base === 'function') {
michael@0 292 server = instantiate(base);
michael@0 293 } else {
michael@0 294 server = Server;
michael@0 295
michael@0 296 requestListener = options;
michael@0 297 options = base;
michael@0 298 base = null;
michael@0 299 }
michael@0 300
michael@0 301 return new server(options, requestListener);
michael@0 302 };
michael@0 303
michael@0 304 //
michael@0 305 // ### function Connection (socket, pool, options)
michael@0 306 // #### @socket {net.Socket} server's connection
michael@0 307 // #### @pool {spdy.ZlibPool} zlib pool
michael@0 308 // #### @options {Object} server's options
michael@0 309 // Abstract connection @constructor
michael@0 310 //
michael@0 311 function Connection(socket, pool, options) {
michael@0 312 process.EventEmitter.call(this);
michael@0 313
michael@0 314 var self = this;
michael@0 315
michael@0 316 this._closed = false;
michael@0 317
michael@0 318 this.pool = pool;
michael@0 319 var pair = null;
michael@0 320
michael@0 321 this._deflate = null;
michael@0 322 this._inflate = null;
michael@0 323
michael@0 324 this.encrypted = socket.encrypted;
michael@0 325
michael@0 326 // Init streams list
michael@0 327 this.streams = {};
michael@0 328 this.streamsCount = 0;
michael@0 329 this.pushId = 0;
michael@0 330 this._goaway = false;
michael@0 331
michael@0 332 this._framer = null;
michael@0 333
michael@0 334 // Data transfer window defaults to 64kb
michael@0 335 this.windowSize = options.windowSize;
michael@0 336 this.sinkSize = options.sinkSize;
michael@0 337
michael@0 338 // Initialize scheduler
michael@0 339 this.scheduler = spdy.scheduler.create(this);
michael@0 340
michael@0 341 // Store socket and pipe it to parser
michael@0 342 this.socket = socket;
michael@0 343
michael@0 344 // Initialize parser
michael@0 345 this.parser = spdy.parser.create(this);
michael@0 346 this.parser.on('frame', function (frame) {
michael@0 347 if (this._closed) return;
michael@0 348
michael@0 349 var stream;
michael@0 350
michael@0 351 // Create new stream
michael@0 352 if (frame.type === 'SYN_STREAM') {
michael@0 353 self.streamsCount++;
michael@0 354
michael@0 355 stream = self.streams[frame.id] = new Stream(self, frame);
michael@0 356
michael@0 357 // If we reached stream limit
michael@0 358 if (self.streamsCount > options.maxStreams) {
michael@0 359 stream.once('error', function onerror() {});
michael@0 360 // REFUSED_STREAM
michael@0 361 stream._rstCode = 3;
michael@0 362 stream.destroy(true);
michael@0 363 } else {
michael@0 364 self.emit('stream', stream);
michael@0 365
michael@0 366 stream._init();
michael@0 367 }
michael@0 368 } else {
michael@0 369 if (frame.id) {
michael@0 370 // Load created one
michael@0 371 stream = self.streams[frame.id];
michael@0 372
michael@0 373 // Fail if not found
michael@0 374 if (stream === undefined) {
michael@0 375 if (frame.type === 'RST_STREAM') return;
michael@0 376 self.write(self._framer.rstFrame(frame.id, 2));
michael@0 377 return;
michael@0 378 }
michael@0 379 }
michael@0 380
michael@0 381 // Emit 'data' event
michael@0 382 if (frame.type === 'DATA') {
michael@0 383 if (frame.data.length > 0){
michael@0 384 if (stream._closedBy.client) {
michael@0 385 stream._rstCode = 2;
michael@0 386 stream.emit('error', 'Writing to half-closed stream');
michael@0 387 } else {
michael@0 388 stream._recv(frame.data);
michael@0 389 }
michael@0 390 }
michael@0 391 // Destroy stream if we was asked to do this
michael@0 392 } else if (frame.type === 'RST_STREAM') {
michael@0 393 stream._rstCode = 0;
michael@0 394 if (frame.status === 5) {
michael@0 395 // If client "cancels" connection - close stream and
michael@0 396 // all associated push streams without error
michael@0 397 stream.pushes.forEach(function(stream) {
michael@0 398 stream.close();
michael@0 399 });
michael@0 400 stream.close();
michael@0 401 } else {
michael@0 402 // Emit error on destroy
michael@0 403 stream.destroy(new Error('Received rst: ' + frame.status));
michael@0 404 }
michael@0 405 // Respond with same PING
michael@0 406 } else if (frame.type === 'PING') {
michael@0 407 self.write(self._framer.pingFrame(frame.pingId));
michael@0 408 } else if (frame.type === 'SETTINGS') {
michael@0 409 self._setDefaultWindow(frame.settings);
michael@0 410 } else if (frame.type === 'GOAWAY') {
michael@0 411 self._goaway = frame.lastId;
michael@0 412 } else if (frame.type === 'WINDOW_UPDATE') {
michael@0 413 stream._drainSink(frame.delta);
michael@0 414 } else {
michael@0 415 console.error('Unknown type: ', frame.type);
michael@0 416 }
michael@0 417 }
michael@0 418
michael@0 419 // Handle half-closed
michael@0 420 if (frame.fin) {
michael@0 421 // Don't allow to close stream twice
michael@0 422 if (stream._closedBy.client) {
michael@0 423 stream._rstCode = 2;
michael@0 424 stream.emit('error', 'Already half-closed');
michael@0 425 } else {
michael@0 426 stream._closedBy.client = true;
michael@0 427
michael@0 428 // Emulate last chunked fragment
michael@0 429 if (stream._forceChunked) {
michael@0 430 stream._recv(last_frag, true);
michael@0 431 }
michael@0 432
michael@0 433 stream._handleClose();
michael@0 434 }
michael@0 435 }
michael@0 436 });
michael@0 437
michael@0 438 this.parser.on('version', function onversion(version) {
michael@0 439 if (!pair) {
michael@0 440 pair = pool.get('spdy/' + version);
michael@0 441 self._deflate = pair.deflate;
michael@0 442 self._inflate = pair.inflate;
michael@0 443 }
michael@0 444 });
michael@0 445
michael@0 446 this.parser.on('framer', function onframer(framer) {
michael@0 447 // Generate custom settings frame and send
michael@0 448 self.write(framer.settingsFrame(options));
michael@0 449 });
michael@0 450
michael@0 451 // Propagate parser errors
michael@0 452 this.parser.on('error', function onParserError(err) {
michael@0 453 self.emit('error', err);
michael@0 454 });
michael@0 455
michael@0 456 socket.pipe(this.parser);
michael@0 457
michael@0 458 // 2 minutes socket timeout
michael@0 459 socket.setTimeout(2 * 60 * 1000);
michael@0 460 socket.once('timeout', function ontimeout() {
michael@0 461 socket.destroy();
michael@0 462 });
michael@0 463
michael@0 464 // Allow high-level api to catch socket errors
michael@0 465 socket.on('error', function onSocketError(e) {
michael@0 466 self.emit('error', e);
michael@0 467 });
michael@0 468
michael@0 469 socket.once('close', function onclose() {
michael@0 470 self._closed = true;
michael@0 471 if (pair) pool.put(pair);
michael@0 472 });
michael@0 473
michael@0 474 if (legacy) {
michael@0 475 socket.on('drain', function ondrain() {
michael@0 476 self.emit('drain');
michael@0 477 });
michael@0 478 }
michael@0 479 }
michael@0 480 util.inherits(Connection, process.EventEmitter);
michael@0 481 exports.Connection = Connection;
michael@0 482
michael@0 483 //
michael@0 484 // ### function write (data, encoding)
michael@0 485 // #### @data {String|Buffer} data
michael@0 486 // #### @encoding {String} (optional) encoding
michael@0 487 // Writes data to socket
michael@0 488 //
michael@0 489 Connection.prototype.write = function write(data, encoding) {
michael@0 490 if (this.socket.writable) {
michael@0 491 return this.socket.write(data, encoding);
michael@0 492 }
michael@0 493 };
michael@0 494
michael@0 495 //
michael@0 496 // ### function _setDefaultWindow (settings)
michael@0 497 // #### @settings {Object}
michael@0 498 // Update the default transfer window -- in the connection and in the
michael@0 499 // active streams
michael@0 500 //
michael@0 501 Connection.prototype._setDefaultWindow = function _setDefaultWindow(settings) {
michael@0 502 if (!settings) return;
michael@0 503 if (!settings.initial_window_size ||
michael@0 504 settings.initial_window_size.persisted) {
michael@0 505 return;
michael@0 506 }
michael@0 507
michael@0 508 this.sinkSize = settings.initial_window_size.value;
michael@0 509
michael@0 510 Object.keys(this.streams).forEach(function(id) {
michael@0 511 this.streams[id]._updateSinkSize(settings.initial_window_size.value);
michael@0 512 }, this);
michael@0 513 };
michael@0 514
michael@0 515 //
michael@0 516 // ### function Stream (connection, frame)
michael@0 517 // #### @connection {Connection} SPDY Connection
michael@0 518 // #### @frame {Object} SYN_STREAM data
michael@0 519 // Abstract stream @constructor
michael@0 520 //
michael@0 521 function Stream(connection, frame) {
michael@0 522 DuplexStream.call(this);
michael@0 523
michael@0 524 this.connection = connection;
michael@0 525 this.socket = connection.socket;
michael@0 526 this.encrypted = connection.encrypted;
michael@0 527 this._framer = connection._framer;
michael@0 528 this._initialized = false;
michael@0 529
michael@0 530 // Should chunked encoding be forced
michael@0 531 this._forceChunked = false;
michael@0 532
michael@0 533 this.ondata = this.onend = null;
michael@0 534
michael@0 535 // RST_STREAM code if any
michael@0 536 this._rstCode = 1;
michael@0 537 this._destroyed = false;
michael@0 538
michael@0 539 this._closedBy = {
michael@0 540 client: false,
michael@0 541 server: false
michael@0 542 };
michael@0 543
michael@0 544 // Lock data
michael@0 545 this._locked = false;
michael@0 546 this._lockBuffer = [];
michael@0 547
michael@0 548 // Store id
michael@0 549 this.id = frame.id;
michael@0 550 this.version = frame.version;
michael@0 551
michael@0 552 // Store priority
michael@0 553 this.priority = frame.priority;
michael@0 554
michael@0 555 // Array of push streams associated to that one
michael@0 556 this.pushes = [];
michael@0 557
michael@0 558 // How much data can be sent TO client before next WINDOW_UPDATE
michael@0 559 this._sinkSize = connection.sinkSize;
michael@0 560 this._initialSinkSize = connection.sinkSize;
michael@0 561
michael@0 562 // When data needs to be send, but window is too small for it - it'll be
michael@0 563 // queued in this buffer
michael@0 564 this._sinkBuffer = [];
michael@0 565
michael@0 566 // How much data can be sent BY client before next WINDOW_UPDATE
michael@0 567 this._initialWindowSize = connection.windowSize;
michael@0 568 this._windowSize = connection.windowSize;
michael@0 569
michael@0 570 // Create compression streams
michael@0 571 this._deflate = connection._deflate;
michael@0 572 this._inflate = connection._inflate;
michael@0 573
michael@0 574 // Store headers
michael@0 575 this.headers = frame.headers;
michael@0 576 this.url = frame.url;
michael@0 577
michael@0 578 this._frame = frame;
michael@0 579
michael@0 580 if (legacy) {
michael@0 581 this.readable = this.writable = true;
michael@0 582 }
michael@0 583
michael@0 584 // Call .onend()
michael@0 585 this.once('end', function() {
michael@0 586 var self = this;
michael@0 587 process.nextTick(function() {
michael@0 588 if (self.onend) self.onend();
michael@0 589 });
michael@0 590 });
michael@0 591
michael@0 592 // Handle half-close
michael@0 593 this.once('finish', function() {
michael@0 594 this._writeData(true, []);
michael@0 595 this._closedBy.server = true;
michael@0 596 if (this._sinkBuffer.length !== 0) return;
michael@0 597 this._handleClose();
michael@0 598 });
michael@0 599 };
michael@0 600 util.inherits(Stream, DuplexStream);
michael@0 601 exports.Stream = Stream;
michael@0 602
michael@0 603 if (legacy) {
michael@0 604 Stream.prototype.pause = function pause() {};
michael@0 605 Stream.prototype.resume = function resume() {};
michael@0 606 }
michael@0 607
michael@0 608 //
michael@0 609 // ### function _isGoaway ()
michael@0 610 // Returns true if any writes to that stream should be ignored
michael@0 611 //
michael@0 612 Stream.prototype._isGoaway = function _isGoaway() {
michael@0 613 return this.connection._goaway && this.id > this.connection._goaway;
michael@0 614 };
michael@0 615
michael@0 616 //
michael@0 617 // ### function init ()
michael@0 618 // Initialize stream, internal
michael@0 619 //
michael@0 620 Stream.prototype._init = function init() {
michael@0 621 var headers = this.headers,
michael@0 622 req = [headers.method + ' ' + this.url + ' ' + headers.version];
michael@0 623
michael@0 624 Object.keys(headers).forEach(function (key) {
michael@0 625 if (key !== 'method' && key !== 'url' && key !== 'version' &&
michael@0 626 key !== 'scheme') {
michael@0 627 req.push(key + ': ' + headers[key]);
michael@0 628 }
michael@0 629 });
michael@0 630
michael@0 631 // Force chunked encoding
michael@0 632 if (!headers['content-length'] && !headers['transfer-encoding']) {
michael@0 633 req.push('Transfer-Encoding: chunked');
michael@0 634 this._forceChunked = true;
michael@0 635 }
michael@0 636
michael@0 637 // Add '\r\n\r\n'
michael@0 638 req.push('', '');
michael@0 639
michael@0 640 req = new Buffer(req.join('\r\n'));
michael@0 641
michael@0 642 this._recv(req, true);
michael@0 643 this._initialized = true;
michael@0 644 };
michael@0 645
michael@0 646 //
michael@0 647 // ### function lock (callback)
michael@0 648 // #### @callback {Function} continuation callback
michael@0 649 // Acquire lock
michael@0 650 //
michael@0 651 Stream.prototype._lock = function lock(callback) {
michael@0 652 if (!callback) return;
michael@0 653
michael@0 654 if (this._locked) {
michael@0 655 this._lockBuffer.push(callback);
michael@0 656 } else {
michael@0 657 this._locked = true;
michael@0 658 callback.call(this, null);
michael@0 659 }
michael@0 660 };
michael@0 661
michael@0 662 //
michael@0 663 // ### function unlock ()
michael@0 664 // Release lock and call all buffered callbacks
michael@0 665 //
michael@0 666 Stream.prototype._unlock = function unlock() {
michael@0 667 if (this._locked) {
michael@0 668 this._locked = false;
michael@0 669 this._lock(this._lockBuffer.shift());
michael@0 670 }
michael@0 671 };
michael@0 672
michael@0 673 //
michael@0 674 // ### function setTimeout ()
michael@0 675 // TODO: use timers.enroll, timers.active, timers.unenroll
michael@0 676 //
michael@0 677 Stream.prototype.setTimeout = function setTimeout(time) {};
michael@0 678
michael@0 679 //
michael@0 680 // ### function _handleClose ()
michael@0 681 // Close stream if it was closed by both server and client
michael@0 682 //
michael@0 683 Stream.prototype._handleClose = function _handleClose() {
michael@0 684 if (this._closedBy.client && this._closedBy.server) {
michael@0 685 this.close();
michael@0 686 }
michael@0 687 };
michael@0 688
michael@0 689 //
michael@0 690 // ### function close ()
michael@0 691 // Destroys stream
michael@0 692 //
michael@0 693 Stream.prototype.close = function close() {
michael@0 694 this.destroy();
michael@0 695 };
michael@0 696
michael@0 697 //
michael@0 698 // ### function destroy (error)
michael@0 699 // #### @error {Error} (optional) error
michael@0 700 // Destroys stream
michael@0 701 //
michael@0 702 Stream.prototype.destroy = function destroy(error) {
michael@0 703 if (this._destroyed) return;
michael@0 704 this._destroyed = true;
michael@0 705
michael@0 706 delete this.connection.streams[this.id];
michael@0 707 if (this.id % 2 === 1) {
michael@0 708 this.connection.streamsCount--;
michael@0 709 }
michael@0 710
michael@0 711 // If stream is not finished, RST frame should be sent to notify client
michael@0 712 // about sudden stream termination.
michael@0 713 if (error || !this._closedBy.server) {
michael@0 714 // REFUSED_STREAM if terminated before 'finish' event
michael@0 715 if (!this._closedBy.server) this._rstCode = 3;
michael@0 716
michael@0 717 if (this._rstCode) {
michael@0 718 this._lock(function() {
michael@0 719 this.connection.scheduler.schedule(
michael@0 720 this,
michael@0 721 this._framer.rstFrame(this.id, this._rstCode));
michael@0 722 this.connection.scheduler.tick();
michael@0 723
michael@0 724 this._unlock();
michael@0 725 });
michael@0 726 }
michael@0 727 }
michael@0 728
michael@0 729 if (legacy) {
michael@0 730 this.emit('end');
michael@0 731 } else {
michael@0 732 this.push(null);
michael@0 733 }
michael@0 734
michael@0 735 if (error) this.emit('error', error);
michael@0 736
michael@0 737 var self = this;
michael@0 738 process.nextTick(function() {
michael@0 739 self.emit('close', !!error);
michael@0 740 });
michael@0 741 };
michael@0 742
michael@0 743 Stream.prototype.destroySoon = function destroySoon(error) {
michael@0 744 return this.destroy(error);
michael@0 745 };
michael@0 746
michael@0 747 Stream.prototype._drainSink = function _drainSink(size) {
michael@0 748 var oldBuffer = this._sinkBuffer;
michael@0 749 this._sinkBuffer = [];
michael@0 750
michael@0 751 this._sinkSize += size;
michael@0 752
michael@0 753 for (var i = 0; i < oldBuffer.length; i++) {
michael@0 754 this._writeData(oldBuffer[i][0], oldBuffer[i][1], oldBuffer[i][2]);
michael@0 755 }
michael@0 756
michael@0 757 // Handle half-close
michael@0 758 if (this._sinkBuffer.length === 0 && this._closedBy.server) {
michael@0 759 this._handleClose();
michael@0 760 }
michael@0 761
michael@0 762 if (legacy) this.emit('drain');
michael@0 763 };
michael@0 764
michael@0 765 //
michael@0 766 // ### function _writeData (fin, buffer, cb)
michael@0 767 // #### @fin {Boolean}
michael@0 768 // #### @buffer {Buffer}
michael@0 769 // #### @cb {Function} **optional**
michael@0 770 // Internal function
michael@0 771 //
michael@0 772 Stream.prototype._writeData = function _writeData(fin, buffer, cb) {
michael@0 773 if (this._framer.version === 3) {
michael@0 774 // Window was exhausted, queue data
michael@0 775 if (this._sinkSize <= 0) {
michael@0 776 this._sinkBuffer.push([fin, buffer, cb]);
michael@0 777 return false;
michael@0 778 }
michael@0 779
michael@0 780 var len = Math.min(this._sinkSize, buffer.length);
michael@0 781 this._sinkSize -= len;
michael@0 782
michael@0 783 // Only partial write is possible, queue rest for later
michael@0 784 if (len < buffer.length) {
michael@0 785 this._sinkBuffer.push([fin, buffer.slice(len)]);
michael@0 786 buffer = buffer.slice(0, len);
michael@0 787 fin = false;
michael@0 788 }
michael@0 789 }
michael@0 790
michael@0 791 this._lock(function() {
michael@0 792 var stream = this,
michael@0 793 frame = this._framer.dataFrame(this.id, fin, buffer);
michael@0 794
michael@0 795 stream.connection.scheduler.schedule(stream, frame);
michael@0 796 stream.connection.scheduler.tick();
michael@0 797
michael@0 798 this._unlock();
michael@0 799
michael@0 800 if (cb) cb();
michael@0 801 });
michael@0 802
michael@0 803 return true;
michael@0 804 };
michael@0 805
michael@0 806 //
michael@0 807 // ### function write (data, encoding)
michael@0 808 // #### @data {Buffer|String} data
michael@0 809 // #### @encoding {String} data encoding
michael@0 810 // Writes data to connection
michael@0 811 //
michael@0 812 Stream.prototype._write = function write(data, encoding, cb) {
michael@0 813 // Do not send data to new connections after GOAWAY
michael@0 814 if (this._isGoaway()) {
michael@0 815 if (cb) cb();
michael@0 816 return false;
michael@0 817 }
michael@0 818
michael@0 819 return this._writeData(false, data, cb);
michael@0 820 };
michael@0 821
michael@0 822 if (legacy) {
michael@0 823 Stream.prototype.write = function write(data, encoding, cb) {
michael@0 824 if (!Buffer.isBuffer(data)) {
michael@0 825 return this._write(new Buffer(data, encoding), null, cb);
michael@0 826 } else {
michael@0 827 return this._write(data, encoding, cb);
michael@0 828 }
michael@0 829 };
michael@0 830
michael@0 831 //
michael@0 832 // ### function end (data)
michael@0 833 // #### @data {Buffer|String} (optional) data to write before ending stream
michael@0 834 // #### @encoding {String} (optional) string encoding
michael@0 835 // Send FIN data frame
michael@0 836 //
michael@0 837 Stream.prototype.end = function end(data, encoding) {
michael@0 838 // Do not send data to new connections after GOAWAY
michael@0 839 if (this._isGoaway()) return;
michael@0 840
michael@0 841 if (data) this.write(data, encoding);
michael@0 842 this.emit('finish');
michael@0 843 };
michael@0 844 }
michael@0 845
michael@0 846 //
michael@0 847 // ### function _recv (data)
michael@0 848 // #### @data {Buffer} buffer to receive
michael@0 849 // #### @chunked {Boolean}
michael@0 850 // (internal)
michael@0 851 //
michael@0 852 Stream.prototype._recv = function _recv(data, chunked) {
michael@0 853 // Update window if exhausted
michael@0 854 if (!chunked && this._framer.version >= 3 && this._initialized) {
michael@0 855 this._windowSize -= data.length;
michael@0 856
michael@0 857 if (this._windowSize <= 0) {
michael@0 858 var delta = this._initialWindowSize - this._windowSize;
michael@0 859 this._windowSize += delta;
michael@0 860 this.connection.write(this._framer.windowUpdateFrame(this.id, delta));
michael@0 861 }
michael@0 862 }
michael@0 863
michael@0 864 // Emulate chunked encoding
michael@0 865 if (this._forceChunked && !chunked) {
michael@0 866 // Zero-chunks are treated as end, do not emit them
michael@0 867 if (data.length === 0) return;
michael@0 868
michael@0 869 this._recv(new Buffer(data.length.toString(16)), true);
michael@0 870 this._recv(crlf, true);
michael@0 871 this._recv(data, true);
michael@0 872 this._recv(crlf, true);
michael@0 873 return;
michael@0 874 }
michael@0 875
michael@0 876 if (legacy) {
michael@0 877 var self = this;
michael@0 878 process.nextTick(function() {
michael@0 879 self.emit('data', data);
michael@0 880 if (self.ondata) {
michael@0 881 self.ondata(data, 0, data.length);
michael@0 882 }
michael@0 883 });
michael@0 884 } else {
michael@0 885 // Right now, http module expects socket to be working in streams1 mode.
michael@0 886 if (this.ondata) {
michael@0 887 this.ondata(data, 0, data.length);
michael@0 888 } else {
michael@0 889 this.push(data);
michael@0 890 }
michael@0 891 }
michael@0 892 };
michael@0 893
michael@0 894 //
michael@0 895 // ### function _read (bytes, cb)
michael@0 896 // #### @bytes {Number} number of bytes to read
michael@0 897 // Streams2 API
michael@0 898 //
michael@0 899 Stream.prototype._read = function read(bytes) {
michael@0 900 // NOP
michael@0 901 };
michael@0 902
michael@0 903 //
michael@0 904 // ### function _updateSinkSize (size)
michael@0 905 // #### @size {Integer}
michael@0 906 // Update the internal data transfer window
michael@0 907 //
michael@0 908 Stream.prototype._updateSinkSize = function _updateSinkSize(size) {
michael@0 909 var diff = size - this._initialSinkSize;
michael@0 910
michael@0 911 this._initialSinkSize = size;
michael@0 912 this._drainSink(diff);
michael@0 913 };
michael@0 914
michael@0 915 //
michael@0 916 // `net` compatibility layer
michael@0 917 // (Copy pasted from lib/tls.js from node.js)
michael@0 918 //
michael@0 919 Stream.prototype.address = function address() {
michael@0 920 return this.socket && this.socket.address();
michael@0 921 };
michael@0 922
michael@0 923 Stream.prototype.__defineGetter__('remoteAddress', function remoteAddress() {
michael@0 924 return this.socket && this.socket.remoteAddress;
michael@0 925 });
michael@0 926
michael@0 927 Stream.prototype.__defineGetter__('remotePort', function remotePort() {
michael@0 928 return this.socket && this.socket.remotePort;
michael@0 929 });

mercurial