我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用httplib.HTTPResponse()。
def __init__(self, info):
    """Populate this response mapping from a header source.

    ``info`` is an ``httplib.HTTPResponse``, an ``email.Message.Message``,
    or any other object that offers ``iteritems()``. Header names are stored
    lower-cased regardless of the source.
    """
    def absorb(pairs):
        # Copy every (name, value) pair into the mapping under a
        # lower-cased name.
        for name, header_value in pairs:
            self[name.lower()] = header_value

    if isinstance(info, httplib.HTTPResponse):
        absorb(info.getheaders())
        self.status = info.status
        # Mirror the numeric status into the mapping as a string.
        self['status'] = str(self.status)
        self.reason = info.reason
        self.version = info.version
    elif isinstance(info, email.Message.Message):
        absorb(info.items())
        self.status = int(self['status'])
    else:
        absorb(info.iteritems())
        # Fall back to the values already present on the instance when the
        # mapping does not supply them.
        self.status = int(self.get('status', self.status))
        self.reason = self.get('reason', self.reason)
def get_headers(http_response):
    """Return the HTTP headers of a server response as (name, value) pairs.

    Kept for backwards compatibility with Python 2.2 and 2.3, whose
    httplib.HTTPResponse lacks ``getheaders``. When ``getheaders`` is
    available its result is returned unchanged; otherwise a fixed set of
    well-known headers is probed one by one with ``getheader`` and only the
    ones that are present are returned.
    """
    if not hasattr(http_response, 'getheaders'):
        known_names = (
            'location', 'content-type', 'content-length', 'age', 'allow',
            'cache-control', 'content-location', 'content-encoding', 'date',
            'etag', 'expires', 'last-modified', 'pragma', 'server',
            'set-cookie', 'transfer-encoding', 'vary', 'via', 'warning',
            'www-authenticate', 'gdata-version')
        probed = ((name, http_response.getheader(name, None))
                  for name in known_names)
        return [(name, found) for name, found in probed if found is not None]
    return http_response.getheaders()
def parse_http_response(sock):
    """Read a raw HTTP response from ``sock`` and return it parsed.

    Up to 4096 bytes are read; if the data does not contain ``'HTTP/'`` one
    more read of up to 4096 bytes is appended. The collected data is wrapped
    in a ``FakeSocket`` and handed to ``HTTPResponse``, whose ``begin()`` is
    called before the object is returned.
    """
    def pull_chunk():
        # Hack to standardize the API between sockets and SSLConnection
        # objects: use read() when it exists, recv() otherwise.
        try:
            return sock.read(4096)
        except AttributeError:
            return sock.recv(4096)

    raw = pull_chunk()
    if 'HTTP/' not in raw:
        # Try to get the rest of the response.
        raw += pull_chunk()

    parsed = HTTPResponse(FakeSocket(raw))
    parsed.begin()
    return parsed
def create_tcp_connection(self, hostname, port, timeout, **kwargs):
    """Open a tunnel to ``hostname:port`` through the configured HTTP proxy.

    Connects to ``self.proxy_host:self.proxy_port``, sends an HTTP CONNECT
    request (with Basic proxy credentials when both ``self.proxy_username``
    and ``self.proxy_password`` are set) and reads the proxy's reply.

    ``timeout`` and ``**kwargs`` are accepted for interface compatibility but
    are not used by this method.

    Returns:
        The connected socket, ready to carry the tunnelled traffic.

    Raises:
        httplib.BadStatusLine: the proxy answered with a status >= 400.
        Any error raised while connecting, sending or parsing the reply.
        In every failure case after the connection was opened, the socket
        is closed before the exception propagates.
    """
    sock = socket.create_connection((self.proxy_host, int(self.proxy_port)))
    try:
        # Hosts under .appspot.com are tunnelled via www.google.com instead.
        if hostname.endswith('.appspot.com'):
            hostname = 'www.google.com'

        request_data = 'CONNECT %s:%s HTTP/1.1\r\n' % (hostname, port)
        if self.proxy_username and self.proxy_password:
            credentials = '%s:%s' % (self.proxy_username, self.proxy_password)
            token = base64.b64encode(credentials.encode()).decode().strip()
            request_data += 'Proxy-Authorization: Basic %s\r\n' % token
        request_data += '\r\n'

        # sendall() needs a byte string; only encode when the request is text
        # so an already-encoded request is passed through untouched.
        if not isinstance(request_data, bytes):
            request_data = request_data.encode()
        sock.sendall(request_data)

        response = httplib.HTTPResponse(sock)
        # Swap the response's default file object for an unbuffered one so
        # parsing the proxy reply does not read ahead into tunnel data.
        response.fp.close()
        response.fp = sock.makefile('rb', 0)
        response.begin()
        if response.status >= 400:
            raise httplib.BadStatusLine(
                '%s %s %s' % (response.version, response.status, response.reason))
    except Exception:
        # Do not leak the connection when the tunnel could not be set up.
        sock.close()
        raise
    return sock
