Python logbook 模块,WARNING 实例源码

我们从 Python 开源项目中提取了以下 10 个代码示例,用于说明如何使用 logbook.WARNING。

项目:saltyrtc-server-python    作者:saltyrtc    | 项目源码 | 文件源码
def enable_logging(level=None, redirect_loggers=None):
    """
    Turn on logging for the *saltyrtc* logger group.

    Arguments:
        - `level`: The :mod:`logbook` logging level applied to the
          logger group. ``None`` selects ``logbook.WARNING``.
        - `redirect_loggers`: A dictionary containing :mod:`logging`
          logger names as key and their respective :mod:`logging` level
          as value. It is forwarded to ``_redirect_logging_loggers``
          with ``remove=False``. ``None`` skips the redirection step.

    Raises :class:`ImportError` in case :mod:`logbook` is not
    installed.
    """
    # A missing convert-level handler is the marker for "logbook is not
    # available". `_logging_error` is expected to raise in that case
    # (TODO confirm against its definition), so the code below only runs
    # with a usable logbook.
    if _logger_convert_level_handler is None:
        _logging_error()

    effective_level = logbook.WARNING if level is None else level

    # Re-enable the group first, then apply the requested threshold.
    logger_group.disabled = False
    logger_group.level = effective_level

    if redirect_loggers is None:
        return
    _redirect_logging_loggers(redirect_loggers, remove=False)
项目:WindAdapter    作者:iLampard    | 项目源码 | 文件源码
def set_level(self, log_level):
        """
        Set ``self.logger.level`` from a textual log level.

        The lower-cased `log_level` is compared, in order, against
        ``LogLevel.INFO``, ``LogLevel.WARNING``, ``LogLevel.CRITICAL`` and
        ``LogLevel.NOTSET``. On the first match the :mod:`logbook` constant
        of the same name is assigned to the logger. A value matching none
        of them leaves the logger level untouched.
        """
        requested = log_level.lower()
        for name in ("INFO", "WARNING", "CRITICAL", "NOTSET"):
            if requested == getattr(LogLevel, name):
                self.logger.level = getattr(logbook, name)
                break
项目:nflpool    作者:prcutler    | 项目源码 | 文件源码
def __get_logbook_logging_level(level_str):
        """
        Translate a level name into the matching :mod:`logbook` constant.

        `level_str` is upper-cased and stripped of surrounding whitespace
        before the lookup, so ``" warning "`` resolves like ``"WARNING"``.

        Level values as listed by the original author:
        CRITICAL = 15, ERROR = 14, WARNING = 13, NOTICE = 12, INFO = 11,
        DEBUG = 10, TRACE = 9, NOTSET = 0.

        Raises :class:`ValueError` when the normalized name is not one of
        the levels above.
        """
        known_levels = (
            'CRITICAL',
            'ERROR',
            'WARNING',
            'NOTICE',
            'INFO',
            'DEBUG',
            'TRACE',
            'NOTSET',
        )

        normalized = level_str.upper().strip()
        if normalized not in known_levels:
            raise ValueError(
                "Unknown logbook log level: {}".format(normalized))

        # The level names double as the attribute names on the module.
        return getattr(logbook, normalized)
项目:subprocrunner    作者:thombashi    | 项目源码 | 文件源码
def __get_logging_method(log_level):
        """
        Return the ``logger`` method that emits records at `log_level`.

        Supported levels are ``logbook.DEBUG``, ``logbook.INFO``,
        ``logbook.WARNING``, ``logbook.ERROR`` and ``logbook.CRITICAL``.

        Raises :class:`ValueError` for any other value.
        """
        emitters = {
            logbook.DEBUG: logger.debug,
            logbook.INFO: logger.info,
            logbook.WARNING: logger.warning,
            logbook.ERROR: logger.error,
            logbook.CRITICAL: logger.critical,
        }

        if log_level not in emitters:
            raise ValueError("unknown log level: {}".format(log_level))
        return emitters[log_level]
