media/webrtc/trunk/build/android/pylib/python_test_sharder.py

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/media/webrtc/trunk/build/android/pylib/python_test_sharder.py	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,203 @@
     1.4 +# Copyright (c) 2012 The Chromium Authors. All rights reserved.
     1.5 +# Use of this source code is governed by a BSD-style license that can be
     1.6 +# found in the LICENSE file.
     1.7 +
     1.8 +"""Takes care of sharding the python-drive tests in multiple devices."""
     1.9 +
    1.10 +import copy
    1.11 +import logging
    1.12 +import multiprocessing
    1.13 +
    1.14 +from python_test_caller import CallPythonTest
    1.15 +from run_java_tests import FatalTestException
    1.16 +import sharded_tests_queue
    1.17 +from test_result import TestResults
    1.18 +
    1.19 +
    1.20 +def SetTestsContainer(tests_container):
    1.21 +  """Sets PythonTestSharder as a top-level field.
    1.22 +
    1.23 +  PythonTestSharder uses multiprocessing.Pool, which creates a pool of
    1.24 +  processes. This is used to initialize each worker in the pool, ensuring that
    1.25 +  each worker has access to this shared pool of tests.
    1.26 +
    1.27 +  The multiprocessing module requires that this be a top-level method.
    1.28 +
    1.29 +  Args:
    1.30 +    tests_container: the container for all the tests.
    1.31 +  """
    1.32 +  PythonTestSharder.tests_container = tests_container
    1.33 +
    1.34 +
    1.35 +def _DefaultRunnable(test_runner):
    1.36 +  """A default runnable for a PythonTestRunner.
    1.37 +
    1.38 +  Args:
    1.39 +    test_runner: A PythonTestRunner which will run tests.
    1.40 +
    1.41 +  Returns:
    1.42 +    The test results.
    1.43 +  """
    1.44 +  return test_runner.RunTests()
    1.45 +
    1.46 +
    1.47 +class PythonTestRunner(object):
    1.48 +  """Thin wrapper around a list of PythonTestBase instances.
    1.49 +
    1.50 +  This is meant to be a long-lived object which can run multiple Python tests
    1.51 +  within its lifetime. Tests will receive the device_id and shard_index.
    1.52 +
    1.53 +  The shard index affords the ability to create unique port numbers (e.g.
    1.54 +  DEFAULT_PORT + shard_index) if the test so wishes.
    1.55 +  """
    1.56 +
    1.57 +  def __init__(self, options):
    1.58 +    """Constructor.
    1.59 +
    1.60 +    Args:
    1.61 +      options: Options to use for setting up tests.
    1.62 +    """
    1.63 +    self.options = options
    1.64 +
    1.65 +  def RunTests(self):
    1.66 +    """Runs tests from the shared pool of tests, aggregating results.
    1.67 +
    1.68 +    Returns:
    1.69 +      A list of test results for all of the tests which this runner executed.
    1.70 +    """
    1.71 +    tests = PythonTestSharder.tests_container
    1.72 +
    1.73 +    results = []
    1.74 +    for t in tests:
    1.75 +      res = CallPythonTest(t, self.options)
    1.76 +      results.append(res)
    1.77 +
    1.78 +    return TestResults.FromTestResults(results)
    1.79 +
    1.80 +
    1.81 +class PythonTestSharder(object):
    1.82 +  """Runs Python tests in parallel on multiple devices.
    1.83 +
    1.84 +  This is lifted more or less wholesale from BaseTestRunner.
    1.85 +
    1.86 +  Under the covers, it creates a pool of long-lived PythonTestRunners, which
    1.87 +  execute tests from the pool of tests.
    1.88 +
    1.89 +  Args:
    1.90 +    attached_devices: a list of device IDs attached to the host.
    1.91 +    available_tests: a list of tests to run which subclass PythonTestBase.
    1.92 +    options: Options to use for setting up tests.
    1.93 +
    1.94 +  Returns:
    1.95 +    An aggregated list of test results.
    1.96 +  """
    1.97 +  tests_container = None
    1.98 +
    1.99 +  def __init__(self, attached_devices, available_tests, options):
   1.100 +    self.options = options
   1.101 +    self.attached_devices = attached_devices
   1.102 +    self.retries = options.shard_retries
   1.103 +    self.tests = available_tests
   1.104 +
   1.105 +  def _SetupSharding(self, tests):
   1.106 +    """Creates the shared pool of tests and makes it available to test runners.
   1.107 +
   1.108 +    Args:
   1.109 +      tests: the list of tests which will be consumed by workers.
   1.110 +    """
   1.111 +    SetTestsContainer(sharded_tests_queue.ShardedTestsQueue(
   1.112 +        len(self.attached_devices), tests))
   1.113 +
   1.114 +  def RunShardedTests(self):
   1.115 +    """Runs tests in parallel using a pool of workers.
   1.116 +
   1.117 +    Returns:
   1.118 +      A list of test results aggregated from all test runs.
   1.119 +    """
   1.120 +    logging.warning('*' * 80)
   1.121 +    logging.warning('Sharding in ' + str(len(self.attached_devices)) +
   1.122 +                    ' devices.')
   1.123 +    logging.warning('Note that the output is not synchronized.')
   1.124 +    logging.warning('Look for the "Final result" banner in the end.')
   1.125 +    logging.warning('*' * 80)
   1.126 +    all_passed = []
   1.127 +    test_results = TestResults()
   1.128 +    tests_to_run = self.tests
   1.129 +    for retry in xrange(self.retries):
   1.130 +      logging.warning('Try %d of %d', retry + 1, self.retries)
   1.131 +      self._SetupSharding(self.tests)
   1.132 +      test_runners = self._MakeTestRunners(self.attached_devices)
   1.133 +      logging.warning('Starting...')
   1.134 +      pool = multiprocessing.Pool(len(self.attached_devices),
   1.135 +                                  SetTestsContainer,
   1.136 +                                  [PythonTestSharder.tests_container])
   1.137 +
   1.138 +      # List of TestResults objects from each test execution.
   1.139 +      try:
   1.140 +        results_lists = pool.map(_DefaultRunnable, test_runners)
   1.141 +      except Exception:
   1.142 +        logging.exception('Unable to run tests. Something with the '
   1.143 +                          'PythonTestRunners has gone wrong.')
   1.144 +        raise FatalTestException('PythonTestRunners were unable to run tests.')
   1.145 +
   1.146 +      test_results = TestResults.FromTestResults(results_lists)
   1.147 +      # Accumulate passing results.
   1.148 +      all_passed += test_results.ok
   1.149 +      # If we have failed tests, map them to tests to retry.
   1.150 +      failed_tests = test_results.GetAllBroken()
   1.151 +      tests_to_run = self._GetTestsToRetry(self.tests,
   1.152 +                                           failed_tests)
   1.153 +
   1.154 +      # Bail out early if we have no more tests. This can happen if all tests
   1.155 +      # pass before we're out of retries, for example.
   1.156 +      if not tests_to_run:
   1.157 +        break
   1.158 +
   1.159 +    final_results = TestResults()
   1.160 +    # all_passed has accumulated all passing test results.
   1.161 +    # test_results will have the results from the most recent run, which could
   1.162 +    # include a variety of failure modes (unknown, crashed, failed, etc).
   1.163 +    final_results = test_results
   1.164 +    final_results.ok = all_passed
   1.165 +
   1.166 +    return final_results
   1.167 +
   1.168 +  def _MakeTestRunners(self, attached_devices):
   1.169 +    """Initialize and return a list of PythonTestRunners.
   1.170 +
   1.171 +    Args:
   1.172 +      attached_devices: list of device IDs attached to host.
   1.173 +
   1.174 +    Returns:
   1.175 +      A list of PythonTestRunners, one for each device.
   1.176 +    """
   1.177 +    test_runners = []
   1.178 +    for index, device in enumerate(attached_devices):
   1.179 +      logging.warning('*' * 80)
   1.180 +      logging.warning('Creating shard %d for %s', index, device)
   1.181 +      logging.warning('*' * 80)
   1.182 +      # Bind the PythonTestRunner to a device & shard index. Give it the
   1.183 +      # runnable which it will use to actually execute the tests.
   1.184 +      test_options = copy.deepcopy(self.options)
   1.185 +      test_options.ensure_value('device_id', device)
   1.186 +      test_options.ensure_value('shard_index', index)
   1.187 +      test_runner = PythonTestRunner(test_options)
   1.188 +      test_runners.append(test_runner)
   1.189 +
   1.190 +    return test_runners
   1.191 +
   1.192 +  def _GetTestsToRetry(self, available_tests, failed_tests):
   1.193 +    """Infers a list of tests to retry from failed tests and available tests.
   1.194 +
   1.195 +    Args:
   1.196 +      available_tests: a list of tests which subclass PythonTestBase.
   1.197 +      failed_tests: a list of SingleTestResults representing failed tests.
   1.198 +
   1.199 +    Returns:
   1.200 +      A list of test objects which correspond to test names found in
   1.201 +      failed_tests, or an empty list if there is no correspondence.
   1.202 +    """
   1.203 +    failed_test_names = map(lambda t: t.test_name, failed_tests)
   1.204 +    tests_to_retry = [t for t in available_tests
   1.205 +                      if t.qualified_name in failed_test_names]
   1.206 +    return tests_to_retry

mercurial