我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bdb.Bdb()。
def bp_commands(self, frame):
    """Run every command attached to the breakpoint that was just hit.

    ``self.currentbp`` is set in bdb (``Bdb.break_here``) when a breakpoint
    is hit.  When it names a breakpoint that has an entry in
    ``self.commands``, each stored command line is passed to ``onecmd``.

    Args:
        frame: the frame handed to ``self.setup`` before the commands run.

    Returns:
        True if the normal interaction function must be called (no
        breakpoint commands were executed), False otherwise.
    """
    currentbp = getattr(self, "currentbp", False)
    if not currentbp or currentbp not in self.commands:
        return True

    # Reset the marker before running the stored commands.
    self.currentbp = 0
    # Running the stored commands must not change what "repeat last
    # command" refers to, so remember it and put it back afterwards.
    lastcmd_back = self.lastcmd
    self.setup(frame, None)
    for line in self.commands[currentbp]:
        self.onecmd(line)
    self.lastcmd = lastcmd_back
    if not self.commands_silent[currentbp]:
        self.print_stack_entry(self.stack[self.curindex])
    if self.commands_doprompt[currentbp]:
        self.cmdloop()
    self.forget()
    return False
def pull_actions(self):
    """Receive one remote procedure call from the frontend and execute it.

    The request is read from ``self.pipe`` and dispatched (JSON-RPC like)
    to the method of this object named by ``request['method']``.

    Returns:
        None when a 'run' notification is received (see 'startup'),
        True once any other request has been processed.
    """
    request = self.pipe.recv()
    if request.get("method") == 'run':
        return None
    response = {'version': '1.1', 'id': request.get('id'),
                'result': None, 'error': None}
    try:
        # NOTE(review): the method name comes straight from the pipe, so
        # any attribute of this object can be invoked by the peer; confirm
        # the pipe is only ever connected to a trusted frontend.
        method = getattr(self, request['method'])
        # A request may omit 'args'/'kwargs'; treat both as empty instead
        # of reporting a spurious KeyError to the frontend.
        response['result'] = method(*request.get('args', ()),
                                    **request.get('kwargs', {}))
    except Exception as e:
        # Any failure is reported back to the caller, never raised here.
        response['error'] = {'code': 0, 'message': str(e)}
    # send the result for normal method calls, not for notifications
    if request.get('id'):
        self.pipe.send(response)
    return True

# Override Bdb methods
def trace_dispatch(self, frame, event, arg):
    """Trace hook: serve pending frontend requests, then dispatch the event.

    Args:
        frame: the frame the trace event refers to.
        event: trace event name; 'line', 'call', 'return' and 'exception'
            are routed to the matching ``dispatch_*`` method.
        arg: event argument, forwarded to every handler except
            ``dispatch_line``.

    Returns:
        None when ``self.quitting`` is set, ``self.trace_dispatch`` for the
        fast-continue shortcut and for unhandled events, otherwise whatever
        the selected ``dispatch_*`` method returns.
    """
    # check for non-interaction rpc (set_breakpoint, interrupt)
    while self.allow_interruptions and self.pipe.poll():
        self.pull_actions()
    # NOTE(review): this second drain ignores allow_interruptions and looks
    # like an accidental duplicate of the loop above -- confirm before
    # removing; it is kept here so behaviour is unchanged.
    while self.pipe.poll():
        self.pull_actions()

    # Fast path: while fast-continuing, skip every location that is not in
    # the module-level ``breaks`` collection.
    location = (frame.f_code.co_filename, frame.f_lineno)
    if location not in breaks and self.fast_continue:
        return self.trace_dispatch

    # process the frame (see Bdb.trace_dispatch)
    if self.quitting:
        return None
    if event == 'line':
        return self.dispatch_line(frame)
    if event in ('call', 'return', 'exception'):
        handler = getattr(self, 'dispatch_' + event)
        return handler(frame, arg)
    return self.trace_dispatch
