我们从 Python 开源项目中提取了以下 8 个代码示例,用于说明 append() 在日志相关代码中的用法(注意:append() 是列表方法,logging.handlers 模块本身并不提供该函数)。
def create_destinations(self):
    # type: () -> List[AbstractDestination]
    """Build the flush destinations enabled in ``self._config``.

    Recognised configuration keys (each is skipped when missing or falsy):

    * ``flush_stdout``   -- adds a single ``Stdout`` destination.
    * ``flush_graphite`` -- comma-separated ``host[:port]`` entries; one
      ``Graphite`` destination per entry, port 2003 when none is given.
    * ``flush_file``     -- ``|``-separated paths; one ``TextFile`` each.
    * ``flush_file_csv`` -- ``|``-separated paths; one ``CsvFile`` each.

    Returns:
        The destinations, in the order listed above.

    Raises:
        ValueError: if a Graphite port is present but not an integer.
    """
    destinations = []  # type: List[AbstractDestination]

    if self._config.get('flush_stdout'):
        destinations.append(Stdout())

    if self._config.get('flush_graphite'):
        for graphite_address in self._config['flush_graphite'].split(','):
            parts = graphite_address.strip().split(':')
            graphite_host = parts.pop(0).strip()
            # The port is the last remaining segment, if any. An empty
            # segment ("host:") means "use the default" instead of
            # reaching int('') and crashing.
            port_text = parts.pop().strip() if parts else ''
            graphite_port = int(port_text) if port_text else 2003
            destinations.append(Graphite(graphite_host, graphite_port))

    if self._config.get('flush_file'):
        for file_path in self._config['flush_file'].split('|'):
            destinations.append(TextFile(file_path))

    if self._config.get('flush_file_csv'):
        for file_path in self._config['flush_file_csv'].split('|'):
            destinations.append(CsvFile(file_path))

    return destinations
def get_addresses_with_unique_ports(addresses):
    # type: (str) -> List[Tuple[str, int]]
    """Parse a comma-separated ``host[:port]`` list into ``(host, port)`` pairs.

    An entry without a port (or with an empty one, e.g. ``"host:"``) gets
    ``DEFAULT_PORT``. Every port may appear only once across all entries,
    including ports that were filled in from the default.

    Args:
        addresses: comma-separated addresses, e.g. ``"a:8125, b:8126"``.

    Returns:
        A list of ``(host, port)`` tuples in input order.

    Raises:
        ValueError: if a port is not an integer, is outside 1..65535, or
            has already been used by an earlier entry.
    """
    result = []  # type: List[Tuple[str, int]]
    ports = set()  # type: Set[int]

    for address in addresses.split(','):
        parts = address.strip().split(':')
        host = parts[0]

        if len(parts) > 1 and parts[1]:
            port_str = parts[1]
            port = int(port_str)
            if port < 1 or port > 65535:
                raise ValueError(
                    "Port {} is out of range".format(port_str))
        else:
            port = DEFAULT_PORT
            port_str = str(port)

        # Checked for explicit AND defaulted ports, so two entries that
        # both fall back to the default are rejected as duplicates too.
        if port in ports:
            raise ValueError(
                "Port {} is already specified before".format(port_str))

        ports.add(port)
        result.append((host, port))

    return result
def init():
    """Configure the root logger from ``config.logging``.

    Always installs the file and console handlers; the system file handler
    is added only when ``config.logging.system.active`` is truthy.
    """
    date_format = config.logging.date_format
    level = log_level(config.logging.level)

    # Each process should get its own log file so that concurrent
    # processes do not conflict when writing.
    # TODO implement global log server with unique uuid per process
    handlers = [
        file_handler(date_format),
        console_handler(date_format),
    ]
    if config.logging.system.active:
        handlers.append(system_file_handler(date_format))

    logging.basicConfig(handlers=handlers, level=level, datefmt=date_format)
    logging.info('Initialized logging')
def init_logger(self) -> None:
    """Set up application logging: rotating file, in-app console, optional stderr.

    Resolves (and creates) the log directory, builds the console widget and
    its handler, configures the root logger, routes ``warnings`` through
    logging and installs ``MainWindow.log_uncaught_exceptions`` as the
    global exception hook.
    """
    try:
        log_dir = QStandardPaths.writableLocation(QStandardPaths.AppConfigLocation)
    except AttributeError:
        # NOTE(review): presumably raised when the Qt binding does not
        # expose AppConfigLocation -- confirm. Fall back to a per-platform
        # directory under the user's home.
        fallback_subdirs = {
            'win32': ('AppData', 'Local'),
            'darwin': ('Library', 'Preferences'),
        }
        subdirs = fallback_subdirs.get(sys.platform, ('.config',))
        log_dir = os.path.join(QDir.homePath(), *subdirs, qApp.applicationName().lower())
    os.makedirs(log_dir, exist_ok=True)

    self.console = ConsoleWidget(self)
    self.consoleLogger = ConsoleHandler(self.console)

    log_file = os.path.join(log_dir, '%s.log' % qApp.applicationName().lower())
    rotating_file = logging.handlers.RotatingFileHandler(log_file, maxBytes=1000000, backupCount=1)
    handlers = [rotating_file, self.consoleLogger]

    wants_stream_output = self.parser.isSet(self.debug_option) or self.verboseLogs
    if wants_stream_output:
        # noinspection PyTypeChecker
        handlers.append(logging.StreamHandler())

    logging.setLoggerClass(VideoLogger)
    logging.basicConfig(
        handlers=handlers,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M',
        level=logging.INFO,
    )
    logging.captureWarnings(capture=True)
    sys.excepthook = MainWindow.log_uncaught_exceptions
