我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用socket.gaierror()。
def get_ips_from_url(url):
    """
    Resolve the host named in a URL to its IP addresses.

    :param str url: The url to resolve
    :rtype: list
    :return: the list of resolved IP addresses for the URL's host; an empty
        list when the URL has no host or the lookup fails
    """
    try:
        hostname = urlparse(url).hostname
        if not hostname:
            return []
        # NOTE: this sets the process-wide default socket timeout and does
        # not restore the previous value.
        socket.setdefaulttimeout(5)
        return socket.gethostbyname_ex(hostname)[2]
    except (ValueError, socket.error, socket.gaierror, socket.herror,
            socket.timeout):
        # Malformed URL or failed lookup: report "nothing resolved".
        return []
def request(self, method, url, body, headers):
    """Issue the request through ``fetch`` and store the result on ``self.response``.

    The response headers are wrapped in a ``ResponseDict`` which also gets
    ``'status'`` / ``'reason'`` entries, a ``status`` attribute and a
    ``read`` callable returning the response content.

    Raises ``socket.gaierror`` when ``fetch`` raises ``InvalidURLError`` and
    ``httplib.HTTPException`` when it raises ``DownloadError``,
    ``ResponseTooLargeError`` or ``SSLCertificateError``.
    """
    # fetch requires an absolute URI, so rebuild it from scheme/host/port.
    if self.port:
        netloc = '%s:%s' % (self.host, self.port)
    else:
        netloc = self.host
    absolute_uri = '%s://%s%s' % (self.scheme, netloc, url)

    try:
        response = fetch(
            absolute_uri,
            payload=body,
            method=method,
            headers=headers,
            allow_truncated=False,
            follow_redirects=False,
            deadline=self.timeout,
            validate_certificate=self.validate_certificate,
        )
        self.response = ResponseDict(response.headers)
        status_code = response.status_code
        self.response['status'] = str(status_code)
        self.response['reason'] = httplib.responses.get(status_code, 'Ok')
        self.response.status = status_code
        self.response.read = lambda: response.content
    # Translate fetch errors into the exceptions callers expect.
    except InvalidURLError:
        raise socket.gaierror('')
    except (DownloadError, ResponseTooLargeError, SSLCertificateError):
        raise httplib.HTTPException()
def gethostbyname2(self, hostname):
    """Return the list of addresses for ``hostname``, using ``self.dns_cache``.

    A cache hit is returned directly. Otherwise a dotted-quad string or any
    string containing ``:`` is treated as a literal address; if
    ``self.dns_servers`` is set the name is resolved through the dnslib
    helpers (UDP first, TCP when UDP raises ``socket.gaierror``); otherwise
    ``socket.gethostbyname_ex`` is used. The result is stored in the cache.
    """
    try:
        return self.dns_cache[hostname]
    except KeyError:
        pass

    is_literal = re.match(r'^\d+\.\d+\.\d+\.\d+$', hostname) or ':' in hostname
    if is_literal:
        addresses = [hostname]
    elif self.dns_servers:
        try:
            record = dnslib_resolve_over_udp(
                hostname, self.dns_servers, timeout=2,
                blacklist=self.dns_blacklist)
        except socket.gaierror:
            record = dnslib_resolve_over_tcp(
                hostname, self.dns_servers, timeout=2,
                blacklist=self.dns_blacklist)
        addresses = dnslib_record2iplist(record)
    else:
        addresses = socket.gethostbyname_ex(hostname)[-1]

    self.dns_cache[hostname] = addresses
    return addresses
def choose_boundary():
    """Return a string usable as a multipart boundary.

    The result combines a per-process prefix (host address, user id and
    process id, each with a fallback when unavailable), the current time and
    the value of ``_get_next_counter()``. The prefix is computed once and
    kept in the module-level ``_prefix``.

    The boundary contains dots, so it has to be quoted in the header.
    """
    global _prefix
    import time
    if _prefix is None:
        import socket
        try:
            host_ip = socket.gethostbyname(socket.gethostname())
        except socket.gaierror:
            host_ip = '127.0.0.1'
        try:
            user_id = repr(os.getuid())
        except AttributeError:
            # os.getuid is missing here; use a fixed placeholder.
            user_id = '1'
        try:
            proc_id = repr(os.getpid())
        except AttributeError:
            proc_id = '1'
        _prefix = '.'.join([host_ip, user_id, proc_id])
    stamp = time.time()
    return "%s.%.3f.%d" % (_prefix, stamp, _get_next_counter())


