var expect = require('chai').expect;
var util = require('./util');

var stream = require('../lib/stream');
var Stream = stream.Stream;

// Creates a fresh Stream that logs through the test logger. The upstream window is set to
// Infinity (presumably so that flow control never holds back frames in these tests - confirm
// against lib/flow).
function createStream() {
  var stream = new Stream(util.log);
  stream.upstream._window = Infinity;
  return stream;
}

// Names of the stream events that `execute_sequence` records for later `event` checks.
var recorded_events = ['state', 'error', 'window_update', 'headers', 'promise'];

// Execute a list of commands and assertions
//
// Call as `execute_sequence(stream, sequence, done)` or `execute_sequence(sequence, done)`; in the
// two-argument form a new stream is created with `createStream()`.
//
// Every item of `sequence` is an object with one of these keys:
//
// * `method`:    { name, arguments, ret } - call `stream[name]` with `arguments`; if `ret` is
//                given it is called with the return value (command)
// * `incoming`:  frame written to `stream.upstream` (command)
// * `outgoing`:  reads one frame from `stream.upstream` (command) and later compares every listed
//                property with that frame (check)
// * `set_state`: value assigned to `stream.state` (command)
// * `wait`:      number of milliseconds to wait before the next command (command)
// * `event`:     { name, data } - the next recorded event must have this name and these leading
//                arguments (check)
// * `active`:    expected value of the accumulated active stream count (check)
//
// All the commands are executed first, in order. The checks run afterwards, in order, against the
// recorded frames and events, and then `done` is called.
function execute_sequence(stream, sequence, done) {
  if (!done) {
    done = sequence;
    sequence = stream;
    stream = createStream();
  }

  var outgoing_frames = [];

  // Wrapping `emit` to record the interesting events while still delivering them to listeners
  var emit = stream.emit, events = [];
  stream.emit = function(name) {
    if (recorded_events.indexOf(name) !== -1) {
      events.push({ name: name, data: Array.prototype.slice.call(arguments, 1) });
    }
    return emit.apply(this, arguments);
  };

  // An `outgoing` step is both a command (read the frame) and a check (compare the frame)
  var commands = [], checks = [];
  sequence.forEach(function(step) {
    if ('method' in step || 'incoming' in step || 'outgoing' in step || 'wait' in step || 'set_state' in step) {
      commands.push(step);
    }

    if ('outgoing' in step || 'event' in step || 'active' in step) {
      checks.push(step);
    }
  });

  // Handed to incoming frames as their `count_change` callback and fed with the `count_change`
  // value of outgoing frames
  var activeCount = 0;
  function count_change(change) {
    activeCount += change;
  }

  // Runs the remaining commands. Only `wait` yields to the event loop; when the commands run out,
  // `callback` is scheduled after a short delay.
  function execute(callback) {
    var command = commands.shift();
    if (command) {
      if ('method' in command) {
        var value = stream[command.method.name].apply(stream, command.method.arguments);
        if (command.method.ret) {
          command.method.ret(value);
        }
        execute(callback);
      } else if ('incoming' in command) {
        command.incoming.count_change = count_change;
        stream.upstream.write(command.incoming);
        execute(callback);
      } else if ('outgoing' in command) {
        outgoing_frames.push(stream.upstream.read());
        execute(callback);
      } else if ('set_state' in command) {
        stream.state = command.set_state;
        execute(callback);
      } else if ('wait' in command) {
        setTimeout(execute.bind(null, callback), command.wait);
      } else {
        // The Error constructor does not append extra arguments to the message, so the offending
        // step is described in the message itself
        throw new Error('Invalid command: ' + Object.keys(command).join(', '));
      }
    } else {
      setTimeout(callback, 5);
    }
  }

  // Compares the recorded frames and events with the expectations, then finishes the test
  function check() {
    checks.forEach(function(check) {
      if ('outgoing' in check) {
        var frame = outgoing_frames.shift();
        if (!frame) {
          // Nothing was read from the upstream for this step: fail with a descriptive error
          // instead of a TypeError on `frame.count_change`
          throw new Error('Missing outgoing frame, expected: ' + check.outgoing.type);
        }
        for (var key in check.outgoing) {
          expect(frame).to.have.property(key).that.deep.equals(check.outgoing[key]);
        }
        count_change(frame.count_change);
      } else if ('event' in check) {
        var event = events.shift();
        if (!event) {
          // Fewer events were recorded than expected: fail with a descriptive error instead of a
          // TypeError on `event.name`
          throw new Error('Missing event, expected: ' + check.event.name);
        }
        expect(event.name).to.be.equal(check.event.name);
        check.event.data.forEach(function(data, index) {
          expect(event.data[index]).to.deep.equal(data);
        });
      } else if ('active' in check) {
        expect(activeCount).to.be.equal(check.active);
      } else {
        throw new Error('Invalid check: ' + Object.keys(check).join(', '));
      }
    });
    done();
  }

  setImmediate(execute.bind(null, check));
}

