我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用werkzeug.wrappers.Request()。
def __call__(self, environ, start_response):
    """Dispatch requests under ``self.prefix`` to handler methods.

    The part of the request path after ``self.prefix`` is looked up in
    ``self.urlmap`` to obtain a method name on ``self``; that method is
    called with the request and its result is used as the WSGI response.
    Unknown names produce a 404 response. Requests whose path does not
    start with the prefix are forwarded to ``self.app``.
    """
    request = Request(environ)
    if not request.path.startswith(self.prefix):
        return self.app(environ, start_response)

    method = request.path[len(self.prefix):]
    if method == '':
        # No trailing slash: redirect to the prefix with one appended.
        # The status line must carry a reason phrase ("302 Found"), and
        # the body is an empty iterable.
        start_response('302 Found', [('Location', self.prefix + '/')])
        return []

    try:
        funcname = self.urlmap[method]
        func = getattr(self, funcname)
    except (KeyError, AttributeError):
        response = Response(status=404)
    else:
        response = func(request)
    return response(environ, start_response)
def wsgi_app(self, environ, start_response):
    """Serve one WSGI request inside a pushed request context.

    A ``_RequestContext`` built from this application and ``environ`` is
    pushed onto ``_request_ctx_stack`` for the duration of the call and is
    always popped again, even when dispatching raises.
    """
    # The push stays outside the ``try`` so a failed push is never popped.
    _request_ctx_stack.push(_RequestContext(self, environ))
    try:
        response = self.dispatch_request(Request(environ))
        return response(environ, start_response)
    finally:
        _request_ctx_stack.pop()
def __call__(self, environ, start_response):
    """Record the incoming request and answer with the configured reply.

    The request and its decoded payload are appended to ``self.requests``
    and ``self.payloads``. When a validator is registered for the request
    path (and validation is not skipped), a valid payload yields 202 and an
    invalid one yields 400 with a JSON error body.
    """
    status, body = self.code, self.content
    req = Request(environ)
    self.requests.append(req)

    payload = req.data
    if req.content_encoding == 'deflate':
        payload = zlib.decompress(payload)
    payload = payload.decode(req.charset)
    if req.content_type == 'application/json':
        payload = json.loads(payload)
    self.payloads.append(payload)

    schema = VALIDATORS.get(req.path)
    if schema and not self.skip_validate:
        try:
            schema.validate(payload)
        except jsonschema.ValidationError as err:
            status = 400
            body = json.dumps({'status': 'error', 'message': str(err)})
        else:
            status = 202

    reply = Response(status=status)
    # Replace the default headers with exactly the configured ones.
    reply.headers.clear()
    reply.headers.extend(self.headers)
    reply.data = body
    return reply(environ, start_response)
def test_accept_mixin(self):
    """Accept-* request headers are exposed as the matching accept objects."""
    environ = {
        'HTTP_ACCEPT': 'text/xml,application/xml,application/xhtml+xml,'
                       'text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5',
        'HTTP_ACCEPT_CHARSET': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
        'HTTP_ACCEPT_ENCODING': 'gzip,deflate',
        'HTTP_ACCEPT_LANGUAGE': 'en-us,en;q=0.5'
    }
    request = wrappers.Request(environ)

    expected_mimetypes = MIMEAccept([
        ('text/xml', 1),
        ('image/png', 1),
        ('application/xml', 1),
        ('application/xhtml+xml', 1),
        ('text/html', 0.9),
        ('text/plain', 0.8),
        ('*/*', 0.5)
    ])
    self.assert_equal(request.accept_mimetypes, expected_mimetypes)

    expected_charsets = CharsetAccept([
        ('ISO-8859-1', 1),
        ('utf-8', 0.7),
        ('*', 0.7)
    ])
    self.assert_strict_equal(request.accept_charsets, expected_charsets)

    expected_encodings = Accept([('gzip', 1), ('deflate', 1)])
    self.assert_strict_equal(request.accept_encodings, expected_encodings)

    expected_languages = LanguageAccept([('en-us', 1), ('en', 0.5)])
    self.assert_strict_equal(request.accept_languages, expected_languages)

    # An empty Accept header produces an empty MIMEAccept.
    empty = wrappers.Request({'HTTP_ACCEPT': ''})
    self.assert_strict_equal(empty.accept_mimetypes, MIMEAccept())
