我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 traceback.print_stack()。
def lock(self, id=None):
    """Block until ``self.tryLock`` succeeds, retrying in fixed-length waits.

    Every attempt calls ``self.tryLock`` with a 5000 ms timeout and the
    caller-supplied *id*.  When ``self.debug`` is true, each failed attempt
    prints how long the caller has been waiting, the caller's stack and the
    last entry of ``self.tb`` (when there is one), while holding ``self.l``.
    """
    timeout_ms = 5000
    failed_attempts = 0
    while not self.tryLock(timeout_ms, id):
        failed_attempts += 1
        if not self.debug:
            continue
        # Hold the helper lock so the diagnostic block is printed as a unit.
        self.l.lock()
        try:
            waited_sec = failed_attempts * timeout_ms / 1000.
            print("Waiting for mutex lock (%0.1f sec). Traceback follows:" % waited_sec)
            traceback.print_stack()
            if len(self.tb) > 0:
                print("Mutex is currently locked from:\n")
                print(self.tb[-1])
            else:
                print("Mutex is currently locked from [???]")
        finally:
            self.l.unlock()
def assertEquals( exp, got ):
    """assertEquals(exp, got)

    Two objects test as "equal" if:

    * they are the same object as tested by the 'is' operator.
    * either object is a float or complex number and the absolute
      value of the difference between the two is less than 1e-8.
    * applying the equals operator ('==') returns True.

    On a mismatch an error line followed by the current call stack is
    written to stderr; nothing is raised and nothing is returned.
    """
    # ``float`` / ``complex`` are the same objects as the Python 2-only
    # ``types.FloatType`` / ``types.ComplexType`` and exist on Python 3 too.
    if exp is got:
        equal = True
    elif ( type( exp ) in ( float, complex ) or
           type( got ) in ( float, complex ) ):
        equal = abs( exp - got ) < 1e-8
    else:
        equal = ( exp == got )
    if not equal:
        # sys.stderr.write works on Python 2 and 3, unlike ``print >>``.
        sys.stderr.write( "Error: expected <%s> but got <%s>\n" % ( repr( exp ), repr( got ) ) )
        traceback.print_stack()
def get_one_hot_targets(target_file_path):
    """Read one label per line and build a one-hot row for each label.

    Returns ``(labels, one_hot, count)``: the list of lines with newline
    characters stripped from both ends, a ``count x count`` array whose
    diagonal is 1 and everything else 0, and the number of labels.  If the
    file cannot be opened a message and the current stack are printed and
    the result is an empty list, a ``0 x 0`` array and ``0``.
    """
    target = []
    n_target = 0
    try:
        with open(target_file_path) as label_file:
            target = [line.strip('\n') for line in label_file.readlines()]
        n_target = len(target)
    except IOError:
        print('Could not load the labels.txt file in the dataset. A '
              'dataset folder is expected in the "data/datasets" '
              'directory with the name that has been passed as an '
              'argument to this method. This directory should contain a '
              'file called labels.txt which contains a list of labels and '
              'corresponding folders for the labels with the same name as '
              'the labels.')
        traceback.print_stack()

    # Row i has its single 1 in column i, i.e. the identity matrix.
    one_hot_targets = np.eye(n_target)
    return target, one_hot_targets, n_target
def safe_repr(o):
    """safe_repr(anything) -> string

    Returns a string representation of an object, or a string containing a
    traceback, if that object's __repr__ raised an exception.
    """
    try:
        return repr(o)
    except Exception:
        # The placeholders read "<CLASS instance at ID with repr error
        # TEXT>", so the object id must come before the error text.
        # format_exc() reports the __repr__ failure itself rather than the
        # stack of whoever called safe_repr, and needs no StringIO buffer.
        return '<%s instance at %s with repr error %s>' % (
            _determineClassName(o), id(o), traceback.format_exc())
