Python logging 模块:logging.ERROR 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 logging.ERROR。
def format(self, record):
    """Format *record*, decorating project-scoped messages with an arrow and color.

    Decoration applies only to records whose logger name falls under
    ``SPECIAL_SCOPE``, and only when verbose mode is off.

    :param logging.LogRecord record: The log record object to log.
    """
    text = super(ColorFormatter, self).format(record)
    in_scope = record.name.startswith(self.SPECIAL_SCOPE)
    if self.verbose or not in_scope:
        return text
    # Arrow prefix marks project-originated messages.
    text = '=> ' + text
    if not self.colors:
        return text
    # Color by severity: red for errors, yellow for warnings, cyan otherwise.
    if record.levelno >= logging.ERROR:
        colored = colorclass.Color.red(text)
    elif record.levelno >= logging.WARNING:
        colored = colorclass.Color.yellow(text)
    else:
        colored = colorclass.Color.cyan(text)
    return str(colored)
def ConvertLog4ToCFLevel( log4level ):
    """Translate a log4/python numeric level into the CF.LogLevels enum.

    Checks run from most to least severe and the first match wins; anything
    unrecognized falls back to CF.LogLevels.INFO.  The TRACE comparison
    assumes a TRACE level has been registered on the logging module.
    """
    if log4level == logging.FATAL + 1:
        # One above FATAL means logging is switched off entirely.
        return CF.LogLevels.OFF
    elif log4level == logging.FATAL:
        return CF.LogLevels.FATAL
    elif log4level == logging.ERROR:
        return CF.LogLevels.ERROR
    elif log4level == logging.WARN:
        return CF.LogLevels.WARN
    elif log4level == logging.INFO:
        return CF.LogLevels.INFO
    elif log4level == logging.DEBUG:
        return CF.LogLevels.DEBUG
    elif log4level == logging.TRACE:
        return CF.LogLevels.TRACE
    elif log4level == logging.NOTSET:
        return CF.LogLevels.ALL
    return CF.LogLevels.INFO
def ConvertToLog4Level( newLevel ):
    """Translate a CF.LogLevels value into a log4/python numeric level.

    Unrecognized values fall back to logging.INFO.
    """
    if newLevel == CF.LogLevels.OFF:
        # One above FATAL silences everything.
        return logging.FATAL + 1
    if newLevel == CF.LogLevels.FATAL:
        return logging.FATAL
    if newLevel == CF.LogLevels.ERROR:
        return logging.ERROR
    if newLevel == CF.LogLevels.WARN:
        return logging.WARN
    if newLevel == CF.LogLevels.INFO:
        return logging.INFO
    if newLevel == CF.LogLevels.DEBUG:
        return logging.DEBUG
    if newLevel == CF.LogLevels.TRACE:
        return logging.TRACE
    if newLevel == CF.LogLevels.ALL:
        # ALL maps to the finest concrete level available.
        return logging.TRACE
    return logging.INFO
def maker(name):
    """Build a ``_log`` method for the logging level called *name*.

    *name* is a level name ('debug', 'info', ...) or 'exception', which logs
    at ERROR severity and appends the current traceback.

    :returns: a function usable as a logger method (takes self, msg, *args).
    """
    my_level = getattr(logging, name.upper()) if name != 'exception' else logging.ERROR
    # Label printed with the message; 'exception' records print as ERROR.
    altname = (name if name != 'exception' else 'error').upper()

    def _log(self, msg, *args, **kwargs):
        # Traceback is appended when asked for explicitly (exc_info) or
        # implied by the 'exception' level.
        exc = kwargs.pop('exc_info', None) or name == 'exception'
        tb = ('\n' + traceback.format_exc().strip()) if exc else ''
        if args:
            try:
                msg = msg % args
            except Exception:
                # Narrowed from a bare `except:` so SystemExit and
                # KeyboardInterrupt are never swallowed here.
                self.exception(
                    "Exception raised while formatting message:\n%s\n%r",
                    msg, args)
        msg += tb
        # Emit only when the instance's configured level admits this severity.
        if self.level <= my_level:
            print("%s %s %s" % (time.asctime(), altname, msg))
    _log.__name__ = name
    return _log
def __init__(self, appname, dllname=None, logtype="Application"):
    """Create a Windows NT event-log handler registered under *appname*.

    :param appname: source name to register in the Windows event log.
    :param dllname: DLL containing message definitions; defaults to the
        win32service.pyd shipped alongside the pywin32 extensions.
    :param logtype: which event log to write to (e.g. "Application").
    """
    logging.Handler.__init__(self)
    try:
        import win32evtlogutil, win32evtlog
        self.appname = appname
        self._welu = win32evtlogutil
        if not dllname:
            # Walk up from win32evtlogutil's location to find
            # win32service.pyd in the sibling directory.
            dllname = os.path.split(self._welu.__file__)
            dllname = os.path.split(dllname[0])
            dllname = os.path.join(dllname[0], r'win32service.pyd')
        self.dllname = dllname
        self.logtype = logtype
        self._welu.AddSourceToRegistry(appname, dllname, logtype)
        # Fallback event type for levels missing from typemap.
        self.deftype = win32evtlog.EVENTLOG_ERROR_TYPE
        # Map python logging levels onto the three NT event types.
        self.typemap = {
            logging.DEBUG : win32evtlog.EVENTLOG_INFORMATION_TYPE,
            logging.INFO : win32evtlog.EVENTLOG_INFORMATION_TYPE,
            logging.WARNING : win32evtlog.EVENTLOG_WARNING_TYPE,
            logging.ERROR : win32evtlog.EVENTLOG_ERROR_TYPE,
            logging.CRITICAL: win32evtlog.EVENTLOG_ERROR_TYPE,
        }
    except ImportError:
        # pywin32 not installed; _welu = None marks the handler unusable.
        print("The Python Win32 extensions for NT (service, event "\
            "logging) appear not to be available.")
        self._welu = None
