我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 socket.IP_MULTICAST_TTL。
def setup_sockets(self):
    """Create one UDP socket per local interface plus a shared multicast socket.

    Populates ``self.ip_addresses``, ``self.sockets`` (keyed by interface
    address), ``self.socks`` (the same sockets as a list) and
    ``self.multi_sock``. The shared socket joins the group ``self.address``
    once per interface and is bound to ``self.port`` on all addresses.
    """
    self.sockets = {}
    self.ip_addresses = get_interface_addresses(self.logger)

    shared = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    shared.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    shared.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self.ttl)

    for iface_ip in self.ip_addresses:
        iface_sock = socket.socket(
            socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        iface_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # Membership request: group address followed by the local interface.
        membership = socket.inet_aton(self.address) + socket.inet_aton(iface_ip)
        shared.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership)
        self.logger.info(
            "Regestering multicast for: %s: %s" % (self.address, iface_ip))

        iface_sock.bind((iface_ip, self.port))
        self.sockets[iface_ip] = iface_sock

    shared.bind(("", self.port))
    self.socks = list(self.sockets.values())
    self.multi_sock = shared
def __init__(self, playerCount):
    """Initialize a GameLogic thread.

    Parameters:
        playerCount (int): the maximum number of players.

    Returns:
        None
    """
    threading.Thread.__init__(self)
    # Thread only honours the attribute spelled ``daemon``; the original
    # misspelling ``deamon`` left the thread non-daemonic.
    self.daemon = True
    # Misspelled attribute kept so any code that still reads it keeps working.
    self.deamon = True

    # maxsize 0 means the queue is unbounded.
    self.queue = Queue.Queue(0)

    self.playerCount = playerCount
    self.playerConnectedCount = 0
    self.players = []
    self.weaponPositions = []
    self.possibleWeaponPositions = [
        "140_110", "490_110", "420_300", "220_300", "060_300",
        "600_300", "420_550", "220_550", "090_490", "550_480",
        "600_170", "600_040", "350_050", "290_040", "050_180",
    ]
    self.playerPositions = []
    self.game = game.Game(self)

    # UDP socket used for multicast sends; TTL 2 lets datagrams cross at
    # most one router.
    self.MCAST_GRP = '224.1.1.1'
    self.MCAST_PORT = 5000
    self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
def sender(group):
    """Send the current time to a multicast group once per second, forever.

    :param group: multicast group address (IPv4 or IPv6); resolved with
        ``socket.getaddrinfo`` to pick the address family.

    The payload is ``repr(time.time())`` followed by a NUL byte, encoded to
    bytes so that ``sendto`` works on Python 3 as well as Python 2. The
    socket is closed if the loop is ever left through an exception.
    """
    addrinfo = socket.getaddrinfo(group, None)[0]
    family = addrinfo[0]

    s = socket.socket(family, socket.SOCK_DGRAM)
    try:
        # Set Time-to-live (optional)
        ttl_bin = struct.pack('@i', MYTTL)
        if family == socket.AF_INET:  # IPv4
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_bin)
        else:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_bin)

        destination = (addrinfo[4][0], MYPORT)
        while True:
            payload = (repr(time.time()) + '\0').encode('ascii')
            s.sendto(payload, destination)
            time.sleep(1)
    finally:
        s.close()
def connection_made(self, transport):
    """Protocol connection made: configure the socket and send M-SEARCH.

    :param transport: transport supplied by the event loop; its underlying
        socket is obtained through ``get_extra_info('socket')``.
    """
    if self._verbose:
        print('Connection made')

    self._transport = transport
    udp_sock = self._transport.get_extra_info('socket')
    udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    udp_sock.settimeout(2)
    udp_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
    udp_sock.bind(('', self._upnp.ssdp_port))

    host_header = ('Host: ' + self._upnp.ssdp_host + ':'
                   + str(self._upnp.ssdp_port))
    request_lines = [
        'M-SEARCH * HTTP/1.1',
        host_header,
        'Man: "ssdp:discover"',
        'ST: {}'.format(self._search_target),
        'MX: 3',
        # Two empty entries terminate the header block with a blank line.
        '',
        '',
    ]
    payload = "\r\n".join(request_lines).encode('ascii')
    self._transport.sendto(
        payload, (self._upnp.ssdp_host, self._upnp.ssdp_port))
