The following 50 code examples, extracted from open source Python projects, illustrate how to use pandas.datetime().
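Before the project examples, here is a minimal sketch (not taken from any of the projects below) of what pd.datetime itself is: in the pandas versions these examples target, it is simply a re-export of the standard library's datetime.datetime, so it is constructed and used exactly like that class. The alias was deprecated in pandas 1.0 and removed in later releases, so the last lines show the usual replacements.

# Minimal sketch: pd.datetime is an alias for datetime.datetime in older pandas.
import pandas as pd

d = pd.datetime(2016, 1, 1)                            # same as datetime.datetime(2016, 1, 1)
parsed = pd.datetime.strptime('2016-01-01', '%Y-%m-%d')
epoch = pd.datetime.utcfromtimestamp(0)                # 1970-01-01 00:00:00

# Replacements once pd.datetime is deprecated:
from datetime import datetime
d_std = datetime(2016, 1, 1)
d_pd = pd.Timestamp(2016, 1, 1)                        # pandas' own timestamp type
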
def test_yahoo_bars_to_panel_source(self):
    env = TradingEnvironment()
    finder = AssetFinder(env.engine)
    stocks = ['AAPL', 'GE']
    env.write_data(equities_identifiers=stocks)
    start = pd.datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc)
    end = pd.datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc)
    data = factory.load_bars_from_yahoo(stocks=stocks,
                                        indexes={},
                                        start=start,
                                        end=end)
    check_fields = ['sid', 'open', 'high', 'low', 'close',
                    'volume', 'price']

    copy_panel = data.copy()
    sids = finder.map_identifier_index_to_sids(
        data.items, data.major_axis[0]
    )
    copy_panel.items = sids
    source = DataPanelSource(copy_panel)
    for event in source:
        for check_field in check_fields:
            self.assertIn(check_field, event)
        self.assertTrue(isinstance(event['volume'], (integer_types)))
        self.assertTrue(event['sid'] in sids)

def test_load_bars_from_yahoo(self):
    stocks = ['AAPL', 'GE']
    start = pd.datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc)
    end = pd.datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc)
    data = load_bars_from_yahoo(stocks=stocks, start=start, end=end)

    assert data.major_axis[0] == pd.Timestamp('1993-01-04 00:00:00+0000')
    assert data.major_axis[-1] == pd.Timestamp('2001-12-31 00:00:00+0000')
    for stock in stocks:
        assert stock in data.items

    for ohlc in ['open', 'high', 'low', 'close', 'volume', 'price']:
        assert ohlc in data.minor_axis

    np.testing.assert_raises(
        AssertionError, load_bars_from_yahoo, stocks=stocks,
        start=end, end=start
    )

def __init__(self, year, seasons=None, holidays=None):
    if calendar.isleap(year):
        hoy = 8784
    else:
        hoy = 8760
    self.datapath = os.path.join(os.path.dirname(__file__), 'bdew_data')
    self.date_time_index = pd.date_range(
        pd.datetime(year, 1, 1, 0), periods=hoy * 4, freq='15Min')
    if seasons is None:
        self.seasons = {
            'summer1': [5, 15, 9, 14],      # summer: 15.05. to 14.09.
            'transition1': [3, 21, 5, 14],  # transition1: 21.03. to 14.05.
            'transition2': [9, 15, 10, 31], # transition2: 15.09. to 31.10.
            'winter1': [1, 1, 3, 20],       # winter1: 01.01. to 20.03.
            'winter2': [11, 1, 12, 31],     # winter2: 01.11. to 31.12.
        }
    else:
        self.seasons = seasons
    self.year = year
    self.slp_frame = self.all_load_profiles(self.date_time_index,
                                            holidays=holidays)

def run_tick(self, event):
    tick = event.dict['data']
    if self.live_trading:
        now_ticknum = get_tick_num(datetime.datetime.now())
        cur_ticknum = get_tick_num(tick.timestamp)
        if abs(cur_ticknum - now_ticknum) > self.realtime_tick_diff:
            self.logger.warning('the tick timestamp has more than 10sec diff from the system time, inst=%s, ticknum= %s, now_ticknum=%s' % (tick.instID, cur_ticknum, now_ticknum))
    if not self.update_instrument(tick):
        return
    inst = tick.instID
    if inst in self.inst2spread:
        for key in self.inst2spread[inst]:
            self.trade_manager.check_pending_trades(key)
    self.trade_manager.check_pending_trades(inst)
    self.update_min_bar(tick)
    if inst in self.inst2spread:
        for key in self.inst2spread[inst]:
            self.trade_manager.process_trades(key)
    self.trade_manager.process_trades(inst)
    gway = self.inst2gateway[inst]
    if gway.process_flag:
        gway.send_queued_orders()

def setUp(self):
    self.df = pd.DataFrame(np.random.randn(1000, 4),
                           columns=['A', 'B', 'AdmitDTS', 'LastLoadDTS'])
    # generate load date
    self.df['LastLoadDTS'] = pd.datetime(2015, 5, 20)

    # generate datetime objects for admit date
    admit = pd.Series(1000)
    delta = pd.datetime(2015, 5, 20) - pd.datetime(2015, 5, 1)
    int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
    for i in range(1000):
        random_second = randrange(int_delta)
        admit[i] = pd.datetime(2015, 5, 1) + timedelta(seconds=random_second)
    self.df['AdmitDTS'] = admit

    # add nulls
    a = np.random.rand(1000) > .5
    self.df.loc[a, ['A']] = np.nan
    a = np.random.rand(1000) > .75
    self.df.loc[a, ['B']] = np.nan

def test_write_metrics1():
    filename = abspath(join(testdir, 'test_write_metrics1.csv'))
    if isfile(filename):
        os.remove(filename)

    metrics = pd.DataFrame({'metric1': pd.Series([1.], index=[pd.datetime(2016, 1, 1)])})
    pecos.io.write_metrics(filename, metrics)
    assert_true(isfile(filename))

    from_file1 = pd.read_csv(filename)
    assert_equals(from_file1.shape, (1, 2))

    # append another date
    metrics = pd.DataFrame({'metric1': pd.Series([2.], index=[pd.datetime(2016, 1, 2)])})
    pecos.io.write_metrics(filename, metrics)

    from_file2 = pd.read_csv(filename)
    assert_equals(from_file2.shape, (2, 2))

    # append another metric
    metrics = pd.DataFrame({'metric2': pd.Series([3.], index=[pd.datetime(2016, 1, 2)])})
    pecos.io.write_metrics(filename, metrics)

    from_file3 = pd.read_csv(filename)
    assert_equals(from_file3.shape, (2, 3))

def fourier_series(dates, period, series_order):
    """Provides Fourier series components with the specified frequency
    and order.

    Parameters
    ----------
    dates: pd.Series containing timestamps.
    period: Number of days of the period.
    series_order: Number of components.

    Returns
    -------
    Matrix with seasonality features.
    """
    # convert to days since epoch
    t = np.array(
        (dates - pd.datetime(1970, 1, 1))
        .dt.total_seconds()
        .astype(np.float)
    ) / (3600 * 24.)
    return np.column_stack([
        fun((2.0 * (i + 1) * np.pi * t / period))
        for i in range(series_order)
        for fun in (np.sin, np.cos)
    ])

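A small usage sketch for fourier_series() as defined above; the call below and the expected output shape are assumptions based only on the signature and return value, not code from the source project.

# Hypothetical call to fourier_series() above: yearly seasonality with 3 orders.
import pandas as pd

dates = pd.Series(pd.date_range('2015-01-01', periods=10, freq='D'))
features = fourier_series(dates, period=365.25, series_order=3)
print(features.shape)  # (10, 6): one sin and one cos column for each of the 3 orders
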
def test_incorrect_time_axis():
    x = np.random.randn(3, 3, 1000)
    entities = ['entity.{0}'.format(i) for i in range(1000)]
    time = ['time.{0}'.format(i) for i in range(3)]
    var_names = ['var.{0}'.format(i) for i in range(3)]
    p = pd.Panel(x, items=var_names, major_axis=time, minor_axis=entities)
    with pytest.raises(ValueError):
        PanelData(p)
    df = p.swapaxes(1, 2).swapaxes(0, 1).to_frame()
    with pytest.raises(ValueError):
        PanelData(df)

    time = [1, pd.datetime(1960, 1, 1), 'a']
    var_names = ['var.{0}'.format(i) for i in range(3)]
    p = pd.Panel(x, items=var_names, major_axis=time, minor_axis=entities)
    with pytest.raises(ValueError):
        PanelData(p)
    df = p.swapaxes(1, 2).swapaxes(0, 1).to_frame()
    with pytest.raises(ValueError):
        PanelData(df)

def read_fgga_txt(ifile):
    fgga_dateparse = lambda x: pd.datetime.utcfromtimestamp(int(x))
    fgga_names = ['identifier', 'packet_length', 'timestamp', 'ptp_sync',
                  'MFM', 'flight_num', 'CPU_Load', 'USB_disk_space',
                  'ch4', 'co2', 'h2o', 'press_torr', 'temp_c', 'fit_flag',
                  'rda_usec', 'rdb_usec', 'ch4_ppb', 'co2_ppm',
                  'MFC_1_absolute_pressure', 'MFC_1_temperature',
                  'MFC_1volumetic_flow', 'MFC_1mass_flow', 'MFC_1set_point',
                  'V1', 'V2', 'V3', 'V4',
                  'restart_FGGA', 'FGGA_Pump', 'CAL_MFC_1Set_Value']

    df_fgga = pd.read_csv(ifile,
                          names=fgga_names,
                          delimiter=',',
                          parse_dates=[2],
                          date_parser=fgga_dateparse,
                          skiprows=100)  # To be sure to skip the header

    # Using the Valve states for flagging out calibration periods
    # TODO: add time buffer around calibration periods
    df_fgga.loc[df_fgga['V1'] != 0, 'ch4_ppb'] = np.nan
    df_fgga.loc[df_fgga['V2'] != 0, 'co2_ppm'] = np.nan
    df_fgga.loc[df_fgga['V2'] != 0, 'ch4_ppb'] = np.nan

    return df_fgga

def test_medians_for_gaps(self):
    df = pd.DataFrame({
        'date': list(pd.date_range(pd.datetime(2015, 6, 15), pd.datetime(2015, 6, 20))),
        'value': range(6)
    })
    df = df.iloc[[0, 2, 3, 4, 5]]
    guac = GuacMl(df, 'value')
    guac.make_time_series('date', prediction_length=1)
    medians = HistoricalMedians([3], guac.config, guac.logger)
    out = medians.execute(guac.data)

    self.assertTrue(np.isnan(out.df['value_median_3'].iloc[0]))
    self.assertEqual(out.df['value_median_3'].iloc[1], 0)
    self.assertEqual(out.df['value_median_3'].iloc[2], 1)
    self.assertEqual(out.df['value_median_3'].iloc[3], 2.5)
    self.assertEqual(out.df['value_median_3'].iloc[4], 3)

def test_medians_series_and_group_keys_simple(self):
    df = pd.DataFrame({
        'date': list(pd.date_range(pd.datetime(2015, 6, 15), pd.datetime(2015, 6, 20))) +
                list(pd.date_range(pd.datetime(2015, 6, 15), pd.datetime(2015, 6, 20))),
        'series_key': ['a'] * 6 + ['b'] * 6,
        'group_key': ['uneven', 'even'] * 6,
        'value': range(12)
    })
    guac = GuacMl(df, 'value')
    guac.make_time_series('date', prediction_length=1, series_key_cols='series_key')
    medians = HistoricalMedians([2], guac.config, guac.logger, group_keys='group_key')
    out = medians.execute(guac.data)
    out.df = out.df.sort_values(['series_key', 'group_key', 'date'])

    self.assertTrue(np.isnan(out.df['value_median_2_by_group_key'].iloc[0]))
    self.assertEqual(out.df['value_median_2_by_group_key'].iloc[1], 1)
    self.assertEqual(out.df['value_median_2_by_group_key'].iloc[2], 2)
    self.assertTrue(np.isnan(out.df['value_median_2_by_group_key'].iloc[3]))
    self.assertEqual(out.df['value_median_2_by_group_key'].iloc[4], 0)
    self.assertEqual(out.df['value_median_2_by_group_key'].iloc[5], 1)
    self.assertTrue(np.isnan(out.df['value_median_2_by_group_key'].iloc[6]))

def create_simulation_parameters(year=2006, start=None, end=None,
                                 capital_base=float("1.0e5"),
                                 num_days=None,
                                 data_frequency='daily',
                                 emission_rate='daily',
                                 env=None):
    if env is None:
        # Construct a complete environment with reasonable defaults
        env = TradingEnvironment(load=noop_load)
    if start is None:
        start = datetime(year, 1, 1, tzinfo=pytz.utc)
    if end is None:
        if num_days:
            start_index = env.trading_days.searchsorted(start)
            end = env.trading_days[start_index + num_days - 1]
        else:
            end = datetime(year, 12, 31, tzinfo=pytz.utc)

    sim_params = SimulationParameters(
        period_start=start,
        period_end=end,
        capital_base=capital_base,
        data_frequency=data_frequency,
        emission_rate=emission_rate,
        env=env,
    )

    return sim_params

def create_txn(sid, price, amount, datetime):
    txn = Event({
        'sid': sid,
        'amount': amount,
        'dt': datetime,
        'price': price,
        'type': DATASOURCE_TYPE.TRANSACTION,
        'source_id': 'MockTransactionSource'
    })
    return txn

def create_commission(sid, value, datetime):
    txn = Event({
        'dt': datetime,
        'type': DATASOURCE_TYPE.COMMISSION,
        'cost': value,
        'sid': sid,
        'source_id': 'MockCommissionSource'
    })
    return txn

def create_test_df_source(sim_params=None, env=None, bars='daily'):
    if bars == 'daily':
        freq = pd.datetools.BDay()
    elif bars == 'minute':
        freq = pd.datetools.Minute()
    else:
        raise ValueError('%s bars not understood.' % bars)

    if sim_params and bars == 'daily':
        index = sim_params.trading_days
    else:
        if env is None:
            env = TradingEnvironment(load=noop_load)

        start = pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)
        end = pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)

        days = env.days_in_range(start, end)

        if bars == 'daily':
            index = days

        if bars == 'minute':
            index = pd.DatetimeIndex([], freq=freq)
            for day in days:
                day_index = env.market_minutes_for_day(day)
                index = index.append(day_index)

    x = np.arange(1, len(index) + 1)
    df = pd.DataFrame(x, index=index, columns=[0])

    return DataFrameSource(df), df

def create_test_panel_source(sim_params=None, env=None, source_type=None):
    start = sim_params.first_open \
        if sim_params else pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)
    end = sim_params.last_close \
        if sim_params else pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)

    if env is None:
        env = TradingEnvironment(load=noop_load)

    index = env.days_in_range(start, end)

    price = np.arange(0, len(index))
    volume = np.ones(len(index)) * 1000
    arbitrary = np.ones(len(index))

    df = pd.DataFrame({'price': price,
                       'volume': volume,
                       'arbitrary': arbitrary},
                      index=index)
    if source_type:
        df['type'] = source_type

    panel = pd.Panel.from_dict({0: df})

    return DataPanelSource(panel), panel

def load_from_yahoo(indexes=None,
                    stocks=None,
                    start=None,
                    end=None,
                    adjusted=True):
    """
    Loads price data from Yahoo into a dataframe for each of the indicated
    assets.  By default, 'price' is taken from Yahoo's 'Adjusted Close',
    which removes the impact of splits and dividends. If the argument
    'adjusted' is False, then the non-adjusted 'close' field is used instead.

    :param indexes: Financial indexes to load.
    :type indexes: dict
    :param stocks: Stock closing prices to load.
    :type stocks: list
    :param start: Retrieve prices from start date on.
    :type start: datetime
    :param end: Retrieve prices until end date.
    :type end: datetime
    :param adjusted: Adjust the price for splits and dividends.
    :type adjusted: bool
    """
    data = _load_raw_yahoo_data(indexes, stocks, start, end)
    if adjusted:
        close_key = 'Adj Close'
    else:
        close_key = 'Close'
    df = pd.DataFrame({key: d[close_key] for key, d in iteritems(data)})
    df.index = df.index.tz_localize(pytz.utc)

    return df

def setUp(self):
    self.sids = range(90)
    self.env = TradingEnvironment()
    self.env.write_data(equities_identifiers=self.sids)
    self.sim_params = factory.create_simulation_parameters(
        start=datetime(1990, 1, 1, tzinfo=pytz.utc),
        end=datetime(1990, 1, 8, tzinfo=pytz.utc),
        env=self.env,
    )

def setUp(self):
    setup_logger(self)
    start = pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)
    end = pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)
    self.sim_params = factory.create_simulation_parameters(
        start=start, end=end, env=self.env,
    )
    self.sim_params.emission_rate = 'daily'
    self.sim_params.data_frequency = 'minute'
    self.source, self.df = \
        factory.create_test_df_source(sim_params=self.sim_params,
                                      env=self.env,
                                      bars='minute')

def setUp(self):
    setup_logger(self)
    self.sim_params = factory.create_simulation_parameters(
        start=datetime(1990, 1, 1, tzinfo=pytz.utc),
        end=datetime(1990, 1, 8, tzinfo=pytz.utc),
        env=self.env,
    )
    self.source, self.df = \
        factory.create_test_df_source(self.sim_params, self.env)

def parse_veneer_date(self, txt):
    if hasattr(txt, 'strftime'):
        return txt
    return pd.datetime.strptime(txt, '%m/%d/%Y %H:%M:%S')

def read_sdt(fn):
    ts = pd.read_table(fn, sep=' +', engine='python',
                       names=['Year', 'Month', 'Day', 'Val'])
    ts['Date'] = ts.apply(
        lambda row: pd.datetime(int(row.Year), int(row.Month), int(row.Day)),
        axis=1)
    ts = ts.set_index(ts.Date)
    return ts.Val

def add_instrument(self, name):
    self.tick_data[name] = []
    dtypes = [(field, dtype_map[field]) for field in day_data_list]
    self.day_data[name] = data_handler.DynamicRecArray(dtype=dtypes)
    dtypes = [(field, dtype_map[field]) for field in min_data_list]
    self.min_data[name] = {1: data_handler.DynamicRecArray(dtype=dtypes)}
    self.cur_day[name] = dict([(item, 0) for item in day_data_list])
    self.cur_min[name] = dict([(item, 0) for item in min_data_list])
    self.day_data_func[name] = []
    self.min_data_func[name] = {}
    self.cur_min[name]['datetime'] = datetime.datetime.fromordinal(self.scur_day.toordinal())
    self.cur_min[name]['date'] = self.scur_day
    self.cur_day[name]['date'] = self.scur_day

def mkt_data_sod(self, tday):
    for inst in self.instruments:
        self.tick_data[inst] = []
        self.cur_min[inst] = dict([(item, 0) for item in min_data_list])
        self.cur_day[inst] = dict([(item, 0) for item in day_data_list])
        self.cur_day[inst]['date'] = tday
        self.cur_min[inst]['datetime'] = datetime.datetime.fromordinal(tday.toordinal())

def day_switch(self, event):
    newday = event.dict['date']
    if newday <= self.scur_day:
        return
    self.logger.info('switching the trading day from %s to %s, reset tick_id=%s to 0'
                     % (self.scur_day, newday, self.tick_id))
    if not self.eod_flag:
        self.run_eod()
    self.scur_day = newday
    self.tick_id = 0
    self.timer_count = 0
    super(Agent, self).mkt_data_sod(newday)
    self.eod_flag = False
    eod_time = datetime.datetime.combine(newday, datetime.time(15, 20, 0))
    self.put_command(eod_time, self.run_eod)

def update_instrument(self, tick):
    inst = tick.instID
    curr_tick = tick.tick_id
    if (self.instruments[inst].exchange == 'CZCE') and \
            (self.instruments[inst].last_update == tick.tick_id) and \
            ((self.instruments[inst].volume < tick.volume) or
             (self.instruments[inst].ask_vol1 != tick.askVol1) or
             (self.instruments[inst].bid_vol1 != tick.bidVol1)):
        if tick.tick_id % 10 < 5:
            tick.tick_id += 5
            tick.timestamp = tick.timestamp + datetime.timedelta(milliseconds=500)
    self.tick_id = max(curr_tick, self.tick_id)
    self.instruments[inst].up_limit = tick.upLimit
    self.instruments[inst].down_limit = tick.downLimit
    tick.askPrice1 = min(tick.askPrice1, tick.upLimit)
    tick.bidPrice1 = max(tick.bidPrice1, tick.downLimit)
    self.instruments[inst].last_update = curr_tick
    self.instruments[inst].bid_price1 = tick.bidPrice1
    self.instruments[inst].ask_price1 = tick.askPrice1
    self.instruments[inst].mid_price = (tick.askPrice1 + tick.bidPrice1) / 2.0
    if (self.instruments[inst].mid_price > tick.upLimit) or \
            (self.instruments[inst].mid_price < tick.downLimit):
        return False
    self.instruments[inst].bid_vol1 = tick.bidVol1
    self.instruments[inst].ask_vol1 = tick.askVol1
    self.instruments[inst].open_interest = tick.openInterest
    last_volume = self.instruments[inst].volume
    if tick.volume > last_volume:
        self.instruments[inst].price = tick.price
        self.instruments[inst].volume = tick.volume
        self.instruments[inst].last_traded = curr_tick
    if inst in self.inst2spread:
        for spd_key in self.inst2spread[inst]:
            self.spread_data[spd_key].update()
    return True

def run_gway_service(self, gway, service, args):
    if gway in self.gateways:
        gateway = self.gateways[gway]
        svc_func = service
        if hasattr(gateway, svc_func):
            ts = datetime.datetime.now()
            self.put_command(ts, getattr(gateway, svc_func), args)
        else:
            print "no such service = %s for %s" % (service, gway)
    else:
        print "no such a gateway %s" % gway

def check_commands(self, event):
    l = len(self.sched_commands)
    curr_time = datetime.datetime.now()
    i = 0
    while i < l and curr_time >= self.sched_commands[i][0]:
        logging.info(u'exec command:,i=%s,time=%s,command[i][1]=%s'
                     % (i, curr_time, self.sched_commands[i][1].__name__))
        arg = self.sched_commands[i][2]
        self.sched_commands[i][1](**arg)
        i += 1
    if i > 0:
        del self.sched_commands[0:i]

def run_tick(self, event):
    tick = event.dict['data']
    if self.live_trading:
        now_ticknum = get_tick_num(datetime.datetime.now())
        cur_ticknum = get_tick_num(tick.timestamp)
        if abs(cur_ticknum - now_ticknum) > self.realtime_tick_diff:
            self.logger.warning('the tick timestamp has more than 10sec diff from the system time, inst=%s, ticknum= %s, now_ticknum=%s' % (tick.instID, cur_ticknum, now_ticknum))
    if not self.update_instrument(tick):
        return
    self.update_min_bar(tick)
    inst = tick.instID
    for key in self.inst2spread[inst]:
        self.trade_manager.check_pending_trades(key)
    self.trade_manager.check_pending_trades(inst)
    self.trade_manager.process_trades()

def ctp_qry_instruments(self, event):
    dtime = datetime.datetime.now()
    min_id = get_min_id(dtime)
    if min_id < 250:
        gateway = self.type2gateway['CTP']
        gateway.qry_commands.append(gateway.tdApi.qryInstrument)

def load_from_yahoo(indexes=None,
                    stocks=None,
                    start=None,
                    end=None,
                    adjusted=True):
    """
    Loads price data from Yahoo into a dataframe for each of the indicated
    securities.  By default, 'price' is taken from Yahoo's 'Adjusted Close',
    which removes the impact of splits and dividends. If the argument
    'adjusted' is False, then the non-adjusted 'close' field is used instead.

    :param indexes: Financial indexes to load.
    :type indexes: dict
    :param stocks: Stock closing prices to load.
    :type stocks: list
    :param start: Retrieve prices from start date on.
    :type start: datetime
    :param end: Retrieve prices until end date.
    :type end: datetime
    :param adjusted: Adjust the price for splits and dividends.
    :type adjusted: bool
    """
    import ipdb; ipdb.set_trace()  # BREAKPOINT
    data = _load_raw_yahoo_data(indexes, stocks, start, end)
    if adjusted:
        close_key = 'Adj Close'
    else:
        close_key = 'Close'
    df = pd.DataFrame({key: d[close_key] for key, d in iteritems(data)})
    df.index = df.index.tz_localize(pytz.utc)

    return df

def _chunk_to_dataframe(self):

    n = self._current_row_in_chunk_index
    m = self._current_row_in_file_index
    ix = range(m - n, m)
    rslt = pd.DataFrame(index=ix)

    js, jb = 0, 0
    for j in range(self.column_count):

        name = self.column_names[j]

        if self.column_types[j] == b'd':
            rslt[name] = self._byte_chunk[jb, :].view(
                dtype=self.byte_order + 'd')
            rslt[name] = np.asarray(rslt[name], dtype=np.float64)
            if self.convert_dates and (self.column_formats[j] == "MMDDYY"):
                epoch = pd.datetime(1960, 1, 1)
                rslt[name] = epoch + pd.to_timedelta(rslt[name], unit='d')
            jb += 1
        elif self.column_types[j] == b's':
            rslt[name] = self._string_chunk[js, :]
            rslt[name] = rslt[name].apply(lambda x: x.rstrip(b'\x00 '))
            if self.encoding is not None:
                rslt[name] = rslt[name].apply(
                    lambda x: x.decode(encoding=self.encoding))
            if self.blank_missing:
                ii = rslt[name].str.len() == 0
                rslt.loc[ii, name] = np.nan
            js += 1
        else:
            raise ValueError("unknown column type %s" % self.column_types[j])

    return rslt

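The MMDDYY branch above relies on SAS storing dates as day counts from a 1960-01-01 epoch; the following standalone sketch, with made-up sample values, shows just that conversion.

# Standalone sketch of the SAS date conversion used above.
import pandas as pd

sas_days = pd.Series([0.0, 366.0, 20454.0])         # made-up day counts since the SAS epoch
epoch = pd.datetime(1960, 1, 1)
dates = epoch + pd.to_timedelta(sas_days, unit='d')
print(dates.tolist())                               # 1960-01-01, 1961-01-01, 2016-01-01
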
def setUp(self):
    self.df = pd.DataFrame(np.random.randn(1000, 4),
                           columns=['A', 'B', 'AdmitDTS', 'LastLoadDTS'])
    self.df['AdmitDTS'] = pd.datetime(2015, 5, 20)

def setUp(self):
    self.df = pd.DataFrame(np.random.randn(1000, 2),
                           columns=['AdmitDTS', 'LastLoadDTS'])
    # generate load date
    self.df['LastLoadDTS'] = pd.datetime(2015, 5, 20)

    # generate datetime objects for admit date
    admit = pd.Series(1000)
    delta = pd.datetime(2015, 5, 20) - pd.datetime(2015, 5, 1)
    int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
    for i in range(1000):
        random_second = randrange(int_delta)
        admit[i] = pd.datetime(2015, 5, 1) + timedelta(
            seconds=random_second)
    self.df['AdmitDTS'] = admit

def get_nyears_back(raw_data, back=1):
    """For each period in the index, return the rows dated `back` years earlier,
    re-labelled under that period."""
    all_periods = raw_data.index.get_level_values(0).unique()
    l = []
    for period in all_periods:
        ly = pd.datetime(period.year - back, period.month, period.day)
        if ly in all_periods:
            data_ly = raw_data.loc[[ly]].copy()
            data_ly.index = pd.MultiIndex.from_product(
                [[period], data_ly.index.get_level_values(1)],
                names=data_ly.index.names)
            l.append(data_ly)
        else:
            pass
    new_data = pd.concat(l)
    return new_data

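A hypothetical usage sketch for get_nyears_back() above; the (date, code) MultiIndex layout and the column name are assumptions inferred from the calls to index.get_level_values(0) and (1) in the function body, not taken from the source project.

# Hypothetical input: a (date, code) MultiIndex, as the function body implies.
import pandas as pd

periods = [pd.datetime(2016, 12, 31), pd.datetime(2017, 12, 31)]
idx = pd.MultiIndex.from_product([periods, ['000001', '000002']],
                                 names=['date', 'code'])
raw_data = pd.DataFrame({'roe': [1.0, 2.0, 3.0, 4.0]}, index=idx)

# Rows dated 2016-12-31 come back re-labelled under 2017-12-31,
# i.e. "one year back" data aligned to the current period.
print(get_nyears_back(raw_data, back=1))
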
def sort_dividend(divs):
    """Sort dividend records by time and index them by timestamp.

    :param divs:
    :return:
    """
    if len(divs) > 0:
        df = pd.DataFrame(divs)
        df = df.sort_values(by='time')
        df.time = df.time.apply(lambda x: pd.datetime.utcfromtimestamp(x))
        df = df.set_index('time')
        return df

def read_nox(ifile):
    _date = datetime.datetime.strptime(os.path.basename(ifile).split('_')[1], '%y%m%d')
    year = _date.year
    month = _date.month
    day = _date.day

    nox_dateparse = lambda x: pd.datetime(year, month, day) + \
        datetime.timedelta(seconds=int(float(float(x) % 1) * 86400.))

    df_nox = pd.read_csv(ifile, parse_dates=[0], date_parser=nox_dateparse)
    df_nox = df_nox.set_index('TheTime')  # Setting index
    t = df_nox.index.values
    df_nox['timestamp'] = t.astype('datetime64[s]')  # Converting index data type
    df_nox = df_nox[['timestamp', 'no_conc', 'no2_conc', 'nox_conc']]
    df_nox[df_nox < 0] = np.nan
    return df_nox

def __init__(self, df, Q, st, end='', lab='', excs=[0, 0, 0], excf=[0, 0, 0]):
    self.ymd = [datetime.now().year, datetime.now().month, datetime.now().day]
    if end == '':
        end = self.ymd
    if lab == '':
        self.Qlab = 'Discharge'
    else:
        self.Qlab = lab
    self.Qz = df[Q][0]
    self.rec_results = self.recession(df, Q, st, end, excs, excf)

def test_analyze_frequency_happy(self):
    df = pd.DataFrame({
        'date': list(pd.date_range(pd.datetime(2015, 6, 15), pd.datetime(2015, 6, 18))) +
                list(pd.date_range(pd.datetime(2015, 6, 1, 1), pd.datetime(2015, 6, 4, 1))),
        'series_key': ['a'] * 4 + ['b'] * 4
    })
    ts_config = {'date_split_col': 'date', 'series_key_cols': ['series_key']}
    frequency = analyze_frequency(df, ts_config)
    self.assertEqual(frequency, pd.Timedelta(days=1))

def test_analyse_frquency_exception(self):
    df = pd.DataFrame({
        'date': list(pd.date_range(pd.datetime(2015, 6, 15), pd.datetime(2015, 6, 18))) +
                list(pd.date_range(pd.datetime(2015, 6, 1, 1), pd.datetime(2015, 6, 4, 1))),
        'series_key': ['a'] * 8
    })
    ts_config = {'date_split_col': 'date', 'series_key_cols': ['series_key']}
    with self.assertRaises(ValueError):
        analyze_frequency(df, ts_config)

def test_medians_no_series_keys(self):
    guac = test_util.load_dataset('bike_sharing', target='count')
    guac.make_time_series('datetime', prediction_length=1, frequency=pd.DateOffset(hours=1))
    medians = HistoricalMedians([1], guac.config, guac.logger)
    out = medians.execute(guac.data)
    out.df = out.df.sort_values('datetime')

    self.assertTrue(np.isnan(out.df['count_median_1'].iloc[0]))
    self.assertAlmostEqual(out.df['count_median_1'].iloc[1], 16, delta=1)

def load_data(indexes=None, stockList=None, start=None, end=None, adjusted=True):
    """
    load stocks from Mongo
    """
    assert indexes is not None or stockList is not None, """
must specify stockList or indexes"""

    if start is None:
        start = "1990-01-01"

    if start is not None and end is not None:
        startdate = datetime.datetime.strptime(start, "%Y-%m-%d")
        enddate = datetime.datetime.strptime(end, "%Y-%m-%d")
        assert startdate < enddate, "start date is later than end date."

    data = OrderedDict()

    l = LoadDataCVS(constants.IP, constants.PORT)
    l.Conn()

    if stockList == "hs300" or stockList == "zz500" or stockList == "sz50" or stockList == "all":
        stocks = l.getstocklist(stockList)
    else:
        stocks = stockList
    # print stocks

    if stocks is not None:
        for stock in stocks:
            stkd = l.getstockdaily(stock, start, end)
            data[stock] = stkd

    if indexes is not None:
        for name, ticker in iteritems(indexes):
            logger.info('Loading index: {} ({})'.format(name, ticker))
            stkd = l.getindexdaily(indexes, start, end)
            data[name] = stkd

    panel = pd.Panel(data)
    panel.minor_axis = ['open', 'high', 'low', 'close', 'volume', 'price', 'change', 'code']
    panel.major_axis = panel.major_axis.tz_localize(pytz.utc)

    # close the connection
    l.Close()

    # Adjust data
    if adjusted:
        adj_cols = ['open', 'high', 'low', 'close']
        for ticker in panel.items:
            ratio = (panel[ticker]['price'] / panel[ticker]['close'])
            ratio_filtered = ratio.fillna(0).values
            for col in adj_cols:
                panel[ticker][col] *= ratio_filtered

    return panel

def _load_raw_yahoo_data(indexes=None, stocks=None, start=None, end=None):
    """Load closing prices from yahoo finance.

    :Optional:
        indexes : dict (Default: {'SPX': '^GSPC'})
            Financial indexes to load.
        stocks : list (Default: ['AAPL', 'GE', 'IBM', 'MSFT', 'XOM', 'AA',
                                 'JNJ', 'PEP', 'KO'])
            Stock closing prices to load.
        start : datetime (Default: datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc))
            Retrieve prices from start date on.
        end : datetime (Default: datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc))
            Retrieve prices until end date.

    :Note:
        This is based on code presented in a talk by Wes McKinney:
        http://wesmckinney.com/files/20111017/notebook_output.pdf
    """
    assert indexes is not None or stocks is not None, """
must specify stocks or indexes"""

    if start is None:
        start = pd.datetime(1990, 1, 1, 0, 0, 0, 0, pytz.utc)

    if start is not None and end is not None:
        assert start < end, "start date is later than end date."

    data = OrderedDict()

    if stocks is not None:
        for stock in stocks:
            logger.info('Loading stock: {}'.format(stock))
            stock_pathsafe = stock.replace(os.path.sep, '--')
            cache_filename = "{stock}-{start}-{end}.csv".format(
                stock=stock_pathsafe,
                start=start,
                end=end).replace(':', '-')
            cache_filepath = get_cache_filepath(cache_filename)
            if os.path.exists(cache_filepath):
                stkd = pd.DataFrame.from_csv(cache_filepath)
            else:
                stkd = DataReader(stock, 'yahoo', start, end).sort_index()
                stkd.to_csv(cache_filepath)
            data[stock] = stkd

    if indexes is not None:
        for name, ticker in iteritems(indexes):
            logger.info('Loading index: {} ({})'.format(name, ticker))
            stkd = DataReader(ticker, 'yahoo', start, end).sort_index()
            data[name] = stkd

    return data

def load_bars_from_yahoo(indexes=None,
                         stocks=None,
                         start=None,
                         end=None,
                         adjusted=True):
    """
    Loads data from Yahoo into a panel with the following
    column names for each indicated security:

        - open
        - high
        - low
        - close
        - volume
        - price

    Note that 'price' is Yahoo's 'Adjusted Close', which removes the
    impact of splits and dividends. If the argument 'adjusted' is True,
    then the open, high, low, and close values are adjusted as well.

    :param indexes: Financial indexes to load.
    :type indexes: dict
    :param stocks: Stock closing prices to load.
    :type stocks: list
    :param start: Retrieve prices from start date on.
    :type start: datetime
    :param end: Retrieve prices until end date.
    :type end: datetime
    :param adjusted: Adjust open/high/low/close for splits and dividends.
        The 'price' field is always adjusted.
    :type adjusted: bool
    """
    data = _load_raw_yahoo_data(indexes, stocks, start, end)

    panel = pd.Panel(data)
    # Rename columns
    panel.minor_axis = ['open', 'high', 'low', 'close', 'volume', 'price']
    panel.major_axis = panel.major_axis.tz_localize(pytz.utc)

    # Adjust data
    if adjusted:
        adj_cols = ['open', 'high', 'low', 'close']
        for ticker in panel.items:
            ratio = (panel[ticker]['price'] / panel[ticker]['close'])
            ratio_filtered = ratio.fillna(0).values
            for col in adj_cols:
                panel[ticker][col] *= ratio_filtered

    return panel

def create_bdew_load_profiles(self, dt_index, slp_types, holidays=None):
    """Calculates the hourly electricity load profile in MWh/h of a region.
    """

    # define file path of slp csv data
    file_path = os.path.join(self.datapath, 'selp_series.csv')

    # Read standard load profile series from csv file
    selp_series = pd.read_csv(file_path)
    tmp_df = selp_series

    # Create an index to merge. The year and month will be ignored only the
    # time index is necessary.
    index = pd.date_range(
        pd.datetime(2007, 1, 1, 0), periods=2016, freq='15Min')
    tmp_df.set_index(index, inplace=True)

    # Create empty DataFrame to take the results.
    new_df = pd.DataFrame(index=dt_index, columns=slp_types).fillna(0)
    new_df = add_weekdays2df(new_df, holidays=holidays,
                             holiday_is_sunday=True)

    new_df['hour'] = dt_index.hour + 1
    new_df['minute'] = dt_index.minute
    time_df = new_df[['date', 'hour', 'minute', 'weekday']].copy()

    tmp_df[slp_types] = tmp_df[slp_types].astype(float)

    # Inner join the slps on the time_df to the slp's for a whole year
    tmp_df['hour_of_day'] = tmp_df.index.hour + 1
    tmp_df['minute_of_hour'] = tmp_df.index.minute
    left_cols = ['hour_of_day', 'minute_of_hour', 'weekday']
    right_cols = ['hour', 'minute', 'weekday']
    tmp_df = tmp_df.reset_index()
    tmp_df.pop('index')

    for p in self.seasons.keys():
        a = pd.datetime(self.year, self.seasons[p][0],
                        self.seasons[p][1], 0, 0)
        b = pd.datetime(self.year, self.seasons[p][2],
                        self.seasons[p][3], 23, 59)
        new_df.update(pd.DataFrame.merge(
            tmp_df[tmp_df['period'] == p[:-1]], time_df[a:b],
            left_on=left_cols, right_on=right_cols,
            how='inner', left_index=True).sort_index().drop(
                ['hour_of_day'], 1))

    new_df.drop('date', axis=1, inplace=True)
    return new_df.div(new_df.sum(axis=0), axis=1)

def _load_raw_yahoo_data(indexes=None, stocks=None, start=None, end=None):
    """Load closing prices from yahoo finance.

    :Optional:
        indexes : dict (Default: {'SPX': '^SPY'})
            Financial indexes to load.
        stocks : list (Default: ['AAPL', 'GE', 'IBM', 'MSFT', 'XOM', 'AA',
                                 'JNJ', 'PEP', 'KO'])
            Stock closing prices to load.
        start : datetime (Default: datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc))
            Retrieve prices from start date on.
        end : datetime (Default: datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc))
            Retrieve prices until end date.

    :Note:
        This is based on code presented in a talk by Wes McKinney:
        http://wesmckinney.com/files/20111017/notebook_output.pdf
    """
    assert indexes is not None or stocks is not None, """
must specify stocks or indexes"""

    if start is None:
        start = pd.datetime(1990, 1, 1, 0, 0, 0, 0, pytz.utc)

    if start is not None and end is not None:
        assert start < end, "start date is later than end date."

    data = OrderedDict()

    if stocks is not None:
        for stock in stocks:
            logger.info('Loading stock: {}'.format(stock))
            stock_pathsafe = stock.replace(os.path.sep, '--')
            cache_filename = "{stock}-{start}-{end}.csv".format(
                stock=stock_pathsafe,
                start=start,
                end=end).replace(':', '-')
            cache_filepath = get_cache_filepath(cache_filename)
            if os.path.exists(cache_filepath):
                stkd = pd.DataFrame.from_csv(cache_filepath)
            else:
                stkd = DataReader(stock, 'yahoo', start, end).sort_index()
                stkd.to_csv(cache_filepath)
            data[stock] = stkd

    if indexes is not None:
        for name, ticker in iteritems(indexes):
            logger.info('Loading index: {} ({})'.format(name, ticker))
            stkd = DataReader(ticker, 'yahoo', start, end).sort_index()
            data[name] = stkd

    return data