Python gc 模块,get_objects() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 gc.get_objects()。

项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def testBenchmark():
        """Spawn three counting greenlets and report greenlet totals and timing.

        Prints the number of greenlet objects tracked by the garbage collector
        before and after spawning, prints how long the spawning itself took,
        and finally sleeps for five seconds.
        """
        import time

        def printThreadNum():
            # Count every greenlet instance the garbage collector knows about.
            import gc
            from greenlet import greenlet
            objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)]
            # Single-argument print(...) produces identical output on
            # Python 2 and Python 3, unlike the print statement.
            print("Greenlets: %s" % len(objs))

        printThreadNum()
        test = TestNoblock()
        s = time.time()
        for i in range(3):
            gevent.spawn(test.count, i + 1)
        print("Created in %.3fs" % (time.time() - s))
        printThreadNum()
        time.sleep(5)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def make_gc_snapShot(filename, name):
    """Append a snapshot of all gc-tracked objects to a pickle file.

    One pickle record ``(name, signatures)`` is appended per call, where
    ``signatures`` is a list of ``(object_id, type_name)`` pairs, one for
    each object returned by ``gc.get_objects()``.

    filename -- path of the file the record is appended to
    name     -- label stored alongside the signatures
    """
    global first_time
    if first_time:
        # Collect once before the very first snapshot is taken.
        gc.collect()
        first_time = False
    contents = []
    for o in gc.get_objects():
        try:
            tname = o.__class__.__name__
        except AttributeError:
            # Fall back to the type's string form when there is no __class__.
            tname = str(type(o))
        contents.append((id(o), tname))
    # Pickle data is binary, so the file is opened in binary append mode
    # (text mode fails on Python 3). The with-block closes the file even
    # when pickle.dump raises.
    with open(filename, 'ab') as f:
        pickle.dump((name, contents), f)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def reach(self, ids):
            """
            Map object ids to the objects that currently carry them.

            ids -- iterable of object ids, as returned by x[0] with x in
                   the result of (snapshot2 - snapshot1). Any iterable is
                   accepted, including one-shot iterators.

            Return a dict id -> object with that id currently known.

            The objects recorded with these ids might have been
            replaced by new ones... so we might end up seeing objects
            that don't correspond to the original ones. This is
            especially true after a gc.collect()
            """
            tracked = gc.get_objects()
            # Materialise the ids once: membership tests become O(1) and a
            # one-shot iterator is not exhausted by the first lookup.
            wanted = set(ids)
            return {id(obj): obj for obj in tracked if id(obj) in wanted}
项目:coquery    作者:gkunter    | 项目源码 | 文件源码
def memory_dump():
    """Print one summary line for every gc-tracked object above 50 KiB.

    Each line holds a running counter followed by a dict with the object's
    id, the string form of its class, its shallow size as reported by
    ``sys.getsizeof`` and the number of distinct objects it refers to.
    """
    import gc
    threshold = 1024 * 50
    x = 0
    for obj in gc.get_objects():
        size = sys.getsizeof(obj, 0)
        if size <= threshold:
            continue
        try:
            cls = str(obj.__class__)
        except Exception:
            # Best effort only; unlike a bare except this does not swallow
            # KeyboardInterrupt or SystemExit.
            cls = "<no class>"
        referents = {id(o) for o in gc.get_referents(obj)}
        x += 1
        print(x, {'id': id(obj),
                  'class': cls,
                  'size': size,
                  "ref": len(referents)})