def test_etag_request_mixin(self):
    """Cache-control, ETag and conditional date headers are parsed."""
    etag_header = 'w/"foo", bar, "baz"'
    http_date = 'Tue, 22 Jan 2008 11:18:44 GMT'
    request = wrappers.Request({
        'HTTP_CACHE_CONTROL': 'no-store, no-cache',
        'HTTP_IF_MATCH': etag_header,
        'HTTP_IF_NONE_MATCH': etag_header,
        'HTTP_IF_MODIFIED_SINCE': http_date,
        'HTTP_IF_UNMODIFIED_SINCE': http_date
    })
    assert request.cache_control.no_store
    assert request.cache_control.no_cache

    # Both conditional headers carry the same value and must parse alike.
    for etags in (request.if_match, request.if_none_match):
        assert etags('bar')
        assert etags.contains_raw('w/"foo"')
        assert etags.contains_weak('foo')
        assert not etags.contains('foo')

    self.assert_equal(request.if_modified_since,
                      datetime(2008, 1, 22, 11, 18, 44))
    self.assert_equal(request.if_unmodified_since,
                      datetime(2008, 1, 22, 11, 18, 44))
def test_user_agent_mixin(self):
    """User-Agent strings are split into browser, platform, version, language."""
    # (header, browser, platform, version, language)
    cases = [
        ('Mozilla/5.0 (Macintosh; U; Intel Mac OS X; en-US; rv:1.8.1.11) '
         'Gecko/20071127 Firefox/2.0.0.11',
         'firefox', 'macos', '2.0.0.11', 'en-US'),
        ('Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; de-DE) Opera 8.54',
         'opera', 'windows', '8.54', 'de-DE'),
        ('Mozilla/5.0 (iPhone; U; CPU like Mac OS X; en) AppleWebKit/420 '
         '(KHTML, like Gecko) Version/3.0 Mobile/1A543a Safari/419.3',
         'safari', 'iphone', '419.3', 'en'),
        ('Bot Googlebot/2.1 ( http://www.googlebot.com/bot.html)',
         'google', None, '2.1', None)
    ]
    for header, want_browser, want_platform, want_version, want_lang in cases:
        request = wrappers.Request({'HTTP_USER_AGENT': header})
        self.assert_strict_equal(request.user_agent.browser, want_browser)
        self.assert_strict_equal(request.user_agent.platform, want_platform)
        self.assert_strict_equal(request.user_agent.version, want_version)
        self.assert_strict_equal(request.user_agent.language, want_lang)
        assert bool(request.user_agent)
        self.assert_strict_equal(request.user_agent.to_header(), header)
        self.assert_strict_equal(str(request.user_agent), header)

    # A header with no recognisable browser yields a falsy user agent.
    request = wrappers.Request({'HTTP_USER_AGENT': 'foo'})
    assert not request.user_agent
def test_common_request_descriptors_mixin(self):
    """Common header descriptors return parsed, typed values."""
    headers = {
        'Referer': 'http://www.example.com/',
        'Date': 'Sat, 28 Feb 2009 19:04:35 GMT',
        'Max-Forwards': '10',
        'Pragma': 'no-cache',
        'Content-Encoding': 'gzip',
        'Content-MD5': '9a3bc6dbc47a70db25b84c6e5867a072'
    }
    request = wrappers.Request.from_values(
        content_type='text/html; charset=utf-8',
        content_length='23',
        headers=headers
    )

    self.assert_equal(request.content_type, 'text/html; charset=utf-8')
    self.assert_equal(request.mimetype, 'text/html')
    self.assert_equal(request.mimetype_params, {'charset': 'utf-8'})
    self.assert_equal(request.content_length, 23)
    self.assert_equal(request.referrer, 'http://www.example.com/')
    self.assert_equal(request.date, datetime(2009, 2, 28, 19, 4, 35))
    self.assert_equal(request.max_forwards, 10)
    self.assert_true('no-cache' in request.pragma)
    self.assert_equal(request.content_encoding, 'gzip')
    self.assert_equal(request.content_md5,
                      '9a3bc6dbc47a70db25b84c6e5867a072')
