Python flask.request 模块,url_root 属性实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用 flask.request.url_root(注意:url_root 是属性而非方法,不加括号调用)。
def advisory_atom():
    """Render an Atom feed of the 15 most recently published advisories."""
    last_recent_entries = 15
    data = get_advisory_data()['published'][:last_recent_entries]
    feed = AtomFeed('Arch Linux Security - Recent advisories',
                    feed_url=request.url, url=request.url_root)
    for entry in data:
        advisory = entry['advisory']
        package = entry['package']
        title = '[{}] {}: {}'.format(advisory.id, package.pkgname, advisory.advisory_type)
        feed.add(title=title,
                 content=render_template('feed.html', content=advisory.content),
                 content_type='html',
                 summary=render_template('feed.html', content=advisory.impact),
                 # Bug fix: was "summary_tpe", a typo that AtomFeed silently
                 # ignored as an unknown keyword argument.
                 summary_type='html',
                 author='Arch Linux Security Team',
                 url=TRACKER_ISSUE_URL.format(advisory.id),
                 published=advisory.created,
                 updated=advisory.created)
    return feed.get_response()
def rss_feed():
    """Build an Atom feed of all White House briefing-room documents,
    newest first."""
    atom = AtomFeed('White House Briefing Room Releases',
                    feed_url=request.url, url=request.url_root)
    ordered_docs = WhiteHouse.query.order_by(WhiteHouse.document_date.desc())
    for doc in ordered_docs:
        atom.add(doc.title, doc.tweet,
                 content_type='text',
                 author="@presproject2017",
                 url=make_external(doc.full_url),
                 updated=doc.document_date,
                 published=doc.document_date)
    return atom.get_response()
def run_tensorboard(run_id, tflog_id):
    """Launch TensorBoard for a given run ID and log ID of that run."""
    data = current_app.config["data"]
    # Optimistically assume the run exists...
    run = data.get_run(run_id)
    base_dir = Path(run["experiment"]["base_dir"])
    log_dir = Path(run["info"]["tensorflow"]["logdirs"][tflog_id])
    # TODO ugly!!!
    path_to_log_dir = log_dir if log_dir.is_absolute() else base_dir.joinpath(log_dir)
    port = int(tensorboard.run_tensorboard(str(path_to_log_dir)))
    # Extract the hostname from the request root and send the client to the
    # same host on TensorBoard's port.
    host_match = re.search("://([^:/]+)", request.url_root)
    return redirect("http://%s:%d" % (host_match.group(1), port))
def get(self):
    """List supported API versions (currently only v2) as JSON."""
    LOG.debug("API CALL: %s GET" % str(self.__class__.__name__))
    version_list = [{
        "status": "CURRENT",
        "id": "v2",
        "links": [{
            "href": request.url_root + '/v2',
            "rel": "self",
        }],
    }]
    payload = {'versions': version_list}
    return Response(json.dumps(payload), status=200, mimetype='application/json')
def get(self):
    """
    Lists API versions.
    :return: Returns a json with API versions.
    :rtype: :class:`flask.response`
    """
    LOG.debug("API CALL: Neutron - List API Versions")
    version_list = [{
        "status": "CURRENT",
        "id": "v2.0",
        "links": [{
            "href": request.url_root + '/v2.0',
            "rel": "self",
        }],
    }]
    payload = {'versions': version_list}
    return Response(json.dumps(payload), status=200, mimetype='application/json')
def get_feed():
    """Return an Atom feed of recent honeypot sessions; 404s when feed
    authentication is required and the user is not logged in."""
    from mhn.common.clio import Clio
    from mhn.auth import current_user
    authfeed = mhn.config['FEED_AUTH_REQUIRED']
    if authfeed and not current_user.is_authenticated():
        abort(404)
    feed = AtomFeed('MHN HpFeeds Report', feed_url=request.url,
                    url=request.url_root)
    for session in Clio().session.get(options={'limit': 1000}):
        text = u'Sensor "{identifier}" '
        text += '{source_ip}:{source_port} on sensorip:{destination_port}.'
        text = text.format(**session.to_dict())
        feed.add('Feed', text, content_type='text',
                 published=session.timestamp, updated=session.timestamp,
                 url=makeurl(url_for('api.get_session', session_id=str(session._id))))
    return feed
