Python flask.request 模块,values() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.request.values()。
def deployment():
    """Create an environment and/or deploy uploaded files into it.

    Reads from the current Flask request:
      * the uploaded ``file`` parts, zipped positionally with the form lists
        ``File Type`` and ``file_code``;
      * ``env``: the environment name;
      * ``isNew``: the literal string ``'true'`` triggers ``createEnv(env)``.

    Returns:
        ``(json error, 500)`` when a ``static`` or ``data`` file has no file
        code; the result of ``deployFiles(env, DATA)`` when a file was
        uploaded; otherwise ``(json "Environment created", 200)``.
    """
    uploads = request.files.getlist('file')
    # getlist() yields an empty list when no 'file' part was sent, so the
    # first element must not be indexed unconditionally (IndexError).
    hasFiles = bool(uploads) and bool(uploads[0])
    DATA = {'files': list(zip(uploads, request.form.getlist('File Type'), request.form.getlist('file_code')))}
    for _upload, fileType, fileCod in DATA['files']:
        if fileType in ('static', 'data') and not fileCod:
            return json.dumps('Error - You should specify a file code for statics and outputs'), 500
    env = request.values['env']
    if request.values['isNew'] == 'true':
        createEnv(env)
    if hasFiles:
        return deployFiles(env, DATA)
    return json.dumps("Environment created"), 200
def crawl():
    """Crawl the site given by the ``url`` request value.

    Request values:
        depth: optional crawl depth; defaults to 1 when absent.
        url: http or https URL to start from; its network location is the
            only allowed domain handed to the crawler.

    Returns:
        ``jsonify(**crawler.crawled)`` on success, otherwise a
        ``(message, 400)`` tuple describing the invalid input.
    """
    raw_depth = request.values.get('depth')
    if raw_depth is None:
        # Only a missing parameter falls back to the default; the original
        # bare ``except:`` also hid every unrelated error.
        depth_limit = 1
    else:
        try:
            depth_limit = int(raw_depth)
        except ValueError:
            return "Depth parameter must be a number", 400

    if 'url' not in request.values:
        return "Missing url parameter", 400

    url = request.values['url']
    parsed_url = urlparse.urlsplit(url)
    if parsed_url.scheme not in ['http', 'https']:
        return "Only http and https protocols are supported", 400
    if parsed_url.netloc == '':
        return "Missing domain", 400

    allowed_domains = [parsed_url.netloc]
    crawler = Crawler(allowed_domains, depth_limit)
    crawler.crawl(url)
    return jsonify(**crawler.crawled)
def verify_slack_username(http_method, api_method, method_acl_userlist):
    """Check the requesting Slack user against an access-control mapping.

    Args:
        http_method: 'GET' reads ``user_name`` from the query string (empty
            string when absent); 'POST' reads it from ``request.values``.
        api_method: key looked up in ``method_acl_userlist``.
        method_acl_userlist: mapping of api method to allowed user names; the
            ``"*"`` entry lists users allowed to call every method.

    Returns:
        None when the call is allowed.

    Raises:
        Exception: for an unsupported ``http_method``, or when the user is
            not in the list configured for ``api_method``.
    """
    if http_method == 'GET':
        caller = request.args.get('user_name', '')
    elif http_method == 'POST':
        caller = request.values['user_name']
    else:
        raise Exception("Error: unsupported http_method(%s)" % (http_method))

    # Users listed under "*" bypass the per-method check.
    if "*" in method_acl_userlist and caller in method_acl_userlist["*"]:
        return None

    # A method without its own entry is not restricted.
    if api_method in method_acl_userlist and caller not in method_acl_userlist[api_method]:
        raise Exception("Error: user(%s) not allowed to call api_method(%s)" % \
            (caller, api_method))
    return None
def join(id):
    """Add the requesting user to the event identified by ``id``.

    The user is resolved by ``find_user`` from the ``user_param`` and
    ``user_param_value`` request values.

    Returns:
        ``not_found()`` when ``find_event`` returns None,
        ``user_not_specified()`` when ``find_user`` returns None, otherwise a
        JSON object with ``joined`` set to True and every other flag False.
    """
    event = find_event(id)
    # Identity comparison: ``== None`` can be hijacked by a custom __eq__.
    if event is None:
        return not_found()
    user = find_user(request.values.get('user_param'), request.values.get('user_param_value'))
    if user is None:
        return user_not_specified()
    join_event(user, event)
    return jsonify({
        'joined': True,
        'rsvps_disabled': False,
        'event_archived': False,
        'over_capacity': False,
        'past_deadline': False,
    })
def chan_friends(channel):
    """
    List the friends of a channel.

    GET /api/v1/channel/<channel>/friend, where <channel> is either the
    numeric channel id or the owner's username (matched lower-cased).
    """
    # An all-digit value selects by channel id, anything else by owner name.
    if channel.isdigit():
        lookup = {"channelId": int(channel)}
    else:
        lookup = {"owner": channel.lower()}

    # NOTE(review): `results` is not defined anywhere inside this function;
    # unless it exists in an enclosing scope this call raises NameError.
    packet, code = generate_response(
        "Friend",
        request.path,
        request.method,
        request.values,
        data=results,
        fields=lookup
    )
    return make_response(jsonify(packet), code)