def __init__(self, pipe, redirect_stdio=True, allow_interruptions=False,
             skip=(__name__,)):
    """Initialise the debugger backend that talks to a frontend over *pipe*.

    Args:
        pipe: communication channel, stored as ``self.pipe``.
        redirect_stdio: when true, ``sys.stdin``, ``sys.stdout`` and
            ``sys.stderr`` are replaced by this object.
        allow_interruptions: when true, a placeholder entry is added to
            ``self.breaks`` (see the comment below); the flag itself is
            stored as ``self.allow_interruptions``.
        skip: forwarded to ``bdb.Bdb.__init__`` when
            ``sys.version_info > (2, 7)``.  The default is an immutable
            tuple so it cannot be shared and mutated across calls.
    """
    kwargs = {}
    if sys.version_info > (2, 7):
        kwargs['skip'] = skip
    bdb.Bdb.__init__(self, **kwargs)
    self.frame = None
    self.i = 1  # sequential RPC call id
    self.waiting = False
    self.pipe = pipe  # for communication
    self._wait_for_mainpyfile = False
    self._wait_for_breakpoint = False
    self.mainpyfile = ""
    self._lineno = None  # last listed line number
    # replace system standard input and output (send them thru the pipe)
    if redirect_stdio:
        sys.stdin = self
        sys.stdout = self
        sys.stderr = self
    if allow_interruptions:
        # fake breakpoint to prevent removing trace_dispatch on set_continue
        self.breaks[None] = []
    self.allow_interruptions = allow_interruptions
    self.burst = 0  # do not send notifications ("burst" mode)
    self.params = {}  # optional parameters for interaction
def pull_actions(self):
    """Receive one remote procedure call from the frontend and run it.

    The request read from ``self.pipe`` is dispatched (JSON-RPC like) to
    the method of this object named by ``request['method']``.

    Returns:
        None when a 'run' notification is received (see 'startup'),
        True once any other request has been processed.
    """
    request = self.pipe.recv()
    if request.get("method") == 'run':
        return None
    response = {'version': '1.1', 'id': request.get('id'),
                'result': None, 'error': None}
    try:
        # dispatch message (JSON RPC like)
        # NOTE(review): the method name is taken verbatim from the pipe, so
        # the peer can invoke any attribute of this object; confirm the
        # pipe is only connected to a trusted frontend.
        method = getattr(self, request['method'])
        # Missing 'args'/'kwargs' mean "no arguments", not an error.
        response['result'] = method(*request.get('args', ()),
                                    **request.get('kwargs', {}))
    except Exception as e:  # "as" form is valid on Python 2.6+ and 3.x
        response['error'] = {'code': 0, 'message': str(e)}
    # send the result for normal method calls, not for notifications
    if request.get('id'):
        self.pipe.send(response)
    return True

# Override Bdb methods
def trace_dispatch(self, frame, event, arg):
    """Trace hook: serve pending frontend requests, then dispatch the event.

    Args:
        frame: the frame the trace event refers to.
        event: trace event name; 'line', 'call', 'return' and 'exception'
            are routed to the matching ``dispatch_*`` method.
        arg: event argument, forwarded to every handler except
            ``dispatch_line``.

    Returns:
        None when ``self.quitting`` is set, the selected handler's result
        for a known event, and ``self.trace_dispatch`` for any other event.
    """
    # Serve non-interaction RPCs (set_breakpoint, interrupt) first; the
    # pipe is only polled when interruptions are enabled.
    while self.allow_interruptions and self.pipe.poll():
        self.pull_actions()

    # process the frame (see Bdb.trace_dispatch)
    if self.quitting:
        return None
    if event == 'line':
        return self.dispatch_line(frame)
    if event in ('call', 'return', 'exception'):
        handler = getattr(self, 'dispatch_' + event)
        return handler(frame, arg)
    return self.trace_dispatch
def execRcLines(self):
    """Run the queued rc lines, each at most once.

    Blank lines and lines whose first non-blank character is '#' are
    skipped.  Every other line is stripped and passed to ``onecmd``.

    Returns:
        True as soon as ``onecmd`` returns a true value; the lines not yet
        executed are then put back on ``self.rcLines``.  None otherwise.
    """
    pending = self.rcLines
    if not pending:
        return None
    # Work on the detached list (reversed so pop() yields lines in order)
    # and clear the attribute first: onecmd may re-enter this method.
    pending.reverse()
    self.rcLines = []
    while pending:
        command = pending.pop().strip()
        if not command or command.startswith('#'):
            continue
        if self.onecmd(command):
            # The command wants to leave the interaction: keep the
            # leftover lines, in their original order, for the next one.
            self.rcLines.extend(reversed(pending))
            return True
    return None

