Python flask.json 模块,jsonify() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.json.jsonify()

项目:flamyngo    作者:materialsvirtuallab    | 项目源码 | 文件源码
def get_data():
    """Return the [x, y] pairs matching a search as a JSON array.

    Reads the ``collection``, ``search_string``, ``xaxis`` and ``yaxis``
    query arguments, maps both axis names through ``get_mapped_name`` and
    queries the collection with the processed search string.  A missing or
    blank search string yields an empty list.
    """
    cname = request.args.get("collection")
    settings = CSETTINGS[cname]
    # A missing parameter comes back as None; treat it as a blank search
    # instead of failing on ``None.strip()``.
    search_string = request.args.get("search_string") or ""
    xaxis = get_mapped_name(settings, request.args.get("xaxis"))
    yaxis = get_mapped_name(settings, request.args.get("yaxis"))

    data = []
    if search_string.strip():
        criteria = process_search_string(search_string, settings)
        for r in DB[cname].find(criteria, projection=[xaxis, yaxis]):
            x = _get_val(xaxis, r, None)
            y = _get_val(yaxis, r, None)
            # None is the "missing" default handed to _get_val; compare
            # against it explicitly so legitimate zero values are kept.
            if x is not None and y is not None:
                data.append([x, y])
    return jsonify(jsanitize(data))
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def _serve_webui(self, file_name='index.html'):  # pylint: disable=redefined-builtin
        """Serve a web UI file, falling back to ``index.html``.

        When ``WEB3_ENDPOINT`` is configured and *file_name* contains
        ``config.`` and ends with ``.json``, a generated JSON document with the
        API prefix and the web3 endpoint is returned instead of a file.  If the
        endpoint points at localhost, its host part is replaced with the host
        from the request's ``Host`` header.

        An empty *file_name* or a ``NotFound`` raised while serving results in
        ``index.html`` being served from ``WEBUI_PATH``.
        """
        try:
            # Explicit check instead of ``assert`` so the validation still
            # runs when Python is started with -O.
            if not file_name:
                raise NotFound()
            web3 = self.flask_app.config.get('WEB3_ENDPOINT')
            if web3 and 'config.' in file_name and file_name.endswith('.json'):
                host = request.headers.get('Host')
                if any(h in web3 for h in ('localhost', '127.0.0.1')) and host:
                    _, _port = split_endpoint(web3)
                    _host, _ = split_endpoint(host)
                    web3 = 'http://{}:{}'.format(_host, _port)
                response = jsonify({'raiden': self._api_prefix, 'web3': web3})
            else:
                response = send_from_directory(self.flask_app.config['WEBUI_PATH'], file_name)
        except (NotFound, AssertionError):
            response = send_from_directory(self.flask_app.config['WEBUI_PATH'], 'index.html')
        return response
项目:chaos-monkey-engine    作者:BBVA    | 项目源码 | 文件源码
def delete_executor(executor_id):
    """
    Delete an executor

    Removes the executor through ``manager.remove_executor`` and returns a
    JSON confirmation message.

    Example request::

        DEL /api/1/executors/6890192d8b6c40e5af16f13aa036c7dc
    """
    manager.remove_executor(executor_id)
    # Spelling fixed ("succesfully" -> "successfully") to match delete_plan.
    return json.jsonify({
        "msg": "Executor %s successfully deleted" % executor_id
    })

########
# UTILS
#######
项目:RSVPBot    作者:recursecenter    | 项目源码 | 文件源码
def join(id):
    """Add the requesting user to an event and report the result as JSON.

    The user is looked up from the ``user_param`` / ``user_param_value``
    request values.  Returns ``not_found()`` when no event matches *id* and
    ``user_not_specified()`` when no user is found.
    """
    event = find_event(id)
    # Identity test: ``== None`` can be redefined by a custom ``__eq__``.
    if event is None:
        return not_found()

    user = find_user(request.values.get('user_param'), request.values.get('user_param_value'))
    if user is None:
        return user_not_specified()

    join_event(user, event)

    return jsonify({
        'joined': True,
        'rsvps_disabled': False,
        'event_archived': False,
        'over_capacity': False,
        'past_deadline': False,
    })
