diff -r 000000000000 -r 6474c204b198 python/psutil/test/_windows.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/python/psutil/test/_windows.py Wed Dec 31 06:09:35 2014 +0100 @@ -0,0 +1,373 @@ +#!/usr/bin/env python + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Windows specific tests. These are implicitly run by test_psutil.py.""" + +import os +import unittest +import platform +import signal +import time +import sys +import subprocess +import errno +import traceback + +import psutil +import _psutil_mswindows +from psutil._compat import PY3, callable, long +from test_psutil import * + +try: + import wmi +except ImportError: + err = sys.exc_info()[1] + register_warning("Couldn't run wmi tests: %s" % str(err)) + wmi = None +try: + import win32api + import win32con +except ImportError: + err = sys.exc_info()[1] + register_warning("Couldn't run pywin32 tests: %s" % str(err)) + win32api = None + + +class WindowsSpecificTestCase(unittest.TestCase): + + def setUp(self): + sproc = get_test_subprocess() + wait_for_pid(sproc.pid) + self.pid = sproc.pid + + def tearDown(self): + reap_children() + + def test_issue_24(self): + p = psutil.Process(0) + self.assertRaises(psutil.AccessDenied, p.kill) + + def test_special_pid(self): + p = psutil.Process(4) + self.assertEqual(p.name, 'System') + # use __str__ to access all common Process properties to check + # that nothing strange happens + str(p) + p.username + self.assertTrue(p.create_time >= 0.0) + try: + rss, vms = p.get_memory_info() + except psutil.AccessDenied: + # expected on Windows Vista and Windows 7 + if not platform.uname()[1] in ('vista', 'win-7', 'win7'): + raise + else: + self.assertTrue(rss > 0) + + def test_signal(self): + p = psutil.Process(self.pid) + self.assertRaises(ValueError, p.send_signal, signal.SIGINT) + + def test_nic_names(self): + p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE) + out = p.communicate()[0] + if PY3: + out = str(out, sys.stdout.encoding) + nics = psutil.net_io_counters(pernic=True).keys() + for nic in nics: + if "pseudo-interface" in nic.replace(' ', '-').lower(): + continue + if nic not in out: + self.fail("%r nic wasn't found in 'ipconfig /all' output" % nic) + + def test_exe(self): + for p in psutil.process_iter(): + try: + self.assertEqual(os.path.basename(p.exe), p.name) + except psutil.Error: + pass + + if wmi is not None: + + # --- Process class tests + + def test_process_name(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + self.assertEqual(p.name, w.Caption) + + def test_process_exe(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + self.assertEqual(p.exe, w.ExecutablePath) + + def test_process_cmdline(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + self.assertEqual(' '.join(p.cmdline), w.CommandLine.replace('"', '')) + + def test_process_username(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + domain, _, username = w.GetOwner() + username = "%s\\%s" %(domain, username) + self.assertEqual(p.username, username) + + def test_process_rss_memory(self): + time.sleep(0.1) + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + rss = p.get_memory_info().rss + self.assertEqual(rss, int(w.WorkingSetSize)) + + def test_process_vms_memory(self): + time.sleep(0.1) + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + vms = p.get_memory_info().vms + # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx + # ...claims that PageFileUsage is represented in Kilo + # bytes but funnily enough on certain platforms bytes are + # returned instead. + wmi_usage = int(w.PageFileUsage) + if (vms != wmi_usage) and (vms != wmi_usage * 1024): + self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms)) + + def test_process_create_time(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + wmic_create = str(w.CreationDate.split('.')[0]) + psutil_create = time.strftime("%Y%m%d%H%M%S", + time.localtime(p.create_time)) + self.assertEqual(wmic_create, psutil_create) + + + # --- psutil namespace functions and constants tests + + @unittest.skipUnless(hasattr(os, 'NUMBER_OF_PROCESSORS'), + 'NUMBER_OF_PROCESSORS env var is not available') + def test_NUM_CPUS(self): + num_cpus = int(os.environ['NUMBER_OF_PROCESSORS']) + self.assertEqual(num_cpus, psutil.NUM_CPUS) + + def test_TOTAL_PHYMEM(self): + w = wmi.WMI().Win32_ComputerSystem()[0] + self.assertEqual(int(w.TotalPhysicalMemory), psutil.TOTAL_PHYMEM) + + def test__UPTIME(self): + # _UPTIME constant is not public but it is used internally + # as value to return for pid 0 creation time. + # WMI behaves the same. + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(0) + wmic_create = str(w.CreationDate.split('.')[0]) + psutil_create = time.strftime("%Y%m%d%H%M%S", + time.localtime(p.create_time)) + # XXX - ? no actual test here + + def test_get_pids(self): + # Note: this test might fail if the OS is starting/killing + # other processes in the meantime + w = wmi.WMI().Win32_Process() + wmi_pids = [x.ProcessId for x in w] + wmi_pids.sort() + psutil_pids = psutil.get_pid_list() + psutil_pids.sort() + if wmi_pids != psutil_pids: + difference = filter(lambda x:x not in wmi_pids, psutil_pids) + \ + filter(lambda x:x not in psutil_pids, wmi_pids) + self.fail("difference: " + str(difference)) + + def test_disks(self): + ps_parts = psutil.disk_partitions(all=True) + wmi_parts = wmi.WMI().Win32_LogicalDisk() + for ps_part in ps_parts: + for wmi_part in wmi_parts: + if ps_part.device.replace('\\', '') == wmi_part.DeviceID: + if not ps_part.mountpoint: + # this is usually a CD-ROM with no disk inserted + break + try: + usage = psutil.disk_usage(ps_part.mountpoint) + except OSError: + err = sys.exc_info()[1] + if err.errno == errno.ENOENT: + # usually this is the floppy + break + else: + raise + self.assertEqual(usage.total, int(wmi_part.Size)) + wmi_free = int(wmi_part.FreeSpace) + self.assertEqual(usage.free, wmi_free) + # 10 MB tollerance + if abs(usage.free - wmi_free) > 10 * 1024 * 1024: + self.fail("psutil=%s, wmi=%s" % usage.free, wmi_free) + break + else: + self.fail("can't find partition %s" % repr(ps_part)) + + if win32api is not None: + + def test_get_num_handles(self): + p = psutil.Process(os.getpid()) + before = p.get_num_handles() + handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, + win32con.FALSE, os.getpid()) + after = p.get_num_handles() + self.assertEqual(after, before+1) + win32api.CloseHandle(handle) + self.assertEqual(p.get_num_handles(), before) + + def test_get_num_handles_2(self): + # Note: this fails from time to time; I'm keen on thinking + # it doesn't mean something is broken + def call(p, attr): + attr = getattr(p, name, None) + if attr is not None and callable(attr): + ret = attr() + else: + ret = attr + + p = psutil.Process(self.pid) + attrs = [] + failures = [] + for name in dir(psutil.Process): + if name.startswith('_') \ + or name.startswith('set_') \ + or name in ('terminate', 'kill', 'suspend', 'resume', 'nice', + 'send_signal', 'wait', 'get_children', 'as_dict'): + continue + else: + try: + call(p, name) + num1 = p.get_num_handles() + call(p, name) + num2 = p.get_num_handles() + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + else: + if num2 > num1: + fail = "failure while processing Process.%s method " \ + "(before=%s, after=%s)" % (name, num1, num2) + failures.append(fail) + if failures: + self.fail('\n' + '\n'.join(failures)) + + +import _psutil_mswindows +from psutil._psmswindows import ACCESS_DENIED_SET + +def wrap_exceptions(callable): + def wrapper(self, *args, **kwargs): + try: + return callable(self, *args, **kwargs) + except OSError: + err = sys.exc_info()[1] + if err.errno in ACCESS_DENIED_SET: + raise psutil.AccessDenied(None, None) + if err.errno == errno.ESRCH: + raise psutil.NoSuchProcess(None, None) + raise + return wrapper + +class TestDualProcessImplementation(unittest.TestCase): + fun_names = [ + # function name tolerance + ('get_process_cpu_times', 0.2), + ('get_process_create_time', 0.5), + ('get_process_num_handles', 1), # 1 because impl #1 opens a handle + ('get_process_io_counters', 0), + ('get_process_memory_info', 1024), # KB + ] + + def test_compare_values(self): + # Certain APIs on Windows have 2 internal implementations, one + # based on documented Windows APIs, another one based + # NtQuerySystemInformation() which gets called as fallback in + # case the first fails because of limited permission error. + # Here we test that the two methods return the exact same value, + # see: + # http://code.google.com/p/psutil/issues/detail?id=304 + def assert_ge_0(obj): + if isinstance(obj, tuple): + for value in obj: + self.assertGreaterEqual(value, 0) + elif isinstance(obj, (int, long, float)): + self.assertGreaterEqual(obj, 0) + else: + assert 0 # case not handled which needs to be fixed + + def compare_with_tolerance(ret1, ret2, tolerance): + if ret1 == ret2: + return + else: + if isinstance(ret2, (int, long, float)): + diff = abs(ret1 - ret2) + self.assertLessEqual(diff, tolerance) + elif isinstance(ret2, tuple): + for a, b in zip(ret1, ret2): + diff = abs(a - b) + self.assertLessEqual(diff, tolerance) + + failures = [] + for name, tolerance in self.fun_names: + meth1 = wrap_exceptions(getattr(_psutil_mswindows, name)) + meth2 = wrap_exceptions(getattr(_psutil_mswindows, name + '_2')) + for p in psutil.process_iter(): + if name == 'get_process_memory_info' and p.pid == os.getpid(): + continue + # + try: + ret1 = meth1(p.pid) + except psutil.NoSuchProcess: + continue + except psutil.AccessDenied: + ret1 = None + # + try: + ret2 = meth2(p.pid) + except psutil.NoSuchProcess: + # this is supposed to fail only in case of zombie process + # never for permission error + continue + + # compare values + try: + if ret1 is None: + assert_ge_0(ret2) + else: + compare_with_tolerance(ret1, ret2, tolerance) + assert_ge_0(ret1) + assert_ge_0(ret2) + except AssertionError: + err = sys.exc_info()[1] + trace = traceback.format_exc() + msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' \ + % (trace, p.pid, name, ret1, ret2) + failures.append(msg) + break + if failures: + self.fail('\n\n'.join(failures)) + + def test_zombies(self): + # test that NPS is raised by the 2nd implementation in case a + # process no longer exists + ZOMBIE_PID = max(psutil.get_pid_list()) + 5000 + for name, _ in self.fun_names: + meth = wrap_exceptions(getattr(_psutil_mswindows, name)) + self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID) + + +def test_main(): + test_suite = unittest.TestSuite() + test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase)) + test_suite.addTest(unittest.makeSuite(TestDualProcessImplementation)) + result = unittest.TextTestRunner(verbosity=2).run(test_suite) + return result.wasSuccessful() + +if __name__ == '__main__': + if not test_main(): + sys.exit(1)