The following 50 code examples, extracted from open-source Python projects, illustrate how to use gc.collect().
def mem_check(opts):
    """Monitor this process's RSS forever; dump diagnostics and abort above a threshold.

    opts is a mapping with keys:
      'gc'        -- truthy to force a gc.collect() on each cycle
      'threshold' -- RSS in bytes above which memory_dump() is called and
                     the process is aborted
      'interval'  -- seconds to sleep between checks
    NOTE(review): presumably runs in a daemon/monitor thread -- confirm caller.
    """
    while True:
        if opts['gc']:
            try:
                gc.collect()
            except Exception as e:
                logging.exception(repr(e) + ' while gc.collect()')
        try:
            # resident set size of the current process, in bytes
            rss = psutil.Process(os.getpid()).memory_info().rss
            logging.info('current memory used: {rss}'.format(rss=rss))
            if rss > opts['threshold']:
                memory_dump(opts)  # defined elsewhere; writes diagnostics before abort
                os.abort()
        except Exception as e:
            logging.exception(repr(e) + ' while checking memory usage')
        finally:
            # sleep even if the check raised, so the loop never spins hot
            time.sleep(opts['interval'])
def test_cleanup(self):
    """Verify pyngus Containers/Connections leave no uncollectable garbage.

    Checks gc.garbage before and after exercising create/get/destroy on a
    Container and its connections.
    """
    gc.enable()
    gc.collect()
    assert not gc.garbage, "Object leak: %s" % str(gc.garbage)
    container = pyngus.Container("abc")
    c1 = container.create_connection("c1")
    c2 = container.create_connection("c2")
    assert c2
    del c2
    gc.collect()
    # the container still holds the connection, so it must be retrievable
    c2 = container.get_connection("c2")
    assert c2
    c1 = container.get_connection("c1")
    assert c1
    c1.create_receiver("r1")
    c1.create_sender("s1")
    del c1
    del c2
    container.destroy()
    del container
    gc.collect()
    assert not gc.garbage, "Object leak: %s" % str(gc.garbage)
def test_getViewWidget_deleted():
    """After Qt deletes the parent widget, getViewWidget() must return None."""
    view = pg.PlotWidget()
    item = pg.InfiniteLine()
    view.addItem(item)
    assert item.getViewWidget() is view
    # Arrange to have Qt automatically delete the view widget
    obj = pg.QtGui.QWidget()
    view.setParent(obj)
    del obj
    gc.collect()
    # the C++ side of `view` is gone even though the Python wrapper survives
    assert not pg.Qt.isQObjectAlive(view)
    assert item.getViewWidget() is None

#if __name__ == '__main__':
    #view = pg.PlotItem()
    #vref = weakref.ref(view)
    #item = pg.InfiniteLine()
    #view.addItem(item)
    #del view
    #gc.collect()
def check(self):
    """Run an incremental GC pass based on the current allocation counts.

    Compares gc.get_count() against self.threshold (a 3-tuple) and collects
    generation 0; only if generation 1's count is also over threshold does it
    collect generation 1, and likewise for generation 2.
    NOTE(review): the nesting (gen-1/2 checks inside the gen-0 branch) is
    reconstructed following the well-known incremental-GC recipe -- confirm
    against the upstream project.
    """
    #return self.debug_cycles() # uncomment to just debug cycles
    l0, l1, l2 = gc.get_count()
    if self.debug:
        print('gc_check called:', l0, l1, l2)
    if l0 > self.threshold[0]:
        num = gc.collect(0)
        if self.debug:
            print('collecting gen 0, found: %d unreachable' % num)
        if l1 > self.threshold[1]:
            num = gc.collect(1)
            if self.debug:
                print('collecting gen 1, found: %d unreachable' % num)
            if l2 > self.threshold[2]:
                num = gc.collect(2)
                if self.debug:
                    print('collecting gen 2, found: %d unreachable' % num)
