Thu, 22 Jan 2015 13:21:57 +0100
Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6
michael@0 | 1 | #!/usr/bin/env python |
michael@0 | 2 | |
michael@0 | 3 | # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. |
michael@0 | 4 | # Use of this source code is governed by a BSD-style license that can be |
michael@0 | 5 | # found in the LICENSE file. |
michael@0 | 6 | |
michael@0 | 7 | """POSIX specific tests. These are implicitly run by test_psutil.py.""" |
michael@0 | 8 | |
michael@0 | 9 | import unittest |
michael@0 | 10 | import subprocess |
michael@0 | 11 | import time |
michael@0 | 12 | import sys |
michael@0 | 13 | import os |
michael@0 | 14 | import datetime |
michael@0 | 15 | |
michael@0 | 16 | import psutil |
michael@0 | 17 | |
michael@0 | 18 | from psutil._compat import PY3 |
michael@0 | 19 | from test_psutil import * |
michael@0 | 20 | |
michael@0 | 21 | |
michael@0 | 22 | def ps(cmd): |
michael@0 | 23 | """Expects a ps command with a -o argument and parse the result |
michael@0 | 24 | returning only the value of interest. |
michael@0 | 25 | """ |
michael@0 | 26 | if not LINUX: |
michael@0 | 27 | cmd = cmd.replace(" --no-headers ", " ") |
michael@0 | 28 | if SUNOS: |
michael@0 | 29 | cmd = cmd.replace("-o command", "-o comm") |
michael@0 | 30 | cmd = cmd.replace("-o start", "-o stime") |
michael@0 | 31 | p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE) |
michael@0 | 32 | output = p.communicate()[0].strip() |
michael@0 | 33 | if PY3: |
michael@0 | 34 | output = str(output, sys.stdout.encoding) |
michael@0 | 35 | if not LINUX: |
michael@0 | 36 | output = output.split('\n')[1].strip() |
michael@0 | 37 | try: |
michael@0 | 38 | return int(output) |
michael@0 | 39 | except ValueError: |
michael@0 | 40 | return output |
michael@0 | 41 | |
michael@0 | 42 | |
michael@0 | 43 | class PosixSpecificTestCase(unittest.TestCase): |
michael@0 | 44 | """Compare psutil results against 'ps' command line utility.""" |
michael@0 | 45 | |
michael@0 | 46 | # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps |
michael@0 | 47 | |
michael@0 | 48 | def setUp(self): |
michael@0 | 49 | self.pid = get_test_subprocess([PYTHON, "-E", "-O"], |
michael@0 | 50 | stdin=subprocess.PIPE).pid |
michael@0 | 51 | |
michael@0 | 52 | def tearDown(self): |
michael@0 | 53 | reap_children() |
michael@0 | 54 | |
michael@0 | 55 | def test_process_parent_pid(self): |
michael@0 | 56 | ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid) |
michael@0 | 57 | ppid_psutil = psutil.Process(self.pid).ppid |
michael@0 | 58 | self.assertEqual(ppid_ps, ppid_psutil) |
michael@0 | 59 | |
michael@0 | 60 | def test_process_uid(self): |
michael@0 | 61 | uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid) |
michael@0 | 62 | uid_psutil = psutil.Process(self.pid).uids.real |
michael@0 | 63 | self.assertEqual(uid_ps, uid_psutil) |
michael@0 | 64 | |
michael@0 | 65 | def test_process_gid(self): |
michael@0 | 66 | gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid) |
michael@0 | 67 | gid_psutil = psutil.Process(self.pid).gids.real |
michael@0 | 68 | self.assertEqual(gid_ps, gid_psutil) |
michael@0 | 69 | |
michael@0 | 70 | def test_process_username(self): |
michael@0 | 71 | username_ps = ps("ps --no-headers -o user -p %s" %self.pid) |
michael@0 | 72 | username_psutil = psutil.Process(self.pid).username |
michael@0 | 73 | self.assertEqual(username_ps, username_psutil) |
michael@0 | 74 | |
michael@0 | 75 | @skip_on_access_denied() |
michael@0 | 76 | def test_process_rss_memory(self): |
michael@0 | 77 | # give python interpreter some time to properly initialize |
michael@0 | 78 | # so that the results are the same |
michael@0 | 79 | time.sleep(0.1) |
michael@0 | 80 | rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid) |
michael@0 | 81 | rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024 |
michael@0 | 82 | self.assertEqual(rss_ps, rss_psutil) |
michael@0 | 83 | |
michael@0 | 84 | @skip_on_access_denied() |
michael@0 | 85 | def test_process_vsz_memory(self): |
michael@0 | 86 | # give python interpreter some time to properly initialize |
michael@0 | 87 | # so that the results are the same |
michael@0 | 88 | time.sleep(0.1) |
michael@0 | 89 | vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid) |
michael@0 | 90 | vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024 |
michael@0 | 91 | self.assertEqual(vsz_ps, vsz_psutil) |
michael@0 | 92 | |
michael@0 | 93 | def test_process_name(self): |
michael@0 | 94 | # use command + arg since "comm" keyword not supported on all platforms |
michael@0 | 95 | name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0] |
michael@0 | 96 | # remove path if there is any, from the command |
michael@0 | 97 | name_ps = os.path.basename(name_ps).lower() |
michael@0 | 98 | name_psutil = psutil.Process(self.pid).name.lower() |
michael@0 | 99 | self.assertEqual(name_ps, name_psutil) |
michael@0 | 100 | |
michael@0 | 101 | @unittest.skipIf(OSX or BSD, |
michael@0 | 102 | 'ps -o start not available') |
michael@0 | 103 | def test_process_create_time(self): |
michael@0 | 104 | time_ps = ps("ps --no-headers -o start -p %s" %self.pid).split(' ')[0] |
michael@0 | 105 | time_psutil = psutil.Process(self.pid).create_time |
michael@0 | 106 | if SUNOS: |
michael@0 | 107 | time_psutil = round(time_psutil) |
michael@0 | 108 | time_psutil_tstamp = datetime.datetime.fromtimestamp( |
michael@0 | 109 | time_psutil).strftime("%H:%M:%S") |
michael@0 | 110 | self.assertEqual(time_ps, time_psutil_tstamp) |
michael@0 | 111 | |
michael@0 | 112 | def test_process_exe(self): |
michael@0 | 113 | ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0] |
michael@0 | 114 | psutil_pathname = psutil.Process(self.pid).exe |
michael@0 | 115 | try: |
michael@0 | 116 | self.assertEqual(ps_pathname, psutil_pathname) |
michael@0 | 117 | except AssertionError: |
michael@0 | 118 | # certain platforms such as BSD are more accurate returning: |
michael@0 | 119 | # "/usr/local/bin/python2.7" |
michael@0 | 120 | # ...instead of: |
michael@0 | 121 | # "/usr/local/bin/python" |
michael@0 | 122 | # We do not want to consider this difference in accuracy |
michael@0 | 123 | # an error. |
michael@0 | 124 | adjusted_ps_pathname = ps_pathname[:len(ps_pathname)] |
michael@0 | 125 | self.assertEqual(ps_pathname, adjusted_ps_pathname) |
michael@0 | 126 | |
michael@0 | 127 | def test_process_cmdline(self): |
michael@0 | 128 | ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid) |
michael@0 | 129 | psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline) |
michael@0 | 130 | if SUNOS: |
michael@0 | 131 | # ps on Solaris only shows the first part of the cmdline |
michael@0 | 132 | psutil_cmdline = psutil_cmdline.split(" ")[0] |
michael@0 | 133 | self.assertEqual(ps_cmdline, psutil_cmdline) |
michael@0 | 134 | |
michael@0 | 135 | @retry_before_failing() |
michael@0 | 136 | def test_get_pids(self): |
michael@0 | 137 | # Note: this test might fail if the OS is starting/killing |
michael@0 | 138 | # other processes in the meantime |
michael@0 | 139 | if SUNOS: |
michael@0 | 140 | cmd = ["ps", "ax"] |
michael@0 | 141 | else: |
michael@0 | 142 | cmd = ["ps", "ax", "-o", "pid"] |
michael@0 | 143 | p = get_test_subprocess(cmd, stdout=subprocess.PIPE) |
michael@0 | 144 | output = p.communicate()[0].strip() |
michael@0 | 145 | if PY3: |
michael@0 | 146 | output = str(output, sys.stdout.encoding) |
michael@0 | 147 | pids_ps = [] |
michael@0 | 148 | for line in output.split('\n')[1:]: |
michael@0 | 149 | if line: |
michael@0 | 150 | pid = int(line.split()[0].strip()) |
michael@0 | 151 | pids_ps.append(pid) |
michael@0 | 152 | # remove ps subprocess pid which is supposed to be dead in meantime |
michael@0 | 153 | pids_ps.remove(p.pid) |
michael@0 | 154 | pids_psutil = psutil.get_pid_list() |
michael@0 | 155 | pids_ps.sort() |
michael@0 | 156 | pids_psutil.sort() |
michael@0 | 157 | |
michael@0 | 158 | # on OSX ps doesn't show pid 0 |
michael@0 | 159 | if OSX and 0 not in pids_ps: |
michael@0 | 160 | pids_ps.insert(0, 0) |
michael@0 | 161 | |
michael@0 | 162 | if pids_ps != pids_psutil: |
michael@0 | 163 | difference = [x for x in pids_psutil if x not in pids_ps] + \ |
michael@0 | 164 | [x for x in pids_ps if x not in pids_psutil] |
michael@0 | 165 | self.fail("difference: " + str(difference)) |
michael@0 | 166 | |
michael@0 | 167 | # for some reason ifconfig -a does not report differente interfaces |
michael@0 | 168 | # psutil does |
michael@0 | 169 | @unittest.skipIf(SUNOS, "test not reliable on SUNOS") |
michael@0 | 170 | def test_nic_names(self): |
michael@0 | 171 | p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE) |
michael@0 | 172 | output = p.communicate()[0].strip() |
michael@0 | 173 | if PY3: |
michael@0 | 174 | output = str(output, sys.stdout.encoding) |
michael@0 | 175 | for nic in psutil.net_io_counters(pernic=True).keys(): |
michael@0 | 176 | for line in output.split(): |
michael@0 | 177 | if line.startswith(nic): |
michael@0 | 178 | break |
michael@0 | 179 | else: |
michael@0 | 180 | self.fail("couldn't find %s nic in 'ifconfig -a' output" % nic) |
michael@0 | 181 | |
michael@0 | 182 | def test_get_users(self): |
michael@0 | 183 | out = sh("who") |
michael@0 | 184 | lines = out.split('\n') |
michael@0 | 185 | users = [x.split()[0] for x in lines] |
michael@0 | 186 | self.assertEqual(len(users), len(psutil.get_users())) |
michael@0 | 187 | terminals = [x.split()[1] for x in lines] |
michael@0 | 188 | for u in psutil.get_users(): |
michael@0 | 189 | self.assertTrue(u.name in users, u.name) |
michael@0 | 190 | self.assertTrue(u.terminal in terminals, u.terminal) |
michael@0 | 191 | |
michael@0 | 192 | def test_fds_open(self): |
michael@0 | 193 | # Note: this fails from time to time; I'm keen on thinking |
michael@0 | 194 | # it doesn't mean something is broken |
michael@0 | 195 | def call(p, attr): |
michael@0 | 196 | attr = getattr(p, name, None) |
michael@0 | 197 | if attr is not None and callable(attr): |
michael@0 | 198 | ret = attr() |
michael@0 | 199 | else: |
michael@0 | 200 | ret = attr |
michael@0 | 201 | |
michael@0 | 202 | p = psutil.Process(os.getpid()) |
michael@0 | 203 | attrs = [] |
michael@0 | 204 | failures = [] |
michael@0 | 205 | for name in dir(psutil.Process): |
michael@0 | 206 | if name.startswith('_') \ |
michael@0 | 207 | or name.startswith('set_') \ |
michael@0 | 208 | or name in ('terminate', 'kill', 'suspend', 'resume', 'nice', |
michael@0 | 209 | 'send_signal', 'wait', 'get_children', 'as_dict'): |
michael@0 | 210 | continue |
michael@0 | 211 | else: |
michael@0 | 212 | try: |
michael@0 | 213 | num1 = p.get_num_fds() |
michael@0 | 214 | for x in range(2): |
michael@0 | 215 | call(p, name) |
michael@0 | 216 | num2 = p.get_num_fds() |
michael@0 | 217 | except psutil.AccessDenied: |
michael@0 | 218 | pass |
michael@0 | 219 | else: |
michael@0 | 220 | if abs(num2 - num1) > 1: |
michael@0 | 221 | fail = "failure while processing Process.%s method " \ |
michael@0 | 222 | "(before=%s, after=%s)" % (name, num1, num2) |
michael@0 | 223 | failures.append(fail) |
michael@0 | 224 | if failures: |
michael@0 | 225 | self.fail('\n' + '\n'.join(failures)) |
michael@0 | 226 | |
michael@0 | 227 | |
michael@0 | 228 | |
michael@0 | 229 | |
michael@0 | 230 | def test_main(): |
michael@0 | 231 | test_suite = unittest.TestSuite() |
michael@0 | 232 | test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase)) |
michael@0 | 233 | result = unittest.TextTestRunner(verbosity=2).run(test_suite) |
michael@0 | 234 | return result.wasSuccessful() |
michael@0 | 235 | |
michael@0 | 236 | if __name__ == '__main__': |
michael@0 | 237 | if not test_main(): |
michael@0 | 238 | sys.exit(1) |