我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 thread.trace_func()。
def __init__(self, id = None):
    """Initialise the debugger-side state tracked for one thread.

    id -- identifier to record for the thread; when None the identifier of
          the calling thread (thread.get_ident()) is used instead.
    """
    self.id = thread.get_ident() if id is None else id

    # Trace event name -> bound handler; every handler is named handle_<event>.
    self._events = dict(
        (event_name, getattr(self, 'handle_' + event_name))
        for event_name in ('call', 'line', 'return', 'exception',
                           'c_call', 'c_return', 'c_exception')
    )

    # Stepping / frame tracking state.
    self.cur_frame = None
    self.stepping = STEPPING_NONE
    self.stopped_on_line = None
    self.django_stepping = None

    # Blocking state: _block_lock is created already held.
    self.unblock_work = None
    self._block_lock = thread.allocate_lock()
    self._block_lock.acquire()
    self._block_starting_lock = thread.allocate_lock()
    self._is_blocked = False
    self._is_working = False

    self.detach = False

    # Store the bound method on the instance so that it does not have to be
    # re-created on every attribute access.
    self.trace_func = self.trace_func
    self.prev_trace_func = None
    self.trace_func_stack = []

    self.reported_process_loaded = False
    self.is_sending = False

    # stackless changes
    if stackless is not None:
        self._stackless_attach()

    if sys.platform == 'cli':
        self.frames = []
def _stackless_attach(self):
    """Hook tasklet creation when stackless.tasklet has no trace_function.

    If stackless.tasklet exposes a ``trace_function`` attribute nothing is
    patched. Otherwise ``stackless.tasklet.__call__`` is replaced (the
    original is kept in ``self.__oldstacklesscall__``) and a ``settrace``
    helper is added to the tasklet class.
    """
    try:
        stackless.tasklet.trace_function
        needs_patching = False
    except AttributeError:
        needs_patching = True

    if needs_patching:
        # the tasklets need to be traced on a case by case basis
        # sys.trace needs to be called within their calling context
        def __call__(tsk, *args, **kwargs):
            original_target = tsk.tempval

            def traced_entry(target, call_args, call_kwargs):
                # Install our trace function for the lifetime of the
                # tasklet body, and always remove it afterwards.
                sys.settrace(self.trace_func)
                try:
                    if target is not None:
                        return target(*call_args, **call_kwargs)
                finally:
                    sys.settrace(None)

            tsk.tempval = traced_entry
            stackless.tasklet.setup(tsk, original_target, args, kwargs)
            return tsk

        def settrace(tsk, tb):
            if hasattr(tsk.frame, "f_trace"):
                tsk.frame.f_trace = tb
            sys.settrace(tb)

        # Remember the original so that it can be restored later.
        self.__oldstacklesscall__ = stackless.tasklet.__call__
        stackless.tasklet.settrace = settrace
        stackless.tasklet.__call__ = __call__

    if sys.platform == 'cli':
        self.frames = []
def context_dispatcher(self, old, new):
    """Reset stepping and make sure the tasklet being switched to is traced.

    old -- tasklet being switched away from (falsy when there is none).
    new -- tasklet being switched to (falsy when there is none).
    """
    self.stepping = STEPPING_NONE

    # Only a switch between two live tasklets needs attention.
    if not (old and new):
        return

    # Tasklets that started before we started tracing have no trace function
    # on their frame yet, so it is patched in here, at the context switch.
    if not hasattr(new.frame, "f_trace"):
        return
    if new.frame.f_trace:
        return

    sys.call_tracing(new.settrace, (self.trace_func,))
def _stackless_schedule_cb(self, prev, next):
    """Install this thread's trace function on the tasklet being switched to.

    prev -- not used by this body.
    next -- tasklet whose frames receive self.trace_func; may be None, in
            which case no frames are touched.

    NOTE(review): presumably registered through
    stackless.set_schedule_callback; the registration is not visible here.
    """
    current = stackless.getcurrent()
    if not current:
        return
    # Remember the current tasklet's trace function so it can be restored.
    current_tf = current.trace_function

    try:
        # Clear the trace function while the frames are being modified below.
        current.trace_function = None
        self.stepping = STEPPING_NONE

        # If the current frame has no trace function, we may need to get it
        # from the previous frame, depending on how we ended up in the
        # callback.
        if current_tf is None:
            f_back = current.frame.f_back
            if f_back is not None:
                current_tf = f_back.f_trace

        if next is not None:
            # Assign our trace function to the current stack
            f = next.frame
            if next is current:
                # Start from the caller's frame when switching to ourselves.
                f = f.f_back
            while f:
                # Only objects that are real frame objects get f_trace set.
                if isinstance(f, types.FrameType):
                    f.f_trace = self.trace_func
                f = f.f_back
            next.trace_function = self.trace_func
    finally:
        # Restore (or adopt, see above) the trace function of the current tasklet.
        current.trace_function = current_tf
