我们从 Python 开源项目中，提取了以下 49 个代码示例，用于说明如何使用 httplib.HTTPConnection()。
def make_connection(self, host):
    """Return an HTTP connection for *host*, reusing the cached one if possible.

    The most recent ``(host, connection)`` pair is kept in
    ``self._connection`` so consecutive requests to the same host can share
    one connection (HTTP/1.1 keep-alive).
    """
    cached = self._connection
    if cached and host == cached[0]:
        return cached[1]

    # Build a new connection from the host descriptor; the extra headers it
    # yields are remembered on the instance.
    chost, self._extra_headers, x509 = self.get_host_info(host)
    connection = httplib.HTTPConnection(chost)

    # Remember which host argument this connection was created for.
    self._connection = host, connection
    return connection

##
# Clear any cached connection object.
# Used in the event of socket errors.
#
def request_api(params_dict):
    """POST *params_dict* to the configured API and return the decoded reply.

    The endpoint is taken from the module-level ``api_url``, ``api_port`` and
    ``api_path`` names.

    Returns:
        The dict sent back by the server with its ``'status'`` key set to the
        HTTP status code, or ``{'status': 500, 'auth': 'failed'}`` when the
        server does not answer 200 or any error occurs.
    """
    # Function-scope import so the block does not depend on file-level imports.
    import ast

    httpClient = None
    try:
        params = urllib.urlencode(params_dict)
        headers = {"Content-type": "application/x-www-form-urlencoded",
                   "Accept": "text/plain"}
        httpClient = httplib.HTTPConnection(api_url, int(api_port), timeout=5)
        httpClient.request("POST", api_path, params, headers)
        response = httpClient.getresponse()
        status = response.status
        if str(status) != '200':
            return {'status': 500, 'auth': 'failed'}
        # SECURITY: the body comes from the network. It used to be passed to
        # eval(), which lets the remote end run arbitrary code. literal_eval()
        # accepts the same literal dict syntax but evaluates no expressions.
        data = ast.literal_eval(response.read().strip())
        data['status'] = status
        return data
    except Exception as e:
        # Any failure (network, malformed reply, ...) is reported as a failed
        # authentication, matching the non-200 case.
        print(e)
        return {'status': 500, 'auth': 'failed'}
    finally:
        if httpClient:
            httpClient.close()
def get_connection(self, tmp_host=None):
    """Build an HTTP or HTTPS connection object for ``host[:port]``.

    Args:
        tmp_host: Optional ``"host"`` or ``"host:port"`` string; when empty,
            ``self.host`` is used instead.

    Returns:
        An ``httplib.HTTPSConnection`` when ``self.is_security`` is set or the
        port is 443 (in which case ``self.is_security`` is switched on),
        otherwise an ``httplib.HTTPConnection``. The port defaults to 80.
    """
    target = tmp_host if tmp_host else self.host
    pieces = target.split(":")

    host, port = '', 80
    if len(pieces) == 1:
        host = pieces[0].strip()
    elif len(pieces) == 2:
        host = pieces[0].strip()
        port = int(pieces[1].strip())

    if self.is_security or port == 443:
        self.is_security = True
        connection_class = httplib.HTTPSConnection
    else:
        connection_class = httplib.HTTPConnection

    options = {'host': host, 'port': port}
    # The timeout argument is only passed on Python 2.6 and later.
    if sys.version_info >= (2, 6):
        options['timeout'] = self.timeout
    return connection_class(**options)
def Fuzz(entry):
    """Probe ``http://<site>/<entry>`` and record the URL when it answers 200.

    Uses the module-level ``site`` host name and appends hits to the
    module-level ``found`` list. Failures of a single probe are ignored so a
    scan can continue; ``KeyboardInterrupt`` and ``SystemExit`` still
    propagate.
    """
    connection = None
    try:
        entry = "/" + entry
        connection = httplib.HTTPConnection(site)
        connection.request("GET", entry)
        response = connection.getresponse()
        if response.status == 200:
            # Named ``url`` rather than ``str`` to avoid shadowing the builtin.
            url = 'http://' + site + entry
            print("Found : %s " % (url))
            found.append(url)
    except (KeyboardInterrupt, SystemExit):
        raise
    except Exception:
        # Best-effort probing: an unreachable or misbehaving target is skipped.
        pass
    finally:
        # Release the socket after every probe instead of leaking it.
        if connection is not None:
            connection.close()
def request(self, method, url, body=None, headers={}):
    """Record the real host/port from *url*, then issue the request.

    Raises:
        ValueError: if *url* has no scheme, or has no explicit port and its
            scheme is not a key of ``self._ports``.
    """
    # request() runs before connect(), so the URL can be inspected here to
    # find the real host/port for the CONNECT request sent to the proxy.
    proto, remainder = urllib.splittype(url)
    if proto is None:
        raise ValueError("unknown URL type: %s" % url)

    host, remainder = urllib.splithost(remainder)
    host, port = urllib.splitport(host)

    if port is None:
        # No explicit port in the URL: fall back to the scheme's entry.
        try:
            port = self._ports[proto]
        except KeyError:
            raise ValueError("unknown protocol for: %s" % url)

    self._real_host = host
    self._real_port = int(port)
    httplib.HTTPConnection.request(self, method, url, body, headers)
