我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 pandas.Timestamp()。
def get_open_and_close(day, early_closes):
    """Return the UTC open and close timestamps of the session on ``day``.

    The open is 9:31 AM US/Eastern.  The close is 1 PM US/Eastern when
    ``day`` is found in ``early_closes`` and 4 PM US/Eastern otherwise.

    Parameters
    ----------
    day : object with ``year``, ``month`` and ``day`` attributes
        The session date.
    early_closes : container
        Anything supporting ``day in early_closes``.

    Returns
    -------
    (market_open, market_close) : tuple of pd.Timestamp
        Both values are expressed in UTC.
    """
    def _eastern_as_utc(hour, minute=0):
        wall_clock = datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=hour,
            minute=minute,
        )
        return pd.Timestamp(wall_clock, tz='US/Eastern').tz_convert('UTC')

    opening = _eastern_as_utc(9, 31)

    # 1 PM on an early-close day, 4 PM on a regular day.
    if day in early_closes:
        closing_hour = 13
    else:
        closing_hour = 16

    return opening, _eastern_as_utc(closing_hour)
def get_open_and_close(day, early_closes):
    """Return the UTC open and close timestamps of the session on ``day``.

    Times are built in the America/Sao_Paulo zone and converted to UTC.
    The close is always 16:00 local.  The open is 13:00 local when ``day``
    is in ``quarta_cinzas`` (a name defined outside this function) and
    10:00 local otherwise.

    Parameters
    ----------
    day : object with ``year``, ``month`` and ``day`` attributes
    early_closes : unused
        Accepted but never read by this function.

    Returns
    -------
    (market_open, market_close) : tuple of pd.Timestamp
    """
    # The only "early close" event in Bovespa is actually a late start:
    # on those days the market only opens at 1pm.
    if day in quarta_cinzas:
        opening_hour = 13
    else:
        opening_hour = 10

    def _sao_paulo_as_utc(hour, **extra):
        wall_clock = datetime(
            year=day.year, month=day.month, day=day.day, hour=hour, **extra
        )
        return pd.Timestamp(
            wall_clock, tz='America/Sao_Paulo').tz_convert('UTC')

    opening = _sao_paulo_as_utc(opening_hour, minute=0)
    closing = _sao_paulo_as_utc(16)
    return opening, closing
def get_open_and_close(day, early_closes):
    """Return the UTC open and close timestamps of the session on ``day``.

    Both wall-clock times are interpreted in the Asia/Shanghai zone and
    then converted to UTC: the open is 9:31 AM, the close is 1 PM when
    ``day`` is in ``early_closes`` and 4 PM otherwise.

    Parameters
    ----------
    day : object with ``year``, ``month`` and ``day`` attributes
    early_closes : container
        Anything supporting ``day in early_closes``.

    Returns
    -------
    (market_open, market_close) : tuple of pd.Timestamp
        Both values are expressed in UTC, with ``market_open`` earlier
        than ``market_close``.
    """
    # NOTE(review): the open used to be localized in 'US/Eastern' while the
    # close used 'Asia/Shanghai', which put the close *before* the open once
    # both were converted to UTC.  Both now share one zone; confirm that
    # Asia/Shanghai (rather than US/Eastern) is the intended exchange zone.
    exchange_tz = 'Asia/Shanghai'

    market_open = pd.Timestamp(
        datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=9,
            minute=31),
        tz=exchange_tz).tz_convert('UTC')

    # 1 PM if early close, 4 PM otherwise
    close_hour = 13 if day in early_closes else 16
    market_close = pd.Timestamp(
        datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=close_hour),
        tz=exchange_tz).tz_convert('UTC')

    return market_open, market_close
def was_active(reference_date_value, asset):
    """
    Tell whether `asset` was alive at `reference_date_value`.

    Parameters
    ----------
    reference_date_value : int
        Date, represented as nanoseconds since EPOCH, to test.  This is
        generally the `value` attribute of a pandas Timestamp.
    asset : Asset
        The asset whose `start_date` and `end_date` bound the check.

    Returns
    -------
    was_active : bool
        True when `asset.start_date.value <= reference_date_value
        <= asset.end_date.value` (both bounds inclusive).
    """
    started_by_then = asset.start_date.value <= reference_date_value
    if not started_by_then:
        # Mirror a chained comparison: the end date is not consulted.
        return started_by_then
    return reference_date_value <= asset.end_date.value
def as_of(self, dt):
    """
    Build a new future chain for this root symbol rooted at ``dt``.

    Parameters
    ----------
    dt : datetime.datetime or pandas.Timestamp or str
        The as_of_date for the new chain; it is passed through
        ``Timestamp(dt, tz='UTC')``.

    Returns
    -------
    FutureChain
        A chain sharing this chain's asset finder, datetime getter and
        root symbol.
    """
    chain_kwargs = dict(
        asset_finder=self._asset_finder,
        get_datetime=self._algorithm_get_datetime,
        root_symbol=self.root_symbol,
        as_of_date=Timestamp(dt, tz='UTC'),
    )
    return FutureChain(**chain_kwargs)
def last_date_in_output_for_sid(self, sid):
    """
    Look up the last day already written to the output for ``sid``.

    Parameters
    ----------
    sid : int
        Asset identifier.

    Returns
    -------
    out : pd.Timestamp
        The entry of ``self._trading_days`` for the last day written for
        the given sid, or ``pd.NaT`` when the sizes file does not exist or
        describes an empty container.
    """
    sizes_path = "{0}/close/meta/sizes".format(self.sidpath(sid))
    if not os.path.exists(sizes_path):
        return pd.NaT

    with open(sizes_path, mode='r') as f:
        data = json.load(f)

    # Floor division keeps the day count an integer on Python 3 as well;
    # a float here cannot be used to index ``self._trading_days`` below.
    num_days = data['shape'][0] // self._minutes_per_day
    if num_days == 0:
        # empty container
        return pd.NaT
    return self._trading_days[num_days - 1]
