我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用ipaddress.IPv4Network()。
def _validate_cidr(self, rule):
    """Validate the CIDR block in a rule.

    Args:
        rule (dict): Rule whose ``'app'`` entry holds the CIDR string.

    Returns:
        True: Upon successful completion.

    Raises:
        SpinnakerSecurityGroupCreationFailed: CIDR definition is invalid.
    """
    try:
        network = ipaddress.IPv4Network(rule['app'])
    except ValueError as error:
        # AddressValueError and NetmaskValueError both derive from
        # ValueError, so one clause covers every parse failure. Chain the
        # cause so the original parsing error stays in the traceback.
        raise SpinnakerSecurityGroupCreationFailed(error) from error

    self.log.debug('Validating CIDR: %s', network.exploded)
    return True
def validate_ip_network_address_prefix(address, prefix, strict=True):
    '''
    Build the IP network described by address and prefix.

    The address is first passed through validate_ip_address(). The result
    is an IPv4Network or IPv6Network, chosen by the address version and
    constructed with the given strict flag. None is returned when the
    address is rejected, has a version other than 4 or 6, or the network
    constructor raises ValueError.
    '''
    parsed = validate_ip_address(address)
    if parsed is None:
        return None

    if parsed.version == 4:
        network_cls = ipaddress.IPv4Network
    elif parsed.version == 6:
        network_cls = ipaddress.IPv6Network
    else:
        return None

    try:
        return network_cls((parsed, prefix), strict=strict)
    except ValueError:
        return None
def __init__(self, value):
    """Store ``value`` after checking it is an ipaddress address or network.

    Raises:
        TypeError: ``value`` is not an IPv4Address, IPv6Address,
            IPv4Network or IPv6Network instance.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def test_cidr_cast(self):
    """cidr values read through a registered cursor become network objects."""
    import ipaddress as ip

    cursor = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cursor)

    # A NULL cidr comes back as None.
    cursor.execute("select null::cidr")
    null_value = cursor.fetchone()[0]
    self.assertTrue(null_value is None)

    # IPv4 cidr.
    cursor.execute("select '127.0.0.0/24'::cidr")
    v4_value = cursor.fetchone()[0]
    self.assertTrue(isinstance(v4_value, ip.IPv4Network), repr(v4_value))
    self.assertEqual(v4_value, ip.ip_network('127.0.0.0/24'))

    # IPv6 cidr.
    cursor.execute("select '::ffff:102:300/128'::cidr")
    v6_value = cursor.fetchone()[0]
    self.assertTrue(isinstance(v6_value, ip.IPv6Network), repr(v6_value))
    self.assertEqual(v6_value, ip.ip_network('::ffff:102:300/128'))
def test_cidr_cast(self):
    """cidr values read through a registered cursor become network objects.

    Uses assertTrue/assertEqual instead of the assert_/assertEquals
    aliases, which were deprecated and then removed in Python 3.12.
    """
    import ipaddress as ip

    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    # A NULL cidr comes back as None.
    cur.execute("select null::cidr")
    self.assertTrue(cur.fetchone()[0] is None)

    # IPv4 cidr.
    cur.execute("select '127.0.0.0/24'::cidr")
    obj = cur.fetchone()[0]
    self.assertTrue(isinstance(obj, ip.IPv4Network), repr(obj))
    self.assertEqual(obj, ip.ip_network('127.0.0.0/24'))

    # IPv6 cidr.
    cur.execute("select '::ffff:102:300/128'::cidr")
    obj = cur.fetchone()[0]
    self.assertTrue(isinstance(obj, ip.IPv6Network), repr(obj))
    self.assertEqual(obj, ip.ip_network('::ffff:102:300/128'))
def test_covering_cidr(ips):
    """
    covering_cidr() gets the minimal CIDR that covers given IPs.

    The result must be a string naming an IPv4 network with a prefix of at
    most 24 bits that contains every given IP. When the prefix is shorter
    than 24 bits, no subnet of it may contain all of the IPs.
    """
    result = telepresence.vpn.covering_cidr(ips)
    assert isinstance(result, str)

    network = ipaddress.IPv4Network(result)
    assert network.prefixlen <= 24

    # Every IP lies inside the returned CIDR.
    addresses = [ipaddress.IPv4Address(raw) for raw in ips]
    assert all(address in network for address in addresses)

    # Minimality is only checked below the 24 bit floor.
    if network.prefixlen >= 24:
        return
    for subnet in network.subnets():
        assert not all(address in subnet for address in addresses)
def recreate_routable_ip_pool(self): self.ip_pools.clear() # First we parse get the default interface: # 8.8.8.8 via 192.168.115.254 dev ens3 src 192.168.113.245 # # Then we extract IP/Preffix from this: # 2: ens3: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast # link/ether 02:00:c0:a8:71:f5 brd ff:ff:ff:ff:ff:ff # inet 192.168.113.245/22 brd 192.168.115.255 scope global ens3 cmd = """ MAIN_INTERFACE=$(ip route get 8.8.8.8 | egrep 'dev\s+.+?\s+src' -o | awk '{print $2}'); # noqa ip addr show dev $MAIN_INTERFACE | awk 'NR==3 { print $2 }' """ _, main_ip, _ = self.ssh_exec('master', cmd) ip_pool = str(IPv4Network(unicode(main_ip), strict=False)) self.ip_pools.add(ip_pool, includes=self.env['KD_ONE_PUB_IPS'])
def run(self, subnet, exclude, include, node=None):
    """Create an IP pool for ``subnet``.

    ``exclude`` and ``include`` are mutually exclusive. When ``include``
    is given it is converted into an exclusion list: every address of the
    subnet (hosts plus the network and broadcast addresses) that is not in
    the parsed include set is excluded.

    Raises:
        InvalidCommand: both ``exclude`` and ``include`` were supplied.
    """
    if exclude and include:
        raise InvalidCommand('Can\'t specify both -e and -i')

    if include:
        wanted = ippool.IpAddrPool().parse_autoblock(include)
        network = IPv4Network(unicode(subnet))
        candidates = {str(host) for host in network.hosts()}
        # Do not include network and broadcast IP address
        # TODO: Fix according to AC-4044
        candidates.add(str(network.network_address))
        candidates.add(str(network.broadcast_address))
        exclude = ','.join(candidates - wanted)

    pool_spec = {
        'network': subnet.decode(),
        'autoblock': exclude,
        'node': node
    }
    ippool.IpAddrPool().create(pool_spec)
