Thu, 22 Jan 2015 13:21:57 +0100
Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6
michael@0 | 1 | #!/usr/bin/env python |
michael@0 | 2 | |
michael@0 | 3 | # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. |
michael@0 | 4 | # Use of this source code is governed by a BSD-style license that can be |
michael@0 | 5 | # found in the LICENSE file. |
michael@0 | 6 | |
michael@0 | 7 | """ |
michael@0 | 8 | psutil test suite (you can quickly run it with "python setup.py test"). |
michael@0 | 9 | |
michael@0 | 10 | Note: this is targeted for both python 2.x and 3.x so there's no need |
michael@0 | 11 | to use 2to3 tool first. |
michael@0 | 12 | |
michael@0 | 13 | If you're on Python < 2.7 it is recommended to install unittest2 module |
michael@0 | 14 | from: https://pypi.python.org/pypi/unittest2 |
michael@0 | 15 | """ |
michael@0 | 16 | |
michael@0 | 17 | from __future__ import division |
michael@0 | 18 | import os |
michael@0 | 19 | import sys |
michael@0 | 20 | import subprocess |
michael@0 | 21 | import time |
michael@0 | 22 | import signal |
michael@0 | 23 | import types |
michael@0 | 24 | import traceback |
michael@0 | 25 | import socket |
michael@0 | 26 | import warnings |
michael@0 | 27 | import atexit |
michael@0 | 28 | import errno |
michael@0 | 29 | import threading |
michael@0 | 30 | import tempfile |
michael@0 | 31 | import stat |
michael@0 | 32 | import collections |
michael@0 | 33 | import datetime |
michael@0 | 34 | try: |
michael@0 | 35 | import unittest2 as unittest # python < 2.7 + unittest2 installed |
michael@0 | 36 | except ImportError: |
michael@0 | 37 | import unittest |
michael@0 | 38 | try: |
michael@0 | 39 | import ast # python >= 2.6 |
michael@0 | 40 | except ImportError: |
michael@0 | 41 | ast = None |
michael@0 | 42 | |
michael@0 | 43 | import psutil |
michael@0 | 44 | from psutil._compat import PY3, callable, long, wraps |
michael@0 | 45 | |
michael@0 | 46 | |
michael@0 | 47 | # =================================================================== |
michael@0 | 48 | # --- Constants |
michael@0 | 49 | # =================================================================== |
michael@0 | 50 | |
# conf for retry_before_failing() decorator: default number of attempts
# used when the decorator is called without an explicit 'ntimes'
NO_RETRIES = 10
# bytes tolerance for OS memory related tests
TOLERANCE = 500 * 1024  # 500KB

# resolved (symlink-free) path of the interpreter running this suite
PYTHON = os.path.realpath(sys.executable)
# null device opened read/write; default stdin/stdout/stderr of test
# subprocesses and also used as a stand-in for sys.stdout
DEVNULL = open(os.devnull, 'r+')
# marker file path located in the current working directory
TESTFN = os.path.join(os.getcwd(), "$testfile")
# 'examples' directory located next to the parent of this file's directory
EXAMPLES_DIR = os.path.abspath(os.path.join(os.path.dirname(
               os.path.dirname(__file__)), 'examples'))
# platform flags derived from os.name / sys.platform
POSIX = os.name == 'posix'
LINUX = sys.platform.startswith("linux")
WINDOWS = sys.platform.startswith("win32")
OSX = sys.platform.startswith("darwin")
BSD = sys.platform.startswith("freebsd")
SUNOS = sys.platform.startswith("sunos")
michael@0 | 67 | |
michael@0 | 68 | |
michael@0 | 69 | # =================================================================== |
michael@0 | 70 | # --- Utility functions |
michael@0 | 71 | # =================================================================== |
michael@0 | 72 | |
# pids of every subprocess spawned through get_test_subprocess();
# drained by reap_children()
_subprocesses_started = set()


def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL,
                        wait=False):
    """Spawn and return a subprocess.Popen object for use in tests.

    With no 'cmd' the python interpreter is started and sleeps for 2
    seconds; stdin, stdout and stderr default to DEVNULL.
    If 'wait' is True try to make sure the process is reasonably
    initialized before returning: for the default command wait (up to
    3 seconds) for the child to create TESTFN, for a custom command
    wait for its pid to show up in the process list.
    The child's pid is recorded so that reap_children() can kill it.
    """
    default_cmd = cmd is None
    if default_cmd:
        snippets = []
        if wait:
            # the child creates TESTFN as a "I'm up" signal
            snippets.append("open(r'%s', 'w'); " % TESTFN)
        snippets.append("import time; time.sleep(2);")
        argv = [PYTHON, "-c", "".join(snippets)]
    else:
        argv = cmd
    sproc = subprocess.Popen(argv, stdout=stdout, stderr=stderr, stdin=stdin)
    if wait and not default_cmd:
        wait_for_pid(sproc.pid)
    elif wait:
        deadline = time.time() + 3
        created = False
        while deadline > time.time():
            if os.path.exists(TESTFN):
                created = True
                break
            time.sleep(0.001)
        if not created:
            warn("couldn't make sure test file was actually created")
    _subprocesses_started.add(sproc.pid)
    return sproc
michael@0 | 105 | |
def warn(msg):
    """Issue 'msg' as a UserWarning through the warnings module."""
    warnings.warn(msg, category=UserWarning)
michael@0 | 109 | |
def register_warning(msg):
    """Schedule 'msg' to be emitted via warn() at interpreter exit."""
    def emit_at_exit():
        warn(msg)

    atexit.register(emit_at_exit)
michael@0 | 113 | |
def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
    """Run 'cmdline' through the shell and return its stripped stdout.

    Raise RuntimeError (carrying the captured stderr) if the command
    exits with a non-zero status. If the command succeeds but wrote
    something on stderr, that text is reported through warn().
    Returns None if stdout was not captured (i.e. not a pipe).
    """
    def to_text(data):
        # Python 2 pipes already yield str; Python 3 yields bytes which
        # must be decoded. sys.stdout.encoding may be None or missing
        # (e.g. when sys.stdout has been replaced), hence the fallback.
        if data is None or isinstance(data, str):
            return data
        encoding = getattr(sys.stdout, 'encoding', None) \
            or sys.getdefaultencoding()
        return data.decode(encoding, 'replace')

    p = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr)
    out, err = p.communicate()
    out = to_text(out)
    err = to_text(err)
    if p.returncode != 0:
        raise RuntimeError(err)
    if err:
        warn(err)
    if out is None:
        return None
    return out.strip()
michael@0 | 127 | |
def which(program):
    """Same as UNIX which command. Return None on command not found.

    If 'program' contains a directory part it is checked as is,
    otherwise every directory listed in the PATH environment variable
    is searched. When PATH is not set os.defpath is used instead of
    failing with KeyError.
    """
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    if os.path.dirname(program):
        if is_exe(program):
            return program
    else:
        search_path = os.environ.get("PATH", os.defpath)
        for path in search_path.split(os.pathsep):
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file
    return None
michael@0 | 143 | |
def wait_for_pid(pid, timeout=1):
    """Block until 'pid' is listed by psutil.get_pid_list().

    Used by the test suite to give a freshly spawned subprocess time
    to initialize. Raise RuntimeError if the pid did not show up
    within 'timeout' seconds.
    """
    deadline = time.time() + timeout
    while True:
        if pid not in psutil.get_pid_list():
            time.sleep(0.0001)
            if time.time() >= deadline:
                raise RuntimeError("Timed out")
            continue
        # give it one more iteration to allow full initialization
        time.sleep(0.01)
        return
michael@0 | 157 | |
def reap_children(search_all=False):
    """Kill every subprocess started by this test suite and wait for it,
    so that no zombies stick around to hog resources or disturb
    refleak hunting.

    If 'search_all' is True all descendants of the current process
    (as reported by psutil) are killed as well.
    """
    # deliberately the shared set itself: popping below empties it
    pending = _subprocesses_started
    if search_all:
        me = psutil.Process(os.getpid())
        pending.update(kid.pid for kid in me.get_children(recursive=True))
    while pending:
        pid = pending.pop()
        try:
            child = psutil.Process(pid)
            child.kill()
        except psutil.NoSuchProcess:
            # already gone
            continue
        except psutil.AccessDenied:
            warn("couldn't kill child process with pid %s" % pid)
            continue
        child.wait(timeout=3)
michael@0 | 179 | |
def check_ip_address(addr, family):
    """Sanity-check an (ip, port) address tuple via assertions.

    An empty/false 'addr' is accepted as is. The port must be an int
    in the 0-65535 range; for AF_INET the ip must consist of 4 dotted
    integers each in the 0-255 range. Other families only get the
    port checked.
    """
    if not addr:
        return
    ip, port = addr
    assert isinstance(port, int), port
    if family == socket.AF_INET:
        ip = [int(octet) for octet in ip.split('.')]
        assert len(ip) == 4, ip
        for num in ip:
            assert 0 <= num <= 255, ip
    assert 0 <= port <= 65535, port
michael@0 | 192 | |
def safe_remove(fname):
    """Delete file 'fname'; a file which does not exist is not an error.

    Any OSError other than ENOENT is propagated.
    """
    try:
        os.remove(fname)
    except OSError:
        exc = sys.exc_info()[1]
        if exc.args[0] == errno.ENOENT:
            return
        raise
michael@0 | 201 | |
def call_until(fun, expr, timeout=1):
    """Keep calling fun() for 'timeout' secs and return its result as
    soon as the eval()uated 'expr' is true.

    'expr' is a Python expression given as a string; it can refer to
    the last value returned by fun() through the name 'ret'
    (e.g. "len(ret) != 0").
    Raise RuntimeError if 'expr' never became true within 'timeout'
    seconds (including the case where the timeout leaves no room for
    a single call).
    """
    stop_at = time.time() + timeout
    # pre-bound so that the timeout message below works even if the
    # loop body never ran
    ret = None
    while time.time() < stop_at:
        ret = fun()
        # NOTE: 'expr' is eval()uated as is; only pass trusted,
        # hard-coded strings. The local name 'ret' must not be renamed
        # as expressions refer to it.
        if eval(expr):
            return ret
        time.sleep(0.001)
    # wrap in a 1-tuple so that a tuple 'ret' is formatted rather than
    # being interpreted as the format arguments
    raise RuntimeError('timed out (ret=%r)' % (ret,))
michael@0 | 213 | |
def retry_before_failing(ntimes=None):
    """Decorator which runs a test function and retries N times before
    actually failing.

    Only AssertionError triggers a retry; any other exception is
    propagated immediately. 'ntimes' is the (positive) number of
    attempts and defaults to NO_RETRIES. When all attempts fail the
    AssertionError of the last attempt is raised.
    """
    def decorator(fun):
        @wraps(fun)
        def wrapper(*args, **kwargs):
            err = None
            for x in range(ntimes or NO_RETRIES):
                try:
                    return fun(*args, **kwargs)
                except AssertionError:
                    err = sys.exc_info()[1]
            # Re-raise explicitly: a bare "raise" here is outside of any
            # except block and on Python 3 fails with "RuntimeError: No
            # active exception to reraise" instead of the test failure.
            raise err
        return wrapper
    return decorator
michael@0 | 229 | |
def skip_on_access_denied(only_if=None):
    """Decorator to ignore AccessDenied exceptions.

    If the decorated test method raises psutil.AccessDenied the test
    is marked as skipped instead of failing. If 'only_if' is given
    and is false the exception is re-raised.
    The skip is signalled through the test case's skipTest() method
    (unittest >= 2.7 and unittest2) or skip() (compatibility TestCase
    defined in this file); if neither exists a warning is registered
    and the test silently passes.
    """
    def decorator(fun):
        @wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            except psutil.AccessDenied:
                if only_if is not None and not only_if:
                    raise
                msg = "%r was skipped because it raised AccessDenied" \
                      % fun.__name__
                self = args[0]
                skip = getattr(self, 'skipTest', None) \
                    or getattr(self, 'skip', None)
                if skip is not None:
                    skip(msg)
                else:
                    register_warning(msg)
        return wrapper
    return decorator
michael@0 | 250 | |
def skip_on_not_implemented(only_if=None):
    """Decorator to ignore NotImplementedError exceptions.

    If the decorated test method raises NotImplementedError the test
    is marked as skipped instead of failing. If 'only_if' is given
    and is false the exception is re-raised.
    The skip is signalled through the test case's skipTest() method
    (unittest >= 2.7 and unittest2) or skip() (compatibility TestCase
    defined in this file); if neither exists a warning is registered
    and the test silently passes.
    """
    def decorator(fun):
        @wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            except NotImplementedError:
                if only_if is not None and not only_if:
                    raise
                msg = "%r was skipped because it raised NotImplementedError" \
                      % fun.__name__
                self = args[0]
                skip = getattr(self, 'skipTest', None) \
                    or getattr(self, 'skip', None)
                if skip is not None:
                    skip(msg)
                else:
                    register_warning(msg)
        return wrapper
    return decorator
