我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用thread.cur_frame()。
def command_set_lineno(self): tid = read_int(self.conn) fid = read_int(self.conn) lineno = read_int(self.conn) try: THREADS_LOCK.acquire() THREADS[tid].cur_frame.f_lineno = lineno newline = THREADS[tid].cur_frame.f_lineno THREADS_LOCK.release() with _SendLockCtx: write_bytes(self.conn, SETL) write_int(self.conn, 1) write_int(self.conn, tid) write_int(self.conn, newline) except: with _SendLockCtx: write_bytes(self.conn, SETL) write_int(self.conn, 0) write_int(self.conn, tid) write_int(self.conn, 0)
def command_auto_resume(self): tid = read_int(self.conn) THREADS_LOCK.acquire() thread = THREADS[tid] THREADS_LOCK.release() stepping = thread.stepping if ((stepping == STEPPING_OVER or stepping == STEPPING_INTO) and thread.cur_frame.f_lineno != thread.stopped_on_line): report_step_finished(tid) else: self.command_resume_all()
def __init__(self, id = None): if id is not None: self.id = id else: self.id = thread.get_ident() self._events = {'call' : self.handle_call, 'line' : self.handle_line, 'return' : self.handle_return, 'exception' : self.handle_exception, 'c_call' : self.handle_c_call, 'c_return' : self.handle_c_return, 'c_exception' : self.handle_c_exception, } self.cur_frame = None self.stepping = STEPPING_NONE self.unblock_work = None self._block_lock = thread.allocate_lock() self._block_lock.acquire() self._block_starting_lock = thread.allocate_lock() self._is_blocked = False self._is_working = False self.stopped_on_line = None self.detach = False self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly self.prev_trace_func = None self.trace_func_stack = [] self.reported_process_loaded = False self.django_stepping = None self.is_sending = False # stackless changes if stackless is not None: self._stackless_attach() if sys.platform == 'cli': self.frames = []
def push_frame(self, frame): self.cur_frame = frame self.frames.append(frame)
def pop_frame(self): self.frames.pop() self.cur_frame = self.frames[-1]
def push_frame(self, frame): self.cur_frame = frame
def pop_frame(self): self.cur_frame = self.cur_frame.f_back
def trace_func(self, frame, event, arg): # If we're so far into process shutdown that sys is already gone, just stop tracing. if sys is None: return None elif self.is_sending: # https://pytools.codeplex.com/workitem/1864 # we're currently doing I/O w/ the socket, we don't want to deliver # any breakpoints or async breaks because we'll deadlock. Continue # to return the trace function so all of our frames remain # balanced. A better way to deal with this might be to do # sys.settrace(None) when we take the send lock, but that's much # more difficult because our send context manager is used both # inside and outside of the trace function, and so is used when # tracing is enabled and disabled, and so it's very easy to get our # current frame tracking to be thrown off... return self.trace_func try: # if should_debug_code(frame.f_code) is not true during attach # the current frame is None and a pop_frame will cause an exception and # break the debugger if self.cur_frame is None: # happens during attach, we need frame for blocking self.push_frame(frame) if self.stepping == STEPPING_BREAK and should_debug_code(frame.f_code): if self.detach: if stackless is not None: stackless.set_schedule_callback(None) stackless.tasklet.__call__ = self.__oldstacklesscall__ sys.settrace(None) return None self.async_break() return self._events[event](frame, arg) except (StackOverflowException, KeyboardInterrupt): # stack overflow, disable tracing return self.trace_func
def block(self, block_lambda, keep_stopped_on_line = False): """blocks the current thread until the debugger resumes it""" assert not self._is_blocked #assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident()) # we should only ever block ourselves # send thread frames before we block self.enum_thread_frames_locally() if not keep_stopped_on_line: self.stopped_on_line = self.cur_frame.f_lineno # need to synchronize w/ sending the reason we're blocking self._block_starting_lock.acquire() self._is_blocked = True block_lambda() self._block_starting_lock.release() while not DETACHED: self._block_lock.acquire() if self.unblock_work is None: break # the debugger wants us to do something, do it, and then block again self._is_working = True self.unblock_work() self.unblock_work = None self._is_working = False self._block_starting_lock.acquire() assert self._is_blocked self._is_blocked = False self._block_starting_lock.release()
def run_on_thread(self, text, cur_frame, execution_id, frame_kind, repr_kind = PYTHON_EVALUATION_RESULT_REPR_KIND_NORMAL): self._block_starting_lock.acquire() if not self._is_blocked: report_execution_error('<expression cannot be evaluated at this time>', execution_id) elif not self._is_working: self.schedule_work(lambda : self.run_locally(text, cur_frame, execution_id, frame_kind, repr_kind)) else: report_execution_error('<error: previous evaluation has not completed>', execution_id) self._block_starting_lock.release()
def run_on_thread_no_report(self, text, cur_frame, frame_kind): self._block_starting_lock.acquire() if not self._is_blocked: pass elif not self._is_working: self.schedule_work(lambda : self.run_locally_no_report(text, cur_frame, frame_kind)) else: pass self._block_starting_lock.release()
def enum_child_on_thread(self, text, cur_frame, execution_id, frame_kind): self._block_starting_lock.acquire() if not self._is_working and self._is_blocked: self.schedule_work(lambda : self.enum_child_locally(text, cur_frame, execution_id, frame_kind)) self._block_starting_lock.release() else: self._block_starting_lock.release() report_children(execution_id, [])
def get_locals(self, cur_frame, frame_kind): if frame_kind == FRAME_KIND_DJANGO: locs = {} # iterate going forward, so later items replace earlier items for d in cur_frame.f_locals['context'].dicts: # hasattr check to defend against someone passing a bad dictionary value # and us breaking the app. if hasattr(d, 'keys') and d != DJANGO_BUILTINS: for key in d.keys(): locs[key] = d[key] else: locs = cur_frame.f_locals return locs
def run_locally(self, text, cur_frame, execution_id, frame_kind, repr_kind = PYTHON_EVALUATION_RESULT_REPR_KIND_NORMAL): try: code = self.compile(text, cur_frame) res = eval(code, cur_frame.f_globals, self.get_locals(cur_frame, frame_kind)) self.locals_to_fast(cur_frame) # Report any updated variable values first self.enum_thread_frames_locally() report_execution_result(execution_id, res, repr_kind) except: # Report any updated variable values first self.enum_thread_frames_locally() report_execution_exception(execution_id, sys.exc_info())
def run_locally_no_report(self, text, cur_frame, frame_kind): code = self.compile(text, cur_frame) res = eval(code, cur_frame.f_globals, self.get_locals(cur_frame, frame_kind)) self.locals_to_fast(cur_frame) sys.displayhook(res)
def command_step_over(self): # set step over tid = read_int(self.conn) thread = get_thread_from_id(tid) if thread is not None: assert thread._is_blocked if DJANGO_DEBUG: source_obj = get_django_frame_source(thread.cur_frame) if source_obj is not None: thread.django_stepping = True self.command_resume_all() return thread.stepping = STEPPING_OVER self.command_resume_all()
def execute_code_no_report(self, text, tid, fid, frame_kind): # execute given text in specified frame, without sending back the results thread, cur_frame = self.get_thread_and_frame(tid, fid, frame_kind) if thread is not None and cur_frame is not None: thread.run_locally_no_report(text, cur_frame, frame_kind)
def command_enum_children(self): # execute given text in specified frame text = read_string(self.conn) tid = read_int(self.conn) # thread id fid = read_int(self.conn) # frame id eid = read_int(self.conn) # execution id frame_kind = read_int(self.conn) # frame kind thread, cur_frame = self.get_thread_and_frame(tid, fid, frame_kind) if thread is not None and cur_frame is not None: thread.enum_child_on_thread(text, cur_frame, eid, frame_kind)
def get_thread_and_frame(self, tid, fid, frame_kind): thread = get_thread_from_id(tid) cur_frame = None if thread is not None: cur_frame = thread.cur_frame for i in xrange(fid): cur_frame = cur_frame.f_back return thread, cur_frame
def handle_return(self, frame, arg): self.pop_frame() if not DETACHED: stepping = self.stepping # only update stepping state when this frame is debuggable (matching handle_call) if stepping is not STEPPING_NONE and should_debug_code(frame.f_code): if stepping > STEPPING_OVER: self.stepping -= 1 elif stepping < STEPPING_OUT: self.stepping += 1 elif stepping in USER_STEPPING: if self.cur_frame is None or frame.f_code.co_name == "<module>" : # only return to user code modules if self.should_block_on_frame(frame): # restore back the module frame for the step out of a module self.push_frame(ModuleExitFrame(frame)) self.stepping = STEPPING_NONE update_all_thread_stacks(self) self.block(lambda: (report_step_finished(self.id), mark_all_threads_for_break(skip_thread = self))) self.pop_frame() elif self.should_block_on_frame(self.cur_frame): # if we're returning into non-user code then don't block in the # non-user code, wait until we hit user code again self.stepping = STEPPING_NONE update_all_thread_stacks(self) self.block(lambda: (report_step_finished(self.id), mark_all_threads_for_break(skip_thread = self))) # forward call to previous trace function, if any old_trace_func = self.prev_trace_func if old_trace_func is not None: old_trace_func(frame, 'return', arg) # restore previous frames trace function if there is one if self.trace_func_stack: self.prev_trace_func = self.trace_func_stack.pop()
def compile(self, text, cur_frame): try: code = compile(text, '<debug input>', 'eval') except: code = compile(text, '<debug input>', 'exec') return code
def command_execute_code(self): # execute given text in specified frame text = read_string(self.conn) tid = read_int(self.conn) # thread id fid = read_int(self.conn) # frame id eid = read_int(self.conn) # execution id frame_kind = read_int(self.conn) repr_kind = read_int(self.conn) thread, cur_frame = self.get_thread_and_frame(tid, fid, frame_kind) if thread is not None and cur_frame is not None: thread.run_on_thread(text, cur_frame, eid, frame_kind, repr_kind)