Python logging 模块,INFO 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用logging.INFO。
def create_logger():
    """
    Configure the root logger and hand back a module-level logger.

    Colour output is used only when colorlog is importable (HAVE_COLORLOG)
    and fd 2 (stderr) is attached to a terminal.
    """
    root = logging.getLogger()  # root logger
    root.setLevel(logging.INFO)
    fmt = '%(asctime)s - %(levelname)-8s - %(message)s'
    datefmt = '%Y-%m-%d %H:%M:%S'
    if HAVE_COLORLOG and os.isatty(2):
        formatter = colorlog.ColoredFormatter(
            '%(log_color)s' + fmt,
            datefmt,
            log_colors={
                'DEBUG': 'reset',
                'INFO': 'reset',
                'WARNING': 'bold_yellow',
                'ERROR': 'bold_red',
                'CRITICAL': 'bold_red',
            })
    else:
        formatter = logging.Formatter(fmt, datefmt)
    console = logging.StreamHandler()
    console.setFormatter(formatter)
    root.addHandler(console)
    return logging.getLogger(__name__)
def init_logger(self, args):
    """Configure console + rotating-file logging for the arbitrage bot.

    Level selection: INFO by default, a "verbose" level when args.verbose
    is set, DEBUG when args.debug is set (debug wins over verbose).

    :param args: parsed CLI namespace with boolean ``verbose``/``debug``.
    """
    level = logging.INFO
    if args.verbose:
        # BUG FIX: the stdlib logging module has no VERBOSE attribute, so
        # the original `logging.VERBOSE` raised AttributeError.  Use a
        # registered custom level when the application defines one,
        # otherwise 15 (between DEBUG=10 and INFO=20).
        level = getattr(logging, 'VERBOSE', 15)
    if args.debug:
        level = logging.DEBUG
    logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s',
                        level=level)
    # Rotating file log: 100 MB per file, 10 backups kept.
    file_handler = RotatingFileHandler('arbitrage.log',
                                       maxBytes=100 * 1024 * 1024,
                                       backupCount=10)
    file_handler.setLevel(level)
    file_handler.setFormatter(
        logging.Formatter('%(asctime)-12s [%(levelname)s] %(message)s'))
    logging.getLogger('').addHandler(file_handler)
    # Silence chatty HTTP client libraries.
    logging.getLogger("requests").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)
def setup_logging(log_level=logging.INFO):
    """Set up the logging."""
    logging.basicConfig(level=log_level)
    base_fmt = ("%(asctime)s %(levelname)s (%(threadName)s) "
                "[%(name)s] %(message)s")
    # Suppress overly verbose logs from libraries that aren't helpful
    for noisy in ('requests', 'urllib3', 'aiohttp.access'):
        logging.getLogger(noisy).setLevel(logging.WARNING)
    # Colourise the handler installed by basicConfig when colorlog exists.
    try:
        from colorlog import ColoredFormatter
    except ImportError:
        pass
    else:
        logging.getLogger().handlers[0].setFormatter(ColoredFormatter(
            "%(log_color)s{}%(reset)s".format(base_fmt),
            datefmt='%Y-%m-%d %H:%M:%S',
            reset=True,
            log_colors={
                'DEBUG': 'cyan',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'red',
            }
        ))
    logging.getLogger('').setLevel(log_level)
def setup_logging(verbose=0, colors=False, name=None):
    """Configure console logging. Info and below go to stdout, others go to stderr.
    :param int verbose: Verbosity level. > 0 print debug statements. > 1 passed to sphinx-build.
    :param bool colors: Print color text in non-verbose mode.
    :param str name: Which logger name to set handlers to. Used for testing.
    """
    target = logging.getLogger(name)
    target.setLevel(logging.INFO if verbose <= 0 else logging.DEBUG)
    formatter = ColorFormatter(verbose > 0, colors)
    if colors:
        colorclass.Windows.enable()

    class _InfoAndBelow(logging.Filter):
        # Keep stdout limited to records at INFO or lower severity.
        @staticmethod
        def filter(record):
            return record.levelno <= logging.INFO

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(formatter)
    stdout_handler.setLevel(logging.DEBUG)
    stdout_handler.addFilter(_InfoAndBelow())
    target.addHandler(stdout_handler)

    # Warnings and errors are routed to stderr instead.
    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setFormatter(formatter)
    stderr_handler.setLevel(logging.WARNING)
    target.addHandler(stderr_handler)
def __init__(self, queue, DEBUG=config.DEBUG, reset=False, socksport=None):
    """Worker thread that fetches queued URLs through a Tor SOCKS5 proxy.

    :param queue: multithreading queue of URLs to crawl.
    :param DEBUG: when truthy, raise logging to INFO (PhantomJS is chatty).
    :param reset: whether to re-check URLs that were already collected.
    :param socksport: local SOCKS port; falls back to config.SOCKS_PORT.
    """
    if not socksport:
        socksport = config.SOCKS_PORT
    ## TODO add checks that a socks proxy is even open
    ## TODO add Tor checks to make sure circuits are operating
    threading.Thread.__init__(self)
    self.reset = reset  # Whether to check if a url has been collected
    self.queue = queue  # Multithreading queue of urls
    # PhantomJS command-line switches routing traffic via the local Tor proxy.
    self.proxysettings = [
        '--proxy=127.0.0.1:%s' % socksport,
        '--proxy-type=socks5',
    ]
    #self.proxysettings = [] # DEBUG
    #self.ignore_ssl = ['--ignore-ssl-errors=true', '--ssl-protocols=any']
    self.ignore_ssl = []
    self.service_args = self.proxysettings + self.ignore_ssl
    self.failcount = 0  # Counts failures
    self.donecount = 0  # Counts successes
    self.tor = tor.tor()  # Manages Tor via control port
    if DEBUG:  # PhantomJS sends a lot of data if debug set to DEBUG
        logging.basicConfig(level=logging.INFO)
