Python datetime 模块,datetime() 实例源码
我们从Python开源项目中,提取了以下43个代码示例,用于说明如何使用datetime.datetime()。
def default(self, obj):
    """Return a JSON-serialisable stand-in for ``obj``.

    The checks run in this order and the first match wins:

    1. objects exposing ``__json__`` -> the result of ``obj.__json__()``
    2. any iterable -> ``list(obj)``
    3. ``datetime`` instances -> ``obj.isoformat()``
    4. objects with ``__getitem__`` and ``keys`` -> ``dict(obj)``
    5. objects with a ``__dict__`` -> a dict of their public,
       non-callable attributes
    6. anything else -> delegated to ``json.JSONEncoder.default`` (which
       raises ``TypeError``)

    :param obj: the object the encoder could not serialise on its own
    :returns: a value built from lists, dicts and strings as described above
    """
    # ``collections.Iterable`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc``. Fall back for interpreters without that module.
    try:
        from collections.abc import Iterable
    except ImportError:
        from collections import Iterable

    if hasattr(obj, '__json__'):
        return obj.__json__()
    if isinstance(obj, Iterable):
        return list(obj)
    if isinstance(obj, datetime):
        return obj.isoformat()
    if hasattr(obj, '__getitem__') and hasattr(obj, 'keys'):
        return dict(obj)
    if hasattr(obj, '__dict__'):
        # Keep only public data attributes; methods and private names are
        # skipped.
        return {member: getattr(obj, member)
                for member in dir(obj)
                if not member.startswith('_') and
                not hasattr(getattr(obj, member), '__call__')}
    return json.JSONEncoder.default(self, obj)
def get_response_and_time(self, key, default=(None, None)):
    """Look up the cached response and its timestamp for ``key``.

    If ``key`` is not present in ``self.responses`` it is first translated
    through ``self.keys_map``. A ``KeyError`` from either lookup yields
    ``default``.

    :param key: key of resource
    :param default: value returned when ``key`` cannot be resolved
    :returns: tuple ``(response, timestamp)`` where the response has been
        passed through :meth:`restore_response`, or ``default``
    """
    try:
        resolved_key = key if key in self.responses else self.keys_map[key]
        cached_response, cached_at = self.responses[resolved_key]
    except KeyError:
        return default
    restored = self.restore_response(cached_response)
    return restored, cached_at
def setUpTestData(cls):
    """Create the users and content fixtures shared by the test methods.

    One content item is created per visibility level (public, site, limited,
    self) plus a public item with a fixed ``remote_created`` date. Their ids
    and the objects themselves are stored on ``cls.ids`` and ``cls.set``.
    """
    super().setUpTestData()
    cls.create_local_and_remote_user()
    cls.user2 = AnonymousUser()
    cls.local_user = UserFactory()

    cls.public_content = ContentFactory(
        visibility=Visibility.PUBLIC,
        text="**Foobar**",
        author=cls.profile,
    )
    cls.site_content = ContentFactory(visibility=Visibility.SITE, text="_Foobar_")
    cls.limited_content = ContentFactory(visibility=Visibility.LIMITED)
    cls.self_content = ContentFactory(visibility=Visibility.SELF)

    remote_created = make_aware(datetime.datetime(2015, 1, 1))
    cls.remote_content = ContentFactory(
        visibility=Visibility.PUBLIC,
        remote_created=remote_created,
        author=cls.remote_profile,
    )

    # The remote content is intentionally left out of both collections.
    local_contents = (
        cls.public_content,
        cls.site_content,
        cls.limited_content,
        cls.self_content,
    )
    cls.ids = [content.id for content in local_contents]
    cls.set = set(local_contents)
def default(self, obj):
    """Return a JSON-serialisable stand-in for ``obj``.

    The checks run in this order and the first match wins:

    1. objects exposing ``__json__`` -> the result of ``obj.__json__()``
    2. any iterable -> ``list(obj)``
    3. ``datetime`` instances -> ``obj.isoformat()``
    4. objects with ``__getitem__`` and ``keys`` -> ``dict(obj)``
    5. objects with a ``__dict__`` -> a dict of their public,
       non-callable attributes
    6. anything else -> delegated to ``json.JSONEncoder.default`` (which
       raises ``TypeError``)

    :param obj: the object the encoder could not serialise on its own
    :returns: a value built from lists, dicts and strings as described above
    """
    # ``collections.Iterable`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc``. Fall back for interpreters without that module.
    try:
        from collections.abc import Iterable
    except ImportError:
        from collections import Iterable

    if hasattr(obj, '__json__'):
        return obj.__json__()
    if isinstance(obj, Iterable):
        return list(obj)
    if isinstance(obj, datetime):
        return obj.isoformat()
    if hasattr(obj, '__getitem__') and hasattr(obj, 'keys'):
        return dict(obj)
    if hasattr(obj, '__dict__'):
        # Keep only public data attributes; methods and private names are
        # skipped.
        return {member: getattr(obj, member)
                for member in dir(obj)
                if not member.startswith('_') and
                not hasattr(getattr(obj, member), '__call__')}
    return json.JSONEncoder.default(self, obj)
