|
1 #!/usr/bin/env python |
|
2 |
|
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. |
|
4 # Use of this source code is governed by a BSD-style license that can be |
|
5 # found in the LICENSE file. |
|
6 |
|
7 """BSD specific tests. These are implicitly run by test_psutil.py.""" |
|
8 |
|
9 import unittest |
|
10 import subprocess |
|
11 import time |
|
12 import re |
|
13 import sys |
|
14 import os |
|
15 |
|
16 import psutil |
|
17 |
|
18 from psutil._compat import PY3 |
|
19 from test_psutil import * |
|
20 |
|
21 |
|
22 PAGESIZE = os.sysconf("SC_PAGE_SIZE") |
|
23 MUSE_AVAILABLE = which('muse') |
|
24 |
|
25 |
|
26 def sysctl(cmdline): |
|
27 """Expects a sysctl command with an argument and parse the result |
|
28 returning only the value of interest. |
|
29 """ |
|
30 result = sh("sysctl " + cmdline) |
|
31 result = result[result.find(": ") + 2:] |
|
32 try: |
|
33 return int(result) |
|
34 except ValueError: |
|
35 return result |
|
36 |
|
37 def muse(field): |
|
38 """Thin wrapper around 'muse' cmdline utility.""" |
|
39 out = sh('muse', stderr=DEVNULL) |
|
40 for line in out.split('\n'): |
|
41 if line.startswith(field): |
|
42 break |
|
43 else: |
|
44 raise ValueError("line not found") |
|
45 return int(line.split()[1]) |
|
46 |
|
47 |
|
48 class BSDSpecificTestCase(unittest.TestCase): |
|
49 |
|
50 def setUp(self): |
|
51 self.pid = get_test_subprocess().pid |
|
52 |
|
53 def tearDown(self): |
|
54 reap_children() |
|
55 |
|
56 def test_BOOT_TIME(self): |
|
57 s = sysctl('sysctl kern.boottime') |
|
58 s = s[s.find(" sec = ") + 7:] |
|
59 s = s[:s.find(',')] |
|
60 btime = int(s) |
|
61 self.assertEqual(btime, psutil.BOOT_TIME) |
|
62 |
|
63 def test_process_create_time(self): |
|
64 cmdline = "ps -o lstart -p %s" %self.pid |
|
65 p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) |
|
66 output = p.communicate()[0] |
|
67 if PY3: |
|
68 output = str(output, sys.stdout.encoding) |
|
69 start_ps = output.replace('STARTED', '').strip() |
|
70 start_psutil = psutil.Process(self.pid).create_time |
|
71 start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", |
|
72 time.localtime(start_psutil)) |
|
73 self.assertEqual(start_ps, start_psutil) |
|
74 |
|
75 def test_disks(self): |
|
76 # test psutil.disk_usage() and psutil.disk_partitions() |
|
77 # against "df -a" |
|
78 def df(path): |
|
79 out = sh('df -k "%s"' % path).strip() |
|
80 lines = out.split('\n') |
|
81 lines.pop(0) |
|
82 line = lines.pop(0) |
|
83 dev, total, used, free = line.split()[:4] |
|
84 if dev == 'none': |
|
85 dev = '' |
|
86 total = int(total) * 1024 |
|
87 used = int(used) * 1024 |
|
88 free = int(free) * 1024 |
|
89 return dev, total, used, free |
|
90 |
|
91 for part in psutil.disk_partitions(all=False): |
|
92 usage = psutil.disk_usage(part.mountpoint) |
|
93 dev, total, used, free = df(part.mountpoint) |
|
94 self.assertEqual(part.device, dev) |
|
95 self.assertEqual(usage.total, total) |
|
96 # 10 MB tollerance |
|
97 if abs(usage.free - free) > 10 * 1024 * 1024: |
|
98 self.fail("psutil=%s, df=%s" % (usage.free, free)) |
|
99 if abs(usage.used - used) > 10 * 1024 * 1024: |
|
100 self.fail("psutil=%s, df=%s" % (usage.used, used)) |
|
101 |
|
102 def test_memory_maps(self): |
|
103 out = sh('procstat -v %s' % self.pid) |
|
104 maps = psutil.Process(self.pid).get_memory_maps(grouped=False) |
|
105 lines = out.split('\n')[1:] |
|
106 while lines: |
|
107 line = lines.pop() |
|
108 fields = line.split() |
|
109 _, start, stop, perms, res = fields[:5] |
|
110 map = maps.pop() |
|
111 self.assertEqual("%s-%s" % (start, stop), map.addr) |
|
112 self.assertEqual(int(res), map.rss) |
|
113 if not map.path.startswith('['): |
|
114 self.assertEqual(fields[10], map.path) |
|
115 |
|
116 # --- virtual_memory(); tests against sysctl |
|
117 |
|
118 def test_vmem_total(self): |
|
119 syst = sysctl("sysctl vm.stats.vm.v_page_count") * PAGESIZE |
|
120 self.assertEqual(psutil.virtual_memory().total, syst) |
|
121 |
|
122 @retry_before_failing() |
|
123 def test_vmem_active(self): |
|
124 syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE |
|
125 self.assertAlmostEqual(psutil.virtual_memory().active, syst, |
|
126 delta=TOLERANCE) |
|
127 |
|
128 @retry_before_failing() |
|
129 def test_vmem_inactive(self): |
|
130 syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE |
|
131 self.assertAlmostEqual(psutil.virtual_memory().inactive, syst, |
|
132 delta=TOLERANCE) |
|
133 |
|
134 @retry_before_failing() |
|
135 def test_vmem_wired(self): |
|
136 syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE |
|
137 self.assertAlmostEqual(psutil.virtual_memory().wired, syst, |
|
138 delta=TOLERANCE) |
|
139 |
|
140 @retry_before_failing() |
|
141 def test_vmem_cached(self): |
|
142 syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE |
|
143 self.assertAlmostEqual(psutil.virtual_memory().cached, syst, |
|
144 delta=TOLERANCE) |
|
145 |
|
146 @retry_before_failing() |
|
147 def test_vmem_free(self): |
|
148 syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE |
|
149 self.assertAlmostEqual(psutil.virtual_memory().free, syst, |
|
150 delta=TOLERANCE) |
|
151 |
|
152 @retry_before_failing() |
|
153 def test_vmem_buffers(self): |
|
154 syst = sysctl("vfs.bufspace") |
|
155 self.assertAlmostEqual(psutil.virtual_memory().buffers, syst, |
|
156 delta=TOLERANCE) |
|
157 |
|
158 # --- virtual_memory(); tests against muse |
|
159 |
|
160 @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") |
|
161 def test_total(self): |
|
162 num = muse('Total') |
|
163 self.assertEqual(psutil.virtual_memory().total, num) |
|
164 |
|
165 @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") |
|
166 @retry_before_failing() |
|
167 def test_active(self): |
|
168 num = muse('Active') |
|
169 self.assertAlmostEqual(psutil.virtual_memory().active, num, |
|
170 delta=TOLERANCE) |
|
171 |
|
172 @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") |
|
173 @retry_before_failing() |
|
174 def test_inactive(self): |
|
175 num = muse('Inactive') |
|
176 self.assertAlmostEqual(psutil.virtual_memory().inactive, num, |
|
177 delta=TOLERANCE) |
|
178 |
|
179 @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") |
|
180 @retry_before_failing() |
|
181 def test_wired(self): |
|
182 num = muse('Wired') |
|
183 self.assertAlmostEqual(psutil.virtual_memory().wired, num, |
|
184 delta=TOLERANCE) |
|
185 |
|
186 @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") |
|
187 @retry_before_failing() |
|
188 def test_cached(self): |
|
189 num = muse('Cache') |
|
190 self.assertAlmostEqual(psutil.virtual_memory().cached, num, |
|
191 delta=TOLERANCE) |
|
192 |
|
193 @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") |
|
194 @retry_before_failing() |
|
195 def test_free(self): |
|
196 num = muse('Free') |
|
197 self.assertAlmostEqual(psutil.virtual_memory().free, num, |
|
198 delta=TOLERANCE) |
|
199 |
|
200 @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") |
|
201 @retry_before_failing() |
|
202 def test_buffers(self): |
|
203 num = muse('Buffer') |
|
204 self.assertAlmostEqual(psutil.virtual_memory().buffers, num, |
|
205 delta=TOLERANCE) |
|
206 |
|
207 |
|
208 def test_main(): |
|
209 test_suite = unittest.TestSuite() |
|
210 test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase)) |
|
211 result = unittest.TextTestRunner(verbosity=2).run(test_suite) |
|
212 return result.wasSuccessful() |
|
213 |
|
214 if __name__ == '__main__': |
|
215 if not test_main(): |
|
216 sys.exit(1) |