我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 signal.signal()。
def test_worker_alarm(manager):
    """Check that SIGALRM is delivered only for the task that exceeds the timeout.

    A recording SIGALRM handler is installed, then two ``foo`` tasks are run
    through a ``Worker`` created with ``task_timeout=1``: one sleeping 0.1 s
    (no alarm expected) and one sleeping 1.1 s (alarm expected).
    """
    alarm_hits = []

    def handler(signum, frame):
        # Remember that the alarm signal was delivered.
        alarm_hits.append(True)

    signal.signal(signal.SIGALRM, handler)

    @manager.task
    def foo(sleep):
        time.sleep(sleep)

    worker = Worker(manager, task_timeout=1)

    short_task = make_task('foo', args=(0.1,))
    worker.process_one(short_task)
    assert not alarm_hits

    long_task = make_task('foo', args=(1.1,))
    worker.process_one(long_task)
    assert alarm_hits
def timeout(timeout_seconds):
    """Return a decorator that aborts the wrapped call after a time limit.

    The decorated function runs with a SIGALRM alarm armed for
    ``timeout_seconds`` seconds. If the alarm fires, ``TimeoutError`` is
    raised from the signal handler. The alarm is always cancelled and the
    previous SIGALRM handler reinstalled when the call finishes.

    :param timeout_seconds: whole seconds passed to ``signal.alarm``.
    :returns: a decorator preserving the wrapped function's metadata.
    """
    # Local import: the file's top-level imports are not visible from here.
    import functools

    def decorate(function):
        message = "Timeout (%s sec) elapsed for test %s" % (timeout_seconds, function.__name__)

        def handler(signum, frame):
            raise TimeoutError(message)

        # functools.wraps replaces the Python-2-only ``func_name`` copy,
        # which raised AttributeError on Python 3.
        @functools.wraps(function)
        def new_f(*args, **kwargs):
            old = signal.signal(signal.SIGALRM, handler)
            signal.alarm(timeout_seconds)
            try:
                return function(*args, **kwargs)
            finally:
                # Cancel the alarm BEFORE restoring the old handler so a late
                # alarm can never be delivered to the previous handler.
                signal.alarm(0)
                # ``old`` is None when the previous handler was not installed
                # from Python; signal.signal() would reject it.
                if old is not None:
                    signal.signal(signal.SIGALRM, old)

        return new_f

    return decorate
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    """Return a decorator that raises ``TimeoutError`` when a call runs too long.

    :param seconds: whole seconds passed to ``signal.alarm``.
    :param error_message: message carried by the raised ``TimeoutError``;
        defaults to the OS description of ``errno.ETIME``.
    :returns: a decorator; the wrapper keeps the wrapped function's metadata.
    """
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Remember the handler we replace so it can be put back; leaving
            # ours installed would leak it into unrelated later alarms.
            previous = signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                signal.alarm(0)
                # None means the previous handler was not installed from
                # Python and cannot be passed back to signal.signal().
                if previous is not None:
                    signal.signal(signal.SIGALRM, previous)

        return wrapper

    return decorator
def __call__(self, *args, **kwargs):
    """Trap ``SIGTERM`` and call wrapped function.

    ``self.signal_handler`` is installed for SIGTERM while ``self.func``
    runs. Afterwards the previous handler is reinstalled and, if
    ``self._caught_signal`` was set during the call, the signal is acted on:
    a callable previous handler is invoked with the stored arguments, and
    the default disposition results in ``sys.exit(0)``.

    The return value of ``self.func`` is discarded, as before.
    """
    self._caught_signal = None

    # Register handler for SIGTERM, then call `self.func`
    self.old_signal_handler = signal.getsignal(signal.SIGTERM)
    signal.signal(signal.SIGTERM, self.signal_handler)
    try:
        self.func(*args, **kwargs)
    finally:
        # Restore the old handler even when the wrapped function raises;
        # otherwise the trapping handler stays installed for the whole process.
        signal.signal(signal.SIGTERM, self.old_signal_handler)

    # Handle any signal caught during execution
    if self._caught_signal is not None:
        signum, frame = self._caught_signal
        if callable(self.old_signal_handler):
            self.old_signal_handler(signum, frame)
        elif self.old_signal_handler == signal.SIG_DFL:
            sys.exit(0)