def test_file_closing(self):
    """Closing the request also closes files parsed from the upload."""
    payload = b''.join([
        b'--foo\r\n',
        b'Content-Disposition: form-data; name="foo"; filename="foo.txt"\r\n',
        b'Content-Type: text/plain; charset=utf-8\r\n\r\n',
        b'file contents, just the contents\r\n',
        b'--foo--',
    ])
    req = wrappers.Request.from_values(
        input_stream=BytesIO(payload),
        content_length=len(payload),
        content_type='multipart/form-data; boundary=foo',
        method='POST'
    )
    uploaded = req.files['foo']
    self.assert_equal(uploaded.mimetype, 'text/plain')
    self.assert_equal(uploaded.filename, 'foo.txt')

    # The file is open until the request itself is closed.
    self.assert_equal(uploaded.closed, False)
    req.close()
    self.assert_equal(uploaded.closed, True)
def test_ranges(self):
    """Range request headers convert to and from Content-Range."""

    def check_first_500_of_1000(response):
        # Shared expectations for a "bytes 0-499/1000" content range.
        self.assert_equal(response.content_range.units, 'bytes')
        self.assert_equal(response.content_range.start, 0)
        self.assert_equal(response.content_range.stop, 500)
        self.assert_equal(response.content_range.length, 1000)

    # basic range stuff
    req = wrappers.Request.from_values()
    assert req.range is None
    req = wrappers.Request.from_values(headers={'Range': 'bytes=0-499'})
    self.assert_equal(req.range.ranges, [(0, 500)])

    # Setting the attribute writes the header.
    resp = wrappers.Response()
    resp.content_range = req.range.make_content_range(1000)
    check_first_500_of_1000(resp)
    self.assert_equal(resp.headers['Content-Range'], 'bytes 0-499/1000')

    resp.content_range.unset()
    assert 'Content-Range' not in resp.headers

    # Setting the header is reflected by the attribute.
    resp.headers['Content-Range'] = 'bytes 0-499/1000'
    check_first_500_of_1000(resp)
def __call__(self, environ, start_response):
    """Reject requests that lack the expected Host or Metadata-Flavor header.

    Responds with 403 when the Host header is not one of the accepted
    metadata host names, or when the Metadata-Flavor header is missing for
    any path other than ``/``. All other requests are passed on to
    ``self.app``.
    """
    req = Request(environ, shallow=True)
    forbidden_headers = [('Content-Type', 'text/html; charset=UTF-8')]

    host = req.headers.get('Host')
    if host not in ('metadata.google.internal', '169.254.169.254', 'metadata'):
        start_response('403 Forbidden', forbidden_headers)
        return ['Host Header must be one of (metadata.google.internal, metadata, 169.254.169.254)']

    req_path = environ.get('PATH_INFO')
    metadata_flavor = req.headers.get('Metadata-Flavor')
    if metadata_flavor is None and req_path != '/':
        start_response('403 Forbidden', forbidden_headers)
        # %s formatting tolerates a missing PATH_INFO, where string
        # concatenation with None would raise TypeError.
        logging.error("Metadata-Flavor: Google header not sent for: %s", req_path)
        t = Template('''<p>Your client does not have permission to get URL <code>{{ err_path }}</code> from this server. Missing Metadata-Flavor:Google header. ''')
        return [str(t.render(err_path=req_path))]

    return self.app(environ, start_response)