// One sample frame per frame type. The first three (PRIORITY, WINDOW_UPDATE, RST_STREAM) are the
// ones the tests below expect to be tolerated after the stream was closed with END_STREAM.
var example_frames = [
  { type: 'PRIORITY', flags: {}, priority: 1 },
  { type: 'WINDOW_UPDATE', flags: {}, settings: {} },
  { type: 'RST_STREAM', flags: {}, error: 'CANCEL' },
  { type: 'HEADERS', flags: {}, headers: {}, priority: undefined },
  { type: 'DATA', flags: {}, data: new Buffer(5) },
  { type: 'PUSH_PROMISE', flags: {}, headers: {}, promised_stream: new Stream(util.log) }
];

// Frames that must be rejected when they are received in the given stream state
var invalid_incoming_frames = {
  IDLE: [
    { type: 'DATA', flags: {}, data: new Buffer(5) },
    { type: 'PRIORITY', flags: {}, priority: 1 },
    { type: 'WINDOW_UPDATE', flags: {}, settings: {} },
    { type: 'PUSH_PROMISE', flags: {}, headers: {} },
    { type: 'RST_STREAM', flags: {}, error: 'CANCEL' }
  ],
  RESERVED_LOCAL: [
    { type: 'DATA', flags: {}, data: new Buffer(5) },
    { type: 'HEADERS', flags: {}, headers: {}, priority: undefined },
    { type: 'PUSH_PROMISE', flags: {}, headers: {} },
    { type: 'WINDOW_UPDATE', flags: {}, settings: {} }
  ],
  RESERVED_REMOTE: [
    { type: 'DATA', flags: {}, data: new Buffer(5) },
    { type: 'PRIORITY', flags: {}, priority: 1 },
    { type: 'PUSH_PROMISE', flags: {}, headers: {} },
    { type: 'WINDOW_UPDATE', flags: {}, settings: {} }
  ],
  OPEN: [
  ],
  HALF_CLOSED_LOCAL: [
  ],
  HALF_CLOSED_REMOTE: [
    { type: 'DATA', flags: {}, data: new Buffer(5) },
    { type: 'HEADERS', flags: {}, headers: {}, priority: undefined },
    { type: 'PUSH_PROMISE', flags: {}, headers: {} }
  ]
};

