我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用thread.stepping()。
def handle_exception(self, frame, arg): if self.stepping == STEPPING_ATTACH_BREAK: self.block_maybe_attach() if not DETACHED and should_debug_code(frame.f_code): break_type = BREAK_ON.should_break(self, *arg) if break_type: update_all_thread_stacks(self) self.block(lambda: report_exception(frame, arg, self.id, break_type)) # forward call to previous trace function, if any, updating the current trace function # with a new one if available old_trace_func = self.prev_trace_func if old_trace_func is not None: self.prev_trace_func = old_trace_func(frame, 'exception', arg) return self.trace_func
def async_break(self): def async_break_send(): with _SendLockCtx: sent_break_complete = False global SEND_BREAK_COMPLETE if SEND_BREAK_COMPLETE == True or SEND_BREAK_COMPLETE == self.id: # multiple threads could be sending this... SEND_BREAK_COMPLETE = False sent_break_complete = True write_bytes(conn, ASBR) write_int(conn, self.id) if sent_break_complete: # if we have threads which have not broken yet capture their frame list and # send it now. If they block we'll send an updated (and possibly more accurate - if # there are any thread locals) list of frames. update_all_thread_stacks(self) self.stepping = STEPPING_NONE self.block(async_break_send)
def detach_threads(): # tell all threads to stop tracing... THREADS_LOCK.acquire() all_threads = list(THREADS.items()) THREADS_LOCK.release() for tid, pyThread in all_threads: if not _INTERCEPTING_FOR_ATTACH: pyThread.detach = True pyThread.stepping = STEPPING_BREAK if pyThread._is_blocked: pyThread.unblock() if not _INTERCEPTING_FOR_ATTACH: THREADS_LOCK.acquire() THREADS.clear() THREADS_LOCK.release() BREAKPOINTS.clear()
def handle_exception(self, frame, arg): if self.stepping == STEPPING_ATTACH_BREAK: self.block_maybe_attach() if not DETACHED and should_debug_code(frame.f_code): break_type = BREAK_ON.ShouldBreak(self, *arg) if break_type: update_all_thread_stacks(self) self.block(lambda: report_exception(frame, arg, self.id, break_type)) # forward call to previous trace function, if any, updating the current trace function # with a new one if available old_trace_func = self.prev_trace_func if old_trace_func is not None: self.prev_trace_func = old_trace_func(frame, 'exception', arg) return self.trace_func
def detach_threads(): # tell all threads to stop tracing... THREADS_LOCK.acquire() for tid, pyThread in THREADS.items(): if not _INTERCEPTING_FOR_ATTACH: pyThread.detach = True pyThread.stepping = STEPPING_BREAK if pyThread._is_blocked: pyThread.unblock() if not _INTERCEPTING_FOR_ATTACH: THREADS.clear() BREAKPOINTS.clear() THREADS_LOCK.release()
def command_auto_resume(self): tid = read_int(self.conn) THREADS_LOCK.acquire() thread = THREADS[tid] THREADS_LOCK.release() stepping = thread.stepping if ((stepping == STEPPING_OVER or stepping == STEPPING_INTO) and thread.cur_frame.f_lineno != thread.stopped_on_line): report_step_finished(tid) else: self.command_resume_all()
def __len__(self): return self.len_value # Specifies list of files not to debug. Can be extended by other modules # (the REPL does this for $attach support and not stepping into the REPL).
def __init__(self, id = None): if id is not None: self.id = id else: self.id = thread.get_ident() self._events = {'call' : self.handle_call, 'line' : self.handle_line, 'return' : self.handle_return, 'exception' : self.handle_exception, 'c_call' : self.handle_c_call, 'c_return' : self.handle_c_return, 'c_exception' : self.handle_c_exception, } self.cur_frame = None self.stepping = STEPPING_NONE self.unblock_work = None self._block_lock = thread.allocate_lock() self._block_lock.acquire() self._block_starting_lock = thread.allocate_lock() self._is_blocked = False self._is_working = False self.stopped_on_line = None self.detach = False self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly self.prev_trace_func = None self.trace_func_stack = [] self.reported_process_loaded = False self.django_stepping = None self.is_sending = False # stackless changes if stackless is not None: self._stackless_attach() if sys.platform == 'cli': self.frames = []
def context_dispatcher(self, old, new): self.stepping = STEPPING_NONE # for those tasklets that started before we started tracing # we need to make sure that the trace is set by patching # it in the context switch if old and new: if hasattr(new.frame, "f_trace") and not new.frame.f_trace: sys.call_tracing(new.settrace,(self.trace_func,))
def _stackless_schedule_cb(self, prev, next): current = stackless.getcurrent() if not current: return current_tf = current.trace_function try: current.trace_function = None self.stepping = STEPPING_NONE # If the current frame has no trace function, we may need to get it # from the previous frame, depending on how we ended up in the # callback. if current_tf is None: f_back = current.frame.f_back if f_back is not None: current_tf = f_back.f_trace if next is not None: # Assign our trace function to the current stack f = next.frame if next is current: f = f.f_back while f: if isinstance(f, types.FrameType): f.f_trace = self.trace_func f = f.f_back next.trace_function = self.trace_func finally: current.trace_function = current_tf
def trace_func(self, frame, event, arg): # If we're so far into process shutdown that sys is already gone, just stop tracing. if sys is None: return None elif self.is_sending: # https://pytools.codeplex.com/workitem/1864 # we're currently doing I/O w/ the socket, we don't want to deliver # any breakpoints or async breaks because we'll deadlock. Continue # to return the trace function so all of our frames remain # balanced. A better way to deal with this might be to do # sys.settrace(None) when we take the send lock, but that's much # more difficult because our send context manager is used both # inside and outside of the trace function, and so is used when # tracing is enabled and disabled, and so it's very easy to get our # current frame tracking to be thrown off... return self.trace_func try: # if should_debug_code(frame.f_code) is not true during attach # the current frame is None and a pop_frame will cause an exception and # break the debugger if self.cur_frame is None: # happens during attach, we need frame for blocking self.push_frame(frame) if self.stepping == STEPPING_BREAK and should_debug_code(frame.f_code): if self.detach: if stackless is not None: stackless.set_schedule_callback(None) stackless.tasklet.__call__ = self.__oldstacklesscall__ sys.settrace(None) return None self.async_break() return self._events[event](frame, arg) except (StackOverflowException, KeyboardInterrupt): # stack overflow, disable tracing return self.trace_func
def block_maybe_attach(self): will_block_now = True if self.stepping == STEPPING_ATTACH_BREAK: # only one thread should send the attach break in attach_lock.acquire() global attach_sent_break if attach_sent_break: will_block_now = False attach_sent_break = True attach_lock.release() probe_stack() stepping = self.stepping self.stepping = STEPPING_NONE def block_cond(): if will_block_now: if stepping == STEPPING_OVER or stepping == STEPPING_INTO: report_step_finished(self.id) return mark_all_threads_for_break(skip_thread = self) else: if not DETACHED: if stepping == STEPPING_ATTACH_BREAK: self.reported_process_loaded = True return report_process_loaded(self.id) update_all_thread_stacks(self) self.block(block_cond)
def mark_all_threads_for_break(stepping = STEPPING_BREAK, skip_thread = None): THREADS_LOCK.acquire() for thread in THREADS.values(): if thread is skip_thread: continue thread.stepping = stepping THREADS_LOCK.release()
def command_step_into(self): tid = read_int(self.conn) thread = get_thread_from_id(tid) if thread is not None: assert thread._is_blocked thread.stepping = STEPPING_INTO self.command_resume_all()
def command_step_over(self): # set step over tid = read_int(self.conn) thread = get_thread_from_id(tid) if thread is not None: assert thread._is_blocked if DJANGO_DEBUG: source_obj = get_django_frame_source(thread.cur_frame) if source_obj is not None: thread.django_stepping = True self.command_resume_all() return thread.stepping = STEPPING_OVER self.command_resume_all()
def command_resume_all(self): # resume all THREADS_LOCK.acquire() all_threads = list(THREADS.values()) THREADS_LOCK.release() for thread in all_threads: thread._block_starting_lock.acquire() if thread.stepping == STEPPING_BREAK or thread.stepping == STEPPING_ATTACH_BREAK: thread.stepping = STEPPING_NONE if thread._is_blocked: thread.unblock() thread._block_starting_lock.release()
def command_clear_stepping(self): tid = read_int(self.conn) thread = get_thread_from_id(tid) if thread is not None: thread.stepping = STEPPING_NONE
def new_external_thread(): thread = new_thread() if not attach_sent_break: # we are still doing the attach, make this thread break. thread.stepping = STEPPING_ATTACH_BREAK elif SEND_BREAK_COMPLETE: # user requested break all, make this thread break thread.stepping = STEPPING_BREAK sys.settrace(thread.trace_func)
def handle_return(self, frame, arg): self.pop_frame() if not DETACHED: stepping = self.stepping # only update stepping state when this frame is debuggable (matching handle_call) if stepping is not STEPPING_NONE and should_debug_code(frame.f_code): if stepping > STEPPING_OVER: self.stepping -= 1 elif stepping < STEPPING_OUT: self.stepping += 1 elif stepping in USER_STEPPING: if self.cur_frame is None or frame.f_code.co_name == "<module>" : # only return to user code modules if self.should_block_on_frame(frame): # restore back the module frame for the step out of a module self.push_frame(ModuleExitFrame(frame)) self.stepping = STEPPING_NONE update_all_thread_stacks(self) self.block(lambda: (report_step_finished(self.id), mark_all_threads_for_break(skip_thread = self))) self.pop_frame() elif self.should_block_on_frame(self.cur_frame): # if we're returning into non-user code then don't block in the # non-user code, wait until we hit user code again self.stepping = STEPPING_NONE update_all_thread_stacks(self) self.block(lambda: (report_step_finished(self.id), mark_all_threads_for_break(skip_thread = self))) # forward call to previous trace function, if any old_trace_func = self.prev_trace_func if old_trace_func is not None: old_trace_func(frame, 'return', arg) # restore previous frames trace function if there is one if self.trace_func_stack: self.prev_trace_func = self.trace_func_stack.pop()
def command_step_out(self): tid = read_int(self.conn) thread = get_thread_from_id(tid) if thread is not None: assert thread._is_blocked thread.stepping = STEPPING_OUT self.command_resume_all()
def new_thread(tid = None, set_break = False, frame = None): # called during attach w/ a thread ID provided. if tid == debugger_thread_id: return None cur_thread = Thread(tid) THREADS_LOCK.acquire() THREADS[cur_thread.id] = cur_thread THREADS_LOCK.release() cur_thread.push_frame(frame) if set_break: cur_thread.stepping = STEPPING_ATTACH_BREAK if not DETACHED: report_new_thread(cur_thread) return cur_thread