python/psutil/test/test_memory_leaks.py

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

     1 #!/usr/bin/env python
     3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
     4 # Use of this source code is governed by a BSD-style license that can be
     5 # found in the LICENSE file.
     7 """
A test script which attempts to detect memory leaks by calling C
functions many times and comparing process memory usage before and
after the calls.  It might produce false positives.
    11 """
    13 import os
    14 import gc
    15 import unittest
    16 import time
    17 import socket
    18 import threading
    19 import types
    20 import sys
    22 import psutil
    23 import psutil._common
    24 from psutil._compat import PY3, callable, xrange
    25 from test_psutil import *
    27 # disable cache for Process class properties
    28 psutil._common.cached_property.enabled = False
    30 LOOPS = 1000
    31 TOLERANCE = 4096
    34 class Base(unittest.TestCase):
    36     proc = psutil.Process(os.getpid())
    38     def execute(self, function, *args, **kwargs):
    39         def call_many_times():
    40             for x in xrange(LOOPS - 1):
    41                 self.call(function, *args, **kwargs)
    42             del x
    43             gc.collect()
    44             return self.get_mem()
    46         self.call(function, *args, **kwargs)
    47         self.assertEqual(gc.garbage, [])
    48         self.assertEqual(threading.active_count(), 1)
    50         # RSS comparison
    51         # step 1
    52         rss1 = call_many_times()
    53         # step 2
    54         rss2 = call_many_times()
    56         difference = rss2 - rss1
    57         if difference > TOLERANCE:
    58             # This doesn't necessarily mean we have a leak yet.
    59             # At this point we assume that after having called the
    60             # function so many times the memory usage is stabilized
    61             # and if there are no leaks it should not increase any
    62             # more.
    63             # Let's keep calling fun for 3 more seconds and fail if
    64             # we notice any difference.
    65             stop_at = time.time() + 3
    66             while 1:
    67                 self.call(function, *args, **kwargs)
    68                 if time.time() >= stop_at:
    69                     break
    70             del stop_at
    71             gc.collect()
    72             rss3 = self.get_mem()
    73             difference = rss3 - rss2
    74             if rss3 > rss2:
    75                 self.fail("rss2=%s, rss3=%s, difference=%s" \
    76                           % (rss2, rss3, difference))
    78     def get_mem(self):
    79         return psutil.Process(os.getpid()).get_memory_info()[0]
    81     def call(self, *args, **kwargs):
    82         raise NotImplementedError("must be implemented in subclass")
    85 class TestProcessObjectLeaks(Base):
    86     """Test leaks of Process class methods and properties"""
    88     def __init__(self, *args, **kwargs):
    89         Base.__init__(self, *args, **kwargs)
    90         # skip tests which are not supported by Process API
    91         supported_attrs = dir(psutil.Process)
    92         for attr in [x for x in dir(self) if x.startswith('test')]:
    93             if attr[5:] not in supported_attrs:
    94                 meth = getattr(self, attr)
    95                 name = meth.__func__.__name__.replace('test_', '')
    96                 @unittest.skipIf(True,
    97                                  "%s not supported on this platform" % name)
    98                 def test_(self):
    99                     pass
   100                 setattr(self, attr, types.MethodType(test_, self))
   102     def setUp(self):
   103         gc.collect()
   105     def tearDown(self):
   106         reap_children()
   108     def call(self, function, *args, **kwargs):
   109         try:
   110             obj = getattr(self.proc, function)
   111             if callable(obj):
   112                 obj(*args, **kwargs)
   113         except psutil.Error:
   114             pass
   116     def test_name(self):
   117         self.execute('name')
   119     def test_cmdline(self):
   120         self.execute('cmdline')
   122     def test_exe(self):
   123         self.execute('exe')
   125     def test_ppid(self):
   126         self.execute('ppid')
   128     def test_uids(self):
   129         self.execute('uids')
   131     def test_gids(self):
   132         self.execute('gids')
   134     def test_status(self):
   135         self.execute('status')
   137     def test_get_nice(self):
   138         self.execute('get_nice')
   140     def test_set_nice(self):
   141         niceness = psutil.Process(os.getpid()).get_nice()
   142         self.execute('set_nice', niceness)
   144     def test_get_io_counters(self):
   145         self.execute('get_io_counters')
   147     def test_get_ionice(self):
   148         self.execute('get_ionice')
   150     def test_set_ionice(self):
   151         if WINDOWS:
   152             value = psutil.Process(os.getpid()).get_ionice()
   153             self.execute('set_ionice', value)
   154         else:
   155             self.execute('set_ionice', psutil.IOPRIO_CLASS_NONE)
   157     def test_username(self):
   158         self.execute('username')
   160     def test_create_time(self):
   161         self.execute('create_time')
   163     def test_get_num_threads(self):
   164         self.execute('get_num_threads')
   166     def test_get_num_handles(self):
   167         self.execute('get_num_handles')
   169     def test_get_num_fds(self):
   170         self.execute('get_num_fds')
   172     def test_get_threads(self):
   173         self.execute('get_threads')
   175     def test_get_cpu_times(self):
   176         self.execute('get_cpu_times')
   178     def test_get_memory_info(self):
   179         self.execute('get_memory_info')
   181     def test_get_ext_memory_info(self):
   182         self.execute('get_ext_memory_info')
   184     def test_terminal(self):
   185         self.execute('terminal')
   187     @unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)")
   188     def test_resume(self):
   189         self.execute('resume')
   191     def test_getcwd(self):
   192         self.execute('getcwd')
   194     def test_get_cpu_affinity(self):
   195         self.execute('get_cpu_affinity')
   197     def test_set_cpu_affinity(self):
   198         affinity = psutil.Process(os.getpid()).get_cpu_affinity()
   199         self.execute('set_cpu_affinity', affinity)
   201     def test_get_open_files(self):
   202         safe_remove(TESTFN)  # needed after UNIX socket test has run
   203         f = open(TESTFN, 'w')
   204         try:
   205             self.execute('get_open_files')
   206         finally:
   207             f.close()
   209     # OSX implementation is unbelievably slow
   210     @unittest.skipIf(OSX, "OSX implementation is too slow")
   211     def test_get_memory_maps(self):
   212         self.execute('get_memory_maps')
   214     # Linux implementation is pure python so since it's slow we skip it
   215     @unittest.skipIf(LINUX, "not worth being tested on Linux (pure python)")
   216     def test_get_connections(self):
   217         def create_socket(family, type):
   218             sock = socket.socket(family, type)
   219             sock.bind(('', 0))
   220             if type == socket.SOCK_STREAM:
   221                 sock.listen(1)
   222             return sock
   224         socks = []
   225         socks.append(create_socket(socket.AF_INET, socket.SOCK_STREAM))
   226         socks.append(create_socket(socket.AF_INET, socket.SOCK_DGRAM))
   227         if supports_ipv6():
   228             socks.append(create_socket(socket.AF_INET6, socket.SOCK_STREAM))
   229             socks.append(create_socket(socket.AF_INET6, socket.SOCK_DGRAM))
   230         if hasattr(socket, 'AF_UNIX'):
   231             safe_remove(TESTFN)
   232             s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
   233             s.bind(TESTFN)
   234             s.listen(1)
   235             socks.append(s)
   236         kind = 'all'
   237         # TODO: UNIX sockets are temporarily implemented by parsing
   238         # 'pfiles' cmd  output; we don't want that part of the code to
   239         # be executed.
   240         if SUNOS:
   241             kind = 'inet'
   242         try:
   243             self.execute('get_connections', kind=kind)
   244         finally:
   245             for s in socks:
   246                 s.close()
   249 p = get_test_subprocess()
   250 DEAD_PROC = psutil.Process(p.pid)
   251 DEAD_PROC.kill()
   252 DEAD_PROC.wait()
   253 del p
   255 class TestProcessObjectLeaksZombie(TestProcessObjectLeaks):
   256     """Same as above but looks for leaks occurring when dealing with
   257     zombie processes raising NoSuchProcess exception.
   258     """
   259     proc = DEAD_PROC
   261     if not POSIX:
   262         def test_kill(self):
   263             self.execute('kill')
   265         def test_terminate(self):
   266             self.execute('terminate')
   268         def test_suspend(self):
   269             self.execute('suspend')
   271         def test_resume(self):
   272             self.execute('resume')
   274         def test_wait(self):
   275             self.execute('wait')
   278 class TestModuleFunctionsLeaks(Base):
   279     """Test leaks of psutil module functions."""
   281     def setUp(self):
   282         gc.collect()
   284     def call(self, function, *args, **kwargs):
   285         obj = getattr(psutil, function)
   286         if callable(obj):
   287             retvalue = obj(*args, **kwargs)
   289     @unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)")
   290     def test_pid_exists(self):
   291         self.execute('pid_exists', os.getpid())
   293     def test_virtual_memory(self):
   294         self.execute('virtual_memory')
   296     # TODO: remove this skip when this gets fixed
   297     @unittest.skipIf(SUNOS,
   298                      "not worth being tested on SUNOS (uses a subprocess)")
   299     def test_swap_memory(self):
   300         self.execute('swap_memory')
   302     def test_cpu_times(self):
   303         self.execute('cpu_times')
   305     def test_per_cpu_times(self):
   306         self.execute('cpu_times', percpu=True)
   308     @unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)")
   309     def test_disk_usage(self):
   310         self.execute('disk_usage', '.')
   312     def test_disk_partitions(self):
   313         self.execute('disk_partitions')
   315     def test_net_io_counters(self):
   316         self.execute('net_io_counters')
   318     def test_disk_io_counters(self):
   319         self.execute('disk_io_counters')
   321     # XXX - on Windows this produces a false positive
   322     @unittest.skipIf(WINDOWS,
   323                      "XXX produces a false positive on Windows")
   324     def test_get_users(self):
   325         self.execute('get_users')
   328 def test_main():
   329     test_suite = unittest.TestSuite()
   330     tests = [TestProcessObjectLeaksZombie,
   331              TestProcessObjectLeaks,
   332              TestModuleFunctionsLeaks,]
   333     for test in tests:
   334         test_suite.addTest(unittest.makeSuite(test))
   335     result = unittest.TextTestRunner(verbosity=2).run(test_suite)
   336     return result.wasSuccessful()
   338 if __name__ == '__main__':
   339     if not test_main():
   340         sys.exit(1)

mercurial