我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用signal.setitimer()。
def __init__(self, callback, timer=TIMER, interval=INTERVAL_SECONDS,
             timer_signal=TIMER_SIGNAL):
    """Install ``self.callback`` as a signal handler and start a periodic timer.

    Args:
        callback: Callable stored on the instance as ``_callback``.
        timer: Interval timer id passed to ``signal.setitimer``.
        interval: Used both as the initial delay and as the repeat period.
        timer_signal: Signal number whose handler is replaced.

    Raises:
        AssertionError: If ``callback`` is not callable (only when asserts
            are enabled).
    """
    assert callable(callback), 'callback must be callable'

    # Finish initialising the instance *before* the handler is installed and
    # the timer armed: with a very short interval the first tick could
    # otherwise arrive while ``_callback`` does not exist yet.
    # NOTE(review): presumably ``self.callback`` dispatches to
    # ``self._callback`` -- it is defined outside this block; confirm.
    self._callback = callback

    # cheating for now: the previous handler / timer values are not captured,
    # so both attributes stay None.
    self.oldaction = None
    self.oldtimer = None

    signal.signal(timer_signal, self.callback)
    signal.setitimer(timer, interval, interval)
def execution_timeout(timeout):
    """Generator that bounds the time spent at its ``yield`` point.

    A one-shot ``ITIMER_REAL`` timer of ``timeout`` seconds is armed and a
    SIGALRM handler raising ``TimeoutError`` is installed.  When the generator
    is resumed or closed, the previously installed handler and the previously
    pending timer (minus the time spent here) are put back.

    NOTE(review): presumably wrapped with ``contextlib.contextmanager`` by the
    surrounding file -- the decorator is not visible here.

    Args:
        timeout: Seconds before SIGALRM is delivered.
    """
    def timed_out(signum, sigframe):
        raise TimeoutError

    def restore_handler():
        # signal.signal() reports None when the previous handler was not
        # installed from Python; None cannot be passed back to it.
        if old_hdl is not None:
            signal.signal(signal.SIGALRM, old_hdl)

    # Install the handler BEFORE arming the timer: with a tiny timeout the
    # alarm could otherwise fire while the old handler (possibly the default
    # action, which terminates the process) is still in place.
    old_hdl = signal.signal(signal.SIGALRM, timed_out)
    try:
        delay, interval = signal.setitimer(signal.ITIMER_REAL, timeout, 0)
    except Exception:
        # Arming failed (e.g. invalid timeout): do not leak our handler.
        restore_handler()
        raise
    now = time.time()
    try:
        yield
    finally:
        # Disarm our timer and restore the outer handler first, so that the
        # re-armed outer timer can never be delivered to ``timed_out``.
        signal.setitimer(signal.ITIMER_REAL, 0)
        restore_handler()
        # inner timeout must be smaller, or the timer event will be delayed
        if delay:
            elapsed = time.time() - now
            # Never pass 0 here: that would cancel the outer timer instead
            # of letting it fire (almost) immediately.
            delay = max(delay - elapsed, 0.000001)
        else:
            delay = 0
        signal.setitimer(signal.ITIMER_REAL, delay, interval)
def sig_vtalrm(self, *args):
    """SIGVTALRM handler: count invocations and stop the timer on the 4th.

    Sets ``hndl_called``, disables ``ITIMER_VIRTUAL`` once ``hndl_count`` has
    reached 3, and raises ``signal.ItimerError`` if it is still being invoked
    after that.
    """
    self.hndl_called = True
    calls_so_far = self.hndl_count

    if calls_so_far > 3:
        # The timer was disabled on an earlier call, so getting here means
        # disabling it did not work.
        raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                                 "timer.")

    if calls_so_far == 3:
        # Turn ITIMER_VIRTUAL off; no further invocations are expected.
        signal.setitimer(signal.ITIMER_VIRTUAL, 0)
        if support.verbose:
            print("last SIGVTALRM handler call")

    self.hndl_count += 1

    if support.verbose:
        print("SIGVTALRM handler invoked", args)
def test_itimer_prof(self):
    """Check that the SIGPROF handler runs and can stop ``ITIMER_PROF``.

    Burns CPU for up to 60 seconds waiting for the timer to read back as
    ``(0.0, 0.0)``; the test is skipped if that never happens.
    """
    self.itimer = signal.ITIMER_PROF
    signal.signal(signal.SIGPROF, self.sig_prof)
    signal.setitimer(self.itimer, 0.2, 0.2)

    timer_stopped = False
    began = time.time()
    while time.time() - began < 60.0:
        # Consume CPU time so the profiling timer advances.
        _ = pow(12345, 67890, 10000019)
        if signal.getitimer(self.itimer) == (0.0, 0.0):
            # The sig_prof handler switched the timer off.
            timer_stopped = True
            break

    if not timer_stopped:
        # Issue 8424
        self.skipTest("timeout: likely cause: machine too slow or load too "
                      "high")

    # The profiling timer must read back as disabled ...
    self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    # ... and the handler must have run at least once.
    self.assertEqual(self.hndl_called, True)
