1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/testing/xpcshell/node-spdy/lib/spdy/server.js Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,929 @@ 1.4 +var spdy = require('../spdy'), 1.5 + util = require('util'), 1.6 + https = require('https'), 1.7 + stream = require('stream'), 1.8 + Buffer = require('buffer').Buffer; 1.9 + 1.10 +var crlf = new Buffer('\r\n'); 1.11 +var last_frag = new Buffer('0\r\n\r\n'); 1.12 + 1.13 +var legacy = !stream.Duplex; 1.14 + 1.15 +if (legacy) { 1.16 + var DuplexStream = stream; 1.17 +} else { 1.18 + var DuplexStream = stream.Duplex; 1.19 +} 1.20 + 1.21 +// 1.22 +// ### function instantiate (HTTPSServer) 1.23 +// #### @HTTPSServer {https.Server|Function} Base server class 1.24 +// Will return constructor for SPDY Server, based on the HTTPSServer class 1.25 +// 1.26 +function instantiate(HTTPSServer) { 1.27 + // 1.28 + // ### function Server (options, requestListener) 1.29 + // #### @options {Object} tls server options 1.30 + // #### @requestListener {Function} (optional) request callback 1.31 + // SPDY Server @constructor 1.32 + // 1.33 + function Server(options, requestListener) { 1.34 + // Initialize 1.35 + this._init(HTTPSServer, options, requestListener); 1.36 + 1.37 + // Wrap connection handler 1.38 + this._wrap(); 1.39 + }; 1.40 + util.inherits(Server, HTTPSServer); 1.41 + 1.42 + // Copy prototype methods 1.43 + Object.keys(proto).forEach(function(key) { 1.44 + this[key] = proto[key]; 1.45 + }, Server.prototype); 1.46 + 1.47 + return Server; 1.48 +} 1.49 +exports.instantiate = instantiate; 1.50 + 1.51 +// Common prototype for all servers 1.52 +var proto = {}; 1.53 + 1.54 +// 1.55 +// ### function _init(base, options, listener) 1.56 +// #### @base {Function} (optional) base server class (https.Server) 1.57 +// #### @options {Object} tls server options 1.58 +// #### @handler {Function} (optional) request handler 1.59 +// Initializer. 1.60 +// 1.61 +proto._init = function _init(base, options, handler) { 1.62 + var state = {}; 1.63 + this._spdyState = state; 1.64 + 1.65 + if (!options) options = {}; 1.66 + if (!options.maxStreams) options.maxStreams = 100; 1.67 + if (!options.sinkSize) { 1.68 + options.sinkSize = 1 << 16; 1.69 + } 1.70 + if (!options.windowSize) { 1.71 + options.windowSize = 1 << 20; // 1mb 1.72 + } 1.73 + 1.74 + options.NPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0']; 1.75 + options.ALPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0']; 1.76 + state.options = options; 1.77 + state.reqHandler = handler; 1.78 + 1.79 + if (options.plain && !options.ssl) { 1.80 + base.call(this, handler); 1.81 + } else { 1.82 + base.call(this, options, handler); 1.83 + } 1.84 + 1.85 + // Use https if NPN is not supported 1.86 + if (!process.features.tls_npn && !process.features.tls_alpn && !options.debug && !options.plain) { 1.87 + return; 1.88 + } 1.89 +}; 1.90 + 1.91 +// 1.92 +// ### function _wrap() 1.93 +// Wrap connection handler and add logic. 1.94 +// 1.95 +proto._wrap = function _wrap() { 1.96 + var self = this, 1.97 + state = this._spdyState; 1.98 + 1.99 + // Wrap connection handler 1.100 + var event = state.options.plain && !state.options.ssl ? 'connection' : 1.101 + 'secureConnection', 1.102 + handler = this.listeners(event)[0]; 1.103 + 1.104 + state.pool = spdy.zlibpool.create(); 1.105 + state.handler = handler; 1.106 + 1.107 + this.removeAllListeners(event); 1.108 + 1.109 + // Normal mode, use NPN to fallback to HTTPS 1.110 + if (!state.options.plain) { 1.111 + return this.on(event, this._onConnection.bind(this)); 1.112 + } 1.113 + 1.114 + // In case of plain connection, we must fallback to HTTPS if first byte 1.115 + // is not equal to 0x80. 1.116 + this.on(event, function(socket) { 1.117 + var history = [], 1.118 + _emit = socket.emit; 1.119 + 1.120 + // Add 'data' listener, otherwise 'data' events won't be emitted 1.121 + if (legacy) { 1.122 + function ondata() {}; 1.123 + socket.once('data', ondata); 1.124 + } 1.125 + 1.126 + // 2 minutes timeout, as http.js does by default 1.127 + socket.setTimeout(self.timeout || 2 * 60 * 1000); 1.128 + 1.129 + socket.emit = function emit(event, data) { 1.130 + history.push(Array.prototype.slice.call(arguments)); 1.131 + 1.132 + if (event === 'data') { 1.133 + // Legacy 1.134 + onFirstByte.call(socket, data); 1.135 + } else if (event === 'readable') { 1.136 + // Streams 1.137 + onReadable.call(socket); 1.138 + } else if (event === 'end' || 1.139 + event === 'close' || 1.140 + event === 'error' || 1.141 + event === 'timeout') { 1.142 + // We shouldn't get there if any data was received 1.143 + fail(); 1.144 + } 1.145 + }; 1.146 + 1.147 + function fail() { 1.148 + socket.emit = _emit; 1.149 + history = null; 1.150 + try { 1.151 + socket.destroy(); 1.152 + } catch (e) { 1.153 + } 1.154 + } 1.155 + 1.156 + function restore() { 1.157 + var copy = history.slice(); 1.158 + history = null; 1.159 + 1.160 + if (legacy) socket.removeListener('data', ondata); 1.161 + socket.emit = _emit; 1.162 + for (var i = 0; i < copy.length; i++) { 1.163 + socket.emit.apply(socket, copy[i]); 1.164 + if (copy[i][0] === 'end') { 1.165 + if (socket.onend) socket.onend(); 1.166 + } 1.167 + } 1.168 + } 1.169 + 1.170 + function onFirstByte(data) { 1.171 + // Ignore empty packets 1.172 + if (data.length === 0) return; 1.173 + 1.174 + if (data[0] === 0x80) { 1.175 + self._onConnection(socket); 1.176 + } else { 1.177 + handler.call(self, socket); 1.178 + } 1.179 + 1.180 + // Fire events 1.181 + restore(); 1.182 + 1.183 + // NOTE: If we came there - .ondata() will be called anyway in this tick, 1.184 + // so there're no need to call it manually 1.185 + }; 1.186 + 1.187 + if (!legacy) { 1.188 + // Hack to make streams2 work properly 1.189 + socket.on('readable', onReadable); 1.190 + } 1.191 + 1.192 + function onReadable() { 1.193 + var data = socket.read(1); 1.194 + 1.195 + // Ignore empty packets 1.196 + if (!data) return; 1.197 + socket.removeListener('readable', onReadable); 1.198 + 1.199 + // `.unshift()` emits `readable` event. Thus `emit` method should 1.200 + // be restored before calling it. 1.201 + socket.emit = _emit; 1.202 + 1.203 + // Put packet back where it was before 1.204 + socket.unshift(data); 1.205 + 1.206 + if (data[0] === 0x80) { 1.207 + self._onConnection(socket); 1.208 + } else { 1.209 + handler.call(self, socket); 1.210 + } 1.211 + 1.212 + // Fire events 1.213 + restore(); 1.214 + 1.215 + if (socket.ondata) { 1.216 + data = socket.read(socket._readableState.length); 1.217 + if (data) socket.ondata(data, 0, data.length); 1.218 + } 1.219 + } 1.220 + }); 1.221 +}; 1.222 + 1.223 +// 1.224 +// ### function _onConnection (socket) 1.225 +// #### @socket {Stream} incoming socket 1.226 +// Server's connection handler wrapper. 1.227 +// 1.228 +proto._onConnection = function _onConnection(socket) { 1.229 + var self = this, 1.230 + state = this._spdyState; 1.231 + 1.232 + // Fallback to HTTPS if needed 1.233 + var selectedProtocol = socket.npnProtocol || socket.alpnProtocol; 1.234 + if ((!selectedProtocol || !selectedProtocol.match(/spdy/)) && 1.235 + !state.options.debug && !state.options.plain) { 1.236 + return state.handler.call(this, socket); 1.237 + } 1.238 + 1.239 + // Wrap incoming socket into abstract class 1.240 + var connection = new Connection(socket, state.pool, state.options); 1.241 + 1.242 + // Emulate each stream like connection 1.243 + connection.on('stream', state.handler); 1.244 + 1.245 + connection.on('connect', function onconnect(req, socket) { 1.246 + socket.streamID = req.streamID = req.socket.id; 1.247 + socket.isSpdy = req.isSpdy = true; 1.248 + socket.spdyVersion = req.spdyVersion = req.socket.version; 1.249 + 1.250 + socket.once('finish', function onfinish() { 1.251 + req.connection.end(); 1.252 + }); 1.253 + 1.254 + self.emit('connect', req, socket); 1.255 + }); 1.256 + 1.257 + connection.on('request', function onrequest(req, res) { 1.258 + res._renderHeaders = spdy.response._renderHeaders; 1.259 + res.writeHead = spdy.response.writeHead; 1.260 + res.push = spdy.response.push; 1.261 + res.streamID = req.streamID = req.socket.id; 1.262 + res.spdyVersion = req.spdyVersion = req.socket.version; 1.263 + res.isSpdy = req.isSpdy = true; 1.264 + 1.265 + // Chunked encoding is not supported in SPDY 1.266 + res.useChunkedEncodingByDefault = false; 1.267 + 1.268 + res.once('finish', function onfinish() { 1.269 + req.connection.end(); 1.270 + }); 1.271 + 1.272 + self.emit('request', req, res); 1.273 + }); 1.274 + 1.275 + connection.on('error', function onerror(e) { 1.276 + console.log('[secureConnection] error ' + e); 1.277 + socket.destroy(e.errno === 'EPIPE' ? undefined : e); 1.278 + }); 1.279 +}; 1.280 + 1.281 +// Export Server instantiated from https.Server 1.282 +var Server = instantiate(https.Server); 1.283 +exports.Server = Server; 1.284 + 1.285 +// 1.286 +// ### function create (base, options, requestListener) 1.287 +// #### @base {Function} (optional) base server class (https.Server) 1.288 +// #### @options {Object} tls server options 1.289 +// #### @requestListener {Function} (optional) request callback 1.290 +// @constructor wrapper 1.291 +// 1.292 +exports.create = function create(base, options, requestListener) { 1.293 + var server; 1.294 + if (typeof base === 'function') { 1.295 + server = instantiate(base); 1.296 + } else { 1.297 + server = Server; 1.298 + 1.299 + requestListener = options; 1.300 + options = base; 1.301 + base = null; 1.302 + } 1.303 + 1.304 + return new server(options, requestListener); 1.305 +}; 1.306 + 1.307 +// 1.308 +// ### function Connection (socket, pool, options) 1.309 +// #### @socket {net.Socket} server's connection 1.310 +// #### @pool {spdy.ZlibPool} zlib pool 1.311 +// #### @options {Object} server's options 1.312 +// Abstract connection @constructor 1.313 +// 1.314 +function Connection(socket, pool, options) { 1.315 + process.EventEmitter.call(this); 1.316 + 1.317 + var self = this; 1.318 + 1.319 + this._closed = false; 1.320 + 1.321 + this.pool = pool; 1.322 + var pair = null; 1.323 + 1.324 + this._deflate = null; 1.325 + this._inflate = null; 1.326 + 1.327 + this.encrypted = socket.encrypted; 1.328 + 1.329 + // Init streams list 1.330 + this.streams = {}; 1.331 + this.streamsCount = 0; 1.332 + this.pushId = 0; 1.333 + this._goaway = false; 1.334 + 1.335 + this._framer = null; 1.336 + 1.337 + // Data transfer window defaults to 64kb 1.338 + this.windowSize = options.windowSize; 1.339 + this.sinkSize = options.sinkSize; 1.340 + 1.341 + // Initialize scheduler 1.342 + this.scheduler = spdy.scheduler.create(this); 1.343 + 1.344 + // Store socket and pipe it to parser 1.345 + this.socket = socket; 1.346 + 1.347 + // Initialize parser 1.348 + this.parser = spdy.parser.create(this); 1.349 + this.parser.on('frame', function (frame) { 1.350 + if (this._closed) return; 1.351 + 1.352 + var stream; 1.353 + 1.354 + // Create new stream 1.355 + if (frame.type === 'SYN_STREAM') { 1.356 + self.streamsCount++; 1.357 + 1.358 + stream = self.streams[frame.id] = new Stream(self, frame); 1.359 + 1.360 + // If we reached stream limit 1.361 + if (self.streamsCount > options.maxStreams) { 1.362 + stream.once('error', function onerror() {}); 1.363 + // REFUSED_STREAM 1.364 + stream._rstCode = 3; 1.365 + stream.destroy(true); 1.366 + } else { 1.367 + self.emit('stream', stream); 1.368 + 1.369 + stream._init(); 1.370 + } 1.371 + } else { 1.372 + if (frame.id) { 1.373 + // Load created one 1.374 + stream = self.streams[frame.id]; 1.375 + 1.376 + // Fail if not found 1.377 + if (stream === undefined) { 1.378 + if (frame.type === 'RST_STREAM') return; 1.379 + self.write(self._framer.rstFrame(frame.id, 2)); 1.380 + return; 1.381 + } 1.382 + } 1.383 + 1.384 + // Emit 'data' event 1.385 + if (frame.type === 'DATA') { 1.386 + if (frame.data.length > 0){ 1.387 + if (stream._closedBy.client) { 1.388 + stream._rstCode = 2; 1.389 + stream.emit('error', 'Writing to half-closed stream'); 1.390 + } else { 1.391 + stream._recv(frame.data); 1.392 + } 1.393 + } 1.394 + // Destroy stream if we was asked to do this 1.395 + } else if (frame.type === 'RST_STREAM') { 1.396 + stream._rstCode = 0; 1.397 + if (frame.status === 5) { 1.398 + // If client "cancels" connection - close stream and 1.399 + // all associated push streams without error 1.400 + stream.pushes.forEach(function(stream) { 1.401 + stream.close(); 1.402 + }); 1.403 + stream.close(); 1.404 + } else { 1.405 + // Emit error on destroy 1.406 + stream.destroy(new Error('Received rst: ' + frame.status)); 1.407 + } 1.408 + // Respond with same PING 1.409 + } else if (frame.type === 'PING') { 1.410 + self.write(self._framer.pingFrame(frame.pingId)); 1.411 + } else if (frame.type === 'SETTINGS') { 1.412 + self._setDefaultWindow(frame.settings); 1.413 + } else if (frame.type === 'GOAWAY') { 1.414 + self._goaway = frame.lastId; 1.415 + } else if (frame.type === 'WINDOW_UPDATE') { 1.416 + stream._drainSink(frame.delta); 1.417 + } else { 1.418 + console.error('Unknown type: ', frame.type); 1.419 + } 1.420 + } 1.421 + 1.422 + // Handle half-closed 1.423 + if (frame.fin) { 1.424 + // Don't allow to close stream twice 1.425 + if (stream._closedBy.client) { 1.426 + stream._rstCode = 2; 1.427 + stream.emit('error', 'Already half-closed'); 1.428 + } else { 1.429 + stream._closedBy.client = true; 1.430 + 1.431 + // Emulate last chunked fragment 1.432 + if (stream._forceChunked) { 1.433 + stream._recv(last_frag, true); 1.434 + } 1.435 + 1.436 + stream._handleClose(); 1.437 + } 1.438 + } 1.439 + }); 1.440 + 1.441 + this.parser.on('version', function onversion(version) { 1.442 + if (!pair) { 1.443 + pair = pool.get('spdy/' + version); 1.444 + self._deflate = pair.deflate; 1.445 + self._inflate = pair.inflate; 1.446 + } 1.447 + }); 1.448 + 1.449 + this.parser.on('framer', function onframer(framer) { 1.450 + // Generate custom settings frame and send 1.451 + self.write(framer.settingsFrame(options)); 1.452 + }); 1.453 + 1.454 + // Propagate parser errors 1.455 + this.parser.on('error', function onParserError(err) { 1.456 + self.emit('error', err); 1.457 + }); 1.458 + 1.459 + socket.pipe(this.parser); 1.460 + 1.461 + // 2 minutes socket timeout 1.462 + socket.setTimeout(2 * 60 * 1000); 1.463 + socket.once('timeout', function ontimeout() { 1.464 + socket.destroy(); 1.465 + }); 1.466 + 1.467 + // Allow high-level api to catch socket errors 1.468 + socket.on('error', function onSocketError(e) { 1.469 + self.emit('error', e); 1.470 + }); 1.471 + 1.472 + socket.once('close', function onclose() { 1.473 + self._closed = true; 1.474 + if (pair) pool.put(pair); 1.475 + }); 1.476 + 1.477 + if (legacy) { 1.478 + socket.on('drain', function ondrain() { 1.479 + self.emit('drain'); 1.480 + }); 1.481 + } 1.482 +} 1.483 +util.inherits(Connection, process.EventEmitter); 1.484 +exports.Connection = Connection; 1.485 + 1.486 +// 1.487 +// ### function write (data, encoding) 1.488 +// #### @data {String|Buffer} data 1.489 +// #### @encoding {String} (optional) encoding 1.490 +// Writes data to socket 1.491 +// 1.492 +Connection.prototype.write = function write(data, encoding) { 1.493 + if (this.socket.writable) { 1.494 + return this.socket.write(data, encoding); 1.495 + } 1.496 +}; 1.497 + 1.498 +// 1.499 +// ### function _setDefaultWindow (settings) 1.500 +// #### @settings {Object} 1.501 +// Update the default transfer window -- in the connection and in the 1.502 +// active streams 1.503 +// 1.504 +Connection.prototype._setDefaultWindow = function _setDefaultWindow(settings) { 1.505 + if (!settings) return; 1.506 + if (!settings.initial_window_size || 1.507 + settings.initial_window_size.persisted) { 1.508 + return; 1.509 + } 1.510 + 1.511 + this.sinkSize = settings.initial_window_size.value; 1.512 + 1.513 + Object.keys(this.streams).forEach(function(id) { 1.514 + this.streams[id]._updateSinkSize(settings.initial_window_size.value); 1.515 + }, this); 1.516 +}; 1.517 + 1.518 +// 1.519 +// ### function Stream (connection, frame) 1.520 +// #### @connection {Connection} SPDY Connection 1.521 +// #### @frame {Object} SYN_STREAM data 1.522 +// Abstract stream @constructor 1.523 +// 1.524 +function Stream(connection, frame) { 1.525 + DuplexStream.call(this); 1.526 + 1.527 + this.connection = connection; 1.528 + this.socket = connection.socket; 1.529 + this.encrypted = connection.encrypted; 1.530 + this._framer = connection._framer; 1.531 + this._initialized = false; 1.532 + 1.533 + // Should chunked encoding be forced 1.534 + this._forceChunked = false; 1.535 + 1.536 + this.ondata = this.onend = null; 1.537 + 1.538 + // RST_STREAM code if any 1.539 + this._rstCode = 1; 1.540 + this._destroyed = false; 1.541 + 1.542 + this._closedBy = { 1.543 + client: false, 1.544 + server: false 1.545 + }; 1.546 + 1.547 + // Lock data 1.548 + this._locked = false; 1.549 + this._lockBuffer = []; 1.550 + 1.551 + // Store id 1.552 + this.id = frame.id; 1.553 + this.version = frame.version; 1.554 + 1.555 + // Store priority 1.556 + this.priority = frame.priority; 1.557 + 1.558 + // Array of push streams associated to that one 1.559 + this.pushes = []; 1.560 + 1.561 + // How much data can be sent TO client before next WINDOW_UPDATE 1.562 + this._sinkSize = connection.sinkSize; 1.563 + this._initialSinkSize = connection.sinkSize; 1.564 + 1.565 + // When data needs to be send, but window is too small for it - it'll be 1.566 + // queued in this buffer 1.567 + this._sinkBuffer = []; 1.568 + 1.569 + // How much data can be sent BY client before next WINDOW_UPDATE 1.570 + this._initialWindowSize = connection.windowSize; 1.571 + this._windowSize = connection.windowSize; 1.572 + 1.573 + // Create compression streams 1.574 + this._deflate = connection._deflate; 1.575 + this._inflate = connection._inflate; 1.576 + 1.577 + // Store headers 1.578 + this.headers = frame.headers; 1.579 + this.url = frame.url; 1.580 + 1.581 + this._frame = frame; 1.582 + 1.583 + if (legacy) { 1.584 + this.readable = this.writable = true; 1.585 + } 1.586 + 1.587 + // Call .onend() 1.588 + this.once('end', function() { 1.589 + var self = this; 1.590 + process.nextTick(function() { 1.591 + if (self.onend) self.onend(); 1.592 + }); 1.593 + }); 1.594 + 1.595 + // Handle half-close 1.596 + this.once('finish', function() { 1.597 + this._writeData(true, []); 1.598 + this._closedBy.server = true; 1.599 + if (this._sinkBuffer.length !== 0) return; 1.600 + this._handleClose(); 1.601 + }); 1.602 +}; 1.603 +util.inherits(Stream, DuplexStream); 1.604 +exports.Stream = Stream; 1.605 + 1.606 +if (legacy) { 1.607 + Stream.prototype.pause = function pause() {}; 1.608 + Stream.prototype.resume = function resume() {}; 1.609 +} 1.610 + 1.611 +// 1.612 +// ### function _isGoaway () 1.613 +// Returns true if any writes to that stream should be ignored 1.614 +// 1.615 +Stream.prototype._isGoaway = function _isGoaway() { 1.616 + return this.connection._goaway && this.id > this.connection._goaway; 1.617 +}; 1.618 + 1.619 +// 1.620 +// ### function init () 1.621 +// Initialize stream, internal 1.622 +// 1.623 +Stream.prototype._init = function init() { 1.624 + var headers = this.headers, 1.625 + req = [headers.method + ' ' + this.url + ' ' + headers.version]; 1.626 + 1.627 + Object.keys(headers).forEach(function (key) { 1.628 + if (key !== 'method' && key !== 'url' && key !== 'version' && 1.629 + key !== 'scheme') { 1.630 + req.push(key + ': ' + headers[key]); 1.631 + } 1.632 + }); 1.633 + 1.634 + // Force chunked encoding 1.635 + if (!headers['content-length'] && !headers['transfer-encoding']) { 1.636 + req.push('Transfer-Encoding: chunked'); 1.637 + this._forceChunked = true; 1.638 + } 1.639 + 1.640 + // Add '\r\n\r\n' 1.641 + req.push('', ''); 1.642 + 1.643 + req = new Buffer(req.join('\r\n')); 1.644 + 1.645 + this._recv(req, true); 1.646 + this._initialized = true; 1.647 +}; 1.648 + 1.649 +// 1.650 +// ### function lock (callback) 1.651 +// #### @callback {Function} continuation callback 1.652 +// Acquire lock 1.653 +// 1.654 +Stream.prototype._lock = function lock(callback) { 1.655 + if (!callback) return; 1.656 + 1.657 + if (this._locked) { 1.658 + this._lockBuffer.push(callback); 1.659 + } else { 1.660 + this._locked = true; 1.661 + callback.call(this, null); 1.662 + } 1.663 +}; 1.664 + 1.665 +// 1.666 +// ### function unlock () 1.667 +// Release lock and call all buffered callbacks 1.668 +// 1.669 +Stream.prototype._unlock = function unlock() { 1.670 + if (this._locked) { 1.671 + this._locked = false; 1.672 + this._lock(this._lockBuffer.shift()); 1.673 + } 1.674 +}; 1.675 + 1.676 +// 1.677 +// ### function setTimeout () 1.678 +// TODO: use timers.enroll, timers.active, timers.unenroll 1.679 +// 1.680 +Stream.prototype.setTimeout = function setTimeout(time) {}; 1.681 + 1.682 +// 1.683 +// ### function _handleClose () 1.684 +// Close stream if it was closed by both server and client 1.685 +// 1.686 +Stream.prototype._handleClose = function _handleClose() { 1.687 + if (this._closedBy.client && this._closedBy.server) { 1.688 + this.close(); 1.689 + } 1.690 +}; 1.691 + 1.692 +// 1.693 +// ### function close () 1.694 +// Destroys stream 1.695 +// 1.696 +Stream.prototype.close = function close() { 1.697 + this.destroy(); 1.698 +}; 1.699 + 1.700 +// 1.701 +// ### function destroy (error) 1.702 +// #### @error {Error} (optional) error 1.703 +// Destroys stream 1.704 +// 1.705 +Stream.prototype.destroy = function destroy(error) { 1.706 + if (this._destroyed) return; 1.707 + this._destroyed = true; 1.708 + 1.709 + delete this.connection.streams[this.id]; 1.710 + if (this.id % 2 === 1) { 1.711 + this.connection.streamsCount--; 1.712 + } 1.713 + 1.714 + // If stream is not finished, RST frame should be sent to notify client 1.715 + // about sudden stream termination. 1.716 + if (error || !this._closedBy.server) { 1.717 + // REFUSED_STREAM if terminated before 'finish' event 1.718 + if (!this._closedBy.server) this._rstCode = 3; 1.719 + 1.720 + if (this._rstCode) { 1.721 + this._lock(function() { 1.722 + this.connection.scheduler.schedule( 1.723 + this, 1.724 + this._framer.rstFrame(this.id, this._rstCode)); 1.725 + this.connection.scheduler.tick(); 1.726 + 1.727 + this._unlock(); 1.728 + }); 1.729 + } 1.730 + } 1.731 + 1.732 + if (legacy) { 1.733 + this.emit('end'); 1.734 + } else { 1.735 + this.push(null); 1.736 + } 1.737 + 1.738 + if (error) this.emit('error', error); 1.739 + 1.740 + var self = this; 1.741 + process.nextTick(function() { 1.742 + self.emit('close', !!error); 1.743 + }); 1.744 +}; 1.745 + 1.746 +Stream.prototype.destroySoon = function destroySoon(error) { 1.747 + return this.destroy(error); 1.748 +}; 1.749 + 1.750 +Stream.prototype._drainSink = function _drainSink(size) { 1.751 + var oldBuffer = this._sinkBuffer; 1.752 + this._sinkBuffer = []; 1.753 + 1.754 + this._sinkSize += size; 1.755 + 1.756 + for (var i = 0; i < oldBuffer.length; i++) { 1.757 + this._writeData(oldBuffer[i][0], oldBuffer[i][1], oldBuffer[i][2]); 1.758 + } 1.759 + 1.760 + // Handle half-close 1.761 + if (this._sinkBuffer.length === 0 && this._closedBy.server) { 1.762 + this._handleClose(); 1.763 + } 1.764 + 1.765 + if (legacy) this.emit('drain'); 1.766 +}; 1.767 + 1.768 +// 1.769 +// ### function _writeData (fin, buffer, cb) 1.770 +// #### @fin {Boolean} 1.771 +// #### @buffer {Buffer} 1.772 +// #### @cb {Function} **optional** 1.773 +// Internal function 1.774 +// 1.775 +Stream.prototype._writeData = function _writeData(fin, buffer, cb) { 1.776 + if (this._framer.version === 3) { 1.777 + // Window was exhausted, queue data 1.778 + if (this._sinkSize <= 0) { 1.779 + this._sinkBuffer.push([fin, buffer, cb]); 1.780 + return false; 1.781 + } 1.782 + 1.783 + var len = Math.min(this._sinkSize, buffer.length); 1.784 + this._sinkSize -= len; 1.785 + 1.786 + // Only partial write is possible, queue rest for later 1.787 + if (len < buffer.length) { 1.788 + this._sinkBuffer.push([fin, buffer.slice(len)]); 1.789 + buffer = buffer.slice(0, len); 1.790 + fin = false; 1.791 + } 1.792 + } 1.793 + 1.794 + this._lock(function() { 1.795 + var stream = this, 1.796 + frame = this._framer.dataFrame(this.id, fin, buffer); 1.797 + 1.798 + stream.connection.scheduler.schedule(stream, frame); 1.799 + stream.connection.scheduler.tick(); 1.800 + 1.801 + this._unlock(); 1.802 + 1.803 + if (cb) cb(); 1.804 + }); 1.805 + 1.806 + return true; 1.807 +}; 1.808 + 1.809 +// 1.810 +// ### function write (data, encoding) 1.811 +// #### @data {Buffer|String} data 1.812 +// #### @encoding {String} data encoding 1.813 +// Writes data to connection 1.814 +// 1.815 +Stream.prototype._write = function write(data, encoding, cb) { 1.816 + // Do not send data to new connections after GOAWAY 1.817 + if (this._isGoaway()) { 1.818 + if (cb) cb(); 1.819 + return false; 1.820 + } 1.821 + 1.822 + return this._writeData(false, data, cb); 1.823 +}; 1.824 + 1.825 +if (legacy) { 1.826 + Stream.prototype.write = function write(data, encoding, cb) { 1.827 + if (!Buffer.isBuffer(data)) { 1.828 + return this._write(new Buffer(data, encoding), null, cb); 1.829 + } else { 1.830 + return this._write(data, encoding, cb); 1.831 + } 1.832 + }; 1.833 + 1.834 + // 1.835 + // ### function end (data) 1.836 + // #### @data {Buffer|String} (optional) data to write before ending stream 1.837 + // #### @encoding {String} (optional) string encoding 1.838 + // Send FIN data frame 1.839 + // 1.840 + Stream.prototype.end = function end(data, encoding) { 1.841 + // Do not send data to new connections after GOAWAY 1.842 + if (this._isGoaway()) return; 1.843 + 1.844 + if (data) this.write(data, encoding); 1.845 + this.emit('finish'); 1.846 + }; 1.847 +} 1.848 + 1.849 +// 1.850 +// ### function _recv (data) 1.851 +// #### @data {Buffer} buffer to receive 1.852 +// #### @chunked {Boolean} 1.853 +// (internal) 1.854 +// 1.855 +Stream.prototype._recv = function _recv(data, chunked) { 1.856 + // Update window if exhausted 1.857 + if (!chunked && this._framer.version >= 3 && this._initialized) { 1.858 + this._windowSize -= data.length; 1.859 + 1.860 + if (this._windowSize <= 0) { 1.861 + var delta = this._initialWindowSize - this._windowSize; 1.862 + this._windowSize += delta; 1.863 + this.connection.write(this._framer.windowUpdateFrame(this.id, delta)); 1.864 + } 1.865 + } 1.866 + 1.867 + // Emulate chunked encoding 1.868 + if (this._forceChunked && !chunked) { 1.869 + // Zero-chunks are treated as end, do not emit them 1.870 + if (data.length === 0) return; 1.871 + 1.872 + this._recv(new Buffer(data.length.toString(16)), true); 1.873 + this._recv(crlf, true); 1.874 + this._recv(data, true); 1.875 + this._recv(crlf, true); 1.876 + return; 1.877 + } 1.878 + 1.879 + if (legacy) { 1.880 + var self = this; 1.881 + process.nextTick(function() { 1.882 + self.emit('data', data); 1.883 + if (self.ondata) { 1.884 + self.ondata(data, 0, data.length); 1.885 + } 1.886 + }); 1.887 + } else { 1.888 + // Right now, http module expects socket to be working in streams1 mode. 1.889 + if (this.ondata) { 1.890 + this.ondata(data, 0, data.length); 1.891 + } else { 1.892 + this.push(data); 1.893 + } 1.894 + } 1.895 +}; 1.896 + 1.897 +// 1.898 +// ### function _read (bytes, cb) 1.899 +// #### @bytes {Number} number of bytes to read 1.900 +// Streams2 API 1.901 +// 1.902 +Stream.prototype._read = function read(bytes) { 1.903 + // NOP 1.904 +}; 1.905 + 1.906 +// 1.907 +// ### function _updateSinkSize (size) 1.908 +// #### @size {Integer} 1.909 +// Update the internal data transfer window 1.910 +// 1.911 +Stream.prototype._updateSinkSize = function _updateSinkSize(size) { 1.912 + var diff = size - this._initialSinkSize; 1.913 + 1.914 + this._initialSinkSize = size; 1.915 + this._drainSink(diff); 1.916 +}; 1.917 + 1.918 +// 1.919 +// `net` compatibility layer 1.920 +// (Copy pasted from lib/tls.js from node.js) 1.921 +// 1.922 +Stream.prototype.address = function address() { 1.923 + return this.socket && this.socket.address(); 1.924 +}; 1.925 + 1.926 +Stream.prototype.__defineGetter__('remoteAddress', function remoteAddress() { 1.927 + return this.socket && this.socket.remoteAddress; 1.928 +}); 1.929 + 1.930 +Stream.prototype.__defineGetter__('remotePort', function remotePort() { 1.931 + return this.socket && this.socket.remotePort; 1.932 +});