Python django.utils.timezone 模块,activate() 实例源码
我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用django.utils.timezone.activate()。
def _select_locale(self, request):
    """Activate the language and time zone that apply to this request.

    The language is the first truthy result of the user, cookie and browser
    lookups, tried in that order against the event's locales; when none of
    them yields a value the event's own locale is used. Afterwards the
    event's time zone is activated. An unknown time zone name is ignored, in
    which case ``request.timezone`` is left unset.
    """
    available = request.event.locales
    lookups = (
        self._language_from_user,
        self._language_from_cookie,
        self._language_from_browser,
    )
    for lookup in lookups:
        chosen = lookup(request, available)
        if chosen:
            break
    else:
        # No lookup produced a language: fall back to the event default.
        chosen = request.event.locale

    translation.activate(chosen)
    request.LANGUAGE_CODE = translation.get_language()

    with suppress(pytz.UnknownTimeZoneError):
        event_tz = request.event.timezone
        timezone.activate(pytz.timezone(event_tz))
        request.timezone = event_tz
def _select_locale(self, request):
    """Activate a language and a time zone for the request.

    Works whether or not the request carries a (truthy) ``event``:

    * Language candidates are the event's locales, or ``settings.LANGUAGES``
      without an event. The user, cookie and browser lookups are tried in
      order; with an event, its locale is the final fallback.
    * The time zone is the authenticated user's, else the event's, else
      ``settings.TIME_ZONE``. An unknown time zone name is ignored and
      ``request.timezone`` is then left unset.
    """
    event = getattr(request, 'event', None)
    available = event.locales if event else settings.LANGUAGES

    chosen = None
    lookups = (
        self._language_from_user,
        self._language_from_cookie,
        self._language_from_browser,
    )
    for lookup in lookups:
        chosen = lookup(request, available)
        if chosen:
            break
    if event and not chosen:
        chosen = event.locale

    translation.activate(chosen)
    request.LANGUAGE_CODE = translation.get_language()

    with suppress(pytz.UnknownTimeZoneError):
        if request.user.is_authenticated:
            tz_name = request.user.timezone
        elif event:
            tz_name = event.timezone
        else:
            tz_name = settings.TIME_ZONE
        timezone.activate(pytz.timezone(tz_name))
        request.timezone = tz_name
def chat_view(request, thread_id):
    """Render the chat page for a thread the current user participates in.

    Responds with 404 when no thread with ``thread_id`` has the requesting
    user among its participants. If the ``timezone`` cookie names a known
    time zone it is activated for this request. The user's unread messages
    for the thread are cleared before rendering.

    Args:
        request: The incoming HTTP request.
        thread_id: Primary key of the thread to display.

    Returns:
        The rendered ``chat.html`` response.
    """
    thread = get_object_or_404(
        Thread,
        id=thread_id,
        participants__id=request.user.id
    )
    messages_info = get_messages_info(request.user.id, thread_id)
    messages = thread.message_set.order_by("-datetime")

    tz = request.COOKIES.get("timezone")
    if tz:
        try:
            timezone.activate(pytz.timezone(tz))
        except pytz.UnknownTimeZoneError:
            # The cookie is client-controlled; a bogus value must not turn
            # the page into a server error, so keep the current time zone.
            pass

    clear_users_thread_unread_messages(thread, request.user)
    return render(request, 'chat.html', {
        "thread": thread,
        "thread_messages": messages,
        "messages_total": messages_info['total'],
        "messages_sent": messages_info['sent'],
        "messages_received": messages_info['received'],
    })
def tearDown(self):
    """Re-activate the time zone kept in ``self._timezone_backup``, then
    delegate to the parent class's ``tearDown``."""
    saved_timezone = self._timezone_backup
    timezone.activate(saved_timezone)
    parent = super(MiddlewareTests, self)
    return parent.tearDown()
def process_request(self, request):
    """Activate the language chosen by ``get_language_for_user`` and expose
    the resulting language code as ``request.LANGUAGE_CODE``."""
    user_language = self.get_language_for_user(request)
    translation.activate(user_language)
    request.LANGUAGE_CODE = translation.get_language()
