|
1 #!/usr/bin/env python |
|
2 |
|
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. |
|
4 # Use of this source code is governed by a BSD-style license that can be |
|
5 # found in the LICENSE file. |
|
6 |
|
7 """OSX specific tests. These are implicitly run by test_psutil.py.""" |
|
8 |
|
9 import unittest |
|
10 import subprocess |
|
11 import time |
|
12 import sys |
|
13 import os |
|
14 import re |
|
15 |
|
16 import psutil |
|
17 |
|
18 from psutil._compat import PY3 |
|
19 from test_psutil import * |
|
20 |
|
21 |
|
# Memory page size as reported by sysconf; vm_stat() multiplies by it.
PAGESIZE = os.sysconf("SC_PAGE_SIZE")
|
23 |
|
24 |
|
def sysctl(cmdline):
    """Run a sysctl command line and return only the value of interest.

    *cmdline* is executed through the shell. Its output is split on
    whitespace and the second field is taken as the value (e.g. for
    ``hw.memsize: 4294967296`` the value is ``4294967296``).

    Return the value as an int when it parses as one, otherwise as a
    string. Raise ValueError if the output has fewer than two fields.
    """
    # cmdline is a single command string, hence shell=True.
    p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE)
    fields = p.communicate()[0].strip().split()
    if len(fields) < 2:
        raise ValueError(
            "unexpected output from %r: %r" % (cmdline, fields))
    result = fields[1]
    if not isinstance(result, str):
        # The pipe yielded bytes (Python 3). sys.stdout.encoding can be
        # None or missing when stdout has been replaced (e.g. by a
        # StringIO), so fall back on UTF-8 instead of failing.
        encoding = getattr(sys.stdout, 'encoding', None) or 'utf-8'
        result = result.decode(encoding)
    try:
        return int(result)
    except ValueError:
        return result
|
37 |
|
def vm_stat(field):
    """Wrapper around 'vm_stat' cmdline utility.

    Look for the first output line containing *field*, take the first
    run of digits on that line and return it multiplied by PAGESIZE.

    Raise ValueError if no line contains *field* or if the matching
    line holds no number.
    """
    out = sh('vm_stat')
    for line in out.split('\n'):
        if field in line:
            break
    else:
        # The loop ended without hitting 'break': nothing matched.
        raise ValueError("line not found")
    # Raw string: '\d' in a plain literal is an invalid escape sequence.
    match = re.search(r'\d+', line)
    if match is None:
        raise ValueError("no number found in line %r" % line)
    return int(match.group(0)) * PAGESIZE
|
47 |
|
48 |
|
class OSXSpecificTestCase(unittest.TestCase):
    """Compare psutil results against OSX command line utilities and
    system files (ps, df, sysctl, vm_stat, /var/vm).
    """

    def setUp(self):
        # PID of the subprocess returned by get_test_subprocess(); used
        # by the process related tests.
        self.pid = get_test_subprocess().pid

    def tearDown(self):
        reap_children()

    def test_process_create_time(self):
        # compare Process.create_time against "ps -o lstart"
        cmdline = "ps -o lstart -p %s" % self.pid
        p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE)
        output = p.communicate()[0]
        if PY3:
            output = str(output, sys.stdout.encoding)
        # drop the column header so that only the date string is left
        start_ps = output.replace('STARTED', '').strip()
        start_psutil = psutil.Process(self.pid).create_time
        start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
                                     time.localtime(start_psutil))
        self.assertEqual(start_ps, start_psutil)

    def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -k"
        def df(path):
            out = sh('df -k "%s"' % path).strip()
            lines = out.split('\n')
            # first line is the header, second one is the data row
            lines.pop(0)
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            # "df -k" reports 1024-byte blocks; convert to bytes
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            # 10 MB tolerance
            if abs(usage.free - free) > 10 * 1024 * 1024:
                # both values must be passed to '%' as a single tuple
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.used, used))

    # --- virtual mem

    def test_vmem_total(self):
        sysctl_hwphymem = sysctl('sysctl hw.memsize')
        self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM)

    @retry_before_failing()
    def test_vmem_free(self):
        num = vm_stat("free")
        self.assertAlmostEqual(psutil.virtual_memory().free, num,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_active(self):
        num = vm_stat("active")
        self.assertAlmostEqual(psutil.virtual_memory().active, num,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_inactive(self):
        num = vm_stat("inactive")
        self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_wired(self):
        num = vm_stat("wired")
        self.assertAlmostEqual(psutil.virtual_memory().wired, num,
                               delta=TOLERANCE)

    # --- swap mem

    def test_swapmem_sin(self):
        num = vm_stat("Pageins")
        self.assertEqual(psutil.swap_memory().sin, num)

    def test_swapmem_sout(self):
        num = vm_stat("Pageouts")
        self.assertEqual(psutil.swap_memory().sout, num)

    def test_swapmem_total(self):
        tot1 = psutil.swap_memory().total
        tot2 = 0
        # OSX uses multiple cache files:
        # http://en.wikipedia.org/wiki/Paging#OS_X
        for name in os.listdir("/var/vm/"):
            path = os.path.join("/var/vm", name)
            if os.path.isfile(path):
                tot2 += os.path.getsize(path)
        self.assertEqual(tot1, tot2)
|
146 |
|
147 |
|
def test_main():
    """Run all OSXSpecificTestCase tests with a verbose text runner.

    Return True if every test succeeded, False otherwise.
    """
    test_suite = unittest.TestSuite()
    # unittest.makeSuite() is deprecated and no longer available on
    # recent Python versions; the loader API works on old and new ones.
    loader = unittest.TestLoader()
    test_suite.addTest(loader.loadTestsFromTestCase(OSXSpecificTestCase))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
|
153 |
|
if __name__ == '__main__':
    # Signal a failed run to the caller through a non-zero exit status.
    all_passed = test_main()
    if not all_passed:
        sys.exit(1)