def process(self, queue_list, burst=False):  # pragma: no cover
    """Pop tasks from ``queue_list`` and process them until told to stop.

    The loop ends when the ``RunFlag`` becomes falsy, ``process_one`` raises
    ``StopWorker``, no task was popped while ``burst`` is true, or
    ``self.lifetime`` is set and has been exceeded. The manager is closed
    when the loop ends.
    """
    signal.signal(signal.SIGALRM, self.alarm_handler)
    keep_running = RunFlag()
    started_at = time()

    while keep_running:
        job = self.manager.pop(queue_list, 1)
        if not job:
            # Nothing popped: burst mode stops instead of polling again.
            if burst:
                break
        else:
            try:
                self.process_one(job)
            except StopWorker:
                break

        if self.lifetime and time() - started_at > self.lifetime:
            break

    self.manager.close()
def test():
    """Run an ``AsyncFactory`` milter on 127.0.0.1:5000 until SIGINT arrives."""
    import signal

    factory = AsyncFactory('inet:127.0.0.1:5000', MilterProtocol)

    def on_sigint(signum, frame):
        # Close the factory, then leave the process with status 0.
        factory.close()
        sys.exit(0)

    signal.signal(signal.SIGINT, on_sigint)
    factory.run()
# }}}
def main():
    """Load ``config.yaml``, install the SIGINT handler and start the app.

    A YAML parse error is printed and the function returns without starting
    the application.
    """
    # Locate and parse the configuration file.
    config_path = os.path.join(rumps.application_support(APP_TITLE), 'config.yaml')
    with open(config_path, 'r') as stream:
        try:
            settings = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(exc)
            return

    # CTRL+C handling.
    signal.signal(signal.SIGINT, _signal_handler)

    # Debug mode is driven by the configuration.
    rumps.debug_mode(settings['debug'])

    # Start the status bar application.
    app = SlackStatusBarApp(settings)
    app.run()
def run(self, handler):
    """Serve the WSGI ``handler`` on an asyncio event loop until interrupted.

    When the ``BOTTLE_CHILD`` environment variable is present, SIGINT is
    handled by stopping the loop; a ``KeyboardInterrupt`` raised out of
    ``run_forever`` stops the loop as well.
    """
    import asyncio
    from aiohttp.wsgi import WSGIServerHttpProtocol

    self.loop = self.get_event_loop()
    asyncio.set_event_loop(self.loop)

    def protocol_factory():
        return WSGIServerHttpProtocol(
            handler,
            readpayload=True,
            debug=(not self.quiet))

    server = self.loop.create_server(protocol_factory, self.host, self.port)
    self.loop.run_until_complete(server)

    if 'BOTTLE_CHILD' in os.environ:
        import signal

        def stop_loop(signum, frame):
            return self.loop.stop()

        signal.signal(signal.SIGINT, stop_loop)

    try:
        self.loop.run_forever()
    except KeyboardInterrupt:
        self.loop.stop()
def __init__(self, address):
    """Set up logging, the server socket and the optional signal handlers.

    :param address: address passed to the parent server class; its first
        element decides between IPv4 and IPv6 (a ``:`` selects IPv6).
    """
    self.log = logging.getLogger(self.server_logger)
    self.socket = None

    uses_ipv6 = ":" in address[0]
    self.address_family = socket.AF_INET6 if uses_ipv6 else socket.AF_INET

    self.log.debug("Listening on %s", address)
    # Binding is deferred: the socket is prepared by _setup_socket() below.
    super(_SpoonMixIn, self).__init__(address, self.handler_klass,
                                      bind_and_activate=False)
    self.load_config()
    self._setup_socket()

    # Signals go last, once everything else is in place.
    reload_signal = self.signal_reload
    if reload_signal is not None:
        signal.signal(reload_signal, self.reload_handler)

    shutdown_signal = self.signal_shutdown
    if shutdown_signal is not None:
        signal.signal(shutdown_signal, self.shutdown_handler)
