#!/usr/bin/env python

# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""BSD specific tests.  These are implicitly run by test_psutil.py."""

import unittest
import subprocess
import time
import re
import sys
import os

import psutil

from psutil._compat import PY3
# NOTE(review): the star import is what brings sh, which, DEVNULL, TOLERANCE,
# get_test_subprocess, reap_children and retry_before_failing into scope
# (presumably; none of them is defined in this file) -- do not remove it.
from test_psutil import *


PAGESIZE = os.sysconf("SC_PAGE_SIZE")
MUSE_AVAILABLE = which('muse')


def sysctl(cmdline):
    """Run ``sysctl <cmdline>`` and return only the value of interest.

    ``cmdline`` is the argument string handed to the sysctl utility.
    The "sysctl " command word is prepended here, so callers must pass
    only the arguments (e.g. ``"kern.boottime"``).

    The output is cut right after its first ": " separator; what is left
    is returned as an int when it parses as one, otherwise as the raw
    string.
    """
    result = sh("sysctl " + cmdline)
    result = result[result.find(": ") + 2:]
    try:
        return int(result)
    except ValueError:
        return result


def muse(field):
    """Thin wrapper around the 'muse' cmdline utility.

    Return, as an int, the second whitespace-separated token of the
    first output line starting with ``field``.  Raise ValueError if no
    such line exists.
    """
    out = sh('muse', stderr=DEVNULL)
    for line in out.split('\n'):
        if line.startswith(field):
            return int(line.split()[1])
    raise ValueError("line not found")


class BSDSpecificTestCase(unittest.TestCase):
    """Compare psutil results against BSD command line tools."""

    def setUp(self):
        # Every test gets a fresh subprocess to inspect.
        self.pid = get_test_subprocess().pid

    def tearDown(self):
        reap_children()

    def test_BOOT_TIME(self):
        # sysctl() prepends the command word itself: pass only the OID.
        s = sysctl('kern.boottime')
        # Keep what sits between " sec = " and the following comma.
        s = s[s.find(" sec = ") + 7:]
        s = s[:s.find(',')]
        btime = int(s)
        self.assertEqual(btime, psutil.BOOT_TIME)

    def test_process_create_time(self):
        cmdline = "ps -o lstart -p %s" % self.pid
        p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE)
        output = p.communicate()[0]
        if PY3:
            output = str(output, sys.stdout.encoding)
        # Drop the column header so that only the timestamp is left.
        start_ps = output.replace('STARTED', '').strip()
        start_psutil = psutil.Process(self.pid).create_time
        # Render psutil's timestamp with the layout compared against ps.
        start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
                                     time.localtime(start_psutil))
        self.assertEqual(start_ps, start_psutil)

    def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -k"
        def df(path):
            """Return (device, total, used, free) for path, in bytes."""
            out = sh('df -k "%s"' % path).strip()
            lines = out.split('\n')
            lines.pop(0)  # discard the header row
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            # "df -k" reports sizes in 1024-byte units.
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        # 10 MB tolerance
        tolerance = 10 * 1024 * 1024

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            if abs(usage.free - free) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.used, used))

    def test_memory_maps(self):
        out = sh('procstat -v %s' % self.pid)
        maps = psutil.Process(self.pid).get_memory_maps(grouped=False)
        lines = out.split('\n')[1:]  # skip the header row
        # Both lists are consumed from the end, pairing the last procstat
        # row with the last psutil entry and so on.
        while lines:
            line = lines.pop()
            fields = line.split()
            _, start, stop, perms, res = fields[:5]
            mem_map = maps.pop()
            self.assertEqual("%s-%s" % (start, stop), mem_map.addr)
            self.assertEqual(int(res), mem_map.rss)
            if not mem_map.path.startswith('['):
                self.assertEqual(fields[10], mem_map.path)

    # --- virtual_memory(); tests against sysctl

    def test_vmem_total(self):
        # sysctl() prepends the command word itself: pass only the OID.
        syst = sysctl("vm.stats.vm.v_page_count") * PAGESIZE
        self.assertEqual(psutil.virtual_memory().total, syst)

    @retry_before_failing()
    def test_vmem_active(self):
        syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().active, syst,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_inactive(self):
        syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().inactive, syst,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_wired(self):
        syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().wired, syst,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_cached(self):
        syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().cached, syst,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_free(self):
        syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().free, syst,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_buffers(self):
        # Unlike the page counters above, this value is used as-is
        # (not multiplied by PAGESIZE).
        syst = sysctl("vfs.bufspace")
        self.assertAlmostEqual(psutil.virtual_memory().buffers, syst,
                               delta=TOLERANCE)

    # --- virtual_memory(); tests against muse

    @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
    def test_total(self):
        num = muse('Total')
        self.assertEqual(psutil.virtual_memory().total, num)

    @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
    @retry_before_failing()
    def test_active(self):
        num = muse('Active')
        self.assertAlmostEqual(psutil.virtual_memory().active, num,
                               delta=TOLERANCE)

    @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
    @retry_before_failing()
    def test_inactive(self):
        num = muse('Inactive')
        self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
                               delta=TOLERANCE)

    @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
    @retry_before_failing()
    def test_wired(self):
        num = muse('Wired')
        self.assertAlmostEqual(psutil.virtual_memory().wired, num,
                               delta=TOLERANCE)

    @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
    @retry_before_failing()
    def test_cached(self):
        num = muse('Cache')
        self.assertAlmostEqual(psutil.virtual_memory().cached, num,
                               delta=TOLERANCE)

    @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
    @retry_before_failing()
    def test_free(self):
        num = muse('Free')
        self.assertAlmostEqual(psutil.virtual_memory().free, num,
                               delta=TOLERANCE)

    @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
    @retry_before_failing()
    def test_buffers(self):
        num = muse('Buffer')
        self.assertAlmostEqual(psutil.virtual_memory().buffers, num,
                               delta=TOLERANCE)


def test_main():
    """Run every BSDSpecificTestCase test; return True if all passed."""
    test_suite = unittest.TestSuite()
    test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()


if __name__ == '__main__':
    if not test_main():
        sys.exit(1)