Python rest_framework.exceptions 模块,AuthenticationFailed() 实例源码
我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 rest_framework.exceptions.AuthenticationFailed()。
def authenticate_payload(payload):
    """Resolve the user referenced by a decoded token payload.

    When ``api_settings.VERIFY_SESSION_TOKEN`` is truthy, the user is loaded
    through an active ``SessionToken`` matching both the session-id and the
    user-id claims. Otherwise the user is fetched directly by the user-id
    claim.

    Returns:
        The matching, active user instance.

    Raises:
        exceptions.AuthenticationFailed: no matching session token or user
            exists, or the resolved user is not active.
    """
    # Function-scope import; presumably avoids a circular import - TODO confirm.
    from rest_framework_sso.models import SessionToken

    user_model = get_user_model()

    if not api_settings.VERIFY_SESSION_TOKEN:
        try:
            account = user_model.objects.get(pk=payload.get(claims.USER_ID))
        except user_model.DoesNotExist:
            raise exceptions.AuthenticationFailed(_('Invalid token.'))
    else:
        try:
            session_token = (
                SessionToken.objects
                .active()
                .select_related('user')
                .get(
                    pk=payload.get(claims.SESSION_ID),
                    user_id=payload.get(claims.USER_ID),
                )
            )
            account = session_token.user
        except SessionToken.DoesNotExist:
            raise exceptions.AuthenticationFailed(_('Invalid token.'))

    if account.is_active:
        return account

    raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
def authenticate_credentials(self, basic):
    """Authenticate a base64-encoded ``<name>:<token key>`` credential string.

    The decoded string is split on ``':'`` into a name and a token key. The
    key is looked up on ``self.model``. If the name part is non-empty, a
    ``Domain`` with that name owned by the token's user must exist.

    Args:
        basic: The base64 payload taken from the Authorization header.

    Returns:
        A ``(user, token)`` tuple.

    Raises:
        exceptions.AuthenticationFailed: the payload cannot be decoded or
            split, the token is unknown, the user is inactive, or the named
            domain is not found for that user.
    """
    try:
        user, key = base64.b64decode(basic).decode(HTTP_HEADER_ENCODING).split(':')
        token = self.model.objects.get(key=key)
    except Exception:
        # Any decoding, unpacking or lookup failure means the credentials are
        # unusable. Catching ``Exception`` rather than using a bare
        # ``except`` lets SystemExit / KeyboardInterrupt propagate.
        raise exceptions.AuthenticationFailed('Invalid basic auth token')

    if not token.user.is_active:
        raise exceptions.AuthenticationFailed('User inactive or deleted')

    if user:
        try:
            Domain.objects.get(owner=token.user.pk, name=user)
        except Exception:
            # Same reasoning as above: fail authentication on any ordinary
            # error, but never swallow interpreter-exit signals.
            raise exceptions.AuthenticationFailed('Invalid username')

    return token.user, token
def authenticate(self, request):
    """Validate a site-registration token sent with a POST request.

    Non-POST requests are ignored. For POST requests the ``HTTP_TOKEN`` META
    entry and the ``sampling_feature`` body field must both be present, the
    token must belong to a ``SiteRegistration``, and that registration's
    sampling feature UUID must equal the submitted ``sampling_feature``.

    Returns:
        Always ``None``; the method only validates and never yields a
        ``(user, auth)`` pair.

    Raises:
        exceptions.ParseError: the token header or ``sampling_feature`` field
            is missing.
        exceptions.PermissionDenied: no registration matches the token.
        exceptions.AuthenticationFailed: the sampling feature UUID does not
            match the registration.
    """
    if request.META['REQUEST_METHOD'] != 'POST':
        return None

    # Function-call form of print: same output, valid on Python 2 and 3
    # (the statement form is a SyntaxError on Python 3).
    print('Request data: {}'.format(request.data))

    if 'HTTP_TOKEN' not in request.META:
        raise exceptions.ParseError("Registration Token not present in the request.")
    elif 'sampling_feature' not in request.data:
        raise exceptions.ParseError("Sampling feature UUID not present in the request.")

    # Look up the registration by the token taken from the header.
    token = request.META['HTTP_TOKEN']
    registration = SiteRegistration.objects.filter(registration_token=token).first()
    if not registration:
        raise exceptions.PermissionDenied('Invalid Security Token')

    # The request must carry the sampling feature UUID of that registration.
    if str(registration.sampling_feature.sampling_feature_uuid) != request.data['sampling_feature']:
        raise exceptions.AuthenticationFailed(
            'Site Identifier is not associated with this Token')  # or other related exception

    return None
def handle_exception(self, exc):
    """
    Convert an exception raised during request handling into a response.

    Authentication errors get the view's authenticate header attached when
    one is available; without one their status code is switched to 403. The
    exception is then passed to the configured exception handler. If that
    handler returns ``None`` the exception is handed to
    ``raise_uncaught_exception``.
    """
    auth_errors = (exceptions.NotAuthenticated,
                   exceptions.AuthenticationFailed)
    if isinstance(exc, auth_errors):
        www_authenticate = self.get_authenticate_header(self.request)
        if not www_authenticate:
            # No WWW-Authenticate value available: coerce to 403.
            exc.status_code = status.HTTP_403_FORBIDDEN
        else:
            exc.auth_header = www_authenticate

    handler = self.get_exception_handler()
    handler_context = self.get_exception_handler_context()
    response = handler(exc, handler_context)

    if response is None:
        self.raise_uncaught_exception(exc)

    response.exception = True
    return response
