michael@0: #!/usr/bin/env python michael@0: michael@0: # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. michael@0: # Use of this source code is governed by a BSD-style license that can be michael@0: # found in the LICENSE file. michael@0: michael@0: """ michael@0: psutil test suite (you can quickly run it with "python setup.py test"). michael@0: michael@0: Note: this is targeted for both python 2.x and 3.x so there's no need michael@0: to use 2to3 tool first. michael@0: michael@0: If you're on Python < 2.7 it is recommended to install unittest2 module michael@0: from: https://pypi.python.org/pypi/unittest2 michael@0: """ michael@0: michael@0: from __future__ import division michael@0: import os michael@0: import sys michael@0: import subprocess michael@0: import time michael@0: import signal michael@0: import types michael@0: import traceback michael@0: import socket michael@0: import warnings michael@0: import atexit michael@0: import errno michael@0: import threading michael@0: import tempfile michael@0: import stat michael@0: import collections michael@0: import datetime michael@0: try: michael@0: import unittest2 as unittest # pyhon < 2.7 + unittest2 installed michael@0: except ImportError: michael@0: import unittest michael@0: try: michael@0: import ast # python >= 2.6 michael@0: except ImportError: michael@0: ast = None michael@0: michael@0: import psutil michael@0: from psutil._compat import PY3, callable, long, wraps michael@0: michael@0: michael@0: # =================================================================== michael@0: # --- Constants michael@0: # =================================================================== michael@0: michael@0: # conf for retry_before_failing() decorator michael@0: NO_RETRIES = 10 michael@0: # bytes tolerance for OS memory related tests michael@0: TOLERANCE = 500 * 1024 # 500KB michael@0: michael@0: PYTHON = os.path.realpath(sys.executable) michael@0: DEVNULL = open(os.devnull, 'r+') michael@0: TESTFN = os.path.join(os.getcwd(), "$testfile") michael@0: EXAMPLES_DIR = os.path.abspath(os.path.join(os.path.dirname( michael@0: os.path.dirname(__file__)), 'examples')) michael@0: POSIX = os.name == 'posix' michael@0: LINUX = sys.platform.startswith("linux") michael@0: WINDOWS = sys.platform.startswith("win32") michael@0: OSX = sys.platform.startswith("darwin") michael@0: BSD = sys.platform.startswith("freebsd") michael@0: SUNOS = sys.platform.startswith("sunos") michael@0: michael@0: michael@0: # =================================================================== michael@0: # --- Utility functions michael@0: # =================================================================== michael@0: michael@0: _subprocesses_started = set() michael@0: michael@0: def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL, michael@0: wait=False): michael@0: """Return a subprocess.Popen object to use in tests. michael@0: By default stdout and stderr are redirected to /dev/null and the michael@0: python interpreter is used as test process. michael@0: If 'wait' is True attemps to make sure the process is in a michael@0: reasonably initialized state. michael@0: """ michael@0: if cmd is None: michael@0: pyline = "" michael@0: if wait: michael@0: pyline += "open(r'%s', 'w'); " % TESTFN michael@0: pyline += "import time; time.sleep(2);" michael@0: cmd_ = [PYTHON, "-c", pyline] michael@0: else: michael@0: cmd_ = cmd michael@0: sproc = subprocess.Popen(cmd_, stdout=stdout, stderr=stderr, stdin=stdin) michael@0: if wait: michael@0: if cmd is None: michael@0: stop_at = time.time() + 3 michael@0: while stop_at > time.time(): michael@0: if os.path.exists(TESTFN): michael@0: break michael@0: time.sleep(0.001) michael@0: else: michael@0: warn("couldn't make sure test file was actually created") michael@0: else: michael@0: wait_for_pid(sproc.pid) michael@0: _subprocesses_started.add(sproc.pid) michael@0: return sproc michael@0: michael@0: def warn(msg): michael@0: """Raise a warning msg.""" michael@0: warnings.warn(msg, UserWarning) michael@0: michael@0: def register_warning(msg): michael@0: """Register a warning which will be printed on interpreter exit.""" michael@0: atexit.register(lambda: warn(msg)) michael@0: michael@0: def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE): michael@0: """run cmd in a subprocess and return its output. michael@0: raises RuntimeError on error. michael@0: """ michael@0: p = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr) michael@0: stdout, stderr = p.communicate() michael@0: if p.returncode != 0: michael@0: raise RuntimeError(stderr) michael@0: if stderr: michael@0: warn(stderr) michael@0: if PY3: michael@0: stdout = str(stdout, sys.stdout.encoding) michael@0: return stdout.strip() michael@0: michael@0: def which(program): michael@0: """Same as UNIX which command. Return None on command not found.""" michael@0: def is_exe(fpath): michael@0: return os.path.isfile(fpath) and os.access(fpath, os.X_OK) michael@0: michael@0: fpath, fname = os.path.split(program) michael@0: if fpath: michael@0: if is_exe(program): michael@0: return program michael@0: else: michael@0: for path in os.environ["PATH"].split(os.pathsep): michael@0: exe_file = os.path.join(path, program) michael@0: if is_exe(exe_file): michael@0: return exe_file michael@0: return None michael@0: michael@0: def wait_for_pid(pid, timeout=1): michael@0: """Wait for pid to show up in the process list then return. michael@0: Used in the test suite to give time the sub process to initialize. michael@0: """ michael@0: raise_at = time.time() + timeout michael@0: while 1: michael@0: if pid in psutil.get_pid_list(): michael@0: # give it one more iteration to allow full initialization michael@0: time.sleep(0.01) michael@0: return michael@0: time.sleep(0.0001) michael@0: if time.time() >= raise_at: michael@0: raise RuntimeError("Timed out") michael@0: michael@0: def reap_children(search_all=False): michael@0: """Kill any subprocess started by this test suite and ensure that michael@0: no zombies stick around to hog resources and create problems when michael@0: looking for refleaks. michael@0: """ michael@0: pids = _subprocesses_started michael@0: if search_all: michael@0: this_process = psutil.Process(os.getpid()) michael@0: for p in this_process.get_children(recursive=True): michael@0: pids.add(p.pid) michael@0: while pids: michael@0: pid = pids.pop() michael@0: try: michael@0: child = psutil.Process(pid) michael@0: child.kill() michael@0: except psutil.NoSuchProcess: michael@0: pass michael@0: except psutil.AccessDenied: michael@0: warn("couldn't kill child process with pid %s" % pid) michael@0: else: michael@0: child.wait(timeout=3) michael@0: michael@0: def check_ip_address(addr, family): michael@0: """Attempts to check IP address's validity.""" michael@0: if not addr: michael@0: return michael@0: ip, port = addr michael@0: assert isinstance(port, int), port michael@0: if family == socket.AF_INET: michael@0: ip = list(map(int, ip.split('.'))) michael@0: assert len(ip) == 4, ip michael@0: for num in ip: michael@0: assert 0 <= num <= 255, ip michael@0: assert 0 <= port <= 65535, port michael@0: michael@0: def safe_remove(fname): michael@0: """Deletes a file and does not exception if it doesn't exist.""" michael@0: try: michael@0: os.remove(fname) michael@0: except OSError: michael@0: err = sys.exc_info()[1] michael@0: if err.args[0] != errno.ENOENT: michael@0: raise michael@0: michael@0: def call_until(fun, expr, timeout=1): michael@0: """Keep calling function for timeout secs and exit if eval() michael@0: expression is True. michael@0: """ michael@0: stop_at = time.time() + timeout michael@0: while time.time() < stop_at: michael@0: ret = fun() michael@0: if eval(expr): michael@0: return ret michael@0: time.sleep(0.001) michael@0: raise RuntimeError('timed out (ret=%r)' % ret) michael@0: michael@0: def retry_before_failing(ntimes=None): michael@0: """Decorator which runs a test function and retries N times before michael@0: actually failing. michael@0: """ michael@0: def decorator(fun): michael@0: @wraps(fun) michael@0: def wrapper(*args, **kwargs): michael@0: for x in range(ntimes or NO_RETRIES): michael@0: try: michael@0: return fun(*args, **kwargs) michael@0: except AssertionError: michael@0: err = sys.exc_info()[1] michael@0: raise michael@0: return wrapper michael@0: return decorator michael@0: michael@0: def skip_on_access_denied(only_if=None): michael@0: """Decorator to Ignore AccessDenied exceptions.""" michael@0: def decorator(fun): michael@0: @wraps(fun) michael@0: def wrapper(*args, **kwargs): michael@0: try: michael@0: return fun(*args, **kwargs) michael@0: except psutil.AccessDenied: michael@0: if only_if is not None: michael@0: if not only_if: michael@0: raise michael@0: msg = "%r was skipped because it raised AccessDenied" \ michael@0: % fun.__name__ michael@0: self = args[0] michael@0: if hasattr(self, 'skip'): # python >= 2.7 michael@0: self.skip(msg) michael@0: else: michael@0: register_warning(msg) michael@0: return wrapper michael@0: return decorator michael@0: michael@0: def skip_on_not_implemented(only_if=None): michael@0: """Decorator to Ignore NotImplementedError exceptions.""" michael@0: def decorator(fun): michael@0: @wraps(fun) michael@0: def wrapper(*args, **kwargs): michael@0: try: michael@0: return fun(*args, **kwargs) michael@0: except NotImplementedError: michael@0: if only_if is not None: michael@0: if not only_if: michael@0: raise michael@0: msg = "%r was skipped because it raised NotImplementedError" \ michael@0: % fun.__name__ michael@0: self = args[0] michael@0: if hasattr(self, 'skip'): # python >= 2.7 michael@0: self.skip(msg) michael@0: else: michael@0: register_warning(msg) michael@0: return wrapper michael@0: return decorator michael@0: michael@0: def supports_ipv6(): michael@0: """Return True if IPv6 is supported on this platform.""" michael@0: if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"): michael@0: return False michael@0: sock = None michael@0: try: michael@0: try: michael@0: sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) michael@0: sock.bind(("::1", 0)) michael@0: except (socket.error, socket.gaierror): michael@0: return False michael@0: else: michael@0: return True michael@0: finally: michael@0: if sock is not None: michael@0: sock.close() michael@0: michael@0: michael@0: class ThreadTask(threading.Thread): michael@0: """A thread object used for running process thread tests.""" michael@0: michael@0: def __init__(self): michael@0: threading.Thread.__init__(self) michael@0: self._running = False michael@0: self._interval = None michael@0: self._flag = threading.Event() michael@0: michael@0: def __repr__(self): michael@0: name = self.__class__.__name__ michael@0: return '<%s running=%s at %#x>' % (name, self._running, id(self)) michael@0: michael@0: def start(self, interval=0.001): michael@0: """Start thread and keep it running until an explicit michael@0: stop() request. Polls for shutdown every 'timeout' seconds. michael@0: """ michael@0: if self._running: michael@0: raise ValueError("already started") michael@0: self._interval = interval michael@0: threading.Thread.start(self) michael@0: self._flag.wait() michael@0: michael@0: def run(self): michael@0: self._running = True michael@0: self._flag.set() michael@0: while self._running: michael@0: time.sleep(self._interval) michael@0: michael@0: def stop(self): michael@0: """Stop thread execution and and waits until it is stopped.""" michael@0: if not self._running: michael@0: raise ValueError("already stopped") michael@0: self._running = False michael@0: self.join() michael@0: michael@0: michael@0: # =================================================================== michael@0: # --- Support for python < 2.7 in case unittest2 is not installed michael@0: # =================================================================== michael@0: michael@0: if not hasattr(unittest, 'skip'): michael@0: register_warning("unittest2 module is not installed; a serie of pretty " \ michael@0: "darn ugly workarounds will be used") michael@0: michael@0: class SkipTest(Exception): michael@0: pass michael@0: michael@0: class TestCase(unittest.TestCase): michael@0: michael@0: def _safe_repr(self, obj): michael@0: MAX_LENGTH = 80 michael@0: try: michael@0: result = repr(obj) michael@0: except Exception: michael@0: result = object.__repr__(obj) michael@0: if len(result) < MAX_LENGTH: michael@0: return result michael@0: return result[:MAX_LENGTH] + ' [truncated]...' michael@0: michael@0: def _fail_w_msg(self, a, b, middle, msg): michael@0: self.fail(msg or '%s %s %s' % (self._safe_repr(a), middle, michael@0: self._safe_repr(b))) michael@0: michael@0: def skip(self, msg): michael@0: raise SkipTest(msg) michael@0: michael@0: def assertIn(self, a, b, msg=None): michael@0: if a not in b: michael@0: self._fail_w_msg(a, b, 'not found in', msg) michael@0: michael@0: def assertNotIn(self, a, b, msg=None): michael@0: if a in b: michael@0: self._fail_w_msg(a, b, 'found in', msg) michael@0: michael@0: def assertGreater(self, a, b, msg=None): michael@0: if not a > b: michael@0: self._fail_w_msg(a, b, 'not greater than', msg) michael@0: michael@0: def assertGreaterEqual(self, a, b, msg=None): michael@0: if not a >= b: michael@0: self._fail_w_msg(a, b, 'not greater than or equal to', msg) michael@0: michael@0: def assertLess(self, a, b, msg=None): michael@0: if not a < b: michael@0: self._fail_w_msg(a, b, 'not less than', msg) michael@0: michael@0: def assertLessEqual(self, a, b, msg=None): michael@0: if not a <= b: michael@0: self._fail_w_msg(a, b, 'not less or equal to', msg) michael@0: michael@0: def assertIsInstance(self, a, b, msg=None): michael@0: if not isinstance(a, b): michael@0: self.fail(msg or '%s is not an instance of %r' \ michael@0: % (self._safe_repr(a), b)) michael@0: michael@0: def assertAlmostEqual(self, a, b, msg=None, delta=None): michael@0: if delta is not None: michael@0: if abs(a - b) <= delta: michael@0: return michael@0: self.fail(msg or '%s != %s within %s delta' \ michael@0: % (self._safe_repr(a), self._safe_repr(b), michael@0: self._safe_repr(delta))) michael@0: else: michael@0: self.assertEqual(a, b, msg=msg) michael@0: michael@0: michael@0: def skipIf(condition, reason): michael@0: def decorator(fun): michael@0: @wraps(fun) michael@0: def wrapper(*args, **kwargs): michael@0: self = args[0] michael@0: if condition: michael@0: sys.stdout.write("skipped-") michael@0: sys.stdout.flush() michael@0: if warn: michael@0: objname = "%s.%s" % (self.__class__.__name__, michael@0: fun.__name__) michael@0: msg = "%s was skipped" % objname michael@0: if reason: michael@0: msg += "; reason: " + repr(reason) michael@0: register_warning(msg) michael@0: return michael@0: else: michael@0: return fun(*args, **kwargs) michael@0: return wrapper michael@0: return decorator michael@0: michael@0: def skipUnless(condition, reason): michael@0: if not condition: michael@0: return unittest.skipIf(True, reason) michael@0: return unittest.skipIf(False, reason) michael@0: michael@0: unittest.TestCase = TestCase michael@0: unittest.skipIf = skipIf michael@0: unittest.skipUnless = skipUnless michael@0: del TestCase, skipIf, skipUnless michael@0: michael@0: michael@0: # =================================================================== michael@0: # --- System-related API tests michael@0: # =================================================================== michael@0: michael@0: class TestSystemAPIs(unittest.TestCase): michael@0: """Tests for system-related APIs.""" michael@0: michael@0: def setUp(self): michael@0: safe_remove(TESTFN) michael@0: michael@0: def tearDown(self): michael@0: reap_children() michael@0: michael@0: def test_process_iter(self): michael@0: self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()]) michael@0: sproc = get_test_subprocess() michael@0: self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()]) michael@0: p = psutil.Process(sproc.pid) michael@0: p.kill() michael@0: p.wait() michael@0: self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()]) michael@0: michael@0: def test_TOTAL_PHYMEM(self): michael@0: x = psutil.TOTAL_PHYMEM michael@0: self.assertIsInstance(x, (int, long)) michael@0: self.assertGreater(x, 0) michael@0: self.assertEqual(x, psutil.virtual_memory().total) michael@0: michael@0: def test_BOOT_TIME(self, arg=None): michael@0: x = arg or psutil.BOOT_TIME michael@0: self.assertIsInstance(x, float) michael@0: self.assertGreater(x, 0) michael@0: self.assertLess(x, time.time()) michael@0: michael@0: def test_get_boot_time(self): michael@0: self.test_BOOT_TIME(psutil.get_boot_time()) michael@0: if WINDOWS: michael@0: # work around float precision issues; give it 1 secs tolerance michael@0: diff = abs(psutil.get_boot_time() - psutil.BOOT_TIME) michael@0: self.assertLess(diff, 1) michael@0: else: michael@0: self.assertEqual(psutil.get_boot_time(), psutil.BOOT_TIME) michael@0: michael@0: def test_NUM_CPUS(self): michael@0: self.assertEqual(psutil.NUM_CPUS, len(psutil.cpu_times(percpu=True))) michael@0: self.assertGreaterEqual(psutil.NUM_CPUS, 1) michael@0: michael@0: @unittest.skipUnless(POSIX, 'posix only') michael@0: def test_PAGESIZE(self): michael@0: # pagesize is used internally to perform different calculations michael@0: # and it's determined by using SC_PAGE_SIZE; make sure michael@0: # getpagesize() returns the same value. michael@0: import resource michael@0: self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize()) michael@0: michael@0: def test_deprecated_apis(self): michael@0: s = socket.socket() michael@0: s.bind(('localhost', 0)) michael@0: s.listen(1) michael@0: warnings.filterwarnings("error") michael@0: p = psutil.Process(os.getpid()) michael@0: try: michael@0: self.assertRaises(DeprecationWarning, psutil.virtmem_usage) michael@0: self.assertRaises(DeprecationWarning, psutil.used_phymem) michael@0: self.assertRaises(DeprecationWarning, psutil.avail_phymem) michael@0: self.assertRaises(DeprecationWarning, psutil.total_virtmem) michael@0: self.assertRaises(DeprecationWarning, psutil.used_virtmem) michael@0: self.assertRaises(DeprecationWarning, psutil.avail_virtmem) michael@0: self.assertRaises(DeprecationWarning, psutil.phymem_usage) michael@0: self.assertRaises(DeprecationWarning, psutil.get_process_list) michael@0: self.assertRaises(DeprecationWarning, psutil.network_io_counters) michael@0: if LINUX: michael@0: self.assertRaises(DeprecationWarning, psutil.phymem_buffers) michael@0: self.assertRaises(DeprecationWarning, psutil.cached_phymem) michael@0: try: michael@0: p.nice michael@0: except DeprecationWarning: michael@0: pass michael@0: else: michael@0: self.fail("p.nice didn't raise DeprecationWarning") michael@0: ret = call_until(p.get_connections, "len(ret) != 0", timeout=1) michael@0: self.assertRaises(DeprecationWarning, michael@0: getattr, ret[0], 'local_address') michael@0: self.assertRaises(DeprecationWarning, michael@0: getattr, ret[0], 'remote_address') michael@0: finally: michael@0: s.close() michael@0: warnings.resetwarnings() michael@0: michael@0: def test_deprecated_apis_retval(self): michael@0: warnings.filterwarnings("ignore") michael@0: p = psutil.Process(os.getpid()) michael@0: try: michael@0: self.assertEqual(psutil.total_virtmem(), psutil.swap_memory().total) michael@0: self.assertEqual(p.nice, p.get_nice()) michael@0: finally: michael@0: warnings.resetwarnings() michael@0: michael@0: def test_virtual_memory(self): michael@0: mem = psutil.virtual_memory() michael@0: assert mem.total > 0, mem michael@0: assert mem.available > 0, mem michael@0: assert 0 <= mem.percent <= 100, mem michael@0: assert mem.used > 0, mem michael@0: assert mem.free >= 0, mem michael@0: for name in mem._fields: michael@0: if name != 'total': michael@0: value = getattr(mem, name) michael@0: if not value >= 0: michael@0: self.fail("%r < 0 (%s)" % (name, value)) michael@0: if value > mem.total: michael@0: self.fail("%r > total (total=%s, %s=%s)" \ michael@0: % (name, mem.total, name, value)) michael@0: michael@0: def test_swap_memory(self): michael@0: mem = psutil.swap_memory() michael@0: assert mem.total >= 0, mem michael@0: assert mem.used >= 0, mem michael@0: assert mem.free > 0, mem michael@0: assert 0 <= mem.percent <= 100, mem michael@0: assert mem.sin >= 0, mem michael@0: assert mem.sout >= 0, mem michael@0: michael@0: def test_pid_exists(self): michael@0: sproc = get_test_subprocess(wait=True) michael@0: assert psutil.pid_exists(sproc.pid) michael@0: p = psutil.Process(sproc.pid) michael@0: p.kill() michael@0: p.wait() michael@0: self.assertFalse(psutil.pid_exists(sproc.pid)) michael@0: self.assertFalse(psutil.pid_exists(-1)) michael@0: michael@0: def test_pid_exists_2(self): michael@0: reap_children() michael@0: pids = psutil.get_pid_list() michael@0: for pid in pids: michael@0: try: michael@0: assert psutil.pid_exists(pid) michael@0: except AssertionError: michael@0: # in case the process disappeared in meantime fail only michael@0: # if it is no longer in get_pid_list() michael@0: time.sleep(.1) michael@0: if pid in psutil.get_pid_list(): michael@0: self.fail(pid) michael@0: pids = range(max(pids) + 5000, max(pids) + 6000) michael@0: for pid in pids: michael@0: self.assertFalse(psutil.pid_exists(pid)) michael@0: michael@0: def test_get_pid_list(self): michael@0: plist = [x.pid for x in psutil.process_iter()] michael@0: pidlist = psutil.get_pid_list() michael@0: self.assertEqual(plist.sort(), pidlist.sort()) michael@0: # make sure every pid is unique michael@0: self.assertEqual(len(pidlist), len(set(pidlist))) michael@0: michael@0: def test_test(self): michael@0: # test for psutil.test() function michael@0: stdout = sys.stdout michael@0: sys.stdout = DEVNULL michael@0: try: michael@0: psutil.test() michael@0: finally: michael@0: sys.stdout = stdout michael@0: michael@0: def test_sys_cpu_times(self): michael@0: total = 0 michael@0: times = psutil.cpu_times() michael@0: sum(times) michael@0: for cp_time in times: michael@0: self.assertIsInstance(cp_time, float) michael@0: self.assertGreaterEqual(cp_time, 0.0) michael@0: total += cp_time michael@0: self.assertEqual(total, sum(times)) michael@0: str(times) michael@0: michael@0: def test_sys_cpu_times2(self): michael@0: t1 = sum(psutil.cpu_times()) michael@0: time.sleep(0.1) michael@0: t2 = sum(psutil.cpu_times()) michael@0: difference = t2 - t1 michael@0: if not difference >= 0.05: michael@0: self.fail("difference %s" % difference) michael@0: michael@0: def test_sys_per_cpu_times(self): michael@0: for times in psutil.cpu_times(percpu=True): michael@0: total = 0 michael@0: sum(times) michael@0: for cp_time in times: michael@0: self.assertIsInstance(cp_time, float) michael@0: self.assertGreaterEqual(cp_time, 0.0) michael@0: total += cp_time michael@0: self.assertEqual(total, sum(times)) michael@0: str(times) michael@0: self.assertEqual(len(psutil.cpu_times(percpu=True)[0]), michael@0: len(psutil.cpu_times(percpu=False))) michael@0: michael@0: def test_sys_per_cpu_times2(self): michael@0: tot1 = psutil.cpu_times(percpu=True) michael@0: stop_at = time.time() + 0.1 michael@0: while 1: michael@0: if time.time() >= stop_at: michael@0: break michael@0: tot2 = psutil.cpu_times(percpu=True) michael@0: for t1, t2 in zip(tot1, tot2): michael@0: t1, t2 = sum(t1), sum(t2) michael@0: difference = t2 - t1 michael@0: if difference >= 0.05: michael@0: return michael@0: self.fail() michael@0: michael@0: def _test_cpu_percent(self, percent): michael@0: self.assertIsInstance(percent, float) michael@0: self.assertGreaterEqual(percent, 0.0) michael@0: self.assertLessEqual(percent, 100.0) michael@0: michael@0: def test_sys_cpu_percent(self): michael@0: psutil.cpu_percent(interval=0.001) michael@0: for x in range(1000): michael@0: self._test_cpu_percent(psutil.cpu_percent(interval=None)) michael@0: michael@0: def test_sys_per_cpu_percent(self): michael@0: self.assertEqual(len(psutil.cpu_percent(interval=0.001, percpu=True)), michael@0: psutil.NUM_CPUS) michael@0: for x in range(1000): michael@0: percents = psutil.cpu_percent(interval=None, percpu=True) michael@0: for percent in percents: michael@0: self._test_cpu_percent(percent) michael@0: michael@0: def test_sys_cpu_times_percent(self): michael@0: psutil.cpu_times_percent(interval=0.001) michael@0: for x in range(1000): michael@0: cpu = psutil.cpu_times_percent(interval=None) michael@0: for percent in cpu: michael@0: self._test_cpu_percent(percent) michael@0: self._test_cpu_percent(sum(cpu)) michael@0: michael@0: def test_sys_per_cpu_times_percent(self): michael@0: self.assertEqual(len(psutil.cpu_times_percent(interval=0.001, michael@0: percpu=True)), michael@0: psutil.NUM_CPUS) michael@0: for x in range(1000): michael@0: cpus = psutil.cpu_times_percent(interval=None, percpu=True) michael@0: for cpu in cpus: michael@0: for percent in cpu: michael@0: self._test_cpu_percent(percent) michael@0: self._test_cpu_percent(sum(cpu)) michael@0: michael@0: @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), michael@0: "os.statvfs() function not available on this platform") michael@0: def test_disk_usage(self): michael@0: usage = psutil.disk_usage(os.getcwd()) michael@0: assert usage.total > 0, usage michael@0: assert usage.used > 0, usage michael@0: assert usage.free > 0, usage michael@0: assert usage.total > usage.used, usage michael@0: assert usage.total > usage.free, usage michael@0: assert 0 <= usage.percent <= 100, usage.percent michael@0: michael@0: # if path does not exist OSError ENOENT is expected across michael@0: # all platforms michael@0: fname = tempfile.mktemp() michael@0: try: michael@0: psutil.disk_usage(fname) michael@0: except OSError: michael@0: err = sys.exc_info()[1] michael@0: if err.args[0] != errno.ENOENT: michael@0: raise michael@0: else: michael@0: self.fail("OSError not raised") michael@0: michael@0: @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), michael@0: "os.statvfs() function not available on this platform") michael@0: def test_disk_partitions(self): michael@0: # all = False michael@0: for disk in psutil.disk_partitions(all=False): michael@0: if WINDOWS and 'cdrom' in disk.opts: michael@0: continue michael@0: if not POSIX: michael@0: assert os.path.exists(disk.device), disk michael@0: else: michael@0: # we cannot make any assumption about this, see: michael@0: # http://goo.gl/p9c43 michael@0: disk.device michael@0: if SUNOS: michael@0: # on solaris apparently mount points can also be files michael@0: assert os.path.exists(disk.mountpoint), disk michael@0: else: michael@0: assert os.path.isdir(disk.mountpoint), disk michael@0: assert disk.fstype, disk michael@0: self.assertIsInstance(disk.opts, str) michael@0: michael@0: # all = True michael@0: for disk in psutil.disk_partitions(all=True): michael@0: if not WINDOWS: michael@0: try: michael@0: os.stat(disk.mountpoint) michael@0: except OSError: michael@0: # http://mail.python.org/pipermail/python-dev/2012-June/120787.html michael@0: err = sys.exc_info()[1] michael@0: if err.errno not in (errno.EPERM, errno.EACCES): michael@0: raise michael@0: else: michael@0: if SUNOS: michael@0: # on solaris apparently mount points can also be files michael@0: assert os.path.exists(disk.mountpoint), disk michael@0: else: michael@0: assert os.path.isdir(disk.mountpoint), disk michael@0: self.assertIsInstance(disk.fstype, str) michael@0: self.assertIsInstance(disk.opts, str) michael@0: michael@0: def find_mount_point(path): michael@0: path = os.path.abspath(path) michael@0: while not os.path.ismount(path): michael@0: path = os.path.dirname(path) michael@0: return path michael@0: michael@0: mount = find_mount_point(__file__) michael@0: mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)] michael@0: self.assertIn(mount, mounts) michael@0: psutil.disk_usage(mount) michael@0: michael@0: def test_net_io_counters(self): michael@0: def check_ntuple(nt): michael@0: self.assertEqual(nt[0], nt.bytes_sent) michael@0: self.assertEqual(nt[1], nt.bytes_recv) michael@0: self.assertEqual(nt[2], nt.packets_sent) michael@0: self.assertEqual(nt[3], nt.packets_recv) michael@0: self.assertEqual(nt[4], nt.errin) michael@0: self.assertEqual(nt[5], nt.errout) michael@0: self.assertEqual(nt[6], nt.dropin) michael@0: self.assertEqual(nt[7], nt.dropout) michael@0: assert nt.bytes_sent >= 0, nt michael@0: assert nt.bytes_recv >= 0, nt michael@0: assert nt.packets_sent >= 0, nt michael@0: assert nt.packets_recv >= 0, nt michael@0: assert nt.errin >= 0, nt michael@0: assert nt.errout >= 0, nt michael@0: assert nt.dropin >= 0, nt michael@0: assert nt.dropout >= 0, nt michael@0: michael@0: ret = psutil.net_io_counters(pernic=False) michael@0: check_ntuple(ret) michael@0: ret = psutil.net_io_counters(pernic=True) michael@0: assert ret != [] michael@0: for key in ret: michael@0: assert key michael@0: check_ntuple(ret[key]) michael@0: michael@0: def test_disk_io_counters(self): michael@0: def check_ntuple(nt): michael@0: self.assertEqual(nt[0], nt.read_count) michael@0: self.assertEqual(nt[1], nt.write_count) michael@0: self.assertEqual(nt[2], nt.read_bytes) michael@0: self.assertEqual(nt[3], nt.write_bytes) michael@0: self.assertEqual(nt[4], nt.read_time) michael@0: self.assertEqual(nt[5], nt.write_time) michael@0: assert nt.read_count >= 0, nt michael@0: assert nt.write_count >= 0, nt michael@0: assert nt.read_bytes >= 0, nt michael@0: assert nt.write_bytes >= 0, nt michael@0: assert nt.read_time >= 0, nt michael@0: assert nt.write_time >= 0, nt michael@0: michael@0: ret = psutil.disk_io_counters(perdisk=False) michael@0: check_ntuple(ret) michael@0: ret = psutil.disk_io_counters(perdisk=True) michael@0: # make sure there are no duplicates michael@0: self.assertEqual(len(ret), len(set(ret))) michael@0: for key in ret: michael@0: assert key, key michael@0: check_ntuple(ret[key]) michael@0: if LINUX and key[-1].isdigit(): michael@0: # if 'sda1' is listed 'sda' shouldn't, see: michael@0: # http://code.google.com/p/psutil/issues/detail?id=338 michael@0: while key[-1].isdigit(): michael@0: key = key[:-1] michael@0: self.assertNotIn(key, ret.keys()) michael@0: michael@0: def test_get_users(self): michael@0: users = psutil.get_users() michael@0: assert users michael@0: for user in users: michael@0: assert user.name, user michael@0: user.terminal michael@0: user.host michael@0: assert user.started > 0.0, user michael@0: datetime.datetime.fromtimestamp(user.started) michael@0: michael@0: michael@0: # =================================================================== michael@0: # --- psutil.Process class tests michael@0: # =================================================================== michael@0: michael@0: class TestProcess(unittest.TestCase): michael@0: """Tests for psutil.Process class.""" michael@0: michael@0: def setUp(self): michael@0: safe_remove(TESTFN) michael@0: michael@0: def tearDown(self): michael@0: reap_children() michael@0: michael@0: def test_kill(self): michael@0: sproc = get_test_subprocess(wait=True) michael@0: test_pid = sproc.pid michael@0: p = psutil.Process(test_pid) michael@0: name = p.name michael@0: p.kill() michael@0: p.wait() michael@0: self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) michael@0: michael@0: def test_terminate(self): michael@0: sproc = get_test_subprocess(wait=True) michael@0: test_pid = sproc.pid michael@0: p = psutil.Process(test_pid) michael@0: name = p.name michael@0: p.terminate() michael@0: p.wait() michael@0: self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) michael@0: michael@0: def test_send_signal(self): michael@0: if POSIX: michael@0: sig = signal.SIGKILL michael@0: else: michael@0: sig = signal.SIGTERM michael@0: sproc = get_test_subprocess() michael@0: test_pid = sproc.pid michael@0: p = psutil.Process(test_pid) michael@0: name = p.name michael@0: p.send_signal(sig) michael@0: p.wait() michael@0: self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) michael@0: michael@0: def test_wait(self): michael@0: # check exit code signal michael@0: sproc = get_test_subprocess() michael@0: p = psutil.Process(sproc.pid) michael@0: p.kill() michael@0: code = p.wait() michael@0: if os.name == 'posix': michael@0: self.assertEqual(code, signal.SIGKILL) michael@0: else: michael@0: self.assertEqual(code, 0) michael@0: self.assertFalse(p.is_running()) michael@0: michael@0: sproc = get_test_subprocess() michael@0: p = psutil.Process(sproc.pid) michael@0: p.terminate() michael@0: code = p.wait() michael@0: if os.name == 'posix': michael@0: self.assertEqual(code, signal.SIGTERM) michael@0: else: michael@0: self.assertEqual(code, 0) michael@0: self.assertFalse(p.is_running()) michael@0: michael@0: # check sys.exit() code michael@0: code = "import time, sys; time.sleep(0.01); sys.exit(5);" michael@0: sproc = get_test_subprocess([PYTHON, "-c", code]) michael@0: p = psutil.Process(sproc.pid) michael@0: self.assertEqual(p.wait(), 5) michael@0: self.assertFalse(p.is_running()) michael@0: michael@0: # Test wait() issued twice. michael@0: # It is not supposed to raise NSP when the process is gone. michael@0: # On UNIX this should return None, on Windows it should keep michael@0: # returning the exit code. michael@0: sproc = get_test_subprocess([PYTHON, "-c", code]) michael@0: p = psutil.Process(sproc.pid) michael@0: self.assertEqual(p.wait(), 5) michael@0: self.assertIn(p.wait(), (5, None)) michael@0: michael@0: # test timeout michael@0: sproc = get_test_subprocess() michael@0: p = psutil.Process(sproc.pid) michael@0: p.name michael@0: self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01) michael@0: michael@0: # timeout < 0 not allowed michael@0: self.assertRaises(ValueError, p.wait, -1) michael@0: michael@0: @unittest.skipUnless(POSIX, '') # XXX why is this skipped on Windows? michael@0: def test_wait_non_children(self): michael@0: # test wait() against processes which are not our children michael@0: code = "import sys;" michael@0: code += "from subprocess import Popen, PIPE;" michael@0: code += "cmd = ['%s', '-c', 'import time; time.sleep(2)'];" %PYTHON michael@0: code += "sp = Popen(cmd, stdout=PIPE);" michael@0: code += "sys.stdout.write(str(sp.pid));" michael@0: sproc = get_test_subprocess([PYTHON, "-c", code], stdout=subprocess.PIPE) michael@0: michael@0: grandson_pid = int(sproc.stdout.read()) michael@0: grandson_proc = psutil.Process(grandson_pid) michael@0: try: michael@0: self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01) michael@0: grandson_proc.kill() michael@0: ret = grandson_proc.wait() michael@0: self.assertEqual(ret, None) michael@0: finally: michael@0: if grandson_proc.is_running(): michael@0: grandson_proc.kill() michael@0: grandson_proc.wait() michael@0: michael@0: def test_wait_timeout_0(self): michael@0: sproc = get_test_subprocess() michael@0: p = psutil.Process(sproc.pid) michael@0: self.assertRaises(psutil.TimeoutExpired, p.wait, 0) michael@0: p.kill() michael@0: stop_at = time.time() + 2 michael@0: while 1: michael@0: try: michael@0: code = p.wait(0) michael@0: except psutil.TimeoutExpired: michael@0: if time.time() >= stop_at: michael@0: raise michael@0: else: michael@0: break michael@0: if os.name == 'posix': michael@0: self.assertEqual(code, signal.SIGKILL) michael@0: else: michael@0: self.assertEqual(code, 0) michael@0: self.assertFalse(p.is_running()) michael@0: michael@0: def test_cpu_percent(self): michael@0: p = psutil.Process(os.getpid()) michael@0: p.get_cpu_percent(interval=0.001) michael@0: p.get_cpu_percent(interval=0.001) michael@0: for x in range(100): michael@0: percent = p.get_cpu_percent(interval=None) michael@0: self.assertIsInstance(percent, float) michael@0: self.assertGreaterEqual(percent, 0.0) michael@0: if os.name != 'posix': michael@0: self.assertLessEqual(percent, 100.0) michael@0: else: michael@0: self.assertGreaterEqual(percent, 0.0) michael@0: michael@0: def test_cpu_times(self): michael@0: times = psutil.Process(os.getpid()).get_cpu_times() michael@0: assert (times.user > 0.0) or (times.system > 0.0), times michael@0: # make sure returned values can be pretty printed with strftime michael@0: time.strftime("%H:%M:%S", time.localtime(times.user)) michael@0: time.strftime("%H:%M:%S", time.localtime(times.system)) michael@0: michael@0: # Test Process.cpu_times() against os.times() michael@0: # os.times() is broken on Python 2.6 michael@0: # http://bugs.python.org/issue1040026 michael@0: # XXX fails on OSX: not sure if it's for os.times(). We should michael@0: # try this with Python 2.7 and re-enable the test. michael@0: michael@0: @unittest.skipUnless(sys.version_info > (2, 6, 1) and not OSX, michael@0: 'os.times() is not reliable on this Python version') michael@0: def test_cpu_times2(self): michael@0: user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times() michael@0: utime, ktime = os.times()[:2] michael@0: michael@0: # Use os.times()[:2] as base values to compare our results michael@0: # using a tolerance of +/- 0.1 seconds. michael@0: # It will fail if the difference between the values is > 0.1s. michael@0: if (max([user_time, utime]) - min([user_time, utime])) > 0.1: michael@0: self.fail("expected: %s, found: %s" %(utime, user_time)) michael@0: michael@0: if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: michael@0: self.fail("expected: %s, found: %s" %(ktime, kernel_time)) michael@0: michael@0: def test_create_time(self): michael@0: sproc = get_test_subprocess(wait=True) michael@0: now = time.time() michael@0: p = psutil.Process(sproc.pid) michael@0: create_time = p.create_time michael@0: michael@0: # Use time.time() as base value to compare our result using a michael@0: # tolerance of +/- 1 second. michael@0: # It will fail if the difference between the values is > 2s. michael@0: difference = abs(create_time - now) michael@0: if difference > 2: michael@0: self.fail("expected: %s, found: %s, difference: %s" michael@0: % (now, create_time, difference)) michael@0: michael@0: # make sure returned value can be pretty printed with strftime michael@0: time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time)) michael@0: michael@0: @unittest.skipIf(WINDOWS, 'windows only') michael@0: def test_terminal(self): michael@0: terminal = psutil.Process(os.getpid()).terminal michael@0: if sys.stdin.isatty(): michael@0: self.assertEqual(terminal, sh('tty')) michael@0: else: michael@0: assert terminal, repr(terminal) michael@0: michael@0: @unittest.skipIf(not hasattr(psutil.Process, 'get_io_counters'), michael@0: 'not available on this platform') michael@0: @skip_on_not_implemented(only_if=LINUX) michael@0: def test_get_io_counters(self): michael@0: p = psutil.Process(os.getpid()) michael@0: # test reads michael@0: io1 = p.get_io_counters() michael@0: f = open(PYTHON, 'rb') michael@0: f.read() michael@0: f.close() michael@0: io2 = p.get_io_counters() michael@0: if not BSD: michael@0: assert io2.read_count > io1.read_count, (io1, io2) michael@0: self.assertEqual(io2.write_count, io1.write_count) michael@0: assert io2.read_bytes >= io1.read_bytes, (io1, io2) michael@0: assert io2.write_bytes >= io1.write_bytes, (io1, io2) michael@0: # test writes michael@0: io1 = p.get_io_counters() michael@0: f = tempfile.TemporaryFile() michael@0: if PY3: michael@0: f.write(bytes("x" * 1000000, 'ascii')) michael@0: else: michael@0: f.write("x" * 1000000) michael@0: f.close() michael@0: io2 = p.get_io_counters() michael@0: assert io2.write_count >= io1.write_count, (io1, io2) michael@0: assert io2.write_bytes >= io1.write_bytes, (io1, io2) michael@0: assert io2.read_count >= io1.read_count, (io1, io2) michael@0: assert io2.read_bytes >= io1.read_bytes, (io1, io2) michael@0: michael@0: # Linux and Windows Vista+ michael@0: @unittest.skipUnless(hasattr(psutil.Process, 'get_ionice'), michael@0: 'Linux and Windows Vista only') michael@0: def test_get_set_ionice(self): michael@0: if LINUX: michael@0: from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, michael@0: IOPRIO_CLASS_BE, IOPRIO_CLASS_IDLE) michael@0: self.assertEqual(IOPRIO_CLASS_NONE, 0) michael@0: self.assertEqual(IOPRIO_CLASS_RT, 1) michael@0: self.assertEqual(IOPRIO_CLASS_BE, 2) michael@0: self.assertEqual(IOPRIO_CLASS_IDLE, 3) michael@0: p = psutil.Process(os.getpid()) michael@0: try: michael@0: p.set_ionice(2) michael@0: ioclass, value = p.get_ionice() michael@0: self.assertEqual(ioclass, 2) michael@0: self.assertEqual(value, 4) michael@0: # michael@0: p.set_ionice(3) michael@0: ioclass, value = p.get_ionice() michael@0: self.assertEqual(ioclass, 3) michael@0: self.assertEqual(value, 0) michael@0: # michael@0: p.set_ionice(2, 0) michael@0: ioclass, value = p.get_ionice() michael@0: self.assertEqual(ioclass, 2) michael@0: self.assertEqual(value, 0) michael@0: p.set_ionice(2, 7) michael@0: ioclass, value = p.get_ionice() michael@0: self.assertEqual(ioclass, 2) michael@0: self.assertEqual(value, 7) michael@0: self.assertRaises(ValueError, p.set_ionice, 2, 10) michael@0: finally: michael@0: p.set_ionice(IOPRIO_CLASS_NONE) michael@0: else: michael@0: p = psutil.Process(os.getpid()) michael@0: original = p.get_ionice() michael@0: try: michael@0: value = 0 # very low michael@0: if original == value: michael@0: value = 1 # low michael@0: p.set_ionice(value) michael@0: self.assertEqual(p.get_ionice(), value) michael@0: finally: michael@0: p.set_ionice(original) michael@0: # michael@0: self.assertRaises(ValueError, p.set_ionice, 3) michael@0: self.assertRaises(TypeError, p.set_ionice, 2, 1) michael@0: michael@0: def test_get_num_threads(self): michael@0: # on certain platforms such as Linux we might test for exact michael@0: # thread number, since we always have with 1 thread per process, michael@0: # but this does not apply across all platforms (OSX, Windows) michael@0: p = psutil.Process(os.getpid()) michael@0: step1 = p.get_num_threads() michael@0: michael@0: thread = ThreadTask() michael@0: thread.start() michael@0: try: michael@0: step2 = p.get_num_threads() michael@0: self.assertEqual(step2, step1 + 1) michael@0: thread.stop() michael@0: finally: michael@0: if thread._running: michael@0: thread.stop() michael@0: michael@0: @unittest.skipUnless(WINDOWS, 'Windows only') michael@0: def test_get_num_handles(self): michael@0: # a better test is done later into test/_windows.py michael@0: p = psutil.Process(os.getpid()) michael@0: self.assertGreater(p.get_num_handles(), 0) michael@0: michael@0: def test_get_threads(self): michael@0: p = psutil.Process(os.getpid()) michael@0: step1 = p.get_threads() michael@0: michael@0: thread = ThreadTask() michael@0: thread.start() michael@0: michael@0: try: michael@0: step2 = p.get_threads() michael@0: self.assertEqual(len(step2), len(step1) + 1) michael@0: # on Linux, first thread id is supposed to be this process michael@0: if LINUX: michael@0: self.assertEqual(step2[0].id, os.getpid()) michael@0: athread = step2[0] michael@0: # test named tuple michael@0: self.assertEqual(athread.id, athread[0]) michael@0: self.assertEqual(athread.user_time, athread[1]) michael@0: self.assertEqual(athread.system_time, athread[2]) michael@0: # test num threads michael@0: thread.stop() michael@0: finally: michael@0: if thread._running: michael@0: thread.stop() michael@0: michael@0: def test_get_memory_info(self): michael@0: p = psutil.Process(os.getpid()) michael@0: michael@0: # step 1 - get a base value to compare our results michael@0: rss1, vms1 = p.get_memory_info() michael@0: percent1 = p.get_memory_percent() michael@0: self.assertGreater(rss1, 0) michael@0: self.assertGreater(vms1, 0) michael@0: michael@0: # step 2 - allocate some memory michael@0: memarr = [None] * 1500000 michael@0: michael@0: rss2, vms2 = p.get_memory_info() michael@0: percent2 = p.get_memory_percent() michael@0: # make sure that the memory usage bumped up michael@0: self.assertGreater(rss2, rss1) michael@0: self.assertGreaterEqual(vms2, vms1) # vms might be equal michael@0: self.assertGreater(percent2, percent1) michael@0: del memarr michael@0: michael@0: # def test_get_ext_memory_info(self): michael@0: # # tested later in fetch all test suite michael@0: michael@0: def test_get_memory_maps(self): michael@0: p = psutil.Process(os.getpid()) michael@0: maps = p.get_memory_maps() michael@0: paths = [x for x in maps] michael@0: self.assertEqual(len(paths), len(set(paths))) michael@0: ext_maps = p.get_memory_maps(grouped=False) michael@0: michael@0: for nt in maps: michael@0: if not nt.path.startswith('['): michael@0: assert os.path.isabs(nt.path), nt.path michael@0: if POSIX: michael@0: assert os.path.exists(nt.path), nt.path michael@0: else: michael@0: # XXX - On Windows we have this strange behavior with michael@0: # 64 bit dlls: they are visible via explorer but cannot michael@0: # be accessed via os.stat() (wtf?). michael@0: if '64' not in os.path.basename(nt.path): michael@0: assert os.path.exists(nt.path), nt.path michael@0: for nt in ext_maps: michael@0: for fname in nt._fields: michael@0: value = getattr(nt, fname) michael@0: if fname == 'path': michael@0: continue michael@0: elif fname in ('addr', 'perms'): michael@0: assert value, value michael@0: else: michael@0: self.assertIsInstance(value, (int, long)) michael@0: assert value >= 0, value michael@0: michael@0: def test_get_memory_percent(self): michael@0: p = psutil.Process(os.getpid()) michael@0: self.assertGreater(p.get_memory_percent(), 0.0) michael@0: michael@0: def test_pid(self): michael@0: sproc = get_test_subprocess() michael@0: self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) michael@0: michael@0: def test_is_running(self): michael@0: sproc = get_test_subprocess(wait=True) michael@0: p = psutil.Process(sproc.pid) michael@0: assert p.is_running() michael@0: assert p.is_running() michael@0: p.kill() michael@0: p.wait() michael@0: assert not p.is_running() michael@0: assert not p.is_running() michael@0: michael@0: def test_exe(self): michael@0: sproc = get_test_subprocess(wait=True) michael@0: exe = psutil.Process(sproc.pid).exe michael@0: try: michael@0: self.assertEqual(exe, PYTHON) michael@0: except AssertionError: michael@0: if WINDOWS and len(exe) == len(PYTHON): michael@0: # on Windows we don't care about case sensitivity michael@0: self.assertEqual(exe.lower(), PYTHON.lower()) michael@0: else: michael@0: # certain platforms such as BSD are more accurate returning: michael@0: # "/usr/local/bin/python2.7" michael@0: # ...instead of: michael@0: # "/usr/local/bin/python" michael@0: # We do not want to consider this difference in accuracy michael@0: # an error. michael@0: ver = "%s.%s" % (sys.version_info[0], sys.version_info[1]) michael@0: self.assertEqual(exe.replace(ver, ''), PYTHON.replace(ver, '')) michael@0: michael@0: def test_cmdline(self): michael@0: cmdline = [PYTHON, "-c", "import time; time.sleep(2)"] michael@0: sproc = get_test_subprocess(cmdline, wait=True) michael@0: self.assertEqual(' '.join(psutil.Process(sproc.pid).cmdline), michael@0: ' '.join(cmdline)) michael@0: michael@0: def test_name(self): michael@0: sproc = get_test_subprocess(PYTHON, wait=True) michael@0: name = psutil.Process(sproc.pid).name.lower() michael@0: pyexe = os.path.basename(os.path.realpath(sys.executable)).lower() michael@0: assert pyexe.startswith(name), (pyexe, name) michael@0: michael@0: @unittest.skipUnless(POSIX, 'posix only') michael@0: def test_uids(self): michael@0: p = psutil.Process(os.getpid()) michael@0: real, effective, saved = p.uids michael@0: # os.getuid() refers to "real" uid michael@0: self.assertEqual(real, os.getuid()) michael@0: # os.geteuid() refers to "effective" uid michael@0: self.assertEqual(effective, os.geteuid()) michael@0: # no such thing as os.getsuid() ("saved" uid), but starting michael@0: # from python 2.7 we have os.getresuid()[2] michael@0: if hasattr(os, "getresuid"): michael@0: self.assertEqual(saved, os.getresuid()[2]) michael@0: michael@0: @unittest.skipUnless(POSIX, 'posix only') michael@0: def test_gids(self): michael@0: p = psutil.Process(os.getpid()) michael@0: real, effective, saved = p.gids michael@0: # os.getuid() refers to "real" uid michael@0: self.assertEqual(real, os.getgid()) michael@0: # os.geteuid() refers to "effective" uid michael@0: self.assertEqual(effective, os.getegid()) michael@0: # no such thing as os.getsuid() ("saved" uid), but starting michael@0: # from python 2.7 we have os.getresgid()[2] michael@0: if hasattr(os, "getresuid"): michael@0: self.assertEqual(saved, os.getresgid()[2]) michael@0: michael@0: def test_nice(self): michael@0: p = psutil.Process(os.getpid()) michael@0: self.assertRaises(TypeError, p.set_nice, "str") michael@0: if os.name == 'nt': michael@0: try: michael@0: self.assertEqual(p.get_nice(), psutil.NORMAL_PRIORITY_CLASS) michael@0: p.set_nice(psutil.HIGH_PRIORITY_CLASS) michael@0: self.assertEqual(p.get_nice(), psutil.HIGH_PRIORITY_CLASS) michael@0: p.set_nice(psutil.NORMAL_PRIORITY_CLASS) michael@0: self.assertEqual(p.get_nice(), psutil.NORMAL_PRIORITY_CLASS) michael@0: finally: michael@0: p.set_nice(psutil.NORMAL_PRIORITY_CLASS) michael@0: else: michael@0: try: michael@0: try: michael@0: first_nice = p.get_nice() michael@0: p.set_nice(1) michael@0: self.assertEqual(p.get_nice(), 1) michael@0: # going back to previous nice value raises AccessDenied on OSX michael@0: if not OSX: michael@0: p.set_nice(0) michael@0: self.assertEqual(p.get_nice(), 0) michael@0: except psutil.AccessDenied: michael@0: pass michael@0: finally: michael@0: try: michael@0: p.set_nice(first_nice) michael@0: except psutil.AccessDenied: michael@0: pass michael@0: michael@0: def test_status(self): michael@0: p = psutil.Process(os.getpid()) michael@0: self.assertEqual(p.status, psutil.STATUS_RUNNING) michael@0: self.assertEqual(str(p.status), "running") michael@0: michael@0: def test_status_constants(self): michael@0: # STATUS_* constants are supposed to be comparable also by michael@0: # using their str representation michael@0: self.assertTrue(psutil.STATUS_RUNNING == 0) michael@0: self.assertTrue(psutil.STATUS_RUNNING == long(0)) michael@0: self.assertTrue(psutil.STATUS_RUNNING == 'running') michael@0: self.assertFalse(psutil.STATUS_RUNNING == 1) michael@0: self.assertFalse(psutil.STATUS_RUNNING == 'sleeping') michael@0: self.assertFalse(psutil.STATUS_RUNNING != 0) michael@0: self.assertFalse(psutil.STATUS_RUNNING != 'running') michael@0: self.assertTrue(psutil.STATUS_RUNNING != 1) michael@0: self.assertTrue(psutil.STATUS_RUNNING != 'sleeping') michael@0: michael@0: def test_username(self): michael@0: sproc = get_test_subprocess() michael@0: p = psutil.Process(sproc.pid) michael@0: if POSIX: michael@0: import pwd michael@0: self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name) michael@0: elif WINDOWS and 'USERNAME' in os.environ: michael@0: expected_username = os.environ['USERNAME'] michael@0: expected_domain = os.environ['USERDOMAIN'] michael@0: domain, username = p.username.split('\\') michael@0: self.assertEqual(domain, expected_domain) michael@0: self.assertEqual(username, expected_username) michael@0: else: michael@0: p.username michael@0: michael@0: @unittest.skipUnless(hasattr(psutil.Process, "getcwd"), michael@0: 'not available on this platform') michael@0: def test_getcwd(self): michael@0: sproc = get_test_subprocess(wait=True) michael@0: p = psutil.Process(sproc.pid) michael@0: self.assertEqual(p.getcwd(), os.getcwd()) michael@0: michael@0: @unittest.skipIf(not hasattr(psutil.Process, "getcwd"), michael@0: 'not available on this platform') michael@0: def test_getcwd_2(self): michael@0: cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(2)"] michael@0: sproc = get_test_subprocess(cmd, wait=True) michael@0: p = psutil.Process(sproc.pid) michael@0: call_until(p.getcwd, "ret == os.path.dirname(os.getcwd())", timeout=1) michael@0: michael@0: @unittest.skipIf(not hasattr(psutil.Process, "get_cpu_affinity"), michael@0: 'not available on this platform') michael@0: def test_cpu_affinity(self): michael@0: p = psutil.Process(os.getpid()) michael@0: initial = p.get_cpu_affinity() michael@0: all_cpus = list(range(len(psutil.cpu_percent(percpu=True)))) michael@0: # michael@0: for n in all_cpus: michael@0: p.set_cpu_affinity([n]) michael@0: self.assertEqual(p.get_cpu_affinity(), [n]) michael@0: # michael@0: p.set_cpu_affinity(all_cpus) michael@0: self.assertEqual(p.get_cpu_affinity(), all_cpus) michael@0: # michael@0: p.set_cpu_affinity(initial) michael@0: invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10] michael@0: self.assertRaises(ValueError, p.set_cpu_affinity, invalid_cpu) michael@0: michael@0: def test_get_open_files(self): michael@0: # current process michael@0: p = psutil.Process(os.getpid()) michael@0: files = p.get_open_files() michael@0: self.assertFalse(TESTFN in files) michael@0: f = open(TESTFN, 'w') michael@0: call_until(p.get_open_files, "len(ret) != %i" % len(files)) michael@0: filenames = [x.path for x in p.get_open_files()] michael@0: self.assertIn(TESTFN, filenames) michael@0: f.close() michael@0: for file in filenames: michael@0: assert os.path.isfile(file), file michael@0: michael@0: # another process michael@0: cmdline = "import time; f = open(r'%s', 'r'); time.sleep(2);" % TESTFN michael@0: sproc = get_test_subprocess([PYTHON, "-c", cmdline], wait=True) michael@0: p = psutil.Process(sproc.pid) michael@0: for x in range(100): michael@0: filenames = [x.path for x in p.get_open_files()] michael@0: if TESTFN in filenames: michael@0: break michael@0: time.sleep(.01) michael@0: else: michael@0: self.assertIn(TESTFN, filenames) michael@0: for file in filenames: michael@0: assert os.path.isfile(file), file michael@0: michael@0: def test_get_open_files2(self): michael@0: # test fd and path fields michael@0: fileobj = open(TESTFN, 'w') michael@0: p = psutil.Process(os.getpid()) michael@0: for path, fd in p.get_open_files(): michael@0: if path == fileobj.name or fd == fileobj.fileno(): michael@0: break michael@0: else: michael@0: self.fail("no file found; files=%s" % repr(p.get_open_files())) michael@0: self.assertEqual(path, fileobj.name) michael@0: if WINDOWS: michael@0: self.assertEqual(fd, -1) michael@0: else: michael@0: self.assertEqual(fd, fileobj.fileno()) michael@0: # test positions michael@0: ntuple = p.get_open_files()[0] michael@0: self.assertEqual(ntuple[0], ntuple.path) michael@0: self.assertEqual(ntuple[1], ntuple.fd) michael@0: # test file is gone michael@0: fileobj.close() michael@0: self.assertTrue(fileobj.name not in p.get_open_files()) michael@0: michael@0: def test_connection_constants(self): michael@0: ints = [] michael@0: strs = [] michael@0: for name in dir(psutil): michael@0: if name.startswith('CONN_'): michael@0: num = getattr(psutil, name) michael@0: str_ = str(num) michael@0: assert str_.isupper(), str_ michael@0: assert str_ not in strs, str_ michael@0: assert num not in ints, num michael@0: ints.append(num) michael@0: strs.append(str_) michael@0: if SUNOS: michael@0: psutil.CONN_IDLE michael@0: psutil.CONN_BOUND michael@0: if WINDOWS: michael@0: psutil.CONN_DELETE_TCB michael@0: michael@0: def test_get_connections(self): michael@0: arg = "import socket, time;" \ michael@0: "s = socket.socket();" \ michael@0: "s.bind(('127.0.0.1', 0));" \ michael@0: "s.listen(1);" \ michael@0: "conn, addr = s.accept();" \ michael@0: "time.sleep(2);" michael@0: sproc = get_test_subprocess([PYTHON, "-c", arg]) michael@0: p = psutil.Process(sproc.pid) michael@0: for x in range(100): michael@0: if p.get_connections(): michael@0: # give the subprocess some more time to bind() michael@0: time.sleep(.01) michael@0: cons = p.get_connections() michael@0: break michael@0: time.sleep(.01) michael@0: self.assertEqual(len(cons), 1) michael@0: con = cons[0] michael@0: self.assertEqual(con.family, socket.AF_INET) michael@0: self.assertEqual(con.type, socket.SOCK_STREAM) michael@0: self.assertEqual(con.status, psutil.CONN_LISTEN, str(con.status)) michael@0: ip, port = con.laddr michael@0: self.assertEqual(ip, '127.0.0.1') michael@0: self.assertEqual(con.raddr, ()) michael@0: if WINDOWS or SUNOS: michael@0: self.assertEqual(con.fd, -1) michael@0: else: michael@0: assert con.fd > 0, con michael@0: # test positions michael@0: self.assertEqual(con[0], con.fd) michael@0: self.assertEqual(con[1], con.family) michael@0: self.assertEqual(con[2], con.type) michael@0: self.assertEqual(con[3], con.laddr) michael@0: self.assertEqual(con[4], con.raddr) michael@0: self.assertEqual(con[5], con.status) michael@0: # test kind arg michael@0: self.assertRaises(ValueError, p.get_connections, 'foo') michael@0: michael@0: @unittest.skipUnless(supports_ipv6(), 'IPv6 is not supported') michael@0: def test_get_connections_ipv6(self): michael@0: s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) michael@0: s.bind(('::1', 0)) michael@0: s.listen(1) michael@0: cons = psutil.Process(os.getpid()).get_connections() michael@0: s.close() michael@0: self.assertEqual(len(cons), 1) michael@0: self.assertEqual(cons[0].laddr[0], '::1') michael@0: michael@0: @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'AF_UNIX is not supported') michael@0: def test_get_connections_unix(self): michael@0: # tcp michael@0: safe_remove(TESTFN) michael@0: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) michael@0: sock.bind(TESTFN) michael@0: conn = psutil.Process(os.getpid()).get_connections(kind='unix')[0] michael@0: if conn.fd != -1: # != sunos and windows michael@0: self.assertEqual(conn.fd, sock.fileno()) michael@0: self.assertEqual(conn.family, socket.AF_UNIX) michael@0: self.assertEqual(conn.type, socket.SOCK_STREAM) michael@0: self.assertEqual(conn.laddr, TESTFN) michael@0: self.assertTrue(not conn.raddr) michael@0: self.assertEqual(conn.status, psutil.CONN_NONE, str(conn.status)) michael@0: sock.close() michael@0: # udp michael@0: safe_remove(TESTFN) michael@0: sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) michael@0: sock.bind(TESTFN) michael@0: conn = psutil.Process(os.getpid()).get_connections(kind='unix')[0] michael@0: self.assertEqual(conn.type, socket.SOCK_DGRAM) michael@0: sock.close() michael@0: michael@0: @unittest.skipUnless(hasattr(socket, "fromfd"), michael@0: 'socket.fromfd() is not availble') michael@0: @unittest.skipIf(WINDOWS or SUNOS, michael@0: 'connection fd available on this platform') michael@0: def test_connection_fromfd(self): michael@0: sock = socket.socket() michael@0: sock.bind(('localhost', 0)) michael@0: sock.listen(1) michael@0: p = psutil.Process(os.getpid()) michael@0: for conn in p.get_connections(): michael@0: if conn.fd == sock.fileno(): michael@0: break michael@0: else: michael@0: sock.close() michael@0: self.fail("couldn't find socket fd") michael@0: dupsock = socket.fromfd(conn.fd, conn.family, conn.type) michael@0: try: michael@0: self.assertEqual(dupsock.getsockname(), conn.laddr) michael@0: self.assertNotEqual(sock.fileno(), dupsock.fileno()) michael@0: finally: michael@0: sock.close() michael@0: dupsock.close() michael@0: michael@0: def test_get_connections_all(self): michael@0: tcp_template = "import socket;" \ michael@0: "s = socket.socket($family, socket.SOCK_STREAM);" \ michael@0: "s.bind(('$addr', 0));" \ michael@0: "s.listen(1);" \ michael@0: "conn, addr = s.accept();" michael@0: michael@0: udp_template = "import socket, time;" \ michael@0: "s = socket.socket($family, socket.SOCK_DGRAM);" \ michael@0: "s.bind(('$addr', 0));" \ michael@0: "time.sleep(2);" michael@0: michael@0: from string import Template michael@0: tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET, michael@0: addr="127.0.0.1") michael@0: udp4_template = Template(udp_template).substitute(family=socket.AF_INET, michael@0: addr="127.0.0.1") michael@0: tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6, michael@0: addr="::1") michael@0: udp6_template = Template(udp_template).substitute(family=socket.AF_INET6, michael@0: addr="::1") michael@0: michael@0: # launch various subprocess instantiating a socket of various michael@0: # families and types to enrich psutil results michael@0: tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) michael@0: udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) michael@0: if supports_ipv6(): michael@0: tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) michael@0: udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) michael@0: else: michael@0: tcp6_proc = None michael@0: udp6_proc = None michael@0: michael@0: # check matches against subprocesses just created michael@0: all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4", "tcp6", michael@0: "udp", "udp4", "udp6") michael@0: for p in psutil.Process(os.getpid()).get_children(): michael@0: for conn in p.get_connections(): michael@0: # TCP v4 michael@0: if p.pid == tcp4_proc.pid: michael@0: self.assertEqual(conn.family, socket.AF_INET) michael@0: self.assertEqual(conn.type, socket.SOCK_STREAM) michael@0: self.assertEqual(conn.laddr[0], "127.0.0.1") michael@0: self.assertEqual(conn.raddr, ()) michael@0: self.assertEqual(conn.status, psutil.CONN_LISTEN, michael@0: str(conn.status)) michael@0: for kind in all_kinds: michael@0: cons = p.get_connections(kind=kind) michael@0: if kind in ("all", "inet", "inet4", "tcp", "tcp4"): michael@0: assert cons != [], cons michael@0: else: michael@0: self.assertEqual(cons, [], cons) michael@0: # UDP v4 michael@0: elif p.pid == udp4_proc.pid: michael@0: self.assertEqual(conn.family, socket.AF_INET) michael@0: self.assertEqual(conn.type, socket.SOCK_DGRAM) michael@0: self.assertEqual(conn.laddr[0], "127.0.0.1") michael@0: self.assertEqual(conn.raddr, ()) michael@0: self.assertEqual(conn.status, psutil.CONN_NONE, michael@0: str(conn.status)) michael@0: for kind in all_kinds: michael@0: cons = p.get_connections(kind=kind) michael@0: if kind in ("all", "inet", "inet4", "udp", "udp4"): michael@0: assert cons != [], cons michael@0: else: michael@0: self.assertEqual(cons, [], cons) michael@0: # TCP v6 michael@0: elif p.pid == getattr(tcp6_proc, "pid", None): michael@0: self.assertEqual(conn.family, socket.AF_INET6) michael@0: self.assertEqual(conn.type, socket.SOCK_STREAM) michael@0: self.assertIn(conn.laddr[0], ("::", "::1")) michael@0: self.assertEqual(conn.raddr, ()) michael@0: self.assertEqual(conn.status, psutil.CONN_LISTEN, michael@0: str(conn.status)) michael@0: for kind in all_kinds: michael@0: cons = p.get_connections(kind=kind) michael@0: if kind in ("all", "inet", "inet6", "tcp", "tcp6"): michael@0: assert cons != [], cons michael@0: else: michael@0: self.assertEqual(cons, [], cons) michael@0: # UDP v6 michael@0: elif p.pid == getattr(udp6_proc, "pid", None): michael@0: self.assertEqual(conn.family, socket.AF_INET6) michael@0: self.assertEqual(conn.type, socket.SOCK_DGRAM) michael@0: self.assertIn(conn.laddr[0], ("::", "::1")) michael@0: self.assertEqual(conn.raddr, ()) michael@0: self.assertEqual(conn.status, psutil.CONN_NONE, michael@0: str(conn.status)) michael@0: for kind in all_kinds: michael@0: cons = p.get_connections(kind=kind) michael@0: if kind in ("all", "inet", "inet6", "udp", "udp6"): michael@0: assert cons != [], cons michael@0: else: michael@0: self.assertEqual(cons, [], cons) michael@0: michael@0: @unittest.skipUnless(POSIX, 'posix only') michael@0: def test_get_num_fds(self): michael@0: p = psutil.Process(os.getpid()) michael@0: start = p.get_num_fds() michael@0: file = open(TESTFN, 'w') michael@0: self.assertEqual(p.get_num_fds(), start + 1) michael@0: sock = socket.socket() michael@0: self.assertEqual(p.get_num_fds(), start + 2) michael@0: file.close() michael@0: sock.close() michael@0: self.assertEqual(p.get_num_fds(), start) michael@0: michael@0: @skip_on_not_implemented(only_if=LINUX) michael@0: def test_get_num_ctx_switches(self): michael@0: p = psutil.Process(os.getpid()) michael@0: before = sum(p.get_num_ctx_switches()) michael@0: for x in range(500000): michael@0: after = sum(p.get_num_ctx_switches()) michael@0: if after > before: michael@0: return michael@0: self.fail("num ctx switches still the same after 50.000 iterations") michael@0: michael@0: def test_parent_ppid(self): michael@0: this_parent = os.getpid() michael@0: sproc = get_test_subprocess() michael@0: p = psutil.Process(sproc.pid) michael@0: self.assertEqual(p.ppid, this_parent) michael@0: self.assertEqual(p.parent.pid, this_parent) michael@0: # no other process is supposed to have us as parent michael@0: for p in psutil.process_iter(): michael@0: if p.pid == sproc.pid: michael@0: continue michael@0: self.assertTrue(p.ppid != this_parent) michael@0: michael@0: def test_get_children(self): michael@0: p = psutil.Process(os.getpid()) michael@0: self.assertEqual(p.get_children(), []) michael@0: self.assertEqual(p.get_children(recursive=True), []) michael@0: sproc = get_test_subprocess() michael@0: children1 = p.get_children() michael@0: children2 = p.get_children(recursive=True) michael@0: for children in (children1, children2): michael@0: self.assertEqual(len(children), 1) michael@0: self.assertEqual(children[0].pid, sproc.pid) michael@0: self.assertEqual(children[0].ppid, os.getpid()) michael@0: michael@0: def test_get_children_recursive(self): michael@0: # here we create a subprocess which creates another one as in: michael@0: # A (parent) -> B (child) -> C (grandchild) michael@0: s = "import subprocess, os, sys, time;" michael@0: s += "PYTHON = os.path.realpath(sys.executable);" michael@0: s += "cmd = [PYTHON, '-c', 'import time; time.sleep(2);'];" michael@0: s += "subprocess.Popen(cmd);" michael@0: s += "time.sleep(2);" michael@0: get_test_subprocess(cmd=[PYTHON, "-c", s]) michael@0: p = psutil.Process(os.getpid()) michael@0: self.assertEqual(len(p.get_children(recursive=False)), 1) michael@0: # give the grandchild some time to start michael@0: stop_at = time.time() + 1.5 michael@0: while time.time() < stop_at: michael@0: children = p.get_children(recursive=True) michael@0: if len(children) > 1: michael@0: break michael@0: self.assertEqual(len(children), 2) michael@0: self.assertEqual(children[0].ppid, os.getpid()) michael@0: self.assertEqual(children[1].ppid, children[0].pid) michael@0: michael@0: def test_get_children_duplicates(self): michael@0: # find the process which has the highest number of children michael@0: from psutil._compat import defaultdict michael@0: table = defaultdict(int) michael@0: for p in psutil.process_iter(): michael@0: try: michael@0: table[p.ppid] += 1 michael@0: except psutil.Error: michael@0: pass michael@0: # this is the one, now let's make sure there are no duplicates michael@0: pid = sorted(table.items(), key=lambda x: x[1])[-1][0] michael@0: p = psutil.Process(pid) michael@0: try: michael@0: c = p.get_children(recursive=True) michael@0: except psutil.AccessDenied: # windows michael@0: pass michael@0: else: michael@0: self.assertEqual(len(c), len(set(c))) michael@0: michael@0: def test_suspend_resume(self): michael@0: sproc = get_test_subprocess(wait=True) michael@0: p = psutil.Process(sproc.pid) michael@0: p.suspend() michael@0: for x in range(100): michael@0: if p.status == psutil.STATUS_STOPPED: michael@0: break michael@0: time.sleep(0.01) michael@0: self.assertEqual(str(p.status), "stopped") michael@0: p.resume() michael@0: assert p.status != psutil.STATUS_STOPPED, p.status michael@0: michael@0: def test_invalid_pid(self): michael@0: self.assertRaises(TypeError, psutil.Process, "1") michael@0: self.assertRaises(TypeError, psutil.Process, None) michael@0: self.assertRaises(ValueError, psutil.Process, -1) michael@0: michael@0: def test_as_dict(self): michael@0: sproc = get_test_subprocess() michael@0: p = psutil.Process(sproc.pid) michael@0: d = p.as_dict() michael@0: try: michael@0: import json michael@0: except ImportError: michael@0: pass michael@0: else: michael@0: # dict is supposed to be hashable michael@0: json.dumps(d) michael@0: # michael@0: d = p.as_dict(attrs=['exe', 'name']) michael@0: self.assertEqual(sorted(d.keys()), ['exe', 'name']) michael@0: # michael@0: p = psutil.Process(min(psutil.get_pid_list())) michael@0: d = p.as_dict(attrs=['get_connections'], ad_value='foo') michael@0: if not isinstance(d['connections'], list): michael@0: self.assertEqual(d['connections'], 'foo') michael@0: michael@0: def test_zombie_process(self): michael@0: # Test that NoSuchProcess exception gets raised in case the michael@0: # process dies after we create the Process object. michael@0: # Example: michael@0: # >>> proc = Process(1234) michael@0: # >>> time.sleep(2) # time-consuming task, process dies in meantime michael@0: # >>> proc.name michael@0: # Refers to Issue #15 michael@0: sproc = get_test_subprocess() michael@0: p = psutil.Process(sproc.pid) michael@0: p.kill() michael@0: p.wait() michael@0: michael@0: for name in dir(p): michael@0: if name.startswith('_')\ michael@0: or name in ('pid', 'send_signal', 'is_running', 'set_ionice', michael@0: 'wait', 'set_cpu_affinity', 'create_time', 'set_nice', michael@0: 'nice'): michael@0: continue michael@0: try: michael@0: meth = getattr(p, name) michael@0: if callable(meth): michael@0: meth() michael@0: except psutil.NoSuchProcess: michael@0: pass michael@0: except NotImplementedError: michael@0: pass michael@0: else: michael@0: self.fail("NoSuchProcess exception not raised for %r" % name) michael@0: michael@0: # other methods michael@0: try: michael@0: if os.name == 'posix': michael@0: p.set_nice(1) michael@0: else: michael@0: p.set_nice(psutil.NORMAL_PRIORITY_CLASS) michael@0: except psutil.NoSuchProcess: michael@0: pass michael@0: else: michael@0: self.fail("exception not raised") michael@0: if hasattr(p, 'set_ionice'): michael@0: self.assertRaises(psutil.NoSuchProcess, p.set_ionice, 2) michael@0: self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM) michael@0: self.assertRaises(psutil.NoSuchProcess, p.set_nice, 0) michael@0: self.assertFalse(p.is_running()) michael@0: if hasattr(p, "set_cpu_affinity"): michael@0: self.assertRaises(psutil.NoSuchProcess, p.set_cpu_affinity, [0]) michael@0: michael@0: def test__str__(self): michael@0: sproc = get_test_subprocess() michael@0: p = psutil.Process(sproc.pid) michael@0: self.assertIn(str(sproc.pid), str(p)) michael@0: # python shows up as 'Python' in cmdline on OS X so test fails on OS X michael@0: if not OSX: michael@0: self.assertIn(os.path.basename(PYTHON), str(p)) michael@0: sproc = get_test_subprocess() michael@0: p = psutil.Process(sproc.pid) michael@0: p.kill() michael@0: p.wait() michael@0: self.assertIn(str(sproc.pid), str(p)) michael@0: self.assertIn("terminated", str(p)) michael@0: michael@0: @unittest.skipIf(LINUX, 'PID 0 not available on Linux') michael@0: def test_pid_0(self): michael@0: # Process(0) is supposed to work on all platforms except Linux michael@0: p = psutil.Process(0) michael@0: self.assertTrue(p.name) michael@0: michael@0: if os.name == 'posix': michael@0: try: michael@0: self.assertEqual(p.uids.real, 0) michael@0: self.assertEqual(p.gids.real, 0) michael@0: except psutil.AccessDenied: michael@0: pass michael@0: michael@0: self.assertIn(p.ppid, (0, 1)) michael@0: #self.assertEqual(p.exe, "") michael@0: p.cmdline michael@0: try: michael@0: p.get_num_threads() michael@0: except psutil.AccessDenied: michael@0: pass michael@0: michael@0: try: michael@0: p.get_memory_info() michael@0: except psutil.AccessDenied: michael@0: pass michael@0: michael@0: # username property michael@0: try: michael@0: if POSIX: michael@0: self.assertEqual(p.username, 'root') michael@0: elif WINDOWS: michael@0: self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') michael@0: else: michael@0: p.username michael@0: except psutil.AccessDenied: michael@0: pass michael@0: michael@0: self.assertIn(0, psutil.get_pid_list()) michael@0: self.assertTrue(psutil.pid_exists(0)) michael@0: michael@0: def test__all__(self): michael@0: for name in dir(psutil): michael@0: if name in ('callable', 'defaultdict', 'error', 'namedtuple', michael@0: 'test'): michael@0: continue michael@0: if not name.startswith('_'): michael@0: try: michael@0: __import__(name) michael@0: except ImportError: michael@0: if name not in psutil.__all__: michael@0: fun = getattr(psutil, name) michael@0: if fun is None: michael@0: continue michael@0: if 'deprecated' not in fun.__doc__.lower(): michael@0: self.fail('%r not in psutil.__all__' % name) michael@0: michael@0: def test_Popen(self): michael@0: # Popen class test michael@0: # XXX this test causes a ResourceWarning on Python 3 because michael@0: # psutil.__subproc instance doesn't get propertly freed. michael@0: # Not sure what to do though. michael@0: cmd = [PYTHON, "-c", "import time; time.sleep(2);"] michael@0: proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) michael@0: try: michael@0: proc.name michael@0: proc.stdin michael@0: self.assertTrue(hasattr(proc, 'name')) michael@0: self.assertTrue(hasattr(proc, 'stdin')) michael@0: self.assertRaises(AttributeError, getattr, proc, 'foo') michael@0: finally: michael@0: proc.kill() michael@0: proc.wait() michael@0: michael@0: michael@0: # =================================================================== michael@0: # --- Featch all processes test michael@0: # =================================================================== michael@0: michael@0: class TestFetchAllProcesses(unittest.TestCase): michael@0: # Iterates over all running processes and performs some sanity michael@0: # checks against Process API's returned values. michael@0: michael@0: def setUp(self): michael@0: if POSIX: michael@0: import pwd michael@0: pall = pwd.getpwall() michael@0: self._uids = set([x.pw_uid for x in pall]) michael@0: self._usernames = set([x.pw_name for x in pall]) michael@0: michael@0: def test_fetch_all(self): michael@0: valid_procs = 0 michael@0: excluded_names = ['send_signal', 'suspend', 'resume', 'terminate', michael@0: 'kill', 'wait', 'as_dict', 'get_cpu_percent', 'nice', michael@0: 'parent', 'get_children', 'pid'] michael@0: attrs = [] michael@0: for name in dir(psutil.Process): michael@0: if name.startswith("_"): michael@0: continue michael@0: if name.startswith("set_"): michael@0: continue michael@0: if name in excluded_names: michael@0: continue michael@0: attrs.append(name) michael@0: michael@0: default = object() michael@0: failures = [] michael@0: for name in attrs: michael@0: for p in psutil.process_iter(): michael@0: ret = default michael@0: try: michael@0: try: michael@0: attr = getattr(p, name, None) michael@0: if attr is not None and callable(attr): michael@0: ret = attr() michael@0: else: michael@0: ret = attr michael@0: valid_procs += 1 michael@0: except NotImplementedError: michael@0: register_warning("%r was skipped because not " michael@0: "implemented" % (self.__class__.__name__ + \ michael@0: '.test_' + name)) michael@0: except (psutil.NoSuchProcess, psutil.AccessDenied): michael@0: err = sys.exc_info()[1] michael@0: if isinstance(err, psutil.NoSuchProcess): michael@0: if psutil.pid_exists(p.pid): michael@0: # XXX race condition; we probably need michael@0: # to try figuring out the process michael@0: # identity before failing michael@0: self.fail("PID still exists but fun raised " \ michael@0: "NoSuchProcess") michael@0: self.assertEqual(err.pid, p.pid) michael@0: if err.name: michael@0: # make sure exception's name attr is set michael@0: # with the actual process name michael@0: self.assertEqual(err.name, p.name) michael@0: self.assertTrue(str(err)) michael@0: self.assertTrue(err.msg) michael@0: else: michael@0: if ret not in (0, 0.0, [], None, ''): michael@0: assert ret, ret michael@0: meth = getattr(self, name) michael@0: meth(ret) michael@0: except Exception: michael@0: err = sys.exc_info()[1] michael@0: s = '\n' + '=' * 70 + '\n' michael@0: s += "FAIL: test_%s (proc=%s" % (name, p) michael@0: if ret != default: michael@0: s += ", ret=%s)" % repr(ret) michael@0: s += ')\n' michael@0: s += '-' * 70 michael@0: s += "\n%s" % traceback.format_exc() michael@0: s = "\n".join((" " * 4) + i for i in s.splitlines()) michael@0: failures.append(s) michael@0: break michael@0: michael@0: if failures: michael@0: self.fail(''.join(failures)) michael@0: michael@0: # we should always have a non-empty list, not including PID 0 etc. michael@0: # special cases. michael@0: self.assertTrue(valid_procs > 0) michael@0: michael@0: def cmdline(self, ret): michael@0: pass michael@0: michael@0: def exe(self, ret): michael@0: if not ret: michael@0: self.assertEqual(ret, '') michael@0: else: michael@0: assert os.path.isabs(ret), ret michael@0: # Note: os.stat() may return False even if the file is there michael@0: # hence we skip the test, see: michael@0: # http://stackoverflow.com/questions/3112546/os-path-exists-lies michael@0: if POSIX: michael@0: assert os.path.isfile(ret), ret michael@0: if hasattr(os, 'access') and hasattr(os, "X_OK"): michael@0: # XXX may fail on OSX michael@0: self.assertTrue(os.access(ret, os.X_OK)) michael@0: michael@0: def ppid(self, ret): michael@0: self.assertTrue(ret >= 0) michael@0: michael@0: def name(self, ret): michael@0: self.assertTrue(isinstance(ret, str)) michael@0: self.assertTrue(ret) michael@0: michael@0: def create_time(self, ret): michael@0: self.assertTrue(ret > 0) michael@0: # this can't be taken for granted on all platforms michael@0: #self.assertGreaterEqual(ret, psutil.BOOT_TIME) michael@0: # make sure returned value can be pretty printed michael@0: # with strftime michael@0: time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret)) michael@0: michael@0: def uids(self, ret): michael@0: for uid in ret: michael@0: self.assertTrue(uid >= 0) michael@0: self.assertIn(uid, self._uids) michael@0: michael@0: def gids(self, ret): michael@0: # note: testing all gids as above seems not to be reliable for michael@0: # gid == 30 (nodoby); not sure why. michael@0: for gid in ret: michael@0: self.assertTrue(gid >= 0) michael@0: #self.assertIn(uid, self.gids) michael@0: michael@0: def username(self, ret): michael@0: self.assertTrue(ret) michael@0: if os.name == 'posix': michael@0: self.assertIn(ret, self._usernames) michael@0: michael@0: def status(self, ret): michael@0: self.assertTrue(ret >= 0) michael@0: self.assertTrue(str(ret) != '?') michael@0: michael@0: def get_io_counters(self, ret): michael@0: for field in ret: michael@0: if field != -1: michael@0: self.assertTrue(field >= 0) michael@0: michael@0: def get_ionice(self, ret): michael@0: if LINUX: michael@0: self.assertTrue(ret.ioclass >= 0) michael@0: self.assertTrue(ret.value >= 0) michael@0: else: michael@0: self.assertTrue(ret >= 0) michael@0: self.assertIn(ret, (0, 1, 2)) michael@0: michael@0: def get_num_threads(self, ret): michael@0: self.assertTrue(ret >= 1) michael@0: michael@0: def get_threads(self, ret): michael@0: for t in ret: michael@0: self.assertTrue(t.id >= 0) michael@0: self.assertTrue(t.user_time >= 0) michael@0: self.assertTrue(t.system_time >= 0) michael@0: michael@0: def get_cpu_times(self, ret): michael@0: self.assertTrue(ret.user >= 0) michael@0: self.assertTrue(ret.system >= 0) michael@0: michael@0: def get_memory_info(self, ret): michael@0: self.assertTrue(ret.rss >= 0) michael@0: self.assertTrue(ret.vms >= 0) michael@0: michael@0: def get_ext_memory_info(self, ret): michael@0: for name in ret._fields: michael@0: self.assertTrue(getattr(ret, name) >= 0) michael@0: if POSIX and ret.vms != 0: michael@0: # VMS is always supposed to be the highest michael@0: for name in ret._fields: michael@0: if name != 'vms': michael@0: value = getattr(ret, name) michael@0: assert ret.vms > value, ret michael@0: elif WINDOWS: michael@0: assert ret.peak_wset >= ret.wset, ret michael@0: assert ret.peak_paged_pool >= ret.paged_pool, ret michael@0: assert ret.peak_nonpaged_pool >= ret.nonpaged_pool, ret michael@0: assert ret.peak_pagefile >= ret.pagefile, ret michael@0: michael@0: def get_open_files(self, ret): michael@0: for f in ret: michael@0: if WINDOWS: michael@0: assert f.fd == -1, f michael@0: else: michael@0: self.assertIsInstance(f.fd, int) michael@0: assert os.path.isabs(f.path), f michael@0: assert os.path.isfile(f.path), f michael@0: michael@0: def get_num_fds(self, ret): michael@0: self.assertTrue(ret >= 0) michael@0: michael@0: def get_connections(self, ret): michael@0: # all values are supposed to match Linux's tcp_states.h states michael@0: # table across all platforms. michael@0: valid_conn_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1", michael@0: "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT", michael@0: "LAST_ACK", "LISTEN", "CLOSING", "NONE"] michael@0: if SUNOS: michael@0: valid_conn_states += ["IDLE", "BOUND"] michael@0: if WINDOWS: michael@0: valid_conn_states += ["DELETE_TCB"] michael@0: for conn in ret: michael@0: self.assertIn(conn.type, (socket.SOCK_STREAM, socket.SOCK_DGRAM)) michael@0: self.assertIn(conn.family, (socket.AF_INET, socket.AF_INET6)) michael@0: check_ip_address(conn.laddr, conn.family) michael@0: check_ip_address(conn.raddr, conn.family) michael@0: if conn.status not in valid_conn_states: michael@0: self.fail("%s is not a valid status" % conn.status) michael@0: # actually try to bind the local socket; ignore IPv6 michael@0: # sockets as their address might be represented as michael@0: # an IPv4-mapped-address (e.g. "::127.0.0.1") michael@0: # and that's rejected by bind() michael@0: if conn.family == socket.AF_INET: michael@0: s = socket.socket(conn.family, conn.type) michael@0: s.bind((conn.laddr[0], 0)) michael@0: s.close() michael@0: michael@0: if not WINDOWS and hasattr(socket, 'fromfd'): michael@0: dupsock = None michael@0: try: michael@0: try: michael@0: dupsock = socket.fromfd(conn.fd, conn.family, conn.type) michael@0: except (socket.error, OSError): michael@0: err = sys.exc_info()[1] michael@0: if err.args[0] == errno.EBADF: michael@0: continue michael@0: raise michael@0: # python >= 2.5 michael@0: if hasattr(dupsock, "family"): michael@0: self.assertEqual(dupsock.family, conn.family) michael@0: self.assertEqual(dupsock.type, conn.type) michael@0: finally: michael@0: if dupsock is not None: michael@0: dupsock.close() michael@0: michael@0: def getcwd(self, ret): michael@0: if ret is not None: # BSD may return None michael@0: assert os.path.isabs(ret), ret michael@0: try: michael@0: st = os.stat(ret) michael@0: except OSError: michael@0: err = sys.exc_info()[1] michael@0: # directory has been removed in mean time michael@0: if err.errno != errno.ENOENT: michael@0: raise michael@0: else: michael@0: self.assertTrue(stat.S_ISDIR(st.st_mode)) michael@0: michael@0: def get_memory_percent(self, ret): michael@0: assert 0 <= ret <= 100, ret michael@0: michael@0: def is_running(self, ret): michael@0: self.assertTrue(ret) michael@0: michael@0: def get_cpu_affinity(self, ret): michael@0: assert ret != [], ret michael@0: michael@0: def terminal(self, ret): michael@0: if ret is not None: michael@0: assert os.path.isabs(ret), ret michael@0: assert os.path.exists(ret), ret michael@0: michael@0: def get_memory_maps(self, ret): michael@0: for nt in ret: michael@0: for fname in nt._fields: michael@0: value = getattr(nt, fname) michael@0: if fname == 'path': michael@0: if not value.startswith('['): michael@0: assert os.path.isabs(nt.path), nt.path michael@0: # commented as on Linux we might get '/foo/bar (deleted)' michael@0: #assert os.path.exists(nt.path), nt.path michael@0: elif fname in ('addr', 'perms'): michael@0: self.assertTrue(value) michael@0: else: michael@0: self.assertIsInstance(value, (int, long)) michael@0: assert value >= 0, value michael@0: michael@0: def get_num_handles(self, ret): michael@0: if WINDOWS: michael@0: self.assertGreaterEqual(ret, 0) michael@0: else: michael@0: self.assertGreaterEqual(ret, 0) michael@0: michael@0: def get_nice(self, ret): michael@0: if POSIX: michael@0: assert -20 <= ret <= 20, ret michael@0: else: michael@0: priorities = [getattr(psutil, x) for x in dir(psutil) michael@0: if x.endswith('_PRIORITY_CLASS')] michael@0: self.assertIn(ret, priorities) michael@0: michael@0: def get_num_ctx_switches(self, ret): michael@0: self.assertTrue(ret.voluntary >= 0) michael@0: self.assertTrue(ret.involuntary >= 0) michael@0: michael@0: michael@0: # =================================================================== michael@0: # --- Limited user tests michael@0: # =================================================================== michael@0: michael@0: if hasattr(os, 'getuid') and os.getuid() == 0: michael@0: class LimitedUserTestCase(TestProcess): michael@0: """Repeat the previous tests by using a limited user. michael@0: Executed only on UNIX and only if the user who run the test script michael@0: is root. michael@0: """ michael@0: # the uid/gid the test suite runs under michael@0: PROCESS_UID = os.getuid() michael@0: PROCESS_GID = os.getgid() michael@0: michael@0: def __init__(self, *args, **kwargs): michael@0: TestProcess.__init__(self, *args, **kwargs) michael@0: # re-define all existent test methods in order to michael@0: # ignore AccessDenied exceptions michael@0: for attr in [x for x in dir(self) if x.startswith('test')]: michael@0: meth = getattr(self, attr) michael@0: def test_(self): michael@0: try: michael@0: meth() michael@0: except psutil.AccessDenied: michael@0: pass michael@0: setattr(self, attr, types.MethodType(test_, self)) michael@0: michael@0: def setUp(self): michael@0: os.setegid(1000) michael@0: os.seteuid(1000) michael@0: TestProcess.setUp(self) michael@0: michael@0: def tearDown(self): michael@0: os.setegid(self.PROCESS_UID) michael@0: os.seteuid(self.PROCESS_GID) michael@0: TestProcess.tearDown(self) michael@0: michael@0: def test_nice(self): michael@0: try: michael@0: psutil.Process(os.getpid()).set_nice(-1) michael@0: except psutil.AccessDenied: michael@0: pass michael@0: else: michael@0: self.fail("exception not raised") michael@0: michael@0: michael@0: # =================================================================== michael@0: # --- Example script tests michael@0: # =================================================================== michael@0: michael@0: class TestExampleScripts(unittest.TestCase): michael@0: """Tests for scripts in the examples directory.""" michael@0: michael@0: def assert_stdout(self, exe, args=None): michael@0: exe = os.path.join(EXAMPLES_DIR, exe) michael@0: if args: michael@0: exe = exe + ' ' + args michael@0: try: michael@0: out = sh(sys.executable + ' ' + exe).strip() michael@0: except RuntimeError: michael@0: err = sys.exc_info()[1] michael@0: if 'AccessDenied' in str(err): michael@0: return str(err) michael@0: else: michael@0: raise michael@0: assert out, out michael@0: return out michael@0: michael@0: def assert_syntax(self, exe, args=None): michael@0: exe = os.path.join(EXAMPLES_DIR, exe) michael@0: f = open(exe, 'r') michael@0: try: michael@0: src = f.read() michael@0: finally: michael@0: f.close() michael@0: ast.parse(src) michael@0: michael@0: def test_check_presence(self): michael@0: # make sure all example scripts have a test method defined michael@0: meths = dir(self) michael@0: for name in os.listdir(EXAMPLES_DIR): michael@0: if name.endswith('.py'): michael@0: if 'test_' + os.path.splitext(name)[0] not in meths: michael@0: #self.assert_stdout(name) michael@0: self.fail('no test defined for %r script' \ michael@0: % os.path.join(EXAMPLES_DIR, name)) michael@0: michael@0: def test_disk_usage(self): michael@0: self.assert_stdout('disk_usage.py') michael@0: michael@0: def test_free(self): michael@0: self.assert_stdout('free.py') michael@0: michael@0: def test_meminfo(self): michael@0: self.assert_stdout('meminfo.py') michael@0: michael@0: def test_process_detail(self): michael@0: self.assert_stdout('process_detail.py') michael@0: michael@0: def test_who(self): michael@0: self.assert_stdout('who.py') michael@0: michael@0: def test_netstat(self): michael@0: self.assert_stdout('netstat.py') michael@0: michael@0: def test_pmap(self): michael@0: self.assert_stdout('pmap.py', args=str(os.getpid())) michael@0: michael@0: @unittest.skipIf(ast is None, michael@0: 'ast module not available on this python version') michael@0: def test_killall(self): michael@0: self.assert_syntax('killall.py') michael@0: michael@0: @unittest.skipIf(ast is None, michael@0: 'ast module not available on this python version') michael@0: def test_nettop(self): michael@0: self.assert_syntax('nettop.py') michael@0: michael@0: @unittest.skipIf(ast is None, michael@0: 'ast module not available on this python version') michael@0: def test_top(self): michael@0: self.assert_syntax('top.py') michael@0: michael@0: @unittest.skipIf(ast is None, michael@0: 'ast module not available on this python version') michael@0: def test_iotop(self): michael@0: self.assert_syntax('iotop.py') michael@0: michael@0: michael@0: def cleanup(): michael@0: reap_children(search_all=True) michael@0: DEVNULL.close() michael@0: safe_remove(TESTFN) michael@0: michael@0: atexit.register(cleanup) michael@0: safe_remove(TESTFN) michael@0: michael@0: def test_main(): michael@0: tests = [] michael@0: test_suite = unittest.TestSuite() michael@0: tests.append(TestSystemAPIs) michael@0: tests.append(TestProcess) michael@0: tests.append(TestFetchAllProcesses) michael@0: michael@0: if POSIX: michael@0: from _posix import PosixSpecificTestCase michael@0: tests.append(PosixSpecificTestCase) michael@0: michael@0: # import the specific platform test suite michael@0: if LINUX: michael@0: from _linux import LinuxSpecificTestCase as stc michael@0: elif WINDOWS: michael@0: from _windows import WindowsSpecificTestCase as stc michael@0: from _windows import TestDualProcessImplementation michael@0: tests.append(TestDualProcessImplementation) michael@0: elif OSX: michael@0: from _osx import OSXSpecificTestCase as stc michael@0: elif BSD: michael@0: from _bsd import BSDSpecificTestCase as stc michael@0: elif SUNOS: michael@0: from _sunos import SunOSSpecificTestCase as stc michael@0: tests.append(stc) michael@0: michael@0: if hasattr(os, 'getuid'): michael@0: if 'LimitedUserTestCase' in globals(): michael@0: tests.append(LimitedUserTestCase) michael@0: else: michael@0: register_warning("LimitedUserTestCase was skipped (super-user " michael@0: "privileges are required)") michael@0: michael@0: tests.append(TestExampleScripts) michael@0: michael@0: for test_class in tests: michael@0: test_suite.addTest(unittest.makeSuite(test_class)) michael@0: result = unittest.TextTestRunner(verbosity=2).run(test_suite) michael@0: return result.wasSuccessful() michael@0: michael@0: if __name__ == '__main__': michael@0: if not test_main(): michael@0: sys.exit(1)