我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.json.jsonify()。
def get_data(): cname = request.args.get("collection") settings = CSETTINGS[cname] search_string = request.args.get("search_string") xaxis = request.args.get("xaxis") yaxis = request.args.get("yaxis") xaxis = get_mapped_name(settings, xaxis) yaxis = get_mapped_name(settings, yaxis) projection = [xaxis, yaxis] if search_string.strip() != "": criteria = process_search_string(search_string, settings) data = [] for r in DB[cname].find(criteria, projection=projection): x = _get_val(xaxis, r, None) y = _get_val(yaxis, r, None) if x and y: data.append([x, y]) else: data = [] return jsonify(jsanitize(data))
def _serve_webui(self, file_name='index.html'): # pylint: disable=redefined-builtin try: assert file_name web3 = self.flask_app.config.get('WEB3_ENDPOINT') if web3 and 'config.' in file_name and file_name.endswith('.json'): host = request.headers.get('Host') if any(h in web3 for h in ('localhost', '127.0.0.1')) and host: _, _port = split_endpoint(web3) _host, _ = split_endpoint(host) web3 = 'http://{}:{}'.format(_host, _port) response = jsonify({'raiden': self._api_prefix, 'web3': web3}) else: response = send_from_directory(self.flask_app.config['WEBUI_PATH'], file_name) except (NotFound, AssertionError): response = send_from_directory(self.flask_app.config['WEBUI_PATH'], 'index.html') return response
def delete_executor(executor_id): """ Delete an executor Example request:: DEL /api/1/executors/6890192d8b6c40e5af16f13aa036c7dc """ manager.remove_executor(executor_id) return json.jsonify({ "msg": "Executor %s succesfully deleted" % executor_id }) ######## # UTILS #######
def join(id): event = find_event(id) if event == None: return not_found() user = find_user(request.values.get('user_param'), request.values.get('user_param_value')) if user == None: return user_not_specified() join_event(user, event) return jsonify({ 'joined': True, 'rsvps_disabled': False, 'event_archived': False, 'over_capacity': False, 'past_deadline': False, })
def handle_authn_response(): # parse authn response authn_response = current_app.rp.client.parse_response(AuthorizationResponse, info=request.query_string.decode("utf-8"), sformat="urlencoded") auth_code = None if "code" in authn_response: auth_code = authn_response["code"] # make token request args = { "code": auth_code, "client_id": current_app.rp.client.client_id, "client_secret": current_app.rp.client.client_secret } token_response = current_app.rp.client.do_access_token_request(scope="openid", request_args=args) access_token = token_response["access_token"] id_token = token_response["id_token"].to_dict() # TODO do userinfo req else: id_token = authn_response["id_token"].to_dict() access_token = authn_response.get("access_token") return jsonify(dict(auth_code=auth_code, token=access_token, id_token=id_token))
def api_private_counties_by_month(): NOW = datetime.now() result = [] r = current_app.db_session.query( Report.test_start_time, Report.probe_cc) \ .order_by(Report.test_start_time) # XXX this can be done better in a SQL that is not sqlite monthly_buckets = {} for tst, country in r: if tst > NOW: # We ignore measurements from time travelers continue bkt = tst.strftime("%Y-%m-01") monthly_buckets[bkt] = monthly_buckets.get(bkt, []) if country not in monthly_buckets[bkt]: monthly_buckets[bkt].append(country) for bkt in monthly_buckets.keys(): result.append({ 'date': bkt, 'value': len(monthly_buckets[bkt]) }) return jsonify(result)
def api_private_runs_by_month(): NOW = datetime.now() result = [] r = current_app.db_session.query( Report.test_start_time) \ .order_by(Report.test_start_time) # XXX this can be done better in a SQL that is not sqlite monthly_buckets = {} for res in r: tst = res.test_start_time if tst > NOW: # We ignore measurements from time travelers continue bkt = tst.strftime("%Y-%m-01") monthly_buckets[bkt] = monthly_buckets.get(bkt, 0) monthly_buckets[bkt] += 1 for bkt in monthly_buckets.keys(): result.append({ 'date': bkt, 'value': monthly_buckets[bkt] }) return jsonify(result)
def api_private_blockpage_detected(): q = current_app.db_session.query( distinct(Report.probe_cc).label('probe_cc'), ).join(Measurement, Measurement.report_no == Report.report_no) \ .filter(Measurement.confirmed == True) \ .filter(or_( Report.test_name == 'http_requests', Report.test_name == 'web_connectivity' )) results = [] for row in q: results.append({ 'probe_cc': row.probe_cc }) return jsonify({ 'results': results })
def variantset_search(): if not request.json: return makeGAException( message='Bad Content Type, please send application/json', ecode=-1, rcode=415) datasetId = request.json.get('datasetId', "") pageSize = request.json.get('pageSize', None) pageToken = request.json.get('pageToken', None) try: garesp = tileSearch.searchVariantSets( datasetId=datasetId, pageSize=pageSize, pageToken=pageToken) except: return makeGAException( message='search command exited abnormally', ecode=500, rcode=500) return jsonify(garesp.gavsetresponse_info)
def put(self, username, transaction_id): user = self.authenticate() if user: new_transaction = request.get_json(force=True) transaction = user.transactions.filter_by( transaction_id=transaction_id).first_or_404() transaction.category_id = new_transaction['category_id'] transaction.person_id = new_transaction['person_id'] transaction.transaction_date = new_transaction['transaction_date'] transaction.value = new_transaction['value'] transaction.notes = new_transaction['notes'] transaction.type = new_transaction['type'] transaction.done = new_transaction['done'] db.session.commit() return json.jsonify(transaction.as_dict()) raise InvalidUsage()
def post(self, username, transaction_id): user = self.authenticate() if user: transaction = user.transactions.filter_by( transaction_id=transaction_id).first_or_404() supposed_item = request.get_json(force=True) item = TransactionItem() item.item_id = 1 if len(transaction.transaction_items.all( )) else transaction.transaction_items.all()[-1].item_id + 1 item.user_id = user.user_id item.transaction_id = transaction.transaction_id item.person_id = supposed_item['person_id'] item.item_date = supposed_item['item_date'] item.value = supposed_item['value'] item.notes = supposed_item['notes'] item.type = supposed_item['type'] item.done = supposed_item['done'] db.session.add(item) db.session.commit() if item.item_id: return json.jsonify(item.as_dict()) raise InvalidUsage()
def put(self, username, transaction_id, item_id): user = self.authenticate() if user: transaction = user.transactions.filter_by( transaction_id=transaction_id).first_or_404() new_item = request.get_json(force=True) item = transaction.transaction_items.filter_by( item_id=item_id).first_or_404() item.person_id = new_item['person_id'] item.item_date = new_item['item_date'] item.value = new_item['value'] item.notes = new_item['notes'] item.type = new_item['type'] item.done = new_item['done'] db.session.commit() return json.jsonify(item.as_dict()) raise InvalidUsage()
def stack_analyses_debug(external_request_id): """Debug endpoint exposing operational data for particular stack analysis. This endpoint is not part of the public API. Note the existence of the data is not guaranteed, therefore the endpoint can return 404 even for valid request IDs. """ results = retrieve_worker_results(rdb, external_request_id) if not results: return jsonify(error='No operational data for the request ID'), 404 response = {'tasks': []} for result in results: op_data = result.to_dict() audit = op_data.get('task_result', {}).get('_audit', {}) task_data = {'task_name': op_data.get('worker')} task_data['started_at'] = audit.get('started_at') task_data['ended_at'] = audit.get('ended_at') task_data['error'] = op_data.get('error') response['tasks'].append(task_data) return jsonify(response), 200
def hipfrog(): requestdata = json.loads(request.get_data()) callingMessage = requestdata['item']['message']['message'].lower().split() oauthId = requestdata['oauth_client_id'] installation = messageFunctions.getInstallationFromOauthId(oauthId) if installation.glassfrogToken is None: message = strings.set_token_first message_dict = messageFunctions.createMessageDict(strings.error_color, message) elif len(callingMessage) == 1: message = strings.help_hipfrog message_dict = messageFunctions.createMessageDict(strings.succes_color, message) elif len(callingMessage) > 1: # /hipfrog something message = strings.missing_functionality.format(callingMessage[1]) message_dict = messageFunctions.createMessageDict(strings.error_color, message) # TODO Generate message_dict and color here return json.jsonify(message_dict)
def error_handler(error): """ Standard Error Handler """ if isinstance(error, HTTPException): return jsonify({ 'statusCode': error.code, 'name': error.name, 'description': error.description }), error.code else: return jsonify({ 'statusCode': 500, 'name': 'Internal Server Error', 'description': 'An unknown error has occurred' }), 500 # common errors - add others as needed
def get_group_sections(): q = request.query_string.decode() if not q: return json.jsonify({'status': 'error'}) all_cn = [] for cn in rd.smembers('group:%s' % q): try: all_cn.insert(0, int(cn)) except ValueError: continue schedule = tuple( lesson.details for lesson in Lesson.query.filter(Lesson.class_no.in_(all_cn)) .order_by(Lesson.start).all() ) return json.jsonify({ 'status': 'ok', 'events': schedule, })
def get_ids(collection_name): settings = CSETTINGS[collection_name] doc = DB[collection_name].distinct(settings["unique_key"]) return jsonify(jsanitize(doc))
def get_doc_json(collection_name, uid): settings = CSETTINGS[collection_name] criteria = { settings["unique_key"]: process(uid, settings["unique_key_type"])} doc = DB[collection_name].find_one(criteria) return jsonify(jsanitize(doc))
def task_in_json(taskid): if ':' not in taskid: return json.jsonify({'code': 400, 'error': 'bad project:task_id format'}) project, taskid = taskid.split(':', 1) taskdb = app.config['taskdb'] task = taskdb.get_task(project, taskid) if not task: return json.jsonify({'code': 404, 'error': 'not found'}) task['status_string'] = app.config['taskdb'].status_to_string(task['status']) return json.jsonify(task)
def handle_invalid_usage(error): """ Handler for APIErrors thrown by API endpoints """ log.info('%d %s', error.status_code, error.message) response = json.jsonify(error.to_dict()) response.status_code = error.status_code return response
def add_plan(): """ Add a plan. Example request:: PUT /api/1/executors/3b373155577b4d1bbc62216ffea013a4 Body: { "name": "Terminate instances in Playground", "attack": { "args": { "region": "eu-west-1", "filters": { "tag:Name": "playground-asg" } }, "ref": "terminate_ec2_instance:TerminateEC2Instance" }, "planner": { "ref": "simple_planner:SimplePlanner", "args": { "min_time" : "10:00", "max_time" : "19:00", "times": 4 } } } """ assert validate_payload(request, plan_schema) req_json = request.get_json() name = req_json["name"] planner_config = req_json["planner"] attack_config = req_json["attack"] manager.execute_plan(name, planner_config, attack_config) return json.jsonify({"msg": "ok"})
def delete_plan(plan_id): """ Delete a plan Example request:: DEL /api/1/plans/6890192d8b6c40e5af16f13aa036c7dc """ manager.delete_plan(plan_id) return json.jsonify({ "msg": "Plan %s successfully deleted" % plan_id })
def render_event(event): return jsonify(filter_participants(event))
def render_events(events): return jsonify([filter_participants(event) for event in events])
def not_found(): response = jsonify({"message":"not_found"}) response.status_code = 404 return response
def user_not_specified(): response = jsonify({"message": "user_not_specified"}) response.status_code = 422 return response
def custom_401(error): # API CALL if 'Content-Type' in request.headers and request.headers['Content-Type'].lower() == 'application/json': return jsonify(errors=['Not authorized']), 401 return render_template('home/401.html', message=error.description['message']), 401
def get_version(): return jsonify({ "version": __version__ })
def api_private_asn_by_month(): NOW = datetime.now() result = [] r = current_app.db_session.query( Report.test_start_time, Report.probe_asn) \ .order_by(Report.test_start_time) # XXX this can be done better in a SQL that is not sqlite monthly_buckets = {} for tst, asn in r: asn = 'AS%d' % asn if tst > NOW: # We ignore measurements from time travelers continue bkt = tst.strftime("%Y-%m-01") monthly_buckets[bkt] = monthly_buckets.get(bkt, []) if asn not in monthly_buckets[bkt]: monthly_buckets[bkt].append(asn) for bkt in monthly_buckets.keys(): result.append({ 'date': bkt, 'value': len(monthly_buckets[bkt]) }) return jsonify(result)
def api_private_reports_per_day(): q = current_app.db_session.query( func.count(func.date_trunc('day', Report.test_start_time)), func.date_trunc('day', Report.test_start_time) ).group_by(func.date_trunc('day', Report.test_start_time)).order_by( func.date_trunc('day', Report.test_start_time) ) result = [] for count, date in q: result.append({ 'count': count, 'date': date.strftime("%Y-%m-%d") }) return jsonify(result)
def api_private_countries(): with_counts = request.args.get("with_counts") country_list = [] if not with_counts: country_list = [{ 'alpha_2': c.alpha_2, 'name': c.name } for c in countries] else: # XXX we probably actually want to get this from redis or compute it # periodically since it take 60 seconds to run. q = current_app.db_session.query( func.count(Measurement.id), Report.probe_cc.label('probe_cc') ).join(Report, Report.report_no == Measurement.report_no) \ .group_by(Report.probe_cc) for count, probe_cc in q: if probe_cc.upper() == 'ZZ': continue try: c = countries.lookup(probe_cc) country_list.append({ 'count': count, 'alpha_2': c.alpha_2, 'name': c.name, }) except Exception as exc: current_app.logger.warning("Failed to lookup: %s" % probe_cc) return jsonify({ "countries": country_list }) # XXX Everything below here are ghetto hax to support legacy OONI Explorer
def api_private_blockpages(): probe_cc = request.args.get('probe_cc') if probe_cc is None: raise Exception('err') q = current_app.db_session.query( Report.report_id.label('report_id'), Report.probe_cc.label('probe_cc'), Report.probe_asn.label('probe_asn'), Report.test_start_time.label('test_start_time'), Input.input.label('input') ).join(Measurement, Measurement.report_no == Report.report_no) \ .join(Input, Measurement.input_no == Input.input_no) \ .filter(Measurement.confirmed == True) \ .filter(or_( Report.test_name == 'http_requests', Report.test_name == 'web_connectivity' )) \ .filter(Report.probe_cc == probe_cc) results = [] for row in q: results.append({ 'report_id': row.report_id, 'probe_cc': row.probe_cc, 'probe_asn': "AS{}".format(row.probe_asn), 'test_start_time': row.test_start_time, 'input': row.input }) return jsonify({ 'results': results })
def api_private_website_measurements(): input_ = request.args.get('input') if input_ is None: raise Exception('err') q = current_app.db_session.query( Report.report_id.label('report_id'), Report.probe_cc.label('probe_cc'), Report.probe_asn.label('probe_asn'), Report.test_start_time.label('test_start_time'), Input.input.label('input'), ).join(Measurement, Measurement.report_no == Report.report_no) \ .join(Input, Measurement.input_no == Input.input_no) \ .filter(Measurement.confirmed == True) \ .filter(or_( Report.test_name == 'http_requests', Report.test_name == 'web_connectivity' )) \ .filter(Input.input.contains(input_)) results = [] for row in q: results.append({ 'report_id': row.report_id, 'probe_cc': row.probe_cc, 'probe_asn': "AS{}".format(row.probe_asn), 'test_start_time': row.test_start_time, 'input': row.input }) return jsonify({ 'results': results })
def makeGAException(message, ecode, rcode): error = { 'message': message, 'error_code': ecode } GAException = json.jsonify(error) GAException.status_code = rcode return GAException
def bad_method(e): return jsonify(error=405, text=str(e)), 405
def get(last_name, first_name): """ Return an user key information based on his name """ user = UserRepository.get(last_name=last_name, first_name=first_name) return jsonify({'user': user.json})
def post(last_name, first_name, age): """ Create an user based on the sent information """ user = UserRepository.create( last_name=last_name, first_name=first_name, age=age ) return jsonify({'user': user.json})
def put(last_name, first_name, age): """ Update an user based on the sent information """ repository = UserRepository() user = repository.update( last_name=last_name, first_name=first_name, age=age ) return jsonify({'user': user.json})
def get(self, lid): league = League.query.filter_by(id=lid).first() return self.league_schema.jsonify(league)
def get(self, mid): membership = Membership.query.filter_by(id=mid).first() return self.membership_schema.jsonify(membership)
def get(self, uid): user = User.query.filter_by(id=uid).first() return self.user_schema.jsonify(user)
def get(self): users = User.query.all() users_raw = {'users': []} for user in users: users_raw['users'].append(self.user_schema.dump(user).data) return json.jsonify(users_raw)
def handle_invalid_usage(error): return json.jsonify({'error': str(error)}), error.status_code
def handle_not_allowed(error): return json.jsonify({'error': str(error)}), 405
def handle_not_found(error): return json.jsonify({'error': str(error)}), 404
def post(self): supposed_user = request.get_json(force=True) user = User.query.filter_by(username=supposed_user[ 'username']).first_or_404() if user and user.password == Crypt.hash_sha256(supposed_user['password']): token = jwt.encode({ 'exp': time.time() + 60 * 60 * 24 * 7, 'user_id': user.user_id, 'username': user.username, 'name': user.name }, config.SECRET_KEY, algorithm='HS256') return json.jsonify({'token': token}) raise InvalidUsage("Username and password does not match.")
def get(self, username): user = self.authenticate() if user: return json.jsonify(user.as_dict()) raise InvalidUsage()
def put(self, username): user = self.authenticate() if user: new_user = request.get_json(force=True) user.name = new_user['name'] user.username = new_user['username'] user.password = Crypt.hash_sha256(new_user['password']) db.session.commit() return json.jsonify(user.as_dict()) raise InvalidUsage()
def delete(self, username): user = self.authenticate() if user: db.session.delete(user) db.session.commit() return json.jsonify({'success': '{} was deleted'.format(user)}) raise InvalidUsage()
def get(self, username, category_id): user = self.authenticate() if user: if category_id: return json.jsonify( user.categories.filter_by(category_id=category_id) .first_or_404() .as_dict()) return json.jsonify({'categories': [category.as_dict() for category in user.categories]}) raise InvalidUsage()