# Make sure every response has these headers (which is what gce does)
def zabbix_fake_app(environ, start_response):
    """WSGI test double that answers a few JSON-RPC style request bodies.

    When a truthy ``status`` attribute is set on this function, that status
    and the function's ``content`` attribute are returned as-is. Otherwise
    the reply depends on which ``"method"`` string appears in the request
    body: a fixed login result, a fixture file, or a fallback text.
    """
    request = Request(environ)
    request_body = request.get_data(as_text=True)

    # Tests can force a canned reply by setting attributes on the function.
    if getattr(zabbix_fake_app, 'status', False):
        response = Response(status=zabbix_fake_app.status)
        response.data = zabbix_fake_app.content
        return response(environ, start_response)

    response = Response(status=200, headers=[('Content-type', 'application/json')])
    if '"method": "user.login"' in request_body:
        json_string = '{"jsonrpc":"2.0","result":"9287f336ffb611e586aa5e5517507c66","id":0}'
    elif '"method": "host.get"' in request_body:
        # Context managers ensure the fixture file handles are closed.
        with open('tests/fixtures/host.get_success.json') as fixture:
            json_string = fixture.read()
    elif '"method": "item.get"' in request_body:
        with open('tests/fixtures/items.get_success.json') as fixture:
            json_string = fixture.read()
    else:
        json_string = 'Unrecognized test request'
    response.data = json_string
    return response(environ, start_response)
def get_response(ip, forwarded=None):
    """Call ``protected`` with a request built from the given addresses.

    The environ always carries ``REMOTE_ADDR``; ``HTTP_X_FORWARDED_FOR`` is
    added only when ``forwarded`` is truthy.
    """
    environ = dict(REMOTE_ADDR=ip)
    if forwarded:
        environ.update(HTTP_X_FORWARDED_FOR=forwarded)
    return protected(None, Request(environ))
def wsgi_app(self, environ, start_response):
    """Run the pre-handle, dispatch and post-handle steps for one request."""
    req = Request(environ)
    self.pre_handle(req)
    resp = self.dispatch_response(req)
    # The post hook receives both the request and the produced response.
    self.post_handle(req, resp)
    return resp(environ, start_response)
def wsgi_app(self, environ, start_response):
    """Wrap ``environ`` in a Request, dispatch it and invoke the response."""
    response = self.dispatch_request(Request(environ))
    return response(environ, start_response)
def wsgi_app(self, environ, start_response):
    """Route the request and register a contestant or worker from its query.

    Returns a response (or HTTP exception) object rather than invoking it.
    NOTE(review): presumably the caller or a decorator outside this view
    turns the returned object into a WSGI result; confirm.
    """
    adapter = self.router.bind_to_environ(environ)
    try:
        endpoint, _ = adapter.match()
    except RequestRedirect as redirect:
        return redirect
    except HTTPException:
        return NotFound()

    # Only the query string arguments are used; URL rule arguments are not.
    query = Request(environ).args

    response = Response()
    response.mimetype = 'text/plain'
    response.status_code = 200

    if endpoint == 'contestant':
        if all(key in query for key in ('mac', 'row', 'col')):
            self.add_contestant(query['mac'], query['row'], query['col'],
                                response)
        else:
            response.status_code = 400
            response.data = 'Required query parameters: mac, row, col'
    elif endpoint == 'worker':
        if all(key in query for key in ('mac', 'num')):
            self.add_worker(query['mac'], query['num'], response)
        else:
            response.status_code = 400
            response.data = 'Required query parameters: mac, num'
    elif endpoint == 'reboot_timestamp':
        response.data = str(self.reboot_string)

    return response
def wsgi_app(self, environ, start_response):
    """Render a boot or config script based on the client's ``ip`` argument.

    The query arguments are merged with the configuration whose subnet
    contains the given address and formatted into ``BOOTSCRIPT``. When no
    configuration matches (or no ``ip`` is given), ``CONFIGSCRIPT`` is
    formatted with ``self.default_config`` instead.

    Returns a response (or HTTP exception) object rather than invoking it.
    NOTE(review): presumably a decorator outside this view handles the WSGI
    call; confirm.
    """
    adapter = self.router.bind_to_environ(environ)
    try:
        endpoint, _ = adapter.match()
    except RequestRedirect as redirect:
        return redirect
    except HTTPException:
        return NotFound()

    params = dict(Request(environ).args)
    params['wipe'] = 'pixie_wipe=force' if endpoint == 'wipe' else ""

    response = Response()
    response.mimetype = 'text/plain'
    response.status_code = 200

    matched = None
    if 'ip' in params:
        address = ipaddress.ip_address(params['ip'][0])
        # No early exit: when several subnets contain the address, the
        # last matching configuration wins.
        for candidate in self.configs:
            if address in candidate['subnet']:
                matched = candidate

    if matched is not None:
        for key, value in matched.items():
            params[key] = value
        response.data = BOOTSCRIPT.format(**params)
    else:
        response.data = CONFIGSCRIPT.format(**self.default_config)
    return response