def log(self, message, level=logging.DEBUG, depth=0):
    """Prepend string to log messages to denote class."""
    # depth > 0 switches the class-name prefix for indentation tabs.
    prefix = 'AmazonAccountUtils: ' if depth <= 0 else "\t" * depth
    text = prefix + str(message)
    if level == CRITICAL:
        self.logger.critical(text)
    elif level == ERROR:
        self.logger.error(text)
    elif level == WARNING:
        self.logger.warning(text)
    elif level == INFO:
        self.logger.info(text)
    else:
        # Anything unrecognized is treated as debug chatter.
        self.logger.debug(text)
def q_hashpubkey(abe, page, chain):
    """shows the 160-bit hash of the given public key."""
    # Next path component of the request URL is the hex-encoded pubkey.
    pubkey = wsgiref.util.shift_path_info(page['env'])
    if pubkey is None:
        # No argument supplied: return usage/help text instead.
        return \
            "Returns the 160-bit hash of PUBKEY.\n" \
            "For example, the Bitcoin genesis block's output public key," \
            " seen in its transaction output scriptPubKey, starts with\n" \
            "04678afdb0fe..., and its hash is" \
            " 62E907B15CBF27D5425399EBF6F0FB50EBB88F18, corresponding" \
            " to address 1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa.\n" \
            "/q/hashpubkey/PUBKEY\n"
    try:
        # NOTE(review): str.decode('hex') is Python-2-only; Python 3 would
        # need bytes.fromhex -- confirm the target interpreter.
        pubkey = pubkey.decode('hex')
    except Exception:
        return 'ERROR: invalid hexadecimal byte string.'
    return util.pubkey_to_hash(pubkey).encode('hex').upper()
def deprecated(self, removal_version, msg, *args, **kwargs):
    """Log a deprecation message.

    Severity is WARN while ``removal_version`` is more than one minor
    release away, and ERROR once it is imminent.  ``removal_version`` is
    the version in which the deprecated feature disappears: a feature
    present in 1.6 but gone in 1.7 has a removal_version of 1.7.
    """
    from pip import __version__
    emit = self.warn if should_warn(__version__, removal_version) else self.error
    emit(msg, *args, **kwargs)
def __check_input( opts, args, parser ):
    """Validate the positional arguments for the feature-mapping step.

    Expects exactly two arguments, both methylprofiles outputs:
    (1) <SEQ>_methyl_features.txt -- methylation features for mapping
    (2) <SEQ>_other_features.txt  -- alternative sequence features

    :returns: (mfeats_fn, ofeats_fn) tuple of validated file paths.
    """
    if len(args) != 2:
        # parser.error prints the message and exits.  Previously this branch
        # only print-ed a warning (Python 2 statement syntax) and then
        # crashed with an IndexError on args[0]/args[1] below.
        parser.error("ERROR -- expecting two arguments: "
                     "(1) <SEQ>_methyl_features.txt output from methylprofiles containing methylation features for mapping "
                     "(2) <SEQ>_other_features.txt output from methylprofiles containing alternative sequence features for mapping")
    mfeats_fn = args[0]
    ofeats_fn = args[1]
    # (unused local `feature_type` removed)
    if not os.path.exists(mfeats_fn):
        parser.error("Can't find file of sequence features (methylprofiles output) for mapping: %s" % mfeats_fn)
    if not os.path.exists(ofeats_fn):
        parser.error("Can't find file of sequence features (methylprofiles output) for mapping: %s" % ofeats_fn)
    return mfeats_fn, ofeats_fn
def __init__(self, options):
    """Record run options on the instance and configure logging verbosity."""
    self.start_time = int(time.time())
    self.logger = logging.getLogger(__name__)
    # Copy the relevant command-line options onto the instance.
    self.output_directory = options.output_directory
    self.verbose = options.verbose
    self.threads = options.threads
    self.kmer = options.kmer
    self.min_kmers_threshold = options.min_kmers_threshold
    self.max_kmers_threshold = options.max_kmers_threshold
    self.input_files = options.input_files
    self.keep_files = options.keep_files
    self.object_to_be_cleaned = []
    # Verbose runs log everything; otherwise only errors are shown.
    self.logger.setLevel(logging.DEBUG if self.verbose else logging.ERROR)
    self.kmc_major_version = KmcVersionDetect(self.verbose).major_version()
def __init__(self, output_directory, input_filename, threads, kmer, min_kmers_threshold, max_kmers_threshold, verbose):
    """Set up a per-sample kmer-counting run with its own temp directory."""
    self.logger = logging.getLogger(__name__)
    self.output_directory = output_directory
    self.input_filename = input_filename
    self.threads = threads
    self.kmer = kmer
    # FASTA input carries no depth-of-coverage information, so the minimum
    # kmer frequency is forced to 1; otherwise the caller's value is kept.
    if self.file_type_option() == '-fm':
        self.min_kmers_threshold = 1
    else:
        self.min_kmers_threshold = min_kmers_threshold
    self.max_kmers_threshold = max_kmers_threshold
    # Private scratch directory beneath the output directory.
    self.temp_working_dir = tempfile.mkdtemp(dir=os.path.abspath(output_directory), prefix='tmp_samplekmers_')
    self.verbose = verbose
    self.logger.setLevel(logging.DEBUG if self.verbose else logging.ERROR)