# There was an error!
# if not str(code).startswith("2"):
# return make_response(jsonify(packet), code)
# NOTE: Not needed currently, but this is how you would check
# TODO: Fix this endpoint to remove timing elements (friends are forever)
# TODO: Use Object.update(**changes) instead of Object(**updated_object).save()
def chan_messages(channel):
    """
    List the messages of a channel.

    GET /api/v1/channel/<channel>/messages, where <channel> is either the
    numeric channel id or the owner's username (matched lower-cased).
    """
    by_id = channel.isdigit()
    fields = {"channelId": int(channel)} if by_id else {"owner": channel.lower()}
    packet, code = generate_response(
        "Message", request.path, request.method, request.values, fields=fields
    )
    response_body = jsonify(packet)
    return make_response(response_body, code)
def user_quotes(channel):
    """
    List the non-deleted quotes of a channel.

    GET /api/v1/channel/<channel>/quote, where <channel> is either the
    numeric channel id or the owner's username (matched lower-cased).
    """
    if channel.isdigit():
        selector_key, selector_value = "channelId", int(channel)
    else:
        selector_key, selector_value = "owner", channel.lower()
    fields = {selector_key: selector_value, "deleted": False}

    packet, code = generate_response(
        "quote", request.path, request.method, request.values, fields=fields
    )
    return make_response(jsonify(packet), code)
def user_commands(channel):
    """
    List the non-deleted commands of a channel.

    GET /api/v1/channel/<channel>/command, where <channel> is either the
    numeric channel id or the channel name (matched lower-cased).
    """
    if channel.isdigit():
        selector_key, selector_value = "channelId", int(channel)
    else:
        selector_key, selector_value = "channelName", channel.lower()
    fields = {selector_key: selector_value, "deleted": False}

    packet, code = generate_response(
        "Command", request.path, request.method, request.values, fields=fields
    )
    return make_response(jsonify(packet), code)
def handle_states():
    '''Return all states as JSON on GET, or create a new state on POST.

    POST requires a ``name`` parameter. The lookup relies on peewee's
    ``get()`` raising ``State.DoesNotExist`` when no row matches:
    http://docs.peewee-orm.com/en/latest/peewee/api.html#SelectQuery.get

    Returns:
        GET: ``(json list, 200)``.
        POST: ``(json, 400)`` when ``name`` is missing, ``(json, 409)`` when
        a state with that name exists, otherwise ``(new state json, 201)``.
    '''
    if request.method == 'GET':
        states = ListStyle().list(State.select(), request)
        return jsonify(states), 200
    elif request.method == 'POST':
        # Validate and read ``name`` from the same source. The original
        # checked request.values but then read request.form, so a name sent
        # in the query string passed validation and failed on the lookup.
        name = request.values.get('name')
        if name is None:
            return jsonify(msg="Missing parameter."), 400
        try:
            State.select().where(State.name == name).get()
        except State.DoesNotExist:
            state = State.create(name=name)
            return jsonify(state.to_dict()), 201
        return jsonify(code=10001, msg="State already exists"), 409
def handle_state_id(state_id):
    '''Return or delete the state with the given id.

    GET responds with the state as JSON; DELETE removes it. No other request
    method is handled here (the function then returns None).

    Keyword arguments:
    state_id: The id of the state from the database.

    Returns:
        ``(json, 404)`` when no state has this id, ``(state json, 200)`` on
        GET, ``(json message, 200)`` on DELETE.
    '''
    try:
        state = State.select().where(State.id == state_id).get()
    except State.DoesNotExist:
        return jsonify(msg="There is no state with this id."), 404
    if request.method == 'GET':
        return jsonify(state.to_dict()), 200
    elif request.method == 'DELETE':
        # Existence was verified above, and building/executing a delete query
        # does not raise DoesNotExist, so the former try/except was dead code.
        State.delete().where(State.id == state_id).execute()
        return jsonify(msg="State deleted successfully."), 200
def handle_sms():
    """Handle an incoming Twilio SMS message.

    Reads the sender (``From``) and text (``Body``) from the request values,
    creates a customer record when the phone number is unknown, hands the
    message to ``messaging`` and returns ``{"success": true}`` encoded with
    jsonpickle.
    """
    sender = request.values["From"]
    text = request.values["Body"]

    # Look the customer up by phone number; first-time senders get a record.
    customer = Customer.get_customer_by_phone_number(sender)
    if not customer:
        new_customer_fields = {CFields.PHONE_NUMBER: sender}
        customer = Customer.create_new(new_customer_fields)
        customer.create()
        customer_lc.on_new_customer()

    messaging.on_message_recieve(customer, text)
    return jsonpickle.encode({'success': True})
def engine():
    """Render the dashboard page for one Docker engine of the current user.

    The engine is selected by the ``uuid`` request value, which is remembered
    in the session; without it the uuid stored in the session is used.

    Returns:
        A redirect to ``/login`` when the session is not logged in, a 401
        tuple when the session has no username, the rendered ``engine.html``
        on success, or ``(error text, 503)`` when anything fails while
        querying the engine.
    """
    if not session.get('login_in', None):
        return redirect('/login')
    if not session.get('username', None):
        return "Error 401, Authentication failed", 401

    requested_uuid = request.values.get('uuid', None)
    if requested_uuid:
        uuid = requested_uuid
        session['uuid'] = uuid
    else:
        uuid = session['uuid']
    username = session['username']

    try:
        engine_row = models.Engine.query.filter_by(uuid = uuid, user_name = username).first()
        base_url = "tcp://" + engine_row.host + ":" + engine_row.port
        docker = DOCKER(base_url = base_url, timeout = 5, version = "auto")
        return render_template('engine.html', host_info = docker.get_info(), usage = docker.monitor())
    except Exception as msg:
        return str(msg), 503