def test_am_pm_behaviour(self):
    """A last-processed time 12 hours before the check time must allow a run."""
    check_time = datetime.datetime(2016, 11, 7, 22, 10, 0, 1)
    twelve_hours_earlier = check_time - datetime.timedelta(hours=12)
    provider = PreHourlyProcessorUtil.get_data_provider()
    provider.set_last_processed(date_time=twelve_hours_earlier)
    due = PreHourlyProcessorUtil.is_time_to_run(check_time)
    self.assertTrue(due)
def get_time(cls, request, time_attr):
    """Read a time-of-day from a request parameter.

    The parameter value has its last four characters dropped and is then
    parsed with the format ``'%a, %d %b %Y %H:%M:%S'``. Only the hour and
    minute of the parsed value are kept.

    :param request: the request object (its ``params`` mapping is read)
    :param time_attr: the attribute name
    :return: datetime.timedelta holding the hours and minutes
    """
    raw_value = request.params[time_attr]
    parsed = datetime.datetime.strptime(raw_value[:-4], '%a, %d %b %Y %H:%M:%S')
    return datetime.timedelta(minutes=parsed.minute, hours=parsed.hour)
def get_datetime(cls, request, date_attr, time_attr):
    """Build one datetime from a date parameter and a time parameter.

    Both parameter values have their last four characters dropped and are
    parsed with the format ``'%a, %d %b %Y %H:%M:%S'``. The calendar date
    comes from ``date_attr`` and the time of day from ``time_attr``. The
    result carries no tzinfo.

    :param request: the request object (its ``params`` mapping is read)
    :param date_attr: name of the parameter supplying the date
    :param time_attr: name of the parameter supplying the time of day
    :return: datetime.datetime
    """
    stamp_format = '%a, %d %b %Y %H:%M:%S'
    date_part, time_part = [
        datetime.datetime.strptime(request.params[attr][:-4], stamp_format)
        for attr in (date_attr, time_attr)
    ]
    # Date from the first value, time of day from the second.
    return datetime.datetime.combine(date_part.date(), time_part.time())
def coerce_to_dtype(dtype, value):
    """
    Convert ``value`` to a scalar of the given numpy dtype.

    Non-datetime dtypes are handled by calling ``dtype.type(value)``.
    For datetime dtypes only datetime64[ns] and datetime64[D] are
    supported; any other datetime64 unit raises ``TypeError``.
    """
    dtype_name = dtype.name
    if not dtype_name.startswith('datetime64'):
        return dtype.type(value)

    if dtype_name == 'datetime64[D]':
        return make_datetime64D(value)
    if dtype_name == 'datetime64[ns]':
        return make_datetime64ns(value)
    raise TypeError(
        "Don't know how to coerce values of dtype %s" % dtype
    )
def get_open_and_close(day, early_closes):
    """Return the ``(market_open, market_close)`` UTC timestamps for ``day``.

    The open is 9:31 and the close is 16:00 in the 'US/Eastern' zone; the
    close is 13:00 instead when ``day`` is contained in ``early_closes``.
    """
    def _eastern_as_utc(hour, minute=0):
        # Build the wall-clock time on `day`, localize it, then convert.
        wall_clock = datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=hour,
            minute=minute)
        return pd.Timestamp(wall_clock, tz='US/Eastern').tz_convert('UTC')

    market_open = _eastern_as_utc(9, 31)

    # 1 PM if early close, 4 PM otherwise
    if day in early_closes:
        close_hour = 13
    else:
        close_hour = 16
    market_close = _eastern_as_utc(close_hour)

    return market_open, market_close
def create_test_panel_ohlc_source(sim_params, env):
    """Build a synthetic OHLC panel source covering the simulation days.

    The date range comes from ``sim_params`` when it is truthy, otherwise
    from fixed dates in January 1990. Prices start at 100 and rise by one
    per day; the other columns are derived from the price.

    :returns: tuple ``(DataPanelSource(panel), panel)``
    """
    # NOTE(review): pd.datetime and pd.Panel appear to be unavailable in
    # recent pandas releases - confirm the pinned pandas version.
    if sim_params:
        start = sim_params.first_open
    else:
        start = pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)
    if sim_params:
        end = sim_params.last_close
    else:
        end = pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)

    index = env.days_in_range(start, end)
    n_days = len(index)

    price = np.arange(0, n_days) + 100
    columns = {
        'price': price,
        'high': price * 1.05,
        'low': price * 0.95,
        # alternates slightly below / above the price on even / odd prices
        'open': price + .1 * (price % 2 - .5),
        'volume': np.ones(n_days) * 1000,
        'arbitrary': np.ones(n_days),
    }
    df = pd.DataFrame(columns, index=index)

    panel = pd.Panel.from_dict({0: df})
    return DataPanelSource(panel), panel
