| |
1 #!/usr/bin/env python |
| |
2 |
| |
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. |
| |
4 # Use of this source code is governed by a BSD-style license that can be |
| |
5 # found in the LICENSE file. |
| |
6 |
| |
7 """ |
| |
8 psutil test suite (you can quickly run it with "python setup.py test"). |
| |
9 |
| |
10 Note: this is targeted for both python 2.x and 3.x so there's no need |
| |
11 to use 2to3 tool first. |
| |
12 |
| |
13 If you're on Python < 2.7 it is recommended to install unittest2 module |
| |
14 from: https://pypi.python.org/pypi/unittest2 |
| |
15 """ |
| |
16 |
| |
17 from __future__ import division |
| |
18 import os |
| |
19 import sys |
| |
20 import subprocess |
| |
21 import time |
| |
22 import signal |
| |
23 import types |
| |
24 import traceback |
| |
25 import socket |
| |
26 import warnings |
| |
27 import atexit |
| |
28 import errno |
| |
29 import threading |
| |
30 import tempfile |
| |
31 import stat |
| |
32 import collections |
| |
33 import datetime |
| |
34 try: |
| |
35 import unittest2 as unittest # python < 2.7 + unittest2 installed |
| |
36 except ImportError: |
| |
37 import unittest |
| |
38 try: |
| |
39 import ast # python >= 2.6 |
| |
40 except ImportError: |
| |
41 ast = None |
| |
42 |
| |
43 import psutil |
| |
44 from psutil._compat import PY3, callable, long, wraps |
| |
45 |
| |
46 |
| |
47 # =================================================================== |
| |
48 # --- Constants |
| |
49 # =================================================================== |
| |
50 |
| |
# Default number of attempts used by the retry_before_failing()
# decorator when the caller does not pass an explicit count.
NO_RETRIES = 10
# bytes tolerance for OS memory related tests
TOLERANCE = 500 * 1024  # 500KB

# Absolute path of the interpreter running this suite, symlinks resolved.
PYTHON = os.path.realpath(sys.executable)
# Null device opened read/write at import time; it is the default
# stdin/stdout/stderr of get_test_subprocess().
DEVNULL = open(os.devnull, 'r+')
# Scratch file path located in the current working directory.
TESTFN = os.path.join(os.getcwd(), "$testfile")
# "examples" directory, sibling of the directory containing this file.
EXAMPLES_DIR = os.path.abspath(os.path.join(os.path.dirname(
               os.path.dirname(__file__)), 'examples'))
# Platform flags, evaluated once at import time.
POSIX = os.name == 'posix'
LINUX = sys.platform.startswith("linux")
WINDOWS = sys.platform.startswith("win32")
OSX = sys.platform.startswith("darwin")
BSD = sys.platform.startswith("freebsd")
SUNOS = sys.platform.startswith("sunos")
| |
67 |
| |
68 |
| |
69 # =================================================================== |
| |
70 # --- Utility functions |
| |
71 # =================================================================== |
| |
72 |
| |
# pids of every process spawned through get_test_subprocess(); consumed
# (and emptied) by reap_children()
_subprocesses_started = set()


def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL,
                        wait=False):
    """Spawn and return a subprocess.Popen object for use in tests.

    When cmd is None a python interpreter sleeping for 2 seconds is
    started; otherwise cmd is passed to Popen as is.  The three standard
    streams default to the null device.

    If wait is True an attempt is made to return only once the child is
    reasonably initialized: for the default command the child creates
    TESTFN and this function polls for it for up to 3 seconds (emitting
    a warning if it never shows up); for a custom command
    wait_for_pid() is used instead.

    The child's pid is recorded so that reap_children() can kill it.
    """
    use_default_cmd = cmd is None
    if use_default_cmd:
        snippets = []
        if wait:
            # the child signals it is up by creating the test file
            snippets.append("open(r'%s', 'w'); " % TESTFN)
        snippets.append("import time; time.sleep(2);")
        argv = [PYTHON, "-c", "".join(snippets)]
    else:
        argv = cmd

    child = subprocess.Popen(argv, stdout=stdout, stderr=stderr, stdin=stdin)

    if wait and use_default_cmd:
        deadline = time.time() + 3
        created = False
        while time.time() < deadline:
            if os.path.exists(TESTFN):
                created = True
                break
            time.sleep(0.001)
        if not created:
            warn("couldn't make sure test file was actually created")
    elif wait:
        wait_for_pid(child.pid)

    _subprocesses_started.add(child.pid)
    return child
| |
105 |
| |
def warn(msg):
    """Emit msg as a UserWarning through the warnings module."""
    category = UserWarning
    warnings.warn(msg, category)
| |
109 |
| |
def register_warning(msg):
    """Schedule msg to be emitted through warn() at interpreter exit."""
    def emit_at_exit():
        warn(msg)

    atexit.register(emit_at_exit)
| |
113 |
| |
def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
    """Run cmdline through the shell and return its stripped stdout.

    stdout and stderr are handed to subprocess.Popen unchanged; by
    default both streams are captured.  Captured bytes (Python 3) are
    decoded to text using sys.stdout's encoding, falling back to the
    interpreter default encoding when sys.stdout does not provide one
    (e.g. when it has been replaced by an object without an encoding).

    Returns the command output with surrounding whitespace removed, or
    an empty string if stdout was not captured.

    Raises RuntimeError, carrying the stderr text, if the command exits
    with a non-zero status.  If the command succeeds but wrote something
    to stderr, that text is reported through warn().
    """
    p = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr)
    out, err = p.communicate()
    encoding = getattr(sys.stdout, 'encoding', None) or \
        sys.getdefaultencoding()

    def to_text(data, errors):
        # On Python 2 the pipes already yield str; only Python 3 bytes
        # need decoding.  None means the stream was not captured.
        if data is None or isinstance(data, str):
            return data
        return data.decode(encoding, errors)

    if p.returncode != 0:
        # stderr is diagnostic only: never let a decoding error hide it
        raise RuntimeError(to_text(err, 'replace'))
    if err:
        warn(to_text(err, 'replace'))
    if out is None:
        return ""
    return to_text(out, 'strict').strip()
| |
127 |
| |
def which(program):
    """Same as UNIX which command. Return None on command not found.

    If program contains a directory component it is checked directly and
    returned unchanged when it is an executable file.  Otherwise every
    directory listed in the PATH environment variable is searched in
    order (os.defpath is used when PATH is not set) and the first
    executable match is returned.
    """
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    if os.path.dirname(program):
        # explicit path given: PATH is not consulted
        if is_exe(program):
            return program
        return None

    for directory in os.environ.get("PATH", os.defpath).split(os.pathsep):
        candidate = os.path.join(directory, program)
        if is_exe(candidate):
            return candidate
    return None
| |
143 |
| |
def wait_for_pid(pid, timeout=1):
    """Block until pid appears in psutil's pid list, then return.

    Used by the test suite to give a freshly spawned subprocess time to
    initialize.  Raises RuntimeError("Timed out") if the pid has not
    shown up once timeout seconds have elapsed.
    """
    deadline = time.time() + timeout
    while True:
        if pid not in psutil.get_pid_list():
            time.sleep(0.0001)
            if time.time() >= deadline:
                raise RuntimeError("Timed out")
            continue
        # found: grant a short extra delay to allow full initialization
        time.sleep(0.01)
        return
| |
157 |
| |
def reap_children(search_all=False):
    """Kill every subprocess started by this test suite and wait for it,
    so that no zombies are left around hogging resources or disturbing
    the search for refleaks.

    If search_all is True all descendants of the current process, as
    reported by psutil, are killed as well.  The module-level set of
    started pids is emptied in the process.
    """
    # deliberately an alias, not a copy: popping empties the registry
    tracked = _subprocesses_started
    if search_all:
        me = psutil.Process(os.getpid())
        tracked.update(p.pid for p in me.get_children(recursive=True))

    while tracked:
        pid = tracked.pop()
        try:
            proc = psutil.Process(pid)
            proc.kill()
        except psutil.NoSuchProcess:
            # already gone
            continue
        except psutil.AccessDenied:
            warn("couldn't kill child process with pid %s" % pid)
            continue
        proc.wait(timeout=3)
| |
179 |
| |
def check_ip_address(addr, family):
    """Sanity-check an (ip, port) address pair via assertions.

    A false addr (None, empty tuple, ...) is accepted without checks.
    The port must be an int in the 0-65535 range; for AF_INET the ip
    must be made of four dot-separated integers in the 0-255 range.
    Other families only get their port checked.
    """
    if not addr:
        return
    host, port = addr
    assert isinstance(port, int), port
    if family == socket.AF_INET:
        octets = [int(part) for part in host.split('.')]
        assert len(octets) == 4, octets
        for octet in octets:
            assert 0 <= octet <= 255, octets
    assert 0 <= port <= 65535, port
| |
192 |
| |
def safe_remove(fname):
    """Remove fname, silently ignoring the case where it does not exist.

    Any OSError other than ENOENT is propagated to the caller.
    """
    try:
        os.remove(fname)
    except OSError:
        exc = sys.exc_info()[1]
        missing = exc.args[0] == errno.ENOENT
        if not missing:
            raise
| |
201 |
| |
def call_until(fun, expr, timeout=1):
    """Keep calling fun() for up to timeout seconds and return its result
    as soon as the eval() of expr is true.

    expr is a string holding a Python expression which can refer to the
    latest return value of fun() through the name 'ret', for example
    "len(ret) != 0".

    Raises RuntimeError if expr never evaluated to true before the
    timeout expired; the message includes the last value returned by
    fun(), or None if fun() was never called (timeout <= 0).
    """
    # The name 'ret' is part of the contract: expr is evaluated in this
    # scope.  It is pre-bound so that the error below can always be
    # formatted, even if the loop body never runs.
    ret = None
    stop_at = time.time() + timeout
    while time.time() < stop_at:
        ret = fun()
        # NOTE: eval() executes arbitrary code; expr must only ever come
        # from the test suite itself, never from external input.
        if eval(expr):
            return ret
        time.sleep(0.001)
    # wrap in a 1-tuple so that a tuple return value formats correctly
    raise RuntimeError('timed out (ret=%r)' % (ret,))
| |
213 |
| |
def retry_before_failing(ntimes=None):
    """Decorator which runs a test function and retries N times before
    actually failing.

    The wrapped function is called up to ntimes times (NO_RETRIES when
    ntimes is None or 0).  The first call which does not raise
    AssertionError provides the return value; if every attempt raises
    AssertionError the last one is re-raised.  Any other exception
    propagates immediately.
    """
    def decorator(fun):
        @wraps(fun)
        def wrapper(*args, **kwargs):
            err = None
            for x in range(ntimes or NO_RETRIES):
                try:
                    return fun(*args, **kwargs)
                except AssertionError:
                    err = sys.exc_info()[1]
            # Re-raise explicitly: on Python 3 a bare "raise" outside of
            # the except block has no active exception to re-raise.
            raise err
        return wrapper
    return decorator
| |
229 |
| |
def skip_on_access_denied(only_if=None):
    """Decorator to ignore psutil.AccessDenied exceptions raised by a test
    method and mark the test as skipped instead.

    If only_if is given (i.e. is not None) the exception is ignored only
    when only_if is true, otherwise it is re-raised.

    The skip is reported through the test case's skipTest() method
    (unittest on Python >= 2.7, unittest2), falling back to a skip()
    method if the test case provides one and, as a last resort, to a
    warning printed at interpreter exit.
    """
    def decorator(fun):
        @wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            except psutil.AccessDenied:
                if only_if is not None and not only_if:
                    raise
                msg = "%r was skipped because it raised AccessDenied" \
                      % fun.__name__
                # the decorated callable is a test method: args[0] is
                # the TestCase instance
                self = args[0]
                skip = getattr(self, 'skipTest', None) or \
                    getattr(self, 'skip', None)
                if skip is not None:
                    skip(msg)
                else:
                    register_warning(msg)
        return wrapper
    return decorator
| |
250 |
| |
def skip_on_not_implemented(only_if=None):
    """Decorator to ignore NotImplementedError exceptions raised by a test
    method and mark the test as skipped instead.

    If only_if is given (i.e. is not None) the exception is ignored only
    when only_if is true, otherwise it is re-raised.

    The skip is reported through the test case's skipTest() method
    (unittest on Python >= 2.7, unittest2), falling back to a skip()
    method if the test case provides one and, as a last resort, to a
    warning printed at interpreter exit.
    """
    def decorator(fun):
        @wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            except NotImplementedError:
                if only_if is not None and not only_if:
                    raise
                msg = "%r was skipped because it raised NotImplementedError" \
                      % fun.__name__
                # the decorated callable is a test method: args[0] is
                # the TestCase instance
                self = args[0]
                skip = getattr(self, 'skipTest', None) or \
                    getattr(self, 'skip', None)
                if skip is not None:
                    skip(msg)
                else:
                    register_warning(msg)
        return wrapper
    return decorator
