Python flask.request 模块,query_string() 实例源码
我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用flask.request.query_string()。
def url():
# check if the post request has the file part
url = request.query_string[2:]
if url == "":
#print("not url")
return redirect("/")
title = uuid.uuid4().hex
path = os.path.join(app.config['UPLOAD_FOLDER'], title)
# get url file
(filename, headers) = urllib.urlretrieve(url, path)
# convert to jpg
im = Image.open(path)
if im.mode != "RGB":
im = im.convert("RGB")
im.save(path+".jpg", "JPEG")
FLAGS.input_files = path+".jpg"
# inference
result = main(None)
return jsonify(result)
def get_info_hash(request, multiple=False):
"""
Get infohashes from a QS.
"""
if not multiple:
return b2a_hex(cgi.parse_qs(request.query_string)['info_hash'][0])
else:
hashes = set()
for hash in cgi.parse_qs(request.query_string)['info_hash']:
hashes.add(b2a_hex(hash))
return hashes
def get_hosts():
"""Return all hosts."""
#Check for query string, redirect to endpoint with trailling '/'.
if request.query_string:
return redirect(url_for('run_cmd') + '?' + request.query_string)
hosts = RSPET_API.get_hosts()
return jsonify({'hosts': [make_public_host(hosts[h_id], h_id) for h_id in hosts]})
def get_host(host_id):
"""Return specific host."""
#Check for query string, redirect to endpoint with trailling '/'.
if request.query_string:
return redirect(url_for('run_cmd_host', host_id=host_id) + '?' + request.query_string)
hosts = RSPET_API.get_hosts()
try:
host = hosts[host_id]
except KeyError:
abort(404)
return jsonify(make_public_host(host, host_id))
def prepare_auth_request(request):
url_data = urlparse(request.url)
return {
"https": 'on',
'http_host': request.host,
'server_port': url_data.port,
'script_name': request.path,
'get_data': request.args.copy(),
'post_data': request.form.copy(),
# Uncomment if using ADFS as IdP, https://github.com/onelogin/python-saml/pull/144
# 'lowercase_urlencoding': True,
'query_string': request.query_string
}
def get_current_url():
if not request.query_string:
return request.path
return '%s?%s' % (request.path, request.query_string)
def get_current_url():
if not request.query_string:
return request.path
return '%s?%s' % (request.path, request.query_string)
def img():
i = request.query_string
f = open('a.png','wb')
f.write(i.decode('base64'))
return "success <img src='" + i + "'>"
def announce():
'''/announce.php?info_hash=&peer_id=&ip=&port=&uploaded=&downloaded=&left=&numwant=&key=&compact=1'''
ip = request.args.get('ip')
port = request.args.get('port')
if not ip or not port:
return abort(404)
address = (ip, int(port))
binhash = urlparse.parse_qs(request.query_string)['info_hash'][0]
country = geoip.country_code_by_addr(ip)
if country not in ('CN','TW','JP','HK', 'KR'):
return abort(404)
rpc.announce(binhash.encode('hex'), address)
return bencode({'peers': '', 'interval': 86400})
def try_get_banner(user_id, sizename, privacy=0):
if sizename.endswith(".png"):
sizename = sizename[:-4]
if sizename not in sizemap:
abort(404)
if len(str(user_id)) != 9:
abort(404)
size = sizemap[sizename]
try:
data, mtime = get_data(user_id)
key = "%d_p%d" % (user_id, privacy)
privatize(data, privacy)
res = get_sized_banner(key, data, mtime, size)
if request.query_string == "dl":
res.headers['Content-Disposition'] = 'attachment; filename=%d_p%d_%s.png' % (user_id, privacy, sizename)
return res
except APIError as e:
if e.code == 1457:
return send_file("static/error_404_%d.png" % size, mimetype="image/png", cache_timeout=60)
elif e.code == 101:
return send_file("static/error_503_%d.png" % size, mimetype="image/png", cache_timeout=60)
else:
app.logger.exception("API error for %r/%r/%r" % (user_id, sizename, privacy))
return send_file("static/error_%d.png" % size, mimetype="image/png", cache_timeout=60)
except Exception as e:
app.logger.exception("Exception thrown for %r/%r/%r" % (user_id, sizename, privacy))
return send_file("static/error_%d.png" % size, mimetype="image/png", cache_timeout=60)
def try_get_snap(snap, sizename):
if sizename.endswith(".png"):
sizename = sizename[:-4]
if sizename not in sizemap:
abort(404)
size = sizemap[sizename]
data = load_snap(snap)
key = "s_" + snap
res = get_sized_banner(key, data, None, size, max_age=None)
if request.query_string == "dl":
res.headers['Content-Disposition'] = 'attachment; filename=snap_%s_%s.png' % (snap, sizename)
return res
def get_current_url():
if not request.query_string:
return request.path
return '%s?%s' % (request.path, request.query_string)
def get_current_url():
if not request.query_string:
return request.path
return '%s?%s' % (request.path, request.query_string)
def _get_devices():
db = aeon_ztp.db.session
to_json = device_schema
# ---------------------------------------------------------------
# if the request has arguments, use these to form an "and" filter
# and return only the subset of items matching
# ---------------------------------------------------------------
if request.args:
try:
recs = find_devices(db, request.args.to_dict())
if len(recs) == 0:
return jsonify(ok=False,
message='Not Found: %s' % request.query_string), 404
items = [to_json.dump(rec).data for rec in recs]
return jsonify(count=len(items), items=items)
except AttributeError:
return jsonify(ok=False, message='invalid arguments'), 500
# -------------------------------------------
# otherwise, return all items in the database
# -------------------------------------------
items = [to_json.dump(rec).data for rec in db.query(Device).all()]
return jsonify(count=len(items), items=items)
# -----------------------------------------------------------------------------
# POST /api/devices
# -----------------------------------------------------------------------------
def _delete_devices():
if request.args.get('all'):
try:
db = aeon_ztp.db.session
db.query(Device).delete()
db.commit()
except Exception as exc:
return jsonify(
ok=False,
message='unable to delete all records: {}'.format(exc.message)), 400
return jsonify(ok=True, message='all records deleted')
elif request.args:
db = aeon_ztp.db.session
try:
recs = find_devices(db, request.args.to_dict())
n_recs = len(recs)
if n_recs == 0:
return jsonify(ok=False,
message='Not Found: %s' % request.query_string), 404
for dev in recs:
db.delete(dev)
db.commit()
return jsonify(
ok=True, count=n_recs,
message='{} records deleted'.format(n_recs))
except AttributeError:
return jsonify(ok=False, message='invalid arguments'), 500
except Exception as exc:
msg = 'unable to delete specific records: {}'.format(exc.message)
return jsonify(ok=False, message=msg), 500
else:
return jsonify(ok=False, message='all or filter required'), 400
def home_project():
"""Fetches the home project, creating it if necessary.
Eve projections are supported, but at least the following fields must be present:
'permissions', 'category', 'user'
"""
from pillar.auth import current_user
user_id = current_user.user_id
roles = current_user.roles
log.debug('Possibly creating home project for user %s with roles %s', user_id, roles)
if HOME_PROJECT_USERS and not HOME_PROJECT_USERS.intersection(roles):
log.debug('User %s is not a subscriber, not creating home project.', user_id)
return 'No home project', 404
# Create the home project before we do the Eve query. This costs an extra round-trip
# to the database, but makes it easier to do projections correctly.
if not has_home_project(user_id):
write_access = write_access_with_roles(roles)
create_home_project(user_id, write_access)
resp, _, _, status, _ = get('projects', category='home', user=user_id)
if status != 200:
return utils.jsonify(resp), status
if resp['_items']:
project = resp['_items'][0]
else:
log.warning('Home project for user %s not found, while we just created it! Could be '
'due to projections and other arguments on the query string: %s',
user_id, request.query_string)
return 'No home project', 404
return utils.jsonify(project), status
def setSpeed():
print 'gotParams'
print request.query_string
print request.args
print request.args.getlist('name[]')
# print request.form.get('left',1,type=int)
#print request.form.get('right',1,type=int)
print request.args['right']
print request.args['left']
return ''
def auth_decorator(func):
@wraps(func)
def decorator_func(*args,**kwargs):
user = request.headers.get('user')
api_key = request.headers.get('api_key')
# api_secret = request.headers.get('api_secret')
user_hash = request.headers.get('hash')
user_timestamp = request.headers.get('timestamp')
if not user or not api_key :
return jsonify("Error: Invalid Request"),412
if not hash or not user_timestamp or not user_hash:
return jsonify("Error: Invalid Request"), 412
server_key = get_key(api_key,user)
if not server_key:
return jsonify("key not found"),412
timestamp_hash = generate_hmac(str(server_key), str(user_timestamp))
#for get request
if request.method == 'GET':
url = request.path + '?' + request.query_string if request.query_string else request.path
server_hash = generate_hmac(str(timestamp_hash), str(url))
if hmac.compare_digest(server_hash, user_hash):
return func(*args, **kwargs)
else:
return jsonify("Error : HMAC is not matched"), 412
#change with the hmac
# server_hash = base64.base64encode(str(server_key),str(url))
# if user_hash == server_hash:
# return func(*args,**kwargs)
# else :
# return jsonify("Error: HMAC is not matched"),412
if request.method == 'POST':
#check for file upload
data = request.data.decode('utf-8')
server_hash = generate_hmac(str(timestamp_hash),data)
if hmac.compare_digest(server_hash,user_hash):
return func(*args, **kwargs)
else:
return jsonify("Error : HMAC is not matched"), 412
return decorator_func