项目:oidc-fed    作者:its-dirg    | 项目源码 | 文件源码
def handle_authn_response():
    """Parse the authentication response and return the tokens as JSON.

    When the response carries a ``code``, it is exchanged for tokens through
    an access token request; otherwise the tokens are read from the
    authentication response itself.
    """
    client = current_app.rp.client
    response = client.parse_response(AuthorizationResponse,
                                     info=request.query_string.decode("utf-8"),
                                     sformat="urlencoded")

    if "code" not in response:
        # No authorization code: take the tokens straight from the response.
        code = None
        id_token = response["id_token"].to_dict()
        access_token = response.get("access_token")
    else:
        code = response["code"]
        # make token request
        token_request = {
            "code": code,
            "client_id": client.client_id,
            "client_secret": client.client_secret
        }
        tokens = client.do_access_token_request(scope="openid", request_args=token_request)
        access_token = tokens["access_token"]
        id_token = tokens["id_token"].to_dict()
        # TODO do userinfo req

    return jsonify(dict(auth_code=code, token=access_token, id_token=id_token))
项目:ooni-measurements    作者:TheTorProject    | 项目源码 | 文件源码
def api_private_counties_by_month():
    """Return the number of distinct ``probe_cc`` values seen per month.

    Responds with a JSON list of ``{'date': 'YYYY-MM-01', 'value': <count>}``
    entries.  Rows whose start time lies after the current time are skipped.
    """
    NOW = datetime.now()
    r = current_app.db_session.query(
            Report.test_start_time,
            Report.probe_cc) \
        .order_by(Report.test_start_time)

    # XXX this can be done better in a SQL that is not sqlite
    # A set per bucket de-duplicates in constant time per row; the previous
    # list scan made every row cost O(distinct countries in the bucket).
    monthly_buckets = {}
    for tst, country in r:
        if tst > NOW:
            # We ignore measurements from time travelers
            continue
        monthly_buckets.setdefault(tst.strftime("%Y-%m-01"), set()).add(country)

    result = [
        {'date': bkt, 'value': len(seen)}
        for bkt, seen in monthly_buckets.items()
    ]
    return jsonify(result)
项目:ooni-measurements    作者:TheTorProject    | 项目源码 | 文件源码
def api_private_runs_by_month():
    """Return the number of reports per month as a JSON list.

    Each entry has the form ``{'date': 'YYYY-MM-01', 'value': <count>}``.
    Reports whose start time lies after the current time are not counted.
    """
    cutoff = datetime.now()
    query = current_app.db_session.query(
            Report.test_start_time) \
        .order_by(Report.test_start_time)

    # XXX this can be done better in a SQL that is not sqlite
    counts = {}
    for row in query:
        started = row.test_start_time
        # We ignore measurements from time travelers
        if started <= cutoff:
            key = started.strftime("%Y-%m-01")
            counts[key] = counts.get(key, 0) + 1

    return jsonify([
        {'date': key, 'value': total} for key, total in counts.items()
    ])
项目:ooni-measurements    作者:TheTorProject    | 项目源码 | 文件源码
def api_private_blockpage_detected():
    """Return the distinct ``probe_cc`` values that have confirmed measurements.

    Only reports whose test name is ``http_requests`` or ``web_connectivity``
    are considered.  The response is ``{'results': [{'probe_cc': ...}, ...]}``.
    """
    web_tests = or_(
        Report.test_name == 'http_requests',
        Report.test_name == 'web_connectivity'
    )
    query = current_app.db_session.query(
        distinct(Report.probe_cc).label('probe_cc'),
    )
    query = query.join(Measurement, Measurement.report_no == Report.report_no)
    # ``== True`` builds a SQL expression here; it is not a Python comparison.
    query = query.filter(Measurement.confirmed == True)
    query = query.filter(web_tests)

    return jsonify({
        'results': [{'probe_cc': row.probe_cc} for row in query]
    })
项目:GenomicsSampleAPIs    作者:Intel-HLS    | 项目源码 | 文件源码
def variantset_search():
    """Search variant sets using the JSON body of the request.

    Reads ``datasetId``, ``pageSize`` and ``pageToken`` from the JSON body.
    Returns a 415 error response when the request carries no JSON and a 500
    error response when the search raises.
    """
    if not request.json:
        return makeGAException(
            message='Bad Content Type, please send application/json',
            ecode=-1,
            rcode=415)
    datasetId = request.json.get('datasetId', "")
    pageSize = request.json.get('pageSize', None)
    pageToken = request.json.get('pageToken', None)

    try:
        garesp = tileSearch.searchVariantSets(
            datasetId=datasetId, pageSize=pageSize, pageToken=pageToken)

    # ``Exception`` rather than a bare ``except`` so SystemExit and
    # KeyboardInterrupt are not converted into a 500 response.
    except Exception:
        return makeGAException(
            message='search command exited abnormally', ecode=500, rcode=500)

    return jsonify(garesp.gavsetresponse_info)