def init_logging(logfile):
    """Attach an INFO-level file handler to the root logger.

    :param logfile: path of the log file to append to.
    :return: the ``logging`` module itself (callers log through it directly).
    """
    file_handler = logging.FileHandler(logfile)
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s [%(levelname)s] %(module)s: %(message)s',
        datefmt='%m/%d/%Y %H:%M:%S'))
    root = logging.getLogger()
    root.addHandler(file_handler)
    root.setLevel(logging.INFO)
    return logging
# prepare logging.
def init_logger(logger_name):
    """Create a named logger writing to the configured service log file.

    Level is DEBUG when cfg.CONF.service.enable_debug_log_entries is set,
    INFO otherwise.

    :param logger_name: name for the logger.
    :return: the configured logger.
    """
    log = logging.getLogger(logger_name)
    handler = logging.FileHandler('%s/%s' % (
        cfg.CONF.service.service_log_path,
        cfg.CONF.service.service_log_filename))
    handler.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
                                           "%(lineno)s - %(levelname)s"
                                           " - %(message)s'"))
    log.addHandler(handler)
    if cfg.CONF.service.enable_debug_log_entries:
        log.setLevel(logging.DEBUG)
    else:
        log.setLevel(logging.INFO)
    return log
def hidden_cursor(file):
    """Generator that hides the terminal cursor while the body runs and
    restores it afterwards (used as a context manager; the decorator is
    outside this view).

    No-ops on Windows and when output is not a TTY or logging is quieter
    than INFO (e.g. --quiet).
    """
    # The Windows terminal does not support the hide/show cursor ANSI codes,
    # even via colorama. So don't even try.
    if WINDOWS:
        yield
    # We don't want to clutter the output with control characters if we're
    # writing to a file, or if the user is running with --quiet.
    # See https://github.com/pypa/pip/issues/3418
    elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
        yield
    else:
        file.write(HIDE_CURSOR)
        try:
            yield
        finally:
            # Always restore the cursor, even if the body raised.
            file.write(SHOW_CURSOR)
def open_spinner(message):
    """Generator yielding a progress spinner for *message* (used as a
    context manager; decorator outside this view).

    The spinner is finished with "canceled" on KeyboardInterrupt, "error"
    on any other exception, and "done" on normal exit.
    """
    # Interactive spinner goes directly to sys.stdout rather than being routed
    # through the logging system, but it acts like it has level INFO,
    # i.e. it's only displayed if we're at level INFO or better.
    # Non-interactive spinner goes through the logging system, so it is always
    # in sync with logging configuration.
    if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
        spinner = InteractiveSpinner(message)
    else:
        spinner = NonInteractiveSpinner(message)
    try:
        with hidden_cursor(sys.stdout):
            yield spinner
    except KeyboardInterrupt:
        spinner.finish("canceled")
        raise
    except Exception:
        spinner.finish("error")
        raise
    else:
        spinner.finish("done")
def main():
    """NanoQC entry point: log to <outdir>/NanoQC.log and plot per-base
    sequence content and quality from a gzipped fastq file."""
    args = get_args()
    logging.basicConfig(
        format='%(asctime)s %(message)s',
        filename=os.path.join(args.outdir, "NanoQC.log"),
        level=logging.INFO)
    logging.info("NanoQC started.")
    # Histogram of read lengths; also yields the size range used for binning.
    sizeRange = length_histogram(
        fqin=gzip.open(args.fastq, 'rt'),
        name=os.path.join(args.outdir, "SequenceLengthDistribution.png"))
    fq = get_bin(gzip.open(args.fastq, 'rt'), sizeRange)
    logging.info("Using {} reads for plotting".format(len(fq)))
    fqbin = [dat[0] for dat in fq]    # first element of each pair: sequence data
    qualbin = [dat[1] for dat in fq]  # second element: quality data
    logging.info("Creating plots...")
    per_base_sequence_content_and_quality(fqbin, qualbin, args.outdir, args.format)
    logging.info("per base sequence content and quality completed.")
    logging.info("Finished!")
def init_logging(logfile, debug=True, level=None):
    """
    Simple configuration of logging.

    An explicit *level* overrides the *debug* flag; otherwise DEBUG is
    used when debug is truthy, INFO when it is not.
    """
    if level:
        chosen = level
    elif debug:
        chosen = logging.DEBUG
    else:
        chosen = logging.INFO
    logging.basicConfig(
        level=chosen,
        format='%(asctime)s %(levelname)-8s [%(name)s] %(message)s',
        filename=logfile,
        filemode='a')
    return logging.getLogger("circus")
def hidden_cursor(file):
    """Generator that hides the terminal cursor while the body runs and
    restores it afterwards (used as a context manager; the decorator is
    outside this view).

    No-ops on Windows and when output is not a TTY or logging is quieter
    than INFO (e.g. --quiet).
    """
    # The Windows terminal does not support the hide/show cursor ANSI codes,
    # even via colorama. So don't even try.
    if WINDOWS:
        yield
    # We don't want to clutter the output with control characters if we're
    # writing to a file, or if the user is running with --quiet.
    # See https://github.com/pypa/pip/issues/3418
    elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
        yield
    else:
        file.write(HIDE_CURSOR)
        try:
            yield
        finally:
            # Always restore the cursor, even if the body raised.
            file.write(SHOW_CURSOR)