def senddata(self):
    """POST every line of ``output.txt`` to ``/filewrite.php``.

    The target host is read from the first line of ``info.txt``. Each line of
    ``output.txt`` is sent as the ``text`` form field (the name of the
    textarea the PHP script stores in the database), and the response status
    and reason are printed.

    Returns:
        True once the whole file has been sent.
    """
    headers = {"Content-type": "application/x-www-form-urlencoded",
               "Accept": "text/plain"}
    # linecache.getline() keeps the trailing newline, which must not end up
    # in the host name.
    host = linecache.getline('info.txt', 1).strip()

    with open("output.txt", 'r') as fileit:
        for output in fileit:
            params = urllib.urlencode({'text': output})
            conn = httplib.HTTPConnection(host)
            try:
                conn.request("POST", "/filewrite.php", params, headers)
                response = conn.getresponse()
                print("%s %s" % (response.status, response.reason))
                response.read()
            finally:
                # One connection per line: close it so sockets do not pile up.
                conn.close()
    return True
def __init__(self, host, port=None, strict=None, timeout=None,
             certfile=None, keyfile=None, require_cert=True, ca_certs=None):
    """Initialise the base HTTP connection and store the certificate options.

    ``host``, ``port``, ``strict`` and ``timeout`` are forwarded to
    ``httplib.HTTPConnection.__init__``; the remaining arguments are kept as
    instance attributes of the same name.
    """
    httplib.HTTPConnection.__init__(self, host, port, strict, timeout)
    # Certificate-related settings, stored unchanged on the instance.
    self.ca_certs = ca_certs
    self.require_cert = require_cert
    self.keyfile = keyfile
    self.certfile = certfile
def connect(self):
    """Open the TCP connection and wrap the socket with SSL.

    When ``self.require_cert`` is set, the peer certificate is required and
    checked against the tunnel host if one is configured, otherwise against
    ``self.host``.
    """
    httplib.HTTPConnection.connect(self)

    with ca_certs(self.ca_certs) as certs:
        if self.require_cert:
            verify_mode = ssl.CERT_REQUIRED
        else:
            verify_mode = ssl.CERT_NONE
        self.sock = ssl.wrap_socket(
            self.sock,
            certfile=self.certfile,
            keyfile=self.keyfile,
            cert_reqs=verify_mode,
            ca_certs=certs
        )

    if self.require_cert:
        # Verify against the tunnel target when going through a proxy tunnel.
        hostname = self._tunnel_host if self._tunnel_host else self.host
        cert = self.sock.getpeercert()
        match_hostname(cert, hostname)
def retrieve_json(self,url):
    '''
    Retrieve data from the Veneer service at the given url path.

    url: Path to required resource, relative to the root of the Veneer service.

    Returns the parsed JSON document. With the 'file' protocol the data is
    read from ``self.prefix + url + self.data_ext`` on disk instead of over
    HTTP.
    '''
    if PRINT_URLS:
        print("*** %s ***" % (url))

    if self.protocol=='file':
        # Context manager so the file handle is released promptly.
        with open(self.prefix+url+self.data_ext) as source:
            text = source.read()
    else:
        conn = hc.HTTPConnection(self.host,port=self.port)
        try:
            conn.request('GET',quote(url+self.data_ext))
            resp = conn.getresponse()
            text = resp.read().decode('utf-8')
        finally:
            # Close the socket even when the request or read fails.
            conn.close()

    text = self._replace_inf(text)
    # Parse once and reuse the result for both the debug print and the return.
    result = json.loads(text)
    if PRINT_ALL:
        print(result)
        print("")
    return result
def post_to_thingspeak(payload):
    """POST *payload* to the ThingSpeak ``/update`` endpoint.

    Blocks until a connection to ``api.thingspeak.com`` can be opened,
    retrying every 10 seconds, then sends the request and prints the response
    status, reason, payload and a timestamp.
    """
    headers = {"Content-type": "application/x-www-form-urlencoded","Accept": "text/plain"}

    # Keep trying until the connection succeeds.
    while True:
        try:
            conn = httplib.HTTPConnection("api.thingspeak.com:80")
            conn.connect()
            break
        except (httplib.HTTPException, socket.error) as ex:
            print("Error: %s" % ex)
            time.sleep(10)  # sleep 10 seconds

    try:
        conn.request("POST", "/update", payload, headers)
        response = conn.getresponse()
        # Single formatted string: printing a parenthesised group under the
        # Python 2 print statement would show a tuple repr instead.
        print("%s %s %s %s" % (response.status, response.reason, payload,
                               time.strftime("%c")))
        response.read()  # drain the body before closing
    finally:
        # Close the socket even if the request itself fails.
        conn.close()
def post_to_mcs(payload):
    """POST *payload* as JSON to the MCS datapoints endpoint of this device.

    Uses the module-level ``deviceId`` and ``deviceKey``. Blocks until a
    connection to ``api.mediatek.com`` can be opened, retrying every 10
    seconds, then sends the request and prints the response status, reason,
    serialised payload and a timestamp.
    """
    headers = {"Content-type": "application/json", "deviceKey": deviceKey}

    # Keep trying until the connection succeeds.
    while True:
        try:
            conn = httplib.HTTPConnection("api.mediatek.com:80")
            conn.connect()
            break
        except (httplib.HTTPException, socket.error) as ex:
            print("Error: %s" % ex)
            time.sleep(10)  # sleep 10 seconds

    # Serialise once; the same text is sent and logged.
    body = json.dumps(payload)
    try:
        conn.request("POST", "/mcs/v2/devices/" + deviceId + "/datapoints", body, headers)
        response = conn.getresponse()
        # Single formatted string: printing a parenthesised group under the
        # Python 2 print statement would show a tuple repr instead.
        print("%s %s %s %s" % (response.status, response.reason, body,
                               time.strftime("%c")))
        response.read()  # drain the body before closing
    finally:
        # Close the socket even if the request itself fails.
        conn.close()
