Python flask.request 模块，authorization 属性实例源码
我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 flask.request.authorization 属性。
def requires_auth(function):
    """Guard a view with credentials checked through ``LdapClient``.

    The wrapper reads ``request.authorization`` and passes the username and
    password to ``LdapClient.check_auth``.  When the check fails (or no
    credentials were sent) it logs the denial and returns the ``get_json()``
    output of a ``DataBag`` carrying the error "Access denied".  On success
    the wrapped callable is invoked with the authenticated username as its
    only argument; the wrapper's own ``*args``/``**kwargs`` are not
    forwarded.
    """
    @wraps(function)
    def decorated(*args, **kwargs):
        log = logging.getLogger("ca_server")
        credentials = request.authorization
        client = LdapClient()

        if credentials and client.check_auth(credentials.username, credentials.password):
            return function(credentials.username)

        denied = DataBag()
        denied.set_error("Access denied")
        # ``credentials`` may be None here, in which case there is no name to log.
        if hasattr(credentials, "username"):
            log.info("requires_auth: access denied {}".format(credentials.username))
        else:
            log.info("requires_auth: access denied")
        return denied.get_json()
    return decorated
def check_auth():
    """Resolve the caller's session and user and store both on ``g``.

    An ``X-Auth-Token`` header is tried first: the matching ``Session`` row
    supplies the user.  Without that header, ``request.authorization`` (if
    present) is used to look the user up by e-mail or username and compare
    passwords.

    Returns:
        The result of ``make_error_response(..., 401)`` when a supplied token
        or username/password pair is rejected; otherwise ``None``, after
        setting ``g.current_session`` and ``g.current_user`` (either may be
        ``None``).
    """
    # NOTE(review): the source lost its indentation; the nesting below (the
    # password check living inside ``if credentials``) is reconstructed.
    current_session = None
    current_user = None

    token = request.headers.get('X-Auth-Token')
    if token:
        current_session = Session.query.filter_by(token=token).first()
        if not current_session:
            return make_error_response('Invalid session token', 401)
        current_user = current_session.user
    else:
        credentials = request.authorization
        if credentials:
            current_user = User.find_by_email_or_username(credentials.username)
            password_ok = current_user and current_user.password == credentials.password
            if not password_ok:
                return make_error_response('Invalid username/password combination', 401)

    g.current_session = current_session
    g.current_user = current_user
def add_basic_auth(blueprint, username, password, realm='RQ Scheduler Dashboard'):
    """Add HTTP Basic Auth to a blueprint.

    Registers a ``before_request`` hook on ``blueprint`` that compares
    ``request.authorization`` with the fixed ``username``/``password`` and
    answers with a 401 ``Response`` carrying a ``WWW-Authenticate`` header
    for ``realm`` when they do not match.

    Note this is only for casual use!
    """
    @blueprint.before_request
    def basic_http_auth(*args, **kwargs):
        credentials = request.authorization
        if credentials is not None:
            if credentials.password == password and credentials.username == username:
                # Matching credentials: fall through without a response.
                return None

        challenge = {'WWW-Authenticate': 'Basic realm="{}"'.format(realm)}
        return Response('Please login', 401, challenge)
def _template_rendering(template):
    """Build a decorator that renders ``template`` with the view's data.

    The decorated function's return value must support ``update``; it is
    extended with the keys ``basic_auth`` (base64 of ``username:password``
    from ``request.authorization``, or ``''`` when absent), ``base_url``,
    ``ecs_clusters`` and ``selected_ecs_cluster`` before being passed to
    ``render_template`` as keyword arguments.
    """
    def decorator(fn):
        @wraps(fn)
        def inner_fn(*args, **kwargs):
            context = fn(*args, **kwargs)

            credentials = request.authorization
            if credentials:
                pair = ':'.join([credentials.username, credentials.password])
                basic_auth = base64.b64encode(pair.encode('utf-8')).decode('utf-8')
            else:
                basic_auth = ''

            # Lookups are kept in the same order as the keys they feed.
            base_url = g.cn.g_('app_config').get('base_url')
            ecs_region = g.cn.g_('app_config').get('ecs_region')
            ecs_clusters = g.cn.f_('aws.get_ecs_clusters', region=ecs_region)
            selected_cluster = g.cn.g_('session').get('selected_ecs_cluster')

            extras = {
                'basic_auth': basic_auth,
                'base_url': base_url,
                'ecs_clusters': ecs_clusters,
                'selected_ecs_cluster': selected_cluster,
            }
            context.update(extras)
            return render_template(template, **context)
        return inner_fn
    return decorator
