testing/xpcshell/node-spdy/lib/spdy/server.js

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/testing/xpcshell/node-spdy/lib/spdy/server.js	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,929 @@
     1.4 +var spdy = require('../spdy'),
     1.5 +    util = require('util'),
     1.6 +    https = require('https'),
     1.7 +    stream = require('stream'),
     1.8 +    Buffer = require('buffer').Buffer;
     1.9 +
    1.10 +var crlf = new Buffer('\r\n');
    1.11 +var last_frag = new Buffer('0\r\n\r\n');
    1.12 +
    1.13 +var legacy = !stream.Duplex;
    1.14 +
    1.15 +if (legacy) {
    1.16 +  var DuplexStream = stream;
    1.17 +} else {
    1.18 +  var DuplexStream = stream.Duplex;
    1.19 +}
    1.20 +
    1.21 +//
    1.22 +// ### function instantiate (HTTPSServer)
    1.23 +// #### @HTTPSServer {https.Server|Function} Base server class
    1.24 +// Will return constructor for SPDY Server, based on the HTTPSServer class
    1.25 +//
    1.26 +function instantiate(HTTPSServer) {
    1.27 +  //
    1.28 +  // ### function Server (options, requestListener)
    1.29 +  // #### @options {Object} tls server options
    1.30 +  // #### @requestListener {Function} (optional) request callback
    1.31 +  // SPDY Server @constructor
    1.32 +  //
    1.33 +  function Server(options, requestListener) {
    1.34 +    // Initialize
    1.35 +    this._init(HTTPSServer, options, requestListener);
    1.36 +
    1.37 +    // Wrap connection handler
    1.38 +    this._wrap();
    1.39 +  };
    1.40 +  util.inherits(Server, HTTPSServer);
    1.41 +
    1.42 +  // Copy prototype methods
    1.43 +  Object.keys(proto).forEach(function(key) {
    1.44 +    this[key] = proto[key];
    1.45 +  }, Server.prototype);
    1.46 +
    1.47 +  return Server;
    1.48 +}
    1.49 +exports.instantiate = instantiate;
    1.50 +
    1.51 +// Common prototype for all servers
    1.52 +var proto = {};
    1.53 +
    1.54 +//
    1.55 +// ### function _init(base, options, listener)
    1.56 +// #### @base {Function} (optional) base server class (https.Server)
    1.57 +// #### @options {Object} tls server options
    1.58 +// #### @handler {Function} (optional) request handler
    1.59 +// Initializer.
    1.60 +//
    1.61 +proto._init = function _init(base, options, handler) {
    1.62 +  var state = {};
    1.63 +  this._spdyState = state;
    1.64 +
    1.65 +  if (!options) options = {};
    1.66 +  if (!options.maxStreams) options.maxStreams = 100;
    1.67 +  if (!options.sinkSize) {
    1.68 +    options.sinkSize = 1 << 16;
    1.69 +  }
    1.70 +  if (!options.windowSize) {
    1.71 +    options.windowSize = 1 << 20; // 1mb
    1.72 +  }
    1.73 +
    1.74 +  options.NPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0'];
    1.75 +  options.ALPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0'];
    1.76 +  state.options = options;
    1.77 +  state.reqHandler = handler;
    1.78 +
    1.79 +  if (options.plain && !options.ssl) {
    1.80 +    base.call(this, handler);
    1.81 +  } else {
    1.82 +    base.call(this, options, handler);
    1.83 +  }
    1.84 +
    1.85 +  // Use https if NPN is not supported
    1.86 +  if (!process.features.tls_npn && !process.features.tls_alpn && !options.debug && !options.plain) {
    1.87 +    return;
    1.88 +  }
    1.89 +};
    1.90 +
    1.91 +//
    1.92 +// ### function _wrap()
    1.93 +// Wrap connection handler and add logic.
    1.94 +//
    1.95 +proto._wrap = function _wrap() {
    1.96 +  var self = this,
    1.97 +      state = this._spdyState;
    1.98 +
    1.99 +  // Wrap connection handler
   1.100 +  var event = state.options.plain && !state.options.ssl ? 'connection' :
   1.101 +                                                          'secureConnection',
   1.102 +      handler = this.listeners(event)[0];
   1.103 +
   1.104 +  state.pool = spdy.zlibpool.create();
   1.105 +  state.handler = handler;
   1.106 +
   1.107 +  this.removeAllListeners(event);
   1.108 +
   1.109 +  // Normal mode, use NPN to fallback to HTTPS
   1.110 +  if (!state.options.plain) {
   1.111 +    return this.on(event, this._onConnection.bind(this));
   1.112 +  }
   1.113 +
   1.114 +  // In case of plain connection, we must fallback to HTTPS if first byte
   1.115 +  // is not equal to 0x80.
   1.116 +  this.on(event, function(socket) {
   1.117 +    var history = [],
   1.118 +        _emit = socket.emit;
   1.119 +
   1.120 +    // Add 'data' listener, otherwise 'data' events won't be emitted
   1.121 +    if (legacy) {
   1.122 +      function ondata() {};
   1.123 +      socket.once('data', ondata);
   1.124 +    }
   1.125 +
   1.126 +    // 2 minutes timeout, as http.js does by default
   1.127 +    socket.setTimeout(self.timeout || 2 * 60 * 1000);
   1.128 +
   1.129 +    socket.emit = function emit(event, data) {
   1.130 +      history.push(Array.prototype.slice.call(arguments));
   1.131 +
   1.132 +      if (event === 'data') {
   1.133 +        // Legacy
   1.134 +        onFirstByte.call(socket, data);
   1.135 +      } else if (event === 'readable') {
   1.136 +        // Streams
   1.137 +        onReadable.call(socket);
   1.138 +      } else if (event === 'end' ||
   1.139 +                 event === 'close' ||
   1.140 +                 event === 'error' ||
   1.141 +                 event === 'timeout') {
   1.142 +        // We shouldn't get there if any data was received
   1.143 +        fail();
   1.144 +      }
   1.145 +    };
   1.146 +
   1.147 +    function fail() {
   1.148 +      socket.emit = _emit;
   1.149 +      history = null;
   1.150 +      try {
   1.151 +        socket.destroy();
   1.152 +      } catch (e) {
   1.153 +      }
   1.154 +    }
   1.155 +
   1.156 +    function restore() {
   1.157 +      var copy = history.slice();
   1.158 +      history = null;
   1.159 +
   1.160 +      if (legacy) socket.removeListener('data', ondata);
   1.161 +      socket.emit = _emit;
   1.162 +      for (var i = 0; i < copy.length; i++) {
   1.163 +        socket.emit.apply(socket, copy[i]);
   1.164 +        if (copy[i][0] === 'end') {
   1.165 +          if (socket.onend) socket.onend();
   1.166 +        }
   1.167 +      }
   1.168 +    }
   1.169 +
   1.170 +    function onFirstByte(data) {
   1.171 +      // Ignore empty packets
   1.172 +      if (data.length === 0) return;
   1.173 +
   1.174 +      if (data[0] === 0x80) {
   1.175 +        self._onConnection(socket);
   1.176 +      } else  {
   1.177 +        handler.call(self, socket);
   1.178 +      }
   1.179 +
   1.180 +      // Fire events
   1.181 +      restore();
   1.182 +
   1.183 +      // NOTE: If we came there - .ondata() will be called anyway in this tick,
   1.184 +      // so there're no need to call it manually
   1.185 +    };
   1.186 +
   1.187 +    if (!legacy) {
   1.188 +      // Hack to make streams2 work properly
   1.189 +      socket.on('readable', onReadable);
   1.190 +    }
   1.191 +
   1.192 +    function onReadable() {
   1.193 +      var data = socket.read(1);
   1.194 +
   1.195 +      // Ignore empty packets
   1.196 +      if (!data) return;
   1.197 +      socket.removeListener('readable', onReadable);
   1.198 +
   1.199 +      // `.unshift()` emits `readable` event. Thus `emit` method should
   1.200 +      // be restored before calling it.
   1.201 +      socket.emit = _emit;
   1.202 +
   1.203 +      // Put packet back where it was before
   1.204 +      socket.unshift(data);
   1.205 +
   1.206 +      if (data[0] === 0x80) {
   1.207 +        self._onConnection(socket);
   1.208 +      } else  {
   1.209 +        handler.call(self, socket);
   1.210 +      }
   1.211 +
   1.212 +      // Fire events
   1.213 +      restore();
   1.214 +
   1.215 +      if (socket.ondata) {
   1.216 +        data = socket.read(socket._readableState.length);
   1.217 +        if (data) socket.ondata(data, 0, data.length);
   1.218 +      }
   1.219 +    }
   1.220 +  });
   1.221 +};
   1.222 +
   1.223 +//
   1.224 +// ### function _onConnection (socket)
   1.225 +// #### @socket {Stream} incoming socket
   1.226 +// Server's connection handler wrapper.
   1.227 +//
   1.228 +proto._onConnection = function _onConnection(socket) {
   1.229 +  var self = this,
   1.230 +      state = this._spdyState;
   1.231 +
   1.232 +  // Fallback to HTTPS if needed
   1.233 +  var selectedProtocol = socket.npnProtocol || socket.alpnProtocol;
   1.234 +  if ((!selectedProtocol || !selectedProtocol.match(/spdy/)) &&
   1.235 +      !state.options.debug && !state.options.plain) {
   1.236 +    return state.handler.call(this, socket);
   1.237 +  }
   1.238 +
   1.239 +  // Wrap incoming socket into abstract class
   1.240 +  var connection = new Connection(socket, state.pool, state.options);
   1.241 +
   1.242 +  // Emulate each stream like connection
   1.243 +  connection.on('stream', state.handler);
   1.244 +
   1.245 +  connection.on('connect', function onconnect(req, socket) {
   1.246 +    socket.streamID = req.streamID = req.socket.id;
   1.247 +    socket.isSpdy = req.isSpdy = true;
   1.248 +    socket.spdyVersion = req.spdyVersion = req.socket.version;
   1.249 +
   1.250 +    socket.once('finish', function onfinish() {
   1.251 +      req.connection.end();
   1.252 +    });
   1.253 +
   1.254 +    self.emit('connect', req, socket);
   1.255 +  });
   1.256 +
   1.257 +  connection.on('request', function onrequest(req, res) {
   1.258 +    res._renderHeaders = spdy.response._renderHeaders;
   1.259 +    res.writeHead = spdy.response.writeHead;
   1.260 +    res.push = spdy.response.push;
   1.261 +    res.streamID = req.streamID = req.socket.id;
   1.262 +    res.spdyVersion = req.spdyVersion = req.socket.version;
   1.263 +    res.isSpdy = req.isSpdy = true;
   1.264 +
   1.265 +    // Chunked encoding is not supported in SPDY
   1.266 +    res.useChunkedEncodingByDefault = false;
   1.267 +
   1.268 +    res.once('finish', function onfinish() {
   1.269 +      req.connection.end();
   1.270 +    });
   1.271 +
   1.272 +    self.emit('request', req, res);
   1.273 +  });
   1.274 +
   1.275 +  connection.on('error', function onerror(e) {
   1.276 +    console.log('[secureConnection] error ' + e);
   1.277 +    socket.destroy(e.errno === 'EPIPE' ? undefined : e);
   1.278 +  });
   1.279 +};
   1.280 +
   1.281 +// Export Server instantiated from https.Server
   1.282 +var Server = instantiate(https.Server);
   1.283 +exports.Server = Server;
   1.284 +
   1.285 +//
   1.286 +// ### function create (base, options, requestListener)
   1.287 +// #### @base {Function} (optional) base server class (https.Server)
   1.288 +// #### @options {Object} tls server options
   1.289 +// #### @requestListener {Function} (optional) request callback
   1.290 +// @constructor wrapper
   1.291 +//
   1.292 +exports.create = function create(base, options, requestListener) {
   1.293 +  var server;
   1.294 +  if (typeof base === 'function') {
   1.295 +    server = instantiate(base);
   1.296 +  } else {
   1.297 +    server = Server;
   1.298 +
   1.299 +    requestListener = options;
   1.300 +    options = base;
   1.301 +    base = null;
   1.302 +  }
   1.303 +
   1.304 +  return new server(options, requestListener);
   1.305 +};
   1.306 +
   1.307 +//
   1.308 +// ### function Connection (socket, pool, options)
   1.309 +// #### @socket {net.Socket} server's connection
   1.310 +// #### @pool {spdy.ZlibPool} zlib pool
   1.311 +// #### @options {Object} server's options
   1.312 +// Abstract connection @constructor
   1.313 +//
   1.314 +function Connection(socket, pool, options) {
   1.315 +  process.EventEmitter.call(this);
   1.316 +
   1.317 +  var self = this;
   1.318 +
   1.319 +  this._closed = false;
   1.320 +
   1.321 +  this.pool = pool;
   1.322 +  var pair = null;
   1.323 +
   1.324 +  this._deflate = null;
   1.325 +  this._inflate = null;
   1.326 +
   1.327 +  this.encrypted = socket.encrypted;
   1.328 +
   1.329 +  // Init streams list
   1.330 +  this.streams = {};
   1.331 +  this.streamsCount = 0;
   1.332 +  this.pushId = 0;
   1.333 +  this._goaway = false;
   1.334 +
   1.335 +  this._framer = null;
   1.336 +
   1.337 +  // Data transfer window defaults to 64kb
   1.338 +  this.windowSize = options.windowSize;
   1.339 +  this.sinkSize = options.sinkSize;
   1.340 +
   1.341 +  // Initialize scheduler
   1.342 +  this.scheduler = spdy.scheduler.create(this);
   1.343 +
   1.344 +  // Store socket and pipe it to parser
   1.345 +  this.socket = socket;
   1.346 +
   1.347 +  // Initialize parser
   1.348 +  this.parser = spdy.parser.create(this);
   1.349 +  this.parser.on('frame', function (frame) {
   1.350 +    if (this._closed) return;
   1.351 +
   1.352 +    var stream;
   1.353 +
   1.354 +    // Create new stream
   1.355 +    if (frame.type === 'SYN_STREAM') {
   1.356 +      self.streamsCount++;
   1.357 +
   1.358 +      stream = self.streams[frame.id] = new Stream(self, frame);
   1.359 +
   1.360 +      // If we reached stream limit
   1.361 +      if (self.streamsCount > options.maxStreams) {
   1.362 +        stream.once('error', function onerror() {});
   1.363 +        // REFUSED_STREAM
   1.364 +        stream._rstCode = 3;
   1.365 +        stream.destroy(true);
   1.366 +      } else {
   1.367 +        self.emit('stream', stream);
   1.368 +
   1.369 +        stream._init();
   1.370 +      }
   1.371 +    } else {
   1.372 +      if (frame.id) {
   1.373 +        // Load created one
   1.374 +        stream = self.streams[frame.id];
   1.375 +
   1.376 +        // Fail if not found
   1.377 +        if (stream === undefined) {
   1.378 +          if (frame.type === 'RST_STREAM') return;
   1.379 +          self.write(self._framer.rstFrame(frame.id, 2));
   1.380 +          return;
   1.381 +        }
   1.382 +      }
   1.383 +
   1.384 +      // Emit 'data' event
   1.385 +      if (frame.type === 'DATA') {
   1.386 +        if (frame.data.length > 0){
   1.387 +          if (stream._closedBy.client) {
   1.388 +            stream._rstCode = 2;
   1.389 +            stream.emit('error', 'Writing to half-closed stream');
   1.390 +          } else {
   1.391 +            stream._recv(frame.data);
   1.392 +          }
   1.393 +        }
   1.394 +      // Destroy stream if we was asked to do this
   1.395 +      } else if (frame.type === 'RST_STREAM') {
   1.396 +        stream._rstCode = 0;
   1.397 +        if (frame.status === 5) {
   1.398 +          // If client "cancels" connection - close stream and
   1.399 +          // all associated push streams without error
   1.400 +          stream.pushes.forEach(function(stream) {
   1.401 +            stream.close();
   1.402 +          });
   1.403 +          stream.close();
   1.404 +        } else {
   1.405 +          // Emit error on destroy
   1.406 +          stream.destroy(new Error('Received rst: ' + frame.status));
   1.407 +        }
   1.408 +      // Respond with same PING
   1.409 +      } else if (frame.type === 'PING') {
   1.410 +        self.write(self._framer.pingFrame(frame.pingId));
   1.411 +      } else if (frame.type === 'SETTINGS') {
   1.412 +        self._setDefaultWindow(frame.settings);
   1.413 +      } else if (frame.type === 'GOAWAY') {
   1.414 +        self._goaway = frame.lastId;
   1.415 +      } else if (frame.type === 'WINDOW_UPDATE') {
   1.416 +        stream._drainSink(frame.delta);
   1.417 +      } else {
   1.418 +        console.error('Unknown type: ', frame.type);
   1.419 +      }
   1.420 +    }
   1.421 +
   1.422 +    // Handle half-closed
   1.423 +    if (frame.fin) {
   1.424 +      // Don't allow to close stream twice
   1.425 +      if (stream._closedBy.client) {
   1.426 +        stream._rstCode = 2;
   1.427 +        stream.emit('error', 'Already half-closed');
   1.428 +      } else {
   1.429 +        stream._closedBy.client = true;
   1.430 +
   1.431 +        // Emulate last chunked fragment
   1.432 +        if (stream._forceChunked) {
   1.433 +          stream._recv(last_frag, true);
   1.434 +        }
   1.435 +
   1.436 +        stream._handleClose();
   1.437 +      }
   1.438 +    }
   1.439 +  });
   1.440 +
   1.441 +  this.parser.on('version', function onversion(version) {
   1.442 +    if (!pair) {
   1.443 +      pair = pool.get('spdy/' + version);
   1.444 +      self._deflate = pair.deflate;
   1.445 +      self._inflate = pair.inflate;
   1.446 +    }
   1.447 +  });
   1.448 +
   1.449 +  this.parser.on('framer', function onframer(framer) {
   1.450 +    // Generate custom settings frame and send
   1.451 +    self.write(framer.settingsFrame(options));
   1.452 +  });
   1.453 +
   1.454 +  // Propagate parser errors
   1.455 +  this.parser.on('error', function onParserError(err) {
   1.456 +    self.emit('error', err);
   1.457 +  });
   1.458 +
   1.459 +  socket.pipe(this.parser);
   1.460 +
   1.461 +  // 2 minutes socket timeout
   1.462 +  socket.setTimeout(2 * 60 * 1000);
   1.463 +  socket.once('timeout', function ontimeout() {
   1.464 +    socket.destroy();
   1.465 +  });
   1.466 +
   1.467 +  // Allow high-level api to catch socket errors
   1.468 +  socket.on('error', function onSocketError(e) {
   1.469 +    self.emit('error', e);
   1.470 +  });
   1.471 +
   1.472 +  socket.once('close', function onclose() {
   1.473 +    self._closed = true;
   1.474 +    if (pair) pool.put(pair);
   1.475 +  });
   1.476 +
   1.477 +  if (legacy) {
   1.478 +    socket.on('drain', function ondrain() {
   1.479 +      self.emit('drain');
   1.480 +    });
   1.481 +  }
   1.482 +}
   1.483 +util.inherits(Connection, process.EventEmitter);
   1.484 +exports.Connection = Connection;
   1.485 +
   1.486 +//
   1.487 +// ### function write (data, encoding)
   1.488 +// #### @data {String|Buffer} data
   1.489 +// #### @encoding {String} (optional) encoding
   1.490 +// Writes data to socket
   1.491 +//
   1.492 +Connection.prototype.write = function write(data, encoding) {
   1.493 +  if (this.socket.writable) {
   1.494 +    return this.socket.write(data, encoding);
   1.495 +  }
   1.496 +};
   1.497 +
   1.498 +//
   1.499 +// ### function _setDefaultWindow (settings)
   1.500 +// #### @settings {Object}
   1.501 +// Update the default transfer window -- in the connection and in the
   1.502 +// active streams
   1.503 +//
   1.504 +Connection.prototype._setDefaultWindow = function _setDefaultWindow(settings) {
   1.505 +  if (!settings) return;
   1.506 +  if (!settings.initial_window_size ||
   1.507 +      settings.initial_window_size.persisted) {
   1.508 +    return;
   1.509 +  }
   1.510 +
   1.511 +  this.sinkSize = settings.initial_window_size.value;
   1.512 +
   1.513 +  Object.keys(this.streams).forEach(function(id) {
   1.514 +    this.streams[id]._updateSinkSize(settings.initial_window_size.value);
   1.515 +  }, this);
   1.516 +};
   1.517 +
   1.518 +//
   1.519 +// ### function Stream (connection, frame)
   1.520 +// #### @connection {Connection} SPDY Connection
   1.521 +// #### @frame {Object} SYN_STREAM data
   1.522 +// Abstract stream @constructor
   1.523 +//
   1.524 +function Stream(connection, frame) {
   1.525 +  DuplexStream.call(this);
   1.526 +
   1.527 +  this.connection = connection;
   1.528 +  this.socket = connection.socket;
   1.529 +  this.encrypted = connection.encrypted;
   1.530 +  this._framer = connection._framer;
   1.531 +  this._initialized = false;
   1.532 +
   1.533 +  // Should chunked encoding be forced
   1.534 +  this._forceChunked = false;
   1.535 +
   1.536 +  this.ondata = this.onend = null;
   1.537 +
   1.538 +  // RST_STREAM code if any
   1.539 +  this._rstCode = 1;
   1.540 +  this._destroyed = false;
   1.541 +
   1.542 +  this._closedBy = {
   1.543 +    client: false,
   1.544 +    server: false
   1.545 +  };
   1.546 +
   1.547 +  // Lock data
   1.548 +  this._locked = false;
   1.549 +  this._lockBuffer = [];
   1.550 +
   1.551 +  // Store id
   1.552 +  this.id = frame.id;
   1.553 +  this.version = frame.version;
   1.554 +
   1.555 +  // Store priority
   1.556 +  this.priority = frame.priority;
   1.557 +
   1.558 +  // Array of push streams associated to that one
   1.559 +  this.pushes = [];
   1.560 +
   1.561 +  // How much data can be sent TO client before next WINDOW_UPDATE
   1.562 +  this._sinkSize = connection.sinkSize;
   1.563 +  this._initialSinkSize = connection.sinkSize;
   1.564 +
   1.565 +  // When data needs to be send, but window is too small for it - it'll be
   1.566 +  // queued in this buffer
   1.567 +  this._sinkBuffer = [];
   1.568 +
   1.569 +  // How much data can be sent BY client before next WINDOW_UPDATE
   1.570 +  this._initialWindowSize = connection.windowSize;
   1.571 +  this._windowSize = connection.windowSize;
   1.572 +
   1.573 +  // Create compression streams
   1.574 +  this._deflate = connection._deflate;
   1.575 +  this._inflate = connection._inflate;
   1.576 +
   1.577 +  // Store headers
   1.578 +  this.headers = frame.headers;
   1.579 +  this.url = frame.url;
   1.580 +
   1.581 +  this._frame = frame;
   1.582 +
   1.583 +  if (legacy) {
   1.584 +    this.readable = this.writable = true;
   1.585 +  }
   1.586 +
   1.587 +  // Call .onend()
   1.588 +  this.once('end', function() {
   1.589 +    var self = this;
   1.590 +    process.nextTick(function() {
   1.591 +      if (self.onend) self.onend();
   1.592 +    });
   1.593 +  });
   1.594 +
   1.595 +  // Handle half-close
   1.596 +  this.once('finish', function() {
   1.597 +    this._writeData(true, []);
   1.598 +    this._closedBy.server = true;
   1.599 +    if (this._sinkBuffer.length !== 0) return;
   1.600 +    this._handleClose();
   1.601 +  });
   1.602 +};
   1.603 +util.inherits(Stream, DuplexStream);
   1.604 +exports.Stream = Stream;
   1.605 +
   1.606 +if (legacy) {
   1.607 +  Stream.prototype.pause = function pause() {};
   1.608 +  Stream.prototype.resume = function resume() {};
   1.609 +}
   1.610 +
   1.611 +//
   1.612 +// ### function _isGoaway ()
   1.613 +// Returns true if any writes to that stream should be ignored
   1.614 +//
   1.615 +Stream.prototype._isGoaway = function _isGoaway() {
   1.616 +  return this.connection._goaway && this.id > this.connection._goaway;
   1.617 +};
   1.618 +
   1.619 +//
   1.620 +// ### function init ()
   1.621 +// Initialize stream, internal
   1.622 +//
   1.623 +Stream.prototype._init = function init() {
   1.624 +  var headers = this.headers,
   1.625 +      req = [headers.method + ' ' + this.url + ' ' + headers.version];
   1.626 +
   1.627 +  Object.keys(headers).forEach(function (key) {
   1.628 +    if (key !== 'method' && key !== 'url' && key !== 'version' &&
   1.629 +        key !== 'scheme') {
   1.630 +      req.push(key + ': ' + headers[key]);
   1.631 +    }
   1.632 +  });
   1.633 +
   1.634 +  // Force chunked encoding
   1.635 +  if (!headers['content-length'] && !headers['transfer-encoding']) {
   1.636 +    req.push('Transfer-Encoding: chunked');
   1.637 +    this._forceChunked = true;
   1.638 +  }
   1.639 +
   1.640 +  // Add '\r\n\r\n'
   1.641 +  req.push('', '');
   1.642 +
   1.643 +  req = new Buffer(req.join('\r\n'));
   1.644 +
   1.645 +  this._recv(req, true);
   1.646 +  this._initialized = true;
   1.647 +};
   1.648 +
   1.649 +//
   1.650 +// ### function lock (callback)
   1.651 +// #### @callback {Function} continuation callback
   1.652 +// Acquire lock
   1.653 +//
   1.654 +Stream.prototype._lock = function lock(callback) {
   1.655 +  if (!callback) return;
   1.656 +
   1.657 +  if (this._locked) {
   1.658 +    this._lockBuffer.push(callback);
   1.659 +  } else {
   1.660 +    this._locked = true;
   1.661 +    callback.call(this, null);
   1.662 +  }
   1.663 +};
   1.664 +
   1.665 +//
   1.666 +// ### function unlock ()
   1.667 +// Release lock and call all buffered callbacks
   1.668 +//
   1.669 +Stream.prototype._unlock = function unlock() {
   1.670 +  if (this._locked) {
   1.671 +    this._locked = false;
   1.672 +    this._lock(this._lockBuffer.shift());
   1.673 +  }
   1.674 +};
   1.675 +
   1.676 +//
   1.677 +// ### function setTimeout ()
   1.678 +// TODO: use timers.enroll, timers.active, timers.unenroll
   1.679 +//
   1.680 +Stream.prototype.setTimeout = function setTimeout(time) {};
   1.681 +
   1.682 +//
   1.683 +// ### function _handleClose ()
   1.684 +// Close stream if it was closed by both server and client
   1.685 +//
   1.686 +Stream.prototype._handleClose = function _handleClose() {
   1.687 +  if (this._closedBy.client && this._closedBy.server) {
   1.688 +    this.close();
   1.689 +  }
   1.690 +};
   1.691 +
   1.692 +//
   1.693 +// ### function close ()
   1.694 +// Destroys stream
   1.695 +//
   1.696 +Stream.prototype.close = function close() {
   1.697 +  this.destroy();
   1.698 +};
   1.699 +
   1.700 +//
   1.701 +// ### function destroy (error)
   1.702 +// #### @error {Error} (optional) error
   1.703 +// Destroys stream
   1.704 +//
   1.705 +Stream.prototype.destroy = function destroy(error) {
   1.706 +  if (this._destroyed) return;
   1.707 +  this._destroyed = true;
   1.708 +
   1.709 +  delete this.connection.streams[this.id];
   1.710 +  if (this.id % 2 === 1) {
   1.711 +    this.connection.streamsCount--;
   1.712 +  }
   1.713 +
   1.714 +  // If stream is not finished, RST frame should be sent to notify client
   1.715 +  // about sudden stream termination.
   1.716 +  if (error || !this._closedBy.server) {
   1.717 +    // REFUSED_STREAM if terminated before 'finish' event
   1.718 +    if (!this._closedBy.server) this._rstCode = 3;
   1.719 +
   1.720 +    if (this._rstCode) {
   1.721 +      this._lock(function() {
   1.722 +        this.connection.scheduler.schedule(
   1.723 +          this,
   1.724 +          this._framer.rstFrame(this.id, this._rstCode));
   1.725 +        this.connection.scheduler.tick();
   1.726 +
   1.727 +        this._unlock();
   1.728 +      });
   1.729 +    }
   1.730 +  }
   1.731 +
   1.732 +  if (legacy) {
   1.733 +    this.emit('end');
   1.734 +  } else {
   1.735 +    this.push(null);
   1.736 +  }
   1.737 +
   1.738 +  if (error) this.emit('error', error);
   1.739 +
   1.740 +  var self = this;
   1.741 +  process.nextTick(function() {
   1.742 +    self.emit('close', !!error);
   1.743 +  });
   1.744 +};
   1.745 +
   1.746 +Stream.prototype.destroySoon = function destroySoon(error) {
   1.747 +  return this.destroy(error);
   1.748 +};
   1.749 +
   1.750 +Stream.prototype._drainSink = function _drainSink(size) {
   1.751 +  var oldBuffer = this._sinkBuffer;
   1.752 +  this._sinkBuffer = [];
   1.753 +
   1.754 +  this._sinkSize += size;
   1.755 +
   1.756 +  for (var i = 0; i < oldBuffer.length; i++) {
   1.757 +    this._writeData(oldBuffer[i][0], oldBuffer[i][1], oldBuffer[i][2]);
   1.758 +  }
   1.759 +
   1.760 +  // Handle half-close
   1.761 +  if (this._sinkBuffer.length === 0 && this._closedBy.server) {
   1.762 +    this._handleClose();
   1.763 +  }
   1.764 +
   1.765 +  if (legacy) this.emit('drain');
   1.766 +};
   1.767 +
   1.768 +//
   1.769 +// ### function _writeData (fin, buffer, cb)
   1.770 +// #### @fin {Boolean}
   1.771 +// #### @buffer {Buffer}
   1.772 +// #### @cb {Function} **optional**
   1.773 +// Internal function
   1.774 +//
   1.775 +Stream.prototype._writeData = function _writeData(fin, buffer, cb) {
   1.776 +  if (this._framer.version === 3) {
   1.777 +    // Window was exhausted, queue data
   1.778 +    if (this._sinkSize <= 0) {
   1.779 +      this._sinkBuffer.push([fin, buffer, cb]);
   1.780 +      return false;
   1.781 +    }
   1.782 +
   1.783 +    var len = Math.min(this._sinkSize, buffer.length);
   1.784 +    this._sinkSize -= len;
   1.785 +
   1.786 +    // Only partial write is possible, queue rest for later
   1.787 +    if (len < buffer.length) {
   1.788 +      this._sinkBuffer.push([fin, buffer.slice(len)]);
   1.789 +      buffer = buffer.slice(0, len);
   1.790 +      fin = false;
   1.791 +    }
   1.792 +  }
   1.793 +
   1.794 +  this._lock(function() {
   1.795 +    var stream = this,
   1.796 +        frame = this._framer.dataFrame(this.id, fin, buffer);
   1.797 +
   1.798 +    stream.connection.scheduler.schedule(stream, frame);
   1.799 +    stream.connection.scheduler.tick();
   1.800 +
   1.801 +    this._unlock();
   1.802 +
   1.803 +    if (cb) cb();
   1.804 +  });
   1.805 +
   1.806 +  return true;
   1.807 +};
   1.808 +
   1.809 +//
   1.810 +// ### function write (data, encoding)
   1.811 +// #### @data {Buffer|String} data
   1.812 +// #### @encoding {String} data encoding
   1.813 +// Writes data to connection
   1.814 +//
   1.815 +Stream.prototype._write = function write(data, encoding, cb) {
   1.816 +  // Do not send data to new connections after GOAWAY
   1.817 +  if (this._isGoaway()) {
   1.818 +    if (cb) cb();
   1.819 +    return false;
   1.820 +  }
   1.821 +
   1.822 +  return this._writeData(false, data, cb);
   1.823 +};
   1.824 +
   1.825 +if (legacy) {
   1.826 +  Stream.prototype.write = function write(data, encoding, cb) {
   1.827 +    if (!Buffer.isBuffer(data)) {
   1.828 +      return this._write(new Buffer(data, encoding), null, cb);
   1.829 +    } else {
   1.830 +      return this._write(data, encoding, cb);
   1.831 +    }
   1.832 +  };
   1.833 +
   1.834 +  //
   1.835 +  // ### function end (data)
   1.836 +  // #### @data {Buffer|String} (optional) data to write before ending stream
   1.837 +  // #### @encoding {String} (optional) string encoding
   1.838 +  // Send FIN data frame
   1.839 +  //
   1.840 +  Stream.prototype.end = function end(data, encoding) {
   1.841 +    // Do not send data to new connections after GOAWAY
   1.842 +    if (this._isGoaway()) return;
   1.843 +
   1.844 +    if (data) this.write(data, encoding);
   1.845 +    this.emit('finish');
   1.846 +  };
   1.847 +}
   1.848 +
   1.849 +//
   1.850 +// ### function _recv (data)
   1.851 +// #### @data {Buffer} buffer to receive
   1.852 +// #### @chunked {Boolean}
   1.853 +// (internal)
   1.854 +//
   1.855 +Stream.prototype._recv = function _recv(data, chunked) {
   1.856 +  // Update window if exhausted
   1.857 +  if (!chunked && this._framer.version >= 3 && this._initialized) {
   1.858 +    this._windowSize -= data.length;
   1.859 +
   1.860 +    if (this._windowSize <= 0) {
   1.861 +      var delta = this._initialWindowSize - this._windowSize;
   1.862 +      this._windowSize += delta;
   1.863 +      this.connection.write(this._framer.windowUpdateFrame(this.id, delta));
   1.864 +    }
   1.865 +  }
   1.866 +
   1.867 +  // Emulate chunked encoding
   1.868 +  if (this._forceChunked && !chunked) {
   1.869 +    // Zero-chunks are treated as end, do not emit them
   1.870 +    if (data.length === 0) return;
   1.871 +
   1.872 +    this._recv(new Buffer(data.length.toString(16)), true);
   1.873 +    this._recv(crlf, true);
   1.874 +    this._recv(data, true);
   1.875 +    this._recv(crlf, true);
   1.876 +    return;
   1.877 +  }
   1.878 +
   1.879 +  if (legacy) {
   1.880 +    var self = this;
   1.881 +    process.nextTick(function() {
   1.882 +      self.emit('data', data);
   1.883 +      if (self.ondata) {
   1.884 +        self.ondata(data, 0, data.length);
   1.885 +      }
   1.886 +    });
   1.887 +  } else {
   1.888 +    // Right now, http module expects socket to be working in streams1 mode.
   1.889 +    if (this.ondata) {
   1.890 +      this.ondata(data, 0, data.length);
   1.891 +    } else {
   1.892 +      this.push(data);
   1.893 +    }
   1.894 +  }
   1.895 +};
   1.896 +
   1.897 +//
   1.898 +// ### function _read (bytes, cb)
   1.899 +// #### @bytes {Number} number of bytes to read
   1.900 +// Streams2 API
   1.901 +//
   1.902 +Stream.prototype._read = function read(bytes) {
   1.903 +  // NOP
   1.904 +};
   1.905 +
   1.906 +//
   1.907 +// ### function _updateSinkSize (size)
   1.908 +// #### @size {Integer}
   1.909 +// Update the internal data transfer window
   1.910 +//
   1.911 +Stream.prototype._updateSinkSize = function _updateSinkSize(size) {
   1.912 +  var diff = size - this._initialSinkSize;
   1.913 +
   1.914 +  this._initialSinkSize = size;
   1.915 +  this._drainSink(diff);
   1.916 +};
   1.917 +
   1.918 +//
   1.919 +// `net` compatibility layer
   1.920 +// (Copy pasted from lib/tls.js from node.js)
   1.921 +//
   1.922 +Stream.prototype.address = function address() {
   1.923 +  return this.socket && this.socket.address();
   1.924 +};
   1.925 +
   1.926 +Stream.prototype.__defineGetter__('remoteAddress', function remoteAddress() {
   1.927 +  return this.socket && this.socket.remoteAddress;
   1.928 +});
   1.929 +
   1.930 +Stream.prototype.__defineGetter__('remotePort', function remotePort() {
   1.931 +  return this.socket && this.socket.remotePort;
   1.932 +});

mercurial