python/psutil/test/_posix.py

Thu, 22 Jan 2015 13:21:57 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Thu, 22 Jan 2015 13:21:57 +0100
branch
TOR_BUG_9701
changeset 15
b8a032363ba2
permissions
-rwxr-xr-x

Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6

michael@0 1 #!/usr/bin/env python
michael@0 2
michael@0 3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
michael@0 4 # Use of this source code is governed by a BSD-style license that can be
michael@0 5 # found in the LICENSE file.
michael@0 6
michael@0 7 """POSIX specific tests. These are implicitly run by test_psutil.py."""
michael@0 8
michael@0 9 import unittest
michael@0 10 import subprocess
michael@0 11 import time
michael@0 12 import sys
michael@0 13 import os
michael@0 14 import datetime
michael@0 15
michael@0 16 import psutil
michael@0 17
michael@0 18 from psutil._compat import PY3
michael@0 19 from test_psutil import *
michael@0 20
michael@0 21
def ps(cmd):
    """Run a ``ps ... -o <field>`` command line and return the printed value.

    The command is adapted to the platform before it is executed through
    the shell: the ``--no-headers`` switch is dropped everywhere except on
    Linux, and on SunOS the ``command`` / ``start`` output keywords are
    swapped for ``comm`` / ``stime``.

    Returns the value as an int when it parses as one, otherwise as the
    stripped string.
    """
    headers_printed = not LINUX
    if headers_printed:
        cmd = cmd.replace(" --no-headers ", " ")
    if SUNOS:
        for keyword, substitute in (("-o command", "-o comm"),
                                    ("-o start", "-o stime")):
            cmd = cmd.replace(keyword, substitute)

    proc = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE)
    stdout_data = proc.communicate()[0]
    text = stdout_data.strip()
    if PY3:
        text = str(text, sys.stdout.encoding)
    if headers_printed:
        # the first line is the column header; the value is on the second
        text = text.split('\n')[1].strip()

    try:
        value = int(text)
    except ValueError:
        value = text
    return value