def sig_vtalrm(self, *args):
    """SIGVTALRM handler: count invocations and stop the timer on the 4th.

    Sets ``hndl_called``, disables ``ITIMER_VIRTUAL`` once ``hndl_count`` has
    reached 3, and raises ``signal.ItimerError`` if it is still being invoked
    after that.
    """
    self.hndl_called = True
    seen = self.hndl_count

    if seen > 3:
        # The timer was disabled on an earlier call, so getting here means
        # disabling it did not work.
        raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                                 "timer.")

    if seen == 3:
        # Turn ITIMER_VIRTUAL off; no further invocations are expected.
        signal.setitimer(signal.ITIMER_VIRTUAL, 0)
        if test_support.verbose:
            print("last SIGVTALRM handler call")

    self.hndl_count += 1

    if test_support.verbose:
        print("SIGVTALRM handler invoked", args)
def set_blocking_signal_threshold(self, seconds, action):
    """Record the blocking threshold and install the SIGALRM handler.

    ``seconds`` is stored on the instance; passing ``None`` only stores it and
    leaves the signal handler untouched.  Otherwise ``action`` (a Python
    signal handler) is installed for SIGALRM, or the default action when
    ``action`` is ``None``.

    If the ``signal`` module has no ``setitimer``, an error is logged and
    nothing else happens.
    """
    if not hasattr(signal, "setitimer"):
        logging.error("set_blocking_signal_threshold requires a signal module "
                      "with the setitimer method")
        return

    self._blocking_signal_threshold = seconds
    if seconds is None:
        return

    handler = signal.SIG_DFL if action is None else action
    signal.signal(signal.SIGALRM, handler)
def destroy(self):
    """Stop the real-time interval timer and put back the saved SIGALRM handler.

    Does nothing unless ``self.setup_done`` is truthy.
    """
    if self.setup_done:
        # A zero delay disarms ITIMER_REAL.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, self.prev_signal_handler)
def record(self, duration):
    """Sample with ``ITIMER_REAL`` for ``duration`` seconds, then log overhead.

    The timer is armed with ``self.SAMPLING_RATE`` as both delay and period,
    the call sleeps for ``duration`` seconds, and the timer is disarmed again.
    ``self.profile_duration`` is increased by ``duration``.

    Args:
        duration: Seconds to keep the sampling timer running.
    """
    self.agent.log('Activating block profiler.')

    signal.setitimer(signal.ITIMER_REAL, self.SAMPLING_RATE, self.SAMPLING_RATE)
    try:
        time.sleep(duration)
    finally:
        # Disarm even when the sleep is interrupted by an exception;
        # otherwise the timer would keep firing after record() has exited.
        signal.setitimer(signal.ITIMER_REAL, 0)

    self.agent.log('Deactivating block profiler.')

    self.profile_duration += duration

    # With no accumulated activity time the ratio is undefined; skip the
    # report instead of raising ZeroDivisionError.
    if self.profile_duration:
        self.agent.log('Block profiler CPU overhead per activity second: {0} seconds'.format(self.profile._overhead / self.profile_duration))
def destroy(self):
    """Stop the profiling interval timer and put back the saved SIGPROF handler.

    Does nothing unless ``self.setup_done`` is truthy.
    """
    if self.setup_done:
        # A zero delay disarms ITIMER_PROF.
        signal.setitimer(signal.ITIMER_PROF, 0)
        signal.signal(signal.SIGPROF, self.prev_signal_handler)
def record(self, duration):
    """Sample with ``ITIMER_PROF`` for ``duration`` seconds, then log overhead.

    Returns immediately when ``self.agent.config.is_profiling_disabled()`` is
    true.  Otherwise the timer is armed with ``self.SAMPLING_RATE`` as both
    delay and period, the call sleeps for ``duration`` seconds, and the timer
    is disarmed again.  ``self.profile_duration`` is increased by
    ``duration``.

    Args:
        duration: Seconds to keep the sampling timer running.
    """
    if self.agent.config.is_profiling_disabled():
        return

    self.agent.log('Activating CPU profiler.')

    signal.setitimer(signal.ITIMER_PROF, self.SAMPLING_RATE, self.SAMPLING_RATE)
    try:
        time.sleep(duration)
    finally:
        # Disarm even when the sleep is interrupted by an exception;
        # otherwise the timer would keep firing after record() has exited.
        signal.setitimer(signal.ITIMER_PROF, 0)

    self.agent.log('Deactivating CPU profiler.')

    self.profile_duration += duration

    # With no accumulated activity time the ratio is undefined; skip the
    # report instead of raising ZeroDivisionError.
    if self.profile_duration:
        self.agent.log('CPU profiler CPU overhead per activity second: {0} seconds'.format(self.profile._overhead / self.profile_duration))
