var framer = exports;

var spdy = require('../../../spdy'),
    Buffer = require('buffer').Buffer,
    protocol = require('./');

//
// internal, allocates a zero-filled buffer of `size` bytes
//
// Frames contain reserved/unused bytes that are never explicitly written, so
// the backing memory must not be uninitialized. `Buffer.alloc` is used when
// the runtime provides it; otherwise fall back to the legacy constructor and
// clear the memory by hand.
//
function allocBuffer(size) {
  if (typeof Buffer.alloc === 'function') return Buffer.alloc(size);

  var buffer = new Buffer(size);
  buffer.fill(0);
  return buffer;
}

//
// ### function Framer (deflate, inflate)
// #### @deflate {Function} Called as `deflate(buffer, callback)`, the callback
// ####                     receives `(err, chunks, size)`
// #### @inflate {Function} Called as `inflate(buffer, callback)`, the callback
// ####                     receives `(err, chunks, length)`
// Framer constructor (SPDY version 2)
//
function Framer(deflate, inflate) {
  this.version = 2;
  this.deflate = deflate;
  this.inflate = inflate;
}
exports.Framer = Framer;


//
// ### function execute (header, body, callback)
// #### @header {Object} Frame headers (`type` and `flags` are read)
// #### @body {Buffer} Frame's body
// #### @callback {Function} Continuation callback, `(err, frame)`
// Parse frame (decompress data and create streams)
//
Framer.prototype.execute = function execute(header, body, callback) {
  // SYN_STREAM or SYN_REPLY
  if (header.type === 0x01 || header.type === 0x02) {
    var frame = protocol.parseSynHead(header.type, header.flags, body);

    // Everything after the fixed head is the compressed name/value block
    body = body.slice(frame._offset);

    this.inflate(body, function(err, chunks, length) {
      if (err) return callback(err);

      // Join inflated chunks into one contiguous buffer
      var pairs = allocBuffer(length);
      for (var i = 0, offset = 0; i < chunks.length; i++) {
        chunks[i].copy(pairs, offset);
        offset += chunks[i].length;
      }

      frame.headers = protocol.parseHeaders(pairs);
      frame.url = frame.headers.url || '';

      callback(null, frame);
    });
  // RST_STREAM
  } else if (header.type === 0x03) {
    callback(null, protocol.parseRst(body));
  // SETTINGS
  } else if (header.type === 0x04) {
    callback(null, { type: 'SETTINGS' });
  // NOOP
  } else if (header.type === 0x05) {
    callback(null, { type: 'NOOP' });
  // PING
  } else if (header.type === 0x06) {
    callback(null, { type: 'PING', pingId: body });
  // GOAWAY
  } else if (header.type === 0x07) {
    callback(null, protocol.parseGoaway(body));
  } else {
    callback(null, { type: 'unknown: ' + header.type, body: body });
  }
};

