我们从 Python 开源项目中提取了以下10个代码示例，用于说明如何使用 logbook.WARNING。
def enable_logging(level=None, redirect_loggers=None):
    """
    Switch on logging for the *saltyrtc* logger group.

    Arguments:
        - `level`: The :mod:`logbook` level to apply to the logger
          group. ``None`` (the default) selects ``WARNING``.
        - `redirect_loggers`: A mapping of :mod:`logging` logger names
          to their :mod:`logging` level. It is handed to
          ``_redirect_logging_loggers`` so those loggers are redirected
          to :mod:`logbook`. ``None`` (the default) redirects nothing.

    Raises :class:`ImportError` in case :mod:`logbook` is not installed.
    """
    # No level-conversion handler means logbook could not be set up;
    # delegate to the shared error path.
    if _logger_convert_level_handler is None:
        _logging_error()

    # Past this point logbook is usable (otherwise an error was raised).
    effective_level = logbook.WARNING if level is None else level

    logger_group.disabled = False
    logger_group.level = effective_level

    if redirect_loggers is not None:
        _redirect_logging_loggers(redirect_loggers, remove=False)
def set_level(self, log_level):
    """
    Set ``self.logger.level`` from a level name.

    The name is lower-cased and compared, in order, against
    ``LogLevel.INFO``, ``LogLevel.WARNING``, ``LogLevel.CRITICAL`` and
    ``LogLevel.NOTSET``. The first match assigns the corresponding
    :mod:`logbook` level. A name that matches none of them leaves the
    logger's level untouched.
    """
    requested = log_level.lower()

    # Ordered (name, logbook level) pairs; the first equal name wins.
    candidates = (
        (LogLevel.INFO, logbook.INFO),
        (LogLevel.WARNING, logbook.WARNING),
        (LogLevel.CRITICAL, logbook.CRITICAL),
        (LogLevel.NOTSET, logbook.NOTSET),
    )
    for level_name, logbook_level in candidates:
        if requested == level_name:
            self.logger.level = logbook_level
            break
def __get_logbook_logging_level(level_str):
    """
    Translate a level name into the matching :mod:`logbook` constant.

    The name is upper-cased and stripped of surrounding whitespace
    before it is matched.

    Raises :class:`ValueError` if the normalised name is not one of the
    eight level names listed below.
    """
    # logbook levels:
    #   CRITICAL = 15
    #   ERROR = 14
    #   WARNING = 13
    #   NOTICE = 12
    #   INFO = 11
    #   DEBUG = 10
    #   TRACE = 9
    #   NOTSET = 0
    known_names = (
        'CRITICAL',
        'ERROR',
        'WARNING',
        'NOTICE',
        'INFO',
        'DEBUG',
        'TRACE',
        'NOTSET',
    )

    level_str = level_str.upper().strip()
    if level_str not in known_names:
        raise ValueError("Unknown logbook log level: {}".format(level_str))

    # Each accepted name is also the attribute name of the constant.
    return getattr(logbook, level_str)
def __get_logging_method(log_level):
    """
    Return the ``logger`` method that emits records at `log_level`.

    Supported levels are :mod:`logbook`'s ``DEBUG``, ``INFO``,
    ``WARNING``, ``ERROR`` and ``CRITICAL``.

    Raises :class:`ValueError` for any other level.
    """
    emitters = {
        logbook.DEBUG: logger.debug,
        logbook.INFO: logger.info,
        logbook.WARNING: logger.warning,
        logbook.ERROR: logger.error,
        logbook.CRITICAL: logger.critical,
    }

    if log_level not in emitters:
        raise ValueError("unknown log level: {}".format(log_level))
    return emitters[log_level]
def _get_logging_level(verbosity):
    """
    Map a verbosity number (1-7) to a :mod:`logbook` level.

    ``1`` selects ``CRITICAL`` and each higher number selects the next
    more verbose level, up to ``7`` for ``TRACE``.

    Raises :class:`KeyError` for any verbosity outside that table.
    """
    # Imported here rather than at module import time.
    # noinspection PyPackageRequirements
    import logbook

    levels_by_verbosity = {
        1: logbook.CRITICAL,
        2: logbook.ERROR,
        3: logbook.WARNING,
        4: logbook.NOTICE,
        5: logbook.INFO,
        6: logbook.DEBUG,
        7: logbook.TRACE,
    }
    return levels_by_verbosity[verbosity]
