python/psutil/test/_posix.py

Thu, 22 Jan 2015 13:21:57 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Thu, 22 Jan 2015 13:21:57 +0100
branch
TOR_BUG_9701
changeset 15
b8a032363ba2
permissions
-rwxr-xr-x

Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6

     1 #!/usr/bin/env python
     3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
     4 # Use of this source code is governed by a BSD-style license that can be
     5 # found in the LICENSE file.
     7 """POSIX specific tests.  These are implicitly run by test_psutil.py."""
     9 import unittest
    10 import subprocess
    11 import time
    12 import sys
    13 import os
    14 import datetime
    16 import psutil
    18 from psutil._compat import PY3
    19 from test_psutil import *
    22 def ps(cmd):
    23     """Expects a ps command with a -o argument and parse the result
    24     returning only the value of interest.
    25     """
    26     if not LINUX:
    27         cmd = cmd.replace(" --no-headers ", " ")
    28     if SUNOS:
    29         cmd = cmd.replace("-o command", "-o comm")
    30         cmd = cmd.replace("-o start", "-o stime")
    31     p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE)
    32     output = p.communicate()[0].strip()
    33     if PY3:
    34         output = str(output, sys.stdout.encoding)
    35     if not LINUX:
    36         output = output.split('\n')[1].strip()
    37     try:
    38         return int(output)
    39     except ValueError:
    40         return output
    43 class PosixSpecificTestCase(unittest.TestCase):
    44     """Compare psutil results against 'ps' command line utility."""
    46     # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps
    48     def setUp(self):
    49         self.pid = get_test_subprocess([PYTHON, "-E", "-O"],
    50                                        stdin=subprocess.PIPE).pid
    52     def tearDown(self):
    53         reap_children()
    55     def test_process_parent_pid(self):
    56         ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid)
    57         ppid_psutil = psutil.Process(self.pid).ppid
    58         self.assertEqual(ppid_ps, ppid_psutil)
    60     def test_process_uid(self):
    61         uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid)
    62         uid_psutil = psutil.Process(self.pid).uids.real
    63         self.assertEqual(uid_ps, uid_psutil)
    65     def test_process_gid(self):
    66         gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid)
    67         gid_psutil = psutil.Process(self.pid).gids.real
    68         self.assertEqual(gid_ps, gid_psutil)
    70     def test_process_username(self):
    71         username_ps = ps("ps --no-headers -o user -p %s" %self.pid)
    72         username_psutil = psutil.Process(self.pid).username
    73         self.assertEqual(username_ps, username_psutil)
    75     @skip_on_access_denied()
    76     def test_process_rss_memory(self):
    77         # give python interpreter some time to properly initialize
    78         # so that the results are the same
    79         time.sleep(0.1)
    80         rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid)
    81         rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024
    82         self.assertEqual(rss_ps, rss_psutil)
    84     @skip_on_access_denied()
    85     def test_process_vsz_memory(self):
    86         # give python interpreter some time to properly initialize
    87         # so that the results are the same
    88         time.sleep(0.1)
    89         vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid)
    90         vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024
    91         self.assertEqual(vsz_ps, vsz_psutil)
    93     def test_process_name(self):
    94         # use command + arg since "comm" keyword not supported on all platforms
    95         name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
    96         # remove path if there is any, from the command
    97         name_ps = os.path.basename(name_ps).lower()
    98         name_psutil = psutil.Process(self.pid).name.lower()
    99         self.assertEqual(name_ps, name_psutil)
   101     @unittest.skipIf(OSX or BSD,
   102                     'ps -o start not available')
   103     def test_process_create_time(self):
   104         time_ps = ps("ps --no-headers -o start -p %s" %self.pid).split(' ')[0]
   105         time_psutil = psutil.Process(self.pid).create_time
   106         if SUNOS:
   107             time_psutil = round(time_psutil)
   108         time_psutil_tstamp = datetime.datetime.fromtimestamp(
   109                         time_psutil).strftime("%H:%M:%S")
   110         self.assertEqual(time_ps, time_psutil_tstamp)
   112     def test_process_exe(self):
   113         ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
   114         psutil_pathname = psutil.Process(self.pid).exe
   115         try:
   116             self.assertEqual(ps_pathname, psutil_pathname)
   117         except AssertionError:
   118             # certain platforms such as BSD are more accurate returning:
   119             # "/usr/local/bin/python2.7"
   120             # ...instead of:
   121             # "/usr/local/bin/python"
   122             # We do not want to consider this difference in accuracy
   123             # an error.
   124             adjusted_ps_pathname = ps_pathname[:len(ps_pathname)]
   125             self.assertEqual(ps_pathname, adjusted_ps_pathname)
   127     def test_process_cmdline(self):
   128         ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid)
   129         psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline)
   130         if SUNOS:
   131             # ps on Solaris only shows the first part of the cmdline
   132             psutil_cmdline = psutil_cmdline.split(" ")[0]
   133         self.assertEqual(ps_cmdline, psutil_cmdline)
   135     @retry_before_failing()
   136     def test_get_pids(self):
   137         # Note: this test might fail if the OS is starting/killing
   138         # other processes in the meantime
   139         if SUNOS:
   140             cmd = ["ps", "ax"]
   141         else:
   142             cmd = ["ps", "ax", "-o", "pid"]
   143         p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
   144         output = p.communicate()[0].strip()
   145         if PY3:
   146             output = str(output, sys.stdout.encoding)
   147         pids_ps = []
   148         for line in output.split('\n')[1:]:
   149             if line:
   150                 pid = int(line.split()[0].strip())
   151                 pids_ps.append(pid)
   152         # remove ps subprocess pid which is supposed to be dead in meantime
   153         pids_ps.remove(p.pid)
   154         pids_psutil = psutil.get_pid_list()
   155         pids_ps.sort()
   156         pids_psutil.sort()
   158         # on OSX ps doesn't show pid 0
   159         if OSX and 0 not in pids_ps:
   160             pids_ps.insert(0, 0)
   162         if pids_ps != pids_psutil:
   163             difference = [x for x in pids_psutil if x not in pids_ps] + \
   164                          [x for x in pids_ps if x not in pids_psutil]
   165             self.fail("difference: " + str(difference))
   167     # for some reason ifconfig -a does not report differente interfaces
   168     # psutil does
   169     @unittest.skipIf(SUNOS, "test not reliable on SUNOS")
   170     def test_nic_names(self):
   171         p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE)
   172         output = p.communicate()[0].strip()
   173         if PY3:
   174             output = str(output, sys.stdout.encoding)
   175         for nic in psutil.net_io_counters(pernic=True).keys():
   176             for line in output.split():
   177                 if line.startswith(nic):
   178                     break
   179             else:
   180                 self.fail("couldn't find %s nic in 'ifconfig -a' output" % nic)
   182     def test_get_users(self):
   183         out = sh("who")
   184         lines = out.split('\n')
   185         users = [x.split()[0] for x in lines]
   186         self.assertEqual(len(users), len(psutil.get_users()))
   187         terminals = [x.split()[1] for x in lines]
   188         for u in psutil.get_users():
   189             self.assertTrue(u.name in users, u.name)
   190             self.assertTrue(u.terminal in terminals, u.terminal)
   192     def test_fds_open(self):
   193         # Note: this fails from time to time; I'm keen on thinking
   194         # it doesn't mean something is broken
   195         def call(p, attr):
   196             attr = getattr(p, name, None)
   197             if attr is not None and callable(attr):
   198                 ret = attr()
   199             else:
   200                 ret = attr
   202         p = psutil.Process(os.getpid())
   203         attrs = []
   204         failures = []
   205         for name in dir(psutil.Process):
   206             if name.startswith('_') \
   207             or name.startswith('set_') \
   208             or name in ('terminate', 'kill', 'suspend', 'resume', 'nice',
   209                         'send_signal', 'wait', 'get_children', 'as_dict'):
   210                 continue
   211             else:
   212                 try:
   213                     num1 = p.get_num_fds()
   214                     for x in range(2):
   215                         call(p, name)
   216                     num2 = p.get_num_fds()
   217                 except psutil.AccessDenied:
   218                     pass
   219                 else:
   220                     if abs(num2 - num1) > 1:
   221                         fail = "failure while processing Process.%s method " \
   222                                "(before=%s, after=%s)" % (name, num1, num2)
   223                         failures.append(fail)
   224         if failures:
   225             self.fail('\n' + '\n'.join(failures))
   230 def test_main():
   231     test_suite = unittest.TestSuite()
   232     test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase))
   233     result = unittest.TextTestRunner(verbosity=2).run(test_suite)
   234     return result.wasSuccessful()
   236 if __name__ == '__main__':
   237     if not test_main():
   238         sys.exit(1)

mercurial