我们从Python开源项目中,提取了以下4个代码示例,用于说明如何使用socket.IPV6_JOIN_GROUP。
def setup_ipv6_multicast_socket(ifaddrs, if_name, addr): #todo: if_name ignored s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(("", Config.udp_multicast.port)) s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, Config.udp_multicast.ttl) s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_LOOP, 0) mreq = struct.pack("16s16s", socket.inet_pton(socket.AF_INET6,addr), chr(0)*16) s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) s.setblocking(0) multicast_socket_ipv6.append((s,addr)) return True
def join_ipv6_beacon_group(sock, ifindex): """Joins the MAAS IPv6 multicast group using the specified UDP socket. :param sock: An opened IPv6 UDP socket. :param ifindex: The interface index that should join the multicast group. """ # XXX mpontillo 2017-06-21: Twisted doesn't support IPv6 here yet. # It would be nice to do this: # transport.joinGroup(BEACON_IPV6_MULTICAST) ipv6_join_sockopt_args = ( socket.inet_pton(socket.AF_INET6, BEACON_IPV6_MULTICAST) + struct.pack("I", ifindex) ) try: sock.setsockopt( socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, ipv6_join_sockopt_args) except OSError: # Do this on a best-effort basis. We might get an "Address already in # use" error if the group is already joined, or (for whatever reason) # it is not possible to join a multicast group using this interface. pass
def receiver(group): # Look up multicast group address in name server and find out IP version addrinfo = socket.getaddrinfo(group, None)[0] # Create a socket s = socket.socket(addrinfo[0], socket.SOCK_DGRAM) # Allow multiple copies of this program on one machine # (not strictly needed) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Bind it to the port s.bind(('', MYPORT)) group_bin = socket.inet_pton(addrinfo[0], addrinfo[4][0]) # Join group if addrinfo[0] == socket.AF_INET: # IPv4 mreq = group_bin + struct.pack('=I', socket.INADDR_ANY) s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) else: mreq = group_bin + struct.pack('@I', 0) s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) # Loop, printing any data we receive while True: data, sender = s.recvfrom(1500) while data[-1:] == '\0': data = data[:-1] # Strip trailing \0's print (str(sender) + ' ' + repr(data))
def __init__(self, clientId, port, addr): self.PORT = port # 26000 self.ADDR = addr # 'ff01::1' #IPV6 Multicast Address self.clientId = clientId self.closing = False addrInfo = socket.getaddrinfo(self.ADDR, None)[0] self.socket = socket.socket(addrInfo[0], socket.SOCK_DGRAM) self.socket.setsockopt(IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 1) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(('', self.PORT)) #Join Multicast grp. group = socket.inet_pton(addrInfo[0], addrInfo[4][0]) mreq = group + struct.pack('@I', 0) self.socket.setsockopt(IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) self.__registeredBusInterfaces = dict() self.subPatterns = dict() self.subPatternLock = Lock() self.requestQueues = dict() self.inboxLock = Lock() self.inbox = [] if not os.name == 'nt': r, w = os.pipe() self.sigKill = os.fdopen(w, 'w') self.isKilled = os.fdopen(r, 'r')