|
1 # This Source Code Form is subject to the terms of the Mozilla Public |
|
2 # License, v. 2.0. If a copy of the MPL was not distributed with this |
|
3 # file, You can obtain one at http://mozilla.org/MPL/2.0/. |
|
4 |
|
5 from ctypes import sizeof, windll, addressof, c_wchar, create_unicode_buffer |
|
6 from ctypes.wintypes import DWORD, HANDLE |
|
7 |
|
8 PROCESS_TERMINATE = 0x0001 |
|
9 PROCESS_QUERY_INFORMATION = 0x0400 |
|
10 PROCESS_VM_READ = 0x0010 |
|
11 |
|
12 def get_pids(process_name): |
|
13 BIG_ARRAY = DWORD * 4096 |
|
14 processes = BIG_ARRAY() |
|
15 needed = DWORD() |
|
16 |
|
17 pids = [] |
|
18 result = windll.psapi.EnumProcesses(processes, |
|
19 sizeof(processes), |
|
20 addressof(needed)) |
|
21 if not result: |
|
22 return pids |
|
23 |
|
24 num_results = needed.value / sizeof(DWORD) |
|
25 |
|
26 for i in range(num_results): |
|
27 pid = processes[i] |
|
28 process = windll.kernel32.OpenProcess(PROCESS_QUERY_INFORMATION | |
|
29 PROCESS_VM_READ, |
|
30 0, pid) |
|
31 if process: |
|
32 module = HANDLE() |
|
33 result = windll.psapi.EnumProcessModules(process, |
|
34 addressof(module), |
|
35 sizeof(module), |
|
36 addressof(needed)) |
|
37 if result: |
|
38 name = create_unicode_buffer(1024) |
|
39 result = windll.psapi.GetModuleBaseNameW(process, module, |
|
40 name, len(name)) |
|
41 # TODO: This might not be the best way to |
|
42 # match a process name; maybe use a regexp instead. |
|
43 if name.value.startswith(process_name): |
|
44 pids.append(pid) |
|
45 windll.kernel32.CloseHandle(module) |
|
46 windll.kernel32.CloseHandle(process) |
|
47 |
|
48 return pids |
|
49 |
|
50 def kill_pid(pid): |
|
51 process = windll.kernel32.OpenProcess(PROCESS_TERMINATE, 0, pid) |
|
52 if process: |
|
53 windll.kernel32.TerminateProcess(process, 0) |
|
54 windll.kernel32.CloseHandle(process) |
|
55 |
|
56 if __name__ == '__main__': |
|
57 import subprocess |
|
58 import time |
|
59 |
|
60 # This test just opens a new notepad instance and kills it. |
|
61 |
|
62 name = 'notepad' |
|
63 |
|
64 old_pids = set(get_pids(name)) |
|
65 subprocess.Popen([name]) |
|
66 time.sleep(0.25) |
|
67 new_pids = set(get_pids(name)).difference(old_pids) |
|
68 |
|
69 if len(new_pids) != 1: |
|
70 raise Exception('%s was not opened or get_pids() is ' |
|
71 'malfunctioning' % name) |
|
72 |
|
73 kill_pid(tuple(new_pids)[0]) |
|
74 |
|
75 newest_pids = set(get_pids(name)).difference(old_pids) |
|
76 |
|
77 if len(newest_pids) != 0: |
|
78 raise Exception('kill_pid() is malfunctioning') |
|
79 |
|
80 print "Test passed." |