我的目标是为我的Web应用程序提供REST API。使用:
对于Web和REST访问,我都需要保护对数据的访问权限。但是,request尝试连接到安全的API时,我无法获得任何常规的python 成功。
使用此问题结尾处提供的功能齐全的模块可获得以下输出。
使用时,我设法获得正确答案http://127.0.0.1:5000/api/v1/free_stuff:
http://127.0.0.1:5000/api/v1/free_stuff:
>>> import requests >>> r=requests.get('http://127.0.0.1:5000/api/v1/free_stuff') >>> print 'status:', r.status_code status: 200 # all is fine
尝试使用进行身份验证时http://127.0.0.1:5000/api/v1/protected_stuff:
http://127.0.0.1:5000/api/v1/protected_stuff:
>>> from requests.auth import HTTPBasicAuth, HTTPDigestAuth >>> r=requests.get('http://127.0.0.1:5000/api/v1/protected_stuff', auth=HTTPBasicAuth('test', 'test')) # the same with ``HTTPDigestAuth`` >>> print 'status:', r.status_code status: 401 >>> r.json() # failed! {u'message': u'401: Unauthorized'}
这是用于产生上述结果的虚拟功能模块:
from flask import Flask, render_template, url_for, redirect from flask.ext.sqlalchemy import SQLAlchemy from flask.ext.security import Security, SQLAlchemyUserDatastore, \ UserMixin, RoleMixin, login_required, current_user from flask.ext.restless import APIManager from flask.ext.restless import ProcessingException # Create app app = Flask(__name__) app.config['DEBUG'] = True app.config['SECRET_KEY'] = 'super-secret' app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite://' # Create database connection object db = SQLAlchemy(app) # Define Flask-security models roles_users = db.Table('roles_users', db.Column('user_id', db.Integer(), db.ForeignKey('user.id')), db.Column('role_id', db.Integer(), db.ForeignKey('role.id'))) class Role(db.Model, RoleMixin): id = db.Column(db.Integer(), primary_key=True) name = db.Column(db.String(80), unique=True) description = db.Column(db.String(255)) class User(db.Model, UserMixin): id = db.Column(db.Integer, primary_key=True) email = db.Column(db.String(255), unique=True) password = db.Column(db.String(255)) active = db.Column(db.Boolean()) confirmed_at = db.Column(db.DateTime()) roles = db.relationship('Role', secondary=roles_users, backref=db.backref('users', lazy='dynamic')) #Some additional stuff to query over... class SomeStuff(db.Model): __tablename__ = 'somestuff' id = db.Column(db.Integer, primary_key=True) data1 = db.Column(db.Integer) data2 = db.Column(db.String(10)) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) user = db.relationship(User, lazy='joined', join_depth=1, viewonly=True) # Setup Flask-Security user_datastore = SQLAlchemyUserDatastore(db, User, Role) security = Security(app, user_datastore) # API def auth_func(**kw): #import ipdb; ipdb.set_trace() if not current_user.is_authenticated(): raise ProcessingException(description='Not authenticated!', code=401) return True apimanager = APIManager(app, flask_sqlalchemy_db=db) apimanager.create_api(SomeStuff, methods=['GET', 'POST', 'DELETE', 'PUT'], url_prefix='/api/v1', collection_name='free_stuff', include_columns=['data1', 'data2', 'user_id']) apimanager.create_api(SomeStuff, methods=['GET', 'POST', 'DELETE', 'PUT'], url_prefix='/api/v1', preprocessors=dict(GET_SINGLE=[auth_func], GET_MANY=[auth_func]), collection_name='protected_stuff', include_columns=['data1', 'data2', 'user_id']) # Create a user to test with @app.before_first_request def create_user(): db.create_all() user_datastore.create_user(email='test', password='test') user_datastore.create_user(email='test2', password='test2') ### stuff = SomeStuff(data1=2, data2='toto', user_id=1) db.session.add(stuff) stuff = SomeStuff(data1=5, data2='titi', user_id=1) db.session.add(stuff) db.session.commit() # Views @app.route('/') @login_required def home(): return render_template('index.html') @app.route('/logout/') def log_out(): logout_user() return redirect(request.args.get('next') or '/') if __name__ == '__main__': app.run()
任何想法?
[edit]要通过 Web界面充分发挥作用,您需要一个templates至少包含以下login.html文件的子文件夹:
{% block body %} <form action="" method=post class="form-horizontal"> <h2>Signin to FlaskLogin(Todo) Application </h2> <div class="control-group"> <div class="controls"> <input type="text" id="username" name="username" class="input-xlarge" placeholder="Enter Username" required> </div> </div> <div class="control-group"> <div class="controls"> <input type="password" id="password" name="password" class="input-xlarge" placeholder="Enter Password" required> </div> </div> <div class="control-group"> <div class="controls"> <button type="submit" class="btn btn-success">Signin</button> </div> </div> </form> {% endblock %}
这是我修改的最小示例:
from flask import Flask, render_template, request, url_for, redirect from flask.ext.sqlalchemy import SQLAlchemy from flask.ext.security import Security, SQLAlchemyUserDatastore, \ UserMixin, RoleMixin, login_required, current_user, logout_user from flask.ext.restless import APIManager from flask.ext.restless import ProcessingException from flask.ext.login import user_logged_in # JWT imports from datetime import timedelta from flask_jwt import JWT, jwt_required # Create app app = Flask(__name__) app.config['DEBUG'] = True app.config['SECRET_KEY'] = 'super-secret' app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite://' # expiration delay for tokens (here is one minute) app.config['JWT_EXPIRATION_DELTA'] = timedelta(seconds=60) # Create database connection object db = SQLAlchemy(app) # creates the JWT Token authentication ====================================== jwt = JWT(app) @jwt.authentication_handler def authenticate(username, password): user = user_datastore.find_user(email=username) print '%s vs. %s' % (username, user.email) if username == user.email and password == user.password: return user return None @jwt.user_handler def load_user(payload): user = user_datastore.find_user(id=payload['user_id']) return user # Define Flask-security models =============================================== roles_users = db.Table('roles_users', db.Column('user_id', db.Integer(), db.ForeignKey('user.id')), db.Column('role_id', db.Integer(), db.ForeignKey('role.id'))) class Role(db.Model, RoleMixin): id = db.Column(db.Integer(), primary_key=True) name = db.Column(db.String(80), unique=True) description = db.Column(db.String(255)) class User(db.Model, UserMixin): id = db.Column(db.Integer, primary_key=True) email = db.Column(db.String(255), unique=True) password = db.Column(db.String(255)) active = db.Column(db.Boolean()) confirmed_at = db.Column(db.DateTime()) roles = db.relationship('Role', secondary=roles_users, backref=db.backref('users', lazy='dynamic')) #Some additional stuff to query over... class SomeStuff(db.Model): __tablename__ = 'somestuff' id = db.Column(db.Integer, primary_key=True) data1 = db.Column(db.Integer) data2 = db.Column(db.String(10)) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) user = db.relationship(User, lazy='joined', join_depth=1, viewonly=True) # Setup Flask-Security user_datastore = SQLAlchemyUserDatastore(db, User, Role) security = Security(app, user_datastore) # Flask-Restless API ========================================================== @jwt_required() def auth_func(**kw): return True apimanager = APIManager(app, flask_sqlalchemy_db=db) apimanager.create_api(SomeStuff, methods=['GET', 'POST', 'DELETE', 'PUT'], url_prefix='/api/v1', collection_name='free_stuff', include_columns=['data1', 'data2', 'user_id']) apimanager.create_api(SomeStuff, methods=['GET', 'POST', 'DELETE', 'PUT'], url_prefix='/api/v1', preprocessors=dict(GET_SINGLE=[auth_func], GET_MANY=[auth_func]), collection_name='protected_stuff', include_columns=['data1', 'data2', 'user_id']) # Create some users to test with @app.before_first_request def create_user(): db.create_all() user_datastore.create_user(email='test', password='test') user_datastore.create_user(email='test2', password='test2') ### stuff = SomeStuff(data1=2, data2='toto', user_id=1) db.session.add(stuff) stuff = SomeStuff(data1=5, data2='titi', user_id=1) db.session.add(stuff) db.session.commit() # Views @app.route('/') @login_required def home(): print(request.headers) return render_template('index.html') @app.route('/logout/') def log_out(): logout_user() return redirect(request.args.get('next') or '/') if __name__ == '__main__': app.run()
然后,与它交互通过 requests:
requests
>>> import requests, json >>> r=requests.get('http://127.0.0.1:5000/api/v1/free_stuff') # this is OK >>> print 'status:', r.status_code status: 200 >>> r=requests.get('http://127.0.0.1:5000/api/v1/protected_stuff') # this should fail >>> print 'status:', r.status_code status: 401 >>> print r.json() {u'status_code': 401, u'description': u'Authorization header was missing', u'error': u'Authorization Required'} >>> # Authenticate and retrieve Token >>> r = requests.post('http://127.0.0.1:5000/auth', ...: data=json.dumps({'username': 'test', 'password': 'test'}), ...: headers={'content-type': 'application/json'} ...: ) >>> print 'status:', r.status_code status: 200 >>> token = r.json()['token'] >>> # now we have the token, we can navigate to restricted area: >>> r = requests.get('http://127.0.0.1:5000/api/v1/protected_stuff', ...: headers={'Authorization': 'Bearer %s' % token}) >>> print 'status:', r.status_code status: 200