Python flask.json 模块,loads() 实例源码

我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用flask.json.loads()

项目:horse    作者:pragmaticcoders    | 项目源码 | 文件源码
def test_user_can_follow_another_user(client, users_repo):
    eve = User(name='Eve')
    adam = User(name='Adam')
    users_repo.store(eve)
    users_repo.store(adam)

    response = client.post('/users/{}/follow'.format(eve.pk), data=json.dumps({
        'pk': adam.pk
    }), content_type='application/json')
    assert response.status_code == 200

    response = client.get('/users/{}'.format(eve.pk))
    assert response.status_code == 200

    followed_users = json.loads(response.data)['followed_users']
    followed_users_names = [u['name'] for u in followed_users]

    assert 'Adam' in followed_users_names
项目:horse    作者:pragmaticcoders    | 项目源码 | 文件源码
def test_user_can_like_movie(client, movie, user, users_repo, movies_repo):
    url = '/users/{user_pk}/liked_movies'.format(
        user_pk=user.pk,
    )
    assert movie.likes == 0
    response = client.post(url, data=json.dumps({
        'pk': movie.pk,
    }), content_type='application/json')
    assert response.status_code == 200

    response = client.get('/users/{}'.format(user.pk))
    liked_movies = json.loads(response.data)['liked_movies']
    movie_titles = [m['title'] for m in liked_movies]

    assert movie.title in movie_titles
    assert movie.likes == 1
项目:ssshare    作者:gdassori    | 项目源码 | 文件源码
def validate(validation_class, silent=not settings.DEBUG):
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*a, **kw):
            if flask.request.method == 'GET':
                p = {k: v for k, v in flask.request.values and flask.request.values.items() or {}}
            else:
                try:
                    p = flask.request.data and json.loads(flask.request.data.decode())
                except Exception:
                    raise exceptions.WrongParametersException
            if silent:
                try:
                    validation_class(p)
                except PyCombValidationError:
                    raise exceptions.WrongParametersException
            else:
                validation_class(p)
            return fun(*a, params=p, **kw)
        return wrapper
    return decorator
项目:doppler    作者:TakumiHQ    | 项目源码 | 文件源码
def get_job(request_id):
    job = _get_job(request_id)
    status = job.status
    run_at = None
    scheduled_at = None
    last_retry = None
    retries_left = 0

    if status != "done":
        args = json.loads(job.args)
        taskid, fname, args, kwargs, run_at = args
        scheduled_at = kwargs['scheduled_at']
        last_retry = kwargs.get('last_retry')
        retries_left = kwargs['_attempts']

    return jsonify({
        'request_id': request_id,
        'status': status,
        'run_at': run_at,
        'scheduled_at': scheduled_at,
        'last_retry': last_retry,
        'retries_left': retries_left,
    })
项目:cobalt    作者:PressLabs    | 项目源码 | 文件源码
def test_volume_list_get(self, etcd_client, volume_raw_ok_ready,
                             flask_app):
        generators = [volume_raw_ok_ready, volume_raw_ok_ready]

        volume_writes = []
        for generator in generators:
            volume_writes.append(
                etcd_client.write(VolumeManager.KEY, generator, append=True))
        volume_writes = flask_app.volume_manager._load_from_etcd(volume_writes)

        expected, errors = VolumeSchema().dump(volume_writes, many=True)
        assert errors == {}

        with flask_app.test_client() as c:
            response = c.get('/volumes')
            assert response.status_code == 200

            actual = json.loads(response.data.decode())

        assert len(actual) == len(expected)
        assert actual == expected
项目:cobalt    作者:PressLabs    | 项目源码 | 文件源码
def test_volume_put_empty_body(self, etcd_client, volume_raw_ok_ready,
                                   flask_app):
        volume = etcd_client.write(VolumeManager.KEY, volume_raw_ok_ready, append=True)
        id = flask_app.volume_manager.get_id_from_key(volume.key)

        expected_result = {
            'message': {'constraints': ['Missing data for required field.'],
                        'reserved_size': ['Missing data for required field.']}}

        expected_code = 400

        with flask_app.test_client() as c:
            response = c.put('/volumes/{}'.format(id), data='{}',
                             content_type='application/json')

            result = json.loads(response.data.decode())

            assert response.status_code == expected_code
            assert expected_result == result
