python/psutil/test/test_psutil.py

Thu, 22 Jan 2015 13:21:57 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Thu, 22 Jan 2015 13:21:57 +0100
branch
TOR_BUG_9701
changeset 15
b8a032363ba2
permissions
-rw-r--r--

Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6

michael@0 1 #!/usr/bin/env python
michael@0 2
michael@0 3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
michael@0 4 # Use of this source code is governed by a BSD-style license that can be
michael@0 5 # found in the LICENSE file.
michael@0 6
michael@0 7 """
michael@0 8 psutil test suite (you can quickly run it with "python setup.py test").
michael@0 9
michael@0 10 Note: this is targeted for both python 2.x and 3.x so there's no need
michael@0 11 to use 2to3 tool first.
michael@0 12
michael@0 13 If you're on Python < 2.7 it is recommended to install unittest2 module
michael@0 14 from: https://pypi.python.org/pypi/unittest2
michael@0 15 """
michael@0 16
michael@0 17 from __future__ import division
michael@0 18 import os
michael@0 19 import sys
michael@0 20 import subprocess
michael@0 21 import time
michael@0 22 import signal
michael@0 23 import types
michael@0 24 import traceback
michael@0 25 import socket
michael@0 26 import warnings
michael@0 27 import atexit
michael@0 28 import errno
michael@0 29 import threading
michael@0 30 import tempfile
michael@0 31 import stat
michael@0 32 import collections
michael@0 33 import datetime
michael@0 34 try:
michael@0 35 import unittest2 as unittest # pyhon < 2.7 + unittest2 installed
michael@0 36 except ImportError:
michael@0 37 import unittest
michael@0 38 try:
michael@0 39 import ast # python >= 2.6
michael@0 40 except ImportError:
michael@0 41 ast = None
michael@0 42
michael@0 43 import psutil
michael@0 44 from psutil._compat import PY3, callable, long, wraps
michael@0 45
michael@0 46
michael@0 47 # ===================================================================
michael@0 48 # --- Constants
michael@0 49 # ===================================================================
michael@0 50
# conf for retry_before_failing() decorator: number of attempts used when
# the decorator is not given an explicit 'ntimes'
NO_RETRIES = 10
# bytes tolerance for OS memory related tests
TOLERANCE = 500 * 1024  # 500KB

# absolute, symlink-resolved path of the interpreter running this suite;
# used to spawn the default test subprocess
PYTHON = os.path.realpath(sys.executable)
# null device opened for reading and writing; default stdin/stdout/stderr
# of the subprocesses started by get_test_subprocess()
DEVNULL = open(os.devnull, 'r+')
# scratch file in the current working directory; the default test
# subprocess creates it to signal that it has started
TESTFN = os.path.join(os.getcwd(), "$testfile")
# 'examples' directory located next to the directory containing this file
EXAMPLES_DIR = os.path.abspath(os.path.join(os.path.dirname(
               os.path.dirname(__file__)), 'examples'))
# platform flags used to select or skip platform-specific tests
POSIX = os.name == 'posix'
LINUX = sys.platform.startswith("linux")
WINDOWS = sys.platform.startswith("win32")
OSX = sys.platform.startswith("darwin")
BSD = sys.platform.startswith("freebsd")
SUNOS = sys.platform.startswith("sunos")
michael@0 67
michael@0 68
michael@0 69 # ===================================================================
michael@0 70 # --- Utility functions
michael@0 71 # ===================================================================
michael@0 72
# pids of every subprocess spawned through get_test_subprocess();
# consumed (and emptied) by reap_children()
_subprocesses_started = set()


def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL,
                        wait=False):
    """Spawn a subprocess for use in tests and return its Popen object.

    When 'cmd' is None the python interpreter is launched and sleeps for
    2 seconds. stdin, stdout and stderr default to the null device.
    If 'wait' is True attempt to make sure the process reached a
    reasonably initialized state before returning: for the default
    command by waiting (up to 3 seconds) for it to create TESTFN, for a
    custom command by waiting for its pid to show up in the process list.
    The pid is recorded so that reap_children() can clean it up.
    """
    use_default_cmd = cmd is None
    if use_default_cmd:
        snippet = ""
        if wait:
            # creating the file is how the child tells us it is running
            snippet += "open(r'%s', 'w'); " % TESTFN
        snippet += "import time; time.sleep(2);"
        argv = [PYTHON, "-c", snippet]
    else:
        argv = cmd
    sproc = subprocess.Popen(argv, stdout=stdout, stderr=stderr, stdin=stdin)
    if wait and use_default_cmd:
        deadline = time.time() + 3
        while deadline > time.time():
            if os.path.exists(TESTFN):
                break
            time.sleep(0.001)
        else:
            # deadline expired without ever seeing the file
            warn("couldn't make sure test file was actually created")
    elif wait:
        wait_for_pid(sproc.pid)
    _subprocesses_started.add(sproc.pid)
    return sproc
michael@0 105
def warn(msg):
    """Emit 'msg' as a UserWarning through the warnings module."""
    warnings.warn(msg, category=UserWarning)
michael@0 109
def register_warning(msg):
    """Schedule 'msg' to be emitted as a warning when the interpreter exits."""
    def _emit():
        warn(msg)
    atexit.register(_emit)
michael@0 113
def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
    """Run 'cmdline' through the shell and return its stripped stdout.

    Raises RuntimeError (carrying the captured stderr) if the command
    exits with a non-zero status. If the command succeeds but wrote to
    stderr, that output is reported as a warning.
    On Python 3 the output is decoded to str.
    """
    p = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr)
    out, err = p.communicate()
    if p.returncode != 0:
        raise RuntimeError(err)
    if err:
        warn(err)
    if PY3:
        # sys.stdout.encoding is None (or missing) when stdout has been
        # redirected or replaced; fall back to the interpreter default
        # rather than failing with TypeError.
        encoding = getattr(sys.stdout, 'encoding', None) \
            or sys.getdefaultencoding()
        out = str(out, encoding)
    return out.strip()
michael@0 127
def which(program):
    """Same as UNIX which command. Return None on command not found.

    If 'program' contains a directory component it is checked as is;
    otherwise every directory listed in the PATH environment variable is
    searched in order and the first executable match is returned.
    """
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, fname = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        # PATH may be unset; use the platform default search path instead
        # of raising KeyError so that "not found" still yields None.
        search_path = os.environ.get("PATH", os.defpath)
        for path in search_path.split(os.pathsep):
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file
    return None
michael@0 143
def wait_for_pid(pid, timeout=1):
    """Block until 'pid' appears in the process list, then return.

    Gives a freshly spawned sub process time to initialize.
    Raises RuntimeError("Timed out") if the pid did not show up within
    'timeout' seconds.
    """
    deadline = time.time() + timeout
    while pid not in psutil.get_pid_list():
        time.sleep(0.0001)
        if time.time() >= deadline:
            raise RuntimeError("Timed out")
    # give it one more iteration to allow full initialization
    time.sleep(0.01)
michael@0 157
def reap_children(search_all=False):
    """Kill every subprocess started by this test suite and wait for it,
    so that no zombies are left around to hog resources or disturb
    refleak detection.

    If 'search_all' is True all (recursive) children of the current
    process are killed as well, not only the recorded ones.
    """
    # same set object as the module-level registry: popping empties it
    pending = _subprocesses_started
    if search_all:
        me = psutil.Process(os.getpid())
        for descendant in me.get_children(recursive=True):
            pending.add(descendant.pid)
    while pending:
        pid = pending.pop()
        try:
            proc = psutil.Process(pid)
            proc.kill()
        except psutil.NoSuchProcess:
            # already gone, nothing to reap
            continue
        except psutil.AccessDenied:
            warn("couldn't kill child process with pid %s" % pid)
            continue
        proc.wait(timeout=3)
michael@0 179
def check_ip_address(addr, family):
    """Sanity check an (ip, port) address tuple via assertions.

    An empty / None address is accepted as is. The port must be an int
    in the 0-65535 range; for AF_INET the ip must consist of four
    dot-separated integers in the 0-255 range.
    """
    if addr:
        ip, port = addr
        assert isinstance(port, int), port
        if family == socket.AF_INET:
            ip = [int(part) for part in ip.split('.')]
            assert len(ip) == 4, ip
            for octet in ip:
                assert 0 <= octet <= 255, ip
        assert 0 <= port <= 65535, port
michael@0 192
def safe_remove(fname):
    """Delete 'fname', silently ignoring the case where it does not exist.

    Any other OSError is propagated to the caller.
    """
    try:
        os.remove(fname)
    except OSError:
        exc = sys.exc_info()[1]
        already_missing = exc.args[0] == errno.ENOENT
        if not already_missing:
            raise
michael@0 201
def call_until(fun, expr, timeout=1):
    """Keep calling 'fun' for at most 'timeout' seconds and return its
    result as soon as the eval()uated expression 'expr' is true.

    'expr' is a string of python code which can refer to the value
    returned by the last call as 'ret' (e.g. "len(ret) != 0").
    Raises RuntimeError if the expression never became true in time.
    """
    # NOTE: 'expr' is passed to eval(); only use trusted, hard-coded
    # expressions here.
    # Pre-bind 'ret' so that the timeout error below can be built even if
    # the loop body never runs (timeout <= 0).
    ret = None
    stop_at = time.time() + timeout
    while time.time() < stop_at:
        ret = fun()
        if eval(expr):
            return ret
        time.sleep(0.001)
    raise RuntimeError('timed out (ret=%r)' % ret)
michael@0 213
def retry_before_failing(ntimes=None):
    """Decorator which runs a test function and retries N times before
    actually failing.

    Only AssertionError triggers a retry; any other exception propagates
    immediately. 'ntimes' defaults to NO_RETRIES when not given.
    After the last failed attempt the last AssertionError is re-raised.
    """
    def decorator(fun):
        @wraps(fun)
        def wrapper(*args, **kwargs):
            for x in range(ntimes or NO_RETRIES):
                try:
                    return fun(*args, **kwargs)
                except AssertionError:
                    err = sys.exc_info()[1]
            # Re-raise the saved exception explicitly: a bare "raise" here
            # is outside of any except clause and on Python 3 would fail
            # with "RuntimeError: No active exception to reraise".
            raise err
        return wrapper
    return decorator
michael@0 229
def skip_on_access_denied(only_if=None):
    """Decorator to skip a test method when it raises psutil.AccessDenied.

    If 'only_if' is given and evaluates to false the exception is
    re-raised instead of being turned into a skip.
    The decorated callable is expected to be a TestCase method (its first
    positional argument is used as the test case instance).
    """
    def decorator(fun):
        @wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            except psutil.AccessDenied:
                if only_if is not None:
                    if not only_if:
                        raise
                msg = "%r was skipped because it raised AccessDenied" \
                      % fun.__name__
                self = args[0]
                if hasattr(self, 'skipTest'):
                    # stock unittest (python >= 2.7) and unittest2 name
                    # this method skipTest(), not skip()
                    self.skipTest(msg)
                elif hasattr(self, 'skip'):
                    # test cases exposing a skip() method instead
                    self.skip(msg)
                else:
                    register_warning(msg)
        return wrapper
    return decorator
michael@0 250
def skip_on_not_implemented(only_if=None):
    """Decorator to skip a test method when it raises NotImplementedError.

    If 'only_if' is given and evaluates to false the exception is
    re-raised instead of being turned into a skip.
    The decorated callable is expected to be a TestCase method (its first
    positional argument is used as the test case instance).
    """
    def decorator(fun):
        @wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            except NotImplementedError:
                if only_if is not None:
                    if not only_if:
                        raise
                msg = "%r was skipped because it raised NotImplementedError" \
                      % fun.__name__
                self = args[0]
                if hasattr(self, 'skipTest'):
                    # stock unittest (python >= 2.7) and unittest2 name
                    # this method skipTest(), not skip()
                    self.skipTest(msg)
                elif hasattr(self, 'skip'):
                    # test cases exposing a skip() method instead
                    self.skip(msg)
                else:
                    register_warning(msg)
        return wrapper
    return decorator
michael@0 271
def supports_ipv6():
    """Return True if an IPv6 TCP socket can be created and bound to the
    loopback address ("::1") on this platform, False otherwise.
    """
    if not (socket.has_ipv6 and hasattr(socket, "AF_INET6")):
        return False
    probe = None
    try:
        try:
            probe = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            probe.bind(("::1", 0))
        except (socket.error, socket.gaierror):
            supported = False
        else:
            supported = True
    finally:
        # always release the probe socket, if it was created at all
        if probe is not None:
            probe.close()
    return supported
