python/psutil/test/_osx.py

branch
TOR_BUG_9701
changeset 15
b8a032363ba2
equal deleted inserted replaced
-1:000000000000 0:2c33d4b50f56
1 #!/usr/bin/env python
2
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
6
7 """OSX specific tests. These are implicitly run by test_psutil.py."""
8
9 import unittest
10 import subprocess
11 import time
12 import sys
13 import os
14 import re
15
16 import psutil
17
18 from psutil._compat import PY3
19 from test_psutil import *
20
21
# Size of a memory page as reported by sysconf; vm_stat() multiplies the
# counters it parses by this value.
PAGESIZE = os.sysconf("SC_PAGE_SIZE")
23
24
def sysctl(cmdline):
    """Run a sysctl command line and return only the value of interest.

    The command is executed through the shell and its output is expected
    to consist of at least two whitespace-separated tokens (for example
    "<name>: <value>"); the second token is taken as the value.

    Return the value as an int when it parses as one, otherwise return
    it as a string.  An IndexError is raised if the output has fewer
    than two tokens.
    """
    p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE)
    result = p.communicate()[0].strip().split()[1]
    if not isinstance(result, str):
        # On Python 3 the pipe yields bytes.  sys.stdout.encoding may be
        # None when stdout has been replaced or redirected, so fall back
        # to UTF-8 instead of failing with a TypeError.
        encoding = getattr(sys.stdout, "encoding", None) or "utf-8"
        result = str(result, encoding)
    try:
        return int(result)
    except ValueError:
        return result
37
def vm_stat(field):
    """Wrapper around 'vm_stat' cmdline utility.

    Look for the first output line containing `field`, take the first
    run of digits on that line and return it multiplied by PAGESIZE.

    Raise ValueError if no output line contains `field`.
    """
    out = sh('vm_stat')
    for line in out.split('\n'):
        if field in line:
            # Raw string: '\d' inside a plain literal is an invalid escape
            # sequence and triggers a warning on recent Python versions.
            return int(re.search(r'\d+', line).group(0)) * PAGESIZE
    raise ValueError("line not found")
47
48
class OSXSpecificTestCase(unittest.TestCase):
    """Compare psutil results against OSX command line tools and files."""

    def setUp(self):
        # Each test inspects a freshly spawned test subprocess.
        self.pid = get_test_subprocess().pid

    def tearDown(self):
        reap_children()

    def test_process_create_time(self):
        # Compare psutil's creation time against "ps -o lstart".
        cmdline = "ps -o lstart -p %s" % self.pid
        p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE)
        output = p.communicate()[0]
        if PY3:
            output = str(output, sys.stdout.encoding)
        # Drop the column header so that only the timestamp remains.
        start_ps = output.replace('STARTED', '').strip()
        start_psutil = psutil.Process(self.pid).create_time
        start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
                                     time.localtime(start_psutil))
        self.assertEqual(start_ps, start_psutil)

    def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -k"
        def df(path):
            """Return (dev, total, used, free) for path as reported by
            'df -k', with the three sizes converted from KB to bytes.
            """
            out = sh('df -k "%s"' % path).strip()
            # The first line is the header; the second one holds the data.
            line = out.split('\n')[1]
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        # 10 MB tolerance
        tolerance = 10 * 1024 * 1024
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            # The values must be passed as a tuple: without the
            # parentheses the format receives a single argument and the
            # failure path itself raises TypeError.
            if abs(usage.free - free) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.used, used))

    # --- virtual mem

    def test_vmem_total(self):
        sysctl_hwphymem = sysctl('sysctl hw.memsize')
        self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM)

    @retry_before_failing()
    def test_vmem_free(self):
        num = vm_stat("free")
        self.assertAlmostEqual(psutil.virtual_memory().free, num,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_active(self):
        num = vm_stat("active")
        self.assertAlmostEqual(psutil.virtual_memory().active, num,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_inactive(self):
        num = vm_stat("inactive")
        self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_wired(self):
        num = vm_stat("wired")
        self.assertAlmostEqual(psutil.virtual_memory().wired, num,
                               delta=TOLERANCE)

    # --- swap mem

    def test_swapmem_sin(self):
        num = vm_stat("Pageins")
        self.assertEqual(psutil.swap_memory().sin, num)

    def test_swapmem_sout(self):
        num = vm_stat("Pageouts")
        self.assertEqual(psutil.swap_memory().sout, num)

    def test_swapmem_total(self):
        tot1 = psutil.swap_memory().total
        tot2 = 0
        # OSX uses multiple cache files:
        # http://en.wikipedia.org/wiki/Paging#OS_X
        for name in os.listdir("/var/vm/"):
            path = os.path.join("/var/vm", name)
            if os.path.isfile(path):
                tot2 += os.path.getsize(path)
        self.assertEqual(tot1, tot2)
146
147
def test_main():
    """Run all OSXSpecificTestCase tests with a verbose text runner.

    Return True if every test passed, False otherwise.
    """
    test_suite = unittest.TestSuite()
    # loadTestsFromTestCase is used instead of unittest.makeSuite, which
    # is deprecated and no longer available on recent Python versions.
    test_suite.addTest(
        unittest.defaultTestLoader.loadTestsFromTestCase(OSXSpecificTestCase))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
153
if __name__ == '__main__':
    # Signal a failed test run to the caller through the exit status.
    all_passed = test_main()
    if not all_passed:
        sys.exit(1)

mercurial