|
1 var expect = require('chai').expect; |
|
2 var util = require('./util'); |
|
3 |
|
4 var stream = require('../lib/stream'); |
|
5 var Stream = stream.Stream; |
|
6 |
|
7 function createStream() { |
|
8 var stream = new Stream(util.log); |
|
9 stream.upstream._window = Infinity; |
|
10 return stream; |
|
11 } |
|
12 |
|
13 // Execute a list of commands and assertions |
|
14 var recorded_events = ['state', 'error', 'window_update', 'headers', 'promise']; |
|
15 function execute_sequence(stream, sequence, done) { |
|
16 if (!done) { |
|
17 done = sequence; |
|
18 sequence = stream; |
|
19 stream = createStream(); |
|
20 } |
|
21 |
|
22 var outgoing_frames = []; |
|
23 |
|
24 var emit = stream.emit, events = []; |
|
25 stream.emit = function(name) { |
|
26 if (recorded_events.indexOf(name) !== -1) { |
|
27 events.push({ name: name, data: Array.prototype.slice.call(arguments, 1) }); |
|
28 } |
|
29 return emit.apply(this, arguments); |
|
30 }; |
|
31 |
|
32 var commands = [], checks = []; |
|
33 sequence.forEach(function(step) { |
|
34 if ('method' in step || 'incoming' in step || 'outgoing' in step || 'wait' in step || 'set_state' in step) { |
|
35 commands.push(step); |
|
36 } |
|
37 |
|
38 if ('outgoing' in step || 'event' in step || 'active' in step) { |
|
39 checks.push(step); |
|
40 } |
|
41 }); |
|
42 |
|
43 var activeCount = 0; |
|
44 function count_change(change) { |
|
45 activeCount += change; |
|
46 } |
|
47 |
|
48 function execute(callback) { |
|
49 var command = commands.shift(); |
|
50 if (command) { |
|
51 if ('method' in command) { |
|
52 var value = stream[command.method.name].apply(stream, command.method.arguments); |
|
53 if (command.method.ret) { |
|
54 command.method.ret(value); |
|
55 } |
|
56 execute(callback); |
|
57 } else if ('incoming' in command) { |
|
58 command.incoming.count_change = count_change; |
|
59 stream.upstream.write(command.incoming); |
|
60 execute(callback); |
|
61 } else if ('outgoing' in command) { |
|
62 outgoing_frames.push(stream.upstream.read()); |
|
63 execute(callback); |
|
64 } else if ('set_state' in command) { |
|
65 stream.state = command.set_state; |
|
66 execute(callback); |
|
67 } else if ('wait' in command) { |
|
68 setTimeout(execute.bind(null, callback), command.wait); |
|
69 } else { |
|
70 throw new Error('Invalid command', command); |
|
71 } |
|
72 } else { |
|
73 setTimeout(callback, 5); |
|
74 } |
|
75 } |
|
76 |
|
77 function check() { |
|
78 checks.forEach(function(check) { |
|
79 if ('outgoing' in check) { |
|
80 var frame = outgoing_frames.shift(); |
|
81 for (var key in check.outgoing) { |
|
82 expect(frame).to.have.property(key).that.deep.equals(check.outgoing[key]); |
|
83 } |
|
84 count_change(frame.count_change); |
|
85 } else if ('event' in check) { |
|
86 var event = events.shift(); |
|
87 expect(event.name).to.be.equal(check.event.name); |
|
88 check.event.data.forEach(function(data, index) { |
|
89 expect(event.data[index]).to.deep.equal(data); |
|
90 }); |
|
91 } else if ('active' in check) { |
|
92 expect(activeCount).to.be.equal(check.active); |
|
93 } else { |
|
94 throw new Error('Invalid check', check); |
|
95 } |
|
96 }); |
|
97 done(); |
|
98 } |
|
99 |
|
100 setImmediate(execute.bind(null, check)); |
|
101 } |
|
102 |
|
103 var example_frames = [ |
|
104 { type: 'PRIORITY', flags: {}, priority: 1 }, |
|
105 { type: 'WINDOW_UPDATE', flags: {}, settings: {} }, |
|
106 { type: 'RST_STREAM', flags: {}, error: 'CANCEL' }, |
|
107 { type: 'HEADERS', flags: {}, headers: {}, priority: undefined }, |
|
108 { type: 'DATA', flags: {}, data: new Buffer(5) }, |
|
109 { type: 'PUSH_PROMISE', flags: {}, headers: {}, promised_stream: new Stream(util.log) } |
|
110 ]; |
|
111 |
|
112 var invalid_incoming_frames = { |
|
113 IDLE: [ |
|
114 { type: 'DATA', flags: {}, data: new Buffer(5) }, |
|
115 { type: 'PRIORITY', flags: {}, priority: 1 }, |
|
116 { type: 'WINDOW_UPDATE', flags: {}, settings: {} }, |
|
117 { type: 'PUSH_PROMISE', flags: {}, headers: {} }, |
|
118 { type: 'RST_STREAM', flags: {}, error: 'CANCEL' } |
|
119 ], |
|
120 RESERVED_LOCAL: [ |
|
121 { type: 'DATA', flags: {}, data: new Buffer(5) }, |
|
122 { type: 'HEADERS', flags: {}, headers: {}, priority: undefined }, |
|
123 { type: 'PUSH_PROMISE', flags: {}, headers: {} }, |
|
124 { type: 'WINDOW_UPDATE', flags: {}, settings: {} } |
|
125 ], |
|
126 RESERVED_REMOTE: [ |
|
127 { type: 'DATA', flags: {}, data: new Buffer(5) }, |
|
128 { type: 'PRIORITY', flags: {}, priority: 1 }, |
|
129 { type: 'PUSH_PROMISE', flags: {}, headers: {} }, |
|
130 { type: 'WINDOW_UPDATE', flags: {}, settings: {} } |
|
131 ], |
|
132 OPEN: [ |
|
133 ], |
|
134 HALF_CLOSED_LOCAL: [ |
|
135 ], |
|
136 HALF_CLOSED_REMOTE: [ |
|
137 { type: 'DATA', flags: {}, data: new Buffer(5) }, |
|
138 { type: 'HEADERS', flags: {}, headers: {}, priority: undefined }, |
|
139 { type: 'PUSH_PROMISE', flags: {}, headers: {} } |
|
140 ] |
|
141 }; |
|
142 |
|
143 var invalid_outgoing_frames = { |
|
144 IDLE: [ |
|
145 { type: 'DATA', flags: {}, data: new Buffer(5) }, |
|
146 { type: 'PRIORITY', flags: {}, priority: 1 }, |
|
147 { type: 'WINDOW_UPDATE', flags: {}, settings: {} }, |
|
148 { type: 'PUSH_PROMISE', flags: {}, headers: {} } |
|
149 ], |
|
150 RESERVED_LOCAL: [ |
|
151 { type: 'DATA', flags: {}, data: new Buffer(5) }, |
|
152 { type: 'PRIORITY', flags: {}, priority: 1 }, |
|
153 { type: 'PUSH_PROMISE', flags: {}, headers: {} }, |
|
154 { type: 'WINDOW_UPDATE', flags: {}, settings: {} } |
|
155 ], |
|
156 RESERVED_REMOTE: [ |
|
157 { type: 'DATA', flags: {}, data: new Buffer(5) }, |
|
158 { type: 'HEADERS', flags: {}, headers: {}, priority: undefined }, |
|
159 { type: 'PUSH_PROMISE', flags: {}, headers: {} }, |
|
160 { type: 'WINDOW_UPDATE', flags: {}, settings: {} } |
|
161 ], |
|
162 OPEN: [ |
|
163 ], |
|
164 HALF_CLOSED_LOCAL: [ |
|
165 { type: 'DATA', flags: {}, data: new Buffer(5) }, |
|
166 { type: 'HEADERS', flags: {}, headers: {}, priority: undefined }, |
|
167 { type: 'PUSH_PROMISE', flags: {}, headers: {} } |
|
168 ], |
|
169 HALF_CLOSED_REMOTE: [ |
|
170 ], |
|
171 CLOSED: [ |
|
172 { type: 'PRIORITY', flags: {}, priority: 1 }, |
|
173 { type: 'WINDOW_UPDATE', flags: {}, settings: {} }, |
|
174 { type: 'HEADERS', flags: {}, headers: {}, priority: undefined }, |
|
175 { type: 'DATA', flags: {}, data: new Buffer(5) }, |
|
176 { type: 'PUSH_PROMISE', flags: {}, headers: {}, promised_stream: new Stream(util.log) } |
|
177 ] |
|
178 }; |
|
179 |
|
180 describe('stream.js', function() { |
|
181 describe('Stream class', function() { |
|
182 describe('._transition(sending, frame) method', function() { |
|
183 it('should emit error, and answer RST_STREAM for invalid incoming frames', function() { |
|
184 Object.keys(invalid_incoming_frames).forEach(function(state) { |
|
185 invalid_incoming_frames[state].forEach(function(invalid_frame) { |
|
186 var stream = createStream(); |
|
187 stream.state = state; |
|
188 expect(stream._transition.bind(stream, false, invalid_frame)).to.throw('Uncaught, unspecified "error" event.'); |
|
189 }); |
|
190 }); |
|
191 |
|
192 // CLOSED state as a result of incoming END_STREAM (or RST_STREAM) |
|
193 var stream = createStream(); |
|
194 stream.headers({}); |
|
195 stream.end(); |
|
196 stream.upstream.write({ type: 'HEADERS', headers:{}, flags: { END_STREAM: true }, count_change: util.noop }); |
|
197 example_frames.forEach(function(invalid_frame) { |
|
198 invalid_frame.count_change = util.noop; |
|
199 expect(stream._transition.bind(stream, false, invalid_frame)).to.throw('Uncaught, unspecified "error" event.'); |
|
200 }); |
|
201 |
|
202 // CLOSED state as a result of outgoing END_STREAM |
|
203 var stream = createStream(); |
|
204 stream.upstream.write({ type: 'HEADERS', headers:{}, flags: { END_STREAM: true }, count_change: util.noop }); |
|
205 stream.headers({}); |
|
206 stream.end(); |
|
207 example_frames.slice(3).forEach(function(invalid_frame) { |
|
208 invalid_frame.count_change = util.noop; |
|
209 expect(stream._transition.bind(stream, false, invalid_frame)).to.throw('Uncaught, unspecified "error" event.'); |
|
210 }); |
|
211 }); |
|
212 it('should throw exception for invalid outgoing frames', function() { |
|
213 Object.keys(invalid_outgoing_frames).forEach(function(state) { |
|
214 invalid_outgoing_frames[state].forEach(function(invalid_frame) { |
|
215 var stream = createStream(); |
|
216 stream.state = state; |
|
217 expect(stream._transition.bind(stream, true, invalid_frame)).to.throw(Error); |
|
218 }); |
|
219 }); |
|
220 }); |
|
221 it('should close the stream when there\'s an incoming or outgoing RST_STREAM', function() { |
|
222 [ |
|
223 'RESERVED_LOCAL', |
|
224 'RESERVED_REMOTE', |
|
225 'OPEN', |
|
226 'HALF_CLOSED_LOCAL', |
|
227 'HALF_CLOSED_REMOTE' |
|
228 ].forEach(function(state) { |
|
229 [true, false].forEach(function(sending) { |
|
230 var stream = createStream(); |
|
231 stream.state = state; |
|
232 stream._transition(sending, { type: 'RST_STREAM', flags: {} }); |
|
233 expect(stream.state).to.be.equal('CLOSED'); |
|
234 }); |
|
235 }); |
|
236 }); |
|
237 it('should ignore any incoming frame after sending reset', function() { |
|
238 var stream = createStream(); |
|
239 stream.reset(); |
|
240 example_frames.forEach(stream._transition.bind(stream, false)); |
|
241 }); |
|
242 it('should ignore certain incoming frames after closing the stream with END_STREAM', function() { |
|
243 var stream = createStream(); |
|
244 stream.upstream.write({ type: 'HEADERS', flags: { END_STREAM: true }, headers:{} }); |
|
245 stream.headers({}); |
|
246 stream.end(); |
|
247 example_frames.slice(0,3).forEach(function(frame) { |
|
248 frame.count_change = util.noop; |
|
249 stream._transition(false, frame); |
|
250 }); |
|
251 }); |
|
252 }); |
|
253 }); |
|
254 describe('test scenario', function() { |
|
255 describe('sending request', function() { |
|
256 it('should trigger the appropriate state transitions and outgoing frames', function(done) { |
|
257 execute_sequence([ |
|
258 { method : { name: 'headers', arguments: [{ ':path': '/' }] } }, |
|
259 { outgoing: { type: 'HEADERS', flags: { }, headers: { ':path': '/' } } }, |
|
260 { event : { name: 'state', data: ['OPEN'] } }, |
|
261 |
|
262 { wait : 5 }, |
|
263 { method : { name: 'end', arguments: [] } }, |
|
264 { event : { name: 'state', data: ['HALF_CLOSED_LOCAL'] } }, |
|
265 { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: new Buffer(0) } }, |
|
266 |
|
267 { wait : 10 }, |
|
268 { incoming: { type: 'HEADERS', flags: { }, headers: { ':status': 200 } } }, |
|
269 { incoming: { type: 'DATA' , flags: { END_STREAM: true }, data: new Buffer(5) } }, |
|
270 { event : { name: 'headers', data: [{ ':status': 200 }] } }, |
|
271 { event : { name: 'state', data: ['CLOSED'] } }, |
|
272 |
|
273 { active : 0 } |
|
274 ], done); |
|
275 }); |
|
276 }); |
|
277 describe('answering request', function() { |
|
278 it('should trigger the appropriate state transitions and outgoing frames', function(done) { |
|
279 var payload = new Buffer(5); |
|
280 execute_sequence([ |
|
281 { incoming: { type: 'HEADERS', flags: { }, headers: { ':path': '/' } } }, |
|
282 { event : { name: 'state', data: ['OPEN'] } }, |
|
283 { event : { name: 'headers', data: [{ ':path': '/' }] } }, |
|
284 |
|
285 { wait : 5 }, |
|
286 { incoming: { type: 'DATA', flags: { }, data: new Buffer(5) } }, |
|
287 { incoming: { type: 'DATA', flags: { END_STREAM: true }, data: new Buffer(10) } }, |
|
288 { event : { name: 'state', data: ['HALF_CLOSED_REMOTE'] } }, |
|
289 |
|
290 { wait : 5 }, |
|
291 { method : { name: 'headers', arguments: [{ ':status': 200 }] } }, |
|
292 { outgoing: { type: 'HEADERS', flags: { }, headers: { ':status': 200 } } }, |
|
293 |
|
294 { wait : 5 }, |
|
295 { method : { name: 'end', arguments: [payload] } }, |
|
296 { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: payload } }, |
|
297 { event : { name: 'state', data: ['CLOSED'] } }, |
|
298 |
|
299 { active : 0 } |
|
300 ], done); |
|
301 }); |
|
302 }); |
|
303 describe('sending push stream', function() { |
|
304 it('should trigger the appropriate state transitions and outgoing frames', function(done) { |
|
305 var payload = new Buffer(5); |
|
306 var pushStream; |
|
307 |
|
308 execute_sequence([ |
|
309 // receiving request |
|
310 { incoming: { type: 'HEADERS', flags: { END_STREAM: true }, headers: { ':path': '/' } } }, |
|
311 { event : { name: 'state', data: ['OPEN'] } }, |
|
312 { event : { name: 'state', data: ['HALF_CLOSED_REMOTE'] } }, |
|
313 { event : { name: 'headers', data: [{ ':path': '/' }] } }, |
|
314 |
|
315 // sending response headers |
|
316 { wait : 5 }, |
|
317 { method : { name: 'headers', arguments: [{ ':status': '200' }] } }, |
|
318 { outgoing: { type: 'HEADERS', flags: { }, headers: { ':status': '200' } } }, |
|
319 |
|
320 // sending push promise |
|
321 { method : { name: 'promise', arguments: [{ ':path': '/' }], ret: function(str) { pushStream = str; } } }, |
|
322 { outgoing: { type: 'PUSH_PROMISE', flags: { }, headers: { ':path': '/' } } }, |
|
323 |
|
324 // sending response data |
|
325 { method : { name: 'end', arguments: [payload] } }, |
|
326 { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: payload } }, |
|
327 { event : { name: 'state', data: ['CLOSED'] } }, |
|
328 |
|
329 { active : 0 } |
|
330 ], function() { |
|
331 // initial state of the promised stream |
|
332 expect(pushStream.state).to.equal('RESERVED_LOCAL'); |
|
333 |
|
334 execute_sequence(pushStream, [ |
|
335 // push headers |
|
336 { wait : 5 }, |
|
337 { method : { name: 'headers', arguments: [{ ':status': '200' }] } }, |
|
338 { outgoing: { type: 'HEADERS', flags: { }, headers: { ':status': '200' } } }, |
|
339 { event : { name: 'state', data: ['HALF_CLOSED_REMOTE'] } }, |
|
340 |
|
341 // push data |
|
342 { method : { name: 'end', arguments: [payload] } }, |
|
343 { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: payload } }, |
|
344 { event : { name: 'state', data: ['CLOSED'] } }, |
|
345 |
|
346 { active : 1 } |
|
347 ], done); |
|
348 }); |
|
349 }); |
|
350 }); |
|
351 describe('receiving push stream', function() { |
|
352 it('should trigger the appropriate state transitions and outgoing frames', function(done) { |
|
353 var payload = new Buffer(5); |
|
354 var original_stream = createStream(); |
|
355 var promised_stream = createStream(); |
|
356 |
|
357 done = util.callNTimes(2, done); |
|
358 |
|
359 execute_sequence(original_stream, [ |
|
360 // sending request headers |
|
361 { method : { name: 'headers', arguments: [{ ':path': '/' }] } }, |
|
362 { method : { name: 'end', arguments: [] } }, |
|
363 { outgoing: { type: 'HEADERS', flags: { END_STREAM: true }, headers: { ':path': '/' } } }, |
|
364 { event : { name: 'state', data: ['OPEN'] } }, |
|
365 { event : { name: 'state', data: ['HALF_CLOSED_LOCAL'] } }, |
|
366 |
|
367 // receiving response headers |
|
368 { wait : 10 }, |
|
369 { incoming: { type: 'HEADERS', flags: { }, headers: { ':status': 200 } } }, |
|
370 { event : { name: 'headers', data: [{ ':status': 200 }] } }, |
|
371 |
|
372 // receiving push promise |
|
373 { incoming: { type: 'PUSH_PROMISE', flags: { }, headers: { ':path': '/2.html' }, promised_stream: promised_stream } }, |
|
374 { event : { name: 'promise', data: [promised_stream, { ':path': '/2.html' }] } }, |
|
375 |
|
376 // receiving response data |
|
377 { incoming: { type: 'DATA' , flags: { END_STREAM: true }, data: payload } }, |
|
378 { event : { name: 'state', data: ['CLOSED'] } }, |
|
379 |
|
380 { active : 0 } |
|
381 ], done); |
|
382 |
|
383 execute_sequence(promised_stream, [ |
|
384 // initial state of the promised stream |
|
385 { event : { name: 'state', data: ['RESERVED_REMOTE'] } }, |
|
386 |
|
387 // push headers |
|
388 { wait : 10 }, |
|
389 { incoming: { type: 'HEADERS', flags: { END_STREAM: false }, headers: { ':status': 200 } } }, |
|
390 { event : { name: 'state', data: ['HALF_CLOSED_LOCAL'] } }, |
|
391 { event : { name: 'headers', data: [{ ':status': 200 }] } }, |
|
392 |
|
393 // push data |
|
394 { incoming: { type: 'DATA', flags: { END_STREAM: true }, data: payload } }, |
|
395 { event : { name: 'state', data: ['CLOSED'] } }, |
|
396 |
|
397 { active : 0 } |
|
398 ], done); |
|
399 }); |
|
400 }); |
|
401 }); |
|
402 |
|
403 describe('bunyan formatter', function() { |
|
404 describe('`s`', function() { |
|
405 var format = stream.serializers.s; |
|
406 it('should assign a unique ID to each frame', function() { |
|
407 var stream1 = createStream(); |
|
408 var stream2 = createStream(); |
|
409 expect(format(stream1)).to.be.equal(format(stream1)); |
|
410 expect(format(stream2)).to.be.equal(format(stream2)); |
|
411 expect(format(stream1)).to.not.be.equal(format(stream2)); |
|
412 }); |
|
413 }); |
|
414 }); |
|
415 }); |