// Tests for the `Flow` class exported by ../lib/flow.
//
// `describe`, `it` and `beforeEach` are not defined in this file; they are expected to be
// provided as globals by the test runner (presumably Mocha - confirm against the project setup).

var expect = require('chai').expect;
var util = require('./util');

var Flow = require('../lib/flow').Flow;

/**
 * Creates a `Flow` instance for use in a test.
 *
 * The flow control id passed to the constructor comes from `util.random(10, 100)`, and the
 * instance's `_log` is replaced with `util.log.child(...)`.
 *
 * @param {Object} [log] - Argument forwarded to `util.log.child()`; an empty object is used
 *   when it is omitted (or otherwise falsy).
 * @returns {Flow} The newly created flow.
 */
function createFlow(log) {
  var flowControlId = util.random(10, 100);
  var flow = new Flow(flowControlId);
  flow._log = util.log.child(log || {});
  return flow;
}

describe('flow.js', function() {
  describe('Flow class', function() {
    // A fresh flow is created before every test case in this group.
    var flow;
    beforeEach(function() {
      flow = createFlow();
    });

    describe('._receive(frame, callback) method', function() {
      it('is called when there\'s a frame in the input buffer to be consumed', function(done) {
        var frame = { type: 'PRIORITY', flags: {}, priority: 1 };
        flow._receive = function _receive(receivedFrame, callback) {
          expect(receivedFrame).to.equal(frame);
          callback();
        };
        // `done` is handed over as the write callback, so the test finishes once the write does.
        flow.write(frame, done);
      });
      it('has to be overridden by the child class, otherwise it throws', function() {
        expect(flow._receive.bind(flow)).to.throw(Error);
      });
    });
    describe('._send() method', function() {
      it('is called when the output buffer should be filled with more frames and the flow ' +
         'control queue is empty', function() {
        var sendCalled = 0;
        var notFlowControlledFrame = { type: 'PRIORITY', flags: {}, priority: 1 };
        flow._send = function _send() {
          sendCalled += 1;
          this.push(notFlowControlledFrame);
        };
        expect(flow.read()).to.equal(notFlowControlledFrame);

        // With a zero window and a DATA frame waiting in the flow control queue, the next read
        // is expected to yield nothing and must not trigger another `_send()` call.
        flow._window = 0;
        flow._queue.push({ type: 'DATA', flags: {}, data: { length: 1 } });
        expect(flow.read()).to.equal(null);

        expect(sendCalled).to.equal(1);
      });
      it('has to be overridden by the child class, otherwise it throws', function() {
        expect(flow._send.bind(flow)).to.throw(Error);
      });
    });
    describe('._increaseWindow(size) method', function() {
      it('should increase `this._window` by `size`', function() {
        flow._send = util.noop;
        flow._window = 0;

        var increase1 = util.random(0,100);
        var increase2 = util.random(0,100);
        flow._increaseWindow(increase1);
        flow._increaseWindow(increase2);
        expect(flow._window).to.equal(increase1 + increase2);

        flow._increaseWindow(Infinity);
        expect(flow._window).to.equal(Infinity);
      });
      it('should emit error when increasing with a finite `size` when `_window` is infinite', function() {
        flow._send = util.noop;
        flow._increaseWindow(Infinity);
        var increase = util.random(1,100);

        // NOTE(review): no 'error' listener is registered here, so the expected message is
        // presumably the one the event emitter throws for an unhandled 'error' event; its exact
        // wording may depend on the Node.js version - confirm.
        expect(flow._increaseWindow.bind(flow, increase)).to.throw('Uncaught, unspecified "error" event.');
      });
      it('should emit error when `_window` grows over the window limit', function() {
        var WINDOW_SIZE_LIMIT = Math.pow(2, 31) - 1;
        flow._send = util.noop;
        flow._window = 0;

        // Growing the window exactly to the limit must succeed; one more unit must fail.
        flow._increaseWindow(WINDOW_SIZE_LIMIT);
        expect(flow._increaseWindow.bind(flow, 1)).to.throw('Uncaught, unspecified "error" event.');

      });
    });
    describe('.read() method', function() {
      describe('when the flow control queue is not empty', function() {
        it('should return the first item in the queue if the window is enough', function() {
          var priorityFrame = { type: 'PRIORITY', flags: {}, priority: 1 };
          var dataFrame = { type: 'DATA', flags: {}, data: { length: 10 } };
          flow._send = util.noop;
          flow._window = 10;
          flow._queue = [priorityFrame, dataFrame];

          expect(flow.read()).to.equal(priorityFrame);
          expect(flow.read()).to.equal(dataFrame);
        });
        it('should also split DATA frames when needed', function() {
          var buffer = new Buffer(10);
          var dataFrame = { type: 'DATA', flags: {}, stream: util.random(0, 100), data: buffer };
          flow._send = util.noop;
          flow._window = 5;
          flow._queue = [dataFrame];

          // The window (5) is smaller than the payload (10): the read must return a fragment
          // carrying the first 5 bytes, and the queued frame must keep the remaining bytes.
          var expectedFragment = { flags: {}, type: 'DATA', stream: dataFrame.stream, data: buffer.slice(0,5) };
          expect(flow.read()).to.deep.equal(expectedFragment);
          expect(dataFrame.data).to.deep.equal(buffer.slice(5));
        });
      });
    });
    describe('.push(frame) method', function() {
      it('should push `frame` into the output queue or the flow control queue', function() {
        var priorityFrame = { type: 'PRIORITY', flags: {}, priority: 1 };
        var dataFrame = { type: 'DATA', flags: {}, data: { length: 10 } };
        flow._window = 10;

        flow.push(dataFrame);     // output queue
        flow.push(dataFrame);     // flow control queue, because of depleted window
        flow.push(priorityFrame); // flow control queue, because it's not empty

        expect(flow.read()).to.be.equal(dataFrame);
        expect(flow._queue[0]).to.be.equal(dataFrame);
        expect(flow._queue[1]).to.be.equal(priorityFrame);
      });
    });
    describe('.write() method', function() {
      it('call with a DATA frame should trigger sending WINDOW_UPDATE if remote flow control is not ' +
         'disabled', function(done) {
        flow._window = 100;
        flow._send = util.noop;
        flow._receive = function(frame, callback) {
          callback();
        };

        var buffer = new Buffer(util.random(10, 100));
        flow.write({ type: 'DATA', flags: {}, data: buffer });
        // The WINDOW_UPDATE frame is read back once the flow signals that it is readable.
        flow.once('readable', function() {
          expect(flow.read()).to.be.deep.equal({
            type: 'WINDOW_UPDATE',
            flags: {},
            stream: flow._flowControlId,
            window_size: buffer.length
          });
          done();
        });
      });
    });
  });
  describe('test scenario', function() {
    // Two flows that share one flow control id; the scenario below pipes them into each other.
    var flow1, flow2;
    beforeEach(function() {
      flow1 = createFlow({ flow: 1 });
      flow2 = createFlow({ flow: 2 });
      flow1._flowControlId = flow2._flowControlId;
      flow1._send = flow2._send = util.noop;
      flow1._receive = flow2._receive = function(frame, callback) { callback(); };
    });

    describe('sending a large data stream', function() {
      it('should work as expected', function(done) {
        // Sender side
        var frameNumber = util.random(5, 8);
        var input = [];
        flow1._send = function _send() {
          if (input.length >= frameNumber) {
            // Enough buffers were sent: push an empty END_STREAM frame, then end the stream.
            this.push({ type: 'DATA', flags: { END_STREAM: true }, data: new Buffer(0) });
            this.push(null);
          } else {
            var buffer = new Buffer(util.random(1000, 100000));
            input.push(buffer);
            this.push({ type: 'DATA', flags: {}, data: buffer });
          }
        };

        // Receiver side
        var output = [];
        flow2._receive = function _receive(frame, callback) {
          if (frame.type === 'DATA') {
            output.push(frame.data);
          }
          if (frame.flags.END_STREAM) {
            this.emit('end_stream');
          }
          callback();
        };

        // Checking results
        flow2.on('end_stream', function() {
          input = util.concat(input);
          output = util.concat(output);

          expect(input).to.deep.equal(output);

          done();
        });

        // Start piping
        flow1.pipe(flow2).pipe(flow1);
      });
    });
  });
});