js/src/tests/lib/tasks_win.py

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

michael@0 1 # Multiprocess activities with a push-driven divide-process-collect model.
michael@0 2
michael@0 3 from threading import Thread, Lock
michael@0 4 from Queue import Queue, Empty
michael@0 5 from datetime import datetime
michael@0 6
class Source:
    """Head of the pipeline: holds the task queue and drives the workers.

    Every task is placed on a shared queue.  start() creates the worker
    threads, which pull tasks from that queue and push their results into a
    Sink wrapping ``results``.
    """

    def __init__(self, task_list, results, timeout, verbose=False):
        """Queue all tasks and remember the run settings.

        task_list -- iterable of tasks; they are queued in iteration order.
        results   -- collector object handed to the Sink built in start().
        timeout   -- value forwarded to every Worker.
        verbose   -- when true, progress messages are printed.
        """
        self.tasks = Queue()
        for task in task_list:
            self.tasks.put_nowait(task)

        self.results = results
        self.timeout = timeout
        self.verbose = verbose

    def start(self, worker_count):
        """Create ``worker_count`` workers, start them and wait for them.

        Workers are numbered from 1.  Returns the value of join_workers():
        True when the joins completed, False when interrupted from the
        keyboard.
        """
        t0 = datetime.now()

        sink = Sink(self.results)
        self.workers = [
            Worker(index + 1, self.tasks, sink, self.timeout, self.verbose)
            for index in range(worker_count)
        ]
        if self.verbose:
            print('[P] Starting workers.')
        for w in self.workers:
            # Every worker gets the same reference time, so the elapsed
            # times in their log lines are directly comparable.
            w.t0 = t0
            w.start()
        ans = self.join_workers()
        if self.verbose:
            print('[P] Finished.')
        return ans

    def join_workers(self):
        """Wait for each worker in turn.

        Returns True once every join() call has returned.  On
        KeyboardInterrupt, sets ``stop`` on every worker so that each one
        leaves its loop before taking another task, and returns False.
        """
        try:
            for w in self.workers:
                # NOTE(review): the finite timeout is presumably there so a
                # keyboard interrupt can still be delivered while waiting --
                # confirm before changing it.
                w.join(20000)
            return True
        except KeyboardInterrupt:
            for w in self.workers:
                w.stop = True
            return False
michael@0 39
class Sink:
    """Tail of the pipeline: funnels results into one collector under a lock.

    ``results`` only needs a ``push(result)`` method; calls to it are
    serialized so that several threads can share a single Sink.
    """

    def __init__(self, results):
        """Wrap ``results`` and create the lock guarding it."""
        self.results = results
        self.lock = Lock()

    def push(self, result):
        """Forward ``result`` to the collector while holding the lock.

        The lock is released even if the collector's push() raises; the
        exception then propagates to the caller.
        """
        with self.lock:
            self.results.push(result)
michael@0 51
class Worker(Thread):
    """Daemon thread that drains the task queue and pushes results to a sink.

    Attributes set after construction by the owner:
    stop -- set to True to make the worker exit before its next task.
    t0   -- reference time for the elapsed-time stamp in log output.
    """

    def __init__(self, id, tasks, sink, timeout, verbose):
        """Set up the worker; the thread is not started here.

        id      -- number shown in this worker's log lines.
        tasks   -- queue the worker takes tasks from.
        sink    -- object whose push(result) receives each task's result.
        timeout -- value passed as second argument to each task's run().
        verbose -- when true, log() prints; otherwise it is silent.
        """
        Thread.__init__(self)
        self.daemon = True
        self.id = id
        self.tasks = tasks
        self.sink = sink
        self.timeout = timeout
        self.verbose = verbose

        self.thread = None
        self.stop = False
        # Default reference time so log() works even when nobody assigns
        # t0 before the first message; an owner may overwrite it.
        self.t0 = datetime.now()

    def log(self, msg):
        """Print ``msg`` with the worker id and seconds elapsed since t0.

        Does nothing unless the worker is verbose.
        """
        if not self.verbose:
            return
        dd = datetime.now() - self.t0
        # timedelta keeps whole days separately from seconds; include them
        # so the stamp does not wrap around after 24 hours.
        dt = dd.days * 86400 + dd.seconds + 1e-6 * dd.microseconds
        print('[W%d %.3f] %s' % (self.id, dt, msg))

    def run(self):
        """Process tasks until the queue is empty or ``stop`` is set.

        For each task, calls task.run(task.js_cmd_prefix, timeout) and
        pushes the returned value to the sink.
        """
        while not self.stop:
            self.log('Get next task.')
            # Only the non-blocking get is guarded: an empty queue is the
            # normal way to finish, while an Empty escaping from a task or
            # from the sink would be a real error and must not be hidden.
            try:
                task = self.tasks.get(False)
            except Empty:
                break
            self.log('Start task %s.' % str(task))
            result = task.run(task.js_cmd_prefix, self.timeout)
            self.log('Finished task.')
            self.sink.push(result)
            self.log('Pushed result.')
michael@0 85
def run_all_tests(tests, results, options):
    """Run ``tests`` through a Source pipeline and return its outcome.

    tests   -- iterable of tasks to queue.
    results -- collector that receives each task's result.
    options -- object providing ``timeout`` and ``worker_count``.

    The pipeline is always created non-verbose.  Returns whatever
    Source.start() returns.
    """
    pipeline = Source(tests, results, options.timeout, False)
    return pipeline.start(options.worker_count)
michael@0 89

mercurial