We extracted the following 50 code examples from open-source Python projects to illustrate how to use thread.start().
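Before the extracted examples, here is a minimal sketch of the pattern they all share: construct a threading.Thread with a target callable, call start() to launch it, and call join() to wait for it to finish. The worker function, its name, and its arguments are illustrative only and do not come from any of the projects below.

import threading

def worker(name, count):
    # Illustrative worker: runs in a separate thread once start() is called.
    for i in range(count):
        print("%s: iteration %d" % (name, i))

# Create the thread, start it, then wait for it to complete.
t = threading.Thread(target=worker, args=("demo", 3))
t.start()   # begins executing worker() in a new thread
t.join()    # blocks until worker() returns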
def test_join_nondaemon_on_shutdown(self):
    # Issue 1722344
    # Raising SystemExit skipped threading._shutdown
    p = subprocess.Popen([sys.executable, "-c", """if 1:
            import threading
            from time import sleep

            def child():
                sleep(1)
                # As a non-daemon thread we SHOULD wake up and nothing
                # should be torn down yet
                print "Woke up, sleep function is:", sleep

            threading.Thread(target=child).start()
            raise SystemExit
        """],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    self.addCleanup(p.stdout.close)
    self.addCleanup(p.stderr.close)
    stdout, stderr = p.communicate()
    self.assertEqual(stdout.strip(),
        "Woke up, sleep function is: <built-in function sleep>")
    stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip()
    self.assertEqual(stderr, "")
def test_enumerate_after_join(self):
    # Try hard to trigger #1703448: a thread is still returned in
    # threading.enumerate() after it has been join()ed.
    enum = threading.enumerate
    old_interval = sys.getcheckinterval()
    try:
        for i in xrange(1, 100):
            # Try a couple times at each thread-switching interval
            # to get more interleavings.
            sys.setcheckinterval(i // 5)
            t = threading.Thread(target=lambda: None)
            t.start()
            t.join()
            l = enum()
            self.assertNotIn(t, l,
                "#1703448 triggered after %d trials: %s" % (i, l))
    finally:
        sys.setcheckinterval(old_interval)
def test_is_alive_after_fork(self):
    # Try hard to trigger #18418: is_alive() could sometimes be True on
    # threads that vanished after a fork.
    old_interval = sys.getcheckinterval()

    # Make the bug more likely to manifest.
    sys.setcheckinterval(10)

    try:
        for i in range(20):
            t = threading.Thread(target=lambda: None)
            t.start()
            pid = os.fork()
            if pid == 0:
                os._exit(1 if t.is_alive() else 0)
            else:
                t.join()
                pid, status = os.waitpid(pid, 0)
                self.assertEqual(0, status)
    finally:
        sys.setcheckinterval(old_interval)
def test_BoundedSemaphore_limit(self):
    # BoundedSemaphore should raise ValueError if released too often.
    for limit in range(1, 10):
        bs = threading.BoundedSemaphore(limit)
        threads = [threading.Thread(target=bs.acquire)
                   for _ in range(limit)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        threads = [threading.Thread(target=bs.release)
                   for _ in range(limit)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        self.assertRaises(ValueError, bs.release)
def test_3_join_in_forked_from_thread(self):
    # Like the test above, but fork() was called from a worker thread
    # In the forked process, the main Thread object must be marked as stopped.
    script = """if 1:
        main_thread = threading.current_thread()
        def worker():
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)

            t = threading.Thread(target=joiningfunc,
                                 args=(main_thread,))
            print 'end of main'
            t.start()
            t.join() # Should not block: main_thread is already stopped

        w = threading.Thread(target=worker)
        w.start()
        """
    self._run_and_join(script)
def test_reinit_tls_after_fork(self):
    # Issue #13817: fork() would deadlock in a multithreaded program with
    # the ad-hoc TLS implementation.

    def do_fork_and_wait():
        # just fork a child process and wait for it
        pid = os.fork()
        if pid > 0:
            os.waitpid(pid, 0)
        else:
            os._exit(0)

    # start a bunch of threads that will fork() child processes
    threads = []
    for i in range(16):
        t = threading.Thread(target=do_fork_and_wait)
        threads.append(t)
        t.start()

    for t in threads:
        t.join()
def test_print_exception(self): script = r"""if 1: import threading import time running = False def run(): global running running = True while running: time.sleep(0.01) 1.0/0.0 t = threading.Thread(target=run) t.start() while not running: time.sleep(0.01) running = False t.join() """ rc, out, err = assert_python_ok("-c", script) self.assertEqual(out, '') self.assertIn("Exception in thread", err) self.assertIn("Traceback (most recent call last):", err) self.assertIn("ZeroDivisionError", err) self.assertNotIn("Unhandled exception", err)
def test_print_exception(self): script = r"""if 1: import threading import time running = False def run(): global running running = True while running: time.sleep(0.01) 1/0 t = threading.Thread(target=run) t.start() while not running: time.sleep(0.01) running = False t.join() """ rc, out, err = assert_python_ok("-c", script) self.assertEqual(out, '') self.assertIn("Exception in thread", err) self.assertIn("Traceback (most recent call last):", err) self.assertIn("ZeroDivisionError", err) self.assertNotIn("Unhandled exception", err)
def create_thread(thread_function, thread_arguments=(), is_sub_thread=False):
    # thread.start_new_thread(thread_function, thread_arguments)
    thread = thread_object(thread_function, thread_arguments, is_sub_thread)
    thread.start()
    return thread
def test_various_ops(self): # This takes about n/3 seconds to run (about n/3 clumps of tasks, # times about 1 second per clump). NUMTASKS = 10 # no more than 3 of the 10 can run at once sema = threading.BoundedSemaphore(value=3) mutex = threading.RLock() numrunning = Counter() threads = [] for i in range(NUMTASKS): t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning) threads.append(t) self.assertIsNone(t.ident) self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, initial\)>$') t.start() if verbose: print 'waiting for all tasks to complete' for t in threads: t.join(NUMTASKS) self.assertFalse(t.is_alive()) self.assertNotEqual(t.ident, 0) self.assertIsNotNone(t.ident) self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, \w+ -?\d+\)>$') if verbose: print 'all tasks done' self.assertEqual(numrunning.get(), 0)
def test_limbo_cleanup(self):
    # Issue 7481: Failure to start thread should cleanup the limbo map.
    def fail_new_thread(*args):
        raise thread.error()
    _start_new_thread = threading._start_new_thread
    threading._start_new_thread = fail_new_thread
    try:
        t = threading.Thread(target=lambda: None)
        self.assertRaises(thread.error, t.start)
        self.assertFalse(
            t in threading._limbo,
            "Failed to cleanup _limbo map on failure of Thread.start().")
    finally:
        threading._start_new_thread = _start_new_thread
def test_finalize_with_trace(self):
    # Issue1733757
    # Avoid a deadlock when sys.settrace steps into threading._shutdown
    p = subprocess.Popen([sys.executable, "-c", """if 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # test suite from hanging forever
            def killer():
                import os, time
                time.sleep(2)
                print 'program blocked; aborting'
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
        """],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    self.addCleanup(p.stdout.close)
    self.addCleanup(p.stderr.close)
    stdout, stderr = p.communicate()
    rc = p.returncode
    self.assertFalse(rc == 2, "interpreter was blocked")
    self.assertTrue(rc == 0,
                    "Unexpected error: " + repr(stderr))
def test_1_join_on_shutdown(self):
    # The usual case: on exit, wait for a non-daemon thread
    script = """if 1:
        import os
        t = threading.Thread(target=joiningfunc,
                             args=(threading.current_thread(),))
        t.start()
        time.sleep(0.1)
        print 'end of main'
        """
    self._run_and_join(script)
def test_2_join_in_forked_process(self):
    # Like the test above, but from a forked interpreter
    script = """if 1:
        childpid = os.fork()
        if childpid != 0:
            os.waitpid(childpid, 0)
            sys.exit(0)

        t = threading.Thread(target=joiningfunc,
                             args=(threading.current_thread(),))
        t.start()
        print 'end of main'
        """
    self._run_and_join(script)
def test_start_thread_again(self):
    thread = threading.Thread()
    thread.start()
    self.assertRaises(RuntimeError, thread.start)
def test_daemonize_active_thread(self):
    thread = threading.Thread()
    thread.start()
    self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
def test_print_exception_stderr_is_none_1(self):
    script = r"""if 1:
        import sys
        import threading
        import time

        running = False
        def run():
            global running
            running = True
            while running:
                time.sleep(0.01)
            1.0/0.0
        t = threading.Thread(target=run)
        t.start()
        while not running:
            time.sleep(0.01)
        sys.stderr = None
        running = False
        t.join()
        """
    rc, out, err = assert_python_ok("-c", script)
    self.assertEqual(out, '')
    self.assertIn("Exception in thread", err)
    self.assertIn("Traceback (most recent call last):", err)
    self.assertIn("ZeroDivisionError", err)
    self.assertNotIn("Unhandled exception", err)
def test_recursion_limit(self):
    # Issue 9670
    # test that excessive recursion within a non-main thread causes
    # an exception rather than crashing the interpreter on platforms
    # like Mac OS X or FreeBSD which have small default stack sizes
    # for threads
    script = """if True:
        import threading

        def recurse():
            return recurse()

        def outer():
            try:
                recurse()
            except RuntimeError:
                pass

        w = threading.Thread(target=outer)
        w.start()
        w.join()
        print('end of main thread')
        """
    expected_output = "end of main thread\n"
    p = subprocess.Popen([sys.executable, "-c", script],
                         stdout=subprocess.PIPE)
    stdout, stderr = p.communicate()
    data = stdout.decode().replace('\r', '')
    self.assertEqual(p.returncode, 0, "Unexpected error")
    self.assertEqual(data, expected_output)
def test_various_ops(self): # This takes about n/3 seconds to run (about n/3 clumps of tasks, # times about 1 second per clump). NUMTASKS = 10 # no more than 3 of the 10 can run at once sema = threading.BoundedSemaphore(value=3) mutex = threading.RLock() numrunning = Counter() threads = [] for i in range(NUMTASKS): t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning) threads.append(t) self.assertEqual(t.ident, None) self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t))) t.start() if verbose: print 'waiting for all tasks to complete' for t in threads: t.join(NUMTASKS) self.assertTrue(not t.is_alive()) self.assertNotEqual(t.ident, 0) self.assertFalse(t.ident is None) self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t))) if verbose: print 'all tasks done' self.assertEqual(numrunning.get(), 0)