michael@0: #!/usr/bin/env python michael@0: michael@0: # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. michael@0: # Use of this source code is governed by a BSD-style license that can be michael@0: # found in the LICENSE file. michael@0: michael@0: """OSX specific tests. These are implicitly run by test_psutil.py.""" michael@0: michael@0: import unittest michael@0: import subprocess michael@0: import time michael@0: import sys michael@0: import os michael@0: import re michael@0: michael@0: import psutil michael@0: michael@0: from psutil._compat import PY3 michael@0: from test_psutil import * michael@0: michael@0: michael@0: PAGESIZE = os.sysconf("SC_PAGE_SIZE") michael@0: michael@0: michael@0: def sysctl(cmdline): michael@0: """Expects a sysctl command with an argument and parse the result michael@0: returning only the value of interest. michael@0: """ michael@0: p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) michael@0: result = p.communicate()[0].strip().split()[1] michael@0: if PY3: michael@0: result = str(result, sys.stdout.encoding) michael@0: try: michael@0: return int(result) michael@0: except ValueError: michael@0: return result michael@0: michael@0: def vm_stat(field): michael@0: """Wrapper around 'vm_stat' cmdline utility.""" michael@0: out = sh('vm_stat') michael@0: for line in out.split('\n'): michael@0: if field in line: michael@0: break michael@0: else: michael@0: raise ValueError("line not found") michael@0: return int(re.search('\d+', line).group(0)) * PAGESIZE michael@0: michael@0: michael@0: class OSXSpecificTestCase(unittest.TestCase): michael@0: michael@0: def setUp(self): michael@0: self.pid = get_test_subprocess().pid michael@0: michael@0: def tearDown(self): michael@0: reap_children() michael@0: michael@0: def test_process_create_time(self): michael@0: cmdline = "ps -o lstart -p %s" %self.pid michael@0: p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) michael@0: output = p.communicate()[0] michael@0: if PY3: michael@0: output = str(output, sys.stdout.encoding) michael@0: start_ps = output.replace('STARTED', '').strip() michael@0: start_psutil = psutil.Process(self.pid).create_time michael@0: start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", michael@0: time.localtime(start_psutil)) michael@0: self.assertEqual(start_ps, start_psutil) michael@0: michael@0: def test_disks(self): michael@0: # test psutil.disk_usage() and psutil.disk_partitions() michael@0: # against "df -a" michael@0: def df(path): michael@0: out = sh('df -k "%s"' % path).strip() michael@0: lines = out.split('\n') michael@0: lines.pop(0) michael@0: line = lines.pop(0) michael@0: dev, total, used, free = line.split()[:4] michael@0: if dev == 'none': michael@0: dev = '' michael@0: total = int(total) * 1024 michael@0: used = int(used) * 1024 michael@0: free = int(free) * 1024 michael@0: return dev, total, used, free michael@0: michael@0: for part in psutil.disk_partitions(all=False): michael@0: usage = psutil.disk_usage(part.mountpoint) michael@0: dev, total, used, free = df(part.mountpoint) michael@0: self.assertEqual(part.device, dev) michael@0: self.assertEqual(usage.total, total) michael@0: # 10 MB tollerance michael@0: if abs(usage.free - free) > 10 * 1024 * 1024: michael@0: self.fail("psutil=%s, df=%s" % usage.free, free) michael@0: if abs(usage.used - used) > 10 * 1024 * 1024: michael@0: self.fail("psutil=%s, df=%s" % usage.used, used) michael@0: michael@0: # --- virtual mem michael@0: michael@0: def test_vmem_total(self): michael@0: sysctl_hwphymem = sysctl('sysctl hw.memsize') michael@0: self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM) michael@0: michael@0: @retry_before_failing() michael@0: def test_vmem_free(self): michael@0: num = vm_stat("free") michael@0: self.assertAlmostEqual(psutil.virtual_memory().free, num, michael@0: delta=TOLERANCE) michael@0: michael@0: @retry_before_failing() michael@0: def test_vmem_active(self): michael@0: num = vm_stat("active") michael@0: self.assertAlmostEqual(psutil.virtual_memory().active, num, michael@0: delta=TOLERANCE) michael@0: michael@0: @retry_before_failing() michael@0: def test_vmem_inactive(self): michael@0: num = vm_stat("inactive") michael@0: self.assertAlmostEqual(psutil.virtual_memory().inactive, num, michael@0: delta=TOLERANCE) michael@0: michael@0: @retry_before_failing() michael@0: def test_vmem_wired(self): michael@0: num = vm_stat("wired") michael@0: self.assertAlmostEqual(psutil.virtual_memory().wired, num, michael@0: delta=TOLERANCE) michael@0: michael@0: # --- swap mem michael@0: michael@0: def test_swapmem_sin(self): michael@0: num = vm_stat("Pageins") michael@0: self.assertEqual(psutil.swap_memory().sin, num) michael@0: michael@0: def test_swapmem_sout(self): michael@0: num = vm_stat("Pageouts") michael@0: self.assertEqual(psutil.swap_memory().sout, num) michael@0: michael@0: def test_swapmem_total(self): michael@0: tot1 = psutil.swap_memory().total michael@0: tot2 = 0 michael@0: # OSX uses multiple cache files: michael@0: # http://en.wikipedia.org/wiki/Paging#OS_X michael@0: for name in os.listdir("/var/vm/"): michael@0: file = os.path.join("/var/vm", name) michael@0: if os.path.isfile(file): michael@0: tot2 += os.path.getsize(file) michael@0: self.assertEqual(tot1, tot2) michael@0: michael@0: michael@0: def test_main(): michael@0: test_suite = unittest.TestSuite() michael@0: test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase)) michael@0: result = unittest.TextTestRunner(verbosity=2).run(test_suite) michael@0: return result.wasSuccessful() michael@0: michael@0: if __name__ == '__main__': michael@0: if not test_main(): michael@0: sys.exit(1)