michael@0 288
michael@0 289
class ThreadTask(threading.Thread):
    """A thread which idles until explicitly stopped; used for running
    process thread tests.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        # set by run() once the thread body has actually started
        self._flag = threading.Event()
        self._interval = None
        self._running = False

    def __repr__(self):
        return '<%s running=%s at %#x>' % (
            self.__class__.__name__, self._running, id(self))

    def start(self, interval=0.001):
        """Start the thread and return once it is running.

        The thread keeps running until an explicit stop() request and
        polls for shutdown every 'interval' seconds.
        Raises ValueError if the thread is already running.
        """
        if self._running:
            raise ValueError("already started")
        self._interval = interval
        threading.Thread.start(self)
        # block until run() signals that it has started
        self._flag.wait()

    def run(self):
        """Thread body: signal startup, then sleep in a loop until stopped."""
        self._running = True
        self._flag.set()
        while self._running:
            time.sleep(self._interval)

    def stop(self):
        """Stop thread execution and wait until it is stopped.

        Raises ValueError if the thread is not running.
        """
        if not self._running:
            raise ValueError("already stopped")
        self._running = False
        self.join()
michael@0 325
michael@0 326
michael@0 327 # ===================================================================
michael@0 328 # --- Support for python < 2.7 in case unittest2 is not installed
michael@0 329 # ===================================================================
michael@0 330
if not hasattr(unittest, 'skip'):
    # The unittest module in use has no skip support (python < 2.7 without
    # unittest2): patch minimal replacements for the missing TestCase
    # assertion methods and for the skipIf / skipUnless decorators
    # directly into the unittest module.
    register_warning("unittest2 module is not installed; a serie of pretty "
                     "darn ugly workarounds will be used")

    class SkipTest(Exception):
        """Raised by TestCase.skip() below."""
        pass

    class TestCase(unittest.TestCase):
        """unittest.TestCase extended with the assertion methods used by
        this test suite which are missing from old unittest versions.
        """

        def _safe_repr(self, obj):
            """Return repr(obj), truncated to keep failure messages short."""
            MAX_LENGTH = 80
            try:
                result = repr(obj)
            except Exception:
                # a broken __repr__ must not hide the actual failure
                result = object.__repr__(obj)
            if len(result) < MAX_LENGTH:
                return result
            return result[:MAX_LENGTH] + ' [truncated]...'

        def _fail_w_msg(self, a, b, middle, msg):
            """Fail with 'msg' or, if not given, with "<a> <middle> <b>"."""
            self.fail(msg or '%s %s %s' % (self._safe_repr(a), middle,
                                           self._safe_repr(b)))

        def skip(self, msg):
            raise SkipTest(msg)

        def assertIn(self, a, b, msg=None):
            if a not in b:
                self._fail_w_msg(a, b, 'not found in', msg)

        def assertNotIn(self, a, b, msg=None):
            if a in b:
                self._fail_w_msg(a, b, 'found in', msg)

        def assertGreater(self, a, b, msg=None):
            if not a > b:
                self._fail_w_msg(a, b, 'not greater than', msg)

        def assertGreaterEqual(self, a, b, msg=None):
            if not a >= b:
                self._fail_w_msg(a, b, 'not greater than or equal to', msg)

        def assertLess(self, a, b, msg=None):
            if not a < b:
                self._fail_w_msg(a, b, 'not less than', msg)

        def assertLessEqual(self, a, b, msg=None):
            if not a <= b:
                self._fail_w_msg(a, b, 'not less or equal to', msg)

        def assertIsInstance(self, a, b, msg=None):
            if not isinstance(a, b):
                self.fail(msg or '%s is not an instance of %r'
                          % (self._safe_repr(a), b))

        def assertAlmostEqual(self, a, b, msg=None, delta=None):
            # NOTE: without 'delta' this is a plain equality check.
            if delta is not None:
                if abs(a - b) <= delta:
                    return
                self.fail(msg or '%s != %s within %s delta'
                          % (self._safe_repr(a), self._safe_repr(b),
                             self._safe_repr(delta)))
            else:
                self.assertEqual(a, b, msg=msg)

    def skipIf(condition, reason):
        """Decorator: if 'condition' is true do not run the test method;
        print "skipped-" and register a warning to be shown at exit.
        """
        def decorator(fun):
            @wraps(fun)
            def wrapper(*args, **kwargs):
                self = args[0]
                if condition:
                    sys.stdout.write("skipped-")
                    sys.stdout.flush()
                    objname = "%s.%s" % (self.__class__.__name__,
                                         fun.__name__)
                    msg = "%s was skipped" % objname
                    if reason:
                        msg += "; reason: " + repr(reason)
                    register_warning(msg)
                    return
                else:
                    return fun(*args, **kwargs)
            return wrapper
        return decorator

    def skipUnless(condition, reason):
        """Decorator: skip the test method unless 'condition' is true."""
        if not condition:
            return unittest.skipIf(True, reason)
        return unittest.skipIf(False, reason)

    # install the replacements, then drop the temporary module-level names
    unittest.TestCase = TestCase
    unittest.skipIf = skipIf
    unittest.skipUnless = skipUnless
    del TestCase, skipIf, skipUnless
michael@0 427
michael@0 428
michael@0 429 # ===================================================================
michael@0 430 # --- System-related API tests
michael@0 431 # ===================================================================
michael@0 432
michael@0 433 class TestSystemAPIs(unittest.TestCase):
michael@0 434 """Tests for system-related APIs."""
michael@0 435
michael@0 436 def setUp(self):
michael@0 437 safe_remove(TESTFN)
michael@0 438
michael@0 439 def tearDown(self):
michael@0 440 reap_children()
michael@0 441
michael@0 442 def test_process_iter(self):
michael@0 443 self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()])
michael@0 444 sproc = get_test_subprocess()
michael@0 445 self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()])
michael@0 446 p = psutil.Process(sproc.pid)
michael@0 447 p.kill()
michael@0 448 p.wait()
michael@0 449 self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()])
michael@0 450
michael@0 451 def test_TOTAL_PHYMEM(self):
michael@0 452 x = psutil.TOTAL_PHYMEM
michael@0 453 self.assertIsInstance(x, (int, long))
michael@0 454 self.assertGreater(x, 0)
michael@0 455 self.assertEqual(x, psutil.virtual_memory().total)
michael@0 456
michael@0 457 def test_BOOT_TIME(self, arg=None):
michael@0 458 x = arg or psutil.BOOT_TIME
michael@0 459 self.assertIsInstance(x, float)
michael@0 460 self.assertGreater(x, 0)
michael@0 461 self.assertLess(x, time.time())
michael@0 462
michael@0 463 def test_get_boot_time(self):
michael@0 464 self.test_BOOT_TIME(psutil.get_boot_time())
michael@0 465 if WINDOWS:
michael@0 466 # work around float precision issues; give it 1 secs tolerance
michael@0 467 diff = abs(psutil.get_boot_time() - psutil.BOOT_TIME)
michael@0 468 self.assertLess(diff, 1)
michael@0 469 else:
michael@0 470 self.assertEqual(psutil.get_boot_time(), psutil.BOOT_TIME)
michael@0 471
michael@0 472 def test_NUM_CPUS(self):
michael@0 473 self.assertEqual(psutil.NUM_CPUS, len(psutil.cpu_times(percpu=True)))
michael@0 474 self.assertGreaterEqual(psutil.NUM_CPUS, 1)
michael@0 475
michael@0 476 @unittest.skipUnless(POSIX, 'posix only')
michael@0 477 def test_PAGESIZE(self):
michael@0 478 # pagesize is used internally to perform different calculations
michael@0 479 # and it's determined by using SC_PAGE_SIZE; make sure
michael@0 480 # getpagesize() returns the same value.
michael@0 481 import resource
michael@0 482 self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize())
michael@0 483
michael@0 484 def test_deprecated_apis(self):
michael@0 485 s = socket.socket()
michael@0 486 s.bind(('localhost', 0))
michael@0 487 s.listen(1)
michael@0 488 warnings.filterwarnings("error")
michael@0 489 p = psutil.Process(os.getpid())
michael@0 490 try:
michael@0 491 self.assertRaises(DeprecationWarning, psutil.virtmem_usage)
michael@0 492 self.assertRaises(DeprecationWarning, psutil.used_phymem)
michael@0 493 self.assertRaises(DeprecationWarning, psutil.avail_phymem)
michael@0 494 self.assertRaises(DeprecationWarning, psutil.total_virtmem)
michael@0 495 self.assertRaises(DeprecationWarning, psutil.used_virtmem)
michael@0 496 self.assertRaises(DeprecationWarning, psutil.avail_virtmem)
michael@0 497 self.assertRaises(DeprecationWarning, psutil.phymem_usage)
michael@0 498 self.assertRaises(DeprecationWarning, psutil.get_process_list)
michael@0 499 self.assertRaises(DeprecationWarning, psutil.network_io_counters)
michael@0 500 if LINUX:
michael@0 501 self.assertRaises(DeprecationWarning, psutil.phymem_buffers)
michael@0 502 self.assertRaises(DeprecationWarning, psutil.cached_phymem)
michael@0 503 try:
michael@0 504 p.nice
michael@0 505 except DeprecationWarning:
michael@0 506 pass
michael@0 507 else:
michael@0 508 self.fail("p.nice didn't raise DeprecationWarning")
michael@0 509 ret = call_until(p.get_connections, "len(ret) != 0", timeout=1)
michael@0 510 self.assertRaises(DeprecationWarning,
michael@0 511 getattr, ret[0], 'local_address')
michael@0 512 self.assertRaises(DeprecationWarning,
michael@0 513 getattr, ret[0], 'remote_address')
michael@0 514 finally:
michael@0 515 s.close()
michael@0 516 warnings.resetwarnings()
michael@0 517
michael@0 518 def test_deprecated_apis_retval(self):
michael@0 519 warnings.filterwarnings("ignore")
michael@0 520 p = psutil.Process(os.getpid())
michael@0 521 try:
michael@0 522 self.assertEqual(psutil.total_virtmem(), psutil.swap_memory().total)
michael@0 523 self.assertEqual(p.nice, p.get_nice())
michael@0 524 finally:
michael@0 525 warnings.resetwarnings()
michael@0 526
michael@0 527 def test_virtual_memory(self):
michael@0 528 mem = psutil.virtual_memory()
michael@0 529 assert mem.total > 0, mem
michael@0 530 assert mem.available > 0, mem
michael@0 531 assert 0 <= mem.percent <= 100, mem
michael@0 532 assert mem.used > 0, mem
michael@0 533 assert mem.free >= 0, mem
michael@0 534 for name in mem._fields:
michael@0 535 if name != 'total':
michael@0 536 value = getattr(mem, name)
michael@0 537 if not value >= 0:
michael@0 538 self.fail("%r < 0 (%s)" % (name, value))
michael@0 539 if value > mem.total:
michael@0 540 self.fail("%r > total (total=%s, %s=%s)" \
michael@0 541 % (name, mem.total, name, value))
michael@0 542
michael@0 543 def test_swap_memory(self):
michael@0 544 mem = psutil.swap_memory()
michael@0 545 assert mem.total >= 0, mem
michael@0 546 assert mem.used >= 0, mem
michael@0 547 assert mem.free > 0, mem
michael@0 548 assert 0 <= mem.percent <= 100, mem
michael@0 549 assert mem.sin >= 0, mem
michael@0 550 assert mem.sout >= 0, mem
michael@0 551
michael@0 552 def test_pid_exists(self):
michael@0 553 sproc = get_test_subprocess(wait=True)
michael@0 554 assert psutil.pid_exists(sproc.pid)
michael@0 555 p = psutil.Process(sproc.pid)
michael@0 556 p.kill()
michael@0 557 p.wait()
michael@0 558 self.assertFalse(psutil.pid_exists(sproc.pid))
michael@0 559 self.assertFalse(psutil.pid_exists(-1))
michael@0 560
michael@0 561 def test_pid_exists_2(self):
michael@0 562 reap_children()
michael@0 563 pids = psutil.get_pid_list()
michael@0 564 for pid in pids:
michael@0 565 try:
michael@0 566 assert psutil.pid_exists(pid)
michael@0 567 except AssertionError:
michael@0 568 # in case the process disappeared in meantime fail only
michael@0 569 # if it is no longer in get_pid_list()
michael@0 570 time.sleep(.1)
michael@0 571 if pid in psutil.get_pid_list():
michael@0 572 self.fail(pid)
michael@0 573 pids = range(max(pids) + 5000, max(pids) + 6000)
michael@0 574 for pid in pids:
michael@0 575 self.assertFalse(psutil.pid_exists(pid))
michael@0 576
michael@0 577 def test_get_pid_list(self):
michael@0 578 plist = [x.pid for x in psutil.process_iter()]
michael@0 579 pidlist = psutil.get_pid_list()
michael@0 580 self.assertEqual(plist.sort(), pidlist.sort())
michael@0 581 # make sure every pid is unique
michael@0 582 self.assertEqual(len(pidlist), len(set(pidlist)))
michael@0 583
michael@0 584 def test_test(self):
michael@0 585 # test for psutil.test() function
michael@0 586 stdout = sys.stdout
michael@0 587 sys.stdout = DEVNULL
michael@0 588 try:
michael@0 589 psutil.test()
michael@0 590 finally:
michael@0 591 sys.stdout = stdout
michael@0 592
michael@0 593 def test_sys_cpu_times(self):
michael@0 594 total = 0
michael@0 595 times = psutil.cpu_times()
michael@0 596 sum(times)
michael@0 597 for cp_time in times:
michael@0 598 self.assertIsInstance(cp_time, float)
michael@0 599 self.assertGreaterEqual(cp_time, 0.0)
michael@0 600 total += cp_time
michael@0 601 self.assertEqual(total, sum(times))
michael@0 602 str(times)
michael@0 603
michael@0 604 def test_sys_cpu_times2(self):
michael@0 605 t1 = sum(psutil.cpu_times())
michael@0 606 time.sleep(0.1)
michael@0 607 t2 = sum(psutil.cpu_times())
michael@0 608 difference = t2 - t1
michael@0 609 if not difference >= 0.05:
michael@0 610 self.fail("difference %s" % difference)
michael@0 611
michael@0 612 def test_sys_per_cpu_times(self):
michael@0 613 for times in psutil.cpu_times(percpu=True):
michael@0 614 total = 0
michael@0 615 sum(times)
michael@0 616 for cp_time in times:
michael@0 617 self.assertIsInstance(cp_time, float)
michael@0 618 self.assertGreaterEqual(cp_time, 0.0)
michael@0 619 total += cp_time
michael@0 620 self.assertEqual(total, sum(times))
michael@0 621 str(times)
michael@0 622 self.assertEqual(len(psutil.cpu_times(percpu=True)[0]),
michael@0 623 len(psutil.cpu_times(percpu=False)))
michael@0 624
michael@0 625 def test_sys_per_cpu_times2(self):
michael@0 626 tot1 = psutil.cpu_times(percpu=True)
michael@0 627 stop_at = time.time() + 0.1
michael@0 628 while 1:
michael@0 629 if time.time() >= stop_at:
michael@0 630 break
michael@0 631 tot2 = psutil.cpu_times(percpu=True)
michael@0 632 for t1, t2 in zip(tot1, tot2):
michael@0 633 t1, t2 = sum(t1), sum(t2)
michael@0 634 difference = t2 - t1
michael@0 635 if difference >= 0.05:
michael@0 636 return
michael@0 637 self.fail()
michael@0 638
michael@0 639 def _test_cpu_percent(self, percent):
michael@0 640 self.assertIsInstance(percent, float)
michael@0 641 self.assertGreaterEqual(percent, 0.0)
michael@0 642 self.assertLessEqual(percent, 100.0)
michael@0 643
michael@0 644 def test_sys_cpu_percent(self):
michael@0 645 psutil.cpu_percent(interval=0.001)
michael@0 646 for x in range(1000):
michael@0 647 self._test_cpu_percent(psutil.cpu_percent(interval=None))
michael@0 648
michael@0 649 def test_sys_per_cpu_percent(self):
michael@0 650 self.assertEqual(len(psutil.cpu_percent(interval=0.001, percpu=True)),
michael@0 651 psutil.NUM_CPUS)
michael@0 652 for x in range(1000):
michael@0 653 percents = psutil.cpu_percent(interval=None, percpu=True)
michael@0 654 for percent in percents:
michael@0 655 self._test_cpu_percent(percent)
michael@0 656
michael@0 657 def test_sys_cpu_times_percent(self):
michael@0 658 psutil.cpu_times_percent(interval=0.001)
michael@0 659 for x in range(1000):
michael@0 660 cpu = psutil.cpu_times_percent(interval=None)
michael@0 661 for percent in cpu:
michael@0 662 self._test_cpu_percent(percent)
michael@0 663 self._test_cpu_percent(sum(cpu))
michael@0 664
michael@0 665 def test_sys_per_cpu_times_percent(self):
michael@0 666 self.assertEqual(len(psutil.cpu_times_percent(interval=0.001,
michael@0 667 percpu=True)),
michael@0 668 psutil.NUM_CPUS)
michael@0 669 for x in range(1000):
michael@0 670 cpus = psutil.cpu_times_percent(interval=None, percpu=True)
michael@0 671 for cpu in cpus:
michael@0 672 for percent in cpu:
michael@0 673 self._test_cpu_percent(percent)
michael@0 674 self._test_cpu_percent(sum(cpu))
michael@0 675
michael@0 676 @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
michael@0 677 "os.statvfs() function not available on this platform")
michael@0 678 def test_disk_usage(self):
michael@0 679 usage = psutil.disk_usage(os.getcwd())
michael@0 680 assert usage.total > 0, usage
michael@0 681 assert usage.used > 0, usage
michael@0 682 assert usage.free > 0, usage
michael@0 683 assert usage.total > usage.used, usage
michael@0 684 assert usage.total > usage.free, usage
michael@0 685 assert 0 <= usage.percent <= 100, usage.percent
michael@0 686
michael@0 687 # if path does not exist OSError ENOENT is expected across
michael@0 688 # all platforms
michael@0 689 fname = tempfile.mktemp()
michael@0 690 try:
michael@0 691 psutil.disk_usage(fname)
michael@0 692 except OSError:
michael@0 693 err = sys.exc_info()[1]
michael@0 694 if err.args[0] != errno.ENOENT:
michael@0 695 raise
michael@0 696 else:
michael@0 697 self.fail("OSError not raised")
michael@0 698
michael@0 699 @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
michael@0 700 "os.statvfs() function not available on this platform")
michael@0 701 def test_disk_partitions(self):
michael@0 702 # all = False
michael@0 703 for disk in psutil.disk_partitions(all=False):
michael@0 704 if WINDOWS and 'cdrom' in disk.opts:
michael@0 705 continue
michael@0 706 if not POSIX:
michael@0 707 assert os.path.exists(disk.device), disk
michael@0 708 else:
michael@0 709 # we cannot make any assumption about this, see:
michael@0 710 # http://goo.gl/p9c43
michael@0 711 disk.device
michael@0 712 if SUNOS:
michael@0 713 # on solaris apparently mount points can also be files
michael@0 714 assert os.path.exists(disk.mountpoint), disk
michael@0 715 else:
michael@0 716 assert os.path.isdir(disk.mountpoint), disk
michael@0 717 assert disk.fstype, disk
michael@0 718 self.assertIsInstance(disk.opts, str)
michael@0 719
michael@0 720 # all = True
michael@0 721 for disk in psutil.disk_partitions(all=True):
michael@0 722 if not WINDOWS:
michael@0 723 try:
michael@0 724 os.stat(disk.mountpoint)
michael@0 725 except OSError:
michael@0 726 # http://mail.python.org/pipermail/python-dev/2012-June/120787.html
michael@0 727 err = sys.exc_info()[1]
michael@0 728 if err.errno not in (errno.EPERM, errno.EACCES):
michael@0 729 raise
michael@0 730 else:
michael@0 731 if SUNOS:
michael@0 732 # on solaris apparently mount points can also be files
michael@0 733 assert os.path.exists(disk.mountpoint), disk
michael@0 734 else:
michael@0 735 assert os.path.isdir(disk.mountpoint), disk
michael@0 736 self.assertIsInstance(disk.fstype, str)
michael@0 737 self.assertIsInstance(disk.opts, str)
michael@0 738
michael@0 739 def find_mount_point(path):
michael@0 740 path = os.path.abspath(path)
michael@0 741 while not os.path.ismount(path):
michael@0 742 path = os.path.dirname(path)
michael@0 743 return path
michael@0 744
michael@0 745 mount = find_mount_point(__file__)
michael@0 746 mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)]
michael@0 747 self.assertIn(mount, mounts)
michael@0 748 psutil.disk_usage(mount)
michael@0 749
michael@0 750 def test_net_io_counters(self):
michael@0 751 def check_ntuple(nt):
michael@0 752 self.assertEqual(nt[0], nt.bytes_sent)
michael@0 753 self.assertEqual(nt[1], nt.bytes_recv)
michael@0 754 self.assertEqual(nt[2], nt.packets_sent)
michael@0 755 self.assertEqual(nt[3], nt.packets_recv)
michael@0 756 self.assertEqual(nt[4], nt.errin)
michael@0 757 self.assertEqual(nt[5], nt.errout)
michael@0 758 self.assertEqual(nt[6], nt.dropin)
michael@0 759 self.assertEqual(nt[7], nt.dropout)
michael@0 760 assert nt.bytes_sent >= 0, nt
michael@0 761 assert nt.bytes_recv >= 0, nt
michael@0 762 assert nt.packets_sent >= 0, nt
michael@0 763 assert nt.packets_recv >= 0, nt
michael@0 764 assert nt.errin >= 0, nt
michael@0 765 assert nt.errout >= 0, nt
michael@0 766 assert nt.dropin >= 0, nt
michael@0 767 assert nt.dropout >= 0, nt
michael@0 768
michael@0 769 ret = psutil.net_io_counters(pernic=False)
michael@0 770 check_ntuple(ret)
michael@0 771 ret = psutil.net_io_counters(pernic=True)
michael@0 772 assert ret != []
michael@0 773 for key in ret:
michael@0 774 assert key
michael@0 775 check_ntuple(ret[key])
michael@0 776
def test_disk_io_counters(self):
    """Check system-wide and per-disk I/O counters."""
    def check_ntuple(nt):
        # each pair is (positional index, value read through the
        # attribute name)
        pairs = (
            (0, nt.read_count),
            (1, nt.write_count),
            (2, nt.read_bytes),
            (3, nt.write_bytes),
            (4, nt.read_time),
            (5, nt.write_time),
        )
        for index, value in pairs:
            self.assertEqual(nt[index], value)
        for index, value in pairs:
            assert value >= 0, nt

    totals = psutil.disk_io_counters(perdisk=False)
    check_ntuple(totals)
    per_disk = psutil.disk_io_counters(perdisk=True)
    # make sure there are no duplicates
    self.assertEqual(len(per_disk), len(set(per_disk)))
    for name in per_disk:
        assert name, name
        check_ntuple(per_disk[name])
        if not (LINUX and name[-1].isdigit()):
            continue
        # if 'sda1' is listed 'sda' shouldn't, see:
        # http://code.google.com/p/psutil/issues/detail?id=338
        base = name
        while base[-1].isdigit():
            base = base[:-1]
        self.assertNotIn(base, per_disk.keys())
michael@0 806
def test_get_users(self):
    """Check the entries returned by psutil.get_users()."""
    logged_in = psutil.get_users()
    assert logged_in
    for entry in logged_in:
        assert entry.name, entry
        # these two are only checked for being accessible
        entry.terminal
        entry.host
        assert entry.started > 0.0, entry
        # the start time must be convertible to a datetime
        datetime.datetime.fromtimestamp(entry.started)
michael@0 816
michael@0 817
michael@0 818 # ===================================================================
michael@0 819 # --- psutil.Process class tests
michael@0 820 # ===================================================================
michael@0 821
michael@0 822 class TestProcess(unittest.TestCase):
michael@0 823 """Tests for psutil.Process class."""
michael@0 824
def setUp(self):
    """Remove the TESTFN scratch file before each test runs."""
    safe_remove(TESTFN)
michael@0 827
def tearDown(self):
    """Call reap_children() after each test."""
    reap_children()
michael@0 830
def test_kill(self):
    """kill() a spawned subprocess, wait for it and check it is gone."""
    child = get_test_subprocess(wait=True)
    pid = child.pid
    proc = psutil.Process(pid)
    proc_name = proc.name
    proc.kill()
    proc.wait()
    # NOTE(review): proc_name is compared against PYTHON, which other
    # tests here treat as the interpreter path - confirm this can match.
    still_there = psutil.pid_exists(pid) and proc_name == PYTHON
    self.assertFalse(still_there)
michael@0 839
def test_terminate(self):
    """terminate() a spawned subprocess, wait for it and check it is gone."""
    child = get_test_subprocess(wait=True)
    pid = child.pid
    proc = psutil.Process(pid)
    proc_name = proc.name
    proc.terminate()
    proc.wait()
    # NOTE(review): proc_name is compared against PYTHON, which other
    # tests here treat as the interpreter path - confirm this can match.
    still_there = psutil.pid_exists(pid) and proc_name == PYTHON
    self.assertFalse(still_there)
michael@0 848
def test_send_signal(self):
    """Deliver a signal through send_signal() and check the process is gone."""
    # SIGKILL on POSIX, SIGTERM everywhere else
    signum = signal.SIGTERM
    if POSIX:
        signum = signal.SIGKILL
    child = get_test_subprocess()
    pid = child.pid
    proc = psutil.Process(pid)
    proc_name = proc.name
    proc.send_signal(signum)
    proc.wait()
    still_there = psutil.pid_exists(pid) and proc_name == PYTHON
    self.assertFalse(still_there)
michael@0 861
def test_wait(self):
    """Exercise Process.wait(): exit codes, repeated calls and timeouts."""
    on_posix = os.name == 'posix'

    # killed process: on POSIX wait() must return the signal number,
    # elsewhere 0
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.kill()
    status = proc.wait()
    if on_posix:
        self.assertEqual(status, signal.SIGKILL)
    else:
        self.assertEqual(status, 0)
    self.assertFalse(proc.is_running())

    # same check for a terminated process
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.terminate()
    status = proc.wait()
    if on_posix:
        self.assertEqual(status, signal.SIGTERM)
    else:
        self.assertEqual(status, 0)
    self.assertFalse(proc.is_running())

    # check sys.exit() code
    script = "import time, sys; time.sleep(0.01); sys.exit(5);"
    child = get_test_subprocess([PYTHON, "-c", script])
    proc = psutil.Process(child.pid)
    self.assertEqual(proc.wait(), 5)
    self.assertFalse(proc.is_running())

    # Test wait() issued twice.
    # It is not supposed to raise NSP when the process is gone.
    # On UNIX this should return None, on Windows it should keep
    # returning the exit code.
    child = get_test_subprocess([PYTHON, "-c", script])
    proc = psutil.Process(child.pid)
    self.assertEqual(proc.wait(), 5)
    self.assertIn(proc.wait(), (5, None))

    # a short timeout on a still running process must expire
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.name
    self.assertRaises(psutil.TimeoutExpired, proc.wait, 0.01)

    # timeout < 0 not allowed
    self.assertRaises(ValueError, proc.wait, -1)
michael@0 908
@unittest.skipUnless(POSIX, '')  # XXX why is this skipped on Windows?
def test_wait_non_children(self):
    """wait() against a process which is not a child of this one."""
    # the child spawns a sleeping grandchild and prints its pid
    script = "".join([
        "import sys;",
        "from subprocess import Popen, PIPE;",
        "cmd = ['%s', '-c', 'import time; time.sleep(2)'];" % PYTHON,
        "sp = Popen(cmd, stdout=PIPE);",
        "sys.stdout.write(str(sp.pid));",
    ])
    child = get_test_subprocess([PYTHON, "-c", script],
                                stdout=subprocess.PIPE)

    grandson = psutil.Process(int(child.stdout.read()))
    try:
        self.assertRaises(psutil.TimeoutExpired, grandson.wait, 0.01)
        grandson.kill()
        result = grandson.wait()
        self.assertEqual(result, None)
    finally:
        # do not leave the grandchild behind if a check above failed
        if grandson.is_running():
            grandson.kill()
            grandson.wait()
michael@0 930
def test_wait_timeout_0(self):
    """wait(0) must time out on a live process and succeed once killed."""
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    self.assertRaises(psutil.TimeoutExpired, proc.wait, 0)
    proc.kill()
    # poll with a zero timeout for at most 2 seconds
    deadline = time.time() + 2
    while True:
        try:
            status = proc.wait(0)
            break
        except psutil.TimeoutExpired:
            if time.time() >= deadline:
                raise
    if os.name == 'posix':
        self.assertEqual(status, signal.SIGKILL)
    else:
        self.assertEqual(status, 0)
    self.assertFalse(proc.is_running())
michael@0 950
def test_cpu_percent(self):
    """get_cpu_percent() must return a non-negative float."""
    me = psutil.Process(os.getpid())
    me.get_cpu_percent(interval=0.001)
    me.get_cpu_percent(interval=0.001)
    for _ in range(100):
        value = me.get_cpu_percent(interval=None)
        self.assertIsInstance(value, float)
        self.assertGreaterEqual(value, 0.0)
        if os.name == 'posix':
            self.assertGreaterEqual(value, 0.0)
        else:
            # the upper bound is only enforced off POSIX
            self.assertLessEqual(value, 100.0)
michael@0 963
def test_cpu_times(self):
    """get_cpu_times() must report some user or system CPU time."""
    me = psutil.Process(os.getpid())
    times = me.get_cpu_times()
    assert (times.user > 0.0) or (times.system > 0.0), times
    # make sure returned values can be pretty printed with strftime
    for seconds in (times.user, times.system):
        time.strftime("%H:%M:%S", time.localtime(seconds))
michael@0 970
michael@0 971 # Test Process.cpu_times() against os.times()
michael@0 972 # os.times() is broken on Python 2.6
michael@0 973 # http://bugs.python.org/issue1040026
michael@0 974 # XXX fails on OSX: not sure if it's for os.times(). We should
michael@0 975 # try this with Python 2.7 and re-enable the test.
michael@0 976
@unittest.skipUnless(sys.version_info > (2, 6, 1) and not OSX,
                     'os.times() is not reliable on this Python version')
def test_cpu_times2(self):
    """Compare get_cpu_times() with os.times() within 0.1 seconds."""
    user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times()
    utime, ktime = os.times()[:2]

    # os.times()[:2] is the reference; fail when our values differ
    # from it by more than 0.1 seconds.
    if abs(user_time - utime) > 0.1:
        self.fail("expected: %s, found: %s" % (utime, user_time))

    if abs(kernel_time - ktime) > 0.1:
        self.fail("expected: %s, found: %s" % (ktime, kernel_time))
michael@0 991
def test_create_time(self):
    """create_time of a fresh subprocess must be close to time.time()."""
    child = get_test_subprocess(wait=True)
    reference = time.time()
    proc = psutil.Process(child.pid)
    created = proc.create_time

    # time.time() is the reference value; the test fails when the
    # two differ by more than 2 seconds.
    delta = abs(created - reference)
    if delta > 2:
        self.fail("expected: %s, found: %s, difference: %s"
                  % (reference, created, delta))

    # make sure returned value can be pretty printed with strftime
    time.strftime("%Y %m %d %H:%M:%S", time.localtime(proc.create_time))
michael@0 1008
@unittest.skipIf(WINDOWS, 'not available on Windows')
def test_terminal(self):
    """Check the `terminal` property of the current process.

    The test is skipped on Windows (the previous skip reason,
    'windows only', stated the opposite of what the decorator does).
    """
    terminal = psutil.Process(os.getpid()).terminal
    if sys.stdin.isatty():
        # must match what the tty(1) command reports
        self.assertEqual(terminal, sh('tty'))
    else:
        assert terminal, repr(terminal)
michael@0 1016
@unittest.skipIf(not hasattr(psutil.Process, 'get_io_counters'),
                 'not available on this platform')
@skip_on_not_implemented(only_if=LINUX)
def test_get_io_counters(self):
    """Process I/O counters must not decrease after reads and writes."""
    me = psutil.Process(os.getpid())

    # test reads: read the file PYTHON points to in binary mode
    before = me.get_io_counters()
    handle = open(PYTHON, 'rb')
    handle.read()
    handle.close()
    after = me.get_io_counters()
    if not BSD:
        assert after.read_count > before.read_count, (before, after)
        self.assertEqual(after.write_count, before.write_count)
    assert after.read_bytes >= before.read_bytes, (before, after)
    assert after.write_bytes >= before.write_bytes, (before, after)

    # test writes: push a large payload into a temporary file
    before = me.get_io_counters()
    if PY3:
        payload = bytes("x" * 1000000, 'ascii')
    else:
        payload = "x" * 1000000
    handle = tempfile.TemporaryFile()
    handle.write(payload)
    handle.close()
    after = me.get_io_counters()
    assert after.write_count >= before.write_count, (before, after)
    assert after.write_bytes >= before.write_bytes, (before, after)
    assert after.read_count >= before.read_count, (before, after)
    assert after.read_bytes >= before.read_bytes, (before, after)
michael@0 1046
michael@0 1047 # Linux and Windows Vista+
@unittest.skipUnless(hasattr(psutil.Process, 'get_ionice'),
                     'Linux and Windows Vista only')
def test_get_set_ionice(self):
    """Round-trip I/O priority values through set_ionice()/get_ionice()."""
    if LINUX:
        from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT,
                            IOPRIO_CLASS_BE, IOPRIO_CLASS_IDLE)
        self.assertEqual(IOPRIO_CLASS_NONE, 0)
        self.assertEqual(IOPRIO_CLASS_RT, 1)
        self.assertEqual(IOPRIO_CLASS_BE, 2)
        self.assertEqual(IOPRIO_CLASS_IDLE, 3)
        me = psutil.Process(os.getpid())
        # (arguments for set_ionice, expected (ioclass, value) pair)
        cases = (
            ((2,), (2, 4)),
            ((3,), (3, 0)),
            ((2, 0), (2, 0)),
            ((2, 7), (2, 7)),
        )
        try:
            for args, expected in cases:
                me.set_ionice(*args)
                ioclass, value = me.get_ionice()
                self.assertEqual(ioclass, expected[0])
                self.assertEqual(value, expected[1])
            self.assertRaises(ValueError, me.set_ionice, 2, 10)
        finally:
            me.set_ionice(IOPRIO_CLASS_NONE)
    else:
        me = psutil.Process(os.getpid())
        saved = me.get_ionice()
        try:
            # pick a priority different from the current one:
            # 0 is "very low", 1 is "low"
            wanted = 0
            if saved == wanted:
                wanted = 1
            me.set_ionice(wanted)
            self.assertEqual(me.get_ionice(), wanted)
        finally:
            me.set_ionice(saved)
        # invalid arguments
        self.assertRaises(ValueError, me.set_ionice, 3)
        self.assertRaises(TypeError, me.set_ionice, 2, 1)
michael@0 1095
def test_get_num_threads(self):
    """Starting one more thread must bump get_num_threads() by one."""
    # The count is compared relatively (before vs. after) because the
    # absolute number of threads differs across platforms.
    me = psutil.Process(os.getpid())
    before = me.get_num_threads()

    worker = ThreadTask()
    worker.start()
    try:
        after = me.get_num_threads()
        self.assertEqual(after, before + 1)
        worker.stop()
    finally:
        # stop the thread also when the check above failed
        if worker._running:
            worker.stop()
michael@0 1112
@unittest.skipUnless(WINDOWS, 'Windows only')
def test_get_num_handles(self):
    """get_num_handles() must be positive for the current process."""
    # a better test is done later into test/_windows.py
    me = psutil.Process(os.getpid())
    handles = me.get_num_handles()
    self.assertGreater(handles, 0)
michael@0 1118
def test_get_threads(self):
    """get_threads() must list one more entry after a thread is started."""
    me = psutil.Process(os.getpid())
    before = me.get_threads()

    worker = ThreadTask()
    worker.start()

    try:
        after = me.get_threads()
        self.assertEqual(len(after), len(before) + 1)
        first = after[0]
        # on Linux, first thread id is supposed to be this process
        if LINUX:
            self.assertEqual(first.id, os.getpid())
        # named fields must match their positional counterparts
        self.assertEqual(first.id, first[0])
        self.assertEqual(first.user_time, first[1])
        self.assertEqual(first.system_time, first[2])
        worker.stop()
    finally:
        # stop the thread also when a check above failed
        if worker._running:
            worker.stop()
michael@0 1142
def test_get_memory_info(self):
    """Memory figures must grow after allocating a large list."""
    me = psutil.Process(os.getpid())

    # baseline values
    rss_before, vms_before = me.get_memory_info()
    percent_before = me.get_memory_percent()
    self.assertGreater(rss_before, 0)
    self.assertGreater(vms_before, 0)

    # allocate some memory and keep it referenced while measuring
    ballast = [None] * 1500000

    rss_after, vms_after = me.get_memory_info()
    percent_after = me.get_memory_percent()
    # make sure that the memory usage bumped up
    self.assertGreater(rss_after, rss_before)
    self.assertGreaterEqual(vms_after, vms_before)  # vms might be equal
    self.assertGreater(percent_after, percent_before)
    del ballast
michael@0 1162
michael@0 1163 # def test_get_ext_memory_info(self):
michael@0 1164 # # tested later in fetch all test suite
michael@0 1165
def test_get_memory_maps(self):
    """Check grouped and ungrouped results of get_memory_maps()."""
    me = psutil.Process(os.getpid())
    grouped = me.get_memory_maps()
    entries = list(grouped)
    # grouped entries must not contain duplicates
    self.assertEqual(len(entries), len(set(entries)))
    ungrouped = me.get_memory_maps(grouped=False)

    for nt in grouped:
        if nt.path.startswith('['):
            # entries whose path starts with '[' are not checked on disk
            continue
        assert os.path.isabs(nt.path), nt.path
        # XXX - On Windows we have this strange behavior with
        # 64 bit dlls: they are visible via explorer but cannot
        # be accessed via os.stat() (wtf?).
        if POSIX or '64' not in os.path.basename(nt.path):
            assert os.path.exists(nt.path), nt.path

    for nt in ungrouped:
        for fname in nt._fields:
            value = getattr(nt, fname)
            if fname == 'path':
                continue
            if fname in ('addr', 'perms'):
                assert value, value
            else:
                # every remaining field is a non-negative integer
                self.assertIsInstance(value, (int, long))
                assert value >= 0, value
michael@0 1194
def test_get_memory_percent(self):
    """get_memory_percent() must be positive for the current process."""
    me = psutil.Process(os.getpid())
    percent = me.get_memory_percent()
    self.assertGreater(percent, 0.0)
michael@0 1198
def test_pid(self):
    """Process.pid must equal the pid the object was created with."""
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    self.assertEqual(proc.pid, child.pid)
michael@0 1202
def test_is_running(self):
    """is_running() must be true before the kill and false after it."""
    child = get_test_subprocess(wait=True)
    proc = psutil.Process(child.pid)
    # each state is queried twice in a row on purpose
    for _ in range(2):
        assert proc.is_running()
    proc.kill()
    proc.wait()
    for _ in range(2):
        assert not proc.is_running()
michael@0 1212
def test_exe(self):
    """Process.exe of a spawned interpreter must match PYTHON."""
    child = get_test_subprocess(wait=True)
    exe_path = psutil.Process(child.pid).exe
    try:
        self.assertEqual(exe_path, PYTHON)
    except AssertionError:
        if WINDOWS and len(exe_path) == len(PYTHON):
            # on Windows we don't care about case sensitivity
            self.assertEqual(exe_path.lower(), PYTHON.lower())
        else:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # so the "major.minor" suffix is stripped from both sides
            # before comparing.
            version = "%s.%s" % (sys.version_info[0], sys.version_info[1])
            stripped_exe = exe_path.replace(version, '')
            stripped_python = PYTHON.replace(version, '')
            self.assertEqual(stripped_exe, stripped_python)
michael@0 1231
def test_cmdline(self):
    """Process.cmdline must match the arguments the child was started with."""
    args = [PYTHON, "-c", "import time; time.sleep(2)"]
    child = get_test_subprocess(args, wait=True)
    reported = psutil.Process(child.pid).cmdline
    self.assertEqual(' '.join(reported), ' '.join(args))
michael@0 1237
def test_name(self):
    """Process.name must be a prefix of the interpreter's file name."""
    child = get_test_subprocess(PYTHON, wait=True)
    proc_name = psutil.Process(child.pid).name.lower()
    interpreter = os.path.realpath(sys.executable)
    expected = os.path.basename(interpreter).lower()
    # comparison is case-insensitive and accepts a shortened name
    assert expected.startswith(proc_name), (expected, proc_name)