def _get_connection(self, uri, headers=None):
    """Create the connection object used to send an HTTP request.

    Args:
      uri: The full URL for the request as a Uri object; its ``scheme``,
          ``host`` and ``port`` attributes are read.
      headers: A dict of string pairs containing the HTTP headers for the
          request. Not used when building the connection.

    Returns:
      An ``httplib.HTTPSConnection`` for the 'https' scheme, otherwise an
      ``httplib.HTTPConnection``. The port is passed only when the URI has one.
    """
    if uri.scheme == 'https':
        factory = httplib.HTTPSConnection
    else:
        factory = httplib.HTTPConnection

    if uri.port:
        return factory(uri.host, int(uri.port))
    return factory(uri.host)
def block_http(whitelist):
    """Monkey patch ``httplib.HTTPConnection`` to refuse non-whitelisted hosts.

    After this call, creating a connection to a host (given as a string) that
    is not in *whitelist* logs a warning and raises ``MockHttpCall``. Allowed
    hosts are passed on to the original constructor, which is kept as
    ``HTTPConnection.old``.

    Calling the function again installs the new whitelist. The original
    constructor is saved only the first time, so the wrapper never ends up
    calling itself.
    """
    def whitelisted(self, host, *args, **kwargs):
        try:
            string_type = basestring
        except NameError:
            # python3
            string_type = str
        if isinstance(host, string_type) and host not in whitelist:
            logger.warning('Denied HTTP connection to: %s' % host)
            raise MockHttpCall(host)
        logger.debug('Allowed HTTP connection to: %s' % host)
        return self.old(host, *args, **kwargs)

    # Marker used below to recognise an already installed wrapper.
    whitelisted.blockade = True

    if getattr(httplib.HTTPConnection, 'blockade', False):
        # A flag set on the class itself disables patching, as before.
        return

    current_init = httplib.HTTPConnection.__init__
    if not getattr(current_init, 'blockade', False):
        logger.debug('Monkey patching httplib')
        # Save the constructor only while it is still the unpatched one;
        # saving a previous wrapper here would make it recurse forever.
        httplib.HTTPConnection.old = current_init
    httplib.HTTPConnection.__init__ = whitelisted
def do_command(self, verb, args):
    """Send a command to the Selenium server and return its reply.

    The command name goes in the ``cmd`` form field and the arguments in the
    numbered fields ``1``, ``2``, ...; the session id is added when one is
    set.

    Returns:
        The decoded response text.

    Raises:
        Exception: with the response text when it does not start with 'OK'.
    """
    conn = http_client.HTTPConnection(self.host, self.port, timeout=self.http_timeout)
    try:
        body = 'cmd=' + urllib_parse.quote_plus(unicode(verb).encode('utf-8'))
        # Arguments are numbered from 1 in the wire format.
        for position, arg in enumerate(args, 1):
            encoded = urllib_parse.quote_plus(unicode(arg).encode('utf-8'))
            body += '&' + unicode(position) + '=' + encoded
        if self.sessionId is not None:
            body += "&sessionId=" + unicode(self.sessionId)

        headers = {
            "Content-Type": "application/x-www-form-urlencoded; charset=utf-8"
        }
        conn.request("POST", "/selenium-server/driver/", body, headers)

        data = unicode(conn.getresponse().read(), "UTF-8")
        if not data.startswith('OK'):
            raise Exception(data)
        return data
    finally:
        conn.close()
def process_task(self, server_connectivity_info, plugin_command, option_dict=None):
    """Send a Shellshock (CVE-2014-6271) probe and wrap the outcome.

    A GET request carrying the probe string in its User-Agent header is sent
    to ``option_dict['path']`` (default ``'/'``) on the server's IP address.
    Only ports 80 (plain HTTP) and 443 (HTTPS without certificate
    verification) are supported.

    Returns:
        A ``ShellshockTesterResult`` built from ``self.is_vulnerable(response)``.

    Raises:
        ValueError: for any other port, or when the connection cannot be
            opened.
    """
    if option_dict and 'path' in option_dict:
        path = str(option_dict['path'])
    else:
        path = '/'

    if server_connectivity_info.port == 80:
        conn = httplib.HTTPConnection(server_connectivity_info.ip_address,server_connectivity_info.port)
    elif server_connectivity_info.port == 443:
        conn = httplib.HTTPSConnection(server_connectivity_info.ip_address,server_connectivity_info.port,context=ssl._create_unverified_context())
    else:
        raise ValueError("ShellshockTesterPlugin: Can\'t make test for this port {0}".format(server_connectivity_info.port))

    try:
        try:
            conn.connect()
        except Exception as e:
            raise ValueError("ShellshockTesterPlugin: Connection error for port {0}. {1}".format(server_connectivity_info.port,str(e)))

        conn.request("GET", path, "", {'User-Agent': '() { :; }; echo; echo Vulnerable to CVE-2014-6271'})
        response = conn.getresponse()
        return ShellshockTesterResult(server_connectivity_info, plugin_command, option_dict, self.is_vulnerable(response))
    finally:
        # The result is built before this runs; always release the socket.
        conn.close()
