|
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
|
2 # Use of this source code is governed by a BSD-style license that can be |
|
3 # found in the LICENSE file. |
|
4 |
|
5 """Takes care of sharding the python-drive tests in multiple devices.""" |
|
6 |
|
7 import copy |
|
8 import logging |
|
9 import multiprocessing |
|
10 |
|
11 from python_test_caller import CallPythonTest |
|
12 from run_java_tests import FatalTestException |
|
13 import sharded_tests_queue |
|
14 from test_result import TestResults |
|
15 |
|
16 |
|
17 def SetTestsContainer(tests_container): |
|
18 """Sets PythonTestSharder as a top-level field. |
|
19 |
|
20 PythonTestSharder uses multiprocessing.Pool, which creates a pool of |
|
21 processes. This is used to initialize each worker in the pool, ensuring that |
|
22 each worker has access to this shared pool of tests. |
|
23 |
|
24 The multiprocessing module requires that this be a top-level method. |
|
25 |
|
26 Args: |
|
27 tests_container: the container for all the tests. |
|
28 """ |
|
29 PythonTestSharder.tests_container = tests_container |
|
30 |
|
31 |
|
32 def _DefaultRunnable(test_runner): |
|
33 """A default runnable for a PythonTestRunner. |
|
34 |
|
35 Args: |
|
36 test_runner: A PythonTestRunner which will run tests. |
|
37 |
|
38 Returns: |
|
39 The test results. |
|
40 """ |
|
41 return test_runner.RunTests() |
|
42 |
|
43 |
|
44 class PythonTestRunner(object): |
|
45 """Thin wrapper around a list of PythonTestBase instances. |
|
46 |
|
47 This is meant to be a long-lived object which can run multiple Python tests |
|
48 within its lifetime. Tests will receive the device_id and shard_index. |
|
49 |
|
50 The shard index affords the ability to create unique port numbers (e.g. |
|
51 DEFAULT_PORT + shard_index) if the test so wishes. |
|
52 """ |
|
53 |
|
54 def __init__(self, options): |
|
55 """Constructor. |
|
56 |
|
57 Args: |
|
58 options: Options to use for setting up tests. |
|
59 """ |
|
60 self.options = options |
|
61 |
|
62 def RunTests(self): |
|
63 """Runs tests from the shared pool of tests, aggregating results. |
|
64 |
|
65 Returns: |
|
66 A list of test results for all of the tests which this runner executed. |
|
67 """ |
|
68 tests = PythonTestSharder.tests_container |
|
69 |
|
70 results = [] |
|
71 for t in tests: |
|
72 res = CallPythonTest(t, self.options) |
|
73 results.append(res) |
|
74 |
|
75 return TestResults.FromTestResults(results) |
|
76 |
|
77 |
|
78 class PythonTestSharder(object): |
|
79 """Runs Python tests in parallel on multiple devices. |
|
80 |
|
81 This is lifted more or less wholesale from BaseTestRunner. |
|
82 |
|
83 Under the covers, it creates a pool of long-lived PythonTestRunners, which |
|
84 execute tests from the pool of tests. |
|
85 |
|
86 Args: |
|
87 attached_devices: a list of device IDs attached to the host. |
|
88 available_tests: a list of tests to run which subclass PythonTestBase. |
|
89 options: Options to use for setting up tests. |
|
90 |
|
91 Returns: |
|
92 An aggregated list of test results. |
|
93 """ |
|
94 tests_container = None |
|
95 |
|
96 def __init__(self, attached_devices, available_tests, options): |
|
97 self.options = options |
|
98 self.attached_devices = attached_devices |
|
99 self.retries = options.shard_retries |
|
100 self.tests = available_tests |
|
101 |
|
102 def _SetupSharding(self, tests): |
|
103 """Creates the shared pool of tests and makes it available to test runners. |
|
104 |
|
105 Args: |
|
106 tests: the list of tests which will be consumed by workers. |
|
107 """ |
|
108 SetTestsContainer(sharded_tests_queue.ShardedTestsQueue( |
|
109 len(self.attached_devices), tests)) |
|
110 |
|
111 def RunShardedTests(self): |
|
112 """Runs tests in parallel using a pool of workers. |
|
113 |
|
114 Returns: |
|
115 A list of test results aggregated from all test runs. |
|
116 """ |
|
117 logging.warning('*' * 80) |
|
118 logging.warning('Sharding in ' + str(len(self.attached_devices)) + |
|
119 ' devices.') |
|
120 logging.warning('Note that the output is not synchronized.') |
|
121 logging.warning('Look for the "Final result" banner in the end.') |
|
122 logging.warning('*' * 80) |
|
123 all_passed = [] |
|
124 test_results = TestResults() |
|
125 tests_to_run = self.tests |
|
126 for retry in xrange(self.retries): |
|
127 logging.warning('Try %d of %d', retry + 1, self.retries) |
|
128 self._SetupSharding(self.tests) |
|
129 test_runners = self._MakeTestRunners(self.attached_devices) |
|
130 logging.warning('Starting...') |
|
131 pool = multiprocessing.Pool(len(self.attached_devices), |
|
132 SetTestsContainer, |
|
133 [PythonTestSharder.tests_container]) |
|
134 |
|
135 # List of TestResults objects from each test execution. |
|
136 try: |
|
137 results_lists = pool.map(_DefaultRunnable, test_runners) |
|
138 except Exception: |
|
139 logging.exception('Unable to run tests. Something with the ' |
|
140 'PythonTestRunners has gone wrong.') |
|
141 raise FatalTestException('PythonTestRunners were unable to run tests.') |
|
142 |
|
143 test_results = TestResults.FromTestResults(results_lists) |
|
144 # Accumulate passing results. |
|
145 all_passed += test_results.ok |
|
146 # If we have failed tests, map them to tests to retry. |
|
147 failed_tests = test_results.GetAllBroken() |
|
148 tests_to_run = self._GetTestsToRetry(self.tests, |
|
149 failed_tests) |
|
150 |
|
151 # Bail out early if we have no more tests. This can happen if all tests |
|
152 # pass before we're out of retries, for example. |
|
153 if not tests_to_run: |
|
154 break |
|
155 |
|
156 final_results = TestResults() |
|
157 # all_passed has accumulated all passing test results. |
|
158 # test_results will have the results from the most recent run, which could |
|
159 # include a variety of failure modes (unknown, crashed, failed, etc). |
|
160 final_results = test_results |
|
161 final_results.ok = all_passed |
|
162 |
|
163 return final_results |
|
164 |
|
165 def _MakeTestRunners(self, attached_devices): |
|
166 """Initialize and return a list of PythonTestRunners. |
|
167 |
|
168 Args: |
|
169 attached_devices: list of device IDs attached to host. |
|
170 |
|
171 Returns: |
|
172 A list of PythonTestRunners, one for each device. |
|
173 """ |
|
174 test_runners = [] |
|
175 for index, device in enumerate(attached_devices): |
|
176 logging.warning('*' * 80) |
|
177 logging.warning('Creating shard %d for %s', index, device) |
|
178 logging.warning('*' * 80) |
|
179 # Bind the PythonTestRunner to a device & shard index. Give it the |
|
180 # runnable which it will use to actually execute the tests. |
|
181 test_options = copy.deepcopy(self.options) |
|
182 test_options.ensure_value('device_id', device) |
|
183 test_options.ensure_value('shard_index', index) |
|
184 test_runner = PythonTestRunner(test_options) |
|
185 test_runners.append(test_runner) |
|
186 |
|
187 return test_runners |
|
188 |
|
189 def _GetTestsToRetry(self, available_tests, failed_tests): |
|
190 """Infers a list of tests to retry from failed tests and available tests. |
|
191 |
|
192 Args: |
|
193 available_tests: a list of tests which subclass PythonTestBase. |
|
194 failed_tests: a list of SingleTestResults representing failed tests. |
|
195 |
|
196 Returns: |
|
197 A list of test objects which correspond to test names found in |
|
198 failed_tests, or an empty list if there is no correspondence. |
|
199 """ |
|
200 failed_test_names = map(lambda t: t.test_name, failed_tests) |
|
201 tests_to_retry = [t for t in available_tests |
|
202 if t.qualified_name in failed_test_names] |
|
203 return tests_to_retry |