def test_eod_order_cancel_minute(self, direction, minute_emission):
    """
    Check end-of-day order cancellation in minute mode.

    Covers longs and shorts (`direction` is ``1`` or ``-1``) and both
    daily and minute emission (`minute_emission` is forwarded to
    ``prep_algo``).
    """
    # Order 1000 shares of asset1. Only one share trades per bar, so the
    # order cannot fill completely and should be cancelled at the end of
    # the day.
    algo = self.prep_algo(
        "set_cancel_policy(cancel_policy.EODCancel())",
        amount=np.copysign(1000, direction),
        minute_emission=minute_emission,
    )

    # Signed share count expected to have filled before cancellation.
    expected_filled = np.copysign(389, direction)

    log_catcher = TestHandler()
    with log_catcher:
        results = algo.run(self.data_portal)

        for daily_positions in results.positions:
            self.assertEqual(1, len(daily_positions))
            self.assertEqual(expected_filled, daily_positions[0]["amount"])
            self.assertEqual(1, results.positions[0][0]["sid"])

        # One order on day 1, none afterwards.
        orders_per_day = [len(day) for day in results.orders]
        np.testing.assert_array_equal([1, 0, 0], orders_per_day)

        # 389 transactions on day 1, none afterwards.
        txns_per_day = [len(day) for day in results.transactions]
        np.testing.assert_array_equal([389, 0, 0], txns_per_day)

        cancelled_order = results.orders[0][0]
        self.assertEqual(ORDER_STATUS.CANCELLED, cancelled_order["status"])
        self.assertEqual(expected_filled, cancelled_order["filled"])

        # Exactly one warning should report the partial fill.
        warning_records = [
            rec for rec in log_catcher.records if rec.level == WARNING
        ]
        self.assertEqual(1, len(warning_records))
        logged_message = warning_records[0].message

        if direction == 1:
            self.assertEqual(
                "Your order for 1000 shares of ASSET1 has been partially "
                "filled. 389 shares were successfully purchased. "
                "611 shares were not filled by the end of day and "
                "were canceled.",
                str(logged_message)
            )
        elif direction == -1:
            self.assertEqual(
                "Your order for -1000 shares of ASSET1 has been partially "
                "filled. 389 shares were successfully sold. "
                "611 shares were not filled by the end of day and "
                "were canceled.",
                str(logged_message)
            )
def test_order_in_quiet_period(self, name, sid):
    """
    Check that ordering a de-listed asset only produces warnings.

    The algorithm calls all six order functions on every bar for the
    asset identified by `sid`. Each call is expected to log a warning
    that names the asset and its auto-close date.

    `name` is not used in the body; presumably it is the label of the
    parameterised case (confirm against the decorator).
    """
    asset = self.asset_finder.retrieve_asset(sid)

    algo_code = dedent("""
    from catalyst.api import (
        sid,
        order,
        order_value,
        order_percent,
        order_target,
        order_target_percent,
        order_target_value
    )

    def initialize(context):
        pass

    def handle_data(context, data):
        order(sid({sid}), 1)
        order_value(sid({sid}), 100)
        order_percent(sid({sid}), 0.5)
        order_target(sid({sid}), 50)
        order_target_percent(sid({sid}), 0.5)
        order_target_value(sid({sid}), 50)
    """).format(sid=sid)

    # Run the algo from 1/6 to 1/7.
    sim_params = SimulationParameters(
        start_session=pd.Timestamp("2016-01-06", tz='UTC'),
        end_session=pd.Timestamp("2016-01-07", tz='UTC'),
        trading_calendar=self.trading_calendar,
        data_frequency="minute"
    )
    algo = TradingAlgorithm(
        script=algo_code,
        env=self.env,
        sim_params=sim_params,
    )

    with make_test_handler(self) as log_catcher:
        algo.run(self.data_portal)

        warning_records = [
            rec for rec in log_catcher.records
            if rec.level == logbook.WARNING
        ]

        # One warning per order on the second day.
        self.assertEqual(6 * 390, len(warning_records))

        # Every warning carries the same text, so build it once.
        expected_message = (
            'Cannot place order for ASSET{sid}, as it has de-listed. '
            'Any existing positions for this asset will be liquidated '
            'on {date}.'.format(sid=sid, date=asset.auto_close_date)
        )
        for rec in warning_records:
            self.assertEqual(expected_message, rec.message)
def server_factory(request, event_loop, server_permanent_keys):
    """
    Return a factory to create :class:`saltyrtc.Server` instances.

    The returned callable accepts an optional `permanent_keys` argument
    and falls back to `server_permanent_keys` when it is ``None``. Each
    server it creates is closed by a finalizer registered on `request`.
    The stderr log handler pushed here is popped once the last created
    server has been closed.
    """
    # Enable asyncio debug logging
    os.environ['PYTHONASYNCIODEBUG'] = '1'

    # Enable logging and redirect the stdlib loggers listed here
    redirected = {
        'asyncio': logbook.WARNING,
        'websockets': logbook.WARNING,
    }
    util.enable_logging(level=logbook.NOTICE, redirect_loggers=redirected)

    # Push handler
    logging_handler = logbook.StderrHandler()
    logging_handler.push_application()

    # Servers created by the factory that have not been closed yet
    _server_instances = []

    def _server_factory(permanent_keys=None):
        keys = server_permanent_keys if permanent_keys is None else permanent_keys

        # Setup server
        port = unused_tcp_port()
        ssl_context = util.create_ssl_context(
            pytest.saltyrtc.cert, dh_params_file=pytest.saltyrtc.dh_params)
        startup = serve(
            ssl_context,
            keys,
            host=pytest.saltyrtc.ip,
            port=port,
            loop=event_loop,
            server_class=TestServer,
        )
        server = event_loop.run_until_complete(startup)

        # Inject timeout and address (little bit of a hack but meh...)
        server.timeout = _get_timeout(request=request)
        server.address = (pytest.saltyrtc.ip, port)

        _server_instances.append(server)

        def fin():
            # Close this server and wait until it is fully shut down.
            server.close()
            event_loop.run_until_complete(server.wait_closed())
            _server_instances.remove(server)
            # The last server to close also removes the log handler.
            if not _server_instances:
                logging_handler.pop_application()

        request.addfinalizer(fin)
        return server

    return _server_factory