js/src/gdb/taskpool.py

Sat, 03 Jan 2015 20:18:00 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Sat, 03 Jan 2015 20:18:00 +0100
branch
TOR_BUG_3246
changeset 7
129ffea94266
permissions
-rw-r--r--

Conditionally enable double key logic according to:
private browsing mode or privacy.thirdparty.isolate preference and
implement in GetCookieStringCommon and FindCookie where it counts...
With some reservations of how to convince FindCookie users to test
condition and pass a nullptr when disabling double key logic.

     1 import fcntl, os, select, time
     2 from subprocess import Popen, PIPE
     4 # Run a series of subprocesses. Try to keep up to a certain number going in
     5 # parallel at any given time. Enforce time limits.
     6 #
     7 # This is implemented using non-blocking I/O, and so is Unix-specific.
     8 #
     9 # We assume that, if a task closes its standard error, then it's safe to
    10 # wait for it to terminate. So an ill-behaved task that closes its standard
    11 # output and then hangs will hang us, as well. However, as it takes special
    12 # effort to close one's standard output, this seems unlikely to be a
    13 # problem in practice.
    14 class TaskPool(object):
    16     # A task we should run in a subprocess. Users should subclass this and
    17     # fill in the methods as given.
    18     class Task(object):
    19         def __init__(self):
    20             self.pipe = None
    21             self.start_time = None
    23         # Record that this task is running, with |pipe| as its Popen object,
    24         # and should time out at |deadline|.
    25         def start(self, pipe, deadline):
    26             self.pipe = pipe
    27             self.deadline = deadline
    29         # Return a shell command (a string or sequence of arguments) to be
    30         # passed to Popen to run the task. The command will be given
    31         # /dev/null as its standard input, and pipes as its standard output
    32         # and error.
    33         def cmd(self):
    34             raise NotImplementedError
    36         # TaskPool calls this method to report that the process wrote
    37         # |string| to its standard output.
    38         def onStdout(self, string):
    39             raise NotImplementedError
    41         # TaskPool calls this method to report that the process wrote
    42         # |string| to its standard error.
    43         def onStderr(self, string):
    44             raise NotImplementedError
    46         # TaskPool calls this method to report that the process terminated,
    47         # yielding |returncode|.
    48         def onFinished(self, returncode):
    49             raise NotImplementedError
    51         # TaskPool calls this method to report that the process timed out and
    52         # was killed.
    53         def onTimeout(self):
    54             raise NotImplementedError
    56     # If a task output handler (onStdout, onStderr) throws this, we terminate
    57     # the task.
    58     class TerminateTask(Exception):
    59         pass
    61     def __init__(self, tasks, cwd='.', job_limit=4, timeout=150):
    62         self.pending = iter(tasks)
    63         self.cwd = cwd
    64         self.job_limit = job_limit
    65         self.timeout = timeout
    66         self.next_pending = self.get_next_pending()
    68     # Set self.next_pending to the next task that has not yet been executed.
    69     def get_next_pending(self):
    70         try:
    71             return self.pending.next()
    72         except StopIteration:
    73             return None
    75     def run_all(self):
    76         # The currently running tasks: a set of Task instances.
    77         running = set()
    78         with open(os.devnull, 'r') as devnull:
    79             while True:
    80                 while len(running) < self.job_limit and self.next_pending:
    81                     t = self.next_pending
    82                     p = Popen(t.cmd(), bufsize=16384,
    83                               stdin=devnull, stdout=PIPE, stderr=PIPE,
    84                               cwd=self.cwd)
    86                     # Put the stdout and stderr pipes in non-blocking mode. See
    87                     # the post-'select' code below for details.
    88                     flags = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
    89                     fcntl.fcntl(p.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    90                     flags = fcntl.fcntl(p.stderr, fcntl.F_GETFL)
    91                     fcntl.fcntl(p.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    93                     t.start(p, time.time() + self.timeout)
    94                     running.add(t)
    95                     self.next_pending = self.get_next_pending()
    97                 # If we have no tasks running, and the above wasn't able to
    98                 # start any new ones, then we must be done!
    99                 if not running:
   100                     break
   102                 # How many seconds do we have until the earliest deadline?
   103                 now = time.time()
   104                 secs_to_next_deadline = max(min([t.deadline for t in running]) - now, 0)
   106                 # Wait for output or a timeout.
   107                 stdouts_and_stderrs = ([t.pipe.stdout for t in running]
   108                                      + [t.pipe.stderr for t in running])
   109                 (readable,w,x) = select.select(stdouts_and_stderrs, [], [], secs_to_next_deadline)
   110                 finished = set()
   111                 terminate = set()
   112                 for t in running:
   113                     # Since we've placed the pipes in non-blocking mode, these
   114                     # 'read's will simply return as many bytes as are available,
   115                     # rather than blocking until they have accumulated the full
   116                     # amount requested (or reached EOF). The 'read's should
   117                     # never throw, since 'select' has told us there was
   118                     # something available.
   119                     if t.pipe.stdout in readable:
   120                         output = t.pipe.stdout.read(16384)
   121                         if output != "":
   122                             try:
   123                                 t.onStdout(output)
   124                             except TerminateTask:
   125                                 terminate.add(t)
   126                     if t.pipe.stderr in readable:
   127                         output = t.pipe.stderr.read(16384)
   128                         if output != "":
   129                             try:
   130                                 t.onStderr(output)
   131                             except TerminateTask:
   132                                 terminate.add(t)
   133                         else:
   134                             # We assume that, once a task has closed its stderr,
   135                             # it will soon terminate. If a task closes its
   136                             # stderr and then hangs, we'll hang too, here.
   137                             t.pipe.wait()
   138                             t.onFinished(t.pipe.returncode)
   139                             finished.add(t)
   140                 # Remove the finished tasks from the running set. (Do this here
   141                 # to avoid mutating the set while iterating over it.)
   142                 running -= finished
   144                 # Terminate any tasks whose handlers have asked us to do so.
   145                 for t in terminate:
   146                     t.pipe.terminate()
   147                     t.pipe.wait()
   148                     running.remove(t)
   150                 # Terminate any tasks which have missed their deadline.
   151                 finished = set()
   152                 for t in running:
   153                     if now >= t.deadline:
   154                         t.pipe.terminate()
   155                         t.pipe.wait()
   156                         t.onTimeout()
   157                         finished.add(t)
   158                 # Remove the finished tasks from the running set. (Do this here
   159                 # to avoid mutating the set while iterating over it.)
   160                 running -= finished
   161         return None
   163 def get_cpu_count():
   164     """
   165     Guess at a reasonable parallelism count to set as the default for the
   166     current machine and run.
   167     """
   168     # Python 2.6+
   169     try:
   170         import multiprocessing
   171         return multiprocessing.cpu_count()
   172     except (ImportError,NotImplementedError):
   173         pass
   175     # POSIX
   176     try:
   177         res = int(os.sysconf('SC_NPROCESSORS_ONLN'))
   178         if res > 0:
   179             return res
   180     except (AttributeError,ValueError):
   181         pass
   183     # Windows
   184     try:
   185         res = int(os.environ['NUMBER_OF_PROCESSORS'])
   186         if res > 0:
   187             return res
   188     except (KeyError, ValueError):
   189         pass
   191     return 1
   193 if __name__ == '__main__':
   194     # Test TaskPool by using it to implement the unique 'sleep sort' algorithm.
   195     def sleep_sort(ns, timeout):
   196         sorted=[]
   197         class SortableTask(TaskPool.Task):
   198             def __init__(self, n):
   199                 super(SortableTask, self).__init__()
   200                 self.n = n
   201             def start(self, pipe, deadline):
   202                 super(SortableTask, self).start(pipe, deadline)
   203             def cmd(self):
   204                 return ['sh', '-c', 'echo out; sleep %d; echo err>&2' % (self.n,)]
   205             def onStdout(self, text):
   206                 print '%d stdout: %r' % (self.n, text)
   207             def onStderr(self, text):
   208                 print '%d stderr: %r' % (self.n, text)
   209             def onFinished(self, returncode):
   210                 print '%d (rc=%d)' % (self.n, returncode)
   211                 sorted.append(self.n)
   212             def onTimeout(self):
   213                 print '%d timed out' % (self.n,)
   215         p = TaskPool([SortableTask(_) for _ in ns], job_limit=len(ns), timeout=timeout)
   216         p.run_all()
   217         return sorted
   219     print repr(sleep_sort([1,1,2,3,5,8,13,21,34], 15))

mercurial