Thu, 22 Jan 2015 13:21:57 +0100
Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6
var spdy = require('../spdy'),
    util = require('util'),
    https = require('https'),
    stream = require('stream'),
    events = require('events'),
    Buffer = require('buffer').Buffer;
// Byte sequences used when chunked transfer encoding is emulated for the
// HTTP parser: `crlf` separates chunk size / chunk data, `last_frag` is the
// terminating zero-length chunk.
// NOTE: `new Buffer(string)` is kept on purpose - this file still supports
// node versions that predate `Buffer.from`.
var crlf = new Buffer('\r\n');
var last_frag = new Buffer('0\r\n\r\n');

// `true` on node versions without streams2 (no `stream.Duplex`)
var legacy = !stream.Duplex;

// Base class for `Stream`: the plain `stream` constructor in legacy mode,
// `stream.Duplex` otherwise. Declared once instead of once per branch.
var DuplexStream = legacy ? stream : stream.Duplex;
//
// ### function instantiate (HTTPSServer)
// #### @HTTPSServer {https.Server|Function} Base server class
// Will return constructor for SPDY Server, based on the HTTPSServer class.
// Every call creates a fresh `Server` constructor that inherits from
// `HTTPSServer` and receives a copy of all methods of the shared `proto`.
//
function instantiate(HTTPSServer) {
  //
  // ### function Server (options, requestListener)
  // #### @options {Object} tls server options
  // #### @requestListener {Function} (optional) request callback
  // SPDY Server @constructor
  //
  function Server(options, requestListener) {
    // Tolerate being called without `new`: otherwise `this` is not a Server
    // and the `_init` call below fails with an unrelated-looking TypeError.
    if (!(this instanceof Server)) {
      return new Server(options, requestListener);
    }

    // Initialize
    this._init(HTTPSServer, options, requestListener);

    // Wrap connection handler
    this._wrap();
  }
  util.inherits(Server, HTTPSServer);

  // Copy prototype methods
  Object.keys(proto).forEach(function(key) {
    Server.prototype[key] = proto[key];
  });

  return Server;
}
46 exports.instantiate = instantiate;
// Common prototype for all servers
var proto = {};

