Python logging 模块:NOTSET 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 logging.NOTSET。
def ConvertLog4ToCFLevel( log4level ):
    """Translate a python ``logging`` level into the matching ``CF.LogLevels`` value.

    Any level that is not recognized falls back to ``CF.LogLevels.INFO``.
    """
    # Comparisons run from most to least severe; each ``logging`` attribute
    # is only touched when its turn comes (logging.TRACE is non-standard).
    if log4level == logging.FATAL + 1:
        return CF.LogLevels.OFF
    elif log4level == logging.FATAL:
        return CF.LogLevels.FATAL
    elif log4level == logging.ERROR:
        return CF.LogLevels.ERROR
    elif log4level == logging.WARN:
        return CF.LogLevels.WARN
    elif log4level == logging.INFO:
        return CF.LogLevels.INFO
    elif log4level == logging.DEBUG:
        return CF.LogLevels.DEBUG
    elif log4level == logging.TRACE:
        return CF.LogLevels.TRACE
    elif log4level == logging.NOTSET:
        return CF.LogLevels.ALL
    # Unknown level -> sensible default.
    return CF.LogLevels.INFO
def log(self, level, msg, *args, **kwargs):
    """Forward a logging call to the wrapped logger.

    The effective threshold combines the ``echo`` flag with the wrapped
    logger's own level; ``logger._log()`` is invoked directly so the
    usual ``isEnabledFor()``/``getEffectiveLevel()`` machinery is
    bypassed for speed.
    """
    underlying = self.logger
    # Records at or below the global disable level are dropped outright.
    if underlying.manager.disable >= level:
        return
    threshold = self._echo_map[self.echo]
    if threshold == logging.NOTSET:
        # echo did not pin a level; defer to the logger's effective one.
        threshold = underlying.getEffectiveLevel()
    if level >= threshold:
        underlying._log(level, msg, args, **kwargs)
def log(self, level, msg, *args, **kwargs):
    """Forward a logging call to the wrapped logger.

    The effective threshold combines the ``echo`` flag with the wrapped
    logger's own level; ``logger._log()`` is invoked directly so the
    usual ``isEnabledFor()``/``getEffectiveLevel()`` machinery is
    bypassed for speed.
    """
    underlying = self.logger
    # Records at or below the global disable level are dropped outright.
    if underlying.manager.disable >= level:
        return
    threshold = self._echo_map[self.echo]
    if threshold == logging.NOTSET:
        # echo did not pin a level; defer to the logger's effective one.
        threshold = underlying.getEffectiveLevel()
    if level >= threshold:
        underlying._log(level, msg, args, **kwargs)
def run(self, argv = None, data = None, logger = None):
    """Run the application.

    :param argv: optional argument vector to parse
    :param data: optional Configuration object replacing the current one
    :param logger: optional logging.Logger replacing the current one
    :return: the application's return code, or 1 if it raised
    """
    if logger is not None:
        assert isinstance(logger, logging.Logger), "logger is not a valid logging.Logger"
        self.logger = logger
    if data is not None:
        assert isinstance(data, Configuration), "data is not a valid QXSConsolas.Configuration.Configuration"
        self.data = data
    self.options, self.arguments = self._argparser.parseArguments(argv)
    # loglevel 1 = verbose (pass everything), -1 = quiet (critical only).
    if self._argparser.loglevel == 1:
        self._configureConsoleLoggers(logging.NOTSET, True)
    elif self._argparser.loglevel == -1:
        self._configureConsoleLoggers(logging.CRITICAL, False)
    try:
        self._argparser.validateRequiredArguments()
        return self._app(ApplicationData(self))
    except Exception as e:
        # BUG FIX: previously logged via the local ``logger`` argument,
        # which crashes with AttributeError when no logger was passed in.
        # Use self.logger, which is presumably set by the constructor
        # -- TODO(review): confirm.
        self.logger.exception(e)
        return 1
def run(self, argv = None, data = None, logger = None):
    """Run the application.

    :param argv: optional argument vector to parse
    :param data: optional Configuration object replacing the current one
    :param logger: optional logging.Logger replacing the current one
    :return: the application's return code, or 1 if it raised
    """
    if logger is not None:
        assert isinstance(logger, logging.Logger), "logger is not a valid logging.Logger"
        self.logger = logger
    if data is not None:
        assert isinstance(data, Configuration), "data is not a valid QXSConsolas.Configuration.Configuration"
        self.data = data
    self.options, self.arguments = self._argparser.parseArguments(argv)
    # loglevel 1 = verbose (pass everything), -1 = quiet (critical only).
    if self._argparser.loglevel == 1:
        self._configureConsoleLoggers(logging.NOTSET, True)
    elif self._argparser.loglevel == -1:
        self._configureConsoleLoggers(logging.CRITICAL, False)
    try:
        self._argparser.validateRequiredArguments()
        return self._app(ApplicationData(self))
    except Exception as e:
        # BUG FIX: previously logged via the local ``logger`` argument,
        # which crashes with AttributeError when no logger was passed in.
        # Use self.logger, which is presumably set by the constructor
        # -- TODO(review): confirm.
        self.logger.exception(e)
        return 1
