我们从 Python 开源项目中，提取了以下 46 个代码示例，用于说明如何使用 gc.get_threshold()。
def test_free_from_gc(self): # Check that freeing of blocks by the garbage collector doesn't deadlock # (issue #12352). # Make sure the GC is enabled, and set lower collection thresholds to # make collections more frequent (and increase the probability of # deadlock). if not gc.isenabled(): gc.enable() self.addCleanup(gc.disable) thresholds = gc.get_threshold() self.addCleanup(gc.set_threshold, *thresholds) gc.set_threshold(10) # perform numerous block allocations, with cyclic references to make # sure objects are collected asynchronously by the gc for i in range(5000): a = multiprocessing.heap.BufferWrapper(1) b = multiprocessing.heap.BufferWrapper(1) # circular references a.buddy = b b.buddy = a # # #
def check_gc_during_creation(self, makeref):
    """Create a weak reference while the collector runs on every allocation.

    Args:
        makeref: callable taking one object and returning a weak reference
            (or proxy) to it.

    The GC thresholds are lowered to (1, 1, 1) for the duration of the
    check and are always restored, even if ``makeref`` or any other setup
    step raises.
    """
    thresholds = gc.get_threshold()
    gc.set_threshold(1, 1, 1)
    # Everything after the threshold change is inside the try block so the
    # process-wide GC settings cannot leak out of a failing check.
    try:
        gc.collect()

        class A:
            pass

        def callback(*args):
            pass

        referenced = A()

        # Build a self-referencing object that also holds a weak reference.
        a = A()
        a.a = a
        a.wr = makeref(referenced)

        # now make sure the object and the ref get labeled as
        # cyclic trash:
        a = A()
        weakref.ref(referenced, callback)
    finally:
        gc.set_threshold(*thresholds)
def test_del_newclass(self): # __del__ methods can trigger collection, make this to happen thresholds = gc.get_threshold() gc.enable() gc.set_threshold(1) class A(object): def __del__(self): dir(self) a = A() del a gc.disable() gc.set_threshold(*thresholds) # The following two tests are fragile: # They precisely count the number of allocations, # which is highly implementation-dependent. # For example, disposed tuples are not freed, but reused. # To minimize variations, though, we first store the get_count() results # and check them at the end.
def check_len_race(self, dict_type, cons):
    """Sanity-check len() on a mapping whose entries may be collected.

    Args:
        dict_type: callable building the mapping under test from an
            iterable of items.
        cons: callable turning one RefCycle object into one item.

    For every GC threshold from 1 to 99 the mapping is filled with items
    built from cyclic objects, the objects are released, and len() is
    sampled while an iterator is alive and again after it is dropped.
    The original thresholds are restored during cleanup.
    """
    self.addCleanup(gc.set_threshold, *gc.get_threshold())
    population = 20
    for threshold in range(1, 100):
        gc.collect(0)
        gc.set_threshold(threshold, threshold, threshold)
        cycles = [RefCycle() for _ in range(population)]
        mapping = dict_type(cons(obj) for obj in cycles)
        # Once the list is gone, every item becomes collectable at the
        # next garbage collection pass.
        del cycles
        entries = mapping.iteritems()
        # Advance the iterator once; an already-empty mapping is fine.
        next(entries, None)
        size_while_iterating = len(mapping)
        del entries
        size_afterwards = len(mapping)
        self.assertGreaterEqual(size_while_iterating, 0)
        self.assertLessEqual(size_while_iterating, population)
        self.assertGreaterEqual(size_afterwards, 0)
        self.assertLessEqual(size_afterwards, size_while_iterating)
def test_del_newclass(self): # __del__ methods can trigger collection, make this to happen thresholds = gc.get_threshold() gc.enable() gc.set_threshold(1) class A(object): def __del__(self): dir(self) a = A() del a gc.disable() gc.set_threshold(*thresholds) # The following two tests are fragile: # They precisely count the number of allocations, # which is highly implementation-dependent. # For example: # - disposed tuples are not freed, but reused # - the call to assertEqual somehow avoids building its args tuple
def check_len_race(self, dict_type, cons):
    """Check that len() stays sane while cyclic collection removes entries.

    Args:
        dict_type: callable building the mapping under test from an
            iterable of items.
        cons: callable turning one RefCycle object into one item.

    Runs once per GC threshold in 1..99.  The thresholds in force on entry
    are restored during cleanup.
    """
    self.addCleanup(gc.set_threshold, *gc.get_threshold())
    total = 20
    for level in range(1, 100):
        gc.collect(0)
        gc.set_threshold(level, level, level)
        sources = [RefCycle() for _ in range(total)]
        container = dict_type(cons(src) for src in sources)
        # Dropping the list leaves the cycles unreachable; they are all
        # collected at the next garbage collection pass.
        del sources
        walker = container.items()
        # Take one step; running off the end immediately is acceptable.
        next(walker, None)
        len_with_iterator = len(container)
        del walker
        len_without_iterator = len(container)
        self.assertGreaterEqual(len_with_iterator, 0)
        self.assertLessEqual(len_with_iterator, total)
        self.assertGreaterEqual(len_without_iterator, 0)
        self.assertLessEqual(len_without_iterator, len_with_iterator)
