我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用httplib.InvalidURL()。
def get_page(self,url): """ loads a webpage into a string """ page = '' try: f = urllib.urlopen(url=url) page = f.read() f.close() except IOError: print "Error opening {}".format(url) except httplib.InvalidURL, e: print "{} caused an Invalid URL error.".format(url) if hasattr(e, 'reason'): print 'We failed to reach a server.' print 'Reason: ', e.reason elif hasattr(e, 'code'): print 'The server couldn\'t fulfill the request.' print 'Error code: ', e.code return page
def parseSitemap(url, retVal=None): global abortedFlag if retVal is not None: logger.debug("parsing sitemap '%s'" % url) try: if retVal is None: abortedFlag = False retVal = oset() try: content = Request.getPage(url=url, raise404=True)[0] if not abortedFlag else "" except httplib.InvalidURL: errMsg = "invalid URL given for sitemap ('%s')" % url raise SqlmapSyntaxException, errMsg for match in re.finditer(r"<loc>\s*([^<]+)", content or ""): if abortedFlag: break url = match.group(1).strip() if url.endswith(".xml") and "sitemap" in url.lower(): if kb.followSitemapRecursion is None: message = "sitemap recursion detected. Do you want to follow? [y/N] " kb.followSitemapRecursion = readInput(message, default='N', boolean=True) if kb.followSitemapRecursion: parseSitemap(url, retVal) else: retVal.add(url) except KeyboardInterrupt: abortedFlag = True warnMsg = "user aborted during sitemap parsing. sqlmap " warnMsg += "will use partial list" logger.warn(warnMsg) return retVal
def parseSitemap(url, retVal=None): global abortedFlag if retVal is not None: logger.debug("parsing sitemap '%s'" % url) try: if retVal is None: abortedFlag = False retVal = oset() try: content = Request.getPage(url=url, raise404=True)[0] if not abortedFlag else "" except httplib.InvalidURL: errMsg = "invalid URL given for sitemap ('%s')" % url raise SqlmapSyntaxException, errMsg for match in re.finditer(r"<loc>\s*([^<]+)", content or ""): if abortedFlag: break url = match.group(1).strip() if url.endswith(".xml") and "sitemap" in url.lower(): if kb.followSitemapRecursion is None: message = "sitemap recursion detected. Do you want to follow? [y/N] " test = readInput(message, default="N") kb.followSitemapRecursion = test[0] in ("y", "Y") if kb.followSitemapRecursion: parseSitemap(url, retVal) else: retVal.add(url) except KeyboardInterrupt: abortedFlag = True warnMsg = "user aborted during sitemap parsing. sqlmap " warnMsg += "will use partial list" logger.warn(warnMsg) return retVal
def get_connection(self): _class = self.connection_class try: return _class(self.endpoint_hostname, self.endpoint_port, **self.connection_kwargs) except httplib.InvalidURL: raise exc.EndpointNotFound
def _openURL2(self): try: if (self._userName and self._userPass): password_mgr = urlconnection.HTTPPasswordMgr() password_mgr.add_password(self._realm, self._url, self._userName, self._userPass) auth_handler = urlconnection.HTTPBasicAuthHandler(password_mgr) opener = urlconnection.build_opener(auth_handler) urlconnection.install_opener(opener) response = urlconnection.urlopen(self._url, timeout=10) if (response.getcode() == 200): byte_responseData = response.read() str_responseData = byte_responseData.decode('UTF-8') self._parseStats(str_responseData) else: #self.dictInterfaceData['status'] = 0 self.dictInterfaceData['msg'] = 'Response status code from haproxy url is :' + str(response.getcode()) except HTTPError as e: #self.dictInterfaceData['status'] = 0 self.dictInterfaceData['msg'] ='Haproxy stats url has HTTP Error '+str(e.code) except URLError as e: #self.dictInterfaceData['status'] = 0 self.dictInterfaceData['msg'] = 'Haproxy stats url has URL Error '+str(e.reason) except InvalidURL as e: #self.dictInterfaceData['status'] = 0 self.dictInterfaceData['msg'] = 'Haproxy stats url is invalid URL' except Exception as e: #self.dictInterfaceData['status'] = 0 self.dictInterfaceData['msg'] = 'Haproxy stats URL error : ' + str(e)
def _openURL3(self): try: if (self._userName and self._userPass): password_mgr = urlconnection.HTTPPasswordMgr() password_mgr.add_password(self._realm, self._url, self._userName, self._userPass) auth_handler = urlconnection.HTTPBasicAuthHandler(password_mgr) opener = urlconnection.build_opener(auth_handler) urlconnection.install_opener(opener) response = urlconnection.urlopen(self._url, timeout=10) if (response.status == 200): byte_responseData = response.read() str_responseData = byte_responseData.decode('UTF-8') self._parseStats(str_responseData) else: #self.dictInterfaceData['status'] = 0 self.dictInterfaceData['msg'] = 'Response status code from haproxy url is :' + str(response.status) except HTTPError as e: #self.dictInterfaceData['status'] = 0 self.dictInterfaceData['msg'] ='Haproxy stats url has HTTP Error '+str(e.code) except URLError as e: #self.dictInterfaceData['status'] = 0 self.dictInterfaceData['msg'] = 'Haproxy stats url has URL Error '+str(e.reason) except InvalidURL as e: #self.dictInterfaceData['status'] = 0 self.dictInterfaceData['msg'] = 'Haproxy stats url is invalid URL' except Exception as e: #self.dictInterfaceData['status'] = 0 self.dictInterfaceData['msg'] = 'Haproxy stats URL error : ' + str(e)
def metricCollector2(self): try: if (self._userName and self._userPass): password_mgr = urllib2.HTTPPasswordMgr() password_mgr.add_password(self._realm, self._url, self._userName, self._userPass) auth_handler = urllib2.HTTPBasicAuthHandler(password_mgr) opener = urllib2.build_opener(auth_handler) urllib2.install_opener(opener) response = urllib2.urlopen(self._url, timeout=10) if response.getcode() == 200: byte_responseData = response.read() str_responseData = byte_responseData.decode('UTF-8') self._parseStats(str_responseData) else: self.dictApacheData['status'] = 0 self.dictApacheData['msg'] = 'Error_code' + str(response.getcode()) except HTTPError as e: self.dictApacheData['status'] = 0 self.dictApacheData['msg'] = 'Error_code : HTTP Error ' + str(e.code) except URLError as e: self.dictApacheData['status'] = 0 self.dictApacheData['msg'] = 'Error_code : URL Error ' + str(e.reason) except InvalidURL as e: self.dictApacheData['status'] = 0 self.dictApacheData['msg'] = 'Error_code : Invalid URL' except Exception as e: self.dictApacheData['status'] = 0 self.dictApacheData['msg'] = 'Exception occured in collecting data : ' + str(e)
def metricCollector3(self): try: if (self._userName and self._userPass): password_mgr = urlconnection.HTTPPasswordMgr() password_mgr.add_password(self._realm, self._url, self._userName, self._userPass) auth_handler = urlconnection.HTTPBasicAuthHandler(password_mgr) opener = urlconnection.build_opener(auth_handler) urlconnection.install_opener(opener) response = urlconnection.urlopen(self._url, timeout=10) if response.status == 200: byte_responseData = response.read() str_responseData = byte_responseData.decode('UTF-8') self._parseStats(str_responseData) else: self.dictApacheData['status'] = 0 self.dictApacheData['msg'] = 'Error_code' + str(response.status) except HTTPError as e: self.dictApacheData['status'] = 0 self.dictApacheData['msg'] = 'Error_code : HTTP Error ' + str(e.code) except URLError as e: self.dictApacheData['status'] = 0 self.dictApacheData['msg'] = 'Error_code : URL Error ' + str(e.reason) except InvalidURL as e: self.dictApacheData['status'] = 0 self.dictApacheData['msg'] = 'Error_code : Invalid URL' except Exception as e: self.dictApacheData['status'] = 0 self.dictApacheData['msg'] = 'Exception occured in collecting data : ' + str(e)
def _openURL2(self,str_URLsuffix): str_responseData = None url = None try: url = self._url + str_URLsuffix password_mgr = urlconnection.HTTPPasswordMgr() password_mgr.add_password(None, url, self._userName, self._userPass) auth_handler = urlconnection.HTTPBasicAuthHandler(password_mgr) proxy = urlconnection.ProxyHandler({}) # Uses NO Proxy opener = urlconnection.build_opener(proxy, auth_handler) urlconnection.install_opener(opener) response = urlconnection.urlopen(url, timeout = 5) if response.getcode() == 200: byte_responseData = response.read() str_responseData = byte_responseData.decode('UTF-8') else: self.dictEsPluginData['status'] = '0' self.dictEsPluginData['msg'] = 'Invalid response after opening URL : ' + str(response.getcode()) except HTTPError as e: self.dictEsPluginData['status'] = '0' self.dictEsPluginData['msg'] ='HTTP Error '+str(e.code) except URLError as e: self.dictEsPluginData['status'] = '0' self.dictEsPluginData['msg'] = 'URL Error '+str(e.reason) except InvalidURL as e: self.dictEsPluginData['status'] = '0' self.dictEsPluginData['msg'] = 'Invalid URL' except Exception as e: self.dictEsPluginData['status'] = '0' self.dictEsPluginData['msg'] = 'Exception while opening stats url in python 2 : ' + str(e) finally: return str_responseData
def set_server(my, server_name): '''Function: set_server(server_name) Set the server name for this XML-RPC server''' my.server_name = server_name if my.protocol == "local": from pyasm.prod.service import ApiXMLRPC my.server = ApiXMLRPC() my.server.set_protocol('local') my.has_server = True return if (my.server_name.startswith("http://") or my.server_name.startswith("https://")): url = "%s/tactic/default/Api/" % my.server_name else: url = "http://%s/tactic/default/Api/" % my.server_name #url = "http://localhost:8081/" # TODO: Not implmeneted: This is needed for isolation of transactions #if my.transaction_ticket: # url = '%s%s' % (url, my.transaction_ticket) if my.transport: my.server = xmlrpclib.Server(url, allow_none=True, transport=my.transport) else: my.server = xmlrpclib.Server(url, allow_none=True) try: pass #print my.server.test(my.ticket) except httplib.InvalidURL: raise TacticApiException("You have supplied an invalid server name [%s]" % my.server_name) my.has_server = True # WARNING: this is changing code in the xmlrpclib library. This # library is not sending a proper user agent. Hacking it in # so that at least the OS is sent if os.name == "nt": user_agent = 'xmlrpclib.py (Windows)' else: user_agent = 'xmlrpclib.py (Linux)' xmlrpclib.Transport.user_agent = user_agent