Python rest_framework.exceptions 模块,NotAcceptable() 实例源码
我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用rest_framework.exceptions.NotAcceptable()。
def get(self, request, *args, **kwargs):
    """Respond with one randomly chosen comment out of the most-liked ones.

    Gathers every comment attached to the movies of the ten most recently
    created ``BoxOfficeMovie`` rows, keeps the five with the highest
    ``likes_count`` and serializes one of those five, picked at random.

    Raises:
        NotAcceptable: if none of those movies has any comment.
    """
    recent_entries = BoxOfficeMovie.objects.all().order_by('-created')[:10]

    # Flatten the per-movie comment querysets into a single list.
    candidates = [
        comment
        for entry in recent_entries
        for comment in Comment.objects.filter(movie__pk=entry.movie.pk)
    ]

    # Most-liked first; only the top five take part in the draw.
    candidates.sort(key=attrgetter('likes_count'), reverse=True)
    top_five = candidates[:5]

    if not top_five:
        raise NotAcceptable('???? ????')

    # sample() returns a one-element list, hence many=True below.
    picked = random.sample(top_five, 1)
    return Response(CommentSerializer(picked, many=True).data)
def authenticate(self, request):
    """Authenticate a request from a ``token <key>`` style header value.

    Reads ``request.META['Authorization']``, requires its first
    whitespace-separated word to be ``token`` and looks up the ``Player``
    whose ``auth_token`` equals the second word.

    Returns:
        ``(user, None)`` where ``user`` is an ``AnonymousUser`` carrying the
        matched player as ``user.player``, or ``None`` when no player owns
        the given token.

    Raises:
        NotAcceptable: if the header is absent, does not start with
            ``token``, or carries no key after the scheme.
    """
    # NOTE(review): Django usually exposes request headers in META under
    # HTTP_* names (e.g. HTTP_AUTHORIZATION); confirm how the plain
    # 'Authorization' key gets populated for this project.
    try:
        header = request.META['Authorization']
    except KeyError:
        raise NotAcceptable()

    parts = header.split()
    # An empty header or a bare "token" used to blow up with IndexError on
    # the indexing below; treat both as a malformed header instead.
    if len(parts) < 2 or parts[0] != "token":
        raise NotAcceptable()

    try:
        player = Player.objects.get(auth_token=parts[1])
    except Player.DoesNotExist:
        # Unknown token: report "not authenticated" rather than an error.
        return None

    user = AnonymousUser()
    user.player = player
    return (user, None)
def employee_update(request, employee_id):
    """
    Update an employee's skype id, first name, last name and location.
    ---
    response_serializer: employees.serializers.EmployeeSerializer
    parameters:
    - name: first_name
      required: true
      paramType: string
    - name: last_name
      required: true
      paramType: string
    - name: skype_id
      required: true
      paramType: string
    - name: location
      description: location id
      paramType: string
    responseMessages:
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    """
    if request.method != 'PATCH':
        # Only PATCH is handled; any other method yields no response object.
        return None

    try:
        target = get_object_or_404(Employee, pk=employee_id)
        payload = request.data
        target.skype_id = payload['skype_id']
        target.first_name = payload['first_name']
        target.last_name = payload['last_name']
        target.location = get_object_or_404(Location, pk=payload['location'])
        target.save()
        return Response(EmployeeSerializer(target).data,
                        status=status.HTTP_202_ACCEPTED)
    except Exception as error:
        # NOTE(review): every failure above is reported as "data missing";
        # if get_object_or_404 signals a miss by raising, that is caught
        # here too and never reaches the client as a 404 - confirm intent.
        print(error)
        raise NotAcceptable(config.USER_DATA_IS_MISSING)