| |
271 |
| |
def supports_ipv6():
    """Return True if IPv6 is supported on this platform.

    Besides checking what the socket module advertises, an AF_INET6
    stream socket is actually created and bound to the "::1" loopback
    address; any socket error is interpreted as lack of support.
    """
    if not (socket.has_ipv6 and hasattr(socket, "AF_INET6")):
        return False

    probe = None
    try:
        probe = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        probe.bind(("::1", 0))
    except (socket.error, socket.gaierror):
        return False
    else:
        return True
    finally:
        # the socket only exists if its creation succeeded
        if probe is not None:
            probe.close()
| |
288 |
| |
289 |
| |
class ThreadTask(threading.Thread):
    """A thread which does nothing but stay alive until stop() is
    called; used for running process thread tests.
    """

    def __init__(self):
        super(ThreadTask, self).__init__()
        # set by run() once the thread has effectively started
        self._flag = threading.Event()
        self._interval = None
        self._running = False

    def __repr__(self):
        return '<%s running=%s at %#x>' % (
            self.__class__.__name__, self._running, id(self))

    def start(self, interval=0.001):
        """Start the thread and return once it is actually running.

        The thread keeps running until an explicit stop() request,
        which it polls for every 'interval' seconds.
        Raises ValueError if the thread is already running.
        """
        if self._running:
            raise ValueError("already started")
        self._interval = interval
        super(ThreadTask, self).start()
        self._flag.wait()

    def run(self):
        self._running = True
        self._flag.set()
        while self._running:
            time.sleep(self._interval)

    def stop(self):
        """Ask the thread to stop and wait until it has terminated.

        Raises ValueError if the thread is not running.
        """
        if not self._running:
            raise ValueError("already stopped")
        self._running = False
        self.join()
| |
325 |
| |
326 |
| |
327 # =================================================================== |
| |
328 # --- Support for python < 2.7 in case unittest2 is not installed |
| |
329 # =================================================================== |
| |
330 |
| |
# The unittest module in use has no skip() decorator (python < 2.7 with
# unittest2 not installed): replace unittest.TestCase, unittest.skipIf
# and unittest.skipUnless with the minimal re-implementations below.
if not hasattr(unittest, 'skip'):
    register_warning("unittest2 module is not installed; a serie of pretty " \
                     "darn ugly workarounds will be used")

    class SkipTest(Exception):
        # raised by TestCase.skip() below; it is a plain Exception
        pass

    class TestCase(unittest.TestCase):
        # Adds the assertion helpers used by this suite on top of the
        # stock TestCase.

        def _safe_repr(self, obj):
            # repr() of obj, never raising and truncated to about
            # MAX_LENGTH characters
            MAX_LENGTH = 80
            try:
                result = repr(obj)
            except Exception:
                result = object.__repr__(obj)
            if len(result) < MAX_LENGTH:
                return result
            return result[:MAX_LENGTH] + ' [truncated]...'

        def _fail_w_msg(self, a, b, middle, msg):
            # fail with msg if given, else with "<repr a> <middle> <repr b>"
            self.fail(msg or '%s %s %s' % (self._safe_repr(a), middle,
                                           self._safe_repr(b)))

        def skip(self, msg):
            raise SkipTest(msg)

        def assertIn(self, a, b, msg=None):
            if a not in b:
                self._fail_w_msg(a, b, 'not found in', msg)

        def assertNotIn(self, a, b, msg=None):
            if a in b:
                self._fail_w_msg(a, b, 'found in', msg)

        def assertGreater(self, a, b, msg=None):
            if not a > b:
                self._fail_w_msg(a, b, 'not greater than', msg)

        def assertGreaterEqual(self, a, b, msg=None):
            if not a >= b:
                self._fail_w_msg(a, b, 'not greater than or equal to', msg)

        def assertLess(self, a, b, msg=None):
            if not a < b:
                self._fail_w_msg(a, b, 'not less than', msg)

        def assertLessEqual(self, a, b, msg=None):
            if not a <= b:
                self._fail_w_msg(a, b, 'not less or equal to', msg)

        def assertIsInstance(self, a, b, msg=None):
            if not isinstance(a, b):
                self.fail(msg or '%s is not an instance of %r' \
                          % (self._safe_repr(a), b))

        def assertAlmostEqual(self, a, b, msg=None, delta=None):
            # only the "delta" form is approximate; without delta this
            # falls back to a strict equality check
            if delta is not None:
                if abs(a - b) <= delta:
                    return
                self.fail(msg or '%s != %s within %s delta' \
                          % (self._safe_repr(a), self._safe_repr(b),
                             self._safe_repr(delta)))
            else:
                self.assertEqual(a, b, msg=msg)


    def skipIf(condition, reason):
        # Decorator for test methods: when condition is true the test
        # body is not run at all; "skipped-" is written to stdout and a
        # warning is registered to be shown at interpreter exit.
        def decorator(fun):
            @wraps(fun)
            def wrapper(*args, **kwargs):
                self = args[0]
                if condition:
                    sys.stdout.write("skipped-")
                    sys.stdout.flush()
                    # NOTE(review): "warn" looks like the module-level
                    # warn() function, in which case this condition is
                    # always true -- confirm
                    if warn:
                        objname = "%s.%s" % (self.__class__.__name__,
                                             fun.__name__)
                        msg = "%s was skipped" % objname
                        if reason:
                            msg += "; reason: " + repr(reason)
                        register_warning(msg)
                    return
                else:
                    return fun(*args, **kwargs)
            return wrapper
        return decorator

    def skipUnless(condition, reason):
        # unittest.skipIf is looked up when skipUnless is called, i.e.
        # after the assignments below have installed the skipIf above
        if not condition:
            return unittest.skipIf(True, reason)
        return unittest.skipIf(False, reason)

    # install the replacements, then drop the module-level names so that
    # only the unittest.* ones remain
    unittest.TestCase = TestCase
    unittest.skipIf = skipIf
    unittest.skipUnless = skipUnless
    del TestCase, skipIf, skipUnless