def setUpClass(cls):
    """Create two ``Future`` fixtures and an asset finder holding them."""
    def utc(date_string):
        return pd.Timestamp(date_string, tz='UTC')

    cls.future = Future(
        2468,
        symbol='OMH15',
        root_symbol='OM',
        notice_date=utc('2014-01-20'),
        expiration_date=utc('2014-02-20'),
        auto_close_date=utc('2014-01-18'),
        tick_size=.01,
        multiplier=500
    )
    cls.future2 = Future(
        0,
        symbol='CLG06',
        root_symbol='CL',
        start_date=utc('2005-12-01'),
        notice_date=utc('2005-12-20'),
        expiration_date=utc('2006-01-20')
    )

    # The fixtures are read back through ``TestFuture`` when written.
    trading_env = TradingEnvironment(load=noop_load)
    fixtures = [TestFuture.future, TestFuture.future2]
    trading_env.write_data(futures_identifiers=fixtures)
    cls.asset_finder = trading_env.asset_finder
def test_insert_metadata(self):
    """Written equity metadata is retrievable; unknown fields are not."""
    equity_row = {
        'start_date': '2014-01-01',
        'end_date': '2015-01-01',
        'symbol': "PLAY",
        'foo_data': "FOO",
    }
    self.env.write_data(equities_data={0: equity_row})
    asset_finder = self.asset_finder_type(self.env.engine)

    # The known fields must have been stored on the asset.
    stored = asset_finder.retrieve_asset(0)
    self.assertIsInstance(stored, Equity)
    self.assertEqual('PLAY', stored.symbol)
    self.assertEqual(pd.Timestamp('2015-01-01', tz='UTC'), stored.end_date)

    # The unknown field must not surface as an attribute.
    with self.assertRaises(AttributeError):
        stored.foo_data
def test_consume_asset_as_identifier(self):
    """Asset objects written as identifiers come back equal, end dates included."""
    end_dates = {
        1: pd.Timestamp('2012-01-01', tz='UTC'),
        200: pd.Timestamp('2008-01-01', tz='UTC'),
    }
    originals = {
        1: Equity(1, symbol="TESTEQ", end_date=end_dates[1]),
        200: Future(200, symbol="TESTFUT", end_date=end_dates[200]),
    }

    # Hand the asset objects themselves to the writer.
    self.env.write_data(equities_identifiers=[originals[1]],
                        futures_identifiers=[originals[200]])
    asset_finder = self.asset_finder_type(self.env.engine)

    # Retrieved assets compare equal to the ones that were written ...
    for sid in (1, 200):
        self.assertEqual(originals[sid], asset_finder.retrieve_asset(sid))
    # ... and carry the same end dates.
    for sid in (1, 200):
        self.assertEqual(end_dates[sid],
                         asset_finder.retrieve_asset(sid).end_date)
def test_sid_assignment(self):
    """Symbols written without sids receive distinct sids."""
    # Plain symbols only: no sid information is supplied.
    symbols = ['PLAY', 'MSFT']
    as_of = normalize_date(pd.Timestamp('2015-07-09', tz='UTC'))

    # Let the writer assign the sids.
    self.env.write_data(equities_identifiers=symbols,
                        allow_sid_assignment=True)

    # Both assets must exist and must not share a sid.
    asset_finder = self.asset_finder_type(self.env.engine)
    play_asset = asset_finder.lookup_symbol('PLAY', as_of)
    msft_asset = asset_finder.lookup_symbol('MSFT', as_of)
    self.assertEqual('PLAY', play_asset.symbol)
    self.assertIsNotNone(play_asset.sid)
    self.assertNotEqual(play_asset.sid, msft_asset.sid)
def test_map_identifier_index_to_sids(self):
    """Asset objects map to their integer sids, preserving input order."""
    as_of = pd.Timestamp('2014-01-01', tz='UTC')
    asset_finder = self.asset_finder_type(self.env.engine)
    aapl = Equity(1, symbol="AAPL")
    goog = Equity(2, symbol="GOOG")
    clk15 = Future(200, symbol="CLK15")
    clm15 = Future(201, symbol="CLM15")

    # Ascending order: every mapped value must be a plain int sid.
    mapped = asset_finder.map_identifier_index_to_sids(
        [aapl, goog, clk15, clm15], as_of)
    self.assertListEqual([1, 2, 200, 201], mapped)
    for mapped_sid in mapped:
        self.assertIsInstance(mapped_sid, int)

    # Shuffled order: the output must follow the input order.
    mapped = asset_finder.map_identifier_index_to_sids(
        [clm15, goog, clk15, aapl], as_of)
    self.assertListEqual([201, 2, 200, 1], mapped)
def test_group_by_type(self):
    """``group_by_type`` splits mixed sids into equity and future sets."""
    equity_info = make_simple_equity_info(
        range(5),
        start_date=pd.Timestamp('2014-01-01'),
        end_date=pd.Timestamp('2015-01-01'),
    )
    future_info = make_commodity_future_info(
        first_sid=6,
        root_symbols=['CL'],
        years=[2014],
    )

    # Overlapping sid queries, to exercise loading of partially-cached
    # results.
    sid_queries = [
        ([0, 1, 3], [6, 7]),
        ([0, 2, 3], [7, 10]),
        (list(equity_info.index), list(future_info.index)),
    ]

    with tmp_asset_finder(equities=equity_info,
                          futures=future_info) as finder:
        for equity_sids, future_sids in sid_queries:
            grouped = finder.group_by_type(equity_sids + future_sids)
            expected = {
                'equity': set(equity_sids),
                'future': set(future_sids),
            }
            self.assertEqual(grouped, expected)
def test_repr(self):
    """Check ``FutureChain.__repr__`` with and without an as-of date."""
    def current_date():
        return '2005-12-01'

    default_chain = FutureChain(self.asset_finder, current_date, 'CL')
    dated_chain = FutureChain(
        self.asset_finder,
        current_date,
        'CL',
        as_of_date=pd.Timestamp('2006-02-01', tz='UTC'),
    )

    # Without an explicit as-of date, none is shown.
    self.assertEqual(repr(default_chain), "FutureChain(root_symbol='CL')")

    # An explicit as-of date is part of the repr.
    expected_dated = ("FutureChain(root_symbol='CL', "
                      "as_of_date='2006-02-01 00:00:00+00:00')")
    self.assertEqual(repr(dated_chain), expected_dated)
