michael@0: #!/usr/bin/env python michael@0: michael@0: # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. michael@0: # Use of this source code is governed by a BSD-style license that can be michael@0: # found in the LICENSE file. michael@0: michael@0: """POSIX specific tests. These are implicitly run by test_psutil.py.""" michael@0: michael@0: import unittest michael@0: import subprocess michael@0: import time michael@0: import sys michael@0: import os michael@0: import datetime michael@0: michael@0: import psutil michael@0: michael@0: from psutil._compat import PY3 michael@0: from test_psutil import * michael@0: michael@0: michael@0: def ps(cmd): michael@0: """Expects a ps command with a -o argument and parse the result michael@0: returning only the value of interest. michael@0: """ michael@0: if not LINUX: michael@0: cmd = cmd.replace(" --no-headers ", " ") michael@0: if SUNOS: michael@0: cmd = cmd.replace("-o command", "-o comm") michael@0: cmd = cmd.replace("-o start", "-o stime") michael@0: p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE) michael@0: output = p.communicate()[0].strip() michael@0: if PY3: michael@0: output = str(output, sys.stdout.encoding) michael@0: if not LINUX: michael@0: output = output.split('\n')[1].strip() michael@0: try: michael@0: return int(output) michael@0: except ValueError: michael@0: return output michael@0: michael@0: michael@0: class PosixSpecificTestCase(unittest.TestCase): michael@0: """Compare psutil results against 'ps' command line utility.""" michael@0: michael@0: # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps michael@0: michael@0: def setUp(self): michael@0: self.pid = get_test_subprocess([PYTHON, "-E", "-O"], michael@0: stdin=subprocess.PIPE).pid michael@0: michael@0: def tearDown(self): michael@0: reap_children() michael@0: michael@0: def test_process_parent_pid(self): michael@0: ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid) michael@0: ppid_psutil = psutil.Process(self.pid).ppid michael@0: self.assertEqual(ppid_ps, ppid_psutil) michael@0: michael@0: def test_process_uid(self): michael@0: uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid) michael@0: uid_psutil = psutil.Process(self.pid).uids.real michael@0: self.assertEqual(uid_ps, uid_psutil) michael@0: michael@0: def test_process_gid(self): michael@0: gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid) michael@0: gid_psutil = psutil.Process(self.pid).gids.real michael@0: self.assertEqual(gid_ps, gid_psutil) michael@0: michael@0: def test_process_username(self): michael@0: username_ps = ps("ps --no-headers -o user -p %s" %self.pid) michael@0: username_psutil = psutil.Process(self.pid).username michael@0: self.assertEqual(username_ps, username_psutil) michael@0: michael@0: @skip_on_access_denied() michael@0: def test_process_rss_memory(self): michael@0: # give python interpreter some time to properly initialize michael@0: # so that the results are the same michael@0: time.sleep(0.1) michael@0: rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid) michael@0: rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024 michael@0: self.assertEqual(rss_ps, rss_psutil) michael@0: michael@0: @skip_on_access_denied() michael@0: def test_process_vsz_memory(self): michael@0: # give python interpreter some time to properly initialize michael@0: # so that the results are the same michael@0: time.sleep(0.1) michael@0: vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid) michael@0: vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024 michael@0: self.assertEqual(vsz_ps, vsz_psutil) michael@0: michael@0: def test_process_name(self): michael@0: # use command + arg since "comm" keyword not supported on all platforms michael@0: name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0] michael@0: # remove path if there is any, from the command michael@0: name_ps = os.path.basename(name_ps).lower() michael@0: name_psutil = psutil.Process(self.pid).name.lower() michael@0: self.assertEqual(name_ps, name_psutil) michael@0: michael@0: @unittest.skipIf(OSX or BSD, michael@0: 'ps -o start not available') michael@0: def test_process_create_time(self): michael@0: time_ps = ps("ps --no-headers -o start -p %s" %self.pid).split(' ')[0] michael@0: time_psutil = psutil.Process(self.pid).create_time michael@0: if SUNOS: michael@0: time_psutil = round(time_psutil) michael@0: time_psutil_tstamp = datetime.datetime.fromtimestamp( michael@0: time_psutil).strftime("%H:%M:%S") michael@0: self.assertEqual(time_ps, time_psutil_tstamp) michael@0: michael@0: def test_process_exe(self): michael@0: ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0] michael@0: psutil_pathname = psutil.Process(self.pid).exe michael@0: try: michael@0: self.assertEqual(ps_pathname, psutil_pathname) michael@0: except AssertionError: michael@0: # certain platforms such as BSD are more accurate returning: michael@0: # "/usr/local/bin/python2.7" michael@0: # ...instead of: michael@0: # "/usr/local/bin/python" michael@0: # We do not want to consider this difference in accuracy michael@0: # an error. michael@0: adjusted_ps_pathname = ps_pathname[:len(ps_pathname)] michael@0: self.assertEqual(ps_pathname, adjusted_ps_pathname) michael@0: michael@0: def test_process_cmdline(self): michael@0: ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid) michael@0: psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline) michael@0: if SUNOS: michael@0: # ps on Solaris only shows the first part of the cmdline michael@0: psutil_cmdline = psutil_cmdline.split(" ")[0] michael@0: self.assertEqual(ps_cmdline, psutil_cmdline) michael@0: michael@0: @retry_before_failing() michael@0: def test_get_pids(self): michael@0: # Note: this test might fail if the OS is starting/killing michael@0: # other processes in the meantime michael@0: if SUNOS: michael@0: cmd = ["ps", "ax"] michael@0: else: michael@0: cmd = ["ps", "ax", "-o", "pid"] michael@0: p = get_test_subprocess(cmd, stdout=subprocess.PIPE) michael@0: output = p.communicate()[0].strip() michael@0: if PY3: michael@0: output = str(output, sys.stdout.encoding) michael@0: pids_ps = [] michael@0: for line in output.split('\n')[1:]: michael@0: if line: michael@0: pid = int(line.split()[0].strip()) michael@0: pids_ps.append(pid) michael@0: # remove ps subprocess pid which is supposed to be dead in meantime michael@0: pids_ps.remove(p.pid) michael@0: pids_psutil = psutil.get_pid_list() michael@0: pids_ps.sort() michael@0: pids_psutil.sort() michael@0: michael@0: # on OSX ps doesn't show pid 0 michael@0: if OSX and 0 not in pids_ps: michael@0: pids_ps.insert(0, 0) michael@0: michael@0: if pids_ps != pids_psutil: michael@0: difference = [x for x in pids_psutil if x not in pids_ps] + \ michael@0: [x for x in pids_ps if x not in pids_psutil] michael@0: self.fail("difference: " + str(difference)) michael@0: michael@0: # for some reason ifconfig -a does not report differente interfaces michael@0: # psutil does michael@0: @unittest.skipIf(SUNOS, "test not reliable on SUNOS") michael@0: def test_nic_names(self): michael@0: p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE) michael@0: output = p.communicate()[0].strip() michael@0: if PY3: michael@0: output = str(output, sys.stdout.encoding) michael@0: for nic in psutil.net_io_counters(pernic=True).keys(): michael@0: for line in output.split(): michael@0: if line.startswith(nic): michael@0: break michael@0: else: michael@0: self.fail("couldn't find %s nic in 'ifconfig -a' output" % nic) michael@0: michael@0: def test_get_users(self): michael@0: out = sh("who") michael@0: lines = out.split('\n') michael@0: users = [x.split()[0] for x in lines] michael@0: self.assertEqual(len(users), len(psutil.get_users())) michael@0: terminals = [x.split()[1] for x in lines] michael@0: for u in psutil.get_users(): michael@0: self.assertTrue(u.name in users, u.name) michael@0: self.assertTrue(u.terminal in terminals, u.terminal) michael@0: michael@0: def test_fds_open(self): michael@0: # Note: this fails from time to time; I'm keen on thinking michael@0: # it doesn't mean something is broken michael@0: def call(p, attr): michael@0: attr = getattr(p, name, None) michael@0: if attr is not None and callable(attr): michael@0: ret = attr() michael@0: else: michael@0: ret = attr michael@0: michael@0: p = psutil.Process(os.getpid()) michael@0: attrs = [] michael@0: failures = [] michael@0: for name in dir(psutil.Process): michael@0: if name.startswith('_') \ michael@0: or name.startswith('set_') \ michael@0: or name in ('terminate', 'kill', 'suspend', 'resume', 'nice', michael@0: 'send_signal', 'wait', 'get_children', 'as_dict'): michael@0: continue michael@0: else: michael@0: try: michael@0: num1 = p.get_num_fds() michael@0: for x in range(2): michael@0: call(p, name) michael@0: num2 = p.get_num_fds() michael@0: except psutil.AccessDenied: michael@0: pass michael@0: else: michael@0: if abs(num2 - num1) > 1: michael@0: fail = "failure while processing Process.%s method " \ michael@0: "(before=%s, after=%s)" % (name, num1, num2) michael@0: failures.append(fail) michael@0: if failures: michael@0: self.fail('\n' + '\n'.join(failures)) michael@0: michael@0: michael@0: michael@0: michael@0: def test_main(): michael@0: test_suite = unittest.TestSuite() michael@0: test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase)) michael@0: result = unittest.TextTestRunner(verbosity=2).run(test_suite) michael@0: return result.wasSuccessful() michael@0: michael@0: if __name__ == '__main__': michael@0: if not test_main(): michael@0: sys.exit(1)