python/psutil/test/_posix.py

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/python/psutil/test/_posix.py	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,238 @@
     1.4 +#!/usr/bin/env python
     1.5 +
     1.6 +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
     1.7 +# Use of this source code is governed by a BSD-style license that can be
     1.8 +# found in the LICENSE file.
     1.9 +
    1.10 +"""POSIX specific tests.  These are implicitly run by test_psutil.py."""
    1.11 +
    1.12 +import unittest
    1.13 +import subprocess
    1.14 +import time
    1.15 +import sys
    1.16 +import os
    1.17 +import datetime
    1.18 +
    1.19 +import psutil
    1.20 +
    1.21 +from psutil._compat import PY3
    1.22 +from test_psutil import *
    1.23 +
    1.24 +
    1.25 +def ps(cmd):
    1.26 +    """Expects a ps command with a -o argument and parse the result
    1.27 +    returning only the value of interest.
    1.28 +    """
    1.29 +    if not LINUX:
    1.30 +        cmd = cmd.replace(" --no-headers ", " ")
    1.31 +    if SUNOS:
    1.32 +        cmd = cmd.replace("-o command", "-o comm")
    1.33 +        cmd = cmd.replace("-o start", "-o stime")
    1.34 +    p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE)
    1.35 +    output = p.communicate()[0].strip()
    1.36 +    if PY3:
    1.37 +        output = str(output, sys.stdout.encoding)
    1.38 +    if not LINUX:
    1.39 +        output = output.split('\n')[1].strip()
    1.40 +    try:
    1.41 +        return int(output)
    1.42 +    except ValueError:
    1.43 +        return output
    1.44 +
    1.45 +
    1.46 +class PosixSpecificTestCase(unittest.TestCase):
    1.47 +    """Compare psutil results against 'ps' command line utility."""
    1.48 +
    1.49 +    # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps
    1.50 +
    1.51 +    def setUp(self):
    1.52 +        self.pid = get_test_subprocess([PYTHON, "-E", "-O"],
    1.53 +                                       stdin=subprocess.PIPE).pid
    1.54 +
    1.55 +    def tearDown(self):
    1.56 +        reap_children()
    1.57 +
    1.58 +    def test_process_parent_pid(self):
    1.59 +        ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid)
    1.60 +        ppid_psutil = psutil.Process(self.pid).ppid
    1.61 +        self.assertEqual(ppid_ps, ppid_psutil)
    1.62 +
    1.63 +    def test_process_uid(self):
    1.64 +        uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid)
    1.65 +        uid_psutil = psutil.Process(self.pid).uids.real
    1.66 +        self.assertEqual(uid_ps, uid_psutil)
    1.67 +
    1.68 +    def test_process_gid(self):
    1.69 +        gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid)
    1.70 +        gid_psutil = psutil.Process(self.pid).gids.real
    1.71 +        self.assertEqual(gid_ps, gid_psutil)
    1.72 +
    1.73 +    def test_process_username(self):
    1.74 +        username_ps = ps("ps --no-headers -o user -p %s" %self.pid)
    1.75 +        username_psutil = psutil.Process(self.pid).username
    1.76 +        self.assertEqual(username_ps, username_psutil)
    1.77 +
    1.78 +    @skip_on_access_denied()
    1.79 +    def test_process_rss_memory(self):
    1.80 +        # give python interpreter some time to properly initialize
    1.81 +        # so that the results are the same
    1.82 +        time.sleep(0.1)
    1.83 +        rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid)
    1.84 +        rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024
    1.85 +        self.assertEqual(rss_ps, rss_psutil)
    1.86 +
    1.87 +    @skip_on_access_denied()
    1.88 +    def test_process_vsz_memory(self):
    1.89 +        # give python interpreter some time to properly initialize
    1.90 +        # so that the results are the same
    1.91 +        time.sleep(0.1)
    1.92 +        vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid)
    1.93 +        vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024
    1.94 +        self.assertEqual(vsz_ps, vsz_psutil)
    1.95 +
    1.96 +    def test_process_name(self):
    1.97 +        # use command + arg since "comm" keyword not supported on all platforms
    1.98 +        name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
    1.99 +        # remove path if there is any, from the command
   1.100 +        name_ps = os.path.basename(name_ps).lower()
   1.101 +        name_psutil = psutil.Process(self.pid).name.lower()
   1.102 +        self.assertEqual(name_ps, name_psutil)
   1.103 +
   1.104 +    @unittest.skipIf(OSX or BSD,
   1.105 +                    'ps -o start not available')
   1.106 +    def test_process_create_time(self):
   1.107 +        time_ps = ps("ps --no-headers -o start -p %s" %self.pid).split(' ')[0]
   1.108 +        time_psutil = psutil.Process(self.pid).create_time
   1.109 +        if SUNOS:
   1.110 +            time_psutil = round(time_psutil)
   1.111 +        time_psutil_tstamp = datetime.datetime.fromtimestamp(
   1.112 +                        time_psutil).strftime("%H:%M:%S")
   1.113 +        self.assertEqual(time_ps, time_psutil_tstamp)
   1.114 +
   1.115 +    def test_process_exe(self):
   1.116 +        ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
   1.117 +        psutil_pathname = psutil.Process(self.pid).exe
   1.118 +        try:
   1.119 +            self.assertEqual(ps_pathname, psutil_pathname)
   1.120 +        except AssertionError:
   1.121 +            # certain platforms such as BSD are more accurate returning:
   1.122 +            # "/usr/local/bin/python2.7"
   1.123 +            # ...instead of:
   1.124 +            # "/usr/local/bin/python"
   1.125 +            # We do not want to consider this difference in accuracy
   1.126 +            # an error.
   1.127 +            adjusted_ps_pathname = ps_pathname[:len(ps_pathname)]
   1.128 +            self.assertEqual(ps_pathname, adjusted_ps_pathname)
   1.129 +
   1.130 +    def test_process_cmdline(self):
   1.131 +        ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid)
   1.132 +        psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline)
   1.133 +        if SUNOS:
   1.134 +            # ps on Solaris only shows the first part of the cmdline
   1.135 +            psutil_cmdline = psutil_cmdline.split(" ")[0]
   1.136 +        self.assertEqual(ps_cmdline, psutil_cmdline)
   1.137 +
   1.138 +    @retry_before_failing()
   1.139 +    def test_get_pids(self):
   1.140 +        # Note: this test might fail if the OS is starting/killing
   1.141 +        # other processes in the meantime
   1.142 +        if SUNOS:
   1.143 +            cmd = ["ps", "ax"]
   1.144 +        else:
   1.145 +            cmd = ["ps", "ax", "-o", "pid"]
   1.146 +        p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
   1.147 +        output = p.communicate()[0].strip()
   1.148 +        if PY3:
   1.149 +            output = str(output, sys.stdout.encoding)
   1.150 +        pids_ps = []
   1.151 +        for line in output.split('\n')[1:]:
   1.152 +            if line:
   1.153 +                pid = int(line.split()[0].strip())
   1.154 +                pids_ps.append(pid)
   1.155 +        # remove ps subprocess pid which is supposed to be dead in meantime
   1.156 +        pids_ps.remove(p.pid)
   1.157 +        pids_psutil = psutil.get_pid_list()
   1.158 +        pids_ps.sort()
   1.159 +        pids_psutil.sort()
   1.160 +
   1.161 +        # on OSX ps doesn't show pid 0
   1.162 +        if OSX and 0 not in pids_ps:
   1.163 +            pids_ps.insert(0, 0)
   1.164 +
   1.165 +        if pids_ps != pids_psutil:
   1.166 +            difference = [x for x in pids_psutil if x not in pids_ps] + \
   1.167 +                         [x for x in pids_ps if x not in pids_psutil]
   1.168 +            self.fail("difference: " + str(difference))
   1.169 +
   1.170 +    # for some reason ifconfig -a does not report differente interfaces
   1.171 +    # psutil does
   1.172 +    @unittest.skipIf(SUNOS, "test not reliable on SUNOS")
   1.173 +    def test_nic_names(self):
   1.174 +        p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE)
   1.175 +        output = p.communicate()[0].strip()
   1.176 +        if PY3:
   1.177 +            output = str(output, sys.stdout.encoding)
   1.178 +        for nic in psutil.net_io_counters(pernic=True).keys():
   1.179 +            for line in output.split():
   1.180 +                if line.startswith(nic):
   1.181 +                    break
   1.182 +            else:
   1.183 +                self.fail("couldn't find %s nic in 'ifconfig -a' output" % nic)
   1.184 +
   1.185 +    def test_get_users(self):
   1.186 +        out = sh("who")
   1.187 +        lines = out.split('\n')
   1.188 +        users = [x.split()[0] for x in lines]
   1.189 +        self.assertEqual(len(users), len(psutil.get_users()))
   1.190 +        terminals = [x.split()[1] for x in lines]
   1.191 +        for u in psutil.get_users():
   1.192 +            self.assertTrue(u.name in users, u.name)
   1.193 +            self.assertTrue(u.terminal in terminals, u.terminal)
   1.194 +
   1.195 +    def test_fds_open(self):
   1.196 +        # Note: this fails from time to time; I'm keen on thinking
   1.197 +        # it doesn't mean something is broken
   1.198 +        def call(p, attr):
   1.199 +            attr = getattr(p, name, None)
   1.200 +            if attr is not None and callable(attr):
   1.201 +                ret = attr()
   1.202 +            else:
   1.203 +                ret = attr
   1.204 +
   1.205 +        p = psutil.Process(os.getpid())
   1.206 +        attrs = []
   1.207 +        failures = []
   1.208 +        for name in dir(psutil.Process):
   1.209 +            if name.startswith('_') \
   1.210 +            or name.startswith('set_') \
   1.211 +            or name in ('terminate', 'kill', 'suspend', 'resume', 'nice',
   1.212 +                        'send_signal', 'wait', 'get_children', 'as_dict'):
   1.213 +                continue
   1.214 +            else:
   1.215 +                try:
   1.216 +                    num1 = p.get_num_fds()
   1.217 +                    for x in range(2):
   1.218 +                        call(p, name)
   1.219 +                    num2 = p.get_num_fds()
   1.220 +                except psutil.AccessDenied:
   1.221 +                    pass
   1.222 +                else:
   1.223 +                    if abs(num2 - num1) > 1:
   1.224 +                        fail = "failure while processing Process.%s method " \
   1.225 +                               "(before=%s, after=%s)" % (name, num1, num2)
   1.226 +                        failures.append(fail)
   1.227 +        if failures:
   1.228 +            self.fail('\n' + '\n'.join(failures))
   1.229 +
   1.230 +
   1.231 +
   1.232 +
   1.233 +def test_main():
   1.234 +    test_suite = unittest.TestSuite()
   1.235 +    test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase))
   1.236 +    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
   1.237 +    return result.wasSuccessful()
   1.238 +
   1.239 +if __name__ == '__main__':
   1.240 +    if not test_main():
   1.241 +        sys.exit(1)

mercurial