def sendRequest(host, port, path, headers, params, reqType="GET"):
    """Send an HTTP(S) request and return the response object.

    *params* is URL-encoded; for a GET with non-empty params the encoded
    string is appended to *path* as the query string. HTTPS is used when
    *port* is 443, plain HTTP otherwise. The connection is left open so the
    caller can read the returned response.
    """
    params = urlencode(params)
    if reqType == "GET" and params:
        path = path + "?" + params

    if len(headers):
        logger.debug(headers)
    if len(params):
        logger.debug(params)

    logger.debug("Opening connection to %s" % host)
    if port == 443:
        conn = httplib.HTTPSConnection(host, port)
    else:
        conn = httplib.HTTPConnection(host, port)

    logger.debug("Sending %s request to %s" % (reqType, path))
    # NOTE: the encoded params are passed as the request body for every
    # request type, GET included.
    conn.request(reqType, path, params, headers)
    return conn.getresponse()
def _send_soap_request(location, upnp_schema, control_url, soap_message):
    """
    Send out SOAP request to UPnP device and return a response.

    The request is POSTed to *control_url* on ``location.hostname`` /
    ``location.port`` with an ``AddPortMapping`` SOAPAction for the given
    schema. The return value is whatever ``_parse_for_errors`` yields for the
    response.
    """
    headers = {
        'SOAPAction': (
            '"urn:schemas-upnp-org:service:{schema}:'
            '1#AddPortMapping"'.format(schema=upnp_schema)
        ),
        'Content-Type': 'text/xml'
    }
    conn = httplib.HTTPConnection(location.hostname, location.port)
    try:
        conn.request('POST', control_url, soap_message, headers)
        response = conn.getresponse()
        # Parse while the connection is still open: closing the connection
        # first also closes the pending response, so its body could no
        # longer be read.
        return _parse_for_errors(response)
    finally:
        conn.close()
def work(IP):
    """Look up *IP* on iknowwhatyoudownload.com and show the listed entries.

    The rows of the first ``tbody`` in the returned page are formatted one
    per line (begin, category, title, size), shown in a new Tk ``Toplevel``
    window and printed.
    """
    # HTTPConnection() takes only the host; the path and query string belong
    # in the request.
    conn = httplib.HTTPConnection("www.iknowwhatyoudownload.com")
    try:
        conn.request("GET", "/en/peer/?ip=" + IP)
        data = conn.getresponse().read()
    finally:
        conn.close()

    soup = BS(data,"html.parser")
    table = soup.find('tbody')
    # A page without a result table simply yields no rows.
    rows = table.findAll('tr') if table is not None else []

    # Start from an empty string: ``RESULT +=`` on a name that was never
    # assigned in this function raises UnboundLocalError.
    RESULT = ""
    for tr in rows:
        cols = tr.findAll('td')
        Begin,End,Category,title,size=[c.text for c in cols]
        RESULT+="\n"+Begin+" "+Category+" "+title+" "+size

    toplevel = Toplevel()
    label= Label(toplevel,text=RESULT,height=0,width=100)
    label.pack()
    print(RESULT)
def assertResponse(self, app, method, url, status=None, headers=None, content=None):
    """Call *app* through a WSGI intercept and check its response.

    Args:
        app: WSGI application registered for localhost:80.
        method: HTTP method of the request.
        url: Request path.
        status: Expected status code, or None to skip the check.
        headers: Mapping of header names to expected values (optional).
        content: Expected response body, or None to skip the check.

    The intercept is always removed again, even when an assertion fails, so
    one failing test cannot leave the intercept installed for the next one.
    """
    host, port = 'localhost', 80
    http_client_intercept.install()
    try:
        add_wsgi_intercept(host, port, app)
        try:
            client = http_lib.HTTPConnection(host, port)
            try:
                client.request(method, url)
                response = client.getresponse()

                if status is not None:
                    self.assertEqual(response.status, status)

                headers = headers or {}
                for k, v in headers.items():
                    self.assertEqual(response.getheader(k), v)

                if content is not None:
                    self.assertEqual(response.read(), content)
            finally:
                client.close()
        finally:
            remove_wsgi_intercept(host, port)
    finally:
        http_client_intercept.uninstall()
def DoesUrlExist(url, max_redirects=10):
    """Determines whether a resource exists at the given URL.

    Args:
      url: URL to be verified.
      max_redirects: Maximum number of 301/302 redirects to follow before
          giving up.

    Returns:
      True if url exists, otherwise False. Connection failures, malformed
      host/port values, HTTP protocol errors, redirects without a target
      and redirect chains longer than max_redirects all count as "does not
      exist".
    """
    parsed = urlparse.urlparse(url)
    conn = None
    try:
        conn = httplib.HTTPConnection(parsed.netloc)
        # An empty path would produce a malformed request line.
        conn.request('HEAD', parsed.path or '/')
        response = conn.getresponse()
        status = response.status
        location = response.getheader('location')
    except (socket.gaierror, socket.error, httplib.HTTPException):
        return False
    finally:
        # conn stays None when the constructor itself raised.
        if conn is not None:
            conn.close()

    # Follow both permanent (301) and temporary (302) redirects.
    if status == 302 or status == 301:
        if not location or max_redirects <= 0:
            return False
        # Location may be relative; resolve it against the current URL.
        return DoesUrlExist(urlparse.urljoin(url, location), max_redirects - 1)

    return status == 200