michael@0 1243
@unittest.skipUnless(POSIX, 'posix only')
def test_uids(self):
    """Process.uids must agree with the os module for this process."""
    me = psutil.Process(os.getpid())
    real_uid, effective_uid, saved_uid = me.uids
    # os.getuid() refers to "real" uid
    self.assertEqual(real_uid, os.getuid())
    # os.geteuid() refers to "effective" uid
    self.assertEqual(effective_uid, os.geteuid())
    # there is no os.getsuid() for the "saved" uid; where available,
    # os.getresuid()[2] provides it
    if hasattr(os, "getresuid"):
        self.assertEqual(saved_uid, os.getresuid()[2])
michael@0 1256
@unittest.skipUnless(POSIX, 'posix only')
def test_gids(self):
    """Process.gids must agree with the os module for this process."""
    p = psutil.Process(os.getpid())
    real, effective, saved = p.gids
    # os.getgid() refers to "real" gid
    self.assertEqual(real, os.getgid())
    # os.getegid() refers to "effective" gid
    self.assertEqual(effective, os.getegid())
    # there is no os.getsgid() for the "saved" gid; where available,
    # os.getresgid()[2] provides it. Guard on the function actually
    # called (the check used to look for os.getresuid).
    if hasattr(os, "getresgid"):
        self.assertEqual(saved, os.getresgid()[2])
