Python flask.request 模块,method() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.request.method()。
def query():
if(request.method == 'POST'):
organisation = request.form['organisation']
email_address = request.form['email_address']
filename = organisation + ".html"
info = db.session.query(User.github_username, User.name).filter_by(organisation = organisation).all()
if(info == []):
job = q.enqueue_call(
func="main.save_info", args=(organisation, email_address, ), result_ttl=5000, timeout=600
)
flash("We shall notify you at " + email_address + " when the processing is complete")
else:
lists = []
for i in info:
lists.append([str(i.github_username), str(i.name)])
get_nodes.creating_objs(lists, organisation)
return render_template(filename, organisation=str(organisation)+'.json')
return render_template('query.html')
def is_hot_dog():
if request.method == 'POST':
if not 'file' in request.files:
return jsonify({'error': 'no file'}), 400
# Image info
img_file = request.files.get('file')
img_name = img_file.filename
mimetype = img_file.content_type
# Return an error if not a valid mimetype
if not mimetype in valid_mimetypes:
return jsonify({'error': 'bad-type'})
# Write image to static directory and do the hot dog check
img_file.save(os.path.join(app.config['UPLOAD_FOLDER'], img_name))
hot_dog_conf = rekognizer.get_confidence(img_name)
# Delete image when done with analysis
os.remove(os.path.join(app.config['UPLOAD_FOLDER'], img_name))
is_hot_dog = 'false' if hot_dog_conf == 0 else 'true'
return_packet = {
'is_hot_dog': is_hot_dog,
'confidence': hot_dog_conf
}
return jsonify(return_packet)
def add():
if request.method == 'POST':
if request.form['submit'] == 'Add':
addr = request.form['addr'].lstrip().rstrip()
f = io.open('blastlist.txt', 'a', encoding="utf-8")
f.write(addr.decode('utf-8') + u'\r\n')
f.close()
return render_template('listadded.html',addr=addr)
elif request.form['submit'] == 'Remove':
addr = request.form['addr'].lstrip().rstrip()
f = io.open('blastlist.txt', 'r', encoding="utf-8")
lines = f.readlines()
f.close()
f = io.open('blastlist.txt', 'w', encoding="utf-8")
for line in lines:
if addr not in line:
f.write(line.decode('utf-8'))
f.close()
return render_template('listremoved.html',addr=addr)
def login():
""" This login function checks if the username & password
match the admin.db; if the authentication is successful,
it passes the id of the user into login_user() """
if request.method == "POST" and \
"username" in request.form and \
"password" in request.form:
username = request.form["username"]
password = request.form["password"]
user = User.get(username)
# If we found a user based on username then compare that the submitted
# password matches the password in the database. The password is stored
# is a slated hash format, so you must hash the password before comparing it.
if user and hash_pass(password) == user.password:
login_user(user, remember=True)
# FIXME! Get this to work properly...
# return redirect(request.args.get("next") or url_for("index"))
return redirect(url_for("index"))
else:
flash(u"Invalid username, please try again.")
return render_template("login.html")
def main_teacher():
tm = teachers_model.Teachers(flask.session['id'])
if request.method == 'POST':
cm = courses_model.Courses()
if "close" in request.form.keys():
cid = request.form["close"]
cm.cid = cid
cm.close_session(cm.get_active_session())
elif "open" in request.form.keys():
cid = request.form["open"]
cm.cid = cid
cm.open_session()
courses = tm.get_courses_with_session()
empty = True if len(courses) == 0 else False
context = dict(data=courses)
return render_template('main_teacher.html', empty=empty, **context)
def add_backnode():
if request.method == 'POST':
node_name = request.form['node_name']
ftp_ip = request.form['ftp_ip']
ftp_port = request.form['ftp_port']
ftp_user = request.form['ftp_user']
ftp_pass = request.form['ftp_pass']
back_node = backhosts.query.filter(or_(backhosts.host_node==node_name,backhosts.ftp_ip==ftp_ip)).all()
if back_node:
flash(u'%s ?????????????!' %node_name)
return render_template('addbacknode.html')
backhost = backhosts(host_node=node_name,ftp_ip=ftp_ip,ftp_port=ftp_port,ftp_user=ftp_user,ftp_pass=ftp_pass)
db.session.add(backhost)
db.session.commit()
flash(u'%s ??????!' %node_name)
return render_template('addbacknode.html')
else:
return render_template('addbacknode.html')
def customer():
if request.method == 'POST':
try:
customer_id = request.form['customer_id']
customer_oper = request.form['customer_oper']
customer = customers.query.filter_by(id=customer_id).first()
if customer_oper == 'stop_back':
customer.customers_status = 1
else:
customer.customers_status = 0
db.session.add(customer)
db.session.commit()
return u"???????"
except Exception, e:
print e
return u"???????"
else:
customer_all = customers.query.all()
return render_template('customers.html',customers=customer_all)
def changepass():
if request.method == 'POST':
# process password change
if request.form['pass1'] == request.form['pass2']:
change_password(session['username'], request.form['pass1'])
log_action(session['uid'], 8)
session.pop('logged_in', None)
session.pop('uid', None)
session.pop('priv', None)
session.pop('username', None)
flash('Your password has been changed. Please login using your new password.')
return redirect(url_for('home'))
else:
flash('The passwords you entered do not match. Please try again.')
return render_template('changepass.html')
return render_template('changepass.html')
#
# EDIT USER PAGE
#
def adduser():
if request.method == 'POST':
if request.form['pass1'] == request.form['pass2']:
if user_exists(request.form['username']) == False:
# create the user
admin = 0
if request.form['status'] == 'admin':
admin = 1
create_user(request.form['username'], request.form['pass1'], admin)
log_action(session['uid'], 10)
flash(request.form['username'] + ' has been created.')
return render_template('adduser.html', acp=session['priv'], username=session['username'])
else:
flash('The username you entered is already in use.')
return render_template('adduser.html', acp=session['priv'], username=session['username'])
else:
flash('The passwords you entered do not match. Please try again.')
return render_template('adduser.html', acp=session['priv'], username=session['username'])
return render_template('adduser.html', acp=session['priv'], username=session['username'])
def login():
"""Login POST handler.
Only runs when ``/login`` is hit with a POST method. There is no GET method
equivilent, as it is handled by the navigation template. Sets the status
code to ``401`` on login failure.
Returns:
JSON formatted output describing success or failure.
"""
log.debug("Entering login, attempting to authenticate user.")
username = request.form['signin_username']
password = request.form['signin_password']
log.debug("Username: {0}".format(username))
if fe.check_auth(username, password):
log.debug("User authenticated. Trying to set session.")
session_id = fe.set_login_id()
session['logged_in'] = session_id
log.debug("Session ID: {0}, returning to user".format(session_id))
return jsonify({ "login": "success" })
log.debug("Username or password not recognized, sending 401.")
response.status = 401
return jsonify({ "login": "failed" })
def json_query_simple(query, query_args=[], empty_ok=False):
"""Do a SQL query that selects one column and dump those values as
a JSON array"""
if request.method != 'GET':
return not_allowed()
try:
cursor = dbcursor_query(query, query_args)
except Exception as ex:
return error(str(ex))
if cursor.rowcount == 0:
cursor.close()
if empty_ok:
return json_response([])
else:
return not_found()
result = []
for row in cursor:
result.append(row[0])
cursor.close()
return json_response(result)
def json_query(query, query_args=[], name = 'name', single = False):
"""Do a SQL query that selects one column containing JSON and dump
the results, honoring the 'expanded' and 'pretty' arguments. If
the 'single' argument is True, the first-returned row will be
returned as a single item instead of an array."""
if request.method != 'GET':
return not_allowed()
try:
cursor = dbcursor_query(query, query_args)
except Exception as ex:
return error(str(ex))
if single and cursor.rowcount == 0:
cursor.close()
return not_found()
result = []
for row in cursor:
this = base_url(None if single else row[0][name])
row[0]['href'] = this
result.append( row[0] if single or is_expanded() else this)
cursor.close()
return json_response(result[0] if single else result)
def new():
form = ProjectForm(request.form)
if request.method == 'POST' and form.validate():
user_repo_path = join('repos', form.name.data)
if os.path.isdir(user_repo_path):
flash(_('This project name already exists'), 'error')
else:
project = Project(form.name.data, current_user)
db.session.add(project)
db.session.commit()
#project.create_project(form.name.data, current_user)
flash(_('Project created successfuly!'), 'info')
return redirect(url_for('branches.view',
project=form.name.data,
branch='master', filename='index'))
return render_template('new.html', form=form)
def html2rst():
if request.method == 'POST':
if request.form.has_key('content'):
input = request.form.get('content')
if not input:
input = 'undefined'
if input != 'undefined':
try:
converted = html2rest(input)
prefetch = None
except:
converted = None
prefetch = input
return render_template('html2rst.html', converted=converted,
prefetch=prefetch)
return render_template('html2rst.html')
def request_validate(view):
@wraps(view)
def wrapper(*args, **kwargs):
endpoint = request.endpoint.partition('.')[-1]
# data
method = request.method
if method == 'HEAD':
method = 'GET'
locations = validators.get((endpoint, method), {})
data_type = {"json": "request_data", "args": "request_args"}
for location, schema in locations.items():
value = getattr(request, location, MultiDict())
validator = FlaskValidatorAdaptor(schema)
result = validator.validate(value)
LOG.info("Validated request %s: %s" % (location, result))
if schema.get("maxProperties") == 0:
continue
else:
kwargs[data_type[location]] = result
context = request.environ['context']
return view(*args, context=context, **kwargs)
return wrapper
def update_content_in_local_cache(url, content, method='GET'):
"""?? local_cache ??????, ??content
?stream?????"""
if local_cache_enable and method == 'GET' and cache.is_cached(url):
info_dict = cache.get_info(url)
resp = cache.get_obj(url)
resp.set_data(content)
# ???????????content?, without_content ????true
# ?????????, ???content????, ????????
# ?stream???, ??????http?, ???????, ????????????????
# ?????????????????????, ???????????????
info_dict['without_content'] = False
if verbose_level >= 4: dbgprint('LocalCache_UpdateCache', url, content[:30], len(content))
cache.put_obj(
url,
resp,
obj_size=len(content),
expires=get_expire_from_mime(parse.mime),
last_modified=info_dict.get('last_modified'),
info_dict=info_dict,
)
def request_remote_site():
"""
???????(high-level), ????404/500??? domain_guess ??
"""
# ????????
# ??: ?zmirror?????????, ??????????????
parse.remote_response = send_request(
parse.remote_url,
method=request.method,
headers=parse.client_header,
data=parse.request_data_encoded,
)
if parse.remote_response.url != parse.remote_url:
warnprint("requests's remote url", parse.remote_response.url,
'does no equals our rewrited url', parse.remote_url)
if 400 <= parse.remote_response.status_code <= 599:
# ??url????????
dbgprint("Domain guessing for", request.url)
result = guess_correct_domain()
if result is not None:
parse.remote_response = result
def reset_api_key(name):
"""
Reset API-KEY for user.
Returns a Jinja2 template.
"""
if request.method == 'POST':
user = user_repo.get_by_name(name)
if not user:
return abort(404)
ensure_authorized_to('update', user)
user.api_key = model.make_uuid()
user_repo.update(user)
cached_users.delete_user_summary(user.name)
msg = gettext('New API-KEY generated')
flash(msg, 'success')
return redirect_content_type(url_for('account.profile', name=name))
else:
csrf = dict(form=dict(csrf=generate_csrf()))
return jsonify(csrf)
def password_required(short_name):
(project, owner, n_tasks, n_task_runs,
overall_progress, last_activity,
n_results) = project_by_shortname(short_name)
form = PasswordForm(request.form)
if request.method == 'POST' and form.validate():
password = request.form.get('password')
cookie_exp = current_app.config.get('PASSWD_COOKIE_TIMEOUT')
passwd_mngr = ProjectPasswdManager(CookieHandler(request, signer, cookie_exp))
if passwd_mngr.validates(password, project):
response = make_response(redirect(request.args.get('next')))
return passwd_mngr.update_response(response, project, get_user_id_or_ip())
flash(gettext('Sorry, incorrect password'))
return render_template('projects/password.html',
project=project,
form=form,
short_name=short_name,
next=request.args.get('next'))
def publish(short_name):
(project, owner, n_tasks, n_task_runs,
overall_progress, last_activity,
n_results) = project_by_shortname(short_name)
#### shruthi
if("sched" in project.info.keys() and project.info["sched"]=="FRG"):
if(project.owner_id==current_user.id and not cached_users.is_quiz_created(current_user.id, project)):
flash("You did not created quiz.Please create the quiz","danger")
return redirect(url_for('quiz.create_quiz', short_name=project.short_name))
#### end
pro = pro_features()
ensure_authorized_to('publish', project)
if request.method == 'GET':
return render_template('projects/publish.html',
project=project,
pro_features=pro)
project.published = True
project_repo.save(project)
task_repo.delete_taskruns_from_project(project)
result_repo.delete_results_from_project(project)
webhook_repo.delete_entries_from_project(project)
auditlogger.log_event(project, current_user, 'update', 'published', False, True)
flash(gettext('Project published! Volunteers will now be able to help you!'))
return redirect(url_for('.details', short_name=project.short_name))
def users(user_id=None):
"""Manage users of PYBOSSA."""
form = SearchForm(request.body)
users = [user for user in user_repo.filter_by(admin=True)
if user.id != current_user.id]
if request.method == 'POST' and form.user.data:
query = form.user.data
found = [user for user in user_repo.search_by_name(query)
if user.id != current_user.id]
[ensure_authorized_to('update', found_user) for found_user in found]
if not found:
flash("<strong>Ooops!</strong> We didn't find a user "
"matching your query: <strong>%s</strong>" % form.user.data)
response = dict(template='/admin/users.html', found=found, users=users,
title=gettext("Manage Admin Users"),
form=form)
return handle_content_type(response)
response = dict(template='/admin/users.html', found=[], users=users,
title=gettext("Manage Admin Users"), form=form)
return handle_content_type(response)
def new_item(category):
if request.method == 'GET':
return render('newitem.html', category=category)
elif request.method == 'POST':
name = request.form['name']
highlight = request.form['highlight']
url = request.form['url']
if valid_item(name, url, highlight):
user_id = login_session['user_id']
item = Item(name=name, highlight=highlight, url=url, user_id=user_id, category_id=category.id)
session.add(item)
session.commit()
flash("Newed item %s!" % item.name)
return redirect(url_for('show_item', category_id=category.id, item_id=item.id))
else:
error = "Complete info please!"
return render('newitem.html', category=category, name=name, highlight=highlight, url=url,
error=error)
def edit_item(category, item):
if request.method == 'GET':
return render('edititem.html', category=category, item=item, name=item.name, highlight=item.highlight,
url=item.url)
elif request.method == 'POST':
name = request.form['name']
highlight = request.form['highlight']
url = request.form['url']
if valid_item(name, url, highlight):
item.name = name
item.highlight = highlight
item.url = url
session.commit()
flash("Edited item %s!" % item.name)
return redirect(url_for('show_item', category_id=category.id, item_id=item.id))
else:
error = "Complete info please!"
return render('edititem.html', category=category, item=item, name=name, highlight=highlight,
url=url, error=error)
def new_category():
if request.method == 'GET':
return render('newcategory.html')
elif request.method == 'POST':
name = request.form['name']
description = request.form['description']
if valid_category(name, description):
user_id = login_session['user_id']
category = Category(name=name, description=description, user_id=user_id)
session.add(category)
session.commit()
flash("Newed category %s!" % category.name)
return redirect(url_for("show_items", category_id=category.id))
else:
error = "Complete info please!"
return render('newcategory.html', name=name, description=description, error=error)
def edit_category(category):
if request.method == 'GET':
return render('editcategory.html', category=category, name=category.name,
description=category.description)
elif request.method == 'POST':
name = request.form['name']
description = request.form['description']
if valid_category(name, description):
category.name = name
category.description = description
session.commit()
flash("Edited category %s!" % category.name)
return redirect(url_for("show_items", category_id=category.id))
else:
error = "Complete info please!"
return render('editcategory.html', category=category, name=name, description=description, error=error)
def modify_blog_posts(id):
""" RESTful routes for fetching, updating, or deleting a specific blog post """
post = BlogPost.query.get(id)
if not post:
return jsonify({"message": "No blog post found with id " + str(id)}), 404
if request.method == 'GET':
return post
elif request.method == 'PUT':
data = request.form
post.content = data["blog_content"]
db.session.add(post)
db.session.commit()
return jsonify({"message": "Success!"}), 200
else:
db.session.delete(post)
db.session.commit()
return jsonify({"message": "Success!"}), 200
def userlist():
u=User()
form=AddUserForm()
flag=current_user.is_administrator(g.user)
if flag is True:
userlist=u.GetUserList()
jsondata=request.get_json()
if request.method == 'POST' and jsondata:
if jsondata['action'] == u'edit':
username=jsondata['username']
location=url_for('.admin_edit_profile',username=username)
return jsonify({"status":302,"location":location})
else:
username=jsondata['username']
u.RemUser(username)
return redirect('userlist')
elif request.method == 'POST' and form.validate():
pwd=u.GetPassword(g.user)
if u.verify_password(form.oripassword.data):
u.AddUser(form.username.data,form.password.data,form.role.data,form.email.data)
return redirect('userlist')
else:
return render_template('userlist.html',userlist=userlist,form=form)
else:
abort(403)
def edit_profile():
form=EditProfileForm()
u=User()
if request.method == 'POST' and form.validate():
pwd=u.GetPassword(g.user)
if u.verify_password(form.oripassword.data):
email=form.email.data
aboutme=form.about_me.data
if form.password.data is not u'':
u.ChangePassword(g.user,form.password.data)
u.ChangeProfile(g.user,email,aboutme)
flash('??????')
return redirect(url_for('.user',username=g.user))
else:
flash('????????')
u.GetUserInfo(g.user)
form.email.data=u.email
form.about_me.data=u.aboutme
return render_template('edit_profile.html',form=form,u=u)
def nodes_node(name):
name = serializer.loads(name)
node = NodeDefender.db.node.get(name)
if request.method == 'GET':
return render_template('frontend/nodes/node.html', Node = node)
if icpeform.Submit.data and icpeform.validate_on_submit():
icpe.alias = BasicForm.alias.data
icpe.comment = BasicForm.comment.data
elif locationform.Submit.data and locationform.validate_on_submit():
icpe.location.street = AddressForm.street.data
icpe.location.city = AddressForm.city.data
icpe.location.geolat = AddressForm.geolat.data
icpe.location.geolong = AddressForm.geolong.data
db.session.add(icpe)
db.session.commit()
return render_template('frontend/nodes/node.html', Node = node)
def login():
if request.method == 'GET':
return redirect(url_for('auth_view.authenticate'))
login_form = LoginForm()
if login_form.validate() and login_form.email.data:
user = NodeDefender.db.user.get(login_form.email.data)
if user is None:
flash('Email or Password Wrong', 'error')
return redirect(url_for('auth_view.login'))
if not user.verify_password(login_form.password.data):
flash('Email or Password Wrong', 'error')
return redirect(url_for('auth_view.login'))
if not user.enabled:
flash('Account Locked', 'error')
return redirect(url_for('auth_view.login'))
if login_form.remember():
login_user(user, remember = True)
else:
login_user(user)
return redirect(url_for('index'))
def register():
if request.method == 'GET':
return redirect(url_for('auth_view.authenticate'))
register_form = RegisterForm()
if register_form.validate() and register_form.email.data:
email = register_form.email.data
firstname = register_form.firstname.data
lastname = register_form.lastname.data
NodeDefender.db.user.create(email, firstname, lastname)
NodeDefender.db.user.enable(email)
NodeDefender.db.user.set_password(email, register_form.password.data)
NodeDefender.mail.user.confirm_user(email)
flash('Register Successful, please login', 'success')
else:
flash('Error doing register, please try again', 'error')
return redirect(url_for('auth_view.authenticate'))
flash('error', 'error')
return redirect(url_for('auth_view.authenticate'))
def register_token(token):
user = NodeDefender.db.user.get(serializer.loads_salted(token))
if user is None:
flash('Invalid Token', 'error')
return redirect(url_for('index'))
register_form = RegisterTokenForm()
if request.method == 'GET':
return render_template('frontend/auth/register.html', RegisterForm =
register_form, user = user)
if register_form.validate_on_submit():
user.firstname = register_form.firstname.data
user.lastname = register_form.lastname.data
user.enabled = True
user.confirmed_at = datetime.now()
NodeDefender.db.user.save_sql(user)
NodeDefender.db.user.set_password(user.email, register_form.password.data)
NodeDefender.mail.user.confirm_user(user.email)
flash('Register Successful, please login', 'success')
else:
flash('Error doing register, please try again', 'error')
return redirect(url_for('auth_view.login'))
return redirect(url_for('auth_view.login'))
def admin_server():
MQTTList = NodeDefender.db.mqtt.list(user = current_user.email)
MQTT = CreateMQTTForm()
if request.method == 'GET':
return render_template('frontend/admin/server.html',
MQTTList = MQTTList, MQTTForm = MQTT)
if MQTT.Submit.data and MQTT.validate_on_submit():
try:
NodeDefender.db.mqtt.create(MQTT.IPAddr.data, MQTT.Port.data)
NodeDefender.mqtt.connection.add(MQTT.IPAddr.data, MQTT.Port.data)
except ValueError as e:
flash('Error: {}'.format(e), 'danger')
return redirect(url_for('admin_view.admin_server'))
if General.Submit.data and General.validate_on_submit():
flash('Successfully updated General Settings', 'success')
return redirect(url_for('admin_server'))
else:
flash('Error when trying to update General Settings', 'danger')
return redirect(url_for('admin_view.admin_server'))
flash('{}'.format(e), 'success')
return redirect(url_for('admin_view.admin_server'))
def admin_groups():
GroupForm = CreateGroupForm()
groups = NodeDefender.db.group.list(user_mail = current_user.email)
if request.method == 'GET':
return render_template('frontend/admin/groups.html', groups = groups,
CreateGroupForm = GroupForm)
else:
if not GroupForm.validate_on_submit():
flash('Form not valid', 'danger')
return redirect(url_for('admin_view.admin_groups'))
try:
group = NodeDefender.db.group.create(GroupForm.Name.data)
NodeDefender.db.group.update(group.name, **\
{'email' : GroupForm.Email.data,
'description' :
GroupForm.description.data})
except ValueError as e:
flash('Error: {}'.format(e), 'danger')
return redirect(url_for('admin_view.admin_groups'))
flash('Successfully Created Group: {}'.format(group.name), 'success')
return redirect(url_for('admin_view.admin_group', name =
serializer.dumps(group.name)))
def admin_users():
UserForm = CreateUserForm()
if request.method == 'GET':
if current_user.superuser:
users = NodeDefender.db.user.list()
else:
groups = NodeDefender.db.group.list(current_user.email)
groups = [group.name for group in groups]
users = NodeDefender.db.user.list(*groups)
return render_template('frontend/admin/users.html', Users = users,\
CreateUserForm = UserForm)
if not UserForm.validate():
flash('Error adding user', 'danger')
return redirect(url_for('admin_view.admin_users'))
try:
user = NodeDefender.db.user.create(UserForm.Email.data,
UserForm.Firstname.data,
UserForm.Lastname.data)
except ValueError as e:
flash('Error: {}'.format(e), 'danger')
redirect(url_for('admin_view.admin_users'))
flash('Successfully added user {}'.format(user.firstname), 'success')
return redirect(url_for('admin_view.admin_user', email = user.email))
def admin_user(email):
email = serializer.loads(email)
usersettings = UserSettings()
userpassword = UserPassword()
usergroupadd = UserGroupAdd()
user = NodeDefender.db.user.get(email)
if request.method == 'GET':
if user is None:
flash('User {} not found'.format(id), 'danger')
return redirect(url_for('admin_view.admin_groups'))
return render_template('frontend/admin/user.html', User = user, UserSettings =
usersettings, UserPassword = userpassword,
UserGroupAdd = usergroupadd)
if usersettings.Email.data and usersettings.validate():
NodeDefender.db.user.update(usersettings.Email.data, **\
{'firstname' : usersettings.Firstname.data,
'lastname' : usersettings.Lastname.data})
return redirect(url_for('admin_view.admin_user', email =
serializer.dumps(usersettings.Email.data)))
def protect(self):
if request.method not in current_app.config['WTF_CSRF_METHODS']:
return
try:
validate_csrf(self._get_csrf_token())
except ValidationError as e:
logger.info(e.args[0])
self._error_response(e.args[0])
if request.is_secure and current_app.config['WTF_CSRF_SSL_STRICT']:
if not request.referrer:
self._error_response('The referrer header is missing.')
good_referrer = 'https://{0}/'.format(request.host)
if not same_origin(request.referrer, good_referrer):
self._error_response('The referrer does not match the host.')
g.csrf_valid = True # mark this request as CSRF valid
def register_extension(app):
@app.before_request
def add_correlation_id(*args, **kw):
correlation_id = request.headers.get(CORRELATION_ID)
log.debug("%s %s", request.method, request.url)
if not correlation_id:
correlation_id = str(uuid.uuid4())
if request.method != "GET":
"""
TODO: remove sensitive information such as username/password
"""
log.debug({
"message": "Tracking request",
"correlation_id": correlation_id,
"method": request.method,
"uri": request.url,
"data": request.data,
})
request.correlation_id = correlation_id
@app.after_request
def save_correlation_id(response):
if CORRELATION_ID not in response.headers:
response.headers[CORRELATION_ID] = getattr(request, "correlation_id", None)
return response
def admin_view():
if request.method == 'POST':
username = request.form.get('name')
password = request.form.get('password')
admin_user= Teams.query.filter_by(name=request.form['name'], admin=True).first()
if admin_user and bcrypt_sha256.verify(request.form['password'], admin_user.password):
try:
session.regenerate() # NO SESSION FIXATION FOR YOU
except:
pass # TODO: Some session objects dont implement regenerate :(
session['username'] = admin_user.name
session['id'] = admin_user.id
session['admin'] = True
session['nonce'] = sha512(os.urandom(10))
db.session.close()
return redirect('/admin/graphs')
if is_admin():
return redirect('/admin/graphs')
return render_template('admin/login.html')
def admin_keys(chalid):
if request.method == 'GET':
chal = Challenges.query.filter_by(id=chalid).first_or_404()
json_data = {'keys':[]}
flags = json.loads(chal.flags)
for i, x in enumerate(flags):
json_data['keys'].append({'id':i, 'key':x['flag'], 'type':x['type']})
return jsonify(json_data)
elif request.method == 'POST':
chal = Challenges.query.filter_by(id=chalid).first()
newkeys = request.form.getlist('keys[]')
newvals = request.form.getlist('vals[]')
print(list(zip(newkeys, newvals)))
flags = []
for flag, val in zip(newkeys, newvals):
flag_dict = {'flag':flag, 'type':int(val)}
flags.append(flag_dict)
json_data = json.dumps(flags)
chal.flags = json_data
db.session.commit()
db.session.close()
return '1'
def hello():
form = ReusableForm(request.form)
print(form.errors)
if request.method == 'POST':
name=request.form['name']
password=request.form['password']
email=request.form['email']
print(name, " ", email, " ", password)
if form.validate():
# Save the comment here.
flash('Thank you, ' + name)
else:
flash('Error: All the form fields are required. ')
return render_template('hello.html', form=form)
def login():
if request.method == "POST":
post_data = {
"username": request.form["username"],
"password": request.form["password"],
}
r = requests.post("http://127.0.0.1:8000/api/user/login", data=post_data)
resp = r.json()
if resp["status"] == 1:
session["token"] = r.cookies["token"]
session["api-session"] = r.cookies["flask"]
return redirect("/problems")
else:
flash(resp["message"])
return render_template("login.html")
else:
return render_template("login.html")
def problem(pid):
if "token" not in session:
return redirect("/")
if request.method == "POST":
r = requests.post("http://127.0.0.1:8000/api/problems/submit",
data={
"pid": pid,
"key": request.form["solution"],
},
cookies={
"token": session["token"],
"flask": session["api-session"]
})
app.logger.info(r.text)
app.logger.info(r.status_code)
resp = r.json()
if resp.get("status", 0) == 0:
flash(resp.get("message", "That's not the correct answer"))
return redirect("/problem/" + pid)
else:
return redirect("/problems")
else:
problem = api_get("/api/problems/" + pid)
return render_template("problem.html",
problem=problem["data"])
def __init__(self, options, func, request_context):
self.options = options
self.operation = request.endpoint
self.func = func.__name__
self.method = request.method
self.args = request.args
self.view_args = request.view_args
self.request_context = request_context
self.timing = dict()
self.error = None
self.stack_trace = None
self.request_body = None
self.response_body = None
self.response_headers = None
self.status_code = None
self.success = None
def newMenuItem(restaurant_id):
if request.method == 'POST':
newItem = MenuItem(name=request.form['name'], description=request.form[
'description'], price=request.form['price'], course=request.form['course'], restaurant_id=restaurant_id)
session.add(newItem)
session.commit()
return redirect(url_for('showMenu', restaurant_id=restaurant_id))
else:
return render_template('newMenuItem.html', restaurant_id=restaurant_id)
return render_template('newMenuItem.html', restaurant=restaurant)
# return 'This page is for making a new menu item for restaurant %s'
# %restaurant_id
# Edit a menu item
def editMenuItem(restaurant_id, menu_id):
editedItem = session.query(MenuItem).filter_by(id=menu_id).one()
if request.method == 'POST':
if request.form['name']:
editedItem.name = request.form['name']
if request.form['description']:
editedItem.description = request.form['name']
if request.form['price']:
editedItem.price = request.form['price']
if request.form['course']:
editedItem.course = request.form['course']
session.add(editedItem)
session.commit()
return redirect(url_for('showMenu', restaurant_id=restaurant_id))
else:
return render_template(
'editMenuItem.html', restaurant_id=restaurant_id, menu_id=menu_id, item=editedItem)
# return 'This page is for editing menu item %s' % menu_id
# Delete a menu item
def stop_instance():
res = {}
if request.method == 'POST':
req_data = json.loads(request.data)
db_session = db.Session()
instance_query_result = db_session.query(Instance).filter(\
Instance.container_serial == req_data['container_serial']).first()
if instance_query_result is not None:
policy = {}
policy['operate'] = app.config['STOP']
policy['container_serial'] = req_data['container_serial']
policy['container_name'] = instance_query_result.container_name
policy['user_name'] = req_data['user_name']
message = json.dumps(policy)
ui_mq = UiQueue()
worker_res = ui_mq.send(message)
worker_res_dict = json.loads(worker_res)
res['code'] = worker_res_dict['code']
res['message'] = worker_res_dict['message']
res['container_serial'] = worker_res_dict['container_serial']
eagle_logger.info(res['message'])
else:
res['code'] = '0x9'
res['message'] = 'container not exist'
return jsonify(**res)
def restart_instance():
res = {}
if request.method == 'POST':
req_data = json.loads(request.data)
db_session = db.Session()
instance_query_result = db_session.query(Instance).filter(\
Instance.container_serial == req_data['container_serial']).first()
if instance_query_result is not None:
policy = {}
policy['operate'] = app.config['RESTART']
policy['container_serial'] = req_data['container_serial']
policy['container_name'] = instance_query_result.container_name
policy['user_name'] = req_data['user_name']
message = json.dumps(policy)
ui_mq = UiQueue()
worker_res = ui_mq.send(message)
worker_res_dict = json.loads(worker_res)
res = worker_res_dict
eagle_logger.info(res['message'])
else:
res['code'] = '0x9'
res['message'] = 'container not exist'
return jsonify(**res)
def remove_instance():
res = {}
if request.method == 'POST':
req_data = json.loads(request.data)
db_session = db.Session()
instance_query_result = db_session.query(Instance).filter(\
Instance.container_serial == req_data['container_serial']).first()
if instance_query_result is not None:
policy = {}
policy['operate'] = app.config['REMOVE']
policy['container_serial'] = req_data['container_serial']
policy['user_name'] = req_data['user_name']
message = json.dumps(policy)
ui_mq = UiQueue()
worker_res = ui_mq.send(message)
worker_res_dict = json.loads(worker_res)
res['code'] = worker_res_dict['code']
res['message'] = worker_res_dict['message']
res['container_serial'] = worker_res_dict['container_serial']
eagle_logger.info(res['message'])
else:
res['code'] = '0x9'
res['message'] = 'container not exist'
return jsonify(**res)
def tail():
if request.method == 'POST':
fi = request.form['file']
if os.path.isfile(fi):
n = int(request.form['n'])
le = io.open(fi, 'r', encoding='utf-8')
taildata = le.read()[-n:]
le.close()
else:
taildata = "No such file."
return render_template('tail.html',taildata=taildata)