我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用tornado.escape.parse_qs_bytes()。
def _on_access_token(self, redirect_uri, client_id, client_secret, callback, fields, response): if response.error: logging.warning('Facebook auth error: %s' % str(response)) callback(None) return args = escape.parse_qs_bytes(escape.native_str(response.body)) session = { "access_token": args["access_token"][-1], "expires": args.get("expires") } self.facebook_request( path="/me", callback=self.async_callback( self._on_get_user_info, callback, session, fields), access_token=session["access_token"], fields=",".join(fields) )
def _on_request_body(self, data): self._request.body = data content_type = self._request.headers.get("Content-Type", "") if self._request.method in ("POST", "PUT"): if content_type.startswith("application/x-www-form-urlencoded"): arguments = parse_qs_bytes(native_str(self._request.body)) for name, values in arguments.iteritems(): values = [v for v in values if v] if values: self._request.arguments.setdefault(name, []).extend( values) elif content_type.startswith("multipart/form-data"): fields = content_type.split(";") for field in fields: k, sep, v = field.strip().partition("=") if k == "boundary" and v: httputil.parse_multipart_form_data( utf8(v), data, self._request.arguments, self._request.files) break else: logging.warning("Invalid multipart/form-data") self.request_callback(self._request)
def _on_access_token(self, redirect_uri, client_id, client_secret, future, fields, response): if response.error: future.set_exception(AuthError('Facebook auth error: %s' % str(response))) return args = escape.parse_qs_bytes(escape.native_str(response.body)) session = { "access_token": args["access_token"][-1], "expires": args.get("expires") } self.facebook_request( path="/me", callback=self.async_callback( self._on_get_user_info, future, session, fields), access_token=session["access_token"], fields=",".join(fields) )
def _on_access_token(self, redirect_uri, client_id, client_secret, future, fields, response): if response.error: future.set_exception(AuthError('Facebook auth error: %s' % str(response))) return args = escape.parse_qs_bytes(escape.native_str(response.body)) session = { "access_token": args["access_token"][-1], "expires": args.get("expires") } self.facebook_request( path="/me", callback=functools.partial( self._on_get_user_info, future, session, fields), access_token=session["access_token"], fields=",".join(fields) )
def parse_body_arguments(content_type, body, arguments, files): """Parses a form request body. Supports ``application/x-www-form-urlencoded`` and ``multipart/form-data``. The ``content_type`` parameter should be a string and ``body`` should be a byte string. The ``arguments`` and ``files`` parameters are dictionaries that will be updated with the parsed contents. """ if content_type.startswith("application/x-www-form-urlencoded"): uri_arguments = parse_qs_bytes(native_str(body), keep_blank_values=True) for name, values in uri_arguments.items(): if values: arguments.setdefault(name, []).extend(values) elif content_type.startswith("multipart/form-data"): fields = content_type.split(";") for field in fields: k, sep, v = field.strip().partition("=") if k == "boundary" and v: parse_multipart_form_data(utf8(v), body, arguments, files) break else: gen_log.warning("Invalid multipart/form-data")
def _on_openid(self, redirect_uri, appid, secret, future, response): if response.error: future.set_exception(AuthError('WechatMp auth error: %s') % str(response.error)) return #args = escape.parse_qs_bytes(escape.native_str(response.body)) logging.debug("response.body: %s" % response.body) args = escape.json_decode(response.body) logging.debug("args[access_token]: %s" % args.get('access_token')) session = { 'access_token': args.get('access_token'), 'expires_in': args.get('expires_in'), 'refresh_token': args.get('refresh_token'), 'openid': args.get('openid'), 'scope': args.get('scope'), } future.set_result(session)
def _on_access_token(self, redirect_uri, appid, secret, future, fields, response): if response.error: future.set_exception(AuthError('WechatMp auth error: %s' % str(response))) return #args = escape.parse_qs_bytes(escape.native_str(response.body)) args = escape.json_decode(response.body) session = { 'access_token': args.get('access_token'), 'expires_in': args.get('expires_in'), 'refresh_token': args.get('refresh_token'), 'openid': args.get('openid'), 'scope': args.get('scope'), } self.wechat_request( path="/userinfo", callback=functools.partial( self._on_get_user_info, future, session, fields), access_token=session['access_token'], fields=",".join(fields) )
def __init__(self, method=None, uri=None, version="HTTP/1.0", headers=None, body=None, host=None, files=None, connection=None, start_line=None): if start_line is not None: method, uri, version = start_line self.method = method self.uri = uri self.version = version self.headers = headers or HTTPHeaders() self.body = body or b"" # set remote IP and protocol context = getattr(connection, 'context', None) self.remote_ip = getattr(context, 'remote_ip', None) self.protocol = getattr(context, 'protocol', "http") self.host = host or self.headers.get("Host") or "127.0.0.1" self.files = files or {} self.connection = connection self._start_time = time.time() self._finish_time = None self.path, sep, self.query = uri.partition('?') self.arguments = parse_qs_bytes(self.query, keep_blank_values=True) self.query_arguments = copy.deepcopy(self.arguments) self.body_arguments = {}
def parse_body_arguments(content_type, body, arguments, files, headers=None): """Parses a form request body. Supports ``application/x-www-form-urlencoded`` and ``multipart/form-data``. The ``content_type`` parameter should be a string and ``body`` should be a byte string. The ``arguments`` and ``files`` parameters are dictionaries that will be updated with the parsed contents. """ if headers and 'Content-Encoding' in headers: gen_log.warning("Unsupported Content-Encoding: %s", headers['Content-Encoding']) return if content_type.startswith("application/x-www-form-urlencoded"): try: uri_arguments = parse_qs_bytes(native_str(body), keep_blank_values=True) except Exception as e: gen_log.warning('Invalid x-www-form-urlencoded body: %s', e) uri_arguments = {} for name, values in uri_arguments.items(): if values: arguments.setdefault(name, []).extend(values) elif content_type.startswith("multipart/form-data"): try: fields = content_type.split(";") for field in fields: k, sep, v = field.strip().partition("=") if k == "boundary" and v: parse_multipart_form_data(utf8(v), body, arguments, files) break else: raise ValueError("multipart boundary not found") except Exception as e: gen_log.warning("Invalid multipart/form-data: %s", e)
def __init__(self, method, uri, version="HTTP/1.0", headers=None, body=None, remote_ip=None, protocol=None, host=None, files=None, connection=None): self.method = method self.uri = uri self.version = version self.headers = headers or httputil.HTTPHeaders() self.body = body or "" if connection and connection.xheaders: # Squid uses X-Forwarded-For, others use X-Real-Ip self.remote_ip = self.headers.get( "X-Real-Ip", self.headers.get("X-Forwarded-For", remote_ip)) # AWS uses X-Forwarded-Proto self.protocol = self.headers.get( "X-Scheme", self.headers.get("X-Forwarded-Proto", protocol)) if self.protocol not in ("http", "https"): self.protocol = "http" else: self.remote_ip = remote_ip if protocol: self.protocol = protocol elif connection and isinstance(connection.stream, iostream.SSLIOStream): self.protocol = "https" else: self.protocol = "http" self.host = host or self.headers.get("Host") or "127.0.0.1" self.files = files or {} self.connection = connection self._start_time = time.time() self._finish_time = None scheme, netloc, path, query, fragment = urlparse.urlsplit(native_str(uri)) self.path = path self.query = query arguments = parse_qs_bytes(query) self.arguments = {} for name, values in arguments.iteritems(): values = [v for v in values if v] if values: self.arguments[name] = values
def parse_body_arguments(content_type, body, arguments, files): """Parses a form request body. Supports ``application/x-www-form-urlencoded`` and ``multipart/form-data``. The ``content_type`` parameter should be a string and ``body`` should be a byte string. The ``arguments`` and ``files`` parameters are dictionaries that will be updated with the parsed contents. """ if content_type.startswith("application/x-www-form-urlencoded"): try: uri_arguments = parse_qs_bytes(native_str(body), keep_blank_values=True) except Exception as e: gen_log.warning('Invalid x-www-form-urlencoded body: %s', e) uri_arguments = {} for name, values in uri_arguments.items(): if values: arguments.setdefault(name, []).extend(values) elif content_type.startswith("multipart/form-data"): fields = content_type.split(";") for field in fields: k, sep, v = field.strip().partition("=") if k == "boundary" and v: parse_multipart_form_data(utf8(v), body, arguments, files) break else: gen_log.warning("Invalid multipart/form-data")
def __init__(self, environ): """Parses the given WSGI environment to construct the request.""" self.method = environ["REQUEST_METHOD"] self.path = urllib_parse.quote(from_wsgi_str(environ.get("SCRIPT_NAME", ""))) self.path += urllib_parse.quote(from_wsgi_str(environ.get("PATH_INFO", ""))) self.uri = self.path self.arguments = {} self.query = environ.get("QUERY_STRING", "") if self.query: self.uri += "?" + self.query self.arguments = parse_qs_bytes(native_str(self.query), keep_blank_values=True) self.version = "HTTP/1.1" self.headers = httputil.HTTPHeaders() if environ.get("CONTENT_TYPE"): self.headers["Content-Type"] = environ["CONTENT_TYPE"] if environ.get("CONTENT_LENGTH"): self.headers["Content-Length"] = environ["CONTENT_LENGTH"] for key in environ: if key.startswith("HTTP_"): self.headers[key[5:].replace("_", "-")] = environ[key] if self.headers.get("Content-Length"): self.body = environ["wsgi.input"].read( int(self.headers["Content-Length"])) else: self.body = "" self.protocol = environ["wsgi.url_scheme"] self.remote_ip = environ.get("REMOTE_ADDR", "") if environ.get("HTTP_HOST"): self.host = environ["HTTP_HOST"] else: self.host = environ["SERVER_NAME"] # Parse request body self.files = {} httputil.parse_body_arguments(self.headers.get("Content-Type", ""), self.body, self.arguments, self.files) self._start_time = time.time() self._finish_time = None
def __init__(self, method, uri, version="HTTP/1.0", headers=None, body=None, remote_ip=None, protocol=None, host=None, files=None, connection=None): self.method = method self.uri = uri self.version = version self.headers = headers or httputil.HTTPHeaders() self.body = body or "" # set remote IP and protocol self.remote_ip = remote_ip if protocol: self.protocol = protocol elif connection and isinstance(connection.stream, iostream.SSLIOStream): self.protocol = "https" else: self.protocol = "http" # xheaders can override the defaults if connection and connection.xheaders: # Squid uses X-Forwarded-For, others use X-Real-Ip ip = self.headers.get( "X-Real-Ip", self.headers.get("X-Forwarded-For", self.remote_ip)) if netutil.is_valid_ip(ip): self.remote_ip = ip # AWS uses X-Forwarded-Proto proto = self.headers.get( "X-Scheme", self.headers.get("X-Forwarded-Proto", self.protocol)) if proto in ("http", "https"): self.protocol = proto self.host = host or self.headers.get("Host") or "127.0.0.1" self.files = files or {} self.connection = connection self._start_time = time.time() self._finish_time = None self.path, sep, self.query = uri.partition('?') self.arguments = parse_qs_bytes(self.query, keep_blank_values=True)
def __init__(self, environ): """Parses the given WSGI environment to construct the request.""" self.method = environ["REQUEST_METHOD"] self.path = urllib_parse.quote(from_wsgi_str(environ.get("SCRIPT_NAME", ""))) self.path += urllib_parse.quote(from_wsgi_str(environ.get("PATH_INFO", ""))) self.uri = self.path self.arguments = {} self.query_arguments = {} self.body_arguments = {} self.query = environ.get("QUERY_STRING", "") if self.query: self.uri += "?" + self.query self.arguments = parse_qs_bytes(native_str(self.query), keep_blank_values=True) self.query_arguments = copy.deepcopy(self.arguments) self.version = "HTTP/1.1" self.headers = httputil.HTTPHeaders() if environ.get("CONTENT_TYPE"): self.headers["Content-Type"] = environ["CONTENT_TYPE"] if environ.get("CONTENT_LENGTH"): self.headers["Content-Length"] = environ["CONTENT_LENGTH"] for key in environ: if key.startswith("HTTP_"): self.headers[key[5:].replace("_", "-")] = environ[key] if self.headers.get("Content-Length"): self.body = environ["wsgi.input"].read( int(self.headers["Content-Length"])) else: self.body = "" self.protocol = environ["wsgi.url_scheme"] self.remote_ip = environ.get("REMOTE_ADDR", "") if environ.get("HTTP_HOST"): self.host = environ["HTTP_HOST"] else: self.host = environ["SERVER_NAME"] # Parse request body self.files = {} httputil.parse_body_arguments(self.headers.get("Content-Type", ""), self.body, self.body_arguments, self.files) for k, v in self.body_arguments.items(): self.arguments.setdefault(k, []).extend(v) self._start_time = time.time() self._finish_time = None
def __init__(self, method, uri, version="HTTP/1.0", headers=None, body=None, remote_ip=None, protocol=None, host=None, files=None, connection=None): self.method = method self.uri = uri self.version = version self.headers = headers or httputil.HTTPHeaders() self.body = body or "" # set remote IP and protocol self.remote_ip = remote_ip if protocol: self.protocol = protocol elif connection and isinstance(connection.stream, iostream.SSLIOStream): self.protocol = "https" else: self.protocol = "http" # xheaders can override the defaults if connection and connection.xheaders: # Squid uses X-Forwarded-For, others use X-Real-Ip ip = self.headers.get("X-Forwarded-For", self.remote_ip) ip = ip.split(',')[-1].strip() ip = self.headers.get( "X-Real-Ip", ip) if netutil.is_valid_ip(ip): self.remote_ip = ip # AWS uses X-Forwarded-Proto proto = self.headers.get( "X-Scheme", self.headers.get("X-Forwarded-Proto", self.protocol)) if proto in ("http", "https"): self.protocol = proto self.host = host or self.headers.get("Host") or "127.0.0.1" self.files = files or {} self.connection = connection self._start_time = time.time() self._finish_time = None self.path, sep, self.query = uri.partition('?') self.arguments = parse_qs_bytes(self.query, keep_blank_values=True) self.query_arguments = copy.deepcopy(self.arguments) self.body_arguments = {}