我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用signal.SIG_DFL。
def __call__(self, *args, **kwargs): """Trap ``SIGTERM`` and call wrapped function.""" self._caught_signal = None # Register handler for SIGTERM, then call `self.func` self.old_signal_handler = signal.getsignal(signal.SIGTERM) signal.signal(signal.SIGTERM, self.signal_handler) self.func(*args, **kwargs) # Restore old signal handler signal.signal(signal.SIGTERM, self.old_signal_handler) # Handle any signal caught during execution if self._caught_signal is not None: signum, frame = self._caught_signal if callable(self.old_signal_handler): self.old_signal_handler(signum, frame) elif self.old_signal_handler == signal.SIG_DFL: sys.exit(0)
def register_signal(signal_number, handler_func, once = False): prev_handler = None def _handler(signum, frame): skip_prev = handler_func(signum, frame) if not skip_prev: if callable(prev_handler): if once: signal.signal(signum, prev_handler) prev_handler(signum, frame) elif prev_handler == signal.SIG_DFL and once: signal.signal(signum, signal.SIG_DFL) os.kill(os.getpid(), signum) prev_handler = signal.signal(signal_number, _handler)
def test_register_signal(self): if runtime_info.OS_WIN: return result = {'handler': 0} def _handler(signum, frame): result['handler'] += 1 register_signal(signal.SIGUSR1, _handler) os.kill(os.getpid(), signal.SIGUSR1) os.kill(os.getpid(), signal.SIGUSR1) signal.signal(signal.SIGUSR1, signal.SIG_DFL) self.assertEqual(result['handler'], 2)
def __call__(self, *args, **kwargs): self._caught_signal = None # Register handler for SIGTERM, then call `self.func` self.old_signal_handler = signal.getsignal(signal.SIGTERM) signal.signal(signal.SIGTERM, self.signal_handler) self.func(*args, **kwargs) # Restore old signal handler signal.signal(signal.SIGTERM, self.old_signal_handler) # Handle any signal caught during execution if self._caught_signal is not None: signum, frame = self._caught_signal if callable(self.old_signal_handler): self.old_signal_handler(signum, frame) elif self.old_signal_handler == signal.SIG_DFL: sys.exit(0)
def __init__(self, default_handler): self.called = False self.original_handler = default_handler if isinstance(default_handler, int): if default_handler == signal.SIG_DFL: # Pretend it's signal.default_int_handler instead. default_handler = signal.default_int_handler elif default_handler == signal.SIG_IGN: # Not quite the same thing as SIG_IGN, but the closest we # can make it: do nothing. def default_handler(unused_signum, unused_frame): pass else: raise TypeError("expected SIGINT signal handler to be " "signal.SIG_IGN, signal.SIG_DFL, or a " "callable object") self.default_handler = default_handler
def closed(self): global old log.msg('closed %s' % self) log.msg(repr(self.conn.channels)) if not options['nocache']: # fork into the background if os.fork(): if old: fd = sys.stdin.fileno() tty.tcsetattr(fd, tty.TCSANOW, old) if (options['command'] and options['tty']) or \ not options['notty']: signal.signal(signal.SIGWINCH, signal.SIG_DFL) os._exit(0) os.setsid() for i in range(3): try: os.close(i) except OSError, e: import errno if e.errno != errno.EBADF: raise
def force_kill_all_children(): """ Iterate through all known child processes and force kill them. In the future we might consider possibly giving the child processes time to exit but this is fine for now. If someone force kills us and does not clean the process tree this will leave child processes around unless they choose to end themselves if their parent process dies. """ # First uninstall the SIGCHLD handler so that we don't get called again. signal.signal(signal.SIGCHLD, signal.SIG_DFL) global pid_list for pid in pid_list: print ("force killing PID={}".format(pid)) try: os.kill(pid, signal.SIGKILL) except: print ("error force killing PID={} continuing".format(pid)) pid_list = []
def init_signals(self): # reset signaling [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS] # init new signaling signal.signal(signal.SIGQUIT, self.handle_quit) signal.signal(signal.SIGTERM, self.handle_exit) signal.signal(signal.SIGINT, self.handle_quit) signal.signal(signal.SIGWINCH, self.handle_winch) signal.signal(signal.SIGUSR1, self.handle_usr1) signal.signal(signal.SIGABRT, self.handle_abort) # Don't let SIGTERM and SIGUSR1 disturb active requests # by interrupting system calls if hasattr(signal, 'siginterrupt'): # python >= 2.6 signal.siginterrupt(signal.SIGTERM, False) signal.siginterrupt(signal.SIGUSR1, False)
def __init__(self, master=False): # Setup signal fd, this allows signal to behave correctly if os.name == 'posix': self.signal_pipe_r, self.signal_pipe_w = os.pipe() self._set_nonblock(self.signal_pipe_r) self._set_nonblock(self.signal_pipe_w) signal.set_wakeup_fd(self.signal_pipe_w) self._signals_received = collections.deque() signal.signal(signal.SIGINT, signal.SIG_DFL) if os.name == 'posix': signal.signal(signal.SIGCHLD, signal.SIG_DFL) signal.signal(signal.SIGTERM, self._signal_catcher) signal.signal(signal.SIGALRM, self._signal_catcher) signal.signal(signal.SIGHUP, self._signal_catcher) else: # currently a noop on window... signal.signal(signal.SIGTERM, self._signal_catcher) # FIXME(sileht): should allow to catch signal CTRL_BREAK_EVENT, # but we to create the child process with CREATE_NEW_PROCESS_GROUP # to make this work, so current this is a noop for later fix signal.signal(signal.SIGBREAK, self._signal_catcher)
def enable_logging(log_level): # type: (typing.Union[None, int]) -> None """Configure the root logger and a logfile handler. Args: log_level: The logging level to set the logger handler. """ root_logger = logging.getLogger() root_logger.setLevel(logging.DEBUG) logfile_handler = logging.StreamHandler(_LOGFILE_STREAM) logfile_handler.setLevel(logging.DEBUG) logfile_handler.setFormatter(logging.Formatter( '%(levelname)s [%(asctime)s][%(name)s] %(message)s')) root_logger.addHandler(logfile_handler) if signal.getsignal(signal.SIGTERM) == signal.SIG_DFL: signal.signal(signal.SIGTERM, _logfile_sigterm_handler) if log_level: handler = logging.StreamHandler() handler.setFormatter(_LogColorFormatter()) root_logger.setLevel(log_level) root_logger.addHandler(handler)
def manage(): """Manage directory tags.""" atexit.register(close_stdio) signal.signal(signal.SIGPIPE, signal.SIG_DFL) args = sys.argv[1:] if not args: _list() head, tail = args[0], args[1:] if head == '--help': finish(USAGE + DESCRIPTION) elif head == '--version': finish('Version ' + VERSION) elif head == 'edit': _edit(tail) elif head == 'clean': _clean(tail) elif head == 'list': _list(tail, reverse=False) elif head == 'reverse': _list(tail, reverse=True) elif head == 'commands': _commands(tail) else: abort(USAGE + 'dtags: invalid argument: ' + style.bad(head))
def init_signals(self): # reset signaling [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS] # init new signaling signal.signal(signal.SIGQUIT, self.handle_quit) signal.signal(signal.SIGTERM, self.handle_exit) signal.signal(signal.SIGINT, self.handle_quit) signal.signal(signal.SIGWINCH, self.handle_winch) signal.signal(signal.SIGUSR1, self.handle_usr1) signal.signal(signal.SIGABRT, self.handle_abort) # Don't let SIGTERM and SIGUSR1 disturb active requests # by interrupting system calls if hasattr(signal, 'siginterrupt'): # python >= 2.6 signal.siginterrupt(signal.SIGTERM, False) signal.siginterrupt(signal.SIGUSR1, False) if hasattr(signal, 'set_wakeup_fd'): signal.set_wakeup_fd(self.PIPE[1])
def __init__(self): # Applet icon global indicator indicator = appindicator.Indicator.new(APPINDICATOR_ID, self.exchange_app.dogecoin.icon, appindicator.IndicatorCategory.SYSTEM_SERVICES) indicator.set_status(appindicator.IndicatorStatus.ACTIVE) indicator.set_label('€ *.****', '100%') # Set the menu of the indicator (default: gtk.Menu()) indicator.set_menu(self.build_menu()) # Ctrl-C behaviour signal.signal(signal.SIGINT, signal.SIG_DFL) # Setup the refresh every 5 minutes gobject.timeout_add(1000*60*5, self.exchange_app.update_price, "timeout") # First price update within 1 second gobject.timeout_add(1000, self.exchange_app.first_update_price, "first_update") # Main loop gtk.main()
def test_signal_handler(self, mock_signal, mock_os): # pylint: disable=protected-access mock_signal.getsignal.return_value = signal.SIG_DFL self.handler.set_signal_handlers() signal_handler = self.handler._signal_handler for signum in self.signals: mock_signal.signal.assert_any_call(signum, signal_handler) signum = self.signals[0] signal_handler(signum, None) self.init_func.assert_called_once_with(*self.init_args, **self.init_kwargs) mock_os.kill.assert_called_once_with(mock_os.getpid(), signum) self.handler.reset_signal_handlers() for signum in self.signals: mock_signal.signal.assert_any_call(signum, signal.SIG_DFL)
def safe_call(function, *args, **kwargs): """ If we're not in the main process, sets the default signal handler (`signal.SIG_DFL`) for the ``SIGINT`` signal before calling *function* using the given *args* and *kwargs*. Otherwise *function* will just be called and returned normally. The point being to prevent loads of unnecessary tracebacks from being printed to stdout when the user executes a :kbd:`Ctrl-C` on a running gateone.py process. .. note:: This function is only meant to be used to wrap calls made inside of `MultiprocessRunner` instances. """ if os.getpid() != PID: signal.signal(signal.SIGINT, signal.SIG_DFL) return function(*args, **kwargs)
def main(self): self.indicator = appindicator.Indicator.new(self.APPINDICATOR_ID, self.ICON_OFF, appindicator.IndicatorCategory.SYSTEM_SERVICES) self.indicator.set_status(appindicator.IndicatorStatus.ACTIVE) self.indicator.set_menu(self.build_menu()) # This sets the handler for “INT” signal processing #- the one issued by the OS when “Ctrl+C” is typed. #The handler we assign to it is the “default” handler, which, #in case of the interrupt signal, is to stop execution. signal.signal(signal.SIGINT, signal.SIG_DFL) #listen to quit signal notify.init(self.APPINDICATOR_ID) self.update() glib.timeout_add_seconds(self.UPDATE_FREQUENCY, self.update) gtk.main()
def process(self): """ Run the interactive shell after displaying the software version information @raise SystemExit """ self._display_banner() while 1: try: self.cmdloop() except kbd_exception.CtrlD: signal.signal(signal.SIGINT, signal.SIG_DFL) return except kbd_exception.CtrlC: # cancel the current line and get a new one pass
def do_startup(self): Gtk.Application.do_startup(self) signal.signal(signal.SIGINT, signal.SIG_DFL) action = Gio.SimpleAction.new('about', None) action.connect('activate', self.on_about) self.add_action(action) action = Gio.SimpleAction.new('quit', None) action.connect('activate', self.on_quit) self.add_action(action) self.add_accelerator('<Primary>q', 'app.quit') app_menu = Gio.Menu.new() app_menu.append(_('About'), 'app.about') app_menu.append(_('Quit'), 'app.quit') self.set_app_menu(app_menu)
def set_blocking_signal_threshold(self, seconds, action): """Sends a signal if the ioloop is blocked for more than s seconds. Pass seconds=None to disable. Requires python 2.6 on a unixy platform. The action parameter is a python signal handler. Read the documentation for the python 'signal' module for more information. If action is None, the process will be killed if it is blocked for too long. """ if not hasattr(signal, "setitimer"): logging.error("set_blocking_signal_threshold requires a signal module " "with the setitimer method") return self._blocking_signal_threshold = seconds if seconds is not None: signal.signal(signal.SIGALRM, action if action is not None else signal.SIG_DFL)