我们从Python开源项目中,提取了以下34个代码示例,用于说明如何使用threading._active()。
def __init__(self): #_DummyThread_.__init__(self) # pylint:disable=super-init-not-called # It'd be nice to use a pattern like "greenlet-%d", but maybe somebody out # there is checking thread names... self._name = self._Thread__name = __threading__._newname("DummyThread-%d") self._set_ident() g = getcurrent() gid = _get_ident(g) # same as id(g) __threading__._active[gid] = self rawlink = getattr(g, 'rawlink', None) if rawlink is not None: # raw greenlet.greenlet greenlets don't # have rawlink... rawlink(_cleanup) else: # ... so for them we use weakrefs. # See https://github.com/gevent/gevent/issues/918 global _weakref if _weakref is None: _weakref = __import__('weakref') ref = _weakref.ref(g, _make_cleanup_id(gid)) self.__raw_ref = ref
def test_foreign_thread(self): # Check that a "foreign" thread can use the threading module. def f(mutex): # Calling current_thread() forces an entry for the foreign # thread to get made in the threading._active map. threading.current_thread() mutex.release() mutex = threading.Lock() mutex.acquire() tid = _thread.start_new_thread(f, (mutex,)) # Wait for the thread to finish. mutex.acquire() self.assertIn(tid, threading._active) self.assertIsInstance(threading._active[tid], threading._DummyThread) del threading._active[tid] # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) # exposed at the Python level. This test relies on ctypes to get at it.
def test_foreign_thread(self): # Check that a "foreign" thread can use the threading module. def f(mutex): # Calling current_thread() forces an entry for the foreign # thread to get made in the threading._active map. threading.current_thread() mutex.release() mutex = threading.Lock() mutex.acquire() tid = thread.start_new_thread(f, (mutex,)) # Wait for the thread to finish. mutex.acquire() self.assertIn(tid, threading._active) self.assertIsInstance(threading._active[tid], threading._DummyThread) del threading._active[tid] # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) # exposed at the Python level. This test relies on ctypes to get at it.
def __bootstrap(self): try: self._set_ident() self._Thread__started.set() threading._active_limbo_lock.acquire() threading._active[self._Thread__ident] = self del threading._limbo[self] threading._active_limbo_lock.release() if threading._trace_hook: sys.settrace(threading._trace_hook) if threading._profile_hook: sys.setprofile(threading._profile_hook) try: self.run() finally: self._Thread__exc_clear() finally: with threading._active_limbo_lock: self._Thread__stop() try: del threading._active[threading._get_ident()] except: pass
def test_orig_thread(self): new_mod = """import eventlet eventlet.monkey_patch() from eventlet import patcher import threading _threading = patcher.original('threading') def test(): print(repr(threading.currentThread())) t = _threading.Thread(target=test) t.start() t.join() print(len(threading._active)) print(len(_threading._active)) """ self.write_to_tempfile("newmod", new_mod) output, lines = self.launch_subprocess('newmod.py') self.assertEqual(len(lines), 4, "\n".join(lines)) assert lines[0].startswith('<Thread'), lines[0] assert lines[1] == '1', lines assert lines[2] == '1', lines
def test_threading(self): new_mod = """import eventlet eventlet.monkey_patch() import threading def test(): print(repr(threading.currentThread())) t = threading.Thread(target=test) t.start() t.join() print(len(threading._active)) """ self.write_to_tempfile("newmod", new_mod) output, lines = self.launch_subprocess('newmod.py') self.assertEqual(len(lines), 3, "\n".join(lines)) assert lines[0].startswith('<_MainThread'), lines[0] self.assertEqual(lines[1], "1", lines[1])
def test_greenlet(self): new_mod = """import eventlet eventlet.monkey_patch() from eventlet import event import threading evt = event.Event() def test(): print(repr(threading.currentThread())) evt.send() eventlet.spawn_n(test) evt.wait() print(len(threading._active)) """ self.write_to_tempfile("newmod", new_mod) output, lines = self.launch_subprocess('newmod.py') self.assertEqual(len(lines), 3, "\n".join(lines)) assert lines[0].startswith('<_MainThread'), lines[0] self.assertEqual(lines[1], "1", lines[1])
def id2thread_name(thread_id): return threading.Thread.getName(threading._active[thread_id])
def _cleanup(g): __threading__._active.pop(id(g), None)
def __init__(self): #_DummyThread_.__init__(self) # It'd be nice to use a pattern like "greenlet-%d", but maybe somebody out # there is checking thread names... self._name = self._Thread__name = __threading__._newname("DummyThread-%d") self._set_ident() __threading__._active[_get_ident()] = self g = getcurrent() rawlink = getattr(g, 'rawlink', None) if rawlink is not None: rawlink(_cleanup)
def _make_cleanup_id(gid): def _(_r): __threading__._active.pop(gid, None) return _
def test_ident_of_no_threading_threads(self): # The ident still must work for the main thread and dummy threads. self.assertFalse(threading.currentThread().ident is None) def f(): ident.append(threading.currentThread().ident) done.set() done = threading.Event() ident = [] _thread.start_new_thread(f, ()) done.wait() self.assertFalse(ident[0] is None) # Kill the "immortal" _DummyThread del threading._active[ident[0]] # run with a small(ish) thread stack size (256kB)
def test_ident_of_no_threading_threads(self): # The ident still must work for the main thread and dummy threads. self.assertIsNotNone(threading.currentThread().ident) def f(): ident.append(threading.currentThread().ident) done.set() done = threading.Event() ident = [] thread.start_new_thread(f, ()) done.wait() self.assertIsNotNone(ident[0]) # Kill the "immortal" _DummyThread del threading._active[ident[0]] # run with a small(ish) thread stack size (256kB)
def test_ident_of_no_threading_threads(self): # The ident still must work for the main thread and dummy threads. self.assertFalse(threading.currentThread().ident is None) def f(): ident.append(threading.currentThread().ident) done.set() done = threading.Event() ident = [] thread.start_new_thread(f, ()) done.wait() self.assertFalse(ident[0] is None) # Kill the "immortal" _DummyThread del threading._active[ident[0]] # run with a small(ish) thread stack size (256kB)
def __repr__(self): owner, count = self._get_data() try: owner = threading._active[owner].name except KeyError: pass if owner: return "<{}, owned by <{}>x{} for {}>".format(self._name, owner, count, self._lease_timer.elapsed) else: return "<{}, unowned>".format(self._name)
def test_tpool(self): new_mod = """import eventlet eventlet.monkey_patch() from eventlet import tpool import threading def test(): print(repr(threading.currentThread())) tpool.execute(test) print(len(threading._active)) """ self.write_to_tempfile("newmod", new_mod) output, lines = self.launch_subprocess('newmod.py') self.assertEqual(len(lines), 3, "\n".join(lines)) assert lines[0].startswith('<Thread'), lines[0] self.assertEqual(lines[1], "1", lines[1])
def test_greenthread(self): new_mod = """import eventlet eventlet.monkey_patch() import threading def test(): print(repr(threading.currentThread())) t = eventlet.spawn(test) t.wait() print(len(threading._active)) """ self.write_to_tempfile("newmod", new_mod) output, lines = self.launch_subprocess('newmod.py') self.assertEqual(len(lines), 3, "\n".join(lines)) assert lines[0].startswith('<_GreenThread'), lines[0] self.assertEqual(lines[1], "1", lines[1])