我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用socket.AI_ADDRCONFIG。
def _connect_tcp(self, host, port): try: flags = socket.AI_ADDRCONFIG except AttributeError: flags = 0 err = None for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, flags): af, socktype, proto, canonname, sa = res sock = None try: sock = socket.socket(af, socktype, proto) sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) sock.settimeout(self.timeout) sock.connect(sa) return sock except socket.error as e: err = e if sock is not None: sock.close() if err is not None: raise err else: raise ConnectionError("getaddrinfo returns an empty list")
def connect(self, host, port, timeout): for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_ADDRCONFIG): family, socket_type, proto, canon_name, sa = res sock = None try: sock = socket.socket(family, socket_type, proto) sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) sock.settimeout(timeout) sock.connect(sa) return sock except socket.error as e: if sock is not None: sock.close() if e is not None: raise e else: raise OSError('Socket: getaddrinfo returns an empty list')
def test_AI_ADDRCONFIG(self): # When the users sets AI_ADDRCONFIG but only has an IPv4 # address configured we will iterate over the results, but the # call for the IPv6 address will fail rather then return an # empty list. In that case we should catch the exception and # only return the ones which worked. def getaddrinfo(addr, port, family, socktype, proto, aiflags): if addr == '127.0.0.1': return [(socket.AF_INET, 1, 0, '', ('127.0.0.1', 0))] elif addr == '::1' and aiflags & socket.AI_ADDRCONFIG: raise socket.error(socket.EAI_ADDRFAMILY, 'Address family for hostname not supported') elif addr == '::1' and not aiflags & socket.AI_ADDRCONFIG: return [(socket.AF_INET6, 1, 0, '', ('::1', 0, 0, 0))] greendns.socket.getaddrinfo = getaddrinfo greendns.resolve = _make_mock_resolve() greendns.resolve.add('localhost', '127.0.0.1') greendns.resolve.add('localhost', '::1') res = greendns.getaddrinfo('localhost', None, 0, 0, 0, socket.AI_ADDRCONFIG) assert res == [(socket.AF_INET, 1, 0, '', ('127.0.0.1', 0))]
def _write_SOCKS5_address(self, addr, file): """ Return the host and port packed for the SOCKS5 protocol, and the resolved address as a tuple object. """ host, port = addr proxy_type, _, _, rdns, username, password = self.proxy family_to_byte = {socket.AF_INET: b"\x01", socket.AF_INET6: b"\x04"} # If the given destination address is an IP address, we'll # use the IP address request even if remote resolving was specified. # Detect whether the address is IPv4/6 directly. for family in (socket.AF_INET, socket.AF_INET6): try: addr_bytes = socket.inet_pton(family, host) file.write(family_to_byte[family] + addr_bytes) host = socket.inet_ntop(family, addr_bytes) file.write(struct.pack(">H", port)) return host, port except socket.error: continue # Well it's not an IP number, so it's probably a DNS name. if rdns: # Resolve remotely host_bytes = host.encode("idna") file.write(b"\x03" + chr(len(host_bytes)).encode() + host_bytes) else: # Resolve locally addresses = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_ADDRCONFIG) # We can't really work out what IP is reachable, so just pick the # first. target_addr = addresses[0] family = target_addr[0] host = target_addr[4][0] addr_bytes = socket.inet_pton(family, host) file.write(family_to_byte[family] + addr_bytes) host = socket.inet_ntop(family, addr_bytes) file.write(struct.pack(">H", port)) return host, port
def _write_socks5_address(self, addr): host, port = addr proxy_type, rdns, _, _, username, password = self.proxy_settings family_to_byte = {socket.AF_INET: b"\x01", socket.AF_INET6: b"\x04"} for family in (socket.AF_INET, socket.AF_INET6): try: addr_bytes = socket.inet_pton(family, host) yield self.write(family_to_byte[family] + addr_bytes) host = socket.inet_ntop(family, addr_bytes) yield self.write(struct.pack(">H", port)) raise gen.Return((host, port)) except socket.error: continue if rdns: host_bytes = host.encode("idna") yield self.write(b"\x03" + chr(len(host_bytes)).encode() + host_bytes) else: addresses = yield self.resolver.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_ADDRCONFIG) target_addr = addresses[0] family = target_addr[0] host = target_addr[4][0] addr_bytes = socket.inet_pton(family, host) yield self.write(family_to_byte[family] + addr_bytes) host = socket.inet_ntop(family, addr_bytes) yield self.write(struct.pack(">H", port)) raise gen.Return((host, port))
def _write_SOCKS5_address(self, addr, file): """ Return the host and port packed for the SOCKS5 protocol, and the resolved address as a tuple object. """ host, port = addr proxy_type, _, _, rdns, username, password = self.proxy family_to_byte = {socket.AF_INET: b"\x01", socket.AF_INET6: b"\x04"} # If the given destination address is an IP address, we'll # use the IP address request even if remote resolving was specified. # Detect whether the address is IPv4/6 directly. for family in (socket.AF_INET, socket.AF_INET6): try: addr_bytes = socket.inet_pton(family, host) file.write(family_to_byte[family] + addr_bytes) host = socket.inet_ntop(family, addr_bytes) file.write(struct.pack(">H", port)) return host, port except socket.error: continue # Well it's not an IP number, so it's probably a DNS name. if rdns: # Resolve remotely host_bytes = host.encode('idna') file.write(b"\x03" + chr(len(host_bytes)).encode() + host_bytes) else: # Resolve locally addresses = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_ADDRCONFIG) # We can't really work out what IP is reachable, so just pick the # first. target_addr = addresses[0] family = target_addr[0] host = target_addr[4][0] addr_bytes = socket.inet_pton(family, host) file.write(family_to_byte[family] + addr_bytes) host = socket.inet_ntop(family, addr_bytes) file.write(struct.pack(">H", port)) return host, port
def _resolveAddr(self): if self._unix_socket is not None: return [(socket.AF_UNIX, socket.SOCK_STREAM, None, None, self._unix_socket)] else: return socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE | socket.AI_ADDRCONFIG)
def _get_dst_addr(self): infos = await self._loop.getaddrinfo( self._dst_host, self._dst_port, family=socket.AF_UNSPEC, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP, flags=socket.AI_ADDRCONFIG) if not infos: raise OSError('getaddrinfo() returned empty list') return infos[0][0], infos[0][4][0]
def test_AI_ADDRCONFIG_noaddr(self): # If AI_ADDRCONFIG is used but there is no address we need to # get an exception, not an empty list. def getaddrinfo(addr, port, family, socktype, proto, aiflags): raise socket.error(socket.EAI_ADDRFAMILY, 'Address family for hostname not supported') greendns.socket.getaddrinfo = getaddrinfo greendns.resolve = _make_mock_resolve() try: greendns.getaddrinfo('::1', None, 0, 0, 0, socket.AI_ADDRCONFIG) except socket.error as e: assert e.errno == socket.EAI_ADDRFAMILY
def _write_SOCKS5_address(self, addr, file): """ Return the host and port packed for the SOCKS5 protocol, and the resolved address as a tuple object. """ host, port = addr proxy_type, _, _, rdns, username, password = self.proxy family_to_byte = {socket.AF_INET: b"\x01", socket.AF_INET6: b"\x04"} # If the given destination address is an IP address, we'll # use the IP address request even if remote resolving was specified. # Detect whether the address is IPv4/6 directly. for family in (socket.AF_INET, socket.AF_INET6): try: addr_bytes = socket.inet_pton(family, host) file.write(family_to_byte[family] + addr_bytes) host = socket.inet_ntop(family, addr_bytes) file.write(struct.pack(">H", port)) return host, port except socket.error: continue # Well it's not an IP number, so it's probably a DNS name. if rdns: # Resolve remotely host_bytes = host.encode('idna') file.write(b"\x03" + chr(len(host_bytes)).encode() + host_bytes) else: # Resolve locally addresses = socket.getaddrinfo( host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_ADDRCONFIG) # We can't really work out what IP is reachable, so just pick the # first. target_addr = addresses[0] family = target_addr[0] host = target_addr[4][0] addr_bytes = socket.inet_pton(family, host) file.write(family_to_byte[family] + addr_bytes) host = socket.inet_ntop(family, addr_bytes) file.write(struct.pack(">H", port)) return host, port
def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=128): """Creates listening sockets bound to the given port and address. Returns a list of socket objects (multiple sockets are returned if the given address maps to multiple IP addresses, which is most common for mixed IPv4 and IPv6 use). Address may be either an IP address or hostname. If it's a hostname, the server will listen on all IP addresses associated with the name. Address may be an empty string or None to listen on all available interfaces. Family may be set to either socket.AF_INET or socket.AF_INET6 to restrict to ipv4 or ipv6 addresses, otherwise both will be used if available. The ``backlog`` argument has the same meaning as for ``socket.listen()``. """ sockets = [] if address == "": address = None flags = socket.AI_PASSIVE if hasattr(socket, "AI_ADDRCONFIG"): # AI_ADDRCONFIG ensures that we only try to bind on ipv6 # if the system is configured for it, but the flag doesn't # exist on some platforms (specifically WinXP, although # newer versions of windows have it) flags |= socket.AI_ADDRCONFIG for res in socket.getaddrinfo(address, port, family, socket.SOCK_STREAM, 0, flags): af, socktype, proto, canonname, sockaddr = res sock = socket.socket(af, socktype, proto) set_close_exec(sock.fileno()) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if af == socket.AF_INET6: # On linux, ipv6 sockets accept ipv4 too by default, # but this makes it impossible to bind to both # 0.0.0.0 in ipv4 and :: in ipv6. On other systems, # separate sockets *must* be used to listen for both ipv4 # and ipv6. For consistency, always disable ipv4 on our # ipv6 sockets and use a separate ipv4 socket when needed. # # Python 2.x on windows doesn't have IPPROTO_IPV6. if hasattr(socket, "IPPROTO_IPV6"): sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) sock.setblocking(0) sock.bind(sockaddr) sock.listen(backlog) sockets.append(sock) return sockets