一尘不染

结合Flask-restless,Flask-security和常规Python请求

flask

我的目标是为我的Web应用程序提供REST API。使用:

  • Python 2.7.5
  • Flask==0.10.1
  • Flask-Restless==0.13.1
  • Flask-Security==1.7.3

对于Web和REST访问,我都需要保护对数据的访问权限。但是,当我用常规的Python requests库连接受保护的API时,始终无法成功。

使用此问题结尾处提供的功能齐全的模块可获得以下输出。

访问 http://127.0.0.1:5000/api/v1/free_stuff 时,我能够得到正确的响应:

>>> import requests
>>> r=requests.get('http://127.0.0.1:5000/api/v1/free_stuff')
>>> print 'status:', r.status_code
status: 200  # all is fine

尝试带身份验证访问 http://127.0.0.1:5000/api/v1/protected_stuff 时:

>>> from requests.auth import HTTPBasicAuth, HTTPDigestAuth
>>> r=requests.get('http://127.0.0.1:5000/api/v1/protected_stuff', 
                   auth=HTTPBasicAuth('test', 'test')) #  the same with ``HTTPDigestAuth``
>>> print 'status:', r.status_code
status: 401
>>> r.json()  # failed!
{u'message': u'401: Unauthorized'}

下面是用于产生上述结果的可运行示例模块:

from flask import Flask, render_template, url_for, redirect
from flask.ext.sqlalchemy import SQLAlchemy
from flask.ext.security import Security, SQLAlchemyUserDatastore, \
    UserMixin, RoleMixin, login_required, current_user
from flask.ext.restless import APIManager
from flask.ext.restless import ProcessingException

# Create the Flask application and set its configuration in a single call.
# NOTE(review): the secret key is a hard-coded demo value - replace it before
# any real deployment.
app = Flask(__name__)
app.config.update(
    DEBUG=True,
    SECRET_KEY='super-secret',
    SQLALCHEMY_DATABASE_URI='sqlite://',
)

# Bind Flask-SQLAlchemy to the application.
db = SQLAlchemy(app)

# Define Flask-security models
# Association table linking users to roles; each row pairs a 'user.id'
# foreign key with a 'role.id' foreign key.
roles_users = db.Table('roles_users',
        db.Column('user_id', db.Integer(), db.ForeignKey('user.id')),
        db.Column('role_id', db.Integer(), db.ForeignKey('role.id')))

class Role(db.Model, RoleMixin):
    """Role model handed to the Flask-Security user datastore."""
    # Integer primary key.
    id = db.Column(db.Integer(), primary_key=True)
    # Role name, unique, at most 80 characters.
    name = db.Column(db.String(80), unique=True)
    # Free-text description, at most 255 characters.
    description = db.Column(db.String(255))

class User(db.Model, UserMixin):
    """User model handed to the Flask-Security user datastore."""
    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Unique e-mail; the test users below are created with this field.
    email = db.Column(db.String(255), unique=True)
    # Stored password value (at most 255 characters).
    password = db.Column(db.String(255))
    active = db.Column(db.Boolean())
    confirmed_at = db.Column(db.DateTime())
    # Many-to-many link to Role through the roles_users table; the backref
    # exposes a dynamic 'users' query on Role.
    roles = db.relationship('Role', secondary=roles_users,
                            backref=db.backref('users', lazy='dynamic'))
# Some additional stuff to query over...
class SomeStuff(db.Model):
    """Sample data model exposed through the REST API registrations."""
    __tablename__ = 'somestuff'
    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    data1 = db.Column(db.Integer)
    # Short string payload, at most 10 characters.
    data2 = db.Column(db.String(10))
    # Optional owner: nullable foreign key to user.id.
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
    # Read-only (viewonly) relationship to the owning User, loaded via join.
    user = db.relationship(User, lazy='joined', join_depth=1, viewonly=True)

# Setup Flask-Security
# The datastore wraps the db handle plus the User and Role models; Security
# is then initialised on the app with that datastore.
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security = Security(app, user_datastore)

