我们从Python开源项目中，提取了以下49个代码示例，用于说明如何使用dateutil.relativedelta.relativedelta()。
def rise_rate(df):
    """Return each user's relative change in mean power consumption.

    The "current" window runs from the first day of the month containing
    the latest record date up to that latest date; the "previous" window
    is the same window shifted back by one month.

    Returns a frame with the user id column of the current window and a
    ``user_rise_rate`` column holding ``(current - previous) / previous``.
    Users with no data in the previous window get NaN.
    """
    from dateutil.relativedelta import relativedelta

    date1_2 = df[record_date].map(str2time).max()
    date1_1 = datetime.datetime(date1_2.year, date1_2.month, 1).date()
    one_month = relativedelta(months=1)
    date2_1 = date1_1 - one_month
    date2_2 = date1_2 - one_month

    def _mean_consumption(start, end):
        # Mean consumption per user inside [start, end] as filtered by DataView.
        window = DataView(df).filter_by_record_date2(start, end)
        return window[[user_id, power_consumption]].groupby([user_id], as_index=False).mean()

    grouped1 = _mean_consumption(date1_1, date1_2)
    grouped2 = _mean_consumption(date2_1, date2_2)

    # Align the previous month's value to each user by id.  Pairing the two
    # frames by position silently mixes users whenever the two windows do
    # not contain exactly the same set of users.
    previous = grouped1[user_id].map(grouped2.set_index(user_id)[power_consumption])
    current = grouped1[power_consumption]
    user_rise_rate = (current - previous) / previous
    user_rise_rate.name = 'user_rise_rate'
    return grouped1[[user_id]].join(user_rise_rate)
def get_evaluations(cls, start_date, end_date, dx):
    """Return the evaluations that started within the given date range.

    ``start_date`` and ``end_date`` are rendered with ``str()`` and parsed
    as ``YYYY-MM-DD``; the upper bound is pushed to 23:59:59 of the end
    day.  When ``dx`` is truthy the search is also restricted to that
    diagnosis.
    """
    Evaluation = Pool().get('gnuhealth.patient.evaluation')

    day_format = '%Y-%m-%d'
    window_start = datetime.strptime(str(start_date), day_format)
    window_end = datetime.strptime(str(end_date), day_format)
    # Cover the whole final day, not just its midnight.
    window_end += relativedelta(hours=+23, minutes=+59, seconds=+59)

    domain = [
        ('evaluation_start', '>=', window_start),
        ('evaluation_start', '<=', window_end),
    ]
    if dx:
        domain.append(('diagnosis', '=', dx))

    return Evaluation.search(domain)
def eval(dct, context):
    """Return the current local time adjusted by the fields of ``dct``.

    The keys ``y, M, d, h, m, s, ms`` are passed as relativedelta's
    absolute (singular) arguments and ``dy, dM, dd, dh, dm, ds, dms`` as
    its relative (plural) arguments.  ``context`` is not used.
    """
    now = datetime.datetime.now()
    # (relativedelta keyword, key in dct) -- looked up in this order.
    field_keys = (
        ('year', 'y'), ('month', 'M'), ('day', 'd'),
        ('hour', 'h'), ('minute', 'm'), ('second', 's'),
        ('microsecond', 'ms'),
        ('years', 'dy'), ('months', 'dM'), ('days', 'dd'),
        ('hours', 'dh'), ('minutes', 'dm'), ('seconds', 'ds'),
        ('microseconds', 'dms'),
    )
    adjustments = {keyword: dct[key] for keyword, key in field_keys}
    return now + relativedelta(**adjustments)
def timedur_to_reldelta(timedur: str) -> relativedelta:
    """Convert a duration string into a relativedelta.

    The input is first passed through ``timedur_standardize``.  The first
    run of digits in the result is the amount and its last character is
    the unit: ``s, m, h, d, W, M, Y`` for seconds, minutes, hours, days,
    weeks, months and years.

    Raises:
        KeyError: if the unit character is not one of the supported ones.
        IndexError: if the standardized string contains no digits.
    """
    tdur = timedur_standardize(timedur)
    # Raw string: '\d' in a plain literal is an invalid escape sequence.
    t_num = int(re.findall(r'\d+', tdur)[0])
    t_unit = tdur[-1]
    unit_keyword = {
        's': 'seconds',
        'm': 'minutes',
        'h': 'hours',
        'd': 'days',
        'W': 'weeks',
        'M': 'months',
        'Y': 'years',
    }
    try:
        keyword = unit_keyword[t_unit]
    except KeyError:
        # The lookup failure itself adds nothing to the message below.
        raise KeyError('Support only s, m, h, d, W, M, Y.') from None
    # Build only the delta that is actually requested.
    return relativedelta(**{keyword: t_num})
def test_for_date_range(summaries, make_one, datetime):
    """Ensure we can find many summaries from the past."""
    seven_days_back = datetime.date.today() - relativedelta(days=7)

    older = make_one(created_on=seven_days_back)
    current = make_one()
    other_filter = make_one(filter_id=111)

    # Stored out of chronological order on purpose: current, other, older.
    current, _ = summaries.store(current)
    other_filter, _ = summaries.store(other_filter)
    older, _ = summaries.store(older)

    found = summaries.for_date_range(filter_id=current.filter_id,
                                     start_date=seven_days_back)

    assert 2 == len(found)
    assert found[0] == older
    assert found[1] == current
def check_and_clear():
    """Delete Instance rows that were never 'added' and are older than DBTTL days.

    Opens a scoped session on ``dbengine``, bulk-deletes the matching rows,
    commits, and logs how many rows were removed.  A failing commit is
    logged and rolled back.  The engine is disposed and the scoped session
    removed in every case.
    """
    Session = scoped_session(sessionmaker(bind=dbengine))
    dbsession = Session()
    try:
        log.info("db check_and_clear started with options {} {}".format(MAX_INST, DBTTL))
        # NOTE(review): this only works if `timedelta` is bound to the
        # dateutil `relativedelta` module in this file -- confirm the import.
        oldtime = datetime.datetime.today() - timedelta.relativedelta(days=DBTTL)
        log.info("looking for unregistered records older than {}".format(oldtime))
        # A bulk Query.delete() returns the number of matched rows (an int),
        # so it must not be passed to len().
        removed = dbsession.query(Instance).filter(
            Instance.state != 'added',
            Instance.created_at <= oldtime
        ).delete()
        try:
            dbsession.commit()
        except Exception as e:
            log.error(repr(e))
            dbsession.rollback()
        else:
            log.info("{} records removed from db".format(removed))
    finally:
        # Release the connection pool and the thread-local session even if
        # the query or the logging above raised.
        dbsession.bind.dispose()
        Session.remove()
