Python logging 模块,Filter() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用logging.Filter()。
def setup_logging(verbose=0, colors=False, name=None):
"""Configure console logging. Info and below go to stdout, others go to stderr.
:param int verbose: Verbosity level. > 0 print debug statements. > 1 passed to sphinx-build.
:param bool colors: Print color text in non-verbose mode.
:param str name: Which logger name to set handlers to. Used for testing.
"""
root_logger = logging.getLogger(name)
root_logger.setLevel(logging.DEBUG if verbose > 0 else logging.INFO)
formatter = ColorFormatter(verbose > 0, colors)
if colors:
colorclass.Windows.enable()
handler_stdout = logging.StreamHandler(sys.stdout)
handler_stdout.setFormatter(formatter)
handler_stdout.setLevel(logging.DEBUG)
handler_stdout.addFilter(type('', (logging.Filter,), {'filter': staticmethod(lambda r: r.levelno <= logging.INFO)}))
root_logger.addHandler(handler_stdout)
handler_stderr = logging.StreamHandler(sys.stderr)
handler_stderr.setFormatter(formatter)
handler_stderr.setLevel(logging.WARNING)
root_logger.addHandler(handler_stderr)
def isIPv(version, ip):
"""Check if **ip** is a certain **version** (IPv4 or IPv6).
.. warning: Do *not* put any calls to the logging module in this function,
or else an infinite recursion will occur when the call is made, due
the the log :class:`~logging.Filter`s in :mod:`~bridgedb.safelog`
using this function to validate matches from the regular expression
for IP addresses.
:param integer version: The IPv[4|6] version to check; must be either
``4`` or ``6``. Any other value will be silently changed to ``4``.
:param ip: The IP address to check. May be an any type which
:class:`ipaddr.IPAddress` will accept.
:rtype: boolean
:returns: ``True``, if the address is an IPv4 address.
"""
try:
ipaddr.IPAddress(ip, version=version)
except (ipaddr.AddressValueError, Exception):
return False
else:
return True
return False
def test_filter(self):
# Only messages satisfying the specified criteria pass through the
# filter.
filter_ = logging.Filter("spam.eggs")
handler = self.root_logger.handlers[0]
try:
handler.addFilter(filter_)
spam = logging.getLogger("spam")
spam_eggs = logging.getLogger("spam.eggs")
spam_eggs_fish = logging.getLogger("spam.eggs.fish")
spam_bakedbeans = logging.getLogger("spam.bakedbeans")
spam.info(self.next_message())
spam_eggs.info(self.next_message()) # Good.
spam_eggs_fish.info(self.next_message()) # Good.
spam_bakedbeans.info(self.next_message())
self.assert_log_lines([
('spam.eggs', 'INFO', '2'),
('spam.eggs.fish', 'INFO', '3'),
])
finally:
handler.removeFilter(filter_)
def setUp(self):
class CheckingFilter(logging.Filter):
def __init__(self, cls):
self.cls = cls
def filter(self, record):
t = type(record)
if t is not self.cls:
msg = 'Unexpected LogRecord type %s, expected %s' % (t,
self.cls)
raise TypeError(msg)
return True
BaseTest.setUp(self)
self.filter = CheckingFilter(DerivedLogRecord)
self.root_logger.addFilter(self.filter)
self.orig_factory = logging.getLogRecordFactory()
def setup_logger(self, logger, level=None):
if isinstance(logger, str):
logger = logging.getLogger(logger)
if level is None:
logger.setLevel(logging.DEBUG if settings.DEVELOPMENT_MODE else logging.DEBUG)
else:
logger.setLevel(level)
for handler in logging.root.handlers:
handler.addFilter(logging.Filter("gold-digger"))
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
"[%(levelname)s] %(asctime)s at %(filename)s:%(lineno)d (%(processName)s) -- %(message)s",
"%Y-%m-%d %H:%M:%S")
)
logger.addHandler(handler)
if not settings.DEVELOPMENT_MODE:
handler = graypy.GELFHandler(settings.GRAYLOG_ADDRESS, settings.GRAYLOG_PORT)
logger.addHandler(handler)
return logger
def test_load_filters_onlyused(self):
self.m_obj.load_config({
"handlers" : {
"root" : {
"filters" : [ "f2" ]
}
},
"filters" : {
"f1" : {
"class" : "logging.Filter"
},
"f2" : {
"class" : "logging.Filter"
}
}
})
self.m_obj._load_filters()
self.assertEqual(len(self.m_obj.m_filters), 1)
self.assertEqual(list(self.m_obj.m_filters.keys())[0], "f2")
def test_load_filters_noclass(self):
self.m_obj.load_config({
"handlers" : {
"root" : {
"filters" : [ "f2" ]
}
},
"filters" : {
"f1" : {
"param" : "logging.Filter"
},
"f2" : {
"param" : "logging.Filter"
}
}
})
with self.assertRaises(error.XtdError):
self.m_obj._load_filters()
def SetupLoggers():
global logger
logger = logging.getLogger()
logging.info("testing before")
logger.setLevel(logging.DEBUG)
#logfile = os.path.join(os.path.dirname(__file__), "../../logs/CADIS.log")
#flog = logging.handlers.RotatingFileHandler(logfile, maxBytes=10*1024*1024, backupCount=50, mode='w')
#flog.setFormatter(logging.Formatter('%(levelname)s [%(name)s] %(message)s'))
#logger.addHandler(flog)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
clog = logging.StreamHandler()
clog.addFilter(logging.Filter(name='CADIS'))
clog.setFormatter(logging.Formatter('[%(name)s] %(message)s'))
clog.setLevel(logging.DEBUG)
logger.addHandler(clog)
def SetupLoggers():
global logger
logger = logging.getLogger()
logging.info("testing before")
logger.setLevel(logging.DEBUG)
#logfile = os.path.join(os.path.dirname(__file__), "../../logs/CADIS.log")
#flog = logging.handlers.RotatingFileHandler(logfile, maxBytes=10*1024*1024, backupCount=50, mode='w')
#flog.setFormatter(logging.Formatter('%(levelname)s [%(name)s] %(message)s'))
#logger.addHandler(flog)
logging.getLogger("requests").setLevel(logging.WARNING)
clog = logging.StreamHandler()
clog.addFilter(logging.Filter(name='CADIS'))
clog.setFormatter(logging.Formatter('[%(name)s] %(message)s'))
clog.setLevel(logging.DEBUG)
logger.addHandler(clog)
def setupLoggers():
global logger
logger = logging.getLogger()
logging.info("testing before")
logger.setLevel(logging.DEBUG)
# logfile = os.path.join(os.path.dirname(__file__), "../../logs/CADIS.log")
# flog = logging.handlers.RotatingFileHandler(logfile, maxBytes=10*1024*1024, backupCount=50, mode='w')
# flog.setFormatter(logging.Formatter('%(levelname)s [%(name)s] %(message)s'))
# logger.addHandler(flog)
logging.getLogger("requests").setLevel(logging.WARNING)
clog = logging.StreamHandler()
clog.addFilter(logging.Filter(name='CADIS'))
clog.setFormatter(logging.Formatter('[%(name)s] %(message)s'))
clog.setLevel(logging.DEBUG)
logger.addHandler(clog)
def test_filter(self):
# Only messages satisfying the specified criteria pass through the
# filter.
filter_ = logging.Filter("spam.eggs")
handler = self.root_logger.handlers[0]
try:
handler.addFilter(filter_)
spam = logging.getLogger("spam")
spam_eggs = logging.getLogger("spam.eggs")
spam_eggs_fish = logging.getLogger("spam.eggs.fish")
spam_bakedbeans = logging.getLogger("spam.bakedbeans")
spam.info(self.next_message())
spam_eggs.info(self.next_message()) # Good.
spam_eggs_fish.info(self.next_message()) # Good.
spam_bakedbeans.info(self.next_message())
self.assert_log_lines([
('spam.eggs', 'INFO', '2'),
('spam.eggs.fish', 'INFO', '3'),
])
finally:
handler.removeFilter(filter_)
def setUp(self):
class CheckingFilter(logging.Filter):
def __init__(self, cls):
self.cls = cls
def filter(self, record):
t = type(record)
if t is not self.cls:
msg = 'Unexpected LogRecord type %s, expected %s' % (t,
self.cls)
raise TypeError(msg)
return True
BaseTest.setUp(self)
self.filter = CheckingFilter(DerivedLogRecord)
self.root_logger.addFilter(self.filter)
self.orig_factory = logging.getLogRecordFactory()
def setup_stream_handlers(conf):
"""Setup logging stream handlers according to the options."""
class StdoutFilter(logging.Filter):
def filter(self, record):
return record.levelno in (logging.DEBUG, logging.INFO)
if log.handlers:
for handler in log.handlers:
log.removeHandler(handler)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(logging.WARNING)
stdout_handler.addFilter(StdoutFilter())
if conf.debug:
stdout_handler.setLevel(logging.DEBUG)
elif conf.verbose:
stdout_handler.setLevel(logging.INFO)
else:
stdout_handler.setLevel(logging.WARNING)
log.addHandler(stdout_handler)
stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setLevel(logging.WARNING)
log.addHandler(stderr_handler)
def test_filter(self):
# Only messages satisfying the specified criteria pass through the
# filter.
filter_ = logging.Filter("spam.eggs")
handler = self.root_logger.handlers[0]
try:
handler.addFilter(filter_)
spam = logging.getLogger("spam")
spam_eggs = logging.getLogger("spam.eggs")
spam_eggs_fish = logging.getLogger("spam.eggs.fish")
spam_bakedbeans = logging.getLogger("spam.bakedbeans")
spam.info(self.next_message())
spam_eggs.info(self.next_message()) # Good.
spam_eggs_fish.info(self.next_message()) # Good.
spam_bakedbeans.info(self.next_message())
self.assert_log_lines([
('spam.eggs', 'INFO', '2'),
('spam.eggs.fish', 'INFO', '3'),
])
finally:
handler.removeFilter(filter_)
def setUp(self):
class CheckingFilter(logging.Filter):
def __init__(self, cls):
self.cls = cls
def filter(self, record):
t = type(record)
if t is not self.cls:
msg = 'Unexpected LogRecord type %s, expected %s' % (t,
self.cls)
raise TypeError(msg)
return True
BaseTest.setUp(self)
self.filter = CheckingFilter(DerivedLogRecord)
self.root_logger.addFilter(self.filter)
self.orig_factory = logging.getLogRecordFactory()
def test_nag_timer(self):
w = []
l = logging.getLogger()
class _Filter(logging.Filter):
def filter(self, record):
if record.levelno == logging.WARNING:
w.append(record.getMessage().lstrip())
return 0
f = _Filter()
l.addFilter(f)
proc = subprocess2.Popen(
self.exe + ['--stdout', '--sleep_first'], stdout=PIPE)
res = proc.communicate(nag_timer=3), proc.returncode
l.removeFilter(f)
self._check_res(res, 'A\nBB\nCCC\n', None, 0)
expected = ['No output for 3 seconds from command:', proc.cmd_str,
'No output for 6 seconds from command:', proc.cmd_str,
'No output for 9 seconds from command:', proc.cmd_str]
self.assertEquals(w, expected)
def test_nag_timer(self):
w = []
l = logging.getLogger()
class _Filter(logging.Filter):
def filter(self, record):
if record.levelno == logging.WARNING:
w.append(record.getMessage().lstrip())
return 0
f = _Filter()
l.addFilter(f)
proc = subprocess2.Popen(
self.exe + ['--stdout', '--sleep_first'], stdout=PIPE)
res = proc.communicate(nag_timer=3), proc.returncode
l.removeFilter(f)
self._check_res(res, 'A\nBB\nCCC\n', None, 0)
expected = ['No output for 3 seconds from command:', proc.cmd_str,
'No output for 6 seconds from command:', proc.cmd_str,
'No output for 9 seconds from command:', proc.cmd_str]
self.assertEquals(w, expected)
def test_filter(self):
# Only messages satisfying the specified criteria pass through the
# filter.
filter_ = logging.Filter("spam.eggs")
handler = self.root_logger.handlers[0]
try:
handler.addFilter(filter_)
spam = logging.getLogger("spam")
spam_eggs = logging.getLogger("spam.eggs")
spam_eggs_fish = logging.getLogger("spam.eggs.fish")
spam_bakedbeans = logging.getLogger("spam.bakedbeans")
spam.info(self.next_message())
spam_eggs.info(self.next_message()) # Good.
spam_eggs_fish.info(self.next_message()) # Good.
spam_bakedbeans.info(self.next_message())
self.assert_log_lines([
('spam.eggs', 'INFO', '2'),
('spam.eggs.fish', 'INFO', '3'),
])
finally:
handler.removeFilter(filter_)
def setUp(self):
class CheckingFilter(logging.Filter):
def __init__(self, cls):
self.cls = cls
def filter(self, record):
t = type(record)
if t is not self.cls:
msg = 'Unexpected LogRecord type %s, expected %s' % (t,
self.cls)
raise TypeError(msg)
return True
BaseTest.setUp(self)
self.filter = CheckingFilter(DerivedLogRecord)
self.root_logger.addFilter(self.filter)
self.orig_factory = logging.getLogRecordFactory()
def test_nag_timer(self):
w = []
l = logging.getLogger()
class _Filter(logging.Filter):
def filter(self, record):
if record.levelno == logging.WARNING:
w.append(record.getMessage().lstrip())
return 0
f = _Filter()
l.addFilter(f)
proc = subprocess2.Popen(
self.exe + ['--stdout', '--sleep_first'], stdout=PIPE)
res = proc.communicate(nag_timer=3), proc.returncode
l.removeFilter(f)
self._check_res(res, 'A\nBB\nCCC\n', None, 0)
expected = ['No output for 3 seconds from command:', proc.cmd_str,
'No output for 6 seconds from command:', proc.cmd_str,
'No output for 9 seconds from command:', proc.cmd_str]
self.assertEquals(w, expected)
def setup_stream_handlers(options):
"""Setup logging stream handlers according to the options."""
class StdoutFilter(logging.Filter):
def filter(self, record):
return record.levelno in (logging.DEBUG, logging.INFO)
if log.handlers:
for handler in log.handlers:
log.removeHandler(handler)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(logging.WARNING)
stdout_handler.addFilter(StdoutFilter())
if options.debug:
stdout_handler.setLevel(logging.DEBUG)
elif options.verbose:
stdout_handler.setLevel(logging.INFO)
else:
stdout_handler.setLevel(logging.WARNING)
log.addHandler(stdout_handler)
stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setLevel(logging.WARNING)
log.addHandler(stderr_handler)
def __init__(self, name='', lvls='normal', lvldiff=None):
logging.Filter.__init__(self, name)
if not isinstance(lvls, list) and lvls in levels:
self.levels = levels[lvls]
elif not isinstance(lvls, list) and not lvls is None:
self.levels = [lvls]
else:
self.levels = lvls
self.lvlhide = []
self.lvlshow = []
if not lvldiff: lvldiff = []
for ld in lvldiff:
if ld.startswith('-'):
self.lvlhide.append(ld[1:].upper())
elif ld.startswith('+'):
self.lvlshow.append(ld[1:].upper())
else:
self.lvlshow.append(ld.upper())
def pytest_configure():
import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING)
class Shorten(logging.Filter):
max_len = 500
def filter(self, record):
if record.msg == '%r':
record.msg = record.msg % record.args
record.args = ()
if len(record.msg) > self.max_len:
record.msg = record.msg[:self.max_len] + '...'
return True
logging.getLogger('sqlalchemy.engine.base.Engine').addFilter(Shorten())
def configure_filter(self, config):
"""Configure a filter from a dictionary."""
if '()' in config:
result = self.configure_custom(config)
else:
name = config.get('name', '')
result = logging.Filter(name)
return result
def __init__(self, *args, **kwargs):
# Can't use super() because logging.Filter is an old-style class in py26
logging.Filter.__init__(self, *args, **kwargs)
self.warning_count = self.error_count = 0
def __init__(self, *args, **kwargs):
# Can't use super() because logging.Filter is an old-style class in py26
logging.Filter.__init__(self, *args, **kwargs)
self.warning_count = self.error_count = 0
def __init__(self, *args, **kwargs):
# Can't use super() because logging.Filter is an old-style class in py26
logging.Filter.__init__(self, *args, **kwargs)
self.warning_count = self.error_count = 0
def __init__(self):
logging.Filter.__init__(self)
self.warning_table = {}
def __init__(self):
logging.Filter.__init__(self)
self.warning_table = {}
def configure_filter(self, config):
"""Configure a filter from a dictionary."""
if '()' in config:
result = self.configure_custom(config)
else:
name = config.get('name', '')
result = logging.Filter(name)
return result
def __init__(self, level):
self._level = level
logging.Filter.__init__(self)
def __init__(self):
logging.Filter.__init__(self)
self.warning_table = {}
def filter(self, record: logging.LogRecord):
"""Filter log messages."""
if self.origin is None:
return True
for _filter in self.origin:
if record.origin.startswith(_filter):
return False if self.exclude else True
return True if self.exclude else False
def __init__(self):
logging.Filter.__init__(self)
self.warning_table = {}
def __init__(self, invert=False):
logging.Filter.__init__(self)
self._invert = invert
self.exp = re.compile(r"Post|Meta|Process")
def __init__(self, bot):
self.bot = bot
logging.Filter.__init__(self)
def basic_config(logger: logging.Logger = logging.root, level=logging.INFO):
"""
Configures a logger to log <=INFO to stdout and >INFO to stderr
:param logger: Logger to configure, defaults to logging.root
:param level: Defaults to INFO
:return: configured logger (logger from parameters)
"""
logger.setLevel(level)
class InfoFilter(logging.Filter):
def filter(self, rec):
return rec.levelno in (logging.DEBUG, logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s", "%d/%m/%Y %H:%M:%S")
std_out_handler = logging.StreamHandler(sys.stdout)
std_out_handler.setLevel(logging.DEBUG)
std_out_handler.setFormatter(formatter)
std_out_handler.addFilter(InfoFilter())
std_err_handler = logging.StreamHandler()
std_err_handler.setLevel(logging.WARNING)
std_err_handler.setFormatter(formatter)
logger.addHandler(std_out_handler)
logger.addHandler(std_err_handler)
return logger
def __init__(self, name, level):
logging.Filter.__init__(self)
self.name = name
self.level = level
def test_logger_filter(self):
# Filter at logger level.
self.root_logger.setLevel(VERBOSE)
# Levels >= 'Verbose' are good.
self.log_at_all_levels(self.root_logger)
self.assert_log_lines([
('Verbose', '5'),
('Sociable', '6'),
('Effusive', '7'),
('Terse', '8'),
('Taciturn', '9'),
('Silent', '10'),
])
def test_handler_filter(self):
# Filter at handler level.
self.root_logger.handlers[0].setLevel(SOCIABLE)
try:
# Levels >= 'Sociable' are good.
self.log_at_all_levels(self.root_logger)
self.assert_log_lines([
('Sociable', '6'),
('Effusive', '7'),
('Terse', '8'),
('Taciturn', '9'),
('Silent', '10'),
])
finally:
self.root_logger.handlers[0].setLevel(logging.NOTSET)
def configure_filter(self, config):
"""Configure a filter from a dictionary."""
if '()' in config:
result = self.configure_custom(config)
else:
name = config.get('name', '')
result = logging.Filter(name)
return result
def __init__(self, level):
self._level = level
logging.Filter.__init__(self)
def __init__(self, level):
self.filter_level = level
logging.Filter.__init__(self)
def __init__(self):
logging.Filter.__init__(self)
self.warning_table = {}
def __init__(self):
logging.Filter.__init__(self)
self.warning_table = {}
def test_filter(self):
# Only messages satisfying the specified criteria pass through the
# filter.
filter_ = logging.Filter("spam.eggs")
handler = self.root_logger.handlers[0]
try:
handler.addFilter(filter_)
spam = logging.getLogger("spam")
spam_eggs = logging.getLogger("spam.eggs")
spam_eggs_fish = logging.getLogger("spam.eggs.fish")
spam_bakedbeans = logging.getLogger("spam.bakedbeans")
spam.info(self.next_message())
spam_eggs.info(self.next_message()) # Good.
spam_eggs_fish.info(self.next_message()) # Good.
spam_bakedbeans.info(self.next_message())
self.assert_log_lines([
('spam.eggs', 'INFO', '2'),
('spam.eggs.fish', 'INFO', '3'),
])
finally:
handler.removeFilter(filter_)
#
# First, we define our levels. There can be as many as you want - the only
# limitations are that they should be integers, the lowest should be > 0 and
# larger values mean less information being logged. If you need specific
# level values which do not fit into these limitations, you can use a
# mapping dictionary to convert between your application levels and the
# logging system.
#
def test_logger_filter(self):
# Filter at logger level.
self.root_logger.setLevel(VERBOSE)
# Levels >= 'Verbose' are good.
self.log_at_all_levels(self.root_logger)
self.assert_log_lines([
('Verbose', '5'),
('Sociable', '6'),
('Effusive', '7'),
('Terse', '8'),
('Taciturn', '9'),
('Silent', '10'),
])
def test_handler_filter(self):
# Filter at handler level.
self.root_logger.handlers[0].setLevel(SOCIABLE)
try:
# Levels >= 'Sociable' are good.
self.log_at_all_levels(self.root_logger)
self.assert_log_lines([
('Sociable', '6'),
('Effusive', '7'),
('Terse', '8'),
('Taciturn', '9'),
('Silent', '10'),
])
finally:
self.root_logger.handlers[0].setLevel(logging.NOTSET)
def configure_filter(self, config):
"""Configure a filter from a dictionary."""
if '()' in config:
result = self.configure_custom(config)
else:
name = config.get('name', '')
result = logging.Filter(name)
return result
def test_filter(self):
# Only messages satisfying the specified criteria pass through the
# filter.
filter_ = logging.Filter("spam.eggs")
handler = self.root_logger.handlers[0]
try:
handler.addFilter(filter_)
spam = logging.getLogger("spam")
spam_eggs = logging.getLogger("spam.eggs")
spam_eggs_fish = logging.getLogger("spam.eggs.fish")
spam_bakedbeans = logging.getLogger("spam.bakedbeans")
spam.info(self.next_message())
spam_eggs.info(self.next_message()) # Good.
spam_eggs_fish.info(self.next_message()) # Good.
spam_bakedbeans.info(self.next_message())
self.assert_log_lines([
('spam.eggs', 'INFO', '2'),
('spam.eggs.fish', 'INFO', '3'),
])
finally:
handler.removeFilter(filter_)
#
# First, we define our levels. There can be as many as you want - the only
# limitations are that they should be integers, the lowest should be > 0 and
# larger values mean less information being logged. If you need specific
# level values which do not fit into these limitations, you can use a
# mapping dictionary to convert between your application levels and the
# logging system.
#