def change_passwd():
    """Change the password of the logged-in user.

    Request values:
        oldpassword: must match the stored password hash.
        newpassword: hashed and stored on success.

    Returns:
        A redirect to ``/login`` when the session is not logged in or has no
        username; otherwise JSON with ``result`` set to
        ``"change sucessfull"`` or ``"change failed"``.
    """
    if not session.get('login_in', None) or not session.get('username', None):
        return redirect('/login')

    oldpassword = request.values['oldpassword']
    newpassword = request.values['newpassword']
    try:
        user = models.User.query.filter_by(username = session['username']).first()
        if not check_password_hash(user.password, oldpassword):
            return jsonify(result="change failed")
        user.password = generate_password_hash(newpassword)
        db.session.add(user)
        db.session.commit()
        # The response text is left as-is: clients may match on it.
        return jsonify(result="change sucessfull")
    except Exception:
        # Narrowed from a bare ``except:`` so SystemExit/KeyboardInterrupt
        # are no longer converted into a "change failed" response.
        db.session.rollback()
        return jsonify(result="change failed")
    finally:
        db.session.close()
def readGcodeFilesForOrigin(origin):
    """List the files stored at ``origin``.

    Request values:
        recursive: truthy string to list sub-folders as well.
        force: truthy string to drop the cached listing for ``origin`` first.

    Returns:
        A 404 response for an origin other than LOCAL or SDCARD; otherwise
        JSON with ``files`` and, for LOCAL only, the ``free`` and ``total``
        disk space of the uploads folder.
    """
    if origin not in (FileDestinations.LOCAL, FileDestinations.SDCARD):
        return make_response("Unknown origin: %s" % origin, 404)

    recursive = request.values.get("recursive", "false") in valid_boolean_trues
    force = request.values.get("force", "false") in valid_boolean_trues

    if force:
        with _file_cache_mutex:
            # Remove the cached entry if there is one; a miss is fine.
            _file_cache.pop(origin, None)

    files = _getFileList(origin, recursive=recursive)
    if origin != FileDestinations.LOCAL:
        return jsonify(files=files)

    usage = psutil.disk_usage(settings().getBaseFolder("uploads"))
    return jsonify(files=files, free=usage.free, total=usage.total)
def _get_temperature_data(preprocessor):
    """Return the current temperatures, optionally with recent history.

    Args:
        preprocessor: callable applied to every history entry and to the
            final data dict; its result for the dict is returned.

    Request values:
        history: truthy string to include the temperature history.
        limit: numeric string, maximum number of most recent history entries
            (default 300).

    Returns:
        A 409 response when the printer is not operational, otherwise
        ``preprocessor(tempData)``.
    """
    if not printer.is_operational():
        return make_response("Printer is not operational", 409)

    tempData = printer.get_current_temperatures()
    if "history" in request.values and request.values["history"] in valid_boolean_trues:
        history = list(printer.get_temperature_history())

        limit = 300
        if "limit" in request.values and unicode(request.values["limit"]).isnumeric():
            limit = int(request.values["limit"])
        limit = min(limit, len(history))

        # history[-0:] is the WHOLE list, so a limit of 0 must be handled
        # explicitly or "limit=0" would return every entry.
        recent = history[-limit:] if limit > 0 else []
        tempData.update({
            "history": [preprocessor(entry) for entry in recent]
        })
    return preprocessor(tempData)
def uploadLanguagePack():
    """Unpack an uploaded language-pack archive into the translations folder.

    The upload is described by two request values whose names are built from
    ``"file."`` plus the configured upload path/name suffixes.

    Returns:
        A 400 response when the upload values are missing, the file name has
        no supported archive extension, or the file is neither a tarball nor
        a zip file; otherwise the result of ``getInstalledLanguagePacks()``.
    """
    input_name = "file"
    input_upload_path = input_name + "." + settings().get(["server", "uploads", "pathSuffix"])
    input_upload_name = input_name + "." + settings().get(["server", "uploads", "nameSuffix"])
    if input_upload_path not in request.values or input_upload_name not in request.values:
        return make_response("No file included", 400)

    upload_name = request.values[input_upload_name]
    upload_path = request.values[input_upload_path]

    # str.endswith accepts a tuple of suffixes; this replaces filter() + len(),
    # which fails on Python 3 where filter() returns an iterator.
    if not upload_name.lower().endswith((".zip", ".tar.gz", ".tgz", ".tar")):
        return make_response("File doesn't have a valid extension for a language pack archive", 400)

    target_path = settings().getBaseFolder("translations")
    if tarfile.is_tarfile(upload_path):
        _unpack_uploaded_tarball(upload_path, target_path)
    elif zipfile.is_zipfile(upload_path):
        _unpack_uploaded_zipfile(upload_path, target_path)
    else:
        return make_response("Neither zip file nor tarball included", 400)
    return getInstalledLanguagePacks()
def _get_locale(self):
    """Pick the locale for the current request.

    Sources are tried in this order, first match wins: the ``l10n`` request
    value, the ``X-Locale`` header, the logged-in user's
    ``interface.language`` setting, the ``appearance.defaultLanguage``
    setting, and finally the best match of the request's accepted languages.
    A language value of ``"_default"`` counts as not set.
    """
    global LANGUAGES

    explicit = None
    if "l10n" in request.values:
        explicit = request.values["l10n"]
    elif "X-Locale" in request.headers:
        explicit = request.headers["X-Locale"]
    if explicit is not None:
        return Locale.negotiate([explicit], LANGUAGES)

    if hasattr(g, "identity") and g.identity and userManager.enabled:
        userid = g.identity.id
        try:
            user_language = userManager.getUserSetting(userid, ("interface", "language"))
            if user_language is not None and user_language != "_default":
                return Locale.negotiate([user_language], LANGUAGES)
        except octoprint.users.UnknownUser:
            # Unknown user: fall through to the global default.
            pass

    default_language = self._settings.get(["appearance", "defaultLanguage"])
    usable_default = (
        default_language is not None
        and default_language != "_default"
        and default_language in LANGUAGES
    )
    if usable_default:
        return Locale.negotiate([default_language], LANGUAGES)

    return Locale.parse(request.accept_languages.best_match(LANGUAGES))
