我们从 Python 开源项目中，提取了以下 46 个代码示例，用于说明如何使用 httplib.HTTPSConnection()。
def need_update(self):
    """Check whether a newer release than PYJFUZZ_VERSION is published.

    Fetches ``self.version_url`` over HTTPS, tunnelling through the proxy
    named by ``HTTP_PROXY`` (or, failing that, ``HTTPS_PROXY``) when one is
    set in the environment.

    Returns:
        True when the published version is newer; ``self.new_version`` is
        then set to the published version string. False otherwise,
        including when the published text is not a valid version.
    """
    if sys.version_info >= (3, 0):
        parse_url = urllib.parse.urlparse
        connection_cls = http.client.HTTPSConnection
    else:
        parse_url = urlparse.urlparse
        connection_cls = httplib.HTTPSConnection

    # An empty proxy variable is treated as "no proxy" instead of producing
    # a connection to host None.
    proxy_url = os.environ.get("HTTP_PROXY") or os.environ.get("HTTPS_PROXY")
    if proxy_url:
        proxy = parse_url(proxy_url)
        conn = connection_cls(proxy.hostname, proxy.port)
        conn.set_tunnel(self.version_host, 443)
    else:
        conn = connection_cls("raw.githubusercontent.com")

    try:
        conn.request("GET", self.version_url)
        version = conn.getresponse().read()
    finally:
        conn.close()

    # On Python 3 the body is bytes; StrictVersion needs text, otherwise the
    # comparison below always failed silently.
    if not isinstance(version, str):
        version = version.decode("utf-8", "replace")
    version = version.strip()

    try:
        if StrictVersion(version) > StrictVersion(PYJFUZZ_VERSION):
            self.new_version = version
            return True
    except Exception:
        # Best effort: an unparsable version means "no update available".
        pass
    return False
def validate_optional_args(args):
    """Reject options whose backing module could not be imported.

    For each optional flag set on ``args``, the module-level name backing
    it (``json`` or ``HTTPSConnection``) must not be None; otherwise the
    process exits with a message naming the missing feature.
    """
    requirements = (
        ('json', 'json/simplejson python module', json),
        ('secure', 'SSL support', HTTPSConnection),
    )
    for flag, label, backing in requirements:
        requested = getattr(args, flag, False)
        if requested and backing is None:
            raise SystemExit('%s is not installed. --%s is '
                             'unavailable' % (label, flag))
def get_connection(self, tmp_host=None):
    """Build an HTTP or HTTPS connection object for a ``host[:port]`` string.

    Args:
        tmp_host: ``"host"`` or ``"host:port"``; falls back to ``self.host``
            when empty.

    Returns:
        An ``httplib.HTTPSConnection`` when ``self.is_security`` is set or
        the port is 443 (``self.is_security`` is then forced to True),
        otherwise an ``httplib.HTTPConnection``.
    """
    if not tmp_host:
        tmp_host = self.host

    host = ''
    port = None
    host_port_list = tmp_host.split(":")
    if len(host_port_list) == 1:
        host = host_port_list[0].strip()
    elif len(host_port_list) == 2:
        host = host_port_list[0].strip()
        port = int(host_port_list[1].strip())

    if port is None:
        # No explicit port: use the scheme's default. Previously this was
        # always 80, so secure connections targeted the plain-HTTP port.
        port = 443 if self.is_security else 80

    kwargs = {'host': host, 'port': port}
    if sys.version_info >= (2, 6):
        # The timeout argument only exists from Python 2.6 on.
        kwargs['timeout'] = self.timeout

    if self.is_security or port == 443:
        self.is_security = True
        return httplib.HTTPSConnection(**kwargs)
    return httplib.HTTPConnection(**kwargs)
def textanalyze(self, index_name, analyzer, text):
    """POST ``text`` to the index's ``analyze`` endpoint and return the reply.

    Args:
        index_name: Name of the index placed in the request path.
        analyzer: Analyzer name sent in the JSON body.
        text: Text sent in the JSON body.

    Returns:
        The response body decoded as UTF-8.
    """
    # Create JSON string for request body
    req_body = json.dumps({'text': text, 'analyzer': analyzer})

    # HTTP request to Azure search REST API
    conn = httplib.HTTPSConnection(self.__api_url)
    try:
        conn.request("POST",
                     u"/indexes/{0}/analyze?api-version={1}".format(index_name, _AZURE_SEARCH_API_VERSION),
                     req_body, self.headers)
        response = conn.getresponse()
        return response.read().decode('utf-8')
    finally:
        # Close the socket even when the request or read fails.
        conn.close()
