Python logbook 模块,StreamHandler() 实例源码

我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用logbook.StreamHandler()

项目:nflpool    作者:prcutler    | 项目源码 | 文件源码
def global_init(log_level_text, filename):
        level = LogService.__get_logbook_logging_level(log_level_text)

        if not filename:
            logbook.StreamHandler(sys.stdout, level=level).push_application()
        else:
            logbook.TimedRotatingFileHandler(
                filename, level=level,
                date_format="%Y-%m-%d").push_application()

        msg = 'Logging initialized, level: {}, mode: {}'.format(
            log_level_text,
            "stdout mode" if not filename else 'file mode: ' + filename
        )

        LogService.get_startup_log().notice(msg)
项目:cookiecutter-pyramid-talk-python-starter    作者:mikeckennedy    | 项目源码 | 文件源码
def global_init(log_level_text, filename):
        level = LogService.__get_logbook_logging_level(log_level_text)

        if not filename:
            logbook.StreamHandler(sys.stdout, level=level).push_application()
        else:
            logbook.TimedRotatingFileHandler(
                filename, level=level,
                date_format="%Y-%m-%d").push_application()

        msg = 'Logging initialized, level: {}, mode: {}'.format(
            log_level_text,
            "stdout mode" if not filename else 'file mode: ' + filename
        )

        LogService.get_startup_log().notice(msg)
项目:cookiecutter-course    作者:mikeckennedy    | 项目源码 | 文件源码
def global_init(log_level_text, filename):
        level = LogService.__get_logbook_logging_level(log_level_text)

        if not filename:
            logbook.StreamHandler(sys.stdout, level=level).push_application()
        else:
            logbook.TimedRotatingFileHandler(
                filename, level=level,
                date_format="%Y-%m-%d").push_application()

        msg = 'Logging initialized, level: {}, mode: {}'.format(
            log_level_text,
            "stdout mode" if not filename else 'file mode: ' + filename
        )

        LogService.get_startup_log().notice(msg)
项目:PenguinDome    作者:quantopian    | 项目源码 | 文件源码
def set_handler(args):
    old_value = get_client_parameter(args.hostname, args.parameter)

    try:
        old = set_client_parameter(args.hostname, args.parameter, args.value)
    except Exception as e:
        sys.exit('Failed to set parameter: {}'.format(e))

    if not old_value:
        with logbook.StreamHandler(sys.stdout, bubble=True):
            log.info('Set parameter {} for host {} to {}',
                     args.parameter, args.hostname, args.value)
    elif old:
        with logbook.StreamHandler(sys.stdout, bubble=True):
            log.info('Changed parameter {} for host {} from {} to {}',
                     args.parameter, args.hostname, old, args.value)
    else:
        print('No changes.')
项目:PenguinDome    作者:quantopian    | 项目源码 | 文件源码
def unsnooze_handler(args):
    if not (args.host or args.issue_name or args.all):
        sys.exit('If you really want to unsnooze all issues for all hosts,\n'
                 'you need to specify --all.')

    hostname = (None if not args.host else
                args.host[0] if len(args.host) == 1 else
                {'$in': args.host})
    issue_name = (None if not args.issue_name else
                  args.issue_name[0] if len(args.issue_name) == 1
                  else {'$in': args.issue_name})
    ids = unsnooze_issue(hostname, issue_name)

    if not ids:
        print('No matching issues.')
        return

    with logbook.StreamHandler(sys.stdout, bubble=True):
        for doc in get_db().issues.find({'_id': {'$in': ids}}):
            log.info('Unsnoozed {} {} at {}', doc['hostname'], doc['name'],
                     doc['unsnoozed_at'])
项目:PenguinDome    作者:quantopian    | 项目源码 | 文件源码
def close_handler(args):
    if not (args.host or args.issue_name or args.all):
        sys.exit('If you really want to close all issues for all hosts,\n'
                 'you need to specify --all.')

    hostname = (None if not args.host else
                args.host[0] if len(args.host) == 1 else
                {'$in': args.host})
    issue_name = (None if not args.issue_name else
                  args.issue_name[0] if len(args.issue_name) == 1
                  else {'$in': args.issue_name})
    docs = close_issue(hostname, issue_name)

    if not docs:
        print('No matching issues.')
        return

    with logbook.StreamHandler(sys.stdout, bubble=True):
        for doc in docs:
            log.info('Manually closed {} issue for {}', doc['name'],
                     doc['hostname'])
项目:oshino    作者:CodersOfTheNight    | 项目源码 | 文件源码
def start_loop(cfg: Config, noop=False):
    handlers = []
    handlers.append(StreamHandler(sys.stdout, level=cfg.log_level))
    logger = Logger("Heart")
    logger.info("Initializing Oshino v{0}".format(get_version()))
    logger.info("Running forever in {0} seconds interval. Press Ctrl+C to exit"
                .format(cfg.interval))
    if cfg.sentry_dsn:
        try:
            client = SentryClient(cfg.sentry_dsn)
            handlers.append(SentryHandler(client,
                                          level=logbook.ERROR,
                                          bubble=True))
        except InvalidDsn:
            logger.warn("Invalid Sentry DSN '{0}' providen. Skipping"
                        .format(cfg.sentry_dsn))

    setup = NestedSetup(handlers)
    setup.push_application()

    loop = create_loop()
    try:
        loop.run_until_complete(main_loop(cfg,
                                          logger,
                                          cfg.riemann.transport(noop),
                                          forever,
                                          loop=loop))
    finally:
        loop.close()