def run(self):
    """Send ``self.broadcast_packet`` repeatedly until the thread is interrupted.

    After every send the loop sleeps in one-second steps for
    BROADCAST_INTERVAL steps. On each step it stops the broadcaster when a
    non-zero discovery expiration (in minutes) has elapsed, and returns once
    ``self.interrupted`` is set. Errors are logged rather than propagated.

    Note: ``StandardError`` and ``sys.exc_traceback`` exist only on Python 2.
    """
    try:
        PLUGIN.broadcasterLogger.debug("Broadcaster.run called")

        udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        udp.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20)

        started = time.time()
        end_time = started + (
            PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['discoveryExpiration'] * 60)

        PLUGIN.broadcasterLogger.debug(
            "Broadcaster.run: sending first broadcast:\n{}".format(self.broadcast_packet))

        while True:
            udp.sendto(self.broadcast_packet, (BCAST_IP, UPNP_PORT))
            for _ in range(BROADCAST_INTERVAL):
                time.sleep(1)

                # The thread only times out when discoveryExpiration > 0
                # (valid values 0 thru 10 inclusive); zero means 'always on'.
                expiration = PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['discoveryExpiration']
                if expiration and time.time() > end_time:
                    PLUGIN.broadcasterLogger.debug("Broadcaster thread timed out")
                    self.stop()

                if self.interrupted:
                    PLUGIN.setDeviceDiscoveryState(False, self.ahbDevId)
                    udp.close()
                    return

    except StandardError as e:
        PLUGIN.broadcasterLogger.error(
            u"StandardError detected in Broadcaster.Run for '{}'. Line '{}' has error='{}'".format(
                indigo.devices[self.ahbDevId].name, sys.exc_traceback.tb_lineno, e))
def setup_ipv4_multicast_socket(ifaddrs, if_name, addr):
    """Open a non-blocking UDP socket joined to the IPv4 multicast group ``addr``.

    The socket is bound to ``Config.udp_multicast.port`` on all addresses,
    uses ``Config.udp_multicast.ttl`` for outgoing multicast, has multicast
    loopback disabled, and is appended to ``multicast_socket_ipv4`` as
    ``(socket, addr)``.

    :param ifaddrs: currently unused.
    :param if_name: currently unused (see TODO below).
    :param addr: dotted-quad multicast group address.
    :returns: ``True`` on success; socket errors propagate to the caller.
    """
    # TODO: if_name ignored
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(("", Config.udp_multicast.port))
        s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, Config.udp_multicast.ttl)
        s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0)
        # "=" selects standard sizes without padding, giving the 8-byte
        # ip_mreq layout; native "4sl" pads to 16 bytes on LP64 platforms.
        mreq = struct.pack("=4sl", socket.inet_aton(addr), socket.INADDR_ANY)
        s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        s.setblocking(0)
    except Exception:
        # Do not leak the descriptor when any setup step fails.
        s.close()
        raise

    multicast_socket_ipv4.append((s, addr))
    return True
def runAttack(self, targetList):
    """Send each target's payload from every configured source port.

    The whole port sweep is repeated ``self.replayCount`` times. For each
    port in ``self.bindPorts`` a process pool of ``self.maxProcesses``
    workers and one UDP socket are created; ``asyncSendPayload`` is mapped
    over one ``(socket, ipaddress, port, payload)`` tuple per target.

    :param targetList: list of dicts read with the keys ``'ipaddress'``,
        ``'port'`` and ``'payload'``.
    :returns: always ``True``.

    NOTE(review): nesting was reconstructed from whitespace-collapsed
    source; the pool/socket cleanup is assumed to run once per port after
    the try/except -- confirm against the upstream file.
    """
    # targetList = [{'ipaddress:':'127.0.0.1', 'port':53, 'payload':'blablabla']
    for counter in range(0, self.replayCount):
        for port in self.bindPorts:
            requestPool = Pool(processes=self.maxProcesses)
            sharedSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sharedSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sharedSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            #sharedSocket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 64)
            try:
                sharedSocket.bind(('', port))
                print("Sending packets from port %s" % port)
                taskList = [(sharedSocket, targetParams.get('ipaddress'), targetParams.get('port'), targetParams.get('payload')) for targetParams in targetList]
                results = requestPool.starmap(asyncSendPayload, taskList)
            except Exception as e:
                # Also reached when starmap() raises, although the message
                # only mentions binding.
                print("Failed binding port %s: %s" % (port, e))
            print("Closing process pool")
            requestPool.close()
            print("Joining process pool")
            requestPool.join()
            print("Closing shared socket")
            sharedSocket.close()
    return True
