Python sys 模块,_current_frames() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sys._current_frames()。
def print_all_stacktraces():
    """Print the current stack trace of every thread to stdout.

    The dump is framed by START/END banner lines.  Each thread gets a
    ``# ThreadID: <ident> <name>`` header followed by one ``File: ...`` line
    per stack entry and, when available, the stripped source line.
    """
    # Resolve thread names once instead of rescanning threading.enumerate()
    # for every frame; idents without a known thread get an empty name.
    names = {t.ident: t.name for t in threading.enumerate()}

    print("\n*** STACKTRACE - START ***\n")
    code = []
    for thread_id, stack in sys._current_frames().items():
        code.append("\n# ThreadID: %s %s" % (thread_id, names.get(thread_id, '')))
        for filename, lineno, name, source in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename,
                                                        lineno, name))
            if source:
                code.append(" %s" % (source.strip()))
    for entry in code:
        print(entry)
    print("\n*** STACKTRACE - END ***\n")
def process_sample(self, signal_frame, sample_time, main_thread_id):
    """Fold one stack sample of every live thread into ``self.profile``.

    For each thread reported by ``sys._current_frames()`` the stack returned
    by ``self.recover_stack`` is walked from its last entry to its first,
    descending the profile tree through ``find_or_add_child``; the node that
    is reached is then incremented by ``sample_time``.  The thread whose
    ident equals ``main_thread_id`` is sampled through ``signal_frame``
    instead of its current frame.  Nothing happens when ``self.profile`` is
    falsy.

    :param signal_frame: frame used for the thread ``main_thread_id``
    :param sample_time: value passed to ``increment`` for each sampled stack
    :param main_thread_id: ident of the thread represented by ``signal_frame``
    """
    if not self.profile:
        return

    # time.clock() was removed in Python 3.8; only fall back to it on
    # interpreters that predate time.perf_counter().
    timer = getattr(time, 'perf_counter', None) or time.clock
    start = timer()

    current_frames = sys._current_frames()
    items = current_frames.items()
    for thread_id, thread_frame in items:
        if thread_id == main_thread_id:
            thread_frame = signal_frame

        stack = self.recover_stack(thread_frame)
        if stack:
            current_node = self.profile
            for frame in reversed(stack):
                current_node = current_node.find_or_add_child(str(frame))
            current_node.increment(sample_time, 1)

        # Kept from the original: drop the per-thread references right away
        # (presumably so sampled frames are not kept alive -- TODO confirm).
        thread_id, thread_frame, stack = None, None, None

    items = None
    current_frames = None

    # Account for the time spent sampling.
    self.profile._overhead += (timer() - start)
def worker_int(worker):
    """Log the stack of every thread when the worker gets INT or QUIT.

    An info record announces the signal; the formatted dump of all threads
    is then emitted as one debug record on ``worker.log``.
    """
    worker.log.info("worker received INT or QUIT signal")

    # Imported locally so the hook does not rely on module-level imports.
    import threading, sys, traceback

    names_by_ident = {}
    for th in threading.enumerate():
        names_by_ident[th.ident] = th.name

    report = []
    for ident, top_frame in sys._current_frames().items():
        header = "\n# Thread: %s(%d)" % (names_by_ident.get(ident, ""), ident)
        report.append(header)
        for path, line_no, func, text in traceback.extract_stack(top_frame):
            report.append('File: "%s", line %d, in %s' % (path, line_no, func))
            if text:
                report.append(" %s" % (text.strip()))
    worker.log.debug("\n".join(report))
def worker_int(worker):
    """Dump all thread stacks to the worker log on INT or QUIT.

    Emits one info record about the signal and one debug record holding the
    formatted stack of every thread.
    """
    worker.log.info("worker received INT or QUIT signal")

    # Local imports keep this hook self-contained.
    import threading, sys, traceback

    thread_names = {th.ident: th.name for th in threading.enumerate()}

    lines = []
    for ident, top_frame in sys._current_frames().items():
        lines.append("\n# Thread: %s(%d)" % (thread_names.get(ident, ""), ident))
        for entry in traceback.extract_stack(top_frame):
            path, line_no, func, text = entry
            lines.append('File: "%s", line %d, in %s' % (path, line_no, func))
            if text:
                lines.append(" %s" % (text.strip()))
    worker.log.debug("\n".join(lines))