项目:cookiecutter-pyramid-talk-python-starter    作者:mikeckennedy    | 项目源码 | 文件源码
def __get_logbook_logging_level(level_str):
        """
        Translate a level name into the matching :mod:`logbook` constant.

        `level_str` is upper-cased and stripped of surrounding whitespace
        before the lookup, so ``" warning "`` resolves like ``"WARNING"``.

        Level values as listed by the original author:
        CRITICAL = 15, ERROR = 14, WARNING = 13, NOTICE = 12, INFO = 11,
        DEBUG = 10, TRACE = 9, NOTSET = 0.

        Raises :class:`ValueError` when the normalized name is not one of
        the levels above.
        """
        known_levels = (
            'CRITICAL',
            'ERROR',
            'WARNING',
            'NOTICE',
            'INFO',
            'DEBUG',
            'TRACE',
            'NOTSET',
        )

        normalized = level_str.upper().strip()
        if normalized not in known_levels:
            raise ValueError(
                "Unknown logbook log level: {}".format(normalized))

        # The level names double as the attribute names on the module.
        return getattr(logbook, normalized)
项目:cookiecutter-course    作者:mikeckennedy    | 项目源码 | 文件源码
def __get_logbook_logging_level(level_str):
        """
        Translate a level name into the matching :mod:`logbook` constant.

        `level_str` is upper-cased and stripped of surrounding whitespace
        before the lookup, so ``" warning "`` resolves like ``"WARNING"``.

        Level values as listed by the original author:
        CRITICAL = 15, ERROR = 14, WARNING = 13, NOTICE = 12, INFO = 11,
        DEBUG = 10, TRACE = 9, NOTSET = 0.

        Raises :class:`ValueError` when the normalized name is not one of
        the levels above.
        """
        known_levels = (
            'CRITICAL',
            'ERROR',
            'WARNING',
            'NOTICE',
            'INFO',
            'DEBUG',
            'TRACE',
            'NOTSET',
        )

        normalized = level_str.upper().strip()
        if normalized not in known_levels:
            raise ValueError(
                "Unknown logbook log level: {}".format(normalized))

        # The level names double as the attribute names on the module.
        return getattr(logbook, normalized)
项目:saltyrtc-server-python    作者:saltyrtc    | 项目源码 | 文件源码
def _get_logging_level(verbosity):
    """
    Map a numeric verbosity to a :mod:`logbook` level.

    Verbosity ``1`` yields ``CRITICAL`` and each further step selects the
    next more talkative level, ending with ``7`` for ``TRACE``.

    Raises :class:`KeyError` for a verbosity outside ``1``..``7``.
    """
    # noinspection PyPackageRequirements
    import logbook

    # Ordered from least to most verbose; position (1-based) is the key.
    ordered_levels = (
        logbook.CRITICAL,
        logbook.ERROR,
        logbook.WARNING,
        logbook.NOTICE,
        logbook.INFO,
        logbook.DEBUG,
        logbook.TRACE,
    )
    levels_by_verbosity = dict(enumerate(ordered_levels, start=1))
    return levels_by_verbosity[verbosity]
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def test_eod_order_cancel_minute(self, direction, minute_emission):
        """
        Check end-of-day order cancellation in minute mode.

        Runs for both shorts and longs (`direction` is ``-1`` or ``1``)
        and for both daily and minute emission (`minute_emission`).
        """
        # Order 1000 shares of asset1.  The volume is only 1 share per bar,
        # so the order should be cancelled at the end of the day.
        algo = self.prep_algo(
            "set_cancel_policy(cancel_policy.EODCancel())",
            amount=np.copysign(1000, direction),
            minute_emission=minute_emission
        )

        log_catcher = TestHandler()
        with log_catcher:
            results = algo.run(self.data_portal)

            # Signed number of shares expected to fill before cancellation.
            expected_fill = np.copysign(389, direction)

            for day_positions in results.positions:
                self.assertEqual(1, len(day_positions))
                self.assertEqual(expected_fill, day_positions[0]["amount"])
                # NOTE(review): this always inspects the first day's
                # position rather than `day_positions` -- confirm intent.
                self.assertEqual(1, results.positions[0][0]["sid"])

            # Should be an order on day 1, but no more orders afterwards.
            orders_per_day = [len(day) for day in results.orders]
            np.testing.assert_array_equal([1, 0, 0], orders_per_day)

            # Should be 389 txns on day 1, but no more afterwards.
            txns_per_day = [len(day) for day in results.transactions]
            np.testing.assert_array_equal([389, 0, 0], txns_per_day)

            first_order = results.orders[0][0]
            self.assertEqual(ORDER_STATUS.CANCELLED, first_order["status"])
            self.assertEqual(expected_fill, first_order["filled"])

            warnings = [
                entry for entry in log_catcher.records
                if entry.level == WARNING
            ]
            self.assertEqual(1, len(warnings))

            # The message is only verified for the two known directions.
            if direction == 1:
                expected_message = (
                    "Your order for 1000 shares of ASSET1 has been partially "
                    "filled. 389 shares were successfully purchased. "
                    "611 shares were not filled by the end of day and "
                    "were canceled."
                )
            elif direction == -1:
                expected_message = (
                    "Your order for -1000 shares of ASSET1 has been partially "
                    "filled. 389 shares were successfully sold. "
                    "611 shares were not filled by the end of day and "
                    "were canceled."
                )
            else:
                expected_message = None

            if expected_message is not None:
                self.assertEqual(expected_message, str(warnings[0].message))
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def test_order_in_quiet_period(self, name, sid):
        """
        Check that every order call on a de-listed asset logs a warning.

        The algorithm script places one order through each of six order
        functions per bar for the asset `sid`; the test then verifies the
        number and the text of the WARNING records that were captured.
        """
        asset = self.asset_finder.retrieve_asset(sid)

        algo_code = dedent("""
        from catalyst.api import (
            sid,
            order,
            order_value,
            order_percent,
            order_target,
            order_target_percent,
            order_target_value
        )

        def initialize(context):
            pass

        def handle_data(context, data):
            order(sid({sid}), 1)
            order_value(sid({sid}), 100)
            order_percent(sid({sid}), 0.5)
            order_target(sid({sid}), 50)
            order_target_percent(sid({sid}), 0.5)
            order_target_value(sid({sid}), 50)
        """).format(sid=sid)

        # Run algo from 1/6 to 1/7.
        sim_params = SimulationParameters(
            start_session=pd.Timestamp("2016-01-06", tz='UTC'),
            end_session=pd.Timestamp("2016-01-07", tz='UTC'),
            trading_calendar=self.trading_calendar,
            data_frequency="minute"
        )
        algo = TradingAlgorithm(
            script=algo_code,
            env=self.env,
            sim_params=sim_params
        )

        with make_test_handler(self) as log_catcher:
            algo.run(self.data_portal)

            warnings = [
                entry for entry in log_catcher.records
                if entry.level == logbook.WARNING
            ]

            # One warning per order on the second day.
            self.assertEqual(6 * 390, len(warnings))

            # Every warning is expected to carry the same text.
            expected_message = (
                'Cannot place order for ASSET{sid}, as it has de-listed. '
                'Any existing positions for this asset will be liquidated '
                'on {date}.'.format(sid=sid, date=asset.auto_close_date)
            )
            for warning in warnings:
                self.assertEqual(expected_message, warning.message)