def get_open_and_close(day, early_closes):
    """Return the ``(market_open, market_close)`` UTC timestamps for ``day``.

    Times are built in the 'America/Sao_Paulo' zone. The close is always
    16:00; the open is 10:00, or 13:00 when ``day`` is in the module-level
    ``quarta_cinzas`` collection. ``early_closes`` is accepted but not read.
    """
    def _sao_paulo_as_utc(hour):
        wall_clock = datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=hour,
            minute=0)
        return pd.Timestamp(wall_clock, tz='America/Sao_Paulo').tz_convert('UTC')

    # only "early close" event in Bovespa actually is a late start
    # as the market only opens at 1pm
    if day in quarta_cinzas:
        open_hour = 13
    else:
        open_hour = 10

    market_open = _sao_paulo_as_utc(open_hour)
    market_close = _sao_paulo_as_utc(16)
    return market_open, market_close
def _coerce_datetime(maybe_dt):
if isinstance(maybe_dt, datetime.datetime):
return maybe_dt
elif isinstance(maybe_dt, datetime.date):
return datetime.datetime(
year=maybe_dt.year,
month=maybe_dt.month,
day=maybe_dt.day,
tzinfo=pytz.utc,
)
elif isinstance(maybe_dt, (tuple, list)) and len(maybe_dt) == 3:
year, month, day = maybe_dt
return datetime.datetime(
year=year,
month=month,
day=day,
tzinfo=pytz.utc,
)
else:
raise TypeError('Cannot coerce %s into a datetime.datetime'
% type(maybe_dt).__name__)
def get_open_and_close(day, early_closes):
    """Return the ``(market_open, market_close)`` UTC timestamps for ``day``.

    The open is 9:31 and the close is 16:00 local time; the close is 13:00
    instead when ``day`` is contained in ``early_closes``.

    Both wall-clock times are localized to the same zone ('US/Eastern').
    Previously the close was localized to 'Asia/Shanghai' while the open
    used 'US/Eastern', which placed the close *before* the open once both
    were converted to UTC.

    :param day: object with ``year``, ``month`` and ``day`` attributes
    :param early_closes: container of days that close early
    :returns: tuple of tz-aware ``pd.Timestamp`` objects in UTC
    """
    # NOTE(review): if this calendar is meant for a Shanghai session, the
    # zone AND the session hours both need changing - confirm with the owner.
    market_tz = 'US/Eastern'

    market_open = pd.Timestamp(
        datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=9,
            minute=31),
        tz=market_tz).tz_convert('UTC')

    # 1 PM if early close, 4 PM otherwise
    close_hour = 13 if day in early_closes else 16
    market_close = pd.Timestamp(
        datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=close_hour),
        tz=market_tz).tz_convert('UTC')

    return market_open, market_close
def test_generator_dates(self):
    """
    Run the algorithm's generator over a one-year daily trade source and
    check that it emits results and that both the algorithm and its
    slippage model end up with a ``latest_date`` set.
    """
    first_day = datetime(2011, 7, 30, tzinfo=pytz.utc)
    last_day = datetime(2012, 7, 30, tzinfo=pytz.utc)
    sim_params = factory.create_simulation_parameters(
        start=first_day, end=last_day, env=self.env)

    algo = TestAlgo(self, sim_params=sim_params, env=self.env)
    trade_source = factory.create_daily_trade_source(
        [8229], sim_params, env=self.env)
    algo.set_sources([trade_source])

    emitted = list(algo.get_generator())
    self.assertTrue(emitted)

    self.assertTrue(algo.slippage.latest_date)
    self.assertTrue(algo.latest_date)
def test_progress(self):
    """
    Run the algorithm's generator over a few days of daily trades and
    check that the second-to-last emitted message reports a progress
    of 1.0.
    """
    first_day = datetime(2008, 1, 1, tzinfo=pytz.utc)
    last_day = datetime(2008, 1, 5, tzinfo=pytz.utc)
    sim_params = factory.create_simulation_parameters(
        start=first_day, end=last_day, env=self.env)

    algo = TestAlgo(self, sim_params=sim_params, env=self.env)
    trade_source = factory.create_daily_trade_source(
        [8229], sim_params, env=self.env)
    algo.set_sources([trade_source])

    results = list(algo.get_generator())
    second_to_last = results[-2]
    self.assertEqual(second_to_last['progress'], 1.0)