def sigquit_handler(sig, frame):
    """Print the stack of every thread; meant as a SIGQUIT handler.

    Useful for debugging deadlocks: send the signal with
    ``kill -s QUIT <PID>`` or Ctrl+Backslash.  Neither argument is used.
    """
    print("Dumping stack traces for all threads in PID {}".format(os.getpid()))

    id_to_name = {th.ident: th.name for th in threading.enumerate()}

    report = []
    for thread_id, stack in sys._current_frames().items():
        thread_name = id_to_name.get(thread_id, "")
        report.append("\n# Thread: {}({})".format(thread_name, thread_id))
        for entry in traceback.extract_stack(stack):
            filename, line_number, name, text = entry
            report.append('File: "{}", line {}, in {}'
                          .format(filename, line_number, name))
            if text:
                report.append(" {}".format(text.strip()))
    print("\n".join(report))
def _get_current_traceback(self, thread ):
'''????????????
:param thread: ????????
:type thread: Thread
'''
for thread_id, stack in sys._current_frames().items():
if thread_id != thread.ident:
continue
tb = "Traceback ( thread-%d possibly hold at ):\n" % thread_id
for filename, lineno, name, line in traceback.extract_stack(stack):
tb += ' File: "%s", line %d, in %s\n' % (filename, lineno, name)
if line:
tb += " %s\n" % (line.strip())
return tb
else:
raise RuntimeError("thread not found")
def get_thread_traceback(thread):
    '''Return a formatted traceback of where ``thread`` currently executes.

    :param thread: thread whose current stack is rendered
    :type thread: Thread
    :return: multi-line text, one ``File: ...`` line per stack entry plus
        the stripped source line when it is available
    :raises RuntimeError: if ``sys._current_frames()`` has no frame for
        ``thread.ident``
    '''
    # The frame map is keyed by thread ident, so look it up directly rather
    # than scanning every entry.
    stack = sys._current_frames().get(thread.ident)
    if stack is None:
        raise RuntimeError("thread not found")

    # Collect the pieces and join once instead of growing a string.
    parts = ["Traceback ( thread-%d possibly hold at ):\n" % thread.ident]
    for filename, lineno, name, line in traceback.extract_stack(stack):
        parts.append(' File: "%s", line %d, in %s\n' % (filename, lineno, name))
        if line:
            parts.append(" %s\n" % (line.strip()))
    return "".join(parts)
def enable_tracing(self):
    """ Turn tracing on when it is off and the debugged program has started.

    The tracer is installed for threads created later and on every frame of
    every existing thread except the debugger thread.

    :return: True if tracing is enabled when the call returns, False else.
    """
    _logger.x_debug("enable_tracing()")

    # Nothing to do when tracing is already on or execution has not begun.
    if self.tracing_enabled or not self.execution_started:
        return self.tracing_enabled

    # Cover threads that will be started from now on.
    threading.settrace(self._tracer)

    # Restore or set the trace function on all existing frames apart from
    # the debugger's own thread.
    for thr in threading.enumerate():
        if thr.ident == self.debugger_thread_ident:
            continue
        frame = sys._current_frames()[thr.ident]
        while frame:
            frame.f_trace = self._tracer
            frame = frame.f_back

    iksettrace._set_trace_on(self._tracer, self.debugger_thread_ident)
    self.tracing_enabled = True
    return self.tracing_enabled
def worker_int(worker):
    """Log the stack of every thread when the worker gets INT or QUIT.

    An info record announces the signal; the formatted dump of all threads
    is then emitted as one debug record on ``worker.log``.
    """
    worker.log.info("worker received INT or QUIT signal")

    # Imported locally so the hook does not rely on module-level imports.
    import threading, sys, traceback

    names_by_ident = {}
    for th in threading.enumerate():
        names_by_ident[th.ident] = th.name

    report = []
    for ident, top_frame in sys._current_frames().items():
        header = "\n# Thread: %s(%d)" % (names_by_ident.get(ident, ""), ident)
        report.append(header)
        for path, line_no, func, text in traceback.extract_stack(top_frame):
            report.append('File: "%s", line %d, in %s' % (path, line_no, func))
            if text:
                report.append(" %s" % (text.strip()))
    worker.log.debug("\n".join(report))
def timeout_response() -> chalice.Response:
    """
    Build the chalice Response that reports a request timeout.

    The JSON body carries the gateway-timeout status, a ``timed_out`` code,
    a title and the formatted stack of every running thread except the
    calling one.
    """
    frames = sys._current_frames()
    own_ident = threading.get_ident()

    # Stack of each *other* thread, keyed by thread ident.
    traces = {}
    for ident, frame in frames.items():
        if ident == own_ident:
            continue
        traces[ident] = traceback.format_stack(frame)

    problem = dict(
        status=requests.codes.gateway_timeout,
        code="timed_out",
        title="Timed out processing request.",
        traces=traces,
    )
    payload = json.dumps(problem)
    return chalice.Response(
        status_code=problem['status'],
        headers={"Content-Type": "application/problem+json"},
        body=payload,
    )