def handle_error(self, wrapper, exception, traceback_):
    """Report a failure for *wrapper* on stderr and return True.

    Writes the current call stack ("location") and the traceback object
    *traceback_* ("error").  If *wrapper* has a ``stack_where_defined``
    attribute, its entries are scanned from last to first and the first one
    whose directory starts with ``this_script_dir`` is reported as a
    ``file:line: exception`` line; without that attribute a ``??:??:`` line
    is written instead.
    """
    # sys.stderr.write replaces the Python 2-only ``print >> sys.stderr``.
    err = sys.stderr
    err.write("\n")
    err.write("---- location:\n")
    traceback.print_stack()
    err.write("---- error:\n")
    traceback.print_tb(traceback_)
    try:
        stack = wrapper.stack_where_defined
    except AttributeError:
        err.write("??:??: %s / %r\n" % (wrapper, exception))
    else:
        for (filename, line_number, function_name, text) in reversed(list(stack)):
            file_dir = os.path.dirname(os.path.abspath(filename))
            if file_dir.startswith(this_script_dir):
                err.write("%s:%i: %r\n" % (
                    os.path.join("..", "bindings", "python", os.path.basename(filename)),
                    line_number, exception))
                break
    return True
def run():
    """Show window 10147, wait for it to close, then offer to run the keymap addon.

    Any error raised along the way is printed with its traceback and
    reported to the user through a notification built from translated
    strings 32000 and 32066.
    """
    try:
        xbmc.executebuiltin("ActivateWindow(10147)")
        window = xbmcgui.Window(10147)
        xbmc.sleep(100)
        window.getControl(1).setLabel(translate(32000))
        window.getControl(5).setText(translate(32065))
        # Poll until the window is no longer active.
        while xbmc.getCondVisibility("Window.IsActive(10147)"):
            xbmc.sleep(100)
        ret = xbmcgui.Dialog().yesno(translate(32000), translate(32067))
        if ret:
            xbmc.executebuiltin("RunAddon(script.keymap)")
    except Exception:
        # print_exc() shows what actually failed; print_stack() only showed
        # where this handler runs, so the error itself was lost.
        traceback.print_exc()
        xbmc.executebuiltin("XBMC.Notification('"+translate(32000)+"','"+translate(32066)+"','2000','')")
async def test_ki_enabled_after_yield_briefly():
    """Check the KI-protection flag before and after a checkpoint.

    ``child`` is awaited once through a function decorated with
    ``enable_ki_protection`` and once through one decorated with
    ``disable_ki_protection``; in both cases
    ``_core.currently_ki_protected()`` must report the expected value on
    both sides of ``_core.checkpoint()``.
    """
    # The body awaits at function level, so this has to be ``async def``.
    @_core.enable_ki_protection
    async def protected():
        await child(True)

    @_core.disable_ki_protection
    async def unprotected():
        await child(False)

    async def child(expected):
        import traceback
        traceback.print_stack()
        assert _core.currently_ki_protected() == expected
        await _core.checkpoint()
        traceback.print_stack()
        assert _core.currently_ki_protected() == expected

    await protected()
    await unprotected()


# This also used to be broken due to
#   https://bugs.python.org/issue29590
def GetBalance(self, wallet, address, as_string=False):
    """Test-invoke ``balanceOf`` for *address* and return the scaled result.

    A ``UInt160`` address is first converted with ``Crypto.ToAddress``.  The
    first invocation result is divided by ``10 ** self.decimals``.  The
    value is returned as a ``Decimal``, or as a string formatted with
    ``self.decimals`` fractional digits when *as_string* is true.  If
    reading or converting the result fails, the error is logged, the
    current stack is printed and ``0`` is returned.
    """
    if type(address) is UInt160:
        address = Crypto.ToAddress(address)

    balance_call = [
        self.ScriptHash.ToString(),
        parse_param('balanceOf'),
        [parse_param(address, wallet)],
    ]
    _tx, _fee, balanceResults, _num_ops = TestInvokeContract(wallet, balance_call, None, False)

    try:
        raw_units = balanceResults[0].GetBigInteger()
        scaled = Decimal(raw_units) / Decimal(pow(10, self.decimals))
        if not as_string:
            return scaled
        return format(scaled, '.%sf' % self.decimals)
    except Exception as e:
        logger.error("could not get balance: %s " % e)
        traceback.print_stack()

    return 0