def test_checks_should_trigger(self):
    """Every registered event's rule must be consulted once per handle_data."""
    class CountingRule(Always):
        # Shared across instances so all five rules feed one counter.
        count = 0

        def should_trigger(self, dt, env):
            CountingRule.count += 1
            return True

    for _ in range(5):
        event = Event(CountingRule(), lambda context, data: None)
        self.em.add_event(event)

    FakeAlgo = namedtuple('FakeAlgo', ['trading_environment'])
    fake_algo = FakeAlgo(trading_environment="fake_env")
    self.em.handle_data(fake_algo, None, datetime.datetime.now())

    self.assertEqual(CountingRule.count, 5)
def writeValue(self, value):
    """Write ``value`` using the writer method that matches its type.

    Strings, booleans, integers, floats and datetimes are written with
    ``simpleElement``; dicts, ``Data`` objects and tuples/lists are handed
    to ``writeDict``, ``writeData`` and ``writeArray``. Any other type
    raises ``TypeError``.
    """
    if isinstance(value, (str, unicode)):
        self.simpleElement("string", value)
        return
    # bool has to be tested ahead of int because bool is a subclass of int
    if isinstance(value, bool):
        self.simpleElement("true" if value else "false")
        return
    if isinstance(value, (int, long)):
        self.simpleElement("integer", "%d" % value)
        return
    if isinstance(value, float):
        self.simpleElement("real", repr(value))
        return
    if isinstance(value, dict):
        self.writeDict(value)
        return
    if isinstance(value, Data):
        self.writeData(value)
        return
    if isinstance(value, datetime.datetime):
        self.simpleElement("date", _dateToString(value))
        return
    if isinstance(value, (tuple, list)):
        self.writeArray(value)
        return
    raise TypeError("unsuported type: %s" % type(value))
def make_comparable(self, other):
    """Return a pair ``(s, o)`` of like-typed values for comparing with ``other``.

    * ``DateTime``: both ``value`` attributes
    * ``datetime.datetime``: ``self.value`` and ``other`` formatted as
      ``%Y%m%dT%H:%M:%S``
    * string: ``self.value`` and ``other`` itself
    * any object with ``timetuple``: both time tuples

    Raises ``TypeError`` for anything else.
    """
    if isinstance(other, DateTime):
        return self.value, other.value
    # the truthiness test guards against the datetime module being unavailable
    if datetime and isinstance(other, datetime.datetime):
        return self.value, other.strftime("%Y%m%dT%H:%M:%S")
    if isinstance(other, (str, unicode)):
        return self.value, other
    if hasattr(other, "timetuple"):
        return self.timetuple(), other.timetuple()

    # Prefer the class name; fall back to the type object itself.
    otype = hasattr(other, "__class__") and other.__class__.__name__
    if not otype:
        otype = type(other)
    raise TypeError("Can't compare %s and %s" %
                    (self.__class__.__name__, otype))
def get_response(self, key, default=None):
    """Look up the cached entry for ``key``.

    If ``key`` is not present in ``self.responses`` it is first translated
    through ``self.keys_map``. A ``KeyError`` from either lookup yields
    ``default``.

    :param key: key of resource
    :param default: value returned when ``key`` cannot be resolved
    :returns: the object stored in ``self.responses`` exactly as stored,
        or ``default``
    """
    try:
        resolved_key = key if key in self.responses else self.keys_map[key]
        return self.responses[resolved_key]
    except KeyError:
        return default
def test_activity_sets_sla_triaged_at():
    """``sla_triaged_at`` is set by the first qualifying activity and then kept."""
    report = new_report()
    report.save()
    assert report.sla_triaged_at is None

    three_hours = datetime.timedelta(hours=3)
    commented_at = now()
    triaged_at = commented_at + three_hours
    resolved_at = triaged_at + three_hours

    # A plain comment must leave sla_triaged_at unset
    report.activities.create(id=1, type='activity-comment', created_at=commented_at)
    assert report.sla_triaged_at is None

    # This activity type is expected to set it
    report.activities.create(
        id=2, type='activity-bug-not-applicable', created_at=triaged_at)
    assert report.sla_triaged_at == triaged_at

    # A later activity that would set the date must not overwrite it
    report.activities.create(
        id=3, type='activity-bug-resolved', created_at=resolved_at)
    assert report.sla_triaged_at == triaged_at
