|
1 #!/usr/bin/env python |
|
2 |
|
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. |
|
4 # Use of this source code is governed by a BSD-style license that can be |
|
5 # found in the LICENSE file. |
|
6 |
|
7 """ |
|
8 psutil test suite (you can quickly run it with "python setup.py test"). |
|
9 |
|
10 Note: this is targeted for both python 2.x and 3.x so there's no need |
|
11 to use 2to3 tool first. |
|
12 |
|
13 If you're on Python < 2.7 it is recommended to install unittest2 module |
|
14 from: https://pypi.python.org/pypi/unittest2 |
|
15 """ |
|
16 |
|
17 from __future__ import division |
|
18 import os |
|
19 import sys |
|
20 import subprocess |
|
21 import time |
|
22 import signal |
|
23 import types |
|
24 import traceback |
|
25 import socket |
|
26 import warnings |
|
27 import atexit |
|
28 import errno |
|
29 import threading |
|
30 import tempfile |
|
31 import stat |
|
32 import collections |
|
33 import datetime |
|
34 try: |
|
35 import unittest2 as unittest # pyhon < 2.7 + unittest2 installed |
|
36 except ImportError: |
|
37 import unittest |
|
38 try: |
|
39 import ast # python >= 2.6 |
|
40 except ImportError: |
|
41 ast = None |
|
42 |
|
43 import psutil |
|
44 from psutil._compat import PY3, callable, long, wraps |
|
45 |
|
46 |
|
47 # =================================================================== |
|
48 # --- Constants |
|
49 # =================================================================== |
|
50 |
|
51 # conf for retry_before_failing() decorator |
|
52 NO_RETRIES = 10 |
|
53 # bytes tolerance for OS memory related tests |
|
54 TOLERANCE = 500 * 1024 # 500KB |
|
55 |
|
56 PYTHON = os.path.realpath(sys.executable) |
|
57 DEVNULL = open(os.devnull, 'r+') |
|
58 TESTFN = os.path.join(os.getcwd(), "$testfile") |
|
59 EXAMPLES_DIR = os.path.abspath(os.path.join(os.path.dirname( |
|
60 os.path.dirname(__file__)), 'examples')) |
|
61 POSIX = os.name == 'posix' |
|
62 LINUX = sys.platform.startswith("linux") |
|
63 WINDOWS = sys.platform.startswith("win32") |
|
64 OSX = sys.platform.startswith("darwin") |
|
65 BSD = sys.platform.startswith("freebsd") |
|
66 SUNOS = sys.platform.startswith("sunos") |
|
67 |
|
68 |
|
69 # =================================================================== |
|
70 # --- Utility functions |
|
71 # =================================================================== |
|
72 |
|
73 _subprocesses_started = set() |
|
74 |
|
75 def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL, |
|
76 wait=False): |
|
77 """Return a subprocess.Popen object to use in tests. |
|
78 By default stdout and stderr are redirected to /dev/null and the |
|
79 python interpreter is used as test process. |
|
80 If 'wait' is True attemps to make sure the process is in a |
|
81 reasonably initialized state. |
|
82 """ |
|
83 if cmd is None: |
|
84 pyline = "" |
|
85 if wait: |
|
86 pyline += "open(r'%s', 'w'); " % TESTFN |
|
87 pyline += "import time; time.sleep(2);" |
|
88 cmd_ = [PYTHON, "-c", pyline] |
|
89 else: |
|
90 cmd_ = cmd |
|
91 sproc = subprocess.Popen(cmd_, stdout=stdout, stderr=stderr, stdin=stdin) |
|
92 if wait: |
|
93 if cmd is None: |
|
94 stop_at = time.time() + 3 |
|
95 while stop_at > time.time(): |
|
96 if os.path.exists(TESTFN): |
|
97 break |
|
98 time.sleep(0.001) |
|
99 else: |
|
100 warn("couldn't make sure test file was actually created") |
|
101 else: |
|
102 wait_for_pid(sproc.pid) |
|
103 _subprocesses_started.add(sproc.pid) |
|
104 return sproc |
|
105 |
|
106 def warn(msg): |
|
107 """Raise a warning msg.""" |
|
108 warnings.warn(msg, UserWarning) |
|
109 |
|
110 def register_warning(msg): |
|
111 """Register a warning which will be printed on interpreter exit.""" |
|
112 atexit.register(lambda: warn(msg)) |
|
113 |
|
114 def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE): |
|
115 """run cmd in a subprocess and return its output. |
|
116 raises RuntimeError on error. |
|
117 """ |
|
118 p = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr) |
|
119 stdout, stderr = p.communicate() |
|
120 if p.returncode != 0: |
|
121 raise RuntimeError(stderr) |
|
122 if stderr: |
|
123 warn(stderr) |
|
124 if PY3: |
|
125 stdout = str(stdout, sys.stdout.encoding) |
|
126 return stdout.strip() |
|
127 |
|
128 def which(program): |
|
129 """Same as UNIX which command. Return None on command not found.""" |
|
130 def is_exe(fpath): |
|
131 return os.path.isfile(fpath) and os.access(fpath, os.X_OK) |
|
132 |
|
133 fpath, fname = os.path.split(program) |
|
134 if fpath: |
|
135 if is_exe(program): |
|
136 return program |
|
137 else: |
|
138 for path in os.environ["PATH"].split(os.pathsep): |
|
139 exe_file = os.path.join(path, program) |
|
140 if is_exe(exe_file): |
|
141 return exe_file |
|
142 return None |
|
143 |
|
144 def wait_for_pid(pid, timeout=1): |
|
145 """Wait for pid to show up in the process list then return. |
|
146 Used in the test suite to give time the sub process to initialize. |
|
147 """ |
|
148 raise_at = time.time() + timeout |
|
149 while 1: |
|
150 if pid in psutil.get_pid_list(): |
|
151 # give it one more iteration to allow full initialization |
|
152 time.sleep(0.01) |
|
153 return |
|
154 time.sleep(0.0001) |
|
155 if time.time() >= raise_at: |
|
156 raise RuntimeError("Timed out") |
|
157 |
|
158 def reap_children(search_all=False): |
|
159 """Kill any subprocess started by this test suite and ensure that |
|
160 no zombies stick around to hog resources and create problems when |
|
161 looking for refleaks. |
|
162 """ |
|
163 pids = _subprocesses_started |
|
164 if search_all: |
|
165 this_process = psutil.Process(os.getpid()) |
|
166 for p in this_process.get_children(recursive=True): |
|
167 pids.add(p.pid) |
|
168 while pids: |
|
169 pid = pids.pop() |
|
170 try: |
|
171 child = psutil.Process(pid) |
|
172 child.kill() |
|
173 except psutil.NoSuchProcess: |
|
174 pass |
|
175 except psutil.AccessDenied: |
|
176 warn("couldn't kill child process with pid %s" % pid) |
|
177 else: |
|
178 child.wait(timeout=3) |
|
179 |
|
180 def check_ip_address(addr, family): |
|
181 """Attempts to check IP address's validity.""" |
|
182 if not addr: |
|
183 return |
|
184 ip, port = addr |
|
185 assert isinstance(port, int), port |
|
186 if family == socket.AF_INET: |
|
187 ip = list(map(int, ip.split('.'))) |
|
188 assert len(ip) == 4, ip |
|
189 for num in ip: |
|
190 assert 0 <= num <= 255, ip |
|
191 assert 0 <= port <= 65535, port |
|
192 |
|
193 def safe_remove(fname): |
|
194 """Deletes a file and does not exception if it doesn't exist.""" |
|
195 try: |
|
196 os.remove(fname) |
|
197 except OSError: |
|
198 err = sys.exc_info()[1] |
|
199 if err.args[0] != errno.ENOENT: |
|
200 raise |
|
201 |
|
202 def call_until(fun, expr, timeout=1): |
|
203 """Keep calling function for timeout secs and exit if eval() |
|
204 expression is True. |
|
205 """ |
|
206 stop_at = time.time() + timeout |
|
207 while time.time() < stop_at: |
|
208 ret = fun() |
|
209 if eval(expr): |
|
210 return ret |
|
211 time.sleep(0.001) |
|
212 raise RuntimeError('timed out (ret=%r)' % ret) |
|
213 |
|
214 def retry_before_failing(ntimes=None): |
|
215 """Decorator which runs a test function and retries N times before |
|
216 actually failing. |
|
217 """ |
|
218 def decorator(fun): |
|
219 @wraps(fun) |
|
220 def wrapper(*args, **kwargs): |
|
221 for x in range(ntimes or NO_RETRIES): |
|
222 try: |
|
223 return fun(*args, **kwargs) |
|
224 except AssertionError: |
|
225 err = sys.exc_info()[1] |
|
226 raise |
|
227 return wrapper |
|
228 return decorator |
|
229 |
|
230 def skip_on_access_denied(only_if=None): |
|
231 """Decorator to Ignore AccessDenied exceptions.""" |
|
232 def decorator(fun): |
|
233 @wraps(fun) |
|
234 def wrapper(*args, **kwargs): |
|
235 try: |
|
236 return fun(*args, **kwargs) |
|
237 except psutil.AccessDenied: |
|
238 if only_if is not None: |
|
239 if not only_if: |
|
240 raise |
|
241 msg = "%r was skipped because it raised AccessDenied" \ |
|
242 % fun.__name__ |
|
243 self = args[0] |
|
244 if hasattr(self, 'skip'): # python >= 2.7 |
|
245 self.skip(msg) |
|
246 else: |
|
247 register_warning(msg) |
|
248 return wrapper |
|
249 return decorator |
|
250 |
|
251 def skip_on_not_implemented(only_if=None): |
|
252 """Decorator to Ignore NotImplementedError exceptions.""" |
|
253 def decorator(fun): |
|
254 @wraps(fun) |
|
255 def wrapper(*args, **kwargs): |
|
256 try: |
|
257 return fun(*args, **kwargs) |
|
258 except NotImplementedError: |
|
259 if only_if is not None: |
|
260 if not only_if: |
|
261 raise |
|
262 msg = "%r was skipped because it raised NotImplementedError" \ |
|
263 % fun.__name__ |
|
264 self = args[0] |
|
265 if hasattr(self, 'skip'): # python >= 2.7 |
|
266 self.skip(msg) |
|
267 else: |
|
268 register_warning(msg) |
|
269 return wrapper |
|
270 return decorator |
|
271 |
|
272 def supports_ipv6(): |
|
273 """Return True if IPv6 is supported on this platform.""" |
|
274 if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"): |
|
275 return False |
|
276 sock = None |
|
277 try: |
|
278 try: |
|
279 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) |
|
280 sock.bind(("::1", 0)) |
|
281 except (socket.error, socket.gaierror): |
|
282 return False |
|
283 else: |
|
284 return True |
|
285 finally: |
|
286 if sock is not None: |
|
287 sock.close() |
|
288 |
|
289 |
|
290 class ThreadTask(threading.Thread): |
|
291 """A thread object used for running process thread tests.""" |
|
292 |
|
293 def __init__(self): |
|
294 threading.Thread.__init__(self) |
|
295 self._running = False |
|
296 self._interval = None |
|
297 self._flag = threading.Event() |
|
298 |
|
299 def __repr__(self): |
|
300 name = self.__class__.__name__ |
|
301 return '<%s running=%s at %#x>' % (name, self._running, id(self)) |
|
302 |
|
303 def start(self, interval=0.001): |
|
304 """Start thread and keep it running until an explicit |
|
305 stop() request. Polls for shutdown every 'timeout' seconds. |
|
306 """ |
|
307 if self._running: |
|
308 raise ValueError("already started") |
|
309 self._interval = interval |
|
310 threading.Thread.start(self) |
|
311 self._flag.wait() |
|
312 |
|
313 def run(self): |
|
314 self._running = True |
|
315 self._flag.set() |
|
316 while self._running: |
|
317 time.sleep(self._interval) |
|
318 |
|
319 def stop(self): |
|
320 """Stop thread execution and and waits until it is stopped.""" |
|
321 if not self._running: |
|
322 raise ValueError("already stopped") |
|
323 self._running = False |
|
324 self.join() |
|
325 |
|
326 |
|
327 # =================================================================== |
|
328 # --- Support for python < 2.7 in case unittest2 is not installed |
|
329 # =================================================================== |
|
330 |
|
331 if not hasattr(unittest, 'skip'): |
|
332 register_warning("unittest2 module is not installed; a serie of pretty " \ |
|
333 "darn ugly workarounds will be used") |
|
334 |
|
335 class SkipTest(Exception): |
|
336 pass |
|
337 |
|
338 class TestCase(unittest.TestCase): |
|
339 |
|
340 def _safe_repr(self, obj): |
|
341 MAX_LENGTH = 80 |
|
342 try: |
|
343 result = repr(obj) |
|
344 except Exception: |
|
345 result = object.__repr__(obj) |
|
346 if len(result) < MAX_LENGTH: |
|
347 return result |
|
348 return result[:MAX_LENGTH] + ' [truncated]...' |
|
349 |
|
350 def _fail_w_msg(self, a, b, middle, msg): |
|
351 self.fail(msg or '%s %s %s' % (self._safe_repr(a), middle, |
|
352 self._safe_repr(b))) |
|
353 |
|
354 def skip(self, msg): |
|
355 raise SkipTest(msg) |
|
356 |
|
357 def assertIn(self, a, b, msg=None): |
|
358 if a not in b: |
|
359 self._fail_w_msg(a, b, 'not found in', msg) |
|
360 |
|
361 def assertNotIn(self, a, b, msg=None): |
|
362 if a in b: |
|
363 self._fail_w_msg(a, b, 'found in', msg) |
|
364 |
|
365 def assertGreater(self, a, b, msg=None): |
|
366 if not a > b: |
|
367 self._fail_w_msg(a, b, 'not greater than', msg) |
|
368 |
|
369 def assertGreaterEqual(self, a, b, msg=None): |
|
370 if not a >= b: |
|
371 self._fail_w_msg(a, b, 'not greater than or equal to', msg) |
|
372 |
|
373 def assertLess(self, a, b, msg=None): |
|
374 if not a < b: |
|
375 self._fail_w_msg(a, b, 'not less than', msg) |
|
376 |
|
377 def assertLessEqual(self, a, b, msg=None): |
|
378 if not a <= b: |
|
379 self._fail_w_msg(a, b, 'not less or equal to', msg) |
|
380 |
|
381 def assertIsInstance(self, a, b, msg=None): |
|
382 if not isinstance(a, b): |
|
383 self.fail(msg or '%s is not an instance of %r' \ |
|
384 % (self._safe_repr(a), b)) |
|
385 |
|
386 def assertAlmostEqual(self, a, b, msg=None, delta=None): |
|
387 if delta is not None: |
|
388 if abs(a - b) <= delta: |
|
389 return |
|
390 self.fail(msg or '%s != %s within %s delta' \ |
|
391 % (self._safe_repr(a), self._safe_repr(b), |
|
392 self._safe_repr(delta))) |
|
393 else: |
|
394 self.assertEqual(a, b, msg=msg) |
|
395 |
|
396 |
|
397 def skipIf(condition, reason): |
|
398 def decorator(fun): |
|
399 @wraps(fun) |
|
400 def wrapper(*args, **kwargs): |
|
401 self = args[0] |
|
402 if condition: |
|
403 sys.stdout.write("skipped-") |
|
404 sys.stdout.flush() |
|
405 if warn: |
|
406 objname = "%s.%s" % (self.__class__.__name__, |
|
407 fun.__name__) |
|
408 msg = "%s was skipped" % objname |
|
409 if reason: |
|
410 msg += "; reason: " + repr(reason) |
|
411 register_warning(msg) |
|
412 return |
|
413 else: |
|
414 return fun(*args, **kwargs) |
|
415 return wrapper |
|
416 return decorator |
|
417 |
|
418 def skipUnless(condition, reason): |
|
419 if not condition: |
|
420 return unittest.skipIf(True, reason) |
|
421 return unittest.skipIf(False, reason) |
|
422 |
|
423 unittest.TestCase = TestCase |
|
424 unittest.skipIf = skipIf |
|
425 unittest.skipUnless = skipUnless |
|
426 del TestCase, skipIf, skipUnless |
|
427 |
|
428 |
|
429 # =================================================================== |
|
430 # --- System-related API tests |
|
431 # =================================================================== |
|
432 |
|
433 class TestSystemAPIs(unittest.TestCase): |
|
434 """Tests for system-related APIs.""" |
|
435 |
|
436 def setUp(self): |
|
437 safe_remove(TESTFN) |
|
438 |
|
439 def tearDown(self): |
|
440 reap_children() |
|
441 |
|
442 def test_process_iter(self): |
|
443 self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()]) |
|
444 sproc = get_test_subprocess() |
|
445 self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()]) |
|
446 p = psutil.Process(sproc.pid) |
|
447 p.kill() |
|
448 p.wait() |
|
449 self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()]) |
|
450 |
|
451 def test_TOTAL_PHYMEM(self): |
|
452 x = psutil.TOTAL_PHYMEM |
|
453 self.assertIsInstance(x, (int, long)) |
|
454 self.assertGreater(x, 0) |
|
455 self.assertEqual(x, psutil.virtual_memory().total) |
|
456 |
|
457 def test_BOOT_TIME(self, arg=None): |
|
458 x = arg or psutil.BOOT_TIME |
|
459 self.assertIsInstance(x, float) |
|
460 self.assertGreater(x, 0) |
|
461 self.assertLess(x, time.time()) |
|
462 |
|
463 def test_get_boot_time(self): |
|
464 self.test_BOOT_TIME(psutil.get_boot_time()) |
|
465 if WINDOWS: |
|
466 # work around float precision issues; give it 1 secs tolerance |
|
467 diff = abs(psutil.get_boot_time() - psutil.BOOT_TIME) |
|
468 self.assertLess(diff, 1) |
|
469 else: |
|
470 self.assertEqual(psutil.get_boot_time(), psutil.BOOT_TIME) |
|
471 |
|
472 def test_NUM_CPUS(self): |
|
473 self.assertEqual(psutil.NUM_CPUS, len(psutil.cpu_times(percpu=True))) |
|
474 self.assertGreaterEqual(psutil.NUM_CPUS, 1) |
|
475 |
|
476 @unittest.skipUnless(POSIX, 'posix only') |
|
477 def test_PAGESIZE(self): |
|
478 # pagesize is used internally to perform different calculations |
|
479 # and it's determined by using SC_PAGE_SIZE; make sure |
|
480 # getpagesize() returns the same value. |
|
481 import resource |
|
482 self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize()) |
|
483 |
|
484 def test_deprecated_apis(self): |
|
485 s = socket.socket() |
|
486 s.bind(('localhost', 0)) |
|
487 s.listen(1) |
|
488 warnings.filterwarnings("error") |
|
489 p = psutil.Process(os.getpid()) |
|
490 try: |
|
491 self.assertRaises(DeprecationWarning, psutil.virtmem_usage) |
|
492 self.assertRaises(DeprecationWarning, psutil.used_phymem) |
|
493 self.assertRaises(DeprecationWarning, psutil.avail_phymem) |
|
494 self.assertRaises(DeprecationWarning, psutil.total_virtmem) |
|
495 self.assertRaises(DeprecationWarning, psutil.used_virtmem) |
|
496 self.assertRaises(DeprecationWarning, psutil.avail_virtmem) |
|
497 self.assertRaises(DeprecationWarning, psutil.phymem_usage) |
|
498 self.assertRaises(DeprecationWarning, psutil.get_process_list) |
|
499 self.assertRaises(DeprecationWarning, psutil.network_io_counters) |
|
500 if LINUX: |
|
501 self.assertRaises(DeprecationWarning, psutil.phymem_buffers) |
|
502 self.assertRaises(DeprecationWarning, psutil.cached_phymem) |
|
503 try: |
|
504 p.nice |
|
505 except DeprecationWarning: |
|
506 pass |
|
507 else: |
|
508 self.fail("p.nice didn't raise DeprecationWarning") |
|
509 ret = call_until(p.get_connections, "len(ret) != 0", timeout=1) |
|
510 self.assertRaises(DeprecationWarning, |
|
511 getattr, ret[0], 'local_address') |
|
512 self.assertRaises(DeprecationWarning, |
|
513 getattr, ret[0], 'remote_address') |
|
514 finally: |
|
515 s.close() |
|
516 warnings.resetwarnings() |
|
517 |
|
518 def test_deprecated_apis_retval(self): |
|
519 warnings.filterwarnings("ignore") |
|
520 p = psutil.Process(os.getpid()) |
|
521 try: |
|
522 self.assertEqual(psutil.total_virtmem(), psutil.swap_memory().total) |
|
523 self.assertEqual(p.nice, p.get_nice()) |
|
524 finally: |
|
525 warnings.resetwarnings() |
|
526 |
|
527 def test_virtual_memory(self): |
|
528 mem = psutil.virtual_memory() |
|
529 assert mem.total > 0, mem |
|
530 assert mem.available > 0, mem |
|
531 assert 0 <= mem.percent <= 100, mem |
|
532 assert mem.used > 0, mem |
|
533 assert mem.free >= 0, mem |
|
534 for name in mem._fields: |
|
535 if name != 'total': |
|
536 value = getattr(mem, name) |
|
537 if not value >= 0: |
|
538 self.fail("%r < 0 (%s)" % (name, value)) |
|
539 if value > mem.total: |
|
540 self.fail("%r > total (total=%s, %s=%s)" \ |
|
541 % (name, mem.total, name, value)) |
|
542 |
|
543 def test_swap_memory(self): |
|
544 mem = psutil.swap_memory() |
|
545 assert mem.total >= 0, mem |
|
546 assert mem.used >= 0, mem |
|
547 assert mem.free > 0, mem |
|
548 assert 0 <= mem.percent <= 100, mem |
|
549 assert mem.sin >= 0, mem |
|
550 assert mem.sout >= 0, mem |
|
551 |
|
552 def test_pid_exists(self): |
|
553 sproc = get_test_subprocess(wait=True) |
|
554 assert psutil.pid_exists(sproc.pid) |
|
555 p = psutil.Process(sproc.pid) |
|
556 p.kill() |
|
557 p.wait() |
|
558 self.assertFalse(psutil.pid_exists(sproc.pid)) |
|
559 self.assertFalse(psutil.pid_exists(-1)) |
|
560 |
|
561 def test_pid_exists_2(self): |
|
562 reap_children() |
|
563 pids = psutil.get_pid_list() |
|
564 for pid in pids: |
|
565 try: |
|
566 assert psutil.pid_exists(pid) |
|
567 except AssertionError: |
|
568 # in case the process disappeared in meantime fail only |
|
569 # if it is no longer in get_pid_list() |
|
570 time.sleep(.1) |
|
571 if pid in psutil.get_pid_list(): |
|
572 self.fail(pid) |
|
573 pids = range(max(pids) + 5000, max(pids) + 6000) |
|
574 for pid in pids: |
|
575 self.assertFalse(psutil.pid_exists(pid)) |
|
576 |
|
577 def test_get_pid_list(self): |
|
578 plist = [x.pid for x in psutil.process_iter()] |
|
579 pidlist = psutil.get_pid_list() |
|
580 self.assertEqual(plist.sort(), pidlist.sort()) |
|
581 # make sure every pid is unique |
|
582 self.assertEqual(len(pidlist), len(set(pidlist))) |
|
583 |
|
584 def test_test(self): |
|
585 # test for psutil.test() function |
|
586 stdout = sys.stdout |
|
587 sys.stdout = DEVNULL |
|
588 try: |
|
589 psutil.test() |
|
590 finally: |
|
591 sys.stdout = stdout |
|
592 |
|
593 def test_sys_cpu_times(self): |
|
594 total = 0 |
|
595 times = psutil.cpu_times() |
|
596 sum(times) |
|
597 for cp_time in times: |
|
598 self.assertIsInstance(cp_time, float) |
|
599 self.assertGreaterEqual(cp_time, 0.0) |
|
600 total += cp_time |
|
601 self.assertEqual(total, sum(times)) |
|
602 str(times) |
|
603 |
|
604 def test_sys_cpu_times2(self): |
|
605 t1 = sum(psutil.cpu_times()) |
|
606 time.sleep(0.1) |
|
607 t2 = sum(psutil.cpu_times()) |
|
608 difference = t2 - t1 |
|
609 if not difference >= 0.05: |
|
610 self.fail("difference %s" % difference) |
|
611 |
|
612 def test_sys_per_cpu_times(self): |
|
613 for times in psutil.cpu_times(percpu=True): |
|
614 total = 0 |
|
615 sum(times) |
|
616 for cp_time in times: |
|
617 self.assertIsInstance(cp_time, float) |
|
618 self.assertGreaterEqual(cp_time, 0.0) |
|
619 total += cp_time |
|
620 self.assertEqual(total, sum(times)) |
|
621 str(times) |
|
622 self.assertEqual(len(psutil.cpu_times(percpu=True)[0]), |
|
623 len(psutil.cpu_times(percpu=False))) |
|
624 |
|
625 def test_sys_per_cpu_times2(self): |
|
626 tot1 = psutil.cpu_times(percpu=True) |
|
627 stop_at = time.time() + 0.1 |
|
628 while 1: |
|
629 if time.time() >= stop_at: |
|
630 break |
|
631 tot2 = psutil.cpu_times(percpu=True) |
|
632 for t1, t2 in zip(tot1, tot2): |
|
633 t1, t2 = sum(t1), sum(t2) |
|
634 difference = t2 - t1 |
|
635 if difference >= 0.05: |
|
636 return |
|
637 self.fail() |
|
638 |
|
639 def _test_cpu_percent(self, percent): |
|
640 self.assertIsInstance(percent, float) |
|
641 self.assertGreaterEqual(percent, 0.0) |
|
642 self.assertLessEqual(percent, 100.0) |
|
643 |
|
644 def test_sys_cpu_percent(self): |
|
645 psutil.cpu_percent(interval=0.001) |
|
646 for x in range(1000): |
|
647 self._test_cpu_percent(psutil.cpu_percent(interval=None)) |
|
648 |
|
649 def test_sys_per_cpu_percent(self): |
|
650 self.assertEqual(len(psutil.cpu_percent(interval=0.001, percpu=True)), |
|
651 psutil.NUM_CPUS) |
|
652 for x in range(1000): |
|
653 percents = psutil.cpu_percent(interval=None, percpu=True) |
|
654 for percent in percents: |
|
655 self._test_cpu_percent(percent) |
|
656 |
|
657 def test_sys_cpu_times_percent(self): |
|
658 psutil.cpu_times_percent(interval=0.001) |
|
659 for x in range(1000): |
|
660 cpu = psutil.cpu_times_percent(interval=None) |
|
661 for percent in cpu: |
|
662 self._test_cpu_percent(percent) |
|
663 self._test_cpu_percent(sum(cpu)) |
|
664 |
|
665 def test_sys_per_cpu_times_percent(self): |
|
666 self.assertEqual(len(psutil.cpu_times_percent(interval=0.001, |
|
667 percpu=True)), |
|
668 psutil.NUM_CPUS) |
|
669 for x in range(1000): |
|
670 cpus = psutil.cpu_times_percent(interval=None, percpu=True) |
|
671 for cpu in cpus: |
|
672 for percent in cpu: |
|
673 self._test_cpu_percent(percent) |
|
674 self._test_cpu_percent(sum(cpu)) |
|
675 |
|
676 @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), |
|
677 "os.statvfs() function not available on this platform") |
|
678 def test_disk_usage(self): |
|
679 usage = psutil.disk_usage(os.getcwd()) |
|
680 assert usage.total > 0, usage |
|
681 assert usage.used > 0, usage |
|
682 assert usage.free > 0, usage |
|
683 assert usage.total > usage.used, usage |
|
684 assert usage.total > usage.free, usage |
|
685 assert 0 <= usage.percent <= 100, usage.percent |
|
686 |
|
687 # if path does not exist OSError ENOENT is expected across |
|
688 # all platforms |
|
689 fname = tempfile.mktemp() |
|
690 try: |
|
691 psutil.disk_usage(fname) |
|
692 except OSError: |
|
693 err = sys.exc_info()[1] |
|
694 if err.args[0] != errno.ENOENT: |
|
695 raise |
|
696 else: |
|
697 self.fail("OSError not raised") |
|
698 |
|
699 @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), |
|
700 "os.statvfs() function not available on this platform") |
|
701 def test_disk_partitions(self): |
|
702 # all = False |
|
703 for disk in psutil.disk_partitions(all=False): |
|
704 if WINDOWS and 'cdrom' in disk.opts: |
|
705 continue |
|
706 if not POSIX: |
|
707 assert os.path.exists(disk.device), disk |
|
708 else: |
|
709 # we cannot make any assumption about this, see: |
|
710 # http://goo.gl/p9c43 |
|
711 disk.device |
|
712 if SUNOS: |
|
713 # on solaris apparently mount points can also be files |
|
714 assert os.path.exists(disk.mountpoint), disk |
|
715 else: |
|
716 assert os.path.isdir(disk.mountpoint), disk |
|
717 assert disk.fstype, disk |
|
718 self.assertIsInstance(disk.opts, str) |
|
719 |
|
720 # all = True |
|
721 for disk in psutil.disk_partitions(all=True): |
|
722 if not WINDOWS: |
|
723 try: |
|
724 os.stat(disk.mountpoint) |
|
725 except OSError: |
|
726 # http://mail.python.org/pipermail/python-dev/2012-June/120787.html |
|
727 err = sys.exc_info()[1] |
|
728 if err.errno not in (errno.EPERM, errno.EACCES): |
|
729 raise |
|
730 else: |
|
731 if SUNOS: |
|
732 # on solaris apparently mount points can also be files |
|
733 assert os.path.exists(disk.mountpoint), disk |
|
734 else: |
|
735 assert os.path.isdir(disk.mountpoint), disk |
|
736 self.assertIsInstance(disk.fstype, str) |
|
737 self.assertIsInstance(disk.opts, str) |
|
738 |
|
739 def find_mount_point(path): |
|
740 path = os.path.abspath(path) |
|
741 while not os.path.ismount(path): |
|
742 path = os.path.dirname(path) |
|
743 return path |
|
744 |
|
745 mount = find_mount_point(__file__) |
|
746 mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)] |
|
747 self.assertIn(mount, mounts) |
|
748 psutil.disk_usage(mount) |
|
749 |
|
750 def test_net_io_counters(self): |
|
751 def check_ntuple(nt): |
|
752 self.assertEqual(nt[0], nt.bytes_sent) |
|
753 self.assertEqual(nt[1], nt.bytes_recv) |
|
754 self.assertEqual(nt[2], nt.packets_sent) |
|
755 self.assertEqual(nt[3], nt.packets_recv) |
|
756 self.assertEqual(nt[4], nt.errin) |
|
757 self.assertEqual(nt[5], nt.errout) |
|
758 self.assertEqual(nt[6], nt.dropin) |
|
759 self.assertEqual(nt[7], nt.dropout) |
|
760 assert nt.bytes_sent >= 0, nt |
|
761 assert nt.bytes_recv >= 0, nt |
|
762 assert nt.packets_sent >= 0, nt |
|
763 assert nt.packets_recv >= 0, nt |
|
764 assert nt.errin >= 0, nt |
|
765 assert nt.errout >= 0, nt |
|
766 assert nt.dropin >= 0, nt |
|
767 assert nt.dropout >= 0, nt |
|
768 |
|
769 ret = psutil.net_io_counters(pernic=False) |
|
770 check_ntuple(ret) |
|
771 ret = psutil.net_io_counters(pernic=True) |
|
772 assert ret != [] |
|
773 for key in ret: |
|
774 assert key |
|
775 check_ntuple(ret[key]) |
|
776 |
|
777 def test_disk_io_counters(self): |
|
778 def check_ntuple(nt): |
|
779 self.assertEqual(nt[0], nt.read_count) |
|
780 self.assertEqual(nt[1], nt.write_count) |
|
781 self.assertEqual(nt[2], nt.read_bytes) |
|
782 self.assertEqual(nt[3], nt.write_bytes) |
|
783 self.assertEqual(nt[4], nt.read_time) |
|
784 self.assertEqual(nt[5], nt.write_time) |
|
785 assert nt.read_count >= 0, nt |
|
786 assert nt.write_count >= 0, nt |
|
787 assert nt.read_bytes >= 0, nt |
|
788 assert nt.write_bytes >= 0, nt |
|
789 assert nt.read_time >= 0, nt |
|
790 assert nt.write_time >= 0, nt |
|
791 |
|
792 ret = psutil.disk_io_counters(perdisk=False) |
|
793 check_ntuple(ret) |
|
794 ret = psutil.disk_io_counters(perdisk=True) |
|
795 # make sure there are no duplicates |
|
796 self.assertEqual(len(ret), len(set(ret))) |
|
797 for key in ret: |
|
798 assert key, key |
|
799 check_ntuple(ret[key]) |
|
800 if LINUX and key[-1].isdigit(): |
|
801 # if 'sda1' is listed 'sda' shouldn't, see: |
|
802 # http://code.google.com/p/psutil/issues/detail?id=338 |
|
803 while key[-1].isdigit(): |
|
804 key = key[:-1] |
|
805 self.assertNotIn(key, ret.keys()) |
|
806 |
|
807 def test_get_users(self): |
|
808 users = psutil.get_users() |
|
809 assert users |
|
810 for user in users: |
|
811 assert user.name, user |
|
812 user.terminal |
|
813 user.host |
|
814 assert user.started > 0.0, user |
|
815 datetime.datetime.fromtimestamp(user.started) |
|
816 |
|
817 |
|
818 # =================================================================== |
|
819 # --- psutil.Process class tests |
|
820 # =================================================================== |
|
821 |
|
822 class TestProcess(unittest.TestCase): |
|
823 """Tests for psutil.Process class.""" |
|
824 |
|
825 def setUp(self): |
|
826 safe_remove(TESTFN) |
|
827 |
|
828 def tearDown(self): |
|
829 reap_children() |
|
830 |
|
831 def test_kill(self): |
|
832 sproc = get_test_subprocess(wait=True) |
|
833 test_pid = sproc.pid |
|
834 p = psutil.Process(test_pid) |
|
835 name = p.name |
|
836 p.kill() |
|
837 p.wait() |
|
838 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
|
839 |
|
840 def test_terminate(self): |
|
841 sproc = get_test_subprocess(wait=True) |
|
842 test_pid = sproc.pid |
|
843 p = psutil.Process(test_pid) |
|
844 name = p.name |
|
845 p.terminate() |
|
846 p.wait() |
|
847 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
|
848 |
|
849 def test_send_signal(self): |
|
850 if POSIX: |
|
851 sig = signal.SIGKILL |
|
852 else: |
|
853 sig = signal.SIGTERM |
|
854 sproc = get_test_subprocess() |
|
855 test_pid = sproc.pid |
|
856 p = psutil.Process(test_pid) |
|
857 name = p.name |
|
858 p.send_signal(sig) |
|
859 p.wait() |
|
860 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
|
861 |
|
862 def test_wait(self): |
|
863 # check exit code signal |
|
864 sproc = get_test_subprocess() |
|
865 p = psutil.Process(sproc.pid) |
|
866 p.kill() |
|
867 code = p.wait() |
|
868 if os.name == 'posix': |
|
869 self.assertEqual(code, signal.SIGKILL) |
|
870 else: |
|
871 self.assertEqual(code, 0) |
|
872 self.assertFalse(p.is_running()) |
|
873 |
|
874 sproc = get_test_subprocess() |
|
875 p = psutil.Process(sproc.pid) |
|
876 p.terminate() |
|
877 code = p.wait() |
|
878 if os.name == 'posix': |
|
879 self.assertEqual(code, signal.SIGTERM) |
|
880 else: |
|
881 self.assertEqual(code, 0) |
|
882 self.assertFalse(p.is_running()) |
|
883 |
|
884 # check sys.exit() code |
|
885 code = "import time, sys; time.sleep(0.01); sys.exit(5);" |
|
886 sproc = get_test_subprocess([PYTHON, "-c", code]) |
|
887 p = psutil.Process(sproc.pid) |
|
888 self.assertEqual(p.wait(), 5) |
|
889 self.assertFalse(p.is_running()) |
|
890 |
|
891 # Test wait() issued twice. |
|
892 # It is not supposed to raise NSP when the process is gone. |
|
893 # On UNIX this should return None, on Windows it should keep |
|
894 # returning the exit code. |
|
895 sproc = get_test_subprocess([PYTHON, "-c", code]) |
|
896 p = psutil.Process(sproc.pid) |
|
897 self.assertEqual(p.wait(), 5) |
|
898 self.assertIn(p.wait(), (5, None)) |
|
899 |
|
900 # test timeout |
|
901 sproc = get_test_subprocess() |
|
902 p = psutil.Process(sproc.pid) |
|
903 p.name |
|
904 self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01) |
|
905 |
|
906 # timeout < 0 not allowed |
|
907 self.assertRaises(ValueError, p.wait, -1) |
|
908 |
|
909 @unittest.skipUnless(POSIX, '') # XXX why is this skipped on Windows? |
|
910 def test_wait_non_children(self): |
|
911 # test wait() against processes which are not our children |
|
912 code = "import sys;" |
|
913 code += "from subprocess import Popen, PIPE;" |
|
914 code += "cmd = ['%s', '-c', 'import time; time.sleep(2)'];" %PYTHON |
|
915 code += "sp = Popen(cmd, stdout=PIPE);" |
|
916 code += "sys.stdout.write(str(sp.pid));" |
|
917 sproc = get_test_subprocess([PYTHON, "-c", code], stdout=subprocess.PIPE) |
|
918 |
|
919 grandson_pid = int(sproc.stdout.read()) |
|
920 grandson_proc = psutil.Process(grandson_pid) |
|
921 try: |
|
922 self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01) |
|
923 grandson_proc.kill() |
|
924 ret = grandson_proc.wait() |
|
925 self.assertEqual(ret, None) |
|
926 finally: |
|
927 if grandson_proc.is_running(): |
|
928 grandson_proc.kill() |
|
929 grandson_proc.wait() |
|
930 |
|
931 def test_wait_timeout_0(self): |
|
932 sproc = get_test_subprocess() |
|
933 p = psutil.Process(sproc.pid) |
|
934 self.assertRaises(psutil.TimeoutExpired, p.wait, 0) |
|
935 p.kill() |
|
936 stop_at = time.time() + 2 |
|
937 while 1: |
|
938 try: |
|
939 code = p.wait(0) |
|
940 except psutil.TimeoutExpired: |
|
941 if time.time() >= stop_at: |
|
942 raise |
|
943 else: |
|
944 break |
|
945 if os.name == 'posix': |
|
946 self.assertEqual(code, signal.SIGKILL) |
|
947 else: |
|
948 self.assertEqual(code, 0) |
|
949 self.assertFalse(p.is_running()) |
|
950 |
|
951 def test_cpu_percent(self): |
|
952 p = psutil.Process(os.getpid()) |
|
953 p.get_cpu_percent(interval=0.001) |
|
954 p.get_cpu_percent(interval=0.001) |
|
955 for x in range(100): |
|
956 percent = p.get_cpu_percent(interval=None) |
|
957 self.assertIsInstance(percent, float) |
|
958 self.assertGreaterEqual(percent, 0.0) |
|
959 if os.name != 'posix': |
|
960 self.assertLessEqual(percent, 100.0) |
|
961 else: |
|
962 self.assertGreaterEqual(percent, 0.0) |
|
963 |
|
964 def test_cpu_times(self): |
|
965 times = psutil.Process(os.getpid()).get_cpu_times() |
|
966 assert (times.user > 0.0) or (times.system > 0.0), times |
|
967 # make sure returned values can be pretty printed with strftime |
|
968 time.strftime("%H:%M:%S", time.localtime(times.user)) |
|
969 time.strftime("%H:%M:%S", time.localtime(times.system)) |
|
970 |
|
971 # Test Process.cpu_times() against os.times() |
|
972 # os.times() is broken on Python 2.6 |
|
973 # http://bugs.python.org/issue1040026 |
|
974 # XXX fails on OSX: not sure if it's for os.times(). We should |
|
975 # try this with Python 2.7 and re-enable the test. |
|
976 |
|
977 @unittest.skipUnless(sys.version_info > (2, 6, 1) and not OSX, |
|
978 'os.times() is not reliable on this Python version') |
|
979 def test_cpu_times2(self): |
|
980 user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times() |
|
981 utime, ktime = os.times()[:2] |
|
982 |
|
983 # Use os.times()[:2] as base values to compare our results |
|
984 # using a tolerance of +/- 0.1 seconds. |
|
985 # It will fail if the difference between the values is > 0.1s. |
|
986 if (max([user_time, utime]) - min([user_time, utime])) > 0.1: |
|
987 self.fail("expected: %s, found: %s" %(utime, user_time)) |
|
988 |
|
989 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: |
|
990 self.fail("expected: %s, found: %s" %(ktime, kernel_time)) |
|
991 |
|
992 def test_create_time(self): |
|
993 sproc = get_test_subprocess(wait=True) |
|
994 now = time.time() |
|
995 p = psutil.Process(sproc.pid) |
|
996 create_time = p.create_time |
|
997 |
|
998 # Use time.time() as base value to compare our result using a |
|
999 # tolerance of +/- 1 second. |
|
1000 # It will fail if the difference between the values is > 2s. |
|
1001 difference = abs(create_time - now) |
|
1002 if difference > 2: |
|
1003 self.fail("expected: %s, found: %s, difference: %s" |
|
1004 % (now, create_time, difference)) |
|
1005 |
|
1006 # make sure returned value can be pretty printed with strftime |
|
1007 time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time)) |
|
1008 |
|
1009 @unittest.skipIf(WINDOWS, 'windows only') |
|
1010 def test_terminal(self): |
|
1011 terminal = psutil.Process(os.getpid()).terminal |
|
1012 if sys.stdin.isatty(): |
|
1013 self.assertEqual(terminal, sh('tty')) |
|
1014 else: |
|
1015 assert terminal, repr(terminal) |
|
1016 |
|
1017 @unittest.skipIf(not hasattr(psutil.Process, 'get_io_counters'), |
|
1018 'not available on this platform') |
|
1019 @skip_on_not_implemented(only_if=LINUX) |
|
1020 def test_get_io_counters(self): |
|
1021 p = psutil.Process(os.getpid()) |
|
1022 # test reads |
|
1023 io1 = p.get_io_counters() |
|
1024 f = open(PYTHON, 'rb') |
|
1025 f.read() |
|
1026 f.close() |
|
1027 io2 = p.get_io_counters() |
|
1028 if not BSD: |
|
1029 assert io2.read_count > io1.read_count, (io1, io2) |
|
1030 self.assertEqual(io2.write_count, io1.write_count) |
|
1031 assert io2.read_bytes >= io1.read_bytes, (io1, io2) |
|
1032 assert io2.write_bytes >= io1.write_bytes, (io1, io2) |
|
1033 # test writes |
|
1034 io1 = p.get_io_counters() |
|
1035 f = tempfile.TemporaryFile() |
|
1036 if PY3: |
|
1037 f.write(bytes("x" * 1000000, 'ascii')) |
|
1038 else: |
|
1039 f.write("x" * 1000000) |
|
1040 f.close() |
|
1041 io2 = p.get_io_counters() |
|
1042 assert io2.write_count >= io1.write_count, (io1, io2) |
|
1043 assert io2.write_bytes >= io1.write_bytes, (io1, io2) |
|
1044 assert io2.read_count >= io1.read_count, (io1, io2) |
|
1045 assert io2.read_bytes >= io1.read_bytes, (io1, io2) |
|
1046 |
|
1047 # Linux and Windows Vista+ |
|
1048 @unittest.skipUnless(hasattr(psutil.Process, 'get_ionice'), |
|
1049 'Linux and Windows Vista only') |
|
1050 def test_get_set_ionice(self): |
|
1051 if LINUX: |
|
1052 from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, |
|
1053 IOPRIO_CLASS_BE, IOPRIO_CLASS_IDLE) |
|
1054 self.assertEqual(IOPRIO_CLASS_NONE, 0) |
|
1055 self.assertEqual(IOPRIO_CLASS_RT, 1) |
|
1056 self.assertEqual(IOPRIO_CLASS_BE, 2) |
|
1057 self.assertEqual(IOPRIO_CLASS_IDLE, 3) |
|
1058 p = psutil.Process(os.getpid()) |
|
1059 try: |
|
1060 p.set_ionice(2) |
|
1061 ioclass, value = p.get_ionice() |
|
1062 self.assertEqual(ioclass, 2) |
|
1063 self.assertEqual(value, 4) |
|
1064 # |
|
1065 p.set_ionice(3) |
|
1066 ioclass, value = p.get_ionice() |
|
1067 self.assertEqual(ioclass, 3) |
|
1068 self.assertEqual(value, 0) |
|
1069 # |
|
1070 p.set_ionice(2, 0) |
|
1071 ioclass, value = p.get_ionice() |
|
1072 self.assertEqual(ioclass, 2) |
|
1073 self.assertEqual(value, 0) |
|
1074 p.set_ionice(2, 7) |
|
1075 ioclass, value = p.get_ionice() |
|
1076 self.assertEqual(ioclass, 2) |
|
1077 self.assertEqual(value, 7) |
|
1078 self.assertRaises(ValueError, p.set_ionice, 2, 10) |
|
1079 finally: |
|
1080 p.set_ionice(IOPRIO_CLASS_NONE) |
|
1081 else: |
|
1082 p = psutil.Process(os.getpid()) |
|
1083 original = p.get_ionice() |
|
1084 try: |
|
1085 value = 0 # very low |
|
1086 if original == value: |
|
1087 value = 1 # low |
|
1088 p.set_ionice(value) |
|
1089 self.assertEqual(p.get_ionice(), value) |
|
1090 finally: |
|
1091 p.set_ionice(original) |
|
1092 # |
|
1093 self.assertRaises(ValueError, p.set_ionice, 3) |
|
1094 self.assertRaises(TypeError, p.set_ionice, 2, 1) |
|
1095 |
|
1096 def test_get_num_threads(self): |
|
1097 # on certain platforms such as Linux we might test for exact |
|
1098 # thread number, since we always have with 1 thread per process, |
|
1099 # but this does not apply across all platforms (OSX, Windows) |
|
1100 p = psutil.Process(os.getpid()) |
|
1101 step1 = p.get_num_threads() |
|
1102 |
|
1103 thread = ThreadTask() |
|
1104 thread.start() |
|
1105 try: |
|
1106 step2 = p.get_num_threads() |
|
1107 self.assertEqual(step2, step1 + 1) |
|
1108 thread.stop() |
|
1109 finally: |
|
1110 if thread._running: |
|
1111 thread.stop() |
|
1112 |
|
1113 @unittest.skipUnless(WINDOWS, 'Windows only') |
|
1114 def test_get_num_handles(self): |
|
1115 # a better test is done later into test/_windows.py |
|
1116 p = psutil.Process(os.getpid()) |
|
1117 self.assertGreater(p.get_num_handles(), 0) |
|
1118 |
|
1119 def test_get_threads(self): |
|
1120 p = psutil.Process(os.getpid()) |
|
1121 step1 = p.get_threads() |
|
1122 |
|
1123 thread = ThreadTask() |
|
1124 thread.start() |
|
1125 |
|
1126 try: |
|
1127 step2 = p.get_threads() |
|
1128 self.assertEqual(len(step2), len(step1) + 1) |
|
1129 # on Linux, first thread id is supposed to be this process |
|
1130 if LINUX: |
|
1131 self.assertEqual(step2[0].id, os.getpid()) |
|
1132 athread = step2[0] |
|
1133 # test named tuple |
|
1134 self.assertEqual(athread.id, athread[0]) |
|
1135 self.assertEqual(athread.user_time, athread[1]) |
|
1136 self.assertEqual(athread.system_time, athread[2]) |
|
1137 # test num threads |
|
1138 thread.stop() |
|
1139 finally: |
|
1140 if thread._running: |
|
1141 thread.stop() |
|
1142 |
|
1143 def test_get_memory_info(self): |
|
1144 p = psutil.Process(os.getpid()) |
|
1145 |
|
1146 # step 1 - get a base value to compare our results |
|
1147 rss1, vms1 = p.get_memory_info() |
|
1148 percent1 = p.get_memory_percent() |
|
1149 self.assertGreater(rss1, 0) |
|
1150 self.assertGreater(vms1, 0) |
|
1151 |
|
1152 # step 2 - allocate some memory |
|
1153 memarr = [None] * 1500000 |
|
1154 |
|
1155 rss2, vms2 = p.get_memory_info() |
|
1156 percent2 = p.get_memory_percent() |
|
1157 # make sure that the memory usage bumped up |
|
1158 self.assertGreater(rss2, rss1) |
|
1159 self.assertGreaterEqual(vms2, vms1) # vms might be equal |
|
1160 self.assertGreater(percent2, percent1) |
|
1161 del memarr |
|
1162 |
|
1163 # def test_get_ext_memory_info(self): |
|
1164 # # tested later in fetch all test suite |
|
1165 |
|
1166 def test_get_memory_maps(self): |
|
1167 p = psutil.Process(os.getpid()) |
|
1168 maps = p.get_memory_maps() |
|
1169 paths = [x for x in maps] |
|
1170 self.assertEqual(len(paths), len(set(paths))) |
|
1171 ext_maps = p.get_memory_maps(grouped=False) |
|
1172 |
|
1173 for nt in maps: |
|
1174 if not nt.path.startswith('['): |
|
1175 assert os.path.isabs(nt.path), nt.path |
|
1176 if POSIX: |
|
1177 assert os.path.exists(nt.path), nt.path |
|
1178 else: |
|
1179 # XXX - On Windows we have this strange behavior with |
|
1180 # 64 bit dlls: they are visible via explorer but cannot |
|
1181 # be accessed via os.stat() (wtf?). |
|
1182 if '64' not in os.path.basename(nt.path): |
|
1183 assert os.path.exists(nt.path), nt.path |
|
1184 for nt in ext_maps: |
|
1185 for fname in nt._fields: |
|
1186 value = getattr(nt, fname) |
|
1187 if fname == 'path': |
|
1188 continue |
|
1189 elif fname in ('addr', 'perms'): |
|
1190 assert value, value |
|
1191 else: |
|
1192 self.assertIsInstance(value, (int, long)) |
|
1193 assert value >= 0, value |
|
1194 |
|
1195 def test_get_memory_percent(self): |
|
1196 p = psutil.Process(os.getpid()) |
|
1197 self.assertGreater(p.get_memory_percent(), 0.0) |
|
1198 |
|
1199 def test_pid(self): |
|
1200 sproc = get_test_subprocess() |
|
1201 self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) |
|
1202 |
|
1203 def test_is_running(self): |
|
1204 sproc = get_test_subprocess(wait=True) |
|
1205 p = psutil.Process(sproc.pid) |
|
1206 assert p.is_running() |
|
1207 assert p.is_running() |
|
1208 p.kill() |
|
1209 p.wait() |
|
1210 assert not p.is_running() |
|
1211 assert not p.is_running() |
|
1212 |
|
1213 def test_exe(self): |
|
1214 sproc = get_test_subprocess(wait=True) |
|
1215 exe = psutil.Process(sproc.pid).exe |
|
1216 try: |
|
1217 self.assertEqual(exe, PYTHON) |
|
1218 except AssertionError: |
|
1219 if WINDOWS and len(exe) == len(PYTHON): |
|
1220 # on Windows we don't care about case sensitivity |
|
1221 self.assertEqual(exe.lower(), PYTHON.lower()) |
|
1222 else: |
|
1223 # certain platforms such as BSD are more accurate returning: |
|
1224 # "/usr/local/bin/python2.7" |
|
1225 # ...instead of: |
|
1226 # "/usr/local/bin/python" |
|
1227 # We do not want to consider this difference in accuracy |
|
1228 # an error. |
|
1229 ver = "%s.%s" % (sys.version_info[0], sys.version_info[1]) |
|
1230 self.assertEqual(exe.replace(ver, ''), PYTHON.replace(ver, '')) |
|
1231 |
|
1232 def test_cmdline(self): |
|
1233 cmdline = [PYTHON, "-c", "import time; time.sleep(2)"] |
|
1234 sproc = get_test_subprocess(cmdline, wait=True) |
|
1235 self.assertEqual(' '.join(psutil.Process(sproc.pid).cmdline), |
|
1236 ' '.join(cmdline)) |
|
1237 |
|
1238 def test_name(self): |
|
1239 sproc = get_test_subprocess(PYTHON, wait=True) |
|
1240 name = psutil.Process(sproc.pid).name.lower() |
|
1241 pyexe = os.path.basename(os.path.realpath(sys.executable)).lower() |
|
1242 assert pyexe.startswith(name), (pyexe, name) |
|
1243 |
|
1244 @unittest.skipUnless(POSIX, 'posix only') |
|
1245 def test_uids(self): |
|
1246 p = psutil.Process(os.getpid()) |
|
1247 real, effective, saved = p.uids |
|
1248 # os.getuid() refers to "real" uid |
|
1249 self.assertEqual(real, os.getuid()) |
|
1250 # os.geteuid() refers to "effective" uid |
|
1251 self.assertEqual(effective, os.geteuid()) |
|
1252 # no such thing as os.getsuid() ("saved" uid), but starting |
|
1253 # from python 2.7 we have os.getresuid()[2] |
|
1254 if hasattr(os, "getresuid"): |
|
1255 self.assertEqual(saved, os.getresuid()[2]) |
|
1256 |
|
1257 @unittest.skipUnless(POSIX, 'posix only') |
|
1258 def test_gids(self): |
|
1259 p = psutil.Process(os.getpid()) |
|
1260 real, effective, saved = p.gids |
|
1261 # os.getuid() refers to "real" uid |
|
1262 self.assertEqual(real, os.getgid()) |
|
1263 # os.geteuid() refers to "effective" uid |
|
1264 self.assertEqual(effective, os.getegid()) |
|
1265 # no such thing as os.getsuid() ("saved" uid), but starting |
|
1266 # from python 2.7 we have os.getresgid()[2] |
|
1267 if hasattr(os, "getresuid"): |
|
1268 self.assertEqual(saved, os.getresgid()[2]) |
|
1269 |
|
1270 def test_nice(self): |
|
1271 p = psutil.Process(os.getpid()) |
|
1272 self.assertRaises(TypeError, p.set_nice, "str") |
|
1273 if os.name == 'nt': |
|
1274 try: |
|
1275 self.assertEqual(p.get_nice(), psutil.NORMAL_PRIORITY_CLASS) |
|
1276 p.set_nice(psutil.HIGH_PRIORITY_CLASS) |
|
1277 self.assertEqual(p.get_nice(), psutil.HIGH_PRIORITY_CLASS) |
|
1278 p.set_nice(psutil.NORMAL_PRIORITY_CLASS) |
|
1279 self.assertEqual(p.get_nice(), psutil.NORMAL_PRIORITY_CLASS) |
|
1280 finally: |
|
1281 p.set_nice(psutil.NORMAL_PRIORITY_CLASS) |
|
1282 else: |
|
1283 try: |
|
1284 try: |
|
1285 first_nice = p.get_nice() |
|
1286 p.set_nice(1) |
|
1287 self.assertEqual(p.get_nice(), 1) |
|
1288 # going back to previous nice value raises AccessDenied on OSX |
|
1289 if not OSX: |
|
1290 p.set_nice(0) |
|
1291 self.assertEqual(p.get_nice(), 0) |
|
1292 except psutil.AccessDenied: |
|
1293 pass |
|
1294 finally: |
|
1295 try: |
|
1296 p.set_nice(first_nice) |
|
1297 except psutil.AccessDenied: |
|
1298 pass |
|
1299 |
|
1300 def test_status(self): |
|
1301 p = psutil.Process(os.getpid()) |
|
1302 self.assertEqual(p.status, psutil.STATUS_RUNNING) |
|
1303 self.assertEqual(str(p.status), "running") |
|
1304 |
|
1305 def test_status_constants(self): |
|
1306 # STATUS_* constants are supposed to be comparable also by |
|
1307 # using their str representation |
|
1308 self.assertTrue(psutil.STATUS_RUNNING == 0) |
|
1309 self.assertTrue(psutil.STATUS_RUNNING == long(0)) |
|
1310 self.assertTrue(psutil.STATUS_RUNNING == 'running') |
|
1311 self.assertFalse(psutil.STATUS_RUNNING == 1) |
|
1312 self.assertFalse(psutil.STATUS_RUNNING == 'sleeping') |
|
1313 self.assertFalse(psutil.STATUS_RUNNING != 0) |
|
1314 self.assertFalse(psutil.STATUS_RUNNING != 'running') |
|
1315 self.assertTrue(psutil.STATUS_RUNNING != 1) |
|
1316 self.assertTrue(psutil.STATUS_RUNNING != 'sleeping') |
|
1317 |
|
1318 def test_username(self): |
|
1319 sproc = get_test_subprocess() |
|
1320 p = psutil.Process(sproc.pid) |
|
1321 if POSIX: |
|
1322 import pwd |
|
1323 self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name) |
|
1324 elif WINDOWS and 'USERNAME' in os.environ: |
|
1325 expected_username = os.environ['USERNAME'] |
|
1326 expected_domain = os.environ['USERDOMAIN'] |
|
1327 domain, username = p.username.split('\\') |
|
1328 self.assertEqual(domain, expected_domain) |
|
1329 self.assertEqual(username, expected_username) |
|
1330 else: |
|
1331 p.username |
|
1332 |
|
1333 @unittest.skipUnless(hasattr(psutil.Process, "getcwd"), |
|
1334 'not available on this platform') |
|
1335 def test_getcwd(self): |
|
1336 sproc = get_test_subprocess(wait=True) |
|
1337 p = psutil.Process(sproc.pid) |
|
1338 self.assertEqual(p.getcwd(), os.getcwd()) |
|
1339 |
|
1340 @unittest.skipIf(not hasattr(psutil.Process, "getcwd"), |
|
1341 'not available on this platform') |
|
1342 def test_getcwd_2(self): |
|
1343 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(2)"] |
|
1344 sproc = get_test_subprocess(cmd, wait=True) |
|
1345 p = psutil.Process(sproc.pid) |
|
1346 call_until(p.getcwd, "ret == os.path.dirname(os.getcwd())", timeout=1) |
|
1347 |
|
1348 @unittest.skipIf(not hasattr(psutil.Process, "get_cpu_affinity"), |
|
1349 'not available on this platform') |
|
1350 def test_cpu_affinity(self): |
|
1351 p = psutil.Process(os.getpid()) |
|
1352 initial = p.get_cpu_affinity() |
|
1353 all_cpus = list(range(len(psutil.cpu_percent(percpu=True)))) |
|
1354 # |
|
1355 for n in all_cpus: |
|
1356 p.set_cpu_affinity([n]) |
|
1357 self.assertEqual(p.get_cpu_affinity(), [n]) |
|
1358 # |
|
1359 p.set_cpu_affinity(all_cpus) |
|
1360 self.assertEqual(p.get_cpu_affinity(), all_cpus) |
|
1361 # |
|
1362 p.set_cpu_affinity(initial) |
|
1363 invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10] |
|
1364 self.assertRaises(ValueError, p.set_cpu_affinity, invalid_cpu) |
|
1365 |
|
1366 def test_get_open_files(self): |
|
1367 # current process |
|
1368 p = psutil.Process(os.getpid()) |
|
1369 files = p.get_open_files() |
|
1370 self.assertFalse(TESTFN in files) |
|
1371 f = open(TESTFN, 'w') |
|
1372 call_until(p.get_open_files, "len(ret) != %i" % len(files)) |
|
1373 filenames = [x.path for x in p.get_open_files()] |
|
1374 self.assertIn(TESTFN, filenames) |
|
1375 f.close() |
|
1376 for file in filenames: |
|
1377 assert os.path.isfile(file), file |
|
1378 |
|
1379 # another process |
|
1380 cmdline = "import time; f = open(r'%s', 'r'); time.sleep(2);" % TESTFN |
|
1381 sproc = get_test_subprocess([PYTHON, "-c", cmdline], wait=True) |
|
1382 p = psutil.Process(sproc.pid) |
|
1383 for x in range(100): |
|
1384 filenames = [x.path for x in p.get_open_files()] |
|
1385 if TESTFN in filenames: |
|
1386 break |
|
1387 time.sleep(.01) |
|
1388 else: |
|
1389 self.assertIn(TESTFN, filenames) |
|
1390 for file in filenames: |
|
1391 assert os.path.isfile(file), file |
|
1392 |
|
1393 def test_get_open_files2(self): |
|
1394 # test fd and path fields |
|
1395 fileobj = open(TESTFN, 'w') |
|
1396 p = psutil.Process(os.getpid()) |
|
1397 for path, fd in p.get_open_files(): |
|
1398 if path == fileobj.name or fd == fileobj.fileno(): |
|
1399 break |
|
1400 else: |
|
1401 self.fail("no file found; files=%s" % repr(p.get_open_files())) |
|
1402 self.assertEqual(path, fileobj.name) |
|
1403 if WINDOWS: |
|
1404 self.assertEqual(fd, -1) |
|
1405 else: |
|
1406 self.assertEqual(fd, fileobj.fileno()) |
|
1407 # test positions |
|
1408 ntuple = p.get_open_files()[0] |
|
1409 self.assertEqual(ntuple[0], ntuple.path) |
|
1410 self.assertEqual(ntuple[1], ntuple.fd) |
|
1411 # test file is gone |
|
1412 fileobj.close() |
|
1413 self.assertTrue(fileobj.name not in p.get_open_files()) |
|
1414 |
|
1415 def test_connection_constants(self): |
|
1416 ints = [] |
|
1417 strs = [] |
|
1418 for name in dir(psutil): |
|
1419 if name.startswith('CONN_'): |
|
1420 num = getattr(psutil, name) |
|
1421 str_ = str(num) |
|
1422 assert str_.isupper(), str_ |
|
1423 assert str_ not in strs, str_ |
|
1424 assert num not in ints, num |
|
1425 ints.append(num) |
|
1426 strs.append(str_) |
|
1427 if SUNOS: |
|
1428 psutil.CONN_IDLE |
|
1429 psutil.CONN_BOUND |
|
1430 if WINDOWS: |
|
1431 psutil.CONN_DELETE_TCB |
|
1432 |
|
1433 def test_get_connections(self): |
|
1434 arg = "import socket, time;" \ |
|
1435 "s = socket.socket();" \ |
|
1436 "s.bind(('127.0.0.1', 0));" \ |
|
1437 "s.listen(1);" \ |
|
1438 "conn, addr = s.accept();" \ |
|
1439 "time.sleep(2);" |
|
1440 sproc = get_test_subprocess([PYTHON, "-c", arg]) |
|
1441 p = psutil.Process(sproc.pid) |
|
1442 for x in range(100): |
|
1443 if p.get_connections(): |
|
1444 # give the subprocess some more time to bind() |
|
1445 time.sleep(.01) |
|
1446 cons = p.get_connections() |
|
1447 break |
|
1448 time.sleep(.01) |
|
1449 self.assertEqual(len(cons), 1) |
|
1450 con = cons[0] |
|
1451 self.assertEqual(con.family, socket.AF_INET) |
|
1452 self.assertEqual(con.type, socket.SOCK_STREAM) |
|
1453 self.assertEqual(con.status, psutil.CONN_LISTEN, str(con.status)) |
|
1454 ip, port = con.laddr |
|
1455 self.assertEqual(ip, '127.0.0.1') |
|
1456 self.assertEqual(con.raddr, ()) |
|
1457 if WINDOWS or SUNOS: |
|
1458 self.assertEqual(con.fd, -1) |
|
1459 else: |
|
1460 assert con.fd > 0, con |
|
1461 # test positions |
|
1462 self.assertEqual(con[0], con.fd) |
|
1463 self.assertEqual(con[1], con.family) |
|
1464 self.assertEqual(con[2], con.type) |
|
1465 self.assertEqual(con[3], con.laddr) |
|
1466 self.assertEqual(con[4], con.raddr) |
|
1467 self.assertEqual(con[5], con.status) |
|
1468 # test kind arg |
|
1469 self.assertRaises(ValueError, p.get_connections, 'foo') |
|
1470 |
|
1471 @unittest.skipUnless(supports_ipv6(), 'IPv6 is not supported') |
|
1472 def test_get_connections_ipv6(self): |
|
1473 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) |
|
1474 s.bind(('::1', 0)) |
|
1475 s.listen(1) |
|
1476 cons = psutil.Process(os.getpid()).get_connections() |
|
1477 s.close() |
|
1478 self.assertEqual(len(cons), 1) |
|
1479 self.assertEqual(cons[0].laddr[0], '::1') |
|
1480 |
|
1481 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'AF_UNIX is not supported') |
|
1482 def test_get_connections_unix(self): |
|
1483 # tcp |
|
1484 safe_remove(TESTFN) |
|
1485 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
|
1486 sock.bind(TESTFN) |
|
1487 conn = psutil.Process(os.getpid()).get_connections(kind='unix')[0] |
|
1488 if conn.fd != -1: # != sunos and windows |
|
1489 self.assertEqual(conn.fd, sock.fileno()) |
|
1490 self.assertEqual(conn.family, socket.AF_UNIX) |
|
1491 self.assertEqual(conn.type, socket.SOCK_STREAM) |
|
1492 self.assertEqual(conn.laddr, TESTFN) |
|
1493 self.assertTrue(not conn.raddr) |
|
1494 self.assertEqual(conn.status, psutil.CONN_NONE, str(conn.status)) |
|
1495 sock.close() |
|
1496 # udp |
|
1497 safe_remove(TESTFN) |
|
1498 sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) |
|
1499 sock.bind(TESTFN) |
|
1500 conn = psutil.Process(os.getpid()).get_connections(kind='unix')[0] |
|
1501 self.assertEqual(conn.type, socket.SOCK_DGRAM) |
|
1502 sock.close() |
|
1503 |
|
1504 @unittest.skipUnless(hasattr(socket, "fromfd"), |
|
1505 'socket.fromfd() is not availble') |
|
1506 @unittest.skipIf(WINDOWS or SUNOS, |
|
1507 'connection fd available on this platform') |
|
1508 def test_connection_fromfd(self): |
|
1509 sock = socket.socket() |
|
1510 sock.bind(('localhost', 0)) |
|
1511 sock.listen(1) |
|
1512 p = psutil.Process(os.getpid()) |
|
1513 for conn in p.get_connections(): |
|
1514 if conn.fd == sock.fileno(): |
|
1515 break |
|
1516 else: |
|
1517 sock.close() |
|
1518 self.fail("couldn't find socket fd") |
|
1519 dupsock = socket.fromfd(conn.fd, conn.family, conn.type) |
|
1520 try: |
|
1521 self.assertEqual(dupsock.getsockname(), conn.laddr) |
|
1522 self.assertNotEqual(sock.fileno(), dupsock.fileno()) |
|
1523 finally: |
|
1524 sock.close() |
|
1525 dupsock.close() |
|
1526 |
|
1527 def test_get_connections_all(self): |
|
1528 tcp_template = "import socket;" \ |
|
1529 "s = socket.socket($family, socket.SOCK_STREAM);" \ |
|
1530 "s.bind(('$addr', 0));" \ |
|
1531 "s.listen(1);" \ |
|
1532 "conn, addr = s.accept();" |
|
1533 |
|
1534 udp_template = "import socket, time;" \ |
|
1535 "s = socket.socket($family, socket.SOCK_DGRAM);" \ |
|
1536 "s.bind(('$addr', 0));" \ |
|
1537 "time.sleep(2);" |
|
1538 |
|
1539 from string import Template |
|
1540 tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET, |
|
1541 addr="127.0.0.1") |
|
1542 udp4_template = Template(udp_template).substitute(family=socket.AF_INET, |
|
1543 addr="127.0.0.1") |
|
1544 tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6, |
|
1545 addr="::1") |
|
1546 udp6_template = Template(udp_template).substitute(family=socket.AF_INET6, |
|
1547 addr="::1") |
|
1548 |
|
1549 # launch various subprocess instantiating a socket of various |
|
1550 # families and types to enrich psutil results |
|
1551 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) |
|
1552 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) |
|
1553 if supports_ipv6(): |
|
1554 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) |
|
1555 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) |
|
1556 else: |
|
1557 tcp6_proc = None |
|
1558 udp6_proc = None |
|
1559 |
|
1560 # check matches against subprocesses just created |
|
1561 all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4", "tcp6", |
|
1562 "udp", "udp4", "udp6") |
|
1563 for p in psutil.Process(os.getpid()).get_children(): |
|
1564 for conn in p.get_connections(): |
|
1565 # TCP v4 |
|
1566 if p.pid == tcp4_proc.pid: |
|
1567 self.assertEqual(conn.family, socket.AF_INET) |
|
1568 self.assertEqual(conn.type, socket.SOCK_STREAM) |
|
1569 self.assertEqual(conn.laddr[0], "127.0.0.1") |
|
1570 self.assertEqual(conn.raddr, ()) |
|
1571 self.assertEqual(conn.status, psutil.CONN_LISTEN, |
|
1572 str(conn.status)) |
|
1573 for kind in all_kinds: |
|
1574 cons = p.get_connections(kind=kind) |
|
1575 if kind in ("all", "inet", "inet4", "tcp", "tcp4"): |
|
1576 assert cons != [], cons |
|
1577 else: |
|
1578 self.assertEqual(cons, [], cons) |
|
1579 # UDP v4 |
|
1580 elif p.pid == udp4_proc.pid: |
|
1581 self.assertEqual(conn.family, socket.AF_INET) |
|
1582 self.assertEqual(conn.type, socket.SOCK_DGRAM) |
|
1583 self.assertEqual(conn.laddr[0], "127.0.0.1") |
|
1584 self.assertEqual(conn.raddr, ()) |
|
1585 self.assertEqual(conn.status, psutil.CONN_NONE, |
|
1586 str(conn.status)) |
|
1587 for kind in all_kinds: |
|
1588 cons = p.get_connections(kind=kind) |
|
1589 if kind in ("all", "inet", "inet4", "udp", "udp4"): |
|
1590 assert cons != [], cons |
|
1591 else: |
|
1592 self.assertEqual(cons, [], cons) |
|
1593 # TCP v6 |
|
1594 elif p.pid == getattr(tcp6_proc, "pid", None): |
|
1595 self.assertEqual(conn.family, socket.AF_INET6) |
|
1596 self.assertEqual(conn.type, socket.SOCK_STREAM) |
|
1597 self.assertIn(conn.laddr[0], ("::", "::1")) |
|
1598 self.assertEqual(conn.raddr, ()) |
|
1599 self.assertEqual(conn.status, psutil.CONN_LISTEN, |
|
1600 str(conn.status)) |
|
1601 for kind in all_kinds: |
|
1602 cons = p.get_connections(kind=kind) |
|
1603 if kind in ("all", "inet", "inet6", "tcp", "tcp6"): |
|
1604 assert cons != [], cons |
|
1605 else: |
|
1606 self.assertEqual(cons, [], cons) |
|
1607 # UDP v6 |
|
1608 elif p.pid == getattr(udp6_proc, "pid", None): |
|
1609 self.assertEqual(conn.family, socket.AF_INET6) |
|
1610 self.assertEqual(conn.type, socket.SOCK_DGRAM) |
|
1611 self.assertIn(conn.laddr[0], ("::", "::1")) |
|
1612 self.assertEqual(conn.raddr, ()) |
|
1613 self.assertEqual(conn.status, psutil.CONN_NONE, |
|
1614 str(conn.status)) |
|
1615 for kind in all_kinds: |
|
1616 cons = p.get_connections(kind=kind) |
|
1617 if kind in ("all", "inet", "inet6", "udp", "udp6"): |
|
1618 assert cons != [], cons |
|
1619 else: |
|
1620 self.assertEqual(cons, [], cons) |
|
1621 |
|
1622 @unittest.skipUnless(POSIX, 'posix only') |
|
1623 def test_get_num_fds(self): |
|
1624 p = psutil.Process(os.getpid()) |
|
1625 start = p.get_num_fds() |
|
1626 file = open(TESTFN, 'w') |
|
1627 self.assertEqual(p.get_num_fds(), start + 1) |
|
1628 sock = socket.socket() |
|
1629 self.assertEqual(p.get_num_fds(), start + 2) |
|
1630 file.close() |
|
1631 sock.close() |
|
1632 self.assertEqual(p.get_num_fds(), start) |
|
1633 |
|
1634 @skip_on_not_implemented(only_if=LINUX) |
|
1635 def test_get_num_ctx_switches(self): |
|
1636 p = psutil.Process(os.getpid()) |
|
1637 before = sum(p.get_num_ctx_switches()) |
|
1638 for x in range(500000): |
|
1639 after = sum(p.get_num_ctx_switches()) |
|
1640 if after > before: |
|
1641 return |
|
1642 self.fail("num ctx switches still the same after 50.000 iterations") |
|
1643 |
|
1644 def test_parent_ppid(self): |
|
1645 this_parent = os.getpid() |
|
1646 sproc = get_test_subprocess() |
|
1647 p = psutil.Process(sproc.pid) |
|
1648 self.assertEqual(p.ppid, this_parent) |
|
1649 self.assertEqual(p.parent.pid, this_parent) |
|
1650 # no other process is supposed to have us as parent |
|
1651 for p in psutil.process_iter(): |
|
1652 if p.pid == sproc.pid: |
|
1653 continue |
|
1654 self.assertTrue(p.ppid != this_parent) |
|
1655 |
|
1656 def test_get_children(self): |
|
1657 p = psutil.Process(os.getpid()) |
|
1658 self.assertEqual(p.get_children(), []) |
|
1659 self.assertEqual(p.get_children(recursive=True), []) |
|
1660 sproc = get_test_subprocess() |
|
1661 children1 = p.get_children() |
|
1662 children2 = p.get_children(recursive=True) |
|
1663 for children in (children1, children2): |
|
1664 self.assertEqual(len(children), 1) |
|
1665 self.assertEqual(children[0].pid, sproc.pid) |
|
1666 self.assertEqual(children[0].ppid, os.getpid()) |
|
1667 |
|
1668 def test_get_children_recursive(self): |
|
1669 # here we create a subprocess which creates another one as in: |
|
1670 # A (parent) -> B (child) -> C (grandchild) |
|
1671 s = "import subprocess, os, sys, time;" |
|
1672 s += "PYTHON = os.path.realpath(sys.executable);" |
|
1673 s += "cmd = [PYTHON, '-c', 'import time; time.sleep(2);'];" |
|
1674 s += "subprocess.Popen(cmd);" |
|
1675 s += "time.sleep(2);" |
|
1676 get_test_subprocess(cmd=[PYTHON, "-c", s]) |
|
1677 p = psutil.Process(os.getpid()) |
|
1678 self.assertEqual(len(p.get_children(recursive=False)), 1) |
|
1679 # give the grandchild some time to start |
|
1680 stop_at = time.time() + 1.5 |
|
1681 while time.time() < stop_at: |
|
1682 children = p.get_children(recursive=True) |
|
1683 if len(children) > 1: |
|
1684 break |
|
1685 self.assertEqual(len(children), 2) |
|
1686 self.assertEqual(children[0].ppid, os.getpid()) |
|
1687 self.assertEqual(children[1].ppid, children[0].pid) |
|
1688 |
|
1689 def test_get_children_duplicates(self): |
|
1690 # find the process which has the highest number of children |
|
1691 from psutil._compat import defaultdict |
|
1692 table = defaultdict(int) |
|
1693 for p in psutil.process_iter(): |
|
1694 try: |
|
1695 table[p.ppid] += 1 |
|
1696 except psutil.Error: |
|
1697 pass |
|
1698 # this is the one, now let's make sure there are no duplicates |
|
1699 pid = sorted(table.items(), key=lambda x: x[1])[-1][0] |
|
1700 p = psutil.Process(pid) |
|
1701 try: |
|
1702 c = p.get_children(recursive=True) |
|
1703 except psutil.AccessDenied: # windows |
|
1704 pass |
|
1705 else: |
|
1706 self.assertEqual(len(c), len(set(c))) |
|
1707 |
|
1708 def test_suspend_resume(self): |
|
1709 sproc = get_test_subprocess(wait=True) |
|
1710 p = psutil.Process(sproc.pid) |
|
1711 p.suspend() |
|
1712 for x in range(100): |
|
1713 if p.status == psutil.STATUS_STOPPED: |
|
1714 break |
|
1715 time.sleep(0.01) |
|
1716 self.assertEqual(str(p.status), "stopped") |
|
1717 p.resume() |
|
1718 assert p.status != psutil.STATUS_STOPPED, p.status |
|
1719 |
|
1720 def test_invalid_pid(self): |
|
1721 self.assertRaises(TypeError, psutil.Process, "1") |
|
1722 self.assertRaises(TypeError, psutil.Process, None) |
|
1723 self.assertRaises(ValueError, psutil.Process, -1) |
|
1724 |
|
1725 def test_as_dict(self): |
|
1726 sproc = get_test_subprocess() |
|
1727 p = psutil.Process(sproc.pid) |
|
1728 d = p.as_dict() |
|
1729 try: |
|
1730 import json |
|
1731 except ImportError: |
|
1732 pass |
|
1733 else: |
|
1734 # dict is supposed to be hashable |
|
1735 json.dumps(d) |
|
1736 # |
|
1737 d = p.as_dict(attrs=['exe', 'name']) |
|
1738 self.assertEqual(sorted(d.keys()), ['exe', 'name']) |
|
1739 # |
|
1740 p = psutil.Process(min(psutil.get_pid_list())) |
|
1741 d = p.as_dict(attrs=['get_connections'], ad_value='foo') |
|
1742 if not isinstance(d['connections'], list): |
|
1743 self.assertEqual(d['connections'], 'foo') |
|
1744 |
|
1745 def test_zombie_process(self): |
|
1746 # Test that NoSuchProcess exception gets raised in case the |
|
1747 # process dies after we create the Process object. |
|
1748 # Example: |
|
1749 # >>> proc = Process(1234) |
|
1750 # >>> time.sleep(2) # time-consuming task, process dies in meantime |
|
1751 # >>> proc.name |
|
1752 # Refers to Issue #15 |
|
1753 sproc = get_test_subprocess() |
|
1754 p = psutil.Process(sproc.pid) |
|
1755 p.kill() |
|
1756 p.wait() |
|
1757 |
|
1758 for name in dir(p): |
|
1759 if name.startswith('_')\ |
|
1760 or name in ('pid', 'send_signal', 'is_running', 'set_ionice', |
|
1761 'wait', 'set_cpu_affinity', 'create_time', 'set_nice', |
|
1762 'nice'): |
|
1763 continue |
|
1764 try: |
|
1765 meth = getattr(p, name) |
|
1766 if callable(meth): |
|
1767 meth() |
|
1768 except psutil.NoSuchProcess: |
|
1769 pass |
|
1770 except NotImplementedError: |
|
1771 pass |
|
1772 else: |
|
1773 self.fail("NoSuchProcess exception not raised for %r" % name) |
|
1774 |
|
1775 # other methods |
|
1776 try: |
|
1777 if os.name == 'posix': |
|
1778 p.set_nice(1) |
|
1779 else: |
|
1780 p.set_nice(psutil.NORMAL_PRIORITY_CLASS) |
|
1781 except psutil.NoSuchProcess: |
|
1782 pass |
|
1783 else: |
|
1784 self.fail("exception not raised") |
|
1785 if hasattr(p, 'set_ionice'): |
|
1786 self.assertRaises(psutil.NoSuchProcess, p.set_ionice, 2) |
|
1787 self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM) |
|
1788 self.assertRaises(psutil.NoSuchProcess, p.set_nice, 0) |
|
1789 self.assertFalse(p.is_running()) |
|
1790 if hasattr(p, "set_cpu_affinity"): |
|
1791 self.assertRaises(psutil.NoSuchProcess, p.set_cpu_affinity, [0]) |
|
1792 |
|
1793 def test__str__(self): |
|
1794 sproc = get_test_subprocess() |
|
1795 p = psutil.Process(sproc.pid) |
|
1796 self.assertIn(str(sproc.pid), str(p)) |
|
1797 # python shows up as 'Python' in cmdline on OS X so test fails on OS X |
|
1798 if not OSX: |
|
1799 self.assertIn(os.path.basename(PYTHON), str(p)) |
|
1800 sproc = get_test_subprocess() |
|
1801 p = psutil.Process(sproc.pid) |
|
1802 p.kill() |
|
1803 p.wait() |
|
1804 self.assertIn(str(sproc.pid), str(p)) |
|
1805 self.assertIn("terminated", str(p)) |
|
1806 |
|
1807 @unittest.skipIf(LINUX, 'PID 0 not available on Linux') |
|
1808 def test_pid_0(self): |
|
1809 # Process(0) is supposed to work on all platforms except Linux |
|
1810 p = psutil.Process(0) |
|
1811 self.assertTrue(p.name) |
|
1812 |
|
1813 if os.name == 'posix': |
|
1814 try: |
|
1815 self.assertEqual(p.uids.real, 0) |
|
1816 self.assertEqual(p.gids.real, 0) |
|
1817 except psutil.AccessDenied: |
|
1818 pass |
|
1819 |
|
1820 self.assertIn(p.ppid, (0, 1)) |
|
1821 #self.assertEqual(p.exe, "") |
|
1822 p.cmdline |
|
1823 try: |
|
1824 p.get_num_threads() |
|
1825 except psutil.AccessDenied: |
|
1826 pass |
|
1827 |
|
1828 try: |
|
1829 p.get_memory_info() |
|
1830 except psutil.AccessDenied: |
|
1831 pass |
|
1832 |
|
1833 # username property |
|
1834 try: |
|
1835 if POSIX: |
|
1836 self.assertEqual(p.username, 'root') |
|
1837 elif WINDOWS: |
|
1838 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') |
|
1839 else: |
|
1840 p.username |
|
1841 except psutil.AccessDenied: |
|
1842 pass |
|
1843 |
|
1844 self.assertIn(0, psutil.get_pid_list()) |
|
1845 self.assertTrue(psutil.pid_exists(0)) |
|
1846 |
|
1847 def test__all__(self): |
|
1848 for name in dir(psutil): |
|
1849 if name in ('callable', 'defaultdict', 'error', 'namedtuple', |
|
1850 'test'): |
|
1851 continue |
|
1852 if not name.startswith('_'): |
|
1853 try: |
|
1854 __import__(name) |
|
1855 except ImportError: |
|
1856 if name not in psutil.__all__: |
|
1857 fun = getattr(psutil, name) |
|
1858 if fun is None: |
|
1859 continue |
|
1860 if 'deprecated' not in fun.__doc__.lower(): |
|
1861 self.fail('%r not in psutil.__all__' % name) |
|
1862 |
|
1863 def test_Popen(self): |
|
1864 # Popen class test |
|
1865 # XXX this test causes a ResourceWarning on Python 3 because |
|
1866 # psutil.__subproc instance doesn't get propertly freed. |
|
1867 # Not sure what to do though. |
|
1868 cmd = [PYTHON, "-c", "import time; time.sleep(2);"] |
|
1869 proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
|
1870 try: |
|
1871 proc.name |
|
1872 proc.stdin |
|
1873 self.assertTrue(hasattr(proc, 'name')) |
|
1874 self.assertTrue(hasattr(proc, 'stdin')) |
|
1875 self.assertRaises(AttributeError, getattr, proc, 'foo') |
|
1876 finally: |
|
1877 proc.kill() |
|
1878 proc.wait() |
|
1879 |
|
1880 |
|
1881 # =================================================================== |
|
1882 # --- Featch all processes test |
|
1883 # =================================================================== |
|
1884 |
|
1885 class TestFetchAllProcesses(unittest.TestCase): |
|
1886 # Iterates over all running processes and performs some sanity |
|
1887 # checks against Process API's returned values. |
|
1888 |
|
1889 def setUp(self): |
|
1890 if POSIX: |
|
1891 import pwd |
|
1892 pall = pwd.getpwall() |
|
1893 self._uids = set([x.pw_uid for x in pall]) |
|
1894 self._usernames = set([x.pw_name for x in pall]) |
|
1895 |
|
1896 def test_fetch_all(self): |
|
1897 valid_procs = 0 |
|
1898 excluded_names = ['send_signal', 'suspend', 'resume', 'terminate', |
|
1899 'kill', 'wait', 'as_dict', 'get_cpu_percent', 'nice', |
|
1900 'parent', 'get_children', 'pid'] |
|
1901 attrs = [] |
|
1902 for name in dir(psutil.Process): |
|
1903 if name.startswith("_"): |
|
1904 continue |
|
1905 if name.startswith("set_"): |
|
1906 continue |
|
1907 if name in excluded_names: |
|
1908 continue |
|
1909 attrs.append(name) |
|
1910 |
|
1911 default = object() |
|
1912 failures = [] |
|
1913 for name in attrs: |
|
1914 for p in psutil.process_iter(): |
|
1915 ret = default |
|
1916 try: |
|
1917 try: |
|
1918 attr = getattr(p, name, None) |
|
1919 if attr is not None and callable(attr): |
|
1920 ret = attr() |
|
1921 else: |
|
1922 ret = attr |
|
1923 valid_procs += 1 |
|
1924 except NotImplementedError: |
|
1925 register_warning("%r was skipped because not " |
|
1926 "implemented" % (self.__class__.__name__ + \ |
|
1927 '.test_' + name)) |
|
1928 except (psutil.NoSuchProcess, psutil.AccessDenied): |
|
1929 err = sys.exc_info()[1] |
|
1930 if isinstance(err, psutil.NoSuchProcess): |
|
1931 if psutil.pid_exists(p.pid): |
|
1932 # XXX race condition; we probably need |
|
1933 # to try figuring out the process |
|
1934 # identity before failing |
|
1935 self.fail("PID still exists but fun raised " \ |
|
1936 "NoSuchProcess") |
|
1937 self.assertEqual(err.pid, p.pid) |
|
1938 if err.name: |
|
1939 # make sure exception's name attr is set |
|
1940 # with the actual process name |
|
1941 self.assertEqual(err.name, p.name) |
|
1942 self.assertTrue(str(err)) |
|
1943 self.assertTrue(err.msg) |
|
1944 else: |
|
1945 if ret not in (0, 0.0, [], None, ''): |
|
1946 assert ret, ret |
|
1947 meth = getattr(self, name) |
|
1948 meth(ret) |
|
1949 except Exception: |
|
1950 err = sys.exc_info()[1] |
|
1951 s = '\n' + '=' * 70 + '\n' |
|
1952 s += "FAIL: test_%s (proc=%s" % (name, p) |
|
1953 if ret != default: |
|
1954 s += ", ret=%s)" % repr(ret) |
|
1955 s += ')\n' |
|
1956 s += '-' * 70 |
|
1957 s += "\n%s" % traceback.format_exc() |
|
1958 s = "\n".join((" " * 4) + i for i in s.splitlines()) |
|
1959 failures.append(s) |
|
1960 break |
|
1961 |
|
1962 if failures: |
|
1963 self.fail(''.join(failures)) |
|
1964 |
|
1965 # we should always have a non-empty list, not including PID 0 etc. |
|
1966 # special cases. |
|
1967 self.assertTrue(valid_procs > 0) |
|
1968 |
|
1969 def cmdline(self, ret): |
|
1970 pass |
|
1971 |
|
1972 def exe(self, ret): |
|
1973 if not ret: |
|
1974 self.assertEqual(ret, '') |
|
1975 else: |
|
1976 assert os.path.isabs(ret), ret |
|
1977 # Note: os.stat() may return False even if the file is there |
|
1978 # hence we skip the test, see: |
|
1979 # http://stackoverflow.com/questions/3112546/os-path-exists-lies |
|
1980 if POSIX: |
|
1981 assert os.path.isfile(ret), ret |
|
1982 if hasattr(os, 'access') and hasattr(os, "X_OK"): |
|
1983 # XXX may fail on OSX |
|
1984 self.assertTrue(os.access(ret, os.X_OK)) |
|
1985 |
|
1986 def ppid(self, ret): |
|
1987 self.assertTrue(ret >= 0) |
|
1988 |
|
1989 def name(self, ret): |
|
1990 self.assertTrue(isinstance(ret, str)) |
|
1991 self.assertTrue(ret) |
|
1992 |
|
1993 def create_time(self, ret): |
|
1994 self.assertTrue(ret > 0) |
|
1995 # this can't be taken for granted on all platforms |
|
1996 #self.assertGreaterEqual(ret, psutil.BOOT_TIME) |
|
1997 # make sure returned value can be pretty printed |
|
1998 # with strftime |
|
1999 time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret)) |
|
2000 |
|
2001 def uids(self, ret): |
|
2002 for uid in ret: |
|
2003 self.assertTrue(uid >= 0) |
|
2004 self.assertIn(uid, self._uids) |
|
2005 |
|
2006 def gids(self, ret): |
|
2007 # note: testing all gids as above seems not to be reliable for |
|
2008 # gid == 30 (nodoby); not sure why. |
|
2009 for gid in ret: |
|
2010 self.assertTrue(gid >= 0) |
|
2011 #self.assertIn(uid, self.gids) |
|
2012 |
|
2013 def username(self, ret): |
|
2014 self.assertTrue(ret) |
|
2015 if os.name == 'posix': |
|
2016 self.assertIn(ret, self._usernames) |
|
2017 |
|
2018 def status(self, ret): |
|
2019 self.assertTrue(ret >= 0) |
|
2020 self.assertTrue(str(ret) != '?') |
|
2021 |
|
2022 def get_io_counters(self, ret): |
|
2023 for field in ret: |
|
2024 if field != -1: |
|
2025 self.assertTrue(field >= 0) |
|
2026 |
|
2027 def get_ionice(self, ret): |
|
2028 if LINUX: |
|
2029 self.assertTrue(ret.ioclass >= 0) |
|
2030 self.assertTrue(ret.value >= 0) |
|
2031 else: |
|
2032 self.assertTrue(ret >= 0) |
|
2033 self.assertIn(ret, (0, 1, 2)) |
|
2034 |
|
2035 def get_num_threads(self, ret): |
|
2036 self.assertTrue(ret >= 1) |
|
2037 |
|
2038 def get_threads(self, ret): |
|
2039 for t in ret: |
|
2040 self.assertTrue(t.id >= 0) |
|
2041 self.assertTrue(t.user_time >= 0) |
|
2042 self.assertTrue(t.system_time >= 0) |
|
2043 |
|
2044 def get_cpu_times(self, ret): |
|
2045 self.assertTrue(ret.user >= 0) |
|
2046 self.assertTrue(ret.system >= 0) |
|
2047 |
|
2048 def get_memory_info(self, ret): |
|
2049 self.assertTrue(ret.rss >= 0) |
|
2050 self.assertTrue(ret.vms >= 0) |
|
2051 |
|
2052 def get_ext_memory_info(self, ret): |
|
2053 for name in ret._fields: |
|
2054 self.assertTrue(getattr(ret, name) >= 0) |
|
2055 if POSIX and ret.vms != 0: |
|
2056 # VMS is always supposed to be the highest |
|
2057 for name in ret._fields: |
|
2058 if name != 'vms': |
|
2059 value = getattr(ret, name) |
|
2060 assert ret.vms > value, ret |
|
2061 elif WINDOWS: |
|
2062 assert ret.peak_wset >= ret.wset, ret |
|
2063 assert ret.peak_paged_pool >= ret.paged_pool, ret |
|
2064 assert ret.peak_nonpaged_pool >= ret.nonpaged_pool, ret |
|
2065 assert ret.peak_pagefile >= ret.pagefile, ret |
|
2066 |
|
2067 def get_open_files(self, ret): |
|
2068 for f in ret: |
|
2069 if WINDOWS: |
|
2070 assert f.fd == -1, f |
|
2071 else: |
|
2072 self.assertIsInstance(f.fd, int) |
|
2073 assert os.path.isabs(f.path), f |
|
2074 assert os.path.isfile(f.path), f |
|
2075 |
|
2076 def get_num_fds(self, ret): |
|
2077 self.assertTrue(ret >= 0) |
|
2078 |
|
2079 def get_connections(self, ret): |
|
2080 # all values are supposed to match Linux's tcp_states.h states |
|
2081 # table across all platforms. |
|
2082 valid_conn_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1", |
|
2083 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT", |
|
2084 "LAST_ACK", "LISTEN", "CLOSING", "NONE"] |
|
2085 if SUNOS: |
|
2086 valid_conn_states += ["IDLE", "BOUND"] |
|
2087 if WINDOWS: |
|
2088 valid_conn_states += ["DELETE_TCB"] |
|
2089 for conn in ret: |
|
2090 self.assertIn(conn.type, (socket.SOCK_STREAM, socket.SOCK_DGRAM)) |
|
2091 self.assertIn(conn.family, (socket.AF_INET, socket.AF_INET6)) |
|
2092 check_ip_address(conn.laddr, conn.family) |
|
2093 check_ip_address(conn.raddr, conn.family) |
|
2094 if conn.status not in valid_conn_states: |
|
2095 self.fail("%s is not a valid status" % conn.status) |
|
2096 # actually try to bind the local socket; ignore IPv6 |
|
2097 # sockets as their address might be represented as |
|
2098 # an IPv4-mapped-address (e.g. "::127.0.0.1") |
|
2099 # and that's rejected by bind() |
|
2100 if conn.family == socket.AF_INET: |
|
2101 s = socket.socket(conn.family, conn.type) |
|
2102 s.bind((conn.laddr[0], 0)) |
|
2103 s.close() |
|
2104 |
|
2105 if not WINDOWS and hasattr(socket, 'fromfd'): |
|
2106 dupsock = None |
|
2107 try: |
|
2108 try: |
|
2109 dupsock = socket.fromfd(conn.fd, conn.family, conn.type) |
|
2110 except (socket.error, OSError): |
|
2111 err = sys.exc_info()[1] |
|
2112 if err.args[0] == errno.EBADF: |
|
2113 continue |
|
2114 raise |
|
2115 # python >= 2.5 |
|
2116 if hasattr(dupsock, "family"): |
|
2117 self.assertEqual(dupsock.family, conn.family) |
|
2118 self.assertEqual(dupsock.type, conn.type) |
|
2119 finally: |
|
2120 if dupsock is not None: |
|
2121 dupsock.close() |
|
2122 |
|
2123 def getcwd(self, ret): |
|
2124 if ret is not None: # BSD may return None |
|
2125 assert os.path.isabs(ret), ret |
|
2126 try: |
|
2127 st = os.stat(ret) |
|
2128 except OSError: |
|
2129 err = sys.exc_info()[1] |
|
2130 # directory has been removed in mean time |
|
2131 if err.errno != errno.ENOENT: |
|
2132 raise |
|
2133 else: |
|
2134 self.assertTrue(stat.S_ISDIR(st.st_mode)) |
|
2135 |
|
2136 def get_memory_percent(self, ret): |
|
2137 assert 0 <= ret <= 100, ret |
|
2138 |
|
2139 def is_running(self, ret): |
|
2140 self.assertTrue(ret) |
|
2141 |
|
2142 def get_cpu_affinity(self, ret): |
|
2143 assert ret != [], ret |
|
2144 |
|
2145 def terminal(self, ret): |
|
2146 if ret is not None: |
|
2147 assert os.path.isabs(ret), ret |
|
2148 assert os.path.exists(ret), ret |
|
2149 |
|
2150 def get_memory_maps(self, ret): |
|
2151 for nt in ret: |
|
2152 for fname in nt._fields: |
|
2153 value = getattr(nt, fname) |
|
2154 if fname == 'path': |
|
2155 if not value.startswith('['): |
|
2156 assert os.path.isabs(nt.path), nt.path |
|
2157 # commented as on Linux we might get '/foo/bar (deleted)' |
|
2158 #assert os.path.exists(nt.path), nt.path |
|
2159 elif fname in ('addr', 'perms'): |
|
2160 self.assertTrue(value) |
|
2161 else: |
|
2162 self.assertIsInstance(value, (int, long)) |
|
2163 assert value >= 0, value |
|
2164 |
|
2165 def get_num_handles(self, ret): |
|
2166 if WINDOWS: |
|
2167 self.assertGreaterEqual(ret, 0) |
|
2168 else: |
|
2169 self.assertGreaterEqual(ret, 0) |
|
2170 |
|
2171 def get_nice(self, ret): |
|
2172 if POSIX: |
|
2173 assert -20 <= ret <= 20, ret |
|
2174 else: |
|
2175 priorities = [getattr(psutil, x) for x in dir(psutil) |
|
2176 if x.endswith('_PRIORITY_CLASS')] |
|
2177 self.assertIn(ret, priorities) |
|
2178 |
|
2179 def get_num_ctx_switches(self, ret): |
|
2180 self.assertTrue(ret.voluntary >= 0) |
|
2181 self.assertTrue(ret.involuntary >= 0) |
|
2182 |
|
2183 |
|
2184 # =================================================================== |
|
2185 # --- Limited user tests |
|
2186 # =================================================================== |
|
2187 |
|
2188 if hasattr(os, 'getuid') and os.getuid() == 0: |
|
2189 class LimitedUserTestCase(TestProcess): |
|
2190 """Repeat the previous tests by using a limited user. |
|
2191 Executed only on UNIX and only if the user who run the test script |
|
2192 is root. |
|
2193 """ |
|
2194 # the uid/gid the test suite runs under |
|
2195 PROCESS_UID = os.getuid() |
|
2196 PROCESS_GID = os.getgid() |
|
2197 |
|
2198 def __init__(self, *args, **kwargs): |
|
2199 TestProcess.__init__(self, *args, **kwargs) |
|
2200 # re-define all existent test methods in order to |
|
2201 # ignore AccessDenied exceptions |
|
2202 for attr in [x for x in dir(self) if x.startswith('test')]: |
|
2203 meth = getattr(self, attr) |
|
2204 def test_(self): |
|
2205 try: |
|
2206 meth() |
|
2207 except psutil.AccessDenied: |
|
2208 pass |
|
2209 setattr(self, attr, types.MethodType(test_, self)) |
|
2210 |
|
2211 def setUp(self): |
|
2212 os.setegid(1000) |
|
2213 os.seteuid(1000) |
|
2214 TestProcess.setUp(self) |
|
2215 |
|
2216 def tearDown(self): |
|
2217 os.setegid(self.PROCESS_UID) |
|
2218 os.seteuid(self.PROCESS_GID) |
|
2219 TestProcess.tearDown(self) |
|
2220 |
|
2221 def test_nice(self): |
|
2222 try: |
|
2223 psutil.Process(os.getpid()).set_nice(-1) |
|
2224 except psutil.AccessDenied: |
|
2225 pass |
|
2226 else: |
|
2227 self.fail("exception not raised") |
|
2228 |
|
2229 |
|
2230 # =================================================================== |
|
2231 # --- Example script tests |
|
2232 # =================================================================== |
|
2233 |
|
2234 class TestExampleScripts(unittest.TestCase): |
|
2235 """Tests for scripts in the examples directory.""" |
|
2236 |
|
2237 def assert_stdout(self, exe, args=None): |
|
2238 exe = os.path.join(EXAMPLES_DIR, exe) |
|
2239 if args: |
|
2240 exe = exe + ' ' + args |
|
2241 try: |
|
2242 out = sh(sys.executable + ' ' + exe).strip() |
|
2243 except RuntimeError: |
|
2244 err = sys.exc_info()[1] |
|
2245 if 'AccessDenied' in str(err): |
|
2246 return str(err) |
|
2247 else: |
|
2248 raise |
|
2249 assert out, out |
|
2250 return out |
|
2251 |
|
2252 def assert_syntax(self, exe, args=None): |
|
2253 exe = os.path.join(EXAMPLES_DIR, exe) |
|
2254 f = open(exe, 'r') |
|
2255 try: |
|
2256 src = f.read() |
|
2257 finally: |
|
2258 f.close() |
|
2259 ast.parse(src) |
|
2260 |
|
2261 def test_check_presence(self): |
|
2262 # make sure all example scripts have a test method defined |
|
2263 meths = dir(self) |
|
2264 for name in os.listdir(EXAMPLES_DIR): |
|
2265 if name.endswith('.py'): |
|
2266 if 'test_' + os.path.splitext(name)[0] not in meths: |
|
2267 #self.assert_stdout(name) |
|
2268 self.fail('no test defined for %r script' \ |
|
2269 % os.path.join(EXAMPLES_DIR, name)) |
|
2270 |
|
2271 def test_disk_usage(self): |
|
2272 self.assert_stdout('disk_usage.py') |
|
2273 |
|
2274 def test_free(self): |
|
2275 self.assert_stdout('free.py') |
|
2276 |
|
2277 def test_meminfo(self): |
|
2278 self.assert_stdout('meminfo.py') |
|
2279 |
|
2280 def test_process_detail(self): |
|
2281 self.assert_stdout('process_detail.py') |
|
2282 |
|
2283 def test_who(self): |
|
2284 self.assert_stdout('who.py') |
|
2285 |
|
2286 def test_netstat(self): |
|
2287 self.assert_stdout('netstat.py') |
|
2288 |
|
2289 def test_pmap(self): |
|
2290 self.assert_stdout('pmap.py', args=str(os.getpid())) |
|
2291 |
|
2292 @unittest.skipIf(ast is None, |
|
2293 'ast module not available on this python version') |
|
2294 def test_killall(self): |
|
2295 self.assert_syntax('killall.py') |
|
2296 |
|
2297 @unittest.skipIf(ast is None, |
|
2298 'ast module not available on this python version') |
|
2299 def test_nettop(self): |
|
2300 self.assert_syntax('nettop.py') |
|
2301 |
|
2302 @unittest.skipIf(ast is None, |
|
2303 'ast module not available on this python version') |
|
2304 def test_top(self): |
|
2305 self.assert_syntax('top.py') |
|
2306 |
|
2307 @unittest.skipIf(ast is None, |
|
2308 'ast module not available on this python version') |
|
2309 def test_iotop(self): |
|
2310 self.assert_syntax('iotop.py') |
|
2311 |
|
2312 |
|
2313 def cleanup(): |
|
2314 reap_children(search_all=True) |
|
2315 DEVNULL.close() |
|
2316 safe_remove(TESTFN) |
|
2317 |
|
2318 atexit.register(cleanup) |
|
2319 safe_remove(TESTFN) |
|
2320 |
|
2321 def test_main(): |
|
2322 tests = [] |
|
2323 test_suite = unittest.TestSuite() |
|
2324 tests.append(TestSystemAPIs) |
|
2325 tests.append(TestProcess) |
|
2326 tests.append(TestFetchAllProcesses) |
|
2327 |
|
2328 if POSIX: |
|
2329 from _posix import PosixSpecificTestCase |
|
2330 tests.append(PosixSpecificTestCase) |
|
2331 |
|
2332 # import the specific platform test suite |
|
2333 if LINUX: |
|
2334 from _linux import LinuxSpecificTestCase as stc |
|
2335 elif WINDOWS: |
|
2336 from _windows import WindowsSpecificTestCase as stc |
|
2337 from _windows import TestDualProcessImplementation |
|
2338 tests.append(TestDualProcessImplementation) |
|
2339 elif OSX: |
|
2340 from _osx import OSXSpecificTestCase as stc |
|
2341 elif BSD: |
|
2342 from _bsd import BSDSpecificTestCase as stc |
|
2343 elif SUNOS: |
|
2344 from _sunos import SunOSSpecificTestCase as stc |
|
2345 tests.append(stc) |
|
2346 |
|
2347 if hasattr(os, 'getuid'): |
|
2348 if 'LimitedUserTestCase' in globals(): |
|
2349 tests.append(LimitedUserTestCase) |
|
2350 else: |
|
2351 register_warning("LimitedUserTestCase was skipped (super-user " |
|
2352 "privileges are required)") |
|
2353 |
|
2354 tests.append(TestExampleScripts) |
|
2355 |
|
2356 for test_class in tests: |
|
2357 test_suite.addTest(unittest.makeSuite(test_class)) |
|
2358 result = unittest.TextTestRunner(verbosity=2).run(test_suite) |
|
2359 return result.wasSuccessful() |
|
2360 |
|
2361 if __name__ == '__main__': |
|
2362 if not test_main(): |
|
2363 sys.exit(1) |