def test_minute(self):
    """Minute-frequency ``RandomWalkSource`` events stay inside the
    requested window, on calendar trading days, with positive prices and
    an hour between 13 and 21 inclusive."""
    np.random.seed(123)
    start_prices = {0: 100, 1: 500}
    start = pd.Timestamp('1990-01-01', tz='UTC')
    end = pd.Timestamp('1991-01-01', tz='UTC')
    source = RandomWalkSource(start_prices=start_prices,
                              calendar=calendar_nyse, start=start,
                              end=end)

    # ``pd.lib`` is no longer part of pandas; ``pd.Timestamp`` is the
    # public name of the same class.
    self.assertIsInstance(source.start, pd.Timestamp)
    self.assertIsInstance(source.end, pd.Timestamp)

    for event in source:
        self.assertIn(event.sid, start_prices.keys())
        self.assertIn(event.dt.replace(minute=0, hour=0),
                      calendar_nyse.trading_days)
        self.assertGreater(event.dt, start)
        self.assertLess(event.dt, end)
        self.assertGreater(event.price, 0,
                           "price should never go negative.")
        # The message used to contain a backslash line continuation that
        # embedded a run of indentation whitespace in the text.
        self.assertTrue(
            13 <= event.dt.hour <= 21,
            "event.dt.hour == %i, not during market hours."
            % event.dt.hour)
def test_day(self):
    """Daily-frequency ``RandomWalkSource`` events stay inside the
    requested window, on calendar trading days, with positive prices and
    an hour of 0."""
    np.random.seed(123)
    start_prices = {0: 100, 1: 500}
    start = pd.Timestamp('1990-01-01', tz='UTC')
    end = pd.Timestamp('1992-01-01', tz='UTC')
    source = RandomWalkSource(start_prices=start_prices,
                              calendar=calendar_nyse, start=start,
                              end=end, freq='daily')

    # ``pd.lib`` is no longer part of pandas; ``pd.Timestamp`` is the
    # public name of the same class.
    self.assertIsInstance(source.start, pd.Timestamp)
    self.assertIsInstance(source.end, pd.Timestamp)

    for event in source:
        self.assertIn(event.sid, start_prices.keys())
        self.assertIn(event.dt.replace(minute=0, hour=0),
                      calendar_nyse.trading_days)
        self.assertGreater(event.dt, start)
        self.assertLess(event.dt, end)
        self.assertGreater(event.price, 0,
                           "price should never go negative.")
        self.assertEqual(event.dt.hour, 0)
def test_get_environment(self):
    """``get_environment`` returns the platform by default and the full
    environment mapping for ``'*'``."""
    window_start = pd.Timestamp('2006-01-03 14:31:00+0000', tz='UTC')
    window_end = pd.Timestamp('2006-01-04 21:00:00+0000', tz='UTC')
    expected_env = dict([
        ('arena', 'backtest'),
        ('data_frequency', 'minute'),
        ('start', window_start),
        ('end', window_end),
        ('capital_base', 100000.0),
        ('platform', 'zipline'),
    ])

    def initialize(algo):
        # The checks run from inside the algorithm's own initialize hook.
        self.assertEqual('zipline', algo.get_environment())
        self.assertEqual(expected_env, algo.get_environment('*'))

    def handle_data(algo, data):
        pass

    algorithm = TradingAlgorithm(
        initialize=initialize,
        handle_data=handle_data,
        sim_params=self.sim_params,
        env=self.env,
    )
    algorithm.run(self.source)
def test_load_bars_from_yahoo(self):
    """Bars loaded for two stocks span the expected first and last dates,
    expose every OHLCV/price field, and a reversed date range is
    rejected with ``AssertionError``."""
    # ``pd.datetime`` was only an alias of ``datetime.datetime`` and is no
    # longer available in pandas, so the standard-library class is used.
    from datetime import datetime

    stocks = ['AAPL', 'GE']
    start = datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc)
    end = datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc)
    data = load_bars_from_yahoo(stocks=stocks, start=start, end=end)

    assert data.major_axis[0] == pd.Timestamp('1993-01-04 00:00:00+0000')
    assert data.major_axis[-1] == pd.Timestamp('2001-12-31 00:00:00+0000')
    for stock in stocks:
        assert stock in data.items

    for ohlc in ['open', 'high', 'low', 'close', 'volume', 'price']:
        assert ohlc in data.minor_axis

    # Swapped start/end must trip the loader's own assertion.
    np.testing.assert_raises(
        AssertionError, load_bars_from_yahoo, stocks=stocks,
        start=end, end=start
    )
def view(self, view_id, portfolio_id, portfolio_type, start_date=None,
         end_date=None):
    """Request the results of a portfolio view.

    Builds ``portfolio/views/<view_id>/results?<query>`` and passes it to
    ``self.get``.

    :param view_id: identifier interpolated into the request path
    :param portfolio_id: value of the ``portfolio_id`` query parameter
    :param portfolio_type: a ``PortfolioType``; its ``value`` is sent
    :param start_date: object with ``strftime``; defaults to
        ``pd.Timestamp("today")`` evaluated at call time
    :param end_date: object with ``strftime``; defaults to
        ``pd.Timestamp("today")`` evaluated at call time
    :returns: whatever ``self.get(request=...)`` returns
    """
    assert isinstance(portfolio_type, PortfolioType)

    # The defaults are resolved per call.  A ``pd.Timestamp("today")``
    # default in the signature is evaluated once, when the function is
    # defined, and would go stale in a long-running process.
    if start_date is None:
        start_date = pd.Timestamp("today")
    if end_date is None:
        end_date = pd.Timestamp("today")

    params = {"portfolio_id": portfolio_id,
              "portfolio_type": portfolio_type.value,
              "output_type": OutputType.JSON.value,
              "start_date": start_date.strftime("%Y-%m-%d"),
              "end_date": end_date.strftime("%Y-%m-%d")}
    query = '&'.join("{key}={value}".format(key=key, value=value)
                     for key, value in params.items())

    request = "portfolio/views/{view}/results?".format(view=view_id) + query
    return self.get(request=request)