| |
427 |
| |
428 |
| |
429 # =================================================================== |
| |
430 # --- System-related API tests |
| |
431 # =================================================================== |
| |
432 |
| |
class TestSystemAPIs(unittest.TestCase):
    """Tests for system-related APIs."""

    def setUp(self):
        safe_remove(TESTFN)

    def tearDown(self):
        reap_children()

    def test_process_iter(self):
        # both this process and a freshly spawned child must be listed;
        # the child must disappear once killed and waited for
        self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()])
        sproc = get_test_subprocess()
        self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()])
        p = psutil.Process(sproc.pid)
        p.kill()
        p.wait()
        self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()])

    def test_TOTAL_PHYMEM(self):
        x = psutil.TOTAL_PHYMEM
        self.assertIsInstance(x, (int, long))
        self.assertGreater(x, 0)
        self.assertEqual(x, psutil.virtual_memory().total)

    def test_BOOT_TIME(self, arg=None):
        # also reused by test_get_boot_time(), which passes its own value
        x = arg or psutil.BOOT_TIME
        self.assertIsInstance(x, float)
        self.assertGreater(x, 0)
        self.assertLess(x, time.time())

    def test_get_boot_time(self):
        self.test_BOOT_TIME(psutil.get_boot_time())
        if WINDOWS:
            # work around float precision issues; give it 1 secs tolerance
            diff = abs(psutil.get_boot_time() - psutil.BOOT_TIME)
            self.assertLess(diff, 1)
        else:
            self.assertEqual(psutil.get_boot_time(), psutil.BOOT_TIME)

    def test_NUM_CPUS(self):
        self.assertEqual(psutil.NUM_CPUS, len(psutil.cpu_times(percpu=True)))
        self.assertGreaterEqual(psutil.NUM_CPUS, 1)

    @unittest.skipUnless(POSIX, 'posix only')
    def test_PAGESIZE(self):
        # pagesize is used internally to perform different calculations
        # and it's determined by using SC_PAGE_SIZE; make sure
        # getpagesize() returns the same value.
        import resource
        self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize())

    def test_deprecated_apis(self):
        # a listening socket guarantees get_connections() returns at
        # least one entry to check the deprecated attributes against
        s = socket.socket()
        s.bind(('localhost', 0))
        s.listen(1)
        # turn warnings into exceptions so that assertRaises sees them
        warnings.filterwarnings("error")
        p = psutil.Process(os.getpid())
        try:
            self.assertRaises(DeprecationWarning, psutil.virtmem_usage)
            self.assertRaises(DeprecationWarning, psutil.used_phymem)
            self.assertRaises(DeprecationWarning, psutil.avail_phymem)
            self.assertRaises(DeprecationWarning, psutil.total_virtmem)
            self.assertRaises(DeprecationWarning, psutil.used_virtmem)
            self.assertRaises(DeprecationWarning, psutil.avail_virtmem)
            self.assertRaises(DeprecationWarning, psutil.phymem_usage)
            self.assertRaises(DeprecationWarning, psutil.get_process_list)
            self.assertRaises(DeprecationWarning, psutil.network_io_counters)
            if LINUX:
                self.assertRaises(DeprecationWarning, psutil.phymem_buffers)
                self.assertRaises(DeprecationWarning, psutil.cached_phymem)
            try:
                p.nice
            except DeprecationWarning:
                pass
            else:
                self.fail("p.nice didn't raise DeprecationWarning")
            ret = call_until(p.get_connections, "len(ret) != 0", timeout=1)
            self.assertRaises(DeprecationWarning,
                              getattr, ret[0], 'local_address')
            self.assertRaises(DeprecationWarning,
                              getattr, ret[0], 'remote_address')
        finally:
            s.close()
            warnings.resetwarnings()

    def test_deprecated_apis_retval(self):
        # deprecated APIs must return the same value as their replacement
        warnings.filterwarnings("ignore")
        p = psutil.Process(os.getpid())
        try:
            self.assertEqual(psutil.total_virtmem(), psutil.swap_memory().total)
            self.assertEqual(p.nice, p.get_nice())
        finally:
            warnings.resetwarnings()

    def test_virtual_memory(self):
        mem = psutil.virtual_memory()
        assert mem.total > 0, mem
        assert mem.available > 0, mem
        assert 0 <= mem.percent <= 100, mem
        assert mem.used > 0, mem
        assert mem.free >= 0, mem
        # every field but 'total' must lie in the 0..total range
        for name in mem._fields:
            if name != 'total':
                value = getattr(mem, name)
                if not value >= 0:
                    self.fail("%r < 0 (%s)" % (name, value))
                if value > mem.total:
                    self.fail("%r > total (total=%s, %s=%s)" \
                              % (name, mem.total, name, value))

    def test_swap_memory(self):
        mem = psutil.swap_memory()
        assert mem.total >= 0, mem
        assert mem.used >= 0, mem
        # total is allowed to be 0 (no swap configured), in which case
        # there cannot be any free swap either
        assert mem.free >= 0, mem
        assert 0 <= mem.percent <= 100, mem
        assert mem.sin >= 0, mem
        assert mem.sout >= 0, mem

    def test_pid_exists(self):
        sproc = get_test_subprocess(wait=True)
        assert psutil.pid_exists(sproc.pid)
        p = psutil.Process(sproc.pid)
        p.kill()
        p.wait()
        self.assertFalse(psutil.pid_exists(sproc.pid))
        self.assertFalse(psutil.pid_exists(-1))

    def test_pid_exists_2(self):
        reap_children()
        pids = psutil.get_pid_list()
        for pid in pids:
            try:
                assert psutil.pid_exists(pid)
            except AssertionError:
                # in case the process disappeared in meantime fail only
                # if it is no longer in get_pid_list()
                time.sleep(.1)
                if pid in psutil.get_pid_list():
                    self.fail(pid)
        # pids well above the highest one in use are expected not to exist
        pids = range(max(pids) + 5000, max(pids) + 6000)
        for pid in pids:
            self.assertFalse(psutil.pid_exists(pid))

    def test_get_pid_list(self):
        plist = [x.pid for x in psutil.process_iter()]
        pidlist = psutil.get_pid_list()
        # list.sort() sorts in place and returns None: compare sorted
        # copies, otherwise this check can never fail
        self.assertEqual(sorted(plist), sorted(pidlist))
        # make sure every pid is unique
        self.assertEqual(len(pidlist), len(set(pidlist)))

    def test_test(self):
        # test for psutil.test() function
        stdout = sys.stdout
        sys.stdout = DEVNULL
        try:
            psutil.test()
        finally:
            sys.stdout = stdout

    def test_sys_cpu_times(self):
        total = 0
        times = psutil.cpu_times()
        sum(times)
        for cp_time in times:
            self.assertIsInstance(cp_time, float)
            self.assertGreaterEqual(cp_time, 0.0)
            total += cp_time
        self.assertEqual(total, sum(times))
        str(times)

    def test_sys_cpu_times2(self):
        # the sum of all CPU times must grow as wall-clock time passes
        t1 = sum(psutil.cpu_times())
        time.sleep(0.1)
        t2 = sum(psutil.cpu_times())
        difference = t2 - t1
        if not difference >= 0.05:
            self.fail("difference %s" % difference)

    def test_sys_per_cpu_times(self):
        for times in psutil.cpu_times(percpu=True):
            total = 0
            sum(times)
            for cp_time in times:
                self.assertIsInstance(cp_time, float)
                self.assertGreaterEqual(cp_time, 0.0)
                total += cp_time
            self.assertEqual(total, sum(times))
            str(times)
        # per-cpu and system-wide results must expose the same fields
        self.assertEqual(len(psutil.cpu_times(percpu=True)[0]),
                         len(psutil.cpu_times(percpu=False)))

    def test_sys_per_cpu_times2(self):
        tot1 = psutil.cpu_times(percpu=True)
        stop_at = time.time() + 0.1
        # busy-wait for a tenth of a second
        while 1:
            if time.time() >= stop_at:
                break
        tot2 = psutil.cpu_times(percpu=True)
        # at least one CPU must show an increase of its total time
        for t1, t2 in zip(tot1, tot2):
            t1, t2 = sum(t1), sum(t2)
            difference = t2 - t1
            if difference >= 0.05:
                return
        self.fail()

    def _test_cpu_percent(self, percent):
        # shared check: a percentage is a float in the 0.0-100.0 range
        self.assertIsInstance(percent, float)
        self.assertGreaterEqual(percent, 0.0)
        self.assertLessEqual(percent, 100.0)

    def test_sys_cpu_percent(self):
        psutil.cpu_percent(interval=0.001)
        for x in range(1000):
            self._test_cpu_percent(psutil.cpu_percent(interval=None))

    def test_sys_per_cpu_percent(self):
        self.assertEqual(len(psutil.cpu_percent(interval=0.001, percpu=True)),
                         psutil.NUM_CPUS)
        for x in range(1000):
            percents = psutil.cpu_percent(interval=None, percpu=True)
            for percent in percents:
                self._test_cpu_percent(percent)

    def test_sys_cpu_times_percent(self):
        psutil.cpu_times_percent(interval=0.001)
        for x in range(1000):
            cpu = psutil.cpu_times_percent(interval=None)
            for percent in cpu:
                self._test_cpu_percent(percent)
            self._test_cpu_percent(sum(cpu))

    def test_sys_per_cpu_times_percent(self):
        self.assertEqual(len(psutil.cpu_times_percent(interval=0.001,
                                                      percpu=True)),
                         psutil.NUM_CPUS)
        for x in range(1000):
            cpus = psutil.cpu_times_percent(interval=None, percpu=True)
            for cpu in cpus:
                for percent in cpu:
                    self._test_cpu_percent(percent)
                self._test_cpu_percent(sum(cpu))

    @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
                     "os.statvfs() function not available on this platform")
    def test_disk_usage(self):
        usage = psutil.disk_usage(os.getcwd())
        assert usage.total > 0, usage
        assert usage.used > 0, usage
        assert usage.free > 0, usage
        assert usage.total > usage.used, usage
        assert usage.total > usage.free, usage
        assert 0 <= usage.percent <= 100, usage.percent

        # if path does not exist OSError ENOENT is expected across
        # all platforms
        fname = tempfile.mktemp()
        try:
            psutil.disk_usage(fname)
        except OSError:
            err = sys.exc_info()[1]
            if err.args[0] != errno.ENOENT:
                raise
        else:
            self.fail("OSError not raised")

    @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
                     "os.statvfs() function not available on this platform")
    def test_disk_partitions(self):
        # all = False
        for disk in psutil.disk_partitions(all=False):
            if WINDOWS and 'cdrom' in disk.opts:
                continue
            if not POSIX:
                assert os.path.exists(disk.device), disk
            else:
                # we cannot make any assumption about this, see:
                # http://goo.gl/p9c43
                disk.device
            if SUNOS:
                # on solaris apparently mount points can also be files
                assert os.path.exists(disk.mountpoint), disk
            else:
                assert os.path.isdir(disk.mountpoint), disk
            assert disk.fstype, disk
            self.assertIsInstance(disk.opts, str)

        # all = True
        for disk in psutil.disk_partitions(all=True):
            if not WINDOWS:
                try:
                    os.stat(disk.mountpoint)
                except OSError:
                    # http://mail.python.org/pipermail/python-dev/2012-June/120787.html
                    err = sys.exc_info()[1]
                    if err.errno not in (errno.EPERM, errno.EACCES):
                        raise
                else:
                    if SUNOS:
                        # on solaris apparently mount points can also be files
                        assert os.path.exists(disk.mountpoint), disk
                    else:
                        assert os.path.isdir(disk.mountpoint), disk
            self.assertIsInstance(disk.fstype, str)
            self.assertIsInstance(disk.opts, str)

        def find_mount_point(path):
            # walk up the directory tree until a mount point is reached
            path = os.path.abspath(path)
            while not os.path.ismount(path):
                path = os.path.dirname(path)
            return path

        # the mount point this very file lives on must be listed
        mount = find_mount_point(__file__)
        mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)]
        self.assertIn(mount, mounts)
        psutil.disk_usage(mount)

    def test_net_io_counters(self):
        def check_ntuple(nt):
            # fields must be accessible both by index and by name
            self.assertEqual(nt[0], nt.bytes_sent)
            self.assertEqual(nt[1], nt.bytes_recv)
            self.assertEqual(nt[2], nt.packets_sent)
            self.assertEqual(nt[3], nt.packets_recv)
            self.assertEqual(nt[4], nt.errin)
            self.assertEqual(nt[5], nt.errout)
            self.assertEqual(nt[6], nt.dropin)
            self.assertEqual(nt[7], nt.dropout)
            assert nt.bytes_sent >= 0, nt
            assert nt.bytes_recv >= 0, nt
            assert nt.packets_sent >= 0, nt
            assert nt.packets_recv >= 0, nt
            assert nt.errin >= 0, nt
            assert nt.errout >= 0, nt
            assert nt.dropin >= 0, nt
            assert nt.dropout >= 0, nt

        ret = psutil.net_io_counters(pernic=False)
        check_ntuple(ret)
        ret = psutil.net_io_counters(pernic=True)
        # ret is indexed by key below, i.e. it is a mapping: comparing it
        # against [] would always succeed, so check its truthiness
        assert ret, ret
        for key in ret:
            assert key
            check_ntuple(ret[key])

    def test_disk_io_counters(self):
        def check_ntuple(nt):
            # fields must be accessible both by index and by name
            self.assertEqual(nt[0], nt.read_count)
            self.assertEqual(nt[1], nt.write_count)
            self.assertEqual(nt[2], nt.read_bytes)
            self.assertEqual(nt[3], nt.write_bytes)
            self.assertEqual(nt[4], nt.read_time)
            self.assertEqual(nt[5], nt.write_time)
            assert nt.read_count >= 0, nt
            assert nt.write_count >= 0, nt
            assert nt.read_bytes >= 0, nt
            assert nt.write_bytes >= 0, nt
            assert nt.read_time >= 0, nt
            assert nt.write_time >= 0, nt

        ret = psutil.disk_io_counters(perdisk=False)
        check_ntuple(ret)
        ret = psutil.disk_io_counters(perdisk=True)
        # make sure there are no duplicates
        # NOTE(review): ret is used as a mapping below, so this check
        # presumably can never fail -- confirm
        self.assertEqual(len(ret), len(set(ret)))
        for key in ret:
            assert key, key
            check_ntuple(ret[key])
            if LINUX and key[-1].isdigit():
                # if 'sda1' is listed 'sda' shouldn't, see:
                # http://code.google.com/p/psutil/issues/detail?id=338
                while key[-1].isdigit():
                    key = key[:-1]
                self.assertNotIn(key, ret.keys())

    def test_get_users(self):
        users = psutil.get_users()
        assert users
        for user in users:
            assert user.name, user
            user.terminal
            user.host
            assert user.started > 0.0, user
            # must be a timestamp datetime is able to convert
            datetime.datetime.fromtimestamp(user.started)
| |
816 |
| |
817 |
| |
818 # =================================================================== |
| |
819 # --- psutil.Process class tests |
| |
820 # =================================================================== |
| |
821 |
| |
822 class TestProcess(unittest.TestCase): |
| |
823 """Tests for psutil.Process class.""" |
| |
824 |
| |
825 def setUp(self): |
| |
826 safe_remove(TESTFN) |
| |
827 |
| |
828 def tearDown(self): |
| |
829 reap_children() |
| |
830 |
| |
831 def test_kill(self): |
| |
832 sproc = get_test_subprocess(wait=True) |
| |
833 test_pid = sproc.pid |
| |
834 p = psutil.Process(test_pid) |
| |
835 name = p.name |
| |
836 p.kill() |
| |
837 p.wait() |
| |
838 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
| |
839 |
| |
840 def test_terminate(self): |
| |
841 sproc = get_test_subprocess(wait=True) |
| |
842 test_pid = sproc.pid |
| |
843 p = psutil.Process(test_pid) |
| |
844 name = p.name |
| |
845 p.terminate() |
| |
846 p.wait() |
| |
847 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
| |
848 |
| |
849 def test_send_signal(self): |
| |
850 if POSIX: |
| |
851 sig = signal.SIGKILL |
| |
852 else: |
| |
853 sig = signal.SIGTERM |
| |
854 sproc = get_test_subprocess() |
| |
855 test_pid = sproc.pid |
| |
856 p = psutil.Process(test_pid) |
| |
857 name = p.name |
| |
858 p.send_signal(sig) |
| |
859 p.wait() |
| |
860 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
| |
861 |
| |
def test_wait(self):
    # --- exit status of a child ended via kill() and then terminate().
    # The signal constant is resolved lazily, inside the posix branch
    # only, exactly as the original code did.
    for action, signame in (('kill', 'SIGKILL'), ('terminate', 'SIGTERM')):
        child = get_test_subprocess()
        proc = psutil.Process(child.pid)
        getattr(proc, action)()
        status = proc.wait()
        if os.name == 'posix':
            self.assertEqual(status, getattr(signal, signame))
        else:
            self.assertEqual(status, 0)
        self.assertFalse(proc.is_running())

    # --- exit status set through sys.exit()
    script = "import time, sys; time.sleep(0.01); sys.exit(5);"
    child = get_test_subprocess([PYTHON, "-c", script])
    proc = psutil.Process(child.pid)
    self.assertEqual(proc.wait(), 5)
    self.assertFalse(proc.is_running())

    # --- wait() issued twice.
    # It is not supposed to raise NSP when the process is gone.
    # On UNIX this should return None, on Windows it should keep
    # returning the exit code.
    child = get_test_subprocess([PYTHON, "-c", script])
    proc = psutil.Process(child.pid)
    self.assertEqual(proc.wait(), 5)
    self.assertIn(proc.wait(), (5, None))

    # --- timeout handling
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    # NOTE(review): bare attribute access kept from the original code;
    # presumably it forces process info to be fetched before waiting.
    proc.name
    self.assertRaises(psutil.TimeoutExpired, proc.wait, 0.01)

    # a negative timeout is rejected
    self.assertRaises(ValueError, proc.wait, -1)
| |
908 |
| |
@unittest.skipUnless(POSIX, '')  # XXX why is this skipped on Windows?
def test_wait_non_children(self):
    # wait() against a process which is not our direct child: the
    # helper script spawns a "grandson" and prints its pid on stdout.
    script = "".join([
        "import sys;",
        "from subprocess import Popen, PIPE;",
        "cmd = ['%s', '-c', 'import time; time.sleep(2)'];" % PYTHON,
        "sp = Popen(cmd, stdout=PIPE);",
        "sys.stdout.write(str(sp.pid));",
    ])
    launcher = get_test_subprocess([PYTHON, "-c", script],
                                   stdout=subprocess.PIPE)

    grandson = psutil.Process(int(launcher.stdout.read()))
    try:
        self.assertRaises(psutil.TimeoutExpired, grandson.wait, 0.01)
        grandson.kill()
        exit_status = grandson.wait()
        self.assertEqual(exit_status, None)
    finally:
        # do not leave the grandson around if an assertion failed
        if grandson.is_running():
            grandson.kill()
            grandson.wait()
