The following 5 code examples, extracted from open-source Python projects, illustrate how to use logging.handlers.QueueListener().
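Before the project examples, here is a minimal, self-contained sketch of the usual QueueHandler/QueueListener pairing from the standard library; the logger name and messages are purely illustrative.

import logging
import logging.handlers
import queue

# QueueHandler puts records onto the queue; QueueListener consumes them on a
# background thread and dispatches them to the "real" handlers.
log_queue = queue.Queue(-1)                       # unbounded queue
console = logging.StreamHandler()                 # handler doing the actual I/O
console.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))

listener = logging.handlers.QueueListener(log_queue, console)
listener.start()

logger = logging.getLogger("demo")
logger.setLevel(logging.INFO)
logger.addHandler(logging.handlers.QueueHandler(log_queue))

logger.info("logged via the queue")
listener.stop()                                   # flush pending records and join the thread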
def __init__(self, pool_names, max_restarts=0, options=None):
    self.names = pool_names
    self.queue = multiprocessing.Queue()
    self.pool = dict()
    self.max_restarts = max_restarts
    self.options = options or dict()
    self.dog_path = os.curdir
    self.dog_handler = LiveReload(self)
    # self.dog_observer = Observer()
    # self.dog_observer.schedule(self.dog_handler, self.dog_path, recursive=True)
    if multiprocessing.get_start_method() != 'fork':  # pragma: no cover
        root_logger = logging.getLogger()
        self.log_listener = QueueListener(self.queue, *root_logger.handlers)
    # TODO: Find out how to get the watchdog + livereload working at a later moment.
    # self.dog_observer.start()
    self._restarts = dict()
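The worker side of this pool is not shown above; with a non-fork start method, each child process would typically attach a QueueHandler pointed at the same queue so that its records are forwarded to the parent's root handlers. A hypothetical sketch (the worker_entry name is an assumption, not part of the project):

import logging
from logging.handlers import QueueHandler

def worker_entry(log_queue):
    # Hypothetical worker entry point: replace the child's handlers with a
    # QueueHandler so records travel back to the parent's QueueListener.
    root = logging.getLogger()
    root.handlers[:] = [QueueHandler(log_queue)]
    root.setLevel(logging.DEBUG)
    logging.getLogger(__name__).info("worker is up")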
def logger_init(dirpath=None):
    # Adapted from http://stackoverflow.com/a/34964369/164864
    logging_queue = multiprocessing.Queue()
    # This is the handler for all log records
    filepath = "{}-{}.log".format(
        'pandarus-worker',
        datetime.datetime.now().strftime("%d-%B-%Y-%I-%M%p")
    )
    if dirpath is not None:
        filepath = os.path.join(dirpath, filepath)
    handler = logging.FileHandler(
        filepath,
        encoding='utf-8',
    )
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(lineno)d %(message)s")
    )
    # queue_listener gets records from the queue and sends them to the handler
    queue_listener = QueueListener(logging_queue, handler)
    queue_listener.start()
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    return queue_listener, logging_queue
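How worker processes feed records into the returned logging_queue is outside this snippet; a plausible companion initializer (the worker_init name is an assumption) would attach a QueueHandler in each child, while the parent keeps the QueueListener created above running and calls queue_listener.stop() at shutdown.

import logging
from logging.handlers import QueueHandler

def worker_init(logging_queue):
    # Hypothetical per-worker initializer: route the child's records into the
    # queue that the parent's QueueListener drains into the file handler.
    logger = logging.getLogger()
    logger.addHandler(QueueHandler(logging_queue))
    logger.setLevel(logging.INFO)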
def setup_logging_queues():
    if sys.version_info.major < 3:
        raise RuntimeError("This feature requires Python 3.")

    queue_listeners = []

    # Q: What about loggers created after this is called?
    # A: if they don't attach their own handlers they should be fine
    for logger in get_all_logger_names(include_root=True):
        logger = logging.getLogger(logger)
        if logger.handlers:
            log_queue = queue.Queue(-1)  # No limit on size

            queue_handler = QueueHandler(log_queue)
            queue_listener = QueueListener(
                log_queue, respect_handler_level=True)

            queuify_logger(logger, queue_handler, queue_listener)
            # print("Replaced logger %s with queue listener: %s" % (
            #     logger, queue_listener))
            queue_listeners.append(queue_listener)

    for listener in queue_listeners:
        listener.start()

    atexit.register(stop_queue_listeners, *queue_listeners)
    return
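The stop_queue_listeners callback registered with atexit is not included above; a minimal sketch of what it presumably does:

def stop_queue_listeners(*listeners):
    # Flush queued records and join each listener's worker thread at exit.
    for listener in listeners:
        try:
            listener.stop()
        except Exception:  # shutdown should never raise
            pass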
def queuify_logger(logger, queue_handler, queue_listener):
    """Replace logger's handlers with a queue handler while adding existing
    handlers to a queue listener.

    This is useful when you want to use a default logging config but then
    optionally add a logger's handlers to a queue during runtime.

    Args:
        logger (mixed): Logger instance or string name of logger to queue-ify
            handlers.
        queue_handler (QueueHandler): Instance of a ``QueueHandler``.
        queue_listener (QueueListener): Instance of a ``QueueListener``.
    """
    if isinstance(logger, str):
        logger = logging.getLogger(logger)

    # Get handlers that aren't being listened for.
    handlers = [handler for handler in logger.handlers
                if handler not in queue_listener.handlers]

    if handlers:
        # The default QueueListener stores handlers as a tuple.
        queue_listener.handlers = \
            tuple(list(queue_listener.handlers) + handlers)

    # Remove logger's handlers and replace with single queue handler.
    del logger.handlers[:]
    logger.addHandler(queue_handler)
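A short usage example, assuming queuify_logger is importable and the target logger already has an ordinary handler attached:

import logging
import queue
from logging.handlers import QueueHandler, QueueListener

log = logging.getLogger("app")
log.addHandler(logging.StreamHandler())   # existing handler to be queue-ified

log_queue = queue.Queue(-1)
listener = QueueListener(log_queue)       # handlers are moved over by queuify_logger
queuify_logger(log, QueueHandler(log_queue), listener)
listener.start()

log.warning("this record travels through the queue")
listener.stop()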
def start(self, timeout=None):
    """
    uses multiprocessing.Process to call _wrapper_func in a subprocess
    """
    if self.is_alive():
        log.warning("a process with pid %s is already running", self._proc.pid)
        return

    self._run.value = True
    self._func_running.value = False
    name = self.__class__.__name__

    self.conn_recv, self.conn_send = mp.Pipe(False)
    self._monitor_thread = threading.Thread(target=self._monitor_stdout_pipe)
    self._monitor_thread.daemon = True
    self._monitor_thread.start()
    log.debug("started monitor thread")

    self._log_queue = mp.Queue()
    self._log_queue_listener = QueueListener(self._log_queue, *log.handlers)
    self._log_queue_listener.start()

    args = (self.func, self.args, self._run, self._pause, self.interval,
            self._sigint, self._sigterm, name, log.level, self.conn_send,
            self._func_running, self._log_queue)

    self._proc = mp.Process(target=_loop_wrapper_func, args=args)
    self._proc.start()
    log.info("started a new process with pid %s", self._proc.pid)
    log.debug("wait for loop function to come up")
    t0 = time.time()
    while not self._func_running.value:
        if self._proc.exitcode is not None:
            exc = self._proc.exitcode
            self._proc = None
            if exc == 0:
                log.warning("wrapper function already terminated with exitcode 0\n"
                            "loop is not running")
                return
            else:
                raise LoopExceptionError(
                    "the loop function returned a non-zero exitcode ({})!\n".format(exc) +
                    "see log (INFO level) for traceback information")
        time.sleep(0.1)
        if (timeout is not None) and ((time.time() - t0) > timeout):
            err_msg = "could not bring up function in time (timeout: {}s)".format(timeout)
            log.error(err_msg)
            log.info("either it takes too long to spawn the subprocess (increase the timeout)\n"
                     "or an internal error occurred before reaching the function call")
            raise LoopTimeoutError(err_msg)
    log.debug("loop function is up ({})".format(humanize_time(time.time() - t0)))
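_loop_wrapper_func itself is not part of this snippet; it receives the queue and log level and presumably installs a QueueHandler in the child so records from the loop function reach the parent's handlers through the QueueListener started above. A hypothetical sketch of that child-side setup:

import logging
from logging.handlers import QueueHandler

def _setup_child_logging(log_queue, level):
    # Hypothetical helper mirroring what the wrapper would do before entering
    # the loop: send every record to the parent process via the queue.
    child_log = logging.getLogger(__name__)
    child_log.setLevel(level)
    child_log.addHandler(QueueHandler(log_queue))
    return child_log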