# Subroutines for decoding some common content-transfer-types
def select_ip_version(host, port):
    """Return the socket address family to use for ``host``.

    ``socket.AF_INET6`` is returned when ``host`` contains a colon and the
    ``socket`` module exposes ``AF_INET6``; otherwise ``socket.AF_INET``.
    ``port`` is accepted but not used.
    """
    # A getaddrinfo()-based detection used to live here but was disabled due
    # to problems with IPv6 implementations on various operating systems, so
    # only this textual check on the host remains.
    has_colon = ':' in host
    if has_colon and hasattr(socket, 'AF_INET6'):
        return socket.AF_INET6
    return socket.AF_INET
def resolve(self, host, port, family):
    """Return list of (family, address) pairs.

    Must be called on a child greenlet: it starts ``self.resolver.resolve``
    with this greenlet's ``switch`` as the callback, then switches to the
    parent greenlet and returns whatever value is passed back in.

    Exceptions raised within the stack context are re-thrown on the calling
    greenlet through ``self.io_loop``; ``DomainError`` (when that name is
    set) is converted to ``socket.gaierror`` first.
    """
    child_gr = greenlet.getcurrent()
    main = child_gr.parent
    assert main is not None, "Should be on child greenlet"

    def handler(exc_typ, exc_val, exc_tb):
        # If netutil.Resolver is configured to use TwistedResolver.
        if DomainError and issubclass(exc_typ, DomainError):
            exc_typ = socket.gaierror
            exc_val = socket.gaierror(str(exc_val))

        # Depending on the resolver implementation, we could be on any
        # thread or greenlet. Return to the loop's thread and raise the
        # exception on the calling greenlet from there.
        self.io_loop.add_callback(functools.partial(
            child_gr.throw, exc_typ, exc_val, exc_tb))

        return True  # Don't propagate the exception.

    with stack_context.ExceptionStackContext(handler):
        self.resolver.resolve(host, port, family, callback=child_gr.switch)

    # Yield to the parent greenlet until the callback (or the handler's
    # throw) switches back into this greenlet.
    return main.switch()
def test_ipv6(self):
    """Fetch from a server bound to ``::1`` with and without ``allow_ipv6``.

    The test returns early (effectively skipped) when binding fails with
    ``EAI_ADDRFAMILY``.
    """
    try:
        (listener,) = bind_sockets(None, '::1', family=socket.AF_INET6)
        port = listener.getsockname()[1]
        self.http_server.add_socket(listener)
    except socket.gaierror as err:
        if err.args[0] != socket.EAI_ADDRFAMILY:
            raise
        # python supports ipv6, but it's not configured on the network
        # interface, so skip this test.
        return
    url = '%s://[::1]:%d/hello' % (self.get_protocol(), port)

    # ipv6 is currently enabled by default but can be disabled
    self.http_client.fetch(url, self.stop, allow_ipv6=False)
    refused = self.wait()
    self.assertEqual(refused.code, 599)

    self.http_client.fetch(url, self.stop)
    served = self.wait()
    self.assertEqual(served.body, b"Hello world!")
def is_valid_ip(ip):
    """Returns true if the given string is a well-formed IP address.

    Supports IPv4 and IPv6.

    Returns False for empty input, input containing a NUL character, input
    that ``getaddrinfo`` rejects with ``EAI_NONAME`` and input that makes
    ``getaddrinfo`` raise ``UnicodeError``. Any other ``socket.gaierror`` is
    re-raised.
    """
    if not ip or '\x00' in ip:
        # getaddrinfo resolves empty strings to localhost, and truncates
        # on zero bytes.
        return False
    try:
        res = socket.getaddrinfo(ip, 0, socket.AF_UNSPEC,
                                 socket.SOCK_STREAM,
                                 0, socket.AI_NUMERICHOST)
    except socket.gaierror as e:
        if e.args[0] == socket.EAI_NONAME:
            return False
        raise
    except UnicodeError:
        # The idna codec can reject over-long labels before getaddrinfo
        # even looks at AI_NUMERICHOST; such input is not an IP address.
        return False
    return bool(res)
