# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Takes care of sharding the python-driven tests across multiple devices."""

import copy
import logging
import multiprocessing

from python_test_caller import CallPythonTest
from run_java_tests import FatalTestException
import sharded_tests_queue
from test_result import TestResults


def SetTestsContainer(tests_container):
  """Sets PythonTestSharder.tests_container as a class-level field.

  PythonTestSharder uses multiprocessing.Pool, which creates a pool of
  processes. This function is passed to the pool as the worker initializer,
  ensuring that each worker has access to the shared pool of tests.

  The multiprocessing module requires that this be a top-level function.

  Args:
    tests_container: the container for all the tests.
  """
  PythonTestSharder.tests_container = tests_container


def _DefaultRunnable(test_runner):
  """A default runnable for a PythonTestRunner.

  This is the callable handed to Pool.map, so it has to live at module level.

  Args:
    test_runner: A PythonTestRunner which will run tests.

  Returns:
    The test results returned by test_runner.RunTests().
  """
  return test_runner.RunTests()


class PythonTestRunner(object):
  """Thin wrapper around a list of PythonTestBase instances.

  This is meant to be a long-lived object which can run multiple Python tests
  within its lifetime. Tests will receive the device_id and shard_index.

  The shard index affords the ability to create unique port numbers (e.g.
  DEFAULT_PORT + shard_index) if the test so wishes.
  """

  def __init__(self, options):
    """Constructor.

    Args:
      options: Options to use for setting up tests.
    """
    self.options = options

  def RunTests(self):
    """Runs tests from the shared pool of tests, aggregating results.

    The tests are read from PythonTestSharder.tests_container, which must have
    been set (see SetTestsContainer) in the process calling this method.

    Returns:
      A TestResults object aggregating the results of all of the tests which
      this runner executed.
    """
    tests = PythonTestSharder.tests_container
    results = [CallPythonTest(t, self.options) for t in tests]
    return TestResults.FromTestResults(results)


class PythonTestSharder(object):
  """Runs Python tests in parallel on multiple devices.

  This is lifted more or less wholesale from BaseTestRunner.

  Under the covers, it creates a pool of long-lived PythonTestRunners, which
  execute tests from the pool of tests.

  Args:
    attached_devices: a list of device IDs attached to the host.
    available_tests: a list of tests to run which subclass PythonTestBase.
    options: Options to use for setting up tests. Must provide a
        shard_retries attribute (the maximum number of attempts).
  """
  # Shared container of tests; assigned through SetTestsContainer, both in the
  # parent process and in every pool worker.
  tests_container = None

  def __init__(self, attached_devices, available_tests, options):
    self.options = options
    self.attached_devices = attached_devices
    self.retries = options.shard_retries
    self.tests = available_tests

  def _SetupSharding(self, tests):
    """Creates the shared pool of tests and makes it available to test runners.

    Args:
      tests: the list of tests which will be consumed by workers.
    """
    SetTestsContainer(sharded_tests_queue.ShardedTestsQueue(
        len(self.attached_devices), tests))

  def RunShardedTests(self):
    """Runs tests in parallel using a pool of workers.

    The full test list is run on the first attempt; every later attempt only
    re-runs the tests that were broken in the previous one, up to
    self.retries attempts in total.

    Returns:
      A TestResults object: the results of the most recent attempt, with its
      'ok' field replaced by the passing results accumulated over all
      attempts.

    Raises:
      FatalTestException: if the worker pool fails to run the tests.
    """
    logging.warning('*' * 80)
    logging.warning('Sharding in ' + str(len(self.attached_devices)) +
                    ' devices.')
    logging.warning('Note that the output is not synchronized.')
    logging.warning('Look for the "Final result" banner in the end.')
    logging.warning('*' * 80)
    all_passed = []
    test_results = TestResults()
    tests_to_run = self.tests
    for retry in range(self.retries):
      logging.warning('Try %d of %d', retry + 1, self.retries)
      # Only shard the tests that still need to run. Sharding self.tests here
      # would re-run tests that already passed and record their passing
      # results in all_passed more than once.
      self._SetupSharding(tests_to_run)
      test_runners = self._MakeTestRunners(self.attached_devices)
      logging.warning('Starting...')
      pool = multiprocessing.Pool(len(self.attached_devices),
                                  SetTestsContainer,
                                  [PythonTestSharder.tests_container])

      # List of TestResults objects from each test execution.
      try:
        results_lists = pool.map(_DefaultRunnable, test_runners)
      except Exception:
        logging.exception('Unable to run tests. Something with the '
                          'PythonTestRunners has gone wrong.')
        raise FatalTestException('PythonTestRunners were unable to run tests.')
      finally:
        # A new pool is created for every attempt, so release this one's
        # worker processes deterministically instead of waiting for the pool
        # object to be garbage collected.
        pool.terminate()
        pool.join()

      test_results = TestResults.FromTestResults(results_lists)
      # Accumulate passing results.
      all_passed += test_results.ok
      # If we have failed tests, map them to tests to retry.
      failed_tests = test_results.GetAllBroken()
      tests_to_run = self._GetTestsToRetry(tests_to_run, failed_tests)

      # Bail out early if we have no more tests. This can happen if all tests
      # pass before we're out of retries, for example.
      if not tests_to_run:
        break

    # all_passed has accumulated all passing test results.
    # test_results will have the results from the most recent run, which could
    # include a variety of failure modes (unknown, crashed, failed, etc).
    final_results = test_results
    final_results.ok = all_passed

    return final_results

  def _MakeTestRunners(self, attached_devices):
    """Initialize and return a list of PythonTestRunners.

    Args:
      attached_devices: list of device IDs attached to host.

    Returns:
      A list of PythonTestRunners, one for each device.
    """
    test_runners = []
    for index, device in enumerate(attached_devices):
      logging.warning('*' * 80)
      logging.warning('Creating shard %d for %s', index, device)
      logging.warning('*' * 80)
      # Bind the PythonTestRunner to a device & shard index. Each runner gets
      # its own deep copy of the options so shards do not share state.
      # NOTE(review): ensure_value is presumably optparse.Values.ensure_value,
      # which leaves an attribute untouched when it is already set to a
      # non-None value -- confirm against the caller's options type.
      test_options = copy.deepcopy(self.options)
      test_options.ensure_value('device_id', device)
      test_options.ensure_value('shard_index', index)
      test_runners.append(PythonTestRunner(test_options))

    return test_runners

  def _GetTestsToRetry(self, available_tests, failed_tests):
    """Infers a list of tests to retry from failed tests and available tests.

    Args:
      available_tests: a list of tests which subclass PythonTestBase.
      failed_tests: a list of SingleTestResults representing failed tests.

    Returns:
      A list of test objects which correspond to test names found in
      failed_tests, or an empty list if there is no correspondence.
    """
    # A set gives constant-time membership tests and, unlike the result of
    # map() on Python 3, can be queried more than once.
    failed_test_names = set(t.test_name for t in failed_tests)
    return [t for t in available_tests
            if t.qualified_name in failed_test_names]