1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/python/psutil/test/_windows.py Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,373 @@ 1.4 +#!/usr/bin/env python 1.5 + 1.6 +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. 1.7 +# Use of this source code is governed by a BSD-style license that can be 1.8 +# found in the LICENSE file. 1.9 + 1.10 +"""Windows specific tests. These are implicitly run by test_psutil.py.""" 1.11 + 1.12 +import os 1.13 +import unittest 1.14 +import platform 1.15 +import signal 1.16 +import time 1.17 +import sys 1.18 +import subprocess 1.19 +import errno 1.20 +import traceback 1.21 + 1.22 +import psutil 1.23 +import _psutil_mswindows 1.24 +from psutil._compat import PY3, callable, long 1.25 +from test_psutil import * 1.26 + 1.27 +try: 1.28 + import wmi 1.29 +except ImportError: 1.30 + err = sys.exc_info()[1] 1.31 + register_warning("Couldn't run wmi tests: %s" % str(err)) 1.32 + wmi = None 1.33 +try: 1.34 + import win32api 1.35 + import win32con 1.36 +except ImportError: 1.37 + err = sys.exc_info()[1] 1.38 + register_warning("Couldn't run pywin32 tests: %s" % str(err)) 1.39 + win32api = None 1.40 + 1.41 + 1.42 +class WindowsSpecificTestCase(unittest.TestCase): 1.43 + 1.44 + def setUp(self): 1.45 + sproc = get_test_subprocess() 1.46 + wait_for_pid(sproc.pid) 1.47 + self.pid = sproc.pid 1.48 + 1.49 + def tearDown(self): 1.50 + reap_children() 1.51 + 1.52 + def test_issue_24(self): 1.53 + p = psutil.Process(0) 1.54 + self.assertRaises(psutil.AccessDenied, p.kill) 1.55 + 1.56 + def test_special_pid(self): 1.57 + p = psutil.Process(4) 1.58 + self.assertEqual(p.name, 'System') 1.59 + # use __str__ to access all common Process properties to check 1.60 + # that nothing strange happens 1.61 + str(p) 1.62 + p.username 1.63 + self.assertTrue(p.create_time >= 0.0) 1.64 + try: 1.65 + rss, vms = p.get_memory_info() 1.66 + except psutil.AccessDenied: 1.67 + # expected on Windows Vista and Windows 7 1.68 + if not platform.uname()[1] in ('vista', 'win-7', 'win7'): 1.69 + raise 1.70 + else: 1.71 + self.assertTrue(rss > 0) 1.72 + 1.73 + def test_signal(self): 1.74 + p = psutil.Process(self.pid) 1.75 + self.assertRaises(ValueError, p.send_signal, signal.SIGINT) 1.76 + 1.77 + def test_nic_names(self): 1.78 + p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE) 1.79 + out = p.communicate()[0] 1.80 + if PY3: 1.81 + out = str(out, sys.stdout.encoding) 1.82 + nics = psutil.net_io_counters(pernic=True).keys() 1.83 + for nic in nics: 1.84 + if "pseudo-interface" in nic.replace(' ', '-').lower(): 1.85 + continue 1.86 + if nic not in out: 1.87 + self.fail("%r nic wasn't found in 'ipconfig /all' output" % nic) 1.88 + 1.89 + def test_exe(self): 1.90 + for p in psutil.process_iter(): 1.91 + try: 1.92 + self.assertEqual(os.path.basename(p.exe), p.name) 1.93 + except psutil.Error: 1.94 + pass 1.95 + 1.96 + if wmi is not None: 1.97 + 1.98 + # --- Process class tests 1.99 + 1.100 + def test_process_name(self): 1.101 + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] 1.102 + p = psutil.Process(self.pid) 1.103 + self.assertEqual(p.name, w.Caption) 1.104 + 1.105 + def test_process_exe(self): 1.106 + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] 1.107 + p = psutil.Process(self.pid) 1.108 + self.assertEqual(p.exe, w.ExecutablePath) 1.109 + 1.110 + def test_process_cmdline(self): 1.111 + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] 1.112 + p = psutil.Process(self.pid) 1.113 + self.assertEqual(' '.join(p.cmdline), w.CommandLine.replace('"', '')) 1.114 + 1.115 + def test_process_username(self): 1.116 + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] 1.117 + p = psutil.Process(self.pid) 1.118 + domain, _, username = w.GetOwner() 1.119 + username = "%s\\%s" %(domain, username) 1.120 + self.assertEqual(p.username, username) 1.121 + 1.122 + def test_process_rss_memory(self): 1.123 + time.sleep(0.1) 1.124 + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] 1.125 + p = psutil.Process(self.pid) 1.126 + rss = p.get_memory_info().rss 1.127 + self.assertEqual(rss, int(w.WorkingSetSize)) 1.128 + 1.129 + def test_process_vms_memory(self): 1.130 + time.sleep(0.1) 1.131 + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] 1.132 + p = psutil.Process(self.pid) 1.133 + vms = p.get_memory_info().vms 1.134 + # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx 1.135 + # ...claims that PageFileUsage is represented in Kilo 1.136 + # bytes but funnily enough on certain platforms bytes are 1.137 + # returned instead. 1.138 + wmi_usage = int(w.PageFileUsage) 1.139 + if (vms != wmi_usage) and (vms != wmi_usage * 1024): 1.140 + self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms)) 1.141 + 1.142 + def test_process_create_time(self): 1.143 + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] 1.144 + p = psutil.Process(self.pid) 1.145 + wmic_create = str(w.CreationDate.split('.')[0]) 1.146 + psutil_create = time.strftime("%Y%m%d%H%M%S", 1.147 + time.localtime(p.create_time)) 1.148 + self.assertEqual(wmic_create, psutil_create) 1.149 + 1.150 + 1.151 + # --- psutil namespace functions and constants tests 1.152 + 1.153 + @unittest.skipUnless(hasattr(os, 'NUMBER_OF_PROCESSORS'), 1.154 + 'NUMBER_OF_PROCESSORS env var is not available') 1.155 + def test_NUM_CPUS(self): 1.156 + num_cpus = int(os.environ['NUMBER_OF_PROCESSORS']) 1.157 + self.assertEqual(num_cpus, psutil.NUM_CPUS) 1.158 + 1.159 + def test_TOTAL_PHYMEM(self): 1.160 + w = wmi.WMI().Win32_ComputerSystem()[0] 1.161 + self.assertEqual(int(w.TotalPhysicalMemory), psutil.TOTAL_PHYMEM) 1.162 + 1.163 + def test__UPTIME(self): 1.164 + # _UPTIME constant is not public but it is used internally 1.165 + # as value to return for pid 0 creation time. 1.166 + # WMI behaves the same. 1.167 + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] 1.168 + p = psutil.Process(0) 1.169 + wmic_create = str(w.CreationDate.split('.')[0]) 1.170 + psutil_create = time.strftime("%Y%m%d%H%M%S", 1.171 + time.localtime(p.create_time)) 1.172 + # XXX - ? no actual test here 1.173 + 1.174 + def test_get_pids(self): 1.175 + # Note: this test might fail if the OS is starting/killing 1.176 + # other processes in the meantime 1.177 + w = wmi.WMI().Win32_Process() 1.178 + wmi_pids = [x.ProcessId for x in w] 1.179 + wmi_pids.sort() 1.180 + psutil_pids = psutil.get_pid_list() 1.181 + psutil_pids.sort() 1.182 + if wmi_pids != psutil_pids: 1.183 + difference = filter(lambda x:x not in wmi_pids, psutil_pids) + \ 1.184 + filter(lambda x:x not in psutil_pids, wmi_pids) 1.185 + self.fail("difference: " + str(difference)) 1.186 + 1.187 + def test_disks(self): 1.188 + ps_parts = psutil.disk_partitions(all=True) 1.189 + wmi_parts = wmi.WMI().Win32_LogicalDisk() 1.190 + for ps_part in ps_parts: 1.191 + for wmi_part in wmi_parts: 1.192 + if ps_part.device.replace('\\', '') == wmi_part.DeviceID: 1.193 + if not ps_part.mountpoint: 1.194 + # this is usually a CD-ROM with no disk inserted 1.195 + break 1.196 + try: 1.197 + usage = psutil.disk_usage(ps_part.mountpoint) 1.198 + except OSError: 1.199 + err = sys.exc_info()[1] 1.200 + if err.errno == errno.ENOENT: 1.201 + # usually this is the floppy 1.202 + break 1.203 + else: 1.204 + raise 1.205 + self.assertEqual(usage.total, int(wmi_part.Size)) 1.206 + wmi_free = int(wmi_part.FreeSpace) 1.207 + self.assertEqual(usage.free, wmi_free) 1.208 + # 10 MB tollerance 1.209 + if abs(usage.free - wmi_free) > 10 * 1024 * 1024: 1.210 + self.fail("psutil=%s, wmi=%s" % usage.free, wmi_free) 1.211 + break 1.212 + else: 1.213 + self.fail("can't find partition %s" % repr(ps_part)) 1.214 + 1.215 + if win32api is not None: 1.216 + 1.217 + def test_get_num_handles(self): 1.218 + p = psutil.Process(os.getpid()) 1.219 + before = p.get_num_handles() 1.220 + handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, 1.221 + win32con.FALSE, os.getpid()) 1.222 + after = p.get_num_handles() 1.223 + self.assertEqual(after, before+1) 1.224 + win32api.CloseHandle(handle) 1.225 + self.assertEqual(p.get_num_handles(), before) 1.226 + 1.227 + def test_get_num_handles_2(self): 1.228 + # Note: this fails from time to time; I'm keen on thinking 1.229 + # it doesn't mean something is broken 1.230 + def call(p, attr): 1.231 + attr = getattr(p, name, None) 1.232 + if attr is not None and callable(attr): 1.233 + ret = attr() 1.234 + else: 1.235 + ret = attr 1.236 + 1.237 + p = psutil.Process(self.pid) 1.238 + attrs = [] 1.239 + failures = [] 1.240 + for name in dir(psutil.Process): 1.241 + if name.startswith('_') \ 1.242 + or name.startswith('set_') \ 1.243 + or name in ('terminate', 'kill', 'suspend', 'resume', 'nice', 1.244 + 'send_signal', 'wait', 'get_children', 'as_dict'): 1.245 + continue 1.246 + else: 1.247 + try: 1.248 + call(p, name) 1.249 + num1 = p.get_num_handles() 1.250 + call(p, name) 1.251 + num2 = p.get_num_handles() 1.252 + except (psutil.NoSuchProcess, psutil.AccessDenied): 1.253 + pass 1.254 + else: 1.255 + if num2 > num1: 1.256 + fail = "failure while processing Process.%s method " \ 1.257 + "(before=%s, after=%s)" % (name, num1, num2) 1.258 + failures.append(fail) 1.259 + if failures: 1.260 + self.fail('\n' + '\n'.join(failures)) 1.261 + 1.262 + 1.263 +import _psutil_mswindows 1.264 +from psutil._psmswindows import ACCESS_DENIED_SET 1.265 + 1.266 +def wrap_exceptions(callable): 1.267 + def wrapper(self, *args, **kwargs): 1.268 + try: 1.269 + return callable(self, *args, **kwargs) 1.270 + except OSError: 1.271 + err = sys.exc_info()[1] 1.272 + if err.errno in ACCESS_DENIED_SET: 1.273 + raise psutil.AccessDenied(None, None) 1.274 + if err.errno == errno.ESRCH: 1.275 + raise psutil.NoSuchProcess(None, None) 1.276 + raise 1.277 + return wrapper 1.278 + 1.279 +class TestDualProcessImplementation(unittest.TestCase): 1.280 + fun_names = [ 1.281 + # function name tolerance 1.282 + ('get_process_cpu_times', 0.2), 1.283 + ('get_process_create_time', 0.5), 1.284 + ('get_process_num_handles', 1), # 1 because impl #1 opens a handle 1.285 + ('get_process_io_counters', 0), 1.286 + ('get_process_memory_info', 1024), # KB 1.287 + ] 1.288 + 1.289 + def test_compare_values(self): 1.290 + # Certain APIs on Windows have 2 internal implementations, one 1.291 + # based on documented Windows APIs, another one based 1.292 + # NtQuerySystemInformation() which gets called as fallback in 1.293 + # case the first fails because of limited permission error. 1.294 + # Here we test that the two methods return the exact same value, 1.295 + # see: 1.296 + # http://code.google.com/p/psutil/issues/detail?id=304 1.297 + def assert_ge_0(obj): 1.298 + if isinstance(obj, tuple): 1.299 + for value in obj: 1.300 + self.assertGreaterEqual(value, 0) 1.301 + elif isinstance(obj, (int, long, float)): 1.302 + self.assertGreaterEqual(obj, 0) 1.303 + else: 1.304 + assert 0 # case not handled which needs to be fixed 1.305 + 1.306 + def compare_with_tolerance(ret1, ret2, tolerance): 1.307 + if ret1 == ret2: 1.308 + return 1.309 + else: 1.310 + if isinstance(ret2, (int, long, float)): 1.311 + diff = abs(ret1 - ret2) 1.312 + self.assertLessEqual(diff, tolerance) 1.313 + elif isinstance(ret2, tuple): 1.314 + for a, b in zip(ret1, ret2): 1.315 + diff = abs(a - b) 1.316 + self.assertLessEqual(diff, tolerance) 1.317 + 1.318 + failures = [] 1.319 + for name, tolerance in self.fun_names: 1.320 + meth1 = wrap_exceptions(getattr(_psutil_mswindows, name)) 1.321 + meth2 = wrap_exceptions(getattr(_psutil_mswindows, name + '_2')) 1.322 + for p in psutil.process_iter(): 1.323 + if name == 'get_process_memory_info' and p.pid == os.getpid(): 1.324 + continue 1.325 + # 1.326 + try: 1.327 + ret1 = meth1(p.pid) 1.328 + except psutil.NoSuchProcess: 1.329 + continue 1.330 + except psutil.AccessDenied: 1.331 + ret1 = None 1.332 + # 1.333 + try: 1.334 + ret2 = meth2(p.pid) 1.335 + except psutil.NoSuchProcess: 1.336 + # this is supposed to fail only in case of zombie process 1.337 + # never for permission error 1.338 + continue 1.339 + 1.340 + # compare values 1.341 + try: 1.342 + if ret1 is None: 1.343 + assert_ge_0(ret2) 1.344 + else: 1.345 + compare_with_tolerance(ret1, ret2, tolerance) 1.346 + assert_ge_0(ret1) 1.347 + assert_ge_0(ret2) 1.348 + except AssertionError: 1.349 + err = sys.exc_info()[1] 1.350 + trace = traceback.format_exc() 1.351 + msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' \ 1.352 + % (trace, p.pid, name, ret1, ret2) 1.353 + failures.append(msg) 1.354 + break 1.355 + if failures: 1.356 + self.fail('\n\n'.join(failures)) 1.357 + 1.358 + def test_zombies(self): 1.359 + # test that NPS is raised by the 2nd implementation in case a 1.360 + # process no longer exists 1.361 + ZOMBIE_PID = max(psutil.get_pid_list()) + 5000 1.362 + for name, _ in self.fun_names: 1.363 + meth = wrap_exceptions(getattr(_psutil_mswindows, name)) 1.364 + self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID) 1.365 + 1.366 + 1.367 +def test_main(): 1.368 + test_suite = unittest.TestSuite() 1.369 + test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase)) 1.370 + test_suite.addTest(unittest.makeSuite(TestDualProcessImplementation)) 1.371 + result = unittest.TextTestRunner(verbosity=2).run(test_suite) 1.372 + return result.wasSuccessful() 1.373 + 1.374 +if __name__ == '__main__': 1.375 + if not test_main(): 1.376 + sys.exit(1)