def login_required(self, f):
    """Decorate ``f`` so that non-OPTIONS requests must pass ``self.authenticate``.

    For every request whose method is not ``OPTIONS`` the stored password for
    the supplied username is fetched through ``self.get_password_callback``
    (``None`` when no credentials were sent) and handed, together with
    ``request.authorization``, to ``self.authenticate``.  A falsy result
    returns ``self.auth_error_callback()`` instead of calling ``f``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        # We need to ignore authentication headers for OPTIONS to avoid
        # unwanted interactions with CORS.
        # Chrome and Firefox issue a preflight OPTIONS request to check
        # Access-Control-* headers, and will fail if it returns 401.
        if request.method != 'OPTIONS':
            stored_password = (
                self.get_password_callback(credentials.username)
                if credentials
                else None
            )
            if not self.authenticate(credentials, stored_password):
                return self.auth_error_callback()
        return f(*args, **kwargs)
    return decorated
def require_token(func):
    """Verify the uuid/token combo of the given account before calling ``func``.

    The account type can be: customer, fox, merchant.  The uuid is read from
    the username field and the token from the password field of
    ``request.authorization``; both are checked with
    ``SessionManager.verify``.  Missing or invalid credentials produce an
    ``UnauthorizedResponseJson`` response; any exception raised while
    verifying is printed and turned into an ``ExceptionResponseJson``
    response.
    """
    @wraps(func)
    def decorator(*args, **kwargs):
        credentials = request.authorization
        if not credentials:
            return UnauthorizedResponseJson().make_response()

        account_uuid = credentials.username
        session_token = credentials.password
        try:
            if not SessionManager().verify(account_uuid, session_token):
                return UnauthorizedResponseJson().make_response()
        except Exception as e:
            traceback.print_exc()
            return ExceptionResponseJson("unable to validate credentials", e).make_response()

        return func(*args, **kwargs)
    return decorator
def require_password(func):
    """Verify the given username/password combo before calling ``func``.

    Credentials come from ``request.authorization`` and are checked with
    ``AccountManager.verify_account``.  Missing or invalid credentials
    produce an ``UnauthorizedResponseJson`` response; any exception raised
    while verifying is printed and turned into an ``ExceptionResponseJson``
    response.
    """
    @wraps(func)
    def decorator(*args, **kwargs):
        credentials = request.authorization
        if not credentials:
            return UnauthorizedResponseJson().make_response()

        username = credentials.username
        password = credentials.password
        try:
            if not AccountManager().verify_account(username, password):
                return UnauthorizedResponseJson().make_response()
        except Exception as e:
            traceback.print_exc()
            return ExceptionResponseJson("unable to validate credentials", e).make_response()

        return func(*args, **kwargs)
    return decorator
def requires_admin(f):  # pragma: no cover
    """Decorator for endpoints that require credentials accepted by ``check_auth``.

    Args:
        f (func): A route function.

    Returns:
        func: A wrapper that calls ``f`` when ``request.authorization`` is
        present and ``check_auth(username, password)`` is truthy, and
        otherwise returns the result of ``authenticate()``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        if credentials and check_auth(credentials.username, credentials.password):
            return f(*args, **kwargs)
        return authenticate()
    return decorated
def login_required(self, f):
    """Wrap ``f`` with an authentication check that skips OPTIONS requests.

    Unless the request method is ``OPTIONS``, ``self.authenticate`` is called
    with ``request.authorization`` and the password obtained from
    ``self.get_password_callback`` (``None`` when no credentials were sent).
    If it returns a falsy value the wrapper returns
    ``self.auth_error_callback()``; otherwise ``f`` runs normally.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization

        # We need to ignore authentication headers for OPTIONS to avoid
        # unwanted interactions with CORS.
        # Chrome and Firefox issue a preflight OPTIONS request to check
        # Access-Control-* headers, and will fail if it returns 401.
        if request.method == 'OPTIONS':
            return f(*args, **kwargs)

        if credentials:
            expected = self.get_password_callback(credentials.username)
        else:
            expected = None

        if self.authenticate(credentials, expected):
            return f(*args, **kwargs)
        return self.auth_error_callback()
    return decorated
def basic_authenticate(f):
    """Protect ``f`` with per-user passwords taken from the app config.

    The check is skipped when the ``USE_AUTH`` config value is falsy or when
    ``f`` carries a falsy ``basic_authenticate`` attribute.  Otherwise the
    password from ``request.authorization`` must equal the config value
    ``<USERNAME>_PASSWORD`` (username upper-cased).  On success the user's
    entry in ``AUTH_ROLES`` is stored on ``current_app.auth_role`` and ``f``
    is called; in every other case the wrapper returns ``abort(401)``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth_disabled = not current_app.config.get("USE_AUTH")
        if auth_disabled or not getattr(f, 'basic_authenticate', True):
            return f(*args, **kwargs)

        credentials = request.authorization
        if not (credentials and credentials.username and credentials.password != ''):
            return abort(401)

        config_key = "{}_PASSWORD".format(credentials.username.upper())
        expected = current_app.config.get(config_key)
        if credentials.password != expected:
            return abort(401)

        role = AUTH_ROLES[credentials.username]
        logging.debug("Authenticated '{}' with role '{}' via basic auth".format(credentials.username, role))
        current_app.auth_role = role
        return f(*args, **kwargs)
    return decorated
