python/psutil/test/_posix.py

branch
TOR_BUG_9701
changeset 15
b8a032363ba2
equal deleted inserted replaced
-1:000000000000 0:1149137420a3
1 #!/usr/bin/env python
2
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
6
7 """POSIX specific tests. These are implicitly run by test_psutil.py."""
8
9 import unittest
10 import subprocess
11 import time
12 import sys
13 import os
14 import datetime
15
16 import psutil
17
18 from psutil._compat import PY3
19 from test_psutil import *
20
21
def ps(cmd):
    """Run a ps command line that uses a -o argument and return only
    the value of interest: an int when the text parses as one, the
    stripped string otherwise.
    """
    if not LINUX:
        # the " --no-headers " option is only kept on Linux
        cmd = cmd.replace(" --no-headers ", " ")
    if SUNOS:
        # swap the column keywords for the ones used on SunOS
        for old, new in (("-o command", "-o comm"),
                         ("-o start", "-o stime")):
            cmd = cmd.replace(old, new)
    proc = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE)
    stdout_data, _ = proc.communicate()
    value = stdout_data.strip()
    if PY3:
        value = str(value, sys.stdout.encoding)
    if not LINUX:
        # the first line is the header; the value is on the second line
        value = value.split('\n')[1].strip()
    try:
        return int(value)
    except ValueError:
        return value
