Python flask.request 模块,args() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.request.args()。
def authorize_view(self):
    """Flask view that kicks off the OAuth2 authorization flow.

    All query-string arguments are forwarded to ``self._make_flow`` and the
    user is redirected to the authorization URL the flow produces.
    """
    flow_kwargs = request.args.to_dict()
    # to_dict() keeps only one value per key, but ``scopes`` can be passed
    # several times, so gather every occurrence with getlist().
    flow_kwargs['scopes'] = request.args.getlist('scopes')

    target = flow_kwargs.pop('return_url', None)
    if target is None:
        # No explicit return URL: use the referrer, or the site root.
        target = request.referrer or '/'

    flow = self._make_flow(return_url=target, **flow_kwargs)
    return redirect(flow.step1_get_authorize_url())
def index():
    """Entry view for the login round-trip.

    Returns the string ``"logged_in"`` when ``g.signin`` is set. When a
    ``code`` query argument is present it is passed to ``Get_Access_Token``,
    the resulting token to ``Get_User_Info``, and the user data is returned
    as JSON with a ``logged_in`` cookie. Otherwise redirects to ``login``.
    """
    auth_code = request.args.get("code", "")
    app.logger.debug("code: %s" % auth_code)

    if g.signin:
        return "logged_in"
    if not auth_code:
        return redirect(url_for("login"))

    token_payload = Get_Access_Token(auth_code)
    profile = Get_User_Info(token_payload.get('access_token'))
    app.logger.debug(profile)

    response = jsonify(profile)
    response.set_cookie(key="logged_in", value='true', expires=None)
    return response
def twittercallback():
    """Handle the redirect back from Twitter after the user authorizes.

    Reads ``oauth_verifier`` from the query string, restores the request
    token saved in the session and fetches the access token. On success the
    access token and secret are stored in the session and the hashtag form
    is rendered; on failure the user is flashed a message and redirected to
    ``bp.home``.
    """
    verifier = request.args["oauth_verifier"]
    handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)

    try:
        stored_token = session["request_token"]
    except KeyError:
        flash("Please login again", "danger")
        return redirect(url_for("bp.home"))
    handler.request_token = stored_token

    try:
        handler.get_access_token(verifier)
    except tweepy.TweepError:
        flash("Failed to get access token", "danger")
        return redirect(url_for("bp.home"))

    session["access_token"] = handler.access_token
    session["access_token_secret"] = handler.access_token_secret
    return render_template("twittercallback.html", form=HashtagForm())
def index():
    """Main page and search handler.

    When ``q`` is present in the query string (for example
    ``http://localhost?q=gabriel+dropout``) it is passed to the backend as a
    search term. When the request path is ``search`` the page is flagged as
    ``standalone`` for the template.

    If the backend returns no shows in any category,
    ``fe.do_first_time_setup`` is called.

    Returns:
        The rendered ``first_time.html`` template on the first run, or
        ``default.html`` otherwise.
    """
    log.debug("Entering index, attempting to get shows.")
    search_term = request.args.get('q', None)
    watching, airing, specials, movies = fe.get_shows_for_display(search_term)
    standalone = request.path.strip('/') == 'search'

    if 'logged_in' in session:
        logged_in = fe.check_login_id(escape(session['logged_in']))
    else:
        logged_in = False

    if not (watching or airing or specials or movies):
        log.debug("No shows found in any category. Starting first time startup.")
        fe.do_first_time_setup()
        return render_template('first_time.html', logged_in=logged_in)

    return render_template(
        'default.html',
        watching=watching,
        airing=airing,
        specials=specials,
        movies=movies,
        standalone=standalone,
        logged_in=logged_in,
    )
def star():
    """Starring/highlighting handler.

    Toggles a star/highlight on a show. The show ID must be passed in the
    ``id`` query-string argument. Unauthenticated users, including visitors
    with no ``logged_in`` entry in the session, get a ``404`` so the page
    stays hidden.

    Returns:
        JSON output describing success and the ID of the show starred.
    """
    log.debug("Entering star, trying to toggle star.")
    # Check for the key first: indexing the session directly raises KeyError
    # (an HTTP 500) for visitors who never logged in, instead of the 404.
    if 'logged_in' in session and fe.check_login_id(escape(session['logged_in'])):
        show_id = request.args['id']
        log.debug("Sending show ID {0} to function".format(show_id))
        fe.star_show(show_id)
        log.debug("Returning to user.")
        return jsonify({"star": "success", "id": show_id})
    log.debug("User cannot be authenticated, send 404 to hide page.")
    abort(404)
def drop_show():
    """Show removal handler.

    Removes a show from the backend. The show ID must be passed in the
    ``id`` query-string argument. Unauthenticated users, including visitors
    with no ``logged_in`` entry in the session, get a ``404`` so the page
    stays hidden.

    Returns:
        An HTTP redirect to the home page, to refresh it.
    """
    log.debug("Entering drop_show, trying to remove show from list.")
    # Check for the key first: indexing the session directly raises KeyError
    # (an HTTP 500) for visitors who never logged in, instead of the 404.
    if 'logged_in' in session and fe.check_login_id(escape(session['logged_in'])):
        show_id = request.args['id']
        log.debug("Sending show ID {0} to function".format(show_id))
        fe.remove_show(show_id)
        log.debug("Refreshing user's page.")
        return redirect('/')
    log.debug("User cannot be authenticated, send 404 to hide page.")
    abort(404)