def discover(self):
    """Run one SSDP ``ssdp:all`` search and record the devices that answer.

    An M-SEARCH request is multicast to 239.255.255.250:1900 and replies
    are read until a receive times out (5 seconds). Replies whose type is
    ``SSDPResponse.ST_DEVICE`` are stored in ``self.devices`` keyed by
    UUID; root-device replies are ignored. The socket is always closed,
    and the discovery-done hook runs afterwards.
    """
    service = "ssdp:all"
    group = ("239.255.255.250", 1900)
    message = "\r\n".join([
        'M-SEARCH * HTTP/1.1',
        'HOST: {0}:{1}',
        'MAN: "ssdp:discover"',
        'ST: {st}', 'MX: 3', '', ''])

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        sock.settimeout(5)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        # sendto() requires bytes on Python 3; the request is pure ASCII.
        sock.sendto(message.format(*group, st=service).encode('utf-8'), group)

        while True:
            try:
                response = SSDPResponse(sock.recv(1024))
            except socket.timeout:
                break
            if response.type == SSDPResponse.ST_ROOT_DEVICE:
                # Root-device announcements are deliberately ignored.
                continue
            if response.type == SSDPResponse.ST_DEVICE:
                device = Device.fromSSDPResponse(response)
                self.devices[response.uuid] = device
    finally:
        sock.close()

    self.__discoveryDone()
def _startSSDPNotifier(addr):
    """Multicast an SSDP ``ssdp:alive`` NOTIFY every six seconds, forever.

    :param addr: passed to ``gen_ssdp_content(addr, 'NT')`` to build the
        rest of the message after the fixed header lines.

    A fresh socket is used for every announcement and is closed even when
    sending fails.
    """
    msgHeader = [
        'NOTIFY * HTTP/1.1\r\n',
        'HOST: 239.255.255.250:1900\r\n',
        "NTS: ssdp:alive\r\n",
    ]
    msg = ''.join(msgHeader) + gen_ssdp_content(addr, 'NT')
    if not isinstance(msg, bytes):
        # Python 3 text must be encoded before sendto(); a Python 2 ``str``
        # is already bytes and is sent untouched.
        msg = msg.encode('utf-8')

    while True:
        print("Sending M-NOTIFY...")
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32)
            sock.sendto(msg, (ssdp_addr, ssdp_port))
        finally:
            sock.close()
        time.sleep(6)
def discover(service, timeout=5, retries=1, mx=3):
    """Search for SSDP services and return the distinct responses.

    :param service: search target placed in the ``ST`` header.
    :param timeout: seconds to wait for each reply before a round ends.
    :param retries: number of search rounds to perform.
    :param mx: value of the ``MX`` header.
    :returns: list of ``SSDPResponse`` objects, one per distinct
        ``location``.

    The timeout is applied to the search socket only, so the process-wide
    default socket timeout is left untouched, and each socket is closed at
    the end of its round.
    """
    group = ("239.255.255.250", 1900)
    message = "\r\n".join([
        'M-SEARCH * HTTP/1.1',
        'HOST: {0}:{1}',
        'MAN: "ssdp:discover"',
        'ST: {st}', 'MX: {mx}', '', ''])
    message_bytes = message.format(*group, st=service, mx=mx).encode('utf-8')

    responses = {}
    for _ in range(retries):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            sock.settimeout(timeout)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
            sock.sendto(message_bytes, group)
            while True:
                try:
                    response = SSDPResponse(sock.recv(1024))
                except socket.timeout:
                    break
                responses[response.location] = response
        finally:
            sock.close()
    return list(responses.values())
def make_data_sender_socket(ip_address=None):
    """Create a socket for sending multicast data.

    :param ip_address: local address to bind to; when falsy the result of
        ``gethostip()`` is used instead.
    :returns: a UDP socket bound to that address on an ephemeral port, with
        broadcast and address reuse enabled and a multicast TTL of 20.
    """
    source_ip = ip_address or gethostip()

    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
    options = (
        (socket.SOL_SOCKET, socket.SO_BROADCAST, 1),
        (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1),
        (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20),
    )
    for level, name, value in options:
        sender.setsockopt(level, name, value)

    # Port 0 lets the OS pick the source port.
    sender.bind((source_ip, 0))
    return sender