def get_alerts_by_form_parameters():
    '''
    Return the alerts between the ``start`` and ``end`` request parameters.

    Each parameter is taken from the query string first and from the form
    body otherwise, parsed with ``_parse_date`` and passed to
    ``s3_model.get_alerts_by_date``. Responds with JSON
    ``{"success": true, "alerts": [...]}`` and status 200.
    '''
    def _from_args_or_form(name):
        return request.args.get(name) or request.form.get(name)

    start = _from_args_or_form('start')
    end = _from_args_or_form('end')
    _logger.info('{}: {} - {}'.format(request.method,
                                      request.path,
                                      request.values))

    alerts = s3_model.get_alerts_by_date(_parse_date(start), _parse_date(end))
    payload = {'success': True, 'alerts': alerts}
    return jsonify(payload), 200
def create():
    """
    Create a user from the request values.

    Reads ``username``, ``password``, ``email`` and ``gender`` (converted to
    int), stores the password as a salted hash and saves the new ``User``.

    :return: the result of ``success()``
    """
    form = request.values
    username = form.get("username")
    password = form.get("password")
    email = form.get("email")
    # NOTE(review): int(None) raises TypeError when ``gender`` is missing;
    # no parameter is validated here - confirm the caller guarantees them.
    gender = int(form.get("gender"))

    # NOTE(review): pbkdf2:sha1 is a dated hashing method - consider a
    # stronger one if stored hashes can be migrated.
    hashed = generate_password_hash(password, method='pbkdf2:sha1', salt_length=8)

    new_user = User(username=username, password_hash=hashed, email=email, gender=gender)
    new_user.save()
    return success()
def session_test():
    """Demonstrate storing, reading and deleting a session value.

    DELETE removes ``username`` from the session. Any other method greets the
    stored user, or stores the ``username`` request value when none is set.

    Returns:
        A short status string.
    """
    if request.method == 'DELETE':
        # A default avoids KeyError when nothing is stored in the session.
        session.pop('username', None)
        return 'Session deleted!'

    if 'username' in session:
        # Already stored: greet the user.
        return 'Hello {0}'.format(session['username'])

    # Nothing stored yet: remember the submitted username.
    session['username'] = request.values['username']
    return 'Session appended!'
def __init__(self, request, columns, collection):
    """Store the table configuration and run the queries immediately.

    Args:
        request: object whose ``values`` carry the datatable's filtering,
            sorting and paging parameters.
        columns: stored unchanged on ``self.columns``.
        collection: stored unchanged on ``self.collection``.
    """
    self.columns, self.collection = columns, collection

    # Filtering / sorting / paging parameters sent by the datatable.
    self.request_values = request.values

    # Filled by the queries: rows from the db and the filtered row count.
    self.result_data = None
    self.cardinality_filtered = 0

    # Unfiltered row count.
    # NOTE(review): "cadinality" looks like a misspelling of "cardinality";
    # the name is kept because other methods may read it - confirm.
    self.cadinality = 0

    self.run_queries()
def profile_change():
    """Update the current user's account information.

    The change is applied only when ``password``, ``question`` and ``answer``
    are all present in the request values and the password passes
    ``is_valid_password``. ``name`` is optional: when absent the current name
    is kept.

    Returns:
        A redirect to the ``ph_bp.profile`` page in every case; the outcome
        is reported through ``flash`` messages.
    """
    user = g.user
    if {'password', 'question', 'answer'}.issubset(request.values):
        password = request.values['password']
        if is_valid_password(password):
            # ``name`` is not part of the required-field check above, so an
            # unconditional request.values['name'] aborted the request with
            # a 400 when it was omitted. Fall back to the current name.
            user.name = request.values.get('name', user.name)
            # NOTE(review): the password is assigned as received; confirm
            # the model hashes it on assignment.
            user.password = password
            user.question = request.values['question']
            user.answer = request.values['answer']
            db.session.add(user)
            db.session.commit()
            flash('Account information successfully changed.')
        else:
            flash('Password does not meet complexity requirements.')
    return redirect(url_for('ph_bp.profile'))
def guild_config_history(guild):
    """Return one page (25 entries, newest first) of a guild's config changes.

    Args:
        guild: object whose ``guild_id`` selects the changes.

    Request values:
        page: 1-based page number; defaults to 1 when absent or not an
            integer.

    Returns:
        JSON list of objects with ``user``, ``before``, ``after`` and
        ``created_at`` (ISO format).
    """
    def serialize(gcc):
        return {
            'user': serialize_user(gcc.user_id),
            'before': unicode(gcc.before_raw),
            'after': unicode(gcc.after_raw),
            'created_at': gcc.created_at.isoformat(),
        }

    # ``type=int`` makes get() return the default instead of raising when the
    # value is not a valid integer (previously a ValueError / HTTP 500).
    page = request.values.get('page', 1, type=int)

    q = GuildConfigChange.select(GuildConfigChange, User).join(
        User, on=(User.user_id == GuildConfigChange.user_id),
    ).where(GuildConfigChange.guild_id == guild.guild_id).order_by(
        GuildConfigChange.created_at.desc()
    ).paginate(page, 25)

    # A real list: map() is a lazy iterator on Python 3 and not serializable.
    return jsonify([serialize(gcc) for gcc in q])