def __init__(self, target):
    """Prepare an HTTP(S) session for *target*.

    Args:
        target: URL of the form ``protocol://target:port/path``. The port and
            the path are optional.

    Sets ``self.target``, ``self.path`` (always starting with '/'),
    ``self.session`` (a connection to the target host and port) and
    ``self.lastresult`` (None).
    """
    # Target comes as protocol://target:port/path
    self.target = target
    proto, _, remainder = target.partition('://')
    # Keep "host:port" together so the connection honours the requested
    # port instead of silently falling back to the scheme default.
    host, _, path = remainder.partition('/')
    self.path = '/' + path

    if proto.lower() == 'https':
        #Create unverified (insecure) context
        try:
            uv_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            self.session = HTTPSConnection(host,context=uv_context)
        except AttributeError:
            #This does not exist on python < 2.7.11
            self.session = HTTPSConnection(host)
    else:
        self.session = HTTPConnection(host)
    self.lastresult = None
def download(host, port, filepath):
    """Download ``/<filepath>`` from *host*:*port* into the local file *filepath*.

    The body is streamed to disk in 4096-byte blocks. The file and the
    connection are closed even when the transfer fails.
    """
    #read 4096 bytes at a time
    block_size = 4096

    conn = httplib.HTTPConnection(host, port)
    try:
        conn.request("GET", "/"+filepath)
        resp = conn.getresponse()
        # Open the output only once a response has arrived, so a failed
        # request does not leave an empty file behind.
        with open(filepath, 'wb') as fp:
            chunk = resp.read(block_size)
            while chunk:
                fp.write(chunk)
                chunk = resp.read(block_size)
    finally:
        conn.close()

#use: python download.py [host] [port] [filepath]
def get_http_response(self):
    """Send the configured request and return ``(status, headers, body)``.

    The port defaults to 80 when unset. A form-typed request with a
    non-empty body is URL-encoded; any other body is sent as returned by
    ``get_body()``.

    Returns:
        ``(status, header list, body)`` on success, or ``(None, None, None)``
        when any exception occurs. The connection is closed in both cases.
    """
    if self.__port is None or self.__port == "":
        self.__port = 80

    try:
        self.__connection = httplib.HTTPConnection(self.parse_host(), self.__port)
        self.__connection.connect()

        is_form = self.get_content_type() == constant.CONTENT_TYPE_FORM
        if is_form and self.get_body():
            post_data = urllib.urlencode(self.get_body())
        else:
            post_data = self.get_body()

        self.__connection.request(
            method=self.get_method(),
            url=self.get_url(),
            body=post_data,
            headers=self.get_headers(),
        )
        response = self.__connection.getresponse()
        return response.status, response.getheaders(), response.read()
    except Exception:
        # Every failure is reported to the caller as an all-None triple.
        return None, None, None
    finally:
        self.__close_connection()
def create_http_request(self, method, url, headers, body, timeout, **kwargs):
    """Send *method* *url* and return the response object.

    A ``Host`` header (host name without port) and, for a non-empty body, a
    ``Content-Length`` header are added to *headers* when missing. The dict
    passed in is modified in place. The connection is left open so the caller
    can read the returned response.
    """
    scheme, netloc, path, query, _ = urlparse.urlsplit(url)
    use_https = scheme == 'https'

    # A ':' after the last ']' marks an explicit port (the bracket check
    # keeps IPv6 literals such as "[::1]" intact).
    if netloc.rfind(':') > netloc.rfind(']'):
        host, _, port = netloc.rpartition(':')
        port = int(port)
    else:
        # no port number
        host = netloc
        port = 443 if use_https else 80

    if query:
        path += '?' + query

    if 'Host' not in headers:
        headers['Host'] = host
    if body and 'Content-Length' not in headers:
        headers['Content-Length'] = str(len(body))

    if use_https:
        ConnectionType = httplib.HTTPSConnection
    else:
        ConnectionType = httplib.HTTPConnection
    connection = ConnectionType(netloc, timeout=timeout)
    connection.request(method, path, body=body, headers=headers)
    return connection.getresponse()
def _check_ipv6_host(host):
    """Return True when *host* answers an HTTP HEAD request on port 80.

    The request uses a 5 second timeout and browser-like headers. Any
    exception, or a falsy status, yields False. The connection is always
    closed.
    """
    header = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
              "accept":"application/json, text/javascript, */*; q=0.01",
              "accept-encoding":"gzip, deflate, sdch",
              "accept-language":'en-US,en;q=0.8,ja;q=0.6,zh-CN;q=0.4,zh;q=0.2',
              "connection":"keep-alive"
              }
    conn = None
    try:
        conn = httplib.HTTPConnection(host, 80, timeout=5)
        conn.request("HEAD", "/", headers=header)
        response = conn.getresponse()
        return bool(response.status)
    except Exception:
        # Reachability probe: every failure just means "not reachable".
        return False
    finally:
        # Release the socket after every probe instead of leaking it.
        if conn is not None:
            conn.close()
def write(obj):
    """POST *obj* as JSON to the local service on ``localPort``.

    The request carries the module-level ``securityToken`` and an
    ``outbound`` flow-direction header.

    Returns:
        True when the service answers 200, otherwise False.
    """
    conn = httplib.HTTPConnection("127.0.0.1", localPort)
    try:
        headers = {
            "Content-Type": "application/json",
            "Echonify-SecurityToken": securityToken,
            "Echonify-FlowDirection": "outbound"
        }
        data = json.dumps(obj)
        conn.request("POST", "/", data, headers=headers)
        response = conn.getresponse()
        return response.status == 200
    finally:
        # close must be called; a bare ``conn.close`` is a no-op expression.
        conn.close()