def set_blocking_signal_threshold(self, seconds, action):
    """Record the blocking threshold and install the SIGALRM handler.

    ``seconds`` is stored on the instance; passing ``None`` only stores it and
    leaves the signal handler untouched.  Otherwise ``action`` is installed
    for SIGALRM, or the default action when ``action`` is ``None``.

    If the ``signal`` module has no ``setitimer``, an error is logged and
    nothing else happens.
    """
    if not hasattr(signal, "setitimer"):
        gen_log.error("set_blocking_signal_threshold requires a signal module "
                      "with the setitimer method")
        return

    self._blocking_signal_threshold = seconds
    if seconds is None:
        return

    handler = signal.SIG_DFL if action is None else action
    signal.signal(signal.SIGALRM, handler)
def alarm(self, t=None):
    """Arm a sub-second-capable replacement for ``signal.alarm``.

    Installs ``self.stop_timer`` as the SIGALRM handler (remembering the
    previous one in ``self.orig_handler``), clears ``self.timer_fired`` and
    starts ``ITIMER_REAL`` with an initial delay of ``t`` seconds.

    Args:
        t: Delay in seconds; ``self.signal_delay`` is used when ``None``.
    """
    delay = self.signal_delay if t is None else t

    self.timer_fired = False
    self.orig_handler = signal.signal(signal.SIGALRM, self.stop_timer)

    # signal_period is not used: only a single timer event is meant to fire,
    # so a fixed period of 1000 seconds is passed instead.
    signal.setitimer(signal.ITIMER_REAL, delay, 1000)
def stop_timer(self, *args):
    """Mark the timer as fired, disarm it and reinstate ``self.orig_handler``.

    Accepts and ignores any positional arguments, so it can be installed
    directly as a signal handler.
    """
    self.timer_fired = True

    # Zero delay and zero period: ITIMER_REAL is fully switched off.
    no_delay = no_period = 0
    signal.setitimer(signal.ITIMER_REAL, no_delay, no_period)

    signal.signal(signal.SIGALRM, self.orig_handler)
def test_signal_handling_while_selecting(self): # Test with a signal actually arriving during a select() call. caught = 0 def my_handler(): nonlocal caught caught += 1 self.loop.stop() self.loop.add_signal_handler(signal.SIGALRM, my_handler) signal.setitimer(signal.ITIMER_REAL, 0.01, 0) # Send SIGALRM once. self.loop.run_forever() self.assertEqual(caught, 1)
def test_signal_handling_args(self):
    """Extra arguments given to ``add_signal_handler`` reach the handler."""
    some_args = (42,)
    hits = []

    def my_handler(*args):
        hits.append(1)
        self.assertEqual(args, some_args)

    self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args)

    # One-shot timer: delivers SIGALRM a single time.
    signal.setitimer(signal.ITIMER_REAL, 0.1, 0)
    # The handler does not stop the loop itself, so schedule the stop.
    self.loop.call_later(0.5, self.loop.stop)
    self.loop.run_forever()

    self.assertEqual(len(hits), 1)
def set_timeout():
    """Yield a helper that schedules ``callback`` via a one-shot SIGALRM timer.

    The yielded ``make_timeout(sec, callback)`` installs a SIGALRM handler
    that calls ``callback()`` and arms ``ITIMER_REAL`` for ``sec`` seconds.
    When the generator is resumed or closed, any still-pending timer is
    cancelled and the SIGALRM handler that was active before the first
    ``make_timeout`` call is put back, so nothing leaks to later code.

    NOTE(review): presumably used as a fixture-style generator; the consumer
    is not visible here.
    """
    # Holds the handler that was installed before the first make_timeout().
    saved = []

    def make_timeout(sec, callback):
        def _callback(signum, frame):
            # Cancel through the same API that armed the timer.
            signal.setitimer(signal.ITIMER_REAL, 0)
            callback()

        previous = signal.signal(signal.SIGALRM, _callback)
        if not saved:
            saved.append(previous)
        signal.setitimer(signal.ITIMER_REAL, sec)

    try:
        yield make_timeout
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)
        # signal.signal() reports None for handlers not installed from
        # Python; such a value cannot be passed back.
        if saved and saved[0] is not None:
            signal.signal(signal.SIGALRM, saved[0])
