我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 ipaddress.ip_interface()。
def test_inet_cast(self):
    """Check that ``inet`` values are cast to ipaddress interface objects.

    After ``register_ipaddress`` is applied to the cursor, a NULL must be
    fetched as None and IPv4/IPv6 literals as the matching interface class.
    """
    import ipaddress as ip

    cursor = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cursor)

    cursor.execute("select null::inet")
    self.assertTrue(cursor.fetchone()[0] is None)

    # (query, expected class, literal the fetched value must equal)
    expectations = (
        ("select '127.0.0.1/24'::inet",
         ip.IPv4Interface, '127.0.0.1/24'),
        ("select '::ffff:102:300/128'::inet",
         ip.IPv6Interface, '::ffff:102:300/128'),
    )
    for query, expected_class, literal in expectations:
        cursor.execute(query)
        fetched = cursor.fetchone()[0]
        self.assertTrue(isinstance(fetched, expected_class), repr(fetched))
        self.assertEqual(fetched, ip.ip_interface(literal))
def test_inet_cast(self):
    """Check that ``inet`` values are cast to ipaddress interface objects.

    After ``register_ipaddress`` is applied to the cursor, a NULL must be
    fetched as None and IPv4/IPv6 literals as the matching interface class.
    """
    import ipaddress as ip

    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    # The deprecated aliases assert_/assertEquals were removed in
    # Python 3.12; the modern assertion names are used throughout.
    cur.execute("select null::inet")
    self.assertIsNone(cur.fetchone()[0])

    cur.execute("select '127.0.0.1/24'::inet")
    obj = cur.fetchone()[0]
    self.assertIsInstance(obj, ip.IPv4Interface, repr(obj))
    self.assertEqual(obj, ip.ip_interface('127.0.0.1/24'))

    cur.execute("select '::ffff:102:300/128'::inet")
    obj = cur.fetchone()[0]
    self.assertIsInstance(obj, ip.IPv6Interface, repr(obj))
    self.assertEqual(obj, ip.ip_interface('::ffff:102:300/128'))
def cidr_validator(value, return_ip_interface=False):
    """Validate an IPv4 address with an optional ``/prefix`` suffix.

    The address part must match ``validators.ipv4_re``, the prefix length
    (32 when no ``/`` is present) must lie in 1..32, and the parsed
    interface must not be flagged as reserved.

    :param value: text such as ``'10.0.0.1'`` or ``'10.0.0.1/24'``.
    :param return_ip_interface: when true, return the parsed interface
        object; otherwise ``None`` is returned.
    :raises ValidationError: if any of the checks above fails.
    """
    try:
        if '/' not in value:
            address_part, prefix_len = value, 32
        else:
            # Unpacking raises ValueError when there is more than one '/'.
            address_part, prefix_text = value.split('/')
            prefix_len = int(prefix_text)

        if not validators.ipv4_re.match(address_part):
            raise ValueError
        if prefix_len < 1 or prefix_len > 32:
            raise ValueError

        ipi = ipaddress.ip_interface(six.text_type(value))
        if ipi.is_reserved:
            raise ValueError
    except ValueError:
        raise ValidationError(_('Enter a valid IPv4 address or IPv4 network.'))

    return ipi if return_ip_interface else None