def to_date_time(dt, tz_in=None, tz_out=None):
    """Convert value to date/time object

    :param dt: value representing a date/time (parsed by pandas.Timestamp)
    :param tz_in: time zone to localize date/time value to
                  (parsed by pandas.Timestamp.tz_localize)
    :param tz_out: time zone to convert date/time value into
                   (parsed by pandas.Timestamp.tz_convert)
    :returns: date/time object, or None when ``dt`` is None or the
              value cannot be parsed, localized or converted
    :rtype: datetime.datetime
    """
    if dt is None:
        return None

    try:
        ts = pd.Timestamp(dt)
        if tz_in:
            ts = ts.tz_localize(tz_in)
        if tz_out:
            ts = ts.tz_convert(tz_out)
        # ``Timestamp.to_datetime`` no longer exists in pandas; with the
        # blanket handler below that made every call return None.
        return ts.to_pydatetime()
    except Exception:
        # Best-effort conversion: any failure yields None.  ``Exception``
        # (not ``BaseException``) lets KeyboardInterrupt/SystemExit through.
        return None
def to_timestamp(dt, tz_in=None):
    """Convert value to Unix timestamp (ns)

    :param dt: value representing a date/time (parsed by pandas.Timestamp)
    :param tz_in: time zone to localize date/time value to
                  (parsed by pandas.Timestamp.tz_localize)
    :returns: Unix timestamp (ns), or None when ``dt`` is None or the
              value cannot be parsed or localized
    :rtype: int
    """
    if dt is None:
        return None

    try:
        ts = pd.Timestamp(dt)
        if tz_in:
            ts = ts.tz_localize(tz_in)
        return ts.value
    except Exception:
        # Best-effort conversion: any failure yields None.  ``Exception``
        # (not ``BaseException``) lets KeyboardInterrupt/SystemExit through.
        return None
def get_utc_timestamp(dt):
    """
    Express the given date(s) in UTC.

    The input is first passed through ``pd.to_datetime``.  The result is
    localized to UTC; if localizing raises ``TypeError`` it is converted
    to UTC instead.

    Parameters
    ----------
    dt : Timestamp/DatetimeIndex
        the date(s) to be converted

    Returns
    -------
    same type as input
        date(s) converted to UTC
    """
    parsed = pd.to_datetime(dt)
    try:
        return parsed.tz_localize('UTC')
    except TypeError:
        return parsed.tz_convert('UTC')
def predict_tf_once(day, start_date='2016-10-1'):
    """Predict one day's total and write the per-row prediction to CSV.

    Reads ``./features/tensorflow_model/percent_model/<day>.csv``, weights
    the per-user feature vectors with its ``percent#<i>`` columns, writes
    the row sums (transposed, labelled ``start_date + (day - 1)`` days) to
    ``./result/predict_part/<day>.csv`` and prints ``(day, real)``.

    :param day: 1-based day offset; also selects the input and output files
    :param start_date: label used to slice each dataset frame and as the
        base date of the output label
    :returns: tuple ``(day, real)`` where ``real`` is the rounded grand
        total of the weighted features
    """
    # ``.loc`` replaces the removed ``.ix`` indexer, and list
    # comprehensions replace ``map`` so the results are real lists on
    # Python 3 as well as Python 2.
    all_dataset = [frame.loc[start_date:start_date]
                   for frame in get_dataset(day)]
    user_ids = get_full_user_ids()
    y_p_features = [
        tf_percent_model.resample_x_y_(all_dataset, user_id)[0].reshape(-1)
        for user_id in user_ids
    ]
    y_p_features_df = pd.DataFrame(y_p_features, index=user_ids)

    # ``read_csv(index_col=0, parse_dates=True)`` is the documented
    # replacement for the removed ``DataFrame.from_csv``.
    percent = pd.read_csv(
        './features/tensorflow_model/percent_model/%d.csv' % day,
        index_col=0, parse_dates=True)
    percent = percent[['percent#%d' % i for i in range(_feature_length)]]

    weighted = np.array(y_p_features_df) * percent
    t = pd.DataFrame(index=percent.index)
    t[pd.Timestamp(start_date) + pd.Timedelta('%dd' % (day - 1))] = \
        weighted.sum(axis=1)
    t = t.T
    t.to_csv('./result/predict_part/%d.csv' % day)

    real = int(np.round(weighted.sum().sum()))
    # Print the tuple itself so the output is the same on Python 2 and 3.
    print((day, real))
    return (day, real)
def predict_tf_all(path=None):
    """Run ``predict_tf_once`` for days 1..31 and write the combined output.

    Writes one summary CSV (one ``201610DD`` row per day, single column
    ``predict_power_consumption``) to ``path``, then concatenates the
    per-day files under ``./result/predict_part/`` into
    ``./result/predict_part/<timestamp>.csv``.

    :param path: destination of the summary CSV; when None it defaults to
        ``./result/<timestamp>.csv``
    """
    p = m_Pool(31)
    result_list = p.map(predict_tf_once, range(1, 32))
    p.close()
    p.join()
    print('writing...')

    result_df = pd.DataFrame(index=range(1))
    for day, result in result_list:
        # Zero-pad the day so the labels read 20161001 .. 20161031.
        result_df['201610' + '%02d' % day] = result
    result_df = result_df.T
    result_df.columns = ['predict_power_consumption']

    # The timestamp is needed for the combined part file below even when
    # the caller supplies ``path``; it used to be defined only in the
    # ``path is None`` branch, which raised NameError otherwise.
    date = str(pd.Timestamp(time.ctime())).replace(' ', '_').replace(':', '_')
    if path is None:
        path = './result/' + date + '.csv'
    result_df.to_csv(path, index_label='predict_date')

    # ``read_csv(index_col=0, parse_dates=True)`` is the documented
    # replacement for the removed ``DataFrame.from_csv``.
    parts = [
        pd.read_csv('./result/predict_part/%d.csv' % day,
                    index_col=0, parse_dates=True)
        for day in range(1, 32)
    ]
    t = pd.concat(parts)
    t.to_csv('./result/predict_part/' + date + '.csv')