def test_custom_handler(self, mocker):
    """A custom handler on the logger receives one DEBUG record once the
    global disable is lifted.
    """
    dummy = DummyHandler()
    emit_mock = mocker.MagicMock()
    dummy.emit = emit_mock
    logger = make_logger()
    logger.handlers = [dummy]
    disable(NOTSET)  # lift any global disable so DEBUG records get through
    logger.debug('test')
    assert emit_mock.call_count == 1
    call_name, call_args, call_kwargs = emit_mock.mock_calls[0]
    assert call_name == ''
    record = call_args[0]
    assert isinstance(record, LogRecord)
    assert record.msg == 'test'
    assert record.levelname == 'DEBUG'
    del logger
def level_to_int(level: Union[str, int]) -> int:
    """Normalize *level* (an int or a level name like ``'info'``) to its
    numeric ``logging`` value.

    :raises ValueError: for out-of-range ints or unknown level names
    :raises TypeError: for any other type
    """
    if isinstance(level, int):
        if logging.NOTSET <= level <= logging.FATAL:
            return level
        # BUG FIX: the message previously read "gat" and was missing a
        # space at the string-literal concatenation boundary.
        raise ValueError('Log level must be 0 <= level <= 50, '
                         'but got: {}'.format(level))
    elif isinstance(level, str):
        value = getattr(logging, level.upper(), None)
        # Robustness: only accept attributes that are actual numeric
        # levels (e.g. 'raiseExceptions' is a logging attribute too).
        if isinstance(value, int) and not isinstance(value, bool):
            return value
        raise ValueError('Invalid log level: {}'.format(level))
    else:
        raise TypeError(
            'Log level must be int (0 ~ 50) or string, '
            'but got type: {}'.format(type(level)))
def __init__(self, pathname, **settings):
    """Initial config for singleton baka framework.

    :param pathname: the name of the application package
    :param settings: optional dict settings for pyramid configuration
    """
    self.import_name = pathname
    self.settings = settings
    self.__include = {}
    self.__trafaret = trafaret_yaml
    # Install a default stream handler only when the end-user
    # application configured nothing themselves.
    already_configured = bool(
        logging.root.handlers
        and log.level == logging.NOTSET
        and settings.get('LOGGING')
    )
    if not already_configured:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(logging.Formatter(logging_format))
        log.addHandler(stream_handler)
        log.setLevel(logging.INFO)
def log(self, level, msg, *args, **kwargs):
    """Forward a logging call to the wrapped logger.

    The effective threshold combines the ``echo`` flag with the wrapped
    logger's own level; ``logger._log()`` is invoked directly so the
    usual ``isEnabledFor()``/``getEffectiveLevel()`` machinery is
    bypassed for speed.
    """
    underlying = self.logger
    # Records at or below the global disable level are dropped outright.
    if underlying.manager.disable >= level:
        return
    threshold = self._echo_map[self.echo]
    if threshold == logging.NOTSET:
        # echo did not pin a level; defer to the logger's effective one.
        threshold = underlying.getEffectiveLevel()
    if level >= threshold:
        underlying._log(level, msg, args, **kwargs)
def get(self, key, *sources, default=None, log_level=None, log_value=True):
    """Return the config value for *key*, discarding the source.

    Thin wrapper over :meth:`get_with_source`; a successful get (not
    returning None) is cached there, so subsequent gets for the same key
    return the cached value regardless of other parameters.

    key -- the key for the value
    sources -- custom source order for this key; when empty, the sources
        set by the constructor or source property are used
    default -- returned if all sources fail; cached and logged as usual
    log_level -- overrides the constructor log level for the first use of
        this key; logging.NOTSET turns logging off
    log_value -- False logs only the source, never the value
    """
    value, _source = self.get_with_source(
        key, *sources,
        default=default, log_level=log_level, log_value=log_value)
    return value
def to(cls, channel, host='127.0.0.1',
       port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
       level=logging.NOTSET):
    """Create a handler already connected to a ZMQ subscriber.

    Args:
        channel (string): Logging channel name, used to build a ZMQ topic.
        host (string): Hostname / ip address of the subscriber to publish to.
        port (int, string): Port on which to publish messages.
        level (int): Logging level.
    """
    ctx = zmq.Context()
    pub_socket = ctx.socket(zmq.PUB)
    pub_socket.connect('tcp://{}:{}'.format(host, port))
    # Brief pause to work around the ZMQ "silent joiner" race on a
    # freshly connected PUB socket.
    time.sleep(0.1)
    return cls(channel, pub_socket, level=level)
def getLevel( self ):
    """
    A convenience wrapper around ``getEffectiveLevel()`` because the integer
    values for the various logging levels are clunky and probably don't mean
    anything to you.

    Returns:
        str: the name of the effective log level for this logging object, in
        lowercase (``"warning"``, ``"info"``, etc.)
    """
    names = {
        logging.CRITICAL: 'critical',
        logging.ERROR: 'error',
        logging.WARNING: 'warning',
        logging.INFO: 'info',
        logging.DEBUG: 'debug',
        logging.NOTSET: 'notset',
    }
    level = self.getEffectiveLevel()
    # Anything outside the standard set is reported with its raw value.
    return names.get(level, 'unknown ({})'.format(level))
