Wed, 31 Dec 2014 06:09:35 +0100
Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purposes.
1 # A unix-oriented process dispatcher. Uses a single thread with select and
2 # waitpid to dispatch tasks. This avoids several deadlocks that are possible
3 # with fork/exec + threads + Python.
5 import errno, os, select
6 from datetime import datetime, timedelta
7 from results import TestOutput
class Task(object):
    """Bookkeeping record for one spawned test process.

    Attributes:
        test:   the test object this process is running.
        cmd:    the command line, as returned by
                test.get_command(test.js_cmd_prefix).
        pid:    process id of the child.
        stdout: file descriptor the child's stdout is read from.
        stderr: file descriptor the child's stderr is read from.
        start:  datetime at which this record was created.
        out:    list of stdout fragments read so far.
        err:    list of stderr fragments read so far.
    """

    def __init__(self, test, pid, stdout, stderr):
        # What is being run.
        self.test = test
        self.cmd = test.get_command(test.js_cmd_prefix)

        # How to reach the running child.
        self.pid, self.stdout, self.stderr = pid, stdout, stderr

        # Start of the clock used for timeout and duration accounting.
        self.start = datetime.now()

        # Each stream gets its own fragment list.
        self.out, self.err = [], []
def spawn_test(test, passthrough=False):
    """Spawn one child, return a task struct.

    With |passthrough| false, two pipes are created and the process forks.
    The parent closes the write ends and returns a Task holding the child's
    pid and the read ends of the stdout and stderr pipes. The child points
    its file descriptors 1 and 2 at the write ends and execs the command
    given by test.get_command(test.js_cmd_prefix).

    With |passthrough| true there is no fork and no pipe: the calling
    process itself execs the test command, so on success this function does
    not return. A failed exec raises OSError to the caller.

    In the forked child this function never returns: if the exec (or any
    step before it) fails, a message is written to fd 2 and the child exits
    with status 127.
    """
    if passthrough:
        # Replace the current process; stdin/stdout/stderr stay attached to
        # whatever the caller had.
        cmd = test.get_command(test.js_cmd_prefix)
        os.execvp(cmd[0], cmd)

    (rout, wout) = os.pipe()
    (rerr, werr) = os.pipe()

    rv = os.fork()

    # Parent.
    if rv:
        os.close(wout)
        os.close(werr)
        return Task(test, rv, rout, rerr)

    # Child. Nothing below may return or raise into the caller: this process
    # is a copy of the harness, and unwinding would leave a second harness
    # running the remaining tests.
    try:
        os.close(rout)
        os.close(rerr)

        os.dup2(wout, 1)
        os.dup2(werr, 2)

        cmd = test.get_command(test.js_cmd_prefix)
        os.execvp(cmd[0], cmd)
    except Exception as e:
        # Report through fd 2, which is the stderr pipe once dup2 has run.
        # os.write needs bytes on Python 3; on Python 2 str is already bytes.
        msg = "spawn_test: could not exec test: %s\n" % (e,)
        if not isinstance(msg, bytes):
            msg = msg.encode("utf-8", "replace")
        os.write(2, msg)
    finally:
        # Only reached when the exec did not happen. os._exit skips
        # atexit handlers and stdio flushing inherited from the parent.
        os._exit(127)
def total_seconds(td):
    """
    Convert the timedelta |td| to a float number of seconds, including the
    fractional part contributed by its microseconds.
    """
    whole_seconds = td.seconds + td.days * 24 * 3600
    microseconds = float(td.microseconds) + whole_seconds * 10**6
    return microseconds / 10**6
def get_max_wait(tasks, results, timeout):
    """
    Compute how many seconds the dispatcher may sleep before it has to wake
    up again, either to refresh the progress meter or because a running task
    reaches its timeout.
    """

    # Upper bound: the interval results.pb asks to be updated at.
    budget = results.pb.update_granularity()

    # With a timeout in force, shrink the budget to the earliest deadline
    # among the running tasks.
    if timeout:
        now = datetime.now()
        limit = timedelta(seconds=timeout)
        for running in tasks:
            budget = min(budget, running.start + limit - now)

    # A deadline already in the past yields a negative value; clamp to zero.
    seconds = total_seconds(budget)
    return max(seconds, 0)
def flush_input(fd, frags):
    """
    Drain whatever is currently available on the file descriptor 'fd',
    appending each chunk read (at most 4096 bytes) to the list 'frags'.
    """
    while True:
        chunk = os.read(fd, 4096)
        frags.append(chunk)

        # A short read means the descriptor has been drained for now.
        if len(chunk) != 4096:
            return

        # A full buffer is ambiguous: it may have been exactly one buffer of
        # data, or there may be more. Poll with a zero timeout before the
        # next read so that we never block indefinitely.
        ready, _, _ = select.select([fd], [], [], 0)
        if not ready:
            return
