我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用signal.NSIG。
def get_ctypes_strsignal(): """Strategy 1: If the C library exposes strsignal(), use it.""" import signal import ctypes import ctypes.util libc = ctypes.CDLL(ctypes.util.find_library("c")) strsignal_proto = ctypes.CFUNCTYPE(ctypes.c_char_p, ctypes.c_int) strsignal_c = strsignal_proto(("strsignal", libc), ((1,),)) NSIG = signal.NSIG def strsignal_ctypes_wrapper(signo): # The behavior of the C library strsignal() is unspecified if # called with an out-of-range argument. Range-check on entry # _and_ NULL-check on exit. if 0 <= signo < NSIG: s = strsignal_c(signo) if s: return s.decode("utf-8") return "Unknown signal "+str(signo) return strsignal_ctypes_wrapper
def test_add_signal_handler_coroutine_error(self, m_signal): m_signal.NSIG = signal.NSIG @asyncio.coroutine def simple_coroutine(): yield from [] # callback must not be a coroutine function coro_func = simple_coroutine coro_obj = coro_func() self.addCleanup(coro_obj.close) for func in (coro_func, coro_obj): self.assertRaisesRegex( TypeError, 'coroutines cannot be used with add_signal_handler', self.loop.add_signal_handler, signal.SIGINT, func)
def test_add_signal_handler_install_error(self, m_signal): m_signal.NSIG = signal.NSIG def set_wakeup_fd(fd): if fd == -1: raise ValueError() m_signal.set_wakeup_fd = set_wakeup_fd class Err(OSError): errno = errno.EFAULT m_signal.signal.side_effect = Err self.assertRaises( Err, self.loop.add_signal_handler, signal.SIGINT, lambda: True)
def _serve(self): if hasattr(signal, 'pthread_sigmask'): signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG)) while 1: try: with self._listener.accept() as conn: msg = conn.recv() if msg is None: break key, destination_pid = msg send, close = self._cache.pop(key) try: send(conn, destination_pid) finally: close() except: if not util.is_exiting(): sys.excepthook(*sys.exc_info())
def get_reversemap_strsignal(): """Strategy 2: return the name of the signal constant corresponding to the value passed in.""" import signal signames = [None] * signal.NSIG for constant in dir(signal): if not constant.startswith("SIG"): continue if constant.startswith("SIG_"): continue # obsolete names for signals if constant in ('SIGIOT', 'SIGCLD', 'SIGPOLL'): continue signames[getattr(signal, constant)] = constant for rt in range(signal.SIGRTMIN+1, signal.SIGRTMAX): signames[rt] = "SIGRTMIN+"+str(rt - signal.SIGRTMIN) for gap in range(len(signames)): if signames[gap] is None: signames[gap] = "SIG_"+str(gap) NSIG = signal.NSIG def strsignal_reversemap_wrapper(signo): if 0 <= signo < NSIG: return signames[signo] else: return "Unknown signal "+str(signo) return strsignal_reversemap_wrapper
def _signal_number_to_name(signum): """ Return a name for the signal number, or None if no name can be found. >>> _signal_number_to_name(signal.SIGINT) 'SIGINT' >>> _signal_number_to_name(signal.NSIG + 1) """ names = _signums.get(signum, []) if not names: return None return "/".join(names)
def _check_signal(self, sig): """Internal helper to validate a signal. Raise ValueError if the signal number is invalid or uncatchable. Raise RuntimeError if there is a problem setting up the handler. """ if not isinstance(sig, int): raise TypeError('sig must be an int, not {!r}'.format(sig)) if not (1 <= sig < signal.NSIG): raise ValueError( 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
def test_check_signal(self): self.assertRaises( TypeError, self.loop._check_signal, '1') self.assertRaises( ValueError, self.loop._check_signal, signal.NSIG + 1)
def test_handle_signal_no_handler(self): self.loop._handle_signal(signal.NSIG + 1)
def test_handle_signal_cancelled_handler(self): h = asyncio.Handle(mock.Mock(), (), loop=mock.Mock()) h.cancel() self.loop._signal_handlers[signal.NSIG + 1] = h self.loop.remove_signal_handler = mock.Mock() self.loop._handle_signal(signal.NSIG + 1) self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1)
def test_add_signal_handler(self, m_signal): m_signal.NSIG = signal.NSIG cb = lambda: True self.loop.add_signal_handler(signal.SIGHUP, cb) h = self.loop._signal_handlers.get(signal.SIGHUP) self.assertIsInstance(h, asyncio.Handle) self.assertEqual(h._callback, cb)
def test_add_signal_handler_install_error2(self, m_logging, m_signal): m_signal.NSIG = signal.NSIG class Err(OSError): errno = errno.EINVAL m_signal.signal.side_effect = Err self.loop._signal_handlers[signal.SIGHUP] = lambda: True self.assertRaises( RuntimeError, self.loop.add_signal_handler, signal.SIGINT, lambda: True) self.assertFalse(m_logging.info.called) self.assertEqual(1, m_signal.set_wakeup_fd.call_count)
def test_add_signal_handler_install_error3(self, m_logging, m_signal): class Err(OSError): errno = errno.EINVAL m_signal.signal.side_effect = Err m_signal.NSIG = signal.NSIG self.assertRaises( RuntimeError, self.loop.add_signal_handler, signal.SIGINT, lambda: True) self.assertFalse(m_logging.info.called) self.assertEqual(2, m_signal.set_wakeup_fd.call_count)
def test_remove_signal_handler_2(self, m_signal): m_signal.NSIG = signal.NSIG m_signal.SIGINT = signal.SIGINT self.loop.add_signal_handler(signal.SIGINT, lambda: True) self.loop._signal_handlers[signal.SIGHUP] = object() m_signal.set_wakeup_fd.reset_mock() self.assertTrue( self.loop.remove_signal_handler(signal.SIGINT)) self.assertFalse(m_signal.set_wakeup_fd.called) self.assertTrue(m_signal.signal.called) self.assertEqual( (signal.SIGINT, m_signal.default_int_handler), m_signal.signal.call_args[0])
def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal): m_signal.NSIG = signal.NSIG self.loop.add_signal_handler(signal.SIGHUP, lambda: True) m_signal.set_wakeup_fd.side_effect = ValueError self.loop.remove_signal_handler(signal.SIGHUP) self.assertTrue(m_logging.info)
def test_remove_signal_handler_error(self, m_signal): m_signal.NSIG = signal.NSIG self.loop.add_signal_handler(signal.SIGHUP, lambda: True) m_signal.signal.side_effect = OSError self.assertRaises( OSError, self.loop.remove_signal_handler, signal.SIGHUP)
def test_remove_signal_handler_error2(self, m_signal): m_signal.NSIG = signal.NSIG self.loop.add_signal_handler(signal.SIGHUP, lambda: True) class Err(OSError): errno = errno.EINVAL m_signal.signal.side_effect = Err self.assertRaises( RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP)
def test_close(self, m_signal): m_signal.NSIG = signal.NSIG self.loop.add_signal_handler(signal.SIGHUP, lambda: True) self.loop.add_signal_handler(signal.SIGCHLD, lambda: True) self.assertEqual(len(self.loop._signal_handlers), 2) m_signal.set_wakeup_fd.reset_mock() self.loop.close() self.assertEqual(len(self.loop._signal_handlers), 0) m_signal.set_wakeup_fd.assert_called_once_with(-1)
def _reset_signal_handlers(): for s in _signals_to_reset: # On FreeBSD, the numerical value of SIGRT* is larger than NSIG # from signal.h (which is a bug in my opinion). Do not change # action for these signals. This prevents a ValueError raised # in the signal module. if s < signal.NSIG: signal.signal(s, signal.SIG_DFL)
def __init__(self, loop, signalnum, ref=True, priority=None): if signalnum < 1 or signalnum >= signalmodule.NSIG: raise ValueError('illegal signal number: %r' % signalnum) # still possible to crash on one of libev's asserts: # 1) "libev: ev_signal_start called with illegal signal number" # EV_NSIG might be different from signal.NSIG on some platforms # 2) "libev: a signal must not be attached to two different loops" # we probably could check that in LIBEV_EMBED mode, but not in general watcher.__init__(self, loop, ref=ref, priority=priority, args=(signalnum, ))
def test_add_signal_handler_setup_error(self, m_signal): m_signal.NSIG = signal.NSIG m_signal.set_wakeup_fd.side_effect = ValueError self.assertRaises( RuntimeError, self.loop.add_signal_handler, signal.SIGINT, lambda: True)
def test_remove_signal_handler(self, m_signal): m_signal.NSIG = signal.NSIG self.loop.add_signal_handler(signal.SIGHUP, lambda: True) self.assertTrue( self.loop.remove_signal_handler(signal.SIGHUP)) self.assertTrue(m_signal.set_wakeup_fd.called) self.assertTrue(m_signal.signal.called) self.assertEqual( (signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0])
def strsignal_ctypes_wrapper(signo): # The behavior of the C library strsignal() is unspecified if # called with an out-of-range argument. Range-check on entry # _and_ NULL-check on exit. if 0 <= signo < NSIG: s = strsignal_c(signo) if s: return s.decode("utf-8") return "Unknown signal %d" % signo
def _resetSignalDisposition(self): # The Python interpreter ignores some signals, and our child # process will inherit that behaviour. To have a child process # that responds to signals normally, we need to reset our # child process's signal handling (just) after we fork and # before we execvpe. for signalnum in xrange(1, signal.NSIG): if signal.getsignal(signalnum) == signal.SIG_IGN: # Reset signal handling to the default signal.signal(signalnum, signal.SIG_DFL)