def stop(self, failed=False, shutting_down=False):
    '''
    Stop the job if it is running; do nothing otherwise.

    ``failed`` is coerced to bool and forwarded to ``_finish_job``.
    ``shutting_down`` only adds a warning log line noting that the stop is
    part of an atexit/signal handler exit.
    '''
    if not self.is_running:
        return

    with self._lock:
        # Re-check under the lock: another thread could have changed the status.
        if not self.is_running:
            return

        failed = bool(failed)
        DEFAULT_LOGGER.info("Stopping job failed = %r", failed)
        if shutting_down:
            DEFAULT_LOGGER.warning("Stopping job as part of atexit/signal handler exit")

        try:
            _finish_job(self.conn, self.inputs, self.outputs, self.identifier,
                        failed=failed)
        finally:
            # Clear the refresh bookkeeping even if finishing the job raised.
            self.last_refreshed = None
            self.auto_refresh = None
            LOCKED.discard(self)
            AUTO_REFRESH.discard(self)
def handle(self, *args, **kwargs):
    """Register the periodic cache-update jobs and run the scheduler forever.

    Each job's interval, in seconds, comes from the matching ``settings``
    TTL value. A ``KeyboardInterrupt`` is logged and ends the process with
    exit status 0.
    """
    logger.info("%s - starting jobs schedule" % (__name__))
    try:
        # Earlier, disabled hourly variant kept for reference:
        #   schedule.every().hour.do(job_update_entities)
        #   schedule.every().hour.do(job_update_clients)
        #   schedule.every().hour.do(job_update_checks)
        #   schedule.every().hour.do(job_update_trends)
        #   #schedule.every(10).minutes.do(job_update_events)
        entity_ttl = settings.CACHE_ENTITY_TTL
        schedule.every(entity_ttl).seconds.do(job_update_entities)

        client_ttl = settings.CACHE_CLIENT_TTL
        schedule.every(client_ttl).seconds.do(job_update_clients)

        trends_ttl = settings.CACHE_TRENDS_TTL
        schedule.every(trends_ttl).seconds.do(job_update_trends)

        # Poll the scheduler once per second.
        while True:
            schedule.run_pending()
            sleep(1)
    except KeyboardInterrupt:
        logger.info("%s - user signal exit!" % (__name__))
        exit(0)
def per_process_init():
    # type: () -> None
    """Lower this process's priority and make it ignore SIGINT."""
    try:
        os.nice(19)
    except (AttributeError, OSError):
        # AttributeError: nice is not available everywhere.
        # OSError: when this program is already running on the nicest level
        # (20) on OS X it is not permitted to change the priority.
        pass

    # A keyboard interrupt disrupts the communication between a Python script
    # and its subprocesses when using multiprocessing. The child can ignore
    # SIGINT and is properly shut down by a pool.terminate() call in case of
    # a keyboard interrupt or an early generator exit.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
def setup(self): if self.agent.get_option('cpu_profiler_disabled'): return if runtime_info.OS_WIN: self.agent.log('CPU profiler is not available on Windows.') return def _sample(signum, signal_frame): if self.handler_active: return self.handler_active = True with self.profile_lock: try: self.process_sample(signal_frame) signal_frame = None except Exception: self.agent.exception() self.handler_active = False self.prev_signal_handler = signal.signal(signal.SIGPROF, _sample) self.setup_done = True
def register_signal(signal_number, handler_func, once = False):
    """Install ``handler_func`` for ``signal_number``, chaining the old handler.

    After ``handler_func`` runs, and unless it returned a truthy value:

    * a callable previous handler is invoked; with ``once`` it is first
      reinstalled as the handler for the signal;
    * if the previous disposition was ``SIG_DFL`` and ``once`` is set, the
      default is reinstalled and the signal is re-sent to this process.
    """
    previous = None

    def _chained(signum, frame):
        if handler_func(signum, frame):
            # The new handler asked to skip the previous one.
            return

        if callable(previous):
            if once:
                signal.signal(signum, previous)
            previous(signum, frame)
            return

        if once and previous == signal.SIG_DFL:
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)

    # Assigned after the closure is defined; the closure reads it at call time.
    previous = signal.signal(signal_number, _chained)
def test_register_signal(self):
    """A handler added with ``register_signal`` runs once per SIGUSR1 sent."""
    if runtime_info.OS_WIN:
        return

    counter = {'handler': 0}

    def _handler(signum, frame):
        counter['handler'] += 1

    register_signal(signal.SIGUSR1, _handler)

    # Send the signal to ourselves twice.
    for _ in range(2):
        os.kill(os.getpid(), signal.SIGUSR1)

    # Put the default disposition back before asserting.
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)

    self.assertEqual(counter['handler'], 2)