def open_spinner(message):
    """Generator yielding a progress spinner for *message* (used as a
    context manager; decorator outside this view).

    The spinner is finished with "canceled" on KeyboardInterrupt, "error"
    on any other exception, and "done" on normal exit.
    """
    # Interactive spinner goes directly to sys.stdout rather than being routed
    # through the logging system, but it acts like it has level INFO,
    # i.e. it's only displayed if we're at level INFO or better.
    # Non-interactive spinner goes through the logging system, so it is always
    # in sync with logging configuration.
    if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
        spinner = InteractiveSpinner(message)
    else:
        spinner = NonInteractiveSpinner(message)
    try:
        with hidden_cursor(sys.stdout):
            yield spinner
    except KeyboardInterrupt:
        spinner.finish("canceled")
        raise
    except Exception:
        spinner.finish("error")
        raise
    else:
        spinner.finish("done")
def main():
    """Parse --domainname/-v options and launch the Qt browse window with
    console logging at INFO."""
    # Set up a console logger.
    console = logging.StreamHandler()
    formatter = logging.Formatter("%(asctime)s %(name)-12s:%(levelname)-8s: %(message)s")
    console.setFormatter(formatter)
    logging.getLogger().addHandler(console)
    logging.getLogger().setLevel(logging.INFO)
    kw = {}  # keyword arguments forwarded to BrowseWindow
    longopts = ['domainname=', 'verbose']
    opts, args = getopt.getopt(sys.argv[1:], 'v', longopts)
    for opt, val in opts:
        if opt == '--domainname':
            kw['domainName'] = val
        if opt in ['-v', '--verbose']:
            kw['verbose'] = True
    a = QApplication(sys.argv)
    # Old-style Qt signal: quit the app when the last window closes.
    QObject.connect(a, SIGNAL("lastWindowClosed()"), a, SLOT("quit()"))
    w = BrowseWindow(**kw)
    w.show()
    a.exec_()
def ConvertLog4ToCFLevel(log4level):
    """Map a python-logging level to the equivalent CF.LogLevels value.

    Unrecognised levels fall through to CF.LogLevels.INFO.  The ordered
    chain also keeps `logging.TRACE` (a non-stdlib extension) from being
    touched unless no standard level matched first.
    """
    if log4level == logging.FATAL + 1:
        return CF.LogLevels.OFF
    elif log4level == logging.FATAL:
        return CF.LogLevels.FATAL
    elif log4level == logging.ERROR:
        return CF.LogLevels.ERROR
    elif log4level == logging.WARN:
        return CF.LogLevels.WARN
    elif log4level == logging.INFO:
        return CF.LogLevels.INFO
    elif log4level == logging.DEBUG:
        return CF.LogLevels.DEBUG
    elif log4level == logging.TRACE:
        return CF.LogLevels.TRACE
    elif log4level == logging.NOTSET:
        return CF.LogLevels.ALL
    return CF.LogLevels.INFO
def ConvertToLog4Level(newLevel):
    """Map a CF.LogLevels value to a python-logging level.

    Defaults to logging.INFO for unrecognised values.  Both TRACE and ALL
    map to `logging.TRACE` (a non-stdlib extension assumed to be defined
    by this module elsewhere).
    """
    if newLevel == CF.LogLevels.OFF:
        return logging.FATAL + 1
    elif newLevel == CF.LogLevels.FATAL:
        return logging.FATAL
    elif newLevel == CF.LogLevels.ERROR:
        return logging.ERROR
    elif newLevel == CF.LogLevels.WARN:
        return logging.WARN
    elif newLevel == CF.LogLevels.INFO:
        return logging.INFO
    elif newLevel == CF.LogLevels.DEBUG:
        return logging.DEBUG
    elif newLevel == CF.LogLevels.TRACE:
        return logging.TRACE
    elif newLevel == CF.LogLevels.ALL:
        return logging.TRACE
    return logging.INFO
def __init__(self, resource=None ):
self._mgr_lock = threading.Lock()
self._ecm = None
self._logger = logging.getLogger("ossie.events.Manager")
self._logger.setLevel(logging.INFO)
self._allow = True
self._registrations=[]
if resource :
try:
self._logger.debug("Requesting Domain Manager Access....")
dom = resource.getDomainManager()
self._logger.debug("Requesting EventChannelManager Access....")
self._ecm = dom.getRef()._get_eventChannelMgr()
self._logger.debug("Acquired reference to EventChannelManager")
except:
#print traceback.format_exc()
self._logger.warn("EventChannelManager - unable to resolve DomainManager's EventChannelManager ")
pass
def run(self, args=None, namespace=None):
    """Parse CLI options, adjust logging verbosity, and dispatch to the
    selected handler (or self.default_handler).

    :param args: argument list forwarded to argparse.
    :param namespace: optional namespace forwarded to argparse.
    :return: whatever the chosen handler returns.
    """
    options = self.parser.parse_args(args=args, namespace=namespace)
    enable_pretty_logging()
    logger = logging.getLogger(__name__)
    # todo configure_logger() method ?
    if options.debug:
        logging.getLogger('root').setLevel(logging.INFO)
    if options.verbose:
        if options.verbose >= 1:
            logging.getLogger('root').setLevel(logging.DEBUG)
        if options.verbose >= 2:
            # NOTE(review): inside this branch verbose >= 2, so the
            # `verbose < 2` test is always False and DEBUG is always used.
            logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO if options.verbose < 2 else logging.DEBUG)
    try:
        handler = options.handler
    except AttributeError as e:
        # No subcommand handler bound; fall back to the default handler
        # if it is callable, otherwise re-raise.
        if not callable(self.default_handler):
            raise
        handler = None
    return (handler or self.default_handler)(logger, options)