项目:catherine    作者:felipemfp    | 项目源码 | 文件源码
def put(self, username, transaction_id):
        """Overwrite a transaction of the authenticated user with the JSON body.

        Responds with the updated transaction as JSON.  Raises ``InvalidUsage``
        when ``self.authenticate()`` returns a falsy value.
        """
        user = self.authenticate()
        if not user:
            raise InvalidUsage()

        payload = request.get_json(force=True)
        transaction = user.transactions.filter_by(
            transaction_id=transaction_id).first_or_404()
        # Fields are copied in this fixed order from the request body.
        for field in ('category_id', 'person_id', 'transaction_date',
                      'value', 'notes', 'type', 'done'):
            setattr(transaction, field, payload[field])
        db.session.commit()
        return json.jsonify(transaction.as_dict())
项目:catherine    作者:felipemfp    | 项目源码 | 文件源码
def post(self, username, transaction_id):
        """Create a new item on a transaction of the authenticated user.

        The item's fields are taken from the JSON body and the new item is
        returned as JSON.  Raises ``InvalidUsage`` when ``self.authenticate()``
        returns a falsy value or the stored item ends up without an id.
        """
        user = self.authenticate()
        if user:
            transaction = user.transactions.filter_by(
                transaction_id=transaction_id).first_or_404()
            supposed_item = request.get_json(force=True)
            item = TransactionItem()
            # The condition used to be inverted: it assigned id 1 whenever
            # items already existed and indexed an empty list otherwise.
            # ``max`` is used instead of the last element because the query
            # has no explicit ordering.
            existing_items = transaction.transaction_items.all()
            if existing_items:
                item.item_id = max(
                    existing.item_id for existing in existing_items) + 1
            else:
                item.item_id = 1
            item.user_id = user.user_id
            item.transaction_id = transaction.transaction_id
            item.person_id = supposed_item['person_id']
            item.item_date = supposed_item['item_date']
            item.value = supposed_item['value']
            item.notes = supposed_item['notes']
            item.type = supposed_item['type']
            item.done = supposed_item['done']
            db.session.add(item)
            db.session.commit()
            if item.item_id:
                return json.jsonify(item.as_dict())
        raise InvalidUsage()
项目:catherine    作者:felipemfp    | 项目源码 | 文件源码
def put(self, username, transaction_id, item_id):
        """Overwrite one item of a transaction with the JSON body.

        Responds with the updated item as JSON.  Raises ``InvalidUsage`` when
        ``self.authenticate()`` returns a falsy value.
        """
        user = self.authenticate()
        if not user:
            raise InvalidUsage()

        transaction = user.transactions.filter_by(
            transaction_id=transaction_id).first_or_404()
        payload = request.get_json(force=True)
        item = transaction.transaction_items.filter_by(
            item_id=item_id).first_or_404()
        # Fields are copied in this fixed order from the request body.
        for field in ('person_id', 'item_date', 'value',
                      'notes', 'type', 'done'):
            setattr(item, field, payload[field])
        db.session.commit()
        return json.jsonify(item.as_dict())
项目:fabric8-analytics-server    作者:fabric8-analytics    | 项目源码 | 文件源码
def stack_analyses_debug(external_request_id):
    """Debug endpoint exposing operational data for a particular stack analysis.

    This endpoint is not part of the public API.

    The existence of the data is not guaranteed, so the endpoint can answer
    404 even for valid request IDs.  On success the response is
    ``{'tasks': [...]}`` with one entry per worker result, status 200.
    """
    worker_results = retrieve_worker_results(rdb, external_request_id)
    if not worker_results:
        return jsonify(error='No operational data for the request ID'), 404

    tasks = []
    for worker_result in worker_results:
        data = worker_result.to_dict()
        audit = data.get('task_result', {}).get('_audit', {})
        tasks.append({
            'task_name': data.get('worker'),
            'started_at': audit.get('started_at'),
            'ended_at': audit.get('ended_at'),
            'error': data.get('error'),
        })
    return jsonify({'tasks': tasks}), 200
