The following code examples, extracted from open-source Python projects, illustrate how to use gc.enable().
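Nearly all of the examples below follow the same pattern: snapshot the collector state with gc.isenabled(), call gc.disable() around a timing- or pickle-sensitive region, and restore the collector in a finally block. A minimal standalone sketch of that pattern (run_without_gc is a hypothetical helper, not taken from any of the projects below):

import gc

def run_without_gc(fn, *args, **kwargs):
    # Remember whether the collector was on, so we only
    # re-enable it if it was enabled to begin with.
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        return fn(*args, **kwargs)
    finally:
        if was_enabled:
            gc.enable()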
def timeit(self, number=default_number):
    """Time 'number' executions of the main statement.

    To be precise, this executes the setup statement once, and
    then returns the time it takes to execute the main statement
    a number of times, as a float measured in seconds.  The
    argument is the number of times through the loop, defaulting
    to one million.  The main statement, the setup statement and
    the timer function to be used are passed to the constructor.
    """
    if itertools:
        it = itertools.repeat(None, number)
    else:
        it = [None] * number
    gcold = gc.isenabled()
    gc.disable()
    timing = self.inner(it, self.timer)
    if gcold:
        gc.enable()
    return timing
def test_cleanup(self):
    gc.enable()
    gc.collect()
    assert not gc.garbage, "Object leak: %s" % str(gc.garbage)

    container = pyngus.Container("abc")
    c1 = container.create_connection("c1")
    c2 = container.create_connection("c2")
    assert c2
    del c2
    gc.collect()
    c2 = container.get_connection("c2")
    assert c2
    c1 = container.get_connection("c1")
    assert c1
    c1.create_receiver("r1")
    c1.create_sender("s1")
    del c1
    del c2
    container.destroy()
    del container
    gc.collect()
    assert not gc.garbage, "Object leak: %s" % str(gc.garbage)
def test_init():
    # Delete existing DB to init from a clean state
    if os.path.exists(MOCK_DB_FILE):
        os.remove(MOCK_DB_FILE)

    db = SqliteStore(MockConfig())
    assert db
    # Only the Global CIDR should appear here
    assert len(db.query_all()) == 1

    # Forcibly destroy DB object to close connection
    gc.disable()
    del db
    gc.enable()

    # Test the existing DB
    db = SqliteStore(MockConfig())
    # Again, only the Global CIDR should appear
    assert len(db.query_all()) == 1

    # Cleanup
    os.remove(MOCK_DB_FILE)
def nogc(fun):
    """
    Decorator: let a function disable the garbage collector during
    its execution. It is used in the build context when storing/loading
    the build cache file (pickle)

    :param fun: function to execute
    :type fun: function
    :return: the return value of the function executed
    """
    def f(*k, **kw):
        try:
            gc.disable()
            ret = fun(*k, **kw)
        finally:
            gc.enable()
        return ret
    f.__doc__ = fun.__doc__
    return f
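A minimal usage sketch of the nogc decorator above, assuming it is in scope; load_cache and the cache path are hypothetical names, not part of the original project:

import gc
import pickle

@nogc
def load_cache(path):
    # Unpickling many small objects is much faster with the
    # collector paused; nogc re-enables it even on error.
    with open(path, 'rb') as f:
        return pickle.load(f)

cache = load_cache('.build_cache.pkl')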
def timer(func, repetitions=100000):
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        for x in range(repetitions):
            result = func(*args, **kwargs)
        end = time.time()
        gc.enable()  # re-enable garbage collection
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end - start) + " sec")
        return result
    return wrapper

#------------------------------------------------------------------------------
# [ timer_X function decorators ]
#   replicate the above decorator with different number of repetitions
#------------------------------------------------------------------------------
def timer_10(func, repetitions=10):
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        for x in range(repetitions):
            result = func(*args, **kwargs)
        end = time.time()
        gc.enable()  # re-enable garbage collection
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end - start) + " sec")
        return result
    return wrapper
def timer_100(func, repetitions=100):
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        for x in range(repetitions):
            result = func(*args, **kwargs)
        end = time.time()
        gc.enable()  # re-enable garbage collection
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end - start) + " sec")
        return result
    return wrapper
def timer_1k(func, repetitions=1000):
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        for x in range(repetitions):
            result = func(*args, **kwargs)
        end = time.time()
        gc.enable()  # re-enable garbage collection
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end - start) + " sec")
        return result
    return wrapper
def timer_10k(func, repetitions=10000):
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        for x in range(repetitions):
            result = func(*args, **kwargs)
        end = time.time()
        gc.enable()  # re-enable garbage collection
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end - start) + " sec")
        return result
    return wrapper
def timeit(self, number=default_number):
    """Time 'number' executions of the main statement.

    To be precise, this executes the setup statement once, and
    then returns the time it takes to execute the main statement
    a number of times, as a float measured in seconds.  The
    argument is the number of times through the loop, defaulting
    to one million.  The main statement, the setup statement and
    the timer function to be used are passed to the constructor.
    """
    if itertools:
        it = itertools.repeat(None, number)
    else:
        it = [None] * number
    gcold = gc.isenabled()
    gc.disable()
    try:
        timing = self.inner(it, self.timer)
    finally:
        if gcold:
            gc.enable()
    return timing
def load_parser(self, path, original_changed_time):
    try:
        pickle_changed_time = self._index[path]
    except KeyError:
        return None
    if original_changed_time is not None \
            and pickle_changed_time < original_changed_time:
        # the pickle file is outdated
        return None
    with open(self._get_hashed_path(path), 'rb') as f:
        try:
            gc.disable()
            parser_cache_item = pickle.load(f)
        finally:
            gc.enable()
    debug.dbg('pickle loaded: %s', path)
    parser_cache[path] = parser_cache_item
    return parser_cache_item.parser
def test_load_refcount():
    # Check that objects returned by np.load are directly freed based on
    # their refcount, rather than needing the gc to collect them.
    f = BytesIO()
    np.savez(f, [1, 2, 3])
    f.seek(0)

    assert_(gc.isenabled())
    gc.disable()
    try:
        gc.collect()
        np.load(f)
        # gc.collect returns the number of unreachable objects in cycles that
        # were found -- we are checking that no cycles were created by np.load
        n_objects_in_cycles = gc.collect()
    finally:
        gc.enable()
    assert_equal(n_objects_in_cycles, 0)
def bench(func, iterations, stat_memory):
    gc.collect()
    heap_diff = None
    if heapy and stat_memory:
        heap_before = heapy.heap()

    total_sec = timeit.timeit(func, setup=gc.enable, number=iterations)

    if heapy and stat_memory:
        heap_diff = heapy.heap() - heap_before

    sec_per_req = Decimal(str(total_sec)) / Decimal(str(iterations))

    sys.stdout.write('.')
    sys.stdout.flush()

    return (sec_per_req, heap_diff)
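The setup=gc.enable argument in this example and the next exploits the fact that timeit disables the garbage collector for the duration of the measurement by default; passing gc.enable as the setup callable turns the collector back on so collection overhead is included in the timing. A minimal standalone sketch of the idiom (the statement being timed is arbitrary):

import gc
import timeit

stmt = '[i ** 2 for i in range(1000)]'

# Collector active during timing: includes real-world GC overhead.
with_gc = timeit.timeit(stmt, setup=gc.enable, number=1000)

# Default behaviour: timeit runs with the collector disabled.
without_gc = timeit.timeit(stmt, number=1000)

print(with_gc, without_gc)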
def determine_iterations(func):
    # NOTE(kgriffs): Algorithm adapted from IPython's magic timeit
    # function to determine iterations so that 0.2 <= total time < 2.0
    iterations = ITER_DETECTION_MULTIPLIER
    for __ in range(1, ITER_DETECTION_MAX_ATTEMPTS):
        gc.collect()

        total_sec = timeit.timeit(
            func,
            setup=gc.enable,
            number=int(iterations)
        )

        if total_sec >= ITER_DETECTION_DURATION_MIN:
            assert total_sec < ITER_DETECTION_DURATION_MAX
            break

        iterations *= ITER_DETECTION_MULTIPLIER

    return int(iterations)
def call_unrar(params):
    "Calls rar/unrar command line executable, returns stdout pipe"
    global rar_executable_cached
    if rar_executable_cached is None:
        for command in ('unrar', 'rar'):
            try:
                subprocess.Popen([command], stdout=subprocess.PIPE)
                rar_executable_cached = command
                break
            except OSError:
                pass
        if rar_executable_cached is None:
            raise UnpackerNotInstalled("No suitable RAR unpacker installed")

    assert type(params) == list, "params must be list"
    args = [rar_executable_cached] + params
    try:
        gc.disable()  # See http://bugs.python.org/issue1336
        return subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    finally:
        gc.enable()
def _run(self, n):
    self._make_args(n)
    gcold = gc.isenabled()
    gc.disable()
    times = []
    for i in range(self._cmd.args.runs):
        t_start = time.time()
        self._compute()
        elapsed = time.time() - t_start
        times.append(elapsed)
    if gcold:
        gc.enable()
    return times
def timeit(self, number=timeit.default_number):
    """Time 'number' executions of the main statement.

    To be precise, this executes the setup statement once, and
    then returns the time it takes to execute the main statement
    a number of times, as a float measured in seconds.  The
    argument is the number of times through the loop, defaulting
    to one million.  The main statement, the setup statement and
    the timer function to be used are passed to the constructor.
    """
    it = itertools.repeat(None, number)
    gcold = gc.isenabled()
    gc.disable()
    try:
        timing = self.inner(it, self.timer)
    finally:
        if gcold:
            gc.enable()
    return timing
def test_free_from_gc(self):
    # Check that freeing of blocks by the garbage collector doesn't deadlock
    # (issue #12352).
    # Make sure the GC is enabled, and set lower collection thresholds to
    # make collections more frequent (and increase the probability of
    # deadlock).
    if not gc.isenabled():
        gc.enable()
        self.addCleanup(gc.disable)
    thresholds = gc.get_threshold()
    self.addCleanup(gc.set_threshold, *thresholds)
    gc.set_threshold(10)

    # perform numerous block allocations, with cyclic references to make
    # sure objects are collected asynchronously by the gc
    for i in range(5000):
        a = multiprocessing.heap.BufferWrapper(1)
        b = multiprocessing.heap.BufferWrapper(1)
        # circular references
        a.buddy = b
        b.buddy = a
def test_del_newclass(self):
    # __del__ methods can trigger collection; make this happen
    thresholds = gc.get_threshold()
    gc.enable()
    gc.set_threshold(1)

    class A(object):
        def __del__(self):
            dir(self)
    a = A()
    del a

    gc.disable()
    gc.set_threshold(*thresholds)

# The following two tests are fragile:
# They precisely count the number of allocations,
# which is highly implementation-dependent.
# For example, disposed tuples are not freed, but reused.
# To minimize variations, though, we first store the get_count() results
# and check them at the end.
def install_and_load(self):
    # TODO automatically install if fails to find anything
    FILE_NOT_FOUND_MSG = (
        'Could not find TIMIT file "%s", '
        'make sure you download and install the dataset')
    self.subset = {}
    path = os.path.join(os.path.dirname(__file__), 'TIMIT', '%s_set.pkl')
    for subset in ['train', 'test']:
        filepath = path % subset
        if not os.path.exists(filepath):
            raise IOError(FILE_NOT_FOUND_MSG % filepath)
        with open(filepath, 'rb') as f:
            gc.disable()
            all_data = [pickle.load(f)]
            all_data.append(pickle.load(f))
            all_data.append(pickle.load(f))
            gc.enable()
        self.subset[subset] = all_data

    # use same subset for validation / test, as TIMIT is small
    self.subset['valid'] = self.subset['test']
def benchmark(self, block: RunProgramBlock, runs: int,
              cpuset: CPUSet = None, set_id: int = 0) -> BenchmarkingResultBlock:
    t = time.time()
    block = block.copy()
    try:
        self._setup_block(block)
        gc.collect()
        gc.disable()
    except IOError as err:
        return BenchmarkingResultBlock(error=err)
    try:
        res = self._benchmark(block, runs, cpuset, set_id)
    except BaseException as ex:
        return BenchmarkingResultBlock(error=ex)
    finally:
        gc.enable()
    try:
        self._teardown_block(block)
    except BaseException as err:
        return BenchmarkingResultBlock(error=err)
    t = time.time() - t
    assert isinstance(res, BenchmarkingResultBlock)
    res.data["__ov-time"] = [t / runs] * runs
    # print(res.data)
    return res
def run_ec_on_bin(*args):
    cloneset, mismatch_rate, confidence, max_Q = args
    try:
        gc.disable()
        seqlen = len(next(iter(cloneset)).seq)
        logger.info("Starting QMerge on cloneset: seqlen: %s, "
                    "#clones: %s, #sequences: %s, #bases: %s, "
                    "#mutation_count: %s" % (seqlen, len(cloneset),
                                             cloneset.sequence_count,
                                             cloneset.base_count,
                                             cloneset.mutation_count))
        cloneset = run_qmerge_on_bin(cloneset, mismatch_rate, confidence, max_Q)
        logger.info("Starting IMerge on cloneset: seqlen: %s, "
                    "#clones: %s, #sequences: %s, #bases: %s, "
                    "#mutation_count: %s" % (seqlen, len(cloneset),
                                             cloneset.sequence_count,
                                             cloneset.base_count,
                                             cloneset.mutation_count))
        cloneset = run_imerge_on_bin(cloneset, mismatch_rate, confidence)
    finally:
        gc.enable()
    return cloneset, mismatch_rate
def test_del_newclass(self):
    # __del__ methods can trigger collection; make this happen
    thresholds = gc.get_threshold()
    gc.enable()
    gc.set_threshold(1)

    class A(object):
        def __del__(self):
            dir(self)
    a = A()
    del a

    gc.disable()
    gc.set_threshold(*thresholds)

# The following two tests are fragile:
# They precisely count the number of allocations,
# which is highly implementation-dependent.
# For example:
# - disposed tuples are not freed, but reused
# - the call to assertEqual somehow avoids building its args tuple
def test_main():
    enabled = gc.isenabled()
    gc.disable()
    assert not gc.isenabled()
    debug = gc.get_debug()
    gc.set_debug(debug & ~gc.DEBUG_LEAK)  # this test is supposed to leak

    try:
        gc.collect()  # Delete 2nd generation garbage
        run_unittest(GCTests, GCTogglingTests)
    finally:
        gc.set_debug(debug)
        # test gc.enable() even if GC is disabled by default
        if verbose:
            print "restoring automatic collection"
        # make sure to always test gc.enable()
        gc.enable()
        assert gc.isenabled()
        if not enabled:
            gc.disable()
def test_main():
    min_kb, max_kb = get_bounds()

    # This should be the **only** test function here.
    gc.disable()
    intersect_all()
    kb_used_after = memory_profiler.memory_usage(max_usage=True)
    if min_kb <= kb_used_after <= max_kb:
        status = 0
        msg = SUCCESS_TEMPLATE.format(kb_used_after)
        print(msg)
    else:
        status = 1
        msg = ERR_TEMPLATE.format(kb_used_after, min_kb, max_kb)
        print(msg, file=sys.stderr)

    gc.enable()
    sys.exit(status)
def timeit(self, number=default_number):
    """Time 'number' executions of the main statement.

    To be precise, this executes the setup statement once, and
    then returns the time it takes to execute the main statement
    a number of times, as a float measured in seconds.  The
    argument is the number of times through the loop, defaulting
    to one million.  The main statement, the setup statement and
    the timer function to be used are passed to the constructor.
    """
    it = itertools.repeat(None, number)
    gcold = gc.isenabled()
    gc.disable()
    try:
        timing = self.inner(it, self.timer)
    finally:
        if gcold:
            gc.enable()
    return timing
def tearDown(self):
    # Restore gc state
    del self.visit
    gc.callbacks.remove(self.cb1)
    gc.callbacks.remove(self.cb2)
    gc.set_debug(self.debug)
    if self.enabled:
        gc.enable()
    # destroy any uncollectables
    gc.collect()
    for obj in gc.garbage:
        if isinstance(obj, Uncollectable):
            obj.partner = None
    del gc.garbage[:]
    del self.othergarbage
    gc.collect()
def test_main():
    enabled = gc.isenabled()
    gc.disable()
    assert not gc.isenabled()
    debug = gc.get_debug()
    gc.set_debug(debug & ~gc.DEBUG_LEAK)  # this test is supposed to leak

    try:
        gc.collect()  # Delete 2nd generation garbage
        run_unittest(GCTests, GCTogglingTests, GCCallbackTests)
    finally:
        gc.set_debug(debug)
        # test gc.enable() even if GC is disabled by default
        if verbose:
            print("restoring automatic collection")
        # make sure to always test gc.enable()
        gc.enable()
        assert gc.isenabled()
        if not enabled:
            gc.disable()
def parse_all_files(self, directory, dictionary, use_chars, cache_file):
    """
    parse all files under the given directory into a list of questions,
    where each element is in the form of (document, query, answer, filename)
    """
    if os.path.exists(cache_file):
        gc.disable()
        temp = cPickle.load(open(cache_file))
        gc.enable()
        return temp

    all_files = glob.glob(directory + '/*.question')
    questions = []
    for i, f in enumerate(all_files):
        if i % 10000 == 0:
            print 'parsing {}'.format(i)
        questions.append(self.parse_one_file(f, dictionary, use_chars) + (f,))
    questions = self.parse_ner_pos(questions)
    cPickle.dump(questions, open(cache_file, 'w'), cPickle.HIGHEST_PROTOCOL)
    return questions
def parser_thread(self):
    while self.active:
        eventlet.greenthread.sleep(0)
        data, addr = None, None
        while data is None:
            eventlet.greenthread.sleep(0)
            try:
                data, addr = self.parse_q.get()
            except:
                yatelog.minor_exception('YATESock', 'Failed during parse receive')
        if data is not None:
            data = zlib.decompress(data)
            gc.disable()  # performance hack for msgpack
            try:
                msg = msgpack.unpackb(data, use_list=False)
                msg_type = msg[0]
                msg_params = msg[1]
                msg_id = msg[2]
                self.in_queues[msg_type].put((msg_params, msg_id, addr))
            except:
                yatelog.minor_exception('YATESock', 'Error while parsing packet from %s:%s' % addr)
            gc.enable()