testing/marionette/update-smoketests/smoketest.py

changeset 0
6474c204b198
equal deleted inserted replaced
-1:000000000000 0:ad00cf225fcf
1 # This Source Code Form is subject to the terms of the Mozilla Public
2 # License, v. 2.0. If a copy of the MPL was not distributed with this
3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
5 import json
6 import os
7 import subprocess
8 import sys
9 import tempfile
10 import threading
11 import zipfile
12
13 from ConfigParser import ConfigParser
14
15 this_dir = os.path.abspath(os.path.dirname(__file__))
16 marionette_dir = os.path.dirname(this_dir)
17 marionette_client_dir = os.path.join(marionette_dir, 'client', 'marionette')
18
19 def find_b2g():
20 sys.path.append(marionette_client_dir)
21 from b2gbuild import B2GBuild
22 return B2GBuild()
23
24 class DictObject(dict):
25 def __getattr__(self, item):
26 try:
27 return self.__getitem__(item)
28 except KeyError:
29 raise AttributeError(item)
30
31 def __getitem__(self, item):
32 value = dict.__getitem__(self, item)
33 if isinstance(value, dict):
34 return DictObject(value)
35 return value
36
37 class SmokeTestError(Exception):
38 pass
39
40 class SmokeTestConfigError(SmokeTestError):
41 def __init__(self, message):
42 SmokeTestError.__init__(self, 'smoketest-config.json: ' + message)
43
44 class SmokeTestConfig(DictObject):
45 TOP_LEVEL_REQUIRED = ('devices', 'public_key', 'private_key')
46 DEVICE_REQUIRED = ('system_fs_type', 'system_location', 'data_fs_type',
47 'data_location', 'sdcard', 'sdcard_recovery',
48 'serials')
49
50 def __init__(self, build_dir):
51 self.top_dir = build_dir
52 self.build_data = {}
53 self.flash_template = None
54
55 with open(os.path.join(build_dir, 'smoketest-config.json')) as f:
56 DictObject.__init__(self, json.loads(f.read()))
57
58 for required in self.TOP_LEVEL_REQUIRED:
59 if required not in self:
60 raise SmokeTestConfigError('No "%s" found' % required)
61
62 if len(self.devices) == 0:
63 raise SmokeTestConfigError('No devices found')
64
65 for name, device in self.devices.iteritems():
66 for required in self.DEVICE_REQUIRED:
67 if required not in device:
68 raise SmokeTestConfigError('No "%s" found in device "%s"' % (required, name))
69
70 def get_build_data(self, device, build_id):
71 if device in self.build_data:
72 if build_id in self.build_data[device]:
73 return self.build_data[device][build_id]
74 else:
75 self.build_data[device] = {}
76
77 build_dir = os.path.join(self.top_dir, device, build_id)
78 flash_zip = os.path.join(build_dir, 'flash.zip')
79 with zipfile.ZipFile(flash_zip) as zip:
80 app_ini = ConfigParser()
81 app_ini.readfp(zip.open('system/b2g/application.ini'))
82 platform_ini = ConfigParser()
83 platform_ini.readfp(zip.open('system/b2g/platform.ini'))
84
85 build_data = self.build_data[device][build_id] = DictObject({
86 'app_version': app_ini.get('App', 'version'),
87 'app_build_id': app_ini.get('App', 'buildid'),
88 'platform_build_id': platform_ini.get('Build', 'buildid'),
89 'platform_milestone': platform_ini.get('Build', 'milestone'),
90 'complete_mar': os.path.join(build_dir, 'complete.mar'),
91 'flash_script': os.path.join(build_dir, 'flash.sh')
92 })
93
94 return build_data
95
96 class SmokeTestRunner(object):
97 DEVICE_TIMEOUT = 30
98
99 def __init__(self, config, b2g, run_dir=None):
100 self.config = config
101 self.b2g = b2g
102 self.run_dir = run_dir or tempfile.mkdtemp()
103
104 update_tools = self.b2g.import_update_tools()
105 self.b2g_config = update_tools.B2GConfig()
106
107 def run_b2g_update_test(self, serial, testvars, tests):
108 b2g_update_test = os.path.join(marionette_client_dir,
109 'venv_b2g_update_test.sh')
110
111 if not tests:
112 tests = [os.path.join(marionette_client_dir, 'tests',
113 'update-tests.ini')]
114
115 args = ['bash', b2g_update_test, sys.executable,
116 '--homedir', self.b2g.homedir,
117 '--address', 'localhost:2828',
118 '--type', 'b2g+smoketest',
119 '--device', serial,
120 '--testvars', testvars]
121 args.extend(tests)
122
123 print ' '.join(args)
124 subprocess.check_call(args)
125
126 def build_testvars(self, device, start_id, finish_id):
127 run_dir = os.path.join(self.run_dir, device, start_id, finish_id)
128 if not os.path.exists(run_dir):
129 os.makedirs(run_dir)
130
131 start_data = self.config.get_build_data(device, start_id)
132 finish_data = self.config.get_build_data(device, finish_id)
133
134 partial_mar = os.path.join(run_dir, 'partial.mar')
135 if not os.path.exists(partial_mar):
136 build_gecko_mar = os.path.join(self.b2g.update_tools,
137 'build-gecko-mar.py')
138 subprocess.check_call([sys.executable, build_gecko_mar,
139 '--from', start_data.complete_mar,
140 '--to', finish_data.complete_mar,
141 partial_mar])
142 finish_data['partial_mar'] = partial_mar
143
144 testvars = os.path.join(run_dir, 'testvars.json')
145 if not os.path.exists(testvars):
146 open(testvars, 'w').write(json.dumps({
147 'start': start_data,
148 'finish': finish_data
149 }))
150
151 return testvars
152
153 def wait_for_device(self, device):
154 for serial in self.config.devices[device].serials:
155 proc = subprocess.Popen([self.b2g.adb_path, '-s', serial,
156 'wait-for-device'])
157 def wait_for_adb():
158 proc.communicate()
159
160 thread = threading.Thread(target=wait_for_adb)
161 thread.start()
162 thread.join(self.DEVICE_TIMEOUT)
163
164 if thread.isAlive():
165 print >>sys.stderr, '%s device %s is not recognized by ADB, ' \
166 'trying next device' % (device, serial)
167 proc.kill()
168 thread.join()
169 continue
170
171 return serial
172 return None
173
174 def run_smoketests_for_device(self, device, start_id, finish_id, tests):
175 testvars = self.build_testvars(device, start_id, finish_id)
176 serial = self.wait_for_device(device)
177 if not serial:
178 raise SmokeTestError('No connected serials for device "%s" could ' \
179 'be found' % device)
180
181 try:
182 self.run_b2g_update_test(serial, testvars, tests)
183 except subprocess.CalledProcessError:
184 print >>sys.stderr, 'SMOKETEST-FAIL | START=%s | FINISH=%s | ' \
185 'DEVICE=%s/%s | %s' % (start_id, finish_id,
186 device, serial, testvars)
187
188 def run_smoketests(self, build_ids, tests):
189 build_ids.sort()
190
191 latest_build_id = build_ids.pop(-1)
192 for build_id in build_ids:
193 for device in self.config.devices:
194 self.run_smoketests_for_device(device, build_id,
195 latest_build_id, tests)

mercurial