def test_query_string_is_bytes(self):
    """The raw query string is exposed as bytes, still percent-encoded."""
    request = wrappers.Request.from_values(u'/?foo=%2f')
    raw_query = request.query_string
    self.assert_strict_equal(raw_query, b'foo=%2f')
def test_access_route(self):
    """``access_route`` uses X-Forwarded-For, else falls back to REMOTE_ADDR."""
    remote = '192.168.1.3'

    # With a forwarding header the route lists the forwarded addresses.
    proxied = wrappers.Request.from_values(headers={
        'X-Forwarded-For': '192.168.1.2, 192.168.1.1'
    })
    proxied.environ['REMOTE_ADDR'] = remote
    self.assert_equal(proxied.access_route, ['192.168.1.2', '192.168.1.1'])
    self.assert_strict_equal(proxied.remote_addr, '192.168.1.3')

    # Without it the route contains only the remote address.
    direct = wrappers.Request.from_values()
    direct.environ['REMOTE_ADDR'] = remote
    self.assert_strict_equal(list(direct.access_route), ['192.168.1.3'])
def test_url_request_descriptors(self):
    """URL-related request attributes are derived from path and base URL."""
    req = wrappers.Request.from_values('/bar?foo=baz',
                                       'http://example.com/test')
    # Checked in this order; literal types are kept for the strict compare.
    expectations = [
        ('path', u'/bar'),
        ('full_path', u'/bar?foo=baz'),
        ('script_root', u'/test'),
        ('url', u'http://example.com/test/bar?foo=baz'),
        ('base_url', u'http://example.com/test/bar'),
        ('url_root', u'http://example.com/test/'),
        ('host_url', u'http://example.com/'),
        ('host', 'example.com'),
        ('scheme', 'http'),
    ]
    for attribute, expected in expectations:
        self.assert_strict_equal(getattr(req, attribute), expected)

    secure = wrappers.Request.from_values('/bar?foo=baz',
                                          'https://example.com/test')
    self.assert_strict_equal(secure.scheme, 'https')
def test_url_request_descriptors_query_quoting(self):
    """Percent-encoded query values are kept quoted in full_path and url."""
    quoted_target = 'http%3A%2F%2Fwww.example.com%2F%3Fnext%3D%2F'
    target = '/bar?next=' + quoted_target
    req = wrappers.Request.from_values(target, 'http://example.com/')

    self.assert_equal(req.path, u'/bar')
    self.assert_strict_equal(req.full_path, u'/bar?next=' + quoted_target)
    self.assert_strict_equal(
        req.url, u'http://example.com/bar?next=' + quoted_target)
def test_authorization_mixin(self):
    """A Basic Authorization header is decoded into type and credentials."""
    headers = {'Authorization': 'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=='}
    auth = wrappers.Request.from_values(headers=headers).authorization

    expected = (
        ('type', 'basic'),
        ('username', 'Aladdin'),
        ('password', 'open sesame'),
    )
    for attribute, value in expected:
        self.assert_strict_equal(getattr(auth, attribute), value)
def test_stream_wrapping(self):
    """Form parsing reads through a replacement ``request.stream``."""

    class LowercasingStream(object):
        # Proxy that lower-cases everything read from the wrapped stream.

        def __init__(self, stream):
            self._stream = stream

        def read(self, size=-1):
            chunk = self._stream.read(size)
            return chunk.lower()

        def readline(self, size=-1):
            line = self._stream.readline(size)
            return line.lower()

    body = b'foo=Hello+World'
    req = wrappers.Request.from_values(
        '/',
        method='POST',
        data=body,
        content_type='application/x-www-form-urlencoded'
    )
    req.stream = LowercasingStream(req.stream)
    self.assert_equal(req.form['foo'], 'hello world')