def post(self, request, *args, **kwargs):
    """
    Return a token, the user id and profile flags for the given credentials.
    ---
    response_serializer: employees.serializers.EmployeeAuthenticationResponse
    responseMessages:
    - code: 404
      message: Not found
    - code: 500
      message: Unable to log in with provided credentials.
    parameters:
    - name: username
      required: true
      paramType: string
    - name: password
      required: true
      :paramType: string
    """
    try:
        # Let the parent view validate the credentials and issue the token.
        auth_response = super(CustomObtainAuthToken, self).post(request, *args, **kwargs)
        token = Token.objects.get(key=auth_response.data['token'])
        employee = get_object_or_404(Employee, pk=token.user_id)
        payload = dict(
            token=token.key,
            user_id=token.user_id,
            reset_password_code=employee.reset_password_code,
            is_base_profile_complete=employee.is_base_profile_complete,
            is_password_reset_required=employee.is_password_reset_required,
            is_staff=employee.is_staff,
        )
        return Response(payload)
    except Exception as error:
        # Any failure along the way is reported as a login failure.
        print(error)
        raise NotAcceptable(config.USER_UNABLE_TO_LOG)
def determine_version(self, request, *args, **kwargs):
    """Return the API version carried by the accepted media type.

    The version is the ``self.version_param`` parameter of
    ``request.accepted_media_type``; ``self.default_version`` is used when
    that parameter is absent.

    Raises:
        exceptions.NotAcceptable: if ``is_allowed_version`` rejects the
            resulting version.
    """
    accepted = _MediaType(request.accepted_media_type)
    requested = unicode_http_header(
        accepted.params.get(self.version_param, self.default_version))
    if self.is_allowed_version(requested):
        return requested
    raise exceptions.NotAcceptable(self.invalid_version_message)
# We don't need to implement `reverse`, as the versioning is based
# on the `Accept` header, not on the request URL.
def determine_version(self, request, *args, **kwargs):
    """Work out which API version the client asked for.

    Looks up ``self.version_param`` among the parameters of
    ``request.accepted_media_type``, falling back to
    ``self.default_version``, and returns it once validated.

    Raises:
        exceptions.NotAcceptable: if the version is not allowed.
    """
    media_params = _MediaType(request.accepted_media_type).params
    raw_version = media_params.get(self.version_param, self.default_version)
    version = unicode_http_header(raw_version)

    allowed = self.is_allowed_version(version)
    if not allowed:
        raise exceptions.NotAcceptable(self.invalid_version_message)
    return version
# We don't need to implement `reverse`, as the versioning is based
# on the `Accept` header, not on the request URL.
def perform_create(self, serializer):
    """Save a new comment for a movie and fold its rating into the movie.

    Steps, in order:
      1. Load the movie named by the ``pk`` URL kwarg and the requesting
         user.
      2. Run the submitted ``content`` through ``ProfanitiesFilter``; if
         that fails for any reason an empty string is stored instead.
      3. Reject the request when this user already commented on the movie.
      4. Parse the ``star`` rating, save the comment, then update the
         movie's ``comment_count``, ``star_sum`` and ``star_average``.

    Raises:
        NotAcceptable: if the user already has a comment on this movie.
        Exception: whatever ``float(self.request.data['star'])`` raises
            when the rating is missing or not numeric.
    """
    movie = Movie.objects.get(pk=self.kwargs['pk'])
    author = MyUser.objects.get(pk=self.request.user.pk)

    # Best-effort filtering: fall back to empty content on failure.
    # `except Exception` (not a bare except) so that SystemExit and
    # KeyboardInterrupt are not swallowed here.
    try:
        content = self.request.data['content']
        clean_content = ProfanitiesFilter().clean(content)
    except Exception:
        clean_content = ''

    if Comment.objects.filter(movie=movie, author=author).exists():
        raise NotAcceptable('?? ???? ??????')

    # Parse the rating BEFORE persisting anything: previously a missing or
    # invalid 'star' raised after the comment was already saved, leaving a
    # comment that the movie's aggregates never accounted for.
    new_star = float(self.request.data['star'])

    serializer.save(movie=movie, author=author, content=clean_content)

    # Incremental update of the aggregates with the new rating.
    movie.comment_count += 1
    movie.star_sum += new_star
    movie.star_average = (
        (movie.star_average * (movie.comment_count - 1) + new_star)
        / movie.comment_count
    )
    movie.save()