def run(self):
    """Try one POP3 login using the next word returned by ``getword()``.

    Relies on names defined outside this block: ``getword``, ``user``,
    ``ipaddr`` and ``work``. On a successful login it prints the mailbox
    status, then calls ``work.join()`` and ``sys.exit(2)``. Protocol and
    socket errors are silently ignored.

    NOTE(review): Python 2 syntax (print statement, ``except ..., msg``).
    NOTE(review): this performs credential guessing against a mail server;
    it must only ever be run against systems you are authorized to test.
    """
    value = getword()
    try:
        print "-"*12
        print "User:",user[:-1],"Password:",value
        pop = poplib.POP3(ipaddr[0])
        # user[:-1] drops the last character of ``user`` (presumably a
        # trailing newline -- TODO confirm against where ``user`` is read).
        pop.user(user[:-1])
        pop.pass_(value)
        print "\t\nLogin successful:",value, user
        print pop.stat()
        pop.quit()
        work.join()
        sys.exit(2)
    except(poplib.error_proto, socket.gaierror, socket.error, socket.herror), msg:
        #print "An error occurred:", msg
        pass
def sendchk(listindex, host, user, password):   # separated function for checking
    """Log in to the SMTP server at ``host`` and send one message.

    Relies on names defined outside this block: ``fromaddr``, ``toaddr``,
    ``message``, ``nList`` and ``output``. On success the
    ``host:user:password`` line is appended to ``nList`` and to the file
    named by ``output``, and a server entry is appended to ``AMSlist.txt``.
    Socket and SMTP errors are reported as a failed login.
    ``listindex`` is not used in this block.

    NOTE(review): Python 2 syntax (print statement, ``except ..., msg``).
    NOTE(review): ``resp`` is not defined in this block and
    ``SMTPHeloError`` is referenced unqualified -- the ``raise`` below looks
    like it would fail with NameError unless both are defined elsewhere.
    NOTE(review): this tests credentials against a mail server; it must
    only ever be run against systems you are authorized to test.
    """
    try:
        smtp = smtplib.SMTP(host)
        smtp.login(user, password)
        code = smtp.ehlo()[0]
        # Fall back to HELO when EHLO does not return a 2xx code.
        if not (200 <= code <= 299):
            code = smtp.helo()[0]
            if not (200 <= code <= 299):
                raise SMTPHeloError(code, resp)
        smtp.sendmail(fromaddr, toaddr, message)
        print "\n\t[!] Email Sent Successfully:",host, user, password
        print "\t[!] Message Sent Successfully\n"
        LSstring = host+":"+user+":"+password+"\n"
        nList.append(LSstring)      # special list for AMS file ID's
        LFile = open(output, "a")
        LFile.write(LSstring)       # save working host/usr/pass to file
        LFile.close()
        AMSout = open("AMSlist.txt", "a")
        AMSout.write("[Server"+str(nList.index(LSstring))+"]\nName="+str(host)+"\nPort=25\nUserID=User\nBccSize=50\nUserName="+str(user)+"\nPassword="+str(password)+"\nAuthType=0\n\n")
        # NOTE(review): AMSout is never closed in this block.
        smtp.quit()
    except(socket.gaierror, socket.error, socket.herror, smtplib.SMTPException), msg:
        print "[-] Login Failed:", host, user, password
        pass
def test(host):
    """Fetch ``host`` over HTTP and report whether ``MATCH`` occurs in the page.

    Relies on names defined outside this block: ``verbose``, ``s``,
    ``CHAN`` and ``MATCH``. Results are sent as ``PRIVMSG`` lines on ``s``;
    a match is also appended to ``foundsqli.txt``.

    NOTE(review): Python 2 syntax (``urllib2``, ``except ..., msg``).
    NOTE(review): the final bare ``except`` silently discards every other
    error, including KeyboardInterrupt and SystemExit.
    NOTE(review): this probes third-party hosts; it must only ever be run
    against systems you are authorized to test.
    """
    # Sets the process-wide default socket timeout to 5 seconds.
    socket.setdefaulttimeout(5)
    if int(verbose) == 1:
        s.send("PRIVMSG %s :%s%s\r\n" % (CHAN, "[+] Testing:", host))
    try:
        # Prefix the scheme when the host string does not start with it.
        if host[:7] != "http://":
            host = "http://"+host
        source = urllib2.urlopen(host).read()
        if re.search(MATCH, source):
            s.send("PRIVMSG %s :%s%s\r\n" % (CHAN, "[!] Found:", host))
            file = open("foundsqli.txt", "a")
            file.write("\n[!] Found: "+host)
            file.close()
        else:
            if int(verbose) == 1:
                s.send("PRIVMSG %s :%s%s\r\n" % (CHAN, "[-] Not Vuln:", host))
    except(socket.gaierror, socket.timeout, socket.error), msg:
        s.send("PRIVMSG %s :%s%s @ %s\r\n" % (CHAN, "[-] Error: ",msg, host))
    except:
        pass