def test_get_data_method_parsing_caching_behavior(self):
    """``get_data`` and form parsing interact depending on call order."""
    data = b'foo=Hello+World'

    def fresh_request():
        # Every scenario starts from an untouched urlencoded POST request.
        return wrappers.Request.from_values(
            '/', method='POST', data=data,
            content_type='application/x-www-form-urlencoded')

    # get_data() caches, so form stays available
    req = fresh_request()
    self.assert_equal(req.get_data(), data)
    self.assert_equal(req.form['foo'], u'Hello World')
    self.assert_equal(req.get_data(), data)

    # here we access the form data first, caching is bypassed
    req = fresh_request()
    self.assert_equal(req.form['foo'], u'Hello World')
    self.assert_equal(req.get_data(), b'')

    # Another case is uncached get data which trashes everything
    req = fresh_request()
    self.assert_equal(req.get_data(cache=False), data)
    self.assert_equal(req.get_data(cache=False), b'')
    self.assert_equal(req.form, {})

    # Or we can implicitly start the form parser which is similar to
    # the old .data behavior
    req = fresh_request()
    self.assert_equal(req.get_data(parse_form_data=True), b'')
    self.assert_equal(req.form['foo'], u'Hello World')
def test_shallow_mode(self):
    """A shallow request serves query args; reading the form raises."""
    request = wrappers.Request({'QUERY_STRING': 'foo=bar'}, shallow=True)
    self.assert_equal(request.args['foo'], 'bar')

    def read_form_value():
        return request.form['foo']

    self.assert_raises(RuntimeError, read_form_value)
def test_form_parsing_failed(self):
    """A body that does not match the declared boundary parses to nothing."""
    body = b'--blah\r\n'
    req = wrappers.Request.from_values(
        input_stream=BytesIO(body),
        content_length=len(body),
        content_type='multipart/form-data; boundary=foo',
        method='POST'
    )
    assert not req.files
    assert not req.form
def test_url_charset_reflection(self):
    """Changing ``charset`` on a request is reflected by ``url_charset``."""
    request = wrappers.Request.from_values()
    request.charset = 'utf-7'
    reflected = request.url_charset
    self.assert_equal(reflected, 'utf-7')
def test_response_iter_wrapping(self):
    """A response body iterator can be wrapped by another generator."""

    def uppercasing(iterator):
        return (item.upper() for item in iterator)

    def generator():
        for chunk in ('foo', 'bar'):
            yield chunk

    req = wrappers.Request.from_values()
    resp = wrappers.Response(generator())
    del resp.headers['Content-Length']
    # Re-wrap the encoded body so each chunk is upper-cased on the way out.
    resp.response = uppercasing(resp.iter_encoded())

    actual_resp = wrappers.Response.from_app(resp, req.environ,
                                             buffered=True)
    self.assertEqual(actual_resp.get_data(), b'FOOBAR')
def test_other_method_payload(self):
    """A request with an unknown HTTP method still exposes its body."""
    payload = b'Hello World'
    request_kwargs = dict(
        input_stream=BytesIO(payload),
        content_length=len(payload),
        content_type='text/plain',
        method='WHAT_THE_FUCK',
    )
    req = wrappers.Request.from_values(**request_kwargs)
    self.assert_equal(req.get_data(), payload)
    self.assert_is_instance(req.stream, LimitedStream)
def test_form_data_ordering(self):
    """An ordered parameter storage class preserves query argument order."""

    class MyRequest(wrappers.Request):
        parameter_storage_class = ImmutableOrderedMultiDict

    req = MyRequest.from_values('/?foo=1&bar=0&foo=3')

    keys_in_order = list(req.args)
    self.assert_equal(keys_in_order, ['foo', 'bar'])

    pairs_in_order = list(req.args.items(multi=True))
    self.assert_equal(pairs_in_order, [
        ('foo', '1'),
        ('bar', '0'),
        ('foo', '3')
    ])

    self.assert_is_instance(req.args, ImmutableOrderedMultiDict)
    self.assert_is_instance(req.values, CombinedMultiDict)
    self.assert_equal(req.values['foo'], '1')
    self.assert_equal(req.values.getlist('foo'), ['1', '3'])
