Wed, 31 Dec 2014 06:09:35 +0100
Cloned from upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1,
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f, for hacking purposes.
1 #!/usr/bin/env python
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
7 """
8 A test script which attempts to detect memory leaks by calling C
9 functions many times and compare process memory usage before and
10 after the calls. It might produce false positives.
11 """
13 import os
14 import gc
15 import unittest
16 import time
17 import socket
18 import threading
19 import types
20 import sys
22 import psutil
23 import psutil._common
24 from psutil._compat import PY3, callable, xrange
25 from test_psutil import *
# Number of calls made per measurement round (see Base.execute, which
# makes LOOPS - 1 calls per round).
LOOPS = 1000
# Largest growth between two measurement rounds that is accepted
# without the extra timed check (same unit as Base.get_mem() -- the
# first field of get_memory_info(), presumably bytes; TODO confirm).
TOLERANCE = 4096

# Disable the cache for Process class properties.
psutil._common.cached_property.enabled = False
class Base(unittest.TestCase):
    """Shared machinery for the leak tests.

    Subclasses implement call(); execute() invokes it many times and
    compares the memory usage of the current process between rounds.
    """

    proc = psutil.Process(os.getpid())

    def execute(self, function, *args, **kwargs):
        """Fail the test if calling *function* keeps increasing memory.

        *function*, *args* and *kwargs* are handed unchanged to
        self.call().  Two rounds of LOOPS - 1 calls are measured; if the
        second round ends more than TOLERANCE above the first, the
        function is exercised for 3 more seconds and the test fails if
        memory has grown again.
        """
        def measure_after_round():
            for i in xrange(LOOPS - 1):
                self.call(function, *args, **kwargs)
            del i
            gc.collect()
            return self.get_mem()

        self.call(function, *args, **kwargs)
        self.assertEqual(gc.garbage, [])
        self.assertEqual(threading.active_count(), 1)

        # RSS comparison between two consecutive rounds.
        first_rss = measure_after_round()
        second_rss = measure_after_round()

        if second_rss - first_rss <= TOLERANCE:
            return

        # Exceeding the tolerance is not yet treated as a leak: the
        # assumption is that after this many calls memory usage has
        # stabilized, so without a leak it should not grow any further.
        # Keep calling for 3 more seconds and fail on any increase.
        deadline = time.time() + 3
        while True:
            self.call(function, *args, **kwargs)
            if time.time() >= deadline:
                break
        del deadline
        gc.collect()
        third_rss = self.get_mem()
        if third_rss > second_rss:
            growth = third_rss - second_rss
            self.fail("rss2=%s, rss3=%s, difference=%s"
                      % (second_rss, third_rss, growth))

    def get_mem(self):
        """Return the first field of get_memory_info() for this process."""
        current = psutil.Process(os.getpid())
        return current.get_memory_info()[0]

    def call(self, *args, **kwargs):
        """Perform one invocation of the code under test (abstract)."""
        raise NotImplementedError("must be implemented in subclass")