项目:nyroglancer    作者:funkey    | 项目源码 | 文件源码
def add_file(self, filename):
        """Open ``filename`` read-only with h5py and register its contents.

        Any h5py.File object still tracked by the garbage collector that
        refers to the same filename is closed first. The new file handle is
        appended to ``self.files`` and passed to ``self.__traverse_add``.
        """

        # make sure this file is closed before opening it again, h5py doesn't
        # handle this well otherwise
        for obj in gc.get_objects():
            if isinstance(obj, h5py.File):
                try:
                    if obj.filename == filename:
                        print("Closing previously open %s"%filename)
                        obj.close()
                except Exception:
                    # Best effort: a handle that cannot be inspected or
                    # closed is skipped. Exception (rather than a bare
                    # except) lets KeyboardInterrupt/SystemExit propagate.
                    pass

        f = h5py.File(filename, 'r')
        self.files.append(f)
        self.__traverse_add(f, filename)
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def _green_existing_locks():
    """Make locks created before monkey-patching safe.

    RLocks rely on a Lock and on Python 2, if an unpatched Lock blocks, it
    blocks the native thread. We need to replace these with green Locks.

    This was originally noticed in the stdlib logging module."""
    import gc
    import threading
    import eventlet.green.thread

    major = sys.version_info[0]
    lock_type = type(threading.Lock())
    rlock_type = type(threading.RLock())
    if major >= 3:
        pyrlock_type = type(threading._PyRLock())
    # Monkey-patching happens before any greenlet exists, so the current
    # thread id is the only owner an existing lock can have.
    tid = eventlet.green.thread.get_ident()
    for candidate in gc.get_objects():
        if not isinstance(candidate, rlock_type):
            continue
        if major == 2:
            if isinstance(candidate._RLock__block, lock_type):
                _fix_py2_rlock(candidate, tid)
        elif major >= 3 and not isinstance(candidate, pyrlock_type):
            _fix_py3_rlock(candidate)
项目:PYDOJO    作者:sprintingkiwi    | 项目源码 | 文件源码
def getactors(tag=None):
    """Return the actors registered under ``tag``, or all actors.

    With no tag the module-level ACTORS collection is returned. With a tag,
    the matching entry of ``game_info.tagged_actors`` is returned; an unknown
    tag prints a debug message and yields an empty list.
    """
    if tag is None:
        return ACTORS
    if tag in game_info.tagged_actors:
        return game_info.tagged_actors[tag]
    print('DEBUG: tag not in dictionary')
    return []
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_leakage_from_tracebacks(self):
        """Check that failing tpool calls do not leak objects per call.

        The number of gc-tracked objects created by 10 failing calls is
        compared with the number created by a further 100 failing calls; the
        difference must stay below 10.
        """
        tpool.execute(noop)  # get it started
        gc.collect()
        initial_objs = len(gc.get_objects())
        for i in range(10):
            self.assertRaises(RuntimeError, tpool.execute, raise_exception)
        gc.collect()
        middle_objs = len(gc.get_objects())
        # some objects will inevitably be created by the previous loop
        # now we test to ensure that running the loop an order of
        # magnitude more doesn't generate additional objects
        for i in six.moves.range(100):
            self.assertRaises(RuntimeError, tpool.execute, raise_exception)
        first_created = middle_objs - initial_objs
        gc.collect()
        second_created = len(gc.get_objects()) - middle_objs
        # assertTrue replaces the deprecated assert_ alias, which no longer
        # exists on unittest.TestCase in Python 3.12+.
        self.assertTrue(second_created - first_created < 10,
                        "first loop: %s, second loop: %s" % (first_created,
                                                             second_created))
        tpool.killall()
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def _green_existing_locks():
    """Make locks created before monkey-patching safe.

    RLocks rely on a Lock and on Python 2, if an unpatched Lock blocks, it
    blocks the native thread. We need to replace these with green Locks.

    This was originally noticed in the stdlib logging module."""
    import gc
    import threading
    import eventlet.green.thread

    major = sys.version_info[0]
    lock_type = type(threading.Lock())
    rlock_type = type(threading.RLock())
    if major >= 3:
        pyrlock_type = type(threading._PyRLock())
    # Monkey-patching happens before any greenlet exists, so the current
    # thread id is the only owner an existing lock can have.
    tid = eventlet.green.thread.get_ident()
    for candidate in gc.get_objects():
        if not isinstance(candidate, rlock_type):
            continue
        if major == 2:
            if isinstance(candidate._RLock__block, lock_type):
                _fix_py2_rlock(candidate, tid)
        elif major >= 3 and not isinstance(candidate, pyrlock_type):
            _fix_py3_rlock(candidate)
