js/src/gdb/taskpool.py

author       Michael Schloh von Bennewitz <michael@schloh.com>
date         Sat, 03 Jan 2015 20:18:00 +0100
branch       TOR_BUG_3246
changeset    7:129ffea94266
permissions  -rw-r--r--

Conditionally enable double-key logic according to private browsing mode or
the privacy.thirdparty.isolate preference, and implement it in
GetCookieStringCommon and FindCookie where it counts...
With some reservations about how to convince FindCookie callers to test the
condition and pass a nullptr when double-key logic is disabled.

import fcntl, os, select, time
from subprocess import Popen, PIPE

# Run a series of subprocesses. Try to keep up to a certain number going in
# parallel at any given time. Enforce time limits.
#
# This is implemented using non-blocking I/O, and so is Unix-specific.
#
# We assume that, if a task closes its standard error, then it's safe to
# wait for it to terminate. So an ill-behaved task that closes its standard
# error and then hangs will hang us, as well. However, as it takes special
# effort to close one's standard error, this seems unlikely to be a
# problem in practice.
class TaskPool(object):

    # A task we should run in a subprocess. Users should subclass this and
    # fill in the methods as given.
    class Task(object):
        def __init__(self):
            self.pipe = None
            self.start_time = None

        # Record that this task is running, with |pipe| as its Popen object,
        # and should time out at |deadline|.
        def start(self, pipe, deadline):
            self.pipe = pipe
            self.deadline = deadline

        # Return a shell command (a string or sequence of arguments) to be
        # passed to Popen to run the task. The command will be given
        # /dev/null as its standard input, and pipes as its standard output
        # and error.
        def cmd(self):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard output.
        def onStdout(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard error.
        def onStderr(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process terminated,
        # yielding |returncode|.
        def onFinished(self, returncode):
            raise NotImplementedError

        # TaskPool calls this method to report that the process timed out and
        # was killed.
        def onTimeout(self):
            raise NotImplementedError

    # If a task output handler (onStdout, onStderr) throws this, we terminate
    # the task.
    class TerminateTask(Exception):
        pass

    def __init__(self, tasks, cwd='.', job_limit=4, timeout=150):
        self.pending = iter(tasks)
        self.cwd = cwd
        self.job_limit = job_limit
        self.timeout = timeout
        self.next_pending = self.get_next_pending()

    # Return the next task that has not yet been executed, or None if all
    # tasks have been started.
    def get_next_pending(self):
        try:
            return self.pending.next()
        except StopIteration:
            return None

    def run_all(self):
        # The currently running tasks: a set of Task instances.
        running = set()
        with open(os.devnull, 'r') as devnull:
            while True:
                while len(running) < self.job_limit and self.next_pending:
                    t = self.next_pending
                    p = Popen(t.cmd(), bufsize=16384,
                              stdin=devnull, stdout=PIPE, stderr=PIPE,
                              cwd=self.cwd)

                    # Put the stdout and stderr pipes in non-blocking mode. See
                    # the post-'select' code below for details.
                    flags = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
                    fcntl.fcntl(p.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
                    flags = fcntl.fcntl(p.stderr, fcntl.F_GETFL)
                    fcntl.fcntl(p.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK)

                    t.start(p, time.time() + self.timeout)
                    running.add(t)
                    self.next_pending = self.get_next_pending()

                # If we have no tasks running, and the above wasn't able to
                # start any new ones, then we must be done!
                if not running:
                    break

                # How many seconds do we have until the earliest deadline?
                now = time.time()
                secs_to_next_deadline = max(min([t.deadline for t in running]) - now, 0)

                # Wait for output or a timeout.
                stdouts_and_stderrs = ([t.pipe.stdout for t in running]
                                       + [t.pipe.stderr for t in running])
                (readable, w, x) = select.select(stdouts_and_stderrs, [], [], secs_to_next_deadline)
                finished = set()
                terminate = set()
                for t in running:
                    # Since we've placed the pipes in non-blocking mode, these
                    # 'read's will simply return as many bytes as are available,
                    # rather than blocking until they have accumulated the full
                    # amount requested (or reached EOF). The 'read's should
                    # never throw, since 'select' has told us there was
                    # something available.
                    if t.pipe.stdout in readable:
                        output = t.pipe.stdout.read(16384)
                        if output != "":
                            try:
                                t.onStdout(output)
                            except TaskPool.TerminateTask:
                                terminate.add(t)
                    if t.pipe.stderr in readable:
                        output = t.pipe.stderr.read(16384)
                        if output != "":
                            try:
                                t.onStderr(output)
                            except TaskPool.TerminateTask:
                                terminate.add(t)
                        else:
                            # We assume that, once a task has closed its stderr,
                            # it will soon terminate. If a task closes its
                            # stderr and then hangs, we'll hang too, here.
                            t.pipe.wait()
                            t.onFinished(t.pipe.returncode)
                            finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished

                # Terminate any tasks whose handlers have asked us to do so.
                for t in terminate:
                    t.pipe.terminate()
                    t.pipe.wait()
                    running.remove(t)

                # Terminate any tasks which have missed their deadline.
                finished = set()
                for t in running:
                    if now >= t.deadline:
                        t.pipe.terminate()
                        t.pipe.wait()
                        t.onTimeout()
                        finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished
        return None
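
# A minimal sketch of a Task subclass (not part of the original file): it only
# illustrates which methods a user is expected to override and what each one
# is called for. The SortableTask test under __main__ below is a complete,
# runnable example.
class _EchoTask(TaskPool.Task):
    def cmd(self):
        # Any argv sequence (or shell string) acceptable to Popen will do.
        return ['echo', 'hello']
    def onStdout(self, string):
        pass    # handle a chunk of the subprocess's standard output
    def onStderr(self, string):
        pass    # handle a chunk of the subprocess's standard error
    def onFinished(self, returncode):
        pass    # the subprocess exited with |returncode|
    def onTimeout(self):
        pass    # the subprocess missed its deadline and was killed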

def get_cpu_count():
    """
    Guess at a reasonable parallelism count to set as the default for the
    current machine and run.
    """
    # Python 2.6+
    try:
        import multiprocessing
        return multiprocessing.cpu_count()
    except (ImportError, NotImplementedError):
        pass

    # POSIX
    try:
        res = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if res > 0:
            return res
    except (AttributeError, ValueError):
        pass

    # Windows
    try:
        res = int(os.environ['NUMBER_OF_PROCESSORS'])
        if res > 0:
            return res
    except (KeyError, ValueError):
        pass

    return 1
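
# A small usage sketch (not part of the original file): run an arbitrary list
# of TaskPool.Task instances with the pool sized to the machine's CPU count,
# as reported by get_cpu_count() above.
def _run_with_cpu_count(tasks, timeout=150):
    pool = TaskPool(tasks, job_limit=get_cpu_count(), timeout=timeout)
    pool.run_all()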

if __name__ == '__main__':
    # Test TaskPool by using it to implement the unique 'sleep sort' algorithm.
    def sleep_sort(ns, timeout):
        sorted = []
        class SortableTask(TaskPool.Task):
            def __init__(self, n):
                super(SortableTask, self).__init__()
                self.n = n
            def start(self, pipe, deadline):
                super(SortableTask, self).start(pipe, deadline)
            def cmd(self):
                return ['sh', '-c', 'echo out; sleep %d; echo err>&2' % (self.n,)]
            def onStdout(self, text):
                print '%d stdout: %r' % (self.n, text)
            def onStderr(self, text):
                print '%d stderr: %r' % (self.n, text)
            def onFinished(self, returncode):
                print '%d (rc=%d)' % (self.n, returncode)
                sorted.append(self.n)
            def onTimeout(self):
                print '%d timed out' % (self.n,)

        p = TaskPool([SortableTask(_) for _ in ns], job_limit=len(ns), timeout=timeout)
        p.run_all()
        return sorted

    print repr(sleep_sort([1, 1, 2, 3, 5, 8, 13, 21, 34], 15))