def setUpClass(cls):
    """Start a daemon watchdog thread that force-quits a locked-up test run.

    The watchdog polls every two seconds.  It returns quietly as soon as
    ``cls.__testing__`` is false; once ``cls.MAX_TEST_TIME`` seconds have
    passed since this call it writes the stack of every other thread to
    stderr and terminates the process with ``os._exit(1)``.
    """
    cls.__lockup_timestamp__ = time.time()

    def check_twisted():
        while time.time() - cls.__lockup_timestamp__ < cls.MAX_TEST_TIME:
            time.sleep(2)
            # If the test class completed normally, exit
            if not cls.__testing__:
                return

        # If we made it here, there is a serious issue which we cannot recover from.
        # Most likely the Twisted threadpool got into a deadlock while shutting down.
        import os, traceback

        # sys.stderr.write() replaces the ``print >> sys.stderr`` statements,
        # which only work on Python 2; the emitted text is unchanged.
        sys.stderr.write("The test-suite locked up! Force quitting! Thread dump:\n")
        own_ident = threading.current_thread().ident
        for tid, stack in sys._current_frames().items():
            if tid != own_ident:
                sys.stderr.write("THREAD#%d\n" % tid)
                for line in traceback.format_list(traceback.extract_stack(stack)):
                    sys.stderr.write("| %s\n" % line[:-1].replace('\n', '\n| '))

        # Hard exit: a normal shutdown could block on the deadlocked threads.
        os._exit(1)

    t = threading.Thread(target=check_twisted)
    t.daemon = True
    t.start()
def test_clear_threads_states_after_fork(self):
# Issue #17094: check that threads states are cleared after fork()
# start a bunch of threads
threads = []
for i in range(16):
t = threading.Thread(target=lambda : time.sleep(0.3))
threads.append(t)
t.start()
pid = os.fork()
if pid == 0:
# check that threads states have been cleared
if len(sys._current_frames()) == 1:
os._exit(0)
else:
os._exit(1)
else:
_, status = os.waitpid(pid, 0)
self.assertEqual(0, status)
for t in threads:
t.join()
def sigquit_handler(sig, frame):
    """Print the stack of every thread; meant as a SIGQUIT handler.

    Helps to debug deadlocks: trigger it with ``kill -s QUIT <PID>`` or
    Ctrl+Backslash.  Neither argument is used.
    """
    print("Dumping stack traces for all threads in PID {}".format(os.getpid()))

    id_to_name = {}
    for th in threading.enumerate():
        id_to_name[th.ident] = th.name

    dump = []
    for thread_id, stack in sys._current_frames().items():
        dump.append("\n# Thread: {}({})"
                    .format(id_to_name.get(thread_id, ""), thread_id))
        for filename, line_number, name, text in traceback.extract_stack(stack):
            location = 'File: "{}", line {}, in {}'.format(filename, line_number, name)
            dump.append(location)
            if text:
                dump.append(" {}".format(text.strip()))
    print("\n".join(dump))
def worker_int(worker):
    """Dump all thread stacks to the worker log on INT or QUIT.

    Emits one info record about the signal and one debug record holding the
    formatted stack of every thread.
    """
    worker.log.info("worker received INT or QUIT signal")

    # Local imports keep this hook self-contained.
    import threading, sys, traceback

    thread_names = {th.ident: th.name for th in threading.enumerate()}

    lines = []
    for ident, top_frame in sys._current_frames().items():
        lines.append("\n# Thread: %s(%d)" % (thread_names.get(ident, ""), ident))
        for entry in traceback.extract_stack(top_frame):
            path, line_no, func, text = entry
            lines.append('File: "%s", line %d, in %s' % (path, line_no, func))
            if text:
                lines.append(" %s" % (text.strip()))
    worker.log.debug("\n".join(lines))
def iter_thread_frames():
    """Yield ``(ident, frame)`` pairs for native threads, then for greenlets.

    First pass: every entry of ``sys._current_frames()`` is yielded except
    the one whose ident maps to ``MAIN_UUID`` in ``IDENT_TO_UUID``; that
    frame is remembered instead.  Second pass: each enumerated thread that
    carries a ``_greenlet`` yields its ``gr_frame`` or, when that is empty,
    the remembered main-thread frame.
    """
    main_thread_frame = None
    for ident, frame in sys._current_frames().items():
        if IDENT_TO_UUID.get(ident) != MAIN_UUID:
            yield ident, frame
        else:
            # the MainThread should be shown in its "greenlet" version
            main_thread_frame = frame

    for thread in threading.enumerate():
        greenlet = getattr(thread, '_greenlet', None)
        if not greenlet:
            # some inbetween state, before greenlet started or after it died?...
            continue
        if thread._greenlet.gr_frame:
            yield thread.ident, thread._greenlet.gr_frame
        else:
            # a thread with greenlet but without gr_frame will be fetched from sys._current_frames
            # If we switch to another greenlet by the time we get there we will get inconsistent dup of threads.
            # TODO - make best-effort attempt to show coherent thread dump
            yield thread.ident, main_thread_frame