//
// ### function _init(base, options, handler)
// #### @base {Function} base server class (https.Server)
// #### @options {Object} (optional) tls server options
// #### @handler {Function} (optional) request handler
// Initializer. Fills in defaults on `options` (the caller's object is
// modified in place), stores the server state on `this._spdyState` and
// invokes the base server constructor.
//
proto._init = function _init(base, options, handler) {
  var state = {};
  this._spdyState = state;

  if (!options) options = {};
  if (!options.maxStreams) options.maxStreams = 100;
  if (!options.sinkSize) options.sinkSize = 1 << 16; // 64kb
  if (!options.windowSize) options.windowSize = 1 << 20; // 1mb

  // Protocols offered during NPN/ALPN negotiation - always overwritten
  options.NPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0'];
  options.ALPNProtocols = ['spdy/3', 'spdy/2', 'http/1.1', 'http/1.0'];
  state.options = options;
  state.reqHandler = handler;

  // A plain (non-ssl) base server takes only the request handler
  if (options.plain && !options.ssl) {
    base.call(this, handler);
  } else {
    base.call(this, options, handler);
  }

  // NOTE: a trailing `if (no NPN/ALPN support) return;` used to live here.
  // It was the last statement of the function and therefore had no effect;
  // the fallback to the wrapped handler is decided per connection in
  // `_onConnection`.
};
//
// ### function _wrap()
// Wrap connection handler and add logic.
// Takes the first listener the base server registered for the connection
// event, stores it as `state.handler` and replaces it with SPDY-aware
// dispatching. In `plain` mode the first received byte decides whether the
// socket speaks SPDY (0x80) or should go to the original handler.
//
proto._wrap = function _wrap() {
  var self = this,
      state = this._spdyState;

  // Wrap connection handler
  var event = state.options.plain && !state.options.ssl ? 'connection' :
                                                         'secureConnection',
      handler = this.listeners(event)[0];

  state.pool = spdy.zlibpool.create();
  state.handler = handler;

  this.removeAllListeners(event);

  // Normal mode, use NPN to fallback to HTTPS
  if (!state.options.plain) {
    return this.on(event, this._onConnection.bind(this));
  }

  // In case of plain connection, we must fallback to HTTPS if first byte
  // is not equal to 0x80.
  this.on(event, function(socket) {
    var history = [],
        _emit = socket.emit;

    // Placeholder 'data' listener. Declared at handler scope (not inside the
    // `if (legacy)` block) so that `restore()` can always reference it, also
    // under strict-mode block scoping.
    function ondata() {}

    // Add 'data' listener, otherwise 'data' events won't be emitted
    if (legacy) {
      socket.once('data', ondata);
    }

    // 2 minutes timeout, as http.js does by default
    socket.setTimeout(self.timeout || 2 * 60 * 1000);

    // Intercept every event until the protocol is known; intercepted events
    // are recorded in `history` and replayed by `restore()`.
    socket.emit = function emit(event, data) {
      history.push(Array.prototype.slice.call(arguments));

      if (event === 'data') {
        // Legacy
        onFirstByte.call(socket, data);
      } else if (event === 'readable') {
        // Streams
        onReadable.call(socket);
      } else if (event === 'end' ||
                 event === 'close' ||
                 event === 'error' ||
                 event === 'timeout') {
        // We shouldn't get there if any data was received
        fail();
      }
    };

    // Socket went away before the first byte: undo the hook and destroy it
    function fail() {
      socket.emit = _emit;
      history = null;
      try {
        socket.destroy();
      } catch (e) {
        // Best effort - the socket is being discarded anyway
      }
    }

    // Put the original `emit` back and replay everything recorded so far
    function restore() {
      var copy = history.slice();
      history = null;

      if (legacy) socket.removeListener('data', ondata);
      socket.emit = _emit;
      for (var i = 0; i < copy.length; i++) {
        socket.emit.apply(socket, copy[i]);
        if (copy[i][0] === 'end') {
          if (socket.onend) socket.onend();
        }
      }
    }

    function onFirstByte(data) {
      // Ignore empty packets
      if (data.length === 0) return;

      if (data[0] === 0x80) {
        self._onConnection(socket);
      } else {
        handler.call(self, socket);
      }

      // Fire events
      restore();

      // NOTE: If we came there - .ondata() will be called anyway in this tick,
      // so there're no need to call it manually
    }

    if (!legacy) {
      // Hack to make streams2 work properly
      socket.on('readable', onReadable);
    }

    function onReadable() {
      var data = socket.read(1);

      // Ignore empty packets
      if (!data) return;
      socket.removeListener('readable', onReadable);

      // `.unshift()` emits `readable` event. Thus `emit` method should
      // be restored before calling it.
      socket.emit = _emit;

      // Put packet back where it was before
      socket.unshift(data);

      if (data[0] === 0x80) {
        self._onConnection(socket);
      } else {
        handler.call(self, socket);
      }

      // Fire events
      restore();

      // Hand everything buffered so far to a streams1-style consumer
      if (socket.ondata) {
        data = socket.read(socket._readableState.length);
        if (data) socket.ondata(data, 0, data.length);
      }
    }
  });
};
//
// ### function _onConnection (socket)
// #### @socket {Stream} incoming socket
// Per-connection entry point: either hands the socket to the wrapped
// (non-SPDY) handler or wraps it into a SPDY `Connection` and re-emits its
// events on the server.
//
proto._onConnection = function _onConnection(socket) {
  var server = this,
      spdyState = server._spdyState,
      opts = spdyState.options;

  // Protocol picked during the TLS handshake (NPN first, then ALPN)
  var negotiated = socket.npnProtocol || socket.alpnProtocol;
  var speaksSpdy = Boolean(negotiated && negotiated.match(/spdy/));

  // Not SPDY and neither `debug` nor `plain` forces it: use wrapped handler
  if (!speaksSpdy && !opts.debug && !opts.plain) {
    return spdyState.handler.call(server, socket);
  }

  // Abstract SPDY connection on top of the raw socket
  var connection = new Connection(socket, spdyState.pool, opts);

  // Every SPDY stream is presented to the wrapped handler as a connection
  connection.on('stream', spdyState.handler);

  connection.on('connect', function onconnect(req, tunnel) {
    var id = req.socket.id,
        version = req.socket.version;

    req.streamID = id;
    tunnel.streamID = id;
    req.isSpdy = true;
    tunnel.isSpdy = true;
    req.spdyVersion = version;
    tunnel.spdyVersion = version;

    tunnel.once('finish', function onfinish() {
      req.connection.end();
    });

    server.emit('connect', req, tunnel);
  });

  connection.on('request', function onrequest(req, res) {
    var id = req.socket.id,
        version = req.socket.version;

    // Swap in SPDY-aware response methods
    res._renderHeaders = spdy.response._renderHeaders;
    res.writeHead = spdy.response.writeHead;
    res.push = spdy.response.push;

    req.streamID = id;
    res.streamID = id;
    req.spdyVersion = version;
    res.spdyVersion = version;
    req.isSpdy = true;
    res.isSpdy = true;

    // Chunked encoding is not supported in SPDY
    res.useChunkedEncodingByDefault = false;

    res.once('finish', function onfinish() {
      req.connection.end();
    });

    server.emit('request', req, res);
  });

  connection.on('error', function onerror(e) {
    console.log('[secureConnection] error ' + e);

    // EPIPE is not passed on to `destroy`
    if (e.errno === 'EPIPE') {
      socket.destroy(undefined);
    } else {
      socket.destroy(e);
    }
  });
};
278 // Export Server instantiated from https.Server
279 var Server = instantiate(https.Server);
280 exports.Server = Server;
282 //
283 // ### function create (base, options, requestListener)
284 // #### @base {Function} (optional) base server class (https.Server)
285 // #### @options {Object} tls server options
286 // #### @requestListener {Function} (optional) request callback
287 // @constructor wrapper
288 //
289 exports.create = function create(base, options, requestListener) {
290 var server;
291 if (typeof base === 'function') {
292 server = instantiate(base);
293 } else {
294 server = Server;
296 requestListener = options;
297 options = base;
298 base = null;
299 }
301 return new server(options, requestListener);
302 };
//
// ### function Connection (socket, pool, options)
// #### @socket {net.Socket} server's connection
// #### @pool {spdy.ZlibPool} zlib pool
// #### @options {Object} server's options
// Abstract connection @constructor
//
// Pipes `socket` into a SPDY parser and dispatches every parsed frame:
// SYN_STREAM creates a `Stream` (emitted as 'stream'), other frames are
// routed to the stream they belong to or handled on connection level.
// Emits 'error' for parser and socket errors.
//
function Connection(socket, pool, options) {
  // `events.EventEmitter` instead of the deprecated `process.EventEmitter`
  // alias, which newer node releases no longer provide.
  events.EventEmitter.call(this);

  var self = this;

  this._closed = false;

  this.pool = pool;
  // Compressor pair taken from `pool` once the protocol version is known
  var pair = null;

  this._deflate = null;
  this._inflate = null;

  this.encrypted = socket.encrypted;

  // Init streams list
  this.streams = {};
  this.streamsCount = 0;
  this.pushId = 0;
  this._goaway = false;

  this._framer = null;

  // Flow-control window sizes, as configured in the server options
  this.windowSize = options.windowSize;
  this.sinkSize = options.sinkSize;

  // Initialize scheduler
  this.scheduler = spdy.scheduler.create(this);

  // Store socket and pipe it to parser
  this.socket = socket;

  // Initialize parser
  this.parser = spdy.parser.create(this);
  this.parser.on('frame', function (frame) {
    // `this` is the parser inside this callback - the closed flag lives on
    // the connection, so it has to be read through `self`.
    if (self._closed) return;

    var stream;

    // Create new stream
    if (frame.type === 'SYN_STREAM') {
      self.streamsCount++;

      stream = self.streams[frame.id] = new Stream(self, frame);

      // If we reached stream limit
      if (self.streamsCount > options.maxStreams) {
        stream.once('error', function onerror() {});
        // REFUSED_STREAM
        stream._rstCode = 3;
        stream.destroy(true);
      } else {
        self.emit('stream', stream);

        stream._init();
      }
    } else {
      if (frame.id) {
        // Load created one
        stream = self.streams[frame.id];

        // Fail if not found
        if (stream === undefined) {
          if (frame.type === 'RST_STREAM') return;
          self.write(self._framer.rstFrame(frame.id, 2));
          return;
        }
      }

      // Frames that operate on a stream cannot be processed without one
      // (missing or zero id). Drop them instead of dereferencing `undefined`.
      if (stream === undefined &&
          (frame.type === 'DATA' ||
           frame.type === 'RST_STREAM' ||
           frame.type === 'WINDOW_UPDATE')) {
        return;
      }

      // Emit 'data' event
      if (frame.type === 'DATA') {
        if (frame.data.length > 0) {
          if (stream._closedBy.client) {
            stream._rstCode = 2;
            stream.emit('error', 'Writing to half-closed stream');
          } else {
            stream._recv(frame.data);
          }
        }
      // Destroy stream if we was asked to do this
      } else if (frame.type === 'RST_STREAM') {
        stream._rstCode = 0;
        if (frame.status === 5) {
          // If client "cancels" connection - close stream and
          // all associated push streams without error
          stream.pushes.forEach(function(stream) {
            stream.close();
          });
          stream.close();
        } else {
          // Emit error on destroy
          stream.destroy(new Error('Received rst: ' + frame.status));
        }
      // Respond with same PING
      } else if (frame.type === 'PING') {
        self.write(self._framer.pingFrame(frame.pingId));
      } else if (frame.type === 'SETTINGS') {
        self._setDefaultWindow(frame.settings);
      } else if (frame.type === 'GOAWAY') {
        self._goaway = frame.lastId;
      } else if (frame.type === 'WINDOW_UPDATE') {
        stream._drainSink(frame.delta);
      } else {
        console.error('Unknown type: ', frame.type);
      }
    }

    // Handle half-closed (only meaningful for frames bound to a stream)
    if (frame.fin && stream !== undefined) {
      // Don't allow to close stream twice
      if (stream._closedBy.client) {
        stream._rstCode = 2;
        stream.emit('error', 'Already half-closed');
      } else {
        stream._closedBy.client = true;

        // Emulate last chunked fragment
        if (stream._forceChunked) {
          stream._recv(last_frag, true);
        }

        stream._handleClose();
      }
    }
  });

  // Pick the compressor pair matching the negotiated protocol version
  this.parser.on('version', function onversion(version) {
    if (!pair) {
      pair = pool.get('spdy/' + version);
      self._deflate = pair.deflate;
      self._inflate = pair.inflate;
    }
  });

  this.parser.on('framer', function onframer(framer) {
    // Generate custom settings frame and send
    self.write(framer.settingsFrame(options));
  });

  // Propagate parser errors
  this.parser.on('error', function onParserError(err) {
    self.emit('error', err);
  });

  socket.pipe(this.parser);

  // 2 minutes socket timeout
  socket.setTimeout(2 * 60 * 1000);
  socket.once('timeout', function ontimeout() {
    socket.destroy();
  });

  // Allow high-level api to catch socket errors
  socket.on('error', function onSocketError(e) {
    self.emit('error', e);
  });

  // Stop processing frames and give the compressor pair back to the pool
  socket.once('close', function onclose() {
    self._closed = true;
    if (pair) pool.put(pair);
  });

  if (legacy) {
    socket.on('drain', function ondrain() {
      self.emit('drain');
    });
  }
}
util.inherits(Connection, events.EventEmitter);
481 exports.Connection = Connection;
//
// ### function write (data, encoding)
// #### @data {String|Buffer} data
// #### @encoding {String} (optional) encoding
// Forwards data to the underlying socket while it is still writable;
// otherwise nothing is written and `undefined` is returned.
//
Connection.prototype.write = function write(data, encoding) {
  var target = this.socket;

  if (!target.writable) return;

  return target.write(data, encoding);
};
//
// ### function _setDefaultWindow (settings)
// #### @settings {Object} settings received in a SETTINGS frame
// Applies a new initial window size to the connection and to every stream
// that is currently registered on it. Does nothing when the setting is
// absent or flagged as `persisted`.
//
Connection.prototype._setDefaultWindow = function _setDefaultWindow(settings) {
  var entry = settings && settings.initial_window_size;
  if (!entry || entry.persisted) return;

  var size = entry.value;
  this.sinkSize = size;

  var ids = Object.keys(this.streams);
  for (var i = 0; i < ids.length; i++) {
    this.streams[ids[i]]._updateSinkSize(size);
  }
};
//
// ### function Stream (connection, frame)
// #### @connection {Connection} SPDY Connection
// #### @frame {Object} SYN_STREAM data
// Abstract stream @constructor
// One instance per SPDY stream; copies identity, headers and flow-control
// settings from the connection and the SYN_STREAM frame.
//
function Stream(connection, frame) {
  DuplexStream.call(this);

  var stream = this;

  // Owning connection and the state shared with it
  this.connection = connection;
  this.socket = connection.socket;
  this.encrypted = connection.encrypted;
  this._framer = connection._framer;
  this._initialized = false;

  // Set by `_init` when chunked encoding has to be emulated
  this._forceChunked = false;

  // streams1-style callbacks
  this.onend = null;
  this.ondata = null;

  // Code for the RST_STREAM frame sent on destroy, if any
  this._rstCode = 1;
  this._destroyed = false;

  // Which sides have half-closed the stream so far
  this._closedBy = { client: false, server: false };

  // Write lock and the callbacks waiting for it
  this._locked = false;
  this._lockBuffer = [];

  // Identity taken from the SYN_STREAM frame
  this.id = frame.id;
  this.version = frame.version;
  this.priority = frame.priority;

  // Push streams associated with this one
  this.pushes = [];

  // Bytes that may still be sent TO the client before a WINDOW_UPDATE
  this._sinkSize = connection.sinkSize;
  this._initialSinkSize = connection.sinkSize;

  // Writes waiting for the send window to open up again
  this._sinkBuffer = [];

  // Bytes the client may still send before we issue a WINDOW_UPDATE
  this._initialWindowSize = connection.windowSize;
  this._windowSize = connection.windowSize;

  // Compression streams are shared with the connection
  this._deflate = connection._deflate;
  this._inflate = connection._inflate;

  // Request data
  this.headers = frame.headers;
  this.url = frame.url;

  this._frame = frame;

  if (legacy) {
    this.writable = true;
    this.readable = true;
  }

  // Invoke the `.onend()` callback on the tick after 'end'
  this.once('end', function() {
    process.nextTick(function() {
      if (stream.onend) stream.onend();
    });
  });

  // Server side is done: send FIN and close unless data is still queued
  this.once('finish', function() {
    stream._writeData(true, []);
    stream._closedBy.server = true;
    if (stream._sinkBuffer.length === 0) {
      stream._handleClose();
    }
  });
}
util.inherits(Stream, DuplexStream);
exports.Stream = Stream;