def login_required(self, f):
    """Return ``f`` wrapped so that it only runs for authenticated requests.

    ``OPTIONS`` requests bypass the check.  For all other methods the
    wrapper asks ``self.get_password_callback`` for the password belonging
    to the username in ``request.authorization`` (or uses ``None`` when there
    are no credentials) and returns ``self.auth_error_callback()`` if
    ``self.authenticate`` rejects the pair.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        # We need to ignore authentication headers for OPTIONS to avoid
        # unwanted interactions with CORS.
        # Chrome and Firefox issue a preflight OPTIONS request to check
        # Access-Control-* headers, and will fail if it returns 401.
        needs_check = request.method != 'OPTIONS'
        if needs_check:
            known_password = None
            if credentials:
                known_password = self.get_password_callback(credentials.username)
            if not self.authenticate(credentials, known_password):
                return self.auth_error_callback()
        return f(*args, **kwargs)
    return decorated
def put(self):
    """
    Queue the specific pipeline

    Reads ``config`` and ``user`` from the JSON request body, resolves the
    effective user through ``auth_get_username``, records the pipeline via
    ``dbmodel.PipelineDb`` to obtain a run id, and returns the result of
    ``pm.add_pipeline``.
    """
    payload = request.get_json(force=True)
    config = payload.get('config')
    user = auth_get_username(request.authorization, payload.get('user'))

    # Validation is currently disabled, so ``errors`` is always None:
    # Pipeline.validate_config(config, user)
    errors = None
    if errors:
        return errors, 400

    config = Pipeline.load_cfg(config)
    # Get id from DB
    db_info = dbmodel.PipelineDb(config['name'], config, Pipeline.ordered_steps(config), user)
    config['run_id'] = db_info.run_id
    message = "Submitting pipeline %s (ID %d) for user %s" % (config['label'], config['run_id'], user)
    ut.pretty_print(message)
    return pm.add_pipeline(config, user)
def get_blender_id_oauth_token() -> str:
    """Returns the Blender ID auth token, or an empty string if there is none.

    Sources are tried in order: the ``blender_id_oauth_token`` session entry,
    the username of ``request.authorization``, then ``current_user.id`` for
    an authenticated user.
    """
    from flask import request

    stored = session.get('blender_id_oauth_token')
    if stored:
        if isinstance(stored, (tuple, list)):
            # In a past version of Pillar we accidentally stored tuples in the session.
            # Such sessions should be actively fixed.
            # TODO(anyone, after 2017-12-01): refactor this if-block so that it just converts
            # the token value to a string and use that instead.
            stored = stored[0]
            session['blender_id_oauth_token'] = stored
        return stored

    credentials = request.authorization
    if credentials and credentials.username:
        return credentials.username

    if current_user.is_authenticated and current_user.id:
        return current_user.id

    return ''
def requires_basic_auth(f):
    """
    Wrapper that checks the request's credentials against the configured
    ``settings.config.api_user`` / ``settings.config.api_password``.

    Returns a JSON message with status 401 when credentials are missing or
    do not match; otherwise calls ``f``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        # check credentials supplied in the http request are valid
        credentials = request.authorization
        if not credentials:
            return jsonify(message="Missing credentials"), 401

        matches = (credentials.username == settings.config.api_user
                   and credentials.password == settings.config.api_password)
        if not matches:
            return jsonify(
                message="username/password mismatch with the configuration file"
            ), 401

        return f(*args, **kwargs)
    return decorated
def get(self, id_, type_):
    """GET object with id = id_ from the database.

    When ``get_authentication()`` is truthy the request must carry
    credentials accepted by ``check_authorization``; otherwise
    ``failed_authentication()`` is returned.  If the class behind ``type_``
    does not support GET, the request is aborted with 405.
    """
    if get_authentication():
        if request.authorization is None:
            return failed_authentication()
        if check_authorization(request, get_session()) is False:
            return failed_authentication()

    class_type = get_doc().collections[type_]["collection"].class_.title

    if checkClassOp(class_type, "GET"):
        try:
            found = crud.get(id_, class_type, api_name=get_api_name(), session=get_session())
            return set_response_headers(jsonify(hydrafy(found)))
        except Exception as e:
            # NOTE(review): assumes every exception here exposes get_HTTP() — confirm.
            status_code, message = e.get_HTTP()
            return set_response_headers(jsonify(message), status_code=status_code)

    abort(405)
