Python gc 模块,enable() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用gc.enable()

项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def timeit(self, number=default_number):
        """Time 'number' executions of the main statement.

        To be precise, this executes the setup statement once, and
        then returns the time it takes to execute the main statement
        a number of times, as a float measured in seconds.  The
        argument is the number of times through the loop, defaulting
        to one million.  The main statement, the setup statement and
        the timer function to be used are passed to the constructor.
        """
        if itertools:
            it = itertools.repeat(None, number)
        else:
            it = [None] * number
        gcold = gc.isenabled()
        gc.disable()
        timing = self.inner(it, self.timer)
        if gcold:
            gc.enable()
        return timing
项目:deb-python-pyngus    作者:openstack    | 项目源码 | 文件源码
def test_cleanup(self):
        gc.enable()
        gc.collect()
        assert not gc.garbage, "Object leak: %s" % str(gc.garbage)
        container = pyngus.Container("abc")
        c1 = container.create_connection("c1")
        c2 = container.create_connection("c2")
        assert c2
        del c2
        gc.collect()
        c2 = container.get_connection("c2")
        assert c2
        c1 = container.get_connection("c1")
        assert c1
        c1.create_receiver("r1")
        c1.create_sender("s1")
        del c1
        del c2
        container.destroy()
        del container
        gc.collect()
        assert not gc.garbage, "Object leak: %s" % str(gc.garbage)
项目:aws-sg-mngr    作者:mkazin    | 项目源码 | 文件源码
def test_init():

    # Delete existing DB to init from a clean state
    if os.path.exists(MOCK_DB_FILE):
        os.remove(MOCK_DB_FILE)

    db = SqliteStore(MockConfig())
    assert db
    # Only the Global CIDR should appear here
    assert len(db.query_all()) == 1

    # Forcibly destroy DB object to close connection
    gc.disable()
    del db
    gc.enable()

    # Test the existing DB
    db = SqliteStore(MockConfig())
    # Again, only the Global CIDR should appear
    assert len(db.query_all()) == 1

    # Cleanup
    os.remove(MOCK_DB_FILE)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def nogc(fun):
    """
    Decorator: let a function disable the garbage collector during its execution.
    It is used in the build context when storing/loading the build cache file (pickle)

    :param fun: function to execute
    :type fun: function
    :return: the return value of the function executed
    """
    def f(*k, **kw):
        try:
            gc.disable()
            ret = fun(*k, **kw)
        finally:
            gc.enable()
        return ret
    f.__doc__ = fun.__doc__
    return f
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def nogc(fun):
    """
    Decorator: let a function disable the garbage collector during its execution.
    It is used in the build context when storing/loading the build cache file (pickle)

    :param fun: function to execute
    :type fun: function
    :return: the return value of the function executed
    """
    def f(*k, **kw):
        try:
            gc.disable()
            ret = fun(*k, **kw)
        finally:
            gc.enable()
        return ret
    f.__doc__ = fun.__doc__
    return f
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def nogc(fun):
    """
    Decorator: let a function disable the garbage collector during its execution.
    It is used in the build context when storing/loading the build cache file (pickle)

    :param fun: function to execute
    :type fun: function
    :return: the return value of the function executed
    """
    def f(*k, **kw):
        try:
            gc.disable()
            ret = fun(*k, **kw)
        finally:
            gc.enable()
        return ret
    f.__doc__ = fun.__doc__
    return f
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer(func, repetitions=100000):
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        for x in range(repetitions):
            result = func(*args, **kwargs)
        end = time.time()
        gc.enable()  # re-enable garbage collection
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper

#------------------------------------------------------------------------------
# [ timer_X function decorators ]
#   replicate the above decorator with different number of repetitions
#------------------------------------------------------------------------------
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_10(func, repetitions=10):
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        for x in range(repetitions):
            result = func(*args, **kwargs)
        end = time.time()
        gc.enable()  # re-enable garbage collection
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_100(func, repetitions=100):
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        for x in range(repetitions):
            result = func(*args, **kwargs)
        end = time.time()
        gc.enable()  # re-enable garbage collection
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_1k(func, repetitions=1000):
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        for x in range(repetitions):
            result = func(*args, **kwargs)
        end = time.time()
        gc.enable()  # re-enable garbage collection
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_10k(func, repetitions=10000):
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        for x in range(repetitions):
            result = func(*args, **kwargs)
        end = time.time()
        gc.enable()  # re-enable garbage collection
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer(func, repetitions=100000):
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        for x in range(repetitions):
            result = func(*args, **kwargs)
        end = time.time()
        gc.enable()  # re-enable garbage collection
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper

#------------------------------------------------------------------------------
# [ timer_X function decorators ]
#   replicate the above decorator with different number of repetitions
#------------------------------------------------------------------------------
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_10(func, repetitions=10):
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        for x in range(repetitions):
            result = func(*args, **kwargs)
        end = time.time()
        gc.enable()  # re-enable garbage collection
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_100(func, repetitions=100):
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        for x in range(repetitions):
            result = func(*args, **kwargs)
        end = time.time()
        gc.enable()  # re-enable garbage collection
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_1k(func, repetitions=1000):
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        for x in range(repetitions):
            result = func(*args, **kwargs)
        end = time.time()
        gc.enable()  # re-enable garbage collection
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_10k(func, repetitions=10000):
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        start = time.time()
        for x in range(repetitions):
            result = func(*args, **kwargs)
        end = time.time()
        gc.enable()  # re-enable garbage collection
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def timeit(self, number=default_number):
        """Time 'number' executions of the main statement.

        To be precise, this executes the setup statement once, and
        then returns the time it takes to execute the main statement
        a number of times, as a float measured in seconds.  The
        argument is the number of times through the loop, defaulting
        to one million.  The main statement, the setup statement and
        the timer function to be used are passed to the constructor.
        """
        if itertools:
            it = itertools.repeat(None, number)
        else:
            it = [None] * number
        gcold = gc.isenabled()
        gc.disable()
        try:
            timing = self.inner(it, self.timer)
        finally:
            if gcold:
                gc.enable()
        return timing
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def load_parser(self, path, original_changed_time):
        try:
            pickle_changed_time = self._index[path]
        except KeyError:
            return None
        if original_changed_time is not None \
                and pickle_changed_time < original_changed_time:
            # the pickle file is outdated
            return None

        with open(self._get_hashed_path(path), 'rb') as f:
            try:
                gc.disable()
                parser_cache_item = pickle.load(f)
            finally:
                gc.enable()

        debug.dbg('pickle loaded: %s', path)
        parser_cache[path] = parser_cache_item
        return parser_cache_item.parser
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def load_parser(self, path, original_changed_time):
        try:
            pickle_changed_time = self._index[path]
        except KeyError:
            return None
        if original_changed_time is not None \
                and pickle_changed_time < original_changed_time:
            # the pickle file is outdated
            return None

        with open(self._get_hashed_path(path), 'rb') as f:
            try:
                gc.disable()
                parser_cache_item = pickle.load(f)
            finally:
                gc.enable()

        debug.dbg('pickle loaded: %s', path)
        parser_cache[path] = parser_cache_item
        return parser_cache_item.parser
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_load_refcount():
    # Check that objects returned by np.load are directly freed based on
    # their refcount, rather than needing the gc to collect them.

    f = BytesIO()
    np.savez(f, [1, 2, 3])
    f.seek(0)

    assert_(gc.isenabled())
    gc.disable()
    try:
        gc.collect()
        np.load(f)
        # gc.collect returns the number of unreachable objects in cycles that
        # were found -- we are checking that no cycles were created by np.load
        n_objects_in_cycles = gc.collect()
    finally:
        gc.enable()
    assert_equal(n_objects_in_cycles, 0)
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def bench(func, iterations, stat_memory):
    gc.collect()
    heap_diff = None

    if heapy and stat_memory:
        heap_before = heapy.heap()

    total_sec = timeit.timeit(func, setup=gc.enable, number=iterations)

    if heapy and stat_memory:
        heap_diff = heapy.heap() - heap_before

    sec_per_req = Decimal(str(total_sec)) / Decimal(str(iterations))

    sys.stdout.write('.')
    sys.stdout.flush()

    return (sec_per_req, heap_diff)
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def determine_iterations(func):
    # NOTE(kgriffs): Algorithm adapted from IPython's magic timeit
    # function to determine iterations so that 0.2 <= total time < 2.0
    iterations = ITER_DETECTION_MULTIPLIER
    for __ in range(1, ITER_DETECTION_MAX_ATTEMPTS):
        gc.collect()

        total_sec = timeit.timeit(
            func,
            setup=gc.enable,
            number=int(iterations)
        )

        if total_sec >= ITER_DETECTION_DURATION_MIN:
            assert total_sec < ITER_DETECTION_DURATION_MAX
            break

        iterations *= ITER_DETECTION_MULTIPLIER

    return int(iterations)
