我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用flask.json.loads()。
def test_user_can_follow_another_user(client, users_repo): eve = User(name='Eve') adam = User(name='Adam') users_repo.store(eve) users_repo.store(adam) response = client.post('/users/{}/follow'.format(eve.pk), data=json.dumps({ 'pk': adam.pk }), content_type='application/json') assert response.status_code == 200 response = client.get('/users/{}'.format(eve.pk)) assert response.status_code == 200 followed_users = json.loads(response.data)['followed_users'] followed_users_names = [u['name'] for u in followed_users] assert 'Adam' in followed_users_names
def test_user_can_like_movie(client, movie, user, users_repo, movies_repo): url = '/users/{user_pk}/liked_movies'.format( user_pk=user.pk, ) assert movie.likes == 0 response = client.post(url, data=json.dumps({ 'pk': movie.pk, }), content_type='application/json') assert response.status_code == 200 response = client.get('/users/{}'.format(user.pk)) liked_movies = json.loads(response.data)['liked_movies'] movie_titles = [m['title'] for m in liked_movies] assert movie.title in movie_titles assert movie.likes == 1
def validate(validation_class, silent=not settings.DEBUG): def decorator(fun): @functools.wraps(fun) def wrapper(*a, **kw): if flask.request.method == 'GET': p = {k: v for k, v in flask.request.values and flask.request.values.items() or {}} else: try: p = flask.request.data and json.loads(flask.request.data.decode()) except Exception: raise exceptions.WrongParametersException if silent: try: validation_class(p) except PyCombValidationError: raise exceptions.WrongParametersException else: validation_class(p) return fun(*a, params=p, **kw) return wrapper return decorator
def get_job(request_id): job = _get_job(request_id) status = job.status run_at = None scheduled_at = None last_retry = None retries_left = 0 if status != "done": args = json.loads(job.args) taskid, fname, args, kwargs, run_at = args scheduled_at = kwargs['scheduled_at'] last_retry = kwargs.get('last_retry') retries_left = kwargs['_attempts'] return jsonify({ 'request_id': request_id, 'status': status, 'run_at': run_at, 'scheduled_at': scheduled_at, 'last_retry': last_retry, 'retries_left': retries_left, })
def test_volume_list_get(self, etcd_client, volume_raw_ok_ready, flask_app): generators = [volume_raw_ok_ready, volume_raw_ok_ready] volume_writes = [] for generator in generators: volume_writes.append( etcd_client.write(VolumeManager.KEY, generator, append=True)) volume_writes = flask_app.volume_manager._load_from_etcd(volume_writes) expected, errors = VolumeSchema().dump(volume_writes, many=True) assert errors == {} with flask_app.test_client() as c: response = c.get('/volumes') assert response.status_code == 200 actual = json.loads(response.data.decode()) assert len(actual) == len(expected) assert actual == expected
def test_volume_put_empty_body(self, etcd_client, volume_raw_ok_ready, flask_app): volume = etcd_client.write(VolumeManager.KEY, volume_raw_ok_ready, append=True) id = flask_app.volume_manager.get_id_from_key(volume.key) expected_result = { 'message': {'constraints': ['Missing data for required field.'], 'reserved_size': ['Missing data for required field.']}} expected_code = 400 with flask_app.test_client() as c: response = c.put('/volumes/{}'.format(id), data='{}', content_type='application/json') result = json.loads(response.data.decode()) assert response.status_code == expected_code assert expected_result == result
def _load_from_etcd(self, data): """Utility method to expand result objects. Args: data ([etcd.result] | etcd.result): The iterable or a single result object not expanded Returns: etcd.result: Return type matches given input (if it is a list->list), and all result objects are expanded """ # we trust that etcd data is valid try: iter(data) except TypeError: data.value = json.loads(data.value) return data else: for entry in data: entry.value = json.loads(entry.value) return data
def test_server_register_backend(dynamodb): app.test_client().post( '/service', data=json.dumps({'name': 'foo', 'fqdn': 'foo.example.com'}) ) app.test_client().post( '/target', data=json.dumps({'target_group_name': 'foo-blue', 'service_name': 'foo', 'weight': 80}) ) response = app.test_client().post( '/backend', data=json.dumps( { 'host': '10.0.0.1:80', 'service_name': 'foo', 'target_group_name': 'foo-blue', } ) ) assert response.status_code == 200 assert json.loads(response.data)['host'] == '10.0.0.1:80'
def post(self): if current_user() is None: redirect(url_for('authorized')) user = current_user() if not user.faculty: return abort(403) key = request.form.get('key', None) value = request.form.get('value', None) try: try: config.set(key, config.from_frontend_value(key, json.loads(value))) return jsonify({'status': 'OK'}) except ValueError: return abort(404) except: return abort(400)
def chapter(comic_id, chapter_id): comic = data.get_comic(comic_id) chapter = data.get_chapter(chapter_id) next_chapter = data.get_next_chapter(comic_id, chapter.chapter_number) prev_chapter = data.get_prev_chapter(comic_id, chapter.chapter_number) url = 'http://www.ishuhui.net/ComicBooks/ReadComicBooksToIsoV1/' + str( chapter_id) + '.html' if chapter.comic_id != comic_id: abort(404) if chapter.images: images = json.loads(chapter.images) else: images = get_images_from_url(url) chapter.images = json.dumps(images) db.session.commit() return render_template( 'images.html', comic=comic, chapter=chapter, next_chapter=next_chapter, prev_chapter=prev_chapter, images=images, url=url)
def test_get_jobs_running(self): e_job_uuid = [] jobs = [] print self.manager.available_job_plugins() for i in range(4): job = self.manager.create_job(self.test_jobs_module.constants.WAIT_10_SECONDS_JOB_NAME, {}, True) jobs.append(job) e_job_uuid.append(job.uuid) print jobs[0].status() r = self.client.get('/jobs/running') self.assertOk(r) r_jobs = json.loads(r.get_data())["jobs"] self.assertEquals(len(r_jobs), 4) r_job_uuid = [] for job in r_jobs: r_job_uuid.append(job["uuid"]) self.assertEquals(sorted(e_job_uuid), sorted(r_job_uuid)) self._terminate_all_jobs()
def recommendations(ALERTID=None): headers = {'Content-type': 'application/json', 'Accept': 'application/json'} alertURL = vropsURL+"api/alerts/"+ALERTID auth = (vropsUser, vropsPass) response = callapi(alertURL, method='get', payload=None, headers=headers, auth=auth, check=VERIFY) # Fetch the alert to grab alert def ID alertInfo = json.loads(response) alertDescURL = vropsURL+"api/alertdefinitions/"+alertInfo['alertDefinitionId'] response = callapi(alertDescURL, method='get', payload=None, headers=headers, auth=auth, check=VERIFY) # Fetch recommendations from alert def recommendations = json.loads(response) if recommendations['states'][0]['recommendationPriorityMap']: for recommendation in recommendations['states'][0]['recommendationPriorityMap']: if recommendations['states'][0]['recommendationPriorityMap'][recommendation] == 1: alertDescURL = vropsURL+"api/recommendations/"+recommendation response = callapi(alertDescURL, method='get', payload=None, headers=headers, auth=auth, check=VERIFY) recText = json.loads(response) return recText['description'] else: return
def fetchResourceProperties(resourceId=None): headers = {'Content-type': 'application/json', 'Accept': 'application/json'} alertURL = vropsURL+"api/resources/"+resourceId+"/properties" auth = (vropsUser, vropsPass) response = callapi(alertURL, method='get', payload=None, headers=headers, auth=auth, check=VERIFY) rawProps = json.loads(response) props = {} for prop in rawProps['property']: props[prop["name"]] = prop["value"] return props # Route without <ALERTID> are for LI, with are for vROps # Adding PUT and POST for vROps so REST Notification test works # vROps (as of 6.6.1) will attempt both methods and fail if both # do not respond # TODO : Add this capability to all shims
def test_handle_empty_input(self, mocked_send_to_chat, mocked_celery_chain): response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK, data=json.dumps(TelegramUpdates.EMPTY), follow_redirects=True, headers=TelegramUpdates.HEADERS) self.assertTrue(response.status_code == 200, 'Failed to return status code 200 for empty input') self.assertEqual({}, json.loads(response.data), 'Failed to return an empty JSON for empty input') mocked_send_to_chat.apply_async.assert_called_with(args=[{}]) with self.assertRaises(AssertionError) as chain_err: mocked_celery_chain.assert_called() self.assertIn("Expected 'celery_chain' to have been called", str(chain_err.exception)) self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_bad_id_bad_text(self, mocked_send_to_chat, mocked_celery_chain): response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK, data=json.dumps(TelegramUpdates.TEXT_BAD_ID_BAD_TEXT), follow_redirects=True, headers=TelegramUpdates.HEADERS) self.assertTrue(response.status_code == 200, 'Failed to return status code 200 for an Update with ' 'bad reply_message_id and bad text length') self.assertEqual({}, json.loads(response.data), 'Failed to return an empty JSON for an Update with ' 'bad reply_message_id and bad text length') mocked_send_to_chat.apply_async.assert_called_with(args=[{}]) with self.assertRaises(AssertionError) as chain_err: mocked_celery_chain.assert_called() self.assertIn("Expected 'celery_chain' to have been called", str(chain_err.exception)) self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_ok_id_bad_text(self, mocked_send_to_chat, mocked_celery_chain): response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK, data=json.dumps(TelegramUpdates.TEXT_OK_ID_BAD_TEXT), follow_redirects=True, headers=TelegramUpdates.HEADERS) self.assertTrue(response.status_code == 200, 'Failed to return status code 200 for an Update with ' 'bad text length') self.assertEqual({}, json.loads(response.data), 'Failed to return an empty JSON for an Update with ' 'bad text length') mocked_send_to_chat.apply_async.assert_called_with(args=[{}]) with self.assertRaises(AssertionError) as chain_err: mocked_celery_chain.assert_called() self.assertIn("Expected 'celery_chain' to have been called", str(chain_err.exception)) self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_bad_id_ok_text(self, mocked_send_to_chat, mocked_celery_chain): response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK, data=json.dumps(TelegramUpdates.TEXT_BAD_ID_OK_TEXT), follow_redirects=True, headers=TelegramUpdates.HEADERS) self.assertTrue(response.status_code == 200, 'Failed to return status code 200 for an Update with ' 'bad reply_message_id') self.assertEqual({}, json.loads(response.data), 'Failed to return an empty JSON for an Update with ' 'bad reply_message_id') mocked_send_to_chat.apply_async.assert_called_with(args=[{}]) with self.assertRaises(AssertionError) as chain_err: mocked_celery_chain.assert_called() self.assertIn("Expected 'celery_chain' to have been called", str(chain_err.exception)) self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_malformed_Message(self, mocked_send_to_chat, mocked_celery_chain): response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK, data=json.dumps(TelegramUpdates.TEXT_MALFORMED_NO_MESSAGE), follow_redirects=True, headers=TelegramUpdates.HEADERS) self.assertTrue(response.status_code == 200, 'Failed to return status code 200 for an Update with ' 'a malformed Message') self.assertEqual({}, json.loads(response.data), 'Failed to return an empty JSON for an Update with ' 'a malformed Message') mocked_send_to_chat.apply_async.assert_called_with(args=[{}]) with self.assertRaises(AssertionError) as chain_err: mocked_celery_chain.assert_called() self.assertIn("Expected 'celery_chain' to have been called", str(chain_err.exception)) self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_update_id_already_used(self, mocked_send_to_chat, mocked_celery_chain): # we don't need to test celery tasks in the view # that's objective for a separate test suite mocked_send_to_chat.return_value = None mocked_celery_chain.return_value = None response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK, data=json.dumps(TelegramUpdates.TEXT_SAME_UPDATE_ID), follow_redirects=True, headers=TelegramUpdates.HEADERS) self.assertTrue(response.status_code == 200, 'Failed to return status code 200 for an Update with ' 'an ID already used') self.assertEqual({}, json.loads(response.data), 'Failed to return an empty JSON for an Update with ' 'an ID already used') mocked_send_to_chat.apply_async.assert_called_with(args=[{}]) with self.assertRaises(AssertionError) as chain_err: mocked_celery_chain.assert_called() self.assertIn("Expected 'celery_chain' to have been called", str(chain_err.exception)) self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_valid_input(self, mocked_send_to_chat, mocked_celery_chain): response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK, data=json.dumps(TelegramUpdates.TEXT_OK_ID_OK_TEXT), follow_redirects=True, headers=TelegramUpdates.HEADERS) self.assertTrue(response.status_code == 200, 'Failed to return status code 200 for a valid input') self.assertNotEqual({}, response.data, 'Failed to return a non-empty JSON for a valid input') self.assertEqual(TelegramUpdates.TEXT_OK_ID_OK_TEXT, json.loads(response.data), 'Failed to return an Update itself for a valid input Update') # watch out! parse_update eliminates Updates with message_id already processed # so we have to use not parsed Update here mocked_update = {'chat_id': TelegramUpdates.TEXT_OK_ID_OK_TEXT['message']['chat']['id'], 'reply_to_message_id': TelegramUpdates.TEXT_OK_ID_OK_TEXT['message']['message_id'], 'text': TelegramUpdates.TEXT_OK_ID_OK_TEXT['message']['text']} mocked_celery_chain.assert_called_with(mocked_update) with self.assertRaises(AssertionError) as send_err: mocked_send_to_chat.assert_called() self.assertIn("Expected 'send_message_to_chat' to have been called", str(send_err.exception)) self.assertEqual(mocked_send_to_chat.call_args_list, [])
def test_members_get_id(self): with self.app.app_context(): # create collection, members c_obj = self.mock.collection(description={'something':'abcdefghi123ö'}) m_objs = [self.mock.member() for i in range(5)] # add collection, members self.app.db.set_collection(c_obj) for m_obj in m_objs: self.app.db.set_member(c_obj.id, m_obj) # GET members responses = [{'out': self.get("collections/"+urllib.parse.quote_plus(c_obj.id)+"/members/"+urllib.parse.quote_plus(m_obj.id)), 'in':m_obj} for m_obj in m_objs] # assert 200 OK for r in responses: self.assertEqual(r['out'].status_code, 200) # compare members self.assertDictEqual(json.loads(r['out'].data).dict(), r['in'].dict())
def test_members_get(self): with self.app.app_context(): # create collection, members c_obj = self.mock.collection(description={'something':'abcdefghi123ö'}) m_objs = [self.mock.member() for i in range(5)] # add collection, members self.app.db.set_collection(c_obj) # for m_obj in m_objs: self.app.db.set_member(c_obj.id, m_objs) # pool = ThreadPool(50) # pool.map(lambda m_obj: self.app.db.set_member(c_obj.id, m_obj), m_objs) # GET members response = self.get("collections/"+urllib.parse.quote_plus(c_obj.id)+"/members") # assert 200 OK self.assertEqual(response.status_code, 200) sortedResponse = [r.dict() for r in sorted(json.loads(response.data)['contents'], key=lambda x: x.id)] sortedMocks = [m.dict() for m in sorted(m_objs, key=lambda x: x.id)] for i in range(len(sortedMocks)): self.assertDictEqual(sortedResponse[i], sortedMocks[i])
def post(self, id=None): try: if id: raise NotFoundError() # 404 if app.service.enforcesAccess and not app.acl.get_permission(app.acl.get_user()).x: raise UnauthorizedError() objs = json.loads(request.data) if not isinstance(objs, list): raise ParseError() pids = app.db.get_service().providesCollectionPids if pids: objs = [CollectionObject(**obj.update({'id': app.mint.get_id(CollectionObject)})) if isinstance(obj, dict) else obj for obj in objs] if app.db.ask_collection([obj.id for obj in objs]): raise ConflictError() app.db.set_collection(objs) return jsonify(objs), 201 except (NotFoundError, DBError, UnauthorizedError, ConflictError): raise except: raise ParseError() # 400
def post(self, id): try: if not id: raise NotFoundError if 'findMatch' not in app.service.supportedCollectionOperations: raise NotFoundError() if app.service.enforcesAccess and not app.acl.get_permission(app.acl.get_user(),id).r: raise UnauthorizedError posted = json.loads(request.data) if isinstance(posted, Model): posted = posted.dict() if isinstance(posted.get('mappings'), Model): posted['mappings'] = posted.get('mappings').dict() members = [m for m in app.db.get_member(id) if dict_subset(posted, m.dict())] return jsonify(MemberResultSet(members)), 200 except (NotFoundError, DBError, UnauthorizedError): raise except: raise ParseError() # 400
def test_api_can_read_feature_request(self): FeatkeeperTestUtils.destroy_test_db() FeatkeeperTestUtils.populate_test_feature_requests() FeatkeeperTestUtils.populate_test_users() expected = { '_id': '56d3d524402e5f1cfc273340', 'title': 'Support custom themes', 'description': 'Client wants to be able to choose different colors, fonts, and layouts for each module', 'client_name': 'Mandel Jamesdottir', 'client_priority': 1, 'target_date': '2016-08-21', 'ticket_url': 'http://localhost:5000/8VZuWu', 'product_area': 'Policies', 'agent_name': 'Eleuthere', 'created_at': '2016-02-28 23:35:19', 'is_open': 1 } response = self.app.get(API_ROOT_URL + '/feature-request/56d3d524402e5f1cfc273340') response_test = json.loads(response.data) self.assertEqual(expected, response_test) # Test for PUT /feature-request, should create a new feature request and # return success message
def add_rule(): if not app.config["service_built"]: return "Service is not yet built", 409 rule = json.loads(request.data) if not is_valid(rule): return "Invalid rule. Expected format is {}".format(RULE_FORMAT), 400 try: command = "docker run -t --rm --net=host --privileged fw-docker" if is_valid_policy(rule): arguments = " -p {policy}".format(**rule) else: arguments = " add {direction} {protocol} {dst_port} {dst_src} {ip_range} {action}".format(**rule) log.debug("running docker add...") output = subprocess.check_call(shlex.split(command + arguments)) log.debug("docker run. output: " + str(output)) return "", 204 except Exception as e: return str(e), 500
def delete_rule(): if not app.config["service_built"]: return "Service is not yet built", 409 rule = json.loads(request.data) if not is_valid(rule): return "Invalid rule. Expected format {}".format(RULE_FORMAT), 400 if is_valid_policy(rule): return "Unsupported operation. Cannot delete policy", 409 try: command = "docker run -t --rm --net=host --privileged fw-docker" arguments = " del {direction} {protocol} {dst_port} {dst_src} {ip_range} {action}".format(**rule) log.debug("running docker DEL...") output = subprocess.check_call(shlex.split(command + arguments)) log.debug("docker run. output: " + str(output)) return "", 204 except Exception as e: return str(e), 500
def test_jwt_refresh_required_with_cookies(app, options): test_client = app.test_client() auth_url, cookie_name, protected_url = options # Test without cookies response = test_client.get(protected_url) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 401 assert json_data == {'msg': 'Missing cookie "{}"'.format(cookie_name)} # Test after receiving cookies test_client.get(auth_url) response = test_client.get(protected_url) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 200 assert json_data == {'foo': 'bar'} # Test after issuing a 'logout' to delete the cookies test_client.get('/delete_tokens') response = test_client.get(protected_url) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 401 assert json_data == {'msg': 'Missing cookie "{}"'.format(cookie_name)}
def test_default_access_csrf_protection(app, options): test_client = app.test_client() auth_url, csrf_cookie_name, post_url = options # Get the jwt cookies and csrf double submit tokens response = test_client.get(auth_url) csrf_cookie = _get_cookie_from_response(response, csrf_cookie_name) csrf_token = csrf_cookie[csrf_cookie_name] # Test you cannot post without the additional csrf protection response = test_client.post(post_url) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 401 assert json_data == {'msg': 'Missing CSRF token in headers'} # Test that you can post with the csrf double submit value csrf_headers = {'X-CSRF-TOKEN': csrf_token} response = test_client.post(post_url, headers=csrf_headers) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 200 assert json_data == {'foo': 'bar'}
def test_csrf_with_custom_header_names(app, options): app.config['JWT_ACCESS_CSRF_HEADER_NAME'] = 'FOO' app.config['JWT_REFRESH_CSRF_HEADER_NAME'] = 'FOO' test_client = app.test_client() auth_url, csrf_cookie_name, post_url = options # Get the jwt cookies and csrf double submit tokens response = test_client.get(auth_url) csrf_cookie = _get_cookie_from_response(response, csrf_cookie_name) csrf_token = csrf_cookie[csrf_cookie_name] # Test that you can post with the csrf double submit value csrf_headers = {'FOO': csrf_token} response = test_client.post(post_url, headers=csrf_headers) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 200 assert json_data == {'foo': 'bar'}
def test_get_jwt_identity_in_verification_method(app, url): jwt = get_jwt_manager(app) @jwt.claims_verification_loader def user_load_callback(user_claims): # Make sure that we can get the jwt identity in here if we need it. user = get_jwt_identity() return user == 'username' test_client = app.test_client() with app.test_request_context(): access_token = create_access_token('username', fresh=True) response = test_client.get(url, headers=make_headers(access_token)) json_data = json.loads(response.get_data(as_text=True)) assert json_data == {'foo': 'bar'} assert response.status_code == 200
def test_non_blacklisted_access_token(app, blacklist_type): jwt = get_jwt_manager(app) app.config['JWT_BLACKLIST_TOKEN_CHECKS'] = blacklist_type @jwt.token_in_blacklist_loader def check_blacklisted(decrypted_token): return False with app.test_request_context(): access_token = create_access_token('username') test_client = app.test_client() response = test_client.get('/protected', headers=make_headers(access_token)) json_data = json.loads(response.get_data(as_text=True)) assert json_data == {'foo': 'bar'} assert response.status_code == 200
def test_blacklisted_access_token(app, blacklist_type): jwt = get_jwt_manager(app) app.config['JWT_BLACKLIST_TOKEN_CHECKS'] = blacklist_type @jwt.token_in_blacklist_loader def check_blacklisted(decrypted_token): return True with app.test_request_context(): access_token = create_access_token('username') test_client = app.test_client() response = test_client.get('/protected', headers=make_headers(access_token)) json_data = json.loads(response.get_data(as_text=True)) assert json_data == {'msg': 'Token has been revoked'} assert response.status_code == 401
def test_non_blacklisted_refresh_token(app, blacklist_type): jwt = get_jwt_manager(app) app.config['JWT_BLACKLIST_TOKEN_CHECKS'] = blacklist_type @jwt.token_in_blacklist_loader def check_blacklisted(decrypted_token): return False with app.test_request_context(): refresh_token = create_refresh_token('username') test_client = app.test_client() response = test_client.get('/refresh_protected', headers=make_headers(refresh_token)) json_data = json.loads(response.get_data(as_text=True)) assert json_data == {'foo': 'bar'} assert response.status_code == 200
def test_blacklisted_refresh_token(app, blacklist_type): jwt = get_jwt_manager(app) app.config['JWT_BLACKLIST_TOKEN_CHECKS'] = blacklist_type @jwt.token_in_blacklist_loader def check_blacklisted(decrypted_token): return True with app.test_request_context(): refresh_token = create_refresh_token('username') test_client = app.test_client() response = test_client.get('/refresh_protected', headers=make_headers(refresh_token)) json_data = json.loads(response.get_data(as_text=True)) assert json_data == {'msg': 'Token has been revoked'} assert response.status_code == 401
def test_custom_blacklisted_message(app): jwt = get_jwt_manager(app) @jwt.token_in_blacklist_loader def check_blacklisted(decrypted_token): return True @jwt.revoked_token_loader def custom_error(): return jsonify(baz='foo'), 404 with app.test_request_context(): access_token = create_access_token('username') test_client = app.test_client() response = test_client.get('/protected', headers=make_headers(access_token)) json_data = json.loads(response.get_data(as_text=True)) assert json_data == {'baz': 'foo'} assert response.status_code == 404
def test_expired_token(app): url = '/protected' jwtM = get_jwt_manager(app) test_client = app.test_client() with app.test_request_context(): token = create_access_token('username', expires_delta=timedelta(minutes=-1)) # Test default response response = test_client.get(url, headers=make_headers(token)) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 401 assert json_data == {'msg': 'Token has expired'} # Test custom response @jwtM.expired_token_loader def custom_response(): return jsonify(msg='foobar'), 201 response = test_client.get(url, headers=make_headers(token)) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 201 assert json_data == {'msg': 'foobar'}
def test_no_token(app): url = '/protected' jwtM = get_jwt_manager(app) test_client = app.test_client() # Test default response response = test_client.get(url, headers=None) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 401 assert json_data == {'msg': 'Missing Authorization Header'} # Test custom response @jwtM.unauthorized_loader def custom_response(err_str): return jsonify(msg='foobar'), 201 response = test_client.get(url, headers=None) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 201 assert json_data == {'msg': 'foobar'}
def test_custom_user_loader_errors(app, url): jwt = get_jwt_manager(app) @jwt.user_loader_callback_loader def user_load_callback(identity): return None @jwt.user_loader_error_loader def user_loader_error(identity): return jsonify(foo='bar'), 201 test_client = app.test_client() with app.test_request_context(): access_token = create_access_token('username') response = test_client.get(url, headers=make_headers(access_token)) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 201 assert json_data == {'foo': "bar"}
def test_asymmetric_cropto(app): test_client = app.test_client() with app.test_request_context(): hs256_token = create_access_token('username') app.config['JWT_ALGORITHM'] = 'RS256' rs256_token = create_access_token('username') # Insure the symmetric token does not work now access_headers = {'Authorization': 'Bearer {}'.format(hs256_token)} response = test_client.get('/protected', headers=access_headers) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 422 assert json_data == {'msg': 'The specified alg value is not allowed'} # Insure the asymmetric token does work access_headers = {'Authorization': 'Bearer {}'.format(rs256_token)} response = test_client.get('/protected', headers=access_headers) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 200 assert json_data == {'foo': 'bar'}
def test_custom_header_name(app): app.config['JWT_HEADER_NAME'] = 'Foo' test_client = app.test_client() with app.test_request_context(): access_token = create_access_token('username') # Insure 'default' headers no longer work access_headers = {'Authorization': 'Bearer {}'.format(access_token)} response = test_client.get('/protected', headers=access_headers) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 401 assert json_data == {'msg': 'Missing Foo Header'} # Insure new headers do work access_headers = {'Foo': 'Bearer {}'.format(access_token)} response = test_client.get('/protected', headers=access_headers) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 200 assert json_data == {'foo': 'bar'}
def test_missing_headers(app): test_client = app.test_client() jwtM = get_jwt_manager(app) # Insure 'default' no headers response response = test_client.get('/protected', headers=None) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 401 assert json_data == {'msg': "Missing Authorization Header"} # Test custom no headers response @jwtM.unauthorized_loader def custom_response(err_str): return jsonify(foo='bar'), 201 response = test_client.get('/protected', headers=None) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 201 assert json_data == {'foo': "bar"}
def test_user_claims_with_different_name(app): jwt = get_jwt_manager(app) app.config['JWT_USER_CLAIMS'] = 'banana' @jwt.user_claims_loader def add_claims(identity): return {'foo': 'bar'} with app.test_request_context(): access_token = create_access_token('username') # Make sure the name is actually different in the token decoded_token = decode_token(access_token) assert decoded_token['banana'] == {'foo': 'bar'} # Make sure the correct data is returned to us from the full call test_client = app.test_client() response = test_client.get('/protected', headers=make_headers(access_token)) json_data = json.loads(response.get_data(as_text=True)) assert json_data == {'foo': 'bar'} assert response.status_code == 200
def test_user_watch(self): '''watch and unwatch, suc and fail''' rv = self.user_watch(self.u_id, '222222', self.ua_id, '1') assert '1' == json.loads(rv.data).get('code','') rv = self.sign_in(self.name,'222222') assert self.ua_id in json.loads(rv.data).get('u_watchusers','') rv = self.sign_in(self.name2,'222222') assert self.u_id in json.loads(rv.data).get('u_watchusers','') rv = self.user_watch(self.u_id, '222222', self.ua_id, '1') assert 'user already watched' in json.loads(rv.data).get('codeState','') rv = self.user_watch(self.u_id, '222222', self.ua_id, '0') assert '1' == json.loads(rv.data).get('code','') rv = self.sign_in(self.name,'222222') assert self.ua_id not in json.loads(rv.data).get('u_watchusers','') rv = self.sign_in(self.name2,'222222') assert self.u_id not in json.loads(rv.data).get('u_watchusers','') rv = self.user_watch(self.u_id, '222222', self.ua_id, '0') assert 'user already unwatched' in json.loads(rv.data).get('codeState','')
def test_article_del_fail(self): '''delete_fail''' rv = self.article_del('', '222222', self.t_id) assert 'userid empty' in json.loads(rv.data).get('codeState','') rv = self.article_del('100000', '222222', self.t_id) assert 'user not existed' in json.loads(rv.data).get('codeState','') rv = self.article_del(self.u_id, '', self.t_id) assert 'password empty' in json.loads(rv.data).get('codeState','') rv = self.article_del(self.u_id, '222223', self.t_id) assert 'wrong password' in json.loads(rv.data).get('codeState','') rv = self.article_del(self.u_id, '222222', '') assert 'article id empty' in json.loads(rv.data).get('codeState','') rv = self.article_del(self.u_id, '222222', '100000') assert 'article not existed' in json.loads(rv.data).get('codeState','') rv = self.article_del(self.ua_id, '222222', self.t_id) assert 'no access to modify article' in json.loads(rv.data).get('codeState','')