def post(year, month, day, post_name):
    """Render a single blog post, redirecting to its canonical URL first
    if the requested relative URL is not in canonical form."""
    rel_url = request.path[len('/post/'):]
    fixed_rel_url = storage.fix_post_relative_url(rel_url)
    if rel_url != fixed_rel_url:
        # it's not the correct relative url, so redirect
        return redirect(request.url_root + 'post/' + fixed_rel_url)
    post_ = storage.get_post(rel_url, include_draft=False)
    if post_ is None:
        abort(404)
    entry = post_.to_dict()
    del entry['raw_content']
    entry['content'] = get_parser(post_.format).parse_whole(post_.raw_content)
    entry['content'], entry['toc'], entry['toc_html'] = parse_toc(entry['content'])
    entry['url'] = make_abs_url(post_.unique_key)
    return custom_render_template(entry['layout'] + '.html', entry=entry)
def streaming_video(url_root):
    '''Video streaming generator function.

    Yields multipart PNG frames while remote control is active; posts to the
    shutdown endpoint when the Cozmo SDK shuts down.
    '''
    import time
    try:
        while True:
            if remote_control_cozmo:
                image = get_annotated_image()
                img_io = io.BytesIO()
                image.save(img_io, 'PNG')
                img_io.seek(0)
                yield (b'--frame\r\n'
                       b'Content-Type: image/png\r\n\r\n' + img_io.getvalue() + b'\r\n')
            else:
                # Bug fix: asyncio.sleep(.1) without await just creates an
                # un-run coroutine and never sleeps, leaving a busy loop in
                # this synchronous generator. Use a real blocking sleep.
                time.sleep(.1)
    except cozmo.exceptions.SDKShutdown:
        # Tell the main flask thread to shutdown
        requests.post(url_root + 'shutdown')
def atom():
    """Serve the ATOM feed of the news page, rendered from a template."""
    xml = render_template('news/atom.xml', news=latest_news(current_session))
    response = make_response(xml)
    response.headers['Content-Type'] = 'application/atom+xml; charset=utf-8; filename=news-ATOM'
    return response
# This makes output which crashes a feed validator.
# from werkzeug.contrib.atom import AtomFeed
# news=latest_news(current_session)
# feed = AtomFeed('pygame news', feed_url=request.url, url=request.url_root)
# for new in news:
# feed.add(new.title, new.description_html,
# content_type='html',
# author='pygame',
# url='https://www.pygame.org/news.html',
# updated=new.datetimeon,
# published=new.datetimeon)
# return feed.get_response()
def get(self, hash):
    """Re-zip the client folder and return a download URL for it.

    :param hash: md5 hex digest supplied by the caller, compared against the
        digest of the freshly built archive.
    """
    path = 'static' + os.sep + 'client.zip'
    # Remove a stale archive if present; a missing file is fine.
    # Bug fix: was a bare "except:" that swallowed every exception.
    try:
        os.remove(path)
    except OSError:
        pass
    # Bug fix: the ZipFile was never closed on error; 'with' guarantees it.
    with zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED) as archive:
        for root, dirs, files in os.walk(CLIENT_FOLDER):
            for f in files:
                archive.write(os.path.join(root, f))
    # Bug fix: read as bytes — hashlib.md5() rejects str on Python 3 —
    # and close the file handle deterministically.
    with open(path, 'rb') as fp:
        client = fp.read()
    if hash == hashlib.md5(client).hexdigest():
        # NOTE(review): returning 400 when the hash *matches* looks inverted —
        # confirm the intended protocol with callers before flipping it.
        return {"err": "invalid request"}, 400
    else:
        return {"url": request.url_root + path}, 200
def server_netloc():
    """
    Figure out the name of the server end of the request, punting if it's
    the local host or not available.
    """
    parsed = urlparse.urlparse(request.url_root)
    return parsed.netloc
#
# URLs
#
def root_url(path=None):
    """Append *path* (when given) to the request's root URL."""
    if path is None:
        return request.url_root
    return request.url_root + path