def get_stock(symbol):
    """Plot roughly one year of prices for ``symbol`` and save it to foo.png.

    Downloads the Quandl WIKI dataset for the symbol from one year ago up
    to the date returned by ``get_last_trading_date()``, then plots column
    4 of every data row against the date in column 0.
    """
    last_year_date = datetime.strftime(datetime.now() - relativedelta(years=1), "%Y-%m-%d")
    date = get_last_trading_date()
    response = requests.get(
        'https://www.quandl.com/api/v3/datasets/WIKI/{}.json?start_date={}&end_date={}'.format(
            symbol, last_year_date, date))
    rows = response.json()['dataset']['data']

    dates = [datetime.strptime(row[0], "%Y-%m-%d") for row in rows]
    # presumably column 4 is the closing price, per the original variable name
    closing = [row[4] for row in rows]

    plt.plot_date(dates, closing, '-')
    plt.title(symbol)
    plt.xlabel('Date')
    # Was `plt.ylable`, which raised AttributeError before the figure was saved.
    plt.ylabel('Stock Price')
    plt.savefig('foo.png')
def test_assignment_estimated_end_date(authorized_client, authorized_user, test_assignment_fixture):
    """Saving a start date fills in the estimated end date (start + 12 months)."""
    # Foreign keys required to build the assignment.
    profile = UserProfile.objects.get(user=authorized_user)
    first_position = Position.objects.first()
    twelve_month_tour = TourOfDuty.objects.filter(months=12).first()

    assignment = Assignment.objects.create(
        user=profile,
        position=first_position,
        tour_of_duty=twelve_month_tour,
    )

    # Nothing is dated until a start date is provided.
    assert assignment.start_date is None
    assert assignment.estimated_end_date is None

    assignment.start_date = "1991-02-01"
    assignment.save()
    # Reload so start_date comes back as stored rather than as the string above.
    assignment.refresh_from_db()

    expected_estimated_end_date = assignment.start_date + relativedelta(months=12)
    assert assignment.estimated_end_date == expected_estimated_end_date
def timedelta(*args, **kwargs):
    """Return a humanized string for a relativedelta built from the arguments.

    Accepts the same constructor arguments as ``dateutil.relativedelta``
    plus an optional ``milliseconds`` keyword.  The result lists the
    non-zero years, months, days, hours, minutes and seconds, separated by
    spaces, e.g. ``"1 year 2 months"``.
    """
    if 'milliseconds' in kwargs:
        msec = kwargs.pop('milliseconds')
        # Carry milliseconds as integer microseconds so relativedelta can
        # normalise them into whole seconds.  Adding them as fractional
        # seconds produced output such as "1 seconds" or "0 second".
        kwargs['microseconds'] = kwargs.get('microseconds', 0) + int(round(float(msec) * 1000))
    delta = relativedelta(*args, **kwargs)
    attrs = ('years', 'months', 'days', 'hours', 'minutes', 'seconds')
    parts = []
    for attr in attrs:
        # '%d' truncates, so decide on the number that is actually shown.
        shown = int(getattr(delta, attr))
        if not shown:
            continue
        # Singular only for exactly one unit, in either direction.
        label = attr[:-1] if abs(shown) == 1 else attr
        parts.append('%d %s' % (shown, label))
    return " ".join(parts)
def create_month_cycle(n_months, start_month=1):
    """Create a repeating array of month numbers 1-12 beginning at ``start_month``.

    Example:
        Create an array of 24 months starting on March (3)

        create_month_cycle(24, start_month=3)
        array([ 3,  4,  5,  6,  7,  8,  9, 10, 11, 12,  1,  2,
                3,  4,  5,  6,  7,  8,  9, 10, 11, 12,  1,  2])

    Parameters:
        n_months: number of entries to generate.
        start_month: month number of the first entry (1 = January).

    Returns:
        numpy array of ``n_months`` month numbers.
    """
    start = start_month - 1
    end = n_months + start_month - 1
    # Month i (0-based, counted from a January) is simply i % 12 + 1.  This
    # needs no date arithmetic and so cannot overflow the datetime year
    # range for long cycles.
    months = [i % 12 + 1 for i in range(start, end)]
    return np.array(months)
def expired_password(function):
    """Decorate a representation method to add an ``expired_password`` flag.

    The wrapped method's result is returned untouched when the channel
    setting ``EXPIRE_PASSWORD_IN`` is 0.  Otherwise ``expired_password`` is
    set to True when the user has no PasswordHistory entry, or when the
    newest entry is older than that many seconds, and to False otherwise.
    """
    @wraps(function)
    def wrapper(self, *args, **kwargs):
        representation = function(self, *args, **kwargs)
        request = self.context["request"]
        expiry_time = int(get_channel_setting(request.channel, "EXPIRE_PASSWORD_IN")[0])
        if not expiry_time:
            return representation

        representation["expired_password"] = False
        latest = PasswordHistory.objects.filter(user=request.user).order_by('-pk').first()
        if not latest:
            # No recorded password at all counts as expired.
            representation["expired_password"] = True
            return representation

        cutoff = timezone.now() - relativedelta(seconds=expiry_time)
        if cutoff > latest.created_date:
            representation["expired_password"] = True
        return representation
    return wrapper
def create(self, request, *args, **kwargs):
    """Create a password recovery token for the e-mail in the request.

    The same success response is returned whether or not a user with that
    address exists.  An existing user is limited to five tokens per hour;
    beyond that a 429 response is returned with ``try_again_in`` set to the
    number of seconds until the next token may be requested.
    """
    email = request.data.get('email', None)
    try:
        user = self.get_queryset().get(email__iexact=email)
    except Exception:
        # Deliberately best-effort: an unknown or ambiguous address is
        # treated as "no user".  Narrowed from a bare `except:` so that
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        user = None

    if user:
        # Allow only 5 requests per hour
        limit = 5
        now = timezone.now()
        to_check = (now - relativedelta(hours=1)).replace(tzinfo=timezone.utc)
        tokens = models.PasswordRecoveryToken.objects.filter(
            user=user, created_date__gte=to_check, channel__slug=request.channel)
        if tokens.count() >= limit:
            # The limit-th newest token leaves the one-hour window first.
            will_release = tokens.order_by('-created_date')[limit - 1].created_date + relativedelta(hours=1)
            # total_seconds() keeps the sign; `.seconds` alone reports a
            # large bogus value for a negative difference.
            seconds = max(0, int((will_release - now).total_seconds()))
            return response.Response(
                {'success': False, 'message': 'Five tokens generated last hour.', 'try_again_in': seconds},
                status=status.HTTP_429_TOO_MANY_REQUESTS)
        models.PasswordRecoveryToken.objects.create(user=user, object_channel=request.channel)

    return response.Response({'success': True, 'message': 'Token requested successfully(if user exists).'})