# Override Bdb methods
def bp_commands(self, frame):
    """Call every command that was set for the current active breakpoint.

    ``self.currentbp`` is set in bdb (``Bdb.break_here``) when a breakpoint
    was hit.  If it names a breakpoint with an entry in ``self.commands``,
    the stored command lines are executed through ``onecmd``.

    Args:
        frame: the frame handed to ``self.setup`` before the commands run.

    Returns:
        True if the normal interaction function must be called (there was
        nothing to run), False otherwise.
    """
    active = getattr(self, "currentbp", False)
    if not (active and active in self.commands):
        return True

    # Reset the marker before running the stored commands.
    self.currentbp = 0
    # The stored commands must not overwrite the user's last command.
    saved_lastcmd = self.lastcmd
    self.setup(frame, None)
    for line in self.commands[active]:
        self.onecmd(line)
    self.lastcmd = saved_lastcmd
    if not self.commands_silent[active]:
        self.print_stack_entry(self.stack[self.curindex])
    if self.commands_doprompt[active]:
        self._cmdloop()
    self.forget()
    return False
def __init__(self):
    """Create the debugger with every piece of session state unset.

    All session attributes start as None, ``recursiveData`` starts as an
    empty list, the ``bdb.Bdb`` base is initialised, a lock is allocated
    and finally ``self.reset()`` is called.
    """
    for attribute in (
        "debugApplication",
        "debuggingThread",
        "debuggingThreadStateHandle",
        "stackSnifferCookie",
        "stackSniffer",
        "codeContainerProvider",
        "breakFlags",
        "breakReason",
        "appDebugger",
        "appEventConnection",
        "logicalbotframe",  # Anything at this level or below does not exist!
        "currentframe",  # The frame we are currently in.
    ):
        setattr(self, attribute, None)
    self.recursiveData = []  # Data saved for each reentry on this thread.
    bdb.Bdb.__init__(self)
    self._threadprotectlock = thread.allocate_lock()
    self.reset()
def dispatch_line(self, frame):
    """Handle a 'line' trace event for *frame*.

    Returns:
        ``self.trace_dispatch`` when *frame* is the logical bottom frame,
        None when the code container provider has no document for the
        frame's file, otherwise the result of ``bdb.Bdb.dispatch_line``.
    """
    traceenter("dispatch_line", _dumpf(frame), _dumpf(self.botframe))
    if frame is not self.logicalbotframe:
        document = self.codeContainerProvider.FromFileName(
            frame.f_code.co_filename)
        if document is None:
            trace("dispatch_line has no document for", _dumpf(frame),
                  "- skipping trace!")
            return None
        # So the stack sniffer knows our most recent, debuggable code.
        self.currentframe = frame
        return bdb.Bdb.dispatch_line(self, frame)

    trace("dispatch_line", _dumpf(frame), "for bottom frame returing tracer")
    # The next code executed in the frame above may be a builtin
    # (eg, apply()) in which sys.trace needs to be set.
    sys.settrace(self.trace_dispatch)
    # And return the tracer in case we are about to execute Python code,
    # in which case the sys tracer is ignored!
    return self.trace_dispatch
def dispatch_call(self, frame, arg):
    """Handle a 'call' trace event for *frame*.

    The value of ``axdebug.GetStackAddress()`` is stored in the frame's
    locals under ``'__axstack_address__'``.

    Returns:
        ``self.trace_dispatch`` for the bottom frame and for frames whose
        file has a document in the code container provider; None for any
        other frame, so it is not traced.
    """
    traceenter("dispatch_call",_dumpf(frame))
    frame.f_locals['__axstack_address__'] = axdebug.GetStackAddress()
    if frame is not self.botframe:
        # Not our bottom frame.  If we have a document for it, then trace
        # it, otherwise run at full speed.
        document = self.codeContainerProvider.FromFileName(
            frame.f_code.co_filename)
        if document is None:
            trace("dispatch_call has no document for", _dumpf(frame),
                  "- skipping trace!")
            return None
    else:
        trace("dispatch_call is self.botframe - returning tracer")
    return self.trace_dispatch