def process_request(self, request):
    """Activate the time zone of the requesting user's account, if any.

    Does nothing when ``request.user`` has no ``account`` attribute or the
    account is falsy.
    """
    user_account = getattr(request.user, "account", None)
    if not user_account:
        return
    timezone.activate(user_account.timezone)
def process_request(self, request):
    """Switch the active translation to the language that
    ``get_language_for_user`` returns for this request, and record the
    language actually in effect on ``request.LANGUAGE_CODE``."""
    selected = self.get_language_for_user(request)
    translation.activate(selected)
    request.LANGUAGE_CODE = translation.get_language()
def process_request(self, request):
    """Activate the account's time zone, or ``settings.TIME_ZONE`` when the
    account has none.

    Nothing is activated when the user has no account (attribute missing,
    falsy, or the lookup raises ``Account.DoesNotExist``).
    """
    try:
        user_account = getattr(request.user, "account", None)
    except Account.DoesNotExist:
        return
    if not user_account:
        return
    timezone.activate(user_account.timezone or settings.TIME_ZONE)
def process_request(self, request):
    """Activate the time zone stored on the requesting user's profile.

    Returns without doing anything when the request has no user, when
    ``'test'`` appears in ``sys.argv``, or when the user has no profile.
    """
    # Skip this middleware in testing :/
    # TODO(matt): figure out why this middleware causes the internal
    #             django humanize test to fail.
    if not request.user or 'test' in sys.argv:
        return
    try:
        profile = models.UserProfile.objects.get(user=request.user.id)
    except models.UserProfile.DoesNotExist:
        return
    else:
        timezone.activate(pytz.timezone(profile.timezone))
def test_timestamp_utc_create(self):
    """With UTC active, a created object's timestamp carries the same tzinfo
    as ``aware_datetime()``."""
    timezone.activate(pytz.utc)
    try:
        obj = DateTimeTzModel.objects.create(timestamp=aware_datetime())
        self.assertEqual(obj.timestamp.tzinfo, aware_datetime().tzinfo)
    finally:
        # Restore the default zone even if the assertion fails, so the
        # activated zone cannot leak into subsequent tests.
        timezone.deactivate()
def test_timestamp_utc_read(self):
    """With UTC active, a timestamp read back from the database equals
    ``aware_datetime()``."""
    # Regression test for https://github.com/learningequality/kolibri/issues/1602
    timezone.activate(pytz.utc)
    try:
        obj = DateTimeTzModel.objects.create(timestamp=aware_datetime())
        obj.refresh_from_db()
        self.assertEqual(obj.timestamp, aware_datetime())
    finally:
        # Restore the default zone even if the assertion fails, so the
        # activated zone cannot leak into subsequent tests.
        timezone.deactivate()
def test_timestamp_arbitrary_create(self):
    """With Africa/Nairobi active, a created object's timestamp keeps the
    tzinfo of the value it was created with."""
    tz = pytz.timezone('Africa/Nairobi')
    timezone.activate(tz)
    try:
        timestamp = aware_datetime()
        obj = DateTimeTzModel.objects.create(timestamp=timestamp)
        self.assertEqual(obj.timestamp.tzinfo, timestamp.tzinfo)
    finally:
        # Restore the default zone even if the assertion fails, so the
        # activated zone cannot leak into subsequent tests.
        timezone.deactivate()
def test_timestamp_arbitrary_read(self):
    """With Africa/Nairobi active, a timestamp read back from the database
    equals the value that was stored."""
    # Regression test for https://github.com/learningequality/kolibri/issues/1602
    tz = pytz.timezone('Africa/Nairobi')
    timezone.activate(tz)
    try:
        timestamp = aware_datetime()
        obj = DateTimeTzModel.objects.create(timestamp=timestamp)
        obj.refresh_from_db()
        self.assertEqual(obj.timestamp, timestamp)
    finally:
        # Restore the default zone even if the assertion fails, so the
        # activated zone cannot leak into subsequent tests.
        timezone.deactivate()
def test_default_utc_create(self):
    """With UTC active, the model's default timestamp is created with
    ``pytz.utc`` as its tzinfo."""
    timezone.activate(pytz.utc)
    try:
        obj = DateTimeTzModel.objects.create()
        self.assertEqual(obj.default_timestamp.tzinfo, pytz.utc)
    finally:
        # Restore the default zone even if the assertion fails, so the
        # activated zone cannot leak into subsequent tests.
        timezone.deactivate()