michael@0 | 271 | |
def supports_ipv6():
    """Return True if an IPv6 socket can be bound on this platform.

    Besides checking what the socket module advertises, actually try
    to bind a TCP socket to the IPv6 loopback address.
    """
    if not (socket.has_ipv6 and hasattr(socket, "AF_INET6")):
        return False
    probe = None
    # nested try blocks: keeps the syntax valid on old python versions
    try:
        try:
            probe = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            probe.bind(("::1", 0))
            return True
        except (socket.error, socket.gaierror):
            return False
    finally:
        if probe is not None:
            probe.close()
michael@0 | 288 | |
michael@0 | 289 | |
class ThreadTask(threading.Thread):
    """A thread which idles until told to stop; used by the tests
    dealing with process threads.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        # set by run() once the thread body has actually started
        self._flag = threading.Event()
        self._interval = None
        self._running = False

    def __repr__(self):
        return '<%s running=%s at %#x>' % (
            self.__class__.__name__, self._running, id(self))

    def start(self, interval=0.001):
        """Start the thread and return once it is actually running.
        The thread keeps running until an explicit stop() request and
        polls for it every 'interval' seconds.
        Raise ValueError if already started.
        """
        if self._running:
            raise ValueError("already started")
        self._interval = interval
        threading.Thread.start(self)
        self._flag.wait()

    def run(self):
        self._running = True
        # unblock start()
        self._flag.set()
        while self._running:
            time.sleep(self._interval)

    def stop(self):
        """Request the thread to stop and wait until it has finished.
        Raise ValueError if the thread is not running.
        """
        if not self._running:
            raise ValueError("already stopped")
        self._running = False
        self.join()
michael@0 | 325 | |
michael@0 | 326 | |
michael@0 | 327 | # =================================================================== |
michael@0 | 328 | # --- Support for python < 2.7 in case unittest2 is not installed |
michael@0 | 329 | # =================================================================== |
michael@0 | 330 | |
# Everything below runs only when the imported unittest module has no
# 'skip' attribute: it installs a replacement TestCase plus skipIf() /
# skipUnless() decorators on the unittest module itself.
if not hasattr(unittest, 'skip'):
    register_warning("unittest2 module is not installed; a serie of pretty " \
                     "darn ugly workarounds will be used")

    class SkipTest(Exception):
        # raised by TestCase.skip() below
        pass

    class TestCase(unittest.TestCase):
        # unittest.TestCase extended with the assert* helpers used by
        # this test suite

        def _safe_repr(self, obj):
            # repr() which never raises and is truncated to MAX_LENGTH
            # characters
            MAX_LENGTH = 80
            try:
                result = repr(obj)
            except Exception:
                result = object.__repr__(obj)
            if len(result) < MAX_LENGTH:
                return result
            return result[:MAX_LENGTH] + ' [truncated]...'

        def _fail_w_msg(self, a, b, middle, msg):
            # fail with 'msg' or, if not given, with "<a> <middle> <b>"
            self.fail(msg or '%s %s %s' % (self._safe_repr(a), middle,
                                           self._safe_repr(b)))

        def skip(self, msg):
            # NOTE(review): nothing in the visible code catches SkipTest
            raise SkipTest(msg)

        def assertIn(self, a, b, msg=None):
            if a not in b:
                self._fail_w_msg(a, b, 'not found in', msg)

        def assertNotIn(self, a, b, msg=None):
            if a in b:
                self._fail_w_msg(a, b, 'found in', msg)

        def assertGreater(self, a, b, msg=None):
            if not a > b:
                self._fail_w_msg(a, b, 'not greater than', msg)

        def assertGreaterEqual(self, a, b, msg=None):
            if not a >= b:
                self._fail_w_msg(a, b, 'not greater than or equal to', msg)

        def assertLess(self, a, b, msg=None):
            if not a < b:
                self._fail_w_msg(a, b, 'not less than', msg)

        def assertLessEqual(self, a, b, msg=None):
            if not a <= b:
                self._fail_w_msg(a, b, 'not less or equal to', msg)

        def assertIsInstance(self, a, b, msg=None):
            if not isinstance(a, b):
                self.fail(msg or '%s is not an instance of %r' \
                          % (self._safe_repr(a), b))

        def assertAlmostEqual(self, a, b, msg=None, delta=None):
            # with 'delta': pass if |a - b| <= delta; without it fall
            # back to a plain equality check
            if delta is not None:
                if abs(a - b) <= delta:
                    return
                self.fail(msg or '%s != %s within %s delta' \
                          % (self._safe_repr(a), self._safe_repr(b),
                             self._safe_repr(delta)))
            else:
                self.assertEqual(a, b, msg=msg)


    def skipIf(condition, reason):
        # decorator: if 'condition' is true the wrapped test is not run
        # (it returns None) and a warning is registered for exit time
        def decorator(fun):
            @wraps(fun)
            def wrapper(*args, **kwargs):
                self = args[0]
                if condition:
                    sys.stdout.write("skipped-")
                    sys.stdout.flush()
                    # NOTE(review): 'warn' resolves to the module level
                    # function, hence this looks always true
                    if warn:
                        objname = "%s.%s" % (self.__class__.__name__,
                                             fun.__name__)
                        msg = "%s was skipped" % objname
                        if reason:
                            msg += "; reason: " + repr(reason)
                        register_warning(msg)
                    return
                else:
                    return fun(*args, **kwargs)
            return wrapper
        return decorator

    def skipUnless(condition, reason):
        # inverse of skipIf(); looked up on the unittest module, which
        # is patched right below
        if not condition:
            return unittest.skipIf(True, reason)
        return unittest.skipIf(False, reason)

    # monkey patch the unittest module, then drop the temporary names
    unittest.TestCase = TestCase
    unittest.skipIf = skipIf
    unittest.skipUnless = skipUnless
    del TestCase, skipIf, skipUnless
michael@0 | 427 | |
michael@0 | 428 | |
michael@0 | 429 | # =================================================================== |
michael@0 | 430 | # --- System-related API tests |
michael@0 | 431 | # =================================================================== |
michael@0 | 432 | |
michael@0 | 433 | class TestSystemAPIs(unittest.TestCase): |
michael@0 | 434 | """Tests for system-related APIs.""" |
michael@0 | 435 | |
def setUp(self):
    # remove the TESTFN marker file possibly left behind by a previous
    # test
    safe_remove(TESTFN)
michael@0 | 438 | |
def tearDown(self):
    # kill the subprocesses started via get_test_subprocess()
    reap_children()
michael@0 | 441 | |
def test_process_iter(self):
    # process_iter() must list ourselves and a live subprocess, and
    # must no longer list the subprocess once it was killed
    def listed_pids():
        return [proc.pid for proc in psutil.process_iter()]

    self.assertIn(os.getpid(), listed_pids())
    sproc = get_test_subprocess()
    self.assertIn(sproc.pid, listed_pids())
    child = psutil.Process(sproc.pid)
    child.kill()
    child.wait()
    self.assertNotIn(sproc.pid, listed_pids())
michael@0 | 450 | |
def test_TOTAL_PHYMEM(self):
    # the constant is a positive integer matching virtual_memory().total
    total = psutil.TOTAL_PHYMEM
    self.assertIsInstance(total, (int, long))
    self.assertGreater(total, 0)
    self.assertEqual(total, psutil.virtual_memory().total)
michael@0 | 456 | |
def test_BOOT_TIME(self, arg=None):
    # 'arg' lets other tests reuse these checks on a different value;
    # when not given (or false) psutil.BOOT_TIME is checked
    if arg:
        boot_time = arg
    else:
        boot_time = psutil.BOOT_TIME
    self.assertIsInstance(boot_time, float)
    self.assertGreater(boot_time, 0)
    self.assertLess(boot_time, time.time())
michael@0 | 462 | |
def test_get_boot_time(self):
    # get_boot_time() must be sane and agree with the BOOT_TIME constant
    self.test_BOOT_TIME(psutil.get_boot_time())
    if not WINDOWS:
        self.assertEqual(psutil.get_boot_time(), psutil.BOOT_TIME)
        return
    # work around float precision issues; give it 1 secs tolerance
    drift = abs(psutil.get_boot_time() - psutil.BOOT_TIME)
    self.assertLess(drift, 1)
michael@0 | 471 | |
def test_NUM_CPUS(self):
    # one cpu_times() entry per CPU, and at least one CPU
    cpu_count = psutil.NUM_CPUS
    self.assertEqual(cpu_count, len(psutil.cpu_times(percpu=True)))
    self.assertGreaterEqual(psutil.NUM_CPUS, 1)
michael@0 | 475 | |
@unittest.skipUnless(POSIX, 'posix only')
def test_PAGESIZE(self):
    # pagesize is used internally to perform different calculations
    # and it's determined by using SC_PAGE_SIZE; make sure
    # getpagesize() returns the same value.
    import resource
    sysconf_pagesize = os.sysconf("SC_PAGE_SIZE")
    self.assertEqual(sysconf_pagesize, resource.getpagesize())
michael@0 | 483 | |
def test_deprecated_apis(self):
    # Make sure the deprecated APIs emit DeprecationWarning.
    # A listening socket is created first so that get_connections()
    # below has at least one connection to return.
    s = socket.socket()
    s.bind(('localhost', 0))
    s.listen(1)
    # turn warnings into exceptions so that assertRaises can see them
    warnings.filterwarnings("error")
    p = psutil.Process(os.getpid())
    try:
        self.assertRaises(DeprecationWarning, psutil.virtmem_usage)
        self.assertRaises(DeprecationWarning, psutil.used_phymem)
        self.assertRaises(DeprecationWarning, psutil.avail_phymem)
        self.assertRaises(DeprecationWarning, psutil.total_virtmem)
        self.assertRaises(DeprecationWarning, psutil.used_virtmem)
        self.assertRaises(DeprecationWarning, psutil.avail_virtmem)
        self.assertRaises(DeprecationWarning, psutil.phymem_usage)
        self.assertRaises(DeprecationWarning, psutil.get_process_list)
        self.assertRaises(DeprecationWarning, psutil.network_io_counters)
        if LINUX:
            self.assertRaises(DeprecationWarning, psutil.phymem_buffers)
            self.assertRaises(DeprecationWarning, psutil.cached_phymem)
        # plain attribute access, hence no assertRaises
        try:
            p.nice
        except DeprecationWarning:
            pass
        else:
            self.fail("p.nice didn't raise DeprecationWarning")
        # wait until at least one connection is reported
        ret = call_until(p.get_connections, "len(ret) != 0", timeout=1)
        self.assertRaises(DeprecationWarning,
                          getattr, ret[0], 'local_address')
        self.assertRaises(DeprecationWarning,
                          getattr, ret[0], 'remote_address')
    finally:
        s.close()
        # NOTE: this resets *all* warning filters, not only the
        # "error" one installed above
        warnings.resetwarnings()
michael@0 | 517 | |
def test_deprecated_apis_retval(self):
    # deprecated APIs must return the same value as their replacement;
    # warnings are silenced for the duration of the checks
    warnings.filterwarnings("ignore")
    this_proc = psutil.Process(os.getpid())
    try:
        self.assertEqual(psutil.total_virtmem(),
                         psutil.swap_memory().total)
        self.assertEqual(this_proc.nice, this_proc.get_nice())
    finally:
        warnings.resetwarnings()
michael@0 | 526 | |
def test_virtual_memory(self):
    # sanity checks on the values returned by virtual_memory()
    mem = psutil.virtual_memory()
    assert mem.total > 0, mem
    assert mem.available > 0, mem
    assert 0 <= mem.percent <= 100, mem
    assert mem.used > 0, mem
    assert mem.free >= 0, mem
    # every field other than 'total' must lie within 0 and total
    for name in mem._fields:
        if name == 'total':
            continue
        value = getattr(mem, name)
        if not value >= 0:
            self.fail("%r < 0 (%s)" % (name, value))
        if value > mem.total:
            self.fail("%r > total (total=%s, %s=%s)" \
                      % (name, mem.total, name, value))
michael@0 | 542 | |
def test_swap_memory(self):
    # sanity checks on the values returned by swap_memory()
    mem = psutil.swap_memory()
    assert mem.total >= 0, mem
    assert mem.used >= 0, mem
    if mem.total > 0:
        assert mem.free > 0, mem
    else:
        # likely a system with no swap partition: there can't be any
        # free swap either, so don't demand a strictly positive value
        assert mem.free == 0, mem
    assert 0 <= mem.percent <= 100, mem
    assert mem.sin >= 0, mem
    assert mem.sout >= 0, mem
michael@0 | 551 | |
def test_pid_exists(self):
    # a live subprocess exists; once killed it does not, and neither
    # does pid -1
    sproc = get_test_subprocess(wait=True)
    assert psutil.pid_exists(sproc.pid)
    child = psutil.Process(sproc.pid)
    child.kill()
    child.wait()
    self.assertFalse(psutil.pid_exists(sproc.pid))
    self.assertFalse(psutil.pid_exists(-1))
michael@0 | 560 | |
def test_pid_exists_2(self):
    # pid_exists() must agree with get_pid_list()
    reap_children()
    pids = psutil.get_pid_list()
    for pid in pids:
        # explicit check rather than "assert" so that it still works
        # when python is run with -O
        if not psutil.pid_exists(pid):
            # the process may have disappeared in the meantime: fail
            # only if it is still listed by get_pid_list()
            time.sleep(.1)
            if pid in psutil.get_pid_list():
                self.fail(pid)
    # pids way above the highest one in use are assumed not to exist
    highest = max(pids)
    for pid in range(highest + 5000, highest + 6000):
        self.assertFalse(psutil.pid_exists(pid))
michael@0 | 576 | |
def test_get_pid_list(self):
    # process_iter() and get_pid_list() must report the same pids
    plist = [x.pid for x in psutil.process_iter()]
    pidlist = psutil.get_pid_list()
    # sorted() and not list.sort(): the latter returns None, which
    # would turn this into an always-passing None == None comparison
    self.assertEqual(sorted(plist), sorted(pidlist))
    # make sure every pid is unique
    self.assertEqual(len(pidlist), len(set(pidlist)))
michael@0 | 583 | |
def test_test(self):
    # test for psutil.test() function; its output is sent to DEVNULL
    # and the real sys.stdout is restored afterwards
    real_stdout = sys.stdout
    sys.stdout = DEVNULL
    try:
        psutil.test()
    finally:
        sys.stdout = real_stdout
michael@0 | 592 | |
def test_sys_cpu_times(self):
    # every field is a non negative float; the result can be summed
    # and converted to string
    times = psutil.cpu_times()
    sum(times)
    running_total = 0
    for cp_time in times:
        self.assertIsInstance(cp_time, float)
        self.assertGreaterEqual(cp_time, 0.0)
        running_total += cp_time
    self.assertEqual(running_total, sum(times))
    str(times)
michael@0 | 603 | |
def test_sys_cpu_times2(self):
    # the summed CPU times must grow by at least 0.05 secs over a 0.1
    # secs sleep
    before = sum(psutil.cpu_times())
    time.sleep(0.1)
    after = sum(psutil.cpu_times())
    difference = after - before
    if not difference >= 0.05:
        self.fail("difference %s" % difference)
michael@0 | 611 | |
def test_sys_per_cpu_times(self):
    # same checks as test_sys_cpu_times, applied to each CPU
    for times in psutil.cpu_times(percpu=True):
        sum(times)
        running_total = 0
        for cp_time in times:
            self.assertIsInstance(cp_time, float)
            self.assertGreaterEqual(cp_time, 0.0)
            running_total += cp_time
        self.assertEqual(running_total, sum(times))
        str(times)
    # per-CPU and system-wide results expose the same number of fields
    first_cpu = psutil.cpu_times(percpu=True)[0]
    self.assertEqual(len(first_cpu), len(psutil.cpu_times(percpu=False)))
michael@0 | 624 | |
def test_sys_per_cpu_times2(self):
    # after spinning for 0.1 secs at least one CPU must show its
    # summed times grown by 0.05 secs or more
    first = psutil.cpu_times(percpu=True)
    deadline = time.time() + 0.1
    # busy loop (no sleep)
    while time.time() < deadline:
        pass
    second = psutil.cpu_times(percpu=True)
    for before, after in zip(first, second):
        difference = sum(after) - sum(before)
        if difference >= 0.05:
            return
    self.fail()
michael@0 | 638 | |
def _test_cpu_percent(self, percent):
    # helper: 'percent' must be a float within the 0.0 - 100.0 range
    lowest, highest = 0.0, 100.0
    self.assertIsInstance(percent, float)
    self.assertGreaterEqual(percent, lowest)
    self.assertLessEqual(percent, highest)
michael@0 | 643 | |
def test_sys_cpu_percent(self):
    # one initial call with an interval, then 1000 interval-less calls
    # each one checked for a sane value
    psutil.cpu_percent(interval=0.001)
    for _ in range(1000):
        value = psutil.cpu_percent(interval=None)
        self._test_cpu_percent(value)
michael@0 | 648 | |
def test_sys_per_cpu_percent(self):
    # one value per CPU, each one sane over 1000 interval-less calls
    initial = psutil.cpu_percent(interval=0.001, percpu=True)
    self.assertEqual(len(initial), psutil.NUM_CPUS)
    for _ in range(1000):
        for percent in psutil.cpu_percent(interval=None, percpu=True):
            self._test_cpu_percent(percent)
michael@0 | 656 | |
def test_sys_cpu_times_percent(self):
    # every field, as well as the sum of all fields, must be a sane
    # percentage over 1000 interval-less calls
    psutil.cpu_times_percent(interval=0.001)
    for _ in range(1000):
        fields = psutil.cpu_times_percent(interval=None)
        for percent in fields:
            self._test_cpu_percent(percent)
        self._test_cpu_percent(sum(fields))
michael@0 | 664 | |
def test_sys_per_cpu_times_percent(self):
    # one entry per CPU; every field of every entry, as well as the
    # sum of each entry, must be a sane percentage
    initial = psutil.cpu_times_percent(interval=0.001, percpu=True)
    self.assertEqual(len(initial), psutil.NUM_CPUS)
    for _ in range(1000):
        per_cpu = psutil.cpu_times_percent(interval=None, percpu=True)
        for fields in per_cpu:
            for percent in fields:
                self._test_cpu_percent(percent)
            self._test_cpu_percent(sum(fields))
michael@0 | 675 | |
@unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
                 "os.statvfs() function not available on this platform")
def test_disk_usage(self):
    # sanity checks on the usage of the current working directory
    usage = psutil.disk_usage(os.getcwd())
    assert usage.total > 0, usage
    assert usage.used > 0, usage
    assert usage.free > 0, usage
    assert usage.total > usage.used, usage
    assert usage.total > usage.free, usage
    assert 0 <= usage.percent <= 100, usage.percent

    # if path does not exist OSError ENOENT is expected across
    # all platforms
    missing_path = tempfile.mktemp()
    try:
        psutil.disk_usage(missing_path)
    except OSError:
        exc = sys.exc_info()[1]
        if exc.args[0] != errno.ENOENT:
            raise
    else:
        self.fail("OSError not raised")
michael@0 | 698 | |
@unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
                 "os.statvfs() function not available on this platform")
def test_disk_partitions(self):
    """Check disk_partitions() with all=False and all=True.

    Also verifies that the mount point holding this file is listed and
    that disk_usage() accepts it.
    """
    def assert_mountpoint(part):
        # on solaris apparently mount points can also be files
        if SUNOS:
            assert os.path.exists(part.mountpoint), part
        else:
            assert os.path.isdir(part.mountpoint), part

    def find_mount_point(path):
        # Walk up from `path` until a mount point is reached.
        path = os.path.abspath(path)
        while not os.path.ismount(path):
            path = os.path.dirname(path)
        return path

    # all = False
    for part in psutil.disk_partitions(all=False):
        if WINDOWS and 'cdrom' in part.opts:
            continue
        if POSIX:
            # we cannot make any assumption about this, see:
            # http://goo.gl/p9c43
            part.device
        else:
            assert os.path.exists(part.device), part
        assert_mountpoint(part)
        assert part.fstype, part
        self.assertIsInstance(part.opts, str)

    # all = True
    for part in psutil.disk_partitions(all=True):
        if not WINDOWS:
            try:
                os.stat(part.mountpoint)
            except OSError:
                # http://mail.python.org/pipermail/python-dev/2012-June/120787.html
                exc = sys.exc_info()[1]
                if exc.errno not in (errno.EPERM, errno.EACCES):
                    raise
            else:
                assert_mountpoint(part)
        self.assertIsInstance(part.fstype, str)
        self.assertIsInstance(part.opts, str)

    own_mount = find_mount_point(__file__)
    known_mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)]
    self.assertIn(own_mount, known_mounts)
    psutil.disk_usage(own_mount)
michael@0 | 749 | |
def test_net_io_counters(self):
    """Check the aggregate and per-NIC results of net_io_counters().

    Every returned tuple must expose its fields both by index and by
    name, and every counter must be non-negative.
    """
    fields = ('bytes_sent', 'bytes_recv', 'packets_sent', 'packets_recv',
              'errin', 'errout', 'dropin', 'dropout')

    def check_ntuple(nt):
        # positional access must agree with attribute access
        for index, name in enumerate(fields):
            self.assertEqual(nt[index], getattr(nt, name))
        for name in fields:
            assert getattr(nt, name) >= 0, nt

    ret = psutil.net_io_counters(pernic=False)
    check_ntuple(ret)
    ret = psutil.net_io_counters(pernic=True)
    # The per-NIC result is indexed by key below, so comparing it with
    # [] (as was done previously) could never fail; check that it is
    # actually non-empty instead.
    assert ret, ret
    for key in ret:
        assert key
        check_ntuple(ret[key])
michael@0 | 776 | |
def test_disk_io_counters(self):
    """Check the aggregate and per-disk results of disk_io_counters()."""
    fields = ('read_count', 'write_count', 'read_bytes', 'write_bytes',
              'read_time', 'write_time')

    def check_ntuple(nt):
        # positional access must agree with attribute access
        for index, name in enumerate(fields):
            self.assertEqual(nt[index], getattr(nt, name))
        for name in fields:
            assert getattr(nt, name) >= 0, nt

    check_ntuple(psutil.disk_io_counters(perdisk=False))

    per_disk = psutil.disk_io_counters(perdisk=True)
    # make sure there are no duplicates
    self.assertEqual(len(per_disk), len(set(per_disk)))
    for name in per_disk:
        assert name, name
        check_ntuple(per_disk[name])
        if LINUX and name[-1].isdigit():
            # if 'sda1' is listed 'sda' shouldn't, see:
            # http://code.google.com/p/psutil/issues/detail?id=338
            while name[-1].isdigit():
                name = name[:-1]
            self.assertNotIn(name, per_disk.keys())
michael@0 | 806 | |
def test_get_users(self):
    """Check that get_users() returns at least one well-formed entry."""
    entries = psutil.get_users()
    assert entries
    for entry in entries:
        assert entry.name, entry
        # these two attributes only need to be readable
        entry.terminal
        entry.host
        assert entry.started > 0.0, entry
        # the start time must be convertible to a datetime
        datetime.datetime.fromtimestamp(entry.started)
michael@0 | 816 | |
michael@0 | 817 | |
michael@0 | 818 | # =================================================================== |
michael@0 | 819 | # --- psutil.Process class tests |
michael@0 | 820 | # =================================================================== |
michael@0 | 821 | |
michael@0 | 822 | class TestProcess(unittest.TestCase): |
michael@0 | 823 | """Tests for psutil.Process class.""" |
michael@0 | 824 | |
def setUp(self):
    """Call safe_remove() on TESTFN before each test."""
    safe_remove(TESTFN)
michael@0 | 827 | |
def tearDown(self):
    """Call reap_children() after each test."""
    reap_children()
michael@0 | 830 | |
def test_kill(self):
    """kill() followed by wait() must leave no such process behind."""
    child = get_test_subprocess(wait=True)
    pid = child.pid
    proc = psutil.Process(pid)
    proc_name = proc.name
    proc.kill()
    proc.wait()
    still_there = psutil.pid_exists(pid) and proc_name == PYTHON
    self.assertFalse(still_there)
michael@0 | 839 | |
def test_terminate(self):
    """terminate() followed by wait() must leave no such process behind."""
    child = get_test_subprocess(wait=True)
    pid = child.pid
    proc = psutil.Process(pid)
    proc_name = proc.name
    proc.terminate()
    proc.wait()
    still_there = psutil.pid_exists(pid) and proc_name == PYTHON
    self.assertFalse(still_there)
michael@0 | 848 | |
def test_send_signal(self):
    """send_signal() followed by wait() must leave no such process behind.

    SIGKILL is sent on POSIX, SIGTERM elsewhere.
    """
    signum = signal.SIGTERM
    if POSIX:
        signum = signal.SIGKILL
    child = get_test_subprocess()
    pid = child.pid
    proc = psutil.Process(pid)
    proc_name = proc.name
    proc.send_signal(signum)
    proc.wait()
    still_there = psutil.pid_exists(pid) and proc_name == PYTHON
    self.assertFalse(still_there)
michael@0 | 861 | |
def test_wait(self):
    """Exercise Process.wait(): exit codes, repeated calls and timeouts."""
    def spawn(*args):
        # Start a test subprocess and wrap it in a psutil.Process.
        return psutil.Process(get_test_subprocess(*args).pid)

    # check exit code signal: on POSIX the code equals the signal
    # number used, elsewhere it is expected to be 0.  The signal is
    # looked up by name so that it is only resolved on POSIX.
    for action, signame in (('kill', 'SIGKILL'), ('terminate', 'SIGTERM')):
        proc = spawn()
        getattr(proc, action)()
        code = proc.wait()
        if os.name == 'posix':
            self.assertEqual(code, getattr(signal, signame))
        else:
            self.assertEqual(code, 0)
        self.assertFalse(proc.is_running())

    # check sys.exit() code
    script = "import time, sys; time.sleep(0.01); sys.exit(5);"
    proc = spawn([PYTHON, "-c", script])
    self.assertEqual(proc.wait(), 5)
    self.assertFalse(proc.is_running())

    # Test wait() issued twice.
    # It is not supposed to raise NSP when the process is gone.
    # On UNIX this should return None, on Windows it should keep
    # returning the exit code.
    proc = spawn([PYTHON, "-c", script])
    self.assertEqual(proc.wait(), 5)
    self.assertIn(proc.wait(), (5, None))

    # test timeout
    proc = spawn()
    proc.name
    self.assertRaises(psutil.TimeoutExpired, proc.wait, 0.01)

    # timeout < 0 not allowed
    self.assertRaises(ValueError, proc.wait, -1)
michael@0 | 908 | |
@unittest.skipUnless(POSIX, '')  # XXX why is this skipped on Windows?
def test_wait_non_children(self):
    """wait() against a process which is not a child of this one."""
    # The child spawns a sleeping grandchild and prints its PID.
    script = "".join([
        "import sys;",
        "from subprocess import Popen, PIPE;",
        "cmd = ['%s', '-c', 'import time; time.sleep(2)'];" % PYTHON,
        "sp = Popen(cmd, stdout=PIPE);",
        "sys.stdout.write(str(sp.pid));",
    ])
    child = get_test_subprocess([PYTHON, "-c", script],
                                stdout=subprocess.PIPE)

    grandchild = psutil.Process(int(child.stdout.read()))
    try:
        self.assertRaises(psutil.TimeoutExpired, grandchild.wait, 0.01)
        grandchild.kill()
        exit_code = grandchild.wait()
        self.assertEqual(exit_code, None)
    finally:
        # make sure the grandchild does not outlive the test
        if grandchild.is_running():
            grandchild.kill()
            grandchild.wait()
michael@0 | 930 | |
def test_wait_timeout_0(self):
    """wait(0) must raise TimeoutExpired while the process is alive.

    After kill(), wait(0) is retried for up to 2 seconds until it
    returns the exit code.
    """
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    self.assertRaises(psutil.TimeoutExpired, proc.wait, 0)
    proc.kill()
    deadline = time.time() + 2
    while True:
        try:
            code = proc.wait(0)
            break
        except psutil.TimeoutExpired:
            # keep polling until the deadline, then let it propagate
            if time.time() >= deadline:
                raise
    if os.name != 'posix':
        self.assertEqual(code, 0)
    else:
        self.assertEqual(code, signal.SIGKILL)
    self.assertFalse(proc.is_running())
michael@0 | 950 | |
def test_cpu_percent(self):
    """Check type and range of Process.get_cpu_percent() results.

    The value must always be a non-negative float.  The 100.0 upper
    bound is only enforced when os.name is not 'posix'.
    """
    p = psutil.Process(os.getpid())
    # Two calls with a real interval before the interval=None calls.
    p.get_cpu_percent(interval=0.001)
    p.get_cpu_percent(interval=0.001)
    for _ in range(100):
        percent = p.get_cpu_percent(interval=None)
        self.assertIsInstance(percent, float)
        self.assertGreaterEqual(percent, 0.0)
        # NOTE(review): presumably POSIX values may exceed 100 on
        # multi-core machines, hence no upper bound there -- confirm.
        if os.name != 'posix':
            self.assertLessEqual(percent, 100.0)
michael@0 | 963 | |
def test_cpu_times(self):
    """This process must have consumed some user or system CPU time."""
    cpu = psutil.Process(os.getpid()).get_cpu_times()
    assert (cpu.user > 0.0) or (cpu.system > 0.0), cpu
    # make sure returned values can be pretty printed with strftime
    for seconds in (cpu.user, cpu.system):
        time.strftime("%H:%M:%S", time.localtime(seconds))
michael@0 | 970 | |
michael@0 | 971 | # Test Process.cpu_times() against os.times() |
michael@0 | 972 | # os.times() is broken on Python 2.6 |
michael@0 | 973 | # http://bugs.python.org/issue1040026 |
michael@0 | 974 | # XXX fails on OSX: not sure if it's for os.times(). We should |
michael@0 | 975 | # try this with Python 2.7 and re-enable the test. |
michael@0 | 976 | |
@unittest.skipUnless(sys.version_info > (2, 6, 1) and not OSX,
                     'os.times() is not reliable on this Python version')
def test_cpu_times2(self):
    """Compare Process.get_cpu_times() against os.times()."""
    user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times()
    utime, ktime = os.times()[:2]

    # os.times()[:2] provides the reference values; psutil's figures
    # may differ from them by at most 0.1 seconds either way.
    if abs(user_time - utime) > 0.1:
        self.fail("expected: %s, found: %s" % (utime, user_time))

    if abs(kernel_time - ktime) > 0.1:
        self.fail("expected: %s, found: %s" % (ktime, kernel_time))
michael@0 | 991 | |
def test_create_time(self):
    """create_time of a fresh subprocess must be close to time.time()."""
    child = get_test_subprocess(wait=True)
    reference = time.time()
    proc = psutil.Process(child.pid)
    created = proc.create_time

    # time.time() is the reference value; the test fails if the two
    # values are more than 2 seconds apart.
    difference = abs(created - reference)
    if difference > 2:
        self.fail("expected: %s, found: %s, difference: %s"
                  % (reference, created, difference))

    # make sure returned value can be pretty printed with strftime
    time.strftime("%Y %m %d %H:%M:%S", time.localtime(proc.create_time))
michael@0 | 1008 | |
# The test is skipped *on* Windows, so the reason must say so (the
# previous reason, 'windows only', stated the opposite).
@unittest.skipIf(WINDOWS, 'not available on Windows')
def test_terminal(self):
    """Check Process.terminal for the current process.

    When stdin is a tty the value must match the output of the `tty`
    command; otherwise it only has to be truthy.
    """
    terminal = psutil.Process(os.getpid()).terminal
    if sys.stdin.isatty():
        self.assertEqual(terminal, sh('tty'))
    else:
        assert terminal, repr(terminal)
michael@0 | 1016 | |
@unittest.skipIf(not hasattr(psutil.Process, 'get_io_counters'),
                 'not available on this platform')
@skip_on_not_implemented(only_if=LINUX)
def test_get_io_counters(self):
    """Check that get_io_counters() reflects reads and writes.

    Counters are sampled before and after reading the PYTHON file and
    again before and after writing to a temporary file.
    """
    p = psutil.Process(os.getpid())

    # test reads
    io1 = p.get_io_counters()
    f = open(PYTHON, 'rb')
    try:
        f.read()
    finally:
        # close even if read() fails so the handle cannot leak
        f.close()
    io2 = p.get_io_counters()
    if not BSD:
        assert io2.read_count > io1.read_count, (io1, io2)
        self.assertEqual(io2.write_count, io1.write_count)
    assert io2.read_bytes >= io1.read_bytes, (io1, io2)
    assert io2.write_bytes >= io1.write_bytes, (io1, io2)

    # test writes
    io1 = p.get_io_counters()
    f = tempfile.TemporaryFile()
    try:
        if PY3:
            f.write(bytes("x" * 1000000, 'ascii'))
        else:
            f.write("x" * 1000000)
    finally:
        # the file is closed before the second sample is taken
        f.close()
    io2 = p.get_io_counters()
    assert io2.write_count >= io1.write_count, (io1, io2)
    assert io2.write_bytes >= io1.write_bytes, (io1, io2)
    assert io2.read_count >= io1.read_count, (io1, io2)
    assert io2.read_bytes >= io1.read_bytes, (io1, io2)
michael@0 | 1046 | |
michael@0 | 1047 | # Linux and Windows Vista+ |
@unittest.skipUnless(hasattr(psutil.Process, 'get_ionice'),
                     'Linux and Windows Vista only')
def test_get_set_ionice(self):
    """Round-trip I/O priority through set_ionice() / get_ionice().

    On Linux the (class, value) pair is checked and the priority is
    reset to IOPRIO_CLASS_NONE afterwards; elsewhere a single value is
    checked and the original value is restored.
    """
    if LINUX:
        from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT,
                            IOPRIO_CLASS_BE, IOPRIO_CLASS_IDLE)
        self.assertEqual(IOPRIO_CLASS_NONE, 0)
        self.assertEqual(IOPRIO_CLASS_RT, 1)
        self.assertEqual(IOPRIO_CLASS_BE, 2)
        self.assertEqual(IOPRIO_CLASS_IDLE, 3)
        proc = psutil.Process(os.getpid())

        def expect(wanted_class, wanted_value):
            ioclass, value = proc.get_ionice()
            self.assertEqual(ioclass, wanted_class)
            self.assertEqual(value, wanted_value)

        try:
            proc.set_ionice(2)
            expect(2, 4)
            #
            proc.set_ionice(3)
            expect(3, 0)
            #
            proc.set_ionice(2, 0)
            expect(2, 0)
            proc.set_ionice(2, 7)
            expect(2, 7)
            self.assertRaises(ValueError, proc.set_ionice, 2, 10)
        finally:
            proc.set_ionice(IOPRIO_CLASS_NONE)
    else:
        proc = psutil.Process(os.getpid())
        original = proc.get_ionice()
        try:
            # pick a priority different from the current one:
            # 0 is "very low", 1 is "low"
            value = 0
            if original == value:
                value = 1
            proc.set_ionice(value)
            self.assertEqual(proc.get_ionice(), value)
        finally:
            proc.set_ionice(original)
        #
        self.assertRaises(ValueError, proc.set_ionice, 3)
        self.assertRaises(TypeError, proc.set_ionice, 2, 1)
michael@0 | 1095 | |
def test_get_num_threads(self):
    """Starting one extra thread must bump get_num_threads() by one."""
    # The absolute number of threads is platform dependent (OSX,
    # Windows), so only the difference is checked.
    proc = psutil.Process(os.getpid())
    before = proc.get_num_threads()

    worker = ThreadTask()
    worker.start()
    try:
        after = proc.get_num_threads()
        self.assertEqual(after, before + 1)
        worker.stop()
    finally:
        # stop the worker if an assertion fired before stop() ran
        if worker._running:
            worker.stop()
michael@0 | 1112 | |
@unittest.skipUnless(WINDOWS, 'Windows only')
def test_get_num_handles(self):
    """The current process must report a positive number of handles."""
    # a better test is done later into test/_windows.py
    handles = psutil.Process(os.getpid()).get_num_handles()
    self.assertGreater(handles, 0)
michael@0 | 1118 | |
def test_get_threads(self):
    """Starting one extra thread must add one entry to get_threads()."""
    proc = psutil.Process(os.getpid())
    before = proc.get_threads()

    worker = ThreadTask()
    worker.start()

    try:
        after = proc.get_threads()
        self.assertEqual(len(after), len(before) + 1)
        first = after[0]
        # on Linux, first thread id is supposed to be this process
        if LINUX:
            self.assertEqual(first.id, os.getpid())
        # named-tuple fields must match their positional counterparts
        self.assertEqual(first.id, first[0])
        self.assertEqual(first.user_time, first[1])
        self.assertEqual(first.system_time, first[2])
        worker.stop()
    finally:
        # stop the worker if an assertion fired before stop() ran
        if worker._running:
            worker.stop()
michael@0 | 1142 | |
def test_get_memory_info(self):
    """Memory figures must grow after allocating a large list."""
    proc = psutil.Process(os.getpid())

    # baseline sample
    rss_before, vms_before = proc.get_memory_info()
    percent_before = proc.get_memory_percent()
    self.assertGreater(rss_before, 0)
    self.assertGreater(vms_before, 0)

    # allocate some memory, then sample again
    ballast = [None] * 1500000

    rss_after, vms_after = proc.get_memory_info()
    percent_after = proc.get_memory_percent()
    # make sure that the memory usage bumped up
    self.assertGreater(rss_after, rss_before)
    self.assertGreaterEqual(vms_after, vms_before)  # vms might be equal
    self.assertGreater(percent_after, percent_before)
    del ballast
michael@0 | 1162 | |
michael@0 | 1163 | # def test_get_ext_memory_info(self): |
michael@0 | 1164 | # # tested later in fetch all test suite |
michael@0 | 1165 | |
def test_get_memory_maps(self):
    """Check grouped and ungrouped output of get_memory_maps()."""
    proc = psutil.Process(os.getpid())
    grouped = proc.get_memory_maps()
    entries = list(grouped)
    # grouped entries must be unique
    self.assertEqual(len(entries), len(set(entries)))
    ungrouped = proc.get_memory_maps(grouped=False)

    for region in grouped:
        if region.path.startswith('['):
            continue
        assert os.path.isabs(region.path), region.path
        if POSIX:
            assert os.path.exists(region.path), region.path
        elif '64' not in os.path.basename(region.path):
            # XXX - On Windows we have this strange behavior with
            # 64 bit dlls: they are visible via explorer but cannot
            # be accessed via os.stat() (wtf?).
            assert os.path.exists(region.path), region.path

    for region in ungrouped:
        for field in region._fields:
            value = getattr(region, field)
            if field == 'path':
                continue
            if field in ('addr', 'perms'):
                assert value, value
            else:
                self.assertIsInstance(value, (int, long))
                assert value >= 0, value
michael@0 | 1194 | |
def test_get_memory_percent(self):
    """The current process must report a memory percentage above 0."""
    percent = psutil.Process(os.getpid()).get_memory_percent()
    self.assertGreater(percent, 0.0)
michael@0 | 1198 | |
def test_pid(self):
    """Process.pid must echo the PID the Process was created with."""
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    self.assertEqual(proc.pid, child.pid)
michael@0 | 1202 | |
def test_is_running(self):
    """is_running() must be True before kill() and False after wait().

    Each state is queried twice in a row.
    """
    child = get_test_subprocess(wait=True)
    proc = psutil.Process(child.pid)
    for _ in (1, 2):
        assert proc.is_running()
    proc.kill()
    proc.wait()
    for _ in (1, 2):
        assert not proc.is_running()
michael@0 | 1212 | |
def test_exe(self):
    """Process.exe of a test subprocess must match PYTHON.

    A case-only difference is tolerated on Windows, and a difference
    limited to the "major.minor" version string is tolerated elsewhere.
    """
    child = get_test_subprocess(wait=True)
    exe = psutil.Process(child.pid).exe
    if exe == PYTHON:
        return
    if WINDOWS and len(exe) == len(PYTHON):
        # on Windows we don't care about case sensitivity
        self.assertEqual(exe.lower(), PYTHON.lower())
        return
    # certain platforms such as BSD are more accurate returning:
    # "/usr/local/bin/python2.7"
    # ...instead of:
    # "/usr/local/bin/python"
    # We do not want to consider this difference in accuracy
    # an error.
    version = "%s.%s" % (sys.version_info[0], sys.version_info[1])
    self.assertEqual(exe.replace(version, ''), PYTHON.replace(version, ''))
michael@0 | 1231 | |
def test_cmdline(self):
    """Process.cmdline must match the arguments the child was started with."""
    args = [PYTHON, "-c", "import time; time.sleep(2)"]
    child = get_test_subprocess(args, wait=True)
    reported = psutil.Process(child.pid).cmdline
    self.assertEqual(' '.join(reported), ' '.join(args))
michael@0 | 1237 | |
def test_name(self):
    """Process.name must be a prefix of the interpreter's file name.

    Both sides are compared in lower case.
    """
    child = get_test_subprocess(PYTHON, wait=True)
    reported = psutil.Process(child.pid).name.lower()
    interpreter = os.path.realpath(sys.executable)
    expected = os.path.basename(interpreter).lower()
    assert expected.startswith(reported), (expected, reported)
michael@0 | 1243 | |
@unittest.skipUnless(POSIX, 'posix only')
def test_uids(self):
    """Process.uids must agree with the os module's user ids."""
    ruid, euid, suid = psutil.Process(os.getpid()).uids
    # os.getuid() refers to "real" uid
    self.assertEqual(ruid, os.getuid())
    # os.geteuid() refers to "effective" uid
    self.assertEqual(euid, os.geteuid())
    # no such thing as os.getsuid() ("saved" uid), but starting
    # from python 2.7 we have os.getresuid()[2]
    if hasattr(os, "getresuid"):
        self.assertEqual(suid, os.getresuid()[2])
