Python logging module: getLevelName() example source code
The following code examples, extracted from open-source Python projects, illustrate how to use logging.getLevelName().
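Before the examples, a quick note on the API itself: logging.getLevelName() maps in both directions. Given a registered numeric level it returns the name; given a registered name it returns the number; anything unregistered comes back as the string 'Level %s' (several examples below rely on exactly that fallback). A minimal demonstration of current CPython behavior:

import logging

print(logging.getLevelName(20))        # 'INFO'
print(logging.getLevelName('INFO'))    # 20
print(logging.getLevelName('BOGUS'))   # 'Level BOGUS' (unregistered fallback)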
def create_logger():
    # logging.basicConfig(format='%(levelname)s - %(message)s')
    logging.basicConfig(format='%(message)s')
    root = logging.getLogger()
    root.setLevel(logging.getLevelName('INFO'))
    # Add handler for standard output (console), any debug+
    # ch = logging.StreamHandler(sys.stdout)
    # ch.setLevel(logging.getLevelName('DEBUG'))
    # formatter = logging.Formatter('%(message)s')
    # ch.setFormatter(formatter)
    # handler = ColorStreamHandler()
    # handler.setLevel(logging.getLevelName("DEBUG"))
    # root.addHandler(handler)
    return root
def _cli_log_message(msg, logger_name=None, level="INFO"):
    """
    Log a single message to Flightlog. Intended for CLI usage. Calling this
    function multiple times within the same process will configure duplicate
    handlers and result in duplicate messages.
    """
    logger = logging.getLogger(logger_name)
    levelnum = logging.getLevelName(level.upper())
    try:
        int(levelnum)
    except ValueError:
        raise ValueError("level must be one of DEBUG, INFO, WARNING, ERROR, CRITICAL")
    handler = FlightlogHandler(background=False)
    logger.addHandler(handler)
    logger.setLevel(levelnum)
    if msg == "-":
        msg = sys.stdin.read()
    for line in msg.splitlines():
        if line:
            logger.log(levelnum, line)
    exit_code = 0
    return None, exit_code
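The int(levelnum) probe above works because of the fallback noted in the introduction: for an unregistered name, getLevelName() returns the string 'Level <name>', which int() refuses. The same validation as a standalone sketch (parse_level is a hypothetical helper name, not part of the project above):

import logging

def parse_level(name):
    # getLevelName() returns an int for registered names and the
    # string 'Level <name>' for anything else.
    levelnum = logging.getLevelName(name.upper())
    if not isinstance(levelnum, int):
        raise ValueError('unknown log level: %r' % name)
    return levelnum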
def test_log_msg():
    # This test assumes the default message format found around line 139
    # of linchpin/cli/context.py
    lvl = logging.DEBUG
    msg = 'Test Msg'
    regex = '^{0}.*{1}'.format(logging.getLevelName(lvl), msg)
    lpc = LinchpinCliContext()
    lpc.load_config(config_path)
    lpc.setup_logging()
    lpc.log(msg, level=lvl)
    with open(logfile) as f:
        line = f.readline()
    assert_regexp_matches(line, regex)
def test_log_debug():
    lvl = logging.DEBUG
    msg = 'Debug Msg'
    regex = '^{0}.*{1}'.format(logging.getLevelName(lvl), msg)
    lpc = LinchpinCliContext()
    lpc.load_config(config_path)
    lpc.setup_logging()
    lpc.log_debug(msg)
    with open(logfile) as f:
        line = f.readline()
    assert_regexp_matches(line, regex)
def get_log_config(level):
    return {
        'version': 1,
        'formatters': {
            'basicFormatter': {
                'format': '[%(asctime)s %(levelname)s %(threadName)s] %(name)s: %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'formatter': 'basicFormatter',
                'stream': 'ext://sys.stdout'
            }
        },
        'loggers': {
            'hephaestus': {
                'level': logging.getLevelName(level),
                'propagate': False,
                'handlers': ['console']
            }
        }
    }
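The dictionary above follows the schema consumed by logging.config.dictConfig(); a minimal usage sketch, assuming get_log_config() is importable:

import logging
import logging.config

logging.config.dictConfig(get_log_config(logging.DEBUG))
logging.getLogger('hephaestus').debug('configured via dictConfig')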
def main(argv):
    """MySQL binlog to Google Pub/Sub entry point.

    Args:
        argv (list): list of command line arguments
    """
    args = _setup_arg_parser(argv)
    conf_file = args.conf
    if conf_file:
        os.environ['BINLOG2GPUBSUB_CONF_FILE'] = conf_file
    if args.logconf:
        logging.config.fileConfig(args.logconf, disable_existing_loggers=False)
    else:
        logging.basicConfig()
    if args.loglevel:
        logging.root.setLevel(logging.getLevelName(args.loglevel.upper()))
    import mysqlbinlog2gpubsub
    mysqlbinlog2gpubsub.start_publishing()
def test_invalid_value(self):
    """Sends an invalid value to the handler."""
    message = 'Needs more cowbell.'
    context = {
        'key': 'test',
        'value': "(Don't Fear) The Reaper",
    }
    self.handler.handle_invalid_value(message, False, context)
    self.assertEqual(len(self.logs.records), 1)
    self.assertEqual(self.logs[0].msg, message)
    self.assertEqual(getattr(self.logs[0], 'context'), context)
    # The log message level is set in the handler's initializer.
    self.assertEqual(self.logs[0].levelname, getLevelName(WARNING))
    # No exception info for invalid values (by default).
    self.assertIsNone(self.logs[0].exc_text)
def _level_write(self, level, str_format, *args):
    if level < self._level:
        return
    levelname = logging.getLevelName(level)
    message = str_format % args if args else str_format
    message = strutils.decode(message)
    frame, filename, line_number, function_name, lines, index = inspect.stack()[2]
    props = dict(
        asctime=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
        name=self._name,
        filename=os.path.basename(filename),
        lineno=line_number,
        message=message,
    )
    props['levelname'] = Logger.__alias.get(levelname, levelname)
    output = u'{asctime} {levelname:<5s} [{name}:{lineno:>4}] {message}'.format(**props)
    self._write(output)
def init(cls):
    cls.logger = logging.getLogger(_LOGGER_NAME)
    logger = cls.logger
    levelname = os.environ.get('JUBAKIT_LOG_LEVEL', None)
    if not levelname:
        # Suppress printing logs by default.
        logger.addHandler(cls._NullHandler())
        logger.setLevel(CRITICAL)
        return
    # Set up the logger from the environment variable.
    for lvl in (DEBUG, INFO, WARNING, ERROR, CRITICAL):
        if logging.getLevelName(lvl) == levelname:
            setup_logger(lvl)
            break
    else:
        setup_logger(INFO)
        logger.warning('invalid JUBAKIT_LOG_LEVEL (%s) specified; continuing with INFO', levelname)
def _level_write(self, level, str_format, *args):
    if level < self._level:
        return
    levelname = logging.getLevelName(level)
    message = str_format % args if args else str_format
    message = strutils.decode(message)
    frame, filename, line_number, function_name, lines, index = inspect.stack()[2]
    props = dict(
        asctime=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
        name=self._name,
        filename=os.path.basename(filename),
        lineno=line_number,
        message=message,
    )
    props['levelname'] = Logger.__alias.get(levelname, levelname)
    output = u'{asctime} {levelname:<5s} [{name}:{lineno:>4}] {message}'.format(**props)
    self._write(strutils.encode(output, 'utf-8'))
def setup_logging(args_obj):
    if args_obj.verbose:
        level = logging.DEBUG
    else:
        level = logging.getLevelName(args_obj.log_level.upper())
    formatter = logging.Formatter(
        fmt='%(asctime)-15s - %(levelname)s - %(name)s'
            '[line:%(lineno)d thread:%(threadName)s(%(thread)d) '
            'process:%(processName)s(%(process)d)]'
            ' - %(message)s'
    )
    logger = logging.getLogger()
    logger.setLevel(level)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
def test_get_level_name(self):
    """Test getLevelName returns level constant."""
    # NOTE(flaper87): Bug #1517
    self.assertEqual(logging.getLevelName('NOTSET'), 0)
    self.assertEqual(logging.getLevelName('DEBUG'), 10)
    self.assertEqual(logging.getLevelName('INFO'), 20)
    self.assertEqual(logging.getLevelName('WARN'), 30)
    self.assertEqual(logging.getLevelName('WARNING'), 30)
    self.assertEqual(logging.getLevelName('ERROR'), 40)
    self.assertEqual(logging.getLevelName('CRITICAL'), 50)
    self.assertEqual(logging.getLevelName(0), 'NOTSET')
    self.assertEqual(logging.getLevelName(10), 'DEBUG')
    self.assertEqual(logging.getLevelName(20), 'INFO')
    self.assertEqual(logging.getLevelName(30), 'WARNING')
    self.assertEqual(logging.getLevelName(40), 'ERROR')
    self.assertEqual(logging.getLevelName(50), 'CRITICAL')
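The mapping the test exercises is also extensible: logging.addLevelName() registers a level in both directions, after which getLevelName() resolves it symmetrically. A small sketch with a made-up NOTICE level:

import logging

logging.addLevelName(25, 'NOTICE')  # custom level between INFO and WARNING
assert logging.getLevelName(25) == 'NOTICE'
assert logging.getLevelName('NOTICE') == 25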
def log_formatter(line):
    split = line.split(' / ')
    if len(split) == 4:
        return {
            # Other parsing options: http://stackoverflow.com/questions/466345/converting-string-into-datetime
            "timestamp": date_parse(split[0] + "+00"),
            "level": logging.getLevelName(split[1]),
            "message": split[3],
            "process": split[2],
            "device_id": None,
        }
    elif len(split) == 6:
        return {
            # Other parsing options: http://stackoverflow.com/questions/466345/converting-string-into-datetime
            "timestamp": date_parse(split[0]),
            "level": logging.getLevelName(split[1]),
            "message": split[5],
            "process": split[3],
            "device_id": split[4],
        }
    else:
        raise RuntimeError("The logs in the log file are of an unknown format, cannot continue. "
                           "Line: {}".format(line))
def _has_streamhandler(logger, level=None, fmt=LOG_FORMAT,
                       stream=DEFAULT_STREAM):
    """Check the named logger for an appropriate existing StreamHandler.

    This only returns True if a StreamHandler that exactly matches
    our specification is found. If other StreamHandlers are seen,
    we assume they were added for a different purpose.
    """
    # Ensure we are comparing the same type of logging level:
    # if a string was passed in, convert it to a number.
    if isinstance(level, basestring):
        level = logging.getLevelName(level)
    for handler in logger.handlers:
        if not isinstance(handler, logging.StreamHandler):
            continue
        if handler.stream is not stream:
            continue
        if handler.level != level:
            continue
        if not handler.formatter or handler.formatter._fmt != fmt:
            continue
        return True
    return False
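A typical call site uses the check to avoid stacking identical handlers; a sketch under the assumption that LOG_FORMAT and DEFAULT_STREAM are the module constants referenced in the signature:

import logging
import sys

LOG_FORMAT = '%(levelname)s: %(message)s'  # assumed value
DEFAULT_STREAM = sys.stderr                # assumed value

logger = logging.getLogger('myapp')
if not _has_streamhandler(logger, level=logging.INFO):
    handler = logging.StreamHandler(DEFAULT_STREAM)
    handler.setLevel(logging.INFO)
    handler.setFormatter(logging.Formatter(LOG_FORMAT))
    logger.addHandler(handler)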
def main():
    config = options()
    logging.basicConfig(level=logging.getLevelName(config.log_level),
                        format=config.log_format)
    db_connection = get_connection(config.log_service.db.servers,
                                   config.log_service.db.replica_name)
    collection = get_collection(config.log_service.db.name,
                                config.log_service.db.collection,
                                db_connection)
    agent_log_service = AgentLogService(config.log_service.bind_address,
                                        collection)
    LOG.info('Starting logging service on {}'.format(
        config.log_service.bind_address))
    agent_log_service.start()
def main():
    config = get_inventory_configuration()
    logging.basicConfig(level=logging.getLevelName(config.log_level),
                        format=config.log_format)
    loop = zmq.asyncio.ZMQEventLoop()
    loop.set_debug(config.asyncio_debug)
    asyncio.set_event_loop(loop)
    s = InventoryServer(bind_address=config.inventory.bind_address,
                        config=config)
    try:
        loop.run_until_complete(s.start())
    except KeyboardInterrupt:
        log.info('Stopping service')
        s.kill()
    finally:
        pending = asyncio.Task.all_tasks(loop=loop)
        loop.run_until_complete(asyncio.gather(*pending))
        loop.close()
def _init_logger(verbosity):
    # Set up the logger.
    global logger
    logger = logging.getLogger('conda_mirror')
    logmap = {0: logging.ERROR,
              1: logging.WARNING,
              2: logging.INFO,
              3: logging.DEBUG}
    # Fall back to DEBUG for any verbosity above 3; the original string
    # default ('3') would make setLevel() raise a ValueError.
    loglevel = logmap.get(verbosity, logging.DEBUG)
    # Clear all handlers (copy the list, since removeHandler mutates it).
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
    logger.setLevel(loglevel)
    format_string = '%(levelname)s: %(message)s'
    formatter = logging.Formatter(fmt=format_string)
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(loglevel)
    stream_handler.setFormatter(fmt=formatter)
    logger.addHandler(stream_handler)
    print("Log level set to %s" % logging.getLevelName(loglevel),
          file=sys.stdout)
def get_configuration():
    """
    Get a configuration (snapshot) that can be used to call configure:

        snapshot = get_configuration()
        configure(**snapshot)
    """
    root = getLogger()
    name_levels = [('', logging.getLevelName(root.level))]
    name_levels.extend(
        (name, logging.getLevelName(logger.level))
        for name, logger
        in root.manager.loggerDict.items()
        if hasattr(logger, 'level')
    )
    config_string = ','.join('%s:%s' % x for x in name_levels)
    return dict(config_string=config_string, log_json=SLogger.manager.log_json)
def initialize_logger(name, log_level):
    """Initialize a logger with a name and a level.

    logger = initialize_logger(name, log_level)

    Parameters
    ----------
    name : str
        name of the logger
    log_level :
        level name (e.g. 'INFO') or numeric level

    Returns
    -------
    logging.Logger
        a logger set with the name and level specified
    """
    level = logging.getLevelName(log_level)
    logging.basicConfig()
    logger = logging.getLogger(name)
    logger.setLevel(level=level)
    return logger
def main():
    args_parser = argparse.ArgumentParser()
    args_parser.add_argument('-c', '--config')
    args = args_parser.parse_args()
    powergslb.system.parse_config(args.config)
    config = powergslb.system.get_config()
    logging.basicConfig(
        format=config.get('logging', 'format'),
        level=logging.getLevelName(config.get('logging', 'level'))
    )
    service_threads = [
        powergslb.monitor.MonitorThread(name='Monitor'),
        powergslb.server.ServerThread(name='Server')
    ]
    service = powergslb.system.SystemService(service_threads)
    service.start()
def configure_logging(verbosity: int, logPath: str, isDaemon=False):
    rootLogger = logging.getLogger()
    if logPath:
        logPath = Path(logPath).expanduser()
    else:
        name = 'i3configger-daemon.log' if isDaemon else 'i3configger.log'
        logPath = Path(tempfile.gettempdir()) / name
    if DEBUG:
        print('logging to %s' % logPath)
        level = logging.getLevelName('DEBUG')
    else:
        level = logging.getLevelName(
            {0: 'ERROR', 1: 'WARNING', 2: 'INFO'}.get(verbosity, 'DEBUG'))
    fmt = ('%(asctime)s %(name)s:%(funcName)s:%(lineno)s '
           '%(levelname)s: %(message)s')
    if not rootLogger.handlers:
        logging.basicConfig(format=fmt, level=level)
    fileHandler = logging.FileHandler(logPath)
    fileHandler.setFormatter(logging.Formatter(fmt))
    fileHandler.setLevel(level)
    rootLogger.addHandler(fileHandler)
def get_logger(name, level=INFO, fac=SysLogHandler.LOG_LOCAL1):
    global LOG_TRANS
    for lt in LOG_TRANS:
        if not LOG_TRANS[lt]['old']:
            LOG_TRANS[lt]['old'] = logging.getLevelName(lt)
            logging.addLevelName(lt, LOG_TRANS[lt]['new'])
    fmt = F('[%(name)s.%(funcName)s]: %(message)s')
    log = logging.getLogger('%s' % name.split('.')[-1])
    h = SysLogHandler(address='/dev/log', facility=parse_fac(fac))
    h.setFormatter(fmt)
    log.addHandler(h)
    # h = StreamHandler(stream=LOGBUF)
    # h.setFormatter(fmt)
    # log.addHandler(h)
    log.setLevel(level)
    log.success = lambda msg: log.log(LOG_SUCCES, msg)
    return log
def create_log(self, log_path=None, log_level='DEBUG'):
    """Create a log file for debug output.

    Args:
        log_path: Path to the log file. If None or empty, the log path
            will be the command line invocation name (argv[0]) with a
            '.log' suffix, in the user's home directory.
        log_level: Log level:
            'DEBUG', 'INFO', 'WARNING', 'ERROR', or 'CRITICAL'.
            Default is 'DEBUG'.
    """
    if log_path is None or not log_path:
        log_dir = os.path.expanduser('~')
        log_path = os.path.join(log_dir, sys.argv[0] + '.log')
    log_path = os.path.expanduser(log_path)
    logging.basicConfig(filename=os.path.abspath(log_path),
                        filemode='w', level=log_level.upper())
    logger = logging.getLogger(__name__)
    logger.info('Log started %s, level=%s', datetime.datetime.now(),
                logging.getLevelName(logger.getEffectiveLevel()))
def log_to_stderr(level, formatter=_LOG_FORMATTER,
                  handler=logging.StreamHandler):
    """Set up logging, or set the logging level, on STDERR.

    Args:
        level: a logging level, like logging.INFO
        formatter: a logging.Formatter object
        handler: logging.StreamHandler (this argument is for testing)
    """
    global _STDERR_HANDLER
    _level = get_loglevel(level)
    if type(_STDERR_HANDLER) is handler:
        _STDERR_HANDLER.setLevel(_level)
    else:
        _STDERR_HANDLER = handler(stream=sys.stderr)
        _STDERR_HANDLER.setLevel(_level)
        _STDERR_HANDLER.setFormatter(formatter)
        logging.getLogger('').addHandler(_STDERR_HANDLER)
    logging.debug('Setting logging at level=%s',
                  logging.getLevelName(_level))
def log_to_file(filename, level=INFO, formatter=_LOG_FORMATTER,
                filemode=APPEND, handler=logging.FileHandler):
    """Set up logging, or set the logging level, to a file.

    Args:
        filename: string of path/file to write logs
        level: a logging level, like logging.INFO
        formatter: a logging.Formatter object
        filemode: a mode of writing, like app.APPEND or app.CLOBBER
        handler: logging.FileHandler (this argument is for testing)
    """
    global _FILE_HANDLER
    _level = get_loglevel(level)
    if type(_FILE_HANDLER) is handler:
        _FILE_HANDLER.setLevel(_level)
    else:
        _FILE_HANDLER = handler(filename=filename, mode=filemode)
        _FILE_HANDLER.setLevel(_level)
        _FILE_HANDLER.setFormatter(formatter)
        logging.getLogger('').addHandler(_FILE_HANDLER)
    logging.info('Logging to file %s [mode=\'%s\', level=%s]',
                 os.path.abspath(filename), filemode,
                 logging.getLevelName(_level))
def setup_logger(args):
    warnings.filterwarnings("ignore", category=TechPreviewWarning)
    warnings.filterwarnings("ignore", category=SeenWarning)
    # Set up logging according to command-line verbosity.
    logger = logging.getLogger()
    logger.setLevel(int(30 - (args.loglevel * 10)))
    ch = logging.StreamHandler(sys.stderr)
    formatter = logging.Formatter(u'%(asctime)s %(name)s %(levelname)s: %(message)s')
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    logger.info("Set logging level to {0}".format(logging.getLevelName(logger.getEffectiveLevel())))
    # If the log level is reduced, disable emissions from the `warnings` module.
    if not logger.isEnabledFor(logging.WARNING):
        warnings.simplefilter("ignore")
    return logger
def checkConfig(self):
    if not self.interface:
        raise ConfigError("You must configure an interface")
    if not self.domain:
        raise ConfigError("You must configure a domain")
    if not self.realm:
        raise ConfigError("You must configure a realm")
    if not self.honey_username:
        raise ConfigError("You must configure a honeytoken username")
    if self.log_level.upper() not in {"CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"}:
        raise ConfigError("Invalid setting for log level")
    else:
        level = logging.getLevelName(self.log_level.upper())
        logging.getLogger().setLevel(level)
def add_file_handler(self, path, name, level=None):
    levelname = logging.getLevelName(level) if level is not None \
        else 'DEFAULT'
    filename = '{path}/{name}.{level}.log'.format(
        path=os.path.abspath(path), name=name,
        level=levelname)
    if filename not in self.file_handlers:
        from logging.handlers import TimedRotatingFileHandler
        file_handler = TimedRotatingFileHandler(filename, when="midnight",
                                                backupCount=7)
        self.file_handlers[filename] = file_handler
        if level is not None:
            file_handler.setLevel(level)
        self.add_handler(file_handler)
def make_logging_level_names_consistent():
    """Rename the standard library's logging levels to match Twisted's.

    Twisted's new logging system in `twisted.logger`, that is.
    """
    for level in list(logging._levelToName):
        if level == logging.NOTSET:
            # When the logging level is not known in Twisted it's rendered as
            # a hyphen. This is not a common occurrence with `logging` but we
            # cater for it anyway.
            name = "-"
        elif level == logging.WARNING:
            # "Warning" is more consistent with the other level names than
            # "warn", so there is a fault in Twisted here. However it's easier
            # to change the `logging` module to match Twisted than vice-versa.
            name = "warn"
        else:
            # Twisted's level names are all lower-case.
            name = logging.getLevelName(level).lower()
        # For a preexisting level this will _replace_ the name.
        logging.addLevelName(level, name)
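Since addLevelName() replaces an existing mapping, the renames take effect immediately for getLevelName(); a quick check after running the function above:

make_logging_level_names_consistent()
print(logging.getLevelName(logging.WARNING))  # 'warn'
print(logging.getLevelName(logging.DEBUG))    # 'debug'
print(logging.getLevelName(logging.NOTSET))   # '-'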
def fetch_url(cls, session, msites, platform_id, purpose):
    """Actual method that performs the fetch-url action.

    Parameters
    ----------
    msites : list
        a list of Site model instances, containing the info to build spiders.
    platform_id : int
        id of the platform; fetched urls are bound to this id.
    purpose : {'update', 'archive'}
        indicates which urls to fetch.
    """
    settings = Settings(cls.conf['crawl']['scrapy'])
    settings.set('ITEM_PIPELINES',
                 {'hoaxy.crawl.pipelines.UrlPipeline': 300})
    process = CrawlerProcess(settings)
    sll = cls.conf['logging']['loggers']['scrapy']['level']
    logging.getLogger('scrapy').setLevel(logging.getLevelName(sll))
    for ms in msites:
        for sm in build_spiders_iter(ms, purpose):
            sm['kwargs']['session'] = session
            sm['kwargs']['platform_id'] = platform_id
            process.crawl(sm['cls'], *sm['args'], **sm['kwargs'])
    process.start()
def fetch_html(cls, session, url_tuples):
    """Actual method that performs the fetch-html action.

    Parameters
    ----------
    session : object
        a SQLAlchemy session object.
    url_tuples : list
        a list of url tuples (id, raw, status_code).
    """
    settings = Settings(cls.conf['crawl']['scrapy'])
    settings.set('ITEM_PIPELINES',
                 {'hoaxy.crawl.pipelines.HtmlPipeline': 300})
    process = CrawlerProcess(settings)
    sll = cls.conf['logging']['loggers']['scrapy']['level']
    logging.getLogger('scrapy').setLevel(logging.getLevelName(sll))
    logger.warning('Number of urls to fetch html for: %s', len(url_tuples))
    process.crawl(
        HtmlSpider,
        session=session,
        url_tuples=url_tuples,
        excluded_domains=cls.conf['crawl']['excluded_domains'])
    process.start()
def parse_article(cls, session, url_tuples):
    """Actual method that performs the parse-to-article action.

    Parameters
    ----------
    session : object
        a SQLAlchemy session object.
    url_tuples : list
        a list of url tuples (id, created_at, date_published,
        canonical, site_id).
    """
    settings = Settings(cls.conf['crawl']['scrapy'])
    settings.set('ITEM_PIPELINES',
                 {'hoaxy.crawl.pipelines.ArticlePipeline': 300})
    process = CrawlerProcess(settings)
    sll = cls.conf['logging']['loggers']['scrapy']['level']
    logging.getLogger('scrapy').setLevel(logging.getLevelName(sll))
    logger.info('Number of urls to parse: %s', len(url_tuples))
    process.crawl(
        ArticleParserSpider,
        session=session,
        url_tuples=url_tuples,
        api_key=cls.conf['crawl']['article_parser']['webparser_api_key'])
    process.start()
def __init__(self, config):
    super(self.__class__, self).__init__()
    self.dirs = config.dirs
    self.files = config.files
    self.parsing_dirs = config.dirs is not None
    self.config = config
    log_level = logging.getLevelName(config.debug.upper())
    log.setLevel(log_level)
    # The mininet version stores a SeismicStatistics object to hold stats
    # extracted from results.
    self.stats = None
    # This will be set when parsing files to determine the type of
    # experimental results file we're working with. Can be either
    # 'mininet' or 'networkx'.
    self.experiment_type = None
def __init__(self, virtapi, read_only=False):
    super(IronicDriver, self).__init__(virtapi)
    global ironic
    if ironic is None:
        ironic = importutils.import_module('ironicclient')
        # NOTE(deva): work around a lack of symbols in the current version.
        if not hasattr(ironic, 'exc'):
            ironic.exc = importutils.import_module('ironicclient.exc')
        if not hasattr(ironic, 'client'):
            ironic.client = importutils.import_module(
                'ironicclient.client')
    self.firewall_driver = firewall.load_driver(
        default='nova.virt.firewall.NoopFirewallDriver')
    self.node_cache = {}
    self.node_cache_time = 0
    ironicclient_log_level = CONF.ironic.client_log_level
    if ironicclient_log_level:
        level = py_logging.getLevelName(ironicclient_log_level)
        logger = py_logging.getLogger('ironicclient')
        logger.setLevel(level)
    self.ironicclient = client_wrapper.IronicClientWrapper()
def parse_expressions(self, expressions):
    """Parse a list of logger matching expressions of the form
    <regex>=<log-level>. Place the compiled regexes and levels
    in the expressions attribute."""
    lines = expressions.split('\n')
    for line in lines:
        try:
            # Use rsplit so we can have '='s in the regex.
            regex, level = line.rsplit('=', 1)
            pattern = re.compile(regex)
            results = (pattern, logging.getLevelName(level.upper()))
            self.logger.log(
                TraceLogger.TRACE,
                'Appending %s:%s to logger level expressions' % (
                    results[0], results[1]))
            self.expressions.append(results)
        except Exception as ex:
            self.logger.error(
                'Parser error in log configuration file: %s' % line)
            self.logger.exception(ex)
def setlogdir(logdir):
    '''Set the log directory.'''
    # Set the log colors.
    logging.addLevelName(logging.INFO, print_style('%s', fore='green') % logging.getLevelName(logging.INFO))
    logging.addLevelName(logging.WARNING, print_style('%s', fore='red') % logging.getLevelName(logging.WARNING))
    ldir = os.path.dirname(logdir)
    writelog = os.path.join(ldir, 'log.log')
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(name)s[line:%(lineno)d] %(levelname)s %(message)s',
                        datefmt='%a, %d %b %Y %H:%M:%S',
                        filename=writelog,
                        filemode='w')
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s')
    console.setFormatter(formatter)
    logging.getLogger('').addHandler(console)
def logging_level(self):
    """**Syntax:** logging_level=[CRITICAL|ERROR|WARNING|INFO|DEBUG|NOTSET]

    **Description:** Sets the threshold for the logger of this command
    invocation. Logging messages less severe than `logging_level` will be
    ignored.
    """
    return getLevelName(self._logger.getEffectiveLevel())
def _build_event_data(record):
    """
    Build an event data dictionary from the specified log record for
    submission to Seq.

    :param record: The LogRecord.
    :type record: StructuredLogRecord
    :return: A dictionary containing event data representing the log record.
    :rtype: dict
    """
    if record.args:
        # Standard (unnamed) format arguments (use 0-based index as property name).
        log_props_shim = get_global_log_properties(record.name)
        for (arg_index, arg) in enumerate(record.args or []):
            log_props_shim[str(arg_index)] = arg
        event_data = {
            "Timestamp": _get_local_timestamp(record),
            "Level": logging.getLevelName(record.levelno),
            "MessageTemplate": record.getMessage(),
            "Properties": log_props_shim
        }
    elif isinstance(record, StructuredLogRecord):
        # Named format arguments (and, therefore, log event properties).
        event_data = {
            "Timestamp": _get_local_timestamp(record),
            "Level": logging.getLevelName(record.levelno),
            "MessageTemplate": record.msg,
            "Properties": record.log_props
        }
    else:
        # No format arguments; interpret the message as-is.
        event_data = {
            "Timestamp": _get_local_timestamp(record),
            "Level": logging.getLevelName(record.levelno),
            "MessageTemplate": record.getMessage(),
            "Properties": _global_log_props
        }
    return event_data
def setup_logger(self, level=logging.INFO):
    """Configure global log settings."""
    if isinstance(level, int):
        self.level = logging.getLevelName(level)
    else:
        # Guard for callers that already pass a level name string
        # (otherwise self.level would be left unset).
        self.level = level
    self.logger = logging.getLogger()
    self.logger.setLevel(self.level)
    if not len(self.logger.handlers):
        ch = logging.StreamHandler(stream=sys.stderr)
        logformat = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        formatter = logging.Formatter(logformat)
        ch.setFormatter(formatter)
        self.logger.addHandler(ch)
def get_log_level():
    level = logging.getLevelName(os.environ.get('LOG_LEVEL', '').upper())
    if not isinstance(level, int):
        level = DEFAULT_LEVEL
    return level
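This is the same fallback trick as in _cli_log_message() above: a missing or misspelled LOG_LEVEL makes getLevelName() return a 'Level <name>' string, which the isinstance check catches (DEFAULT_LEVEL is the module's own fallback constant). For illustration:

import logging
import os

os.environ['LOG_LEVEL'] = 'warning'
print(logging.getLevelName(os.environ['LOG_LEVEL'].upper()))  # 30
os.environ['LOG_LEVEL'] = 'noisy'
print(logging.getLevelName(os.environ['LOG_LEVEL'].upper()))  # 'Level NOISY'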
def logLevels():
    return [logging.getLevelName(n) for n in xrange(0, logging.CRITICAL + 1, 10)]
def _log(self, level, msg, event=()):
    event = events.Event(event).union({
        "logtime": time.strftime("%Y-%m-%d %H:%M:%SZ", time.gmtime()),
        "logmsg": msg,
        "loglevel": logging.getLevelName(level)
    })
    return self._logger.log(level, msg, **{"extra": {"event": event}})
def get_config(config='~/.nyttth/config.yml'):
    global cfg
    if not cfg:
        cfgpath = os.path.expanduser(config)
        log.debug('reading config from {}'.format(cfgpath))
        cfg = dict()
        if os.path.isfile(cfgpath):
            with open(cfgpath, 'r') as stream:
                cfg = yaml.load(stream)
        else:
            print 'config not found at {}. Create y/n?'.format(cfgpath)
            if propmt_yn():
                import errno
                try:
                    os.makedirs(os.path.dirname(cfgpath))
                except OSError as e:
                    if e.errno != errno.EEXIST:
                        raise
                with open(cfgpath, 'w') as cfg_file:
                    cfg_file.write(SAMPLE_CONFIG)
                print 'Sample configuration has been written to {}.\n You will need to edit '.format(cfgpath) + \
                      'this configuration with real values from your networking environment. Exiting.'
            else:
                print 'Exiting'
            exit()
        if 'log_level' in cfg:
            # print('setting log level to {}'.format(cfg['log_level']))
            log.setLevel(logging.getLevelName(cfg['log_level']))
        cfg['basedir'] = os.path.dirname(cfgpath)
        cfg['supervisor.conf'] = os.path.join(cfg['basedir'], 'supervisord.conf')
    return cfg
def __new__(cls, value):
    if value in cls._level_instances:
        return cls._level_instances[value]
    instance = int.__new__(cls, value)
    instance.name = logging.getLevelName(value)
    cls._level_instances[value] = instance
    return instance
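The __new__ above implies an int subclass that interns one instance per numeric level and caches its name; a self-contained sketch of such a class (the name Level is hypothetical):

import logging

class Level(int):
    _level_instances = {}

    def __new__(cls, value):
        if value in cls._level_instances:
            return cls._level_instances[value]
        instance = int.__new__(cls, value)
        instance.name = logging.getLevelName(value)
        cls._level_instances[value] = instance
        return instance

assert Level(20) is Level(20)
assert Level(20).name == 'INFO'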
def test_log_state():
    lvl = logging.DEBUG
    msg = '{0}: State Msg'.format(logging.getLevelName(lvl))
    regex = '^{0}.*STATE - {1}'.format(logging.getLevelName(lvl), msg)
    lpc = LinchpinCliContext()
    lpc.load_config(config_path)
    lpc.setup_logging()
    lpc.log_state(msg)
    with open(logfile) as f:
        line = f.readline()
    assert_regexp_matches(line, regex)
def test_log_info():
    lvl = logging.INFO
    msg = 'Info Msg'
    regex = '^{0}.*{1}'.format(logging.getLevelName(lvl), msg)
    lpc = LinchpinCliContext()
    lpc.load_config(config_path)
    lpc.setup_logging()
    lpc.log_info(msg)
    with open(logfile) as f:
        line = f.readline()
    assert_regexp_matches(line, regex)