def getDLsize(self):
    """Fill column 2 of the tree widget with each item's download size in MiB.

    Issues a HEAD request for every item that carries a URL.  HTTP errors
    put the status code in the column; other request failures show "Error".
    """
    debug_log("getDLsize called")
    it = QTreeWidgetItemIterator(self.tw)
    while it.value():
        item = it.value()
        url_test = item.data(0, dataURL)
        if url_test is not None:
            try:
                r = requests.head(url_test)
                r.raise_for_status()
                try:
                    size = (int(r.headers['Content-Length']) / 1024) / 1024
                except (KeyError, ValueError):
                    # Content-Length may be absent (KeyError, previously
                    # uncaught) or malformed (ValueError): size unknown.
                    size = 0
                if size > 0:
                    item.setText(2, "{} MiB".format(round(size, 2)))
            except requests.exceptions.HTTPError:
                debug_log("Error {} getting DL size: {}".format(r.status_code, r.headers))
                # setText expects a string; status_code is an int.
                item.setText(2, str(r.status_code))
            except requests.exceptions.RequestException as e:
                item.setText(2, self.tr("Error"))
                debug_log(e, logging.ERROR)
        it += 1
def setUp(self):
    """Build an isolated logger that writes through a color-faking formatter."""
    # Throwaway temp directory holding the capture file.
    self.tempdir = tempfile.mkdtemp()
    self.filename = os.path.join(self.tempdir, 'log.out')
    self.formatter = LogFormatter(color=False)
    # Fake color support: $TERM is unpredictable under test, so patch in
    # fixed control characters.  (Testing with color off would fail to
    # expose potential encoding issues from the control characters.)
    self.formatter._colors = {
        logging.ERROR: u("\u0001"),
    }
    self.formatter._normal = u("\u0002")
    # A directly-constructed Logger bypasses getLogger's cache.
    self.logger = logging.Logger('LogFormatterTest')
    self.logger.propagate = False
    self.handler = self.make_handler(self.filename)
    self.handler.setFormatter(self.formatter)
    self.logger.addHandler(self.handler)
def _get_logging_level():
    """
    Converts our ENV variable HA_LOG_LEVEL to a logging level object
    :return: logging level object
    """
    level_map = {
        'critical': logging.CRITICAL,
        'error': logging.ERROR,
        'warning': logging.WARNING,
        'debug': logging.DEBUG,
    }
    name = _get_config('LOG_LEVEL', 'info').lower()
    # Anything unrecognized (including the default 'info') maps to INFO.
    return level_map.get(name, logging.INFO)
def doActivateNode(registrar_ip, registrar_port, instance_id, key):
    """Activate a registered node with the registrar service.

    Proves possession of *key* by sending an HMAC of the instance id to the
    registrar's /activate endpoint; logs the outcome either way.

    :param registrar_ip: registrar host address.
    :param registrar_port: registrar port.
    :param instance_id: identifier of the node being activated.
    :param key: base64-encoded HMAC key.
    """
    data = {
        'auth_tag': crypto.do_hmac(base64.b64decode(key), instance_id),
    }
    v_json_message = json.dumps(data)
    response = tornado_requests.request("PUT",
        "http://%s:%s/v2/instances/%s/activate" % (registrar_ip, registrar_port, instance_id),
        data=v_json_message,
        context=None)
    if response.status_code == 200:
        logger.info("Registration activated for node %s." % instance_id)
    else:
        # Dump the full response body at ERROR for diagnosis.
        logger.error("Error: unexpected http response code from Registrar Server: " + str(response.status_code))
        common.log_http_response(logger, logging.ERROR, response.json())
def doActivateVirtualNode(registrar_ip, registrar_port, instance_id, deepquote):
    """Activate a registered virtual node with the registrar service.

    Sends the supplied deep quote to the registrar's /vactivate endpoint and
    logs the outcome either way.

    :param registrar_ip: registrar host address.
    :param registrar_port: registrar port.
    :param instance_id: identifier of the virtual node being activated.
    :param deepquote: deep quote payload proving the vTPM binding.
    """
    data = {
        'deepquote': deepquote,
    }
    v_json_message = json.dumps(data)
    response = tornado_requests.request("PUT",
        "http://%s:%s/v2/instances/%s/vactivate" % (registrar_ip, registrar_port, instance_id),
        data=v_json_message,
        context=None)
    if response.status_code == 200:
        logger.info("Registration activated for node %s." % instance_id)
    else:
        # Dump the full response body at ERROR for diagnosis.
        logger.error("Error: unexpected http response code from Registrar Server: " + str(response.status_code))
        common.log_http_response(logger, logging.ERROR, response.json())
def add_coloring_to_emit_ansi(fn):
    """Wrap a handler's emit function so messages are ANSI-colored by level.

    :param fn: the original emit function, called as fn(handler, record).
    :returns: a replacement emit that colors record.msg before delegating.
    """
    RED_BOLD = '\x1b[31;1m'
    RED = '\x1b[31m'
    GREEN = '\x1b[32m'
    YELLOW = '\x1b[33m'
    DEFAULT = '\x1b[0m'
    # (unused BLUE/PINK/CYAN escape constants removed)

    def new(*args):
        # args[0] is the handler instance, args[1] the LogRecord.
        levelno = args[1].levelno
        if levelno >= logging.CRITICAL:
            color = RED_BOLD
        elif levelno >= logging.ERROR:
            color = RED
        elif levelno >= logging.WARNING:
            color = YELLOW
        elif levelno >= logging.INFO:
            color = DEFAULT
        elif levelno >= logging.DEBUG:
            color = GREEN
        else:
            color = DEFAULT
        # Mutates the record: the colored text survives into later handlers.
        args[1].msg = color + str(args[1].msg) + DEFAULT
        return fn(*args)
    return new