def setup(args, pipeline, runmod, injector):
    """Configure logging and create the shared gransk API instance.

    The API object and its config are stored in ``_globals``. When
    ``pipeline`` is truthy it replaces the API's pipeline. The
    ``related_entities`` and ``related_documents`` services, when present,
    are loaded with the config. ``args`` and ``runmod`` are not used here.
    """
    logging.basicConfig(
        level=logging.INFO,
        format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S')

    api = gransk.api.API(injector)
    _globals['gransk'] = api
    _globals['config'] = api.config

    if pipeline:
        api.pipeline = pipeline

    for service_name in ('related_entities', 'related_documents'):
        if api.pipeline.get_service(service_name):
            api.pipeline.get_service(service_name).load_all(_globals['config'])
def authorize_view(self):
    """Flask view that kicks off the OAuth2 authorization flow.

    All query-string arguments are forwarded to ``self._make_flow`` and the
    user is redirected to the authorization URL the flow produces.
    """
    flow_kwargs = request.args.to_dict()
    # to_dict() keeps only one value per key, but ``scopes`` can be passed
    # several times, so gather every occurrence with getlist().
    flow_kwargs['scopes'] = request.args.getlist('scopes')

    target = flow_kwargs.pop('return_url', None)
    if target is None:
        # No explicit return URL: use the referrer, or the site root.
        target = request.referrer or '/'

    flow = self._make_flow(return_url=target, **flow_kwargs)
    return redirect(flow.step1_get_authorize_url())
def customEvent():
    """Insert an event built from the query string and return recommendations.

    Reads ``eventName``, ``location``, ``startDt`` and ``endDt`` from the
    query string (empty name/location are stored as ``None``), inserts a row
    into ``EVENT`` and returns ``RecoMaestro.result_final`` as JSON.
    """
    params = request.args
    summary = params['eventName']
    venue = params['location']
    starts_at = params['startDt']
    ends_at = params['endDt']
    hashkey = utils.make_hashkey("salttt")

    # Empty strings are stored as NULL.
    summary = summary or None
    venue = venue or None

    db_manager.query(
        """
INSERT INTO EVENT
(event_hashkey,calendar_hashkey,event_id,summary,start_dt,end_dt,location)
VALUES
(%s, 'admin_calendar_hashkey','admin_event_id', %s, %s, %s, %s)
""",
        (hashkey, summary, starts_at, ends_at, venue)
    )

    maestro = recoMaestro.RecoMaestro(
        account_hashkey='admin_account_hashkey',
        switchExtractor=True)
    return json.dumps(maestro.result_final)
def password_required(short_name):
    """Ask for, and check, the password of a protected project.

    On a valid POST whose password matches, responds with a redirect to the
    ``next`` query argument and lets the password manager update the
    response. In every other case the password form is rendered (after
    flashing an error if a submitted password was wrong).
    """
    (project, _owner, _n_tasks, _n_task_runs,
     _overall_progress, _last_activity,
     _n_results) = project_by_shortname(short_name)

    form = PasswordForm(request.form)
    if request.method == 'POST' and form.validate():
        submitted = request.form.get('password')
        timeout = current_app.config.get('PASSWD_COOKIE_TIMEOUT')
        cookie_handler = CookieHandler(request, signer, timeout)
        manager = ProjectPasswdManager(cookie_handler)
        if manager.validates(submitted, project):
            # NOTE(review): ``next`` is taken from the query string and used
            # as the redirect target without validation - confirm this is
            # acceptable for the deployment.
            response = make_response(redirect(request.args.get('next')))
            return manager.update_response(response, project,
                                           get_user_id_or_ip())
        flash(gettext('Sorry, incorrect password'))

    return render_template(
        'projects/password.html',
        project=project,
        form=form,
        short_name=short_name,
        next=request.args.get('next'))
def index():
    """Entry view for the login round-trip.

    Returns ``"logged_in"`` when ``g.signin`` is set. With a ``code`` query
    argument, calls ``Get_Access_Token`` then ``Get_User_Info`` and returns
    the user data as JSON with a ``logged_in`` cookie. Otherwise redirects
    to ``login``.
    """
    auth_code = request.args.get("code", "")

    if g.signin:
        return "logged_in"
    if not auth_code:
        return redirect(url_for("login"))

    token_payload = Get_Access_Token(auth_code)
    profile = Get_User_Info(token_payload['access_token'])
    app.logger.debug(profile)

    response = jsonify(profile)
    response.set_cookie(key="logged_in", value='true', expires=None)
    return response
def index():
    """Entry view for the login round-trip.

    Returns ``"logged_in"`` when ``g.signin`` is set. With a ``code`` query
    argument, calls ``Get_Access_Token`` and passes the returned
    ``access_token`` and ``uid`` to ``Get_User_Info``; the user data is
    returned as JSON with a ``logged_in`` cookie. Otherwise redirects to
    ``login``.
    """
    auth_code = request.args.get("code", "")

    if g.signin:
        return "logged_in"
    if not auth_code:
        return redirect(url_for("login"))

    token_payload = Get_Access_Token(auth_code)
    app.logger.debug(token_payload)
    token = token_payload['access_token']
    user_id = token_payload['uid']
    profile = Get_User_Info(token, user_id)
    app.logger.debug(profile)

    response = jsonify(profile)
    response.set_cookie(key="logged_in", value='true', expires=None)
    return response
def index():
    """Entry view for the login round-trip.

    Returns ``"logged_in"`` when ``g.signin`` is set. With a ``code`` query
    argument, calls ``Get_Access_Token``, looks up the ``openid`` through
    ``Get_OpenID`` and passes both to ``Get_User_Info``; the user data is
    returned as JSON with a ``logged_in`` cookie. Otherwise redirects to
    ``login``.
    """
    auth_code = request.args.get("code", "")

    if g.signin:
        return "logged_in"
    if not auth_code:
        return redirect(url_for("login"))

    token = Get_Access_Token(auth_code)['access_token']
    open_id = Get_OpenID(token)['openid']
    profile = Get_User_Info(token, open_id)
    app.logger.debug(profile)

    response = jsonify(profile)
    response.set_cookie(key="logged_in", value='true', expires=None)
    return response
def index():
    """Entry view for the login round-trip.

    Returns ``"logged_in"`` when ``g.signin`` is set. With a ``code`` query
    argument, calls ``Get_Access_Token`` and passes the returned
    ``access_token`` and ``uid`` to ``Get_User_Info``; the user data is
    returned as JSON with a ``logged_in`` cookie. Otherwise redirects to
    ``login``.
    """
    auth_code = request.args.get("code", "")

    if g.signin:
        return "logged_in"
    if not auth_code:
        return redirect(url_for("login"))

    token_payload = Get_Access_Token(auth_code)
    app.logger.debug(token_payload)
    token = token_payload['access_token']
    user_id = token_payload['uid']
    profile = Get_User_Info(token, user_id)
    app.logger.debug(profile)

    response = jsonify(profile)
    response.set_cookie(key="logged_in", value='true', expires=None)
    return response
def get_filter_arguments(mrequest):
    """Extract the optional ``timestamp`` and ``addr`` filters from a request.

    Args:
        mrequest: Request-like object whose ``args`` attribute is a mapping
            of query-parameter names to string values, or ``None``.

    Returns:
        A ``(timestamp, addr)`` tuple. ``timestamp`` is a
        ``datetime.datetime`` parsed with ``%Y-%m-%dT%H:%M:%S.%f`` and
        ``addr`` is an ``int`` parsed as hexadecimal; each is ``None`` when
        its parameter is absent.

    Raises:
        Whatever ``abort(500, ...)`` raises when either value is malformed.
    """
    data = mrequest.args
    cur_timestamp, addr = None, None
    if data is None:
        return cur_timestamp, addr

    if 'timestamp' in data:
        try:
            cur_timestamp = datetime.datetime.strptime(
                data['timestamp'], "%Y-%m-%dT%H:%M:%S.%f")
        except ValueError:
            abort(500, "Wrong timestamp format")

    if 'addr' in data:
        # Report a malformed address the same way as a malformed timestamp
        # instead of letting the raw ValueError escape.
        try:
            addr = int(data['addr'], 16)
        except ValueError:
            abort(500, "Wrong address format")

    return cur_timestamp, addr
def _session_history():
    """Serve the dump file of the session named by the ``session`` query arg.

    Returns the file as ``text/plain``, an empty 404 response when the
    session has no dump file on disk, or an empty 500 response (after
    logging) when anything raises.
    """
    try:
        session_id = request.args.get('session')
        dump_path = session_manager.get_session(session_id).dump_file
        if dump_path is None or not os.path.isfile(dump_path):
            return '', 404
        directory, filename = os.path.split(dump_path)
        return send_from_directory(directory, filename,
                                   mimetype='text/plain')
    except Exception as ex:
        logger.error("Internal error {}".format(ex))
        return '', 500
def jsonhistory():
    """Return the processed-history table as a JSON string.

    Builds a ``DataTable`` over all ``models.Processed`` rows using the
    request's query arguments, with one entry per column below, and returns
    ``json.dumps`` of the table's JSON representation.
    """
    history = db.session.query(models.Processed)
    # Each entry names a column; the trailing lambda (when present) renders
    # the cell for a row ``i``.
    table = DataTable(request.args, models.Processed, history, [
        # Date of the entry, or a "Currently watching..." marker while the row has no stop time.
        ("date", "time", lambda i: "{}".format(i.time.strftime('%Y/%m/%d')) if i.stopped else '<span class="orange">{}</span>'.format(_("Currently watching..."))),
        # Player address read from the row's XML.
        ("ipaddress", lambda i: "{}".format(i.get_xml().find('Player').get("address"))),
        # Link to the user's page.
        ("user", lambda i: '<a href="{0}" class="invert-link">{1}</a>'.format(url_for('user', name=i.user), i.user)),
        # NOTE: the parentheses here do not make a tuple; this entry is the plain string "platform".
        ("platform"),
        # Link to the item's info page, keyed by the XML ``ratingKey`` value.
        ("title", lambda i: u'<a class="invert-link" href="{0}">{1}</a>'.format(url_for('info', id=i.get_xml_value('ratingKey')), i.title)),
        ("type", lambda i: "{}".format(i.get_xml_value("type"))),
        # Stream-info modal link; empty for rows whose platform is "Imported".
        ("streaminfo", lambda i: '<a href="#" data-link="{0}" class="orange" data-target="#streamModal" data-toggle="modal"><i class="glyphicon glyphicon glyphicon-info-sign"></i></a>'.format(url_for('streaminfo',id=i.id)) if i.platform != "Imported" else ''),
        ("time",lambda i: "{}".format(i.time.strftime('%H:%M'))),
        # paused_counter divided by 60 - presumably seconds to minutes; TODO confirm the unit.
        ("paused_counter", lambda i: "{} min".format(int(i.paused_counter)/60) if i.paused_counter else "0 min" ),
        ("stopped", lambda i: "{}".format(i.stopped.strftime('%H:%M')) if i.stopped else "n/a"),
        # Elapsed time between start and stop, minus the paused counter, divided by 60.
        ("duration", lambda i: "{} min".format(int((((i.stopped - i.time).total_seconds() - (int(i.paused_counter or 0))) /60))) if i.stopped else "n/a"),
        # Percentage badge; "Imported" rows compare duration against itself.
        ("completed", lambda i: '<span class="badge badge-warning">{}%</span>'.format(helper.getPercentage(i.get_xml_value("duration") if i.platform == "Imported" else i.get_xml_value("viewOffset"), i.get_xml_value("duration")))),
    ])
    # Delegate free-text search to perform_some_search (defined outside this block).
    table.searchable(lambda queryset, user_input: perform_some_search(queryset, user_input))
    return json.dumps(table.json())
def audit(func):
    """Decorator that records a Flask route function in the audit log.

    The wrapped call is routed through ``_audit_request`` with options that
    include the path and query string and use the module defaults for
    request/response bodies.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        option_values = dict(
            include_request_body=DEFAULT_INCLUDE_REQUEST_BODY,
            include_response_body=DEFAULT_INCLUDE_RESPONSE_BODY,
            include_path=True,
            include_query_string=True,
        )
        audit_options = AuditOptions(**option_values)
        with logging_levels():
            return _audit_request(audit_options, func, None, *args, **kwargs)

    return wrapper
def __init__(self, options, func, request_context):
    """Capture the current request's metadata for one audited call.

    Stores the options, the function name and the request's endpoint,
    method, query args and view args; result fields start out as ``None``
    and ``timing`` as an empty dict.
    """
    # Supplied by the caller.
    self.options = options
    self.func = func.__name__
    self.request_context = request_context

    # Snapshot of the active Flask request.
    self.operation = request.endpoint
    self.method = request.method
    self.args = request.args
    self.view_args = request.view_args

    # Filled in later, while/after the request is handled.
    self.timing = {}
    for pending_field in ('error', 'stack_trace', 'request_body',
                          'response_body', 'response_headers',
                          'status_code', 'success'):
        setattr(self, pending_field, None)
def file_index(page=1):
    """List uploaded files, sorted according to the query arguments.

    Args:
        page (int): Listing page number to show.
    """
    order_key, order_dir, ordering = _sort_uploads(request.args)
    sorted_query = FileUpload.query.order_by(ordering)
    files = sorted_query.paginate(page, 20, False)

    template = 'akamatsu/dashboard/file/index.html'
    try:
        return render_template(template, files=files, order_key=order_key,
                               order_dir=order_dir)
    except NotFound:
        # Render the page without data (a 'no files found' notice) rather
        # than a 404 error.
        return render_template(template)
def user_index(page=1):
    """List registered users, sorted according to the query arguments.

    Args:
        page (int): Listing page number to show.
    """
    order_key, order_dir, ordering = _sort_users(request.args)
    sorted_query = User.query.order_by(ordering)
    users = sorted_query.paginate(page, 20, False)

    template = 'akamatsu/dashboard/user/index.html'
    try:
        return render_template(template, users=users, order_key=order_key,
                               order_dir=order_dir)
    except NotFound:
        # Render the page without data (a 'no users found' notice) rather
        # than a 404 error.
        return render_template(template)
def config_sensors_change(self, mode, id):
    """Render the add/edit form for a sensor.

    In ``add`` mode without a ``module`` query argument, an uploaded file
    (on POST) is stored and the module chooser is returned. Otherwise the
    change form is rendered with the sensor being edited, or with a fresh
    implementation object for the chosen module.
    """
    adding = mode == 'add'
    if adding and 'module' not in request.args:
        if request.method == 'POST':
            OS().upload_file('sensors/', 'file')
        return self._get_module_chooser(
            "Add Sensor", "/config/sensors/add", "sensors", "Sensor")

    modules = OS().get_classes("sensors", "Sensor")
    sensor = sensor_impl = sensor_module = None

    if mode == 'edit' and id is not None:
        sensor = Sensors().get(id)
        sensor_module = sensor.get_classpath()
        sensor_impl = sensor.get_sensor_impl()
    elif adding:
        sensor_module = request.args.get('module')
        sensor_impl = OS().create_object(sensor_module)

    view_data = {
        "edit": (mode == 'edit'),
        "mode": mode,
        "sensor": sensor,
        "sensor_impl": sensor_impl,
        "sensor_module": sensor_module,
        "modules": modules,
    }
    return self.get_view('config_sensor_change.html').data(view_data)
def config_notifications_change(self, mode, nid):
    """Render the add/edit form for a notification service.

    In ``add`` mode without a ``module`` query argument, an uploaded file
    (on POST) is stored and the module chooser is returned. Otherwise the
    change form is rendered with the notifier being edited, or with a fresh
    implementation object for the chosen module.
    """
    adding = mode == 'add'
    if adding and 'module' not in request.args:
        if request.method == 'POST':
            OS().upload_file('notifiers/', 'file')
        return self._get_module_chooser(
            "Add Notification Service", "/config/notifications/add",
            "notifiers", "Notifier")

    modules = OS().get_classes("notifiers", "Notifier")
    notifier = notifier_impl = notifier_module = None

    if mode == 'edit' and nid is not None:
        notifier = Notifiers().get(nid)
        notifier_module = notifier.get_classpath()
        notifier_impl = notifier.get_notifier_impl()
    elif adding:
        notifier_module = request.args.get('module')
        notifier_impl = OS().create_object(notifier_module)

    view_data = {
        "edit": (mode == 'edit'),
        "mode": mode,
        "notifier": notifier,
        "notifier_impl": notifier_impl,
        "notifier_module": notifier_module,
        "modules": modules,
    }
    return self.get_view('config_notifications_change.html').data(view_data)
def index():
    """Serve the index page, or the first 4096 characters of a file.

    Requests whose ``X-Forwarded-For`` header lists an address in
    ``HALL_OF_SHAME`` get a 403. When the ``f`` query argument is present
    the named file is read and returned; names matching the blocked
    pattern get a 403, names containing a NUL byte or unreadable files get
    a 404. Without ``f`` the static ``INDEX`` content is returned.
    """
    for ip in request.headers.get('X-Forwarded-For', '').split(','):
        ip = ip.strip().lower()
        if ip in HALL_OF_SHAME:
            abort(403)

    if 'f' not in request.args:
        return INDEX

    # SECURITY NOTE(review): this opens a path supplied directly by the
    # client; only a small deny-list guards it. Confirm that arbitrary file
    # disclosure is intended before exposing this endpoint.
    try:
        f = request.args['f']
        if re.search(r'proc|random|zero|stdout|stderr', f):
            abort(403)
        elif '\x00' in f:
            abort(404)
        # Context manager closes the handle instead of leaking it.
        with open(f) as handle:
            return handle.read(4096)
    except IOError:
        abort(404)
def log_exception(self, exc_info):
    """Log an exception together with details of the current request.

    The message lists the path, method, client address, user-agent details,
    query args, view args and full URL; ``exc_info`` is attached to the log
    record.
    """
    agent = request.user_agent
    request_details = (
        request.path,
        request.method,
        request.remote_addr,
        agent.string,
        agent.platform,
        agent.browser,
        agent.version,
        dict(request.args),
        request.view_args,
        request.url,
    )
    message = """
Path: %s
HTTP Method: %s
Client IP Address: %s
User Agent: %s
User Platform: %s
User Browser: %s
User Browser Version: %s
GET args: %s
view args: %s
URL: %s
""" % request_details
    self.logger.error(message, exc_info=exc_info)
def search_results():
    """Run a place search for the ``q`` query argument.

    An empty query renders an empty results page. A query matching
    ``re_qid`` redirects to the item page. Otherwise the query is looked up
    through nominatim; a search error renders the error page, and hits are
    post-processed before the results page is rendered.
    """
    query = request.args.get('q') or ''
    if not query:
        return render_template('results_page.html', results=[], q=query)

    qid_match = re_qid.match(query.strip())
    if qid_match:
        # Drop the first character of the matched group to get the ID.
        item_id = qid_match.group(1)[1:]
        return redirect(url_for('item_page', wikidata_id=item_id))

    try:
        hits = nominatim.lookup(query)
    except nominatim.SearchError:
        return render_template('error_page.html',
                               message='nominatim API search error')

    update_search_results(hits)
    for hit in hits:
        add_hit_place_detail(hit)
    return render_template('results_page.html', results=hits, q=query)
def saved_places():
    """Saved-places listing view; currently always responds with 404.

    The unconditional ``abort(404)`` on the first line means none of the
    code below it runs. NOTE(review): presumably the view was disabled on
    purpose - confirm before removing either the abort or the dead code.
    """
    abort(404)
    # --- Everything below is unreachable while the abort above is present ---
    # A ``filter`` query argument is normalised (spaces -> underscores) and
    # turned into a redirect to the filtered or unfiltered listing.
    if 'filter' in request.args:
        arg_filter = request.args['filter'].strip().replace(' ', '_')
        if arg_filter:
            return redirect(url_for('saved_with_filter', name_filter=arg_filter))
        else:
            return redirect(url_for('saved_places'))
    sort = request.args.get('sort') or 'name'
    name_filter = g.get('filter') or None
    # With a name filter the table body is rendered here; otherwise
    # get_place_tbody (defined outside this block) supplies it.
    if name_filter:
        place_tbody = render_template('place_tbody.html',
                                      existing=get_existing(sort, name_filter))
    else:
        place_tbody = get_place_tbody(sort)
    # ``sort_link`` is not defined in this function; it must come from module scope.
    return render_template('saved.html', place_tbody=place_tbody, sort_link=sort_link)
def twitter_authorized(resp):
    """OAuth callback for Twitter sign-in.

    Args:
        resp: Provider response mapping with ``oauth_token``,
            ``oauth_token_secret`` and ``screen_name``, or ``None`` when
            authorization did not succeed.

    Returns:
        An access-denied message when ``resp`` is ``None``; otherwise a
        redirect to the ``next`` query argument or to ``blog.home``. A
        ``User`` row is created for a screen name seen for the first time.
    """
    if resp is None:
        # Use .get(): indexing request.args raises a 400 error when the
        # provider omits these parameters, which would replace the denial
        # message with an unrelated "bad request" page.
        return 'Access denied: reason: {} error:{}'.format(
            request.args.get('error_reason'),
            request.args.get('error_description'))

    # NOTE(review): token and secret are concatenated into one string here;
    # confirm the token getter reading this session key expects that shape.
    session['twitter_oauth_token'] = resp['oauth_token'] + \
        resp['oauth_token_secret']

    user = User.query.filter_by(
        username=resp['screen_name']).first()
    if not user:
        # SECURITY NOTE(review): every auto-created account gets the same
        # hard-coded password; confirm password login is impossible for
        # these accounts or replace this with a random secret.
        user = User(username=resp['screen_name'], password='jmilkfan')
        db.session.add(user)
        db.session.commit()

    flash("You have been logged in.", category="success")
    return redirect(
        request.args.get('next') or url_for('blog.home'))
def push(URI, arc_id, p_args=None):
    """Push ``URI`` to one archive handler, or to all of them.

    Args:
        URI: Resource to archive; converted with ``str()`` before use.
        arc_id: Key of a handler in the module-level ``handlers`` mapping,
            or ``'all'`` to push to every handler.
        p_args: Optional extra arguments passed to each handler's ``push``
            (for example an API key). Defaults to a fresh empty dict.

    Returns:
        A list with one result per handler used (empty when ``arc_id``
        matches nothing), or ``["bad request"]`` when any handler raises.
    """
    # A fresh dict per call: a ``{}`` default would be shared between calls
    # and could be mutated by a handler.
    if p_args is None:
        p_args = {}
    try:
        return [
            handlers[handler].push(str(URI), p_args)
            for handler in handlers
            if arc_id == handler or arc_id == 'all'
        ]
    except Exception:
        # Deliberately best-effort: any handler failure is reported to the
        # caller as a bad request. Unlike a bare ``except``, this lets
        # KeyboardInterrupt/SystemExit propagate.
        return ["bad request"]
def get_callsign(objtype, kw):
    """Decorator factory that resolves the ``kw`` view argument to an object.

    The leading digits of ``kwargs[kw]`` are converted to ``int`` and passed
    to ``objtype``. The resulting object replaces the keyword argument and
    is also stored on ``g`` under the same name. A lookup that fails with
    one of the handled exceptions aborts with 404.
    """
    def decorator(func):
        @wraps(func)
        def decorated_function(*args, **kwargs):
            try:
                leading_digits = re.match(r'^\d+', kwargs[kw]).group(0)
                resolved = objtype(int(leading_digits))
                # Bare attribute access - presumably forces the row to load
                # so a missing one raises here; TODO confirm.
                resolved._data
            except (NameError, AttributeError, OverflowError, NoRow):
                abort(404)
            kwargs[kw] = resolved
            setattr(g, kw, resolved)
            return func(*args, **kwargs)

        return decorated_function

    return decorator
def special_access_required(func):
    """Restrict a view to admins and the club's teacher or leader.

    Admins always pass. For other users the club is taken from the ``club``
    or ``activity`` keyword argument (403 if neither is present); teachers
    must be the club's teacher and everyone else must be the club's leader.
    """
    @login_required
    @wraps(func)
    def decorated_function(*args, **kwargs):
        if current_user.type == UserType.ADMIN:
            return func(*args, **kwargs)

        if 'club' in kwargs:
            club = kwargs['club']
        elif 'activity' in kwargs:
            club = kwargs['activity'].club
        else:
            abort(403)  # Admin-only page

        if current_user.type == UserType.TEACHER:
            allowed_user = club.teacher
        else:
            allowed_user = club.leader
        if current_user != allowed_user:
            abort(403)

        return func(*args, **kwargs)

    return decorated_function
def require_student_membership(func):
    """Restrict a view to members of the club it operates on.

    The club is taken from the ``club`` keyword argument, or from the
    ``activity`` keyword argument's ``club`` attribute. Users who are not
    in ``club.members`` get a 403.

    Raises:
        AssertionError: If the decorated view receives neither ``club`` nor
            ``activity`` (a programming error in the route definition).
    """
    @login_required
    @wraps(func)
    def decorated_function(*args, **kwargs):
        if 'club' in kwargs:
            club = kwargs['club']
        elif 'activity' in kwargs:
            club = kwargs['activity'].club
        else:
            # Raised explicitly: a bare ``assert False`` is stripped under
            # ``python -O`` and would leave ``club`` unbound below.
            raise AssertionError(
                "view must take a 'club' or 'activity' argument")
        if current_user not in club.members:
            abort(403)
        return func(*args, **kwargs)

    return decorated_function
def require_membership(func):
    """Restrict a view to admins, the club's teacher and its members.

    Admins always pass. For other users the club is taken from the ``club``
    keyword argument, or from the ``activity`` keyword argument's ``club``
    attribute; users who are neither the teacher nor in ``club.members``
    get a 403.

    Raises:
        AssertionError: If a non-admin request reaches a decorated view
            that receives neither ``club`` nor ``activity`` (a programming
            error in the route definition).
    """
    @login_required
    @wraps(func)
    def decorated_function(*args, **kwargs):
        if current_user.type != UserType.ADMIN:
            if 'club' in kwargs:
                club = kwargs['club']
            elif 'activity' in kwargs:
                club = kwargs['activity'].club
            else:
                # Raised explicitly: a bare ``assert False`` is stripped
                # under ``python -O`` and would leave ``club`` unbound.
                raise AssertionError(
                    "view must take a 'club' or 'activity' argument")
            if current_user not in [club.teacher] + club.members:
                abort(403)
        return func(*args, **kwargs)

    return decorated_function
def __call__(self, func):
    """Wrap ``func`` so that cached routes are invalidated after it runs.

    After the wrapped call, and only when ``USE_CACHE`` is set, the cache
    entry of every route in ``self.routes`` is deleted: always the
    anonymous entry, plus the per-user entry when ``self.clear_for_user``
    is set and a ``user`` keyword argument was given.
    """
    def _evict(route):
        # Delete one cache key and log it.
        Cache.delete(unicode(route))
        Logger.debug(unicode('clear %s from cache' % (route)))

    @wraps(func)
    def decorator(*args, **kwargs):
        response = func(*args, **kwargs)
        if not USE_CACHE:
            return response

        for path in self.routes:
            _evict('%s,%s,%s' % (path, json.dumps(self.args), None))
            if self.clear_for_user:
                user = kwargs.get('user', None)
            else:
                user = None
            if user:
                _evict('%s,%s,%s' % (path, json.dumps(self.args), user))
        return response

    return decorator
def perm_required(func):
    """Decorator that checks the user's permissions before running a view.

    ``GeneralController.check_perms`` is called for each of the ``report``,
    ``ticket`` and ``defendant`` keyword arguments that is present; the
    ``defendant`` check is skipped for GET requests.
    """
    @wraps(func)
    def check_perm(*args, **kwargs):
        for resource in ('report', 'ticket', 'defendant'):
            if resource not in kwargs:
                continue
            if resource == 'defendant' and request.method == 'GET':
                continue
            GeneralController.check_perms(
                method=request.method,
                user=g.user,
                **{resource: kwargs[resource]}
            )
        return func(*args, **kwargs)

    return check_perm
def callback(self):
    """Finish the OAuth authorization-code exchange.

    Returns:
        ``(id, first_name, email)`` read from the ``data`` object of
        ``/api/v1/user.json``, or ``(None, None, None)`` when the request
        carries no ``code`` argument.
    """
    if 'code' not in request.args:
        return None, None, None

    token_request = {
        'code': request.args['code'],
        'grant_type': 'authorization_code',
        'redirect_uri': self.get_callback_url(),
    }
    oauth_session = self.service.get_auth_session(
        data=token_request, decoder=json.loads)

    me = oauth_session.get('/api/v1/user.json').json()
    profile = me.get('data')
    return (
        profile.get('id'),
        profile.get('first_name'),
        profile.get('email'),
    )
def __init__(self, *args, **kwargs):
    """Create the application and load the production configuration.

    Forwards all arguments to the parent constructor, loads
    ``config.Production``, stores the parent directory of the instance path
    under ``ROOT_FOLDER`` and then runs ``self.setup()``.
    """
    # Function-call form prints the same text on Python 2 and, unlike the
    # print statement, is valid syntax on Python 3.
    print("init app")
    super(App, self).__init__(*args, **kwargs)
    self.config.from_object('config.Production')
    self.config['ROOT_FOLDER'], _ = os.path.split(self.instance_path)
    self.setup()
def set_request(cls, request, **kwargs):
    """Store a request's query arguments and return the key they are under.

    The arguments are copied into a dict, merged with any extra keyword
    arguments, and saved in ``cls.requests`` under a new 8-character hex id.
    """
    request_id = uuid4().hex[:8]
    stored_args = dict(request.args)
    stored_args.update(kwargs)
    cls.requests[request_id] = stored_args
    return request_id
def collector(command):
    """Dispatch ``command`` to the matching ``do_<command>`` collector method.

    Dashes in the command are replaced by underscores and the request's
    query arguments are passed as keyword arguments.
    """
    method_name = 'do_%s' % command.replace('-', '_')
    handler = getattr(app._cr, method_name)
    return handler(**request.args)
def analyzer(command):
    """Dispatch ``command`` to the matching ``do_<command>`` analyzer method.

    Keyword arguments come from the query string, unless a ``request_id``
    argument is given, in which case the arguments stored under that id are
    used instead.
    """
    call_kwargs = dict(request.args)
    stored_id = parse_single_arg('request_id', call_kwargs)
    if stored_id:
        call_kwargs = Request.get_request(stored_id)

    method_name = 'do_%s' % command.replace('-', '_')
    handler = getattr(app._ar, method_name)
    return handler(**call_kwargs)
def plot():
    """Render the chart page for the command named by the ``plot`` argument.

    The command defaults to ``raw-data``. The current request's arguments
    are stored (with ``columns_data=True``) and their id is handed to the
    template.
    """
    chart_command = parse_single_arg('plot', request.args,
                                     default_val='raw-data')
    stored_id = Request.set_request(request, columns_data=True)
    return render_template('chart.html', command=chart_command,
                           req_id=stored_id)
def handle_verification():
    """Answer the Facebook verification call by echoing ``hub.challenge``."""
    challenge = request.args['hub.challenge']
    return challenge
def showgauges():
    """Render the gauges page from the two averages in the query string."""
    subjectivity = request.args["subjectivityavg"]
    sentiment = request.args["sentimentavg"]
    return render_template("showgauges.html",
                           subjectivityavg=subjectivity,
                           sentimentavg=sentiment)
def telemetry(function):
    """Decorator that saves a telemetry record before each call.

    A ``Telemetry`` entry holding the referrer, the MD5 hex digest of the
    client address and the current time is saved, then the wrapped function
    is called with the original arguments.

    Args:
        function: The callable to wrap.

    Returns:
        The wrapping callable, carrying ``function``'s name and docstring.
    """
    from functools import wraps

    # Without wraps() every decorated view would be named ``_wrapper``,
    # which makes Flask endpoint names collide when more than one view is
    # decorated.
    @wraps(function)
    def _wrapper(*args, **kwargs):
        telemetry = Telemetry(referrer=request.referrer,
                              ip=md5(request.remote_addr).hexdigest(),
                              creation_date=datetime.now())
        save(telemetry)
        return function(*args, **kwargs)

    return _wrapper
def index_page():
    """Return the EPUB export when ``epub`` is requested, else the HTML page."""
    wants_epub = request.args.get('epub')
    if not wants_epub:
        return create_page()
    return create_epub()
def create_page():
    """Render one page of toots together with account and instance lists.

    Query arguments:
        page: Zero-based page index (default 0; invalid or negative values
            fall back to 0).
        limit: Page size (default 10; invalid or non-positive values fall
            back to 10).
        blacklisted: When truthy, list blacklisted accounts/instances; the
            special value ``ignore`` disables the blacklist filter.

    Returns:
        The rendered ``index.html`` template.
    """
    # type=int makes get() return the default instead of raising when the
    # value is not a number.
    page = max(request.args.get("page", 0, type=int), 0)
    limit = request.args.get("limit", 10, type=int)
    if limit <= 0:
        # A zero limit would divide by zero below.
        limit = 10
    offset = page * limit

    toots, count, count_all = get_toots(offset, limit)
    accounts = Account.query.order_by(Account.username)
    instances = Instance.query.order_by(Instance.domain)

    blacklisted_arg = request.args.get('blacklisted')
    if blacklisted_arg != 'ignore':
        blacklist_status = bool(blacklisted_arg)
        accounts = accounts.filter(Account.blacklisted == blacklist_status)
        instances = instances.filter(Instance.blacklisted == blacklist_status)

    # Carry every argument except ``page`` over to the paging links.
    # items() works on both Python 2 and 3, unlike iteritems().
    # NOTE(review): values are inserted without URL-encoding; confirm the
    # template escapes them or encode them here.
    carried_args = "".join(
        "&%s=%s" % (key, value)
        for key, value in request.args.items()
        if key != "page"
    )

    pagination = {'page': page + 1,
                  'limit': limit}
    if count >= limit:
        pagination['next'] = "/?page=%s&" % (page + 1) + carried_args
    if page != 0:
        pagination['previous'] = "/?page=%s&" % (page - 1) + carried_args
    # NOTE(review): this yields one page too many when count_all is an
    # exact multiple of limit - confirm whether that is intended.
    pagination['page_count'] = int(count_all / limit) + 1

    return render_template('index.html',
                           toots=toots,
                           accounts=accounts,
                           instances=instances,
                           pagination=pagination)
def authorize(self, *args, **kwargs):
    """Default authorization: delegate to the API, else use the current user."""
    api = self.api
    if api is None:
        return current_user
    return api.authorize(*args, **kwargs)
def get_many(self, *args, **kwargs):
    """Return the collection; this default implementation has no items."""
    collection = list()
    return collection
def get_one(self, *args, **kwargs):
    """Return the keyword argument named after ``self.meta.name``, if any."""
    resource_key = self.meta.name
    return kwargs.get(resource_key)