// Legacy streams get no-op pause/resume
if (legacy) {
  Stream.prototype.pause = function pause() {};
  Stream.prototype.resume = function resume() {};
}
//
// ### function _isGoaway ()
// Truthy when the connection recorded a GOAWAY id and this stream's id is
// greater than it, i.e. writes to this stream should be ignored.
//
Stream.prototype._isGoaway = function _isGoaway() {
  var lastGoodId = this.connection._goaway;

  return lastGoodId && this.id > lastGoodId;
};
//
// ### function init ()
// Initialize stream, internal.
// Renders the SYN_STREAM headers as an HTTP/1.x request head and feeds it
// to the consumer through `_recv`.
//
Stream.prototype._init = function init() {
  var hdrs = this.headers;

  // SPDY-only fields: they belong to the request line, not the header list
  var special = ['method', 'url', 'version', 'scheme'];

  // Request line
  var lines = [hdrs.method + ' ' + this.url + ' ' + hdrs.version];

  var names = Object.keys(hdrs);
  for (var i = 0; i < names.length; i++) {
    var name = names[i];
    if (special.indexOf(name) !== -1) continue;

    lines.push(name + ': ' + hdrs[name]);
  }

  // Neither a length nor an encoding given: emulate chunked encoding
  if (!hdrs['content-length'] && !hdrs['transfer-encoding']) {
    lines.push('Transfer-Encoding: chunked');
    this._forceChunked = true;
  }

  // Two empty entries yield the terminating '\r\n\r\n' after joining
  lines.push('', '');

  this._recv(new Buffer(lines.join('\r\n')), true);
  this._initialized = true;
};
//
// ### function lock (callback)
// #### @callback {Function} continuation callback
// Acquire lock: runs `callback` right away when the lock is free, queues it
// otherwise. A missing callback is ignored.
//
Stream.prototype._lock = function lock(callback) {
  if (!callback) return;

  if (!this._locked) {
    this._locked = true;
    callback.call(this, null);
    return;
  }

  this._lockBuffer.push(callback);
};
//
// ### function unlock ()
// Release the lock and pass it on to the next queued callback, if any
//
Stream.prototype._unlock = function unlock() {
  if (!this._locked) return;

  this._locked = false;

  // `_lock` ignores `undefined`, so an empty queue simply leaves it unlocked
  var next = this._lockBuffer.shift();
  this._lock(next);
};
//
// ### function setTimeout (time)
// #### @time {Number} ignored
// Placeholder, does nothing.
// TODO: use timers.enroll, timers.active, timers.unenroll
//
Stream.prototype.setTimeout = function setTimeout(time) {
  // Intentionally empty
};

