testing/mochitest/pywebsocket/mod_pywebsocket/_stream_hybi.py

Thu, 22 Jan 2015 13:21:57 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Thu, 22 Jan 2015 13:21:57 +0100
branch
TOR_BUG_9701
changeset 15
b8a032363ba2
permissions
-rw-r--r--

Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6

michael@0 1 # Copyright 2012, Google Inc.
michael@0 2 # All rights reserved.
michael@0 3 #
michael@0 4 # Redistribution and use in source and binary forms, with or without
michael@0 5 # modification, are permitted provided that the following conditions are
michael@0 6 # met:
michael@0 7 #
michael@0 8 # * Redistributions of source code must retain the above copyright
michael@0 9 # notice, this list of conditions and the following disclaimer.
michael@0 10 # * Redistributions in binary form must reproduce the above
michael@0 11 # copyright notice, this list of conditions and the following disclaimer
michael@0 12 # in the documentation and/or other materials provided with the
michael@0 13 # distribution.
michael@0 14 # * Neither the name of Google Inc. nor the names of its
michael@0 15 # contributors may be used to endorse or promote products derived from
michael@0 16 # this software without specific prior written permission.
michael@0 17 #
michael@0 18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
michael@0 19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
michael@0 20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
michael@0 21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
michael@0 22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
michael@0 23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
michael@0 24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
michael@0 25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
michael@0 26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
michael@0 27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
michael@0 28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
michael@0 29
michael@0 30
michael@0 31 """This file provides classes and helper functions for parsing/building frames
michael@0 32 of the WebSocket protocol (RFC 6455).
michael@0 33
michael@0 34 Specification:
michael@0 35 http://tools.ietf.org/html/rfc6455
michael@0 36 """
michael@0 37
michael@0 38
michael@0 39 from collections import deque
michael@0 40 import os
michael@0 41 import struct
michael@0 42
michael@0 43 from mod_pywebsocket import common
michael@0 44 from mod_pywebsocket import util
michael@0 45 from mod_pywebsocket._stream_base import BadOperationException
michael@0 46 from mod_pywebsocket._stream_base import ConnectionTerminatedException
michael@0 47 from mod_pywebsocket._stream_base import InvalidFrameException
michael@0 48 from mod_pywebsocket._stream_base import InvalidUTF8Exception
michael@0 49 from mod_pywebsocket._stream_base import StreamBase
michael@0 50 from mod_pywebsocket._stream_base import UnsupportedFrameException
michael@0 51
michael@0 52
# Shared masker instance that Stream._receive_frame uses in place of a
# RepeatedXorMasker when a received frame does not have its mask bit set.
_NOOP_MASKER = util.NoopMasker()
michael@0 54
michael@0 55
class Frame(object):
    """Plain container for the header fields and payload of one frame."""

    def __init__(self, fin=1, rsv1=0, rsv2=0, rsv3=0,
                 opcode=None, payload=''):
        """Stores the given values on the instance without validation.

        Args:
            fin: value for the FIN bit.
            rsv1: value for the first reserved bit.
            rsv2: value for the second reserved bit.
            rsv3: value for the third reserved bit.
            opcode: frame opcode.
            payload: frame payload.
        """

        self.opcode = opcode
        self.payload = payload
        self.fin = fin
        self.rsv1, self.rsv2, self.rsv3 = rsv1, rsv2, rsv3
michael@0 66
michael@0 67
michael@0 68 # Helper functions made public to be used for writing unittests for WebSocket
michael@0 69 # clients.
michael@0 70
michael@0 71
def create_length_header(length, mask):
    """Creates a length header.

    The header starts with one byte holding the mask bit (most significant
    bit) and a 7-bit length field. Lengths up to 125 are stored directly in
    that field; larger lengths store 126 or 127 there and are followed by a
    2-byte or 8-byte length in network byte order, respectively.

    Args:
        length: Frame length. Must be less than 2^63.
        mask: Mask bit. Must be boolean.

    Returns:
        The encoded length header as a byte string.

    Raises:
        ValueError: when bad data is given.
    """

    if length < 0:
        raise ValueError('length must be non negative integer')
    if length >= (1 << 63):
        raise ValueError('Payload is too big for one frame')

    mask_bit = (1 << 7) if mask else 0

    # The first byte is produced by struct.pack as well (instead of chr()) so
    # that the whole header is one byte string regardless of what type chr()
    # returns on the running interpreter.
    if length <= 125:
        return struct.pack('!B', mask_bit | length)
    if length < (1 << 16):
        return struct.pack('!BH', mask_bit | 126, length)
    return struct.pack('!BQ', mask_bit | 127, length)
