我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.AF_UNSPEC。
def main(): application = get_application() port = get_port() if CONFIG.IPV4_ONLY: family = socket.AF_INET else: family = socket.AF_UNSPEC if CONFIG.DEBUG_MODE: address = None else: address = 'localhost' sockets = tornado.netutil.bind_sockets(port, address, family=family) server = HTTPServer(application, xheaders=CONFIG.XHEADERS) server.add_sockets(sockets) server.listen_exit_signal(CONFIG.MAX_WAIT_SECONDS_BEFORE_SHUTDOWN) tornado.ioloop.IOLoop.instance().start()
def __init__(self, hosts): """ The `hosts` parameter should be a sequence of hosts to permit connections to. """ msg = ('WhiteListRoundRobinPolicy is deprecated. ' 'It will be removed in 4.0. ' 'It can effectively be reimplemented using HostFilterPolicy.') warn(msg, DeprecationWarning) # DeprecationWarnings are silent by default so we also log the message log.warning(msg) self._allowed_hosts = hosts self._allowed_hosts_resolved = [endpoint[4][0] for a in self._allowed_hosts for endpoint in socket.getaddrinfo(a, None, socket.AF_UNSPEC, socket.SOCK_STREAM)] RoundRobinPolicy.__init__(self)
def select_ip_version(host, port): """Returns AF_INET4 or AF_INET6 depending on where to connect to.""" # disabled due to problems with current ipv6 implementations # and various operating systems. Probably this code also is # not supposed to work, but I can't come up with any other # ways to implement this. # try: # info = socket.getaddrinfo(host, port, socket.AF_UNSPEC, # socket.SOCK_STREAM, 0, # socket.AI_PASSIVE) # if info: # return info[0][0] # except socket.gaierror: # pass if ':' in host and hasattr(socket, 'AF_INET6'): return socket.AF_INET6 return socket.AF_INET
def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None, max_buffer_size=None): """Connect to the given host and port. Asynchronously returns an `.IOStream` (or `.SSLIOStream` if ``ssl_options`` is not None). """ addrinfo = yield self.resolver.resolve(host, port, af) connector = _Connector( addrinfo, self.io_loop, functools.partial(self._create_stream, max_buffer_size)) af, addr, stream = yield connector.start() # TODO: For better performance we could cache the (af, addr) # information here and re-use it on subsequent connections to # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2) if ssl_options is not None: stream = yield stream.start_tls(False, ssl_options=ssl_options, server_hostname=host) raise gen.Return(stream)
def is_valid_ip(ip): """Returns true if the given string is a well-formed IP address. Supports IPv4 and IPv6. """ if not ip or '\x00' in ip: # getaddrinfo resolves empty strings to localhost, and truncates # on zero bytes. return False try: res = socket.getaddrinfo(ip, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_NUMERICHOST) return bool(res) except socket.gaierror as e: if e.args[0] == socket.EAI_NONAME: return False raise return True
def _load_from_socket(port, serializer): sock = None # Support for both IPv4 and IPv6. # On most of IPv6-ready systems, IPv6 will take precedence. for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM): af, socktype, proto, canonname, sa = res sock = socket.socket(af, socktype, proto) try: sock.settimeout(3) sock.connect(sa) except socket.error: sock.close() sock = None continue break if not sock: raise Exception("could not open socket") try: rf = sock.makefile("rb", 65536) for item in serializer.load_stream(rf): yield item finally: sock.close()
def select_ip_version(host, port): """Returns AF_INET4 or AF_INET6 depending on where to connect to.""" # disabled due to problems with current ipv6 implementations # and various operating systems. Probably this code also is # not supposed to work, but I can't come up with any other # ways to implement this. ##try: ## info = socket.getaddrinfo(host, port, socket.AF_UNSPEC, ## socket.SOCK_STREAM, 0, ## socket.AI_PASSIVE) ## if info: ## return info[0][0] ##except socket.gaierror: ## pass if ':' in host and hasattr(socket, 'AF_INET6'): return socket.AF_INET6 return socket.AF_INET
def ipv6_echo_client(port, host=HOST): for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM): af, socktype, proto, canonname, sa = res try: sock = socket.socket(af, socktype, proto) except socket.error as err: print ("Error:%s" %err) try: sock.connect(sa) except socket.error as msg: sock.close() continue if sock is None: print ('Failed to open socket!') sys.exit(1) msg = "Hello from ipv6 client" print ("Send data to server: %s" %msg) sock.send(bytes(msg.encode('utf-8'))) while True: data = sock.recv(BUFSIZE) print ('Received from server', repr(data)) if not data: break sock.close()
def _connect_socket(self, host, port): sock = None error = None for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP): af, socktype, proto, canonname, sa = res try: sock = socket.socket(af, socktype, proto) sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) sock.connect(sa) return sock except Exception as e: error = e if sock is not None: sock.close() break if error is not None: raise ConnectionException("connection failed: {}".format(error)) else: raise ConnectionException("no suitable socket")
def allowed_gai_family(): """This function is designed to work in the context of getaddrinfo, where family=socket.AF_UNSPEC is the default and will perform a DNS search for both IPv6 and IPv4 records.""" family = socket.AF_INET if HAS_IPV6: family = socket.AF_UNSPEC return family
def _write_SOCKS5_address(self, addr, file): """ Return the host and port packed for the SOCKS5 protocol, and the resolved address as a tuple object. """ host, port = addr proxy_type, _, _, rdns, username, password = self.proxy family_to_byte = {socket.AF_INET: b"\x01", socket.AF_INET6: b"\x04"} # If the given destination address is an IP address, we'll # use the IP address request even if remote resolving was specified. # Detect whether the address is IPv4/6 directly. for family in (socket.AF_INET, socket.AF_INET6): try: addr_bytes = socket.inet_pton(family, host) file.write(family_to_byte[family] + addr_bytes) host = socket.inet_ntop(family, addr_bytes) file.write(struct.pack(">H", port)) return host, port except socket.error: continue # Well it's not an IP number, so it's probably a DNS name. if rdns: # Resolve remotely host_bytes = host.encode("idna") file.write(b"\x03" + chr(len(host_bytes)).encode() + host_bytes) else: # Resolve locally addresses = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_ADDRCONFIG) # We can't really work out what IP is reachable, so just pick the # first. target_addr = addresses[0] family = target_addr[0] host = target_addr[4][0] addr_bytes = socket.inet_pton(family, host) file.write(family_to_byte[family] + addr_bytes) host = socket.inet_ntop(family, addr_bytes) file.write(struct.pack(">H", port)) return host, port
def test_error_multiple(self): if len(socket.getaddrinfo('localhost', 9043, socket.AF_UNSPEC, socket.SOCK_STREAM)) < 2: raise unittest.SkipTest('localhost only resolves one address') cluster = Cluster(connection_class=self.connection_class, contact_points=['localhost'], port=9043, connect_timeout=10, protocol_version=PROTOCOL_VERSION) self.assertRaisesRegexp(NoHostAvailable, '\(\'Unable to connect.*Tried connecting to \[\(.*\(.*\].*Last error', cluster.connect)