def test_clear_threads_states_after_fork(self):
# Issue #17094: check that threads states are cleared after fork()
# start a bunch of threads
threads = []
for i in range(16):
t = threading.Thread(target=lambda : time.sleep(0.3))
threads.append(t)
t.start()
pid = os.fork()
if pid == 0:
# check that threads states have been cleared
if len(sys._current_frames()) == 1:
os._exit(0)
else:
os._exit(1)
else:
_, status = os.waitpid(pid, 0)
self.assertEqual(0, status)
for t in threads:
t.join()
def threadDump(signum, frame):
    """Signal handler for dumping thread stack frames to stdout.

    Prints a timestamped banner, then for every thread (sorted by ident) its
    id, the reference count of its top frame and its formatted stack.
    Neither argument is read; ``frame`` is reused as a local.
    """
    # Single-argument print(...) calls behave the same on Python 2 and 3,
    # unlike the print statements they replace.
    print("")
    print("App server has been signaled to attempt a thread dump.")
    print("")
    print("Thread stack frame dump at %s" % (asclocaltime(),))
    sys.stdout.flush()
    frames = sys._current_frames()
    print("")
    print("-" * 79)
    print("")
    for threadID in sorted(frames):
        frame = frames[threadID]
        print("Thread ID: %d (reference count = %d)" % (
            threadID, sys.getrefcount(frame)))
        print(''.join(traceback.format_list(traceback.extract_stack(frame))))
    print("-" * 79)
    sys.stdout.flush()
def get_full_thread_dump():
    """Return a string containing a traceback for all threads.

    The text is framed by "Begin stack trace" (stamped with the current GMT
    time) and "End stack trace" markers; threads whose ident has no entry in
    ``threading.enumerate()`` are labelled "unknown".
    """
    stamp = strftime("%Y-%m-%d %H:%M:%S", gmtime())
    thread_names = {thread.ident: thread.name
                    for thread in threading.enumerate()}

    with io.StringIO() as output:
        emit = output.write
        emit("\n>>>> Begin stack trace (%s) >>>>\n" % stamp)
        for ident, stack in current_frames().items():
            label = thread_names.get(ident, "unknown")
            emit("\n# ThreadID: %s (%s)\n" % (ident, label))
            for filename, lineno, name, text in traceback.extract_stack(stack):
                emit('File: "%s", line %d, in %s\n' % (filename, lineno, name))
                if text:
                    emit(" %s\n" % (text.strip()))
        emit("\n<<<< End stack trace <<<<\n\n")
        # Read the buffer before the with-block closes it.
        thread_dump = output.getvalue()
    return thread_dump
def _captureThreadStacks(self):
"""Capture the stacks for all currently running threads.
:return: A list of ``(thread-description, stack)`` tuples. See
`traceback.format_stack` for the format of ``stack``.
"""
threads = {t.ident: t for t in threading.enumerate()}
def describe(ident):
if ident in threads:
return repr(threads[ident])
else:
return "<*Unknown* %d>" % ident
return [
(describe(ident), traceback.format_stack(frame))
for ident, frame in sys._current_frames().items()
]
def _debug_lock_release(self):
errbuf = ""
owner = self._RLock__owner
if not owner:
return errbuf
# Get stack of current owner, if lock is owned.
for tid, stack in sys._current_frames().items():
if tid != owner.ident:
continue
errbuf += "Stack of owner:\n"
for filenm, lno, func, txt in \
traceback.extract_stack(stack):
errbuf += " File: \"{0}\", line {1:d},in {2}".format(
filenm, lno, func)
if txt:
errbuf += "\n {0}".format(txt.strip())
errbuf += "\n"
break
return errbuf
def dump_stacks(self):
    '''
    Return the formatted stack of every thread known to ``threading``.

    Meant for debugging, e.g. when a deadlock happens.  Frames whose thread
    ident is not reported by ``threading.enumerate()`` are skipped.
    Borrowed from: http://blog.ziade.org/2012/05/25/zmq-and-gevent-debugging-nightmares/
    '''
    # threads
    names = {th.ident: th.name for th in threading.enumerate()}

    sections = []
    for ident, frame in sys._current_frames().items():
        if ident in names:
            sections.extend((
                'Thread 0x%x (%s)\n' % (ident, names[ident]),
                ''.join(traceback.format_stack(frame)),
                '\n',
            ))
    return ''.join(sections)