def getTTL(self):
    """Return the multicast TTL currently configured on ``self.socket``."""
    current_ttl = self.socket.getsockopt(
        socket.IPPROTO_IP, socket.IP_MULTICAST_TTL)
    return current_ttl
def setTTL(self, ttl):
    """Set the multicast TTL on ``self.socket``.

    :param ttl: hop limit; packed as one unsigned byte, so values outside
        0..255 make ``struct.pack`` raise ``struct.error``.
    """
    packed_ttl = struct.pack("B", ttl)
    self.socket.setsockopt(
        socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, packed_ttl)
def __init__(self, thisNode):
    """Create the listener's UDP socket and bind it to port 4242.

    :param thisNode: stored as ``self.thisNode``.
    """
    logging.info('Initializing listener')
    self.thisNode = thisNode
    self.MCAST_PORT = 4242
    self.MCAST_GRP = '192.168.1.255'

    self.sock = listener = socket.socket(
        socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    # Empty host: accept datagrams on every local address.
    listener.bind(('', self.MCAST_PORT))
def discover(self):
    """Search for a device via SSDP and return its connection details.

    An M-SEARCH request with ``NT: panono:ball-camera`` is multicast to
    239.255.255.250:1900, then up to five datagrams are read (7 second
    timeout each) until one yields a location.

    :returns: ``(ws, usn, apiV, srv)``; every element is ``None`` when the
        request could not be sent or no usable reply arrived. The tuple
        always has four elements, and the socket is closed on every path.
    """
    ws = None
    usn = None
    apiV = None
    srv = None
    req = ('M-SEARCH * HTTP/1.1\r\n' +
           'MX: 10\r\n' +
           'HOST: 239.255.255.250:1900\r\n' +
           'MAN: \"ssdp:discover\"\r\n' +
           'NT: panono:ball-camera\r\n' +
           '\r\n')

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        sock.settimeout(7)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        # "=" packs the 8-byte ip_mreq layout; native '4sL' is padded to
        # 16 bytes on LP64 platforms.
        mcast = struct.pack('=4sL', socket.inet_aton('239.255.255.250'), socket.INADDR_ANY)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mcast)
        sock.bind(('', 1900))

        try:
            sock.sendto(req.encode(), ('239.255.255.250', 1900))
        except socket.error as e:
            print(e)
            # Same shape as the success path so callers can always unpack
            # four values.
            return (None, None, None, None)

        for _ in range(5):
            try:
                data, addr = sock.recvfrom(1024)
                if not data:
                    continue
                ws = ssdpNotify().getLocation(data)
                if ws is None:
                    continue
                usn = ssdpNotify().getUsn(data)
                apiV = ssdpNotify().getApiVersion(data)
                srv = ssdpNotify().getSrv(data)
                break
            except socket.error as e:
                print(e)
                break
    finally:
        sock.close()
    return (ws, usn, apiV, srv)
def receiver(service='mihome'):
    """Listen on the multicast group of ``service`` and dispatch each datagram.

    Runs forever. The group address and port come from ``MULTICAST``.
    ``'mihome'`` datagrams are decoded as JSON and routed by their
    ``model`` field; ``'yeelight'`` datagrams are passed on as decoded text.

    :param service: key into ``MULTICAST``; the assert rejects unknown keys.

    NOTE(review): nesting was reconstructed from whitespace-collapsed
    source; ``current = {}`` is assumed to run after the model dispatch on
    every mihome message that is not skipped by ``continue`` -- confirm.
    """
    from plugins import gateway
    assert service in MULTICAST, 'No such service'
    store = get_store()
    address, port = MULTICAST.get(service)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("0.0.0.0", port))
    # Membership request: group address + INADDR_ANY as the local interface.
    mreq = struct.pack("=4sl", socket.inet_aton(address), socket.INADDR_ANY)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, SOCKET_BUFSIZE)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
    # State handed to sensor_ht.process() across messages; reset below.
    current = {}
    while True:
        data, _ = sock.recvfrom(SOCKET_BUFSIZE)  # read at most SOCKET_BUFSIZE bytes
        print(datetime.now().isoformat(), data)
        if service == 'mihome':
            message = json.loads(data.decode())
            # The 'data' field is itself a JSON-encoded string.
            data = json.loads(message['data'])
            # NOTE(review): ``conn`` and ``cursor`` are not defined in this
            # function -- presumably module-level names; verify.
            if message.get('model') in ('sensor_ht', 'weather.v1') and not sensor_ht.process(conn, cursor, current, message, data):
                # A falsy result keeps ``current`` for the next message.
                continue
            elif message.get('model') == 'magnet':
                magnet.process(store, message, data)
            elif message.get('model') == 'gateway':
                gateway.process(store, message, data)
            current = {}
        elif service == 'yeelight':
            yeelight.process(data.decode())
