我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ipaddress.IPv4Address()。
def exeute(self, method, value): str_convert_type = ( six.text_type, ipaddress.IPv4Address, ipaddress.IPv6Address) try: result = getattr( self.typeclass(value, self.strict_level), method)() if method == "validate": result = "NOP [#f1]_" elif isinstance(result, str_convert_type): result = '``"{}"``'.format(result) except TypeError: result = "E [#f2]_" except typepy.TypeConversionError: result = "E [#f3]_" return result
def __init__(self, value): if not isinstance( value, ( ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network ) ): raise TypeError( "value must be an instance of ipaddress.IPv4Address, " "ipaddress.IPv6Address, ipaddress.IPv4Network, or " "ipaddress.IPv6Network" ) self._value = value
def isipv4(value): """ Return whether or not given value is an IP version 4. If the value is an IP version 4, this function returns ``True``, otherwise ``False``. Examples:: >>> isipv4('127.0.0.1') True >>> isipv4('::1') False :param value: string to validate IP version 4 """ try: ip_addr = ipaddress.IPv4Address(value) except ipaddress.AddressValueError: return False return ip_addr.version == 4
def ip(cls, value, raise_exception=False): """ IP address validation """ if cls.network(value): value = NetTest.convert.string.cidr(value) if value.endswith('/32'): value = value.split('/')[0] else: return False if not isinstance(value, basestring): if raise_exception: raise TypeError('Invalid type \'{}\''.format(type(value))) return False else: value = unicode(value) try: ipaddress.IPv4Address(value) except (ValueError, TypeError): if raise_exception: raise return False return True
def test_covering_cidr(ips): """ covering_cidr() gets the minimal CIDR that covers given IPs. In particular, that means any subnets should *not* cover all given IPs. """ cidr = telepresence.vpn.covering_cidr(ips) assert isinstance(cidr, str) cidr = ipaddress.IPv4Network(cidr) assert cidr.prefixlen <= 24 # All IPs in given CIDR: ips = [ipaddress.IPv4Address(i) for i in ips] assert all([ip in cidr for ip in ips]) # Subnets do not contain all IPs if we're not in minimum 24 bit CIDR: if cidr.prefixlen < 24: for subnet in cidr.subnets(): assert not all([ip in subnet for ip in ips])
def reinit_data(self): """ Subclass update loopback information """ self.loips = {} self.db_conn.connect(mibs.APPL_DB) loopbacks = self.db_conn.keys(mibs.APPL_DB, "INTF_TABLE:lo:*") if not loopbacks: return # collect only ipv4 interfaces for loopback in loopbacks: lostr = loopback.decode() loip = lostr[len("INTF_TABLE:lo:"):] ipa = ipaddress.ip_address(loip) if isinstance(ipa, ipaddress.IPv4Address): self.loips[loip] = ipa
def get_free_ip_list(self, net): # type: (oca.VirtualNetworkPool) -> list[str] """ Returns the set of free IP addresses in the given network :param net: oca.VirtualNetworkPool :return: a set of IPv4 addresses """ ip_list, used_ip_list = set(), set() for r in net.address_ranges: start_ip = int(IPv4Address(unicode(r.ip))) end_ip = start_ip + r.size for ip in range(start_ip, end_ip): ip_list.add(str(IPv4Address(ip))) for lease in r.leases: used_ip_list.add(lease.ip) free_ips = list(ip_list - used_ip_list) random.shuffle(free_ips) return free_ips
def parse_autoblock(data): # type: (str) -> set def _parse(item): # Try to parse item as a single IP try: ipaddress.IPv4Address(item) return {item} except ipaddress.AddressValueError: pass # Try parse item as ip range: ip1-ip2 try: first_ip, last_ip = [utils.ip2int(i) for i in item.split('-')] return {utils.int2ip(n) for n in range(first_ip, last_ip + 1)} except ValueError: raise APIError( 'Exclude IP\'s are expected to be in the form of ' '10.0.0.1,10.0.0.4 or 10.1.0.10-10.1.1.54 or both ' 'comma-separated') ip_sets = (_parse(unicode(d)) for d in data.split(',')) return reduce(operator.or_, ip_sets)
def ValidateIP(ip): """ Check if an IP is public or not. Public IP is any IP that is neither of: - private - loopback - broadcast / multicast - link-local Returns True on success and False on failure. Raises a ValueError exception if the input is not of type ipaddress.IPv4Address or ipaddress.IPv6Address. """ return __validateIP(ip)
def generate_packet(src, data): min_size = ethhdr.min_size + ip4hdr.min_size + udphdr.min_size packet = bytearray(len(data) + min_size) eth = ethhdr({'src': src, 'dst': pnet.HWAddress(u'ff:ff:ff:ff:ff:ff'), 'type': pnet.ETH_P_IP}, buf=packet) ip4 = ip4hdr({'dst': ipaddress.IPv4Address(u'255.255.255.255'), 'proto': socket.IPPROTO_UDP, 'len': len(data) + ip4hdr.min_size + udphdr.min_size}, buf=eth.payload) udp = udphdr({'sport': 68, 'dport': 67, 'len': len(data) + udphdr.min_size}, buf=ip4.payload) ip4['csum'] = pnet.checksum(ip4.tobytes()) udp['csum'] = pnet.ipv4_checksum(ip4, udp, data) udp.payload = data return packet
def test_socks_request_ipv4(self): self.layer.socks_conn = Mock() self.layer.socks_conn.send = Mock(side_effect=self.collect_send_event) addr_future = self.layer.handle_request_and_create_destination( Request(REQ_COMMAND["CONNECT"], ADDR_TYPE["IPV4"], u"127.0.0.1", self.port)) dest_stream, host, port = yield addr_future self.assertIsNotNone(self.event) self.assertIsInstance(self.event, Response) self.assertEqual(self.event.status, RESP_STATUS["SUCCESS"]) self.assertEqual(self.event.atyp, ADDR_TYPE["IPV4"]) self.assertIsNotNone(dest_stream) self.assertFalse(dest_stream.closed()) self.assertEqual(host, IPv4Address(u"127.0.0.1")) self.assertEqual(port, self.port) dest_stream.close()
def test_handle_connection_timeout(self): self.layer.socks_conn = Mock() self.layer.socks_conn.send = Mock(side_effect=self.collect_send_event) socks_request = Request( REQ_COMMAND["CONNECT"], ADDR_TYPE["IPV4"], u"1.2.3.4", self.port) self.layer.create_dest_stream = Mock( side_effect=self.create_raise_exception_function(TimeoutError)) addr_future = self.layer.handle_request_and_create_destination( socks_request) with self.assertRaises(DestNotConnectedError): yield addr_future self.assertIsNotNone(self.event) self.assertIsInstance(self.event, Response) self.assertEqual(self.event.status, RESP_STATUS["NETWORK_UNREACHABLE"]) self.assertEqual(self.event.atyp, ADDR_TYPE["IPV4"]) self.assertEqual(self.event.addr, IPv4Address(u"1.2.3.4")) self.assertEqual(self.event.port, self.port)
def create_endpoint(ipv4, port, service_name): """Creates an endpoint object :param ipv4: ipv4 address of the endpoint :param port: port of the endpoint :service_name: human readable name that identifies the service of the endpoint :returns: zipkin endpoint object """ if (ipv4 == ""): ipv4 = "0.0.0.0" try: ipv4 = int(ipaddress.IPv4Address(ipv4)) except: ipv4 = "0.0.0.0" port = int(port) return zipkin_core.Endpoint( ipv4=ipv4, port=port, service_name=service_name)
def build_trace_req_header(oam_type, sil, remote_ip, remote_port): trace_req_header_values = TRACEREQHEADER() trace_req_header_values.oam_type = oam_type trace_req_header_values.sil = sil trace_req_header_values.port = int(remote_port) s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect((remote_ip, trace_req_header_values.port)) # print(s.getsockname()[0]) src_addr = ipaddress.ip_address(s.getsockname()[0]) if src_addr.version == 4: trace_req_header_values.ip_1 = 0x00000000 trace_req_header_values.ip_2 = 0x00000000 trace_req_header_values.ip_3 = 0x0000FFFF trace_req_header_values.ip_4 = int(ipaddress.IPv4Address(src_addr)) elif src_addr.version == 6: int_addr6 = int(ipaddress.IPv6Address(src_addr)) trace_req_header_values.ip_1 = int_addr6 >> 96 trace_req_header_values.ip_2 = (int_addr6 >> 64) & 0x0FFFFFFFF trace_req_header_values.ip_3 = (int_addr6 >> 32) & 0x0FFFFFFFF trace_req_header_values.ip_4 = int_addr6 & 0x0FFFFFFFF return trace_req_header_values
def process_context_headers(ctx1=0, ctx2=0, ctx3=0, ctx4=0): """ Encode context header values in NSH header. The function is smart enough that is one of the values is an IP address it will properly convert to a integer and encode it properly :param ctx1: NSH context header 1 :param ctx2: NSH context header 2 :param ctx3: NSH context header 3 :param ctx4: NSH context header 4 :return: Array of encoded headers """ context_headers = [] for ctx in [ctx1, ctx2, ctx3, ctx4]: try: ipaddr = ipaddress.IPv4Address(ctx) context_headers.append(int(ipaddr)) except ValueError: try: context_headers.append((int(ctx) & 0xFFFFFFFF)) except ValueError: logger.error("Context header %d can not be represented as an integer", ctx) return context_headers
def getRecvFrom(addressType): def recvfrom(s, sz): _to = None data, ancdata, msg_flags, _from = s.recvmsg(sz, socket.CMSG_LEN(sz)) for anc in ancdata: if anc[0] == socket.SOL_IP and anc[1] == socket.IP_PKTINFO: addr = in_pktinfo.from_buffer_copy(anc[2]) addr = ipaddress.IPv4Address(memoryview(addr.ipi_addr).tobytes()) _to = (str(addr), s.getsockname()[1]) elif anc[0] == socket.SOL_IPV6 and anc[1] == socket.IPV6_PKTINFO: addr = in6_pktinfo.from_buffer_copy(anc[2]) addr = ipaddress.ip_address(memoryview(addr.ipi6_addr).tobytes()) _to = (str(addr), s.getsockname()[1]) return data, addressType(_from).setLocalAddress(_to) return recvfrom
def getSendTo(addressType): def sendto(s, _data, _to): ancdata = [] if type(_to) == addressType: addr = ipaddress.ip_address(_to.getLocalAddress()[0]) else: addr = ipaddress.ip_address(s.getsockname()[0]) if type(addr) == ipaddress.IPv4Address: _f = in_pktinfo() _f.ipi_spec_dst = in_addr.from_buffer_copy(addr.packed) ancdata = [(socket.SOL_IP, socket.IP_PKTINFO, memoryview(_f).tobytes())] elif s.family == socket.AF_INET6 and type(addr) == ipaddress.IPv6Address: _f = in6_pktinfo() _f.ipi6_addr = in6_addr.from_buffer_copy(addr.packed) ancdata = [(socket.SOL_IPV6, socket.IPV6_PKTINFO, memoryview(_f).tobytes())] return s.sendmsg([_data], ancdata, 0, _to) return sendto
def serverList(self): data = b'\x04' # packet ID data += bytes([AUTH_NUMGATEWAYS]) # total num game servers available data += b'\x01' # last game server used # the following is repeated for each server: data += bytes([GATEWAY_ID]) # ID of each server (starting at 1) # gameserver IP, packed in big-endian order data += ipaddress.IPv4Address(GATEWAY_IP).packed # gameserver port, little-endian order data += struct.pack("<I", GATEWAY_PORT) data += bytes([GATEWAY_AGELIMIT]) # unsure what this is used for data += bytes([GATEWAY_PVP]) # 1 if GATEWAY_PVP server, otherwise 0 data += struct.pack("<H", GATEWAY_NUMPLAYERS) # current # of players data += struct.pack("<H", GATEWAY_MAXPLAYERS) # max # of players data += bytes([GATEWAY_ONLINE]) # 1 if server should be listed, otherwise 0 if (GATEWAY_ONLINE == 1): data += b'\x04\x00\x00\x00\x00' else: # TODO doesn't list server if it isn't a test server data += b'\x00\x00\x00\x00\x00' self.sendPacket(data)
def test_recv_in_response(self): conn = Connection(our_role="client") conn._conn.machine.set_state("response") conn._conn._version = 5 conn._conn._addr_type = ADDR_TYPE["IPV4"] conn._conn._addr = ipaddress.IPv4Address("127.0.0.1") conn._conn._port = 8080 raw_data = struct.pack("!BBxB4BH", 0x5, 0x0, 0x1, 127, 0, 0, 1, 8080) event = conn.recv(raw_data) self.assertEqual(conn._conn.state, "end") self.assertEqual(event, "Response") self.assertEqual(event.status, 0) self.assertEqual(event.atyp, 1) self.assertEqual(event.addr, ipaddress.IPv4Address("127.0.0.1")) self.assertEqual(event.port, 8080)
def test_greeting_request_socks4(self): raw_data = struct.pack("!BBH4B6sB", 0x4, 0x1, 5580, 127, 0, 0, 1, "Johnny".encode("ascii"), 0) request = read_greeting_request(raw_data) self.assertIsInstance(request, Socks4Request) self.assertEqual(request.cmd, 1) self.assertEqual(request.port, 5580) self.assertEqual(request.addr, ipaddress.IPv4Address("127.0.0.1")) self.assertEqual(request.name, "Johnny") raw_data = struct.pack( "!BBH4B6sB14sB", 0x4, 0x1, 5580, 0, 0, 0, 1, "Johnny".encode("ascii"), 0, "www.google.com".encode("idna"), 0) request = read_greeting_request(raw_data) self.assertIsInstance(request, Socks4Request) self.assertEqual(request.cmd, 1) self.assertEqual(request.port, 5580) self.assertEqual(request.addr, ipaddress.IPv4Address("0.0.0.1")) self.assertEqual(request.name, "Johnny") self.assertEqual(request.domainname, "www.google.com")
def __init__(self, cmd, atyp, addr, port): if cmd not in REQ_COMMAND.values(): raise ValueError("Unsupported request command {}".format(cmd)) if atyp not in ADDR_TYPE.values(): raise ValueError("Unsupported address type {}".format(atyp)) if atyp == ADDR_TYPE["IPV4"]: try: addr = ipaddress.IPv4Address(addr) except ipaddress.AddressValueError: raise ValueError("Invalid ipaddress format for IPv4") elif atyp == ADDR_TYPE["IPV6"]: try: addr = ipaddress.IPv6Address(addr) except ipaddress.AddressValueError: raise ValueError("Invalid ipaddress format for IPv6") elif atyp == ADDR_TYPE["DOMAINNAME"] and not isinstance(addr, string_func): raise ValueError("Domain name expect to be unicode string") self.cmd = cmd self.atyp = atyp self.addr = addr self.port = port
def __init__(self, status, atyp, addr, port): if status not in RESP_STATUS.values(): raise ValueError("Unsupported status code {}".format(status)) if atyp not in ADDR_TYPE.values(): raise ValueError("Unsupported address type {}".format(atyp)) if atyp == ADDR_TYPE["IPV4"]: try: addr = ipaddress.IPv4Address(addr) except ipaddress.AddressValueError: raise ValueError("Invalid ipaddress format for IPv4") elif atyp == ADDR_TYPE["IPV6"]: try: addr = ipaddress.IPv6Address(addr) except ipaddress.AddressValueError: raise ValueError("Invalid ipaddress format for IPv6") elif atyp == ADDR_TYPE["DOMAINNAME"] and not isinstance(addr, string_func): raise ValueError("Domain name expect to be unicode string") self.status = status self.atyp = atyp self.addr = addr self.port = port
def validate_ip_address(self, address, af=6): if af == 4: try: ipaddress.IPv4Address(address) return True except ipaddress.AddressValueError as ex: logging.error(ex) elif af == 6: try: ipaddress.IPv6Address(address) return True except ipaddress.AddressValueError as ex: logging.error(ex) else: raise Exception("Invalid AF: {}".format(af)) return False
def testTeredo(self): # stolen from wikipedia server = ipaddress.IPv4Address('65.54.227.120') client = ipaddress.IPv4Address('192.0.2.45') teredo_addr = '2001:0000:4136:e378:8000:63bf:3fff:fdd2' self.assertEqual((server, client), ipaddress.ip_address(teredo_addr).teredo) bad_addr = '2000::4136:e378:8000:63bf:3fff:fdd2' self.assertFalse(ipaddress.ip_address(bad_addr).teredo) bad_addr = '2001:0001:4136:e378:8000:63bf:3fff:fdd2' self.assertFalse(ipaddress.ip_address(bad_addr).teredo) # i77 teredo_addr = ipaddress.IPv6Address('2001:0:5ef5:79fd:0:59d:a0e5:ba1') self.assertEqual((ipaddress.IPv4Address('94.245.121.253'), ipaddress.IPv4Address('95.26.244.94')), teredo_addr.teredo)
def parse_ip4(pack): arr = struct_ip4.unpack(pack[:20]) hdr = { "version" : arr[0] >> 4, # should be 4 ... "ihl" : arr[0] & 0xF, "tos" : arr[1], "tot_len" : arr[2], "id" : arr[3], "frag_off" : arr[4], "ttl" : arr[5], "protcol" : arr[6], "check" : arr[7], "saddr" : ipaddress.IPv4Address(arr[8]), "daddr" : ipaddress.IPv4Address(arr[9]) } hdr_len = hdr["ihl"] * 4 return hdr, pack[hdr_len:]
def clean(self): # delete existing entry if exists machine = MachineData.objects(ip_address=self.ip_address) if machine: machine.delete() # add ip_address_decimal, aws, ec2 fields depending on the ip_address if self.ip_address_decimal is None: self.ip_address_decimal = int(ipaddress.IPv4Address(self.ip_address)) if self.aws is None: self.aws = Validation.is_aws(self.ip_address) if self.aws: if self.status == 'OK': self.ec2 = { 'instance_id': EC2Client.ip_instance_map.get(self.ip_address, EC2Client.get_instance_id(self.ip_address)), 'state': "running" } else: self.ec2 = { 'instance_id': EC2Client.ip_instance_map.get(self.ip_address, EC2Client.get_instance_id(self.ip_address)), 'state': None }
def get_addresses(hostname) -> List[Union[IPv4Address, IPv6Address]]: # Get DNS info try: a_records = subprocess.check_output(args=['dig', '+short', 'a', hostname], stderr=subprocess.DEVNULL) aaaa_records = subprocess.check_output(args=['dig', '+short', 'aaaa', hostname], stderr=subprocess.DEVNULL) dns_records = a_records + aaaa_records dns_results = [] for line in dns_records.decode('utf-8').strip().split(): try: dns_results.append(str(ip_address(line))) except ValueError: pass except subprocess.CalledProcessError: dns_results = [] return dns_results
def _nodes(self, container_base_name: str) -> Set[Node]: """ Args: container_base_name: The start of the container names. Returns: ``Node``s corresponding to containers with names starting with ``container_base_name``. """ client = docker.from_env(version='auto') filters = {'name': container_base_name} containers = client.containers.list(filters=filters) return set( Node( ip_address=IPv4Address( container.attrs['NetworkSettings']['IPAddress'] ), ssh_key_path=self._path / 'include' / 'ssh' / 'id_rsa', ) for container in containers )
def is_ipv4(string): """ Checks if a string is a valid ip-address (v4) :param string: String to check :type string: str :return: True if an ip, false otherwise. :rtype: bool """ try: ipaddress.IPv4Address(string) return True except ipaddress.AddressValueError: return False
def get_less_used_network(num_of_requested_ip, global_variable, config_obj): """ Get the network with more IP available. :return: Network """ # identify less used network while global_variable.free_ip_lock is True: time.sleep(1) free_ip = global_variable.free_ips less_used_network = sorted(free_ip, key=lambda k: len(free_ip[k]), reverse=True)[0] # network must have enough IPs if len(global_variable.free_ips[less_used_network]) < num_of_requested_ip + int( config_section_map(config_obj, 'vcloud')['min_spare_ip']): return None ip_list = global_variable.free_ips[less_used_network][:num_of_requested_ip] pvdc_external_networks = global_variable.vcs.get_network() from ipaddress import IPv4Address, IPv4Network external_network = filter(lambda x: IPv4Address(x.gateway) in IPv4Network(less_used_network), pvdc_external_networks) return {'object': external_network[0], 'iplist': ip_list}
def is_ipv4_address(self, ipaddr): ''' @summary: Check address is valid IPv4 address. @param ipaddr IP address to check @return Boolean ''' is_valid_ipv4 = True try : # building ipaddress fails for some of addresses unless unicode(ipaddr) is specified for both ipv4/ipv6 # Example - 192.168.156.129, it is valid IPV4 address, send_packet works with it. ip = ipaddress.IPv4Address(unicode(ipaddr)) except Exception, e : is_valid_ipv4 = False return is_valid_ipv4 #---------------------------------------------------------------------
def __init__(self, value): """Initialise new IPPrefix instance.""" try: obj = ipaddress.ip_address(value) except Exception: try: obj = ipaddress.ip_network(value, strict=False) except Exception: raise if type(obj) in [ipaddress.IPv4Address, ipaddress.IPv6Address]: self.prefix = None self.type = self.HOST elif type(obj) in [ipaddress.IPv4Network, ipaddress.IPv6Network]: self.prefix = obj.network_address self.type = self.PREFIX self.version = obj.version self.txt = obj.compressed
def validate_ip(addr): """Pass-through function for validating an IPv4 address. Args: ip: (str) IP address Returns: unicode string with same address Raises: Error: if IPv4 address is not valid """ try: return ipaddress.IPv4Address(unicode(addr)).compressed except ipaddress.AddressValueError as exc: raise Error('Invalid IPv4 address "%s"; %s' % (addr, exc))