def set_debug(self, state):
    "Switch forced debugging on or off; switching off goes back to last_level"

    # A request matching the current state is reported, but processing still
    # continues below exactly as for a real change.
    if self.forced_debug == state:
        qualifier = "" if state else "not "
        self.debug("Debug signal ignored; already %sdebugging", qualifier)

    if not state:
        self.debug("Debug discontinued")
        self.level(self.last_level)
    else:
        # save=False is passed so this temporary level is not remembered.
        self.level(DEBUG, save=False)
        self.debug("Debug started")

    self.forced_debug = state
    self.__update_env()
def __exit_handler(signum, frame):
    """
    Signal handler that releases the worker and then exits with status 0.

    Little happens here because this runs inside a signal handler: when an
    exit barrier has been set up, the handler waits on it and then on the
    finish barrier before calling ``sys.exit(0)``.
    """
    # Snapshot the barrier reference while holding the lock.
    with this.lock:
        barrier_snapshot = this.exit_barrier

    if barrier_snapshot is not None:
        # Meet up with the worker...
        this.exit_barrier.wait()
        # ...then wait for the worker to be done.
        this.finish_barrier.wait()

    sys.exit(0)
def signal_handler(signum, frame):
    """
    Handle a registered signal by requesting shutdown.

    The first signal clears the ``running`` flag and notifies
    ``run_condition``; a signal received when ``running`` is already false
    calls ``exit(1)``.

    :param signum: The signal number (unused)
    :param frame: The frame (unused)
    """
    global running, run_condition
    del signum, frame

    with run_condition:
        if not running:
            exit(1)
        else:
            # Stop the application and wake whoever waits on the condition.
            running = False
            run_condition.notify()
# Signals to register for
def bug1():
    """Install a SIGWINCH handler and read lines from stdin forever."""
    # BUG being demonstrated:
    #   sys.stdin.readline() in `ask` dies with IOError and errno=EINTR when
    #   the terminal gets resized after curses has been de-initialized.
    #
    #   1: SIGWINCH is sent by the terminal when the screen has been resized.
    #   2: curses forgot to restore the signal handling of SIGWINCH to the
    #      default of ignoring the signal.
    #      NOTE: signal.getsignal(signal.SIGWINCH) incorrectly returns
    #      signal.SIG_DFL. (Default is to be ignored.)
    #   3: Python fails to handle EINTR when reading from stdin.
    #
    # REFERENCES:
    #   Issue 3949: https://bugs.python.org/issue3949
    #   PEP 475:    https://www.python.org/dev/peps/pep-0475/
    def on_resize(signum, frame):
        print("Resized")

    signal.signal(signal.SIGWINCH, on_resize)

    while True:
        sys.stdin.readline()
def loop(self):
    """Refresh depths and tick repeatedly until an interrupt flag is seen.

    ``sigint_handler`` is registered for the signals below. After every
    iteration (including the sleep) the module-level ``is_sigint_up`` flag
    is checked; when set, ``terminate()`` is called and the loop ends.
    """
    # NOTE(review): the flattened SOURCE does not show whether this SIGINT
    # registration was commented out; it is kept active here -- confirm.
    signal.signal(signal.SIGINT, sigint_handler)
    # NOTE(review): the original comment here was mis-encoded; it appears to
    # say the next line fails on Windows with Python 2.4 but works on FreeBSD.
    signal.signal(signal.SIGHUP, sigint_handler)
    signal.signal(signal.SIGTERM, sigint_handler)

    while True:
        self.depths = self.update_depths()
        # print(self.depths)
        self.tickers()
        self.tick()
        time.sleep(config.refresh_rate)

        if not is_sigint_up:
            continue

        # Interrupt requested: clean up and leave the loop.
        self.terminate()
        print ("Exit")
        break
