js/src/tests/lib/tasks_win.py

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/js/src/tests/lib/tasks_win.py	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,89 @@
     1.4 +# Multiprocess activities with a push-driven divide-process-collect model.
     1.5 +
     1.6 +from threading import Thread, Lock
     1.7 +from Queue import Queue, Empty
     1.8 +from datetime import datetime
     1.9 +
    1.10 +class Source:
    1.11 +    def __init__(self, task_list, results, timeout, verbose = False):
    1.12 +        self.tasks = Queue()
    1.13 +        for task in task_list:
    1.14 +            self.tasks.put_nowait(task)
    1.15 +
    1.16 +        self.results = results
    1.17 +        self.timeout = timeout
    1.18 +        self.verbose = verbose
    1.19 +
    1.20 +    def start(self, worker_count):
    1.21 +        t0 = datetime.now()
    1.22 +
    1.23 +        sink = Sink(self.results)
    1.24 +        self.workers = [ Worker(_+1, self.tasks, sink, self.timeout, self.verbose) for _ in range(worker_count) ]
    1.25 +        if self.verbose: print '[P] Starting workers.'
    1.26 +        for w in self.workers:
    1.27 +            w.t0 = t0
    1.28 +            w.start()
    1.29 +        ans = self.join_workers()
    1.30 +        if self.verbose: print '[P] Finished.'
    1.31 +        return ans
    1.32 +
    1.33 +    def join_workers(self):
    1.34 +        try:
    1.35 +            for w in self.workers:
    1.36 +                w.join(20000)
    1.37 +            return True
    1.38 +        except KeyboardInterrupt:
    1.39 +            for w in self.workers:
    1.40 +                w.stop = True
    1.41 +            return False
    1.42 +
    1.43 +class Sink:
    1.44 +    def __init__(self, results):
    1.45 +        self.results = results
    1.46 +        self.lock = Lock()
    1.47 +
    1.48 +    def push(self, result):
    1.49 +        self.lock.acquire()
    1.50 +        try:
    1.51 +            self.results.push(result)
    1.52 +        finally:
    1.53 +            self.lock.release()
    1.54 +
    1.55 +class Worker(Thread):
    1.56 +    def __init__(self, id, tasks, sink, timeout, verbose):
    1.57 +        Thread.__init__(self)
    1.58 +        self.setDaemon(True)
    1.59 +        self.id = id
    1.60 +        self.tasks = tasks
    1.61 +        self.sink = sink
    1.62 +        self.timeout = timeout
    1.63 +        self.verbose = verbose
    1.64 +
    1.65 +        self.thread = None
    1.66 +        self.stop = False
    1.67 +
    1.68 +    def log(self, msg):
    1.69 +        if self.verbose:
    1.70 +            dd = datetime.now() - self.t0
    1.71 +            dt = dd.seconds + 1e-6 * dd.microseconds
    1.72 +            print '[W%d %.3f] %s' % (self.id, dt, msg)
    1.73 +
    1.74 +    def run(self):
    1.75 +        try:
    1.76 +            while True:
    1.77 +                if self.stop:
    1.78 +                    break
    1.79 +                self.log('Get next task.')
    1.80 +                task = self.tasks.get(False)
    1.81 +                self.log('Start task %s.'%str(task))
    1.82 +                result = task.run(task.js_cmd_prefix, self.timeout)
    1.83 +                self.log('Finished task.')
    1.84 +                self.sink.push(result)
    1.85 +                self.log('Pushed result.')
    1.86 +        except Empty:
    1.87 +            pass
    1.88 +
    1.89 +def run_all_tests(tests, results, options):
    1.90 +    pipeline = Source(tests, results, options.timeout, False)
    1.91 +    return pipeline.start(options.worker_count)
    1.92 +

mercurial