41
42
class PosixSpecificTestCase(unittest.TestCase):
    """Compare psutil results against 'ps' command line utility."""

    # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps

    def setUp(self):
        # start a child through get_test_subprocess() and remember its
        # pid; every test_process_* method inspects that pid
        self.pid = get_test_subprocess([PYTHON, "-E", "-O"],
                                       stdin=subprocess.PIPE).pid

    def tearDown(self):
        # NOTE(review): presumably cleans up the child started in setUp;
        # reap_children() comes from test_psutil
        reap_children()

    def test_process_parent_pid(self):
        ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
        ppid_psutil = psutil.Process(self.pid).ppid
        self.assertEqual(ppid_ps, ppid_psutil)

    def test_process_uid(self):
        uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
        uid_psutil = psutil.Process(self.pid).uids.real
        self.assertEqual(uid_ps, uid_psutil)

    def test_process_gid(self):
        gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
        gid_psutil = psutil.Process(self.pid).gids.real
        self.assertEqual(gid_ps, gid_psutil)

    def test_process_username(self):
        username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
        username_psutil = psutil.Process(self.pid).username
        self.assertEqual(username_ps, username_psutil)

    @skip_on_access_denied()
    def test_process_rss_memory(self):
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
        # first get_memory_info() field, divided by 1024 to match ps
        rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024
        self.assertEqual(rss_ps, rss_psutil)

    @skip_on_access_denied()
    def test_process_vsz_memory(self):
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
        # second get_memory_info() field, divided by 1024 to match ps
        vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024
        self.assertEqual(vsz_ps, vsz_psutil)

    def test_process_name(self):
        # use command + arg since "comm" keyword not supported on all
        # platforms
        name_ps = ps("ps --no-headers -o command -p %s" % self.pid)
        name_ps = name_ps.split(' ')[0]
        # remove path if there is any, from the command
        name_ps = os.path.basename(name_ps).lower()
        name_psutil = psutil.Process(self.pid).name.lower()
        self.assertEqual(name_ps, name_psutil)

    @unittest.skipIf(OSX or BSD,
                     'ps -o start not available')
    def test_process_create_time(self):
        time_ps = ps("ps --no-headers -o start -p %s" % self.pid)
        time_ps = time_ps.split(' ')[0]
        time_psutil = psutil.Process(self.pid).create_time
        if SUNOS:
            time_psutil = round(time_psutil)
        time_psutil_tstamp = datetime.datetime.fromtimestamp(
            time_psutil).strftime("%H:%M:%S")
        self.assertEqual(time_ps, time_psutil_tstamp)

    def test_process_exe(self):
        ps_pathname = ps("ps --no-headers -o command -p %s" % self.pid)
        ps_pathname = ps_pathname.split(' ')[0]
        psutil_pathname = psutil.Process(self.pid).exe
        try:
            self.assertEqual(ps_pathname, psutil_pathname)
        except AssertionError:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # We do not want to consider this difference in accuracy
            # an error, so only require that the psutil path starts with
            # what ps reported. (The previous code sliced ps_pathname
            # against itself, which could never fail.)
            adjusted_psutil_pathname = psutil_pathname[:len(ps_pathname)]
            self.assertEqual(ps_pathname, adjusted_psutil_pathname)

    def test_process_cmdline(self):
        ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid)
        psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline)
        if SUNOS:
            # ps on Solaris only shows the first part of the cmdline
            psutil_cmdline = psutil_cmdline.split(" ")[0]
        self.assertEqual(ps_cmdline, psutil_cmdline)

    @retry_before_failing()
    def test_get_pids(self):
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        if SUNOS:
            cmd = ["ps", "ax"]
        else:
            cmd = ["ps", "ax", "-o", "pid"]
        p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if PY3:
            output = str(output, sys.stdout.encoding)
        pids_ps = []
        # skip the header line; the pid is the first column of each row
        for line in output.split('\n')[1:]:
            if line:
                pid = int(line.split()[0].strip())
                pids_ps.append(pid)
        # remove ps subprocess pid which is supposed to be dead in meantime
        pids_ps.remove(p.pid)
        pids_psutil = psutil.get_pid_list()
        pids_ps.sort()
        pids_psutil.sort()

        # on OSX ps doesn't show pid 0
        if OSX and 0 not in pids_ps:
            pids_ps.insert(0, 0)

        if pids_ps != pids_psutil:
            difference = [x for x in pids_psutil if x not in pids_ps] + \
                         [x for x in pids_ps if x not in pids_psutil]
            self.fail("difference: " + str(difference))

    # for some reason ifconfig -a does not report the different
    # interfaces psutil does
    @unittest.skipIf(SUNOS, "test not reliable on SUNOS")
    def test_nic_names(self):
        p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if PY3:
            output = str(output, sys.stdout.encoding)
        for nic in psutil.net_io_counters(pernic=True).keys():
            # output.split() yields whitespace-separated tokens, so a NIC
            # counts as found when any token starts with its name
            for token in output.split():
                if token.startswith(nic):
                    break
            else:
                self.fail("couldn't find %s nic in 'ifconfig -a' output" % nic)

    def test_get_users(self):
        out = sh("who")
        # ignore blank lines: with nobody logged in "who" prints nothing
        # and indexing the columns of an empty line raises IndexError
        lines = [x for x in out.split('\n') if x.strip()]
        users = [x.split()[0] for x in lines]
        self.assertEqual(len(users), len(psutil.get_users()))
        terminals = [x.split()[1] for x in lines]
        for u in psutil.get_users():
            self.assertTrue(u.name in users, u.name)
            self.assertTrue(u.terminal in terminals, u.terminal)

    def test_fds_open(self):
        # Note: this fails from time to time; I'm keen on thinking
        # it doesn't mean something is broken
        def call(p, name):
            # look the attribute up by name and invoke it when it is
            # callable; the result itself is not needed
            attr = getattr(p, name, None)
            if attr is not None and callable(attr):
                attr()

        p = psutil.Process(os.getpid())
        failures = []
        for name in dir(psutil.Process):
            # skip private names, setters and the methods listed below
            if name.startswith('_') \
                    or name.startswith('set_') \
                    or name in ('terminate', 'kill', 'suspend', 'resume',
                                'nice', 'send_signal', 'wait',
                                'get_children', 'as_dict'):
                continue
            try:
                num1 = p.get_num_fds()
                for x in range(2):
                    call(p, name)
                num2 = p.get_num_fds()
            except psutil.AccessDenied:
                pass
            else:
                # a difference of one descriptor is tolerated
                if abs(num2 - num1) > 1:
                    fail = "failure while processing Process.%s method " \
                           "(before=%s, after=%s)" % (name, num1, num2)
                    failures.append(fail)
        if failures:
            self.fail('\n' + '\n'.join(failures))
226
227
228
229
def test_main():
    """Run every PosixSpecificTestCase test with a verbose text runner.

    Returns True when the whole run was successful, False otherwise.
    """
    test_suite = unittest.TestSuite()
    # unittest.makeSuite() was deprecated in Python 3.11 and removed in
    # 3.13; the default loader provides the same collection on all versions.
    test_suite.addTest(
        unittest.defaultTestLoader.loadTestsFromTestCase(
            PosixSpecificTestCase))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
235
if __name__ == '__main__':
    # exit with status 1 when the test run was not successful
    succeeded = test_main()
    if not succeeded:
        sys.exit(1)

mercurial