def authenticate(self, request):
    """
    Authenticate a request carrying an HTTP Basic Authorization header.

    Returns ``None`` when the header is absent or uses another scheme;
    otherwise returns whatever ``authenticate_credentials`` returns for the
    decoded user id and password. Malformed headers raise
    ``AuthenticationFailed``.
    """
    header_parts = get_authorization_header(request).split()

    if not header_parts or header_parts[0].lower() != b'basic':
        return None

    part_count = len(header_parts)
    if part_count == 1:
        raise exceptions.AuthenticationFailed(
            _('Invalid basic header. No credentials provided.'))
    if part_count > 2:
        raise exceptions.AuthenticationFailed(
            _('Invalid basic header. Credentials string should not contain spaces.'))

    try:
        decoded = base64.b64decode(header_parts[1]).decode(HTTP_HEADER_ENCODING)
        userid, _separator, password = decoded.partition(':')
    except (TypeError, UnicodeDecodeError, binascii.Error):
        raise exceptions.AuthenticationFailed(
            _('Invalid basic header. Credentials not correctly base64 encoded.'))

    return self.authenticate_credentials(userid, password)
def authenticate_credentials(self, userid, password):
    """
    Check the user id and password by calling ``authenticate``.

    The user id is passed under the user model's ``USERNAME_FIELD`` name.
    Returns ``(user, None)`` on success and raises ``AuthenticationFailed``
    when no user is returned or the user is inactive.
    """
    username_field = get_user_model().USERNAME_FIELD
    user = authenticate(**{username_field: userid, 'password': password})

    if user is None:
        msg = _('Invalid username/password.')
        raise exceptions.AuthenticationFailed(msg)

    if not user.is_active:
        msg = _('User inactive or deleted.')
        raise exceptions.AuthenticationFailed(msg)

    return (user, None)
def authenticate(self, request):
    """Authenticate a request whose Authorization header uses ``self.keyword``.

    Returns ``None`` when the header is missing or its first part does not
    match the keyword (case-insensitively). Otherwise the single token that
    follows is decoded and passed to ``authenticate_credentials``. Malformed
    headers raise ``AuthenticationFailed``.
    """
    parts = get_authorization_header(request).split()

    if not (parts and parts[0].lower() == self.keyword.lower().encode()):
        return None

    if len(parts) == 1:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. No credentials provided.'))
    if len(parts) > 2:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. Token string should not contain spaces.'))

    try:
        token = parts[1].decode()
    except UnicodeError:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. Token string should not contain invalid characters.'))

    return self.authenticate_credentials(token)
def authenticate(self, request):
    """Authenticate via the Rehive client using the request's auth header.

    The token from ``self.get_auth_header`` is used to fetch the remote user
    record. The record's ``company`` must match a local ``Company``. A local
    ``User`` keyed by the hex form of the remote ``identifier`` is fetched or
    created.

    Returns:
        A ``(user, token)`` tuple.

    Raises:
        exceptions.AuthenticationFailed: the remote lookup raises
            ``APIException`` or no matching company exists.
    """
    token = self.get_auth_header(request)
    rehive = Rehive(token)

    try:
        profile = rehive.user.get()
    except APIException:
        raise exceptions.AuthenticationFailed(_('Invalid user'))

    try:
        company = Company.objects.get(identifier=profile['company'])
    except Company.DoesNotExist:
        raise exceptions.AuthenticationFailed(_("Inactive company."))

    local_user, _created = User.objects.get_or_create(
        identifier=uuid.UUID(profile['identifier']).hex,
        company=company,
    )
    return local_user, token
def authenticate(self, request):
    """Authenticate a request by the ``number`` query parameter.

    The number is looked up and accepted only when its network has
    ``bypass_gateway_auth`` set.

    Returns: ``(user, auth)`` when the number's network bypasses gateway
        auth, or ``None`` when no ``number`` parameter is present so that
        another authentication scheme can be attempted.

    Raises: AuthenticationFailed when the number is unknown or its network
        does not bypass gateway auth.
    """
    if "number" not in request.GET:
        # This scheme is not being attempted.
        return None

    try:
        network = Number.objects.get(number=request.GET["number"]).network
        if not network.bypass_gateway_auth:
            raise AuthenticationFailed("Number authentication failed.")
        return (network.auth_user, None)
    except Number.DoesNotExist:
        raise AuthenticationFailed("Unknown number.")
def authenticate(self, request):
    """
    Authenticate a request carrying a JWT.

    Returns ``None`` when ``get_jwt_value`` finds no token. Otherwise the
    token is decoded and the payload, together with ``request.channel``, is
    passed to ``authenticate_credentials``; the result is returned as
    ``(user, jwt_value)``. Decode failures raise ``AuthenticationFailed``.
    """
    jwt_value = self.get_jwt_value(request)
    if jwt_value is None:
        return None

    try:
        payload = jwt_decode_handler(jwt_value)
    except jwt.ExpiredSignature:
        raise exceptions.AuthenticationFailed(_('Signature has expired.'))
    except jwt.DecodeError:
        raise exceptions.AuthenticationFailed(_('Error decoding signature.'))
    except jwt.InvalidTokenError:
        raise exceptions.AuthenticationFailed()

    return (self.authenticate_credentials(payload, request.channel), jwt_value)
def authenticate_credentials(self, payload, channel):
    """
    Return the active user whose email is the payload's username and whose
    channel slug equals ``channel``.

    Raises ``AuthenticationFailed`` when the payload has no username, no such
    user exists, or the user is not active.
    """
    user_model = get_user_model()
    email = jwt_get_username_from_payload(payload)

    if not email:
        raise exceptions.AuthenticationFailed(_('Invalid payload.'))

    try:
        account = user_model.objects.get(email=email, channel__slug=channel)
    except user_model.DoesNotExist:
        raise exceptions.AuthenticationFailed(_('Invalid signature.'))

    if account.is_active:
        return account

    raise exceptions.AuthenticationFailed(_('User account is disabled.'))