| |
930 |
| |
def test_wait_timeout_0(self):
    # wait(0) must raise TimeoutExpired while the child is alive and
    # return its exit status once it has been killed.
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    self.assertRaises(psutil.TimeoutExpired, proc.wait, 0)
    proc.kill()
    deadline = time.time() + 2
    while True:
        try:
            status = proc.wait(0)
        except psutil.TimeoutExpired:
            # keep polling for up to 2 seconds, then give up
            if time.time() >= deadline:
                raise
            continue
        break
    if os.name == 'posix':
        self.assertEqual(status, signal.SIGKILL)
    else:
        self.assertEqual(status, 0)
    self.assertFalse(proc.is_running())
| |
950 |
| |
def test_cpu_percent(self):
    # Sanity-check the type and range of Process.get_cpu_percent().
    p = psutil.Process(os.getpid())
    p.get_cpu_percent(interval=0.001)
    p.get_cpu_percent(interval=0.001)
    for _ in range(100):
        percent = p.get_cpu_percent(interval=None)
        self.assertIsInstance(percent, float)
        self.assertGreaterEqual(percent, 0.0)
        # The upper bound is only enforced on non-posix platforms.  The
        # former posix branch merely repeated the >= 0.0 check above,
        # so it has been dropped.
        if os.name != 'posix':
            self.assertLessEqual(percent, 100.0)
| |
963 |
| |
def test_cpu_times(self):
    # At least one of user/system CPU time must be non-zero for the
    # current process.
    cpu_times = psutil.Process(os.getpid()).get_cpu_times()
    assert (cpu_times.user > 0.0) or (cpu_times.system > 0.0), cpu_times
    # make sure returned values can be pretty printed with strftime
    for seconds in (cpu_times.user, cpu_times.system):
        time.strftime("%H:%M:%S", time.localtime(seconds))
| |
970 |
| |
971 # Test Process.cpu_times() against os.times() |
| |
972 # os.times() is broken on Python 2.6 |
| |
973 # http://bugs.python.org/issue1040026 |
| |
974 # XXX fails on OSX: not sure if it's for os.times(). We should |
| |
975 # try this with Python 2.7 and re-enable the test. |
| |
976 |
| |
@unittest.skipUnless(sys.version_info > (2, 6, 1) and not OSX,
                     'os.times() is not reliable on this Python version')
def test_cpu_times2(self):
    # Compare psutil's user/kernel times with os.times()[:2], allowing
    # a difference of at most 0.1 seconds for each value.
    user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times()
    utime, ktime = os.times()[:2]

    if abs(user_time - utime) > 0.1:
        self.fail("expected: %s, found: %s" %(utime, user_time))

    if abs(kernel_time - ktime) > 0.1:
        self.fail("expected: %s, found: %s" %(ktime, kernel_time))
| |
991 |
| |
def test_create_time(self):
    # The creation time of a just-spawned child must be within 2 seconds
    # of time.time() sampled right after the spawn.
    child = get_test_subprocess(wait=True)
    now = time.time()
    proc = psutil.Process(child.pid)
    created = proc.create_time

    drift = abs(created - now)
    if drift > 2:
        self.fail("expected: %s, found: %s, difference: %s"
                  % (now, created, drift))

    # make sure returned value can be pretty printed with strftime
    stamp = time.localtime(proc.create_time)
    time.strftime("%Y %m %d %H:%M:%S", stamp)
| |
1008 |
| |
# The test is skipped *on* Windows, so the reason must not claim that it
# is Windows-only.
@unittest.skipIf(WINDOWS, 'not available on Windows')
def test_terminal(self):
    # Process.terminal must match the output of the `tty` command when
    # stdin is a terminal; otherwise it is only required to be truthy.
    terminal = psutil.Process(os.getpid()).terminal
    if sys.stdin.isatty():
        self.assertEqual(terminal, sh('tty'))
    else:
        assert terminal, repr(terminal)
| |
1016 |
| |
@unittest.skipIf(not hasattr(psutil.Process, 'get_io_counters'),
                 'not available on this platform')
@skip_on_not_implemented(only_if=LINUX)
def test_get_io_counters(self):
    # Check that the I/O counters of the current process move in the
    # expected direction after a read and after a write.
    p = psutil.Process(os.getpid())
    # test reads
    io1 = p.get_io_counters()
    f = open(PYTHON, 'rb')
    try:
        f.read()
    finally:
        # always release the handle, even if read() fails
        f.close()
    io2 = p.get_io_counters()
    if not BSD:
        assert io2.read_count > io1.read_count, (io1, io2)
        self.assertEqual(io2.write_count, io1.write_count)
    assert io2.read_bytes >= io1.read_bytes, (io1, io2)
    assert io2.write_bytes >= io1.write_bytes, (io1, io2)
    # test writes
    io1 = p.get_io_counters()
    payload = "x" * 1000000
    if PY3:
        payload = bytes(payload, 'ascii')
    f = tempfile.TemporaryFile()
    try:
        f.write(payload)
    finally:
        f.close()
    io2 = p.get_io_counters()
    assert io2.write_count >= io1.write_count, (io1, io2)
    assert io2.write_bytes >= io1.write_bytes, (io1, io2)
    assert io2.read_count >= io1.read_count, (io1, io2)
    assert io2.read_bytes >= io1.read_bytes, (io1, io2)
| |
1046 |
| |
1047 # Linux and Windows Vista+ |
| |
@unittest.skipUnless(hasattr(psutil.Process, 'get_ionice'),
                     'Linux and Windows Vista only')
def test_get_set_ionice(self):
    if not LINUX:
        # Non-Linux flavour: a single integer priority.
        proc = psutil.Process(os.getpid())
        original = proc.get_ionice()
        try:
            value = 0  # very low
            if original == value:
                value = 1  # low
            proc.set_ionice(value)
            self.assertEqual(proc.get_ionice(), value)
        finally:
            proc.set_ionice(original)
        #
        self.assertRaises(ValueError, proc.set_ionice, 3)
        self.assertRaises(TypeError, proc.set_ionice, 2, 1)
        return

    # Linux flavour: (ioclass, value) pairs.
    from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT,
                        IOPRIO_CLASS_BE, IOPRIO_CLASS_IDLE)
    self.assertEqual(IOPRIO_CLASS_NONE, 0)
    self.assertEqual(IOPRIO_CLASS_RT, 1)
    self.assertEqual(IOPRIO_CLASS_BE, 2)
    self.assertEqual(IOPRIO_CLASS_IDLE, 3)
    proc = psutil.Process(os.getpid())
    # (arguments passed to set_ionice, (ioclass, value) expected back)
    cases = (
        ((2,), (2, 4)),
        ((3,), (3, 0)),
        ((2, 0), (2, 0)),
        ((2, 7), (2, 7)),
    )
    try:
        for args, (want_class, want_value) in cases:
            proc.set_ionice(*args)
            ioclass, value = proc.get_ionice()
            self.assertEqual(ioclass, want_class)
            self.assertEqual(value, want_value)
        self.assertRaises(ValueError, proc.set_ionice, 2, 10)
    finally:
        # put the process back into the "none" class whatever happened
        proc.set_ionice(IOPRIO_CLASS_NONE)
| |
1095 |
| |
def test_get_num_threads(self):
    # on certain platforms such as Linux we might test for exact
    # thread number, since we always have with 1 thread per process,
    # but this does not apply across all platforms (OSX, Windows)
    proc = psutil.Process(os.getpid())
    before = proc.get_num_threads()

    worker = ThreadTask()
    worker.start()
    try:
        # starting one extra thread must bump the count by exactly one
        after = proc.get_num_threads()
        self.assertEqual(after, before + 1)
        worker.stop()
    finally:
        # make sure the helper thread is stopped if the assertion failed
        if worker._running:
            worker.stop()
| |
1112 |
| |
@unittest.skipUnless(WINDOWS, 'Windows only')
def test_get_num_handles(self):
    # a better test is done later into test/_windows.py
    num_handles = psutil.Process(os.getpid()).get_num_handles()
    self.assertGreater(num_handles, 0)
| |
1118 |
| |
def test_get_threads(self):
    # Starting a helper thread must add exactly one entry to
    # Process.get_threads().
    proc = psutil.Process(os.getpid())
    before = proc.get_threads()

    worker = ThreadTask()
    worker.start()

    try:
        after = proc.get_threads()
        self.assertEqual(len(after), len(before) + 1)
        # on Linux, first thread id is supposed to be this process
        if LINUX:
            self.assertEqual(after[0].id, os.getpid())
        first = after[0]
        # named tuple fields must line up with their positions
        fields = ('id', 'user_time', 'system_time')
        for index, field in enumerate(fields):
            self.assertEqual(getattr(first, field), first[index])
        worker.stop()
    finally:
        # make sure the helper thread is stopped if an assertion failed
        if worker._running:
            worker.stop()
| |
1142 |
| |
def test_get_memory_info(self):
    proc = psutil.Process(os.getpid())

    # baseline figures
    rss_before, vms_before = proc.get_memory_info()
    percent_before = proc.get_memory_percent()
    self.assertGreater(rss_before, 0)
    self.assertGreater(vms_before, 0)

    # allocate a big list so that memory usage grows
    ballast = [None] * 1500000

    rss_after, vms_after = proc.get_memory_info()
    percent_after = proc.get_memory_percent()
    # rss and the percentage must have grown; vms might be equal
    self.assertGreater(rss_after, rss_before)
    self.assertGreaterEqual(vms_after, vms_before)
    self.assertGreater(percent_after, percent_before)
    del ballast
| |
1162 |
| |
1163 # def test_get_ext_memory_info(self): |
| |
1164 # # tested later in fetch all test suite |
| |
1165 |
| |
def test_get_memory_maps(self):
    proc = psutil.Process(os.getpid())
    maps = proc.get_memory_maps()
    # the entries must all be distinct
    entries = list(maps)
    self.assertEqual(len(entries), len(set(entries)))
    ext_maps = proc.get_memory_maps(grouped=False)

    for nt in maps:
        # entries such as "[heap]" are not file paths: skip them
        if nt.path.startswith('['):
            continue
        assert os.path.isabs(nt.path), nt.path
        # XXX - On Windows we have this strange behavior with
        # 64 bit dlls: they are visible via explorer but cannot
        # be accessed via os.stat() (wtf?), hence those are not
        # checked for existence on non-POSIX platforms.
        if POSIX or '64' not in os.path.basename(nt.path):
            assert os.path.exists(nt.path), nt.path

    for nt in ext_maps:
        for fname in nt._fields:
            value = getattr(nt, fname)
            if fname == 'path':
                continue
            if fname in ('addr', 'perms'):
                assert value, value
            else:
                # every remaining field is a non-negative integer
                self.assertIsInstance(value, (int, long))
                assert value >= 0, value
| |
1194 |
| |
def test_get_memory_percent(self):
    # the current process must account for a non-zero share of memory
    percent = psutil.Process(os.getpid()).get_memory_percent()
    self.assertGreater(percent, 0.0)
| |
1198 |
| |
def test_pid(self):
    # Process.pid must echo the pid the object was created with
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    self.assertEqual(proc.pid, child.pid)
| |
1202 |
| |
def test_is_running(self):
    child = get_test_subprocess(wait=True)
    proc = psutil.Process(child.pid)
    # each state is queried twice in a row, as in the original test
    for _ in range(2):
        assert proc.is_running()
    proc.kill()
    proc.wait()
    for _ in range(2):
        assert not proc.is_running()
| |
1212 |
| |
def test_exe(self):
    child = get_test_subprocess(wait=True)
    exe = psutil.Process(child.pid).exe
    if exe == PYTHON:
        # exact match, nothing else to verify
        return
    if WINDOWS and len(exe) == len(PYTHON):
        # on Windows we don't care about case sensitivity
        self.assertEqual(exe.lower(), PYTHON.lower())
    else:
        # certain platforms such as BSD are more accurate returning:
        # "/usr/local/bin/python2.7"
        # ...instead of:
        # "/usr/local/bin/python"
        # We do not want to consider this difference in accuracy
        # an error.
        major, minor = sys.version_info[0], sys.version_info[1]
        ver = "%s.%s" % (major, minor)
        self.assertEqual(exe.replace(ver, ''), PYTHON.replace(ver, ''))
| |
1231 |
| |
def test_cmdline(self):
    # the command line reported for the child must match what we passed
    expected = [PYTHON, "-c", "import time; time.sleep(2)"]
    child = get_test_subprocess(expected, wait=True)
    reported = psutil.Process(child.pid).cmdline
    self.assertEqual(' '.join(reported), ' '.join(expected))
