Sat, 03 Jan 2015 20:18:00 +0100
Conditionally enable double-key logic according to the private browsing
mode or the privacy.thirdparty.isolate preference, and implement it in
GetCookieStringCommon and FindCookie where it counts.
With some reservations about how to convince FindCookie users to test the
condition and pass a nullptr when disabling double-key logic.
import errno
import fcntl
import os
import select
import time
from subprocess import Popen, PIPE

# Run a series of subprocesses. Try to keep up to a certain number going in
# parallel at any given time. Enforce time limits.
#
# This is implemented using non-blocking I/O, and so is Unix-specific.
#
# We assume that, if a task closes its standard error, then it's safe to
# wait for it to terminate. So an ill-behaved task that closes its standard
# error and then hangs will hang us, as well. However, as it takes special
# effort to close one's standard error, this seems unlikely to be a
# problem in practice.
class TaskPool(object):

    # A task we should run in a subprocess. Users should subclass this and
    # fill in the methods as given.
    class Task(object):
        def __init__(self):
            # The Popen object for this task's process; set by start().
            self.pipe = None
            # Not assigned anywhere in TaskPool; retained so existing code
            # that reads it keeps working.
            self.start_time = None
            # Absolute time.time() value at which the task times out; set by
            # start(). Initialized here so the attribute always exists.
            self.deadline = None

        # Record that this task is running, with |pipe| as its Popen object,
        # and should time out at |deadline| (an absolute time.time() value).
        def start(self, pipe, deadline):
            self.pipe = pipe
            self.deadline = deadline

        # Return a shell command (a string or sequence of arguments) to be
        # passed to Popen to run the task. The command will be given
        # /dev/null as its standard input, and pipes as its standard output
        # and error.
        def cmd(self):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard output.
        def onStdout(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard error.
        def onStderr(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process terminated,
        # yielding |returncode|. Standard output still buffered in the pipe
        # when the process exited is reported via onStdout before this.
        def onFinished(self, returncode):
            raise NotImplementedError

        # TaskPool calls this method to report that the process timed out and
        # was killed.
        def onTimeout(self):
            raise NotImplementedError

    # If a task output handler (onStdout, onStderr) throws this, we terminate
    # the task. Unless the task had already exited in the same pass, neither
    # onFinished nor onTimeout is called for it.
    class TerminateTask(Exception):
        pass

    # Maximum number of bytes taken from a pipe per read; also the Popen
    # buffer size.
    _CHUNK_SIZE = 16384

    # |tasks| is an iterable of Task instances. It is consumed lazily, one
    # task at a time, as job slots free up. Each task runs with |cwd| as its
    # working directory. At most |job_limit| tasks run at once, and each is
    # killed |timeout| seconds after it starts.
    def __init__(self, tasks, cwd='.', job_limit=4, timeout=150):
        self.pending = iter(tasks)
        self.cwd = cwd
        self.job_limit = job_limit
        self.timeout = timeout
        self.next_pending = self.get_next_pending()

    # Return the next task that has not yet been started, or None once the
    # task iterable is exhausted.
    def get_next_pending(self):
        # The next() builtin works on Python 2.6+ and Python 3 iterators alike,
        # unlike the Python-2-only .next() method.
        return next(self.pending, None)

    # Run every task, keeping up to |job_limit| going at once, until each has
    # finished, been terminated at a handler's request, or timed out.
    # Returns None.
    def run_all(self):
        # The currently running tasks: a set of Task instances.
        running = set()
        # Running tasks whose stdout has hit EOF. A pipe at EOF is always
        # "readable" to select(), so watching it would make us spin until the
        # task also closes its stderr.
        stdout_eof = set()
        with open(os.devnull, 'r') as devnull:
            while True:
                while (len(running) < self.job_limit
                       and self.next_pending is not None):
                    t = self.next_pending
                    self._start_task(t, devnull)
                    running.add(t)
                    self.next_pending = self.get_next_pending()

                # If we have no tasks running, and the above wasn't able to
                # start any new ones, then we must be done!
                if not running:
                    break

                # How many seconds do we have until the earliest deadline?
                secs_to_next_deadline = max(
                    min(t.deadline for t in running) - time.time(), 0)

                # Wait for output or a timeout.
                watched = ([t.pipe.stdout for t in running
                            if t not in stdout_eof]
                           + [t.pipe.stderr for t in running])
                readable = select.select(watched, [], [],
                                         secs_to_next_deadline)[0]

                finished = set()
                terminate = set()
                for t in running:
                    if t.pipe.stdout in readable:
                        output = self._read_chunk(t.pipe.stdout)
                        if output:
                            if not self._deliver(t.onStdout, output):
                                terminate.add(t)
                        elif output is not None:
                            stdout_eof.add(t)
                    if t.pipe.stderr in readable:
                        output = self._read_chunk(t.pipe.stderr)
                        if output:
                            if not self._deliver(t.onStderr, output):
                                terminate.add(t)
                        elif output is not None:
                            # We assume that, once a task has closed its
                            # stderr, it will soon terminate. If a task closes
                            # its stderr and then hangs, we'll hang too, here.
                            t.pipe.wait()
                            # Report stdout the task wrote before exiting that
                            # we have not read yet; it would be lost otherwise.
                            self._drain_stdout(t)
                            t.onFinished(t.pipe.returncode)
                            self._close_pipes(t)
                            finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished
                stdout_eof -= finished

                # Terminate any tasks whose handlers have asked us to do so. A
                # task that also finished during this pass was already reaped
                # and removed above, so skip it.
                for t in terminate - finished:
                    t.pipe.terminate()
                    t.pipe.wait()
                    self._close_pipes(t)
                    running.remove(t)
                    stdout_eof.discard(t)

                # Terminate any tasks which have missed their deadline. Re-read
                # the clock: select() may have slept right up to a deadline, and
                # a timestamp taken before it would postpone the kill a pass.
                now = time.time()
                timed_out = set(t for t in running if now >= t.deadline)
                for t in timed_out:
                    t.pipe.terminate()
                    t.pipe.wait()
                    t.onTimeout()
                    self._close_pipes(t)
                running -= timed_out
                stdout_eof -= timed_out
        return None

    # Spawn |t|'s command with |devnull| as stdin and start its clock.
    def _start_task(self, t, devnull):
        p = Popen(t.cmd(), bufsize=self._CHUNK_SIZE,
                  stdin=devnull, stdout=PIPE, stderr=PIPE,
                  cwd=self.cwd)
        # Put the stdout and stderr pipes in non-blocking mode, so that a read
        # after select() returns whatever bytes are available rather than
        # blocking until a full chunk (or EOF) has accumulated.
        for f in (p.stdout, p.stderr):
            flags = fcntl.fcntl(f, fcntl.F_GETFL)
            fcntl.fcntl(f, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        t.start(p, time.time() + self.timeout)

    # Read up to _CHUNK_SIZE bytes from the non-blocking pipe |f|. Return the
    # data, an empty string at EOF, or None if nothing is available right now.
    # (Python 2 file objects raise EAGAIN in that case; Python 3 buffered
    # readers return None or raise BlockingIOError.)
    @staticmethod
    def _read_chunk(f):
        try:
            return f.read(TaskPool._CHUNK_SIZE)
        except (IOError, OSError) as e:
            if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                return None
            raise

    # Pass |output| to the task handler |handler|. Return False if the handler
    # raised TerminateTask to ask for its task to be killed, True otherwise.
    def _deliver(self, handler, output):
        try:
            handler(output)
        except self.TerminateTask:
            return False
        return True

    # Report any stdout still buffered in |t|'s pipe after its process has
    # exited. Stops at EOF, when no more data is available, or when onStdout
    # raises TerminateTask (the process is already gone, so there is nothing
    # left to kill).
    def _drain_stdout(self, t):
        while True:
            output = self._read_chunk(t.pipe.stdout)
            if not output:
                return
            if not self._deliver(t.onStdout, output):
                return

    # Release |t|'s pipe file descriptors once we are done with the process.
    @staticmethod
    def _close_pipes(t):
        t.pipe.stdout.close()
        t.pipe.stderr.close()

def get_cpu_count():
    """
    Guess at a reasonable parallelism count to set as the default for the
    current machine and run.

    Sources are consulted in order, and the first usable answer wins:
      1. multiprocessing.cpu_count() (Python 2.6+), returned as-is;
      2. the POSIX sysconf value SC_NPROCESSORS_ONLN, if positive;
      3. the NUMBER_OF_PROCESSORS environment variable (set on Windows),
         if positive.
    Returns 1 when none of them yields an answer.
    """
    try:
        import multiprocessing
        return multiprocessing.cpu_count()
    except (ImportError, NotImplementedError):
        pass

    # Each fallback pairs a probe with the exceptions that mean "this source
    # is not available here"; a non-positive count is treated the same way.
    fallbacks = (
        (lambda: int(os.sysconf('SC_NPROCESSORS_ONLN')),
         (AttributeError, ValueError)),
        (lambda: int(os.environ['NUMBER_OF_PROCESSORS']),
         (KeyError, ValueError)),
    )
    for probe, unavailable in fallbacks:
        try:
            count = probe()
        except unavailable:
            continue
        if count > 0:
            return count

    return 1

if __name__ == '__main__':
    # Test TaskPool by using it to implement the unique 'sleep sort' algorithm:
    # each number n becomes a task that sleeps n seconds before writing to
    # stderr, so tasks finish, and are appended to the result, in order of n.
    # Tasks still running after |timeout| seconds are killed and left out of
    # the result.
    def sleep_sort(ns, timeout):
        # Named |result| rather than |sorted| to avoid shadowing the builtin.
        result = []

        class SortableTask(TaskPool.Task):
            def __init__(self, n):
                super(SortableTask, self).__init__()
                self.n = n

            def cmd(self):
                return ['sh', '-c', 'echo out; sleep %d; echo err>&2' % (self.n,)]

            def onStdout(self, text):
                print('%d stdout: %r' % (self.n, text))

            def onStderr(self, text):
                print('%d stderr: %r' % (self.n, text))

            def onFinished(self, returncode):
                print('%d (rc=%d)' % (self.n, returncode))
                result.append(self.n)

            def onTimeout(self):
                print('%d timed out' % (self.n,))

        pool = TaskPool([SortableTask(n) for n in ns],
                        job_limit=len(ns), timeout=timeout)
        pool.run_all()
        return result

    # Single-argument print(...) behaves identically on Python 2 and 3.
    print(repr(sleep_sort([1, 1, 2, 3, 5, 8, 13, 21, 34], 15)))