def test_zero_second_fractions_read(self):
    """A timestamp string without fractional seconds parses and equals
    ``aware_datetime()``; a ``ValueError`` is reported as a test failure."""
    # Regression test for https://github.com/learningequality/kolibri/issues/1758
    raw_value = '2000-12-11 10:09:08'
    timezone.activate(pytz.utc)
    try:
        parsed = parse_timezonestamp(raw_value)
        self.assertEqual(parsed, aware_datetime())
    except ValueError:
        self.fail('parse_timezonestamp did not parse time data missing fractions of seconds.')
    finally:
        timezone.deactivate()
def test_default_utc_create(self):
    """With UTC active, an object created without arguments gets a default
    timestamp whose tzinfo is ``pytz.utc``."""
    timezone.activate(pytz.utc)
    try:
        obj = DateTimeTzModel.objects.create()
        self.assertEqual(obj.default_timestamp.tzinfo, pytz.utc)
    finally:
        # Restore the default zone even if the assertion fails, so the
        # activated zone cannot leak into subsequent tests.
        timezone.deactivate()
def test_timestamp_utc_parse(self):
    """With UTC active, the serializer field parses an ISO string into a
    value with the same tzinfo as ``aware_datetime()``."""
    timezone.activate(pytz.utc)
    try:
        field = DateTimeTzSerializerField()
        timestamp = aware_datetime()
        self.assertEqual(
            field.to_internal_value(timestamp.isoformat()).tzinfo,
            aware_datetime().tzinfo,
        )
    finally:
        # Restore the default zone even if the assertion fails, so the
        # activated zone cannot leak into subsequent tests.
        timezone.deactivate()
def test_timestamp_arbitrary_parse(self):
    """With Africa/Nairobi active, the serializer field parses an ISO string
    into a value with the same tzinfo as ``aware_datetime()``."""
    tz = pytz.timezone('Africa/Nairobi')
    timezone.activate(tz)
    try:
        field = DateTimeTzSerializerField()
        timestamp = aware_datetime()
        self.assertEqual(
            field.to_internal_value(timestamp.isoformat()).tzinfo,
            aware_datetime().tzinfo,
        )
    finally:
        # Restore the default zone even if the assertion fails, so the
        # activated zone cannot leak into subsequent tests.
        timezone.deactivate()
def __call__(self, request):
    """Attach event and permission data to the request, then continue.

    Resolves the request path. When the URL carries an ``event`` kwarg, the
    matching ``Event`` (case-insensitive slug match, 404 if absent) is stored
    on ``request.event``. For a request with an event, ``request.is_orga``
    and ``request.is_reviewer`` are set and the event's time zone is
    activated. URLs in the ``orga`` namespace may be answered with a
    redirect instead of the regular response.

    NOTE(review): the nesting below was reconstructed from a listing that
    had lost its indentation; confirm the placement of the permission block
    and of ``_set_orga_events`` against the upstream source.
    """
    url = resolve(request.path_info)
    event_slug = url.kwargs.get('event')
    if event_slug:
        request.event = get_object_or_404(
            Event,
            slug__iexact=event_slug,
        )
    if hasattr(request, 'event') and request.event:
        if not request.user.is_anonymous:
            # Superusers count as organisers and reviewers of every event;
            # everyone else needs a matching EventPermission row.
            request.is_orga = request.user.is_superuser or EventPermission.objects.filter(
                user=request.user,
                event=request.event,
                is_orga=True
            ).exists()
            request.is_reviewer = request.user.is_superuser or EventPermission.objects.filter(
                user=request.user,
                event=request.event,
                is_reviewer=True
            ).exists()
        else:
            request.is_orga = False
            request.is_reviewer = False
        timezone.activate(pytz.timezone(request.event.timezone))
    self._set_orga_events(request)
    if 'orga' in url.namespaces:
        # From here on ``url`` holds the helper's result, not the resolver
        # match; a truthy result is used as the redirect target.
        url = self._handle_orga_url(request, url)
        if url:
            return redirect(url)
    return self.get_response(request)
