michael@0: #!/usr/bin/env python michael@0: michael@0: # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. michael@0: # Use of this source code is governed by a BSD-style license that can be michael@0: # found in the LICENSE file. michael@0: michael@0: """ michael@0: A test script which attempts to detect memory leaks by calling C michael@0: functions many times and compare process memory usage before and michael@0: after the calls. It might produce false positives. michael@0: """ michael@0: michael@0: import os michael@0: import gc michael@0: import unittest michael@0: import time michael@0: import socket michael@0: import threading michael@0: import types michael@0: import sys michael@0: michael@0: import psutil michael@0: import psutil._common michael@0: from psutil._compat import PY3, callable, xrange michael@0: from test_psutil import * michael@0: michael@0: # disable cache for Process class properties michael@0: psutil._common.cached_property.enabled = False michael@0: michael@0: LOOPS = 1000 michael@0: TOLERANCE = 4096 michael@0: michael@0: michael@0: class Base(unittest.TestCase): michael@0: michael@0: proc = psutil.Process(os.getpid()) michael@0: michael@0: def execute(self, function, *args, **kwargs): michael@0: def call_many_times(): michael@0: for x in xrange(LOOPS - 1): michael@0: self.call(function, *args, **kwargs) michael@0: del x michael@0: gc.collect() michael@0: return self.get_mem() michael@0: michael@0: self.call(function, *args, **kwargs) michael@0: self.assertEqual(gc.garbage, []) michael@0: self.assertEqual(threading.active_count(), 1) michael@0: michael@0: # RSS comparison michael@0: # step 1 michael@0: rss1 = call_many_times() michael@0: # step 2 michael@0: rss2 = call_many_times() michael@0: michael@0: difference = rss2 - rss1 michael@0: if difference > TOLERANCE: michael@0: # This doesn't necessarily mean we have a leak yet. michael@0: # At this point we assume that after having called the michael@0: # function so many times the memory usage is stabilized michael@0: # and if there are no leaks it should not increase any michael@0: # more. michael@0: # Let's keep calling fun for 3 more seconds and fail if michael@0: # we notice any difference. michael@0: stop_at = time.time() + 3 michael@0: while 1: michael@0: self.call(function, *args, **kwargs) michael@0: if time.time() >= stop_at: michael@0: break michael@0: del stop_at michael@0: gc.collect() michael@0: rss3 = self.get_mem() michael@0: difference = rss3 - rss2 michael@0: if rss3 > rss2: michael@0: self.fail("rss2=%s, rss3=%s, difference=%s" \ michael@0: % (rss2, rss3, difference)) michael@0: michael@0: def get_mem(self): michael@0: return psutil.Process(os.getpid()).get_memory_info()[0] michael@0: michael@0: def call(self, *args, **kwargs): michael@0: raise NotImplementedError("must be implemented in subclass") michael@0: michael@0: michael@0: class TestProcessObjectLeaks(Base): michael@0: """Test leaks of Process class methods and properties""" michael@0: michael@0: def __init__(self, *args, **kwargs): michael@0: Base.__init__(self, *args, **kwargs) michael@0: # skip tests which are not supported by Process API michael@0: supported_attrs = dir(psutil.Process) michael@0: for attr in [x for x in dir(self) if x.startswith('test')]: michael@0: if attr[5:] not in supported_attrs: michael@0: meth = getattr(self, attr) michael@0: name = meth.__func__.__name__.replace('test_', '') michael@0: @unittest.skipIf(True, michael@0: "%s not supported on this platform" % name) michael@0: def test_(self): michael@0: pass michael@0: setattr(self, attr, types.MethodType(test_, self)) michael@0: michael@0: def setUp(self): michael@0: gc.collect() michael@0: michael@0: def tearDown(self): michael@0: reap_children() michael@0: michael@0: def call(self, function, *args, **kwargs): michael@0: try: michael@0: obj = getattr(self.proc, function) michael@0: if callable(obj): michael@0: obj(*args, **kwargs) michael@0: except psutil.Error: michael@0: pass michael@0: michael@0: def test_name(self): michael@0: self.execute('name') michael@0: michael@0: def test_cmdline(self): michael@0: self.execute('cmdline') michael@0: michael@0: def test_exe(self): michael@0: self.execute('exe') michael@0: michael@0: def test_ppid(self): michael@0: self.execute('ppid') michael@0: michael@0: def test_uids(self): michael@0: self.execute('uids') michael@0: michael@0: def test_gids(self): michael@0: self.execute('gids') michael@0: michael@0: def test_status(self): michael@0: self.execute('status') michael@0: michael@0: def test_get_nice(self): michael@0: self.execute('get_nice') michael@0: michael@0: def test_set_nice(self): michael@0: niceness = psutil.Process(os.getpid()).get_nice() michael@0: self.execute('set_nice', niceness) michael@0: michael@0: def test_get_io_counters(self): michael@0: self.execute('get_io_counters') michael@0: michael@0: def test_get_ionice(self): michael@0: self.execute('get_ionice') michael@0: michael@0: def test_set_ionice(self): michael@0: if WINDOWS: michael@0: value = psutil.Process(os.getpid()).get_ionice() michael@0: self.execute('set_ionice', value) michael@0: else: michael@0: self.execute('set_ionice', psutil.IOPRIO_CLASS_NONE) michael@0: michael@0: def test_username(self): michael@0: self.execute('username') michael@0: michael@0: def test_create_time(self): michael@0: self.execute('create_time') michael@0: michael@0: def test_get_num_threads(self): michael@0: self.execute('get_num_threads') michael@0: michael@0: def test_get_num_handles(self): michael@0: self.execute('get_num_handles') michael@0: michael@0: def test_get_num_fds(self): michael@0: self.execute('get_num_fds') michael@0: michael@0: def test_get_threads(self): michael@0: self.execute('get_threads') michael@0: michael@0: def test_get_cpu_times(self): michael@0: self.execute('get_cpu_times') michael@0: michael@0: def test_get_memory_info(self): michael@0: self.execute('get_memory_info') michael@0: michael@0: def test_get_ext_memory_info(self): michael@0: self.execute('get_ext_memory_info') michael@0: michael@0: def test_terminal(self): michael@0: self.execute('terminal') michael@0: michael@0: @unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)") michael@0: def test_resume(self): michael@0: self.execute('resume') michael@0: michael@0: def test_getcwd(self): michael@0: self.execute('getcwd') michael@0: michael@0: def test_get_cpu_affinity(self): michael@0: self.execute('get_cpu_affinity') michael@0: michael@0: def test_set_cpu_affinity(self): michael@0: affinity = psutil.Process(os.getpid()).get_cpu_affinity() michael@0: self.execute('set_cpu_affinity', affinity) michael@0: michael@0: def test_get_open_files(self): michael@0: safe_remove(TESTFN) # needed after UNIX socket test has run michael@0: f = open(TESTFN, 'w') michael@0: try: michael@0: self.execute('get_open_files') michael@0: finally: michael@0: f.close() michael@0: michael@0: # OSX implementation is unbelievably slow michael@0: @unittest.skipIf(OSX, "OSX implementation is too slow") michael@0: def test_get_memory_maps(self): michael@0: self.execute('get_memory_maps') michael@0: michael@0: # Linux implementation is pure python so since it's slow we skip it michael@0: @unittest.skipIf(LINUX, "not worth being tested on Linux (pure python)") michael@0: def test_get_connections(self): michael@0: def create_socket(family, type): michael@0: sock = socket.socket(family, type) michael@0: sock.bind(('', 0)) michael@0: if type == socket.SOCK_STREAM: michael@0: sock.listen(1) michael@0: return sock michael@0: michael@0: socks = [] michael@0: socks.append(create_socket(socket.AF_INET, socket.SOCK_STREAM)) michael@0: socks.append(create_socket(socket.AF_INET, socket.SOCK_DGRAM)) michael@0: if supports_ipv6(): michael@0: socks.append(create_socket(socket.AF_INET6, socket.SOCK_STREAM)) michael@0: socks.append(create_socket(socket.AF_INET6, socket.SOCK_DGRAM)) michael@0: if hasattr(socket, 'AF_UNIX'): michael@0: safe_remove(TESTFN) michael@0: s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) michael@0: s.bind(TESTFN) michael@0: s.listen(1) michael@0: socks.append(s) michael@0: kind = 'all' michael@0: # TODO: UNIX sockets are temporarily implemented by parsing michael@0: # 'pfiles' cmd output; we don't want that part of the code to michael@0: # be executed. michael@0: if SUNOS: michael@0: kind = 'inet' michael@0: try: michael@0: self.execute('get_connections', kind=kind) michael@0: finally: michael@0: for s in socks: michael@0: s.close() michael@0: michael@0: michael@0: p = get_test_subprocess() michael@0: DEAD_PROC = psutil.Process(p.pid) michael@0: DEAD_PROC.kill() michael@0: DEAD_PROC.wait() michael@0: del p michael@0: michael@0: class TestProcessObjectLeaksZombie(TestProcessObjectLeaks): michael@0: """Same as above but looks for leaks occurring when dealing with michael@0: zombie processes raising NoSuchProcess exception. michael@0: """ michael@0: proc = DEAD_PROC michael@0: michael@0: if not POSIX: michael@0: def test_kill(self): michael@0: self.execute('kill') michael@0: michael@0: def test_terminate(self): michael@0: self.execute('terminate') michael@0: michael@0: def test_suspend(self): michael@0: self.execute('suspend') michael@0: michael@0: def test_resume(self): michael@0: self.execute('resume') michael@0: michael@0: def test_wait(self): michael@0: self.execute('wait') michael@0: michael@0: michael@0: class TestModuleFunctionsLeaks(Base): michael@0: """Test leaks of psutil module functions.""" michael@0: michael@0: def setUp(self): michael@0: gc.collect() michael@0: michael@0: def call(self, function, *args, **kwargs): michael@0: obj = getattr(psutil, function) michael@0: if callable(obj): michael@0: retvalue = obj(*args, **kwargs) michael@0: michael@0: @unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)") michael@0: def test_pid_exists(self): michael@0: self.execute('pid_exists', os.getpid()) michael@0: michael@0: def test_virtual_memory(self): michael@0: self.execute('virtual_memory') michael@0: michael@0: # TODO: remove this skip when this gets fixed michael@0: @unittest.skipIf(SUNOS, michael@0: "not worth being tested on SUNOS (uses a subprocess)") michael@0: def test_swap_memory(self): michael@0: self.execute('swap_memory') michael@0: michael@0: def test_cpu_times(self): michael@0: self.execute('cpu_times') michael@0: michael@0: def test_per_cpu_times(self): michael@0: self.execute('cpu_times', percpu=True) michael@0: michael@0: @unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)") michael@0: def test_disk_usage(self): michael@0: self.execute('disk_usage', '.') michael@0: michael@0: def test_disk_partitions(self): michael@0: self.execute('disk_partitions') michael@0: michael@0: def test_net_io_counters(self): michael@0: self.execute('net_io_counters') michael@0: michael@0: def test_disk_io_counters(self): michael@0: self.execute('disk_io_counters') michael@0: michael@0: # XXX - on Windows this produces a false positive michael@0: @unittest.skipIf(WINDOWS, michael@0: "XXX produces a false positive on Windows") michael@0: def test_get_users(self): michael@0: self.execute('get_users') michael@0: michael@0: michael@0: def test_main(): michael@0: test_suite = unittest.TestSuite() michael@0: tests = [TestProcessObjectLeaksZombie, michael@0: TestProcessObjectLeaks, michael@0: TestModuleFunctionsLeaks,] michael@0: for test in tests: michael@0: test_suite.addTest(unittest.makeSuite(test)) michael@0: result = unittest.TextTestRunner(verbosity=2).run(test_suite) michael@0: return result.wasSuccessful() michael@0: michael@0: if __name__ == '__main__': michael@0: if not test_main(): michael@0: sys.exit(1)