项目:hipfrog    作者:wardweistra    | 项目源码 | 文件源码
def hipfrog():
    """Answer a ``/hipfrog`` message with a JSON message dict.

    Replies with an error when the installation has no Glassfrog token, with
    the help text when the message has no argument, and with a
    "missing functionality" error naming the first argument otherwise.
    """
    requestdata = json.loads(request.get_data())
    callingMessage = requestdata['item']['message']['message'].lower().split()
    oauthId = requestdata['oauth_client_id']
    installation = messageFunctions.getInstallationFromOauthId(oauthId)

    if installation.glassfrogToken is None:
        message = strings.set_token_first
        message_dict = messageFunctions.createMessageDict(strings.error_color, message)
    elif len(callingMessage) <= 1:
        # ``<= 1`` also covers a message that splits to zero words, which
        # previously left ``message_dict`` unbound at the return below.
        message = strings.help_hipfrog
        message_dict = messageFunctions.createMessageDict(strings.succes_color, message)
    else:
        # /hipfrog something
        message = strings.missing_functionality.format(callingMessage[1])
        message_dict = messageFunctions.createMessageDict(strings.error_color, message)
    # TODO Generate message_dict and color here
    return json.jsonify(message_dict)
项目:flask-quickstart    作者:keathmilligan    | 项目源码 | 文件源码
def error_handler(error):
    """
    Standard Error Handler

    ``HTTPException`` instances are reported with their own code, name and
    description; anything else becomes a generic 500 response.
    """
    if not isinstance(error, HTTPException):
        body = {
            'statusCode': 500,
            'name': 'Internal Server Error',
            'description': 'An unknown error has occurred'
        }
        return jsonify(body), 500

    body = {
        'statusCode': error.code,
        'name': error.name,
        'description': error.description
    }
    return jsonify(body), error.code

# common errors - add others as needed
项目:sutd-timetable    作者:randName    | 项目源码 | 文件源码
def get_group_sections():
    """Return the lessons of the group named by the raw query string.

    Responds with ``{'status': 'error'}`` when the query string is empty,
    otherwise with ``{'status': 'ok', 'events': [...]}`` where the events are
    the ``details`` of the matching lessons ordered by ``Lesson.start``.
    """
    group = request.query_string.decode()
    if not group:
        return json.jsonify({'status': 'error'})

    class_numbers = []
    for member in rd.smembers('group:%s' % group):
        try:
            number = int(member)
        except ValueError:
            # Skip members that are not integers.
            continue
        class_numbers.append(number)
    # The list is kept in reverse iteration order.
    class_numbers.reverse()

    lessons = (
        Lesson.query
        .filter(Lesson.class_no.in_(class_numbers))
        .order_by(Lesson.start)
        .all()
    )

    return json.jsonify({
        'status': 'ok',
        'events': tuple(lesson.details for lesson in lessons),
    })
项目:flamyngo    作者:materialsvirtuallab    | 项目源码 | 文件源码
def get_ids(collection_name):
    """Return the distinct values of a collection's unique key as JSON."""
    settings = CSETTINGS[collection_name]
    collection = DB[collection_name]
    unique_values = collection.distinct(settings["unique_key"])
    return jsonify(jsanitize(unique_values))
项目:flamyngo    作者:materialsvirtuallab    | 项目源码 | 文件源码
def get_doc_json(collection_name, uid):
    """Return the document whose unique key matches *uid* as JSON.

    *uid* is converted with ``process`` using the collection's configured
    ``unique_key_type`` before the lookup.
    """
    settings = CSETTINGS[collection_name]
    key_field = settings["unique_key"]
    key_value = process(uid, settings["unique_key_type"])
    document = DB[collection_name].find_one({key_field: key_value})
    return jsonify(jsanitize(document))
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def task_in_json(taskid):
    """Return a task, addressed as ``project:task_id``, as JSON.

    Responds with a ``code`` 400 payload when *taskid* has no colon and with
    a ``code`` 404 payload when the task database has no such task.  A found
    task gets a ``status_string`` entry added before it is returned.
    """
    project, separator, task_key = taskid.partition(':')
    if not separator:
        return json.jsonify({'code': 400, 'error': 'bad project:task_id format'})

    taskdb = app.config['taskdb']
    task = taskdb.get_task(project, task_key)
    if not task:
        return json.jsonify({'code': 404, 'error': 'not found'})

    task['status_string'] = taskdb.status_to_string(task['status'])
    return json.jsonify(task)
