我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用tornado.escape.json_decode()。
def test_decode_argument(self): # These urls all decode to the same thing urls = ["/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8", "/decode_arg/%E9?foo=%E9&encoding=latin1", "/decode_arg_kw/%E9?foo=%E9&encoding=latin1", ] for req_url in urls: response = self.fetch(req_url) response.rethrow() data = json_decode(response.body) self.assertEqual(data, {u('path'): [u('unicode'), u('\u00e9')], u('query'): [u('unicode'), u('\u00e9')], }) response = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9") response.rethrow() data = json_decode(response.body) self.assertEqual(data, {u('path'): [u('bytes'), u('c3a9')], u('query'): [u('bytes'), u('c3a9')], })
def test_multipart_form(self): # Encodings here are tricky: Headers are latin1, bodies can be # anything (we use utf8 by default). response = self.raw_fetch([ b"POST /multipart HTTP/1.0", b"Content-Type: multipart/form-data; boundary=1234567890", b"X-Header-encoding-test: \xe9", ], b"\r\n".join([ b"Content-Disposition: form-data; name=argument", b"", u("\u00e1").encode("utf-8"), b"--1234567890", u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"), b"", u("\u00fa").encode("utf-8"), b"--1234567890--", b"", ])) data = json_decode(response) self.assertEqual(u("\u00e9"), data["header"]) self.assertEqual(u("\u00e1"), data["argument"]) self.assertEqual(u("\u00f3"), data["filename"]) self.assertEqual(u("\u00fa"), data["filebody"])
def test_chunked_request_body(self): # Chunked requests are not widely supported and we don't have a way # to generate them in AsyncHTTPClient, but HTTPServer will read them. self.stream.write(b"""\ POST /echo HTTP/1.1 Transfer-Encoding: chunked Content-Type: application/x-www-form-urlencoded 4 foo= 3 bar 0 """.replace(b"\n", b"\r\n")) read_stream_body(self.stream, self.stop) headers, response = self.wait() self.assertEqual(json_decode(response), {u('foo'): [u('bar')]})
def test_get_argument(self): response = self.fetch("/get_argument?foo=bar") self.assertEqual(response.body, b"bar") response = self.fetch("/get_argument?foo=") self.assertEqual(response.body, b"") response = self.fetch("/get_argument") self.assertEqual(response.body, b"default") # Test merging of query and body arguments. # In singular form, body arguments take precedence over query arguments. body = urllib_parse.urlencode(dict(foo="hello")) response = self.fetch("/get_argument?foo=bar", method="POST", body=body) self.assertEqual(response.body, b"hello") # In plural methods they are merged. response = self.fetch("/get_arguments?foo=bar", method="POST", body=body) self.assertEqual(json_decode(response.body), dict(default=['bar', 'hello'], query=['bar'], body=['hello']))
def test_username_query(self): username = "bobsmith" positive_query = 'bobsm' negative_query = 'nancy' async with self.pool.acquire() as con: await con.execute("INSERT INTO users (username, toshi_id) VALUES ($1, $2)", username, TEST_ADDRESS) resp = await self.fetch("/search/user?query={}".format(positive_query), method="GET") self.assertEqual(resp.code, 200) body = json_decode(resp.body) self.assertEqual(len(body['results']), 1) # ensure we got a tracking event self.assertEqual((await self.next_tracking_event())[0], None) resp = await self.fetch("/search/user?query={}".format(negative_query), method="GET") self.assertEqual(resp.code, 200) body = json_decode(resp.body) self.assertEqual(len(body['results']), 0) # ensure we got a tracking event self.assertEqual((await self.next_tracking_event())[0], None)
def test_username_query_sql_inject_attampt(self): username = "bobsmith" inject_attempt = quote_plus("x'; delete from users; select * from users") async with self.pool.acquire() as con: await con.execute("INSERT INTO users (username, toshi_id) VALUES ($1, $2)", username, TEST_ADDRESS) resp = await self.fetch("/search/user?query={}".format(inject_attempt), method="GET") self.assertEqual(resp.code, 200) body = json_decode(resp.body) self.assertEqual(len(body['results']), 0) async with self.pool.acquire() as con: row = await con.fetchrow("SELECT COUNT(*) AS count FROM users") self.assertEqual(row['count'], 1)
def test_underscore_username_query(self): async with self.pool.acquire() as con: await con.execute("INSERT INTO users (username, name, toshi_id) VALUES ($1, $2, $3)", "wager_weight", "Wager Weight", "0x0000000000000000000000000000000000000001") await con.execute("INSERT INTO users (username, name, toshi_id) VALUES ($1, $2, $3)", "bob_smith", "Robert", "0x0000000000000000000000000000000000000002") await con.execute("INSERT INTO users (username, name, toshi_id) VALUES ($1, $2, $3)", "bob_jack", "Jackie", "0x0000000000000000000000000000000000000003") await con.execute("INSERT INTO users (username, name, toshi_id) VALUES ($1, $2, $3)", "user1234", "user1234", "0x0000000000000000000000000000000000000004") for positive_query in ["wager", "wager_we", "wager_weight", "bob_smi"]: resp = await self.fetch("/search/user?query={}".format(positive_query), method="GET") self.assertEqual(resp.code, 200) body = json_decode(resp.body) self.assertEqual(len(body['results']), 1, "Failed to get match for search query: {}".format(positive_query)) for negative_query in ["wager_foo", "bob_bar", "1234", "bobsmith"]: resp = await self.fetch("/search/user?query={}".format(negative_query), method="GET") self.assertEqual(resp.code, 200) body = json_decode(resp.body) self.assertEqual(len(body['results']), 0, "got unexpected match for search query: {}".format(negative_query))
def test_inactive_username_query(self): username = "bobsmith" positive_query = 'bobsm' async with self.pool.acquire() as con: await con.execute("INSERT INTO users (username, toshi_id, active) VALUES ($1, $2, false)", username, TEST_ADDRESS) resp = await self.fetch("/search/user?query={}".format(positive_query), method="GET") self.assertEqual(resp.code, 200) body = json_decode(resp.body) self.assertEqual(len(body['results']), 0) resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="PUT", body={ "payment_address": TEST_ADDRESS }) self.assertResponseCodeEqual(resp, 200) resp = await self.fetch("/search/user?query={}".format(positive_query), method="GET") self.assertEqual(resp.code, 200) body = json_decode(resp.body) self.assertEqual(len(body['results']), 1)
def test_create_user(self): resp = await self.fetch("/timestamp") self.assertEqual(resp.code, 200) resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="POST", body={'payment_address': TEST_PAYMENT_ADDRESS}) self.assertResponseCodeEqual(resp, 200) body = json_decode(resp.body) self.assertEqual(body['toshi_id'], TEST_ADDRESS) async with self.pool.acquire() as con: row = await con.fetchrow("SELECT * FROM users WHERE toshi_id = $1", TEST_ADDRESS) self.assertIsNotNone(row) self.assertFalse(row['is_app']) self.assertIsNotNone(row['username']) # ensure we got a tracking event self.assertEqual((await self.next_tracking_event())[0], encode_id(TEST_ADDRESS))
def test_create_app_user(self): resp = await self.fetch("/timestamp") self.assertEqual(resp.code, 200) resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="POST", body={"is_app": True, 'payment_address': TEST_PAYMENT_ADDRESS}) self.assertResponseCodeEqual(resp, 200) body = json_decode(resp.body) self.assertEqual(body['toshi_id'], TEST_ADDRESS) async with self.pool.acquire() as con: row = await con.fetchrow("SELECT * FROM users WHERE toshi_id = $1", TEST_ADDRESS) self.assertIsNotNone(row) self.assertTrue(row['is_app']) # ensure we got a tracking event self.assertEqual((await self.next_tracking_event())[0], encode_id(TEST_ADDRESS))
def test_set_unknown_categories_fails(self): username = "toshibot" name = "ToshiBot" categories = await self.setup_categories() async with self.pool.acquire() as con: await con.execute("INSERT INTO users (username, toshi_id, name, is_app, is_public) VALUES ($1, $2, $3, true, true)", username, TEST_ADDRESS, name) resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="PUT", body={ "categories": [categories[-1][0] + 10, 'badcat'] }) self.assertResponseCodeEqual(resp, 400) resp = await self.fetch("/user/{}".format(TEST_ADDRESS)) self.assertResponseCodeEqual(resp, 200) body = json_decode(resp.body) self.assertIn("categories", body) self.assertEqual(len(body['categories']), 0)
def test_app_underscore_username_query(self): async with self.pool.acquire() as con: await con.execute("INSERT INTO users (username, name, toshi_id, is_app, is_public) VALUES ($1, $2, $3, true, true)", "wager_weight", "Wager Weight", "0x0000000000000000000000000000000000000001") await con.execute("INSERT INTO users (username, name, toshi_id, is_app, is_public) VALUES ($1, $2, $3, true, true)", "bob_smith", "Robert", "0x0000000000000000000000000000000000000002") await con.execute("INSERT INTO users (username, name, toshi_id, is_app, is_public) VALUES ($1, $2, $3, true, true)", "bob_jack", "Jackie", "0x0000000000000000000000000000000000000003") await con.execute("INSERT INTO users (username, name, toshi_id, is_app, is_public) VALUES ($1, $2, $3, true, true)", "user1234", "user1234", "0x0000000000000000000000000000000000000004") for positive_query in ["wager", "wager_we", "wager_weight", "bob_smi"]: resp = await self.fetch("/search/apps?query={}".format(positive_query), method="GET") self.assertEqual(resp.code, 200) body = json_decode(resp.body) self.assertEqual(len(body['results']), 1, "Failed to get match for search query: {}".format(positive_query)) for negative_query in ["wager_foo", "bob_bar", "1234", "bobsmith"]: resp = await self.fetch("/search/apps?query={}".format(negative_query), method="GET") self.assertEqual(resp.code, 200) body = json_decode(resp.body) self.assertEqual(len(body['results']), 0, "got unexpected match for search query: {}".format(negative_query))
def fetch_definitions(self, term): url = url_concat(UrbanDictionary.DEFINE_URL, dict(term=term)) request = HTTPRequest( url = url, headers = { 'X-Mashape-Key': self.api_key, 'Accept' : 'text/plain' } ) tornado_future = self.client.fetch(request) future = to_asyncio_future(tornado_future) response = await future data = json_decode(response.body) return data['list']
def access_token(self, code, state): client = AsyncHTTPClient() payload = ( ('client_id', self.client_id), ('client_secret', self.client_secret), ('grant_type', 'authorization_code'), ('redirect_uri', Twitch.REDIRECT_URI), ('code', code), ('state', state), ) url = Twitch.TOKEN_URL request = HTTPRequest( url = url, method = 'POST', body = urlencode(payload) ) tornado_future = client.fetch(request) future = to_asyncio_future(tornado_future) response = await future data = json_decode(response.body) return data['access_token']
def test_username_query(self): username = "TokenBot" positive_query = 'enb' negative_query = 'TickleFight' async with self.pool.acquire() as con: await con.execute("INSERT INTO apps (name, token_id) VALUES ($1, $2)", username, TEST_ADDRESS) await con.execute("INSERT INTO sofa_manifests (token_id, payment_address) VALUES ($1, $2)", TEST_ADDRESS, TEST_ADDRESS) resp = await self.fetch("/search/apps?query={}".format(positive_query), method="GET") self.assertEqual(resp.code, 200) body = json_decode(resp.body) self.assertEqual(len(body['apps']), 1) resp = await self.fetch("/search/apps?query={}".format(negative_query), method="GET") self.assertEqual(resp.code, 200) body = json_decode(resp.body) self.assertEqual(len(body['apps']), 0)
def save_term_settings(term, location, session, settings): """ Saves the *settings* associated with the given *term*, *location*, and *session* in the 'term_settings.json' file inside the user's session directory. When complete the given *callback* will be called (if given). """ if not session: return # Just a viewer of a broadcast terminal term = str(term) # JSON wants strings as keys term_settings = RUDict() term_settings[location] = {term: settings} session_dir = os.path.join(getsettings('BASE_DIR'), 'sessions') session_dir = os.path.join(session_dir, session) settings_path = os.path.join(session_dir, 'term_settings.json') # First we read in the existing settings and then update them. if os.path.exists(settings_path): with io.open(settings_path, encoding='utf-8') as f: term_settings.update(json_decode(f.read())) term_settings[location][term].update(settings) with io.open(settings_path, 'w', encoding='utf-8') as f: f.write(json_encode(term_settings))
def get_log_metadata(golog_path): """ Returns the metadata from the log at the given *golog_path* in the form of a dict. """ metadata = {} if not os.path.getsize(golog_path): # 0 bytes return metadata # Nothing to do try: first_frame, distance = retrieve_first_frame(golog_path) except IOError: # Something wrong with the log... Probably still being written to return metadata if first_frame[14:].startswith('{'): # This is JSON, capture metadata metadata = json_decode(first_frame[14:]) return metadata # All done
def get_current_user(self): """ Mostly identical to the function of the same name in MainHandler. The difference being that when API authentication is enabled the WebSocket will expect and perform its own auth of the client. """ expiration = self.settings.get('auth_timeout', "14d") # Need the expiration in days (which is a bit silly but whatever): expiration = ( float(total_seconds(convert_to_timedelta(expiration))) / float(86400)) #user_json = self.get_secure_cookie( #"gateone_user", max_age_days=expiration) user_json = self.message.http_session.get('gateone_user',None) #print 'user_json',user_json #user_json {"upn": "ANONYMOUS", "session": "YmM5MDU5MDgyYmVjNDU0M2E5MDMzYTg5NWMzZTI5YTBkN"} if not user_json: if not self.settings['auth']: # This can happen if the user's browser isn't allowing # persistent cookies (e.g. incognito mode) return {'upn': 'ANONYMOUS', 'session': generate_session_id()} return None user = json_decode(user_json) #user['ip_address'] = self.request.remote_ip return user
def clear_term_settings(self, term): """ Removes any settings associated with the given *term* in the user's term_settings.json file (in their session directory). """ term = str(term) self.term_log.debug("clear_term_settings(%s)" % term) term_settings = RUDict() term_settings[self.ws.location] = {term: {}} #session_dir = options.session_dir session_dir = self.settings['session_dir'] session_dir = os.path.join(session_dir, self.ws.session) settings_path = os.path.join(session_dir, 'term_settings.json') if not os.path.exists(settings_path): return # Nothing to do # First we read in the existing settings and then update them. if os.path.exists(settings_path): with io.open(settings_path, encoding='utf-8') as f: term_settings.update(json_decode(f.read())) del term_settings[self.ws.location][term] with io.open(settings_path, 'w', encoding='utf-8') as f: f.write(json_encode(term_settings)) self.trigger("terminal:clear_term_settings", term) #@require(authenticated(), policies('terminal'))
def graphql_request(self): return json_decode(self.request.body)
def on_message(self, message): data = json_decode(message) subid = data.get('id') if data.get('type') == 'subscription_start': self.on_subscribe(subid, data) elif data.get('type') == 'subscription_end': self.on_unsubscribe(subid, data) else: raise ValueError('Invalid type: {0}'.format(data.get('type')))
def response_handler(self, msg): ident, resp_bytes = msg resp = json_decode(to_unicode(resp_bytes)) app_log.debug('resp: %s', resp) subid = self.subscriptions.get('commandExecute') if subid is not None: self.write_message(json_encode({ 'type': 'subscription_data', 'id': subid, 'payload': { 'data': resp } }))
def test_types(self): headers = {"Cookie": "foo=bar"} response = self.fetch("/typecheck?foo=bar", headers=headers) data = json_decode(response.body) self.assertEqual(data, {}) response = self.fetch("/typecheck", method="POST", body="foo=bar", headers=headers) data = json_decode(response.body) self.assertEqual(data, {}) # This is kind of hacky, but run some of the HTTPServer tests through # WSGIContainer and WSGIApplication to make sure everything survives # repeated disassembly and reassembly.
def fetch_json(self, *args, **kwargs): response = self.fetch(*args, **kwargs) response.rethrow() return json_decode(response.body)
def test_query_string_encoding(self): response = self.fetch("/echo?foo=%C3%A9") data = json_decode(response.body) self.assertEqual(data, {u("foo"): [u("\u00e9")]})
def test_empty_query_string(self): response = self.fetch("/echo?foo=&foo=") data = json_decode(response.body) self.assertEqual(data, {u("foo"): [u(""), u("")]})
def test_types(self): headers = {"Cookie": "foo=bar"} response = self.fetch("/typecheck?foo=bar", headers=headers) data = json_decode(response.body) self.assertEqual(data, {}) response = self.fetch("/typecheck", method="POST", body="foo=bar", headers=headers) data = json_decode(response.body) self.assertEqual(data, {})
def test_double_slash(self): # urlparse.urlsplit (which tornado.httpserver used to use # incorrectly) would parse paths beginning with "//" as # protocol-relative urls. response = self.fetch("//doubleslash") self.assertEqual(200, response.code) self.assertEqual(json_decode(response.body), {})
def test_uncompressed(self): response = self.fetch('/', method='POST', body='foo=bar') self.assertEquals(json_decode(response.body), {u('foo'): [u('bar')]})
def test_gzip(self): response = self.post_gzip('foo=bar') self.assertEquals(json_decode(response.body), {u('foo'): [u('bar')]})
def fetch_chunk_sizes(self, **kwargs): response = self.fetch('/', method='POST', **kwargs) response.rethrow() chunks = json_decode(response.body) self.assertEqual(len(self.BODY), sum(chunks)) for chunk_size in chunks: self.assertLessEqual(chunk_size, self.CHUNK_SIZE, 'oversized chunk: ' + str(chunks)) self.assertGreater(chunk_size, 0, 'empty chunk: ' + str(chunks)) return chunks
def fetch_json(self, path): return json_decode(self.fetch(path).body)
def test_types(self): cookie_value = to_unicode(create_signed_value(self.COOKIE_SECRET, "asdf", "qwer")) response = self.fetch("/typecheck/asdf?foo=bar", headers={"Cookie": "asdf=" + cookie_value}) data = json_decode(response.body) self.assertEqual(data, {}) response = self.fetch("/typecheck/asdf?foo=bar", method="POST", headers={"Cookie": "asdf=" + cookie_value}, body="foo=bar")
def test_pos(self): response = self.fetch('/pos/foo') response.rethrow() data = json_decode(response.body) self.assertEqual(data, {'args': ['foo'], 'kwargs': {}})
def test_kw(self): response = self.fetch('/kw/foo') response.rethrow() data = json_decode(response.body) self.assertEqual(data, {'args': [], 'kwargs': {'path': 'foo'}})
def test_catch_error(self): response = self.fetch('/') self.assertEqual(json_decode(response.body), {'arg_name': 'foo', 'log_message': 'Missing argument foo'})
def test_flow_control_fixed_body(self): response = self.fetch('/', body='abcdefghijklmnopqrstuvwxyz', method='POST') response.rethrow() self.assertEqual(json_decode(response.body), dict(methods=['prepare', 'data_received', 'data_received', 'data_received', 'post']))
def test_flow_control_compressed_body(self): bytesio = BytesIO() gzip_file = gzip.GzipFile(mode='w', fileobj=bytesio) gzip_file.write(b'abcdefghijklmnopqrstuvwxyz') gzip_file.close() compressed_body = bytesio.getvalue() response = self.fetch('/', body=compressed_body, method='POST', headers={'Content-Encoding': 'gzip'}) response.rethrow() self.assertEqual(json_decode(response.body), dict(methods=['prepare', 'data_received', 'data_received', 'data_received', 'post']))