def setup(name=__name__, level=logging.INFO):
    """Create (or fetch) a logger with a single stream handler attached.

    Idempotent: if the logger already has handlers it is returned as-is.

    :param name: logger name.
    :param level: logging level to apply.  BUG FIX: previously a second
        ``setLevel(logging.INFO)`` after the handler was attached silently
        overrode this parameter; the redundant call is removed.
    """
    logger = logging.getLogger(name)
    if logger.handlers:
        return logger
    logger.setLevel(level)
    try:
        # check if click exists to swap the logger
        import click  # noqa
        formatter = ColorFormatter('[.] %(message)s')
    except ImportError:
        formatter = CustomFormatter('[.] %(message)s')
    handler = logging.StreamHandler(None)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
def __init__(self, appname, dllname=None, logtype="Application"):
    """NT event-log handler: registers *appname* as an event source and
    maps logging levels to Windows event types.

    Degrades to a no-op handler (``self._welu = None``) when the pywin32
    extensions are not importable.

    :param appname: event-source name registered in the registry.
    :param dllname: message-resource DLL; defaults to win32service.pyd
        located relative to the pywin32 installation.
    :param logtype: event-log category ("Application" by default).
    """
    logging.Handler.__init__(self)
    try:
        import win32evtlogutil, win32evtlog
        self.appname = appname
        self._welu = win32evtlogutil
        if not dllname:
            # Walk two directories up from win32evtlogutil and use the
            # win32service.pyd found there as the message resource file.
            dllname = os.path.split(self._welu.__file__)
            dllname = os.path.split(dllname[0])
            dllname = os.path.join(dllname[0], r'win32service.pyd')
        self.dllname = dllname
        self.logtype = logtype
        self._welu.AddSourceToRegistry(appname, dllname, logtype)
        self.deftype = win32evtlog.EVENTLOG_ERROR_TYPE
        # logging level -> NT event type (DEBUG/INFO -> information,
        # WARNING -> warning, ERROR/CRITICAL -> error).
        self.typemap = {
            logging.DEBUG: win32evtlog.EVENTLOG_INFORMATION_TYPE,
            logging.INFO: win32evtlog.EVENTLOG_INFORMATION_TYPE,
            logging.WARNING: win32evtlog.EVENTLOG_WARNING_TYPE,
            logging.ERROR: win32evtlog.EVENTLOG_ERROR_TYPE,
            logging.CRITICAL: win32evtlog.EVENTLOG_ERROR_TYPE,
        }
    except ImportError:
        print("The Python Win32 extensions for NT (service, event "
              "logging) appear not to be available.")
        self._welu = None
def main():
    """Download all binaries from a Cb Response server into --destdir.

    Returns the result of dump_all_binaries.
    """
    parser = build_cli_parser("Grab all binaries from a Cb server")
    parser.add_argument('-d', '--destdir', action='store', help='Destination directory to place the events',
                        default=os.curdir)
    # TODO: we don't have a control on the "start" value in the query yet
    # parser.add_argument('--start', action='store', dest='startvalue', help='Start from result number', default=0)
    parser.add_argument('-v', action='store_true', dest='verbose', help='Enable verbose debugging messages',
                        default=False)
    args = parser.parse_args()
    cb = get_cb_response_object(args)
    # -v raises logging from INFO to DEBUG.
    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    # startvalue = args.startvalue
    startvalue = 0
    return dump_all_binaries(cb, args.destdir, startvalue)
