|
1 # A unix-oriented process dispatcher. Uses a single thread with select and |
|
2 # waitpid to dispatch tasks. This avoids several deadlocks that are possible |
|
3 # with fork/exec + threads + Python. |
|
4 |
|
import errno
import os
import select
import signal
import sys
import traceback
from datetime import datetime, timedelta

from results import TestOutput
|
8 |
|
class Task(object):
    """
    Bookkeeping for one forked test child.

    Holds the test object, the command line it runs, the child's pid, the
    read ends of its stdout/stderr pipes, the time it was started, and the
    output fragments collected from each pipe so far.
    """

    def __init__(self, test, pid, stdout, stderr):
        self.test = test
        self.cmd = test.get_command(test.js_cmd_prefix)
        self.pid, self.stdout, self.stderr = pid, stdout, stderr
        self.start = datetime.now()
        # Chunks read from the stdout / stderr pipes, joined when reaped.
        self.out, self.err = [], []
|
19 |
|
def spawn_test(test, passthrough=False):
    """
    Start |test| and return a Task describing the running child.

    Normally a child is forked whose stdout and stderr are redirected into
    fresh pipes; the parent keeps the pipe read ends on the returned Task.

    If |passthrough| is true nothing is forked and output is not captured:
    the current process is replaced by the test command via os.execvp, so
    this function does not return (unless execvp raises, e.g. OSError when
    the command cannot be executed).
    """
    # Build the command before forking so that a failure here is raised in
    # this process rather than inside a half-started child.
    cmd = test.get_command(test.js_cmd_prefix)

    if passthrough:
        os.execvp(cmd[0], cmd)

    (rout, wout) = os.pipe()
    (rerr, werr) = os.pipe()

    rv = os.fork()

    # Parent: keep only the read ends.
    if rv:
        os.close(wout)
        os.close(werr)
        return Task(test, rv, rout, rerr)

    # Child: route stdout/stderr into the pipes, then become the test.
    try:
        os.close(rout)
        os.close(rerr)
        os.dup2(wout, 1)
        os.dup2(werr, 2)
        os.execvp(cmd[0], cmd)
    except BaseException:
        # execvp only returns by raising.  fd 2 is now the task's stderr
        # pipe, so the parent captures this report as the test's stderr.
        traceback.print_exc()
        sys.stderr.flush()
    finally:
        # The forked child must never unwind back into the parent's dispatch
        # loop; leave immediately without running the parent's cleanup.
        os._exit(127)
|
43 |
|
def total_seconds(td):
    """
    Convert the timedelta |td| into a float number of seconds, mirroring
    timedelta.total_seconds().
    """
    whole_seconds = td.days * 24 * 3600 + td.seconds
    micros = float(td.microseconds) + whole_seconds * 10**6
    return micros / 10**6
|
49 |
|
def get_max_wait(tasks, results, timeout):
    """
    Return how many seconds (never negative) the dispatcher may sleep in
    select() before it must wake up again.

    The bound starts at the progress meter's update granularity and is
    tightened to the earliest task deadline when |timeout| is set.
    """
    # Wake up often enough to keep the progress meter current...
    wait = results.pb.update_granularity()

    if timeout:
        # ...and no later than the first task that would exceed |timeout|.
        now = datetime.now()
        limit = timedelta(seconds=timeout)
        wait = min([wait] + [t.start + limit - now for t in tasks])

    # A deadline that has already passed yields a negative wait; clamp it.
    return max(total_seconds(wait), 0)
|
70 |
|
def flush_input(fd, frags):
    """
    Read whatever data is already waiting on file descriptor |fd| and append
    it, chunk by chunk, to the list |frags|, without blocking.

    The descriptor is polled with a zero-timeout select() before every read,
    including the first.  So this returns immediately when nothing is
    pending, even if some other process still holds the write end open.
    End-of-file counts as ready: the read then returns an empty string,
    which is appended like any other chunk and ends the loop.
    """
    while True:
        readable, _, _ = select.select([fd], [], [], 0)
        if not readable:
            return
        chunk = os.read(fd, 4096)
        frags.append(chunk)
        # A short read means everything pending was drained (or EOF was hit).
        # A full buffer may or may not mean more is waiting, so poll again.
        if len(chunk) < 4096:
            return