def test_write_points_from_dataframe(self):
    """A two-row frame is posted as two line-protocol records, with and
    without an explicit ``tags=None``."""
    epoch = pd.Timestamp('1970-01-01 00:00+00:00')
    rows = [["1", 1, 1.0], ["2", 2, 2.0]]
    frame = pd.DataFrame(
        data=rows,
        index=[epoch, epoch + timedelta(hours=1)],
        columns=["column_one", "column_two", "column_three"],
    )
    expected_body = (
        b"foo column_one=\"1\",column_two=1i,column_three=1.0 0\n"
        b"foo column_one=\"2\",column_two=2i,column_three=2.0 "
        b"3600000000000\n"
    )

    with requests_mock.Mocker() as mocker:
        mocker.register_uri(requests_mock.POST,
                            "http://localhost:8086/write",
                            status_code=204)
        client = DataFrameClient(database='db')

        client.write_points(frame, 'foo')
        self.assertEqual(mocker.last_request.body, expected_body)

        client.write_points(frame, 'foo', tags=None)
        self.assertEqual(mocker.last_request.body, expected_body)
def test_write_points_from_dataframe(self):
    """A two-row frame is posted as one JSON series with a ``time`` column."""
    epoch = pd.Timestamp('1970-01-01 00:00+00:00')
    column_names = ["column_one", "column_two", "column_three"]
    frame = pd.DataFrame(
        data=[["1", 1, 1.0], ["2", 2, 2.0]],
        index=[epoch, epoch + timedelta(hours=1)],
        columns=column_names,
    )
    expected_series = {
        "points": [
            ["1", 1, 1.0, 0],
            ["2", 2, 2.0, 3600]
        ],
        "name": "foo",
        "columns": ["column_one", "column_two", "column_three", "time"]
    }

    with requests_mock.Mocker() as mocker:
        mocker.register_uri(requests_mock.POST,
                            "http://localhost:8086/db/db/series")
        client = DataFrameClient(database='db')
        client.write_points({"foo": frame})

        posted = json.loads(mocker.last_request.body)
        self.assertListEqual(posted, [expected_series])
def test_write_points_from_dataframe_with_float_nan(self):
    """A NaN cell in the frame is posted as JSON ``null``."""
    epoch = pd.Timestamp('1970-01-01 00:00+00:00')
    frame = pd.DataFrame(
        data=[[1, float("NaN"), 1.0], [2, 2, 2.0]],
        index=[epoch, epoch + timedelta(hours=1)],
        columns=["column_one", "column_two", "column_three"],
    )
    expected_series = {
        "points": [
            [1, None, 1.0, 0],
            [2, 2, 2.0, 3600]
        ],
        "name": "foo",
        "columns": ["column_one", "column_two", "column_three", "time"]
    }

    with requests_mock.Mocker() as mocker:
        mocker.register_uri(requests_mock.POST,
                            "http://localhost:8086/db/db/series")
        client = DataFrameClient(database='db')
        client.write_points({"foo": frame})

        posted = json.loads(mocker.last_request.body)
        self.assertListEqual(posted, [expected_series])
def test_write_points_from_dataframe_with_numeric_column_names(self):
    """Default integer column labels are posted as strings."""
    epoch = pd.Timestamp('1970-01-01 00:00+00:00')
    # No ``columns`` argument: the frame gets numeric column names.
    frame = pd.DataFrame(
        data=[["1", 1, 1.0], ["2", 2, 2.0]],
        index=[epoch, epoch + timedelta(hours=1)],
    )
    expected_series = {
        "points": [
            ["1", 1, 1.0, 0],
            ["2", 2, 2.0, 3600]
        ],
        "name": "foo",
        "columns": ['0', '1', '2', "time"]
    }

    with requests_mock.Mocker() as mocker:
        mocker.register_uri(requests_mock.POST,
                            "http://localhost:8086/db/db/series")
        client = DataFrameClient(database='db')
        client.write_points({"foo": frame})

        posted = json.loads(mocker.last_request.body)
        self.assertListEqual(posted, [expected_series])
def test_datetime_to_epoch(self):
    """``_datetime_to_epoch`` returns the expected number for the default
    call and for each explicitly requested ``time_precision``."""
    timestamp = pd.Timestamp('2013-01-01 00:00:00.000+00:00')
    client = DataFrameClient('host', 8086, 'username', 'password', 'db')

    # (extra keyword arguments, expected epoch value), checked in order.
    cases = [
        ({}, 1356998400.0),
        ({'time_precision': 's'}, 1356998400.0),
        ({'time_precision': 'm'}, 1356998400000.0),
        ({'time_precision': 'ms'}, 1356998400000.0),
        ({'time_precision': 'u'}, 1356998400000000.0),
    ]
    for extra_kwargs, expected_epoch in cases:
        self.assertEqual(
            client._datetime_to_epoch(timestamp, **extra_kwargs),
            expected_epoch
        )
def test_load_columnar_pandas_all(self, con, all_types_table):
    """Load a two-row frame with one column per type via
    ``load_table_columnar`` (skipped when pandas is unavailable)."""
    pd = pytest.importorskip("pandas")
    import numpy as np

    def zero_one(dtype):
        return np.array([0, 1], dtype=dtype)

    column_order = ['boolean_', 'smallint_', 'int_', 'bigint_', 'float_',
                    'double_', 'varchar_', 'text_', 'time_', 'timestamp_',
                    'date_']
    values = {
        "boolean_": [True, False],
        "smallint_": zero_one(np.int8),
        "int_": zero_one(np.int32),
        "bigint_": zero_one(np.int64),
        "float_": zero_one(np.float32),
        "double_": zero_one(np.float64),
        "varchar_": ["a", "b"],
        "text_": ['a', 'b'],
        "time_": [datetime.time(0, 11, 59), datetime.time(13)],
        "timestamp_": [pd.Timestamp("2016"), pd.Timestamp("2017")],
        "date_": [datetime.date(2016, 1, 1), datetime.date(2017, 1, 1)],
    }
    data = pd.DataFrame(values, columns=column_order)
    con.load_table_columnar(all_types_table, data, preserve_index=False)
