# Multiprocess activities with a push-driven divide-process-collect model.
#
# A Source holds a queue of tasks, a pool of Worker threads pulls tasks off
# that queue and runs them, and every result is pushed into a Sink that
# serializes access to the caller-supplied results object.

from threading import Thread, Lock
try:
    from Queue import Queue, Empty  # Python 2
except ImportError:
    from queue import Queue, Empty  # Python 3
from datetime import datetime

# Per-call timeout (seconds) for Thread.join. Joining with a timeout rather
# than blocking indefinitely keeps the main thread responsive to
# KeyboardInterrupt.
_JOIN_TIMEOUT = 20000


class Source:
    """Owns the task queue and drives a pool of workers over it."""

    def __init__(self, task_list, results, timeout, verbose = False):
        """Queue up every task in task_list.

        task_list -- iterable of task objects; each is later invoked as
                     task.run(task.js_cmd_prefix, timeout) by a Worker.
        results   -- object with a push(result) method that receives results.
        timeout   -- value handed unchanged to every task.run call.
        verbose   -- when true, progress messages are printed.
        """
        self.tasks = Queue()
        for task in task_list:
            self.tasks.put_nowait(task)

        self.results = results
        self.timeout = timeout
        self.verbose = verbose

    def start(self, worker_count):
        """Run all queued tasks on worker_count threads.

        Returns True when every worker ran to completion, False when the
        wait was interrupted by KeyboardInterrupt.
        """
        t0 = datetime.now()

        sink = Sink(self.results)
        # Worker ids are 1-based; they are only used to label log output.
        self.workers = [
            Worker(index + 1, self.tasks, sink, self.timeout, self.verbose)
            for index in range(worker_count)
        ]
        if self.verbose:
            print('[P] Starting workers.')
        for w in self.workers:
            # All workers share one start time so log timestamps line up.
            w.t0 = t0
            w.start()
        ans = self.join_workers()
        if self.verbose:
            print('[P] Finished.')
        return ans

    def join_workers(self):
        """Wait for every worker thread to finish.

        Returns True once all workers have exited. On KeyboardInterrupt,
        asks every worker to stop after its current task and returns False.
        """
        try:
            for w in self.workers:
                # A single timed join may return while the worker is still
                # running; keep waiting until the thread has really exited.
                while w.is_alive():
                    w.join(_JOIN_TIMEOUT)
            return True
        except KeyboardInterrupt:
            for w in self.workers:
                w.stop = True
            return False


class Sink:
    """Serializes result delivery from many workers into one results object."""

    def __init__(self, results):
        """results -- object exposing push(result)."""
        self.results = results
        self.lock = Lock()

    def push(self, result):
        """Forward result to results.push while holding the lock."""
        with self.lock:
            self.results.push(result)


class Worker(Thread):
    """Daemon thread that pulls tasks from a shared queue until it is empty."""

    def __init__(self, id, tasks, sink, timeout, verbose):
        """Set up the worker; the thread does not run until start().

        id      -- label used in log output (name kept for compatibility
                   with existing callers even though it shadows the builtin).
        tasks   -- shared Queue of task objects.
        sink    -- object exposing push(result).
        timeout -- value passed to each task.run call.
        verbose -- when true, log() prints progress messages.
        """
        Thread.__init__(self)
        self.daemon = True
        self.id = id
        self.tasks = tasks
        self.sink = sink
        self.timeout = timeout
        self.verbose = verbose

        self.thread = None
        self.stop = False
        # Reference time for log timestamps. Source.start overwrites this
        # with a start time shared by all workers; defaulting it here keeps
        # log() usable on a worker that was created directly.
        self.t0 = datetime.now()

    def log(self, msg):
        """Print msg tagged with worker id and seconds since t0, if verbose."""
        if self.verbose:
            dd = datetime.now() - self.t0
            # Include whole days so very long runs do not wrap back to zero.
            dt = dd.days * 86400 + dd.seconds + 1e-6 * dd.microseconds
            print('[W%d %.3f] %s' % (self.id, dt, msg))

    def run(self):
        """Process tasks until the queue is drained or stop is set."""
        while not self.stop:
            self.log('Get next task.')
            try:
                # Non-blocking get: an empty queue means there is no work left.
                task = self.tasks.get(False)
            except Empty:
                break
            self.log('Start task %s.'%str(task))
            result = task.run(task.js_cmd_prefix, self.timeout)
            self.log('Finished task.')
            self.sink.push(result)
            self.log('Pushed result.')


def run_all_tests(tests, results, options):
    """Run tests on options.worker_count threads, pushing into results.

    options must provide timeout and worker_count attributes. Returns True
    when all workers completed, False when interrupted from the keyboard.
    """
    pipeline = Source(tests, results, options.timeout, False)
    return pipeline.start(options.worker_count)