python/psutil/test/_windows.py

Thu, 22 Jan 2015 13:21:57 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Thu, 22 Jan 2015 13:21:57 +0100
branch
TOR_BUG_9701
changeset 15
b8a032363ba2
permissions
-rw-r--r--

Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6

michael@0 1 #!/usr/bin/env python
michael@0 2
michael@0 3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
michael@0 4 # Use of this source code is governed by a BSD-style license that can be
michael@0 5 # found in the LICENSE file.
michael@0 6
michael@0 7 """Windows specific tests. These are implicitly run by test_psutil.py."""
michael@0 8
michael@0 9 import os
michael@0 10 import unittest
michael@0 11 import platform
michael@0 12 import signal
michael@0 13 import time
michael@0 14 import sys
michael@0 15 import subprocess
michael@0 16 import errno
michael@0 17 import traceback
michael@0 18
michael@0 19 import psutil
michael@0 20 import _psutil_mswindows
michael@0 21 from psutil._compat import PY3, callable, long
michael@0 22 from test_psutil import *
michael@0 23
# Optional third-party helpers. When one of them cannot be imported a
# warning is registered and the module name is bound to None; the test
# class below checks for None before defining the tests that need it.
try:
    import wmi
except ImportError:
    wmi = None
    err = sys.exc_info()[1]
    register_warning("Couldn't run wmi tests: %s" % str(err))

try:
    import win32api
    import win32con
except ImportError:
    win32api = None
    err = sys.exc_info()[1]
    register_warning("Couldn't run pywin32 tests: %s" % str(err))