def process_request(self, request):
    """Activate the time zone stored in the session under
    ``settings.TIMEZONE_SESSION_KEY``, or ``settings.TIME_ZONE`` when the
    session holds no (truthy) value."""
    session_tz = request.session.get(settings.TIMEZONE_SESSION_KEY)
    timezone.activate(session_tz or settings.TIME_ZONE)
def process_request(self, request):
    """Activate the authenticated user's time zone; otherwise revert to the
    default time zone.

    ``is_authenticated`` is a method on old Django versions and a plain
    attribute on newer ones, where calling it raises ``TypeError``. Both
    forms are accepted here.
    """
    is_authenticated = request.user.is_authenticated
    if callable(is_authenticated):
        is_authenticated = is_authenticated()

    if is_authenticated:
        timezone.activate(request.user.timezone)
    else:
        timezone.deactivate()
def process_request(self, request):
    """Activate the authenticated user's preferred language.

    Anonymous requests are left untouched. ``is_authenticated`` is a method
    on old Django versions and a plain attribute on newer ones, where calling
    it raises ``TypeError``. Both forms are accepted here.
    """
    is_authenticated = request.user.is_authenticated
    if callable(is_authenticated):
        is_authenticated = is_authenticated()

    if is_authenticated:
        translation.activate(request.user.language)
def localnow():
    """Return the current datetime converted to the currently active time
    zone (the one set through ``timezone.activate()``)."""
    moment = utcnow()
    active_tz = timezone.get_current_timezone()
    return timezone.localtime(moment, timezone=active_tz)
def localtoday():
    """Return today's date as seen in the currently active time zone (the
    one set through ``timezone.activate()``)."""
    moment = utcnow()
    active_tz = timezone.get_current_timezone()
    local_moment = timezone.localtime(moment, timezone=active_tz)
    return local_moment.date()
def __call__(self, request):
    """Activate the site's configured time zone for this request.

    The zone name is read from the site's ``GeneralSetting``. Whenever no
    zone can be selected (no site on the request, or the setting is empty)
    the default zone is restored explicitly. The active time zone is stored
    per thread, so skipping this would let a zone activated by an earlier
    request on the same thread carry over.

    Returns:
        The response produced by the next middleware or view.
    """
    tzname = None
    if request.site:
        tzname = GeneralSetting.for_site(request.site).timezone

    if tzname:
        timezone.activate(pytz.timezone(tzname))
    else:
        timezone.deactivate()

    return self.get_response(request)
def process_request(self, request):
    """Activate the time zone named by the session's ``django_timezone``
    entry, or revert to the default zone when the entry is missing or
    empty."""
    session_tz = request.session.get('django_timezone')
    if not session_tz:
        timezone.deactivate()
        return
    timezone.activate(pytz.timezone(session_tz))
def user_time(request):
    """Apply the logged-in user's stored time zone and language, then
    redirect to the home page.

    Both preferences are applied on a best-effort basis: a missing profile
    or an unusable stored value must not prevent the login redirect. The
    success message is queued between the two steps, as before.

    Returns:
        A redirect to the ``home`` URL.
    """
    stprofile = None
    try:
        stprofile = StProfile.objects.get(user=request.user)
        timezone.activate(stprofile.time_zone)
        request.session['django_timezone'] = stprofile.time_zone
    except Exception:
        # Best effort: keep the current time zone. ``Exception`` rather than
        # a bare ``except`` so that SystemExit/KeyboardInterrupt propagate.
        pass

    messages.success(request, _(u"You have successfully logged in as %s") % request.user.username)

    # Without a profile there is no language to apply; checking explicitly
    # avoids relying on a swallowed NameError.
    if stprofile is not None:
        try:
            translation.activate(stprofile.language)
            request.session[translation.LANGUAGE_SESSION_KEY] = stprofile.language
        except Exception:
            # Best effort: keep the current language.
            pass

    return HttpResponseRedirect(reverse('home'))
def process_request(self, request):
    """Activate the time zone from the ``i18n_timezone`` site setting, or
    revert to the default zone when the setting is empty."""
    configured_tz = get_site_setting('i18n_timezone')
    if not configured_tz:
        timezone.deactivate()
        return
    timezone.activate(pytz.timezone(configured_tz))