def authenticate(self, request):
    """Authenticate a request by API token.

    The token is taken from the ``token`` query parameter or, failing that,
    from the user part of a Basic Authorization header. Both sources are
    read before either is used, so a malformed Basic header raises even when
    a query token is present.

    Returns:
        ``(api_key.account, None)`` for a known token.

    Raises:
        exceptions.AuthenticationFailed: no token was supplied or the token
            matches no ``ApiKey``.
    """
    query_token = request.GET.get('token', None)
    header_token = self._token_from_request_headers(request)

    token = query_token or header_token
    if not token:
        msg = "You must provide an API token to use this server. " \
              "You can add `?token=API_TOKEN` at the end of the URL, or use a basic HTTP authentication where user=API_TOKEN with an empty password. " \
              "Please check your emails for your API token."
        raise exceptions.AuthenticationFailed(msg)

    # The token must correspond to an existing API key.
    try:
        api_key = ApiKey.objects.get(key=token)
    except ApiKey.DoesNotExist:
        raise exceptions.AuthenticationFailed("This API token doesn't exist")

    return (api_key.account, None)
def _token_from_request_headers(self, request):
    """Extract the user part of an HTTP Basic Authorization header.

    Returns:
        The text before the first ``':'`` in the decoded credentials, or
        ``None`` when the header is absent or not a Basic header.

    Raises:
        exceptions.AuthenticationFailed: the header has no credentials, has
            extra space-separated parts, or is not valid base64 / text in
            ``HTTP_HEADER_ENCODING``.
    """
    # Local import keeps this block self-contained.
    import binascii

    auth = authentication.get_authorization_header(request).split()

    if not auth or auth[0].lower() != b'basic':
        return None

    if len(auth) == 1:
        msg = _('Invalid basic header. No credentials provided.')
        raise exceptions.AuthenticationFailed(msg)
    elif len(auth) > 2:
        msg = _('Invalid basic header. Credentials string should not contain spaces.')
        raise exceptions.AuthenticationFailed(msg)

    try:
        auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')
    except (TypeError, UnicodeDecodeError, binascii.Error):
        # On Python 3 malformed base64 raises binascii.Error, not TypeError;
        # without it a bad header would escape as an unhandled error.
        msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
        raise exceptions.AuthenticationFailed(msg)

    return auth_parts[0]
def authenticate_credentials(self, payload):
    """
    Resolve the active user and auth token described by a JWT payload.

    The username and knox token are both read from the payload. The user is
    fetched by natural key, and the token is checked with
    ``ensure_valid_auth_token``. Returns ``(user, <checked token>)``.
    Raises ``AuthenticationFailed`` for an incomplete payload, an unknown
    user, or an inactive user.
    """
    user_model = get_user_model()
    username = jwt_get_username_from_payload(payload)
    token = jwt_get_knox_token_from_payload(payload)

    if not (username and token):
        raise exceptions.AuthenticationFailed(_('Invalid payload.'))

    try:
        account = user_model.objects.get_by_natural_key(username)
    except user_model.DoesNotExist:
        raise exceptions.AuthenticationFailed(_('Invalid signature.'))

    if not account.is_active:
        raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

    return (account, self.ensure_valid_auth_token(account, token))
def get_jwt_value(request):
    """Return the raw JWT from the Authorization header.

    Returns ``None`` when the header is missing or its first part is not the
    configured ``JWT_AUTH_HEADER_PREFIX`` (compared in lower case). Raises
    ``AuthenticationFailed`` when the prefix is present but is not followed
    by exactly one credential part.
    """
    parts = get_authorization_header(request).split()
    expected_prefix = auth0_api_settings.JWT_AUTH_HEADER_PREFIX.lower()

    if not parts or smart_text(parts[0].lower()) != expected_prefix:
        return None

    part_count = len(parts)
    if part_count == 1:
        raise exceptions.AuthenticationFailed(
            _('Invalid Authorization header. No credentials provided.'))
    if part_count > 2:
        raise exceptions.AuthenticationFailed(
            _('Invalid Authorization header. Credentials string '
              'should not contain spaces.'))

    return parts[1]
# Auth0 Metadata --------------------------------------------------------------
def test_check_status_code(self):
    """
    Check which exception ``_check_status_code`` raises per status code.
    """
    # 200 must pass without raising.
    self.authentication._check_status_code(200)

    # 401, 403 and any other tested code each map to a specific exception.
    expected_failures = (
        (401, AuthenticationFailed),
        (403, PermissionDenied),
        (500, UnavailableException),
    )
    for status_code, error_class in expected_failures:
        with self.assertRaises(error_class):
            self.authentication._check_status_code(status_code)
def authenticate(self, request):
    """
    Authenticate a request carrying an HTTP Basic Authorization header.

    Returns ``None`` when the header is absent or uses another scheme;
    otherwise returns the result of ``authenticate_credentials`` for the
    decoded user id and password.

    Raises ``AuthenticationFailed`` when the header has no credentials, has
    extra space-separated parts, or is not valid base64 / text in
    ``HTTP_HEADER_ENCODING``.
    """
    # Local import keeps this block self-contained.
    import binascii

    auth = get_authorization_header(request).split()

    if not auth or auth[0].lower() != b'basic':
        return None

    if len(auth) == 1:
        msg = _('Invalid basic header. No credentials provided.')
        raise exceptions.AuthenticationFailed(msg)
    elif len(auth) > 2:
        msg = _('Invalid basic header. Credentials string should not contain spaces.')
        raise exceptions.AuthenticationFailed(msg)

    try:
        auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')
    except (TypeError, UnicodeDecodeError, binascii.Error):
        # On Python 3 malformed base64 raises binascii.Error, not TypeError;
        # without it a bad header would escape as an unhandled error.
        msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
        raise exceptions.AuthenticationFailed(msg)

    userid, password = auth_parts[0], auth_parts[2]
    return self.authenticate_credentials(userid, password)
def authenticate_credentials(self, userid, password):
    """
    Verify a user id / password pair by calling ``authenticate``.

    The user id is supplied under the user model's ``USERNAME_FIELD``.
    Returns ``(user, None)`` for an active user; raises
    ``AuthenticationFailed`` otherwise.
    """
    login_kwargs = {get_user_model().USERNAME_FIELD: userid}
    login_kwargs['password'] = password

    account = authenticate(**login_kwargs)

    if account is None:
        raise exceptions.AuthenticationFailed(_('Invalid username/password.'))
    if account.is_active:
        return (account, None)

    raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
