|
# Multiprocess activities with a push-driven divide-process-collect model.
|
2 |
|
3 from threading import Thread, Lock |
|
4 from Queue import Queue, Empty |
|
5 from datetime import datetime |
|
6 |
|
class Source:
    """Head of the pipeline: owns the task queue and the worker threads.

    All tasks are loaded into a queue at construction time. `start` creates
    the workers, which drain that queue and push every result into a single
    shared `Sink` wrapped around `results`.
    """

    # Seconds passed to each `Thread.join` call.
    # NOTE(review): presumably a finite timeout is used so that a
    # KeyboardInterrupt can still reach the waiting main thread -- confirm
    # before changing it.
    JOIN_TIMEOUT = 20000

    def __init__(self, task_list, results, timeout, verbose=False):
        """Queue up the tasks and remember the run settings.

        task_list -- iterable of tasks; each one is put on the queue.
        results   -- object handed to the `Sink` created by `start`.
        timeout   -- value forwarded unchanged to every `Worker`.
        verbose   -- when true, progress messages are printed.
        """
        self.tasks = Queue()
        for task in task_list:
            self.tasks.put_nowait(task)

        self.results = results
        self.timeout = timeout
        self.verbose = verbose
        # Defined up front so `join_workers` is safe to call before `start`.
        self.workers = []

    def start(self, worker_count):
        """Run `worker_count` workers over the queue and wait for them.

        Returns the value of `join_workers`: True when every join returned,
        False when the wait was cut short by a KeyboardInterrupt.
        """
        t0 = datetime.now()

        sink = Sink(self.results)
        # Worker ids are 1-based.
        self.workers = [
            Worker(index + 1, self.tasks, sink, self.timeout, self.verbose)
            for index in range(worker_count)
        ]
        if self.verbose:
            print('[P] Starting workers.')
        for w in self.workers:
            # Every worker shares one start time so log stamps are comparable.
            w.t0 = t0
            w.start()
        ans = self.join_workers()
        if self.verbose:
            print('[P] Finished.')
        return ans

    def join_workers(self):
        """Join each worker in turn.

        Returns True once every join call has returned. If a
        KeyboardInterrupt arrives while waiting, every worker's `stop` flag
        is set and False is returned without waiting any further.
        """
        try:
            for w in self.workers:
                w.join(self.JOIN_TIMEOUT)
            return True
        except KeyboardInterrupt:
            for w in self.workers:
                w.stop = True
            return False
|
39 |
|
class Sink:
    """Collects results from many workers, one push at a time.

    Wraps a `results` object and guards its `push` method with a lock so
    that only one caller is inside `results.push` at any moment.
    """

    def __init__(self, results):
        """Remember the wrapped `results` object and create the guard lock."""
        self.results = results
        self.lock = Lock()

    def push(self, result):
        """Forward `result` to `results.push` while holding the lock.

        The lock is released even when `results.push` raises; the exception
        then propagates to the caller.
        """
        with self.lock:
            self.results.push(result)
|
51 |
|
class Worker(Thread):
    """Daemon thread that drains a shared task queue into a sink.

    Each loop iteration takes one task off the queue without blocking, calls
    `task.run(task.js_cmd_prefix, timeout)` and pushes the returned value
    into the sink. The thread ends when the queue is empty or when `stop`
    has been set to a true value.
    """

    def __init__(self, id, tasks, sink, timeout, verbose):
        """Store the collaborators; the thread is not started here.

        id      -- number shown in this worker's log lines.
        tasks   -- queue the worker takes tasks from.
        sink    -- object whose `push(result)` receives each task result.
        timeout -- passed as the second argument to every `task.run` call.
        verbose -- when true, `log` prints; otherwise it is silent.
        """
        Thread.__init__(self)
        self.daemon = True
        self.id = id
        self.tasks = tasks
        self.sink = sink
        self.timeout = timeout
        self.verbose = verbose

        self.thread = None
        self.stop = False
        # Reference point for log time stamps. `Source.start` replaces it
        # with a start time shared by all workers; the default keeps `log`
        # usable when a worker is driven directly.
        self.t0 = datetime.now()

    def log(self, msg):
        """Print `msg` with the worker id and the seconds elapsed since `t0`.

        Does nothing unless `verbose` is true.
        """
        if not self.verbose:
            return
        elapsed = datetime.now() - self.t0
        # Include whole days so the stamp does not wrap after 24 hours.
        seconds = (elapsed.days * 86400 + elapsed.seconds
                   + 1e-6 * elapsed.microseconds)
        print('[W%d %.3f] %s' % (self.id, seconds, msg))

    def run(self):
        """Process tasks until the queue is empty or `stop` is set.

        Only the queue lookup is guarded against `Empty`; an exception
        raised by a task or by the sink propagates out of the thread.
        """
        while not self.stop:
            self.log('Get next task.')
            try:
                task = self.tasks.get(False)
            except Empty:
                # Queue drained: nothing left for this worker to do.
                break
            self.log('Start task %s.' % str(task))
            result = task.run(task.js_cmd_prefix, self.timeout)
            self.log('Finished task.')
            self.sink.push(result)
            self.log('Pushed result.')
|
85 |
|
def run_all_tests(tests, results, options):
    """Run `tests` through a non-verbose `Source` pipeline.

    `options` must provide `timeout` and `worker_count` attributes. Returns
    the value of `Source.start`: True when the workers were joined, False
    when the wait was interrupted from the keyboard.
    """
    source = Source(tests, results, options.timeout, verbose=False)
    return source.start(options.worker_count)
|
89 |