media/webrtc/trunk/build/android/pylib/python_test_sharder.py

Thu, 22 Jan 2015 13:21:57 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Thu, 22 Jan 2015 13:21:57 +0100
branch
TOR_BUG_9701
changeset 15
b8a032363ba2
permissions
-rw-r--r--

Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6

     1 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
     2 # Use of this source code is governed by a BSD-style license that can be
     3 # found in the LICENSE file.
     5 """Takes care of sharding the python-drive tests in multiple devices."""
     7 import copy
     8 import logging
     9 import multiprocessing
    11 from python_test_caller import CallPythonTest
    12 from run_java_tests import FatalTestException
    13 import sharded_tests_queue
    14 from test_result import TestResults
    17 def SetTestsContainer(tests_container):
    18   """Sets PythonTestSharder as a top-level field.
    20   PythonTestSharder uses multiprocessing.Pool, which creates a pool of
    21   processes. This is used to initialize each worker in the pool, ensuring that
    22   each worker has access to this shared pool of tests.
    24   The multiprocessing module requires that this be a top-level method.
    26   Args:
    27     tests_container: the container for all the tests.
    28   """
    29   PythonTestSharder.tests_container = tests_container
    32 def _DefaultRunnable(test_runner):
    33   """A default runnable for a PythonTestRunner.
    35   Args:
    36     test_runner: A PythonTestRunner which will run tests.
    38   Returns:
    39     The test results.
    40   """
    41   return test_runner.RunTests()
    44 class PythonTestRunner(object):
    45   """Thin wrapper around a list of PythonTestBase instances.
    47   This is meant to be a long-lived object which can run multiple Python tests
    48   within its lifetime. Tests will receive the device_id and shard_index.
    50   The shard index affords the ability to create unique port numbers (e.g.
    51   DEFAULT_PORT + shard_index) if the test so wishes.
    52   """
    54   def __init__(self, options):
    55     """Constructor.
    57     Args:
    58       options: Options to use for setting up tests.
    59     """
    60     self.options = options
    62   def RunTests(self):
    63     """Runs tests from the shared pool of tests, aggregating results.
    65     Returns:
    66       A list of test results for all of the tests which this runner executed.
    67     """
    68     tests = PythonTestSharder.tests_container
    70     results = []
    71     for t in tests:
    72       res = CallPythonTest(t, self.options)
    73       results.append(res)
    75     return TestResults.FromTestResults(results)
    78 class PythonTestSharder(object):
    79   """Runs Python tests in parallel on multiple devices.
    81   This is lifted more or less wholesale from BaseTestRunner.
    83   Under the covers, it creates a pool of long-lived PythonTestRunners, which
    84   execute tests from the pool of tests.
    86   Args:
    87     attached_devices: a list of device IDs attached to the host.
    88     available_tests: a list of tests to run which subclass PythonTestBase.
    89     options: Options to use for setting up tests.
    91   Returns:
    92     An aggregated list of test results.
    93   """
    94   tests_container = None
    96   def __init__(self, attached_devices, available_tests, options):
    97     self.options = options
    98     self.attached_devices = attached_devices
    99     self.retries = options.shard_retries
   100     self.tests = available_tests
   102   def _SetupSharding(self, tests):
   103     """Creates the shared pool of tests and makes it available to test runners.
   105     Args:
   106       tests: the list of tests which will be consumed by workers.
   107     """
   108     SetTestsContainer(sharded_tests_queue.ShardedTestsQueue(
   109         len(self.attached_devices), tests))
   111   def RunShardedTests(self):
   112     """Runs tests in parallel using a pool of workers.
   114     Returns:
   115       A list of test results aggregated from all test runs.
   116     """
   117     logging.warning('*' * 80)
   118     logging.warning('Sharding in ' + str(len(self.attached_devices)) +
   119                     ' devices.')
   120     logging.warning('Note that the output is not synchronized.')
   121     logging.warning('Look for the "Final result" banner in the end.')
   122     logging.warning('*' * 80)
   123     all_passed = []
   124     test_results = TestResults()
   125     tests_to_run = self.tests
   126     for retry in xrange(self.retries):
   127       logging.warning('Try %d of %d', retry + 1, self.retries)
   128       self._SetupSharding(self.tests)
   129       test_runners = self._MakeTestRunners(self.attached_devices)
   130       logging.warning('Starting...')
   131       pool = multiprocessing.Pool(len(self.attached_devices),
   132                                   SetTestsContainer,
   133                                   [PythonTestSharder.tests_container])
   135       # List of TestResults objects from each test execution.
   136       try:
   137         results_lists = pool.map(_DefaultRunnable, test_runners)
   138       except Exception:
   139         logging.exception('Unable to run tests. Something with the '
   140                           'PythonTestRunners has gone wrong.')
   141         raise FatalTestException('PythonTestRunners were unable to run tests.')
   143       test_results = TestResults.FromTestResults(results_lists)
   144       # Accumulate passing results.
   145       all_passed += test_results.ok
   146       # If we have failed tests, map them to tests to retry.
   147       failed_tests = test_results.GetAllBroken()
   148       tests_to_run = self._GetTestsToRetry(self.tests,
   149                                            failed_tests)
   151       # Bail out early if we have no more tests. This can happen if all tests
   152       # pass before we're out of retries, for example.
   153       if not tests_to_run:
   154         break
   156     final_results = TestResults()
   157     # all_passed has accumulated all passing test results.
   158     # test_results will have the results from the most recent run, which could
   159     # include a variety of failure modes (unknown, crashed, failed, etc).
   160     final_results = test_results
   161     final_results.ok = all_passed
   163     return final_results
   165   def _MakeTestRunners(self, attached_devices):
   166     """Initialize and return a list of PythonTestRunners.
   168     Args:
   169       attached_devices: list of device IDs attached to host.
   171     Returns:
   172       A list of PythonTestRunners, one for each device.
   173     """
   174     test_runners = []
   175     for index, device in enumerate(attached_devices):
   176       logging.warning('*' * 80)
   177       logging.warning('Creating shard %d for %s', index, device)
   178       logging.warning('*' * 80)
   179       # Bind the PythonTestRunner to a device & shard index. Give it the
   180       # runnable which it will use to actually execute the tests.
   181       test_options = copy.deepcopy(self.options)
   182       test_options.ensure_value('device_id', device)
   183       test_options.ensure_value('shard_index', index)
   184       test_runner = PythonTestRunner(test_options)
   185       test_runners.append(test_runner)
   187     return test_runners
   189   def _GetTestsToRetry(self, available_tests, failed_tests):
   190     """Infers a list of tests to retry from failed tests and available tests.
   192     Args:
   193       available_tests: a list of tests which subclass PythonTestBase.
   194       failed_tests: a list of SingleTestResults representing failed tests.
   196     Returns:
   197       A list of test objects which correspond to test names found in
   198       failed_tests, or an empty list if there is no correspondence.
   199     """
   200     failed_test_names = map(lambda t: t.test_name, failed_tests)
   201     tests_to_retry = [t for t in available_tests
   202                       if t.qualified_name in failed_test_names]
   203     return tests_to_retry

mercurial