项目:dutch-boy    作者:Nextdoor    | 项目源码 | 文件源码
def begin(self):
        """Create the initial summary and, if enabled, set up mock-leak tracking.

        When ``detect_leaked_mocks`` is set, every mock.Mock currently tracked
        by the garbage collector is recorded as already known. When
        ``patch_mock`` is also set, ``Base.__init__`` is wrapped so that each
        newly constructed mock is registered with this object.
        """
        self.create_initial_summary()

        if self.detect_leaked_mocks:

            # Snapshot the mocks that exist before any test has run.
            gc.collect()
            self.known_mocks = [
                KnownMock(weakref.ref(existing), None, None)
                for existing in gc.get_objects()
                if isinstance(existing, mock.Mock)
            ]
            self.previous_mock_refs = [known.mock_ref for known in self.known_mocks]

            if self.patch_mock:
                detector = self

                def decorator(original_init):
                    @functools.wraps(original_init)
                    def wrapper(new_mock, *args, **kwargs):
                        original_init(new_mock, *args, **kwargs)
                        current_test = detector.level_name.get(LEVEL_TEST)
                        detector.register_mock(new_mock, current_test)
                    return wrapper

                Base.__init__ = decorator(Base.__init__)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def listObjs(regex='Q', typ=None):
    """Return gc-tracked objects selected by type or by class-name pattern.

    When ``typ`` is given, objects that are instances of it are returned.
    Otherwise objects whose class name matches ``regex`` (anchored at the
    start, as with re.match) are returned; the default finds 'Q...' classes.
    """
    tracked = gc.get_objects()
    if typ is None:
        return [obj for obj in tracked if re.match(regex, type(obj).__name__)]
    return [obj for obj in tracked if isinstance(obj, typ)]
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def get_all_objects():
    """Collect live Python objects into a dict and return it.

    Note that the return value is the dict ``olist`` filled in by ``_getr``,
    not a list. Its keys are object ids (the cleanup below deletes entries
    by ``id(...)``); presumably the values are the objects themselves and
    ints/longs are excluded -- confirm against ``_getr``.
    """
    gc.collect()
    gcl = gc.get_objects()
    olist = {}
    # _getr is defined elsewhere; presumably it walks gcl (and referents)
    # and records each object in olist under its id -- TODO confirm.
    _getr(gcl, olist)

    # Remove the bookkeeping objects created by this call itself: the result
    # dict, the gc list and the current frame.
    del olist[id(olist)]
    del olist[id(gcl)]
    del olist[id(sys._getframe())]
    return olist
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def cleanup():
    """Run the one-time exit cleanup for Qt graphics items.

    Does nothing if cleanup already ran or the 'exitCleanup' option is off.
    Otherwise ViewBox is told to quit and, if a QApplication exists, every
    live QGraphicsItem without a scene is added to a temporary scene.
    """
    global _cleanupCalled
    if _cleanupCalled or not getConfigOption('exitCleanup'):
        return

    ViewBox.quit()  # tell ViewBox that it doesn't need to deregister views anymore.

    # Workaround for Qt exit crash:
    # ALL QGraphicsItems must have a scene before they are deleted.
    # This is potentially very expensive, but preferred over crashing.
    # Note: this appears to be fixed in PySide as of 2012.12, but it should
    # be left in for a while longer.
    if QtGui.QApplication.instance() is None:
        return
    import gc
    holding_scene = QtGui.QGraphicsScene()
    for candidate in gc.get_objects():
        try:
            orphaned = (isinstance(candidate, QtGui.QGraphicsItem)
                        and isQObjectAlive(candidate)
                        and candidate.scene() is None)
            if not orphaned:
                continue
            if getConfigOption('crashWarning'):
                sys.stderr.write('Error: graphics item without scene. '
                    'Make sure ViewBox.close() and GraphicsView.close() '
                    'are properly called before app shutdown (%s)\n' % (candidate,))

            holding_scene.addItem(candidate)
        except RuntimeError:
            # occurs if a python wrapper no longer has its underlying C++ object
            continue
    _cleanupCalled = True
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def listObjs(regex='Q', typ=None):
    """Return gc-tracked objects selected by type or by class-name pattern.

    When ``typ`` is given, objects that are instances of it are returned.
    Otherwise objects whose class name matches ``regex`` (anchored at the
    start, as with re.match) are returned; the default finds 'Q...' classes.
    """
    tracked = gc.get_objects()
    if typ is None:
        return [obj for obj in tracked if re.match(regex, type(obj).__name__)]
    return [obj for obj in tracked if isinstance(obj, typ)]
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def get_all_objects():
    """Collect live Python objects into a dict and return it.

    Note that the return value is the dict ``olist`` filled in by ``_getr``,
    not a list. Its keys are object ids (the cleanup below deletes entries
    by ``id(...)``); presumably the values are the objects themselves and
    ints/longs are excluded -- confirm against ``_getr``.
    """
    gc.collect()
    gcl = gc.get_objects()
    olist = {}
    # _getr is defined elsewhere; presumably it walks gcl (and referents)
    # and records each object in olist under its id -- TODO confirm.
    _getr(gcl, olist)

    # Remove the bookkeeping objects created by this call itself: the result
    # dict, the gc list and the current frame.
    del olist[id(olist)]
    del olist[id(gcl)]
    del olist[id(sys._getframe())]
    return olist
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_markup_leaks(self):
        """Check that repeated escape() calls leave the gc object count stable.

        Twenty rounds of 1000 iterations each escape four sample strings; the
        number of gc-tracked objects after every round must be identical.
        """
        samples = ("foo", "<foo>", u"foo", u"<foo>")
        object_totals = set()
        for _round in range(20):
            for _ in range(1000):
                for text in samples:
                    escape(text)
            object_totals.add(len(gc.get_objects()))
        assert len(object_totals) == 1, 'ouch, c extension seems to leak objects'
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def __enter__(self):
        """Disable gc, take the gc lock and record the baseline object count.

        The count of gc-tracked objects after a forced collection is stored
        in ``self.old_objects``.
        """
        gc.disable()
        _gc_lock.acquire()

        # Force Python to track the request-context storage dict at all
        # times. Python only starts tracking dicts once they contain mutable
        # objects, so a list is stored in it. A horrible hack, but it makes
        # this kinda testable.
        flask._request_ctx_stack._local.__storage__['FOOO'] = [1, 2, 3]

        gc.collect()
        self.old_objects = len(gc.get_objects())
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def __exit__(self, exc_type, exc_value, tb):
        """Fail the test case if objects leaked, then restore gc state.

        The current number of gc-tracked objects is compared with
        ``self.old_objects``; a larger count is reported through
        ``self.testcase.fail``. The gc lock is released and the collector is
        re-enabled in every case, including when the failure is raised.
        """
        try:
            # Collect first when sys.getrefcount is missing (presumably
            # interpreters without reference counting -- verify).
            if not hasattr(sys, 'getrefcount'):
                gc.collect()
            new_objects = len(gc.get_objects())
            if new_objects > self.old_objects:
                self.testcase.fail('Example code leaked')
        finally:
            # fail() raises; without the finally block the lock would stay
            # held and the collector would stay disabled for later tests.
            _gc_lock.release()
            gc.enable()
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def _get_objects(self):
        """Return the list of objects currently tracked by the garbage collector."""
        tracked = gc.get_objects()
        return tracked
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def test_markup_leaks(self):
        """Check that repeated escape() calls leave the gc object count stable.

        Twenty rounds of 1000 iterations each escape four sample strings; the
        number of gc-tracked objects after every round must be identical.
        """
        samples = ("foo", "<foo>", u"foo", u"<foo>")
        object_totals = set()
        for _round in range(20):
            for _ in range(1000):
                for text in samples:
                    escape(text)
            object_totals.add(len(gc.get_objects()))
        assert len(object_totals) == 1, 'ouch, c extension seems to leak objects'