项目:Comictagger    作者:dickloraine    | 项目源码 | 文件源码
def call_unrar(params):
    "Calls rar/unrar command line executable, returns stdout pipe"
    global rar_executable_cached
    if rar_executable_cached is None:
        for command in ('unrar', 'rar'):
            try:
                subprocess.Popen([command], stdout=subprocess.PIPE)
                rar_executable_cached = command
                break
            except OSError:
                pass
        if rar_executable_cached is None:
            raise UnpackerNotInstalled("No suitable RAR unpacker installed")

    assert type(params) == list, "params must be list"
    args = [rar_executable_cached] + params
    try:
        gc.disable() # See http://bugs.python.org/issue1336
        return subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    finally:
        gc.enable()
项目:ibench    作者:rscohn2    | 项目源码 | 文件源码
def _run(self, n):
        self._make_args(n)

        gcold = gc.isenabled()
        gc.disable()

        times = []
        for i in range(self._cmd.args.runs):
            t_start = time.time()
            self._compute()
            elapsed = time.time() - t_start
            times.append(elapsed)

        if gcold:
            gc.enable()

        return times
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def timeit(self, number=timeit.default_number):
        """Time 'number' executions of the main statement.

        To be precise, this executes the setup statement once, and
        then returns the time it takes to execute the main statement
        a number of times, as a float measured in seconds.  The
        argument is the number of times through the loop, defaulting
        to one million.  The main statement, the setup statement and
        the timer function to be used are passed to the constructor.
        """
        it = itertools.repeat(None, number)
        gcold = gc.isenabled()
        gc.disable()
        try:
            timing = self.inner(it, self.timer)
        finally:
            if gcold:
                gc.enable()
        return timing
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def timeit(self, number=default_number):
        """Time 'number' executions of the main statement.

        To be precise, this executes the setup statement once, and
        then returns the time it takes to execute the main statement
        a number of times, as a float measured in seconds.  The
        argument is the number of times through the loop, defaulting
        to one million.  The main statement, the setup statement and
        the timer function to be used are passed to the constructor.
        """
        if itertools:
            it = itertools.repeat(None, number)
        else:
            it = [None] * number
        gcold = gc.isenabled()
        gc.disable()
        try:
            timing = self.inner(it, self.timer)
        finally:
            if gcold:
                gc.enable()
        return timing
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a

#
#
#
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_del_newclass(self):
        # __del__ methods can trigger collection, make this to happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A(object):
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)

    # The following two tests are fragile:
    # They precisely count the number of allocations,
    # which is highly implementation-dependent.
    # For example, disposed tuples are not freed, but reused.
    # To minimize variations, though, we first store the get_count() results
    # and check them at the end.
