|
1 #!/usr/bin/env python |
|
2 |
|
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. |
|
4 # Use of this source code is governed by a BSD-style license that can be |
|
5 # found in the LICENSE file. |
|
6 |
|
7 """POSIX specific tests. These are implicitly run by test_psutil.py.""" |
|
8 |
|
9 import unittest |
|
10 import subprocess |
|
11 import time |
|
12 import sys |
|
13 import os |
|
14 import datetime |
|
15 |
|
16 import psutil |
|
17 |
|
18 from psutil._compat import PY3 |
|
19 from test_psutil import * |
|
20 |
|
21 |
|
def ps(cmd):
    """Run a ``ps`` command line and return the single value it prints.

    Expects a ps command with a ``-o`` argument (one output column) and
    ``-p <pid>``.  The command is adapted to the current platform before
    being executed:

    - on non-Linux systems the `` --no-headers `` option is stripped and
      the header line of the output is skipped instead;
    - on SunOS ``-o command`` / ``-o start`` are rewritten to
      ``-o comm`` / ``-o stime``.

    Returns the value as an ``int`` when it parses as one, otherwise as
    the stripped string.
    """
    if not LINUX:
        cmd = cmd.replace(" --no-headers ", " ")
    if SUNOS:
        cmd = cmd.replace("-o command", "-o comm")
        cmd = cmd.replace("-o start", "-o stime")
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
    output = p.communicate()[0].strip()
    if PY3:
        # sys.stdout.encoding is None when stdout has been replaced by an
        # object without a real encoding (e.g. a StringIO used to buffer
        # test output); fall back to the interpreter default in that case
        # instead of failing with a TypeError.
        encoding = sys.stdout.encoding or sys.getdefaultencoding()
        output = str(output, encoding)
    if not LINUX:
        # the first line is the column header, the value is on the second
        output = output.split('\n')[1].strip()
    try:
        return int(output)
    except ValueError:
        return output