def writeStream(fileName, streamId):
    """Stream the file *fileName* to the local service as a chunked POST.

    The file is sent in 2048-byte pieces using HTTP chunked
    transfer-encoding, tagged with *streamId* and the module-level
    ``securityToken``.

    Returns:
        True when the service answers 200, otherwise False.
    """
    conn = httplib.HTTPConnection("127.0.0.1", localPort)
    try:
        conn.connect()
        conn.putrequest("POST", "/")
        conn.putheader("Transfer-Encoding", "chunked")
        conn.putheader("Content-Type", "application/octet-stream")
        conn.putheader("Echonify-SecurityToken", securityToken)
        conn.putheader("Echonify-FlowDirection", "outbound")
        conn.putheader("Echonify-StreamId", streamId)
        conn.endheaders()

        with open(fileName, 'rb') as f:
            while True:
                chunk = f.read(2048)
                if not chunk:
                    break
                # Each chunk is framed as "<hex length>\r\n<data>\r\n".
                conn.send("%x\r\n" % len(chunk))
                conn.send("%s\r\n" % chunk)
        # A zero-length chunk terminates the body.
        conn.send("0\r\n\r\n")

        response = conn.getresponse()
        return response.status == 200
    finally:
        # close must be called; a bare ``conn.close`` is a no-op expression.
        conn.close()
def _delete_pod(self, name, namespace):
    """Delete the pod *name* in *namespace* and wait until it is gone.

    After a successful DELETE the pod is polled up to five times, one second
    apart, until ``self._get_pod`` reports status 404.

    Raises:
        RuntimeError: when the DELETE is not answered with a 2xx status, or
            the pod still exists after the last poll.
    """
    con_dest_pod = httplib.HTTPConnection(self.target+":"+self.port)
    pod_url = ('/api/'+self.apiversion+'/namespaces/' +
               namespace+'/pods/'+name)
    con_dest_pod.request('DELETE', pod_url,
                         headers={'Content-Type': 'application/json'},
                         body=POD_DELETE)
    resp = con_dest_pod.getresponse()
    reason = resp.reason
    status = resp.status
    data = resp.read()
    con_dest_pod.close()

    if status < 200 or status > 299:
        raise RuntimeError("Failed to delete Pod: {} {} - {}".format(
            status, reason, data))

    # Poll until the API no longer knows the pod.
    for _ in xrange(5):
        status, data, reason = self._get_pod(name, namespace)
        if status == 404:
            break
        time.sleep(1)

    if status != 404:
        raise RuntimeError("Failed to delete Pod: {} {} - {}".format(
            status, reason, data))
def send_http(self, url):
    """GET *url* and return the response object, or 0 on failure.

    Successful requests are logged and followed by a pause of
    ``self.delay`` seconds. Failures are logged and printed instead of
    raised.
    """
    parts = urlparse(url)
    r = 0
    try:
        conn = httplib.HTTPConnection(parts[1], timeout=self.timeout)
        # Path followed by the URL params component, plus the query if any.
        target = parts[2] + parts[3]
        if parts[4]:
            target = target + '?' + parts[4]
        conn.request('GET', target, headers=self.headers)
        r = conn.getresponse()
        logger.info('%s %s' % (url, r.status))
        time.sleep(self.delay)
    except Exception as e:
        logger.error('url %s is unreachable, Exception %s %s' % (url, e.__class__.__name__, e))
        print('url %s is unreachable, Exception %s %s' % (url.encode('utf-8'), e.__class__.__name__, e))
    return r
def is_alive(self):
    """Return True when the service at ``self.conf.monasca_url`` responds.

    A HEAD request is sent to the configured URL. Status 200 or 401 counts
    as alive; a socket error or any other status yields False. The
    connection is always closed.
    """
    url = urlparse.urlparse(self.conf.monasca_url)
    if url.scheme == 'https':
        http_connector = httplib.HTTPSConnection
    else:
        http_connector = httplib.HTTPConnection

    connection = None
    try:
        connection = http_connector(host=url.netloc)
        # An empty path would produce a malformed request line.
        connection.request('HEAD', url=url.path or '/')
        status = connection.getresponse().status
    except httplib.socket.error:
        return False
    finally:
        if connection is not None:
            connection.close()

    return status in [200, 401]
def _http_request(self, url, timeout=40):
    """GET *url* from ``self.host`` and return ``(status, headers, body)``.

    The body is read and decoded only when the content type mentions 'text'
    or 'html', or the declared content length is at most 1 MiB; otherwise it
    is returned as ''.

    Returns:
        ``(status, header dict, decoded body)`` or ``(-1, {}, '')`` when any
        exception occurs. The connection is closed in both cases.
    """
    conn = None
    try:
        if not url:
            url = '/'
        conn_fuc = httplib.HTTPSConnection if self.schema == 'https' else httplib.HTTPConnection
        conn = conn_fuc(self.host, timeout=timeout)
        conn.request(method='GET', url=url,
                     headers={'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.111 Safari/537.36 BBScan/1.0'}
                     )
        resp = conn.getresponse()
        resp_headers = dict(resp.getheaders())
        status = resp.status

        content_type = resp_headers.get('content-type', '')
        if 'text' in content_type or 'html' in content_type or \
                int(resp_headers.get('content-length', '0')) <= 1048576:
            html_doc = self._decode_response_text(resp.read())
        else:
            html_doc = ''
        return status, resp_headers, html_doc
    except Exception:
        # Scanner semantics: a failed request is reported as status -1.
        return -1, {}, ''
    finally:
        # Also close on the error path so failed requests do not leak sockets.
        if conn is not None:
            conn.close()
