我们从 Python 开源项目中提取了以下 20 个代码示例，用于说明如何使用 thread.stopped_on_line 属性。
def command_auto_resume(self):
    """Handle an auto-resume request for a single thread.

    Reads the target thread id from ``self.conn`` and looks the thread up
    in ``THREADS``.  If that thread is stepping over or stepping into and
    its current frame is on a different line than the one recorded in
    ``stopped_on_line``, the step is reported as finished; otherwise every
    thread is resumed via ``command_resume_all``.

    Raises:
        KeyError: if the id read from the connection is not in ``THREADS``.
    """
    tid = read_int(self.conn)

    # Always release the lock, even when the lookup fails; otherwise an
    # unknown thread id would leave THREADS_LOCK held forever.
    THREADS_LOCK.acquire()
    try:
        thread = THREADS[tid]
    finally:
        THREADS_LOCK.release()

    stepping = thread.stepping
    is_line_step = stepping == STEPPING_OVER or stepping == STEPPING_INTO
    if is_line_step and thread.cur_frame.f_lineno != thread.stopped_on_line:
        # The thread has moved off the line it was stopped on, so the
        # requested step is complete.
        report_step_finished(tid)
    else:
        self.command_resume_all()
def __init__(self, id=None):
    """Set up the per-thread debugger state.

    Args:
        id: identifier to store as ``self.id``.  When omitted (``None``),
            the identifier returned by ``thread.get_ident()`` is used.
    """
    if id is None:
        self.id = thread.get_ident()
    else:
        self.id = id

    # Maps each trace event name to the method that handles it.
    event_handlers = {
        'call': self.handle_call,
        'line': self.handle_line,
        'return': self.handle_return,
        'exception': self.handle_exception,
        'c_call': self.handle_c_call,
        'c_return': self.handle_c_return,
        'c_exception': self.handle_c_exception,
    }
    self._events = event_handlers

    self.cur_frame = None
    self.stepping = STEPPING_NONE
    self.unblock_work = None

    # The block lock starts out held, so the first acquire() made on it
    # later will wait until something releases it.
    block_lock = thread.allocate_lock()
    block_lock.acquire()
    self._block_lock = block_lock
    self._block_starting_lock = thread.allocate_lock()

    self._is_blocked = False
    self._is_working = False
    self.stopped_on_line = None
    self.detach = False

    # Store the bound method on the instance so it does not have to be
    # re-created on every attribute access.
    self.trace_func = self.trace_func
    self.prev_trace_func = None
    self.trace_func_stack = []

    self.reported_process_loaded = False
    self.django_stepping = None
    self.is_sending = False

    # stackless changes
    if stackless is not None:
        self._stackless_attach()

    if sys.platform == 'cli':
        self.frames = []
def block(self, block_lambda, keep_stopped_on_line = False):
    """Block the current thread until the debugger resumes it.

    Args:
        block_lambda: zero-argument callable invoked while
            ``_block_starting_lock`` is held, right after the thread is
            marked as blocked.  Presumably it sends the reason for
            blocking to the debugger -- confirm against callers.
        keep_stopped_on_line: when true, ``stopped_on_line`` is left
            untouched; otherwise it is set to the current frame's line
            number before blocking.

    While blocked, the thread repeatedly waits on ``_block_lock``.  Each
    time the lock is obtained, any callable stored in ``unblock_work`` is
    run and the thread waits again; if ``unblock_work`` is ``None`` the
    thread stops waiting and returns.
    """
    # A thread must not already be blocked when it asks to block.
    assert not self._is_blocked
    #assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident()) # we should only ever block ourselves

    # send thread frames before we block
    self.enum_thread_frames_locally()

    if not keep_stopped_on_line:
        # Record the line we stopped on.
        self.stopped_on_line = self.cur_frame.f_lineno

    # need to synchronize w/ sending the reason we're blocking
    # NOTE(review): if block_lambda() raises, _block_starting_lock stays
    # held and _is_blocked stays True -- confirm whether that can happen.
    self._block_starting_lock.acquire()
    self._is_blocked = True
    block_lambda()
    self._block_starting_lock.release()

    while not DETACHED:
        # Waits here until _block_lock is released elsewhere (presumably
        # by the debugger thread -- not visible in this block).
        self._block_lock.acquire()
        if self.unblock_work is None:
            # No work was queued, so this wake-up means "resume".
            break

        # the debugger wants us to do something, do it, and then block again
        self._is_working = True
        self.unblock_work()
        self.unblock_work = None
        self._is_working = False

    # Clear the blocked flag under the same lock that guarded setting it.
    self._block_starting_lock.acquire()
    assert self._is_blocked
    self._is_blocked = False
    self._block_starting_lock.release()