michael@0 | 1256 | |
@unittest.skipUnless(POSIX, 'posix only')
def test_gids(self):
    """Process.gids must agree with the os module's group ids."""
    p = psutil.Process(os.getpid())
    real, effective, saved = p.gids
    # os.getgid() refers to "real" gid
    self.assertEqual(real, os.getgid())
    # os.getegid() refers to "effective" gid
    self.assertEqual(effective, os.getegid())
    # no such thing as os.getsgid() ("saved" gid), but starting
    # from python 2.7 we have os.getresgid()[2].  Guard on the
    # function that is actually called (the guard used to test for
    # os.getresuid instead).
    if hasattr(os, "getresgid"):
        self.assertEqual(saved, os.getresgid()[2])
michael@0 | 1269 | |
def test_nice(self):
    """Round-trip process priority through set_nice() / get_nice()."""
    proc = psutil.Process(os.getpid())
    self.assertRaises(TypeError, proc.set_nice, "str")

    if os.name != 'nt':
        # The nested try blocks keep the except and finally clauses
        # separate, as in the rest of this file.
        try:
            try:
                original = proc.get_nice()
                proc.set_nice(1)
                self.assertEqual(proc.get_nice(), 1)
                # going back to previous nice value raises
                # AccessDenied on OSX
                if not OSX:
                    proc.set_nice(0)
                    self.assertEqual(proc.get_nice(), 0)
            except psutil.AccessDenied:
                pass
        finally:
            try:
                proc.set_nice(original)
            except psutil.AccessDenied:
                pass
        return

    # os.name == 'nt': priorities are expressed as priority classes
    try:
        self.assertEqual(proc.get_nice(), psutil.NORMAL_PRIORITY_CLASS)
        proc.set_nice(psutil.HIGH_PRIORITY_CLASS)
        self.assertEqual(proc.get_nice(), psutil.HIGH_PRIORITY_CLASS)
        proc.set_nice(psutil.NORMAL_PRIORITY_CLASS)
        self.assertEqual(proc.get_nice(), psutil.NORMAL_PRIORITY_CLASS)
    finally:
        proc.set_nice(psutil.NORMAL_PRIORITY_CLASS)
