Sat, 03 Jan 2015 20:18:00 +0100
Conditionally enable double key logic according to:
private browsing mode or privacy.thirdparty.isolate preference and
implement in GetCookieStringCommon and FindCookie where it counts...
With some reservations about how to convince FindCookie callers to test the
condition and pass a nullptr when double-key logic is disabled.
1 import fcntl, os, select, time
2 from subprocess import Popen, PIPE
4 # Run a series of subprocesses. Try to keep up to a certain number going in
5 # parallel at any given time. Enforce time limits.
6 #
7 # This is implemented using non-blocking I/O, and so is Unix-specific.
8 #
9 # We assume that, if a task closes its standard error, then it's safe to
10 # wait for it to terminate. So an ill-behaved task that closes its standard
# error and then hangs will hang us, as well. However, as it takes special
# effort to close one's standard error, this seems unlikely to be a
13 # problem in practice.
class TaskPool(object):
    # A task we should run in a subprocess. Users should subclass this and
    # fill in the methods as given.
    class Task(object):
        def __init__(self):
            self.pipe = None
            self.start_time = None
            # |deadline| is assigned by |start|; initialize it here so the
            # attribute always exists, even on tasks that never started.
            self.deadline = None

        # Record that this task is running, with |pipe| as its Popen object,
        # and should time out at |deadline| (an absolute time.time() value).
        def start(self, pipe, deadline):
            self.pipe = pipe
            self.deadline = deadline

        # Return a shell command (a string or sequence of arguments) to be
        # passed to Popen to run the task. The command will be given
        # /dev/null as its standard input, and pipes as its standard output
        # and error.
        def cmd(self):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard output.
        def onStdout(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard error.
        def onStderr(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process terminated,
        # yielding |returncode|.
        def onFinished(self, returncode):
            raise NotImplementedError

        # TaskPool calls this method to report that the process timed out and
        # was killed.
        def onTimeout(self):
            raise NotImplementedError

    # If a task output handler (onStdout, onStderr) throws this, we terminate
    # the task.
    class TerminateTask(Exception):
        pass

    # |tasks| is an iterable of Task instances; |job_limit| is the maximum
    # number to keep running concurrently; |timeout| is the per-task time
    # limit in seconds; |cwd| is the working directory for the subprocesses.
    def __init__(self, tasks, cwd='.', job_limit=4, timeout=150):
        self.pending = iter(tasks)
        self.cwd = cwd
        self.job_limit = job_limit
        self.timeout = timeout
        self.next_pending = self.get_next_pending()

    # Return the next task that has not yet been executed, or None once the
    # iterator is exhausted.
    def get_next_pending(self):
        try:
            # The next() builtin works on Python 2.6+ as well as 3.
            return next(self.pending)
        except StopIteration:
            return None

    def run_all(self):
        # The currently running tasks: a set of Task instances.
        running = set()
        with open(os.devnull, 'r') as devnull:
            while True:
                # Start new tasks until we hit the job limit or run out.
                while len(running) < self.job_limit and self.next_pending:
                    t = self.next_pending
                    p = Popen(t.cmd(), bufsize=16384,
                              stdin=devnull, stdout=PIPE, stderr=PIPE,
                              cwd=self.cwd)

                    # Put the stdout and stderr pipes in non-blocking mode. See
                    # the post-'select' code below for details.
                    flags = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
                    fcntl.fcntl(p.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
                    flags = fcntl.fcntl(p.stderr, fcntl.F_GETFL)
                    fcntl.fcntl(p.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK)

                    t.start(p, time.time() + self.timeout)
                    running.add(t)
                    self.next_pending = self.get_next_pending()

                # If we have no tasks running, and the above wasn't able to
                # start any new ones, then we must be done!
                if not running:
                    break

                # How many seconds do we have until the earliest deadline?
                now = time.time()
                secs_to_next_deadline = max(min([t.deadline for t in running]) - now, 0)

                # Wait for output or a timeout.
                stdouts_and_stderrs = ([t.pipe.stdout for t in running]
                                       + [t.pipe.stderr for t in running])
                (readable, w, x) = select.select(stdouts_and_stderrs, [], [],
                                                 secs_to_next_deadline)
                finished = set()
                terminate = set()
                for t in running:
                    # Since we've placed the pipes in non-blocking mode, these
                    # 'read's will simply return as many bytes as are available,
                    # rather than blocking until they have accumulated the full
                    # amount requested (or reached EOF). The 'read's should
                    # never throw, since 'select' has told us there was
                    # something available.
                    if t.pipe.stdout in readable:
                        output = t.pipe.stdout.read(16384)
                        if output:
                            try:
                                t.onStdout(output)
                            # Must qualify with the class name: class-body
                            # names are not in scope inside methods, so a bare
                            # 'TerminateTask' here would be a NameError.
                            except TaskPool.TerminateTask:
                                terminate.add(t)
                    if t.pipe.stderr in readable:
                        output = t.pipe.stderr.read(16384)
                        if output:
                            try:
                                t.onStderr(output)
                            except TaskPool.TerminateTask:
                                terminate.add(t)
                        else:
                            # A zero-length read means the task closed its
                            # stderr. We assume that, once a task has done so,
                            # it will soon terminate. If a task closes its
                            # stderr and then hangs, we'll hang too, here.
                            t.pipe.wait()
                            t.onFinished(t.pipe.returncode)
                            finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished

                # Terminate any tasks whose handlers have asked us to do so.
                # Intersect with |running| to skip tasks that also finished on
                # their own above: they were already waited on, and signalling
                # a reaped process can raise OSError (and the plain 'remove'
                # would raise KeyError).
                for t in terminate & running:
                    t.pipe.terminate()
                    t.pipe.wait()
                    running.remove(t)

                # Terminate any tasks which have missed their deadline. Take a
                # fresh timestamp: 'select' may have just slept right up to
                # the earliest deadline, which the pre-'select' |now| would
                # only notice on the following iteration.
                now = time.time()
                finished = set()
                for t in running:
                    if now >= t.deadline:
                        t.pipe.terminate()
                        t.pipe.wait()
                        t.onTimeout()
                        finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished
        return None
def get_cpu_count():
    """
    Guess at a reasonable parallelism count to set as the default for the
    current machine and run.
    """
    # Preferred source: the multiprocessing module (Python 2.6+).
    try:
        import multiprocessing
        return multiprocessing.cpu_count()
    except (ImportError, NotImplementedError):
        pass

    # Fall back to the POSIX sysconf interface.
    try:
        cpus = int(os.sysconf('SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        cpus = 0
    if cpus > 0:
        return cpus

    # Fall back to the environment variable Windows provides.
    try:
        cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        cpus = 0
    if cpus > 0:
        return cpus

    # Nothing worked; assume a single processor.
    return 1
193 if __name__ == '__main__':
194 # Test TaskPool by using it to implement the unique 'sleep sort' algorithm.
195 def sleep_sort(ns, timeout):
196 sorted=[]
197 class SortableTask(TaskPool.Task):
198 def __init__(self, n):
199 super(SortableTask, self).__init__()
200 self.n = n
201 def start(self, pipe, deadline):
202 super(SortableTask, self).start(pipe, deadline)
203 def cmd(self):
204 return ['sh', '-c', 'echo out; sleep %d; echo err>&2' % (self.n,)]
205 def onStdout(self, text):
206 print '%d stdout: %r' % (self.n, text)
207 def onStderr(self, text):
208 print '%d stderr: %r' % (self.n, text)
209 def onFinished(self, returncode):
210 print '%d (rc=%d)' % (self.n, returncode)
211 sorted.append(self.n)
212 def onTimeout(self):
213 print '%d timed out' % (self.n,)
215 p = TaskPool([SortableTask(_) for _ in ns], job_limit=len(ns), timeout=timeout)
216 p.run_all()
217 return sorted
219 print repr(sleep_sort([1,1,2,3,5,8,13,21,34], 15))