项目:chaos-monkey-engine    作者:BBVA    | 项目源码 | 文件源码
def handle_invalid_usage(error):
    """
    Handler for APIErrors thrown by API endpoints

    Logs the status code and message, then returns the error's dict form as
    a JSON response carrying the error's status code.
    """
    status = error.status_code
    log.info('%d %s', status, error.message)
    reply = json.jsonify(error.to_dict())
    reply.status_code = status
    return reply
项目:chaos-monkey-engine    作者:BBVA    | 项目源码 | 文件源码
def add_plan():
    """
    Add a plan.

    Validates the request against ``plan_schema``, then hands the ``name``,
    ``planner`` and ``attack`` entries of the JSON body to
    ``manager.execute_plan``.  Raises ``AssertionError`` when validation
    returns a falsy value.

    Example request::

        PUT /api/1/executors/3b373155577b4d1bbc62216ffea013a4
        Body:
            {
                "name": "Terminate instances in Playground",
                "attack": {
                    "args": {
                        "region": "eu-west-1",
                        "filters": {
                            "tag:Name": "playground-asg"
                        }
                    },
                    "ref": "terminate_ec2_instance:TerminateEC2Instance"
                },
                "planner": {
                    "ref": "simple_planner:SimplePlanner",
                    "args": {
                        "min_time" : "10:00",
                        "max_time" : "19:00",
                        "times": 4
                    }
                }
            }

    """
    # Explicit check instead of ``assert``: assertions are stripped under
    # ``python -O``, which would skip the validation call entirely.  The
    # exception type is kept so existing error handling is unaffected.
    if not validate_payload(request, plan_schema):
        raise AssertionError("request payload failed plan schema validation")
    req_json = request.get_json()

    name = req_json["name"]
    planner_config = req_json["planner"]
    attack_config = req_json["attack"]

    manager.execute_plan(name, planner_config, attack_config)

    return json.jsonify({"msg": "ok"})
项目:chaos-monkey-engine    作者:BBVA    | 项目源码 | 文件源码
def delete_plan(plan_id):
    """
    Delete a plan

    Deletes the plan through ``manager.delete_plan`` and returns a JSON
    confirmation message.

    Example request::

        DEL /api/1/plans/6890192d8b6c40e5af16f13aa036c7dc

    """
    manager.delete_plan(plan_id)
    confirmation = "Plan %s successfully deleted" % plan_id
    return json.jsonify({"msg": confirmation})
项目:RSVPBot    作者:recursecenter    | 项目源码 | 文件源码
def render_event(event):
    """Return *event*, passed through ``filter_participants``, as JSON."""
    filtered = filter_participants(event)
    return jsonify(filtered)
项目:RSVPBot    作者:recursecenter    | 项目源码 | 文件源码
def render_events(events):
    """Return every event, passed through ``filter_participants``, as a JSON list."""
    filtered = list(map(filter_participants, events))
    return jsonify(filtered)
项目:RSVPBot    作者:recursecenter    | 项目源码 | 文件源码
def not_found():
    """Build a 404 JSON response whose message is ``not_found``."""
    body = {"message": "not_found"}
    reply = jsonify(body)
    reply.status_code = 404
    return reply
项目:RSVPBot    作者:recursecenter    | 项目源码 | 文件源码
def user_not_specified():
    """Build a 422 JSON response whose message is ``user_not_specified``."""
    body = {"message": "user_not_specified"}
    reply = jsonify(body)
    reply.status_code = 422
    return reply
项目:crm    作者:Incubaid    | 项目源码 | 文件源码
def custom_401(error):
    """Render a 401 response as JSON for API calls, as HTML otherwise.

    A request counts as an API call when its ``Content-Type`` header equals
    ``application/json`` (case-insensitive).
    """
    content_type = request.headers.get('Content-Type')
    is_api_call = (content_type is not None
                   and content_type.lower() == 'application/json')
    if is_api_call:
        return jsonify(errors=['Not authorized']), 401

    page = render_template('home/401.html', message=error.description['message'])
    return page, 401
