|
1 var spdy = require('../spdy'), |
|
2 util = require('util'), |
|
3 https = require('https'), |
|
4 stream = require('stream'), |
|
5 Buffer = require('buffer').Buffer; |
|
6 |
|
// Buffers reused while emulating chunked transfer encoding on incoming data:
// the CRLF delimiter and the terminating zero-length chunk.
var crlf = new Buffer('\r\n');
var last_frag = new Buffer('0\r\n\r\n');

// Runtimes whose `stream` module has no `Duplex` class are handled in
// "legacy" mode; the bare `stream` export is the base class there.
var legacy = !stream.Duplex;
var DuplexStream = legacy ? stream : stream.Duplex;
|
17 |
|
18 // |
|
19 // ### function instantiate (HTTPSServer) |
|
20 // #### @HTTPSServer {https.Server|Function} Base server class |
|
21 // Will return constructor for SPDY Server, based on the HTTPSServer class |
|
22 // |
|
23 function instantiate(HTTPSServer) { |
|
24 // |
|
25 // ### function Server (options, requestListener) |
|
26 // #### @options {Object} tls server options |
|
27 // #### @requestListener {Function} (optional) request callback |
|
28 // SPDY Server @constructor |
|
29 // |
|
30 function Server(options, requestListener) { |
|
31 // Initialize |
|
32 this._init(HTTPSServer, options, requestListener); |
|
33 |
|
34 // Wrap connection handler |
|
35 this._wrap(); |
|
36 }; |
|
37 util.inherits(Server, HTTPSServer); |
|
38 |
|
39 // Copy prototype methods |
|
40 Object.keys(proto).forEach(function(key) { |
|
41 this[key] = proto[key]; |
|
42 }, Server.prototype); |
|
43 |
|
44 return Server; |
|
45 } |
|
46 exports.instantiate = instantiate; |
|
47 |
|
// Common prototype for all servers
var proto = {};