def _getr(slist, olist, first=True):
    """Recursively record all objects reachable from `slist` into `olist`.

    olist is a dict mapping id(obj) -> obj; it doubles as the "seen" set.
    ints are skipped entirely (they are plentiful and uninteresting here).
    `first` is True only on the top-level call, where gc.collect() is run
    periodically to keep the scan from chasing collectable garbage.
    """
    i = 0
    for e in slist:
        oid = id(e)
        typ = type(e)
        if oid in olist or typ is int:  ## or e in olist:
            ## since we're excluding all ints, there is no longer a need to check for olist keys
            continue
        olist[oid] = e
        if first and (i % 1000) == 0:
            gc.collect()
        tl = gc.get_referents(e)
        if tl:
            # recurse into everything this object references
            _getr(tl, olist, first=False)
        i += 1

# The public function.
def start(self):
    """
    Remember the current set of objects as the comparison for all future
    calls to diff()
    Called automatically on init, but can be called manually as well.
    """
    refs, count, objs = self.collect()
    # drop tracking of everything remembered by a previous start()
    for r in self.startRefs:
        self.forgetRef(self.startRefs[r])
    self.startRefs.clear()
    self.startRefs.update(refs)
    for r in refs:
        self.rememberRef(r)
    self.startCount.clear()
    self.startCount.update(count)
    #self.newRefs.clear()
    #self.newRefs.update(refs)
