我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.IPPROTO_IPV6。
def __init__(self, *args, **kwargs): ipv6 = kwargs.pop("ipv6", False) if ipv6: self.address_family = socket.AF_INET6 kwargs["bind_and_activate"] = False else: self.address_family = socket.AF_INET socketserver.TCPServer.__init__(self, *args, **kwargs) if ipv6: # pylint: disable=no-member self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) try: self.server_bind() self.server_activate() except: self.server_close() raise
def init_func(self, creator, address, crypto, crypto_configs, conn_timeout=800, is_ipv6=False): self.__crypto_configs = crypto_configs self.__crypto = crypto self.__conn_timeout = conn_timeout if is_ipv6: fa = socket.AF_INET6 else: fa = socket.AF_INET s = socket.socket(fa, socket.SOCK_STREAM) if is_ipv6: s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.set_socket(s) self.bind(address) self.listen(10) self.register(self.fileno) self.add_evt_read(self.fileno) return self.fileno
def init_func(self, creator, address, crypto, crypto_configs, is_ipv6=False): if is_ipv6: fa = socket.AF_INET6 else: fa = socket.AF_INET s = socket.socket(fa, socket.SOCK_DGRAM) if is_ipv6: s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) self.set_socket(s) self.bind(address) self.register(self.fileno) self.add_evt_read(self.fileno) self.__encrypt = crypto.encrypt() self.__decrypt = crypto.decrypt() self.__encrypt.config(crypto_configs) self.__decrypt.config(crypto_configs) return self.fileno
def search(timeout): ''' Search for devices implementing WANCommonInterfaceConfig on the network. Search ends the specified number of seconds after the last result (if any) was received. Returns an iterator of root device URLs. ''' with contextlib.ExitStack() as stack: sockets = [] sockets.append(stack.enter_context(socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP))) sockets.append(stack.enter_context(socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP))) for s in sockets: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if s.family == socket.AF_INET6: s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) with concurrent.futures.ThreadPoolExecutor(len(sockets)) as ex: return itertools.chain.from_iterable(ex.map(lambda s: search_socket(s, timeout, ns['i']), sockets))
def bind(self, family, type, proto=0): """Create (or recreate) the actual socket object.""" self.socket = socket.socket(family, type, proto) prevent_socket_inheritance(self.socket) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if self.nodelay and not isinstance(self.bind_addr, str): self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) if self.ssl_adapter is not None: self.socket = self.ssl_adapter.bind(self.socket) # If listening on the IPV6 any address ('::' = IN6ADDR_ANY), # activate dual-stack. See https://bitbucket.org/cherrypy/cherrypy/issue/871. if (hasattr(socket, 'AF_INET6') and family == socket.AF_INET6 and self.bind_addr[0] in ('::', '::0', '::0.0.0.0')): try: self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) except (AttributeError, socket.error): # Apparently, the socket option is not available in # this machine's TCP stack pass self.socket.bind(self.bind_addr)
def sender(group): addrinfo = socket.getaddrinfo(group, None)[0] s = socket.socket(addrinfo[0], socket.SOCK_DGRAM) # Set Time-to-live (optional) ttl_bin = struct.pack('@i', MYTTL) if addrinfo[0] == socket.AF_INET: # IPv4 s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_bin) else: s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_bin) while True: data = repr(time.time()) s.sendto(data + '\0', (addrinfo[4][0], MYPORT)) time.sleep(1)
def bind(self, family, type, proto=0): """Create (or recreate) the actual socket object.""" self.socket = socket.socket(family, type, proto) prevent_socket_inheritance(self.socket) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if self.nodelay and not isinstance(self.bind_addr, str): self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) if self.ssl_adapter is not None: self.socket = self.ssl_adapter.bind(self.socket) # If listening on the IPV6 any address ('::' = IN6ADDR_ANY), # activate dual-stack. See http://www.cherrypy.org/ticket/871. if (hasattr(socket, 'AF_INET6') and family == socket.AF_INET6 and self.bind_addr[0] in ('::', '::0', '::0.0.0.0')): try: self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) except (AttributeError, socket.error): # Apparently, the socket option is not available in # this machine's TCP stack pass self.socket.bind(self.bind_addr)
def _testOddCmsgSize(self): self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) try: nbytes = self.sendmsgToServer( [MSG], [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, array.array("i", [self.traffic_class]).tobytes() + b"\x00"), (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, array.array("i", [self.hop_limit]))]) except socket.error as e: self.assertIsInstance(e.errno, int) nbytes = self.sendmsgToServer( [MSG], [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, array.array("i", [self.traffic_class])), (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, array.array("i", [self.hop_limit]))]) self.assertEqual(nbytes, len(MSG)) # Tests for proper handling of truncated ancillary data
def testSingleCmsgTruncInData(self): # Test truncation of a control message inside its associated # data. The message may be returned with its data truncated, # or not returned at all. self.serv_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVHOPLIMIT, 1) self.misc_event.set() msg, ancdata, flags, addr = self.doRecvmsg( self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1) self.assertEqual(msg, MSG) self.checkRecvmsgAddress(addr, self.cli_addr) self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) self.assertLessEqual(len(ancdata), 1) if ancdata: cmsg_level, cmsg_type, cmsg_data = ancdata[0] self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT) self.assertLess(len(cmsg_data), SIZEOF_INT)
def setup_ipv6_multicast_socket(ifaddrs, if_name, addr): #todo: if_name ignored s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(("", Config.udp_multicast.port)) s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, Config.udp_multicast.ttl) s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_LOOP, 0) mreq = struct.pack("16s16s", socket.inet_pton(socket.AF_INET6,addr), chr(0)*16) s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) s.setblocking(0) multicast_socket_ipv6.append((s,addr)) return True
def bind(self, family, type, proto=0): """Create (or recreate) the actual socket object.""" self.socket = socket.socket(family, type, proto) prevent_socket_inheritance(self.socket) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if self.nodelay and not isinstance(self.bind_addr, str): self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) if self.ssl_adapter is not None: self.socket = self.ssl_adapter.bind(self.socket) # If listening on the IPV6 any address ('::' = IN6ADDR_ANY), # activate dual-stack. See # https://github.com/cherrypy/cherrypy/issues/871. if (hasattr(socket, 'AF_INET6') and family == socket.AF_INET6 and self.bind_addr[0] in ('::', '::0', '::0.0.0.0')): try: self.socket.setsockopt( socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) except (AttributeError, socket.error): # Apparently, the socket option is not available in # this machine's TCP stack pass self.socket.bind(self.bind_addr)
def _testOddCmsgSize(self): self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) try: nbytes = self.sendmsgToServer( [MSG], [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, array.array("i", [self.traffic_class]).tobytes() + b"\x00"), (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, array.array("i", [self.hop_limit]))]) except OSError as e: self.assertIsInstance(e.errno, int) nbytes = self.sendmsgToServer( [MSG], [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, array.array("i", [self.traffic_class])), (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, array.array("i", [self.hop_limit]))]) self.assertEqual(nbytes, len(MSG)) # Tests for proper handling of truncated ancillary data
def __init__(self, handlers, addrinfo): self.logger = logging.LoggerAdapter(self.logger, dict(context = self)) asyncore.dispatcher.__init__(self) self.handlers = handlers try: af, socktype, proto, canonname, sockaddr = addrinfo # pylint: disable=W0612 self.create_socket(af, socktype) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) except AttributeError: pass if have_ipv6 and af == socket.AF_INET6: self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) self.bind(sockaddr) self.listen(5) except Exception: self.logger.exception("Couldn't set up HTTP listener") self.close() for h in handlers: self.logger.debug("Handling %s", h[0])
def join_ipv6_beacon_group(sock, ifindex): """Joins the MAAS IPv6 multicast group using the specified UDP socket. :param sock: An opened IPv6 UDP socket. :param ifindex: The interface index that should join the multicast group. """ # XXX mpontillo 2017-06-21: Twisted doesn't support IPv6 here yet. # It would be nice to do this: # transport.joinGroup(BEACON_IPV6_MULTICAST) ipv6_join_sockopt_args = ( socket.inet_pton(socket.AF_INET6, BEACON_IPV6_MULTICAST) + struct.pack("I", ifindex) ) try: sock.setsockopt( socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, ipv6_join_sockopt_args) except OSError: # Do this on a best-effort basis. We might get an "Address already in # use" error if the group is already joined, or (for whatever reason) # it is not possible to join a multicast group using this interface. pass
def discover_peers(self, port=None): """This method can be invoked (periodically?) to broadcast message to discover peers, if there is a chance initial broadcast message may be lost (as these messages are sent over UDP). """ ping_msg = {'signature': self._signature, 'name': self._name, 'version': __version__} def _discover(addrinfo, port, task=None): ping_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM)) ping_sock.settimeout(2) if addrinfo.family == socket.AF_INET: ping_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) else: # addrinfo.family == socket.AF_INET6 ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, struct.pack('@i', 1)) ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, addrinfo.ifn) ping_sock.bind((addrinfo.ip, 0)) if not port: port = addrinfo.udp_sock.getsockname()[1] ping_msg['location'] = addrinfo.location try: yield ping_sock.sendto('ping:'.encode() + serialize(ping_msg), (addrinfo.broadcast, port)) except: pass ping_sock.close() for addrinfo in self._addrinfos: SysTask(_discover, addrinfo, port)
def _setup_socket(self): self.socket = socket.socket(self.address_family, self.socket_type) if self.allow_reuse_address: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if not self.ipv6_only: try: self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) except (AttributeError, socket.error) as e: self.log.debug("Unable to set IPV6_V6ONLY to false %s", e) self.server_bind() self.server_activate()
def bind6(self, addr, port, tos, ttl): log.debug( "bind6(addr=%s, port=%d, tos=%d, ttl=%d)", addr, port, tos, ttl) self.socket = socket.socket( socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, tos) self.socket.setsockopt( socket.IPPROTO_IPV6, socket.IPV6_UNICAST_HOPS, ttl) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind((addr, port)) log.info("Wait to receive test packets on [%s]:%d", addr, port)
def connect6(self, server="", port=862, tos=0x88): self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, tos) self.socket.connect((server, port))
def _recv(self, p, x=MTU): if p is None: return p elif isinstance(p, IP): # TODO: verify checksum if p.src == self.dst and p.proto == socket.IPPROTO_IPV6: if isinstance(p.payload, IPv6): return p.payload return p
def send(self, x): return self.worker.send(IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6)/x) ############################################################################# ############################################################################# ### Layers binding ### ############################################################################# #############################################################################
def init_func(self, creator, address, debug=False, server_side=False, is_ipv6=False): if is_ipv6: fa = socket.AF_INET6 else: fa = socket.AF_INET self.__is_ipv6 = is_ipv6 s = socket.socket(fa, socket.SOCK_DGRAM) if server_side and is_ipv6: s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) self.set_socket(s) self.__server_side = server_side if server_side: self.bind((address, 53)) else: self.connect((address, 53)) self.__debug = debug self.__host_match = host_match self.__timer = timer.timer() self.__host_match = host_match.host_match() self.set_timeout(self.fileno, self.__LOOP_TIMEOUT) self.register(self.fileno) self.add_evt_read(self.fileno) return self.fileno
def init_func(self, creator, listen, is_ipv6=False): if is_ipv6: fa = socket.AF_INET6 else: fa = socket.AF_INET s = socket.socket(fa, socket.SOCK_STREAM) if is_ipv6: s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.set_socket(s) self.bind(listen) return self.fileno
def bind(family, type, proto): """Create (or recreate) the actual socket object.""" sock = socket.socket(family, type, proto) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) # If listening on IPv6, activate dual-stack. if family == socket.AF_INET6: sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False) return sock
def server_bind(self): self.socket.setsockopt(socket.IPPROTO_IP, 15, 1) # IP_FREEBIND if self.__bind_v6only is not None and self.address_family == socket.AF_INET6: self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, self.__bind_v6only) super().server_bind()