def process_request(self, request):
    """Activate a time zone for the visitor, guessing it from the IP address
    when the session does not hold one yet.

    A zone found through the GeoIP lookup is cached in the session under
    ``user_time_zone``. If no zone can be determined, or anything goes wrong
    along the way, the default time zone is restored instead.
    """
    user_time_zone = request.session.get('user_time_zone')
    try:
        if not user_time_zone:
            user_ip = get_real_ip(request)
            if user_ip:
                reader = geolite2.reader()
                try:
                    ip_details = reader.get(user_ip)
                finally:
                    # Close the reader even when the lookup raises.
                    geolite2.close()
                # The lookup yields nothing for addresses it does not know,
                # and a record may lack location data.
                location = (ip_details or {}).get('location') or {}
                user_time_zone = location.get('time_zone')
                if user_time_zone:
                    request.session['user_time_zone'] = user_time_zone

        if user_time_zone:
            timezone.activate(pytz.timezone(user_time_zone))
        else:
            timezone.deactivate()
    except Exception:
        # Deliberate best effort: time zone detection must never break the
        # request. ``Exception`` (not a bare ``except``) lets
        # SystemExit/KeyboardInterrupt propagate.
        timezone.deactivate()
def __call__(self, request):
    """Activate the authenticated user's time zone, or UTC for anonymous
    requests, then hand the request on.

    ``is_authenticated`` is a method on old Django versions and a plain
    attribute on newer ones, where calling it raises ``TypeError``. Both
    forms are accepted here.

    Returns:
        The response produced by the next middleware or view.
    """
    is_authenticated = request.user.is_authenticated
    if callable(is_authenticated):
        is_authenticated = is_authenticated()

    if is_authenticated:
        timezone.activate(pytz.timezone(request.user.timezone))
    else:
        timezone.activate('UTC')
    return self.get_response(request)
def process_request(self, request):
    """Make the language from ``get_language_for_user`` the active one and
    store the effective language code on ``request.LANGUAGE_CODE``."""
    preferred = self.get_language_for_user(request)
    translation.activate(preferred)
    request.LANGUAGE_CODE = translation.get_language()
def process_request(self, request):
    """Activate the account's time zone, falling back to
    ``settings.TIME_ZONE`` when the account has none.

    Does nothing when ``request.user`` has no (truthy) ``account``.
    """
    user_account = getattr(request.user, "account", None)
    if not user_account:
        return
    timezone.activate(user_account.timezone or settings.TIME_ZONE)
def process_request(self, request):
    """Activate the authenticated user's time zone when one is set;
    otherwise revert to the default time zone.

    ``is_authenticated`` is a method on old Django versions and a plain
    attribute on newer ones, where calling it raises ``TypeError``. Both
    forms are accepted here.
    """
    is_authenticated = request.user.is_authenticated
    if callable(is_authenticated):
        is_authenticated = is_authenticated()

    if is_authenticated and request.user.timezone:
        timezone.activate(pytz.timezone(request.user.timezone))
    else:
        timezone.deactivate()
def process_request(self, request):
    """Pick the active time zone from the requesting user.

    Anonymous requests revert to the default zone. Authenticated users get
    their own zone, or UTC when their ``timezone`` is ``None``.
    """
    user = request.user
    if not user.is_authenticated:
        timezone.deactivate()
        return

    zone_name = user.timezone if user.timezone is not None else 'UTC'
    timezone.activate(pytz.timezone(zone_name))