def test_modified_url_encoding(self):
    """Query arguments are decoded with the request class's ``url_charset``.

    The sample value is non-ASCII Hangul text, written with escapes so it
    survives any source encoding. ASCII-only text would encode to the same
    bytes in every charset and would not exercise the euc-kr decoding.
    """
    class ModifiedRequest(wrappers.Request):
        url_charset = 'euc-kr'

    text = u'\uc815\uc0c1\ucc98\ub9ac'
    req = ModifiedRequest.from_values((u'/?foo=' + text).encode('euc-kr'))
    self.assert_strict_equal(req.args['foo'], text)
def test_reverse_slash_behavior(self):
    """With the reverse-slash mixin, the slash moves from path to script root."""

    class MyRequest(wrappers.ReverseSlashBehaviorRequestMixin, Request):
        pass

    req = MyRequest.from_values('/foo/bar', 'http://example.com/test')
    assert req.url == 'http://example.com/test/foo/bar'
    assert req.path == 'foo/bar'
    assert req.script_root == '/test/'

    # make sure the routing system works with the slashes in
    # reverse order as well.
    url_map = routing.Map([routing.Rule('/foo/bar', endpoint='foo')])

    environ_adapter = url_map.bind_to_environ(req.environ)
    assert environ_adapter.match() == ('foo', {})

    host_adapter = url_map.bind(req.host, req.script_root)
    assert host_adapter.match(req.path) == ('foo', {})
def test_dynamic_charset_request_mixin(self):
    """The request charset follows the Content-Type's charset parameter."""

    class MyRequest(wrappers.DynamicCharsetRequestMixin, Request):
        pass

    def build(content_type):
        return MyRequest({'CONTENT_TYPE': content_type})

    req = build('text/html')
    assert req.charset == 'latin1'

    req = build('text/html; charset=utf-8')
    assert req.charset == 'utf-8'

    req = build('application/octet-stream')
    assert req.charset == 'latin1'
    assert req.url_charset == 'latin1'

    # Overriding url_charset on the class affects later requests only for
    # the URL charset, not the body charset.
    MyRequest.url_charset = 'utf-8'
    req = build('application/octet-stream')
    assert req.charset == 'latin1'
    assert req.url_charset == 'utf-8'

    def return_ascii(x):
        return "ascii"

    # An unknown charset is resolved through the unknown_charset hook.
    req = build('text/plain; charset=x-weird-charset')
    req.unknown_charset = return_ascii
    assert req.charset == 'ascii'
    assert req.url_charset == 'utf-8'
def application(environ, start_responseonse):
    """WSGI entry point: POST requests go to ``stats``, others to ``upload_file``."""
    request = Request(environ)
    handler = stats if request.method == 'POST' else upload_file
    response = handler(request)
    return response(environ, start_responseonse)
def wsgi_app(app, environ, start_response):
    """Dispatch one WSGI request through ``app`` and invoke the response."""
    # Wrap the raw environ, let the application produce a response, then
    # call that response as a WSGI application.
    response = app.dispatch_request(Request(environ))
    return response(environ, start_response)
def __call__(self, environ, start_response):
    """WSGI application interface: handle the request, invoke the response."""
    response = self.handle(Request(environ))
    return response(environ, start_response)
def request(**kwargs):
    """Build a Request from an environ created by ``EnvironBuilder(**kwargs)``."""
    builder = EnvironBuilder(**kwargs)
    environ = builder.get_environ()
    return Request(environ)
def test_parse_body(self):
    """``parse_body`` exposes the request's form values by key."""
    req = Request({})
    req.form = dict(foo="bar")
    parsed = self.handler.parse_body(req)
    self.assertEqual("bar", parsed["foo"])
def test_call(self, name_mock, content_mock, ip_mock):
    """``_call`` passes the form parameter on and logs the HTTP call."""
    handler = TestBaseHandler.DummyHandler()
    request = Request(Environ({"wsgi.input": None}))
    request.form = {'param': 42}

    result = handler._call(handler.dummy_endpoint, {}, request)
    self.assertEqual(43, result["incremented"])

    # The call must have produced an HTTP log row with address and endpoint.
    Logger.c.execute("SELECT * FROM logs WHERE category = 'HTTP'")
    log_row = Logger.c.fetchone()
    for fragment in ("1.2.3.4", "dummy_endpoint"):
        self.assertIn(fragment, log_row[3])