def _create_ping_cmd(self, targets, vrf, count, timeout_value, size): """ Internal method to create ping command. """ cli_cmd = [] try: for numips in targets: check_valid_ip = ip_address(unicode(numips)) numips = str(check_valid_ip) valid_address = ip_interface(unicode(numips)) if valid_address.version == 4: cli = "ping {} vrf {} count {} datagram-size {} timeout {}".format( numips, vrf, count, size, timeout_value) elif valid_address.version == 6: cli = "ping ipv6 {} vrf {} count {} datagram-size {} timeout {}".format( numips, vrf, count, size, timeout_value) cli_cmd.append(cli) return cli_cmd except ValueError: raise ValueError('Invalid IP: %s', numips)
def testHash(self):
    """Equal ipaddress objects must hash alike and work as dict keys."""
    same_text_cases = (
        (ipaddress.ip_interface, '10.1.1.0/24'),
        (ipaddress.ip_network, '10.1.1.0/24'),
        (ipaddress.ip_address, '10.1.1.0'),
    )
    for factory, text in same_text_cases:
        self.assertEqual(hash(factory(text)), hash(factory(text)))

    # i70
    # An address rebuilt from the integer value must hash like the one
    # parsed from text.
    parsed = ipaddress.ip_address('1.2.3.4')
    rebuilt = ipaddress.ip_address(int(ipaddress.ip_address('1.2.3.4')._ip))
    self.assertEqual(hash(parsed), hash(rebuilt))

    ip1 = ipaddress.ip_address('10.1.1.0')
    ip2 = ipaddress.ip_address('1::')
    dummy = {
        self.ipv4_address: None,
        self.ipv6_address: None,
        ip1: None,
        ip2: None,
    }
    self.assertTrue(self.ipv4_address in dummy)
    self.assertTrue(ip2 in dummy)
def testHash(self):
    """Equal ipaddress objects must hash alike and work as dict keys."""
    iface_text = net_text = '10.1.1.0/24'
    self.assertEqual(
        hash(ipaddress.ip_interface(iface_text)),
        hash(ipaddress.ip_interface(iface_text)),
    )
    self.assertEqual(
        hash(ipaddress.ip_network(net_text)),
        hash(ipaddress.ip_network(net_text)),
    )
    self.assertEqual(
        hash(ipaddress.ip_address('10.1.1.0')),
        hash(ipaddress.ip_address('10.1.1.0')),
    )

    # i70
    # An address rebuilt from the integer value must hash like the one
    # parsed from text.
    as_integer = int(ipaddress.ip_address('1.2.3.4')._ip)
    self.assertEqual(
        hash(ipaddress.ip_address('1.2.3.4')),
        hash(ipaddress.ip_address(as_integer)),
    )

    ip1 = ipaddress.ip_address('10.1.1.0')
    ip2 = ipaddress.ip_address('1::')
    dummy = dict.fromkeys(
        [self.ipv4_address, self.ipv6_address, ip1, ip2])
    self.assertIn(self.ipv4_address, dummy)
    self.assertIn(ip2, dummy)
def _get_host_ip_addr(attributes, networks):
    """Return the internal IP address to use for a host.

    When ``attributes`` carries ``'intern_ip'`` it is parsed as an
    interface, its network is passed to ``_check_in_networks`` together
    with ``networks``, and the bare address is returned.  Otherwise the
    address is whatever ``_choose_ip_addr(networks)`` selects.

    :raises CreateError: if ``'intern_ip'`` cannot be parsed, if it is
        absent and ``networks`` is empty, or if no address can be chosen.
    """
    if 'intern_ip' in attributes:
        try:
            requested = ip_interface(attributes['intern_ip'])
        except ValueError as error:
            raise CreateError(str(error))
        _check_in_networks(networks, requested.network)
        return requested.ip

    # No explicit address: one has to be picked from the networks.
    if not networks:
        raise CreateError(
            '"intern_ip" is not given, and no networks could be found.'
        )
    chosen = _choose_ip_addr(networks)
    if chosen is None:
        raise CreateError(
            'No IP address could be selected from the given networks.'
        )
    return chosen
def edit(request):
    """Hand an existing or freshly built server object to ``_edit``.

    With ``object_id`` in the query string the server is loaded through
    ``Query``.  Otherwise a new ``ServerObject`` is assembled from the
    ``attr_*`` POST fields; how ``attr_intern_ip`` is parsed depends on the
    servertype's ``ip_addr_type`` ('null' -> None, 'network' -> network,
    anything else -> interface).
    """
    if 'object_id' in request.GET:
        server = Query({'object_id': request.GET['object_id']}).get()
    else:
        form = request.POST
        servertype = Servertype.objects.get(pk=form['attr_servertype'])
        project = Project.objects.get(pk=form['attr_project'])
        hostname = form['attr_hostname']

        addr_type = servertype.ip_addr_type
        if addr_type == 'null':
            intern_ip = None
        else:
            parse = ip_network if addr_type == 'network' else ip_interface
            intern_ip = parse(form['attr_intern_ip'])

        server = ServerObject.new(servertype, project, hostname, intern_ip)

    return _edit(request, server, True)