def __call__(self, *args, **kwargs):
    """Call ``self.func`` with SIGTERM trapped by ``self.signal_handler``.

    The previous SIGTERM handler is reinstalled once the call finishes. If
    ``self._caught_signal`` was set during the call, the signal is then
    acted on: a callable previous handler is invoked with the stored
    arguments, and the default disposition results in ``sys.exit(0)``.

    The return value of ``self.func`` is discarded, as before.
    """
    self._caught_signal = None

    # Register handler for SIGTERM, then call `self.func`
    self.old_signal_handler = signal.getsignal(signal.SIGTERM)
    signal.signal(signal.SIGTERM, self.signal_handler)
    try:
        self.func(*args, **kwargs)
    finally:
        # Restore the old handler even when the wrapped function raises;
        # otherwise the trapping handler stays installed for the whole process.
        signal.signal(signal.SIGTERM, self.old_signal_handler)

    # Handle any signal caught during execution
    if self._caught_signal is not None:
        signum, frame = self._caught_signal
        if callable(self.old_signal_handler):
            self.old_signal_handler(signum, frame)
        elif self.old_signal_handler == signal.SIG_DFL:
            sys.exit(0)
def web_server(wiki, port, debug=False):
    """Serve the WSGI app ``wiki`` on ``port`` until Ctrl+C is pressed.

    :param wiki: WSGI application wrapped in a ``WSGIContainer``.
    :param port: port the HTTP server listens on.
    :param debug: when true, autoreload is started on the IO loop.
    """
    def kill_handler(signal_number, stack_frame):
        logger.info('\nStopping wiki')
        sys.exit(1)

    signal.signal(signal.SIGINT, kill_handler)

    logger.info('Starting wiki on port {}. Ctrl+C will kill it.'.format(port))
    server = HTTPServer(WSGIContainer(wiki))
    server.listen(port)

    ioloop = IOLoop.instance()
    if debug:
        autoreload.start(ioloop)
    ioloop.start()
def run_with_reloader(main_func, extra_files=None, interval=1, reloader_type='auto'):
    """Run ``main_func`` under a reloader.

    When ``WERKZEUG_RUN_MAIN`` is ``'true'`` in the environment,
    ``main_func`` runs in a daemon thread while the reloader loop runs in
    the calling thread. Otherwise the process exits with the status
    returned by ``restart_with_reloader()``. SIGTERM exits with status 0
    and ``KeyboardInterrupt`` is swallowed.
    """
    import signal

    reloader = reloader_loops[reloader_type](extra_files, interval)

    def _exit_on_sigterm(*args):
        sys.exit(0)

    signal.signal(signal.SIGTERM, _exit_on_sigterm)

    try:
        if os.environ.get('WERKZEUG_RUN_MAIN') != 'true':
            sys.exit(reloader.restart_with_reloader())
        else:
            worker = threading.Thread(target=main_func, args=())
            worker.daemon = True
            worker.start()
            reloader.run()
    except KeyboardInterrupt:
        pass
def test_all_exit_sigs_with_sig(self):
    """Exit functions also run when the process is terminated by a signal.

    For each signal in ``TEST_SIGNALS`` a child script registers two exit
    functions and a handler for the signal, then signals itself. The child's
    return value must equal the signal and the file written by the child
    must contain ``b"210"``.
    """
    # NOTE(review): line breaks and indentation of the embedded script were
    # reconstructed from flattened text -- confirm against the original.
    script_template = """
        import functools, os, signal, imp
        mod = imp.load_source("mod", r"{modname}")

        def foo(s):
            with open(r"{testfn}", "ab") as f:
                f.write(s)

        signal.signal({sig}, functools.partial(foo, b'0'))
        mod.register_exit_fun(functools.partial(foo, b'1'))
        mod.register_exit_fun(functools.partial(foo, b'2'))
        os.kill(os.getpid(), {sig})
        """
    for sig in TEST_SIGNALS:
        script = script_template.format(modname=os.path.abspath(__file__),
                                        testfn=TESTFN, sig=sig)
        ret = pyrun(textwrap.dedent(script))
        self.assertEqual(ret, sig)
        with open(TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"210")
        safe_remove(TESTFN)
def __init__(self, patterns, pattern_file=None, name="jack-matchmaker",
             connect_interval=3.0, connect_maxattempts=0):
    """Store connection settings and load the initial patterns.

    When ``pattern_file`` is given its patterns are loaded first and, except
    on Windows, SIGHUP is bound to ``reread_pattern_file``. The pairs in
    ``patterns`` are added afterwards.
    """
    self.patterns = []
    self.pattern_file = pattern_file
    self.connect_maxattempts = connect_maxattempts
    self.connect_interval = connect_interval

    if self.pattern_file:
        self.add_patterns_from_file(self.pattern_file)

        on_windows = sys.platform.startswith('win')
        if on_windows:
            log.warning("Signal handling not supported on Windows. jack-matchmaker must be "
                        "restarted to re-read the pattern file.")
        else:
            signal.signal(signal.SIGHUP, self.reread_pattern_file)

    for pattern_pair in patterns:
        self.add_patterns(*pattern_pair)

    self.queue = queue.Queue()
    self.client = None