def __init__(self, session, api_key, service_key):
"""
:param session: The Flask requests object used to connect to PD
:param api_key: The PD read-only, V2 API key
:param service_key: The PD service name which is interrogated
"""
self._api_key = api_key
self._service_key = service_key
self.timezone = 'UTC'
logging.basicConfig(level=logging.INFO)
self._s = session
self._headers = {
'Accept': 'application/vnd.pagerduty+json;version=2',
'Authorization': 'Token token=' + self._api_key
}
self._s.headers.update(self._headers)
def instantiate(p):
    """FreeRADIUS module entry point: build the midnight-rotating file
    logger under the global lock and emit an instantiation marker.

    :param p: module parameters passed in by FreeRADIUS (only printed).
    :return: 0 for success (-1 would signal failure to FreeRADIUS).
    """
    print("*** instantiate ***")
    print(p)
    with rlock:
        global logger
        logger = logging.getLogger("freepydius-logger")
        logger.setLevel(logging.INFO)
        # Rotate the log file at midnight, keeping one file per day.
        handler = TimedRotatingFileHandler(_LOG_FILE,
                                           when="midnight",
                                           interval=1)
        formatter = logging.Formatter("%(asctime)s %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        log = Log("INSTANCE")
        log.log((('Response', 'created'),))
    # return 0 for success or -1 for failure
    return 0
def __init__(self, apply_light_policy_interval=10, device_detection_interval=10, device_offline_delay=10, logging_level=logging.INFO):
    """Light-controller service state.

    :param apply_light_policy_interval: seconds between policy applications.
    :param device_detection_interval: seconds between device detection scans.
    :param device_offline_delay: grace period (seconds) before a device is
        treated as offline.
    :param logging_level: level handed to the private log setup.
    """
    # Worker threads (created lazily elsewhere).
    self.__yeelight_detection_thread = None
    self.__device_detection_thread = None
    self.__device_detection_thread_woker = {}
    self.__device_detection_thread_rlock = threading.Lock()
    self.__thread_rlock = threading.Lock()
    self.__apply_light_policy_thread = None
    # Policy / device bookkeeping.
    self.__current_geo = None
    self.__compiled_policy = []
    self.__compiled_policy_date = None
    self.__device_on_monitor = []
    self.__device_online = []
    self.__device_detection_interval = device_detection_interval
    self.__apply_light_policy_interval = apply_light_policy_interval
    self.__device_offline_delay = device_offline_delay
    self.__config = {}
    self.__RUNNING = False
    # a few setups
    self.register_signal_handler()
    self.__setup_log(logging_level=logging_level)
    self.__logger.info("Controller instance created")
def cli(ctx, registry, build_container_image, build_container_tag, build_container_net, verbose):
    """
    Easily dockerize your Git repository
    """
    # --verbose switches the skipper logger from INFO to DEBUG.
    logging_level = logging.DEBUG if verbose else logging.INFO
    utils.configure_logging(name='skipper', level=logging_level)
    ctx.obj['registry'] = registry
    ctx.obj['build_container_image'] = build_container_image
    ctx.obj['build_container_net'] = build_container_net
    # The literal tag 'git:revision' means: tag with the current git hash.
    ctx.obj['git_revision'] = build_container_tag == 'git:revision'
    ctx.obj['build_container_tag'] = git.get_hash() if ctx.obj['git_revision'] else build_container_tag
    # Remaining settings come from the click default map (config file).
    ctx.obj['env'] = ctx.default_map.get('env', {})
    ctx.obj['containers'] = ctx.default_map.get('containers')
    ctx.obj['volumes'] = ctx.default_map.get('volumes')
    ctx.obj['workdir'] = ctx.default_map.get('workdir')
    ctx.obj['container_context'] = ctx.default_map.get('container_context')
def __init__(self, args, logger=None, mode=None):
    """Application wiring: parse the command line, set up logging, load
    config and inventory, and register the available commands.

    :param args: raw command-line arguments.
    :param logger: optional preconfigured logger; a root INFO logger is
        configured when omitted.
    :param mode: optional override for the mode parsed from the command line.
    """
    self.commandLine = CommandLine(args)
    if mode is not None:
        self.commandLine.mode = mode
    if logger is None:
        logging.basicConfig(format="%(asctime)-15s %(levelname)s [%(filename)s:%(lineno)d-%(thread)d] %(message)s")
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)
    self.logger = logger
    self.mode = self.__detectMode(self.commandLine.mode)
    self.config = self.__loadConfig()
    self.inventory = Inventory(logger, self.config)
    # Command registry; 'help' doubles as the fallback command.
    self.commands = {
        'migrate': MigrateCommand(self),
        'list-migrations': ListMigrationsCommand(self),
        'version': VersionCommand(self),
        'help': HelpCommand(self)}
    self.defaultCommand = HelpCommand(self)
def log(self, message, level=logging.DEBUG, depth=0):
    """Prepend string to log messages to denote class."""
    # depth <= 0 gets the class tag; deeper calls are tab-indented instead.
    if depth <= 0:
        prefix = 'AmazonAccountUtils: '
    else:
        prefix = '\t' * depth
    text = prefix + str(message)
    if level == CRITICAL:
        self.logger.critical(text)
    elif level == ERROR:
        self.logger.error(text)
    elif level == WARNING:
        self.logger.warning(text)
    elif level == INFO:
        self.logger.info(text)
    else:
        # Anything unrecognised (including the default) logs at DEBUG.
        self.logger.debug(text)
def test_missing_variable(self):
    """Test if ``WriteTensorBoard`` handles missing image variables as expected."""
    bad_epoch_data = {'valid': {}}  # epoch data lacking the 'plot' variable
    with mock.patch.dict('sys.modules', **{'cv2': cv2_mock}):
        # test ignore: the missing variable is skipped without logging
        hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), image_variables=['plot'],
                                on_missing_variable='ignore')
        with LogCapture(level=logging.INFO) as log_capture:
            hook.after_epoch(42, bad_epoch_data)
        log_capture.check()  # nothing at INFO or above was logged
        # test warn: a single WARNING record is emitted
        warn_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), image_variables=['plot'],
                                     on_missing_variable='warn')
        with LogCapture(level=logging.INFO) as log_capture2:
            warn_hook.after_epoch(42, bad_epoch_data)
        log_capture2.check(('root', 'WARNING', '`plot` not found in epoch data.'))
        # test error: the missing variable raises KeyError
        raise_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), image_variables=['plot'],
                                      on_missing_variable='error')
        with self.assertRaises(KeyError):
            raise_hook.after_epoch(42, bad_epoch_data)
def main(args=None):
    """pyftmerge entry point: merge the given fonts into 'merged.ttf'.

    :param args: argument list; defaults to sys.argv[1:].
    :return: 1 on usage error, otherwise None.
    """
    from fontTools import configLogger
    if args is None:
        args = sys.argv[1:]
    options = Options()
    args = options.parse_opts(args)
    if len(args) < 1:
        print("usage: pyftmerge font...", file=sys.stderr)
        return 1
    # Verbose mode raises log verbosity from WARNING to INFO.
    configLogger(level=logging.INFO if options.verbose else logging.WARNING)
    if options.timing:
        timer.logger.setLevel(logging.DEBUG)
    else:
        timer.logger.disabled = True
    merger = Merger(options=options)
    font = merger.merge(args)
    outfile = 'merged.ttf'
    with timer("compile and save font"):
        font.save(outfile)