def _discover(self, network):
    """
    Runs the discovery scan to find Mebo on your LAN

    The broadcast address of ``network`` is handed to
    ``self._get_broadcast``; the IP of its result is then probed with
    ``self._probe`` and returned when the probe response is not an error.

    :param network: The IPv4 network netmask as a CIDR string. Example: 192.168.1.0/24
    :returns: The IP address of the Mebo found on the LAN
    :raises: `MeboDiscoveryError` if a socket timeout or an HTTPError
             occurs during discovery
    """
    try:
        print('Looking for Mebo...')
        self._network = IPv4Network(network)
        broadcast_addr = self._network.broadcast_address.compressed
        found = self._get_broadcast(broadcast_addr)
        self._probe(found.ip).raise_for_status()
        print('Mebo found at {}'.format(found.ip))
        return found.ip
    except (socket.timeout, HTTPError):
        raise MeboDiscoveryError(('Unable to locate Mebo on the network.\n'
                                  '\tMake sure it is powered on and connected to LAN.\n'
                                  '\tIt may be necessary to power cycle the Mebo.'))
def ip_address_get(value):
    '''
    Convert value into an IPv4 or IPv6 address object.

    Network objects are rejected outright rather than converted.
    Raises a GetError if value is a network object or is not a valid
    IP address.
    '''
    network_types = (ipaddress.IPv4Network, ipaddress.IPv6Network)
    if isinstance(value, network_types):
        reason = (
            "%s not allowed to be converted to IP address, use class attributes"
            % type(value).__name__
        )
        raise GetError(reason)

    try:
        address = ipaddress.ip_address(value)
    except ValueError:
        raise GetError("value '%s' is not a valid IPv4/IPv6 address" % value)
    return address
