testing/mochitest/pywebsocket/mod_pywebsocket/_stream_hybi.py

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/testing/mochitest/pywebsocket/mod_pywebsocket/_stream_hybi.py	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,701 @@
     1.4 +# Copyright 2012, Google Inc.
     1.5 +# All rights reserved.
     1.6 +#
     1.7 +# Redistribution and use in source and binary forms, with or without
     1.8 +# modification, are permitted provided that the following conditions are
     1.9 +# met:
    1.10 +#
    1.11 +#     * Redistributions of source code must retain the above copyright
    1.12 +# notice, this list of conditions and the following disclaimer.
    1.13 +#     * Redistributions in binary form must reproduce the above
    1.14 +# copyright notice, this list of conditions and the following disclaimer
    1.15 +# in the documentation and/or other materials provided with the
    1.16 +# distribution.
    1.17 +#     * Neither the name of Google Inc. nor the names of its
    1.18 +# contributors may be used to endorse or promote products derived from
    1.19 +# this software without specific prior written permission.
    1.20 +#
    1.21 +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    1.22 +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    1.23 +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
    1.24 +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
    1.25 +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
    1.26 +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
    1.27 +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
    1.28 +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
    1.29 +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
    1.30 +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    1.31 +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    1.32 +
    1.33 +
    1.34 +"""This file provides classes and helper functions for parsing/building frames
    1.35 +of the WebSocket protocol (RFC 6455).
    1.36 +
    1.37 +Specification:
    1.38 +http://tools.ietf.org/html/rfc6455
    1.39 +"""
    1.40 +
    1.41 +
    1.42 +from collections import deque
    1.43 +import os
    1.44 +import struct
    1.45 +
    1.46 +from mod_pywebsocket import common
    1.47 +from mod_pywebsocket import util
    1.48 +from mod_pywebsocket._stream_base import BadOperationException
    1.49 +from mod_pywebsocket._stream_base import ConnectionTerminatedException
    1.50 +from mod_pywebsocket._stream_base import InvalidFrameException
    1.51 +from mod_pywebsocket._stream_base import InvalidUTF8Exception
    1.52 +from mod_pywebsocket._stream_base import StreamBase
    1.53 +from mod_pywebsocket._stream_base import UnsupportedFrameException
    1.54 +
    1.55 +
    1.56 +_NOOP_MASKER = util.NoopMasker()
    1.57 +
    1.58 +
    1.59 +class Frame(object):
    1.60 +
    1.61 +    def __init__(self, fin=1, rsv1=0, rsv2=0, rsv3=0,
    1.62 +                 opcode=None, payload=''):
    1.63 +        self.fin = fin
    1.64 +        self.rsv1 = rsv1
    1.65 +        self.rsv2 = rsv2
    1.66 +        self.rsv3 = rsv3
    1.67 +        self.opcode = opcode
    1.68 +        self.payload = payload
    1.69 +
    1.70 +
    1.71 +# Helper functions made public to be used for writing unittests for WebSocket
    1.72 +# clients.
    1.73 +
    1.74 +
    1.75 +def create_length_header(length, mask):
    1.76 +    """Creates a length header.
    1.77 +
    1.78 +    Args:
    1.79 +        length: Frame length. Must be less than 2^63.
    1.80 +        mask: Mask bit. Must be boolean.
    1.81 +
    1.82 +    Raises:
    1.83 +        ValueError: when bad data is given.
    1.84 +    """
    1.85 +
    1.86 +    if mask:
    1.87 +        mask_bit = 1 << 7
    1.88 +    else:
    1.89 +        mask_bit = 0
    1.90 +
    1.91 +    if length < 0:
    1.92 +        raise ValueError('length must be non negative integer')
    1.93 +    elif length <= 125:
    1.94 +        return chr(mask_bit | length)
    1.95 +    elif length < (1 << 16):
    1.96 +        return chr(mask_bit | 126) + struct.pack('!H', length)
    1.97 +    elif length < (1 << 63):
    1.98 +        return chr(mask_bit | 127) + struct.pack('!Q', length)
    1.99 +    else:
   1.100 +        raise ValueError('Payload is too big for one frame')
   1.101 +
   1.102 +
   1.103 +def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
   1.104 +    """Creates a frame header.
   1.105 +
   1.106 +    Raises:
   1.107 +        Exception: when bad data is given.
   1.108 +    """
   1.109 +
   1.110 +    if opcode < 0 or 0xf < opcode:
   1.111 +        raise ValueError('Opcode out of range')
   1.112 +
   1.113 +    if payload_length < 0 or (1 << 63) <= payload_length:
   1.114 +        raise ValueError('payload_length out of range')
   1.115 +
   1.116 +    if (fin | rsv1 | rsv2 | rsv3) & ~1:
   1.117 +        raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')
   1.118 +
   1.119 +    header = ''
   1.120 +
   1.121 +    first_byte = ((fin << 7)
   1.122 +                  | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
   1.123 +                  | opcode)
   1.124 +    header += chr(first_byte)
   1.125 +    header += create_length_header(payload_length, mask)
   1.126 +
   1.127 +    return header
   1.128 +
   1.129 +
   1.130 +def _build_frame(header, body, mask):
   1.131 +    if not mask:
   1.132 +        return header + body
   1.133 +
   1.134 +    masking_nonce = os.urandom(4)
   1.135 +    masker = util.RepeatedXorMasker(masking_nonce)
   1.136 +
   1.137 +    return header + masking_nonce + masker.mask(body)
   1.138 +
   1.139 +
   1.140 +def _filter_and_format_frame_object(frame, mask, frame_filters):
   1.141 +    for frame_filter in frame_filters:
   1.142 +        frame_filter.filter(frame)
   1.143 +
   1.144 +    header = create_header(
   1.145 +        frame.opcode, len(frame.payload), frame.fin,
   1.146 +        frame.rsv1, frame.rsv2, frame.rsv3, mask)
   1.147 +    return _build_frame(header, frame.payload, mask)
   1.148 +
   1.149 +
   1.150 +def create_binary_frame(
   1.151 +    message, opcode=common.OPCODE_BINARY, fin=1, mask=False, frame_filters=[]):
   1.152 +    """Creates a simple binary frame with no extension, reserved bit."""
   1.153 +
   1.154 +    frame = Frame(fin=fin, opcode=opcode, payload=message)
   1.155 +    return _filter_and_format_frame_object(frame, mask, frame_filters)
   1.156 +
   1.157 +
   1.158 +def create_text_frame(
   1.159 +    message, opcode=common.OPCODE_TEXT, fin=1, mask=False, frame_filters=[]):
   1.160 +    """Creates a simple text frame with no extension, reserved bit."""
   1.161 +
   1.162 +    encoded_message = message.encode('utf-8')
   1.163 +    return create_binary_frame(encoded_message, opcode, fin, mask,
   1.164 +                               frame_filters)
   1.165 +
   1.166 +
   1.167 +class FragmentedFrameBuilder(object):
   1.168 +    """A stateful class to send a message as fragments."""
   1.169 +
   1.170 +    def __init__(self, mask, frame_filters=[]):
   1.171 +        """Constructs an instance."""
   1.172 +
   1.173 +        self._mask = mask
   1.174 +        self._frame_filters = frame_filters
   1.175 +
   1.176 +        self._started = False
   1.177 +
   1.178 +        # Hold opcode of the first frame in messages to verify types of other
   1.179 +        # frames in the message are all the same.
   1.180 +        self._opcode = common.OPCODE_TEXT
   1.181 +
   1.182 +    def build(self, message, end, binary):
   1.183 +        if binary:
   1.184 +            frame_type = common.OPCODE_BINARY
   1.185 +        else:
   1.186 +            frame_type = common.OPCODE_TEXT
   1.187 +        if self._started:
   1.188 +            if self._opcode != frame_type:
   1.189 +                raise ValueError('Message types are different in frames for '
   1.190 +                                 'the same message')
   1.191 +            opcode = common.OPCODE_CONTINUATION
   1.192 +        else:
   1.193 +            opcode = frame_type
   1.194 +            self._opcode = frame_type
   1.195 +
   1.196 +        if end:
   1.197 +            self._started = False
   1.198 +            fin = 1
   1.199 +        else:
   1.200 +            self._started = True
   1.201 +            fin = 0
   1.202 +
   1.203 +        if binary:
   1.204 +            return create_binary_frame(
   1.205 +                message, opcode, fin, self._mask, self._frame_filters)
   1.206 +        else:
   1.207 +            return create_text_frame(
   1.208 +                message, opcode, fin, self._mask, self._frame_filters)
   1.209 +
   1.210 +
   1.211 +def _create_control_frame(opcode, body, mask, frame_filters):
   1.212 +    frame = Frame(opcode=opcode, payload=body)
   1.213 +
   1.214 +    return _filter_and_format_frame_object(frame, mask, frame_filters)
   1.215 +
   1.216 +
   1.217 +def create_ping_frame(body, mask=False, frame_filters=[]):
   1.218 +    return _create_control_frame(common.OPCODE_PING, body, mask, frame_filters)
   1.219 +
   1.220 +
   1.221 +def create_pong_frame(body, mask=False, frame_filters=[]):
   1.222 +    return _create_control_frame(common.OPCODE_PONG, body, mask, frame_filters)
   1.223 +
   1.224 +
   1.225 +def create_close_frame(body, mask=False, frame_filters=[]):
   1.226 +    return _create_control_frame(
   1.227 +        common.OPCODE_CLOSE, body, mask, frame_filters)
   1.228 +
   1.229 +
   1.230 +class StreamOptions(object):
   1.231 +    """Holds option values to configure Stream objects."""
   1.232 +
   1.233 +    def __init__(self):
   1.234 +        """Constructs StreamOptions."""
   1.235 +
   1.236 +        # Enables deflate-stream extension.
   1.237 +        self.deflate_stream = False
   1.238 +
   1.239 +        # Filters applied to frames.
   1.240 +        self.outgoing_frame_filters = []
   1.241 +        self.incoming_frame_filters = []
   1.242 +
   1.243 +        self.mask_send = False
   1.244 +        self.unmask_receive = True
   1.245 +
   1.246 +
   1.247 +class Stream(StreamBase):
   1.248 +    """A class for parsing/building frames of the WebSocket protocol
   1.249 +    (RFC 6455).
   1.250 +    """
   1.251 +
   1.252 +    def __init__(self, request, options):
   1.253 +        """Constructs an instance.
   1.254 +
   1.255 +        Args:
   1.256 +            request: mod_python request.
   1.257 +        """
   1.258 +
   1.259 +        StreamBase.__init__(self, request)
   1.260 +
   1.261 +        self._logger = util.get_class_logger(self)
   1.262 +
   1.263 +        self._options = options
   1.264 +
   1.265 +        if self._options.deflate_stream:
   1.266 +            self._logger.debug('Setup filter for deflate-stream')
   1.267 +            self._request = util.DeflateRequest(self._request)
   1.268 +
   1.269 +        self._request.client_terminated = False
   1.270 +        self._request.server_terminated = False
   1.271 +
   1.272 +        # Holds body of received fragments.
   1.273 +        self._received_fragments = []
   1.274 +        # Holds the opcode of the first fragment.
   1.275 +        self._original_opcode = None
   1.276 +
   1.277 +        self._writer = FragmentedFrameBuilder(
   1.278 +            self._options.mask_send, self._options.outgoing_frame_filters)
   1.279 +
   1.280 +        self._ping_queue = deque()
   1.281 +
   1.282 +    def _receive_frame(self):
   1.283 +        """Receives a frame and return data in the frame as a tuple containing
   1.284 +        each header field and payload separately.
   1.285 +
   1.286 +        Raises:
   1.287 +            ConnectionTerminatedException: when read returns empty
   1.288 +                string.
   1.289 +            InvalidFrameException: when the frame contains invalid data.
   1.290 +        """
   1.291 +
   1.292 +        received = self.receive_bytes(2)
   1.293 +
   1.294 +        first_byte = ord(received[0])
   1.295 +        fin = (first_byte >> 7) & 1
   1.296 +        rsv1 = (first_byte >> 6) & 1
   1.297 +        rsv2 = (first_byte >> 5) & 1
   1.298 +        rsv3 = (first_byte >> 4) & 1
   1.299 +        opcode = first_byte & 0xf
   1.300 +
   1.301 +        second_byte = ord(received[1])
   1.302 +        mask = (second_byte >> 7) & 1
   1.303 +        payload_length = second_byte & 0x7f
   1.304 +
   1.305 +        if (mask == 1) != self._options.unmask_receive:
   1.306 +            raise InvalidFrameException(
   1.307 +                'Mask bit on the received frame did\'nt match masking '
   1.308 +                'configuration for received frames')
   1.309 +
   1.310 +        # The Hybi-13 and later specs disallow putting a value in 0x0-0xFFFF
   1.311 +        # into the 8-octet extended payload length field (or 0x0-0xFD in
   1.312 +        # 2-octet field).
   1.313 +        valid_length_encoding = True
   1.314 +        length_encoding_bytes = 1
   1.315 +        if payload_length == 127:
   1.316 +            extended_payload_length = self.receive_bytes(8)
   1.317 +            payload_length = struct.unpack(
   1.318 +                '!Q', extended_payload_length)[0]
   1.319 +            if payload_length > 0x7FFFFFFFFFFFFFFF:
   1.320 +                raise InvalidFrameException(
   1.321 +                    'Extended payload length >= 2^63')
   1.322 +            if self._request.ws_version >= 13 and payload_length < 0x10000:
   1.323 +                valid_length_encoding = False
   1.324 +                length_encoding_bytes = 8
   1.325 +        elif payload_length == 126:
   1.326 +            extended_payload_length = self.receive_bytes(2)
   1.327 +            payload_length = struct.unpack(
   1.328 +                '!H', extended_payload_length)[0]
   1.329 +            if self._request.ws_version >= 13 and payload_length < 126:
   1.330 +                valid_length_encoding = False
   1.331 +                length_encoding_bytes = 2
   1.332 +
   1.333 +        if not valid_length_encoding:
   1.334 +            self._logger.warning(
   1.335 +                'Payload length is not encoded using the minimal number of '
   1.336 +                'bytes (%d is encoded using %d bytes)',
   1.337 +                payload_length,
   1.338 +                length_encoding_bytes)
   1.339 +
   1.340 +        if mask == 1:
   1.341 +            masking_nonce = self.receive_bytes(4)
   1.342 +            masker = util.RepeatedXorMasker(masking_nonce)
   1.343 +        else:
   1.344 +            masker = _NOOP_MASKER
   1.345 +
   1.346 +        bytes = masker.mask(self.receive_bytes(payload_length))
   1.347 +
   1.348 +        return opcode, bytes, fin, rsv1, rsv2, rsv3
   1.349 +
   1.350 +    def _receive_frame_as_frame_object(self):
   1.351 +        opcode, bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()
   1.352 +
   1.353 +        return Frame(fin=fin, rsv1=rsv1, rsv2=rsv2, rsv3=rsv3,
   1.354 +                     opcode=opcode, payload=bytes)
   1.355 +
   1.356 +    def send_message(self, message, end=True, binary=False):
   1.357 +        """Send message.
   1.358 +
   1.359 +        Args:
   1.360 +            message: text in unicode or binary in str to send.
   1.361 +            binary: send message as binary frame.
   1.362 +
   1.363 +        Raises:
   1.364 +            BadOperationException: when called on a server-terminated
   1.365 +                connection or called with inconsistent message type or
   1.366 +                binary parameter.
   1.367 +        """
   1.368 +
   1.369 +        if self._request.server_terminated:
   1.370 +            raise BadOperationException(
   1.371 +                'Requested send_message after sending out a closing handshake')
   1.372 +
   1.373 +        if binary and isinstance(message, unicode):
   1.374 +            raise BadOperationException(
   1.375 +                'Message for binary frame must be instance of str')
   1.376 +
   1.377 +        try:
   1.378 +            self._write(self._writer.build(message, end, binary))
   1.379 +        except ValueError, e:
   1.380 +            raise BadOperationException(e)
   1.381 +
   1.382 +    def receive_message(self):
   1.383 +        """Receive a WebSocket frame and return its payload as a text in
   1.384 +        unicode or a binary in str.
   1.385 +
   1.386 +        Returns:
   1.387 +            payload data of the frame
   1.388 +            - as unicode instance if received text frame
   1.389 +            - as str instance if received binary frame
   1.390 +            or None iff received closing handshake.
   1.391 +        Raises:
   1.392 +            BadOperationException: when called on a client-terminated
   1.393 +                connection.
   1.394 +            ConnectionTerminatedException: when read returns empty
   1.395 +                string.
   1.396 +            InvalidFrameException: when the frame contains invalid
   1.397 +                data.
   1.398 +            UnsupportedFrameException: when the received frame has
   1.399 +                flags, opcode we cannot handle. You can ignore this
   1.400 +                exception and continue receiving the next frame.
   1.401 +        """
   1.402 +
   1.403 +        if self._request.client_terminated:
   1.404 +            raise BadOperationException(
   1.405 +                'Requested receive_message after receiving a closing '
   1.406 +                'handshake')
   1.407 +
   1.408 +        while True:
   1.409 +            # mp_conn.read will block if no bytes are available.
   1.410 +            # Timeout is controlled by TimeOut directive of Apache.
   1.411 +
   1.412 +            frame = self._receive_frame_as_frame_object()
   1.413 +
   1.414 +            for frame_filter in self._options.incoming_frame_filters:
   1.415 +                frame_filter.filter(frame)
   1.416 +
   1.417 +            if frame.rsv1 or frame.rsv2 or frame.rsv3:
   1.418 +                raise UnsupportedFrameException(
   1.419 +                    'Unsupported flag is set (rsv = %d%d%d)' %
   1.420 +                    (frame.rsv1, frame.rsv2, frame.rsv3))
   1.421 +
   1.422 +            if frame.opcode == common.OPCODE_CONTINUATION:
   1.423 +                if not self._received_fragments:
   1.424 +                    if frame.fin:
   1.425 +                        raise InvalidFrameException(
   1.426 +                            'Received a termination frame but fragmentation '
   1.427 +                            'not started')
   1.428 +                    else:
   1.429 +                        raise InvalidFrameException(
   1.430 +                            'Received an intermediate frame but '
   1.431 +                            'fragmentation not started')
   1.432 +
   1.433 +                if frame.fin:
   1.434 +                    # End of fragmentation frame
   1.435 +                    self._received_fragments.append(frame.payload)
   1.436 +                    message = ''.join(self._received_fragments)
   1.437 +                    self._received_fragments = []
   1.438 +                else:
   1.439 +                    # Intermediate frame
   1.440 +                    self._received_fragments.append(frame.payload)
   1.441 +                    continue
   1.442 +            else:
   1.443 +                if self._received_fragments:
   1.444 +                    if frame.fin:
   1.445 +                        raise InvalidFrameException(
   1.446 +                            'Received an unfragmented frame without '
   1.447 +                            'terminating existing fragmentation')
   1.448 +                    else:
   1.449 +                        raise InvalidFrameException(
   1.450 +                            'New fragmentation started without terminating '
   1.451 +                            'existing fragmentation')
   1.452 +
   1.453 +                if frame.fin:
   1.454 +                    # Unfragmented frame
   1.455 +
   1.456 +                    if (common.is_control_opcode(frame.opcode) and
   1.457 +                        len(frame.payload) > 125):
   1.458 +                        raise InvalidFrameException(
   1.459 +                            'Application data size of control frames must be '
   1.460 +                            '125 bytes or less')
   1.461 +
   1.462 +                    self._original_opcode = frame.opcode
   1.463 +                    message = frame.payload
   1.464 +                else:
   1.465 +                    # Start of fragmentation frame
   1.466 +
   1.467 +                    if common.is_control_opcode(frame.opcode):
   1.468 +                        raise InvalidFrameException(
   1.469 +                            'Control frames must not be fragmented')
   1.470 +
   1.471 +                    self._original_opcode = frame.opcode
   1.472 +                    self._received_fragments.append(frame.payload)
   1.473 +                    continue
   1.474 +
   1.475 +            if self._original_opcode == common.OPCODE_TEXT:
   1.476 +                # The WebSocket protocol section 4.4 specifies that invalid
   1.477 +                # characters must be replaced with U+fffd REPLACEMENT
   1.478 +                # CHARACTER.
   1.479 +                try:
   1.480 +                    return message.decode('utf-8')
   1.481 +                except UnicodeDecodeError, e:
   1.482 +                    raise InvalidUTF8Exception(e)
   1.483 +            elif self._original_opcode == common.OPCODE_BINARY:
   1.484 +                return message
   1.485 +            elif self._original_opcode == common.OPCODE_CLOSE:
   1.486 +                self._request.client_terminated = True
   1.487 +
   1.488 +                # Status code is optional. We can have status reason only if we
   1.489 +                # have status code. Status reason can be empty string. So,
   1.490 +                # allowed cases are
   1.491 +                # - no application data: no code no reason
   1.492 +                # - 2 octet of application data: has code but no reason
   1.493 +                # - 3 or more octet of application data: both code and reason
   1.494 +                if len(message) == 0:
   1.495 +                    self._logger.debug('Received close frame (empty body)')
   1.496 +                    self._request.ws_close_code = (
   1.497 +                        common.STATUS_NO_STATUS_RECEIVED)
   1.498 +                elif len(message) == 1:
   1.499 +                    raise InvalidFrameException(
   1.500 +                        'If a close frame has status code, the length of '
   1.501 +                        'status code must be 2 octet')
   1.502 +                elif len(message) >= 2:
   1.503 +                    self._request.ws_close_code = struct.unpack(
   1.504 +                        '!H', message[0:2])[0]
   1.505 +                    self._request.ws_close_reason = message[2:].decode(
   1.506 +                        'utf-8', 'replace')
   1.507 +                    self._logger.debug(
   1.508 +                        'Received close frame (code=%d, reason=%r)',
   1.509 +                        self._request.ws_close_code,
   1.510 +                        self._request.ws_close_reason)
   1.511 +
   1.512 +                # Drain junk data after the close frame if necessary.
   1.513 +                self._drain_received_data()
   1.514 +
   1.515 +                if self._request.server_terminated:
   1.516 +                    self._logger.debug(
   1.517 +                        'Received ack for server-initiated closing handshake')
   1.518 +                    return None
   1.519 +
   1.520 +                self._logger.debug(
   1.521 +                    'Received client-initiated closing handshake')
   1.522 +
   1.523 +                code = common.STATUS_NORMAL_CLOSURE
   1.524 +                reason = ''
   1.525 +                if hasattr(self._request, '_dispatcher'):
   1.526 +                    dispatcher = self._request._dispatcher
   1.527 +                    code, reason = dispatcher.passive_closing_handshake(
   1.528 +                        self._request)
   1.529 +                    if code is None and reason is not None and len(reason) > 0:
   1.530 +                        self._logger.warning(
   1.531 +                            'Handler specified reason despite code being None')
   1.532 +                        reason = ''
   1.533 +                    if reason is None:
   1.534 +                        reason = ''
   1.535 +                self._send_closing_handshake(code, reason)
   1.536 +                self._logger.debug(
   1.537 +                    'Sent ack for client-initiated closing handshake '
   1.538 +                    '(code=%r, reason=%r)', code, reason)
   1.539 +                return None
   1.540 +            elif self._original_opcode == common.OPCODE_PING:
   1.541 +                try:
   1.542 +                    handler = self._request.on_ping_handler
   1.543 +                    if handler:
   1.544 +                        handler(self._request, message)
   1.545 +                        continue
   1.546 +                except AttributeError, e:
   1.547 +                    pass
   1.548 +                self._send_pong(message)
   1.549 +            elif self._original_opcode == common.OPCODE_PONG:
   1.550 +                # TODO(tyoshino): Add ping timeout handling.
   1.551 +
   1.552 +                inflight_pings = deque()
   1.553 +
   1.554 +                while True:
   1.555 +                    try:
   1.556 +                        expected_body = self._ping_queue.popleft()
   1.557 +                        if expected_body == message:
   1.558 +                            # inflight_pings contains pings ignored by the
   1.559 +                            # other peer. Just forget them.
   1.560 +                            self._logger.debug(
   1.561 +                                'Ping %r is acked (%d pings were ignored)',
   1.562 +                                expected_body, len(inflight_pings))
   1.563 +                            break
   1.564 +                        else:
   1.565 +                            inflight_pings.append(expected_body)
   1.566 +                    except IndexError, e:
   1.567 +                        # The received pong was unsolicited pong. Keep the
   1.568 +                        # ping queue as is.
   1.569 +                        self._ping_queue = inflight_pings
   1.570 +                        self._logger.debug('Received a unsolicited pong')
   1.571 +                        break
   1.572 +
   1.573 +                try:
   1.574 +                    handler = self._request.on_pong_handler
   1.575 +                    if handler:
   1.576 +                        handler(self._request, message)
   1.577 +                        continue
   1.578 +                except AttributeError, e:
   1.579 +                    pass
   1.580 +
   1.581 +                continue
   1.582 +            else:
   1.583 +                raise UnsupportedFrameException(
   1.584 +                    'Opcode %d is not supported' % self._original_opcode)
   1.585 +
   1.586 +    def _send_closing_handshake(self, code, reason):
   1.587 +        body = ''
   1.588 +        if code is not None:
   1.589 +            if code >= (1 << 16) or code < 0:
   1.590 +                raise BadOperationException('Status code is out of range')
   1.591 +            encoded_reason = reason.encode('utf-8')
   1.592 +            if len(encoded_reason) + 2 > 125:
   1.593 +                raise BadOperationException(
   1.594 +                    'Application data size of close frames must be 125 bytes '
   1.595 +                    'or less')
   1.596 +            body = struct.pack('!H', code) + encoded_reason
   1.597 +
   1.598 +        frame = create_close_frame(
   1.599 +            body,
   1.600 +            self._options.mask_send,
   1.601 +            self._options.outgoing_frame_filters)
   1.602 +
   1.603 +        self._request.server_terminated = True
   1.604 +
   1.605 +        self._write(frame)
   1.606 +
   1.607 +    def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
   1.608 +        """Closes a WebSocket connection.
   1.609 +
   1.610 +        Args:
   1.611 +            code: Status code for close frame. If code is None, a close
   1.612 +                frame with empty body will be sent.
   1.613 +            reason: string representing close reason.
   1.614 +        Raises:
   1.615 +            BadOperationException: when reason is specified with code None
   1.616 +            or reason is not an instance of both str and unicode.
   1.617 +        """
   1.618 +
   1.619 +        if self._request.server_terminated:
   1.620 +            self._logger.debug(
   1.621 +                'Requested close_connection but server is already terminated')
   1.622 +            return
   1.623 +
   1.624 +        if code is None:
   1.625 +            if reason is not None and len(reason) > 0:
   1.626 +                raise BadOperationException(
   1.627 +                    'close reason must not be specified if code is None')
   1.628 +            reason = ''
   1.629 +        else:
   1.630 +            if not isinstance(reason, str) and not isinstance(reason, unicode):
   1.631 +                raise BadOperationException(
   1.632 +                    'close reason must be an instance of str or unicode')
   1.633 +
   1.634 +        self._send_closing_handshake(code, reason)
   1.635 +        self._logger.debug(
   1.636 +            'Sent server-initiated closing handshake (code=%r, reason=%r)',
   1.637 +            code, reason)
   1.638 +
   1.639 +        if (code == common.STATUS_GOING_AWAY or
   1.640 +            code == common.STATUS_PROTOCOL_ERROR):
   1.641 +            # It doesn't make sense to wait for a close frame if the reason is
   1.642 +            # protocol error or that the server is going away. For some of
   1.643 +            # other reasons, it might not make sense to wait for a close frame,
   1.644 +            # but it's not clear, yet.
   1.645 +            return
   1.646 +
   1.647 +        # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
   1.648 +        # or until a server-defined timeout expires.
   1.649 +        #
   1.650 +        # For now, we expect receiving closing handshake right after sending
   1.651 +        # out closing handshake.
   1.652 +        message = self.receive_message()
   1.653 +        if message is not None:
   1.654 +            raise ConnectionTerminatedException(
   1.655 +                'Didn\'t receive valid ack for closing handshake')
   1.656 +        # TODO: 3. close the WebSocket connection.
   1.657 +        # note: mod_python Connection (mp_conn) doesn't have close method.
   1.658 +
   1.659 +    def send_ping(self, body=''):
   1.660 +        if len(body) > 125:
   1.661 +            raise ValueError(
   1.662 +                'Application data size of control frames must be 125 bytes or '
   1.663 +                'less')
   1.664 +        frame = create_ping_frame(
   1.665 +            body,
   1.666 +            self._options.mask_send,
   1.667 +            self._options.outgoing_frame_filters)
   1.668 +        self._write(frame)
   1.669 +
   1.670 +        self._ping_queue.append(body)
   1.671 +
   1.672 +    def _send_pong(self, body):
   1.673 +        if len(body) > 125:
   1.674 +            raise ValueError(
   1.675 +                'Application data size of control frames must be 125 bytes or '
   1.676 +                'less')
   1.677 +        frame = create_pong_frame(
   1.678 +            body,
   1.679 +            self._options.mask_send,
   1.680 +            self._options.outgoing_frame_filters)
   1.681 +        self._write(frame)
   1.682 +
   1.683 +    def _drain_received_data(self):
   1.684 +        """Drains unread data in the receive buffer to avoid sending out TCP
   1.685 +        RST packet. This is because when deflate-stream is enabled, some
   1.686 +        DEFLATE block for flushing data may follow a close frame. If any data
   1.687 +        remains in the receive buffer of a socket when the socket is closed,
   1.688 +        it sends out TCP RST packet to the other peer.
   1.689 +
   1.690 +        Since mod_python's mp_conn object doesn't support non-blocking read,
   1.691 +        we perform this only when pywebsocket is running in standalone mode.
   1.692 +        """
   1.693 +
   1.694 +        # If self._options.deflate_stream is true, self._request is
   1.695 +        # DeflateRequest, so we can get wrapped request object by
   1.696 +        # self._request._request.
   1.697 +        #
   1.698 +        # Only _StandaloneRequest has _drain_received_data method.
   1.699 +        if (self._options.deflate_stream and
   1.700 +            ('_drain_received_data' in dir(self._request._request))):
   1.701 +            self._request._request._drain_received_data()
   1.702 +
   1.703 +
   1.704 +# vi:sts=4 sw=4 et

mercurial