def main():
    """Run the LEDBAT test sink: listen for UDP datagrams on 0.0.0.0:6778
    until interrupted."""
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
    logging.info("LEDBAT TEST SINK starting")
    loop = asyncio.get_event_loop()
    listen = loop.create_datagram_endpoint(PeerProtocol, local_addr=("0.0.0.0", 6778))
    transport, protocol = loop.run_until_complete(listen)
    if os.name == 'nt':
        # Windows: schedule a periodic no-op so KeyboardInterrupt gets a
        # chance to be delivered to the event loop.
        def wakeup():
            # Call again later
            loop.call_later(0.5, wakeup)
        loop.call_later(0.5, wakeup)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
def main(args):
    """Run the LEDBAT test source against args.target_ip, sending from UDP
    port 6778 until interrupted."""
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
    logging.info("LEDBAT TEST SOURCE starting. Target: {}".format(args.target_ip))
    loop = asyncio.get_event_loop()
    listen = loop.create_datagram_endpoint(lambda: PeerProtocol(args), local_addr=("0.0.0.0", 6778))
    transport, protocol = loop.run_until_complete(listen)
    if os.name == 'nt':
        # Windows: schedule a periodic no-op so KeyboardInterrupt gets a
        # chance to be delivered to the event loop.
        def wakeup():
            # Call again later
            loop.call_later(0.5, wakeup)
        loop.call_later(0.5, wakeup)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