def make_external(url):
    """Resolve *url* against the request root, producing an absolute URL."""
    absolute = urljoin(request.url_root, url)
    return absolute
def href_for(self, operation, qs=None, **kwargs):
    """
    Construct an full href for an operation against a resource.
    :parm qs: the query string dictionary, if any
    :param kwargs: additional arguments for path expansion
    """
    url = urljoin(request.url_root, self.url_for(operation, **kwargs))
    if not qs:
        return url
    # Append with '&' if the URL already carries a query string.
    separator = "&" if "?" in url else "?"
    return url + separator + urlencode(qs)
def feed():
    """Return an atom feed for the blog."""
    site_name = app.config.get('SITENAME', 'akamatsu')
    atom = AtomFeed(
        '%s: Recent Posts' % site_name,
        feed_url=request.url,
        url=request.url_root)
    recent = (
        Post.query
        .filter_by(is_published=True, ghost='')
        .order_by(Post.timestamp.desc())
        .limit(15))
    for entry in recent:
        # unicode conversion is needed for the content
        atom.add(
            entry.title,
            markdown.render(entry.content).unescape(),
            content_type='html',
            author=entry.author.username,
            url=url_for('blog.show', slug=entry.slug, _external=True),
            updated=entry.timestamp)
    return atom.get_response()
def get_sign_in_view(target):
    """Drive the Google OAuth2 sign-in flow for the *target* endpoint.

    Three cases based on the query string:
    - ``code`` present (OAuth callback): exchange the code for a session,
      look up the user by e-mail and log them in, or show an error page.
    - ``authorize`` present: redirect the browser to Google's consent page.
    - otherwise: render the plain sign-in page.
    """
    # url_root already ends with '/', so this is "<root><target>".
    signin_url = request.url_root + target
    oauth_service = OAuth2Service(
        name="google",
        client_id=current_app.config["GOOGLE_LOGIN_CLIENT_ID"],
        client_secret=current_app.config["GOOGLE_LOGIN_CLIENT_SECRET"],
        authorize_url=google_params.get("authorization_endpoint"),
        base_url=google_params.get("userinfo_endpoint"),
        access_token_url=google_params.get("token_endpoint"))
    if "code" in request.args:
        # Callback leg: trade the auth code for an authenticated session.
        oauth_session = oauth_service.get_auth_session(
            data={"code": request.args["code"],
                  "grant_type": "authorization_code",
                  "redirect_uri": signin_url},
            decoder=json.loads)
        # base_url is the userinfo endpoint, so GET "" fetches the profile.
        user_data = oauth_session.get("").json()
        user = load_user(user_data["email"])
        if user:
            flask_login.login_user(user)
            return redirect(url_for("index"))
        else:
            error_message = "Not an authorized user ({})".format(user_data["email"])
            return render_template("/sign_in.html", error_message=error_message)
    elif "authorize" in request.args:
        # First leg: send the user to Google's consent screen.
        return redirect(oauth_service.get_authorize_url(
            scope="email",
            response_type="code",
            prompt="select_account",
            redirect_uri=signin_url))
    else:
        return render_template("/sign_in.html")
def get_next_url(self):
    """Returns the URL where we want to redirect to. This will
    always return a valid URL.
    """
    candidates = [request.values.get('next'), request.referrer]
    if self.fallback_endpoint:
        candidates.append(url_for(self.fallback_endpoint))
    for candidate in candidates:
        safe = self.check_safe_root(candidate)
        if safe:
            return safe
    # Nothing safe was found; the app root is always acceptable.
    return request.url_root
def check_safe_root(self, url):
    """Return *url* when it is a safe redirect target, otherwise None."""
    if url is None:
        return None
    if self.safe_roots is None:
        # No whitelist configured: everything is allowed.
        return url
    # A URL inside the same app is deemed to always be safe.
    if url.startswith((request.url_root, '/')):
        return url
    matches = (url for safe_root in self.safe_roots if url.startswith(safe_root))
    return next(matches, None)