// Frames that must be rejected when the application tries to send them in the given stream state
var invalid_outgoing_frames = {
  IDLE: [
    { type: 'DATA', flags: {}, data: new Buffer(5) },
    { type: 'PRIORITY', flags: {}, priority: 1 },
    { type: 'WINDOW_UPDATE', flags: {}, settings: {} },
    { type: 'PUSH_PROMISE', flags: {}, headers: {} }
  ],
  RESERVED_LOCAL: [
    { type: 'DATA', flags: {}, data: new Buffer(5) },
    { type: 'PRIORITY', flags: {}, priority: 1 },
    { type: 'PUSH_PROMISE', flags: {}, headers: {} },
    { type: 'WINDOW_UPDATE', flags: {}, settings: {} }
  ],
  RESERVED_REMOTE: [
    { type: 'DATA', flags: {}, data: new Buffer(5) },
    { type: 'HEADERS', flags: {}, headers: {}, priority: undefined },
    { type: 'PUSH_PROMISE', flags: {}, headers: {} },
    { type: 'WINDOW_UPDATE', flags: {}, settings: {} }
  ],
  OPEN: [
  ],
  HALF_CLOSED_LOCAL: [
    { type: 'DATA', flags: {}, data: new Buffer(5) },
    { type: 'HEADERS', flags: {}, headers: {}, priority: undefined },
    { type: 'PUSH_PROMISE', flags: {}, headers: {} }
  ],
  HALF_CLOSED_REMOTE: [
  ],
  CLOSED: [
    { type: 'PRIORITY', flags: {}, priority: 1 },
    { type: 'WINDOW_UPDATE', flags: {}, settings: {} },
    { type: 'HEADERS', flags: {}, headers: {}, priority: undefined },
    { type: 'DATA', flags: {}, data: new Buffer(5) },
    { type: 'PUSH_PROMISE', flags: {}, headers: {}, promised_stream: new Stream(util.log) }
  ]
};