def setup(args, pipeline, runmod, injector):
    """Load configuration"""
    logging.basicConfig(
        format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S')
    # Module-level globals hold the API instance and its config.
    _globals['gransk'] = gransk.api.API(injector)
    _globals['config'] = _globals['gransk'].config
    if pipeline:
        _globals['gransk'].pipeline = pipeline
    # Preload relation caches when the corresponding services are registered.
    if _globals['gransk'].pipeline.get_service('related_entities'):
        _globals['gransk'].pipeline.get_service('related_entities').load_all(_globals['config'])
    if _globals['gransk'].pipeline.get_service('related_documents'):
        _globals['gransk'].pipeline.get_service('related_documents').load_all(_globals['config'])
def __init__(self, debug=False, logfile=None):
    """VirtualBMC logger: attach a file (or stream) handler at DEBUG/INFO.

    Permission errors (EACCES) opening the log file are deliberately
    ignored; other IOErrors now propagate — previously they were also
    silently swallowed even though only EACCES was being checked.

    :param debug: when true, log at DEBUG instead of INFO.
    :param logfile: optional path; stderr streaming is used when omitted.
    """
    logging.Logger.__init__(self, 'VirtualBMC')
    try:
        if logfile is not None:
            self.handler = logging.FileHandler(logfile)
        else:
            self.handler = logging.StreamHandler()
        formatter = logging.Formatter(DEFAULT_LOG_FORMAT)
        self.handler.setFormatter(formatter)
        self.addHandler(self.handler)
        if debug:
            self.setLevel(logging.DEBUG)
        else:
            self.setLevel(logging.INFO)
    # BUG FIX: `except IOError, e` is Python-2-only syntax (SyntaxError
    # under Python 3); modernised to `as e`.
    except IOError as e:
        if e.errno != errno.EACCES:
            raise
def configure_logging(debug):
    '''Sets the data kennel logger to appropriate levels of chattiness.'''
    root = logging.getLogger('')
    datadog = logging.getLogger('datadog.api')
    requests_log = logging.getLogger('requests')
    # debug mode makes everything one notch louder.
    if debug:
        level_map = ((root, logging.DEBUG),
                     (datadog, logging.INFO),
                     (requests_log, logging.INFO))
    else:
        level_map = ((root, logging.INFO),
                     (datadog, logging.WARNING),
                     (requests_log, logging.WARNING))
    for target, lvl in level_map:
        target.setLevel(lvl)
    # Single stdout handler; filtering happens at the logger level above.
    handler = logging.StreamHandler(sys.__stdout__)
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s'))
    root.addHandler(handler)
def hidden_cursor(file):
    """Generator that hides the terminal cursor while the body runs and
    restores it afterwards (used as a context manager; the decorator is
    outside this view).

    No-ops on Windows and when output is not a TTY or logging is quieter
    than INFO (e.g. --quiet).
    """
    # The Windows terminal does not support the hide/show cursor ANSI codes,
    # even via colorama. So don't even try.
    if WINDOWS:
        yield
    # We don't want to clutter the output with control characters if we're
    # writing to a file, or if the user is running with --quiet.
    # See https://github.com/pypa/pip/issues/3418
    elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
        yield
    else:
        file.write(HIDE_CURSOR)
        try:
            yield
        finally:
            # Always restore the cursor, even if the body raised.
            file.write(SHOW_CURSOR)
def open_spinner(message):
    """Generator yielding a progress spinner for *message* (used as a
    context manager; decorator outside this view).

    The spinner is finished with "canceled" on KeyboardInterrupt, "error"
    on any other exception, and "done" on normal exit.
    """
    # Interactive spinner goes directly to sys.stdout rather than being routed
    # through the logging system, but it acts like it has level INFO,
    # i.e. it's only displayed if we're at level INFO or better.
    # Non-interactive spinner goes through the logging system, so it is always
    # in sync with logging configuration.
    if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
        spinner = InteractiveSpinner(message)
    else:
        spinner = NonInteractiveSpinner(message)
    try:
        with hidden_cursor(sys.stdout):
            yield spinner
    except KeyboardInterrupt:
        spinner.finish("canceled")
        raise
    except Exception:
        spinner.finish("error")
        raise
    else:
        spinner.finish("done")
def parse_args():
    """ Parse the command line arguments """
    parser = argparse.ArgumentParser(
        description="Integrate Hugo and PhotoSwipe")
    parser.add_argument(
        '-v', '--verbose',
        help="Verbose mode",
        action="store_const",
        dest="loglevel",
        const=logging.INFO,
        default=logging.WARNING,
    )
    parser.add_argument(
        '-f', '--fast',
        action="store_true",
        help='Fast mode (tries less potential crops)',
    )
    parser.add_argument(
        'command',
        choices=['new', 'update', 'clean', 'init'],
        help="action to do",
    )
    parser.add_argument(
        'album',
        nargs='?',
        help="album to apply the action to",
    )
    parsed = parser.parse_args()
    logging.basicConfig(
        level=parsed.loglevel,
        datefmt="[%Y-%m-%d %H:%M:%S]",
        format="%(asctime)s - %(message)s",
    )
    # Propagate CLI switches to the shared settings object.
    settings.verbose = parsed.loglevel == logging.INFO
    settings.fast = parsed.fast
    return parsed.command, parsed.album
def getLogger(self):
    ''' Initialize and load log handlers '''
    logger = logging.getLogger(self.proc_name)
    logger.setLevel(logging.INFO)
    # Config may raise the level to DEBUG.
    if "debug" in self.config['logging']:
        if self.config['logging']['debug']:
            logger.setLevel(logging.DEBUG)
    # Load and add a handler for each logging mechanism
    for loghandler in self.config['logging']['plugins'].keys():
        # NOTE(review): __import__ with level=-1 is Python-2-only
        # (relative-then-absolute search); Python 3 would raise ValueError.
        plugin = __import__("plugins.logging." + loghandler, globals(),
                            locals(), ['Logger'], -1)
        lh = plugin.Logger(config=self.config, proc_name=self.proc_name)
        logger.addHandler(lh.setup())
    return logger
def main(argv=None):
    """Entry point for the MLR data-collection Spark pipeline.

    :param argv: argument list; None lets argparse read sys.argv.
    """
    args = parse_arguments(argv)
    # -vv -> DEBUG, -v -> INFO, default -> basicConfig's WARNING default.
    if args['very_verbose']:
        logging.basicConfig(level=logging.DEBUG)
    elif args['verbose']:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig()
    # Verbosity flags are consumed here, not passed to the pipeline.
    del args['verbose']
    del args['very_verbose']
    sc = SparkContext(appName="MLR: data collection pipeline")
    # spark info logging is incredibly spammy. Use warn to have some hope of
    # human decipherable output
    sc.setLogLevel('WARN')
    sqlContext = HiveContext(sc)
    run_pipeline(sc, sqlContext, **args)
def parse_arguments(argv):
    """Parse CLI arguments into a plain dict (suitable for **kwargs).

    :param argv: argument list, or None to read sys.argv.
    :return: dict mapping argument dest names to parsed values.
    """
    parser = argparse.ArgumentParser(description='...')
    parser.add_argument(
        '-b', '--brokers', dest='brokers', required=True, type=lambda x: x.split(','),
        help='Kafka brokers to bootstrap from as a comma separated list of <host>:<port>')
    parser.add_argument(
        '-m', '--max-request-size', dest='max_request_size', type=int, default=4*1024*1024*10,
        # Fixed user-facing typo ("requets") and the missing separator
        # before "Defaults to 40MB." in the concatenated help string.
        help='Max size of requests sent to the kafka broker. '
        + 'Defaults to 40MB.')
    parser.add_argument(
        '-w', '--num-workers', dest='n_workers', type=int, default=5,
        help='Number of workers to issue elasticsearch queries in parallel. '
        + 'Defaults to 5.')
    parser.add_argument(
        '-v', '--verbose', dest='verbose', default=False, action='store_true',
        help='Increase logging to INFO')
    parser.add_argument(
        '-vv', '--very-verbose', dest='very_verbose', default=False, action='store_true',
        help='Increase logging to DEBUG')
    args = parser.parse_args(argv)
    return dict(vars(args))
def __init__(self, ip_address, user, password=None, key_filename=None):
    """SSH client wrapper for a VNF host.

    :param ip_address: host to connect to.
    :param user: ssh login name.
    :param password: optional password authentication.
    :param key_filename: optional private-key authentication.
    """
    self.ip_address = ip_address
    self.user = user
    self.password = password
    self.key_filename = key_filename
    self.connected = False
    self.shell = None
    self.logger.setLevel(logging.INFO)
    self.ssh = paramiko.SSHClient()
    # Auto-accept unknown host keys (test-environment convenience).
    self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    self.util = Utilvnf()
    # Read the ssh receive-buffer size from the shared test-env config.
    with open(self.util.test_env_config_yaml) as file_fd:
        test_env_config_yaml = yaml.safe_load(file_fd)
    file_fd.close()
    self.ssh_revieve_buff = test_env_config_yaml.get("general").get(
        "ssh_receive_buffer")
def filter(self, rec):
    """
    filter a record, adding the colors automatically

    * error: red
    * warning: yellow

    :param rec: message to record
    """
    rec.zone = rec.module
    # INFO and above always pass through unchanged.
    if rec.levelno >= logging.INFO:
        return True
    # Below INFO, a message may carry an embedded zone prefix; split it
    # out so it can be matched against the enabled zones (module globals
    # re_log / zones / verbose — defined outside this view).
    m = re_log.match(rec.msg)
    if m:
        rec.zone = m.group(1)
        rec.msg = m.group(2)
    if zones:
        return getattr(rec, 'zone', '') in zones or '*' in zones
    elif not verbose > 2:
        return False
    return True
def filter(self, rec):
    """
    filter a record, adding the colors automatically

    * error: red
    * warning: yellow

    :param rec: message to record
    """
    rec.zone = rec.module
    # INFO and above always pass through unchanged.
    if rec.levelno >= logging.INFO:
        return True
    # Below INFO, a message may carry an embedded zone prefix; split it
    # out so it can be matched against the enabled zones (module globals
    # re_log / zones / verbose — defined outside this view).
    m = re_log.match(rec.msg)
    if m:
        rec.zone = m.group(1)
        rec.msg = m.group(2)
    if zones:
        return getattr(rec, 'zone', '') in zones or '*' in zones
    elif not verbose > 2:
        return False
    return True
def filter(self, rec):
    """
    filter a record, adding the colors automatically

    * error: red
    * warning: yellow

    :param rec: message to record
    """
    rec.zone = rec.module
    # INFO and above always pass through unchanged.
    if rec.levelno >= logging.INFO:
        return True
    # Below INFO, a message may carry an embedded zone prefix; split it
    # out so it can be matched against the enabled zones (module globals
    # re_log / zones / verbose — defined outside this view).
    m = re_log.match(rec.msg)
    if m:
        rec.zone = m.group(1)
        rec.msg = m.group(2)
    if zones:
        return getattr(rec, 'zone', '') in zones or '*' in zones
    elif not verbose > 2:
        return False
    return True
def hidden_cursor(file):
    """Generator that hides the terminal cursor while the body runs and
    restores it afterwards (used as a context manager; the decorator is
    outside this view).

    No-ops on Windows and when output is not a TTY or logging is quieter
    than INFO (e.g. --quiet).
    """
    # The Windows terminal does not support the hide/show cursor ANSI codes,
    # even via colorama. So don't even try.
    if WINDOWS:
        yield
    # We don't want to clutter the output with control characters if we're
    # writing to a file, or if the user is running with --quiet.
    # See https://github.com/pypa/pip/issues/3418
    elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
        yield
    else:
        file.write(HIDE_CURSOR)
        try:
            yield
        finally:
            # Always restore the cursor, even if the body raised.
            file.write(SHOW_CURSOR)
def open_spinner(message):
    """Generator yielding a progress spinner for *message* (used as a
    context manager; decorator outside this view).

    The spinner is finished with "canceled" on KeyboardInterrupt, "error"
    on any other exception, and "done" on normal exit.
    """
    # Interactive spinner goes directly to sys.stdout rather than being routed
    # through the logging system, but it acts like it has level INFO,
    # i.e. it's only displayed if we're at level INFO or better.
    # Non-interactive spinner goes through the logging system, so it is always
    # in sync with logging configuration.
    if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
        spinner = InteractiveSpinner(message)
    else:
        spinner = NonInteractiveSpinner(message)
    try:
        with hidden_cursor(sys.stdout):
            yield spinner
    except KeyboardInterrupt:
        spinner.finish("canceled")
        raise
    except Exception:
        spinner.finish("error")
        raise
    else:
        spinner.finish("done")
def main():
    """Scrape a website's wayback-machine archives over the CLI-supplied
    date range, saving matched elements into the target folder."""
    args = parse_args()
    logging.basicConfig(level=(logging.WARN if args.quiet else logging.INFO))
    # Don't allow more than 10 concurrent requests to the wayback machine
    concurrency = min(args.concurrency, 10)
    # Scrape results are stored in a temporary folder if no folder specified
    target_folder = args.target_folder if args.target_folder else tempfile.gettempdir()
    logger.info('Writing scrape results in the folder {target_folder}'.format(target_folder=target_folder))
    # Parse the period entered by the user (throws an exception if the dates are not correctly formatted)
    from_date = datetime.strptime(args.from_date, CLI_DATE_FORMAT)
    to_date = datetime.strptime(args.to_date, CLI_DATE_FORMAT)
    # The scraper downloads the elements matching the given xpath expression in the target folder
    scraper = Scraper(target_folder, args.xpath)
    # Launch the scraping using the scraper previously instantiated
    scrape_archives(args.website_url, scraper.scrape, from_date, to_date, args.user_agent, timedelta(days=args.delta),
                    concurrency)
def init_logging(logfile):
    """Attach an INFO-level file handler to the root logger.

    :param logfile: path of the log file to append to.
    :return: the ``logging`` module itself (callers log through it directly).
    """
    file_handler = logging.FileHandler(logfile)
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s [%(levelname)s] %(module)s: %(message)s',
        datefmt='%m/%d/%Y %H:%M:%S'))
    root = logging.getLogger()
    root.addHandler(file_handler)
    root.setLevel(logging.INFO)
    return logging
# prepare logging.
def init_logging(logfile):
    """Attach an INFO-level file handler to the root logger.

    :param logfile: path of the log file to append to.
    :return: the ``logging`` module itself (callers log through it directly).
    """
    file_handler = logging.FileHandler(logfile)
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s [%(levelname)s] %(module)s: %(message)s',
        datefmt='%m/%d/%Y %H:%M:%S'))
    root = logging.getLogger()
    root.addHandler(file_handler)
    root.setLevel(logging.INFO)
    return logging
# prepare logging.
def init_logging(logfile):
    """Attach an INFO-level file handler to the root logger.

    :param logfile: path of the log file to append to.
    :return: the ``logging`` module itself (callers log through it directly).
    """
    file_handler = logging.FileHandler(logfile)
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s [%(levelname)s] %(module)s: %(message)s',
        datefmt='%m/%d/%Y %H:%M:%S'))
    root = logging.getLogger()
    root.addHandler(file_handler)
    root.setLevel(logging.INFO)
    return logging
# prepare logging.