项目:cobalt    作者:PressLabs    | 项目源码 | 文件源码
def _load_from_etcd(self, data):
        """Utility method to expand result objects.

        Args:
            data ([etcd.result] | etcd.result): The iterable or a single result object not expanded

        Returns:
             etcd.result: Return type matches given input (if it is a list->list), and all result objects are expanded
        """
        # we trust that etcd data is valid
        try:
            iter(data)
        except TypeError:
            data.value = json.loads(data.value)
            return data
        else:
            for entry in data:
                entry.value = json.loads(entry.value)

        return data
项目:flyby    作者:Skyscanner    | 项目源码 | 文件源码
def test_server_register_backend(dynamodb):
    app.test_client().post(
        '/service',
        data=json.dumps({'name': 'foo', 'fqdn': 'foo.example.com'})
    )
    app.test_client().post(
        '/target',
        data=json.dumps({'target_group_name': 'foo-blue', 'service_name': 'foo', 'weight': 80})
    )
    response = app.test_client().post(
        '/backend',
        data=json.dumps(
            {
                'host': '10.0.0.1:80',
                'service_name': 'foo',
                'target_group_name': 'foo-blue',
            }
        )
    )
    assert response.status_code == 200
    assert json.loads(response.data)['host'] == '10.0.0.1:80'
项目:rc-niceties    作者:mjec    | 项目源码 | 文件源码
def post(self):
        if current_user() is None:
            redirect(url_for('authorized'))
            user = current_user()
        if not user.faculty:
            return abort(403)
        key = request.form.get('key', None)
        value = request.form.get('value', None)
        try:
            try:
                config.set(key, config.from_frontend_value(key, json.loads(value)))
                return jsonify({'status': 'OK'})
            except ValueError:
                return abort(404)
        except:
            return abort(400)
项目:flask_ishuhui    作者:lufficc    | 项目源码 | 文件源码
def chapter(comic_id, chapter_id):
    comic = data.get_comic(comic_id)
    chapter = data.get_chapter(chapter_id)
    next_chapter = data.get_next_chapter(comic_id, chapter.chapter_number)
    prev_chapter = data.get_prev_chapter(comic_id, chapter.chapter_number)
    url = 'http://www.ishuhui.net/ComicBooks/ReadComicBooksToIsoV1/' + str(
        chapter_id) + '.html'
    if chapter.comic_id != comic_id:
        abort(404)
    if chapter.images:
        images = json.loads(chapter.images)
    else:
        images = get_images_from_url(url)
        chapter.images = json.dumps(images)
        db.session.commit()
    return render_template(
        'images.html',
        comic=comic,
        chapter=chapter,
        next_chapter=next_chapter,
        prev_chapter=prev_chapter,
        images=images,
        url=url)
项目:hoplite    作者:ni    | 项目源码 | 文件源码
def test_get_jobs_running(self):
        e_job_uuid = []
        jobs = []
        print self.manager.available_job_plugins()
        for i in range(4):
            job = self.manager.create_job(self.test_jobs_module.constants.WAIT_10_SECONDS_JOB_NAME, {}, True)
            jobs.append(job)
            e_job_uuid.append(job.uuid)
        print jobs[0].status()
        r = self.client.get('/jobs/running')
        self.assertOk(r)
        r_jobs = json.loads(r.get_data())["jobs"]
        self.assertEquals(len(r_jobs), 4)
        r_job_uuid = []
        for job in r_jobs:
            r_job_uuid.append(job["uuid"])
        self.assertEquals(sorted(e_job_uuid), sorted(r_job_uuid))
        self._terminate_all_jobs()
项目:webhook-shims    作者:vmw-loginsight    | 项目源码 | 文件源码
def recommendations(ALERTID=None):
    headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
    alertURL = vropsURL+"api/alerts/"+ALERTID
    auth = (vropsUser, vropsPass)
    response = callapi(alertURL, method='get', payload=None, headers=headers, auth=auth, check=VERIFY)
# Fetch the alert to grab alert def ID
    alertInfo = json.loads(response)
    alertDescURL = vropsURL+"api/alertdefinitions/"+alertInfo['alertDefinitionId']
    response = callapi(alertDescURL, method='get', payload=None, headers=headers, auth=auth, check=VERIFY)
# Fetch recommendations from alert def
    recommendations = json.loads(response)
    if recommendations['states'][0]['recommendationPriorityMap']:
        for recommendation in recommendations['states'][0]['recommendationPriorityMap']:
            if recommendations['states'][0]['recommendationPriorityMap'][recommendation] == 1:
                alertDescURL = vropsURL+"api/recommendations/"+recommendation
                response = callapi(alertDescURL, method='get', payload=None, headers=headers, auth=auth, check=VERIFY)
                recText = json.loads(response)
                return recText['description']
    else:
        return