def __init__(self, interval=1.0, debug=False):
    """
    Initializes garbage collector

    Automatic collection is switched off; collection is instead driven by
    a timer that calls self.check on every timeout.

    @param interval float: timeout interval in seconds. Default: 1s
    @param debug bool: debug output. Default: False
    """
    super().__init__()
    self.debug = debug
    if debug:
        gc.set_debug(gc.DEBUG_LEAK)

    self.timer = QTimer()
    self.timer.timeout.connect(self.check)

    # Remember the thresholds that were active before automatic collection
    # is disabled.
    self.threshold = gc.get_threshold()
    gc.disable()

    # QTimer.start() expects whole milliseconds.  ``interval * 1000`` is a
    # float, which bindings that do not coerce floats reject with a
    # TypeError, so convert explicitly.
    self.timer.start(int(round(interval * 1000)))
def test_free_from_gc(self): # Check that freeing of blocks by the garbage collector doesn't deadlock # (issue #12352). # Make sure the GC is enabled, and set lower collection thresholds to # make collections more frequent (and increase the probability of # deadlock). if not gc.isenabled(): gc.enable() self.addCleanup(gc.disable) #thresholds = gc.get_threshold() #self.addCleanup(gc.set_threshold, *thresholds) #gc.set_threshold(10) # perform numerous block allocations, with cyclic references to make # sure objects are collected asynchronously by the gc for i in range(5000): a = multiprocessing.heap.BufferWrapper(1) b = multiprocessing.heap.BufferWrapper(1) # circular references a.buddy = b b.buddy = a # # #
def check_len_race(self, dict_type, cons):
    """Sanity-check len() on a mapping whose entries may be collected.

    Args:
        dict_type: callable building the mapping under test from an
            iterable of items.
        cons: callable turning one RefCycle object into one item.

    In this variant the GC threshold manipulation is commented out; each
    of the 99 rounds starts with test_support.gc_collect() instead.
    """
    #self.addCleanup(gc.set_threshold, *gc.get_threshold())
    limit = 20
    for _round in range(1, 100):
        test_support.gc_collect()
        #gc.set_threshold(th, th, th)
        cycles = [RefCycle() for _ in range(limit)]
        table = dict_type(cons(cycle) for cycle in cycles)
        # With the list gone, all items will be collected at the next
        # garbage collection pass.
        del cycles
        cursor = table.iteritems()
        # Step the iterator once; an exhausted iterator is not an error.
        next(cursor, None)
        first_len = len(table)
        del cursor
        second_len = len(table)
        self.assertGreaterEqual(first_len, 0)
        self.assertLessEqual(first_len, limit)
        self.assertGreaterEqual(second_len, 0)
        self.assertLessEqual(second_len, first_len)
def __init__(self, interval=1.0, debug=False):
    """Set up timer-driven garbage collection.

    Automatic collection is disabled; a QTimer calls self.check on every
    timeout instead.

    Args:
        interval: timer period in seconds (default 1.0).
        debug: when true, turn on gc.DEBUG_LEAK output.
    """
    self.debug = debug
    if debug:
        gc.set_debug(gc.DEBUG_LEAK)

    self.timer = QtCore.QTimer()
    self.timer.timeout.connect(self.check)

    # Keep the thresholds that were active before automatic collection is
    # switched off.
    self.threshold = gc.get_threshold()
    gc.disable()

    # QTimer.start() expects whole milliseconds.  ``interval * 1000`` is a
    # float, which bindings that do not coerce floats reject with a
    # TypeError, so convert explicitly.
    self.timer.start(int(round(interval * 1000)))
