Python signal 模块,getsignal() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用signal.getsignal()

项目:Piwho    作者:Adirockzz95    | 项目源码 | 文件源码
def start_service(self):
        """
        start speaker training service.
        """
        # prevent signal from propagating to child process
        handler = signal.getsignal(signal.SIGINT)
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if self.debug:
            self.sprecog.debug = True
            mp.log_to_stderr(logging.DEBUG)

        self.sprecog.speaker_name = self.speaker_name
        self.proc = mp.Process(name="watchdog", target=self.__run,
                               args=(self.event,))
        self.proc.setDaemon = False
        self.proc.start()
        # restore signal
        signal.signal(signal.SIGINT, handler)
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:anonymine    作者:oskar-skog    | 项目源码 | 文件源码
def bug1():
    # BUG:
    #       sys.stdin.readline() in `ask` dies with IOError and
    #       errno=EINTR when the terminal gets resized after curses has
    #       been de-initialized.
    #       1: SIGWINCH is sent by the terminal when the screen has been
    #           resized.
    #       2: curses forgot to restore the signal handling of SIGWINCH
    #           to the default of ignoring the signal.
    #           NOTE: signal.getsignal(signal.SIGWINCH) incorrectly
    #               returns signal.SIG_DFL. (Default is to be ignored.)
    #       3: Python fails to handle EINTR when reading from stdin.
    # REFERENCES:
    #       Issue 3949: https://bugs.python.org/issue3949
    #       PEP 0457: https://www.python.org/dev/peps/pep-0475/
    def handler(foo, bar):
        print("Resized")
    signal.signal(signal.SIGWINCH, handler)
    while True:
        sys.stdin.readline()
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:workflows.kyoyue    作者:wizyoung    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:alphy    作者:maximepeschard    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testInterruptCaught(self):
        default_handler = signal.getsignal(signal.SIGINT)

        result = unittest.TestResult()
        unittest.installHandler()
        unittest.registerResult(result)

        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")
        self.assertTrue(result.breakCaught)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testSecondInterrupt(self):
        # Can't use skipIf decorator because the signal handler may have
        # been changed after defining this method.
        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
            self.skipTest("test requires SIGINT to not be ignored")
        result = unittest.TestResult()
        unittest.installHandler()
        unittest.registerResult(result)

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)
            os.kill(pid, signal.SIGINT)
            self.fail("Second KeyboardInterrupt not raised")

        try:
            test(result)
        except KeyboardInterrupt:
            pass
        else:
            self.fail("Second KeyboardInterrupt not raised")
        self.assertTrue(result.breakCaught)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testTwoResults(self):
        unittest.installHandler()

        result = unittest.TestResult()
        unittest.registerResult(result)
        new_handler = signal.getsignal(signal.SIGINT)

        result2 = unittest.TestResult()
        unittest.registerResult(result2)
        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)

        result3 = unittest.TestResult()

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")

        self.assertTrue(result.shouldStop)
        self.assertTrue(result2.shouldStop)
        self.assertFalse(result3.shouldStop)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _handleSignals(self):
        """Install the signal handlers for the Twisted event loop."""
        try:
            import signal
        except ImportError:
            log.msg("Warning: signal module unavailable -- not installing signal handlers.")
            return

        if signal.getsignal(signal.SIGINT) == signal.default_int_handler:
            # only handle if there isn't already a handler, e.g. for Pdb.
            signal.signal(signal.SIGINT, self.sigInt)
        signal.signal(signal.SIGTERM, self.sigTerm)

        # Catch Ctrl-Break in windows
        if hasattr(signal, "SIGBREAK"):
            signal.signal(signal.SIGBREAK, self.sigBreak)

        if platformType == 'posix':
            signal.signal(signal.SIGCHLD, self._handleSigchld)
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def testInterruptCaught(self):
        default_handler = signal.getsignal(signal.SIGINT)

        result = unittest2.TestResult()
        unittest2.installHandler()
        unittest2.registerResult(result)

        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")
        self.assertTrue(result.breakCaught)
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def testSecondInterrupt(self):
        # Can't use skipIf decorator because the signal handler may have
        # been changed after defining this method.
        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
            self.skipTest("test requires SIGINT to not be ignored")
        result = unittest2.TestResult()
        unittest2.installHandler()
        unittest2.registerResult(result)

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)
            os.kill(pid, signal.SIGINT)
            self.fail("Second KeyboardInterrupt not raised")

        try:
            test(result)
        except KeyboardInterrupt:
            pass
        else:
            self.fail("Second KeyboardInterrupt not raised")
        self.assertTrue(result.breakCaught)
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def testTwoResults(self):
        unittest2.installHandler()

        result = unittest2.TestResult()
        unittest2.registerResult(result)
        new_handler = signal.getsignal(signal.SIGINT)

        result2 = unittest2.TestResult()
        unittest2.registerResult(result2)
        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)

        result3 = unittest2.TestResult()

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")

        self.assertTrue(result.shouldStop)
        self.assertTrue(result2.shouldStop)
        self.assertFalse(result3.shouldStop)
