Python rest_framework.status 模块,HTTP_500_INTERNAL_SERVER_ERROR 实例源码
我们从Python开源项目中,提取了以下41个代码示例,用于说明如何使用rest_framework.status.HTTP_500_INTERNAL_SERVER_ERROR。
def test_500_error_context_logged_out(self):
"""
Assert context values for 500 error page when logged out
"""
# case with specific page
with patch('ui.templatetags.render_bundle._get_bundle') as get_bundle:
response = self.client.get('/500/')
assert response.context['authenticated'] is False
assert response.context['name'] == ""
assert response.context['is_public'] is True
assert response.context['has_zendesk_widget'] is True
self.assertContains(response, 'Share this page', status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
bundles = [bundle[0][1] for bundle in get_bundle.call_args_list]
assert set(bundles) == {
'common',
'public',
'sentry_client',
'style',
'style_public',
'zendesk_widget',
}
def test_add_channel_failed_create_channel(mock_staff_client, mocker):
"""If client.channels.create fails an exception should be raised"""
response_500 = Response()
response_500.status_code = statuses.HTTP_500_INTERNAL_SERVER_ERROR
mock_staff_client.channels.create.return_value.raise_for_status.side_effect = HTTPError(response=response_500)
with pytest.raises(ChannelCreationException) as ex:
api.add_channel(
Search.from_dict({}),
"title",
"name",
"public_description",
"channel_type",
123,
456,
)
assert ex.value.args[0] == "Error creating channel name"
mock_staff_client.channels.create.return_value.raise_for_status.assert_called_with()
assert mock_staff_client.channels.create.call_count == 1
assert PercolateQuery.objects.count() == 0
assert Channel.objects.count() == 0
def test_generate_mailgun_response_json_with_failed_json_call(self):
"""
Tests that generate_mailgun_response_json() returns without erroring if Response.json() call fails for
non 401 status code
"""
# Response.json() error
response = Mock(
spec=Response,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
json=lambda: (_ for _ in []).throw(ValueError), # To get .json() to throw ValueError
reason="reason"
)
self.assertDictEqual(
generate_mailgun_response_json(response),
{"message": response.reason}
)
def test_view_response_improperly_configured(self):
"""
Test that the SearchResultMailView will raise ImproperlyConfigured if mailgun returns 401, which
results in returning 500 since micromasters.utils.custom_exception_handler catches ImproperlyConfigured
"""
with patch(
'mail.views.get_all_query_matching_emails', autospec=True, return_value=self.email_results
), patch(
'mail.views.MailgunClient'
) as mock_mailgun_client, patch(
'mail.views.get_mail_vars', autospec=True, return_value=self.email_vars,
) as mock_get_mail_vars:
mock_mailgun_client.send_batch.side_effect = ImproperlyConfigured
resp = self.client.post(self.search_result_mail_url, data=self.request_data, format='json')
assert resp.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
mock_get_mail_vars.assert_called_once_with(self.email_results)
def heartbeat(request): # pylint: disable=unused-argument
"""
View to check if database is reachable and ready to handle requests.
"""
try:
db_status()
except DatabaseError:
return JsonResponse(
{'OK': False},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
return JsonResponse(
{'OK': True},
status=status.HTTP_200_OK
)
def logs(self, request, **kwargs):
app = self.get_object()
try:
return Response(app.logs(request.query_params.get('log_lines',
str(settings.LOG_LINES))),
status=status.HTTP_200_OK, content_type='text/plain')
except requests.exceptions.RequestException:
return Response("Error accessing logs for {}".format(app.id),
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
content_type='text/plain')
except EnvironmentError as e:
if e.message == 'Error accessing deis-logger':
return Response("Error accessing logs for {}".format(app.id),
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
content_type='text/plain')
else:
return Response("No logs for {}".format(app.id),
status=status.HTTP_204_NO_CONTENT,
content_type='text/plain')
def logs(self, request, **kwargs):
app = self.get_object()
try:
return Response(app.logs(request.query_params.get('log_lines',
str(settings.LOG_LINES))),
status=status.HTTP_200_OK, content_type='text/plain')
except requests.exceptions.RequestException:
return Response("Error accessing logs for {}".format(app.id),
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
content_type='text/plain')
except EnvironmentError as e:
if e.message == 'Error accessing deis-logger':
return Response("Error accessing logs for {}".format(app.id),
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
content_type='text/plain')
else:
return Response("No logs for {}".format(app.id),
status=status.HTTP_204_NO_CONTENT,
content_type='text/plain')
def comment_post(request, pid, co, owner_id):
"""
https://themoviebook.herokuapp.com/posts/comment/userpid=<>/postid=<>/rating=<>/
GET request: Adds comment
"""
try:
profile = request.user.profile
post_owner = UserProfile.objects.get(pk=owner_id)
post = post_owner.posts.get(pk=pid)
post.comment(profile, co)
return JsonResponse({'detail': 'successful'}, safe=False, status=status.HTTP_200_OK)
except Exception:
return JsonResponse({'detail': 'failed'}, safe=False, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# ['GET']
def custom_exception_handler(exc, context):
"""
Custom django rest framework exception handler that includes 'status_code' field in the
responses.
"""
# Call REST framework's default exception handler first
response = exception_handler(exc, context)
# Catch unexpected exceptions
if response is None:
if isinstance(exc, Exception):
data = {'detail': 'A server error occurred.'}
set_rollback()
response = Response(data, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# Now add the HTTP status code to the response.
if response is not None:
response.data['status_code'] = response.status_code
return response
def create(self, validated_data):
if self._get_cache() is not None:
raise Throttled()
callback = validated_data['callback']
signed_data = signing.dumps(dict(callback=callback, user_id=self.user.pk))
callback = add_params(callback, sign=signed_data)
email_message = get_password_reset_email(self.user, callback)
try:
email_message.send()
except smtplib.SMTPServerDisconnected as e:
raise serializers.ValidationError(
'Mail sending timeout.', code=status.HTTP_500_INTERNAL_SERVER_ERROR)
except smtplib.SMTPException as e:
raise serializers.ValidationError(
'Unknown SMTP error: %s.' % str(e), code=status.HTTP_500_INTERNAL_SERVER_ERROR)
else:
self._set_cache()
return 'OK'
def handle_exception(self, exc):
exception_handler = self.settings.EXCEPTION_HANDLER
context = self.get_exception_handler_context()
response_data = exception_handler(exc, context)
if response_data is None:
response_data = {
'detail': 'Internal server error.',
'status': status.HTTP_500_INTERNAL_SERVER_ERROR
}
else:
status_code = getattr(exc, 'status_code', None)
if status_code is not None:
response_data.update({'status': status_code})
self.send_exception(response_data)
def handle_exception(self, exc):
exception_handler = self.settings.EXCEPTION_HANDLER
context = self.get_exception_handler_context()
response_data = exception_handler(exc, context)
if response_data is None:
status_code = rest_framework_status.HTTP_500_INTERNAL_SERVER_ERROR
logger.error(exc)
else:
status_code = getattr(
exc,
'status_code',
rest_framework_status.HTTP_500_INTERNAL_SERVER_ERROR
)
self.route_send(
self.request.reply_channel,
data=response_data if response_data else {'detail': 'Internal server error.'},
status=status_code
)
def direct_channel(self, request):
"""
Gets or creates a direct channel to the user
---
request_serializer: DirectChannelSerializer
response_serializer: ChannelSerializer
"""
serializer = self.get_serializer(data=request.data)
channel = None
if serializer.is_valid(raise_exception=True):
user = serializer.validated_data['user']
channel = get_or_create_direct_channel(request.user, user)
if not channel:
return Response(
{'status': "Couldn't get or create a direct channel"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
response_serializer = ChannelSerializer(channel)
return Response(response_serializer.data)
def task_channel(self, request, task_id=None):
"""
Gets or creates task channel
---
response_serializer: ChannelSerializer
"""
task = get_object_or_404(Task.objects.all(), pk=task_id)
channel = None
if task:
channel = get_or_create_task_channel(request.user, task)
if not channel:
return Response(
{'status': "Couldn't create task channel"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
response_serializer = ChannelSerializer(channel, context={'request': request})
return Response(response_serializer.data)
def post(self, request, token):
serializer = UpdateSerializer(data=request.data)
if serializer.is_valid():
serializer.save()
try:
bot = Bot.objects.get(token=token)
bot.handle(Update.de_json(request.data, bot._bot))
except Bot.DoesNotExist:
logger.warning("Token %s not associated to a bot" % token)
return Response(serializer.errors, status=status.HTTP_404_NOT_FOUND)
except:
exc_info = sys.exc_info()
traceback.print_exception(*exc_info)
logger.error("Error processing %s for token %s" % (request.data, token))
return Response(serializer.errors, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
else:
return Response(serializer.data, status=status.HTTP_200_OK)
logger.error("Validation error: %s from message %s" % (serializer.errors, request.data))
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def __init__(self, request, action):
self.Code = status.HTTP_500_INTERNAL_SERVER_ERROR
self.Result = {}
self.Errors = []
self.request = request
self.action = action
self.input_data = self.request.data
def process(self):
# Must be overwriten
self.setCode(status.HTTP_500_INTERNAL_SERVER_ERROR)
def __init__(self, request, endpoint, action):
self.Code = status.HTTP_500_INTERNAL_SERVER_ERROR
self.Result = {}
self.Errors = []
has_callback = request.GET.get('callback', None)
if has_callback:
self.Callback = has_callback
self.setCode(self.process(request, endpoint, action))
def test_500_error_context_logged_in(self):
"""
Assert context values for 500 error page when logged in
"""
with mute_signals(post_save):
profile = self.create_and_login_user()
self.client.force_login(profile.user)
with override_settings(EMAIL_SUPPORT='support'), patch(
'ui.templatetags.render_bundle._get_bundle'
) as get_bundle:
response = self.client.get('/500/')
assert response.context['authenticated'] is True
assert response.context['name'] == profile.preferred_name
assert response.context['support_email'] == 'support'
assert response.context['is_public'] is True
assert response.context['has_zendesk_widget'] is True
self.assertContains(response, 'Share this page', status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
bundles = [bundle[0][1] for bundle in get_bundle.call_args_list]
assert set(bundles) == {
'common',
'public',
'sentry_client',
'style',
'style_public',
'zendesk_widget',
}
def test_enrollment_fails(self, mock_refresh, mock_edx_enr): # pylint: disable=unused-argument
"""
Test error when backend raises an exception
"""
error = HTTPError()
error.response = MagicMock()
error.response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
mock_edx_enr.side_effect = error
resp = self.client.post(self.url, {'course_id': self.course_id}, format='json')
assert resp.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
# the response has a structure like {"error": "<message>"}
assert isinstance(resp.data, dict)
assert 'error' in resp.data
assert mock_edx_enr.call_count == 1
# assert just the second argument, since the first is `self`
assert mock_edx_enr.call_args[0][1] == self.course_id
# if instead edX returns a 400 error, an exception is raised by
# the view and the user gets a different error message
error.response.status_code = status.HTTP_400_BAD_REQUEST
mock_edx_enr.side_effect = error
resp = self.client.post(self.url, {'course_id': self.course_id}, format='json')
assert resp.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
assert isinstance(resp.data, list)
assert len(resp.data) == 1
assert PossiblyImproperlyConfigured.__name__ in resp.data[0]
# if the error from the call to edX is is not HTTPError, the user gets a normal json error
mock_edx_enr.side_effect = ValueError()
resp = self.client.post(self.url, {'course_id': self.course_id}, format='json')
assert resp.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
# the response has a structure like {"error": "<message>"}
assert isinstance(resp.data, dict)
assert 'error' in resp.data
def test_improperly_configured(self, exception_to_raise, mock_sentry):
"""
Test a standard exception not handled by default by the rest framework
"""
exp = exception_to_raise('improperly configured')
resp = custom_exception_handler(exp, self.context)
assert resp.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
assert resp.data == ['{0}: improperly configured'.format(exception_to_raise.__name__)]
mock_sentry.assert_called_once_with()
def get(self, request):
if "number" not in request.GET:
return Response("No number specified.",
status=status.HTTP_400_BAD_REQUEST)
query_num = request.GET["number"]
try:
n = Number.objects.get(number=query_num)
if n.state == 'inuse':
if not n.subscriber:
# See issue 260.
print 'Error: number %s has no subscriber' % n.number
return Response(
'Number %s has no associated subscriber.' % n.number,
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
if not n.subscriber.bts:
# See issue 386.
print 'Error: subscriber "%s" has no BTS' % n.subscriber
return Response(
'Subscriber "%s" has no BTS.' % n.subscriber,
status=status.HTTP_404_NOT_FOUND)
# Strip the protocol field and just return the rest,
# removing any trailing slash.
bts_info = urlparse.urlparse(n.subscriber.bts.inbound_url)
bts_netloc = bts_info.netloc
bts_host = bts_info.hostname
result = {
'netloc': bts_netloc,
'number': n.number,
'hostname': bts_host,
'source': n.kind,
'owner': n.network.id
}
return Response(result, status=status.HTTP_200_OK)
else:
return Response("No such number",
status=status.HTTP_404_NOT_FOUND)
except Number.DoesNotExist:
return Response("No such number", status=status.HTTP_404_NOT_FOUND)
def get(self, request, *args, **kwargs):
health_check = HealthCheck()
result = health_check.run()
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
if result.is_healthy:
status_code = status.HTTP_200_OK
return Response(result.data, status=status_code)
def like_post(request, pid, ra, owner_id):
"""
https://themoviebook.herokuapp.com/posts/like/userpid=<>/postid=<>/rating=<>/
GET request: adds Like
"""
try:
profile = request.user.profile
post_owner = UserProfile.objects.get(pk=owner_id)
post = post_owner.posts.get(pk=pid)
post.like(profile, ra)
return JsonResponse({'detail': 'successful'}, safe=False, status=status.HTTP_200_OK)
except Exception:
return JsonResponse({'detail': 'failed'}, safe=False, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def api_500(request):
return RenderedResponse({'detail': 'Internal server error', 'request':request,
'status': status.HTTP_500_INTERNAL_SERVER_ERROR})
def deploy_view(request):
if request.data['payload']['outcome'] == 'success':
build_info_obj = save_build_object(request)
clone_or_pull_repo(request)
config_dict = open_just_config(request.data['payload']['reponame'])
if (config_dict == None):
return Response(status=status.HTTP_500_INTERNAL_SERVER_ERROR)
allowed_branches = config_dict['Deploy'].keys()
deploy_build(request, config_dict, allowed_branches)
return Response(status=status.HTTP_200_OK)
def custom_exception_handler(exc, context):
"""
Formats REST exceptions like:
{
"error": "error_code",
"error_description": "description of the error",
}
:param exc: Exception
:return: Response
"""
# Call REST framework's default exception handler first,
# to get the standard error response.
response = exception_handler(exc, context)
if not response:
# Unhandled exceptions (500 internal server errors)
response = Response(data={
'error': 'server_error',
'error_description': unicode(exc),
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
return response
if hasattr(exc, 'default_error'):
response.data['error'] = exc.default_error
else:
response.data['error'] = 'api_error'
if hasattr(exc, 'default_detail'):
response.data['error_description'] = exc.default_detail
elif 'detail' in response.data:
response.data['error_description'] = response.data['details']
if 'detail' in response.data:
del response.data['detail']
return response
def create(self, request, *args, **kwargs):
post_pk = kwargs['pk']
like_user = request.user.pk
if PostLike.objects.filter(post=post_pk, like_user=like_user):
return Response({"detail": "?? ???? ?? ????"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
request.data._mutable = True
request.data['like_user'] = request.user.pk
request.data['post'] = post_pk
return super().create(request, *args, **kwargs)
def destroy(self, request, *args, **kwargs):
post_pk = kwargs['pk']
like_user = request.user.pk
if PostLike.objects.filter(post=post_pk, like_user=like_user):
instance = PostLike.objects.get(post=post_pk, like_user=like_user)
self.perform_destroy(instance)
return Response(status=status.HTTP_204_NO_CONTENT)
else:
return Response({"detail": "?? ???? ??? ?? ????"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def create(self, request, *args, **kwargs):
post = kwargs['pk']
bookmark_user = request.user.pk
if PostBookMark.objects.filter(post=post, bookmark_user=bookmark_user):
return Response({"detail": "?? ???? ????"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
request.data._mutable = True
request.data['bookmark_user'] = request.user.pk
request.data['post'] = kwargs['pk']
return super().create(request, *args, **kwargs)
def destroy(self, request, *args, **kwargs):
post = kwargs['pk']
bookmark_user = request.user.pk
if PostBookMark.objects.filter(post=post, bookmark_user=bookmark_user):
instance = PostBookMark.objects.get(post=post, bookmark_user=bookmark_user)
self.perform_destroy(instance)
return Response(status=status.HTTP_204_NO_CONTENT)
else:
return Response({"detail": "?? ????? ?? ????"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def post(self, request, hook_id):
"""
Process Messenger webhook.
1. Get an enabled Messenger bot
3. For each message serialize
4. For each message create :class:`MessengerMessage <permabots.models.messenger_api.MessengerMessage>`
5. Delay processing of each message to a task
6. Response provider
"""
try:
bot = caching.get_or_set(MessengerBot, hook_id)
except MessengerBot.DoesNotExist:
logger.warning("Hook id %s not associated to a bot" % hook_id)
return Response(status=status.HTTP_404_NOT_FOUND)
logger.debug("Messenger Bot %s attending request %s" % (bot, request.data))
webhook = Webhook.from_json(request.data)
for webhook_entry in webhook.entries:
for webhook_message in webhook_entry.messaging:
try:
if webhook_message.is_delivery:
raise OnlyTextMessages
message = self.create_message(webhook_message, bot)
if bot.enabled:
logger.debug("Messenger Bot %s attending request %s" % (bot, message))
handle_messenger_message.delay(message.id, bot.id)
else:
logger.error("Message %s ignored by disabled bot %s" % (message, bot))
except OnlyTextMessages:
logger.warning("Not text message %s for bot %s" % (message, hook_id))
except:
exc_info = sys.exc_info()
traceback.print_exception(*exc_info)
logger.error("Error processing %s for bot %s" % (webhook_message, hook_id))
return Response(status=status.HTTP_500_INTERNAL_SERVER_ERROR)
return Response(status=status.HTTP_200_OK)
def post(self, request, hook_id):
"""
Process Telegram webhook.
1. Serialize Telegram message
2. Get an enabled Telegram bot
3. Create :class:`Update <permabots.models.telegram_api.Update>`
5. Delay processing to a task
6. Response provider
"""
serializer = UpdateSerializer(data=request.data)
if serializer.is_valid():
try:
bot = caching.get_or_set(TelegramBot, hook_id)
except TelegramBot.DoesNotExist:
logger.warning("Hook id %s not associated to an bot" % hook_id)
return Response(serializer.errors, status=status.HTTP_404_NOT_FOUND)
try:
update = self.create_update(serializer, bot)
if bot.enabled:
logger.debug("Telegram Bot %s attending request %s" % (bot.token, request.data))
handle_update.delay(update.id, bot.id)
else:
logger.error("Update %s ignored by disabled bot %s" % (update, bot.token))
except OnlyTextMessages:
logger.warning("Not text message %s for bot %s" % (request.data, hook_id))
return Response(status=status.HTTP_200_OK)
except:
exc_info = sys.exc_info()
traceback.print_exception(*exc_info)
logger.error("Error processing %s for bot %s" % (request.data, hook_id))
return Response(serializer.errors, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
else:
return Response(serializer.data, status=status.HTTP_200_OK)
logger.error("Validation error: %s from message %s" % (serializer.errors, request.data))
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def post(self, request, key):
"""
Process notitication hooks:
1. Obtain Hook
2. Check Auth
3. Delay processing to a task
4. Respond requester
"""
try:
hook = Hook.objects.get(key=key, enabled=True)
except Hook.DoesNotExist:
msg = _("Key %s not associated to an enabled hook or bot") % key
logger.warning(msg)
return Response(msg, status=status.HTTP_404_NOT_FOUND)
if hook.bot.owner != request.user:
raise exceptions.AuthenticationFailed()
try:
parsed_data = request.data
logger.debug("Hook %s attending request %s" % (hook, parsed_data))
handle_hook.delay(hook.id, parsed_data)
except ParseError as e:
return Response(str(e), status=status.HTTP_400_BAD_REQUEST)
except:
exc_info = sys.exc_info()
traceback.print_exception(*exc_info)
msg = _("Error processing %s for key %s") % (request.data, key)
logger.error(msg)
return Response(msg, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
else:
return Response(status=status.HTTP_200_OK)
def test_get_internal_error(self, mock_obj):
mock_obj.return_value = ANSWER_INTERNAL_ERROR
self.client.force_authenticate(user=self.user)
response = self.client.get(self.url)
self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
def devino_request(self, serializer=None):
if serializer:
serializer.is_valid(raise_exception=True)
json_data = json.dumps(
serializer.validated_data if serializer else None,
default=date_handler
)
devino_request = models.DevinoRequest.objects.create(api_resource=self.api_resource, data=json_data)
try:
if serializer:
answer = self.api_resource_lib(**serializer.validated_data)
else:
answer = self.api_resource_lib()
models.DevinoAnswer.objects.create(
code=answer.code,
description=answer.description,
result=answer.result,
request=devino_request,
)
if answer.code == consts.STATUS_BAD_REQUEST:
status_response = status.HTTP_400_BAD_REQUEST
elif answer.code == consts.STATUS_ERROR_API:
status_response = status.HTTP_500_INTERNAL_SERVER_ERROR
else:
status_response = status.HTTP_200_OK
return Response({'code': answer.code, 'description': answer.description, 'result': answer.result},
status=status_response)
except DevinoException as ex:
error = models.DevinoAnswer.objects.create(
code=ex.error.code,
description=ex.error.description,
request=devino_request,
is_fail=True,
)
return Response({'code': error.code, 'description': error.description}, status=status.HTTP_400_BAD_REQUEST)
def support_channel(self, request):
"""
Gets or creates a direct channel for the current user or customer
---
request_serializer: SupportChannelSerializer
response_serializer: ChannelSerializer
"""
serializer = self.get_serializer(data=request.data)
channel = None
if serializer.is_valid(raise_exception=True):
if request.user.is_authenticated():
# Create support channel for logged in user
channel = get_or_create_support_channel(request.user)
else:
# Create support channel for anonymous user
channel_id = serializer.validated_data.get('id', None)
if channel_id:
channel = get_object_or_404(self.get_queryset(), pk=channel_id)
email = serializer.validated_data.get('email', None)
name = serializer.validated_data.get('name', None)
if email:
customer = Inquirer.objects.create(name=name, email=email)
channel.content_object = customer
channel.save()
else:
return Response(
{'email': "Couldn't get or create a support channel"},
status=status.HTTP_400_BAD_REQUEST
)
else:
channel = Channel.objects.create(type=CHANNEL_TYPE_SUPPORT)#, content_object=customer)
if not channel:
return Response(
{'status': "Couldn't get or create a support channel"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
response_serializer = ChannelSerializer(channel, context={'request': request})
return Response(response_serializer.data)
def developer_channel(self, request):
"""
Gets or creates a developer channel for the current user
---
request_serializer: DeveloperChannelSerializer
response_serializer: ChannelSerializer
"""
serializer = self.get_serializer(data=request.data)
channel = None
if serializer.is_valid(raise_exception=True):
# Create developer channel
subject = serializer.validated_data['subject']
message = serializer.validated_data['message']
channel = create_channel(
request.user,
channel_type=CHANNEL_TYPE_DEVELOPER,
subject=subject,
messages=[
dict(user=request.user, body=message)
]
)
if not channel:
return Response(
{'status': "Couldn't get or create a support channel"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
response_serializer = ChannelSerializer(channel, context={'request': request})
return Response(response_serializer.data)
def post(self, request, format=None):
try:
rfid = Rfid.objects.get(code=request.data.get('rfid'))
device = Device.objects.get(identifier=request.data.get('device'))
deviceAuth = DeviceAuth.objects.get(device=device.identifier, rfid=rfid.id)
except ObjectDoesNotExist:
return Response(status=status.HTTP_404_NOT_FOUND)
except ValidationError as e:
# except:
# logger.exception("An error occurred")
return Response(status=status.HTTP_500_INTERNAL_SERVER_ERROR)
serializer = AuthSerializer(
instance={'name': device.name, 'rfid': rfid.code, 'device': device.identifier})
return Response(serializer.data, status=200)
def post(self, request, hook_id):
"""
Process Kik webhook:
1. Get an enabled Kik bot
2. Verify Kik signature
3. Serialize each message
4. For each message create :class:`KikMessage <permabots.models.kik_api.KikMessage>` and :class:`KikUser <permabots.models.kik_api.KikUser>`
5. Delay each message processing to a task
6. Response provider
"""
try:
bot = caching.get_or_set(KikBot, hook_id)
except KikBot.DoesNotExist:
logger.warning("Hook id %s not associated to a bot" % hook_id)
return Response(status=status.HTTP_404_NOT_FOUND)
signature = request.META.get('HTTP_X_KIK_SIGNATURE')
if signature:
signature.encode('utf-8')
if not bot._bot.verify_signature(signature, request.stream.body):
logger.debug("Kik Bot data %s not verified %s" % (request.data, signature))
return Response(status=403)
logger.debug("Kik Bot data %s verified" % (request.data))
for kik_message in request.data['messages']:
serializer = KikMessageSerializer(data=kik_message)
logger.debug("Kik message %s serialized" % (kik_message))
if serializer.is_valid():
try:
if not self.accepted_types(serializer):
raise OnlyTextMessages
message = self.create_message(serializer, bot)
if bot.enabled:
logger.debug("Kik Bot %s attending request %s" % (bot, kik_message))
handle_message.delay(message.id, bot.id)
else:
logger.error("Message %s ignored by disabled bot %s" % (message, bot))
except OnlyTextMessages:
logger.warning("Not text message %s for bot %s" % (kik_message, hook_id))
return Response(status=status.HTTP_200_OK)
except:
exc_info = sys.exc_info()
traceback.print_exception(*exc_info)
logger.error("Error processing %s for bot %s" % (kik_message, hook_id))
return Response(serializer.errors, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
else:
logger.error("Validation error: %s from kik message %s" % (serializer.errors, kik_message))
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
return Response(serializer.data, status=status.HTTP_200_OK)
def send_form(request):
"""
Receives data from a contact form and creates a GPG-encrypted email including that data.
Depending on the data's given confidentiality the email is sent to pre-defined receivers.
"""
form = ContactForm(request.POST, request.FILES)
if not form.is_valid():
return Response({'status': 'error', 'errors': form.errors},
status=status.HTTP_400_BAD_REQUEST)
data = form.cleaned_data
uploaded_files = request.FILES.getlist('files')
if uploaded_files:
res = form.files_are_valid(request)
if res is not True:
return Response({'status': 'error', 'errors': res},
status=status.HTTP_400_BAD_REQUEST)
logger.debug("Contact data includes %i uploaded file(s).", len(uploaded_files))
subject = "Neue Nachricht von Streetcoverage"
body = EMAIL_TEMPLATE % (
data['name'] or "(kein Name angegeben)",
data['email'] or "(keine Emailadresse angegeben)",
data['subject'],
"ja" if data['journalist'] else "nein",
"ja" if data['confidential'] else "nein",
data['message']
)
if data['confidential']:
receivers = settings.EMAIL_TO_CONTACT_CONFIDENTIAL
else:
receivers = settings.EMAIL_TO_CONTACT_NON_CONFIDENTIAL
email = GPGEmailMessage(subject, body, settings.EMAIL_FROM_CONTACT, receivers)
for uploaded_file in uploaded_files:
# uploaded_file is an InMemoryUploadedFile created by MemoryFileUploadHandler (see setting
# FILE_UPLOAD_HANDLERS). Its mode includes 'b' so its content is a byte string.
# Django's attach functionality internally invokes encode() when the file consists of text.
# Byte strings however do not have an encode() method, so we decode the byte string to a normal string.
# Alternative: instead of attach() we could use attach_file() which has been made aware of types:
# https://github.com/django/django/commit/c6da621def9bc04a5feacd5652c7f9d84f48ad2c
# This would require writing the uploaded file to disk, e.g. using method().
# Note that TemporaryUploadedFile created by TemporaryFileUploadHandler does write files to disk, but uses
# tempfile.NamedTemporaryFile's default mode 'w+b', so we could create a subclass of both classes as
# another alternative.
if uploaded_file.content_type and uploaded_file.content_type.split('/')[0] == 'text':
email.attach(uploaded_file.name, uploaded_file.read().decode('utf-8'),)
else:
email.attach(uploaded_file.name, uploaded_file.read(),)
try:
email.send()
logger.info("Contact email has been sent.")
except GPGException:
return Response({'status': 'error', 'errors': 'Email could not be sent.'},
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
return Response({'status': 'success'})