我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用flask.json.dumps()。
def post_phonemes(): print(request.files) if 'wave' not in request.files: abort(400) file = request.files['wave'] tf = tempfile.NamedTemporaryFile(dir=UPLOAD_FOLDER) tmp_filename = tf.name tf.close() file.save(tmp_filename) wave = Wave() wave.load(tmp_filename) recogn = PhonemeRecognition() recogn.load('model_en_full') recogn.predict(wave) result = {'phonemes': wave.get_phoneme_map()} print(result) return json.dumps(result)
def run(self, Resource, path=None, query_string=None, kwargs=None, **rkwargs): """Run given resource manually. See werkzeug.test.EnvironBuilder for rkwargs. """ resource = Resource(self, raw=True) if isinstance(query_string, dict): if 'where' in query_string: query_string['where'] = json.dumps(query_string['where']) query_string = urlencode(query_string) rkwargs['query_string'] = query_string args = [] if path: args.append(path) ctx = self.app.test_request_context(*args, **rkwargs) with ctx: kwargs = kwargs or {} return resource.dispatch_request(**kwargs)
def test_user_can_follow_another_user(client, users_repo): eve = User(name='Eve') adam = User(name='Adam') users_repo.store(eve) users_repo.store(adam) response = client.post('/users/{}/follow'.format(eve.pk), data=json.dumps({ 'pk': adam.pk }), content_type='application/json') assert response.status_code == 200 response = client.get('/users/{}'.format(eve.pk)) assert response.status_code == 200 followed_users = json.loads(response.data)['followed_users'] followed_users_names = [u['name'] for u in followed_users] assert 'Adam' in followed_users_names
def create(self, data, suffix=''): """Create a new object inside the data store. Args: data (dict): Dictionary containing values you want to store suffix (str): If given then it will be used as a key while storing Returns: etcd.Result: THe created object expanded """ append = True if suffix == '' else False key = '/{}/{}'.format(self.KEY, suffix) entry = self.client.write(key, json.dumps(data), append=append) return self._load_from_etcd(entry)
def update(self, entity): """Update an existing object from the data store. Args: entity (etcd.Result): An expanded result object that needs to be serialized Returns: etcd.Result: The resulting object expanded, or None if Etcd failed """ entity.value = json.dumps(entity.value) try: entity = self.client.update(entity) except (etcd.EtcdCompareFailed, etcd.EtcdKeyNotFound): return None return self._load_from_etcd(entity)
def test_server_deregister_service(dynamodb): app.test_client().post( '/service', data=json.dumps({'name': 'foo', 'fqdn': 'foo.example.com'}) ) response = app.test_client().delete( '/service/foo' ) assert response.status_code == 200 describe_response = app.test_client().get( '/service/foo' ) assert describe_response.status_code == 404 deletion_response = app.test_client().delete( '/service/foo' ) assert deletion_response.status_code == 404 assert deletion_response.data.decode('UTF-8') == 'Service: foo not currently registered with Flyby.'
def test_server_deregister_target_group(dynamodb): app.test_client().post( '/service', data=json.dumps({'name': 'foo', 'fqdn': 'foo.example.com'}) ) app.test_client().post( '/target', data=json.dumps( { 'service_name': 'foo', 'target_group_name': 'foo-blue', 'weight': 50, } ) ) response = app.test_client().delete('/target/foo/foo-blue') assert response.status_code == 200 service = json.loads(app.test_client().get('/service/foo').data) assert service['target_groups'] == [] second_response = app.test_client().delete('/target/foo/foo-blue') assert second_response.status_code == 404 service = json.loads(app.test_client().get('/service/foo').data) assert service['target_groups'] == []
def test_server_register_backend(dynamodb): app.test_client().post( '/service', data=json.dumps({'name': 'foo', 'fqdn': 'foo.example.com'}) ) app.test_client().post( '/target', data=json.dumps({'target_group_name': 'foo-blue', 'service_name': 'foo', 'weight': 80}) ) response = app.test_client().post( '/backend', data=json.dumps( { 'host': '10.0.0.1:80', 'service_name': 'foo', 'target_group_name': 'foo-blue', } ) ) assert response.status_code == 200 assert json.loads(response.data)['host'] == '10.0.0.1:80'
def chapter(comic_id, chapter_id): comic = data.get_comic(comic_id) chapter = data.get_chapter(chapter_id) next_chapter = data.get_next_chapter(comic_id, chapter.chapter_number) prev_chapter = data.get_prev_chapter(comic_id, chapter.chapter_number) url = 'http://www.ishuhui.net/ComicBooks/ReadComicBooksToIsoV1/' + str( chapter_id) + '.html' if chapter.comic_id != comic_id: abort(404) if chapter.images: images = json.loads(chapter.images) else: images = get_images_from_url(url) chapter.images = json.dumps(images) db.session.commit() return render_template( 'images.html', comic=comic, chapter=chapter, next_chapter=next_chapter, prev_chapter=prev_chapter, images=images, url=url)
def slack_stats(): """ Screen-scrape slack signup app since it's dynamic with node.js and grabs from slack API. """ stats = '' resp = requests.get(SLACK_URL) if resp.status_code == 200: user_count = re.search(r'<p class="status">(.*?)</p>', resp.content) if user_count is not None: stats = user_count.group(1) return Response(response=json.dumps({'text': stats}), status=200, mimetype='application/json')
def add_heart(): """ Add heart to referenced article and return new heart count as JSON """ user = models.find_user() if user is None: data = {'error': 'Cannot heart unless logged in'} return Response(response=json.dumps(data), status=401, mimetype='application/json') count = models.add_heart(request.form['stack'], request.form['title'], user.login) return Response(response=json.dumps({'count': count}), status=200, mimetype='application/json')
def remove_heart(): """ Remove heart to referenced article and return new heart count as JSON """ user = models.find_user() if user is None: data = {'error': 'Cannot heart unless logged in'} return Response(response=json.dumps(data), status=401, mimetype='application/json') count = models.remove_heart(request.form['stack'], request.form['title'], user.login) return Response(response=json.dumps({'count': count}), status=200, mimetype='application/json')
def indicator_bulk_add(): req_keys = ('control', 'data_type', 'event_id', 'pending', 'data') try: pld = request.get_json(silent=True) except Exception, e: return json.dumps({'results': 'error', 'data': '%s' % e}) if _valid_json(req_keys, pld): # load related stuff res_dict = ResultsDict(pld['event_id'], pld['control']) for val, desc in pld['data']: res_dict.new_ind(data_type=pld['data_type'], indicator=val, date=None, description=desc) results = _add_indicators(res_dict, pld['pending']) return json.dumps({'results': 'success', 'data': results}) else: return json.dumps({'results': 'error', 'data': 'bad json'})
def api_indicator_get(): req_keys = ('conditions',) req_keys2 = ('field', 'operator', 'val') try: pld = request.get_json(silent=True) except Exception, e: return json.dumps({'results': 'error', 'data': '%s' % e}) if not _valid_json(req_keys, pld): return json.dumps({'results': 'error', 'data': 'Invalid json'}) for item in pld.get('conditions'): if not _valid_json(req_keys2, item): return json.dumps({'results': 'error', 'data': 'Invalid json'}) q = filter_query(Indicator.query.join(Event), pld.get('conditions'))
def pushbullet(ALERTID=None, TOKEN=None): """ Send a `link` notification to all devices on pushbullet with a link back to the alert's query. If `TOKEN` is not passed, requires `PUSHBULLETTOKEN` defined, see https://www.pushbullet.com/#settings/account """ if not PUSHBULLETURL: return ("PUSHBULLET parameter must be set, please edit the shim!", 500, None) if (TOKEN is not None): PUSHBULLETTOKEN = TOKEN if not PUSHBULLETTOKEN: return ("PUSHBULLETTOKEN parameter must be set, please edit the shim!", 500, None) a = parse(request) payload = { "body": a['info'], "title": a['AlertName'], "type": "link", "url": a['url'], } headers = {'Content-type': 'application/json', 'Access-Token': PUSHBULLETTOKEN} return callapi(PUSHBULLETURL, 'post', json.dumps(payload), headers)
def socialcast(ALERTID=None, NUMRESULTS=10, TEAM=None, I=None, X=None): """ Create a post on Socialcast containing log events. Limited to `NUMRESULTS` (default 10) or 1MB. If `TEAM/I/X` is not passed, requires `SOCIALCASTURL` defined in the form `https://TEAM.socialcast.com/api/webhooks/IIIIIIIIII/XXXXXXXXXXX` For more information see https://socialcast.github.io/socialcast/apidoc/incoming_webhook.html """ if X is not None: URL = 'https://' + TEAM + '.socialcast.com/api/webhooks/' + I + '/' + X if not SOCIALCASTURL or not 'socialcast.com/api/webhooks' in SOCIALCASTURL: return ("SOCIALCASTURL parameter must be set properly, please edit the shim!\n", 500, None) else: URL = SOCIALCASTURL # Socialcast cares about the order of the information in the body # json.dumps() does not preserve the order so just use raw get_data() return callapi(URL, 'post', request.get_data())
def active_tasks(): rpc = app.config['scheduler_rpc'] taskdb = app.config['taskdb'] project = request.args.get('project', "") limit = int(request.args.get('limit', 100)) try: tasks = rpc.get_active_tasks(project, limit) except socket.error as e: app.logger.warning('connect to scheduler rpc error: %r', e) return '{}', 502, {'Content-Type': 'application/json'} result = [] for updatetime, task in tasks: task['updatetime'] = updatetime task['updatetime_text'] = utils.format_date(updatetime) if 'status' in task: task['status_text'] = taskdb.status_to_string(task['status']) result.append(task) return json.dumps(result), 200, {'Content-Type': 'application/json'}
def counter(): rpc = app.config['scheduler_rpc'] if rpc is None: return json.dumps({}) result = {} try: data = rpc.webui_update() for type, counters in iteritems(data['counter']): for project, counter in iteritems(counters): result.setdefault(project, {})[type] = counter for project, paused in iteritems(data['pause_status']): result.setdefault(project, {})['paused'] = paused except socket.error as e: app.logger.warning('connect to scheduler rpc error: %r', e) return json.dumps({}), 200, {'Content-Type': 'application/json'} return json.dumps(result), 200, {'Content-Type': 'application/json'}
def test_plan_add_valid_body(app, manager): url = url_for("plans.add_plan") # add a planner and an attack manager.attacks_store.add(attack1_module) manager.planners_store.add(planner1_module) with app.test_request_context(url): res = app.test_client().post(url_for("plans.add_plan"), content_type="application/json", data=json.dumps(valid_request_body)) assert res.status_code == 200 assert res.mimetype == "application/json" assert res.json == {"msg": "ok"}
def test_handle_empty_input(self, mocked_send_to_chat, mocked_celery_chain): response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK, data=json.dumps(TelegramUpdates.EMPTY), follow_redirects=True, headers=TelegramUpdates.HEADERS) self.assertTrue(response.status_code == 200, 'Failed to return status code 200 for empty input') self.assertEqual({}, json.loads(response.data), 'Failed to return an empty JSON for empty input') mocked_send_to_chat.apply_async.assert_called_with(args=[{}]) with self.assertRaises(AssertionError) as chain_err: mocked_celery_chain.assert_called() self.assertIn("Expected 'celery_chain' to have been called", str(chain_err.exception)) self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_bad_id_bad_text(self, mocked_send_to_chat, mocked_celery_chain): response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK, data=json.dumps(TelegramUpdates.TEXT_BAD_ID_BAD_TEXT), follow_redirects=True, headers=TelegramUpdates.HEADERS) self.assertTrue(response.status_code == 200, 'Failed to return status code 200 for an Update with ' 'bad reply_message_id and bad text length') self.assertEqual({}, json.loads(response.data), 'Failed to return an empty JSON for an Update with ' 'bad reply_message_id and bad text length') mocked_send_to_chat.apply_async.assert_called_with(args=[{}]) with self.assertRaises(AssertionError) as chain_err: mocked_celery_chain.assert_called() self.assertIn("Expected 'celery_chain' to have been called", str(chain_err.exception)) self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_ok_id_bad_text(self, mocked_send_to_chat, mocked_celery_chain): response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK, data=json.dumps(TelegramUpdates.TEXT_OK_ID_BAD_TEXT), follow_redirects=True, headers=TelegramUpdates.HEADERS) self.assertTrue(response.status_code == 200, 'Failed to return status code 200 for an Update with ' 'bad text length') self.assertEqual({}, json.loads(response.data), 'Failed to return an empty JSON for an Update with ' 'bad text length') mocked_send_to_chat.apply_async.assert_called_with(args=[{}]) with self.assertRaises(AssertionError) as chain_err: mocked_celery_chain.assert_called() self.assertIn("Expected 'celery_chain' to have been called", str(chain_err.exception)) self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_bad_id_ok_text(self, mocked_send_to_chat, mocked_celery_chain): response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK, data=json.dumps(TelegramUpdates.TEXT_BAD_ID_OK_TEXT), follow_redirects=True, headers=TelegramUpdates.HEADERS) self.assertTrue(response.status_code == 200, 'Failed to return status code 200 for an Update with ' 'bad reply_message_id') self.assertEqual({}, json.loads(response.data), 'Failed to return an empty JSON for an Update with ' 'bad reply_message_id') mocked_send_to_chat.apply_async.assert_called_with(args=[{}]) with self.assertRaises(AssertionError) as chain_err: mocked_celery_chain.assert_called() self.assertIn("Expected 'celery_chain' to have been called", str(chain_err.exception)) self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_malformed_Message(self, mocked_send_to_chat, mocked_celery_chain): response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK, data=json.dumps(TelegramUpdates.TEXT_MALFORMED_NO_MESSAGE), follow_redirects=True, headers=TelegramUpdates.HEADERS) self.assertTrue(response.status_code == 200, 'Failed to return status code 200 for an Update with ' 'a malformed Message') self.assertEqual({}, json.loads(response.data), 'Failed to return an empty JSON for an Update with ' 'a malformed Message') mocked_send_to_chat.apply_async.assert_called_with(args=[{}]) with self.assertRaises(AssertionError) as chain_err: mocked_celery_chain.assert_called() self.assertIn("Expected 'celery_chain' to have been called", str(chain_err.exception)) self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_update_id_already_used(self, mocked_send_to_chat, mocked_celery_chain): # we don't need to test celery tasks in the view # that's objective for a separate test suite mocked_send_to_chat.return_value = None mocked_celery_chain.return_value = None response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK, data=json.dumps(TelegramUpdates.TEXT_SAME_UPDATE_ID), follow_redirects=True, headers=TelegramUpdates.HEADERS) self.assertTrue(response.status_code == 200, 'Failed to return status code 200 for an Update with ' 'an ID already used') self.assertEqual({}, json.loads(response.data), 'Failed to return an empty JSON for an Update with ' 'an ID already used') mocked_send_to_chat.apply_async.assert_called_with(args=[{}]) with self.assertRaises(AssertionError) as chain_err: mocked_celery_chain.assert_called() self.assertIn("Expected 'celery_chain' to have been called", str(chain_err.exception)) self.assertEqual(mocked_celery_chain.call_args_list, [])
def test_handle_valid_input(self, mocked_send_to_chat, mocked_celery_chain): response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK, data=json.dumps(TelegramUpdates.TEXT_OK_ID_OK_TEXT), follow_redirects=True, headers=TelegramUpdates.HEADERS) self.assertTrue(response.status_code == 200, 'Failed to return status code 200 for a valid input') self.assertNotEqual({}, response.data, 'Failed to return a non-empty JSON for a valid input') self.assertEqual(TelegramUpdates.TEXT_OK_ID_OK_TEXT, json.loads(response.data), 'Failed to return an Update itself for a valid input Update') # watch out! parse_update eliminates Updates with message_id already processed # so we have to use not parsed Update here mocked_update = {'chat_id': TelegramUpdates.TEXT_OK_ID_OK_TEXT['message']['chat']['id'], 'reply_to_message_id': TelegramUpdates.TEXT_OK_ID_OK_TEXT['message']['message_id'], 'text': TelegramUpdates.TEXT_OK_ID_OK_TEXT['message']['text']} mocked_celery_chain.assert_called_with(mocked_update) with self.assertRaises(AssertionError) as send_err: mocked_send_to_chat.assert_called() self.assertIn("Expected 'send_message_to_chat' to have been called", str(send_err.exception)) self.assertEqual(mocked_send_to_chat.call_args_list, [])
def update(cls, **kwargs): cur = get_db().cursor() query = 'UPDATE %s SET %s WHERE %s = ?' % ( cls.__table_name__, ', '.join("'%s' = ?" % str(x) for x in cls.__fields__),cls.__priamry_key__) data = () for f in cls.__fields__: if f in cls.__json_fields__: data = data + (json.dumps(kwargs.get(f)),) else: data = data + (kwargs.get(f),) data = data + (kwargs.get(cls.__priamry_key__),) cur.execute(query, data) get_db().commit() return cls(kwargs)
def git_status(self): repo = Repo('./') o = repo.remotes.origin o.fetch() # Tags tags = [] for t in repo.tags: tags.append({"name": t.name, "commit": str(t.commit), "date": t.commit.committed_date, "committer": t.commit.committer.name, "message": t.commit.message}) try: branch_name = repo.active_branch.name # test1 except: branch_name = None changes = [] commits_behind = repo.iter_commits('master..origin/master') for c in list(commits_behind): changes.append({"committer": c.committer.name, "message": c.message}) return json.dumps({"tags": tags, "headcommit": str(repo.head.commit), "branchname": branch_name, "master": {"changes": changes}})
def get(self): conn = None try: if not os.path.exists(self.api.app.config['UPLOAD_FOLDER'] + '/kbh.db'): self.api.notify(headline="File Not Found", message="Please upload a Kleiner Brauhelfer Database", type="danger") return ('', 404) conn = sqlite3.connect(self.api.app.config['UPLOAD_FOLDER'] + '/kbh.db') c = conn.cursor() c.execute('SELECT ID, Sudname, BierWurdeGebraut FROM Sud') data = c.fetchall() result = [] for row in data: result.append({"id": row[0], "name": row[1], "brewed": row[2]}) return json.dumps(result) except Exception as e: print e self.api.notify(headline="Failed to load KHB database", message="ERROR", type="danger") return ('', 500) finally: if conn: conn.close()
def get_logs_as_json(self, t, id): data = request.json result = [] if t == "s": name = cbpi.cache.get("sensors").get(id).name result.append({"name": name, "data": self.read_log_as_json("sensor", id)}) if t == "k": kettle = cbpi.cache.get("kettle").get(id) result = map(self.convert_chart_data_to_json, cbpi.get_controller(kettle.logic).get("class").chart(kettle)) if t == "f": fermenter = cbpi.cache.get("fermenter").get(id) result = map(self.convert_chart_data_to_json, cbpi.get_fermentation_controller(fermenter.logic).get("class").chart(fermenter)) return json.dumps(result)
def test_db_create_service(self): with app.app_context(): s_obj = self.mock.service() self.db.set_service(s_obj) self.assertDictEqual(self.db.get_service().dict(),s_obj.dict()) # def test_db_access_created_service(self): # with app.app_context(): # s_obj = self.mock.service() # self.db.set_service(s_obj) # t_obj = self.db.get_service() # self.assertEqual(json.dumps(s_obj), json.dumps(t_obj)) # # def test_db_overwrite_service(self): # with app.app_context(): # s_obj = self.mock.service() # t_obj = self.mock.service() # t_obj.providesCollectionPids = not s_obj.providesCollectionPids # self.db.set_service(s_obj) # self.db.set_service(t_obj) # self.assertNotEqual(json.dumps(s_obj), json.dumps(self.db.get_service()))
def spark_application(app_id): """Mock of the Spark jobs REST resource.""" if 'last' in request.args: return jsonify(redis.get(request.base_url)) d = st.fixed_dictionaries({ 'jobId': st.integers(0), 'name': st.text(), 'submissionTime': st.text(), 'completionTime': st.text(), 'stageIds': st.lists(st.integers(0), average_size=3), 'status': st.sampled_from(['SUCCEEDED', 'RUNNING', 'FAILED']), 'numTasks': st.integers(0), 'numActiveTasks': st.integers(0), 'numCompletedTasks': st.integers(0), 'numSkippedTasks': st.integers(0), 'numFailedTasks': st.integers(0), 'numActiveStages': st.integers(0), 'numCompletedStages': st.integers(0), 'numSkippedStages': st.integers(0), 'numFailedStages': st.integers(0), }) result = json.dumps(st.lists(d, average_size=3).example()) redis.set(request.base_url, result) return jsonify(result)
def get_promotions(): page = request.args.get('page', 1, type=int) pagination = Promotion.query.paginate( page, per_page=current_app.config['YIAVE_PROMOTIONS_PER_PAGE'], error_out=False ) promotions = pagination.items prev = None if pagination.has_prev: prev = url_for('api.get_promotions', page=page - 1, _external=True) next = None if pagination.has_next: next = url_for('api.get_promotions', page=page + 1, _external=True) data = [p.to_json() for p in promotions] response = Response(json.dumps(data)) response.headers['Content-Type'] = "application/json" response.headers['X-Page-Pre'] = prev response.headers['X-Page-Next'] = next response.headers['X-Total-Count'] = pagination.total return response
def get_customers(): page = request.args.get('page', 1, type=int) pagination = Customer.query.paginate( page, per_page=current_app.config['YIAVE_BUSINESSES_PER_PAGE'], error_out=False ) customers = pagination.items prev = None if pagination.has_prev: prev = url_for('customer.get_customers', page=page - 1, _external=True) next = None if pagination.has_next: next = url_for('customer.get_customers', page=page + 1, _external=True) data = [b.to_json() for b in customers] response = Response(json.dumps(data)) response.headers['Content-Type'] = "application/json" response.headers['X-Page-Pre'] = prev response.headers['X-Page-Next'] = next response.headers['X-Total-Count'] = pagination.total return response
def get_businesses(): page = request.args.get('page', 1, type=int) pagination = Business.query.paginate( page, per_page=current_app.config['YIAVE_BUSINESSES_PER_PAGE'], error_out=False ) businesses = pagination.items prev = None if pagination.has_prev: prev = url_for('api.get_businesses', page=page - 1, _external=True) next = None if pagination.has_next: next = url_for('api.get_businesses', page=page + 1, _external=True) data = [b.to_json() for b in businesses] response = Response(json.dumps(data)) response.headers['Content-Type'] = "application/json" response.headers['X-Page-Pre'] = prev response.headers['X-Page-Next'] = next response.headers['X-Total-Count'] = pagination.total return response
def VPagina(): #GET parameter idUsuario = request.args['idUsuario'] res = {} if "actor" in session: res['actor']=session['actor'] res['usuario'] = {'nombre': session['nombre_usuario']} #Action code goes here, res should be a JSON structure pagina_existente = db.session.query(Pagina).filter_by(id_usuario=idUsuario).first() usuario = {"nombre":idUsuario} if pagina_existente is not None: res = {"fPagina": {"titulo":pagina_existente.titulo, "contenido":pagina_existente.contenido}} res["usuario"]=usuario res['idUsuario'] = idUsuario #Action code ends here return json.dumps(res) #Use case code starts here #Use case code ends here
def AIdentificar(): #POST/PUT parameters params = request.get_json() results = [{'label':'/VPrincipal', "actor":"duenoProducto"}, {'label':'/VLogin', 'msg':['Datos de identificación incorrectos']}, ] res = results[1] #Action code goes here, res should be a list with a label and a message for nombre_usuario, clave in db.session.query(Usuario.nombre_usuario, Usuario.clave) : if nombre_usuario == params['usuario'] and clave == params['clave'] : res = results[0] session['nombre_usuario']=params['usuario'] session['idPaginaSitio'] = " " res['idPaginaSitio'] = " " break #Action code ends here if "actor" in res: if res['actor'] is None: session.pop("actor", None) else: session['actor'] = res['actor'] return json.dumps(res)
def VSecundaria(): res = {} if "actor" in session: res['actor']=session['actor'] #Action code goes here, res should be a JSON structure idPagina = request.args['idPagina'] pag = Sitio.query.filter_by(id=idPagina).first() if pag is None: res['pag'] = { 'hilo':-1, 'titulo': 'Lo sentimos', 'contenido': 'La página que busca no existe.', 'imagenes':''} else: res['pag'] = { 'hilo': pag.hilo.id, 'titulo': pag.titulo, 'contenido': pag.contenido, 'imagenes': pag.imagenes} #Action code ends here if 'nombre_usuario' in session: res['idUsuario'] = session['nombre_usuario'] return json.dumps(res)
def test_creating_movie(client): response = client.post('/movies', data=json.dumps({ 'title': 'Home Alone', }), content_type='application/json') assert response.status_code == 201 response = client.get('/movies') assert response.status_code == 200 movies = json.loads(response.data)['items'] movie = movies[0] assert movie['title'] == 'Home Alone' assert movie['likes'] == 0
def test_creating_movie_400_error(client): response = client.post('/movies', data=json.dumps({ 'wrong_field': 'Home Alone', }), content_type='application/json') assert response.status_code == 400
def test_populate_movies(client, populate_feed, movies_repo): response = client.post( '/populate', data=json.dumps(populate_feed), content_type='application/json' ) assert response.status_code == 201 movies = movies_repo.all() titles = [m.title for m in movies] assert 'Home alone' in titles assert 'Shining' in titles
def test_populate_users(client, populate_feed, users_repo): response = client.post( '/populate', data=json.dumps(populate_feed), content_type='application/json' ) assert response.status_code == 201 users = users_repo.all() user_names = [u.name for u in users] assert 'Andrew' in user_names assert 'John' in user_names
def test_populate_follows(client, populate_feed, users_repo): response = client.post( '/populate', data=json.dumps(populate_feed), content_type='application/json' ) assert response.status_code == 201 user = users_repo.get_by_name('John') followed_names = [followed.name for followed in user.get_followed_users()] assert 'Andrew' in followed_names
def test_populate_400(client): response = client.post( '/populate', data=json.dumps({}), content_type='application/json' ) assert response.status_code == 400
def test_user_registration(client): response = client.post('/users', data=json.dumps({ 'name': 'Kevin', }), content_type='application/json') assert response.status_code == 201 response = client.get('/users') assert response.status_code == 200 movies = json.loads(response.data)['items'] names = [m['name'] for m in movies] assert 'Kevin' in names