def set_log(level, filename='jumpserver.log'):
    """Return a logger that writes LOG_DIR/filename at the named level.

    :param level: one of 'debug', 'info', 'warning', 'error', 'critical';
        unknown names fall back to DEBUG.
    :param filename: log file name, created under LOG_DIR if missing.
    :returns: the configured 'jumpserver' logger.
    """
    log_file = os.path.join(LOG_DIR, filename)
    if not os.path.isfile(log_file):
        os.mknod(log_file)
        # 0o777 replaces the Python-2-only literal 0777 (same permissions).
        os.chmod(log_file, 0o777)
    log_level_total = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARN, 'error': logging.ERROR,
                       'critical': logging.CRITICAL}
    logger_f = logging.getLogger('jumpserver')
    logger_f.setLevel(logging.DEBUG)
    fh = logging.FileHandler(log_file)
    fh.setLevel(log_level_total.get(level, logging.DEBUG))
    formatter = logging.Formatter('%(asctime)s - %(filename)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    # NOTE(review): each call attaches another FileHandler to the shared
    # 'jumpserver' logger; repeated calls will duplicate log lines.
    logger_f.addHandler(fh)
    return logger_f
def connectionSetup(self):
    """Perform the TWAMP control-channel handshake with the server.

    Reads the Server Greeting, answers with a Setup Response selecting
    unauthenticated mode, then reads the Server Start message, logging a
    critical error if the server rejected the setup.
    """
    log.info("CTRL.RX <<Server Greeting>>")
    data = self.receive()
    # Mode bitmask advertised by the server (bytes 12-15 of the greeting).
    self.smode = struct.unpack('!I', data[12:16])[0]
    log.info("TWAMP modes supported: %d", self.smode)
    if self.smode & 1 == 0:
        log.critical('*** TWAMPY only supports unauthenticated mode(1)')
    log.info("CTRL.TX <<Setup Response>>")
    # Mode 1 (unauthenticated) followed by 160 zero bytes of padding/keys.
    self.send(struct.pack('!I', 1) + zeros(160))
    log.info("CTRL.RX <<Server Start>>")
    data = self.receive()
    # Accept field of the Server Start message; non-zero means rejection.
    # NOTE(review): ord() on an indexed byte implies `data` is a Python 2
    # str; under Python 3 bytes this would raise TypeError -- confirm.
    rval = ord(data[15])
    if rval != 0:
        # TWAMP setup request not accepted by server
        log.critical("*** ERROR CODE %d in <<Server Start>>", rval)
    self.nbrSessions = 0
def get_smtp_logging_handler(self):
    """Build a buffering SMTP handler for ERROR-level email alerts.

    The handler is only constructed when the SMTP settings block exists and
    contains all four required fields, and it is only returned when the
    'ronin' setting is falsy; every other path implicitly returns None.
    """
    if self.settings.get(self.smtp_key):
        # All four SMTP fields must be present in the settings block.
        keys = ["smtp_host", "smtp_port", "smtp_user", "smtp_password"]
        setting_keys = self.settings[self.smtp_key].keys()
        missing_keys = list(filter(lambda x: x not in setting_keys, keys))
        if not missing_keys:
            # capacity=1: presumably flushes after every record -- confirm
            # against BufferingSMTPHandler's semantics.
            handler = BufferingSMTPHandler(mailhost=self.settings[self.smtp_key]['smtp_host'],
                                           mailport=self.settings[self.smtp_key]['smtp_port'],
                                           fromaddr=self.settings[self.smtp_key]['smtp_user'],
                                           toaddrs=self.settings[self.admin_emails],
                                           subject='Error {} {}:{}'.format(self.settings[self.host_name_key],
                                                                           self.settings[
                                                                               self.service_name_key].upper(),
                                                                           self.settings[self.service_version_key]),
                                           capacity=1,
                                           password=self.settings[self.smtp_key]['smtp_password'])
            handler.setLevel(logging.ERROR)
            if not self.settings[self.ronin_key]:
                return handler
def process(args, state):
    """Process the release area and then the uploads.

    Sends at most one email per run to the leads if any ERROR-level records
    were logged inside the mail_logs context.

    :param args: parsed command-line options.
    :param state: shared state carrying the email subject and package set.
    :returns: the updated package set, or None when the release-area scan
        yields nothing.
    """
    # send one email per run to leads, if any errors occurred
    with mail_logs(args.email, toaddrs=args.email, subject='%s' % (state.subject), thresholdLevel=logging.ERROR) as leads_email:
        if args.dryrun:
            logging.warning("--dry-run is in effect, nothing will really be done")
        state.packages = process_relarea(args)
        if not state.packages:
            return None
        state.packages = process_uploads(args, state)
        return state.packages
#
# remove stale packages
#
def __init__(self, fmt=None, datefmt=None):
    """Initialize the formatter and precompute per-level color templates.

    :param fmt: logging format string passed through to logging.Formatter.
    :param datefmt: date format string passed through to logging.Formatter.
    """
    logging.Formatter.__init__(self, fmt, datefmt)
    # ANSI color escape strings.
    # (unused BLUE/PURPLE/CYAN/GRAY/WHITE constants removed)
    COLOR_RED = '\033[1;31m'
    COLOR_GREEN = '\033[1;32m'
    COLOR_YELLOW = '\033[1;33m'
    COLOR_RESET = '\033[1;0m'
    # '%s' placeholders are later %-formatted with the rendered message;
    # DEBUG output stays uncolored, all error-like levels render red.
    self.LOG_COLORS = {
        'DEBUG': '%s',
        'INFO': COLOR_GREEN + '%s' + COLOR_RESET,
        'WARNING': COLOR_YELLOW + '%s' + COLOR_RESET,
        'ERROR': COLOR_RED + '%s' + COLOR_RESET,
        'CRITICAL': COLOR_RED + '%s' + COLOR_RESET,
        'EXCEPTION': COLOR_RED + '%s' + COLOR_RESET,
    }
def tolog(self, msg, level=None):
    """Log *msg* through the wrapped logger at the given level name.

    :param msg: message to record.
    :param level: level name (e.g. 'info'); defaults to the instance's
        configured ``_level``.  Mapped through ``get_map_level``.
    """
    try:
        level = level if level else self._level
        level = str(level).lower()
        level = self.get_map_level(level)
        if level == logging.DEBUG:
            self._logger.debug(msg)
        if level == logging.INFO:
            self._logger.info(msg)
        if level == logging.WARN:
            self._logger.warn(msg)
        if level == logging.ERROR:
            self._logger.error(msg)
        if level == logging.CRITICAL:
            self._logger.critical(msg)
    except Exception as expt:
        # print() replaces the Python-2-only `print expt` statement, which
        # is a syntax error on Python 3.
        print(expt)
def report_unused_values(self, logger, optional_configs=None):
    """Log every unused config key; fail if any non-ignorable config has one.

    Unused keys on configs listed in *optional_configs* log at WARNING;
    all others log at ERROR and trigger an AssertionException at the end.
    """
    optional_configs = [] if optional_configs is None else optional_configs
    found_fatal = False
    for cfg in self.iter_configs():
        unused = cfg.describe_unused_values()
        if not unused:
            continue
        optional = cfg in optional_configs
        severity = logging.WARNING if optional else logging.ERROR
        if not optional:
            found_fatal = True
        for line in unused:
            logger.log(severity, line)
    if found_fatal:
        raise AssertionException('Detected unused keys that are not ignorable.')
def logged_sum_and_product(list_of_numbers):
    """Build a graph that logs, sums and multiplies *list_of_numbers*.

    A value node feeds both a sum node and a product node through a buffer,
    with a LogPrinter attached at ERROR level.

    :returns: the assembled graph object.
    """
    v = value.Value(value=list_of_numbers)
    s = apply.Apply(function=sum)
    # Product of all elements via a left fold.
    m = apply.Apply(function=lambda c: reduce(lambda x, y: x * y, c))
    b = buffers.Buffer()
    logging.basicConfig(level=logging.ERROR)
    p = printer.LogPrinter(logger=logging.getLogger(__name__),
                           loglevel=logging.ERROR)
    g = graph.Graph('logged_sum_and_product', [v, s, m, b, p])
    # Wire the nodes together with labelled connections.
    g.connect(p, b, 'message')
    g.connect(b, s, 'sum value')
    g.connect(b, m, 'product value')
    g.connect(s, v, 'argument')
    g.connect(m, v, 'argument')
    return g
def logged_sum_and_product(list_of_numbers):
    """Build a graph that logs, sums and multiplies *list_of_numbers*.

    (Duplicate of the earlier example.)  A value node feeds a sum node and
    a product node through a buffer, with an ERROR-level LogPrinter.

    :returns: the assembled graph object.
    """
    v = value.Value(value=list_of_numbers)
    s = apply.Apply(function=sum)
    # Product of all elements via a left fold.
    m = apply.Apply(function=lambda c: reduce(lambda x, y: x * y, c))
    b = buffers.Buffer()
    logging.basicConfig(level=logging.ERROR)
    p = printer.LogPrinter(logger=logging.getLogger(__name__),
                           loglevel=logging.ERROR)
    g = graph.Graph('logged_sum_and_product', [v, s, m, b, p])
    # Wire the nodes together with labelled connections.
    g.connect(p, b, 'message')
    g.connect(b, s, 'sum value')
    g.connect(b, m, 'product value')
    g.connect(s, v, 'argument')
    g.connect(m, v, 'argument')
    return g
def init_logging(logfile):
    """Attach a DEBUG file handler and an ERROR console handler to 'ydk'."""
    logger = logging.getLogger("ydk")
    logger.setLevel(logging.DEBUG)
    # Everything goes to the log file...
    file_handler = logging.FileHandler(logfile)
    file_handler.setLevel(logging.DEBUG)
    # ...while only errors reach the console.
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.ERROR)
    for handler in (file_handler, console_handler):
        logger.addHandler(handler)