def _get_connection(self, uri, headers=None): """Opens a socket connection to the server to set up an HTTP request. Args: uri: The full URL for the request as a Uri object. headers: A dict of string pairs containing the HTTP headers for the request. """ connection = None if uri.scheme == 'https': if not uri.port: connection = httplib.HTTPSConnection(uri.host) else: connection = httplib.HTTPSConnection(uri.host, int(uri.port)) else: if not uri.port: connection = httplib.HTTPConnection(uri.host) else: connection = httplib.HTTPConnection(uri.host, int(uri.port)) return connection
def process_task(self, server_connectivity_info, plugin_command, option_dict=None):
    """Send a GET whose User-Agent carries a Shellshock payload and report the result.

    Args:
        server_connectivity_info: Target description; ``port`` and
            ``ip_address`` are read from it.
        plugin_command: Passed through to the result object.
        option_dict: Optional options; ``'path'`` selects the request path
            (default ``'/'``).

    Returns:
        A ShellshockTesterResult built from ``self.is_vulnerable(response)``.

    Raises:
        ValueError: For ports other than 80/443 or when connecting fails.
    """
    if option_dict and 'path' in option_dict:
        path = str(option_dict['path'])
    else:
        path = '/'

    port = server_connectivity_info.port
    if port == 80:
        conn = httplib.HTTPConnection(server_connectivity_info.ip_address, port)
    elif port == 443:
        # Certificate verification is deliberately disabled for the probe.
        conn = httplib.HTTPSConnection(server_connectivity_info.ip_address, port,
                                       context=ssl._create_unverified_context())
    else:
        raise ValueError("ShellshockTesterPlugin: Can\'t make test for this port {0}".format(server_connectivity_info.port))

    try:
        try:
            conn.connect()
        except Exception as e:
            raise ValueError("ShellshockTesterPlugin: Connection error for port {0}. {1}".format(server_connectivity_info.port, str(e)))
        conn.request("GET", path, "", {'User-Agent': '() { :; }; echo; echo Vulnerable to CVE-2014-6271'})
        response = conn.getresponse()
        vulnerable = self.is_vulnerable(response)
    finally:
        # The connection used to be left open on every path.
        conn.close()
    return ShellshockTesterResult(server_connectivity_info, plugin_command, option_dict, vulnerable)
def sendRequest(host, port, path, headers, params, reqType="GET"):
    """Send one request and return the response object.

    Args:
        host: Server host name.
        port: Server port; 443 selects HTTPS, anything else plain HTTP.
        path: Request path.
        headers: Header mapping passed to the request.
        params: Parameters, URL-encoded before use.
        reqType: HTTP method. For "GET" the encoded parameters go in the
            query string; for other methods they are sent as the body.

    Returns:
        The response object from ``getresponse()``.
    """
    params = urlencode(params)
    if reqType == "GET":
        if params:
            path = path + "?" + params
        # GET parameters already travel in the URL; do not repeat them as a
        # request body.
        body = None
    else:
        body = params

    if headers:
        logger.debug(headers)
    if params:
        logger.debug(params)

    logger.debug("Opening connection to %s" % host)
    if port == 443:
        conn = httplib.HTTPSConnection(host, port)
    else:
        conn = httplib.HTTPConnection(host, port)

    logger.debug("Sending %s request to %s" % (reqType, path))
    conn.request(reqType, path, body, headers)
    return conn.getresponse()
def train_persongroup(self, persongroup_id):
    """POST a train request for ``persongroup_id`` and return the raw reply.

    Returns:
        The response body, or ``''`` when the request fails (the error is
        printed).
    """
    headers = {
        # Request headers
        'Ocp-Apim-Subscription-Key': self.__subkey,
    }
    params = urllib.urlencode({})
    REQ_BODY = ''
    # Pre-set so the final return cannot hit an unbound name after a failure.
    data = ''
    conn = None
    try:
        conn = httplib.HTTPSConnection('westus.api.cognitive.microsoft.com')
        conn.request("POST", "/face/v1.0/persongroups/{0}/train?{1}".format(persongroup_id, params),
                     REQ_BODY, headers)
        response = conn.getresponse()
        data = response.read()
    except Exception as e:
        # Not every exception carries errno/strerror; fall back to the
        # exception itself so the handler does not raise.
        print("[Errno {0}] {1}".format(getattr(e, 'errno', None),
                                       getattr(e, 'strerror', None) or e))
    finally:
        if conn is not None:
            conn.close()
    return data
def add_personface_to_person(self, persongroup_id, person_id, face_url):
    """Register the face image at ``face_url`` for a person.

    Returns:
        The ``persistedFaceId`` from the reply, or ``''`` when the request
        fails or the reply contains an ``error`` object (both are printed).
    """
    headers = {
        'Content-Type': 'application/json',
        'Ocp-Apim-Subscription-Key': self.__subkey,
    }
    params = urllib.urlencode({})
    REQ_BODY = json.dumps({"url": face_url})
    conn = None
    try:
        conn = httplib.HTTPSConnection('westus.api.cognitive.microsoft.com')
        conn.request("POST",
                     "/face/v1.0/persongroups/{0}/persons/{1}/persistedFaces?{2}".format(persongroup_id, person_id, params),
                     REQ_BODY, headers)
        response = conn.getresponse()
        data = response.read()
    except Exception as e:
        # Not every exception carries errno/strerror.
        print("[Errno {0}] {1}".format(getattr(e, 'errno', None),
                                       getattr(e, 'strerror', None) or e))
        return ''
    finally:
        if conn is not None:
            conn.close()

    res = json.loads(data)
    # `in` replaces dict.has_key, which only exists on Python 2.
    if isinstance(res, dict) and 'error' in res:
        print("[Error code:{0}] {1}".format(res['error']['code'], res['error']['message']))
        return ''
    return res['persistedFaceId']
def __init__(self, target):
    """Create the HTTP(S) session for a ``protocol://host:port/path`` target.

    Sets ``self.target``, ``self.path``, ``self.session`` and
    ``self.lastresult``.

    Raises:
        ValueError: When ``target`` does not contain exactly two colons.
    """
    # Target comes as protocol://target:port/path
    self.target = target
    proto, host, rest = target.split(':')
    host = host[2:]  # drop the leading '//'
    port_text, _, tail = rest.partition('/')
    # A target without a path after the port used to raise IndexError.
    self.path = '/' + tail
    # The port was parsed but never used, so every session went to the
    # scheme's default port regardless of the target.
    port = int(port_text) if port_text.isdigit() else None

    if proto.lower() == 'https':
        # Create unverified (insecure) context
        try:
            uv_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            self.session = HTTPSConnection(host, port, context=uv_context)
        except AttributeError:
            # This does not exist on python < 2.7.11
            self.session = HTTPSConnection(host, port)
    else:
        self.session = HTTPConnection(host, port)
    self.lastresult = None