项目:webhook-shims    作者:vmw-loginsight    | 项目源码 | 文件源码
def fetchResourceProperties(resourceId=None):
    headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
    alertURL = vropsURL+"api/resources/"+resourceId+"/properties"
    auth = (vropsUser, vropsPass)
    response = callapi(alertURL, method='get', payload=None, headers=headers, auth=auth, check=VERIFY)
    rawProps = json.loads(response)

    props = {}
    for prop in rawProps['property']:
        props[prop["name"]] = prop["value"]
    return props

# Route without <ALERTID> are for LI, with are for vROps
# Adding PUT and POST for vROps so REST Notification test works
# vROps (as of 6.6.1) will attempt both methods and fail if both
# do not respond
# TODO : Add this capability to all shims
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def test_handle_empty_input(self, mocked_send_to_chat, mocked_celery_chain):
        response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK,
                                    data=json.dumps(TelegramUpdates.EMPTY),
                                    follow_redirects=True,
                                    headers=TelegramUpdates.HEADERS)

        self.assertTrue(response.status_code == 200,
                        'Failed to return status code 200 for empty input')

        self.assertEqual({}, json.loads(response.data),
                         'Failed to return an empty JSON for empty input')

        mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

        with self.assertRaises(AssertionError) as chain_err:
            mocked_celery_chain.assert_called()

        self.assertIn("Expected 'celery_chain' to have been called",
                      str(chain_err.exception))
        self.assertEqual(mocked_celery_chain.call_args_list, [])
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def test_handle_bad_id_bad_text(self, mocked_send_to_chat, mocked_celery_chain):
        response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK,
                                    data=json.dumps(TelegramUpdates.TEXT_BAD_ID_BAD_TEXT),
                                    follow_redirects=True,
                                    headers=TelegramUpdates.HEADERS)
        self.assertTrue(response.status_code == 200,
                        'Failed to return status code 200 for an Update with '
                        'bad reply_message_id and bad text length')
        self.assertEqual({}, json.loads(response.data),
                         'Failed to return an empty JSON for an Update with '
                         'bad reply_message_id and bad text length')

        mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

        with self.assertRaises(AssertionError) as chain_err:
            mocked_celery_chain.assert_called()

        self.assertIn("Expected 'celery_chain' to have been called",
                      str(chain_err.exception))
        self.assertEqual(mocked_celery_chain.call_args_list, [])
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def test_handle_ok_id_bad_text(self, mocked_send_to_chat, mocked_celery_chain):
        response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK,
                                    data=json.dumps(TelegramUpdates.TEXT_OK_ID_BAD_TEXT),
                                    follow_redirects=True,
                                    headers=TelegramUpdates.HEADERS)
        self.assertTrue(response.status_code == 200,
                        'Failed to return status code 200 for an Update with '
                        'bad text length')
        self.assertEqual({}, json.loads(response.data),
                         'Failed to return an empty JSON for an Update with '
                         'bad text length')

        mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

        with self.assertRaises(AssertionError) as chain_err:
            mocked_celery_chain.assert_called()

        self.assertIn("Expected 'celery_chain' to have been called",
                      str(chain_err.exception))
        self.assertEqual(mocked_celery_chain.call_args_list, [])
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def test_handle_bad_id_ok_text(self, mocked_send_to_chat, mocked_celery_chain):
        response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK,
                                    data=json.dumps(TelegramUpdates.TEXT_BAD_ID_OK_TEXT),
                                    follow_redirects=True,
                                    headers=TelegramUpdates.HEADERS)
        self.assertTrue(response.status_code == 200,
                        'Failed to return status code 200 for an Update with '
                        'bad reply_message_id')
        self.assertEqual({}, json.loads(response.data),
                         'Failed to return an empty JSON for an Update with '
                         'bad reply_message_id')

        mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

        with self.assertRaises(AssertionError) as chain_err:
            mocked_celery_chain.assert_called()

        self.assertIn("Expected 'celery_chain' to have been called",
                      str(chain_err.exception))
        self.assertEqual(mocked_celery_chain.call_args_list, [])
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def test_handle_malformed_Message(self, mocked_send_to_chat, mocked_celery_chain):
        response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK,
                                    data=json.dumps(TelegramUpdates.TEXT_MALFORMED_NO_MESSAGE),
                                    follow_redirects=True,
                                    headers=TelegramUpdates.HEADERS)
        self.assertTrue(response.status_code == 200,
                        'Failed to return status code 200 for an Update with '
                        'a malformed Message')
        self.assertEqual({}, json.loads(response.data),
                         'Failed to return an empty JSON for an Update with '
                         'a malformed Message')

        mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

        with self.assertRaises(AssertionError) as chain_err:
            mocked_celery_chain.assert_called()

        self.assertIn("Expected 'celery_chain' to have been called",
                      str(chain_err.exception))
        self.assertEqual(mocked_celery_chain.call_args_list, [])
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def test_handle_update_id_already_used(self, mocked_send_to_chat, mocked_celery_chain):
        # we don't need to test celery tasks in the view
        # that's objective for a separate test suite
        mocked_send_to_chat.return_value = None
        mocked_celery_chain.return_value = None

        response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK,
                                     data=json.dumps(TelegramUpdates.TEXT_SAME_UPDATE_ID),
                                     follow_redirects=True,
                                     headers=TelegramUpdates.HEADERS)
        self.assertTrue(response.status_code == 200,
                        'Failed to return status code 200 for an Update with '
                        'an ID already used')
        self.assertEqual({}, json.loads(response.data),
                         'Failed to return an empty JSON for an Update with '
                         'an ID already used')

        mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

        with self.assertRaises(AssertionError) as chain_err:
            mocked_celery_chain.assert_called()

        self.assertIn("Expected 'celery_chain' to have been called",
                      str(chain_err.exception))
        self.assertEqual(mocked_celery_chain.call_args_list, [])
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def test_handle_valid_input(self, mocked_send_to_chat, mocked_celery_chain):
        response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK,
                                    data=json.dumps(TelegramUpdates.TEXT_OK_ID_OK_TEXT),
                                    follow_redirects=True,
                                    headers=TelegramUpdates.HEADERS)
        self.assertTrue(response.status_code == 200,
                        'Failed to return status code 200 for a valid input')
        self.assertNotEqual({}, response.data,
                            'Failed to return a non-empty JSON for a valid input')
        self.assertEqual(TelegramUpdates.TEXT_OK_ID_OK_TEXT,
                         json.loads(response.data),
                         'Failed to return an Update itself for a valid input Update')
        # watch out! parse_update eliminates Updates with message_id already processed
        # so we have to use not parsed Update here
        mocked_update = {'chat_id': TelegramUpdates.TEXT_OK_ID_OK_TEXT['message']['chat']['id'],
                         'reply_to_message_id': TelegramUpdates.TEXT_OK_ID_OK_TEXT['message']['message_id'],
                         'text': TelegramUpdates.TEXT_OK_ID_OK_TEXT['message']['text']}
        mocked_celery_chain.assert_called_with(mocked_update)

        with self.assertRaises(AssertionError) as send_err:
            mocked_send_to_chat.assert_called()

        self.assertIn("Expected 'send_message_to_chat' to have been called",
                      str(send_err.exception))
        self.assertEqual(mocked_send_to_chat.call_args_list, [])