def _load_date(val):
microsecond = 0
tz = None
try:
if len(val) > 19:
if val[19] == '.':
microsecond = int(val[20:26])
if len(val) > 26:
tz = TomlTz(val[26:32])
else:
tz = TomlTz(val[19:25])
except ValueError:
tz = None
try:
d = datetime.datetime(int(val[:4]), int(val[5:7]), int(val[8:10]), int(val[11:13]), int(val[14:16]), int(val[17:19]), microsecond, tz)
except ValueError:
return None
return d
def setUp(self):
    """Create ``self.appscaling`` against a stubbed boto3 client.

    The stub's ``describe_scalable_targets`` returns one scalable target
    for ``service/my_cluster/my_service``, and ``ScalingPolicy.__init__``
    is replaced while the object is constructed.
    """
    scalable_target = {
        'ServiceNamespace': 'ecs',
        'ResourceId': 'service/my_cluster/my_service',
        'ScalableDimension': 'ecs:service:DesiredCount',
        'MinCapacity': 1,
        'MaxCapacity': 3,
        'RoleARN': 'my_role_arn',
        'CreationTime': datetime(2017, 4, 14)
    }
    aws_data = {
        'ScalableTargets': [scalable_target],
        'NextToken': None
    }

    # __init__ must return None, so the replacement does too
    init = Mock(return_value=None)

    appscaling_client = Mock()
    appscaling_client.describe_scalable_targets = Mock(return_value=aws_data)
    client = Mock(return_value=appscaling_client)

    with Replacer() as r:
        r.replace('boto3.client', client)
        r.replace('deployfish.aws.appscaling.ScalingPolicy.__init__', init)
        self.appscaling = ApplicationAutoscaling('my_service', 'my_cluster')
def setUp(self):
    """Create ``self.appscaling`` from pre-supplied AWS data.

    The scalable-target dict is passed to the constructor through ``aws=``.
    The stubbed ``describe_scalable_targets`` is kept on
    ``self.describe_scaling_targets`` so tests can inspect it.
    """
    aws_data = {
        'ServiceNamespace': 'ecs',
        'ResourceId': 'service/my_cluster/my_service',
        'ScalableDimension': 'ecs:service:DesiredCount',
        'MinCapacity': 1,
        'MaxCapacity': 3,
        'RoleARN': 'my_role_arn',
        'CreationTime': datetime(2017, 4, 14)
    }

    # __init__ must return None, so the replacement does too
    init = Mock(return_value=None)

    self.describe_scaling_targets = Mock(return_value=aws_data)
    appscaling_client = Mock()
    appscaling_client.describe_scalable_targets = self.describe_scaling_targets
    client = Mock(return_value=appscaling_client)

    with Replacer() as r:
        r.replace('boto3.client', client)
        r.replace('deployfish.aws.appscaling.ScalingPolicy.__init__', init)
        self.appscaling = ApplicationAutoscaling(
            'my_service', 'my_cluster', aws=aws_data)
def test_edited_is_false_for_newly_created_content_within_15_minutes_grace_period(self):
    """Saving 14 minutes after creation must not mark the content as edited."""
    content = self.public_content
    frozen_at = content.created + datetime.timedelta(minutes=14)
    with freeze_time(frozen_at):
        content.save()
        self.assertFalse(content.edited)
def test_edited_is_true_for_newly_created_content_after_15_minutes_grace_period(self):
    """Saving 16 minutes after creation must mark the content as edited."""
    content = self.public_content
    frozen_at = content.created + datetime.timedelta(minutes=16)
    with freeze_time(frozen_at):
        content.save()
        self.assertTrue(content.edited)
def test_dict_for_view_edited_post(self):
    """``dict_for_view`` of content saved 16 minutes after creation.

    The expected ``humanized_timestamp`` carries the " (edited)" suffix.
    """
    content = self.public_content
    frozen_at = content.created + datetime.timedelta(minutes=16)
    with freeze_time(frozen_at):
        content.save()
        actual = content.dict_for_view(self.user)

        author = content.author
        pk_kwargs = {"pk": content.id}
        expected = {
            "author": content.author_id,
            "author_guid": author.guid,
            "author_handle": author.handle,
            "author_home_url": author.home_url,
            "author_image": author.safer_image_url_small,
            "author_is_local": bool(author.user),
            "author_name": author.handle,
            "author_profile_url": author.get_absolute_url(),
            "content_type": content.content_type.string_value,
            "delete_url": reverse("content:delete", kwargs=pk_kwargs),
            "detail_url": content.get_absolute_url(),
            "formatted_timestamp": content.timestamp,
            "guid": content.guid,
            "has_shared": False,
            "humanized_timestamp": "%s (edited)" % content.humanized_timestamp,
            "id": content.id,
            "is_authenticated": True,
            "is_author": True,
            "is_following_author": False,
            "parent": "",
            "profile_id": author.id,
            "rendered": content.rendered,
            "reply_count": 0,
            "reply_url": reverse("content:reply", kwargs=pk_kwargs),
            "shares_count": 0,
            "slug": content.slug,
            "through": content.id,
            "update_url": reverse("content:update", kwargs=pk_kwargs),
        }
        self.assertEqual(actual, expected)
def test_is_time_to_run_before_late_metric_slack_time(self):
    """At 11:09:59, with the last run one hour earlier, no run is due."""
    check_time = datetime.datetime(2016, 11, 7, 11, 9, 59, 0)
    one_hour_earlier = check_time - datetime.timedelta(hours=1)
    provider = PreHourlyProcessorUtil.get_data_provider()
    provider.set_last_processed(date_time=one_hour_earlier)
    due = PreHourlyProcessorUtil.is_time_to_run(check_time)
    self.assertFalse(due)