def _configure_interface_address(self, interface, address, default_route=None):
    """Add ``address`` to ``interface`` and optionally set a default route.

    Runs ``ip address add`` for the parsed address.  When ``default_route``
    is given, any existing default route of that IP version is deleted
    (best effort) and a new one via the next hop is added on ``interface``.

    :param interface: device name passed to ``ip ... dev``.
    :param address: interface address in ``addr/prefix`` form.
    :param default_route: optional next-hop address; it must lie inside the
        network of ``address``.

    Exits the process with status 1 (after logging an error) when the next
    hop cannot be parsed or is outside the interface's network.
    """
    net = ipaddress.ip_interface(address)

    # Bind the name up front: without this, the check further down raised
    # UnboundLocalError whenever no default route was requested.
    next_hop = None
    if default_route:
        try:
            next_hop = ipaddress.ip_address(default_route)
        except ValueError:
            self.logger.error("next-hop address {} could not be parsed".format(default_route))
            sys.exit(1)
        if next_hop not in net.network:
            self.logger.error("next-hop address {} not in network {}".format(next_hop, net))
            sys.exit(1)

    family = "-{}".format(net.version)
    subprocess.check_call(["ip", family, "address", "add",
                           str(net.ip) + "/" + str(net.network.prefixlen),
                           "dev", interface])

    if next_hop is not None:
        try:
            subprocess.check_call(["ip", family, "route", "del", "default"])
        except subprocess.CalledProcessError:
            # Best effort: the delete fails when no default route exists.
            pass
        subprocess.check_call(["ip", family, "route", "add", "default",
                               "dev", interface, "via", str(next_hop)])
def _network(self, address, netmask): """ Return CIDR network """ return ipaddress.ip_interface(u'{}/{}'.format(address, netmask)).network
def cast_interface(s, cur=None):
    """Convert ``s`` to an ipaddress interface object; None passes through.

    ``cur`` is accepted but not used by this function.
    """
    # Py2 version force the use of unicode. meh.
    return None if s is None else ipaddress.ip_interface(str(s))
def test_inet_array_cast(self):
    """Check element-wise casting of an ``inet[]`` value, NULL included."""
    import ipaddress as ip

    cursor = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cursor)
    cursor.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]")
    items = cursor.fetchone()[0]

    self.assertTrue(items[0] is None)

    expected_v4 = ip.ip_interface('127.0.0.1')
    expected_v6 = ip.ip_interface('::ffff:102:300/128')
    self.assertEqual(items[1], expected_v4)
    self.assertEqual(items[2], expected_v6)

    # The whole list is used as the failure message for the type checks.
    self.assertTrue(isinstance(items[1], ip.IPv4Interface), items)
    self.assertTrue(isinstance(items[2], ip.IPv6Interface), items)
def test_inet_adapt(self):
    """Check that interface objects passed as parameters come back as text."""
    import ipaddress as ip

    cursor = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cursor)

    for literal in ('127.0.0.1/24', '::ffff:102:300/128'):
        cursor.execute("select %s", [ip.ip_interface(literal)])
        self.assertEqual(cursor.fetchone()[0], literal)
def cast_interface(s, cur=None):
    """Convert ``s`` to an ipaddress interface object; None passes through.

    ``cur`` is accepted but not used by this function.
    """
    # Py2 version force the use of unicode. meh.
    return None if s is None else ipaddress.ip_interface(unicode(s))