michael@0 1269
def test_nice(self):
    """Check get_nice()/set_nice() on the current process.

    On POSIX the whole sequence is best-effort: psutil.AccessDenied is
    tolerated both while changing and while restoring the nice value.
    """
    p = psutil.Process(os.getpid())
    self.assertRaises(TypeError, p.set_nice, "str")
    if os.name == 'nt':
        try:
            self.assertEqual(p.get_nice(), psutil.NORMAL_PRIORITY_CLASS)
            p.set_nice(psutil.HIGH_PRIORITY_CLASS)
            self.assertEqual(p.get_nice(), psutil.HIGH_PRIORITY_CLASS)
            p.set_nice(psutil.NORMAL_PRIORITY_CLASS)
            self.assertEqual(p.get_nice(), psutil.NORMAL_PRIORITY_CLASS)
        finally:
            p.set_nice(psutil.NORMAL_PRIORITY_CLASS)
    else:
        # Bound up front so the cleanup below never hits an unbound
        # name when get_nice() itself raises AccessDenied.
        first_nice = None
        try:
            try:
                first_nice = p.get_nice()
                p.set_nice(1)
                self.assertEqual(p.get_nice(), 1)
                # going back to previous nice value raises AccessDenied
                # on OSX
                if not OSX:
                    p.set_nice(0)
                    self.assertEqual(p.get_nice(), 0)
            except psutil.AccessDenied:
                pass
        finally:
            if first_nice is not None:
                try:
                    p.set_nice(first_nice)
                except psutil.AccessDenied:
                    pass