def test_del(self): # __del__ methods can trigger collection, make this to happen thresholds = gc.get_threshold() gc.enable() gc.set_threshold(1) class A: def __del__(self): dir(self) a = A() del a gc.disable() gc.set_threshold(*thresholds)
def _genObjectStatus():
    """
    Build a text report describing the garbage collector state and the
    reference counts of allocated objects, for tracking down memory
    (object) leaks in the server.

    Note: this sets the gc debug flags and runs a full collection as a
    side effect.

    Returns:   Object report (string).
    """
    # NOTE(review): the TRACE() result is not used below; presumably it
    # only needs to stay alive for the duration of the call -- confirm.
    trace_handle = TRACE()
    import gc

    debug_flags = (gc.DEBUG_COLLECTABLE | gc.DEBUG_UNCOLLECTABLE |
                   gc.DEBUG_INSTANCES | gc.DEBUG_OBJECTS)
    gc.set_debug(debug_flags)

    # Collect the report pieces in order and join them once at the end.
    pieces = ["NG/AMS SERVER OBJECT STATUS REPORT\n\n"]

    pieces.append("=Garbage Collector Status:\n\n")
    pieces.append("Enabled:             %d\n" % gc.isenabled())
    pieces.append("Unreachable objects: %d\n" % gc.collect())
    pieces.append("Threshold:           %s\n" % str(gc.get_threshold()))
    # The full object dump is left out of the report:
    #pieces.append("Objects:\n")
    #pieces.append(str(gc.get_objects()) + "\n")
    pieces.append("Garbage:\n")
    pieces.append(str(gc.garbage) + "\n\n")

    # One line per entry of the reference count report.
    pieces.append("=Object Reference Counts:\n\n")
    for objInfo in _genRefCountRep():
        pieces.append("%-4d %s\n" % (objInfo[0], objInfo[1]))

    pieces.append("\n=EOF")
    return "".join(pieces)
def check_gc_during_creation(self, makeref):
    """Create a weak reference while collections are as frequent as possible.

    Args:
        makeref: callable taking one object and returning a weak reference
            (or proxy) to it.

    When test_support.check_impl_detail() is true, the GC thresholds are
    lowered to (1, 1, 1) for the duration of the check.  They are always
    restored, even if ``makeref`` or any other setup step raises.
    """
    # Evaluate the implementation check once so that the restore step
    # matches exactly what was done during setup.
    tune_thresholds = test_support.check_impl_detail()
    if tune_thresholds:
        import gc
        thresholds = gc.get_threshold()
        gc.set_threshold(1, 1, 1)
    # Everything after the threshold change is inside the try block so the
    # process-wide GC settings cannot leak out of a failing check.
    try:
        gc_collect()

        class A:
            pass

        def callback(*args):
            pass

        referenced = A()

        # Build a self-referencing object that also holds a weak reference.
        a = A()
        a.a = a
        a.wr = makeref(referenced)

        # now make sure the object and the ref get labeled as
        # cyclic trash:
        a = A()
        weakref.ref(referenced, callback)
    finally:
        if tune_thresholds:
            gc.set_threshold(*thresholds)
def test_implicit_parent_with_threads(self):
    """Check greenlet parents when a GC pass interleaves another thread.

    Each attempt arranges for a garbage cycle whose finalizer hands control
    to a second thread (which calls greenlet.getcurrent()) and waits for it
    to finish.  Greenlets are then created in a loop in the hope that the
    collection fires during one of those creations; if it does, every
    greenlet created in the attempt must have the calling greenlet as
    parent.  Up to five attempts are made; the test is skipped silently
    when the GC is disabled or its first threshold is below 50.
    """
    if not gc.isenabled():
        return # cannot test with disabled gc
    N = gc.get_threshold()[0]
    if N < 50:
        return # cannot test with such a small N
    def attempt():
        # Both locks start out held, so the helper thread blocks on lock1
        # and the finalizer later blocks on lock2.
        lock1 = threading.Lock()
        lock1.acquire()
        lock2 = threading.Lock()
        lock2.acquire()
        # One-element list so the nested scopes can flip the flag.
        recycled = [False]
        def another_thread():
            lock1.acquire() # wait for gc
            greenlet.getcurrent() # update ts_current
            lock2.release() # release gc
        t = threading.Thread(target=another_thread)
        t.start()
        class gc_callback(object):
            def __del__(self):
                # Wake the helper thread, wait until it has finished, then
                # record that the finalizer ran.
                lock1.release()
                lock2.acquire()
                recycled[0] = True
        class garbage(object):
            def __init__(self):
                # Self-reference: the object is only reclaimable by the
                # cyclic collector, which then finalizes the callback.
                self.cycle = self
                self.callback = gc_callback()
        l = []
        x = range(N*2)
        current = greenlet.getcurrent()
        g = garbage()
        for i in x:
            g = None # lose reference to garbage
            if recycled[0]:
                # gc callback called prematurely
                t.join()
                return False
            last = greenlet()
            if recycled[0]:
                break # yes! gc called in green_new
            l.append(last) # increase allocation counter
        else:
            # gc callback not called when expected
            # Force the collection so the helper thread is released and can
            # be joined before giving up on this attempt.
            gc.collect()
            if recycled[0]:
                t.join()
            return False
        # The collection happened while `last` was being created: it and
        # every greenlet created before it must have the caller as parent.
        self.assertEqual(last.parent, current)
        for g in l:
            self.assertEqual(g.parent, current)
        return True
    # Retry a few times; stop at the first attempt that hit the window.
    for i in range(5):
        if attempt():
            break