def http_open(self, req):
    """Open *req* through ``self.do_open`` using ``httplib.HTTPConnection``."""
    connection_class = httplib.HTTPConnection
    return self.do_open(connection_class, req)
def doit():
    """POST the current temperature reading to ThingSpeak.

    The value of ``read_temp()`` is sent as ``field1`` together with the API
    key to the ``/update`` endpoint. The response body is read and discarded.
    """
    # The 'key' entry needs a colon; ``'key'=`` is a syntax error.
    params = urllib.urlencode({'field1': read_temp(), 'key': 'API_KEY_NUMBER'})
    headers = {"Content-type": "application/x-www-form-urlencoded","Accept":"text/plain"}
    conn = httplib.HTTPConnection("api.thingspeak.com:80")
    try:
        conn.request("POST", "/update", params, headers)
        response = conn.getresponse()
        response.read()
    finally:
        # Close the socket even if the request fails.
        conn.close()

# Get current mode from DB
def __init__(self, host, port=None, strict=None, timeout=None,
             proxy_info=None):
    """Initialise the base connection, then store timeout and proxy settings.

    ``host``, ``port`` and ``strict`` are forwarded to
    ``httplib.HTTPConnection.__init__``. ``timeout`` and ``proxy_info`` are
    assigned to the instance afterwards.
    """
    httplib.HTTPConnection.__init__(self, host, port, strict)
    self.proxy_info = proxy_info
    self.timeout = timeout
def __init__(self, host, port=None, key_file=None, cert_file=None,
             strict=None, timeout=None, proxy_info=None, ca_certs=None,
             disable_ssl_certificate_validation=False):
    """Initialise the underlying ``httplib.HTTPConnection``.

    Only ``host``, ``port``, ``strict`` and ``timeout`` are used here; the
    remaining parameters are accepted but not read by this method.
    """
    base_options = {'port': port, 'strict': strict, 'timeout': timeout}
    httplib.HTTPConnection.__init__(self, host, **base_options)
def _create_connection(scheme, netloc):
    """Return a connected HTTP(S) connection to *netloc*.

    HTTPS is used when *scheme* is 'https', plain HTTP for anything else.
    """
    connection_class = (httplib.HTTPSConnection if scheme == 'https'
                        else httplib.HTTPConnection)
    conn = connection_class(netloc)
    conn.connect()
    return conn
def saveFailedTest(data, expect, filename):
    """Upload failed test images to web server to allow CI test debugging.

    Builds one image showing *data*, *expect* and their difference side by
    side, encodes it as PNG and POSTs it to ``/upload.py`` on
    data.pyqtgraph.org. The output of ``git rev-parse HEAD`` is inserted
    into *filename* as the path component just before the file name.

    *data* and *expect* are indexed as three-axis arrays (``shape[0]``,
    ``shape[1]`` and ``shape[2]`` are read).
    """
    commit = runSubprocess(['git', 'rev-parse', 'HEAD'])
    name = filename.split('/')
    # Insert the commit id before the last path component.
    name.insert(-1, commit.strip())
    filename = '/'.join(name)
    host = 'data.pyqtgraph.org'

    # concatenate data, expect, and diff into a single image
    ds = data.shape
    es = expect.shape

    # Canvas: tall enough for the taller input plus 4 rows, and wide enough
    # for data, expect and a third panel plus 8 columns; 4 channels.
    shape = (max(ds[0], es[0]) + 4, ds[1] + es[1] + 8 + max(ds[1], es[1]), 4)
    img = np.empty(shape, dtype=np.ubyte)
    # Fill the first three channels with 100 and the fourth with 255.
    img[..., :3] = 100
    img[..., 3] = 255

    # data goes at offset (2, 2); expect starts 4 columns after data's width.
    img[2:2+ds[0], 2:2+ds[1], :ds[2]] = data
    img[2:2+es[0], ds[1]+4:ds[1]+4+es[1], :es[2]] = expect

    # The diff image is aligned to the right edge, 2 columns in.
    diff = makeDiffImage(data, expect)
    img[2:2+diff.shape[0], -diff.shape[1]-2:-2] = diff

    png = makePng(img)

    conn = httplib.HTTPConnection(host)
    req = urllib.urlencode({'name': filename, 'data': base64.b64encode(png)})
    conn.request('POST', '/upload.py', req)
    response = conn.getresponse().read()
    conn.close()
    print("\nImage comparison failed. Test result: %s %s Expected result: "
          "%s %s" % (data.shape, data.dtype, expect.shape, expect.dtype))
    print("Uploaded to: \nhttp://%s/data/%s" % (host, filename))
    # The server's reply is expected to start with OK; anything else is shown.
    if not response.startswith(b'OK'):
        print("WARNING: Error uploading data to %s" % host)
        print(response)