def delete(self, id_, type_):
    """Delete object with id=id_ from database.

    When ``get_authentication()`` is truthy the request must carry
    credentials accepted by ``check_authorization``; otherwise
    ``failed_authentication()`` is returned.  If the class behind ``type_``
    does not support DELETE, the request is aborted with 405.
    """
    if get_authentication():
        if request.authorization is None:
            return failed_authentication()
        if check_authorization(request, get_session()) is False:
            return failed_authentication()

    class_type = get_doc().collections[type_]["collection"].class_.title

    if checkClassOp(class_type, "DELETE"):
        try:
            crud.delete(id_, class_type, session=get_session())
            body = {"message": "Object with ID %s successfully deleted" % (id_)}
            return set_response_headers(jsonify(body))
        except Exception as e:
            # NOTE(review): assumes every exception here exposes get_HTTP() — confirm.
            status_code, message = e.get_HTTP()
            return set_response_headers(jsonify(message), status_code=status_code)

    abort(405)
def delete(self, type_):
    """Delete a non Collection class item.

    When ``get_authentication()`` is truthy the request must carry
    credentials accepted by ``check_authorization``; otherwise
    ``failed_authentication()`` is returned.  The deletion only happens when
    the DELETE endpoint exists for ``type_``, ``type_`` is a parsed class and
    it has no ``<type_>Collection`` counterpart; every other case ends in
    ``abort(405)``.
    """
    if get_authentication():
        if request.authorization is None:
            return failed_authentication()
        if check_authorization(request, get_session()) is False:
            return failed_authentication()

    # No Delete Operation for collections
    deletable = (
        checkEndpoint("DELETE", type_)
        and type_ in get_doc().parsed_classes
        and type_ + "Collection" not in get_doc().collections
    )
    if deletable:
        try:
            crud.delete_single(type_, session=get_session())
            body = {"message": "Object successfully deleted"}
            return set_response_headers(jsonify(body))
        except Exception as e:
            # NOTE(review): assumes every exception here exposes get_HTTP() — confirm.
            status_code, message = e.get_HTTP()
            return set_response_headers(jsonify(message), status_code=status_code)

    abort(405)
def requires_auth(f):
    """Wrap ``f`` so it only runs when ``check_auth`` accepts the request.

    The wrapper calls ``f`` when ``request.authorization`` is present and
    ``check_auth(username, password)`` is truthy; otherwise it returns the
    result of ``authenticate()``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        if credentials and check_auth(credentials.username, credentials.password):
            return f(*args, **kwargs)
        return authenticate()
    return decorated
def require(self, users=(), roles=(), test_auth=None, test_method=None):
    """
    Authenticates/authorizes a request based on the content of the
    request.authorization parameter.

    Authentication succeeds when the supplied username is a key of
    ``self.users`` and the stored value equals the supplied password;
    otherwise ``self.no_authentication()`` is returned.  Authorization is
    only enforced when the current HTTP method has allowed users or allowed
    roles configured: the caller must then be one of the allowed users or
    share at least one role (``self.roles[username]``) with the allowed
    roles, else ``self.no_authorization()`` is returned.

    users -- users permitted to call this method
    roles -- roles permitted to call this method
    test_auth -- credentials only for testing
    test_method -- method only for testing
    """
    users = self._process_targets(users)
    roles = self._process_targets(roles)

    def loaded_decorated(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            auth = test_auth or request.authorization
            method = test_method.upper() if test_method else request.method

            authenticated = (
                auth
                and auth.username in self.users
                and self.users[auth.username] == auth.password
            )
            if not authenticated:
                return self.no_authentication()

            allowed_users = users[method] if users else None
            allowed_roles = roles[method] if roles else None
            if allowed_users or allowed_roles:
                # Only one of the two restrictions may be configured; the
                # missing one is None and must not be used as a container.
                auth_as_user = bool(allowed_users) and auth.username in allowed_users
                auth_as_role = bool(allowed_roles) and bool(
                    allowed_roles & self.roles[auth.username]
                )
                if not auth_as_user and not auth_as_role:
                    return self.no_authorization()

            return f(*args, **kwargs)
        return decorated
    return loaded_decorated
def requires_auth(f):
    """Wrap ``f`` with a ``check_auth`` credential check that test mode bypasses.

    ``f`` runs when ``request.authorization`` is present and accepted by
    ``check_auth``.  When it is not, ``f`` still runs if
    ``_globals.get('test')`` is truthy; otherwise the wrapper returns the
    result of ``authenticate()``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        accepted = credentials and check_auth(credentials.username, credentials.password)
        # The test flag is only consulted after the credential check failed.
        if accepted or _globals.get('test'):
            return f(*args, **kwargs)
        return authenticate()
    return decorated
