Python rest_framework.exceptions 模块,PermissionDenied() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用rest_framework.exceptions.PermissionDenied()。
def authenticate(self, request):
if request.META['REQUEST_METHOD'] != 'POST':
return None
print 'Request data: {}'.format(request.data)
if 'HTTP_TOKEN' not in request.META:
raise exceptions.ParseError("Registration Token not present in the request.")
elif 'sampling_feature' not in request.data:
raise exceptions.ParseError("Sampling feature UUID not present in the request.")
# Get auth_token(uuid) from header, get registration object with auth_token, get the user from that registration, verify sampling_feature uuid is registered by this user, be happy.
token = request.META['HTTP_TOKEN']
registration = SiteRegistration.objects.filter(registration_token=token).first()
if not registration:
raise exceptions.PermissionDenied('Invalid Security Token')
# request needs to have the sampling feature uuid of the registration -
if str(registration.sampling_feature.sampling_feature_uuid) != request.data['sampling_feature']:
raise exceptions.AuthenticationFailed(
'Site Identifier is not associated with this Token') # or other related exception
return None
def test_publish_release(self, mock_client):
self.client = DockerClient()
self.client.publish_release('ozzy/embryo:git-f2a8020',
{'POWERED_BY': 'Deis'}, 'ozzy/embryo:v4', True)
self.assertTrue(self.client.client.pull.called)
self.assertTrue(self.client.client.tag.called)
self.assertTrue(self.client.client.build.called)
self.assertTrue(self.client.client.push.called)
# Test that a registry host prefix is replaced with deis-registry for the target
self.client.publish_release('ozzy/embryo:git-f2a8020',
{'POWERED_BY': 'Deis'}, 'quay.io/ozzy/embryo:v4', True)
docker_push = self.client.client.push
docker_push.assert_called_with(
'localhost:5000/ozzy/embryo', tag='v4', insecure_registry=True, stream=True)
# Test that blacklisted image names can't be published
with self.assertRaises(PermissionDenied):
self.client.publish_release(
'deis/controller:v1.11.1', {}, 'deis/controller:v1.11.1', True)
with self.assertRaises(PermissionDenied):
self.client.publish_release(
'localhost:5000/deis/controller:v1.11.1', {}, 'deis/controller:v1.11.1', True)
def test_build(self, mock_client):
# test that self.client.build was called with proper arguments
self.client = DockerClient()
self.client.build('ozzy/embryo:git-f3a8020', {'POWERED_BY': 'Deis'}, 'ozzy/embryo', 'v4')
docker_build = self.client.client.build
self.assertTrue(docker_build.called)
args = {"rm": True, "tag": u'localhost:5000/ozzy/embryo:v4', "stream": True}
kwargs = docker_build.call_args[1]
self.assertDictContainsSubset(args, kwargs)
# test that the fileobj arg to "docker build" contains a correct Dockerfile
f = kwargs['fileobj']
self.assertEqual(f.read(), "FROM ozzy/embryo:git-f3a8020\nENV POWERED_BY='Deis'")
# Test that blacklisted image names can't be built
with self.assertRaises(PermissionDenied):
self.client.build('deis/controller:v1.11.1', {}, 'deis/controller', 'v1.11.1')
with self.assertRaises(PermissionDenied):
self.client.build(
'localhost:5000/deis/controller:v1.11.1', {}, 'deis/controller', 'v1.11.1')
def passwd(self, request, **kwargs):
caller_obj = self.get_object()
target_obj = self.get_object()
if request.data.get('username'):
# if you "accidentally" target yourself, that should be fine
if caller_obj.username == request.data['username'] or caller_obj.is_superuser:
target_obj = get_object_or_404(User, username=request.data['username'])
else:
raise PermissionDenied()
if request.data.get('password') or not caller_obj.is_superuser:
if not target_obj.check_password(request.data['password']):
return Response({'detail': 'Current password does not match'},
status=status.HTTP_400_BAD_REQUEST)
target_obj.set_password(request.data['new_password'])
target_obj.save()
return Response({'status': 'password set'})
def has_permission(self, request, view):
"""
If settings.REGISTRATION_MODE does not exist, such as during a test, return True
Return `True` if permission is granted, `False` otherwise.
"""
try:
if settings.REGISTRATION_MODE == 'disabled':
raise exceptions.PermissionDenied('Registration is disabled')
if settings.REGISTRATION_MODE == 'enabled':
return True
elif settings.REGISTRATION_MODE == 'admin_only':
return request.user.is_superuser
else:
raise Exception("{} is not a valid registation mode"
.format(settings.REGISTRATION_MODE))
except AttributeError:
return True
def test_publish_release(self, mock_client):
self.client = DockerClient()
self.client.publish_release('ozzy/embryo:git-f2a8020',
{'POWERED_BY': 'Deis'}, 'ozzy/embryo:v4', True)
self.assertTrue(self.client.client.pull.called)
self.assertTrue(self.client.client.tag.called)
self.assertTrue(self.client.client.build.called)
self.assertTrue(self.client.client.push.called)
# Test that a registry host prefix is replaced with deis-registry for the target
self.client.publish_release('ozzy/embryo:git-f2a8020',
{'POWERED_BY': 'Deis'}, 'quay.io/ozzy/embryo:v4', True)
docker_push = self.client.client.push
docker_push.assert_called_with(
'localhost:5000/ozzy/embryo', tag='v4', insecure_registry=True, stream=True)
# Test that blacklisted image names can't be published
with self.assertRaises(PermissionDenied):
self.client.publish_release(
'deis/controller:v1.11.1', {}, 'deis/controller:v1.11.1', True)
with self.assertRaises(PermissionDenied):
self.client.publish_release(
'localhost:5000/deis/controller:v1.11.1', {}, 'deis/controller:v1.11.1', True)
def test_build(self, mock_client):
# test that self.client.build was called with proper arguments
self.client = DockerClient()
self.client.build('ozzy/embryo:git-f3a8020', {'POWERED_BY': 'Deis'}, 'ozzy/embryo', 'v4')
docker_build = self.client.client.build
self.assertTrue(docker_build.called)
args = {"rm": True, "tag": u'localhost:5000/ozzy/embryo:v4', "stream": True}
kwargs = docker_build.call_args[1]
self.assertDictContainsSubset(args, kwargs)
# test that the fileobj arg to "docker build" contains a correct Dockerfile
f = kwargs['fileobj']
self.assertEqual(f.read(), "FROM ozzy/embryo:git-f3a8020\nENV POWERED_BY='Deis'")
# Test that blacklisted image names can't be built
with self.assertRaises(PermissionDenied):
self.client.build('deis/controller:v1.11.1', {}, 'deis/controller', 'v1.11.1')
with self.assertRaises(PermissionDenied):
self.client.build(
'localhost:5000/deis/controller:v1.11.1', {}, 'deis/controller', 'v1.11.1')
def passwd(self, request, **kwargs):
caller_obj = self.get_object()
target_obj = self.get_object()
if request.data.get('username'):
# if you "accidentally" target yourself, that should be fine
if caller_obj.username == request.data['username'] or caller_obj.is_superuser:
target_obj = get_object_or_404(User, username=request.data['username'])
else:
raise PermissionDenied()
if request.data.get('password') or not caller_obj.is_superuser:
if not target_obj.check_password(request.data['password']):
return Response({'detail': 'Current password does not match'},
status=status.HTTP_400_BAD_REQUEST)
target_obj.set_password(request.data['new_password'])
target_obj.save()
return Response({'status': 'password set'})
def has_permission(self, request, view):
"""
If settings.REGISTRATION_MODE does not exist, such as during a test, return True
Return `True` if permission is granted, `False` otherwise.
"""
try:
if settings.REGISTRATION_MODE == 'disabled':
raise exceptions.PermissionDenied('Registration is disabled')
if settings.REGISTRATION_MODE == 'enabled':
return True
elif settings.REGISTRATION_MODE == 'admin_only':
return request.user.is_superuser
else:
raise Exception("{} is not a valid registation mode"
.format(settings.REGISTRATION_MODE))
except AttributeError:
return True
def test_check_status_code(self):
"""
Test status codes and exceptions raised.
"""
# Step 1: Status code 200.
self.authentication._check_status_code(200)
# Step 2: Status code 401.
with self.assertRaises(AuthenticationFailed):
self.authentication._check_status_code(401)
# Step 3: Status code 403.
with self.assertRaises(PermissionDenied):
self.authentication._check_status_code(403)
# Step 4: Status code other than tested.
with self.assertRaises(UnavailableException):
self.authentication._check_status_code(500)
def post(self, request, *args, **kwargs):
user = authenticate(userid=request.data['username'], password=request.data['password'])
if user:
is_active = user.is_active
if is_active:
token, _ = Token.objects.get_or_create(user=user)
response = Response({"token": token.key,
"user_pk": token.user_id,
"created": token.created}, status=status.HTTP_200_OK)
return response
else:
detail = "?? ??? ??????."
raise PermissionDenied(detail=detail)
else:
detail = "???? ?? ? ????. username? password? ?? ??????."
raise ValidationError(detail=detail)
def run(self, request, pk):
"""
Run a Report and create a new ReportResult, overwriting any previous result for the Report.
"""
# Check that the user has permission to run reports.
if not request.user.has_perm('extras.add_reportresult'):
raise PermissionDenied("This user does not have permission to run reports.")
# Retrieve and run the Report. This will create a new ReportResult.
report = self._retrieve_report(pk)
report.run()
serializer = serializers.ReportDetailSerializer(report)
return Response(serializer.data)
#
# User activity
#
def delete(self, request, group, note_id):
if not request.user.is_authenticated():
raise PermissionDenied(detail="Key doesn't have permission to delete Note")
try:
note = Activity.objects.get(
group=group,
type=Activity.NOTE,
user=request.user,
id=note_id,
)
except Activity.DoesNotExist:
raise ResourceDoesNotExist
note.delete()
return Response(status=204)
def put(self, request, group, note_id):
if not request.user.is_authenticated():
raise PermissionDenied(detail="Key doesn't have permission to edit Note")
try:
note = Activity.objects.get(
group=group,
type=Activity.NOTE,
user=request.user,
id=note_id,
)
except Activity.DoesNotExist:
raise ResourceDoesNotExist
serializer = NoteSerializer(data=request.DATA)
if serializer.is_valid():
# Would be nice to have a last_modified timestamp we could bump here
note.data = dict(serializer.object)
note.save()
return Response(serialize(note, request.user), status=200)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def clone(self, request, *args, **kwargs):
self.object = self.get_object()
data = {'xform': self.object.pk, 'username': request.DATA['username']}
serializer = CloneXFormSerializer(data=data)
if serializer.is_valid():
clone_to_user = User.objects.get(username=data['username'])
if not request.user.has_perm('can_add_xform',
clone_to_user.profile):
raise exceptions.PermissionDenied(
detail=_(u"User %(user)s has no permission to add "
"xforms to account %(account)s" %
{'user': request.user.username,
'account': data['username']}))
xform = serializer.save()
serializer = XFormSerializer(
xform.cloned_form, context={'request': request})
return Response(data=serializer.data,
status=status.HTTP_201_CREATED)
return Response(data=serializer.errors,
status=status.HTTP_400_BAD_REQUEST)
def authenticate_credentials(self, key):
token_cache = 'token_' + key
cache_user = cache.get(token_cache)
if cache_user:
return (cache_user, key)
try:
token = self.model.objects.get(key=key)
except self.model.DoesNotExist:
raise exceptions.AuthenticationFailed('User does not exist.')
if not token.user.is_active:
raise exceptions.PermissionDenied('The user is forbidden.')
utc_now = timezone.now()
if token.created < utc_now - timezone.timedelta(hours=24 * 30):
raise exceptions.AuthenticationFailed('Token has been expired.')
if token:
token_cache = 'token_' + key
cache.set(token_cache, token.user, 24 * 7 * 60 * 60)
return (token.user, token)
def my_exception_handler(exc, context):
# Call REST framework's default exception handler first,
# to get the standard error response.
response = exception_handler(exc, context)
# Now add the HTTP status code to the response.
# print(exc)
# print(context)
if response is not None:
if isinstance(exc, exceptions.AuthenticationFailed):
response.data['error_code'] = 2
elif isinstance(exc, exceptions.PermissionDenied):
response.data['error_code'] = 3
else:
response.data['error_code'] = 1
return response
def place_order(request, pk=None):
"""
Place a specific order.
"""
order = get_object_or_404(rest.models.Order, pk=pk)
if not request.user.is_superuser:
if not order.organization.can_user_scan(request.user):
raise PermissionDenied("You do not have sufficient privileges to start scans for that organization.")
if not order.is_ready_to_place:
raise PermissionDenied(order.get_ready_errors())
order.place_order()
order.save()
send_emails_for_placed_order.delay(
order_uuid=unicode(order.uuid),
receipt_description=order.get_receipt_description(),
)
handle_placed_order.delay(order_uuid=unicode(order.uuid))
return Response(status=204)
def change_rights(self, request, pk):
"""
Used to change a member's rights.
Only administrators can change member's rights.
Only super-administrator can change admin's rights.
If the super-administrator right is requested, then it must come from the current
super-administrator whose thus, losing his status.
If succeeded, returns HTTP_200_OK with the updated GroupMember object
"""
user = request.user
member = self.get_or_404(pk)
rights_serializer, rights = SigmaViewSet.get_deserialized(GroupMemberRightsSerializer, request.data)
if not GroupMember.model.can_change_rights(user, member, rights):
raise PermissionDenied()
if rights.is_super_administrator:
pass # TODO : de-superadminer le gars qui file ses droits
member_serializer, member = self.get_deserialized(rights, member, partial=True)
member_serializer.save()
return Response(member_serializer.data, status=status.HTTP_200_OK)
def add_follower(self, request, pk=None):
guid = request.data.get("guid")
try:
target_profile = Profile.objects.get(guid=guid)
except Profile.DoesNotExist:
raise PermissionDenied("Profile given does not exist.")
profile = self.get_object()
if profile.guid == guid:
raise ValidationError("Cannot follow self!")
profile.following.add(target_profile)
return Response({"status": "Follower added."})
def remove_follower(self, request, pk=None):
guid = request.data.get("guid")
try:
target_profile = Profile.objects.get(guid=guid)
except Profile.DoesNotExist:
raise PermissionDenied("Profile given does not exist.")
profile = self.get_object()
if profile.guid == guid:
raise ValidationError("Cannot unfollow self!")
profile.following.remove(target_profile)
return Response({"status": "Follower removed."})
def log_pending(self, request, pk=None, **kwargs):
"""
Create event log with status == "mk42.apps.core.constants.STATUS_PENDING". Actually NO.
:param request: django request instance.
:type request: django.http.request.HttpRequest.
:param pk: event object primary key.
:type pk: unicode.
:param kwargs: additional args.
:type kwargs: dict.
:return: django rest framework response.
:rtype: rest_framework.response.Response.
"""
raise PermissionDenied(detail=_("Okay move along, move along people, there's nothing to see here!"))
def log_canceled(self, request, pk=None, **kwargs):
"""
Create event log with status == "mk42.apps.core.constants.STATUS_CANCELED".
:param request: django request instance.
:type request: django.http.request.HttpRequest.
:param pk: event object primary key.
:type pk: unicode.
:param kwargs: additional args.
:type kwargs: dict.
:return: django rest framework response.
:rtype: rest_framework.response.Response.
"""
obj = self.get_object()
if request.user != obj.group.owner:
# only group owner can change event status
raise PermissionDenied(detail=_("You must be owner of this group to perform this action."))
log = obj.log_canceled(**kwargs)
if not log:
# can't create event logs with status == "mk42.apps.core.constants.STATUS_CANCELED"
# if log with status == "mk42.apps.core.constants.STATUS_PENDING" does not exist
# or log with status == "mk42.apps.core.constants.STATUS_ONGOING" exist
raise PermissionDenied(detail=_("Can't change status to '{status}'.").format(**{"status": dict(obj.STATUS_CHOICES).get(obj.STATUS_CANCELED), }))
return Response({"detail": EventLogSerializer(instance=log, context={"request": request, }).data if log else None, })
def log_ongoing(self, request, pk=None, **kwargs):
"""
Create event log with status == "mk42.apps.core.constants.STATUS_ONGOING".
:param request: django request instance.
:type request: django.http.request.HttpRequest.
:param pk: event object primary key.
:type pk: unicode.
:param kwargs: additional args.
:type kwargs: dict.
:return: django rest framework response.
:rtype: rest_framework.response.Response.
"""
obj = self.get_object()
if request.user != obj.group.owner:
# only group owner can change event status
raise PermissionDenied(detail=_("You must be owner of this group to perform this action."))
log = obj.log_ongoing(**kwargs)
if not log:
# can't create event logs with status == "mk42.apps.core.constants.STATUS_ONGOING"
# if log with status == "mk42.apps.core.constants.STATUS_FINISHED" exist
# if log with status == "mk42.apps.core.constants.STATUS_CANCELED" exist
# or log with status == "mk42.apps.core.constants.STATUS_PENDING" does not exist
raise PermissionDenied(detail=_("Can't change status to '{status}'.").format(**{"status": dict(obj.STATUS_CHOICES).get(obj.STATUS_ONGOING), }))
return Response({"detail": EventLogSerializer(instance=log, context={"request": request, }).data if log else None, })
def log_finished(self, request, pk=None, **kwargs):
"""
Create event log with status == "mk42.apps.core.constants.STATUS_FINISHED".
:param request: django request instance.
:type request: django.http.request.HttpRequest.
:param pk: event object primary key.
:type pk: unicode.
:param kwargs: additional args.
:type kwargs: dict.
:return: django rest framework response.
:rtype: rest_framework.response.Response.
"""
obj = self.get_object()
if request.user != obj.group.owner:
# only group owner can change event status
raise PermissionDenied(detail=_("You must be owner of this group to perform this action."))
log = obj.log_finished(**kwargs)
if not log:
# can't create event logs with status == "mk42.apps.core.constants.STATUS_FINISHED"
# if log with status == "mk42.apps.core.constants.STATUS_FINISHED" exist
# if log with status == "mk42.apps.core.constants.STATUS_CANCELED" exist
# or log with status == "mk42.apps.core.constants.STATUS_ONGOING" does not exist
raise PermissionDenied(detail=_("Can't change status to '{status}'.").format(**{"status": dict(obj.STATUS_CHOICES).get(obj.STATUS_FINISHED), }))
return Response({"detail": EventLogSerializer(instance=log, context={"request": request, }).data if log else None, })
def parkings_exception_handler(exc, context):
response = exception_handler(exc, context)
if response is not None:
if isinstance(exc, ParkingException):
response.data['code'] = exc.get_codes()
elif isinstance(exc, PermissionDenied):
response.data['code'] = 'permission_denied'
return response
def get_profile(self):
try:
profile = self.request.user.profile
except exceptions.ObjectDoesNotExist:
raise PermissionDenied(detail='Role incorrect')
return profile
def get_queryset(self):
only_valid = self.request.query_params.get('only_valid', '')
only_valid = only_valid == 'true'
user = self.request.user
try:
queryset = user.parent.coupon_set.all()
except exceptions.ObjectDoesNotExist:
raise PermissionDenied(detail='Role incorrect')
now = timezone.now()
out_time = models.Coupon.OUT_OF_DATE_TIME
if only_valid:
# ??????????
# ????, ????? => ?????, ???????????
queryset = queryset.filter(
expired_at__gt=now,
used=False,
).order_by('-amount', 'expired_at')
else:
# ??????????, ???????
# ????, ????? => ?????
queryset = queryset.filter(
expired_at__gt=now - out_time,
).extra(
# ???????????????
select={'date_diff': 'abs(extract(epoch from (now()-expired_at)))'}
).order_by('date_diff', '-amount')
# ???????
if self.action == 'list':
return sorted(queryset, key=lambda x: x.sort_key())
return queryset
def get_parent(self):
try:
parent = self.request.user.parent
except (AttributeError, exceptions.ObjectDoesNotExist):
raise PermissionDenied(detail='Role incorrect')
return parent
def _enforce_csrf(self, request):
"""Make sure that we have a valid CSRF token.
Django restframework does validate this when using the
SessionAuthentication but since that also checks if the user is
authenticated we can't really use that
"""
reason = CSRFCheck().process_view(request, None, (), {})
if reason:
# CSRF failed, bail with explicit error message
raise PermissionDenied('CSRF Failed: %s' % reason)
def has_view_permissions(self, path, method, view):
"""
Return `True` if the incoming request has the correct view permissions.
"""
if view.request is None:
return True
try:
view.check_permissions(view.request)
except (exceptions.APIException, Http404, PermissionDenied):
return False
return True
def get(self, request, *args, **kwargs):
schema = self.schema_generator.get_schema(request, self.public)
if schema is None:
raise exceptions.PermissionDenied()
return Response(schema)
def exception_handler(exc, context):
"""
Returns the response that should be used for any given exception.
By default we handle the REST framework `APIException`, and also
Django's built-in `Http404` and `PermissionDenied` exceptions.
Any unhandled exceptions may return `None`, which will cause a 500 error
to be raised.
"""
if isinstance(exc, exceptions.APIException):
headers = {}
if getattr(exc, 'auth_header', None):
headers['WWW-Authenticate'] = exc.auth_header
if getattr(exc, 'wait', None):
headers['Retry-After'] = '%d' % exc.wait
if isinstance(exc.detail, (list, dict)):
data = exc.detail
else:
data = {'detail': exc.detail}
set_rollback()
return Response(data, status=exc.status_code, headers=headers)
elif isinstance(exc, Http404):
msg = _('Not found.')
data = {'detail': six.text_type(msg)}
set_rollback()
return Response(data, status=status.HTTP_404_NOT_FOUND)
elif isinstance(exc, PermissionDenied):
msg = _('Permission denied.')
data = {'detail': six.text_type(msg)}
set_rollback()
return Response(data, status=status.HTTP_403_FORBIDDEN)
return None
def permission_denied(self, request, message=None):
"""
If request is not permitted, determine what kind of exception to raise.
"""
if request.authenticators and not request.successful_authenticator:
raise exceptions.NotAuthenticated()
raise exceptions.PermissionDenied(detail=message)
def enforce_csrf(self, request):
"""
Enforce CSRF validation for session based authentication.
"""
reason = CSRFCheck().process_view(request, None, (), {})
if reason:
# CSRF failed, bail with explicit error message
raise exceptions.PermissionDenied('CSRF Failed: %s' % reason)
def check_blacklist(repo):
"""Check a Docker repository name for collision with deis/* components."""
blacklisted = [ # NOTE: keep this list up to date!
'builder', 'cache', 'controller', 'database', 'logger', 'logspout',
'publisher', 'registry', 'router', 'store-admin', 'store-daemon',
'store-gateway', 'store-metadata', 'store-monitor', 'swarm', 'mesos-master',
'mesos-marathon', 'mesos-slave', 'zookeeper',
]
if any("deis/{}".format(c) in repo for c in blacklisted):
raise PermissionDenied("Repository name {} is not allowed".format(repo))
def test_pull(self, mock_client):
self.client = DockerClient()
self.client.pull('alpine', '3.2')
docker_pull = self.client.client.pull
docker_pull.assert_called_once_with(
'alpine', tag='3.2', insecure_registry=True, stream=True)
# Test that blacklisted image names can't be pulled
with self.assertRaises(PermissionDenied):
self.client.pull('deis/controller', 'v1.11.1')
with self.assertRaises(PermissionDenied):
self.client.pull('localhost:5000/deis/controller', 'v1.11.1')
def test_tag(self, mock_client):
self.client = DockerClient()
self.client.tag('ozzy/embryo:git-f2a8020', 'ozzy/embryo', 'v4')
docker_tag = self.client.client.tag
docker_tag.assert_called_once_with(
'ozzy/embryo:git-f2a8020', 'ozzy/embryo', tag='v4', force=True)
# Test that blacklisted image names can't be tagged
with self.assertRaises(PermissionDenied):
self.client.tag('deis/controller:v1.11.1', 'deis/controller', 'v1.11.1')
with self.assertRaises(PermissionDenied):
self.client.tag('localhost:5000/deis/controller:v1.11.1', 'deis/controller', 'v1.11.1')
def update(self, request, **kwargs):
app = self.get_object()
if request.data.get('owner'):
if self.request.user != app.owner and not self.request.user.is_superuser:
raise PermissionDenied()
new_owner = get_object_or_404(User, username=request.data['owner'])
app.owner = new_owner
app.save()
return Response(status=status.HTTP_200_OK)
def create(self, request, *args, **kwargs):
app = get_object_or_404(models.App, id=request.data['receive_repo'])
request.user = get_object_or_404(User, username=request.data['receive_user'])
# check the user is authorized for this app
if not permissions.is_app_user(request, app):
raise PermissionDenied()
request.data['app'] = app
request.data['owner'] = request.user
return super(PushHookViewSet, self).create(request, *args, **kwargs)
def create(self, request, *args, **kwargs):
app = get_object_or_404(models.App, id=request.data['receive_repo'])
self.user = request.user = get_object_or_404(User, username=request.data['receive_user'])
# check the user is authorized for this app
if not permissions.is_app_user(request, app):
raise PermissionDenied()
request.data['app'] = app
request.data['owner'] = self.user
super(BuildHookViewSet, self).create(request, *args, **kwargs)
# return the application databag
response = {'release': {'version': app.release_set.latest().version},
'domains': ['.'.join([app.id, settings.DEIS_DOMAIN])]}
return Response(response, status=status.HTTP_200_OK)
def create(self, request, *args, **kwargs):
app = get_object_or_404(models.App, id=request.data['receive_repo'])
request.user = get_object_or_404(User, username=request.data['receive_user'])
# check the user is authorized for this app
if not permissions.is_app_user(request, app):
raise PermissionDenied()
config = app.release_set.latest().config
serializer = self.get_serializer(config)
return Response(serializer.data, status=status.HTTP_200_OK)
def destroy(self, request, **kwargs):
app = get_object_or_404(models.App, id=self.kwargs['id'])
user = get_object_or_404(User, username=kwargs['username'])
perm_name = "api.{}".format(self.perm)
if not user.has_perm(perm_name, app):
raise PermissionDenied()
if (user != request.user and
not permissions.IsOwnerOrAdmin.has_object_permission(permissions.IsOwnerOrAdmin(),
request, self, app)):
raise PermissionDenied()
remove_perm(self.perm, user, app)
models.log_event(app, "User {} was revoked access to {}".format(user, app))
return Response(status=status.HTTP_204_NO_CONTENT)
def check_blacklist(repo):
"""Check a Docker repository name for collision with deis/* components."""
blacklisted = [ # NOTE: keep this list up to date!
'builder', 'cache', 'controller', 'database', 'logger', 'logspout',
'publisher', 'registry', 'router', 'store-admin', 'store-daemon',
'store-gateway', 'store-metadata', 'store-monitor', 'swarm', 'mesos-master',
'mesos-marathon', 'mesos-slave', 'zookeeper',
]
if any("deis/{}".format(c) in repo for c in blacklisted):
raise PermissionDenied("Repository name {} is not allowed".format(repo))
def test_tag(self, mock_client):
self.client = DockerClient()
self.client.tag('ozzy/embryo:git-f2a8020', 'ozzy/embryo', 'v4')
docker_tag = self.client.client.tag
docker_tag.assert_called_once_with(
'ozzy/embryo:git-f2a8020', 'ozzy/embryo', tag='v4', force=True)
# Test that blacklisted image names can't be tagged
with self.assertRaises(PermissionDenied):
self.client.tag('deis/controller:v1.11.1', 'deis/controller', 'v1.11.1')
with self.assertRaises(PermissionDenied):
self.client.tag('localhost:5000/deis/controller:v1.11.1', 'deis/controller', 'v1.11.1')
def destroy(self, request, **kwargs):
calling_obj = self.get_object()
target_obj = calling_obj
if request.data.get('username'):
# if you "accidentally" target yourself, that should be fine
if calling_obj.username == request.data['username'] or calling_obj.is_superuser:
target_obj = get_object_or_404(User, username=request.data['username'])
else:
raise PermissionDenied()
target_obj.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
def update(self, request, **kwargs):
app = self.get_object()
if request.data.get('owner'):
if self.request.user != app.owner and not self.request.user.is_superuser:
raise PermissionDenied()
new_owner = get_object_or_404(User, username=request.data['owner'])
app.owner = new_owner
app.save()
return Response(status=status.HTTP_200_OK)
def create(self, request, *args, **kwargs):
app = get_object_or_404(models.App, id=request.data['receive_repo'])
request.user = get_object_or_404(User, username=request.data['receive_user'])
# check the user is authorized for this app
if not permissions.is_app_user(request, app):
raise PermissionDenied()
request.data['app'] = app
request.data['owner'] = request.user
return super(PushHookViewSet, self).create(request, *args, **kwargs)
def create(self, request, *args, **kwargs):
app = get_object_or_404(models.App, id=request.data['receive_repo'])
request.user = get_object_or_404(User, username=request.data['receive_user'])
# check the user is authorized for this app
if not permissions.is_app_user(request, app):
raise PermissionDenied()
config = app.release_set.latest().config
serializer = self.get_serializer(config)
return Response(serializer.data, status=status.HTTP_200_OK)
def create(self, request, **kwargs):
app = self.get_object()
if not permissions.IsOwnerOrAdmin.has_object_permission(permissions.IsOwnerOrAdmin(),
request, self, app):
raise PermissionDenied()
user = get_object_or_404(User, username=request.data['username'])
assign_perm(self.perm, user, app)
models.log_event(app, "User {} was granted access to {}".format(user, app))
return Response(status=status.HTTP_201_CREATED)