def get(self, request):
    """Return every ``Magazine`` serialized as a list.

    Raises:
        NotAcceptable: if loading or serializing the magazines fails.
    """
    try:
        magazines = Magazine.objects.all()
        serializer = MagazineSerializer(magazines, many=True)
        return Response(serializer.data)
    except Exception:
        # Narrowed from a bare `except:` so that SystemExit and
        # KeyboardInterrupt are no longer converted into a 406 response.
        raise NotAcceptable('not reachable')
def get(self, request, *args, **kwargs):
    """Return four distinct, randomly chosen magazines, serialized.

    Raises:
        NotAcceptable: if the magazines cannot be loaded, sampled (for
            example when fewer than four exist) or serialized.
    """
    try:
        magazines = Magazine.objects.all()
        # random.sample() needs a sequence: passing a set was deprecated in
        # Python 3.9 and raises TypeError from 3.11 on, which the handler
        # below would have turned into a permanent 406. De-duplicate via
        # set() as before, then materialise as a list.
        population = list(set(magazines))
        magazine_samples = random.sample(population, 4)
        serializer = MagazineSerializer(magazine_samples, many=True)
        return Response(serializer.data)
    except Exception:
        # Narrowed from a bare `except:` so that SystemExit and
        # KeyboardInterrupt are not swallowed.
        raise NotAcceptable('not reachable')
def modify_sandbox(self, request, pk=None):
    """Add a course to, or remove it from, this calendar's sandbox.

    The course is identified by the ``course_id`` query parameter or,
    failing that, by a ``short_name`` query parameter that must match
    ``COURSE_RE``. POST adds the course, DELETE removes it. Cached
    recommendations for the calendar are then cleared and the serialized
    course is returned.

    Raises:
        NotAcceptable: if neither parameter is supplied, or ``short_name``
            does not match the pattern.
        NotFound: if no course matches the lookup.
        ContentError: on POST when the course is already in the sandbox,
            on DELETE when it is not.
    """
    calendar = self.get_object()
    params = request.query_params

    # Translate the query parameters into Course lookup arguments.
    if 'course_id' in params:
        course_name = params['course_id']
        lookup = {'course_id': course_name}
    elif 'short_name' in params:
        course_name = params['short_name']
        parsed = COURSE_RE.match(course_name)
        if not parsed:
            raise NotAcceptable(detail='short_name field does not match pattern.')
        lookup = {
            'number': parsed.group('num'),
            'department': parsed.group('dept'),
        }
        letter = parsed.group('letter')
        if letter:
            lookup['letter'] = letter
    else:
        raise NotAcceptable(detail='short_name or course_id parameter required.')

    try:
        course = Course.objects.get(**lookup)
    except Course.DoesNotExist:
        raise NotFound('Course "%s" not found.' % course_name)

    method = request.method
    if method == 'POST':
        # Adding a course twice is rejected (original note: raise 409).
        if calendar.sandbox.filter(id=course.id).exists():
            raise ContentError('Course "%s" already in sandbox.' % course_name)
        calendar.sandbox.add(course)
    elif method == 'DELETE':
        if not calendar.sandbox.filter(id=course.id).exists():
            raise ContentError('Course "%s" not in sandbox.' % course_name)
        calendar.sandbox.remove(course)

    genie.clear_cached_recommendations(calendar.profile_id, calendar.pk)
    return Response(CourseSerializer(course).data)