michael@0 | 1299 | |
def test_status(self):
    """The current process must report itself as running."""
    proc = psutil.Process(os.getpid())
    # checked both against the constant and via its str() form
    self.assertEqual(proc.status, psutil.STATUS_RUNNING)
    self.assertEqual(str(proc.status), "running")
michael@0 | 1304 | |
def test_status_constants(self):
    """STATUS_* constants must compare equal to both ints and strings."""
    # Both the == and != operators are exercised explicitly, so the
    # comparisons are spelled out rather than delegated to
    # assertEqual / assertNotEqual.
    running = psutil.STATUS_RUNNING
    for same in (0, long(0), 'running'):
        self.assertTrue(running == same)
    for different in (1, 'sleeping'):
        self.assertFalse(running == different)
    for same in (0, 'running'):
        self.assertFalse(running != same)
    for different in (1, 'sleeping'):
        self.assertTrue(running != different)
michael@0 | 1317 | |
def test_username(self):
    """Process.username of a child must match the current user."""
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    if POSIX:
        import pwd
        expected = pwd.getpwuid(os.getuid()).pw_name
        self.assertEqual(proc.username, expected)
    elif WINDOWS and 'USERNAME' in os.environ:
        # the value is expected in the form "DOMAIN\\user"
        want_user = os.environ['USERNAME']
        want_domain = os.environ['USERDOMAIN']
        got_domain, got_user = proc.username.split('\\')
        self.assertEqual(got_domain, want_domain)
        self.assertEqual(got_user, want_user)
    else:
        # nothing to compare against: the attribute is only read
        proc.username