def requires_auth(f):
    """Wrapper function.

    Returns a version of ``f`` that first validates ``request.authorization``
    with ``check_auth`` and returns ``authenticate()`` instead of calling
    ``f`` when the credentials are missing or rejected.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        granted = bool(credentials) and check_auth(credentials.username, credentials.password)
        return f(*args, **kwargs) if granted else authenticate()
    return decorated
def authed(func: Callable[[], str]) -> Callable[[], Union[Response, str]]:
    """Given a function returns one that requires basic auth.

    The returned callable takes no arguments.  It calls ``func`` when
    ``request.authorization`` is present and ``validAuth`` accepts the
    username/password, and returns ``authFailure()`` otherwise.
    """
    @wraps(func)
    def decorator():
        credentials = request.authorization
        if not credentials or not validAuth(credentials.username, credentials.password):
            return authFailure()
        return func()
    return decorator
def login_required(func):
    """Require an auth token before calling ``func``.

    The token is taken from the username field of ``request.authorization``
    when that is present.  Otherwise it is read from the ``token`` key of the
    request body, which is parsed according to the ``Content-Type`` header
    (form-urlencoded, JSON, or multipart with a JSON-then-form fallback).

    Side effects: sets ``g.user`` to ``User.verify_auth_token(token)`` and,
    when ``g.cache`` holds an entry for the token, sets ``g.identity`` to the
    unpickled entry.

    Raises:
        error_handlers.BadToken: unsupported (or missing) content type, or a
            body that is not a dict.
        error_handlers.MissToken: the body has no ``token`` key.
    """
    @wraps(func)
    def _decorator_func(*args, **kwargs):
        authorization = request.authorization
        if authorization is None:
            # A missing Content-Type header is treated like an unsupported
            # one instead of failing on the header lookup itself.
            content_type = request.headers.get("Content-Type") or ""
            if "application/x-www-form-urlencoded" in content_type:
                data = request.form.to_dict()
            elif "application/json" in content_type:
                data = request.get_json()
            elif "multipart/form-data" in content_type:
                data = request.get_json()
                if data is None:
                    data = request.form.to_dict()
            else:
                raise error_handlers.BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"????"}))

            if not isinstance(data, dict):
                raise error_handlers.BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"???json????"}))

            token = data.get("token", None)
            if token is None:
                raise error_handlers.MissToken(
                    http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"???????token"})
                )
        else:
            token = authorization["username"]

        g.user = User.verify_auth_token(token)

        # Restore the cached identity for this token, if any.
        # NOTE(review): pickle.loads executes arbitrary code when fed
        # attacker-controlled bytes — confirm that only trusted code can
        # write to g.cache.
        identity = g.cache.get(token)
        if identity is not None:
            g.identity = pickle.loads(identity)

        return func(*args, **kwargs)
    return _decorator_func
def api_generic(request, request_processor, app, resource):
    """Translate an HTTP request into a plain dict and dispatch it.

    The dict handed to ``request_processor(app, req)`` has the keys
    ``method``, ``resource``, ``json``, ``params`` and ``auth_form``:

    * ``params`` holds the query arguments.  Each value is parsed as JSON on
      a best-effort basis: a JSON object is kept as a dict, any other JSON
      value is stored as ``str(value)``, and text that is not valid JSON is
      kept unchanged.
    * ``auth_form`` holds ``user``/``password`` from
      ``request.authorization`` when present, otherwise ``user`` and/or
      ``token`` from the ``X-User`` / ``X-Token`` headers.

    Returns:
        ``(jsonify(...), status)``: status 400 with the processor's
        ``errors`` when it reports any; otherwise the processor's ``json``
        with its ``status`` (wrapped under ``resource`` for GET requests).
    """
    req = {
        'method': request.method,
        'resource': resource,
        'json': request.json,
    }

    params = {}
    # ``items()`` works on both Python 2 and 3; ``iteritems()`` does not
    # exist on Python 3 mappings.
    for key, raw in request.args.items():
        try:
            decoded = json.loads(raw)
            params[key] = decoded if isinstance(decoded, dict) else str(decoded)
        except Exception:
            # Best effort: anything that cannot be decoded stays as sent.
            params[key] = raw
    req['params'] = params

    auth_form = {}
    if request.authorization:
        auth_form['user'] = request.authorization.username
        auth_form['password'] = request.authorization.password
    else:
        if 'X-User' in request.headers:
            auth_form['user'] = request.headers['X-User']
        if 'X-Token' in request.headers:
            auth_form['token'] = request.headers['X-Token']
    req['auth_form'] = auth_form

    response = request_processor(app, req)
    if 'errors' in response:
        return jsonify({'errors': response['errors']}), 400
    if request.method == 'GET':
        return jsonify({resource: response['json']}), response['status']
    return jsonify(response['json']), response['status']
def requires_auth(f):
    """Decorate ``f`` with a credential check based on ``check_auth``.

    If ``request.authorization`` is missing, or ``check_auth`` rejects its
    username/password, the wrapper returns ``authenticate()`` and ``f`` is
    not called.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        if credentials:
            if check_auth(credentials.username, credentials.password):
                return f(*args, **kwargs)
        return authenticate()
    return decorated