def authenticate(self, request):
    """Authenticate a request using a ``Token <key>`` Authorization header.

    Returns ``None`` when the header is missing or its scheme is not
    ``token``. Otherwise the key is decoded and passed to
    ``authenticate_credentials``. Malformed headers raise
    ``AuthenticationFailed``.
    """
    header = get_authorization_header(request).split()

    if not header or header[0].lower() != b'token':
        return None

    if len(header) == 1:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. No credentials provided.'))
    if len(header) > 2:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. Token string should not contain spaces.'))

    try:
        key = header[1].decode()
    except UnicodeError:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. Token string should not contain invalid characters.'))

    return self.authenticate_credentials(key)
def authenticate_credentials(self, username, secret):
    """
    Authenticate an API secret presented with the fixed username ``"api"``.

    The secret is first tried through ``authenticate(secret=...)``. If that
    yields no active user, an ``APIApplication`` with this secret and an
    active author is looked up and its author is returned instead.
    Tightly bound to .backends.APIKeyMunchUserBackend

    Returns ``(user, None)``; raises ``AuthenticationFailed`` for any other
    username or an unknown secret.
    """
    if username != "api":
        raise AuthenticationFailed('Invalid username')

    backend_user = authenticate(secret=secret)
    if backend_user and backend_user.is_active:
        return (backend_user, None)

    application = APIApplication.objects.filter(
        secret=secret, author__is_active=True).first()
    if not (application and application.author.is_active):
        raise AuthenticationFailed('Invalid API key')

    return (application.author, None)
def authenticate(self, request):
    """Authenticate a request by its ``HTTP_X_VAULTIER_TOKEN`` META entry.

    A missing, empty or literal ``'null'`` token yields ``(None, None)``.
    Otherwise the token record is loaded. Its ``last_used_at`` is refreshed
    when it is older than the configured renewal interval (minutes,
    converted to seconds). The token's user must be active.

    Returns:
        ``(user, token)`` on success, ``(None, None)`` when no token is sent.

    Raises:
        AuthenticationFailed: the token is unknown or the user is inactive.
    """
    token = request.META.get('HTTP_X_VAULTIER_TOKEN')
    if token is None or token in ('', 'null'):
        return None, None

    try:
        model = Token.objects.get(token=token)  # type: Token

        # The setting is expressed in minutes; compare in seconds.
        renewal_seconds = settings.VAULTIER.get(
            'authentication_token_renewal_interval') * 60

        idle_time = timezone.now() - model.last_used_at
        if idle_time.total_seconds() > renewal_seconds:
            model.last_used_at = timezone.now()
            model.save()
    except Token.DoesNotExist:
        raise AuthenticationFailed('Invalid token')

    if model.user.is_active:
        return model.user, token

    raise AuthenticationFailed('User inactive or deleted')
def verify_google_token(request):
    """Log a user in from a Google ID token posted as ``token``.

    The token is verified against ``settings.GOOGLE_AUTH_KEY``. Its issuer
    must be a Google accounts issuer and its ``hd`` claim must equal
    ``settings.GOOGLE_AUTH_HOSTED_DOMAIN``. A user keyed by the token's email
    is fetched or created and logged in with the model backend.

    Returns:
        An empty ``Response``.

    Raises:
        ValidationError: no token was posted.
        AuthenticationFailed: verification raised ``AppIdentityError``, or
            the issuer or hosted domain is wrong.
    """
    token = request.data.get('token')
    if token is None:
        raise ValidationError({'detail': 'Auth token required.'})

    trusted_issuers = ['accounts.google.com', 'https://accounts.google.com']
    try:
        idinfo = client.verify_id_token(token, settings.GOOGLE_AUTH_KEY)
        if idinfo['iss'] not in trusted_issuers:
            raise crypt.AppIdentityError('Wrong issuer.')
        if idinfo.get('hd') != settings.GOOGLE_AUTH_HOSTED_DOMAIN:
            raise crypt.AppIdentityError('Wrong hosted domain.')
    except crypt.AppIdentityError as error:
        raise AuthenticationFailed(error)

    email = idinfo['email']
    profile_defaults = {
        'email': email,
        'first_name': idinfo.get('given_name', ''),
        'last_name': idinfo.get('family_name', ''),
    }
    user, _created = get_user_model().objects.get_or_create(
        username=idinfo['email'], defaults=profile_defaults)

    user.backend = 'django.contrib.auth.backends.ModelBackend'
    login(request, user)
    return Response({})
def _authenticate_credentials(self, request, token):
    """
    Try to authenticate the given credentials. If authentication is
    successful, return the user and token. If not, throw an error.

    The token is decoded with ``settings.SECRET_KEY`` and the user is looked
    up by the payload's ``id`` entry.

    Raises ``AuthenticationFailed`` when the token cannot be decoded, no
    user matches, or the user is inactive.
    """
    try:
        payload = jwt.decode(token, settings.SECRET_KEY)
    except Exception:
        # Any decoding error invalidates the token. ``Exception`` instead of
        # a bare ``except`` lets SystemExit / KeyboardInterrupt propagate.
        msg = 'Invalid authentication. Could not decode token.'
        raise exceptions.AuthenticationFailed(msg)

    try:
        user = User.objects.get(pk=payload['id'])
    except User.DoesNotExist:
        msg = 'No user matching this token was found.'
        raise exceptions.AuthenticationFailed(msg)

    if not user.is_active:
        msg = 'This user has been deactivated.'
        raise exceptions.AuthenticationFailed(msg)

    return (user, token)
