我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用pandas.DateOffset()。
def round_timestamp_to_sleep_date(timeseries):
    """
    Shift early-morning timestamps back onto the previous calendar day.

    Every element of ``timeseries`` whose ``hour`` is strictly below the
    module-level ``SLEEP_CUTOFF_TIME`` is moved back by one day; all other
    elements are kept as they are.  The time-of-day component is untouched.

    Returns a ``pd.DatetimeIndex`` with one entry per input element, in the
    original order.
    """
    one_day = pd.DateOffset(days=1)
    adjusted = [
        stamp - one_day if stamp.hour < SLEEP_CUTOFF_TIME else stamp
        for stamp in timeseries
    ]
    return pd.DatetimeIndex(adjusted)
def bug_timeseries2seqs(data, timestamps, length=3, T=48):
    """
    Known-buggy variant of the sequence builder, kept with its original
    behaviour.

    The timestamps are split into runs of consecutive frames (one frame is
    ``24 * 60 // T`` minutes) and, inside each run, every window of 3 frames
    becomes a sample ``x`` with the following frame as target ``y``.

    Quirks that are visible in the code and deliberately preserved:
      * ``length`` is accepted but never used; the window is always 3.
      * no closing breakpoint is appended, so the frames after the last
        gap (or all frames, when there is no gap) produce no samples.

    Returns ``(X, Y)`` as numpy arrays.
    """
    # have a bug
    if type(timestamps[0]) is not pd.Timestamp:
        timestamps = string2timestamp(timestamps, T=T)

    frame = pd.DateOffset(minutes=24 * 60 // T)
    # A breakpoint is the position of every frame that does not directly
    # follow its predecessor.
    breakpoints = [0] + [
        pos for pos in range(1, len(timestamps))
        if timestamps[pos - 1] + frame != timestamps[pos]
    ]

    X, Y = [], []
    for seg_start, seg_stop in zip(breakpoints, breakpoints[1:]):
        print('breakpoints: ', seg_start, seg_stop)
        idx = range(seg_start, seg_stop)
        for i in range(len(idx) - 3):
            X.append(np.vstack(data[idx[i:i+3]]))
            Y.append(data[idx[i+3]])

    X = np.asarray(X)
    Y = np.asarray(Y)
    print("X shape: ", X.shape, "Y shape:", Y.shape)
    return X, Y
def periods(today=None):
    """
    Build a labelled series of look-back ``Period`` objects ending today.

    :param today: end date shared by all periods.  When falsy, the current
        timestamp is used.  Passing a fixed value is handy in unit tests.
    :return: a pandas Series indexed by a human-readable label; each value
        is ``Period(start=today - offset, end=today)`` for that label's offset.
    """
    if not today:
        today = pd.Timestamp("today")

    def __f(offset, today):
        return Period(start=today - offset, end=today)

    # Label -> look-back offset, in display order.
    lookbacks = {
        "Two weeks": pd.DateOffset(weeks=2),
        "Month-to-Date": pd.offsets.MonthBegin(),
        "Year-to-Date": pd.offsets.YearBegin(),
        "One Month": pd.DateOffset(months=1),
        "Three Months": pd.DateOffset(months=3),
        "One Year": pd.DateOffset(years=1),
        "Three Years": pd.DateOffset(years=3),
        "Five Years": pd.DateOffset(years=5),
        "Ten Years": pd.DateOffset(years=10),
    }
    offset = pd.Series(lookbacks, dtype=object)
    return offset.apply(__f, today=today)
def plotDailyStatsSleep(stats, columns=None):
    """
    Draw one seaborn point plot per requested daily statistic and show it.

    :param stats: daily statistics, handed to ``_prepareDailyStats``
        (presumably that helper fills the date range and leaves NaN for days
        without measures -- confirm against its definition)
    :param columns: statistic names to plot; defaults to
        ``['sleep_inefficiency', 'sleep_hours']`` when empty or None
    """
    MEASURE_NAME = 'date'
    if not columns:
        columns = ['sleep_inefficiency', 'sleep_hours']

    dataToPlot = _prepareDailyStats(stats, columns)
    _fig, axes = getAxes(2, 1)

    # Keep at most ~10 tick labels: every `step`-th date is shown, the
    # remaining positions get an empty label.
    xTicksDiv = min(10, len(dataToPlot))
    dates = [x.date() for x in dataToPlot.date]
    step = int(len(dates) / xTicksDiv)
    xticks = [day if pos % step == 0 else '' for pos, day in enumerate(dates)]

    for i, c in enumerate(columns):
        g = sns.pointplot(x=MEASURE_NAME, y=NAMES[c], data=dataToPlot, ax=axes[i])
        g.set_xticklabels([])
        g.set_xlabel('')
    # NOTE: as in the original, only the last plotted axis gets the date labels.
    g.set_xticklabels(xticks, rotation=45)
    sns.plt.show()
def get_sim_index(self, tmin, tmax, freq, warmup):
    """Return the simulation date index, including the warmup period.

    Parameters
    ----------
    tmin
        First date of the period of interest; the index starts ``warmup``
        days before it.
    tmax
        Last date of the index.
    freq
        Frequency passed on to ``pd.date_range``.
    warmup
        Length of the warmup period in days.

    Returns
    -------
    The ``pd.date_range`` running from ``tmin - warmup days`` to ``tmax``.

    """
    warmup_start = tmin - pd.DateOffset(days=warmup)
    return pd.date_range(warmup_start, tmax, freq=freq)
def fetch_data(self, state):
    """
    Run the temperature query for ``state`` and return it indexed by date.

    The query text is ``QUERY`` filled with the state's code and ``YEAR``;
    the result is cached under ``'temperature-<code>'``.  Four columns are
    added to the returned frame:

    * ``date``          -- datetime built from the year/month/day columns
                           (becomes the sorted index)
    * ``date_readable`` -- ``date`` formatted as ``YYYY-MM-DD``
    * ``left``/``right`` -- ``date`` minus / plus half a day
    """
    state_code = NAMES_TO_CODES[state]
    query = QUERY % {'state': state_code, 'year': YEAR}
    dataframe = run_query(query, cache_key='temperature-%s' % state_code)

    dataframe['date'] = pd.to_datetime(dataframe[['year', 'month', 'day']])
    dataframe['date_readable'] = dataframe['date'].dt.strftime("%Y-%m-%d")

    # A half day is a fixed duration, so express it as an exact Timedelta
    # rather than a fractional calendar DateOffset.
    half_day = pd.Timedelta(hours=12)
    dataframe['left'] = dataframe['date'] - half_day
    dataframe['right'] = dataframe['date'] + half_day

    dataframe = dataframe.set_index(['date'])
    dataframe.sort_index(inplace=True)
    return dataframe
def set_time_to_maturity(self, time_to_maturity_in_days):
    """
    Set the maturity date to the evaluation date plus a number of days.

    If no evaluation date has been set yet, today's date (midnight) is
    stored as the evaluation date first.  Returns ``self`` so that calls
    can be chained.
    """
    if self.get_evaluation_date() is None:
        today = Timestamp(datetime.now().strftime('%Y-%m-%d'))
        self.set_evaluation_date(today)

    maturity = self.get_evaluation_date() + DateOffset(days=time_to_maturity_in_days)
    self.set_maturity_date(maturity)
    return self
def _is_offset(self, arr_or_obj):
    """
    Tell whether the argument is a DateOffset, or a list-like whose
    elements are all DateOffsets (an empty list-like counts as True).
    """
    if isinstance(arr_or_obj, pd.DateOffset):
        return True
    if not is_list_like(arr_or_obj):
        return False
    for item in arr_or_obj:
        if not isinstance(item, pd.DateOffset):
            return False
    return True
def _reference_dates(self, start_date, end_date):
    """
    Get reference dates for the holiday.

    Return one date per year on the holiday's ``month``/``day``, starting
    in the year prior to ``start_date`` and ending in the year following
    ``end_date``.  The extra years ensure that any offsets applied later
    still yield the holidays within the passed-in dates.

    When the holiday has its own ``start_date`` / ``end_date``, those
    replace the corresponding argument (localized to the timezone of the
    start date in effect at that point).

    Returns a DatetimeIndex carrying the timezone of the effective start
    date.
    """
    # Imported locally so the block does not depend on a file-level import
    # that the original code did not need.
    from pandas import date_range

    if self.start_date is not None:
        start_date = self.start_date.tz_localize(start_date.tz)

    if self.end_date is not None:
        end_date = self.end_date.tz_localize(start_date.tz)

    year_offset = DateOffset(years=1)
    reference_start_date = Timestamp(
        datetime(start_date.year - 1, self.month, self.day))

    reference_end_date = Timestamp(
        datetime(end_date.year + 1, self.month, self.day))

    # Don't process unnecessary holidays.
    # date_range replaces the DatetimeIndex(start=, end=, freq=) constructor
    # form, which pandas no longer accepts.
    dates = date_range(start=reference_start_date,
                       end=reference_end_date,
                       freq=year_offset, tz=start_date.tz)
    return dates
def test_catch_infinite_loop(self):
    """date_range must raise for this offset instead of looping forever."""
    offset = datetools.DateOffset(minute=5)
    start = datetime(2011, 11, 11)
    end = datetime(2011, 11, 12)
    # blow up, don't loop forever
    with self.assertRaises(Exception):
        date_range(start, end, freq=offset)
def test_series_interpolate_intraday(self):
    """
    Time interpolation must give the same values on a 12-hour grid refined
    by 1 hour as on a 12-day grid refined by 1 day.
    """
    # #1698
    def _interpolated(freq, shift):
        # Series on a 4-point grid, re-indexed onto the grid plus its
        # shifted copy and filled by time-weighted interpolation.
        grid = pd.date_range('1/1/2012', periods=4, freq=freq)
        series = pd.Series([0, 12, 24, 36], grid)
        refined = grid.append(grid + shift).sort_values()
        return series.reindex(refined).interpolate(method='time')

    exp = _interpolated('12D', pd.DateOffset(days=1))
    result = _interpolated('12H', pd.DateOffset(hours=1))

    self.assert_numpy_array_equal(result.values, exp.values)
def test_intersection_bug_1708(self):
    """
    Regression test for issue 1708: two indexes offset from each other by
    one hour share no timestamps, so their intersection must be empty.
    """
    from pandas import DateOffset

    index_1 = date_range('1/1/2012', periods=4, freq='12H')
    index_2 = index_1 + DateOffset(hours=1)

    # Use the explicit set operation; the `&` operator on indexes is not a
    # set intersection in current pandas.
    result = index_1.intersection(index_2)
    self.assertEqual(len(result), 0)
def check_complete(self):
    """
    Assert that ``self.pd_timestamps`` has no missing frames.

    Consecutive timestamps must be exactly one frame
    (``24 * 60 // self.T`` minutes) apart.  Every offending pair is printed
    as ``(previous -- next)`` and an AssertionError is raised if there is
    at least one.
    """
    frame = pd.DateOffset(minutes=24 * 60 // self.T)
    stamps = self.pd_timestamps
    missing_timestamps = [
        "(%s -- %s)" % (stamps[k - 1], stamps[k])
        for k in range(1, len(stamps))
        if stamps[k - 1] + frame != stamps[k]
    ]
    for gap in missing_timestamps:
        print(gap)
    assert len(missing_timestamps) == 0
def timeseries2seqs(data, timestamps, length=3, T=48):
    """
    Cut a time series into (window, next frame) training pairs.

    The timestamps are first split into runs of consecutive frames, one
    frame being ``24 * 60 // T`` minutes.  Inside each run, every window of
    ``length`` frames is stacked into a sample ``x`` and the frame right
    after it is the target ``y``; windows never span a gap.

    ``timestamps`` whose first element is not a ``pd.Timestamp`` are
    converted with ``string2timestamp``.  Each detected gap and each run's
    bounds are printed.

    Returns ``(X, Y)`` as numpy arrays.
    """
    raw_ts = copy(timestamps)
    if type(timestamps[0]) is not pd.Timestamp:
        timestamps = string2timestamp(timestamps, T=T)

    frame = pd.DateOffset(minutes=24 * 60 // T)
    breakpoints = [0]
    for pos in range(1, len(timestamps)):
        previous, current = timestamps[pos - 1], timestamps[pos]
        if previous + frame != current:
            print(previous, current, raw_ts[pos - 1], raw_ts[pos])
            breakpoints.append(pos)
    breakpoints.append(len(timestamps))

    X, Y = [], []
    for seg_start, seg_stop in zip(breakpoints, breakpoints[1:]):
        print('breakpoints: ', seg_start, seg_stop)
        idx = range(seg_start, seg_stop)
        for i in range(len(idx) - length):
            X.append(np.vstack(data[idx[i:i + length]]))
            Y.append(data[idx[i + length]])

    X = np.asarray(X)
    Y = np.asarray(Y)
    print("X shape: ", X.shape, "Y shape:", Y.shape)
    return X, Y
def timeseries2seqs_meta(data, timestamps, length=3, T=48):
    """
    Cut a time series into (window, next frame) pairs and also report the
    timestamp of every target.

    The timestamps are split into runs of consecutive frames (one frame is
    ``24 * 60 // T`` minutes).  Inside each run, every window of ``length``
    frames is stacked into ``x`` and the frame right after it is ``y``.

    Returns ``(X, Y, avail_timestamps)`` where ``avail_timestamps`` holds,
    for each sample, the target's timestamp as it appeared in the
    ``timestamps`` argument (before any conversion).
    """
    raw_ts = copy(timestamps)
    if type(timestamps[0]) is not pd.Timestamp:
        timestamps = string2timestamp(timestamps, T=T)

    frame = pd.DateOffset(minutes=24 * 60 // T)
    breakpoints = [0]
    for pos in range(1, len(timestamps)):
        previous, current = timestamps[pos - 1], timestamps[pos]
        if previous + frame != current:
            print(previous, current, raw_ts[pos - 1], raw_ts[pos])
            breakpoints.append(pos)
    breakpoints.append(len(timestamps))

    X, Y, avail_timestamps = [], [], []
    for seg_start, seg_stop in zip(breakpoints, breakpoints[1:]):
        print('breakpoints: ', seg_start, seg_stop)
        idx = range(seg_start, seg_stop)
        for i in range(len(idx) - length):
            target = idx[i + length]
            avail_timestamps.append(raw_ts[target])
            X.append(np.vstack(data[idx[i:i + length]]))
            Y.append(data[target])

    X = np.asarray(X)
    Y = np.asarray(Y)
    print("X shape: ", X.shape, "Y shape:", Y.shape)
    return X, Y, avail_timestamps
def calmar_ratio(self, periods=None, r_f=0):
    """
    Ratio computed over the last three years of the series.

    The series is truncated to start three years before its last index
    entry; the result is ``sortino_ratio`` of that truncated NavSeries.

    :param periods: periods per year; when falsy, the private
        ``__periods_per_year`` attribute of the instance is used
    :param r_f: passed through to ``sortino_ratio`` unchanged
    """
    if not periods:
        periods = self.__periods_per_year

    # truncate the nav
    cutoff = self.index[-1] - pd.DateOffset(years=3)
    recent = self.truncate(before=cutoff)
    return NavSeries(recent).sortino_ratio(periods=periods, r_f=r_f)
def getSleepDate(data):
    """
    Return the date a sleep record is attributed to.

    The decision is based on the first row's ``'datetime'`` value: from
    12:00 up to (but excluding) midnight the record belongs to that same
    calendar day; otherwise (midnight up to noon) it belongs to the
    previous day.

    :param data: DataFrame with a ``'datetime'`` column
    :return: ``datetime.date``
    """
    # Positional access to the first row; works whatever the index labels
    # are and does not rely on the removed `.ix` indexer.
    firstDatetime = data['datetime'].iloc[0]

    # Only a lower bound is needed: an upper bound of 23:59 would wrongly
    # send 23:59:00.000001-23:59:59.999999 to the previous day.
    if firstDatetime.time() >= time(12, 0):
        return firstDatetime.date()
    return (firstDatetime - pd.DateOffset(days=1)).date()
def period_by_hours(x, separation):
    ''' Aggregate x by hour interval.

    The computation would be simple if intervals were not allowed to
    overlap two days.

    x is accessed through ``.dt`` (hour, date); separation is a list of
    hour boundaries, each strictly below 24.  The result is a string label
    per element of x, made of a date, a space and an hour-interval label.

    NOTE: ``separation`` is sorted in place (and gets 24 appended when it
    contains 0), so the caller's list is modified.
    '''
    print(separation)
    assert isinstance(separation, list)
    assert all([sep < 24 for sep in separation])
    separation.sort()

    if 0 in separation:
        # Boundaries start at midnight: closing the bins with 24 makes every
        # interval fall within a single day.
        separation.append(24)
        hour_categ = pd.cut(x.dt.hour, separation, right=False)
        date_categ = x.dt.date
        return date_categ.astype(str) + ' ' + hour_categ.astype(str)
    else:
        hour = x.dt.hour
        hour_categ = pd.cut(hour, separation, right=False).astype(str)
        # Hours before the first boundary or from the last boundary onwards
        # fall outside the bins; give them one wrap-around label
        # '[last, first)'.
        night_categ = '[' + str(separation[-1]) + ', ' + str(separation[0]) + ')'
        hour_categ[(hour < separation[0]) | (hour >= separation[-1])] = night_categ
        assert hour_categ.nunique(dropna=False) == len(separation)
        date_categ = x.dt.date.astype(str)
        # shift by one day for the first hours
        # NOTE(review): the mask uses separation[1] while the wrap-around
        # label above uses separation[0] -- confirm which boundary is intended.
        decale = x.dt.date[x.dt.hour < separation[1]] + pd.DateOffset(days=-1)
        date_categ[x.dt.hour < separation[1]] = decale.astype(str)
        # every date label must be exactly 'YYYY-MM-DD' long
        assert all(date_categ.str.len() == 10)
        return date_categ + ' ' + hour_categ


### 4 - special
def test_medians_no_series_keys(self):
    """
    HistoricalMedians on the hourly bike-sharing series: after sorting by
    datetime, the first median is NaN and the second is 16 (within 1).
    """
    guac = test_util.load_dataset('bike_sharing', target='count')
    hourly = pd.DateOffset(hours=1)
    guac.make_time_series('datetime', prediction_length=1, frequency=hourly)

    medians = HistoricalMedians([1], guac.config, guac.logger)
    out = medians.execute(guac.data)
    out.df = out.df.sort_values('datetime')

    first = out.df['count_median_1'].iloc[0]
    self.assertTrue(np.isnan(first))
    second = out.df['count_median_1'].iloc[1]
    self.assertAlmostEqual(second, 16, delta=1)
def shift_dates(self, h):
    """ Auxiliary function for creating dates for forecasts

    Works on a copy of ``self.index`` with the first ``self.max_lag``
    entries dropped; ``self.index`` itself is not modified.

    Parameters
    ----------
    h : int
        How many steps to forecast

    Returns
    ----------
    A transformed date_index object:

    - pandas DatetimeIndex: every entry moved forward by ``h`` steps, a
      step being the spacing between the last two entries (same length).
    - pandas integer index: the index extended by ``h`` values continuing
      the spacing of the last two entries.
    - other pandas indexes: returned unchanged.
    - non-pandas index: ``h`` values appended, each the last value plus 1.
    """
    date_index = copy.deepcopy(self.index)
    date_index = date_index[self.max_lag:len(date_index)]

    if self.is_pandas is True:
        if isinstance(date_index, pd.DatetimeIndex):
            if h > 0:
                # Use the observed spacing as an exact Timedelta.  Passing a
                # bare number to pd.DateOffset would be read as a count of
                # days, whatever the real frequency is.
                step = date_index[-1] - date_index[-2]
                date_index = date_index + h * step
        elif isinstance(date_index, pd.Index) and pd.api.types.is_integer_dtype(date_index):
            if h > 0:
                values = date_index.values
                step = values[-1] - values[-2]
                future = values[-1] + step * np.arange(1, h + 1)
                date_index = pd.Index(np.append(values, future))
    else:
        for _ in range(h):
            date_index.append(date_index[-1] + 1)

    return date_index
def getDatasForOneRouteForOneDepartureDate(route, departureDate):
    """
    Plot the price history of one route for a single departure date.

    The route's rows are taken from the module-level ``datas`` through
    ``getOneRouteData`` and filtered on column 8 == ``departureDate``.
    Column 9 (the "observed date state") is converted to calendar dates
    counted from 2015-11-09 and used as the x axis; column 12 is plotted on
    the y axis (labelled 'Price in Euro').  Intermediate values are printed
    and the figure is shown; nothing is returned.

    Written so that it runs unchanged on Python 2 and Python 3
    (single-argument ``print(...)`` calls, no ``01`` literal).
    """
    X = getOneRouteData(datas, route)
    minDeparture = np.amin(X[:, 8])
    maxDeparture = np.amax(X[:, 8])
    print(minDeparture)
    print(maxDeparture)

    # get specific departure date datas
    X = X[np.where(X[:, 8] == departureDate)[0], :]

    # get the x values
    xaxis = X[:, 9]  # observed date state
    print(xaxis)
    xaxis = departureDate - 1 - xaxis
    print(xaxis)

    # turn the day counts into calendar dates relative to the start date
    startdate = "20151109"
    origin = pd.to_datetime(startdate)
    xaxis = [origin + pd.DateOffset(days=state) for state in xaxis]
    print(xaxis)

    # get the y values
    yaxis = X[:, 12]

    # major ticks: byweekday=1, every second week
    days = WeekdayLocator(byweekday=1, interval=2)
    monthsFmt = DateFormatter("%b. %d, %Y")

    fig, ax = plt.subplots()
    ax.plot_date(xaxis, yaxis, 'r--')
    ax.plot_date(xaxis, yaxis, 'bo')
    ax.xaxis.set_major_locator(days)
    ax.xaxis.set_major_formatter(monthsFmt)
    ax.autoscale_view()
    ax.grid(True)
    plt.xlabel('Date')
    plt.ylabel('Price in Euro')

    fig.autofmt_xdate()
    plt.show()
def __init__(self, name, year=None, month=None, day=None, offset=None,
             observance=None, start_date=None, end_date=None,
             days_of_week=None):
    """
    Parameters
    ----------
    name : str
        Name of the holiday, defaults to class name
    year, month, day : optional
        Stored as given on the instance.
    offset : array of pandas.tseries.offsets or
            class from pandas.tseries.offsets
        computes offset from date
    observance : function
        computes when holiday is given a pandas Timestamp
    start_date, end_date : optional
        Converted to Timestamp when not None.
    days_of_week : tuple, optional
        provide a tuple of days e.g (0,1,2,3,) for Monday Through Thursday
        Monday=0,..,Sunday=6

    Raises
    ------
    NotImplementedError
        If both ``offset`` and ``observance`` are given.

    Examples
    --------
    >>> from pandas.tseries.holiday import Holiday, nearest_workday
    >>> from pandas import DateOffset
    >>> from dateutil.relativedelta import MO
    >>> USMemorialDay = Holiday('MemorialDay', month=5, day=24,
    ...                         offset=DateOffset(weekday=MO(1)))
    >>> USLaborDay = Holiday('Labor Day', month=9, day=1,
    ...                      offset=DateOffset(weekday=MO(1)))
    >>> July3rd = Holiday('July 3rd', month=7, day=3)
    >>> NewYears = Holiday('New Years Day', month=1, day=1,
    ...                    observance=nearest_workday)
    >>> July3rd = Holiday('July 3rd', month=7, day=3,
    ...                   days_of_week=(0, 1, 2, 3))
    """
    if not (offset is None or observance is None):
        raise NotImplementedError("Cannot use both offset and observance.")

    # Normalise the optional validity bounds to Timestamps.
    if start_date is not None:
        start_date = Timestamp(start_date)
    if end_date is not None:
        end_date = Timestamp(end_date)

    self.name = name
    self.year = year
    self.month = month
    self.day = day
    self.offset = offset
    self.start_date = start_date
    self.end_date = end_date
    self.observance = observance

    assert (days_of_week is None or type(days_of_week) == tuple)
    self.days_of_week = days_of_week
def create_dataset(self, len_closeness=3, len_trend=3, TrendInterval=7, len_period=3, PeriodInterval=1):
    """current version

    Build closeness / period / trend samples for every timestamp whose
    required history is available.

    For a target frame ``i`` the dependencies are, counted in frames
    (one frame = ``24 * 60 // self.T`` minutes) before the target:

    * closeness: 1 .. len_closeness
    * period:    PeriodInterval * self.T * j  for j = 1 .. len_period
    * trend:     TrendInterval * self.T * j   for j = 1 .. len_trend

    A target is skipped as soon as ``self.check_it`` returns False for one
    of the three groups.  Matrices come from ``self.get_matrix`` and are
    stacked per group.

    Returns ``(XC, XP, XT, Y, timestamps_Y)``; the first four are numpy
    arrays, the last is the list of ``self.timestamps`` entries of the
    kept targets.  A group whose length argument is 0 yields an empty array.
    """
    # offset_week = pd.DateOffset(days=7)
    offset_frame = pd.DateOffset(minutes=24 * 60 // self.T)

    closeness_lags = range(1, len_closeness + 1)
    period_lags = [PeriodInterval * self.T * j for j in range(1, len_period + 1)]
    trend_lags = [TrendInterval * self.T * j for j in range(1, len_trend + 1)]
    depends = [closeness_lags, period_lags, trend_lags]

    XC, XP, XT, Y, timestamps_Y = [], [], [], [], []

    # First index for which the deepest look-back can exist at all.
    first = max(self.T * TrendInterval * len_trend,
                self.T * PeriodInterval * len_period,
                len_closeness)

    for i in range(first, len(self.pd_timestamps)):
        target = self.pd_timestamps[i]

        available = True
        for lags in depends:
            available = self.check_it([target - j * offset_frame for j in lags])
            if available is False:
                break
        if available is False:
            continue

        x_c = [self.get_matrix(self.pd_timestamps[i] - j * offset_frame) for j in closeness_lags]
        x_p = [self.get_matrix(self.pd_timestamps[i] - j * offset_frame) for j in period_lags]
        x_t = [self.get_matrix(self.pd_timestamps[i] - j * offset_frame) for j in trend_lags]
        y = self.get_matrix(self.pd_timestamps[i])

        if len_closeness > 0:
            XC.append(np.vstack(x_c))
        if len_period > 0:
            XP.append(np.vstack(x_p))
        if len_trend > 0:
            XT.append(np.vstack(x_t))
        Y.append(y)
        timestamps_Y.append(self.timestamps[i])

    XC = np.asarray(XC)
    XP = np.asarray(XP)
    XT = np.asarray(XT)
    Y = np.asarray(Y)
    print("XC shape: ", XC.shape, "XP shape: ", XP.shape, "XT shape: ", XT.shape, "Y shape:", Y.shape)
    return XC, XP, XT, Y, timestamps_Y
def timeseries2seqs_peroid_trend(data, timestamps, length=3, T=48, peroid=pd.DateOffset(days=7), peroid_len=2):
    """
    Cut a time series into samples made of periodic lags plus a recent
    window, each paired with the next frame.

    The timestamps are split into runs of consecutive frames (one frame is
    ``24 * 60 // T`` minutes).  For every window of ``length`` frames inside
    a run, the target is the frame right after the window.  The sample is
    the stack of

    * the frames ``1 .. peroid_len`` periods (``peroid``) before the target,
      looked up anywhere in the series, followed by
    * the ``length`` frames of the window.

    Targets for which one of the periodic frames does not exist are
    skipped.  Each detected gap and each run's bounds are printed.

    Returns ``(X, Y)`` as numpy arrays.
    """
    raw_ts = copy(timestamps)
    if type(timestamps[0]) is not pd.Timestamp:
        timestamps = string2timestamp(timestamps, T=T)

    # timestamps index
    timestamp_idx = {stamp: pos for pos, stamp in enumerate(timestamps)}

    offset = pd.DateOffset(minutes=24 * 60 // T)
    breakpoints = [0]
    for i in range(1, len(timestamps)):
        if timestamps[i - 1] + offset != timestamps[i]:
            print(timestamps[i - 1], timestamps[i], raw_ts[i - 1], raw_ts[i])
            breakpoints.append(i)
    breakpoints.append(len(timestamps))

    X = []
    Y = []
    for seg_start, seg_stop in zip(breakpoints, breakpoints[1:]):
        print('breakpoints: ', seg_start, seg_stop)
        idx = range(seg_start, seg_stop)
        for i in range(len(idx) - length):
            # `i` is relative to the current run, so go through `idx` to get
            # the absolute position of the target; indexing `timestamps`
            # with `i + length` directly is only right for the first run.
            target_pos = idx[i + length]
            target_timestamp = timestamps[target_pos]

            # period
            legal_idx = []
            for pi in range(1, 1 + peroid_len):
                lagged = target_timestamp - peroid * pi
                if lagged not in timestamp_idx:
                    break
                legal_idx.append(timestamp_idx[lagged])
            if len(legal_idx) != peroid_len:
                continue

            # trend
            legal_idx.extend(idx[i:i + length])

            X.append(np.vstack(data[legal_idx]))
            Y.append(data[target_pos])

    X = np.asarray(X)
    Y = np.asarray(Y)
    print("X shape: ", X.shape, "Y shape:", Y.shape)
    return X, Y
def Blue_Green(Name_NC_ET, Name_NC_P, Name_NC_ETref, Startdate, Enddate, Additional_Months):
    """
    This functions split the evapotranspiration into green and blue evapotranspiration.

    Parameters
    ----------
    Name_NC_ET : str
        Path to the .nc file containing ET data
    Name_NC_P : str
        Path to the .nc file containing P data (including moving average period)
    Name_NC_ETref : str
        Path to the .nc file containing ETref data (including moving average period)
    Startdate : str
        First date of the ET period (any value accepted by pd.Timestamp)
    Enddate : str
        Last date of the ET period (any value accepted by pd.Timestamp)
    Additional_Months : int
        Number of months added before Startdate and after Enddate when
        reading P and ETref; also passed twice to RC.Moving_average as the
        averaging window arguments

    Returns
    -------
    ET_Blue : array
              Array[time, lat, lon] contains Blue Evapotranspiration
    ET_Green : array
              Array[time, lat, lon] contains Green Evapotranspiration
    """
    import wa.General.raster_conversions as RC

    # Define startdate and enddate with moving average
    padding = pd.DateOffset(months=Additional_Months)
    Startdate_Moving_Average = pd.Timestamp(Startdate) - padding
    Enddate_Moving_Average = pd.Timestamp(Enddate) + padding
    Startdate_Moving_Average_String = '%d-%02d-%02d' % (
        Startdate_Moving_Average.year, Startdate_Moving_Average.month, Startdate_Moving_Average.day)
    Enddate_Moving_Average_String = '%d-%02d-%02d' % (
        Enddate_Moving_Average.year, Enddate_Moving_Average.month, Enddate_Moving_Average.day)

    # Extract ETref data from NetCDF file
    ETref = RC.Open_nc_array(Name_NC_ETref, Startdate=Startdate_Moving_Average_String,
                             Enddate=Enddate_Moving_Average_String)

    # Extract P data from NetCDF file
    P = RC.Open_nc_array(Name_NC_P, Startdate=Startdate_Moving_Average_String,
                         Enddate=Enddate_Moving_Average_String)

    # Extract ET data from NetCDF file
    ET = RC.Open_nc_array(Name_NC_ET, Startdate=Startdate, Enddate=Enddate)

    # Apply moving average over the padded window
    Pavg = RC.Moving_average(P, Additional_Months, Additional_Months)
    ETrefavg = RC.Moving_average(ETref, Additional_Months, Additional_Months)

    # Calculate aridity index (avoid division by zero where P averages to 0)
    Pavg[Pavg == 0] = 0.0001
    phi = ETrefavg / Pavg

    # Calculate Budyko
    Budyko = Calc_budyko(phi)

    # Drop the padding months from P.  The end is computed explicitly
    # because a `-Additional_Months` stop would give an empty slice when
    # Additional_Months is 0.
    P_period = P[Additional_Months:len(P) - Additional_Months, :, :]

    # Calculate ETgreen
    ETgreen = np.minimum(Budyko * P_period, ET)

    # Calculate ETblue
    ETblue = ET - ETgreen

    return (ETblue, ETgreen)