def test_appid_exist(ssl_sock, appid):
    """Probe ``<appid>.appspot.com`` over ``ssl_sock`` and report if it is usable.

    Sends ``GET /_gh/`` and inspects the reply:

    * 404 -> False
    * 503 -> True (out of quota)
    * any other status: a warning is logged unless it is 200, then the body
      is read and the result is whether it contains ``"GoAgent"``.
    """
    probe = 'GET /_gh/ HTTP/1.1\r\nHost: %s.appspot.com\r\n\r\n' % appid
    ssl_sock.send(probe.encode())

    response = httplib.HTTPResponse(ssl_sock, buffering=True)
    response.begin()

    status = response.status
    if status in (404, 503):
        # 503 means out of quota, which still counts as an existing app.
        return status == 503

    if status != 200:
        xlog.warn("test appid %s status:%d", appid, status)

    return "GoAgent" in response.read()
def __init__(self, info):
    """Populate this response mapping from a header source.

    ``info`` is an ``httplib.HTTPResponse``, an ``email.Message.Message``,
    or any other object that offers ``iteritems()``. Only headers taken from
    an ``httplib.HTTPResponse`` are stored under lower-cased names; the other
    sources keep their names as given.
    """
    if isinstance(info, httplib.HTTPResponse):
        for header_name, header_value in info.getheaders():
            self[header_name.lower()] = header_value
        self.status = info.status
        # Mirror the numeric status into the mapping as a string.
        self['status'] = str(self.status)
        self.reason = info.reason
        self.version = info.version
        return

    if isinstance(info, email.Message.Message):
        for header_name, header_value in info.items():
            self[header_name] = header_value
        self.status = int(self['status'])
        return

    for header_name, header_value in info.iteritems():
        self[header_name] = header_value
    # Keep the status already on the instance when none is supplied.
    self.status = int(self.get('status', self.status))
def __init__(self, info):
    """Populate this response mapping from a header source.

    ``info`` is an ``httplib.HTTPResponse``, an ``email.Message.Message``,
    or any other object that offers ``iteritems()``. Header names are stored
    exactly as the source provides them.
    """
    http_source = isinstance(info, httplib.HTTPResponse)
    # The email check is only evaluated when the httplib check fails.
    message_source = (not http_source
                      and isinstance(info, email.Message.Message))

    if http_source:
        headers = info.getheaders()
    elif message_source:
        headers = info.items()
    else:
        headers = info.iteritems()

    for field, content in headers:
        self[field] = content

    if http_source:
        self.status = info.status
        # Mirror the numeric status into the mapping as a string.
        self['status'] = str(self.status)
        self.reason = info.reason
        self.version = info.version
    elif message_source:
        self.status = int(self['status'])
    else:
        # Keep the status already on the instance when none is supplied.
        self.status = int(self.get('status', self.status))