def process_request(self, request):
    """Adds data necessary for Horizon to function to the request.

    Always sets ``request.horizon``. For authenticated, non-Ajax requests it
    additionally logs an error when cookie-based sessions approach the
    configured cookie size limit, and activates the time zone stored in the
    session under ``django_timezone``.

    ``is_authenticated`` is a method on old Django versions and a plain
    attribute on newer ones, where calling it raises ``TypeError``. Both
    forms are accepted here.
    """
    request.horizon = {'dashboard': None,
                       'panel': None,
                       'async_messages': []}

    is_authenticated = False
    if hasattr(request, "user"):
        is_authenticated = request.user.is_authenticated
        if callable(is_authenticated):
            is_authenticated = is_authenticated()
    if not is_authenticated:
        # proceed no further if the current request is already known
        # not to be authenticated
        # it is CRITICAL to perform this check as early as possible
        # to avoid creating too many sessions
        return None

    if request.is_ajax():
        # if the request is Ajax we do not want to proceed, as clients can
        #  1) create pages with constant polling, which can create race
        #     conditions when a page navigation occurs
        #  2) might leave a user seemingly left logged in forever
        #  3) thrashes db backed session engines with tons of changes
        return None

    # If we use cookie-based sessions, check that the cookie size does not
    # reach the max size accepted by common web browsers.
    if (
        settings.SESSION_ENGINE ==
        'django.contrib.sessions.backends.signed_cookies'
    ):
        max_cookie_size = getattr(
            settings, 'SESSION_COOKIE_MAX_SIZE', None)
        session_cookie_name = getattr(
            settings, 'SESSION_COOKIE_NAME', None)
        session_key = request.COOKIES.get(session_cookie_name)
        if max_cookie_size is not None and session_key is not None:
            cookie_size = sum((
                len(key) + len(value)
                for key, value in six.iteritems(request.COOKIES)
            ))
            if cookie_size >= max_cookie_size:
                LOG.error(
                    'Total Cookie size for user_id: %(user_id)s is '
                    '%(cookie_size)sB >= %(max_cookie_size)sB. '
                    'You need to configure file-based or database-backed '
                    'sessions instead of cookie-based sessions: '
                    'http://docs.openstack.org/developer/horizon/topics/'
                    'deployment.html#session-storage'
                    % {
                        'user_id': request.session.get(
                            'user_id', 'Unknown'),
                        'cookie_size': cookie_size,
                        'max_cookie_size': max_cookie_size,
                    }
                )

    tz = request.session.get('django_timezone')
    if tz:
        timezone.activate(tz)
def user_preference(request):
    """Show and process the user-settings form.

    GET renders the form. POST with ``cancel_button`` redirects home without
    saving. Any other POST updates name, e-mail, language and time zone:

    * The e-mail is rejected when another account already uses it or when it
      fails validation; an empty e-mail leaves the stored one unchanged.
    * A changed language or time zone is activated immediately and stored in
      the session.
    * The submitted time zone must be a name known to pytz. Previously an
      absent or unknown value was passed straight to ``timezone.activate``,
      which raised and produced a server error.

    With validation errors the form is re-rendered and nothing is saved.

    Returns:
        The rendered form, or a redirect to ``home``.
    """
    current_user = User.objects.get(username=request.user.username)

    if request.method != 'POST':
        return render(request, 'students/user_preference.html',
                      {'current_user': current_user, 'timezones': pytz.common_timezones})

    if request.POST.get('cancel_button'):
        messages.warning(request, _(u"Changing user settings canceled"))
        return HttpResponseRedirect(reverse('home'))

    errors = {}
    stprofile = StProfile.objects.get_or_create(user=current_user)[0]

    current_user.first_name = request.POST.get('first_name', '').strip()
    current_user.last_name = request.POST.get('last_name', '').strip()

    form_email = request.POST.get('email', '').strip()
    users_same_email = User.objects.filter(email=form_email)
    if len(users_same_email) > 0 and current_user.email != form_email:
        # Keep the rejected value on the in-memory user so the re-rendered
        # form shows what was typed; nothing is saved while errors exist.
        current_user.email = form_email
        errors['email'] = _(u"This email address is already in use." +
                            " Please enter a different email address.")
    elif len(form_email) > 0:
        try:
            validate_email(form_email)
        except ValidationError:
            errors['email'] = _(u"Enter a valid email address")
        else:
            current_user.email = form_email

    form_language = request.POST.get('lang')
    if stprofile.language != form_language:
        stprofile.language = form_language
        translation.activate(form_language)
        request.session[translation.LANGUAGE_SESSION_KEY] = form_language

    form_time_zone = request.POST.get('time_zone')
    if stprofile.time_zone != form_time_zone:
        if form_time_zone in pytz.all_timezones:
            stprofile.time_zone = form_time_zone
            timezone.activate(form_time_zone)
            request.session['django_timezone'] = form_time_zone
        else:
            # POST data is client-controlled; report instead of crashing.
            errors['time_zone'] = _(u"Please select a valid time zone")

    if errors:
        messages.error(request, _(u'Please, correct the following errors'))
        return render(request, 'students/user_preference.html',
                      {'current_user': current_user, 'timezones': pytz.common_timezones, 'errors': errors})

    current_user.save()
    stprofile.save()
    messages.success(request, _(u"User settings changed successfully"))
    return HttpResponseRedirect(reverse('home'))