def test_lots_of_queries(self):
    """Stress-test cqlengine inserts while printing memory statistics.

    Deliberately never finishes normally (raises at the end) -- it exists to
    be watched via objgraph/resource output while it runs.
    """
    import resource
    import objgraph

    class LoadTest(Model):
        k = columns.Integer(primary_key=True)
        v = columns.Integer()

    sync_table(LoadTest)
    gc.collect()
    objgraph.show_most_common_types()
    print("Starting...")
    for i in range(1000000):
        if i % 25000 == 0:
            # print memory statistic
            print("Memory usage: %s" % (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
        LoadTest.create(k=i, v=i)
    objgraph.show_most_common_types()
    raise Exception("you shouldn't be here")
def setUp(self):
    """Build deterministic Vector2 fixtures (random values replaced by constants)."""
    # gc.collect()
    self.zeroVec = Vector2()
    self.e1 = Vector2(1, 0)
    self.e2 = Vector2(0, 1)
    # self.t1 = (random(), random())
    self.t1 = (1.2, 3.4)
    self.l1 = list(self.t1)
    self.v1 = Vector2(self.t1)
    # self.t2 = (random(), random())
    self.t2 = (5.6, 7.8)
    self.l2 = list(self.t2)
    self.v2 = Vector2(self.t2)
    # self.s1 = random()
    # self.s2 = random()
    self.s1 = 5.6
    self.s2 = 7.8
def setUp(self):
    """Build deterministic Vector3 fixtures (random values replaced by constants)."""
    # gc.collect()
    self.zeroVec = Vector3()
    self.e1 = Vector3(1, 0, 0)
    self.e2 = Vector3(0, 1, 0)
    self.e3 = Vector3(0, 0, 1)
    # self.t1 = (random(), random())
    self.t1 = (1.2, 3.4, 9.6)
    self.l1 = list(self.t1)
    self.v1 = Vector3(self.t1)
    # self.t2 = (random(), random())
    self.t2 = (5.6, 7.8, 2.1)
    self.l2 = list(self.t2)
    self.v2 = Vector3(self.t2)
    # self.s1 = random()
    # self.s2 = random()
    self.s1 = 5.6
    self.s2 = 7.8
def make_gc_snapShot(filename, name):
    """Append the signatures to a file, giving them the given 'name'.

    A signature is a pair object_id / type_name, taken for every object the
    GC currently tracks.  The (name, contents) pair is pickled and appended
    to `filename`.  On the first call only, a gc.collect() clears out
    collectable garbage so it does not pollute the snapshot.
    """
    global first_time
    if first_time:
        gc.collect()
        first_time = False
    contents = []
    for o in gc.get_objects():
        try:
            tname = o.__class__.__name__
        except AttributeError:
            # some extension objects have no __class__; fall back to type()
            tname = str(type(o))
        contents.append((id(o), tname))
        del tname
    # BUGFIX: pickle writes bytes -- the file must be opened in binary
    # append mode ('ab'), and a context manager guarantees it is closed.
    with open(filename, 'ab') as f:
        pickle.dump((name, contents), f)
    del contents
def reach(self, ids):
    """
    \param ids Iterable of object id, as returned by x[0], with x in
    the result of (snapshot2 - snapshot1)

    Return a dict id -> object with that id currently known.
    The objects recorded with these id might have been replaced by new
    ones... so we might end-up seeing objects that don't correspond to the
    original ones. This is especially true after a gc.collect()
    """
    # Materialize once: makes membership O(1) instead of O(len(ids)) per
    # tracked object, and works correctly even if `ids` is a one-shot
    # iterator (the original `in` test would silently consume it).
    wanted = set(ids)
    return {id(obj): obj for obj in gc.get_objects() if id(obj) in wanted}
def create_agents(self, generator):
    """
    Given information on a set of countries and a generator function,
    generate the agents and assign the results to ``self.agents``.

    :type generator: DataFrame, str, int
    :param generator: A function which generates the agents.
    """
    self.generator = generator
    # one Series entry per person: the country repeated Population times
    country_array = pd.concat([pd.Series([c] * k["Population"])
                               for c, k in self.df.iterrows()])
    country_array.index = range(len(country_array))
    # Garbage collect before creating new processes.
    gc.collect()
    # fan the population out over the worker pool in processes*splits chunks
    self.agents = pd.concat(
        self.pool.imap(self._gen_agents,
                       np.array_split(country_array, self.processes * self.splits))
    )
    self.agents.index = range(len(self.agents))
def timer(func, repetitions=100000):
    """Decorator: report wall-clock time for `repetitions` calls of `func`.

    GC is disabled around the timed loop so collector pauses do not skew the
    measurement.  Returns the wrapped function; the wrapper returns the last
    call's result (None when repetitions == 0).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        result = None  # BUGFIX: defined even when repetitions == 0
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        try:
            for x in range(repetitions):
                result = func(*args, **kwargs)
        finally:
            end = time.time()
            # BUGFIX: re-enable GC even if func raised
            gc.enable()
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper

#------------------------------------------------------------------------------
# [ timer_X function decorators ]
#   replicate the above decorator with different number of repetitions
#------------------------------------------------------------------------------
def timer_10(func, repetitions=10):
    """Decorator: report wall-clock time for 10 calls of `func`.

    Same contract as timer(); see there.  Returns the last call's result.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        result = None  # BUGFIX: defined even when repetitions == 0
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        try:
            for x in range(repetitions):
                result = func(*args, **kwargs)
        finally:
            end = time.time()
            # BUGFIX: re-enable GC even if func raised
            gc.enable()
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer_100(func, repetitions=100):
    """Decorator: report wall-clock time for 100 calls of `func`.

    Same contract as timer(); see there.  Returns the last call's result.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        result = None  # BUGFIX: defined even when repetitions == 0
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        try:
            for x in range(repetitions):
                result = func(*args, **kwargs)
        finally:
            end = time.time()
            # BUGFIX: re-enable GC even if func raised
            gc.enable()
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer_1k(func, repetitions=1000):
    """Decorator: report wall-clock time for 1000 calls of `func`.

    Same contract as timer(); see there.  Returns the last call's result.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        result = None  # BUGFIX: defined even when repetitions == 0
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        try:
            for x in range(repetitions):
                result = func(*args, **kwargs)
        finally:
            end = time.time()
            # BUGFIX: re-enable GC even if func raised
            gc.enable()
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer_10k(func, repetitions=10000):
    """Decorator: report wall-clock time for 10000 calls of `func`.

    Same contract as timer(); see there.  Returns the last call's result.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        result = None  # BUGFIX: defined even when repetitions == 0
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        try:
            for x in range(repetitions):
                result = func(*args, **kwargs)
        finally:
            end = time.time()
            # BUGFIX: re-enable GC even if func raised
            gc.enable()
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def make_chunk(filename, arr, arr_len):
    """Randomly sample ~chunk_num_frms elements of `arr` into an HDF5 chunk.

    Each element is kept for the chunk with probability
    chunk_num_frms / arr_len (module-level constant); the rest are returned
    for further chunking.  Returns (remaining_elements, remaining_count).
    NOTE(review): arr_len is decremented per dumped element, so the caller
    presumably passes the running count of undumped frames -- confirm.
    """
    remain_arr = []
    dump_arr = []
    prob = float(chunk_num_frms) / arr_len
    for e in arr:
        # Bernoulli draw: 1 -> goes into this chunk, 0 -> kept for later
        flag_choose = np.random.binomial(1, prob)
        if flag_choose == 1:
            dump_arr.append(e)
            arr_len -= 1
        else:
            remain_arr.append(e)
    save_hd5(filename, dump_arr)  # defined elsewhere
    # free the dumped data eagerly; frames can be large
    del dump_arr[:]
    del dump_arr
    gc.collect()
    return remain_arr, arr_len
def test_it(collectible):
    """Demo: trace referrer counts and gc.garbage for One() objects.

    `One` and `dd` (a debug-print helper) are defined elsewhere in the file;
    `collectible` selects whether the created object is collectable.
    """
    dd()
    dd('======= ', ('collectible' if collectible else 'uncollectible'), ' object =======')
    dd()
    gc.collect()
    dd('*** init, nr of referrers: ', len(gc.get_referrers(One)))
    dd(' garbage: ', gc.garbage)
    one = One(collectible)
    dd(' created: ', one.typ, ': ', one)
    dd(' nr of referrers: ', len(gc.get_referrers(One)))
    dd(' delete:')
    del one
    gc.collect()
    # after the collect the instance should be gone (or in gc.garbage)
    dd('*** after gc, nr of referrers: ', len(gc.get_referrers(One)))
    dd(' garbage: ', gc.garbage)
def flushUnder(dirpath):
    """Flushes all modules that live under the given directory

    :param dirpath: the name of the top most directory to search under.
    :type dirpath: str
    :return: list of (module name, module file path) pairs that were unloaded
    :rtype: list[tuple[str, str]]
    """
    modulePaths = list()
    # BUGFIX: iterate over a snapshot -- deleting from sys.modules while
    # iterating sys.modules.items() raises RuntimeError on Python 3.
    for name, module in list(sys.modules.items()):
        if module is None:
            # stale placeholder entry; drop it
            del sys.modules[name]
            continue
        try:
            moduleDirpath = os.path.realpath(os.path.dirname(inspect.getfile(module)))
            if moduleDirpath.startswith(dirpath):
                modulePaths.append((name, inspect.getfile(sys.modules[name])))
                del sys.modules[name]
                logger.debug('unloaded module: %s ' % name)
        except TypeError:
            # built-in/extension modules have no file; skip them
            continue
    # Force a garbage collection
    gc.collect()
    return modulePaths
def test_diagnostics_life(self):
    """A Diagnostics object must keep its cursor alive until it is deleted."""
    import gc
    from weakref import ref

    def tmp():
        cur = self.conn.cursor()
        try:
            cur.execute("select * from nonexist")
        except psycopg2.Error as exc:
            return cur, exc

    cur, e = tmp()
    diag = e.diag
    w = ref(cur)
    del e, cur
    gc.collect()
    # diag still references the cursor, so the weakref must be live
    assert(w() is not None)
    self.assertEqual(diag.sqlstate, '42P01')
    del diag
    gc.collect()
    gc.collect()
    # with diag gone, the cursor is finally collectable
    assert(w() is None)
def test_diagnostics_life(self):
    """A Diagnostics object must keep its cursor alive until it is deleted.

    NOTE(review): Python 2 `except ... , exc` syntax preserved as-is.
    """
    import gc
    from weakref import ref

    def tmp():
        cur = self.conn.cursor()
        try:
            cur.execute("select * from nonexist")
        except psycopg2.Error, exc:
            return cur, exc

    cur, e = tmp()
    diag = e.diag
    w = ref(cur)
    del e, cur
    gc.collect()
    # diag still references the cursor, so the weakref must be live
    assert(w() is not None)
    self.assertEqual(diag.sqlstate, '42P01')
    del diag
    gc.collect()
    gc.collect()
    # with diag gone, the cursor is finally collectable
    assert(w() is None)
def runAnalysis(bam, fasta, blastresults, taxdump, modelOutput, output, tokeep, toremove, binary, target, level):
    """End-to-end contamination-classification pipeline.

    Loads taxonomy, contigs (FASTA), coverage (BAM) and BLAST hits, builds a
    corpus, trains a classifier and writes keep/remove outputs.  gc.collect()
    is called between stages to release the large intermediate structures.
    """
    taxdump, taxidDict = common.parseTaxdump(taxdump, False)
    gc.collect()
    click.echo("Taxdump parsed, %d taxIDs loaded" % len(taxdump))
    contigs = readFasta(fasta)
    gc.collect()
    click.echo("FASTA loaded, %d contigs returned" % len(contigs))
    contigs = readBAM(bam, contigs)
    gc.collect()
    click.echo("BAM loaded")
    contigs, classMap, classList = readBLAST(blastresults, taxdump, level.lower(), contigs)
    gc.collect()
    click.echo("BLAST results loaded")
    corpus, testdata, features = common.constructCorpus(list(contigs.values()), classMap, binary, target)
    gc.collect()
    click.echo("Corpus constucted, %d contigs in corpus and %d contigs in test data" % (len(corpus), len(testdata)))
    classifier = common.constructModel(corpus, classList, features, modelOutput)
    result = common.classifyData(classifier, testdata, classMap)
    common.generateOutput(tokeep, toremove, result, contigs.values(), target, output)
def test_gc(self):
    """test close&term by garbage collection alone"""
    if PYPY:
        # PyPy's GC is not deterministic enough for this test
        raise SkipTest("GC doesn't work ")
    # test credit @dln (GH #137):
    def gcf():
        def inner():
            ctx = self.Context()
            s = ctx.socket(zmq.PUSH)
        inner()
        gc.collect()
    # run in a thread: if GC fails to clean up the context, term() blocks
    # and the thread never finishes
    t = Thread(target=gcf)
    t.start()
    t.join(timeout=1)
    self.assertFalse(t.is_alive(), "Garbage collection should have cleaned up context")
def _cleanUp(self, result):
    """Post-test cleanup: run the janitor, record errors, report the outcome.

    NOTE(review): Python 2 `except ... , e` syntax preserved as-is.
    """
    try:
        if self.forceGarbageCollection:
            gc.collect()
        util._Janitor().postCaseCleanup()
    except util.FailureError, e:
        result.addError(self, e.original)
        self._passed = False
    except:
        # any other cleanup failure is recorded, not raised
        result.cleanupErrors(failure.Failure())
        self._passed = False
    # errors logged during the test also fail it
    for error in self._observer.getErrors():
        result.addError(self, error)
        self._passed = False
    self.flushLoggedErrors()
    self._removeObserver()
    if self._passed:
        result.addSuccess(self)
def pytest_runtest_item(self, item):
    """pytest hook wrapper: fail a test that leaks file descriptors.

    Snapshots open files before and after the test body (the `yield`) and
    fails with a before/after listing if new FDs survived.
    """
    lines1 = self.get_open_files()
    yield
    if hasattr(sys, "pypy_version_info"):
        # PyPy frees FDs lazily; force a collect before comparing
        gc.collect()
    lines2 = self.get_open_files()
    new_fds = set([t[0] for t in lines2]) - set([t[0] for t in lines1])
    leaked_files = [t for t in lines2 if t[0] in new_fds]
    if leaked_files:
        error = []
        error.append("***** %s FD leakage detected" % len(leaked_files))
        error.extend([str(f) for f in leaked_files])
        error.append("*** Before:")
        error.extend([str(f) for f in lines1])
        error.append("*** After:")
        error.extend([str(f) for f in lines2])
        error.append(error[0])
        error.append("*** function %s:%s: %s " % item.location)
        pytest.fail("\n".join(error), pytrace=False)

# XXX copied from execnet's conftest.py - needs to be merged
def getmodulecol(self, source, configargs=(), withinit=False):
    """Return the module collection node for ``source``.

    This writes ``source`` to a file using :py:meth:`makepyfile` and then
    runs the pytest collection on it, returning the collection node for the
    test module.

    :param source: The source code of the module to collect.

    :param configargs: Any extra arguments to pass to
        :py:meth:`parseconfigure`.

    :param withinit: Whether to also write a ``__init__.py`` file to the
        temporarly directory to ensure it is a package.
    """
    # name the generated file after the requesting test function
    kw = {self.request.function.__name__: Source(source).strip()}
    path = self.makepyfile(**kw)
    if withinit:
        self.makepyfile(__init__ = "#")
    self.config = config = self.parseconfigure(path, *configargs)
    node = self.getnode(config, path)
    return node
def test_fom_buffer(self):
    """ctypes from_buffer() shares memory with, and keeps alive, the source.

    NOTE(review): name looks like a typo for test_from_buffer, but renaming
    would change test discovery -- left as-is.  `X` is a Structure subclass
    defined elsewhere in the test module.  Python 2 era code (str passed as
    buffer at the end).
    """
    a = array.array("i", range(16))
    x = (c_int * 16).from_buffer(a)
    y = X.from_buffer(a)
    self.assertEqual(y.c_int, a[0])
    self.assertFalse(y.init_called)
    self.assertEqual(x[:], a.tolist())
    # from_buffer shares memory: mutations in `a` show through `x`
    a[0], a[-1] = 200, -200
    self.assertEqual(x[:], a.tolist())
    self.assertIn(a, x._objects.values())
    self.assertRaises(ValueError, c_int.from_buffer, a, -1)
    expected = x[:]
    # x holds a reference to a's buffer, so the data survives `del a`
    del a; gc.collect(); gc.collect(); gc.collect()
    self.assertEqual(x[:], expected)
    self.assertRaises(TypeError, (c_char * 16).from_buffer, "a" * 16)
def test_from_buffer_copy(self):
    """ctypes from_buffer_copy() copies the data and keeps no reference.

    NOTE(review): Python 2 era code -- x[:] is compared to range(16)
    directly and str is passed as a buffer; these only hold on Python 2.
    """
    a = array.array("i", range(16))
    x = (c_int * 16).from_buffer_copy(a)
    y = X.from_buffer_copy(a)
    self.assertEqual(y.c_int, a[0])
    self.assertFalse(y.init_called)
    self.assertEqual(x[:], range(16))
    # a copy: mutations in `a` must NOT show through `x`
    a[0], a[-1] = 200, -200
    self.assertEqual(x[:], range(16))
    self.assertEqual(x._objects, None)
    self.assertRaises(ValueError, c_int.from_buffer, a, -1)
    del a; gc.collect(); gc.collect(); gc.collect()
    self.assertEqual(x[:], range(16))
    x = (c_char * 16).from_buffer_copy("a" * 16)
    self.assertEqual(x[:], "a" * 16)
def test_1(self):
    """A ctypes callback wrapper must not leak references to its Python target.

    `dll` and `MyCallback` are defined elsewhere in the test module.
    """
    from sys import getrefcount as grc
    f = dll._testfunc_callback_i_if
    f.restype = ctypes.c_int
    f.argtypes = [ctypes.c_int, MyCallback]

    def callback(value):
        #print "called back with", value
        return value

    # baseline: one local ref + getrefcount's own argument ref
    self.assertEqual(grc(callback), 2)
    cb = MyCallback(callback)
    # the C wrapper holds additional references while alive
    self.assertGreater(grc(callback), 2)
    result = f(-10, cb)
    self.assertEqual(result, -18)
    cb = None
    gc.collect()
    # wrapper gone -> refcount back to baseline (no leak)
    self.assertEqual(grc(callback), 2)
def showImage(self):
    """Generate the salary/position chart PNGs and show them in the UI.

    NOTE(review): original comments were mojibake and have been rewritten in
    English.  They indicated this must run on the Qt main thread ("main
    thread is not in main loop" errors otherwise) -- confirm against caller.
    """
    image = None  # BUGFIX: defined even if GenImage() itself raises
    try:
        image = GenImage(os.getcwd() + '/resource/%s/' % (self.type))
        image.generateImage('position_for_image.csv', '1.png', 'bar')
        image.generateImage('salary_for_image.csv', '2.png', 'pie')
    except Exception:
        # BUGFIX: was a bare `except:`; chart generation needs network data,
        # so treat any failure as a network error and fall through to show
        # whatever images already exist on disk.
        self.networkError()
    PixMapSalary = QtGui.QPixmap(os.getcwd() + '/resource/%s/images/1.png' % (self.type)).scaled(400, 600)
    self.SalaryImage.setPixmap(PixMapSalary)
    PixMapPosition = QtGui.QPixmap(os.getcwd() + '/resource/%s/images/2.png' % (self.type)).scaled(500, 500)
    self.PositionImage.setPixmap(PixMapPosition)
    # release the generator and its figures promptly
    if image is not None:
        del image
    gc.collect()
    self.showStaff()
def test_main(verbose=None):
    """Run the functools test suite; optionally hunt for refcount leaks.

    With `verbose` on a debug build (sys.gettotalrefcount available), reruns
    the suite five times and prints the total refcount after each pass so a
    steady climb reveals a leak.
    """
    test_classes = (
        TestPartial,
        TestPartialSubclass,
        TestPythonPartial,
        TestUpdateWrapper,
        TestTotalOrdering,
        TestCmpToKey,
        TestWraps,
        TestReduce,
        TestLRU,
        TestOrderedDict,
    )
    support.run_unittest(*test_classes)

    # verify reference counting
    if verbose and hasattr(sys, "gettotalrefcount"):
        import gc
        counts = [None] * 5
        for i in range(len(counts)):
            support.run_unittest(*test_classes)
            gc.collect()
            counts[i] = sys.gettotalrefcount()
        print(counts)
def gc_collect():
    """Force as many objects as possible to be collected.

    On non-CPython interpreters (and even on CPython when reference cycles
    are involved) deallocation is not immediate, so __del__ methods may run
    late and weakrefs may linger.  Collect repeatedly, giving Jython's
    asynchronous collector a brief pause, to flush out as much garbage as
    possible.
    """
    gc.collect()
    if is_jython:
        # Jython's collector runs asynchronously; give it a moment.
        time.sleep(0.1)
    # two extra passes pick up objects freed by earlier finalizers
    for _ in range(2):
        gc.collect()

#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.
def delete(self, *args, **kwargs): directory = self.directory # Just doing a plain delete will collect all related objects in memory # before deleting: translation projects, stores, units, quality checks, # suggestions, and submissions. # This can easily take down a process. If we do a translation project # at a time and force garbage collection, things stay much more # managable. import gc gc.collect() for tp in self.translationproject_set.iterator(): tp.delete() gc.collect() super(Project, self).delete(*args, **kwargs) directory.delete()
def test_threaded_leak(self):
    """Main greenlets of dead threads must be collectable (no leak)."""
    gg = []

    def worker():
        # only main greenlet present
        gg.append(weakref.ref(greenlet.getcurrent()))

    for i in range(2):
        t = threading.Thread(target=worker)
        t.start()
        t.join()
    del t
    greenlet.getcurrent() # update ts_current
    self.recycle_threads()
    greenlet.getcurrent() # update ts_current
    gc.collect()
    greenlet.getcurrent() # update ts_current
    # every per-thread main greenlet should now be dead
    for g in gg:
        self.assertTrue(g() is None)
def test_threaded_adv_leak(self):
    """Main greenlets must be collectable even when they spawned children."""
    gg = []

    def worker():
        # main and additional *finished* greenlets
        ll = greenlet.getcurrent().ll = []

        def additional():
            ll.append(greenlet.getcurrent())

        for i in range(2):
            greenlet.greenlet(additional).switch()
        gg.append(weakref.ref(greenlet.getcurrent()))

    for i in range(2):
        t = threading.Thread(target=worker)
        t.start()
        t.join()
    del t
    greenlet.getcurrent() # update ts_current
    self.recycle_threads()
    greenlet.getcurrent() # update ts_current
    gc.collect()
    greenlet.getcurrent() # update ts_current
    # every per-thread main greenlet should now be dead
    for g in gg:
        self.assertTrue(g() is None)
def __call__(self, result = None):
    """Run the wrapped test, then check for leaked COM interfaces/gateways."""
    # For the COM suite's sake, always ensure we don't leak
    # gateways/interfaces
    from pythoncom import _GetInterfaceCount, _GetGatewayCount
    gc.collect()
    ni = _GetInterfaceCount()
    ng = _GetGatewayCount()
    self.real_test(result)
    # Failed - no point checking anything else
    if result.shouldStop or not result.wasSuccessful():
        return
    self._do_leak_tests(result)
    gc.collect()
    lost_i = _GetInterfaceCount() - ni
    lost_g = _GetGatewayCount() - ng
    if lost_i or lost_g:
        msg = "%d interface objects and %d gateway objects leaked" \
                                                % (lost_i, lost_g)
        exc = AssertionError(msg)
        result.addFailure(self.real_test, (exc.__class__, exc, None))
def _do_leak_tests(self, result = None):
    """Rerun the test num_leak_iters times and fail on a net refcount change.

    Only possible on debug builds, where sys.gettotalrefcount exists.
    """
    try:
        gtrc = sys.gettotalrefcount
    except AttributeError:
        return # can't do leak tests in this build
    # Assume already called once, to prime any caches etc
    gc.collect()
    trc = gtrc()
    for i in range(self.num_leak_iters):
        self.real_test(result)
        if result.shouldStop:
            break
    del i # created after we remembered the refcount!
    # int division here means one or 2 stray references won't force
    # failure, but one per loop
    gc.collect()
    lost = (gtrc() - trc) // self.num_leak_iters
    if lost < 0:
        msg = "LeakTest: %s appeared to gain %d references!!" % (self.real_test, -lost)
        result.addFailure(self.real_test, (AssertionError, msg, None))
    if lost > 0:
        msg = "LeakTest: %s lost %d references" % (self.real_test, lost)
        exc = AssertionError(msg)
        result.addFailure(self.real_test, (exc.__class__, exc, None))
def test_no_memory_leak():
    """Parsing many names must not grow RSS by more than 25%.

    NOTE(review): measures RSS by shelling out to `ps`, so this only works
    on POSIX systems with a BSD-style ps.
    """
    import gc
    import os

    def rss():
        # collect first so only genuinely-retained memory is measured
        gc.collect()
        out = os.popen("ps -o rss= -p %d" % os.getpid()).read()
        return int(out.strip())

    before = rss()
    for _ in range(100000):
        n = Name.parse("Reallyverylongfirstname Reallyverylonglastname")
        n.given_name
        n.surname
    after = rss()
    assert after < 1.25 * before
def test_ffi_type_not_immortal():
    """CFFI ctype objects are cached but not immortal.

    A ctype dies when nothing references it; it survives while another
    cached ctype (e.g. a pointer-to-it) keeps it alive.
    """
    import weakref, gc
    ffi = _cffi1_backend.FFI()
    t1 = ffi.typeof("int **")
    t2 = ffi.typeof("int *")
    w1 = weakref.ref(t1)
    w2 = weakref.ref(t2)
    del t1, ffi
    gc.collect()
    # t1 had no remaining refs -> collected; t2 is still held locally
    assert w1() is None
    assert w2() is t2
    ffi = _cffi1_backend.FFI()
    # the cache must hand back the *same* live object
    assert ffi.typeof(ffi.new("int **")[0]) is t2
    #
    ffi = _cffi1_backend.FFI()
    t1 = ffi.typeof("int ***")
    t2 = ffi.typeof("int **")
    w1 = weakref.ref(t1)
    w2 = weakref.ref(t2)
    del t2, ffi
    gc.collect()
    assert w1() is t1
    assert w2() is not None # kept alive by t1
    ffi = _cffi1_backend.FFI()
    assert ffi.typeof("int * *") is t1.item