//
// ### function _handleClose ()
// Close the stream once both the client and the server side have closed it
//
Stream.prototype._handleClose = function _handleClose() {
  var closedBy = this._closedBy;

  if (!closedBy.client || !closedBy.server) return;

  this.close();
};

//
// ### function close ()
// Destroys stream (without an error)
//
Stream.prototype.close = function close() {
  this.destroy();
};
//
// ### function destroy (error)
// #### @error {Error} (optional) error
// Destroys stream: unregisters it from the connection, notifies the client
// with RST_STREAM when the stream did not finish normally, ends the readable
// side, emits 'error' (when given) and finally 'close' on the next tick.
// Calling it more than once has no effect.
//
Stream.prototype.destroy = function destroy(error) {
  if (this._destroyed) return;
  this._destroyed = true;

  delete this.connection.streams[this.id];
  // Only odd ids are counted against the connection's stream limit
  if (this.id % 2 === 1) {
    this.connection.streamsCount--;
  }

  // If stream is not finished, RST frame should be sent to notify client
  // about sudden stream termination.
  if (error || !this._closedBy.server) {
    // REFUSED_STREAM if terminated before 'finish' event.
    // A code of 0 means "send nothing" - it is set when the client itself
    // reset the stream - and must survive: a RST_STREAM is never answered
    // with another RST_STREAM.
    if (!this._closedBy.server && this._rstCode !== 0) this._rstCode = 3;

    if (this._rstCode) {
      this._lock(function() {
        this.connection.scheduler.schedule(
          this,
          this._framer.rstFrame(this.id, this._rstCode));
        this.connection.scheduler.tick();

        this._unlock();
      });
    }
  }

  // End the readable side
  if (legacy) {
    this.emit('end');
  } else {
    this.push(null);
  }

  if (error) this.emit('error', error);

  var self = this;
  process.nextTick(function() {
    self.emit('close', !!error);
  });
};
//
// ### function destroySoon (error)
// #### @error {Error} (optional) error
// Same as `destroy`
//
Stream.prototype.destroySoon = function destroySoon(error) {
  return this.destroy(error);
};