| |
1237 |
| |
def test_name(self):
    child = get_test_subprocess(PYTHON, wait=True)
    name = psutil.Process(child.pid).name.lower()
    interpreter = os.path.realpath(sys.executable)
    pyexe = os.path.basename(interpreter).lower()
    # startswith() rather than ==: the reported name only has to be a
    # prefix of the interpreter's file name
    assert pyexe.startswith(name), (pyexe, name)
| |
1243 |
| |
@unittest.skipUnless(POSIX, 'posix only')
def test_uids(self):
    real, effective, saved = psutil.Process(os.getpid()).uids
    # "real" uid <-> os.getuid(), "effective" uid <-> os.geteuid()
    self.assertEqual(real, os.getuid())
    self.assertEqual(effective, os.geteuid())
    # no such thing as os.getsuid() ("saved" uid), but starting
    # from python 2.7 we have os.getresuid()[2]
    if hasattr(os, "getresuid"):
        self.assertEqual(saved, os.getresuid()[2])
| |
1256 |
| |
@unittest.skipUnless(POSIX, 'posix only')
def test_gids(self):
    # Compare Process.gids of the current process with the os module.
    p = psutil.Process(os.getpid())
    real, effective, saved = p.gids
    # os.getgid() refers to "real" gid
    self.assertEqual(real, os.getgid())
    # os.getegid() refers to "effective" gid
    self.assertEqual(effective, os.getegid())
    # no such thing as os.getsgid() ("saved" gid), but starting
    # from python 2.7 we have os.getresgid()[2].  Probe for the
    # function that is actually called (the test used to probe for
    # os.getresuid instead).
    if hasattr(os, "getresgid"):
        self.assertEqual(saved, os.getresgid()[2])
| |
1269 |
| |
def test_nice(self):
    # Exercise get_nice()/set_nice() on the current process and put the
    # original priority back afterwards.
    p = psutil.Process(os.getpid())
    self.assertRaises(TypeError, p.set_nice, "str")
    if os.name == 'nt':
        try:
            self.assertEqual(p.get_nice(), psutil.NORMAL_PRIORITY_CLASS)
            p.set_nice(psutil.HIGH_PRIORITY_CLASS)
            self.assertEqual(p.get_nice(), psutil.HIGH_PRIORITY_CLASS)
            p.set_nice(psutil.NORMAL_PRIORITY_CLASS)
            self.assertEqual(p.get_nice(), psutil.NORMAL_PRIORITY_CLASS)
        finally:
            p.set_nice(psutil.NORMAL_PRIORITY_CLASS)
    else:
        # Bound up front so that the cleanup below never hits an unbound
        # local when get_nice() itself raises AccessDenied.
        first_nice = None
        try:
            try:
                first_nice = p.get_nice()
                p.set_nice(1)
                self.assertEqual(p.get_nice(), 1)
                # going back to previous nice value raises AccessDenied on OSX
                if not OSX:
                    p.set_nice(0)
                    self.assertEqual(p.get_nice(), 0)
            except psutil.AccessDenied:
                pass
        finally:
            # best effort: only restore a value we managed to read
            if first_nice is not None:
                try:
                    p.set_nice(first_nice)
                except psutil.AccessDenied:
                    pass
| |
1299 |
| |
def test_status(self):
    # the process running this test reports itself as "running", both
    # as a constant and through its string form
    me = psutil.Process(os.getpid())
    self.assertEqual(me.status, psutil.STATUS_RUNNING)
    self.assertEqual(str(me.status), "running")
| |
1304 |
| |
def test_status_constants(self):
    # STATUS_* constants are supposed to be comparable also by
    # using their str representation
    running = psutil.STATUS_RUNNING
    # "==" against matching and non-matching values
    for same in (0, long(0), 'running'):
        self.assertTrue(running == same)
    for different in (1, 'sleeping'):
        self.assertFalse(running == different)
    # "!=" against matching and non-matching values
    for same in (0, 'running'):
        self.assertFalse(running != same)
    for different in (1, 'sleeping'):
        self.assertTrue(running != different)
| |
1317 |
| |
def test_username(self):
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    if POSIX:
        import pwd
        reported = proc.username
        self.assertEqual(reported, pwd.getpwuid(os.getuid()).pw_name)
    elif WINDOWS and 'USERNAME' in os.environ:
        # on Windows the username comes back as "DOMAIN\user"
        expected_username = os.environ['USERNAME']
        expected_domain = os.environ['USERDOMAIN']
        domain, username = proc.username.split('\\')
        self.assertEqual(domain, expected_domain)
        self.assertEqual(username, expected_username)
    else:
        # nothing to compare against: just make sure it does not raise
        proc.username
| |
1332 |
| |
@unittest.skipUnless(hasattr(psutil.Process, "getcwd"),
                     'not available on this platform')
def test_getcwd(self):
    # the child's working directory is compared with our own
    child = get_test_subprocess(wait=True)
    child_cwd = psutil.Process(child.pid).getcwd()
    self.assertEqual(child_cwd, os.getcwd())
| |
1339 |
| |
@unittest.skipIf(not hasattr(psutil.Process, "getcwd"),
                 'not available on this platform')
def test_getcwd_2(self):
    # the child moves to the parent directory; poll getcwd() until it
    # reports that directory
    script = "import os, time; os.chdir('..'); time.sleep(2)"
    child = get_test_subprocess([PYTHON, "-c", script], wait=True)
    proc = psutil.Process(child.pid)
    call_until(proc.getcwd, "ret == os.path.dirname(os.getcwd())", timeout=1)
| |
1347 |
| |
@unittest.skipIf(not hasattr(psutil.Process, "get_cpu_affinity"),
                 'not available on this platform')
def test_cpu_affinity(self):
    # Pin the current process to each CPU in turn, then to all of them,
    # and finally check that an out-of-range CPU number is rejected.
    p = psutil.Process(os.getpid())
    initial = p.get_cpu_affinity()
    all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
    try:
        for n in all_cpus:
            p.set_cpu_affinity([n])
            self.assertEqual(p.get_cpu_affinity(), [n])
        #
        p.set_cpu_affinity(all_cpus)
        self.assertEqual(p.get_cpu_affinity(), all_cpus)
    finally:
        # restore the original affinity even when an assertion fails,
        # so the rest of the suite does not run with an altered one
        p.set_cpu_affinity(initial)
    invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
    self.assertRaises(ValueError, p.set_cpu_affinity, invalid_cpu)
| |
1365 |
| |
def test_get_open_files(self):
    # --- current process
    p = psutil.Process(os.getpid())
    files = p.get_open_files()
    # get_open_files() entries expose the file name via .path, so the
    # membership test has to be done against the paths (testing against
    # the entries themselves could never match a plain string).
    self.assertFalse(TESTFN in [nt.path for nt in files])
    f = open(TESTFN, 'w')
    try:
        call_until(p.get_open_files, "len(ret) != %i" % len(files))
        filenames = [nt.path for nt in p.get_open_files()]
        self.assertIn(TESTFN, filenames)
    finally:
        # do not leak the handle if one of the checks above fails
        f.close()
    for fname in filenames:
        assert os.path.isfile(fname), fname

    # --- another process
    cmdline = "import time; f = open(r'%s', 'r'); time.sleep(2);" % TESTFN
    sproc = get_test_subprocess([PYTHON, "-c", cmdline], wait=True)
    p = psutil.Process(sproc.pid)
    # give the child up to ~1 second to open the file
    for _ in range(100):
        filenames = [nt.path for nt in p.get_open_files()]
        if TESTFN in filenames:
            break
        time.sleep(.01)
    else:
        self.assertIn(TESTFN, filenames)
    for fname in filenames:
        assert os.path.isfile(fname), fname
| |
1392 |
| |
def test_get_open_files2(self):
    # test fd and path fields
    fileobj = open(TESTFN, 'w')
    try:
        p = psutil.Process(os.getpid())
        for path, fd in p.get_open_files():
            if path == fileobj.name or fd == fileobj.fileno():
                break
        else:
            self.fail("no file found; files=%s" % repr(p.get_open_files()))
        self.assertEqual(path, fileobj.name)
        if WINDOWS:
            self.assertEqual(fd, -1)
        else:
            self.assertEqual(fd, fileobj.fileno())
        # test positions
        ntuple = p.get_open_files()[0]
        self.assertEqual(ntuple[0], ntuple.path)
        self.assertEqual(ntuple[1], ntuple.fd)
    finally:
        # close the file even when one of the checks above fails
        fileobj.close()
    # test file is gone: compare against the reported paths (comparing
    # the name with the returned entries themselves could never match)
    open_paths = [nt.path for nt in p.get_open_files()]
    self.assertNotIn(fileobj.name, open_paths)
| |
1414 |
| |
def test_connection_constants(self):
    # every psutil.CONN_* constant must have an upper-case str() form
    # and be unique both as a value and as a string
    seen_values = []
    seen_strings = []
    conn_names = [attr for attr in dir(psutil) if attr.startswith('CONN_')]
    for name in conn_names:
        num = getattr(psutil, name)
        str_ = str(num)
        assert str_.isupper(), str_
        assert str_ not in seen_strings, str_
        assert num not in seen_values, num
        seen_values.append(num)
        seen_strings.append(str_)
    # platform specific constants must exist on their platform
    if SUNOS:
        psutil.CONN_IDLE
        psutil.CONN_BOUND
    if WINDOWS:
        psutil.CONN_DELETE_TCB
| |
1432 |
| |
def test_get_connections(self):
    # Spawn a child with a listening TCP socket on 127.0.0.1 and check
    # every field of the connection reported for it.
    arg = "import socket, time;" \
          "s = socket.socket();" \
          "s.bind(('127.0.0.1', 0));" \
          "s.listen(1);" \
          "conn, addr = s.accept();" \
          "time.sleep(2);"
    sproc = get_test_subprocess([PYTHON, "-c", arg])
    p = psutil.Process(sproc.pid)
    # Start from an empty list so that a child which never shows any
    # connection produces a regular assertion failure below instead of
    # a NameError on an unbound variable.
    cons = []
    for _ in range(100):
        if p.get_connections():
            # give the subprocess some more time to bind()
            time.sleep(.01)
            cons = p.get_connections()
            break
        time.sleep(.01)
    self.assertEqual(len(cons), 1)
    con = cons[0]
    self.assertEqual(con.family, socket.AF_INET)
    self.assertEqual(con.type, socket.SOCK_STREAM)
    self.assertEqual(con.status, psutil.CONN_LISTEN, str(con.status))
    ip, port = con.laddr
    self.assertEqual(ip, '127.0.0.1')
    self.assertEqual(con.raddr, ())
    if WINDOWS or SUNOS:
        self.assertEqual(con.fd, -1)
    else:
        assert con.fd > 0, con
    # test positions
    self.assertEqual(con[0], con.fd)
    self.assertEqual(con[1], con.family)
    self.assertEqual(con[2], con.type)
    self.assertEqual(con[3], con.laddr)
    self.assertEqual(con[4], con.raddr)
    self.assertEqual(con[5], con.status)
    # test kind arg
    self.assertRaises(ValueError, p.get_connections, 'foo')
| |
1470 |
| |
@unittest.skipUnless(supports_ipv6(), 'IPv6 is not supported')
def test_get_connections_ipv6(self):
    # A listening IPv6 socket bound to ::1 must be reported for the
    # current process.
    s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    try:
        s.bind(('::1', 0))
        s.listen(1)
        cons = psutil.Process(os.getpid()).get_connections()
    finally:
        # close the socket even if bind/listen/get_connections fails
        s.close()
    self.assertEqual(len(cons), 1)
    self.assertEqual(cons[0].laddr[0], '::1')
| |
1480 |
| |
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'AF_UNIX is not supported')
def test_get_connections_unix(self):
    # --- stream (SOCK_STREAM) UNIX socket
    safe_remove(TESTFN)
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.bind(TESTFN)
        conn = psutil.Process(os.getpid()).get_connections(kind='unix')[0]
        if conn.fd != -1:  # != sunos and windows
            self.assertEqual(conn.fd, sock.fileno())
        self.assertEqual(conn.family, socket.AF_UNIX)
        self.assertEqual(conn.type, socket.SOCK_STREAM)
        self.assertEqual(conn.laddr, TESTFN)
        self.assertTrue(not conn.raddr)
        self.assertEqual(conn.status, psutil.CONN_NONE, str(conn.status))
    finally:
        # always release the socket, also when an assertion fails
        sock.close()
    # --- datagram (SOCK_DGRAM) UNIX socket
    safe_remove(TESTFN)
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        sock.bind(TESTFN)
        conn = psutil.Process(os.getpid()).get_connections(kind='unix')[0]
        self.assertEqual(conn.type, socket.SOCK_DGRAM)
    finally:
        sock.close()