def send_to_slack(text, username, icon_emoji, webhook):
    """POST a message to a Slack incoming webhook.

    Args:
        text: Message text.
        username: Display name for the message.
        icon_emoji: Emoji code used as the icon.
        webhook: Request path of the webhook on hooks.slack.com.
    """
    import json

    # Serialise with json.dumps: the hand-concatenated payload became
    # invalid JSON as soon as a value contained a quote or backslash.
    payload = json.dumps({
        'username': username,
        'icon_emoji': icon_emoji,
        'text': text,
    })
    data = urllib.urlencode({'payload': payload})
    headers = {"Content-type": "application/x-www-form-urlencoded",
               "Accept": "text/plain"}

    h = httplib.HTTPSConnection('hooks.slack.com')
    try:
        h.request('POST', webhook, data, headers)
        # Drain the reply so the exchange completes before closing.
        h.getresponse().read()
    finally:
        h.close()
def connect(self):
    """Open the socket and wrap it with TLSv1 instead of the default.

    Re-implements HTTPSConnection.connect, which offers no hook for
    choosing the protocol version.
    """
    address = (self.host, self.port)
    # Older interpreters accept fewer create_connection arguments.
    if sys.version_info >= (2, 7):
        extra_args = (self.timeout, self.source_address)
    elif sys.version_info >= (2, 6):
        extra_args = (self.timeout,)
    else:
        extra_args = ()
    sock = socket.create_connection(address, *extra_args)

    if getattr(self, '_tunnel_host', None):
        self.sock = sock
        self._tunnel()

    # The only difference from the stock implementation, whose
    # wrap_socket call uses SSLv23.
    self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                ssl_version=ssl.PROTOCOL_TLSv1)
def get_https_response(self):
    """Send the prepared request over HTTPS on port 443.

    The body is URL-encoded when the content type equals
    ``constant.CONTENT_TYPE_FORM`` and the body is truthy.

    Returns:
        ``(status, headers, body)``, or ``(None, None, None)`` on any
        exception. The connection is closed in every case.
    """
    try:
        self.__port = 443
        self.__connection = httplib.HTTPSConnection(
            self.parse_host(), self.__port,
            cert_file=self.__cert_file, key_file=self.__key_file)
        self.__connection.connect()

        is_form = self.get_content_type() == constant.CONTENT_TYPE_FORM
        if is_form and self.get_body():
            post_data = urllib.urlencode(self.get_body())
        else:
            post_data = self.get_body()

        self.__connection.request(method=self.get_method(),
                                  url=self.get_url(),
                                  body=post_data,
                                  headers=self.get_headers())
        reply = self.__connection.getresponse()
        return reply.status, reply.getheaders(), reply.read()
    except Exception:
        return None, None, None
    finally:
        self.__close_connection()
def get_https_response_object(self):
    """Send the prepared request over HTTPS with the body passed as-is.

    Returns:
        ``(status, headers, body)``, or ``(None, None, None)`` on any
        exception. The connection is closed in every case.
    """
    current_port = self.__port
    if current_port is None or current_port == "":
        self.__port = 443
    try:
        # NOTE(review): the port is forced to 443 here regardless of the
        # check above - confirm whether a caller-supplied port should win.
        self.__port = 443
        self.__connection = httplib.HTTPSConnection(
            self.get_host(), self.__port,
            cert_file=self.__cert_file, key_file=self.__key_file)
        self.__connection.connect()
        self.__connection.request(method=self.get_method(),
                                  url=self.get_url(),
                                  body=self.get_body(),
                                  headers=self.get_headers())
        reply = self.__connection.getresponse()
        return reply.status, reply.getheaders(), reply.read()
    except Exception:
        return None, None, None
    finally:
        self.__close_connection()
def create_http_request(self, method, url, headers, body, timeout, **kwargs):
    """Send a single HTTP(S) request for ``url`` and return the response.

    Args:
        method: HTTP method.
        url: Absolute URL; the scheme selects HTTPS or HTTP.
        headers: Header dict; ``Host`` and ``Content-Length`` are added to
            it when absent.
        body: Request body or None.
        timeout: Socket timeout for the connection.
        **kwargs: Accepted and ignored.

    Returns:
        The response object from ``getresponse()``.

    Raises:
        ValueError: When the port in ``url`` is not an integer.
    """
    scheme, netloc, path, query, _ = urlparse.urlsplit(url)
    if netloc.rfind(':') <= netloc.rfind(']'):
        # no port number
        host = netloc
    else:
        host, _, port = netloc.rpartition(':')
        int(port)  # keep rejecting a malformed port early
    # A URL without a path (e.g. "https://example.com") would otherwise
    # produce a request line with an empty target.
    if not path:
        path = '/'
    if query:
        path += '?' + query
    if 'Host' not in headers:
        headers['Host'] = host
    if body and 'Content-Length' not in headers:
        headers['Content-Length'] = str(len(body))
    if scheme == 'https':
        ConnectionType = httplib.HTTPSConnection
    else:
        ConnectionType = httplib.HTTPConnection
    # netloc (not host) is passed so the connection class parses the port
    # and any IPv6 brackets itself.
    connection = ConnectionType(netloc, timeout=timeout)
    connection.request(method, path, body=body, headers=headers)
    return connection.getresponse()