def test_malformed_truncation(self):
    """Malformed header lines must not truncate the header section.

    Header lines without colons used to cause the rest of the header section
    to be dropped; the header that follows them must still be parsed.
    """
    head = (
        b'HTTP/1.1 200 OK\r\n'
        b'Public-Key-Pins: \n'
        b'pin-sha256="xxx=";\n'
        b'report-uri="https://..."\r\n'
        b'Transfer-Encoding: chunked\r\n'
        b'\r\n'
    )
    chunked_body = b'4\r\nbody\r\n0\r\n\r\n'

    parsed = httplib.HTTPResponse(FakeSocket(head + chunked_body))
    parsed.begin()

    self.assertIsNotNone(parsed.getheader('Public-Key-Pins'))
    self.assertEqual(parsed.getheader('Transfer-Encoding'), 'chunked')
    self.assertEqual(parsed.read(), b'body')
def test_parse_all_octets(self):
    """No valid header field octet may break the header parser."""
    visible_octets = bytearray(range(0x21, 0x7E + 1))
    high_octets = bytearray(range(0x80, 0xFF + 1))
    raw = (
        b'HTTP/1.1 200 OK\r\n'
        # Header name made of the special token characters.
        b"!#$%&'*+-.^_`|~: value\r\n"
        b'VCHAR: ' + visible_octets + b'\r\n'
        b'obs-text: ' + high_octets + b'\r\n'
        b'obs-fold: text\r\n'
        b' folded with space\r\n'
        b'\tfolded with tab\r\n'
        b'Content-Length: 0\r\n'
        b'\r\n'
    )

    parsed = httplib.HTTPResponse(FakeSocket(raw))
    parsed.begin()

    self.assertEqual(parsed.getheader('Content-Length'), '0')
    self.assertEqual(parsed.getheader("!#$%&'*+-.^_`|~"), 'value')

    expected_vchar = ''.join(chr(code) for code in range(0x21, 0x7E + 1))
    self.assertEqual(parsed.getheader('VCHAR'), expected_vchar)
    self.assertIsNotNone(parsed.getheader('obs-text'))

    unfolded = parsed.getheader('obs-fold')
    self.assertTrue(unfolded.startswith('text'))
    self.assertIn(' folded with space', unfolded)
    self.assertTrue(unfolded.endswith('folded with tab'))
def test_response_headers(self):
    """Repeated headers with the same field name are joined with ', '."""
    first_cookie = 'Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
    second_cookie = ('Part_Number="Rocket_Launcher_0001"; Version="1";'
                     ' Path="/acme"')

    wire = ('HTTP/1.1 200 OK\r\n'
            'Set-Cookie: %s\r\n'
            'Set-Cookie: %s\r\n'
            '\r\n'
            'No body\r\n') % (first_cookie, second_cookie)
    expected = first_cookie + ', ' + second_cookie

    parsed = httplib.HTTPResponse(FakeSocket(wire))
    parsed.begin()

    if parsed.getheader("Set-Cookie") != expected:
        self.fail("multiple headers not combined properly")
def test_chunked_head(self):
    """A HEAD response yields no body even if chunked data follows."""
    header_section = (
        'HTTP/1.1 200 OK\r\n'
        'Transfer-Encoding: chunked\r\n\r\n'
    )
    chunk_data = (
        'a\r\n'
        'hello world\r\n'
        '1\r\n'
        'd\r\n'
    )
    wire = header_section + chunk_data + '0\r\n'

    head_reply = httplib.HTTPResponse(FakeSocket(wire), method="HEAD")
    head_reply.begin()

    self.assertEqual(head_reply.read(), '')
    self.assertEqual(head_reply.status, 200)
    self.assertEqual(head_reply.reason, 'OK')
    self.assertTrue(head_reply.isclosed())
def test_close(self):
    """Calling close() on a urllib2 response closes the underlying socket file."""
    import httplib

    response = _urlopen_with_retry("http://www.example.com/")

    # Dig through the wrapper layers: the response's file object wraps the
    # httplib response, which in turn owns the socket file object.
    outer_file = response.fp
    self.assertIs(outer_file.__class__, socket._fileobject)
    http_layer = outer_file._sock
    self.assertIs(http_layer.__class__, httplib.HTTPResponse)
    inner_file = http_layer.fp
    self.assertIs(inner_file.__class__, socket._fileobject)

    self.assertTrue(not inner_file.closed)
    response.close()
    self.assertTrue(inner_file.closed)