def test_load_table_creates(self, con, not_a_table):
    """Load a two-row frame into a table name with ``create=True``
    (skipped when pandas is unavailable)."""
    pd = pytest.importorskip("pandas")
    import numpy as np

    def zero_one(dtype):
        return np.array([0, 1], dtype=dtype)

    # ``smallint_cast`` is supplied below but is not listed here.
    column_order = ['boolean_', 'smallint_', 'int_', 'bigint_', 'float_',
                    'double_', 'varchar_', 'text_', 'time_', 'timestamp_',
                    'date_']
    values = {
        "boolean_": [True, False],
        "smallint_cast": zero_one(np.int8),
        "smallint_": zero_one(np.int16),
        "int_": zero_one(np.int32),
        "bigint_": zero_one(np.int64),
        "float_": zero_one(np.float32),
        "double_": zero_one(np.float64),
        "varchar_": ["a", "b"],
        "text_": ['a', 'b'],
        "time_": [datetime.time(0, 11, 59), datetime.time(13)],
        "timestamp_": [pd.Timestamp("2016"), pd.Timestamp("2017")],
        "date_": [datetime.date(2016, 1, 1), datetime.date(2017, 1, 1)],
    }
    data = pd.DataFrame(values, columns=column_order)
    con.load_table(not_a_table, data, create=True)
def _handle_dividend_payable(self, trading_date):
    """Settle dividends whose payable date equals ``trading_date``.

    For every entry of ``self.portfolio._dividend_info`` that is payable
    on ``trading_date``, a positive cash amount is moved from
    ``_dividend_receivable`` to ``_cash``; the entry is then removed.
    """
    settled_ids = []
    portfolio = self.portfolio
    for order_book_id, info in six.iteritems(portfolio._dividend_info):
        series = info.dividend_series_dict
        if pd.Timestamp(trading_date) != pd.Timestamp(series['payable_date']):
            continue

        per_share = series["dividend_cash_before_tax"] / series["round_lot"]
        if per_share > 0 and info.quantity > 0:
            cash_amount = per_share * info.quantity
            portfolio._dividend_receivable -= cash_amount
            portfolio._cash += cash_amount
            # A user log message for the received dividend used to be
            # emitted here; it is currently disabled.
        settled_ids.append(order_book_id)

    # Removal happens after the loop so the dict is not mutated while it
    # is being iterated.
    for order_book_id in settled_ids:
        portfolio._dividend_info.pop(order_book_id, None)
def _handle_dividend_payable(self, trading_date):
    """Settle dividends whose payable date equals ``trading_date``.

    For every entry of ``self.portfolio._dividend_info`` that is payable
    on ``trading_date``, a positive cash amount is moved from
    ``_dividend_receivable`` to ``_cash``; the entry is then removed.
    """
    settled_ids = []
    portfolio = self.portfolio
    for order_book_id, info in six.iteritems(portfolio._dividend_info):
        series = info.dividend_series_dict
        if pd.Timestamp(trading_date) != pd.Timestamp(series['payable_date']):
            continue

        per_share = series["dividend_cash_before_tax"] / series["round_lot"]
        if per_share > 0 and info.quantity > 0:
            cash_amount = per_share * info.quantity
            portfolio._dividend_receivable -= cash_amount
            portfolio._cash += cash_amount
        settled_ids.append(order_book_id)

    # Removal happens after the loop so the dict is not mutated while it
    # is being iterated.
    for order_book_id in settled_ids:
        portfolio._dividend_info.pop(order_book_id, None)
def is_valid_date(self, ignore_none=True):
    """Register a rule that requires the argument to be a valid date.

    The rule accepts ``datetime.date`` / ``pd.Timestamp`` instances and
    strings that ``parse_date`` can parse.  ``None`` is accepted when
    ``ignore_none`` is true.  Anything else raises ``RQInvalidArgument``.

    :param ignore_none: whether a ``None`` value passes the rule
    :return: ``self``, so rules can be chained
    """
    def check_is_valid_date(func_name, value):
        if ignore_none and value is None:
            return None
        if isinstance(value, (datetime.date, pd.Timestamp)):
            return
        if isinstance(value, six.string_types):
            try:
                parse_date(value)
                # A parsable string is valid.  Without this return the
                # check fell through to the raise below and rejected
                # every date string.
                return
            except ValueError:
                raise RQInvalidArgument(
                    _('function {}: invalid {} argument, expect a valid date, got {} (type: {})').format(
                        func_name, self._arg_name, value, type(value)
                    ))

        raise RQInvalidArgument(
            _('function {}: invalid {} argument, expect a valid date, got {} (type: {})').format(
                func_name, self._arg_name, value, type(value)
            ))

    self._rules.append(check_is_valid_date)
    return self
def get_previous_trading_date(date):
    """
    Return the trading date that precedes the given date, as reported by
    the execution context's data proxy.

    :param date: the reference date
    :type date: `str` | `date` | `datetime` | `pandas.Timestamp`

    :return: `datetime.date`

    :example:

    ..  code-block:: python3
        :linenos:

        [In]get_previous_trading_date(date='2016-05-02')
        [Out]
        [datetime.date(2016, 4, 29)]
    """
    data_proxy = ExecutionContext.data_proxy
    return data_proxy.get_previous_trading_date(date)
def get_next_trading_date(date):
    """
    Return the trading date that follows the given date, as reported by
    the execution context's data proxy.

    :param date: the reference date
    :type date: `str` | `date` | `datetime` | `pandas.Timestamp`

    :return: `datetime.date`

    :example:

    ..  code-block:: python3
        :linenos:

        [In]get_next_trading_date(date='2016-05-01')
        [Out]
        [datetime.date(2016, 5, 3)]
    """
    data_proxy = ExecutionContext.data_proxy
    return data_proxy.get_next_trading_date(date)
def is_valid_date(self, ignore_none=True):
    """Register a rule that requires the argument to be a valid date.

    The rule accepts ``datetime.date`` / ``pd.Timestamp`` instances and
    strings that ``parse_date`` can parse.  ``None`` is accepted when
    ``ignore_none`` is true.  Anything else raises ``RQInvalidArgument``.

    :param ignore_none: whether a ``None`` value passes the rule
    :return: ``self``, so rules can be chained
    """
    def check_is_valid_date(func_name, value):
        def reject():
            raise RQInvalidArgument(
                _(u"function {}: invalid {} argument, expect a valid date, got {} (type: {})").format(
                    func_name, self._arg_name, value, type(value)
                ))

        if value is None and ignore_none:
            return None
        if isinstance(value, (datetime.date, pd.Timestamp)):
            return
        if isinstance(value, six.string_types):
            try:
                parse_date(value)
            except ValueError:
                reject()
            else:
                # The string parsed, so it is a valid date.
                return

        reject()

    self._rules.append(check_is_valid_date)
    return self