def deprecated(self, removal_version, msg, *args, **kwargs):
    """Log a deprecation notice at WARN or ERROR severity.

    WARN while ``removal_version`` is more than one minor release away,
    ERROR once removal is imminent.  ``removal_version`` names the release
    in which the feature disappears (present in 1.6, gone in 1.7 =>
    removal_version is 1.7).
    """
    from pip import __version__
    if should_warn(__version__, removal_version):
        self.warn(msg, *args, **kwargs)
        return
    self.error(msg, *args, **kwargs)
def __init__(self, appname, dllname=None, logtype="Application"):
    """Create a Windows NT event-log handler for *appname*.

    (Duplicate of the earlier handler snippet.)

    :param appname: source name to register in the Windows event log.
    :param dllname: message-definition DLL; defaults to win32service.pyd
        found relative to the pywin32 installation.
    :param logtype: which event log to write to (e.g. "Application").
    """
    logging.Handler.__init__(self)
    try:
        import win32evtlogutil, win32evtlog
        self.appname = appname
        self._welu = win32evtlogutil
        if not dllname:
            # Derive win32service.pyd's path from win32evtlogutil's location.
            dllname = os.path.split(self._welu.__file__)
            dllname = os.path.split(dllname[0])
            dllname = os.path.join(dllname[0], r'win32service.pyd')
        self.dllname = dllname
        self.logtype = logtype
        self._welu.AddSourceToRegistry(appname, dllname, logtype)
        # Default event type for levels not present in typemap.
        self.deftype = win32evtlog.EVENTLOG_ERROR_TYPE
        # python logging levels -> NT event types.
        self.typemap = {
            logging.DEBUG : win32evtlog.EVENTLOG_INFORMATION_TYPE,
            logging.INFO : win32evtlog.EVENTLOG_INFORMATION_TYPE,
            logging.WARNING : win32evtlog.EVENTLOG_WARNING_TYPE,
            logging.ERROR : win32evtlog.EVENTLOG_ERROR_TYPE,
            logging.CRITICAL: win32evtlog.EVENTLOG_ERROR_TYPE,
        }
    except ImportError:
        # pywin32 missing; _welu = None marks the handler unusable.
        print("The Python Win32 extensions for NT (service, event "\
            "logging) appear not to be available.")
        self._welu = None