def discover():
    """Clear the stored device list, multicast a search and record replies.

    The stored entry ``STORE_KEY`` is deleted, the ``YEE_DISCOVER`` lines
    are sent to the ``'yeelight'`` multicast endpoint, and every reply is
    passed through ``get_data`` to ``add_device``. The receive loop has no
    timeout and never returns.
    """
    from mihome import MULTICAST, SOCKET_BUFSIZE

    # Empty stored list
    store.delete(STORE_KEY)

    address, port = MULTICAST.get('yeelight')
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    probe.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32)

    request = '\r\n'.join(YEE_DISCOVER).encode()
    probe.sendto(request, (address, port))

    while True:
        reply, _ = probe.recvfrom(SOCKET_BUFSIZE)
        add_device(get_data(reply.decode()))
def send(self, uid, msg=""):
    """Send ``"<uid>:<msg>"`` followed by a NUL byte to ``self.group``.

    :param uid: sender identifier placed before the colon.
    :param msg: message text placed after the colon.

    The address family is taken from resolving ``self.group``; the hop
    limit ``self.ttl`` is set with the option that matches that family, and
    the socket is closed after the datagram has been sent.
    """
    addrinfo = socket.getaddrinfo(self.group, None)[0]
    family = addrinfo[0]

    s = socket.socket(family, socket.SOCK_DGRAM)
    try:
        # Set Time-to-live (optional)
        ttl_bin = struct.pack('@i', self.ttl)
        if family == socket.AF_INET:  # IPv4
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_bin)
        else:
            # IPv6 uses a separate option for the multicast hop limit.
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_bin)

        data = uid + ":" + msg
        s.sendto((data + '\0').encode(), (addrinfo[4][0], self.port))
    finally:
        s.close()
def server_bind(self):
    """Join multicast group 224.0.0.252, then bind the server socket.

    Address reuse is enabled, the multicast TTL is set to 255 and the
    group is joined on the interface given by ``settings.Config.IP_aton``.
    When ``OsInterfaceIsSupported()`` is true the socket is additionally
    tied to the interface named by ``settings.Config.Bind_To``; a failure
    there is ignored. Binding itself is delegated to ``UDPServer``.
    """
    MADDR = "224.0.0.252"
    # Numeric value of SO_BINDTODEVICE on Linux.
    SO_BINDTODEVICE = 25

    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 255)

    # Membership request: group address followed by the local interface.
    mreq = socket.inet_aton(MADDR) + settings.Config.IP_aton
    self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

    if OsInterfaceIsSupported():
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, SO_BINDTODEVICE, settings.Config.Bind_To + '\0')
        except Exception:
            # Best effort: carry on without the interface binding, but let
            # KeyboardInterrupt/SystemExit through.
            pass

    UDPServer.server_bind(self)
def server_bind(self):
    """Join multicast group 224.0.0.251, then bind the server socket.

    Address reuse is enabled, the multicast TTL is set to 255 and the
    group is joined on the interface given by ``settings.Config.IP_aton``.
    When ``OsInterfaceIsSupported()`` is true the socket is additionally
    tied to the interface named by ``settings.Config.Bind_To``; a failure
    there is ignored. Binding itself is delegated to ``UDPServer``.
    """
    MADDR = "224.0.0.251"
    # Numeric value of SO_BINDTODEVICE on Linux.
    SO_BINDTODEVICE = 25

    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 255)

    # Membership request: group address followed by the local interface.
    mreq = socket.inet_aton(MADDR) + settings.Config.IP_aton
    self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

    if OsInterfaceIsSupported():
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, SO_BINDTODEVICE, settings.Config.Bind_To + '\0')
        except Exception:
            # Best effort: carry on without the interface binding, but let
            # KeyboardInterrupt/SystemExit through.
            pass

    UDPServer.server_bind(self)