项目:DaNet-Tensorflow    作者:khaotik    | 项目源码 | 文件源码
def install_and_load(self):
        # TODO automatically install if fails to find anything
        FILE_NOT_FOUND_MSG = (
            'Did not found TIMIT file "%s"'
            ', make sure you download and install the dataset')
        self.subset = {}
        path = os.path.join(os.path.dirname(__file__), 'TIMIT', '%s_set.pkl')
        for subset in ['train', 'test']:
            filepath = path % subset
            if not os.path.exists(filepath):
                raise IOError(
                    FILE_NOT_FOUND_MSG % filepath)

            with open(filepath, 'rb') as f:
                gc.disable()
                all_data = [pickle.load(f)]
                all_data.append(pickle.load(f))
                all_data.append(pickle.load(f))
                gc.enable()
            self.subset[subset] = all_data

        # use same subset for validation / test
        # as TIMIT is small
        self.subset['valid'] = self.subset['test']
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def benchmark(self, block: RunProgramBlock, runs: int,
                  cpuset: CPUSet = None, set_id: int = 0) -> BenchmarkingResultBlock:
        t = time.time()
        block = block.copy()
        try:
            self._setup_block(block)
            gc.collect()
            gc.disable()
        except IOError as err:
            return BenchmarkingResultBlock(error=err)
        try:
            res = self._benchmark(block, runs, cpuset, set_id)
        except BaseException as ex:
            return BenchmarkingResultBlock(error=ex)
        finally:
            gc.enable()
        try:
            self._teardown_block(block)
        except BaseException as err:
            return BenchmarkingResultBlock(error=err)
        t = time.time() - t
        assert isinstance(res, BenchmarkingResultBlock)
        res.data["__ov-time"] = [t / runs] * runs
        # print(res.data)
        return res
项目:RTCR    作者:uubram    | 项目源码 | 文件源码
def run_ec_on_bin(*args):
    cloneset, mismatch_rate, confidence, max_Q = args
    try:
        gc.disable()
        seqlen = len(next(iter(cloneset)).seq)
        logger.info("Starting QMerge on cloneset: seqlen: %s, \
#clones: %s, #sequences: %s, #bases: %s, #mutation_count: %s"%(seqlen,
len(cloneset), cloneset.sequence_count, cloneset.base_count,
cloneset.mutation_count))

        cloneset = run_qmerge_on_bin(cloneset, mismatch_rate, confidence,
                max_Q)
        logger.info("Starting IMerge on cloneset: seqlen: %s, \
#clones: %s, #sequences: %s, #bases: %s, #mutation_count: %s"%(seqlen,
len(cloneset), cloneset.sequence_count, cloneset.base_count,
cloneset.mutation_count))
        cloneset = run_imerge_on_bin(cloneset, mismatch_rate, confidence)
    finally:
        gc.enable()
    return cloneset, mismatch_rate
项目:krpcScripts    作者:jwvanderbeck    | 项目源码 | 文件源码
def test_load_refcount():
    # Check that objects returned by np.load are directly freed based on
    # their refcount, rather than needing the gc to collect them.

    f = BytesIO()
    np.savez(f, [1, 2, 3])
    f.seek(0)

    assert_(gc.isenabled())
    gc.disable()
    try:
        gc.collect()
        np.load(f)
        # gc.collect returns the number of unreachable objects in cycles that
        # were found -- we are checking that no cycles were created by np.load
        n_objects_in_cycles = gc.collect()
    finally:
        gc.enable()
    assert_equal(n_objects_in_cycles, 0)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def timeit(self, number=default_number):
        """Time 'number' executions of the main statement.

        To be precise, this executes the setup statement once, and
        then returns the time it takes to execute the main statement
        a number of times, as a float measured in seconds.  The
        argument is the number of times through the loop, defaulting
        to one million.  The main statement, the setup statement and
        the timer function to be used are passed to the constructor.
        """
        if itertools:
            it = itertools.repeat(None, number)
        else:
            it = [None] * number
        gcold = gc.isenabled()
        gc.disable()
        try:
            timing = self.inner(it, self.timer)
        finally:
            if gcold:
                gc.enable()
        return timing
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a

