# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""A "Test Server Spawner" that handles killing/stopping per-test test servers.

It's used to accept requests from the device to spawn and kill instances of the
chrome test server on the host.
"""

import BaseHTTPServer
import json
import logging
import os
import select
import struct
import subprocess
import threading
import time
import urlparse

import constants
from forwarder import Forwarder
import ports


# Paths that are needed to import necessary modules when running testserver.py.
os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', '') + ':%s:%s:%s:%s' % (
    os.path.join(constants.CHROME_DIR, 'third_party'),
    os.path.join(constants.CHROME_DIR, 'third_party', 'tlslite'),
    os.path.join(constants.CHROME_DIR, 'third_party', 'pyftpdlib', 'src'),
    os.path.join(constants.CHROME_DIR, 'net', 'tools', 'testserver'))


# Maps the server type requested by the device to the testserver.py flag that
# selects it. An empty string means no extra flag is passed.
SERVER_TYPES = {
    'http': '',
    'ftp': '-f',
    'sync': '--sync',
    'tcpecho': '--tcp-echo',
    'udpecho': '--udp-echo',
}


# The timeout (in seconds) of starting up the Python test server.
TEST_SERVER_STARTUP_TIMEOUT = 10


def _CheckPortStatus(port, expected_status):
  """Returns True if port has expected_status.

  The host port is polled up to four times, sleeping 1, 2, 3 and 4 seconds
  after each unsuccessful check.

  Args:
    port: the port number.
    expected_status: boolean of expected status.

  Returns:
    Returns True if the status is expected. Otherwise returns False.
  """
  for timeout in range(1, 5):
    if ports.IsHostPortUsed(port) == expected_status:
      return True
    time.sleep(timeout)
  return False


def _GetServerTypeCommandLine(server_type):
  """Returns the command-line by the given server type.

  Args:
    server_type: the server type to be used (e.g. 'http').

  Returns:
    A string containing the command-line argument.

  Raises:
    NotImplementedError: if server_type is not a key of SERVER_TYPES.
    Exception: if server_type is 'udpecho', which is not supported here.
  """
  if server_type not in SERVER_TYPES:
    raise NotImplementedError('Unknown server type: %s' % server_type)
  if server_type == 'udpecho':
    raise Exception('Please do not run UDP echo tests because we do not have '
                    'a UDP forwarder tool.')
  return SERVER_TYPES[server_type]


class TestServerThread(threading.Thread):
  """A thread to run the test server in a separate process."""

  def __init__(self, ready_event, arguments, adb, tool, build_type):
    """Initialize TestServerThread with the following argument.

    Args:
      ready_event: event which will be set when the test server is ready.
      arguments: dictionary of arguments to run the test server.
      adb: instance of AndroidCommands.
      tool: instance of runtime error detection tool.
      build_type: 'Release' or 'Debug'.
    """
    threading.Thread.__init__(self)
    self.wait_event = threading.Event()
    self.stop_flag = False
    self.ready_event = ready_event
    self.ready_event.clear()
    self.arguments = arguments
    self.adb = adb
    self.tool = tool
    # Handle of the spawned testserver.py process. It must exist before run()
    # assigns it so that Stop() can safely be called at any time.
    self.process = None
    # Never assigned anything but None in this file; retained only in case
    # external code reads it.
    self.test_server_process = None
    self.is_ready = False
    self.host_port = self.arguments['port']
    assert isinstance(self.host_port, int)
    self._test_server_forwarder = None
    # The forwarder device port now is dynamically allocated.
    self.forwarder_device_port = 0
    # Anonymous pipe in order to get port info from test server.
    self.pipe_in = None
    self.pipe_out = None
    self.command_line = []
    self.build_type = build_type

  def _WaitToStartAndGetPortFromTestServer(self):
    """Waits for the Python test server to start and gets the port it is using.

    The port information is passed by the Python test server with a pipe given
    by self.pipe_out. It is written as a result to |self.host_port|.

    Returns:
      Whether the port used by the test server was successfully fetched.
    """
    # Compare against None explicitly: 0 is a valid file descriptor.
    assert (self.host_port == 0 and self.pipe_out is not None and
            self.pipe_in is not None)
    (in_fds, _, _) = select.select([self.pipe_in], [], [],
                                   TEST_SERVER_STARTUP_TIMEOUT)
    if not in_fds:
      logging.error('Failed to wait to the Python test server to be started.')
      return False
    # First read the data length as an unsigned 4-byte value.  This
    # is _not_ using network byte ordering since the Python test server packs
    # size as native byte order and all Chromium platforms so far are
    # configured to use little-endian.
    # TODO(jnd): Change the Python test server and local_test_server_*.cc to
    # use a unified byte order (either big-endian or little-endian).
    length_size = struct.calcsize('=L')
    raw_length = os.read(self.pipe_in, length_size)
    # A short (or empty) read cannot be unpacked; report it instead of letting
    # struct.error escape.
    if len(raw_length) != length_size:
      logging.error('Failed to get length of server data.')
      return False
    (data_length,) = struct.unpack('=L', raw_length)
    if not data_length:
      logging.error('Failed to get length of server data.')
      return False
    port_json = os.read(self.pipe_in, data_length)
    if not port_json:
      logging.error('Failed to get server data.')
      return False
    logging.info('Got port json data: %s', port_json)
    try:
      port_info = json.loads(port_json)
    except ValueError:
      # Malformed JSON is treated like any other unusable server data.
      port_info = None
    if isinstance(port_info, dict) and isinstance(port_info.get('port'), int):
      self.host_port = port_info['port']
      return _CheckPortStatus(self.host_port, True)
    logging.error('Failed to get port information from the server data.')
    return False

  def _GenerateCommandLineArguments(self):
    """Generates the command line to run the test server.

    Note that all options are processed by following the definitions in
    testserver.py. The result is stored in |self.command_line|; calling this
    again once the command line is populated is a no-op.
    """
    if self.command_line:
      return
    # The following arguments must exist.
    type_cmd = _GetServerTypeCommandLine(self.arguments['server-type'])
    if type_cmd:
      self.command_line.append(type_cmd)
    self.command_line.append('--port=%d' % self.host_port)
    # Use a pipe to get the port given by the instance of Python test server
    # if the test does not specify the port.
    if self.host_port == 0:
      (self.pipe_in, self.pipe_out) = os.pipe()
      self.command_line.append('--startup-pipe=%d' % self.pipe_out)
    self.command_line.append('--host=%s' % self.arguments['host'])
    data_dir = self.arguments['data-dir'] or 'chrome/test/data'
    if not os.path.isabs(data_dir):
      data_dir = os.path.join(constants.CHROME_DIR, data_dir)
    self.command_line.append('--data-dir=%s' % data_dir)
    # The following arguments are optional depending on the individual test.
    if 'log-to-console' in self.arguments:
      self.command_line.append('--log-to-console')
    if 'auth-token' in self.arguments:
      self.command_line.append('--auth-token=%s' % self.arguments['auth-token'])
    if 'https' in self.arguments:
      self.command_line.append('--https')
    if 'cert-and-key-file' in self.arguments:
      self.command_line.append('--cert-and-key-file=%s' % os.path.join(
          constants.CHROME_DIR, self.arguments['cert-and-key-file']))
    if 'ocsp' in self.arguments:
      self.command_line.append('--ocsp=%s' % self.arguments['ocsp'])
    if 'https-record-resume' in self.arguments:
      self.command_line.append('--https-record-resume')
    if 'ssl-client-auth' in self.arguments:
      self.command_line.append('--ssl-client-auth')
    if 'tls-intolerant' in self.arguments:
      self.command_line.append('--tls-intolerant=%s' %
                               self.arguments['tls-intolerant'])
    if 'ssl-client-ca' in self.arguments:
      for ca in self.arguments['ssl-client-ca']:
        self.command_line.append('--ssl-client-ca=%s' %
                                 os.path.join(constants.CHROME_DIR, ca))
    if 'ssl-bulk-cipher' in self.arguments:
      for bulk_cipher in self.arguments['ssl-bulk-cipher']:
        self.command_line.append('--ssl-bulk-cipher=%s' % bulk_cipher)

  def _StartForwarder(self):
    """Creates the forwarder for the host port and waits for the device port.

    On success the device port is stored in |self.forwarder_device_port|.

    Returns:
      True if the device port reported by the forwarder was seen in the
      'LISTEN' state, False otherwise.
    """
    self._test_server_forwarder = Forwarder(
        self.adb, [(0, self.host_port)], self.tool, '127.0.0.1',
        self.build_type)
    device_port = self._test_server_forwarder.DevicePortForHostPort(
        self.host_port)
    if not device_port:
      return False
    # Check whether the forwarder is ready on the device.
    for timeout in range(1, 5):
      if ports.IsDevicePortUsed(self.adb, device_port, 'LISTEN'):
        self.forwarder_device_port = device_port
        return True
      time.sleep(timeout)
    return False

  def _StartTestServerAndForwarder(self):
    """Spawns testserver.py and sets up forwarding to it.

    Returns:
      True if both the test server and the forwarder are ready.
    """
    self._GenerateCommandLineArguments()
    command = [os.path.join(constants.CHROME_DIR, 'net', 'tools',
                            'testserver', 'testserver.py')] + self.command_line
    logging.info('Running: %s', command)
    self.process = subprocess.Popen(command)
    if self.pipe_out is not None:
      server_ready = self._WaitToStartAndGetPortFromTestServer()
    else:
      server_ready = _CheckPortStatus(self.host_port, True)
    return server_ready and self._StartForwarder()

  def _CleanUp(self):
    """Kills the test server, closes the forwarder and releases the pipe."""
    if self.process is not None and self.process.poll() is None:
      self.process.kill()
    if self._test_server_forwarder:
      self._test_server_forwarder.Close()
    self.process = None
    self.is_ready = False
    if self.pipe_out is not None:
      os.close(self.pipe_in)
      os.close(self.pipe_out)
      self.pipe_in = None
      self.pipe_out = None

  def run(self):
    """Starts the test server, then idles until Stop() is called.

    |self.ready_event| is always set once the start-up attempt has finished,
    whether it succeeded or not, so that a thread waiting on it can inspect
    |self.is_ready| instead of blocking forever. |self.wait_event| is set
    when the thread has finished cleaning up.
    """
    logging.info('Start running the thread!')
    self.wait_event.clear()
    try:
      try:
        self.is_ready = self._StartTestServerAndForwarder()
      except Exception:
        # Any start-up failure (bad arguments, spawn failure, forwarder error)
        # is reported to the waiter as "not ready" rather than left to kill
        # this thread silently.
        logging.exception('Failed to start the test server.')
        self.is_ready = False
      finally:
        # Wake up the request handler thread.
        self.ready_event.set()
      # Keep thread running until Stop() gets called. If the process was never
      # spawned, Stop() returns immediately without setting the flag, so do
      # not wait for it.
      if self.process is not None:
        while not self.stop_flag:
          time.sleep(1)
    finally:
      try:
        self._CleanUp()
      finally:
        logging.info('Test-server has died.')
        self.wait_event.set()

  def Stop(self):
    """Blocks until the loop has finished.

    Returns immediately if no test server process is currently recorded.

    Note that this must be called in another thread.
    """
    if not self.process:
      return
    self.stop_flag = True
    self.wait_event.wait()


class SpawningServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
  """A handler used to process http GET/POST request."""

  def _SendResponse(self, response_code, response_reason, additional_headers,
                    contents):
    """Generates a response sent to the client from the provided parameters.

    Args:
      response_code: number of the response status.
      response_reason: string of reason description of the response.
      additional_headers: dict of additional headers. Each key is the name of
                          the header, each value is the content of the header.
      contents: string of the contents we want to send to client.
    """
    self.send_response(response_code, response_reason)
    self.send_header('Content-Type', 'text/html')
    # Specify the content-length as without it the http(s) response will not
    # be completed properly (and the browser keeps expecting data).
    self.send_header('Content-Length', len(contents))
    for header_name in additional_headers:
      self.send_header(header_name, additional_headers[header_name])
    self.end_headers()
    self.wfile.write(contents)
    self.wfile.flush()

  def _StartTestServer(self):
    """Starts the test server thread.

    The request body must be JSON describing the test server arguments. A 200
    response carrying the forwarder device port is sent on success, and a 500
    response otherwise.

    Raises:
      Exception: if the content-type is not 'application/json' or the
          content-length header is not an integer.
    """
    logging.info('Handling request to spawn a test server.')
    content_type = self.headers.getheader('content-type')
    if content_type != 'application/json':
      raise Exception('Bad content-type for start request.')
    content_length = self.headers.getheader('content-length')
    if not content_length:
      content_length = 0
    try:
      content_length = int(content_length)
    except ValueError:
      raise Exception('Bad content-length for start request.')
    logging.info(content_length)
    test_server_argument_json = self.rfile.read(content_length)
    logging.info(test_server_argument_json)
    assert not self.server.test_server_instance
    ready_event = threading.Event()
    self.server.test_server_instance = TestServerThread(
        ready_event,
        json.loads(test_server_argument_json),
        self.server.adb,
        self.server.tool,
        self.server.build_type)
    self.server.test_server_instance.setDaemon(True)
    self.server.test_server_instance.start()
    ready_event.wait()
    if self.server.test_server_instance.is_ready:
      self._SendResponse(200, 'OK', {}, json.dumps(
          {'port': self.server.test_server_instance.forwarder_device_port,
           'message': 'started'}))
      logging.info('Test server is running on port: %d.',
                   self.server.test_server_instance.host_port)
    else:
      self.server.test_server_instance.Stop()
      self.server.test_server_instance = None
      self._SendResponse(500, 'Test Server Error.', {}, '')
      logging.info('Encounter problem during starting a test server.')

  def _KillTestServer(self):
    """Stops the test server instance."""
    # There should only ever be one test server at a time. This may do the
    # wrong thing if we try and start multiple test servers.
    # NOTE(review): no HTTP response is written when there is no instance to
    # kill -- confirm that the client side tolerates this.
    if not self.server.test_server_instance:
      return
    port = self.server.test_server_instance.host_port
    logging.info('Handling request to kill a test server on port: %d.', port)
    self.server.test_server_instance.Stop()
    # Make sure the status of test server is correct before sending response.
    if _CheckPortStatus(port, False):
      self._SendResponse(200, 'OK', {}, 'killed')
      logging.info('Test server on port %d is killed', port)
    else:
      self._SendResponse(500, 'Test Server Error.', {}, '')
      logging.info('Encounter problem during killing a test server.')
    self.server.test_server_instance = None

  def do_POST(self):
    """Handles POST requests; only the '/start' action is recognized."""
    parsed_path = urlparse.urlparse(self.path)
    action = parsed_path.path
    logging.info('Action for POST method is: %s.', action)
    if action == '/start':
      self._StartTestServer()
    else:
      self._SendResponse(400, 'Unknown request.', {}, '')
      logging.info('Encounter unknown request: %s.', action)

  def do_GET(self):
    """Handles GET requests for the '/kill' and '/ping' actions."""
    parsed_path = urlparse.urlparse(self.path)
    action = parsed_path.path
    params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1)
    logging.info('Action for GET method is: %s.', action)
    for param in params:
      logging.info('%s=%s', param, params[param][0])
    if action == '/kill':
      self._KillTestServer()
    elif action == '/ping':
      # The ping handler is used to check whether the spawner server is ready
      # to serve the requests. We don't need to test the status of the test
      # server when handling ping request.
      self._SendResponse(200, 'OK', {}, 'ready')
      logging.info('Handled ping request and sent response.')
    else:
      self._SendResponse(400, 'Unknown request', {}, '')
      logging.info('Encounter unknown request: %s.', action)


class SpawningServer(object):
  """The class used to start/stop a http server."""

  def __init__(self, test_server_spawner_port, adb, tool, build_type):
    """Creates the HTTP server that serves spawner requests.

    Args:
      test_server_spawner_port: the host port the spawner listens on.
      adb: stored on the server and passed to each TestServerThread.
      tool: stored on the server and passed to each TestServerThread.
      build_type: stored on the server and passed to each TestServerThread.
    """
    logging.info('Creating new spawner on port: %d.', test_server_spawner_port)
    self.server = BaseHTTPServer.HTTPServer(('', test_server_spawner_port),
                                            SpawningServerRequestHandler)
    self.port = test_server_spawner_port
    self.server.adb = adb
    self.server.tool = tool
    self.server.test_server_instance = None
    self.server.build_type = build_type

  def _Listen(self):
    """Serves requests until the server is shut down."""
    logging.info('Starting test server spawner')
    self.server.serve_forever()

  def Start(self):
    """Starts serving requests on a daemon thread."""
    listener_thread = threading.Thread(target=self._Listen)
    listener_thread.setDaemon(True)
    listener_thread.start()
    # Pause for a second after starting the listener thread.
    time.sleep(1)

  def Stop(self):
    """Stops any running test server, then shuts down the HTTP server."""
    if self.server.test_server_instance:
      self.server.test_server_instance.Stop()
    self.server.shutdown()