项目:GoToMeetingTools    作者:plongitudes    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:behelper    作者:istommao    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:alfred-workflows    作者:arthurhammer    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:alfred-zebra    作者:r0x73    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def test_context_manager_with_signal(self):
        init_signals = get_signals(self.signals)
        with signal_receiver(self.signals) as signals_received:
            with self.handler:
                should_be_42 = 42
                send_signal(self.signals[0])
                should_be_42 *= 10

        # check execution stoped when the signal was sent
        self.assertEqual(42, should_be_42)
        # assert signals were caught
        self.assertEqual([self.signals[0]], signals_received)
        # assert the error handling function was just called once
        self.init_func.assert_called_once_with(*self.init_args,
                                               **self.init_kwargs)
        for signum in self.signals:
            self.assertEqual(init_signals[signum], signal.getsignal(signum))
项目:veros    作者:dionhaefner    | 项目源码 | 文件源码
def signals_to_exception(signals=(signal.SIGINT, signal.SIGTERM)):
    """Context manager that makes sure that converts system signals to exceptions.

    This allows e.g. for a graceful exit after receiving SIGTERM (e.g. through
    `kill` on UNIX systems).

    Example:
       >>> with signals_to_exception():
       >>>     try:
       >>>         # do something
       >>>     except SystemExit:
       >>>         # graceful exit even upon receiving interrupt signal
    """
    def signal_to_exception(sig, frame):
        raise SystemExit("received interrupt signal {}".format(sig))
    old_signals = {}
    for s in signals:
        old_signals[s] = signal.getsignal(s)
        signal.signal(s, signal_to_exception)
    try:
        yield
    finally:
        for s in signals:
            signal.signal(s, old_signals[s])
项目:alfred-bear    作者:chrisbro    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:alfred-confluence    作者:skleinei    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:alfred_reviewboard    作者:lydian    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:MathKernelToggle    作者:LingyuanJi    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:alfred-moeha    作者:moeHa    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:simplingua-workflow    作者:bydmm    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:alfred-baidu-translate    作者:ketor    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def test_catch_signals():
    print = lambda *args: None
    orig = signal.getsignal(signal.SIGILL)
    print(orig)
    with catch_signals([signal.SIGILL]) as queue:
        # Raise it a few times, to exercise signal coalescing, both at the
        # call_soon level and at the SignalQueue level
        signal_raise(signal.SIGILL)
        signal_raise(signal.SIGILL)
        await _core.wait_all_tasks_blocked()
        signal_raise(signal.SIGILL)
        await _core.wait_all_tasks_blocked()
        async for batch in queue:  # pragma: no branch
            assert batch == {signal.SIGILL}
            break
        signal_raise(signal.SIGILL)
        async for batch in queue:  # pragma: no branch
            assert batch == {signal.SIGILL}
            break
    with pytest.raises(RuntimeError):
        await queue.__anext__()
    assert signal.getsignal(signal.SIGILL) is orig
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def ki_manager(deliver_cb, restrict_keyboard_interrupt_to_checkpoints):
    if (threading.current_thread() != threading.main_thread()
            or signal.getsignal(signal.SIGINT) != signal.default_int_handler):
        yield
        return

    def handler(signum, frame):
        assert signum == signal.SIGINT
        protection_enabled = ki_protection_enabled(frame)
        if protection_enabled or restrict_keyboard_interrupt_to_checkpoints:
            deliver_cb()
        else:
            raise KeyboardInterrupt

    signal.signal(signal.SIGINT, handler)
    try:
        yield
    finally:
        if signal.getsignal(signal.SIGINT) is handler:
            signal.signal(signal.SIGINT, signal.default_int_handler)
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def getsignal(signalnum):
    """
    Exactly the same as :func:`signal.signal` except where
    :const:`signal.SIGCHLD` is concerned.

    For :const:`signal.SIGCHLD`, this cooperates with :func:`signal`
    to provide consistent answers.
    """
    if signalnum != _signal.SIGCHLD:
        return _signal_getsignal(signalnum)

    global _child_handler
    if _child_handler is _INITIAL:
        _child_handler = _signal_getsignal(_signal.SIGCHLD)

    return _child_handler