def test_is_time_to_run_after_late_metric_slack_time(self):
    """Just past 11:10:00, with the last run one hour earlier, a run is due."""
    check_time = datetime.datetime(2016, 11, 7, 11, 10, 0, 1)
    one_hour_earlier = check_time - datetime.timedelta(hours=1)
    provider = PreHourlyProcessorUtil.get_data_provider()
    provider.set_last_processed(date_time=one_hour_earlier)
    due = PreHourlyProcessorUtil.is_time_to_run(check_time)
    self.assertTrue(due)
def test_is_time_to_run_with_already_done_this_hour(self):
    """When the last processed time equals the check time, no run is due."""
    check_time = datetime.datetime(2016, 11, 7, 11, 30, 0, 0)
    provider = PreHourlyProcessorUtil.get_data_provider()
    provider.set_last_processed(date_time=check_time)
    due = PreHourlyProcessorUtil.is_time_to_run(check_time)
    self.assertFalse(due)
def test_is_time_to_run_after_midnight_but_before_late_metric_slack_time(
        self):
    """At 00:05:00, with the last run one hour earlier, no run is due."""
    check_time = datetime.datetime(2016, 11, 7, 0, 5, 0, 0)
    one_hour_earlier = check_time - datetime.timedelta(hours=1)
    provider = PreHourlyProcessorUtil.get_data_provider()
    provider.set_last_processed(date_time=one_hour_earlier)
    due = PreHourlyProcessorUtil.is_time_to_run(check_time)
    self.assertFalse(due)
def test_is_time_to_run_after_midnight_and_after_late_metric_slack_time(
        self):
    """Just past 00:10:00, with the last run one hour earlier, a run is due."""
    check_time = datetime.datetime(2016, 11, 7, 0, 10, 0, 1)
    one_hour_earlier = check_time - datetime.timedelta(hours=1)
    provider = PreHourlyProcessorUtil.get_data_provider()
    provider.set_last_processed(date_time=one_hour_earlier)
    due = PreHourlyProcessorUtil.is_time_to_run(check_time)
    self.assertTrue(due)
def test_same_time_different_day_behaviour(self):
    """A last-processed time exactly one day before the check time allows a run."""
    check_time = datetime.datetime(2016, 11, 7, 22, 10, 0, 1)
    one_day_earlier = check_time - datetime.timedelta(days=1)
    provider = PreHourlyProcessorUtil.get_data_provider()
    provider.set_last_processed(date_time=one_day_earlier)
    due = PreHourlyProcessorUtil.is_time_to_run(check_time)
    self.assertTrue(due)
def test_crontab():
    """Register three actions and check which ones each time tuple selects."""
    crontab = Crontab()
    crontab.add('boo')
    crontab.add('foo', 0)
    crontab.add('bar', [1, 3], -5, -1, -1, 0)

    # (arguments passed to actions(), expected set of action names)
    cases = (
        ((0, 1, 1, 1, 1), {'boo', 'foo'}),
        ((1, 1, 1, 1, 1), {'boo'}),
        ((1, 5, 1, 1, 7), {'boo', 'bar'}),
        ((3, 5, 1, 1, 7), {'boo', 'bar'}),
    )
    for time_fields, expected in cases:
        assert crontab.actions(*time_fields) == expected

    # Same check driven by a timestamp instead of explicit fields
    moment = datetime(2016, 1, 17, 5, 1)
    ts = mktime(moment.timetuple())
    assert crontab.actions_ts(ts) == {'boo', 'bar'}
def _get_extensions(self):
    """Extract the extensions listed in the wheel into the dylib cache.

    Reads the JSON ``EXTENSIONS`` member from the archive's ``.dist-info``
    directory, which maps names to archive-relative paths. Each path is
    extracted below a per-wheel cache directory when the cached copy is
    missing or older than the archive member.

    :returns: list of ``(name, destination_path)`` tuples; empty when the
        archive has no ``EXTENSIONS`` member
    """
    pathname = os.path.join(self.dirname, self.filename)
    name_ver = '%s-%s' % (self.name, self.version)
    info_dir = '%s.dist-info' % name_ver
    # archive member names use forward slashes, hence posixpath
    arcname = posixpath.join(info_dir, 'EXTENSIONS')
    wrapper = codecs.getreader('utf-8')
    result = []
    with ZipFile(pathname, 'r') as zf:
        try:
            # zf.open raises KeyError when the member is absent; that is
            # handled by the except clause below.
            with zf.open(arcname) as bf:
                wf = wrapper(bf)
                extensions = json.load(wf)
                cache = self._get_dylib_cache()
                prefix = cache.prefix_to_dir(pathname)
                cache_base = os.path.join(cache.base, prefix)
                if not os.path.isdir(cache_base):
                    os.makedirs(cache_base)
                for name, relpath in extensions.items():
                    dest = os.path.join(cache_base, convert_path(relpath))
                    if not os.path.exists(dest):
                        extract = True
                    else:
                        # Re-extract only if the archive member is newer
                        # than the file already in the cache.
                        file_time = os.stat(dest).st_mtime
                        file_time = datetime.datetime.fromtimestamp(file_time)
                        info = zf.getinfo(relpath)
                        wheel_time = datetime.datetime(*info.date_time)
                        extract = wheel_time > file_time
                    if extract:
                        zf.extract(relpath, cache_base)
                    result.append((name, dest))
        except KeyError:
            # NOTE(review): this also swallows a KeyError raised after some
            # entries were processed (e.g. from zf.getinfo); the partial
            # result is returned in that case.
            pass
    return result