|
87 |
|
def read_input(tasks, timeout):
    """
    Wait up to |timeout| seconds for output from any task in |tasks|, and
    append whatever is ready to that task's out/err fragment lists.
    """
    # Route each pipe read end back to the fragment list it feeds.
    frags_by_fd = {}
    for task in tasks:
        frags_by_fd[task.stdout] = task.out
        frags_by_fd[task.stderr] = task.err

    watched = [fd for task in tasks for fd in (task.stdout, task.stderr)]
    # Also watch each stdout for an exceptional condition, intended to wake
    # us as soon as a child dies so no core sits idle.
    # NOTE(review): on Linux a pipe's EOF is reported as readable rather
    # than exceptional, so the read set likely already covers this; confirm
    # before removing.
    exceptional = [task.stdout for task in tasks]

    ready, _, _ = select.select(watched, [], exceptional, timeout)
    for fd in ready:
        flush_input(fd, frags_by_fd[fd])
|
108 |
|
def remove_task(tasks, pid):
    """
    Remove the task whose process id is |pid| from the list |tasks| (in
    place) and return that task.

    Raises KeyError if no task in |tasks| has that pid.
    """
    for i, task in enumerate(tasks):
        if task.pid == pid:
            return tasks.pop(i)
    raise KeyError("No such pid: %s" % pid)
|
124 |
|
def timed_out(task, timeout):
    """
    Tell whether |task| has been running for more than |timeout| seconds.

    A falsy |timeout| means "no limit", and the answer is then always False.
    """
    if not timeout:
        return False
    elapsed = datetime.now() - task.start
    return elapsed > timedelta(seconds=timeout)
|
135 |
|
def reap_zombies(tasks, results, timeout):
    """
    Reap, without blocking, every child of this process that has finished.

    For each reaped child that belongs to a task in |tasks|:
      - remove the task from |tasks|;
      - drain any output left in its pipes and close them;
      - push a TestOutput for the run to |results|.
    Reaped children that are not tasks are skipped.  |timeout| is passed to
    timed_out() to fill in the last TestOutput field.

    |tasks| is modified in place; the same list object is returned.
    """
    while True:
        try:
            # pid 0: any child in our process group; WNOHANG: don't block.
            pid, status = os.waitpid(0, os.WNOHANG)
        except OSError as e:
            if e.errno == errno.ECHILD:
                # No children left at all.
                break
            raise
        if pid == 0:
            # Children remain, but none has exited yet.
            break

        try:
            ended = remove_task(tasks, pid)
        except KeyError:
            # Not one of our tasks; it is reaped now, nothing to report.
            continue

        flush_input(ended.stdout, ended.out)
        flush_input(ended.stderr, ended.err)
        os.close(ended.stdout)
        os.close(ended.stderr)

        # Like subprocess.Popen.returncode: a child killed by signal N
        # reports -N; otherwise its exit status.
        if os.WIFSIGNALED(status):
            returncode = -os.WTERMSIG(status)
        else:
            returncode = os.WEXITSTATUS(status)

        out = TestOutput(
            ended.test,
            ended.cmd,
            ''.join(ended.out),
            ''.join(ended.err),
            returncode,
            total_seconds(datetime.now() - ended.start),
            timed_out(ended, timeout))
        results.push(out)
    return tasks
|
173 |
|
def kill_undead(tasks, results, timeout):
    """
    Send SIGKILL to every task in |tasks| that has run longer than |timeout|
    seconds.  A falsy |timeout| means no limit, so nothing is killed.

    |results| is unused.  This function does not wait for the killed
    children to exit.
    """
    for task in tasks:
        if timed_out(task, timeout):
            os.kill(task.pid, signal.SIGKILL)
|
181 |
|
def run_all_tests(tests, results, options):
    """
    Run every test in |tests|, keeping at most options.worker_count children
    alive at once, and report each finished run to |results|.

    Each pass of the loop does the following:
      - tops up the worker slots;
      - waits for output (bounded by the progress meter and task deadlines);
      - kills tasks past options.timeout;
      - reaps finished children;
      - pokes the progress meter.
    Returns True.
    """
    # Reversed copy, so the next test to start is popped cheaply off the end.
    pending = tests[::-1]
    running = []

    while pending or running:
        while pending and len(running) < options.worker_count:
            running.append(spawn_test(pending.pop(), options.passthrough))

        wait = get_max_wait(running, results, options.timeout)
        read_input(running, wait)

        kill_undead(running, results, options.timeout)
        running = reap_zombies(running, results, options.timeout)

        results.pb.poke()

    return True