def isProgramURL(url: str, acceptAll=True) -> bool:
    """
    Whether the given url is a program URL.

    Returns True when ``config.domains`` is empty. Otherwise the URL's host
    (lower-cased, port stripped) must end with one of the entries of
    ``config.domains`` (which must be a list) and must not resolve to
    ``127.0.0.1``; a host that cannot be resolved still counts as a match.

    ``acceptAll`` is kept for backward compatibility. It never influenced
    the result: whenever ``config.domains`` is non-empty the function
    returned before the ``acceptAll`` check could be reached.
    """
    domain = urlparse(url).netloc.split(':')[0].lower()
    if not config.domains:
        return True
    if not (domain and isinstance(config.domains, list)):
        return False
    if not any(domain.endswith(hostname.lower())
               for hostname in config.domains):
        return False
    # Resolve only once the suffix matched; the address is needed solely to
    # reject hosts that resolve to 127.0.0.1.
    try:
        ip = socket.gethostbyname(domain)
    except (socket.gaierror, UnicodeError):
        ip = None
    return ip != '127.0.0.1'
def verbose_ping(dest_addr, timeout = 2, count = 5):
    """
    Send up to >count< pings to >dest_addr< with the given >timeout<
    and display the result.

    Returns "Up" as soon as one ping gets a reply. Otherwise returns
    "Down" if at least one ping timed out, or "socketError" if no ping
    timed out (for example when the first attempt raised socket.gaierror).

    NOTE(review): Python 2 syntax (print statement, xrange,
    ``except ..., e`` and indexing the exception with ``e[1]``).
    """
    Stats = "socketError"
    for i in xrange(count):
        # Trailing comma: keep the result on the same output line.
        print "ping %s..." % dest_addr,
        try:
            delay = do_one(dest_addr, timeout)
        except socket.gaierror, e:
            print "failed. (socket error: '%s')" % e[1]
            break
        if delay == None:
            print "failed. (timeout within %ssec.)" % timeout
            Stats = "Down"
        else:
            # Scale by 1000 for the "ms" display below.
            delay = delay * 1000
            print "get ping in %0.4fms" % delay
            return "Up"
    print
    return Stats
def get_camera_details(self):
    """
    Return the camera configuration stored under the ROS parameter
    /robot_config/camera_config.

    @rtype: dict
    @return: the value of the parameter; an empty dict when the parameter
             is missing or a socket error occurs while reading it
    """
    param_name = "/robot_config/camera_config"
    try:
        return rospy.get_param(param_name)
    except KeyError:
        rospy.logerr("RobotParam:get_camera_details cannot detect any "
                     "cameras under the parameter {0}".format(param_name))
    except (socket.error, socket.gaierror):
        self._log_networking_error()
    return dict()
def get_robot_assemblies(self):
    """
    Return the robot's assembly names stored under the ROS parameter
    /robot_config/assembly_names.

    @rtype: list [str]
    @return: the value of the parameter (e.g. right, left, torso, head);
             an empty list when the parameter is missing or a socket error
             occurs while reading it
    """
    try:
        return rospy.get_param("/robot_config/assembly_names")
    except KeyError:
        rospy.logerr("RobotParam:get_robot_assemblies cannot detect assembly names"
                     " under param /robot_config/assembly_names")
    except (socket.error, socket.gaierror):
        self._log_networking_error()
    return list()
def get_joint_names(self, limb_name):
    """
    Return the joint names stored under the ROS parameter
    robot_config/<limb_name>_config/joint_names.

    @type  limb_name: str
    @param limb_name: name of the limb for which to retrieve joint names

    @rtype: list [str]
    @return: the value of the parameter; an empty list when the parameter
             is missing or a socket error occurs while reading it
    """
    param_name = "robot_config/{0}_config/joint_names".format(limb_name)
    try:
        return rospy.get_param(param_name)
    except KeyError:
        rospy.logerr(("RobotParam:get_joint_names cannot detect joint_names for"
                      " arm \"{0}\"").format(limb_name))
    except (socket.error, socket.gaierror):
        self._log_networking_error()
    return list()
