# A unix-oriented process dispatcher. Uses a single thread with select and
# waitpid to dispatch tasks. This avoids several deadlocks that are possible
# with fork/exec + threads + Python.

import errno
import os
import select
import signal
from datetime import datetime, timedelta
from results import TestOutput

# Exit status used by a forked child that could not exec the test command
# (same convention shells use for "command could not be executed").
_EXEC_FAILED_STATUS = 127


class Task(object):
    """Bookkeeping for one running test child process."""

    def __init__(self, test, pid, stdout, stderr):
        self.test = test
        self.cmd = test.get_command(test.js_cmd_prefix)
        self.pid = pid
        # Read ends of the pipes connected to the child's stdout / stderr.
        self.stdout = stdout
        self.stderr = stderr
        self.start = datetime.now()
        # Output fragments collected so far; joined when the child is reaped.
        self.out = []
        self.err = []


def spawn_test(test, passthrough=False):
    """
    Spawn one child, return a task struct.

    In passthrough mode nothing is forked: the test command replaces the
    current process via exec (so this function does not return unless exec
    fails, in which case the OSError propagates to the caller).
    """
    if passthrough:
        cmd = test.get_command(test.js_cmd_prefix)
        os.execvp(cmd[0], cmd)

    (rout, wout) = os.pipe()
    (rerr, werr) = os.pipe()

    rv = os.fork()

    # Parent.
    if rv:
        os.close(wout)
        os.close(werr)
        return Task(test, rv, rout, rerr)

    # Child. This process must never return into the dispatcher loop: if
    # anything fails before exec succeeds, report it on the redirected stderr
    # and exit immediately (os._exit skips cleanup handlers inherited from
    # the parent).
    try:
        os.close(rout)
        os.close(rerr)

        os.dup2(wout, 1)
        os.dup2(werr, 2)
        # The pipe write ends now live on fds 1 and 2; close the originals so
        # the test process does not inherit stray descriptors.
        if wout > 2:
            os.close(wout)
        if werr > 2:
            os.close(werr)

        cmd = test.get_command(test.js_cmd_prefix)
        os.execvp(cmd[0], cmd)
    except Exception as e:
        try:
            msg = 'spawn_test: failed to exec test: %s\n' % (e,)
            os.write(2, msg.encode('utf-8', 'replace'))
        except Exception:
            # Best effort only: the exit status below still signals failure.
            pass
    finally:
        os._exit(_EXEC_FAILED_STATUS)