describe('stream.js', function() {
  describe('Stream class', function() {
    describe('._transition(sending, frame) method', function() {
      it('should emit error, and answer RST_STREAM for invalid incoming frames', function() {
        Object.keys(invalid_incoming_frames).forEach(function(state) {
          invalid_incoming_frames[state].forEach(function(invalid_frame) {
            var stream = createStream();
            stream.state = state;
            expect(stream._transition.bind(stream, false, invalid_frame)).to.throw('Uncaught, unspecified "error" event.');
          });
        });

        // CLOSED state as a result of incoming END_STREAM (or RST_STREAM)
        var incoming_closed_stream = createStream();
        incoming_closed_stream.headers({});
        incoming_closed_stream.end();
        incoming_closed_stream.upstream.write({ type: 'HEADERS', headers:{}, flags: { END_STREAM: true }, count_change: util.noop });
        example_frames.forEach(function(invalid_frame) {
          invalid_frame.count_change = util.noop;
          expect(incoming_closed_stream._transition.bind(incoming_closed_stream, false, invalid_frame)).to.throw('Uncaught, unspecified "error" event.');
        });

        // CLOSED state as a result of outgoing END_STREAM
        var outgoing_closed_stream = createStream();
        outgoing_closed_stream.upstream.write({ type: 'HEADERS', headers:{}, flags: { END_STREAM: true }, count_change: util.noop });
        outgoing_closed_stream.headers({});
        outgoing_closed_stream.end();
        // The first three example frames are skipped here: the 'should ignore certain incoming
        // frames' test below expects them to be accepted in this situation
        example_frames.slice(3).forEach(function(invalid_frame) {
          invalid_frame.count_change = util.noop;
          expect(outgoing_closed_stream._transition.bind(outgoing_closed_stream, false, invalid_frame)).to.throw('Uncaught, unspecified "error" event.');
        });
      });
      it('should throw exception for invalid outgoing frames', function() {
        Object.keys(invalid_outgoing_frames).forEach(function(state) {
          invalid_outgoing_frames[state].forEach(function(invalid_frame) {
            var stream = createStream();
            stream.state = state;
            expect(stream._transition.bind(stream, true, invalid_frame)).to.throw(Error);
          });
        });
      });
      it('should close the stream when there\'s an incoming or outgoing RST_STREAM', function() {
        [
          'RESERVED_LOCAL',
          'RESERVED_REMOTE',
          'OPEN',
          'HALF_CLOSED_LOCAL',
          'HALF_CLOSED_REMOTE'
        ].forEach(function(state) {
          [true, false].forEach(function(sending) {
            var stream = createStream();
            stream.state = state;
            stream._transition(sending, { type: 'RST_STREAM', flags: {} });
            expect(stream.state).to.be.equal('CLOSED');
          });
        });
      });
      it('should ignore any incoming frame after sending reset', function() {
        var stream = createStream();
        stream.reset();
        example_frames.forEach(stream._transition.bind(stream, false));
      });
      it('should ignore certain incoming frames after closing the stream with END_STREAM', function() {
        var stream = createStream();
        stream.upstream.write({ type: 'HEADERS', flags: { END_STREAM: true }, headers:{} });
        stream.headers({});
        stream.end();
        example_frames.slice(0,3).forEach(function(frame) {
          frame.count_change = util.noop;
          stream._transition(false, frame);
        });
      });
    });
  });
  describe('test scenario', function() {
    describe('sending request', function() {
      it('should trigger the appropriate state transitions and outgoing frames', function(done) {
        execute_sequence([
          { method  : { name: 'headers', arguments: [{ ':path': '/' }] } },
          { outgoing: { type: 'HEADERS', flags: { }, headers: { ':path': '/' } } },
          { event   : { name: 'state', data: ['OPEN'] } },

          { wait    : 5 },
          { method  : { name: 'end', arguments: [] } },
          { event   : { name: 'state', data: ['HALF_CLOSED_LOCAL'] } },
          { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: new Buffer(0) } },

          { wait    : 10 },
          { incoming: { type: 'HEADERS', flags: { }, headers: { ':status': 200 } } },
          { incoming: { type: 'DATA'   , flags: { END_STREAM: true }, data: new Buffer(5) } },
          { event   : { name: 'headers', data: [{ ':status': 200 }] } },
          { event   : { name: 'state', data: ['CLOSED'] } },

          { active  : 0 }
        ], done);
      });
    });
    describe('answering request', function() {
      it('should trigger the appropriate state transitions and outgoing frames', function(done) {
        var payload = new Buffer(5);
        execute_sequence([
          { incoming: { type: 'HEADERS', flags: { }, headers: { ':path': '/' } } },
          { event   : { name: 'state', data: ['OPEN'] } },
          { event   : { name: 'headers', data: [{ ':path': '/' }] } },

          { wait    : 5 },
          { incoming: { type: 'DATA', flags: { }, data: new Buffer(5) } },
          { incoming: { type: 'DATA', flags: { END_STREAM: true }, data: new Buffer(10) } },
          { event   : { name: 'state', data: ['HALF_CLOSED_REMOTE'] } },

          { wait    : 5 },
          { method  : { name: 'headers', arguments: [{ ':status': 200 }] } },
          { outgoing: { type: 'HEADERS', flags: { }, headers: { ':status': 200 } } },

          { wait    : 5 },
          { method  : { name: 'end', arguments: [payload] } },
          { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: payload } },
          { event   : { name: 'state', data: ['CLOSED'] } },

          { active  : 0 }
        ], done);
      });
    });
    describe('sending push stream', function() {
      it('should trigger the appropriate state transitions and outgoing frames', function(done) {
        var payload = new Buffer(5);
        var pushStream;

        execute_sequence([
          // receiving request
          { incoming: { type: 'HEADERS', flags: { END_STREAM: true }, headers: { ':path': '/' } } },
          { event   : { name: 'state', data: ['OPEN'] } },
          { event   : { name: 'state', data: ['HALF_CLOSED_REMOTE'] } },
          { event   : { name: 'headers', data: [{ ':path': '/' }] } },

          // sending response headers
          { wait    : 5 },
          { method  : { name: 'headers', arguments: [{ ':status': '200' }] } },
          { outgoing: { type: 'HEADERS', flags: { }, headers: { ':status': '200' } } },

          // sending push promise
          { method  : { name: 'promise', arguments: [{ ':path': '/' }], ret: function(str) { pushStream = str; } } },
          { outgoing: { type: 'PUSH_PROMISE', flags: { }, headers: { ':path': '/' } } },

          // sending response data
          { method  : { name: 'end', arguments: [payload] } },
          { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: payload } },
          { event   : { name: 'state', data: ['CLOSED'] } },

          { active  : 0 }
        ], function() {
          // initial state of the promised stream
          expect(pushStream.state).to.equal('RESERVED_LOCAL');

          execute_sequence(pushStream, [
            // push headers
            { wait    : 5 },
            { method  : { name: 'headers', arguments: [{ ':status': '200' }] } },
            { outgoing: { type: 'HEADERS', flags: { }, headers: { ':status': '200' } } },
            { event   : { name: 'state', data: ['HALF_CLOSED_REMOTE'] } },

            // push data
            { method  : { name: 'end', arguments: [payload] } },
            { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: payload } },
            { event   : { name: 'state', data: ['CLOSED'] } },

            { active  : 1 }
          ], done);
        });
      });
    });
    describe('receiving push stream', function() {
      it('should trigger the appropriate state transitions and outgoing frames', function(done) {
        var payload = new Buffer(5);
        var original_stream = createStream();
        var promised_stream = createStream();

        // Both sequences below report to `done`, so it is wrapped with `util.callNTimes(2, ...)`
        done = util.callNTimes(2, done);

        execute_sequence(original_stream, [
          // sending request headers
          { method  : { name: 'headers', arguments: [{ ':path': '/' }] } },
          { method  : { name: 'end', arguments: [] } },
          { outgoing: { type: 'HEADERS', flags: { END_STREAM: true }, headers: { ':path': '/' } } },
          { event   : { name: 'state', data: ['OPEN'] } },
          { event   : { name: 'state', data: ['HALF_CLOSED_LOCAL'] } },

          // receiving response headers
          { wait    : 10 },
          { incoming: { type: 'HEADERS', flags: { }, headers: { ':status': 200 } } },
          { event   : { name: 'headers', data: [{ ':status': 200 }] } },

          // receiving push promise
          { incoming: { type: 'PUSH_PROMISE', flags: { }, headers: { ':path': '/2.html' }, promised_stream: promised_stream } },
          { event   : { name: 'promise', data: [promised_stream, { ':path': '/2.html' }] } },

          // receiving response data
          { incoming: { type: 'DATA'   , flags: { END_STREAM: true }, data: payload } },
          { event   : { name: 'state', data: ['CLOSED'] } },

          { active  : 0 }
        ], done);

        execute_sequence(promised_stream, [
          // initial state of the promised stream
          { event   : { name: 'state', data: ['RESERVED_REMOTE'] } },

          // push headers
          { wait    : 10 },
          { incoming: { type: 'HEADERS', flags: { END_STREAM: false }, headers: { ':status': 200 } } },
          { event   : { name: 'state', data: ['HALF_CLOSED_LOCAL'] } },
          { event   : { name: 'headers', data: [{ ':status': 200 }] } },

          // push data
          { incoming: { type: 'DATA', flags: { END_STREAM: true }, data: payload } },
          { event   : { name: 'state', data: ['CLOSED'] } },

          { active  : 0 }
        ], done);
      });
    });
  });

  describe('bunyan formatter', function() {
    describe('`s`', function() {
      var format = stream.serializers.s;
      // Despite the wording of the test title, the serializer is exercised with streams here
      it('should assign a unique ID to each frame', function() {
        var stream1 = createStream();
        var stream2 = createStream();
        expect(format(stream1)).to.be.equal(format(stream1));
        expect(format(stream2)).to.be.equal(format(stream2));
        expect(format(stream1)).to.not.be.equal(format(stream2));
      });
    });
  });
});