def formatException(self, ei):
    """
    Render the exception triple *ei* as text.

    *ei* is indexed as ``(type, value, traceback)``.  The text is whatever
    traceback.print_exception() writes for it, minus one trailing newline.
    """
    tb = ei[2]
    # Prefixing the caller's stack ("fullstack") is deliberately left out;
    # see issues #9427, #1553375:
    #     traceback.print_stack(tb.tb_frame.f_back, file=sio)
    with io.StringIO() as buffer:
        traceback.print_exception(ei[0], ei[1], tb, None, buffer)
        text = buffer.getvalue()
    if text.endswith("\n"):
        text = text[:-1]
    return text
def create_dashboard(dashboard_name, dashboard_dir=None):
    """Load ``<dashboard_dir>/<dashboard_name>.json`` and post it.

    *dashboard_dir* defaults to
    ``/etc/tendrl/monitoring-integration/grafana/dashboards``.

    Returns whatever ``_post_dashboard`` returns.

    Raises ``exceptions.FileNotFoundException`` when the JSON file does not
    exist, and re-raises ``exceptions.ConnectionFailedException`` after
    printing its traceback.
    """
    if not dashboard_dir:
        dashboard_dir = "/etc/tendrl/monitoring-integration/grafana/dashboards"
    dashboard_path = os.path.join(dashboard_dir,
                                  "{}.json".format(dashboard_name))

    if not os.path.exists(dashboard_path):
        raise exceptions.FileNotFoundException

    dashboard_data = utils.fread(dashboard_path)
    try:
        dashboard_json = json.loads(dashboard_data)
        return _post_dashboard(dashboard_json)
    except exceptions.ConnectionFailedException:
        # Show the failure itself and re-raise the original exception so
        # its message and traceback reach the caller; raising the bare
        # class again threw both away.
        traceback.print_exc()
        raise
def create_datasource():
    """Build the datasource description from ``NS.config.data`` and post it.

    Returns whatever ``_post_datasource`` returns.  On
    ``exceptions.ConnectionFailedException`` the traceback is logged at
    "info" level and the exception is re-raised.
    """
    try:
        config = maps.NamedDict(NS.config.data)
        url = "http://" + str(config.datasource_host) + ":" \
            + str(config.datasource_port)
        datasource_json = {'name': config.datasource_name,
                           'type': config.datasource_type,
                           'url': url,
                           'access': config.access,
                           'basicAuth': config.basicAuth,
                           'isDefault': config.isDefault}
        return _post_datasource(json.dumps(datasource_json))
    except exceptions.ConnectionFailedException:
        # traceback.print_stack() returns None, so the old message was
        # always the string "None"; format_exc() returns the text.
        logger.log("info", NS.get("publisher_id", None),
                   {'message': traceback.format_exc()})
        # Bare ``raise`` keeps the original exception's message/traceback.
        raise
def print_tb(func):
    """
    Decorator that prints the calling stack every time *func* is entered.

    The wrapped function's arguments and return value pass through
    unchanged.  Example:

    .. code-block:: python

        @print_tb
        def some_deeply_nested_function(...):
            ...
    """
    @wraps(func)
    def run_func(*args, **kwargs):
        # Dump the stack first, then delegate to the real function.
        traceback.print_stack()
        result = func(*args, **kwargs)
        return result

    return run_func
def createBestView(cursor, topXpercent):
    """Create the view ``fastest<topXpercent>Percent`` if it does not exist.

    The view joins ``measurements`` with every ``variant`` row whose
    bandwidth is at least ``(1 - topXpercent/100)`` times the best
    bandwidth recorded for the same ``measurement_id``.

    *topXpercent* must lie in ``[0, 100]``; otherwise an error is printed
    and the process exits with status -1.  The same happens when SQLite
    rejects the statement.
    """
    if(topXpercent < 0 or topXpercent > 100):
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(FAIL + "[TTC] ERROR: topXpercent not valid." + ENDC)
        exit(-1)

    # Only %d / %f placeholders are used, so nothing but numbers can be
    # interpolated into the statement.
    command = """
        CREATE VIEW IF NOT EXISTS 'fastest%dPercent' AS
        select * from measurements join (
            select v2.* from (
                select *,max(bandwidth) as maxBandwidth from variant group by measurement_id
            ) v1 inner join variant v2 on v1.measurement_id=v2.measurement_id
            where v2.bandwidth >= (%f * v1.maxBandwidth)
        ) fast on measurements.measurement_id = fast.measurement_id;
        """%(topXpercent, 1.0 - float(topXpercent)/100.)
    try:
        cursor.execute(command)
    except sqlite3.Error as e:
        print(" ".join([FAIL + "[TTC] ERROR (sql):", str(e.args[0]), ENDC]))
        traceback.print_stack()
        exit(-1)
