我们从 Python 开源项目中，提取了以下 46 个代码示例，用于说明如何使用 thread.start_new()。
def intercept_threads(for_attach=False):
    """Route low-level thread creation through ``thread_creator``.

    Rebinds ``thread.start_new_thread`` and ``thread.start_new`` to
    ``thread_creator``, hot-patches ``threading._start_new_thread`` when the
    ``threading`` module has already been imported, and stores ``for_attach``
    in the module-level ``_INTERCEPTING_FOR_ATTACH`` flag.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread.start_new = thread_creator

    # When threading is already loaded (the attach case) its private
    # _start_new_thread has to be patched too, otherwise threads started
    # through it would bypass our hook.
    #
    # When threading is NOT loaded we must not import it here: it would treat
    # the current thread as the main thread, which is wrong while attaching,
    # because this code runs on a short-lived debugger attach thread. No patch
    # is needed in that case anyway; threading picks up the replaced
    # thread.start_new_thread set above once it is imported.
    if _threading is None:
        if 'threading' in sys.modules:
            import threading
            _threading = threading
            threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach


## Modified parameters by Don Jayamanne
# Accept current Process id to pass back to debugger
def intercept_threads(for_attach=False):
    """Install ``thread_creator`` as the process-wide thread factory.

    Both ``thread.start_new_thread`` and ``thread.start_new`` are replaced.
    ``threading._start_new_thread`` is replaced as well, but only when
    ``threading`` is already present in ``sys.modules``. Finally ``for_attach``
    is recorded in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    for hook_name in ('start_new_thread', 'start_new'):
        setattr(thread, hook_name, thread_creator)

    # Attach case: threading was imported before us, so patch its private
    # reference to the thread starter as well.
    #
    # Otherwise do not import threading ourselves - it would take the current
    # thread for the main thread, which is incorrect when attaching (this runs
    # on an ephemeral debugger attach thread that goes away shortly). Patching
    # is unnecessary then: threading will see the thread.start_new_thread
    # installed above when it is eventually imported.
    not_yet_patched = _threading is None
    if not_yet_patched and 'threading' in sys.modules:
        import threading
        _threading = threading
        threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
def __init__(self, func, args=(), kwargs=None):
    """Run ``func(*args, **kwargs)`` on a new thread and record the outcome.

    On success the worker stores the return value in ``self.ret`` and sets
    ``self.done`` to 1. If the call raises, it stores ``sys.exc_info()`` in
    ``self.exc`` and sets ``self.done`` to 2. The value returned by
    ``thread.start_new`` is kept in ``self.id``.

    Args:
        func: Callable to execute in the background.
        args: Positional arguments passed to ``func``.
        kwargs: Keyword arguments passed to ``func``; ``None`` (the default)
            means no keyword arguments.
    """
    import thread

    # Use a fresh dict per call rather than one mutable default shared by
    # every instance.
    if kwargs is None:
        kwargs = {}

    def thread_bkcall():
        try:
            self.ret = func(*args, **kwargs)
            self.done = 1
        except:
            # Intentionally catches everything: the failure is recorded for
            # the creating side instead of being lost with the worker thread.
            self.exc = sys.exc_info()
            self.done = 2

    self.id = thread.start_new(thread_bkcall, ())
def detach_process():
    """Flag the process as detached and undo the debugger's global hooks.

    Always sets the module-level ``DETACHED`` to True. Unless
    ``_INTERCEPTING_FOR_ATTACH`` is set, it also swaps ``sys.stdout`` /
    ``sys.stderr`` back to their ``old_out`` when they are ``_DebuggerOutput``
    wrappers, and points ``thread.start_new_thread`` / ``thread.start_new``
    at ``_start_new_thread`` again.
    """
    global DETACHED
    DETACHED = True

    # While intercepting for attach, every hook stays in place.
    if _INTERCEPTING_FOR_ATTACH:
        return

    for stream_name in ('stdout', 'stderr'):
        if isinstance(getattr(sys, stream_name), _DebuggerOutput):
            setattr(sys, stream_name, getattr(sys, stream_name).old_out)

    thread.start_new_thread = _start_new_thread
    thread.start_new = _start_new_thread