# API
def auth_func(**kw):
    """Flask-Restless preprocessor that rejects unauthenticated requests.

    Accepts and ignores whatever keyword arguments it is called with.

    Returns:
        True when ``current_user`` reports itself as authenticated.

    Raises:
        ProcessingException: with code 401 otherwise.
    """
    if current_user.is_authenticated():
        return True
    raise ProcessingException(description='Not authenticated!', code=401)
# Flask-Restless manager bound to the app and the Flask-SQLAlchemy handle.
apimanager = APIManager(app, flask_sqlalchemy_db=db)

# Open endpoint: SomeStuff under /api/v1/free_stuff, no preprocessors.
apimanager.create_api(SomeStuff,
    methods=['GET', 'POST', 'DELETE', 'PUT'],
    url_prefix='/api/v1',
    collection_name='free_stuff',
    include_columns=['data1', 'data2', 'user_id'])

# Guarded endpoint: same model under /api/v1/protected_stuff, with auth_func
# run before single and collection GET requests.
# NOTE(review): POST, DELETE and PUT are enabled here but only the two GET
# hooks carry auth_func - confirm whether the write methods need it too.
apimanager.create_api(SomeStuff,
    methods=['GET', 'POST', 'DELETE', 'PUT'],
    url_prefix='/api/v1',
    preprocessors=dict(GET_SINGLE=[auth_func], GET_MANY=[auth_func]),
    collection_name='protected_stuff',
    include_columns=['data1', 'data2', 'user_id'])

# Create a user to test with
@app.before_first_request
def create_user():
    """Create the tables and seed two test users plus two SomeStuff rows.

    Registered with ``app.before_first_request``. Everything is committed in
    one transaction at the end.
    """
    db.create_all()

    # (email, password) pairs for the test accounts.
    for email, password in (('test', 'test'), ('test2', 'test2')):
        user_datastore.create_user(email=email, password=password)

    # Sample rows, both assigned user_id=1.
    for data1, data2 in ((2, 'toto'), (5, 'titi')):
        db.session.add(SomeStuff(data1=data1, data2=data2, user_id=1))

    db.session.commit()

# Views
@app.route('/')
@login_required
def home():
    """Render ``index.html`` for the site root; wrapped by ``login_required``."""
    page = render_template('index.html')
    return page

@app.route('/logout/')
def log_out():
    """Log the current user out, then redirect.

    Redirects to the ``next`` query-string parameter when it is present and
    non-empty, otherwise to ``/``.
    """
    # Neither name is imported at the top of this module, so the view raised
    # NameError on its first request; import them locally.
    from flask import request
    from flask.ext.security import logout_user

    logout_user()
    # NOTE(review): 'next' is taken from the query string unchecked, so a
    # crafted link can redirect to another site - validate it before real use.
    return redirect(request.args.get('next') or '/')


# Start the Flask development server only when run as a script.
if __name__ == '__main__':
    app.run()

任何想法?

[edit] 要通过Web界面完整运行,您需要一个templates子文件夹,其中至少包含以下login.html文件:

{# Sign-in form: posts "username" and "password" back to the current URL
   (empty action attribute).
   NOTE(review): confirm these field names match what the login view reads. #}
{% block body %}
  <form action="" method=post class="form-horizontal">
    <h2>Signin to FlaskLogin(Todo) Application </h2>
    <div class="control-group">
        <div class="controls">
          <input type="text" id="username" name="username" class="input-xlarge"
            placeholder="Enter Username" required>
        </div>
    </div>

    <div class="control-group">
        <div class="controls">
          <input type="password" id="password" name="password" class="input-xlarge"
            placeholder="Enter Password" required>
        </div>
    </div>

    <div class="control-group">
        <div class="controls">
          <button type="submit" class="btn btn-success">Signin</button>
        </div>
    </div>  
  </form>
{% endblock %}

阅读 574

收藏
2020-04-07

共1个答案

一尘不染

这是我修改的最小示例:

from flask import Flask, render_template, request, url_for, redirect
from flask.ext.sqlalchemy import SQLAlchemy
from flask.ext.security import Security, SQLAlchemyUserDatastore, \
    UserMixin, RoleMixin, login_required, current_user, logout_user
from flask.ext.restless import APIManager
from flask.ext.restless import ProcessingException
from flask.ext.login import user_logged_in
# JWT imports
from datetime import timedelta
from flask_jwt import JWT, jwt_required

# Create the Flask application and set its configuration in a single call.
# NOTE(review): the secret key is a hard-coded demo value - replace it before
# any real deployment.
app = Flask(__name__)
app.config.update(
    DEBUG=True,
    SECRET_KEY='super-secret',
    SQLALCHEMY_DATABASE_URI='sqlite://',
    # Expiration delay for tokens: 60 seconds.
    JWT_EXPIRATION_DELTA=timedelta(seconds=60),
)

# Bind Flask-SQLAlchemy to the application.
db = SQLAlchemy(app)

# creates the JWT Token authentication  ======================================
jwt = JWT(app)


@jwt.authentication_handler
def authenticate(username, password):
    """Validate the credentials submitted when a JWT token is requested.

    Args:
        username: value looked up against the user's ``email`` field.
        password: value compared with the user's stored ``password``.

    Returns:
        The matching user object, or None when the user is unknown or the
        credentials do not match.
    """
    user = user_datastore.find_user(email=username)
    if user is None:
        # Unknown e-mail: report failure instead of raising AttributeError
        # on user.email below.
        return None
    # Debug trace; this call form is valid in both Python 2 and Python 3.
    print('%s vs. %s' % (username, user.email))
    # NOTE(review): the password is compared with the stored value as-is, so
    # this only succeeds if passwords are stored unhashed - confirm.
    if username == user.email and password == user.password:
        return user
    return None

@jwt.user_handler
def load_user(payload):
    """Return the user whose id equals ``payload['user_id']``.

    Registered with ``jwt.user_handler``; the lookup goes through
    ``user_datastore.find_user``.
    """
    return user_datastore.find_user(id=payload['user_id'])

# Define Flask-security models ===============================================
# Association table linking users to roles; each row pairs a 'user.id'
# foreign key with a 'role.id' foreign key.
roles_users = db.Table('roles_users',
        db.Column('user_id', db.Integer(), db.ForeignKey('user.id')),
        db.Column('role_id', db.Integer(), db.ForeignKey('role.id')))

class Role(db.Model, RoleMixin):
    """Role model handed to the Flask-Security user datastore."""
    # Integer primary key.
    id = db.Column(db.Integer(), primary_key=True)
    # Role name, unique, at most 80 characters.
    name = db.Column(db.String(80), unique=True)
    # Free-text description, at most 255 characters.
    description = db.Column(db.String(255))

class User(db.Model, UserMixin):
    """User model handed to the Flask-Security user datastore."""
    # Integer primary key; the JWT user handler looks users up by this id.
    id = db.Column(db.Integer, primary_key=True)
    # Unique e-mail; the JWT authentication handler looks users up by it.
    email = db.Column(db.String(255), unique=True)
    # Stored password value (at most 255 characters).
    password = db.Column(db.String(255))
    active = db.Column(db.Boolean())
    confirmed_at = db.Column(db.DateTime())
    # Many-to-many link to Role through the roles_users table; the backref
    # exposes a dynamic 'users' query on Role.
    roles = db.relationship('Role', secondary=roles_users,
                            backref=db.backref('users', lazy='dynamic'))
# Some additional stuff to query over...
class SomeStuff(db.Model):
    """Sample data model exposed through the REST API registrations."""
    __tablename__ = 'somestuff'
    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    data1 = db.Column(db.Integer)
    # Short string payload, at most 10 characters.
    data2 = db.Column(db.String(10))
    # Optional owner: nullable foreign key to user.id.
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
    # Read-only (viewonly) relationship to the owning User, loaded via join.
    user = db.relationship(User, lazy='joined', join_depth=1, viewonly=True)
# Setup Flask-Security
# The datastore wraps the db handle plus the User and Role models; Security
# is then initialised on the app with that datastore.
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security = Security(app, user_datastore)

# Flask-Restless API ==========================================================
@jwt_required()
def auth_func(**kw):
    """Preprocessor for the protected API; ignores its keyword arguments.

    The body always returns True, so any access check comes from the
    ``jwt_required()`` decorator wrapped around it.
    """
    return True

# Flask-Restless manager bound to the app and the Flask-SQLAlchemy handle.
apimanager = APIManager(app, flask_sqlalchemy_db=db)

# Open endpoint: SomeStuff under /api/v1/free_stuff, no preprocessors.
apimanager.create_api(SomeStuff,
    methods=['GET', 'POST', 'DELETE', 'PUT'],
    url_prefix='/api/v1',
    collection_name='free_stuff',
    include_columns=['data1', 'data2', 'user_id'])

# Guarded endpoint: same model under /api/v1/protected_stuff, with auth_func
# run before single and collection GET requests.
# NOTE(review): POST, DELETE and PUT are enabled here but only the two GET
# hooks carry auth_func - confirm whether the write methods need it too.
apimanager.create_api(SomeStuff,
    methods=['GET', 'POST', 'DELETE', 'PUT'],
    url_prefix='/api/v1',
    preprocessors=dict(GET_SINGLE=[auth_func], GET_MANY=[auth_func]),
    collection_name='protected_stuff',
    include_columns=['data1', 'data2', 'user_id'])

# Create some users to test with
@app.before_first_request
def create_user():
    """Create the tables and seed two test users plus two SomeStuff rows.

    Registered with ``app.before_first_request``. Everything is committed in
    one transaction at the end.
    """
    db.create_all()

    # (email, password) pairs for the test accounts.
    for email, password in (('test', 'test'), ('test2', 'test2')):
        user_datastore.create_user(email=email, password=password)

    # Sample rows, both assigned user_id=1.
    for data1, data2 in ((2, 'toto'), (5, 'titi')):
        db.session.add(SomeStuff(data1=data1, data2=data2, user_id=1))

    db.session.commit()

# Views
@app.route('/')
@login_required
def home():
    """Print the request headers, then render ``index.html``.

    Wrapped by ``login_required``.
    """
    incoming_headers = request.headers
    print(incoming_headers)
    page = render_template('index.html')
    return page

@app.route('/logout/')
def log_out():
    """Log the current user out, then redirect.

    Redirects to the ``next`` query-string parameter when it is present and
    non-empty, otherwise to ``/``.
    """
    logout_user()
    # NOTE(review): 'next' is taken from the query string unchecked, so a
    # crafted link can redirect to another site - validate it before real use.
    destination = request.args.get('next')
    if not destination:
        destination = '/'
    return redirect(destination)

# Start the Flask development server only when run as a script.
if __name__ == '__main__':
    app.run()

然后,通过 requests 与它交互:

>>>  import requests, json   
>>>  r=requests.get('http://127.0.0.1:5000/api/v1/free_stuff')  # this is OK   
>>>  print 'status:', r.status_code
status: 200   
>>>  r=requests.get('http://127.0.0.1:5000/api/v1/protected_stuff')  # this should fail   
>>>  print 'status:', r.status_code
status: 401   
>>>  print r.json()
{u'status_code': 401, 
u'description': u'Authorization header was missing', 
u'error':    u'Authorization Required'}   
>>>  # Authenticate and retrieve Token   
>>>  r = requests.post('http://127.0.0.1:5000/auth', 
...:                   data=json.dumps({'username': 'test', 'password': 'test'}),
...:                   headers={'content-type': 'application/json'}
...:                   )   
>>>  print 'status:', r.status_code
status: 200   
>>>  token = r.json()['token']   
>>>  # now we have the token, we can navigate to restricted area:   
>>>  r = requests.get('http://127.0.0.1:5000/api/v1/protected_stuff', 
...:                   headers={'Authorization': 'Bearer %s' % token})   
>>>  print 'status:', r.status_code
status: 200 
2020-04-07