我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用socket.IPV6_MULTICAST_HOPS。
def sender(group):
    """Send the current timestamp to a multicast group once per second.

    The group address is resolved with ``socket.getaddrinfo`` and the first
    result decides whether an IPv4 or IPv6 datagram socket is used. The
    TTL / hop limit is taken from the module-level ``MYTTL`` and datagrams
    are sent to the module-level ``MYPORT``.

    :param group: Multicast group address (IPv4 or IPv6) as a string.

    The loop never terminates on its own; if it is interrupted by an
    exception the socket is closed before the exception propagates.
    """
    addrinfo = socket.getaddrinfo(group, None)[0]
    family = addrinfo[0]

    s = socket.socket(family, socket.SOCK_DGRAM)
    try:
        # Set Time-to-live (optional)
        ttl_bin = struct.pack('@i', MYTTL)
        if family == socket.AF_INET:  # IPv4
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_bin)
        else:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_bin)

        while True:
            # sendto() requires a bytes-like payload on Python 3, so encode
            # the textual timestamp and terminate it with a NUL byte.
            data = repr(time.time()).encode('ascii') + b'\0'
            s.sendto(data, (addrinfo[4][0], MYPORT))
            time.sleep(1)
    finally:
        s.close()
def setup_ipv6_multicast_socket(ifaddrs, if_name, addr):
    """Create a non-blocking IPv6 UDP socket joined to multicast group *addr*.

    The socket is bound to the wildcard address on
    ``Config.udp_multicast.port``, gets its multicast hop limit from
    ``Config.udp_multicast.ttl`` and has multicast loopback disabled. On
    success the ``(socket, addr)`` pair is appended to the module-level
    ``multicast_socket_ipv6`` list.

    :param ifaddrs: Not used by this function.
    :param if_name: Not used yet (see TODO below).
    :param addr: Textual IPv6 multicast group address to join.
    :returns: ``True`` once the socket has been set up and registered.
    :raises: Any error raised by the socket calls or by
        ``socket.inet_pton``; the socket is closed before re-raising.
    """
    # TODO: if_name ignored
    s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(("", Config.udp_multicast.port))
        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS,
                     Config.udp_multicast.ttl)
        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_LOOP, 0)

        # struct ipv6_mreq is the 16-byte group address followed by an
        # unsigned int interface index; index 0 leaves the interface choice
        # to the kernel. Built from bytes so it also works on Python 3.
        group_bin = socket.inet_pton(socket.AF_INET6, addr)
        mreq = group_bin + struct.pack("@I", 0)
        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)

        s.setblocking(0)
    except Exception:
        # Do not leak the descriptor when any setup step fails.
        s.close()
        raise

    multicast_socket_ipv6.append((s, addr))
    return True
def discover_peers(self, port=None):
    """This method can be invoked (periodically?) to broadcast message to
    discover peers, if there is a chance initial broadcast message may be
    lost (as these messages are sent over UDP).

    One ``SysTask`` running ``_discover`` is started for every entry in
    ``self._addrinfos``.

    :param port: UDP port to send the ping to. When falsy, the port that
        each addrinfo's ``udp_sock`` is bound to is used instead.
    """
    ping_msg = {'signature': self._signature, 'name': self._name,
                'version': __version__}

    def _discover(addrinfo, port, task=None):
        # Send a single 'ping:' datagram from the given local address.
        ping_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM))
        try:
            ping_sock.settimeout(2)
            if addrinfo.family == socket.AF_INET:
                ping_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            else:  # addrinfo.family == socket.AF_INET6
                ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS,
                                     struct.pack('@i', 1))
                ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF,
                                     addrinfo.ifn)
            ping_sock.bind((addrinfo.ip, 0))
            if not port:
                port = addrinfo.udp_sock.getsockname()[1]
            ping_msg['location'] = addrinfo.location
            try:
                yield ping_sock.sendto('ping:'.encode() + serialize(ping_msg),
                                       (addrinfo.broadcast, port))
            except Exception:
                # Discovery is best effort: a failed send is ignored.
                pass
        finally:
            # Always release the socket, including when setup above fails.
            ping_sock.close()

    for addrinfo in self._addrinfos:
        SysTask(_discover, addrinfo, port)
def _try_mcsock(ifi):
    """Internal use only.

    Service function for _make_mcssock etc.: create an IPv6 UDP socket that
    sends multicast through interface index *ifi* with a hop limit of 1.
    Multicast loopback is disabled unless the module-level ``listen_self``
    flag is true.

    :param ifi: Integer interface index used for ``IPV6_MULTICAST_IF``.
    :returns: The configured socket.
    :raises: Any error raised while configuring the socket; the socket is
        closed before the error propagates.
    """
    mcssock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
    try:
        mcssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        mcssock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF,
                           struct.pack('@I', ifi))
        if not listen_self:
            # don't listen to yourself talking
            mcssock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_LOOP, 0)
        mcssock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 1)
    except Exception:
        # A failed attempt must not leak the descriptor.
        mcssock.close()
        raise
    return mcssock
def set_ipv6_multicast_source_ifindex(sock, ifindex):
    """Configure *sock* to send IPv6 multicast out of a specific interface.

    The multicast hop limit is also forced to 1 so that packets are not
    forwarded beyond the local link.

    :param sock: An opened IP socket.
    :param ifindex: An integer representing the interface index.
    """
    level = socket.IPPROTO_IPV6
    # Hop limit is applied first, before the interface index is packed.
    sock.setsockopt(level, socket.IPV6_MULTICAST_HOPS, 1)
    sock.setsockopt(level, socket.IPV6_MULTICAST_IF, struct.pack('I', ifindex))
def __init__(self, clientId, port, addr):
    """Open a datagram socket bound to *port* and join multicast group *addr*.

    The address family is taken from the first ``getaddrinfo`` result for
    *addr*. The multicast hop limit is set to 1 and ``SO_REUSEADDR`` is
    enabled before binding to the wildcard address.

    :param clientId: Stored as ``self.clientId``.
    :param port: UDP port to bind to (e.g. 26000).
    :param addr: Multicast group address (e.g. 'ff01::1').
    """
    self.PORT = port  # 26000
    self.ADDR = addr  # 'ff01::1'  # IPV6 Multicast Address
    self.clientId = clientId
    self.closing = False

    family, _, _, _, sockaddr = socket.getaddrinfo(self.ADDR, None)[0]

    sock = self.socket = socket.socket(family, socket.SOCK_DGRAM)
    sock.setsockopt(IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 1)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', self.PORT))

    # Join Multicast grp: packed group address followed by interface index 0.
    packed_group = socket.inet_pton(family, sockaddr[0])
    membership = packed_group + struct.pack('@I', 0)
    sock.setsockopt(IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, membership)

    # Bookkeeping containers and the locks stored alongside them.
    self.__registeredBusInterfaces = {}
    self.subPatterns = {}
    self.subPatternLock = Lock()
    self.requestQueues = {}
    self.inboxLock = Lock()
    self.inbox = []

    if os.name != 'nt':
        # Pipe pair kept as file objects; presumably used to signal
        # shutdown on non-Windows platforms -- confirm against callers.
        read_fd, write_fd = os.pipe()
        self.sigKill = os.fdopen(write_fd, 'w')
        self.isKilled = os.fdopen(read_fd, 'r')