def __init__(self, *args, **kwargs):
    """Initialize the handler with a client.

    The client may be supplied as the single positional argument, as the
    ``client`` keyword, or built from the remaining arguments using the
    ``client_cls`` factory (default: Client).
    """
    client_factory = kwargs.pop('client_cls', Client)
    if len(args) == 1:
        first, args = args[0], args[1:]
        if not isinstance(first, Client):
            raise ValueError(
                'The first argument to %s must be a Client instance, '
                'got %r instead.' % (
                    self.__class__.__name__,
                    first,
                ))
        self.client = first
    elif 'client' in kwargs:
        self.client = kwargs.pop('client')
    else:
        self.client = client_factory(*args, **kwargs)
    logging.Handler.__init__(self, level=kwargs.get('level', logging.NOTSET))
def log(self, level, msg, *args, **kwargs):
    """Forward a logging call to the wrapped logger.

    The effective threshold combines the ``echo`` flag with the wrapped
    logger's own level; ``logger._log()`` is invoked directly so the
    usual ``isEnabledFor()``/``getEffectiveLevel()`` machinery is
    bypassed for speed.
    """
    underlying = self.logger
    # Records at or below the global disable level are dropped outright.
    if underlying.manager.disable >= level:
        return
    threshold = self._echo_map[self.echo]
    if threshold == logging.NOTSET:
        # echo did not pin a level; defer to the logger's effective one.
        threshold = underlying.getEffectiveLevel()
    if level >= threshold:
        underlying._log(level, msg, args, **kwargs)
def test_get_level_name(self):
"""Test getLevelName returns level constant."""
# NOTE(flaper87): Bug #1517
self.assertEqual(logging.getLevelName('NOTSET'), 0)
self.assertEqual(logging.getLevelName('DEBUG'), 10)
self.assertEqual(logging.getLevelName('INFO'), 20)
self.assertEqual(logging.getLevelName('WARN'), 30)
self.assertEqual(logging.getLevelName('WARNING'), 30)
self.assertEqual(logging.getLevelName('ERROR'), 40)
self.assertEqual(logging.getLevelName('CRITICAL'), 50)
self.assertEqual(logging.getLevelName(0), 'NOTSET')
self.assertEqual(logging.getLevelName(10), 'DEBUG')
self.assertEqual(logging.getLevelName(20), 'INFO')
self.assertEqual(logging.getLevelName(30), 'WARNING')
self.assertEqual(logging.getLevelName(40), 'ERROR')
self.assertEqual(logging.getLevelName(50), 'CRITICAL')
def _handle_existing_loggers(existing, child_loggers, disable_existing):
    """Deal with loggers left over from a previous logging configuration.

    Survivors cannot simply be deleted -- other threads may still hold
    references -- so they are either reset (children of newly configured
    loggers, which should delegate upward again) or disabled, the latter
    only when *disable_existing* is true.
    """
    root = logging.root
    for name in existing:
        survivor = root.manager.loggerDict[name]
        if name in child_loggers:
            # Child of a configured logger: reset so it propagates upward.
            survivor.level = logging.NOTSET
            survivor.handlers = []
            survivor.propagate = True
        elif disable_existing:
            survivor.disabled = True
def get_console_logger():
    """Return the root logger wired with a single stdout handler at INFO."""
    try:
        console_logger = logging.getLogger()
        console_logger.setLevel(logging.NOTSET)
        # Drop one existing handler so repeated calls don't stack them.
        if console_logger.handlers:
            console_logger.handlers.pop()
        stream_handler = logging.StreamHandler()
        # FORMATTER is the shared module-level formatter constant.
        stream_handler.setFormatter(FORMATTER)
        # Only INFO-and-above messages reach stdout.
        stream_handler.setLevel(logging.INFO)
        console_logger.addHandler(stream_handler)
    except TypeError as e:
        sys.stdout.write(str("Console logger is having issues: {}\n".format(e)))
    return console_logger
def get_app_logger(name=None):
    """Return the '__webbreaker__' application logger with one INFO-level
    file handler appending to the mapped log file.

    :param name: key into the logger/file map; only "__webbreaker__" is
        mapped -- any other value raises KeyError, which is NOT caught
        here (only TypeError is) -- TODO(review): confirm intent.
    """
    try:
        logger_map = {"__webbreaker__": APP_LOG}
        app_logger = logging.getLogger("__webbreaker__")
        app_logger.setLevel(logging.NOTSET)
        # Drop one existing handler so repeated calls don't stack them.
        if app_logger.handlers:
            app_logger.handlers.pop()
        formatter = logging.Formatter('%(asctime)s: %(name)s %(levelname)s(%(message)s')
        fh = logging.FileHandler(logger_map[name], mode='a')
        fh.setFormatter(formatter)
        # BUG FIX: the handler level was set twice (DEBUG then INFO);
        # the DEBUG call was dead code since INFO always overwrote it.
        fh.setLevel(logging.INFO)
        app_logger.addHandler(fh)
    except TypeError as e:
        sys.stdout.write(str("App logger error: {}!\n".format(e)))
    return app_logger