def trace_func(self, frame, event, arg):
    """Dispatch one trace event to the handler stored in self._events.

    frame -- frame the event occurred in.
    event -- event name, used as the key into self._events.
    arg   -- event argument, forwarded unchanged to the handler.

    Returns None when ``sys`` is already gone or when a detach is processed;
    returns self.trace_func while a send is in progress or when a
    StackOverflowException / KeyboardInterrupt is caught; otherwise returns
    whatever the selected handler returns.
    """
    # If we're so far into process shutdown that sys is already gone, just stop tracing.
    if sys is None:
        return None
    elif self.is_sending:
        # https://pytools.codeplex.com/workitem/1864
        # we're currently doing I/O w/ the socket, we don't want to deliver
        # any breakpoints or async breaks because we'll deadlock.  Continue
        # to return the trace function so all of our frames remain
        # balanced.  A better way to deal with this might be to do
        # sys.settrace(None) when we take the send lock, but that's much
        # more difficult because our send context manager is used both
        # inside and outside of the trace function, and so is used when
        # tracing is enabled and disabled, and so it's very easy to get our
        # current frame tracking to be thrown off...
        return self.trace_func

    try:
        # if should_debug_code(frame.f_code) is not true during attach
        # the current frame is None and a pop_frame will cause an exception and
        # break the debugger
        if self.cur_frame is None:
            # happens during attach, we need frame for blocking
            self.push_frame(frame)

        if self.stepping == STEPPING_BREAK and should_debug_code(frame.f_code):
            if self.detach:
                # Detaching: undo the stackless hooks (if any) and stop tracing.
                if stackless is not None:
                    stackless.set_schedule_callback(None)
                    stackless.tasklet.__call__ = self.__oldstacklesscall__
                sys.settrace(None)
                return None

            self.async_break()

        return self._events[event](frame, arg)
    except (StackOverflowException, KeyboardInterrupt):
        # stack overflow, disable tracing
        # NOTE(review): despite the comment above, self.trace_func is
        # returned here rather than None -- confirm which is intended.
        return self.trace_func
def new_external_thread():
    """Register the calling thread with the debugger and start tracing it.

    The new thread object is told to break when the attach break has not
    been sent yet, or when SEND_BREAK_COMPLETE is set.
    """
    tracked = new_thread()

    if not attach_sent_break:
        # we are still doing the attach, make this thread break.
        initial_stepping = STEPPING_ATTACH_BREAK
    elif SEND_BREAK_COMPLETE:
        # user requested break all, make this thread break
        initial_stepping = STEPPING_BREAK
    else:
        initial_stepping = None

    # Leave the stepping mode chosen by new_thread() alone unless one of the
    # two cases above applies.
    if initial_stepping is not None:
        tracked.stepping = initial_stepping

    sys.settrace(tracked.trace_func)
def new_thread_wrapper(func, posargs, kwargs):
    """Run ``func`` on the current thread with the debugger trace installed.

    func    -- callable to execute.
    posargs -- sequence of positional arguments, unpacked into the call.
    kwargs  -- mapping of keyword arguments, unpacked into the call.

    Whether or not ``func`` raises, the thread is removed from THREADS
    (unless it is marked as detached) and, when the debugger is not
    DETACHED, its exit is reported. Exceptions raised by ``func`` propagate
    to the caller; the return value of ``func`` is discarded.
    """
    cur_thread = new_thread()
    try:
        sys.settrace(cur_thread.trace_func)
        func(*posargs, **kwargs)
    finally:
        THREADS_LOCK.acquire()
        try:
            if not cur_thread.detach:
                del THREADS[cur_thread.id]
        finally:
            # Always release: if the entry is already gone the KeyError must
            # not leave THREADS_LOCK held forever.
            THREADS_LOCK.release()

        if not DETACHED:
            report_thread_exit(cur_thread)
def context_dispatcher(self, old, new):
    """Reset stepping and patch tracing into the tasklet being switched to.

    old -- previous tasklet; falsy when a new tasklet is starting.
    new -- next tasklet; falsy when the previous tasklet is being killed.
    """
    self.stepping = STEPPING_NONE

    # for those tasklets that started before we started tracing
    # we need to make sure that the trace is set by patching
    # it in the context switch.  Nothing is done when a tasklet is merely
    # starting (no old) or being killed (no new).
    if (old and new
            and hasattr(new.frame, "f_trace")
            and not new.frame.f_trace):
        sys.call_tracing(new.settrace, (self.trace_func,))
