我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用signal.getsignal()。
def start_service(self): """ start speaker training service. """ # prevent signal from propagating to child process handler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, signal.SIG_IGN) if self.debug: self.sprecog.debug = True mp.log_to_stderr(logging.DEBUG) self.sprecog.speaker_name = self.speaker_name self.proc = mp.Process(name="watchdog", target=self.__run, args=(self.event,)) self.proc.setDaemon = False self.proc.start() # restore signal signal.signal(signal.SIGINT, handler)
def __call__(self, *args, **kwargs): """Trap ``SIGTERM`` and call wrapped function.""" self._caught_signal = None # Register handler for SIGTERM, then call `self.func` self.old_signal_handler = signal.getsignal(signal.SIGTERM) signal.signal(signal.SIGTERM, self.signal_handler) self.func(*args, **kwargs) # Restore old signal handler signal.signal(signal.SIGTERM, self.old_signal_handler) # Handle any signal caught during execution if self._caught_signal is not None: signum, frame = self._caught_signal if callable(self.old_signal_handler): self.old_signal_handler(signum, frame) elif self.old_signal_handler == signal.SIG_DFL: sys.exit(0)
def bug1(): # BUG: # sys.stdin.readline() in `ask` dies with IOError and # errno=EINTR when the terminal gets resized after curses has # been de-initialized. # 1: SIGWINCH is sent by the terminal when the screen has been # resized. # 2: curses forgot to restore the signal handling of SIGWINCH # to the default of ignoring the signal. # NOTE: signal.getsignal(signal.SIGWINCH) incorrectly # returns signal.SIG_DFL. (Default is to be ignored.) # 3: Python fails to handle EINTR when reading from stdin. # REFERENCES: # Issue 3949: https://bugs.python.org/issue3949 # PEP 0457: https://www.python.org/dev/peps/pep-0475/ def handler(foo, bar): print("Resized") signal.signal(signal.SIGWINCH, handler) while True: sys.stdin.readline()
def __call__(self, *args, **kwargs): self._caught_signal = None # Register handler for SIGTERM, then call `self.func` self.old_signal_handler = signal.getsignal(signal.SIGTERM) signal.signal(signal.SIGTERM, self.signal_handler) self.func(*args, **kwargs) # Restore old signal handler signal.signal(signal.SIGTERM, self.old_signal_handler) # Handle any signal caught during execution if self._caught_signal is not None: signum, frame = self._caught_signal if callable(self.old_signal_handler): self.old_signal_handler(signum, frame) elif self.old_signal_handler == signal.SIG_DFL: sys.exit(0)
def testInterruptCaught(self): default_handler = signal.getsignal(signal.SIGINT) result = unittest.TestResult() unittest.installHandler() unittest.registerResult(result) self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) def test(result): pid = os.getpid() os.kill(pid, signal.SIGINT) result.breakCaught = True self.assertTrue(result.shouldStop) try: test(result) except KeyboardInterrupt: self.fail("KeyboardInterrupt not handled") self.assertTrue(result.breakCaught)
def testSecondInterrupt(self): # Can't use skipIf decorator because the signal handler may have # been changed after defining this method. if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: self.skipTest("test requires SIGINT to not be ignored") result = unittest.TestResult() unittest.installHandler() unittest.registerResult(result) def test(result): pid = os.getpid() os.kill(pid, signal.SIGINT) result.breakCaught = True self.assertTrue(result.shouldStop) os.kill(pid, signal.SIGINT) self.fail("Second KeyboardInterrupt not raised") try: test(result) except KeyboardInterrupt: pass else: self.fail("Second KeyboardInterrupt not raised") self.assertTrue(result.breakCaught)
def testTwoResults(self): unittest.installHandler() result = unittest.TestResult() unittest.registerResult(result) new_handler = signal.getsignal(signal.SIGINT) result2 = unittest.TestResult() unittest.registerResult(result2) self.assertEqual(signal.getsignal(signal.SIGINT), new_handler) result3 = unittest.TestResult() def test(result): pid = os.getpid() os.kill(pid, signal.SIGINT) try: test(result) except KeyboardInterrupt: self.fail("KeyboardInterrupt not handled") self.assertTrue(result.shouldStop) self.assertTrue(result2.shouldStop) self.assertFalse(result3.shouldStop)
def _handleSignals(self): """Install the signal handlers for the Twisted event loop.""" try: import signal except ImportError: log.msg("Warning: signal module unavailable -- not installing signal handlers.") return if signal.getsignal(signal.SIGINT) == signal.default_int_handler: # only handle if there isn't already a handler, e.g. for Pdb. signal.signal(signal.SIGINT, self.sigInt) signal.signal(signal.SIGTERM, self.sigTerm) # Catch Ctrl-Break in windows if hasattr(signal, "SIGBREAK"): signal.signal(signal.SIGBREAK, self.sigBreak) if platformType == 'posix': signal.signal(signal.SIGCHLD, self._handleSigchld)
def testInterruptCaught(self): default_handler = signal.getsignal(signal.SIGINT) result = unittest2.TestResult() unittest2.installHandler() unittest2.registerResult(result) self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) def test(result): pid = os.getpid() os.kill(pid, signal.SIGINT) result.breakCaught = True self.assertTrue(result.shouldStop) try: test(result) except KeyboardInterrupt: self.fail("KeyboardInterrupt not handled") self.assertTrue(result.breakCaught)
def testSecondInterrupt(self): # Can't use skipIf decorator because the signal handler may have # been changed after defining this method. if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: self.skipTest("test requires SIGINT to not be ignored") result = unittest2.TestResult() unittest2.installHandler() unittest2.registerResult(result) def test(result): pid = os.getpid() os.kill(pid, signal.SIGINT) result.breakCaught = True self.assertTrue(result.shouldStop) os.kill(pid, signal.SIGINT) self.fail("Second KeyboardInterrupt not raised") try: test(result) except KeyboardInterrupt: pass else: self.fail("Second KeyboardInterrupt not raised") self.assertTrue(result.breakCaught)
def testTwoResults(self): unittest2.installHandler() result = unittest2.TestResult() unittest2.registerResult(result) new_handler = signal.getsignal(signal.SIGINT) result2 = unittest2.TestResult() unittest2.registerResult(result2) self.assertEqual(signal.getsignal(signal.SIGINT), new_handler) result3 = unittest2.TestResult() def test(result): pid = os.getpid() os.kill(pid, signal.SIGINT) try: test(result) except KeyboardInterrupt: self.fail("KeyboardInterrupt not handled") self.assertTrue(result.shouldStop) self.assertTrue(result2.shouldStop) self.assertFalse(result3.shouldStop)
def test_context_manager_with_signal(self): init_signals = get_signals(self.signals) with signal_receiver(self.signals) as signals_received: with self.handler: should_be_42 = 42 send_signal(self.signals[0]) should_be_42 *= 10 # check execution stoped when the signal was sent self.assertEqual(42, should_be_42) # assert signals were caught self.assertEqual([self.signals[0]], signals_received) # assert the error handling function was just called once self.init_func.assert_called_once_with(*self.init_args, **self.init_kwargs) for signum in self.signals: self.assertEqual(init_signals[signum], signal.getsignal(signum))
def signals_to_exception(signals=(signal.SIGINT, signal.SIGTERM)): """Context manager that makes sure that converts system signals to exceptions. This allows e.g. for a graceful exit after receiving SIGTERM (e.g. through `kill` on UNIX systems). Example: >>> with signals_to_exception(): >>> try: >>> # do something >>> except SystemExit: >>> # graceful exit even upon receiving interrupt signal """ def signal_to_exception(sig, frame): raise SystemExit("received interrupt signal {}".format(sig)) old_signals = {} for s in signals: old_signals[s] = signal.getsignal(s) signal.signal(s, signal_to_exception) try: yield finally: for s in signals: signal.signal(s, old_signals[s])
def test_catch_signals(): print = lambda *args: None orig = signal.getsignal(signal.SIGILL) print(orig) with catch_signals([signal.SIGILL]) as queue: # Raise it a few times, to exercise signal coalescing, both at the # call_soon level and at the SignalQueue level signal_raise(signal.SIGILL) signal_raise(signal.SIGILL) await _core.wait_all_tasks_blocked() signal_raise(signal.SIGILL) await _core.wait_all_tasks_blocked() async for batch in queue: # pragma: no branch assert batch == {signal.SIGILL} break signal_raise(signal.SIGILL) async for batch in queue: # pragma: no branch assert batch == {signal.SIGILL} break with pytest.raises(RuntimeError): await queue.__anext__() assert signal.getsignal(signal.SIGILL) is orig
def ki_manager(deliver_cb, restrict_keyboard_interrupt_to_checkpoints): if (threading.current_thread() != threading.main_thread() or signal.getsignal(signal.SIGINT) != signal.default_int_handler): yield return def handler(signum, frame): assert signum == signal.SIGINT protection_enabled = ki_protection_enabled(frame) if protection_enabled or restrict_keyboard_interrupt_to_checkpoints: deliver_cb() else: raise KeyboardInterrupt signal.signal(signal.SIGINT, handler) try: yield finally: if signal.getsignal(signal.SIGINT) is handler: signal.signal(signal.SIGINT, signal.default_int_handler)
def getsignal(signalnum): """ Exactly the same as :func:`signal.signal` except where :const:`signal.SIGCHLD` is concerned. For :const:`signal.SIGCHLD`, this cooperates with :func:`signal` to provide consistent answers. """ if signalnum != _signal.SIGCHLD: return _signal_getsignal(signalnum) global _child_handler if _child_handler is _INITIAL: _child_handler = _signal_getsignal(_signal.SIGCHLD) return _child_handler
def enable_logging(log_level): # type: (typing.Union[None, int]) -> None """Configure the root logger and a logfile handler. Args: log_level: The logging level to set the logger handler. """ root_logger = logging.getLogger() root_logger.setLevel(logging.DEBUG) logfile_handler = logging.StreamHandler(_LOGFILE_STREAM) logfile_handler.setLevel(logging.DEBUG) logfile_handler.setFormatter(logging.Formatter( '%(levelname)s [%(asctime)s][%(name)s] %(message)s')) root_logger.addHandler(logfile_handler) if signal.getsignal(signal.SIGTERM) == signal.SIG_DFL: signal.signal(signal.SIGTERM, _logfile_sigterm_handler) if log_level: handler = logging.StreamHandler() handler.setFormatter(_LogColorFormatter()) root_logger.setLevel(log_level) root_logger.addHandler(handler)
def testHandlerReplacedButCalled(self): # If our handler has been replaced (is no longer installed) but is # called by the *new* handler, then it isn't safe to delay the # SIGINT and we should immediately delegate to the default handler unittest.installHandler() handler = signal.getsignal(signal.SIGINT) def new_handler(frame, signum): handler(frame, signum) signal.signal(signal.SIGINT, new_handler) try: pid = os.getpid() os.kill(pid, signal.SIGINT) except KeyboardInterrupt: pass else: self.fail("replaced but delegated handler doesn't raise interrupt")