media/webrtc/trunk/build/android/pylib/base_test_sharder.py

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/media/webrtc/trunk/build/android/pylib/base_test_sharder.py	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,113 @@
     1.4 +# Copyright (c) 2012 The Chromium Authors. All rights reserved.
     1.5 +# Use of this source code is governed by a BSD-style license that can be
     1.6 +# found in the LICENSE file.
     1.7 +
     1.8 +
     1.9 +import logging
    1.10 +import multiprocessing
    1.11 +
    1.12 +from test_result import TestResults
    1.13 +
    1.14 +
    1.15 +def _ShardedTestRunnable(test):
    1.16 +  """Standalone function needed by multiprocessing.Pool."""
    1.17 +  log_format = '[' + test.device + '] # %(asctime)-15s: %(message)s'
    1.18 +  if logging.getLogger().handlers:
    1.19 +    logging.getLogger().handlers[0].setFormatter(logging.Formatter(log_format))
    1.20 +  else:
    1.21 +    logging.basicConfig(format=log_format)
    1.22 +  # Handle SystemExit here since python has a bug to exit current process
    1.23 +  try:
    1.24 +    return test.Run()
    1.25 +  except SystemExit:
    1.26 +    return TestResults()
    1.27 +
    1.28 +def SetTestsContainer(tests_container):
    1.29 +  """Sets tests container.
    1.30 +
    1.31 +  multiprocessing.Queue can't be pickled across processes, so we need to set
    1.32 +  this as a 'global', per process, via multiprocessing.Pool.
    1.33 +  """
    1.34 +  BaseTestSharder.tests_container = tests_container
    1.35 +
    1.36 +
    1.37 +class BaseTestSharder(object):
    1.38 +  """Base class for sharding tests across multiple devices.
    1.39 +
    1.40 +  Args:
    1.41 +    attached_devices: A list of attached devices.
    1.42 +  """
    1.43 +  # See more in SetTestsContainer.
    1.44 +  tests_container = None
    1.45 +
    1.46 +  def __init__(self, attached_devices):
    1.47 +    self.attached_devices = attached_devices
    1.48 +    self.retries = 1
    1.49 +    self.tests = []
    1.50 +
    1.51 +  def CreateShardedTestRunner(self, device, index):
    1.52 +    """Factory function to create a suite-specific test runner.
    1.53 +
    1.54 +    Args:
    1.55 +      device: Device serial where this shard will run
    1.56 +      index: Index of this device in the pool.
    1.57 +
    1.58 +    Returns:
    1.59 +      An object of BaseTestRunner type (that can provide a "Run()" method).
    1.60 +    """
    1.61 +    pass
    1.62 +
    1.63 +  def SetupSharding(self, tests):
    1.64 +    """Called before starting the shards."""
    1.65 +    pass
    1.66 +
    1.67 +  def OnTestsCompleted(self, test_runners, test_results):
    1.68 +    """Notifies that we completed the tests."""
    1.69 +    pass
    1.70 +
    1.71 +  def RunShardedTests(self):
    1.72 +    """Runs the tests in all connected devices.
    1.73 +
    1.74 +    Returns:
    1.75 +      A TestResults object.
    1.76 +    """
    1.77 +    logging.warning('*' * 80)
    1.78 +    logging.warning('Sharding in ' + str(len(self.attached_devices)) +
    1.79 +                    ' devices.')
    1.80 +    logging.warning('Note that the output is not synchronized.')
    1.81 +    logging.warning('Look for the "Final result" banner in the end.')
    1.82 +    logging.warning('*' * 80)
    1.83 +    final_results = TestResults()
    1.84 +    for retry in xrange(self.retries):
    1.85 +      logging.warning('Try %d of %d', retry + 1, self.retries)
    1.86 +      self.SetupSharding(self.tests)
    1.87 +      test_runners = []
    1.88 +      for index, device in enumerate(self.attached_devices):
    1.89 +        logging.warning('*' * 80)
    1.90 +        logging.warning('Creating shard %d for %s', index, device)
    1.91 +        logging.warning('*' * 80)
    1.92 +        test_runner = self.CreateShardedTestRunner(device, index)
    1.93 +        test_runners += [test_runner]
    1.94 +      logging.warning('Starting...')
    1.95 +      pool = multiprocessing.Pool(len(self.attached_devices),
    1.96 +                                  SetTestsContainer,
    1.97 +                                  [BaseTestSharder.tests_container])
    1.98 +      # map can't handle KeyboardInterrupt exception. It's a python bug.
    1.99 +      # So use map_async instead.
   1.100 +      async_results = pool.map_async(_ShardedTestRunnable, test_runners)
   1.101 +      results_lists = async_results.get(999999)
   1.102 +      test_results = TestResults.FromTestResults(results_lists)
   1.103 +      if retry == self.retries - 1:
   1.104 +        all_passed = final_results.ok + test_results.ok
   1.105 +        final_results = test_results
   1.106 +        final_results.ok = all_passed
   1.107 +        break
   1.108 +      else:
   1.109 +        final_results.ok += test_results.ok
   1.110 +        self.tests = []
   1.111 +        for t in test_results.GetAllBroken():
   1.112 +          self.tests += [t.name]
   1.113 +        if not self.tests:
   1.114 +          break
   1.115 +    self.OnTestsCompleted(test_runners, final_results)
   1.116 +    return final_results

mercurial