项目:oshino    作者:CodersOfTheNight    | 项目源码 | 文件源码
def debug():
    logbook.StreamHandler(sys.stdout, level=logbook.DEBUG).push_application()
项目:OdooQuant    作者:haogefeifei    | 项目源码 | 文件源码
def get_logger(name, debug=True):
    logbook.set_datetime_format('local')
    handler = StreamHandler(sys.stdout) if debug else NullHandler()
    handler.push_application()
    return Logger(os.path.basename(name))
项目:PenguinDome    作者:quantopian    | 项目源码 | 文件源码
def unset_handler(args):
    try:
        old = set_client_parameter(args.hostname, args.parameter, None)
    except Exception as e:
        sys.exit('Failed to unset parameter: {}'.format(e))

    if old:
        with logbook.StreamHandler(sys.stdout, bubble=True):
            log.info('Unset parameter {} for host {} (was {})', args.parameter,
                     args.hostname, old)
    else:
        print('No changes.')
项目:PenguinDome    作者:quantopian    | 项目源码 | 文件源码
def snooze_handler(args):
    if not (args.host or args.issue_name or args.all):
        sys.exit('If you really want to snooze all issues for all hosts,\n'
                 'you need to specify --all.')

    if not (args.days or args.hours):
        args.days = 1

    if args.days:
        then = now + datetime.timedelta(days=args.days)
    else:
        then = now + datetime.timedelta(hours=args.hours)

    hostname = (None if not args.host else
                args.host[0] if len(args.host) == 1 else
                {'$in': args.host})
    issue_name = (None if not args.issue_name else
                  args.issue_name[0] if len(args.issue_name) == 1
                  else {'$in': args.issue_name})
    ids = snooze_issue(hostname, issue_name, then)

    if not ids:
        print('No matching issues.')
        return

    with logbook.StreamHandler(sys.stdout, bubble=True):
        for doc in get_db().issues.find({'_id': {'$in': ids}}):
            log.info('Snoozed {} {} until {}', doc['hostname'], doc['name'],
                     then)
项目:PenguinDome    作者:quantopian    | 项目源码 | 文件源码
def suspend_handler(args):
    matches = suspend_host(args.host)
    if not matches:
        print('No matching, unsuspended hosts.')
        return
    with logbook.StreamHandler(sys.stdout, bubble=True):
        for host in matches:
            log.info('Suspended {}', host)
项目:PenguinDome    作者:quantopian    | 项目源码 | 文件源码
def open_handler(args):
    with logbook.StreamHandler(sys.stdout, bubble=True):
        for host in args.host:
            for issue in args.issue_name:
                if open_issue(host, issue):
                    log.info('Manually opened {} issue for {}', issue, host)
                else:
                    print('Open {} issue for {} already exists.'.format(
                        issue, host))
项目:GAFBot    作者:DiNitride    | 项目源码 | 文件源码
def run(self):

        log = Logger("GAF Bot")
        log.handlers.append(StreamHandler(sys.stdout, bubble=True))
        log.handlers.append(FileHandler("bot/logs/last-run.log", bubble=True, mode="w"))
        self.logger = log
        self.logger.notice("Logging started")
        self.logger.notice("Bot process started")

        with open("bot/config/defaults/default.guildconfig.json") as f:
            self.default_guild_config = json.load(f)
            self.logger.debug("Loaded default guild config")

        self.logger.debug("Connecting to DB")
        self.db_conn = sqlite3.connect("bot/config/guild_configs.db")
        self.logger.notice("DB Connection Established")
        self.db_cursor = self.db_conn.cursor()
        self.db_cursor.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='serverSettings'")
        exists = self.db_cursor.fetchone()
        if not exists[0]:
            self.logger.error("No table found in DB! Creating new one now")
            self.db_cursor.execute('''CREATE TABLE serverSettings (id bigint, settings long)''')
            self.logger.debug("Table created")

        self.load_extension("bot.modules.core")
        self.logger.notice("Loaded core module")

        self.logger.notice("Loading other modules")
        # This bar and the time.sleep() stuff is entirely useless
        # Like completely
        # Don't do this
        # It just looks cool and that makes me happy but really this is terrible
        # and a complete waste of time
        time.sleep(0.5)
        for cog in tqdm.tqdm(self.config["modules"].keys(),
                             desc="Loading modules"
                             ):
            self.load_extension(f"bot.modules.{cog.lower()}")
            time.sleep(0.2)
        time.sleep(0.5)
        self.logger.debug("Completed loading modules")

        self.logger.notice("Logging into Discord")
        super().run(self.config["token"], reconnect=True)