项目:ooni-measurements    作者:TheTorProject    | 项目源码 | 文件源码
def get_version():
    """Return the value of ``__version__`` as a JSON object."""
    payload = {"version": __version__}
    return jsonify(payload)
项目:ooni-measurements    作者:TheTorProject    | 项目源码 | 文件源码
def api_private_asn_by_month():
    """Return the number of distinct ``probe_asn`` values seen per month.

    Responds with a JSON list of ``{'date': 'YYYY-MM-01', 'value': <count>}``
    entries.  Rows whose start time lies after the current time are skipped.
    """
    NOW = datetime.now()
    r = current_app.db_session.query(
            Report.test_start_time,
            Report.probe_asn) \
        .order_by(Report.test_start_time)

    # XXX this can be done better in a SQL that is not sqlite
    # A set per bucket de-duplicates in constant time per row; the previous
    # list scan made every row cost O(distinct ASNs in the bucket).
    monthly_buckets = {}
    for tst, asn in r:
        asn = 'AS%d' % asn
        if tst > NOW:
            # We ignore measurements from time travelers
            continue
        monthly_buckets.setdefault(tst.strftime("%Y-%m-01"), set()).add(asn)

    result = [
        {'date': bkt, 'value': len(seen)}
        for bkt, seen in monthly_buckets.items()
    ]
    return jsonify(result)
项目:ooni-measurements    作者:TheTorProject    | 项目源码 | 文件源码
def api_private_reports_per_day():
    """Return the number of reports per day as a JSON list.

    Each entry has the form ``{'count': <n>, 'date': 'YYYY-MM-DD'}``; the
    query groups and orders by the start time truncated to the day.
    """
    def day():
        # A fresh expression per use, so the generated query is unchanged.
        return func.date_trunc('day', Report.test_start_time)

    query = current_app.db_session.query(func.count(day()), day())
    query = query.group_by(day()).order_by(day())

    return jsonify([
        {'count': count, 'date': date.strftime("%Y-%m-%d")}
        for count, date in query
    ])
项目:ooni-measurements    作者:TheTorProject    | 项目源码 | 文件源码
def api_private_countries():
    """Return the list of countries, optionally with measurement counts.

    Without the ``with_counts`` query argument, every entry of ``countries``
    is listed with its ``alpha_2`` code and ``name``.  With it, countries are
    taken from the measurement counts grouped by ``probe_cc``; the code ``ZZ``
    is skipped and codes that fail to resolve are logged and left out.
    """
    if not request.args.get("with_counts"):
        return jsonify({
            "countries": [
                {'alpha_2': c.alpha_2, 'name': c.name} for c in countries
            ]
        })

    # XXX we probably actually want to get this from redis or compute it
    # periodically since it take 60 seconds to run.
    query = current_app.db_session.query(
            func.count(Measurement.id),
            Report.probe_cc.label('probe_cc')
    ).join(Report, Report.report_no == Measurement.report_no) \
     .group_by(Report.probe_cc)

    country_list = []
    for count, probe_cc in query:
        if probe_cc.upper() == 'ZZ':
            continue

        try:
            country = countries.lookup(probe_cc)
            country_list.append({
                'count': count,
                'alpha_2': country.alpha_2,
                'name': country.name,
            })
        except Exception:
            current_app.logger.warning("Failed to lookup: %s" % probe_cc)

    return jsonify({
        "countries": country_list
    })

# XXX Everything below here are ghetto hax to support legacy OONI Explorer
项目:ooni-measurements    作者:TheTorProject    | 项目源码 | 文件源码
def api_private_blockpages():
    """Return the confirmed measurements of one country as JSON.

    The country is given by the ``probe_cc`` query argument; an ``Exception``
    is raised when it is missing.  Only reports whose test name is
    ``http_requests`` or ``web_connectivity`` are included.
    """
    probe_cc = request.args.get('probe_cc')
    if probe_cc is None:
        raise Exception('err')

    columns = (
        Report.report_id.label('report_id'),
        Report.probe_cc.label('probe_cc'),
        Report.probe_asn.label('probe_asn'),
        Report.test_start_time.label('test_start_time'),
        Input.input.label('input'),
    )
    query = current_app.db_session.query(*columns)
    query = query.join(Measurement, Measurement.report_no == Report.report_no)
    query = query.join(Input, Measurement.input_no == Input.input_no)
    # ``== True`` builds a SQL expression here; it is not a Python comparison.
    query = query.filter(Measurement.confirmed == True)
    query = query.filter(or_(
        Report.test_name == 'http_requests',
        Report.test_name == 'web_connectivity'
    ))
    query = query.filter(Report.probe_cc == probe_cc)

    return jsonify({
        'results': [
            {
                'report_id': row.report_id,
                'probe_cc': row.probe_cc,
                'probe_asn': "AS{}".format(row.probe_asn),
                'test_start_time': row.test_start_time,
                'input': row.input
            }
            for row in query
        ]
    })
