1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/js/src/tests/lib/tasks_win.py Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,89 @@ 1.4 +# Multiprocess activities with a push-driven divide-process-collect model. 1.5 + 1.6 +from threading import Thread, Lock 1.7 +from Queue import Queue, Empty 1.8 +from datetime import datetime 1.9 + 1.10 +class Source: 1.11 + def __init__(self, task_list, results, timeout, verbose = False): 1.12 + self.tasks = Queue() 1.13 + for task in task_list: 1.14 + self.tasks.put_nowait(task) 1.15 + 1.16 + self.results = results 1.17 + self.timeout = timeout 1.18 + self.verbose = verbose 1.19 + 1.20 + def start(self, worker_count): 1.21 + t0 = datetime.now() 1.22 + 1.23 + sink = Sink(self.results) 1.24 + self.workers = [ Worker(_+1, self.tasks, sink, self.timeout, self.verbose) for _ in range(worker_count) ] 1.25 + if self.verbose: print '[P] Starting workers.' 1.26 + for w in self.workers: 1.27 + w.t0 = t0 1.28 + w.start() 1.29 + ans = self.join_workers() 1.30 + if self.verbose: print '[P] Finished.' 1.31 + return ans 1.32 + 1.33 + def join_workers(self): 1.34 + try: 1.35 + for w in self.workers: 1.36 + w.join(20000) 1.37 + return True 1.38 + except KeyboardInterrupt: 1.39 + for w in self.workers: 1.40 + w.stop = True 1.41 + return False 1.42 + 1.43 +class Sink: 1.44 + def __init__(self, results): 1.45 + self.results = results 1.46 + self.lock = Lock() 1.47 + 1.48 + def push(self, result): 1.49 + self.lock.acquire() 1.50 + try: 1.51 + self.results.push(result) 1.52 + finally: 1.53 + self.lock.release() 1.54 + 1.55 +class Worker(Thread): 1.56 + def __init__(self, id, tasks, sink, timeout, verbose): 1.57 + Thread.__init__(self) 1.58 + self.setDaemon(True) 1.59 + self.id = id 1.60 + self.tasks = tasks 1.61 + self.sink = sink 1.62 + self.timeout = timeout 1.63 + self.verbose = verbose 1.64 + 1.65 + self.thread = None 1.66 + self.stop = False 1.67 + 1.68 + def log(self, msg): 1.69 + if self.verbose: 1.70 + dd = datetime.now() - self.t0 1.71 + dt = dd.seconds + 1e-6 * dd.microseconds 1.72 + print '[W%d %.3f] %s' % (self.id, dt, msg) 1.73 + 1.74 + def run(self): 1.75 + try: 1.76 + while True: 1.77 + if self.stop: 1.78 + break 1.79 + self.log('Get next task.') 1.80 + task = self.tasks.get(False) 1.81 + self.log('Start task %s.'%str(task)) 1.82 + result = task.run(task.js_cmd_prefix, self.timeout) 1.83 + self.log('Finished task.') 1.84 + self.sink.push(result) 1.85 + self.log('Pushed result.') 1.86 + except Empty: 1.87 + pass 1.88 + 1.89 +def run_all_tests(tests, results, options): 1.90 + pipeline = Source(tests, results, options.timeout, False) 1.91 + return pipeline.start(options.worker_count) 1.92 +