def test_round_time(self):
    """round_time gives the expected datetime for each sample input."""
    anchor = datetime(2015, 9, 14, 0, 0)
    # (arguments passed to round_time, expected result)
    cases = (
        ((datetime(2015, 1, 1, 6), timedelta(days=1)),
         datetime(2015, 1, 1, 0, 0)),
        ((datetime(2015, 1, 2), relativedelta(months=1)),
         datetime(2015, 1, 1, 0, 0)),
        ((datetime(2015, 9, 16, 0, 0), timedelta(1), anchor),
         datetime(2015, 9, 16, 0, 0)),
        ((datetime(2015, 9, 15, 0, 0), timedelta(1), anchor),
         datetime(2015, 9, 15, 0, 0)),
        ((datetime(2015, 9, 14, 0, 0), timedelta(1), anchor),
         datetime(2015, 9, 14, 0, 0)),
        ((datetime(2015, 9, 13, 0, 0), timedelta(1), anchor),
         datetime(2015, 9, 14, 0, 0)),
    )
    for call_args, expected in cases:
        self.assertEqual(expected, round_time(*call_args))
def calculate_expire_date(self, engine):
    """Set ``self.expire_date`` according to ``self.retention_type``.

    Daily, weekly, monthly and yearly retention add the matching
    ``RuntimeConfig.get_keep_*`` amount to ``self.date_created``.  Any
    other retention type expires ten years after the current UTC time.
    """
    kind = self.retention_type
    if kind == BackupResource.RETENTION_DAILY:
        keep = RuntimeConfig.get_keep_daily(self.entity_resource.tags, engine)
        expires = self.date_created + timedelta(days=keep)
    elif kind == BackupResource.RETENTION_WEEKLY:
        keep = RuntimeConfig.get_keep_weekly(self.entity_resource.tags, engine)
        expires = self.date_created + relativedelta(weeks=keep)
    elif kind == BackupResource.RETENTION_MONTHLY:
        keep = RuntimeConfig.get_keep_monthly(self.entity_resource.tags, engine)
        expires = self.date_created + relativedelta(months=keep)
    elif kind == BackupResource.RETENTION_YEARLY:
        keep = RuntimeConfig.get_keep_yearly(self.entity_resource.tags, engine)
        expires = self.date_created + relativedelta(years=keep)
    else:
        # No retention tag on the backup: keep it (practically) forever.
        expires = datetime.utcnow() + relativedelta(years=10)
    self.expire_date = expires
def tenor_expiry(exch, product, tenor, rolldays = 0, field = 'fwd'):
    """Derive an expiry date from ``tenor`` using per-exchange rules.

    ``tenor`` is a date-like object.  For the exchanges handled here it is
    first moved forward month by month until its month is in an
    exchange-specific list, then shifted with ``workdays.workday`` by an
    offset reduced by ``rolldays``.  For any other exchange ``tenor`` is
    returned unchanged.  ``field`` is not used.
    """
    def roll_forward(day, allowed_months):
        # Step one month at a time until the month is in the allowed list.
        while day.month not in allowed_months:
            day = day + relativedelta(months=1)
        return day

    expiry = tenor
    if exch in ('DCE', 'CZCE'):
        expiry = roll_forward(expiry, [1, 5, 9])
        day_before = expiry - datetime.timedelta(days=1)
        expiry = workdays.workday(day_before, 10 - rolldays, misc.CHN_Holidays)
    elif exch == 'CFFEX':
        # Only products T and TF are moved to a quarter month.
        if product in ['T', 'TF']:
            expiry = roll_forward(expiry, [3, 6, 9, 12])
        wkday = expiry.weekday()
        expiry = expiry + datetime.timedelta(days=13 + (11 - wkday) % 7)
        expiry = workdays.workday(expiry, 1 - rolldays, misc.CHN_Holidays)
    elif exch == 'SHFE':
        if product in ['hc', 'rb']:
            allowed = [1, 5, 10]
        else:
            allowed = [1, 5, 9]
        expiry = roll_forward(expiry, allowed).replace(day=14)
        expiry = workdays.workday(expiry, 1 - rolldays, misc.CHN_Holidays)
    elif exch in ('SGX', 'OTC'):
        next_month = expiry + relativedelta(months=1)
        expiry = workdays.workday(next_month, -1 - rolldays, misc.PLIO_Holidays)
    return expiry
def day_shift(d, roll_rule):
    """Shift date ``d`` by a rule such as ``'3b'``, ``'-1m'`` or ``'2w'``.

    The rule is an integer followed by a one-letter unit:
    ``b`` business days (via ``workdays.workday``), ``d`` calendar days,
    ``w`` weeks, ``m`` months, ``y`` years.

    Raises:
        ValueError: if the amount is not an integer or the unit is not
            supported (this used to surface as an UnboundLocalError).
    """
    # The amount must parse as an int, so the unit can only be the last
    # character; slicing keeps an empty rule from raising IndexError.
    unit = roll_rule[-1:]
    if unit not in ('b', 'm', 'd', 'y', 'w'):
        raise ValueError('Unsupported roll rule: %r' % (roll_rule,))
    amount = int(roll_rule[:-1])
    if unit == 'b':
        return workdays.workday(d, amount)
    if unit == 'm':
        return d + relativedelta(months=amount)
    if unit == 'd':
        return d + datetime.timedelta(days=amount)
    if unit == 'y':
        return d + relativedelta(years=amount)
    return d + relativedelta(weeks=amount)
def get_params_for_link(date, query=None):
    """Build the search parameters for one day.

    The ``date`` entry restricts results to ``>= date`` and ``< date + 1
    day`` (both rendered with ``utils.get_date_str``).  Entries of
    ``query``, if given, override the defaults.

    Parameters:
        date: the day of interest, in any form ``utils.get_date_ymd`` accepts.
        query: optional mapping of parameters to add or override.

    Returns:
        dict of search parameters.
    """
    today = utils.get_date_ymd(date)
    tomorrow = utils.get_date_str(today + relativedelta(days=1))
    today = utils.get_date_str(today)
    search_date = ['>=' + today, '<' + tomorrow]
    params = {
        'product': '',
        'date': search_date,
        'release_channel': '',
        'version': '',
        'signature': '',
        '_facets': ['url', 'user_comments', 'install_time', 'version',
                    'address', 'moz_crash_reason', 'reason', 'build_id',
                    'platform_pretty_version', 'signature',
                    'useragent_locale'],
    }
    # `query` defaults to None instead of a shared mutable `{}`.
    if query:
        params.update(query)
    return params