michael@0 98
michael@0 99
def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
    """Creates a frame header.

    The header is the first byte (FIN bit, three reserved bits and the 4-bit
    opcode) followed by the length header produced by create_length_header.

    Args:
        opcode: Frame opcode. Must be in the range 0 to 0xf.
        payload_length: Payload length. Must be non-negative and less than
            2^63.
        fin: FIN bit. Must be 0 or 1.
        rsv1: First reserved bit. Must be 0 or 1.
        rsv2: Second reserved bit. Must be 0 or 1.
        rsv3: Third reserved bit. Must be 0 or 1.
        mask: Mask bit, passed to create_length_header.

    Returns:
        The encoded frame header.

    Raises:
        ValueError: when bad data is given.
    """

    if opcode < 0 or 0xf < opcode:
        raise ValueError('Opcode out of range')

    if payload_length < 0 or (1 << 63) <= payload_length:
        raise ValueError('payload_length out of range')

    # Any bit other than the lowest one being set in any of the four flags
    # means that flag is neither 0 nor 1.
    if (fin | rsv1 | rsv2 | rsv3) & ~1:
        raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')

    first_byte = ((fin << 7)
                  | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
                  | opcode)

    # struct.pack (rather than chr()) keeps the first byte a byte string so
    # it can always be concatenated with the packed length header.
    return (struct.pack('!B', first_byte) +
            create_length_header(payload_length, mask))
michael@0 125
michael@0 126
michael@0 127 def _build_frame(header, body, mask):
michael@0 128 if not mask:
michael@0 129 return header + body
michael@0 130
michael@0 131 masking_nonce = os.urandom(4)
michael@0 132 masker = util.RepeatedXorMasker(masking_nonce)
michael@0 133
michael@0 134 return header + masking_nonce + masker.mask(body)
michael@0 135
michael@0 136
def _filter_and_format_frame_object(frame, mask, frame_filters):
    """Runs frame through every filter, then serializes it.

    Each filter's filter() method is called with the frame, in order. The
    header is built by create_header from the frame's fields as they are
    after filtering, and _build_frame combines it with the payload.
    """
    for current_filter in frame_filters:
        current_filter.filter(frame)

    payload_length = len(frame.payload)
    frame_header = create_header(
        frame.opcode, payload_length, frame.fin,
        frame.rsv1, frame.rsv2, frame.rsv3, mask)

    return _build_frame(frame_header, frame.payload, mask)
michael@0 145
michael@0 146
def create_binary_frame(
    message, opcode=common.OPCODE_BINARY, fin=1, mask=False, frame_filters=[]):
    """Builds a frame whose payload is message and whose reserved bits keep
    the Frame defaults, after applying frame_filters to it.
    """

    return _filter_and_format_frame_object(
        Frame(payload=message, opcode=opcode, fin=fin), mask, frame_filters)
michael@0 153
michael@0 154
def create_text_frame(
    message, opcode=common.OPCODE_TEXT, fin=1, mask=False, frame_filters=[]):
    """Encodes message as UTF-8 and builds a frame from the result using
    create_binary_frame with the given opcode, fin, mask and filters.
    """

    return create_binary_frame(
        message.encode('utf-8'), opcode=opcode, fin=fin, mask=mask,
        frame_filters=frame_filters)