def __init__(self):
    """Create the instance's UDP socket.

    The socket gets a 3 second timeout, a multicast TTL of 2 and address
    reuse, and is bound to an OS-chosen port on all addresses.
    """
    self.socket = udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp.settimeout(3.0)
    udp.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
    udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # Port 0: let the operating system choose a free port.
    udp.bind(('', 0))
def discover_MediaPlayer(self):
    """Discover devices via SSDP and register those advertising AVTransport.

    ``SSDP_BROADCAST_MSG`` is sent to the SSDP endpoint and replies are
    collected until a receive times out (5 seconds). Header lines of each
    reply are parsed into a dict with lower-cased names. Every distinct
    ``location`` whose ``st`` header contains ``"AVTransport"`` is passed to
    ``self.register_device``.

    :returns: list of the non-``None`` results of ``register_device``, in
        the order the locations were first seen.

    Replies that are empty, undecodable, or lack the ``st``/``location``
    headers are skipped, and the socket is always closed.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    replies = []
    try:
        s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 4)
        s.bind(("", SSDP_BROADCAST_PORT + 20))
        s.sendto(SSDP_BROADCAST_MSG.encode("UTF-8"), (SSDP_BROADCAST_ADDR, SSDP_BROADCAST_PORT))
        s.settimeout(5.0)

        while True:
            try:
                data, addr = s.recvfrom(self.SOCKET_BUFSIZE)
            except socket.timeout:
                break
            if not data:
                continue
            try:
                text = data.decode("UTF-8")
            except UnicodeDecodeError:
                continue

            # Skip the status line; keep only "name: value" header lines.
            headers = {}
            for line in text.split("\r\n")[1:]:
                name, sep, value = line.partition(":")
                if sep:
                    headers[name.strip().lower()] = value.strip()
            replies.append(headers)
    finally:
        s.close()

    # De-duplicate locations while keeping first-seen order.
    devices_urls = []
    for headers in replies:
        location = headers.get("location")
        if (location and "AVTransport" in headers.get("st", "")
                and location not in devices_urls):
            devices_urls.append(location)

    devices = []
    for location_url in devices_urls:
        device = self.register_device(location_url)
        if device is not None:
            devices.append(device)
    return devices
def __enter__(self):
    """Open the UDP socket used inside the ``with`` block and return ``self``.

    The socket is stored as ``self.sock`` with a multicast TTL of 10 and is
    bound to ``self.local_port`` only when that attribute is truthy.
    """
    # Set up socket
    self.sock = udp = socket.socket(
        socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    udp.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 10)

    if not self.local_port:
        return self

    # Bind to the requested local port on all addresses.
    udp.bind(('', self.local_port))
    return self
def create_multicast_sock(own_ip, remote_addr, bind_to_multicast_addr):
    """Create UDP multicast socket.

    :param own_ip: local IPv4 address used as the multicast interface.
    :param remote_addr: ``(group_ip, port)`` of the multicast group.
    :param bind_to_multicast_addr: bind to the group address/port when
        true, otherwise to ``own_ip`` on an OS-chosen port.
    :returns: a non-blocking socket joined to the group, with a multicast
        TTL of 2 and multicast loopback disabled.
    """
    mcast = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    mcast.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    mcast.setblocking(False)

    local_if = socket.inet_aton(own_ip)
    mcast.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF, local_if)
    mcast.setsockopt(
        socket.SOL_IP,
        socket.IP_ADD_MEMBERSHIP,
        socket.inet_aton(remote_addr[0]) + local_if)
    mcast.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
    mcast.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, local_if)

    # Two bind variants are needed (reason unknown to the original author):
    # - binding to the multicast address does not work with gateway search
    #   requests on some machines; there only binding to own_ip works.
    # - binding to own_ip does not work with ROUTING_INDICATIONS on a Gira
    #   knx router.
    if bind_to_multicast_addr:
        bind_address = (remote_addr[0], remote_addr[1])
    else:
        bind_address = (own_ip, 0)
    mcast.bind(bind_address)

    mcast.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0)
    return mcast
def Connect(self):
    """Switch to UDP mode and open the multicast receive and transmit sockets.

    Sets ``self.socket_mode``, ``self.mac``, ``self.udp_rx_sock``,
    ``self.udp_tx_sock`` and ``self.local_mac``.
    """
    self.socket_mode = UDP_MODE
    self.mac = uuid.getnode()

    # UDP receiver: join the multicast group and listen on all interfaces.
    self.udp_rx_sock = rx = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    rx.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    membership = struct.pack(
        '=4sl', socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
    rx.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership)
    rx.bind(('', MCAST_PORT))
    rx.settimeout(TIMEOUT)

    # UDP transmitter.
    self.udp_tx_sock = tx = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    tx.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 255)

    # Hex string of the first six bytes of the MAC packed as a little-endian
    # 64-bit integer (least-significant byte first).
    mac_bytes = bytearray(struct.pack('<Q', int(self.mac)))
    self.local_mac = ''.join('{:02x}'.format(octet) for octet in mac_bytes[0:6])
    logging.debug('MAC Addr: %s', self.local_mac)
def discover(service, timeout=5, retries=1, mx=3):
    """Search for SSDP services and return the distinct responses.

    :param service: search target placed in the ``ST`` header.
    :param timeout: seconds to wait for each reply before a round ends.
    :param retries: number of search rounds to perform.
    :param mx: value of the ``MX`` header.
    :returns: list of ``SSDPResponse`` objects, one per distinct
        ``location``.

    The timeout is set on each search socket instead of through
    ``socket.setdefaulttimeout``, so unrelated sockets in the process are
    not affected. Every socket is closed when its round ends.
    """
    group = ("239.255.255.250", 1900)
    message = "\r\n".join([
        'M-SEARCH * HTTP/1.1',
        'HOST: {0}:{1}',
        'MAN: "ssdp:discover"',
        'ST: {st}', 'MX: {mx}', '', ''])
    message_bytes = message.format(*group, st=service, mx=mx).encode('utf-8')

    responses = {}
    for _ in range(retries):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            sock.settimeout(timeout)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
            sock.sendto(message_bytes, group)
            while True:
                try:
                    response = SSDPResponse(sock.recv(1024))
                except socket.timeout:
                    break
                responses[response.location] = response
        finally:
            sock.close()
    return list(responses.values())

# Example for upnp service:
# import ssdp
# ssdp.discover("upnp:rootdevice")
def server_bind(self):
    """Join multicast group 224.0.0.251, then bind the server socket.

    Address reuse is enabled, the multicast TTL is set to 255 and the
    group is joined on the interface with address ``OURIP``. When
    ``OsInterfaceIsSupported(INTERFACE)`` is true the socket is
    additionally tied to the interface named by ``BIND_TO_Interface``; a
    failure there is ignored. Binding itself is delegated to ``UDPServer``.
    """
    MADDR = "224.0.0.251"
    # Numeric value of SO_BINDTODEVICE on Linux.
    SO_BINDTODEVICE = 25

    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 255)

    # Membership request: group address followed by the local interface.
    mreq = inet_aton(MADDR) + inet_aton(OURIP)
    self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

    if OsInterfaceIsSupported(INTERFACE):
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, SO_BINDTODEVICE, BIND_TO_Interface + '\0')
        except Exception:
            # Best effort: carry on without the interface binding, but let
            # KeyboardInterrupt/SystemExit through.
            pass

    UDPServer.server_bind(self)
def server_bind(self):
    """Join multicast group 224.0.0.252, then bind the server socket.

    Address reuse is enabled, the multicast TTL is set to 255 and the
    group is joined on the interface with address ``OURIP``. When
    ``OsInterfaceIsSupported(INTERFACE)`` is true the socket is
    additionally tied to the interface named by ``BIND_TO_Interface``; a
    failure there is ignored. Binding itself is delegated to ``UDPServer``.
    """
    MADDR = "224.0.0.252"
    # Numeric value of SO_BINDTODEVICE on Linux.
    SO_BINDTODEVICE = 25

    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 255)

    # Membership request: group address followed by the local interface.
    mreq = inet_aton(MADDR) + inet_aton(OURIP)
    self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

    if OsInterfaceIsSupported(INTERFACE):
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, SO_BINDTODEVICE, BIND_TO_Interface + '\0')
        except Exception:
            # Best effort: carry on without the interface binding, but let
            # KeyboardInterrupt/SystemExit through.
            pass

    UDPServer.server_bind(self)
def createUdpSocket(discoverFlags):
    """Create a UDP socket configured for sending multicast datagrams.

    :param discoverFlags: bit mask; when ``SSDP_DISCOVERF_USE_TTL_31`` is
        set the multicast TTL is 31, otherwise 1.
    :returns: the new socket, with address reuse enabled.
    """
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    if discoverFlags & SSDP_DISCOVERF_USE_TTL_31:
        ttl = 31
    else:
        ttl = 1
    udp.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
    return udp

#
# prepares a UDP socket for multicast receives
#
def _open_multicast_socket():
    """Return a new IPv4 UDP socket whose multicast TTL is set to 2."""
    mcast_sock = socket.socket(
        socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    mcast_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
    return mcast_sock
def discover_bulbs(timeout=2):
    """
    Discover all the bulbs in the local network.

    :param int timeout: How many seconds to wait for replies. Discovery will
                        always take exactly this long to run, as it can't know
                        when all the bulbs have finished responding.

    :returns: A list of dictionaries, containing the ip, port and capabilities
              of each of the bulbs in the network.

    Replies without a ``Location`` header are ignored, header values may
    themselves contain ``": "``, and the socket is always closed.
    """
    msg = 'M-SEARCH * HTTP/1.1\r\n' \
          'ST:wifi_bulb\r\n' \
          'MAN:"ssdp:discover"\r\n'

    bulbs = []
    bulb_ips = set()

    # Set up UDP socket
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32)
        s.settimeout(timeout)
        s.sendto(msg.encode(), ('239.255.255.250', 1982))

        while True:
            try:
                data, addr = s.recvfrom(65507)
            except socket.timeout:
                break

            # Split on the first ": " only, so values containing ": " stay
            # intact; lines without that separator are not headers.
            capabilities = dict(
                line.strip("\r").split(": ", 1)
                for line in data.decode().split("\n")
                if ": " in line
            )
            if "Location" not in capabilities:
                continue

            parsed_url = urlparse(capabilities["Location"])
            bulb_ip = (parsed_url.hostname, parsed_url.port)
            if bulb_ip in bulb_ips:
                continue

            # Only the lower-case keys are reported as capabilities.
            capabilities = {key: value for key, value in capabilities.items() if key.islower()}
            bulbs.append({"ip": bulb_ip[0], "port": bulb_ip[1], "capabilities": capabilities})
            bulb_ips.add(bulb_ip)
    finally:
        s.close()

    return bulbs
def set_ipv4_multicast_source_address(sock, source_address):
    """Configure `sock` to send multicast from the given source address.

    The multicast TTL is forced to 1 so that packets are not forwarded
    beyond the local link.

    :param sock: An opened IP socket.
    :param source_address: A string representing an IPv4 source address.
    """
    link_local_ttl = 1
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, link_local_ttl)

    packed_source = socket.inet_aton(source_address)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, packed_source)
def new_socket():
    """Create a UDP socket for multicast use, bound to ``_MDNS_PORT``.

    Address reuse (and port reuse where available) is enabled, the
    multicast TTL is 255 and multicast loopback is switched on.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # SO_REUSEADDR should be equivalent to SO_REUSEPORT for multicast UDP
    # sockets (p 731, "TCP/IP Illustrated, Volume 2"), but some BSD-derived
    # systems require SO_REUSEPORT to be specified explicitly. Not every
    # Python build exposes SO_REUSEPORT, hence the getattr().
    reuseport = getattr(socket, 'SO_REUSEPORT', None)
    if reuseport is not None:
        try:
            s.setsockopt(socket.SOL_SOCKET, reuseport, 1)
        except (OSError, socket.error) as err:
            # OSError on python 3, socket.error on python 2. Kernels older
            # than 3.9 lack SO_REUSEPORT and answer ENOPROTOOPT; anything
            # else is a real error.
            if err.errno != errno.ENOPROTOOPT:
                raise

    # OpenBSD needs the ttl and loop values for the IP_MULTICAST_TTL and
    # IP_MULTICAST_LOOP socket options as an unsigned char.
    for option, value in ((socket.IP_MULTICAST_TTL, 255),
                          (socket.IP_MULTICAST_LOOP, 1)):
        s.setsockopt(socket.IPPROTO_IP, option, struct.pack(b'B', value))

    s.bind(('', _MDNS_PORT))
    return s