def insertIntoVariant(cursor, blockA, blockB, prefetchDistance, BW, time, rankLoopOrder, rankBlocking, measurement_id, loop_id, tlbMisses, l2misses, _logFile):
    """Insert one row into ``variant`` and return its primary key.

    The values are interpolated with ``%d`` / ``%f``, so they must be
    numbers.  *BW* is stored in the ``bandwidth`` column and *loop_id* in
    ``loopOrder_id``.  The key is obtained from
    ``getLastPrimaryKey(cursor, _logFile)``.

    If SQLite rejects the statement, the statement, the error and the
    current stack are printed and the process exits with status -1.
    """
    command = """INSERT INTO variant(
            blockA ,
            blockB ,
            prefetchDistance ,
            bandwidth ,
            time ,
            tlbMisses,
            l2misses,
            rankLoopOrder,
            rankBlocking,
            measurement_id,
            loopOrder_id)
        VALUES (%d, %d, %d, %f, %f, %f, %f, %d, %d, %d, %d);"""%(
        blockA, blockB, prefetchDistance, BW, time, tlbMisses, l2misses,
        rankLoopOrder, rankBlocking, measurement_id, loop_id)
    try:
        cursor.execute(command)
    except sqlite3.Error as e:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(command)
        print(" ".join([FAIL + "[TTC] ERROR (sql):", str(e.args[0]), ENDC]))
        traceback.print_stack()
        exit(-1)
    primaryKey = getLastPrimaryKey(cursor, _logFile)
    return primaryKey
def _error_handler(display, event):
    """Error callback that always returns 0, optionally dumping the error.

    When ``pyglet.options['debug_x11']`` is true the error text, serial,
    request code, minor code, resource id and the Python stack are printed.
    """
    # By default, all errors are silently ignored: this has a better chance
    # of working than the default behaviour of quitting ;-)
    #
    # We've actually never seen an error that was our fault; they're always
    # driver bugs (and so the reports are useless).  Nevertheless, set
    # environment variable PYGLET_DEBUG_X11 to 1 to get dumps of the error
    # and a traceback (execution will continue).
    import pyglet
    if not pyglet.options['debug_x11']:
        return 0

    details = event.contents
    text_buffer = c_buffer(1024)
    xlib.XGetErrorText(display, details.error_code, text_buffer, len(text_buffer))
    print('X11 error:', text_buffer.value)
    print(' serial:', details.serial)
    print(' request:', details.request_code)
    print(' minor:', details.minor_code)
    print(' resource:', details.resourceid)

    import traceback
    print('Python stack trace (innermost last):')
    traceback.print_stack()
    return 0
def __init__(self, seconds, cb, *args, **kw):
    """Create a timer.

    seconds: The minimum number of seconds to wait before calling
    cb: The callback to call when the timer has expired
    *args: The arguments to pass to cb
    **kw: The keyword arguments to pass to cb

    This timer will not be run unless it is scheduled in a runloop by
    calling timer.schedule() or runloop.add_timer(timer).

    When the module-level ``_g_debug`` flag is true, the stack at creation
    time is captured into ``self.traceback``.
    """
    self.seconds = seconds
    self.tpl = (cb, args, kw)
    self.called = False
    if not _g_debug:
        return
    # Debug mode only: keep the creating stack in an in-memory buffer.
    self.traceback = six.StringIO()
    traceback.print_stack(file=self.traceback)
