Python syslog 模块,LOG_ERR 实例源码
我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用syslog.LOG_ERR。
def confirm_leave(self, owner, level):
    """Return the syslog ``(facility, severity)`` pair for a message.

    The facility is always ``syslog.LOG_USER``.  The severity defaults to
    ``syslog.LOG_INFO`` and is replaced by the mapped value when ``level``
    is one of ``'INFO'``, ``'WARN'``, ``'ERROR'`` or ``'DEBUG'``.  When
    ``owner`` is ``'ecms_troubleshoot'`` the severity is forced to
    ``syslog.LOG_EMERG`` regardless of ``level``.

    Args:
        owner: name of the component emitting the message.
        level: textual level name.

    Returns:
        A ``(facility, severity)`` tuple of syslog constants.
    """
    facility = syslog.LOG_USER
    # Translate textual level names into syslog severities.
    level_info = {
        'INFO': syslog.LOG_INFO,
        'WARN': syslog.LOG_WARNING,
        'ERROR': syslog.LOG_ERR,
        'DEBUG': syslog.LOG_DEBUG,
    }
    # Unknown level names fall back to the default severity.
    severity = level_info.get(level, syslog.LOG_INFO)
    # Special case: these owners always log at the highest severity.
    # The constant lives on the module (syslog.LOG_EMERG), not on the
    # syslog.syslog() function.
    if owner in ('ecms_troubleshoot',):
        severity = syslog.LOG_EMERG
    return facility, severity
def test_format_colored_with_level_error(self):
    """A message at ``LOG_ERR`` is formatted and colored red."""
    self.message.level = syslog.LOG_ERR
    formatter = TailFormatter('({source}) - {message}', color=True)
    log = formatter.format(self.message)
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(colored('(dummy.source) - dummy message', 'red'), log)
def test_get_log_level_from_code(self):
    """``LogLevel.find_by_syslog_code`` yields the expected ``'name'`` per code."""
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual('CRITICAL', LogLevel.find_by_syslog_code(syslog.LOG_CRIT)['name'])
    self.assertEqual('WARNING', LogLevel.find_by_syslog_code(syslog.LOG_WARNING)['name'])
    self.assertEqual('DEBUG', LogLevel.find_by_syslog_code(syslog.LOG_DEBUG)['name'])
    self.assertEqual('INFO', LogLevel.find_by_syslog_code(syslog.LOG_INFO)['name'])
    self.assertEqual('ERROR', LogLevel.find_by_syslog_code(syslog.LOG_ERR)['name'])
    self.assertEqual('NOTICE', LogLevel.find_by_syslog_code(syslog.LOG_NOTICE)['name'])
    # An unrecognised code is expected to produce an empty name.
    self.assertEqual('', LogLevel.find_by_syslog_code(9999)['name'])
def test_get_log_level_code(self):
    """``LogLevel.find_by_level_name`` yields the syslog code for each name."""
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(syslog.LOG_CRIT, LogLevel.find_by_level_name('CRITICAL'))
    self.assertEqual(syslog.LOG_WARNING, LogLevel.find_by_level_name('WARNING'))
    self.assertEqual(syslog.LOG_DEBUG, LogLevel.find_by_level_name('DEBUG'))
    self.assertEqual(syslog.LOG_INFO, LogLevel.find_by_level_name('INFO'))
    self.assertEqual(syslog.LOG_ERR, LogLevel.find_by_level_name('ERROR'))
    self.assertEqual(syslog.LOG_NOTICE, LogLevel.find_by_level_name('NOTICE'))
    # An unknown level name is expected to produce None.
    self.assertIsNone(LogLevel.find_by_level_name('UNKNOWN'))
def logerr(msg):
    """Forward ``msg`` to ``logmsg`` with syslog priority ``LOG_ERR``."""
    priority = syslog.LOG_ERR
    logmsg(priority, msg)