| |
1503 |
| |
@unittest.skipUnless(hasattr(socket, "fromfd"),
                     'socket.fromfd() is not available')
@unittest.skipIf(WINDOWS or SUNOS,
                 'connection fd not available on this platform')
def test_connection_fromfd(self):
    # The fd reported by get_connections() must be usable to rebuild an
    # equivalent socket through socket.fromfd().
    sock = socket.socket()
    try:
        sock.bind(('localhost', 0))
        sock.listen(1)
        p = psutil.Process(os.getpid())
        for conn in p.get_connections():
            if conn.fd == sock.fileno():
                break
        else:
            self.fail("couldn't find socket fd")
        dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
        try:
            self.assertEqual(dupsock.getsockname(), conn.laddr)
            self.assertNotEqual(sock.fileno(), dupsock.fileno())
        finally:
            dupsock.close()
    finally:
        # single cleanup point: covers every failure path above
        sock.close()
| |
1526 |
| |
def test_get_connections_all(self):
    # Launch children owning a socket of each family/type combination
    # and check what get_connections() reports for each of them, for
    # every supported "kind" filter.
    from string import Template

    tcp_template = "import socket;" \
                   "s = socket.socket($family, socket.SOCK_STREAM);" \
                   "s.bind(('$addr', 0));" \
                   "s.listen(1);" \
                   "conn, addr = s.accept();"

    udp_template = "import socket, time;" \
                   "s = socket.socket($family, socket.SOCK_DGRAM);" \
                   "s.bind(('$addr', 0));" \
                   "time.sleep(2);"

    def spawn(template, family, addr):
        # start a child running `template` filled in with family/addr
        code = Template(template).substitute(family=family, addr=addr)
        return get_test_subprocess([PYTHON, "-c", code])

    # pid -> (family, type, accepted local IPs, status, matching kinds)
    expected = {}
    sproc = spawn(tcp_template, socket.AF_INET, "127.0.0.1")
    expected[sproc.pid] = (socket.AF_INET, socket.SOCK_STREAM,
                           ("127.0.0.1",), psutil.CONN_LISTEN,
                           ("all", "inet", "inet4", "tcp", "tcp4"))
    sproc = spawn(udp_template, socket.AF_INET, "127.0.0.1")
    expected[sproc.pid] = (socket.AF_INET, socket.SOCK_DGRAM,
                           ("127.0.0.1",), psutil.CONN_NONE,
                           ("all", "inet", "inet4", "udp", "udp4"))
    # socket.AF_INET6 is only looked up when IPv6 is usable
    if supports_ipv6():
        sproc = spawn(tcp_template, socket.AF_INET6, "::1")
        expected[sproc.pid] = (socket.AF_INET6, socket.SOCK_STREAM,
                               ("::", "::1"), psutil.CONN_LISTEN,
                               ("all", "inet", "inet6", "tcp", "tcp6"))
        sproc = spawn(udp_template, socket.AF_INET6, "::1")
        expected[sproc.pid] = (socket.AF_INET6, socket.SOCK_DGRAM,
                               ("::", "::1"), psutil.CONN_NONE,
                               ("all", "inet", "inet6", "udp", "udp6"))

    # check matches against subprocesses just created
    all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4", "tcp6",
                 "udp", "udp4", "udp6")
    for p in psutil.Process(os.getpid()).get_children():
        if p.pid not in expected:
            # not one of the children spawned above
            continue
        family, type_, addrs, status, kinds = expected[p.pid]
        for conn in p.get_connections():
            self.assertEqual(conn.family, family)
            self.assertEqual(conn.type, type_)
            self.assertIn(conn.laddr[0], addrs)
            self.assertEqual(conn.raddr, ())
            self.assertEqual(conn.status, status, str(conn.status))
            for kind in all_kinds:
                cons = p.get_connections(kind=kind)
                if kind in kinds:
                    self.assertNotEqual(
                        cons, [], "no connections for kind=%r" % kind)
                else:
                    self.assertEqual(cons, [], cons)
| |
1621 |
| |
@unittest.skipUnless(POSIX, 'posix only')
def test_get_num_fds(self):
    # Opening a file and then a socket must bump the fd count by one
    # each; closing both must bring it back to the starting value.
    p = psutil.Process(os.getpid())
    start = p.get_num_fds()
    fileobj = open(TESTFN, 'w')
    try:
        self.assertEqual(p.get_num_fds(), start + 1)
        sock = socket.socket()
        try:
            self.assertEqual(p.get_num_fds(), start + 2)
        finally:
            sock.close()
    finally:
        # release the descriptors even if an assertion above fails, so
        # that later tests do not see leaked fds
        fileobj.close()
    self.assertEqual(p.get_num_fds(), start)
| |
1633 |
| |
@skip_on_not_implemented(only_if=LINUX)
def test_get_num_ctx_switches(self):
    # Poll the summed context-switch counters of the current process
    # until they grow; give up after a fixed number of attempts.
    attempts = 500000
    p = psutil.Process(os.getpid())
    before = sum(p.get_num_ctx_switches())
    for _ in range(attempts):
        after = sum(p.get_num_ctx_switches())
        if after > before:
            return
    # the message is derived from the loop bound so the two cannot
    # disagree (it used to say 50.000 for a 500000-iteration loop)
    self.fail("num ctx switches still the same after %s iterations"
              % attempts)
| |
1643 |
| |
def test_parent_ppid(self):
    # The child must report this process as its parent, both through
    # .ppid and through .parent.
    this_parent = os.getpid()
    sproc = get_test_subprocess()
    p = psutil.Process(sproc.pid)
    self.assertEqual(p.ppid, this_parent)
    self.assertEqual(p.parent.pid, this_parent)
    # no other process is supposed to have us as parent
    for p in psutil.process_iter():
        if p.pid == sproc.pid:
            continue
        try:
            ppid = p.ppid
        except psutil.NoSuchProcess:
            # an unrelated process exited while we were iterating: that
            # is not a failure of this test
            continue
        self.assertNotEqual(ppid, this_parent)
| |
1655 |
| |
def test_get_children(self):
    me = psutil.Process(os.getpid())
    # no children before we spawn one
    self.assertEqual(me.get_children(), [])
    self.assertEqual(me.get_children(recursive=True), [])
    child = get_test_subprocess()
    direct = me.get_children()
    nested = me.get_children(recursive=True)
    # both flavours must report exactly the child we just spawned
    for found in (direct, nested):
        self.assertEqual(len(found), 1)
        self.assertEqual(found[0].pid, child.pid)
        self.assertEqual(found[0].ppid, os.getpid())
| |
1667 |
| |
def test_get_children_recursive(self):
    # here we create a subprocess which creates another one as in:
    # A (parent) -> B (child) -> C (grandchild)
    script = "".join([
        "import subprocess, os, sys, time;",
        "PYTHON = os.path.realpath(sys.executable);",
        "cmd = [PYTHON, '-c', 'import time; time.sleep(2);'];",
        "subprocess.Popen(cmd);",
        "time.sleep(2);",
    ])
    get_test_subprocess(cmd=[PYTHON, "-c", script])
    me = psutil.Process(os.getpid())
    self.assertEqual(len(me.get_children(recursive=False)), 1)
    # give the grandchild some time to start
    deadline = time.time() + 1.5
    while time.time() < deadline:
        descendants = me.get_children(recursive=True)
        if len(descendants) > 1:
            break
    self.assertEqual(len(descendants), 2)
    self.assertEqual(descendants[0].ppid, os.getpid())
    self.assertEqual(descendants[1].ppid, descendants[0].pid)
| |
1688 |
| |
def test_get_children_duplicates(self):
    # Count how many processes report each ppid, skipping the ones
    # which raise a psutil error while being queried.
    from psutil._compat import defaultdict
    counts = defaultdict(int)
    for proc in psutil.process_iter():
        try:
            counts[proc.ppid] += 1
        except psutil.Error:
            pass
    # pick the pid having the highest count (last one after sorting by
    # count) and make sure its recursive children list has no
    # duplicates
    ranked = sorted(counts.items(), key=lambda item: item[1])
    busiest_pid = ranked[-1][0]
    parent = psutil.Process(busiest_pid)
    try:
        descendants = parent.get_children(recursive=True)
    except psutil.AccessDenied:  # windows
        return
    self.assertEqual(len(descendants), len(set(descendants)))
| |
1707 |
| |
def test_suspend_resume(self):
    # Suspend a subprocess, wait for it to be reported as stopped,
    # then resume it and make sure it is no longer stopped.
    child = get_test_subprocess(wait=True)
    proc = psutil.Process(child.pid)
    proc.suspend()
    # poll the status at most 100 times, sleeping 0.01 secs after
    # every unsuccessful check
    attempts = 0
    while attempts < 100 and proc.status != psutil.STATUS_STOPPED:
        time.sleep(0.01)
        attempts += 1
    self.assertEqual(str(proc.status), "stopped")
    proc.resume()
    assert proc.status != psutil.STATUS_STOPPED, proc.status
| |
1719 |
| |
def test_invalid_pid(self):
    # (expected exception, invalid pid) pairs: non-integer pids are
    # expected to raise TypeError, a negative one ValueError
    cases = (
        (TypeError, "1"),
        (TypeError, None),
        (ValueError, -1),
    )
    for exc, bad_pid in cases:
        self.assertRaises(exc, psutil.Process, bad_pid)
| |
1724 |
| |
def test_as_dict(self):
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    info = proc.as_dict()
    # if the json module is available the whole dict must be
    # serializable with it
    try:
        import json
    except ImportError:
        json = None
    if json is not None:
        json.dumps(info)
    # when 'attrs' is given only those keys are expected back
    subset = proc.as_dict(attrs=['exe', 'name'])
    self.assertEqual(sorted(subset.keys()), ['exe', 'name'])
    # query the process with the lowest pid: 'get_connections' is
    # looked up under the 'connections' key and, when it is not a
    # list, it must be the 'ad_value' placeholder we passed
    lowest = psutil.Process(min(psutil.get_pid_list()))
    guarded = lowest.as_dict(attrs=['get_connections'], ad_value='foo')
    conns = guarded['connections']
    if not isinstance(conns, list):
        self.assertEqual(conns, 'foo')
| |
1744 |
| |
def test_zombie_process(self):
    # Make sure NoSuchProcess gets raised when the process dies after
    # the Process object has been created, as in:
    #   >>> proc = Process(1234)
    #   >>> time.sleep(2)  # time-consuming task, process dies meanwhile
    #   >>> proc.name
    # Refers to Issue #15
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.kill()
    proc.wait()

    # attributes which are not exercised by the generic loop below
    skipped = ('pid', 'send_signal', 'is_running', 'set_ionice',
               'wait', 'set_cpu_affinity', 'create_time', 'set_nice',
               'nice')
    for attr_name in dir(proc):
        if attr_name.startswith('_') or attr_name in skipped:
            continue
        try:
            member = getattr(proc, attr_name)
            if callable(member):
                member()
        except (psutil.NoSuchProcess, NotImplementedError):
            pass
        else:
            self.fail("NoSuchProcess exception not raised for %r"
                      % attr_name)

    # other methods, which need arguments
    if os.name == 'posix':
        nice_value = 1
    else:
        nice_value = psutil.NORMAL_PRIORITY_CLASS
    try:
        proc.set_nice(nice_value)
    except psutil.NoSuchProcess:
        pass
    else:
        self.fail("exception not raised")
    if hasattr(proc, 'set_ionice'):
        self.assertRaises(psutil.NoSuchProcess, proc.set_ionice, 2)
    self.assertRaises(psutil.NoSuchProcess, proc.send_signal,
                      signal.SIGTERM)
    self.assertRaises(psutil.NoSuchProcess, proc.set_nice, 0)
    self.assertFalse(proc.is_running())
    if hasattr(proc, "set_cpu_affinity"):
        self.assertRaises(psutil.NoSuchProcess, proc.set_cpu_affinity,
                          [0])
| |
1792 |
| |
def test__str__(self):
    # str() of a running process must mention its pid...
    live_child = get_test_subprocess()
    running = psutil.Process(live_child.pid)
    self.assertIn(str(live_child.pid), str(running))
    # ...and the interpreter name, except on OS X where python shows
    # up as 'Python' in cmdline and the check would fail
    if not OSX:
        self.assertIn(os.path.basename(PYTHON), str(running))
    # str() of a killed process must still mention the pid and flag
    # the process as terminated
    dead_child = get_test_subprocess()
    dead = psutil.Process(dead_child.pid)
    dead.kill()
    dead.wait()
    self.assertIn(str(dead_child.pid), str(dead))
    self.assertIn("terminated", str(dead))