def stop(self):
    """Stop the timer and detach this object's signal handler.

    The interval timer is disarmed first.  If a callable previous handler
    was recorded in ``self.oldaction`` it is reinstalled together with the
    recorded ``self.oldtimer`` values; otherwise the signal is set to be
    ignored.

    NOTE(review): the module-level ``TIMER`` / ``TIMER_SIGNAL`` are used here
    because the instance does not store the values it was constructed with;
    confirm that no caller passes non-default ones.
    """
    # Disarm before touching anything else so no tick is delivered while the
    # handler is being swapped.  (The original passed TIMER_SIGNAL -- a
    # signal number -- as the timer id and never disarmed in the fallback
    # branch.)
    signal.setitimer(TIMER, 0)
    self._callback = None

    if self.oldaction and callable(self.oldaction):
        signal.signal(TIMER_SIGNAL, self.oldaction)
        # oldtimer may be None when no previous timer was recorded.
        if self.oldtimer:
            signal.setitimer(TIMER, self.oldtimer[0], self.oldtimer[1])
    else:
        signal.signal(TIMER_SIGNAL, signal.SIG_IGN)
def test_exit_code_signal(self):
    """A child process killed by SIGALRM yields ``_exit_code == -SIGALRM``."""
    # The child arms a 0.1 s timer and then sleeps for 1 s.
    script = ("import signal, time\n"
              "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
              "time.sleep(1)\n")
    self.mktmp(script)

    command = "%s %s" % (sys.executable, self.fname)
    self.system(command)

    exit_code = ip.user_ns['_exit_code']
    self.assertEqual(exit_code, -signal.SIGALRM)
def tearDown(self):
    """Reinstate the saved SIGALRM handler and stop the timer under test."""
    signal.signal(signal.SIGALRM, self.old_alarm)

    # test_itimer_exc leaves this attribute unchanged, so there is nothing
    # to stop in that case.
    if self.itimer is None:
        return

    # Make sure the interval timer is no longer running.
    signal.setitimer(self.itimer, 0)
def sig_prof(self, *args):
    """SIGPROF handler: record the call and switch ``ITIMER_PROF`` off."""
    self.hndl_called = True
    signal.setitimer(signal.ITIMER_PROF, 0)

    if not support.verbose:
        return
    print("SIGPROF handler invoked", args)
def test_itimer_exc(self): # XXX I'm assuming -1 is an invalid itimer, but maybe some platform # defines it ? self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0) # Negative times are treated as zero on some platforms. if 0: self.assertRaises(signal.ItimerError, signal.setitimer, signal.ITIMER_REAL, -1)
def test_itimer_real(self):
    """Arm ``ITIMER_REAL`` for one second and wait for the handler to run."""
    which = signal.ITIMER_REAL
    self.itimer = which
    signal.setitimer(which, 1.0)

    if support.verbose:
        print("\ncall pause()...")
    # Block until a signal is delivered.
    signal.pause()

    self.assertEqual(self.hndl_called, True)
# Issue 3864, unknown if this affects earlier versions of freebsd also
def sig_prof(self, *args):
    """SIGPROF handler: record the call and switch ``ITIMER_PROF`` off."""
    self.hndl_called = True
    signal.setitimer(signal.ITIMER_PROF, 0)

    if not test_support.verbose:
        return
    print("SIGPROF handler invoked", args)
def test_itimer_real(self):
    """Arm ``ITIMER_REAL`` for one second and wait for the handler to run."""
    which = signal.ITIMER_REAL
    self.itimer = which
    signal.setitimer(which, 1.0)

    if test_support.verbose:
        print("\ncall pause()...")
    # Block until a signal is delivered.
    signal.pause()

    self.assertEqual(self.hndl_called, True)
# Issue 3864. Unknown if this affects earlier versions of freebsd also.
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    """Decorator factory: abort the wrapped call after ``seconds`` seconds.

    A SIGALRM handler raising ``TimeoutError(error_message)`` is installed
    and ``ITIMER_REAL`` is armed for the duration of each call.  Afterwards
    the timer is cancelled and the previously installed SIGALRM handler is
    put back, whether the call returned or raised.

    Args:
        seconds: Time limit; fractional values are supported because an
            interval timer is used rather than ``signal.alarm``.
        error_message: Message carried by the raised ``TimeoutError``.

    Returns:
        A decorator that wraps a function with the time limit.
    """
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        @wraps(func)
        def wrapper(*args, **kwargs):
            previous = signal.signal(signal.SIGALRM, _handle_timeout)
            try:
                # used timer instead of alarm
                signal.setitimer(signal.ITIMER_REAL, seconds)
                return func(*args, **kwargs)
            finally:
                # Cancel through the same API that armed the timer; mixing
                # alarm() and setitimer() is not portable.
                signal.setitimer(signal.ITIMER_REAL, 0)
                # signal.signal() reports None for handlers not installed
                # from Python; such a value cannot be passed back.
                if previous is not None:
                    signal.signal(signal.SIGALRM, previous)

        return wrapper

    return decorator