# some unit labels are rather lengthy. this reduces them to something shorter.
def handle_finished_download():
    """Process the torrent named by the ``TR_TORRENT_ID`` environment variable.

    Looks the torrent up through the Transmission RPC client, exits with
    status 1 if it is not fully downloaded, dispatches it to the handler
    selected by its download directory, and finally removes it when it is
    not whitelisted.
    """
    tid = os.environ['TR_TORRENT_ID']
    syslog.syslog('Beginning processing of torrent {0}'.format(tid))

    client = transmissionrpc.Client(
        transmission_host,
        port=transmission_port,
        user=transmission_user,
        password=transmission_password,
    )
    torrent = client.get_torrent(tid)

    # Refuse to continue unless the torrent is actually complete.
    if torrent.progress != 100.0:
        syslog.syslog(syslog.LOG_ERR, 'Called with an incomplete torrent')
        sys.exit(1)

    # Dispatch on the category found in the download directory; anything
    # that matches neither category is treated as manually added.
    if couchpotato_category in torrent.downloadDir:
        if not ignore_couchpotato:
            handle_couchpotato(torrent)
    elif sonarr_category in torrent.downloadDir:
        if not ignore_sonarr:
            handle_sonarr(torrent)
    else:
        handle_manual(torrent)

    # Torrents that are not whitelisted (by tracker) are removed right away.
    if not whitelisted(torrent):
        client.remove_torrent(tid)
        pb_notify('Removed non-whitelisted torrent {0}'.format(torrent.name))
def commit(db_conn):
    """Commit the pending transaction on ``db_conn``.

    Args:
        db_conn: object exposing a ``commit()`` method.

    Raises:
        sqlite3.Error: re-raised after the error has been printed and
            sent to syslog with priority ``LOG_ERR``.
    """
    try:
        db_conn.commit()
    except sqlite3.Error as e:
        # Build the message defensively: an exception without arguments
        # (or with a non-string first argument) must not raise a second
        # error here and hide the original one.
        detail = str(e.args[0]) if e.args else str(e)
        print('sql error: ' + detail)
        syslog(LOG_ERR, 'sql error: ' + detail)
        # Bare raise keeps the original traceback intact.
        raise
def executeSQL(db_conn, sql_string, add_information=None):
    """Execute ``sql_string`` on a fresh cursor of ``db_conn``.

    Args:
        db_conn: connection object exposing ``cursor()``.
        sql_string: SQL statement to execute.
        add_information: optional dict; when supplied, its ``'id'`` key is
            set to the cursor's ``lastrowid`` after the statement ran.

    Returns:
        Whatever ``cursor.execute(sql_string)`` returns.

    Raises:
        sqlite3.Error: re-raised after the error has been printed and
            sent to syslog with priority ``LOG_ERR``.
    """
    c = db_conn.cursor()
    try:
        entries = c.execute(sql_string)
        # The default used to be a shared mutable dict that every call
        # without the argument wrote into; only write when the caller
        # actually passed a dict.
        if add_information is not None:
            add_information['id'] = c.lastrowid
    except sqlite3.Error as e:
        # Build the message defensively so a missing or non-string first
        # argument cannot mask the original error.
        detail = str(e.args[0]) if e.args else str(e)
        print('sql error: ' + detail)
        syslog(LOG_ERR, 'sql error: ' + detail)
        # Bare raise keeps the original traceback intact.
        raise
    return entries
def log_err(err):
    """Report ``err`` on stderr and in syslog.

    The stderr line is ``err`` followed by a newline.  The syslog entry is
    sent with priority ``LOG_USER | LOG_ERR`` and is prefixed with
    ``sys.argv[0]``.
    """
    # ``print >>sys.stderr, err`` is Python 2 syntax and raises TypeError
    # on Python 3; writing to the stream directly works on both.
    sys.stderr.write("%s\n" % (err,))
    syslog.syslog(syslog.LOG_USER | syslog.LOG_ERR, "%s: %s" % (sys.argv[0], err))