| |
1806 |
| |
@unittest.skipIf(LINUX, 'PID 0 not available on Linux')
def test_pid_0(self):
    # Process(0) is supposed to work on all platforms except Linux
    proc = psutil.Process(0)
    self.assertTrue(proc.name)

    # on posix the real uid and gid are expected to be 0, unless we
    # are not allowed to read them
    if os.name == 'posix':
        try:
            self.assertEqual(proc.uids.real, 0)
            self.assertEqual(proc.gids.real, 0)
        except psutil.AccessDenied:
            pass

    self.assertIn(proc.ppid, (0, 1))
    proc.cmdline
    # these are only required not to fail with anything other than
    # AccessDenied
    for getter_name in ('get_num_threads', 'get_memory_info'):
        try:
            getattr(proc, getter_name)()
        except psutil.AccessDenied:
            pass

    # username property
    try:
        username = proc.username
    except psutil.AccessDenied:
        pass
    else:
        if POSIX:
            self.assertEqual(username, 'root')
        elif WINDOWS:
            self.assertEqual(username, 'NT AUTHORITY\\SYSTEM')

    self.assertIn(0, psutil.get_pid_list())
    self.assertTrue(psutil.pid_exists(0))
| |
1846 |
| |
def test__all__(self):
    # Every public name found in the psutil namespace which is not an
    # importable module must either be listed in psutil.__all__ or be
    # documented as deprecated.
    skipped = ('callable', 'defaultdict', 'error', 'namedtuple', 'test')
    for name in dir(psutil):
        if name in skipped:
            continue
        if not name.startswith('_'):
            try:
                # names which can be imported are modules psutil
                # itself imported: not part of its API
                __import__(name)
            except ImportError:
                if name not in psutil.__all__:
                    fun = getattr(psutil, name)
                    if fun is None:
                        continue
                    # __doc__ is None for objects without a docstring;
                    # treat that as "not deprecated" so that we fail
                    # with a meaningful message instead of crashing
                    # with AttributeError
                    doc = getattr(fun, '__doc__', None) or ''
                    if 'deprecated' not in doc.lower():
                        self.fail('%r not in psutil.__all__' % name)
| |
1862 |
| |
def test_Popen(self):
    # Popen class test: attributes such as 'name' and 'stdin' must be
    # reachable from the same object while unknown ones raise
    # AttributeError.
    # XXX this test causes a ResourceWarning on Python 3 because
    # psutil.__subproc instance doesn't get properly freed.
    # Not sure what to do though.
    cmd = [PYTHON, "-c", "import time; time.sleep(2);"]
    proc = psutil.Popen(cmd, stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
    try:
        proc.name
        proc.stdin
        for attr in ('name', 'stdin'):
            self.assertTrue(hasattr(proc, attr))
        self.assertRaises(AttributeError, getattr, proc, 'foo')
    finally:
        # always get rid of the subprocess
        proc.kill()
        proc.wait()
| |
1879 |
| |
1880 |
| |
1881 # =================================================================== |
| |
1882 # --- Fetch all processes test |
| |
1883 # =================================================================== |
| |
1884 |
| |
1885 class TestFetchAllProcesses(unittest.TestCase): |
| |
1886 # Iterates over all running processes and performs some sanity |
| |
1887 # checks against Process API's returned values. |
| |
1888 |
| |
1889 def setUp(self): |
| |
1890 if POSIX: |
| |
1891 import pwd |
| |
1892 pall = pwd.getpwall() |
| |
1893 self._uids = set([x.pw_uid for x in pall]) |
| |
1894 self._usernames = set([x.pw_name for x in pall]) |
| |
1895 |
| |
1896 def test_fetch_all(self): |
| |
1897 valid_procs = 0 |
| |
1898 excluded_names = ['send_signal', 'suspend', 'resume', 'terminate', |
| |
1899 'kill', 'wait', 'as_dict', 'get_cpu_percent', 'nice', |
| |
1900 'parent', 'get_children', 'pid'] |
| |
1901 attrs = [] |
| |
1902 for name in dir(psutil.Process): |
| |
1903 if name.startswith("_"): |
| |
1904 continue |
| |
1905 if name.startswith("set_"): |
| |
1906 continue |
| |
1907 if name in excluded_names: |
| |
1908 continue |
| |
1909 attrs.append(name) |
| |
1910 |
| |
1911 default = object() |
| |
1912 failures = [] |
| |
1913 for name in attrs: |
| |
1914 for p in psutil.process_iter(): |
| |
1915 ret = default |
| |
1916 try: |
| |
1917 try: |
| |
1918 attr = getattr(p, name, None) |
| |
1919 if attr is not None and callable(attr): |
| |
1920 ret = attr() |
| |
1921 else: |
| |
1922 ret = attr |
| |
1923 valid_procs += 1 |
| |
1924 except NotImplementedError: |
| |
1925 register_warning("%r was skipped because not " |
| |
1926 "implemented" % (self.__class__.__name__ + \ |
| |
1927 '.test_' + name)) |
| |
1928 except (psutil.NoSuchProcess, psutil.AccessDenied): |
| |
1929 err = sys.exc_info()[1] |
| |
1930 if isinstance(err, psutil.NoSuchProcess): |
| |
1931 if psutil.pid_exists(p.pid): |
| |
1932 # XXX race condition; we probably need |
| |
1933 # to try figuring out the process |
| |
1934 # identity before failing |
| |
1935 self.fail("PID still exists but fun raised " \ |
| |
1936 "NoSuchProcess") |
| |
1937 self.assertEqual(err.pid, p.pid) |
| |
1938 if err.name: |
| |
1939 # make sure exception's name attr is set |
| |
1940 # with the actual process name |
| |
1941 self.assertEqual(err.name, p.name) |
| |
1942 self.assertTrue(str(err)) |
| |
1943 self.assertTrue(err.msg) |
| |
1944 else: |
| |
1945 if ret not in (0, 0.0, [], None, ''): |
| |
1946 assert ret, ret |
| |
1947 meth = getattr(self, name) |
| |
1948 meth(ret) |
| |
1949 except Exception: |
| |
1950 err = sys.exc_info()[1] |
| |
1951 s = '\n' + '=' * 70 + '\n' |
| |
1952 s += "FAIL: test_%s (proc=%s" % (name, p) |
| |
1953 if ret != default: |
| |
1954 s += ", ret=%s)" % repr(ret) |
| |
1955 s += ')\n' |
| |
1956 s += '-' * 70 |
| |
1957 s += "\n%s" % traceback.format_exc() |
| |
1958 s = "\n".join((" " * 4) + i for i in s.splitlines()) |
| |
1959 failures.append(s) |
| |
1960 break |
| |
1961 |
| |
1962 if failures: |
| |
1963 self.fail(''.join(failures)) |
| |
1964 |
| |
1965 # we should always have a non-empty list, not including PID 0 etc. |
| |
1966 # special cases. |
| |
1967 self.assertTrue(valid_procs > 0) |
| |
1968 |
| |
1969 def cmdline(self, ret): |
| |
1970 pass |
| |
1971 |
| |
1972 def exe(self, ret): |
| |
1973 if not ret: |
| |
1974 self.assertEqual(ret, '') |
| |
1975 else: |
| |
1976 assert os.path.isabs(ret), ret |
| |
1977 # Note: os.stat() may return False even if the file is there |
| |
1978 # hence we skip the test, see: |
| |
1979 # http://stackoverflow.com/questions/3112546/os-path-exists-lies |
| |
1980 if POSIX: |
| |
1981 assert os.path.isfile(ret), ret |
| |
1982 if hasattr(os, 'access') and hasattr(os, "X_OK"): |
| |
1983 # XXX may fail on OSX |
| |
1984 self.assertTrue(os.access(ret, os.X_OK)) |
| |
1985 |
| |
1986 def ppid(self, ret): |
| |
1987 self.assertTrue(ret >= 0) |
| |
1988 |
| |
1989 def name(self, ret): |
| |
1990 self.assertTrue(isinstance(ret, str)) |
| |
1991 self.assertTrue(ret) |
| |
1992 |
| |
1993 def create_time(self, ret): |
| |
1994 self.assertTrue(ret > 0) |
| |
1995 # this can't be taken for granted on all platforms |
| |
1996 #self.assertGreaterEqual(ret, psutil.BOOT_TIME) |
| |
1997 # make sure returned value can be pretty printed |
| |
1998 # with strftime |
| |
1999 time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret)) |
| |
2000 |
| |
2001 def uids(self, ret): |
| |
2002 for uid in ret: |
| |
2003 self.assertTrue(uid >= 0) |
| |
2004 self.assertIn(uid, self._uids) |
| |
2005 |
| |
2006 def gids(self, ret): |
| |
2007 # note: testing all gids as above seems not to be reliable for |
| |
2008 # gid == 30 (nodoby); not sure why. |
| |
2009 for gid in ret: |
| |
2010 self.assertTrue(gid >= 0) |
| |
2011 #self.assertIn(uid, self.gids) |
| |
2012 |
| |
2013 def username(self, ret): |
| |
2014 self.assertTrue(ret) |
| |
2015 if os.name == 'posix': |
| |
2016 self.assertIn(ret, self._usernames) |
| |
2017 |
| |
2018 def status(self, ret): |
| |
2019 self.assertTrue(ret >= 0) |
| |
2020 self.assertTrue(str(ret) != '?') |
| |
2021 |
| |
2022 def get_io_counters(self, ret): |
| |
2023 for field in ret: |
| |
2024 if field != -1: |
| |
2025 self.assertTrue(field >= 0) |
| |
2026 |
| |
2027 def get_ionice(self, ret): |
| |
2028 if LINUX: |
| |
2029 self.assertTrue(ret.ioclass >= 0) |
| |
2030 self.assertTrue(ret.value >= 0) |
| |
2031 else: |
| |
2032 self.assertTrue(ret >= 0) |
| |
2033 self.assertIn(ret, (0, 1, 2)) |
| |
2034 |
| |
2035 def get_num_threads(self, ret): |
| |
2036 self.assertTrue(ret >= 1) |
| |
2037 |
| |
2038 def get_threads(self, ret): |
| |
2039 for t in ret: |
| |
2040 self.assertTrue(t.id >= 0) |
| |
2041 self.assertTrue(t.user_time >= 0) |
| |
2042 self.assertTrue(t.system_time >= 0) |
| |
2043 |
| |
2044 def get_cpu_times(self, ret): |
| |
2045 self.assertTrue(ret.user >= 0) |
| |
2046 self.assertTrue(ret.system >= 0) |
| |
2047 |
| |
2048 def get_memory_info(self, ret): |
| |
2049 self.assertTrue(ret.rss >= 0) |
| |
2050 self.assertTrue(ret.vms >= 0) |
| |
2051 |
| |
2052 def get_ext_memory_info(self, ret): |
| |
2053 for name in ret._fields: |
| |
2054 self.assertTrue(getattr(ret, name) >= 0) |
| |
2055 if POSIX and ret.vms != 0: |
| |
2056 # VMS is always supposed to be the highest |
| |
2057 for name in ret._fields: |
| |
2058 if name != 'vms': |
| |
2059 value = getattr(ret, name) |
| |
2060 assert ret.vms > value, ret |
| |
2061 elif WINDOWS: |
| |
2062 assert ret.peak_wset >= ret.wset, ret |
| |
2063 assert ret.peak_paged_pool >= ret.paged_pool, ret |
| |
2064 assert ret.peak_nonpaged_pool >= ret.nonpaged_pool, ret |
| |
2065 assert ret.peak_pagefile >= ret.pagefile, ret |
| |
2066 |
| |
2067 def get_open_files(self, ret): |
| |
2068 for f in ret: |
| |
2069 if WINDOWS: |
| |
2070 assert f.fd == -1, f |
| |
2071 else: |
| |
2072 self.assertIsInstance(f.fd, int) |
| |
2073 assert os.path.isabs(f.path), f |
| |
2074 assert os.path.isfile(f.path), f |
| |
2075 |
| |
2076 def get_num_fds(self, ret): |
| |
2077 self.assertTrue(ret >= 0) |
| |
2078 |
| |
2079 def get_connections(self, ret): |
| |
2080 # all values are supposed to match Linux's tcp_states.h states |
| |
2081 # table across all platforms. |
| |
2082 valid_conn_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1", |
| |
2083 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT", |
| |
2084 "LAST_ACK", "LISTEN", "CLOSING", "NONE"] |
| |
2085 if SUNOS: |
| |
2086 valid_conn_states += ["IDLE", "BOUND"] |
| |
2087 if WINDOWS: |
| |
2088 valid_conn_states += ["DELETE_TCB"] |
| |
2089 for conn in ret: |
| |
2090 self.assertIn(conn.type, (socket.SOCK_STREAM, socket.SOCK_DGRAM)) |
| |
2091 self.assertIn(conn.family, (socket.AF_INET, socket.AF_INET6)) |
| |
2092 check_ip_address(conn.laddr, conn.family) |
| |
2093 check_ip_address(conn.raddr, conn.family) |
| |
2094 if conn.status not in valid_conn_states: |
| |
2095 self.fail("%s is not a valid status" % conn.status) |
| |
2096 # actually try to bind the local socket; ignore IPv6 |
| |
2097 # sockets as their address might be represented as |
| |
2098 # an IPv4-mapped-address (e.g. "::127.0.0.1") |
| |
2099 # and that's rejected by bind() |
| |
2100 if conn.family == socket.AF_INET: |
| |
2101 s = socket.socket(conn.family, conn.type) |
| |
2102 s.bind((conn.laddr[0], 0)) |
| |
2103 s.close() |
| |
2104 |
| |
2105 if not WINDOWS and hasattr(socket, 'fromfd'): |
| |
2106 dupsock = None |
| |
2107 try: |
| |
2108 try: |
| |
2109 dupsock = socket.fromfd(conn.fd, conn.family, conn.type) |
| |
2110 except (socket.error, OSError): |
| |
2111 err = sys.exc_info()[1] |
| |
2112 if err.args[0] == errno.EBADF: |
| |
2113 continue |
| |
2114 raise |
| |
2115 # python >= 2.5 |
| |
2116 if hasattr(dupsock, "family"): |
| |
2117 self.assertEqual(dupsock.family, conn.family) |
| |
2118 self.assertEqual(dupsock.type, conn.type) |
| |
2119 finally: |
| |
2120 if dupsock is not None: |
| |
2121 dupsock.close() |
| |
2122 |
| |
2123 def getcwd(self, ret): |
| |
2124 if ret is not None: # BSD may return None |
| |
2125 assert os.path.isabs(ret), ret |
| |
2126 try: |
| |
2127 st = os.stat(ret) |
| |
2128 except OSError: |
| |
2129 err = sys.exc_info()[1] |
| |
2130 # directory has been removed in mean time |
| |
2131 if err.errno != errno.ENOENT: |
| |
2132 raise |
| |
2133 else: |
| |
2134 self.assertTrue(stat.S_ISDIR(st.st_mode)) |
| |
2135 |
| |
2136 def get_memory_percent(self, ret): |
| |
2137 assert 0 <= ret <= 100, ret |
| |
2138 |
| |
2139 def is_running(self, ret): |
| |
2140 self.assertTrue(ret) |
| |
2141 |
| |
2142 def get_cpu_affinity(self, ret): |
| |
2143 assert ret != [], ret |
| |
2144 |
| |
2145 def terminal(self, ret): |
| |
2146 if ret is not None: |
| |
2147 assert os.path.isabs(ret), ret |
| |
2148 assert os.path.exists(ret), ret |
| |
2149 |
| |
2150 def get_memory_maps(self, ret): |
| |
2151 for nt in ret: |
| |
2152 for fname in nt._fields: |
| |
2153 value = getattr(nt, fname) |
| |
2154 if fname == 'path': |
| |
2155 if not value.startswith('['): |
| |
2156 assert os.path.isabs(nt.path), nt.path |
| |
2157 # commented as on Linux we might get '/foo/bar (deleted)' |
| |
2158 #assert os.path.exists(nt.path), nt.path |
| |
2159 elif fname in ('addr', 'perms'): |
| |
2160 self.assertTrue(value) |
| |
2161 else: |
| |
2162 self.assertIsInstance(value, (int, long)) |
| |
2163 assert value >= 0, value |
| |
2164 |
| |
2165 def get_num_handles(self, ret): |
| |
2166 if WINDOWS: |
| |
2167 self.assertGreaterEqual(ret, 0) |
| |
2168 else: |
| |
2169 self.assertGreaterEqual(ret, 0) |
| |
2170 |
| |
2171 def get_nice(self, ret): |
| |
2172 if POSIX: |
| |
2173 assert -20 <= ret <= 20, ret |
| |
2174 else: |
| |
2175 priorities = [getattr(psutil, x) for x in dir(psutil) |
| |
2176 if x.endswith('_PRIORITY_CLASS')] |
| |
2177 self.assertIn(ret, priorities) |
| |
2178 |
| |
2179 def get_num_ctx_switches(self, ret): |
| |
2180 self.assertTrue(ret.voluntary >= 0) |
| |
2181 self.assertTrue(ret.involuntary >= 0) |
| |
2182 |
| |
2183 |
| |
2184 # =================================================================== |
| |
2185 # --- Limited user tests |
| |
2186 # =================================================================== |
| |
2187 |
| |
if hasattr(os, 'getuid') and os.getuid() == 0:
    class LimitedUserTestCase(TestProcess):
        """Repeat the previous tests by using a limited user.
        Executed only on UNIX and only if the user who run the test script
        is root.

        Every test runs with effective uid and gid set to 1000 (see
        setUp()); the original ones are restored by tearDown().
        AccessDenied exceptions raised by the inherited tests are
        ignored.
        """
        # the uid/gid the test suite runs under
        PROCESS_UID = os.getuid()
        PROCESS_GID = os.getgid()

        def __init__(self, *args, **kwargs):
            TestProcess.__init__(self, *args, **kwargs)
            # re-define all existent test methods in order to
            # ignore AccessDenied exceptions
            for attr in [x for x in dir(self) if x.startswith('test')]:
                meth = getattr(self, attr)

                # 'meth' is bound as a default argument so that every
                # wrapper calls its own test method; a plain closure
                # would see only the last value assigned by the loop
                def test_(self, meth=meth):
                    try:
                        meth()
                    except psutil.AccessDenied:
                        pass
                setattr(self, attr, types.MethodType(test_, self))

        def setUp(self):
            # switch the group first, while we are still privileged
            os.setegid(1000)
            os.seteuid(1000)
            TestProcess.setUp(self)

        def tearDown(self):
            # restore the original effective user first, then the
            # original effective group
            os.seteuid(self.PROCESS_UID)
            os.setegid(self.PROCESS_GID)
            TestProcess.tearDown(self)

        def test_nice(self):
            # a limited user is not supposed to be able to lower the
            # nice value
            try:
                psutil.Process(os.getpid()).set_nice(-1)
            except psutil.AccessDenied:
                pass
            else:
                self.fail("exception not raised")