def update():
    """Match a logged-in user's feature vector against the feature matrix.

    Only POST is handled (other methods return None).

    Request values:
        user: must be in ``logged_in``, otherwise the request aborts with 404.
        vector: comma-separated floats.

    Returns:
        JSON with ``result`` set to ``'Success'`` and ``neighbor`` set to the
        value returned by ``FeatureMatrix().match``.
    """
    if request.method == 'POST':
        user = request.values['user']
        vector = request.values['vector']
        if user not in logged_in:
            abort(404)
        vector = np.fromstring(vector, dtype=float, sep=',')
        # The former ``if True: ... else: querySpark.match(...)`` left the
        # Spark branch unreachable; only the live path remains.
        nearest_neighbor = FeatureMatrix().match(user, vector)
        return jsonify(result='Success',
                       neighbor=nearest_neighbor)
def login():
    """Log a registered user in.

    Request values (POST): ``user`` and ``password``.

    Returns:
        JSON ``{"result": "Success"}`` when the user is already logged in or
        the password matches the registered one. Every other case aborts
        with 404.
    """
    if request.method == 'POST':
        remote_ip = request.remote_addr
        print("Login from client IP", remote_ip)
        user = request.values['user']
        pwd = request.values['password']

        # NOTE(review): a user already in ``logged_in`` is accepted without
        # checking the password - confirm this is intended.
        if user in logged_in:
            return jsonify(result='Success')

        if user in registered_ids and registered_ids[user] == pwd:
            print("logging in", user)
            logged_in.append(user)
            return jsonify(result='Success')

    # Single failure exit: unknown user, wrong password or unsupported method.
    # Previously some of these paths fell through and returned None, which
    # Flask reports as a server error.
    abort(404)
def get_history(targets, start='-1h', end='now'):
    """Retrieve the time series data for one or more metrics.

    Args:
        targets (str|List[str]): Graphite path, or list of paths.
        start (Optional[str]): Start of date range. Defaults to one hour ago.
        end (Optional[str]): End of date range. Defaults to now.

    Returns:
        The ``datapoints`` list of the first series in the response, with
        each entry's unix timestamp (index 1) replaced by a
        ``'%Y-%m-%d %H:%M:%S'`` string; an empty list when the response
        cannot be parsed or holds no series.
    """
    raw = query(targets, start, end)
    try:
        metrics = json.loads(raw)[0]['datapoints']
    except (ValueError, TypeError, IndexError, KeyError):
        # Invalid JSON, a non-string response, no series, or a series without
        # datapoints. Narrowed from a bare ``except:``.
        return []

    # Convert unix timestamps to plot.ly's required date format
    for metric in metrics:
        metric[1] = datetime.fromtimestamp(metric[1]).strftime('%Y-%m-%d %H:%M:%S')
    return metrics
def plotly(metrics=None, name='', color=None):
    """Convert a list of metric values to a Plot.ly scatter trace.

    Args:
        metrics: iterable of ``(value, timestamp)`` pairs; None or empty
            produces a trace without points.
        name: trace name.
        color: optional marker color.

    Returns:
        dict with ``type``, ``x`` (timestamps), ``y`` (values), ``name`` and,
        when ``color`` is given, ``marker``.
    """
    # zip(*[]) yields nothing, so unpacking directly would raise on empty
    # input; fall back to two empty columns.
    columns = list(zip(*(metrics or ())))
    values, timestamps = columns if columns else ((), ())

    trace = {
        'type': 'scatter',
        'x': timestamps,
        'y': values,
        'name': name,
    }
    if color:
        # The key is the literal 'color'; the original used the color VALUE
        # as the key (e.g. {'red': 'red'}).
        trace['marker'] = {'color': color}
    return trace
def _to_graphite_time(value):
    """Convert a millisecond timestamp to whole seconds; pass other values through."""
    try:
        return int(float(value) / 1000)
    except (ValueError, OverflowError):
        # Not a finite number (e.g. '-1h', 'now'): hand it over unchanged.
        return value


def date_range():
    """Ensure the requested date range is in a format Graphite will accept.

    Reads ``from`` (default ``'-1h'``) and ``end`` (default ``'now'``) from
    the request values. Numeric values are treated as millisecond timestamps
    and converted to integer seconds; anything else is passed on unchanged.
    Each bound is converted independently, so a relative ``from`` no longer
    prevents a numeric ``end`` from being converted.

    Returns:
        tuple: ``(range_from, range_end)``.
    """
    range_from = _to_graphite_time(request.values.get('from', '-1h'))
    range_end = _to_graphite_time(request.values.get('end', 'now'))
    return (range_from, range_end)
def get_efo_info_from_code(self, efo_codes, **kwargs):
    """Look up EFO documents by id.

    Args:
        efo_codes: a single code or a list of codes; a non-list value is
            wrapped into a one-element list.
        **kwargs: forwarded to ``SearchParams`` (``size``, ``facets`` and
            ``fields`` are read here).

    Returns:
        A ``PaginatedResult`` for the search, or None when ``efo_codes`` is
        an empty list.
    """
    params = SearchParams(**kwargs)
    codes = efo_codes if isinstance(efo_codes, list) else [efo_codes]

    query_body = addict.Dict()
    query_body.query.ids["values"] = codes
    query_body.size = params.size

    # Optional aggregation, requested with the string 'true'.
    if params.facets == 'true':
        query_body.aggregations.significant_therapeutic_areas.significant_terms.field = "therapeutic_labels.keyword"
        query_body.aggregations.significant_therapeutic_areas.significant_terms.size = 25
        query_body.aggregations.significant_therapeutic_areas.significant_terms.min_doc_count = 2

    if params.fields:
        query_body._source = params.fields

    if not codes:
        return None
    res = self._cached_search(index=self._index_efo,
                              doc_type=self._docname_efo,
                              body=query_body.to_dict())
    return PaginatedResult(res, params)