def BeginThreadsSimpleMarshal(numThreads, cookie):
    """Start ``numThreads`` threads using simple (but slower) marshalling.

    A single interpreter object is shared, but a new stream is created per
    thread. Every thread runs ``TestInterpInThread`` with its own event and
    ``cookie``.

    Returns the list of event handles the threads will set when complete.
    """
    completion_events = []
    for _ in range(numThreads):
        done_event = win32event.CreateEvent(None, 0, 0, None)
        thread.start_new(TestInterpInThread, (done_event, cookie))
        completion_events.append(done_event)
    return completion_events
def run(self, nworkers):
    """Process the queued work with ``nworkers`` workers.

    Does nothing when ``self.work`` is empty. Otherwise ``nworkers - 1``
    threads are started on ``self._worker``, the calling thread runs
    ``self._worker()`` itself, and finally ``self.todo`` is acquired
    (presumably to wait for the remaining workers - confirm against the
    rest of the class).
    """
    if self.work:
        # The caller acts as one of the workers, so spawn one thread fewer.
        for _ in range(nworkers - 1):
            thread.start_new(self._worker, ())
        self._worker()
        self.todo.acquire()


# Main program
def main():
    """Command-line entry point: ``telnet hostname [port]``.

    Validates the host name, determines the port (a decimal number or a TCP
    service name, default ``'telnet'``), connects, then starts ``child`` on a
    new thread and runs ``parent`` on the calling thread, both with the
    connected socket.

    Exits with status 2 on a usage error, an unresolvable host, or a bad
    port/service name, and with status 1 when the connection fails.
    """
    if len(sys.argv) < 2:
        sys.stderr.write('usage: telnet hostname [port]\n')
        sys.exit(2)
    host = sys.argv[1]
    try:
        # Resolved only to validate the name early; connect() below is
        # still given the host name itself.
        gethostbyname(host)
    except error:
        sys.stderr.write(sys.argv[1] + ': bad host name\n')
        sys.exit(2)
    #
    if len(sys.argv) > 2:
        servname = sys.argv[2]
    else:
        servname = 'telnet'
    #
    if '0' <= servname[:1] <= '9':
        # Parse with int() instead of eval(): the text comes straight from
        # the command line and must never be executed as an expression.
        # Only plain decimal port numbers are accepted.
        try:
            port = int(servname)
        except ValueError:
            sys.stderr.write(servname + ': bad port number\n')
            sys.exit(2)
    else:
        try:
            port = getservbyname(servname, 'tcp')
        except error:
            sys.stderr.write(servname + ': bad tcp service name\n')
            sys.exit(2)
    #
    s = socket(AF_INET, SOCK_STREAM)
    #
    try:
        s.connect((host, port))
    except error as msg:  # "as" form is valid on Python 2.6+ and Python 3
        sys.stderr.write('connect failed: %r\n' % (msg,))
        sys.exit(1)
    #
    # One thread per direction: child on a new thread, parent on this one.
    thread.start_new(child, (s,))
    parent(s)
def startPycheckerRun(self):
    """Launch ``self.threadPycheckerRun`` on a background thread.

    Clears ``self.result``, switches the cursor to ``IDC_APPSTARTING``,
    registers ``self.idleHandler`` as an idle handler of the application,
    and then starts the worker thread.
    """
    self.result = None

    busy_cursor = win32api.LoadCursor(0, win32con.IDC_APPSTARTING)
    previous_cursor = win32api.SetCursor(busy_cursor)

    app = win32ui.GetApp()
    app.AddIdleHandler(self.idleHandler)

    import thread
    thread.start_new(self.threadPycheckerRun, ())
    # NOTE: the previous cursor is captured but not restored here; the
    # restoring call is disabled:
    ##win32api.SetCursor(previous_cursor)