def get_trading_dates(start_date, end_date):
    """
    Return the trading dates between ``start_date`` and ``end_date``, as
    reported by the current environment's data proxy.

    :param start_date: first date of the range
    :type start_date: `str` | `date` | `datetime` | `pandas.Timestamp`

    :param end_date: last date of the range
    :type end_date: `str` | `date` | `datetime` | `pandas.Timestamp`

    :return: list[`datetime.date`]

    :example:

    ..  code-block:: python3
        :linenos:

        [In]get_trading_dates(start_date='2016-05-05', end_date='20160505')
        [Out]
        [datetime.date(2016, 5, 5)]
    """
    data_proxy = Environment.get_instance().data_proxy
    return data_proxy.get_trading_dates(start_date, end_date)
def get_next_trading_date(date):
    """
    Return the trading date that follows the given date, as reported by
    the current environment's data proxy.

    :param date: the reference date
    :type date: `str` | `date` | `datetime` | `pandas.Timestamp`

    :return: `datetime.date`

    :example:

    ..  code-block:: python3
        :linenos:

        [In]get_next_trading_date(date='2016-05-01')
        [Out]
        [datetime.date(2016, 5, 3)]
    """
    data_proxy = Environment.get_instance().data_proxy
    return data_proxy.get_next_trading_date(date)
def test_date_range_lower_freq():
    """``date_range`` rejects a 3D frequency; ``convert_freq`` of the 1D
    range yields the expected 3D and 1W indexes."""
    first = '2017-09-05 20:00'
    last = '2017-10-23 20:00'
    nyse = mcal.get_calendar("NYSE")
    schedule = nyse.schedule(pd.Timestamp(first, tz='UTC'),
                             pd.Timestamp(last, tz='UTC'))

    # A frequency lower than 1D cannot be requested directly.
    with pytest.raises(ValueError):
        mcal.date_range(schedule, frequency='3D')

    # Instead, build the 1D range and convert it down.
    daily = mcal.date_range(schedule, frequency='1D')
    for target_freq in ('3D', '1W'):
        actual = mcal.convert_freq(daily, target_freq)
        expected = pd.date_range(first, last, freq=target_freq, tz='UTC')
        assert_index_equal(actual, expected)
def test_2016_holidays():
    """None of the listed holidays is a valid CFE day."""
    holidays = (
        "2016-01-01",  # new years
        "2016-01-18",  # mlk
        "2016-02-15",  # presidents
        "2016-03-25",  # good friday
        "2016-05-30",  # mem day
        "2016-07-04",  # independence day
        "2016-09-05",  # labor day
        "2016-11-24",  # thanksgiving day
        "2016-12-26",  # christmas (observed)
        "2017-01-02",  # new years (observed)
    )
    calendar = CFEExchangeCalendar()
    valid = calendar.valid_days('2016-01-01', '2016-12-31')
    for holiday in holidays:
        assert pd.Timestamp(holiday, tz='UTC') not in valid
def test_special_holidays():
    """None of the listed special closures is a valid NYSE day."""
    special_closures = (
        # 9/11: Sept 11, 12, 13, 14 2001
        "9/11/2001", "9/12/2001", "9/13/2001", "9/14/2001",
        # Hurricane Sandy: Oct 29, 30 2012
        "10/29/2012", "10/30/2012",
        # various national days of mourning
        "1/2/2007",    # Gerald Ford
        "6/11/2004",   # Ronald Reagan
        "4/27/1994",   # Richard Nixon
    )
    nyse = NYSEExchangeCalendar()
    good_dates = nyse.valid_days('2001-01-01', '2016-12-31')
    for closure in special_closures:
        # NOTE(review): the lookups are made UTC-aware, the way the CFE
        # holiday test queries ``valid_days``.  A tz-naive Timestamp never
        # matches a tz-aware index, which would make ``not in`` pass
        # vacuously.  Confirm that ``valid_days`` returns a UTC index here.
        assert pd.Timestamp(closure, tz='UTC') not in good_dates
def test_2016_early_closes():
    """Each listed 2016 date is a CME early close ending in hour 12 local."""
    expected_early = (
        "2016-01-18",  # mlk day
        "2016-02-15",  # presidents
        "2016-05-30",  # mem day
        "2016-07-04",  # july 4
        "2016-09-05",  # labor day
        "2016-11-24",  # thanksgiving
    )
    cme = CMEExchangeCalendar()
    schedule = cme.schedule('2016-01-01', '2016-12-31')
    early_close_days = cme.early_closes(schedule).index

    for date_string in expected_early:
        session = pd.Timestamp(date_string, tz='UTC')
        assert session in early_close_days

        close_time = schedule.loc[session].market_close
        assert close_time.tz_convert(cme.tz).hour == 12
def test_clean_dates():
    """``clean_dates`` returns tz-naive midnights for date strings,
    date-time strings, and tz-aware Timestamps with or without a time."""
    expected_start = '2016-12-01'
    expected_end = '2016-12-31'
    inputs = [
        ('2016-12-01', '2016-12-31'),
        ('2016-12-01 12:00', '2016-12-31 12:00'),
        (pd.Timestamp('2016-12-01', tz='America/Chicago'),
         pd.Timestamp('2016-12-31', tz='America/New_York')),
        (pd.Timestamp('2016-12-01 09:31', tz='America/Chicago'),
         pd.Timestamp('2016-12-31 16:00', tz='America/New_York')),
    ]
    for raw_start, raw_end in inputs:
        start, end = clean_dates(raw_start, raw_end)
        assert start == pd.Timestamp(expected_start)
        assert end == pd.Timestamp(expected_end)
def test_schedule_w_times():
    """A calendar built with explicit open/close times reports them and
    produces the expected first and last schedule rows for Dec 2016."""
    cal = FakeCalendar(time(12, 12), time(13, 13))

    assert cal.open_time == time(12, 12)
    assert cal.close_time == time(13, 13)

    results = cal.schedule('2016-12-01', '2016-12-31')
    assert len(results) == 21

    # ``pd.Timestamp`` no longer accepts a ``freq`` argument, and freq
    # does not take part in Timestamp equality, so the expected values
    # are built without it.
    for position, session in ((0, '2016-12-01'), (-1, '2016-12-30')):
        expected = pd.Series(
            {'market_open': pd.Timestamp(session + ' 04:12:00+0000',
                                         tz='UTC'),
             'market_close': pd.Timestamp(session + ' 05:13:00+0000',
                                          tz='UTC')},
            name=pd.Timestamp(session),
            index=['market_open', 'market_close'],
            dtype=object)
        assert_series_equal(results.iloc[position], expected)
