michael@0: #!/usr/bin/env python michael@0: michael@0: # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. michael@0: # Use of this source code is governed by a BSD-style license that can be michael@0: # found in the LICENSE file. michael@0: michael@0: """Windows specific tests. These are implicitly run by test_psutil.py.""" michael@0: michael@0: import os michael@0: import unittest michael@0: import platform michael@0: import signal michael@0: import time michael@0: import sys michael@0: import subprocess michael@0: import errno michael@0: import traceback michael@0: michael@0: import psutil michael@0: import _psutil_mswindows michael@0: from psutil._compat import PY3, callable, long michael@0: from test_psutil import * michael@0: michael@0: try: michael@0: import wmi michael@0: except ImportError: michael@0: err = sys.exc_info()[1] michael@0: register_warning("Couldn't run wmi tests: %s" % str(err)) michael@0: wmi = None michael@0: try: michael@0: import win32api michael@0: import win32con michael@0: except ImportError: michael@0: err = sys.exc_info()[1] michael@0: register_warning("Couldn't run pywin32 tests: %s" % str(err)) michael@0: win32api = None michael@0: michael@0: michael@0: class WindowsSpecificTestCase(unittest.TestCase): michael@0: michael@0: def setUp(self): michael@0: sproc = get_test_subprocess() michael@0: wait_for_pid(sproc.pid) michael@0: self.pid = sproc.pid michael@0: michael@0: def tearDown(self): michael@0: reap_children() michael@0: michael@0: def test_issue_24(self): michael@0: p = psutil.Process(0) michael@0: self.assertRaises(psutil.AccessDenied, p.kill) michael@0: michael@0: def test_special_pid(self): michael@0: p = psutil.Process(4) michael@0: self.assertEqual(p.name, 'System') michael@0: # use __str__ to access all common Process properties to check michael@0: # that nothing strange happens michael@0: str(p) michael@0: p.username michael@0: self.assertTrue(p.create_time >= 0.0) michael@0: try: michael@0: rss, vms = p.get_memory_info() michael@0: except psutil.AccessDenied: michael@0: # expected on Windows Vista and Windows 7 michael@0: if not platform.uname()[1] in ('vista', 'win-7', 'win7'): michael@0: raise michael@0: else: michael@0: self.assertTrue(rss > 0) michael@0: michael@0: def test_signal(self): michael@0: p = psutil.Process(self.pid) michael@0: self.assertRaises(ValueError, p.send_signal, signal.SIGINT) michael@0: michael@0: def test_nic_names(self): michael@0: p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE) michael@0: out = p.communicate()[0] michael@0: if PY3: michael@0: out = str(out, sys.stdout.encoding) michael@0: nics = psutil.net_io_counters(pernic=True).keys() michael@0: for nic in nics: michael@0: if "pseudo-interface" in nic.replace(' ', '-').lower(): michael@0: continue michael@0: if nic not in out: michael@0: self.fail("%r nic wasn't found in 'ipconfig /all' output" % nic) michael@0: michael@0: def test_exe(self): michael@0: for p in psutil.process_iter(): michael@0: try: michael@0: self.assertEqual(os.path.basename(p.exe), p.name) michael@0: except psutil.Error: michael@0: pass michael@0: michael@0: if wmi is not None: michael@0: michael@0: # --- Process class tests michael@0: michael@0: def test_process_name(self): michael@0: w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] michael@0: p = psutil.Process(self.pid) michael@0: self.assertEqual(p.name, w.Caption) michael@0: michael@0: def test_process_exe(self): michael@0: w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] michael@0: p = psutil.Process(self.pid) michael@0: self.assertEqual(p.exe, w.ExecutablePath) michael@0: michael@0: def test_process_cmdline(self): michael@0: w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] michael@0: p = psutil.Process(self.pid) michael@0: self.assertEqual(' '.join(p.cmdline), w.CommandLine.replace('"', '')) michael@0: michael@0: def test_process_username(self): michael@0: w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] michael@0: p = psutil.Process(self.pid) michael@0: domain, _, username = w.GetOwner() michael@0: username = "%s\\%s" %(domain, username) michael@0: self.assertEqual(p.username, username) michael@0: michael@0: def test_process_rss_memory(self): michael@0: time.sleep(0.1) michael@0: w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] michael@0: p = psutil.Process(self.pid) michael@0: rss = p.get_memory_info().rss michael@0: self.assertEqual(rss, int(w.WorkingSetSize)) michael@0: michael@0: def test_process_vms_memory(self): michael@0: time.sleep(0.1) michael@0: w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] michael@0: p = psutil.Process(self.pid) michael@0: vms = p.get_memory_info().vms michael@0: # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx michael@0: # ...claims that PageFileUsage is represented in Kilo michael@0: # bytes but funnily enough on certain platforms bytes are michael@0: # returned instead. michael@0: wmi_usage = int(w.PageFileUsage) michael@0: if (vms != wmi_usage) and (vms != wmi_usage * 1024): michael@0: self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms)) michael@0: michael@0: def test_process_create_time(self): michael@0: w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] michael@0: p = psutil.Process(self.pid) michael@0: wmic_create = str(w.CreationDate.split('.')[0]) michael@0: psutil_create = time.strftime("%Y%m%d%H%M%S", michael@0: time.localtime(p.create_time)) michael@0: self.assertEqual(wmic_create, psutil_create) michael@0: michael@0: michael@0: # --- psutil namespace functions and constants tests michael@0: michael@0: @unittest.skipUnless(hasattr(os, 'NUMBER_OF_PROCESSORS'), michael@0: 'NUMBER_OF_PROCESSORS env var is not available') michael@0: def test_NUM_CPUS(self): michael@0: num_cpus = int(os.environ['NUMBER_OF_PROCESSORS']) michael@0: self.assertEqual(num_cpus, psutil.NUM_CPUS) michael@0: michael@0: def test_TOTAL_PHYMEM(self): michael@0: w = wmi.WMI().Win32_ComputerSystem()[0] michael@0: self.assertEqual(int(w.TotalPhysicalMemory), psutil.TOTAL_PHYMEM) michael@0: michael@0: def test__UPTIME(self): michael@0: # _UPTIME constant is not public but it is used internally michael@0: # as value to return for pid 0 creation time. michael@0: # WMI behaves the same. michael@0: w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] michael@0: p = psutil.Process(0) michael@0: wmic_create = str(w.CreationDate.split('.')[0]) michael@0: psutil_create = time.strftime("%Y%m%d%H%M%S", michael@0: time.localtime(p.create_time)) michael@0: # XXX - ? no actual test here michael@0: michael@0: def test_get_pids(self): michael@0: # Note: this test might fail if the OS is starting/killing michael@0: # other processes in the meantime michael@0: w = wmi.WMI().Win32_Process() michael@0: wmi_pids = [x.ProcessId for x in w] michael@0: wmi_pids.sort() michael@0: psutil_pids = psutil.get_pid_list() michael@0: psutil_pids.sort() michael@0: if wmi_pids != psutil_pids: michael@0: difference = filter(lambda x:x not in wmi_pids, psutil_pids) + \ michael@0: filter(lambda x:x not in psutil_pids, wmi_pids) michael@0: self.fail("difference: " + str(difference)) michael@0: michael@0: def test_disks(self): michael@0: ps_parts = psutil.disk_partitions(all=True) michael@0: wmi_parts = wmi.WMI().Win32_LogicalDisk() michael@0: for ps_part in ps_parts: michael@0: for wmi_part in wmi_parts: michael@0: if ps_part.device.replace('\\', '') == wmi_part.DeviceID: michael@0: if not ps_part.mountpoint: michael@0: # this is usually a CD-ROM with no disk inserted michael@0: break michael@0: try: michael@0: usage = psutil.disk_usage(ps_part.mountpoint) michael@0: except OSError: michael@0: err = sys.exc_info()[1] michael@0: if err.errno == errno.ENOENT: michael@0: # usually this is the floppy michael@0: break michael@0: else: michael@0: raise michael@0: self.assertEqual(usage.total, int(wmi_part.Size)) michael@0: wmi_free = int(wmi_part.FreeSpace) michael@0: self.assertEqual(usage.free, wmi_free) michael@0: # 10 MB tollerance michael@0: if abs(usage.free - wmi_free) > 10 * 1024 * 1024: michael@0: self.fail("psutil=%s, wmi=%s" % usage.free, wmi_free) michael@0: break michael@0: else: michael@0: self.fail("can't find partition %s" % repr(ps_part)) michael@0: michael@0: if win32api is not None: michael@0: michael@0: def test_get_num_handles(self): michael@0: p = psutil.Process(os.getpid()) michael@0: before = p.get_num_handles() michael@0: handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, michael@0: win32con.FALSE, os.getpid()) michael@0: after = p.get_num_handles() michael@0: self.assertEqual(after, before+1) michael@0: win32api.CloseHandle(handle) michael@0: self.assertEqual(p.get_num_handles(), before) michael@0: michael@0: def test_get_num_handles_2(self): michael@0: # Note: this fails from time to time; I'm keen on thinking michael@0: # it doesn't mean something is broken michael@0: def call(p, attr): michael@0: attr = getattr(p, name, None) michael@0: if attr is not None and callable(attr): michael@0: ret = attr() michael@0: else: michael@0: ret = attr michael@0: michael@0: p = psutil.Process(self.pid) michael@0: attrs = [] michael@0: failures = [] michael@0: for name in dir(psutil.Process): michael@0: if name.startswith('_') \ michael@0: or name.startswith('set_') \ michael@0: or name in ('terminate', 'kill', 'suspend', 'resume', 'nice', michael@0: 'send_signal', 'wait', 'get_children', 'as_dict'): michael@0: continue michael@0: else: michael@0: try: michael@0: call(p, name) michael@0: num1 = p.get_num_handles() michael@0: call(p, name) michael@0: num2 = p.get_num_handles() michael@0: except (psutil.NoSuchProcess, psutil.AccessDenied): michael@0: pass michael@0: else: michael@0: if num2 > num1: michael@0: fail = "failure while processing Process.%s method " \ michael@0: "(before=%s, after=%s)" % (name, num1, num2) michael@0: failures.append(fail) michael@0: if failures: michael@0: self.fail('\n' + '\n'.join(failures)) michael@0: michael@0: michael@0: import _psutil_mswindows michael@0: from psutil._psmswindows import ACCESS_DENIED_SET michael@0: michael@0: def wrap_exceptions(callable): michael@0: def wrapper(self, *args, **kwargs): michael@0: try: michael@0: return callable(self, *args, **kwargs) michael@0: except OSError: michael@0: err = sys.exc_info()[1] michael@0: if err.errno in ACCESS_DENIED_SET: michael@0: raise psutil.AccessDenied(None, None) michael@0: if err.errno == errno.ESRCH: michael@0: raise psutil.NoSuchProcess(None, None) michael@0: raise michael@0: return wrapper michael@0: michael@0: class TestDualProcessImplementation(unittest.TestCase): michael@0: fun_names = [ michael@0: # function name tolerance michael@0: ('get_process_cpu_times', 0.2), michael@0: ('get_process_create_time', 0.5), michael@0: ('get_process_num_handles', 1), # 1 because impl #1 opens a handle michael@0: ('get_process_io_counters', 0), michael@0: ('get_process_memory_info', 1024), # KB michael@0: ] michael@0: michael@0: def test_compare_values(self): michael@0: # Certain APIs on Windows have 2 internal implementations, one michael@0: # based on documented Windows APIs, another one based michael@0: # NtQuerySystemInformation() which gets called as fallback in michael@0: # case the first fails because of limited permission error. michael@0: # Here we test that the two methods return the exact same value, michael@0: # see: michael@0: # http://code.google.com/p/psutil/issues/detail?id=304 michael@0: def assert_ge_0(obj): michael@0: if isinstance(obj, tuple): michael@0: for value in obj: michael@0: self.assertGreaterEqual(value, 0) michael@0: elif isinstance(obj, (int, long, float)): michael@0: self.assertGreaterEqual(obj, 0) michael@0: else: michael@0: assert 0 # case not handled which needs to be fixed michael@0: michael@0: def compare_with_tolerance(ret1, ret2, tolerance): michael@0: if ret1 == ret2: michael@0: return michael@0: else: michael@0: if isinstance(ret2, (int, long, float)): michael@0: diff = abs(ret1 - ret2) michael@0: self.assertLessEqual(diff, tolerance) michael@0: elif isinstance(ret2, tuple): michael@0: for a, b in zip(ret1, ret2): michael@0: diff = abs(a - b) michael@0: self.assertLessEqual(diff, tolerance) michael@0: michael@0: failures = [] michael@0: for name, tolerance in self.fun_names: michael@0: meth1 = wrap_exceptions(getattr(_psutil_mswindows, name)) michael@0: meth2 = wrap_exceptions(getattr(_psutil_mswindows, name + '_2')) michael@0: for p in psutil.process_iter(): michael@0: if name == 'get_process_memory_info' and p.pid == os.getpid(): michael@0: continue michael@0: # michael@0: try: michael@0: ret1 = meth1(p.pid) michael@0: except psutil.NoSuchProcess: michael@0: continue michael@0: except psutil.AccessDenied: michael@0: ret1 = None michael@0: # michael@0: try: michael@0: ret2 = meth2(p.pid) michael@0: except psutil.NoSuchProcess: michael@0: # this is supposed to fail only in case of zombie process michael@0: # never for permission error michael@0: continue michael@0: michael@0: # compare values michael@0: try: michael@0: if ret1 is None: michael@0: assert_ge_0(ret2) michael@0: else: michael@0: compare_with_tolerance(ret1, ret2, tolerance) michael@0: assert_ge_0(ret1) michael@0: assert_ge_0(ret2) michael@0: except AssertionError: michael@0: err = sys.exc_info()[1] michael@0: trace = traceback.format_exc() michael@0: msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' \ michael@0: % (trace, p.pid, name, ret1, ret2) michael@0: failures.append(msg) michael@0: break michael@0: if failures: michael@0: self.fail('\n\n'.join(failures)) michael@0: michael@0: def test_zombies(self): michael@0: # test that NPS is raised by the 2nd implementation in case a michael@0: # process no longer exists michael@0: ZOMBIE_PID = max(psutil.get_pid_list()) + 5000 michael@0: for name, _ in self.fun_names: michael@0: meth = wrap_exceptions(getattr(_psutil_mswindows, name)) michael@0: self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID) michael@0: michael@0: michael@0: def test_main(): michael@0: test_suite = unittest.TestSuite() michael@0: test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase)) michael@0: test_suite.addTest(unittest.makeSuite(TestDualProcessImplementation)) michael@0: result = unittest.TextTestRunner(verbosity=2).run(test_suite) michael@0: return result.wasSuccessful() michael@0: michael@0: if __name__ == '__main__': michael@0: if not test_main(): michael@0: sys.exit(1)