//
// ### function _drainSink (size)
// #### @size {Number} amount to add to the send window
// Grows the send window and retries every write that was queued while the
// window was exhausted.
//
Stream.prototype._drainSink = function _drainSink(size) {
  // Take the queue over; writes that still do not fit re-queue themselves
  var pending = this._sinkBuffer;
  this._sinkBuffer = [];

  this._sinkSize += size;

  pending.forEach(function(entry) {
    this._writeData(entry[0], entry[1], entry[2]);
  }, this);

  // Handle half-close
  if (this._closedBy.server && this._sinkBuffer.length === 0) {
    this._handleClose();
  }

  if (legacy) this.emit('drain');
};
//
// ### function _writeData (fin, buffer, cb)
// #### @fin {Boolean} whether the frame should carry the FIN flag
// #### @buffer {Buffer} payload
// #### @cb {Function} **optional** called after the frame was scheduled
// Internal function. Schedules a DATA frame; with protocol version 3 the
// send window is honoured and whatever does not fit is queued.
// Returns `false` when the whole write was queued, `true` otherwise.
//
Stream.prototype._writeData = function _writeData(fin, buffer, cb) {
  if (this._framer.version === 3) {
    // Window was exhausted, queue data
    if (this._sinkSize <= 0) {
      this._sinkBuffer.push([fin, buffer, cb]);
      return false;
    }

    var sendable = Math.min(this._sinkSize, buffer.length);
    this._sinkSize -= sendable;

    // Only a part fits: queue the tail, send the head without FIN
    if (sendable < buffer.length) {
      this._sinkBuffer.push([fin, buffer.slice(sendable)]);
      buffer = buffer.slice(0, sendable);
      fin = false;
    }
  }

  this._lock(function() {
    var frame = this._framer.dataFrame(this.id, fin, buffer),
        scheduler = this.connection.scheduler;

    scheduler.schedule(this, frame);
    scheduler.tick();

    this._unlock();

    if (cb) cb();
  });

  return true;
};
//
// ### function write (data, encoding, cb)
// #### @data {Buffer|String} data
// #### @encoding {String} data encoding
// #### @cb {Function} (optional) completion callback
// Writes data to connection. After GOAWAY the data is dropped, `cb` is
// still called and `false` is returned.
//
Stream.prototype._write = function write(data, encoding, cb) {
  if (!this._isGoaway()) {
    return this._writeData(false, data, cb);
  }

  // Do not send data to new connections after GOAWAY
  if (cb) cb();
  return false;
};
if (legacy) {
  //
  // ### function write (data, encoding, cb)
  // #### @data {Buffer|String} data
  // #### @encoding {String} (optional) string encoding
  // #### @cb {Function} (optional) completion callback
  // Legacy write: strings are converted to buffers before `_write`
  //
  Stream.prototype.write = function write(data, encoding, cb) {
    if (Buffer.isBuffer(data)) {
      return this._write(data, encoding, cb);
    }

    return this._write(new Buffer(data, encoding), null, cb);
  };

  //
  // ### function end (data)
  // #### @data {Buffer|String} (optional) data to write before ending stream
  // #### @encoding {String} (optional) string encoding
  // Send FIN data frame (through the 'finish' event)
  //
  Stream.prototype.end = function end(data, encoding) {
    // Do not send data to new connections after GOAWAY
    if (this._isGoaway()) return;

    if (data) {
      this.write(data, encoding);
    }
    this.emit('finish');
  };
}
//
// ### function _recv (data)
// #### @data {Buffer} buffer to receive
// #### @chunked {Boolean} true for data that is already framed
// (internal)
// Delivers received bytes to the consumer, refreshing the receive window
// and wrapping the payload into chunked encoding when required.
//
Stream.prototype._recv = function _recv(data, chunked) {
  var stream = this;

  // Update window if exhausted
  if (!chunked && this._framer.version >= 3 && this._initialized) {
    this._windowSize -= data.length;

    if (this._windowSize <= 0) {
      var delta = this._initialWindowSize - this._windowSize;
      this._windowSize += delta;
      this.connection.write(this._framer.windowUpdateFrame(this.id, delta));
    }
  }

  // Emulate chunked encoding
  if (this._forceChunked && !chunked) {
    // Zero-chunks are treated as end, do not emit them
    if (data.length === 0) return;

    // <hex size> CRLF <data> CRLF
    var pieces = [new Buffer(data.length.toString(16)), crlf, data, crlf];
    pieces.forEach(function(piece) {
      stream._recv(piece, true);
    });
    return;
  }

  if (!legacy) {
    // Right now, http module expects socket to be working in streams1 mode.
    if (this.ondata) {
      this.ondata(data, 0, data.length);
    } else {
      this.push(data);
    }
    return;
  }

  // Legacy streams: deliver on the next tick
  process.nextTick(function() {
    stream.emit('data', data);
    if (stream.ondata) {
      stream.ondata(data, 0, data.length);
    }
  });
};
//
// ### function _read (bytes)
// #### @bytes {Number} number of bytes to read (unused)
// Streams2 API hook. Empty on purpose: data is handed over by `_recv`.
//
Stream.prototype._read = function read(bytes) {
  // Nothing to do
};
//
// ### function _updateSinkSize (size)
// #### @size {Integer} new initial send window
// Update the internal data transfer window: remembers the new initial size
// and adjusts the current window by the difference to the previous one.
//
Stream.prototype._updateSinkSize = function _updateSinkSize(size) {
  var previous = this._initialSinkSize;

  this._initialSinkSize = size;
  this._drainSink(size - previous);
};
//
// `net` compatibility layer
// (Copy pasted from lib/tls.js from node.js)
//
// All three members delegate to the underlying socket and yield the falsy
// `socket` value itself when no socket is attached.
//
Stream.prototype.address = function address() {
  return this.socket && this.socket.address();
};

// Accessors are defined with `Object.defineProperty` instead of the
// deprecated `__defineGetter__`; enumerable/configurable match what
// `__defineGetter__` produced.
Object.defineProperty(Stream.prototype, 'remoteAddress', {
  enumerable: true,
  configurable: true,
  get: function remoteAddress() {
    return this.socket && this.socket.remoteAddress;
  }
});

Object.defineProperty(Stream.prototype, 'remotePort', {
  enumerable: true,
  configurable: true,
  get: function remotePort() {
    return this.socket && this.socket.remotePort;
  }
});