项目:perseids-manifold    作者:RDACollectionsWG    | 项目源码 | 文件源码
def test_members_get_id(self):
        with self.app.app_context():
            # create collection, members
            c_obj = self.mock.collection(description={'something':'abcdefghi123ö'})
            m_objs = [self.mock.member() for i in range(5)]
            # add collection, members
            self.app.db.set_collection(c_obj)
            for m_obj in m_objs:
                self.app.db.set_member(c_obj.id, m_obj)
            # GET members
            responses = [{'out': self.get("collections/"+urllib.parse.quote_plus(c_obj.id)+"/members/"+urllib.parse.quote_plus(m_obj.id)), 'in':m_obj} for m_obj in m_objs]
            # assert 200 OK
            for r in responses:
                self.assertEqual(r['out'].status_code, 200)
                # compare members
                self.assertDictEqual(json.loads(r['out'].data).dict(), r['in'].dict())
项目:perseids-manifold    作者:RDACollectionsWG    | 项目源码 | 文件源码
def test_members_get(self):
        with self.app.app_context():
            # create collection, members
            c_obj = self.mock.collection(description={'something':'abcdefghi123ö'})
            m_objs = [self.mock.member() for i in range(5)]
            # add collection, members
            self.app.db.set_collection(c_obj)
            # for m_obj in m_objs:
            self.app.db.set_member(c_obj.id, m_objs)
            # pool = ThreadPool(50)
            # pool.map(lambda m_obj: self.app.db.set_member(c_obj.id, m_obj), m_objs)
            # GET members
            response = self.get("collections/"+urllib.parse.quote_plus(c_obj.id)+"/members")
            # assert 200 OK
            self.assertEqual(response.status_code, 200)
            sortedResponse = [r.dict() for r in sorted(json.loads(response.data)['contents'], key=lambda x: x.id)]
            sortedMocks = [m.dict() for m in sorted(m_objs, key=lambda x: x.id)]
            for i in range(len(sortedMocks)):
                self.assertDictEqual(sortedResponse[i], sortedMocks[i])