def get_debug_logger(name=None):
    """Return logger *name* with a DEBUG-level file handler on DEBUG_LOG."""
    try:
        debug_logger = logging.getLogger(name)
        debug_logger.setLevel(logging.NOTSET)
        # Keep at most one handler across repeated calls.
        if debug_logger.handlers:
            debug_logger.handlers.pop()
        file_handler = logging.FileHandler(DEBUG_LOG, mode='a')
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s: %(name)s %(levelname)s(%(message)s'))
        file_handler.setLevel(logging.DEBUG)
        debug_logger.addHandler(file_handler)
    except TypeError as e:
        sys.stdout.write(str("Debug logger error: {}!\n".format(e)))
    return debug_logger
# Override existing hierarchical filter logic in logger
def tune(self, src_batch, trg_batch, epochs):
    """Fine-tune the NMT model on a single (source, target) batch for a
    fixed number of epochs.
    """
    self._ensure_model_loaded()
    if self._tuner is None:
        # Lazily create a trainer dedicated to tuning: no perplexity
        # early-stopping and silent logging.
        self._tuner = NMTEngineTrainer(self._model, self._optim, self._src_dict, self._trg_dict,
                                       model_params=self._model_params, gpu_ids=([0] if self._using_cuda else None))
        self._tuner.min_perplexity_decrement = -1.
        self._tuner.set_log_level(logging.NOTSET)
    self._tuner.min_epochs = self._tuner.max_epochs = epochs
    # Convert words to indexes [suggestions]; pairs are processed in
    # lockstep so the two index batches stay aligned.
    indexed_src, indexed_trg = [], []
    for src_words, trg_words in zip(src_batch, trg_batch):
        indexed_src.append(self._src_dict.convertToIdx(src_words, Constants.UNK_WORD))
        indexed_trg.append(self._trg_dict.convertToIdx(trg_words, Constants.UNK_WORD,
                                                       Constants.BOS_WORD, Constants.EOS_WORD))
    # Prepare data for training on the tuning batch.
    tuning_dataset = Dataset(indexed_src, indexed_trg, 32, self._using_cuda)
    self._tuner.train_model(tuning_dataset, save_epochs=0)
def log(self, level, msg, *args, **kwargs):
    """Forward a logging call to the wrapped logger.

    The effective threshold combines the ``echo`` flag with the wrapped
    logger's own level; ``logger._log()`` is invoked directly so the
    usual ``isEnabledFor()``/``getEffectiveLevel()`` machinery is
    bypassed for speed.
    """
    underlying = self.logger
    # Records at or below the global disable level are dropped outright.
    if underlying.manager.disable >= level:
        return
    threshold = self._echo_map[self.echo]
    if threshold == logging.NOTSET:
        # echo did not pin a level; defer to the logger's effective one.
        threshold = underlying.getEffectiveLevel()
    if level >= threshold:
        underlying._log(level, msg, args, **kwargs)
def log(self, level, msg, *args, **kwargs):
    """Forward a logging call to the wrapped logger.

    The effective threshold combines the ``echo`` flag with the wrapped
    logger's own level; ``logger._log()`` is invoked directly so the
    usual ``isEnabledFor()``/``getEffectiveLevel()`` machinery is
    bypassed for speed.
    """
    underlying = self.logger
    # Records at or below the global disable level are dropped outright.
    if underlying.manager.disable >= level:
        return
    threshold = self._echo_map[self.echo]
    if threshold == logging.NOTSET:
        # echo did not pin a level; defer to the logger's effective one.
        threshold = underlying.getEffectiveLevel()
    if level >= threshold:
        underlying._log(level, msg, args, **kwargs)
def setLogLevel(self, level):
    """Change the default logging level.

    Accepts either a numeric level (applied directly) or a level name
    resolved via ``self.getLogLevel``. Unresolvable names produce a
    warning and leave the level untouched.

    :return: the *level* argument, unchanged
    """
    # IDIOM FIX: was ``type(level) == type(logging.NOTSET)``; use
    # isinstance for the int check. (Note: unlike the original, this
    # also accepts bool, a subclass of int -- a degenerate input.)
    if isinstance(level, int):
        self.log_obj.setLevel(level)
    else:
        resolved = self.getLogLevel(level)
        if resolved is None:
            self.warning('log.Logger: Logging level cannot be set to "%s"'
                         % level)
        elif resolved != self.log_obj.level:
            self.log_obj.setLevel(resolved)
            self.debug('log.Logger: Logging level set to "%s" = %s'
                       % (level, resolved))
    return level
def log(self, level, msg, *args, **kwargs):
    """Forward a logging call to the wrapped logger.

    The effective threshold combines the ``echo`` flag with the wrapped
    logger's own level; ``logger._log()`` is invoked directly so the
    usual ``isEnabledFor()``/``getEffectiveLevel()`` machinery is
    bypassed for speed.
    """
    underlying = self.logger
    # Records at or below the global disable level are dropped outright.
    if underlying.manager.disable >= level:
        return
    threshold = self._echo_map[self.echo]
    if threshold == logging.NOTSET:
        # echo did not pin a level; defer to the logger's effective one.
        threshold = underlying.getEffectiveLevel()
    if level >= threshold:
        underlying._log(level, msg, args, **kwargs)
def add_loggers(self, loggers, stdout_level=logging.NOTSET, file_level=logging.NOTSET):
    """Register loggers for stdout/filesystem handling.

    Stdout: loggers log to stdout only when mentioned in the `log` option;
    when mentioned without an explicit level, *stdout_level* applies.
    Filesystem: loggers log to files at *file_level*.

    :arg loggers: List of logger names.
    :arg stdout_level: Default level at which stdout handlers pass logs.
        Default `logging.NOTSET` means: pass everything.
    :arg file_level: Level at which filesystem handlers pass logs.
        Default `logging.NOTSET` means: pass everything.
    """
    self._enabled = True
    entry = (loggers, _sanitize_level(stdout_level), _sanitize_level(file_level))
    self._loggers.append(entry)
def log(self, level, msg, *args, **kwargs):
    """Forward a logging call to the wrapped logger.

    The effective threshold combines the ``echo`` flag with the wrapped
    logger's own level; ``logger._log()`` is invoked directly so the
    usual ``isEnabledFor()``/``getEffectiveLevel()`` machinery is
    bypassed for speed.
    """
    underlying = self.logger
    # Records at or below the global disable level are dropped outright.
    if underlying.manager.disable >= level:
        return
    threshold = self._echo_map[self.echo]
    if threshold == logging.NOTSET:
        # echo did not pin a level; defer to the logger's effective one.
        threshold = underlying.getEffectiveLevel()
    if level >= threshold:
        underlying._log(level, msg, args, **kwargs)
def _resetLogging(self):
    """Attach per-result-dir file handlers (state/build/root logs), once.

    Runs in addition to anything set up in the config file, so records
    flow both to the result directory and any configured destinations.
    """
    # Guard: never attach the handlers a second time.
    if self.logging_initialized:
        return
    self.logging_initialized = True
    with self.uid_manager:
        util.mkdirIfAbsent(self.resultdir)
        targets = (
            (self.state.state_log, "state.log", self.config['state_log_fmt_str']),
            (self.build_log, "build.log", self.config['build_log_fmt_str']),
            (self.root_log, "root.log", self.config['root_log_fmt_str']),
        )
        for target_log, filename, fmt_str in targets:
            handler = logging.FileHandler(os.path.join(self.resultdir, filename), "a+")
            handler.setFormatter(logging.Formatter(fmt_str))
            handler.setLevel(logging.NOTSET)
            target_log.addHandler(handler)
            target_log.info("Mock Version: %s", self.config['version'])
def emit(self, record):
    """Write a LogRecord to the Kodi (xbmc) log, translating levels."""
    if record.levelno < logging.WARNING and self._modules and record.name not in self._modules:
        # Log INFO and DEBUG only for explicitly enabled modules.
        return
    levels = {
        logging.CRITICAL: xbmc.LOGFATAL,
        logging.ERROR: xbmc.LOGERROR,
        logging.WARNING: xbmc.LOGWARNING,
        logging.INFO: xbmc.LOGNOTICE,
        # NOTE(review): DEBUG -> LOGSEVERE looks inverted; confirm against
        # the xbmc level constants before changing.
        logging.DEBUG: xbmc.LOGSEVERE,
        logging.NOTSET: xbmc.LOGNONE,
    }
    # BUG FIX: the fallbacks used bare ``except:``, which also swallows
    # SystemExit/KeyboardInterrupt; narrowed to Exception.
    try:
        xbmc.log(self.format(record), levels[record.levelno])
    except Exception:
        try:
            # Fall back for messages with undecodable characters.
            xbmc.log(self.format(record).encode('utf-8', 'ignore'), levels[record.levelno])
        except Exception:
            xbmc.log(b"[%s] Unicode Error in message text" % self.pluginName, levels[record.levelno])
def DEFAULT_LOGGING_CONFIG(level=logging.WARN, format=LOG_FORMAT):
    """Return a default logging config in dict format.

    Compatible with logging.config.dictConfig(); sets the root logger to
    *level* with a `sys.stdout` console handler whose formatter is built
    from *format*. A simple 'brief' formatter showing only the message
    portion is also defined.
    """
    formatters = {
        "generic": {"format": format},
        "brief": {"format": "%(message)s"},
    }
    console_handler = {
        "class": "logging.StreamHandler",
        "level": "NOTSET",  # handler passes everything; root filters
        "formatter": "generic",
        "stream": "ext://sys.stdout",
    }
    return {
        "version": 1,
        "formatters": formatters,
        "handlers": {"console": console_handler},
        "root": {"level": level, "handlers": ["console"]},
        "loggers": {},
    }
def __init__(self, login_ip, login_port, game_ip, game_port, magic=None, single_quotes=False, logger=None):
    """Initialize the client with login/game endpoints and default state."""
    self._login_ip = login_ip
    self._login_port = login_port
    self._game_ip = game_ip
    self._game_port = game_port
    if logger is None:
        # Default: root logger passing everything, echoed to stdout.
        logger = logging.getLogger()
        logger.setLevel(logging.NOTSET)
        logger.addHandler(logging.StreamHandler(sys.stdout))
    self._logger = logger
    self._magic = magic if magic is not None else "Y(02.>'H}t\":E1"
    self._single_quotes = single_quotes
    # Connection / session state defaults.
    self._connected = False
    self._buffer = ""
    self._handlers = {}
    self._nexts = []
    self._internal_room_id = -1
    self._id = -1
    self._coins = -1
    self._room = -1
    self._penguins = {}
    self._follow = None
def setup_logging(handlers, facility, level):
    """Configure the module-level 'export-trac' logger.

    :param handlers: 'syslog', 'stdout' or 'both'
    :param facility: syslog facility (used only for the syslog handler)
    :param level: level name, e.g. 'INFO'
    """
    global log
    log = logging.getLogger('export-trac')
    fmt = ' | '.join(['%(asctime)s', '%(name)s', '%(levelname)s', '%(message)s'])
    formatter = logging.Formatter(fmt)
    if handlers in ('syslog', 'both'):
        syslog_handler = logging.handlers.SysLogHandler(address='/dev/log', facility=facility)
        syslog_handler.setFormatter(formatter)
        log.addHandler(syslog_handler)
    if handlers in ('stdout', 'both'):
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        log.addHandler(stream_handler)
    name_to_level = {
        'CRITICAL': logging.CRITICAL,
        'ERROR': logging.ERROR,
        'WARNING': logging.WARNING,
        'INFO': logging.INFO,
        'DEBUG': logging.DEBUG,
        'NOTSET': logging.NOTSET,
    }
    log.setLevel(name_to_level[level])
def __init__(
    self, nameparts,
    propagate=False,
    level=logging.NOTSET,
    log_dir=None,
    *args, **kwargs
):
    """Build the logger whose name is the concatenation of *nameparts* values."""
    super(BaseLogger, self).__init__()
    self._nameparts = nameparts
    # Backing fields; populated below through the property setters.
    self._propagate = None
    self._level = None
    self._log_dir = None
    self._name = ''.join(str(part) for part in nameparts.values())
    self.logger = logging.getLogger(self._name)
    # Assign via properties so their side effects run.
    self.propagate = propagate
    self.level = level
    self.log_dir = log_dir
def log(self, level, msg, *args, **kwargs):
    """Forward a logging call to the wrapped logger.

    The effective threshold combines the ``echo`` flag with the wrapped
    logger's own level; ``logger._log()`` is invoked directly so the
    usual ``isEnabledFor()``/``getEffectiveLevel()`` machinery is
    bypassed for speed.
    """
    underlying = self.logger
    # Records at or below the global disable level are dropped outright.
    if underlying.manager.disable >= level:
        return
    threshold = self._echo_map[self.echo]
    if threshold == logging.NOTSET:
        # echo did not pin a level; defer to the logger's effective one.
        threshold = underlying.getEffectiveLevel()
    if level >= threshold:
        underlying._log(level, msg, args, **kwargs)
def log(self, level, msg, *args, **kwargs):
    """Forward a logging call to the wrapped logger.

    The effective threshold combines the ``echo`` flag with the wrapped
    logger's own level; ``logger._log()`` is invoked directly so the
    usual ``isEnabledFor()``/``getEffectiveLevel()`` machinery is
    bypassed for speed.
    """
    underlying = self.logger
    # Records at or below the global disable level are dropped outright.
    if underlying.manager.disable >= level:
        return
    threshold = self._echo_map[self.echo]
    if threshold == logging.NOTSET:
        # echo did not pin a level; defer to the logger's effective one.
        threshold = underlying.getEffectiveLevel()
    if level >= threshold:
        underlying._log(level, msg, args, **kwargs)
def log(self, level, msg, *args, **kwargs):
    """Forward a logging call to the wrapped logger.

    The effective threshold combines the ``echo`` flag with the wrapped
    logger's own level; ``logger._log()`` is invoked directly so the
    usual ``isEnabledFor()``/``getEffectiveLevel()`` machinery is
    bypassed for speed.
    """
    underlying = self.logger
    # Records at or below the global disable level are dropped outright.
    if underlying.manager.disable >= level:
        return
    threshold = self._echo_map[self.echo]
    if threshold == logging.NOTSET:
        # echo did not pin a level; defer to the logger's effective one.
        threshold = underlying.getEffectiveLevel()
    if level >= threshold:
        underlying._log(level, msg, args, **kwargs)
def setup_logging(opts):
    """Configure root logging from parsed CLI options.

    Always installs a NullHandler; when ``opts.verbose`` is truthy, also
    installs a UTC-timestamped stdout handler and maps verbosity 1..6 to
    CRITICAL..NOTSET.
    """
    logging.root.addHandler(logging.NullHandler())
    if opts.verbose:
        verbosity_to_level = {
            0: logging.CRITICAL,
            1: logging.ERROR,
            2: logging.WARNING,
            3: logging.INFO,
            4: logging.DEBUG,
            5: logging.NOTSET,
        }
        fmt = '%(asctime)-15s.%(msecs)03d [%(threadName)10.10s] [%(levelname)6.6s] %(name)s#%(funcName)s:%(lineno)s %(message)s'
        formatter = logging.Formatter(fmt, datefmt='%Y-%m-%dT%H:%M:%S')
        # Timestamps are rendered in UTC, not local time.
        formatter.converter = time.gmtime
        stdout_handler = logging.StreamHandler(stream=sys.stdout)
        stdout_handler.setFormatter(formatter)
        logging.root.addHandler(stdout_handler)
        logging.root.setLevel(verbosity_to_level[opts.verbose - 1])
def __init__(self, level=logging.NOTSET, host=mongo_server, port=27017, database_name='logs', collection='logs',
             username=None, password=None, fail_silently=False, formatter=None):
    """Set up the Mongo handler and open the pymongo database connection."""
    logging.Handler.__init__(self, level)
    self.host = host
    self.port = port
    self.database_name = database_name
    self.collection_name = collection
    self.username = username
    self.password = password
    self.fail_silently = fail_silently
    # Connection state; populated by _connect() below.
    self.connection = None
    self.db = None
    self.collection = None
    self.authenticated = False
    self.formatter = formatter or MongoFormatter()
    self._connect()
def _handle_existing_loggers(existing, child_loggers, disable_existing):
    """Deal with loggers left over from a previous logging configuration.

    Survivors cannot simply be deleted -- other threads may still hold
    references -- so children of newly configured loggers are reset to
    delegate upward again, and every other survivor has its ``disabled``
    flag set to *disable_existing*.
    """
    root = logging.root
    for name in existing:
        survivor = root.manager.loggerDict[name]
        if name in child_loggers:
            # Child of a configured logger: reset so it propagates upward.
            survivor.level = logging.NOTSET
            survivor.handlers = []
            survivor.propagate = True
        else:
            survivor.disabled = disable_existing
def init_logger(self, log_type, path):
    """Attach a size/time-rotating file handler and a console handler to
    the root logger, then apply *log_type* as its level.

    :param path: directory where the log file will be created
    :param log_type: logging level to record, e.g. ``logging.NOTSET``
    """
    self.__log_file_path = os.path.join(path, self.__log_filename)
    formatter = logging.Formatter(
        "%(asctime)s [%(levelname)-8.8s] %(message)s")
    self.__logger = logging.getLogger()
    # Rotates at ~1 MB or every 24 h, keeping 5 backups.
    rotating_handler = SizedTimedRotatingFileHandler(
        self.__log_file_path,
        max_bytes=1000000,
        backup_count=5,
        interval=24,
    )
    rotating_handler.setFormatter(formatter)
    self.__logger.addHandler(rotating_handler)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    self.__logger.addHandler(console_handler)
    self.__logger.setLevel(log_type)
def saveLoggingContext(self, logcfg_url, oldstyle_loglevel, rscCtx ):
    """Capture the logging configuration for this resource.

    Resolves the logging config contents from *logcfg_url* (falling back
    to the OSSIE default config), applies *oldstyle_loglevel* when given,
    and finally registers the event channel manager with the logging
    library.

    :param logcfg_url: URL of the logging config; None or "" selects the default
    :param oldstyle_loglevel: legacy numeric log level; None or <= -1 skips it
    :param rscCtx: resource context applied to the logging macro table
    """
    # apply resource context to macro definitions
    if rscCtx:
        rscCtx.apply(self.loggingMacros )
        self.loggingCtx = rscCtx
    # remember the logging URL we were configured with
    self.loggingURL = logcfg_url
    if logcfg_url==None or logcfg_url=="" :
        self.logConfig = ossie.logger.GetDefaultConfig()
    else:
        # try to process URL and grab contents
        try:
            cfg_data=ossie.logger.GetConfigFileContents( logcfg_url )
            if cfg_data and len(cfg_data) > 0 :
                self.logConfig = ossie.logger.ExpandMacros( cfg_data, self.loggingMacros )
        except:
            # NOTE(review): deliberately best-effort -- on any fetch/expand
            # failure we keep whatever logConfig we already had; confirm.
            pass
    # apply logging level if explicitly stated
    if oldstyle_loglevel != None and oldstyle_loglevel > -1 :
        loglevel = ossie.logger.ConvertLogLevel(oldstyle_loglevel)
        self.setLogLevel( "", loglevel )
        # NOTSET effective level means nothing set the level explicitly yet.
        if self._log and self._log.getEffectiveLevel() == logging.NOTSET:
            self.setLevel( self._logid, loglevel )
    else:
        # No explicit level: inherit the root logger's effective level.
        if self._log and self._log.getEffectiveLevel() == logging.NOTSET:
            loglevel = ossie.logger.ConvertLog4ToCFLevel( logging.getLogger(None).getEffectiveLevel() )
            self.setLogLevel( self._logid, loglevel )
    # assign an event channel manager to the logging library
    ossie.logger.SetEventChannelManager( self._ecm )
def __init__(self):
    """Initialize with an all-pass threshold, streaming to stdout."""
    # In log4j, stdout is the default output stream.
    self.stream = sys.stdout
    self.threshold = logging.NOTSET
    self.log4pyProps = {'strm':sys.stdout}
def __init__(self):
    """Initialize with an all-pass threshold and no log4py properties."""
    self.log4pyProps = {}
    self.threshold = logging.NOTSET
def __init__(self):
    """Initialize with an all-pass threshold and no log4py properties."""
    self.log4pyProps = {}
    self.threshold = logging.NOTSET
def __init__(self):
    """Initialize with an all-pass threshold and no log4py properties."""
    self.log4pyProps = {}
    self.threshold = logging.NOTSET
def setUp(self):
    """Create four InFloatPort sinks, one OutFloatPort source, and the
    multiout connection-descriptor table used by the tests.
    """
    self.setContext()
    self.seq = self.srcData if self.srcData else range(100)
    self.orb = CORBA.ORB_init()
    self.rootPOA = self.orb.resolve_initial_references("RootPOA")
    self.logger = logging.getLogger(self.ptype[0])
    self.logger.setLevel(logging.NOTSET)
    self.logger.info( "Setup - Multiout Create Ports Table " )
    # Sinks ip1..ip4 plus their activated-object ids ip1_oid..ip4_oid.
    for idx in (1, 2, 3, 4):
        sink = bulkio.InFloatPort("sink_%d" % idx, self.logger)
        setattr(self, "ip%d" % idx, sink)
        setattr(self, "ip%d_oid" % idx, self.rootPOA.activate_object(sink))
    self.port = bulkio.OutFloatPort("multiout_source", self.logger)
    self.port_oid = self.rootPOA.activate_object(self.port)
    self.logger.info( "Setup - Multiout Connection Table " )
    # connection id is derived from the first stream-id segment.
    stream_ids = ["stream-1-1", "stream-1-2", "stream-1-3",
                  "stream-2-1", "stream-2-2", "stream-2-3",
                  "stream-3-1", "stream-3-2", "stream-3-3",
                  "stream-4-1"]
    self.desc_list = [
        bulkio.connection_descriptor_struct(
            port_name="multiout_source",
            connection_id="connection_%s" % sid.split("-")[1],
            stream_id=sid)
        for sid in stream_ids
    ]
def setUp(self):
    """Create four InFloatPort sinks, one OutFloatPort source, and the
    multiout connection-descriptor table used by the tests.
    """
    self.setContext()
    self.seq = self.srcData if self.srcData else range(100)
    self.orb = CORBA.ORB_init()
    self.rootPOA = self.orb.resolve_initial_references("RootPOA")
    self.logger = logging.getLogger(self.ptype[0])
    self.logger.setLevel(logging.NOTSET)
    self.logger.info( "Setup - Multiout Create Ports Table " )
    # Sinks ip1..ip4 plus their activated-object ids ip1_oid..ip4_oid.
    for idx in (1, 2, 3, 4):
        sink = bulkio.InFloatPort("sink_%d" % idx, self.logger)
        setattr(self, "ip%d" % idx, sink)
        setattr(self, "ip%d_oid" % idx, self.rootPOA.activate_object(sink))
    self.port = bulkio.OutFloatPort("multiout_source", self.logger)
    self.port_oid = self.rootPOA.activate_object(self.port)
    self.logger.info( "Setup - Multiout Connection Table " )
    # connection id is derived from the first stream-id segment.
    stream_ids = ["stream-1-1", "stream-1-2", "stream-1-3",
                  "stream-2-1", "stream-2-2", "stream-2-3",
                  "stream-3-1", "stream-3-2", "stream-3-3",
                  "stream-4-1"]
    self.desc_list = [
        bulkio.connection_descriptor_struct(
            port_name="multiout_source",
            connection_id="connection_%s" % sid.split("-")[1],
            stream_id=sid)
        for sid in stream_ids
    ]
def _resetExistingLoggers(parent="root"):
    """Reset the logger named *parent* and all of its children to their
    initial state, if they already exist in the current configuration.
    """
    root = logging.root
    # Sorted list of every logger name currently registered.
    existing = sorted(root.manager.loggerDict.keys())
    if parent == "root":
        # Every existing logger is a child of root.
        loggers_to_reset = [parent] + existing
    elif parent not in existing:
        # Nothing to do.
        return
    else:
        loggers_to_reset = [parent]
        # Children follow 'parent' in the sorted list and share its
        # dotted prefix.
        prefixed = parent + "."
        for candidate in existing[existing.index(parent) + 1:]:
            if candidate.startswith(prefixed):
                loggers_to_reset.append(candidate)
    for name in loggers_to_reset:
        if name == "root":
            root.setLevel(logging.WARNING)
            for handler in root.handlers[:]:
                root.removeHandler(handler)
            for filt in root.filters[:]:
                # BUG FIX: was ``root.removeFilters(f)`` -- Logger has no
                # such method; the correct API is removeFilter(), so the
                # original raised AttributeError as soon as a filter was
                # installed on root.
                root.removeFilter(filt)
            root.disabled = False
        else:
            logger = root.manager.loggerDict[name]
            logger.level = logging.NOTSET
            logger.handlers = []
            logger.filters = []
            logger.propagate = True
            logger.disabled = False
def __init__(self, level=logging.NOTSET):
    """Initialize the handler, passing *level* straight to logging.Handler."""
    logging.Handler.__init__(self, level)
def __init__(self,
             name=None,
             router=None,
             load_env=True,
             log_config=LOGGING):
    """Initialize the application: logging first, then name discovery and
    the config/router plumbing.
    """
    if log_config:
        logging.config.dictConfig(log_config)
    # Install a fallback stream handler only when the end-user
    # application configured nothing themselves.
    needs_default_handler = not (logging.root.handlers
                                 and log.level == logging.NOTSET
                                 and log_config)
    if needs_default_handler:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s: %(levelname)s: %(message)s"))
        log.addHandler(handler)
        log.setLevel(logging.INFO)
    if name is None:
        # Derive the application name from the caller's module
        # (previous stack frame).
        frame_records = stack()[1]
        name = getmodulename(frame_records[1])
    self.name = name
    self.config = Config(load_env=load_env)
    self.router = router or PathRouter()
    self.debug = None
    self.static_handler = None