项目:ooni-measurements    作者:TheTorProject    | 项目源码 | 文件源码
def api_private_website_measurements():
    """Return the confirmed measurements whose input contains a string.

    The string is given by the ``input`` query argument; an ``Exception`` is
    raised when it is missing.  Only reports whose test name is
    ``http_requests`` or ``web_connectivity`` are included.
    """
    input_ = request.args.get('input')
    if input_ is None:
        raise Exception('err')

    columns = (
        Report.report_id.label('report_id'),
        Report.probe_cc.label('probe_cc'),
        Report.probe_asn.label('probe_asn'),
        Report.test_start_time.label('test_start_time'),
        Input.input.label('input'),
    )
    query = current_app.db_session.query(*columns)
    query = query.join(Measurement, Measurement.report_no == Report.report_no)
    query = query.join(Input, Measurement.input_no == Input.input_no)
    # ``== True`` builds a SQL expression here; it is not a Python comparison.
    query = query.filter(Measurement.confirmed == True)
    query = query.filter(or_(
        Report.test_name == 'http_requests',
        Report.test_name == 'web_connectivity'
    ))
    query = query.filter(Input.input.contains(input_))

    return jsonify({
        'results': [
            {
                'report_id': row.report_id,
                'probe_cc': row.probe_cc,
                'probe_asn': "AS{}".format(row.probe_asn),
                'test_start_time': row.test_start_time,
                'input': row.input
            }
            for row in query
        ]
    })
项目:GenomicsSampleAPIs    作者:Intel-HLS    | 项目源码 | 文件源码
def makeGAException(message, ecode, rcode):
    """Build a JSON error response.

    The body holds *message* and *ecode* (as ``error_code``); *rcode* is set
    as the HTTP status code of the response.
    """
    response = json.jsonify({'message': message, 'error_code': ecode})
    response.status_code = rcode
    return response
项目:GenomicsSampleAPIs    作者:Intel-HLS    | 项目源码 | 文件源码
def bad_method(e):
    """Return a 405 JSON response carrying the text of *e*."""
    body = jsonify(error=405, text=str(e))
    return body, 405
项目:flask-api-starter-kit    作者:antkahn    | 项目源码 | 文件源码
def get(last_name, first_name):
        """Return the JSON form of the user found by last and first name."""
        found = UserRepository.get(last_name=last_name, first_name=first_name)
        payload = {'user': found.json}
        return jsonify(payload)
项目:flask-api-starter-kit    作者:antkahn    | 项目源码 | 文件源码
def post(last_name, first_name, age):
        """Create a user from the given fields and return its JSON form."""
        fields = dict(last_name=last_name, first_name=first_name, age=age)
        created = UserRepository.create(**fields)
        return jsonify({'user': created.json})
项目:flask-api-starter-kit    作者:antkahn    | 项目源码 | 文件源码
def put(last_name, first_name, age):
        """Update a user from the given fields and return its JSON form."""
        fields = dict(last_name=last_name, first_name=first_name, age=age)
        updated = UserRepository().update(**fields)
        return jsonify({'user': updated.json})
项目:MTGLeague    作者:JackRamey    | 项目源码 | 文件源码
def get(self, lid):
        """Return the first league whose id equals *lid*, via the league schema."""
        matches = League.query.filter_by(id=lid)
        record = matches.first()
        return self.league_schema.jsonify(record)
项目:MTGLeague    作者:JackRamey    | 项目源码 | 文件源码
def get(self, mid):
        """Return the first membership whose id equals *mid*, via the membership schema."""
        matches = Membership.query.filter_by(id=mid)
        record = matches.first()
        return self.membership_schema.jsonify(record)