项目:perseids-manifold    作者:RDACollectionsWG    | 项目源码 | 文件源码
def post(self, id=None):
        try:
            if id:
                raise NotFoundError()  # 404
            if app.service.enforcesAccess and not app.acl.get_permission(app.acl.get_user()).x:
                raise UnauthorizedError()
            objs = json.loads(request.data)
            if not isinstance(objs, list):
                raise ParseError()

            pids = app.db.get_service().providesCollectionPids
            if pids:
                    objs = [CollectionObject(**obj.update({'id': app.mint.get_id(CollectionObject)})) if isinstance(obj, dict) else obj for obj in objs]
            if app.db.ask_collection([obj.id for obj in objs]):
                raise ConflictError()
            app.db.set_collection(objs)
            return jsonify(objs), 201
        except (NotFoundError, DBError, UnauthorizedError, ConflictError):
            raise
        except:
            raise ParseError()  # 400
项目:perseids-manifold    作者:RDACollectionsWG    | 项目源码 | 文件源码
def post(self, id):
        try:
            if not id:
                raise NotFoundError
            if 'findMatch' not in app.service.supportedCollectionOperations:
                raise NotFoundError()
            if app.service.enforcesAccess and not app.acl.get_permission(app.acl.get_user(),id).r:
                raise UnauthorizedError
            posted = json.loads(request.data)
            if isinstance(posted, Model):
                posted = posted.dict()
            if isinstance(posted.get('mappings'), Model):
                posted['mappings'] = posted.get('mappings').dict()
            members = [m for m in app.db.get_member(id) if dict_subset(posted, m.dict())]
            return jsonify(MemberResultSet(members)), 200
        except (NotFoundError, DBError, UnauthorizedError):
            raise
        except:
            raise ParseError()  # 400
项目:featkeeper    作者:ivansabik    | 项目源码 | 文件源码
def test_api_can_read_feature_request(self):
        FeatkeeperTestUtils.destroy_test_db()
        FeatkeeperTestUtils.populate_test_feature_requests()
        FeatkeeperTestUtils.populate_test_users()
        expected = {
            '_id': '56d3d524402e5f1cfc273340',
            'title': 'Support custom themes',
            'description': 'Client wants to be able to choose different colors, fonts, and layouts for each module',
            'client_name': 'Mandel Jamesdottir',
            'client_priority': 1,
            'target_date': '2016-08-21',
            'ticket_url': 'http://localhost:5000/8VZuWu',
            'product_area': 'Policies',
            'agent_name': 'Eleuthere',
            'created_at': '2016-02-28 23:35:19',
            'is_open': 1
        }
        response = self.app.get(API_ROOT_URL + '/feature-request/56d3d524402e5f1cfc273340')
        response_test = json.loads(response.data)
        self.assertEqual(expected, response_test)

    # Test for PUT /feature-request, should create a new feature request and
    # return success message
项目:cnsmo    作者:dana-i2cat    | 项目源码 | 文件源码
def add_rule():
    if not app.config["service_built"]:
        return "Service is not yet built", 409

    rule = json.loads(request.data)
    if not is_valid(rule):
        return "Invalid rule. Expected format is {}".format(RULE_FORMAT), 400

    try:
        command = "docker run -t --rm --net=host --privileged fw-docker"
        if is_valid_policy(rule):
            arguments = " -p {policy}".format(**rule)
        else:
            arguments = " add {direction} {protocol} {dst_port} {dst_src} {ip_range} {action}".format(**rule)

        log.debug("running docker add...")
        output = subprocess.check_call(shlex.split(command + arguments))
        log.debug("docker run. output: " + str(output))
        return "", 204
    except Exception as e:
        return str(e), 500