def checkAddressSanitizerError(output, router, component):
    """Check ``output`` for an AddressSanitizer error report.

    When a report header is found, a notice naming ``router`` and
    ``component`` is written to stderr.  If the report body (the text
    between two occurrences of the ``==PID==`` marker) can be extracted, it
    is echoed to stderr and appended to ``/tmp/AddressSanitzer.txt`` together
    with the file name and function name of the caller two frames up.

    :param output: text to scan
    :param router: router name used in the messages
    :param component: component name used in the stderr notice
    :return: True if an AddressSanitizer error was found, False otherwise
    """
    # Raw string: "\s" in a plain literal is an invalid escape sequence.
    addressSantizerError = re.search(r'(==[0-9]+==)ERROR: AddressSanitizer: ([^\s]*) ', output)
    if not addressSantizerError:
        return False

    sys.stderr.write("%s: %s triggered an exception by AddressSanitizer\n" % (router, component))
    # Sanitizer Error found in log
    pidMark = addressSantizerError.group(1)
    addressSantizerLog = re.search('%s(.*)%s' % (pidMark, pidMark), output, re.DOTALL)
    if addressSantizerLog:
        # The frame two levels up is looked up once with sys._getframe(2).
        # The old sys._current_frames().values()[0] indexed a dict view
        # (a TypeError on Python 3) and could pick another thread's frame.
        callerFrame = sys._getframe(2)
        callingTest = os.path.basename(callerFrame.f_globals['__file__'])
        callingProc = callerFrame.f_code.co_name
        logLines = addressSantizerLog.group(1).splitlines()
        with open("/tmp/AddressSanitzer.txt", "a") as addrSanFile:
            sys.stderr.write('\n'.join(logLines) + '\n')
            addrSanFile.write("## Error: %s\n\n" % addressSantizerError.group(2))
            addrSanFile.write("### AddressSanitizer error in topotest `%s`, test `%s`, router `%s`\n\n" % (callingTest, callingProc, router))
            addrSanFile.write(' '+ '\n '.join(logLines) + '\n')
            addrSanFile.write("\n---------------\n")
    return True
def worker_int(worker):
    """Log the stack of every thread when the worker gets INT or QUIT.

    An info record announces the signal; the formatted dump of all threads
    is then emitted as one debug record on ``worker.log``.
    """
    worker.log.info("worker received INT or QUIT signal")

    # Imported locally so the hook does not rely on module-level imports.
    import threading, sys, traceback

    names_by_ident = {}
    for th in threading.enumerate():
        names_by_ident[th.ident] = th.name

    report = []
    for ident, top_frame in sys._current_frames().items():
        header = "\n# Thread: %s(%d)" % (names_by_ident.get(ident, ""), ident)
        report.append(header)
        for path, line_no, func, text in traceback.extract_stack(top_frame):
            report.append('File: "%s", line %d, in %s' % (path, line_no, func))
            if text:
                report.append(" %s" % (text.strip()))
    worker.log.debug("\n".join(report))
def run(self):
    """Periodically print the stack of every other thread.

    Each round first checks ``self._stop`` under ``self.lock`` and returns
    when it is True; otherwise the frames of all threads except the calling
    one are printed, followed by a sleep of ``self.interval`` seconds.
    """
    while True:
        with self.lock:
            if self._stop is True:
                return

        print("\n============= THREAD FRAMES: ================")
        own_ident = threading.current_thread().ident
        for thread_id, frame in sys._current_frames().items():
            if thread_id != own_ident:
                print("<< thread %d >>" % thread_id)
                traceback.print_stack(frame)
        print("===============================================\n")

        time.sleep(self.interval)
def run(self):
    """Loop forever, printing the frames of all other threads each round.

    The loop ends when ``self._stop`` is True (checked while holding
    ``self.lock``).  Between rounds the thread sleeps ``self.interval``
    seconds.
    """
    keep_going = True
    while keep_going:
        with self.lock:
            if self._stop is True:
                return

        print("\n============= THREAD FRAMES: ================")
        for thread_id, frame in sys._current_frames().items():
            is_self = thread_id == threading.current_thread().ident
            if not is_self:
                print("<< thread %d >>" % thread_id)
                traceback.print_stack(frame)
        print("===============================================\n")

        time.sleep(self.interval)
