testing/xpcshell/node-spdy/lib/spdy/server.js

Wed, 31 Dec 2014 06:55:46 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:55:46 +0100
changeset 1
ca08bd8f51b2
permissions
-rw-r--r--

Added tag TORBROWSER_REPLICA for changeset 6474c204b198

     1 var spdy = require('../spdy'),
     2     util = require('util'),
     3     https = require('https'),
     4     stream = require('stream'),
     5     Buffer = require('buffer').Buffer;
     7 var crlf = new Buffer('\r\n');
     8 var last_frag = new Buffer('0\r\n\r\n');
    10 var legacy = !stream.Duplex;
    12 if (legacy) {
    13   var DuplexStream = stream;
    14 } else {
    15   var DuplexStream = stream.Duplex;
    16 }
    18 //
    19 // ### function instantiate (HTTPSServer)
    20 // #### @HTTPSServer {https.Server|Function} Base server class
    21 // Will return constructor for SPDY Server, based on the HTTPSServer class
    22 //
    23 function instantiate(HTTPSServer) {
    24   //
    25   // ### function Server (options, requestListener)
    26   // #### @options {Object} tls server options
    27   // #### @requestListener {Function} (optional) request callback
    28   // SPDY Server @constructor
    29   //
    30   function Server(options, requestListener) {
    31     // Initialize
    32     this._init(HTTPSServer, options, requestListener);
    34     // Wrap connection handler
    35     this._wrap();
    36   };
    37   util.inherits(Server, HTTPSServer);
    39   // Copy prototype methods
    40   Object.keys(proto).forEach(function(key) {
    41     this[key] = proto[key];
    42   }, Server.prototype);
    44   return Server;
    45 }
    46 exports.instantiate = instantiate;
    48 // Common prototype for all servers
    49 var proto = {};
    51 //
    52 // ### function _init(base, options, listener)
    53 // #### @base {Function} (optional) base server class (https.Server)
    54 // #### @options {Object} tls server options
    55 // #### @handler {Function} (optional) request handler
    56 // Initializer.
    57 //
    58 proto._init = function _init(base, options, handler) {
    59   var state = {};
    60   this._spdyState = state;
    62   if (!options) options = {};
    63   if (!options.maxStreams) options.maxStreams = 100;
    64   if (!options.sinkSize) {
    65     options.sinkSize = 1 << 16;
    66   }
    67   if (!options.windowSize) {
    68     options.windowSize = 1 << 20; // 1mb
    69   }
    71   options.NPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0'];
    72   options.ALPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0'];
    73   state.options = options;
    74   state.reqHandler = handler;
    76   if (options.plain && !options.ssl) {
    77     base.call(this, handler);
    78   } else {
    79     base.call(this, options, handler);
    80   }
    82   // Use https if NPN is not supported
    83   if (!process.features.tls_npn && !process.features.tls_alpn && !options.debug && !options.plain) {
    84     return;
    85   }
    86 };
    88 //
    89 // ### function _wrap()
    90 // Wrap connection handler and add logic.
    91 //
    92 proto._wrap = function _wrap() {
    93   var self = this,
    94       state = this._spdyState;
    96   // Wrap connection handler
    97   var event = state.options.plain && !state.options.ssl ? 'connection' :
    98                                                           'secureConnection',
    99       handler = this.listeners(event)[0];
   101   state.pool = spdy.zlibpool.create();
   102   state.handler = handler;
   104   this.removeAllListeners(event);
   106   // Normal mode, use NPN to fallback to HTTPS
   107   if (!state.options.plain) {
   108     return this.on(event, this._onConnection.bind(this));
   109   }
   111   // In case of plain connection, we must fallback to HTTPS if first byte
   112   // is not equal to 0x80.
   113   this.on(event, function(socket) {
   114     var history = [],
   115         _emit = socket.emit;
   117     // Add 'data' listener, otherwise 'data' events won't be emitted
   118     if (legacy) {
   119       function ondata() {};
   120       socket.once('data', ondata);
   121     }
   123     // 2 minutes timeout, as http.js does by default
   124     socket.setTimeout(self.timeout || 2 * 60 * 1000);
   126     socket.emit = function emit(event, data) {
   127       history.push(Array.prototype.slice.call(arguments));
   129       if (event === 'data') {
   130         // Legacy
   131         onFirstByte.call(socket, data);
   132       } else if (event === 'readable') {
   133         // Streams
   134         onReadable.call(socket);
   135       } else if (event === 'end' ||
   136                  event === 'close' ||
   137                  event === 'error' ||
   138                  event === 'timeout') {
   139         // We shouldn't get there if any data was received
   140         fail();
   141       }
   142     };
   144     function fail() {
   145       socket.emit = _emit;
   146       history = null;
   147       try {
   148         socket.destroy();
   149       } catch (e) {
   150       }
   151     }
   153     function restore() {
   154       var copy = history.slice();
   155       history = null;
   157       if (legacy) socket.removeListener('data', ondata);
   158       socket.emit = _emit;
   159       for (var i = 0; i < copy.length; i++) {
   160         socket.emit.apply(socket, copy[i]);
   161         if (copy[i][0] === 'end') {
   162           if (socket.onend) socket.onend();
   163         }
   164       }
   165     }
   167     function onFirstByte(data) {
   168       // Ignore empty packets
   169       if (data.length === 0) return;
   171       if (data[0] === 0x80) {
   172         self._onConnection(socket);
   173       } else  {
   174         handler.call(self, socket);
   175       }
   177       // Fire events
   178       restore();
   180       // NOTE: If we came there - .ondata() will be called anyway in this tick,
   181       // so there're no need to call it manually
   182     };
   184     if (!legacy) {
   185       // Hack to make streams2 work properly
   186       socket.on('readable', onReadable);
   187     }
   189     function onReadable() {
   190       var data = socket.read(1);
   192       // Ignore empty packets
   193       if (!data) return;
   194       socket.removeListener('readable', onReadable);
   196       // `.unshift()` emits `readable` event. Thus `emit` method should
   197       // be restored before calling it.
   198       socket.emit = _emit;
   200       // Put packet back where it was before
   201       socket.unshift(data);
   203       if (data[0] === 0x80) {
   204         self._onConnection(socket);
   205       } else  {
   206         handler.call(self, socket);
   207       }
   209       // Fire events
   210       restore();
   212       if (socket.ondata) {
   213         data = socket.read(socket._readableState.length);
   214         if (data) socket.ondata(data, 0, data.length);
   215       }
   216     }
   217   });
   218 };
   220 //
   221 // ### function _onConnection (socket)
   222 // #### @socket {Stream} incoming socket
   223 // Server's connection handler wrapper.
   224 //
   225 proto._onConnection = function _onConnection(socket) {
   226   var self = this,
   227       state = this._spdyState;
   229   // Fallback to HTTPS if needed
   230   var selectedProtocol = socket.npnProtocol || socket.alpnProtocol;
   231   if ((!selectedProtocol || !selectedProtocol.match(/spdy/)) &&
   232       !state.options.debug && !state.options.plain) {
   233     return state.handler.call(this, socket);
   234   }
   236   // Wrap incoming socket into abstract class
   237   var connection = new Connection(socket, state.pool, state.options);
   239   // Emulate each stream like connection
   240   connection.on('stream', state.handler);
   242   connection.on('connect', function onconnect(req, socket) {
   243     socket.streamID = req.streamID = req.socket.id;
   244     socket.isSpdy = req.isSpdy = true;
   245     socket.spdyVersion = req.spdyVersion = req.socket.version;
   247     socket.once('finish', function onfinish() {
   248       req.connection.end();
   249     });
   251     self.emit('connect', req, socket);
   252   });
   254   connection.on('request', function onrequest(req, res) {
   255     res._renderHeaders = spdy.response._renderHeaders;
   256     res.writeHead = spdy.response.writeHead;
   257     res.push = spdy.response.push;
   258     res.streamID = req.streamID = req.socket.id;
   259     res.spdyVersion = req.spdyVersion = req.socket.version;
   260     res.isSpdy = req.isSpdy = true;
   262     // Chunked encoding is not supported in SPDY
   263     res.useChunkedEncodingByDefault = false;
   265     res.once('finish', function onfinish() {
   266       req.connection.end();
   267     });
   269     self.emit('request', req, res);
   270   });
   272   connection.on('error', function onerror(e) {
   273     console.log('[secureConnection] error ' + e);
   274     socket.destroy(e.errno === 'EPIPE' ? undefined : e);
   275   });
   276 };
   278 // Export Server instantiated from https.Server
   279 var Server = instantiate(https.Server);
   280 exports.Server = Server;
   282 //
   283 // ### function create (base, options, requestListener)
   284 // #### @base {Function} (optional) base server class (https.Server)
   285 // #### @options {Object} tls server options
   286 // #### @requestListener {Function} (optional) request callback
   287 // @constructor wrapper
   288 //
   289 exports.create = function create(base, options, requestListener) {
   290   var server;
   291   if (typeof base === 'function') {
   292     server = instantiate(base);
   293   } else {
   294     server = Server;
   296     requestListener = options;
   297     options = base;
   298     base = null;
   299   }
   301   return new server(options, requestListener);
   302 };
   304 //
   305 // ### function Connection (socket, pool, options)
   306 // #### @socket {net.Socket} server's connection
   307 // #### @pool {spdy.ZlibPool} zlib pool
   308 // #### @options {Object} server's options
   309 // Abstract connection @constructor
   310 //
   311 function Connection(socket, pool, options) {
   312   process.EventEmitter.call(this);
   314   var self = this;
   316   this._closed = false;
   318   this.pool = pool;
   319   var pair = null;
   321   this._deflate = null;
   322   this._inflate = null;
   324   this.encrypted = socket.encrypted;
   326   // Init streams list
   327   this.streams = {};
   328   this.streamsCount = 0;
   329   this.pushId = 0;
   330   this._goaway = false;
   332   this._framer = null;
   334   // Data transfer window defaults to 64kb
   335   this.windowSize = options.windowSize;
   336   this.sinkSize = options.sinkSize;
   338   // Initialize scheduler
   339   this.scheduler = spdy.scheduler.create(this);
   341   // Store socket and pipe it to parser
   342   this.socket = socket;
   344   // Initialize parser
   345   this.parser = spdy.parser.create(this);
   346   this.parser.on('frame', function (frame) {
   347     if (this._closed) return;
   349     var stream;
   351     // Create new stream
   352     if (frame.type === 'SYN_STREAM') {
   353       self.streamsCount++;
   355       stream = self.streams[frame.id] = new Stream(self, frame);
   357       // If we reached stream limit
   358       if (self.streamsCount > options.maxStreams) {
   359         stream.once('error', function onerror() {});
   360         // REFUSED_STREAM
   361         stream._rstCode = 3;
   362         stream.destroy(true);
   363       } else {
   364         self.emit('stream', stream);
   366         stream._init();
   367       }
   368     } else {
   369       if (frame.id) {
   370         // Load created one
   371         stream = self.streams[frame.id];
   373         // Fail if not found
   374         if (stream === undefined) {
   375           if (frame.type === 'RST_STREAM') return;
   376           self.write(self._framer.rstFrame(frame.id, 2));
   377           return;
   378         }
   379       }
   381       // Emit 'data' event
   382       if (frame.type === 'DATA') {
   383         if (frame.data.length > 0){
   384           if (stream._closedBy.client) {
   385             stream._rstCode = 2;
   386             stream.emit('error', 'Writing to half-closed stream');
   387           } else {
   388             stream._recv(frame.data);
   389           }
   390         }
   391       // Destroy stream if we was asked to do this
   392       } else if (frame.type === 'RST_STREAM') {
   393         stream._rstCode = 0;
   394         if (frame.status === 5) {
   395           // If client "cancels" connection - close stream and
   396           // all associated push streams without error
   397           stream.pushes.forEach(function(stream) {
   398             stream.close();
   399           });
   400           stream.close();
   401         } else {
   402           // Emit error on destroy
   403           stream.destroy(new Error('Received rst: ' + frame.status));
   404         }
   405       // Respond with same PING
   406       } else if (frame.type === 'PING') {
   407         self.write(self._framer.pingFrame(frame.pingId));
   408       } else if (frame.type === 'SETTINGS') {
   409         self._setDefaultWindow(frame.settings);
   410       } else if (frame.type === 'GOAWAY') {
   411         self._goaway = frame.lastId;
   412       } else if (frame.type === 'WINDOW_UPDATE') {
   413         stream._drainSink(frame.delta);
   414       } else {
   415         console.error('Unknown type: ', frame.type);
   416       }
   417     }
   419     // Handle half-closed
   420     if (frame.fin) {
   421       // Don't allow to close stream twice
   422       if (stream._closedBy.client) {
   423         stream._rstCode = 2;
   424         stream.emit('error', 'Already half-closed');
   425       } else {
   426         stream._closedBy.client = true;
   428         // Emulate last chunked fragment
   429         if (stream._forceChunked) {
   430           stream._recv(last_frag, true);
   431         }
   433         stream._handleClose();
   434       }
   435     }
   436   });
   438   this.parser.on('version', function onversion(version) {
   439     if (!pair) {
   440       pair = pool.get('spdy/' + version);
   441       self._deflate = pair.deflate;
   442       self._inflate = pair.inflate;
   443     }
   444   });
   446   this.parser.on('framer', function onframer(framer) {
   447     // Generate custom settings frame and send
   448     self.write(framer.settingsFrame(options));
   449   });
   451   // Propagate parser errors
   452   this.parser.on('error', function onParserError(err) {
   453     self.emit('error', err);
   454   });
   456   socket.pipe(this.parser);
   458   // 2 minutes socket timeout
   459   socket.setTimeout(2 * 60 * 1000);
   460   socket.once('timeout', function ontimeout() {
   461     socket.destroy();
   462   });
   464   // Allow high-level api to catch socket errors
   465   socket.on('error', function onSocketError(e) {
   466     self.emit('error', e);
   467   });
   469   socket.once('close', function onclose() {
   470     self._closed = true;
   471     if (pair) pool.put(pair);
   472   });
   474   if (legacy) {
   475     socket.on('drain', function ondrain() {
   476       self.emit('drain');
   477     });
   478   }
   479 }
   480 util.inherits(Connection, process.EventEmitter);
   481 exports.Connection = Connection;
   483 //
   484 // ### function write (data, encoding)
   485 // #### @data {String|Buffer} data
   486 // #### @encoding {String} (optional) encoding
   487 // Writes data to socket
   488 //
   489 Connection.prototype.write = function write(data, encoding) {
   490   if (this.socket.writable) {
   491     return this.socket.write(data, encoding);
   492   }
   493 };
   495 //
   496 // ### function _setDefaultWindow (settings)
   497 // #### @settings {Object}
   498 // Update the default transfer window -- in the connection and in the
   499 // active streams
   500 //
   501 Connection.prototype._setDefaultWindow = function _setDefaultWindow(settings) {
   502   if (!settings) return;
   503   if (!settings.initial_window_size ||
   504       settings.initial_window_size.persisted) {
   505     return;
   506   }
   508   this.sinkSize = settings.initial_window_size.value;
   510   Object.keys(this.streams).forEach(function(id) {
   511     this.streams[id]._updateSinkSize(settings.initial_window_size.value);
   512   }, this);
   513 };
   515 //
   516 // ### function Stream (connection, frame)
   517 // #### @connection {Connection} SPDY Connection
   518 // #### @frame {Object} SYN_STREAM data
   519 // Abstract stream @constructor
   520 //
   521 function Stream(connection, frame) {
   522   DuplexStream.call(this);
   524   this.connection = connection;
   525   this.socket = connection.socket;
   526   this.encrypted = connection.encrypted;
   527   this._framer = connection._framer;
   528   this._initialized = false;
   530   // Should chunked encoding be forced
   531   this._forceChunked = false;
   533   this.ondata = this.onend = null;
   535   // RST_STREAM code if any
   536   this._rstCode = 1;
   537   this._destroyed = false;
   539   this._closedBy = {
   540     client: false,
   541     server: false
   542   };
   544   // Lock data
   545   this._locked = false;
   546   this._lockBuffer = [];
   548   // Store id
   549   this.id = frame.id;
   550   this.version = frame.version;
   552   // Store priority
   553   this.priority = frame.priority;
   555   // Array of push streams associated to that one
   556   this.pushes = [];
   558   // How much data can be sent TO client before next WINDOW_UPDATE
   559   this._sinkSize = connection.sinkSize;
   560   this._initialSinkSize = connection.sinkSize;
   562   // When data needs to be send, but window is too small for it - it'll be
   563   // queued in this buffer
   564   this._sinkBuffer = [];
   566   // How much data can be sent BY client before next WINDOW_UPDATE
   567   this._initialWindowSize = connection.windowSize;
   568   this._windowSize = connection.windowSize;
   570   // Create compression streams
   571   this._deflate = connection._deflate;
   572   this._inflate = connection._inflate;
   574   // Store headers
   575   this.headers = frame.headers;
   576   this.url = frame.url;
   578   this._frame = frame;
   580   if (legacy) {
   581     this.readable = this.writable = true;
   582   }
   584   // Call .onend()
   585   this.once('end', function() {
   586     var self = this;
   587     process.nextTick(function() {
   588       if (self.onend) self.onend();
   589     });
   590   });
   592   // Handle half-close
   593   this.once('finish', function() {
   594     this._writeData(true, []);
   595     this._closedBy.server = true;
   596     if (this._sinkBuffer.length !== 0) return;
   597     this._handleClose();
   598   });
   599 };
   600 util.inherits(Stream, DuplexStream);
   601 exports.Stream = Stream;
   603 if (legacy) {
   604   Stream.prototype.pause = function pause() {};
   605   Stream.prototype.resume = function resume() {};
   606 }
   608 //
   609 // ### function _isGoaway ()
   610 // Returns true if any writes to that stream should be ignored
   611 //
   612 Stream.prototype._isGoaway = function _isGoaway() {
   613   return this.connection._goaway && this.id > this.connection._goaway;
   614 };
   616 //
   617 // ### function init ()
   618 // Initialize stream, internal
   619 //
   620 Stream.prototype._init = function init() {
   621   var headers = this.headers,
   622       req = [headers.method + ' ' + this.url + ' ' + headers.version];
   624   Object.keys(headers).forEach(function (key) {
   625     if (key !== 'method' && key !== 'url' && key !== 'version' &&
   626         key !== 'scheme') {
   627       req.push(key + ': ' + headers[key]);
   628     }
   629   });
   631   // Force chunked encoding
   632   if (!headers['content-length'] && !headers['transfer-encoding']) {
   633     req.push('Transfer-Encoding: chunked');
   634     this._forceChunked = true;
   635   }
   637   // Add '\r\n\r\n'
   638   req.push('', '');
   640   req = new Buffer(req.join('\r\n'));
   642   this._recv(req, true);
   643   this._initialized = true;
   644 };
   646 //
   647 // ### function lock (callback)
   648 // #### @callback {Function} continuation callback
   649 // Acquire lock
   650 //
   651 Stream.prototype._lock = function lock(callback) {
   652   if (!callback) return;
   654   if (this._locked) {
   655     this._lockBuffer.push(callback);
   656   } else {
   657     this._locked = true;
   658     callback.call(this, null);
   659   }
   660 };
   662 //
   663 // ### function unlock ()
   664 // Release lock and call all buffered callbacks
   665 //
   666 Stream.prototype._unlock = function unlock() {
   667   if (this._locked) {
   668     this._locked = false;
   669     this._lock(this._lockBuffer.shift());
   670   }
   671 };
   673 //
   674 // ### function setTimeout ()
   675 // TODO: use timers.enroll, timers.active, timers.unenroll
   676 //
   677 Stream.prototype.setTimeout = function setTimeout(time) {};
   679 //
   680 // ### function _handleClose ()
   681 // Close stream if it was closed by both server and client
   682 //
   683 Stream.prototype._handleClose = function _handleClose() {
   684   if (this._closedBy.client && this._closedBy.server) {
   685     this.close();
   686   }
   687 };
   689 //
   690 // ### function close ()
   691 // Destroys stream
   692 //
   693 Stream.prototype.close = function close() {
   694   this.destroy();
   695 };
   697 //
   698 // ### function destroy (error)
   699 // #### @error {Error} (optional) error
   700 // Destroys stream
   701 //
   702 Stream.prototype.destroy = function destroy(error) {
   703   if (this._destroyed) return;
   704   this._destroyed = true;
   706   delete this.connection.streams[this.id];
   707   if (this.id % 2 === 1) {
   708     this.connection.streamsCount--;
   709   }
   711   // If stream is not finished, RST frame should be sent to notify client
   712   // about sudden stream termination.
   713   if (error || !this._closedBy.server) {
   714     // REFUSED_STREAM if terminated before 'finish' event
   715     if (!this._closedBy.server) this._rstCode = 3;
   717     if (this._rstCode) {
   718       this._lock(function() {
   719         this.connection.scheduler.schedule(
   720           this,
   721           this._framer.rstFrame(this.id, this._rstCode));
   722         this.connection.scheduler.tick();
   724         this._unlock();
   725       });
   726     }
   727   }
   729   if (legacy) {
   730     this.emit('end');
   731   } else {
   732     this.push(null);
   733   }
   735   if (error) this.emit('error', error);
   737   var self = this;
   738   process.nextTick(function() {
   739     self.emit('close', !!error);
   740   });
   741 };
   743 Stream.prototype.destroySoon = function destroySoon(error) {
   744   return this.destroy(error);
   745 };
   747 Stream.prototype._drainSink = function _drainSink(size) {
   748   var oldBuffer = this._sinkBuffer;
   749   this._sinkBuffer = [];
   751   this._sinkSize += size;
   753   for (var i = 0; i < oldBuffer.length; i++) {
   754     this._writeData(oldBuffer[i][0], oldBuffer[i][1], oldBuffer[i][2]);
   755   }
   757   // Handle half-close
   758   if (this._sinkBuffer.length === 0 && this._closedBy.server) {
   759     this._handleClose();
   760   }
   762   if (legacy) this.emit('drain');
   763 };
   765 //
   766 // ### function _writeData (fin, buffer, cb)
   767 // #### @fin {Boolean}
   768 // #### @buffer {Buffer}
   769 // #### @cb {Function} **optional**
   770 // Internal function
   771 //
   772 Stream.prototype._writeData = function _writeData(fin, buffer, cb) {
   773   if (this._framer.version === 3) {
   774     // Window was exhausted, queue data
   775     if (this._sinkSize <= 0) {
   776       this._sinkBuffer.push([fin, buffer, cb]);
   777       return false;
   778     }
   780     var len = Math.min(this._sinkSize, buffer.length);
   781     this._sinkSize -= len;
   783     // Only partial write is possible, queue rest for later
   784     if (len < buffer.length) {
   785       this._sinkBuffer.push([fin, buffer.slice(len)]);
   786       buffer = buffer.slice(0, len);
   787       fin = false;
   788     }
   789   }
   791   this._lock(function() {
   792     var stream = this,
   793         frame = this._framer.dataFrame(this.id, fin, buffer);
   795     stream.connection.scheduler.schedule(stream, frame);
   796     stream.connection.scheduler.tick();
   798     this._unlock();
   800     if (cb) cb();
   801   });
   803   return true;
   804 };
   806 //
   807 // ### function write (data, encoding)
   808 // #### @data {Buffer|String} data
   809 // #### @encoding {String} data encoding
   810 // Writes data to connection
   811 //
   812 Stream.prototype._write = function write(data, encoding, cb) {
   813   // Do not send data to new connections after GOAWAY
   814   if (this._isGoaway()) {
   815     if (cb) cb();
   816     return false;
   817   }
   819   return this._writeData(false, data, cb);
   820 };
   822 if (legacy) {
   823   Stream.prototype.write = function write(data, encoding, cb) {
   824     if (!Buffer.isBuffer(data)) {
   825       return this._write(new Buffer(data, encoding), null, cb);
   826     } else {
   827       return this._write(data, encoding, cb);
   828     }
   829   };
   831   //
   832   // ### function end (data)
   833   // #### @data {Buffer|String} (optional) data to write before ending stream
   834   // #### @encoding {String} (optional) string encoding
   835   // Send FIN data frame
   836   //
   837   Stream.prototype.end = function end(data, encoding) {
   838     // Do not send data to new connections after GOAWAY
   839     if (this._isGoaway()) return;
   841     if (data) this.write(data, encoding);
   842     this.emit('finish');
   843   };
   844 }
   846 //
   847 // ### function _recv (data)
   848 // #### @data {Buffer} buffer to receive
   849 // #### @chunked {Boolean}
   850 // (internal)
   851 //
   852 Stream.prototype._recv = function _recv(data, chunked) {
   853   // Update window if exhausted
   854   if (!chunked && this._framer.version >= 3 && this._initialized) {
   855     this._windowSize -= data.length;
   857     if (this._windowSize <= 0) {
   858       var delta = this._initialWindowSize - this._windowSize;
   859       this._windowSize += delta;
   860       this.connection.write(this._framer.windowUpdateFrame(this.id, delta));
   861     }
   862   }
   864   // Emulate chunked encoding
   865   if (this._forceChunked && !chunked) {
   866     // Zero-chunks are treated as end, do not emit them
   867     if (data.length === 0) return;
   869     this._recv(new Buffer(data.length.toString(16)), true);
   870     this._recv(crlf, true);
   871     this._recv(data, true);
   872     this._recv(crlf, true);
   873     return;
   874   }
   876   if (legacy) {
   877     var self = this;
   878     process.nextTick(function() {
   879       self.emit('data', data);
   880       if (self.ondata) {
   881         self.ondata(data, 0, data.length);
   882       }
   883     });
   884   } else {
   885     // Right now, http module expects socket to be working in streams1 mode.
   886     if (this.ondata) {
   887       this.ondata(data, 0, data.length);
   888     } else {
   889       this.push(data);
   890     }
   891   }
   892 };
   894 //
   895 // ### function _read (bytes, cb)
   896 // #### @bytes {Number} number of bytes to read
   897 // Streams2 API
   898 //
   899 Stream.prototype._read = function read(bytes) {
   900   // NOP
   901 };
   903 //
   904 // ### function _updateSinkSize (size)
   905 // #### @size {Integer}
   906 // Update the internal data transfer window
   907 //
   908 Stream.prototype._updateSinkSize = function _updateSinkSize(size) {
   909   var diff = size - this._initialSinkSize;
   911   this._initialSinkSize = size;
   912   this._drainSink(diff);
   913 };
   915 //
   916 // `net` compatibility layer
   917 // (Copy pasted from lib/tls.js from node.js)
   918 //
   919 Stream.prototype.address = function address() {
   920   return this.socket && this.socket.address();
   921 };
   923 Stream.prototype.__defineGetter__('remoteAddress', function remoteAddress() {
   924   return this.socket && this.socket.remoteAddress;
   925 });
   927 Stream.prototype.__defineGetter__('remotePort', function remotePort() {
   928   return this.socket && this.socket.remotePort;
   929 });

mercurial