|
1 import fcntl, os, select, time |
|
2 from subprocess import Popen, PIPE |
|
3 |
|
4 # Run a series of subprocesses. Try to keep up to a certain number going in |
|
5 # parallel at any given time. Enforce time limits. |
|
6 # |
|
7 # This is implemented using non-blocking I/O, and so is Unix-specific. |
|
8 # |
|
9 # We assume that, if a task closes its standard error, then it's safe to |
|
10 # wait for it to terminate. So an ill-behaved task that closes its standard |
|
11 # output and then hangs will hang us, as well. However, as it takes special |
|
12 # effort to close one's standard output, this seems unlikely to be a |
|
13 # problem in practice. |
|
class TaskPool(object):
    """Run a series of subprocess tasks, keeping up to |job_limit| going in
    parallel, and killing any task that runs past its |timeout|.

    Implemented with non-blocking pipe I/O and 'select', and so Unix-specific.

    We assume that, if a task closes its standard error, then it's safe to
    wait for it to terminate. So an ill-behaved task that closes its standard
    error and then hangs will hang us, as well. However, as it takes special
    effort to close one's standard error, this seems unlikely to be a
    problem in practice.
    """

    # A task we should run in a subprocess. Users should subclass this and
    # fill in the methods as given.
    class Task(object):
        def __init__(self):
            # Popen object for the running subprocess; None until start().
            self.pipe = None
            # Absolute time (per time.time()) at which the task should be
            # killed; None until start().
            self.deadline = None
            # NOTE(review): start_time is never assigned by start(); kept
            # for backward compatibility in case external code reads it.
            self.start_time = None

        # Record that this task is running, with |pipe| as its Popen object,
        # and should time out at |deadline|.
        def start(self, pipe, deadline):
            self.pipe = pipe
            self.deadline = deadline

        # Return a shell command (a string or sequence of arguments) to be
        # passed to Popen to run the task. The command will be given
        # /dev/null as its standard input, and pipes as its standard output
        # and error.
        def cmd(self):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard output.
        def onStdout(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard error.
        def onStderr(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process terminated,
        # yielding |returncode|.
        def onFinished(self, returncode):
            raise NotImplementedError

        # TaskPool calls this method to report that the process timed out and
        # was killed.
        def onTimeout(self):
            raise NotImplementedError

    # If a task output handler (onStdout, onStderr) throws this, we terminate
    # the task.
    class TerminateTask(Exception):
        pass

    def __init__(self, tasks, cwd='.', job_limit=4, timeout=150):
        # Iterator over the tasks that have not been started yet.
        self.pending = iter(tasks)
        self.cwd = cwd
        self.job_limit = job_limit
        # Per-task time limit, in seconds.
        self.timeout = timeout
        self.next_pending = self.get_next_pending()

    # Return the next task that has not yet been executed, or None if all
    # tasks have been handed out.
    def get_next_pending(self):
        try:
            # Use the 'next' built-in (available since Python 2.6, which
            # this file already targets) rather than the Python-2-only
            # iterator .next() method.
            return next(self.pending)
        except StopIteration:
            return None

    def run_all(self):
        """Run every task to completion, dispatching output/finish/timeout
        events to the Task instances as they occur."""
        # The currently running tasks: a set of Task instances.
        running = set()
        with open(os.devnull, 'r') as devnull:
            while True:
                # Top the pool up to job_limit with fresh tasks.
                while len(running) < self.job_limit and self.next_pending:
                    t = self.next_pending
                    p = Popen(t.cmd(), bufsize=16384,
                              stdin=devnull, stdout=PIPE, stderr=PIPE,
                              cwd=self.cwd)

                    # Put the stdout and stderr pipes in non-blocking mode. See
                    # the post-'select' code below for details.
                    flags = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
                    fcntl.fcntl(p.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
                    flags = fcntl.fcntl(p.stderr, fcntl.F_GETFL)
                    fcntl.fcntl(p.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK)

                    t.start(p, time.time() + self.timeout)
                    running.add(t)
                    self.next_pending = self.get_next_pending()

                # If we have no tasks running, and the above wasn't able to
                # start any new ones, then we must be done!
                if not running:
                    break

                # How many seconds do we have until the earliest deadline?
                secs_to_next_deadline = max(
                    min([t.deadline for t in running]) - time.time(), 0)

                # Wait for output or a timeout.
                stdouts_and_stderrs = ([t.pipe.stdout for t in running]
                                       + [t.pipe.stderr for t in running])
                (readable, _, _) = select.select(stdouts_and_stderrs, [], [],
                                                 secs_to_next_deadline)
                finished = set()
                terminate = set()
                for t in running:
                    # Since we've placed the pipes in non-blocking mode, these
                    # 'read's will simply return as many bytes as are
                    # available, rather than blocking until they have
                    # accumulated the full amount requested (or reached EOF).
                    # The 'read's should never throw, since 'select' has told
                    # us there was something available.
                    if t.pipe.stdout in readable:
                        output = t.pipe.stdout.read(16384)
                        # An empty read means EOF on stdout; report only
                        # actual data. (A truthiness test also works when the
                        # pipes yield bytes rather than str.)
                        if output:
                            try:
                                t.onStdout(output)
                            except TaskPool.TerminateTask:
                                # Nested class names are not in method scope;
                                # qualify the exception or the 'except' itself
                                # raises NameError.
                                terminate.add(t)
                    if t.pipe.stderr in readable:
                        output = t.pipe.stderr.read(16384)
                        if output:
                            try:
                                t.onStderr(output)
                            except TaskPool.TerminateTask:
                                terminate.add(t)
                        else:
                            # We assume that, once a task has closed its
                            # stderr, it will soon terminate. If a task closes
                            # its stderr and then hangs, we'll hang too, here.
                            t.pipe.wait()
                            t.pipe.stdout.close()
                            t.pipe.stderr.close()
                            t.onFinished(t.pipe.returncode)
                            finished.add(t)

                # Terminate any tasks whose handlers have asked us to do so —
                # unless they already finished above, in which case the
                # process is gone and there is nothing to kill.
                for t in terminate - finished:
                    t.pipe.terminate()
                    t.pipe.wait()
                    t.pipe.stdout.close()
                    t.pipe.stderr.close()

                # Remove the finished and terminated tasks from the running
                # set. (Do this here to avoid mutating the set while
                # iterating over it.)
                running -= finished
                running -= terminate

                # Terminate any tasks which have missed their deadline. Use a
                # fresh timestamp: 'select' may have just slept right up to
                # the earliest deadline, and a pre-'select' timestamp would
                # let an expired task survive one extra iteration.
                now = time.time()
                expired = set()
                for t in running:
                    if now >= t.deadline:
                        t.pipe.terminate()
                        t.pipe.wait()
                        t.pipe.stdout.close()
                        t.pipe.stderr.close()
                        t.onTimeout()
                        expired.add(t)
                # Remove the expired tasks from the running set. (Again, done
                # outside the loop to avoid mutating while iterating.)
                running -= expired
        return None
|
162 |
|
def get_cpu_count():
    """
    Guess at a reasonable parallelism count to set as the default for the
    current machine and run.
    """
    # Preferred source: the multiprocessing module (Python 2.6+). It can be
    # missing or unable to count CPUs on some platforms.
    try:
        import multiprocessing
        return multiprocessing.cpu_count()
    except (ImportError, NotImplementedError):
        pass

    # POSIX fallback: ask sysconf for the number of online processors.
    try:
        count = int(os.sysconf('SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        count = 0
    if count > 0:
        return count

    # Windows fallback: the NUMBER_OF_PROCESSORS environment variable.
    try:
        count = int(os.environ['NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        count = 0
    if count > 0:
        return count

    # Nothing worked; assume a single processor.
    return 1
|
192 |
|
if __name__ == '__main__':
    # Test TaskPool by using it to implement the unique 'sleep sort' algorithm.
    def sleep_sort(ns, timeout):
        """Return the numbers in |ns| in the order their sleeps finish —
        i.e., sorted, as long as none of them exceed |timeout| seconds."""
        # Renamed from 'sorted' so as not to shadow the builtin. Filled in
        # by onFinished as each task's sleep completes.
        order = []

        class SortableTask(TaskPool.Task):
            def __init__(self, n):
                super(SortableTask, self).__init__()
                self.n = n

            def cmd(self):
                return ['sh', '-c', 'echo out; sleep %d; echo err>&2' % (self.n,)]

            # The parenthesized single-argument print form below is valid in
            # both Python 2 (as a print statement) and Python 3 (as a call).
            def onStdout(self, text):
                print('%d stdout: %r' % (self.n, text))

            def onStderr(self, text):
                print('%d stderr: %r' % (self.n, text))

            def onFinished(self, returncode):
                print('%d (rc=%d)' % (self.n, returncode))
                order.append(self.n)

            def onTimeout(self):
                print('%d timed out' % (self.n,))

        p = TaskPool([SortableTask(_) for _ in ns],
                     job_limit=len(ns), timeout=timeout)
        p.run_all()
        return order

    print(repr(sleep_sort([1, 1, 2, 3, 5, 8, 13, 21, 34], 15)))