|
1 var expect = require('chai').expect; |
|
2 var util = require('./util'); |
|
3 |
|
4 var Flow = require('../lib/flow').Flow; |
|
5 |
|
/**
 * Test helper: builds a `Flow` whose flow control id comes from
 * `util.random(10, 100)` and attaches a child logger to it.
 *
 * @param {Object} [log] - Fields handed to `util.log.child()`; any falsy value
 *   is replaced by an empty object.
 * @returns {Flow} The newly created flow with `_log` set.
 */
function createFlow(log) {
  var instance = new Flow(util.random(10, 100));
  var loggerFields = log ? log : {};

  instance._log = util.log.child(loggerFields);
  return instance;
}
|
12 |
|
13 describe('flow.js', function() { |
|
14 describe('Flow class', function() { |
|
// Flow instance shared by the tests of this suite; replaced with a new one
// from createFlow() before every test.
var flow;

beforeEach(function resetFlow() {
  flow = createFlow();
});
|
19 |
|
// Tests for the `_receive(frame, callback)` hook of Flow.
describe('._receive(frame, callback) method', function() {
  it('is called when there\'s a frame in the input buffer to be consumed', function(done) {
    var writtenFrame = { type: 'PRIORITY', flags: {}, priority: 1 };

    // Replacement hook: checks that it is handed the very object that was
    // written, then signals completion through the supplied callback.
    function _receive(receivedFrame, callback) {
      expect(receivedFrame).to.equal(writtenFrame);
      callback();
    }

    flow._receive = _receive;
    flow.write(writtenFrame, done);
  });

  it('has to be overridden by the child class, otherwise it throws', function() {
    // The hook is left untouched here, so this exercises whatever Flow ships with.
    var defaultReceive = flow._receive.bind(flow);

    expect(defaultReceive).to.throw(Error);
  });
});
|
// Tests for the `_send()` hook of Flow.
describe('._send() method', function() {
  // Note the trailing space before the string concatenation: without it the
  // reported test title reads "...the flowcontrol queue is empty".
  it('is called when the output buffer should be filled with more frames and the flow ' +
     'control queue is empty', function() {
    var sendCalled = 0;
    var notFlowControlledFrame = { type: 'PRIORITY', flags: {}, priority: 1 };

    // Replacement hook: counts its invocations and pushes one PRIORITY frame.
    flow._send = function _send() {
      sendCalled += 1;
      this.push(notFlowControlledFrame);
    };

    // With nothing queued, read() is expected to hand back the pushed frame.
    expect(flow.read()).to.equal(notFlowControlledFrame);

    // Zero window plus a queued DATA frame: read() is expected to yield null...
    flow._window = 0;
    flow._queue.push({ type: 'DATA', flags: {}, data: { length: 1 } });
    expect(flow.read()).to.equal(null);

    // ...and the hook must not have been invoked a second time.
    expect(sendCalled).to.equal(1);
  });

  it('has to be overridden by the child class, otherwise it throws', function() {
    // The hook is left untouched here, so this exercises whatever Flow ships with.
    expect(flow._send.bind(flow)).to.throw(Error);
  });
});
|
// Tests for `_increaseWindow(size)`.
describe('._increaseWindow(size) method', function() {
  // Error message the tests expect to be thrown by `_increaseWindow`.
  var UNHANDLED_ERROR = 'Uncaught, unspecified "error" event.';

  it('should increase `this._window` by `size`', function() {
    flow._send = util.noop;
    flow._window = 0;

    var steps = [util.random(0,100), util.random(0,100)];
    steps.forEach(function(step) {
      flow._increaseWindow(step);
    });
    expect(flow._window).to.equal(steps[0] + steps[1]);

    // An infinite increment is expected to leave the window infinite.
    flow._increaseWindow(Infinity);
    expect(flow._window).to.equal(Infinity);
  });

  it('should emit error when increasing with a finite `size` when `_window` is infinite', function() {
    flow._send = util.noop;
    flow._increaseWindow(Infinity);

    var finiteStep = util.random(1,100);
    var increaseAgain = flow._increaseWindow.bind(flow, finiteStep);

    expect(increaseAgain).to.throw(UNHANDLED_ERROR);
  });

  it('should emit error when `_window` grows over the window limit', function() {
    var largestWindow = Math.pow(2, 31) - 1;
    flow._send = util.noop;
    flow._window = 0;

    // Fill the window up to 2^31 - 1; one more unit is expected to throw.
    flow._increaseWindow(largestWindow);

    var overflowByOne = flow._increaseWindow.bind(flow, 1);
    expect(overflowByOne).to.throw(UNHANDLED_ERROR);
  });
});
|
// Tests for `read()` when frames are already waiting in `flow._queue`.
describe('.read() method', function() {
  describe('when the flow control queue is not empty', function() {
    it('should return the first item in the queue if the window is enough', function() {
      var queued = [
        { type: 'PRIORITY', flags: {}, priority: 1 },
        { type: 'DATA', flags: {}, data: { length: 10 } }
      ];
      flow._send = util.noop;
      flow._window = 10;
      flow._queue = queued.slice();

      // Frames are expected back in queue order, as the very same objects.
      expect(flow.read()).to.equal(queued[0]);
      expect(flow.read()).to.equal(queued[1]);
    });

    it('should also split DATA frames when needed', function() {
      var payload = new Buffer(10);
      var streamId = util.random(0, 100);
      var queuedFrame = { type: 'DATA', flags: {}, stream: streamId, data: payload };
      flow._send = util.noop;
      flow._window = 5;
      flow._queue = [queuedFrame];

      // Window (5) is smaller than the payload (10): the first read should
      // carry the first five bytes...
      var firstFragment = { flags: {}, type: 'DATA', stream: streamId, data: payload.slice(0,5) };
      expect(flow.read()).to.deep.equal(firstFragment);

      // ...and the queued frame should be left holding the remainder.
      expect(queuedFrame.data).to.deep.equal(payload.slice(5));
    });
  });
});
|
// Tests for `push(frame)`.
describe('.push(frame) method', function() {
  it('should push `frame` into the output queue or the flow control queue', function() {
    var controlFrame = { type: 'PRIORITY', flags: {}, priority: 1 };
    var payloadFrame = { type: 'DATA', flags: {}, data: { length: 10 } };
    flow._window = 10;

    // Expected routing of the three pushes, in order:
    //   1. DATA     -> output queue
    //   2. DATA     -> flow control queue (the window is depleted by the first)
    //   3. PRIORITY -> flow control queue (it is no longer empty)
    [payloadFrame, payloadFrame, controlFrame].forEach(function(frame) {
      flow.push(frame);
    });

    expect(flow.read()).to.equal(payloadFrame);
    expect(flow._queue[0]).to.equal(payloadFrame);
    expect(flow._queue[1]).to.equal(controlFrame);
  });
});
|
// Tests for `write()`.
describe('.write() method', function() {
  // Note the trailing space before the string concatenation: without it the
  // reported test title reads "...flow control is notdisabled".
  it('call with a DATA frame should trigger sending WINDOW_UPDATE if remote flow control is not ' +
     'disabled', function(done) {
    flow._window = 100;
    flow._send = util.noop;
    // Accept every incoming frame immediately.
    flow._receive = function(frame, callback) {
      callback();
    };

    var buffer = new Buffer(util.random(10, 100));
    flow.write({ type: 'DATA', flags: {}, data: buffer });

    // The frame that becomes readable is expected to be a WINDOW_UPDATE for
    // this flow whose size equals the length of the DATA payload just written.
    flow.once('readable', function() {
      expect(flow.read()).to.be.deep.equal({
        type: 'WINDOW_UPDATE',
        flags: {},
        stream: flow._flowControlId,
        window_size: buffer.length
      });
      done();
    });
  });
});
|
147 }); |
|
// End-to-end scenario: two Flow instances piped into each other.
describe('test scenario', function() {
  var flow1, flow2;

  beforeEach(function() {
    flow1 = createFlow({ flow: 1 });
    flow2 = createFlow({ flow: 2 });

    // Both ends use the same flow control id.
    flow1._flowControlId = flow2._flowControlId;

    // Default hooks: nothing to send, every received frame accepted at once.
    var acceptFrame = function(frame, callback) { callback(); };
    flow2._send = util.noop;
    flow1._send = util.noop;
    flow2._receive = acceptFrame;
    flow1._receive = acceptFrame;
  });

  describe('sending a large data stream', function() {
    it('should work as expected', function(done) {
      // Sender side: emit `frameNumber` DATA frames with randomly sized
      // payloads, then an empty END_STREAM frame followed by `null`.
      var frameNumber = util.random(5, 8);
      var input = [];
      flow1._send = function _send() {
        var allFramesSent = input.length >= frameNumber;

        if (!allFramesSent) {
          var chunk = new Buffer(util.random(1000, 100000));
          input.push(chunk);
          this.push({ type: 'DATA', flags: {}, data: chunk });
          return;
        }

        this.push({ type: 'DATA', flags: { END_STREAM: true }, data: new Buffer(0) });
        this.push(null);
      };

      // Receiver side: collect every DATA payload and announce END_STREAM.
      var output = [];
      flow2._receive = function _receive(frame, callback) {
        var isData = frame.type === 'DATA';
        if (isData) {
          output.push(frame.data);
        }

        if (frame.flags.END_STREAM) {
          this.emit('end_stream');
        }

        callback();
      };

      // Checking results: the concatenated payloads must match on both sides.
      flow2.on('end_stream', function() {
        input = util.concat(input);
        output = util.concat(output);

        expect(input).to.deep.equal(output);

        done();
      });

      // Start piping: flow1 -> flow2 -> flow1.
      var receivingEnd = flow1.pipe(flow2);
      receivingEnd.pipe(flow1);
    });
  });
});
|
201 }); |