michael@0: var spdy = require('../spdy'), michael@0: util = require('util'), michael@0: https = require('https'), michael@0: stream = require('stream'), michael@0: Buffer = require('buffer').Buffer; michael@0: michael@0: var crlf = new Buffer('\r\n'); michael@0: var last_frag = new Buffer('0\r\n\r\n'); michael@0: michael@0: var legacy = !stream.Duplex; michael@0: michael@0: if (legacy) { michael@0: var DuplexStream = stream; michael@0: } else { michael@0: var DuplexStream = stream.Duplex; michael@0: } michael@0: michael@0: // michael@0: // ### function instantiate (HTTPSServer) michael@0: // #### @HTTPSServer {https.Server|Function} Base server class michael@0: // Will return constructor for SPDY Server, based on the HTTPSServer class michael@0: // michael@0: function instantiate(HTTPSServer) { michael@0: // michael@0: // ### function Server (options, requestListener) michael@0: // #### @options {Object} tls server options michael@0: // #### @requestListener {Function} (optional) request callback michael@0: // SPDY Server @constructor michael@0: // michael@0: function Server(options, requestListener) { michael@0: // Initialize michael@0: this._init(HTTPSServer, options, requestListener); michael@0: michael@0: // Wrap connection handler michael@0: this._wrap(); michael@0: }; michael@0: util.inherits(Server, HTTPSServer); michael@0: michael@0: // Copy prototype methods michael@0: Object.keys(proto).forEach(function(key) { michael@0: this[key] = proto[key]; michael@0: }, Server.prototype); michael@0: michael@0: return Server; michael@0: } michael@0: exports.instantiate = instantiate; michael@0: michael@0: // Common prototype for all servers michael@0: var proto = {}; michael@0: michael@0: // michael@0: // ### function _init(base, options, listener) michael@0: // #### @base {Function} (optional) base server class (https.Server) michael@0: // #### @options {Object} tls server options michael@0: // #### @handler {Function} (optional) request handler michael@0: // Initializer. michael@0: // michael@0: proto._init = function _init(base, options, handler) { michael@0: var state = {}; michael@0: this._spdyState = state; michael@0: michael@0: if (!options) options = {}; michael@0: if (!options.maxStreams) options.maxStreams = 100; michael@0: if (!options.sinkSize) { michael@0: options.sinkSize = 1 << 16; michael@0: } michael@0: if (!options.windowSize) { michael@0: options.windowSize = 1 << 20; // 1mb michael@0: } michael@0: michael@0: options.NPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0']; michael@0: options.ALPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0']; michael@0: state.options = options; michael@0: state.reqHandler = handler; michael@0: michael@0: if (options.plain && !options.ssl) { michael@0: base.call(this, handler); michael@0: } else { michael@0: base.call(this, options, handler); michael@0: } michael@0: michael@0: // Use https if NPN is not supported michael@0: if (!process.features.tls_npn && !process.features.tls_alpn && !options.debug && !options.plain) { michael@0: return; michael@0: } michael@0: }; michael@0: michael@0: // michael@0: // ### function _wrap() michael@0: // Wrap connection handler and add logic. michael@0: // michael@0: proto._wrap = function _wrap() { michael@0: var self = this, michael@0: state = this._spdyState; michael@0: michael@0: // Wrap connection handler michael@0: var event = state.options.plain && !state.options.ssl ? 'connection' : michael@0: 'secureConnection', michael@0: handler = this.listeners(event)[0]; michael@0: michael@0: state.pool = spdy.zlibpool.create(); michael@0: state.handler = handler; michael@0: michael@0: this.removeAllListeners(event); michael@0: michael@0: // Normal mode, use NPN to fallback to HTTPS michael@0: if (!state.options.plain) { michael@0: return this.on(event, this._onConnection.bind(this)); michael@0: } michael@0: michael@0: // In case of plain connection, we must fallback to HTTPS if first byte michael@0: // is not equal to 0x80. michael@0: this.on(event, function(socket) { michael@0: var history = [], michael@0: _emit = socket.emit; michael@0: michael@0: // Add 'data' listener, otherwise 'data' events won't be emitted michael@0: if (legacy) { michael@0: function ondata() {}; michael@0: socket.once('data', ondata); michael@0: } michael@0: michael@0: // 2 minutes timeout, as http.js does by default michael@0: socket.setTimeout(self.timeout || 2 * 60 * 1000); michael@0: michael@0: socket.emit = function emit(event, data) { michael@0: history.push(Array.prototype.slice.call(arguments)); michael@0: michael@0: if (event === 'data') { michael@0: // Legacy michael@0: onFirstByte.call(socket, data); michael@0: } else if (event === 'readable') { michael@0: // Streams michael@0: onReadable.call(socket); michael@0: } else if (event === 'end' || michael@0: event === 'close' || michael@0: event === 'error' || michael@0: event === 'timeout') { michael@0: // We shouldn't get there if any data was received michael@0: fail(); michael@0: } michael@0: }; michael@0: michael@0: function fail() { michael@0: socket.emit = _emit; michael@0: history = null; michael@0: try { michael@0: socket.destroy(); michael@0: } catch (e) { michael@0: } michael@0: } michael@0: michael@0: function restore() { michael@0: var copy = history.slice(); michael@0: history = null; michael@0: michael@0: if (legacy) socket.removeListener('data', ondata); michael@0: socket.emit = _emit; michael@0: for (var i = 0; i < copy.length; i++) { michael@0: socket.emit.apply(socket, copy[i]); michael@0: if (copy[i][0] === 'end') { michael@0: if (socket.onend) socket.onend(); michael@0: } michael@0: } michael@0: } michael@0: michael@0: function onFirstByte(data) { michael@0: // Ignore empty packets michael@0: if (data.length === 0) return; michael@0: michael@0: if (data[0] === 0x80) { michael@0: self._onConnection(socket); michael@0: } else { michael@0: handler.call(self, socket); michael@0: } michael@0: michael@0: // Fire events michael@0: restore(); michael@0: michael@0: // NOTE: If we came there - .ondata() will be called anyway in this tick, michael@0: // so there're no need to call it manually michael@0: }; michael@0: michael@0: if (!legacy) { michael@0: // Hack to make streams2 work properly michael@0: socket.on('readable', onReadable); michael@0: } michael@0: michael@0: function onReadable() { michael@0: var data = socket.read(1); michael@0: michael@0: // Ignore empty packets michael@0: if (!data) return; michael@0: socket.removeListener('readable', onReadable); michael@0: michael@0: // `.unshift()` emits `readable` event. Thus `emit` method should michael@0: // be restored before calling it. michael@0: socket.emit = _emit; michael@0: michael@0: // Put packet back where it was before michael@0: socket.unshift(data); michael@0: michael@0: if (data[0] === 0x80) { michael@0: self._onConnection(socket); michael@0: } else { michael@0: handler.call(self, socket); michael@0: } michael@0: michael@0: // Fire events michael@0: restore(); michael@0: michael@0: if (socket.ondata) { michael@0: data = socket.read(socket._readableState.length); michael@0: if (data) socket.ondata(data, 0, data.length); michael@0: } michael@0: } michael@0: }); michael@0: }; michael@0: michael@0: // michael@0: // ### function _onConnection (socket) michael@0: // #### @socket {Stream} incoming socket michael@0: // Server's connection handler wrapper. michael@0: // michael@0: proto._onConnection = function _onConnection(socket) { michael@0: var self = this, michael@0: state = this._spdyState; michael@0: michael@0: // Fallback to HTTPS if needed michael@0: var selectedProtocol = socket.npnProtocol || socket.alpnProtocol; michael@0: if ((!selectedProtocol || !selectedProtocol.match(/spdy/)) && michael@0: !state.options.debug && !state.options.plain) { michael@0: return state.handler.call(this, socket); michael@0: } michael@0: michael@0: // Wrap incoming socket into abstract class michael@0: var connection = new Connection(socket, state.pool, state.options); michael@0: michael@0: // Emulate each stream like connection michael@0: connection.on('stream', state.handler); michael@0: michael@0: connection.on('connect', function onconnect(req, socket) { michael@0: socket.streamID = req.streamID = req.socket.id; michael@0: socket.isSpdy = req.isSpdy = true; michael@0: socket.spdyVersion = req.spdyVersion = req.socket.version; michael@0: michael@0: socket.once('finish', function onfinish() { michael@0: req.connection.end(); michael@0: }); michael@0: michael@0: self.emit('connect', req, socket); michael@0: }); michael@0: michael@0: connection.on('request', function onrequest(req, res) { michael@0: res._renderHeaders = spdy.response._renderHeaders; michael@0: res.writeHead = spdy.response.writeHead; michael@0: res.push = spdy.response.push; michael@0: res.streamID = req.streamID = req.socket.id; michael@0: res.spdyVersion = req.spdyVersion = req.socket.version; michael@0: res.isSpdy = req.isSpdy = true; michael@0: michael@0: // Chunked encoding is not supported in SPDY michael@0: res.useChunkedEncodingByDefault = false; michael@0: michael@0: res.once('finish', function onfinish() { michael@0: req.connection.end(); michael@0: }); michael@0: michael@0: self.emit('request', req, res); michael@0: }); michael@0: michael@0: connection.on('error', function onerror(e) { michael@0: console.log('[secureConnection] error ' + e); michael@0: socket.destroy(e.errno === 'EPIPE' ? undefined : e); michael@0: }); michael@0: }; michael@0: michael@0: // Export Server instantiated from https.Server michael@0: var Server = instantiate(https.Server); michael@0: exports.Server = Server; michael@0: michael@0: // michael@0: // ### function create (base, options, requestListener) michael@0: // #### @base {Function} (optional) base server class (https.Server) michael@0: // #### @options {Object} tls server options michael@0: // #### @requestListener {Function} (optional) request callback michael@0: // @constructor wrapper michael@0: // michael@0: exports.create = function create(base, options, requestListener) { michael@0: var server; michael@0: if (typeof base === 'function') { michael@0: server = instantiate(base); michael@0: } else { michael@0: server = Server; michael@0: michael@0: requestListener = options; michael@0: options = base; michael@0: base = null; michael@0: } michael@0: michael@0: return new server(options, requestListener); michael@0: }; michael@0: michael@0: // michael@0: // ### function Connection (socket, pool, options) michael@0: // #### @socket {net.Socket} server's connection michael@0: // #### @pool {spdy.ZlibPool} zlib pool michael@0: // #### @options {Object} server's options michael@0: // Abstract connection @constructor michael@0: // michael@0: function Connection(socket, pool, options) { michael@0: process.EventEmitter.call(this); michael@0: michael@0: var self = this; michael@0: michael@0: this._closed = false; michael@0: michael@0: this.pool = pool; michael@0: var pair = null; michael@0: michael@0: this._deflate = null; michael@0: this._inflate = null; michael@0: michael@0: this.encrypted = socket.encrypted; michael@0: michael@0: // Init streams list michael@0: this.streams = {}; michael@0: this.streamsCount = 0; michael@0: this.pushId = 0; michael@0: this._goaway = false; michael@0: michael@0: this._framer = null; michael@0: michael@0: // Data transfer window defaults to 64kb michael@0: this.windowSize = options.windowSize; michael@0: this.sinkSize = options.sinkSize; michael@0: michael@0: // Initialize scheduler michael@0: this.scheduler = spdy.scheduler.create(this); michael@0: michael@0: // Store socket and pipe it to parser michael@0: this.socket = socket; michael@0: michael@0: // Initialize parser michael@0: this.parser = spdy.parser.create(this); michael@0: this.parser.on('frame', function (frame) { michael@0: if (this._closed) return; michael@0: michael@0: var stream; michael@0: michael@0: // Create new stream michael@0: if (frame.type === 'SYN_STREAM') { michael@0: self.streamsCount++; michael@0: michael@0: stream = self.streams[frame.id] = new Stream(self, frame); michael@0: michael@0: // If we reached stream limit michael@0: if (self.streamsCount > options.maxStreams) { michael@0: stream.once('error', function onerror() {}); michael@0: // REFUSED_STREAM michael@0: stream._rstCode = 3; michael@0: stream.destroy(true); michael@0: } else { michael@0: self.emit('stream', stream); michael@0: michael@0: stream._init(); michael@0: } michael@0: } else { michael@0: if (frame.id) { michael@0: // Load created one michael@0: stream = self.streams[frame.id]; michael@0: michael@0: // Fail if not found michael@0: if (stream === undefined) { michael@0: if (frame.type === 'RST_STREAM') return; michael@0: self.write(self._framer.rstFrame(frame.id, 2)); michael@0: return; michael@0: } michael@0: } michael@0: michael@0: // Emit 'data' event michael@0: if (frame.type === 'DATA') { michael@0: if (frame.data.length > 0){ michael@0: if (stream._closedBy.client) { michael@0: stream._rstCode = 2; michael@0: stream.emit('error', 'Writing to half-closed stream'); michael@0: } else { michael@0: stream._recv(frame.data); michael@0: } michael@0: } michael@0: // Destroy stream if we was asked to do this michael@0: } else if (frame.type === 'RST_STREAM') { michael@0: stream._rstCode = 0; michael@0: if (frame.status === 5) { michael@0: // If client "cancels" connection - close stream and michael@0: // all associated push streams without error michael@0: stream.pushes.forEach(function(stream) { michael@0: stream.close(); michael@0: }); michael@0: stream.close(); michael@0: } else { michael@0: // Emit error on destroy michael@0: stream.destroy(new Error('Received rst: ' + frame.status)); michael@0: } michael@0: // Respond with same PING michael@0: } else if (frame.type === 'PING') { michael@0: self.write(self._framer.pingFrame(frame.pingId)); michael@0: } else if (frame.type === 'SETTINGS') { michael@0: self._setDefaultWindow(frame.settings); michael@0: } else if (frame.type === 'GOAWAY') { michael@0: self._goaway = frame.lastId; michael@0: } else if (frame.type === 'WINDOW_UPDATE') { michael@0: stream._drainSink(frame.delta); michael@0: } else { michael@0: console.error('Unknown type: ', frame.type); michael@0: } michael@0: } michael@0: michael@0: // Handle half-closed michael@0: if (frame.fin) { michael@0: // Don't allow to close stream twice michael@0: if (stream._closedBy.client) { michael@0: stream._rstCode = 2; michael@0: stream.emit('error', 'Already half-closed'); michael@0: } else { michael@0: stream._closedBy.client = true; michael@0: michael@0: // Emulate last chunked fragment michael@0: if (stream._forceChunked) { michael@0: stream._recv(last_frag, true); michael@0: } michael@0: michael@0: stream._handleClose(); michael@0: } michael@0: } michael@0: }); michael@0: michael@0: this.parser.on('version', function onversion(version) { michael@0: if (!pair) { michael@0: pair = pool.get('spdy/' + version); michael@0: self._deflate = pair.deflate; michael@0: self._inflate = pair.inflate; michael@0: } michael@0: }); michael@0: michael@0: this.parser.on('framer', function onframer(framer) { michael@0: // Generate custom settings frame and send michael@0: self.write(framer.settingsFrame(options)); michael@0: }); michael@0: michael@0: // Propagate parser errors michael@0: this.parser.on('error', function onParserError(err) { michael@0: self.emit('error', err); michael@0: }); michael@0: michael@0: socket.pipe(this.parser); michael@0: michael@0: // 2 minutes socket timeout michael@0: socket.setTimeout(2 * 60 * 1000); michael@0: socket.once('timeout', function ontimeout() { michael@0: socket.destroy(); michael@0: }); michael@0: michael@0: // Allow high-level api to catch socket errors michael@0: socket.on('error', function onSocketError(e) { michael@0: self.emit('error', e); michael@0: }); michael@0: michael@0: socket.once('close', function onclose() { michael@0: self._closed = true; michael@0: if (pair) pool.put(pair); michael@0: }); michael@0: michael@0: if (legacy) { michael@0: socket.on('drain', function ondrain() { michael@0: self.emit('drain'); michael@0: }); michael@0: } michael@0: } michael@0: util.inherits(Connection, process.EventEmitter); michael@0: exports.Connection = Connection; michael@0: michael@0: // michael@0: // ### function write (data, encoding) michael@0: // #### @data {String|Buffer} data michael@0: // #### @encoding {String} (optional) encoding michael@0: // Writes data to socket michael@0: // michael@0: Connection.prototype.write = function write(data, encoding) { michael@0: if (this.socket.writable) { michael@0: return this.socket.write(data, encoding); michael@0: } michael@0: }; michael@0: michael@0: // michael@0: // ### function _setDefaultWindow (settings) michael@0: // #### @settings {Object} michael@0: // Update the default transfer window -- in the connection and in the michael@0: // active streams michael@0: // michael@0: Connection.prototype._setDefaultWindow = function _setDefaultWindow(settings) { michael@0: if (!settings) return; michael@0: if (!settings.initial_window_size || michael@0: settings.initial_window_size.persisted) { michael@0: return; michael@0: } michael@0: michael@0: this.sinkSize = settings.initial_window_size.value; michael@0: michael@0: Object.keys(this.streams).forEach(function(id) { michael@0: this.streams[id]._updateSinkSize(settings.initial_window_size.value); michael@0: }, this); michael@0: }; michael@0: michael@0: // michael@0: // ### function Stream (connection, frame) michael@0: // #### @connection {Connection} SPDY Connection michael@0: // #### @frame {Object} SYN_STREAM data michael@0: // Abstract stream @constructor michael@0: // michael@0: function Stream(connection, frame) { michael@0: DuplexStream.call(this); michael@0: michael@0: this.connection = connection; michael@0: this.socket = connection.socket; michael@0: this.encrypted = connection.encrypted; michael@0: this._framer = connection._framer; michael@0: this._initialized = false; michael@0: michael@0: // Should chunked encoding be forced michael@0: this._forceChunked = false; michael@0: michael@0: this.ondata = this.onend = null; michael@0: michael@0: // RST_STREAM code if any michael@0: this._rstCode = 1; michael@0: this._destroyed = false; michael@0: michael@0: this._closedBy = { michael@0: client: false, michael@0: server: false michael@0: }; michael@0: michael@0: // Lock data michael@0: this._locked = false; michael@0: this._lockBuffer = []; michael@0: michael@0: // Store id michael@0: this.id = frame.id; michael@0: this.version = frame.version; michael@0: michael@0: // Store priority michael@0: this.priority = frame.priority; michael@0: michael@0: // Array of push streams associated to that one michael@0: this.pushes = []; michael@0: michael@0: // How much data can be sent TO client before next WINDOW_UPDATE michael@0: this._sinkSize = connection.sinkSize; michael@0: this._initialSinkSize = connection.sinkSize; michael@0: michael@0: // When data needs to be send, but window is too small for it - it'll be michael@0: // queued in this buffer michael@0: this._sinkBuffer = []; michael@0: michael@0: // How much data can be sent BY client before next WINDOW_UPDATE michael@0: this._initialWindowSize = connection.windowSize; michael@0: this._windowSize = connection.windowSize; michael@0: michael@0: // Create compression streams michael@0: this._deflate = connection._deflate; michael@0: this._inflate = connection._inflate; michael@0: michael@0: // Store headers michael@0: this.headers = frame.headers; michael@0: this.url = frame.url; michael@0: michael@0: this._frame = frame; michael@0: michael@0: if (legacy) { michael@0: this.readable = this.writable = true; michael@0: } michael@0: michael@0: // Call .onend() michael@0: this.once('end', function() { michael@0: var self = this; michael@0: process.nextTick(function() { michael@0: if (self.onend) self.onend(); michael@0: }); michael@0: }); michael@0: michael@0: // Handle half-close michael@0: this.once('finish', function() { michael@0: this._writeData(true, []); michael@0: this._closedBy.server = true; michael@0: if (this._sinkBuffer.length !== 0) return; michael@0: this._handleClose(); michael@0: }); michael@0: }; michael@0: util.inherits(Stream, DuplexStream); michael@0: exports.Stream = Stream; michael@0: michael@0: if (legacy) { michael@0: Stream.prototype.pause = function pause() {}; michael@0: Stream.prototype.resume = function resume() {}; michael@0: } michael@0: michael@0: // michael@0: // ### function _isGoaway () michael@0: // Returns true if any writes to that stream should be ignored michael@0: // michael@0: Stream.prototype._isGoaway = function _isGoaway() { michael@0: return this.connection._goaway && this.id > this.connection._goaway; michael@0: }; michael@0: michael@0: // michael@0: // ### function init () michael@0: // Initialize stream, internal michael@0: // michael@0: Stream.prototype._init = function init() { michael@0: var headers = this.headers, michael@0: req = [headers.method + ' ' + this.url + ' ' + headers.version]; michael@0: michael@0: Object.keys(headers).forEach(function (key) { michael@0: if (key !== 'method' && key !== 'url' && key !== 'version' && michael@0: key !== 'scheme') { michael@0: req.push(key + ': ' + headers[key]); michael@0: } michael@0: }); michael@0: michael@0: // Force chunked encoding michael@0: if (!headers['content-length'] && !headers['transfer-encoding']) { michael@0: req.push('Transfer-Encoding: chunked'); michael@0: this._forceChunked = true; michael@0: } michael@0: michael@0: // Add '\r\n\r\n' michael@0: req.push('', ''); michael@0: michael@0: req = new Buffer(req.join('\r\n')); michael@0: michael@0: this._recv(req, true); michael@0: this._initialized = true; michael@0: }; michael@0: michael@0: // michael@0: // ### function lock (callback) michael@0: // #### @callback {Function} continuation callback michael@0: // Acquire lock michael@0: // michael@0: Stream.prototype._lock = function lock(callback) { michael@0: if (!callback) return; michael@0: michael@0: if (this._locked) { michael@0: this._lockBuffer.push(callback); michael@0: } else { michael@0: this._locked = true; michael@0: callback.call(this, null); michael@0: } michael@0: }; michael@0: michael@0: // michael@0: // ### function unlock () michael@0: // Release lock and call all buffered callbacks michael@0: // michael@0: Stream.prototype._unlock = function unlock() { michael@0: if (this._locked) { michael@0: this._locked = false; michael@0: this._lock(this._lockBuffer.shift()); michael@0: } michael@0: }; michael@0: michael@0: // michael@0: // ### function setTimeout () michael@0: // TODO: use timers.enroll, timers.active, timers.unenroll michael@0: // michael@0: Stream.prototype.setTimeout = function setTimeout(time) {}; michael@0: michael@0: // michael@0: // ### function _handleClose () michael@0: // Close stream if it was closed by both server and client michael@0: // michael@0: Stream.prototype._handleClose = function _handleClose() { michael@0: if (this._closedBy.client && this._closedBy.server) { michael@0: this.close(); michael@0: } michael@0: }; michael@0: michael@0: // michael@0: // ### function close () michael@0: // Destroys stream michael@0: // michael@0: Stream.prototype.close = function close() { michael@0: this.destroy(); michael@0: }; michael@0: michael@0: // michael@0: // ### function destroy (error) michael@0: // #### @error {Error} (optional) error michael@0: // Destroys stream michael@0: // michael@0: Stream.prototype.destroy = function destroy(error) { michael@0: if (this._destroyed) return; michael@0: this._destroyed = true; michael@0: michael@0: delete this.connection.streams[this.id]; michael@0: if (this.id % 2 === 1) { michael@0: this.connection.streamsCount--; michael@0: } michael@0: michael@0: // If stream is not finished, RST frame should be sent to notify client michael@0: // about sudden stream termination. michael@0: if (error || !this._closedBy.server) { michael@0: // REFUSED_STREAM if terminated before 'finish' event michael@0: if (!this._closedBy.server) this._rstCode = 3; michael@0: michael@0: if (this._rstCode) { michael@0: this._lock(function() { michael@0: this.connection.scheduler.schedule( michael@0: this, michael@0: this._framer.rstFrame(this.id, this._rstCode)); michael@0: this.connection.scheduler.tick(); michael@0: michael@0: this._unlock(); michael@0: }); michael@0: } michael@0: } michael@0: michael@0: if (legacy) { michael@0: this.emit('end'); michael@0: } else { michael@0: this.push(null); michael@0: } michael@0: michael@0: if (error) this.emit('error', error); michael@0: michael@0: var self = this; michael@0: process.nextTick(function() { michael@0: self.emit('close', !!error); michael@0: }); michael@0: }; michael@0: michael@0: Stream.prototype.destroySoon = function destroySoon(error) { michael@0: return this.destroy(error); michael@0: }; michael@0: michael@0: Stream.prototype._drainSink = function _drainSink(size) { michael@0: var oldBuffer = this._sinkBuffer; michael@0: this._sinkBuffer = []; michael@0: michael@0: this._sinkSize += size; michael@0: michael@0: for (var i = 0; i < oldBuffer.length; i++) { michael@0: this._writeData(oldBuffer[i][0], oldBuffer[i][1], oldBuffer[i][2]); michael@0: } michael@0: michael@0: // Handle half-close michael@0: if (this._sinkBuffer.length === 0 && this._closedBy.server) { michael@0: this._handleClose(); michael@0: } michael@0: michael@0: if (legacy) this.emit('drain'); michael@0: }; michael@0: michael@0: // michael@0: // ### function _writeData (fin, buffer, cb) michael@0: // #### @fin {Boolean} michael@0: // #### @buffer {Buffer} michael@0: // #### @cb {Function} **optional** michael@0: // Internal function michael@0: // michael@0: Stream.prototype._writeData = function _writeData(fin, buffer, cb) { michael@0: if (this._framer.version === 3) { michael@0: // Window was exhausted, queue data michael@0: if (this._sinkSize <= 0) { michael@0: this._sinkBuffer.push([fin, buffer, cb]); michael@0: return false; michael@0: } michael@0: michael@0: var len = Math.min(this._sinkSize, buffer.length); michael@0: this._sinkSize -= len; michael@0: michael@0: // Only partial write is possible, queue rest for later michael@0: if (len < buffer.length) { michael@0: this._sinkBuffer.push([fin, buffer.slice(len)]); michael@0: buffer = buffer.slice(0, len); michael@0: fin = false; michael@0: } michael@0: } michael@0: michael@0: this._lock(function() { michael@0: var stream = this, michael@0: frame = this._framer.dataFrame(this.id, fin, buffer); michael@0: michael@0: stream.connection.scheduler.schedule(stream, frame); michael@0: stream.connection.scheduler.tick(); michael@0: michael@0: this._unlock(); michael@0: michael@0: if (cb) cb(); michael@0: }); michael@0: michael@0: return true; michael@0: }; michael@0: michael@0: // michael@0: // ### function write (data, encoding) michael@0: // #### @data {Buffer|String} data michael@0: // #### @encoding {String} data encoding michael@0: // Writes data to connection michael@0: // michael@0: Stream.prototype._write = function write(data, encoding, cb) { michael@0: // Do not send data to new connections after GOAWAY michael@0: if (this._isGoaway()) { michael@0: if (cb) cb(); michael@0: return false; michael@0: } michael@0: michael@0: return this._writeData(false, data, cb); michael@0: }; michael@0: michael@0: if (legacy) { michael@0: Stream.prototype.write = function write(data, encoding, cb) { michael@0: if (!Buffer.isBuffer(data)) { michael@0: return this._write(new Buffer(data, encoding), null, cb); michael@0: } else { michael@0: return this._write(data, encoding, cb); michael@0: } michael@0: }; michael@0: michael@0: // michael@0: // ### function end (data) michael@0: // #### @data {Buffer|String} (optional) data to write before ending stream michael@0: // #### @encoding {String} (optional) string encoding michael@0: // Send FIN data frame michael@0: // michael@0: Stream.prototype.end = function end(data, encoding) { michael@0: // Do not send data to new connections after GOAWAY michael@0: if (this._isGoaway()) return; michael@0: michael@0: if (data) this.write(data, encoding); michael@0: this.emit('finish'); michael@0: }; michael@0: } michael@0: michael@0: // michael@0: // ### function _recv (data) michael@0: // #### @data {Buffer} buffer to receive michael@0: // #### @chunked {Boolean} michael@0: // (internal) michael@0: // michael@0: Stream.prototype._recv = function _recv(data, chunked) { michael@0: // Update window if exhausted michael@0: if (!chunked && this._framer.version >= 3 && this._initialized) { michael@0: this._windowSize -= data.length; michael@0: michael@0: if (this._windowSize <= 0) { michael@0: var delta = this._initialWindowSize - this._windowSize; michael@0: this._windowSize += delta; michael@0: this.connection.write(this._framer.windowUpdateFrame(this.id, delta)); michael@0: } michael@0: } michael@0: michael@0: // Emulate chunked encoding michael@0: if (this._forceChunked && !chunked) { michael@0: // Zero-chunks are treated as end, do not emit them michael@0: if (data.length === 0) return; michael@0: michael@0: this._recv(new Buffer(data.length.toString(16)), true); michael@0: this._recv(crlf, true); michael@0: this._recv(data, true); michael@0: this._recv(crlf, true); michael@0: return; michael@0: } michael@0: michael@0: if (legacy) { michael@0: var self = this; michael@0: process.nextTick(function() { michael@0: self.emit('data', data); michael@0: if (self.ondata) { michael@0: self.ondata(data, 0, data.length); michael@0: } michael@0: }); michael@0: } else { michael@0: // Right now, http module expects socket to be working in streams1 mode. michael@0: if (this.ondata) { michael@0: this.ondata(data, 0, data.length); michael@0: } else { michael@0: this.push(data); michael@0: } michael@0: } michael@0: }; michael@0: michael@0: // michael@0: // ### function _read (bytes, cb) michael@0: // #### @bytes {Number} number of bytes to read michael@0: // Streams2 API michael@0: // michael@0: Stream.prototype._read = function read(bytes) { michael@0: // NOP michael@0: }; michael@0: michael@0: // michael@0: // ### function _updateSinkSize (size) michael@0: // #### @size {Integer} michael@0: // Update the internal data transfer window michael@0: // michael@0: Stream.prototype._updateSinkSize = function _updateSinkSize(size) { michael@0: var diff = size - this._initialSinkSize; michael@0: michael@0: this._initialSinkSize = size; michael@0: this._drainSink(diff); michael@0: }; michael@0: michael@0: // michael@0: // `net` compatibility layer michael@0: // (Copy pasted from lib/tls.js from node.js) michael@0: // michael@0: Stream.prototype.address = function address() { michael@0: return this.socket && this.socket.address(); michael@0: }; michael@0: michael@0: Stream.prototype.__defineGetter__('remoteAddress', function remoteAddress() { michael@0: return this.socket && this.socket.remoteAddress; michael@0: }); michael@0: michael@0: Stream.prototype.__defineGetter__('remotePort', function remotePort() { michael@0: return this.socket && this.socket.remotePort; michael@0: });