Wed, 31 Dec 2014 06:55:46 +0100
Added tag TORBROWSER_REPLICA for changeset 6474c204b198
michael@0 | 1 | var spdy = require('../spdy'), |
michael@0 | 2 | util = require('util'), |
michael@0 | 3 | https = require('https'), |
michael@0 | 4 | stream = require('stream'), |
michael@0 | 5 | Buffer = require('buffer').Buffer; |
michael@0 | 6 | |
// Raw byte sequences: a bare CRLF and the terminating fragment of a chunked
// body ("0\r\n\r\n").
var crlf = new Buffer('\r\n'),
    last_frag = new Buffer('0\r\n\r\n');

// When the `stream` module has no `Duplex` class the code runs in "legacy"
// mode and uses the module export itself as the stream base class.
var legacy = !stream.Duplex,
    DuplexStream = legacy ? stream : stream.Duplex;
michael@0 | 17 | |
//
// ### function instantiate (HTTPSServer)
// #### @HTTPSServer {https.Server|Function} Base server class
// Builds a SPDY Server constructor that inherits from `HTTPSServer` and
// receives a copy of every method stored on the shared `proto` object.
// Returns that constructor.
//
function instantiate(HTTPSServer) {
  //
  // ### function Server (options, requestListener)
  // #### @options {Object} tls server options
  // #### @requestListener {Function} (optional) request callback
  // SPDY Server @constructor: initializes state through the base class, then
  // replaces the connection handler.
  //
  function Server(options, requestListener) {
    this._init(HTTPSServer, options, requestListener);
    this._wrap();
  }
  util.inherits(Server, HTTPSServer);

  // Mix the shared methods in after `inherits`, which replaces the prototype
  var names = Object.keys(proto);
  for (var i = 0; i < names.length; i++) {
    Server.prototype[names[i]] = proto[names[i]];
  }

  return Server;
}
exports.instantiate = instantiate;
michael@0 | 47 | |
// Common prototype for all servers
var proto = {};