#
#
#
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_del_newclass(self):
        # __del__ methods can trigger collection, make this to happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A(object):
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)

    # The following two tests are fragile:
    # They precisely count the number of allocations,
    # which is highly implementation-dependent.
    # For example:
    # - disposed tuples are not freed, but reused
    # - the call to assertEqual somehow avoids building its args tuple
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_main():
    enabled = gc.isenabled()
    gc.disable()
    assert not gc.isenabled()
    debug = gc.get_debug()
    gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak

    try:
        gc.collect() # Delete 2nd generation garbage
        run_unittest(GCTests, GCTogglingTests)
    finally:
        gc.set_debug(debug)
        # test gc.enable() even if GC is disabled by default
        if verbose:
            print "restoring automatic collection"
        # make sure to always test gc.enable()
        gc.enable()
        assert gc.isenabled()
        if not enabled:
            gc.disable()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def timeit(self, number=default_number):
        """Time 'number' executions of the main statement.

        To be precise, this executes the setup statement once, and
        then returns the time it takes to execute the main statement
        a number of times, as a float measured in seconds.  The
        argument is the number of times through the loop, defaulting
        to one million.  The main statement, the setup statement and
        the timer function to be used are passed to the constructor.
        """
        if itertools:
            it = itertools.repeat(None, number)
        else:
            it = [None] * number
        gcold = gc.isenabled()
        gc.disable()
        try:
            timing = self.inner(it, self.timer)
        finally:
            if gcold:
                gc.enable()
        return timing
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a

#
#
#
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_del_newclass(self):
        # __del__ methods can trigger collection, make this to happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A(object):
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)

    # The following two tests are fragile:
    # They precisely count the number of allocations,
    # which is highly implementation-dependent.
    # For example:
    # - disposed tuples are not freed, but reused
    # - the call to assertEqual somehow avoids building its args tuple
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_main():
    enabled = gc.isenabled()
    gc.disable()
    assert not gc.isenabled()
    debug = gc.get_debug()
    gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak

    try:
        gc.collect() # Delete 2nd generation garbage
        run_unittest(GCTests, GCTogglingTests)
    finally:
        gc.set_debug(debug)
        # test gc.enable() even if GC is disabled by default
        if verbose:
            print "restoring automatic collection"
        # make sure to always test gc.enable()
        gc.enable()
        assert gc.isenabled()
        if not enabled:
            gc.disable()
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def timeit(self, number=default_number):
        """Time 'number' executions of the main statement.

        To be precise, this executes the setup statement once, and
        then returns the time it takes to execute the main statement
        a number of times, as a float measured in seconds.  The
        argument is the number of times through the loop, defaulting
        to one million.  The main statement, the setup statement and
        the timer function to be used are passed to the constructor.
        """
        if itertools:
            it = itertools.repeat(None, number)
        else:
            it = [None] * number
        gcold = gc.isenabled()
        gc.disable()
        try:
            timing = self.inner(it, self.timer)
        finally:
            if gcold:
                gc.enable()
        return timing
项目:bezier    作者:dhermes    | 项目源码 | 文件源码
def test_main():
    min_kb, max_kb = get_bounds()

    # This should be the **only** test function here.
    gc.disable()
    intersect_all()
    kb_used_after = memory_profiler.memory_usage(max_usage=True)
    if min_kb <= kb_used_after <= max_kb:
        status = 0
        msg = SUCCESS_TEMPLATE.format(kb_used_after)
        print(msg)
    else:
        status = 1
        msg = ERR_TEMPLATE.format(kb_used_after, min_kb, max_kb)
        print(msg, file=sys.stderr)

    gc.enable()
    sys.exit(status)