def parse_json(http_response, response):
    """Decode a JSON body, if any, and merge its attributes into ``response``.

    The body is read from ``http_response``. When it is not empty it is
    decoded with ``utils.dict_to_python_object`` as the JSON object hook and
    the attributes of the decoded object are copied onto ``response``.

    ``http_response`` is always closed before this function returns or
    raises, so a read or decode failure does not leave it open.

    :param http_response: the http_response object returned by
        HTTPConnection.getresponse()
    :type http_response: httplib.HTTPResponse
    :param response: general response object which will be returned to the
        caller
    :type response: baidubce.BceResponse
    :return: always true
    :rtype bool
    """
    try:
        body = http_response.read()
        if body:
            decoded = json.loads(
                body, object_hook=utils.dict_to_python_object)
            response.__dict__.update(decoded.__dict__)
    finally:
        # Release the response even when reading or decoding fails.
        http_response.close()
    return True
def _parse_bos_object(http_response, response):
    """Attach the raw HTTP response and its user metadata to ``response``.

    Every header whose name starts with
    ``http_headers.BCE_USER_METADATA_PREFIX`` is collected into a dict, keyed
    by the header name with that prefix removed. Keys and values are decoded
    with ``baidubce.DEFAULT_ENCODING``. The dict is stored on
    ``response.metadata.user_metadata`` and ``http_response`` itself is
    stored on ``response.data``.

    :param http_response: the http_response object returned by
        HTTPConnection.getresponse()
    :type http_response: httplib.HTTPResponse
    :param response: general response object which will be returned to the
        caller
    :type response: baidubce.BceResponse
    :return: always true
    :rtype bool
    """
    prefix = http_headers.BCE_USER_METADATA_PREFIX
    encoding = baidubce.DEFAULT_ENCODING

    collected = {}
    for name, raw_value in http_response.getheaders():
        if not name.startswith(prefix):
            continue
        short_name = name[len(prefix):]
        collected[short_name.decode(encoding)] = raw_value.decode(encoding)

    response.metadata.user_metadata = collected
    response.data = http_response
    return True
def Delete(self, uri, extra_headers=None, url_params=None, escape_params=True):
    """Issues a DELETE request for the entry at the given URI.

    Args:
      uri: string The URI of the entry to be deleted.
          Example: '/base/feeds/items/ITEM-ID'
      extra_headers: dict (optional) HTTP headers to include with the
          request; forwarded to self.request as `headers`.
      url_params: dict (optional) Additional URL parameters to be included
          in the URI, e.g. {'max-results': '250'}; forwarded to self.request
          unchanged.
      escape_params: boolean (optional) Accepted for interface
          compatibility. This method does not forward it to self.request.

    Returns:
      Whatever self.request returns for the DELETE call (documented by the
      original author as an httplib.HTTPResponse).
    """
    forwarded = dict(data=None, headers=extra_headers, url_params=url_params)
    return self.request('DELETE', uri, **forwarded)
def test_status_lines(self):
    """Check parsing of a valid and of a malformed HTTP status line."""
    accepted = httplib.HTTPResponse(FakeSocket("HTTP/1.1 200 Ok\r\n\r\nText"))
    accepted.begin()
    # Issue #20007: a zero-length read returns '' and leaves the response open.
    self.assertEqual(accepted.read(0), '')
    self.assertFalse(accepted.isclosed())
    self.assertEqual(accepted.read(), 'Text')
    self.assertTrue(accepted.isclosed())

    # A non-integer status code must be rejected when parsing begins.
    rejected = httplib.HTTPResponse(
        FakeSocket("HTTP/1.1 400.100 Not Ok\r\n\r\nText"))
    self.assertRaises(httplib.BadStatusLine, rejected.begin)