Python flask.request 模块,path() 实例源码
我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用flask.request.path()。
def build_sitemap():
from redberry.models import RedPost, RedCategory
from apesmit import Sitemap
sm = Sitemap(changefreq='weekly')
for post in RedPost.all_published():
sm.add(url_for('redberry.show_post', slug=post.slug, _external=True), lastmod=post.updated_at.date())
for category in RedCategory.query.all():
sm.add(url_for('redberry.show_category', category_slug=category.slug, _external=True), lastmod=category.updated_at.date())
with open(os.path.join(REDBERRY_ROOT, 'static', 'redberry', 'sitemap.xml'), 'w') as f:
sm.write(f)
flash("Sitemap created.", 'success')
return redirect(url_for('redberry.home'))
##############
# ADMIN ROUTES
##############
def after_request(response):
response.headers.add('Access-Control-Allow-Methods', 'GET, POST')
response.headers.add('Access-Control-Allow-Credentials', 'true')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type, *')
response.headers.add('Cache-Control', 'no-cache')
response.headers.add('Cache-Control', 'no-store')
if api.auth.is_logged_in():
if 'token' in session:
response.set_cookie('token', session['token'], domain=app.config['SESSION_COOKIE_DOMAIN'])
else:
csrf_token = api.common.token()
session['token'] = csrf_token
response.set_cookie('token', csrf_token, domain=app.config['SESSION_COOKIE_DOMAIN'])
# JB: This is a hack. We need a better solution
if request.path[0:19] != "/api/autogen/serve/":
response.mimetype = 'application/json'
return response
def make_pagination_headers(limit, curpage, total, link_header=True):
"""Return Link Hypermedia Header."""
lastpage = int(math.ceil(1.0 * total / limit) - 1)
headers = {'X-Total-Count': str(total), 'X-Limit': str(limit),
'X-Page-Last': str(lastpage), 'X-Page': str(curpage)}
if not link_header:
return headers
base = "{}?%s".format(request.path)
links = {}
links['first'] = base % urlencode(dict(request.args, **{PAGE_ARG: 0}))
links['last'] = base % urlencode(dict(request.args, **{PAGE_ARG: lastpage}))
if curpage:
links['prev'] = base % urlencode(dict(request.args, **{PAGE_ARG: curpage - 1}))
if curpage < lastpage:
links['next'] = base % urlencode(dict(request.args, **{PAGE_ARG: curpage + 1}))
headers['Link'] = ",".join(['<%s>; rel="%s"' % (v, n) for n, v in links.items()])
return headers
# pylama:ignore=R0201
def iam_sts_credentials(api_version, requested_role, junk=None):
if not _supports_iam(api_version):
return passthrough(request.path)
try:
role_params = roles.get_role_params_from_ip(
request.remote_addr,
requested_role=requested_role
)
except roles.UnexpectedRoleError:
msg = "Role name {0} doesn't match expected role for container"
log.error(msg.format(requested_role))
return '', 404
log.debug('Providing assumed role credentials for {0}'.format(role_params['name']))
assumed_role = roles.get_assumed_role_credentials(
role_params=role_params,
api_version=api_version
)
return jsonify(assumed_role)
def ip_whitelist_add(ip_to_allow, info_record_dict=None):
"""??ip????, ?????"""
if ip_to_allow in single_ip_allowed_set:
return
dbgprint('ip white added', ip_to_allow, 'info:', info_record_dict)
single_ip_allowed_set.add(ip_to_allow)
is_ip_not_in_allow_range.cache_clear()
append_ip_whitelist_file(ip_to_allow)
# dbgprint(single_ip_allowed_set)
try:
with open(zmirror_root(human_ip_verification_whitelist_log), 'a', encoding='utf-8') as fp:
fp.write(datetime.now().strftime('%Y-%m-%d %H:%M:%S') + " " + ip_to_allow
+ " " + str(request.user_agent)
+ " " + repr(info_record_dict) + "\n")
except: # coverage: exclude
errprint('Unable to write log file', os.path.abspath(human_ip_verification_whitelist_log))
traceback.print_exc()
def check_auth(func):
"""
This decorator for routes checks that the user is authorized (or that no login is required).
If they haven't, their intended destination is stored and they're sent to get authorized.
It has to be placed AFTER @app.route() so that it can capture `request.path`.
"""
if 'login' not in conf:
return func
# inspired by <https://flask-login.readthedocs.org/en/latest/_modules/flask_login.html#login_required>
@functools.wraps(func)
def decorated_view(*args, **kwargs):
if current_user.is_anonymous:
print('unauthorized user visited {!r}'.format(request.path))
session['original_destination'] = request.path
return redirect(url_for('get_authorized'))
print('{} visited {!r}'.format(current_user.email, request.path))
assert current_user.email.lower() in conf.login['whitelist'], current_user
return func(*args, **kwargs)
return decorated_view
def after_request(response):
response.headers.add('Access-Control-Allow-Methods', 'GET, POST')
response.headers.add('Access-Control-Allow-Credentials', 'true')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type, *')
response.headers.add('Cache-Control', 'no-cache')
response.headers.add('Cache-Control', 'no-store')
if api.auth.is_logged_in():
if 'token' in session:
response.set_cookie('token', session['token'])
else:
csrf_token = api.common.token()
session['token'] = csrf_token
response.set_cookie('token', csrf_token)
# JB: This is a hack. We need a better solution
if request.path[0:19] != "/api/autogen/serve/":
response.mimetype = 'appication/json'
return response
def convert(content_type, content):
cachekey = request.path
cachevalue = cache.get(cachekey)
if cachevalue is None:
content = decode(content)
content_type = decode(content_type).decode()
if content_type.startswith('image/svg+xml'):
content = __svg2png(content)
content_type = 'image/png'
cache.set(cachekey, (content, content_type), timeout=CACHE_RETENTION)
else:
content = cachevalue[0]
content_type = cachevalue[1]
response = make_response(content)
response.content_type = content_type
response.cache_control.max_age = CACHE_RETENTION
return response
def init_utils(app):
app.jinja_env.filters['unix_time'] = unix_time
app.jinja_env.filters['unix_time_millis'] = unix_time_millis
app.jinja_env.filters['long2ip'] = long2ip
app.jinja_env.globals.update(pages=pages)
app.jinja_env.globals.update(can_register=can_register)
app.jinja_env.globals.update(mailserver=mailserver)
app.jinja_env.globals.update(ctf_name=ctf_name)
@app.context_processor
def inject_user():
if authed():
return dict(session)
return dict()
@app.before_request
def needs_setup():
if request.path == '/setup' or request.path.startswith('/static'):
return
if not is_setup():
return redirect('/setup')
def after_request(response):
response.headers.add('Access-Control-Allow-Methods', 'GET, POST')
response.headers.add('Access-Control-Allow-Credentials', 'true')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type, *')
response.headers.add('Cache-Control', 'no-cache')
response.headers.add('Cache-Control', 'no-store')
if api.auth.is_logged_in():
if 'token' in session:
response.set_cookie('token', session['token'])
else:
csrf_token = api.common.token()
session['token'] = csrf_token
response.set_cookie('token', csrf_token)
# JB: This is a hack. We need a better solution
if request.path[0:19] != "/api/autogen/serve/":
response.mimetype = 'appication/json'
return response
def _atlassian_jwt_post_token(self):
if not getattr(g, 'ac_client', None):
return dict()
args = request.args.copy()
try:
del args['jwt']
except KeyError:
pass
signature = encode_token(
'POST',
request.path + '?' + urlencode(args),
g.ac_client.clientKey,
g.ac_client.sharedSecret)
args['jwt'] = signature
return dict(atlassian_jwt_post_url=request.path + '?' + urlencode(args))
def log_exception(self, exc_info):
self.logger.error("""
Path: %s
HTTP Method: %s
Client IP Address: %s
User Agent: %s
User Platform: %s
User Browser: %s
User Browser Version: %s
GET args: %s
view args: %s
URL: %s
""" % (
request.path,
request.method,
request.remote_addr,
request.user_agent.string,
request.user_agent.platform,
request.user_agent.browser,
request.user_agent.version,
dict(request.args),
request.view_args,
request.url
), exc_info=exc_info)
def log_request(code='-'):
proto = request.environ.get('SERVER_PROTOCOL')
msg = request.method + ' ' + request.path + ' ' + proto
code = str(code)
if code[0] == '1': # 1xx - Informational
msg = color(msg, attrs=['bold'])
if code[0] == '2': # 2xx - Success
msg = color(msg, color='white')
elif code == '304': # 304 - Resource Not Modified
msg = color(msg, color='cyan')
elif code[0] == '3': # 3xx - Redirection
msg = color(msg, color='green')
elif code == '404': # 404 - Resource Not Found
msg = color(msg, color='yellow')
elif code[0] == '4': # 4xx - Client Error
msg = color(msg, color='red', attrs=['bold'])
else: # 5xx, or any other response
msg = color(msg, color='magenta', attrs=['bold'])
logger.info('%s - - [%s] "%s" %s', request.remote_addr, log_date_time_string(), msg, code)
def authenticated(fn):
"""Mark a route as requiring authentication."""
@wraps(fn)
def decorated_function(*args, **kwargs):
if not session.get('is_authenticated'):
return redirect(url_for('login', next=request.url))
if request.path == '/logout':
return fn(*args, **kwargs)
if (not session.get('name') or
not session.get('email') or
not session.get('institution')) and request.path != '/profile':
return redirect(url_for('profile', next=request.url))
return fn(*args, **kwargs)
return decorated_function
def __call__(self, func):
@wraps(func)
def decorator(*args, **kwargs):
response = func(*args, **kwargs)
if not USE_CACHE:
return response
for path in self.routes:
route = '%s,%s,%s' % (path, json.dumps(self.args), None)
Cache.delete(unicode(route))
user = kwargs.get('user', None) if self.clear_for_user else None
Logger.debug(unicode('clear %s from cache' % (route)))
if user:
route = '%s,%s,%s' % (path, json.dumps(self.args), user)
Cache.delete(unicode(route))
Logger.debug(unicode('clear %s from cache' % (route)))
return response
return decorator
def sitemap():
conn,curr = sphinx_conn()
querysql='SELECT info_hash,create_time FROM film order by create_time desc limit 100'
curr.execute(querysql)
rows=curr.fetchall()
sphinx_close(curr,conn)
sitemaplist=[]
for row in rows:
info_hash = row['info_hash']
mtime = datetime.datetime.fromtimestamp(int(row['create_time'])).strftime('%Y-%m-%d')
url = domain+'hash/{}.html'.format(info_hash)
url_xml = '<url><loc>{}</loc><lastmod>{}</lastmod><changefreq>daily</changefreq><priority>0.8</priority></url>'.format(url, mtime)
sitemaplist.append(url_xml)
xml_content = '<?xml version="1.0" encoding="UTF-8"?><urlset>{}</urlset>'.format("".join(x for x in sitemaplist))
with open('static/sitemap.xml', 'wb') as f:
f.write(xml_content)
f.close()
return send_from_directory(app.static_folder, request.path[1:])
def chan_friends(channel):
"""
If you GET this endpoint, go to /api/v1/channel/<channel>/friend
with <channel> replaced for the channel of the friends you want to get
<channel> can either be an int that matches the channel, or a string
that matches the owner's username
"""
# model = request.path.split("/")[-1]
model = "Friend"
if channel.isdigit():
fields = {"channelId": int(channel)}
else:
fields = {"owner": channel.lower()}
packet, code = generate_response(
model,
request.path,
request.method,
request.values,
data=results,
fields=fields
)
return make_response(jsonify(packet), code)
# There was an error!
# if not str(code).startswith("2"):
# return make_response(jsonify(packet), code)
# NOTE: Not needed currently, but this is how you would check
# TODO: Fix this endpoint to remove timing elements (friends are forever)
# TODO: Use Object.update(**changes) instead of Object(**updated_object).save()
def chan_messages(channel):
"""
If you GET this endpoint, go to /api/v1/channel/<channel>/messages
with <channel> replaced for the messages you want to get
"""
model = "Message"
if channel.isdigit():
fields = {"channelId": int(channel)}
else:
fields = {"owner": channel.lower()}
packet, code = generate_response(
model,
request.path,
request.method,
request.values,
fields=fields
)
return make_response(jsonify(packet), code)
def user_quotes(channel):
"""
If you GET this endpoint, go to /api/v1/channel/<channel>/quote
with <channel> replaced for the channel you want to get quotes for
"""
model = "quote"
if channel.isdigit():
fields = {"channelId": int(channel), "deleted": False}
else:
fields = {"owner": channel.lower(), "deleted": False}
packet, code = generate_response(
model,
request.path,
request.method,
request.values,
fields=fields
)
return make_response(jsonify(packet), code)
def user_commands(channel):
"""
If you GET this endpoint, simply go to /api/v1/channel/<channel>/command
with <channel> replaced for the channel you want to get commands for
"""
model = "Command"
if channel.isdigit():
fields = {"channelId": int(channel), "deleted": False}
else:
fields = {"channelName": channel.lower(), "deleted": False}
packet, code = generate_response(
model,
request.path,
request.method,
request.values,
fields=fields
)
return make_response(jsonify(packet), code)
def after_request_log(response):
name = dns_resolve(request.remote_addr)
current_app.logger.warn(u"""[client {ip} {host}] {http} "{method} {path}" {status}
Request: {method} {path}
Version: {http}
Status: {status}
Url: {url}
IP: {ip}
Hostname: {host}
Agent: {agent_platform} | {agent_browser} | {agent_browser_version}
Raw Agent: {agent}
""".format(method=request.method,
path=request.path,
url=request.url,
ip=request.remote_addr,
host=name if name is not None else '?',
agent_platform=request.user_agent.platform,
agent_browser=request.user_agent.browser,
agent_browser_version=request.user_agent.version,
agent=request.user_agent.string,
http=request.environ.get('SERVER_PROTOCOL'),
status=response.status))
return response
def request_start():
content_type = request.headers.get('Accept') or ''
real_ip = request.headers.get('X-Real-Ip') or ''
Log.info(request.path+' '+format_args(request.args)\
+' '+real_ip\
+' '+content_type)
#Test content_type
# if content_type and content_type not in AVAILABLE_CONTENT_TYPES:
# results = {'message' : 'Content-Type not supported',
# 'message_code' : 8
# }
# return {'error' : 'content-type'}
# return self.render(results, status_code = 405)
def cached(key='view{path}'):
""" Sets up a function to have cached results """
def decorator(func):
""" Gets a cached value or calculates one """
@wraps(func)
def decorated_function(*args, **kwargs):
ckey = cache_key(key, *args, **kwargs)
value = cache.get(ckey)
if value is not None:
return value
print("DB HIT: \"{}\"".format(ckey))
value = func(*args, **kwargs)
cache.set(ckey, value)
return value
return decorated_function
return decorator
def post(year, month, day, post_name):
rel_url = request.path[len('/post/'):]
fixed_rel_url = storage.fix_post_relative_url(rel_url)
if rel_url != fixed_rel_url:
return redirect(request.url_root + 'post/' + fixed_rel_url) # it's not the correct relative url, so redirect
post_ = storage.get_post(rel_url, include_draft=False)
if post_ is None:
abort(404)
post_d = post_.to_dict()
del post_d['raw_content']
post_d['content'] = get_parser(post_.format).parse_whole(post_.raw_content)
post_d['content'], post_d['toc'], post_d['toc_html'] = parse_toc(post_d['content'])
post_d['url'] = make_abs_url(post_.unique_key)
post_ = post_d
return custom_render_template(post_['layout'] + '.html', entry=post_)
def entity(entity_id):
entity = load_entity(entity_id, request.auth)
if entity is None:
raise NotFound()
# load possible duplicates via fingerprint expansion
# fingerprints = expand_fingerprints(entity.fingerprints, request.auth)
query = Query(request.args, prefix='duplicates_', path=request.path,
limit=5)
# query.add_facet('countries', 'Countries', country_label)
duplicates = search_duplicates(entity.id, entity.fingerprints, query,
request.auth)
# load links
query = Query(request.args, prefix='links_', path=request.path,
limit=10)
query.add_facet('schemata', 'Types', link_schema_label)
query.add_facet('remote.countries', 'Countries', country)
links = search_links(entity, query, request.auth)
return render_template("entity.html", entity=entity, links=links,
duplicates=duplicates)
def store_form(form):
entry = FormEntry(form_id=g.page.id)
for f in form:
field = Field.query.filter_by(form_id=g.page.id).filter_by(name=f.name).one_or_none()
if field is None:
continue
field_entry = FieldEntry(field_id=field.id)
data = f.data
if field.type == 'file_input':
file_data = request.files[field.name]
filename = '%s-%s-%s.%s' %(field.name, date_stamp(), str(time.time()).replace('.', ''), os.path.splitext(file_data.filename)[-1])
path = os.path.join(app.config['FORM_UPLOADS_PATH'], secure_filename(filename))
file_data.save(path)
data = filename
field_entry.value = data
db.session.add(field_entry)
entry.fields.append(field_entry)
db.session.add(entry)
db.session.commit()
def route():
args = request.json
args = args if args else {}
cfg = args.get('cfg', None)
log_request(
args.get('user', 'unknown'),
args.get('hostname', 'unknown'),
request.remote_addr,
request.method,
request.path,
args)
try:
endpoint = create_endpoint(request.method, cfg, args)
json, status = endpoint.execute()
except AutocertError as ae:
status = 500
json = dict(errors={ae.name: ae.message})
return make_response(jsonify(json), status)
if not json:
raise EmptyJsonError(json)
return make_response(jsonify(json), status)
def _create_etag(path, recursive, lm=None):
if lm is None:
lm = _create_lastmodified(path, recursive)
if lm is None:
return None
hash = hashlib.sha1()
hash.update(str(lm))
hash.update(str(recursive))
if path.endswith("/files") or path.endswith("/files/sdcard"):
# include sd data in etag
hash.update(repr(sorted(printer.get_sd_files(), key=lambda x: x[0])))
return hash.hexdigest()
def get_full_path(ver):
""" L2-L4 Path Query, use onePath for single paths """
onepath = False
depth = '20'
error = version_chk(ver, ['v1.0', 'v1.1', 'v2'])
if error:
return error
if 'onepath' in request.args:
if request.args['onepath'] == "True":
onepath = True
if 'depth' in request.args:
depth = request.args['depth']
try:
return jsonify(upgrade_api(nglib.query.path.get_full_path(request.args['src'], \
request.args['dst'], {"onepath": onepath, "depth": depth}), ver))
except ResultError as e:
return jsonify(errors.json_error(e.expression, e.message))
def get_routed_path(ver):
""" Routed Paths, accepts vrf """
onepath = False
depth = '20'
vrf = 'default'
error = version_chk(ver, ['v1.0', 'v1.1', 'v2'])
if error:
return error
if 'onepath' in request.args:
if request.args['onepath'] == "True":
onepath = True
if 'depth' in request.args:
depth = request.args['depth']
if 'vrf' in request.args:
vrf = request.args['vrf']
try:
return jsonify(upgrade_api(nglib.query.path.get_routed_path(request.args['src'], \
request.args['dst'], {"onepath": onepath, "depth": depth, \
"VRF": vrf}), ver))
except ResultError as e:
return jsonify(errors.json_error(e.expression, e.message))
def render_redberry(template_name, **kwargs):
# For Accelerated Mobile Pages (AMP, ref: https://www.ampproject.org) use templates with a .amp suffix
if '.amp' in request.path:
kwargs['render_amp'] = True
return render_template(template_name, **kwargs)
def login_required(func):
@wraps(func)
def wrapper(*args, **kwargs):
if request.method == 'POST' :
if not is_authenticated():
abort(401)
else:
return func(*args, **kwargs)
else:
if not is_authenticated():
return redirect("/login/" + "?next=" + request.path)
else:
return func(*args, **kwargs)
return wrapper
def image_list(masterip):
data = {
"user": session['username']
}
# path = request.path[:request.path.rfind("/")]
# path = path[:path.rfind("/")+1]
result = dockletRequest.post("/image/list/", data, masterip)
logger.debug("image" + str(type(result)))
return json.dumps(result)
def monitor_request(comid,infotype,masterip):
data = {
"user": session['username']
}
path = request.path[request.path.find("/")+1:]
path = path[path.find("/")+1:]
path = path[path.find("/")+1:]
logger.debug(path + "_____" + masterip)
result = dockletRequest.post("/monitor/"+path, data, masterip)
logger.debug("monitor" + str(type(result)))
return json.dumps(result)
def monitor_user_request(issue,masterip):
data = {
"user": session['username']
}
path = "/monitor/user/" + str(issue) + "/"
logger.debug(path + "_____" + masterip)
result = dockletRequest.post(path, data, masterip)
logger.debug("monitor" + str(type(result)))
return json.dumps(result)
def jupyter_prefix():
path = request.args.get('next')
if path == None:
return redirect('/login/')
return redirect('/login/'+'?next='+path)
def login_required(func):
@wraps(func)
def wrapper(*args, **kwargs):
global G_usermgr
logger.info ("get request, path: %s" % request.path)
token = request.form.get("token", None)
if (token == None):
return json.dumps({'success':'false', 'message':'user or key is null'})
cur_user = G_usermgr.auth_token(token)
if (cur_user == None):
return json.dumps({'success':'false', 'message':'token failed or expired', 'Unauthorized': 'True'})
return func(cur_user, cur_user.username, request.form, *args, **kwargs)
return wrapper
def get_request_information():
"""
Returns a dictionary of contextual information about the user at the time of logging.
Returns:
The dictionary.
"""
information = {}
if has_request_context():
information["request"] = {
"api_endpoint_method": request.method,
"api_endpoint": request.path,
"ip": request.remote_addr,
"platform": request.user_agent.platform,
"browser": request.user_agent.browser,
"browser_version": request.user_agent.version,
"user_agent":request.user_agent.string
}
if api.auth.is_logged_in():
user = api.user.get_user()
team = api.user.get_team()
groups = api.team.get_groups()
information["user"] = {
"username": user["username"],
"email": user["email"],
"team_name": team["team_name"],
"groups": [group["name"] for group in groups]
}
return information
def teacher_session():
if '/teacher/' in request.path:
if 'credentials' not in flask.session:
return flask.redirect(flask.url_for('index'))
elif not flask.session['is_teacher']:
return flask.redirect(flask.url_for('register'))
def student_session():
if '/student/' in request.path:
if 'credentials' not in flask.session:
return flask.redirect(flask.url_for('index'))
elif not flask.session['is_student']:
return flask.redirect(flask.url_for('register'))
# make sure user is authenticated w/ live session on every request
def manage_session():
# want to go through oauth flow for this route specifically
# not get stuck in redirect loop
if request.path == '/oauth/callback':
return
# allow all users to visit the index page without a session
if request.path == '/' or request.path == '/oauth/logout':
return
# validate that user has valid session
# add the google user info into session
if 'credentials' not in flask.session:
flask.session['redirect'] = request.path
return flask.redirect(flask.url_for('oauth2callback'))
def iam_role_info(api_version, junk=None):
if not _supports_iam(api_version):
return passthrough(request.path)
role_params_from_ip = roles.get_role_params_from_ip(request.remote_addr)
if role_params_from_ip['name']:
log.debug('Providing IAM role info for {0}'.format(role_params_from_ip['name']))
return jsonify(roles.get_role_info_from_params(role_params_from_ip))
else:
log.error('Role name not found; returning 404.')
return '', 404
def iam_role_name(api_version):
if not _supports_iam(api_version):
return passthrough(request.path)
role_params_from_ip = roles.get_role_params_from_ip(request.remote_addr)
if role_params_from_ip['name']:
return role_params_from_ip['name']
else:
log.error('Role name not found; returning 404.')
return '', 404
def load_ip_whitelist_file():
"""?????ip???"""
set_buff = set()
if os.path.exists(zmirror_root(human_ip_verification_whitelist_file_path)):
with open(zmirror_root(human_ip_verification_whitelist_file_path), 'r', encoding='utf-8') as fp:
set_buff.add(fp.readline().strip())
return set_buff
def response_cookies_deep_copy():
"""
It's a BAD hack to get RAW cookies headers, but so far, we don't have better way.
We'd go DEEP inside the urllib's private method to get raw headers
raw_headers example:
[('Cache-Control', 'private'),
('Content-Length', '48234'),
('Content-Type', 'text/html; Charset=utf-8'),
('Server', 'Microsoft-IIS/8.5'),
('Set-Cookie','BoardList=BoardID=Show; expires=Mon, 02-May-2016 16:00:00 GMT; path=/'),
('Set-Cookie','aspsky=abcefgh; expires=Sun, 24-Apr-2016 16:00:00 GMT; path=/; HttpOnly'),
('Set-Cookie', 'ASPSESSIONIDSCSSDSSQ=OGKMLAHDHBFDJCDMGBOAGOMJ; path=/'),
('X-Powered-By', 'ASP.NET'),
('Date', 'Tue, 26 Apr 2016 12:32:40 GMT')]
"""
raw_headers = parse.remote_response.raw._original_response.headers._headers
header_cookies_string_list = []
for name, value in raw_headers:
if name.lower() == 'set-cookie':
if my_host_scheme == 'http://':
value = value.replace('Secure;', '')
value = value.replace(';Secure', ';')
value = value.replace('; Secure', ';')
if 'httponly' in value.lower():
if enable_aggressive_cookies_path_rewrite:
# ??cookie path??, ???path???? /
value = regex_cookie_path_rewriter.sub('path=/;', value)
elif enable_aggressive_cookies_path_rewrite is not None:
# ??HttpOnly Cookies?path???url?
# eg(/extdomains/a.foobar.com): path=/verify; -> path=/extdomains/a.foobar.com/verify
if parse.remote_domain not in domain_alias_to_target_set: # do not rewrite main domains
value = regex_cookie_path_rewriter.sub(
'\g<prefix>=/extdomains/' + parse.remote_domain + '\g<path>', value)
header_cookies_string_list.append(value)
return header_cookies_string_list
def assemble_parse():
"""??????URL???????????URL"""
_temp = decode_mirror_url()
parse.remote_domain = _temp['domain'] # type: str
parse.is_https = _temp['is_https'] # type: bool
parse.remote_path = _temp['path'] # type: str
parse.remote_path_query = _temp['path_query'] # type: str
parse.is_external_domain = is_external_domain(parse.remote_domain)
parse.remote_url = assemble_remote_url() # type: str
parse.url_no_scheme = parse.remote_url[parse.remote_url.find('//') + 2:] # type: str
recent_domains[parse.remote_domain] = True # ?????????
dbgprint('after assemble_parse, url:', parse.remote_url, ' path_query:', parse.remote_path_query)
def delete(short_name):
(project, owner, n_tasks,
n_task_runs, overall_progress, last_activity,
n_results) = project_by_shortname(short_name)
title = project_title(project, "Delete")
ensure_authorized_to('read', project)
ensure_authorized_to('delete', project)
pro = pro_features()
project_sanitized, owner_sanitized = sanitize_project_owner(project, owner, current_user)
if request.method == 'GET':
response = dict(template='/projects/delete.html',
title=title,
project=project_sanitized,
owner=owner_sanitized,
n_tasks=n_tasks,
overall_progress=overall_progress,
last_activity=last_activity,
pro_features=pro,
csrf=generate_csrf())
return handle_content_type(response)
######### this block was edited by shruthi
if("directory_names" in project.info.keys()):
for i in project.info["directory_names"]:
if os.path.exists(i):
shutil.rmtree(i)#deleting the actual folder
project_repo.delete(project)
########## end block
auditlogger.add_log_entry(project, None, current_user)
flash(gettext('Project deleted!'), 'success')
return redirect_content_type(url_for('account.profile', name=current_user.name))
def _check_if_redirect_to_password(project):
cookie_exp = current_app.config.get('PASSWD_COOKIE_TIMEOUT')
passwd_mngr = ProjectPasswdManager(CookieHandler(request, signer, cookie_exp))
if passwd_mngr.password_needed(project, get_user_id_or_ip()):
return redirect(url_for('.password_required',
short_name=project.short_name, next=request.path))
def ratelimit(limit, per, send_x_headers=True,
scope_func=lambda: request.remote_addr,
key_func=lambda: request.endpoint,
path=lambda: request.path):
"""
Decorator for limiting the access to a route.
Returns the function if within the limit, otherwise TooManyRequests error
"""
def decorator(f):
@wraps(f)
def rate_limited(*args, **kwargs):
try:
key = 'rate-limit/%s/%s/' % (key_func(), scope_func())
rlimit = RateLimit(key, limit, per, send_x_headers)
g._view_rate_limit = rlimit
#if over_limit is not None and rlimit.over_limit:
if rlimit.over_limit:
raise TooManyRequests
return f(*args, **kwargs)
except Exception as e:
return error.format_exception(e, target=path(),
action=f.__name__)
return update_wrapper(rate_limited, f)
return decorator