michael@0 | 1332 | |
@unittest.skipIf(not hasattr(psutil.Process, "getcwd"),
                 'not available on this platform')
def test_getcwd(self):
    """A child's getcwd() must equal this process's working directory."""
    child = get_test_subprocess(wait=True)
    child_cwd = psutil.Process(child.pid).getcwd()
    self.assertEqual(child_cwd, os.getcwd())
michael@0 | 1339 | |
@unittest.skipUnless(hasattr(psutil.Process, "getcwd"),
                     'not available on this platform')
def test_getcwd_2(self):
    """getcwd() must follow a child which changes to the parent dir."""
    script = "import os, time; os.chdir('..'); time.sleep(2)"
    child = get_test_subprocess([PYTHON, "-c", script], wait=True)
    proc = psutil.Process(child.pid)
    # the condition string is handed to call_until() together with a
    # 1 second timeout
    call_until(proc.getcwd, "ret == os.path.dirname(os.getcwd())", timeout=1)
michael@0 | 1347 | |
@unittest.skipIf(not hasattr(psutil.Process, "get_cpu_affinity"),
                 'not available on this platform')
def test_cpu_affinity(self):
    """Pin the current process to each CPU in turn, then to all of them.

    The affinity being changed belongs to the test runner itself, so the
    initial mask is restored in a ``finally`` clause: a failing assertion
    must not leave every subsequent test pinned to a single CPU.
    """
    p = psutil.Process(os.getpid())
    initial = p.get_cpu_affinity()
    # Same CPU count source as the invalid-CPU check below.
    num_cpus = len(psutil.cpu_times(percpu=True))
    all_cpus = list(range(num_cpus))
    try:
        for n in all_cpus:
            p.set_cpu_affinity([n])
            self.assertEqual(p.get_cpu_affinity(), [n])
        p.set_cpu_affinity(all_cpus)
        self.assertEqual(p.get_cpu_affinity(), all_cpus)
    finally:
        p.set_cpu_affinity(initial)
    # A CPU index well past the last valid one must be rejected.
    invalid_cpu = [num_cpus + 10]
    self.assertRaises(ValueError, p.set_cpu_affinity, invalid_cpu)
michael@0 | 1365 | |
def test_get_open_files(self):
    """get_open_files() must list TESTFN once it is opened.

    Checked for the current process and for a child process.
    """
    # current process
    p = psutil.Process(os.getpid())
    files = p.get_open_files()
    # Compare against the path field: the returned entries are tuples,
    # so testing the bare string against them could never fail.
    self.assertNotIn(TESTFN, [entry.path for entry in files])
    f = open(TESTFN, 'w')
    try:
        call_until(p.get_open_files, "len(ret) != %i" % len(files))
        filenames = [entry.path for entry in p.get_open_files()]
        self.assertIn(TESTFN, filenames)
    finally:
        # Do not leak the descriptor into later tests on failure.
        f.close()
    for path in filenames:
        assert os.path.isfile(path), path

    # another process
    cmdline = "import time; f = open(r'%s', 'r'); time.sleep(2);" % TESTFN
    sproc = get_test_subprocess([PYTHON, "-c", cmdline], wait=True)
    p = psutil.Process(sproc.pid)
    # Poll for up to ~1 second: the child needs time to open the file.
    for _ in range(100):
        filenames = [entry.path for entry in p.get_open_files()]
        if TESTFN in filenames:
            break
        time.sleep(.01)
    else:
        self.assertIn(TESTFN, filenames)
    for path in filenames:
        assert os.path.isfile(path), path
michael@0 | 1392 | |
def test_get_open_files2(self):
    """Check the (path, fd) fields of get_open_files() entries."""
    fileobj = open(TESTFN, 'w')
    try:
        p = psutil.Process(os.getpid())
        for path, fd in p.get_open_files():
            if path == fileobj.name or fd == fileobj.fileno():
                break
        else:
            self.fail("no file found; files=%s" % repr(p.get_open_files()))
        self.assertEqual(path, fileobj.name)
        if WINDOWS:
            self.assertEqual(fd, -1)
        else:
            self.assertEqual(fd, fileobj.fileno())
        # test positions
        ntuple = p.get_open_files()[0]
        self.assertEqual(ntuple[0], ntuple.path)
        self.assertEqual(ntuple[1], ntuple.fd)
    finally:
        fileobj.close()
    # The file is gone. Compare against the path field, because the
    # entries are tuples and a bare string is never "in" them.
    remaining = [entry.path for entry in p.get_open_files()]
    self.assertNotIn(fileobj.name, remaining)
michael@0 | 1414 | |
def test_connection_constants(self):
    """Every psutil.CONN_* constant is unique and prints in upper case."""
    seen_values = []
    seen_labels = []
    conn_names = [n for n in dir(psutil) if n.startswith('CONN_')]
    for name in conn_names:
        value = getattr(psutil, name)
        label = str(value)
        assert label.isupper(), label
        assert label not in seen_labels, label
        assert value not in seen_values, value
        seen_values.append(value)
        seen_labels.append(label)
    # Platform-specific constants must exist on their own platform.
    if SUNOS:
        psutil.CONN_IDLE
        psutil.CONN_BOUND
    if WINDOWS:
        psutil.CONN_DELETE_TCB
michael@0 | 1432 | |
def test_get_connections(self):
    """A child listening on 127.0.0.1 must show one LISTEN connection."""
    arg = "import socket, time;" \
          "s = socket.socket();" \
          "s.bind(('127.0.0.1', 0));" \
          "s.listen(1);" \
          "conn, addr = s.accept();" \
          "time.sleep(2);"
    sproc = get_test_subprocess([PYTHON, "-c", arg])
    p = psutil.Process(sproc.pid)
    # Poll for up to ~1 second until the child has created its socket.
    for _ in range(100):
        if p.get_connections():
            # give the subprocess some more time to bind()
            time.sleep(.01)
            cons = p.get_connections()
            break
        time.sleep(.01)
    else:
        # Without this branch a timeout left `cons` unbound and the test
        # died with NameError rather than reporting a failure.
        self.fail("subprocess did not open any connection")
    self.assertEqual(len(cons), 1)
    con = cons[0]
    self.assertEqual(con.family, socket.AF_INET)
    self.assertEqual(con.type, socket.SOCK_STREAM)
    self.assertEqual(con.status, psutil.CONN_LISTEN, str(con.status))
    ip, port = con.laddr
    self.assertEqual(ip, '127.0.0.1')
    self.assertEqual(con.raddr, ())
    if WINDOWS or SUNOS:
        self.assertEqual(con.fd, -1)
    else:
        assert con.fd > 0, con
    # test positions
    self.assertEqual(con[0], con.fd)
    self.assertEqual(con[1], con.family)
    self.assertEqual(con[2], con.type)
    self.assertEqual(con[3], con.laddr)
    self.assertEqual(con[4], con.raddr)
    self.assertEqual(con[5], con.status)
    # test kind arg
    self.assertRaises(ValueError, p.get_connections, 'foo')