def update_writes(self, line, pc, lo, hi, stage, origpc=None, substage=None):
    """Record a write to ``[lo, hi]`` in ``self.writerangetable``.

    When *pc* is falsy, *line* is parsed as ``path:lineno``; otherwise an
    empty path and line 0 are used.  *origpc* falls back to *pc* when
    falsy.  *stage* is accepted but not used here.  If ``lo > hi`` a
    diagnostic and the current stack are printed, and the entry is still
    added.
    """
    if not pc:
        (path, lineno) = line.split(':')
        lineno = int(lineno)
    else:
        (path, lineno) = ('', 0)
    if not origpc:
        origpc = pc
    w = WriteDstResult(path, lineno,
                       '',
                       [intervaltree.Interval(lo, hi)],
                       pc, origpc, substage_name=substage)
    if lo > hi:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("%x > %x at %x" % (lo, hi, pc))
        traceback.print_stack()
    self.writerangetable.add_dsts_entry(w)
def ex(self, _exception):
    """Log func for exceptions.

    Builds a report with the active exception's type, the file name and
    line number of the frame that caught it, ``str(_exception)`` and the
    current call stack, then passes it to ``self.log(..., _print=True)``.
    """
    # Capture the stack in memory.  The previous version wrote it to a
    # randomly named file under /tmp and read it back, which was racy and
    # left the file behind on errors.
    trace_text = "".join(traceback.format_stack())

    exc_type, exc_obj, exc_tb = sys.exc_info()
    if exc_tb is None:
        # Called outside an ``except`` block: no active traceback to read.
        exc_type = type(_exception)
        fname = "<unknown>"
        lineno = 0
    else:
        fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        lineno = exc_tb.tb_lineno

    log_str = "Exception {1}:\n{2} ({3}): {4}\n{0}\n{5}{0}\n".format(
        "{0:=^20}".format(""),
        exc_type,
        fname,
        lineno,
        str(_exception),
        trace_text,
    )
    self.log(log_str, _print=True)
def __init__(self, mainMenu):
    """Set up the agents menu and display the agents known to *mainMenu*."""
    cmd.Cmd.__init__(self)

    self.mainMenu = mainMenu
    self.doc_header = 'Commands'

    # set the prompt text
    menu_label = helpers.color("agents", color="blue")
    self.prompt = ''.join(['(EmPyre: ', menu_label, ') > '])

    known_agents = self.mainMenu.agents.get_agents()
    messages.display_agents(known_agents)

# def preloop(self):
#     traceback.print_stack()

# print a nicely formatted help menu
# stolen/adapted from recon-ng
def __init__(self, mainMenu):
    """Set up the listeners menu and display the current listeners."""
    cmd.Cmd.__init__(self)
    self.doc_header = 'Listener Commands'
    self.mainMenu = mainMenu

    listeners = self.mainMenu.listeners

    # get all the the stock listener options
    self.options = listeners.get_listener_options()

    # set the prompt text
    menu_label = helpers.color("listeners", color="blue")
    self.prompt = ''.join(['(EmPyre: ', menu_label, ') > '])

    # display all active listeners on menu startup
    messages.display_listeners(self.mainMenu.listeners.get_listeners())

# def preloop(self):
#     traceback.print_stack()

# print a nicely formatted help menu
# stolen/adapted from recon-ng
def __init__(self, mainMenu, moduleName, agent=None):
    """Set up the menu for module *moduleName*.

    When *agent* is truthy it is passed through
    ``mainMenu.agents.get_agent_name`` and stored as the module's
    ``Agent`` option value.
    """
    cmd.Cmd.__init__(self)
    self.doc_header = 'Module Commands'
    self.mainMenu = mainMenu

    # get the current module/name
    self.moduleName = moduleName
    self.module = self.mainMenu.modules.modules[moduleName]

    # set the prompt text
    menu_label = helpers.color(self.moduleName, color="blue")
    self.prompt = ''.join(['(EmPyre: ', menu_label, ') > '])

    # nothing more to do unless this menu is being called from an agent menu
    if not agent:
        return

    # resolve the agent sessionID to a name, if applicable
    agent_name = self.mainMenu.agents.get_agent_name(agent)
    self.module.options['Agent']['Value'] = agent_name

