1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/python/psutil/test/_posix.py Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,238 @@ 1.4 +#!/usr/bin/env python 1.5 + 1.6 +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. 1.7 +# Use of this source code is governed by a BSD-style license that can be 1.8 +# found in the LICENSE file. 1.9 + 1.10 +"""POSIX specific tests. These are implicitly run by test_psutil.py.""" 1.11 + 1.12 +import unittest 1.13 +import subprocess 1.14 +import time 1.15 +import sys 1.16 +import os 1.17 +import datetime 1.18 + 1.19 +import psutil 1.20 + 1.21 +from psutil._compat import PY3 1.22 +from test_psutil import * 1.23 + 1.24 + 1.25 +def ps(cmd): 1.26 + """Expects a ps command with a -o argument and parse the result 1.27 + returning only the value of interest. 1.28 + """ 1.29 + if not LINUX: 1.30 + cmd = cmd.replace(" --no-headers ", " ") 1.31 + if SUNOS: 1.32 + cmd = cmd.replace("-o command", "-o comm") 1.33 + cmd = cmd.replace("-o start", "-o stime") 1.34 + p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE) 1.35 + output = p.communicate()[0].strip() 1.36 + if PY3: 1.37 + output = str(output, sys.stdout.encoding) 1.38 + if not LINUX: 1.39 + output = output.split('\n')[1].strip() 1.40 + try: 1.41 + return int(output) 1.42 + except ValueError: 1.43 + return output 1.44 + 1.45 + 1.46 +class PosixSpecificTestCase(unittest.TestCase): 1.47 + """Compare psutil results against 'ps' command line utility.""" 1.48 + 1.49 + # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps 1.50 + 1.51 + def setUp(self): 1.52 + self.pid = get_test_subprocess([PYTHON, "-E", "-O"], 1.53 + stdin=subprocess.PIPE).pid 1.54 + 1.55 + def tearDown(self): 1.56 + reap_children() 1.57 + 1.58 + def test_process_parent_pid(self): 1.59 + ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid) 1.60 + ppid_psutil = psutil.Process(self.pid).ppid 1.61 + self.assertEqual(ppid_ps, ppid_psutil) 1.62 + 1.63 + def test_process_uid(self): 1.64 + uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid) 1.65 + uid_psutil = psutil.Process(self.pid).uids.real 1.66 + self.assertEqual(uid_ps, uid_psutil) 1.67 + 1.68 + def test_process_gid(self): 1.69 + gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid) 1.70 + gid_psutil = psutil.Process(self.pid).gids.real 1.71 + self.assertEqual(gid_ps, gid_psutil) 1.72 + 1.73 + def test_process_username(self): 1.74 + username_ps = ps("ps --no-headers -o user -p %s" %self.pid) 1.75 + username_psutil = psutil.Process(self.pid).username 1.76 + self.assertEqual(username_ps, username_psutil) 1.77 + 1.78 + @skip_on_access_denied() 1.79 + def test_process_rss_memory(self): 1.80 + # give python interpreter some time to properly initialize 1.81 + # so that the results are the same 1.82 + time.sleep(0.1) 1.83 + rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid) 1.84 + rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024 1.85 + self.assertEqual(rss_ps, rss_psutil) 1.86 + 1.87 + @skip_on_access_denied() 1.88 + def test_process_vsz_memory(self): 1.89 + # give python interpreter some time to properly initialize 1.90 + # so that the results are the same 1.91 + time.sleep(0.1) 1.92 + vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid) 1.93 + vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024 1.94 + self.assertEqual(vsz_ps, vsz_psutil) 1.95 + 1.96 + def test_process_name(self): 1.97 + # use command + arg since "comm" keyword not supported on all platforms 1.98 + name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0] 1.99 + # remove path if there is any, from the command 1.100 + name_ps = os.path.basename(name_ps).lower() 1.101 + name_psutil = psutil.Process(self.pid).name.lower() 1.102 + self.assertEqual(name_ps, name_psutil) 1.103 + 1.104 + @unittest.skipIf(OSX or BSD, 1.105 + 'ps -o start not available') 1.106 + def test_process_create_time(self): 1.107 + time_ps = ps("ps --no-headers -o start -p %s" %self.pid).split(' ')[0] 1.108 + time_psutil = psutil.Process(self.pid).create_time 1.109 + if SUNOS: 1.110 + time_psutil = round(time_psutil) 1.111 + time_psutil_tstamp = datetime.datetime.fromtimestamp( 1.112 + time_psutil).strftime("%H:%M:%S") 1.113 + self.assertEqual(time_ps, time_psutil_tstamp) 1.114 + 1.115 + def test_process_exe(self): 1.116 + ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0] 1.117 + psutil_pathname = psutil.Process(self.pid).exe 1.118 + try: 1.119 + self.assertEqual(ps_pathname, psutil_pathname) 1.120 + except AssertionError: 1.121 + # certain platforms such as BSD are more accurate returning: 1.122 + # "/usr/local/bin/python2.7" 1.123 + # ...instead of: 1.124 + # "/usr/local/bin/python" 1.125 + # We do not want to consider this difference in accuracy 1.126 + # an error. 1.127 + adjusted_ps_pathname = ps_pathname[:len(ps_pathname)] 1.128 + self.assertEqual(ps_pathname, adjusted_ps_pathname) 1.129 + 1.130 + def test_process_cmdline(self): 1.131 + ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid) 1.132 + psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline) 1.133 + if SUNOS: 1.134 + # ps on Solaris only shows the first part of the cmdline 1.135 + psutil_cmdline = psutil_cmdline.split(" ")[0] 1.136 + self.assertEqual(ps_cmdline, psutil_cmdline) 1.137 + 1.138 + @retry_before_failing() 1.139 + def test_get_pids(self): 1.140 + # Note: this test might fail if the OS is starting/killing 1.141 + # other processes in the meantime 1.142 + if SUNOS: 1.143 + cmd = ["ps", "ax"] 1.144 + else: 1.145 + cmd = ["ps", "ax", "-o", "pid"] 1.146 + p = get_test_subprocess(cmd, stdout=subprocess.PIPE) 1.147 + output = p.communicate()[0].strip() 1.148 + if PY3: 1.149 + output = str(output, sys.stdout.encoding) 1.150 + pids_ps = [] 1.151 + for line in output.split('\n')[1:]: 1.152 + if line: 1.153 + pid = int(line.split()[0].strip()) 1.154 + pids_ps.append(pid) 1.155 + # remove ps subprocess pid which is supposed to be dead in meantime 1.156 + pids_ps.remove(p.pid) 1.157 + pids_psutil = psutil.get_pid_list() 1.158 + pids_ps.sort() 1.159 + pids_psutil.sort() 1.160 + 1.161 + # on OSX ps doesn't show pid 0 1.162 + if OSX and 0 not in pids_ps: 1.163 + pids_ps.insert(0, 0) 1.164 + 1.165 + if pids_ps != pids_psutil: 1.166 + difference = [x for x in pids_psutil if x not in pids_ps] + \ 1.167 + [x for x in pids_ps if x not in pids_psutil] 1.168 + self.fail("difference: " + str(difference)) 1.169 + 1.170 + # for some reason ifconfig -a does not report differente interfaces 1.171 + # psutil does 1.172 + @unittest.skipIf(SUNOS, "test not reliable on SUNOS") 1.173 + def test_nic_names(self): 1.174 + p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE) 1.175 + output = p.communicate()[0].strip() 1.176 + if PY3: 1.177 + output = str(output, sys.stdout.encoding) 1.178 + for nic in psutil.net_io_counters(pernic=True).keys(): 1.179 + for line in output.split(): 1.180 + if line.startswith(nic): 1.181 + break 1.182 + else: 1.183 + self.fail("couldn't find %s nic in 'ifconfig -a' output" % nic) 1.184 + 1.185 + def test_get_users(self): 1.186 + out = sh("who") 1.187 + lines = out.split('\n') 1.188 + users = [x.split()[0] for x in lines] 1.189 + self.assertEqual(len(users), len(psutil.get_users())) 1.190 + terminals = [x.split()[1] for x in lines] 1.191 + for u in psutil.get_users(): 1.192 + self.assertTrue(u.name in users, u.name) 1.193 + self.assertTrue(u.terminal in terminals, u.terminal) 1.194 + 1.195 + def test_fds_open(self): 1.196 + # Note: this fails from time to time; I'm keen on thinking 1.197 + # it doesn't mean something is broken 1.198 + def call(p, attr): 1.199 + attr = getattr(p, name, None) 1.200 + if attr is not None and callable(attr): 1.201 + ret = attr() 1.202 + else: 1.203 + ret = attr 1.204 + 1.205 + p = psutil.Process(os.getpid()) 1.206 + attrs = [] 1.207 + failures = [] 1.208 + for name in dir(psutil.Process): 1.209 + if name.startswith('_') \ 1.210 + or name.startswith('set_') \ 1.211 + or name in ('terminate', 'kill', 'suspend', 'resume', 'nice', 1.212 + 'send_signal', 'wait', 'get_children', 'as_dict'): 1.213 + continue 1.214 + else: 1.215 + try: 1.216 + num1 = p.get_num_fds() 1.217 + for x in range(2): 1.218 + call(p, name) 1.219 + num2 = p.get_num_fds() 1.220 + except psutil.AccessDenied: 1.221 + pass 1.222 + else: 1.223 + if abs(num2 - num1) > 1: 1.224 + fail = "failure while processing Process.%s method " \ 1.225 + "(before=%s, after=%s)" % (name, num1, num2) 1.226 + failures.append(fail) 1.227 + if failures: 1.228 + self.fail('\n' + '\n'.join(failures)) 1.229 + 1.230 + 1.231 + 1.232 + 1.233 +def test_main(): 1.234 + test_suite = unittest.TestSuite() 1.235 + test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase)) 1.236 + result = unittest.TextTestRunner(verbosity=2).run(test_suite) 1.237 + return result.wasSuccessful() 1.238 + 1.239 +if __name__ == '__main__': 1.240 + if not test_main(): 1.241 + sys.exit(1)