michael@0 1299
def test_status(self):
    """The current process must report the "running" status."""
    me = psutil.Process(os.getpid())
    # compared both as a constant and through its str() form
    self.assertEqual(me.status, psutil.STATUS_RUNNING)
    self.assertEqual(str(me.status), "running")
michael@0 1304
def test_status_constants(self):
    """STATUS_* constants must compare equal to both ints and strings."""
    running = psutil.STATUS_RUNNING
    # values STATUS_RUNNING must be equal to: == and != are both
    # exercised explicitly
    for same in (0, long(0), 'running'):
        self.assertTrue(running == same)
    for other in (1, 'sleeping'):
        self.assertFalse(running == other)
    for same in (0, 'running'):
        self.assertFalse(running != same)
    for other in (1, 'sleeping'):
        self.assertTrue(running != other)
michael@0 1317
def test_username(self):
    """Process.username of a child must match the current user."""
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    if POSIX:
        import pwd
        expected = pwd.getpwuid(os.getuid()).pw_name
        self.assertEqual(proc.username, expected)
    elif WINDOWS and 'USERNAME' in os.environ:
        # on Windows the value is split on a backslash into
        # domain and user name
        want_user = os.environ['USERNAME']
        want_domain = os.environ['USERDOMAIN']
        got_domain, got_user = proc.username.split('\\')
        self.assertEqual(got_domain, want_domain)
        self.assertEqual(got_user, want_user)
    else:
        # nothing to compare against: just make sure it is readable
        proc.username
michael@0 1332
@unittest.skipUnless(hasattr(psutil.Process, "getcwd"),
                     'not available on this platform')
def test_getcwd(self):
    """A child's getcwd() must equal os.getcwd() of this process."""
    child = get_test_subprocess(wait=True)
    proc = psutil.Process(child.pid)
    self.assertEqual(proc.getcwd(), os.getcwd())
michael@0 1339
@unittest.skipIf(not hasattr(psutil.Process, "getcwd"),
                 'not available on this platform')
def test_getcwd_2(self):
    """getcwd() must follow a child that changes to the parent directory."""
    script = "import os, time; os.chdir('..'); time.sleep(2)"
    child = get_test_subprocess([PYTHON, "-c", script], wait=True)
    proc = psutil.Process(child.pid)
    # poll until the reported directory matches the expression
    call_until(proc.getcwd, "ret == os.path.dirname(os.getcwd())", timeout=1)
michael@0 1347
@unittest.skipIf(not hasattr(psutil.Process, "get_cpu_affinity"),
                 'not available on this platform')
def test_cpu_affinity(self):
    """Round-trip CPU affinity masks on the current process.

    The initial affinity is restored in a ``finally`` clause so that a
    failing assertion does not leave the test runner pinned to a
    single CPU for the remaining tests.
    """
    p = psutil.Process(os.getpid())
    initial = p.get_cpu_affinity()
    try:
        all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
        # pin to each CPU in turn
        for n in all_cpus:
            p.set_cpu_affinity([n])
            self.assertEqual(p.get_cpu_affinity(), [n])
        # then to all of them at once
        p.set_cpu_affinity(all_cpus)
        self.assertEqual(p.get_cpu_affinity(), all_cpus)
    finally:
        p.set_cpu_affinity(initial)
    # a CPU number beyond the available ones must be rejected
    invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
    self.assertRaises(ValueError, p.set_cpu_affinity, invalid_cpu)
michael@0 1365
def test_get_open_files(self):
    """get_open_files() must list TESTFN once it has been opened.

    Checked for the current process first, then for a subprocess.
    """
    # current process
    p = psutil.Process(os.getpid())
    files = p.get_open_files()
    # compare against the paths: the entries themselves are ntuples,
    # so testing TESTFN against them directly could never match
    self.assertFalse(TESTFN in [nt.path for nt in files])
    f = open(TESTFN, 'w')
    try:
        call_until(p.get_open_files, "len(ret) != %i" % len(files))
        filenames = [nt.path for nt in p.get_open_files()]
        self.assertIn(TESTFN, filenames)
    finally:
        # close the file also when a check above fails
        f.close()
    for path in filenames:
        assert os.path.isfile(path), path

    # another process
    cmdline = "import time; f = open(r'%s', 'r'); time.sleep(2);" % TESTFN
    sproc = get_test_subprocess([PYTHON, "-c", cmdline], wait=True)
    p = psutil.Process(sproc.pid)
    # poll: the child needs some time before the file shows up
    for _ in range(100):
        filenames = [nt.path for nt in p.get_open_files()]
        if TESTFN in filenames:
            break
        time.sleep(.01)
    else:
        self.assertIn(TESTFN, filenames)
    for path in filenames:
        assert os.path.isfile(path), path
michael@0 1392
def test_get_open_files2(self):
    """Check the path and fd fields of get_open_files() entries."""
    p = psutil.Process(os.getpid())
    fileobj = open(TESTFN, 'w')
    try:
        # test fd and path fields
        for path, fd in p.get_open_files():
            if path == fileobj.name or fd == fileobj.fileno():
                break
        else:
            self.fail("no file found; files=%s" % repr(p.get_open_files()))
        self.assertEqual(path, fileobj.name)
        if WINDOWS:
            self.assertEqual(fd, -1)
        else:
            self.assertEqual(fd, fileobj.fileno())
        # test positions
        ntuple = p.get_open_files()[0]
        self.assertEqual(ntuple[0], ntuple.path)
        self.assertEqual(ntuple[1], ntuple.fd)
    finally:
        # close the file also when a check above fails
        fileobj.close()
    # test file is gone; compare against the paths, since the entries
    # are ntuples and a bare name could never be "in" them
    remaining = [nt.path for nt in p.get_open_files()]
    self.assertTrue(fileobj.name not in remaining)