def csrf_protect():
    """Reject state-changing requests whose Referer is from another origin."""
    safe_methods = ('GET', 'HEAD', 'OPTIONS', 'TRACE')
    if request.method in safe_methods:
        return
    referer = request.headers.get('Referer')
    if referer is None or different_origin(referer, request.url_root):
        raise Forbidden(description="Referer check failed.")
def required(self, price, **kwargs):
    """API route decorator to request payment for a resource.
    This function stores the resource price in a closure. It will verify
    the validity of a payment, and allow access to the resource if the
    payment is successfully accepted.

    :param price: fixed price, or a callable computing the price from the
        request plus the view's own arguments.
    :param kwargs: extra options forwarded to the payment methods.
    """
    def decorator(fn):
        """Validates payment and returns the original API route."""
        @wraps(fn)
        def _fn(*fn_args, **fn_kwargs):
            # Calculate resource cost; `price` may be a callable.
            nonlocal price
            _price = price(request, *fn_args, **fn_kwargs) if callable(price) else price
            # Need better way to pass server url to payment methods (FIXME)
            if 'server_url' not in kwargs:
                url = urlparse(request.url_root)
                kwargs.update({'server_url': url.scheme + '://' + url.netloc})
            # Continue to the API view if payment is valid or price is 0
            if _price == 0:
                return fn(*fn_args, **fn_kwargs)
            try:
                contains_payment = self.contains_payment(_price, request.headers, **kwargs)
            except BadRequest as e:
                return Response(e.description, BAD_REQUEST)
            if contains_payment:
                return fn(*fn_args, **fn_kwargs)
            else:
                # Get headers for initial 402 response
                payment_headers = {}
                for method in self.allowed_methods:
                    payment_headers.update(method.get_402_headers(_price, **kwargs))
                # Accessing the .files attribute of a request
                # drains the input stream.
                request.files
                raise PaymentRequiredException(payment_headers)
        return _fn
    return decorator
def siteURL(config, request):
    """Return the configured SITE_URL, falling back to the request root
    without its trailing slash."""
    configured = current_app.config.get('SITE_URL')
    if configured is not None:
        return configured
    return request.url_root[0:-1]
def page_not_found(error):
    """404 handler: render the themed error page.

    Bug fix: the original evaluated ``siteURL if siteURL is not None`` where
    ``siteURL`` names the module-level *function*, which is never None — so
    the template received the function object instead of a URL. Resolve the
    site URL from config first, the way the sibling views do.
    """
    site_url = current_app.config.get('SITE_URL')
    if site_url is None:
        site_url = request.url_root[0:-1]
    return render_template_string(
        generate_template(current_app.config, 'error.html'),
        siteURL=site_url,
        path=request.path,
        entry=None,
        error="I'm sorry. I can't find that page.")
def send_doc(path):
    """Serve a documentation file from the configured DOCS location.

    DOCS may be a remote HTTP(S) base URL (responses are proxied; HTML pages
    are wrapped in the site template) or a local directory (HTML fragments
    are wrapped unless they start with a full ``<!DOCTYPE`` document; other
    files are sent as-is). 404s when DOCS is unset or the file is missing.
    """
    siteURL = current_app.config.get('SITE_URL')
    location = current_app.config.get('DOCS')
    if location is None:
        abort(404)
    if location[0:4] == 'http':
        # Remote docs: proxy the upstream response, streaming the body.
        url = location + path
        req = requests.get(url, stream=True, headers={'Connection': 'close'})
        if req.headers['Content-Type'][0:9] == 'text/html':
            # HTML gets wrapped in the site's content template.
            return render_template_string(generate_template(current_app.config, 'content.html'), siteURL=siteURL if siteURL is not None else request.url_root[0:-1], html=req.text, entry=None)
        else:
            # Non-HTML: pass bytes and headers straight through.
            return Response(stream_with_context(req.iter_content()), headers=dict(req.headers))
    else:
        dir = os.path.abspath(location)
        if path.endswith('.html'):
            glob = StringIO()
            try:
                with open(os.path.join(dir, path), mode='r', encoding='utf-8') as doc:
                    # Peek at the first line: a complete document (starting
                    # with <!DOCTYPE) is served verbatim, not wrapped.
                    peeked = doc.readline()
                    if peeked.startswith('<!DOCTYPE'):
                        return send_from_directory(dir, path)
                    glob.write(peeked)
                    for line in doc:
                        glob.write(line)
                # Fragment: wrap the accumulated HTML in the content template.
                return render_template_string(generate_template(current_app.config, 'content.html'), siteURL=siteURL if siteURL is not None else request.url_root[0:-1], html=glob.getvalue(), entry=None)
            except FileNotFoundError:
                abort(404)
        return send_from_directory(dir, path)