def get_robot_name(self):
    """
    Return the robot class name stored under the ROS parameter
    /manifest/robot_class.

    @rtype: str
    @return: the value of the parameter (eg. "sawyer", "baxter", etc.);
             None when the parameter is missing or a socket error occurs
             while reading it
    """
    try:
        return rospy.get_param("/manifest/robot_class")
    except KeyError:
        rospy.logerr("RobotParam:get_robot_name cannot detect robot name"
                     " under param /manifest/robot_class")
    except (socket.error, socket.gaierror):
        self._log_networking_error()
    return None
def select_ip_version(host, port):
    """Pick ``socket.AF_INET6`` or ``socket.AF_INET`` for ``host``.

    The choice is purely textual: a colon in ``host`` selects
    ``AF_INET6`` when the ``socket`` module provides it. ``port`` is
    accepted but not used.
    """
    # The getaddrinfo()-based variant was disabled due to problems with
    # IPv6 implementations on various operating systems.
    use_ipv6 = ':' in host and hasattr(socket, 'AF_INET6')
    return socket.AF_INET6 if use_ipv6 else socket.AF_INET
def resolve_dns(self, shost):
    """Resolve ``shost`` to an IPv4 address string, caching lookups.

    A string that already is a dotted-quad address is returned unchanged.
    Otherwise the result comes from ``self.dnscache`` when present, or from
    ``socket.gethostbyname`` (and is then stored in the cache). When the
    lookup fails, ``compiler_bailout`` is called; if that call returns, the
    unresolved ``shost`` is returned.
    """
    rdns = r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$"
    if re.match(rdns, shost) is not None:
        # Already an IPv4 literal; nothing to resolve.
        return shost

    if shost in self.dnscache:
        logging.debug("Host %s in DNSCACHE, returned %s", shost, self.dnscache[shost])
        return self.dnscache[shost]

    logging.debug("Host %s not in DNSCACHE", shost)
    # Only the lookup itself can raise gaierror, so keep the try minimal.
    try:
        resolved_ip = socket.gethostbyname(shost)
    except socket.gaierror:
        compiler_bailout("Cannot resolve %s" % shost)
        return shost
    self.dnscache[shost] = resolved_ip
    logging.debug("Resolved %s to %s", shost, resolved_ip)
    return resolved_ip
def start(self):
    """Write the formatted output for stdin lines or for the given hosts.

    With no hosts, every line from ``self.gen_std_line()`` is echoed; when
    ``strings.REG_IP`` matches, the formatted match is appended after a tab.
    With hosts and a format, only the first host is formatted (a resolution
    failure is reported on stderr). With hosts and no format, the output of
    ``self._get_output`` is written for every host.
    """
    options = self._args

    if len(options.hosts) == 0:
        # read from stdin
        for line in self.gen_std_line():
            found = re.search(strings.REG_IP, line)
            fmt = options.format if options.format else '%C %s %c'
            if found:
                write(line + "\t" + self._format(found.group(0), fmt))
            else:
                write(line)
            write("\n")
        return

    if options.format:
        first_host = options.hosts[0]
        try:
            write(self._format(first_host, options.format))
        except socket.gaierror:
            sys.stderr.write("Can't resolve name: " + options.hosts[0])
        return

    for host in options.hosts:
        # Detailed output separates entries with a blank line.
        separator = "\n\n" if options.detail else "\n"
        write(separator.join(self._get_output(host)))
        write("\n")
def _connect(self): try: self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) if self._broadcast: # self._sock.bind((self._broadcast_interface, self._port)) self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) return self._sock except socket.gaierror: error = "Unable to connect to or resolve host: {}".format( self._host) log.error(error) raise IOError(error) # Push new data to strand
def process(input_string):
    """
    minimal chat bot

    Sends ``input_string`` as the query of the module-level ``request`` and
    echoes the ``result.fulfillment.speech`` field of the JSON reply. Exits
    with status 1 when reading the response raises ``socket.gaierror``.

    :param input_string: text to send as the query
    """
    request.query = input_string
    try:
        raw_reply = request.getresponse().read()
    except socket.gaierror:
        # if the user is not connected to internet dont give a response
        click.echo('Yoda cannot sense the internet right now!')
        sys.exit(1)

    answer = json.loads(raw_reply)["result"]["fulfillment"]["speech"]
    chalk.blue('Yoda speaks:')
    click.echo(answer)