def collect_metrics(self):
    """Sample resource usage, GC counters and thread counts.

    Counter-like values are reported as the difference from the previous
    sample (stored in self.last_usage / self.last_collect) once a previous
    sample exists; otherwise the raw value is reported.

    Returns:
        A Metrics record.  Its ``gc`` field is a GC record, or None when
        the garbage collector is disabled.
    """
    def delta(current, previous, index):
        # Raw value on the first sample, difference afterwards.
        if not previous:
            return current[index]
        return current[index] - previous[index]

    u = resource.getrusage(resource.RUSAGE_SELF)

    # Decide once whether GC data is sampled, so that `c` is guaranteed to
    # exist when it is stored below.
    gc_enabled = gc_.isenabled()
    g = None  # previously unbound (NameError) when the GC was disabled
    if gc_enabled:
        c = list(gc_.get_count())
        th = list(gc_.get_threshold())
        g = GC(collect0=delta(c, self.last_collect, 0),
               collect1=delta(c, self.last_collect, 1),
               collect2=delta(c, self.last_collect, 2),
               threshold0=th[0],
               threshold1=th[1],
               threshold2=th[2])

    # Count only the threads matching each predicate.  Taking len() of the
    # list of booleans would report the total thread count for all three.
    thr = t.enumerate()
    daemon_threads = sum(1 for tr in thr if tr.daemon and tr.is_alive())
    alive_threads = sum(1 for tr in thr if not tr.daemon and tr.is_alive())
    dead_threads = sum(1 for tr in thr if not tr.is_alive())

    last = self.last_usage
    m = Metrics(ru_utime=delta(u, last, 0),
                ru_stime=delta(u, last, 1),
                # Memory figures are reported as sampled, not as deltas.
                ru_maxrss=u[2],
                ru_ixrss=u[3],
                ru_idrss=u[4],
                ru_isrss=u[5],
                ru_minflt=delta(u, last, 6),
                ru_majflt=delta(u, last, 7),
                ru_nswap=delta(u, last, 8),
                ru_inblock=delta(u, last, 9),
                ru_oublock=delta(u, last, 10),
                ru_msgsnd=delta(u, last, 11),
                ru_msgrcv=delta(u, last, 12),
                ru_nsignals=delta(u, last, 13),
                ru_nvcs=delta(u, last, 14),
                ru_nivcsw=delta(u, last, 15),
                alive_threads=alive_threads,
                dead_threads=dead_threads,
                daemon_threads=daemon_threads,
                gc=g)

    self.last_usage = u
    if gc_enabled:
        self.last_collect = c
    return m
def main():
    """Command-line entry point.

    Recognised arguments, each followed by one value:
        -in DESC      sound card description to read from
        -file NAME    input file (may be given several times)
        -bench NAME   run benchmark(NAME, True) and exit
        -opt NAME     run optimize(NAME) and exit
        -send MSG     send MSG with JT65Send and exit

    Anything else, a flag without a value, or a combination of -file and
    -in (or neither) results in a call to usage().
    """
    # gc.set_debug(gc.DEBUG_STATS)
    # Make collections ten times rarer than the current settings.
    thv = gc.get_threshold()
    gc.set_threshold(10*thv[0], 10*thv[1], 10*thv[2])

    filenames = []
    # Flags that hold a single value; a repeated flag keeps the last value.
    single = {"-in": None, "-bench": None, "-opt": None, "-send": None}

    argv = sys.argv
    i = 1
    while i < len(argv):
        flag = argv[i]
        known = flag == "-file" or flag in single
        if not known or i + 1 >= len(argv):
            # Unknown flag, or a flag given last with no value (which used
            # to raise IndexError).  usage() is presumably expected to
            # exit; return as well so parsing cannot continue if it does
            # not.
            usage()
            return
        value = argv[i+1]
        if flag == "-file":
            filenames.append(value)
        else:
            single[flag] = value
        i += 2

    desc = single["-in"]
    bench = single["-bench"]
    opt = single["-opt"]
    send_msg = single["-send"]

    if send_msg is not None:
        js = JT65Send()
        js.send(send_msg)
        sys.exit(0)

    if bench is not None:
        benchmark(bench, True)
        sys.exit(0)

    if opt is not None:
        optimize(opt)
        sys.exit(0)

    if filenames and desc is None:
        r = JT65()
        r.verbose = True
        for filename in filenames:
            r.gowav(filename, 0)
    elif not filenames and desc is not None:
        r = JT65()
        r.verbose = True
        r.opencard(desc)
        r.gocard()
    else:
        usage()