def ipxe_boot(node):
    """Serve the rendered iPXE boot script for *node* as plain text."""
    script = config_renderer.ipxe.render(node, request.url_root)
    return Response(script, mimetype='text/plain')
def report(node):
    """Record a provisioning report posted back by *node*.

    Validates the reported config version and content type, stores a
    Provision row, clears per-disk wipe flags, and commits all bookkeeping
    in one transaction. Nodes in maintenance mode skip the bookkeeping.
    """
    if not node.maintenance_mode:
        try:
            node.active_config_version = int(request.args.get('version'))
        except (ValueError, TypeError):
            # Missing or non-integer ?version= parameter.
            return abort(400)
        if request.content_type != 'application/json':
            return abort(400)
        provision = models.Provision()
        provision.node = node
        provision.config_version = node.active_config_version
        # The raw request body is the Ignition config that was applied.
        provision.ignition_config = request.data
        if node.target_config_version == node.active_config_version:
            # Node reached the desired version; snapshot its iPXE config too.
            provision.ipxe_config = config_renderer.ipxe.render(node, request.url_root)
        models.db.session.add(provision)
        models.db.session.add(node)
        # Any requested disk wipe has happened by now; clear the flags.
        node.disks.update({
            models.Disk.wipe_next_boot: False
        })
        if node.cluster.are_etcd_nodes_configured:
            node.cluster.assert_etcd_cluster_exists = True
            models.db.session.add(node.cluster)
        models.db.session.commit()
    return Response('ok', mimetype='application/json')
def get_content(self):
    """Assemble the Ignition configuration document for this node."""
    packages = [cls(self.node, request.url_root)
                for cls in self.get_package_classes()]

    def gather(extract):
        # Flatten the per-package lists into a single list.
        return list(itertools.chain.from_iterable(extract(p) for p in packages))

    files = gather(lambda p: p.get_files())
    units = gather(lambda p: p.get_units())
    networkd_units = gather(lambda p: p.get_networkd_units())
    ssh_keys = self.get_ssh_keys()
    user_entries = [
        {'name': 'root', 'sshAuthorizedKeys': ssh_keys},
        {'name': 'core', 'sshAuthorizedKeys': ssh_keys},
    ]
    return {
        'ignition': {
            'version': '2.0.0',
            'config': {},
        },
        'storage': self.get_storage_config(files),
        'networkd': {'units': networkd_units},
        'passwd': {'users': user_entries},
        'systemd': {'units': units},
    }
def target_ipxe_config_view(self):
    """Render the target iPXE config for the node selected by ?id=."""
    node = self.get_one(request.args.get('id'))
    rendered = config_renderer.ipxe.render(node, request.url_root)
    return Response(rendered, mimetype='text/plain')
def handle_cozmoImage():
    """Serve the camera view: one static frame for Microsoft browsers,
    a multipart video stream for everyone else."""
    if is_microsoft_browser(request):
        return serve_single_image()
    return flask_helpers.stream_video(streaming_video, request.url_root)
def login():
    """Send an unauthenticated user to the SSO login page; authenticated
    users go straight to the index."""
    if g.auth:
        return redirect(url_for("index"))
    sso_return = SpliceURL.Modify(request.url_root, "/sso/").geturl
    query = {
        "sso": True,
        "sso_r": sso_return,
        "sso_p": SSO["SSO.PROJECT"],
        # Token binds the project name to the return URL.
        "sso_t": md5("%s:%s" % (SSO["SSO.PROJECT"], sso_return)),
    }
    SSOLoginURL = SpliceURL.Modify(url=SSO["SSO.URL"], path="/login/", query=query).geturl
    logger.info("User request login to SSO: %s" % SSOLoginURL)
    return redirect(SSOLoginURL)