项目:MTGLeague    作者:JackRamey    | 项目源码 | 文件源码
def get(self, uid):
        """Return the first user whose id equals *uid*, via the user schema."""
        matches = User.query.filter_by(id=uid)
        record = matches.first()
        return self.user_schema.jsonify(record)
项目:MTGLeague    作者:JackRamey    | 项目源码 | 文件源码
def get(self):
        """Return all users, dumped through the user schema, as ``{'users': [...]}``."""
        everyone = User.query.all()
        dumped = [self.user_schema.dump(member).data for member in everyone]
        return json.jsonify({'users': dumped})
项目:catherine    作者:felipemfp    | 项目源码 | 文件源码
def handle_invalid_usage(error):
    """Return the error text as JSON with the error's own status code."""
    body = {'error': str(error)}
    return json.jsonify(body), error.status_code
项目:catherine    作者:felipemfp    | 项目源码 | 文件源码
def handle_not_allowed(error):
    """Return the error text as JSON with status 405."""
    body = {'error': str(error)}
    return json.jsonify(body), 405
项目:catherine    作者:felipemfp    | 项目源码 | 文件源码
def handle_not_found(error):
    """Return the error text as JSON with status 404."""
    body = {'error': str(error)}
    return json.jsonify(body), 404
项目:catherine    作者:felipemfp    | 项目源码 | 文件源码
def post(self):
        """Exchange a username and password for a signed token.

        Reads ``username`` and ``password`` from the JSON body.  When the
        stored password equals the SHA-256 hash of the submitted one, a token
        expiring seven days from now is returned as ``{'token': ...}``;
        otherwise ``InvalidUsage`` is raised.
        """
        credentials = request.get_json(force=True)
        user = User.query.filter_by(
            username=credentials['username']).first_or_404()

        if not user or user.password != Crypt.hash_sha256(credentials['password']):
            raise InvalidUsage("Username and password does not match.")

        claims = {
            'exp': time.time() + 60 * 60 * 24 * 7,
            'user_id': user.user_id,
            'username': user.username,
            'name': user.name
        }
        token = jwt.encode(claims, config.SECRET_KEY, algorithm='HS256')
        return json.jsonify({'token': token})
项目:catherine    作者:felipemfp    | 项目源码 | 文件源码
def get(self, username):
        """Return the authenticated user as JSON.

        Raises ``InvalidUsage`` when ``self.authenticate()`` returns a falsy
        value.
        """
        user = self.authenticate()
        if not user:
            raise InvalidUsage()
        return json.jsonify(user.as_dict())
项目:catherine    作者:felipemfp    | 项目源码 | 文件源码
def put(self, username):
        """Update the authenticated user from the JSON body.

        Sets ``name`` and ``username`` and stores the SHA-256 hash of
        ``password``, then returns the user as JSON.  Raises ``InvalidUsage``
        when ``self.authenticate()`` returns a falsy value.
        """
        user = self.authenticate()
        if not user:
            raise InvalidUsage()

        payload = request.get_json(force=True)
        user.name = payload['name']
        user.username = payload['username']
        user.password = Crypt.hash_sha256(payload['password'])
        db.session.commit()
        return json.jsonify(user.as_dict())
项目:catherine    作者:felipemfp    | 项目源码 | 文件源码
def delete(self, username):
        """Delete the authenticated user and confirm it as JSON.

        Raises ``InvalidUsage`` when ``self.authenticate()`` returns a falsy
        value.
        """
        user = self.authenticate()
        if not user:
            raise InvalidUsage()

        db.session.delete(user)
        db.session.commit()
        # The message is formatted after the commit.
        confirmation = '{} was deleted'.format(user)
        return json.jsonify({'success': confirmation})
项目:catherine    作者:felipemfp    | 项目源码 | 文件源码
def get(self, username, category_id):
        """Return one category, or all categories, of the authenticated user.

        With a truthy *category_id* the matching category is returned (404 if
        absent); otherwise all categories are returned as
        ``{'categories': [...]}``.  Raises ``InvalidUsage`` when
        ``self.authenticate()`` returns a falsy value.
        """
        user = self.authenticate()
        if not user:
            raise InvalidUsage()

        if not category_id:
            listing = [entry.as_dict() for entry in user.categories]
            return json.jsonify({'categories': listing})

        selected = user.categories.filter_by(
            category_id=category_id).first_or_404()
        return json.jsonify(selected.as_dict())