//
// internal, converts object into spdy dictionary
//
// #### @headers {Object} (optional) header name -> value (string, array or
// ####                   anything with `toString()`)
// #### @preprocess {Function} (optional) receives the lower-cased copy of
// ####                        headers and may add or remove entries in place
// Returns a Buffer: 16-bit pair count, then for every pair a 16-bit key
// length, the key, a 16-bit value length and the value.
//
function headersToDict(headers, preprocess) {
  function stringify(value) {
    // Missing values (undefined or null) are serialized as empty strings
    if (value === undefined || value === null) return '';

    // Multiple values of one header are separated by a NUL byte
    if (Array.isArray(value)) return value.join('\x00');
    if (typeof value === 'string') return value;

    return value.toString();
  }

  // Lower case of all headers keys
  var loweredHeaders = {};
  Object.keys(headers || {}).forEach(function(key) {
    loweredHeaders[key.toLowerCase()] = headers[key];
  });

  // Allow outer code to add custom headers or remove something
  if (preprocess) preprocess(loweredHeaders);

  // Transform object into kv pairs, dropping connection-level headers.
  // `len` starts at 2 to account for the leading pair count.
  var len = 2,
      pairs = Object.keys(loweredHeaders).filter(function(key) {
        // Keys added by `preprocess` are not necessarily lower-cased
        var lkey = key.toLowerCase();
        return lkey !== 'connection' && lkey !== 'keep-alive' &&
               lkey !== 'proxy-connection' && lkey !== 'transfer-encoding';
      }).map(function(key) {
        var klen = Buffer.byteLength(key),
            value = stringify(loweredHeaders[key]),
            vlen = Buffer.byteLength(value);

        len += 4 + klen + vlen;
        return [klen, key, vlen, value];
      }),
      result = allocBuffer(len);

  result.writeUInt16BE(pairs.length, 0, true);

  var offset = 2;
  pairs.forEach(function(pair) {
    // Write key length
    result.writeUInt16BE(pair[0], offset, true);
    // Write key
    result.write(pair[1], offset + 2);

    offset += pair[0] + 2;

    // Write value length
    result.writeUInt16BE(pair[2], offset, true);
    // Write value
    result.write(pair[3], offset + 2);

    offset += pair[2] + 2;
  });

  return result;
}

//
// ### function _synFrame (type, id, assoc, priority, dict, callback)
// #### @type {String} 'SYN_STREAM', anything else produces SYN_REPLY
// #### @id {Number} Stream ID
// #### @assoc {Number} Associated stream ID (SYN_STREAM only)
// #### @priority {Number} Stream priority (SYN_STREAM only)
// #### @dict {Buffer} Uncompressed headers dictionary
// #### @callback {Function} Continuation callback, `(err, frame)`
// Compress headers and build SYN_STREAM or SYN_REPLY frame
//
Framer.prototype._synFrame = function _synFrame(type, id, assoc, priority, dict,
                                                callback) {
  var isStream = type === 'SYN_STREAM';

  // Compress headers
  this.deflate(dict, function (err, chunks, size) {
    if (err) return callback(err);

    // `offset` is where compressed headers start, `total` is the length
    // field value (frame size without the 8 byte common header)
    var offset = isStream ? 18 : 14,
        total = (isStream ? 10 : 6) + size,
        frame = allocBuffer(offset + size);

    frame.writeUInt16BE(0x8002, 0, true); // Control + Version
    frame.writeUInt16BE(isStream ? 1 : 2, 2, true); // type
    frame.writeUInt32BE(total & 0x00ffffff, 4, true); // No flag support
    frame.writeUInt32BE(id & 0x7fffffff, 8, true); // Stream-ID

    if (isStream) {
      // Flags byte shares the word with length, set it after length is written
      frame[4] = 2;
      frame.writeUInt32BE(assoc & 0x7fffffff, 12, true); // Stream-ID

      // Priority lives only in SYN_STREAM's head; SYN_REPLY's head is 14
      // bytes long, so offset 16 there already belongs to the payload.
      // NOTE(review): value is stored in the low bits of byte 16 - confirm
      // that it matches the bit position the parsing side expects.
      frame.writeUInt8(priority & 0x3, 16, true); // Priority
    }

    // Remaining head bytes (unused fields) stay zero, buffer is zero-filled

    for (var i = 0; i < chunks.length; i++) {
      chunks[i].copy(frame, offset);
      offset += chunks[i].length;
    }

    callback(null, frame);
  });
};

//
// ### function replyFrame (id, code, reason, headers, callback)
// #### @id {Number} Stream ID
// #### @code {Number} HTTP Status Code
// #### @reason {String} (optional)
// #### @headers {Object|Array} (optional) HTTP headers
// #### @callback {Function} Continuation function
// Sends SYN_REPLY frame
//
Framer.prototype.replyFrame = function replyFrame(id, code, reason, headers,
                                                  callback) {
  var dict = headersToDict(headers, function(headers) {
    // `reason` is optional, do not emit "<code> undefined" when it is absent
    headers.status = reason ? code + ' ' + reason : String(code);
    headers.version = 'HTTP/1.1';
  });

  this._synFrame('SYN_REPLY', id, null, 0, dict, callback);
};

