我们从Python开源项目中,提取了以下41个代码示例,用于说明如何使用gc.set_debug()。
def package(): # Apply gevent monkey patches as early as possible during tests. from gevent.monkey import patch_all patch_all() # Enable all warnings in test mode. warnings.resetwarnings() warnings.simplefilter('default') # Look for memory leaks. gc.set_debug(gc.DEBUG_UNCOLLECTABLE) yield None # Print memory leaks. if gc.garbage: # pragma: no cover print('Uncollectable objects found:') for obj in gc.garbage: print(obj)
def test_saveall(self): # Verify that cyclic garbage like lists show up in gc.garbage if the # SAVEALL option is enabled. # First make sure we don't save away other stuff that just happens to # be waiting for collection. gc.collect() # if this fails, someone else created immortal trash self.assertEqual(gc.garbage, []) L = [] L.append(L) id_L = id(L) debug = gc.get_debug() gc.set_debug(debug | gc.DEBUG_SAVEALL) del L gc.collect() gc.set_debug(debug) self.assertEqual(len(gc.garbage), 1) obj = gc.garbage.pop() self.assertEqual(id(obj), id_L)
def test_main(): enabled = gc.isenabled() gc.disable() assert not gc.isenabled() debug = gc.get_debug() gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak try: gc.collect() # Delete 2nd generation garbage run_unittest(GCTests, GCTogglingTests) finally: gc.set_debug(debug) # test gc.enable() even if GC is disabled by default if verbose: print("restoring automatic collection") # make sure to always test gc.enable() gc.enable() assert gc.isenabled() if not enabled: gc.disable()
def test_main(): enabled = gc.isenabled() gc.disable() assert not gc.isenabled() debug = gc.get_debug() gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak try: gc.collect() # Delete 2nd generation garbage run_unittest(GCTests, GCTogglingTests) finally: gc.set_debug(debug) # test gc.enable() even if GC is disabled by default if verbose: print "restoring automatic collection" # make sure to always test gc.enable() gc.enable() assert gc.isenabled() if not enabled: gc.disable()
def tearDown(self): # Restore gc state del self.visit gc.callbacks.remove(self.cb1) gc.callbacks.remove(self.cb2) gc.set_debug(self.debug) if self.enabled: gc.enable() # destroy any uncollectables gc.collect() for obj in gc.garbage: if isinstance(obj, Uncollectable): obj.partner = None del gc.garbage[:] del self.othergarbage gc.collect()
def test_main(): enabled = gc.isenabled() gc.disable() assert not gc.isenabled() debug = gc.get_debug() gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak try: gc.collect() # Delete 2nd generation garbage run_unittest(GCTests, GCTogglingTests, GCCallbackTests) finally: gc.set_debug(debug) # test gc.enable() even if GC is disabled by default if verbose: print("restoring automatic collection") # make sure to always test gc.enable() gc.enable() assert gc.isenabled() if not enabled: gc.disable()
def __init__(self, interval=1.0, debug=False): """ Initializes garbage collector @param interval float: timeout interval in seconds. Default: 1s @param debug bool: debug output. Default: False """ super().__init__() self.debug = debug if debug: gc.set_debug(gc.DEBUG_LEAK) self.timer = QTimer() self.timer.timeout.connect(self.check) self.threshold = gc.get_threshold() gc.disable() self.timer.start(interval * 1000)
def __init__(self, interval=1.0, debug=False): self.debug = debug if debug: gc.set_debug(gc.DEBUG_LEAK) self.timer = QtCore.QTimer() self.timer.timeout.connect(self.check) self.threshold = gc.get_threshold() gc.disable() self.timer.start(interval * 1000)
def test_sem_gc (self): sampling.set_seed (342) gc.set_debug(gc.DEBUG_LEAK) nrep = 1 nt = 250 pct = 0.1 net = self.mm020 gibbs_iter = 1 self.do_test_sem (net, nrep, nt, pct, gibbs_iter, num_iter=50, do_init=True) # self.do_test_bayes (net, nrep, nt, pct, gibbs_iter, num_iter=25, do_init=True) dump_garbage()
def test_mm1_gc (self): sampling.set_seed (342) gc.set_debug(gc.DEBUG_LEAK) nrep = 1 nt = 250 pct = 0.1 net = self.mm1 gibbs_iter = 1 self.do_test_sem (net, nrep, nt, pct, gibbs_iter, num_iter=50, do_init=True) # self.do_test_bayes (net, nrep, nt, pct, gibbs_iter, num_iter=25, do_init=True) dump_garbage()
def test_gc (self): sampling.set_seed (342) gc.set_debug(gc.DEBUG_LEAK) arrv = self.mm020.sample (10) arrv = arrv.duplicate() dump_garbage()
def test_gibbs_gc (self): sampling.set_seed (342) gc.set_debug(gc.DEBUG_LEAK) arrv = self.mm020.sample (10) smp = arrv.subset_by_task (0.5) arrvl = self.mm020.gibbs_resample(arrv, 0, num_iter=1000000, return_arrv = False) print arrvl[-1] arrvl = None dump_garbage()
def sGet(self, url, ch='gbk', bt='solomon'): bots = { "baidu": "Baiduspider+(+http://www.baidu.com/search/spider.htm)", 'google': "Googlebot/2.1 (+http://www.google.com/bot.html)", 'solomon': "Solomon Net Vampire/1.0", 'de': "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:44.0) Gecko/20100101 Firefox/44.0" } headers = { #'Host': 'www.super-ping.com', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'Accept': 'text/xml,application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5', 'User-Agent': bots[bt], #'Accept-Encoding': 'gzip, deflate, sdch', 'Accept-Language': 'zh-CN,zh;q=0.8,ja;q=0.6' } import cookielib cookie = cookielib.CookieJar() opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie)) urllib2.install_opener(opener) req = urllib2.Request(url=url) try: gc.enable() #gc.set_debug(gc.DEBUG_LEAK) req = urllib2.urlopen(req) data = req.read() if ch == 'gbk': data = data.decode("gbk", 'ignore') elif ch == 'utf8': data = data.decode("utf-8") req.close() del req gc.collect() return data except IOError, e: print e #if(e.code == 404): # return False
def _genObjectStatus(): """ Generate a report with information about objects allocated, numbers of references to each object and other information that can be used to track down memory (object) leaks in the server. Returns: Object report (string). """ T = TRACE() rep = "NG/AMS SERVER OBJECT STATUS REPORT\n\n" import gc gc.set_debug(gc.DEBUG_COLLECTABLE | gc.DEBUG_UNCOLLECTABLE | gc.DEBUG_INSTANCES | gc.DEBUG_OBJECTS) rep += "=Garbage Collector Status:\n\n" rep += "Enabled: %d\n" % gc.isenabled() rep += "Unreachable objects: %d\n" % gc.collect() rep += "Threshold: %s\n" % str(gc.get_threshold()) #rep += "Objects:\n" #rep += str(gc.get_objects()) + "\n" rep += "Garbage:\n" rep += str(gc.garbage) + "\n\n" # Dump refence count status of all objects allocated into file specified. rep += "=Object Reference Counts:\n\n" for objInfo in _genRefCountRep(): rep += "%-4d %s\n" % (objInfo[0], objInfo[1]) rep += "\n=EOF" return rep
def setUp(self): # Save gc state and disable it. self.enabled = gc.isenabled() gc.disable() self.debug = gc.get_debug() gc.set_debug(0) gc.callbacks.append(self.cb1) gc.callbacks.append(self.cb2) self.othergarbage = []
def printUnreachableLen(): import gc as gc gc.set_debug(gc.DEBUG_SAVEALL) gc.collect() unreachableL = [] for it in gc.garbage: unreachableL.append(it) return len(str(unreachableL))
def printUnreachableNum(): import gc gc.set_debug(gc.DEBUG_SAVEALL) gc.collect() return len(gc.garbage)
def reportMemoryLeaks(): if printUnreachableNum() == 0: return None import bz2 as bz2 import gc gc.set_debug(gc.DEBUG_SAVEALL) gc.collect() uncompressedReport = '' for s in gc.garbage: try: uncompressedReport += str(s) + '&' except TypeError: pass reportdata = bz2.compress(uncompressedReport, 9) headers = { 'Content-type': 'application/x-bzip2', 'Accept': 'text/plain' } try: baseURL = patcherVer()[0].split('/lo')[0] except IndexError: print 'Base URL not available for leak submit' return None basePort = 80 if baseURL.count(':') == 2: basePort = baseURL[-4:] baseURL = baseURL[:-5] baseURL = baseURL[7:] if basePort != 80: finalURL = 'http://' + baseURL + ':' + str(basePort) + '/logging/memory_leak.php?leakcount=' + str(printUnreachableNum()) else: finalURL = 'http://' + baseURL + '/logging/memory_leak.php?leakcount=' + str(printUnreachableNum()) reporthttp = HTTPClient() reporthttp.postForm(URLSpec(finalURL), reportdata)
def async_worker(q, npz_dir, minibatch_size, apply_normalization): print "Hello from EvalTraining async_worker process!!!" gc.set_debug(gc.DEBUG_STATS) loader = NPZ.RandomizingLoader(npz_dir, minibatch_size) names = ('feature_planes', 'final_scores') while True: feed_dict_strings = build_feed_dict_strings(loader, apply_normalization) q.put(feed_dict_strings, block=True) # will block if queue is full
def test_garbage_at_shutdown(self): import subprocess code = """if 1: import gc class X: def __init__(self, name): self.name = name def __repr__(self): return "<X %%r>" %% self.name def __del__(self): pass x = X('first') x.x = x x.y = X('second') del x gc.set_debug(%s) """ def run_command(code): p = subprocess.Popen([sys.executable, "-Wd", "-c", code], stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() p.stdout.close() p.stderr.close() self.assertEqual(p.returncode, 0) self.assertEqual(stdout.strip(), b"") return strip_python_stderr(stderr) stderr = run_command(code % "0") self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at " b"shutdown; use", stderr) self.assertNotIn(b"<X 'first'>", stderr) # With DEBUG_UNCOLLECTABLE, the garbage list gets printed stderr = run_command(code % "gc.DEBUG_UNCOLLECTABLE") self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at " b"shutdown", stderr) self.assertTrue( (b"[<X 'first'>, <X 'second'>]" in stderr) or (b"[<X 'second'>, <X 'first'>]" in stderr), stderr) # With DEBUG_SAVEALL, no additional message should get printed # (because gc.garbage also contains normally reclaimable cyclic # references, and its elements get printed at runtime anyway). stderr = run_command(code % "gc.DEBUG_SAVEALL") self.assertNotIn(b"uncollectable objects at shutdown", stderr)
def test_garbage_at_shutdown(self): import subprocess code = """if 1: import gc import _testcapi @_testcapi.with_tp_del class X: def __init__(self, name): self.name = name def __repr__(self): return "<X %%r>" %% self.name def __tp_del__(self): pass x = X('first') x.x = x x.y = X('second') del x gc.set_debug(%s) """ def run_command(code): p = subprocess.Popen([sys.executable, "-Wd", "-c", code], stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() p.stdout.close() p.stderr.close() self.assertEqual(p.returncode, 0) self.assertEqual(stdout.strip(), b"") return strip_python_stderr(stderr) stderr = run_command(code % "0") self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at " b"shutdown; use", stderr) self.assertNotIn(b"<X 'first'>", stderr) # With DEBUG_UNCOLLECTABLE, the garbage list gets printed stderr = run_command(code % "gc.DEBUG_UNCOLLECTABLE") self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at " b"shutdown", stderr) self.assertTrue( (b"[<X 'first'>, <X 'second'>]" in stderr) or (b"[<X 'second'>, <X 'first'>]" in stderr), stderr) # With DEBUG_SAVEALL, no additional message should get printed # (because gc.garbage also contains normally reclaimable cyclic # references, and its elements get printed at runtime anyway). stderr = run_command(code % "gc.DEBUG_SAVEALL") self.assertNotIn(b"uncollectable objects at shutdown", stderr)
def main(): # gc.set_debug(gc.DEBUG_STATS) thv = gc.get_threshold() gc.set_threshold(10*thv[0], 10*thv[1], 10*thv[2]) filenames = [ ] desc = None bench = None opt = None send_msg = None i = 1 while i < len(sys.argv): if sys.argv[i] == "-in": desc = sys.argv[i+1] i += 2 elif sys.argv[i] == "-file": filenames.append(sys.argv[i+1]) i += 2 elif sys.argv[i] == "-bench": bench = sys.argv[i+1] i += 2 elif sys.argv[i] == "-opt": opt = sys.argv[i+1] i += 2 elif sys.argv[i] == "-send": send_msg = sys.argv[i+1] i += 2 else: usage() if send_msg != None: js = JT65Send() js.send(send_msg) sys.exit(0) if bench != None: benchmark(bench, True) sys.exit(0) if opt != None: optimize(opt) sys.exit(0) if len(filenames) > 0 and desc == None: r = JT65() r.verbose = True for filename in filenames: r.gowav(filename, 0) elif len(filenames) == 0 and desc != None: r = JT65() r.verbose = True r.opencard(desc) r.gocard() else: usage()