| |
2228 |
| |
2229 |
| |
2230 # =================================================================== |
| |
2231 # --- Example script tests |
| |
2232 # =================================================================== |
| |
2233 |
| |
2234 class TestExampleScripts(unittest.TestCase): |
| |
2235 """Tests for scripts in the examples directory.""" |
| |
2236 |
| |
2237 def assert_stdout(self, exe, args=None): |
| |
2238 exe = os.path.join(EXAMPLES_DIR, exe) |
| |
2239 if args: |
| |
2240 exe = exe + ' ' + args |
| |
2241 try: |
| |
2242 out = sh(sys.executable + ' ' + exe).strip() |
| |
2243 except RuntimeError: |
| |
2244 err = sys.exc_info()[1] |
| |
2245 if 'AccessDenied' in str(err): |
| |
2246 return str(err) |
| |
2247 else: |
| |
2248 raise |
| |
2249 assert out, out |
| |
2250 return out |
| |
2251 |
| |
2252 def assert_syntax(self, exe, args=None): |
| |
2253 exe = os.path.join(EXAMPLES_DIR, exe) |
| |
2254 f = open(exe, 'r') |
| |
2255 try: |
| |
2256 src = f.read() |
| |
2257 finally: |
| |
2258 f.close() |
| |
2259 ast.parse(src) |
| |
2260 |
| |
2261 def test_check_presence(self): |
| |
2262 # make sure all example scripts have a test method defined |
| |
2263 meths = dir(self) |
| |
2264 for name in os.listdir(EXAMPLES_DIR): |
| |
2265 if name.endswith('.py'): |
| |
2266 if 'test_' + os.path.splitext(name)[0] not in meths: |
| |
2267 #self.assert_stdout(name) |
| |
2268 self.fail('no test defined for %r script' \ |
| |
2269 % os.path.join(EXAMPLES_DIR, name)) |
| |
2270 |
| |
2271 def test_disk_usage(self): |
| |
2272 self.assert_stdout('disk_usage.py') |
| |
2273 |
| |
2274 def test_free(self): |
| |
2275 self.assert_stdout('free.py') |
| |
2276 |
| |
2277 def test_meminfo(self): |
| |
2278 self.assert_stdout('meminfo.py') |
| |
2279 |
| |
2280 def test_process_detail(self): |
| |
2281 self.assert_stdout('process_detail.py') |
| |
2282 |
| |
2283 def test_who(self): |
| |
2284 self.assert_stdout('who.py') |
| |
2285 |
| |
2286 def test_netstat(self): |
| |
2287 self.assert_stdout('netstat.py') |
| |
2288 |
| |
2289 def test_pmap(self): |
| |
2290 self.assert_stdout('pmap.py', args=str(os.getpid())) |
| |
2291 |
| |
2292 @unittest.skipIf(ast is None, |
| |
2293 'ast module not available on this python version') |
| |
2294 def test_killall(self): |
| |
2295 self.assert_syntax('killall.py') |
| |
2296 |
| |
2297 @unittest.skipIf(ast is None, |
| |
2298 'ast module not available on this python version') |
| |
2299 def test_nettop(self): |
| |
2300 self.assert_syntax('nettop.py') |
| |
2301 |
| |
2302 @unittest.skipIf(ast is None, |
| |
2303 'ast module not available on this python version') |
| |
2304 def test_top(self): |
| |
2305 self.assert_syntax('top.py') |
| |
2306 |
| |
2307 @unittest.skipIf(ast is None, |
| |
2308 'ast module not available on this python version') |
| |
2309 def test_iotop(self): |
| |
2310 self.assert_syntax('iotop.py') |
| |
2311 |
| |
2312 |
| |
def cleanup():
    """Release the resources used by the test suite.

    Reaps the children processes, closes DEVNULL and removes the test
    file.  Steps are chained with try/finally so that a failure in one
    of them does not prevent the following ones from running.
    """
    try:
        reap_children(search_all=True)
    finally:
        try:
            DEVNULL.close()
        finally:
            safe_remove(TESTFN)

# run cleanup() at interpreter exit and get rid of any test file left
# behind by a previous run
atexit.register(cleanup)
safe_remove(TESTFN)
| |
2320 |
| |
def test_main():
    """Collect all the test cases, run them and return True on success.

    The suite is made of the generic test cases defined in this module,
    the platform specific ones imported from the _posix, _linux,
    _windows, _osx, _bsd and _sunos modules and, if it was defined,
    LimitedUserTestCase.
    """
    tests = []
    test_suite = unittest.TestSuite()
    tests.append(TestSystemAPIs)
    tests.append(TestProcess)
    tests.append(TestFetchAllProcesses)

    if POSIX:
        from _posix import PosixSpecificTestCase
        tests.append(PosixSpecificTestCase)

    # import the specific platform test suite; stc stays None if none
    # of the known platforms matches, instead of being left undefined
    stc = None
    if LINUX:
        from _linux import LinuxSpecificTestCase as stc
    elif WINDOWS:
        from _windows import WindowsSpecificTestCase as stc
        from _windows import TestDualProcessImplementation
        tests.append(TestDualProcessImplementation)
    elif OSX:
        from _osx import OSXSpecificTestCase as stc
    elif BSD:
        from _bsd import BSDSpecificTestCase as stc
    elif SUNOS:
        from _sunos import SunOSSpecificTestCase as stc
    if stc is not None:
        tests.append(stc)

    if hasattr(os, 'getuid'):
        if 'LimitedUserTestCase' in globals():
            tests.append(LimitedUserTestCase)
        else:
            register_warning("LimitedUserTestCase was skipped (super-user "
                             "privileges are required)")

    tests.append(TestExampleScripts)

    # unittest.makeSuite() is deprecated; the default loader provides
    # the same functionality
    loader = unittest.defaultTestLoader
    for test_class in tests:
        test_suite.addTest(loader.loadTestsFromTestCase(test_class))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()

if __name__ == '__main__':
    # exit with a non-zero status if any test failed
    if not test_main():
        sys.exit(1)