michael@0 41
michael@0 42
class PosixSpecificTestCase(unittest.TestCase):
    """Compare psutil results against 'ps' command line utility.

    setUp() starts a child Python interpreter and stores its pid; most
    tests then check that what psutil reports for that pid matches the
    output of the corresponding system tool ('ps', 'ifconfig', 'who').
    """

    # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps

    def setUp(self):
        """Start the child interpreter (piped stdin) and remember its pid."""
        self.pid = get_test_subprocess([PYTHON, "-E", "-O"],
                                       stdin=subprocess.PIPE).pid

    def tearDown(self):
        """Clean up via reap_children() (helper from test_psutil)."""
        reap_children()

    def test_process_parent_pid(self):
        """Parent pid must match 'ps -o ppid'."""
        ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
        ppid_psutil = psutil.Process(self.pid).ppid
        self.assertEqual(ppid_ps, ppid_psutil)

    def test_process_uid(self):
        """Real uid must match 'ps -o uid'."""
        uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
        uid_psutil = psutil.Process(self.pid).uids.real
        self.assertEqual(uid_ps, uid_psutil)

    def test_process_gid(self):
        """Real gid must match 'ps -o rgid'."""
        gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
        gid_psutil = psutil.Process(self.pid).gids.real
        self.assertEqual(gid_ps, gid_psutil)

    def test_process_username(self):
        """User name must match 'ps -o user'."""
        username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
        username_psutil = psutil.Process(self.pid).username
        self.assertEqual(username_ps, username_psutil)

    @skip_on_access_denied()
    def test_process_rss_memory(self):
        """First get_memory_info() field, divided by 1024, vs 'ps -o rss'."""
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
        rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024
        self.assertEqual(rss_ps, rss_psutil)

    @skip_on_access_denied()
    def test_process_vsz_memory(self):
        """Second get_memory_info() field, divided by 1024, vs 'ps -o vsz'."""
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
        vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024
        self.assertEqual(vsz_ps, vsz_psutil)

    def test_process_name(self):
        """Lower-cased process name must match the basename shown by ps."""
        # use command + arg since "comm" keyword not supported on all
        # platforms
        name_ps = ps(
            "ps --no-headers -o command -p %s" % self.pid).split(' ')[0]
        # remove path if there is any, from the command
        name_ps = os.path.basename(name_ps).lower()
        name_psutil = psutil.Process(self.pid).name.lower()
        self.assertEqual(name_ps, name_psutil)

    @unittest.skipIf(OSX or BSD,
                     'ps -o start not available')
    def test_process_create_time(self):
        """Creation time (formatted HH:MM:SS) must match 'ps -o start'."""
        time_ps = ps(
            "ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
        time_psutil = psutil.Process(self.pid).create_time
        if SUNOS:
            time_psutil = round(time_psutil)
        time_psutil_tstamp = datetime.datetime.fromtimestamp(
            time_psutil).strftime("%H:%M:%S")
        self.assertEqual(time_ps, time_psutil_tstamp)

    def test_process_exe(self):
        """Executable path must match (or extend) the one shown by ps."""
        ps_pathname = ps(
            "ps --no-headers -o command -p %s" % self.pid).split(' ')[0]
        psutil_pathname = psutil.Process(self.pid).exe
        try:
            self.assertEqual(ps_pathname, psutil_pathname)
        except AssertionError:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # We do not want to consider this difference in accuracy
            # an error, so only require that the psutil value starts with
            # what ps printed.  (The previous code compared ps_pathname
            # with a slice of itself, which could never fail.)
            adjusted_psutil_pathname = psutil_pathname[:len(ps_pathname)]
            self.assertEqual(ps_pathname, adjusted_psutil_pathname)

    def test_process_cmdline(self):
        """Space-joined cmdline must match 'ps -o command'."""
        ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid)
        psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline)
        if SUNOS:
            # ps on Solaris only shows the first part of the cmdline
            psutil_cmdline = psutil_cmdline.split(" ")[0]
        self.assertEqual(ps_cmdline, psutil_cmdline)

    @retry_before_failing()
    def test_get_pids(self):
        """The pid list must match the pids listed by 'ps ax'."""
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        if SUNOS:
            cmd = ["ps", "ax"]
        else:
            cmd = ["ps", "ax", "-o", "pid"]
        p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if PY3:
            output = str(output, sys.stdout.encoding)
        # skip the header line; the pid is the first column of each row
        pids_ps = [int(line.split()[0].strip())
                   for line in output.split('\n')[1:] if line]
        # remove ps subprocess pid which is supposed to be dead in meantime
        pids_ps.remove(p.pid)
        pids_psutil = psutil.get_pid_list()
        pids_ps.sort()
        pids_psutil.sort()

        # on OSX ps doesn't show pid 0
        if OSX and 0 not in pids_ps:
            pids_ps.insert(0, 0)

        if pids_ps != pids_psutil:
            difference = [x for x in pids_psutil if x not in pids_ps] + \
                         [x for x in pids_ps if x not in pids_psutil]
            self.fail("difference: " + str(difference))

    # for some reason ifconfig -a does not report different interfaces
    # psutil does
    @unittest.skipIf(SUNOS, "test not reliable on SUNOS")
    def test_nic_names(self):
        """Every NIC known to psutil must show up in 'ifconfig -a'."""
        p = subprocess.Popen("ifconfig -a", shell=True,
                             stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if PY3:
            output = str(output, sys.stdout.encoding)
        tokens = output.split()
        for nic in psutil.net_io_counters(pernic=True).keys():
            if not any(token.startswith(nic) for token in tokens):
                self.fail(
                    "couldn't find %s nic in 'ifconfig -a' output" % nic)

    def test_get_users(self):
        """Logged-in users and terminals must match the 'who' output."""
        out = sh("who")
        # ignore blank lines: 'who' prints nothing when nobody is logged
        # in, and indexing the fields of an empty line raises IndexError
        lines = [x for x in out.split('\n') if x.strip()]
        users = [x.split()[0] for x in lines]
        self.assertEqual(len(users), len(psutil.get_users()))
        terminals = [x.split()[1] for x in lines]
        for u in psutil.get_users():
            self.assertTrue(u.name in users, u.name)
            self.assertTrue(u.terminal in terminals, u.terminal)

    def test_fds_open(self):
        """Reading Process attributes must not change the open fd count."""
        # Note: this fails from time to time; I'm keen on thinking
        # it doesn't mean something is broken
        def call(proc, attr_name):
            # Fetch the attribute named by the argument (the previous
            # version ignored its parameter and read the enclosing loop
            # variable instead) and invoke it when it is callable.
            value = getattr(proc, attr_name, None)
            if value is not None and callable(value):
                return value()
            return value

        skipped = ('terminate', 'kill', 'suspend', 'resume', 'nice',
                   'send_signal', 'wait', 'get_children', 'as_dict')
        p = psutil.Process(os.getpid())
        failures = []
        for name in dir(psutil.Process):
            if name.startswith('_') \
                    or name.startswith('set_') \
                    or name in skipped:
                continue
            try:
                num1 = p.get_num_fds()
                for x in range(2):
                    call(p, name)
                num2 = p.get_num_fds()
            except psutil.AccessDenied:
                # attribute not readable with our privileges: nothing to
                # compare for this one
                continue
            # a difference of one descriptor is tolerated
            if abs(num2 - num1) > 1:
                fail = "failure while processing Process.%s method " \
                       "(before=%s, after=%s)" % (name, num1, num2)
                failures.append(fail)
        if failures:
            self.fail('\n' + '\n'.join(failures))
michael@0 226
michael@0 227
michael@0 228
michael@0 229
def test_main():
    """Run all PosixSpecificTestCase tests with a verbose text runner.

    Returns:
        True when every test passed, False otherwise.
    """
    test_suite = unittest.TestSuite()
    # loadTestsFromTestCase is the loader API that unittest.makeSuite
    # wrapped; makeSuite itself is deprecated and gone in Python 3.13.
    test_suite.addTest(
        unittest.defaultTestLoader.loadTestsFromTestCase(
            PosixSpecificTestCase))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
michael@0 235
# Script entry point: run the suite and exit with status 1 on failure.
if __name__ == '__main__':
    all_passed = test_main()
    if not all_passed:
        sys.exit(1)

mercurial