def logout():
    """Clear all session cookies and bounce the user to the SSO logout URL."""
    SSOLogoutURL = SSO.get("SSO.URL") + "/sso/?nextUrl=" + request.url_root.strip("/")
    resp = make_response(redirect(SSOLogoutURL))
    for cookie_name in ('logged_in', 'username', 'sessionId', 'time', 'Azone'):
        resp.set_cookie(key=cookie_name, value='', expires=0)
    return resp
def atom():
    """Atom feed of the 15 most recent naxsi rules."""
    feed = AtomFeed(title='Recent rules', feed_url=request.url, url=request.url_root,
                    author='Spike', icon=url_for('static', filename='favicon.ico'))
    recent = NaxsiRules.query.order_by(NaxsiRules.sid.desc()).limit(15).all()
    for rule in recent or []:
        feed.add(rule.msg, str(rule),
                 updated=datetime.fromtimestamp(rule.timestamp), id=rule.sid)
    return feed.get_response()
def authorize():
    """Begin the Questrade OAuth2 flow and redirect to the provider."""
    redirect_uri = __get_redirect_uri__(request.url_root)
    questradeAPI = OAuth2Session(client_id, redirect_uri=redirect_uri)
    user_authorization_url, state = questradeAPI.authorization_url(authorization_url)
    # State is checked again in the callback to prevent CSRF.
    session['oauth_state'] = state
    return redirect(user_authorization_url)
def callback():
    """Finish the Questrade OAuth2 flow: exchange the auth code for a token."""
    oauth = OAuth2Session(client_id,
                          redirect_uri=__get_redirect_uri__(request.url_root),
                          state=session['oauth_state'])
    token = oauth.fetch_token(token_url, client_secret=client_secret,
                              authorization_response=request.url)
    __set_session_token__(token)
    return redirect(url_for('.token'))
def make_blog_feed(order=None, limit=15):
    """Build an Atom feed of the newest *limit* posts, sorted by *order*
    (defaults to publish date, newest first)."""
    feed = AtomFeed(
        title="{} - Blog Feed".format(settings.title),
        subtitle=settings.tagline,
        feed_url=request.url,
        url=request.url_root,
        author="Musharraf Omer",
        icon=None,
        logo=None,
        rights="Copyright 2000-2016 - Mushy.ltd",
    )
    sort_order = order or Post.publish_date.desc()
    for post in Post.query.order_by(sort_order).limit(limit).all():
        post.url = url_for('canella-blog.post', slug=post.slug)
        feed.add(
            title=post.title,
            url=post.url,
            content=make_summary(post.body),
            content_type='html',
            summary=post.meta_description,
            updated=post.updated or post.created,
            author="Musharraf Omer",
            published=post.publish_date,
            categories=[{'term': tag.slug, 'label': tag.title} for tag in post.tags],
        )
    return feed
def delhistory():
    """Delete the UA history file; refuse requests from foreign referrers."""
    if not str(request.referrer).startswith(request.url_root):
        return "CSRF ATTEMPT!"
    # Robustness fix: a missing history file previously raised
    # FileNotFoundError (HTTP 500). The desired end state — no file —
    # already holds, so treat it as success.
    try:
        os.unlink('ua-history.txt')
    except FileNotFoundError:
        pass
    return "ok"
def _preemptive_unless(base_url=None, additional_unless=None):
    """Decide whether preemptive caching must be skipped for this request."""
    if base_url is None:
        base_url = request.url_root
    # Short-circuits left-to-right: feature toggle, explicit exception list,
    # then a sanity check that the root is an http(s) URL.
    disabled_for_root = (
        not settings().getBoolean(["devel", "cache", "preemptive"])
        or base_url in settings().get(["server", "preemptiveCache", "exceptions"])
        or not (base_url.startswith("http://") or base_url.startswith("https://"))
    )
    recording_disabled = request.headers.get("X-Preemptive-Record", "yes") == "no"
    if callable(additional_unless):
        return recording_disabled or disabled_for_root or additional_unless()
    return recording_disabled or disabled_for_root