def read_input(tasks, timeout):
    """
    Wait up to |timeout| seconds for output from any task in |tasks|, then
    collect what is available into the owning task's fragment lists.
    """
    watched = []
    exceptional = []
    frags_by_fd = {}  # fd -> fragment list that receives its data.
    for task in tasks:
        watched.extend((task.stdout, task.stderr))
        frags_by_fd[task.stdout] = task.out
        frags_by_fd[task.stderr] = task.err
        # Watching stdout for exceptional conditions wakes us with a close
        # event when the child dies, so we can respond immediately instead
        # of leaving cores idle.
        exceptional.append(task.stdout)

    ready = select.select(watched, [], exceptional, timeout)[0]
    for fd in ready:
        flush_input(fd, frags_by_fd[fd])
def remove_task(tasks, pid):
    """
    Remove the first task in |tasks| whose pid equals |pid| and return it.
    The list is modified in place. Raises KeyError if no task has that pid.
    """
    for position, candidate in enumerate(tasks):
        if candidate.pid == pid:
            return tasks.pop(position)
    raise KeyError("No such pid: %s" % pid)
def timed_out(task, timeout):
    """
    Report whether |task| has been running for more than |timeout| seconds,
    measured from task.start. A falsy |timeout| means no limit, so the
    answer is then always False.
    """
    if not timeout:
        return False
    elapsed = datetime.now() - task.start
    return elapsed > timedelta(seconds=timeout)
def reap_zombies(tasks, results, timeout):
    """
    Search for children of this process that have finished. If they are tasks,
    then this routine will clean up the child and send a TestOutput to the
    results channel. Finished children that are not in |tasks| are reaped and
    otherwise ignored.

    |tasks| is modified in place (ended tasks are removed) and the same list
    is returned. |timeout| is only used to record whether each ended task
    had exceeded it.

    Raises any OSError from waitpid other than ECHILD.
    """
    while True:
        try:
            pid, status = os.waitpid(0, os.WNOHANG)
        except OSError as e:
            # ECHILD: there are no children left to wait for.
            if e.errno == errno.ECHILD:
                break
            # Bare raise keeps the original traceback.
            raise
        # With WNOHANG, pid 0 means children exist but none has exited yet.
        if pid == 0:
            break

        try:
            ended = remove_task(tasks, pid)
        except KeyError:
            # Some other child of this process; it has been reaped, and
            # there is no task to report on.
            continue

        # Pick up any output still sitting in the pipes, then release them.
        flush_input(ended.stdout, ended.out)
        flush_input(ended.stderr, ended.err)
        os.close(ended.stdout)
        os.close(ended.stderr)

        # Death by signal is reported as the negated signal number.
        if os.WIFSIGNALED(status):
            returncode = -os.WTERMSIG(status)
        else:
            returncode = os.WEXITSTATUS(status)

        out = TestOutput(
            ended.test,
            ended.cmd,
            ''.join(ended.out),
            ''.join(ended.err),
            returncode,
            total_seconds(datetime.now() - ended.start),
            timed_out(ended, timeout))
        results.push(out)
    return tasks
def kill_undead(tasks, results, timeout):
    """
    Send signal 9 to every task in |tasks| that timed_out() reports as being
    over |timeout|. |results| is accepted but not used.
    """
    for child in tasks:
        if not timed_out(child, timeout):
            continue
        os.kill(child.pid, 9)
def run_all_tests(tests, results, options):
    """
    Run every test in |tests|, keeping fewer than or exactly
    options.worker_count children running at a time, and return True once
    all tests have been spawned and reaped. The caller's list is not
    modified.
    """
    # Work on a reversed copy so the next test comes off the end cheaply.
    pending = tests[:]
    pending.reverse()

    # Tasks for the children that are currently running.
    running = []

    while pending or running:
        # Top up the worker slots.
        while pending and len(running) < options.worker_count:
            running.append(spawn_test(pending.pop(), options.passthrough))

        # Sleep until there is output to read or something is due.
        max_wait = get_max_wait(running, results, options.timeout)
        read_input(running, max_wait)

        # Kill overdue children, then collect everything that has exited.
        kill_undead(running, results, options.timeout)
        running = reap_zombies(running, results, options.timeout)

        results.pb.poke()

    return True