def __getattr__(self, name):
    """Proxy unknown attributes to the backing connection, creating it lazily.

    While no backing connection exists, calls to ``set_tunnel`` and
    ``set_debuglevel`` are recorded in ``self._call_queue`` instead of
    forcing a connection. Any other attribute triggers
    ``self._delayed_connect()`` and is then read from ``self._conn``.
    """
    # Anything that can't be found on this instance is presumably a
    # property of underlying connection object.
    # A few methods can act on an HTTPSConnection before it actually
    # connects to the remote server. To keep those semantics they are
    # queued here and applied once the backing connection is built.
    delay_methods = ["set_tunnel", "set_debuglevel"]

    if self._conn is None and name in delay_methods:
        # The closure is returned as a plain function, not a bound method,
        # so it receives exactly the caller's arguments. The former extra
        # leading parameter swallowed the first positional argument.
        def capture(*args, **kwargs):
            self._call_queue.append((name, args, kwargs))
        return capture
    elif self._conn is None:
        # We're being told to do something! We can now connect to the
        # remote server and build the connection object.
        self._delayed_connect()

    # Call through to the underlying object.
    return getattr(self._conn, name)
def _check_one_host(host):
    """Return True when ``host`` answers a HEAD request on port 443.

    Any exception while connecting or reading the reply yields False.
    """
    header = {
        "user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
        "accept": "application/json, text/javascript, */*; q=0.01",
        "accept-encoding": "gzip, deflate, sdch",
        "accept-language": 'en-US,en;q=0.8,ja;q=0.6,zh-CN;q=0.4,zh;q=0.2',
        "connection": "keep-alive"
    }
    conn = None
    try:
        conn = httplib.HTTPSConnection(host, 443, timeout=30)
        conn.request("HEAD", "/", headers=header)
        response = conn.getresponse()
        # Always return a bool; this path used to fall through to None.
        return bool(response.status)
    except Exception:
        return False
    finally:
        if conn is not None:
            conn.close()
def __init__(self, parent, site_customization):
    """Set up the Goodreads and Amazon HTTPS connections and the menu.

    When an ``https`` proxy is reported by ``get_proxies``, each connection
    is made to the proxy and tunnelled to the site on port 443.
    """
    InterfaceAction.__init__(self, parent, site_customization)

    sites = (('goodreads', 'www.goodreads.com'),
             ('amazon', 'www.amazon.com'))
    connections = {}

    https_proxy = get_proxies(debug=False).get('https', None)
    if https_proxy:
        # Everything before the last ':' is the address, the rest the port.
        https_address, _, port_text = https_proxy.rpartition(':')
        https_port = int(port_text)
        for key, site in sites:
            tunnelled = HTTPSConnection(https_address, https_port)
            tunnelled.set_tunnel(site, 443)
            connections[key] = tunnelled
    else:
        for key, site in sites:
            connections[key] = HTTPSConnection(site)

    self._connections = connections
    self.menu = QMenu(self.gui)
def is_alive(self):
    """Report whether ``self.conf.monasca_url`` answers a HEAD request.

    Returns:
        True when the response status is 200 or 401; False for any other
        status or on a socket error.
    """
    url = urlparse.urlparse(self.conf.monasca_url)
    if url.scheme == 'https':
        http_connector = httplib.HTTPSConnection
    else:
        http_connector = httplib.HTTPConnection

    connection = None
    try:
        connection = http_connector(host=url.netloc)
        # A URL without a path must still send a valid request target.
        connection.request('HEAD', url=url.path or '/')
        response = connection.getresponse()
        status = getattr(response, 'status', None)
    except httplib.socket.error:
        return False
    finally:
        if connection is not None:
            connection.close()
    return status in (200, 401)
def checkUpdate():
    """Print whether a newer ROPgadget version is published on GitHub.

    Downloads ``version.py`` from raw.githubusercontent.com, extracts
    MAJOR_VERSION and MINOR_VERSION and compares them with the local
    module-level values. Connection and parse failures are printed rather
    than raised.
    """
    conn = httplib.HTTPSConnection("raw.githubusercontent.com", 443)
    try:
        conn.request("GET", "/JonathanSalwan/ROPgadget/master/ropgadget/version.py")
        d = conn.getresponse().read()
    except Exception:
        print("Can't connect to raw.githubusercontent.com")
        return
    finally:
        conn.close()

    # The body is bytes on Python 3; the patterns below are text.
    if not isinstance(d, str):
        d = d.decode("utf-8", "replace")

    major = re.search(r"MAJOR_VERSION[^=\n]*=[^\d\n]*(?P<value>\d+)", d)
    minor = re.search(r"MINOR_VERSION[^=\n]*=[^\d\n]*(?P<value>\d+)", d)
    if major is None or minor is None:
        print("Can't read the version from raw.githubusercontent.com")
        return
    majorVersion = major.group("value")
    minorVersion = minor.group("value")

    # Compare component-wise so multi-digit components order correctly
    # (joining the digits made 1.10 look smaller than 2.0 only by luck).
    webVersion = (int(majorVersion), int(minorVersion))
    curVersion = (int(MAJOR_VERSION), int(MINOR_VERSION))
    if webVersion > curVersion:
        print("The version %s.%s is available. Currently, you use the version %d.%d." %(majorVersion, minorVersion, MAJOR_VERSION, MINOR_VERSION))
    else:
        print("Your version is up-to-date.")