def _preemptive_data(key, path=None, base_url=None, data=None, additional_request_data=None):
    """Assemble the record stored for preemptive caching of this request.

    :param key: plugin identifier, or "_default" for the core UI
    :param path: request path (defaults to the current request's)
    :param base_url: server root (defaults to the current request's)
    :param data: dict, or callable returning a dict, of extra entries
    :param additional_request_data: callable returning extra request data
    :return: the assembled cache-record dict
    """
    if path is None:
        path = request.path
    if base_url is None:
        base_url = request.url_root
    d = dict(path=path,
             base_url=base_url,
             query_string="l10n={}".format(g.locale.language if g.locale else "en"))
    if key != "_default":
        d["plugin"] = key
    # add data if we have any
    if data is not None:
        try:
            if callable(data):
                data = data()
            if data:
                if "query_string" in data:
                    data["query_string"] = "l10n={}&{}".format(g.locale.language, data["query_string"])
                d.update(data)
        # Bug fix: was a bare "except:" which also swallowed SystemExit and
        # KeyboardInterrupt; narrow it to Exception.
        except Exception:
            _logger.exception("Error collecting data for preemptive cache from plugin {}".format(key))
    # add additional request data if we have any
    if callable(additional_request_data):
        try:
            ard = additional_request_data()
            if ard:
                d.update(dict(
                    _additional_request_data=ard
                ))
        except Exception:
            _logger.exception("Error retrieving additional data for preemptive cache from plugin {}".format(key))
    return d
def dict(self):
    """Serialize this record to a plain dict, including a self link."""
    _id = str(self._id)
    self_link = "{}api/rsvps/{}".format(request.url_root, _id)
    return {
        "_id": _id,
        "name": self.name,
        "email": self.email,
        "links": {"self": self_link},
    }
def set_webhook():
    """
    Sets the BicingBot webhook in its Telegram Bot
    :return: HTTP_RESPONSE with 200 OK status and a status message.
    """
    # Telegram only accepts webhooks over HTTPS.
    if not request.url_root.startswith('https'):
        response = 'Bad webhook: https url must be provided for webhook'
        logger.warn(response)
        return response
    bot_response = get_bot().setWebhook('{}/bicingbot'.format(request.url_root))
    logger.debug(bot_response)
    return 'Webhook configured'
def ImageUrlToFile(image_url):
    """images are stored as ad-213213.png in the db. We get them from the website as
    /static/upload-debug/ad-213213.png so we trim them.
    Returns True, filepath"""
    prefix = request.url_root + "static/%s/" % app.config[Constants.KEY_UPLOAD_DIR]
    if not image_url.startswith(prefix):
        return False, ''
    parts = image_url.split('/')
    # Require the URL to be exactly prefix + bare filename (no extra slashes).
    if not parts or (image_url != prefix + parts[-1]):
        return False, ''
    return True, parts[-1]
def FileToImageUrl(image_file):
    """Inverse of ImageUrlToFile: build the public URL for an uploaded image."""
    upload_dir = app.config[Constants.KEY_UPLOAD_DIR]
    return request.url_root + "static/%s/%s" % (upload_dir, image_file)
def feeds_blogs():
    """Global feed generator for latest blogposts across all projects"""
    # Cached for 5 minutes so each request doesn't hit the Pillar API.
    @current_app.cache.cached(60*5)
    def render_page():
        feed = AtomFeed('Blender Cloud - Latest updates',
                        feed_url=request.url, url=request.url_root)
        # Get latest blog posts
        api = system_util.pillar_api()
        latest_posts = Node.all({
            'where': {'node_type': 'post', 'properties.status': 'published'},
            'embedded': {'user': 1},
            'sort': '-_created',
            'max_results': '15'
        }, api=api)
        # Populate the feed
        for post in latest_posts._items:
            author = post.user.fullname
            updated = post._updated if post._updated else post._created
            url = url_for_node(node=post)
            # Truncate the body to a 500-char teaser with a "Read more" link.
            content = post.properties.content[:500]
            content = '<p>{0}... <a href="{1}">Read more</a></p>'.format(content, url)
            feed.add(post.name, str(content),
                     content_type='html',
                     author=author,
                     url=url,
                     updated=updated,
                     published=post._created)
        return feed.get_response()
    return render_page()
