我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用httplib.HTTP。
def redirect_internal(self, url, fp, errcode, errmsg, headers, data): if 'location' in headers: newurl = headers['location'] elif 'uri' in headers: newurl = headers['uri'] else: return void = fp.read() fp.close() # In case the server sent a relative URL, join with original: newurl = basejoin(self.type + ":" + url, newurl) # For security reasons we do not allow redirects to protocols # other than HTTP, HTTPS or FTP. newurl_lower = newurl.lower() if not (newurl_lower.startswith('http://') or newurl_lower.startswith('https://') or newurl_lower.startswith('ftp://')): raise IOError('redirect error', errcode, errmsg + " - Redirection to url '%s' is not allowed" % newurl, headers) return self.open(newurl)
def post_multipart(host, selector, fields, files): """ Post fields and files to an http host as multipart/form-data. fields is a sequence of (name, value) elements for regular form fields. files is a sequence of (name, filename, value) elements for data to be uploaded as files Return the server's response page. """ content_type, body = encode_multipart_formdata(fields, files) h = httplib.HTTP(host) h.putrequest('POST', selector) h.putheader('content-type', content_type) h.putheader('content-length', str(len(body))) h.endheaders() h.send(body) errcode, errmsg, headers = h.getreply() return h.file.read()
def encode_multipart_formdata(fields, files): """ fields is a sequence of (name, value) elements for regular form fields. files is a sequence of (name, filename, value) elements for data to be uploaded as files Return (content_type, body) ready for httplib.HTTP instance """ BOUNDARY = '----------ThIs_Is_tHe_bouNdaRY_$' CRLF = '\r\n' L = [] for (key, value) in fields: L.append('--' + BOUNDARY) L.append('Content-Disposition: form-data; name="%s"' % key) L.append('') L.append(value) for (key, filename, value) in files: L.append('--' + BOUNDARY) L.append('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename)) L.append('Content-Type: %s' % get_content_type(filename)) L.append('') L.append(value) L.append('--' + BOUNDARY + '--') L.append('') body = CRLF.join(L) content_type = 'multipart/form-data; boundary=%s' % BOUNDARY return content_type, body
def redirect_internal(self, url, fp, errcode, errmsg, headers, data): if 'location' in headers: newurl = headers['location'] elif 'uri' in headers: newurl = headers['uri'] else: return fp.close() # In case the server sent a relative URL, join with original: newurl = basejoin(self.type + ":" + url, newurl) # For security reasons we do not allow redirects to protocols # other than HTTP, HTTPS or FTP. newurl_lower = newurl.lower() if not (newurl_lower.startswith('http://') or newurl_lower.startswith('https://') or newurl_lower.startswith('ftp://')): raise IOError('redirect error', errcode, errmsg + " - Redirection to url '%s' is not allowed" % newurl, headers) return self.open(newurl)
def make_connection(self, host): # create a HTTPS connection object from a host descriptor host, extra_headers, x509 = self.get_host_info(host) http = HTTPTLSConnection(host, None, self.username, self.password, self.sharedKey, self.certChain, self.privateKey, self.checker.cryptoID, self.checker.protocol, self.checker.x509Fingerprint, self.checker.x509TrustList, self.checker.x509CommonName, self.settings) http2 = httplib.HTTP() http2._setup(http) return http2
def post_multipart(url, fields, files): parts = urlparse.urlparse(url) scheme = parts[0] host = parts[1] selector = parts[2] content_type, body = encode_multipart_formdata(fields, files) if scheme == 'http': h = httplib.HTTP(host) elif scheme == 'https': h = httplib.HTTPS(host) else: raise ValueError('unknown scheme: ' + scheme) h.putrequest('POST', selector) h.putheader('content-type', content_type) h.putheader('content-length', str(len(body))) h.endheaders() h.send(body) errcode, errmsg, headers = h.getreply() return h.file.read()
def _send_message(self , xmlName): self._set_message(xmlName) librarylogger.info(u'?????????') webservice = httplib.HTTP(self._host) webservice.putrequest("POST" , self._postName) webservice.putheader("Host" , self._host) webservice.putheader("Content-Type" , self._ContentType) webservice.putheader("Content-length" , "%d" % len ( self._message)) webservice.putheader("SOAPAction" , "\"http://tempuri.org/WSMethod\"") webservice.putheader("Cookie" , self._cookie) # ???????????header??? webservice.endheaders( ) librarylogger.info(u'???????') librarylogger.info(u'??????data') webservice.send(self._message) webservice.getreply() msg = webservice.getfile().read() if(msg!=None and len(msg)>0): librarylogger.info(u'?????????????') return msg
def test_ipv6host_header(self): # Default host header on IPv6 transaction should be wrapped by [] if # it is an IPv6 address expected = 'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \ 'Accept-Encoding: identity\r\n\r\n' conn = httplib.HTTPConnection('[2001::]:81') sock = FakeSocket('') conn.sock = sock conn.request('GET', '/foo') self.assertTrue(sock.data.startswith(expected)) expected = 'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \ 'Accept-Encoding: identity\r\n\r\n' conn = httplib.HTTPConnection('[2001:102A::]') sock = FakeSocket('') conn.sock = sock conn.request('GET', '/foo') self.assertTrue(sock.data.startswith(expected))
def test_malformed_truncation(self): # Other malformed header lines, especially without colons, used to # cause the rest of the header section to be truncated resp = ( b'HTTP/1.1 200 OK\r\n' b'Public-Key-Pins: \n' b'pin-sha256="xxx=";\n' b'report-uri="https://..."\r\n' b'Transfer-Encoding: chunked\r\n' b'\r\n' b'4\r\nbody\r\n0\r\n\r\n' ) resp = httplib.HTTPResponse(FakeSocket(resp)) resp.begin() self.assertIsNotNone(resp.getheader('Public-Key-Pins')) self.assertEqual(resp.getheader('Transfer-Encoding'), 'chunked') self.assertEqual(resp.read(), b'body')
def test_status_lines(self): # Test HTTP status lines body = "HTTP/1.1 200 Ok\r\n\r\nText" sock = FakeSocket(body) resp = httplib.HTTPResponse(sock) resp.begin() self.assertEqual(resp.read(0), '') # Issue #20007 self.assertFalse(resp.isclosed()) self.assertEqual(resp.read(), 'Text') self.assertTrue(resp.isclosed()) body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText" sock = FakeSocket(body) resp = httplib.HTTPResponse(sock) self.assertRaises(httplib.BadStatusLine, resp.begin)
def flush(self): if self.isOpen(): self.close() self.open(); # Pull data out of buffer data = self.__wbuf.getvalue() self.__wbuf = StringIO() # HTTP request self.__http.putrequest('POST', self.path) # Write headers self.__http.putheader('Host', self.host) self.__http.putheader('Content-Type', 'application/x-thrift') self.__http.putheader('Content-Length', str(len(data))) self.__http.endheaders() # Write payload self.__http.send(data) # Get reply to flush the request self.code, self.message, self.headers = self.__http.getreply() # Decorate if we know how to timeout
def send(self, event, msg, key): id, url = key request = self.format_request( "%s: %s" % (event, msg) if msg else event) webservice = httplib.HTTP(url) webservice.putrequest("POST", id) webservice.putheader("Host", url) webservice.putheader("Content-type", "text/xml") webservice.putheader("X-NotificationClass", "2") webservice.putheader("X-WindowsPhone-Target", "toast") webservice.putheader("Content-length", "%d" % len(request)) webservice.endheaders() webservice.send(request) webservice.close()
def run(dmn,file): h = httplib.HTTP('www.google.com') h.putrequest('GET',"/search?num=500&q=site:"+dmn+"+filetype:"+file) h.putheader('Host', 'www.google.com') h.putheader('User-agent', 'Internet Explorer 6.0 ') h.putheader('Referrer', 'www.g13net.com') h.endheaders() returncode, returnmsg, headers = h.getreply() data=h.getfile().read() data=re.sub('<b>','',data) for e in ('>','=','<','\\','(',')','"','http',':','//'): data = string.replace(data,e,' ') r1 = re.compile('[-_.a-zA-Z0-9.-_]*'+'\.'+file) res = r1.findall(data) return res
def test_ipv6host_header(self): # Default host header on IPv6 transaction should wrapped by [] if # its actual IPv6 address expected = 'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \ 'Accept-Encoding: identity\r\n\r\n' conn = httplib.HTTPConnection('[2001::]:81') sock = FakeSocket('') conn.sock = sock conn.request('GET', '/foo') self.assertTrue(sock.data.startswith(expected)) expected = 'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \ 'Accept-Encoding: identity\r\n\r\n' conn = httplib.HTTPConnection('[2001:102A::]') sock = FakeSocket('') conn.sock = sock conn.request('GET', '/foo') self.assertTrue(sock.data.startswith(expected))
def test_response_headers(self): # test response with multiple message headers with the same field name. text = ('HTTP/1.1 200 OK\r\n' 'Set-Cookie: Customer="WILE_E_COYOTE";' ' Version="1"; Path="/acme"\r\n' 'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";' ' Path="/acme"\r\n' '\r\n' 'No body\r\n') hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"' ', ' 'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"') s = FakeSocket(text) r = httplib.HTTPResponse(s) r.begin() cookies = r.getheader("Set-Cookie") if cookies != hdr: self.fail("multiple headers not combined properly")
def test_local_bad_hostname(self): # The (valid) cert doesn't validate the HTTP hostname import ssl server = self.make_server(CERT_fakehostname) context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) context.verify_mode = ssl.CERT_REQUIRED context.check_hostname = True context.load_verify_locations(CERT_fakehostname) h = httplib.HTTPSConnection('localhost', server.port, context=context) with self.assertRaises(ssl.CertificateError): h.request('GET', '/') h.close() # With context.check_hostname=False, the mismatching is ignored context.check_hostname = False h = httplib.HTTPSConnection('localhost', server.port, context=context) h.request('GET', '/nonexistent') resp = h.getresponse() self.assertEqual(resp.status, 404)
def test_chunked_head(self): chunked_start = ( 'HTTP/1.1 200 OK\r\n' 'Transfer-Encoding: chunked\r\n\r\n' 'a\r\n' 'hello world\r\n' '1\r\n' 'd\r\n' ) sock = FakeSocket(chunked_start + '0\r\n') resp = httplib.HTTPResponse(sock, method="HEAD") resp.begin() self.assertEqual(resp.read(), '') self.assertEqual(resp.status, 200) self.assertEqual(resp.reason, 'OK') self.assertTrue(resp.isclosed())
def make_connection(self, host): self.realhost = host proxies = urllib.getproxies() proxyurl = None if 'http' in proxies: proxyurl = proxies['http'] elif 'all' in proxies: proxyurl = proxies['all'] if proxyurl: urltype, proxyhost = urllib.splittype(proxyurl) host, selector = urllib.splithost(proxyhost) h = httplib.HTTP(host) self.proxy_is_used = True return h else: self.proxy_is_used = False return Transport.make_connection(self, host)
def make_connection(self, host): # return an existing connection if possible. This allows # HTTP/1.1 keep-alive. if self._connection and host == self._connection[0]: http = self._connection[1] else: # create a HTTPS connection object from a host descriptor chost, extra_headers, x509 = self.get_host_info(host) http = HTTPTLSConnection(chost, None, username=self.username, password=self.password, certChain=self.certChain, privateKey=self.privateKey, checker=self.checker, settings=self.settings, ignoreAbruptClose=self.ignoreAbruptClose) # store the host argument along with the connection object self._connection = host, http if not self.conn_class_is_http: return http http2 = httplib.HTTP() http2._setup(http) return http2
def do_search_files(self, files): h = httplib.HTTP(self.server) h.putrequest( 'GET', "search/web/results/?q=" + self.word + "filetype:" + self.files + "&elements_per_page=50&start_index=" + self.counter) h.putheader('Host', self.hostname) h.putheader('User-agent', self.userAgent) h.endheaders() returncode, returnmsg, headers = h.getreply() self.results = h.getfile().read() self.totalresults += self.results
def run_search(self): try: con = httplib.HTTP(self.server) con.putrequest('GET', '/search?q=%40'+self.keyword) con.putheader('Host', self.host) con.putheader('Cookie', 'SRCHHPGUSR=ADLT=DEMOTE&NRSLT=50') con.putheader('Accept-Language', 'en-us,en') con.putheader('User-agent', self.u_agent) con.endheaders() # return code,messagge and header returncode, returnmsg, header = con.getreply() self.results = con.getfile().read() self.tresult += self.results except Exception as err: print "\t\t|" print "\t\t|__"+self.r+" Server not found!!\n"+self.t
def getreply(self): file = self.sock.makefile('rb') data = ''.join(file.readlines()) file.close() self.file = StringIO(data) line = self.file.readline() try: [ver, code, msg] = line.split(None, 2) except ValueError: try: [ver, code] = line.split(None, 1) msg = "" except ValueError: return -1, line, None if ver[:5] != 'HTTP/': return -1, line, None code = int(code) msg = msg.strip() headers = mimetools.Message(self.file, 0) return ver, code, msg, headers
def __snd_request(self, method, uri, headers={}, body='', eh=1): try: h = HTTP() h.connect(self.host, self.port) h.putrequest(method, uri) for n, v in headers.items(): h.putheader(n, v) if eh: h.endheaders() if body: h.send(body) ver, code, msg, hdrs = h.getreply() data = h.getfile().read() h.close() except Exception: raise NotAvailable(sys.exc_value) return http_response(ver, code, msg, hdrs, data) # HTTP methods