def get_less_used_network(num_of_requested_ip, global_variable, config_obj):
    """
    Get the network with more IP available.

    Waits (polling once per second) while ``global_variable.free_ip_lock``
    is True, then picks the key of ``global_variable.free_ips`` with the
    longest free-IP list.

    :param num_of_requested_ip: number of IPs to take from the network
    :param global_variable: object exposing ``free_ip_lock``, ``free_ips``
        and ``vcs.get_network()``
    :param config_obj: configuration passed to ``config_section_map`` to
        read ``min_spare_ip`` from the ``vcloud`` section
    :return: ``None`` when the chosen network has fewer free IPs than
        ``num_of_requested_ip + min_spare_ip``; otherwise a dict with the
        matching external network under ``'object'`` and the selected IPs
        under ``'iplist'``
    :raises IndexError: if no external network has its gateway inside the
        chosen network
    """
    from ipaddress import IPv4Address, IPv4Network

    # identify less used network
    while global_variable.free_ip_lock is True:
        time.sleep(1)
    free_ip = global_variable.free_ips
    less_used_network = sorted(free_ip, key=lambda k: len(free_ip[k]), reverse=True)[0]

    # network must have enough IPs
    min_spare_ip = int(config_section_map(config_obj, 'vcloud')['min_spare_ip'])
    if len(global_variable.free_ips[less_used_network]) < num_of_requested_ip + min_spare_ip:
        return None

    ip_list = global_variable.free_ips[less_used_network][:num_of_requested_ip]
    pvdc_external_networks = global_variable.vcs.get_network()

    # Build a real list: on Python 3 filter() returns an iterator, which
    # cannot be indexed with [0].
    chosen = IPv4Network(less_used_network)
    external_network = [
        candidate for candidate in pvdc_external_networks
        if IPv4Address(candidate.gateway) in chosen
    ]
    return {'object': external_network[0], 'iplist': ip_list}