def get_next_url(self):
    """Returns the URL where we want to redirect to. This will
    always return a valid URL.
    """
    next_target = self.check_safe_root(request.values.get('next'))
    if next_target:
        return next_target
    referrer_target = self.check_safe_root(request.referrer)
    if referrer_target:
        return referrer_target
    if self.fallback_endpoint:
        fallback = self.check_safe_root(url_for(self.fallback_endpoint))
        if fallback:
            return fallback
    return request.url_root
def check_safe_root(self, url):
    """Return *url* if redirecting to it is safe, otherwise None."""
    if url is None:
        return None
    if self.safe_roots is None:
        # No whitelist configured: anything goes.
        return url
    # A URL inside the same app is deemed to always be safe.
    if url.startswith((request.url_root, '/')):
        return url
    for prefix in self.safe_roots:
        if url.startswith(prefix):
            return url
    return None
def insert_bucket(project, conn):
    """ Creates a new bucket.
    Args:
        project: A string specifying a project ID.
        conn: An S3Connection instance.
    Returns:
        A JSON string representing a bucket.
    """
    bucket_info = request.get_json()
    # TODO: Do the following lookup and create under a lock.
    if conn.lookup(bucket_info['name']) is not None:
        return error('Sorry, that name is not available. '
                     'Please try a different one.', HTTP_CONFLICT)
    index_bucket(bucket_info['name'], project)
    conn.create_bucket(bucket_info['name'])
    # The HEAD bucket request does not return creation_date. This is an
    # inefficient way of retrieving it.
    try:
        bucket = next(bucket for bucket in conn.get_all_buckets()
                      if bucket.name == bucket_info['name'])
    except StopIteration:
        return error('Unable to find bucket after creating it.')
    bucket_url = url_for('get_bucket', bucket_name=bucket.name)
    # url_root keeps its trailing slash and bucket_url starts with '/',
    # so drop the final character to avoid a double slash.
    response = {
        'kind': 'storage#bucket',
        'id': bucket.name,
        'selfLink': request.url_root[:-1] + bucket_url,
        'name': bucket.name,
        'timeCreated': bucket.creation_date,
        'updated': bucket.creation_date
    }
    return Response(json.dumps(response), mimetype='application/json')
def get_bucket(bucket_name, conn):
    """ Returns metadata for the specified bucket.
    Args:
        bucket_name: A string specifying a bucket name.
        conn: An S3Connection instance.
    Returns:
        A JSON string representing a bucket.
    """
    projection = request.args.get('projection') or 'noAcl'
    if projection != 'noAcl':
        return error('projection: {} not supported.'.format(projection),
                     HTTP_NOT_IMPLEMENTED)
    matching = [b for b in conn.get_all_buckets() if b.name == bucket_name]
    if not matching:
        return error('Not Found', HTTP_NOT_FOUND)
    bucket = matching[0]
    bucket_url = url_for('get_bucket', bucket_name=bucket.name)
    metadata = {
        'kind': 'storage#bucket',
        'id': bucket.name,
        # Trim the trailing slash from url_root; bucket_url begins with '/'.
        'selfLink': request.url_root[:-1] + bucket_url,
        'name': bucket.name,
        'timeCreated': bucket.creation_date,
        'updated': bucket.creation_date,
    }
    return Response(json.dumps(metadata), mimetype='application/json')
def get(self, provider):
    """Kick off the OAuth dance for *provider* (only GitHub is supported)."""
    if provider != 'github':
        raise ProviderInvalid(provider)
    callback_url = request.url_root + 'api/v1/auth/callback/github'
    return github.authorize(callback=callback_url)
def _url_for(self, path):
    """Build an absolute URL for *path* under this API's prefix."""
    return "".join((request.url_root, self.api_prefix, path))
def index_discovery():
    """Serve the appr package-discovery HTML page for this registry."""
    host = request.url_root
    domain = request.headers['Host']
    page_template = """<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="appr-package" content="{domain}/{{name}} {host}/appr/api/v1/packages/{{name}}/pull">
</head>
<body>
</body>
</html>"""
    return page_template.format(domain=domain, host=host)