# def preloop(self):
#     traceback.print_stack()
def unwrap_args(args, kw):
    """Replace every ``Expr`` in *args* and *kw* by its ``__backend__()``.

    Returns ``(newargs, newkw)``.  ``newargs`` is always a new list.
    ``newkw`` is a new dict when *kw* is a dict; otherwise *kw* is returned
    unchanged.  Values that are not ``Expr`` instances are passed through.
    """
    newargs = [x.__backend__() if isinstance(x, Expr) else x for x in args]
    if isinstance(kw, dict):
        # .items() exists on Python 2 and 3; .iteritems() is Python 2 only.
        newkw = {k: (v.__backend__() if isinstance(v, Expr) else v)
                 for k, v in kw.items()}
    else:
        newkw = kw
    return (newargs, newkw)
def checkNumbering(self):
    """Check the position and indentation bookkeeping of ``self.cardlist``.

    Every card whose ``_pos`` differs from its list index is reported.  For
    enabled cards the expected indent is tracked through the
    ``_INDENT_DEC`` / ``_INDENT_INC`` tags and mismatches are reported.  If
    any position error was found, a summary and the current stack are
    printed and ``self.renumber()`` is called.
    """
    misplaced = 0
    expected_indent = 0
    for index, card in enumerate(self.cardlist):
        if card._pos != index:
            misplaced += 1
            say("ERROR: Cards out of order pos=#%d it should be #%d" % (card.pos() + 1, index))
            say(str(card))

        # Disabled cards take no part in the indent bookkeeping.
        if not card.enable:
            continue

        if card.tag in _INDENT_DEC:
            expected_indent = max(0, expected_indent - 1)

        if card.indent() != expected_indent:
            say("ERROR: Indent out of order pos=#%d card._indent=%d it should be %d" % (index, card._indent, expected_indent))
            say(str(card))

        if card.tag in _INDENT_INC:
            expected_indent += 1

    if misplaced > 0:
        say("ERROR: %d cards found out of order. Correcting error" % (misplaced))
        say("*** Please contact %s ***" % (__email__))
        import traceback
        traceback.print_stack()
        self.renumber()
def autostart(reason, *args, **kwargs):
    """Install or remove the ``PluginComponent.getPlugins`` override.

    ``reason == 0`` installs it, keeping the original method in
    ``PluginComponent.pluginHider_baseGetPlugins``; any other value puts
    the original back and removes that attribute.  If the override is
    already in the requested state, a warning and a short stack trace are
    printed to stdout and nothing is changed.
    """
    hooked = hasattr(PluginComponent, 'pluginHider_baseGetPlugins')

    if reason == 0 and not hooked:
        PluginComponent.pluginHider_baseGetPlugins = PluginComponent.getPlugins
        PluginComponent.getPlugins = PluginComponent_getPlugins
        return

    if reason != 0 and hooked:
        PluginComponent.getPlugins = PluginComponent.pluginHider_baseGetPlugins
        del PluginComponent.pluginHider_baseGetPlugins
        return

    # Called twice in a row for the same phase: complain and do nothing.
    if reason == 0:
        print("[PluginHider] Something went wrong as our autostart handler was called multiple times for startup, printing traceback and ignoring.")
    else:
        print("[PluginHider] Something went wrong as our autostart handler was called multiple times for shutdown, printing traceback and ignoring.")
    import traceback, sys
    traceback.print_stack(limit=5, file=sys.stdout)
def handleError(self, record):
    """Print the current call stack; *record* is not used."""
    # print_stack() does the printing itself and returns None, so wrapping
    # it in a print statement only added a stray "None" line (and was a
    # SyntaxError on Python 3).
    traceback.print_stack()
def write(self, x):
    """Forward *x* to ``self.stdout`` and print the stack of the writer."""
    sink = self.stdout
    sink.write(x)
    # Shows which code path produced this piece of output.
    traceback.print_stack()
def run(self):
    """Periodically print the stack of every thread except the calling one.

    Loops until ``self._stop`` is ``True`` (checked while holding
    ``self.lock``), sleeping ``self.interval`` seconds between dumps.
    """
    while True:
        with self.lock:
            stop_requested = self._stop is True
        if stop_requested:
            return

        print("\n============= THREAD FRAMES: ================")
        for thread_id, frame in sys._current_frames().items():
            # Skip the thread that is doing the dumping.
            if thread_id != threading.current_thread().ident:
                print("<< thread %d >>" % thread_id)
                traceback.print_stack(frame)
        print("===============================================\n")

        time.sleep(self.interval)