michael@0 37
michael@0 38
class WindowsSpecificTestCase(unittest.TestCase):
    # Windows-only psutil tests. Results are cross-checked against WMI
    # and pywin32 when those modules could be imported (see the guarded
    # sections below). setUp() starts a helper subprocess and stores its
    # pid in self.pid; tearDown() reaps it.
    #
    # NOTE: test methods are documented with comments rather than
    # docstrings on purpose: unittest's verbose runner prints the first
    # docstring line instead of the test name.

    def setUp(self):
        sproc = get_test_subprocess()
        wait_for_pid(sproc.pid)
        self.pid = sproc.pid

    def tearDown(self):
        reap_children()

    def test_issue_24(self):
        # Killing pid 0 must be refused with AccessDenied.
        p = psutil.Process(0)
        self.assertRaises(psutil.AccessDenied, p.kill)

    def test_special_pid(self):
        p = psutil.Process(4)
        self.assertEqual(p.name, 'System')
        # use __str__ to access all common Process properties to check
        # that nothing strange happens
        str(p)
        p.username
        self.assertTrue(p.create_time >= 0.0)
        try:
            rss, vms = p.get_memory_info()
        except psutil.AccessDenied:
            # expected on Windows Vista and Windows 7
            # NOTE(review): platform.uname()[1] is the network node name
            # (hostname), not the OS release -- confirm this check is
            # really what was intended before relying on it.
            if platform.uname()[1] not in ('vista', 'win-7', 'win7'):
                raise
        else:
            self.assertTrue(rss > 0)

    def test_signal(self):
        # send_signal() is expected to reject SIGINT with ValueError.
        p = psutil.Process(self.pid)
        self.assertRaises(ValueError, p.send_signal, signal.SIGINT)

    def test_nic_names(self):
        # Every NIC name reported by psutil must appear in the output of
        # "ipconfig /all" (pseudo-interfaces excluded).
        p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE)
        out = p.communicate()[0]
        if PY3:
            out = str(out, sys.stdout.encoding)
        nics = psutil.net_io_counters(pernic=True).keys()
        for nic in nics:
            if "pseudo-interface" in nic.replace(' ', '-').lower():
                continue
            if nic not in out:
                self.fail(
                    "%r nic wasn't found in 'ipconfig /all' output" % nic)

    def test_exe(self):
        # The basename of the executable path must match the process
        # name. psutil errors raised while reading either attribute are
        # deliberately ignored; assertion failures still propagate.
        for p in psutil.process_iter():
            try:
                self.assertEqual(os.path.basename(p.exe), p.name)
            except psutil.Error:
                pass

    if wmi is not None:

        # --- Process class tests

        def test_process_name(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            self.assertEqual(p.name, w.Caption)

        def test_process_exe(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            self.assertEqual(p.exe, w.ExecutablePath)

        def test_process_cmdline(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            self.assertEqual(' '.join(p.cmdline),
                             w.CommandLine.replace('"', ''))

        def test_process_username(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            domain, _, username = w.GetOwner()
            username = "%s\\%s" % (domain, username)
            self.assertEqual(p.username, username)

        def test_process_rss_memory(self):
            time.sleep(0.1)
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            rss = p.get_memory_info().rss
            self.assertEqual(rss, int(w.WorkingSetSize))

        def test_process_vms_memory(self):
            time.sleep(0.1)
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            vms = p.get_memory_info().vms
            # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
            # ...claims that PageFileUsage is represented in Kilo
            # bytes but funnily enough on certain platforms bytes are
            # returned instead.
            wmi_usage = int(w.PageFileUsage)
            if (vms != wmi_usage) and (vms != wmi_usage * 1024):
                self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))

        def test_process_create_time(self):
            # Compare creation times with one-second resolution.
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            wmic_create = str(w.CreationDate.split('.')[0])
            psutil_create = time.strftime("%Y%m%d%H%M%S",
                                          time.localtime(p.create_time))
            self.assertEqual(wmic_create, psutil_create)

        # --- psutil namespace functions and constants tests

        # NUMBER_OF_PROCESSORS is an environment variable, not an
        # attribute of the os module, so it has to be looked up in
        # os.environ (hasattr(os, ...) was always False and the test was
        # unconditionally skipped).
        @unittest.skipUnless('NUMBER_OF_PROCESSORS' in os.environ,
                             'NUMBER_OF_PROCESSORS env var is not available')
        def test_NUM_CPUS(self):
            num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
            self.assertEqual(num_cpus, psutil.NUM_CPUS)

        def test_TOTAL_PHYMEM(self):
            w = wmi.WMI().Win32_ComputerSystem()[0]
            self.assertEqual(int(w.TotalPhysicalMemory),
                             psutil.TOTAL_PHYMEM)

        def test__UPTIME(self):
            # _UPTIME constant is not public but it is used internally
            # as value to return for pid 0 creation time.
            # WMI behaves the same.
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(0)
            wmic_create = str(w.CreationDate.split('.')[0])
            psutil_create = time.strftime("%Y%m%d%H%M%S",
                                          time.localtime(p.create_time))
            # XXX - ? no actual test here: the two values are computed
            # (which exercises both APIs) but never compared.

        def test_get_pids(self):
            # Note: this test might fail if the OS is starting/killing
            # other processes in the meantime
            w = wmi.WMI().Win32_Process()
            wmi_pids = sorted(x.ProcessId for x in w)
            psutil_pids = sorted(psutil.get_pid_list())
            if wmi_pids != psutil_pids:
                # List comprehensions instead of filter() + filter():
                # on Python 3 filter objects cannot be concatenated.
                only_psutil = [x for x in psutil_pids if x not in wmi_pids]
                only_wmi = [x for x in wmi_pids if x not in psutil_pids]
                difference = only_psutil + only_wmi
                self.fail("difference: " + str(difference))

        def test_disks(self):
            # Every partition reported by psutil must be known to WMI and
            # report the same total size and (roughly) the same free space.
            ps_parts = psutil.disk_partitions(all=True)
            wmi_parts = wmi.WMI().Win32_LogicalDisk()
            for ps_part in ps_parts:
                for wmi_part in wmi_parts:
                    if ps_part.device.replace('\\', '') != wmi_part.DeviceID:
                        continue
                    if not ps_part.mountpoint:
                        # this is usually a CD-ROM with no disk inserted
                        break
                    try:
                        usage = psutil.disk_usage(ps_part.mountpoint)
                    except OSError:
                        err = sys.exc_info()[1]
                        if err.errno == errno.ENOENT:
                            # usually this is the floppy
                            break
                        raise
                    self.assertEqual(usage.total, int(wmi_part.Size))
                    wmi_free = int(wmi_part.FreeSpace)
                    # 10 MB tolerance: free space can change between the
                    # psutil and the WMI query, so an exact comparison
                    # (which would make this tolerance pointless) is not
                    # performed.
                    if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
                        self.fail("psutil=%s, wmi=%s"
                                  % (usage.free, wmi_free))
                    break
                else:
                    # inner loop finished without a matching WMI disk
                    self.fail("can't find partition %s" % repr(ps_part))

    if win32api is not None:

        def test_get_num_handles(self):
            # Opening one extra handle to ourselves must bump the handle
            # count by exactly one; closing it must restore the count.
            p = psutil.Process(os.getpid())
            before = p.get_num_handles()
            handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                          win32con.FALSE, os.getpid())
            try:
                after = p.get_num_handles()
                self.assertEqual(after, before + 1)
            finally:
                # always release the handle, even if the assertion fails
                win32api.CloseHandle(handle)
            self.assertEqual(p.get_num_handles(), before)

        def test_get_num_handles_2(self):
            # Note: this fails from time to time; I'm keen on thinking
            # it doesn't mean something is broken
            def call(proc, attr_name):
                # Access the attribute and, if it is callable, call it;
                # only the side effect on the handle count matters.
                attr = getattr(proc, attr_name, None)
                if attr is not None and callable(attr):
                    attr()

            skipped = ('terminate', 'kill', 'suspend', 'resume', 'nice',
                       'send_signal', 'wait', 'get_children', 'as_dict')
            p = psutil.Process(self.pid)
            failures = []
            for name in dir(psutil.Process):
                if (name.startswith('_') or name.startswith('set_')
                        or name in skipped):
                    continue
                try:
                    # First call warms up; a second call must not leave
                    # more handles open than the first one did.
                    call(p, name)
                    num1 = p.get_num_handles()
                    call(p, name)
                    num2 = p.get_num_handles()
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
                else:
                    if num2 > num1:
                        fail = "failure while processing Process.%s method " \
                               "(before=%s, after=%s)" % (name, num1, num2)
                        failures.append(fail)
            if failures:
                self.fail('\n' + '\n'.join(failures))
michael@0 258
michael@0 259
michael@0 260 import _psutil_mswindows
michael@0 261 from psutil._psmswindows import ACCESS_DENIED_SET
michael@0 262
def wrap_exceptions(callable):
    """Return a wrapper translating OSError into psutil exceptions.

    The wrapper calls ``callable`` with the arguments it receives and
    returns its result. If an OSError is raised:

    - errno in ACCESS_DENIED_SET -> psutil.AccessDenied(None, None)
    - errno == errno.ESRCH       -> psutil.NoSuchProcess(None, None)
    - anything else              -> the original OSError is re-raised

    Exceptions other than OSError pass through untouched. The wrapper
    requires at least one positional argument (named ``self`` for
    historical reasons; the tests in this file pass a pid there).
    """
    # Local import so this helper does not depend on the module's import
    # block; wraps() keeps the wrapped function's name and docstring,
    # which makes tracebacks and failure messages readable.
    import functools

    @functools.wraps(callable)
    def wrapper(self, *args, **kwargs):
        try:
            return callable(self, *args, **kwargs)
        except OSError:
            err = sys.exc_info()[1]
            if err.errno in ACCESS_DENIED_SET:
                raise psutil.AccessDenied(None, None)
            if err.errno == errno.ESRCH:
                raise psutil.NoSuchProcess(None, None)
            raise
    return wrapper
michael@0 275
class TestDualProcessImplementation(unittest.TestCase):
    # Compares pairs of low-level functions exposed by _psutil_mswindows:
    # "<name>" and its fallback "<name>_2" (see test_compare_values).
    #
    # NOTE: test methods are documented with comments rather than
    # docstrings on purpose: unittest's verbose runner prints the first
    # docstring line instead of the test name.

    fun_names = [
        # function name                 tolerance
        ('get_process_cpu_times', 0.2),
        ('get_process_create_time', 0.5),
        ('get_process_num_handles', 1),  # 1 because impl #1 opens a handle
        ('get_process_io_counters', 0),
        ('get_process_memory_info', 1024),  # KB
    ]

    def test_compare_values(self):
        # Certain APIs on Windows have 2 internal implementations, one
        # based on documented Windows APIs, another one based
        # NtQuerySystemInformation() which gets called as fallback in
        # case the first fails because of limited permission error.
        # Here we test that the two methods return the exact same value,
        # see:
        # http://code.google.com/p/psutil/issues/detail?id=304
        def assert_ge_0(obj):
            # Every numeric value (or every item of a tuple) must be >= 0.
            if isinstance(obj, tuple):
                for value in obj:
                    self.assertGreaterEqual(value, 0)
            elif isinstance(obj, (int, long, float)):
                self.assertGreaterEqual(obj, 0)
            else:
                # case not handled which needs to be fixed; self.fail()
                # raises AssertionError like the former "assert 0" did,
                # but is not stripped under -O and carries a message.
                self.fail("unhandled return type: %r" % (obj,))

        def compare_with_tolerance(ret1, ret2, tolerance):
            # Equal values pass; otherwise numbers (or tuple items,
            # pairwise) may differ by at most `tolerance`.
            if ret1 == ret2:
                return
            if isinstance(ret2, (int, long, float)):
                diff = abs(ret1 - ret2)
                self.assertLessEqual(diff, tolerance)
            elif isinstance(ret2, tuple):
                for a, b in zip(ret1, ret2):
                    diff = abs(a - b)
                    self.assertLessEqual(diff, tolerance)

        failures = []
        for name, tolerance in self.fun_names:
            meth1 = wrap_exceptions(getattr(_psutil_mswindows, name))
            meth2 = wrap_exceptions(getattr(_psutil_mswindows, name + '_2'))
            for p in psutil.process_iter():
                if name == 'get_process_memory_info' and p.pid == os.getpid():
                    continue
                #
                try:
                    ret1 = meth1(p.pid)
                except psutil.NoSuchProcess:
                    continue
                except psutil.AccessDenied:
                    # first implementation not permitted: only sanity
                    # check the fallback's result below
                    ret1 = None
                #
                try:
                    ret2 = meth2(p.pid)
                except psutil.NoSuchProcess:
                    # this is supposed to fail only in case of zombie process
                    # never for permission error
                    continue

                # compare values
                try:
                    if ret1 is None:
                        assert_ge_0(ret2)
                    else:
                        compare_with_tolerance(ret1, ret2, tolerance)
                        assert_ge_0(ret1)
                        assert_ge_0(ret2)
                except AssertionError:
                    trace = traceback.format_exc()
                    msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' \
                          % (trace, p.pid, name, ret1, ret2)
                    failures.append(msg)
                    # one recorded failure per function is enough
                    break
        if failures:
            self.fail('\n\n'.join(failures))

    def test_zombies(self):
        # test that NPS is raised by the 2nd implementation in case a
        # process no longer exists
        # NOTE(review): the code below looks up `name`, i.e. the first
        # implementation, not `name + '_2'` -- confirm which one this
        # test is meant to cover.
        ZOMBIE_PID = max(psutil.get_pid_list()) + 5000
        for name, _ in self.fun_names:
            meth = wrap_exceptions(getattr(_psutil_mswindows, name))
            self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID)
michael@0 362
michael@0 363
def test_main():
    """Run both Windows test cases verbosely; return True if all passed.

    Uses ``defaultTestLoader.loadTestsFromTestCase`` rather than
    ``unittest.makeSuite``, which is deprecated and no longer available
    in recent Python versions.
    """
    loader = unittest.defaultTestLoader
    test_suite = unittest.TestSuite()
    for case in (WindowsSpecificTestCase, TestDualProcessImplementation):
        test_suite.addTest(loader.loadTestsFromTestCase(case))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
michael@0 370
if __name__ == '__main__':
    # Script entry point: exit with status 1 when any test failed.
    succeeded = test_main()
    if not succeeded:
        sys.exit(1)

mercurial