def get_date(date):
    """Coerce ``date`` to a ``datetime.date``, or return None.

    Accepted inputs:
      * a ``datetime.datetime`` -- reduced to its calendar date;
      * a ``datetime.date`` -- returned as is;
      * a string of the form ``'today-N'`` -- today minus N days;
      * any other string -- parsed with ``utils.get_date_ymd``.

    Falsy input, unsupported types and anything that fails to parse give
    None.
    """
    if not date:
        return None
    try:
        # datetime.datetime is a subclass of datetime.date, so it has to be
        # tested first; otherwise datetimes are returned unconverted.
        if isinstance(date, datetime.datetime):
            return datetime.date(date.year, date.month, date.day)
        if isinstance(date, datetime.date):
            return date
        if isinstance(date, six.string_types):
            if date.startswith('today'):
                parts = date.split('-')
                if len(parts) == 2 and parts[1].isdigit():
                    day = utils.get_date_ymd('today')
                    day -= relativedelta(days=int(parts[1]))
                    return datetime.date(day.year, day.month, day.day)
            day = utils.get_date_ymd(date)
            return datetime.date(day.year, day.month, day.day)
    except Exception:
        # Best-effort by design: unparseable input yields None.  Narrowed
        # from a bare `except:` so KeyboardInterrupt / SystemExit propagate.
        pass
    return None
def month_add(t1, months):
    """Add a possibly fractional number of months to a datetime.

    Note
    ----
    The whole part of ``months`` is added as calendar months.  The decimal
    part is then added as that fraction of the length of the month reached
    after the whole months have been added.

    Parameters
    ----------
    t1 : datetime object
    months : float

    Returns
    -------
    t2 : datetime object
    """
    whole_months, fraction = divmod(months, 1)
    shifted = t1 + relativedelta(months=int(whole_months))
    month_length = calendar.monthrange(shifted.year, shifted.month)[1]
    extra = datetime.timedelta(seconds=month_length * 86400 * fraction)
    return shifted + extra
def get_datetime_at_period_ix(self, ix):
    """
    Get the datetime at a given period.

    :param ix: The index of the period.

    :returns: ``start_datetime`` advanced by ``ix`` periods of the
      configured ``timestep_period_duration``; None if that duration is
      not one of the TimePeriod values handled below.
    """
    period = self.timestep_period_duration
    if period == TimePeriod.millisecond:
        offset = timedelta(milliseconds=ix)
    elif period == TimePeriod.second:
        offset = timedelta(seconds=ix)
    elif period == TimePeriod.minute:
        offset = timedelta(minutes=ix)
    elif period == TimePeriod.hour:
        offset = timedelta(hours=ix)
    elif period == TimePeriod.day:
        offset = relativedelta(days=ix)
    elif period == TimePeriod.week:
        offset = relativedelta(days=ix*7)
    elif period == TimePeriod.month:
        offset = relativedelta(months=ix)
    elif period == TimePeriod.year:
        offset = relativedelta(years=ix)
    else:
        return None
    return self.start_datetime + offset
def test_run_last_month(self):
    """
    Test that running the activity past the end of the loan creates the
    expected number of transactions and that they span exactly the
    loan's duration.
    """
    self.clock.tick()
    self.object.prepare_to_run(self.clock, 61)
    for _ in range(60):
        self.object.run(self.clock, self.gl)
        self.clock.tick()

    # Two transactions per month plus the initial loan amount
    # transaction: 36*2 + 1 = 73.
    tx_count = len(self.gl.transactions)
    self.assertEqual(tx_count, 73)

    # Transactions should only have been created for the 36 months of the
    # loan: measure from the first to the last one.
    first_tx = self.gl.transactions[0]
    last_tx = self.gl.transactions[tx_count - 1]
    span = relativedelta.relativedelta(last_tx.tx_date, first_tx.tx_date)
    self.assertEqual(span.years * 12 + span.months, 36)
def prepare_to_run(self, clock, period_count):
    """
    Prepare the entity for execution.

    Records the period count and the year-end markers, empties the general
    ledger's transactions, prepares every component and resets the
    negative income tax total.

    :param clock: The clock containing the execution start time and
      execution period information.
    :param period_count: The total amount of periods this activity will be
      requested to be run for.
    """
    self.period_count = period_count
    self._exec_year_end_datetime = clock.get_datetime_at_period_ix(period_count)

    run_start = clock.start_datetime
    self._prev_year_end_datetime = run_start
    self._curr_year_end_datetime = run_start + relativedelta(years=1)

    # Remove all the transactions (in place, so the list object is kept).
    del self.gl.transactions[:]

    for component in self.components:
        component.prepare_to_run(clock, period_count)

    self.negative_income_tax_total = 0
def getDateList(dateFrom, dateTo, years=0, months=0, weeks=0, days=0, hours=0, minutes=0, seconds=0):
    """Return a list of dates between ``dateFrom`` and ``dateTo``.

    * ``years`` non-zero: one entry per whole year elapsed, starting at
      ``dateFrom`` and stepping one year at a time.
    * ``months`` non-zero: one entry per whole month elapsed, stepping one
      month at a time.
    * otherwise: entries from ``dateFrom`` in steps of the timedelta built
      from ``weeks``/``days``/``hours``/``minutes``/``seconds``.

    For ``years`` and ``months`` only "non-zero" matters; the magnitude
    does not change the step.
    """
    if years != 0:
        whole_years = relativedelta(dateTo, dateFrom).years
        return [dateFrom + relativedelta(years=i) for i in range(whole_years)]

    if months != 0:
        gap = relativedelta(dateTo, dateFrom)
        # `.months` alone is only the 0-11 remainder; spans longer than a
        # year need the years component folded in.
        whole_months = gap.years * 12 + gap.months
        return [dateFrom + relativedelta(months=i) for i in range(whole_months)]

    step = dt.timedelta(weeks=weeks, days=days, hours=hours,
                        minutes=minutes, seconds=seconds)
    return [d0.astype(object) for d0 in np.arange(dateFrom, dateTo + step, step)]
def getGeneralStats(request):
    """Return headline statistics as a 4-tuple.

    The tuple holds the distinct customer count, the distinct series
    count, the number of distinct (event, customer e-mail) registrations
    that are neither drop-ins nor cancelled, and the time elapsed since
    the earliest event occurrence formatted as
    ``'<y> years, <m> months, <d> days'``.
    """
    student_count = Customer.objects.distinct().count()
    series_count = Series.objects.distinct().count()

    active_registrations = EventRegistration.objects.filter(dropIn=False, cancelled=False)
    registration_count = (
        active_registrations
        .values('event', 'customer__user__email')
        .distinct()
        .count()
    )

    # With no occurrences at all, the elapsed time is measured from "now".
    earliest = EventOccurrence.objects.order_by('startTime').values('startTime').first()
    first_start = earliest['startTime'] if earliest else timezone.now()

    elapsed = relativedelta(timezone.now(), first_start)
    elapsed_text = '%s years, %s months, %s days' % (elapsed.years, elapsed.months, elapsed.days)

    return (student_count, series_count, registration_count, elapsed_text)
