// Everything this module exposes is attached to `exports` through this alias.
var parser = exports;

var spdy = require('../spdy');
var util = require('util');
var stream = require('stream');
var Buffer = require('buffer').Buffer;

// `legacy` is true when the `stream` module exposes no `Duplex` class. In that
// case the `stream` export itself is used as the base class, otherwise
// `stream.Duplex` is.
var legacy = !stream.Duplex;
var DuplexStream = legacy ? stream : stream.Duplex;
//
// ### function Parser (connection)
// #### @connection {spdy.Connection} connection
// SPDY protocol frames parser's @constructor
//
// Runs the base stream constructor and sets up the parser's bookkeeping.
// The parser starts in the `frame-head` state, waiting for 8 bytes.
//
function Parser(connection) {
  DuplexStream.call(this);

  // Flow-control flags consulted by `_write`
  this.drained = true;
  this.paused = false;

  // Chunks received but not yet consumed, and their combined byte length
  this.buffer = [];
  this.buffered = 0;

  // Number of bytes that must be buffered before the next parsing step
  this.waiting = 8;

  this.state = { type: 'frame-head' };
  this.socket = connection.socket;
  this.connection = connection;

  // No framer until `createFramer` installs one
  this.framer = null;

  // The old-style base class does not set these flags itself
  if (legacy) {
    this.readable = this.writable = true;
  }
}
util.inherits(Parser, DuplexStream);
//
// ### function create (connection)
// #### @connection {spdy.Connection} connection
// Factory wrapper: builds a new Parser for `connection` and returns it
//
parser.create = function create(connection) {
  var instance = new Parser(connection);

  return instance;
};
//
// ### function destroy ()
// Stub: takes no arguments, does nothing and returns undefined.
//
Parser.prototype.destroy = function destroy() {
  // Intentionally empty
};
//
// ### function _write (data, encoding, cb)
// #### @data {Buffer} chunk of data (or `undefined` to re-run on buffered data)
// #### @encoding {Null} encoding (unused)
// #### @cb {Function} callback (optional)
// Buffers incoming data and, once `this.waiting` bytes are available, hands
// exactly that many bytes to `execute`. Returns `false` while a previous
// `execute` call is still in flight.
//
Parser.prototype._write = function write(data, encoding, cb) {
  var self = this;

  // Legacy compatibility: the callback may be omitted
  var done = cb || function() {};
  var hasData = data !== undefined;

  // Queue the chunk
  if (hasData) {
    this.buffer.push(data);
    this.buffered += data.length;
  }

  // Still executing a previous step: signal the caller (for piping)
  if (this.paused) {
    return false;
  }

  // Not enough bytes yet for the next step
  if (this.buffered < this.waiting) {
    return done();
  }

  // Fresh data is about to be processed, so the parser is no longer drained
  if (hasData) {
    this.drained = false;
  }

  var needed = this.waiting;
  var assembled = new Buffer(needed);
  var filled = 0;
  var used = 0;

  // Gather exactly `needed` bytes from the front of the queue
  while (filled < needed && used < this.buffer.length) {
    var piece = this.buffer[used];
    var room = needed - filled;

    if (piece.length > room) {
      // Only part of this chunk is required: take `room` bytes and leave the
      // remainder in the same queue slot for the next step
      piece.copy(assembled, filled, 0, room);
      this.buffer[used] = piece.slice(room);
      this.buffered -= room;
      break;
    }

    // The whole chunk fits
    piece.copy(assembled, filled);
    filled += piece.length;
    this.buffered -= piece.length;
    used++;
  }

  // Drop fully consumed chunks
  this.buffer = this.buffer.slice(used);

  // Block re-entrance until `execute` reports back
  this.paused = true;
  this.execute(this.state, assembled, function (err, waiting) {
    self.paused = false;

    // Propagate errors
    if (err) {
      done();
      return self.emit('error', err);
    }

    // Byte count required by the next step
    self.waiting = waiting;

    // Enough data is already queued: process it right away
    if (self.waiting <= self.buffered) {
      self._write(undefined, null, done);
      return;
    }

    // Otherwise announce on the next tick that more data can be accepted
    process.nextTick(function() {
      if (!self.drained) {
        self.drained = true;
        self.emit('drain');
      }
    });

    done();
  });
};
if (legacy) {
  (function installLegacyApi(proto) {
    //
    // ### function write (data, encoding, cb)
    // #### @data {Buffer} chunk of data
    // #### @encoding {Null} encoding
    // #### @cb {Function} callback
    // Legacy method: the very same function as `_write`
    //
    proto.write = proto._write;

    //
    // ### function end ()
    // Stream's end() implementation: emits `end`
    //
    proto.end = function end() {
      this.emit('end');
    };
  })(Parser.prototype);
}
//
// ### function createFramer (version)
// #### @version {Number} Protocol version, either 2 or 3
// Instantiates the framer for `version`, stores it on the parser and on the
// connection, and emits `version` and `framer`. Emits `error` instead when
// `spdy.protocol` has no entry for `version`.
//
Parser.prototype.createFramer = function createFramer(version) {
  // Unsupported version: report it and leave `this.framer` untouched
  if (!spdy.protocol[version]) {
    this.emit(
      'error',
      new Error('Unknown protocol version requested: ' + version)
    );
    return;
  }

  this.emit('version', version);

  var Framer = spdy.protocol[version].Framer;
  var deflate = spdy.utils.zwrap(this.connection._deflate);
  var inflate = spdy.utils.zwrap(this.connection._inflate);

  this.framer = new Framer(deflate, inflate);

  // Propagate framer to connection
  this.connection._framer = this.framer;
  this.emit('framer', this.framer);
};
//
// ### function execute (state, data, callback)
// #### @state {Object} Parser's state (`type` is 'frame-head' or 'frame-body')
// #### @data {Buffer} Incoming data
// #### @callback {Function} continuation callback, called as (err, waiting)
// Parse buffered data. `waiting` is the number of bytes the next step needs:
// the header's `length` after a frame head, 8 after a frame body.
//
Parser.prototype.execute = function execute(state, data, callback) {
  var self = this;

  // Completion handler shared by data and control frames
  function onFrame(err, frame) {
    if (err) return callback(err);

    self.emit('frame', frame);

    state.type = 'frame-head';
    callback(null, 8);
  }

  if (state.type === 'frame-head') {
    var header = state.header = spdy.protocol.generic.parseHeader(data);

    // Lazily create framer
    if (!this.framer && header.control) {
      // An unsupported version would leave `this.framer` null and make the
      // control-frame branch below throw a TypeError on the frame body.
      // Report it through the callback instead and stay in `frame-head`.
      if (!spdy.protocol[header.version]) {
        return callback(
          new Error('Unknown protocol version requested: ' + header.version)
        );
      }

      this.createFramer(header.version);
    }

    state.type = 'frame-body';
    callback(null, header.length);
  } else if (state.type === 'frame-body') {
    // Data frame
    if (!state.header.control) {
      return onFrame(null, {
        type: 'DATA',
        id: state.header.id,
        fin: (state.header.flags & 0x01) === 0x01,
        compressed: (state.header.flags & 0x02) === 0x02,
        data: data
      });
    }

    // Control frame
    this.framer.execute(state.header, data, onFrame);
  }
};