michael@0: # Copyright 2011, Google Inc. michael@0: # All rights reserved. michael@0: # michael@0: # Redistribution and use in source and binary forms, with or without michael@0: # modification, are permitted provided that the following conditions are michael@0: # met: michael@0: # michael@0: # * Redistributions of source code must retain the above copyright michael@0: # notice, this list of conditions and the following disclaimer. michael@0: # * Redistributions in binary form must reproduce the above michael@0: # copyright notice, this list of conditions and the following disclaimer michael@0: # in the documentation and/or other materials provided with the michael@0: # distribution. michael@0: # * Neither the name of Google Inc. nor the names of its michael@0: # contributors may be used to endorse or promote products derived from michael@0: # this software without specific prior written permission. michael@0: # michael@0: # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS michael@0: # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT michael@0: # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR michael@0: # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT michael@0: # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, michael@0: # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT michael@0: # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, michael@0: # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY michael@0: # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT michael@0: # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE michael@0: # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. michael@0: michael@0: michael@0: """Message related utilities. michael@0: michael@0: Note: request.connection.write/read are used in this module, even though michael@0: mod_python document says that they should be used only in connection michael@0: handlers. Unfortunately, we have no other options. For example, michael@0: request.write/read are not suitable because they don't allow direct raw michael@0: bytes writing/reading. michael@0: """ michael@0: michael@0: michael@0: import Queue michael@0: import threading michael@0: michael@0: michael@0: # Export Exception symbols from msgutil for backward compatibility michael@0: from mod_pywebsocket._stream_base import ConnectionTerminatedException michael@0: from mod_pywebsocket._stream_base import InvalidFrameException michael@0: from mod_pywebsocket._stream_base import BadOperationException michael@0: from mod_pywebsocket._stream_base import UnsupportedFrameException michael@0: michael@0: michael@0: # An API for handler to send/receive WebSocket messages. michael@0: def close_connection(request): michael@0: """Close connection. michael@0: michael@0: Args: michael@0: request: mod_python request. michael@0: """ michael@0: request.ws_stream.close_connection() michael@0: michael@0: michael@0: def send_message(request, message, end=True, binary=False): michael@0: """Send message. michael@0: michael@0: Args: michael@0: request: mod_python request. michael@0: message: unicode text or str binary to send. michael@0: end: False to send message as a fragment. All messages until the michael@0: first call with end=True (inclusive) will be delivered to the michael@0: client in separate frames but as one WebSocket message. michael@0: binary: send message as binary frame. michael@0: Raises: michael@0: BadOperationException: when server already terminated. michael@0: """ michael@0: request.ws_stream.send_message(message, end, binary) michael@0: michael@0: michael@0: def receive_message(request): michael@0: """Receive a WebSocket frame and return its payload as a text in michael@0: unicode or a binary in str. michael@0: michael@0: Args: michael@0: request: mod_python request. michael@0: Raises: michael@0: InvalidFrameException: when client send invalid frame. michael@0: UnsupportedFrameException: when client send unsupported frame e.g. some michael@0: of reserved bit is set but no extension can michael@0: recognize it. michael@0: InvalidUTF8Exception: when client send a text frame containing any michael@0: invalid UTF-8 string. michael@0: ConnectionTerminatedException: when the connection is closed michael@0: unexpectedly. michael@0: BadOperationException: when client already terminated. michael@0: """ michael@0: return request.ws_stream.receive_message() michael@0: michael@0: michael@0: def send_ping(request, body=''): michael@0: request.ws_stream.send_ping(body) michael@0: michael@0: michael@0: class MessageReceiver(threading.Thread): michael@0: """This class receives messages from the client. michael@0: michael@0: This class provides three ways to receive messages: blocking, michael@0: non-blocking, and via callback. Callback has the highest precedence. michael@0: michael@0: Note: This class should not be used with the standalone server for wss michael@0: because pyOpenSSL used by the server raises a fatal error if the socket michael@0: is accessed from multiple threads. michael@0: """ michael@0: michael@0: def __init__(self, request, onmessage=None): michael@0: """Construct an instance. michael@0: michael@0: Args: michael@0: request: mod_python request. michael@0: onmessage: a function to be called when a message is received. michael@0: May be None. If not None, the function is called on michael@0: another thread. In that case, MessageReceiver.receive michael@0: and MessageReceiver.receive_nowait are useless michael@0: because they will never return any messages. michael@0: """ michael@0: michael@0: threading.Thread.__init__(self) michael@0: self._request = request michael@0: self._queue = Queue.Queue() michael@0: self._onmessage = onmessage michael@0: self._stop_requested = False michael@0: self.setDaemon(True) michael@0: self.start() michael@0: michael@0: def run(self): michael@0: try: michael@0: while not self._stop_requested: michael@0: message = receive_message(self._request) michael@0: if self._onmessage: michael@0: self._onmessage(message) michael@0: else: michael@0: self._queue.put(message) michael@0: finally: michael@0: close_connection(self._request) michael@0: michael@0: def receive(self): michael@0: """ Receive a message from the channel, blocking. michael@0: michael@0: Returns: michael@0: message as a unicode string. michael@0: """ michael@0: return self._queue.get() michael@0: michael@0: def receive_nowait(self): michael@0: """ Receive a message from the channel, non-blocking. michael@0: michael@0: Returns: michael@0: message as a unicode string if available. None otherwise. michael@0: """ michael@0: try: michael@0: message = self._queue.get_nowait() michael@0: except Queue.Empty: michael@0: message = None michael@0: return message michael@0: michael@0: def stop(self): michael@0: """Request to stop this instance. michael@0: michael@0: The instance will be stopped after receiving the next message. michael@0: This method may not be very useful, but there is no clean way michael@0: in Python to forcefully stop a running thread. michael@0: """ michael@0: self._stop_requested = True michael@0: michael@0: michael@0: class MessageSender(threading.Thread): michael@0: """This class sends messages to the client. michael@0: michael@0: This class provides both synchronous and asynchronous ways to send michael@0: messages. michael@0: michael@0: Note: This class should not be used with the standalone server for wss michael@0: because pyOpenSSL used by the server raises a fatal error if the socket michael@0: is accessed from multiple threads. michael@0: """ michael@0: michael@0: def __init__(self, request): michael@0: """Construct an instance. michael@0: michael@0: Args: michael@0: request: mod_python request. michael@0: """ michael@0: threading.Thread.__init__(self) michael@0: self._request = request michael@0: self._queue = Queue.Queue() michael@0: self.setDaemon(True) michael@0: self.start() michael@0: michael@0: def run(self): michael@0: while True: michael@0: message, condition = self._queue.get() michael@0: condition.acquire() michael@0: send_message(self._request, message) michael@0: condition.notify() michael@0: condition.release() michael@0: michael@0: def send(self, message): michael@0: """Send a message, blocking.""" michael@0: michael@0: condition = threading.Condition() michael@0: condition.acquire() michael@0: self._queue.put((message, condition)) michael@0: condition.wait() michael@0: michael@0: def send_nowait(self, message): michael@0: """Send a message, non-blocking.""" michael@0: michael@0: self._queue.put((message, threading.Condition())) michael@0: michael@0: michael@0: # vi:sts=4 sw=4 et