def get_host_ip(hostIP=None):
    """Return the host identifier to advertise for this machine.

    ``hostIP`` selects the strategy:

    * ``None`` or ``'auto'`` -- same as ``'ip'``;
    * ``'dns'`` -- return ``socket.getfqdn()``;
    * ``'ip'`` -- resolve the FQDN (falling back to the plain hostname) to
      an address; if that address starts with ``127.``, use the local
      address of a UDP socket connected towards ``10.255.255.255`` instead;
    * anything else -- returned unchanged.
    """
    if hostIP is None or hostIP == 'auto':
        hostIP = 'ip'

    if hostIP == 'dns':
        hostIP = socket.getfqdn()
    elif hostIP == 'ip':
        try:
            hostIP = socket.gethostbyname(socket.getfqdn())
        except socket.gaierror:
            logging.warning('gethostbyname(socket.getfqdn()) failed... trying on hostname()')
            hostIP = socket.gethostbyname(socket.gethostname())
        if hostIP.startswith("127."):
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                # doesn't have to be reachable
                s.connect(('10.255.255.255', 1))
                hostIP = s.getsockname()[0]
            finally:
                # Always release the probe socket, even if connect() fails.
                s.close()
    return hostIP
def ping(self):
    """
    Run the ping process.

    Calls ``self.ping_once()`` up to ``self.count`` times and prints the
    outcome of each attempt: the delay in milliseconds, a timeout message
    when ``ping_once`` returns None, or the socket error message. The loop
    stops at the first ``socket.gaierror``.
    """
    for i in range(self.count):
        print("Ping to %s..." % self.target_host)
        try:
            delay = self.ping_once()
        except socket.gaierror as e:
            # Exceptions are not subscriptable on Python 3; read the
            # message from args (errno, message) instead of e[1].
            reason = e.args[1] if len(e.args) > 1 else e
            print("Ping failed. (socket error: '%s')" % reason)
            break
        if delay is None:
            print("Ping failed. (timeout within %ssec.)" % self.timeout)
        else:
            # Scale by 1000 for the "ms" display.
            delay = delay * 1000
            print("Get pong in %0.4fms" % delay)
def run():
    """Send ``HEAD /index.html`` to the configured target and print its headers.

    Reads ``variables['timeout'][0]`` (used as the process-wide default
    socket timeout) and ``variables['target'][0]``.

    Returns the list of response headers on success, a ``ModuleError`` for
    an invalid timeout, an unresolvable name or a timeout, and the string
    ``"invalid url"`` for an invalid URL.
    """
    try:
        socket.setdefaulttimeout(float(variables['timeout'][0]))
    except ValueError:
        printError('invalid timeout')
        return ModuleError("invalid timeout")

    conn = None
    try:
        conn = http.client.HTTPConnection(variables['target'][0])
        conn.request("HEAD", "/index.html")
        res = conn.getresponse()
        results = res.getheaders()
        print('')
        for item in results:
            print(colors.yellow+item[0], item[1]+colors.end)
        print('')
        return results
    except http.client.InvalidURL:
        printError('invalid url')
        # NOTE(review): unlike the other error paths this returns a plain
        # string rather than a ModuleError; kept as-is for callers.
        return ("invalid url")
    except socket.gaierror:
        printError('name or service not known')
        return ModuleError("name or service not known")
    except socket.timeout:
        printError('timeout')
        return ModuleError("timeout")
    finally:
        # Release the connection on every path; it was previously leaked.
        if conn is not None:
            conn.close()
