|
1 #!/usr/bin/env python |
|
2 |
|
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. |
|
4 # Use of this source code is governed by a BSD-style license that can be |
|
5 # found in the LICENSE file. |
|
6 |
|
7 """Windows specific tests. These are implicitly run by test_psutil.py.""" |
|
8 |
|
9 import os |
|
10 import unittest |
|
11 import platform |
|
12 import signal |
|
13 import time |
|
14 import sys |
|
15 import subprocess |
|
16 import errno |
|
17 import traceback |
|
18 |
|
19 import psutil |
|
20 import _psutil_mswindows |
|
21 from psutil._compat import PY3, callable, long |
|
22 from test_psutil import * |
|
23 |
|
24 try: |
|
25 import wmi |
|
26 except ImportError: |
|
27 err = sys.exc_info()[1] |
|
28 register_warning("Couldn't run wmi tests: %s" % str(err)) |
|
29 wmi = None |
|
30 try: |
|
31 import win32api |
|
32 import win32con |
|
33 except ImportError: |
|
34 err = sys.exc_info()[1] |
|
35 register_warning("Couldn't run pywin32 tests: %s" % str(err)) |
|
36 win32api = None |
|
37 |
|
38 |
|
class WindowsSpecificTestCase(unittest.TestCase):
    """Windows-only tests comparing psutil against native tooling.

    Values reported by psutil are cross-checked against ``ipconfig``,
    WMI (only when the ``wmi`` module imported successfully) and pywin32
    (only when ``win32api`` imported successfully).  Every test gets a
    freshly spawned child process whose pid is stored in ``self.pid``.
    """

    def setUp(self):
        # Spawn a child process and wait until its pid shows up, so the
        # tests always have a live process to inspect.
        sproc = get_test_subprocess()
        wait_for_pid(sproc.pid)
        self.pid = sproc.pid

    def tearDown(self):
        reap_children()

    def test_issue_24(self):
        # Killing pid 0 must be refused with AccessDenied.
        p = psutil.Process(0)
        self.assertRaises(psutil.AccessDenied, p.kill)

    def test_special_pid(self):
        p = psutil.Process(4)
        self.assertEqual(p.name, 'System')
        # use __str__ to access all common Process properties to check
        # that nothing strange happens
        str(p)
        p.username
        self.assertTrue(p.create_time >= 0.0)
        try:
            rss, vms = p.get_memory_info()
        except psutil.AccessDenied:
            # expected on Windows Vista and Windows 7
            # NOTE(review): platform.uname()[1] is the network node name,
            # not the OS release; kept as-is, confirm this is intended.
            if platform.uname()[1] not in ('vista', 'win-7', 'win7'):
                raise
        else:
            self.assertTrue(rss > 0)

    def test_signal(self):
        # SIGINT is rejected with ValueError by send_signal().
        p = psutil.Process(self.pid)
        self.assertRaises(ValueError, p.send_signal, signal.SIGINT)

    def test_nic_names(self):
        # Every NIC name reported by psutil must appear verbatim in the
        # output of "ipconfig /all" (pseudo interfaces are ignored).
        p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE)
        out = p.communicate()[0]
        if PY3:
            out = str(out, sys.stdout.encoding)
        nics = psutil.net_io_counters(pernic=True).keys()
        for nic in nics:
            if "pseudo-interface" in nic.replace(' ', '-').lower():
                continue
            if nic not in out:
                self.fail("%r nic wasn't found in 'ipconfig /all' output" % nic)

    def test_exe(self):
        # For each process that can be queried, the executable's base
        # name must match the process name; psutil errors are skipped.
        for p in psutil.process_iter():
            try:
                self.assertEqual(os.path.basename(p.exe), p.name)
            except psutil.Error:
                pass

    if wmi is not None:

        # --- Process class tests

        def test_process_name(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            self.assertEqual(p.name, w.Caption)

        def test_process_exe(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            self.assertEqual(p.exe, w.ExecutablePath)

        def test_process_cmdline(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            # WMI returns a single string with quotes; psutil a list.
            self.assertEqual(' '.join(p.cmdline),
                             w.CommandLine.replace('"', ''))

        def test_process_username(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            domain, _, username = w.GetOwner()
            username = "%s\\%s" % (domain, username)
            self.assertEqual(p.username, username)

        def test_process_rss_memory(self):
            time.sleep(0.1)
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            rss = p.get_memory_info().rss
            self.assertEqual(rss, int(w.WorkingSetSize))

        def test_process_vms_memory(self):
            time.sleep(0.1)
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            vms = p.get_memory_info().vms
            # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
            # ...claims that PageFileUsage is represented in Kilo
            # bytes but funnily enough on certain platforms bytes are
            # returned instead.
            wmi_usage = int(w.PageFileUsage)
            if (vms != wmi_usage) and (vms != wmi_usage * 1024):
                self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))

        def test_process_create_time(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            # Compare at one-second resolution: drop WMI's fractional
            # part and format psutil's timestamp the same way.
            wmic_create = str(w.CreationDate.split('.')[0])
            psutil_create = time.strftime("%Y%m%d%H%M%S",
                                          time.localtime(p.create_time))
            self.assertEqual(wmic_create, psutil_create)

        # --- psutil namespace functions and constants tests

        # NUMBER_OF_PROCESSORS is an environment variable, not an
        # attribute of the os module, so it must be looked up in
        # os.environ (hasattr(os, ...) would always skip this test).
        @unittest.skipUnless('NUMBER_OF_PROCESSORS' in os.environ,
                             'NUMBER_OF_PROCESSORS env var is not available')
        def test_NUM_CPUS(self):
            num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
            self.assertEqual(num_cpus, psutil.NUM_CPUS)

        def test_TOTAL_PHYMEM(self):
            w = wmi.WMI().Win32_ComputerSystem()[0]
            self.assertEqual(int(w.TotalPhysicalMemory), psutil.TOTAL_PHYMEM)

        def test__UPTIME(self):
            # _UPTIME constant is not public but it is used internally
            # as value to return for pid 0 creation time.
            # WMI behaves the same.
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(0)
            wmic_create = str(w.CreationDate.split('.')[0])
            psutil_create = time.strftime("%Y%m%d%H%M%S",
                                          time.localtime(p.create_time))
            # XXX - ? no actual test here
            # NOTE(review): the WMI query uses self.pid while psutil
            # queries pid 0, and the two values are never compared.

        def test_get_pids(self):
            # Note: this test might fail if the OS is starting/killing
            # other processes in the meantime
            w = wmi.WMI().Win32_Process()
            wmi_pids = sorted([x.ProcessId for x in w])
            psutil_pids = sorted(psutil.get_pid_list())
            if wmi_pids != psutil_pids:
                # List comprehensions instead of filter(): on Python 3
                # filter() returns an iterator which cannot be added.
                difference = \
                    [x for x in psutil_pids if x not in wmi_pids] + \
                    [x for x in wmi_pids if x not in psutil_pids]
                self.fail("difference: " + str(difference))

        def test_disks(self):
            # Every partition known to psutil must be known to WMI and
            # report the same total and free space.
            ps_parts = psutil.disk_partitions(all=True)
            wmi_parts = wmi.WMI().Win32_LogicalDisk()
            for ps_part in ps_parts:
                for wmi_part in wmi_parts:
                    if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
                        if not ps_part.mountpoint:
                            # this is usually a CD-ROM with no disk inserted
                            break
                        try:
                            usage = psutil.disk_usage(ps_part.mountpoint)
                        except OSError:
                            err = sys.exc_info()[1]
                            if err.errno == errno.ENOENT:
                                # usually this is the floppy
                                break
                            else:
                                raise
                        self.assertEqual(usage.total, int(wmi_part.Size))
                        wmi_free = int(wmi_part.FreeSpace)
                        self.assertEqual(usage.free, wmi_free)
                        # 10 MB tolerance
                        # NOTE(review): the strict assertEqual above makes
                        # this tolerance check unreachable on mismatch;
                        # kept so the test is not silently loosened.
                        if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
                            self.fail("psutil=%s, wmi=%s"
                                      % (usage.free, wmi_free))
                        break
                else:
                    # inner loop finished without a matching WMI disk
                    self.fail("can't find partition %s" % repr(ps_part))

    if win32api is not None:

        def test_get_num_handles(self):
            # Opening a handle to ourselves must bump the handle count
            # by one; closing it must bring the count back.
            p = psutil.Process(os.getpid())
            before = p.get_num_handles()
            handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                          win32con.FALSE, os.getpid())
            try:
                after = p.get_num_handles()
                self.assertEqual(after, before + 1)
            finally:
                # always release the handle, even if the assertion fails
                win32api.CloseHandle(handle)
            self.assertEqual(p.get_num_handles(), before)

        def test_get_num_handles_2(self):
            # Note: this fails from time to time; I'm keen on thinking
            # it doesn't mean something is broken
            def call(proc, attr_name):
                # Fetch the attribute by the name passed in and invoke
                # it when it is callable; the result is irrelevant, only
                # the side effect on the handle count matters.
                value = getattr(proc, attr_name, None)
                if value is not None and callable(value):
                    value()

            p = psutil.Process(self.pid)
            failures = []
            skipped = ('terminate', 'kill', 'suspend', 'resume', 'nice',
                       'send_signal', 'wait', 'get_children', 'as_dict')
            for name in dir(psutil.Process):
                if name.startswith('_') \
                        or name.startswith('set_') \
                        or name in skipped:
                    continue
                try:
                    # First call warms things up; a second call must not
                    # leave additional handles open.
                    call(p, name)
                    num1 = p.get_num_handles()
                    call(p, name)
                    num2 = p.get_num_handles()
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
                else:
                    if num2 > num1:
                        fail = "failure while processing Process.%s method " \
                               "(before=%s, after=%s)" % (name, num1, num2)
                        failures.append(fail)
            if failures:
                self.fail('\n' + '\n'.join(failures))
|
258 |
|
259 |
|
260 import _psutil_mswindows |
|
261 from psutil._psmswindows import ACCESS_DENIED_SET |
|
262 |
|
def wrap_exceptions(callable):
    """Wrap a function so that OSError is translated into psutil errors.

    The returned wrapper forwards all arguments to the wrapped function
    and returns its result.  An OSError whose errno is in
    ACCESS_DENIED_SET becomes psutil.AccessDenied, errno.ESRCH becomes
    psutil.NoSuchProcess; any other OSError is re-raised unchanged.
    """
    def translating(self, *args, **kwargs):
        try:
            outcome = callable(self, *args, **kwargs)
        except OSError:
            code = sys.exc_info()[1].errno
            if code in ACCESS_DENIED_SET:
                raise psutil.AccessDenied(None, None)
            elif code == errno.ESRCH:
                raise psutil.NoSuchProcess(None, None)
            # not an errno we know how to translate: propagate as-is
            raise
        return outcome
    return translating
|
275 |
|
276 class TestDualProcessImplementation(unittest.TestCase): |
|
277 fun_names = [ |
|
278 # function name tolerance |
|
279 ('get_process_cpu_times', 0.2), |
|
280 ('get_process_create_time', 0.5), |
|
281 ('get_process_num_handles', 1), # 1 because impl #1 opens a handle |
|
282 ('get_process_io_counters', 0), |
|
283 ('get_process_memory_info', 1024), # KB |
|
284 ] |
|
285 |
|
286 def test_compare_values(self): |
|
287 # Certain APIs on Windows have 2 internal implementations, one |
|
288 # based on documented Windows APIs, another one based |
|
289 # NtQuerySystemInformation() which gets called as fallback in |
|
290 # case the first fails because of limited permission error. |
|
291 # Here we test that the two methods return the exact same value, |
|
292 # see: |
|
293 # http://code.google.com/p/psutil/issues/detail?id=304 |
|
294 def assert_ge_0(obj): |
|
295 if isinstance(obj, tuple): |
|
296 for value in obj: |
|
297 self.assertGreaterEqual(value, 0) |
|
298 elif isinstance(obj, (int, long, float)): |
|
299 self.assertGreaterEqual(obj, 0) |
|
300 else: |
|
301 assert 0 # case not handled which needs to be fixed |
|
302 |
|
303 def compare_with_tolerance(ret1, ret2, tolerance): |
|
304 if ret1 == ret2: |
|
305 return |
|
306 else: |
|
307 if isinstance(ret2, (int, long, float)): |
|
308 diff = abs(ret1 - ret2) |
|
309 self.assertLessEqual(diff, tolerance) |
|
310 elif isinstance(ret2, tuple): |
|
311 for a, b in zip(ret1, ret2): |
|
312 diff = abs(a - b) |
|
313 self.assertLessEqual(diff, tolerance) |
|
314 |
|
315 failures = [] |
|
316 for name, tolerance in self.fun_names: |
|
317 meth1 = wrap_exceptions(getattr(_psutil_mswindows, name)) |
|
318 meth2 = wrap_exceptions(getattr(_psutil_mswindows, name + '_2')) |
|
319 for p in psutil.process_iter(): |
|
320 if name == 'get_process_memory_info' and p.pid == os.getpid(): |
|
321 continue |
|
322 # |
|
323 try: |
|
324 ret1 = meth1(p.pid) |
|
325 except psutil.NoSuchProcess: |
|
326 continue |
|
327 except psutil.AccessDenied: |
|
328 ret1 = None |
|
329 # |
|
330 try: |
|
331 ret2 = meth2(p.pid) |
|
332 except psutil.NoSuchProcess: |
|
333 # this is supposed to fail only in case of zombie process |
|
334 # never for permission error |
|
335 continue |
|
336 |
|
337 # compare values |
|
338 try: |
|
339 if ret1 is None: |
|
340 assert_ge_0(ret2) |
|
341 else: |
|
342 compare_with_tolerance(ret1, ret2, tolerance) |
|
343 assert_ge_0(ret1) |
|
344 assert_ge_0(ret2) |
|
345 except AssertionError: |
|
346 err = sys.exc_info()[1] |
|
347 trace = traceback.format_exc() |
|
348 msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' \ |
|
349 % (trace, p.pid, name, ret1, ret2) |
|
350 failures.append(msg) |
|
351 break |
|
352 if failures: |
|
353 self.fail('\n\n'.join(failures)) |
|
354 |
|
355 def test_zombies(self): |
|
356 # test that NPS is raised by the 2nd implementation in case a |
|
357 # process no longer exists |
|
358 ZOMBIE_PID = max(psutil.get_pid_list()) + 5000 |
|
359 for name, _ in self.fun_names: |
|
360 meth = wrap_exceptions(getattr(_psutil_mswindows, name)) |
|
361 self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID) |
|
362 |
|
363 |
|
def test_main():
    """Run both Windows test cases verbosely.

    Returns True if every test passed, False otherwise.
    """
    # unittest.makeSuite() is deprecated and was removed in Python 3.13;
    # the default loader's loadTestsFromTestCase() is its equivalent.
    load_case = unittest.defaultTestLoader.loadTestsFromTestCase
    test_suite = unittest.TestSuite()
    for case in (WindowsSpecificTestCase, TestDualProcessImplementation):
        test_suite.addTest(load_case(case))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
|
370 |
|
if __name__ == '__main__':
    # Exit with a non-zero status only when the test run failed.
    all_passed = test_main()
    if not all_passed:
        sys.exit(1)