项目:rcli    作者:contains-io    | 项目源码 | 文件源码
def enable_logging(log_level):
    # type: (typing.Union[None, int]) -> None
    """Configure the root logger and a logfile handler.

    Args:
        log_level: The logging level to set the logger handler.
    """
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    logfile_handler = logging.StreamHandler(_LOGFILE_STREAM)
    logfile_handler.setLevel(logging.DEBUG)
    logfile_handler.setFormatter(logging.Formatter(
        '%(levelname)s [%(asctime)s][%(name)s] %(message)s'))
    root_logger.addHandler(logfile_handler)
    if signal.getsignal(signal.SIGTERM) == signal.SIG_DFL:
        signal.signal(signal.SIGTERM, _logfile_sigterm_handler)
    if log_level:
        handler = logging.StreamHandler()
        handler.setFormatter(_LogColorFormatter())
        root_logger.setLevel(log_level)
        root_logger.addHandler(handler)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testInterruptCaught(self):
        default_handler = signal.getsignal(signal.SIGINT)

        result = unittest.TestResult()
        unittest.installHandler()
        unittest.registerResult(result)

        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")
        self.assertTrue(result.breakCaught)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testTwoResults(self):
        unittest.installHandler()

        result = unittest.TestResult()
        unittest.registerResult(result)
        new_handler = signal.getsignal(signal.SIGINT)

        result2 = unittest.TestResult()
        unittest.registerResult(result2)
        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)

        result3 = unittest.TestResult()

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")

        self.assertTrue(result.shouldStop)
        self.assertTrue(result2.shouldStop)
        self.assertFalse(result3.shouldStop)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testHandlerReplacedButCalled(self):
        # If our handler has been replaced (is no longer installed) but is
        # called by the *new* handler, then it isn't safe to delay the
        # SIGINT and we should immediately delegate to the default handler
        unittest.installHandler()

        handler = signal.getsignal(signal.SIGINT)
        def new_handler(frame, signum):
            handler(frame, signum)
        signal.signal(signal.SIGINT, new_handler)

        try:
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
        except KeyboardInterrupt:
            pass
        else:
            self.fail("replaced but delegated handler doesn't raise interrupt")