def _print_nativethreads():
for threadId, stack in sys._current_frames().items():
print(threadId)
traceback.print_stack(stack)
print()
def delete(self):
    """Terminate the running workloads and report the result as JSON.

    The current thread frames are logged and printed first.  Any exception
    raised while terminating is turned into ``abort(400, <message>)``.
    """
    self.logger.info("Threads: %s" % sys._current_frames())
    # Call form of print: the bare statement is a syntax error on Python 3
    # and this spelling prints the same text on Python 2.
    print(sys._current_frames())
    try:
        return jsonify({'Slaves': storperf.terminate_workloads()})
    except Exception as e:
        abort(400, str(e))
def worker_int(worker):
    """Dump all thread stacks to the worker log on INT or QUIT.

    Emits one info record about the signal and one debug record holding the
    formatted stack of every thread.
    """
    worker.log.info("worker received INT or QUIT signal")

    # Local imports keep this hook self-contained.
    import threading, sys, traceback

    thread_names = {th.ident: th.name for th in threading.enumerate()}

    lines = []
    for ident, top_frame in sys._current_frames().items():
        lines.append("\n# Thread: %s(%d)" % (thread_names.get(ident, ""), ident))
        for entry in traceback.extract_stack(top_frame):
            path, line_no, func, text = entry
            lines.append('File: "%s", line %d, in %s' % (path, line_no, func))
            if text:
                lines.append(" %s" % (text.strip()))
    worker.log.debug("\n".join(lines))
def test_current_frames(self):
have_threads = True
try:
import _thread
except ImportError:
have_threads = False
if have_threads:
self.current_frames_with_threads()
else:
self.current_frames_without_threads()
# Test sys._current_frames() in a WITH_THREADS build.
def current_frames_without_threads(self):
    """Check ``sys._current_frames()`` on a build without threads."""
    # Not much happens here: there is only one thread, with artificial
    # "thread id" 0.
    frames_by_thread = sys._current_frames()
    self.assertEqual(len(frames_by_thread), 1)
    self.assertIn(0, frames_by_thread)
    only_frame = frames_by_thread[0]
    self.assertTrue(only_frame is sys._getframe())
def test_getframe(self):
    """Check argument validation and the code object of ``sys._getframe``.

    Two arguments raise TypeError, an impossibly deep frame raises
    ValueError, and the frame returned without arguments runs this test's
    own code object.
    """
    self.assertRaises(TypeError, sys._getframe, 42, 42)
    self.assertRaises(ValueError, sys._getframe, 2000000000)
    # ``__code__`` replaces ``im_func.func_code``, which exists only on
    # Python 2 unbound methods; this matches the other copy of this test.
    self.assertTrue(
        SysModuleTest.test_getframe.__code__ \
        is sys._getframe().f_code
    )
# sys._current_frames() is a CPython-only gimmick.
def test_current_frames(self):
have_threads = True
try:
import thread
except ImportError:
have_threads = False
if have_threads:
self.current_frames_with_threads()
else:
self.current_frames_without_threads()
# Test sys._current_frames() in a WITH_THREADS build.
def current_frames_without_threads(self):
    """Verify the single-entry frame map of a build without threads."""
    # Not much happens here: there is only one thread, with artificial
    # "thread id" 0.
    frame_map = sys._current_frames()
    entry_count = len(frame_map)
    self.assertEqual(entry_count, 1)
    self.assertIn(0, frame_map)
    self.assertTrue(frame_map[0] is sys._getframe())
def test_getframe(self):
    """Check argument validation and the code object of ``sys._getframe``.

    Two arguments raise TypeError, an impossibly deep frame raises
    ValueError, and the frame returned without arguments runs this test's
    own code object.
    """
    self.assertRaises(TypeError, sys._getframe, 42, 42)
    self.assertRaises(ValueError, sys._getframe, 2000000000)
    # ``__code__`` replaces ``im_func.func_code``, which exists only on
    # Python 2 unbound methods; this matches the other copy of this test.
    self.assertTrue(
        SysModuleTest.test_getframe.__code__ \
        is sys._getframe().f_code
    )
# sys._current_frames() is a CPython-only gimmick.
def test_current_frames(self):
have_threads = True
try:
import thread
except ImportError:
have_threads = False
if have_threads:
self.current_frames_with_threads()
else:
self.current_frames_without_threads()
# Test sys._current_frames() in a WITH_THREADS build.
def current_frames_without_threads(self):
    """Check ``sys._current_frames()`` on a build without threads."""
    # Not much happens here: there is only one thread, with artificial
    # "thread id" 0.
    frames_by_thread = sys._current_frames()
    self.assertEqual(len(frames_by_thread), 1)
    self.assertIn(0, frames_by_thread)
    only_frame = frames_by_thread[0]
    self.assertTrue(only_frame is sys._getframe())