def trace_func(self, frame, event, arg):
    """Dispatch one trace event to the handler stored in self._events.

    frame -- frame the event occurred in.
    event -- event name, used as the key into self._events.
    arg   -- event argument, forwarded unchanged to the handler.

    Returns None when ``sys`` is already gone or when a detach is processed;
    returns self.trace_func when a StackOverflowException /
    KeyboardInterrupt is caught; otherwise returns whatever the selected
    handler returns.
    """
    # If we're so far into process shutdown that sys is already gone, just stop tracing.
    if sys is None:
        return None
    try:
        # if should_debug_code(frame.f_code) is not true during attach
        # the current frame is None and a pop_frame will cause an exception and
        # break the debugger
        if self.cur_frame is None:
            # happens during attach, we need frame for blocking
            self.push_frame(frame)

        if self.stepping == STEPPING_BREAK and should_debug_code(frame.f_code):
            if self.detach:
                # Detaching: undo the stackless hooks (if any) and stop tracing.
                if stackless is not None:
                    stackless.set_schedule_callback(None)
                    stackless.tasklet.__call__ = self.__oldstacklesscall__
                sys.settrace(None)
                return None

            self.async_break()

        return self._events[event](frame, arg)
    except (StackOverflowException, KeyboardInterrupt):
        # stack overflow, disable tracing
        # NOTE(review): despite the comment above, self.trace_func is
        # returned here rather than None -- confirm which is intended.
        return self.trace_func
def new_thread_wrapper(func, *posargs, **kwargs):
    """Run ``func`` on the current thread with the debugger trace installed.

    func      -- callable to execute.
    *posargs  -- positional arguments forwarded to ``func``.
    **kwargs  -- keyword arguments forwarded to ``func``.

    Whether or not ``func`` raises, the thread is removed from THREADS
    (unless it is marked as detached) and, when the debugger is not
    DETACHED, its exit is reported. Exceptions raised by ``func`` propagate
    to the caller; the return value of ``func`` is discarded.
    """
    cur_thread = new_thread()
    try:
        sys.settrace(cur_thread.trace_func)
        func(*posargs, **kwargs)
    finally:
        THREADS_LOCK.acquire()
        try:
            if not cur_thread.detach:
                del THREADS[cur_thread.id]
        finally:
            # Always release: if the entry is already gone the KeyError must
            # not leave THREADS_LOCK held forever.
            THREADS_LOCK.release()

        if not DETACHED:
            report_thread_exit(cur_thread)
def debug(file, port_num, debug_id, debug_options, currentPid, run_as = 'script'):
    """Attach to the debugger, then run ``file`` under trace on this thread.

    file          -- what to execute; handed to exec_module, exec_code or
                     exec_file depending on ``run_as``.
    port_num      -- forwarded to attach_process.
    debug_id      -- forwarded to attach_process.
    debug_options -- container of option names; 'WaitOnNormalExit' makes
                     the function call do_wait() after a normal exit.
    currentPid    -- current process id, forwarded to attach_process.
    run_as        -- 'module', 'code', or anything else (run as a file).

    Exceptions raised by the executed code propagate after cleanup.
    """
    global _threading, last_ack_event

    # remove us from modules so there's no trace of us
    sys.modules['$visualstudio_py_debugger'] = sys.modules['visualstudio_py_debugger']
    # NOTE(review): without a ``global`` declaration this only binds a local
    # name and does not rename the module -- confirm whether that is intended.
    __name__ = '$visualstudio_py_debugger'
    del sys.modules['visualstudio_py_debugger']

    wait_on_normal_exit = 'WaitOnNormalExit' in debug_options

    ## Begin modification by Don Jayamanne
    # Pass current Process id to pass back to debugger
    attach_process(port_num, debug_id, debug_options, currentPid, report = True)
    ## End Modification by Don Jayamanne

    # setup the current thread
    cur_thread = new_thread()
    cur_thread.stepping = STEPPING_LAUNCH_BREAK

    # start tracing on this thread
    sys.settrace(cur_thread.trace_func)

    # now execute main file
    globals_obj = {'__name__': '__main__'}
    try:
        if run_as == 'module':
            exec_module(file, globals_obj)
        elif run_as == 'code':
            exec_code(file, '<string>', globals_obj)
        else:
            exec_file(file, globals_obj)
    finally:
        sys.settrace(None)
        THREADS_LOCK.acquire()
        try:
            del THREADS[cur_thread.id]
        finally:
            # Always release: a KeyError for an already-removed entry must
            # not leave THREADS_LOCK held forever.
            THREADS_LOCK.release()
        report_thread_exit(cur_thread)

        # Give VS debugger a chance to process commands
        # by waiting for ack of "last" command
        if _threading is None:
            import threading
            _threading = threading
        last_ack_event = _threading.Event()
        with _SendLockCtx:
            write_bytes(conn, LAST)
        # Wait at most 5 seconds for the acknowledgement.
        last_ack_event.wait(5)

    if wait_on_normal_exit:
        do_wait()