def __getattr__(self, attr):
    """Look up *attr* in ``self.terminals`` (deprecated access path).

    Prints the current stack and a deprecation warning before returning
    the terminal.  Raises AttributeError when no such terminal exists.
    """
    if attr in self.terminals:
        import traceback
        traceback.print_stack()
        print("Warning: use of node.terminalName is deprecated; use node['terminalName'] instead.")
        return self.terminals[attr]
    raise AttributeError(attr)
def __getattr__(self, attr):
    ## Deliberately not given a docstring: this access path may be removed
    ## in the future.
    # Fail when the instance has no 'names' entry yet, or when *attr* is
    # not one of the names.
    if 'names' not in self.__dict__ or attr not in self.names:
        raise AttributeError(attr)

    import traceback
    traceback.print_stack()
    print("Warning: Use of Parameter.subParam is deprecated. Use Parameter.param(name) instead.")
    return self.param(attr)
def assertException( exceptionType, f ):
    """Assert that an exception of type exceptionType is thrown when the
    function f is evaluated.

    If f() returns normally, an error line and the current call stack are
    written to stderr.  Exceptions of any other type propagate.
    """
    try:
        f()
    except exceptionType:
        pass
    else:
        # sys.stderr.write works on Python 2 and 3, unlike ``print >>``.
        sys.stderr.write( "Error: expected <%s> to be thrown by function\n" % exceptionType.__name__ )
        traceback.print_stack()
def assertFalse( b ):
    """assertFalse(b)

    If b is truthy, write an error line and the current call stack to
    stderr.  Nothing is raised and nothing is returned.
    """
    if b:
        # sys.stderr.write works on Python 2 and 3, unlike ``print >>``.
        sys.stderr.write( "Error: expected value to be False\n" )
        traceback.print_stack()
def assertTrue( b ):
    """assertTrue(b)

    If b is falsy, write an error line and the current call stack to
    stderr.  Nothing is raised and nothing is returned.
    """
    if not b:
        # sys.stderr.write works on Python 2 and 3, unlike ``print >>``.
        sys.stderr.write( "Error: expected value to be True\n" )
        traceback.print_stack()


####
# Test NamedSequences
####
def fake_conn_request(self):
    """Stand-in request: print the caller's stack and flag a missing mock."""
    # The stack trace shows where the unmocked API call comes from.
    traceback.print_stack()

    # Setting this flag is what forces the test to fail.
    self.missing_mocks = True
def _print_greenthreads():
    """Print index, object and stack for each object found by
    ``_find_objects(greenlet.greenlet)``, each followed by a blank line."""
    found = _find_objects(greenlet.greenlet)
    for index, green in enumerate(found):
        print(index, green)
        traceback.print_stack(green.gr_frame)
        print()
def _print_nativethreads():
    """Print the id and stack of every entry in ``sys._current_frames()``,
    each followed by a blank line."""
    frames_by_thread = sys._current_frames()
    for thread_id, top_frame in frames_by_thread.items():
        print(thread_id)
        traceback.print_stack(top_frame)
        print()
def __call__(self, *args, **kw):
    """Log the call, write the stack to ``log.logfile`` and fail an assert.

    This callable is not meant to be invoked: every call ends in
    ``assert 0``.
    """
    import traceback
    owner = reflect.qual(self.my_class)
    log.msg('instance method %s.%s' % (owner, self.name))
    log.msg('being called with %r %r' % (args, kw))
    traceback.print_stack(file=log.logfile)
    assert 0
def f():
    """Print the current call stack to standard output."""
    destination = sys.stdout
    traceback.print_stack(file=destination)
def traceback_3(self):
    """Print a traceback header and a stack limited by ``tb_preparation()``.

    The value returned by ``self.tb_preparation()`` is negated and passed
    as ``limit`` to ``traceback.print_stack``.  Afterwards
    ``sys.tracebacklimit`` is set to ``None``.
    """
    depth = self.tb_preparation()
    print('Traceback (most recent call last):')
    traceback.print_stack(limit=-depth)
    sys.tracebacklimit = None


##############################################################################
# TypeError session.
##############################################################################

# The argument(s) must be (a) number(s).