项目:pomodoro-alfred    作者:ecbrodie    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:jisho-alfred-workflow    作者:janclarin    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:Alfred-Workflow    作者:stidio    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def testInterruptCaught(self):
        default_handler = signal.getsignal(signal.SIGINT)

        result = unittest.TestResult()
        unittest.installHandler()
        unittest.registerResult(result)

        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")
        self.assertTrue(result.breakCaught)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def testSecondInterrupt(self):
        # Can't use skipIf decorator because the signal handler may have
        # been changed after defining this method.
        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
            self.skipTest("test requires SIGINT to not be ignored")
        result = unittest.TestResult()
        unittest.installHandler()
        unittest.registerResult(result)

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)
            os.kill(pid, signal.SIGINT)
            self.fail("Second KeyboardInterrupt not raised")

        try:
            test(result)
        except KeyboardInterrupt:
            pass
        else:
            self.fail("Second KeyboardInterrupt not raised")
        self.assertTrue(result.breakCaught)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def testTwoResults(self):
        unittest.installHandler()

        result = unittest.TestResult()
        unittest.registerResult(result)
        new_handler = signal.getsignal(signal.SIGINT)

        result2 = unittest.TestResult()
        unittest.registerResult(result2)
        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)

        result3 = unittest.TestResult()

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")

        self.assertTrue(result.shouldStop)
        self.assertTrue(result2.shouldStop)
        self.assertFalse(result3.shouldStop)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def testInterruptCaught(self):
        default_handler = signal.getsignal(signal.SIGINT)

        result = unittest.TestResult()
        unittest.installHandler()
        unittest.registerResult(result)

        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")
        self.assertTrue(result.breakCaught)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def testSecondInterrupt(self):
        # Can't use skipIf decorator because the signal handler may have
        # been changed after defining this method.
        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
            self.skipTest("test requires SIGINT to not be ignored")
        result = unittest.TestResult()
        unittest.installHandler()
        unittest.registerResult(result)

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)
            os.kill(pid, signal.SIGINT)
            self.fail("Second KeyboardInterrupt not raised")

        try:
            test(result)
        except KeyboardInterrupt:
            pass
        else:
            self.fail("Second KeyboardInterrupt not raised")
        self.assertTrue(result.breakCaught)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def testTwoResults(self):
        unittest.installHandler()

        result = unittest.TestResult()
        unittest.registerResult(result)
        new_handler = signal.getsignal(signal.SIGINT)

        result2 = unittest.TestResult()
        unittest.registerResult(result2)
        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)

        result3 = unittest.TestResult()

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")

        self.assertTrue(result.shouldStop)
        self.assertTrue(result2.shouldStop)
        self.assertFalse(result3.shouldStop)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def testInterruptCaught(self):
        default_handler = signal.getsignal(signal.SIGINT)

        result = unittest.TestResult()
        unittest.installHandler()
        unittest.registerResult(result)

        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")
        self.assertTrue(result.breakCaught)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def testSecondInterrupt(self):
        # Can't use skipIf decorator because the signal handler may have
        # been changed after defining this method.
        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
            self.skipTest("test requires SIGINT to not be ignored")
        result = unittest.TestResult()
        unittest.installHandler()
        unittest.registerResult(result)

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)
            os.kill(pid, signal.SIGINT)
            self.fail("Second KeyboardInterrupt not raised")

        try:
            test(result)
        except KeyboardInterrupt:
            pass
        else:
            self.fail("Second KeyboardInterrupt not raised")
        self.assertTrue(result.breakCaught)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def testTwoResults(self):
        unittest.installHandler()

        result = unittest.TestResult()
        unittest.registerResult(result)
        new_handler = signal.getsignal(signal.SIGINT)

        result2 = unittest.TestResult()
        unittest.registerResult(result2)
        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)

        result3 = unittest.TestResult()

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")

        self.assertTrue(result.shouldStop)
        self.assertTrue(result2.shouldStop)
        self.assertFalse(result3.shouldStop)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _handleSignals(self):
        """Install the signal handlers for the Twisted event loop."""
        try:
            import signal
        except ImportError:
            log.msg("Warning: signal module unavailable -- not installing signal handlers.")
            return

        if signal.getsignal(signal.SIGINT) == signal.default_int_handler:
            # only handle if there isn't already a handler, e.g. for Pdb.
            signal.signal(signal.SIGINT, self.sigInt)
        signal.signal(signal.SIGTERM, self.sigTerm)

        # Catch Ctrl-Break in windows
        if hasattr(signal, "SIGBREAK"):
            signal.signal(signal.SIGBREAK, self.sigBreak)

        if platformType == 'posix':
            signal.signal(signal.SIGCHLD, self._handleSigchld)
项目:alfred-workflow-stockprice    作者:sungminoh    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:Quiver-alfred    作者:danielecook    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)