def getResponse(url, method, data):
    """Send a request to xueqiu.com and return the decoded JSON reply.

    Prints a message and exits the interpreter when the status is not 200.
    """
    # Send the request with the shared headers.
    connection = httplib.HTTPSConnection('xueqiu.com')
    connection.request(method, url, data, getHeaders())

    # Read the status before touching the body.
    reply = connection.getresponse()
    if reply.status != 200:
        print('??????')
        quit()

    # Decode the JSON body.
    return json.loads(reply.read())
def face_detect(image_url):
    """Run face detection on ``image_url`` and return the first face id.

    Returns:
        ``faceId`` of the first element in the reply, or None when the
        request or parsing fails (the error is printed).
    """
    params = urllib.urlencode({
        # Request parameters
        'returnFaceId': 'true',
        'returnFaceLandmarks': 'false'
    })
    # json.dumps escapes quotes in the URL that string formatting did not.
    body = json.dumps({"url": image_url})
    conn = None
    try:
        conn = httplib.HTTPSConnection('westus.api.cognitive.microsoft.com')
        conn.request("POST", "/face/v1.0/detect?%s" % params, body, headers)
        response = conn.getresponse()
        data = response.read()
        obj = json.loads(data)
        return obj[0]['faceId']
    except Exception as e:
        # e.message does not exist on every exception; format e itself.
        print("Error: %s" % e)
    finally:
        if conn is not None:
            conn.close()
def create_person_group():
    """Send the PUT request that creates person group ``group1``.

    The reply body is printed; failures are printed rather than raised.
    """
    person_group_id = 'group1'
    # NOTE(review): the body is an empty JSON object - confirm whether the
    # service requires fields such as a display name.
    body = '{}'
    conn = None
    try:
        conn = httplib.HTTPSConnection('westus.api.cognitive.microsoft.com')
        # The id belongs in the path; previously the literal text
        # "{personGroupId}" was sent and the id only appeared as a query
        # parameter.
        conn.request("PUT", "/face/v1.0/persongroups/%s" % person_group_id, body, headers)
        response = conn.getresponse()
        data = response.read()
        print(data)
    except Exception as e:
        # e.message does not exist on every exception; format e itself.
        print("Error: %s" % e)
    finally:
        if conn is not None:
            conn.close()
def get_persons():
    """List the persons of the module-level ``personGroupId`` group.

    Returns:
        A list of ``{'name': ..., 'personId': ...}`` dicts; an empty list
        when the request or parsing fails (the error is printed).
    """
    conn = None
    try:
        conn = httplib.HTTPSConnection('westus.api.cognitive.microsoft.com')
        conn.request("GET", "/face/v1.0/persongroups/%s/persons?" % personGroupId, "", headers)
        response = conn.getresponse()
        data = json.loads(response.read())
        return [{'name': row['name'], 'personId': row['personId']} for row in data]
    except Exception as e:
        # e.message does not exist on every exception; format e itself.
        print("Error: %s" % e)
        # An empty list keeps callers that iterate the result working.
        return []
    finally:
        if conn is not None:
            conn.close()
def create_person(pname, udata):
    """Return the id of person ``pname``, creating the person if needed.

    Args:
        pname: Person name; an existing person with this name is reused.
        udata: Value sent as ``userData`` when creating the person.

    Returns:
        The ``personId``; ``''`` when the reply carries a falsy id; None
        when the create request fails (the error is printed).
    """
    # get_persons() may yield nothing on failure; treat that as "no match".
    for row in get_persons() or []:
        if pname == row['name']:
            return row['personId']

    # json.dumps escapes quotes that string formatting passed through raw.
    body = json.dumps({"name": pname, "userData": udata})
    conn = None
    try:
        conn = httplib.HTTPSConnection('westus.api.cognitive.microsoft.com')
        conn.request("POST", "/face/v1.0/persongroups/%s/persons?" % personGroupId, body, headers)
        response = conn.getresponse()
        data = json.loads(response.read())
        return data['personId'] or ''
    except Exception as e:
        # e.message does not exist on every exception; format e itself.
        print("Error: %s" % e)
    finally:
        if conn is not None:
            conn.close()
def train():
    """POST a train request for the module-level ``personGroupId`` group.

    The reply is read and discarded; failures are printed rather than
    raised.
    """
    conn = None
    try:
        conn = httplib.HTTPSConnection('westus.api.cognitive.microsoft.com')
        conn.request("POST", "/face/v1.0/persongroups/%s/train?" % personGroupId, "", headers)
        conn.getresponse().read()
    except Exception as e:
        # e.message does not exist on every exception; format e itself.
        print("Error: %s" % e)
    finally:
        if conn is not None:
            conn.close()

#if __name__ == '__main__':
    #fid =face_detect('http://res.cloudinary.com/aish/image/upload/v1488457817/SmartMirror/aishwarya/6.jpg')
    #print fid
    #print create_person('aishwarya_mittal','Aishwarya Mittal')
    #url='http://res.cloudinary.com/aish/image/upload/v1488721372/SmartMirror/dataset/aishwarya_mittal/img_1.jpg'
    #pid='7b1f2956-8635-4ce0-bfff-bf76afccc899'
    #add_person_face(pid,url)
    #face_identify('3885e697-f922-4e39-800c-1b7e0387bbf7')