def __init__(self, *args):
    """Initialise the output window and start its collector thread.

    All positional arguments are forwarded to
    ``winout.WindowOutput.__init__``. A new event is stored in
    ``self.hStopThread`` and handed, together with ``self``, to
    ``CollectorThread`` running on a new thread.
    """
    winout.WindowOutput.__init__(self, *args)

    stop_event = win32event.CreateEvent(None, 0, 0, None)
    self.hStopThread = stop_event
    thread.start_new(CollectorThread, (stop_event, self))
def Is_HTTP_On(on_off):
    """Start the HTTP listener thread when ``on_off`` is ``"ON"``.

    Returns the result of ``thread.start_new`` (running ``serve_thread_tcp``
    with ``('', 80, HTTP)``) for ``"ON"``, ``False`` for ``"OFF"`` and
    ``None`` for any other value.
    """
    if on_off == "ON":
        listener_args = ('', 80, HTTP)
        return thread.start_new(serve_thread_tcp, listener_args)
    elif on_off == "OFF":
        return False
    return None


#Function name self-explanatory
def Is_HTTPS_On(SSL_On_Off):
    """Start the HTTPS listener thread when ``SSL_On_Off`` is ``"ON"``.

    Returns the result of ``thread.start_new`` (running ``serve_thread_SSL``
    with ``('', 443, DoSSL)``) for ``"ON"``, ``False`` for ``"OFF"`` and
    ``None`` for any other value.
    """
    if SSL_On_Off == "ON":
        listener_args = ('', 443, DoSSL)
        return thread.start_new(serve_thread_SSL, listener_args)
    elif SSL_On_Off == "OFF":
        return False
    return None


#Function name self-explanatory
def Is_WPAD_On(on_off):
    """Report the WPAD switch as a boolean.

    Returns ``True`` when ``on_off`` equals ``True``, ``False`` when it
    equals ``False`` and ``None`` for anything else. No listener is started
    here; the call that would do so is disabled:
    """
    #return thread.start_new(serve_thread_tcp,('', 3141,ProxyHandler))
    for state in (True, False):
        if on_off == state:
            return state
    return None


#Function name self-explanatory
def Is_SMB_On(SMB_On_Off):
    """Start the SMB listener threads when ``SMB_On_Off`` is ``"ON"``.

    For ``"ON"`` two threads running ``serve_thread_tcp`` are started, on
    ports 445 and 139 (in that order), using ``SMB1LM`` when the global
    ``LM_On_Off`` equals ``True`` and ``SMB1`` otherwise; the two
    ``thread.start_new`` results are returned as a tuple. Returns ``False``
    for ``"OFF"`` and ``None`` for any other value.
    """
    if SMB_On_Off == "ON":
        if LM_On_Off == True:
            handler = SMB1LM
        else:
            handler = SMB1
        return tuple(
            thread.start_new(serve_thread_tcp, ('', port, handler))
            for port in (445, 139)
        )
    elif SMB_On_Off == "OFF":
        return False
    return None


#Function name self-explanatory
def Is_Kerberos_On(Krb_On_Off):
    """Start the Kerberos listener threads when ``Krb_On_Off`` is ``"ON"``.

    For ``"ON"`` a thread running ``serve_thread_udp`` with
    ``('', 88, KerbUDP)`` is started first, then one running
    ``serve_thread_tcp`` with ``('', 88, KerbTCP)``; both
    ``thread.start_new`` results are returned as a ``(udp, tcp)`` tuple.
    Returns ``False`` for ``"OFF"`` and ``None`` for any other value.
    """
    if Krb_On_Off == "ON":
        udp_listener = thread.start_new(serve_thread_udp, ('', 88, KerbUDP))
        tcp_listener = thread.start_new(serve_thread_tcp, ('', 88, KerbTCP))
        return udp_listener, tcp_listener
    elif Krb_On_Off == "OFF":
        return False
    return None