def dumpstacks(self, data):
    """Return the stack of every thread as CRLF-joined text.

    ``data`` is not used.
    """
    import threading, sys, traceback

    id2name = {th.ident: th.name for th in threading.enumerate()}

    rows = []
    for ident, top_frame in sys._current_frames().items():
        rows.append("\n# Thread: %s(%d)" % (id2name.get(ident,""), ident))
        for entry in traceback.extract_stack(top_frame):
            filename, lineno, name, text = entry
            rows.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            if text:
                rows.append(" %s" % (text.strip()))
    return "\r\n".join(rows)
def iter_frames(self, t):
    """Return ``[topmost frame]`` of thread ``t``, or ``[]`` if it has none."""
    # sys._current_frames(): dictionary with thread id -> topmost frame
    topmost = sys._current_frames().get(t.ident)
    return [] if topmost is None else [topmost]
# IFDEF CYTHON
# def create_db_frame(self, *args, **kwargs):
# raise AssertionError('This method should not be called on cython (PyDbFrame should be used directly).')
# ELSE
# just create the db frame directly
def create_db_frame(self, args):
    """Create a ``PyDBFrame`` for ``args``, register it and return it.

    The last element of ``args`` is stored on the new object as ``frame``.
    """
    # The frame must be cached as a weak-ref (we return the actual db frame,
    # which will be kept alive until its trace_dispatch method is not
    # referenced anymore).  That's a large workaround because:
    # 1. we can't have weak-references to python frame objects
    # 2. only from 2.5 onwards we have _current_frames support from the interpreter
    wrapper = PyDBFrame(args)
    wrapper.frame = args[-1]
    self._AddDbFrame(wrapper)
    return wrapper
def __str__(self):
return 'State:%s Stop:%s Cmd: %s Kill:%s Frames:%s' % (
self.pydev_state, self.pydev_step_stop, self.pydev_step_cmd, self.pydev_notify_kill, len(self.iter_frames(None)))
#=======================================================================================================================
# NOW, WE HAVE TO DEFINE WHICH THREAD INFO TO USE
# (whether we have to keep references to the frames or not)
# from version 2.5 onwards, we can use sys._current_frames to get a dict with the threads
# and frames, but to support other versions, we can't rely on that.
#=======================================================================================================================
def run(self):
    """Report threads that are still alive ten seconds after being started.

    After sleeping ten seconds, writes a thread dump to stderr: one section
    per entry of ``sys._current_frames()`` with the thread's name and daemon
    flag (or its bare id when the name is unknown) and its stack.  When the
    top frame has a local named ``self`` it is written to stderr
    immediately, ahead of the collected dump.
    """
    time.sleep(10)

    thread_id_to_name = {}
    try:
        for t in threading.enumerate():
            thread_id_to_name[t.ident] = '%s (daemon: %s)' % (t.name, t.daemon)
    except Exception:
        # Best effort: names are optional.  Narrowed from a bare ``except:``
        # so KeyboardInterrupt and SystemExit are no longer swallowed.
        pass

    stack_trace = [
        '===============================================================================',
        'pydev pyunit runner: Threads still found running after tests finished',
        '================================= Thread Dump =================================']

    for thread_id, stack in sys._current_frames().items():
        stack_trace.append('\n-------------------------------------------------------------------------------')
        stack_trace.append(" Thread %s" % thread_id_to_name.get(thread_id, thread_id))
        stack_trace.append('')

        if 'self' in stack.f_locals:
            sys.stderr.write(str(stack.f_locals['self']) + '\n')

        for filename, lineno, name, line in traceback.extract_stack(stack):
            stack_trace.append(' File "%s", line %d, in %s' % (filename, lineno, name))
            if line:
                stack_trace.append(" %s" % (line.strip()))
    stack_trace.append('\n=============================== END Thread Dump ===============================')
    sys.stderr.write('\n'.join(stack_trace))
def test_getframe(self):
    """Check argument validation and the code object of ``sys._getframe``.

    Two arguments raise TypeError, an impossibly deep frame raises
    ValueError, and the frame returned without arguments runs this test's
    own code object.
    """
    for expected_error, bad_args in ((TypeError, (42, 42)),
                                     (ValueError, (2000000000,))):
        self.assertRaises(expected_error, sys._getframe, *bad_args)

    own_code = SysModuleTest.test_getframe.__code__
    self.assertTrue(own_code is sys._getframe().f_code)
# sys._current_frames() is a CPython-only gimmick.
def test_current_frames(self):
have_threads = True
try:
import _thread
except ImportError:
have_threads = False
if have_threads:
self.current_frames_with_threads()
else:
self.current_frames_without_threads()
# Test sys._current_frames() in a WITH_THREADS build.
def current_frames_without_threads(self):
    """Verify the single-entry frame map of a build without threads."""
    # Not much happens here: there is only one thread, with artificial
    # "thread id" 0.
    frame_map = sys._current_frames()
    entry_count = len(frame_map)
    self.assertEqual(entry_count, 1)
    self.assertIn(0, frame_map)
    self.assertTrue(frame_map[0] is sys._getframe())
