def testBenchmark(): import time def printThreadNum(): import gc from greenlet import greenlet objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)] print "Greenlets: %s" % len(objs) printThreadNum() test = TestNoblock() s = time.time() for i in range(3): gevent.spawn(test.count, i + 1) print "Created in %.3fs" % (time.time() - s) printThreadNum() time.sleep(5)
def make_gc_snapShot(filename, name): """Append the signatures to a file, giving them the given 'name'. A signature is a pair object_id / type_name""" global first_time if first_time: gc.collect() first_time = False contents = [] for o in gc.get_objects(): try: tname = o.__class__.__name__ except AttributeError: tname = str(type(o)) contents.append((id(o), tname)) del tname f = open(filename, 'a') pickle.dump((name, contents), f) f.close() del contents del f
def reach(self, ids): """ \param ids Iterable of object id, as returned by x[0], with x in the result of (snapshot2 - snapshot1) Return a dict id -> object with that id currently known. The objects recorded with these id might have been replaced by new ones... so we might end-up seeing objects that don't correspond to the original ones. This is especially true after a gc.collect() """ result = dict() for obj in gc.get_objects(): if id(obj) in ids: result[id(obj)] = obj return result
def memory_dump(): import gc x = 0 for obj in gc.get_objects(): i = id(obj) size = sys.getsizeof(obj, 0) # referrers = [id(o) for o in gc.get_referrers(obj)] try: cls = str(obj.__class__) except: cls = "<no class>" if size > 1024 * 50: referents = set([id(o) for o in gc.get_referents(obj)]) x += 1 print(x, {'id': i, 'class': cls, 'size': size, "ref": len(referents)}) #if len(referents) < 2000: #print(obj)
def add_file(self, filename): # make sure this file is closed before opening it again, h5py doesn't # handle this well otherwise for obj in gc.get_objects(): if isinstance(obj, h5py.File): try: if obj.filename == filename: print("Closing previously open %s"%filename) obj.close() except: pass f = h5py.File(filename, 'r') self.files.append(f) self.__traverse_add(f, filename)
def _green_existing_locks(): """Make locks created before monkey-patching safe. RLocks rely on a Lock and on Python 2, if an unpatched Lock blocks, it blocks the native thread. We need to replace these with green Locks. This was originally noticed in the stdlib logging module.""" import gc import threading import eventlet.green.thread lock_type = type(threading.Lock()) rlock_type = type(threading.RLock()) if sys.version_info[0] >= 3: pyrlock_type = type(threading._PyRLock()) # We're monkey-patching so there can't be any greenlets yet, ergo our thread # ID is the only valid owner possible. tid = eventlet.green.thread.get_ident() for obj in gc.get_objects(): if isinstance(obj, rlock_type): if (sys.version_info[0] == 2 and isinstance(obj._RLock__block, lock_type)): _fix_py2_rlock(obj, tid) elif (sys.version_info[0] >= 3 and not isinstance(obj, pyrlock_type)): _fix_py3_rlock(obj)
def getactors(tag=None): if tag is not None: if tag in game_info.tagged_actors: return game_info.tagged_actors[tag] else: print('DEBUG: tag not in dictionary') return [] else: return ACTORS # tagged_actors_list = [] # for obj in gc.get_objects(): # if isinstance(obj, Actor): # if tag is None: # tagged_actors_list.append(obj) # elif tag is str: # if tag in obj.tags: # tagged_actors_list.append(obj) # return tagged_actors_list
def test_leakage_from_tracebacks(self): tpool.execute(noop) # get it started gc.collect() initial_objs = len(gc.get_objects()) for i in range(10): self.assertRaises(RuntimeError, tpool.execute, raise_exception) gc.collect() middle_objs = len(gc.get_objects()) # some objects will inevitably be created by the previous loop # now we test to ensure that running the loop an order of # magnitude more doesn't generate additional objects for i in six.moves.range(100): self.assertRaises(RuntimeError, tpool.execute, raise_exception) first_created = middle_objs - initial_objs gc.collect() second_created = len(gc.get_objects()) - middle_objs self.assert_(second_created - first_created < 10, "first loop: %s, second loop: %s" % (first_created, second_created)) tpool.killall()
def begin(self): self.create_initial_summary() if self.detect_leaked_mocks: # Record pre-existing mocks gc.collect() self.known_mocks = list(KnownMock(weakref.ref(m), None, None) for m in gc.get_objects() if isinstance(m, mock.Mock)) self.previous_mock_refs = list(m.mock_ref for m in self.known_mocks) if self.patch_mock: detector = self def decorator(f): @functools.wraps(f) def wrapper(new_mock, *args, **kwargs): f(new_mock, *args, **kwargs) detector.register_mock(new_mock, detector.level_name.get(LEVEL_TEST)) return wrapper Base.__init__ = decorator(Base.__init__)
def listObjs(regex='Q', typ=None): """List all objects managed by python gc with class name matching regex. Finds 'Q...' classes by default.""" if typ is not None: return [x for x in gc.get_objects() if isinstance(x, typ)] else: return [x for x in gc.get_objects() if re.match(regex, type(x).__name__)]
def get_all_objects(): """Return a list of all live Python objects (excluding int and long), not including the list itself.""" gc.collect() gcl = gc.get_objects() olist = {} _getr(gcl, olist) del olist[id(olist)] del olist[id(gcl)] del olist[id(sys._getframe())] return olist
def cleanup(): global _cleanupCalled if _cleanupCalled: return if not getConfigOption('exitCleanup'): return ViewBox.quit() ## tell ViewBox that it doesn't need to deregister views anymore. ## Workaround for Qt exit crash: ## ALL QGraphicsItems must have a scene before they are deleted. ## This is potentially very expensive, but preferred over crashing. ## Note: this appears to be fixed in PySide as of 2012.12, but it should be left in for a while longer.. if QtGui.QApplication.instance() is None: return import gc s = QtGui.QGraphicsScene() for o in gc.get_objects(): try: if isinstance(o, QtGui.QGraphicsItem) and isQObjectAlive(o) and o.scene() is None: if getConfigOption('crashWarning'): sys.stderr.write('Error: graphics item without scene. ' 'Make sure ViewBox.close() and GraphicsView.close() ' 'are properly called before app shutdown (%s)\n' % (o,)) s.addItem(o) except RuntimeError: ## occurs if a python wrapper no longer has its underlying C++ object continue _cleanupCalled = True
def test_markup_leaks(self): counts = set() for count in range(20): for item in range(1000): escape("foo") escape("<foo>") escape(u"foo") escape(u"<foo>") counts.add(len(gc.get_objects())) assert len(counts) == 1, 'ouch, c extension seems to leak objects'
def __enter__(self): gc.disable() _gc_lock.acquire() loc = flask._request_ctx_stack._local # Force Python to track this dictionary at all times. # This is necessary since Python only starts tracking # dicts if they contain mutable objects. It's a horrible, # horrible hack but makes this kinda testable. loc.__storage__['FOOO'] = [1, 2, 3] gc.collect() self.old_objects = len(gc.get_objects())
def __exit__(self, exc_type, exc_value, tb): if not hasattr(sys, 'getrefcount'): gc.collect() new_objects = len(gc.get_objects()) if new_objects > self.old_objects: self.testcase.fail('Example code leaked') _gc_lock.release() gc.enable()
def _get_objects(self): return gc.get_objects()
def _find_objects(t): return [o for o in gc.get_objects() if isinstance(o, t)]
def test_issue_7959(self): proto = self.functype.im_func(None) class X(object): def func(self): pass def __init__(self): self.v = proto(self.func) import gc for i in range(32): X() gc.collect() live = [x for x in gc.get_objects() if isinstance(x, X)] self.assertEqual(len(live), 0)
def actionDumpobj(self): import gc import sys self.sendHeader() # No more if not in debug mode if not config.debug: yield "Not in debug mode" raise StopIteration class_filter = self.get.get("class") yield """ <style> * { font-family: monospace; white-space: pre } table * { text-align: right; padding: 0px 10px } </style> """ objs = gc.get_objects() for obj in objs: obj_type = str(type(obj)) if obj_type != "<type 'instance'>" or obj.__class__.__name__ != class_filter: continue yield "%.1fkb %s... " % (float(sys.getsizeof(obj)) / 1024, cgi.escape(str(obj))) for attr in dir(obj): yield "- %s: %s<br>" % (attr, cgi.escape(str(getattr(obj, attr)))) yield "<br>" gc.collect() # Implicit grabage collection
def test_errobj_reference_leak(self, level=rlevel): # Ticket #955 with np.errstate(all="ignore"): z = int(0) p = np.int32(-1) gc.collect() n_before = len(gc.get_objects()) z**p # this shouldn't leak a reference to errobj gc.collect() n_after = len(gc.get_objects()) assert_(n_before >= n_after, (n_before, n_after))
def cleanup_tasks(): tasks = [ running_task for running_task in gc.get_objects() if isinstance(running_task, gevent.Greenlet) ] gevent.killall(tasks) gevent.hub.reinit()
def summarize_memory(): print("Virtual machine: {:.2f}Mb".format( psutil.Process().memory_info_ex().vms / (1024 * 1024))) summary.print_(summary.summarize(muppy.get_objects()), limit=1)
def test_issue_7959(self): proto = self.functype.__func__(None) class X(object): def func(self): pass def __init__(self): self.v = proto(self.func) import gc for i in range(32): X() gc.collect() live = [x for x in gc.get_objects() if isinstance(x, X)] self.assertEqual(len(live), 0)
def getobjects(name, cls=True): import gc objects = [] for obj in gc.get_objects(): if (isinstance(obj, QObject) and ((cls and obj.inherits(name)) or (not cls and obj.objectName() == name))): objects.append(obj) return objects
def objects(): import gc;_ret = [] for x in gc.get_objects(): _ret.extend(str(repr(x))) return _ret