def authenticate_credentials(self, key):
    """Run the parent token check, then confirm Keycloak-linked users.

    If the user has a ``KeycloakModel`` record and ``self.user_exist``
    reports the linked UID as missing, the Django user is deactivated and
    authentication fails. Users without a Keycloak record are returned as
    they are.

    Returns:
        The ``(user, token)`` pair produced by the parent class.

    Raises:
        exceptions.AuthenticationFailed: the linked Keycloak user is missing.
    """
    user, token = super(TokenAuthentication, self).authenticate_credentials(key)

    try:
        keycloak_account = KeycloakModel.objects.get(user=user)
        # DP ???: Should a user's roles be synced?
        if not self.user_exist(keycloak_account.UID):
            # Disable the user in Django to shortcut the Keycloak lookup
            user.is_active = False
            user.save()
            raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
    except KeycloakModel.DoesNotExist:
        # Regular Django user account
        pass

    return (user, token)
def authenticate(self, request):
    """Authenticate a request using a ``Token <key>`` Authorization header.

    Returns ``None`` when the header is absent or its scheme is not
    ``token``. Otherwise the decoded key is passed to
    ``authenticate_credentials``. Malformed headers raise
    ``AuthenticationFailed``.
    """
    pieces = get_authorization_header(request).split()

    if not pieces or pieces[0].lower() != b'token':
        return None

    count = len(pieces)
    if count == 1:
        raise exceptions.AuthenticationFailed(
            'Invalid token header. No credentials provided.')
    if count > 2:
        raise exceptions.AuthenticationFailed(
            'Invalid token header. Token string should not contain spaces.')

    try:
        key = pieces[1].decode()
    except UnicodeError:
        raise exceptions.AuthenticationFailed(
            'Invalid token header. Token string should not contain invalid characters.')

    return self.authenticate_credentials(key)
def get_user_data(self, token):
    """Fetch the ``me`` record through ``self.fb_api`` for an access token.

    The returned mapping has its ``id`` entry replaced by a string
    ``user_id`` and gains ``network='fb'``.

    Raises:
        AuthenticationFailed: the response carries an ``error`` entry or is
            ``None``.
    """
    request_params = {
        'access_token': token,
        'fields': self.fields,
    }
    user_data = self.fb_api.request(self, 'me', request_params)

    if user_data and user_data.get('error'):
        raise AuthenticationFailed(
            user_data['error'].get('error_msg', 'FB API error'))

    if user_data is None:
        raise AuthenticationFailed('FB doesn\'t return user data')

    user_data['user_id'] = str(user_data.pop('id'))
    user_data['network'] = 'fb'
    return user_data
def get_user_data(self, token):
    """Fetch the ``users.get`` record through ``self.vk_api`` for a token.

    The first element of the ``response`` list is returned with its ``id``
    entry replaced by a string ``user_id`` and with ``network='vk'`` added.

    Raises:
        AuthenticationFailed: the response carries an ``error`` entry, is
            ``None``, or its ``response`` list is empty.
    """
    request_params = {
        'access_token': token,
        'fields': self.fields,
    }
    reply = self.vk_api.request(self, 'users.get', request_params)

    if reply and reply.get('error'):
        raise AuthenticationFailed(
            reply['error'].get('error_msg', 'VK API error'))

    if reply is None:
        raise AuthenticationFailed('VK doesn\'t return user data')

    try:
        user_data = reply['response'][0]
    except IndexError:
        raise AuthenticationFailed('VK doesn\'t find user')

    user_data['user_id'] = str(user_data.pop('id'))
    user_data['network'] = 'vk'
    return user_data
def authenticate(self, request):
    """Authenticate a ``TokenService <key>`` Authorization header.

    Returns ``None`` when the header is absent or its scheme is not
    ``tokenservice``. Otherwise the decoded key is passed to
    ``self._check_token``. Malformed headers raise ``AuthenticationFailed``.
    """
    header = get_authorization_header(request).split()

    if not header or header[0].lower() != b'tokenservice':
        return None

    if len(header) == 1:
        raise exceptions.AuthenticationFailed(
            'Invalid token header. No credentials provided.')
    if len(header) > 2:
        raise exceptions.AuthenticationFailed(
            'Invalid token header. Token string should not contain spaces.')

    try:
        token_key = header[1].decode()
    except UnicodeError:
        raise exceptions.AuthenticationFailed(
            'Invalid token header. Token string should not contain invalid characters.')

    return self._check_token(token_key)
def authenticate(self, request: Request) -> Tuple[User, Any]:
    """
    Find the user owning the access token supplied with the request.

    The token is read from the ``access_token`` query parameter and, when
    that is empty, from the session.

    Args:
        request: Request with access_token query param / session key.

    Returns:
        Tuple of the matching user and ``None``.

    Raises:
        exceptions.AuthenticationFailed: no token was supplied or no user
            has that token.
    """
    access_token = request.query_params.get('access_token', None)
    if not access_token:
        access_token = request.session.get('access_token', None)

    if not access_token:
        raise exceptions.AuthenticationFailed('No access token passed')

    try:
        return User.objects.get(access_token=access_token), None
    except User.DoesNotExist:
        raise exceptions.AuthenticationFailed('No such user')
def authenticate(self, request):
    """Authenticate a request carrying a Google ID token.

    The token from ``get_jwt_value`` is verified against
    ``settings.GOOGLE_AUTH_KEY``. Its issuer must be a Google accounts issuer
    and its ``hd`` claim must equal ``settings.GOOGLE_AUTH_HOSTED_DOMAIN``.
    A user keyed by the token's email is fetched or created.

    Returns:
        ``(user, idinfo)`` on success, ``(None, None)`` when no token is
        present.

    Raises:
        exceptions.AuthenticationFailed: verification raised
            ``AppIdentityError``, or the issuer or hosted domain is wrong.
    """
    token = self.get_jwt_value(request)
    if token is None:
        # NOTE(review): DRF expects a bare ``None`` for "not attempted"; the
        # tuple is kept because callers may rely on it - confirm.
        return None, None

    try:
        idinfo = client.verify_id_token(token, settings.GOOGLE_AUTH_KEY)
        if idinfo['iss'] not in ['accounts.google.com',
                                 'https://accounts.google.com']:
            raise crypt.AppIdentityError("Wrong issuer.")
        if idinfo.get('hd') != settings.GOOGLE_AUTH_HOSTED_DOMAIN:
            raise crypt.AppIdentityError("Wrong hosted domain.")
    except crypt.AppIdentityError as e:
        raise exceptions.AuthenticationFailed(e)

    # The name claims may be absent from the token; default to empty strings
    # instead of failing with KeyError.
    defaults = {
        'email': idinfo['email'],
        'first_name': idinfo.get('given_name', ''),
        'last_name': idinfo.get('family_name', ''),
    }
    user, created = get_user_model().objects.get_or_create(
        username=idinfo['email'], defaults=defaults,
    )
    return user, idinfo