def close_confirm(self):
    """Confirm a requested closing of this product and pay it out.

    Returns False (without changing the amount) when the status is not
    STATUS_REQUESTED_CLOSING, or when the bank system does not confirm the
    closing (in which case ``reject`` is called).  Otherwise the amount is
    converted with the configured exchange rate, put on the target
    account, the status is set to STATUS_CLOSED, a transaction record is
    created, and True is returned.
    """
    if self.status != self.STATUS_REQUESTED_CLOSING:
        return False
    bank_confirmation = BankSystemProxy.deposit_closing(self.id)
    if not bank_confirmation:
        self.reject('???????? ??????.')
        return False
    cur_date = datetime.date.today()
    end_date = self.start_date + relativedelta(months=self.duration)
    if self.next_capitalize_term < min(end_date, cur_date):
        # last capitalize, if need
        self._update_amount()
    # Convert the amount using the exchange rate stored for this currency.
    money_in_byn = FinanceSettings.get_instance().exchange_rates[self.currency] * float(self.amount.amount)
    account = Account.objects.get(pk=self.target_account_id)
    account.put_money(money_in_byn)
    self.status = self.STATUS_CLOSED
    self.save()
    Transaction.objects.create(client=self.client, product=self, info='??????? ??????. ?????? ? ??????? {} BYN ?????????? ?? ???? {}'. format(money_in_byn, self.target_account_id), type=Transaction.TYPE_DEPOSIT_CONFIRM_CLOSE)
    return True
def daily_update(self):
    """
    Recalculate (capitalize) the money amount and set the new
    capitalization term.

    If the deposit duration ended more than a week ago, the deposit is
    either prolonged (when ``self.prolongation`` is set) or closed; the
    week is a buffer for the client before prolongation.  Otherwise, if
    the next capitalization term has been reached, the amount is updated.
    The object is saved in every case.
    """
    cur_date = datetime.date.today()
    # End of term plus the 7-day buffer.
    if self.start_date + relativedelta(months=self.duration) + relativedelta(days=7) < cur_date:
        if self.prolongation:
            # Start a new term right where the previous one ended.
            self.start_date += relativedelta(months=self.duration)
            self.next_capitalize_term += Deposit.CAPITALIZATION_TIME_PERIOD[self.capitalization]
            Transaction.objects.create(client=self.client, product=self, info='????????? ??????????????? ????????.', type=Transaction.TYPE_DEPOSIT_PROLONGATION)
        else:
            self.status = Deposit.STATUS_CLOSED
    elif self.next_capitalize_term <= cur_date:
        self._update_amount()
        Transaction.objects.create(client=self.client, product=self, info='?????????? ?????????. ????? ????? ????????: {}.'.format(self.amount), type=Transaction.TYPE_DEPOSIT_CAPITALIZE)
    self.save()
def _check_overdue(self):
    """
    Check for overdue payments and update the credit accordingly.
    Should be called daily.

    The credit is marked STATUS_FINED when its whole term has ended, or
    when the payment term has been reached while the current month's
    percents are still non-zero.  If the payment term has been reached
    with nothing outstanding, the next term is moved one month ahead and
    the monthly figures are reset.  The object is saved in every case.
    """
    today = datetime.date.today()
    term_end = self.start_date + relativedelta(months=self.duration)

    if term_end <= today:
        self.status = Credit.STATUS_FINED
    elif self.next_payment_term <= today:
        if self.current_month_percents == 0:
            self.next_payment_term += relativedelta(months=1)
            self.current_month_percents = self.current_percents() + 0.1
            self.current_month_pay.amount = 0
        else:
            self.status = Credit.STATUS_FINED

    self.save()
def periods_in_range(self, months_per, start, end):
    """Build RiskMetricsPeriod objects for rolling windows of ``months_per`` months.

    Windows start on the first day of ``start``'s month and advance one
    month at a time.  A window is kept only if it ends no later than the
    last day of ``end``'s month.  Returns an empty list when there are no
    algorithm returns.
    """
    one_day = datetime.timedelta(days=1)
    periods = []
    window_start = start.replace(day=1)

    # in edge cases (all sids filtered out, start/end are adjacent)
    # a test will not generate any returns data
    if len(self.algorithm_returns) == 0:
        return periods

    # ensure that we have an end at the end of a calendar month, in case
    # the return series ends mid-month...
    last_allowed_end = end.replace(day=1) + relativedelta(months=1) - one_day

    window_end = window_start + relativedelta(months=months_per) - one_day
    while not window_end > last_allowed_end:
        periods.append(RiskMetricsPeriod(
            start_date=window_start,
            end_date=window_end,
            returns=self.algorithm_returns,
            benchmark_returns=self.benchmark_returns,
            env=self.env,
            algorithm_leverages=self.algorithm_leverages,
        ))
        window_start = window_start + relativedelta(months=1)
        window_end = window_start + relativedelta(months=months_per) - one_day

    return periods
def extract_post_time_and_name(item):
    """Extract the creation time, name and URL of a post from its markup.

    The relative age text inside the ``<span>`` of the item's ``<h2>`` is
    cleaned, converted to relativedelta keyword arguments and subtracted
    from the current UTC time.  Name and URL come from the first element
    in the heading that carries an ``href``.

    Returns a dict with the keys ``created`` (ISO 8601 string), ``name``
    and ``url``.
    """
    heading = item.find("h2")
    age_text = clean_raw_time(heading.find("span").text)
    age_parts = string_time_to_timedelta_dict(age_text)
    created = (datetime.utcnow() - relativedelta(**age_parts)).isoformat()

    link = heading.find(href=True)
    name, url = extract_href_name_and_url(link)

    return {"created": created, "name": name, "url": url}
def is_alive(df, threshold=10):
    """Flag each user as alive (1) or not (0) from recent mean consumption.

    The window runs from the first day of the month two months before the
    latest record date up to that latest date.  A user is alive unless the
    mean power consumption over the window is below ``threshold``.

    Parameters:
        df: frame holding the record date, user id and power consumption
            columns.
        threshold: mean consumption below which a user counts as not
            alive (default 10).

    Returns:
        frame with the user id column and an ``is_alive`` column.
    """
    from dateutil.relativedelta import relativedelta

    date2 = df[record_date].map(str2time).max()
    date1 = datetime.datetime(date2.year, date2.month, 1).date() - relativedelta(months=2)
    grouped = (
        DataView(df)
        .filter_by_record_date2(date1, date2)[[user_id, power_consumption]]
        .groupby([user_id], as_index=False)
        .mean()
    )
    alive = grouped[power_consumption].map(lambda x: 0 if x < threshold else 1)
    alive.name = 'is_alive'
    return grouped.join(alive).drop(power_consumption, axis=1)