def test_inet_array_cast(self):
    """Check element-wise casting of an ``inet[]`` value, NULL included."""
    import ipaddress as ip

    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)
    cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]")
    l = cur.fetchone()[0]

    # The deprecated aliases assert_/assertEquals were removed in
    # Python 3.12; the modern assertion names are used throughout.
    self.assertIsNone(l[0])
    self.assertEqual(l[1], ip.ip_interface('127.0.0.1'))
    self.assertEqual(l[2], ip.ip_interface('::ffff:102:300/128'))
    # The whole list is used as the failure message for the type checks.
    self.assertIsInstance(l[1], ip.IPv4Interface, l)
    self.assertIsInstance(l[2], ip.IPv6Interface, l)
def test_inet_adapt(self):
    """Check that interface objects passed as parameters come back as text."""
    import ipaddress as ip

    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    # assertEquals is a deprecated alias removed in Python 3.12.
    cur.execute("select %s", [ip.ip_interface('127.0.0.1/24')])
    self.assertEqual(cur.fetchone()[0], '127.0.0.1/24')

    cur.execute("select %s", [ip.ip_interface('::ffff:102:300/128')])
    self.assertEqual(cur.fetchone()[0], '::ffff:102:300/128')
def _get_subnets_by_interface_cidr(neutron_network_id, interface_cidr):
    """Look up the subnets of a network that match an interface's CIDR.

    ``interface_cidr`` is parsed as an interface and its network (as text)
    is used as the ``cidr`` filter for ``_get_subnets_by_attrs``.

    :param neutron_network_id: value passed as the ``network_id`` filter.
    :param interface_cidr: interface address in ``addr/prefix`` form.
    :returns: whatever ``_get_subnets_by_attrs`` returned.
    :raises exceptions.DuplicatedResourceException: if more than two
        subnets match.
    """
    iface = ipaddress.ip_interface(six.text_type(interface_cidr))
    subnets = _get_subnets_by_attrs(
        network_id=neutron_network_id, cidr=six.text_type(iface.network))
    # NOTE(review): up to two matches are tolerated although the message
    # says "multiple" -- confirm the threshold is intentional.
    if len(subnets) > 2:
        # The trailing space keeps "network_id=<id>" and "and" from being
        # fused together in the concatenated message.
        raise exceptions.DuplicatedResourceException(
            "Multiple Neutron subnets exist for the network_id={0} "
            "and cidr={1}"
            .format(neutron_network_id, iface.network))
    return subnets
def _process_interface_address(port_dict, subnets_dict_by_id,
                               response_interface):
    """Store the subnet CIDR of a port in ``response_interface``.

    The subnet referenced by ``port_dict['subnet_id']`` is looked up in
    ``subnets_dict_by_id``; its ``'cidr'`` is parsed as an interface and its
    text form is written under ``'Address'`` (IPv4) or ``'AddressIPv6'``.
    ``response_interface`` is modified in place; nothing is returned.
    """
    subnet = subnets_dict_by_id[port_dict['subnet_id']]
    iface = ipaddress.ip_interface(six.text_type(subnet['cidr']))

    if iface.version == 4:
        address_key = 'Address'
    else:
        address_key = 'AddressIPv6'
    response_interface[address_key] = six.text_type(iface)
