js/src/tests/lib/tasks_unix.py

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1,
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f, for hacking purposes.

     1 # A unix-oriented process dispatcher.  Uses a single thread with select and
     2 # waitpid to dispatch tasks.  This avoids several deadlocks that are possible
     3 # with fork/exec + threads + Python.
     5 import errno, os, select
     6 from datetime import datetime, timedelta
     7 from results import TestOutput
     9 class Task(object):
    10     def __init__(self, test, pid, stdout, stderr):
    11         self.test = test
    12         self.cmd = test.get_command(test.js_cmd_prefix)
    13         self.pid = pid
    14         self.stdout = stdout
    15         self.stderr = stderr
    16         self.start = datetime.now()
    17         self.out = []
    18         self.err = []
    20 def spawn_test(test, passthrough = False):
    21     """Spawn one child, return a task struct."""
    22     if not passthrough:
    23         (rout, wout) = os.pipe()
    24         (rerr, werr) = os.pipe()
    26         rv = os.fork()
    28         # Parent.
    29         if rv:
    30             os.close(wout)
    31             os.close(werr)
    32             return Task(test, rv, rout, rerr)
    34         # Child.
    35         os.close(rout)
    36         os.close(rerr)
    38         os.dup2(wout, 1)
    39         os.dup2(werr, 2)
    41     cmd = test.get_command(test.js_cmd_prefix)
    42     os.execvp(cmd[0], cmd)
    44 def total_seconds(td):
    45     """
    46     Return the total number of seconds contained in the duration as a float
    47     """
    48     return (float(td.microseconds) + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6
    50 def get_max_wait(tasks, results, timeout):
    51     """
    52     Return the maximum time we can wait before any task should time out.
    53     """
    55     # If we have a progress-meter, we need to wake up to update it frequently.
    56     wait = results.pb.update_granularity()
    58     # If a timeout is supplied, we need to wake up for the first task to
    59     # timeout if that is sooner.
    60     if timeout:
    61         now = datetime.now()
    62         timeout_delta = timedelta(seconds=timeout)
    63         for task in tasks:
    64             remaining = task.start + timeout_delta - now
    65             if remaining < wait:
    66                 wait = remaining
    68     # Return the wait time in seconds, clamped to zero.
    69     return max(total_seconds(wait), 0)
    71 def flush_input(fd, frags):
    72     """
    73     Read any pages sitting in the file descriptor 'fd' into the list 'frags'.
    74     """
    75     rv = os.read(fd, 4096)
    76     frags.append(rv)
    77     while len(rv) == 4096:
    78         # If read() returns a full buffer, it may indicate there was 1 buffer
    79         # worth of data, or that there is more data to read.  Poll the socket
    80         # before we read again to ensure that we will not block indefinitly.
    81         readable, _, _ = select.select([fd], [], [], 0)
    82         if not readable:
    83             return
    85         rv = os.read(fd, 4096)
    86         frags.append(rv)
    88 def read_input(tasks, timeout):
    89     """
    90     Select on input or errors from the given task list for a max of timeout
    91     seconds.
    92     """
    93     rlist = []
    94     exlist = []
    95     outmap = {} # Fast access to fragment list given fd.
    96     for t in tasks:
    97         rlist.append(t.stdout)
    98         rlist.append(t.stderr)
    99         outmap[t.stdout] = t.out
   100         outmap[t.stderr] = t.err
   101         # This will trigger with a close event when the child dies, allowing
   102         # us to respond immediately and not leave cores idle.
   103         exlist.append(t.stdout)
   105     readable, _, _ = select.select(rlist, [], exlist, timeout)
   106     for fd in readable:
   107         flush_input(fd, outmap[fd])
   109 def remove_task(tasks, pid):
   110     """
   111     Return a pair with the removed task and the new, modified tasks list.
   112     """
   113     index = None
   114     for i, t in enumerate(tasks):
   115         if t.pid == pid:
   116             index = i
   117             break
   118     else:
   119         raise KeyError("No such pid: %s" % pid)
   121     out = tasks[index]
   122     tasks.pop(index)
   123     return out
   125 def timed_out(task, timeout):
   126     """
   127     Return True if the given task has been running for longer than |timeout|.
   128     |timeout| may be falsy, indicating an infinite timeout (in which case
   129     timed_out always returns False).
   130     """
   131     if timeout:
   132         now = datetime.now()
   133         return (now - task.start) > timedelta(seconds=timeout)
   134     return False
   136 def reap_zombies(tasks, results, timeout):
   137     """
   138     Search for children of this process that have finished.  If they are tasks,
   139     then this routine will clean up the child and send a TestOutput to the
   140     results channel.  This method returns a new task list that has had the ended
   141     tasks removed.
   142     """
   143     while True:
   144         try:
   145             pid, status = os.waitpid(0, os.WNOHANG)
   146             if pid == 0:
   147                 break
   148         except OSError, e:
   149             if e.errno == errno.ECHILD:
   150                 break
   151             raise e
   153         ended = remove_task(tasks, pid)
   154         flush_input(ended.stdout, ended.out)
   155         flush_input(ended.stderr, ended.err)
   156         os.close(ended.stdout)
   157         os.close(ended.stderr)
   159         returncode = os.WEXITSTATUS(status)
   160         if os.WIFSIGNALED(status):
   161             returncode = -os.WTERMSIG(status)
   163         out = TestOutput(
   164                    ended.test,
   165                    ended.cmd,
   166                    ''.join(ended.out),
   167                    ''.join(ended.err),
   168                    returncode,
   169                    total_seconds(datetime.now() - ended.start),
   170                    timed_out(ended, timeout))
   171         results.push(out)
   172     return tasks
   174 def kill_undead(tasks, results, timeout):
   175     """
   176     Signal all children that are over the given timeout.
   177     """
   178     for task in tasks:
   179         if timed_out(task, timeout):
   180             os.kill(task.pid, 9)
   182 def run_all_tests(tests, results, options):
   183     # Copy and reverse for fast pop off end.
   184     tests = tests[:]
   185     tests.reverse()
   187     # The set of currently running tests.
   188     tasks = []
   190     while len(tests) or len(tasks):
   191         while len(tests) and len(tasks) < options.worker_count:
   192             tasks.append(spawn_test(tests.pop(), options.passthrough))
   194         timeout = get_max_wait(tasks, results, options.timeout)
   195         read_input(tasks, timeout)
   197         kill_undead(tasks, results, options.timeout)
   198         tasks = reap_zombies(tasks, results, options.timeout)
   200         results.pb.poke()
   202     return True

mercurial