def _http_request(self, url, timeout=40):
    """GET ``url`` from ``self.host`` and return status, headers and text.

    The body is read when the content type mentions 'text' or 'html', or
    when the declared content length is at most 1 MiB; otherwise the text
    is ``''``.

    Returns:
        ``(status, headers_dict, text)``, or ``(-1, {}, '')`` on any
        exception.
    """
    try:
        target = url if url else '/'
        if self.schema == 'https':
            factory = httplib.HTTPSConnection
        else:
            factory = httplib.HTTPConnection
        conn = factory(self.host, timeout=timeout)
        conn.request(method='GET', url=target,
                     headers={'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.111 Safari/537.36 BBScan/1.0'}
                     )
        resp = conn.getresponse()
        resp_headers = dict(resp.getheaders())
        status = resp.status

        content_type = resp_headers.get('content-type', '')
        wanted = ('text' in content_type
                  or 'html' in resp_headers.get('content-type', '')
                  or int(resp_headers.get('content-length', '0')) <= 1048576)
        html_doc = self._decode_response_text(resp.read()) if wanted else ''

        conn.close()
        return status, resp_headers, html_doc
    except Exception:
        #logging.error('[Exception in InfoDisScanner._http_request] %s' % e)
        return -1, {}, ''
def _new_conn(self):
    """
    Create a new HTTPS connection object for this pool's host and port.

    Increments ``self.num_connections`` and logs the new connection. When
    the ``ssl`` module is available the connection is a
    VerifiedHTTPSConnection configured with this pool's certificate
    settings; otherwise a plain HTTPSConnection is returned.
    """
    self.num_connections += 1
    log.info("Starting new HTTPS connection (%d): %s"
             % (self.num_connections, self.host))

    if ssl:
        verified = VerifiedHTTPSConnection(host=self.host, port=self.port)
        verified.set_cert(key_file=self.key_file, cert_file=self.cert_file,
                          cert_reqs=self.cert_reqs, ca_certs=self.ca_certs)
        return verified

    return HTTPSConnection(host=self.host, port=self.port)

## Helpers
def do_post(self, url, body, headers, redirected=False):
    """POST to the configured server, following one HTTP 451 redirect.

    Args:
        url: Request path.
        body: Request body.
        headers: Header mapping.
        redirected: True when this call is already the retry after a 451
            reply.

    Returns:
        The response object for any status other than 451.

    Raises:
        Exception: When the retried request is answered with 451 again.
    """
    if self.ssl:
        conn = httplib.HTTPSConnection(self.server, self.port)
    else:
        conn = httplib.HTTPConnection(self.server, self.port)
    conn.request("POST", url, body, headers)
    res = conn.getresponse()
    if res.status != 451:
        return res

    # NOTE(review): split() separates on whitespace; confirm the header
    # format really places the new server in the third field.
    self.server = res.getheader("X-MS-Location").split()[2]
    if redirected:
        raise Exception("Redirect loop encountered. Stopping request.")
    # Mark the retry as redirected. Passing False here meant the loop guard
    # above could never trigger and a repeated 451 recursed without bound.
    return self.do_post(url, body, headers, True)
def get_status(self, gps_lat, gps_lon, weather):
    """Query the point-status endpoint for a position.

    The three arguments are concatenated into the query string as given,
    so they must already be strings.

    Returns:
        True when the reply's ``status`` equals "success"; False when not
        connected or when the request fails.
    """
    if not Globals.AirConnected:
        Globals.strPrint (self.thisGlobals,"Not Connected")
        return False

    try:
        self.connection = httplib.HTTPSConnection(Globals.httpsAddr, Globals.httpsPort, timeout=Globals.timeOut)
        self.headers = Globals.xapikey
        query = '/status/v2/point?latitude='+gps_lat+'&longitude='+gps_lon+'&weather='+weather
        self.connection.request('GET', query, '', self.headers)
        result = self.connection.getresponse().read()
        self.status_json = json.loads(result)
        Globals.strPrint (self.thisGlobals,self.status_json)
        return self.status_json['status'] == "success"
    except Exception:
        Globals.strPrint (self.thisGlobals,"Airmap: No Connection or slow connection ->Request Timeout...")
        #Globals.strPrint(self.thisGlobals,str(e))
        return False
def get_SecureToken(self):
    """Retrieve security token and refresh

    :param: None
    :returns: Token if successful otherwise False
    :todo: Remove hardcoded token and add token from https endpoint based on CID
    """
    # NOTE(review): the refresh token and client id below are hard-coded
    # credentials; they should come from configuration.
    delegation_request = {"refresh_token":"ezKrfuSeSD8DA7w2Dq7gqsL10sYuKdVEXA6BIIJLEAJQh","grant_type":"urn:ietf:params:oauth:grant-type:jwt-bearer","client_id":"2iV1XSfdLJNOfZiTZ9JGdrNHtcNzYstt","api_type":"app"}
    try:
        connectAuth0 = httplib.HTTPSConnection(Globals.keyAddr, Globals.httpsPort, timeout=Globals.timeOut)
        headers = Globals.xapikey
        connectAuth0.request('POST', '/delegation', json.dumps(delegation_request), headers)
        reply_body = connectAuth0.getresponse().read()
        Globals.myToken = json.loads(reply_body)['id_token']
        return Globals.myToken
    except:
        print("OAuth2 Error...")
        traceback.print_exc()
        return False
def end_Flight(self, flightID):
    """POST the end request for ``flightID``.

    Returns:
        True when the reply's ``status`` equals "success", otherwise
        False (including when the request fails; the traceback is
        printed).
    """
    connectFlight = None
    try:
        connectFlight = httplib.HTTPSConnection(Globals.httpsAddr, Globals.httpsPort, timeout=Globals.timeOut)
        # Copy the shared header dict: adding Authorization to
        # Globals.xapikey itself leaked the token into every later request
        # that reuses it.
        headers = dict(Globals.xapikey)
        headers['Authorization'] = "Bearer {}".format(Globals.myToken)
        connectFlight.request('POST', '/flight/v2/{}/end'.format(flightID), '', headers)
        result = connectFlight.getresponse().read()
        parsed_json = json.loads(result)
        return parsed_json['status'] == "success"
    except Exception:
        print("End Flight Error...")
        traceback.print_exc()
        return False
    finally:
        if connectFlight is not None:
            connectFlight.close()