def rise_rate(df):
    """Return each user's relative change in mean power consumption.

    The "current" window runs from the first day of the month containing
    the latest record date up to that latest date; the "previous" window
    is the same window shifted back by one month.

    Returns a frame with the user id column of the current window and a
    ``user_rise_rate`` column holding ``(current - previous) / previous``.
    Users with no data in the previous window get NaN.
    """
    from dateutil.relativedelta import relativedelta

    date1_2 = df[record_date].map(str2time).max()
    date1_1 = datetime.datetime(date1_2.year, date1_2.month, 1).date()
    date2_1 = date1_1 - relativedelta(months=1)
    date2_2 = date1_2 - relativedelta(months=1)

    columns = [user_id, power_consumption]
    grouped1 = DataView(df).filter_by_record_date2(date1_1, date1_2)[columns].groupby([user_id], as_index=False).mean()
    grouped2 = DataView(df).filter_by_record_date2(date2_1, date2_2)[columns].groupby([user_id], as_index=False).mean()

    # Look up last month's mean by user id.  Zipping the two frames row by
    # row pairs the wrong users whenever the windows hold different users.
    previous_by_user = grouped2.set_index(user_id)[power_consumption]
    previous = grouped1[user_id].map(previous_by_user)
    user_rise_rate = (grouped1[power_consumption] - previous) / previous
    user_rise_rate.name = 'user_rise_rate'
    return grouped1[[user_id]].join(user_rise_rate)
def is_alive(df, threshold=100):
    """Flag each user as alive (1) or not (0) from recent mean consumption.

    The window runs from the first day of the month two months before the
    latest record date up to that latest date.  A user is alive unless the
    mean power consumption over the window is below ``threshold``.

    Parameters:
        df: frame holding the record date, user id and power consumption
            columns.
        threshold: mean consumption below which a user counts as not
            alive (default 100).

    Returns:
        frame with the user id column and an ``is_alive`` column.
    """
    from dateutil.relativedelta import relativedelta

    date2 = df[record_date].map(str2time).max()
    month_start = datetime.datetime(date2.year, date2.month, 1).date()
    date1 = month_start - relativedelta(months=2)

    recent = DataView(df).filter_by_record_date2(date1, date2)
    grouped = recent[[user_id, power_consumption]].groupby([user_id], as_index=False).mean()

    alive = grouped[power_consumption].map(lambda x: 0 if x < threshold else 1)
    alive.name = 'is_alive'
    return grouped.join(alive).drop(power_consumption, axis=1)
def __init__(self, stdabbr, stdoffset=None, dstabbr=None, dstoffset=None, start=None, end=None):
    """Store the abbreviations, UTC offsets and DST transition rules.

    Offsets are given in seconds.  A missing standard offset is ZERO.  A
    missing DST offset is the standard offset plus one hour when a DST
    abbreviation and a standard offset are both given, ZERO otherwise.
    When a DST abbreviation is given without ``start`` / ``end``, default
    relativedelta rules are used (April 1 / SU(+1) / +2h and
    October 31 / SU(-1) / +1h respectively).
    """
    # Imported lazily on first use.
    global relativedelta
    if not relativedelta:
        from dateutil import relativedelta

    self._std_abbr = stdabbr
    self._dst_abbr = dstabbr

    if stdoffset is None:
        self._std_offset = ZERO
    else:
        self._std_offset = datetime.timedelta(seconds=stdoffset)

    if dstoffset is not None:
        self._dst_offset = datetime.timedelta(seconds=dstoffset)
    elif dstabbr and stdoffset is not None:
        self._dst_offset = self._std_offset + datetime.timedelta(hours=+1)
    else:
        self._dst_offset = ZERO

    use_default_start = dstabbr and start is None
    if use_default_start:
        self._start_delta = relativedelta.relativedelta(
            hours=+2, month=4, day=1, weekday=relativedelta.SU(+1))
    else:
        self._start_delta = start

    use_default_end = dstabbr and end is None
    if use_default_end:
        self._end_delta = relativedelta.relativedelta(
            hours=+1, month=10, day=31, weekday=relativedelta.SU(-1))
    else:
        self._end_delta = end
def _delta(self, x, isend=0):
    """Build the relativedelta for a DST transition described by ``x``.

    ``x`` carries the optional fields ``month``, ``weekday``, ``week``,
    ``day``, ``yday``, ``jyday`` and ``time`` that are read below.
    ``isend`` selects the end-of-DST variant: it changes the default rule
    and shifts the time by the DST/standard offset difference.
    """
    kwargs = {}
    if x.month is not None:
        kwargs["month"] = x.month
        if x.weekday is not None:
            kwargs["weekday"] = relativedelta.weekday(x.weekday, x.week)
            # Anchor the weekday search on day 1 for a positive week count
            # and on day 31 otherwise.
            if x.week > 0:
                kwargs["day"] = 1
            else:
                kwargs["day"] = 31
        elif x.day:
            kwargs["day"] = x.day
    elif x.yday is not None:
        kwargs["yearday"] = x.yday
    elif x.jyday is not None:
        kwargs["nlyearday"] = x.jyday
    if not kwargs:
        # Default is to start on first sunday of april, and end
        # on last sunday of october.
        if not isend:
            kwargs["month"] = 4
            kwargs["day"] = 1
            kwargs["weekday"] = relativedelta.SU(+1)
        else:
            kwargs["month"] = 10
            kwargs["day"] = 31
            kwargs["weekday"] = relativedelta.SU(-1)
    if x.time is not None:
        kwargs["seconds"] = x.time
    else:
        # Default is 2AM.
        kwargs["seconds"] = 7200
    if isend:
        # Convert to standard time, to follow the documented way
        # of working with the extra hour. See the documentation
        # of the tzinfo class.
        delta = self._dst_offset-self._std_offset
        # delta.seconds + delta.days*86400 is the difference in whole seconds.
        kwargs["seconds"] -= delta.seconds+delta.days*86400
    return relativedelta.relativedelta(**kwargs)