michael@0 1414
def test_connection_constants(self):
    """CONN_* constants must be unique and have upper-case str() forms."""
    seen_values = []
    seen_names = []
    for attr in dir(psutil):
        if not attr.startswith('CONN_'):
            continue
        value = getattr(psutil, attr)
        text = str(value)
        assert text.isupper(), text
        assert text not in seen_names, text
        assert value not in seen_values, value
        seen_values.append(value)
        seen_names.append(text)
    # platform specific constants only need to exist
    if SUNOS:
        psutil.CONN_IDLE
        psutil.CONN_BOUND
    if WINDOWS:
        psutil.CONN_DELETE_TCB
michael@0 1432
def test_get_connections(self):
    """Check the single listening TCP socket opened by a subprocess."""
    arg = ("import socket, time;"
           "s = socket.socket();"
           "s.bind(('127.0.0.1', 0));"
           "s.listen(1);"
           "conn, addr = s.accept();"
           "time.sleep(2);")
    sproc = get_test_subprocess([PYTHON, "-c", arg])
    p = psutil.Process(sproc.pid)
    # Bound up front: if the subprocess never shows a connection the
    # length check below fails cleanly instead of raising NameError.
    cons = []
    for _ in range(100):
        if p.get_connections():
            # give the subprocess some more time to bind()
            time.sleep(.01)
            cons = p.get_connections()
            break
        time.sleep(.01)
    self.assertEqual(len(cons), 1)
    con = cons[0]
    self.assertEqual(con.family, socket.AF_INET)
    self.assertEqual(con.type, socket.SOCK_STREAM)
    self.assertEqual(con.status, psutil.CONN_LISTEN, str(con.status))
    ip, port = con.laddr
    self.assertEqual(ip, '127.0.0.1')
    self.assertEqual(con.raddr, ())
    if WINDOWS or SUNOS:
        self.assertEqual(con.fd, -1)
    else:
        assert con.fd > 0, con
    # test positions
    self.assertEqual(con[0], con.fd)
    self.assertEqual(con[1], con.family)
    self.assertEqual(con[2], con.type)
    self.assertEqual(con[3], con.laddr)
    self.assertEqual(con[4], con.raddr)
    self.assertEqual(con[5], con.status)
    # test kind arg
    self.assertRaises(ValueError, p.get_connections, 'foo')
michael@0 1470
@unittest.skipUnless(supports_ipv6(), 'IPv6 is not supported')
def test_get_connections_ipv6(self):
    """A listening IPv6 socket of this process must be reported."""
    s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    try:
        s.bind(('::1', 0))
        s.listen(1)
        cons = psutil.Process(os.getpid()).get_connections()
    finally:
        # close the socket also when bind/listen/get_connections fail
        s.close()
    self.assertEqual(len(cons), 1)
    self.assertEqual(cons[0].laddr[0], '::1')
michael@0 1480
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'AF_UNIX is not supported')
def test_get_connections_unix(self):
    """Check UNIX sockets bound to TESTFN, stream first, then datagram.

    Each socket is closed in a ``finally`` clause so that a failing
    check does not leak it into the following tests.
    """
    # stream socket
    safe_remove(TESTFN)
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.bind(TESTFN)
        conn = psutil.Process(os.getpid()).get_connections(kind='unix')[0]
        if conn.fd != -1:  # != sunos and windows
            self.assertEqual(conn.fd, sock.fileno())
        self.assertEqual(conn.family, socket.AF_UNIX)
        self.assertEqual(conn.type, socket.SOCK_STREAM)
        self.assertEqual(conn.laddr, TESTFN)
        self.assertTrue(not conn.raddr)
        self.assertEqual(conn.status, psutil.CONN_NONE, str(conn.status))
    finally:
        sock.close()
    # datagram socket
    safe_remove(TESTFN)
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        sock.bind(TESTFN)
        conn = psutil.Process(os.getpid()).get_connections(kind='unix')[0]
        self.assertEqual(conn.type, socket.SOCK_DGRAM)
    finally:
        sock.close()
michael@0 1503
@unittest.skipUnless(hasattr(socket, "fromfd"),
                     'socket.fromfd() is not available')
@unittest.skipIf(WINDOWS or SUNOS,
                 'connection fd not available on this platform')
def test_connection_fromfd(self):
    """A socket rebuilt from a connection's fd must refer to the same
    local address as the original one.
    """
    sock = socket.socket()
    try:
        sock.bind(('localhost', 0))
        sock.listen(1)
        p = psutil.Process(os.getpid())
        # look for the entry describing our own socket
        for conn in p.get_connections():
            if conn.fd == sock.fileno():
                break
        else:
            self.fail("couldn't find socket fd")
        dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
        try:
            self.assertEqual(dupsock.getsockname(), conn.laddr)
            # the rebuilt socket must use a descriptor of its own
            self.assertNotEqual(sock.fileno(), dupsock.fileno())
        finally:
            dupsock.close()
    finally:
        # close the original socket on every exit path
        sock.close()
michael@0 1526
def test_get_connections_all(self):
    # Launch helper interpreters, each one holding a single socket of a
    # given family/type, then verify what get_connections() reports for
    # every one of them, both unfiltered and for every "kind" filter.
    from string import Template

    tcp_template = "import socket;" \
                   "s = socket.socket($family, socket.SOCK_STREAM);" \
                   "s.bind(('$addr', 0));" \
                   "s.listen(1);" \
                   "conn, addr = s.accept();"

    udp_template = "import socket, time;" \
                   "s = socket.socket($family, socket.SOCK_DGRAM);" \
                   "s.bind(('$addr', 0));" \
                   "time.sleep(2);"

    def spawn(template, family, addr):
        # int() is required: on interpreters where the socket constants
        # are enum members, str() may render the member name instead of
        # the number and the generated script would not run.
        code = Template(template).substitute(family=int(family), addr=addr)
        return get_test_subprocess([PYTHON, "-c", code])

    # One entry per helper process:
    # (template, family, bind address, socket type, acceptable local
    #  addresses, expected status, kinds which must report the socket)
    specs = [
        (tcp_template, socket.AF_INET, "127.0.0.1", socket.SOCK_STREAM,
         ("127.0.0.1",), psutil.CONN_LISTEN,
         ("all", "inet", "inet4", "tcp", "tcp4")),
        (udp_template, socket.AF_INET, "127.0.0.1", socket.SOCK_DGRAM,
         ("127.0.0.1",), psutil.CONN_NONE,
         ("all", "inet", "inet4", "udp", "udp4")),
    ]
    if supports_ipv6():
        specs.extend([
            (tcp_template, socket.AF_INET6, "::1", socket.SOCK_STREAM,
             ("::", "::1"), psutil.CONN_LISTEN,
             ("all", "inet", "inet6", "tcp", "tcp6")),
            (udp_template, socket.AF_INET6, "::1", socket.SOCK_DGRAM,
             ("::", "::1"), psutil.CONN_NONE,
             ("all", "inet", "inet6", "udp", "udp6")),
        ])

    # launch the subprocesses and remember what to expect for each pid
    expected = {}
    for template, family, addr, type_, laddrs, status, kinds in specs:
        proc = spawn(template, family, addr)
        expected[proc.pid] = (family, type_, laddrs, status, kinds)

    # check matches against subprocesses just created
    all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4", "tcp6",
                 "udp", "udp4", "udp6")
    for p in psutil.Process(os.getpid()).get_children():
        conns = p.get_connections()
        if p.pid not in expected:
            continue
        family, type_, laddrs, status, kinds = expected[p.pid]
        for conn in conns:
            self.assertEqual(conn.family, family)
            self.assertEqual(conn.type, type_)
            self.assertIn(conn.laddr[0], laddrs)
            self.assertEqual(conn.raddr, ())
            self.assertEqual(conn.status, status, str(conn.status))
            for kind in all_kinds:
                cons = p.get_connections(kind=kind)
                if kind in kinds:
                    # a real assertion (not "assert") so the check is
                    # still performed when python runs with -O
                    self.assertNotEqual(
                        cons, [], "no connections for kind=%r" % kind)
                else:
                    self.assertEqual(cons, [], cons)
michael@0 1621
@unittest.skipUnless(POSIX, 'posix only')
def test_get_num_fds(self):
    # The number of file descriptors reported for this process must grow
    # by one for every file/socket opened and return to the initial value
    # once they are closed.
    p = psutil.Process(os.getpid())
    start = p.get_num_fds()
    fobj = open(TESTFN, 'w')
    try:
        self.assertEqual(p.get_num_fds(), start + 1)
        sock = socket.socket()
        try:
            self.assertEqual(p.get_num_fds(), start + 2)
        finally:
            sock.close()
    finally:
        # closed even if an assertion above fails, so that a failure
        # here does not leak descriptors into the following tests
        fobj.close()
    self.assertEqual(p.get_num_fds(), start)
michael@0 1633
@skip_on_not_implemented(only_if=LINUX)
def test_get_num_ctx_switches(self):
    # Keep polling until the sum of the context switch counters of this
    # process becomes greater than the value sampled at the beginning.
    p = psutil.Process(os.getpid())
    before = sum(p.get_num_ctx_switches())
    iterations = 500000
    for x in range(iterations):
        after = sum(p.get_num_ctx_switches())
        if after > before:
            return
    # the message is built from the loop bound so the two cannot drift
    self.fail("num ctx switches still the same after %d iterations"
              % iterations)
michael@0 1643
def test_parent_ppid(self):
    # the subprocess we spawn must name this process as its parent,
    # both through "ppid" and through the "parent" Process object
    our_pid = os.getpid()
    child = get_test_subprocess()
    child_proc = psutil.Process(child.pid)
    self.assertEqual(child_proc.ppid, our_pid)
    self.assertEqual(child_proc.parent.pid, our_pid)
    # apart from that subprocess nobody else is supposed to claim
    # this process as its parent
    for other in psutil.process_iter():
        if other.pid != child.pid:
            self.assertTrue(other.ppid != our_pid)
michael@0 1655
def test_get_children(self):
    # before spawning anything no children are expected, neither with
    # the plain nor with the recursive lookup
    parent = psutil.Process(os.getpid())
    self.assertEqual(parent.get_children(), [])
    self.assertEqual(parent.get_children(recursive=True), [])
    # after spawning one subprocess both lookups must return exactly it
    child = get_test_subprocess()
    listings = (parent.get_children(), parent.get_children(recursive=True))
    for found in listings:
        self.assertEqual(len(found), 1)
        only = found[0]
        self.assertEqual(only.pid, child.pid)
        self.assertEqual(only.ppid, os.getpid())
michael@0 1667
def test_get_children_recursive(self):
    # here we create a subprocess which creates another one as in:
    # A (parent) -> B (child) -> C (grandchild)
    s = "import subprocess, os, sys, time;"
    s += "PYTHON = os.path.realpath(sys.executable);"
    s += "cmd = [PYTHON, '-c', 'import time; time.sleep(2);'];"
    s += "subprocess.Popen(cmd);"
    s += "time.sleep(2);"
    get_test_subprocess(cmd=[PYTHON, "-c", s])
    p = psutil.Process(os.getpid())
    self.assertEqual(len(p.get_children(recursive=False)), 1)
    # give the grandchild some time to start; poll with a short pause
    # instead of spinning, so we do not compete for CPU with the very
    # processes we are waiting for
    children = []
    stop_at = time.time() + 1.5
    while time.time() < stop_at:
        children = p.get_children(recursive=True)
        if len(children) > 1:
            break
        time.sleep(0.01)
    self.assertEqual(len(children), 2)
    self.assertEqual(children[0].ppid, os.getpid())
    self.assertEqual(children[1].ppid, children[0].pid)
michael@0 1688
def test_get_children_duplicates(self):
    # count, for every parent pid, how many processes report it as
    # their parent; processes raising a psutil error are ignored
    from psutil._compat import defaultdict
    children_count = defaultdict(int)
    for proc in psutil.process_iter():
        try:
            children_count[proc.ppid] += 1
        except psutil.Error:
            pass
    # pick the pid with the highest count and make sure its recursive
    # children listing contains no duplicates
    by_count = sorted(children_count.items(), key=lambda item: item[1])
    busiest_pid = by_count[-1][0]
    busiest = psutil.Process(busiest_pid)
    try:
        descendants = busiest.get_children(recursive=True)
    except psutil.AccessDenied:  # windows
        return
    self.assertEqual(len(descendants), len(set(descendants)))