项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def _find_objects(t):
    """Return every gc-tracked object that is an instance of ``t``."""
    # Take the snapshot before creating the result list so the result list
    # is never part of what is being scanned.
    tracked = gc.get_objects()
    matches = []
    for candidate in tracked:
        if isinstance(candidate, t):
            matches.append(candidate)
    return matches
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_markup_leaks(self):
        """Check that repeated escape() calls leave the gc object count stable.

        Twenty rounds of 1000 iterations each escape four sample strings; the
        number of gc-tracked objects after every round must be identical.
        """
        samples = ("foo", "<foo>", u"foo", u"<foo>")
        object_totals = set()
        for _round in range(20):
            for _ in range(1000):
                for text in samples:
                    escape(text)
            object_totals.add(len(gc.get_objects()))
        assert len(object_totals) == 1, 'ouch, c extension seems to leak objects'
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def __enter__(self):
        """Disable gc, take the gc lock and record the baseline object count.

        The count of gc-tracked objects after a forced collection is stored
        in ``self.old_objects``.
        """
        gc.disable()
        _gc_lock.acquire()

        # Force Python to track the request-context storage dict at all
        # times. Python only starts tracking dicts once they contain mutable
        # objects, so a list is stored in it. A horrible hack, but it makes
        # this kinda testable.
        flask._request_ctx_stack._local.__storage__['FOOO'] = [1, 2, 3]

        gc.collect()
        self.old_objects = len(gc.get_objects())
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def __exit__(self, exc_type, exc_value, tb):
        """Fail the test case if objects leaked, then restore gc state.

        The current number of gc-tracked objects is compared with
        ``self.old_objects``; a larger count is reported through
        ``self.testcase.fail``. The gc lock is released and the collector is
        re-enabled in every case, including when the failure is raised.
        """
        try:
            # Collect first when sys.getrefcount is missing (presumably
            # interpreters without reference counting -- verify).
            if not hasattr(sys, 'getrefcount'):
                gc.collect()
            new_objects = len(gc.get_objects())
            if new_objects > self.old_objects:
                self.testcase.fail('Example code leaked')
        finally:
            # fail() raises; without the finally block the lock would stay
            # held and the collector would stay disabled for later tests.
            _gc_lock.release()
            gc.enable()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_issue_7959(self):
        proto = self.functype.im_func(None)

        class X(object):
            def func(self): pass
            def __init__(self):
                self.v = proto(self.func)

        import gc
        for i in range(32):
            X()
        gc.collect()
        live = [x for x in gc.get_objects()
                if isinstance(x, X)]
        self.assertEqual(len(live), 0)
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def actionDumpobj(self):
        """Yield an HTML dump of old-style instances of the requested class.

        Sends the response header first. Outside debug mode a single
        "Not in debug mode" message is yielded and the generator ends.
        Otherwise, for every gc-tracked object whose type string is
        "<type 'instance'>" and whose class name equals the "class" request
        parameter, its size, escaped string form and all attributes are
        yielded.
        """

        import gc
        import sys

        self.sendHeader()

        # No more if not in debug mode
        if not config.debug:
            yield "Not in debug mode"
            # A plain return ends the generator; raising StopIteration inside
            # a generator becomes RuntimeError on Python 3.7+ (PEP 479).
            return

        class_filter = self.get.get("class")

        yield """
        <style>
         * { font-family: monospace; white-space: pre }
         table * { text-align: right; padding: 0px 10px }
        </style>
        """

        objs = gc.get_objects()
        for obj in objs:
            obj_type = str(type(obj))
            if obj_type != "<type 'instance'>" or obj.__class__.__name__ != class_filter:
                continue
            yield "%.1fkb %s... " % (float(sys.getsizeof(obj)) / 1024, cgi.escape(str(obj)))
            for attr in dir(obj):
                yield "- %s: %s<br>" % (attr, cgi.escape(str(getattr(obj, attr))))
            yield "<br>"

        gc.collect()  # Explicit garbage collection
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_errobj_reference_leak(self, level=rlevel):
        """Regression test for ticket #955.

        Raising a Python int to a negative np.int32 power with all floating
        point errors ignored must not increase the number of gc-tracked
        objects.
        """
        with np.errstate(all="ignore"):
            base = int(0)
            exponent = np.int32(-1)

            gc.collect()
            tracked_before = len(gc.get_objects())
            base**exponent  # this shouldn't leak a reference to errobj
            gc.collect()
            tracked_after = len(gc.get_objects())
            assert_(tracked_before >= tracked_after,
                    (tracked_before, tracked_after))
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def test_markup_leaks(self):
        """Check that repeated escape() calls leave the gc object count stable.

        Twenty rounds of 1000 iterations each escape four sample strings; the
        number of gc-tracked objects after every round must be identical.
        """
        samples = ("foo", "<foo>", u"foo", u"<foo>")
        object_totals = set()
        for _round in range(20):
            for _ in range(1000):
                for text in samples:
                    escape(text)
            object_totals.add(len(gc.get_objects()))
        assert len(object_totals) == 1, 'ouch, c extension seems to leak objects'
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def __enter__(self):
        """Disable gc, take the gc lock and record the baseline object count.

        The count of gc-tracked objects after a forced collection is stored
        in ``self.old_objects``.
        """
        gc.disable()
        _gc_lock.acquire()

        # Force Python to track the request-context storage dict at all
        # times. Python only starts tracking dicts once they contain mutable
        # objects, so a list is stored in it. A horrible hack, but it makes
        # this kinda testable.
        flask._request_ctx_stack._local.__storage__['FOOO'] = [1, 2, 3]

        gc.collect()
        self.old_objects = len(gc.get_objects())
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def __exit__(self, exc_type, exc_value, tb):
        """Fail the test case if objects leaked, then restore gc state.

        The current number of gc-tracked objects is compared with
        ``self.old_objects``; a larger count is reported through
        ``self.testcase.fail``. The gc lock is released and the collector is
        re-enabled in every case, including when the failure is raised.
        """
        try:
            # Collect first when sys.getrefcount is missing (presumably
            # interpreters without reference counting -- verify).
            if not hasattr(sys, 'getrefcount'):
                gc.collect()
            new_objects = len(gc.get_objects())
            if new_objects > self.old_objects:
                self.testcase.fail('Example code leaked')
        finally:
            # fail() raises; without the finally block the lock would stay
            # held and the collector would stay disabled for later tests.
            _gc_lock.release()
            gc.enable()