# Code objects for functions which are going to be at the bottom of the stack, right below the first
# stack frame for user code. When we walk the stack to determine whether to report or block on a given
# frame, hitting any of these means that we walked all the frames that we needed to look at.
def debug(file, port_num, debug_id, debug_options, run_as = 'script'):
    """Attach to the debugger, then run ``file`` under trace on this thread.

    file          -- what to execute; handed to exec_module, exec_code or
                     exec_file depending on ``run_as``.
    port_num      -- forwarded to attach_process.
    debug_id      -- forwarded to attach_process.
    debug_options -- container of option names; 'WaitOnNormalExit' makes
                     the function call do_wait() after a normal exit.
    run_as        -- 'module', 'code', or anything else (run as a file).

    Exceptions raised by the executed code propagate after cleanup.
    """
    global _threading, last_ack_event

    # remove us from modules so there's no trace of us
    sys.modules['$visualstudio_py_debugger'] = sys.modules['visualstudio_py_debugger']
    # NOTE(review): without a ``global`` declaration this only binds a local
    # name and does not rename the module -- confirm whether that is intended.
    __name__ = '$visualstudio_py_debugger'
    del sys.modules['visualstudio_py_debugger']

    wait_on_normal_exit = 'WaitOnNormalExit' in debug_options

    attach_process(port_num, debug_id, debug_options, report = True)

    # setup the current thread
    cur_thread = new_thread()
    cur_thread.stepping = STEPPING_LAUNCH_BREAK

    # start tracing on this thread
    sys.settrace(cur_thread.trace_func)

    # now execute main file
    globals_obj = {'__name__': '__main__'}
    try:
        if run_as == 'module':
            exec_module(file, globals_obj)
        elif run_as == 'code':
            exec_code(file, '<string>', globals_obj)
        else:
            exec_file(file, globals_obj)
    finally:
        sys.settrace(None)
        THREADS_LOCK.acquire()
        try:
            del THREADS[cur_thread.id]
        finally:
            # Always release: a KeyError for an already-removed entry must
            # not leave THREADS_LOCK held forever.
            THREADS_LOCK.release()
        report_thread_exit(cur_thread)

        # Give VS debugger a chance to process commands
        # by waiting for ack of "last" command
        if _threading is None:
            import threading
            _threading = threading
        last_ack_event = _threading.Event()
        with _SendLockCtx:
            write_bytes(conn, LAST)
        # Wait at most 5 seconds for the acknowledgement.
        last_ack_event.wait(5)

    if wait_on_normal_exit:
        do_wait()