def __init__(self, value):
    """Initialise new IPPrefix instance.

    ``value`` is parsed as a single address first and, if that fails, as a
    network (non-strict, so host bits are allowed). An error from the
    network parse propagates to the caller.
    """
    try:
        parsed = ipaddress.ip_address(value)
    except Exception:
        parsed = ipaddress.ip_network(value, strict=False)

    if isinstance(parsed, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
        # Plain host address: there is no prefix to record.
        self.prefix = None
        self.type = self.HOST
    elif isinstance(parsed, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
        self.prefix = parsed.network_address
        self.type = self.PREFIX

    self.version = parsed.version
    self.txt = parsed.compressed
def is_cidr_in_cidr(small_cidr, big_cidr):
    """
    Tell whether small_cidr lies inside big_cidr.

    The default route (0.0.0.0/0) is special-cased because every route
    would otherwise be contained in it: it only matches itself, both as
    the small and as the big CIDR.

    """
    default_route = "0.0.0.0/0"

    if small_cidr == default_route:
        return big_cidr == default_route
    if big_cidr == default_route:
        return False

    inner = ipaddress.IPv4Network(unicode(small_cidr))
    outer = ipaddress.IPv4Network(unicode(big_cidr))
    return inner.subnet_of(outer)
def parseIRLorigin(self, fileName, ip2as=True, as2ip=False):
    """Load prefix-to-AS origin data from ``fileName``.

    Each line that starts with ``<IPv4 prefix>/<len><TAB><AS number>`` is
    used; other lines are skipped.

    Args:
        fileName: Path of the origin file to read.
        ip2as: When true, ``self.ip2as`` is reset to a new SubnetTree and
            filled with prefix -> AS number entries.
        as2ip: When true, ``self.as2ip`` is reset to a dict mapping an AS
            number to the list of its IPv4Network prefixes. Prefixes whose
            length exceeds ``self.smallSubnetPrefix`` and AS numbers not in
            ``self.netGraph`` are left out.

    Returns:
        None
    """
    # Raw string: same pattern, without relying on invalid string escapes.
    re_origin = re.compile(r'(\d+\.\d+\.\d+\.\d+/\d+)\t(\d+)', re.UNICODE)
    if as2ip:
        self.as2ip = dict()
    if ip2as:
        self.ip2as = SubnetTree.SubnetTree()

    # The context manager closes the file even if parsing raises.
    with open(fileName, 'r') as F_origin:
        for line in F_origin:
            match = re_origin.match(line)
            if match is None:
                continue
            prefix = match.group(1)
            asNum = int(match.group(2))
            if ip2as:
                self.ip2as[prefix] = asNum
            if as2ip:
                newNet = IPv4Network(prefix)
                if newNet.prefixlen > self.smallSubnetPrefix:
                    continue
                if asNum in self.netGraph:
                    self.as2ip.setdefault(asNum, []).append(newNet)
    return None
def removeSAPNAT(self, SAPSwitch):
    """Delete the iptables NAT and FORWARD rules for ``SAPSwitch``.

    The network is derived from ``SAPSwitch.ip`` (non-strict, so a host
    address with a prefix is accepted) and the interface name comes from
    ``SAPSwitch.deployed_name``.
    """
    sap_network = str(ipaddress.IPv4Network(unicode(SAPSwitch.ip), strict=False))
    interface = SAPSwitch.deployed_name

    # due to a bug with python-iptables, removing and finding rules does not succeed when the mininet CLI is running
    # so we use the iptables tool
    commands = (
        "iptables -t nat -D POSTROUTING ! -o {0} -s {1} -j MASQUERADE".format(interface, sap_network),
        "iptables -D FORWARD -o {0} -j ACCEPT".format(interface),
        "iptables -D FORWARD -i {0} -j ACCEPT".format(interface),
    )
    for command in commands:
        # Run each removal to completion before issuing the next one.
        Popen(shlex.split(command)).communicate()

    info("remove SAP NAT rules for: {0} - {1}\n".format(SAPSwitch.name, sap_network))
def __init__(self, address: str, mask: int=32):
    """
    Resolve ``address`` and build the network object for it.

    :param address: The IP address
    :param mask: the mask to apply for the number of hosts you plan to
                 access. It can be a valid host mask, a valid net mask or
                 a slash-style digit count. Host masks start with 0 and
                 contain only 0's and 255's; net masks start with 255 and
                 contain only 0's and 255's.

    If resolution fails with UnResolvedException, the second exception
    argument is printed and the process exits.
    """
    try:
        self.ip_address = Resolve(address)
        self.__mask = mask
        # NOTE(review): no "/" is inserted between address and mask here;
        # presumably Resolve.__add__ supplies it - confirm.
        addr = self.ip_address + str(mask)
        if self.ip_address.version == "IPv4":
            self.mask_addr = ipaddress.IPv4Network(addr, strict=False)
        else:
            self.mask_addr = ipaddress.IPv6Network(addr, strict=False)
    except UnResolvedException as unRes:
        print(unRes.args[1])
        sys.exit()
def _validate_ip_name(self, tree):
    """Reject IPAddress names in ``tree`` whose value is not a network.

    Raises:
        TypeError: an IPAddress entry wraps something other than an
            IPv4Network or IPv6Network.
    """
    network_types = (ipaddress.IPv4Network, ipaddress.IPv6Network)
    for name in tree:
        if not isinstance(name, IPAddress):
            continue
        if not isinstance(name.value, network_types):
            raise TypeError(
                "IPAddress name constraints must be an IPv4Network or"
                " IPv6Network object"
            )
def register_ipaddress(conn_or_curs=None):
    """
    Register conversion support between `ipaddress` objects and `network types`__.

    :param conn_or_curs: the scope where to register the type casters.
        If `!None` register them globally.

    After the function is called, PostgreSQL :sql:`inet` values will be
    converted into `~ipaddress.IPv4Interface` or `~ipaddress.IPv6Interface`
    objects, :sql:`cidr` values into `~ipaddress.IPv4Network` or
    `~ipaddress.IPv6Network`.

    .. __: https://www.postgresql.org/docs/current/static/datatype-net-types.html
    """
    # Import lazily and bind the module at module scope, so the import only
    # happens when this function is called.
    global ipaddress
    import ipaddress

    # The casters are built once and cached in the module-level _casters.
    global _casters
    if _casters is None:
        _casters = _make_casters()

    # Database -> Python: register each caster in the requested scope.
    for c in _casters:
        register_type(c, conn_or_curs)

    # Python -> database: register the adapter for each ipaddress class.
    for t in [ipaddress.IPv4Interface, ipaddress.IPv6Interface,
              ipaddress.IPv4Network, ipaddress.IPv6Network]:
        register_adapter(t, adapt_ipaddress)
def test_cidr_array_cast(self):
    """Elements of a cidr[] array are converted to network objects or None."""
    import ipaddress as ip

    cursor = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cursor)
    cursor.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::cidr[]")
    networks = cursor.fetchone()[0]

    # Values first, then types, for the NULL / IPv4 / IPv6 elements.
    self.assertTrue(networks[0] is None)
    self.assertEqual(networks[1], ip.ip_network('127.0.0.1'))
    self.assertEqual(networks[2], ip.ip_network('::ffff:102:300/128'))
    self.assertTrue(isinstance(networks[1], ip.IPv4Network), networks)
    self.assertTrue(isinstance(networks[2], ip.IPv6Network), networks)
def setup(self): self.invalid_types = (True, False, 1) self.address01 = IPv4Network(u'10.0.0.0/8') self.address02 = IPv4Network(u'192.168.0.0/16') self.address03 = IPv4Network(u'172.16.0.0/12')
def test_addresses_getitem(self):
    """The first address of a fresh NetworkGroup is 0.0.0.0/0."""
    expected = IPv4Network(u'0.0.0.0/0')
    first_address = NetworkGroup().addresses[0]
    assert_equals(first_address, expected)
def test_add(self):
    """add() is truthy for a new network and falsy for one already present."""
    group = NetworkGroup()
    address04 = IPv4Network(u'10.0.0.0/8')

    # (network to add, check applied to the result), in call order.
    steps = (
        (self.address01, assert_true),
        (self.address01, assert_false),
        (self.address02, assert_true),
        (self.address02, assert_false),
        (self.address01, assert_false),
        # Equal to address01 but a distinct object.
        (address04, assert_false),
    )
    for network, check in steps:
        check(group.add(network))
def network_address(value):
    """
    Return the exploded network address of the CIDR given as a string.

    The value is first normalised with NetTest.convert.string.cidr().
    """
    cidr = NetTest.convert.string.cidr(value)
    network = ipaddress.IPv4Network(cidr)
    return network.network_address.exploded
def broadcast_address(value):
    """
    Return the exploded broadcast address of the CIDR given as a string.

    The value is first normalised with NetTest.convert.string.cidr().
    """
    cidr = NetTest.convert.string.cidr(value)
    network = ipaddress.IPv4Network(cidr)
    return network.broadcast_address.exploded
def address_builder(value):
    """Return ``value`` as an IPv4Network.

    Strings are parsed as an IPv4 interface and reduced to its network.
    IPv4Network instances are returned unchanged. Any other type is
    reported through raise_type_exception().

    Raises:
        ValueError: the string could not be parsed (AddressValueError).
    """
    if isinstance(value, (basestring)):
        try:
            return IPv4Interface(unicode(value)).network
        except AddressValueError:
            template = 'Unsupported string initialization format \'{}\''
            raise ValueError(template.format(value))

    if not isinstance(value, IPv4Network):
        raise_type_exception(value, (IPv4Network, ), 'build with')
    return value
def __init__(self, prefix=None, strict=False):
    """Init new Prefix instance.

    A ready-made IPv4Network/IPv6Network is stored as is; anything else is
    parsed with ipaddress.ip_network() using the given ``strict`` flag.
    """
    network_types = (ipaddress.IPv4Network, ipaddress.IPv6Network)
    if not isinstance(prefix, network_types):
        prefix = ipaddress.ip_network(prefix, strict=strict)
    self._prefix = prefix
def create_ips_for_subnet(sender, instance, created, **kwargs):
    """Save an Ip4Address for every host address of a newly created subnet.

    Nothing happens when ``created`` is false. The subnet is built from
    ``instance.network`` and ``instance.netmask``.
    """
    if created:
        cidr = instance.network + "/" + instance.netmask
        for host in ipaddress.IPv4Network(cidr).hosts():
            Ip4Address(address=str(host), subnet=instance).save()
def num_addresses(self, ip4subnet):
    """Return the total number of addresses in ``ip4subnet``.

    The network is built from ``ip4subnet.network`` and
    ``ip4subnet.netmask``.
    """
    cidr = ip4subnet.network + "/" + ip4subnet.netmask
    return ipaddress.IPv4Network(cidr).num_addresses
def broadcast_address(self, ip4subnet):
    """Return the broadcast address of ``ip4subnet``.

    The network is built from ``ip4subnet.network`` and
    ``ip4subnet.netmask``.
    """
    cidr = ip4subnet.network + "/" + ip4subnet.netmask
    return ipaddress.IPv4Network(cidr).broadcast_address
def first_address(self, ip4subnet):
    """Return the first usable host address of ``ip4subnet``.

    The network is built from ``ip4subnet.network`` and
    ``ip4subnet.netmask``.
    """
    subnet = ipaddress.IPv4Network(ip4subnet.network + "/" + ip4subnet.netmask)
    # hosts() does not always return an iterator: for a /32 network recent
    # Python versions return a one-element list, on which a bare next()
    # raises TypeError. iter() makes both shapes work.
    return next(iter(subnet.hosts()))