michael@0 162
michael@0 163
class FragmentedFrameBuilder(object):
    """A stateful class to send a message as fragments."""

    def __init__(self, mask, frame_filters=None):
        """Constructs an instance.

        Args:
            mask: mask argument passed to the frame creation functions for
                every frame built.
            frame_filters: list of filters passed to the frame creation
                functions. The given list object is kept by reference. When
                omitted or None, a new empty list is used.
        """

        self._mask = mask

        # A None default is used instead of a literal [] because the list is
        # stored on the instance; a literal default would be one list object
        # shared by every builder constructed without filters.
        if frame_filters is None:
            frame_filters = []
        self._frame_filters = frame_filters

        # True while a fragmented message has been started but not finished.
        self._started = False

        # Hold opcode of the first frame in messages to verify types of other
        # frames in the message are all the same.
        self._opcode = common.OPCODE_TEXT

    def build(self, message, end, binary):
        """Builds the next frame of the current message.

        Args:
            message: payload for this frame.
            end: true if this frame is the last one of the message.
            binary: true to build a binary frame, false for a text frame.

        Returns:
            The serialized frame.

        Raises:
            ValueError: when binary differs from the value used for the
                first frame of the message in progress.
        """

        if binary:
            frame_type = common.OPCODE_BINARY
            create_frame = create_binary_frame
        else:
            frame_type = common.OPCODE_TEXT
            create_frame = create_text_frame

        if self._started:
            if self._opcode != frame_type:
                raise ValueError('Message types are different in frames for '
                                 'the same message')
            # Every frame after the first one is a continuation frame.
            opcode = common.OPCODE_CONTINUATION
        else:
            opcode = frame_type
            self._opcode = frame_type

        if end:
            self._started = False
            fin = 1
        else:
            self._started = True
            fin = 0

        return create_frame(
            message, opcode, fin, self._mask, self._frame_filters)
michael@0 206
michael@0 207
def _create_control_frame(opcode, body, mask, frame_filters):
    """Builds a frame with the given opcode and body, leaving FIN and the
    reserved bits at the Frame defaults, and serializes it after applying
    frame_filters.
    """
    control_frame = Frame(payload=body, opcode=opcode)
    return _filter_and_format_frame_object(
        control_frame, mask, frame_filters)
michael@0 212
michael@0 213
def create_ping_frame(body, mask=False, frame_filters=[]):
    """Builds a control frame with the ping opcode and body as payload."""
    ping_opcode = common.OPCODE_PING
    return _create_control_frame(ping_opcode, body, mask, frame_filters)
michael@0 216
michael@0 217
def create_pong_frame(body, mask=False, frame_filters=[]):
    """Builds a control frame with the pong opcode and body as payload."""
    pong_opcode = common.OPCODE_PONG
    return _create_control_frame(pong_opcode, body, mask, frame_filters)
michael@0 220
michael@0 221
def create_close_frame(body, mask=False, frame_filters=[]):
    """Builds a control frame with the close opcode and body as payload."""
    close_opcode = common.OPCODE_CLOSE
    return _create_control_frame(close_opcode, body, mask, frame_filters)
michael@0 225
michael@0 226
class StreamOptions(object):
    """Bundle of option values read by Stream objects."""

    def __init__(self):
        """Initializes every option to its default value."""

        # Whether sent frames are masked, and whether received frames are
        # required to have the mask bit set.
        self.mask_send = False
        self.unmask_receive = True

        # Filters applied to frames.
        self.incoming_frame_filters = []
        self.outgoing_frame_filters = []

        # Enables deflate-stream extension.
        self.deflate_stream = False
