Python logging 模块,basicConfig() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用logging.basicConfig()。
def init_logger(self, args):
level = logging.INFO
if args.verbose:
level = logging.VERBOSE
if args.debug:
level = logging.DEBUG
logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s',
level=level)
Rthandler = RotatingFileHandler('arbitrage.log', maxBytes=100*1024*1024,backupCount=10)
Rthandler.setLevel(level)
formatter = logging.Formatter('%(asctime)-12s [%(levelname)s] %(message)s')
Rthandler.setFormatter(formatter)
logging.getLogger('').addHandler(Rthandler)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
def init(self):
import DataStore, readconf, logging, sys
self.conf.update({ "debug": None, "logging": None })
self.conf.update(DataStore.CONFIG_DEFAULTS)
args, argv = readconf.parse_argv(self.argv, self.conf, strict=False)
if argv and argv[0] in ('-h', '--help'):
print self.usage()
return None, []
logging.basicConfig(
stream=sys.stdout, level=logging.DEBUG, format="%(message)s")
if args.logging is not None:
import logging.config as logging_config
logging_config.dictConfig(args.logging)
store = DataStore.new(args)
return store, argv
# Abstract hex-binary conversions for eventual porting to Python 3.
def setup_logging(log_level=logging.INFO):
"""Set up the logging."""
logging.basicConfig(level=log_level)
fmt = ("%(asctime)s %(levelname)s (%(threadName)s) "
"[%(name)s] %(message)s")
colorfmt = "%(log_color)s{}%(reset)s".format(fmt)
datefmt = '%Y-%m-%d %H:%M:%S'
# Suppress overly verbose logs from libraries that aren't helpful
logging.getLogger('requests').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('aiohttp.access').setLevel(logging.WARNING)
try:
from colorlog import ColoredFormatter
logging.getLogger().handlers[0].setFormatter(ColoredFormatter(
colorfmt,
datefmt=datefmt,
reset=True,
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red',
}
))
except ImportError:
pass
logger = logging.getLogger('')
logger.setLevel(log_level)
def main():
"""Entry point for creating an application specific security group"""
logging.basicConfig(format=LOGGING_FORMAT)
log = logging.getLogger(__name__)
parser = argparse.ArgumentParser()
add_debug(parser)
add_app(parser)
add_env(parser)
add_properties(parser)
add_region(parser)
args = parser.parse_args()
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
log.debug('Parsed arguments: %s', args)
spinnakerapps = SpinnakerSecurityGroup(app=args.app, env=args.env, region=args.region, prop_path=args.properties)
spinnakerapps.create_security_group()
def __init__(self, queue, DEBUG=config.DEBUG, reset=False, socksport=None):
if not socksport:
socksport = config.SOCKS_PORT
## TODO add checks that a socks proxy is even open
## TODO add Tor checks to make sure circuits are operating
threading.Thread.__init__(self)
self.reset = reset # Whether to check if a url has been collected
self.queue = queue # Multithreading queue of urls
self.proxysettings = [
'--proxy=127.0.0.1:%s' % socksport,
'--proxy-type=socks5',
]
#self.proxysettings = [] # DEBUG
#self.ignore_ssl = ['--ignore-ssl-errors=true', '--ssl-protocols=any']
self.ignore_ssl = []
self.service_args = self.proxysettings + self.ignore_ssl
self.failcount = 0 # Counts failures
self.donecount = 0 # Counts successes
self.tor = tor.tor() # Manages Tor via control port
if DEBUG: # PhantomJS sends a lot of data if debug set to DEBUG
logging.basicConfig(level=logging.INFO)
def runspider(name):
configure_logging(install_root_handler=False)
logging.basicConfig(
filename='log/%s.log' % name,
format='%(levelname)s %(asctime)s: %(message)s',
level=logging.DEBUG
)
process = CrawlerProcess(get_project_settings())
try:
logging.info('runspider start spider:%s' % name)
process.crawl(name)
process.start()
except Exception as e:
logging.exception('runspider spider:%s exception:%s' % (name, e))
logging.debug('finish this spider:%s\n\n' % name)
def main():
args = get_args()
logging.basicConfig(
format='%(asctime)s %(message)s',
filename=os.path.join(args.outdir, "NanoQC.log"),
level=logging.INFO)
logging.info("NanoQC started.")
sizeRange = length_histogram(
fqin=gzip.open(args.fastq, 'rt'),
name=os.path.join(args.outdir, "SequenceLengthDistribution.png"))
fq = get_bin(gzip.open(args.fastq, 'rt'), sizeRange)
logging.info("Using {} reads for plotting".format(len(fq)))
fqbin = [dat[0] for dat in fq]
qualbin = [dat[1] for dat in fq]
logging.info("Creating plots...")
per_base_sequence_content_and_quality(fqbin, qualbin, args.outdir, args.format)
logging.info("per base sequence content and quality completed.")
logging.info("Finished!")
def main():
"""
Simple private telegram bot example.
"""
# Set up logging to log to stdout
import logging
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
updater = Updater(TOKEN)
dispatcher = updater.dispatcher
dispatcher.add_handler(CommandHandler("start", start_handler))
# Enable admin commands for this bot
AdminCommands(dispatcher)
updater.start_polling()
updater.idle()
def init_logging(logfile, debug=True, level=None):
"""
Simple configuration of logging.
"""
if debug:
log_level = logging.DEBUG
else:
log_level = logging.INFO
# allow user to override exact log_level
if level:
log_level = level
logging.basicConfig(level=log_level,
format='%(asctime)s %(levelname)-8s [%(name)s] %(message)s',
filename=logfile,
filemode='a')
return logging.getLogger("circus")
def main():
parser = build_cli_parser("Grab all binaries from a Cb server")
parser.add_argument('-d', '--destdir', action='store', help='Destination directory to place the events',
default=os.curdir)
# TODO: we don't have a control on the "start" value in the query yet
# parser.add_argument('--start', action='store', dest='startvalue', help='Start from result number', default=0)
parser.add_argument('-v', action='store_true', dest='verbose', help='Enable verbose debugging messages',
default=False)
args = parser.parse_args()
cb = get_cb_response_object(args)
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
# startvalue = args.startvalue
startvalue = 0
return dump_all_binaries(cb, args.destdir, startvalue)
def startRPC(self, port, eventListenerPort):
logging.basicConfig(filename='worldpay-within-wrapper.log', level=logging.DEBUG)
reqOS = ["darwin", "win32", "windows", "linux"]
reqArch = ["x64", "ia32"]
cfg = launcher.Config(reqOS, reqArch)
launcherLocal = launcher.launcher()
# define log file name for rpc agent, so e.g
# for "runConsumerOWP.py" it will be: "rpc-wpwithin-runConsumerOWP.log"
logfilename = os.path.basename(sys.argv[0])
logfilename = "rpc-wpwithin-" + logfilename.rsplit(".", 1)[0] + ".log"
args = []
if eventListenerPort > 0:
logging.debug(str(os.getcwd()) + "" + "-port " + str(port) + " -logfile " + logfilename + " -loglevel debug,warn,error,fatal,info" + " -callbackport " + str(eventListenerPort))
args = ['-port', str(port), '-logfile', logfilename, '-loglevel', 'debug,warn,error,fatal,info', '-callbackport', str(eventListenerPort)]
else:
logging.debug(str(os.getcwd()) + "" + "-port " + str(port) + " -logfile " + logfilename + " -loglevel debug,warn,error,fatal,info")
args = ['-port', str(port), '-logfile', logfilename, '-loglevel', 'debug,warn,error,fatal,info']
process = launcherLocal.launch(cfg, os.getcwd() + "", args)
return process
def __init__(self, session, api_key, service_key):
"""
:param session: The Flask requests object used to connect to PD
:param api_key: The PD read-only, V2 API key
:param service_key: The PD service name which is interrogated
"""
self._api_key = api_key
self._service_key = service_key
self.timezone = 'UTC'
logging.basicConfig(level=logging.INFO)
self._s = session
self._headers = {
'Accept': 'application/vnd.pagerduty+json;version=2',
'Authorization': 'Token token=' + self._api_key
}
self._s.headers.update(self._headers)
def main():
"""Run newer stuffs."""
logging.basicConfig(format=LOGGING_FORMAT)
log = logging.getLogger(__name__)
parser = argparse.ArgumentParser()
add_debug(parser)
add_app(parser)
add_env(parser)
add_region(parser)
add_properties(parser)
parser.add_argument("--elb-subnet", help="Subnetnet type, e.g. external, internal", required=True)
args = parser.parse_args()
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
log.debug('Parsed arguments: %s', args)
spinnakerapps = SpinnakerDns(
app=args.app, env=args.env, region=args.region, prop_path=args.properties, elb_subnet=args.elb_subnet)
spinnakerapps.create_elb_dns()
def main():
"""Destroy any DNS related resources of an application
Records in any Hosted Zone for an Environment will be deleted.
"""
logging.basicConfig(format=LOGGING_FORMAT)
parser = argparse.ArgumentParser(description=main.__doc__)
add_debug(parser)
add_app(parser)
add_env(parser)
args = parser.parse_args()
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
assert destroy_dns(**vars(args))
def main():
"""Command to create IAM Instance Profiles, Roles, Users, and Groups.
IAM Roles will retain any attached Managed Policies. Inline Policies that do
not match the name *iam-project_repo_policy* will also be left untouched.
**WARNING**: Inline Policies named *iam-project_repo_policy* will be
rewritten.
"""
logging.basicConfig(format=LOGGING_FORMAT)
parser = argparse.ArgumentParser(description=main.__doc__)
add_debug(parser)
add_app(parser)
add_env(parser)
args = parser.parse_args()
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
assert create_iam_resources(**args.__dict__)
def main():
"""Append Application Configurations to a given file in multiple formats."""
logging.basicConfig(format=LOGGING_FORMAT)
parser = argparse.ArgumentParser(description=main.__doc__)
add_debug(parser)
parser.add_argument('-o', '--output', required=True, help='Name of environment file to append to')
parser.add_argument(
'-g', '--git-short', metavar='GROUP/PROJECT', required=True, help='Short name for Git, e.g. forrest/core')
parser.add_argument('-r', '--runway-dir', help='Runway directory with app.json files, requires --git-short')
args = parser.parse_args()
LOG.setLevel(args.debug)
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
generated = gogoutils.Generator(*gogoutils.Parser(args.git_short).parse_url(), formats=APP_FORMATS)
git_short = generated.gitlab()['main']
if args.runway_dir:
configs = process_runway_configs(runway_dir=args.runway_dir)
else:
configs = process_git_configs(git_short=git_short)
write_variables(app_configs=configs, out_file=args.output, git_short=git_short)
def main():
"""CLI entrypoint for scaling policy creation"""
logging.basicConfig(format=LOGGING_FORMAT)
log = logging.getLogger(__name__)
parser = argparse.ArgumentParser()
add_debug(parser)
add_app(parser)
add_properties(parser)
add_env(parser)
add_region(parser)
args = parser.parse_args()
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
log.debug('Parsed arguments: %s', args)
asgpolicy = AutoScalingPolicy(app=args.app, prop_path=args.properties, env=args.env, region=args.region)
asgpolicy.create_policy()
def main():
"""Entry point for ELB creation"""
logging.basicConfig(format=LOGGING_FORMAT)
parser = argparse.ArgumentParser(description='Example with non-optional arguments')
add_debug(parser)
add_app(parser)
add_env(parser)
add_region(parser)
add_properties(parser)
args = parser.parse_args()
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
elb = SpinnakerELB(app=args.app, env=args.env, region=args.region, prop_path=args.properties)
elb.create_elb()
def main():
"""Send Slack notification to a configured channel."""
logging.basicConfig(format=LOGGING_FORMAT)
log = logging.getLogger(__name__)
parser = argparse.ArgumentParser()
add_debug(parser)
add_app(parser)
add_env(parser)
add_properties(parser)
args = parser.parse_args()
logging.getLogger(__package__.split(".")[0]).setLevel(args.debug)
log.debug('Parsed arguements: %s', args)
if "prod" not in args.env:
log.info('No slack message sent, not a production environment')
else:
log.info("Sending slack message, production environment")
slacknotify = SlackNotification(app=args.app, env=args.env, prop_path=args.properties)
slacknotify.post_message()
def main():
"""Create Lambda events."""
logging.basicConfig(format=LOGGING_FORMAT)
log = logging.getLogger(__name__)
parser = argparse.ArgumentParser(description=main.__doc__)
add_debug(parser)
add_app(parser)
add_env(parser)
add_properties(parser)
add_region(parser)
args = parser.parse_args()
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
log.debug('Parsed arguments: %s', args)
lambda_function = LambdaFunction(app=args.app, env=args.env, region=args.region, prop_path=args.properties)
lambda_function.create_lambda_function()
lambda_event = LambdaEvent(app=args.app, env=args.env, region=args.region, prop_path=args.properties)
lambda_event.create_lambda_events()
def main():
"""Create any API Gateway event related resources."""
logging.basicConfig(format=LOGGING_FORMAT)
parser = argparse.ArgumentParser(description=main.__doc__)
add_debug(parser)
add_app(parser)
add_env(parser)
add_region(parser)
add_properties(parser)
args = parser.parse_args()
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
apigateway = APIGateway(**vars(args))
apigateway.setup_lambda_api()
def __init__(self, args, logger=None, mode=None):
self.commandLine = CommandLine(args)
if mode is not None:
self.commandLine.mode = mode
if logger is None:
logging.basicConfig(format="%(asctime)-15s %(levelname)s [%(filename)s:%(lineno)d-%(thread)d] %(message)s")
logger = logging.getLogger()
logger.setLevel(logging.INFO)
self.logger = logger
self.mode = self.__detectMode(self.commandLine.mode)
self.config = self.__loadConfig()
self.inventory = Inventory(logger, self.config)
self.commands = {
'migrate': MigrateCommand(self),
'list-migrations': ListMigrationsCommand(self),
'version': VersionCommand(self),
'help':HelpCommand(self) }
self.defaultCommand = HelpCommand(self)
def main():
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
logging.info("LEDBAT TEST SINK starting")
loop = asyncio.get_event_loop()
listen = loop.create_datagram_endpoint(PeerProtocol, local_addr=("0.0.0.0", 6778))
transport, protocol = loop.run_until_complete(listen)
if os.name == 'nt':
def wakeup():
# Call again later
loop.call_later(0.5, wakeup)
loop.call_later(0.5, wakeup)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
def main(args):
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
logging.info("LEDBAT TEST SOURCE starting. Target: {}".format(args.target_ip))
loop = asyncio.get_event_loop()
listen = loop.create_datagram_endpoint(lambda: PeerProtocol(args), local_addr=("0.0.0.0", 6778))
transport, protocol = loop.run_until_complete(listen)
if os.name == 'nt':
def wakeup():
# Call again later
loop.call_later(0.5, wakeup)
loop.call_later(0.5, wakeup)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
def setup(args, pipeline, runmod, injector):
"""Load configuration"""
logging.basicConfig(
format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
_globals['gransk'] = gransk.api.API(injector)
_globals['config'] = _globals['gransk'].config
if pipeline:
_globals['gransk'].pipeline = pipeline
if _globals['gransk'].pipeline.get_service('related_entities'):
_globals['gransk'].pipeline.get_service('related_entities').load_all(_globals['config'])
if _globals['gransk'].pipeline.get_service('related_documents'):
_globals['gransk'].pipeline.get_service('related_documents').load_all(_globals['config'])
def __init__(self, file_prefix):
'''
Events files have a name of the form
'/some/file/path/events.out.tfevents.[timestamp].[hostname]'
'''
self._file_prefix = file_prefix + ".out.tfevents." \
+ str(time.time())[:10] + "." + socket.gethostname()
# Open(Create) the log file with the particular form of name.
logging.basicConfig(filename=self._file_prefix)
self._num_outstanding_events = 0
self._py_recordio_writer = RecordWriter(self._file_prefix)
# Initialize an event instance.
self._event = event_pb2.Event()
self._event.wall_time = time.time()
self.write_event(self._event)
def main():
print("see log scrape.log")
if os.path.isfile("scrape.log"):
os.remove("scrape.log")
log.basicConfig(filename="scrape.log",
format='%(asctime)s %(levelname)s %(message)s',
level=log.DEBUG)
try:
log.debug("main() full scrape will take 5-10 minutes")
cards, tokens = loadJsonCards()
saveCardsAsJson("data/cards.json", loadSets(allcards=cards))
# a lot of token names are not unique
# a static, handmade list of ids is more reliable
if os.path.isfile('data/tokenlist.json'):
with open('data/tokenlist.json', 'r', encoding='utf8') as f:
saveCardsAsJson("data/tokens.json", loadTokens(tokens, json.load(f)))
except Exception as e:
log.exception("main() error %s", e)
def __init__(self, debug=False):
"""
Constructor of the Application.
:param debug: Sets the logging level of the application
:raises NotImplementedError: When ``Application.base_title``
not set in the class definition.
"""
self.debug = debug
loglevel = logging.DEBUG if debug else logging.WARNING
logging.basicConfig(
format='%(asctime)s - [%(levelname)s] %(message)s', datefmt='%I:%M:%S %p', level=loglevel)
self.processor = EventProcessor()
self.server = EventServer(processor=self.processor)
if self.base_title is None:
raise NotImplementedError
self.services = {}
self.views = {}
self.current_view = None
self.register('init', lambda evt,
interface: self._load_view('default'))
def parse_args():
""" Parse the command line arguments """
parser = argparse.ArgumentParser(
description="Integrate Hugo and PhotoSwipe")
parser.add_argument('-v', '--verbose', help="Verbose mode",
action="store_const", dest="loglevel", const=logging.INFO,
default=logging.WARNING)
parser.add_argument('-f', '--fast', action="store_true", help=('Fast mode '
'(tries less potential crops)'))
parser.add_argument('command', choices=['new', 'update', 'clean', 'init'],
help="action to do")
parser.add_argument('album', nargs='?',
help="album to apply the action to")
args = parser.parse_args()
logging.basicConfig(level=args.loglevel, datefmt="[%Y-%m-%d %H:%M:%S]",
format="%(asctime)s - %(message)s")
settings.verbose = args.loglevel == logging.INFO
settings.fast = args.fast
return args.command, args.album
def create_logger():
#logging.basicConfig(format='%(levelname)s - %(message)s')
logging.basicConfig(format='%(message)s')
root = logging.getLogger()
root.setLevel(logging.getLevelName('INFO'))
#Add handler for standard output (console) any debug+
#ch = logging.StreamHandler(sys.stdout)
#ch.setLevel(logging.getLevelName('DEBUG'))
#formatter = logging.Formatter('%(message)s')
#ch.setFormatter(formatter)
#handler = ColorStreamHandler()
#handler.setLevel(logging.getLevelName("DEBUG"))
#root.addHandler(handler)
return root
def main(argv=None):
args = parse_arguments(argv)
if args['very_verbose']:
logging.basicConfig(level=logging.DEBUG)
elif args['verbose']:
logging.basicConfig(level=logging.INFO)
else:
logging.basicConfig()
del args['verbose']
del args['very_verbose']
sc = SparkContext(appName="MLR: data collection pipeline")
# spark info logging is incredibly spammy. Use warn to have some hope of
# human decipherable output
sc.setLogLevel('WARN')
sqlContext = HiveContext(sc)
run_pipeline(sc, sqlContext, **args)
def main():
"""Entry point"""
logging.basicConfig()
odl = ODLTests()
parser = ODLParser()
args = parser.parse_args(sys.argv[1:])
try:
result = odl.run_suites(ODLTests.default_suites, **args)
if result != robotframework.RobotFramework.EX_OK:
return result
if args['pushtodb']:
return odl.push_to_db()
else:
return result
except Exception: # pylint: disable=broad-except
return robotframework.RobotFramework.EX_RUN_ERROR
def main():
args = parse_args()
logging.basicConfig(level=(logging.WARN if args.quiet else logging.INFO))
# Don't allow more than 10 concurrent requests to the wayback machine
concurrency = min(args.concurrency, 10)
# Scrape results are stored in a temporary folder if no folder specified
target_folder = args.target_folder if args.target_folder else tempfile.gettempdir()
logger.info('Writing scrape results in the folder {target_folder}'.format(target_folder=target_folder))
# Parse the period entered by the user (throws an exception if the dates are not correctly formatted)
from_date = datetime.strptime(args.from_date, CLI_DATE_FORMAT)
to_date = datetime.strptime(args.to_date, CLI_DATE_FORMAT)
# The scraper downloads the elements matching the given xpath expression in the target folder
scraper = Scraper(target_folder, args.xpath)
# Launch the scraping using the scraper previously instantiated
scrape_archives(args.website_url, scraper.scrape, from_date, to_date, args.user_agent, timedelta(days=args.delta),
concurrency)
def log_to_console(level=logging.WARNING, override_root_logger=False, **kwargs):
"""
Configure the logging system to send log entries to the console.
Note that the root logger will not log to Seq by default.
:param level: The minimum level at which to log.
:param override_root_logger: Override the root logger, too?
Note - this might cause problems if third-party components try to be clever
when using the logging.XXX functions.
"""
logging.setLoggerClass(StructuredLogger)
if override_root_logger:
_override_root_logger()
logging.basicConfig(
style='{',
handlers=[
ConsoleStructuredLogHandler()
],
level=level,
**kwargs
)
def run(self, result=None):
logger = logging.getLogger()
if not logger.handlers:
logging.basicConfig()
handler = logger.handlers[0]
if (len(logger.handlers) > 1 or
not isinstance(handler, logging.StreamHandler)):
# Logging has been configured in a way we don't recognize,
# so just leave it alone.
super(LogTrapTestCase, self).run(result)
return
old_stream = handler.stream
try:
handler.stream = StringIO()
gen_log.info("RUNNING TEST: " + str(self))
old_error_count = len(result.failures) + len(result.errors)
super(LogTrapTestCase, self).run(result)
new_error_count = len(result.failures) + len(result.errors)
if new_error_count != old_error_count:
old_stream.write(handler.stream.getvalue())
finally:
handler.stream = old_stream
def run(self, result=None):
logger = logging.getLogger()
if not logger.handlers:
logging.basicConfig()
handler = logger.handlers[0]
if (len(logger.handlers) > 1 or
not isinstance(handler, logging.StreamHandler)):
# Logging has been configured in a way we don't recognize,
# so just leave it alone.
super(LogTrapTestCase, self).run(result)
return
old_stream = handler.stream
try:
handler.stream = StringIO()
gen_log.info("RUNNING TEST: " + str(self))
old_error_count = len(result.failures) + len(result.errors)
super(LogTrapTestCase, self).run(result)
new_error_count = len(result.failures) + len(result.errors)
if new_error_count != old_error_count:
old_stream.write(handler.stream.getvalue())
finally:
handler.stream = old_stream
def run(self, result=None):
logger = logging.getLogger()
if not logger.handlers:
logging.basicConfig()
handler = logger.handlers[0]
if (len(logger.handlers) > 1 or
not isinstance(handler, logging.StreamHandler)):
# Logging has been configured in a way we don't recognize,
# so just leave it alone.
super(LogTrapTestCase, self).run(result)
return
old_stream = handler.stream
try:
handler.stream = StringIO()
gen_log.info("RUNNING TEST: " + str(self))
old_error_count = len(result.failures) + len(result.errors)
super(LogTrapTestCase, self).run(result)
new_error_count = len(result.failures) + len(result.errors)
if new_error_count != old_error_count:
old_stream.write(handler.stream.getvalue())
finally:
handler.stream = old_stream
def __init__(self):
date_time_name = datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S")
logging.basicConfig(filename=date_time_name + '.log', level=logging.INFO)
path = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
self.config = configparser.ConfigParser()
self.config.read(os.path.join(path, "configuration.txt"))
self.sleep_time = int(self.config.get("settings", "time_between_retweets"))
self.search_term = self.config.get("settings", "search_query")
self.tweet_language = self.config.get("settings", "tweet_language")
self.max_age_in_minutes = int(self.config.get("settings", "max_age_in_minutes"))
self.last_id_file = self.build_save_point()
self.savepoint = self.retrieve_save_point(self.last_id_file)
auth = tweepy.OAuthHandler(self.config.get("twitter", "consumer_key"), self.config.
get("twitter", "consumer_secret"))
auth.set_access_token(self.config.get("twitter", "access_token"), self.config.
get("twitter", "access_token_secret"))
self.api = tweepy.API(auth)
def setup_logging(
default_path='logging.ini',
default_level=logging.INFO,
env_key='LOG_CFG'
):
"""Setup logging configuration
"""
path = default_path
value = os.getenv(env_key, None)
if value:
path = value
if os.path.exists(path):
logging.config.fileConfig(default_path)
else:
logging.basicConfig(level=default_level)
def setup_custom_logger(name, testname):
log_format = "%(asctime)s [%(levelname)s]: >> %(message)s"
logging.basicConfig(format=log_format, level=logging.INFO)
logger = logging.getLogger(name)
for h in list(logger.handlers):
logger.removeHandler(h)
consoleHandler = logging.StreamHandler()
logFormatter = logging.Formatter(log_format)
logFormatter._fmt = testname + " -- " + logFormatter._fmt
consoleHandler.setFormatter(logFormatter)
logger.addHandler(consoleHandler)
logging.getLogger(name).addHandler(consoleHandler)
logger.propagate = False
return logger
def setup_logging(debug=False, os_info=True):
if os.environ.get("LOLVRSPECTATE_DEBUG") == "1":
debug = True
if not debug:
format_ = '%(asctime)-15s || %(message)s'
logging.basicConfig(filename="LoLVRSpectate.log", format=format_, level=logging.INFO, filemode="w")
logging.getLogger().addHandler(logging.StreamHandler()) # Log both to file and console
else:
logging.basicConfig(level=logging.DEBUG)
if os_info:
logging.info("Win platform = {}".format(platform.platform()))
if 'PROGRAMFILES(X86)' in os.environ:
logging.info("System Arch = {}".format("64 bit"))
else:
logging.info("System Arch = {}".format("32 bit"))
logging.info("Python version = {}".format(sys.version))
logging.info("VorpX exclusion = {}".format(is_excluded()))
def logged_sum_and_product(list_of_numbers):
v = value.Value(value=list_of_numbers)
s = apply.Apply(function=sum)
m = apply.Apply(function=lambda c: reduce(lambda x, y: x * y, c))
b = buffers.Buffer()
logging.basicConfig(level=logging.ERROR)
p = printer.LogPrinter(logger=logging.getLogger(__name__),
loglevel=logging.ERROR)
g = graph.Graph('logged_sum_and_product', [v, s, m, b, p])
g.connect(p, b, 'message')
g.connect(b, s, 'sum value')
g.connect(b, m, 'product value')
g.connect(s, v, 'argument')
g.connect(m, v, 'argument')
return g
def logged_sum_and_product(list_of_numbers):
v = value.Value(value=list_of_numbers)
s = apply.Apply(function=sum)
m = apply.Apply(function=lambda c: reduce(lambda x, y: x * y, c))
b = buffers.Buffer()
logging.basicConfig(level=logging.ERROR)
p = printer.LogPrinter(logger=logging.getLogger(__name__),
loglevel=logging.ERROR)
g = graph.Graph('logged_sum_and_product', [v, s, m, b, p])
g.connect(p, b, 'message')
g.connect(b, s, 'sum value')
g.connect(b, m, 'product value')
g.connect(s, v, 'argument')
g.connect(m, v, 'argument')
return g
def test_dispatch_should_trigger_intent_and_default():
logging.basicConfig(level=logging.DEBUG)
bot = MockBot()
intent_slots = fixture_intent_slots()
state = IntentState(bot, intent_slots)
dispatcher = DefaultDispatcher(bot, state)
dispatcher.dispatch({'content': '/intent credentials', 'channel': 'fakechannel'}, None)
dispatcher.dispatch({'content': 'my token', 'channel': 'fakechannel'}, None)
dispatcher.dispatch({'content': 'my secret token', 'channel': 'fakechannel'}, None)
dispatcher.dispatch({'content': 'hello', 'channel': 'fakechannel'}, None)
assert len(bot.executed) == 2
executed = bot.executed.pop(0)
assert executed == Executed('set_credentials', ('my token', 'my secret token'))
executed = bot.executed.pop(0)
assert executed == Executed('on_default', ({'content': 'hello', 'channel': 'fakechannel'},
None))
def set_logging(args):
'''
Sets up logging capability
:param args: argparse.Namespace
:return: None
'''
if args.log:
print("Logging Enabled: " + str(args.log))
logging.basicConfig(filename="OBCI.log",
format='%(asctime)s - %(levelname)s : %(message)s',
level=logging.DEBUG)
logging.getLogger('yapsy').setLevel(logging.DEBUG)
logging.info('---------LOG START-------------')
logging.info(args)
else:
print("main.py: Logging Disabled.")
def run():
'''
????
'''
reload(sys)
sys.setdefaultencoding('utf8')
program = os.path.basename(sys.argv[0])
logger = logging.getLogger(program)
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(message)s')
logging.root.setLevel(level=logging.INFO)
logger.info("running %s" % ' '.join(sys.argv))
outp1 = r'wiki_model'
outp2 = r'vector.txt'
model = Word2Vec(sentences, size=400, window=5, min_count=5, workers=multiprocessing.cpu_count())
model.save(outp1)
model.wv.save_word2vec_format(outp2, binary=False)
testData = ['??','??','??','??']
for i in testData:
temp = model.most_similar(i)
for j in temp:
print '%f %s'%(j[1],j[0])
print ''
def run(infile, outfile, time_interval, quiet):
logging.basicConfig(level=logging.WARN if quiet else logging.INFO)
logger = logging.getLogger(__name__)
logger.info('loading input file %s ...' % infile)
with open(infile) as fin:
# Do not use click.File because we want close the file asap
data = json.load(fin)
n = len(data)
logger.info(
'loading input file %s done. %d data found.'% (infile, n))
for i in xrange(len(data)):
logger.info('Sleeping for %d sec [%d/%d] ...' % (time_interval, i+1, n))
time.sleep(time_interval)
with open(outfile, 'w') as fout:
json.dump(data[:(i+1)], fout)
logger.info('Dumped %dth/%d data to %s' % (i+1, n, outfile))
def main():
parser = argparse.ArgumentParser(
description="Backup system/data using dar and par2")
parser.add_argument("-c", "--config", dest="config", required=True,
help="configuration file for dar and archive. " +
"NOTE: the backup archive will be placed under " +
"the same directory as this configuration file")
parser.add_argument("-n", "--dry-run", dest="dry_run", action="store_true",
help="dry run, do not perform any action")
parser.add_argument("-v", "--verbose", dest="verbose", action="store_true",
help="show verbose information")
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.INFO)
settings = DarSettings(args.config, verbose=args.verbose,
dry_run=args.dry_run)
dar = DarBackup(settings)
dar.run(dry_run=args.dry_run)
def main():
parser = argparse.ArgumentParser(
description="Backup files preserving metadata")
parser.add_argument("-n", "--dry-run", dest="dryrun", action="store_true",
help="dry run, do not perform actual action")
parser.add_argument("-q", "--quiet", dest="quiet", action="store_true",
help="be quiet")
parser.add_argument("-d", "--debug", dest="debug", action="store_true",
help="show verbose debug information")
parser.add_argument("config", help="configuration file")
args = parser.parse_args()
if args.quiet and not args.dryrun:
logging.basicConfig(level=logging.WARNING)
if args.debug:
logging.basicConfig(level=logging.DEBUG)
now = datetime.now()
logger.info("=== %s @ %s ===" % (" ".join(sys.argv), now.isoformat()))
if args.dryrun:
logger.info("*** DRY RUN ***")
backup = Backup(args.config, dryrun=args.dryrun, debug=args.debug)
backup.backup()
backup.cleanup()
logger.info("=== Backup Finished! @ %s ===" % datetime.now().isoformat())