def test_call_default(self, name_mock, content_mock, ip_mock):
    """Without a form parameter the endpoint result is 124."""
    handler = TestBaseHandler.DummyHandler()
    request = Request(Environ({"wsgi.input": None}))
    result = handler._call(handler.dummy_endpoint, {}, request)
    self.assertEqual(124, result["incremented"])
def test_call_cast_parameter(self, name_mock, content_mock, ip_mock):
    """A numeric string form parameter gives the same result as the integer."""
    handler = TestBaseHandler.DummyHandler()
    request = Request(Environ({"wsgi.input": None}))
    request.form = dict(param='42')
    result = handler._call(handler.dummy_endpoint, {}, request)
    self.assertEqual(43, result["incremented"])
def test_call_fail_cast_parameter(self, name_mock, content_mock, ip_mock):
    """A non-numeric form parameter makes ``_call`` raise BadRequest."""
    handler = TestBaseHandler.DummyHandler()
    request = Request(Environ({"wsgi.input": None}))
    request.form = dict(param='nope')
    endpoint = handler.dummy_endpoint
    with self.assertRaises(BadRequest):
        handler._call(endpoint, {}, request)
def test_call_required_args(self, name_mock, content_mock, ip_mock):
    """Omitting a required argument raises BadRequest naming the parameter."""
    handler = TestBaseHandler.DummyHandler()
    request = Request(Environ({"wsgi.input": None}))

    with self.assertRaises(BadRequest) as caught:
        handler._call(handler.required, {}, request)

    response = caught.exception.response
    for fragment in ("MISSING_PARAMETER", "param"):
        self.assertIn(fragment, response.data.decode())
def test_call_with_error(self, name_mock, content_mock, ip_mock):
    """Calling ``required`` with a parameter raises Forbidden with details."""
    handler = TestBaseHandler.DummyHandler()
    request = Request(Environ({"wsgi.input": None}))

    with self.assertRaises(Forbidden) as caught:
        handler._call(handler.required, {'param': 42}, request)

    response = caught.exception.response
    for fragment in ("NOBUONO", "nononono"):
        self.assertIn(fragment, response.data.decode())
def test_call_general_attrs(self, name_mock, content_mock, ip_mock):
    """The ``myip`` endpoint called through ``_call`` returns 1.2.3.4."""
    handler = TestBaseHandler.DummyHandler()
    request = Request(Environ({"wsgi.input": None}))
    address = handler._call(handler.myip, {}, request)
    self.assertEqual("1.2.3.4", address)
def test_call_file(self):
    """The ``file`` endpoint returns ``foo`` for an upload named ``foo``."""
    handler = TestBaseHandler.DummyHandler()
    request = Request(Environ({"wsgi.input": None}))
    upload = FileStorage(filename="foo")
    request.files = {"file": upload}
    result = handler._call(handler.file, {}, request)
    self.assertEqual("foo", result)
def test_call_with_decorators(self):
    """``_call`` accepts input and output ids for rows that exist."""
    handler = TestBaseHandler.DummyHandler()
    request = Request(Environ({"wsgi.input": None}))

    # Insert the user, task, input and output that the ids refer to.
    Database.add_user("token", "", "")
    Database.add_task("poldo", "", "", 1, 1)
    Database.add_input("inputid", "token", "poldo", 1, "", 42)
    Database.add_output("outputid", "inputid", "", 42, "")

    route_args = {"input_id": "inputid", "output_id": "outputid"}
    handler._call(handler.with_decorators, route_args, request)
def test_get_file_name_no_file(self):
    """``_get_file_name`` returns None when the request has no files."""
    req = Request(Environ())
    req.files = dict()
    file_name = BaseHandler._get_file_name(req)
    self.assertIsNone(file_name)
def test_get_file_content(self):
    """``_get_file_content`` returns the bytes of the uploaded file."""
    req = Request(Environ())
    payload = "hello world".encode()
    upload = FileStorage(stream=_io.BytesIO(payload), filename="foo")
    req.files = {"file": upload}
    content = BaseHandler._get_file_content(req)
    self.assertEqual("hello world", content.decode())