var parser = exports;

var spdy = require('../spdy'),
    util = require('util'),
    stream = require('stream'),
    Buffer = require('buffer').Buffer;

// Runtimes without `stream.Duplex` only offer the old-style `stream` base
// class; in that case the parser wires up `write()`/`end()` itself (below).
var legacy = !stream.Duplex;

var DuplexStream = legacy ? stream : stream.Duplex;

// Number of bytes requested for every frame head. The head is handed to
// `spdy.protocol.generic.parseHeader()`, which yields the body length.
var FRAME_HEADER_SIZE = 8;

//
// ### function allocate (size)
// #### @size {Number} number of bytes
// Returns a Buffer of `size` bytes whose contents are NOT initialised.
// Uses `Buffer.allocUnsafe()` where the runtime provides it and falls back
// to the (deprecated) `new Buffer(size)` constructor otherwise. Callers must
// overwrite every byte before reading from the result.
//
function allocate(size) {
  if (typeof Buffer.allocUnsafe === 'function') {
    return Buffer.allocUnsafe(size);
  }
  return new Buffer(size);
}

//
// ### function Parser (connection)
// #### @connection {spdy.Connection} connection
// SPDY protocol frames parser's @constructor
//
function Parser(connection) {
  DuplexStream.call(this);

  // `drained` is cleared when new data starts being parsed and set again
  // (together with a 'drain' event) once the parser has to wait for input.
  this.drained = true;

  // `paused` is true while `execute()` is running for a frame part.
  this.paused = false;

  // Pending chunks, the total number of bytes they hold, and the number of
  // bytes required before the next parsing step can run.
  this.buffer = [];
  this.buffered = 0;
  this.waiting = FRAME_HEADER_SIZE;

  this.state = { type: 'frame-head' };
  this.socket = connection.socket;
  this.connection = connection;

  // Created lazily by `createFramer()` on the first control frame.
  this.framer = null;

  if (legacy) {
    this.readable = this.writable = true;
  }
}
util.inherits(Parser, DuplexStream);

//
// ### function create (connection)
// #### @connection {spdy.Connection} connection
// @constructor wrapper
//
parser.create = function create(connection) {
  return new Parser(connection);
};

//
// ### function destroy ()
// Just a stub.
//
Parser.prototype.destroy = function destroy() {
};

//
// ### function _write (data, encoding, cb)
// #### @data {Buffer} chunk of data (`undefined` when re-entered internally)
// #### @encoding {Null} encoding
// #### @cb {Function} callback
// Buffers `data` and, once at least `this.waiting` bytes are available,
// assembles exactly that many bytes and passes them to `execute()`.
// Returns `false` while a previous `execute()` is still running.
//
Parser.prototype._write = function write(data, encoding, cb) {
  // Legacy compatibility
  if (!cb) cb = function() {};

  if (data !== undefined) {
    // Buffer data
    this.buffer.push(data);
    this.buffered += data.length;
  }

  // Notify caller about state (for piping). The data is already buffered and
  // gets picked up when the running `execute()` completes.
  if (this.paused) return false;

  // We shall not do anything until we get all expected data
  if (this.buffered < this.waiting) return cb();

  // Mark parser as not drained
  if (data !== undefined) this.drained = false;

  var self = this,
      needed = this.waiting,
      assembled = allocate(needed),
      consumed = 0,
      offset = 0;

  // `this.buffered >= needed` holds here, so the loop below always fills
  // `assembled` completely.
  while (offset < needed && consumed < this.buffer.length) {
    var chunk = this.buffer[consumed],
        take = Math.min(chunk.length, needed - offset);

    chunk.copy(assembled, offset, 0, take);
    offset += take;
    this.buffered -= take;

    if (take < chunk.length) {
      // Chunk holds more than we need: keep the unread tail for later
      this.buffer[consumed] = chunk.slice(take);
      break;
    }

    consumed++;
  }

  // Remove used buffers
  this.buffer = this.buffer.slice(consumed);

  // Execute parser for buffered data
  this.paused = true;
  this.execute(this.state, assembled, function (err, waiting) {
    // And unpause once execution finished
    self.paused = false;

    // Propagate errors
    if (err) {
      cb();
      return self.emit('error', err);
    }

    // Set new `waiting`
    self.waiting = waiting;

    if (self.waiting <= self.buffered) {
      // Enough data is already buffered for the next step.
      // NOTE(review): `execute()` can call back synchronously (the DATA frame
      // path does), so this re-entry is recursive - confirm the stack depth
      // is acceptable for chunks that contain very many small frames.
      self._write(undefined, null, cb);
    } else {
      process.nextTick(function() {
        if (self.drained) return;

        // Mark parser as drained
        self.drained = true;
        self.emit('drain');
      });

      cb();
    }
  });
};

if (legacy) {
  //
  // ### function write (data, encoding, cb)
  // #### @data {Buffer} chunk of data
  // #### @encoding {Null} encoding
  // #### @cb {Function} callback
  // Legacy method
  //
  Parser.prototype.write = Parser.prototype._write;

  //
  // ### function end ()
  // Stream's end() implementation
  //
  Parser.prototype.end = function end() {
    this.emit('end');
  };
}

//
// ### function createFramer (version)
// #### @version {Number} Protocol version, a key of `spdy.protocol`
// Sets framer instance on Parser's instance and on its connection.
// Emits 'version' and 'framer' on success; emits 'error' and leaves
// `this.framer` untouched when `spdy.protocol` has no such version.
//
Parser.prototype.createFramer = function createFramer(version) {
  var protocol = spdy.protocol[version];

  if (!protocol) {
    this.emit(
      'error',
      new Error('Unknown protocol version requested: ' + version)
    );
    return;
  }

  this.emit('version', version);

  this.framer = new protocol.Framer(
    spdy.utils.zwrap(this.connection._deflate),
    spdy.utils.zwrap(this.connection._inflate)
  );

  // Propagate framer to connection
  this.connection._framer = this.framer;
  this.emit('framer', this.framer);
};

//
// ### function execute (state, data, callback)
// #### @state {Object} Parser's state
// #### @data {Buffer} Incoming data
// #### @callback {Function} continuation callback, `(err, waiting)`
// Parse buffered data. In 'frame-head' state `data` is the frame header and
// the callback receives the body length; in 'frame-body' state `data` is the
// frame body, a 'frame' event is emitted and the callback receives the header
// size again.
//
Parser.prototype.execute = function execute(state, data, callback) {
  var self = this;

  // Common completion handler for both data and control frames
  function onFrame(err, frame) {
    if (err) return callback(err);

    self.emit('frame', frame);

    state.type = 'frame-head';
    callback(null, FRAME_HEADER_SIZE);
  }

  if (state.type === 'frame-head') {
    var header = state.header = spdy.protocol.generic.parseHeader(data);

    // Lazily create framer
    if (!this.framer && header.control) {
      this.createFramer(header.version);
    }

    state.type = 'frame-body';
    callback(null, header.length);
    return;
  }

  if (state.type !== 'frame-body') return;

  // Data frame
  if (!state.header.control) {
    onFrame(null, {
      type: 'DATA',
      id: state.header.id,
      fin: (state.header.flags & 0x01) === 0x01,
      compressed: (state.header.flags & 0x02) === 0x02,
      data: data
    });
    return;
  }

  // Control frame. `createFramer()` leaves `this.framer` unset when the
  // requested version is unknown; report that instead of dereferencing null.
  if (!this.framer) {
    onFrame(new Error(
      'Can\'t parse control frame without framer, protocol version: ' +
      state.header.version
    ));
    return;
  }

  this.framer.execute(state.header, data, onFrame);
};