# import the user created module
def requires_auth(f):
    """Wrap ``f`` with an API-key check followed by a credential check.

    A request is let through immediately when ``API_KEY`` is configured and
    the ``API_KEY`` header (or, failing that, the ``API_KEY`` query
    argument) equals it.  Otherwise, when ``AUTH_USER`` is configured, the
    request must carry credentials accepted by ``check_auth`` or the wrapper
    returns ``authenticate()``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        supplied_key = request.headers.get("API_KEY") or request.args.get("API_KEY")

        key_accepted = (API_KEY is not None) and supplied_key == API_KEY
        # NOTE(review): when AUTH_USER is None no credential check happens,
        # even if an API key is configured and the request lacks it — confirm
        # this is intended.
        if not key_accepted and AUTH_USER is not None:
            if not credentials or not check_auth(credentials.username, credentials.password):
                return authenticate()
        return f(*args, **kwargs)
    return decorated
def username(self):
    """Return the username from ``request.authorization``, or ``""`` if there is none."""
    credentials = request.authorization
    return credentials.username if credentials else ""
def requires_auth(f):
    """Return ``f`` guarded by ``check_auth``.

    The guard reads ``request.authorization``; when it is absent or
    ``check_auth(username, password)`` is falsy, the result of
    ``authenticate()`` is returned in place of calling ``f``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        is_valid = bool(credentials) and check_auth(credentials.username, credentials.password)
        if is_valid:
            return f(*args, **kwargs)
        return authenticate()
    return decorated
def token():
    """Exchange the request's username/password for a user token.

    Returns:
        ``jsonify({'status': 'OK', 'token': ...})`` when the user exists, is
        not deleted, is active and the password verifies.

    Raises:
        APIError: 401 when credentials are missing or incomplete, 403 when
            the user is not active, 401 when the user is unknown, deleted or
            the password does not verify.
    """
    # NOTE(review): the source lost its indentation; the placement of the
    # "Username or password invalid" raise inside the credentials branch is
    # reconstructed.
    unauthorized = 'You are not authorized to access the resource'

    credentials = request.authorization
    if credentials is None:
        raise APIError(unauthorized, 401)

    username = credentials.get('username', None)
    passwd = credentials.get('password', None)
    if username is None or passwd is None:
        raise APIError(unauthorized, 401)

    user = User.query.filter(User.username_iequal(username)).first()
    if user is not None and not user.deleted:
        if not user.active:
            raise APIError("User '{0}' is blocked".format(username), 403)
        if user.verify_password(passwd):
            return jsonify({'status': 'OK', 'token': user.get_token()})

    raise APIError('Username or password invalid', 401)
def requires_auth(f):
    """Wrap ``f`` with request logging and a ``check_auth`` credential check.

    Every call logs the remote address, method, URL and query arguments at
    INFO level and the form data at DEBUG level.  When
    ``request.authorization`` is missing or rejected by ``check_auth`` the
    wrapper logs a warning and returns ``({'error': 'Unauthorized.'}, 401)``
    instead of calling ``f``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        logger.info("{0} {1} {2} {3}".format(request.remote_addr, request.method, request.url, str(request.args)))
        logger.debug("data: {0}".format(str(request.form)))
        auth = request.authorization
        # Log only the username: the string form of the authorization object
        # exposes the password, which must not end up in log files.
        logger.debug('Check_auth: ' + str(auth.username if auth else None))
        if not auth or not check_auth(auth.username, auth.password):
            logger.warning("Unauthorized.")
            return {'error': 'Unauthorized.'}, 401
        return f(*args, **kwargs)
    return decorated
def requires_auth(f):
    """Log the request, then require credentials accepted by ``check_auth``.

    The remote address, method, URL and query arguments are logged at INFO
    level and the form data at DEBUG level.  If ``request.authorization`` is
    absent or ``check_auth`` rejects it, a warning is logged and
    ``({'error': 'Unauthorized.'}, 401)`` is returned; otherwise ``f`` is
    called.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        logger.info("{0} {1} {2} {3}".format(request.remote_addr, request.method, request.url, str(request.args)))
        logger.debug("data: {0}".format(str(request.form)))
        auth = request.authorization
        # Only the username is logged; stringifying the whole authorization
        # object would write the password to the log.
        logged_user = auth.username if auth else None
        logger.debug('Check_auth: ' + str(logged_user))
        if not auth or not check_auth(auth.username, auth.password):
            logger.warning("Unauthorized.")
            return {'error': 'Unauthorized.'}, 401
        return f(*args, **kwargs)
    return decorated
