testing/mozbase/mozdevice/tests/sut.py

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purposes.

     1 #!/usr/bin/env python
     3 # Any copyright is dedicated to the Public Domain.
     4 # http://creativecommons.org/publicdomain/zero/1.0/
     6 import datetime
     7 import socket
     8 import time
    10 from threading import Thread
    12 class MockAgent(object):
    14     MAX_WAIT_TIME_SECONDS = 10
    15     SOCKET_TIMEOUT_SECONDS = 5
    17     def __init__(self, tester, start_commands = None, commands = []):
    18         if start_commands:
    19             self.commands = start_commands
    20         else:
    21             self.commands = [("testroot", "/mnt/sdcard"),
    22                              ("isdir /mnt/sdcard/tests", "TRUE"),
    23                              ("ver", "SUTAgentAndroid Version 1.14")]
    24         self.commands = self.commands + commands
    26         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    27         self._sock.bind(("127.0.0.1", 0))
    28         self._sock.listen(1)
    30         self.tester = tester
    32         self.thread = Thread(target=self._serve_thread)
    33         self.thread.start()
    35         self.should_stop = False
    37     @property
    38     def port(self):
    39         return self._sock.getsockname()[1]
    41     def _serve_thread(self):
    42         conn = None
    43         while self.commands:
    44             if not conn:
    45                 conn, addr = self._sock.accept()
    46                 conn.settimeout(self.SOCKET_TIMEOUT_SECONDS)
    47                 conn.send("$>\x00")
    48             (command, response) = self.commands.pop(0)
    49             data = ''
    50             timeout = datetime.datetime.now() + datetime.timedelta(
    51                 seconds=self.MAX_WAIT_TIME_SECONDS)
    52             # The data might come in chunks, particularly if we are expecting
    53             # multiple lines, as with push commands.
    54             while (len(data) < len(command) and
    55                    datetime.datetime.now() < timeout):
    56                 try:
    57                     data += conn.recv(1024)
    58                 except socket.timeout:
    59                     # We handle timeouts in the main loop.
    60                     pass
    61             self.tester.assertEqual(data.strip(), command)
    62             # send response and prompt separately to test for bug 789496
    63             # FIXME: Improve the mock agent, since overloading the meaning
    64             # of 'response' is getting confusing.
    65             if response is None: # code for "shut down"
    66                 conn.shutdown(socket.SHUT_RDWR)
    67                 conn.close()
    68                 conn = None
    69             elif type(response) is int: # code for "time out"
    70                 max_timeout = 15.0
    71                 timeout = 0.0
    72                 interval = 0.1
    73                 while not self.should_stop and timeout < max_timeout:
    74                     time.sleep(interval)
    75                     timeout += interval
    76                 if timeout >= max_timeout:
    77                     raise Exception("Maximum timeout reached! This should not "
    78                                     "happen")
    79                 return
    80             else:
    81                 # pull is handled specially, as we just pass back the full
    82                 # command line
    83                 if "pull" in command:
    84                     conn.send(response)
    85                 else:
    86                     conn.send("%s\n" % response)
    87                 conn.send("$>\x00")
    89     def wait(self):
    90         self.thread.join()

mercurial