import fcntl, os, select, time
from subprocess import Popen, PIPE

# Run a series of subprocesses. Try to keep up to a certain number going in
# parallel at any given time. Enforce time limits.
#
# This is implemented using non-blocking I/O, and so is Unix-specific.
#
# We assume that, if a task closes its standard error, then it's safe to
# wait for it to terminate. So an ill-behaved task that closes its standard
# error and then hangs will hang us, as well. However, as it takes special
# effort to close one's standard error, this seems unlikely to be a
# problem in practice.
class TaskPool(object):

    # A task we should run in a subprocess. Users should subclass this and
    # fill in the methods as given.
    class Task(object):
        def __init__(self):
            self.pipe = None
            self.start_time = None

        # Record that this task is running, with |pipe| as its Popen object,
        # and should time out at |deadline|.
        def start(self, pipe, deadline):
            self.pipe = pipe
            self.deadline = deadline

        # Return a shell command (a string or sequence of arguments) to be
        # passed to Popen to run the task. The command will be given
        # /dev/null as its standard input, and pipes as its standard output
        # and error.
        def cmd(self):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard output.
        def onStdout(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard error.
        def onStderr(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process terminated,
        # yielding |returncode|.
        def onFinished(self, returncode):
            raise NotImplementedError

        # TaskPool calls this method to report that the process timed out and
        # was killed.
        def onTimeout(self):
            raise NotImplementedError

    # If a task output handler (onStdout, onStderr) throws this, we terminate
    # the task.
    class TerminateTask(Exception):
        pass

    def __init__(self, tasks, cwd='.', job_limit=4, timeout=150):
        self.pending = iter(tasks)
        self.cwd = cwd
        self.job_limit = job_limit
        self.timeout = timeout
        self.next_pending = self.get_next_pending()

    # Return the next task that has not yet been started, or None if there
    # are no more tasks.
    def get_next_pending(self):
        try:
            return self.pending.next()
        except StopIteration:
            return None

    def run_all(self):
        # The currently running tasks: a set of Task instances.
        running = set()
        with open(os.devnull, 'r') as devnull:
            while True:
                while len(running) < self.job_limit and self.next_pending:
                    t = self.next_pending
                    p = Popen(t.cmd(), bufsize=16384,
                              stdin=devnull, stdout=PIPE, stderr=PIPE,
                              cwd=self.cwd)

                    # Put the stdout and stderr pipes in non-blocking mode. See
                    # the post-'select' code below for details.
                    flags = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
                    fcntl.fcntl(p.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
                    flags = fcntl.fcntl(p.stderr, fcntl.F_GETFL)
                    fcntl.fcntl(p.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK)

                    t.start(p, time.time() + self.timeout)
                    running.add(t)
                    self.next_pending = self.get_next_pending()

                # If we have no tasks running, and the above wasn't able to
                # start any new ones, then we must be done!
                if not running:
                    break

                # How many seconds do we have until the earliest deadline?
                now = time.time()
                secs_to_next_deadline = max(min([t.deadline for t in running]) - now, 0)

                # Wait for output or a timeout.
                stdouts_and_stderrs = ([t.pipe.stdout for t in running]
                                       + [t.pipe.stderr for t in running])
                (readable, w, x) = select.select(stdouts_and_stderrs, [], [], secs_to_next_deadline)
                finished = set()
                terminate = set()
                for t in running:
                    # Since we've placed the pipes in non-blocking mode, these
                    # 'read's will simply return as many bytes as are available,
                    # rather than blocking until they have accumulated the full
                    # amount requested (or reached EOF). The 'read's should
                    # never throw, since 'select' has told us there was
                    # something available.
                    if t.pipe.stdout in readable:
                        output = t.pipe.stdout.read(16384)
                        if output != "":
                            try:
                                t.onStdout(output)
                            except TerminateTask:
                                terminate.add(t)
                    if t.pipe.stderr in readable:
                        output = t.pipe.stderr.read(16384)
                        if output != "":
                            try:
                                t.onStderr(output)
                            except TerminateTask:
                                terminate.add(t)
                        else:
                            # We assume that, once a task has closed its stderr,
                            # it will soon terminate. If a task closes its
                            # stderr and then hangs, we'll hang too, here.
                            t.pipe.wait()
                            t.onFinished(t.pipe.returncode)
                            finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished

                # Terminate any tasks whose handlers have asked us to do so.
                for t in terminate:
                    t.pipe.terminate()
                    t.pipe.wait()
                    running.remove(t)

                # Terminate any tasks which have missed their deadline.
                finished = set()
                for t in running:
                    if now >= t.deadline:
                        t.pipe.terminate()
                        t.pipe.wait()
                        t.onTimeout()
                        finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished
        return None

def get_cpu_count():
    """
    Guess at a reasonable parallelism count to set as the default for the
    current machine and run.
    """
    # Python 2.6+
    try:
        import multiprocessing
        return multiprocessing.cpu_count()
    except (ImportError, NotImplementedError):
        pass

    # POSIX
    try:
        res = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if res > 0:
            return res
    except (AttributeError, ValueError):
        pass

    # Windows
    try:
        res = int(os.environ['NUMBER_OF_PROCESSORS'])
        if res > 0:
            return res
    except (KeyError, ValueError):
        pass

    return 1

if __name__ == '__main__':
    # Test TaskPool by using it to implement the unique 'sleep sort' algorithm.
    def sleep_sort(ns, timeout):
        sorted = []
        class SortableTask(TaskPool.Task):
            def __init__(self, n):
                super(SortableTask, self).__init__()
                self.n = n
            def start(self, pipe, deadline):
                super(SortableTask, self).start(pipe, deadline)
            def cmd(self):
                return ['sh', '-c', 'echo out; sleep %d; echo err>&2' % (self.n,)]
            def onStdout(self, text):
                print '%d stdout: %r' % (self.n, text)
            def onStderr(self, text):
                print '%d stderr: %r' % (self.n, text)
            def onFinished(self, returncode):
                print '%d (rc=%d)' % (self.n, returncode)
                sorted.append(self.n)
            def onTimeout(self):
                print '%d timed out' % (self.n,)

        p = TaskPool([SortableTask(_) for _ in ns], job_limit=len(ns), timeout=timeout)
        p.run_all()
        return sorted

    print repr(sleep_sort([1, 1, 2, 3, 5, 8, 13, 21, 34], 15))
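
# A minimal usage sketch, separate from the sleep-sort test above: subclass
# TaskPool.Task, implement cmd() and the on* handlers, and hand a list of
# tasks to run_all(). The names EchoTask and run_commands_example, the
# per-task output collection, and the 60-second default timeout are
# illustrative choices, not part of the TaskPool interface itself.
def run_commands_example(commands, timeout=60):
    class EchoTask(TaskPool.Task):
        def __init__(self, command):
            super(EchoTask, self).__init__()
            self.command = command
            self.output = []
        def cmd(self):
            # |command| may be a shell string or an argument sequence,
            # whichever form Popen accepts.
            return self.command
        def onStdout(self, text):
            self.output.append(text)
        def onStderr(self, text):
            self.output.append(text)
        def onFinished(self, returncode):
            print '%r finished with return code %d' % (self.command, returncode)
        def onTimeout(self):
            print '%r timed out' % (self.command,)

    tasks = [EchoTask(c) for c in commands]
    TaskPool(tasks, job_limit=get_cpu_count(), timeout=timeout).run_all()
    # Return each command paired with whatever it wrote to stdout/stderr.
    return [(t.command, ''.join(t.output)) for t in tasks]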