def authenticate(self, request):
    """Authenticate a request carrying a JWT.

    Returns ``None`` when ``get_jwt_value`` finds no token. The decode
    handler is chosen by ``api_settings.JWT_PERMANENT_TOKEN_AUTH``. The
    decoded payload is passed to ``authenticate_credentials`` and the result
    is returned as ``(user, jwt_value)``. Decode failures raise
    ``AuthenticationFailed``.
    """
    jwt_value = self.get_jwt_value(request)
    if jwt_value is None:
        return None

    try:
        decode = (
            jwt_devices_decode_handler
            if api_settings.JWT_PERMANENT_TOKEN_AUTH
            else jwt_decode_handler
        )
        payload = decode(jwt_value)
    except jwt.ExpiredSignature:
        raise exceptions.AuthenticationFailed(_("Signature has expired."))
    except jwt.DecodeError:
        raise exceptions.AuthenticationFailed(_("Error decoding signature."))
    except jwt.InvalidTokenError:
        raise exceptions.AuthenticationFailed()

    return self.authenticate_credentials(payload), jwt_value
def authenticate(self, request):
    """
    Authenticate a request using a ``Bearer <access token>`` header.

    Returns ``None`` when the header is absent or its scheme is not
    ``bearer``. Otherwise the token is resolved with
    ``AccessToken.get_token``, attached to the role of the token's person,
    and ``(role, token)`` is returned. Malformed headers and invalid tokens
    raise ``AuthenticationFailed``.
    """
    header = get_authorization_header(request).split()

    if not header or header[0].lower() != b'bearer':
        return None

    if len(header) == 1:
        raise exceptions.AuthenticationFailed(
            _('Invalid basic header. No credentials provided.'))
    if len(header) > 2:
        raise exceptions.AuthenticationFailed(
            _('Invalid basic header. Credentials string should not contain spaces.'))

    try:
        token = AccessToken.get_token(header[1].decode())
    except (InvalidTokenException, UnicodeDecodeError):
        raise exceptions.AuthenticationFailed(_('Token invalide.'))

    # Make the token reachable from the returned role object.
    token.person.role.token = token
    return token.person.role, token
def authenticate_credentials(self, userid, password):
    """Authenticate an API key passed as the Basic-auth user id.

    Requests that carry a password are not handled here and yield ``None``.
    Otherwise the key is fetched from the cache-backed manager and must be
    active. The key is also recorded as a raven tag.

    Returns:
        ``(AnonymousUser(), key)`` for a valid key, or ``None``.

    Raises:
        AuthenticationFailed: the key is unknown or disabled.
    """
    if password:
        return None

    try:
        api_key = ApiKey.objects.get_from_cache(key=userid)
    except ApiKey.DoesNotExist:
        raise AuthenticationFailed('API key is not valid')

    if not api_key.is_active:
        raise AuthenticationFailed('Key is disabled')

    raven.tags_context({'api_key': userid})

    return (AnonymousUser(), api_key)
def authenticate_credentials(self, userid, password):
    """Authenticate a project key pair (public key as user id, secret as password).

    Returns:
        ``(AnonymousUser(), project_key)`` for a valid key, or ``None`` when
        the public key is unknown or the secret does not match.

    Raises:
        AuthenticationFailed: the key is disabled or lacks the API role.
    """
    try:
        project_key = ProjectKey.objects.get_from_cache(public_key=userid)
    except ProjectKey.DoesNotExist:
        return None

    secret_matches = constant_time_compare(project_key.secret_key, password)
    if not secret_matches:
        return None

    if not project_key.is_active:
        raise AuthenticationFailed('Key is disabled')

    if not project_key.roles.api:
        raise AuthenticationFailed('Key does not allow API access')

    return (AnonymousUser(), project_key)
def authenticate(self, request):
    """Authenticate a request using a ``Bearer <token>`` Authorization header.

    Returns:
        ``None`` when no user-info URL is configured (a warning is logged)
        or the header is absent / not a bearer header; otherwise the result
        of ``authenticate_credentials`` for the decoded token.

    Raises:
        exceptions.AuthenticationFailed: the header has no credentials, has
            extra space-separated parts, or the token is not valid UTF-8.
    """
    if not self.get_user_info_url():
        logger.warning('The setting OAUTH2_USER_INFO_URL is invalid!')
        return None

    auth = get_authorization_header(request).split()

    if not auth or auth[0].lower() != b'bearer':
        return None

    if len(auth) == 1:
        raise exceptions.AuthenticationFailed('Invalid token header. No credentials provided.')
    elif len(auth) > 2:
        raise exceptions.AuthenticationFailed('Invalid token header. Token string should not contain spaces.')

    try:
        token = auth[1].decode('utf8')
    except UnicodeError:
        # Header bytes that are not valid UTF-8 are a client error, not a
        # server error: report them as a failed authentication.
        raise exceptions.AuthenticationFailed(
            'Invalid token header. Token string should not contain invalid characters.')

    return self.authenticate_credentials(token)