def modify_course(self, request, pk=None):
    """Add a course to, or remove it from, this preference's blacklist.

    The course is identified by the ``course_id`` query parameter or,
    failing that, by a ``short_name`` query parameter that must match
    ``COURSE_RE``. POST adds the course to ``bl_courses``, DELETE removes
    it. Cached recommendations for the profile are then cleared.

    Returns:
        ``Response({'success': True})``.

    Raises:
        NotAcceptable: if neither parameter is supplied, or ``short_name``
            does not match the pattern.
        NotFound: if no course matches the lookup.
        ContentError: on POST when the course is already blacklisted, on
            DELETE when it is not.
    """
    pref = self.get_object()
    search_kwargs = {}
    course_name = ''
    if 'course_id' in request.query_params:
        course_name = request.query_params['course_id']
        search_kwargs['course_id'] = course_name
    elif 'short_name' in request.query_params:
        course_name = request.query_params['short_name']
        short_name = COURSE_RE.match(course_name)
        if not short_name:
            raise NotAcceptable(detail='short_name field does not match pattern.')
        search_kwargs['number'] = short_name.group('num')
        search_kwargs['department'] = short_name.group('dept')
        if short_name.group('letter'):
            search_kwargs['letter'] = short_name.group('letter')
    else:
        # Previously this case indexed query_params['short_name'] and
        # failed with a KeyError; report it the same way the sandbox and
        # semester variants of this endpoint do.
        raise NotAcceptable(detail='short_name or course_id parameter required.')

    try:
        course = Course.objects.get(**search_kwargs)
    except Course.DoesNotExist:
        raise NotFound('Course "%s" not found.' % course_name)

    if request.method == 'POST':
        # check if already there, and if so, raise 409
        if pref.bl_courses.filter(id=course.id).exists():
            raise ContentError('Course "%s" already in blacklist.' % course_name)
        pref.bl_courses.add(course)
    elif request.method == 'DELETE':
        if not pref.bl_courses.filter(id=course.id).exists():
            raise ContentError('Course "%s" not in blacklist.' % course_name)
        pref.bl_courses.remove(course)

    genie.clear_cached_recommendations(pref.profile_id)
    return Response({'success': True})
def modify_course(self, request, pk=None):
    """Add a course to, or remove it from, this semester.

    The course is identified by the ``course_id`` query parameter or,
    failing that, by a ``short_name`` query parameter that must match
    ``COURSE_RE``. POST adds the course to ``semester.courses``, DELETE
    removes it. Cached recommendations for the owning profile are then
    cleared and the serialized course is returned.

    Raises:
        NotAcceptable: if neither parameter is supplied, or ``short_name``
            does not match the pattern.
        NotFound: if no course matches the lookup.
        ContentError: on POST when the course is already in the semester,
            on DELETE when it is not.
    """
    semester = self.get_object()
    query = request.query_params

    # Build the Course lookup from whichever identifier was supplied.
    if 'course_id' in query:
        course_name = query['course_id']
        criteria = dict(course_id=course_name)
    elif 'short_name' in query:
        course_name = query['short_name']
        match = COURSE_RE.match(course_name)
        if not match:
            raise NotAcceptable(detail='short_name field does not match pattern.')
        criteria = dict(number=match.group('num'), department=match.group('dept'))
        section_letter = match.group('letter')
        if section_letter:
            criteria['letter'] = section_letter
    else:
        raise NotAcceptable(detail='short_name or course_id parameter required.')

    try:
        course = Course.objects.get(**criteria)
    except Course.DoesNotExist:
        raise NotFound('Course "%s" not found.' % course_name)

    verb = request.method
    if verb == 'POST':
        # Adding a course twice is rejected (original note: raise 409).
        if semester.courses.filter(id=course.id).exists():
            raise ContentError('Course "%s" already in semester.' % course_name)
        semester.courses.add(course)
    elif verb == 'DELETE':
        if not semester.courses.filter(id=course.id).exists():
            raise ContentError('Course "%s" not in semester.' % course_name)
        semester.courses.remove(course)

    genie.clear_cached_recommendations(semester.calendar.profile_id)
    return Response(CourseSerializer(course).data)
def create(self, validated_data):
    """Place an order for the validated basket and return it.

    Generates an order number for the basket, builds address objects from
    the optional ``billing_address`` / ``shipping_address`` payloads and
    hands everything to ``_insupd_order``.

    Raises:
        exceptions.NotAcceptable: if ``_insupd_order`` raises
            ``ValueError``; the error text becomes the detail message.
    """
    basket = validated_data.get('basket')
    order_number = self.generate_order_number(basket)

    # Both addresses are optional; stay None when the payload omits them.
    billing_address = (
        BillingAddress(**validated_data['billing_address'])
        if 'billing_address' in validated_data else None
    )
    shipping_address = (
        ShippingAddress(**validated_data['shipping_address'])
        if 'shipping_address' in validated_data else None
    )

    try:
        # Place the order and hand it straight back to the caller.
        return self._insupd_order(
            basket=basket,
            user=validated_data.get('user') or AnonymousUser(),
            order_number=order_number,
            billing_address=billing_address,
            shipping_address=shipping_address,
            order_total=validated_data.get('total'),
            shipping_method=validated_data.get('shipping_method'),
            shipping_charge=validated_data.get('shipping_charge'),
            guest_email=validated_data.get('guest_email') or '')
    except ValueError as error:
        raise exceptions.NotAcceptable(str(error))
def employee_update_password(request, employee_id):
    """
    Change an employee's password.
    ---
    response_serializer: employees.serializers.EmployeeSerializer
    parameters:
    - name: current_password
      required: true
      paramType: string
    - name: new_password
      required: true
      paramType: string
    responseMessages:
    - code: 400
      message: Bad request.
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    """
    if request.method != 'POST':
        # Only POST is handled; any other method yields no response object.
        return None

    try:
        current_password = request.data['current_password']
        new_password = request.data['new_password']
    except Exception as error:
        print(error)
        raise NotAcceptable(config.USER_DATA_IS_MISSING)

    employee = get_object_or_404(Employee, pk=employee_id)

    # Re-using the current password is refused before it is even verified.
    if current_password == new_password:
        return Response({'detail': config.PASSWORD_EQUAL},
                        status=status.HTTP_400_BAD_REQUEST)

    if not employee.check_password(current_password):
        return Response({'detail': config.WRONG_CURRENT_PASSWORD},
                        status=status.HTTP_400_BAD_REQUEST)

    # Current password verified: store the new one and clear reset state.
    employee.set_password(new_password)
    employee.reset_password_code = None
    employee.is_password_reset_required = False
    employee.save()
    return Response(EmployeeSerializer(employee).data,
                    status=status.HTTP_202_ACCEPTED)
def select_renderer(self, request, renderers, format_suffix=None):
    """
    Pick the renderer to use for this request.

    Returns a two-tuple ``(renderer, media type)`` for the first renderer
    whose media type matches one of the client's accepted media types,
    trying the accepted types in the order ``order_by_precedence`` yields
    them.

    Raises ``exceptions.NotAcceptable`` when no renderer matches.
    """
    # Allow URL style format override. eg. "?format=json
    override_param = self.settings.URL_FORMAT_OVERRIDE
    requested_format = format_suffix or request.query_params.get(override_param)
    if requested_format:
        renderers = self.filter_renderers(renderers, requested_format)

    accept_list = self.get_accept_list(request)

    # Worst case this visits len(accept_list) * len(renderers) pairs.
    for candidates in order_by_precedence(accept_list):
        for renderer in renderers:
            for candidate in candidates:
                if not media_type_matches(renderer.media_type, candidate):
                    continue

                requested = _MediaType(candidate)
                renderer_rank = _MediaType(renderer.media_type).precedence
                requested_rank = requested.precedence

                if renderer_rank > requested_rank:
                    # Eg client requests '*/*'
                    # Accepted media type is 'application/json'
                    # The renderer's type is the more specific one: report
                    # it, keeping the parameters the client sent.
                    param_parts = tuple(
                        '{0}={1}'.format(key, value.decode(HTTP_HEADER_ENCODING))
                        for key, value in requested.params.items())
                    return renderer, ';'.join((renderer.media_type,) + param_parts)

                # Eg client requests 'application/json; indent=8'
                # Accepted media type is 'application/json; indent=8'
                return renderer, candidate

    raise exceptions.NotAcceptable(available_renderers=renderers)
def select_renderer(self, request, renderers, format_suffix=None):
    """
    Choose a renderer for the request.

    Given the request and the candidate renderers, returns
    ``(renderer, media type)`` for the first renderer/accepted-type pair
    that matches; raises ``exceptions.NotAcceptable`` if there is none.
    """
    # Allow URL style format override. eg. "?format=json
    format_query_param = self.settings.URL_FORMAT_OVERRIDE
    format = format_suffix or request.query_params.get(format_query_param)
    if format:
        renderers = self.filter_renderers(renderers, format)

    accepts = self.get_accept_list(request)

    # Lazily enumerate matching pairs, accepted types first in the order
    # given by order_by_precedence, then renderer order.
    matches = (
        (renderer, media_type)
        for media_type_set in order_by_precedence(accepts)
        for renderer in renderers
        for media_type in media_type_set
        if media_type_matches(renderer.media_type, media_type)
    )

    for renderer, media_type in matches:
        media_type_wrapper = _MediaType(media_type)
        renderer_is_more_specific = (
            _MediaType(renderer.media_type).precedence >
            media_type_wrapper.precedence
        )
        if not renderer_is_more_specific:
            # Eg client requests 'application/json; indent=8'
            # Accepted media type is 'application/json; indent=8'
            return renderer, media_type

        # Eg client requests '*/*'
        # Accepted media type is 'application/json'
        pieces = (renderer.media_type,) + tuple(
            '{0}={1}'.format(key, value.decode(HTTP_HEADER_ENCODING))
            for key, value in media_type_wrapper.params.items())
        full_media_type = ';'.join(pieces)
        return renderer, full_media_type

    raise exceptions.NotAcceptable(available_renderers=renderers)
def _insupd_order(self, basket, user=None, shipping_address=None, billing_address=None, **kwargs):
    """Place a new order for ``basket`` or update its single existing one.

    If the basket has no order yet, delegates to ``place_order``. If it has
    exactly one order, and that order is in the payment-declined status,
    the order is updated in place through ``utils.OrderUpdater``.

    Raises:
        exceptions.NotAcceptable: if the basket already has an order whose
            status is not payment-declined, or has more than one order.
    """
    basket_orders = basket.order_set.all()

    blocking_orders = basket_orders.exclude(status=settings.ORDER_STATUS_PAYMENT_DECLINED)
    if blocking_orders.exists():
        raise exceptions.NotAcceptable(_("An non-declined order already exists for this basket."))

    order_count = basket_orders.count()
    if order_count > 1:
        raise exceptions.NotAcceptable(_("Multiple order exist for this basket! This should never happen and we don't know what to do."))

    # Get request object from context
    request = self.context.get('request', None)

    if order_count == 0:
        # Nothing to update: create a brand new order.
        return self.place_order(
            basket=basket,
            user=user,
            shipping_address=shipping_address,
            billing_address=billing_address,
            request=request,
            **kwargs)

    # Exactly one order is left at this point; reuse it and its number.
    order = basket_orders.first()
    kwargs['order_number'] = order.number
    initial_status = self.get_initial_order_status(basket)

    shipping_address = self.create_shipping_address(
        user=user,
        shipping_address=shipping_address)
    billing_address = self.create_billing_address(
        user=user,
        billing_address=billing_address,
        shipping_address=shipping_address,
        **kwargs)

    updater = utils.OrderUpdater()
    return updater.update_order(
        order=order,
        basket=basket,
        user=user,
        shipping_address=shipping_address,
        billing_address=billing_address,
        status=initial_status,
        request=request,
        **kwargs)