//
// ### function _init(base, options, handler)
// #### @base {Function} base server class (https.Server)
// #### @options {Object} (optional) tls server options, modified in place:
// ####   missing `maxStreams`, `sinkSize` and `windowSize` get defaults and
// ####   `NPNProtocols` / `ALPNProtocols` are always overwritten
// #### @handler {Function} (optional) request handler
// Initializer. Stores the SPDY state on `this._spdyState` and invokes the base
// server constructor on `this`.
//
proto._init = function _init(base, options, handler) {
  var state = {};
  this._spdyState = state;

  if (!options) options = {};
  if (!options.maxStreams) options.maxStreams = 100;
  if (!options.sinkSize) options.sinkSize = 1 << 16; // 64kb
  if (!options.windowSize) options.windowSize = 1 << 20; // 1mb

  options.NPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0'];
  options.ALPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0'];
  state.options = options;
  state.reqHandler = handler;

  // A plain (non-ssl) base server takes only the request handler
  if (options.plain && !options.ssl) {
    base.call(this, handler);
  } else {
    base.call(this, options, handler);
  }

  // NOTE: a trailing "return if NPN/ALPN is unsupported" check used to live
  // here. It was the last statement of the function and therefore had no
  // effect, so it was removed.
};
michael@0 | 87 | |
//
// ### function _wrap()
// Replaces the connection listener installed by the base server with SPDY
// aware logic. The first original listener is kept in `state.handler` so that
// non-SPDY clients can still be served by it.
//
// Returns the server when the TLS listener is installed, nothing otherwise.
//
proto._wrap = function _wrap() {
  var self = this,
      state = this._spdyState;

  // Plain servers without ssl receive sockets through 'connection'
  var event = state.options.plain && !state.options.ssl ? 'connection' :
                                                          'secureConnection',
      handler = this.listeners(event)[0];

  state.pool = spdy.zlibpool.create();
  state.handler = handler;

  this.removeAllListeners(event);

  // Normal mode: protocol negotiation decides between SPDY and the original
  // handler (see `_onConnection`)
  if (!state.options.plain) {
    return this.on(event, this._onConnection.bind(this));
  }

  // In case of plain connection there is no negotiation: peek at the first
  // byte and fall back to the original handler if it is not equal to 0x80.
  // Until that byte arrives every event emitted on the socket is recorded and
  // replayed once the decision is made.
  this.on(event, function(socket) {
    var history = [],
        _emit = socket.emit;

    // Placeholder listener. It is declared at function scope (not inside the
    // `if` below) because `restore()` needs to reference it; a function
    // declaration nested in a block is not portable and is block-scoped in
    // strict mode.
    function ondata() {}

    // Add 'data' listener, otherwise 'data' events won't be emitted
    if (legacy) {
      socket.once('data', ondata);
    }

    // 2 minutes timeout, as http.js does by default
    socket.setTimeout(self.timeout || 2 * 60 * 1000);

    // Intercept every emitted event until the first byte is seen
    socket.emit = function emit(type, data) {
      history.push(Array.prototype.slice.call(arguments));

      if (type === 'data') {
        // Legacy
        onFirstByte.call(socket, data);
      } else if (type === 'readable') {
        // Streams
        onReadable.call(socket);
      } else if (type === 'end' ||
                 type === 'close' ||
                 type === 'error' ||
                 type === 'timeout') {
        // We shouldn't get there if any data was received
        fail();
      }
    };

    // Socket ended before sending anything: drop recorded events and close it
    function fail() {
      socket.emit = _emit;
      history = null;
      try {
        socket.destroy();
      } catch (e) {
        // Best effort: the socket is being discarded anyway
      }
    }

    // Put the real `emit` back and replay everything that was recorded
    function restore() {
      var copy = history.slice();
      history = null;

      if (legacy) socket.removeListener('data', ondata);
      socket.emit = _emit;
      for (var i = 0; i < copy.length; i++) {
        socket.emit.apply(socket, copy[i]);
        if (copy[i][0] === 'end') {
          if (socket.onend) socket.onend();
        }
      }
    }

    // Legacy streams: decide on the first non-empty 'data' chunk
    function onFirstByte(data) {
      // Ignore empty packets
      if (data.length === 0) return;

      if (data[0] === 0x80) {
        self._onConnection(socket);
      } else {
        handler.call(self, socket);
      }

      // Fire events
      restore();

      // NOTE: If we came there - .ondata() will be called anyway in this tick,
      // so there're no need to call it manually
    }

    if (!legacy) {
      // Hack to make streams2 work properly
      socket.on('readable', onReadable);
    }

    // Streams2: read a single byte, decide, then push it back
    function onReadable() {
      var data = socket.read(1);

      // Ignore empty packets
      if (!data) return;
      socket.removeListener('readable', onReadable);

      // `.unshift()` emits `readable` event. Thus `emit` method should
      // be restored before calling it.
      socket.emit = _emit;

      // Put packet back where it was before
      socket.unshift(data);

      if (data[0] === 0x80) {
        self._onConnection(socket);
      } else {
        handler.call(self, socket);
      }

      // Fire events
      restore();

      // Hand everything that is already buffered to the consumer's `ondata`
      if (socket.ondata) {
        data = socket.read(socket._readableState.length);
        if (data) socket.ondata(data, 0, data.length);
      }
    }
  });
};
michael@0 | 219 | |
//
// ### function _onConnection (socket)
// #### @socket {Stream} incoming socket
// Connection handler wrapper. Sockets that did not negotiate a SPDY protocol
// go to the original handler (unless the `debug` or `plain` option is set);
// all others are wrapped into a `Connection` whose streams are fed to the
// original handler as if each one was a separate connection.
//
proto._onConnection = function _onConnection(socket) {
  var self = this,
      state = this._spdyState,
      options = state.options;

  var negotiated = socket.npnProtocol || socket.alpnProtocol,
      speaksSpdy = negotiated && negotiated.match(/spdy/);

  // Fallback to HTTPS if needed
  if (!speaksSpdy && !options.debug && !options.plain) {
    return state.handler.call(this, socket);
  }

  // Copies SPDY stream information from the request onto both objects and
  // ends the underlying stream once `target` has finished.
  function tag(req, target) {
    var id = req.socket.id;
    req.streamID = id;
    target.streamID = id;

    req.isSpdy = true;
    target.isSpdy = true;

    var version = req.socket.version;
    req.spdyVersion = version;
    target.spdyVersion = version;

    target.once('finish', function onfinish() {
      req.connection.end();
    });
  }

  // Wrap incoming socket into abstract class
  var connection = new Connection(socket, state.pool, options);

  // Emulate each stream like connection
  connection.on('stream', state.handler);

  connection.on('connect', function onconnect(req, tunnel) {
    tag(req, tunnel);
    self.emit('connect', req, tunnel);
  });

  connection.on('request', function onrequest(req, res) {
    // Swap in the SPDY implementations of the response methods
    res._renderHeaders = spdy.response._renderHeaders;
    res.writeHead = spdy.response.writeHead;
    res.push = spdy.response.push;

    // Chunked encoding is not supported in SPDY
    res.useChunkedEncodingByDefault = false;

    tag(req, res);
    self.emit('request', req, res);
  });

  connection.on('error', function onerror(e) {
    console.log('[secureConnection] error ' + e);
    // EPIPE is not passed on to `destroy`
    if (e.errno === 'EPIPE') {
      socket.destroy(undefined);
    } else {
      socket.destroy(e);
    }
  });
};
michael@0 | 277 | |
michael@0 | 278 | // Export Server instantiated from https.Server |
michael@0 | 279 | var Server = instantiate(https.Server); |
michael@0 | 280 | exports.Server = Server; |
michael@0 | 281 | |
michael@0 | 282 | // |
michael@0 | 283 | // ### function create (base, options, requestListener) |
michael@0 | 284 | // #### @base {Function} (optional) base server class (https.Server) |
michael@0 | 285 | // #### @options {Object} tls server options |
michael@0 | 286 | // #### @requestListener {Function} (optional) request callback |
michael@0 | 287 | // @constructor wrapper |
michael@0 | 288 | // |
michael@0 | 289 | exports.create = function create(base, options, requestListener) { |
michael@0 | 290 | var server; |
michael@0 | 291 | if (typeof base === 'function') { |
michael@0 | 292 | server = instantiate(base); |
michael@0 | 293 | } else { |
michael@0 | 294 | server = Server; |
michael@0 | 295 | |
michael@0 | 296 | requestListener = options; |
michael@0 | 297 | options = base; |
michael@0 | 298 | base = null; |
michael@0 | 299 | } |
michael@0 | 300 | |
michael@0 | 301 | return new server(options, requestListener); |
michael@0 | 302 | }; |
michael@0 | 303 | |
michael@0 | 304 | // |
michael@0 | 305 | // ### function Connection (socket, pool, options) |
michael@0 | 306 | // #### @socket {net.Socket} server's connection |
michael@0 | 307 | // #### @pool {spdy.ZlibPool} zlib pool |
michael@0 | 308 | // #### @options {Object} server's options |
michael@0 | 309 | // Abstract connection @constructor |
michael@0 | 310 | // |
michael@0 | 311 | function Connection(socket, pool, options) { |
michael@0 | 312 | process.EventEmitter.call(this); |
michael@0 | 313 | |
michael@0 | 314 | var self = this; |
michael@0 | 315 | |
michael@0 | 316 | this._closed = false; |
michael@0 | 317 | |
michael@0 | 318 | this.pool = pool; |
michael@0 | 319 | var pair = null; |
michael@0 | 320 | |
michael@0 | 321 | this._deflate = null; |
michael@0 | 322 | this._inflate = null; |
michael@0 | 323 | |
michael@0 | 324 | this.encrypted = socket.encrypted; |
michael@0 | 325 | |
michael@0 | 326 | // Init streams list |
michael@0 | 327 | this.streams = {}; |
michael@0 | 328 | this.streamsCount = 0; |
michael@0 | 329 | this.pushId = 0; |
michael@0 | 330 | this._goaway = false; |
michael@0 | 331 | |
michael@0 | 332 | this._framer = null; |
michael@0 | 333 | |
michael@0 | 334 | // Data transfer window defaults to 64kb |
michael@0 | 335 | this.windowSize = options.windowSize; |
michael@0 | 336 | this.sinkSize = options.sinkSize; |
michael@0 | 337 | |
michael@0 | 338 | // Initialize scheduler |
michael@0 | 339 | this.scheduler = spdy.scheduler.create(this); |
michael@0 | 340 | |
michael@0 | 341 | // Store socket and pipe it to parser |
michael@0 | 342 | this.socket = socket; |
michael@0 | 343 | |
michael@0 | 344 | // Initialize parser |
michael@0 | 345 | this.parser = spdy.parser.create(this); |
michael@0 | 346 | this.parser.on('frame', function (frame) { |
michael@0 | 347 | if (this._closed) return; |
michael@0 | 348 | |
michael@0 | 349 | var stream; |
michael@0 | 350 | |
michael@0 | 351 | // Create new stream |
michael@0 | 352 | if (frame.type === 'SYN_STREAM') { |
michael@0 | 353 | self.streamsCount++; |
michael@0 | 354 | |
michael@0 | 355 | stream = self.streams[frame.id] = new Stream(self, frame); |
michael@0 | 356 | |
michael@0 | 357 | // If we reached stream limit |
michael@0 | 358 | if (self.streamsCount > options.maxStreams) { |
michael@0 | 359 | stream.once('error', function onerror() {}); |
michael@0 | 360 | // REFUSED_STREAM |
michael@0 | 361 | stream._rstCode = 3; |
michael@0 | 362 | stream.destroy(true); |
michael@0 | 363 | } else { |
michael@0 | 364 | self.emit('stream', stream); |
michael@0 | 365 | |
michael@0 | 366 | stream._init(); |
michael@0 | 367 | } |
michael@0 | 368 | } else { |
michael@0 | 369 | if (frame.id) { |
michael@0 | 370 | // Load created one |
michael@0 | 371 | stream = self.streams[frame.id]; |
michael@0 | 372 | |
michael@0 | 373 | // Fail if not found |
michael@0 | 374 | if (stream === undefined) { |
michael@0 | 375 | if (frame.type === 'RST_STREAM') return; |
michael@0 | 376 | self.write(self._framer.rstFrame(frame.id, 2)); |
michael@0 | 377 | return; |
michael@0 | 378 | } |
michael@0 | 379 | } |
michael@0 | 380 | |
michael@0 | 381 | // Emit 'data' event |
michael@0 | 382 | if (frame.type === 'DATA') { |
michael@0 | 383 | if (frame.data.length > 0){ |
michael@0 | 384 | if (stream._closedBy.client) { |
michael@0 | 385 | stream._rstCode = 2; |
michael@0 | 386 | stream.emit('error', 'Writing to half-closed stream'); |
michael@0 | 387 | } else { |
michael@0 | 388 | stream._recv(frame.data); |
michael@0 | 389 | } |
michael@0 | 390 | } |
michael@0 | 391 | // Destroy stream if we was asked to do this |
michael@0 | 392 | } else if (frame.type === 'RST_STREAM') { |
michael@0 | 393 | stream._rstCode = 0; |
michael@0 | 394 | if (frame.status === 5) { |
michael@0 | 395 | // If client "cancels" connection - close stream and |
michael@0 | 396 | // all associated push streams without error |
michael@0 | 397 | stream.pushes.forEach(function(stream) { |
michael@0 | 398 | stream.close(); |
michael@0 | 399 | }); |
michael@0 | 400 | stream.close(); |
michael@0 | 401 | } else { |
michael@0 | 402 | // Emit error on destroy |
michael@0 | 403 | stream.destroy(new Error('Received rst: ' + frame.status)); |
michael@0 | 404 | } |
michael@0 | 405 | // Respond with same PING |
michael@0 | 406 | } else if (frame.type === 'PING') { |
michael@0 | 407 | self.write(self._framer.pingFrame(frame.pingId)); |
michael@0 | 408 | } else if (frame.type === 'SETTINGS') { |
michael@0 | 409 | self._setDefaultWindow(frame.settings); |
michael@0 | 410 | } else if (frame.type === 'GOAWAY') { |
michael@0 | 411 | self._goaway = frame.lastId; |
michael@0 | 412 | } else if (frame.type === 'WINDOW_UPDATE') { |
michael@0 | 413 | stream._drainSink(frame.delta); |
michael@0 | 414 | } else { |
michael@0 | 415 | console.error('Unknown type: ', frame.type); |
michael@0 | 416 | } |
michael@0 | 417 | } |
michael@0 | 418 | |
michael@0 | 419 | // Handle half-closed |
michael@0 | 420 | if (frame.fin) { |
michael@0 | 421 | // Don't allow to close stream twice |
michael@0 | 422 | if (stream._closedBy.client) { |
michael@0 | 423 | stream._rstCode = 2; |
michael@0 | 424 | stream.emit('error', 'Already half-closed'); |
michael@0 | 425 | } else { |
michael@0 | 426 | stream._closedBy.client = true; |
michael@0 | 427 | |
michael@0 | 428 | // Emulate last chunked fragment |
michael@0 | 429 | if (stream._forceChunked) { |
michael@0 | 430 | stream._recv(last_frag, true); |
michael@0 | 431 | } |
michael@0 | 432 | |
michael@0 | 433 | stream._handleClose(); |
michael@0 | 434 | } |
michael@0 | 435 | } |
michael@0 | 436 | }); |
michael@0 | 437 | |
michael@0 | 438 | this.parser.on('version', function onversion(version) { |
michael@0 | 439 | if (!pair) { |
michael@0 | 440 | pair = pool.get('spdy/' + version); |
michael@0 | 441 | self._deflate = pair.deflate; |
michael@0 | 442 | self._inflate = pair.inflate; |
michael@0 | 443 | } |
michael@0 | 444 | }); |
michael@0 | 445 | |
michael@0 | 446 | this.parser.on('framer', function onframer(framer) { |
michael@0 | 447 | // Generate custom settings frame and send |
michael@0 | 448 | self.write(framer.settingsFrame(options)); |
michael@0 | 449 | }); |
michael@0 | 450 | |
michael@0 | 451 | // Propagate parser errors |
michael@0 | 452 | this.parser.on('error', function onParserError(err) { |
michael@0 | 453 | self.emit('error', err); |
michael@0 | 454 | }); |
michael@0 | 455 | |
michael@0 | 456 | socket.pipe(this.parser); |
michael@0 | 457 | |
michael@0 | 458 | // 2 minutes socket timeout |
michael@0 | 459 | socket.setTimeout(2 * 60 * 1000); |
michael@0 | 460 | socket.once('timeout', function ontimeout() { |
michael@0 | 461 | socket.destroy(); |
michael@0 | 462 | }); |
michael@0 | 463 | |
michael@0 | 464 | // Allow high-level api to catch socket errors |
michael@0 | 465 | socket.on('error', function onSocketError(e) { |
michael@0 | 466 | self.emit('error', e); |
michael@0 | 467 | }); |
michael@0 | 468 | |
michael@0 | 469 | socket.once('close', function onclose() { |
michael@0 | 470 | self._closed = true; |
michael@0 | 471 | if (pair) pool.put(pair); |
michael@0 | 472 | }); |
michael@0 | 473 | |
michael@0 | 474 | if (legacy) { |
michael@0 | 475 | socket.on('drain', function ondrain() { |
michael@0 | 476 | self.emit('drain'); |
michael@0 | 477 | }); |
michael@0 | 478 | } |
michael@0 | 479 | } |
michael@0 | 480 | util.inherits(Connection, process.EventEmitter); |
michael@0 | 481 | exports.Connection = Connection; |
michael@0 | 482 | |
//
// ### function write (data, encoding)
// #### @data {String|Buffer} data
// #### @encoding {String} (optional) encoding
// Forwards data to the socket and returns the socket's `write()` result.
// Nothing is written (and nothing returned) when the socket is not writable.
//
Connection.prototype.write = function write(data, encoding) {
  var target = this.socket;
  if (!target.writable) return;

  return target.write(data, encoding);
};
michael@0 | 494 | |
//
// ### function _setDefaultWindow (settings)
// #### @settings {Object} settings taken from a SETTINGS frame
// Applies a new, non-persisted `initial_window_size` to the connection's
// sink size and to every active stream. Does nothing when the setting is
// absent, falsy or marked as persisted.
//
Connection.prototype._setDefaultWindow = function _setDefaultWindow(settings) {
  if (!settings) return;

  var entry = settings.initial_window_size;
  if (!entry || entry.persisted) return;

  this.sinkSize = entry.value;

  var ids = Object.keys(this.streams);
  for (var i = 0; i < ids.length; i++) {
    this.streams[ids[i]]._updateSinkSize(settings.initial_window_size.value);
  }
};
michael@0 | 514 | |
//
// ### function Stream (connection, frame)
// #### @connection {Connection} SPDY Connection
// #### @frame {Object} SYN_STREAM data
// Abstract stream @constructor. Copies the connection and frame details onto
// the stream and installs the 'end' and 'finish' handlers.
//
function Stream(connection, frame) {
  DuplexStream.call(this);

  var self = this;

  // Shared with the owning connection
  this.connection = connection;
  this.socket = connection.socket;
  this.encrypted = connection.encrypted;
  this._framer = connection._framer;
  this._initialized = false;

  // Should chunked encoding be forced
  this._forceChunked = false;

  // Callback slots, empty until somebody assigns them
  this.ondata = null;
  this.onend = null;

  // RST_STREAM code if any
  this._rstCode = 1;
  this._destroyed = false;

  // Which sides have half-closed the stream
  this._closedBy = { client: false, server: false };

  // Lock flag and the callbacks waiting for it
  this._locked = false;
  this._lockBuffer = [];

  // Identity and priority, straight from the frame
  this.id = frame.id;
  this.version = frame.version;
  this.priority = frame.priority;

  // Array of push streams associated to that one
  this.pushes = [];

  // How much data can be sent TO client before next WINDOW_UPDATE
  this._sinkSize = connection.sinkSize;
  this._initialSinkSize = connection.sinkSize;

  // Writes that do not fit into the window wait here
  this._sinkBuffer = [];

  // How much data can be sent BY client before next WINDOW_UPDATE
  this._initialWindowSize = connection.windowSize;
  this._windowSize = connection.windowSize;

  // Compression streams of the connection
  this._deflate = connection._deflate;
  this._inflate = connection._inflate;

  // Request information
  this.headers = frame.headers;
  this.url = frame.url;
  this._frame = frame;

  if (legacy) {
    this.readable = true;
    this.writable = true;
  }

  // Call .onend() on the tick after 'end'
  this.once('end', function onStreamEnd() {
    process.nextTick(function() {
      if (self.onend) self.onend();
    });
  });

  // Handle half-close: send the final (empty) data frame and close the
  // stream unless queued writes are still waiting for window space
  this.once('finish', function onStreamFinish() {
    self._writeData(true, []);
    self._closedBy.server = true;
    if (self._sinkBuffer.length === 0) self._handleClose();
  });
}
util.inherits(Stream, DuplexStream);
exports.Stream = Stream;

