我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用thread._count()。
def monitor(): global PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST while True: queue_count = na_task.find({"status": 0, "plan": 0}).count() if queue_count: load = 1 else: ac_count = thread._count() load = float(ac_count - 4) / THREAD_COUNT if load > 1: load = 1 if load < 0: load = 0 na_heart.update({"name": "load"}, {"$set": {"value": load, "up_time": datetime.datetime.now()}}) PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config() if load > 0: time.sleep(8) else: time.sleep(60)
def monitor(): global PASSWORD_DIC, THREAD_COUNT, TIMEOUT while True: queue_count = na_task.find({"status": 0, "plan": 0}).count() if queue_count: load = 1 else: ac_count = thread._count() load = float(ac_count - 4) / THREAD_COUNT if load > 1: load = 1 if load < 0: load = 0 na_heart.update({"name": "load"}, {"$set": {"value": load, "up_time": datetime.datetime.now()}}) PASSWORD_DIC, THREAD_COUNT, TIMEOUT = get_config() if load > 0: time.sleep(8) else: time.sleep(60)
def run_doctest(module, verbosity=None): """Run doctest on the given module. Return (#failures, #tests). If optional argument verbosity is not specified (or is None), pass test_support's belief about verbosity on to doctest. Else doctest's usual behavior is used (it searches sys.argv for -v). """ import doctest if verbosity is None: verbosity = verbose else: verbosity = None # Direct doctest output (normally just errors) to real stdout; doctest # output shouldn't be compared by regrtest. save_stdout = sys.stdout sys.stdout = get_original_stdout() try: f, t = doctest.testmod(module, verbose=verbosity) if f: raise TestFailed("%d of %d doctests failed" % (f, t)) finally: sys.stdout = save_stdout if verbose: print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t) return f, t #======================================================================= # Threading support to prevent reporting refleaks when running regrtest.py -R # NOTE: we use thread._count() rather than threading.enumerate() (or the # moral equivalent thereof) because a threading.Thread object is still alive # until its __bootstrap() method has returned, even after it has been # unregistered from the threading module. # thread._count(), on the other hand, only gets decremented *after* the # __bootstrap() method has returned, which gives us reliable reference counts # at the end of a test run.
def threading_setup(): if thread: return thread._count(), else: return 1,
def threading_cleanup(nb_threads): if not thread: return _MAX_COUNT = 10 for count in range(_MAX_COUNT): n = thread._count() if n == nb_threads: break time.sleep(0.1) # XXX print a warning in case of failure?
def modules_cleanup(oldmodules): # Encoders/decoders are registered permanently within the internal # codec cache. If we destroy the corresponding modules their # globals will be set to None which will trip up the cached functions. encodings = [(k, v) for k, v in sys.modules.items() if k.startswith('encodings.')] # Was: # sys.modules.clear() # Py2-compatible: for i in range(len(sys.modules)): sys.modules.pop() sys.modules.update(encodings) # XXX: This kind of problem can affect more than just encodings. In particular # extension modules (such as _ssl) don't cope with reloading properly. # Really, test modules should be cleaning out the test specific modules they # know they added (ala test_runpy) rather than relying on this function (as # test_importhooks and test_pkg do currently). # Implicitly imported *real* modules should be left alone (see issue 10556). sys.modules.update(oldmodules) #======================================================================= # Backported versions of threading_setup() and threading_cleanup() which don't refer # to threading._dangling (not available on Py2.7). # Threading support to prevent reporting refleaks when running regrtest.py -R # NOTE: we use thread._count() rather than threading.enumerate() (or the # moral equivalent thereof) because a threading.Thread object is still alive # until its __bootstrap() method has returned, even after it has been # unregistered from the threading module. # thread._count(), on the other hand, only gets decremented *after* the # __bootstrap() method has returned, which gives us reliable reference counts # at the end of a test run.
def threading_setup(): if _thread: return _thread._count(), else: return 1,
def threading_cleanup(nb_threads): if not _thread: return _MAX_COUNT = 10 for count in range(_MAX_COUNT): n = _thread._count() if n == nb_threads: break time.sleep(0.1) # XXX print a warning in case of failure?