michael@0 | 1470 | |
@unittest.skipUnless(supports_ipv6(), 'IPv6 is not supported')
def test_get_connections_ipv6(self):
    """A listening IPv6 socket of our own must be reported with '::1'."""
    s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    try:
        s.bind(('::1', 0))
        s.listen(1)
        cons = psutil.Process(os.getpid()).get_connections()
    finally:
        # Close even if bind/listen/get_connections raised, so the
        # socket does not show up in later connection tests.
        s.close()
    self.assertEqual(len(cons), 1)
    self.assertEqual(cons[0].laddr[0], '::1')
michael@0 | 1480 | |
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'AF_UNIX is not supported')
def test_get_connections_unix(self):
    """Check how UNIX domain sockets bound to TESTFN are reported."""
    proc = psutil.Process(os.getpid())
    # stream socket
    safe_remove(TESTFN)
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.bind(TESTFN)
        conn = proc.get_connections(kind='unix')[0]
        if conn.fd != -1:  # != sunos and windows
            self.assertEqual(conn.fd, sock.fileno())
        self.assertEqual(conn.family, socket.AF_UNIX)
        self.assertEqual(conn.type, socket.SOCK_STREAM)
        self.assertEqual(conn.laddr, TESTFN)
        self.assertTrue(not conn.raddr)
        self.assertEqual(conn.status, psutil.CONN_NONE, str(conn.status))
    finally:
        # Close even on failure, otherwise the socket pollutes later tests.
        sock.close()
    # datagram socket
    safe_remove(TESTFN)
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        sock.bind(TESTFN)
        conn = proc.get_connections(kind='unix')[0]
        self.assertEqual(conn.type, socket.SOCK_DGRAM)
    finally:
        sock.close()
michael@0 | 1503 | |
@unittest.skipUnless(hasattr(socket, "fromfd"),
                     'socket.fromfd() is not available')
@unittest.skipIf(WINDOWS or SUNOS,
                 'connection fd not available on this platform')
def test_connection_fromfd(self):
    """The fd reported for a connection must be usable by socket.fromfd()."""
    sock = socket.socket()
    try:
        sock.bind(('localhost', 0))
        sock.listen(1)
        p = psutil.Process(os.getpid())
        for conn in p.get_connections():
            if conn.fd == sock.fileno():
                break
        else:
            self.fail("couldn't find socket fd")
        dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
        try:
            self.assertEqual(dupsock.getsockname(), conn.laddr)
            # fromfd() duplicates the descriptor, so the numbers differ.
            self.assertNotEqual(sock.fileno(), dupsock.fileno())
        finally:
            dupsock.close()
    finally:
        # Covers every exit path, including a failing fromfd() call.
        sock.close()
michael@0 | 1526 | |
def test_get_connections_all(self):
    """Check get_connections() for TCP/UDP over IPv4/IPv6 children.

    One child is spawned per (family, type) combination. For each one
    the reported connection fields are verified, and each ``kind``
    filter is checked to match or exclude that connection as expected.
    """
    from string import Template

    tcp_template = Template(
        "import socket;"
        "s = socket.socket($family, socket.SOCK_STREAM);"
        "s.bind(('$addr', 0));"
        "s.listen(1);"
        "conn, addr = s.accept();")
    udp_template = Template(
        "import socket, time;"
        "s = socket.socket($family, socket.SOCK_DGRAM);"
        "s.bind(('$addr', 0));"
        "time.sleep(2);")

    def spawn(template, family, addr):
        # int() matters: on Pythons where the socket constants are
        # enums, their str() is a symbolic name that is not valid
        # inside the generated one-liner.
        code = template.substitute(family=int(family), addr=addr)
        return get_test_subprocess([PYTHON, "-c", code])

    # launch various subprocess instantiating a socket of various
    # families and types to enrich psutil results.
    # pid -> (family, type, allowed local addrs, status, matching kinds)
    expected = {}
    expected[spawn(tcp_template, socket.AF_INET, "127.0.0.1").pid] = (
        socket.AF_INET, socket.SOCK_STREAM, ("127.0.0.1",),
        psutil.CONN_LISTEN, ("all", "inet", "inet4", "tcp", "tcp4"))
    expected[spawn(udp_template, socket.AF_INET, "127.0.0.1").pid] = (
        socket.AF_INET, socket.SOCK_DGRAM, ("127.0.0.1",),
        psutil.CONN_NONE, ("all", "inet", "inet4", "udp", "udp4"))
    if supports_ipv6():
        expected[spawn(tcp_template, socket.AF_INET6, "::1").pid] = (
            socket.AF_INET6, socket.SOCK_STREAM, ("::", "::1"),
            psutil.CONN_LISTEN, ("all", "inet", "inet6", "tcp", "tcp6"))
        expected[spawn(udp_template, socket.AF_INET6, "::1").pid] = (
            socket.AF_INET6, socket.SOCK_DGRAM, ("::", "::1"),
            psutil.CONN_NONE, ("all", "inet", "inet6", "udp", "udp6"))

    # check matches against subprocesses just created
    all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4", "tcp6",
                 "udp", "udp4", "udp6")
    for p in psutil.Process(os.getpid()).get_children():
        if p.pid not in expected:
            # Not one of the children spawned above; nothing to verify.
            continue
        family, type_, addrs, status, kinds = expected[p.pid]
        for conn in p.get_connections():
            self.assertEqual(conn.family, family)
            self.assertEqual(conn.type, type_)
            self.assertIn(conn.laddr[0], addrs)
            self.assertEqual(conn.raddr, ())
            self.assertEqual(conn.status, status, str(conn.status))
            for kind in all_kinds:
                cons = p.get_connections(kind=kind)
                if kind in kinds:
                    assert cons != [], cons
                else:
                    self.assertEqual(cons, [], cons)
michael@0 | 1621 | |
@unittest.skipUnless(POSIX, 'posix only')
def test_get_num_fds(self):
    """get_num_fds() must track descriptors as they are opened and closed."""
    p = psutil.Process(os.getpid())
    start = p.get_num_fds()
    fobj = open(TESTFN, 'w')
    try:
        self.assertEqual(p.get_num_fds(), start + 1)
        sock = socket.socket()
        try:
            self.assertEqual(p.get_num_fds(), start + 2)
        finally:
            sock.close()
    finally:
        # Closing in ``finally`` keeps a failed assertion from leaking
        # descriptors into the fd counts of later tests.
        fobj.close()
    self.assertEqual(p.get_num_fds(), start)
michael@0 | 1633 | |
@skip_on_not_implemented(only_if=LINUX)
def test_get_num_ctx_switches(self):
    """The context switch counter must eventually increase while we spin."""
    iterations = 500000
    p = psutil.Process(os.getpid())
    before = sum(p.get_num_ctx_switches())
    for _ in range(iterations):
        after = sum(p.get_num_ctx_switches())
        if after > before:
            return
    # The message is built from the loop bound so the two cannot drift.
    self.fail("num ctx switches still the same after %d iterations"
              % iterations)
michael@0 | 1643 | |
def test_parent_ppid(self):
    """The child must report us as parent, and no other process may."""
    this_parent = os.getpid()
    sproc = get_test_subprocess()
    p = psutil.Process(sproc.pid)
    self.assertEqual(p.ppid, this_parent)
    self.assertEqual(p.parent.pid, this_parent)
    # no other process is supposed to have us as parent
    for other in psutil.process_iter():
        if other.pid == sproc.pid:
            continue
        try:
            other_ppid = other.ppid
        except psutil.NoSuchProcess:
            # Unrelated process exited while we were iterating.
            continue
        self.assertNotEqual(other_ppid, this_parent)
michael@0 | 1655 | |
def test_get_children(self):
    """One spawned child shows up in both flat and recursive listings."""
    me = psutil.Process(os.getpid())
    # Nothing has been spawned yet.
    self.assertEqual(me.get_children(), [])
    self.assertEqual(me.get_children(recursive=True), [])
    child = get_test_subprocess()
    direct = me.get_children()
    nested = me.get_children(recursive=True)
    for listing in (direct, nested):
        self.assertEqual(len(listing), 1)
        only = listing[0]
        self.assertEqual(only.pid, child.pid)
        self.assertEqual(only.ppid, os.getpid())
michael@0 | 1667 | |
def test_get_children_recursive(self):
    """get_children(recursive=True) must include grandchildren."""
    # here we create a subprocess which creates another one as in:
    # A (parent) -> B (child) -> C (grandchild)
    code = ("import subprocess, os, sys, time;"
            "PYTHON = os.path.realpath(sys.executable);"
            "cmd = [PYTHON, '-c', 'import time; time.sleep(2);'];"
            "subprocess.Popen(cmd);"
            "time.sleep(2);")
    get_test_subprocess(cmd=[PYTHON, "-c", code])
    me = psutil.Process(os.getpid())
    self.assertEqual(len(me.get_children(recursive=False)), 1)
    # give the grandchild some time to start
    deadline = time.time() + 1.5
    while time.time() < deadline:
        children = me.get_children(recursive=True)
        if len(children) > 1:
            break
    self.assertEqual(len(children), 2)
    first, second = children[0], children[1]
    self.assertEqual(first.ppid, os.getpid())
    self.assertEqual(second.ppid, first.pid)
michael@0 | 1688 | |
def test_get_children_duplicates(self):
    """A recursive children listing must not contain duplicates."""
    from psutil._compat import defaultdict
    # find the process which has the highest number of children
    counts = defaultdict(int)
    for proc in psutil.process_iter():
        try:
            counts[proc.ppid] += 1
        except psutil.Error:
            pass
    # this is the one, now let's make sure there are no duplicates
    ranked = sorted(counts.items(), key=lambda item: item[1])
    busiest_pid = ranked[-1][0]
    busiest = psutil.Process(busiest_pid)
    try:
        descendants = busiest.get_children(recursive=True)
    except psutil.AccessDenied:  # windows
        pass
    else:
        self.assertEqual(len(descendants), len(set(descendants)))
michael@0 | 1707 | |
def test_suspend_resume(self):
    """suspend() stops the child; resume() takes it out of that state."""
    child = get_test_subprocess(wait=True)
    proc = psutil.Process(child.pid)
    proc.suspend()
    # Poll for up to ~1 second for the status change to become visible.
    attempts = 100
    while attempts:
        if proc.status == psutil.STATUS_STOPPED:
            break
        time.sleep(0.01)
        attempts -= 1
    self.assertEqual(str(proc.status), "stopped")
    proc.resume()
    assert proc.status != psutil.STATUS_STOPPED, proc.status
michael@0 | 1719 | |
def test_invalid_pid(self):
    """Process() rejects non-integer and negative PIDs."""
    cases = (
        (TypeError, "1"),
        (TypeError, None),
        (ValueError, -1),
    )
    for exc_type, bad_pid in cases:
        self.assertRaises(exc_type, psutil.Process, bad_pid)
michael@0 | 1724 | |
def test_as_dict(self):
    """Exercise Process.as_dict() with and without attribute filters."""
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    everything = proc.as_dict()
    try:
        import json
    except ImportError:
        pass
    else:
        # the full dict is supposed to be JSON-serializable
        json.dumps(everything)
    # Only the requested attributes come back.
    subset = proc.as_dict(attrs=['exe', 'name'])
    self.assertEqual(sorted(subset.keys()), ['exe', 'name'])
    # For the lowest PID, ad_value is what comes back when the result
    # is not a list.
    lowest = psutil.Process(min(psutil.get_pid_list()))
    info = lowest.as_dict(attrs=['get_connections'], ad_value='foo')
    connections = info['connections']
    if not isinstance(connections, list):
        self.assertEqual(connections, 'foo')
michael@0 | 1744 | |
def test_zombie_process(self):
    """Every Process attribute must raise NoSuchProcess once it is gone."""
    # Test that NoSuchProcess exception gets raised in case the
    # process dies after we create the Process object.
    # Example:
    #  >>> proc = Process(1234)
    #  >>> time.sleep(2)  # time-consuming task, process dies in meantime
    #  >>> proc.name
    # Refers to Issue #15
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.kill()
    proc.wait()

    # Attributes that need arguments or are exercised separately below.
    skipped = ('pid', 'send_signal', 'is_running', 'set_ionice',
               'wait', 'set_cpu_affinity', 'create_time', 'set_nice',
               'nice')
    for name in dir(proc):
        if name.startswith('_') or name in skipped:
            continue
        try:
            member = getattr(proc, name)
            if callable(member):
                member()
        except (psutil.NoSuchProcess, NotImplementedError):
            pass
        else:
            self.fail("NoSuchProcess exception not raised for %r" % name)

    # other methods
    if os.name == 'posix':
        nice_value = 1
    else:
        nice_value = psutil.NORMAL_PRIORITY_CLASS
    try:
        proc.set_nice(nice_value)
    except psutil.NoSuchProcess:
        pass
    else:
        self.fail("exception not raised")
    gone = psutil.NoSuchProcess
    if hasattr(proc, 'set_ionice'):
        self.assertRaises(gone, proc.set_ionice, 2)
    self.assertRaises(gone, proc.send_signal, signal.SIGTERM)
    self.assertRaises(gone, proc.set_nice, 0)
    self.assertFalse(proc.is_running())
    if hasattr(proc, "set_cpu_affinity"):
        self.assertRaises(gone, proc.set_cpu_affinity, [0])
