python/psutil/test/test_memory_leaks.py

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

michael@0 1 #!/usr/bin/env python
michael@0 2
michael@0 3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
michael@0 4 # Use of this source code is governed by a BSD-style license that can be
michael@0 5 # found in the LICENSE file.
michael@0 6
michael@0 7 """
michael@0 8 A test script which attempts to detect memory leaks by calling C
michael@0 9 functions many times and compare process memory usage before and
michael@0 10 after the calls. It might produce false positives.
michael@0 11 """
michael@0 12
michael@0 13 import os
michael@0 14 import gc
michael@0 15 import unittest
michael@0 16 import time
michael@0 17 import socket
michael@0 18 import threading
michael@0 19 import types
michael@0 20 import sys
michael@0 21
michael@0 22 import psutil
michael@0 23 import psutil._common
michael@0 24 from psutil._compat import PY3, callable, xrange
michael@0 25 from test_psutil import *
michael@0 26
michael@0 27 # disable cache for Process class properties
michael@0 28 psutil._common.cached_property.enabled = False
michael@0 29
michael@0 30 LOOPS = 1000
michael@0 31 TOLERANCE = 4096
michael@0 32
michael@0 33
michael@0 34 class Base(unittest.TestCase):
michael@0 35
michael@0 36 proc = psutil.Process(os.getpid())
michael@0 37
michael@0 38 def execute(self, function, *args, **kwargs):
michael@0 39 def call_many_times():
michael@0 40 for x in xrange(LOOPS - 1):
michael@0 41 self.call(function, *args, **kwargs)
michael@0 42 del x
michael@0 43 gc.collect()
michael@0 44 return self.get_mem()
michael@0 45
michael@0 46 self.call(function, *args, **kwargs)
michael@0 47 self.assertEqual(gc.garbage, [])
michael@0 48 self.assertEqual(threading.active_count(), 1)
michael@0 49
michael@0 50 # RSS comparison
michael@0 51 # step 1
michael@0 52 rss1 = call_many_times()
michael@0 53 # step 2
michael@0 54 rss2 = call_many_times()
michael@0 55
michael@0 56 difference = rss2 - rss1
michael@0 57 if difference > TOLERANCE:
michael@0 58 # This doesn't necessarily mean we have a leak yet.
michael@0 59 # At this point we assume that after having called the
michael@0 60 # function so many times the memory usage is stabilized
michael@0 61 # and if there are no leaks it should not increase any
michael@0 62 # more.
michael@0 63 # Let's keep calling fun for 3 more seconds and fail if
michael@0 64 # we notice any difference.
michael@0 65 stop_at = time.time() + 3
michael@0 66 while 1:
michael@0 67 self.call(function, *args, **kwargs)
michael@0 68 if time.time() >= stop_at:
michael@0 69 break
michael@0 70 del stop_at
michael@0 71 gc.collect()
michael@0 72 rss3 = self.get_mem()
michael@0 73 difference = rss3 - rss2
michael@0 74 if rss3 > rss2:
michael@0 75 self.fail("rss2=%s, rss3=%s, difference=%s" \
michael@0 76 % (rss2, rss3, difference))
michael@0 77
michael@0 78 def get_mem(self):
michael@0 79 return psutil.Process(os.getpid()).get_memory_info()[0]
michael@0 80
michael@0 81 def call(self, *args, **kwargs):
michael@0 82 raise NotImplementedError("must be implemented in subclass")
michael@0 83
michael@0 84
michael@0 85 class TestProcessObjectLeaks(Base):
michael@0 86 """Test leaks of Process class methods and properties"""
michael@0 87
michael@0 88 def __init__(self, *args, **kwargs):
michael@0 89 Base.__init__(self, *args, **kwargs)
michael@0 90 # skip tests which are not supported by Process API
michael@0 91 supported_attrs = dir(psutil.Process)
michael@0 92 for attr in [x for x in dir(self) if x.startswith('test')]:
michael@0 93 if attr[5:] not in supported_attrs:
michael@0 94 meth = getattr(self, attr)
michael@0 95 name = meth.__func__.__name__.replace('test_', '')
michael@0 96 @unittest.skipIf(True,
michael@0 97 "%s not supported on this platform" % name)
michael@0 98 def test_(self):
michael@0 99 pass
michael@0 100 setattr(self, attr, types.MethodType(test_, self))
michael@0 101
michael@0 102 def setUp(self):
michael@0 103 gc.collect()
michael@0 104
michael@0 105 def tearDown(self):
michael@0 106 reap_children()
michael@0 107
michael@0 108 def call(self, function, *args, **kwargs):
michael@0 109 try:
michael@0 110 obj = getattr(self.proc, function)
michael@0 111 if callable(obj):
michael@0 112 obj(*args, **kwargs)
michael@0 113 except psutil.Error:
michael@0 114 pass
michael@0 115
michael@0 116 def test_name(self):
michael@0 117 self.execute('name')
michael@0 118
michael@0 119 def test_cmdline(self):
michael@0 120 self.execute('cmdline')
michael@0 121
michael@0 122 def test_exe(self):
michael@0 123 self.execute('exe')
michael@0 124
michael@0 125 def test_ppid(self):
michael@0 126 self.execute('ppid')
michael@0 127
michael@0 128 def test_uids(self):
michael@0 129 self.execute('uids')
michael@0 130
michael@0 131 def test_gids(self):
michael@0 132 self.execute('gids')
michael@0 133
michael@0 134 def test_status(self):
michael@0 135 self.execute('status')
michael@0 136
michael@0 137 def test_get_nice(self):
michael@0 138 self.execute('get_nice')
michael@0 139
michael@0 140 def test_set_nice(self):
michael@0 141 niceness = psutil.Process(os.getpid()).get_nice()
michael@0 142 self.execute('set_nice', niceness)
michael@0 143
michael@0 144 def test_get_io_counters(self):
michael@0 145 self.execute('get_io_counters')
michael@0 146
michael@0 147 def test_get_ionice(self):
michael@0 148 self.execute('get_ionice')
michael@0 149
michael@0 150 def test_set_ionice(self):
michael@0 151 if WINDOWS:
michael@0 152 value = psutil.Process(os.getpid()).get_ionice()
michael@0 153 self.execute('set_ionice', value)
michael@0 154 else:
michael@0 155 self.execute('set_ionice', psutil.IOPRIO_CLASS_NONE)
michael@0 156
michael@0 157 def test_username(self):
michael@0 158 self.execute('username')
michael@0 159
michael@0 160 def test_create_time(self):
michael@0 161 self.execute('create_time')
michael@0 162
michael@0 163 def test_get_num_threads(self):
michael@0 164 self.execute('get_num_threads')
michael@0 165
michael@0 166 def test_get_num_handles(self):
michael@0 167 self.execute('get_num_handles')
michael@0 168
michael@0 169 def test_get_num_fds(self):
michael@0 170 self.execute('get_num_fds')
michael@0 171
michael@0 172 def test_get_threads(self):
michael@0 173 self.execute('get_threads')
michael@0 174
michael@0 175 def test_get_cpu_times(self):
michael@0 176 self.execute('get_cpu_times')
michael@0 177
michael@0 178 def test_get_memory_info(self):
michael@0 179 self.execute('get_memory_info')
michael@0 180
michael@0 181 def test_get_ext_memory_info(self):
michael@0 182 self.execute('get_ext_memory_info')
michael@0 183
michael@0 184 def test_terminal(self):
michael@0 185 self.execute('terminal')
michael@0 186
michael@0 187 @unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)")
michael@0 188 def test_resume(self):
michael@0 189 self.execute('resume')
michael@0 190
michael@0 191 def test_getcwd(self):
michael@0 192 self.execute('getcwd')
michael@0 193
michael@0 194 def test_get_cpu_affinity(self):
michael@0 195 self.execute('get_cpu_affinity')
michael@0 196
michael@0 197 def test_set_cpu_affinity(self):
michael@0 198 affinity = psutil.Process(os.getpid()).get_cpu_affinity()
michael@0 199 self.execute('set_cpu_affinity', affinity)
michael@0 200
michael@0 201 def test_get_open_files(self):
michael@0 202 safe_remove(TESTFN) # needed after UNIX socket test has run
michael@0 203 f = open(TESTFN, 'w')
michael@0 204 try:
michael@0 205 self.execute('get_open_files')
michael@0 206 finally:
michael@0 207 f.close()
michael@0 208
michael@0 209 # OSX implementation is unbelievably slow
michael@0 210 @unittest.skipIf(OSX, "OSX implementation is too slow")
michael@0 211 def test_get_memory_maps(self):
michael@0 212 self.execute('get_memory_maps')
michael@0 213
michael@0 214 # Linux implementation is pure python so since it's slow we skip it
michael@0 215 @unittest.skipIf(LINUX, "not worth being tested on Linux (pure python)")
michael@0 216 def test_get_connections(self):
michael@0 217 def create_socket(family, type):
michael@0 218 sock = socket.socket(family, type)
michael@0 219 sock.bind(('', 0))
michael@0 220 if type == socket.SOCK_STREAM:
michael@0 221 sock.listen(1)
michael@0 222 return sock
michael@0 223
michael@0 224 socks = []
michael@0 225 socks.append(create_socket(socket.AF_INET, socket.SOCK_STREAM))
michael@0 226 socks.append(create_socket(socket.AF_INET, socket.SOCK_DGRAM))
michael@0 227 if supports_ipv6():
michael@0 228 socks.append(create_socket(socket.AF_INET6, socket.SOCK_STREAM))
michael@0 229 socks.append(create_socket(socket.AF_INET6, socket.SOCK_DGRAM))
michael@0 230 if hasattr(socket, 'AF_UNIX'):
michael@0 231 safe_remove(TESTFN)
michael@0 232 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
michael@0 233 s.bind(TESTFN)
michael@0 234 s.listen(1)
michael@0 235 socks.append(s)
michael@0 236 kind = 'all'
michael@0 237 # TODO: UNIX sockets are temporarily implemented by parsing
michael@0 238 # 'pfiles' cmd output; we don't want that part of the code to
michael@0 239 # be executed.
michael@0 240 if SUNOS:
michael@0 241 kind = 'inet'
michael@0 242 try:
michael@0 243 self.execute('get_connections', kind=kind)
michael@0 244 finally:
michael@0 245 for s in socks:
michael@0 246 s.close()
michael@0 247
michael@0 248
michael@0 249 p = get_test_subprocess()
michael@0 250 DEAD_PROC = psutil.Process(p.pid)
michael@0 251 DEAD_PROC.kill()
michael@0 252 DEAD_PROC.wait()
michael@0 253 del p
michael@0 254
michael@0 255 class TestProcessObjectLeaksZombie(TestProcessObjectLeaks):
michael@0 256 """Same as above but looks for leaks occurring when dealing with
michael@0 257 zombie processes raising NoSuchProcess exception.
michael@0 258 """
michael@0 259 proc = DEAD_PROC
michael@0 260
michael@0 261 if not POSIX:
michael@0 262 def test_kill(self):
michael@0 263 self.execute('kill')
michael@0 264
michael@0 265 def test_terminate(self):
michael@0 266 self.execute('terminate')
michael@0 267
michael@0 268 def test_suspend(self):
michael@0 269 self.execute('suspend')
michael@0 270
michael@0 271 def test_resume(self):
michael@0 272 self.execute('resume')
michael@0 273
michael@0 274 def test_wait(self):
michael@0 275 self.execute('wait')
michael@0 276
michael@0 277
michael@0 278 class TestModuleFunctionsLeaks(Base):
michael@0 279 """Test leaks of psutil module functions."""
michael@0 280
michael@0 281 def setUp(self):
michael@0 282 gc.collect()
michael@0 283
michael@0 284 def call(self, function, *args, **kwargs):
michael@0 285 obj = getattr(psutil, function)
michael@0 286 if callable(obj):
michael@0 287 retvalue = obj(*args, **kwargs)
michael@0 288
michael@0 289 @unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)")
michael@0 290 def test_pid_exists(self):
michael@0 291 self.execute('pid_exists', os.getpid())
michael@0 292
michael@0 293 def test_virtual_memory(self):
michael@0 294 self.execute('virtual_memory')
michael@0 295
michael@0 296 # TODO: remove this skip when this gets fixed
michael@0 297 @unittest.skipIf(SUNOS,
michael@0 298 "not worth being tested on SUNOS (uses a subprocess)")
michael@0 299 def test_swap_memory(self):
michael@0 300 self.execute('swap_memory')
michael@0 301
michael@0 302 def test_cpu_times(self):
michael@0 303 self.execute('cpu_times')
michael@0 304
michael@0 305 def test_per_cpu_times(self):
michael@0 306 self.execute('cpu_times', percpu=True)
michael@0 307
michael@0 308 @unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)")
michael@0 309 def test_disk_usage(self):
michael@0 310 self.execute('disk_usage', '.')
michael@0 311
michael@0 312 def test_disk_partitions(self):
michael@0 313 self.execute('disk_partitions')
michael@0 314
michael@0 315 def test_net_io_counters(self):
michael@0 316 self.execute('net_io_counters')
michael@0 317
michael@0 318 def test_disk_io_counters(self):
michael@0 319 self.execute('disk_io_counters')
michael@0 320
michael@0 321 # XXX - on Windows this produces a false positive
michael@0 322 @unittest.skipIf(WINDOWS,
michael@0 323 "XXX produces a false positive on Windows")
michael@0 324 def test_get_users(self):
michael@0 325 self.execute('get_users')
michael@0 326
michael@0 327
michael@0 328 def test_main():
michael@0 329 test_suite = unittest.TestSuite()
michael@0 330 tests = [TestProcessObjectLeaksZombie,
michael@0 331 TestProcessObjectLeaks,
michael@0 332 TestModuleFunctionsLeaks,]
michael@0 333 for test in tests:
michael@0 334 test_suite.addTest(unittest.makeSuite(test))
michael@0 335 result = unittest.TextTestRunner(verbosity=2).run(test_suite)
michael@0 336 return result.wasSuccessful()
michael@0 337
michael@0 338 if __name__ == '__main__':
michael@0 339 if not test_main():
michael@0 340 sys.exit(1)

mercurial