def get_evidences_by_id(self, evidenceid, **kwargs):
    """Fetch evidence documents by id.

    Args:
        evidenceid: a single id string or a collection of ids.
        **kwargs: forwarded to ``SearchParams``; a DEFAULT datastructure is
            promoted to FULL.

    Returns:
        A ``SimpleResult`` whose data is the ``_source`` of every hit.
    """
    ids = [evidenceid] if isinstance(evidenceid, str) else evidenceid

    params = SearchParams(**kwargs)
    if params.datastructure == SourceDataStructureOptions.DEFAULT:
        params.datastructure = SourceDataStructureOptions.FULL

    # One result slot per requested id.
    search_body = {
        "query": {"ids": {"values": ids}},
        "size": len(ids),
    }
    res = self._cached_search(index=self._index_data, body=search_body)

    sources = [hit['_source'] for hit in res['hits']['hits']]
    return SimpleResult(res, params, data=sources)
def _get_labels_for_reactome_ids(self, reactome_ids):
    """Map reactome ids to their labels.

    Args:
        reactome_ids: ids to look up; a falsy value skips the search.

    Returns:
        ``defaultdict(str)`` from document id to label, so ids without a hit
        resolve to an empty string.
    """
    labels = defaultdict(str)
    if not reactome_ids:
        return labels

    res = self._cached_search(
        index=self._index_reactome,
        doc_type=self._docname_reactome,
        body={
            "query": {"ids": {"values": reactome_ids}},
            '_source': {"includes": ['label']},
            'size': 10000,
            'from': 0,
        },
    )
    hits = res['hits']
    if hits['total']:
        labels.update((hit['_id'], hit['_source']['label']) for hit in hits['hits'])
    return labels
def handle_states():
    '''Return all states as JSON on GET, or create a new state on POST.

    POST requires a ``name`` parameter. The lookup relies on peewee's
    ``get()`` raising ``State.DoesNotExist`` when no row matches:
    http://docs.peewee-orm.com/en/latest/peewee/api.html#SelectQuery.get

    Returns:
        GET: ``(json list, 200)``.
        POST: ``(json, 400)`` when ``name`` is missing, ``(json, 409)`` when
        a state with that name exists, otherwise ``(new state json, 201)``.
    '''
    if request.method == 'GET':
        states = ListStyle().list(State.select(), request)
        return jsonify(states), 200
    elif request.method == 'POST':
        # Validate and read ``name`` from the same source. The original
        # checked request.values but then read request.form, so a name sent
        # in the query string passed validation and failed on the lookup.
        name = request.values.get('name')
        if name is None:
            return jsonify(msg="Missing parameter."), 400
        try:
            State.select().where(State.name == name).get()
        except State.DoesNotExist:
            state = State.create(name=name)
            return jsonify(state.to_dict()), 201
        return jsonify(code=10001, msg="State already exists"), 409
def handle_state_id(state_id):
    '''Return or delete the state with the given id.

    GET responds with the state as JSON; DELETE removes it. No other request
    method is handled here (the function then returns None).

    Keyword arguments:
    state_id: The id of the state from the database.

    Returns:
        ``(json, 404)`` when no state has this id, ``(state json, 200)`` on
        GET, ``(json message, 200)`` on DELETE.
    '''
    try:
        state = State.select().where(State.id == state_id).get()
    except State.DoesNotExist:
        return jsonify(msg="There is no state with this id."), 404
    if request.method == 'GET':
        return jsonify(state.to_dict()), 200
    elif request.method == 'DELETE':
        # Existence was verified above, and building/executing a delete query
        # does not raise DoesNotExist, so the former try/except was dead code.
        State.delete().where(State.id == state_id).execute()
        return jsonify(msg="State deleted successfully."), 200
def api_send_message():
    """Pass the request values to the game as the current player's move.

    The player name is read from the session key ``name``. Always responds
    with ``{"messages": "success"}``.
    """
    payload = request.values
    print('send message', payload.get('type'))
    player = session['name']
    game.doStage(player, payload)
    return jsonify({'messages': 'success'})
def verify_slack_token(http_method, allowed_token_list):
    """Check the request's Slack token against the allowed tokens.

    Args:
        http_method: 'GET' reads ``token`` from the query string (empty
            string when absent); 'POST' reads it from ``request.values``.
        allowed_token_list: container of accepted tokens.

    Returns:
        None when the token is accepted.

    Raises:
        Exception: for an unsupported ``http_method`` or an unknown token.
    """
    if http_method == 'GET':
        supplied = request.args.get('token', '')
    elif http_method == 'POST':
        supplied = request.values['token']
    else:
        raise Exception("Error: unsupported http_method(%s)" % (http_method))

    if supplied in allowed_token_list:
        return None
    raise Exception("Error: invalid token(%s)" % (supplied))