//
// ### function streamFrame (id, assoc, meta, headers, callback)
// #### @id {Number} stream id
// #### @assoc {Number} associated stream id
// #### @meta {Object} meta headers (`url` and `priority` are read)
// #### @headers {Object} stream headers
// #### @callback {Function} continuation callback
// Create SYN_STREAM frame
// (needed for server push and testing)
//
Framer.prototype.streamFrame = function streamFrame(id, assoc, meta, headers,
                                                    callback) {
  var dict = headersToDict(headers, function(headers) {
    headers.status = 200;
    headers.version = 'HTTP/1.1';
    headers.url = meta.url;
  });

  this._synFrame('SYN_STREAM', id, assoc, meta.priority, dict, callback);
};

//
// ### function dataFrame (id, fin, data)
// #### @id {Number} Stream id
// #### @fin {Bool} Is this data frame last frame
// #### @data {Buffer} Response data
// Sends DATA frame
// Returns an empty array when there is nothing to send (no data, no fin)
//
Framer.prototype.dataFrame = function dataFrame(id, fin, data) {
  if (!fin && !data.length) return [];

  var frame = allocBuffer(8 + data.length);

  frame.writeUInt32BE(id & 0x7fffffff, 0, true);
  frame.writeUInt32BE(data.length & 0x00ffffff, 4, true);
  // Flags byte shares the word with length, write it after the length
  frame.writeUInt8(fin ? 0x01 : 0x0, 4, true);

  if (data.length) data.copy(frame, 8);

  return frame;
};

//
// ### function pingFrame (id)
// #### @id {Buffer} Ping ID (first 4 bytes are used)
// Sends PING frame
//
Framer.prototype.pingFrame = function pingFrame(id) {
  var header = allocBuffer(12);

  header.writeUInt32BE(0x80020006, 0, true); // Version and type
  header.writeUInt32BE(0x00000004, 4, true); // Length
  id.copy(header, 8, 0, 4); // ID

  return header;
};

//
// ### function rstFrame (id, code)
// #### @id {Number} Stream ID
// #### @code {Number} RST Code
// Sends RST_STREAM frame
//
Framer.prototype.rstFrame = function rstFrame(id, code) {
  // The frame embeds the stream ID, so it can't be shared between streams:
  // build a fresh one on every call instead of caching it by status code.
  var header = allocBuffer(16);

  header.writeUInt32BE(0x80020003, 0, true); // Version and type
  header.writeUInt32BE(0x00000008, 4, true); // Length
  header.writeUInt32BE(id & 0x7fffffff, 8, true); // Stream ID
  header.writeUInt32BE(code, 12, true); // Status Code

  return header;
};
// Not used anymore, property is kept for backward compatibility
Framer.rstCache = {};

//
// ### function settingsFrame (options)
// #### @options {Object} settings frame options (`maxStreams` is read)
// Sends SETTINGS frame with MAX_CONCURRENT_STREAMS
// Frames are cached by `maxStreams`, callers get the shared buffer
//
Framer.prototype.settingsFrame = function settingsFrame(options) {
  var settings;

  if (!(settings = Framer.settingsCache[options.maxStreams])) {
    settings = allocBuffer(20);

    settings.writeUInt32BE(0x80020004, 0, true); // Version and type
    settings.writeUInt32BE(0x0000000C, 4, true); // length
    settings.writeUInt32BE(0x00000001, 8, true); // Count of entries
    settings.writeUInt32LE(0x01000004, 12, true); // Entry ID and Persist flag
    settings.writeUInt32BE(options.maxStreams, 16, true);

    Framer.settingsCache[options.maxStreams] = settings;
  }

  return settings;
};
Framer.settingsCache = {};