def requires_authentication(function):
    """Creates a decorator that can be applied with @requires_authentication
    to protect an API endpoint.

    The protected endpoint only runs when ``request.authorization`` is
    present and ``check_authentication(username, password)`` is truthy;
    otherwise the result of ``authenticate()`` is returned.
    """
    @wraps(function)
    def decorated(*args, **kwargs):
        """Checks for authorization headers. Tells the user to authenticate
        if none are found."""
        credentials = request.authorization
        if credentials and check_authentication(credentials.username, credentials.password):
            return function(*args, **kwargs)
        return authenticate()
    return decorated
def require_auth_header(func):
    """Require that auth headers are present without validating them.

    No auth check is performed here — that is done by a service other than
    ourselves — but a request without ``request.authorization`` gets an
    ``ErrorResponseJson("auth header required")`` response instead of
    reaching ``func``.
    """
    @wraps(func)
    def decorator(*args, **kwargs):
        if request.authorization:
            return func(*args, **kwargs)
        return ErrorResponseJson("auth header required").make_response()
    return decorator
def after_request(response):
    """Called after every request: log the endpoint hit and any errors.

    Computes the elapsed time in milliseconds since ``g.start_time``, and
    reports it to ``Logger.endpoint_hit`` together with the UTC start time,
    base URL, username (``None`` without credentials), method and status
    code.  The response body is included as the error message for status
    codes of 400 and above.  Returns ``response`` unchanged.
    """
    elapsed_ms = int((time.time() - g.start_time) * 1000)
    started_utc = datetime.datetime.utcfromtimestamp(g.start_time)

    credentials = request.authorization
    username = credentials.username if credentials else None

    if response.status_code >= 400:
        err_msg = response.get_data(as_text=True)
    else:
        err_msg = None

    Logger.endpoint_hit(
        started_utc, elapsed_ms, request.base_url, username, request.method,
        response.status_code, err_msg,
    )
    return response
def verify_user(self, require_admin=True, require_credentials=True):
    """Check the request's credentials against the ``users`` collection.

    With ``require_credentials`` false, the supplied password is first tried
    as a token via ``self._verify_user_by_token``.  If that does not succeed
    (or was not attempted) the user must not be temporarily blocked and the
    password must pass ``_verify_user_by_credentials``; a failed attempt is
    recorded with ``self._add_block_entry``.

    Returns:
        bool: ``True`` when the user is verified and, if ``require_admin`` is
        true, has a truthy ``is_admin`` field; ``False`` otherwise.
    """
    credentials = request.authorization
    if not credentials:
        return False

    username, password = credentials.username, credentials.password
    user = self._mongo.db['users'].find_one({'username': username})
    if not user:
        return False

    ip = get_ip()
    verified = False
    if not require_credentials:
        verified = self._verify_user_by_token(user, password, ip)

    if not verified:
        # The block check only applies when token verification did not pass.
        if self._is_blocked_temporarily(username):
            return False
        verified = _verify_user_by_credentials(user, password)
        if not verified:
            self._add_block_entry(username)
            return False

    return bool(not require_admin or user['is_admin'])
def _is_blocked_temporarily(self, username):
    """Tell whether ``username`` has exceeded the allowed login attempts.

    Block entries older than ``block_for_seconds`` are deleted first; the
    user counts as blocked when more than ``num_login_attempts`` entries
    remain for the username.  Both limits come from
    ``self._config.defaults['authorization']``.
    """
    limits = self._config.defaults['authorization']
    max_attempts = limits['num_login_attempts']
    block_window = limits['block_for_seconds']

    entries = self._mongo.db['block_entries']
    entries.delete_many({'timestamp': {'$lt': time() - block_window}})
    remaining = list(entries.find({'username': username}))
    return len(remaining) > max_attempts
def issue_token(self):
    """Create a new token for the requesting user and store its derived form.

    A secret from ``generate_secret()`` is derived with ``_kdf`` over a fresh
    16-byte salt and inserted into the ``tokens`` collection together with
    the username from ``request.authorization``, the caller's IP, the salt
    and the current timestamp.  The plain secret is returned to the caller.
    """
    salt = urandom(16)
    kdf = _kdf(salt)
    token = generate_secret()
    username = request.authorization.username
    ip = get_ip()

    record = {
        'username': username,
        'ip': ip,
        'salt': salt,
        'token': kdf.derive(token.encode('utf-8')),
        'timestamp': time(),
    }
    self._mongo.db['tokens'].insert_one(record)
    return token