# Code objects for functions which are going to be at the bottom of the stack, right below the first
# stack frame for user code. When we walk the stack to determine whether to report or block on a given
# frame, hitting any of these means that we walked all the frames that we needed to look at.
def __init__(self, id = None):
    """Initialise the debugger-side state tracked for one thread.

    id -- identifier to record for the thread; when None the identifier of
          the calling thread (thread.get_ident()) is used instead.

    When stackless is available, the schedule callback is registered and
    stackless.tasklet is patched so that tasklets run under self.trace_func.
    """
    self.id = thread.get_ident() if id is None else id

    # Trace event name -> bound handler; every handler is named handle_<event>.
    self._events = dict(
        (event_name, getattr(self, 'handle_' + event_name))
        for event_name in ('call', 'line', 'return', 'exception',
                           'c_call', 'c_return', 'c_exception')
    )

    # Stepping / frame tracking state.
    self.cur_frame = None
    self.stepping = STEPPING_NONE
    self.stopped_on_line = None
    self.django_stepping = None

    # Blocking state: _block_lock is created already held.
    self.unblock_work = None
    self._block_lock = thread.allocate_lock()
    self._block_lock.acquire()
    self._block_starting_lock = thread.allocate_lock()
    self._is_blocked = False
    self._is_working = False

    self.detach = False

    # Store the bound method on the instance so that it does not have to be
    # re-created on every attribute access.
    self.trace_func = self.trace_func
    self.prev_trace_func = None
    self.trace_func_stack = []

    self.reported_process_loaded = False

    # stackless changes
    if stackless is not None:
        stackless.set_schedule_callback(self.context_dispatcher)

        # the tasklets need to be traced on a case by case basis
        # sys.trace needs to be called within their calling context
        def __call__(tsk, *args, **kwargs):
            original_target = tsk.tempval

            def traced_entry(target, call_args, call_kwargs):
                # Install our trace function for the lifetime of the
                # tasklet body, and always remove it afterwards.
                sys.settrace(self.trace_func)
                try:
                    if target is not None:
                        return target(*call_args, **call_kwargs)
                finally:
                    sys.settrace(None)

            tsk.tempval = traced_entry
            stackless.tasklet.setup(tsk, original_target, args, kwargs)
            return tsk

        def settrace(tsk, tb):
            if hasattr(tsk.frame, "f_trace"):
                tsk.frame.f_trace = tb
            sys.settrace(tb)

        # Remember the original so that it can be restored later.
        self.__oldstacklesscall__ = stackless.tasklet.__call__
        stackless.tasklet.settrace = settrace
        stackless.tasklet.__call__ = __call__

    if sys.platform == 'cli':
        self.frames = []
def handle_line(self, frame, arg):
    """Handle a 'line' trace event: stepping first, then breakpoints.

    frame -- frame whose current line is about to execute.
    arg   -- event argument, forwarded to the previous trace function.

    Always returns self.trace_func. Unless DETACHED is set, the thread may
    block here for a step/launch/attach break or for a matching breakpoint.
    Any previously installed trace function (self.prev_trace_func) is then
    called with the same event.
    """
    if not DETACHED:
        stepping = self.stepping

        # http://pytools.codeplex.com/workitem/815
        # if we block for a step into/over we don't want to block again for a breakpoint
        blocked_for_stepping = False

        if stepping is not STEPPING_NONE:   # check for the common case of no stepping first...
            # Step over/into only stops once the line number has changed;
            # launch and attach breaks stop unconditionally.
            if (((stepping == STEPPING_OVER or stepping == STEPPING_INTO) and frame.f_lineno != self.stopped_on_line)
                or stepping == STEPPING_LAUNCH_BREAK
                or stepping == STEPPING_ATTACH_BREAK):
                if ((stepping == STEPPING_LAUNCH_BREAK and not MODULES) or
                    not should_debug_code(frame.f_code)):
                    # don't break into our own debugger / non-user code
                    # don't break into inital Python code needed to set things up
                    return self.trace_func

                # Launch/attach breaks do not suppress the breakpoint check below.
                blocked_for_stepping = stepping != STEPPING_LAUNCH_BREAK and stepping != STEPPING_ATTACH_BREAK
                self.block_maybe_attach()

        if BREAKPOINTS and blocked_for_stepping is False:
            # BREAKPOINTS is looked up by line number; each entry maps
            # (filename, bp_id) -> (condition, bound).
            bp = BREAKPOINTS.get(frame.f_lineno)
            if bp is not None:
                for (filename, bp_id), (condition, bound) in bp.items():
                    # Exact filename match first; the filename_is_same()
                    # comparison is only tried for breakpoints that are not bound.
                    if filename == frame.f_code.co_filename or (not bound and filename_is_same(filename, frame.f_code.co_filename)):
                        if condition:
                            try:
                                # The condition text is evaluated in the frame's own namespaces.
                                res = eval(condition.condition, frame.f_globals, frame.f_locals)
                                if condition.break_when_changed:
                                    # Break only when the value differs from the last one seen.
                                    block = condition.last_value != res
                                    condition.last_value = res
                                else:
                                    block = res
                            except:
                                # Deliberately broad: any failure while
                                # evaluating the condition results in a break.
                                block = True
                        else:
                            block = True

                        if block:
                            probe_stack()
                            update_all_thread_stacks(self)
                            self.block(lambda: (report_breakpoint_hit(bp_id, self.id), mark_all_threads_for_break()))
                        # Stop scanning after the first breakpoint whose file matches.
                        break

    # forward call to previous trace function, if any, updating trace function appropriately
    old_trace_func = self.prev_trace_func
    if old_trace_func is not None:
        self.prev_trace_func = None  # clear first incase old_trace_func stack overflows
        self.prev_trace_func = old_trace_func(frame, 'line', arg)

    return self.trace_func