michael@0 242
michael@0 243
class Stream(StreamBase):
    """A class for parsing/building frames of the WebSocket protocol
    (RFC 6455).
    """

    def __init__(self, request, options):
        """Constructs an instance.

        Args:
            request: mod_python request.
            options: object providing the attributes defined by
                StreamOptions (deflate_stream, mask_send, unmask_receive,
                outgoing_frame_filters and incoming_frame_filters).
        """

        StreamBase.__init__(self, request)

        self._logger = util.get_class_logger(self)

        self._options = options

        if self._options.deflate_stream:
            self._logger.debug('Setup filter for deflate-stream')
            self._request = util.DeflateRequest(self._request)

        self._request.client_terminated = False
        self._request.server_terminated = False

        # Holds body of received fragments.
        self._received_fragments = []
        # Holds the opcode of the first fragment.
        self._original_opcode = None

        self._writer = FragmentedFrameBuilder(
            self._options.mask_send, self._options.outgoing_frame_filters)

        # Bodies of the pings sent by send_ping() that have not been matched
        # against a received pong yet, oldest first.
        self._ping_queue = deque()

    def _receive_frame(self):
        """Receives a frame and return data in the frame as a tuple containing
        each header field and payload separately.

        Returns:
            A tuple (opcode, payload, fin, rsv1, rsv2, rsv3). The payload is
            already unmasked when the frame had its mask bit set.

        Raises:
            ConnectionTerminatedException: when read returns empty
                string.
            InvalidFrameException: when the frame contains invalid data.
        """

        received = self.receive_bytes(2)

        first_byte = ord(received[0])
        fin = (first_byte >> 7) & 1
        rsv1 = (first_byte >> 6) & 1
        rsv2 = (first_byte >> 5) & 1
        rsv3 = (first_byte >> 4) & 1
        opcode = first_byte & 0xf

        second_byte = ord(received[1])
        mask = (second_byte >> 7) & 1
        payload_length = second_byte & 0x7f

        if (mask == 1) != self._options.unmask_receive:
            raise InvalidFrameException(
                'Mask bit on the received frame did\'nt match masking '
                'configuration for received frames')

        # The Hybi-13 and later specs disallow putting a value in 0x0-0xFFFF
        # into the 8-octet extended payload length field (or 0x0-0x7D in
        # 2-octet field). A violation is only logged, not rejected.
        valid_length_encoding = True
        length_encoding_bytes = 1
        if payload_length == 127:
            extended_payload_length = self.receive_bytes(8)
            payload_length = struct.unpack(
                '!Q', extended_payload_length)[0]
            if payload_length > 0x7FFFFFFFFFFFFFFF:
                raise InvalidFrameException(
                    'Extended payload length >= 2^63')
            if self._request.ws_version >= 13 and payload_length < 0x10000:
                valid_length_encoding = False
                length_encoding_bytes = 8
        elif payload_length == 126:
            extended_payload_length = self.receive_bytes(2)
            payload_length = struct.unpack(
                '!H', extended_payload_length)[0]
            if self._request.ws_version >= 13 and payload_length < 126:
                valid_length_encoding = False
                length_encoding_bytes = 2

        if not valid_length_encoding:
            self._logger.warning(
                'Payload length is not encoded using the minimal number of '
                'bytes (%d is encoded using %d bytes)',
                payload_length,
                length_encoding_bytes)

        if mask == 1:
            masking_nonce = self.receive_bytes(4)
            masker = util.RepeatedXorMasker(masking_nonce)
        else:
            masker = _NOOP_MASKER

        payload = masker.mask(self.receive_bytes(payload_length))

        return opcode, payload, fin, rsv1, rsv2, rsv3

    def _receive_frame_as_frame_object(self):
        """Receives a frame with _receive_frame and returns it as a Frame."""

        opcode, payload, fin, rsv1, rsv2, rsv3 = self._receive_frame()

        return Frame(fin=fin, rsv1=rsv1, rsv2=rsv2, rsv3=rsv3,
                     opcode=opcode, payload=payload)

    def send_message(self, message, end=True, binary=False):
        """Send message.

        Args:
            message: text in unicode or binary in str to send.
            end: true if this is the last fragment of the message.
            binary: send message as binary frame.

        Raises:
            BadOperationException: when called on a server-terminated
                connection or called with inconsistent message type or
                binary parameter.
        """

        if self._request.server_terminated:
            raise BadOperationException(
                'Requested send_message after sending out a closing handshake')

        if binary and isinstance(message, unicode):
            raise BadOperationException(
                'Message for binary frame must be instance of str')

        try:
            self._write(self._writer.build(message, end, binary))
        except ValueError as e:
            raise BadOperationException(e)

    def receive_message(self):
        """Receive a WebSocket frame and return its payload as a text in
        unicode or a binary in str.

        Fragmented messages are reassembled before being returned. Ping and
        pong frames are handled internally and do not cause a return.

        Returns:
            payload data of the frame
            - as unicode instance if received text frame
            - as str instance if received binary frame
            or None iff received closing handshake.
        Raises:
            BadOperationException: when called on a client-terminated
                connection.
            ConnectionTerminatedException: when read returns empty
                string.
            InvalidFrameException: when the frame contains invalid
                data.
            InvalidUTF8Exception: when a text message is not valid UTF-8.
            UnsupportedFrameException: when the received frame has
                flags, opcode we cannot handle. You can ignore this
                exception and continue receiving the next frame.
        """

        if self._request.client_terminated:
            raise BadOperationException(
                'Requested receive_message after receiving a closing '
                'handshake')

        while True:
            # mp_conn.read will block if no bytes are available.
            # Timeout is controlled by TimeOut directive of Apache.

            frame = self._receive_frame_as_frame_object()

            for frame_filter in self._options.incoming_frame_filters:
                frame_filter.filter(frame)

            if frame.rsv1 or frame.rsv2 or frame.rsv3:
                raise UnsupportedFrameException(
                    'Unsupported flag is set (rsv = %d%d%d)' %
                    (frame.rsv1, frame.rsv2, frame.rsv3))

            if frame.opcode == common.OPCODE_CONTINUATION:
                if not self._received_fragments:
                    if frame.fin:
                        raise InvalidFrameException(
                            'Received a termination frame but fragmentation '
                            'not started')
                    else:
                        raise InvalidFrameException(
                            'Received an intermediate frame but '
                            'fragmentation not started')

                self._received_fragments.append(frame.payload)
                if not frame.fin:
                    # Intermediate frame
                    continue

                # End of fragmentation frame
                message = ''.join(self._received_fragments)
                self._received_fragments = []
            else:
                if self._received_fragments:
                    if frame.fin:
                        raise InvalidFrameException(
                            'Received an unfragmented frame without '
                            'terminating existing fragmentation')
                    else:
                        raise InvalidFrameException(
                            'New fragmentation started without terminating '
                            'existing fragmentation')

                if frame.fin:
                    # Unfragmented frame

                    if (common.is_control_opcode(frame.opcode) and
                        len(frame.payload) > 125):
                        raise InvalidFrameException(
                            'Application data size of control frames must be '
                            '125 bytes or less')

                    self._original_opcode = frame.opcode
                    message = frame.payload
                else:
                    # Start of fragmentation frame

                    if common.is_control_opcode(frame.opcode):
                        raise InvalidFrameException(
                            'Control frames must not be fragmented')

                    self._original_opcode = frame.opcode
                    self._received_fragments.append(frame.payload)
                    continue

            if self._original_opcode == common.OPCODE_TEXT:
                # Invalid UTF-8 is not replaced; it is reported to the caller
                # as InvalidUTF8Exception.
                try:
                    return message.decode('utf-8')
                except UnicodeDecodeError as e:
                    raise InvalidUTF8Exception(e)
            elif self._original_opcode == common.OPCODE_BINARY:
                return message
            elif self._original_opcode == common.OPCODE_CLOSE:
                self._process_close_message(message)
                return None
            elif self._original_opcode == common.OPCODE_PING:
                self._process_ping_message(message)
            elif self._original_opcode == common.OPCODE_PONG:
                self._process_pong_message(message)
            else:
                raise UnsupportedFrameException(
                    'Opcode %d is not supported' % self._original_opcode)

    def _process_close_message(self, message):
        """Handles the body of a received close frame.

        Marks the connection as client-terminated, records the close code
        and reason on the request, and, unless this frame acknowledges a
        closing handshake already sent by the server, sends the server's own
        close frame in response.

        Raises:
            InvalidFrameException: when the body is exactly one octet long.
        """

        self._request.client_terminated = True

        # Status code is optional. We can have status reason only if we
        # have status code. Status reason can be empty string. So,
        # allowed cases are
        # - no application data: no code no reason
        # - 2 octet of application data: has code but no reason
        # - 3 or more octet of application data: both code and reason
        if len(message) == 0:
            self._logger.debug('Received close frame (empty body)')
            self._request.ws_close_code = (
                common.STATUS_NO_STATUS_RECEIVED)
        elif len(message) == 1:
            raise InvalidFrameException(
                'If a close frame has status code, the length of '
                'status code must be 2 octet')
        else:
            self._request.ws_close_code = struct.unpack(
                '!H', message[0:2])[0]
            self._request.ws_close_reason = message[2:].decode(
                'utf-8', 'replace')
            self._logger.debug(
                'Received close frame (code=%d, reason=%r)',
                self._request.ws_close_code,
                self._request.ws_close_reason)

        # Drain junk data after the close frame if necessary.
        self._drain_received_data()

        if self._request.server_terminated:
            self._logger.debug(
                'Received ack for server-initiated closing handshake')
            return

        self._logger.debug(
            'Received client-initiated closing handshake')

        code = common.STATUS_NORMAL_CLOSURE
        reason = ''
        if hasattr(self._request, '_dispatcher'):
            dispatcher = self._request._dispatcher
            code, reason = dispatcher.passive_closing_handshake(
                self._request)
            if code is None and reason is not None and len(reason) > 0:
                self._logger.warning(
                    'Handler specified reason despite code being None')
                reason = ''
            if reason is None:
                reason = ''
        self._send_closing_handshake(code, reason)
        self._logger.debug(
            'Sent ack for client-initiated closing handshake '
            '(code=%r, reason=%r)', code, reason)

    def _process_ping_message(self, message):
        """Handles the body of a received ping frame.

        If the request has a true on_ping_handler attribute, it is called
        with the request and the ping body and no pong is sent by this
        method. Otherwise a pong with the same body is sent.
        """

        # The handler is looked up with getattr and called outside of any
        # try block so that an AttributeError raised inside the handler
        # itself propagates instead of being mistaken for a missing handler.
        handler = getattr(self._request, 'on_ping_handler', None)
        if handler:
            handler(self._request, message)
            return
        self._send_pong(message)

    def _process_pong_message(self, message):
        """Handles the body of a received pong frame.

        Pings recorded by send_ping are taken from the queue, oldest first,
        until one whose body equals message is found; the pings taken before
        it are dropped. If no queued ping matches, the queue is left with
        its previous contents. Afterwards, a true on_pong_handler attribute
        of the request is called with the request and the pong body.
        """

        # TODO(tyoshino): Add ping timeout handling.

        inflight_pings = deque()

        while self._ping_queue:
            expected_body = self._ping_queue.popleft()
            if expected_body == message:
                # inflight_pings contains pings ignored by the
                # other peer. Just forget them.
                self._logger.debug(
                    'Ping %r is acked (%d pings were ignored)',
                    expected_body, len(inflight_pings))
                break
            inflight_pings.append(expected_body)
        else:
            # Reached only when the queue ran out without a match: the
            # received pong was unsolicited pong. Keep the ping queue as is.
            self._ping_queue = inflight_pings
            self._logger.debug('Received a unsolicited pong')

        # Looked up with getattr for the same reason as in
        # _process_ping_message: errors raised by the handler must not be
        # swallowed.
        handler = getattr(self._request, 'on_pong_handler', None)
        if handler:
            handler(self._request, message)

    def _send_closing_handshake(self, code, reason):
        """Sends a close frame and marks the connection server-terminated.

        Args:
            code: status code, or None to send a close frame with an empty
                body.
            reason: close reason; encoded as UTF-8 and appended to the code.
                Ignored when code is None.

        Raises:
            BadOperationException: when code does not fit in 16 bits or the
                resulting body would exceed 125 bytes.
        """

        body = ''
        if code is not None:
            if code >= (1 << 16) or code < 0:
                raise BadOperationException('Status code is out of range')
            encoded_reason = reason.encode('utf-8')
            if len(encoded_reason) + 2 > 125:
                raise BadOperationException(
                    'Application data size of close frames must be 125 bytes '
                    'or less')
            body = struct.pack('!H', code) + encoded_reason

        frame = create_close_frame(
            body,
            self._options.mask_send,
            self._options.outgoing_frame_filters)

        self._request.server_terminated = True

        self._write(frame)

    def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
        """Closes a WebSocket connection.

        Args:
            code: Status code for close frame. If code is None, a close
                frame with empty body will be sent.
            reason: string representing close reason.
        Raises:
            BadOperationException: when reason is specified with code None
                or reason is neither an instance of str nor of unicode.
            ConnectionTerminatedException: when something other than a
                closing handshake is received in response.
        """

        if self._request.server_terminated:
            self._logger.debug(
                'Requested close_connection but server is already terminated')
            return

        if code is None:
            if reason is not None and len(reason) > 0:
                raise BadOperationException(
                    'close reason must not be specified if code is None')
            reason = ''
        else:
            if not isinstance(reason, (str, unicode)):
                raise BadOperationException(
                    'close reason must be an instance of str or unicode')

        self._send_closing_handshake(code, reason)
        self._logger.debug(
            'Sent server-initiated closing handshake (code=%r, reason=%r)',
            code, reason)

        if (code == common.STATUS_GOING_AWAY or
            code == common.STATUS_PROTOCOL_ERROR):
            # It doesn't make sense to wait for a close frame if the reason is
            # protocol error or that the server is going away. For some of
            # other reasons, it might not make sense to wait for a close frame,
            # but it's not clear, yet.
            return

        # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
        # or until a server-defined timeout expires.
        #
        # For now, we expect receiving closing handshake right after sending
        # out closing handshake.
        message = self.receive_message()
        if message is not None:
            raise ConnectionTerminatedException(
                'Didn\'t receive valid ack for closing handshake')
        # TODO: 3. close the WebSocket connection.
        # note: mod_python Connection (mp_conn) doesn't have close method.

    def send_ping(self, body=''):
        """Sends a ping frame and remembers its body for pong matching.

        Args:
            body: application data of the ping. At most 125 bytes.

        Raises:
            ValueError: when body is longer than 125 bytes.
        """

        if len(body) > 125:
            raise ValueError(
                'Application data size of control frames must be 125 bytes or '
                'less')
        frame = create_ping_frame(
            body,
            self._options.mask_send,
            self._options.outgoing_frame_filters)
        self._write(frame)

        self._ping_queue.append(body)

    def _send_pong(self, body):
        """Sends a pong frame.

        Args:
            body: application data of the pong. At most 125 bytes.

        Raises:
            ValueError: when body is longer than 125 bytes.
        """

        if len(body) > 125:
            raise ValueError(
                'Application data size of control frames must be 125 bytes or '
                'less')
        frame = create_pong_frame(
            body,
            self._options.mask_send,
            self._options.outgoing_frame_filters)
        self._write(frame)

    def _drain_received_data(self):
        """Drains unread data in the receive buffer to avoid sending out TCP
        RST packet. This is because when deflate-stream is enabled, some
        DEFLATE block for flushing data may follow a close frame. If any data
        remains in the receive buffer of a socket when the socket is closed,
        it sends out TCP RST packet to the other peer.

        Since mod_python's mp_conn object doesn't support non-blocking read,
        we perform this only when pywebsocket is running in standalone mode.
        """

        # If self._options.deflate_stream is true, self._request is
        # DeflateRequest, so we can get wrapped request object by
        # self._request._request.
        #
        # Only _StandaloneRequest has _drain_received_data method.
        if (self._options.deflate_stream and
            ('_drain_received_data' in dir(self._request._request))):
            self._request._request._drain_received_data()
michael@0 699
michael@0 700
michael@0 701 # vi:sts=4 sw=4 et

mercurial