def test_all(boite, mocker):
    """The time app renders the patched "now" in the app's own time zone and
    in the active language's time format.

    ``timezone.now`` is pinned to 13:35:01 UTC on 2000-01-13 and
    ``AppTime.should_update`` is forced to ``True`` so every call
    recomputes the payload.
    """
    def check(result, content):
        # Every payload has the same geometry; only the text differs.
        assert len(result) == 3
        assert result['data'] == [{'type': 'text',
                                   'width': 25,
                                   'height': 8,
                                   'x': 4,
                                   'y': 1,
                                   'content': content}]
        assert result['height'] == 8
        assert result['width'] == 32

    timezone.activate('Africa/Niamey')
    try:
        now = timezone.now()
        now = now.replace(year=2000, month=1, day=13, hour=13, minute=35, second=1)
        mocker.patch('laboite.apps.time.models.timezone.now', return_value=now)
        # Force update. Called directly rather than in a ``with`` block: the
        # ``mocker`` fixture undoes its patches itself when the test ends.
        mocker.patch('laboite.apps.time.models.AppTime.should_update',
                     return_value=True)

        translation.activate('fr')
        app = AppTime.objects.create(boite=boite,
                                     enabled=True,
                                     tz='Africa/Niamey')
        check(app.get_app_dictionary(), '14:35')

        app.tz = 'Pacific/Pitcairn'
        app.save()
        check(app.get_app_dictionary(), '05:35')

        translation.activate('en-us')
        check(app.get_app_dictionary(), '5:35 a.m.')
    finally:
        # Do not leak the activated language or time zone into other tests.
        translation.deactivate()
        timezone.deactivate()
def create_jwt_payload(user, expiration_delta, issuer, version=None, **kwargs):
    """Build the claims dictionary for a user's JWT.

    Args:
        user: User whose identity fields are embedded under ``"user"``.
        expiration_delta: Token lifetime in seconds, counted from now.
        issuer: Value of the ``iss`` claim.
        version: Token version; ``settings.JWT_VERSION`` when falsy.
        **kwargs: Extra claims. They are applied last and therefore override
            any claim of the same name set here.

    Returns:
        dict: The payload, with ``exp``, ``iat`` and ``nbf`` expressed as
        integer seconds since the Unix epoch.
    """
    import calendar
    import time

    def epoch_seconds(moment):
        # ``strftime("%s")`` is a non-portable C-library extension that also
        # ignores tzinfo, so aware datetimes came out shifted by the
        # server's UTC offset. Aware values are converted through UTC; naive
        # values are interpreted as local time, as before.
        if moment.tzinfo is not None and moment.utcoffset() is not None:
            return calendar.timegm(moment.utctimetuple())
        return int(time.mktime(moment.timetuple()))

    # Kept for backward compatibility: callers may rely on the active time
    # zone being reset here. It has no influence on ``timezone.now()``.
    timezone.activate(settings.TIME_ZONE)

    now = timezone.now()
    expiration_date = now + datetime.timedelta(seconds=expiration_delta)
    not_before = now - datetime.timedelta(
        seconds=settings.JWT_NBF_LEEWAY_SECONDS)
    version = version or settings.JWT_VERSION

    payload = {
        "user": {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "first_name": user.first_name,
            "last_name": user.last_name,
        },
        # Registered JWT claims in use:
        # expiration time
        "exp": epoch_seconds(expiration_date),
        # issuer of the token
        "iss": issuer,
        # issued at
        "iat": epoch_seconds(now),
        # not before (token must not be accepted earlier)
        "nbf": epoch_seconds(not_before),
        # Claims currently unused: sub (subject), aud (audience),
        # jti (token id).
        # Version of the token
        "version": version,
    }

    # Only set the permissions on the token if forced in settings
    if settings.JWT_SET_PERMISSION_ON_TOKEN:
        payload["permission"] = list(
            user.project_permissions.values_list('key', flat=True))

    # Set extra args
    payload.update(kwargs)
    return payload