def total_seconds(td):
    """
    Return the total number of seconds contained in the duration as a float
    """
    return (float(td.microseconds) + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6


def get_max_wait(tasks, results, timeout):
    """
    Return the maximum time we can wait before any task should time out.

    The result is a number of seconds, clamped to be non-negative. |timeout|
    is in seconds; a falsy value means tasks never time out.
    """

    # If we have a progress-meter, we need to wake up to update it frequently.
    # NOTE(review): update_granularity() is assumed to return a timedelta,
    # since it is compared against timedeltas below — confirm in results.
    wait = results.pb.update_granularity()

    # If a timeout is supplied, we need to wake up for the first task to
    # timeout if that is sooner.
    if timeout:
        now = datetime.now()
        timeout_delta = timedelta(seconds=timeout)
        for task in tasks:
            remaining = task.start + timeout_delta - now
            if remaining < wait:
                wait = remaining

    # Return the wait time in seconds, clamped to zero.
    return max(total_seconds(wait), 0)


def flush_input(fd, frags):
    """
    Read any pages sitting in the file descriptor 'fd' into the list 'frags'.

    The first read is unconditional, so callers must only pass an fd that is
    known to be readable (or at EOF).
    """
    rv = os.read(fd, 4096)
    frags.append(rv)
    while len(rv) == 4096:
        # If read() returns a full buffer, it may indicate there was 1 buffer
        # worth of data, or that there is more data to read. Poll the pipe
        # before we read again to ensure that we will not block indefinitely.
        readable, _, _ = select.select([fd], [], [], 0)
        if not readable:
            return

        rv = os.read(fd, 4096)
        frags.append(rv)


def _flush_if_ready(fd, frags):
    """Like flush_input, but only reads if 'fd' is readable right now."""
    readable, _, _ = select.select([fd], [], [], 0)
    if readable:
        flush_input(fd, frags)


def read_input(tasks, timeout):
    """
    Select on input or errors from the given task list for a max of timeout
    seconds.
    """
    rlist = []
    exlist = []
    outmap = {}  # Fast access to fragment list given fd.
    for t in tasks:
        rlist.append(t.stdout)
        rlist.append(t.stderr)
        outmap[t.stdout] = t.out
        outmap[t.stderr] = t.err
        # This will trigger with a close event when the child dies, allowing
        # us to respond immediately and not leave cores idle.
        exlist.append(t.stdout)

    readable, _, _ = select.select(rlist, [], exlist, timeout)
    for fd in readable:
        flush_input(fd, outmap[fd])


def remove_task(tasks, pid):
    """
    Remove the task with the given pid from |tasks| (in place) and return it.

    Raises KeyError if no task in |tasks| has that pid.
    """
    for i, t in enumerate(tasks):
        if t.pid == pid:
            return tasks.pop(i)
    raise KeyError("No such pid: %s" % pid)


def timed_out(task, timeout):
    """
    Return True if the given task has been running for longer than |timeout|.
    |timeout| may be falsy, indicating an infinite timeout (in which case
    timed_out always returns False).
    """
    if timeout:
        now = datetime.now()
        return (now - task.start) > timedelta(seconds=timeout)
    return False


def reap_zombies(tasks, results, timeout):
    """
    Search for children of this process that have finished. If they are tasks,
    then this routine will clean up the child and send a TestOutput to the
    results channel. Finished children that are not tasks are reaped and
    otherwise ignored. This method returns the task list (modified in place)
    with the ended tasks removed.
    """
    while True:
        try:
            pid, status = os.waitpid(0, os.WNOHANG)
        except OSError as e:
            # ECHILD: there are no children left to wait for.
            if e.errno == errno.ECHILD:
                break
            raise
        if pid == 0:
            # Children exist, but none has exited yet.
            break

        try:
            ended = remove_task(tasks, pid)
        except KeyError:
            # Not one of our tasks; it is reaped now, nothing else to do.
            continue

        # Only drain what is available: if the test left a background
        # process holding the pipe open, an unconditional read could block.
        _flush_if_ready(ended.stdout, ended.out)
        _flush_if_ready(ended.stderr, ended.err)
        os.close(ended.stdout)
        os.close(ended.stderr)

        returncode = os.WEXITSTATUS(status)
        if os.WIFSIGNALED(status):
            # Report death-by-signal as a negative return code.
            returncode = -os.WTERMSIG(status)

        out = TestOutput(
            ended.test,
            ended.cmd,
            ''.join(ended.out),
            ''.join(ended.err),
            returncode,
            total_seconds(datetime.now() - ended.start),
            timed_out(ended, timeout))
        results.push(out)
    return tasks


def kill_undead(tasks, results, timeout):
    """
    Signal all children that are over the given timeout.

    |results| is unused; it is kept for interface compatibility.
    """
    for task in tasks:
        if timed_out(task, timeout):
            os.kill(task.pid, signal.SIGKILL)


def run_all_tests(tests, results, options):
    """
    Run every test in |tests|, keeping up to options.worker_count children
    alive at once, and push each TestOutput to |results|. Returns True.
    """
    # Copy and reverse for fast pop off end.
    tests = tests[:]
    tests.reverse()

    # The set of currently running tests.
    tasks = []

    while tests or tasks:
        while tests and len(tasks) < options.worker_count:
            tasks.append(spawn_test(tests.pop(), options.passthrough))

        timeout = get_max_wait(tasks, results, options.timeout)
        read_input(tasks, timeout)

        kill_undead(tasks, results, options.timeout)
        tasks = reap_zombies(tasks, results, options.timeout)

        results.pb.poke()

    return True