项目:tesismometro    作者:joapaspe    | 项目源码 | 文件源码
def test_markup_leaks(self):
        """Check that repeated escape() calls leave the gc object count stable.

        Twenty rounds of 1000 iterations each escape four sample strings; the
        number of gc-tracked objects after every round must be identical.
        """
        samples = ("foo", "<foo>", u"foo", u"<foo>")
        object_totals = set()
        for _round in range(20):
            for _ in range(1000):
                for text in samples:
                    escape(text)
            object_totals.add(len(gc.get_objects()))
        assert len(object_totals) == 1, 'ouch, c extension seems to leak objects'
项目:tesismometro    作者:joapaspe    | 项目源码 | 文件源码
def __enter__(self):
        """Disable gc, take the gc lock and record the baseline object count.

        The count of gc-tracked objects after a forced collection is stored
        in ``self.old_objects``.
        """
        gc.disable()
        _gc_lock.acquire()

        # Force Python to track the request-context storage dict at all
        # times. Python only starts tracking dicts once they contain mutable
        # objects, so a list is stored in it. A horrible hack, but it makes
        # this kinda testable.
        flask._request_ctx_stack._local.__storage__['FOOO'] = [1, 2, 3]

        gc.collect()
        self.old_objects = len(gc.get_objects())
项目:tesismometro    作者:joapaspe    | 项目源码 | 文件源码
def __exit__(self, exc_type, exc_value, tb):
        """Fail the test case if objects leaked, then restore gc state.

        The current number of gc-tracked objects is compared with
        ``self.old_objects``; a larger count is reported through
        ``self.testcase.fail``. The gc lock is released and the collector is
        re-enabled in every case, including when the failure is raised.
        """
        try:
            # Collect first when sys.getrefcount is missing (presumably
            # interpreters without reference counting -- verify).
            if not hasattr(sys, 'getrefcount'):
                gc.collect()
            new_objects = len(gc.get_objects())
            if new_objects > self.old_objects:
                self.testcase.fail('Example code leaked')
        finally:
            # fail() raises; without the finally block the lock would stay
            # held and the collector would stay disabled for later tests.
            _gc_lock.release()
            gc.enable()
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def test_markup_leaks(self):
        """Check that repeated escape() calls leave the gc object count stable.

        Twenty rounds of 1000 iterations each escape four sample strings; the
        number of gc-tracked objects after every round must be identical.
        """
        samples = ("foo", "<foo>", u"foo", u"<foo>")
        object_totals = set()
        for _round in range(20):
            for _ in range(1000):
                for text in samples:
                    escape(text)
            object_totals.add(len(gc.get_objects()))
        assert len(object_totals) == 1, 'ouch, c extension seems to leak objects'
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def __enter__(self):
        """Disable gc, take the gc lock and record the baseline object count.

        The count of gc-tracked objects after a forced collection is stored
        in ``self.old_objects``.
        """
        gc.disable()
        _gc_lock.acquire()

        # Force Python to track the request-context storage dict at all
        # times. Python only starts tracking dicts once they contain mutable
        # objects, so a list is stored in it. A horrible hack, but it makes
        # this kinda testable.
        flask._request_ctx_stack._local.__storage__['FOOO'] = [1, 2, 3]

        gc.collect()
        self.old_objects = len(gc.get_objects())
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def __exit__(self, exc_type, exc_value, tb):
        """Fail the test case if objects leaked, then restore gc state.

        The current number of gc-tracked objects is compared with
        ``self.old_objects``; a larger count is reported through
        ``self.testcase.fail``. The gc lock is released and the collector is
        re-enabled in every case, including when the failure is raised.
        """
        try:
            # Collect first when sys.getrefcount is missing (presumably
            # interpreters without reference counting -- verify).
            if not hasattr(sys, 'getrefcount'):
                gc.collect()
            new_objects = len(gc.get_objects())
            if new_objects > self.old_objects:
                self.testcase.fail('Example code leaked')
        finally:
            # fail() raises; without the finally block the lock would stay
            # held and the collector would stay disabled for later tests.
            _gc_lock.release()
            gc.enable()
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def cleanup_tasks():
    """Kill every gevent.Greenlet known to the gc, then reinitialise the hub."""
    tracked = gc.get_objects()
    greenlets = []
    for candidate in tracked:
        if isinstance(candidate, gevent.Greenlet):
            greenlets.append(candidate)
    gevent.killall(greenlets)
    gevent.hub.reinit()
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def test_markup_leaks(self):
        """Check that repeated escape() calls leave the gc object count stable.

        Twenty rounds of 1000 iterations each escape four sample strings; the
        number of gc-tracked objects after every round must be identical.
        """
        samples = ("foo", "<foo>", u"foo", u"<foo>")
        object_totals = set()
        for _round in range(20):
            for _ in range(1000):
                for text in samples:
                    escape(text)
            object_totals.add(len(gc.get_objects()))
        assert len(object_totals) == 1, 'ouch, c extension seems to leak objects'
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def __enter__(self):
        """Disable gc, take the gc lock and record the baseline object count.

        The count of gc-tracked objects after a forced collection is stored
        in ``self.old_objects``.
        """
        gc.disable()
        _gc_lock.acquire()

        # Force Python to track the request-context storage dict at all
        # times. Python only starts tracking dicts once they contain mutable
        # objects, so a list is stored in it. A horrible hack, but it makes
        # this kinda testable.
        flask._request_ctx_stack._local.__storage__['FOOO'] = [1, 2, 3]

        gc.collect()
        self.old_objects = len(gc.get_objects())
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def __exit__(self, exc_type, exc_value, tb):
        """Fail the test case if objects leaked, then restore gc state.

        The current number of gc-tracked objects is compared with
        ``self.old_objects``; a larger count is reported through
        ``self.testcase.fail``. The gc lock is released and the collector is
        re-enabled in every case, including when the failure is raised.
        """
        try:
            # Collect first when sys.getrefcount is missing (presumably
            # interpreters without reference counting -- verify).
            if not hasattr(sys, 'getrefcount'):
                gc.collect()
            new_objects = len(gc.get_objects())
            if new_objects > self.old_objects:
                self.testcase.fail('Example code leaked')
        finally:
            # fail() raises; without the finally block the lock would stay
            # held and the collector would stay disabled for later tests.
            _gc_lock.release()
            gc.enable()
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def test_markup_leaks(self):
        """Check that repeated escape() calls leave the gc object count stable.

        Twenty rounds of 1000 iterations each escape four sample strings; the
        number of gc-tracked objects after every round must be identical.
        """
        samples = ("foo", "<foo>", u"foo", u"<foo>")
        object_totals = set()
        for _round in range(20):
            for _ in range(1000):
                for text in samples:
                    escape(text)
            object_totals.add(len(gc.get_objects()))
        assert len(object_totals) == 1, 'ouch, c extension seems to leak objects'