michael@0 | 1792 | |
def test__str__(self):
    """str(Process) mentions the PID, and 'terminated' once it is dead."""
    child = get_test_subprocess()
    text = str(psutil.Process(child.pid))
    self.assertIn(str(child.pid), text)
    # python shows up as 'Python' in cmdline on OS X so test fails on OS X
    if not OSX:
        self.assertIn(os.path.basename(PYTHON), text)
    # Same check against a process that has been killed and reaped.
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.kill()
    proc.wait()
    self.assertIn(str(child.pid), str(proc))
    self.assertIn("terminated", str(proc))
michael@0 | 1806 | |
@unittest.skipIf(LINUX, 'PID 0 not available on Linux')
def test_pid_0(self):
    """Sanity-check the special process with PID 0."""
    # Process(0) is supposed to work on all platforms except Linux
    proc = psutil.Process(0)
    self.assertTrue(proc.name)

    if os.name == 'posix':
        try:
            self.assertEqual(proc.uids.real, 0)
            self.assertEqual(proc.gids.real, 0)
        except psutil.AccessDenied:
            pass

    self.assertIn(proc.ppid, (0, 1))
    proc.cmdline
    # These may legitimately be denied; only other errors count.
    for getter in (proc.get_num_threads, proc.get_memory_info):
        try:
            getter()
        except psutil.AccessDenied:
            pass

    # username property
    try:
        if POSIX:
            self.assertEqual(proc.username, 'root')
        elif WINDOWS:
            self.assertEqual(proc.username, 'NT AUTHORITY\\SYSTEM')
        else:
            proc.username
    except psutil.AccessDenied:
        pass

    self.assertIn(0, psutil.get_pid_list())
    self.assertTrue(psutil.pid_exists(0))
michael@0 | 1846 | |
def test__all__(self):
    """Every public psutil name is in __all__ or documented as deprecated.

    Names that can be imported as top-level modules are treated as
    modules psutil merely imported, and are ignored.
    """
    ignored = ('callable', 'defaultdict', 'error', 'namedtuple', 'test')
    for name in dir(psutil):
        if name in ignored or name.startswith('_'):
            continue
        try:
            __import__(name)
        except ImportError:
            if name not in psutil.__all__:
                fun = getattr(psutil, name)
                if fun is None:
                    continue
                # __doc__ may be None; treat that as "not deprecated"
                # so the test fails cleanly instead of raising
                # AttributeError on None.lower().
                doc = fun.__doc__ or ''
                if 'deprecated' not in doc.lower():
                    self.fail('%r not in psutil.__all__' % name)
michael@0 | 1862 | |
def test_Popen(self):
    """psutil.Popen exposes both Process and subprocess.Popen attributes."""
    # Popen class test
    # XXX this test causes a ResourceWarning on Python 3 because
    # psutil.__subproc instance doesn't get properly freed.
    # Not sure what to do though.
    argv = [PYTHON, "-c", "import time; time.sleep(2);"]
    child = psutil.Popen(argv, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    try:
        child.name
        child.stdin
        for attr in ('name', 'stdin'):
            self.assertTrue(hasattr(child, attr))
        self.assertRaises(AttributeError, getattr, child, 'foo')
    finally:
        child.kill()
        child.wait()
michael@0 | 1879 | |
michael@0 | 1880 | |
michael@0 | 1881 | # =================================================================== |
michael@0 | 1882 | # --- Fetch all processes test |
michael@0 | 1883 | # =================================================================== |
michael@0 | 1884 | |
michael@0 | 1885 | class TestFetchAllProcesses(unittest.TestCase): |
michael@0 | 1886 | # Iterates over all running processes and performs some sanity |
michael@0 | 1887 | # checks against Process API's returned values. |
michael@0 | 1888 | |
def setUp(self):
    """Cache the system's known UIDs and user names (POSIX only)."""
    if not POSIX:
        return
    import pwd
    entries = pwd.getpwall()
    self._uids = set(entry.pw_uid for entry in entries)
    self._usernames = set(entry.pw_name for entry in entries)
michael@0 | 1895 | |
def test_fetch_all(self):
    """Read every read-only Process attribute on every running process.

    For each public attribute name of psutil.Process (setters and the
    names in ``excluded_names`` are skipped) the value is fetched from
    every process and handed to the method of this class with the same
    name for validation. The first failure for an attribute is recorded
    and the next attribute is tried, so one run reports all of them.
    """
    valid_procs = 0
    excluded_names = ['send_signal', 'suspend', 'resume', 'terminate',
                      'kill', 'wait', 'as_dict', 'get_cpu_percent', 'nice',
                      'parent', 'get_children', 'pid']
    attrs = []
    for name in dir(psutil.Process):
        if name.startswith("_"):
            continue
        if name.startswith("set_"):
            continue
        if name in excluded_names:
            continue
        attrs.append(name)

    # Sentinel meaning "no value was obtained for this process".
    default = object()
    failures = []
    for name in attrs:
        for p in psutil.process_iter():
            ret = default
            try:
                try:
                    attr = getattr(p, name, None)
                    if attr is not None and callable(attr):
                        ret = attr()
                    else:
                        ret = attr
                    valid_procs += 1
                except NotImplementedError:
                    register_warning("%r was skipped because not "
                        "implemented" % (self.__class__.__name__ +
                                         '.test_' + name))
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    err = sys.exc_info()[1]
                    if isinstance(err, psutil.NoSuchProcess):
                        if psutil.pid_exists(p.pid):
                            # XXX race condition; we probably need
                            # to try figuring out the process
                            # identity before failing
                            self.fail("PID still exists but fun raised "
                                      "NoSuchProcess")
                    self.assertEqual(err.pid, p.pid)
                    if err.name:
                        # make sure exception's name attr is set
                        # with the actual process name
                        self.assertEqual(err.name, p.name)
                    self.assertTrue(str(err))
                    self.assertTrue(err.msg)
                else:
                    if ret not in (0, 0.0, [], None, ''):
                        assert ret, ret
                    meth = getattr(self, name)
                    meth(ret)
            except Exception:
                s = '\n' + '=' * 70 + '\n'
                s += "FAIL: test_%s (proc=%s" % (name, p)
                # Identity check: the sentinel must not be compared
                # through the returned object's own __ne__.
                if ret is not default:
                    # The closing parenthesis is appended once, below.
                    s += ", ret=%s" % repr(ret)
                s += ')\n'
                s += '-' * 70
                s += "\n%s" % traceback.format_exc()
                s = "\n".join((" " * 4) + i for i in s.splitlines())
                failures.append(s)
                # One recorded failure per attribute is enough.
                break

    if failures:
        self.fail(''.join(failures))

    # we should always have a non-empty list, not including PID 0 etc.
    # special cases.
    self.assertTrue(valid_procs > 0)
michael@0 | 1968 | |
def cmdline(self, ret):
    """Accept any command line value; nothing is validated here."""
    return None
michael@0 | 1971 | |
def exe(self, ret):
    """Validate an executable path: '' or an absolute path."""
    if not ret:
        # The only acceptable falsy value is the empty string.
        self.assertEqual(ret, '')
        return
    assert os.path.isabs(ret), ret
    # Note: os.stat() may return False even if the file is there
    # hence we skip the test, see:
    # http://stackoverflow.com/questions/3112546/os-path-exists-lies
    if POSIX:
        assert os.path.isfile(ret), ret
        can_check_exec = hasattr(os, 'access') and hasattr(os, "X_OK")
        if can_check_exec:
            # XXX may fail on OSX
            self.assertTrue(os.access(ret, os.X_OK))
michael@0 | 1985 | |
def ppid(self, ret):
    """Validate a parent PID: it must not be negative."""
    non_negative = ret >= 0
    self.assertTrue(non_negative)
michael@0 | 1988 | |
def name(self, ret):
    """Validate a process name: a non-empty str."""
    is_text = isinstance(ret, str)
    self.assertTrue(is_text)
    self.assertTrue(ret)
michael@0 | 1992 | |
def create_time(self, ret):
    """Validate a creation timestamp: positive and printable."""
    self.assertTrue(ret > 0)
    # A lower bound of psutil.BOOT_TIME can't be taken for granted on
    # all platforms, so it is not checked.
    # make sure returned value can be pretty printed with strftime
    as_local = time.localtime(ret)
    time.strftime("%Y %m %d %H:%M:%S", as_local)
michael@0 | 2000 | |
def uids(self, ret):
    """Check each id in ``ret`` is non-negative and listed in self._uids."""
    for user_id in ret:
        self.assertGreaterEqual(user_id, 0)
        self.assertIn(user_id, self._uids)
michael@0 | 2005 | |
def gids(self, ret):
    """Check that each id in ``ret`` is not negative."""
    # note: checking membership in a list of known gids (as done for
    # uids) seems not to be reliable for gid == 30 (nobody); not sure
    # why, hence only the sign is verified.
    for group_id in ret:
        self.assertGreaterEqual(group_id, 0)
michael@0 | 2012 | |
def username(self, ret):
    """Check ``ret`` is non-empty and, on POSIX, is in self._usernames."""
    self.assertTrue(ret)
    if os.name != 'posix':
        return
    self.assertIn(ret, self._usernames)
michael@0 | 2017 | |
def status(self, ret):
    """Check ``ret`` is non-negative and does not render as '?'."""
    self.assertGreaterEqual(ret, 0)
    self.assertNotEqual(str(ret), '?')
michael@0 | 2021 | |
def get_io_counters(self, ret):
    """Check every counter in ``ret`` is non-negative (-1 is tolerated)."""
    for counter in ret:
        if counter == -1:
            # -1 is explicitly exempt from the sign check
            continue
        self.assertGreaterEqual(counter, 0)
michael@0 | 2026 | |
def get_ionice(self, ret):
    """Check the I/O niceness value ``ret``.

    On Linux ``ret`` must expose non-negative ``ioclass`` and ``value``
    attributes; elsewhere it must be one of 0, 1 or 2.
    """
    if not LINUX:
        self.assertGreaterEqual(ret, 0)
        self.assertIn(ret, (0, 1, 2))
        return
    self.assertGreaterEqual(ret.ioclass, 0)
    self.assertGreaterEqual(ret.value, 0)
michael@0 | 2034 | |
def get_num_threads(self, ret):
    """Check that ``ret`` is at least 1."""
    self.assertGreaterEqual(ret, 1)
michael@0 | 2037 | |
def get_threads(self, ret):
    """Check id, user_time and system_time of each entry are >= 0."""
    for thread in ret:
        for attr in ('id', 'user_time', 'system_time'):
            self.assertTrue(getattr(thread, attr) >= 0)
michael@0 | 2043 | |
def get_cpu_times(self, ret):
    """Check that ``ret.user`` and ``ret.system`` are not negative."""
    for attr in ('user', 'system'):
        self.assertTrue(getattr(ret, attr) >= 0)
michael@0 | 2047 | |
def get_memory_info(self, ret):
    """Check that ``ret.rss`` and ``ret.vms`` are not negative."""
    for attr in ('rss', 'vms'):
        self.assertTrue(getattr(ret, attr) >= 0)
michael@0 | 2051 | |
def get_ext_memory_info(self, ret):
    """Check the extended memory namedtuple ``ret``.

    Every field must be non-negative.  On POSIX (when vms is non-zero)
    vms must be strictly greater than every other field; on Windows
    each "peak" field must be >= its current counterpart.
    """
    fields = ret._fields
    for field in fields:
        self.assertTrue(getattr(ret, field) >= 0)
    if POSIX and ret.vms != 0:
        # VMS is always supposed to be the highest
        for field in fields:
            if field == 'vms':
                continue
            assert ret.vms > getattr(ret, field), ret
    elif WINDOWS:
        peak_pairs = (
            ('peak_wset', 'wset'),
            ('peak_paged_pool', 'paged_pool'),
            ('peak_nonpaged_pool', 'nonpaged_pool'),
            ('peak_pagefile', 'pagefile'),
        )
        for peak, current in peak_pairs:
            assert getattr(ret, peak) >= getattr(ret, current), ret
michael@0 | 2066 | |
def get_open_files(self, ret):
    """Check each open-file entry in ``ret``.

    ``fd`` must be -1 on Windows and an int elsewhere; ``path`` must be
    an absolute path to an existing regular file.
    """
    for entry in ret:
        if not WINDOWS:
            self.assertIsInstance(entry.fd, int)
        else:
            assert entry.fd == -1, entry
        assert os.path.isabs(entry.path), entry
        assert os.path.isfile(entry.path), entry
michael@0 | 2075 | |
def get_num_fds(self, ret):
    """Check that ``ret`` is not negative."""
    self.assertGreaterEqual(ret, 0)
michael@0 | 2078 | |
def get_connections(self, ret):
    """Validate every connection entry in ``ret``.

    For each entry: ``type`` and ``family`` are checked against the
    expected socket constants, both addresses are handed to
    check_ip_address(), ``status`` must be one of the known state names
    and, for AF_INET entries, a throw-away socket is bound to the local
    IP.  Where socket.fromfd() exists (and not on Windows) the entry's
    fd is also duplicated and the duplicate's family/type compared.
    """
    # all values are supposed to match Linux's tcp_states.h states
    # table across all platforms.
    valid_conn_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1",
                         "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT",
                         "LAST_ACK", "LISTEN", "CLOSING", "NONE"]
    if SUNOS:
        valid_conn_states += ["IDLE", "BOUND"]
    if WINDOWS:
        valid_conn_states += ["DELETE_TCB"]
    for conn in ret:
        self.assertIn(conn.type, (socket.SOCK_STREAM, socket.SOCK_DGRAM))
        self.assertIn(conn.family, (socket.AF_INET, socket.AF_INET6))
        check_ip_address(conn.laddr, conn.family)
        check_ip_address(conn.raddr, conn.family)
        if conn.status not in valid_conn_states:
            self.fail("%s is not a valid status" % conn.status)
        # actually try to bind the local socket; ignore IPv6
        # sockets as their address might be represented as
        # an IPv4-mapped-address (e.g. "::127.0.0.1")
        # and that's rejected by bind()
        if conn.family == socket.AF_INET:
            s = socket.socket(conn.family, conn.type)
            try:
                s.bind((conn.laddr[0], 0))
            finally:
                # close even when bind() raises so the probe socket is
                # never leaked
                s.close()

        if not WINDOWS and hasattr(socket, 'fromfd'):
            dupsock = None
            try:
                try:
                    dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
                except (socket.error, OSError):
                    err = sys.exc_info()[1]
                    # a bad file descriptor is tolerated: skip this entry
                    if err.args[0] == errno.EBADF:
                        continue
                    raise
                # python >= 2.5
                if hasattr(dupsock, "family"):
                    self.assertEqual(dupsock.family, conn.family)
                    self.assertEqual(dupsock.type, conn.type)
            finally:
                if dupsock is not None:
                    dupsock.close()