class TestProcessObjectLeaks(Base):
    """Test leaks of Process class methods and properties"""

    def __init__(self, *args, **kwargs):
        """Replace tests for attributes missing from psutil.Process.

        Every ``test_<name>`` method whose ``<name>`` is not listed by
        ``dir(psutil.Process)`` is overridden on this instance with a
        stub that is unconditionally skipped.
        """
        Base.__init__(self, *args, **kwargs)
        # skip tests which are not supported by Process API
        supported_attrs = dir(psutil.Process)
        for attr in [x for x in dir(self) if x.startswith('test')]:
            if attr[5:] not in supported_attrs:
                meth = getattr(self, attr)
                name = meth.__func__.__name__.replace('test_', '')

                @unittest.skipIf(True,
                                 "%s not supported on this platform" % name)
                def test_(self):
                    pass
                setattr(self, attr, types.MethodType(test_, self))

    def setUp(self):
        gc.collect()

    def tearDown(self):
        reap_children()

    def call(self, function, *args, **kwargs):
        """Look up *function* on self.proc and call it if it is callable.

        psutil.Error is deliberately ignored: only memory usage matters
        here, not whether the call succeeds.
        """
        try:
            obj = getattr(self.proc, function)
            if callable(obj):
                obj(*args, **kwargs)
        except psutil.Error:
            pass

    def test_name(self):
        self.execute('name')

    def test_cmdline(self):
        self.execute('cmdline')

    def test_exe(self):
        self.execute('exe')

    def test_ppid(self):
        self.execute('ppid')

    def test_uids(self):
        self.execute('uids')

    def test_gids(self):
        self.execute('gids')

    def test_status(self):
        self.execute('status')

    def test_get_nice(self):
        self.execute('get_nice')

    def test_set_nice(self):
        # Re-apply the current value so the process priority is unchanged.
        niceness = psutil.Process(os.getpid()).get_nice()
        self.execute('set_nice', niceness)

    def test_get_io_counters(self):
        self.execute('get_io_counters')

    def test_get_ionice(self):
        self.execute('get_ionice')

    def test_set_ionice(self):
        if WINDOWS:
            value = psutil.Process(os.getpid()).get_ionice()
            self.execute('set_ionice', value)
        else:
            self.execute('set_ionice', psutil.IOPRIO_CLASS_NONE)

    def test_username(self):
        self.execute('username')

    def test_create_time(self):
        self.execute('create_time')

    def test_get_num_threads(self):
        self.execute('get_num_threads')

    def test_get_num_handles(self):
        self.execute('get_num_handles')

    def test_get_num_fds(self):
        self.execute('get_num_fds')

    def test_get_threads(self):
        self.execute('get_threads')

    def test_get_cpu_times(self):
        self.execute('get_cpu_times')

    def test_get_memory_info(self):
        self.execute('get_memory_info')

    def test_get_ext_memory_info(self):
        self.execute('get_ext_memory_info')

    def test_terminal(self):
        self.execute('terminal')

    @unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)")
    def test_resume(self):
        self.execute('resume')

    def test_getcwd(self):
        self.execute('getcwd')

    def test_get_cpu_affinity(self):
        self.execute('get_cpu_affinity')

    def test_set_cpu_affinity(self):
        # Re-apply the current affinity so nothing actually changes.
        affinity = psutil.Process(os.getpid()).get_cpu_affinity()
        self.execute('set_cpu_affinity', affinity)

    def test_get_open_files(self):
        safe_remove(TESTFN)  # needed after UNIX socket test has run
        # Keep one file open for the duration of the measurement.
        with open(TESTFN, 'w'):
            self.execute('get_open_files')

    # OSX implementation is unbelievably slow
    @unittest.skipIf(OSX, "OSX implementation is too slow")
    def test_get_memory_maps(self):
        self.execute('get_memory_maps')

    # Linux implementation is pure python so since it's slow we skip it
    @unittest.skipIf(LINUX, "not worth being tested on Linux (pure python)")
    def test_get_connections(self):
        def create_socket(family, sock_type):
            sock = socket.socket(family, sock_type)
            try:
                sock.bind(('', 0))
                if sock_type == socket.SOCK_STREAM:
                    sock.listen(1)
            except Exception:
                # Do not leak the descriptor if bind()/listen() fails.
                sock.close()
                raise
            return sock

        socks = []
        # Everything that can raise happens inside the try block so that
        # sockets created so far are always closed, even when a later
        # socket cannot be set up.
        try:
            socks.append(create_socket(socket.AF_INET, socket.SOCK_STREAM))
            socks.append(create_socket(socket.AF_INET, socket.SOCK_DGRAM))
            if supports_ipv6():
                socks.append(
                    create_socket(socket.AF_INET6, socket.SOCK_STREAM))
                socks.append(
                    create_socket(socket.AF_INET6, socket.SOCK_DGRAM))
            if hasattr(socket, 'AF_UNIX'):
                safe_remove(TESTFN)
                unix_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                # Register before bind() so it is closed on failure too.
                socks.append(unix_sock)
                unix_sock.bind(TESTFN)
                unix_sock.listen(1)
            kind = 'all'
            # TODO: UNIX sockets are temporarily implemented by parsing
            # 'pfiles' cmd output; we don't want that part of the code to
            # be executed.
            if SUNOS:
                kind = 'inet'
            self.execute('get_connections', kind=kind)
        finally:
            for sock in socks:
                sock.close()
