Python logging 模块,LogRecord() 实例源码
我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用logging.LogRecord()。
def format(self, record):
"""Apply little arrow and colors to the record.
Arrow and colors are only applied to sphinxcontrib.versioning log statements.
:param logging.LogRecord record: The log record object to log.
"""
formatted = super(ColorFormatter, self).format(record)
if self.verbose or not record.name.startswith(self.SPECIAL_SCOPE):
return formatted
# Arrow.
formatted = '=> ' + formatted
# Colors.
if not self.colors:
return formatted
if record.levelno >= logging.ERROR:
formatted = str(colorclass.Color.red(formatted))
elif record.levelno >= logging.WARNING:
formatted = str(colorclass.Color.yellow(formatted))
else:
formatted = str(colorclass.Color.cyan(formatted))
return formatted
def makeRecord(self, name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None):
"""
Create a LogRecord.
:param name: The name of the logger that produced the log record.
:param level: The logging level (severity) associated with the logging record.
:param fn: The name of the file (if known) where the log entry was created.
:param lno: The line number (if known) in the file where the log entry was created.
:param msg: The log message (or message template).
:param args: Ordinal message format arguments (if any).
:param exc_info: Exception information to be included in the log entry.
:param func: The function (if known) where the log entry was created.
:param extra: Extra information (if any) to add to the log record.
:param sinfo: Stack trace information (if known) for the log entry.
"""
# Do we have named format arguments?
if extra and 'log_props' in extra:
return StructuredLogRecord(name, level, fn, lno, msg, args, exc_info, func, sinfo, extra['log_props'])
return super().makeRecord(name, level, fn, lno, msg, args, exc_info, func, extra, sinfo)
def makeRecord(self, name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None):
"""
Create a `LogRecord`.
:param name: The name of the logger that produced the log record.
:param level: The logging level (severity) associated with the logging record.
:param fn: The name of the file (if known) where the log entry was created.
:param lno: The line number (if known) in the file where the log entry was created.
:param msg: The log message (or message template).
:param args: Ordinal message format arguments (if any).
:param exc_info: Exception information to be included in the log entry.
:param func: The function (if known) where the log entry was created.
:param extra: Extra information (if any) to add to the log record.
:param sinfo: Stack trace information (if known) for the log entry.
"""
# Do we have named format arguments?
if extra and 'log_props' in extra:
return StructuredLogRecord(name, level, fn, lno, msg, args, exc_info, func, sinfo, extra['log_props'])
return super().makeRecord(name, level, fn, lno, msg, args, exc_info, func, extra, sinfo)
def _get_local_timestamp(record):
"""
Get the record's UTC timestamp as an ISO-formatted date / time string.
:param record: The LogRecord.
:type record: StructuredLogRecord
:return: The ISO-formatted date / time string.
:rtype: str
"""
timestamp = datetime.fromtimestamp(
timestamp=record.created,
tz=tzlocal()
)
return timestamp.isoformat(sep=' ')
def __is_kms_encrypt_request(self, record): # pylint: disable=no-self-use
# type: (logging.LogRecord) -> bool
"""Determine if a record contains a kms:Encrypt request.
:param record: Logging record to filter
:type record: logging.LogRecord
:rtype: bool
"""
try:
return all((
record.name == 'botocore.endpoint',
record.msg.startswith('Making request'),
cast(tuple, record.args)[-1]['headers']['X-Amz-Target'] == 'TrentService.Encrypt'
))
except Exception: # pylint: disable=broad-except
return False
def __is_kms_response_with_plaintext(self, record): # pylint: disable=no-self-use
# type: (logging.LogRecord) -> bool
"""Determine if a record contains a KMS response with plaintext.
:param record: Logging record to filter
:type record: logging.LogRecord
:rtype: bool
"""
try:
return all((
record.name == 'botocore.parsers',
record.msg.startswith('Response body:'),
b'KeyId' in cast(tuple, record.args)[0],
b'Plaintext' in cast(tuple, record.args)[0]
))
except Exception: # pylint: disable=broad-except
return False
def __init__(self, name, level, fn, lno, msg, args, exc_info, func, extra):
logging.LogRecord.__init__(self, name, level, fn, lno, msg, args, exc_info, func)
log_details = get_log_details()
log_details.update(extra or {})
for k in log_details.iterkeys():
setattr(self, k, log_details[k])
logger_fields = "levelname", "levelno", "process", "thread", "name", \
"filename", "module", "funcName", "lineno"
for f in logger_fields:
log_details["logger"][f] = getattr(self, f, None)
try:
correlation_id = request.correlation_id
except Exception:
correlation_id = None
log_details["logger"]["correlation_id"] = correlation_id
log_details["logger"]["created"] = datetime.datetime.utcnow().isoformat() + "Z"
for k in log_details.iterkeys():
setattr(self, k, log_details[k])
def test_custom_handler(self, mocker):
handler = DummyHandler()
mock = mocker.MagicMock()
handler.emit = mock
logger = make_logger()
logger.handlers = [handler]
disable(NOTSET)
logger.debug('test')
assert mock.call_count == 1
emit_call = mock.mock_calls[0]
name, args, kwargs = emit_call
assert name == ''
log_record = args[0]
assert isinstance(log_record, LogRecord)
assert log_record.msg == 'test'
assert log_record.levelname == 'DEBUG'
del logger
def format(self, record):
"""Formats the string representation of record.
:param logging.LogRecord record: Record to be formatted
:returns: Formatted, string representation of record
:rtype: str
"""
out = (logging.StreamHandler.format(self, record)
if sys.version_info < (2, 7)
else super(ColoredStreamHandler, self).format(record))
if self.colored and record.levelno >= self.red_level:
return ''.join((util.ANSI_SGR_RED, out, util.ANSI_SGR_RESET))
else:
return out
def test_pretty_xml_handler(self):
# Test that a normal, non-XML log record is passed through unchanged
stream = io.BytesIO() if PY2 else io.StringIO()
stream.isatty = lambda: True
h = PrettyXmlHandler(stream=stream)
self.assertTrue(h.is_tty())
r = logging.LogRecord(name='baz', level=logging.INFO, pathname='/foo/bar', lineno=1, msg='hello', args=(), exc_info=None)
h.emit(r)
h.stream.seek(0)
self.assertEqual(h.stream.read(), 'hello\n')
# Test formatting of an XML record. It should contain newlines and color codes.
stream = io.BytesIO() if PY2 else io.StringIO()
stream.isatty = lambda: True
h = PrettyXmlHandler(stream=stream)
r = logging.LogRecord(name='baz', level=logging.DEBUG, pathname='/foo/bar', lineno=1, msg='hello %(xml_foo)s', args=({'xml_foo': b'<?xml version="1.0" encoding="UTF-8"?><foo>bar</foo>'},), exc_info=None)
h.emit(r)
h.stream.seek(0)
self.assertEqual(
h.stream.read(),
"hello \x1b[36m<?xml version='1.0' encoding='utf-8'?>\x1b[39;49;00m\n\x1b[34;01m<foo\x1b[39;49;00m\x1b[34;01m>\x1b[39;49;00mbar\x1b[34;01m</foo>\x1b[39;49;00m\n\n"
)
def test_record_none_exc_info(django_elasticapm_client):
# sys.exc_info can return (None, None, None) if no exception is being
# handled anywhere on the stack. See:
# http://docs.python.org/library/sys.html#sys.exc_info
record = logging.LogRecord(
'foo',
logging.INFO,
pathname=None,
lineno=None,
msg='test',
args=(),
exc_info=(None, None, None),
)
handler = LoggingHandler()
handler.emit(record)
assert len(django_elasticapm_client.events) == 1
event = django_elasticapm_client.events.pop(0)['errors'][0]
assert event['log']['param_message'] == 'test'
assert event['log']['logger_name'] == 'foo'
assert event['log']['level'] == 'info'
assert 'exception' not in event
def format(self, record):
""" Formats the record.msg string by using click.style()
The record will have attributes set to it when a user logs a message
with any kwargs given. This function looks for any attributes that
are in STYLES. If record has any sytle attributes, the record will
be styled with the given sytle. This makes click logging with a logger very easy.
Args:
record (logging.LogRecord): record which gets styled with click.style()
"""
# Sets the default kwargs
kwargs = dict()
# checks if any click args were passed along in the logger
for kwarg_name in self.STYLES:
if hasattr(record, kwarg_name):
kwargs[kwarg_name] = getattr(record, kwarg_name)
# styles the message of the record if a style was given
if kwargs:
record.msg = click.style(record.msg, **kwargs)
return record
def terminate(self):
if self.terminated:
return
self.terminated = True
def flushevents():
while True:
try:
event = self.event_queue.get(block=False)
except (Empty, IOError):
break
if isinstance(event, logging.LogRecord):
logger.handle(event)
signal.signal(signal.SIGINT, signal.SIG_IGN)
self.procserver.stop()
while self.procserver.is_alive():
flushevents()
self.procserver.join(0.1)
self.ui_channel.close()
self.event_queue.close()
self.event_queue.setexit()
# Wrap Queue to provide API which isn't server implementation specific
def test_connector_log_formatter_dict_format(self):
formatter = ConnectorLogFormatter()
message = {'message': 'fake_message', 'type': 'message_type'}
record = LogRecord('fake_name', 'DEBUG', None, None, message,
None, None)
record.reseller_name = 'fake_reseller_name'
# as return result of formatter.format is a string we will check the substring
actual_record = formatter.format(record)
assert 'fake_reseller_name' in actual_record
assert 'MESSAGE_TYPE' in actual_record
assert datetime.now().isoformat(' ')[:-7] in actual_record
record.msg = {
'text': 'fake_text'
}
actual_record = formatter.format(record)
assert 'fake_text' in actual_record
def setUp(self):
class CheckingFilter(logging.Filter):
def __init__(self, cls):
self.cls = cls
def filter(self, record):
t = type(record)
if t is not self.cls:
msg = 'Unexpected LogRecord type %s, expected %s' % (t,
self.cls)
raise TypeError(msg)
return True
BaseTest.setUp(self)
self.filter = CheckingFilter(DerivedLogRecord)
self.root_logger.addFilter(self.filter)
self.orig_factory = logging.getLogRecordFactory()
def format(self, record):
"""
:param logging.LogRecord record:
"""
super(HtmlFormatter, self).format(record)
if record.funcName:
record.funcName = escape_html(str(record.funcName))
if record.name:
record.name = escape_html(str(record.name))
if record.msg:
record.msg = escape_html(record.getMessage())
if self.use_emoji:
print(record.name, record.levelno, record.levelname)
if record.levelno == logging.DEBUG:
print(record.levelno, record.levelname)
record.levelname += ' ' + EMOJI.WHITE_CIRCLE
elif record.levelno == logging.INFO:
print(record.levelno, record.levelname)
record.levelname += ' ' + EMOJI.BLUE_CIRCLE
else:
record.levelname += ' ' + EMOJI.RED_CIRCLE
return self.fmt % record.__dict__
def format(self, record):
"""Formats the string representation of record.
:param logging.LogRecord record: Record to be formatted
:returns: Formatted, string representation of record
:rtype: str
"""
out = (logging.StreamHandler.format(self, record)
if sys.version_info < (2, 7)
else super(StreamHandler, self).format(record))
if self.colored and record.levelno >= self.red_level:
return ''.join((util.ANSI_SGR_RED, out, util.ANSI_SGR_RESET))
else:
return out
def setUp(self):
class CheckingFilter(logging.Filter):
def __init__(self, cls):
self.cls = cls
def filter(self, record):
t = type(record)
if t is not self.cls:
msg = 'Unexpected LogRecord type %s, expected %s' % (t,
self.cls)
raise TypeError(msg)
return True
BaseTest.setUp(self)
self.filter = CheckingFilter(DerivedLogRecord)
self.root_logger.addFilter(self.filter)
self.orig_factory = logging.getLogRecordFactory()
def test_json_formatter(self):
name = 'name'
line = 42
module = 'some_module'
func = 'some_function'
msg = {'content': 'sample log'}
log_record = LogRecord(
name, INFO, module, line, msg, None, None, func=func
)
formatter = JSONFormatter()
log_result = formatter.format(log_record)
result = json.loads(log_result)
# check some of the fields to ensure json formatted correctly
self.assertEqual(name, result['name'])
self.assertEqual(line, result['lineNumber'])
self.assertEqual(func, result['functionName'])
self.assertEqual(module, result['module'])
self.assertEqual('INFO', result['level'])
self.assertEqual(msg, result['message'])
def _two_masters_error_logged(
self,
log_records: List[logging.LogRecord],
) -> bool:
"""
Return whether a particular error is logged as a DEBUG message.
This is prone to being broken as it checks for a string in the DC/OS
repository.
Args:
log_records: Messages logged from the logger.
Returns:
Whether a particular error is logged as a DEBUG message.
"""
message = 'Must have 1, 3, 5, 7, or 9 masters'
encountered_error = False
for record in log_records:
if record.levelno == logging.DEBUG and message in str(record.msg):
encountered_error = True
return encountered_error
def format(self, record):
"""Formatting function
@param object record: :logging.LogRecord:
"""
entry = {
'name': record.name,
'timestamp': self.formatTime(record,
datefmt="%Y-%m-%d %H:%M:%S"),
'level': record.levelname
}
if hasattr(record, 'message'):
entry['message'] = record.message
else:
entry['message'] = super().format(record)
# add exception information if available
if record.exc_info is not None:
entry['exception'] = {
'message': traceback.format_exception(
*record.exc_info)[-1][:-1],
'traceback': traceback.format_exception(
*record.exc_info)[:-1]
}
return entry
def setUp(self):
class CheckingFilter(logging.Filter):
def __init__(self, cls):
self.cls = cls
def filter(self, record):
t = type(record)
if t is not self.cls:
msg = 'Unexpected LogRecord type %s, expected %s' % (t,
self.cls)
raise TypeError(msg)
return True
BaseTest.setUp(self)
self.filter = CheckingFilter(DerivedLogRecord)
self.root_logger.addFilter(self.filter)
self.orig_factory = logging.getLogRecordFactory()
def logMessages(self, request):
"""Write a log message from the Swarm FE to the log.
Args:
request: A log message request.
Returns:
Void message.
"""
Level = api_backend.LogMessagesRequest.LogMessage.Level
log = logging.getLogger(__name__)
for message in request.messages:
level = message.level if message.level is not None else Level.info
# Create a log record and override the pathname and lineno. These
# messages come from the front end, so it's misleading to say that they
# come from api_backend_service.
record = logging.LogRecord(name=__name__, level=level.number, pathname='',
lineno='', msg=message.message, args=None,
exc_info=None)
log.handle(record)
return message_types.VoidMessage()
def format(self, record: logging.LogRecord) -> str:
"""Return formatted string of log record.
Args:
record: The log record.
Returns:
Formatted string of log record.
"""
try:
record.asctime
except AttributeError:
record.asctime = self.formatTime(record, self.datefmt)
try:
record.message
except AttributeError:
record.message = record.msg
s = '{} - {:>8} - {:>26} - {}'.format(record.asctime,
record.levelname,
record.name,
record.message)
return s
def setUp(self):
class CheckingFilter(logging.Filter):
def __init__(self, cls):
self.cls = cls
def filter(self, record):
t = type(record)
if t is not self.cls:
msg = 'Unexpected LogRecord type %s, expected %s' % (t,
self.cls)
raise TypeError(msg)
return True
BaseTest.setUp(self)
self.filter = CheckingFilter(DerivedLogRecord)
self.root_logger.addFilter(self.filter)
self.orig_factory = logging.getLogRecordFactory()
def handle_new_task(self, task_name, record):
"""
Do everything needed when a task is starting
Params:
task_name (str): name of the task that is starting
record (logging.LogRecord): log record with all the info
Returns:
None
"""
record.msg = ColorFormatter.colored('default', START_TASK_MSG)
record.task = task_name
self.tasks[task_name] = Task(name=task_name, maxlen=self.buffer_size)
if self.should_show_by_depth():
self.pretty_emit(record, is_header=True)
def test_label_nonstring():
"""Logging things that aren't string is fine"""
result = {
"records": [
logging.LogRecord("root", "INFO", "", 0, msg, [], None)
for msg in (
"Proper message",
12,
{"a": "dict"},
list(),
1.0,
)
],
"error": None
}
model_ = model.Terminal()
model_.update_with_result(result)
for item in model_:
assert isinstance(item.data(model.Label), six.text_type), (
"\"%s\" wasn't a string!" % item.data(model.Label))
def emit(self, record):
"""
Emit a log record.
:param record: The LogRecord.
"""
self.log_queue.put(record, block=False)
def _build_event_data(record):
"""
Build an event data dictionary from the specified log record for submission to Seq.
:param record: The LogRecord.
:type record: StructuredLogRecord
:return: A dictionary containing event data representing the log record.
:rtype: dict
"""
if record.args:
# Standard (unnamed) format arguments (use 0-base index as property name).
log_props_shim = get_global_log_properties(record.name)
for (arg_index, arg) in enumerate(record.args or []):
log_props_shim[str(arg_index)] = arg
event_data = {
"Timestamp": _get_local_timestamp(record),
"Level": logging.getLevelName(record.levelno),
"MessageTemplate": record.getMessage(),
"Properties": log_props_shim
}
elif isinstance(record, StructuredLogRecord):
# Named format arguments (and, therefore, log event properties).
event_data = {
"Timestamp": _get_local_timestamp(record),
"Level": logging.getLevelName(record.levelno),
"MessageTemplate": record.msg,
"Properties": record.log_props
}
else:
# No format arguments; interpret message as-is.
event_data = {
"Timestamp": _get_local_timestamp(record),
"Level": logging.getLevelName(record.levelno),
"MessageTemplate": record.getMessage(),
"Properties": _global_log_props
}
return event_data
def __redact_encrypt_request(self, record):
# type: (logging.LogRecord) -> None
"""Redact the ``Plaintext`` value from a kms:Encrypt request.
:param record: Logging record to filter
:type record: logging.LogRecord
"""
try:
parsed_body = json.loads(self.__to_str(cast(tuple, record.args)[-1]['body']))
parsed_body['Plaintext'] = _REDACTED
cast(tuple, record.args)[-1]['body'] = json.dumps(parsed_body, sort_keys=True)
except Exception: # pylint: disable=broad-except
return
def __redact_key_from_response(self, record):
# type: (logging.LogRecord) -> None
"""Redact the ``Plaintext`` value from a KMS response body.
:param record: Logging record to filter
:type record: logging.LogRecord
"""
try:
parsed_body = json.loads(self.__to_str(cast(tuple, record.args)[0]))
parsed_body['Plaintext'] = _REDACTED
new_args = (json.dumps(parsed_body, sort_keys=True),) + cast(tuple, record.args)[1:]
record.args = new_args
except Exception: # pylint: disable=broad-except
return
def __redact_record(self, record):
# type: (logging.LogRecord) -> logging.LogRecord
"""Redact any values from a record, as necessary.
:param record: Logging record to filter
:type record: logging.LogRecord
"""
_record = copy.deepcopy(record)
if self.__is_kms_encrypt_request(_record):
self.__redact_encrypt_request(_record)
elif self.__is_kms_response_with_plaintext(_record):
self.__redact_key_from_response(_record)
return _record
def filter(self, record):
# type: (logging.LogRecord) -> bool
"""Determines whether to filter record.
:param record: Logging record to filter
:type record: logging.LogRecord
:rtype: bool
"""
return record.name not in self.__blacklist
def create_record(name='test_logger', level='DEBUG', msg='foobar'):
return LogRecord(name=name, level=LEVEL_MAP[level], msg=msg, lineno=None,
args=None, exc_info=None, pathname=None)
# LoggerFilter
def assertLogRecords(self, records, matches):
self.assertEqual(len(records), len(matches))
for rec, match in zip(records, matches):
self.assertIsInstance(rec, logging.LogRecord)
for k, v in match.items():
self.assertEqual(getattr(rec, k), v)
def __init__(self, records: typing.List[logging.LogRecord]):
self._records = records
def filter_by_level(self, minimum_level=logging.NOTSET):
minimum_level = self._sanitize_level(minimum_level)
def filter_func(rec: logging.LogRecord):
return rec.levelno >= minimum_level
return self.__filter(filter_func)
def _filter_by_str(self, text: str or None, rec_attrib):
if text is None:
return self
def filter_func(rec: logging.LogRecord):
return text.lower() in getattr(rec, rec_attrib)
return self.__filter(filter_func)
def __iter__(self) -> typing.Generator[logging.LogRecord, None, None]:
for x in self._records:
yield x
def handle_record(self, record: logging.LogRecord):
raise NotImplementedError()
def emit(self, record: logging.LogRecord):
self._records.append(record)
for follower in self._followers:
follower.handle_record(record)
def records(self) -> typing.Iterator[logging.LogRecord]:
return self.filter_records(**self.filters)
def handle_record(self, record: logging.LogRecord):
if self.filter_record(record, **self.filters):
I.tab_log_write(self.format(record), str(self.colors[record.levelname]))
def _send(self):
content = []
for rec in self.filter_records():
assert isinstance(rec, logging.LogRecord)
content.append(self.format(rec))
url = pastebin.create_new_paste('\n'.join(content))
if url:
SENTRY.captureMessage('Logfile', extra={'log_url': url})
self.tab_log_write('Log file sent; thank you !')
else:
self.tab_log_write('Could not send log file')
def shouldFlush(self, record):
"""Should the buffer be automatically flushed?
:param logging.LogRecord record: log record to be considered
:returns: False because the buffer should never be auto-flushed
:rtype: bool
"""
return False
def emit(self, record):
"""Log the specified logging record.
:param logging.LogRecord record: Record to be formatted
"""
self._delete = False
# logging handlers use old style classes in Python 2.6 so
# super() cannot be used
if sys.version_info < (2, 7): # pragma: no cover
logging.StreamHandler.emit(self, record)
else:
super(TempHandler, self).emit(record)
def filter(self, record: logging.LogRecord):
if record.exc_info == (None, None, None):
return False
else:
return True