Python datetime 模块,date() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用datetime.date()。
def __compile_policy_value(self, str_value, value_dict):
value = str_value
if not '$' in value:
return value
self.__logger.debug("Compiling value %s", value)
for key in value_dict:
self.__logger.debug("Searching for key %s", key)
val = value_dict[key]
if id(type) and type(val) in (datetime, date):
self.__logger.debug("Value in dictionary: %s -> %s", key, val)
val = time.mktime(self.__get_localtime(val).timetuple())
self.__logger.debug("Timestamp converted: %s -> %s", key, val)
value = value.replace(self.__get_compiled_key(key), str(val))
self.__logger.debug("Value after convertsion: %s", value)
int_value = int(eval(value))
compiled_value = self.__get_localtime(int_value)
self.__logger.debug("Compiled value: %s", compiled_value)
return compiled_value
def get_date_to_message_statistic(self, message_statistic):
"""
Maps each date between the date of the first message and the date of
the last message, inclusive, to the sum of the values of a message
statistic over all messages from that date.
Args:
message_statistic: A function mapping a Message object to an int or
a float.
Returns:
date_to_message_statistic: A dict mapping a date object between the
date of the first message and the date of the last message to
the sum of the values of message_statistic over all messages in
self.messages from that date.
"""
start_date = self.messages[0].timestamp.date()
end_date = self.messages[-1].timestamp.date()
date_range = [dt.date() for dt in rrule(DAILY, dtstart=start_date, until=end_date)]
date_to_message_statistic = {d: 0 for d in date_range}
for message in self.messages:
date_to_message_statistic[message.timestamp.date()] += message_statistic(message)
return date_to_message_statistic
def _coerce_datetime(maybe_dt):
if isinstance(maybe_dt, datetime.datetime):
return maybe_dt
elif isinstance(maybe_dt, datetime.date):
return datetime.datetime(
year=maybe_dt.year,
month=maybe_dt.month,
day=maybe_dt.day,
tzinfo=pytz.utc,
)
elif isinstance(maybe_dt, (tuple, list)) and len(maybe_dt) == 3:
year, month, day = maybe_dt
return datetime.datetime(
year=year,
month=month,
day=day,
tzinfo=pytz.utc,
)
else:
raise TypeError('Cannot coerce %s into a datetime.datetime'
% type(maybe_dt).__name__)
def get_last_trading_day_of_month(self, dt, env):
self.month = dt.month
if dt.month == 12:
# Roll the year foward and start in January.
year = dt.year + 1
month = 1
else:
# Increment the month in the same year.
year = dt.year
month = dt.month + 1
self.last_day = env.previous_trading_day(
dt.replace(year=year, month=month, day=1)
).date()
return self.last_day
# Stateful rules
def _to_ts(val):
if isinstance(val, (int, float)):
return val
elif isinstance(val, (datetime, date)):
return (val-EPOCH).total_seconds()
v = TS_RE.match(val)
if v:
return float(v.group(0))
v = DT_RE.match(val)
if v:
# get all of the passed datetime pieces, don't worry about them being
# terribly valid, the datetime constructor will handle that ;)
v = list(filter(None, v.groups()))
if len(v) == 7:
# truncate to microseconds as necessary
v[6] = v[6][:6]
# extend to microseconds as necessary
v[6] += (6 - len(v[6])) * '0'
v = list(map(int, v))
dt = datetime(*v)
return (dt-EPOCH).total_seconds()
raise Exception("Value %r is not a timestamp, datetime, or date"%(val,))
def itermonthdates(self, year, month):
"""
Return an iterator for one month. The iterator will yield datetime.date
values and will always iterate through complete weeks, so it will yield
dates outside the specified month.
"""
date = datetime.date(year, month, 1)
# Go back to the beginning of the week
days = (date.weekday() - self.firstweekday) % 7
date -= datetime.timedelta(days=days)
oneday = datetime.timedelta(days=1)
while True:
yield date
date += oneday
if date.month != month and date.weekday() == self.firstweekday:
break
def test_order_execute_single_no_remaining(self, fulfill):
self.product.sale_set.create(
price=100,
member=self.member
)
self.product.start_date = datetime.date(year=2017, month=1, day=1)
self.product.quantity = 1
order = Order(self.member, self.room)
item = OrderItem(self.product, order, 1)
order.items.add(item)
with self.assertRaises(NoMoreInventoryError):
order.execute()
fulfill.was_not_called()
def test_order_execute_multi_some_remaining(self, fulfill):
self.product.sale_set.create(
price=100,
member=self.member
)
self.product.start_date = datetime.date(year=2017, month=1, day=1)
self.product.quantity = 2
order = Order(self.member, self.room)
item = OrderItem(self.product, order, 2)
order.items.add(item)
with self.assertRaises(NoMoreInventoryError):
order.execute()
fulfill.was_not_called()
def index(request):
latest_files = File.objects.order_by("-posted_date")[:10]
latest_updated_families = ProductFamily.objects \
.annotate(last_posted_date=Max('file__posted_date')) \
.order_by('-last_posted_date')[:10]
groups = ProductGroup.objects.order_by("name")
total_count = File.objects.count()
fcu_banner_expiration_date = datetime.date(2017, 11, 10)
show_fcu_banner = datetime.date.today() < fcu_banner_expiration_date
context = {
'show_fcu_banner': show_fcu_banner,
'latest_files': latest_files,
'latest_updated_families': latest_updated_families,
'groups': groups, 'total_count': total_count,
}
return render(request, 'msdn/index.html', context)
def test_activity_sets_sla_triaged_at():
r = new_report()
r.save()
assert r.sla_triaged_at is None
# An activity that shouldn't update sla_triaged_at
d1 = now()
r.activities.create(id=1, type='activity-comment', created_at=d1)
assert r.sla_triaged_at is None
# And now one that should
d2 = d1 + datetime.timedelta(hours=3)
r.activities.create(id=2, type='activity-bug-not-applicable', created_at=d2)
assert r.sla_triaged_at == d2
# And now another aciivity that would update the date, if it wasn't already set
d3 = d2 + datetime.timedelta(hours=3)
r.activities.create(id=3, type='activity-bug-resolved', created_at=d3)
assert r.sla_triaged_at == d2
def load_all_url_files(_dir, file_name_prefix):
url_list = []
for file_name in os.listdir(_dir):
if fnmatch.fnmatch(file_name, file_name_prefix +'*.txt'):
file_name = osp.join(_dir, file_name)
fp_urls = open(file_name, 'r') #Open the text file called database.txt
print 'load URLs from file: ' + file_name
i = 0
for line in fp_urls:
line = line.strip()
if len(line)>0:
splits = line.split('\t')
url_list.append(splits[0].strip())
i=i+1
print str(i) + ' URLs loaded'
fp_urls.close()
return url_list
########### End of Functions to Load downloaded urls ###########
############## Functions to get date/time strings ############
def _check_annotations(value):
"""
Recursively check that value is either of a "simple" type (number, string,
date/time) or is a (possibly nested) dict, list or numpy array containing
only simple types.
"""
if isinstance(value, np.ndarray):
if not issubclass(value.dtype.type, ALLOWED_ANNOTATION_TYPES):
raise ValueError("Invalid annotation. NumPy arrays with dtype %s"
"are not allowed" % value.dtype.type)
elif isinstance(value, dict):
for element in value.values():
_check_annotations(element)
elif isinstance(value, (list, tuple)):
for element in value:
_check_annotations(element)
elif not isinstance(value, ALLOWED_ANNOTATION_TYPES):
raise ValueError("Invalid annotation. Annotations of type %s are not"
"allowed" % type(value))
def _check_annotations(value):
"""
Recursively check that value is either of a "simple" type (number, string,
date/time) or is a (possibly nested) dict, list or numpy array containing
only simple types.
"""
if isinstance(value, np.ndarray):
if not issubclass(value.dtype.type, ALLOWED_ANNOTATION_TYPES):
raise ValueError("Invalid annotation. NumPy arrays with dtype %s"
"are not allowed" % value.dtype.type)
elif isinstance(value, dict):
for element in value.values():
_check_annotations(element)
elif isinstance(value, (list, tuple)):
for element in value:
_check_annotations(element)
elif not isinstance(value, ALLOWED_ANNOTATION_TYPES):
raise ValueError("Invalid annotation. Annotations of type %s are not"
"allowed" % type(value))
def __init__(self, value):
"""
Initializer value can be:
- integer_type: absolute days from epoch (1970, 1, 1). Can be negative.
- datetime.date: built-in date
- string_type: a string time of the form "yyyy-mm-dd"
"""
if isinstance(value, six.integer_types):
self.days_from_epoch = value
elif isinstance(value, (datetime.date, datetime.datetime)):
self._from_timetuple(value.timetuple())
elif isinstance(value, six.string_types):
self._from_datestring(value)
else:
raise TypeError('Date arguments must be a whole number, datetime.date, or string')
def test_date():
"""Test a simple dateline"""
date_chart = DateLine(truncate_label=1000)
date_chart.add('dates', [
(date(2013, 1, 2), 300),
(date(2013, 1, 12), 412),
(date(2013, 2, 2), 823),
(date(2013, 2, 22), 672)
])
q = date_chart.render_pyquery()
assert list(
map(lambda t: t.split(' ')[0],
q(".axis.x text").map(texts))) == [
'2013-01-12',
'2013-01-24',
'2013-02-04',
'2013-02-16']
def test_date_xrange():
"""Test dateline with xrange"""
datey = DateLine(truncate_label=1000)
datey.add('dates', [
(date(2013, 1, 2), 300),
(date(2013, 1, 12), 412),
(date(2013, 2, 2), 823),
(date(2013, 2, 22), 672)
])
datey.xrange = (date(2013, 1, 1), date(2013, 3, 1))
q = datey.render_pyquery()
assert list(
map(lambda t: t.split(' ')[0],
q(".axis.x text").map(texts))) == [
'2013-01-01',
'2013-01-12',
'2013-01-24',
'2013-02-04',
'2013-02-16',
'2013-02-27']
def test_date_labels():
"""Test dateline with xrange"""
datey = DateLine(truncate_label=1000)
datey.add('dates', [
(date(2013, 1, 2), 300),
(date(2013, 1, 12), 412),
(date(2013, 2, 2), 823),
(date(2013, 2, 22), 672)
])
datey.x_labels = [
date(2013, 1, 1),
date(2013, 2, 1),
date(2013, 3, 1)
]
q = datey.render_pyquery()
assert list(
map(lambda t: t.split(' ')[0],
q(".axis.x text").map(texts))) == [
'2013-01-01',
'2013-02-01',
'2013-03-01']
def bind_processor(self, dialect):
datetime_date = datetime.date
format = self._storage_format
def process(value):
if value is None:
return None
elif isinstance(value, datetime_date):
return format % {
'year': value.year,
'month': value.month,
'day': value.day,
}
else:
raise TypeError("SQLite Date type only accepts Python "
"date objects as input.")
return process
def result_processor(self, dialect, coltype):
def process(value):
if isinstance(value, datetime.datetime):
return value.date()
elif isinstance(value, util.string_types):
m = self._reg.match(value)
if not m:
raise ValueError(
"could not parse %r as a date value" % (value, ))
return datetime.date(*[
int(x or 0)
for x in m.groups()
])
else:
return value
return process
def _expression_adaptations(self):
return {
operators.add: {
Integer: self.__class__,
Interval: DateTime,
Time: DateTime,
},
operators.sub: {
# date - integer = date
Integer: self.__class__,
# date - date = integer.
Date: Integer,
Interval: DateTime,
# date - datetime = interval,
# this one is not in the PG docs
# but works
DateTime: Interval,
},
}
def test():
"Runs several groups of tests of the database engine."
# Test simple statements in SQL.
persons = test_basic_sql()
# Test various ways to select rows.
test_row_selection(persons)
# Test the four different types of joins in SQL.
orders = test_all_joins(persons)
# Test unstructured ways of joining tables together.
test_table_addition(persons, orders)
# Test creation and manipulation of databases.
test_database_support()
# Load and run some test on the sample Northwind database.
northwind = test_northwind()
# Test different date operations that can be performed.
test_date_functionality()
# Test various functions that operate on specified column.
test_column_functions()
if northwind:
# Test ability to select columns with function processing.
test_generic_column_functions(persons, northwind)
# Test Database2 instances that support transactions.
nw2 = test_transactional_database()
# Allow for interaction at the end of the test.
globals().update(locals())
def test_date_functionality():
"Tests different date operations that can be performed."
# Create an orderz table to test the date type.
orderz = Table(('OrderId', int), ('ProductName', str), ('OrderDate', date))
orderz.insert(1, 'Geitost', date(2008, 11, 11))
orderz.insert(2, 'Camembert Pierrot', date(2008, 11, 9))
orderz.insert(3, 'Mozzarella di Giovanni', date(2008, 11, 11))
orderz.insert(4, 'Mascarpone Fabioloi', date(2008, 10, 29))
# Query the table for a specific date.
orderz.where(ROW.OrderDate == date(2008, 11, 11)).print()
# Update the orderz table so that times are present with the dates.
orderz.alter_column('OrderDate', datetime)
orderz.where(ROW.OrderId == 1) \
.update(OrderDate=datetime(2008, 11, 11, 13, 23, 44))
orderz.where(ROW.OrderId == 2) \
.update(OrderDate=datetime(2008, 11, 9, 15, 45, 21))
orderz.where(ROW.OrderId == 3) \
.update(OrderDate=datetime(2008, 11, 11, 11, 12, 1))
orderz.where(ROW.OrderId == 4) \
.update(OrderDate=datetime(2008, 10, 29, 14, 56, 59))
# Query the table with a datetime object this time.
orderz.where(ROW.OrderDate == datetime(2008, 11, 11)).print()
def add_one_month(t):
"""Return a `datetime.date` or `datetime.datetime` (as given) that is
one month earlier.
Note that the resultant day of the month might change if the following
month has fewer days:
>>> add_one_month(datetime.date(2010, 1, 31))
datetime.date(2010, 2, 28)
"""
import datetime
one_day = datetime.timedelta(days=1)
one_month_later = t + one_day
while one_month_later.month == t.month: # advance to start of next month
one_month_later += one_day
target_month = one_month_later.month
while one_month_later.day < t.day: # advance to appropriate day
one_month_later += one_day
if one_month_later.month != target_month: # gone too far
one_month_later -= one_day
break
return one_month_later
def subtract_one_month(t):
"""Return a `datetime.date` or `datetime.datetime` (as given) that is
one month later.
Note that the resultant day of the month might change if the following
month has fewer days:
>>> subtract_one_month(datetime.date(2010, 3, 31))
datetime.date(2010, 2, 28)
"""
import datetime
one_day = datetime.timedelta(days=1)
one_month_earlier = t - one_day
while one_month_earlier.month == t.month or one_month_earlier.day > t.day:
one_month_earlier -= one_day
return one_month_earlier
def importData(fileName):
"""
Main entry point - opens import data Excel sheet (assumed to be first
sheet) and processes each record, one at a time.
"""
global importSheet, importRow, headers
book = xlrd.open_workbook(fileName)
importSheet = book.sheet_by_index(0)
print "importing from %s" % importSheet.name
# get cell value types from first-non header data row (2nd), since headers are all of type text
for colnum in range(0, importSheet.ncols):
headers.append((importSheet.cell(0, colnum).value, importSheet.cell(1, colnum).ctype))
configErrorReporting(headers)
for importRow in range(1, importSheet.nrows):
record = {}
for colnum in range(0, importSheet.ncols):
if headers[colnum][1] == xlrd.XL_CELL_DATE:
dateTuple = xlrd.xldate_as_tuple(importSheet.cell(rowx=importRow, colx=colnum).value, book.datemode)
date = datetime.date(dateTuple[0], dateTuple[1], dateTuple[2])
# required format for xsd:Date type, for web service call
record[headers[colnum][0]] = date.isoformat()
else:
value = importSheet.cell_value(rowx=importRow, colx=colnum)
if isinstance(value, basestring):
record[headers[colnum][0]] = value.strip()
else:
record[headers[colnum][0]] = value
#print "%s: %s" % (type(record[headers[colnum]]), record[headers[colnum]])
record = handleAccountContact(record)
book.unload_sheet(importSheet.name)
def _o(self, obj):
""" Translate object to json.
Dict in python is not json, so don't be confused.
When return object from rpc, should always use _o.
"""
if obj == None:
return obj
elif isinstance(obj, Decimal):
return str(obj)
elif isinstance(obj, datetime):
return obj.strftime("%Y-%m-%d %H:%M:%S")
elif isinstance(obj, date):
return obj.strftime("%Y-%m-%d")
elif isinstance(obj, (list, set, tuple)):
return self._ol(obj)
elif isinstance(obj, dict):
return self._od(obj)
elif isinstance(obj, (int, str, bool, float)):
return obj
else:
return self._oo(obj)
def itermonthdates(self, year, month):
"""
Return an iterator for one month. The iterator will yield datetime.date
values and will always iterate through complete weeks, so it will yield
dates outside the specified month.
"""
date = datetime.date(year, month, 1)
# Go back to the beginning of the week
days = (date.weekday() - self.firstweekday) % 7
date -= datetime.timedelta(days=days)
oneday = datetime.timedelta(days=1)
while True:
yield date
date += oneday
if date.month != month and date.weekday() == self.firstweekday:
break
def _datetime(d):
if isinstance(d, datetime.datetime):
return d
if isinstance(d, datetime.date):
return datetime.datetime(d.year, d.month, d.day)
try:
n = datetime.datetime.strptime(d, '%Y-%m-%d %H:%M:%S')
except ValueError:
try:
n = datetime.datetime.strptime(d, "%Y-%m-%d %H:%M:%S.%f")
except ValueError:
try:
n = datetime.datetime.strptime(d, "%Y-%m-%d")
except ValueError:
n = datetime.datetime.strptime(d, "%Y-%m-%d %H:%M")
return n
def test_only_update_with_existent_fields(self):
messages = [PatientUpdateMessage(
first_name="Mary"
)]
container = MessageContainer(
message_type=PatientUpdateMessage,
messages=messages,
hospital_number="50092915",
issuing_source="uclh"
)
subscription = UclhPatientUpdateSubscription()
subscription.notify(container)
patient = self.session.query(Patient).one()
# only update first name
self.assertEqual("Mary", patient.first_name)
self.assertEqual("Smith", patient.surname)
self.assertEqual(None, patient.middle_name)
self.assertEqual("Ms", patient.title)
self.assertEqual(date(1983, 12, 12), patient.date_of_birth)
def format_date_as_string(val, fmt='%m-%d-%Y'):
"""
:rtype str:
:return the input value formatted as '%Y-%m-%d'
:param val: datetime or string
:param fmt: the input format for the date
"""
if isinstance(val, date):
return val.strftime(fmt)
da = format_date(val, fmt)
if not da:
return ''
return da.strftime(FORMAT_DATABASE_DATE)
def format_date(val, fmt='%m-%d-%Y'):
"""
Transform the input string to a datetime object
:param val: the input string for date
:param fmt: the input format for the date
"""
date_obj = None
try:
date_obj = datetime.strptime(val, fmt)
except Exception as exc:
log.warning("Problem formatting date: {} - {} due: {}"
.format(val, fmt, exc))
return date_obj
def test_format_updated_projects(self):
"""Test format updated projects works."""
old_date = date(2014, 11, 24)
old_project = ProjectFactory.create(updated=old_date)
p = ProjectFactory.create()
p.name = 'NewNewNew'
project_repo = ProjectRepository(db)
project_repo.update(p)
update_projects_week()
day = datetime.utcnow().strftime('%Y-%m-%d')
res = format_update_projects()
assert len(res) == 1, res
res = res[0]
assert res['day'] == day, res['day']
assert res['id'] == p.id
assert res['short_name'] == p.short_name
assert res['p_name'] == p.name
assert res['email_addr'] == p.owner.email_addr
assert res['owner_id'] == p.owner.id
assert res['u_name'] == p.owner.name
def default(self, o):
"""Implement this method in a subclass such that it returns a
serializable object for ``o``, or calls the base implementation (to
raise a :exc:`TypeError`).
For example, to support arbitrary iterators, you could implement
default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
return JSONEncoder.default(self, o)
"""
if isinstance(o, date):
return http_date(o.timetuple())
if isinstance(o, uuid.UUID):
return str(o)
if hasattr(o, '__html__'):
return text_type(o.__html__())
return _json.JSONEncoder.default(self, o)
def get_month_to_message_statistic(self, message_statistic):
"""
Maps each month between the month of the first message and the month of
the last message, inclusive, to the sum of the values of a message
statistic over all messages from that month.
Args:
message_statistic: A function mapping a Message object to an int or
a float.
Returns:
month_to_message_statistic: A dict mapping a string of the form
'YYYY-MM' representing a month between the month of the first
message and the month of the last message to the sum of the
values of message_statistic over all messages in self.messages
from that month.
"""
start_dt = self.messages[0].timestamp
end_dt = self.messages[-1].timestamp
start_date_month_start = datetime(start_dt.year, start_dt.month, 1)
end_date_month_start = datetime(end_dt.year, end_dt.month, 1)
dt_month_range = rrule(MONTHLY, dtstart=start_date_month_start,
until=end_date_month_start)
month_range = [dt.date() for dt in dt_month_range]
month_to_message_statistic = {dt.strftime(self.MONTH_FORMAT): 0 for dt in month_range}
for message in self.messages:
month_str = message.timestamp.strftime(self.MONTH_FORMAT)
month_to_message_statistic[month_str] += message_statistic(message)
return month_to_message_statistic
def get_time_interval_to_message_statistic(self, time_interval,
message_statistic):
"""
Maps each value of time interval to the sum of the values of a message
statistic over all messages from that time interval value. Wrapper
function for the functions 'get_' + time_interval.replace(' ', '_') +
'_to_message_statistic'.
Args:
time_interval: One of 'minute in hour', 'minute in day', 'hour',
'date', 'week', 'month', 'year'.
message_statistic: A function mapping a Message object to an int or
a float.
Returns:
time_interval_to_message_statistic: A dict mapping each value of a
time interval to the sum of the values of message_statistic
over all messages in self.messages from that time interval
value.
"""
if time_interval not in self.TIME_INTERVALS:
raise ValueError('time_interval must be in {}'.format(self.TIME_INTERVALS))
getter = getattr(self, 'get_' + time_interval.replace(' ', '_') + '_to_message_statistic')
time_interval_to_message_statistic = getter(message_statistic)
return time_interval_to_message_statistic
def test_calendary_weekday_returns_ordinal(self):
third_tuesday_in_july = Calendary(2016).weekday('Tuesday', month=7, ordinal=3)
assert third_tuesday_in_july == ('Tuesday', datetime.date(month=7, day=19, year=2016))
def test_calendary_weekday_returns_right_day_for_year(self):
first_monday_in_nineteen_fortyseven = Calendary(1947).weekday('Monday', ordinal=1)
assert first_monday_in_nineteen_fortyseven == ('Monday', datetime.date(month=1, day=6, year=1947))
def test_weekday_with_multiple_inputs_returns_multiple_days(self):
mondays_and_tuesdays = Calendary(2016).weekday((0, 1), month=7)
mondays = [d[1] for d in mondays_and_tuesdays if d[0] == 'Monday']
tuesdays = [d[1] for d in mondays_and_tuesdays if d[0] == 'Tuesday']
monday_july_eighteenth = datetime.date(year=2016, month=7, day=18)
tuesday_july_nineteenth = datetime.date(year=2016, month=7, day=19)
assert monday_july_eighteenth in mondays
assert tuesday_july_nineteenth in tuesdays
def Log(self, eventType, dataList):
'''
Create an entry in the log file. Each entry will look like:
timestamp\tevent\tdata1\tdata2 <etc>\n
where:
timestamp = integer seconds since the UNIX epoch
event = string identifying the event
data1..n = individual data fields, as appropriate for each event type.
To avoid maintenance issues w/r/t enormous log files, the log filename
that's stored in the settings file is passed through datetime.strftime()
so we can expand any format codes found there against the current date/time
and create e.g. a monthly log file.
'''
now = int(time())
today = datetime.fromtimestamp(now)
# if there's no explicit log file path/name, we create one
# that's the current year & month.
fileName = self.settings.logFilePath
if not fileName:
fileName = "%Y-%m.txt"
self.settings.logFilePath = fileName
path = self.GetPath(fileName)
path = today.strftime(path)
with open(path, "a+t") as f:
f.write("{0}\t{1}\t".format(now, eventType))
f.write("\t".join(dataList))
f.write("\n")
def is_due_date(date):
if 15 <= date.day <= 21:
if date.weekday() == 4:
return True
return False
def gen_due_date(year, month):
date0 = datetime.date(year,month, 1)
for i in range(31):
date0 = date0 + datetime.timedelta(1)
if is_due_date(date0):
return date0
return None
def get_due_date(date):
due_date_this_month = gen_due_date(date.year, date.month)
if date.month != 12:
due_date_next_month = gen_due_date(date.year, date.month + 1)
else:
due_date_next_month = gen_due_date(date.year - 1, 1)
if date > due_date_this_month:
return due_date_next_month
else:
return due_date_this_month
def get_previous_and_next_day(db, message_date):
"""Gets the previous and following saved days given the day in between in the database"""
previous = db.query_message("where date < '{}' order by id desc"
.format(message_date))
following = db.query_message("where date >= '{}' order by id asc"
.format(message_date+timedelta(days=1)))
return Exporter.get_message_date(previous), Exporter.get_message_date(following)
def get_message_date(message):
"""Retrieves the given message DATE, ignoring the time (hour, minutes, seconds, etc.)"""
if message:
return date(year=message.date.year, month=message.date.month, day=message.date.day)
#endregion
def _build_date(date, kwargs):
"""
Builds the date argument for event rules.
"""
if date is None:
if not kwargs:
raise ValueError('Must pass a date or kwargs')
else:
return datetime.date(**kwargs)
elif kwargs:
raise ValueError('Cannot pass kwargs and a date')
else:
return date
def _get_open(self, dt, env):
"""
Cache the open for each day.
"""
if self._dt is None or (self._dt.date() != dt.date()):
self._dt = env.get_open_and_close(dt)[0] \
- datetime.timedelta(minutes=1)
return self._dt
def _get_close(self, dt, env):
"""
Cache the close for each day.
"""
if self._dt is None or (self._dt.date() != dt.date()):
self._dt = env.get_open_and_close(dt)[1]
return self._dt
def should_trigger(self, dt, env):
return dt.date() not in env.early_closes
def get_first_trading_day_of_week(self, dt, env):
prev = dt
dt = env.previous_trading_day(dt)
while dt.date().weekday() < prev.date().weekday():
prev = dt
dt = env.previous_trading_day(dt)
return prev.date()