def dumpstacks(self):
    """Write the stack of every thread to ``self.logger`` at debug level.

    One record is emitted per thread header, per stack entry and per
    available source line.
    """
    id2name = {th.ident: th.name for th in threading.enumerate()}

    for ident, top_frame in sys._current_frames().items():
        self.logger.debug("# Thread: %s(%d)" % (id2name.get(ident,""), ident))
        for entry in traceback.extract_stack(top_frame):
            filename, lineno, name, text = entry
            self.logger.debug('File: "%s", line %d, in %s' % (filename, lineno, name))
            if text:
                self.logger.debug(" %s" % (text.strip()))
def dump_tracing_state(self, context):
    """ A debug tool to dump all threads tracing state

    Prints the debugger's tracing flags, then for every enumerated thread
    each frame of its stack (innermost first) with the frame's ``f_trace``.
    Frames equal to ``self.frame_beginning`` or to this method's own frame
    are flagged "beginning" / "current".

    :param context: free text echoed in the first line of the dump
    """
    # Single-argument print(...) calls replace the Python 2 print
    # statements; the printed text is unchanged and the code also runs on
    # Python 3.
    print("Dumping all threads Tracing state: (%s)" % context)
    print(" self.tracing_enabled=%s" % self.tracing_enabled)
    print(" self.execution_started=%s" % self.execution_started)
    print(" self.frame_beginning=%s" % self.frame_beginning)
    print(" self.debugger_thread_ident=%s" % self.debugger_thread_ident)
    for thr in threading.enumerate():
        is_current_thread = thr.ident == threading.current_thread().ident
        print(" Thread: %s, %s %s" % (thr.name, thr.ident, "<= Current*" if is_current_thread else ''))
        a_frame = sys._current_frames()[thr.ident]
        while a_frame:
            flags = []
            if a_frame == self.frame_beginning:
                flags.append("beginning")
            if a_frame == inspect.currentframe():
                flags.append("current")
            if flags:
                flags_str = "**"+",".join(flags)
            else:
                flags_str = ""
            print(" => %s, %s:%s(%s) | %s %s" % (a_frame,
                                                a_frame.f_code.co_filename,
                                                a_frame.f_lineno,
                                                a_frame.f_code.co_name,
                                                a_frame.f_trace,
                                                flags_str))
            a_frame = a_frame.f_back
def stackdump(sig, frm):
    """Print the stack of every thread to stdout; usable as a signal handler.

    Neither argument is used.  Each thread gets a ``# ThreadID:`` header
    followed by its stack entries and their stripped source lines.
    """
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# ThreadID: %s" % threadId)
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            if line:
                code.append(" %s" % (line.strip()))
    # Call form of print: the bare statement is a syntax error on Python 3
    # and this spelling prints the same text on Python 2.
    print("\n".join(code))
def stackdump(sig, frm):
    """Print the stack of every thread to stdout; usable as a signal handler.

    Neither argument is used.  Each thread gets a ``# ThreadID:`` header
    followed by its stack entries and their stripped source lines.
    """
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# ThreadID: %s" % threadId)
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            if line:
                code.append(" %s" % (line.strip()))
    # Call form of print: the bare statement is a syntax error on Python 3
    # and this spelling prints the same text on Python 2.
    print("\n".join(code))
def threads_handler():
    """Return ``(headers, body)`` listing the stack of every thread.

    ``headers`` declares plain text; ``body`` starts with the thread count
    followed by one ``Thread 0x<ident>:`` section per thread.
    """
    frames = sys._current_frames()
    sections = ['%d threads found\n\n' % len(frames)]
    for ident in frames:
        stack_text = ''.join(traceback.format_stack(frames[ident]))
        sections.append('Thread 0x%x:\n%s\n' % (ident, stack_text))
    headers = { 'Content-Type': 'text/plain' }
    return headers, ''.join(sections)
def threads_handler():
    """Build a plain-text report of every thread's stack.

    :return: a ``(headers, body)`` pair; the body opens with the number of
        threads found and has one ``Thread 0x<ident>:`` section per thread.
    """
    frames = sys._current_frames()
    report = ['%d threads found\n\n' % len(frames)]
    for ident, frame in frames.items():
        formatted = traceback.format_stack(frame)
        report.append('Thread 0x%x:\n%s\n' % (ident, ''.join(formatted)))
    body = ''.join(report)
    return { 'Content-Type': 'text/plain' }, body