# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""Windows-only helpers for finding processes by name and terminating them.

Everything here talks to psapi.dll / kernel32.dll through ctypes, so the
module can only be imported on Windows.
"""

from ctypes import (sizeof, windll, addressof, byref, c_wchar,
                    create_unicode_buffer)
from ctypes.wintypes import DWORD, HANDLE

# Process access rights passed to OpenProcess().
PROCESS_TERMINATE = 0x0001
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_VM_READ = 0x0010


def get_pids(process_name):
    """Return the PIDs of running processes whose name starts with a prefix.

    The name compared is the base name of the first module reported by
    EnumProcessModules() for each process. Processes that cannot be opened
    with query/read access, or whose modules cannot be enumerated, are
    silently skipped.

    :param process_name: prefix matched (case-sensitively) against the
                         module base name of each process.
    :returns: a list of integer process IDs; empty if EnumProcesses() fails.
    """
    # Fixed-size output buffer: at most 4096 process IDs are examined.
    BIG_ARRAY = DWORD * 4096
    processes = BIG_ARRAY()
    needed = DWORD()

    pids = []
    # byref() passes a real pointer; a bare addressof() integer would be
    # marshalled as a C int and could be truncated on 64-bit Windows.
    result = windll.psapi.EnumProcesses(processes,
                                        sizeof(processes),
                                        byref(needed))
    if not result:
        return pids

    # `needed` holds the number of bytes written; floor division keeps the
    # count an integer on Python 3 as well as Python 2.
    num_results = needed.value // sizeof(DWORD)

    for i in range(num_results):
        pid = processes[i]
        process = windll.kernel32.OpenProcess(PROCESS_QUERY_INFORMATION |
                                              PROCESS_VM_READ,
                                              0, pid)
        if not process:
            continue

        try:
            # Room for exactly one module handle, so only the first module
            # is returned.
            module = HANDLE()
            result = windll.psapi.EnumProcessModules(process,
                                                     byref(module),
                                                     sizeof(module),
                                                     byref(needed))
            if result:
                name = create_unicode_buffer(1024)
                windll.psapi.GetModuleBaseNameW(process, module,
                                                name, len(name))
                # TODO: This might not be the best way to
                # match a process name; maybe use a regexp instead.
                if name.value.startswith(process_name):
                    pids.append(pid)
                # NOTE: the module handle is deliberately not passed to
                # CloseHandle(); handles returned by EnumProcessModules()
                # are not kernel object handles and must not be closed.
        finally:
            # Always release the process handle, even if something above
            # raised.
            windll.kernel32.CloseHandle(process)

    return pids


def kill_pid(pid):
    """Terminate the process with the given PID, using exit code 0.

    Does nothing if the process cannot be opened with PROCESS_TERMINATE
    access. The result of TerminateProcess() is not reported.

    :param pid: integer process ID to terminate.
    """
    process = windll.kernel32.OpenProcess(PROCESS_TERMINATE, 0, pid)
    if process:
        try:
            windll.kernel32.TerminateProcess(process, 0)
        finally:
            windll.kernel32.CloseHandle(process)


if __name__ == '__main__':
    import subprocess
    import time

    # This test just opens a new notepad instance and kills it.

    name = 'notepad'

    old_pids = set(get_pids(name))
    subprocess.Popen([name])
    # Give the new process a moment to appear in the process list.
    time.sleep(0.25)
    new_pids = set(get_pids(name)).difference(old_pids)

    if len(new_pids) != 1:
        raise Exception('%s was not opened or get_pids() is '
                        'malfunctioning' % name)

    kill_pid(tuple(new_pids)[0])

    newest_pids = set(get_pids(name)).difference(old_pids)

    if len(newest_pids) != 0:
        raise Exception('kill_pid() is malfunctioning')

    print("Test passed.")