python/psutil/test/_windows.py

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/python/psutil/test/_windows.py	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,373 @@
     1.4 +#!/usr/bin/env python
     1.5 +
     1.6 +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
     1.7 +# Use of this source code is governed by a BSD-style license that can be
     1.8 +# found in the LICENSE file.
     1.9 +
    1.10 +"""Windows specific tests.  These are implicitly run by test_psutil.py."""
    1.11 +
    1.12 +import os
    1.13 +import unittest
    1.14 +import platform
    1.15 +import signal
    1.16 +import time
    1.17 +import sys
    1.18 +import subprocess
    1.19 +import errno
    1.20 +import traceback
    1.21 +
    1.22 +import psutil
    1.23 +import _psutil_mswindows
    1.24 +from psutil._compat import PY3, callable, long
    1.25 +from test_psutil import *
    1.26 +
    1.27 +try:
    1.28 +    import wmi
    1.29 +except ImportError:
    1.30 +    err = sys.exc_info()[1]
    1.31 +    register_warning("Couldn't run wmi tests: %s" % str(err))
    1.32 +    wmi = None
    1.33 +try:
    1.34 +    import win32api
    1.35 +    import win32con
    1.36 +except ImportError:
    1.37 +    err = sys.exc_info()[1]
    1.38 +    register_warning("Couldn't run pywin32 tests: %s" % str(err))
    1.39 +    win32api = None
    1.40 +
    1.41 +
    1.42 +class WindowsSpecificTestCase(unittest.TestCase):
    1.43 +
    1.44 +    def setUp(self):
    1.45 +        sproc = get_test_subprocess()
    1.46 +        wait_for_pid(sproc.pid)
    1.47 +        self.pid = sproc.pid
    1.48 +
    1.49 +    def tearDown(self):
    1.50 +        reap_children()
    1.51 +
    1.52 +    def test_issue_24(self):
    1.53 +        p = psutil.Process(0)
    1.54 +        self.assertRaises(psutil.AccessDenied, p.kill)
    1.55 +
    1.56 +    def test_special_pid(self):
    1.57 +        p = psutil.Process(4)
    1.58 +        self.assertEqual(p.name, 'System')
    1.59 +        # use __str__ to access all common Process properties to check
    1.60 +        # that nothing strange happens
    1.61 +        str(p)
    1.62 +        p.username
    1.63 +        self.assertTrue(p.create_time >= 0.0)
    1.64 +        try:
    1.65 +            rss, vms = p.get_memory_info()
    1.66 +        except psutil.AccessDenied:
    1.67 +            # expected on Windows Vista and Windows 7
    1.68 +            if not platform.uname()[1] in ('vista', 'win-7', 'win7'):
    1.69 +                raise
    1.70 +        else:
    1.71 +            self.assertTrue(rss > 0)
    1.72 +
    1.73 +    def test_signal(self):
    1.74 +        p = psutil.Process(self.pid)
    1.75 +        self.assertRaises(ValueError, p.send_signal, signal.SIGINT)
    1.76 +
    1.77 +    def test_nic_names(self):
    1.78 +        p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE)
    1.79 +        out = p.communicate()[0]
    1.80 +        if PY3:
    1.81 +            out = str(out, sys.stdout.encoding)
    1.82 +        nics = psutil.net_io_counters(pernic=True).keys()
    1.83 +        for nic in nics:
    1.84 +            if "pseudo-interface" in nic.replace(' ', '-').lower():
    1.85 +                continue
    1.86 +            if nic not in out:
    1.87 +                self.fail("%r nic wasn't found in 'ipconfig /all' output" % nic)
    1.88 +
    1.89 +    def test_exe(self):
    1.90 +        for p in psutil.process_iter():
    1.91 +            try:
    1.92 +                self.assertEqual(os.path.basename(p.exe), p.name)
    1.93 +            except psutil.Error:
    1.94 +                pass
    1.95 +
    1.96 +    if wmi is not None:
    1.97 +
    1.98 +        # --- Process class tests
    1.99 +
   1.100 +        def test_process_name(self):
   1.101 +            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
   1.102 +            p = psutil.Process(self.pid)
   1.103 +            self.assertEqual(p.name, w.Caption)
   1.104 +
   1.105 +        def test_process_exe(self):
   1.106 +            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
   1.107 +            p = psutil.Process(self.pid)
   1.108 +            self.assertEqual(p.exe, w.ExecutablePath)
   1.109 +
   1.110 +        def test_process_cmdline(self):
   1.111 +            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
   1.112 +            p = psutil.Process(self.pid)
   1.113 +            self.assertEqual(' '.join(p.cmdline), w.CommandLine.replace('"', ''))
   1.114 +
   1.115 +        def test_process_username(self):
   1.116 +            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
   1.117 +            p = psutil.Process(self.pid)
   1.118 +            domain, _, username = w.GetOwner()
   1.119 +            username = "%s\\%s" %(domain, username)
   1.120 +            self.assertEqual(p.username, username)
   1.121 +
   1.122 +        def test_process_rss_memory(self):
   1.123 +            time.sleep(0.1)
   1.124 +            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
   1.125 +            p = psutil.Process(self.pid)
   1.126 +            rss = p.get_memory_info().rss
   1.127 +            self.assertEqual(rss, int(w.WorkingSetSize))
   1.128 +
   1.129 +        def test_process_vms_memory(self):
   1.130 +            time.sleep(0.1)
   1.131 +            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
   1.132 +            p = psutil.Process(self.pid)
   1.133 +            vms = p.get_memory_info().vms
   1.134 +            # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
   1.135 +            # ...claims that PageFileUsage is represented in Kilo
   1.136 +            # bytes but funnily enough on certain platforms bytes are
   1.137 +            # returned instead.
   1.138 +            wmi_usage = int(w.PageFileUsage)
   1.139 +            if (vms != wmi_usage) and (vms != wmi_usage * 1024):
   1.140 +                self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))
   1.141 +
   1.142 +        def test_process_create_time(self):
   1.143 +            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
   1.144 +            p = psutil.Process(self.pid)
   1.145 +            wmic_create = str(w.CreationDate.split('.')[0])
   1.146 +            psutil_create = time.strftime("%Y%m%d%H%M%S",
   1.147 +                                          time.localtime(p.create_time))
   1.148 +            self.assertEqual(wmic_create, psutil_create)
   1.149 +
   1.150 +
   1.151 +        # --- psutil namespace functions and constants tests
   1.152 +
   1.153 +        @unittest.skipUnless(hasattr(os, 'NUMBER_OF_PROCESSORS'),
   1.154 +                             'NUMBER_OF_PROCESSORS env var is not available')
   1.155 +        def test_NUM_CPUS(self):
   1.156 +            num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
   1.157 +            self.assertEqual(num_cpus, psutil.NUM_CPUS)
   1.158 +
   1.159 +        def test_TOTAL_PHYMEM(self):
   1.160 +            w = wmi.WMI().Win32_ComputerSystem()[0]
   1.161 +            self.assertEqual(int(w.TotalPhysicalMemory), psutil.TOTAL_PHYMEM)
   1.162 +
   1.163 +        def test__UPTIME(self):
   1.164 +            # _UPTIME constant is not public but it is used internally
   1.165 +            # as value to return for pid 0 creation time.
   1.166 +            # WMI behaves the same.
   1.167 +            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
   1.168 +            p = psutil.Process(0)
   1.169 +            wmic_create = str(w.CreationDate.split('.')[0])
   1.170 +            psutil_create = time.strftime("%Y%m%d%H%M%S",
   1.171 +                                          time.localtime(p.create_time))
   1.172 +            # XXX - ? no actual test here
   1.173 +
   1.174 +        def test_get_pids(self):
   1.175 +            # Note: this test might fail if the OS is starting/killing
   1.176 +            # other processes in the meantime
   1.177 +            w = wmi.WMI().Win32_Process()
   1.178 +            wmi_pids = [x.ProcessId for x in w]
   1.179 +            wmi_pids.sort()
   1.180 +            psutil_pids = psutil.get_pid_list()
   1.181 +            psutil_pids.sort()
   1.182 +            if wmi_pids != psutil_pids:
   1.183 +                difference = filter(lambda x:x not in wmi_pids, psutil_pids) + \
   1.184 +                             filter(lambda x:x not in psutil_pids, wmi_pids)
   1.185 +                self.fail("difference: " + str(difference))
   1.186 +
   1.187 +        def test_disks(self):
   1.188 +            ps_parts = psutil.disk_partitions(all=True)
   1.189 +            wmi_parts = wmi.WMI().Win32_LogicalDisk()
   1.190 +            for ps_part in ps_parts:
   1.191 +                for wmi_part in wmi_parts:
   1.192 +                    if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
   1.193 +                        if not ps_part.mountpoint:
   1.194 +                            # this is usually a CD-ROM with no disk inserted
   1.195 +                            break
   1.196 +                        try:
   1.197 +                            usage = psutil.disk_usage(ps_part.mountpoint)
   1.198 +                        except OSError:
   1.199 +                            err = sys.exc_info()[1]
   1.200 +                            if err.errno == errno.ENOENT:
   1.201 +                                # usually this is the floppy
   1.202 +                                break
   1.203 +                            else:
   1.204 +                                raise
   1.205 +                        self.assertEqual(usage.total, int(wmi_part.Size))
   1.206 +                        wmi_free = int(wmi_part.FreeSpace)
   1.207 +                        self.assertEqual(usage.free, wmi_free)
   1.208 +                        # 10 MB tollerance
   1.209 +                        if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
   1.210 +                            self.fail("psutil=%s, wmi=%s" % usage.free, wmi_free)
   1.211 +                        break
   1.212 +                else:
   1.213 +                    self.fail("can't find partition %s" % repr(ps_part))
   1.214 +
   1.215 +    if win32api is not None:
   1.216 +
   1.217 +        def test_get_num_handles(self):
   1.218 +            p = psutil.Process(os.getpid())
   1.219 +            before = p.get_num_handles()
   1.220 +            handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
   1.221 +                                          win32con.FALSE, os.getpid())
   1.222 +            after = p.get_num_handles()
   1.223 +            self.assertEqual(after, before+1)
   1.224 +            win32api.CloseHandle(handle)
   1.225 +            self.assertEqual(p.get_num_handles(), before)
   1.226 +
   1.227 +        def test_get_num_handles_2(self):
   1.228 +            # Note: this fails from time to time; I'm keen on thinking
   1.229 +            # it doesn't mean something is broken
   1.230 +            def call(p, attr):
   1.231 +                attr = getattr(p, name, None)
   1.232 +                if attr is not None and callable(attr):
   1.233 +                    ret = attr()
   1.234 +                else:
   1.235 +                    ret = attr
   1.236 +
   1.237 +            p = psutil.Process(self.pid)
   1.238 +            attrs = []
   1.239 +            failures = []
   1.240 +            for name in dir(psutil.Process):
   1.241 +                if name.startswith('_') \
   1.242 +                or name.startswith('set_') \
   1.243 +                or name in ('terminate', 'kill', 'suspend', 'resume', 'nice',
   1.244 +                            'send_signal', 'wait', 'get_children', 'as_dict'):
   1.245 +                    continue
   1.246 +                else:
   1.247 +                    try:
   1.248 +                        call(p, name)
   1.249 +                        num1 = p.get_num_handles()
   1.250 +                        call(p, name)
   1.251 +                        num2 = p.get_num_handles()
   1.252 +                    except (psutil.NoSuchProcess, psutil.AccessDenied):
   1.253 +                        pass
   1.254 +                    else:
   1.255 +                        if num2 > num1:
   1.256 +                            fail = "failure while processing Process.%s method " \
   1.257 +                                   "(before=%s, after=%s)" % (name, num1, num2)
   1.258 +                            failures.append(fail)
   1.259 +            if failures:
   1.260 +                self.fail('\n' + '\n'.join(failures))
   1.261 +
   1.262 +
   1.263 +import _psutil_mswindows
   1.264 +from psutil._psmswindows import ACCESS_DENIED_SET
   1.265 +
   1.266 +def wrap_exceptions(callable):
   1.267 +    def wrapper(self, *args, **kwargs):
   1.268 +        try:
   1.269 +            return callable(self, *args, **kwargs)
   1.270 +        except OSError:
   1.271 +            err = sys.exc_info()[1]
   1.272 +            if err.errno in ACCESS_DENIED_SET:
   1.273 +                raise psutil.AccessDenied(None, None)
   1.274 +            if err.errno == errno.ESRCH:
   1.275 +                raise psutil.NoSuchProcess(None, None)
   1.276 +            raise
   1.277 +    return wrapper
   1.278 +
   1.279 +class TestDualProcessImplementation(unittest.TestCase):
   1.280 +    fun_names = [
   1.281 +        # function name                 tolerance
   1.282 +        ('get_process_cpu_times',       0.2),
   1.283 +        ('get_process_create_time',     0.5),
   1.284 +        ('get_process_num_handles',     1),  # 1 because impl #1 opens a handle
   1.285 +        ('get_process_io_counters',     0),
   1.286 +        ('get_process_memory_info',     1024),  # KB
   1.287 +    ]
   1.288 +
   1.289 +    def test_compare_values(self):
   1.290 +        # Certain APIs on Windows have 2 internal implementations, one
   1.291 +        # based on documented Windows APIs, another one based
   1.292 +        # NtQuerySystemInformation() which gets called as fallback in
   1.293 +        # case the first fails because of limited permission error.
   1.294 +        # Here we test that the two methods return the exact same value,
   1.295 +        # see:
   1.296 +        # http://code.google.com/p/psutil/issues/detail?id=304
   1.297 +        def assert_ge_0(obj):
   1.298 +            if isinstance(obj, tuple):
   1.299 +                for value in obj:
   1.300 +                    self.assertGreaterEqual(value, 0)
   1.301 +            elif isinstance(obj, (int, long, float)):
   1.302 +                self.assertGreaterEqual(obj, 0)
   1.303 +            else:
   1.304 +                assert 0  # case not handled which needs to be fixed
   1.305 +
   1.306 +        def compare_with_tolerance(ret1, ret2, tolerance):
   1.307 +            if ret1 == ret2:
   1.308 +                return
   1.309 +            else:
   1.310 +                if isinstance(ret2, (int, long, float)):
   1.311 +                    diff = abs(ret1 - ret2)
   1.312 +                    self.assertLessEqual(diff, tolerance)
   1.313 +                elif isinstance(ret2, tuple):
   1.314 +                    for a, b in zip(ret1, ret2):
   1.315 +                        diff = abs(a - b)
   1.316 +                        self.assertLessEqual(diff, tolerance)
   1.317 +
   1.318 +        failures = []
   1.319 +        for name, tolerance in self.fun_names:
   1.320 +            meth1 = wrap_exceptions(getattr(_psutil_mswindows, name))
   1.321 +            meth2 = wrap_exceptions(getattr(_psutil_mswindows, name + '_2'))
   1.322 +            for p in psutil.process_iter():
   1.323 +                if name == 'get_process_memory_info' and p.pid == os.getpid():
   1.324 +                    continue
   1.325 +                #
   1.326 +                try:
   1.327 +                    ret1 = meth1(p.pid)
   1.328 +                except psutil.NoSuchProcess:
   1.329 +                    continue
   1.330 +                except psutil.AccessDenied:
   1.331 +                    ret1 = None
   1.332 +                #
   1.333 +                try:
   1.334 +                    ret2 = meth2(p.pid)
   1.335 +                except psutil.NoSuchProcess:
   1.336 +                    # this is supposed to fail only in case of zombie process
   1.337 +                    # never for permission error
   1.338 +                    continue
   1.339 +
   1.340 +                # compare values
   1.341 +                try:
   1.342 +                    if ret1 is None:
   1.343 +                        assert_ge_0(ret2)
   1.344 +                    else:
   1.345 +                        compare_with_tolerance(ret1, ret2, tolerance)
   1.346 +                        assert_ge_0(ret1)
   1.347 +                        assert_ge_0(ret2)
   1.348 +                except AssertionError:
   1.349 +                    err = sys.exc_info()[1]
   1.350 +                    trace = traceback.format_exc()
   1.351 +                    msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' \
   1.352 +                           % (trace, p.pid, name, ret1, ret2)
   1.353 +                    failures.append(msg)
   1.354 +                    break
   1.355 +        if failures:
   1.356 +            self.fail('\n\n'.join(failures))
   1.357 +
   1.358 +    def test_zombies(self):
   1.359 +        # test that NPS is raised by the 2nd implementation in case a
   1.360 +        # process no longer exists
   1.361 +        ZOMBIE_PID = max(psutil.get_pid_list()) + 5000
   1.362 +        for name, _ in self.fun_names:
   1.363 +            meth = wrap_exceptions(getattr(_psutil_mswindows, name))
   1.364 +            self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID)
   1.365 +
   1.366 +
   1.367 +def test_main():
   1.368 +    test_suite = unittest.TestSuite()
   1.369 +    test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase))
   1.370 +    test_suite.addTest(unittest.makeSuite(TestDualProcessImplementation))
   1.371 +    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
   1.372 +    return result.wasSuccessful()
   1.373 +
   1.374 +if __name__ == '__main__':
   1.375 +    if not test_main():
   1.376 +        sys.exit(1)

mercurial