项目:saltyrtc-server-python    作者:saltyrtc    | 项目源码 | 文件源码
def server_factory(request, event_loop, server_permanent_keys):
    """
    Return a factory to create :class:`saltyrtc.Server` instances.

    Logging is enabled and a stderr handler is pushed when this function
    runs. Each server produced by the returned factory registers a
    finalizer on `request` that closes it; the handler is popped once the
    last tracked server has been removed.

    NOTE(review): if the returned factory is never called, nothing pops
    the handler -- confirm whether that is intended.
    """
    # Enable asyncio debug logging
    os.environ['PYTHONASYNCIODEBUG'] = '1'

    # Enable logging
    redirected = {
        'asyncio': logbook.WARNING,
        'websockets': logbook.WARNING,
    }
    util.enable_logging(level=logbook.NOTICE, redirect_loggers=redirected)

    # Push handler
    stderr_handler = logbook.StderrHandler()
    stderr_handler.push_application()

    # Servers created through the factory that have not been closed yet.
    live_servers = []

    def _server_factory(permanent_keys=None):
        if permanent_keys is None:
            keys = server_permanent_keys
        else:
            keys = permanent_keys

        # Setup server
        port = unused_tcp_port()
        ssl_context = util.create_ssl_context(
            pytest.saltyrtc.cert, dh_params_file=pytest.saltyrtc.dh_params)
        startup = serve(
            ssl_context,
            keys,
            host=pytest.saltyrtc.ip,
            port=port,
            loop=event_loop,
            server_class=TestServer,
        )
        server = event_loop.run_until_complete(startup)

        # Inject timeout and address (little bit of a hack but meh...)
        server.timeout = _get_timeout(request=request)
        server.address = (pytest.saltyrtc.ip, port)

        live_servers.append(server)

        def _close_server():
            server.close()
            event_loop.run_until_complete(server.wait_closed())
            live_servers.remove(server)
            # Last server gone: drop the handler pushed above.
            if not live_servers:
                stderr_handler.pop_application()

        request.addfinalizer(_close_server)
        return server

    return _server_factory