Wed, 31 Dec 2014 06:55:50 +0100
Added tag UPSTREAM_283F7C6 for changeset ca08bd8f51b2
michael@0 | 1 | from __future__ import with_statement |
michael@0 | 2 | |
michael@0 | 3 | import sys |
michael@0 | 4 | |
michael@0 | 5 | __all__ = ['nested', 'catch_warnings', 'examine_warnings'] |
michael@0 | 6 | |
michael@0 | 7 | |
michael@0 | 8 | try: |
michael@0 | 9 | from contextlib import nested |
michael@0 | 10 | except ImportError: |
michael@0 | 11 | from contextlib import contextmanager |
michael@0 | 12 | @contextmanager |
michael@0 | 13 | def nested(*managers): |
michael@0 | 14 | exits = [] |
michael@0 | 15 | vars = [] |
michael@0 | 16 | exc = (None, None, None) |
michael@0 | 17 | try: |
michael@0 | 18 | for mgr in managers: |
michael@0 | 19 | exit = mgr.__exit__ |
michael@0 | 20 | enter = mgr.__enter__ |
michael@0 | 21 | vars.append(enter()) |
michael@0 | 22 | exits.append(exit) |
michael@0 | 23 | yield vars |
michael@0 | 24 | except: |
michael@0 | 25 | exc = sys.exc_info() |
michael@0 | 26 | finally: |
michael@0 | 27 | while exits: |
michael@0 | 28 | exit = exits.pop() |
michael@0 | 29 | try: |
michael@0 | 30 | if exit(*exc): |
michael@0 | 31 | exc = (None, None, None) |
michael@0 | 32 | except: |
michael@0 | 33 | exc = sys.exc_info() |
michael@0 | 34 | if exc != (None, None, None): |
michael@0 | 35 | raise exc[1] |
michael@0 | 36 | |
michael@0 | 37 | # copied from Python 2.6 |
michael@0 | 38 | try: |
michael@0 | 39 | from warnings import catch_warnings |
michael@0 | 40 | except ImportError: |
michael@0 | 41 | class catch_warnings(object): |
michael@0 | 42 | def __init__(self, record=False, module=None): |
michael@0 | 43 | self._record = record |
michael@0 | 44 | self._module = sys.modules['warnings'] |
michael@0 | 45 | self._entered = False |
michael@0 | 46 | |
michael@0 | 47 | def __repr__(self): |
michael@0 | 48 | args = [] |
michael@0 | 49 | if self._record: |
michael@0 | 50 | args.append("record=True") |
michael@0 | 51 | name = type(self).__name__ |
michael@0 | 52 | return "%s(%s)" % (name, ", ".join(args)) |
michael@0 | 53 | |
michael@0 | 54 | def __enter__(self): |
michael@0 | 55 | if self._entered: |
michael@0 | 56 | raise RuntimeError("Cannot enter %r twice" % self) |
michael@0 | 57 | self._entered = True |
michael@0 | 58 | self._filters = self._module.filters |
michael@0 | 59 | self._module.filters = self._filters[:] |
michael@0 | 60 | self._showwarning = self._module.showwarning |
michael@0 | 61 | if self._record: |
michael@0 | 62 | log = [] |
michael@0 | 63 | def showwarning(*args, **kwargs): |
michael@0 | 64 | log.append(WarningMessage(*args, **kwargs)) |
michael@0 | 65 | self._module.showwarning = showwarning |
michael@0 | 66 | return log |
michael@0 | 67 | else: |
michael@0 | 68 | return None |
michael@0 | 69 | |
michael@0 | 70 | def __exit__(self, *exc_info): |
michael@0 | 71 | if not self._entered: |
michael@0 | 72 | raise RuntimeError("Cannot exit %r without entering first" % self) |
michael@0 | 73 | self._module.filters = self._filters |
michael@0 | 74 | self._module.showwarning = self._showwarning |
michael@0 | 75 | |
michael@0 | 76 | class WarningMessage(object): |
michael@0 | 77 | _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file", |
michael@0 | 78 | "line") |
michael@0 | 79 | def __init__(self, message, category, filename, lineno, file=None, |
michael@0 | 80 | line=None): |
michael@0 | 81 | local_values = locals() |
michael@0 | 82 | for attr in self._WARNING_DETAILS: |
michael@0 | 83 | setattr(self, attr, local_values[attr]) |
michael@0 | 84 | self._category_name = None |
michael@0 | 85 | if category.__name__: |
michael@0 | 86 | self._category_name = category.__name__ |
michael@0 | 87 | |
michael@0 | 88 | |
michael@0 | 89 | def examine_warnings(func): |
michael@0 | 90 | def wrapper(): |
michael@0 | 91 | with catch_warnings(record=True) as ws: |
michael@0 | 92 | func(ws) |
michael@0 | 93 | return wrapper |