项目:cnsmo    作者:dana-i2cat    | 项目源码 | 文件源码
def delete_rule():
    if not app.config["service_built"]:
        return "Service is not yet built", 409

    rule = json.loads(request.data)
    if not is_valid(rule):
        return "Invalid rule. Expected format {}".format(RULE_FORMAT), 400

    if is_valid_policy(rule):
        return "Unsupported operation. Cannot delete policy", 409

    try:
        command = "docker run -t --rm --net=host --privileged fw-docker"
        arguments = " del {direction} {protocol} {dst_port} {dst_src} {ip_range} {action}".format(**rule)

        log.debug("running docker DEL...")
        output = subprocess.check_call(shlex.split(command + arguments))
        log.debug("docker run. output: " + str(output))
        return "", 204

    except Exception as e:
        return str(e), 500
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_jwt_refresh_required_with_cookies(app, options):
    test_client = app.test_client()
    auth_url, cookie_name, protected_url = options

    # Test without cookies
    response = test_client.get(protected_url)
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 401
    assert json_data == {'msg': 'Missing cookie "{}"'.format(cookie_name)}

    # Test after receiving cookies
    test_client.get(auth_url)
    response = test_client.get(protected_url)
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 200
    assert json_data == {'foo': 'bar'}

    # Test after issuing a 'logout' to delete the cookies
    test_client.get('/delete_tokens')
    response = test_client.get(protected_url)
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 401
    assert json_data == {'msg': 'Missing cookie "{}"'.format(cookie_name)}
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_default_access_csrf_protection(app, options):
    test_client = app.test_client()
    auth_url, csrf_cookie_name, post_url = options

    # Get the jwt cookies and csrf double submit tokens
    response = test_client.get(auth_url)
    csrf_cookie = _get_cookie_from_response(response, csrf_cookie_name)
    csrf_token = csrf_cookie[csrf_cookie_name]

    # Test you cannot post without the additional csrf protection
    response = test_client.post(post_url)
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 401
    assert json_data == {'msg': 'Missing CSRF token in headers'}

    # Test that you can post with the csrf double submit value
    csrf_headers = {'X-CSRF-TOKEN': csrf_token}
    response = test_client.post(post_url, headers=csrf_headers)
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 200
    assert json_data == {'foo': 'bar'}
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_csrf_with_custom_header_names(app, options):
    app.config['JWT_ACCESS_CSRF_HEADER_NAME'] = 'FOO'
    app.config['JWT_REFRESH_CSRF_HEADER_NAME'] = 'FOO'
    test_client = app.test_client()
    auth_url, csrf_cookie_name, post_url = options

    # Get the jwt cookies and csrf double submit tokens
    response = test_client.get(auth_url)
    csrf_cookie = _get_cookie_from_response(response, csrf_cookie_name)
    csrf_token = csrf_cookie[csrf_cookie_name]

    # Test that you can post with the csrf double submit value
    csrf_headers = {'FOO': csrf_token}
    response = test_client.post(post_url, headers=csrf_headers)
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 200
    assert json_data == {'foo': 'bar'}
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_get_jwt_identity_in_verification_method(app, url):
    jwt = get_jwt_manager(app)

    @jwt.claims_verification_loader
    def user_load_callback(user_claims):
        # Make sure that we can get the jwt identity in here if we need it.
        user = get_jwt_identity()
        return user == 'username'

    test_client = app.test_client()
    with app.test_request_context():
        access_token = create_access_token('username', fresh=True)

    response = test_client.get(url, headers=make_headers(access_token))
    json_data = json.loads(response.get_data(as_text=True))
    assert json_data == {'foo': 'bar'}
    assert response.status_code == 200
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_non_blacklisted_access_token(app, blacklist_type):
    jwt = get_jwt_manager(app)
    app.config['JWT_BLACKLIST_TOKEN_CHECKS'] = blacklist_type

    @jwt.token_in_blacklist_loader
    def check_blacklisted(decrypted_token):
        return False

    with app.test_request_context():
        access_token = create_access_token('username')

    test_client = app.test_client()
    response = test_client.get('/protected', headers=make_headers(access_token))
    json_data = json.loads(response.get_data(as_text=True))
    assert json_data == {'foo': 'bar'}
    assert response.status_code == 200
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_blacklisted_access_token(app, blacklist_type):
    jwt = get_jwt_manager(app)
    app.config['JWT_BLACKLIST_TOKEN_CHECKS'] = blacklist_type

    @jwt.token_in_blacklist_loader
    def check_blacklisted(decrypted_token):
        return True

    with app.test_request_context():
        access_token = create_access_token('username')

    test_client = app.test_client()
    response = test_client.get('/protected', headers=make_headers(access_token))
    json_data = json.loads(response.get_data(as_text=True))
    assert json_data == {'msg': 'Token has been revoked'}
    assert response.status_code == 401
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_non_blacklisted_refresh_token(app, blacklist_type):
    jwt = get_jwt_manager(app)
    app.config['JWT_BLACKLIST_TOKEN_CHECKS'] = blacklist_type

    @jwt.token_in_blacklist_loader
    def check_blacklisted(decrypted_token):
        return False

    with app.test_request_context():
        refresh_token = create_refresh_token('username')

    test_client = app.test_client()
    response = test_client.get('/refresh_protected', headers=make_headers(refresh_token))
    json_data = json.loads(response.get_data(as_text=True))
    assert json_data == {'foo': 'bar'}
    assert response.status_code == 200
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_blacklisted_refresh_token(app, blacklist_type):
    jwt = get_jwt_manager(app)
    app.config['JWT_BLACKLIST_TOKEN_CHECKS'] = blacklist_type

    @jwt.token_in_blacklist_loader
    def check_blacklisted(decrypted_token):
        return True

    with app.test_request_context():
        refresh_token = create_refresh_token('username')

    test_client = app.test_client()
    response = test_client.get('/refresh_protected', headers=make_headers(refresh_token))
    json_data = json.loads(response.get_data(as_text=True))
    assert json_data == {'msg': 'Token has been revoked'}
    assert response.status_code == 401
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_custom_blacklisted_message(app):
    jwt = get_jwt_manager(app)

    @jwt.token_in_blacklist_loader
    def check_blacklisted(decrypted_token):
        return True

    @jwt.revoked_token_loader
    def custom_error():
        return jsonify(baz='foo'), 404

    with app.test_request_context():
        access_token = create_access_token('username')

    test_client = app.test_client()
    response = test_client.get('/protected', headers=make_headers(access_token))
    json_data = json.loads(response.get_data(as_text=True))
    assert json_data == {'baz': 'foo'}
    assert response.status_code == 404
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_expired_token(app):
    url = '/protected'
    jwtM = get_jwt_manager(app)
    test_client = app.test_client()
    with app.test_request_context():
        token = create_access_token('username', expires_delta=timedelta(minutes=-1))

    # Test default response
    response = test_client.get(url, headers=make_headers(token))
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 401
    assert json_data == {'msg': 'Token has expired'}

    # Test custom response
    @jwtM.expired_token_loader
    def custom_response():
        return jsonify(msg='foobar'), 201

    response = test_client.get(url, headers=make_headers(token))
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 201
    assert json_data == {'msg': 'foobar'}
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_no_token(app):
    url = '/protected'
    jwtM = get_jwt_manager(app)
    test_client = app.test_client()

    # Test default response
    response = test_client.get(url, headers=None)
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 401
    assert json_data == {'msg': 'Missing Authorization Header'}

    # Test custom response
    @jwtM.unauthorized_loader
    def custom_response(err_str):
        return jsonify(msg='foobar'), 201

    response = test_client.get(url, headers=None)
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 201
    assert json_data == {'msg': 'foobar'}
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_custom_user_loader_errors(app, url):
    jwt = get_jwt_manager(app)

    @jwt.user_loader_callback_loader
    def user_load_callback(identity):
        return None

    @jwt.user_loader_error_loader
    def user_loader_error(identity):
        return jsonify(foo='bar'), 201

    test_client = app.test_client()
    with app.test_request_context():
        access_token = create_access_token('username')

    response = test_client.get(url, headers=make_headers(access_token))
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 201
    assert json_data == {'foo': "bar"}
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_asymmetric_cropto(app):
    test_client = app.test_client()

    with app.test_request_context():
        hs256_token = create_access_token('username')
        app.config['JWT_ALGORITHM'] = 'RS256'
        rs256_token = create_access_token('username')

    # Insure the symmetric token does not work now
    access_headers = {'Authorization': 'Bearer {}'.format(hs256_token)}
    response = test_client.get('/protected', headers=access_headers)
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 422
    assert json_data == {'msg': 'The specified alg value is not allowed'}

    # Insure the asymmetric token does work
    access_headers = {'Authorization': 'Bearer {}'.format(rs256_token)}
    response = test_client.get('/protected', headers=access_headers)
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 200
    assert json_data == {'foo': 'bar'}
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_custom_header_name(app):
    app.config['JWT_HEADER_NAME'] = 'Foo'
    test_client = app.test_client()

    with app.test_request_context():
        access_token = create_access_token('username')

    # Insure 'default' headers no longer work
    access_headers = {'Authorization': 'Bearer {}'.format(access_token)}
    response = test_client.get('/protected', headers=access_headers)
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 401
    assert json_data == {'msg': 'Missing Foo Header'}

    # Insure new headers do work
    access_headers = {'Foo': 'Bearer {}'.format(access_token)}
    response = test_client.get('/protected', headers=access_headers)
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 200
    assert json_data == {'foo': 'bar'}
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_missing_headers(app):
    test_client = app.test_client()
    jwtM = get_jwt_manager(app)

    # Insure 'default' no headers response
    response = test_client.get('/protected', headers=None)
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 401
    assert json_data == {'msg': "Missing Authorization Header"}

    # Test custom no headers response
    @jwtM.unauthorized_loader
    def custom_response(err_str):
        return jsonify(foo='bar'), 201

    response = test_client.get('/protected', headers=None)
    json_data = json.loads(response.get_data(as_text=True))
    assert response.status_code == 201
    assert json_data == {'foo': "bar"}
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_user_claims_with_different_name(app):
    jwt = get_jwt_manager(app)
    app.config['JWT_USER_CLAIMS'] = 'banana'

    @jwt.user_claims_loader
    def add_claims(identity):
        return {'foo': 'bar'}

    with app.test_request_context():
        access_token = create_access_token('username')

        # Make sure the name is actually different in the token
        decoded_token = decode_token(access_token)
        assert decoded_token['banana'] == {'foo': 'bar'}

    # Make sure the correct data is returned to us from the full call
    test_client = app.test_client()
    response = test_client.get('/protected', headers=make_headers(access_token))
    json_data = json.loads(response.get_data(as_text=True))
    assert json_data == {'foo': 'bar'}
    assert response.status_code == 200
