python/psutil/test/test_psutil.py

Thu, 22 Jan 2015 13:21:57 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Thu, 22 Jan 2015 13:21:57 +0100
branch
TOR_BUG_9701
changeset 15
b8a032363ba2
permissions
-rw-r--r--

Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6

     1 #!/usr/bin/env python
     3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
     4 # Use of this source code is governed by a BSD-style license that can be
     5 # found in the LICENSE file.
     7 """
     8 psutil test suite (you can quickly run it with "python setup.py test").
    10 Note: this is targeted for both python 2.x and 3.x so there's no need
    11 to use 2to3 tool first.
    13 If you're on Python < 2.7 it is recommended to install unittest2 module
    14 from: https://pypi.python.org/pypi/unittest2
    15 """
    17 from __future__ import division
    18 import os
    19 import sys
    20 import subprocess
    21 import time
    22 import signal
    23 import types
    24 import traceback
    25 import socket
    26 import warnings
    27 import atexit
    28 import errno
    29 import threading
    30 import tempfile
    31 import stat
    32 import collections
    33 import datetime
    34 try:
    35     import unittest2 as unittest  # pyhon < 2.7 + unittest2 installed
    36 except ImportError:
    37     import unittest
    38 try:
    39     import ast  # python >= 2.6
    40 except ImportError:
    41     ast = None
    43 import psutil
    44 from psutil._compat import PY3, callable, long, wraps
    47 # ===================================================================
    48 # --- Constants
    49 # ===================================================================
    51 # conf for retry_before_failing() decorator
    52 NO_RETRIES = 10
    53 # bytes tolerance for OS memory related tests
    54 TOLERANCE = 500 * 1024  # 500KB
    56 PYTHON = os.path.realpath(sys.executable)
    57 DEVNULL = open(os.devnull, 'r+')
    58 TESTFN = os.path.join(os.getcwd(), "$testfile")
    59 EXAMPLES_DIR = os.path.abspath(os.path.join(os.path.dirname(
    60                                os.path.dirname(__file__)), 'examples'))
    61 POSIX = os.name == 'posix'
    62 LINUX = sys.platform.startswith("linux")
    63 WINDOWS = sys.platform.startswith("win32")
    64 OSX = sys.platform.startswith("darwin")
    65 BSD = sys.platform.startswith("freebsd")
    66 SUNOS = sys.platform.startswith("sunos")
    69 # ===================================================================
    70 # --- Utility functions
    71 # ===================================================================
    73 _subprocesses_started = set()
    75 def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL,
    76                         wait=False):
    77     """Return a subprocess.Popen object to use in tests.
    78     By default stdout and stderr are redirected to /dev/null and the
    79     python interpreter is used as test process.
    80     If 'wait' is True attemps to make sure the process is in a
    81     reasonably initialized state.
    82     """
    83     if cmd is None:
    84         pyline = ""
    85         if wait:
    86             pyline += "open(r'%s', 'w'); " % TESTFN
    87         pyline += "import time; time.sleep(2);"
    88         cmd_ = [PYTHON, "-c", pyline]
    89     else:
    90         cmd_ = cmd
    91     sproc = subprocess.Popen(cmd_, stdout=stdout, stderr=stderr, stdin=stdin)
    92     if wait:
    93         if cmd is None:
    94             stop_at = time.time() + 3
    95             while stop_at > time.time():
    96                 if os.path.exists(TESTFN):
    97                     break
    98                 time.sleep(0.001)
    99             else:
   100                 warn("couldn't make sure test file was actually created")
   101         else:
   102             wait_for_pid(sproc.pid)
   103     _subprocesses_started.add(sproc.pid)
   104     return sproc
   106 def warn(msg):
   107     """Raise a warning msg."""
   108     warnings.warn(msg, UserWarning)
   110 def register_warning(msg):
   111     """Register a warning which will be printed on interpreter exit."""
   112     atexit.register(lambda: warn(msg))
   114 def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
   115     """run cmd in a subprocess and return its output.
   116     raises RuntimeError on error.
   117     """
   118     p = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr)
   119     stdout, stderr = p.communicate()
   120     if p.returncode != 0:
   121         raise RuntimeError(stderr)
   122     if stderr:
   123         warn(stderr)
   124     if PY3:
   125         stdout = str(stdout, sys.stdout.encoding)
   126     return stdout.strip()
   128 def which(program):
   129     """Same as UNIX which command. Return None on command not found."""
   130     def is_exe(fpath):
   131         return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
   133     fpath, fname = os.path.split(program)
   134     if fpath:
   135         if is_exe(program):
   136             return program
   137     else:
   138         for path in os.environ["PATH"].split(os.pathsep):
   139             exe_file = os.path.join(path, program)
   140             if is_exe(exe_file):
   141                 return exe_file
   142     return None
   144 def wait_for_pid(pid, timeout=1):
   145     """Wait for pid to show up in the process list then return.
   146     Used in the test suite to give time the sub process to initialize.
   147     """
   148     raise_at = time.time() + timeout
   149     while 1:
   150         if pid in psutil.get_pid_list():
   151             # give it one more iteration to allow full initialization
   152             time.sleep(0.01)
   153             return
   154         time.sleep(0.0001)
   155         if time.time() >= raise_at:
   156             raise RuntimeError("Timed out")
   158 def reap_children(search_all=False):
   159     """Kill any subprocess started by this test suite and ensure that
   160     no zombies stick around to hog resources and create problems when
   161     looking for refleaks.
   162     """
   163     pids = _subprocesses_started
   164     if search_all:
   165         this_process = psutil.Process(os.getpid())
   166         for p in this_process.get_children(recursive=True):
   167             pids.add(p.pid)
   168     while pids:
   169         pid = pids.pop()
   170         try:
   171             child = psutil.Process(pid)
   172             child.kill()
   173         except psutil.NoSuchProcess:
   174             pass
   175         except psutil.AccessDenied:
   176             warn("couldn't kill child process with pid %s" % pid)
   177         else:
   178             child.wait(timeout=3)
   180 def check_ip_address(addr, family):
   181     """Attempts to check IP address's validity."""
   182     if not addr:
   183         return
   184     ip, port = addr
   185     assert isinstance(port, int), port
   186     if family == socket.AF_INET:
   187         ip = list(map(int, ip.split('.')))
   188         assert len(ip) == 4, ip
   189         for num in ip:
   190             assert 0 <= num <= 255, ip
   191     assert 0 <= port <= 65535, port
   193 def safe_remove(fname):
   194     """Deletes a file and does not exception if it doesn't exist."""
   195     try:
   196         os.remove(fname)
   197     except OSError:
   198         err = sys.exc_info()[1]
   199         if err.args[0] != errno.ENOENT:
   200             raise
   202 def call_until(fun, expr, timeout=1):
   203     """Keep calling function for timeout secs and exit if eval()
   204     expression is True.
   205     """
   206     stop_at = time.time() + timeout
   207     while time.time() < stop_at:
   208         ret = fun()
   209         if eval(expr):
   210             return ret
   211         time.sleep(0.001)
   212     raise RuntimeError('timed out (ret=%r)' % ret)
   214 def retry_before_failing(ntimes=None):
   215     """Decorator which runs a test function and retries N times before
   216     actually failing.
   217     """
   218     def decorator(fun):
   219         @wraps(fun)
   220         def wrapper(*args, **kwargs):
   221             for x in range(ntimes or NO_RETRIES):
   222                 try:
   223                     return fun(*args, **kwargs)
   224                 except AssertionError:
   225                     err = sys.exc_info()[1]
   226             raise
   227         return wrapper
   228     return decorator
   230 def skip_on_access_denied(only_if=None):
   231     """Decorator to Ignore AccessDenied exceptions."""
   232     def decorator(fun):
   233         @wraps(fun)
   234         def wrapper(*args, **kwargs):
   235             try:
   236                 return fun(*args, **kwargs)
   237             except psutil.AccessDenied:
   238                 if only_if is not None:
   239                     if not only_if:
   240                         raise
   241                 msg = "%r was skipped because it raised AccessDenied" \
   242                       % fun.__name__
   243                 self = args[0]
   244                 if hasattr(self, 'skip'):  # python >= 2.7
   245                     self.skip(msg)
   246                 else:
   247                     register_warning(msg)
   248         return wrapper
   249     return decorator
   251 def skip_on_not_implemented(only_if=None):
   252     """Decorator to Ignore NotImplementedError exceptions."""
   253     def decorator(fun):
   254         @wraps(fun)
   255         def wrapper(*args, **kwargs):
   256             try:
   257                 return fun(*args, **kwargs)
   258             except NotImplementedError:
   259                 if only_if is not None:
   260                     if not only_if:
   261                         raise
   262                 msg = "%r was skipped because it raised NotImplementedError" \
   263                       % fun.__name__
   264                 self = args[0]
   265                 if hasattr(self, 'skip'):  # python >= 2.7
   266                     self.skip(msg)
   267                 else:
   268                     register_warning(msg)
   269         return wrapper
   270     return decorator
   272 def supports_ipv6():
   273     """Return True if IPv6 is supported on this platform."""
   274     if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"):
   275         return False
   276     sock = None
   277     try:
   278         try:
   279             sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
   280             sock.bind(("::1", 0))
   281         except (socket.error, socket.gaierror):
   282             return False
   283         else:
   284             return True
   285     finally:
   286         if sock is not None:
   287             sock.close()
   290 class ThreadTask(threading.Thread):
   291     """A thread object used for running process thread tests."""
   293     def __init__(self):
   294         threading.Thread.__init__(self)
   295         self._running = False
   296         self._interval = None
   297         self._flag = threading.Event()
   299     def __repr__(self):
   300         name =  self.__class__.__name__
   301         return '<%s running=%s at %#x>' % (name, self._running, id(self))
   303     def start(self, interval=0.001):
   304         """Start thread and keep it running until an explicit
   305         stop() request. Polls for shutdown every 'timeout' seconds.
   306         """
   307         if self._running:
   308             raise ValueError("already started")
   309         self._interval = interval
   310         threading.Thread.start(self)
   311         self._flag.wait()
   313     def run(self):
   314         self._running = True
   315         self._flag.set()
   316         while self._running:
   317             time.sleep(self._interval)
   319     def stop(self):
   320         """Stop thread execution and and waits until it is stopped."""
   321         if not self._running:
   322             raise ValueError("already stopped")
   323         self._running = False
   324         self.join()
   327 # ===================================================================
   328 # --- Support for python < 2.7 in case unittest2 is not installed
   329 # ===================================================================
   331 if not hasattr(unittest, 'skip'):
   332     register_warning("unittest2 module is not installed; a serie of pretty " \
   333                      "darn ugly workarounds will be used")
   335     class SkipTest(Exception):
   336         pass
   338     class TestCase(unittest.TestCase):
   340         def _safe_repr(self, obj):
   341             MAX_LENGTH = 80
   342             try:
   343                 result = repr(obj)
   344             except Exception:
   345                 result = object.__repr__(obj)
   346             if len(result) < MAX_LENGTH:
   347                 return result
   348             return result[:MAX_LENGTH] + ' [truncated]...'
   350         def _fail_w_msg(self, a, b, middle, msg):
   351             self.fail(msg or '%s %s %s' % (self._safe_repr(a), middle,
   352                                            self._safe_repr(b)))
   354         def skip(self, msg):
   355             raise SkipTest(msg)
   357         def assertIn(self, a, b, msg=None):
   358             if a not in b:
   359                 self._fail_w_msg(a, b, 'not found in', msg)
   361         def assertNotIn(self, a, b, msg=None):
   362             if a in b:
   363                 self._fail_w_msg(a, b, 'found in', msg)
   365         def assertGreater(self, a, b, msg=None):
   366             if not a > b:
   367                 self._fail_w_msg(a, b, 'not greater than', msg)
   369         def assertGreaterEqual(self, a, b, msg=None):
   370             if not a >= b:
   371                 self._fail_w_msg(a, b, 'not greater than or equal to', msg)
   373         def assertLess(self, a, b, msg=None):
   374             if not a < b:
   375                 self._fail_w_msg(a, b, 'not less than', msg)
   377         def assertLessEqual(self, a, b, msg=None):
   378             if not a <= b:
   379                 self._fail_w_msg(a, b, 'not less or equal to', msg)
   381         def assertIsInstance(self, a, b, msg=None):
   382             if not isinstance(a, b):
   383                 self.fail(msg or '%s is not an instance of %r' \
   384                         % (self._safe_repr(a), b))
   386         def assertAlmostEqual(self, a, b, msg=None, delta=None):
   387             if delta is not None:
   388                 if abs(a - b) <= delta:
   389                     return
   390                 self.fail(msg or '%s != %s within %s delta' \
   391                                   % (self._safe_repr(a), self._safe_repr(b),
   392                                      self._safe_repr(delta)))
   393             else:
   394                 self.assertEqual(a, b, msg=msg)
   397     def skipIf(condition, reason):
   398         def decorator(fun):
   399             @wraps(fun)
   400             def wrapper(*args, **kwargs):
   401                 self = args[0]
   402                 if condition:
   403                     sys.stdout.write("skipped-")
   404                     sys.stdout.flush()
   405                     if warn:
   406                         objname = "%s.%s" % (self.__class__.__name__,
   407                                              fun.__name__)
   408                         msg = "%s was skipped" % objname
   409                         if reason:
   410                             msg += "; reason: " + repr(reason)
   411                         register_warning(msg)
   412                     return
   413                 else:
   414                     return fun(*args, **kwargs)
   415             return wrapper
   416         return decorator
   418     def skipUnless(condition, reason):
   419         if not condition:
   420             return unittest.skipIf(True, reason)
   421         return unittest.skipIf(False, reason)
   423     unittest.TestCase = TestCase
   424     unittest.skipIf = skipIf
   425     unittest.skipUnless = skipUnless
   426     del TestCase, skipIf, skipUnless
   429 # ===================================================================
   430 # --- System-related API tests
   431 # ===================================================================
   433 class TestSystemAPIs(unittest.TestCase):
   434     """Tests for system-related APIs."""
   436     def setUp(self):
   437         safe_remove(TESTFN)
   439     def tearDown(self):
   440         reap_children()
   442     def test_process_iter(self):
   443         self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()])
   444         sproc = get_test_subprocess()
   445         self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()])
   446         p = psutil.Process(sproc.pid)
   447         p.kill()
   448         p.wait()
   449         self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()])
   451     def test_TOTAL_PHYMEM(self):
   452         x = psutil.TOTAL_PHYMEM
   453         self.assertIsInstance(x, (int, long))
   454         self.assertGreater(x, 0)
   455         self.assertEqual(x, psutil.virtual_memory().total)
   457     def test_BOOT_TIME(self, arg=None):
   458         x = arg or psutil.BOOT_TIME
   459         self.assertIsInstance(x, float)
   460         self.assertGreater(x, 0)
   461         self.assertLess(x, time.time())
   463     def test_get_boot_time(self):
   464         self.test_BOOT_TIME(psutil.get_boot_time())
   465         if WINDOWS:
   466             # work around float precision issues; give it 1 secs tolerance
   467             diff = abs(psutil.get_boot_time() - psutil.BOOT_TIME)
   468             self.assertLess(diff, 1)
   469         else:
   470             self.assertEqual(psutil.get_boot_time(), psutil.BOOT_TIME)
   472     def test_NUM_CPUS(self):
   473         self.assertEqual(psutil.NUM_CPUS, len(psutil.cpu_times(percpu=True)))
   474         self.assertGreaterEqual(psutil.NUM_CPUS, 1)
   476     @unittest.skipUnless(POSIX, 'posix only')
   477     def test_PAGESIZE(self):
   478         # pagesize is used internally to perform different calculations
   479         # and it's determined by using SC_PAGE_SIZE; make sure
   480         # getpagesize() returns the same value.
   481         import resource
   482         self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize())
   484     def test_deprecated_apis(self):
   485         s = socket.socket()
   486         s.bind(('localhost', 0))
   487         s.listen(1)
   488         warnings.filterwarnings("error")
   489         p = psutil.Process(os.getpid())
   490         try:
   491             self.assertRaises(DeprecationWarning, psutil.virtmem_usage)
   492             self.assertRaises(DeprecationWarning, psutil.used_phymem)
   493             self.assertRaises(DeprecationWarning, psutil.avail_phymem)
   494             self.assertRaises(DeprecationWarning, psutil.total_virtmem)
   495             self.assertRaises(DeprecationWarning, psutil.used_virtmem)
   496             self.assertRaises(DeprecationWarning, psutil.avail_virtmem)
   497             self.assertRaises(DeprecationWarning, psutil.phymem_usage)
   498             self.assertRaises(DeprecationWarning, psutil.get_process_list)
   499             self.assertRaises(DeprecationWarning, psutil.network_io_counters)
   500             if LINUX:
   501                 self.assertRaises(DeprecationWarning, psutil.phymem_buffers)
   502                 self.assertRaises(DeprecationWarning, psutil.cached_phymem)
   503             try:
   504                 p.nice
   505             except DeprecationWarning:
   506                 pass
   507             else:
   508                 self.fail("p.nice didn't raise DeprecationWarning")
   509             ret = call_until(p.get_connections, "len(ret) != 0", timeout=1)
   510             self.assertRaises(DeprecationWarning,
   511                               getattr, ret[0], 'local_address')
   512             self.assertRaises(DeprecationWarning,
   513                               getattr, ret[0], 'remote_address')
   514         finally:
   515             s.close()
   516             warnings.resetwarnings()
   518     def test_deprecated_apis_retval(self):
   519         warnings.filterwarnings("ignore")
   520         p = psutil.Process(os.getpid())
   521         try:
   522             self.assertEqual(psutil.total_virtmem(), psutil.swap_memory().total)
   523             self.assertEqual(p.nice, p.get_nice())
   524         finally:
   525             warnings.resetwarnings()
   527     def test_virtual_memory(self):
   528         mem = psutil.virtual_memory()
   529         assert mem.total > 0, mem
   530         assert mem.available > 0, mem
   531         assert 0 <= mem.percent <= 100, mem
   532         assert mem.used > 0, mem
   533         assert mem.free >= 0, mem
   534         for name in mem._fields:
   535             if name != 'total':
   536                 value = getattr(mem, name)
   537                 if not value >= 0:
   538                     self.fail("%r < 0 (%s)" % (name, value))
   539                 if value > mem.total:
   540                     self.fail("%r > total (total=%s, %s=%s)" \
   541                               % (name, mem.total, name, value))
   543     def test_swap_memory(self):
   544         mem = psutil.swap_memory()
   545         assert mem.total >= 0, mem
   546         assert mem.used >= 0, mem
   547         assert mem.free > 0, mem
   548         assert 0 <= mem.percent <= 100, mem
   549         assert mem.sin >= 0, mem
   550         assert mem.sout >= 0, mem
   552     def test_pid_exists(self):
   553         sproc = get_test_subprocess(wait=True)
   554         assert psutil.pid_exists(sproc.pid)
   555         p = psutil.Process(sproc.pid)
   556         p.kill()
   557         p.wait()
   558         self.assertFalse(psutil.pid_exists(sproc.pid))
   559         self.assertFalse(psutil.pid_exists(-1))
   561     def test_pid_exists_2(self):
   562         reap_children()
   563         pids = psutil.get_pid_list()
   564         for pid in pids:
   565             try:
   566                 assert psutil.pid_exists(pid)
   567             except AssertionError:
   568                 # in case the process disappeared in meantime fail only
   569                 # if it is no longer in get_pid_list()
   570                 time.sleep(.1)
   571                 if pid in psutil.get_pid_list():
   572                     self.fail(pid)
   573         pids = range(max(pids) + 5000, max(pids) + 6000)
   574         for pid in pids:
   575             self.assertFalse(psutil.pid_exists(pid))
   577     def test_get_pid_list(self):
   578         plist = [x.pid for x in psutil.process_iter()]
   579         pidlist = psutil.get_pid_list()
   580         self.assertEqual(plist.sort(), pidlist.sort())
   581         # make sure every pid is unique
   582         self.assertEqual(len(pidlist), len(set(pidlist)))
   584     def test_test(self):
   585         # test for psutil.test() function
   586         stdout = sys.stdout
   587         sys.stdout = DEVNULL
   588         try:
   589             psutil.test()
   590         finally:
   591             sys.stdout = stdout
   593     def test_sys_cpu_times(self):
   594         total = 0
   595         times = psutil.cpu_times()
   596         sum(times)
   597         for cp_time in times:
   598             self.assertIsInstance(cp_time, float)
   599             self.assertGreaterEqual(cp_time, 0.0)
   600             total += cp_time
   601         self.assertEqual(total, sum(times))
   602         str(times)
   604     def test_sys_cpu_times2(self):
   605         t1 = sum(psutil.cpu_times())
   606         time.sleep(0.1)
   607         t2 = sum(psutil.cpu_times())
   608         difference = t2 - t1
   609         if not difference >= 0.05:
   610             self.fail("difference %s" % difference)
   612     def test_sys_per_cpu_times(self):
   613         for times in psutil.cpu_times(percpu=True):
   614             total = 0
   615             sum(times)
   616             for cp_time in times:
   617                 self.assertIsInstance(cp_time, float)
   618                 self.assertGreaterEqual(cp_time, 0.0)
   619                 total += cp_time
   620             self.assertEqual(total, sum(times))
   621             str(times)
   622         self.assertEqual(len(psutil.cpu_times(percpu=True)[0]),
   623                          len(psutil.cpu_times(percpu=False)))
   625     def test_sys_per_cpu_times2(self):
   626         tot1 = psutil.cpu_times(percpu=True)
   627         stop_at = time.time() + 0.1
   628         while 1:
   629             if time.time() >= stop_at:
   630                 break
   631         tot2 = psutil.cpu_times(percpu=True)
   632         for t1, t2 in zip(tot1, tot2):
   633             t1, t2 = sum(t1), sum(t2)
   634             difference = t2 - t1
   635             if difference >= 0.05:
   636                 return
   637         self.fail()
   639     def _test_cpu_percent(self, percent):
   640         self.assertIsInstance(percent, float)
   641         self.assertGreaterEqual(percent, 0.0)
   642         self.assertLessEqual(percent, 100.0)
   644     def test_sys_cpu_percent(self):
   645         psutil.cpu_percent(interval=0.001)
   646         for x in range(1000):
   647             self._test_cpu_percent(psutil.cpu_percent(interval=None))
   649     def test_sys_per_cpu_percent(self):
   650         self.assertEqual(len(psutil.cpu_percent(interval=0.001, percpu=True)),
   651                          psutil.NUM_CPUS)
   652         for x in range(1000):
   653             percents = psutil.cpu_percent(interval=None, percpu=True)
   654             for percent in percents:
   655                 self._test_cpu_percent(percent)
   657     def test_sys_cpu_times_percent(self):
   658         psutil.cpu_times_percent(interval=0.001)
   659         for x in range(1000):
   660             cpu = psutil.cpu_times_percent(interval=None)
   661             for percent in cpu:
   662                 self._test_cpu_percent(percent)
   663             self._test_cpu_percent(sum(cpu))
   665     def test_sys_per_cpu_times_percent(self):
   666         self.assertEqual(len(psutil.cpu_times_percent(interval=0.001,
   667                                                       percpu=True)),
   668                          psutil.NUM_CPUS)
   669         for x in range(1000):
   670             cpus = psutil.cpu_times_percent(interval=None, percpu=True)
   671             for cpu in cpus:
   672                 for percent in cpu:
   673                     self._test_cpu_percent(percent)
   674                 self._test_cpu_percent(sum(cpu))
   676     @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
   677                      "os.statvfs() function not available on this platform")
   678     def test_disk_usage(self):
   679         usage = psutil.disk_usage(os.getcwd())
   680         assert usage.total > 0, usage
   681         assert usage.used > 0, usage
   682         assert usage.free > 0, usage
   683         assert usage.total > usage.used, usage
   684         assert usage.total > usage.free, usage
   685         assert 0 <= usage.percent <= 100, usage.percent
   687         # if path does not exist OSError ENOENT is expected across
   688         # all platforms
   689         fname = tempfile.mktemp()
   690         try:
   691             psutil.disk_usage(fname)
   692         except OSError:
   693             err = sys.exc_info()[1]
   694             if err.args[0] != errno.ENOENT:
   695                 raise
   696         else:
   697             self.fail("OSError not raised")
   699     @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
   700                      "os.statvfs() function not available on this platform")
   701     def test_disk_partitions(self):
   702         # all = False
   703         for disk in psutil.disk_partitions(all=False):
   704             if WINDOWS and 'cdrom' in disk.opts:
   705                 continue
   706             if not POSIX:
   707                 assert os.path.exists(disk.device), disk
   708             else:
   709                 # we cannot make any assumption about this, see:
   710                 # http://goo.gl/p9c43
   711                 disk.device
   712             if SUNOS:
   713                 # on solaris apparently mount points can also be files
   714                 assert os.path.exists(disk.mountpoint), disk
   715             else:
   716                 assert os.path.isdir(disk.mountpoint), disk
   717             assert disk.fstype, disk
   718             self.assertIsInstance(disk.opts, str)
   720         # all = True
   721         for disk in psutil.disk_partitions(all=True):
   722             if not WINDOWS:
   723                 try:
   724                     os.stat(disk.mountpoint)
   725                 except OSError:
   726                     # http://mail.python.org/pipermail/python-dev/2012-June/120787.html
   727                     err = sys.exc_info()[1]
   728                     if err.errno not in (errno.EPERM, errno.EACCES):
   729                         raise
   730                 else:
   731                     if SUNOS:
   732                         # on solaris apparently mount points can also be files
   733                         assert os.path.exists(disk.mountpoint), disk
   734                     else:
   735                         assert os.path.isdir(disk.mountpoint), disk
   736             self.assertIsInstance(disk.fstype, str)
   737             self.assertIsInstance(disk.opts, str)
   739         def find_mount_point(path):
   740             path = os.path.abspath(path)
   741             while not os.path.ismount(path):
   742                 path = os.path.dirname(path)
   743             return path
   745         mount = find_mount_point(__file__)
   746         mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)]
   747         self.assertIn(mount, mounts)
   748         psutil.disk_usage(mount)
   750     def test_net_io_counters(self):
   751         def check_ntuple(nt):
   752             self.assertEqual(nt[0], nt.bytes_sent)
   753             self.assertEqual(nt[1], nt.bytes_recv)
   754             self.assertEqual(nt[2], nt.packets_sent)
   755             self.assertEqual(nt[3], nt.packets_recv)
   756             self.assertEqual(nt[4], nt.errin)
   757             self.assertEqual(nt[5], nt.errout)
   758             self.assertEqual(nt[6], nt.dropin)
   759             self.assertEqual(nt[7], nt.dropout)
   760             assert nt.bytes_sent >= 0, nt
   761             assert nt.bytes_recv >= 0, nt
   762             assert nt.packets_sent >= 0, nt
   763             assert nt.packets_recv >= 0, nt
   764             assert nt.errin >= 0, nt
   765             assert nt.errout >= 0, nt
   766             assert nt.dropin >= 0, nt
   767             assert nt.dropout >= 0, nt
   769         ret = psutil.net_io_counters(pernic=False)
   770         check_ntuple(ret)
   771         ret = psutil.net_io_counters(pernic=True)
   772         assert ret != []
   773         for key in ret:
   774             assert key
   775             check_ntuple(ret[key])
   777     def test_disk_io_counters(self):
   778         def check_ntuple(nt):
   779             self.assertEqual(nt[0], nt.read_count)
   780             self.assertEqual(nt[1], nt.write_count)
   781             self.assertEqual(nt[2], nt.read_bytes)
   782             self.assertEqual(nt[3], nt.write_bytes)
   783             self.assertEqual(nt[4], nt.read_time)
   784             self.assertEqual(nt[5], nt.write_time)
   785             assert nt.read_count >= 0, nt
   786             assert nt.write_count >= 0, nt
   787             assert nt.read_bytes >= 0, nt
   788             assert nt.write_bytes >= 0, nt
   789             assert nt.read_time >= 0, nt
   790             assert nt.write_time >= 0, nt
   792         ret = psutil.disk_io_counters(perdisk=False)
   793         check_ntuple(ret)
   794         ret = psutil.disk_io_counters(perdisk=True)
   795         # make sure there are no duplicates
   796         self.assertEqual(len(ret), len(set(ret)))
   797         for key in ret:
   798             assert key, key
   799             check_ntuple(ret[key])
   800             if LINUX and key[-1].isdigit():
   801                 # if 'sda1' is listed 'sda' shouldn't, see:
   802                 # http://code.google.com/p/psutil/issues/detail?id=338
   803                 while key[-1].isdigit():
   804                     key = key[:-1]
   805                 self.assertNotIn(key, ret.keys())
   807     def test_get_users(self):
   808         users = psutil.get_users()
   809         assert users
   810         for user in users:
   811             assert user.name, user
   812             user.terminal
   813             user.host
   814             assert user.started > 0.0, user
   815             datetime.datetime.fromtimestamp(user.started)
   818 # ===================================================================
   819 # --- psutil.Process class tests
   820 # ===================================================================
   822 class TestProcess(unittest.TestCase):
   823     """Tests for psutil.Process class."""
   825     def setUp(self):
   826         safe_remove(TESTFN)
   828     def tearDown(self):
   829         reap_children()
   831     def test_kill(self):
   832         sproc = get_test_subprocess(wait=True)
   833         test_pid = sproc.pid
   834         p = psutil.Process(test_pid)
   835         name = p.name
   836         p.kill()
   837         p.wait()
   838         self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
   840     def test_terminate(self):
   841         sproc = get_test_subprocess(wait=True)
   842         test_pid = sproc.pid
   843         p = psutil.Process(test_pid)
   844         name = p.name
   845         p.terminate()
   846         p.wait()
   847         self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
   849     def test_send_signal(self):
   850         if POSIX:
   851             sig = signal.SIGKILL
   852         else:
   853             sig = signal.SIGTERM
   854         sproc = get_test_subprocess()
   855         test_pid = sproc.pid
   856         p = psutil.Process(test_pid)
   857         name = p.name
   858         p.send_signal(sig)
   859         p.wait()
   860         self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
   862     def test_wait(self):
   863         # check exit code signal
   864         sproc = get_test_subprocess()
   865         p = psutil.Process(sproc.pid)
   866         p.kill()
   867         code = p.wait()
   868         if os.name == 'posix':
   869             self.assertEqual(code, signal.SIGKILL)
   870         else:
   871             self.assertEqual(code, 0)
   872         self.assertFalse(p.is_running())
   874         sproc = get_test_subprocess()
   875         p = psutil.Process(sproc.pid)
   876         p.terminate()
   877         code = p.wait()
   878         if os.name == 'posix':
   879             self.assertEqual(code, signal.SIGTERM)
   880         else:
   881             self.assertEqual(code, 0)
   882         self.assertFalse(p.is_running())
   884         # check sys.exit() code
   885         code = "import time, sys; time.sleep(0.01); sys.exit(5);"
   886         sproc = get_test_subprocess([PYTHON, "-c", code])
   887         p = psutil.Process(sproc.pid)
   888         self.assertEqual(p.wait(), 5)
   889         self.assertFalse(p.is_running())
   891         # Test wait() issued twice.
   892         # It is not supposed to raise NSP when the process is gone.
   893         # On UNIX this should return None, on Windows it should keep
   894         # returning the exit code.
   895         sproc = get_test_subprocess([PYTHON, "-c", code])
   896         p = psutil.Process(sproc.pid)
   897         self.assertEqual(p.wait(), 5)
   898         self.assertIn(p.wait(), (5, None))
   900         # test timeout
   901         sproc = get_test_subprocess()
   902         p = psutil.Process(sproc.pid)
   903         p.name
   904         self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
   906         # timeout < 0 not allowed
   907         self.assertRaises(ValueError, p.wait, -1)
   909     @unittest.skipUnless(POSIX, '')  # XXX why is this skipped on Windows?
   910     def test_wait_non_children(self):
   911         # test wait() against processes which are not our children
   912         code = "import sys;"
   913         code += "from subprocess import Popen, PIPE;"
   914         code += "cmd = ['%s', '-c', 'import time; time.sleep(2)'];" %PYTHON
   915         code += "sp = Popen(cmd, stdout=PIPE);"
   916         code += "sys.stdout.write(str(sp.pid));"
   917         sproc = get_test_subprocess([PYTHON, "-c", code], stdout=subprocess.PIPE)
   919         grandson_pid = int(sproc.stdout.read())
   920         grandson_proc = psutil.Process(grandson_pid)
   921         try:
   922             self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
   923             grandson_proc.kill()
   924             ret = grandson_proc.wait()
   925             self.assertEqual(ret, None)
   926         finally:
   927             if grandson_proc.is_running():
   928                 grandson_proc.kill()
   929                 grandson_proc.wait()
   931     def test_wait_timeout_0(self):
   932         sproc = get_test_subprocess()
   933         p = psutil.Process(sproc.pid)
   934         self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
   935         p.kill()
   936         stop_at = time.time() + 2
   937         while 1:
   938             try:
   939                 code = p.wait(0)
   940             except psutil.TimeoutExpired:
   941                 if time.time() >= stop_at:
   942                     raise
   943             else:
   944                 break
   945         if os.name == 'posix':
   946             self.assertEqual(code, signal.SIGKILL)
   947         else:
   948             self.assertEqual(code, 0)
   949         self.assertFalse(p.is_running())
   951     def test_cpu_percent(self):
   952         p = psutil.Process(os.getpid())
   953         p.get_cpu_percent(interval=0.001)
   954         p.get_cpu_percent(interval=0.001)
   955         for x in range(100):
   956             percent = p.get_cpu_percent(interval=None)
   957             self.assertIsInstance(percent, float)
   958             self.assertGreaterEqual(percent, 0.0)
   959             if os.name != 'posix':
   960                 self.assertLessEqual(percent, 100.0)
   961             else:
   962                 self.assertGreaterEqual(percent, 0.0)
   964     def test_cpu_times(self):
   965         times = psutil.Process(os.getpid()).get_cpu_times()
   966         assert (times.user > 0.0) or (times.system > 0.0), times
   967         # make sure returned values can be pretty printed with strftime
   968         time.strftime("%H:%M:%S", time.localtime(times.user))
   969         time.strftime("%H:%M:%S", time.localtime(times.system))
   971     # Test Process.cpu_times() against os.times()
   972     # os.times() is broken on Python 2.6
   973     # http://bugs.python.org/issue1040026
   974     # XXX fails on OSX: not sure if it's for os.times(). We should
   975     # try this with Python 2.7 and re-enable the test.
   977     @unittest.skipUnless(sys.version_info > (2, 6, 1) and not OSX,
   978                          'os.times() is not reliable on this Python version')
   979     def test_cpu_times2(self):
   980         user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times()
   981         utime, ktime = os.times()[:2]
   983         # Use os.times()[:2] as base values to compare our results
   984         # using a tolerance  of +/- 0.1 seconds.
   985         # It will fail if the difference between the values is > 0.1s.
   986         if (max([user_time, utime]) - min([user_time, utime])) > 0.1:
   987             self.fail("expected: %s, found: %s" %(utime, user_time))
   989         if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1:
   990             self.fail("expected: %s, found: %s" %(ktime, kernel_time))
   992     def test_create_time(self):
   993         sproc = get_test_subprocess(wait=True)
   994         now = time.time()
   995         p = psutil.Process(sproc.pid)
   996         create_time = p.create_time
   998         # Use time.time() as base value to compare our result using a
   999         # tolerance of +/- 1 second.
  1000         # It will fail if the difference between the values is > 2s.
  1001         difference = abs(create_time - now)
  1002         if difference > 2:
  1003             self.fail("expected: %s, found: %s, difference: %s"
  1004                       % (now, create_time, difference))
  1006         # make sure returned value can be pretty printed with strftime
  1007         time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time))
  1009     @unittest.skipIf(WINDOWS, 'windows only')
  1010     def test_terminal(self):
  1011         terminal = psutil.Process(os.getpid()).terminal
  1012         if sys.stdin.isatty():
  1013             self.assertEqual(terminal, sh('tty'))
  1014         else:
  1015             assert terminal, repr(terminal)
  1017     @unittest.skipIf(not hasattr(psutil.Process, 'get_io_counters'),
  1018                      'not available on this platform')
  1019     @skip_on_not_implemented(only_if=LINUX)
  1020     def test_get_io_counters(self):
  1021         p = psutil.Process(os.getpid())
  1022         # test reads
  1023         io1 = p.get_io_counters()
  1024         f = open(PYTHON, 'rb')
  1025         f.read()
  1026         f.close()
  1027         io2 = p.get_io_counters()
  1028         if not BSD:
  1029             assert io2.read_count > io1.read_count, (io1, io2)
  1030             self.assertEqual(io2.write_count, io1.write_count)
  1031         assert io2.read_bytes >= io1.read_bytes, (io1, io2)
  1032         assert io2.write_bytes >= io1.write_bytes, (io1, io2)
  1033         # test writes
  1034         io1 = p.get_io_counters()
  1035         f = tempfile.TemporaryFile()
  1036         if PY3:
  1037             f.write(bytes("x" * 1000000, 'ascii'))
  1038         else:
  1039             f.write("x" * 1000000)
  1040         f.close()
  1041         io2 = p.get_io_counters()
  1042         assert io2.write_count >= io1.write_count, (io1, io2)
  1043         assert io2.write_bytes >= io1.write_bytes, (io1, io2)
  1044         assert io2.read_count >= io1.read_count, (io1, io2)
  1045         assert io2.read_bytes >= io1.read_bytes, (io1, io2)
  1047     # Linux and Windows Vista+
  1048     @unittest.skipUnless(hasattr(psutil.Process, 'get_ionice'),
  1049                          'Linux and Windows Vista only')
  1050     def test_get_set_ionice(self):
  1051         if LINUX:
  1052             from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT,
  1053                                 IOPRIO_CLASS_BE, IOPRIO_CLASS_IDLE)
  1054             self.assertEqual(IOPRIO_CLASS_NONE, 0)
  1055             self.assertEqual(IOPRIO_CLASS_RT, 1)
  1056             self.assertEqual(IOPRIO_CLASS_BE, 2)
  1057             self.assertEqual(IOPRIO_CLASS_IDLE, 3)
  1058             p = psutil.Process(os.getpid())
  1059             try:
  1060                 p.set_ionice(2)
  1061                 ioclass, value = p.get_ionice()
  1062                 self.assertEqual(ioclass, 2)
  1063                 self.assertEqual(value, 4)
  1065                 p.set_ionice(3)
  1066                 ioclass, value = p.get_ionice()
  1067                 self.assertEqual(ioclass, 3)
  1068                 self.assertEqual(value, 0)
  1070                 p.set_ionice(2, 0)
  1071                 ioclass, value = p.get_ionice()
  1072                 self.assertEqual(ioclass, 2)
  1073                 self.assertEqual(value, 0)
  1074                 p.set_ionice(2, 7)
  1075                 ioclass, value = p.get_ionice()
  1076                 self.assertEqual(ioclass, 2)
  1077                 self.assertEqual(value, 7)
  1078                 self.assertRaises(ValueError, p.set_ionice, 2, 10)
  1079             finally:
  1080                 p.set_ionice(IOPRIO_CLASS_NONE)
  1081         else:
  1082             p = psutil.Process(os.getpid())
  1083             original = p.get_ionice()
  1084             try:
  1085                 value = 0  # very low
  1086                 if original == value:
  1087                     value = 1  # low
  1088                 p.set_ionice(value)
  1089                 self.assertEqual(p.get_ionice(), value)
  1090             finally:
  1091                 p.set_ionice(original)
  1093             self.assertRaises(ValueError, p.set_ionice, 3)
  1094             self.assertRaises(TypeError, p.set_ionice, 2, 1)
  1096     def test_get_num_threads(self):
  1097         # on certain platforms such as Linux we might test for exact
  1098         # thread number, since we always have with 1 thread per process,
  1099         # but this does not apply across all platforms (OSX, Windows)
  1100         p = psutil.Process(os.getpid())
  1101         step1 = p.get_num_threads()
  1103         thread = ThreadTask()
  1104         thread.start()
  1105         try:
  1106             step2 = p.get_num_threads()
  1107             self.assertEqual(step2, step1 + 1)
  1108             thread.stop()
  1109         finally:
  1110             if thread._running:
  1111                 thread.stop()
  1113     @unittest.skipUnless(WINDOWS, 'Windows only')
  1114     def test_get_num_handles(self):
  1115         # a better test is done later into test/_windows.py
  1116         p = psutil.Process(os.getpid())
  1117         self.assertGreater(p.get_num_handles(), 0)
  1119     def test_get_threads(self):
  1120         p = psutil.Process(os.getpid())
  1121         step1 = p.get_threads()
  1123         thread = ThreadTask()
  1124         thread.start()
  1126         try:
  1127             step2 = p.get_threads()
  1128             self.assertEqual(len(step2), len(step1) + 1)
  1129             # on Linux, first thread id is supposed to be this process
  1130             if LINUX:
  1131                 self.assertEqual(step2[0].id, os.getpid())
  1132             athread = step2[0]
  1133             # test named tuple
  1134             self.assertEqual(athread.id, athread[0])
  1135             self.assertEqual(athread.user_time, athread[1])
  1136             self.assertEqual(athread.system_time, athread[2])
  1137             # test num threads
  1138             thread.stop()
  1139         finally:
  1140             if thread._running:
  1141                 thread.stop()
  1143     def test_get_memory_info(self):
  1144         p = psutil.Process(os.getpid())
  1146         # step 1 - get a base value to compare our results
  1147         rss1, vms1 = p.get_memory_info()
  1148         percent1 = p.get_memory_percent()
  1149         self.assertGreater(rss1, 0)
  1150         self.assertGreater(vms1, 0)
  1152         # step 2 - allocate some memory
  1153         memarr = [None] * 1500000
  1155         rss2, vms2 = p.get_memory_info()
  1156         percent2 = p.get_memory_percent()
  1157         # make sure that the memory usage bumped up
  1158         self.assertGreater(rss2, rss1)
  1159         self.assertGreaterEqual(vms2, vms1)  # vms might be equal
  1160         self.assertGreater(percent2, percent1)
  1161         del memarr
  1163 #    def test_get_ext_memory_info(self):
  1164 #       # tested later in fetch all test suite
  1166     def test_get_memory_maps(self):
  1167         p = psutil.Process(os.getpid())
  1168         maps = p.get_memory_maps()
  1169         paths = [x for x in maps]
  1170         self.assertEqual(len(paths), len(set(paths)))
  1171         ext_maps = p.get_memory_maps(grouped=False)
  1173         for nt in maps:
  1174             if not nt.path.startswith('['):
  1175                 assert os.path.isabs(nt.path), nt.path
  1176                 if POSIX:
  1177                     assert os.path.exists(nt.path), nt.path
  1178                 else:
  1179                     # XXX - On Windows we have this strange behavior with
  1180                     # 64 bit dlls: they are visible via explorer but cannot
  1181                     # be accessed via os.stat() (wtf?).
  1182                     if '64' not in os.path.basename(nt.path):
  1183                         assert os.path.exists(nt.path), nt.path
  1184         for nt in ext_maps:
  1185             for fname in nt._fields:
  1186                 value = getattr(nt, fname)
  1187                 if fname == 'path':
  1188                     continue
  1189                 elif fname in ('addr', 'perms'):
  1190                     assert value, value
  1191                 else:
  1192                     self.assertIsInstance(value, (int, long))
  1193                     assert value >= 0, value
  1195     def test_get_memory_percent(self):
  1196         p = psutil.Process(os.getpid())
  1197         self.assertGreater(p.get_memory_percent(), 0.0)
  1199     def test_pid(self):
  1200         sproc = get_test_subprocess()
  1201         self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid)
  1203     def test_is_running(self):
  1204         sproc = get_test_subprocess(wait=True)
  1205         p = psutil.Process(sproc.pid)
  1206         assert p.is_running()
  1207         assert p.is_running()
  1208         p.kill()
  1209         p.wait()
  1210         assert not p.is_running()
  1211         assert not p.is_running()
  1213     def test_exe(self):
  1214         sproc = get_test_subprocess(wait=True)
  1215         exe = psutil.Process(sproc.pid).exe
  1216         try:
  1217             self.assertEqual(exe, PYTHON)
  1218         except AssertionError:
  1219             if WINDOWS and len(exe) == len(PYTHON):
  1220                 # on Windows we don't care about case sensitivity
  1221                 self.assertEqual(exe.lower(), PYTHON.lower())
  1222             else:
  1223                 # certain platforms such as BSD are more accurate returning:
  1224                 # "/usr/local/bin/python2.7"
  1225                 # ...instead of:
  1226                 # "/usr/local/bin/python"
  1227                 # We do not want to consider this difference in accuracy
  1228                 # an error.
  1229                 ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
  1230                 self.assertEqual(exe.replace(ver, ''), PYTHON.replace(ver, ''))
  1232     def test_cmdline(self):
  1233         cmdline = [PYTHON, "-c", "import time; time.sleep(2)"]
  1234         sproc = get_test_subprocess(cmdline, wait=True)
  1235         self.assertEqual(' '.join(psutil.Process(sproc.pid).cmdline),
  1236                          ' '.join(cmdline))
  1238     def test_name(self):
  1239         sproc = get_test_subprocess(PYTHON, wait=True)
  1240         name = psutil.Process(sproc.pid).name.lower()
  1241         pyexe = os.path.basename(os.path.realpath(sys.executable)).lower()
  1242         assert pyexe.startswith(name), (pyexe, name)
  1244     @unittest.skipUnless(POSIX, 'posix only')
  1245     def test_uids(self):
  1246         p = psutil.Process(os.getpid())
  1247         real, effective, saved = p.uids
  1248         # os.getuid() refers to "real" uid
  1249         self.assertEqual(real, os.getuid())
  1250         # os.geteuid() refers to "effective" uid
  1251         self.assertEqual(effective, os.geteuid())
  1252         # no such thing as os.getsuid() ("saved" uid), but starting
  1253         # from python 2.7 we have os.getresuid()[2]
  1254         if hasattr(os, "getresuid"):
  1255             self.assertEqual(saved, os.getresuid()[2])
  1257     @unittest.skipUnless(POSIX, 'posix only')
  1258     def test_gids(self):
  1259         p = psutil.Process(os.getpid())
  1260         real, effective, saved = p.gids
  1261         # os.getuid() refers to "real" uid
  1262         self.assertEqual(real, os.getgid())
  1263         # os.geteuid() refers to "effective" uid
  1264         self.assertEqual(effective, os.getegid())
  1265         # no such thing as os.getsuid() ("saved" uid), but starting
  1266         # from python 2.7 we have os.getresgid()[2]
  1267         if hasattr(os, "getresuid"):
  1268             self.assertEqual(saved, os.getresgid()[2])
  1270     def test_nice(self):
  1271         p = psutil.Process(os.getpid())
  1272         self.assertRaises(TypeError, p.set_nice, "str")
  1273         if os.name == 'nt':
  1274             try:
  1275                 self.assertEqual(p.get_nice(), psutil.NORMAL_PRIORITY_CLASS)
  1276                 p.set_nice(psutil.HIGH_PRIORITY_CLASS)
  1277                 self.assertEqual(p.get_nice(), psutil.HIGH_PRIORITY_CLASS)
  1278                 p.set_nice(psutil.NORMAL_PRIORITY_CLASS)
  1279                 self.assertEqual(p.get_nice(), psutil.NORMAL_PRIORITY_CLASS)
  1280             finally:
  1281                 p.set_nice(psutil.NORMAL_PRIORITY_CLASS)
  1282         else:
  1283             try:
  1284                 try:
  1285                     first_nice = p.get_nice()
  1286                     p.set_nice(1)
  1287                     self.assertEqual(p.get_nice(), 1)
  1288                     # going back to previous nice value raises AccessDenied on OSX
  1289                     if not OSX:
  1290                         p.set_nice(0)
  1291                         self.assertEqual(p.get_nice(), 0)
  1292                 except psutil.AccessDenied:
  1293                     pass
  1294             finally:
  1295                 try:
  1296                     p.set_nice(first_nice)
  1297                 except psutil.AccessDenied:
  1298                     pass
  1300     def test_status(self):
  1301         p = psutil.Process(os.getpid())
  1302         self.assertEqual(p.status, psutil.STATUS_RUNNING)
  1303         self.assertEqual(str(p.status), "running")
  1305     def test_status_constants(self):
  1306         # STATUS_* constants are supposed to be comparable also by
  1307         # using their str representation
  1308         self.assertTrue(psutil.STATUS_RUNNING == 0)
  1309         self.assertTrue(psutil.STATUS_RUNNING == long(0))
  1310         self.assertTrue(psutil.STATUS_RUNNING == 'running')
  1311         self.assertFalse(psutil.STATUS_RUNNING == 1)
  1312         self.assertFalse(psutil.STATUS_RUNNING == 'sleeping')
  1313         self.assertFalse(psutil.STATUS_RUNNING != 0)
  1314         self.assertFalse(psutil.STATUS_RUNNING != 'running')
  1315         self.assertTrue(psutil.STATUS_RUNNING != 1)
  1316         self.assertTrue(psutil.STATUS_RUNNING != 'sleeping')
  1318     def test_username(self):
  1319         sproc = get_test_subprocess()
  1320         p = psutil.Process(sproc.pid)
  1321         if POSIX:
  1322             import pwd
  1323             self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name)
  1324         elif WINDOWS and 'USERNAME' in os.environ:
  1325             expected_username = os.environ['USERNAME']
  1326             expected_domain = os.environ['USERDOMAIN']
  1327             domain, username = p.username.split('\\')
  1328             self.assertEqual(domain, expected_domain)
  1329             self.assertEqual(username, expected_username)
  1330         else:
  1331             p.username
  1333     @unittest.skipUnless(hasattr(psutil.Process, "getcwd"),
  1334                          'not available on this platform')
  1335     def test_getcwd(self):
  1336         sproc = get_test_subprocess(wait=True)
  1337         p = psutil.Process(sproc.pid)
  1338         self.assertEqual(p.getcwd(), os.getcwd())
  1340     @unittest.skipIf(not hasattr(psutil.Process, "getcwd"),
  1341                      'not available on this platform')
  1342     def test_getcwd_2(self):
  1343         cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(2)"]
  1344         sproc = get_test_subprocess(cmd, wait=True)
  1345         p = psutil.Process(sproc.pid)
  1346         call_until(p.getcwd, "ret == os.path.dirname(os.getcwd())", timeout=1)
  1348     @unittest.skipIf(not hasattr(psutil.Process, "get_cpu_affinity"),
  1349                      'not available on this platform')
  1350     def test_cpu_affinity(self):
  1351         p = psutil.Process(os.getpid())
  1352         initial = p.get_cpu_affinity()
  1353         all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
  1355         for n in all_cpus:
  1356             p.set_cpu_affinity([n])
  1357             self.assertEqual(p.get_cpu_affinity(), [n])
  1359         p.set_cpu_affinity(all_cpus)
  1360         self.assertEqual(p.get_cpu_affinity(), all_cpus)
  1362         p.set_cpu_affinity(initial)
  1363         invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
  1364         self.assertRaises(ValueError, p.set_cpu_affinity, invalid_cpu)
  1366     def test_get_open_files(self):
  1367         # current process
  1368         p = psutil.Process(os.getpid())
  1369         files = p.get_open_files()
  1370         self.assertFalse(TESTFN in files)
  1371         f = open(TESTFN, 'w')
  1372         call_until(p.get_open_files, "len(ret) != %i" % len(files))
  1373         filenames = [x.path for x in p.get_open_files()]
  1374         self.assertIn(TESTFN, filenames)
  1375         f.close()
  1376         for file in filenames:
  1377             assert os.path.isfile(file), file
  1379         # another process
  1380         cmdline = "import time; f = open(r'%s', 'r'); time.sleep(2);" % TESTFN
  1381         sproc = get_test_subprocess([PYTHON, "-c", cmdline], wait=True)
  1382         p = psutil.Process(sproc.pid)
  1383         for x in range(100):
  1384             filenames = [x.path for x in p.get_open_files()]
  1385             if TESTFN in filenames:
  1386                 break
  1387             time.sleep(.01)
  1388         else:
  1389             self.assertIn(TESTFN, filenames)
  1390         for file in filenames:
  1391             assert os.path.isfile(file), file
  1393     def test_get_open_files2(self):
  1394         # test fd and path fields
  1395         fileobj = open(TESTFN, 'w')
  1396         p = psutil.Process(os.getpid())
  1397         for path, fd in p.get_open_files():
  1398             if path == fileobj.name or fd == fileobj.fileno():
  1399                 break
  1400         else:
  1401             self.fail("no file found; files=%s" % repr(p.get_open_files()))
  1402         self.assertEqual(path, fileobj.name)
  1403         if WINDOWS:
  1404             self.assertEqual(fd, -1)
  1405         else:
  1406             self.assertEqual(fd, fileobj.fileno())
  1407         # test positions
  1408         ntuple = p.get_open_files()[0]
  1409         self.assertEqual(ntuple[0], ntuple.path)
  1410         self.assertEqual(ntuple[1], ntuple.fd)
  1411         # test file is gone
  1412         fileobj.close()
  1413         self.assertTrue(fileobj.name not in p.get_open_files())
  1415     def test_connection_constants(self):
  1416         ints = []
  1417         strs = []
  1418         for name in dir(psutil):
  1419             if name.startswith('CONN_'):
  1420                 num = getattr(psutil, name)
  1421                 str_ = str(num)
  1422                 assert str_.isupper(), str_
  1423                 assert str_ not in strs, str_
  1424                 assert num not in ints, num
  1425                 ints.append(num)
  1426                 strs.append(str_)
  1427         if SUNOS:
  1428             psutil.CONN_IDLE
  1429             psutil.CONN_BOUND
  1430         if WINDOWS:
  1431             psutil.CONN_DELETE_TCB
  1433     def test_get_connections(self):
  1434         arg = "import socket, time;" \
  1435               "s = socket.socket();" \
  1436               "s.bind(('127.0.0.1', 0));" \
  1437               "s.listen(1);" \
  1438               "conn, addr = s.accept();" \
  1439               "time.sleep(2);"
  1440         sproc = get_test_subprocess([PYTHON, "-c", arg])
  1441         p = psutil.Process(sproc.pid)
  1442         for x in range(100):
  1443             if p.get_connections():
  1444                 # give the subprocess some more time to bind()
  1445                 time.sleep(.01)
  1446                 cons = p.get_connections()
  1447                 break
  1448             time.sleep(.01)
  1449         self.assertEqual(len(cons), 1)
  1450         con = cons[0]
  1451         self.assertEqual(con.family, socket.AF_INET)
  1452         self.assertEqual(con.type, socket.SOCK_STREAM)
  1453         self.assertEqual(con.status, psutil.CONN_LISTEN, str(con.status))
  1454         ip, port = con.laddr
  1455         self.assertEqual(ip, '127.0.0.1')
  1456         self.assertEqual(con.raddr, ())
  1457         if WINDOWS or SUNOS:
  1458             self.assertEqual(con.fd, -1)
  1459         else:
  1460             assert con.fd > 0, con
  1461         # test positions
  1462         self.assertEqual(con[0], con.fd)
  1463         self.assertEqual(con[1], con.family)
  1464         self.assertEqual(con[2], con.type)
  1465         self.assertEqual(con[3], con.laddr)
  1466         self.assertEqual(con[4], con.raddr)
  1467         self.assertEqual(con[5], con.status)
  1468         # test kind arg
  1469         self.assertRaises(ValueError, p.get_connections, 'foo')
  1471     @unittest.skipUnless(supports_ipv6(), 'IPv6 is not supported')
  1472     def test_get_connections_ipv6(self):
  1473         s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
  1474         s.bind(('::1', 0))
  1475         s.listen(1)
  1476         cons = psutil.Process(os.getpid()).get_connections()
  1477         s.close()
  1478         self.assertEqual(len(cons), 1)
  1479         self.assertEqual(cons[0].laddr[0], '::1')
  1481     @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'AF_UNIX is not supported')
  1482     def test_get_connections_unix(self):
  1483         # tcp
  1484         safe_remove(TESTFN)
  1485         sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  1486         sock.bind(TESTFN)
  1487         conn = psutil.Process(os.getpid()).get_connections(kind='unix')[0]
  1488         if conn.fd != -1:  # != sunos and windows
  1489             self.assertEqual(conn.fd, sock.fileno())
  1490         self.assertEqual(conn.family, socket.AF_UNIX)
  1491         self.assertEqual(conn.type, socket.SOCK_STREAM)
  1492         self.assertEqual(conn.laddr, TESTFN)
  1493         self.assertTrue(not conn.raddr)
  1494         self.assertEqual(conn.status, psutil.CONN_NONE, str(conn.status))
  1495         sock.close()
  1496         # udp
  1497         safe_remove(TESTFN)
  1498         sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
  1499         sock.bind(TESTFN)
  1500         conn = psutil.Process(os.getpid()).get_connections(kind='unix')[0]
  1501         self.assertEqual(conn.type, socket.SOCK_DGRAM)
  1502         sock.close()
  1504     @unittest.skipUnless(hasattr(socket, "fromfd"),
  1505                          'socket.fromfd() is not availble')
  1506     @unittest.skipIf(WINDOWS or SUNOS,
  1507                      'connection fd available on this platform')
  1508     def test_connection_fromfd(self):
  1509         sock = socket.socket()
  1510         sock.bind(('localhost', 0))
  1511         sock.listen(1)
  1512         p = psutil.Process(os.getpid())
  1513         for conn in p.get_connections():
  1514             if conn.fd == sock.fileno():
  1515                 break
  1516         else:
  1517             sock.close()
  1518             self.fail("couldn't find socket fd")
  1519         dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
  1520         try:
  1521             self.assertEqual(dupsock.getsockname(), conn.laddr)
  1522             self.assertNotEqual(sock.fileno(), dupsock.fileno())
  1523         finally:
  1524             sock.close()
  1525             dupsock.close()
  1527     def test_get_connections_all(self):
  1528         tcp_template = "import socket;" \
  1529                        "s = socket.socket($family, socket.SOCK_STREAM);" \
  1530                        "s.bind(('$addr', 0));" \
  1531                        "s.listen(1);" \
  1532                        "conn, addr = s.accept();"
  1534         udp_template = "import socket, time;" \
  1535                        "s = socket.socket($family, socket.SOCK_DGRAM);" \
  1536                        "s.bind(('$addr', 0));" \
  1537                        "time.sleep(2);"
  1539         from string import Template
  1540         tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET,
  1541                                                           addr="127.0.0.1")
  1542         udp4_template = Template(udp_template).substitute(family=socket.AF_INET,
  1543                                                           addr="127.0.0.1")
  1544         tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6,
  1545                                                           addr="::1")
  1546         udp6_template = Template(udp_template).substitute(family=socket.AF_INET6,
  1547                                                           addr="::1")
  1549         # launch various subprocess instantiating a socket of various
  1550         # families and types to enrich psutil results
  1551         tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template])
  1552         udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template])
  1553         if supports_ipv6():
  1554             tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template])
  1555             udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template])
  1556         else:
  1557             tcp6_proc = None
  1558             udp6_proc = None
  1560         # check matches against subprocesses just created
  1561         all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4", "tcp6",
  1562                      "udp", "udp4", "udp6")
  1563         for p in psutil.Process(os.getpid()).get_children():
  1564             for conn in p.get_connections():
  1565                 # TCP v4
  1566                 if p.pid == tcp4_proc.pid:
  1567                     self.assertEqual(conn.family, socket.AF_INET)
  1568                     self.assertEqual(conn.type, socket.SOCK_STREAM)
  1569                     self.assertEqual(conn.laddr[0], "127.0.0.1")
  1570                     self.assertEqual(conn.raddr, ())
  1571                     self.assertEqual(conn.status, psutil.CONN_LISTEN,
  1572                                     str(conn.status))
  1573                     for kind in all_kinds:
  1574                         cons = p.get_connections(kind=kind)
  1575                         if kind in ("all", "inet", "inet4", "tcp", "tcp4"):
  1576                             assert cons != [], cons
  1577                         else:
  1578                             self.assertEqual(cons, [], cons)
  1579                 # UDP v4
  1580                 elif p.pid == udp4_proc.pid:
  1581                     self.assertEqual(conn.family, socket.AF_INET)
  1582                     self.assertEqual(conn.type, socket.SOCK_DGRAM)
  1583                     self.assertEqual(conn.laddr[0], "127.0.0.1")
  1584                     self.assertEqual(conn.raddr, ())
  1585                     self.assertEqual(conn.status, psutil.CONN_NONE,
  1586                                      str(conn.status))
  1587                     for kind in all_kinds:
  1588                         cons = p.get_connections(kind=kind)
  1589                         if kind in ("all", "inet", "inet4", "udp", "udp4"):
  1590                             assert cons != [], cons
  1591                         else:
  1592                             self.assertEqual(cons, [], cons)
  1593                 # TCP v6
  1594                 elif p.pid == getattr(tcp6_proc, "pid", None):
  1595                     self.assertEqual(conn.family, socket.AF_INET6)
  1596                     self.assertEqual(conn.type, socket.SOCK_STREAM)
  1597                     self.assertIn(conn.laddr[0], ("::", "::1"))
  1598                     self.assertEqual(conn.raddr, ())
  1599                     self.assertEqual(conn.status, psutil.CONN_LISTEN,
  1600                                      str(conn.status))
  1601                     for kind in all_kinds:
  1602                         cons = p.get_connections(kind=kind)
  1603                         if kind in ("all", "inet", "inet6", "tcp", "tcp6"):
  1604                             assert cons != [], cons
  1605                         else:
  1606                             self.assertEqual(cons, [], cons)
  1607                 # UDP v6
  1608                 elif p.pid == getattr(udp6_proc, "pid", None):
  1609                     self.assertEqual(conn.family, socket.AF_INET6)
  1610                     self.assertEqual(conn.type, socket.SOCK_DGRAM)
  1611                     self.assertIn(conn.laddr[0], ("::", "::1"))
  1612                     self.assertEqual(conn.raddr, ())
  1613                     self.assertEqual(conn.status, psutil.CONN_NONE,
  1614                                      str(conn.status))
  1615                     for kind in all_kinds:
  1616                         cons = p.get_connections(kind=kind)
  1617                         if kind in ("all", "inet", "inet6", "udp", "udp6"):
  1618                             assert cons != [], cons
  1619                         else:
  1620                             self.assertEqual(cons, [], cons)
  1622     @unittest.skipUnless(POSIX, 'posix only')
  1623     def test_get_num_fds(self):
  1624         p = psutil.Process(os.getpid())
  1625         start = p.get_num_fds()
  1626         file = open(TESTFN, 'w')
  1627         self.assertEqual(p.get_num_fds(), start + 1)
  1628         sock = socket.socket()
  1629         self.assertEqual(p.get_num_fds(), start + 2)
  1630         file.close()
  1631         sock.close()
  1632         self.assertEqual(p.get_num_fds(), start)
  1634     @skip_on_not_implemented(only_if=LINUX)
  1635     def test_get_num_ctx_switches(self):
  1636         p = psutil.Process(os.getpid())
  1637         before = sum(p.get_num_ctx_switches())
  1638         for x in range(500000):
  1639             after = sum(p.get_num_ctx_switches())
  1640             if after > before:
  1641                 return
  1642         self.fail("num ctx switches still the same after 50.000 iterations")
  1644     def test_parent_ppid(self):
  1645         this_parent = os.getpid()
  1646         sproc = get_test_subprocess()
  1647         p = psutil.Process(sproc.pid)
  1648         self.assertEqual(p.ppid, this_parent)
  1649         self.assertEqual(p.parent.pid, this_parent)
  1650         # no other process is supposed to have us as parent
  1651         for p in psutil.process_iter():
  1652             if p.pid == sproc.pid:
  1653                 continue
  1654             self.assertTrue(p.ppid != this_parent)
  1656     def test_get_children(self):
  1657         p = psutil.Process(os.getpid())
  1658         self.assertEqual(p.get_children(), [])
  1659         self.assertEqual(p.get_children(recursive=True), [])
  1660         sproc = get_test_subprocess()
  1661         children1 = p.get_children()
  1662         children2 = p.get_children(recursive=True)
  1663         for children in (children1, children2):
  1664             self.assertEqual(len(children), 1)
  1665             self.assertEqual(children[0].pid, sproc.pid)
  1666             self.assertEqual(children[0].ppid, os.getpid())
  1668     def test_get_children_recursive(self):
  1669         # here we create a subprocess which creates another one as in:
  1670         # A (parent) -> B (child) -> C (grandchild)
  1671         s =  "import subprocess, os, sys, time;"
  1672         s += "PYTHON = os.path.realpath(sys.executable);"
  1673         s += "cmd = [PYTHON, '-c', 'import time; time.sleep(2);'];"
  1674         s += "subprocess.Popen(cmd);"
  1675         s += "time.sleep(2);"
  1676         get_test_subprocess(cmd=[PYTHON, "-c", s])
  1677         p = psutil.Process(os.getpid())
  1678         self.assertEqual(len(p.get_children(recursive=False)), 1)
  1679         # give the grandchild some time to start
  1680         stop_at = time.time() + 1.5
  1681         while time.time() < stop_at:
  1682             children = p.get_children(recursive=True)
  1683             if len(children) > 1:
  1684                 break
  1685         self.assertEqual(len(children), 2)
  1686         self.assertEqual(children[0].ppid, os.getpid())
  1687         self.assertEqual(children[1].ppid, children[0].pid)
  1689     def test_get_children_duplicates(self):
  1690         # find the process which has the highest number of children
  1691         from psutil._compat import defaultdict
  1692         table = defaultdict(int)
  1693         for p in psutil.process_iter():
  1694             try:
  1695                 table[p.ppid] += 1
  1696             except psutil.Error:
  1697                 pass
  1698         # this is the one, now let's make sure there are no duplicates
  1699         pid = sorted(table.items(), key=lambda x: x[1])[-1][0]
  1700         p = psutil.Process(pid)
  1701         try:
  1702             c = p.get_children(recursive=True)
  1703         except psutil.AccessDenied:  # windows
  1704             pass
  1705         else:
  1706             self.assertEqual(len(c), len(set(c)))
  1708     def test_suspend_resume(self):
  1709         sproc = get_test_subprocess(wait=True)
  1710         p = psutil.Process(sproc.pid)
  1711         p.suspend()
  1712         for x in range(100):
  1713             if p.status == psutil.STATUS_STOPPED:
  1714                 break
  1715             time.sleep(0.01)
  1716         self.assertEqual(str(p.status), "stopped")
  1717         p.resume()
  1718         assert p.status != psutil.STATUS_STOPPED, p.status
  1720     def test_invalid_pid(self):
  1721         self.assertRaises(TypeError, psutil.Process, "1")
  1722         self.assertRaises(TypeError, psutil.Process, None)
  1723         self.assertRaises(ValueError, psutil.Process, -1)
  1725     def test_as_dict(self):
  1726         sproc = get_test_subprocess()
  1727         p = psutil.Process(sproc.pid)
  1728         d = p.as_dict()
  1729         try:
  1730             import json
  1731         except ImportError:
  1732             pass
  1733         else:
  1734             # dict is supposed to be hashable
  1735             json.dumps(d)
  1737         d = p.as_dict(attrs=['exe', 'name'])
  1738         self.assertEqual(sorted(d.keys()), ['exe', 'name'])
  1740         p = psutil.Process(min(psutil.get_pid_list()))
  1741         d = p.as_dict(attrs=['get_connections'], ad_value='foo')
  1742         if not isinstance(d['connections'], list):
  1743             self.assertEqual(d['connections'], 'foo')
  1745     def test_zombie_process(self):
  1746         # Test that NoSuchProcess exception gets raised in case the
  1747         # process dies after we create the Process object.
  1748         # Example:
  1749         #  >>> proc = Process(1234)
  1750         #  >>> time.sleep(2)  # time-consuming task, process dies in meantime
  1751         #  >>> proc.name
  1752         # Refers to Issue #15
  1753         sproc = get_test_subprocess()
  1754         p = psutil.Process(sproc.pid)
  1755         p.kill()
  1756         p.wait()
  1758         for name in dir(p):
  1759             if name.startswith('_')\
  1760             or name in ('pid', 'send_signal', 'is_running', 'set_ionice',
  1761                         'wait', 'set_cpu_affinity', 'create_time', 'set_nice',
  1762                         'nice'):
  1763                 continue
  1764             try:
  1765                 meth = getattr(p, name)
  1766                 if callable(meth):
  1767                     meth()
  1768             except psutil.NoSuchProcess:
  1769                 pass
  1770             except NotImplementedError:
  1771                 pass
  1772             else:
  1773                 self.fail("NoSuchProcess exception not raised for %r" % name)
  1775         # other methods
  1776         try:
  1777             if os.name == 'posix':
  1778                 p.set_nice(1)
  1779             else:
  1780                 p.set_nice(psutil.NORMAL_PRIORITY_CLASS)
  1781         except psutil.NoSuchProcess:
  1782             pass
  1783         else:
  1784             self.fail("exception not raised")
  1785         if hasattr(p, 'set_ionice'):
  1786             self.assertRaises(psutil.NoSuchProcess, p.set_ionice, 2)
  1787         self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM)
  1788         self.assertRaises(psutil.NoSuchProcess, p.set_nice, 0)
  1789         self.assertFalse(p.is_running())
  1790         if hasattr(p, "set_cpu_affinity"):
  1791             self.assertRaises(psutil.NoSuchProcess, p.set_cpu_affinity, [0])
  1793     def test__str__(self):
  1794         sproc = get_test_subprocess()
  1795         p = psutil.Process(sproc.pid)
  1796         self.assertIn(str(sproc.pid), str(p))
  1797         # python shows up as 'Python' in cmdline on OS X so test fails on OS X
  1798         if not OSX:
  1799             self.assertIn(os.path.basename(PYTHON), str(p))
  1800         sproc = get_test_subprocess()
  1801         p = psutil.Process(sproc.pid)
  1802         p.kill()
  1803         p.wait()
  1804         self.assertIn(str(sproc.pid), str(p))
  1805         self.assertIn("terminated", str(p))
  1807     @unittest.skipIf(LINUX, 'PID 0 not available on Linux')
  1808     def test_pid_0(self):
  1809         # Process(0) is supposed to work on all platforms except Linux
  1810         p = psutil.Process(0)
  1811         self.assertTrue(p.name)
  1813         if os.name == 'posix':
  1814             try:
  1815                 self.assertEqual(p.uids.real, 0)
  1816                 self.assertEqual(p.gids.real, 0)
  1817             except psutil.AccessDenied:
  1818                 pass
  1820         self.assertIn(p.ppid, (0, 1))
  1821         #self.assertEqual(p.exe, "")
  1822         p.cmdline
  1823         try:
  1824             p.get_num_threads()
  1825         except psutil.AccessDenied:
  1826             pass
  1828         try:
  1829             p.get_memory_info()
  1830         except psutil.AccessDenied:
  1831             pass
  1833         # username property
  1834         try:
  1835             if POSIX:
  1836                 self.assertEqual(p.username, 'root')
  1837             elif WINDOWS:
  1838                 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM')
  1839             else:
  1840                 p.username
  1841         except psutil.AccessDenied:
  1842             pass
  1844         self.assertIn(0, psutil.get_pid_list())
  1845         self.assertTrue(psutil.pid_exists(0))
  1847     def test__all__(self):
  1848         for name in dir(psutil):
  1849             if name in ('callable', 'defaultdict', 'error', 'namedtuple',
  1850                         'test'):
  1851                 continue
  1852             if not name.startswith('_'):
  1853                 try:
  1854                     __import__(name)
  1855                 except ImportError:
  1856                     if name not in psutil.__all__:
  1857                         fun = getattr(psutil, name)
  1858                         if fun is None:
  1859                             continue
  1860                         if 'deprecated' not in fun.__doc__.lower():
  1861                             self.fail('%r not in psutil.__all__' % name)
  1863     def test_Popen(self):
  1864         # Popen class test
  1865         # XXX this test causes a ResourceWarning on Python 3 because
  1866         # psutil.__subproc instance doesn't get propertly freed.
  1867         # Not sure what to do though.
  1868         cmd = [PYTHON, "-c", "import time; time.sleep(2);"]
  1869         proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  1870         try:
  1871             proc.name
  1872             proc.stdin
  1873             self.assertTrue(hasattr(proc, 'name'))
  1874             self.assertTrue(hasattr(proc, 'stdin'))
  1875             self.assertRaises(AttributeError, getattr, proc, 'foo')
  1876         finally:
  1877             proc.kill()
  1878             proc.wait()
  1881 # ===================================================================
  1882 # --- Featch all processes test
  1883 # ===================================================================
  1885 class TestFetchAllProcesses(unittest.TestCase):
  1886     # Iterates over all running processes and performs some sanity
  1887     # checks against Process API's returned values.
  1889     def setUp(self):
  1890         if POSIX:
  1891             import pwd
  1892             pall = pwd.getpwall()
  1893             self._uids = set([x.pw_uid for x in pall])
  1894             self._usernames = set([x.pw_name for x in pall])
  1896     def test_fetch_all(self):
  1897         valid_procs = 0
  1898         excluded_names = ['send_signal', 'suspend', 'resume', 'terminate',
  1899                           'kill', 'wait', 'as_dict', 'get_cpu_percent', 'nice',
  1900                           'parent', 'get_children', 'pid']
  1901         attrs = []
  1902         for name in dir(psutil.Process):
  1903             if name.startswith("_"):
  1904                 continue
  1905             if name.startswith("set_"):
  1906                 continue
  1907             if name in excluded_names:
  1908                 continue
  1909             attrs.append(name)
  1911         default = object()
  1912         failures = []
  1913         for name in attrs:
  1914             for p in psutil.process_iter():
  1915                 ret = default
  1916                 try:
  1917                     try:
  1918                         attr = getattr(p, name, None)
  1919                         if attr is not None and callable(attr):
  1920                             ret = attr()
  1921                         else:
  1922                             ret = attr
  1923                         valid_procs += 1
  1924                     except NotImplementedError:
  1925                         register_warning("%r was skipped because not "
  1926                             "implemented" % (self.__class__.__name__ + \
  1927                                              '.test_' + name))
  1928                     except (psutil.NoSuchProcess, psutil.AccessDenied):
  1929                         err = sys.exc_info()[1]
  1930                         if isinstance(err, psutil.NoSuchProcess):
  1931                             if psutil.pid_exists(p.pid):
  1932                                 # XXX race condition; we probably need
  1933                                 # to try figuring out the process
  1934                                 # identity before failing
  1935                                 self.fail("PID still exists but fun raised " \
  1936                                           "NoSuchProcess")
  1937                         self.assertEqual(err.pid, p.pid)
  1938                         if err.name:
  1939                             # make sure exception's name attr is set
  1940                             # with the actual process name
  1941                             self.assertEqual(err.name, p.name)
  1942                         self.assertTrue(str(err))
  1943                         self.assertTrue(err.msg)
  1944                     else:
  1945                         if ret not in (0, 0.0, [], None, ''):
  1946                             assert ret, ret
  1947                         meth = getattr(self, name)
  1948                         meth(ret)
  1949                 except Exception:
  1950                     err = sys.exc_info()[1]
  1951                     s = '\n' + '=' * 70 + '\n'
  1952                     s += "FAIL: test_%s (proc=%s" % (name, p)
  1953                     if ret != default:
  1954                         s += ", ret=%s)" % repr(ret)
  1955                     s += ')\n'
  1956                     s += '-' * 70
  1957                     s += "\n%s" % traceback.format_exc()
  1958                     s =  "\n".join((" " * 4) + i for i in s.splitlines())
  1959                     failures.append(s)
  1960                     break
  1962         if failures:
  1963             self.fail(''.join(failures))
  1965         # we should always have a non-empty list, not including PID 0 etc.
  1966         # special cases.
  1967         self.assertTrue(valid_procs > 0)
  1969     def cmdline(self, ret):
  1970         pass
  1972     def exe(self, ret):
  1973         if not ret:
  1974             self.assertEqual(ret, '')
  1975         else:
  1976             assert os.path.isabs(ret), ret
  1977             # Note: os.stat() may return False even if the file is there
  1978             # hence we skip the test, see:
  1979             # http://stackoverflow.com/questions/3112546/os-path-exists-lies
  1980             if POSIX:
  1981                 assert os.path.isfile(ret), ret
  1982                 if hasattr(os, 'access') and hasattr(os, "X_OK"):
  1983                     # XXX may fail on OSX
  1984                     self.assertTrue(os.access(ret, os.X_OK))
  1986     def ppid(self, ret):
  1987         self.assertTrue(ret >= 0)
  1989     def name(self, ret):
  1990         self.assertTrue(isinstance(ret, str))
  1991         self.assertTrue(ret)
  1993     def create_time(self, ret):
  1994         self.assertTrue(ret > 0)
  1995         # this can't be taken for granted on all platforms
  1996         #self.assertGreaterEqual(ret, psutil.BOOT_TIME)
  1997         # make sure returned value can be pretty printed
  1998         # with strftime
  1999         time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret))
  2001     def uids(self, ret):
  2002         for uid in ret:
  2003             self.assertTrue(uid >= 0)
  2004             self.assertIn(uid, self._uids)
  2006     def gids(self, ret):
  2007         # note: testing all gids as above seems not to be reliable for
  2008         # gid == 30 (nodoby); not sure why.
  2009         for gid in ret:
  2010             self.assertTrue(gid >= 0)
  2011             #self.assertIn(uid, self.gids)
  2013     def username(self, ret):
  2014         self.assertTrue(ret)
  2015         if os.name == 'posix':
  2016             self.assertIn(ret, self._usernames)
  2018     def status(self, ret):
  2019         self.assertTrue(ret >= 0)
  2020         self.assertTrue(str(ret) != '?')
  2022     def get_io_counters(self, ret):
  2023         for field in ret:
  2024             if field != -1:
  2025                 self.assertTrue(field >= 0)
  2027     def get_ionice(self, ret):
  2028         if LINUX:
  2029             self.assertTrue(ret.ioclass >= 0)
  2030             self.assertTrue(ret.value >= 0)
  2031         else:
  2032             self.assertTrue(ret >= 0)
  2033             self.assertIn(ret, (0, 1, 2))
  2035     def get_num_threads(self, ret):
  2036         self.assertTrue(ret >= 1)
  2038     def get_threads(self, ret):
  2039         for t in ret:
  2040             self.assertTrue(t.id >= 0)
  2041             self.assertTrue(t.user_time >= 0)
  2042             self.assertTrue(t.system_time >= 0)
  2044     def get_cpu_times(self, ret):
  2045         self.assertTrue(ret.user >= 0)
  2046         self.assertTrue(ret.system >= 0)
  2048     def get_memory_info(self, ret):
  2049         self.assertTrue(ret.rss >= 0)
  2050         self.assertTrue(ret.vms >= 0)
  2052     def get_ext_memory_info(self, ret):
  2053         for name in ret._fields:
  2054             self.assertTrue(getattr(ret, name) >= 0)
  2055         if POSIX and ret.vms != 0:
  2056             # VMS is always supposed to be the highest
  2057             for name in ret._fields:
  2058                 if name != 'vms':
  2059                     value = getattr(ret, name)
  2060                     assert ret.vms > value, ret
  2061         elif WINDOWS:
  2062             assert ret.peak_wset >= ret.wset, ret
  2063             assert ret.peak_paged_pool >= ret.paged_pool, ret
  2064             assert ret.peak_nonpaged_pool >= ret.nonpaged_pool, ret
  2065             assert ret.peak_pagefile >= ret.pagefile, ret
  2067     def get_open_files(self, ret):
  2068         for f in ret:
  2069             if WINDOWS:
  2070                 assert f.fd == -1, f
  2071             else:
  2072                 self.assertIsInstance(f.fd, int)
  2073             assert os.path.isabs(f.path), f
  2074             assert os.path.isfile(f.path), f
  2076     def get_num_fds(self, ret):
  2077         self.assertTrue(ret >= 0)
  2079     def get_connections(self, ret):
  2080         # all values are supposed to match Linux's tcp_states.h states
  2081         # table across all platforms.
  2082         valid_conn_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1",
  2083                              "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT",
  2084                              "LAST_ACK", "LISTEN", "CLOSING", "NONE"]
  2085         if SUNOS:
  2086             valid_conn_states += ["IDLE", "BOUND"]
  2087         if WINDOWS:
  2088             valid_conn_states += ["DELETE_TCB"]
  2089         for conn in ret:
  2090             self.assertIn(conn.type, (socket.SOCK_STREAM, socket.SOCK_DGRAM))
  2091             self.assertIn(conn.family, (socket.AF_INET, socket.AF_INET6))
  2092             check_ip_address(conn.laddr, conn.family)
  2093             check_ip_address(conn.raddr, conn.family)
  2094             if conn.status not in valid_conn_states:
  2095                 self.fail("%s is not a valid status" % conn.status)
  2096             # actually try to bind the local socket; ignore IPv6
  2097             # sockets as their address might be represented as
  2098             # an IPv4-mapped-address (e.g. "::127.0.0.1")
  2099             # and that's rejected by bind()
  2100             if conn.family == socket.AF_INET:
  2101                 s = socket.socket(conn.family, conn.type)
  2102                 s.bind((conn.laddr[0], 0))
  2103                 s.close()
  2105             if not WINDOWS and hasattr(socket, 'fromfd'):
  2106                 dupsock = None
  2107                 try:
  2108                     try:
  2109                         dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
  2110                     except (socket.error, OSError):
  2111                         err = sys.exc_info()[1]
  2112                         if err.args[0] == errno.EBADF:
  2113                             continue
  2114                         raise
  2115                     # python >= 2.5
  2116                     if hasattr(dupsock, "family"):
  2117                         self.assertEqual(dupsock.family, conn.family)
  2118                         self.assertEqual(dupsock.type, conn.type)
  2119                 finally:
  2120                     if dupsock is not None:
  2121                         dupsock.close()
  2123     def getcwd(self, ret):
  2124         if ret is not None:  # BSD may return None
  2125             assert os.path.isabs(ret), ret
  2126             try:
  2127                 st = os.stat(ret)
  2128             except OSError:
  2129                 err = sys.exc_info()[1]
  2130                 # directory has been removed in mean time
  2131                 if err.errno != errno.ENOENT:
  2132                     raise
  2133             else:
  2134                 self.assertTrue(stat.S_ISDIR(st.st_mode))
  2136     def get_memory_percent(self, ret):
  2137         assert 0 <= ret <= 100, ret
  2139     def is_running(self, ret):
  2140         self.assertTrue(ret)
  2142     def get_cpu_affinity(self, ret):
  2143         assert ret != [], ret
  2145     def terminal(self, ret):
  2146         if ret is not None:
  2147             assert os.path.isabs(ret), ret
  2148             assert os.path.exists(ret), ret
  2150     def get_memory_maps(self, ret):
  2151         for nt in ret:
  2152             for fname in nt._fields:
  2153                 value = getattr(nt, fname)
  2154                 if fname == 'path':
  2155                     if not value.startswith('['):
  2156                         assert os.path.isabs(nt.path), nt.path
  2157                         # commented as on Linux we might get '/foo/bar (deleted)'
  2158                         #assert os.path.exists(nt.path), nt.path
  2159                 elif fname in ('addr', 'perms'):
  2160                     self.assertTrue(value)
  2161                 else:
  2162                     self.assertIsInstance(value, (int, long))
  2163                     assert value >= 0, value
  2165     def get_num_handles(self, ret):
  2166         if WINDOWS:
  2167             self.assertGreaterEqual(ret, 0)
  2168         else:
  2169             self.assertGreaterEqual(ret, 0)
  2171     def get_nice(self, ret):
  2172         if POSIX:
  2173             assert -20 <= ret <= 20, ret
  2174         else:
  2175             priorities = [getattr(psutil, x) for x in dir(psutil)
  2176                           if x.endswith('_PRIORITY_CLASS')]
  2177             self.assertIn(ret, priorities)
  2179     def get_num_ctx_switches(self, ret):
  2180         self.assertTrue(ret.voluntary >= 0)
  2181         self.assertTrue(ret.involuntary >= 0)
  2184 # ===================================================================
  2185 # --- Limited user tests
  2186 # ===================================================================
  2188 if hasattr(os, 'getuid') and os.getuid() == 0:
  2189     class LimitedUserTestCase(TestProcess):
  2190         """Repeat the previous tests by using a limited user.
  2191         Executed only on UNIX and only if the user who run the test script
  2192         is root.
  2193         """
  2194         # the uid/gid the test suite runs under
  2195         PROCESS_UID = os.getuid()
  2196         PROCESS_GID = os.getgid()
  2198         def __init__(self, *args, **kwargs):
  2199             TestProcess.__init__(self, *args, **kwargs)
  2200             # re-define all existent test methods in order to
  2201             # ignore AccessDenied exceptions
  2202             for attr in [x for x in dir(self) if x.startswith('test')]:
  2203                 meth = getattr(self, attr)
  2204                 def test_(self):
  2205                     try:
  2206                         meth()
  2207                     except psutil.AccessDenied:
  2208                         pass
  2209                 setattr(self, attr, types.MethodType(test_, self))
  2211         def setUp(self):
  2212             os.setegid(1000)
  2213             os.seteuid(1000)
  2214             TestProcess.setUp(self)
  2216         def tearDown(self):
  2217             os.setegid(self.PROCESS_UID)
  2218             os.seteuid(self.PROCESS_GID)
  2219             TestProcess.tearDown(self)
  2221         def test_nice(self):
  2222             try:
  2223                 psutil.Process(os.getpid()).set_nice(-1)
  2224             except psutil.AccessDenied:
  2225                 pass
  2226             else:
  2227                 self.fail("exception not raised")
  2230 # ===================================================================
  2231 # --- Example script tests
  2232 # ===================================================================
  2234 class TestExampleScripts(unittest.TestCase):
  2235     """Tests for scripts in the examples directory."""
  2237     def assert_stdout(self, exe, args=None):
  2238         exe = os.path.join(EXAMPLES_DIR, exe)
  2239         if args:
  2240             exe = exe + ' ' + args
  2241         try:
  2242             out = sh(sys.executable + ' ' + exe).strip()
  2243         except RuntimeError:
  2244             err = sys.exc_info()[1]
  2245             if 'AccessDenied' in str(err):
  2246                 return str(err)
  2247             else:
  2248                 raise
  2249         assert out, out
  2250         return out
  2252     def assert_syntax(self, exe, args=None):
  2253         exe = os.path.join(EXAMPLES_DIR, exe)
  2254         f = open(exe, 'r')
  2255         try:
  2256             src = f.read()
  2257         finally:
  2258             f.close()
  2259         ast.parse(src)
  2261     def test_check_presence(self):
  2262         # make sure all example scripts have a test method defined
  2263         meths = dir(self)
  2264         for name in os.listdir(EXAMPLES_DIR):
  2265             if name.endswith('.py'):
  2266                 if 'test_' + os.path.splitext(name)[0] not in meths:
  2267                     #self.assert_stdout(name)
  2268                     self.fail('no test defined for %r script' \
  2269                               % os.path.join(EXAMPLES_DIR, name))
  2271     def test_disk_usage(self):
  2272         self.assert_stdout('disk_usage.py')
  2274     def test_free(self):
  2275         self.assert_stdout('free.py')
  2277     def test_meminfo(self):
  2278         self.assert_stdout('meminfo.py')
  2280     def test_process_detail(self):
  2281         self.assert_stdout('process_detail.py')
  2283     def test_who(self):
  2284         self.assert_stdout('who.py')
  2286     def test_netstat(self):
  2287         self.assert_stdout('netstat.py')
  2289     def test_pmap(self):
  2290         self.assert_stdout('pmap.py', args=str(os.getpid()))
  2292     @unittest.skipIf(ast is None,
  2293                      'ast module not available on this python version')
  2294     def test_killall(self):
  2295         self.assert_syntax('killall.py')
  2297     @unittest.skipIf(ast is None,
  2298                      'ast module not available on this python version')
  2299     def test_nettop(self):
  2300         self.assert_syntax('nettop.py')
  2302     @unittest.skipIf(ast is None,
  2303                      'ast module not available on this python version')
  2304     def test_top(self):
  2305         self.assert_syntax('top.py')
  2307     @unittest.skipIf(ast is None,
  2308                      'ast module not available on this python version')
  2309     def test_iotop(self):
  2310         self.assert_syntax('iotop.py')
  2313 def cleanup():
  2314     reap_children(search_all=True)
  2315     DEVNULL.close()
  2316     safe_remove(TESTFN)
  2318 atexit.register(cleanup)
  2319 safe_remove(TESTFN)
  2321 def test_main():
  2322     tests = []
  2323     test_suite = unittest.TestSuite()
  2324     tests.append(TestSystemAPIs)
  2325     tests.append(TestProcess)
  2326     tests.append(TestFetchAllProcesses)
  2328     if POSIX:
  2329         from _posix import PosixSpecificTestCase
  2330         tests.append(PosixSpecificTestCase)
  2332     # import the specific platform test suite
  2333     if LINUX:
  2334         from _linux import LinuxSpecificTestCase as stc
  2335     elif WINDOWS:
  2336         from _windows import WindowsSpecificTestCase as stc
  2337         from _windows import TestDualProcessImplementation
  2338         tests.append(TestDualProcessImplementation)
  2339     elif OSX:
  2340         from _osx import OSXSpecificTestCase as stc
  2341     elif BSD:
  2342         from _bsd import BSDSpecificTestCase as stc
  2343     elif SUNOS:
  2344         from _sunos import SunOSSpecificTestCase as stc
  2345     tests.append(stc)
  2347     if hasattr(os, 'getuid'):
  2348         if 'LimitedUserTestCase' in globals():
  2349             tests.append(LimitedUserTestCase)
  2350         else:
  2351             register_warning("LimitedUserTestCase was skipped (super-user "
  2352                              "privileges are required)")
  2354     tests.append(TestExampleScripts)
  2356     for test_class in tests:
  2357         test_suite.addTest(unittest.makeSuite(test_class))
  2358     result = unittest.TextTestRunner(verbosity=2).run(test_suite)
  2359     return result.wasSuccessful()
  2361 if __name__ == '__main__':
  2362     if not test_main():
  2363         sys.exit(1)

mercurial