def get_response_url(http_method):
    """Return the ``response_url`` sent with the current request.

    Args:
        http_method: 'GET' reads the value from the query string (empty
            string when absent); 'POST' reads it from ``request.values``.

    Raises:
        Exception: for an unsupported ``http_method`` or an empty value.
    """
    if http_method == 'GET':
        url = request.args.get('response_url', '')
    elif http_method == 'POST':
        url = request.values['response_url']
    else:
        raise Exception("Error: unsupported http_method(%s)" % (http_method))

    if url != '':
        return url
    raise Exception("Error: fail to get response_url from the http request")
def update_job(job_id, job=None):
    """Update a job from the request values.

    Returns the JSON response of ``Job.update``; when the result's
    ``updated`` flag is missing or falsy the response carries status 450.
    """
    outcome = Job.update(job_id, request.values)
    if not outcome.get("updated", False):
        return json_resp(outcome), 450
    return json_resp(outcome)
def add_user():
    """Create a user from the request values.

    On success the ``user`` entry of the result is replaced by its
    ``as_object()`` form before it is returned as JSON; when the result's
    ``created`` flag is missing or falsy the response carries status 450.
    """
    outcome = User.create(request.values)
    if not outcome.get("created", False):
        return json_resp(outcome), 450
    outcome["user"] = outcome["user"].as_object()
    return json_resp(outcome)
def update_pheno(pheno_id, pheno=None):
    """Update a phenotype from the request values.

    Returns the JSON response of ``Phenotype.update``; when the result's
    ``updated`` flag is missing or falsy the response carries status 450.
    """
    outcome = Phenotype.update(pheno_id, request.values)
    if not outcome.get("updated", False):
        return json_resp(outcome), 450
    return json_resp(outcome)
def webhook():
    """
    Handle Twilio's POST to `/webhook` for a newly received text message.

    Records the texter, builds a reply from the message body and sends it
    back to the sender.

    Returns:
        object: An empty response with status 201 on success, or 400 when
        the request carries no values or lacks `From` or `Body`.
    """
    # pylint: disable=unused-variable
    if not request.values:
        return Response(status=400)

    phone_number = request.values.get("From")
    message_body = request.values.get("Body")
    if phone_number is None or message_body is None:
        return Response(status=400)

    Texter.record(phone_number)
    reply = get_response(message_body)
    get_text_class().send_message(phone_number, reply)
    return Response(status=201)
def set_param(param_name):
    """
    POST /api/<version>/param/<param_name> {"value": "x"}

    Store the request's ``value`` on the ROS Parameter Server under
    ``param_name``. Responds with 204 and an empty body, or with
    ``error("No value supplied")`` when ``value`` is missing.
    """
    if "value" not in request.values:
        return error("No value supplied")
    new_value = request.values["value"]
    rospy.set_param(param_name, new_value)
    return "", 204
def post_topic_message(topic_name):
    """
    POST /api/<version>/topic/<topic_name>

    POST a message to a ROS topic by name.
    Requires a JSON payload of shape: ``[x, y, z]``. The values of the JSON
    array must match the argument types of the topic's type constructor.
    A missing or unparsable payload publishes with no arguments.

    Returns ``error(...)`` when the topic is unknown or publishing fails,
    otherwise ``success("Posted message to topic")``.
    """
    topic_name = "/" + topic_name
    # Get the list of ROS topics in the system.
    # Returns a list of [topic, type_string] pairs.
    topic_list = rostopic_master.getTopicTypes()
    # Find the first pair whose topic matches.
    try:
        topic_match = next(x for x in topic_list if x[0] == topic_name)
    except StopIteration:
        # If we can't find the topic, return early.
        return error("Topic does not exist")

    # Named ``payload`` so the local does not shadow the ``json`` module name.
    payload = request.get_json(silent=True)
    args = payload if payload else []
    try:
        # Get the message type constructor for the topic's type string.
        MsgType = get_message_class(topic_match[1])
        pub = rospy.Publisher(topic_name, MsgType, queue_size=10)
        # pub.publish() passes the unpacked arguments on to the message
        # type constructor.
        pub.publish(*args)
    except Exception:
        # ``except Exception, e`` was Python-2-only syntax and ``e`` was
        # never used; this form is valid on Python 2.6+ and 3.
        return error("Wrong arguments for topic type constructor")
    return success("Posted message to topic")
def get_topic_data(topic_name):
    """
    GET /api/<version>/topic_data/<topic_name>

    Get a single data point from a topic over HTTP.

    A message with a ``data`` attribute is returned as ``{"result": data}``.
    Otherwise the entries of ``msg.status`` are rendered as a JSON document
    ``{"status": [...]}``. ``rostopic.ROSTopicIOException`` propagates to the
    caller.
    """
    topic_name = "/" + topic_name
    # The former ``try/except ... raise e`` only re-raised the exception.
    msg_class, real_topic, _ = rostopic.get_topic_class(topic_name)
    if not real_topic:
        return error("Topic does not exist", 404)

    msg = rospy.wait_for_message(real_topic, msg_class)
    data = getattr(msg, "data", None)
    if data is not None:
        return jsonify({"result": data})

    # NOTE(review): fields are interpolated without JSON escaping and
    # ``values`` is rendered with str(); output is valid JSON only when the
    # fields contain no quotes and ``values`` prints as JSON - confirm.
    entry_format = '{"name": "%s", "level": %d, "message": "%s", "hardware_id": "%s", "values": %s}'
    entries = [
        entry_format % (
            x.name if x.name else "null",
            x.level,
            x.message if x.message else "null",
            x.hardware_id if x.hardware_id else "null",
            x.values,
        )
        for x in msg.status
    ]
    # Joining fixes two defects of the "is this the last element?" test by
    # equality: an earlier entry equal to the last one closed the array too
    # soon, and an empty status list never closed it at all.
    payload = '{"status": [' + ",".join(entries) + ']}'
    return Response(payload, mimetype = 'application/json')