// Legacy streams get no-op pause/resume
if (legacy) {
  Stream.prototype.pause = function pause() {};
  Stream.prototype.resume = function resume() {};
}
michael@0 | 607 | |
//
// ### function _isGoaway ()
// Returns a truthy value if any writes to that stream should be ignored,
// i.e. the connection stored a GOAWAY id and this stream's id is above it.
//
Stream.prototype._isGoaway = function _isGoaway() {
  var lastId = this.connection._goaway;

  // No (or falsy) GOAWAY id: hand the stored value back unchanged
  if (!lastId) return lastId;

  return this.id > lastId;
};
michael@0 | 615 | |
//
// ### function _init ()
// Initialize stream, internal. Renders the stored headers as an HTTP/1.x
// style request head and passes it to `_recv`.
//
Stream.prototype._init = function init() {
  var headers = this.headers,
      special = ['method', 'url', 'version', 'scheme'];

  var requestLine = headers.method + ' ' + this.url + ' ' + headers.version;

  // Everything except the special keys becomes a regular header line
  var headerLines = Object.keys(headers).filter(function(name) {
    return special.indexOf(name) === -1;
  }).map(function(name) {
    return name + ': ' + headers[name];
  });

  var lines = [requestLine].concat(headerLines);

  // Force chunked encoding
  if (!headers['content-length'] && !headers['transfer-encoding']) {
    lines.push('Transfer-Encoding: chunked');
    this._forceChunked = true;
  }

  // Two empty entries make the joined text end with '\r\n\r\n'
  var head = new Buffer(lines.concat('', '').join('\r\n'));

  this._recv(head, true);
  this._initialized = true;
};
michael@0 | 645 | |
//
// ### function _lock (callback)
// #### @callback {Function} continuation callback
// Acquire lock: runs the callback right away (with the stream as `this`)
// when the lock is free, otherwise queues it. A missing callback is ignored.
//
Stream.prototype._lock = function lock(callback) {
  if (!callback) return;

  if (!this._locked) {
    this._locked = true;
    callback.call(this, null);
    return;
  }

  this._lockBuffer.push(callback);
};
michael@0 | 661 | |
//
// ### function _unlock ()
// Release lock and hand it to the next buffered callback, if there is one
//
Stream.prototype._unlock = function unlock() {
  if (!this._locked) return;

  this._locked = false;

  var next = this._lockBuffer.shift();
  this._lock(next);
};
michael@0 | 672 | |
//
// ### function setTimeout (time)
// #### @time {Number} requested timeout, ignored
// Placeholder, does nothing yet.
// TODO: use timers.enroll, timers.active, timers.unenroll
//
Stream.prototype.setTimeout = function setTimeout(time) {
  // Intentionally empty
};
michael@0 | 678 | |
//
// ### function _handleClose ()
// Close stream once both the client and the server side have closed it
//
Stream.prototype._handleClose = function _handleClose() {
  var sides = this._closedBy;
  if (!sides.client || !sides.server) return;

  this.close();
};
michael@0 | 688 | |
//
// ### function close ()
// Destroys stream without passing an error
//
Stream.prototype.close = function close() {
  // Same as destroy() with no arguments
  this.destroy();
};
michael@0 | 696 | |
//
// ### function destroy (error)
// #### @error {Error} (optional) error
// Destroys stream: removes it from the connection, schedules an RST frame
// when needed, ends the readable side and emits 'error' (if given) followed
// by 'close' on the next tick. Repeated calls are ignored.
//
Stream.prototype.destroy = function destroy(error) {
  if (this._destroyed) return;
  this._destroyed = true;

  var connection = this.connection;
  delete connection.streams[this.id];

  // Only odd ids are subtracted from the connection's stream counter
  if (this.id % 2 === 1) connection.streamsCount--;

  // If stream is not finished, RST frame should be sent to notify client
  // about sudden stream termination.
  var unfinished = !this._closedBy.server;

  // REFUSED_STREAM if terminated before 'finish' event
  if (unfinished) this._rstCode = 3;

  if ((error || unfinished) && this._rstCode) {
    this._lock(function sendRst() {
      var frame = this._framer.rstFrame(this.id, this._rstCode),
          scheduler = this.connection.scheduler;

      scheduler.schedule(this, frame);
      scheduler.tick();

      this._unlock();
    });
  }

  // Signal end of the readable side
  if (legacy) {
    this.emit('end');
  } else {
    this.push(null);
  }

  if (error) this.emit('error', error);

  var self = this,
      hadError = !!error;
  process.nextTick(function() {
    self.emit('close', hadError);
  });
};
michael@0 | 742 | |
//
// ### function destroySoon (error)
// #### @error {Error} (optional) error
// Same as `destroy`: the stream is destroyed right away
//
Stream.prototype.destroySoon = function destroySoon(error) {
  var result = this.destroy(error);
  return result;
};
michael@0 | 746 | |
//
// ### function _drainSink (size)
// #### @size {Number} amount to add to the send window
// Grows the send window and retries every queued write. Writes that still
// do not fit are queued again by `_writeData`.
//
Stream.prototype._drainSink = function _drainSink(size) {
  // Detach the queue first, so re-queued writes land in a fresh list
  var pending = this._sinkBuffer;
  this._sinkBuffer = [];

  this._sinkSize += size;

  pending.forEach(function(entry) {
    this._writeData(entry[0], entry[1], entry[2]);
  }, this);

  // Handle half-close
  if (this._closedBy.server && this._sinkBuffer.length === 0) {
    this._handleClose();
  }

  if (legacy) this.emit('drain');
};
michael@0 | 764 | |
//
// ### function _writeData (fin, buffer, cb)
// #### @fin {Boolean} FIN flag for the data frame carrying the end of `buffer`
// #### @buffer {Buffer} payload to send
// #### @cb {Function} **optional** invoked once all of `buffer` was scheduled
// Internal function. Returns `false` if the whole buffer had to be queued
// because the window is exhausted, `true` if a frame was submitted.
//
Stream.prototype._writeData = function _writeData(fin, buffer, cb) {
  if (this._framer.version === 3) {
    // Window was exhausted, queue data (together with its callback)
    if (this._sinkSize <= 0) {
      this._sinkBuffer.push([fin, buffer, cb]);
      return false;
    }

    var len = Math.min(this._sinkSize, buffer.length);
    this._sinkSize -= len;

    // Only partial write is possible, queue rest for later
    if (len < buffer.length) {
      // The callback travels with the remainder: the write is complete only
      // after the queued tail went out. Firing it for the first slice would
      // tell the writer that the chunk is done while part of it is still
      // sitting in `_sinkBuffer`.
      this._sinkBuffer.push([fin, buffer.slice(len), cb]);
      buffer = buffer.slice(0, len);
      fin = false;
      cb = null;
    }
  }

  this._lock(function() {
    var stream = this,
        frame = this._framer.dataFrame(this.id, fin, buffer);

    stream.connection.scheduler.schedule(stream, frame);
    stream.connection.scheduler.tick();

    this._unlock();

    if (cb) cb();
  });

  return true;
};
michael@0 | 805 | |
//
// ### function _write (data, encoding, cb)
// #### @data {Buffer|String} data
// #### @encoding {String} data encoding (not used by this method)
// #### @cb {Function} **optional** completion callback
// Submits data as a non-final data frame via `_writeData`
//
Stream.prototype._write = function write(data, encoding, cb) {
  if (!this._isGoaway()) {
    return this._writeData(false, data, cb);
  }

  // Do not send data to new connections after GOAWAY: the chunk is dropped,
  // but the callback is still invoked.
  if (cb) {
    cb();
  }
  return false;
};
michael@0 | 821 | |
// Streams1 fallback: only installed when `legacy` is set
if (legacy) {
  //
  // ### function write (data, encoding, cb)
  // #### @data {Buffer|String} data
  // #### @encoding {String} (optional) string encoding
  // #### @cb {Function} (optional) completion callback
  // Converts non-Buffer input to a Buffer and hands it to `_write`
  //
  Stream.prototype.write = function write(data, encoding, cb) {
    if (Buffer.isBuffer(data)) {
      return this._write(data, encoding, cb);
    }

    return this._write(new Buffer(data, encoding), null, cb);
  };

  //
  // ### function end (data, encoding)
  // #### @data {Buffer|String} (optional) data to write before ending stream
  // #### @encoding {String} (optional) string encoding
  // Writes the optional last chunk and emits 'finish'
  //
  Stream.prototype.end = function end(data, encoding) {
    // Do not send data to new connections after GOAWAY
    if (!this._isGoaway()) {
      if (data) {
        this.write(data, encoding);
      }
      this.emit('finish');
    }
  };
}
michael@0 | 845 | |
//
// ### function _recv (data, chunked)
// #### @data {Buffer} buffer to receive
// #### @chunked {Boolean} true for the internal calls made while emulating
// ####                    chunked encoding
// (internal) Accounts for the receive window and hands data to the consumer
//
Stream.prototype._recv = function _recv(data, chunked) {
  // Window accounting is skipped for the re-entrant (chunked) calls
  if (!chunked && this._framer.version >= 3 && this._initialized) {
    this._windowSize -= data.length;

    // Update window if exhausted: bring it back to the initial size and
    // announce the difference to the peer
    if (this._windowSize <= 0) {
      var delta = this._initialWindowSize - this._windowSize;
      this._windowSize += delta;

      var update = this._framer.windowUpdateFrame(this.id, delta);
      this.connection.write(update);
    }
  }

  // Emulate chunked encoding: <hex length> CRLF <data> CRLF
  if (this._forceChunked && !chunked) {
    // Zero-chunks are treated as end, do not emit them
    if (data.length === 0) return;

    var sizeLine = new Buffer(data.length.toString(16));
    [sizeLine, crlf, data, crlf].forEach(function(part) {
      this._recv(part, true);
    }, this);
    return;
  }

  if (!legacy) {
    // Right now, http module expects socket to be working in streams1 mode.
    if (this.ondata) {
      this.ondata(data, 0, data.length);
    } else {
      this.push(data);
    }
    return;
  }

  // Streams1: deliver on the next tick
  var self = this;
  process.nextTick(function() {
    self.emit('data', data);
    if (self.ondata) {
      self.ondata(data, 0, data.length);
    }
  });
};
michael@0 | 893 | |
//
// ### function _read (bytes)
// #### @bytes {Number} number of bytes to read (ignored)
// Streams2 API hook; intentionally empty
//
Stream.prototype._read = function read(bytes) {
  // NOP
};
michael@0 | 902 | |
//
// ### function _updateSinkSize (size)
// #### @size {Integer} new initial size of the data transfer window
// Records the new initial window size and applies the difference to the
// current window via `_drainSink`
//
Stream.prototype._updateSinkSize = function _updateSinkSize(size) {
  var previous = this._initialSinkSize;
  this._initialSinkSize = size;

  // May be negative when the window shrinks
  this._drainSink(size - previous);
};
michael@0 | 914 | |
michael@0 | 915 | // |
michael@0 | 916 | // `net` compatibility layer |
michael@0 | 917 | // (Copy pasted from lib/tls.js from node.js) |
michael@0 | 918 | // |
// Result of the underlying socket's `address()`; when no socket is attached
// the falsy `socket` value itself is returned.
Stream.prototype.address = function address() {
  if (!this.socket) {
    return this.socket;
  }
  return this.socket.address();
};
michael@0 | 922 | |
// `remoteAddress` accessor: mirrors the underlying socket's property, or
// yields the falsy `socket` value when no socket is attached.
// The descriptor matches what `__defineGetter__` would install.
Object.defineProperty(Stream.prototype, 'remoteAddress', {
  get: function remoteAddress() {
    return this.socket && this.socket.remoteAddress;
  },
  enumerable: true,
  configurable: true
});
michael@0 | 926 | |
// `remotePort` accessor: mirrors the underlying socket's property, or
// yields the falsy `socket` value when no socket is attached.
// The descriptor matches what `__defineGetter__` would install.
Object.defineProperty(Stream.prototype, 'remotePort', {
  get: function remotePort() {
    return this.socket && this.socket.remotePort;
  },
  enumerable: true,
  configurable: true
});