def string_to_relativedelta(s, num_pos=0, text_pos=1):
    """Convert text such as ``'3 days'`` or ``'2 weeks'`` to a relativedelta.

    The string is stripped and split on single spaces; the token at
    ``num_pos`` is the amount and the first letter of the token at
    ``text_pos`` selects the unit: D/d days, W/w weeks, M/m months,
    Y/y years.  A months interval with more than two tokens, or the unit
    L/l, means "last day of month": months plus ``days=-1``.

    Returns None when ``s`` is None.
    """
    if s is None:
        return None

    tokens = s.strip().split(' ')
    amount = int(tokens[num_pos])
    unit = tokens[text_pos][0]

    years = months = weeks = days = 0
    if unit in 'Dd':
        days = amount
    elif unit in 'Ww':
        weeks = amount
    elif unit in 'Mm':
        months = amount
        if len(tokens) > 2:
            # last day of month
            days = -1
    elif unit in 'Yy':
        years = amount
    elif unit in 'Ll':
        months = amount
        days = -1

    return relativedelta(years=years, months=months, weeks=weeks, days=days)
def execute(task, user, verbose, filename, frontload=0):
    """Add the task's entries to Todoist once they are inside the lead time.

    While today (plus ``frontload`` days) has reached the due date minus
    the task's ``early`` interval:

    * a one-time task (``interval`` is None) has all of its entries added,
      is removed via ``delete`` and the loop stops;
    * a recurring task has its current entry added, then its due date and
      index are advanced and the loop repeats.

    If a recurring task was advanced, the updated copy is passed to
    ``write``.
    """
    due_date = parse(task['due_date']).date()
    early = string_to_relativedelta(task['early'])
    # Fetched lazily, only when something actually has to be added.
    todoist_project = None
    new_task = task.copy()
    rewrite = False
    interval = string_to_relativedelta(task['interval'])
    while date.today() + relativedelta(days=frontload) >= due_date - early:
        if todoist_project is None:
            todoist_project = user.get_project(task['project'])
        if task['interval'] is None:
            # One time task
            for t in task['tasks']:
                todoist_project.add_task(t, date=task['due_date'], priority=task['priority'])
                if verbose:
                    print('-> Added new task \'{}\' with due date {}.' .format(t, task['due_date']), flush=True)
            delete(task, filename, verbose)
            break
        else:
            # Recurring task
            todoist_project.add_task(new_task['tasks'][new_task['index']], date=new_task['due_date'], priority=task['priority'])
            if verbose:
                print('-> Added new task \'{}\' with due date {}.' .format(new_task['tasks'][new_task['index']], new_task['due_date']), flush=True)
            # incrementing values
            if interval.days == -1:
                # last day of month: the interval carries days=-1, so step
                # one day forward before adding it
                due_date += relativedelta(days=+1)
            due_date += interval
            new_task['due_date'] = due_date.isoformat()
            # Cycle through the task's entries.
            new_task['index'] = (new_task['index'] + 1) % len(new_task['tasks'])
            rewrite = True
    if rewrite:
        write(new_task, filename, verbose)
def divide_int(self, other):
    """Return this time span divided by ``other``.

    Each field of ``self.time_obj`` is multiplied by the matching entry of
    ``self.defined_values`` and divided by ``other``; the normalized
    relativedelta is then passed to ``TypeTime.parse_time_delta``.
    """
    clock = self.time_obj
    flags = self.defined_values
    divided = relativedelta(
        years=clock.tm_year * flags[0] / other,
        months=clock.tm_mon * flags[1] / other,
        days=clock.tm_mday * flags[2] / other,
        hours=clock.tm_hour * flags[3] / other,
        minutes=clock.tm_min * flags[4] / other,
        seconds=clock.tm_sec * flags[5] / other,
    )
    return TypeTime.parse_time_delta(divided.normalized())
def get_rel_delta(self):
    """Return this time as a normalized relativedelta.

    Each field of ``self.time_obj`` is multiplied by the matching entry of
    ``self.defined_values``; one is subtracted from the resulting months
    value.
    """
    clock = self.time_obj
    flags = self.defined_values
    span = relativedelta(
        years=clock.tm_year * flags[0],
        months=clock.tm_mon * flags[1] - 1,
        days=clock.tm_mday * flags[2],
        hours=clock.tm_hour * flags[3],
        minutes=clock.tm_min * flags[4],
        seconds=clock.tm_sec * flags[5],
    )
    return span.normalized()
def inc_time(self, time: Node.clock):
    """Increment a time object by the following amount:
    0 - years
    1 - months
    2 - days
    3 - hours
    4 - minutes
    5 - seconds
    6 - weeks
    7 - 4 years
    8 - decades
    9 - 12 hours"""
    steps = (
        ("years", 1), ("months", 1), ("days", 1), ("hours", 1),
        ("minutes", 1), ("seconds", 1), ("days", 7), ("years", 4),
        ("years", 10), ("hours", 12),
    )
    # One day unless the default was overwritten.
    choice = self.arg if self.overwrote_default else 2
    unit, amount = steps[choice]

    shifted = datetime.datetime(*time.time_obj[:7]) + relativedelta(**{unit: amount})
    result = TypeTime(shifted.timetuple())
    result.defined_values = time.defined_values
    return result
def dec_time(self, time: Node.clock):
    """Decrement a time object by the following amount:
    0 - years
    1 - months
    2 - days
    3 - hours
    4 - minutes
    5 - seconds
    6 - weeks
    7 - 4 years
    8 - decades
    9 - 12 hours"""
    steps = (
        ("years", -1), ("months", -1), ("days", -1), ("hours", -1),
        ("minutes", -1), ("seconds", -1), ("days", -7), ("years", -4),
        ("years", -10), ("hours", -12),
    )
    # One day unless the default was overwritten.
    choice = self.config if self.overwrote_default else 2
    unit, amount = steps[choice]

    shifted = datetime.datetime(*time.time_obj[:7]) + relativedelta(**{unit: amount})
    result = TypeTime(shifted.timetuple())
    result.defined_values = time.defined_values
    return result
def test_get_inactive_users_returns_jobs(self):
    """Test JOB get inactive users returns a list of jobs."""
    stamp_format = '%Y-%m-%dT%H:%M:%S.%f'
    today = datetime.datetime.today()

    month_ago_str = (today + relativedelta(months=-1)).strftime(stamp_format)
    # subtract one year and take care of leap years
    year_ago_str = (today + relativedelta(years=-1, leapdays=1)).strftime(stamp_format)

    user = UserFactory.create()
    user_recent = UserFactory.create()

    # 1 month old contribution
    tr = TaskRunFactory.create(finish_time=month_ago_str)
    # 1 year old contribution
    tr_year = TaskRunFactory.create(finish_time=year_ago_str)
    # User with a contribution from a long time ago
    TaskRunFactory.create(finish_time="2010-08-08T18:23:45.714110", user=user)
    # User with a recent contribution
    TaskRunFactory.create(user=user)

    user = user_repo.get(tr.user_id)

    jobs = list(get_inactive_users_jobs())

    msg = "There should be one job."
    assert len(jobs) == 1, msg

    job = jobs[0]
    args = job['args'][0]
    recipients = args['recipients']
    assert job['queue'] == 'quaterly', job['queue']
    assert len(recipients) == 1
    assert recipients[0] == tr_year.user.email_addr, recipients[0]
    assert "UNSUBSCRIBE" in args['body']
    assert "Update" in args['html']
