我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ipaddress.IPv6Address()。
def exeute(self, method, value): str_convert_type = ( six.text_type, ipaddress.IPv4Address, ipaddress.IPv6Address) try: result = getattr( self.typeclass(value, self.strict_level), method)() if method == "validate": result = "NOP [#f1]_" elif isinstance(result, str_convert_type): result = '``"{}"``'.format(result) except TypeError: result = "E [#f2]_" except typepy.TypeConversionError: result = "E [#f3]_" return result
def __init__(self, value): if not isinstance( value, ( ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network ) ): raise TypeError( "value must be an instance of ipaddress.IPv4Address, " "ipaddress.IPv6Address, ipaddress.IPv4Network, or " "ipaddress.IPv6Network" ) self._value = value
def isipv6(value): """ Return whether or not given value is an IP version 6. If the value is an IP version 6, this function returns ``True``, otherwise ``False``. Examples:: >>> isipv6('2001:41d0:2:a141::1') True >>> isipv6('127.0.0.1') False :param value: string to validate IP version 6 """ try: ip_addr = ipaddress.IPv6Address(value) except ipaddress.AddressValueError: return False return ip_addr.version == 6
def setUp(self): self.packet_class = IPv6 self.packet_fixture = bytes.fromhex( '6efbcdef001811fd200109e0000000000000000000032002200109e0000400320212004500320222' '123456780018a695') + b'Demo UDP packet!' self.message_fixture = IPv6( traffic_class=239, flow_label=773615, next_header=17, hop_limit=253, source=IPv6Address('2001:9e0::3:2002'), destination=IPv6Address('2001:9e0:4:32:212:45:32:0222'), payload=UDP( source_port=4660, destination_port=22136, checksum=42645, payload=b'Demo UDP packet!' ) ) self.parse_packet()
def test_l4_payload_type(self): message = IPv6( traffic_class=0, flow_label=0, next_header=0, hop_limit=0, source=IPv6Address('::'), destination=IPv6Address('::'), payload=UnknownLayer4Protocol(b'1234') ) packet = bytes.fromhex('6000000000040000000000000000000000000000' '0000000000000000000000000000000000000000' '31323334') parsed_len, parsed_message = IPv6.parse(packet) self.assertEqual(parsed_len, len(packet)) self.assertEqual(parsed_message, message) self.assertEqual(message.save(), packet)
def find_link_address(link_address: Optional[IPv6Address], interface_addresses: Iterable[IPv6Address]) -> IPv6Address: """ Find the appropriate reply-from address :param link_address: The link-address address specified in the configuration, if any :param interface_addresses: The list of addresses on the interface :return: The reply-from address to use """ if link_address: return link_address else: # Pick the first global address global_addresses = [address for address in interface_addresses if is_global_unicast(address)] if global_addresses: return global_addresses[0] else: return IPv6Address('::')
def ValidateIP(ip): """ Check if an IP is public or not. Public IP is any IP that is neither of: - private - loopback - broadcast / multicast - link-local Returns True on success and False on failure. Raises a ValueError exception if the input is not of type ipaddress.IPv4Address or ipaddress.IPv6Address. """ return __validateIP(ip)
def build_trace_req_header(oam_type, sil, remote_ip, remote_port): trace_req_header_values = TRACEREQHEADER() trace_req_header_values.oam_type = oam_type trace_req_header_values.sil = sil trace_req_header_values.port = int(remote_port) s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect((remote_ip, trace_req_header_values.port)) # print(s.getsockname()[0]) src_addr = ipaddress.ip_address(s.getsockname()[0]) if src_addr.version == 4: trace_req_header_values.ip_1 = 0x00000000 trace_req_header_values.ip_2 = 0x00000000 trace_req_header_values.ip_3 = 0x0000FFFF trace_req_header_values.ip_4 = int(ipaddress.IPv4Address(src_addr)) elif src_addr.version == 6: int_addr6 = int(ipaddress.IPv6Address(src_addr)) trace_req_header_values.ip_1 = int_addr6 >> 96 trace_req_header_values.ip_2 = (int_addr6 >> 64) & 0x0FFFFFFFF trace_req_header_values.ip_3 = (int_addr6 >> 32) & 0x0FFFFFFFF trace_req_header_values.ip_4 = int_addr6 & 0x0FFFFFFFF return trace_req_header_values
def getSendTo(addressType): def sendto(s, _data, _to): ancdata = [] if type(_to) == addressType: addr = ipaddress.ip_address(_to.getLocalAddress()[0]) else: addr = ipaddress.ip_address(s.getsockname()[0]) if type(addr) == ipaddress.IPv4Address: _f = in_pktinfo() _f.ipi_spec_dst = in_addr.from_buffer_copy(addr.packed) ancdata = [(socket.SOL_IP, socket.IP_PKTINFO, memoryview(_f).tobytes())] elif s.family == socket.AF_INET6 and type(addr) == ipaddress.IPv6Address: _f = in6_pktinfo() _f.ipi6_addr = in6_addr.from_buffer_copy(addr.packed) ancdata = [(socket.SOL_IPV6, socket.IPV6_PKTINFO, memoryview(_f).tobytes())] return s.sendmsg([_data], ancdata, 0, _to) return sendto
def __init__(self, cmd, atyp, addr, port): if cmd not in REQ_COMMAND.values(): raise ValueError("Unsupported request command {}".format(cmd)) if atyp not in ADDR_TYPE.values(): raise ValueError("Unsupported address type {}".format(atyp)) if atyp == ADDR_TYPE["IPV4"]: try: addr = ipaddress.IPv4Address(addr) except ipaddress.AddressValueError: raise ValueError("Invalid ipaddress format for IPv4") elif atyp == ADDR_TYPE["IPV6"]: try: addr = ipaddress.IPv6Address(addr) except ipaddress.AddressValueError: raise ValueError("Invalid ipaddress format for IPv6") elif atyp == ADDR_TYPE["DOMAINNAME"] and not isinstance(addr, string_func): raise ValueError("Domain name expect to be unicode string") self.cmd = cmd self.atyp = atyp self.addr = addr self.port = port
def __init__(self, status, atyp, addr, port): if status not in RESP_STATUS.values(): raise ValueError("Unsupported status code {}".format(status)) if atyp not in ADDR_TYPE.values(): raise ValueError("Unsupported address type {}".format(atyp)) if atyp == ADDR_TYPE["IPV4"]: try: addr = ipaddress.IPv4Address(addr) except ipaddress.AddressValueError: raise ValueError("Invalid ipaddress format for IPv4") elif atyp == ADDR_TYPE["IPV6"]: try: addr = ipaddress.IPv6Address(addr) except ipaddress.AddressValueError: raise ValueError("Invalid ipaddress format for IPv6") elif atyp == ADDR_TYPE["DOMAINNAME"] and not isinstance(addr, string_func): raise ValueError("Domain name expect to be unicode string") self.status = status self.atyp = atyp self.addr = addr self.port = port
def validate_ip_address(self, address, af=6): if af == 4: try: ipaddress.IPv4Address(address) return True except ipaddress.AddressValueError as ex: logging.error(ex) elif af == 6: try: ipaddress.IPv6Address(address) return True except ipaddress.AddressValueError as ex: logging.error(ex) else: raise Exception("Invalid AF: {}".format(af)) return False
def get_addresses(hostname) -> List[Union[IPv4Address, IPv6Address]]: # Get DNS info try: a_records = subprocess.check_output(args=['dig', '+short', 'a', hostname], stderr=subprocess.DEVNULL) aaaa_records = subprocess.check_output(args=['dig', '+short', 'aaaa', hostname], stderr=subprocess.DEVNULL) dns_records = a_records + aaaa_records dns_results = [] for line in dns_records.decode('utf-8').strip().split(): try: dns_results.append(str(ip_address(line))) except ValueError: pass except subprocess.CalledProcessError: dns_results = [] return dns_results
def is_ipv6(string): """ Checks if a string is a valid ip-address (v6) :param string: String to check :type string: str :return: True if an ipv6, false otherwise. :rtype: bool """ try: ipaddress.IPv6Address(string) return True except ipaddress.AddressValueError: return False
def test_bad_address_split_v6_repeated_double_colon(self): def assertBadSplit(addr): msg = "At most one '::' permitted in %r" with self.assertAddressError(msg, addr): ipaddress.IPv6Address(addr) assertBadSplit("3ffe::1::1") assertBadSplit("1::2::3::4:5") assertBadSplit("2001::db:::1") assertBadSplit("3ffe::1::") assertBadSplit("::3ffe::1") assertBadSplit(":3ffe::1::1") assertBadSplit("3ffe::1::1:") assertBadSplit(":3ffe::1::1:") assertBadSplit(":::") assertBadSplit('2001:db8:::1')
def testTeredo(self): # stolen from wikipedia server = ipaddress.IPv4Address('65.54.227.120') client = ipaddress.IPv4Address('192.0.2.45') teredo_addr = '2001:0000:4136:e378:8000:63bf:3fff:fdd2' self.assertEqual((server, client), ipaddress.ip_address(teredo_addr).teredo) bad_addr = '2000::4136:e378:8000:63bf:3fff:fdd2' self.assertFalse(ipaddress.ip_address(bad_addr).teredo) bad_addr = '2001:0001:4136:e378:8000:63bf:3fff:fdd2' self.assertFalse(ipaddress.ip_address(bad_addr).teredo) # i77 teredo_addr = ipaddress.IPv6Address('2001:0:5ef5:79fd:0:59d:a0e5:ba1') self.assertEqual((ipaddress.IPv4Address('94.245.121.253'), ipaddress.IPv4Address('95.26.244.94')), teredo_addr.teredo)
def handle_ipaddr_insert(self, prefix, prefix_len, _stable, flags, _is_local): """ Add an ip address for each prefix on prefix change. """ ipaddr_str = str(ipaddress.IPv6Address(prefix)) + \ str(self.wpan_api.nodeid) if CONFIG.DEBUG_LOG_PROP: print("\n>>>> new PREFIX add ipaddr: " + ipaddr_str) valid = 1 preferred = 1 flags = 0 ipaddr = ipaddress.IPv6Interface(unicode(ipaddr_str)) self.autoAddresses.add(ipaddr) arr = self.encode_fields('6CLLC', ipaddr.ip.packed, prefix_len, valid, preferred, flags) self.wpan_api.prop_insert_async(SPINEL.PROP_IPV6_ADDRESS_TABLE, arr, str(len(arr)) + 's', SPINEL.HEADER_EVENT_HANDLER)
def handle_property(self, line, prop_id, mixed_format='B', output=True): """ Helper to set property when line argument passed, get otherwise. """ value = self.prop_get_or_set_value(prop_id, line, mixed_format) if not output: return value if value is None or value == "": print("Error") return None if line is None or line == "": # Only print value on PROP_VALUE_GET if mixed_format == '6': print(str(ipaddress.IPv6Address(value))) elif (mixed_format == 'D') or (mixed_format == 'E'): print(util.hexify_str(value, '')) elif mixed_format == 'H': print("%04x" % value) else: print(str(value)) print("Done") return value
def __init__(self, value): """Initialise new IPPrefix instance.""" try: obj = ipaddress.ip_address(value) except Exception: try: obj = ipaddress.ip_network(value, strict=False) except Exception: raise if type(obj) in [ipaddress.IPv4Address, ipaddress.IPv6Address]: self.prefix = None self.type = self.HOST elif type(obj) in [ipaddress.IPv4Network, ipaddress.IPv6Network]: self.prefix = obj.network_address self.type = self.PREFIX self.version = obj.version self.txt = obj.compressed
def _is_ipaddress(value): import ipaddress return isinstance( value, (ipaddress.IPv4Address, ipaddress.IPv6Address))
def is_ipv6(ip_addr): try: ipaddress.IPv6Address(ip_addr) return True except ipaddress.AddressValueError: return False
def unpack_ip_address_bytes(data_bytes, addr_type): ''' Return a tuple containing the unpacked addr_type IP address string, and the remainder of data_bytes. addr_type must be either IPV4_ADDRESS_TYPE or IPV6_ADDRESS_TYPE. data_bytes must be at least IPV4_ADDRESS_LEN or IPV6_ADDRESS_LEN long. ''' addr_len = get_addr_type_len(addr_type) assert len(data_bytes) >= addr_len if addr_type == IPV4_ADDRESS_TYPE: assert addr_len == IPV4_ADDRESS_LEN (addr_bytes, remaining_bytes) = split_field(addr_len, data_bytes) # Some ipaddress variants demand bytearray, others demand bytes try: addr_value = ipaddress.IPv4Address(bytearray(addr_bytes)) return (str(addr_value), remaining_bytes) except ipaddress.AddressValueError: pass except UnicodeDecodeError: pass addr_value = ipaddress.IPv4Address(bytes(addr_bytes)) elif addr_type == IPV6_ADDRESS_TYPE: assert addr_len == IPV6_ADDRESS_LEN (addr_bytes, remaining_bytes) = split_field(addr_len, data_bytes) try: addr_value = ipaddress.IPv6Address(bytearray(addr_bytes)) return (str(addr_value), remaining_bytes) except ipaddress.AddressValueError: pass except UnicodeDecodeError: pass addr_value = ipaddress.IPv6Address(bytes(addr_bytes)) else: raise ValueError('Unexpected address type: {}'.format(addr_type)) return (str(addr_value), remaining_bytes)
def get_bgp_table(host): """Return the contents of the cbgpPeer2Table table.""" obj = ObjectIdentity('CISCO-BGP4-MIB', 'cbgpPeer2Table') target = UdpTransportTarget((host, SNMP['port'])) results = defaultdict(dict) for (errorIndication, errorStatus, errorIndex, varBinds) in bulkCmd(snmp, usm, target, context, 0, 25, ObjectType(obj), lexicographicMode=False, lookupMib=True): if errorIndication: raise RuntimeError(errorIndication) elif errorStatus: raise RuntimeError('%s at %s' % (errorStatus.prettyPrint(), errorIndex and varBinds[-1][int(errorIndex)-1] or '?')) else: for (key, val) in varBinds: if not isinstance(val, EndOfMibView): (mib, name, index) = \ key.loadMibs('BGP4-MIB').getMibSymbol() a = index[1].prettyPrint() results[a]['cbgpPeer2Type'] = index[0] results[a][name] = val for address in results: if results[address]['cbgpPeer2Type'] == 1: results[address]['cbgpPeer2RemoteAddr'] = \ ipaddress.IPv4Address(int(address, 16)) elif results[address]['cbgpPeer2Type'] == 2: results[address]['cbgpPeer2RemoteAddr'] = \ ipaddress.IPv6Address(int(address, 16)) return results
def test_checksum_calculation(self): dummy_ipv6 = IPv6( source=IPv6Address('2001:9e0::3:2002'), destination=IPv6Address('2001:9e0:4:32:212:45:32:0222'), ) self.assertEqual(self.message_fixture.checksum, self.message.calculate_checksum(dummy_ipv6))
def test_save_with_checksum_calculation(self): dummy_ipv6 = IPv6( source=IPv6Address('2001:9e0::3:2002'), destination=IPv6Address('2001:9e0:4:32:212:45:32:0222'), ) self.message.checksum = -1 saved = self.message.save(recalculate_checksum_for=dummy_ipv6) self.assertEqual(saved, self.packet_fixture)
def test_validate_source(self): self.message.source = bytes.fromhex('20010db8000000000000000000000001') with self.assertRaisesRegex(ValueError, 'Source .* IPv6 address'): self.message.validate() self.message.source = IPv6Address('ff02::1') with self.assertRaisesRegex(ValueError, 'Source .* non-multicast IPv6 address'): self.message.validate()
def setUp(self): self.packet_class = Ethernet self.packet_fixture = bytes.fromhex( '0123456789abcdef0123456786dd' '6efbcdef001811fd200109e0000000000000000000032002200109e0000400320212004500320222' '123456780018a695') + b'Demo UDP packet!' self.message_fixture = Ethernet( destination=bytes.fromhex('0123456789ab'), source=bytes.fromhex('cdef01234567'), ethertype=int('86dd', 16), payload=IPv6( traffic_class=239, flow_label=773615, next_header=17, hop_limit=253, source=IPv6Address('2001:9e0::3:2002'), destination=IPv6Address('2001:9e0:4:32:212:45:32:0222'), payload=UDP( source_port=4660, destination_port=22136, checksum=42645, payload=b'Demo UDP packet!' ) ) ) self.parse_packet()
def __init__(self, traffic_class: int = 0, flow_label: int = 0, next_header: int = 0, hop_limit: int = 0, source: IPv6Address = None, destination: IPv6Address = None, payload: ProtocolElement = None): super().__init__() self.traffic_class = traffic_class self.flow_label = flow_label self.next_header = next_header self.hop_limit = hop_limit self.source = source self.destination = destination self.payload = payload
def validate(self): """ Validate that the contents of this object conform to protocol specs. """ # Check if the traffic class fits in 8 bits if not isinstance(self.traffic_class, int) or not (0 <= self.traffic_class < 2 ** 8): raise ValueError("Traffic class must be an unsigned 8 bit integer") # Check if the flow label fits in 20 bits if not isinstance(self.flow_label, int) or not (0 <= self.flow_label < 2 ** 20): raise ValueError("Flow label must be an unsigned 20 bit integer") # Check if the next header fits in 8 bits if not isinstance(self.next_header, int) or not (0 <= self.next_header < 2 ** 8): raise ValueError("Next header type must be an unsigned 8 bit integer") # Check if the hop limit fits in 8 bits if not isinstance(self.hop_limit, int) or not (0 <= self.hop_limit < 2 ** 8): raise ValueError("Hop limit must be an unsigned 8 bit integer") # Check if the source and destination are IPv6 addresses if not isinstance(self.source, IPv6Address): raise ValueError("Source must be an IPv6 address") if self.source.is_multicast: raise ValueError("Source must be a non-multicast IPv6 address") if not isinstance(self.destination, IPv6Address): raise ValueError("Destination must be an IPv6 address") # Check if the payload is a protocol element if not isinstance(self.payload, ProtocolElement): raise ValueError("Payload must be a protocol element") # Check if all options are allowed self.validate_contains([self.payload]) self.payload.validate()
def __init__(self, interface_index: int, interface_mac_address: bytes, client_mac_address: bytes, reply_from: IPv6Address, reply_socket: socket.socket): self.interface_index = interface_index self.interface_mac_address = interface_mac_address self.client_mac_address = client_mac_address self.reply_from = reply_from self.reply_socket = reply_socket
def find_reply_from(reply_from: Optional[IPv6Address], interface_name: str, interface_addresses: Iterable[IPv6Address]) -> IPv6Address: """ Find the appropriate reply-from address :param reply_from: The reply-from address specified in the configuration, if any :param interface_name: The name of the interface for logging purposes :param interface_addresses: The list of addresses on the interface :return: The reply-from address to use """ if reply_from: # Check if this address exists if reply_from not in interface_addresses: raise ValueError("Reply-from address {addr} does not exist on {intf}".format( addr=reply_from, intf=interface_name )) return reply_from else: # Pick the first link-local address ll_addresses = [address for address in interface_addresses if address.is_link_local] if not ll_addresses: raise ValueError("No link-local address found on {intf}".format( intf=interface_name )) return ll_addresses[0]
def set_domain_name_for_ipv6_subnet(sender, instance, **kwargs): if len(instance.domain_name) > 0: return network = ipaddress.IPv6Address("%s::" % instance.network) instance.domain_name = ".".join(network.exploded.replace(":", "")[:16])[::-1] + ".ip6.arpa"
def makeIP(i, size): if size == 4: return ipaddress.IPv4Address(i) else: return ipaddress.IPv6Address(i)