def authenticate_credentials(self, payload):
    """Get or create an active user with the username contained in the payload.

    The username is read from the ``preferred_username`` claim, falling back
    to ``username``. Every claim in ``get_jwt_claim_attribute_map()`` whose
    payload value is not ``None`` and differs from the user's attribute is
    copied onto the user. The user is saved only if something changed.

    Returns:
        The user instance.

    Raises:
        exceptions.AuthenticationFailed: the payload has no username claim,
            or fetching / updating the user fails.
    """
    username = payload.get('preferred_username') or payload.get('username')

    if username is None:
        raise exceptions.AuthenticationFailed('JWT must include a preferred_username or username claim!')
    else:
        try:
            user, __ = get_user_model().objects.get_or_create(username=username)
            attributes_updated = False
            for claim, attr in self.get_jwt_claim_attribute_map().items():
                payload_value = payload.get(claim)

                if getattr(user, attr) != payload_value and payload_value is not None:
                    setattr(user, attr, payload_value)
                    attributes_updated = True

            if attributes_updated:
                user.save()
        except Exception:
            # Log and fail on any ordinary error. ``Exception`` instead of a
            # bare ``except`` lets SystemExit / KeyboardInterrupt propagate.
            msg = 'User retrieval failed.'
            logger.exception(msg)
            raise exceptions.AuthenticationFailed(msg)

    return user
def authenticate_credentials(self, key):
    """Authenticate a token key, using the cache before the database.

    A cached user is returned immediately as ``(user, key)``. Otherwise the
    token row is loaded, its user must be active and the token must be less
    than 30 days old. The user is then cached for 7 days and
    ``(user, token)`` is returned.

    Raises:
        exceptions.AuthenticationFailed: the token is unknown or expired.
        exceptions.PermissionDenied: the token's user is inactive.
    """
    cache_key = 'token_' + key

    cached_user = cache.get(cache_key)
    if cached_user:
        # NOTE(review): a cache hit skips the is_active and expiry checks and
        # returns the key string instead of the token object - confirm intended.
        return (cached_user, key)

    try:
        token = self.model.objects.get(key=key)
    except self.model.DoesNotExist:
        raise exceptions.AuthenticationFailed('User does not exist.')

    if not token.user.is_active:
        raise exceptions.PermissionDenied('The user is forbidden.')

    now = timezone.now()
    if token.created < now - timezone.timedelta(hours=24 * 30):
        raise exceptions.AuthenticationFailed('Token has been expired.')

    if token:
        cache.set(cache_key, token.user, 24 * 7 * 60 * 60)

    return (token.user, token)
def my_exception_handler(exc, context):
    """Add an ``error_code`` entry to the default handler's response.

    The code is 2 for ``AuthenticationFailed``, 3 for ``PermissionDenied``
    and 1 for anything else. When the default handler returns ``None``, that
    ``None`` is passed through unchanged.
    """
    # Let the default handler build the standard error response first.
    response = exception_handler(exc, context)
    if response is None:
        return response

    if isinstance(exc, exceptions.AuthenticationFailed):
        error_code = 2
    elif isinstance(exc, exceptions.PermissionDenied):
        error_code = 3
    else:
        error_code = 1

    response.data['error_code'] = error_code
    return response
def validate(self, data):
    """Validate login data by checking the username and password.

    Args:
        data: Mapping read for ``username`` and ``password``.

    Returns:
        ``data`` unchanged when the credentials are correct.

    Raises:
        exceptions.AuthenticationFailed: the username is missing or unknown,
            or the password is wrong.
    """
    username = data.get("username", None)
    password = data.get("password", None)

    if not username:
        raise exceptions.AuthenticationFailed('A username or email is required to login.')

    # One query instead of exists() + first(): with two queries, a user
    # deleted in between left ``None`` and skipped the password check.
    # The leftover debug print of the queryset is dropped as well.
    user_obj = User.objects.filter(username=username).first()
    if user_obj is None:
        raise exceptions.AuthenticationFailed("Incorrect username")

    if not user_obj.check_password(password):
        raise exceptions.AuthenticationFailed('Incorrect password. Please try again.')

    # data['token'] = Token.objects.create(user=user_obj)
    return data
def authenticate(self, request):
    """Authenticate a request using a ``Token <key>`` Authorization header.

    Returns ``None`` when the header is missing or its scheme is not
    ``token``. Otherwise the decoded key is passed to
    ``authenticate_credentials``. Malformed headers raise
    ``AuthenticationFailed``.
    """
    segments = authentication.get_authorization_header(request).split()

    if not segments or segments[0].lower() != b'token':
        return None

    segment_count = len(segments)
    if segment_count == 1:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. No credentials provided.'))
    if segment_count > 2:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. Token string should not contain spaces.'))

    try:
        key = segments[1].decode()
    except UnicodeError:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. Token string should not contain invalid characters.'))

    return self.authenticate_credentials(key)
def authenticate(self, request):
    """Authenticate a request carrying a JWT in the Authorization header.

    The header scheme must equal ``self.authenticate_header(request)``
    (case-insensitively); otherwise ``None`` is returned. The token is then
    decoded with ``decode_jwt_token`` and the payload is passed to
    ``authenticate_credentials``. Malformed headers and decode failures
    raise ``AuthenticationFailed``.
    """
    parts = get_authorization_header(request).split()
    expected_scheme = self.authenticate_header(request=request)

    if not parts or smart_text(parts[0].lower()) != expected_scheme.lower():
        return None

    if len(parts) == 1:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. No credentials provided.'))
    if len(parts) > 2:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. Token string should not contain spaces.'))

    try:
        token = parts[1].decode()
    except UnicodeError:
        raise exceptions.AuthenticationFailed(
            _('Invalid token header. Token string should not contain invalid characters.'))

    try:
        payload = decode_jwt_token(token=token)
    except jwt.exceptions.ExpiredSignature:
        raise exceptions.AuthenticationFailed(_('Signature has expired.'))
    except jwt.exceptions.DecodeError:
        raise exceptions.AuthenticationFailed(_('Error decoding signature.'))
    except jwt.exceptions.InvalidKeyError:
        raise exceptions.AuthenticationFailed(_('Unauthorized token signing key.'))
    except jwt.exceptions.InvalidTokenError:
        raise exceptions.AuthenticationFailed()

    return self.authenticate_credentials(payload=payload)