michael@0 | 2122 | |
def getcwd(self, ret):
    """Check ``ret`` is None or an absolute path to a directory.

    A path which no longer exists (ENOENT) is tolerated.
    """
    if ret is None:  # BSD may return None
        return
    assert os.path.isabs(ret), ret
    try:
        mode = os.stat(ret).st_mode
    except OSError:
        err = sys.exc_info()[1]
        # directory has been removed in the meantime
        if err.errno == errno.ENOENT:
            return
        raise
    self.assertTrue(stat.S_ISDIR(mode))
michael@0 | 2135 | |
def get_memory_percent(self, ret):
    """Check that ``ret`` lies within the closed range [0, 100]."""
    assert ret >= 0 and ret <= 100, ret
michael@0 | 2138 | |
def is_running(self, ret):
    """Check that ``ret`` is truthy."""
    self.assertTrue(ret)
michael@0 | 2141 | |
def get_cpu_affinity(self, ret):
    """Check that ``ret`` is not an empty list."""
    assert ret != [], ret
michael@0 | 2144 | |
def terminal(self, ret):
    """Check ``ret`` is None or an absolute path that exists."""
    if ret is None:
        return
    assert os.path.isabs(ret), ret
    assert os.path.exists(ret), ret
michael@0 | 2149 | |
def get_memory_maps(self, ret):
    """Check every field of every memory-map namedtuple in ``ret``.

    ``path`` must be absolute unless it starts with '[';  ``addr`` and
    ``perms`` must be truthy; any other field must be a non-negative
    integer.
    """
    for region in ret:
        for field in region._fields:
            value = getattr(region, field)
            if field == 'path':
                if value.startswith('['):
                    continue
                # existence is deliberately not checked as on Linux we
                # might get '/foo/bar (deleted)'
                assert os.path.isabs(region.path), region.path
            elif field in ('addr', 'perms'):
                self.assertTrue(value)
            else:
                self.assertIsInstance(value, (int, long))
                assert value >= 0, value
michael@0 | 2164 | |
def get_num_handles(self, ret):
    """Check that ``ret`` is not negative.

    The same check applies on every platform, so no platform switch is
    needed here.
    """
    self.assertGreaterEqual(ret, 0)
michael@0 | 2170 | |
def get_nice(self, ret):
    """Check the niceness/priority value ``ret``.

    On POSIX it must lie in [-20, 20]; elsewhere it must equal one of
    the psutil module attributes whose name ends in '_PRIORITY_CLASS'.
    """
    if POSIX:
        assert -20 <= ret <= 20, ret
        return
    priority_names = [attr for attr in dir(psutil)
                      if attr.endswith('_PRIORITY_CLASS')]
    priorities = [getattr(psutil, attr) for attr in priority_names]
    self.assertIn(ret, priorities)
michael@0 | 2178 | |
def get_num_ctx_switches(self, ret):
    """Check ``ret.voluntary`` and ``ret.involuntary`` are not negative."""
    for attr in ('voluntary', 'involuntary'):
        self.assertTrue(getattr(ret, attr) >= 0)
michael@0 | 2182 | |
michael@0 | 2183 | |
michael@0 | 2184 | # =================================================================== |
michael@0 | 2185 | # --- Limited user tests |
michael@0 | 2186 | # =================================================================== |
michael@0 | 2187 | |
if hasattr(os, 'getuid') and os.getuid() == 0:
    class LimitedUserTestCase(TestProcess):
        """Repeat the previous tests by using a limited user.
        Executed only on UNIX and only if the user who runs the test
        script is root.
        """
        # the uid/gid the test suite runs under; restored in tearDown()
        PROCESS_UID = os.getuid()
        PROCESS_GID = os.getgid()

        def __init__(self, *args, **kwargs):
            TestProcess.__init__(self, *args, **kwargs)
            # re-define all existent test methods in order to
            # ignore AccessDenied exceptions
            for attr in [x for x in dir(self) if x.startswith('test')]:
                meth = getattr(self, attr)

                # 'meth' is bound as a default argument so that each
                # wrapper calls its own test method; a plain closure
                # would see only the value left by the last iteration.
                def test_(self, meth=meth):
                    try:
                        meth()
                    except psutil.AccessDenied:
                        pass
                setattr(self, attr, types.MethodType(test_, self))

        def setUp(self):
            # drop effective privileges (group first, while still root)
            os.setegid(1000)
            os.seteuid(1000)
            TestProcess.setUp(self)

        def tearDown(self):
            # restore the original effective ids: the uid goes to
            # seteuid() and the gid to setegid(); the user id is
            # restored first so the group change runs privileged.
            os.seteuid(self.PROCESS_UID)
            os.setegid(self.PROCESS_GID)
            TestProcess.tearDown(self)

        def test_nice(self):
            # a limited user must not be allowed to raise its priority
            try:
                psutil.Process(os.getpid()).set_nice(-1)
            except psutil.AccessDenied:
                pass
            else:
                self.fail("exception not raised")
michael@0 | 2228 | |
michael@0 | 2229 | |
michael@0 | 2230 | # =================================================================== |
michael@0 | 2231 | # --- Example script tests |
michael@0 | 2232 | # =================================================================== |
michael@0 | 2233 | |
michael@0 | 2234 | class TestExampleScripts(unittest.TestCase): |
michael@0 | 2235 | """Tests for scripts in the examples directory.""" |
michael@0 | 2236 | |
michael@0 | 2237 | def assert_stdout(self, exe, args=None): |
michael@0 | 2238 | exe = os.path.join(EXAMPLES_DIR, exe) |
michael@0 | 2239 | if args: |
michael@0 | 2240 | exe = exe + ' ' + args |
michael@0 | 2241 | try: |
michael@0 | 2242 | out = sh(sys.executable + ' ' + exe).strip() |
michael@0 | 2243 | except RuntimeError: |
michael@0 | 2244 | err = sys.exc_info()[1] |
michael@0 | 2245 | if 'AccessDenied' in str(err): |
michael@0 | 2246 | return str(err) |
michael@0 | 2247 | else: |
michael@0 | 2248 | raise |
michael@0 | 2249 | assert out, out |
michael@0 | 2250 | return out |
michael@0 | 2251 | |
michael@0 | 2252 | def assert_syntax(self, exe, args=None): |
michael@0 | 2253 | exe = os.path.join(EXAMPLES_DIR, exe) |
michael@0 | 2254 | f = open(exe, 'r') |
michael@0 | 2255 | try: |
michael@0 | 2256 | src = f.read() |
michael@0 | 2257 | finally: |
michael@0 | 2258 | f.close() |
michael@0 | 2259 | ast.parse(src) |
michael@0 | 2260 | |
michael@0 | 2261 | def test_check_presence(self): |
michael@0 | 2262 | # make sure all example scripts have a test method defined |
michael@0 | 2263 | meths = dir(self) |
michael@0 | 2264 | for name in os.listdir(EXAMPLES_DIR): |
michael@0 | 2265 | if name.endswith('.py'): |
michael@0 | 2266 | if 'test_' + os.path.splitext(name)[0] not in meths: |
michael@0 | 2267 | #self.assert_stdout(name) |
michael@0 | 2268 | self.fail('no test defined for %r script' \ |
michael@0 | 2269 | % os.path.join(EXAMPLES_DIR, name)) |
michael@0 | 2270 | |
michael@0 | 2271 | def test_disk_usage(self): |
michael@0 | 2272 | self.assert_stdout('disk_usage.py') |
michael@0 | 2273 | |
michael@0 | 2274 | def test_free(self): |
michael@0 | 2275 | self.assert_stdout('free.py') |
michael@0 | 2276 | |
michael@0 | 2277 | def test_meminfo(self): |
michael@0 | 2278 | self.assert_stdout('meminfo.py') |
michael@0 | 2279 | |
michael@0 | 2280 | def test_process_detail(self): |
michael@0 | 2281 | self.assert_stdout('process_detail.py') |
michael@0 | 2282 | |
michael@0 | 2283 | def test_who(self): |
michael@0 | 2284 | self.assert_stdout('who.py') |
michael@0 | 2285 | |
michael@0 | 2286 | def test_netstat(self): |
michael@0 | 2287 | self.assert_stdout('netstat.py') |
michael@0 | 2288 | |
michael@0 | 2289 | def test_pmap(self): |
michael@0 | 2290 | self.assert_stdout('pmap.py', args=str(os.getpid())) |
michael@0 | 2291 | |
michael@0 | 2292 | @unittest.skipIf(ast is None, |
michael@0 | 2293 | 'ast module not available on this python version') |
michael@0 | 2294 | def test_killall(self): |
michael@0 | 2295 | self.assert_syntax('killall.py') |
michael@0 | 2296 | |
michael@0 | 2297 | @unittest.skipIf(ast is None, |
michael@0 | 2298 | 'ast module not available on this python version') |
michael@0 | 2299 | def test_nettop(self): |
michael@0 | 2300 | self.assert_syntax('nettop.py') |
michael@0 | 2301 | |
michael@0 | 2302 | @unittest.skipIf(ast is None, |
michael@0 | 2303 | 'ast module not available on this python version') |
michael@0 | 2304 | def test_top(self): |
michael@0 | 2305 | self.assert_syntax('top.py') |
michael@0 | 2306 | |
michael@0 | 2307 | @unittest.skipIf(ast is None, |
michael@0 | 2308 | 'ast module not available on this python version') |
michael@0 | 2309 | def test_iotop(self): |
michael@0 | 2310 | self.assert_syntax('iotop.py') |
michael@0 | 2311 | |
michael@0 | 2312 | |
def cleanup():
    """Exit hook: reap children, close DEVNULL and remove TESTFN."""
    reap_children(search_all=True)
    DEVNULL.close()
    safe_remove(TESTFN)


# run cleanup() at interpreter exit ...
atexit.register(cleanup)
# ... and also remove TESTFN right now, at import time
safe_remove(TESTFN)
michael@0 | 2320 | |
def test_main():
    """Assemble every applicable test case, run them, report success.

    Returns True when the whole run was successful, False otherwise.
    """
    tests = [TestSystemAPIs, TestProcess, TestFetchAllProcesses]

    if POSIX:
        from _posix import PosixSpecificTestCase
        tests.append(PosixSpecificTestCase)

    # import the specific platform test suite; 'stc' stays None when the
    # current platform matches none of the branches below
    stc = None
    if LINUX:
        from _linux import LinuxSpecificTestCase as stc
    elif WINDOWS:
        from _windows import WindowsSpecificTestCase as stc
        from _windows import TestDualProcessImplementation
        tests.append(TestDualProcessImplementation)
    elif OSX:
        from _osx import OSXSpecificTestCase as stc
    elif BSD:
        from _bsd import BSDSpecificTestCase as stc
    elif SUNOS:
        from _sunos import SunOSSpecificTestCase as stc
    if stc is not None:
        tests.append(stc)
    else:
        register_warning("no platform specific test suite is available "
                         "for this platform")

    if hasattr(os, 'getuid'):
        # LimitedUserTestCase is only defined when running as root
        if 'LimitedUserTestCase' in globals():
            tests.append(LimitedUserTestCase)
        else:
            register_warning("LimitedUserTestCase was skipped (super-user "
                             "privileges are required)")

    tests.append(TestExampleScripts)

    # TestLoader.loadTestsFromTestCase() is the supported replacement of
    # the deprecated unittest.makeSuite()
    loader = unittest.TestLoader()
    test_suite = unittest.TestSuite()
    for test_class in tests:
        test_suite.addTest(loader.loadTestsFromTestCase(test_class))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
michael@0 | 2360 | |
if __name__ == '__main__':
    # exit with a non-zero status when at least one test did not pass
    all_passed = test_main()
    if not all_passed:
        sys.exit(1)