def _get_fixed_ips_by_interface_cidr(subnets, interface_cidrv4,
                                     interface_cidrv6, fixed_ips):
    """Append ``subnet_id=``/``ip_address=`` entries to ``fixed_ips``.

    For every subnet a ``'subnet_id=<id>'`` entry is produced.  When at
    least one interface CIDR is given, the CIDR matching the subnet's
    ``ip_version`` must belong to the subnet's ``'cidr'``; in that case an
    ``'ip_address=<addr>'`` entry is added as well, otherwise the subnet is
    skipped entirely.

    :param subnets: iterable of dicts with ``'id'``, ``'ip_version'`` and
        ``'cidr'`` keys.
    :param interface_cidrv4: IPv4 interface in ``addr/prefix`` form, or a
        falsy value.
    :param interface_cidrv6: IPv6 interface in ``addr/prefix`` form, or a
        falsy value.
    :param fixed_ips: list extended in place; nothing is returned.
    """
    for subnet in subnets:
        fixed_ip = [('subnet_id=%s' % subnet['id'])]
        if interface_cidrv4 or interface_cidrv6:
            if subnet['ip_version'] == 4 and interface_cidrv4:
                interface_cidr = interface_cidrv4
            elif subnet['ip_version'] == 6 and interface_cidrv6:
                interface_cidr = interface_cidrv6
            else:
                # No CIDR was supplied for this subnet's IP version.  The
                # old code then read ``iface`` while it was unbound (first
                # iteration) or left over from the previous subnet; skip
                # the subnet, which is what a stale, non-matching value
                # led to as well.
                continue
            iface = ipaddress.ip_interface(six.text_type(interface_cidr))
            if six.text_type(subnet['cidr']) != six.text_type(iface.network):
                continue
            fixed_ip.append('ip_address=%s' % iface.ip)
        fixed_ips.extend(fixed_ip)
def get_node_interface(data, node_ip):
    """Return the name of the interface that carries ``node_ip``.

    Each line of ``data`` is searched for ``<name> inet <addr>``; the first
    line whose address equals ``node_ip`` yields its interface name.  When
    no line matches, ``None`` is returned.
    """
    wanted = ipaddress.ip_address(unicode(node_ip))
    inet_re = re.compile(r'(?P<iface>\w+)\s+inet\s+(?P<ip>[0-9\/\.]+)')

    for row in data.splitlines():
        found = inet_re.search(row)
        if found is not None:
            candidate = ipaddress.ip_interface(unicode(found.group('ip')))
            if wanted == candidate.ip:
                return found.group('iface')
    return None
def iter_addrs(data):
    """Yield an interface object for every entry of ``data['ipaddr']``.

    Each entry is unpacked into the ``'{}/{}'`` template, i.e. its first
    two items form the ``address/prefix`` text.
    """
    for entry in data['ipaddr']:
        cidr_text = '{}/{}'.format(*entry)
        yield ipaddress.ip_interface(cidr_text)
def send(self, msg):
    """
    Send an IMC message to this node over one of its imc+udp services.

    The first service whose IP lies inside the network of a local interface
    (as reported by ``get_interfaces()``) receives the message.  If none
    does, the message is sent to every service port on 127.0.0.1.

    :param msg: The IMC message to send
    :return: None
    """
    imcudp_services = self.services['imc+udp']
    if not imcudp_services:
        logger.error('{} does not expose an imc+udp service'.format(self))
        return

    # Items 1 and 2 of each entry are joined as "<address>/<netmask>" --
    # presumably entries are (name, address, netmask); verify against
    # get_interfaces().
    # Note: this might not account for funky ip routing
    local_networks = [
        ip.ip_interface(entry[1] + '/' + entry[2]).network
        for entry in get_interfaces()
    ]

    for service in imcudp_services:
        target = ip.ip_address(service.ip)
        reachable = any(target in local_net for local_net in local_networks)
        if reachable:
            with IMCSenderUDP(service.ip) as sender:
                sender.send(message=msg, port=service.port)
            return

    # No local interface has the target system in its netmask; it could be
    # running on the same system with no available interfaces, so fall
    # back to the loopback address.
    ports = [service.port for service in imcudp_services]
    with IMCSenderUDP('127.0.0.1') as sender:
        for port in ports:
            sender.send(message=msg, port=port)
def get_ip_interface(self, ipaddr):
    """Return ``ipaddr`` as an interface using this object's subnet netmask.

    ``ipaddr`` and ``self.subnet.netmask`` are joined with ``'/'``, so both
    are expected to be strings.
    """
    cidr_text = text_type(ipaddr + '/' + self.subnet.netmask)
    return ipaddress.ip_interface(cidr_text)
