Thu, 22 Jan 2015 13:21:57 +0100
Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6
michael@0 | 1 | #!/usr/bin/env python |
michael@0 | 2 | |
michael@0 | 3 | # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. |
michael@0 | 4 | # Use of this source code is governed by a BSD-style license that can be |
michael@0 | 5 | # found in the LICENSE file. |
michael@0 | 6 | |
michael@0 | 7 | """Linux specific tests. These are implicitly run by test_psutil.py.""" |
michael@0 | 8 | |
michael@0 | 9 | from __future__ import division |
michael@0 | 10 | import unittest |
michael@0 | 11 | import subprocess |
michael@0 | 12 | import sys |
michael@0 | 13 | import time |
michael@0 | 14 | import os |
michael@0 | 15 | import re |
michael@0 | 16 | |
michael@0 | 17 | from test_psutil import * |
michael@0 | 18 | from psutil._compat import PY3 |
michael@0 | 19 | import psutil |
michael@0 | 20 | |
michael@0 | 21 | |
class LinuxSpecificTestCase(unittest.TestCase):
    """Cross-check psutil results against Linux command line tools.

    Reference values are taken from the output of ``df``, ``pmap`` and
    ``free`` and from ``os.uname()``.
    """

    # --- helpers

    def _free_rows(self):
        """Return the data rows printed by ``free`` as lists of fields.

        The header line is dropped and blank lines are ignored.
        """
        rows = [line.split() for line in sh('free').split('\n')[1:]]
        return [row for row in rows if row]

    def _free_mem(self, column):
        """Return field `column` of the first ``free`` data row * 1024.

        The first data row is expected to be the physical memory row;
        index 0 is the row label, 1 is "total", 2 is "used", 3 is "free".
        """
        return int(self._free_rows()[0][column]) * 1024

    def _free_swap(self, column):
        """Return field `column` of the last ``free`` data row * 1024.

        The swap row is looked up as the *last* row rather than at a
        fixed index so that it is found whether or not ``free`` prints
        the intermediate "-/+ buffers/cache" row.
        """
        return int(self._free_rows()[-1][column]) * 1024

    def _kernel_version(self):
        """Return the running kernel version as a (major, minor, patch)
        tuple of ints, parsed from ``os.uname()[2]``.

        A missing patch component is reported as 0.
        """
        release = os.uname()[2]
        # Escaped dots and multi-digit components: "3.13.0-24-generic"
        # must give (3, 13, 0) and "2.6.32" must give (2, 6, 32).
        match = re.search(r'(\d+)\.(\d+)(?:\.(\d+))?', release)
        if match is None:
            self.fail("can't parse kernel version from %r" % release)
        return tuple(int(part or 0) for part in match.groups())

    # --- tests

    @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
                     reason="os.statvfs() function not available on this platform")
    @skip_on_not_implemented()
    def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -P -B 1" (POSIX output format, sizes in bytes)
        def df(path):
            out = sh('df -P -B 1 "%s"' % path).strip()
            # first line is the header, second one describes `path`
            line = out.split('\n')[1]
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            return dev, int(total), int(used), int(free)

        # 10 MB tolerance: usage may change between the two measurements
        tolerance = 10 * 1024 * 1024

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            if abs(usage.free - free) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.used, used))

    def test_memory_maps(self):
        # compare Process.get_memory_maps() against "pmap -x <pid>",
        # walking both listings in parallel
        # NOTE(review): the subprocess is not terminated here; presumably
        # the test_psutil harness reaps it - confirm.
        sproc = get_test_subprocess()
        time.sleep(1)
        p = psutil.Process(sproc.pid)
        maps = p.get_memory_maps(grouped=False)
        pmap = sh('pmap -x %s' % p.pid).split('\n')
        # get rid of header
        del pmap[0]
        del pmap[0]
        while maps and pmap:
            this = maps.pop(0)
            other = pmap.pop(0)
            _addr, _size, rss, _dirty, mode, path = other.split(None, 5)
            # paths are compared only for entries not written in
            # brackets by pmap
            if not path.startswith('[') and not path.endswith(']'):
                self.assertEqual(path, os.path.basename(this.path))
            self.assertEqual(int(rss) * 1024, this.rss)
            # test only rwx chars, ignore 's' and 'p'
            self.assertEqual(mode[:3], this.perms[:3])

    def test_vmem_total(self):
        self.assertEqual(self._free_mem(1), psutil.virtual_memory().total)

    @retry_before_failing()
    def test_vmem_used(self):
        self.assertAlmostEqual(self._free_mem(2),
                               psutil.virtual_memory().used,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_free(self):
        self.assertAlmostEqual(self._free_mem(3),
                               psutil.virtual_memory().free,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_buffers(self):
        # NOTE(review): column 5 is "buffers" only in the
        # "total used free shared buffers cached" layout of free.
        self.assertAlmostEqual(self._free_mem(5),
                               psutil.virtual_memory().buffers,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_cached(self):
        # NOTE(review): column 6 is "cached" only in the
        # "total used free shared buffers cached" layout of free.
        self.assertAlmostEqual(self._free_mem(6),
                               psutil.virtual_memory().cached,
                               delta=TOLERANCE)

    def test_swapmem_total(self):
        self.assertEqual(self._free_swap(1), psutil.swap_memory().total)

    @retry_before_failing()
    def test_swapmem_used(self):
        self.assertAlmostEqual(self._free_swap(2),
                               psutil.swap_memory().used,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_swapmem_free(self):
        self.assertAlmostEqual(self._free_swap(3),
                               psutil.swap_memory().free,
                               delta=TOLERANCE)

    def test_cpu_times(self):
        # the set of fields exposed by psutil.cpu_times() is expected to
        # depend on the running kernel version:
        #   steal >= 2.6.11
        #   guest >= 2.6.24
        #   guest_nice >= 3.2.0
        fields = psutil.cpu_times()._fields
        kernel_ver_info = self._kernel_version()
        minimum_versions = (
            ('steal', (2, 6, 11)),
            ('guest', (2, 6, 24)),
            ('guest_nice', (3, 2, 0)),
        )
        for name, minimum in minimum_versions:
            if kernel_ver_info >= minimum:
                self.assertIn(name, fields)
            else:
                self.assertNotIn(name, fields)
michael@0 | 140 | |
michael@0 | 141 | |
def test_main():
    """Run all LinuxSpecificTestCase tests with a verbose text runner.

    Returns True if every test succeeded, False otherwise.
    """
    # unittest.makeSuite() is deprecated (and removed in Python 3.13);
    # the default loader collects the same "test*" methods.
    test_suite = unittest.TestSuite()
    test_suite.addTest(
        unittest.defaultTestLoader.loadTestsFromTestCase(
            LinuxSpecificTestCase))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
michael@0 | 147 | |
if __name__ == '__main__':
    # Exit with a non-zero status when at least one test did not pass.
    succeeded = test_main()
    if not succeeded:
        sys.exit(1)