|
1 #!/usr/bin/env python |
|
2 |
|
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Linux specific tests. These are implicitly run by test_psutil.py."""
|
8 |
|
9 from __future__ import division |
|
10 import unittest |
|
11 import subprocess |
|
12 import sys |
|
13 import time |
|
14 import os |
|
15 import re |
|
16 |
|
17 from test_psutil import * |
|
18 from psutil._compat import PY3 |
|
19 import psutil |
|
20 |
|
21 |
|
class LinuxSpecificTestCase(unittest.TestCase):
    """Compare psutil results against Linux command-line tools.

    Reference values are obtained by running ``df``, ``pmap`` and ``free``
    through the ``sh()`` helper, and from the kernel release string
    returned by ``os.uname()``.
    """

    @staticmethod
    def _free_field(row, column):
        """Return one numeric cell of the ``free`` output, times 1024.

        ``row`` indexes the output lines that follow the header line and
        ``column`` indexes the whitespace-separated fields of that line
        (field 0 is the row label).  The cell is multiplied by 1024 so it
        can be compared with the byte counts returned by psutil.
        """
        lines = sh('free').split('\n')[1:]
        return int(lines[row].split()[column]) * 1024

    @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
                     reason="os.statvfs() function not available on this platform")
    @skip_on_not_implemented()
    def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -P -B 1"
        def df(path):
            # Return (device, total, used, free) for the filesystem
            # holding `path`, taken from the line following df's header.
            out = sh('df -P -B 1 "%s"' % path).strip()
            line = out.split('\n')[1]
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            return dev, int(total), int(used), int(free)

        # 10 MB tolerance: disk usage may change between the two readings
        tolerance = 10 * 1024 * 1024
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            if abs(usage.free - free) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.used, used))

    def test_memory_maps(self):
        # compare psutil's ungrouped memory maps of a test subprocess
        # against the lines printed by "pmap -x"
        sproc = get_test_subprocess()
        time.sleep(1)
        p = psutil.Process(sproc.pid)
        maps = p.get_memory_maps(grouped=False)
        pmap = sh('pmap -x %s' % p.pid).split('\n')
        pmap = pmap[2:]  # get rid of header
        # walk both listings in lockstep, stopping at the shorter one
        for this, other in zip(maps, pmap):
            _addr, _, rss, _dirty, mode, path = other.split(None, 5)
            if not path.startswith('[') and not path.endswith(']'):
                self.assertEqual(path, os.path.basename(this.path))
            self.assertEqual(int(rss) * 1024, this.rss)
            # test only rwx chars, ignore 's' and 'p'
            self.assertEqual(mode[:3], this.perms[:3])

    def test_vmem_total(self):
        total = self._free_field(0, 1)
        self.assertEqual(total, psutil.virtual_memory().total)

    @retry_before_failing()
    def test_vmem_used(self):
        used = self._free_field(0, 2)
        self.assertAlmostEqual(used, psutil.virtual_memory().used,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_free(self):
        free = self._free_field(0, 3)
        self.assertAlmostEqual(free, psutil.virtual_memory().free,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_buffers(self):
        buffers = self._free_field(0, 5)
        self.assertAlmostEqual(buffers, psutil.virtual_memory().buffers,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_cached(self):
        cached = self._free_field(0, 6)
        self.assertAlmostEqual(cached, psutil.virtual_memory().cached,
                               delta=TOLERANCE)

    def test_swapmem_total(self):
        total = self._free_field(2, 1)
        self.assertEqual(total, psutil.swap_memory().total)

    @retry_before_failing()
    def test_swapmem_used(self):
        used = self._free_field(2, 2)
        self.assertAlmostEqual(used, psutil.swap_memory().used,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_swapmem_free(self):
        free = self._free_field(2, 3)
        self.assertAlmostEqual(free, psutil.swap_memory().free,
                               delta=TOLERANCE)

    def test_cpu_times(self):
        # check that the version-dependent cpu_times() fields are present
        # exactly when the running kernel is recent enough
        fields = psutil.cpu_times()._fields
        # Escaped dots and multi-digit components: a release such as
        # "2.6.32-5" must parse as (2, 6, 32), not (2, 6, 3).
        kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0]
        kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
        # field name -> first kernel version expected to provide it
        minimum_versions = (
            ('steal', (2, 6, 11)),
            ('guest', (2, 6, 24)),
            ('guest_nice', (3, 2, 0)),
        )
        for name, min_ver in minimum_versions:
            if kernel_ver_info >= min_ver:
                self.assertIn(name, fields)
            else:
                self.assertNotIn(name, fields)
|
140 |
|
141 |
|
def test_main():
    """Run every LinuxSpecificTestCase test with a verbose text runner.

    Returns True if all tests passed, False otherwise.
    """
    # unittest.makeSuite() is deprecated (removed in Python 3.13); the
    # default loader builds the same suite and exists on Python 2 as well.
    loader = unittest.defaultTestLoader
    test_suite = unittest.TestSuite()
    test_suite.addTest(loader.loadTestsFromTestCase(LinuxSpecificTestCase))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
|
147 |
|
if __name__ == '__main__':
    # Exit with status 1 when the suite reports a failure; otherwise
    # fall through and let the interpreter exit normally.
    all_passed = test_main()
    if not all_passed:
        sys.exit(1)