def ip_interface(self):
    """Return the result of calling ``self.ip_interface`` with ``self.ip``."""
    # NOTE(review): the attribute looked up on ``self`` has the same name as
    # this function.  If this is a plain method the call passes an extra
    # argument to itself; if a decorator such as @property was lost, it
    # recurses.  Presumably a differently named helper or the module-level
    # ipaddress factory was intended -- confirm against the enclosing class.
    return self.ip_interface(self.ip)
def _get_local_netinfo(iface):
    """Return the first IPv4 address record of ``iface`` with extra keys.

    The dict obtained from ``netifaces.ifaddresses`` is extended in place
    with ``'gateway'`` (default IPv4 gateway), ``'network'`` (network
    address derived from ``'addr'``/``'netmask'``) and ``'iface'``.
    """
    inet_info = netifaces.ifaddresses(iface)[netifaces.AF_INET][0]

    default_gateways = netifaces.gateways()['default']
    inet_info['gateway'] = default_gateways[netifaces.AF_INET][0]

    cidr_text = u'%s/%s' % (inet_info['addr'], inet_info['netmask'])
    network = ipaddress.ip_interface(cidr_text).network
    inet_info['network'] = str(network.network_address)

    inet_info['iface'] = iface
    return inet_info
def cidr(ipaddr, netmask):
    """Format an address and netmask as ``'<address> / <prefixlen>'``.

    :param ipaddr: IP address text.
    :param netmask: netmask or prefix length (text or int).
    :returns: the interface text with ``'/'`` spaced out; when the pair
        cannot be parsed, the two inputs joined by ``' / '`` unchanged.
    """
    try:
        ip = ipaddress.ip_interface(u'%s/%s' % (ipaddr, netmask))
    except ValueError:
        # %-formatting rather than ``+``: a non-string netmask such as an
        # int prefix length must not turn the fallback into a TypeError.
        return '%s / %s' % (ipaddr, netmask)
    return str(ip).replace('/', ' / ')
def _create_ping_cmd(self, targets, vrf, count, timeout_value, size): """ Internal method to create ping command. """ cli_cmd = [] try: for numips in targets: check_valid_ip = ip_address(unicode(numips)) numips = str(check_valid_ip) valid_address = ip_interface(unicode(numips)) if valid_address.version == 4: if vrf != 'default-vrf': cli = "ping vrf {} {} count {} datagram-size {} timeout {}".format( vrf, numips, count, size, timeout_value) else: cli = "ping {} count {} datagram-size {} timeout {}".format( numips, count, size, timeout_value) elif valid_address.version == 6: if vrf != 'default-vrf': cli = "ping vrf {} ipv6 {} count {} datagram-size {} timeout {}".format( vrf, numips, count, size, timeout_value) else: cli = "ping ipv6 {} count {} datagram-size {} timeout {}".format( numips, count, size, timeout_value) cli_cmd.append(cli) return cli_cmd except ValueError: raise ValueError('Invalid IP: %s', numips)
def test_ip_interface(self):
    """Run the shared factory-error check against ``ip_interface``."""
    factory = ipaddress.ip_interface
    self.assertFactoryError(factory, "interface")
def testIpFromPacked(self):
    """Addresses and interfaces built from packed bytes match the text form."""
    address = ipaddress.ip_address
    interface = ipaddress.ip_interface

    # IPv4: 4 packed bytes.
    packed_v4 = b'\x01\x02\x03\x04'
    self.assertEqual(self.ipv4_interface._ip, interface(packed_v4)._ip)
    self.assertEqual(address('255.254.253.252'),
                     address(b'\xff\xfe\xfd\xfc'))

    # IPv6: 16 packed bytes.
    packed_v6 = (b'\x20\x01\x06\x58\x02\x2a\xca\xfe'
                 b'\x02\x00\x00\x00\x00\x00\x00\x01')
    self.assertEqual(self.ipv6_interface.ip, interface(packed_v6).ip)
    self.assertEqual(address('ffff:2:3:4:ffff::'),
                     address(b'\xff\xff\x00\x02\x00\x03\x00\x04' +
                             b'\xff\xff' + b'\x00' * 6))
    self.assertEqual(address('::'),
                     address(b'\x00' * 16))