def _get_x509_days_left(x509):
date_fmt = '%Y%m%d%H%M%SZ'
current_datetime = datetime.datetime.utcnow()
not_after = time.strptime(x509.get_notAfter(), date_fmt)
not_before = time.strptime(x509.get_notBefore(), date_fmt)
ret = {'not_after': (datetime.datetime(*not_after[:6]) - current_datetime).days,
'not_before': (datetime.datetime(*not_before[:6]) - current_datetime).days}
return ret
def time_combined(year, month, day, hour, minute, meridiem):
    """
    Build a datetime from date parts and a 12-hour clock time.

    Hour conversion:
    12:** AM becomes hour 0
    1:** AM through 11:** AM keep their hour
    12:** PM keeps its hour
    1:** PM through 11:** PM have 12 added
    Any ``meridiem`` other than "AM" is handled like PM.
    """
    is_am = meridiem == "AM"
    if is_am and hour == 12:
        hour = 0
    elif not is_am and hour < 12:
        hour += 12

    # Create the final start/end date fields
    return datetime.datetime(year, month, day, hour, minute)
def parse_neuralynx_time_string(self, time_string):
    """Parse the time string found in Neuralynx file headers.

    The fifth whitespace-separated token is read as ``month/day/year`` and
    the last token as ``hour:minute:second.fraction``; the fraction is
    multiplied by 1000 to obtain microseconds.

    :param time_string: header line containing the date and time
    :returns: ``datetime.datetime``, or ``None`` (after emitting a warning)
        when the string cannot be split into the expected fields
    :raises ValueError: if the fields parse but do not form a valid date
    """
    try:
        tokens = time_string.split()
        tmp_date = [int(x) for x in tokens[4].split('/')]
        tmp_time = [int(x) for x in tokens[-1].replace('.', ':').split(':')]
        tmp_microsecond = tmp_time[3] * 1000
    except (AttributeError, IndexError, TypeError, ValueError):
        # Only parsing failures are handled here; a bare ``except`` would
        # also swallow KeyboardInterrupt/SystemExit. str() keeps the warning
        # itself from raising when the input is not a string.
        warnings.warn('Unable to parse time string from Neuralynx header: '
                      + str(time_string))
        return None
    else:
        return datetime.datetime(tmp_date[2], tmp_date[0], tmp_date[1],  # Year, month, day
                                 tmp_time[0], tmp_time[1], tmp_time[2],  # Hour, minute, second
                                 tmp_microsecond)
def test_get_vacations_view_is_working_properly(self):
    """Check that GET: /api/users/{id}/vacations returns the user's vacations
    """
    import datetime
    import transaction
    from stalker import db, Vacation, User

    vac1 = Vacation(
        user=self.test_user1,
        start=datetime.datetime(2016, 4, 24, 0, 0),
        end=datetime.datetime(2016, 4, 28, 0, 0)
    )
    vac2 = Vacation(
        user=self.test_user1,
        start=datetime.datetime(2016, 7, 1, 0, 0),
        end=datetime.datetime(2016, 7, 8, 0, 0)
    )
    db.DBSession.add_all([vac1, vac2])
    db.DBSession.flush()
    transaction.commit()

    # re-query the user after the commit
    user1 = User.query.filter(User.login == self.test_user1.login).first()

    url = '/api/users/%s/vacations' % self.test_user1.id
    response = self.test_app.get(url)
    actual = sorted(response.json_body)

    expected_vacations = [user1.vacations[0], user1.vacations[1]]
    expected = sorted([
        {
            'id': v.id,
            '$ref': '/api/vacations/%s' % v.id,
            'name': v.name,
            'entity_type': v.entity_type
        } for v in expected_vacations
    ])
    self.assertEqual(actual, expected)
