Wed, 31 Dec 2014 06:09:35 +0100
Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purposes.
1 # Multiprocess activities with a push-driven divide-process-collect model.
3 from threading import Thread, Lock
4 from Queue import Queue, Empty
5 from datetime import datetime
class Source:
    """Head of the pipeline: queues the tasks and drives the worker threads.

    task_list -- iterable of tasks; each one is placed on an internal Queue.
    results   -- object handed to the Sink that collects worker output.
    timeout   -- value forwarded unchanged to every Worker.
    verbose   -- when true, progress messages are printed.
    """

    def __init__(self, task_list, results, timeout, verbose = False):
        pending = Queue()
        for item in task_list:
            pending.put_nowait(item)
        self.tasks = pending

        self.results = results
        self.timeout = timeout
        self.verbose = verbose

    def start(self, worker_count):
        """Spawn worker_count workers, run them, and wait for them.

        Returns the value of join_workers(): True when the joins completed,
        False when a KeyboardInterrupt cut the wait short.
        """
        started_at = datetime.now()

        collector = Sink(self.results)
        pool = []
        for number in range(1, worker_count + 1):
            pool.append(Worker(number, self.tasks, collector,
                               self.timeout, self.verbose))
        self.workers = pool

        if self.verbose:
            print('[P] Starting workers.')
        for worker in pool:
            # All workers share one time origin for their log timestamps.
            worker.t0 = started_at
            worker.start()

        finished = self.join_workers()
        if self.verbose:
            print('[P] Finished.')
        return finished

    def join_workers(self):
        """Join every worker; on Ctrl-C flag all workers to stop instead.

        Each join is given a 20000 second timeout rather than waiting
        without a limit.
        """
        try:
            for worker in self.workers:
                worker.join(20000)
        except KeyboardInterrupt:
            # Ask every worker to quit after its current task.
            for worker in self.workers:
                worker.stop = True
            return False
        return True
class Sink:
    """Funnel through which workers deliver results, one at a time.

    Every push() is forwarded to results.push() while holding a lock, so
    calls coming from different threads never overlap.
    """

    def __init__(self, results):
        self.lock = Lock()
        self.results = results

    def push(self, result):
        """Forward result to the wrapped results object under the lock."""
        # The lock is released even when results.push() raises.
        with self.lock:
            self.results.push(result)
class Worker(Thread):
    """Daemon thread that pulls tasks off a shared queue and runs them.

    id      -- number used to tag this worker's log lines.
    tasks   -- Queue of task objects; each must provide run() and
               js_cmd_prefix.
    sink    -- object whose push() receives every task result.
    timeout -- value passed as the second argument of task.run().
    verbose -- when true, log() prints progress messages.
    """

    def __init__(self, id, tasks, sink, timeout, verbose):
        Thread.__init__(self)
        self.setDaemon(True)
        self.id = id
        self.tasks = tasks
        self.sink = sink
        self.timeout = timeout
        self.verbose = verbose

        self.thread = None
        self.stop = False
        # Time origin for log timestamps. Callers may overwrite it to give
        # several workers a common origin; having a default means log()
        # cannot fail with AttributeError when nobody did.
        self.t0 = datetime.now()

    def log(self, msg):
        """Print msg tagged with the worker id and seconds since t0.

        Does nothing unless the worker was created with verbose set.
        """
        if self.verbose:
            dd = datetime.now() - self.t0
            # timedelta.seconds only covers the part below one day, so the
            # days component has to be added explicitly.
            dt = dd.days * 86400 + dd.seconds + 1e-6 * dd.microseconds
            print('[W%d %.3f] %s' % (self.id, dt, msg))

    def run(self):
        """Run tasks until the queue is empty or self.stop is set.

        An exception raised by a task or by the sink propagates out of the
        thread; only an empty queue ends the loop quietly.
        """
        while not self.stop:
            self.log('Get next task.')
            # Catch Empty only around the queue read, so an Empty coming out
            # of task.run() or sink.push() is not mistaken for "no more work".
            try:
                task = self.tasks.get(False)
            except Empty:
                break
            self.log('Start task %s.' % str(task))
            result = task.run(task.js_cmd_prefix, self.timeout)
            self.log('Finished task.')
            self.sink.push(result)
            self.log('Pushed result.')
def run_all_tests(tests, results, options):
    """Run tests through a non-verbose Source and return its outcome.

    options must provide timeout (forwarded to the workers) and
    worker_count (number of worker threads to start). The return value is
    that of Source.start(): True when the workers were joined, False when
    the wait was interrupted from the keyboard.
    """
    source = Source(tests, results, options.timeout, False)
    outcome = source.start(options.worker_count)
    return outcome