def __call__(self, *args, **keyArgs):
    """Call ``self.function`` under a ``self.timeout``-second limit.

    Where SIGALRM exists, an alarm is armed with ``self.handle_timeout`` as
    its handler for the duration of the call. Otherwise the elapsed time is
    measured after the call returns and ``self.handle_timeout(None, None)``
    is invoked if the limit was reached.

    :returns: whatever ``self.function`` returns.
    """
    if not hasattr(signal, 'SIGALRM'):
        # No SIGALRM: the call cannot be interrupted, so check the time
        # taken after the function has returned.
        startTime = time.time()
        result = self.function(*args, **keyArgs)
        timeElapsed = time.time() - startTime
        if timeElapsed >= self.timeout:
            self.handle_timeout(None, None)
        return result

    old = signal.signal(signal.SIGALRM, self.handle_timeout)
    signal.alarm(self.timeout)
    try:
        return self.function(*args, **keyArgs)
    finally:
        # Cancel the alarm BEFORE restoring the old handler so a late alarm
        # can never be delivered to the previous handler.
        signal.alarm(0)
        # ``old`` is None when the previous handler was not installed from
        # Python; signal.signal() would reject it.
        if old is not None:
            signal.signal(signal.SIGALRM, old)
def exit_gracefully(signum, frame):
    """SIGINT handler that asks for confirmation before quitting.

    An answer starting with ``y`` (any case) exits with status 1, as does a
    second CTRL+C during the prompt. Any other answer reinstalls this
    handler and returns.
    """
    # Put the original handler back while prompting: otherwise evil things
    # happen in raw_input when CTRL+C is pressed, and this handler is not
    # re-entrant.
    signal.signal(signal.SIGINT, original_sigint)

    try:
        answer = raw_input("\nReally quit? (y/n)> ")
        if answer.lower().startswith('y'):
            sys.exit(1)
    except KeyboardInterrupt:
        print("Ok ok, quitting")
        sys.exit(1)

    # Not quitting: make this function the SIGINT handler again.
    signal.signal(signal.SIGINT, exit_gracefully)
def initialize(cls, io_loop=None):
    """Install the ``SIGCHLD`` handler (only once per class).

    The handler schedules ``cls._cleanup`` on an `.IOLoop` through
    ``add_callback_from_signal`` to avoid locking issues. The `.IOLoop`
    used for signal handling need not be the one used by individual
    Subprocess objects (as long as the ``IOLoops`` are each running in
    separate threads).

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.
    """
    if cls._initialized:
        return

    if io_loop is None:
        io_loop = ioloop.IOLoop.current()

    def _on_sigchld(sig, frame):
        return io_loop.add_callback_from_signal(cls._cleanup)

    # Keep the handler being replaced on the class.
    cls._old_sigchld = signal.signal(signal.SIGCHLD, _on_sigchld)
    cls._initialized = True
def set_exit_callback(self, callback):
    """Register ``callback`` to run when this process exits.

    The callback takes one argument, the return code of the process.

    This relies on a ``SIGCHLD`` handler, which is a global setting and may
    conflict with other libraries handling the same signal. If more than one
    ``IOLoop`` is in use it may be necessary to call `Subprocess.initialize`
    first to designate one ``IOLoop`` to run the signal handlers.

    In many cases a close callback on the stdout or stderr streams can be
    used as an alternative to an exit callback if the signal handler is
    causing a problem.
    """
    wrapped_callback = stack_context.wrap(callback)
    self._exit_callback = wrapped_callback

    Subprocess.initialize(self.io_loop)

    # Register this instance under its pid, then try to clean it up at once.
    child_pid = self.pid
    Subprocess._waiting[child_pid] = self
    Subprocess._try_cleanup_process(child_pid)
def start(self):
    """Install the SIGTERM handler and run ``main()``.

    Any exception, including ``BaseException`` subclasses, is logged and
    followed by ``shutdown_all_threads_and_die()``.
    """
    try:
        # Catch stop requests delivered as SIGTERM.
        signal.signal(signal.SIGTERM, self.signal_handler)
        main()
    except BaseException:
        self.LOG.exception("Monasca Transform service "
                           "encountered fatal error. "
                           "Shutting down all threads and exiting")
        shutdown_all_threads_and_die()