def __init__(self, host='', port=0, local_hostname=None,
             timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
    """Initialize a new instance.

    If `host' is given, connect to it right away (passing `port' to
    ``self.connect``); an SMTPConnectError is raised when the reply code is
    not 220.

    If `local_hostname' is given it is stored as the local host name.
    Otherwise ``socket.getfqdn()`` is used when it contains a dot, and a
    domain literal of the form ``[A.B.C.D]`` built from the local address
    (``127.0.0.1`` if that lookup fails) is used when it does not.
    """
    self.timeout = timeout
    self.esmtp_features = {}

    if host:
        code, msg = self.connect(host, port)
        if code != 220:
            raise SMTPConnectError(code, msg)

    if local_hostname is not None:
        self.local_hostname = local_hostname
        return

    # RFC 2821 says we should use the fqdn in the EHLO/HELO verb, and
    # if that can't be calculated, that we should use a domain literal
    # instead (essentially an encoded IP address like [A.B.C.D]).
    fqdn = socket.getfqdn()
    if '.' in fqdn:
        self.local_hostname = fqdn
        return

    # We can't find an fqdn hostname, so use a domain literal
    try:
        address = socket.gethostbyname(socket.gethostname())
    except socket.gaierror:
        address = '127.0.0.1'
    self.local_hostname = '[%s]' % address
def _safe_gethostbyname(host): try: return socket.gethostbyname(host) except socket.gaierror: return None
def get_names(self):
    """Return the tuple of addresses cached on ``FileHandler.names``.

    On first use the tuple is built from the addresses of ``'localhost'``
    and of the local hostname; when that raises ``socket.gaierror`` it
    holds only the address of ``'localhost'``.
    """
    if FileHandler.names is None:
        try:
            loopback_addrs = socket.gethostbyname_ex('localhost')[2]
            own_addrs = socket.gethostbyname_ex(socket.gethostname())[2]
            FileHandler.names = tuple(loopback_addrs + own_addrs)
        except socket.gaierror:
            FileHandler.names = (socket.gethostbyname('localhost'),)
    return FileHandler.names

# not entirely sure what the rules are here
def start(cdxjFilePath=INDEX_FILE, proxy=None):
    """Configure and run the replay application.

    Makes sure a replay host/port configuration exists, records the index
    path when ``ipwbConfig.isDaemonAlive()`` is true, then runs ``app`` on
    ``0.0.0.0`` and ``IPWBREPLAY_PORT``. On ``gaierror`` the app is run
    with its defaults; on ``socketerror`` the process exits.
    """
    replay_config = ipwbConfig.getIPWBReplayConfig()
    app.proxy = proxy

    if not replay_config:
        ipwbConfig.setIPWBReplayConfig(IPWBREPLAY_IP, IPWBREPLAY_PORT)
        replay_config = ipwbConfig.getIPWBReplayConfig()

    if not ipwbConfig.isDaemonAlive():
        print('Sample data not pulled from IPFS.')
        print('Check that the IPFS daemon is running.')
    else:
        if cdxjFilePath == INDEX_FILE:
            ipwbConfig.firstRun()
        ipwbConfig.setIPWBReplayIndexPath(cdxjFilePath)
        app.cdxjFilePath = cdxjFilePath

    try:
        banner = 'IPWB replay started on http://{0}:{1}'.format(
            IPWBREPLAY_IP, IPWBREPLAY_PORT
        )
        print(banner)
        app.run(host='0.0.0.0', port=IPWBREPLAY_PORT)
    except gaierror:
        print('Detected no active Internet connection.')
        print('Overriding to use default IP and port configuration.')
        app.run()
    except socketerror:
        print('Address {0}:{1} already in use!'.format(
            IPWBREPLAY_IP, IPWBREPLAY_PORT))
        sys.exit()
def try_address(fqdn):
    """
    Check if the fqdn is valid

    Args:
        fqdn (str): fully qualified domain name

    Returns:
        bool: True when ``socket.gethostbyname_ex`` resolves the name,
        False when it raises ``socket.gaierror`` or ``UnicodeEncodeError``.
    """
    import socket
    try:
        socket.gethostbyname_ex(fqdn)
        return True
    except (socket.gaierror, UnicodeEncodeError):
        return False
def __dns_resolve_host(host, ip_version, timeout): """ Resolve a host using the system's facilities """ family = socket.AF_INET if ip_version == 4 else socket.AF_INET6 def proc(host, family, queue): try: queue.put(socket.getaddrinfo(host, 0, family)) except socket.gaierror as ex: # TODO: Would be nice if we could isolate just the not # found error. queue.put([]) except socket.timeout: # Don't care, we just want the queue to be empty if # there's an error. pass queue = Queue.Queue() thread = threading.Thread(target=proc, args=(host, family, queue)) thread.setDaemon(True) thread.start() try: results = queue.get(True, timeout) if len(results) == 0: return None family, socktype, proto, canonname, sockaddr = results[0] except Queue.Empty: return None # NOTE: Don't make any attempt to kill the thread, as it will get # Python all confused if it holds the GIL. (ip) = sockaddr return str(ip[0])