# Spawn a helper subprocess, kill it and wait for it, keeping only the
# psutil.Process handle (DEAD_PROC) that now refers to a dead process.
_subproc = get_test_subprocess()
DEAD_PROC = psutil.Process(_subproc.pid)
DEAD_PROC.kill()
DEAD_PROC.wait()
del _subproc
class TestProcessObjectLeaksZombie(TestProcessObjectLeaks):
    """Re-run the Process leak tests against a dead process.

    Looks for leaks occurring when dealing with zombie processes
    raising NoSuchProcess exception.
    """
    # Every inherited test now operates on the already-killed process.
    proc = DEAD_PROC

    # Extra tests defined only on non-POSIX platforms.
    # NOTE(review): nesting reconstructed from a paste that lost its
    # indentation -- confirm all five methods belong under this guard.
    if not POSIX:
        def test_kill(self):
            self.execute('kill')

        def test_resume(self):
            self.execute('resume')

        def test_suspend(self):
            self.execute('suspend')

        def test_terminate(self):
            self.execute('terminate')

        def test_wait(self):
            self.execute('wait')
class TestModuleFunctionsLeaks(Base):
    """Test leaks of psutil module functions."""

    def setUp(self):
        gc.collect()

    def call(self, function, *args, **kwargs):
        """Look up *function* on the psutil module and call it.

        The return value is discarded; non-callable attributes are
        only looked up.
        """
        obj = getattr(psutil, function)
        if callable(obj):
            obj(*args, **kwargs)

    @unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)")
    def test_pid_exists(self):
        self.execute('pid_exists', os.getpid())

    def test_virtual_memory(self):
        self.execute('virtual_memory')

    # TODO: remove this skip when this gets fixed
    @unittest.skipIf(SUNOS,
                     "not worth being tested on SUNOS (uses a subprocess)")
    def test_swap_memory(self):
        self.execute('swap_memory')

    def test_cpu_times(self):
        self.execute('cpu_times')

    def test_per_cpu_times(self):
        self.execute('cpu_times', percpu=True)

    @unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)")
    def test_disk_usage(self):
        self.execute('disk_usage', '.')

    def test_disk_partitions(self):
        self.execute('disk_partitions')

    def test_net_io_counters(self):
        self.execute('net_io_counters')

    def test_disk_io_counters(self):
        self.execute('disk_io_counters')

    # XXX - on Windows this produces a false positive
    @unittest.skipIf(WINDOWS,
                     "XXX produces a false positive on Windows")
    def test_get_users(self):
        self.execute('get_users')
def test_main():
    """Run every leak test case and report overall success.

    Returns True if all tests passed, False otherwise.
    """
    # unittest.makeSuite() is deprecated (removed in Python 3.13); the
    # default loader builds the same suite and is available on Python 2.
    loader = unittest.defaultTestLoader
    test_suite = unittest.TestSuite()
    tests = [
        TestProcessObjectLeaksZombie,
        TestProcessObjectLeaks,
        TestModuleFunctionsLeaks,
    ]
    for test in tests:
        test_suite.addTest(loader.loadTestsFromTestCase(test))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
if __name__ == '__main__':
    # Exit with a non-zero status when any test failed.
    all_passed = test_main()
    if not all_passed:
        sys.exit(1)