def delete_Flight(self, flightID):
    """POST the delete request for ``flightID``.

    Returns:
        True when the reply's ``status`` equals "success", otherwise
        False (including when the request fails; the traceback is
        printed).
    """
    connectFlight = None
    try:
        connectFlight = httplib.HTTPSConnection(Globals.httpsAddr, Globals.httpsPort, timeout=Globals.timeOut)
        # Copy the shared header dict instead of adding Authorization to
        # Globals.xapikey itself.
        headers = dict(Globals.xapikey)
        headers['Authorization'] = "Bearer {}".format(Globals.myToken)
        connectFlight.request('POST', '/flight/v2/{}/delete'.format(flightID), '', headers)
        result = connectFlight.getresponse().read()
        parsed_json = json.loads(result)
        return parsed_json['status'] == "success"
    except Exception:
        # The message used to say "End Flight", copied from end_Flight.
        print("Delete Flight Error...")
        traceback.print_exc()
        return False
    finally:
        if connectFlight is not None:
            connectFlight.close()
def start_comm(self, flightID):
    """POST the start-comm request for ``flightID`` and return the key.

    Returns:
        The base64-decoded value of ``data.key`` from the reply, or None
        when the request or decoding fails (the traceback is printed).
    """
    import base64

    connectFlight = None
    try:
        connectFlight = httplib.HTTPSConnection(Globals.httpsAddr, Globals.httpsPort, timeout=Globals.timeOut)
        # Copy the shared header dict instead of adding Authorization to
        # Globals.xapikey itself.
        headers = dict(Globals.xapikey)
        headers['Authorization'] = "Bearer {}".format(Globals.myToken)
        connectFlight.request('POST', '/flight/v2/{}/start-comm'.format(flightID), '', headers)
        result = connectFlight.getresponse().read()
        parsed_json = json.loads(result)
        print(parsed_json)
        parsed_status = parsed_json['data']['key']
        print("H:" + parsed_status)
        # base64.b64decode works on Python 2 and 3; the 'base64' string
        # codec only exists on Python 2.
        return base64.b64decode(parsed_status)
    except Exception:
        print("Could Not Start Comms...")
        traceback.print_exc()
    finally:
        if connectFlight is not None:
            connectFlight.close()
def recover_Pilot(self):
    """Fetch the pilot profile and store its id in ``Globals.pilot_id``.

    Returns:
        ``Globals.pilot_id`` after a "success" reply; False when the reply
        cannot be parsed or its status is not "success". When the request
        itself fails the traceback is printed and the current
        ``Globals.pilot_id`` is returned.
    """
    connectFlight = None
    try:
        connectFlight = httplib.HTTPSConnection(Globals.httpsAddr, Globals.httpsPort, timeout=Globals.timeOut)
        # Copy the shared header dict instead of adding Authorization to
        # Globals.xapikey itself.
        headers = dict(Globals.xapikey)
        headers['Authorization'] = "Bearer {}".format(Globals.myToken)
        connectFlight.request('GET', '/pilot/v2/profile', "", headers)
        result = connectFlight.getresponse().read()
        try:
            parsed_json = json.loads(result)
            parsed_status = parsed_json['status']
            print(parsed_status)
            Globals.pilot_id = parsed_json['data']['id']
            Globals.pilotIDValid = True
        except Exception:
            Globals.strPrint (self.thisGlobals,"Pilot Recover ID not found...Retry!")
            Globals.strPrint (self.thisGlobals,result)
            return False
        if parsed_status != "success":
            return False
    except Exception:
        # The message used to say "Create Flight", copied from elsewhere.
        print("Recover Pilot Error...")
        traceback.print_exc()
    finally:
        if connectFlight is not None:
            connectFlight.close()
    return Globals.pilot_id
def api_request(command):
    """GET ``/api/<command>/`` from ``target_url`` and return the JSON reply.

    Returns:
        The decoded JSON, or ``{}`` when sending fails, the status is not
        200, or the body is not valid JSON (each case is logged).
    """
    result = {}
    conn = httplib.HTTPSConnection(target_url)

    try:
        conn.request("GET", "/api/" + command + "/")
        sent = True
    except Exception as err:
        logMsg("ERROR api_request(): " + str(err))
        sent = False

    if sent:
        response = conn.getresponse()
        if response.status == 200:
            try:
                result = json.load(response)
            except ValueError as err:
                logMsg("ERROR api_request(): " + str(err))
        else:
            logMsg("ERROR api_request(): " + str(response.status) + " " + response.reason)

    conn.close()
    return result
def info_request(command):
    """GET ``REQUEST_PATH + command + '/'`` and return the ordered JSON reply.

    Returns:
        The decoded JSON with objects as OrderedDict, or None when any
        step fails (the error is logged). The connection is always closed.
    """
    logger.debug("Making api request to " + REQUEST_HOST + REQUEST_PATH + command + '/')
    parsed = None
    conn = None
    try:
        conn = httplib.HTTPSConnection(REQUEST_HOST, timeout=HTTPCON_TIMEOUT)
        conn.request('GET', REQUEST_PATH + command + '/')

        # Pre-process response
        resp = conn.getresponse()
        payload = resp.read()

        # Use OrderedDict so the order of the elements is preserved
        decoded = json.loads(payload, object_pairs_hook=OrderedDict)
        logger.debug("info_request status: " + str(resp.status) + " " + str(resp.reason))
        parsed = decoded
    except Exception as e:
        logger.error("Failed api request: " + str(e))
    finally:
        if conn:
            conn.close()
    return parsed
