testing/xpcshell/node-spdy/lib/spdy/parser.js

changeset 1
ca08bd8f51b2
equal deleted inserted replaced
-1:000000000000 0:ba80d314b628
1 var parser = exports;
2
3 var spdy = require('../spdy'),
4 util = require('util'),
5 stream = require('stream'),
6 Buffer = require('buffer').Buffer;
7
// Legacy mode is on when the `stream` module exposes no `Duplex` constructor;
// the `stream` export itself then serves as the parser's base constructor.
var legacy = !stream.Duplex;
var DuplexStream = legacy ? stream : stream.Duplex;
15
//
// ### function Parser (connection)
// #### @connection {spdy.Connection} connection
// SPDY protocol frames parser's @constructor
//
function Parser(connection) {
  DuplexStream.call(this);

  // Flow-control flags maintained by `_write`
  this.drained = true;
  this.paused = false;

  // Queued chunks, their total byte length, and the number of bytes the
  // next parsing step needs (8 is what `execute` requests for a frame head)
  this.buffer = [];
  this.buffered = 0;
  this.waiting = 8;

  this.state = { type: 'frame-head' };
  this.socket = connection.socket;
  this.connection = connection;

  // Stays `null` until `createFramer` installs a framer
  this.framer = null;

  if (legacy) {
    this.readable = this.writable = true;
  }
}
util.inherits(Parser, DuplexStream);
42
//
// ### function create (connection)
// #### @connection {spdy.Connection} connection
// Factory wrapper: builds and returns a new Parser for `connection`
//
parser.create = function create(connection) {
  var instance = new Parser(connection);

  return instance;
};
51
//
// ### function destroy ()
// No-op stub: does nothing and returns `undefined`.
//
Parser.prototype.destroy = function destroy() {
  // Intentionally empty
};
58
//
// ### function _write (data, encoding, cb)
// #### @data {Buffer} chunk of data (`undefined` when re-entered internally)
// #### @encoding {Null} encoding
// #### @cb {Function} callback
// Queues incoming data and, once `this.waiting` bytes are available, hands
// exactly that many bytes to `execute`
//
Parser.prototype._write = function write(data, encoding, cb) {
  // Legacy compatibility: callers may omit the callback
  if (!cb) cb = function() {};

  // Queue the chunk (internal re-entry passes `undefined` and queues nothing)
  if (data !== undefined) {
    this.buffer.push(data);
    this.buffered += data.length;
  }

  // A previous `execute` is still running: report back-pressure (for piping)
  if (this.paused) return false;

  // Not enough bytes yet for the next parsing step
  if (this.buffered < this.waiting) return cb();

  // Fresh data is about to be consumed, so a `drain` will be owed
  if (data !== undefined) this.drained = false;

  var self = this;
  var needed = this.waiting;
  var frame = new Buffer(needed);
  var filled = 0;
  var index = 0;

  // Assemble `needed` bytes from the front of the queue
  while (needed > filled && index < this.buffer.length) {
    var piece = this.buffer[index];
    var room = needed - filled;

    if (piece.length > room) {
      // Chunk is larger than what is still missing: copy its head and leave
      // the remainder at the same queue position for the next step
      piece.copy(frame, filled, 0, room);
      this.buffer[index] = piece.slice(room);
      this.buffered -= room;
      break;
    }

    // Whole chunk fits: copy it and advance past it
    piece.copy(frame, filled);
    filled += piece.length;
    this.buffered -= piece.length;
    index++;
  }

  // Drop fully consumed chunks
  this.buffer = this.buffer.slice(index);

  // Execute parser for the assembled bytes; stay paused until it calls back
  this.paused = true;
  this.execute(this.state, frame, function onExecuted(err, waiting) {
    // Execution finished, accept work again
    self.paused = false;

    // Propagate errors as an `error` event (the write callback gets no error)
    if (err) {
      cb();
      return self.emit('error', err);
    }

    // Size of the next expected piece
    self.waiting = waiting;

    // Enough data already queued: continue parsing right away
    if (self.waiting <= self.buffered) {
      self._write(undefined, null, cb);
      return;
    }

    // Otherwise announce `drain` on the next tick, at most once per cycle
    process.nextTick(function() {
      if (!self.drained) {
        self.drained = true;
        self.emit('drain');
      }
    });

    cb();
  });
};
146
if (legacy) {
  //
  // ### function write (data, encoding, cb)
  // #### @data {Buffer} chunk of data
  // #### @encoding {Null} encoding
  // #### @cb {Function} callback
  // Legacy method: the public `write` is the same function as `_write`
  //
  Parser.prototype.write = Parser.prototype._write;

  //
  // ### function end ()
  // Stream's end() implementation: emits `end` and nothing else
  //
  Parser.prototype.end = function end() {
    this.emit('end');
  };
}
165
//
// ### function createFramer (version)
// #### @version {Number} Protocol version, either 2 or 3
// Installs a framer for `version` on the parser and its connection, or
// emits `error` when `spdy.protocol` has no entry for that version
//
Parser.prototype.createFramer = function createFramer(version) {
  // Unsupported version: report it and leave `this.framer` untouched
  if (!spdy.protocol[version]) {
    var failure = new Error('Unknown protocol version requested: ' + version);

    this.emit('error', failure);
    return;
  }

  this.emit('version', version);

  var Framer = spdy.protocol[version].Framer;
  var deflate = spdy.utils.zwrap(this.connection._deflate);
  var inflate = spdy.utils.zwrap(this.connection._inflate);

  this.framer = new Framer(deflate, inflate);

  // Propagate framer to connection
  this.connection._framer = this.framer;
  this.emit('framer', this.framer);
};
190
//
// ### function execute (state, data, callback)
// #### @state {Object} Parser's state
// #### @data {Buffer} Incoming data
// #### @callback {Function} continuation callback, `(err, waiting)` where
// ####           `waiting` is the byte count needed by the next step
// Parse buffered data: a frame head when `state.type` is 'frame-head',
// a frame body when it is 'frame-body'
//
Parser.prototype.execute = function execute(state, data, callback) {
  var self = this;

  // Finishes a frame: publish it, then switch back to expecting a head
  function onFrame(err, frame) {
    if (err) return callback(err);

    self.emit('frame', frame);

    state.type = 'frame-head';
    callback(null, 8);
  }

  if (state.type === 'frame-head') {
    var header = state.header = spdy.protocol.generic.parseHeader(data);

    // Lazily create framer
    if (!this.framer && header.control) {
      this.createFramer(header.version);
    }

    state.type = 'frame-body';
    callback(null, header.length);
  } else if (state.type === 'frame-body') {
    // Data frame: built directly from the stored header, no framer involved
    if (!state.header.control) {
      return onFrame(null, {
        type: 'DATA',
        id: state.header.id,
        fin: (state.header.flags & 0x01) === 0x01,
        compressed: (state.header.flags & 0x02) === 0x02,
        data: data
      });
    }

    // Control frame. `createFramer` only emits `error` for an unknown
    // version and leaves `this.framer` null; report that through the
    // callback instead of throwing a TypeError on the null framer.
    if (!this.framer) {
      return callback(new Error('No framer available to parse control frame'));
    }

    this.framer.execute(state.header, data, onFrame);
  }
};

mercurial