def _net_worker(self, method, path, data='', headers={}, metadata={}):
    """Send one request to ``self.thehost`` and return the response.

    ``Content-MD5`` and ``Content-Secret`` headers are added from
    ``self.content_md5`` / ``self.file_secret`` when set, and those
    attributes are then reset so they apply to a single request only.
    Authentication headers are added according to ``self.upAuth``.

    Raises:
        UpYunException: in debug mode, when a non-HEAD request is not
            answered with status 200.
    """
    # Work on a copy: the default dict is shared between calls, so writing
    # the one-shot headers into it would leak them into later requests (and
    # would also modify a dict passed in by the caller).
    headers = dict(headers)

    connection = httplib.HTTPConnection(self.thehost)

    if self.content_md5 != '':
        headers['Content-MD5'] = self.content_md5
        self.content_md5 = ''

    if self.file_secret != '':
        headers['Content-Secret'] = self.file_secret
        self.file_secret = ''

    final_headers = merge_meta(headers, metadata)

    if self.upAuth:
        self._add_upyun_auth_header(final_headers,method,path)
    else :
        self._basicAuth(final_headers,self.username,self.password)

    connection.request(method, path , data, final_headers)

    resp = connection.getresponse()
    if self.debug and resp.status != 200 and method != "HEAD" :
        raise UpYunException(u'ERROR: Code:%d,Message:%s'%(resp.status,resp.read()))

    return resp

#??????
def __init__(self, output_spec):
    """
    Uses HTTP chunked transfer-encoding to stream a request body to a server.
    Unfortunately requests does not support hooking into this logic easily, so
    we use the lower-level httplib module.

    *output_spec* must contain 'url' and may contain 'method' (default
    'POST') and 'headers'. The request line and headers are sent right away;
    the open connection is kept in ``self.conn``. If that fails, a message
    is printed, the connection is closed and the exception is re-raised.
    """
    super(HttpStreamPushAdapter, self).__init__(output_spec)
    self._closed = False

    parts = urlparse.urlparse(output_spec['url'])
    if parts.scheme == 'https':
        ssl_context = ssl.create_default_context()
        conn = httplib.HTTPSConnection(parts.netloc, context=ssl_context)
    else:
        conn = httplib.HTTPConnection(parts.netloc)

    # urlparse() splits the query string off the path; put it back so it
    # is not silently dropped from the request.
    url = parts.path
    if parts.query:
        url = '%s?%s' % (url or '/', parts.query)

    try:
        conn.putrequest(output_spec.get('method', 'POST').upper(),
                        url, skip_accept_encoding=True)

        for header, value in output_spec.get('headers', {}).items():
            conn.putheader(header, value)

        conn.putheader('Transfer-Encoding', 'chunked')
        conn.endheaders()  # This actually flushes the headers to the server
    except Exception:
        print('HTTP connection to "%s" failed.' % output_spec['url'])
        conn.close()
        raise

    self.conn = conn
def __init__(self, url, headers={}):
    """
    Uses HTTP chunked transfer-encoding to stream a request body to a server.
    Unfortunately requests does not support hooking into this logic easily, so
    we use the lower-level httplib module.

    A POST to *url* is started with the given *headers* plus
    ``Transfer-Encoding: chunked``; the open connection is kept in
    ``self.conn``. If that fails, a message is written to stderr, the
    connection is closed and the exception is re-raised.
    """
    self._url = url
    self._closed = False

    parts = urlparse.urlparse(self._url)
    if parts.scheme == 'https':
        ssl_context = ssl.create_default_context()
        conn = httplib.HTTPSConnection(parts.netloc, context=ssl_context)
    else:
        conn = httplib.HTTPConnection(parts.netloc)

    try:
        url = parts.path
        # urlparse() yields '' (never None) for a missing query, so test for
        # emptiness; otherwise every request path would end in a bare '?'.
        if parts.query:
            url = '%s?%s' % (url or '/', parts.query)
        conn.putrequest('POST',
                        url, skip_accept_encoding=True)

        for header, value in headers.items():
            conn.putheader(header, value)

        conn.putheader('Transfer-Encoding', 'chunked')

        conn.endheaders()  # This actually flushes the headers to the server
    except Exception:
        sys.stderr.write('HTTP connection to "%s" failed.\n' % self._url)
        conn.close()
        raise

    self.conn = conn
def sendCommand(self, server, url, response, follow_redirects=1, secure=0, keyfile=None, certfile=None):
    """POST the encoded message to *server* and parse the reply.

    Args:
        server: Host (optionally ``host:port``) to connect to.
        url: Request path.
        response: Message object filled from the reply via
            ``ParseFromString``, or None to ignore the body.
        follow_redirects: Number of 302 redirects that may be followed.
        secure: Use HTTPS when true.
        keyfile, certfile: Optional client key and certificate for HTTPS.

    Returns:
        *response* (possibly None).

    Raises:
        ProtocolBufferReturnError: when the final status is not 200.
    """
    data = self.Encode()
    if secure:
        if keyfile and certfile:
            conn = httplib.HTTPSConnection(server, key_file=keyfile, cert_file=certfile)
        else:
            conn = httplib.HTTPSConnection(server)
    else:
        conn = httplib.HTTPConnection(server)

    try:
        conn.putrequest("POST", url)
        conn.putheader("Content-Length", "%d" %len(data))
        conn.endheaders()
        conn.send(data)
        resp = conn.getresponse()

        if follow_redirects > 0 and resp.status == 302:
            # A redirect without a Location header cannot be followed and
            # falls through to the status check below.
            location = resp.getheader('Location')
            m = URL_RE.match(location) if location else None
            if m:
                protocol, server, url = m.groups()
                return self.sendCommand(server, url, response,
                                        follow_redirects=follow_redirects - 1,
                                        secure=(protocol == 'https'),
                                        keyfile=keyfile,
                                        certfile=certfile)

        if resp.status != 200:
            raise ProtocolBufferReturnError(resp.status)
        if response is not None:
            response.ParseFromString(resp.read())
        return response
    finally:
        # The body has been consumed (or is not needed) by now.
        conn.close()