项目:coquery    作者:gkunter    | 项目源码 | 文件源码
def summarize_memory():
        """Print the process's virtual memory size and the top muppy summary row.

        The virtual memory size of the current process is printed in
        megabytes, followed by a one-row pympler summary of all objects
        returned by ``muppy.get_objects()``.
        """
        # memory_info() exposes the same ``vms`` field as memory_info_ex(),
        # which has been deprecated since psutil 4.0.0.
        vms_bytes = psutil.Process().memory_info().vms
        print("Virtual machine: {:.2f}Mb".format(vms_bytes / (1024 * 1024)))
        summary.print_(summary.summarize(muppy.get_objects()), limit=1)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_markup_leaks(self):
        """Check that repeated escape() calls leave the gc object count stable.

        Twenty rounds of 1000 iterations each escape four sample strings; the
        number of gc-tracked objects after every round must be identical.
        """
        samples = ("foo", "<foo>", u"foo", u"<foo>")
        object_totals = set()
        for _round in range(20):
            for _ in range(1000):
                for text in samples:
                    escape(text)
            object_totals.add(len(gc.get_objects()))
        assert len(object_totals) == 1, 'ouch, c extension seems to leak objects'
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def __enter__(self):
        """Disable gc, take the gc lock and record the baseline object count.

        The count of gc-tracked objects after a forced collection is stored
        in ``self.old_objects``.
        """
        gc.disable()
        _gc_lock.acquire()

        # Force Python to track the request-context storage dict at all
        # times. Python only starts tracking dicts once they contain mutable
        # objects, so a list is stored in it. A horrible hack, but it makes
        # this kinda testable.
        flask._request_ctx_stack._local.__storage__['FOOO'] = [1, 2, 3]

        gc.collect()
        self.old_objects = len(gc.get_objects())
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def __exit__(self, exc_type, exc_value, tb):
        """Fail the test case if objects leaked, then restore gc state.

        The current number of gc-tracked objects is compared with
        ``self.old_objects``; a larger count is reported through
        ``self.testcase.fail``. The gc lock is released and the collector is
        re-enabled in every case, including when the failure is raised.
        """
        try:
            # Collect first when sys.getrefcount is missing (presumably
            # interpreters without reference counting -- verify).
            if not hasattr(sys, 'getrefcount'):
                gc.collect()
            new_objects = len(gc.get_objects())
            if new_objects > self.old_objects:
                self.testcase.fail('Example code leaked')
        finally:
            # fail() raises; without the finally block the lock would stay
            # held and the collector would stay disabled for later tests.
            _gc_lock.release()
            gc.enable()
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_markup_leaks(self):
        """Check that repeated escape() calls leave the gc object count stable.

        Twenty rounds of 1000 iterations each escape four sample strings; the
        number of gc-tracked objects after every round must be identical.
        """
        samples = ("foo", "<foo>", u"foo", u"<foo>")
        object_totals = set()
        for _round in range(20):
            for _ in range(1000):
                for text in samples:
                    escape(text)
            object_totals.add(len(gc.get_objects()))
        assert len(object_totals) == 1, 'ouch, c extension seems to leak objects'
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def test_markup_leaks(self):
        """Check that repeated escape() calls leave the gc object count stable.

        Twenty rounds of 1000 iterations each escape four sample strings; the
        number of gc-tracked objects after every round must be identical.
        """
        samples = ("foo", "<foo>", u"foo", u"<foo>")
        object_totals = set()
        for _round in range(20):
            for _ in range(1000):
                for text in samples:
                    escape(text)
            object_totals.add(len(gc.get_objects()))
        assert len(object_totals) == 1, 'ouch, c extension seems to leak objects'
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_issue_7959(self):
        proto = self.functype.__func__(None)

        class X(object):
            def func(self): pass
            def __init__(self):
                self.v = proto(self.func)

        import gc
        for i in range(32):
            X()
        gc.collect()
        live = [x for x in gc.get_objects()
                if isinstance(x, X)]
        self.assertEqual(len(live), 0)
项目:pyTSon_plugins    作者:Bluscream    | 项目源码 | 文件源码
def getobjects(name, cls=True):
    """Return gc-tracked QObject instances selected by class or object name.

    With ``cls`` true, objects for which ``inherits(name)`` holds are
    returned; otherwise objects whose ``objectName()`` equals ``name``.
    """
    import gc

    def _selected(candidate):
        if not isinstance(candidate, QObject):
            return False
        if cls:
            return candidate.inherits(name)
        return candidate.objectName() == name

    objects = []
    for candidate in gc.get_objects():
        if _selected(candidate):
            objects.append(candidate)
    return objects
项目:pyTSon_plugins    作者:Bluscream    | 项目源码 | 文件源码
def objects():
    """Return the repr() of every gc-tracked object, one string per object."""
    import gc
    # One entry per object: list.extend() with a string would instead add
    # the repr one character at a time.
    return [repr(x) for x in gc.get_objects()]