michael@0 1707
def test_suspend_resume(self):
    # suspend a subprocess, wait (at most 100 polls, 10 ms apart) for
    # its status to become "stopped", then resume it and check that it
    # is not reported as stopped anymore
    sproc = get_test_subprocess(wait=True)
    target = psutil.Process(sproc.pid)
    target.suspend()
    polls = 0
    while polls < 100 and target.status != psutil.STATUS_STOPPED:
        time.sleep(0.01)
        polls += 1
    self.assertEqual(str(target.status), "stopped")
    target.resume()
    assert target.status != psutil.STATUS_STOPPED, target.status
michael@0 1719
def test_invalid_pid(self):
    # non-integer pids must be rejected with TypeError, a negative
    # pid with ValueError
    rejected = ((TypeError, "1"), (TypeError, None), (ValueError, -1))
    for exc, bad_pid in rejected:
        self.assertRaises(exc, psutil.Process, bad_pid)
michael@0 1724
def test_as_dict(self):
    sproc = get_test_subprocess()
    proc = psutil.Process(sproc.pid)
    info = proc.as_dict()
    # when the json module is available the full dict must be
    # serializable with it
    try:
        import json
    except ImportError:
        pass
    else:
        json.dumps(info)
    # only the requested attributes are returned
    info = proc.as_dict(attrs=['exe', 'name'])
    self.assertEqual(sorted(info.keys()), ['exe', 'name'])
    # for the process with the lowest pid: whenever 'connections' is not
    # a list it must be the ad_value placeholder that was passed in
    lowest = psutil.Process(min(psutil.get_pid_list()))
    info = lowest.as_dict(attrs=['get_connections'], ad_value='foo')
    connections = info['connections']
    if not isinstance(connections, list):
        self.assertEqual(connections, 'foo')
michael@0 1744
def test_zombie_process(self):
    # Test that NoSuchProcess exception gets raised in case the
    # process dies after we create the Process object.
    # Example:
    #  >>> proc = Process(1234)
    #  >>> time.sleep(2)  # time-consuming task, process dies in meantime
    #  >>> proc.name
    # Refers to Issue #15
    sproc = get_test_subprocess()
    p = psutil.Process(sproc.pid)
    p.kill()
    p.wait()

    # names which are not exercised by the generic loop below
    skipped = ('pid', 'send_signal', 'is_running', 'set_ionice',
               'wait', 'set_cpu_affinity', 'create_time', 'set_nice',
               'nice')
    for name in dir(p):
        if name.startswith('_') or name in skipped:
            continue
        try:
            member = getattr(p, name)
            if callable(member):
                member()
        except (psutil.NoSuchProcess, NotImplementedError):
            pass
        else:
            self.fail("NoSuchProcess exception not raised for %r" % name)

    # other methods, which need arguments
    try:
        if os.name == 'posix':
            p.set_nice(1)
        else:
            p.set_nice(psutil.NORMAL_PRIORITY_CLASS)
    except psutil.NoSuchProcess:
        pass
    else:
        self.fail("exception not raised")
    gone = psutil.NoSuchProcess
    if hasattr(p, 'set_ionice'):
        self.assertRaises(gone, p.set_ionice, 2)
    self.assertRaises(gone, p.send_signal, signal.SIGTERM)
    self.assertRaises(gone, p.set_nice, 0)
    self.assertFalse(p.is_running())
    if hasattr(p, "set_cpu_affinity"):
        self.assertRaises(gone, p.set_cpu_affinity, [0])
michael@0 1792
def test__str__(self):
    # str() of a running process includes its pid
    sproc = get_test_subprocess()
    text = str(psutil.Process(sproc.pid))
    self.assertIn(str(sproc.pid), text)
    # python shows up as 'Python' in cmdline on OS X so test fails on OS X
    if not OSX:
        self.assertIn(os.path.basename(PYTHON), text)
    # str() of a process which is gone still includes the pid and
    # says the process is terminated
    sproc = get_test_subprocess()
    dead = psutil.Process(sproc.pid)
    dead.kill()
    dead.wait()
    self.assertIn(str(sproc.pid), str(dead))
    self.assertIn("terminated", str(dead))
michael@0 1806
@unittest.skipIf(LINUX, 'PID 0 not available on Linux')
def test_pid_0(self):
    # Process(0) is supposed to work on all platforms except Linux
    p = psutil.Process(0)
    self.assertTrue(p.name)

    # real uid/gid are expected to be 0; AccessDenied is tolerated
    if os.name == 'posix':
        try:
            self.assertEqual(p.uids.real, 0)
            self.assertEqual(p.gids.real, 0)
        except psutil.AccessDenied:
            pass

    self.assertIn(p.ppid, (0, 1))
    # only checks that retrieving the command line does not raise
    p.cmdline
    # these are allowed to raise AccessDenied and nothing else
    for getter in (p.get_num_threads, p.get_memory_info):
        try:
            getter()
        except psutil.AccessDenied:
            pass

    # username property
    try:
        username = p.username
    except psutil.AccessDenied:
        pass
    else:
        if POSIX:
            self.assertEqual(username, 'root')
        elif WINDOWS:
            self.assertEqual(username, 'NT AUTHORITY\\SYSTEM')

    self.assertIn(0, psutil.get_pid_list())
    self.assertTrue(psutil.pid_exists(0))
michael@0 1846
def test__all__(self):
    # Every public name found in the psutil namespace which is not an
    # importable module must be listed in psutil.__all__, unless it is
    # None or its docstring marks it as deprecated.
    ignored = ('callable', 'defaultdict', 'error', 'namedtuple', 'test')
    for name in dir(psutil):
        if name in ignored or name.startswith('_'):
            continue
        try:
            # names which can be imported are modules psutil imported
            # for its own use, not part of its API
            __import__(name)
        except ImportError:
            if name in psutil.__all__:
                continue
            fun = getattr(psutil, name)
            if fun is None:
                continue
            # an object without a docstring has __doc__ set to None;
            # treat that as "not deprecated" so the missing __all__
            # entry is reported instead of raising AttributeError
            doc = getattr(fun, '__doc__', None) or ''
            if 'deprecated' not in doc.lower():
                self.fail('%r not in psutil.__all__' % name)
michael@0 1862
def test_Popen(self):
    # psutil.Popen is expected to expose both the attributes of the
    # Process API ("name") and those of subprocess.Popen ("stdin"),
    # and to raise AttributeError for unknown ones.
    # XXX this test causes a ResourceWarning on Python 3 because
    # psutil.__subproc instance doesn't get properly freed.
    # Not sure what to do though.
    command = [PYTHON, "-c", "import time; time.sleep(2);"]
    proc = psutil.Popen(command,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
    try:
        proc.name
        proc.stdin
        for attr in ('name', 'stdin'):
            self.assertTrue(hasattr(proc, attr))
        self.assertRaises(AttributeError, getattr, proc, 'foo')
    finally:
        proc.kill()
        proc.wait()
michael@0 1879
michael@0 1880
michael@0 1881 # ===================================================================
michael@0 1882 # --- Fetch all processes test
michael@0 1883 # ===================================================================
michael@0 1884
class TestFetchAllProcesses(unittest.TestCase):
    """Iterate over all running processes and perform some sanity
    checks against the values returned by the Process API.

    test_fetch_all() is the only real test.  Every other public method
    is a validator named after a Process attribute/method: it receives
    the value that attribute returned and asserts on it.
    """

    def setUp(self):
        if POSIX:
            import pwd
            pall = pwd.getpwall()
            # known uids / user names, used by uids() and username()
            self._uids = set([x.pw_uid for x in pall])
            self._usernames = set([x.pw_name for x in pall])

    def _format_failure(self, name, proc, ret, default):
        """Return the indented report for a failed check.

        Must be called while the exception is being handled, as the
        traceback is taken from traceback.format_exc().  'ret' is only
        included when it is not the 'default' sentinel, i.e. when the
        attribute was actually retrieved.
        """
        s = '\n' + '=' * 70 + '\n'
        s += "FAIL: test_%s (proc=%s" % (name, proc)
        # identity check: 'default' is a sentinel and 'ret' may be
        # any object, including ones with a custom comparison
        if ret is not default:
            s += ", ret=%s" % repr(ret)
        s += ')\n'
        s += '-' * 70
        s += "\n%s" % traceback.format_exc()
        return "\n".join((" " * 4) + i for i in s.splitlines())

    def test_fetch_all(self):
        valid_procs = 0
        excluded_names = ['send_signal', 'suspend', 'resume', 'terminate',
                          'kill', 'wait', 'as_dict', 'get_cpu_percent',
                          'nice', 'parent', 'get_children', 'pid']
        # public, read-only names of the Process class to exercise
        attrs = []
        for name in dir(psutil.Process):
            if name.startswith("_"):
                continue
            if name.startswith("set_"):
                continue
            if name in excluded_names:
                continue
            attrs.append(name)

        default = object()
        failures = []
        for name in attrs:
            for p in psutil.process_iter():
                ret = default
                try:
                    try:
                        attr = getattr(p, name, None)
                        if attr is not None and callable(attr):
                            ret = attr()
                        else:
                            ret = attr
                        valid_procs += 1
                    except NotImplementedError:
                        register_warning("%r was skipped because not "
                                         "implemented"
                                         % (self.__class__.__name__ +
                                            '.test_' + name))
                    except (psutil.NoSuchProcess, psutil.AccessDenied):
                        err = sys.exc_info()[1]
                        if isinstance(err, psutil.NoSuchProcess):
                            if psutil.pid_exists(p.pid):
                                # XXX race condition; we probably need
                                # to try figuring out the process
                                # identity before failing
                                self.fail("PID still exists but fun raised "
                                          "NoSuchProcess")
                        self.assertEqual(err.pid, p.pid)
                        if err.name:
                            # make sure exception's name attr is set
                            # with the actual process name
                            self.assertEqual(err.name, p.name)
                        self.assertTrue(str(err))
                        self.assertTrue(err.msg)
                    else:
                        if ret not in (0, 0.0, [], None, ''):
                            assert ret, ret
                        # hand the value to the validator method
                        # bearing the same name as the attribute
                        meth = getattr(self, name)
                        meth(ret)
                except Exception:
                    failures.append(
                        self._format_failure(name, p, ret, default))
                    # one report per attribute is enough
                    break

        if failures:
            self.fail(''.join(failures))

        # we should always have a non-empty list, not including PID 0 etc.
        # special cases.
        self.assertTrue(valid_procs > 0)

    def cmdline(self, ret):
        pass

    def exe(self, ret):
        if not ret:
            self.assertEqual(ret, '')
        else:
            assert os.path.isabs(ret), ret
            # Note: checks based on os.stat() may report the file as
            # missing even if it is there, hence they are limited to
            # POSIX, see:
            # http://stackoverflow.com/questions/3112546/os-path-exists-lies
            if POSIX:
                assert os.path.isfile(ret), ret
                if hasattr(os, 'access') and hasattr(os, "X_OK"):
                    # XXX may fail on OSX
                    self.assertTrue(os.access(ret, os.X_OK))

    def ppid(self, ret):
        self.assertTrue(ret >= 0)

    def name(self, ret):
        self.assertTrue(isinstance(ret, str))
        self.assertTrue(ret)

    def create_time(self, ret):
        self.assertTrue(ret > 0)
        # Comparing against psutil.BOOT_TIME can't be taken for granted
        # on all platforms, so only make sure the returned value can be
        # pretty printed with strftime.
        time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret))

    def uids(self, ret):
        for uid in ret:
            self.assertTrue(uid >= 0)
            self.assertIn(uid, self._uids)

    def gids(self, ret):
        # note: checking every gid against the list of known groups, as
        # done for uids above, seems not to be reliable for gid == 30
        # (nobody); not sure why.
        for gid in ret:
            self.assertTrue(gid >= 0)

    def username(self, ret):
        self.assertTrue(ret)
        if os.name == 'posix':
            self.assertIn(ret, self._usernames)

    def status(self, ret):
        self.assertTrue(ret >= 0)
        self.assertTrue(str(ret) != '?')

    def get_io_counters(self, ret):
        for field in ret:
            # -1 is tolerated and skipped
            if field != -1:
                self.assertTrue(field >= 0)

    def get_ionice(self, ret):
        if LINUX:
            self.assertTrue(ret.ioclass >= 0)
            self.assertTrue(ret.value >= 0)
        else:
            self.assertTrue(ret >= 0)
            self.assertIn(ret, (0, 1, 2))

    def get_num_threads(self, ret):
        self.assertTrue(ret >= 1)

    def get_threads(self, ret):
        for t in ret:
            self.assertTrue(t.id >= 0)
            self.assertTrue(t.user_time >= 0)
            self.assertTrue(t.system_time >= 0)

    def get_cpu_times(self, ret):
        self.assertTrue(ret.user >= 0)
        self.assertTrue(ret.system >= 0)

    def get_memory_info(self, ret):
        self.assertTrue(ret.rss >= 0)
        self.assertTrue(ret.vms >= 0)

    def get_ext_memory_info(self, ret):
        for name in ret._fields:
            self.assertTrue(getattr(ret, name) >= 0)
        if POSIX and ret.vms != 0:
            # VMS is always supposed to be the highest
            for name in ret._fields:
                if name != 'vms':
                    value = getattr(ret, name)
                    assert ret.vms > value, ret
        elif WINDOWS:
            assert ret.peak_wset >= ret.wset, ret
            assert ret.peak_paged_pool >= ret.paged_pool, ret
            assert ret.peak_nonpaged_pool >= ret.nonpaged_pool, ret
            assert ret.peak_pagefile >= ret.pagefile, ret

    def get_open_files(self, ret):
        for f in ret:
            if WINDOWS:
                assert f.fd == -1, f
            else:
                self.assertIsInstance(f.fd, int)
            assert os.path.isabs(f.path), f
            assert os.path.isfile(f.path), f

    def get_num_fds(self, ret):
        self.assertTrue(ret >= 0)

    def get_connections(self, ret):
        # all values are supposed to match Linux's tcp_states.h states
        # table across all platforms.
        valid_conn_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV",
                             "FIN_WAIT1", "FIN_WAIT2", "TIME_WAIT", "CLOSE",
                             "CLOSE_WAIT", "LAST_ACK", "LISTEN", "CLOSING",
                             "NONE"]
        if SUNOS:
            valid_conn_states += ["IDLE", "BOUND"]
        if WINDOWS:
            valid_conn_states += ["DELETE_TCB"]
        for conn in ret:
            self.assertIn(conn.type, (socket.SOCK_STREAM, socket.SOCK_DGRAM))
            self.assertIn(conn.family, (socket.AF_INET, socket.AF_INET6))
            check_ip_address(conn.laddr, conn.family)
            check_ip_address(conn.raddr, conn.family)
            if conn.status not in valid_conn_states:
                self.fail("%s is not a valid status" % conn.status)
            # actually try to bind the local socket; ignore IPv6
            # sockets as their address might be represented as
            # an IPv4-mapped-address (e.g. "::127.0.0.1")
            # and that's rejected by bind()
            if conn.family == socket.AF_INET:
                s = socket.socket(conn.family, conn.type)
                try:
                    s.bind((conn.laddr[0], 0))
                finally:
                    # do not leak the probe socket if bind() fails
                    s.close()

            if not WINDOWS and hasattr(socket, 'fromfd'):
                dupsock = None
                try:
                    try:
                        dupsock = socket.fromfd(conn.fd, conn.family,
                                                conn.type)
                    except (socket.error, OSError):
                        err = sys.exc_info()[1]
                        # EBADF is tolerated: move on to the next
                        # connection
                        if err.args[0] == errno.EBADF:
                            continue
                        raise
                    # python >= 2.5
                    if hasattr(dupsock, "family"):
                        self.assertEqual(dupsock.family, conn.family)
                        self.assertEqual(dupsock.type, conn.type)
                finally:
                    if dupsock is not None:
                        dupsock.close()

    def getcwd(self, ret):
        if ret is not None:  # BSD may return None
            assert os.path.isabs(ret), ret
            try:
                st = os.stat(ret)
            except OSError:
                err = sys.exc_info()[1]
                # directory has been removed in mean time
                if err.errno != errno.ENOENT:
                    raise
            else:
                self.assertTrue(stat.S_ISDIR(st.st_mode))

    def get_memory_percent(self, ret):
        assert 0 <= ret <= 100, ret

    def is_running(self, ret):
        self.assertTrue(ret)

    def get_cpu_affinity(self, ret):
        assert ret != [], ret

    def terminal(self, ret):
        if ret is not None:
            assert os.path.isabs(ret), ret
            assert os.path.exists(ret), ret

    def get_memory_maps(self, ret):
        for nt in ret:
            for fname in nt._fields:
                value = getattr(nt, fname)
                if fname == 'path':
                    # paths starting with '[' are not checked
                    if not value.startswith('['):
                        assert os.path.isabs(nt.path), nt.path
                        # existence is not checked as on Linux we might
                        # get '/foo/bar (deleted)'
                elif fname in ('addr', 'perms'):
                    self.assertTrue(value)
                else:
                    self.assertIsInstance(value, (int, long))
                    assert value >= 0, value

    def get_num_handles(self, ret):
        # same expectation on every platform
        self.assertGreaterEqual(ret, 0)

    def get_nice(self, ret):
        if POSIX:
            assert -20 <= ret <= 20, ret
        else:
            priorities = [getattr(psutil, x) for x in dir(psutil)
                          if x.endswith('_PRIORITY_CLASS')]
            self.assertIn(ret, priorities)

    def get_num_ctx_switches(self, ret):
        self.assertTrue(ret.voluntary >= 0)
        self.assertTrue(ret.involuntary >= 0)