def test_special_closes():
    """The 2012-07-03 close is 11:30 local rather than 11:49, whether that
    day is in the middle, at the start or at the end of the schedule."""
    def local_close(wall_clock):
        stamp = pd.Timestamp(wall_clock, tz='Asia/Ulaanbaatar')
        return stamp.tz_convert('UTC')

    cal = FakeCalendar()

    # The day before July 4th closes at 11:30, its neighbours at 11:49.
    week = cal.schedule('2012-07-01', '2012-07-06')
    closes = week['market_close'].tolist()
    assert local_close('2012-07-02 11:49') in closes
    assert local_close('2012-07-03 11:30') in closes
    assert local_close('2012-07-04 11:49') in closes

    # Early close as the first date of the schedule.
    leading = cal.schedule('2012-07-03', '2012-07-04')
    actual = leading['market_close'].tolist()
    expected = [local_close('2012-07-03 11:30'),
                local_close('2012-07-04 11:49')]
    assert actual == expected

    # Early close as the last date of the schedule.
    trailing = cal.schedule('2012-07-02', '2012-07-03')
    actual = trailing['market_close'].tolist()
    expected = [local_close('2012-07-02 11:49'),
                local_close('2012-07-03 11:30')]
    assert actual == expected
def test_special_closes_adhoc():
    """The 2016-12-14 close is 11:40 local rather than 11:49, both inside
    a schedule and as its end date."""
    def local_close(wall_clock):
        stamp = pd.Timestamp(wall_clock, tz='Asia/Ulaanbaatar')
        return stamp.tz_convert('UTC')

    cal = FakeCalendar()

    # 2016-12-14 closes at 11:40; the days around it close at 11:49.
    wide = cal.schedule('2016-12-10', '2016-12-20')
    closes = wide['market_close'].tolist()
    assert local_close('2016-12-13 11:49') in closes
    assert local_close('2016-12-14 11:40') in closes
    assert local_close('2016-12-15 11:49') in closes

    # Same check with the early close as the schedule's end date.
    narrow = cal.schedule('2016-12-13', '2016-12-14')
    closes = narrow['market_close'].tolist()
    assert local_close('2016-12-13 11:49') in closes
    assert local_close('2016-12-14 11:40') in closes
def test_open_at_time():
    """``open_at_time`` for a regular day, an early close, a holiday and
    the last bar with and without ``include_close``."""
    cal = FakeCalendar()
    schedule = cal.schedule('2014-01-01', '2016-12-31')

    regular_day = pd.Timestamp('2014-07-02 03:40', tz='UTC')
    early_close_day = pd.Timestamp('2014-07-03 03:40', tz='UTC')
    holiday = pd.Timestamp('2014-12-25 03:30', tz='UTC')
    last_bar = pd.Timestamp('2016-09-07 11:49', tz='Asia/Ulaanbaatar')

    # regular trading day
    assert cal.open_at_time(schedule, regular_day) is True
    # early close
    assert cal.open_at_time(schedule, early_close_day) is False
    # holiday
    assert cal.open_at_time(schedule, holiday) is False
    # the last bar of the day is closed by default ...
    assert cal.open_at_time(schedule, last_bar) is False
    # ... and open when include_close is True
    assert cal.open_at_time(schedule, last_bar, include_close=True) is True
def test_2016_early_closes():
    """Each listed 2016 date is an ICE early close ending in hour 13 local."""
    expected_early = (
        "2016-01-18",  # mlk
        "2016-02-15",  # presidents
        "2016-05-30",  # mem day
        "2016-07-04",  # independence day
        "2016-09-05",  # labor
        "2016-11-24",  # thanksgiving
    )
    ice = ICEExchangeCalendar()
    schedule = ice.schedule('2016-01-01', '2016-12-31')
    early_closes = ice.early_closes(schedule)

    for date_string in expected_early:
        session = pd.Timestamp(date_string, tz='UTC')
        assert session in early_closes.index

        close_time = schedule.loc[session].market_close
        # all ICE early closes are 1 pm local
        assert close_time.tz_convert(ice.tz).hour == 13
def future_chain(self, root_symbol, as_of_date=None):
    """
    Build the future chain for a root symbol.

    Parameters
    ----------
    root_symbol : str
        The root symbol of a future chain.
    as_of_date : datetime.datetime or pandas.Timestamp or str, optional
        Date at which the chain determination is rooted. I.e. the
        existing contract whose notice date is first after this date is
        the primary contract, etc.  A truthy value is converted with
        ``pd.Timestamp(as_of_date, tz='UTC')``; a falsy value is passed
        on unchanged.

    Returns
    -------
    FutureChain
        The future chain matching the specified parameters.

    Raises
    ------
    RootSymbolNotFound
        If a future chain could not be found for the given root symbol.
    UnsupportedDatetimeFormat
        If converting ``as_of_date`` raises ``ValueError``.
    """
    chain_date = as_of_date
    if chain_date:
        try:
            chain_date = pd.Timestamp(chain_date, tz='UTC')
        except ValueError:
            raise UnsupportedDatetimeFormat(input=as_of_date,
                                            method='future_chain')

    chain_kwargs = dict(
        asset_finder=self.asset_finder,
        get_datetime=self.get_datetime,
        root_symbol=root_symbol,
        as_of_date=chain_date,
    )
    return FutureChain(**chain_kwargs)
def set_symbol_lookup_date(self, dt):
    """
    Store the date used to resolve symbols to their sids (symbols may map
    to different firms or underlying assets at different times).

    ``dt`` is converted with ``pd.Timestamp(dt, tz='UTC')``; a
    ``ValueError`` from that conversion is re-raised as
    ``UnsupportedDatetimeFormat``.
    """
    try:
        lookup_date = pd.Timestamp(dt, tz='UTC')
    except ValueError:
        raise UnsupportedDatetimeFormat(input=dt,
                                        method='set_symbol_lookup_date')
    self._symbol_lookup_date = lookup_date