def _verify_user_by_token(self, user, token, ip):
    """Check ``token`` against the stored tokens of ``user`` for this ``ip``.

    Tokens older than ``tokens_valid_for_seconds`` (from
    ``self._config.defaults['authorization']``) are deleted first.  Each
    remaining token stored for the user's name and ``ip`` is then verified
    with ``_kdf`` using its own salt.

    Returns:
        bool: ``True`` as soon as one stored token verifies, ``False`` when
        none does.
    """
    tokens_valid_for_seconds = self._config.defaults['authorization']['tokens_valid_for_seconds']
    tokens = self._mongo.db['tokens']
    tokens.delete_many({'timestamp': {'$lt': time() - tokens_valid_for_seconds}})

    cursor = tokens.find(
        {'username': user['username'], 'ip': ip},
        {'token': 1, 'salt': 1}
    )
    for entry in cursor:
        try:
            kdf = _kdf(entry['salt'])
            kdf.verify(token.encode('utf-8'), entry['token'])
        except Exception:
            # A failed verification just means this entry does not match;
            # keep trying the others.  Unlike a bare ``except``, this lets
            # KeyboardInterrupt/SystemExit propagate.
            continue
        return True
    return False
def requires_auth(f):
    """Protect ``f`` with the module's ``check_auth`` helper.

    Requests without ``request.authorization``, or whose username/password
    ``check_auth`` does not accept, receive the result of ``authenticate()``;
    all others are passed on to ``f``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        rejected = not credentials or not check_auth(credentials.username, credentials.password)
        return authenticate() if rejected else f(*args, **kwargs)
    return decorated
def getCurrentUser(request):
    """Return the user for the token sent in the username field of ``request.authorization``.

    Returns ``None`` when the request carries no authorization data;
    otherwise whatever ``User.verify_auth_token`` returns for the token.
    """
    credentials = request.authorization
    return User.verify_auth_token(credentials.username) if credentials else None
def requires_passcode(func):
    """Redirect to the ``login`` endpoint unless the passcode requirement is met.

    The requirement only applies when ``config`` has a truthy
    ``web_passcode``.  It is met when the session has a truthy ``allowed``
    entry or when the username in ``request.authorization`` equals the
    passcode.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        passcode = config.get('web_passcode')
        if passcode and not session.get('allowed'):
            credentials = request.authorization
            if not credentials or credentials.username != passcode:
                return redirect(url_for('login'))
        return func(*args, **kwargs)
    return wrapper
def ensure_admin_authenticated():
    """Return a 401 challenge unless the request carries the admin credentials.

    The credentials in ``request.authorization`` must equal
    ``config.admin_username`` / ``config.admin_password``; when they do the
    function returns ``None``.
    """
    credentials = request.authorization
    if (credentials
            and credentials.username == config.admin_username
            and credentials.password == config.admin_password):
        return None

    challenge = {'WWW-Authenticate': 'Basic realm="Login Required"'}
    return Response('401 Unauthorized', 401, challenge)
def requires_auth(f):
    """Only call ``f`` for requests whose credentials pass ``check_auth``.

    Anything else — no ``request.authorization`` at all, or a rejected
    username/password — yields the return value of ``authenticate()``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        if not credentials:
            return authenticate()
        if not check_auth(credentials.username, credentials.password):
            return authenticate()
        return f(*args, **kwargs)
    return decorated
def requires_auth(f):
    """Enforce ``check_auth`` on ``f`` when the REST API is password protected.

    Settings are loaded through ``SettingLoader`` on every call.  When
    ``settings.rest_api.password_protected`` is falsy ``f`` runs without any
    check; otherwise missing or rejected credentials return
    ``authenticate()``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        settings = SettingLoader().settings
        if not settings.rest_api.password_protected:
            return f(*args, **kwargs)

        credentials = request.authorization
        if credentials and check_auth(credentials.username, credentials.password):
            return f(*args, **kwargs)
        return authenticate()
    return decorated
def requires_auth(f):
    """Build a wrapper around ``f`` that demands valid credentials.

    Validity is decided by ``check_auth(username, password)`` on the data in
    ``request.authorization``; on failure the wrapper returns
    ``authenticate()`` without invoking ``f``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        passed = credentials and check_auth(credentials.username, credentials.password)
        if not passed:
            return authenticate()
        return f(*args, **kwargs)
    return decorated
#####################
def requires_auth(f):
    """Gate ``f`` behind ``check_auth``.

    ``f`` is invoked with the original arguments when
    ``request.authorization`` exists and ``check_auth`` accepts it; in every
    other case the wrapper returns ``authenticate()``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        if credentials and check_auth(credentials.username, credentials.password):
            response = f(*args, **kwargs)
        else:
            response = authenticate()
        return response
    return decorated
def requires_auth(f):
    """Decorator: run ``f`` only after ``check_auth`` approves the request.

    Reads ``request.authorization``; if it is missing or its
    username/password are not approved, returns ``authenticate()``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        approved = False
        if credentials:
            approved = check_auth(credentials.username, credentials.password)
        if approved:
            return f(*args, **kwargs)
        return authenticate()
    return decorated
def username(self):
    """Return the requesting user's name, or an empty string without credentials."""
    credentials = request.authorization
    if credentials:
        return credentials.username
    return ""