#Function name self-explanatory
def Is_FTP_On(FTP_On_Off):
    """Start the FTP listener thread when ``FTP_On_Off`` is ``"ON"``.

    Returns the result of ``thread.start_new`` (running ``serve_thread_tcp``
    with ``('', 21, FTP)``) for ``"ON"``, ``False`` for ``"OFF"`` and
    ``None`` for any other value.
    """
    if FTP_On_Off == "ON":
        listener_args = ('', 21, FTP)
        return thread.start_new(serve_thread_tcp, listener_args)
    elif FTP_On_Off == "OFF":
        return False
    return None


#Function name self-explanatory
def Is_POP_On(POP_On_Off):
    """Start the POP listener thread when ``POP_On_Off`` is ``"ON"``.

    Returns the result of ``thread.start_new`` (running ``serve_thread_tcp``
    with ``('', 110, POP)``) for ``"ON"``, ``False`` for ``"OFF"`` and
    ``None`` for any other value.
    """
    if POP_On_Off == "ON":
        listener_args = ('', 110, POP)
        return thread.start_new(serve_thread_tcp, listener_args)
    elif POP_On_Off == "OFF":
        return False
    return None


#Function name self-explanatory
def Is_LDAP_On(LDAP_On_Off):
    """Start the LDAP listener thread when ``LDAP_On_Off`` is ``"ON"``.

    Returns the result of ``thread.start_new`` (running ``serve_thread_tcp``
    with ``('', 389, LDAP)``) for ``"ON"``, ``False`` for ``"OFF"`` and
    ``None`` for any other value.
    """
    if LDAP_On_Off == "ON":
        listener_args = ('', 389, LDAP)
        return thread.start_new(serve_thread_tcp, listener_args)
    elif LDAP_On_Off == "OFF":
        return False
    return None


#Function name self-explanatory
def Is_SMTP_On(SMTP_On_Off):
    """Start the SMTP listener threads when ``SMTP_On_Off`` is ``"ON"``.

    For ``"ON"`` two threads running ``serve_thread_tcp`` with ``ESMTP`` are
    started, on port 25 and then port 587; the two ``thread.start_new``
    results are returned as a tuple. Returns ``False`` for ``"OFF"`` and
    ``None`` for any other value.
    """
    if SMTP_On_Off == "ON":
        return tuple(
            thread.start_new(serve_thread_tcp, ('', port, ESMTP))
            for port in (25, 587)
        )
    elif SMTP_On_Off == "OFF":
        return False
    return None


#Function name self-explanatory
def Is_IMAP_On(IMAP_On_Off):
    """Start the IMAP listener thread when ``IMAP_On_Off`` is ``"ON"``.

    Returns the result of ``thread.start_new`` (running ``serve_thread_tcp``
    with ``('', 143, IMAP)``) for ``"ON"``, ``False`` for ``"OFF"`` and
    ``None`` for any other value.
    """
    if IMAP_On_Off == "ON":
        listener_args = ('', 143, IMAP)
        return thread.start_new(serve_thread_tcp, listener_args)
    elif IMAP_On_Off == "OFF":
        return False
    return None


#Function name self-explanatory
def main():
    """Start ``RunInloop`` on a new thread.

    The thread receives the module-level ``Target``, ``Command`` and
    ``Domain`` as its arguments. A ``KeyboardInterrupt`` raised while the
    thread is being started ends the program through ``exit()``.
    """
    try:
        worker_args = (Target, Command, Domain)
        thread.start_new(RunInloop, worker_args)
    except KeyboardInterrupt:
        exit()
def intercept_threads(for_attach=False):
    """Redirect thread creation to ``thread_creator``.

    Replaces ``thread.start_new_thread`` and ``thread.start_new``; when the
    module-level ``threading`` name is still ``None`` it imports ``threading``
    and replaces its ``_start_new_thread`` too. ``for_attach`` is stored in
    the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global threading, _INTERCEPTING_FOR_ATTACH

    for hook_name in ('start_new_thread', 'start_new'):
        setattr(thread, hook_name, thread_creator)

    if threading is None:
        # threading._start_new_thread must be patched as well so that new
        # threads are picked up in the attach case, where threading is
        # already imported.
        import threading
        threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach