Python rest_framework.status 模块,HTTP_200_OK 实例源码
我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用rest_framework.status.HTTP_200_OK。
def employee_role_list(request):
"""
Returns employee role full list
---
serializer: employees.serializers.EmployeeRoleListSerializer
responseMessages:
- code: 401
message: Unauthorized. Authentication credentials were not provided. Invalid token.
- code: 403
message: Forbidden.
- code: 404
message: Not found
"""
if request.method == 'GET':
role_list = get_list_or_404(Role)
serializer = EmployeeRoleListSerializer(role_list, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
def get(self, request, format=None):
"""
List all badges
---
serializer: administrator.serializers.BadgeSerializer
parameters:
- name: pagination
required: false
type: string
paramType: query
"""
badges = get_list_or_404(Badge)
if request.GET.get('pagination'):
pagination = request.GET.get('pagination')
if pagination == 'true':
paginator = AdministratorPagination()
results = paginator.paginate_queryset(badges, request)
serializer = BadgeSerializer(results, many=True)
return paginator.get_paginated_response(serializer.data)
else:
return Response(status=status.HTTP_400_BAD_REQUEST)
else:
serializer = BadgeSerializer(badges, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
def get(self, request, format=None):
"""
List all employee positions
---
serializer: administrator.serializers.PositionSerializer
parameters:
- name: pagination
required: false
type: string
paramType: query
"""
positions = get_list_or_404(Position)
if request.GET.get('pagination'):
pagination = request.GET.get('pagination')
if pagination == 'true':
paginator = AdministratorPagination()
results = paginator.paginate_queryset(positions, request)
serializer = PositionSerializer(results, many=True)
return paginator.get_paginated_response(serializer.data)
else:
return Response(status=status.HTTP_400_BAD_REQUEST)
else:
serializer = PositionSerializer(positions, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
def SwaggerRequestMethodMaker(model = None):
def model_handler(self, request, *args, **kwargs):
data = kwargs.get('data', None)
resp = model
return Response(resp, status = status.HTTP_200_OK)
def empty_handler(self, request, *args, **kwargs):
data = kwargs.get('data', None)
resp = None
return Response(resp, status = status.HTTP_200_OK)
if model:
return model_handler
else:
return empty_handler
# make named APIView class with specified methods
def test_stop_entry(self):
DESCRIPTION = 'EXAMPLE'
startTime = timezone.now()
currentEntry = TimeEntry(user=self.TestUser, description=DESCRIPTION, start=startTime)
currentEntry.save()
url = reverse("api:time-entry-stop")
data = {}
response = self.client.post(url, data)
response_start_time = dateparse.parse_datetime(response.data['start'])
response_stop_time = dateparse.parse_datetime(response.data['stop'])
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data['description'], DESCRIPTION)
self.assertEqual(response_start_time, startTime)
self.assertGreater(response_stop_time, response_start_time)
self.assertEqual(response.data['duration'],(response_stop_time- response_start_time).total_seconds())
def category_detail(request, category_id):
"""
Category detail
---
serializer: categories.serializers.CategorySerializer
responseMessages:
- code: 400
message: Bad request.
- code: 401
message: Unauthorized. Authentication credentials were not provided. Invalid token.
- code: 403
message: Forbidden.
- code: 404
message: Not found
"""
category = get_object_or_404(Category, pk=category_id)
if request.method == 'GET':
serializer = CategorySerializer(category)
return Response(serializer.data, status=status.HTTP_200_OK)
def keyword_detail(request, keyword_id):
"""
Keyword detail
---
serializer: categories.serializers.KeywordSerializer
responseMessages:
- code: 400
message: Bad request.
- code: 401
message: Unauthorized. Authentication credentials were not provided. Invalid token.
- code: 403
message: Forbidden.
- code: 404
message: Not found
"""
keyword = get_object_or_404(Keyword, pk=keyword_id)
if request.method == 'GET':
serializer = KeywordSerializer(keyword)
return Response(serializer.data, status=status.HTTP_200_OK)
def employee(request, employee_id):
"""
Returns employee details
---
serializer: employees.serializers.EmployeeSerializer
responseMessages:
- code: 401
message: Unauthorized. Authentication credentials were not provided. Invalid token.
- code: 403
message: Forbidden.
- code: 404
message: Not found
"""
if request.method == 'GET':
employee = get_object_or_404(Employee, pk=employee_id)
serializer = EmployeeSerializer(employee)
return Response(serializer.data, status=status.HTTP_200_OK)
def employee_position_list(request):
"""
Returns employee position full list
---
serializer: employees.serializers.EmployeePositionListSerializer
responseMessages:
- code: 401
message: Unauthorized. Authentication credentials were not provided. Invalid token.
- code: 403
message: Forbidden.
- code: 404
message: Not found
"""
if request.method == 'GET':
position_list = get_list_or_404(Position)
serializer = EmployeePositionListSerializer(position_list, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
def get(self, request, format=None):
"""
List all employees
---
serializer: administrator.serializers.EmployeeSerializer
parameters:
- name: pagination
required: false
type: string
paramType: query
"""
employees = get_list_or_404(Employee)
if request.GET.get('pagination'):
pagination = request.GET.get('pagination')
if pagination == 'true':
paginator = AdministratorPagination()
results = paginator.paginate_queryset(employees, request)
serializer = EmployeeSerializer(results, many=True)
return paginator.get_paginated_response(serializer.data)
else:
return Response(status=status.HTTP_400_BAD_REQUEST)
else:
serializer = EmployeeSerializer(employees, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
def get(self, request, format=None):
"""
List all events
---
serializer: administrator.serializers.EventSerializer
parameters:
- name: pagination
required: false
type: string
paramType: query
"""
events = get_list_or_404(Event)
if request.GET.get('pagination'):
pagination = request.GET.get('pagination')
if pagination == 'true':
paginator = AdministratorPagination()
results = paginator.paginate_queryset(events, request)
serializer = EventSerializer(results, many=True)
return paginator.get_paginated_response(serializer.data)
else:
return Response(status=status.HTTP_400_BAD_REQUEST)
else:
serializer = EventSerializer(events, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
def get(self, request, format=None):
"""
List all employee positions
---
serializer: administrator.serializers.LocationSerializer
parameters:
- name: pagination
required: false
type: string
paramType: query
"""
locations = get_list_or_404(Location)
if request.GET.get('pagination'):
pagination = request.GET.get('pagination')
if pagination == 'true':
paginator = AdministratorPagination()
results = paginator.paginate_queryset(locations, request)
serializer = LocationSerializer(results, many=True)
return paginator.get_paginated_response(serializer.data)
else:
return Response(status=status.HTTP_400_BAD_REQUEST)
else:
serializer = LocationSerializer(locations, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
def get(self, request, format=None):
"""
List all messages
---
serializer: administrator.serializers.MessageSerializer
parameters:
- name: pagination
required: false
type: string
paramType: query
"""
messages = get_list_or_404(Message)
if request.GET.get('pagination'):
pagination = request.GET.get('pagination')
if pagination == 'true':
paginator = AdministratorPagination()
results = paginator.paginate_queryset(messages, request)
serializer = MessageSerializer(results, many=True)
return paginator.get_paginated_response(serializer.data)
else:
return Response(status=status.HTTP_400_BAD_REQUEST)
else:
serializer = MessageSerializer(messages, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
def get(self, request, employee_id, format=None):
"""
List all messages from employee
---
serializer: administrator.serializers.MessageSerializer
parameters:
- name: pagination
required: false
type: string
paramType: query
"""
employee = get_object_or_404(Employee, pk=employee_id)
messages = get_list_or_404(Message, from_user=employee)
if request.GET.get('pagination'):
pagination = request.GET.get('pagination')
if pagination == 'true':
paginator = AdministratorPagination()
results = paginator.paginate_queryset(messages, request)
serializer = MessageSerializer(results, many=True)
return paginator.get_paginated_response(serializer.data)
else:
return Response(status=status.HTTP_400_BAD_REQUEST)
else:
serializer = MessageSerializer(messages, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
def get(self, request, format=None):
"""
Get site info
---
serializer: administrator.serializers.SiteInfoSerializer
"""
email_domain = settings.EMAIL_DOMAIN_LIST[0]
current_site = Site.objects.get_current()
version = config.VERSION
data = {'site': current_site.domain,
'email_domain': email_domain,
'backend_version': version}
serializer = SiteInfoSerializer(data)
return Response(serializer.data, status=status.HTTP_200_OK)
def test_distance_uppsala(self):
point_uppsala = {'lat': 59.858564, 'lng': 17.638927}
url = '%s?dist=100000&point=%s,%s' % (
reverse('record_test_list'),
point_uppsala['lng'], point_uppsala['lat'])
response = self.client.get(url, format='json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
json_response = json.loads(response.content)
order_list = ['Uppsala', 'Gamla Stan', 'Hornstull']
self.assertEqual(len(json_response), len(order_list))
for idx, order_key in enumerate(order_list):
self.assertTrue(json_response[idx]['title'] == order_key)
self.assertEqual(json_response[0]['distance'], 0.0)
def _search_by_field(self, field_name, search_term):
"""Search by field."""
self.authenticate()
url = reverse('bookdocument-list', kwargs={})
data = {}
# Should contain 20 results
response = self.client.get(url, data)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data['results']), self.all_count)
# Should contain only 10 results
filtered_response = self.client.get(
url + '?search={}'.format(search_term),
data
)
self.assertEqual(filtered_response.status_code, status.HTTP_200_OK)
self.assertEqual(
len(filtered_response.data['results']),
self.special_count
)
def _search_by_nested_field(self, search_term):
"""Search by field."""
self.authenticate()
url = reverse('citydocument-list', kwargs={})
data = {}
# Should contain 20 results
response = self.client.get(url, data)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data['results']), self.all_cities_count)
# Should contain only 10 results
filtered_response = self.client.get(
url + '?search={}'.format(search_term),
data
)
self.assertEqual(filtered_response.status_code, status.HTTP_200_OK)
self.assertEqual(
len(filtered_response.data['results']),
self.switz_cities_count
)
def _field_filter_value(self, field_name, value, count):
"""Field filter value.
Usage example:
>>> self._field_filter_value(
>>> 'title__wildcard',
>>> self.prefix[3:-3],
>>> self.prefix_count
>>> )
"""
url = self.base_url[:]
data = {}
response = self.client.get(
url + '?{}={}'.format(field_name, value),
data
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(
len(response.data['results']),
count
)
def _field_filter_terms_list(self, field_name, in_values, count):
"""Field filter terms.
Example:
http://localhost:8000/api/articles/?id=1&id=2&id=3
"""
url = self.base_url[:]
data = {}
url_parts = ['{}={}'.format(field_name, val) for val in in_values]
response = self.client.get(
url + '?{}'.format('&'.join(url_parts)),
data
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(
len(response.data['results']),
count
)
def _test_pagination(self):
"""Test pagination."""
self.authenticate()
publishers_url = reverse('publisherdocument-list', kwargs={})
books_url = reverse('bookdocument-list', kwargs={})
data = {}
invalid_page_url = books_url + '?page=3&page_size=30'
invalid_response = self.client.get(invalid_page_url, data)
self.assertEqual(
invalid_response.status_code,
status.HTTP_404_NOT_FOUND
)
valid_page_url = publishers_url + '?limit=5&offset=8'
# Check if response now is valid
valid_response = self.client.get(valid_page_url, data)
self.assertEqual(valid_response.status_code, status.HTTP_200_OK)
# Check totals
self.assertEqual(len(valid_response.data['results']), 5)
def test_question_create_vote(self):
"""
Assert POST /api/question/:id/vote creates a vote for a user
"""
self.login()
question = Question.objects.create(**fixtures['question'])
endpoint = question.get_api_detail_url() + "vote/"
self.assertEqual(question.votes.count(), 0)
response = self.client.post(endpoint, {'vote': 1}, format='json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
expected = {'created': True, 'status': 'vote recorded', 'display': 'upvote'}
self.assertDictContainsSubset(expected, response.data)
self.assertEqual(question.votes.count(), 1)
def test_question_update_vote(self):
"""
Assert POST /api/question/:id/vote updates if already voted
"""
self.login()
question = Question.objects.create(**fixtures['question'])
vote, _ = Vote.objects.punch_ballot(content=question, user=self.user, vote=1)
endpoint = question.get_api_detail_url() + "vote/"
self.assertEqual(question.votes.count(), 1)
response = self.client.post(endpoint, {'vote': -1}, format='json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
expected = {'created': False, 'status': 'vote recorded', 'display': 'downvote'}
self.assertDictContainsSubset(expected, response.data)
self.assertEqual(question.votes.count(), 1)
def results(self, request, pk=None):
job = self.get_object()
if job.status == AnalysisJob.Status.COMPLETE:
results = OrderedDict([
('census_block_count', job.census_block_count),
('census_blocks_url', job.census_blocks_url),
('connected_census_blocks_url', job.connected_census_blocks_url),
('destinations_urls', job.destinations_urls),
('tile_urls', job.tile_urls),
('overall_scores', job.overall_scores),
('overall_scores_url', job.overall_scores_url),
('score_inputs_url', job.score_inputs_url),
('ways_url', job.ways_url),
])
return Response(results, status=status.HTTP_200_OK)
else:
return Response(None, status=status.HTTP_404_NOT_FOUND)
def set_password(self, request, pk=None):
"""Detail ``POST`` endpoint for changing a user's password
Args:
request (rest_framework.request.Request)
pk (str): primary key for user to retrieve user from database
Returns:
Response
"""
old_password = request.data.get('oldPassword')
user = authenticate(email=PFBUser.objects.get(uuid=pk).email,
password=old_password)
if not user:
raise ValidationError({'detail': 'Unable to complete password change'})
new_password = request.data.get('newPassword')
if not new_password:
raise ValidationError({'detail': 'Unable to complete password change'})
user.set_password(new_password)
user.save()
return Response({'detail': 'Successfully changed password'},
status=status.HTTP_200_OK)
def test_login_and_logout(self):
""" tests login and logout for a single user """
# there should be zero tokens
self.assertEqual(MultiToken.objects.all().count(), 0)
token = self.login_and_obtain_token('user1', 'secret1')
self.assertEqual(MultiToken.objects.all().count(), 1)
# verify the token is for user 1
self.assertEqual(
MultiToken.objects.filter(key=token).first().user.username,
'user1'
)
# logout
response = self.rest_do_logout(token)
# make sure the response is "logged_out"
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertContains(response, "{\"status\":\"logged out\"}")
# there should now again be zero tokens in the database
self.assertEqual(MultiToken.objects.all().count(), 0)
def post(self, request, *args, **kwargs):
if self.alias_type.upper() not in api_settings.PASSWORDLESS_AUTH_TYPES:
# Only allow auth types allowed in settings.
return Response(status=status.HTTP_404_NOT_FOUND)
serializer = self.serializer_class(data=request.data, context={'request': request})
if serializer.is_valid(raise_exception=True):
# Validate -
user = serializer.validated_data['user']
# Create and send callback token
success = TokenService.send_token(user, self.alias_type, **self.message_payload)
# Respond With Success Or Failure of Sent
if success:
status_code = status.HTTP_200_OK
response_detail = self.success_response
else:
status_code = status.HTTP_400_BAD_REQUEST
response_detail = self.failure_response
return Response({'detail': response_detail}, status=status_code)
else:
return Response(serializer.error_messages, status=status.HTTP_400_BAD_REQUEST)
def post(self, request, *args, **kwargs):
serializer = self.serializer_class(data=request.data)
if serializer.is_valid(raise_exception=True):
user = serializer.validated_data['user']
token, created = Token.objects.get_or_create(user=user)
if created:
# Initially set an unusable password if a user is created through this.
user.set_unusable_password()
user.save()
if token:
# Return our key for consumption.
return Response({'token': token.key}, status=status.HTTP_200_OK)
else:
log.error(
"Couldn't log in unknown user. Errors on serializer: %s" % (serializer.error_messages, ))
return Response({'detail': 'Couldn\'t log you in. Try again later.'},
status=status.HTTP_400_BAD_REQUEST)
def test_email_auth_success(self):
data = {'email': self.email}
response = self.client.post(self.url, data)
self.assertEqual(response.status_code, status.HTTP_200_OK)
# Token sent to alias
callback_token = CallbackToken.objects.filter(user=self.user, is_active=True).first()
challenge_data = {'token': callback_token}
# Try to auth with the callback token
challenge_response = self.client.post(self.challenge_url, challenge_data)
self.assertEqual(challenge_response.status_code, status.HTTP_200_OK)
# Verify Auth Token
auth_token = challenge_response.data['token']
self.assertEqual(auth_token, Token.objects.filter(key=auth_token).first().key)
def test_mobile_signup_success(self):
mobile = '+15551234567'
data = {'mobile': mobile}
# Verify user doesn't exist yet
user = User.objects.filter(**{self.mobile_field_name: '+15551234567'}).first()
# Make sure our user isn't None, meaning the user was created.
self.assertEqual(user, None)
# verify a new user was created with serializer
response = self.client.post(self.url, data)
self.assertEqual(response.status_code, status.HTTP_200_OK)
user = User.objects.get(**{self.mobile_field_name: '+15551234567'})
self.assertNotEqual(user, None)
# Verify a token exists for the user
self.assertEqual(CallbackToken.objects.filter(user=user, is_active=True).exists(), 1)
def test_mobile_auth_success(self):
data = {'mobile': self.mobile}
response = self.client.post(self.url, data)
self.assertEqual(response.status_code, status.HTTP_200_OK)
# Token sent to alias
callback_token = CallbackToken.objects.filter(user=self.user, is_active=True).first()
challenge_data = {'token': callback_token}
# Try to auth with the callback token
challenge_response = self.client.post(self.challenge_url, challenge_data)
self.assertEqual(challenge_response.status_code, status.HTTP_200_OK)
# Verify Auth Token
auth_token = challenge_response.data['token']
self.assertEqual(auth_token, Token.objects.filter(key=auth_token).first().key)
def test_detail_todo_200(self):
view = ObjectListViewSet.as_view()
data = {
'name':'other list',
'priority':3,
'author':self.user_author
}
response = self.create_todo(data)
url = 'api/v1/list/%s/' % response.data['uuid']
request = self.factory.get(url)
force_authenticate(request,user=self.user_author)
response = view(request,'v1',response.data['uuid'])
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_update_todo_200(self):
view = ObjectListViewSet.as_view()
data = {
'name':'bad name list',
'priority':1,
'author':self.user_author
}
response = self.create_todo(data)
url = 'api/v1/list/%s/' % response.data['uuid']
new_data = {
'name':'Good name list',
'priority':1,
}
request = self.factory.put(url,new_data)
force_authenticate(request,user=self.user_author)
response = view(request,'v1',response.data['uuid'])
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_todos_author_200(self):
view = AuthorListViewSet.as_view()
all_data = [
{'name':'By author 1','priority':1,'author':self.user_author},
{'name':'By author 2','priority':1,'author':self.user_author},
{'name':'By author 3','priority':1,'author':self.user_author}
]
for data in all_data:
self.create_todo(data)
# Make an authenticated request to the view...
request = self.factory.get('api/v1/author_list/')
force_authenticate(request,user=self.user_author)
response = view(request,'v1')
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_list_all_item_200(self):
all_data = [
{
'author':self.user_author,'note':'Note 1 for one activity','priority':2,
'title':'activity 1','uuid_list':self.list,'assigned_to':self.user_author},
{'author':self.user_author,'note':'Note 2 for','priority':3,'title':'activity 2','uuid_list':self.list,'assigned_to':self.user_author}
]
for data in all_data:
self.create_item(data)
view = AllItemViewSet.as_view()
request = self.factory.get('api/v1/item/')
force_authenticate(request, user=self.user_author)
response = view(request,'v1')
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_detail_item_200(self):
""" detail item"""
data = {
'author':self.user_author,
'note':'Note 1 for one activity',
'priority':2,
'title':'activity 1',
'uuid_list':self.list,
'assigned_to':self.user_author
}
response = self.create_item(data)
view = ObjectItemViewSet.as_view()
request = self.factory.get('api/v1/item/%s/' % response.data['uuid'])
force_authenticate(request, user=self.user_author)
response = view(request,'v1',response.data['uuid'])
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_all_item_for_list_200(self):
all_data = [
{
'author':self.user_author,'note':'Note 1 for one activity','priority':2,
'title':'activity 1','uuid_list':self.list,'assigned_to':self.user_author},
{
'author':self.user_author,'note':'Note 2 for','priority':3,'title':'activity 2','uuid_list':self.list,'assigned_to':self.user_author}
]
for data in all_data:
self.create_item(data)
view = AllItemForListViewSet.as_view()
request = self.factory.get('api/v1/item/list/%s' % self.list)
force_authenticate(request, user=self.user_author)
response = view(request,'v1',self.list)
self.assertEqual(response.status_code, status.HTTP_200_OK)
def put(self, request, version,uuid):
""" Update a Item """
serializer = ItemSerializer(data=request.data)
if not serializer.is_valid():
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
data = serializer.data
object = self.get_object(uuid)
object.note = data['note']
object.title = data['title']
object.priority = data['priority']
try:
object.due_date = data['due_date']
except KeyError:
pass
object.completed = data['completed']
object.save()
return Response(status=status.HTTP_200_OK)
def test_POST_user_detail(self):
token = Token.objects.get_or_create(user=self.user)[0]
client = APIClient()
client.credentials(HTTP_AUTHORIZATION='Token ' + token.key)
name = self.user_data['first_name']
user = User.objects.filter(first_name=name)[0]
temp = {'first_name': 'test', 'email': user.email}
# TODO: Figure out why we need to pass 'email' to make patch request
response = client.patch(reverse('user-detail', kwargs={'pk': user.pk}),
data=temp)
self.assertNotEqual(user.first_name, response.data['first_name'])
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(temp['first_name'], response.data['first_name'])
def assign(self, request, pk):
try:
task = Task.objects.get(pk=pk, assignee=None)
except Task.DoesNotExist:
return Response(json.dumps({"message": "Already taken"}), status=status.HTTP_400_BAD_REQUEST)
expense, created = TaskExpense.objects.get_or_create(
task=task,
executor_id=request.user.pk,
money=task.money)
if created:
with transaction.atomic():
request.user.update_balance(u"???? ??????", task.money, task=task)
Task.objects.filter(pk=pk, assignee=None).update(assignee=request.user)
return Response(json.dumps({'message': "Taken"}), status=status.HTTP_200_OK)
def test_enrollment(self, mock_refresh, mock_edx_enr, mock_index): # pylint: disable=unused-argument
"""
Test for happy path
"""
cache_enr = CachedEnrollment.objects.filter(
user=self.user, course_run__edx_course_key=self.course_id).first()
assert cache_enr is None
enr_json = {'course_details': {'course_id': self.course_id}}
enrollment = Enrollment(enr_json)
mock_edx_enr.return_value = enrollment
resp = self.client.post(self.url, {'course_id': self.course_id}, format='json')
assert resp.status_code == status.HTTP_200_OK
assert mock_edx_enr.call_count == 1
assert mock_edx_enr.call_args[0][1] == self.course_id
assert resp.data == enr_json
mock_index.delay.assert_called_once_with([self.user.id], check_if_changed=True)
cache_enr = CachedEnrollment.objects.filter(
user=self.user, course_run__edx_course_key=self.course_id).first()
assert cache_enr is not None
def test_skipped_financialaid_object_created(self):
"""
Tests that the user can create a skipped FinancialAid if it doesn't already exist
"""
assert FinancialAidAudit.objects.count() == 0
assert FinancialAid.objects.exclude(status=FinancialAidStatus.RESET).count() == 0
self.make_http_request(self.client.patch, self.skip_url, status.HTTP_200_OK)
assert FinancialAidAudit.objects.count() == 1
assert FinancialAid.objects.exclude(status=FinancialAidStatus.RESET).count() == 1
financial_aid = FinancialAid.objects.exclude(status=FinancialAidStatus.RESET).first()
assert financial_aid.tier_program == get_no_discount_tier_program(self.program)
assert financial_aid.user == self.profile.user
assert financial_aid.status == FinancialAidStatus.SKIPPED
assert is_near_now(financial_aid.date_exchange_rate)
assert financial_aid.country_of_income == self.profile.country
assert financial_aid.country_of_residence == self.profile.country
def test_skipped_financialaid_object_updated(self, financial_aid_status):
"""
Tests that an existing FinancialAid object is updated to have the status "skipped"
"""
financial_aid = FinancialAidFactory.create(
user=self.profile.user,
tier_program=self.tier_programs["75k"],
status=financial_aid_status,
)
assert FinancialAidAudit.objects.count() == 0
self.make_http_request(self.client.patch, self.skip_url, status.HTTP_200_OK)
assert FinancialAid.objects.exclude(status=FinancialAidStatus.RESET).count() == 1
financial_aid.refresh_from_db()
assert financial_aid.tier_program == self.tier_programs["75k"]
assert financial_aid.status == FinancialAidStatus.SKIPPED
# Check logging
assert FinancialAidAudit.objects.count() == 1
def test_provides_edx_link(self):
"""If the program doesn't have financial aid, the checkout API should provide a link to go to edX"""
user = UserFactory.create()
self.client.force_login(user)
course_run = CourseRunFactory.create(
course__program__live=True,
course__program__financial_aid_availability=False,
)
resp = self.client.post(reverse('checkout'), {'course_id': course_run.edx_course_key}, format='json')
assert resp.status_code == status.HTTP_200_OK
assert resp.json() == {
'payload': {},
'url': 'http://edx_base/course_modes/choose/{}/'.format(course_run.edx_course_key),
'method': 'GET',
}
# We should only create Order objects for a Cybersource checkout
assert Order.objects.count() == 0
def test_ignore_duplicate_cancel(self):
"""
If the decision is CANCEL and we already have a duplicate failed order, don't change anything.
"""
course_run, user = create_purchasable_course_run()
order = create_unfulfilled_order(course_run.edx_course_key, user)
order.status = Order.FAILED
order.save()
data = {
'req_reference_number': make_reference_id(order),
'decision': 'CANCEL',
}
with patch(
'ecommerce.views.IsSignedByCyberSource.has_permission',
return_value=True
):
resp = self.client.post(reverse('order-fulfillment'), data=data)
assert resp.status_code == status.HTTP_200_OK
assert Order.objects.count() == 1
assert Order.objects.get(id=order.id).status == Order.FAILED
def test_should_patch_and_update(self):
"""
Should let us update an existing automatic email!
"""
automatic = AutomaticEmailFactory.create(staff_user=self.staff_user)
url = reverse('automatic_email_api-detail', kwargs={'email_id': automatic.id})
update = {
"enabled": not automatic.enabled,
"email_subject": "new subject",
"email_body": "new body",
"id": automatic.id
}
response = self.client.patch(url, update, format='json')
assert response.status_code == status.HTTP_200_OK
automatic.refresh_from_db()
assert automatic.email_subject == update["email_subject"]
assert automatic.email_body == update["email_body"]
assert automatic.enabled == update["enabled"]
def test_send_course_team_email_view(self, mock_mailgun_client):
"""
Test that course team emails are correctly sent through the view
"""
self.client.force_login(self.staff_user)
mock_mailgun_client.send_course_team_email.return_value = Mock(
spec=Response,
status_code=status.HTTP_200_OK,
json=mocked_json()
)
url = reverse(self.url_name, kwargs={'course_id': self.course.id})
resp_post = self.client.post(url, data=self.request_data, format='json')
assert resp_post.status_code == status.HTTP_200_OK
assert mock_mailgun_client.send_course_team_email.called
_, called_kwargs = mock_mailgun_client.send_course_team_email.call_args
assert called_kwargs['user'] == self.staff_user
assert called_kwargs['course'] == self.course
assert called_kwargs['subject'] == self.request_data['email_subject']
assert called_kwargs['body'] == self.request_data['email_body']
assert 'raise_for_status' not in called_kwargs
def test_send_learner_email_view(self, mock_mailgun_client):
"""
Test that learner emails are correctly sent through the view
"""
self.client.force_login(self.staff_user)
mock_mailgun_client.send_individual_email.return_value = Mock(
spec=Response,
status_code=status.HTTP_200_OK,
json=mocked_json()
)
url = reverse(self.url_name, kwargs={'student_id': self.recipient_user.profile.student_id})
resp_post = self.client.post(url, data=self.request_data, format='json')
assert resp_post.status_code == status.HTTP_200_OK
assert mock_mailgun_client.send_individual_email.called
_, called_kwargs = mock_mailgun_client.send_individual_email.call_args
assert called_kwargs['subject'] == self.request_data['email_subject']
assert called_kwargs['body'] == self.request_data['email_body']
assert called_kwargs['recipient'] == self.recipient_user.email
assert called_kwargs['sender_address'] == self.staff_user.email
assert called_kwargs['sender_name'] == self.staff_user.profile.display_name
assert 'raise_for_status' not in called_kwargs
def test_send_financial_aid_view(self, mock_mailgun_client):
"""
Test that the FinancialAidMailView will accept and return expected values
"""
self.client.force_login(self.staff_user_profile.user)
mock_mailgun_client.send_financial_aid_email.return_value = Mock(
spec=Response,
status_code=status.HTTP_200_OK,
json=mocked_json()
)
resp_post = self.client.post(self.url, data=self.request_data, format='json')
assert resp_post.status_code == status.HTTP_200_OK
assert mock_mailgun_client.send_financial_aid_email.called
_, called_kwargs = mock_mailgun_client.send_financial_aid_email.call_args
assert called_kwargs['acting_user'] == self.staff_user_profile.user
assert called_kwargs['financial_aid'] == self.financial_aid
assert called_kwargs['subject'] == self.request_data['email_subject']
assert called_kwargs['body'] == self.request_data['email_body']
assert 'raise_for_status' not in called_kwargs