Python flask.request 模块,full_path() 实例源码
我们从Python开源项目中,提取了以下35个代码示例,用于说明如何使用flask.request.full_path()。
def needs_authentication():
"""
Decorator: Attach this to a route and it will require that the session has been
previously authenticated.
"""
def auth_chk_wrapper(f):
# This is our decoratorated function wrapper
@wraps(f)
def deco(*args, **kwargs):
# If the session does not yet have an authentication
if not is_authenticated():
return redirect(sign_auth_path(request.full_path))
else:
return f(*args, **kwargs)
return deco
return auth_chk_wrapper
def needs_authentication():
"""
Decorator: Attach this to a route and it will require that the session has been
previously authenticated.
"""
def auth_chk_wrapper(f):
# This is our decoratorated function wrapper
@wraps(f)
def deco(*args, **kwargs):
# If the session does not yet have an authentication
if not is_authenticated():
return redirect(sign_auth_path(request.full_path))
else:
return f(*args, **kwargs)
return deco
return auth_chk_wrapper
def metadata():
"""
Metadata
---
tags:
- matchbox
responses:
200:
description: Metadata of the current group/profile
schema:
type: string
"""
matchbox_uri = application.config.get("MATCHBOX_URI")
if matchbox_uri:
matchbox_resp = requests.get("%s%s" % (matchbox_uri, request.full_path))
resp = matchbox_resp.content
matchbox_resp.close()
return Response(resp, status=matchbox_resp.status_code, mimetype="text/plain")
return Response("matchbox=%s" % matchbox_uri, status=403, mimetype="text/plain")
def cached(timeout=5 * 60, key='blog_view_%s'):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
cache_key = key % request.full_path
if request.method == 'POST':
value = f(*args, **kwargs)
cache.set(cache_key, value, timeout=timeout)
return value
value = cache.get(cache_key)
if value is None:
value = f(*args, **kwargs)
cache.set(cache_key, value, timeout=timeout)
return value
return decorated_function
return decorator
def after_request(response):
timestamp = strftime('[%Y-%b-%d %H:%M]')
f = open("server.log","a").write( "\n"+"--"*10+"\n"+'%s %s %s %s %s %s'%(timestamp, request.remote_addr, request.method, request.scheme, request.full_path, response.status) )
return response
def exceptions(e):
tb = traceback.format_exc()
timestamp = strftime('[%Y-%b-%d %H:%M]')
f = open("server.log","a").write( "\n"+"--"*10+"\n"+'%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s'%(timestamp, request.remote_addr, request.method, request.scheme, request.full_path, tb) )
return abort(500)
def before_request():
''' Pre-request hander '''
logger.debug("Incoming Web Request: {0}".format(request.full_path))
g.dbc = connect_db(app.config)
def internal_server_error(e):
message = repr(e)
trace = traceback.format_exc()
trace = string.split(trace, '\n')
timestamp = (datetime.fromtimestamp(time.time())
.strftime('%Y-%m-%d %H:%M:%S'))
if current_user.is_authenticated:
user = current_user.username
else:
user = 'anonymous'
gathered_data = ('message: {}\n\n\n'
'timestamp: {}\n'
'ip: {}\n'
'method: {}\n'
'request.scheme: {}\n'
'request.full_path: {}\n'
'user: {}\n\n\n'
'trace: {}'.format(message, timestamp,
request.remote_addr, request.method,
request.scheme, request.full_path,
user, '\n'.join(trace)))
# send email to admin
if app.config['TESTING']:
print(gathered_data)
else:
mail_message = gathered_data
msg = Message('Error: ' + message[:40],
body=mail_message,
recipients=[app.config['ADMIN_MAIL']])
mail.send(msg)
flash(_('A message has been sent to the administrator'), 'info')
bookcloud_before_request()
return render_template('500.html', message=gathered_data), 500
def save_timing(name, t0):
timing = Timing(start=t0,
path=request.full_path,
name=name,
seconds=time() - t0)
database.session.add(timing)
def add_comment(user, article_id):
"""
@api {post} /article/:id/comment Post comment to an article
@apiName Post comment to an article
@apiGroup Comment
@apiUse AuthorizationTokenHeader
@apiParam {String} id Article unique ID.
@apiParam {String} comment The comment to be added
@apiParam {Boolean} public The privacy setting for the comment, default is True
@apiParamExample Request (Example)
{
"comment": "I hate you",
"public": "false"
}
@apiUse UnauthorizedAccessError
@apiUse ResourceDoesNotExist
@apiUse BadRequest
"""
app.logger.info('User {} Access {}'.format(user, request.full_path))
# Get request body
req = RequestUtil.get_request()
comment = req.get('comment')
public = req.get('public')
# Add comment
result = MongoUtil.add_comment(user, article_id, comment, public)
# If error occurs
if isinstance(result, str):
return ResponseUtil.error_response(result)
app.logger.info('User {} Create comment {}'.format(user, result.id))
return jsonify(JsonUtil.serialize(result))
def upload(path=""):
"""
Uploads a file to a place.
"""
# Check that uploads are OK
if not can_upload():
return redirect(sign_auth_path(request.full_path))
if request.method=='POST':
# handle uploaded files
if not 'file' in request.files:
abort(400) # General UA error
file = request.files["file"]
# We default to using the name of the file provided,
# but allow the filename to be changed via POST details.
fname = file.filename
if 'name' in request.form:
fname = request.form['name']
safe_fname = secure_filename(fname)
if not allowed_filename(safe_fname):
abort(400) # General client error
# We're handling a potentially dangerous path, better run it through
# The flask path jointer.
basepath=app.config.get("storage", "location")
fullpath = safe_join(basepath, path)
file.save(os.path.join(fullpath,fname))
flash("File uploaded successfully")
return redirect(url_for('browser.upload',path=path))
else:
return render_template("browser/upload.html",path=path,title="Upload Files")
def __call__(self, f):
@functools.wraps(f)
def my_decorator(*args, **kwargs):
global _is_cached
_is_cached = True
if _cache is None:
return f(*args, **kwargs)
response = _cache.get(request.full_path)
if response is None:
response = f(*args, **kwargs)
_cache.set(request.path, response, self.timeout)
_is_cached = False
if self.render_layout:
wrapped_response = make_response("%s%s%s" % (render_template("layout_header.html"), response, render_template("layout_footer.html")))
if is_response(response):
wrapped_response.status_code = response.status_code
wrapped_response.headers = response.headers
wrapped_response.status = response.status
wrapped_response.mimetype = response.mimetype
return wrapped_response
else:
return response
functools.update_wrapper(my_decorator, f)
return my_decorator
def setup_session():
session.permanent = True
app.permanent_session_lifetime = timedelta(days=365*30)
if not 'uuid' in session:
session['uuid'] = str(uuid.uuid4())
g.uuid_is_fresh = True
else:
g.uuid_is_fresh = False
now = datetime.now()
referrer = request.headers.get('Referer', '')
path = request.path
full_path = request.full_path
agent = request.headers.get('User-Agent', '')
if agent in BLACKLIST_AGENT or len(agent) < 15:
g.request_log_id = 0
return render_template('error.html',code=200,message="Layer 8 error. If you want my data, DON'T SCRAPE (too much cpu load), contact me and I will give it to you"), 200
with db_session:
req_log = RequestLog( uuid=session['uuid'],
uuid_is_fresh=g.uuid_is_fresh,
created_at=now,
agent=agent,
referrer=referrer,
path=path,
full_path=full_path)
flush()
g.request_log_id = req_log.id
def login_required(func):
"""decorator for login required
This decorator can be used for declaring a function
which requires authentication
"""
@wraps(func)
def handle_login_required(*args, **kwargs):
# try to get username from session
try:
username = session["username"]
return func(*args, **kwargs)
except:
pass
# allow HTTP basic auth
try:
user_basic = request.authorization.username
password_basic = request.authorization.password
authprovider = security.AuthenticationProvider.get_authprovider()
if authprovider.authenticate(user_basic, password_basic):
return func(*args, **kwargs)
except:
pass
# on error: redirect to login page or return HTTP/401
if json_check():
return json_error("Unauthenticated", 401)
session["redirect"] = request.full_path
return redirect("/login")
return handle_login_required
def before_request():
logger.info('feedback request', url_path=request.full_path)
metadata = get_metadata(current_user)
if metadata:
logger.bind(tx_id=metadata['tx_id'])
g.schema_json = load_schema_from_metadata(metadata)
def before_request():
metadata = get_metadata(current_user)
if metadata:
logger.bind(tx_id=metadata['tx_id'])
values = request.view_args
logger.bind(eq_id=values['eq_id'], form_type=values['form_type'],
ce_id=values['collection_id'])
logger.info('questionnaire request', method=request.method, url_path=request.full_path)
if metadata:
g.schema_json = load_schema_from_metadata(metadata)
_check_same_survey(values['eq_id'], values['form_type'], values['collection_id'])
def upload(path=""):
"""
Uploads a file to a place.
"""
# Check that uploads are OK
if not can_upload():
return redirect(sign_auth_path(request.full_path))
if request.method=='POST':
# handle uploaded files
if not 'file' in request.files:
abort(400) # General UA error
file = request.files["file"]
# We default to using the name of the file provided,
# but allow the filename to be changed via POST details.
fname = file.filename
if 'name' in request.form:
fname = request.form['name']
safe_fname = secure_filename(fname)
if not allowed_filename(safe_fname):
abort(400) # General client error
# We're handling a potentially dangerous path, better run it through
# The flask path jointer.
basepath=app.config.get("storage", "location")
fullpath = safe_join(basepath, path)
file.save(os.path.join(fullpath,fname))
flash("File uploaded successfully")
return redirect(url_for('browser.upload',path=path))
else:
return render_template("browser/upload.html",path=path,title="Upload Files")
def cached(timeout=1 * 60, key='view'):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
cache_key = '{}/{}'.format(key, request.full_path)
rv = cache.get(cache_key)
if rv is not None:
return rv
rv = f(*args, **kwargs)
cache.set(cache_key, rv, timeout=timeout)
return rv
return decorated_function
return decorator
def __call__(self, *args, **kwargs):
if not current_app.config.get('INDEXING_ENABLED', True):
return self._route(*args, **kwargs)
log = PageView(
page=request.full_path,
endpoint=request.endpoint,
user_id=current_user.id,
ip_address=request.remote_addr,
version=__version__
)
errorlog = None
log.object_id, log.object_type, log.object_action, reextract_after_request = self.extract_objects(*args, **kwargs)
db_session.add(log) # Add log here to ensure pageviews are accurate
try:
return self._route(*args, **kwargs)
except Exception as e:
db_session.rollback() # Ensure no lingering database changes remain after crashed route
db_session.add(log)
errorlog = ErrorLog.from_exception(e)
db_session.add(errorlog)
db_session.commit()
raise_with_traceback(e)
finally:
# Extract object id and type after response generated (if requested) to ensure
# most recent data is collected
if reextract_after_request:
log.object_id, log.object_type, log.object_action, _ = self.extract_objects(*args, **kwargs)
if errorlog is not None:
log.id_errorlog = errorlog.id
db_session.add(log)
db_session.commit()
def after_request(response):
logger.info('%s %s %s %s %s', request.remote_addr, request.method,
request.scheme, request.full_path, response.status)
return response
def exceptions(e):
tb = traceback.format_exc()
logger.error('%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s',
request.remote_addr, request.method,
request.scheme, request.full_path, tb)
return e.status_code
def after_request(response):
logger.info('%s %s %s %s %s', request.method,
request.environ.get('HTTP_X_REAL_IP', request.remote_addr),
request.scheme, request.full_path, response.status)
return response
def exceptions(e):
tb = traceback.format_exc()
tb = tb.decode('utf-8')
logger.error('%s %s %s %s 5xx INTERNAL SERVER ERROR\n%s',
request.environ.get('HTTP_X_REAL_IP', request.remote_addr),
request.method, request.scheme, request.full_path, tb)
return '500 INTERNAL SERVER ERROR', 500
def _before_request(self):
if self._has_token_checker is None or self.token_view is None:
return
if request.endpoint != self.token_view and request.endpoint not in self.token_view_overrides and self._has_token_checker() is not None and not self._has_token_checker():
return redirect(url_for(self.token_view, next=request.full_path))
def after_request(response):
timestamp = strftime('[%Y-%b-%d %H:%M]')
logger.error('%s %s %s %s %s %s',timestamp , request.remote_addr , \
request.method , request.scheme , request.full_path , response.status)
return response
def exceptions(e):
tb = traceback.format_exc()
timestamp = strftime('[%Y-%b-%d %H:%M]')
logger.error('%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s',
timestamp, request.remote_addr, request.method,
request.scheme, request.full_path, tb)
return make_response(e , 405)
def ipxe():
"""
iPXE
---
tags:
- matchbox
responses:
200:
description: iPXE script
schema:
type: string
404:
description: Not valid
schema:
type: string
"""
app.logger.info("%s %s" % (request.method, request.url))
try:
matchbox_resp = requests.get(
"%s%s" % (
app.config["MATCHBOX_URI"],
request.full_path))
matchbox_resp.close()
response = matchbox_resp.content.decode()
mac = request.args.get("mac")
if mac:
repositories.machine_state.update(mac.replace("-", ":"), MachineStates.booting)
return Response(response, status=200, mimetype="text/plain")
except requests.exceptions.ConnectionError:
app.logger.warning("404 for /ipxe")
return "404", 404
def create_issue(content, author, location='Discord', repo='PennyDreadfulMTG/Penny-Dreadful-Tools'):
if content is None or content == '':
return None
body = ''
if '\n' in content:
title, body = content.split('\n', 1)
body += '\n\n'
else:
title = content
body += 'Reported on {location} by {author}'.format(location=location, author=author)
if request:
body += textwrap.dedent("""
--------------------------------------------------------------------------------
Request Method: {method}
Path: {full_path}
Cookies: {cookies}
Endpoint: {endpoint}
View Args: {view_args}
Person: {id}
User-Agent: {user_agent}
Referrer: {referrer}
""".format(method=request.method, full_path=request.full_path, cookies=request.cookies, endpoint=request.endpoint, view_args=request.view_args, id=session.get('id', 'logged_out'), user_agent=request.headers.get('User-Agent'), referrer=request.referrer))
print(title + '\n' + body)
# Only check for github details at the last second to get log output even if github not configured.
if not configuration.get('github_user') or not configuration.get('github_password'):
return None
g = Github(configuration.get('github_user'), configuration.get('github_password'))
repo = g.get_repo(repo)
issue = repo.create_issue(title=title, body=body)
return issue
def after_request(response):
timestamp = strftime('%Y-%b-%d %H:%M:%S')
useragent = request.user_agent
logger.info('[%s] [%s] [%s] [%s] [%s] [%s] [%s] [%s]',
timestamp, request.remote_addr, useragent, request.method,
request.scheme, request.full_path, response.status, request.referrer)
return response
def exceptions(e):
tb = traceback.format_exc()
timestamp = strftime('[%Y-%b-%d %H:%M]')
logger.error('%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s',
timestamp, request.remote_addr, request.method,
request.scheme, request.full_path, tb)
return e.status_code
#Run the main app...
def key_prefix():
cache_key = 'view_%s' % request.full_path
return cache_key
def after_request(response):
timestamp = strftime('[%Y-%b-%d %H:%M]')
f = open("server.log","a").write( "\n"+"--"*10+"\n"+'%s %s %s %s %s %s'%(timestamp, request.remote_addr, request.method, request.scheme, request.full_path, response.status) )
return response
def exceptions(e):
tb = traceback.format_exc()
timestamp = strftime('[%Y-%b-%d %H:%M]')
f = open("server.log","a").write( "\n"+"--"*10+"\n"+'%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s'%(timestamp, request.remote_addr, request.method, request.scheme, request.full_path, tb) )
return abort(500)
def get_api(config, tools, models):
# config
API_NAME = config.get('api_journey', 'name')
DEFAULT_PAGE_SIZE = config.get('api_journey', 'page_size')
CACHE_TIMEOUT_SECS = config.getint('cache', 'timeout')
cache = tools['cache']
mongo = tools['mongo']
auth = tools['auth']
event_model = models['events']
# controller
ctrler = Blueprint(API_NAME, __name__)
def make_cache_key():
"""Make a key that includes GET parameters."""
return request.full_path
# route
@ctrler.route('/actors/<actor_id>/events', methods=['GET'])
@cache.cached(key_prefix=make_cache_key, timeout=CACHE_TIMEOUT_SECS)
def get_actor_events(actor_id):
pagesize = int(request.args.get('_page_size', default=DEFAULT_PAGE_SIZE))
page = int(request.args.get('_page', default=1))
# parse condition
cond = parse_query_to_mongo_cond(request.args, {
'actor.id': actor_id
})
output = event_model.query_by_page(cond, pagesize, page)
return jsonify(output)
@ctrler.route('/events', methods=['GET'])
@auth.login_required
def list_all_events():
return jsonify({'events': [e for e in event_model.find()]})
@ctrler.route('/events', methods=['POST'])
def send_new_event():
# async operation
pub = tools['pubsub']['producer']
event = request.get_json()
pub.publish(event)
return jsonify({'status': 'message is sent to backend for processing', 'event': event}), 201
return {'prefix': '/'+API_NAME, 'ctrler': ctrler}
def ignition():
"""
Ignition
---
tags:
- matchbox
responses:
200:
description: Ignition configuration
schema:
type: dict
403:
description: Matchbox unavailable
schema:
type: text/plain
503:
description: Matchbox is out of sync
schema:
type: text/plain
"""
cache_key = "sync-notify"
last_sync_ts = CACHE.get(cache_key)
app.logger.debug("cacheKey: %s is set with value %s" % (cache_key, last_sync_ts))
# we ignore the sync status if we arrived from /ignition-pxe because it's a discovery PXE boot
if last_sync_ts is None and request.path != "/ignition-pxe":
app.logger.error("matchbox state is out of sync: cacheKey: %s is None" % cache_key)
return Response("matchbox is out of sync", status=503, mimetype="text/plain")
if request.path == "/ignition-pxe":
app.logger.info("%s %s" % (request.method, request.url))
matchbox_uri = application.config.get("MATCHBOX_URI")
if matchbox_uri:
try:
# remove the -pxe from the path because matchbox only serve /ignition
path = request.full_path.replace("/ignition-pxe?", "/ignition?")
matchbox_resp = requests.get("%s%s" % (matchbox_uri, path))
resp = matchbox_resp.content
matchbox_resp.close()
return Response(resp, status=matchbox_resp.status_code, mimetype="text/plain")
except requests.RequestException as e:
app.logger.error("fail to query matchbox ignition %s" % e)
return Response("matchbox doesn't respond", status=502, mimetype="text/plain")
return Response("matchbox=%s" % matchbox_uri, status=403, mimetype="text/plain")