def init_app(cls, app):
    """Run the base config setup, then email application errors to admins."""
    Config.init_app(app)
    # email errors to the administrators
    import logging
    from logging.handlers import SMTPHandler
    credentials = None
    secure = None
    if getattr(cls, 'MAIL_USERNAME', None) is not None:
        credentials = (cls.MAIL_USERNAME, cls.MAIL_PASSWORD)
        if getattr(cls, 'MAIL_USE_TLS', None):
            # An empty tuple enables STARTTLS with default arguments.
            secure = ()
    error_mailer = SMTPHandler(
        mailhost=(cls.MAIL_SERVER, cls.MAIL_PORT),
        fromaddr=cls.CIRCULATE_MAIL_SENDER,
        toaddrs=[cls.CIRCULATE_ADMIN],
        subject=cls.CIRCULATE_MAIL_SUBJECT_PREFIX + ' Application Error',
        credentials=credentials,
        secure=secure)
    # Only ERROR and above trigger an email.
    error_mailer.setLevel(logging.ERROR)
    app.logger.addHandler(error_mailer)
def format(self, record):
    """Render the record, colorizing the level name in debug+color mode."""
    if self.debug and self.color:
        # Map severity to an ANSI color, most severe first.  CRITICAL and
        # ERROR both render red, so the two branches are merged.
        if record.levelno >= logging.ERROR:
            color = TEXT_RED
        elif record.levelno >= logging.WARNING:
            color = TEXT_YELLOW
        elif record.levelno >= logging.INFO:
            color = TEXT_GREEN
        elif record.levelno >= logging.DEBUG:
            color = TEXT_CYAN
        else:
            color = TEXT_NORMAL
        record.levelname = "\x1b[%sm%s\x1b[%sm" % (color, record.levelname, TEXT_NORMAL)
    return logging.Formatter.format(self, record)