def acknowledge(self, user, days, reason=None, commit=True):
    """Mark this result as acknowledged by ``user`` for ``days`` days.

    Raises AlreadyAcknowledged when the status is warning or critical and
    the result is already acknowledged.  With ``commit`` the four
    acknowledgement fields are saved.
    """
    is_alerting = self.status in (self.STATUS.warning, self.STATUS.critical)
    if is_alerting and self.is_acknowledged():
        raise AlreadyAcknowledged()

    self.acknowledged_at = timezone.now()
    self.acknowledged_by = user
    self.acknowledged_until = timezone.now() + relativedelta.relativedelta(days=days)
    self.acknowledged_reason = reason or ''

    if not commit:
        return
    self.save(update_fields=[
        'acknowledged_at',
        'acknowledged_by',
        'acknowledged_until',
        'acknowledged_reason',
    ])
def run_checks(self, force=False, slug=None):
    """
    Run every registered check that is due.

    :param force: <bool> force all registered checks to be executed
    :param slug: when given, only the check with this slug is considered
    :return:
    """
    now = timezone.now()
    last_executions = self.get_last_executions()

    for check_class in datawatch.get_all_registered_checks():
        check = check_class()

        # only update a single slug if requested
        if slug and check.slug != slug:
            continue

        # check is not meant to be run periodically
        if not isinstance(check_class.run_every, relativedelta):
            continue

        # shall the check be run again?
        ran_recently = (
            not force
            and check.slug in last_executions
            and now < last_executions[check.slug] + check.run_every
        )
        if not ran_recently:
            # enqueue the check and save execution state
            check.run()
def _get_date_range_of_month(self, date, dtype='str'):
    '''
    Return the first and last day of the month that ``date`` falls in.

    :param date: date string beginning with ``YYYY-MM``
    :param dtype: type of the returned values, 'str' or 'datetime'
    :return: ``(month_start, month_end)`` as ``YYYY-MM-DD`` strings or as
        datetime objects; None for any other ``dtype``
    '''
    first_day = dt.datetime.strptime(date[:7], '%Y-%m')
    # One day before the first day of the following month.
    last_day = first_day + relativedelta(months=1) - dt.timedelta(days=1)

    if dtype == 'datetime':
        return first_day, last_day
    if dtype == 'str':
        day_format = '%Y-%m-%d'
        return first_day.strftime(day_format), last_day.strftime(day_format)
    return None
def _date_range_to_month_list(self):
    '''
    Expand the ``self.start`` .. ``self.end`` range into one date string
    per month.

    :return: list of ``YYYY-MM-DD`` strings, one per month, from the start
        month up to the end date
    '''
    end = dt.datetime.strptime(self.end, '%Y-%m-%d')
    # The first entry uses the start month with the end date's day (the
    # biggest day value), which the cache check in
    # self._is_cache_file_updated relies on.  relativedelta(day=...) clamps
    # to the month's length, so a start month shorter than the end day
    # (e.g. February with day 30) no longer makes strptime raise.
    processing = dt.datetime.strptime(self.start[:7], '%Y-%m') + relativedelta(day=end.day)
    month_list = []
    while processing <= end:
        month_list.append(processing.strftime('%Y-%m-%d'))
        processing = processing + relativedelta(months=1)
    return month_list
def humanizedelta(*args, **kwargs):
    """Return a humanized string for a relativedelta built from the arguments.

    Accepts the same constructor arguments as ``dateutil.relativedelta``.
    The result lists the non-zero years, months, days, hours, minutes and
    seconds, separated by spaces, e.g. ``"1 year 2 months"``.
    """
    delta = relativedelta(*args, **kwargs)
    attrs = ('years', 'months', 'days', 'hours', 'minutes', 'seconds')
    parts = []
    for attr in attrs:
        # '%d' truncates, so decide on the number that is actually shown.
        shown = int(getattr(delta, attr))
        if not shown:
            continue
        # Singular only for exactly one unit, in either direction; the old
        # `value > 1 and plural or singular` printed e.g. "-2 year".
        label = attr[:-1] if abs(shown) == 1 else attr
        parts.append('%d %s' % (shown, label))
    return " ".join(parts)
def patient_age_at_evaluation(self, name):
    """Return the patient's age on the visit date as ``'<y>y <m>m <d>d'``.

    Returns None when the date of birth or the visit date is missing.
    ``name`` is not used.
    """
    if not (self.patient.name.dob and self.visit_date):
        return None
    age = relativedelta(self.visit_date.date(), self.patient.name.dob)
    return '{}y {}m {}d'.format(age.years, age.months, age.days)
def create_fiscalyear(company=None, today=None, config=None):
    "Create a fiscal year for the company on today"
    FiscalYear = Model.get('account.fiscalyear', config=config)
    Sequence = Model.get('ir.sequence', config=config)

    company = company or get_company()
    today = today or datetime.date.today()
    year_label = str(today.year)

    fiscalyear = FiscalYear(name=year_label)
    # Absolute month/day arguments pin the dates to Jan 1 and Dec 31 of
    # today's year.
    fiscalyear.start_date = today + relativedelta(month=1, day=1)
    fiscalyear.end_date = today + relativedelta(month=12, day=31)
    fiscalyear.company = company

    post_move_sequence = Sequence(name=year_label, code='account.move', company=company)
    post_move_sequence.save()
    fiscalyear.post_move_sequence = post_move_sequence
    return fiscalyear
def create_period(cls, fiscalyears, interval=1):
    '''
    Create periods for the fiscal years with month interval
    '''
    Period = Pool().get('account.period')
    month_format = '%Y-%m'
    to_create = []
    for fiscalyear in fiscalyears:
        period_start = fiscalyear.start_date
        while period_start < fiscalyear.end_date:
            # Last day of the month that is `interval - 1` months ahead,
            # capped at the end of the fiscal year.
            period_end = period_start + relativedelta(months=interval - 1, day=31)
            if period_end > fiscalyear.end_date:
                period_end = fiscalyear.end_date

            start_label = datetime_strftime(period_start, month_format)
            end_label = datetime_strftime(period_end, month_format)
            name = start_label
            if start_label != end_label:
                name += ' - ' + end_label

            to_create.append({
                'name': name,
                'start_date': period_start,
                'end_date': period_end,
                'fiscalyear': fiscalyear.id,
                'type': 'standard',
            })
            period_start = period_end + relativedelta(days=1)
    if to_create:
        Period.create(to_create)