michael@0 2182
michael@0 2183
michael@0 2184 # ===================================================================
michael@0 2185 # --- Limited user tests
michael@0 2186 # ===================================================================
michael@0 2187
if hasattr(os, 'getuid') and os.getuid() == 0:
    class LimitedUserTestCase(TestProcess):
        """Repeat the previous tests by using a limited user.
        Executed only on UNIX and only if the user who run the test script
        is root.
        """
        # the uid/gid the test suite runs under
        PROCESS_UID = os.getuid()
        PROCESS_GID = os.getgid()

        def __init__(self, *args, **kwargs):
            TestProcess.__init__(self, *args, **kwargs)
            # re-define all existent test methods in order to
            # ignore AccessDenied exceptions
            for attr in [x for x in dir(self) if x.startswith('test')]:
                meth = getattr(self, attr)

                # 'meth' is bound as a default argument on purpose: a
                # plain closure would look the name up when the wrapper
                # is called, i.e. after the loop has finished, and every
                # wrapper would end up running the last test method.
                def test_(self, meth=meth):
                    try:
                        meth()
                    except psutil.AccessDenied:
                        pass
                setattr(self, attr, types.MethodType(test_, self))

        def setUp(self):
            # drop the effective group first, while we are still
            # allowed to change it, then the effective user
            os.setegid(1000)
            os.seteuid(1000)
            TestProcess.setUp(self)

        def tearDown(self):
            # restore the original effective ids: the user first, so we
            # are privileged again when the group is restored
            os.seteuid(self.PROCESS_UID)
            os.setegid(self.PROCESS_GID)
            TestProcess.tearDown(self)

        def test_nice(self):
            # a limited user must not be able to raise its own priority
            try:
                psutil.Process(os.getpid()).set_nice(-1)
            except psutil.AccessDenied:
                pass
            else:
                self.fail("exception not raised")
michael@0 2228
michael@0 2229
michael@0 2230 # ===================================================================
michael@0 2231 # --- Example script tests
michael@0 2232 # ===================================================================
michael@0 2233
class TestExampleScripts(unittest.TestCase):
    """Tests for scripts in the examples directory."""

    def assert_stdout(self, exe, args=None):
        # Run the example script with the current interpreter and make
        # sure it printed something.  A RuntimeError mentioning
        # 'AccessDenied' is tolerated: its text is returned instead of
        # the output.
        command = os.path.join(EXAMPLES_DIR, exe)
        if args:
            command = command + ' ' + args
        try:
            output = sh(sys.executable + ' ' + command).strip()
        except RuntimeError:
            err = sys.exc_info()[1]
            if 'AccessDenied' not in str(err):
                raise
            return str(err)
        assert output, output
        return output

    def assert_syntax(self, exe, args=None):
        # Only parse the script, without running it ('args' is unused).
        path = os.path.join(EXAMPLES_DIR, exe)
        fobj = open(path, 'r')
        try:
            source = fobj.read()
        finally:
            fobj.close()
        ast.parse(source)

    def test_check_presence(self):
        # make sure all example scripts have a test method defined
        available = dir(self)
        for fname in os.listdir(EXAMPLES_DIR):
            if not fname.endswith('.py'):
                continue
            expected = 'test_' + os.path.splitext(fname)[0]
            if expected in available:
                continue
            self.fail('no test defined for %r script'
                      % os.path.join(EXAMPLES_DIR, fname))

    # --- scripts which are executed

    def test_disk_usage(self):
        self.assert_stdout('disk_usage.py')

    def test_free(self):
        self.assert_stdout('free.py')

    def test_meminfo(self):
        self.assert_stdout('meminfo.py')

    def test_process_detail(self):
        self.assert_stdout('process_detail.py')

    def test_who(self):
        self.assert_stdout('who.py')

    def test_netstat(self):
        self.assert_stdout('netstat.py')

    def test_pmap(self):
        self.assert_stdout('pmap.py', args=str(os.getpid()))

    # --- scripts which are only checked for syntax

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_killall(self):
        self.assert_syntax('killall.py')

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_nettop(self):
        self.assert_syntax('nettop.py')

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_top(self):
        self.assert_syntax('top.py')

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_iotop(self):
        self.assert_syntax('iotop.py')
michael@0 2311
michael@0 2312
def cleanup():
    """Exit handler releasing what the test suite left around.

    Calls reap_children(search_all=True), closes DEVNULL and removes
    TESTFN, in this order.  The steps are chained with try/finally so
    that an exception raised by one of them does not prevent the
    following ones from running; the exception still propagates.
    """
    try:
        reap_children(search_all=True)
    finally:
        try:
            DEVNULL.close()
        finally:
            safe_remove(TESTFN)

# run cleanup() when the interpreter exits
atexit.register(cleanup)
# presumably gets rid of a TESTFN left behind by a previous run
safe_remove(TESTFN)
michael@0 2320
def test_main():
    """Assemble the whole test suite, run it and report the outcome.

    The suite is made of, in this order: the generic test cases,
    PosixSpecificTestCase (if POSIX is true), the test case(s) of the
    current platform, LimitedUserTestCase (if os.getuid exists and the
    class is defined in this module) and TestExampleScripts.

    Return True if all tests were successful, else False.
    """
    tests = [TestSystemAPIs, TestProcess, TestFetchAllProcesses]

    if POSIX:
        from _posix import PosixSpecificTestCase
        tests.append(PosixSpecificTestCase)

    # import the specific platform test suite; stc stays None if no
    # platform flag is set, instead of being left unbound
    stc = None
    if LINUX:
        from _linux import LinuxSpecificTestCase as stc
    elif WINDOWS:
        from _windows import WindowsSpecificTestCase as stc
        from _windows import TestDualProcessImplementation
        tests.append(TestDualProcessImplementation)
    elif OSX:
        from _osx import OSXSpecificTestCase as stc
    elif BSD:
        from _bsd import BSDSpecificTestCase as stc
    elif SUNOS:
        from _sunos import SunOSSpecificTestCase as stc

    if stc is not None:
        tests.append(stc)
    else:
        register_warning("platform specific tests were skipped (no test "
                         "suite available for %r)" % sys.platform)

    if hasattr(os, 'getuid'):
        if 'LimitedUserTestCase' in globals():
            tests.append(LimitedUserTestCase)
        else:
            register_warning("LimitedUserTestCase was skipped (super-user "
                             "privileges are required)")

    tests.append(TestExampleScripts)

    # unittest.makeSuite() was deprecated in Python 3.11 and removed in
    # 3.13; the default loader does the same job on every version
    load_tests = unittest.defaultTestLoader.loadTestsFromTestCase
    test_suite = unittest.TestSuite()
    for test_class in tests:
        test_suite.addTest(load_tests(test_class))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
michael@0 2360
if __name__ == '__main__':
    # signal a failed run to the caller with a non-zero exit status
    all_passed = test_main()
    if not all_passed:
        sys.exit(1)

mercurial