def update_logging_settings(self, file_path=None, level=None, format=None):
    """Update the global logger configuration.

    Arguments left as None keep their previously configured values.

    Args:
        file_path (str): log file path; initially 'log.log'.
        level (str): 'error', 'warning' or 'info'; initially 'error'.
        format (str): logging format string; initially
            '%(asctime)s %(levelname)s %(message)s'.
    """
    # Map config level names onto logging module constants.
    LOGGING_STRING_MAP = {'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR}
    if file_path is not None:
        self._logger_config['file_path'] = self._get_abs_path(file_path)
    if level is not None:
        self._logger_config['level'] = level
    if format is not None:
        self._logger_config['format'] = format
    logger = logging.getLogger(Configuration.LOGGER_NAME)
    # NOTE(review): a fresh FileHandler is added on every call without
    # removing the previous one -- repeated calls will duplicate output.
    log_file = logging.FileHandler(self._logger_config['file_path'])
    logger.addHandler(log_file)
    log_file.setFormatter(logging.Formatter(self._logger_config['format']))
    logger.setLevel(LOGGING_STRING_MAP[self._logger_config['level']])
    self._logger = logger
def get_logging_level(log_level):
    """Map a level name ('DEBUG'...'CRITICAL') to its logging constant.

    Unknown or unset names fall back to INFO with a console notice.
    """
    name_to_level = {
        'DEBUG': logging.DEBUG,
        'INFO': logging.INFO,
        'WARNING': logging.WARNING,
        'ERROR': logging.ERROR,
        'CRITICAL': logging.CRITICAL,
    }
    try:
        return name_to_level[log_level]
    except KeyError:
        print('Unknown or unset logging level. Using INFO')
        return logging.INFO
def _logWriter(self, level, message, exception=None):
    """Write *message* (plus optional exception details) at *level*.

    Adjusts the logger and both of its handlers to *level* before emitting.

    :param level: a logging level constant (DEBUG..FATAL).
    :param message: text to log.
    :param exception: optional exception whose formatted form is appended.
    """
    self._logger.setLevel(level)
    self._fh.setLevel(level)
    self._ch.setLevel(level)
    # Bug fix: msg was previously assigned only inside the exception branch,
    # so exception-less calls raised NameError at the emit below.
    msg = message
    if exception is not None:
        exFormatted = self._formatException(exception)
        msg = "%s%s" % (message, exFormatted)
    # NOTE(review): records go through the module-level `logging` functions
    # (the root logger), not self._logger -- looks unintentional, but that
    # behavior is preserved here.
    if level == logging.DEBUG:
        logging.debug(msg)
    elif level == logging.INFO:
        logging.info(msg)
    elif level == logging.WARN:
        logging.warn(msg)
    elif level == logging.FATAL:
        logging.fatal(msg)
    elif level == logging.ERROR:
        logging.error(msg)
def send_message(self, email_content):
    """Send *email_content* as a UTF-8 plain-text alert email.

    The subject is the configured header plus a [month-day H:M:S] timestamp.
    Delivery failures are logged rather than raised.
    """
    # Build a subject line carrying the current timestamp.
    now = datetime.datetime.now()
    header = self.smtp_email_header + '[' + str(now.month) + '-' + str(now.day) + ' ' + \
        str(now.hour) + ':' + str(now.minute) + ':' + str(now.second) + ']'
    msg = MIMEText(email_content, 'plain', 'utf-8')
    msg['from'] = self.smtp_from_addr
    msg['to'] = self.smtp_to_addr
    msg['Subject'] = Header(header, 'utf-8').encode()
    # Deliver via SMTP; any failure is logged, not propagated.
    try:
        smtp_server = smtplib.SMTP(self.smtp_server_host, self.smtp_server_port)
        smtp_server.login(self.smtp_from_addr, self.smtp_server_password)
        smtp_server.sendmail(self.smtp_from_addr, [self.smtp_to_addr], msg.as_string())
        smtp_server.quit()
    except Exception as e:
        if log.isEnabledFor(logging.ERROR):
            # The message text below is mojibake from the original source
            # encoding; preserved byte-for-byte since it is a runtime string.
            log.error("??????")
            log.exception(e)
def check_and_restart(self):
    """Replace any processor thread that has entered the 'error' state.

    A failed thread is removed from the pool and a fresh ProcessThread with
    the same thread id is created and started in its place.
    """
    # NOTE(review): removing from processor_list while iterating it can
    # skip the element after each removal -- confirm this is acceptable.
    for process_thread in self.processor_list:
        if process_thread.thread_status == 'error':
            thread_id = process_thread.thread_id
            # Drop the dead thread before spawning its replacement.
            self.processor_list.remove(process_thread)
            del process_thread
            new_thread = ProcessThread(thread_id, self.redis_connection, self.token_filter,
                                       self.response_buffer, self.is_parser_following_list,
                                       self.is_parser_follower_list, self.is_parser_follow_relation)
            self.processor_list.append(new_thread)
            new_thread.start()
            if log.isEnabledFor(logging.ERROR):
                # Mojibake log text preserved byte-for-byte (runtime string).
                log.error('???????[' + thread_id + ']????')
# URL ??
def run(self):
    """Validator-thread main loop: move checked proxies into the pool.

    While a pool scan is in progress the thread idles; otherwise it pulls
    one unchecked proxy at a time, validates it, and adds working proxies
    to the shared pool.  Any unexpected error marks the thread 'error'.
    """
    try:
        while True:
            # Pause while another component is scanning the pool.
            while is_scanning:
                time.sleep(3)
            if proxy_pool.qsize() < PROXY_POOL_SIZE and unchecked_proxy_list.qsize() > 0:
                unchecked_proxy = unchecked_proxy_list.get()
                is_available = self.dataValidateModule.validate_proxy_ip(unchecked_proxy)
                if is_available is True:
                    proxy_pool.put(unchecked_proxy)
                # print(unchecked_proxy)
                time.sleep(1)
            else:
                # Nothing to do: the pool is full or no candidates exist yet.
                time.sleep(5)
    except Exception as e:
        if log.isEnabledFor(logging.ERROR):
            log.exception(e)
        self.status = 'error'
def run(self):
    """Fetcher-thread main loop: keep the unchecked-proxy queue stocked.

    Fetches new candidate proxies while the pool is below capacity; once
    the pool is full it rescans the existing entries instead.  Any
    unexpected error marks the thread 'error'.
    """
    try:
        while True:
            if proxy_pool.qsize() < PROXY_POOL_SIZE and unchecked_proxy_list.qsize() < PROXY_POOL_SIZE:
                self.fetch_and_parse_proxy()
            elif proxy_pool.qsize() == PROXY_POOL_SIZE:
                if log.isEnabledFor(logging.DEBUG):
                    # Mojibake debug text preserved byte-for-byte.
                    log.debug('?????')
                # Pool is full: re-validate what we already hold.
                self.scan_proxy_pool()
                time.sleep(PROXY_POOL_SCAN_INTERVAL)
            else:
                time.sleep(60)
    except Exception as e:
        if log.isEnabledFor(logging.ERROR):
            log.exception(e)
        self.status = 'error'
def init_app(cls, app):
    """Run the base config setup, then email application errors to the admin."""
    Config.init_app(app)
    # email errors to the administrator.
    credentials = None
    secure = None
    if getattr(cls, 'MAIL_USERNAME', None) is not None:
        credentials = (cls.MAIL_USERNAME, cls.MAIL_PASSWORD)
        if getattr(cls, 'MAIL_USE_TLS', None):
            # An empty tuple enables STARTTLS with default arguments.
            secure = ()
    error_mailer = SMTPHandler(
        mailhost=(cls.MAIL_SERVER, cls.MAIL_PORT),
        fromaddr=cls.MAIL_SENDER,
        toaddrs=[cls.FLASK_ADMIN],
        subject=cls.MAIL_SUBJECT_PREFIX + ' Application Error',
        credentials=credentials,
        secure=secure)
    # Only ERROR and above trigger an email.
    error_mailer.setLevel(logging.ERROR)
    app.logger.addHandler(error_mailer)
def test_poly_bands(self):
    """Run AMSET on GaAs with one parabolic band and cross-check mobility.

    Asserts the numeric ACD mobility agrees with the value stored under the
    'SPB_ACD' key to within 1% (relative, element-wise).
    """
    print('\ntesting test_poly_bands...')
    mass = 0.25
    # One band: [k-point [0,0,0], [energy offset 0.0, effective mass]].
    # NOTE(review): exact poly_bands schema is defined by AMSET -- confirm.
    self.model_params['poly_bands'] = [[[[0.0, 0.0, 0.0], [0.0, mass]]]]
    amset = AMSET(calc_dir=self.GaAs_path, material_params=self.GaAs_params,
                  model_params=self.model_params,
                  performance_params=self.performance_params,
                  dopings=[-2e15], temperatures=[300], k_integration=True,
                  e_integration=True, fermi_type='k',
                  loglevel=logging.ERROR)
    amset.run(self.GaAs_cube, kgrid_tp='coarse', write_outputs=False)
    egrid = amset.egrid
    # Element-wise deviation between the two mobility arrays.
    diff = abs(np.array(amset.mobility['n']['ACD'][-2e15][300]) - \
               np.array(egrid['n']['mobility']['SPB_ACD'][-2e15][300]))
    avg = (amset.mobility['n']['ACD'][-2e15][300] + \
           egrid['n']['mobility']['SPB_ACD'][-2e15][300]) / 2
    self.assertTrue((diff / avg <= 0.01).all())
def __init__(self, log_level=ERROR):
    """Initialize the deployment environment."""
    # Delegate to the base amulet utilities with the requested log level.
    super(OpenStackAmuletUtils, self).__init__(log_level)
def __init__(self, log_level=logging.ERROR):
    """Initialize amulet utilities with a logger and Ubuntu release table."""
    # Logger used by the helper methods; defaults to ERROR verbosity.
    self.log = self.get_logger(level=log_level)
    # Cached list of Ubuntu release names for version comparisons.
    self.ubuntu_releases = self.get_ubuntu_releases()
def set_verbosity(verbose):
    """
    A simple function one can use to set the verbosity of
    the app.
    """
    # Default: silence every known logger down to ERROR.
    for name in (SQLALCHEMY_LOGGER, SQLALCHEMY_ENGINE, NEWSREAP_LOGGER,
                 NEWSREAP_CLI, NEWSREAP_CODEC, NEWSREAP_HOOKS,
                 NEWSREAP_ENGINE):
        logging.getLogger(name).setLevel(logging.ERROR)
    # Each additional verbosity step opens up another tier of loggers.
    if verbose > 0:
        for name in (NEWSREAP_CLI, NEWSREAP_HOOKS, NEWSREAP_ENGINE):
            logging.getLogger(name).setLevel(logging.INFO)
    if verbose > 1:
        for name in (NEWSREAP_CLI, NEWSREAP_HOOKS, NEWSREAP_ENGINE):
            logging.getLogger(name).setLevel(logging.DEBUG)
    if verbose > 2:
        logging.getLogger(SQLALCHEMY_ENGINE).setLevel(logging.INFO)
        logging.getLogger(NEWSREAP_CODEC).setLevel(logging.INFO)
    if verbose > 3:
        logging.getLogger(NEWSREAP_CODEC).setLevel(logging.DEBUG)
    if verbose > 4:
        logging.getLogger(SQLALCHEMY_ENGINE).setLevel(logging.DEBUG)
# set initial level to WARN.
def setup(app, version:str='undefined'):
    """Wire Sentry error reporting into the application.

    Builds a raven Client from the app's SENTRY_* settings (falling back to
    the ROLE environment variable for the environment tag), installs an
    ERROR-level logging handler for it, and attaches the client to the app.

    :param app: application object carrying a ``settings`` attribute.
    :param version: release identifier reported to Sentry.
    """
    environment = getattr(app.settings, 'ROLE', None) or os.environ.get('ROLE')
    dsn = getattr(app.settings, 'SENTRY_DSN', None)
    tags = getattr(app.settings, 'SENTRY_TAGS', None)
    client = Client(dsn=dsn, transport=AioHttpTransport,
                    version=version, environment=environment,
                    tags=tags)
    handler = DjaioSentryHandler(client=client)
    # Only ERROR and above are shipped to Sentry.
    handler.setLevel(logging.ERROR)
    setup_logging(handler)
    app.raven = client
def setup_logs(args):
    """
    Initialize the api loggers.
    Args:
        args: dict containing the configuration options.
    """
    # Route flask's logger creation through our own `use` factory.
    flask_logging.create_logger = lambda app: use(app.logger_name)
    if not args.get("debug", True):
        set_level("werkzeug", logging.ERROR)
    # Verbosity 0/1/2+ selects WARNING/INFO/DEBUG respectively.
    level = [logging.WARNING, logging.INFO, logging.DEBUG][
        min(args.get("verbose", 1), 2)]
    internal_error_log = ExceptionHandler()
    internal_error_log.setLevel(logging.ERROR)
    log.root.setLevel(level)
    log.root.addHandler(internal_error_log)
    if api.config.get_settings()["email"]["enable_email"]:
        # Email only the most severe problems.
        severe_error_log = SevereHandler()
        severe_error_log.setLevel(logging.CRITICAL)
        log.root.addHandler(severe_error_log)
    stats_log = StatsHandler()
    stats_log.setLevel(logging.INFO)
    log.root.addHandler(stats_log)
def ConvertLogLevel( oldstyle_level ):
    """Map the legacy numeric debug level (0-5) onto CF.LogLevels.

    Unknown values default to CF.LogLevels.INFO.
    """
    table = {
        0: CF.LogLevels.FATAL,
        1: CF.LogLevels.ERROR,
        2: CF.LogLevels.WARN,
        3: CF.LogLevels.INFO,
        4: CF.LogLevels.DEBUG,
        5: CF.LogLevels.ALL,
    }
    return table.get(oldstyle_level, CF.LogLevels.INFO)
def ConvertLevelNameToDebugLevel( level_name ):
    """Map a log level name onto the legacy numeric scale (0-5).

    OFF and FATAL share 0, TRACE and ALL share 5; unknown names yield 3
    (INFO).
    """
    numeric = {
        "OFF": 0,
        "FATAL": 0,
        "ERROR": 1,
        "WARN": 2,
        "INFO": 3,
        "DEBUG": 4,
        "TRACE": 5,
        "ALL": 5,
    }
    return numeric.get(level_name, 3)
def ConvertLevelNameToCFLevel( level_name ):
    """Map a log level name onto the corresponding CF.LogLevels value.

    Unknown names fall back to CF.LogLevels.INFO.
    """
    table = {
        "OFF": CF.LogLevels.OFF,
        "FATAL": CF.LogLevels.FATAL,
        "ERROR": CF.LogLevels.ERROR,
        "WARN": CF.LogLevels.WARN,
        "INFO": CF.LogLevels.INFO,
        "DEBUG": CF.LogLevels.DEBUG,
        "TRACE": CF.LogLevels.TRACE,
        "ALL": CF.LogLevels.ALL,
    }
    return table.get(level_name, CF.LogLevels.INFO)
def __init__(self, log_level=ERROR):
    """Initialize the deployment environment."""
    # Delegate to the base amulet utilities with the requested log level.
    # (Duplicate of the earlier OpenStackAmuletUtils snippet.)
    super(OpenStackAmuletUtils, self).__init__(log_level)