testing/mochitest/pywebsocket/mod_pywebsocket/_stream_hybi.py

changeset 0
6474c204b198
equal deleted inserted replaced
-1:000000000000 0:63a9ec0be96f
1 # Copyright 2012, Google Inc.
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are
6 # met:
7 #
8 # * Redistributions of source code must retain the above copyright
9 # notice, this list of conditions and the following disclaimer.
10 # * Redistributions in binary form must reproduce the above
11 # copyright notice, this list of conditions and the following disclaimer
12 # in the documentation and/or other materials provided with the
13 # distribution.
14 # * Neither the name of Google Inc. nor the names of its
15 # contributors may be used to endorse or promote products derived from
16 # this software without specific prior written permission.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """This file provides classes and helper functions for parsing/building frames
32 of the WebSocket protocol (RFC 6455).
33
34 Specification:
35 http://tools.ietf.org/html/rfc6455
36 """
37
38
39 from collections import deque
40 import os
41 import struct
42
43 from mod_pywebsocket import common
44 from mod_pywebsocket import util
45 from mod_pywebsocket._stream_base import BadOperationException
46 from mod_pywebsocket._stream_base import ConnectionTerminatedException
47 from mod_pywebsocket._stream_base import InvalidFrameException
48 from mod_pywebsocket._stream_base import InvalidUTF8Exception
49 from mod_pywebsocket._stream_base import StreamBase
50 from mod_pywebsocket._stream_base import UnsupportedFrameException
51
52
53 _NOOP_MASKER = util.NoopMasker()
54
55
56 class Frame(object):
57
58 def __init__(self, fin=1, rsv1=0, rsv2=0, rsv3=0,
59 opcode=None, payload=''):
60 self.fin = fin
61 self.rsv1 = rsv1
62 self.rsv2 = rsv2
63 self.rsv3 = rsv3
64 self.opcode = opcode
65 self.payload = payload
66
67
68 # Helper functions made public to be used for writing unittests for WebSocket
69 # clients.
70
71
72 def create_length_header(length, mask):
73 """Creates a length header.
74
75 Args:
76 length: Frame length. Must be less than 2^63.
77 mask: Mask bit. Must be boolean.
78
79 Raises:
80 ValueError: when bad data is given.
81 """
82
83 if mask:
84 mask_bit = 1 << 7
85 else:
86 mask_bit = 0
87
88 if length < 0:
89 raise ValueError('length must be non negative integer')
90 elif length <= 125:
91 return chr(mask_bit | length)
92 elif length < (1 << 16):
93 return chr(mask_bit | 126) + struct.pack('!H', length)
94 elif length < (1 << 63):
95 return chr(mask_bit | 127) + struct.pack('!Q', length)
96 else:
97 raise ValueError('Payload is too big for one frame')
98
99
100 def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
101 """Creates a frame header.
102
103 Raises:
104 Exception: when bad data is given.
105 """
106
107 if opcode < 0 or 0xf < opcode:
108 raise ValueError('Opcode out of range')
109
110 if payload_length < 0 or (1 << 63) <= payload_length:
111 raise ValueError('payload_length out of range')
112
113 if (fin | rsv1 | rsv2 | rsv3) & ~1:
114 raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')
115
116 header = ''
117
118 first_byte = ((fin << 7)
119 | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
120 | opcode)
121 header += chr(first_byte)
122 header += create_length_header(payload_length, mask)
123
124 return header
125
126
127 def _build_frame(header, body, mask):
128 if not mask:
129 return header + body
130
131 masking_nonce = os.urandom(4)
132 masker = util.RepeatedXorMasker(masking_nonce)
133
134 return header + masking_nonce + masker.mask(body)
135
136
137 def _filter_and_format_frame_object(frame, mask, frame_filters):
138 for frame_filter in frame_filters:
139 frame_filter.filter(frame)
140
141 header = create_header(
142 frame.opcode, len(frame.payload), frame.fin,
143 frame.rsv1, frame.rsv2, frame.rsv3, mask)
144 return _build_frame(header, frame.payload, mask)
145
146
147 def create_binary_frame(
148 message, opcode=common.OPCODE_BINARY, fin=1, mask=False, frame_filters=[]):
149 """Creates a simple binary frame with no extension, reserved bit."""
150
151 frame = Frame(fin=fin, opcode=opcode, payload=message)
152 return _filter_and_format_frame_object(frame, mask, frame_filters)
153
154
155 def create_text_frame(
156 message, opcode=common.OPCODE_TEXT, fin=1, mask=False, frame_filters=[]):
157 """Creates a simple text frame with no extension, reserved bit."""
158
159 encoded_message = message.encode('utf-8')
160 return create_binary_frame(encoded_message, opcode, fin, mask,
161 frame_filters)
162
163
164 class FragmentedFrameBuilder(object):
165 """A stateful class to send a message as fragments."""
166
167 def __init__(self, mask, frame_filters=[]):
168 """Constructs an instance."""
169
170 self._mask = mask
171 self._frame_filters = frame_filters
172
173 self._started = False
174
175 # Hold opcode of the first frame in messages to verify types of other
176 # frames in the message are all the same.
177 self._opcode = common.OPCODE_TEXT
178
179 def build(self, message, end, binary):
180 if binary:
181 frame_type = common.OPCODE_BINARY
182 else:
183 frame_type = common.OPCODE_TEXT
184 if self._started:
185 if self._opcode != frame_type:
186 raise ValueError('Message types are different in frames for '
187 'the same message')
188 opcode = common.OPCODE_CONTINUATION
189 else:
190 opcode = frame_type
191 self._opcode = frame_type
192
193 if end:
194 self._started = False
195 fin = 1
196 else:
197 self._started = True
198 fin = 0
199
200 if binary:
201 return create_binary_frame(
202 message, opcode, fin, self._mask, self._frame_filters)
203 else:
204 return create_text_frame(
205 message, opcode, fin, self._mask, self._frame_filters)
206
207
208 def _create_control_frame(opcode, body, mask, frame_filters):
209 frame = Frame(opcode=opcode, payload=body)
210
211 return _filter_and_format_frame_object(frame, mask, frame_filters)
212
213
214 def create_ping_frame(body, mask=False, frame_filters=[]):
215 return _create_control_frame(common.OPCODE_PING, body, mask, frame_filters)
216
217
218 def create_pong_frame(body, mask=False, frame_filters=[]):
219 return _create_control_frame(common.OPCODE_PONG, body, mask, frame_filters)
220
221
222 def create_close_frame(body, mask=False, frame_filters=[]):
223 return _create_control_frame(
224 common.OPCODE_CLOSE, body, mask, frame_filters)
225
226
227 class StreamOptions(object):
228 """Holds option values to configure Stream objects."""
229
230 def __init__(self):
231 """Constructs StreamOptions."""
232
233 # Enables deflate-stream extension.
234 self.deflate_stream = False
235
236 # Filters applied to frames.
237 self.outgoing_frame_filters = []
238 self.incoming_frame_filters = []
239
240 self.mask_send = False
241 self.unmask_receive = True
242
243
244 class Stream(StreamBase):
245 """A class for parsing/building frames of the WebSocket protocol
246 (RFC 6455).
247 """
248
249 def __init__(self, request, options):
250 """Constructs an instance.
251
252 Args:
253 request: mod_python request.
254 """
255
256 StreamBase.__init__(self, request)
257
258 self._logger = util.get_class_logger(self)
259
260 self._options = options
261
262 if self._options.deflate_stream:
263 self._logger.debug('Setup filter for deflate-stream')
264 self._request = util.DeflateRequest(self._request)
265
266 self._request.client_terminated = False
267 self._request.server_terminated = False
268
269 # Holds body of received fragments.
270 self._received_fragments = []
271 # Holds the opcode of the first fragment.
272 self._original_opcode = None
273
274 self._writer = FragmentedFrameBuilder(
275 self._options.mask_send, self._options.outgoing_frame_filters)
276
277 self._ping_queue = deque()
278
279 def _receive_frame(self):
280 """Receives a frame and return data in the frame as a tuple containing
281 each header field and payload separately.
282
283 Raises:
284 ConnectionTerminatedException: when read returns empty
285 string.
286 InvalidFrameException: when the frame contains invalid data.
287 """
288
289 received = self.receive_bytes(2)
290
291 first_byte = ord(received[0])
292 fin = (first_byte >> 7) & 1
293 rsv1 = (first_byte >> 6) & 1
294 rsv2 = (first_byte >> 5) & 1
295 rsv3 = (first_byte >> 4) & 1
296 opcode = first_byte & 0xf
297
298 second_byte = ord(received[1])
299 mask = (second_byte >> 7) & 1
300 payload_length = second_byte & 0x7f
301
302 if (mask == 1) != self._options.unmask_receive:
303 raise InvalidFrameException(
304 'Mask bit on the received frame did\'nt match masking '
305 'configuration for received frames')
306
307 # The Hybi-13 and later specs disallow putting a value in 0x0-0xFFFF
308 # into the 8-octet extended payload length field (or 0x0-0xFD in
309 # 2-octet field).
310 valid_length_encoding = True
311 length_encoding_bytes = 1
312 if payload_length == 127:
313 extended_payload_length = self.receive_bytes(8)
314 payload_length = struct.unpack(
315 '!Q', extended_payload_length)[0]
316 if payload_length > 0x7FFFFFFFFFFFFFFF:
317 raise InvalidFrameException(
318 'Extended payload length >= 2^63')
319 if self._request.ws_version >= 13 and payload_length < 0x10000:
320 valid_length_encoding = False
321 length_encoding_bytes = 8
322 elif payload_length == 126:
323 extended_payload_length = self.receive_bytes(2)
324 payload_length = struct.unpack(
325 '!H', extended_payload_length)[0]
326 if self._request.ws_version >= 13 and payload_length < 126:
327 valid_length_encoding = False
328 length_encoding_bytes = 2
329
330 if not valid_length_encoding:
331 self._logger.warning(
332 'Payload length is not encoded using the minimal number of '
333 'bytes (%d is encoded using %d bytes)',
334 payload_length,
335 length_encoding_bytes)
336
337 if mask == 1:
338 masking_nonce = self.receive_bytes(4)
339 masker = util.RepeatedXorMasker(masking_nonce)
340 else:
341 masker = _NOOP_MASKER
342
343 bytes = masker.mask(self.receive_bytes(payload_length))
344
345 return opcode, bytes, fin, rsv1, rsv2, rsv3
346
347 def _receive_frame_as_frame_object(self):
348 opcode, bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()
349
350 return Frame(fin=fin, rsv1=rsv1, rsv2=rsv2, rsv3=rsv3,
351 opcode=opcode, payload=bytes)
352
353 def send_message(self, message, end=True, binary=False):
354 """Send message.
355
356 Args:
357 message: text in unicode or binary in str to send.
358 binary: send message as binary frame.
359
360 Raises:
361 BadOperationException: when called on a server-terminated
362 connection or called with inconsistent message type or
363 binary parameter.
364 """
365
366 if self._request.server_terminated:
367 raise BadOperationException(
368 'Requested send_message after sending out a closing handshake')
369
370 if binary and isinstance(message, unicode):
371 raise BadOperationException(
372 'Message for binary frame must be instance of str')
373
374 try:
375 self._write(self._writer.build(message, end, binary))
376 except ValueError, e:
377 raise BadOperationException(e)
378
379 def receive_message(self):
380 """Receive a WebSocket frame and return its payload as a text in
381 unicode or a binary in str.
382
383 Returns:
384 payload data of the frame
385 - as unicode instance if received text frame
386 - as str instance if received binary frame
387 or None iff received closing handshake.
388 Raises:
389 BadOperationException: when called on a client-terminated
390 connection.
391 ConnectionTerminatedException: when read returns empty
392 string.
393 InvalidFrameException: when the frame contains invalid
394 data.
395 UnsupportedFrameException: when the received frame has
396 flags, opcode we cannot handle. You can ignore this
397 exception and continue receiving the next frame.
398 """
399
400 if self._request.client_terminated:
401 raise BadOperationException(
402 'Requested receive_message after receiving a closing '
403 'handshake')
404
405 while True:
406 # mp_conn.read will block if no bytes are available.
407 # Timeout is controlled by TimeOut directive of Apache.
408
409 frame = self._receive_frame_as_frame_object()
410
411 for frame_filter in self._options.incoming_frame_filters:
412 frame_filter.filter(frame)
413
414 if frame.rsv1 or frame.rsv2 or frame.rsv3:
415 raise UnsupportedFrameException(
416 'Unsupported flag is set (rsv = %d%d%d)' %
417 (frame.rsv1, frame.rsv2, frame.rsv3))
418
419 if frame.opcode == common.OPCODE_CONTINUATION:
420 if not self._received_fragments:
421 if frame.fin:
422 raise InvalidFrameException(
423 'Received a termination frame but fragmentation '
424 'not started')
425 else:
426 raise InvalidFrameException(
427 'Received an intermediate frame but '
428 'fragmentation not started')
429
430 if frame.fin:
431 # End of fragmentation frame
432 self._received_fragments.append(frame.payload)
433 message = ''.join(self._received_fragments)
434 self._received_fragments = []
435 else:
436 # Intermediate frame
437 self._received_fragments.append(frame.payload)
438 continue
439 else:
440 if self._received_fragments:
441 if frame.fin:
442 raise InvalidFrameException(
443 'Received an unfragmented frame without '
444 'terminating existing fragmentation')
445 else:
446 raise InvalidFrameException(
447 'New fragmentation started without terminating '
448 'existing fragmentation')
449
450 if frame.fin:
451 # Unfragmented frame
452
453 if (common.is_control_opcode(frame.opcode) and
454 len(frame.payload) > 125):
455 raise InvalidFrameException(
456 'Application data size of control frames must be '
457 '125 bytes or less')
458
459 self._original_opcode = frame.opcode
460 message = frame.payload
461 else:
462 # Start of fragmentation frame
463
464 if common.is_control_opcode(frame.opcode):
465 raise InvalidFrameException(
466 'Control frames must not be fragmented')
467
468 self._original_opcode = frame.opcode
469 self._received_fragments.append(frame.payload)
470 continue
471
472 if self._original_opcode == common.OPCODE_TEXT:
473 # The WebSocket protocol section 4.4 specifies that invalid
474 # characters must be replaced with U+fffd REPLACEMENT
475 # CHARACTER.
476 try:
477 return message.decode('utf-8')
478 except UnicodeDecodeError, e:
479 raise InvalidUTF8Exception(e)
480 elif self._original_opcode == common.OPCODE_BINARY:
481 return message
482 elif self._original_opcode == common.OPCODE_CLOSE:
483 self._request.client_terminated = True
484
485 # Status code is optional. We can have status reason only if we
486 # have status code. Status reason can be empty string. So,
487 # allowed cases are
488 # - no application data: no code no reason
489 # - 2 octet of application data: has code but no reason
490 # - 3 or more octet of application data: both code and reason
491 if len(message) == 0:
492 self._logger.debug('Received close frame (empty body)')
493 self._request.ws_close_code = (
494 common.STATUS_NO_STATUS_RECEIVED)
495 elif len(message) == 1:
496 raise InvalidFrameException(
497 'If a close frame has status code, the length of '
498 'status code must be 2 octet')
499 elif len(message) >= 2:
500 self._request.ws_close_code = struct.unpack(
501 '!H', message[0:2])[0]
502 self._request.ws_close_reason = message[2:].decode(
503 'utf-8', 'replace')
504 self._logger.debug(
505 'Received close frame (code=%d, reason=%r)',
506 self._request.ws_close_code,
507 self._request.ws_close_reason)
508
509 # Drain junk data after the close frame if necessary.
510 self._drain_received_data()
511
512 if self._request.server_terminated:
513 self._logger.debug(
514 'Received ack for server-initiated closing handshake')
515 return None
516
517 self._logger.debug(
518 'Received client-initiated closing handshake')
519
520 code = common.STATUS_NORMAL_CLOSURE
521 reason = ''
522 if hasattr(self._request, '_dispatcher'):
523 dispatcher = self._request._dispatcher
524 code, reason = dispatcher.passive_closing_handshake(
525 self._request)
526 if code is None and reason is not None and len(reason) > 0:
527 self._logger.warning(
528 'Handler specified reason despite code being None')
529 reason = ''
530 if reason is None:
531 reason = ''
532 self._send_closing_handshake(code, reason)
533 self._logger.debug(
534 'Sent ack for client-initiated closing handshake '
535 '(code=%r, reason=%r)', code, reason)
536 return None
537 elif self._original_opcode == common.OPCODE_PING:
538 try:
539 handler = self._request.on_ping_handler
540 if handler:
541 handler(self._request, message)
542 continue
543 except AttributeError, e:
544 pass
545 self._send_pong(message)
546 elif self._original_opcode == common.OPCODE_PONG:
547 # TODO(tyoshino): Add ping timeout handling.
548
549 inflight_pings = deque()
550
551 while True:
552 try:
553 expected_body = self._ping_queue.popleft()
554 if expected_body == message:
555 # inflight_pings contains pings ignored by the
556 # other peer. Just forget them.
557 self._logger.debug(
558 'Ping %r is acked (%d pings were ignored)',
559 expected_body, len(inflight_pings))
560 break
561 else:
562 inflight_pings.append(expected_body)
563 except IndexError, e:
564 # The received pong was unsolicited pong. Keep the
565 # ping queue as is.
566 self._ping_queue = inflight_pings
567 self._logger.debug('Received a unsolicited pong')
568 break
569
570 try:
571 handler = self._request.on_pong_handler
572 if handler:
573 handler(self._request, message)
574 continue
575 except AttributeError, e:
576 pass
577
578 continue
579 else:
580 raise UnsupportedFrameException(
581 'Opcode %d is not supported' % self._original_opcode)
582
583 def _send_closing_handshake(self, code, reason):
584 body = ''
585 if code is not None:
586 if code >= (1 << 16) or code < 0:
587 raise BadOperationException('Status code is out of range')
588 encoded_reason = reason.encode('utf-8')
589 if len(encoded_reason) + 2 > 125:
590 raise BadOperationException(
591 'Application data size of close frames must be 125 bytes '
592 'or less')
593 body = struct.pack('!H', code) + encoded_reason
594
595 frame = create_close_frame(
596 body,
597 self._options.mask_send,
598 self._options.outgoing_frame_filters)
599
600 self._request.server_terminated = True
601
602 self._write(frame)
603
604 def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
605 """Closes a WebSocket connection.
606
607 Args:
608 code: Status code for close frame. If code is None, a close
609 frame with empty body will be sent.
610 reason: string representing close reason.
611 Raises:
612 BadOperationException: when reason is specified with code None
613 or reason is not an instance of both str and unicode.
614 """
615
616 if self._request.server_terminated:
617 self._logger.debug(
618 'Requested close_connection but server is already terminated')
619 return
620
621 if code is None:
622 if reason is not None and len(reason) > 0:
623 raise BadOperationException(
624 'close reason must not be specified if code is None')
625 reason = ''
626 else:
627 if not isinstance(reason, str) and not isinstance(reason, unicode):
628 raise BadOperationException(
629 'close reason must be an instance of str or unicode')
630
631 self._send_closing_handshake(code, reason)
632 self._logger.debug(
633 'Sent server-initiated closing handshake (code=%r, reason=%r)',
634 code, reason)
635
636 if (code == common.STATUS_GOING_AWAY or
637 code == common.STATUS_PROTOCOL_ERROR):
638 # It doesn't make sense to wait for a close frame if the reason is
639 # protocol error or that the server is going away. For some of
640 # other reasons, it might not make sense to wait for a close frame,
641 # but it's not clear, yet.
642 return
643
644 # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
645 # or until a server-defined timeout expires.
646 #
647 # For now, we expect receiving closing handshake right after sending
648 # out closing handshake.
649 message = self.receive_message()
650 if message is not None:
651 raise ConnectionTerminatedException(
652 'Didn\'t receive valid ack for closing handshake')
653 # TODO: 3. close the WebSocket connection.
654 # note: mod_python Connection (mp_conn) doesn't have close method.
655
656 def send_ping(self, body=''):
657 if len(body) > 125:
658 raise ValueError(
659 'Application data size of control frames must be 125 bytes or '
660 'less')
661 frame = create_ping_frame(
662 body,
663 self._options.mask_send,
664 self._options.outgoing_frame_filters)
665 self._write(frame)
666
667 self._ping_queue.append(body)
668
669 def _send_pong(self, body):
670 if len(body) > 125:
671 raise ValueError(
672 'Application data size of control frames must be 125 bytes or '
673 'less')
674 frame = create_pong_frame(
675 body,
676 self._options.mask_send,
677 self._options.outgoing_frame_filters)
678 self._write(frame)
679
680 def _drain_received_data(self):
681 """Drains unread data in the receive buffer to avoid sending out TCP
682 RST packet. This is because when deflate-stream is enabled, some
683 DEFLATE block for flushing data may follow a close frame. If any data
684 remains in the receive buffer of a socket when the socket is closed,
685 it sends out TCP RST packet to the other peer.
686
687 Since mod_python's mp_conn object doesn't support non-blocking read,
688 we perform this only when pywebsocket is running in standalone mode.
689 """
690
691 # If self._options.deflate_stream is true, self._request is
692 # DeflateRequest, so we can get wrapped request object by
693 # self._request._request.
694 #
695 # Only _StandaloneRequest has _drain_received_data method.
696 if (self._options.deflate_stream and
697 ('_drain_received_data' in dir(self._request._request))):
698 self._request._request._drain_received_data()
699
700
701 # vi:sts=4 sw=4 et

mercurial