python/psutil/test/_osx.py

Thu, 22 Jan 2015 13:21:57 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Thu, 22 Jan 2015 13:21:57 +0100
branch
TOR_BUG_9701
changeset 15
b8a032363ba2
permissions
-rw-r--r--

Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6

michael@0 1 #!/usr/bin/env python
michael@0 2
michael@0 3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
michael@0 4 # Use of this source code is governed by a BSD-style license that can be
michael@0 5 # found in the LICENSE file.
michael@0 6
michael@0 7 """OSX specific tests. These are implicitly run by test_psutil.py."""
michael@0 8
michael@0 9 import unittest
michael@0 10 import subprocess
michael@0 11 import time
michael@0 12 import sys
michael@0 13 import os
michael@0 14 import re
michael@0 15
michael@0 16 import psutil
michael@0 17
michael@0 18 from psutil._compat import PY3
michael@0 19 from test_psutil import *
michael@0 20
michael@0 21
# System memory page size as reported by os.sysconf(); vm_stat() below
# multiplies the number it parses from 'vm_stat' output by this value.
PAGESIZE = os.sysconf("SC_PAGE_SIZE")
michael@0 23
michael@0 24
def sysctl(cmdline):
    """Expects a sysctl command with an argument and parse the result
    returning only the value of interest.

    *cmdline* is run through the shell.  The second whitespace-separated
    token of its output is returned as an int when it parses as one,
    otherwise as a string.
    """
    p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE)
    result = p.communicate()[0].strip().split()[1]
    if not isinstance(result, str):
        # Python 3: communicate() returned bytes.  sys.stdout.encoding is
        # None when stdout has been replaced by an object without an
        # encoding (e.g. io.StringIO), and str(bytes, None) raises
        # TypeError, so fall back to utf-8 in that case.
        encoding = getattr(sys.stdout, 'encoding', None) or 'utf-8'
        result = result.decode(encoding)
    try:
        return int(result)
    except ValueError:
        return result
michael@0 37
def vm_stat(field):
    """Wrapper around 'vm_stat' cmdline utility.

    Find the first line of 'vm_stat' output containing *field*, take the
    first run of digits on that line and return it multiplied by
    PAGESIZE.

    Raise ValueError if no line contains *field* or if the matching line
    holds no number.
    """
    out = sh('vm_stat')
    for line in out.split('\n'):
        if field in line:
            break
    else:
        raise ValueError("line not found")
    # Raw string: '\d' in a plain literal is an invalid escape sequence.
    match = re.search(r'\d+', line)
    if match is None:
        raise ValueError("no number found in line %r" % line)
    return int(match.group(0)) * PAGESIZE
michael@0 47
michael@0 48
class OSXSpecificTestCase(unittest.TestCase):
    """Cross-check psutil results against OSX command line tools.

    setUp() stores the pid returned by get_test_subprocess() and
    tearDown() calls reap_children().  The figures psutil reports are
    compared with the output of 'ps', 'df', 'sysctl' and 'vm_stat'.
    """

    # NOTE: the test methods are documented with comments rather than
    # docstrings on purpose: unittest's verbose runner prints the first
    # docstring line in place of the test name.

    def setUp(self):
        self.pid = get_test_subprocess().pid

    def tearDown(self):
        reap_children()

    def test_process_create_time(self):
        # Compare psutil's creation time with the one printed by
        # "ps -o lstart", both rendered in the same textual format.
        cmdline = "ps -o lstart -p %s" % self.pid
        p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE)
        output = p.communicate()[0]
        if PY3:
            # sys.stdout.encoding may be None (e.g. stdout replaced by an
            # io.StringIO); str(bytes, None) would raise TypeError.
            output = str(output, sys.stdout.encoding or 'utf-8')
        start_ps = output.replace('STARTED', '').strip()
        start_psutil = psutil.Process(self.pid).create_time
        start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
                                     time.localtime(start_psutil))
        self.assertEqual(start_ps, start_psutil)

    def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -k"
        def df(path):
            # The first output line is skipped; the second one is parsed.
            out = sh('df -k "%s"' % path).strip()
            line = out.split('\n')[1]
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            # "df -k" prints sizes in 1024-byte blocks; convert to bytes.
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        # 10 MB tolerance: free/used space may change between the
        # psutil call and the df call.
        tolerance = 10 * 1024 * 1024

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            # Both values must be passed as a tuple: without the
            # parentheses "%" is applied to the first value only and the
            # formatting itself raises TypeError.
            if abs(usage.free - free) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.used, used))

    # --- virtual mem

    def test_vmem_total(self):
        sysctl_hwphymem = sysctl('sysctl hw.memsize')
        self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM)

    @retry_before_failing()
    def test_vmem_free(self):
        num = vm_stat("free")
        self.assertAlmostEqual(psutil.virtual_memory().free, num,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_active(self):
        num = vm_stat("active")
        self.assertAlmostEqual(psutil.virtual_memory().active, num,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_inactive(self):
        num = vm_stat("inactive")
        self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_wired(self):
        num = vm_stat("wired")
        self.assertAlmostEqual(psutil.virtual_memory().wired, num,
                               delta=TOLERANCE)

    # --- swap mem

    def test_swapmem_sin(self):
        num = vm_stat("Pageins")
        self.assertEqual(psutil.swap_memory().sin, num)

    def test_swapmem_sout(self):
        num = vm_stat("Pageouts")
        self.assertEqual(psutil.swap_memory().sout, num)

    def test_swapmem_total(self):
        tot1 = psutil.swap_memory().total
        tot2 = 0
        # OSX uses multiple cache files:
        # http://en.wikipedia.org/wiki/Paging#OS_X
        for name in os.listdir("/var/vm/"):
            # "path" rather than "file" to avoid shadowing the builtin.
            path = os.path.join("/var/vm", name)
            if os.path.isfile(path):
                tot2 += os.path.getsize(path)
        self.assertEqual(tot1, tot2)
michael@0 146
michael@0 147
def test_main():
    """Run OSXSpecificTestCase with a verbose text runner.

    Return True if the run was successful, False otherwise.
    """
    test_suite = unittest.TestSuite()
    # unittest.makeSuite() is deprecated and was removed in Python 3.13;
    # the default loader builds the same suite and also exists on
    # Python 2.
    test_suite.addTest(
        unittest.defaultTestLoader.loadTestsFromTestCase(OSXSpecificTestCase))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
michael@0 153
if __name__ == '__main__':
    # Report a failed run to the calling shell through exit status 1.
    succeeded = test_main()
    if not succeeded:
        sys.exit(1)

mercurial