def testInterfaceComparison(self):
    """``<=`` holds for equal and for ascending interface pairs."""
    ordered_pairs = (
        ('1.1.1.1', '1.1.1.1'),
        ('1.1.1.1', '1.1.1.2'),
        ('::1', '::1'),
        ('::1', '::2'),
    )
    for lower, upper in ordered_pairs:
        self.assertTrue(ipaddress.ip_interface(lower) <=
                        ipaddress.ip_interface(upper))
def _get_ip_network(self, gateway_ip, prefixlen): """Returns the IP network for the gateway in CIDR form.""" return str(ipaddress.ip_interface(unicode(gateway_ip + "/" + str(prefixlen))).network)
def testCopyConstructor(self):
    """Building an object from an existing one yields an equal object."""
    network = ipaddress.ip_network('10.1.1.0/24')
    self.assertEqual(network, ipaddress.ip_network(network))

    interface = ipaddress.ip_interface('2001:658:22a:cafe:200::1/64')
    self.assertEqual(interface, ipaddress.ip_interface(interface))

    originals = (
        (ipaddress.IPv4Address, ipaddress.IPv4Address('1.1.1.1')),
        (ipaddress.IPv6Address,
         ipaddress.IPv6Address('2001:658:22a:cafe:200::1')),
    )
    for address_class, original in originals:
        self.assertEqual(original, address_class(original))
def generatePeeringAddresses():
    """Return two interface addresses from the next ``10.0.<n>.0/24`` network.

    ``<n>`` is the current ``AutonomousSystem.psIdx``, which is incremented
    by one afterwards.  The result is a 2-tuple holding hosts 1 and 2 of
    that network, each carrying the network's prefix length.
    """
    network = ip_network(u'10.0.%s.0/24' % AutonomousSystem.psIdx)
    AutonomousSystem.psIdx += 1

    first, second = [
        ip_interface('%s/%s' % (network[host], network.prefixlen))
        for host in (1, 2)
    ]
    return first, second
def getLastAddress(network):
    """Return the second-to-last address of ``network`` as an interface.

    The address is ``network_address + num_addresses - 2``.  It is wrapped
    without a prefix, so the resulting interface has the full-length prefix
    (/32 for IPv4, /128 for IPv6) rather than the network's prefix length.
    """
    # Work out the integer offset first.  Adding ``num_addresses`` directly
    # to the network address steps past the end of the address space for a
    # network that ends at the very last address (e.g. 255.255.255.0/24)
    # and raised before the subtraction could bring it back in range.
    offset = network.num_addresses - 2
    return ip_interface(network.network_address + offset)
def getIthAddress(network, i):
    """Return address ``network[i]`` as an interface with the network's prefix."""
    host = network[i]
    cidr_text = '%s/%s' % (host, network.prefixlen)
    return ip_interface(cidr_text)
def test_commit_query(self):
    """Changes committed through a query are visible to a fresh query."""
    q = Query({'hostname': 'test1'})
    s = q.get()
    s['os'] = 'wheezy'
    s['intern_ip'] = '10.16.2.1'
    q.commit()

    # Re-read through a new query so the check does not rely on the
    # object that was just modified.
    s = Query({'hostname': 'test1'}).get()
    # assertEquals is a deprecated alias removed in Python 3.12.
    self.assertEqual(s['os'], 'wheezy')
    self.assertEqual(s['intern_ip'], ip_interface('10.16.2.1'))
def clean(self, value):
    """Return the parent-cleaned value as an interface, or None if empty."""
    cleaned = super(IPv4Field, self).clean(value)
    return ip_interface(cleaned) if cleaned else None
def clean(self, value):
    """Return the parent-cleaned value as an interface, or None if empty.

    :raises ValidationError: if the cleaned text cannot be parsed.
    """
    cleaned = super(IPv6Field, self).clean(value)
    if not cleaned:
        return None
    try:
        return ip_interface(cleaned)
    except ValueError:
        raise ValidationError('Not a valid IPv6 Address')