def authenticate_credentials(self, key) -> Tuple[User, Token]:
    """Authenticate a cashdesk session by its API token.

    The session must exist, be active, and belong to the cashdesk detected
    for the current request.

    Returns:
        ``(session.user, session.api_token)``.

    Raises:
        exceptions.AuthenticationFailed: the token is unknown, the session
            has ended, or the token belongs to a different cashdesk.
    """
    try:
        session = CashdeskSession.objects.get(api_token=key)
    except CashdeskSession.DoesNotExist:
        raise exceptions.AuthenticationFailed('Invalid token.')

    if not session.is_active():
        raise exceptions.AuthenticationFailed('Your session has ended.')

    if session.cashdesk != detect_cashdesk(self.request):
        message = _('Your token is valid for a different cashdesk. Your IP is: {}').format(
            get_ip_address(self.request))
        raise exceptions.AuthenticationFailed(message)

    return session.user, session.api_token
def post(self, request, *args, **kwargs):
    """Issue a JWT for the user resolved by the serializer.

    The request data is validated (raising on invalid input) and the
    resulting user must be active. When ``JWT_ALLOW_REFRESH`` is set, the
    current UTC time is stored in the payload as ``orig_iat``.

    Returns:
        A 200 ``Response`` built by ``jwt_response_payload_handler``.

    Raises:
        exceptions.AuthenticationFailed: the user is inactive.
    """
    serializer = self.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)

    user = serializer.validated_data['user']
    if not user.is_active:
        raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

    payload = jwt_payload_handler(user)
    if api_settings.JWT_ALLOW_REFRESH:
        issued_at = datetime.utcnow().utctimetuple()
        payload['orig_iat'] = timegm(issued_at)

    token = jwt_encode_handler(payload)
    body = jwt_response_payload_handler(token, user, request)
    return Response(body, status=status.HTTP_200_OK)
def validate(self, attrs):
    """Resolve ``attrs['refresh_token']`` to its user.

    Stores the user under ``attrs['user']`` and returns ``attrs``. Raises
    ``AuthenticationFailed`` when no ``RefreshToken`` has that key.
    """
    key = attrs['refresh_token']
    tokens_with_user = RefreshToken.objects.select_related('user')

    try:
        refresh = tokens_with_user.get(key=key)
    except RefreshToken.DoesNotExist:
        raise exceptions.AuthenticationFailed(_('Invalid token.'))

    attrs['user'] = refresh.user
    return attrs
def login(self, username, password):
    """Authenticate a user.

    Posts the credentials to the ``login`` URL and, on a 200 response,
    stores the returned token as this client's Authorization credentials.

    :param str username: Username of the user
    :param str password: Password of the user
    :raises: exceptions.AuthenticationFailed
    """
    credentials = {'username': username, 'password': password}
    body = {
        'data': {
            'attributes': credentials,
            'type': 'obtain-json-web-tokens',
        }
    }

    response = self.post(reverse('login'), body)
    if response.status_code != status.HTTP_200_OK:
        raise exceptions.AuthenticationFailed()

    authorization = '{0} {1}'.format(
        api_settings.JWT_AUTH_HEADER_PREFIX,
        response.data['token']
    )
    self.credentials(HTTP_AUTHORIZATION=authorization)
def test_client_login_fails(db):
    """Logging in with these credentials must raise AuthenticationFailed."""
    api_client = JSONAPIClient()

    with pytest.raises(exceptions.AuthenticationFailed):
        api_client.login('someuser', 'invalidpw')
def authenticate(self, request):
    """Authenticate a ``Basic <payload>`` Authorization header.

    Returns ``None`` when the header is absent or not a Basic header.
    Otherwise the raw (still encoded) payload is passed to
    ``authenticate_credentials``. Headers with no payload or extra parts
    raise ``AuthenticationFailed``.
    """
    parts = get_authorization_header(request).split()

    if not parts or parts[0].lower() != b'basic':
        return None

    part_count = len(parts)
    if part_count == 1:
        raise exceptions.AuthenticationFailed(
            'Invalid basic auth token header. No credentials provided.')
    if part_count > 2:
        raise exceptions.AuthenticationFailed(
            'Invalid basic auth token header. Basic authentication string should not contain spaces.')

    return self.authenticate_credentials(parts[1])
def authenticate(self, request):
    """
    Authenticate using ``username`` and ``password`` URL parameters.

    Both parameters are required; a missing one raises
    ``AuthenticationFailed``. Returns whatever ``authenticate_credentials``
    returns for the two values.
    """
    if 'username' not in request.query_params:
        raise exceptions.AuthenticationFailed('No username URL parameter provided.')

    if 'password' not in request.query_params:
        raise exceptions.AuthenticationFailed('No password URL parameter provided.')

    username = request.query_params['username']
    password = request.query_params['password']
    return self.authenticate_credentials(username, password)
def authenticate_credentials(self, userid, key):
    """Look up a token by ``key`` and return ``(user, token)``.

    ``userid`` is accepted but not used in the lookup. Raises
    ``AuthenticationFailed`` when the token is unknown or its user is
    inactive.
    """
    token_model = self.model

    try:
        token = token_model.objects.get(key=key)
    except token_model.DoesNotExist:
        raise exceptions.AuthenticationFailed('Invalid token')

    owner = token.user
    if not owner.is_active:
        raise exceptions.AuthenticationFailed('User inactive or deleted')

    return token.user, token
def authenticate_credentials(self, key):
    """Look up a token by ``key`` on the model from ``get_model()``.

    Returns ``(token.user, token)``. Raises ``AuthenticationFailed`` when
    the token is unknown or its user is inactive.
    """
    token_model = self.get_model()
    tokens_with_user = token_model.objects.select_related('user')

    try:
        token = tokens_with_user.get(key=key)
    except token_model.DoesNotExist:
        raise exceptions.AuthenticationFailed(_('Invalid token.'))

    if token.user.is_active:
        return (token.user, token)

    raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))