michael@0: #!/usr/bin/env python michael@0: michael@0: # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. michael@0: # Use of this source code is governed by a BSD-style license that can be michael@0: # found in the LICENSE file. michael@0: michael@0: """Linux specific tests. These are implicitly run by test_psutil.py.""" michael@0: michael@0: from __future__ import division michael@0: import unittest michael@0: import subprocess michael@0: import sys michael@0: import time michael@0: import os michael@0: import re michael@0: michael@0: from test_psutil import * michael@0: from psutil._compat import PY3 michael@0: import psutil michael@0: michael@0: michael@0: class LinuxSpecificTestCase(unittest.TestCase): michael@0: michael@0: @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), michael@0: reason="os.statvfs() function not available on this platform") michael@0: @skip_on_not_implemented() michael@0: def test_disks(self): michael@0: # test psutil.disk_usage() and psutil.disk_partitions() michael@0: # against "df -a" michael@0: def df(path): michael@0: out = sh('df -P -B 1 "%s"' % path).strip() michael@0: lines = out.split('\n') michael@0: lines.pop(0) michael@0: line = lines.pop(0) michael@0: dev, total, used, free = line.split()[:4] michael@0: if dev == 'none': michael@0: dev = '' michael@0: total, used, free = int(total), int(used), int(free) michael@0: return dev, total, used, free michael@0: michael@0: for part in psutil.disk_partitions(all=False): michael@0: usage = psutil.disk_usage(part.mountpoint) michael@0: dev, total, used, free = df(part.mountpoint) michael@0: self.assertEqual(part.device, dev) michael@0: self.assertEqual(usage.total, total) michael@0: # 10 MB tollerance michael@0: if abs(usage.free - free) > 10 * 1024 * 1024: michael@0: self.fail("psutil=%s, df=%s" % (usage.free, free)) michael@0: if abs(usage.used - used) > 10 * 1024 * 1024: michael@0: self.fail("psutil=%s, df=%s" % (usage.used, used)) michael@0: michael@0: def test_memory_maps(self): michael@0: sproc = get_test_subprocess() michael@0: time.sleep(1) michael@0: p = psutil.Process(sproc.pid) michael@0: maps = p.get_memory_maps(grouped=False) michael@0: pmap = sh('pmap -x %s' % p.pid).split('\n') michael@0: del pmap[0]; del pmap[0] # get rid of header michael@0: while maps and pmap: michael@0: this = maps.pop(0) michael@0: other = pmap.pop(0) michael@0: addr, _, rss, dirty, mode, path = other.split(None, 5) michael@0: if not path.startswith('[') and not path.endswith(']'): michael@0: self.assertEqual(path, os.path.basename(this.path)) michael@0: self.assertEqual(int(rss) * 1024, this.rss) michael@0: # test only rwx chars, ignore 's' and 'p' michael@0: self.assertEqual(mode[:3], this.perms[:3]) michael@0: michael@0: def test_vmem_total(self): michael@0: lines = sh('free').split('\n')[1:] michael@0: total = int(lines[0].split()[1]) * 1024 michael@0: self.assertEqual(total, psutil.virtual_memory().total) michael@0: michael@0: @retry_before_failing() michael@0: def test_vmem_used(self): michael@0: lines = sh('free').split('\n')[1:] michael@0: used = int(lines[0].split()[2]) * 1024 michael@0: self.assertAlmostEqual(used, psutil.virtual_memory().used, michael@0: delta=TOLERANCE) michael@0: michael@0: @retry_before_failing() michael@0: def test_vmem_free(self): michael@0: lines = sh('free').split('\n')[1:] michael@0: free = int(lines[0].split()[3]) * 1024 michael@0: self.assertAlmostEqual(free, psutil.virtual_memory().free, michael@0: delta=TOLERANCE) michael@0: michael@0: @retry_before_failing() michael@0: def test_vmem_buffers(self): michael@0: lines = sh('free').split('\n')[1:] michael@0: buffers = int(lines[0].split()[5]) * 1024 michael@0: self.assertAlmostEqual(buffers, psutil.virtual_memory().buffers, michael@0: delta=TOLERANCE) michael@0: michael@0: @retry_before_failing() michael@0: def test_vmem_cached(self): michael@0: lines = sh('free').split('\n')[1:] michael@0: cached = int(lines[0].split()[6]) * 1024 michael@0: self.assertAlmostEqual(cached, psutil.virtual_memory().cached, michael@0: delta=TOLERANCE) michael@0: michael@0: def test_swapmem_total(self): michael@0: lines = sh('free').split('\n')[1:] michael@0: total = int(lines[2].split()[1]) * 1024 michael@0: self.assertEqual(total, psutil.swap_memory().total) michael@0: michael@0: @retry_before_failing() michael@0: def test_swapmem_used(self): michael@0: lines = sh('free').split('\n')[1:] michael@0: used = int(lines[2].split()[2]) * 1024 michael@0: self.assertAlmostEqual(used, psutil.swap_memory().used, michael@0: delta=TOLERANCE) michael@0: michael@0: @retry_before_failing() michael@0: def test_swapmem_free(self): michael@0: lines = sh('free').split('\n')[1:] michael@0: free = int(lines[2].split()[3]) * 1024 michael@0: self.assertAlmostEqual(free, psutil.swap_memory().free, michael@0: delta=TOLERANCE) michael@0: michael@0: def test_cpu_times(self): michael@0: fields = psutil.cpu_times()._fields michael@0: kernel_ver = re.findall('\d.\d.\d', os.uname()[2])[0] michael@0: kernel_ver_info = tuple(map(int, kernel_ver.split('.'))) michael@0: # steal >= 2.6.11 michael@0: # guest >= 2.6.24 michael@0: # guest_nice >= 3.2.0 michael@0: if kernel_ver_info >= (2, 6, 11): michael@0: self.assertIn('steal', fields) michael@0: else: michael@0: self.assertNotIn('steal', fields) michael@0: if kernel_ver_info >= (2, 6, 24): michael@0: self.assertIn('guest', fields) michael@0: else: michael@0: self.assertNotIn('guest', fields) michael@0: if kernel_ver_info >= (3, 2, 0): michael@0: self.assertIn('guest_nice', fields) michael@0: else: michael@0: self.assertNotIn('guest_nice', fields) michael@0: michael@0: michael@0: def test_main(): michael@0: test_suite = unittest.TestSuite() michael@0: test_suite.addTest(unittest.makeSuite(LinuxSpecificTestCase)) michael@0: result = unittest.TextTestRunner(verbosity=2).run(test_suite) michael@0: return result.wasSuccessful() michael@0: michael@0: if __name__ == '__main__': michael@0: if not test_main(): michael@0: sys.exit(1)