项目:ec_forum    作者:imxana    | 项目源码 | 文件源码
def test_user_watch(self):
        '''watch and unwatch, suc and fail'''
        rv = self.user_watch(self.u_id, '222222', self.ua_id, '1')
        assert '1' == json.loads(rv.data).get('code','')
        rv = self.sign_in(self.name,'222222')
        assert self.ua_id in json.loads(rv.data).get('u_watchusers','')
        rv = self.sign_in(self.name2,'222222')
        assert self.u_id in json.loads(rv.data).get('u_watchusers','')
        rv = self.user_watch(self.u_id, '222222', self.ua_id, '1')
        assert 'user already watched' in json.loads(rv.data).get('codeState','')

        rv = self.user_watch(self.u_id, '222222', self.ua_id, '0')
        assert '1' == json.loads(rv.data).get('code','')
        rv = self.sign_in(self.name,'222222')
        assert self.ua_id not in json.loads(rv.data).get('u_watchusers','')
        rv = self.sign_in(self.name2,'222222')
        assert self.u_id not in json.loads(rv.data).get('u_watchusers','')
        rv = self.user_watch(self.u_id, '222222', self.ua_id, '0')
        assert 'user already unwatched' in json.loads(rv.data).get('codeState','')
项目:ec_forum    作者:imxana    | 项目源码 | 文件源码
def test_article_del_fail(self):
        '''delete_fail'''
        rv = self.article_del('', '222222', self.t_id)
        assert 'userid empty' in json.loads(rv.data).get('codeState','')
        rv = self.article_del('100000', '222222', self.t_id)
        assert 'user not existed' in json.loads(rv.data).get('codeState','')
        rv = self.article_del(self.u_id, '', self.t_id)
        assert 'password empty' in json.loads(rv.data).get('codeState','')
        rv = self.article_del(self.u_id, '222223', self.t_id)
        assert 'wrong password' in json.loads(rv.data).get('codeState','')
        rv = self.article_del(self.u_id, '222222', '')
        assert 'article id empty' in json.loads(rv.data).get('codeState','')
        rv = self.article_del(self.u_id, '222222', '100000')
        assert 'article not existed' in json.loads(rv.data).get('codeState','')
        rv = self.article_del(self.ua_id, '222222', self.t_id)
        assert 'no access to modify article' in json.loads(rv.data).get('codeState','')