def run():
    """Run the CNI plugin for the request on stdin, exiting on a non-zero status.

    A SIGALRM alarm of ``_CNI_TIMEOUT`` seconds is armed before the runner
    starts; when it fires, a timeout message is written to stdout and the
    process exits with status 1.
    """
    # REVISIT(ivc): current CNI implementation provided by this package is
    # experimental and its primary purpose is to enable development of other
    # components (e.g. functional tests, service/LBaaSv2 support)
    cni_conf = utils.CNIConfig(jsonutils.load(sys.stdin))
    args = ['--config-file', cni_conf.kuryr_conf]

    # A missing ``debug`` attribute is treated like a false one.
    if getattr(cni_conf, 'debug', False):
        args.append('-d')

    config.init(args)
    config.setup_logging()

    # Initialize o.vo registry.
    k_objects.register_locally_defined_vifs()
    os_vif.initialize()

    if not CONF.cni_daemon.daemon_enabled:
        runner = cni_api.CNIStandaloneRunner(K8sCNIPlugin())
    else:
        runner = cni_api.CNIDaemonizedRunner()
    LOG.info("Using '%s' ", runner.__class__.__name__)

    def _timeout(signum, frame):
        timeout_reply = {
            'msg': 'timeout',
            'code': k_const.CNI_TIMEOUT_CODE,
        }
        runner._write_dict(sys.stdout, timeout_reply)
        LOG.debug('timed out')
        sys.exit(1)

    signal.signal(signal.SIGALRM, _timeout)
    signal.alarm(_CNI_TIMEOUT)

    status = runner.run(os.environ, cni_conf, sys.stdout)
    LOG.debug("Exiting with status %s", status)
    if status:
        sys.exit(status)
def signal_handler(signum, frame):
    """Do nothing, so a signal bound to this handler has no effect."""
    return None

# prevent exiting
#signal.signal(signal.SIGTSTP, signal_handler) # Disable Ctrl-Z
#signal.signal(signal.SIGINT, signal_handler) # Disable Ctrl-C
def __init__(self, cmd):
    """Remember ``cmd`` and reset SIGCHLD to its default disposition."""
    # NOTE(review): the original comments here were mis-encoded and could not
    # be recovered; they appear to explain why SIGCHLD is reset to SIG_DFL
    # before subprocess is used -- confirm against the upstream source.
    signal.signal(signal.SIGCHLD, signal.SIG_DFL)
    self.cmd = cmd
def process_one(self, task):
    """Process one task, guarded by a SIGALRM alarm when a timeout applies.

    :param task: mapping describing the task; its optional ``'timeout'`` key
        overrides ``self.task_timeout``. A falsy timeout disables the alarm.
    :raises: whatever ``self.manager.process`` raises.
    """
    timeout = task.get('timeout', self.task_timeout)
    if timeout:
        signal.alarm(timeout)
    try:
        self.current_task = task
        self.manager.process(task)
    finally:
        # Always disarm: an alarm left pending after a failed task would
        # fire later, during unrelated work.
        if timeout:
            signal.alarm(0)
def __init__(self):
    """Set the flag and route SIGINT and SIGTERM to ``self.handler``."""
    self._flag = True
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, self.handler)
def handler(self, signal, frame):
    """Signal callback: delegate to ``self.stop()``; both arguments are unused."""
    # NOTE: the ``signal`` parameter shadows the ``signal`` module in here.
    self.stop()
    return None
def main():
    """Start the ``uwb_multi_range_node`` ROS node and run it until stopped.

    SIGINT stops the ``UWBMultiRange`` instance; an interruption raised out
    of ``exec_()`` is logged as a warning.
    """
    import signal

    rospy.init_node('uwb_multi_range_node')
    node = UWBMultiRange()

    def sigint_handler(sig, _):
        if sig != signal.SIGINT:
            return
        node.stop()

    signal.signal(signal.SIGINT, sigint_handler)

    try:
        node.exec_()
    except (rospy.ROSInterruptException, select.error):
        rospy.logwarn("Interrupted... Stopping.")
