python/psutil/test/_windows.py

Thu, 22 Jan 2015 13:21:57 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Thu, 22 Jan 2015 13:21:57 +0100
branch
TOR_BUG_9701
changeset 15
b8a032363ba2
permissions
-rw-r--r--

Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6

     1 #!/usr/bin/env python
     3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
     4 # Use of this source code is governed by a BSD-style license that can be
     5 # found in the LICENSE file.
     7 """Windows specific tests.  These are implicitly run by test_psutil.py."""
     9 import os
    10 import unittest
    11 import platform
    12 import signal
    13 import time
    14 import sys
    15 import subprocess
    16 import errno
    17 import traceback
    19 import psutil
    20 import _psutil_mswindows
    21 from psutil._compat import PY3, callable, long
    22 from test_psutil import *
    24 try:
    25     import wmi
    26 except ImportError:
    27     err = sys.exc_info()[1]
    28     register_warning("Couldn't run wmi tests: %s" % str(err))
    29     wmi = None
    30 try:
    31     import win32api
    32     import win32con
    33 except ImportError:
    34     err = sys.exc_info()[1]
    35     register_warning("Couldn't run pywin32 tests: %s" % str(err))
    36     win32api = None
    39 class WindowsSpecificTestCase(unittest.TestCase):
    41     def setUp(self):
    42         sproc = get_test_subprocess()
    43         wait_for_pid(sproc.pid)
    44         self.pid = sproc.pid
    46     def tearDown(self):
    47         reap_children()
    49     def test_issue_24(self):
    50         p = psutil.Process(0)
    51         self.assertRaises(psutil.AccessDenied, p.kill)
    53     def test_special_pid(self):
    54         p = psutil.Process(4)
    55         self.assertEqual(p.name, 'System')
    56         # use __str__ to access all common Process properties to check
    57         # that nothing strange happens
    58         str(p)
    59         p.username
    60         self.assertTrue(p.create_time >= 0.0)
    61         try:
    62             rss, vms = p.get_memory_info()
    63         except psutil.AccessDenied:
    64             # expected on Windows Vista and Windows 7
    65             if not platform.uname()[1] in ('vista', 'win-7', 'win7'):
    66                 raise
    67         else:
    68             self.assertTrue(rss > 0)
    70     def test_signal(self):
    71         p = psutil.Process(self.pid)
    72         self.assertRaises(ValueError, p.send_signal, signal.SIGINT)
    74     def test_nic_names(self):
    75         p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE)
    76         out = p.communicate()[0]
    77         if PY3:
    78             out = str(out, sys.stdout.encoding)
    79         nics = psutil.net_io_counters(pernic=True).keys()
    80         for nic in nics:
    81             if "pseudo-interface" in nic.replace(' ', '-').lower():
    82                 continue
    83             if nic not in out:
    84                 self.fail("%r nic wasn't found in 'ipconfig /all' output" % nic)
    86     def test_exe(self):
    87         for p in psutil.process_iter():
    88             try:
    89                 self.assertEqual(os.path.basename(p.exe), p.name)
    90             except psutil.Error:
    91                 pass
    93     if wmi is not None:
    95         # --- Process class tests
    97         def test_process_name(self):
    98             w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
    99             p = psutil.Process(self.pid)
   100             self.assertEqual(p.name, w.Caption)
   102         def test_process_exe(self):
   103             w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
   104             p = psutil.Process(self.pid)
   105             self.assertEqual(p.exe, w.ExecutablePath)
   107         def test_process_cmdline(self):
   108             w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
   109             p = psutil.Process(self.pid)
   110             self.assertEqual(' '.join(p.cmdline), w.CommandLine.replace('"', ''))
   112         def test_process_username(self):
   113             w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
   114             p = psutil.Process(self.pid)
   115             domain, _, username = w.GetOwner()
   116             username = "%s\\%s" %(domain, username)
   117             self.assertEqual(p.username, username)
   119         def test_process_rss_memory(self):
   120             time.sleep(0.1)
   121             w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
   122             p = psutil.Process(self.pid)
   123             rss = p.get_memory_info().rss
   124             self.assertEqual(rss, int(w.WorkingSetSize))
   126         def test_process_vms_memory(self):
   127             time.sleep(0.1)
   128             w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
   129             p = psutil.Process(self.pid)
   130             vms = p.get_memory_info().vms
   131             # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
   132             # ...claims that PageFileUsage is represented in Kilo
   133             # bytes but funnily enough on certain platforms bytes are
   134             # returned instead.
   135             wmi_usage = int(w.PageFileUsage)
   136             if (vms != wmi_usage) and (vms != wmi_usage * 1024):
   137                 self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))
   139         def test_process_create_time(self):
   140             w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
   141             p = psutil.Process(self.pid)
   142             wmic_create = str(w.CreationDate.split('.')[0])
   143             psutil_create = time.strftime("%Y%m%d%H%M%S",
   144                                           time.localtime(p.create_time))
   145             self.assertEqual(wmic_create, psutil_create)
   148         # --- psutil namespace functions and constants tests
   150         @unittest.skipUnless(hasattr(os, 'NUMBER_OF_PROCESSORS'),
   151                              'NUMBER_OF_PROCESSORS env var is not available')
   152         def test_NUM_CPUS(self):
   153             num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
   154             self.assertEqual(num_cpus, psutil.NUM_CPUS)
   156         def test_TOTAL_PHYMEM(self):
   157             w = wmi.WMI().Win32_ComputerSystem()[0]
   158             self.assertEqual(int(w.TotalPhysicalMemory), psutil.TOTAL_PHYMEM)
   160         def test__UPTIME(self):
   161             # _UPTIME constant is not public but it is used internally
   162             # as value to return for pid 0 creation time.
   163             # WMI behaves the same.
   164             w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
   165             p = psutil.Process(0)
   166             wmic_create = str(w.CreationDate.split('.')[0])
   167             psutil_create = time.strftime("%Y%m%d%H%M%S",
   168                                           time.localtime(p.create_time))
   169             # XXX - ? no actual test here
   171         def test_get_pids(self):
   172             # Note: this test might fail if the OS is starting/killing
   173             # other processes in the meantime
   174             w = wmi.WMI().Win32_Process()
   175             wmi_pids = [x.ProcessId for x in w]
   176             wmi_pids.sort()
   177             psutil_pids = psutil.get_pid_list()
   178             psutil_pids.sort()
   179             if wmi_pids != psutil_pids:
   180                 difference = filter(lambda x:x not in wmi_pids, psutil_pids) + \
   181                              filter(lambda x:x not in psutil_pids, wmi_pids)
   182                 self.fail("difference: " + str(difference))
   184         def test_disks(self):
   185             ps_parts = psutil.disk_partitions(all=True)
   186             wmi_parts = wmi.WMI().Win32_LogicalDisk()
   187             for ps_part in ps_parts:
   188                 for wmi_part in wmi_parts:
   189                     if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
   190                         if not ps_part.mountpoint:
   191                             # this is usually a CD-ROM with no disk inserted
   192                             break
   193                         try:
   194                             usage = psutil.disk_usage(ps_part.mountpoint)
   195                         except OSError:
   196                             err = sys.exc_info()[1]
   197                             if err.errno == errno.ENOENT:
   198                                 # usually this is the floppy
   199                                 break
   200                             else:
   201                                 raise
   202                         self.assertEqual(usage.total, int(wmi_part.Size))
   203                         wmi_free = int(wmi_part.FreeSpace)
   204                         self.assertEqual(usage.free, wmi_free)
   205                         # 10 MB tollerance
   206                         if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
   207                             self.fail("psutil=%s, wmi=%s" % usage.free, wmi_free)
   208                         break
   209                 else:
   210                     self.fail("can't find partition %s" % repr(ps_part))
   212     if win32api is not None:
   214         def test_get_num_handles(self):
   215             p = psutil.Process(os.getpid())
   216             before = p.get_num_handles()
   217             handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
   218                                           win32con.FALSE, os.getpid())
   219             after = p.get_num_handles()
   220             self.assertEqual(after, before+1)
   221             win32api.CloseHandle(handle)
   222             self.assertEqual(p.get_num_handles(), before)
   224         def test_get_num_handles_2(self):
   225             # Note: this fails from time to time; I'm keen on thinking
   226             # it doesn't mean something is broken
   227             def call(p, attr):
   228                 attr = getattr(p, name, None)
   229                 if attr is not None and callable(attr):
   230                     ret = attr()
   231                 else:
   232                     ret = attr
   234             p = psutil.Process(self.pid)
   235             attrs = []
   236             failures = []
   237             for name in dir(psutil.Process):
   238                 if name.startswith('_') \
   239                 or name.startswith('set_') \
   240                 or name in ('terminate', 'kill', 'suspend', 'resume', 'nice',
   241                             'send_signal', 'wait', 'get_children', 'as_dict'):
   242                     continue
   243                 else:
   244                     try:
   245                         call(p, name)
   246                         num1 = p.get_num_handles()
   247                         call(p, name)
   248                         num2 = p.get_num_handles()
   249                     except (psutil.NoSuchProcess, psutil.AccessDenied):
   250                         pass
   251                     else:
   252                         if num2 > num1:
   253                             fail = "failure while processing Process.%s method " \
   254                                    "(before=%s, after=%s)" % (name, num1, num2)
   255                             failures.append(fail)
   256             if failures:
   257                 self.fail('\n' + '\n'.join(failures))
   260 import _psutil_mswindows
   261 from psutil._psmswindows import ACCESS_DENIED_SET
   263 def wrap_exceptions(callable):
   264     def wrapper(self, *args, **kwargs):
   265         try:
   266             return callable(self, *args, **kwargs)
   267         except OSError:
   268             err = sys.exc_info()[1]
   269             if err.errno in ACCESS_DENIED_SET:
   270                 raise psutil.AccessDenied(None, None)
   271             if err.errno == errno.ESRCH:
   272                 raise psutil.NoSuchProcess(None, None)
   273             raise
   274     return wrapper
   276 class TestDualProcessImplementation(unittest.TestCase):
   277     fun_names = [
   278         # function name                 tolerance
   279         ('get_process_cpu_times',       0.2),
   280         ('get_process_create_time',     0.5),
   281         ('get_process_num_handles',     1),  # 1 because impl #1 opens a handle
   282         ('get_process_io_counters',     0),
   283         ('get_process_memory_info',     1024),  # KB
   284     ]
   286     def test_compare_values(self):
   287         # Certain APIs on Windows have 2 internal implementations, one
   288         # based on documented Windows APIs, another one based
   289         # NtQuerySystemInformation() which gets called as fallback in
   290         # case the first fails because of limited permission error.
   291         # Here we test that the two methods return the exact same value,
   292         # see:
   293         # http://code.google.com/p/psutil/issues/detail?id=304
   294         def assert_ge_0(obj):
   295             if isinstance(obj, tuple):
   296                 for value in obj:
   297                     self.assertGreaterEqual(value, 0)
   298             elif isinstance(obj, (int, long, float)):
   299                 self.assertGreaterEqual(obj, 0)
   300             else:
   301                 assert 0  # case not handled which needs to be fixed
   303         def compare_with_tolerance(ret1, ret2, tolerance):
   304             if ret1 == ret2:
   305                 return
   306             else:
   307                 if isinstance(ret2, (int, long, float)):
   308                     diff = abs(ret1 - ret2)
   309                     self.assertLessEqual(diff, tolerance)
   310                 elif isinstance(ret2, tuple):
   311                     for a, b in zip(ret1, ret2):
   312                         diff = abs(a - b)
   313                         self.assertLessEqual(diff, tolerance)
   315         failures = []
   316         for name, tolerance in self.fun_names:
   317             meth1 = wrap_exceptions(getattr(_psutil_mswindows, name))
   318             meth2 = wrap_exceptions(getattr(_psutil_mswindows, name + '_2'))
   319             for p in psutil.process_iter():
   320                 if name == 'get_process_memory_info' and p.pid == os.getpid():
   321                     continue
   322                 #
   323                 try:
   324                     ret1 = meth1(p.pid)
   325                 except psutil.NoSuchProcess:
   326                     continue
   327                 except psutil.AccessDenied:
   328                     ret1 = None
   329                 #
   330                 try:
   331                     ret2 = meth2(p.pid)
   332                 except psutil.NoSuchProcess:
   333                     # this is supposed to fail only in case of zombie process
   334                     # never for permission error
   335                     continue
   337                 # compare values
   338                 try:
   339                     if ret1 is None:
   340                         assert_ge_0(ret2)
   341                     else:
   342                         compare_with_tolerance(ret1, ret2, tolerance)
   343                         assert_ge_0(ret1)
   344                         assert_ge_0(ret2)
   345                 except AssertionError:
   346                     err = sys.exc_info()[1]
   347                     trace = traceback.format_exc()
   348                     msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' \
   349                            % (trace, p.pid, name, ret1, ret2)
   350                     failures.append(msg)
   351                     break
   352         if failures:
   353             self.fail('\n\n'.join(failures))
   355     def test_zombies(self):
   356         # test that NPS is raised by the 2nd implementation in case a
   357         # process no longer exists
   358         ZOMBIE_PID = max(psutil.get_pid_list()) + 5000
   359         for name, _ in self.fun_names:
   360             meth = wrap_exceptions(getattr(_psutil_mswindows, name))
   361             self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID)
   364 def test_main():
   365     test_suite = unittest.TestSuite()
   366     test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase))
   367     test_suite.addTest(unittest.makeSuite(TestDualProcessImplementation))
   368     result = unittest.TextTestRunner(verbosity=2).run(test_suite)
   369     return result.wasSuccessful()
   371 if __name__ == '__main__':
   372     if not test_main():
   373         sys.exit(1)

mercurial