1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/python/psutil/test/test_memory_leaks.py Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,340 @@ 1.4 +#!/usr/bin/env python 1.5 + 1.6 +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. 1.7 +# Use of this source code is governed by a BSD-style license that can be 1.8 +# found in the LICENSE file. 1.9 + 1.10 +""" 1.11 +A test script which attempts to detect memory leaks by calling C 1.12 +functions many times and compare process memory usage before and 1.13 +after the calls. It might produce false positives. 1.14 +""" 1.15 + 1.16 +import os 1.17 +import gc 1.18 +import unittest 1.19 +import time 1.20 +import socket 1.21 +import threading 1.22 +import types 1.23 +import sys 1.24 + 1.25 +import psutil 1.26 +import psutil._common 1.27 +from psutil._compat import PY3, callable, xrange 1.28 +from test_psutil import * 1.29 + 1.30 +# disable cache for Process class properties 1.31 +psutil._common.cached_property.enabled = False 1.32 + 1.33 +LOOPS = 1000 1.34 +TOLERANCE = 4096 1.35 + 1.36 + 1.37 +class Base(unittest.TestCase): 1.38 + 1.39 + proc = psutil.Process(os.getpid()) 1.40 + 1.41 + def execute(self, function, *args, **kwargs): 1.42 + def call_many_times(): 1.43 + for x in xrange(LOOPS - 1): 1.44 + self.call(function, *args, **kwargs) 1.45 + del x 1.46 + gc.collect() 1.47 + return self.get_mem() 1.48 + 1.49 + self.call(function, *args, **kwargs) 1.50 + self.assertEqual(gc.garbage, []) 1.51 + self.assertEqual(threading.active_count(), 1) 1.52 + 1.53 + # RSS comparison 1.54 + # step 1 1.55 + rss1 = call_many_times() 1.56 + # step 2 1.57 + rss2 = call_many_times() 1.58 + 1.59 + difference = rss2 - rss1 1.60 + if difference > TOLERANCE: 1.61 + # This doesn't necessarily mean we have a leak yet. 1.62 + # At this point we assume that after having called the 1.63 + # function so many times the memory usage is stabilized 1.64 + # and if there are no leaks it should not increase any 1.65 + # more. 1.66 + # Let's keep calling fun for 3 more seconds and fail if 1.67 + # we notice any difference. 1.68 + stop_at = time.time() + 3 1.69 + while 1: 1.70 + self.call(function, *args, **kwargs) 1.71 + if time.time() >= stop_at: 1.72 + break 1.73 + del stop_at 1.74 + gc.collect() 1.75 + rss3 = self.get_mem() 1.76 + difference = rss3 - rss2 1.77 + if rss3 > rss2: 1.78 + self.fail("rss2=%s, rss3=%s, difference=%s" \ 1.79 + % (rss2, rss3, difference)) 1.80 + 1.81 + def get_mem(self): 1.82 + return psutil.Process(os.getpid()).get_memory_info()[0] 1.83 + 1.84 + def call(self, *args, **kwargs): 1.85 + raise NotImplementedError("must be implemented in subclass") 1.86 + 1.87 + 1.88 +class TestProcessObjectLeaks(Base): 1.89 + """Test leaks of Process class methods and properties""" 1.90 + 1.91 + def __init__(self, *args, **kwargs): 1.92 + Base.__init__(self, *args, **kwargs) 1.93 + # skip tests which are not supported by Process API 1.94 + supported_attrs = dir(psutil.Process) 1.95 + for attr in [x for x in dir(self) if x.startswith('test')]: 1.96 + if attr[5:] not in supported_attrs: 1.97 + meth = getattr(self, attr) 1.98 + name = meth.__func__.__name__.replace('test_', '') 1.99 + @unittest.skipIf(True, 1.100 + "%s not supported on this platform" % name) 1.101 + def test_(self): 1.102 + pass 1.103 + setattr(self, attr, types.MethodType(test_, self)) 1.104 + 1.105 + def setUp(self): 1.106 + gc.collect() 1.107 + 1.108 + def tearDown(self): 1.109 + reap_children() 1.110 + 1.111 + def call(self, function, *args, **kwargs): 1.112 + try: 1.113 + obj = getattr(self.proc, function) 1.114 + if callable(obj): 1.115 + obj(*args, **kwargs) 1.116 + except psutil.Error: 1.117 + pass 1.118 + 1.119 + def test_name(self): 1.120 + self.execute('name') 1.121 + 1.122 + def test_cmdline(self): 1.123 + self.execute('cmdline') 1.124 + 1.125 + def test_exe(self): 1.126 + self.execute('exe') 1.127 + 1.128 + def test_ppid(self): 1.129 + self.execute('ppid') 1.130 + 1.131 + def test_uids(self): 1.132 + self.execute('uids') 1.133 + 1.134 + def test_gids(self): 1.135 + self.execute('gids') 1.136 + 1.137 + def test_status(self): 1.138 + self.execute('status') 1.139 + 1.140 + def test_get_nice(self): 1.141 + self.execute('get_nice') 1.142 + 1.143 + def test_set_nice(self): 1.144 + niceness = psutil.Process(os.getpid()).get_nice() 1.145 + self.execute('set_nice', niceness) 1.146 + 1.147 + def test_get_io_counters(self): 1.148 + self.execute('get_io_counters') 1.149 + 1.150 + def test_get_ionice(self): 1.151 + self.execute('get_ionice') 1.152 + 1.153 + def test_set_ionice(self): 1.154 + if WINDOWS: 1.155 + value = psutil.Process(os.getpid()).get_ionice() 1.156 + self.execute('set_ionice', value) 1.157 + else: 1.158 + self.execute('set_ionice', psutil.IOPRIO_CLASS_NONE) 1.159 + 1.160 + def test_username(self): 1.161 + self.execute('username') 1.162 + 1.163 + def test_create_time(self): 1.164 + self.execute('create_time') 1.165 + 1.166 + def test_get_num_threads(self): 1.167 + self.execute('get_num_threads') 1.168 + 1.169 + def test_get_num_handles(self): 1.170 + self.execute('get_num_handles') 1.171 + 1.172 + def test_get_num_fds(self): 1.173 + self.execute('get_num_fds') 1.174 + 1.175 + def test_get_threads(self): 1.176 + self.execute('get_threads') 1.177 + 1.178 + def test_get_cpu_times(self): 1.179 + self.execute('get_cpu_times') 1.180 + 1.181 + def test_get_memory_info(self): 1.182 + self.execute('get_memory_info') 1.183 + 1.184 + def test_get_ext_memory_info(self): 1.185 + self.execute('get_ext_memory_info') 1.186 + 1.187 + def test_terminal(self): 1.188 + self.execute('terminal') 1.189 + 1.190 + @unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)") 1.191 + def test_resume(self): 1.192 + self.execute('resume') 1.193 + 1.194 + def test_getcwd(self): 1.195 + self.execute('getcwd') 1.196 + 1.197 + def test_get_cpu_affinity(self): 1.198 + self.execute('get_cpu_affinity') 1.199 + 1.200 + def test_set_cpu_affinity(self): 1.201 + affinity = psutil.Process(os.getpid()).get_cpu_affinity() 1.202 + self.execute('set_cpu_affinity', affinity) 1.203 + 1.204 + def test_get_open_files(self): 1.205 + safe_remove(TESTFN) # needed after UNIX socket test has run 1.206 + f = open(TESTFN, 'w') 1.207 + try: 1.208 + self.execute('get_open_files') 1.209 + finally: 1.210 + f.close() 1.211 + 1.212 + # OSX implementation is unbelievably slow 1.213 + @unittest.skipIf(OSX, "OSX implementation is too slow") 1.214 + def test_get_memory_maps(self): 1.215 + self.execute('get_memory_maps') 1.216 + 1.217 + # Linux implementation is pure python so since it's slow we skip it 1.218 + @unittest.skipIf(LINUX, "not worth being tested on Linux (pure python)") 1.219 + def test_get_connections(self): 1.220 + def create_socket(family, type): 1.221 + sock = socket.socket(family, type) 1.222 + sock.bind(('', 0)) 1.223 + if type == socket.SOCK_STREAM: 1.224 + sock.listen(1) 1.225 + return sock 1.226 + 1.227 + socks = [] 1.228 + socks.append(create_socket(socket.AF_INET, socket.SOCK_STREAM)) 1.229 + socks.append(create_socket(socket.AF_INET, socket.SOCK_DGRAM)) 1.230 + if supports_ipv6(): 1.231 + socks.append(create_socket(socket.AF_INET6, socket.SOCK_STREAM)) 1.232 + socks.append(create_socket(socket.AF_INET6, socket.SOCK_DGRAM)) 1.233 + if hasattr(socket, 'AF_UNIX'): 1.234 + safe_remove(TESTFN) 1.235 + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1.236 + s.bind(TESTFN) 1.237 + s.listen(1) 1.238 + socks.append(s) 1.239 + kind = 'all' 1.240 + # TODO: UNIX sockets are temporarily implemented by parsing 1.241 + # 'pfiles' cmd output; we don't want that part of the code to 1.242 + # be executed. 1.243 + if SUNOS: 1.244 + kind = 'inet' 1.245 + try: 1.246 + self.execute('get_connections', kind=kind) 1.247 + finally: 1.248 + for s in socks: 1.249 + s.close() 1.250 + 1.251 + 1.252 +p = get_test_subprocess() 1.253 +DEAD_PROC = psutil.Process(p.pid) 1.254 +DEAD_PROC.kill() 1.255 +DEAD_PROC.wait() 1.256 +del p 1.257 + 1.258 +class TestProcessObjectLeaksZombie(TestProcessObjectLeaks): 1.259 + """Same as above but looks for leaks occurring when dealing with 1.260 + zombie processes raising NoSuchProcess exception. 1.261 + """ 1.262 + proc = DEAD_PROC 1.263 + 1.264 + if not POSIX: 1.265 + def test_kill(self): 1.266 + self.execute('kill') 1.267 + 1.268 + def test_terminate(self): 1.269 + self.execute('terminate') 1.270 + 1.271 + def test_suspend(self): 1.272 + self.execute('suspend') 1.273 + 1.274 + def test_resume(self): 1.275 + self.execute('resume') 1.276 + 1.277 + def test_wait(self): 1.278 + self.execute('wait') 1.279 + 1.280 + 1.281 +class TestModuleFunctionsLeaks(Base): 1.282 + """Test leaks of psutil module functions.""" 1.283 + 1.284 + def setUp(self): 1.285 + gc.collect() 1.286 + 1.287 + def call(self, function, *args, **kwargs): 1.288 + obj = getattr(psutil, function) 1.289 + if callable(obj): 1.290 + retvalue = obj(*args, **kwargs) 1.291 + 1.292 + @unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)") 1.293 + def test_pid_exists(self): 1.294 + self.execute('pid_exists', os.getpid()) 1.295 + 1.296 + def test_virtual_memory(self): 1.297 + self.execute('virtual_memory') 1.298 + 1.299 + # TODO: remove this skip when this gets fixed 1.300 + @unittest.skipIf(SUNOS, 1.301 + "not worth being tested on SUNOS (uses a subprocess)") 1.302 + def test_swap_memory(self): 1.303 + self.execute('swap_memory') 1.304 + 1.305 + def test_cpu_times(self): 1.306 + self.execute('cpu_times') 1.307 + 1.308 + def test_per_cpu_times(self): 1.309 + self.execute('cpu_times', percpu=True) 1.310 + 1.311 + @unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)") 1.312 + def test_disk_usage(self): 1.313 + self.execute('disk_usage', '.') 1.314 + 1.315 + def test_disk_partitions(self): 1.316 + self.execute('disk_partitions') 1.317 + 1.318 + def test_net_io_counters(self): 1.319 + self.execute('net_io_counters') 1.320 + 1.321 + def test_disk_io_counters(self): 1.322 + self.execute('disk_io_counters') 1.323 + 1.324 + # XXX - on Windows this produces a false positive 1.325 + @unittest.skipIf(WINDOWS, 1.326 + "XXX produces a false positive on Windows") 1.327 + def test_get_users(self): 1.328 + self.execute('get_users') 1.329 + 1.330 + 1.331 +def test_main(): 1.332 + test_suite = unittest.TestSuite() 1.333 + tests = [TestProcessObjectLeaksZombie, 1.334 + TestProcessObjectLeaks, 1.335 + TestModuleFunctionsLeaks,] 1.336 + for test in tests: 1.337 + test_suite.addTest(unittest.makeSuite(test)) 1.338 + result = unittest.TextTestRunner(verbosity=2).run(test_suite) 1.339 + return result.wasSuccessful() 1.340 + 1.341 +if __name__ == '__main__': 1.342 + if not test_main(): 1.343 + sys.exit(1)