# TASKS
def test_update_entity_is_working_properly(self):
    """Check that update_entity() stores a new ``end`` on an existing time log
    """
    import datetime
    from stalker import db, TimeLog
    from stalker_pyramid.testing import DummyRequest, DummyMultiDict
    from stalker_pyramid.views import EntityViewBase

    new_end = datetime.datetime(2016, 7, 26, 18)

    db.DBSession.flush()
    db.DBSession.commit()

    # create a time log
    t1 = TimeLog(
        task=self.test_task1,
        resource=self.test_user1,
        start=datetime.datetime(2016, 7, 26, 16),
        end=datetime.datetime(2016, 7, 26, 17),
        created_by=self.test_user2
    )
    db.DBSession.add(t1)
    db.DBSession.commit()

    # request carrying only the changed field
    request = DummyRequest()
    request.matchdict['id'] = t1.id
    request.params = DummyMultiDict()
    request.params['end'] = EntityViewBase.milliseconds_since_epoch(new_end)
    self.patch_logged_in_user(request)

    time_log.TimeLogViews(request).update_entity()

    t1_db = TimeLog.query.filter(TimeLog.name == t1.name).first()
    self.assertEqual(t1_db.end, new_end)
def test_delete_entity_is_working_properly(self):
    """Check that delete_entity() removes an existing time log
    """
    import datetime
    from stalker import db, TimeLog
    from stalker_pyramid.testing import DummyRequest

    db.DBSession.flush()
    db.DBSession.commit()

    # create a time log
    t1 = TimeLog(
        task=self.test_task1,
        resource=self.test_user1,
        start=datetime.datetime(2016, 7, 26, 16),
        end=datetime.datetime(2016, 7, 26, 17),
        created_by=self.test_user2
    )
    db.DBSession.add(t1)
    db.DBSession.commit()

    request = DummyRequest()
    request.matchdict['id'] = t1.id
    time_log.TimeLogViews(request).delete_entity()

    # no time log for this task/resource pair may remain
    remaining = TimeLog.query \
        .filter(TimeLog.task == self.test_task1) \
        .filter(TimeLog.resource == self.test_user1) \
        .first()
    self.assertIsNone(remaining)
def test_update_entity_is_working_properly_with_post(self):
    """Check that POST: /api/time_logs/{id} stores a new ``end`` value
    """
    import datetime
    from stalker import db, TimeLog
    from stalker_pyramid.views import EntityViewBase

    new_end = datetime.datetime(2016, 7, 26, 18)

    db.DBSession.flush()
    db.DBSession.commit()

    # create a time log
    t1 = TimeLog(
        task=self.test_task1,
        resource=self.test_user1,
        start=datetime.datetime(2016, 7, 26, 16),
        end=datetime.datetime(2016, 7, 26, 17),
        created_by=self.test_user2
    )
    db.DBSession.add(t1)
    db.DBSession.commit()

    self.admin_login()
    post_params = {
        'end': EntityViewBase.milliseconds_since_epoch(new_end)
    }
    self.test_app.post(
        '/api/time_logs/%s' % t1.id,
        params=post_params,
        status=200
    )

    t1_db = TimeLog.query.filter(TimeLog.name == t1.name).first()
    self.assertEqual(t1_db.end, new_end)
def test_delete_entity_is_working_properly(self):
    """Check that DELETE: /api/time_logs/{id} removes an existing time log
    """
    import datetime
    from stalker import db, TimeLog

    db.DBSession.flush()
    db.DBSession.commit()

    # create a time log
    t1 = TimeLog(
        task=self.test_task1,
        resource=self.test_user1,
        start=datetime.datetime(2016, 7, 26, 16),
        end=datetime.datetime(2016, 7, 26, 17),
        created_by=self.test_user2
    )
    db.DBSession.add(t1)
    db.DBSession.commit()

    self.test_app.delete('/api/time_logs/%s' % t1.id, status=200)

    # no time log for this task/resource pair may remain
    remaining = TimeLog.query \
        .filter(TimeLog.task == self.test_task1) \
        .filter(TimeLog.resource == self.test_user1) \
        .first()
    self.assertIsNone(remaining)
def test_create_entity_with_invalid_user_id(self):
    """Check that create_entity() raises HTTPServerError with the message
    'Missing "user_id" parameter' when ``user_id`` is -1
    """
    import datetime
    from pyramid.httpexceptions import HTTPServerError
    from stalker_pyramid.testing import DummyRequest, DummyMultiDict
    from stalker_pyramid.views import EntityViewBase

    to_millis = EntityViewBase.milliseconds_since_epoch

    request = DummyRequest()
    request.params = DummyMultiDict()
    request.params['user_id'] = -1
    request.params['start'] = to_millis(datetime.datetime(2016, 4, 22, 10))
    request.params['end'] = to_millis(datetime.datetime(2016, 4, 22, 16))

    vacation_views = vacation.VacationViews(request)
    with self.assertRaises(HTTPServerError) as cm:
        vacation_views.create_entity()

    self.assertEqual(
        str(cm.exception),
        'Missing "user_id" parameter'
    )