我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ipaddress.ip_network()。
def public_network(self): """ All nodes must have the same public network. The public network must be valid. """ for node in self.data.keys(): public_network = self.data[node].get("public_network", "") net_list = Util.parse_list_from_string(public_network) log.debug("public_network: {} {}".format(node, net_list)) for network in net_list: try: ipaddress.ip_network(u'{}'.format(network)) except ValueError as err: msg = "{} on {} is not valid: {}".format(network, node, err) self.errors.setdefault('public_network', []).append(msg) self._set_pass_status('public_network')
def cluster_network(self): """ All storage nodes must have the same cluster network. The cluster network must be valid. """ for node in self.data.keys(): if ('roles' in self.data[node] and 'storage' in self.data[node]['roles']): cluster_network = self.data[node].get("cluster_network", "") net_list = Util.parse_list_from_string(cluster_network) log.debug("cluster_network: {} {}".format(node, net_list)) for network in net_list: try: ipaddress.ip_network(u'{}'.format(network)) except ValueError as err: msg = "{} on {} is not valid: {}".format(network, node, err) self.errors.setdefault('cluster_network', []).append(msg) self._set_pass_status('cluster_network')
def block_cidr_network(*blocked_networks): # type: (*str) -> Callable """Block recipient URLs from a list of CIDR networks. Example: >>> block_cidr_network('192.168.0.0/24', '132.34.23.0/24') """ _blocked_networks = [ip_network(text_type(x)) for x in blocked_networks] def validate_cidr(recipient_url): # type: (str) -> None recipient_addr = _url_ip_address(recipient_url) for blocked_network in _blocked_networks: if recipient_addr in blocked_network: raise SecurityError( 'IP address of recipient {0}={1} is in network {2}'.format( recipient_url, recipient_addr, blocked_network, )) validate_cidr._args = blocked_networks validate_cidr._validator = 'block_cidr_network' return validate_cidr
def test_cidr_cast(self): import ipaddress as ip cur = self.conn.cursor() psycopg2.extras.register_ipaddress(cur) cur.execute("select null::cidr") self.assertTrue(cur.fetchone()[0] is None) cur.execute("select '127.0.0.0/24'::cidr") obj = cur.fetchone()[0] self.assertTrue(isinstance(obj, ip.IPv4Network), repr(obj)) self.assertEqual(obj, ip.ip_network('127.0.0.0/24')) cur.execute("select '::ffff:102:300/128'::cidr") obj = cur.fetchone()[0] self.assertTrue(isinstance(obj, ip.IPv6Network), repr(obj)) self.assertEqual(obj, ip.ip_network('::ffff:102:300/128'))
def set_unset(self, interface_id, attribute, address=None): """ Set attribute to True and unset the same attribute for all other interfaces. This is used for interface options that can only be set on one engine interface. """ for interface in self: for sub_interface in interface.sub_interfaces(): # Skip VLAN only interfaces (no addresses) if not isinstance(sub_interface, PhysicalVlanInterface): if getattr(sub_interface, attribute) is not None: if sub_interface.nicid == str(interface_id): if address is not None: # Find IP on Node Interface if ipaddress.ip_address(bytes_to_unicode(address)) in \ ipaddress.ip_network(sub_interface.network_value): setattr(sub_interface, attribute, True) else: setattr(sub_interface, attribute, False) else: setattr(sub_interface, attribute, True) else: #unset setattr(sub_interface, attribute, False)
def process_arguments(args: List[str]) -> ConnectionData: assert isinstance(args, list) for a in args: assert isinstance(a, str) if len(args) == 0: #rv = ConnectionData(ip_net='192.168.3.0/24', iface='eth0') rv = ConnectionData(ip_net='192.168.3.0/24', iface='wlan0') elif len(args) == 1: rv = ConnectionData(ip_net=args[0], iface='eth0') elif len(args) == 2: rv = ConnectionData(ip_net=args[0], iface=args[1]) else: print('Incorrect number of arguments.') sys.exit(1) try: ipaddress.ip_network(rv.ip_net) except ValueError: print("First parameter doesn't appear to be an IP network specification.") sys.exit(1) return rv
def _validate_cidr_format(cidr): """Validate CIDR IP range :param str cidr: :return: :rtype: bool """ try: ipaddress.ip_network(cidr, strict=False) except (ValueError, ipaddress.AddressValueError, ipaddress.NetmaskValueError): return False if '/' not in cidr: return False if re.search('\s', cidr): return False return True
def add_ban(self, ip, reason, duration, name=None): """ Ban an ip with an optional reason and duration in minutes. If duration is None, ban is permanent. """ network = ip_network(text_type(ip), strict=False) for connection in list(self.connections.values()): if ip_address(connection.address[0]) in network: name = connection.name connection.kick(silent=True) if duration: duration = reactor.seconds() + duration * 60 else: duration = None self.bans[ip] = (name or '(unknown)', reason, duration) self.save_bans()
def load_config(self, config): with open(config) as c: cfg = json.load(c) m = hashlib.sha224() m.update(bytes(cfg['subnet'], 'utf-8')) m.update(bytes(cfg['swap_size'])) m.update(bytes(cfg['root_size'])) m.update(bytes(cfg['extra_args'], 'utf-8')) # TODO: check sizes for f in cfg['hashes']: with open(f, 'rb') as fl: for line in fl: m.update(line) cfg['sha224'] = m.hexdigest() cfg['subnet'] = ipaddress.ip_network(cfg['subnet']) cfg['image_method'] = IMAGE_METHOD return cfg
def getTestNetworkIps(manifest): testSubnets = getTestSubnets(manifest) testSubnet = testSubnets[0] otherSubnets = testSubnets[1:] testIpSubnet = ipaddress.ip_network(testSubnet["range"]) otherIpSubnets = [ipaddress.ip_network(s["range"]) for s in otherSubnets] subnetGateway = testSubnet["gateway"] overlapOtherIpSubnets = [s for s in otherIpSubnets if testIpSubnet.overlaps(s)] for otherIpSubnet in overlapOtherIpSubnets: testIpSubnet = testIpSubnet.address_exclude(otherIpSubnet) testHosts = [h.exploded for h in testIpSubnet.hosts()] testHosts.remove(subnetGateway) return testHosts
def _ip_network_hosts(obj, page=None): """ Returns a portion of IP addresses based on page parameter. :returns: IP addresses :rtype: iterator (AC-3531) Previous rtype was a generator """ pages = obj.pages() page = _page(page, pages) net_ip = obj.network_address + (page - 1) * 2 ** obj.page_bits net_pl = obj.max_prefixlen - obj.page_bits if pages > 1 else obj.prefixlen network = ipaddress.ip_network(u'{0}/{1}'.format(net_ip, net_pl)) # This method used to return _BaseNetwork.hosts() generator, # however the default implementation would skip network and # broadcast addresses, which made the final IP pool short of # two IP addresses. As of AC-3531 fix, now it returns # the _BaseNetwork.__iter__() instead, which does not skip any addresses. return iter(network)
def to_dict(self, include=None, exclude=None, page=None): free_hosts_and_busy = self.free_hosts_and_busy(page=page) pages = ip_network(self.network).pages() page = _page(page, pages) data = dict( id=self.network, network=self.network, ipv6=self.ipv6, free_hosts=self.free_hosts(page=page), blocked_list=list(self.get_blocked_set()), node=None if self.node is None else self.node.hostname, allocation=free_hosts_and_busy, page=page, pages=pages, ) return data
def init(): config = read_config_ini('/run/flannel/subnet.env') config_network = config['flannel_network'] config_subnet = config['flannel_subnet'] _update_ipset('kuberdock_flannel', [config_network], set_type='hash:net') _update_ipset('kuberdock_nodes', []) _update_ipset('kuberdock_cluster', ['kuberdock_flannel', 'kuberdock_nodes'], set_type='list:set') network = ipaddress.ip_network(unicode(config_network)) subnet = ipaddress.ip_network(unicode(config_subnet), strict=False) etcd = ETCD() for user in etcd.users(): for pod in etcd.pods(user): pod_ip = ipaddress.ip_address(pod) if pod_ip not in network or pod_ip in subnet: etcd.delete_user(user, pod) update_ipset()
def _ip_is_system_blacklisted(self, addr): for net in self.env.get_config('apiserver.whitelisted_ips'): try: net = ip_network(net, strict=False) except ValueError: continue if addr in net: return False for net in self.env.get_config('apiserver.blacklisted_ips'): try: net = ip_network(net, strict=False) except ValueError: continue if addr in net: return True return False
def test_cidr_cast(self): import ipaddress as ip cur = self.conn.cursor() psycopg2.extras.register_ipaddress(cur) cur.execute("select null::cidr") self.assert_(cur.fetchone()[0] is None) cur.execute("select '127.0.0.0/24'::cidr") obj = cur.fetchone()[0] self.assert_(isinstance(obj, ip.IPv4Network), repr(obj)) self.assertEquals(obj, ip.ip_network('127.0.0.0/24')) cur.execute("select '::ffff:102:300/128'::cidr") obj = cur.fetchone()[0] self.assert_(isinstance(obj, ip.IPv6Network), repr(obj)) self.assertEquals(obj, ip.ip_network('::ffff:102:300/128'))
def process_acquire(self, xfrm_acquire, xfrm_tmpl): small_tsi = TrafficSelector.from_network(ip_network(xfrm_acquire.sel.saddr.to_ipaddr()), xfrm_acquire.sel.sport, xfrm_acquire.sel.proto) small_tsr = TrafficSelector.from_network(ip_network(xfrm_acquire.sel.daddr.to_ipaddr()), xfrm_acquire.sel.dport, xfrm_acquire.sel.proto) acquire = Acquire(small_tsi, small_tsr, xfrm_acquire.policy.index) request = None if self.state == IkeSa.State.INITIAL: request = self.generate_ike_sa_init_request(acquire) elif self.state == IkeSa.State.ESTABLISHED: request = self.generate_create_child_sa_request(acquire) else: logging.warning('Cannot process acquire while waiting for a response.') # TODO: Use a request queue for simplicity and to avoid problems with # state machine if request: request_data = request.to_bytes() self.log_message(request, self.peer_addr, request_data, send=True) return request_data else: return None
def create_policies(self, my_addr, peer_addr, ike_conf): for ipsec_conf in ike_conf['protect']: if ipsec_conf['mode'] == Mode.TUNNEL: src_selector = ipsec_conf['my_subnet'] dst_selector = ipsec_conf['peer_subnet'] else: src_selector = ip_network(my_addr) dst_selector = ip_network(peer_addr) # generate an index for outbound policies index = SystemRandom().randint(0, 10000) << 2 | XFRM_POLICY_OUT ipsec_conf['index'] = index self._create_policy(src_selector, dst_selector, ipsec_conf['my_port'], ipsec_conf['peer_port'], ipsec_conf['ip_proto'], XFRM_POLICY_OUT, ipsec_conf['ipsec_proto'], ipsec_conf['mode'], my_addr, peer_addr, index=index) self._create_policy(dst_selector, src_selector, ipsec_conf['peer_port'], ipsec_conf['my_port'], ipsec_conf['ip_proto'], XFRM_POLICY_IN, ipsec_conf['ipsec_proto'], ipsec_conf['mode'], peer_addr, my_addr) self._create_policy(dst_selector, src_selector, ipsec_conf['peer_port'], ipsec_conf['my_port'], ipsec_conf['ip_proto'], XFRM_POLICY_FWD, ipsec_conf['ipsec_proto'], ipsec_conf['mode'], peer_addr, my_addr)
def web_data_admin(self): """Return dict used in admin/DC web templates""" return { 'name': self.name, 'alias': self.alias, 'access': self.access, 'owner': self.owner.username, 'desc': self.desc, 'ip_network': str(self.ip_network), 'network': self.network, 'netmask': self.netmask, 'gateway': self.gateway, 'nic_tag': self.nic_tag, 'nic_tag_type': self.nic_tag_type, 'vlan_id': self.vlan_id, 'vxlan_id': self.vxlan_id, 'mtu': self.mtu, 'resolvers': self.get_resolvers(), 'dns_domain': self.dns_domain, 'ptr_domain': self.ptr_domain, 'dc_bound': self.dc_bound_bool, 'dhcp_passthrough': self.dhcp_passthrough, }
def test_incompatible_versions(self): # These should always raise TypeError v4addr = ipaddress.ip_address('1.1.1.1') v4net = ipaddress.ip_network('1.1.1.1') v6addr = ipaddress.ip_address('::1') v6net = ipaddress.ip_address('::1') self.assertRaises(TypeError, v4addr.__lt__, v6addr) self.assertRaises(TypeError, v4addr.__gt__, v6addr) self.assertRaises(TypeError, v4net.__lt__, v6net) self.assertRaises(TypeError, v4net.__gt__, v6net) self.assertRaises(TypeError, v6addr.__lt__, v4addr) self.assertRaises(TypeError, v6addr.__gt__, v4addr) self.assertRaises(TypeError, v6net.__lt__, v4net) self.assertRaises(TypeError, v6net.__gt__, v4net)
def testIpFromInt(self): self.assertEqual(self.ipv4_interface._ip, ipaddress.IPv4Interface(16909060)._ip) ipv4 = ipaddress.ip_network('1.2.3.4') ipv6 = ipaddress.ip_network('2001:658:22a:cafe:200:0:0:1') self.assertEqual(ipv4, ipaddress.ip_network(int(ipv4.network_address))) self.assertEqual(ipv6, ipaddress.ip_network(int(ipv6.network_address))) v6_int = 42540616829182469433547762482097946625 self.assertEqual(self.ipv6_interface._ip, ipaddress.IPv6Interface(v6_int)._ip) self.assertEqual(ipaddress.ip_network(self.ipv4_address._ip).version, 4) self.assertEqual(ipaddress.ip_network(self.ipv6_address._ip).version, 6)
def testHash(self): self.assertEqual(hash(ipaddress.ip_interface('10.1.1.0/24')), hash(ipaddress.ip_interface('10.1.1.0/24'))) self.assertEqual(hash(ipaddress.ip_network('10.1.1.0/24')), hash(ipaddress.ip_network('10.1.1.0/24'))) self.assertEqual(hash(ipaddress.ip_address('10.1.1.0')), hash(ipaddress.ip_address('10.1.1.0'))) # i70 self.assertEqual(hash(ipaddress.ip_address('1.2.3.4')), hash(ipaddress.ip_address( int(ipaddress.ip_address('1.2.3.4')._ip)))) ip1 = ipaddress.ip_address('10.1.1.0') ip2 = ipaddress.ip_address('1::') dummy = {} dummy[self.ipv4_address] = None dummy[self.ipv6_address] = None dummy[ip1] = None dummy[ip2] = None self.assertTrue(self.ipv4_address in dummy) self.assertTrue(ip2 in dummy)
def testInetRoundtrip(self): try: import ipaddress v = ipaddress.ip_network('192.168.0.0/28') self.cursor.execute("select %s as f1", (v,)) retval = self.cursor.fetchall() self.assertEqual(retval[0][0], v) v = ipaddress.ip_address('192.168.0.1') self.cursor.execute("select %s as f1", (v,)) retval = self.cursor.fetchall() self.assertEqual(retval[0][0], v) except ImportError: for v in ('192.168.100.128/25', '192.168.0.1'): self.cursor.execute( "select cast(cast(%s as varchar) as inet) as f1", (v,)) retval = self.cursor.fetchall() self.assertEqual(retval[0][0], v)
def find_cidr(ip): """Finds most specific CIDR in Networks""" # Check for non-ip, try DNS if not re.search(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', ip): try: ip = socket.gethostbyname(ip) except socket.gaierror: raise Exception("Hostname Lookup Failure on: " + ip) # Always start with default route mostSpecific = "0.0.0.0/0" # All networks networks = nglib.py2neo_ses.cypher.execute('MATCH (n:Network) RETURN n.cidr as cidr') if len(networks) > 1: for r in networks.records: if ipaddress.ip_address(ip) in ipaddress.ip_network(r.cidr): if nglib.verbose > 1: print("find_cidr", ip + " in " + r.cidr) mostSpecific = compare_cidr(mostSpecific, r.cidr) return mostSpecific
def test_incompatible_versions(self): # These should always raise TypeError v4addr = ipaddress.ip_address('1.1.1.1') v4net = ipaddress.ip_network('1.1.1.1') v6addr = ipaddress.ip_address('::1') v6net = ipaddress.ip_network('::1') self.assertRaises(TypeError, v4addr.__lt__, v6addr) self.assertRaises(TypeError, v4addr.__gt__, v6addr) self.assertRaises(TypeError, v4net.__lt__, v6net) self.assertRaises(TypeError, v4net.__gt__, v6net) self.assertRaises(TypeError, v6addr.__lt__, v4addr) self.assertRaises(TypeError, v6addr.__gt__, v4addr) self.assertRaises(TypeError, v6net.__lt__, v4net) self.assertRaises(TypeError, v6net.__gt__, v4net)
def testHash(self): self.assertEqual(hash(ipaddress.ip_interface('10.1.1.0/24')), hash(ipaddress.ip_interface('10.1.1.0/24'))) self.assertEqual(hash(ipaddress.ip_network('10.1.1.0/24')), hash(ipaddress.ip_network('10.1.1.0/24'))) self.assertEqual(hash(ipaddress.ip_address('10.1.1.0')), hash(ipaddress.ip_address('10.1.1.0'))) # i70 self.assertEqual(hash(ipaddress.ip_address('1.2.3.4')), hash(ipaddress.ip_address( int(ipaddress.ip_address('1.2.3.4')._ip)))) ip1 = ipaddress.ip_address('10.1.1.0') ip2 = ipaddress.ip_address('1::') dummy = {} dummy[self.ipv4_address] = None dummy[self.ipv6_address] = None dummy[ip1] = None dummy[ip2] = None self.assertIn(self.ipv4_address, dummy) self.assertIn(ip2, dummy)
def __init__(self, onosIps, num=1, numBgpSpeakers=1, asNum=65000, externalOnos=True, peerIntfConfig=None, withFpm=False): super(SdnAutonomousSystem, self).__init__(asNum, numBgpSpeakers) self.onosIps = onosIps self.num = num self.numBgpSpeakers = numBgpSpeakers self.peerIntfConfig = peerIntfConfig self.withFpm = withFpm self.externalOnos= externalOnos self.internalPeeringSubnet = ip_network(u'1.1.1.0/24') for router in self.routers.values(): # Add iBGP sessions to ONOS nodes for onosIp in onosIps: router.neighbors.append({'address':onosIp, 'as':asNum, 'port':2000}) # Add iBGP sessions to other BGP speakers for i, router2 in self.routers.items(): if router == router2: continue cpIpBase = self.num*10 ip = AutonomousSystem.getIthAddress(self.internalPeeringSubnet, cpIpBase+i) router.neighbors.append({'address':ip.ip, 'as':asNum})
def generateRoutes(baseRange, numRoutes, subnetSize=None): baseNetwork = ip_network(baseRange) # We need to get at least 2 addresses out of each subnet, so the biggest # prefix length we can have is /30 maxPrefixLength = baseNetwork.max_prefixlen - 2 if subnetSize is not None: return list(baseNetwork.subnets(new_prefix=subnetSize)) trySubnetSize = baseNetwork.prefixlen + 1 while trySubnetSize <= maxPrefixLength and \ len(list(baseNetwork.subnets(new_prefix=trySubnetSize))) < numRoutes: trySubnetSize += 1 if trySubnetSize > maxPrefixLength: raise Exception("Can't get enough routes from input parameters") return list(baseNetwork.subnets(new_prefix=trySubnetSize))[:numRoutes]
def __init__(self, file_path): self._ipv4_lpm_dict = LpmDict() for ip in EXCLUDE_IPV4_PREFIXES: self._ipv4_lpm_dict[ip] = self.NextHop() self._ipv6_lpm_dict = LpmDict(ipv4=False) for ip in EXCLUDE_IPV6_PREFIXES: self._ipv6_lpm_dict[ip] = self.NextHop() # filter out empty lines and lines starting with '#' pattern = re.compile("^#.*$|^[ \t]*$") with open(file_path, 'r') as f: for line in f.readlines(): if pattern.match(line): continue entry = line.split(' ', 1) prefix = ip_network(unicode(entry[0])) next_hop = self.NextHop(entry[1]) if prefix.version is 4: self._ipv4_lpm_dict[str(prefix)] = next_hop elif prefix.version is 6: self._ipv6_lpm_dict[str(prefix)] = next_hop
def parse_prefix_range(expr=None): pattern = r'^(?P<prefix>((\d+(\.\d+){3})|([0-9a-fA-F:]{2,40}))/\d+)' \ r'(\^(?P<ge>\d+)-(?P<le>\d+))?$' match = re.match(pattern, expr) if match: prefix = ipaddress.ip_network(unicode(match.group("prefix"))) afi = "ipv%d" % prefix.version entry = {"prefix": prefix} try: entry["greater-equal"] = int(match.group("ge")) entry["less-equal"] = int(match.group("le")) except (IndexError, TypeError): pass else: raise ValueError("no match found") return {afi: [entry]}
def _ipnet(net_str): """ This function is used for argparse module to verify validness of entered network. Valid networks are IPv4 networks in form A.B.C.D/prefix :param net_str: string that represents IPv4 net. Example A.B.C.D/prefix. :return: ipaddress.ip_network object or raises exception on fail """ try: the_net = ipaddress.ip_network(net_str) except ValueError: msg = '{} is not a valid IP network address'.format(net_str) raise argparse.ArgumentTypeError(msg) if the_net.version != Utilities.IPV4: msg = 'Only IPv4 addresses are supported' raise argparse.ArgumentTypeError(msg) return the_net
def test_arp_ping(self): """test ARP ping - compare to arping utilite""" arp_ping = Ping(IFACE, ARP_NAME, TIMEOUT, False) for ip in list(ipaddress.ip_network(TEST_NETWORK).hosts())[:5]: try: # need arping installed with os.popen('arping -c {} -t {} {}'.format(COUNT, TIMEOUT, str(ip)), 'r'): # get exit code ec = os.wait()[1] & 0xFF00 res = arp_ping.ping_host(str(ip)) except PermissionException: print('Need root previlegies') if res[STATUS_INDEX] == ONLINE: self.assertTrue(ec == 0) else: self.assertFalse(ec == 0)
def test_icmp_ping(self): """test icmp ping - compare to icmping utilite""" icmp_ping = Ping(IFACE, ICMP_NAME, TIMEOUT, False) for ip in list(ipaddress.ip_network(TEST_NETWORK).hosts())[:5]: try: # need arping installed with os.popen('ping -c {} -t {} {}'.format(COUNT, TIMEOUT, str(ip)), 'r'): # get exit code ec = os.wait()[1] & 0xFF00 res = icmp_ping.ping_host(str(ip)) except PermissionException: print('Need root previlegies') if res[STATUS_INDEX] == ONLINE: self.assertTrue(ec == 0) else: self.assertFalse(ec == 0)
def edit(request): if 'object_id' in request.GET: server = Query({'object_id': request.GET['object_id']}).get() else: servertype = Servertype.objects.get(pk=request.POST['attr_servertype']) project = Project.objects.get(pk=request.POST['attr_project']) hostname = request.POST['attr_hostname'] if servertype.ip_addr_type == 'null': intern_ip = None elif servertype.ip_addr_type == 'network': intern_ip = ip_network(request.POST['attr_intern_ip']) else: intern_ip = ip_interface(request.POST['attr_intern_ip']) server = ServerObject.new(servertype, project, hostname, intern_ip) return _edit(request, server, True)
def scan_network(self): logger.info('scanning Network') network = self._config_reader.get_config_section("Networking")['network'] netmask = self._config_reader.get_config_section("Networking")['netmask'] my_net = ipaddress.ip_network(network+'/'+netmask) host_list = dict() s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.settimeout(2.0) for ip in my_net: try: # print(ip) host = self._generate_host(ip) if host is not None: host_list[ip] = host except socket.herror as ex: pass return host_list
def setUp(self): super().setUp() self.cluster.reset_hba() create_script = [] create_script.append('CREATE ROLE ssl_user WITH LOGIN;') self.cluster.add_hba_entry( type='hostssl', address=ipaddress.ip_network('127.0.0.0/24'), database='postgres', user='ssl_user', auth_method='trust') self.cluster.add_hba_entry( type='hostssl', address=ipaddress.ip_network('::1/128'), database='postgres', user='ssl_user', auth_method='trust') # Put hba changes into effect self.cluster.reload() create_script = '\n'.join(create_script) self.loop.run_until_complete(self.con.execute(create_script))
def __init__(self, value): """Initialise new IPPrefix instance.""" try: obj = ipaddress.ip_address(value) except Exception: try: obj = ipaddress.ip_network(value, strict=False) except Exception: raise if type(obj) in [ipaddress.IPv4Address, ipaddress.IPv6Address]: self.prefix = None self.type = self.HOST elif type(obj) in [ipaddress.IPv4Network, ipaddress.IPv6Network]: self.prefix = obj.network_address self.type = self.PREFIX self.version = obj.version self.txt = obj.compressed
def _ip_ranges (self, h): if '/' in h: if h.startswith('gateway/web/freenode/ip.'): h = h.replace('gateway/web/freenode/ip.','') return [ipaddress.ip_network(u'%s/27' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/26' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/25' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/24' % h, strict=False).with_prefixlen.encode('utf-8')] return [h] if utils.net.isIPV4(h): return [ipaddress.ip_network(u'%s/27' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/26' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/25' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/24' % h, strict=False).with_prefixlen.encode('utf-8')] if utils.net.bruteIsIPV6(h): r = [] r.append(ipaddress.ip_network(u'%s/120' % h, False).with_prefixlen.encode('utf-8')) r.append(ipaddress.ip_network(u'%s/118' % h, False).with_prefixlen.encode('utf-8')) r.append(ipaddress.ip_network(u'%s/116' % h, False).with_prefixlen.encode('utf-8')) r.append(ipaddress.ip_network(u'%s/114' % h, False).with_prefixlen.encode('utf-8')) r.append(ipaddress.ip_network(u'%s/112' % h, False).with_prefixlen.encode('utf-8')) r.append(ipaddress.ip_network(u'%s/110' % h, False).with_prefixlen.encode('utf-8')) r.append(ipaddress.ip_network(u'%s/64' % h, False).with_prefixlen.encode('utf-8')) return r return [h]
def is_authorized(ip, authorized_ips): ip = ip_address(ip) for item in authorized_ips: try: if ip == ip_address(item): return True except ValueError: try: if ip in ip_network(item): return True except ValueError: logger.warn('The "authorized_ip" list (settings.HEARTBEAT)' 'contains an item that is neither an ip address ' 'nor an ip network: {}'.format(item))
def check_access(clientip): global BANNED_IPS if clientip in BANNED_IPS: return False if not ALLOWED_NETWORKS: return True for net in ALLOWED_NETWORKS: ipobject = ipaddress.ip_address(clientip) if ipobject in ipaddress.ip_network(net): return True BANNED_IPS.append(clientip) return False
def public_interface(self): """ Check that all minions have an address on the public network """ for node in self.data.keys(): if ('roles' in self.data[node] and 'master' in self.data[node]['roles']): continue found = False public_network = self.data[node].get("public_network", "") net_list = Util.parse_list_from_string(public_network) for address in self.grains[node]['ipv4']: try: for network in net_list: addr = ipaddress.ip_address(u'{}'.format(address)) net = ipaddress.ip_network(u'{}'.format(network)) if addr in net: found = True except ValueError: # Don't care about reporting a ValueError here if # public_network is malformed, because the # previous validation in public_network() will do that. pass if not found: msg = "minion {} missing address on public network {}".format(node, public_network) self.errors.setdefault('public_interface', []).append(msg) self._set_pass_status('public_network')
def cluster_interface(self): """ Check that storage nodes have an interface on the cluster network """ # pylint: disable=too-many-nested-blocks for node in self.data.keys(): if ('roles' in self.data[node] and 'storage' in self.data[node]['roles']): found = False cluster_network = self.data[node].get("cluster_network", "") net_list = Util.parse_list_from_string(cluster_network) for address in self.grains[node]['ipv4']: try: for network in net_list: addr = ipaddress.ip_address(u'{}'.format(address)) net = ipaddress.ip_network(u'{}'.format(network)) if addr in net: found = True except ValueError: # Don't care about reporting a ValueError here if # cluster_network is malformed, because the # previous validation in cluster_network() will do that. pass if not found: msg = "minion {}".format(node) msg += " missing address on cluster network {}".format(cluster_network) self.errors.setdefault('cluster_interface', []).append(msg) self._set_pass_status('cluster_interface')
def address(): """ Find the public address for a minion """ if 'public_network' not in __pillar__: return "" log.debug("pillar: {}".format(type(__pillar__['public_network']))) if isinstance(__pillar__['public_network'], str): networks = re.split(', *', __pillar__['public_network']) else: networks = __pillar__['public_network'] log.debug("networks: {}".format(pprint.pformat(networks))) for public_network in networks: log.info('public_network: {}'.format(public_network)) interfaces = __salt__['network.interfaces']() log.debug("interfaces: {}".format(pprint.pformat(interfaces))) for interface in interfaces: log.info("interface: {}".format(pprint.pformat(interface))) if 'inet' in interfaces[interface]: for entry in interfaces[interface]['inet']: _address = entry['address'] log.info("Checking address {}".format(_address)) if (ipaddress.ip_address(u'{}'.format(_address)) in ipaddress.ip_network(u'{}'.format(public_network))): return _address return ""
def validate_ip_network(network, strict=True): ''' If network is a valid IP network when parsed according to strict, return it as an ipnetwork object. Otherwise, return None. ''' try: return ipaddress.ip_network(unicode(address), strict=strict) except ValueError: return None
def get_networks(): network_tokens = os.environ.get('TALISKER_NETWORKS', '').split() networks = [ip_network(force_unicode(n)) for n in network_tokens] if networks: logger = logging.getLogger(__name__) logger.info('configured TALISKER_NETWORKS', extra={'networks': [str(n) for n in networks]}) return networks
def cast_network(s, cur=None): if s is None: return None return ipaddress.ip_network(str(s))