def https_open(self, req):
    """Open ``req`` by delegating to ``do_open`` with the HTTPS connection class."""
    connection_class = httplib.HTTPSConnection
    return self.do_open(connection_class, req)
def make_connection(self, host):
    """Return an HTTPS connection for ``host``, reusing the cached one.

    ``host`` may be a string or a ``(host, x509-dict)`` tuple; it is
    resolved through ``self.get_host_info``. The new connection is cached
    in ``self._connection`` together with ``host``.

    Raises:
        NotImplementedError: When httplib has no HTTPSConnection.
    """
    cached = self._connection
    if cached and host == cached[0]:
        return cached[1]

    # create a HTTPS connection object from a host descriptor
    https_class = getattr(httplib, 'HTTPSConnection', None)
    if https_class is None:
        raise NotImplementedError(
            "your version of httplib doesn't support HTTPS"
            )

    chost, self._extra_headers, x509 = self.get_host_info(host)
    connection = https_class(chost, None, **(x509 or {}))
    self._connection = host, connection
    return self._connection[1]

##
# Standard server proxy. This class establishes a virtual connection
# to an XML-RPC server.
# <p>
# This class is available as ServerProxy and Server. New code should
# use ServerProxy, to avoid confusion.
#
# @def ServerProxy(uri, **options)
# @param uri The connection point on the server.
# @keyparam transport A transport factory, compatible with the
#    standard transport class.
# @keyparam encoding The default encoding used for 8-bit strings
#    (default is UTF-8).
# @keyparam verbose Use a true value to enable debugging output.
#    (printed to standard output).
# @see Transport
def __init__(self, host, port=None, key_file=None, cert_file=None,
             strict=None, timeout=None, proxy_info=None,
             ca_certs=None, disable_ssl_certificate_validation=False):
    """Initialise the base HTTPS connection and store the extra settings.

    ``timeout``, ``proxy_info``, ``ca_certs`` (defaulting to the
    module-level CA_CERTS) and ``disable_ssl_certificate_validation`` are
    kept as attributes; they are not passed to the base initialiser.
    """
    httplib.HTTPSConnection.__init__(self, host, port=port,
                                     key_file=key_file,
                                     cert_file=cert_file, strict=strict)
    self.timeout = timeout
    self.proxy_info = proxy_info
    self.ca_certs = CA_CERTS if ca_certs is None else ca_certs
    self.disable_ssl_certificate_validation = disable_ssl_certificate_validation

# The following two methods were adapted from https_wrapper.py, released
# with the Google Appengine SDK at
# http://googleappengine.googlecode.com/svn-history/r136/trunk/python/google/appengine/tools/https_wrapper.py
# under the following license:
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
def __init__(self, host, port=None, key_file=None, cert_file=None,
             strict=None, timeout=None, proxy_info=None, ca_certs=None,
             disable_ssl_certificate_validation=False):
    """Initialise the base HTTPS connection and install the fetch function.

    ``self._fetch`` is built by ``_new_fixed_fetch`` with certificate
    validation enabled unless ``disable_ssl_certificate_validation`` is
    set. ``proxy_info`` and ``ca_certs`` are accepted but not used here.
    """
    httplib.HTTPSConnection.__init__(self, host, port=port,
                                     key_file=key_file,
                                     cert_file=cert_file, strict=strict,
                                     timeout=timeout)
    validate_certificate = not disable_ssl_certificate_validation
    self._fetch = _new_fixed_fetch(validate_certificate)

# Update the connection classes to use the Google App Engine specific ones.
def _create_connection(scheme, netloc):
    """Open and return a connection to ``netloc``.

    The 'https' scheme uses HTTPSConnection; any other scheme uses
    HTTPConnection. The connection is established before returning.
    """
    factory = httplib.HTTPSConnection if scheme == 'https' else httplib.HTTPConnection
    connection = factory(netloc)
    connection.connect()
    return connection
def __init__(self, output_spec):
    """
    Uses HTTP chunked transfer-encoding to stream a request body to a
    server. Unfortunately requests does not support hooking into this logic
    easily, so we use the lower-level httplib module.

    ``output_spec`` must contain ``'url'``; ``'method'`` (default POST) and
    ``'headers'`` are optional. The request line and headers are sent
    here and the open connection is stored in ``self.conn``. On failure a
    message is printed, the connection is closed and the exception is
    re-raised.
    """
    super(HttpStreamPushAdapter, self).__init__(output_spec)
    self._closed = False

    parts = urlparse.urlparse(output_spec['url'])
    if parts.scheme == 'https':
        ssl_context = ssl.create_default_context()
        conn = httplib.HTTPSConnection(parts.netloc, context=ssl_context)
    else:
        conn = httplib.HTTPConnection(parts.netloc)

    # Send the full request target: the query string used to be dropped,
    # and a URL without a path produced an empty target.
    target = parts.path or '/'
    if parts.query:
        target += '?' + parts.query

    try:
        conn.putrequest(output_spec.get('method', 'POST').upper(),
                        target, skip_accept_encoding=True)

        for header, value in output_spec.get('headers', {}).items():
            conn.putheader(header, value)

        conn.putheader('Transfer-Encoding', 'chunked')
        conn.endheaders()  # This actually flushes the headers to the server
    except Exception:
        print('HTTP connection to "%s" failed.' % output_spec['url'])
        conn.close()
        raise

    self.conn = conn