|
41 |
|
42 |
|
class PosixSpecificTestCase(unittest.TestCase):
    """Compare psutil results against 'ps' command line utility."""

    # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps

    def setUp(self):
        """Spawn a python subprocess and remember its pid for the test."""
        self.pid = get_test_subprocess([PYTHON, "-E", "-O"],
                                       stdin=subprocess.PIPE).pid

    def tearDown(self):
        """Clean up the subprocess started in setUp()."""
        reap_children()

    def test_process_parent_pid(self):
        """Parent pid reported by psutil matches 'ps -o ppid'."""
        ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
        ppid_psutil = psutil.Process(self.pid).ppid
        self.assertEqual(ppid_ps, ppid_psutil)

    def test_process_uid(self):
        """Real uid reported by psutil matches 'ps -o uid'."""
        uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
        uid_psutil = psutil.Process(self.pid).uids.real
        self.assertEqual(uid_ps, uid_psutil)

    def test_process_gid(self):
        """Real gid reported by psutil matches 'ps -o rgid'."""
        gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
        gid_psutil = psutil.Process(self.pid).gids.real
        self.assertEqual(gid_ps, gid_psutil)

    def test_process_username(self):
        """Username reported by psutil matches 'ps -o user'."""
        username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
        username_psutil = psutil.Process(self.pid).username
        self.assertEqual(username_ps, username_psutil)

    @skip_on_access_denied()
    def test_process_rss_memory(self):
        """RSS reported by psutil (converted to KB) matches 'ps -o rss'."""
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
        rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024
        self.assertEqual(rss_ps, rss_psutil)

    @skip_on_access_denied()
    def test_process_vsz_memory(self):
        """VSZ reported by psutil (converted to KB) matches 'ps -o vsz'."""
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
        vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024
        self.assertEqual(vsz_ps, vsz_psutil)

    def test_process_name(self):
        """Process name reported by psutil matches the one shown by ps."""
        # use command + arg since "comm" keyword not supported on all
        # platforms
        command = ps("ps --no-headers -o command -p %s" % self.pid)
        name_ps = command.split(' ')[0]
        # remove path if there is any, from the command
        name_ps = os.path.basename(name_ps).lower()
        name_psutil = psutil.Process(self.pid).name.lower()
        self.assertEqual(name_ps, name_psutil)

    @unittest.skipIf(OSX or BSD,
                     'ps -o start not available')
    def test_process_create_time(self):
        """Creation time reported by psutil matches 'ps -o start'."""
        time_ps = ps("ps --no-headers -o start -p %s" % self.pid)
        time_ps = time_ps.split(' ')[0]
        time_psutil = psutil.Process(self.pid).create_time
        if SUNOS:
            time_psutil = round(time_psutil)
        time_psutil_tstamp = datetime.datetime.fromtimestamp(
            time_psutil).strftime("%H:%M:%S")
        self.assertEqual(time_ps, time_psutil_tstamp)

    def test_process_exe(self):
        """Executable path reported by psutil matches the one shown by ps."""
        command = ps("ps --no-headers -o command -p %s" % self.pid)
        ps_pathname = command.split(' ')[0]
        psutil_pathname = psutil.Process(self.pid).exe
        try:
            self.assertEqual(ps_pathname, psutil_pathname)
        except AssertionError:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # We do not want to consider this difference in accuracy
            # an error, so only require the ps path to be a prefix of
            # the psutil one.  (The slice must be taken from the psutil
            # path: slicing ps_pathname by its own length would make the
            # assertion below always succeed.)
            adjusted_psutil_pathname = psutil_pathname[:len(ps_pathname)]
            self.assertEqual(ps_pathname, adjusted_psutil_pathname)

    def test_process_cmdline(self):
        """Command line reported by psutil matches 'ps -o command'."""
        ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid)
        psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline)
        if SUNOS:
            # ps on Solaris only shows the first part of the cmdline
            psutil_cmdline = psutil_cmdline.split(" ")[0]
        self.assertEqual(ps_cmdline, psutil_cmdline)

    @retry_before_failing()
    def test_get_pids(self):
        """The pid list reported by psutil matches the one shown by ps."""
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        if SUNOS:
            cmd = ["ps", "ax"]
        else:
            cmd = ["ps", "ax", "-o", "pid"]
        p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if PY3:
            # sys.stdout.encoding may be None (e.g. stdout replaced by a
            # buffer object); fall back to the interpreter default
            encoding = sys.stdout.encoding or sys.getdefaultencoding()
            output = str(output, encoding)
        pids_ps = []
        # first line is the header
        for line in output.split('\n')[1:]:
            fields = line.split()
            if fields:
                pids_ps.append(int(fields[0]))
        # remove ps subprocess pid which is supposed to be dead in meantime
        if p.pid in pids_ps:
            pids_ps.remove(p.pid)
        pids_psutil = psutil.get_pid_list()
        pids_ps.sort()
        pids_psutil.sort()

        # on OSX ps doesn't show pid 0
        if OSX and 0 not in pids_ps:
            pids_ps.insert(0, 0)

        if pids_ps != pids_psutil:
            difference = [x for x in pids_psutil if x not in pids_ps] + \
                         [x for x in pids_ps if x not in pids_psutil]
            self.fail("difference: " + str(difference))

    # for some reason ifconfig -a does not report the different
    # interfaces psutil does
    @unittest.skipIf(SUNOS, "test not reliable on SUNOS")
    def test_nic_names(self):
        """Every NIC name known to psutil appears in 'ifconfig -a' output."""
        p = subprocess.Popen("ifconfig -a", shell=True,
                             stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if PY3:
            # see test_get_pids for the reason of the fallback
            encoding = sys.stdout.encoding or sys.getdefaultencoding()
            output = str(output, encoding)
        tokens = output.split()
        for nic in psutil.net_io_counters(pernic=True).keys():
            for token in tokens:
                if token.startswith(nic):
                    break
            else:
                self.fail("couldn't find %s nic in 'ifconfig -a' output"
                          % nic)

    def test_get_users(self):
        """Users and terminals reported by psutil match 'who' output."""
        out = sh("who")
        # skip blank lines: 'who' prints nothing at all when nobody is
        # logged in, and indexing the fields of an empty line would fail
        lines = [x for x in out.split('\n') if x.strip()]
        users = [x.split()[0] for x in lines]
        self.assertEqual(len(users), len(psutil.get_users()))
        terminals = [x.split()[1] for x in lines]
        for u in psutil.get_users():
            self.assertTrue(u.name in users, u.name)
            self.assertTrue(u.terminal in terminals, u.terminal)

    def test_fds_open(self):
        """Accessing Process attributes must not leak file descriptors."""
        # Note: this fails from time to time; I'm keen on thinking
        # it doesn't mean something is broken
        def call(proc, attr_name):
            # Fetching the attribute is enough to exercise properties;
            # methods additionally need to be invoked.
            attr = getattr(proc, attr_name, None)
            if attr is not None and callable(attr):
                attr()

        # names which are either private, change the process state or
        # are not meaningful to call without arguments
        excluded = ('terminate', 'kill', 'suspend', 'resume', 'nice',
                    'send_signal', 'wait', 'get_children', 'as_dict')

        p = psutil.Process(os.getpid())
        failures = []
        for name in dir(psutil.Process):
            if name.startswith('_') \
                    or name.startswith('set_') \
                    or name in excluded:
                continue
            try:
                num1 = p.get_num_fds()
                for x in range(2):
                    call(p, name)
                num2 = p.get_num_fds()
            except psutil.AccessDenied:
                pass
            else:
                if abs(num2 - num1) > 1:
                    fail = "failure while processing Process.%s method " \
                           "(before=%s, after=%s)" % (name, num1, num2)
                    failures.append(fail)
        if failures:
            self.fail('\n' + '\n'.join(failures))
|
226 |
|
227 |
|
228 |
|
229 |
|
def test_main():
    """Run every test of PosixSpecificTestCase with a verbose text runner.

    Returns True if all tests passed, False otherwise.
    """
    test_suite = unittest.TestSuite()
    # unittest.makeSuite() is deprecated and no longer available in recent
    # Python versions; the default loader provides the same collection.
    loader = unittest.defaultTestLoader
    test_suite.addTest(loader.loadTestsFromTestCase(PosixSpecificTestCase))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
|
235 |
|
if __name__ == '__main__':
    # When run as a script, signal a failed test run through the
    # process exit status.
    all_passed = test_main()
    if not all_passed:
        sys.exit(1)