项目:bezier    作者:dhermes    | 项目源码 | 文件源码
def test_main():
    min_kb, max_kb = get_bounds()

    # This should be the **only** test function here.
    gc.disable()
    intersect_all()
    kb_used_after = memory_profiler.memory_usage(max_usage=True)
    if min_kb <= kb_used_after <= max_kb:
        status = 0
        msg = SUCCESS_TEMPLATE.format(kb_used_after)
        print(msg)
    else:
        status = 1
        msg = ERR_TEMPLATE.format(kb_used_after, min_kb, max_kb)
        print(msg, file=sys.stderr)

    gc.enable()
    sys.exit(status)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def timeit(self, number=default_number):
        """Time 'number' executions of the main statement.

        To be precise, this executes the setup statement once, and
        then returns the time it takes to execute the main statement
        a number of times, as a float measured in seconds.  The
        argument is the number of times through the loop, defaulting
        to one million.  The main statement, the setup statement and
        the timer function to be used are passed to the constructor.
        """
        it = itertools.repeat(None, number)
        gcold = gc.isenabled()
        gc.disable()
        try:
            timing = self.inner(it, self.timer)
        finally:
            if gcold:
                gc.enable()
        return timing
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a

#
#
#
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_del_newclass(self):
        # __del__ methods can trigger collection, make this to happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A(object):
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)

    # The following two tests are fragile:
    # They precisely count the number of allocations,
    # which is highly implementation-dependent.
    # For example, disposed tuples are not freed, but reused.
    # To minimize variations, though, we first store the get_count() results
    # and check them at the end.
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def tearDown(self):
        # Restore gc state
        del self.visit
        gc.callbacks.remove(self.cb1)
        gc.callbacks.remove(self.cb2)
        gc.set_debug(self.debug)
        if self.enabled:
            gc.enable()
        # destroy any uncollectables
        gc.collect()
        for obj in gc.garbage:
            if isinstance(obj, Uncollectable):
                obj.partner = None
        del gc.garbage[:]
        del self.othergarbage
        gc.collect()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_main():
    enabled = gc.isenabled()
    gc.disable()
    assert not gc.isenabled()
    debug = gc.get_debug()
    gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak

    try:
        gc.collect() # Delete 2nd generation garbage
        run_unittest(GCTests, GCTogglingTests, GCCallbackTests)
    finally:
        gc.set_debug(debug)
        # test gc.enable() even if GC is disabled by default
        if verbose:
            print("restoring automatic collection")
        # make sure to always test gc.enable()
        gc.enable()
        assert gc.isenabled()
        if not enabled:
            gc.disable()
项目:fg-gating    作者:kimiyoung    | 项目源码 | 文件源码
def parse_all_files(self, directory, dictionary, use_chars, cache_file):
        """
        parse all files under the given directory into a list of questions,
        where each element is in the form of (document, query, answer, filename)
        """
        if os.path.exists(cache_file):
            gc.disable()
            temp = cPickle.load(open(cache_file))
            gc.enable()
            return temp
        all_files = glob.glob(directory + '/*.question')
        questions = []
        for i, f in enumerate(all_files):
            if i % 10000 == 0:
                print 'parsing {}'.format(i)
            questions.append(self.parse_one_file(f, dictionary, use_chars) + (f,))
        questions = self.parse_ner_pos(questions)
        cPickle.dump(questions, open(cache_file, 'w'), cPickle.HIGHEST_PROTOCOL)
        return questions
项目:YATE    作者:GarethNelson    | 项目源码 | 文件源码
def parser_thread(self):
       while self.active:
         eventlet.greenthread.sleep(0)
         data,addr = None,None
         while data==None:
            eventlet.greenthread.sleep(0)
            try:
               data,addr = self.parse_q.get()
            except:
               yatelog.minor_exception('YATESock','Failed during parse receive')
         if data != None:
            data = zlib.decompress(data)
            gc.disable() # performance hack for msgpack
            try:
               msg        = msgpack.unpackb(data,use_list = False)
               msg_type   = msg[0]
               msg_params = msg[1]
               msg_id     = msg[2]
               self.in_queues[msg_type].put((msg_params,msg_id,addr))
            except:
               yatelog.minor_exception('YATESock','Error while parsing packet from %s:%s' % addr)
            gc.enable()