def filter_participants(event):
    """Drop ``participants`` from an event unless the request asks for them.

    The event is returned unchanged when the ``include_participants`` request
    value equals ``true`` (case-insensitive); otherwise a copy without the
    ``participants`` key is returned.
    """
    wanted = request.values.get('include_participants', '').lower() == 'true'
    if wanted:
        return event
    return {key: value for key, value in event.items() if key != 'participants'}
def events():
    """Render the events selected by the request.

    ``created_at_or_after`` selects events from ``api_data`` accepted by
    ``created_on_or_after`` for the parsed date, sorted by start time;
    otherwise ``ids`` (a JSON list) selects events by id, skipping ids that
    are not found. Without either parameter an empty list is rendered.
    """
    query = request.values
    if 'created_at_or_after' in query:
        cutoff = parse_date(query['created_at_or_after'])
        matching = [event for event in api_data if created_on_or_after(event, cutoff)]
        return render_events(sort_by_start_time(matching))
    if 'ids' in query:
        wanted_ids = json.loads(query['ids'])
        return render_events(e for e in map(find_event, wanted_ids) if e)
    return render_events([])
def leave(id):
    """Remove the requesting user from the event identified by ``id``.

    The user is resolved by ``find_user`` from the ``user_param`` and
    ``user_param_value`` request values.

    Returns:
        ``not_found()`` when ``find_event`` returns None,
        ``user_not_specified()`` when ``find_user`` returns None, otherwise
        an empty JSON object.
    """
    event = find_event(id)
    # Identity comparison: ``== None`` can be hijacked by a custom __eq__.
    if event is None:
        return not_found()
    user = find_user(request.values.get('user_param'), request.values.get('user_param_value'))
    if user is None:
        return user_not_specified()
    leave_event(user, event)
    return jsonify({})
def get_request_log():
    """Build a loggable summary of the current request.

    Returns:
        dict with ``route`` and ``method``. For routes whose first path
        segment is ``virtual_card`` it also carries ``body`` (the parsed JSON
        body, when the request has one) and ``values`` (a
        ``"key: value, "`` listing of the request values, when any). The
        ``password`` entry is removed from both.
    """
    # Build the listing in one pass. ``password`` is skipped here as well:
    # it was scrubbed from the body only and still leaked through ``values``.
    values = ''.join(
        key + ': ' + request.values[key] + ', '
        for key in request.values
        if key != 'password'
    )

    route = '/' + request.base_url.replace(request.host_url, '')
    request_log = {
        'route': route,
        'method': request.method,
    }

    # Only virtual_card routes get their body and values logged.
    category = request_log['route'].split('/')[1]
    if category != "virtual_card":
        return request_log

    raw_body = request.get_data()
    if len(raw_body) != 0:
        body = json.loads(raw_body)
        if "password" in body:
            body.pop("password")
        request_log['body'] = body
    if len(values) > 0:
        request_log['values'] = values
    return request_log
def sonarr():
    """Forward the request values to the ``bots`` target via ``postMsg``.

    Always answers with the plain string ``ok``.
    """
    # Imported at call time, as in the original.
    from slack_posting import postMsg as post_to_slack
    post_to_slack(request.values, 'bots')
    return "ok"
# API Stuff
def handle_books_id(place_id, book_id):
    '''Return, update or delete the booking with id ``book_id``.

    GET returns the booking as JSON. PUT copies every request value onto the
    booking, except ``updated_at`` and ``created_at`` which are ignored; a
    ``user`` value is rejected with 409 and nothing is saved. DELETE removes
    the booking.

    Keyword arguments:
    place_id: The id of the place with the booking (not used by the lookup).
    book_id: The id of the booking.

    Raises:
        Exception: when no booking has this id.
    '''
    try:
        book = PlaceBook.select().where(PlaceBook.id == book_id).get()
    except PlaceBook.DoesNotExist:
        raise Exception("There is no placebook with this id.")

    if request.method == 'GET':
        return jsonify(book.to_dict()), 200
    elif request.method == 'PUT':
        params = request.values
        # NOTE(review): any other request key is written straight onto the
        # model via setattr - confirm that no protected field is reachable.
        for key in params:
            if key == 'user':
                return jsonify(msg="You may not change the user."), 409
            if key in ('updated_at', 'created_at'):
                continue
            setattr(book, key, params.get(key))
        book.save()
        return jsonify(msg="Place book information updated successfully."), 200
    elif request.method == 'DELETE':
        # Existence was verified above, and building/executing a delete query
        # does not raise DoesNotExist, so the former try/except was dead code.
        PlaceBook.delete().where(PlaceBook.id == book_id).execute()
        return jsonify(msg="Place book deleted successfully."), 200
def handle_places():
    '''Return all places on GET, or add a new place on POST.

    POST requires the ``owner``, ``city`` and ``name`` parameters. Every
    request value is copied onto the new ``Place`` instance, except
    ``updated_at`` and ``created_at`` which are ignored.
    '''
    if request.method == 'GET':
        listing = ListStyle().list(Place.select(), request)
        return jsonify(listing), 200
    elif request.method == 'POST':
        params = request.values
        place = Place()

        # Check that all the required parameters are present in the request.
        if not {"owner", "city", "name"} <= set(params.keys()):
            return jsonify(msg="Missing parameter."), 400

        for key in params:
            if key in ('updated_at', 'created_at'):
                continue
            setattr(place, key, params.get(key))
        place.save()
        return jsonify(place.to_dict()), 200