def __init__(self):
    """Create the debugger with every piece of session state unset.

    All session attributes start as None, ``recursiveData`` starts as an
    empty list, the ``bdb.Bdb`` base is initialised, a lock is allocated
    and finally ``self.reset()`` is called.
    """
    # Debug application / thread bookkeeping.
    self.debugApplication = self.debuggingThread = None
    self.debuggingThreadStateHandle = None
    self.stackSnifferCookie = self.stackSniffer = None
    self.codeContainerProvider = None
    # Break state and application debugger connection.
    self.breakFlags = self.breakReason = None
    self.appDebugger = self.appEventConnection = None
    # Anything at this level or below does not exist!
    self.logicalbotframe = None
    # The frame we are currently in.
    self.currentframe = None
    # Data saved for each reentry on this thread.
    self.recursiveData = []
    bdb.Bdb.__init__(self)
    self._threadprotectlock = _thread.allocate_lock()
    self.reset()
def reset(self):
    """Reset the ``bdb.Bdb`` base state, then call ``self.forget()``."""
    # Base class first, so forget() runs against freshly reset Bdb state.
    bdb.Bdb.reset(self)
    self.forget()
def execRcLines(self):
    """Execute the queued rc lines exactly once.

    Empty lines and lines starting with '#' are skipped; every other line
    is passed to ``onecmd`` without its trailing newline.
    """
    if not self.rcLines:
        return
    # Make local copy because of recursion
    rcLines = self.rcLines
    # executed only once
    self.rcLines = []
    for line in rcLines:
        # Only drop a real trailing newline: the last line of a file may
        # lack one, and blindly slicing would eat its final character.
        if line.endswith('\n'):
            line = line[:-1]
        if line and line[0] != '#':
            self.onecmd(line)

# Override Bdb methods
def __init__(self, pipe, redirect_stdio=True, allow_interruptions=False,
             use_speedups=True, skip=(__name__,)):
    """Initialise the debugger backend that talks to a frontend over *pipe*.

    Args:
        pipe: communication channel, stored as ``self.pipe``.
        redirect_stdio: when true, ``sys.stdin``, ``sys.stdout`` and
            ``sys.stderr`` are replaced by this object; the previous
            streams are always kept in ``self.old_stdio``.
        allow_interruptions: when true, a placeholder entry is added to
            ``self.breaks`` (see the comment below); the flag itself is
            stored as ``self.allow_interruptions``.
        use_speedups: stored as ``self.use_speedups``.
        skip: forwarded to ``bdb.Bdb.__init__`` when
            ``sys.version_info > (2, 7)``.  The default is an immutable
            tuple so it cannot be shared and mutated across calls.
    """
    # NOTE(review): ``breaks`` is declared global but never assigned in
    # the visible part of this method -- confirm it is still needed.
    global breaks
    kwargs = {}
    if sys.version_info > (2, 7):
        kwargs['skip'] = skip
    bdb.Bdb.__init__(self, **kwargs)
    self.frame = None
    self.i = 1  # sequential RPC call id
    self.waiting = False
    self.pipe = pipe  # for communication
    self._wait_for_mainpyfile = False
    self._wait_for_breakpoint = False
    self.mainpyfile = ""
    self._lineno = None  # last listed line number
    # ignore filenames (avoid spurious interaction specially on py2)
    self.ignore_files = [self.canonic(f) for f in (__file__, bdb.__file__)]
    # replace system standard input and output (send them thru the pipe)
    self.old_stdio = sys.stdin, sys.stdout, sys.stderr
    if redirect_stdio:
        sys.stdin = self
        sys.stdout = self
        sys.stderr = self
    if allow_interruptions:
        # fake breakpoint to prevent removing trace_dispatch on set_continue
        self.breaks[None] = []
    self.allow_interruptions = allow_interruptions
    self.burst = 0  # do not send notifications ("burst" mode)
    self.params = {}  # optional parameters for interaction
    # flags to reduce overhead (only stop at breakpoint or interrupt)
    self.use_speedups = use_speedups
    self.fast_continue = False