def setup_logging(self, job, daemon=False, verbose=False):
    """Attach log handlers for ``job`` to the root logger.

    The log file lives at ``JOB_LOGS_DIRECTORY/<job>.log``; the directory
    is created when missing.

    Args:
        job: job name, used as the log file's base name.
        daemon: when true, log to a ``TimedRotatingFileHandler`` that
            rotates at midnight. Otherwise log to a file suffixed with
            today's date plus a ``StreamHandler``.
        verbose: when true, the logger and every handler use ``DEBUG``;
            otherwise ``INFO``.
    """
    # exist_ok replaces the former exists()-then-makedirs() sequence, which
    # could fail if another process created the directory in between.
    os.makedirs(JOB_LOGS_DIRECTORY, exist_ok=True)
    log_filename = '%s/%s.log' % (JOB_LOGS_DIRECTORY, job)

    level = logging.DEBUG if verbose else logging.INFO
    logger = logging.getLogger()
    logger.setLevel(level)

    if daemon:
        handlers = [
            logging.handlers.TimedRotatingFileHandler(filename=log_filename, when='midnight'),
        ]
    else:
        # NOTE(review): the console handler is attached only in non-daemon
        # mode here -- confirm this matches the intended layout.
        handlers = [
            logging.FileHandler(filename='%s.%s' % (log_filename, time.strftime('%Y-%m-%d'))),
            logging.StreamHandler(),
        ]

    for handler in handlers:
        handler.setLevel(level)
        handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
        logger.addHandler(handler)
def _create_socket_servers(self, addresses, socket_type): # type: (str, int) -> List[SocketServer] collectors = [] socket_addresses = self.get_addresses_with_unique_ports( addresses) for (host, port) in socket_addresses: socket_server = self._configure_socket_server( SocketServer(), host=host, port=port ) socket_server.socket_type = socket_type collectors.append(socket_server) return collectors
def _close_logger(self): # type: () -> None if self.logger: handlers = [] for handler in self.logger.handlers: if hasattr(handler, 'close') and callable(handler.close): handler.close() handlers.append(handler) map(self.logger.removeHandler, handlers) self.logger = None
def _extract_handlers(handlers_dict):
    """Build logging handlers from a ``{target: options}`` mapping.

    The target ``'&1'`` maps to a stream handler on stdout and ``'&2'`` to
    one on stderr; any other target is treated as a file name. Supported
    options: ``level`` (default ``'debug'``, lower-cased), ``format``
    (default ``'%(message)s'``), ``datefmt`` (default ``'%FT%T'``),
    ``append`` (default ``False``; selects file mode ``'a+'`` over
    ``'w+'``) and ``timestamp`` (a ``strftime`` pattern inserted before the
    file extension).

    Raises:
        ConfigurationError: if the mapping is empty or an entry's options
            are not a mapping.
    """
    if not handlers_dict:
        raise ConfigurationError('no handlers are defined for logger')

    built = []
    for filename, handler_config in handlers_dict.items():
        if not isinstance(handler_config, collections.abc.Mapping):
            raise ConfigurationError(
                'handler %s is not a dictionary' % filename
            )

        level = handler_config.get('level', 'debug').lower()
        fmt = handler_config.get('format', '%(message)s')
        datefmt = handler_config.get('datefmt', '%FT%T')
        append = handler_config.get('append', False)
        timestamp = handler_config.get('timestamp', None)

        if filename in ('&1', '&2'):
            stream = sys.stdout if filename == '&1' else sys.stderr
            hdlr = StreamHandler(stream=stream)
        else:
            if timestamp:
                # Insert the formatted current time before the extension.
                basename, ext = os.path.splitext(filename)
                stamp = datetime.now().strftime(timestamp)
                filename = '%s_%s%s' % (basename, stamp, ext)
            file_mode = 'a+' if append else 'w+'
            hdlr = RotatingFileHandler(filename, mode=file_mode)

        hdlr.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt))
        hdlr.setLevel(level)
        built.append(hdlr)

    return built