def main():
    """Start the ``uwb_tracker_node`` ROS node and run it until stopped.

    SIGINT stops the ``UWBTracker`` instance; an interruption raised out of
    ``exec_()`` is logged as a warning.
    """
    import signal

    rospy.init_node('uwb_tracker_node')
    tracker = UWBTracker()

    def sigint_handler(sig, _):
        if sig != signal.SIGINT:
            return
        tracker.stop()

    signal.signal(signal.SIGINT, sigint_handler)

    try:
        tracker.exec_()
    except (rospy.ROSInterruptException, select.error):
        rospy.logwarn("Interrupted... Stopping.")
def serve_forever():
    """Accept connections forever, forking one child process per request."""
    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_sock.bind(SERVER_ADDRESS)
    server_sock.listen(REQUEST_QUEUE_SIZE)
    print('Serving HTTP on port {port} ...'.format(port=PORT))

    # NOTE(review): a handler named grim_reaper is bound to SIGTSTP here;
    # child-reaping handlers are usually bound to SIGCHLD -- confirm intent.
    signal.signal(signal.SIGTSTP, grim_reaper)

    while True:
        try:
            client_connection, client_address = server_sock.accept()
        except IOError as e:
            code, msg = e.args
            # Only an interrupted 'accept' is retried; anything else is fatal.
            if code != errno.EINTR:
                raise
            continue

        pid = os.fork()
        if pid != 0:
            # Parent: close its copy of the connection and loop over.
            client_connection.close()
        else:
            # Child: drop the listening socket copy, serve, then exit.
            server_sock.close()
            handle_request(client_connection)
            client_connection.close()
            os._exit(0)
def __init__(self, *args, **kwargs):
    """Initialise the parent class, then bind SIGINT to ``_sigint_handler``."""
    super(SigIntMixin, self).__init__(*args, **kwargs)
    sigint_callback = self._sigint_handler
    signal(SIGINT, sigint_callback)
def __init__(self, *args, **kwargs):
    """
    Install ``handle_sigint`` for SIGINT and keep the handler it replaces.
    """
    super(InterruptibleMixin, self).__init__(*args, **kwargs)

    previous = signal(SIGINT, self.handle_sigint)

    # If signal() returns None, the previous handler was not installed from
    # Python, and we cannot restore it. This probably should not happen, but
    # if it does, we must restore something sensible instead, at least. The
    # least bad option should be Python's default SIGINT handler, which just
    # raises KeyboardInterrupt.
    self.original_handler = default_int_handler if previous is None else previous
def finish(self):
    """
    Finish via the parent class, then reinstall the saved SIGINT handler.

    This should happen regardless of whether the progress display finishes
    normally, or gets interrupted.
    """
    super(InterruptibleMixin, self).finish()
    saved_handler = self.original_handler
    signal(SIGINT, saved_handler)
def fn_run_milter():
    """Primary entry point when the script runs.

    Creates a ``ForkFactory`` listener for sendmail on 127.0.0.1:5000 and
    runs it. SIGINT closes the factory and exits with status 0; an exception
    raised by ``run()`` closes the factory, is logged and exits with status 3.
    """
    # Milter options. SMFIF_CHGFROM was OR-ed in twice originally; once gives
    # the same value.
    opts = lm.SMFIF_CHGFROM | lm.SMFIF_ADDRCPT | lm.SMFIF_CHGBODY

    # Factory choices are AsyncFactory, ForkFactory or ThreadFactory; Thread
    # and Fork need the appropriate mixin classes on the milter.
    factory = lm.ForkFactory('inet:127.0.0.1:5000', VRMilter, opts)

    def sigHandler(num, frame):
        factory.close()
        sys.exit(0)

    signal.signal(signal.SIGINT, sigHandler)

    try:
        # run it
        factory.run()
    except Exception as e:
        factory.close()
        glog('EXCEPTION OCCURED: ' + str(e))
        sys.exit(3)

#=======================================================================
# Helper Functions
#=======================================================================
def set_exit_handler(func):
    """Install ``func`` as the SIGTERM handler; the old handler is discarded."""
    term_signal = signal.SIGTERM
    signal.signal(term_signal, func)
def signal_handler(signal, frame):
    """Exit the interpreter via ``sys.exit()``; both arguments are unused."""
    # NOTE: the ``signal`` parameter shadows the ``signal`` module in here.
    sys.exit()