//
// ### function _init(base, options, handler)
// #### @base {Function} base server class (https.Server)
// #### @options {Object} (optional) tls server options
// #### @handler {Function} (optional) request handler
// Initializer: fills in option defaults, stores the per-server SPDY state in
// `this._spdyState` and invokes the base server constructor on `this`.
//
proto._init = function _init(base, options, handler) {
  var state = {};
  this._spdyState = state;

  // Fill in defaults. NOTE: a caller-supplied `options` object is updated
  // in place and kept by reference in the state.
  if (!options) options = {};
  if (!options.maxStreams) options.maxStreams = 100;
  if (!options.sinkSize) options.sinkSize = 1 << 16; // 64kb
  if (!options.windowSize) options.windowSize = 1 << 20; // 1mb

  // The advertised protocol lists always replace whatever was supplied
  options.NPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0'];
  options.ALPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0'];
  state.options = options;
  state.reqHandler = handler;

  // In plain (non-ssl) mode the base constructor is given the request
  // handler only; otherwise it receives the options as well.
  if (options.plain && !options.ssl) {
    base.call(this, handler);
  } else {
    base.call(this, options, handler);
  }

  // Nothing else to set up here: whether a socket speaks SPDY is decided per
  // connection in `_onConnection`.
};
|
87 |
|
//
// ### function _wrap()
// Takes over the base server's connection listener. Every incoming socket is
// routed either to the SPDY handler (`_onConnection`) or to the listener
// that was registered originally.
//
proto._wrap = function _wrap() {
  var self = this,
      state = this._spdyState;

  // Pick the event the base server uses for new sockets and remember its
  // first listener
  var event = state.options.plain && !state.options.ssl ? 'connection' :
                                                          'secureConnection',
      handler = this.listeners(event)[0];

  state.pool = spdy.zlibpool.create();
  state.handler = handler;

  this.removeAllListeners(event);

  // Normal mode: `_onConnection` inspects the negotiated protocol and falls
  // back to the original listener itself
  if (!state.options.plain) {
    return this.on(event, this._onConnection.bind(this));
  }

  // Plain mode: there is no protocol negotiation, so the first byte is
  // sniffed. 0x80 selects SPDY, anything else goes to the original listener.
  // Until that byte arrives every event emitted on the socket is recorded
  // and replayed once the choice has been made.
  this.on(event, function(socket) {
    var history = [],
        _emit = socket.emit;

    // No-op 'data' listener for legacy streams. Declared at function scope
    // (not inside the `if` below) so that `restore()` can always resolve it,
    // strict mode included.
    function ondata() {}

    // Add 'data' listener, otherwise 'data' events won't be emitted
    if (legacy) {
      socket.once('data', ondata);
    }

    // 2 minutes timeout, as http.js does by default
    socket.setTimeout(self.timeout || 2 * 60 * 1000);

    // Record events instead of delivering them
    socket.emit = function emit(event, data) {
      history.push(Array.prototype.slice.call(arguments));

      if (event === 'data') {
        // Legacy
        onFirstByte.call(socket, data);
      } else if (event === 'readable') {
        // Streams
        onReadable.call(socket);
      } else if (event === 'end' ||
                 event === 'close' ||
                 event === 'error' ||
                 event === 'timeout') {
        // We shouldn't get there if any data was received
        fail();
      }
    };

    // The socket went away before its first byte: drop the recorded events
    // and destroy it
    function fail() {
      socket.emit = _emit;
      history = null;
      try {
        socket.destroy();
      } catch (e) {
        // Best effort only; errors thrown by destroy() are ignored
      }
    }

    // Put the real `emit` back and replay everything recorded so far
    function restore() {
      var copy = history.slice();
      history = null;

      if (legacy) socket.removeListener('data', ondata);
      socket.emit = _emit;
      for (var i = 0; i < copy.length; i++) {
        socket.emit.apply(socket, copy[i]);
        if (copy[i][0] === 'end') {
          if (socket.onend) socket.onend();
        }
      }
    }

    // Legacy streams: decide on the first non-empty 'data' chunk
    function onFirstByte(data) {
      // Ignore empty packets
      if (data.length === 0) return;

      if (data[0] === 0x80) {
        self._onConnection(socket);
      } else {
        handler.call(self, socket);
      }

      // Fire events
      restore();

      // NOTE: If we came there - .ondata() will be called anyway in this tick,
      // so there're no need to call it manually
    }

    if (!legacy) {
      // Hack to make streams2 work properly
      socket.on('readable', onReadable);
    }

    // Streams2: peek at one byte, push it back and decide
    function onReadable() {
      var data = socket.read(1);

      // Ignore empty packets
      if (!data) return;
      socket.removeListener('readable', onReadable);

      // `.unshift()` emits `readable` event. Thus `emit` method should
      // be restored before calling it.
      socket.emit = _emit;

      // Put packet back where it was before
      socket.unshift(data);

      if (data[0] === 0x80) {
        self._onConnection(socket);
      } else {
        handler.call(self, socket);
      }

      // Fire events
      restore();

      // Hand whatever is buffered to a streams1-style `ondata` hook, if the
      // chosen handler installed one
      if (socket.ondata) {
        data = socket.read(socket._readableState.length);
        if (data) socket.ondata(data, 0, data.length);
      }
    }
  });
};
|
219 |
|
//
// ### function _onConnection (socket)
// #### @socket {Stream} incoming socket
// Server's connection handler wrapper: hands non-SPDY sockets to the original
// listener, otherwise wraps the socket in a `Connection` and re-emits its
// 'connect' and 'request' events on the server.
//
proto._onConnection = function _onConnection(socket) {
  var self = this,
      state = this._spdyState;

  // Fallback to HTTPS if needed. `debug` and `plain` modes skip the check
  // and always treat the socket as SPDY.
  var selectedProtocol = socket.npnProtocol || socket.alpnProtocol;
  if ((!selectedProtocol || !selectedProtocol.match(/spdy/)) &&
      !state.options.debug && !state.options.plain) {
    return state.handler.call(this, socket);
  }

  // Wrap incoming socket into abstract class
  var connection = new Connection(socket, state.pool, state.options);

  // Emulate each stream like connection
  connection.on('stream', state.handler);

  connection.on('connect', function onconnect(req, socket) {
    socket.streamID = req.streamID = req.socket.id;
    socket.isSpdy = req.isSpdy = true;
    socket.spdyVersion = req.spdyVersion = req.socket.version;

    // End the request's connection once the upgraded socket has finished
    socket.once('finish', function onfinish() {
      req.connection.end();
    });

    self.emit('connect', req, socket);
  });

  connection.on('request', function onrequest(req, res) {
    // Swap in the SPDY-aware response methods
    res._renderHeaders = spdy.response._renderHeaders;
    res.writeHead = spdy.response.writeHead;
    res.push = spdy.response.push;
    res.streamID = req.streamID = req.socket.id;
    res.spdyVersion = req.spdyVersion = req.socket.version;
    res.isSpdy = req.isSpdy = true;

    // Chunked encoding is not supported in SPDY
    res.useChunkedEncodingByDefault = false;

    // End the request's connection once the response has finished
    res.once('finish', function onfinish() {
      req.connection.end();
    });

    self.emit('request', req, res);
  });

  connection.on('error', function onerror(e) {
    console.log('[secureConnection] error ' + e);

    // A broken pipe is not passed on to destroy(). The symbolic name is in
    // `e.code`; `e.errno` is checked too because it carried the same string
    // on older runtimes (it is numeric on current ones).
    var brokenPipe = e.code === 'EPIPE' || e.errno === 'EPIPE';
    socket.destroy(brokenPipe ? undefined : e);
  });
};
|
277 |
|
278 // Export Server instantiated from https.Server |
|
279 var Server = instantiate(https.Server); |
|
280 exports.Server = Server; |
|
281 |
|
282 // |
|
283 // ### function create (base, options, requestListener) |
|
284 // #### @base {Function} (optional) base server class (https.Server) |
|
285 // #### @options {Object} tls server options |
|
286 // #### @requestListener {Function} (optional) request callback |
|
287 // @constructor wrapper |
|
288 // |
|
289 exports.create = function create(base, options, requestListener) { |
|
290 var server; |
|
291 if (typeof base === 'function') { |
|
292 server = instantiate(base); |
|
293 } else { |
|
294 server = Server; |
|
295 |
|
296 requestListener = options; |
|
297 options = base; |
|
298 base = null; |
|
299 } |
|
300 |
|
301 return new server(options, requestListener); |
|
302 }; |
|
303 |
|
304 // |
|
305 // ### function Connection (socket, pool, options) |
|
306 // #### @socket {net.Socket} server's connection |
|
307 // #### @pool {spdy.ZlibPool} zlib pool |
|
308 // #### @options {Object} server's options |
|
309 // Abstract connection @constructor |
|
310 // |
|
311 function Connection(socket, pool, options) { |
|
312 process.EventEmitter.call(this); |
|
313 |
|
314 var self = this; |
|
315 |
|
316 this._closed = false; |
|
317 |
|
318 this.pool = pool; |
|
319 var pair = null; |
|
320 |
|
321 this._deflate = null; |
|
322 this._inflate = null; |
|
323 |
|
324 this.encrypted = socket.encrypted; |
|
325 |
|
326 // Init streams list |
|
327 this.streams = {}; |
|
328 this.streamsCount = 0; |
|
329 this.pushId = 0; |
|
330 this._goaway = false; |
|
331 |
|
332 this._framer = null; |
|
333 |
|
334 // Data transfer window defaults to 64kb |
|
335 this.windowSize = options.windowSize; |
|
336 this.sinkSize = options.sinkSize; |
|
337 |
|
338 // Initialize scheduler |
|
339 this.scheduler = spdy.scheduler.create(this); |
|
340 |
|
341 // Store socket and pipe it to parser |
|
342 this.socket = socket; |
|
343 |
|
344 // Initialize parser |
|
345 this.parser = spdy.parser.create(this); |
|
346 this.parser.on('frame', function (frame) { |
|
347 if (this._closed) return; |
|
348 |
|
349 var stream; |
|
350 |
|
351 // Create new stream |
|
352 if (frame.type === 'SYN_STREAM') { |
|
353 self.streamsCount++; |
|
354 |
|
355 stream = self.streams[frame.id] = new Stream(self, frame); |
|
356 |
|
357 // If we reached stream limit |
|
358 if (self.streamsCount > options.maxStreams) { |
|
359 stream.once('error', function onerror() {}); |
|
360 // REFUSED_STREAM |
|
361 stream._rstCode = 3; |
|
362 stream.destroy(true); |
|
363 } else { |
|
364 self.emit('stream', stream); |
|
365 |
|
366 stream._init(); |
|
367 } |
|
368 } else { |
|
369 if (frame.id) { |
|
370 // Load created one |
|
371 stream = self.streams[frame.id]; |
|
372 |
|
373 // Fail if not found |
|
374 if (stream === undefined) { |
|
375 if (frame.type === 'RST_STREAM') return; |
|
376 self.write(self._framer.rstFrame(frame.id, 2)); |
|
377 return; |
|
378 } |
|
379 } |
|
380 |
|
381 // Emit 'data' event |
|
382 if (frame.type === 'DATA') { |
|
383 if (frame.data.length > 0){ |
|
384 if (stream._closedBy.client) { |
|
385 stream._rstCode = 2; |
|
386 stream.emit('error', 'Writing to half-closed stream'); |
|
387 } else { |
|
388 stream._recv(frame.data); |
|
389 } |
|
390 } |
|
391 // Destroy stream if we was asked to do this |
|
392 } else if (frame.type === 'RST_STREAM') { |
|
393 stream._rstCode = 0; |
|
394 if (frame.status === 5) { |
|
395 // If client "cancels" connection - close stream and |
|
396 // all associated push streams without error |
|
397 stream.pushes.forEach(function(stream) { |
|
398 stream.close(); |
|
399 }); |
|
400 stream.close(); |
|
401 } else { |
|
402 // Emit error on destroy |
|
403 stream.destroy(new Error('Received rst: ' + frame.status)); |
|
404 } |
|
405 // Respond with same PING |
|
406 } else if (frame.type === 'PING') { |
|
407 self.write(self._framer.pingFrame(frame.pingId)); |
|
408 } else if (frame.type === 'SETTINGS') { |
|
409 self._setDefaultWindow(frame.settings); |
|
410 } else if (frame.type === 'GOAWAY') { |
|
411 self._goaway = frame.lastId; |
|
412 } else if (frame.type === 'WINDOW_UPDATE') { |
|
413 stream._drainSink(frame.delta); |
|
414 } else { |
|
415 console.error('Unknown type: ', frame.type); |
|
416 } |
|
417 } |
|
418 |
|
419 // Handle half-closed |
|
420 if (frame.fin) { |
|
421 // Don't allow to close stream twice |
|
422 if (stream._closedBy.client) { |
|
423 stream._rstCode = 2; |
|
424 stream.emit('error', 'Already half-closed'); |
|
425 } else { |
|
426 stream._closedBy.client = true; |
|
427 |
|
428 // Emulate last chunked fragment |
|
429 if (stream._forceChunked) { |
|
430 stream._recv(last_frag, true); |
|
431 } |
|
432 |
|
433 stream._handleClose(); |
|
434 } |
|
435 } |
|
436 }); |
|
437 |
|
438 this.parser.on('version', function onversion(version) { |
|
439 if (!pair) { |
|
440 pair = pool.get('spdy/' + version); |
|
441 self._deflate = pair.deflate; |
|
442 self._inflate = pair.inflate; |
|
443 } |
|
444 }); |
|
445 |
|
446 this.parser.on('framer', function onframer(framer) { |
|
447 // Generate custom settings frame and send |
|
448 self.write(framer.settingsFrame(options)); |
|
449 }); |
|
450 |
|
451 // Propagate parser errors |
|
452 this.parser.on('error', function onParserError(err) { |
|
453 self.emit('error', err); |
|
454 }); |
|
455 |
|
456 socket.pipe(this.parser); |
|
457 |
|
458 // 2 minutes socket timeout |
|
459 socket.setTimeout(2 * 60 * 1000); |
|
460 socket.once('timeout', function ontimeout() { |
|
461 socket.destroy(); |
|
462 }); |
|
463 |
|
464 // Allow high-level api to catch socket errors |
|
465 socket.on('error', function onSocketError(e) { |
|
466 self.emit('error', e); |
|
467 }); |
|
468 |
|
469 socket.once('close', function onclose() { |
|
470 self._closed = true; |
|
471 if (pair) pool.put(pair); |
|
472 }); |
|
473 |
|
474 if (legacy) { |
|
475 socket.on('drain', function ondrain() { |
|
476 self.emit('drain'); |
|
477 }); |
|
478 } |
|
479 } |
|
480 util.inherits(Connection, process.EventEmitter); |
|
481 exports.Connection = Connection; |
|
482 |
|
//
// ### function write (data, encoding)
// #### @data {String|Buffer} data
// #### @encoding {String} (optional) encoding
// Passes data on to the underlying socket and returns the socket's result.
// Does nothing (and returns undefined) once the socket is not writable.
//
Connection.prototype.write = function write(data, encoding) {
  var target = this.socket;
  if (!target.writable) return;

  return target.write(data, encoding);
};
|
494 |
|
//
// ### function _setDefaultWindow (settings)
// #### @settings {Object}
// Applies a new `initial_window_size` setting to the connection's sink size
// and to every active stream. Settings without that entry, or with the entry
// flagged as `persisted`, are ignored.
//
Connection.prototype._setDefaultWindow = function _setDefaultWindow(settings) {
  var entry = settings && settings.initial_window_size;
  if (!entry || entry.persisted) return;

  var size = entry.value;
  this.sinkSize = size;

  var ids = Object.keys(this.streams);
  for (var i = 0; i < ids.length; i++) {
    this.streams[ids[i]]._updateSinkSize(size);
  }
};
|
514 |
|
515 // |
|
516 // ### function Stream (connection, frame) |
|
517 // #### @connection {Connection} SPDY Connection |
|
518 // #### @frame {Object} SYN_STREAM data |
|
519 // Abstract stream @constructor |
|
520 // |
|
521 function Stream(connection, frame) { |
|
522 DuplexStream.call(this); |
|
523 |
|
524 this.connection = connection; |
|
525 this.socket = connection.socket; |
|
526 this.encrypted = connection.encrypted; |
|
527 this._framer = connection._framer; |
|
528 this._initialized = false; |
|
529 |
|
530 // Should chunked encoding be forced |
|
531 this._forceChunked = false; |
|
532 |
|
533 this.ondata = this.onend = null; |
|
534 |
|
535 // RST_STREAM code if any |
|
536 this._rstCode = 1; |
|
537 this._destroyed = false; |
|
538 |
|
539 this._closedBy = { |
|
540 client: false, |
|
541 server: false |
|
542 }; |
|
543 |
|
544 // Lock data |
|
545 this._locked = false; |
|
546 this._lockBuffer = []; |
|
547 |
|
548 // Store id |
|
549 this.id = frame.id; |
|
550 this.version = frame.version; |
|
551 |
|
552 // Store priority |
|
553 this.priority = frame.priority; |
|
554 |
|
555 // Array of push streams associated to that one |
|
556 this.pushes = []; |
|
557 |
|
558 // How much data can be sent TO client before next WINDOW_UPDATE |
|
559 this._sinkSize = connection.sinkSize; |
|
560 this._initialSinkSize = connection.sinkSize; |
|
561 |
|
562 // When data needs to be send, but window is too small for it - it'll be |
|
563 // queued in this buffer |
|
564 this._sinkBuffer = []; |
|
565 |
|
566 // How much data can be sent BY client before next WINDOW_UPDATE |
|
567 this._initialWindowSize = connection.windowSize; |
|
568 this._windowSize = connection.windowSize; |
|
569 |
|
570 // Create compression streams |
|
571 this._deflate = connection._deflate; |
|
572 this._inflate = connection._inflate; |
|
573 |
|
574 // Store headers |
|
575 this.headers = frame.headers; |
|
576 this.url = frame.url; |
|
577 |
|
578 this._frame = frame; |
|
579 |
|
580 if (legacy) { |
|
581 this.readable = this.writable = true; |
|
582 } |
|
583 |
|
584 // Call .onend() |
|
585 this.once('end', function() { |
|
586 var self = this; |
|
587 process.nextTick(function() { |
|
588 if (self.onend) self.onend(); |
|
589 }); |
|
590 }); |
|
591 |
|
592 // Handle half-close |
|
593 this.once('finish', function() { |
|
594 this._writeData(true, []); |
|
595 this._closedBy.server = true; |
|
596 if (this._sinkBuffer.length !== 0) return; |
|
597 this._handleClose(); |
|
598 }); |
|
599 }; |
|
600 util.inherits(Stream, DuplexStream); |
|
601 exports.Stream = Stream; |
|
602 |
|
603 if (legacy) { |
|
604 Stream.prototype.pause = function pause() {}; |
|
605 Stream.prototype.resume = function resume() {}; |
|
606 } |
|
607 |
|
//
// ### function _isGoaway ()
// Tells whether writes to this stream should be ignored: truthy only when
// the connection holds a truthy GOAWAY stream id that is lower than this
// stream's id. A falsy `_goaway` value is returned unchanged.
//
Stream.prototype._isGoaway = function _isGoaway() {
  var lastId = this.connection._goaway;
  if (!lastId) return lastId;

  return this.id > lastId;
};
|
615 |
|
//
// ### function init ()
// Initialize stream, internal. Renders the SYN_STREAM headers as an HTTP/1.x
// request head and feeds it to `_recv`.
//
Stream.prototype._init = function init() {
  var headers = this.headers;

  // These travel in the request line (or not at all), never as header lines
  var reserved = ['method', 'url', 'version', 'scheme'];

  var head = [headers.method + ' ' + this.url + ' ' + headers.version];
  var names = Object.keys(headers);
  for (var i = 0; i < names.length; i++) {
    if (reserved.indexOf(names[i]) !== -1) continue;
    head.push(names[i] + ': ' + headers[names[i]]);
  }

  // Without a declared body length the body is delivered as chunked
  if (!headers['content-length'] && !headers['transfer-encoding']) {
    head.push('Transfer-Encoding: chunked');
    this._forceChunked = true;
  }

  // Two empty entries make the joined text end with '\r\n\r\n'
  head.push('', '');

  this._recv(new Buffer(head.join('\r\n')), true);
  this._initialized = true;
};
|
645 |
|
//
// ### function lock (callback)
// #### @callback {Function} continuation callback
// Acquire lock: runs `callback` right away (with the stream as `this` and
// `null` as argument) when the lock is free, queues it otherwise. A missing
// callback is ignored.
//
Stream.prototype._lock = function lock(callback) {
  if (!callback) return;

  if (!this._locked) {
    this._locked = true;
    callback.call(this, null);
    return;
  }

  this._lockBuffer.push(callback);
};
|
661 |
|
//
// ### function unlock ()
// Release lock and hand it to the oldest queued callback, if there is one.
// Each call releases at most one waiter; nothing happens when the lock is
// not held.
//
Stream.prototype._unlock = function unlock() {
  if (!this._locked) return;

  this._locked = false;
  var next = this._lockBuffer.shift();
  this._lock(next);
};
|
672 |
|
//
// ### function setTimeout (time)
// #### @time {Number} ignored
// Placeholder that accepts the call and does nothing.
// TODO: use timers.enroll, timers.active, timers.unenroll
//
Stream.prototype.setTimeout = function setTimeout(time) {
  // Intentionally empty
};
|
678 |
|
//
// ### function _handleClose ()
// Closes the stream once both the client and the server side have
// half-closed it; does nothing before that.
//
Stream.prototype._handleClose = function _handleClose() {
  var sides = this._closedBy;
  if (!sides.client || !sides.server) return;

  this.close();
};
|
688 |
|
//
// ### function close ()
// Destroys the stream without an error (shorthand for `destroy()` with no
// arguments).
//
Stream.prototype.close = function close() {
  this.destroy();
};
|
696 |
|
//
// ### function destroy (error)
// #### @error {Error} (optional) error
// Destroys stream: removes it from the connection, notifies the client with
// RST_STREAM when the stream did not finish normally, ends the readable side
// and emits 'error' (if any) followed by 'close' on the next tick.
// Calling it again is a no-op.
//
Stream.prototype.destroy = function destroy(error) {
  if (this._destroyed) return;
  this._destroyed = true;

  delete this.connection.streams[this.id];
  // Only odd ids take part in the connection's stream counter
  if (this.id % 2 === 1) {
    this.connection.streamsCount--;
  }

  // If stream is not finished, RST frame should be sent to notify client
  // about sudden stream termination.
  if (error || !this._closedBy.server) {
    // REFUSED_STREAM if terminated before 'finish' event. A code of 0 is
    // left alone: it is set when the peer itself reset the stream, and a
    // reset must not be answered with another reset.
    if (!this._closedBy.server && this._rstCode !== 0) this._rstCode = 3;

    if (this._rstCode) {
      this._lock(function() {
        this.connection.scheduler.schedule(
          this,
          this._framer.rstFrame(this.id, this._rstCode));
        this.connection.scheduler.tick();

        this._unlock();
      });
    }
  }

  // Signal end of incoming data
  if (legacy) {
    this.emit('end');
  } else {
    this.push(null);
  }

  if (error) this.emit('error', error);

  var self = this;
  process.nextTick(function() {
    self.emit('close', !!error);
  });
};
|
742 |
|
//
// ### function destroySoon (error)
// #### @error {Error} (optional) error
// Same as `destroy(error)`; there is no deferred variant.
//
Stream.prototype.destroySoon = function destroySoon(error) {
  var outcome = this.destroy(error);
  return outcome;
};
|
746 |
|
//
// ### function _drainSink (size)
// #### @size {Number} amount added to the send window
// Grows the send window and retries every queued write. Writes that still do
// not fit end up in the fresh `_sinkBuffer` again.
//
Stream.prototype._drainSink = function _drainSink(size) {
  // Swap the queue out first: `_writeData` may queue entries again
  var pending = this._sinkBuffer;
  this._sinkBuffer = [];

  this._sinkSize += size;

  pending.forEach(function(entry) {
    this._writeData(entry[0], entry[1], entry[2]);
  }, this);

  // Handle half-close: everything was flushed after the server finished
  if (this._sinkBuffer.length === 0 && this._closedBy.server) {
    this._handleClose();
  }

  if (legacy) this.emit('drain');
};
|
764 |
|
//
// ### function _writeData (fin, buffer, cb)
// #### @fin {Boolean}
// #### @buffer {Buffer}
// #### @cb {Function} **optional**
// Internal function. Schedules a DATA frame for `buffer`. With protocol
// version 3 the send window is honoured: whatever does not fit is queued in
// `_sinkBuffer`. Returns false when nothing could be sent, true otherwise.
//
Stream.prototype._writeData = function _writeData(fin, buffer, cb) {
  if (this._framer.version === 3) {
    // Window was exhausted, queue data
    if (this._sinkSize <= 0) {
      this._sinkBuffer.push([fin, buffer, cb]);
      return false;
    }

    var allowed = Math.min(this._sinkSize, buffer.length);
    this._sinkSize -= allowed;

    // Only partial write is possible, queue rest for later. The queued
    // remainder keeps the FIN flag; the part sent now must not carry it.
    if (allowed < buffer.length) {
      var remainder = buffer.slice(allowed);
      this._sinkBuffer.push([fin, remainder]);
      buffer = buffer.slice(0, allowed);
      fin = false;
    }
  }

  this._lock(function() {
    var scheduler = this.connection.scheduler;

    scheduler.schedule(this, this._framer.dataFrame(this.id, fin, buffer));
    scheduler.tick();

    this._unlock();

    if (cb) cb();
  });

  return true;
};
|
805 |
|
//
// ### function write (data, encoding, cb)
// #### @data {Buffer|String} data
// #### @encoding {String} data encoding
// #### @cb {Function} (optional) invoked once the data was handled
// Writes data to connection. Once the stream is cut off by GOAWAY the data
// is dropped: `cb` still runs and false is returned.
//
Stream.prototype._write = function write(data, encoding, cb) {
  if (!this._isGoaway()) {
    return this._writeData(false, data, cb);
  }

  // Do not send data to new connections after GOAWAY
  if (cb) cb();
  return false;
};
|
821 |
|
// Legacy streams have no base-class `write`/`end`, so both are supplied here
if (legacy) {
  //
  // ### function write (data, encoding, cb)
  // #### @data {Buffer|String} data
  // #### @encoding {String} (optional) string encoding
  // #### @cb {Function} (optional) callback
  // Turns non-Buffer input into a Buffer and passes it on to `_write`
  //
  Stream.prototype.write = function write(data, encoding, cb) {
    if (Buffer.isBuffer(data)) {
      return this._write(data, encoding, cb);
    }

    return this._write(new Buffer(data, encoding), null, cb);
  };

  //
  // ### function end (data, encoding)
  // #### @data {Buffer|String} (optional) data to write before ending stream
  // #### @encoding {String} (optional) string encoding
  // Writes the optional last piece of data and emits 'finish'. Nothing
  // happens once the stream is cut off by GOAWAY.
  //
  Stream.prototype.end = function end(data, encoding) {
    // Do not send data to new connections after GOAWAY
    if (this._isGoaway()) return;

    if (data) this.write(data, encoding);
    this.emit('finish');
  };
}
|
845 |
|
//
// ### function _recv (data, chunked)
// #### @data {Buffer} buffer to receive
// #### @chunked {Boolean} true when `data` is already framed and must be
// delivered as is (no window accounting, no chunk wrapping)
// (internal) Delivers incoming data to the consumer of the stream.
//
Stream.prototype._recv = function _recv(data, chunked) {
  // Receive-window accounting (protocol version 3 and up, body data only)
  if (!chunked && this._framer.version >= 3 && this._initialized) {
    this._windowSize -= data.length;

    // Update window if exhausted: grant whatever brings it back to its
    // initial size
    if (this._windowSize <= 0) {
      var refill = this._initialWindowSize - this._windowSize;
      this._windowSize += refill;
      this.connection.write(this._framer.windowUpdateFrame(this.id, refill));
    }
  }

  // Emulate chunked encoding: size in hex, CRLF, payload, CRLF
  if (this._forceChunked && !chunked) {
    // Zero-chunks are treated as end, do not emit them
    if (data.length === 0) return;

    var pieces = [new Buffer(data.length.toString(16)), crlf, data, crlf];
    for (var i = 0; i < pieces.length; i++) {
      this._recv(pieces[i], true);
    }
    return;
  }

  if (!legacy) {
    // Right now, http module expects socket to be working in streams1 mode.
    if (this.ondata) {
      this.ondata(data, 0, data.length);
    } else {
      this.push(data);
    }
    return;
  }

  // Legacy streams: deliver on the next tick, to listeners and hook alike
  var stream = this;
  process.nextTick(function() {
    stream.emit('data', data);
    if (stream.ondata) {
      stream.ondata(data, 0, data.length);
    }
  });
};
|
893 |
|
//
// ### function _read (bytes)
// #### @bytes {Number} number of bytes to read
// Streams2 API. Intentionally empty: incoming data is pushed from `_recv`
// whenever it arrives, so there is nothing to fetch on demand.
//
Stream.prototype._read = function read(bytes) {
  // Nothing to do
};
|
902 |
|
//
// ### function _updateSinkSize (size)
// #### @size {Integer} new initial size of the send window
// Update the internal data transfer window: records `size` as the new
// initial value and applies the difference to the current window through
// `_drainSink`.
//
Stream.prototype._updateSinkSize = function _updateSinkSize(size) {
  var previous = this._initialSinkSize;

  this._initialSinkSize = size;
  this._drainSink(size - previous);
};
|
914 |
|
//
// `net` compatibility layer
// (Copy pasted from lib/tls.js from node.js)
//
// Everything below is answered by the underlying socket; a stream without a
// socket yields the falsy `socket` value itself.
//
Stream.prototype.address = function address() {
  return this.socket && this.socket.address();
};

// The accessors are defined with `Object.defineProperty` rather than the
// deprecated `__defineGetter__`. `enumerable` and `configurable` are enabled
// explicitly because that is what `__defineGetter__` produced.
Object.defineProperty(Stream.prototype, 'remoteAddress', {
  enumerable: true,
  configurable: true,
  get: function remoteAddress() {
    return this.socket && this.socket.remoteAddress;
  }
});

Object.defineProperty(Stream.prototype, 'remotePort', {
  enumerable: true,
  configurable: true,
  get: function remotePort() {
    return this.socket && this.socket.remotePort;
  }
});