def witter(foo):
    """Emit ``foo`` to syslog and stderr when ``log_details`` is truthy.

    NOTE(review): the original indentation was lost; both output
    statements are assumed to sit under the ``log_details`` check --
    confirm against the upstream file.
    """
    if log_details:
        syslog.syslog(syslog.LOG_USER | syslog.LOG_ERR, foo)
        # ``print >>sys.stderr, foo`` is Python 2 syntax and raises
        # TypeError on Python 3; writing to the stream works on both.
        sys.stderr.write("%s\n" % (foo,))
def test_levels(journal, logger):
    """Each logger method records the matching syslog priority in the journal."""
    # (logger method, message, expected syslog priority), checked in order.
    expectations = (
        ('critical', 'Critical message', syslog.LOG_CRIT),
        ('error', 'Error message', syslog.LOG_ERR),
        ('warning', 'Warning message', syslog.LOG_WARNING),
        ('notice', 'Notice message', syslog.LOG_NOTICE),
        ('info', 'Info message', syslog.LOG_INFO),
        ('debug', 'Debug message', syslog.LOG_DEBUG),
    )
    with JournalHandler().threadbound():
        for method_name, text, priority in expectations:
            getattr(logger, method_name)(text)
            assert journal['PRIORITY'] == str(priority)
def handle_manual(torrent):
    """Process a manually added torrent and send a completion notification.

    Every file of the torrent is inspected: files accepted by
    ``check_extension`` (and not containing ``'sample'`` in their path)
    are handed to ``guessit`` and moved as an episode or a movie; rar
    archives are extracted into a temporary directory and their contents
    processed the same way.  The notification text states whether anything
    was auto-processed.

    Args:
        torrent: torrent object providing ``files()``, ``downloadDir`` and
            ``name``.
    """
    auto_processed = False

    def handle_media(path, move):
        """Classify ``path`` with guessit and move it as episode or movie."""
        nonlocal auto_processed
        guess = guessit.guessit(path)
        if guess['type'] == 'episode':
            move_episode(path, guess, move)
            auto_processed = True
        elif guess['type'] == 'movie':
            move_movie(path, guess, move)
            auto_processed = True

    # Raw string: '\d' in a normal literal is an invalid escape sequence
    # (SyntaxWarning on recent Pythons).  The compiled pattern is unchanged.
    part_regex = re.compile(r'.*part(\d+).rar', re.IGNORECASE)

    # Only the file entries are needed; the dict keys were never used.
    for file in torrent.files().values():
        file_path = os.path.join(torrent.downloadDir, file['name'])
        if check_extension(file_path) and 'sample' not in file_path.lower():
            # Log and ignore media files of less than ~92MiB
            try:
                if os.path.getsize(file_path) >= 96811278:
                    handle_media(file_path, False)
                else:
                    syslog.syslog(
                        syslog.LOG_ERR, 'Detected false media file, skipping'
                    )
            except FileNotFoundError:
                syslog.syslog(syslog.LOG_ERR, 'Torrent file missing, skipping')
        elif file_path.endswith('rar'):
            # Ignore parts beyond the first in a rar series
            match = part_regex.match(file_path)
            if match and int(match.group(1)) > 1:
                continue
            with tempfile.TemporaryDirectory() as temp_dir:
                paths = extract(file_path, temp_dir)
                if paths:
                    for path in paths:
                        # Adjust group and mode of each extracted file
                        # before handing it to the media handler.
                        shutil.chown(path, group=plex_group)
                        os.chmod(path, 0o664)
                        handle_media(path, True)

    if auto_processed:
        pb_notify(
            textwrap.dedent(
                '''
                Manually added torrent {0} finished downloading
                and was auto-processed
                '''.format(torrent.name)
            ).strip()
        )
    else:
        pb_notify(
            'Manually added torrent {0} finished downloading'.format(
                torrent.name
            )
        )