我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用ipaddress.ip_address()。
def __init__(self): self.typeclass = None self.strict_level = None self.header_list = None self.value_list = None self.__table_writer = ptw.RstSimpleTableWriter() self.__table_writer._dp_extractor.type_value_mapping = { NullString(None).typecode: '``""``', NoneType(None).typecode: "``None``", Infinity(None).typecode: '``Decimal("inf")``', Nan(None).typecode: '``Decimal("nan")``', } self.__table_writer._dp_extractor.const_value_mapping = { True: "``True``", False: "``False``", '``"127.0.0.1"``': '``ip_address("127.0.0.1")``', }
def make_hosts(region, cell): # no of hosts need to match ip_address available no_of_hosts = 2 cab1 = region + "." + cell + "." + "C-1" cab2 = region + "." + cell + "." + "C-2" hosts = [] for host in range(no_of_hosts): hostname = "host%s.%s.example1.com" % (host, cab1) hosts.append(hostname) for host in range(no_of_hosts): hostname = "host%s.%s.example2.com" % (host, cab2) hosts.append(hostname) return hosts
def create_netdevice(self, name, device_type): network_devices_url = self.url + "/network-devices" payload = {"name": name, "model_name": "model-x", "os_version": "version-1", "device_type": device_type, "ip_address": "10.10.1.1", "active": True, "cloud_id": self.cloud.get("id"), "region_id": self.region.get("id"), "cell_id": self.cell.get("id")} resp = requests.post(network_devices_url, headers=self.headers, data=json.dumps(payload), verify=False) if resp.status_code != 201: raise Exception(resp.text) return resp.json()
def get_random_load_nonentry(): ''' Return a random item that probably isn't in match_func_result['load']. ''' match_type = sys.argv[1] if match_type == 'ipasn': # Yes, we could do IPv6 here. But the type of the list doesn't matter: # a random IPv4 might not be in an IPv4 list, and it won't be in an # IPv6 list random_32_bit = random.randint(0, 2**32 - 1) ip = ipaddress.ip_address(random_32_bit) return ip else: char_list = list(get_random_load_entry()) random.shuffle(char_list) return "".join(char_list) # try to make sure that other processes don't warp the results too much
def validate_ip_network_address_prefix(address, prefix, strict=True): ''' If address/prefix is a valid IP network when parsed according to strict, return it as an ipnetwork object. Otherwise, return None. ''' ip_address = validate_ip_address(address) if ip_address is None or ip_address.version not in [4,6]: return None try: if ip_address.version == 4: return ipaddress.IPv4Network((ip_address, prefix), strict=strict) else: return ipaddress.IPv6Network((ip_address, prefix), strict=strict) except ValueError: return None
def private(f): """Only allow approved source addresses.""" @functools.wraps(f) def wrapper(self, request): if not request.access_route: # this means something probably bugged in werkzeug, but let's fail # gracefully return Response('no client ip provided', status='403') ip_str = request.access_route[-1] if isinstance(ip_str, bytes): ip_str = ip_str.decode('utf8') ip = ip_address(ip_str) if ip.is_loopback or any(ip in network for network in get_networks()): return f(self, request) else: msg = PRIVATE_BODY_RESPONSE_TEMPLATE.format( ip_str, force_unicode(request.remote_addr), request.headers.get('x-forwarded-for')) return Response(msg, status='403') return wrapper
def set_unset(self, interface_id, attribute, address=None): """ Set attribute to True and unset the same attribute for all other interfaces. This is used for interface options that can only be set on one engine interface. """ for interface in self: for sub_interface in interface.sub_interfaces(): # Skip VLAN only interfaces (no addresses) if not isinstance(sub_interface, PhysicalVlanInterface): if getattr(sub_interface, attribute) is not None: if sub_interface.nicid == str(interface_id): if address is not None: # Find IP on Node Interface if ipaddress.ip_address(bytes_to_unicode(address)) in \ ipaddress.ip_network(sub_interface.network_value): setattr(sub_interface, attribute, True) else: setattr(sub_interface, attribute, False) else: setattr(sub_interface, attribute, True) else: #unset setattr(sub_interface, attribute, False)
def add_service(self, zeroconf, type, name): """This gets called for services discovered. Example: Service SupercomputerInABriefcase on tom-xps._sciabc._tcp.local. added, service info: ServiceInfo( type='_sciabc._tcp.local.', name='SupercomputerInABriefcase on tom-xps._sciabc._tcp.local.', address=b'\n\xb7\xcd\xb6', port=34343, weight=0, priority=0, server='tom-xps.local.', properties={} ) """ info = zeroconf.get_service_info(type, name) print("Service %s added, service info: %s" % (name, info)) ip = ipaddress.ip_address(info.address) append_ip_to_nodes_file(str(ip))
def validate_ip(ctx, param, value): try: ipaddress.ip_address(value) return value except ValueError as ex: raise click.BadParameter("Invalid IP: %s" % ex)
def add_ban(self, ip, reason, duration, name=None): """ Ban an ip with an optional reason and duration in minutes. If duration is None, ban is permanent. """ network = ip_network(text_type(ip), strict=False) for connection in list(self.connections.values()): if ip_address(connection.address[0]) in network: name = connection.name connection.kick(silent=True) if duration: duration = reactor.seconds() + duration * 60 else: duration = None self.bans[ip] = (name or '(unknown)', reason, duration) self.save_bans()
def sort_ip(ip, version="4"): """Sort the list by IP address.""" list_length = len(ip) # Length 10 is list -l, 5 is list if list_length == 10: try: ip = ip[7] if version == "4" else ip[8] _ip = str(ipaddress.ip_address(ip.rsplit("|")[1])) except (ValueError, IndexError): # Lame hack to have "-" last. _ip = "Z" elif list_length == 6: try: _ip = str(ipaddress.ip_address(ip[5])) except ValueError: # Lame hack to have "-" last. _ip = "Z" else: _ip = ip return _ip
def test_getnextpdu(self): get_pdu = GetNextPDU( header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), oids=( ObjectIdentifier(10, 0, 0, 0, (1, 3, 6, 1, 2, 1, 4, 21, 1, 7)), ) ) encoded = get_pdu.encode() response = get_pdu.make_response(self.lut) print(response) n = len(response.values) value0 = response.values[0] self.assertEqual(value0.type_, ValueType.IP_ADDRESS) self.assertEqual(str(value0.data), ipaddress.ip_address("10.0.0.1").packed.decode())
def test_getnextpdu_exactmatch(self): oid = ObjectIdentifier(14, 0, 1, 0, (1, 3, 6, 1, 2, 1, 4, 21, 1, 7, 0, 0, 0, 0)) get_pdu = GetNextPDU( header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), oids=[oid] ) encoded = get_pdu.encode() response = get_pdu.make_response(self.lut) print(response) n = len(response.values) value0 = response.values[0] self.assertEqual(value0.type_, ValueType.IP_ADDRESS) print("test_getnextpdu_exactmatch: ", str(oid)) self.assertEqual(str(value0.name), str(oid)) self.assertEqual(str(value0.data), ipaddress.ip_address("10.0.0.1").packed.decode())
def test_getnextpdu(self): get_pdu = GetNextPDU( header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), oids=( ObjectIdentifier(21, 0, 0, 0, (1, 3, 6, 1, 2, 1, 4, 24, 4, 1, 1, 0)), ) ) encoded = get_pdu.encode() response = get_pdu.make_response(self.lut) print(response) n = len(response.values) value0 = response.values[0] self.assertEqual(value0.type_, ValueType.IP_ADDRESS) self.assertEqual(str(value0.data), ipaddress.ip_address("0.0.0.0").packed.decode())
def test_getnextpdu_exactmatch(self): oid = ObjectIdentifier(24, 0, 1, 0, (1, 3, 6, 1, 2, 1, 4, 24, 4, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 17)) get_pdu = GetNextPDU( header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), oids=[oid] ) encoded = get_pdu.encode() response = get_pdu.make_response(self.lut) print(response) n = len(response.values) value0 = response.values[0] self.assertEqual(value0.type_, ValueType.IP_ADDRESS) print("test_getnextpdu_exactmatch: ", str(oid)) self.assertEqual(str(value0.name), str(oid)) self.assertEqual(str(value0.data), ipaddress.ip_address("0.0.0.0").packed.decode())
def reinit_data(self): """ Subclass update loopback information """ self.loips = {} self.db_conn.connect(mibs.APPL_DB) loopbacks = self.db_conn.keys(mibs.APPL_DB, "INTF_TABLE:lo:*") if not loopbacks: return # collect only ipv4 interfaces for loopback in loopbacks: lostr = loopback.decode() loip = lostr[len("INTF_TABLE:lo:"):] ipa = ipaddress.ip_address(loip) if isinstance(ipa, ipaddress.IPv4Address): self.loips[loip] = ipa
def perform_create(self, serializer, remote_ip): captcha = ( ipaddress.ip_address(remote_ip) not in ipaddress.IPv6Network(os.environ['DESECSTACK_IPV6_SUBNET']) and ( User.objects.filter( created__gte=timezone.now()-timedelta(hours=settings.ABUSE_BY_REMOTE_IP_PERIOD_HRS), registration_remote_ip=remote_ip ).count() >= settings.ABUSE_BY_REMOTE_IP_LIMIT or User.objects.filter( created__gte=timezone.now() - timedelta(hours=settings.ABUSE_BY_EMAIL_HOSTNAME_PERIOD_HRS), email__endswith=serializer.validated_data['email'].split('@')[-1] ).count() >= settings.ABUSE_BY_EMAIL_HOSTNAME_LIMIT ) ) user = serializer.save(registration_remote_ip=remote_ip, captcha_required=captcha) if user.captcha_required: send_account_lock_email(self.request, user) elif not user.dyn: context = {'token': user.get_token()} send_token_email(context, user) signals.user_registered.send(sender=self.__class__, user=user, request=self.request)
def get_location_from_geolite2(pool, ip_addr): try: ip_addr = ipaddress.ip_address(ip_addr) # fix issues with asyncpg not handling ip address netmask defaults # correctly ip_addr = "{}/{}".format(ip_addr, 32 if ip_addr.version == 4 else 128) async with pool.acquire() as con: row = await con.fetchrow( "SELECT cs.country_iso_code FROM geolite2_ip_addresses ips " "JOIN geolite2_countries cs ON ips.geoname_id = cs.geoname_id " "WHERE ips.network >> $1", ip_addr) return row['country_iso_code'] if row else None except ValueError: return None except asyncpg.exceptions.UndefinedTableError: log.warning("Missing GeoLite2 database tables") return None
def test_scandata_parsing(self): '''Tests parsing of the basic surface scan''' scan = ndr.NmapScan() scan.parse_nmap_xml(load_nmap_xml_data(TEST_SURFACE_SCAN_DATA)) self.assertEqual(5, len(scan)) # Got three hosts, find a host by MAC address host = scan.find_by_mac('84:39:BE:64:3F:E5')[0] # SHould only find one IP address self.assertNotEqual(None, host) self.assertTrue(host.addr, ipaddress.ip_address("192.168.2.100")) self.assertEqual(host.reason, ndr.NmapReasons.ARP_RESPONSE) # Confirm that we're not finding phantom hosts self.assertEqual(scan.find_by_mac('NOT_A_REAL_MAC'), None)
def test_ipv6_linklocal_parsing(self): '''Tests the results of the IPv6 link-local scan''' scan = ndr.NmapScan() scan.parse_nmap_xml(load_nmap_xml_data(TEST_V6_LINK_LOCAL_SCAN_DATA)) self.assertEqual(3, len(scan)) # Got three hosts, find a host by MAC address host_list = scan.find_by_mac('84:39:BE:64:3F:E5') # SHould only find one IP address self.assertEqual(len(host_list), 1) host = host_list[0] self.assertTrue(host.reason, ndr.NmapReasons.ND_RESPONSE) self.assertEqual(host.addr, ipaddress.ip_address("fe80::8639:beff:fe64:3fe5"))
def from_dict(self, config_dict): '''Load settings from dictionary''' # Load the easy objects first if config_dict.get('version', None) != 1: raise ValueError("Unknown NDR NMAP config file version!") # Clean out the IP lists self.mac_address_config = {} self.ip_address_config = {} machine_ips = config_dict.get('machine_ips', dict()) machine_macs = config_dict.get('machine_macs', dict()) # Load in the machine IP addresses for ip_addr, value in machine_ips.items(): self.ip_address_config[ipaddress.ip_address(ip_addr)] = NmapScanMode(value) # Now do it again with the MAC addresses. When we load in MACs, make them all # upper case to be consistent with NmapHosts for mac_addr, value in machine_macs.items(): self.mac_address_config[mac_addr.upper()] = NmapScanMode(value)
def build_nmap_commandline(self, base_flags, address, interface=None): '''Builds common NMAP option command lines''' ipaddr = ipaddress.ip_address(address) # Several bits of magic are required here # 1. If we're v6 address or range, we need -6 # 2. If we're link-local, we need to specify the interface options = base_flags if ipaddr.version == 6: options = "-6 " + options if ipaddr.is_link_local: options = "-e " + interface + " " + options return options
def from_dict(self, msg_dict): '''Converts the entry back to dict format''' self.timestamp = msg_dict['timestamp'] self.proto = ndr.PortProtocols(msg_dict['proto']) self.src = ipaddress.ip_address(msg_dict['src']) if msg_dict['srcport'] is not None: self.srcport = int(msg_dict['srcport']) self.dst = ipaddress.ip_address(msg_dict['dst']) if msg_dict['dstport'] is not None: self.dstport = int(msg_dict['dstport']) self.ethsrc = msg_dict['ethsrc'] self.ethdst = msg_dict['ethdst'] self.ethlen = int(msg_dict['ethlen']) self.tcpflags = msg_dict['tcpflags'] if 'tcpseq' is not None: self.tcpseq = msg_dict['tcpseq'] else: self.tcpseq = None
def valid_mappable_ipv4(address): try: address_uni = unicode(address, errors='ignore') except TypeError: address_uni = address if address_uni in NON_MAPPABLE: return False try: parsed = ipaddress.ip_address(address_uni) except ValueError: log.debug('invalid IPv4 address', input=address) return False if parsed is not None and parsed.version == 4: return True else: return False
def setUp(self): network = u'192.168.1.0/30' self.node = self.fixtures.node() IpAddrPool().create( {'network': network, 'autoblock': '192.168.1.1', 'node': self.node.id}) self.ippool = IPPool.query.get(network) self.pod = self.fixtures.pod(owner_id=self.user.id) self.pod_ip = PodIP( pod_id=self.pod.id, network=self.ippool.network, ip_address=int(ipaddress.ip_address(u'192.168.1.2')), ) self.db.session.add(self.pod_ip) self.db.session.add(self.node) self.db.session.commit() self.stubs = K8SAPIStubs() self.stubs.node_info_in_k8s_api(self.node.hostname) self.stubs.node_info_update_in_k8s_api(self.node.hostname)
def setUp(self): from kubedock.pods.models import PodIP, IPPool self.ip = ipaddress.ip_address(u'192.168.43.4') self.with_ip_conf = { 'public_ip': unicode(self.ip), 'containers': [ {'ports': [{'isPublic': True}, {}, {'isPublic': False}]}, {'ports': [{'isPublic': True}, {}, {'isPublic': False}]}, ], } self.without_ip_conf = { 'public_ip_before_freed': unicode(self.ip), 'containers': [ {'ports': [{'isPublic_before_freed': True}, {'isPublic_before_freed': None}, {'isPublic_before_freed': False}]}, {'ports': [{'isPublic_before_freed': True}, {'isPublic_before_freed': None}, {'isPublic_before_freed': False}]}, ], } self.pod = self.fixtures.pod(config=json.dumps(self.with_ip_conf)) self.ippool = IPPool(network='192.168.43.0/29').save() self.podip = PodIP(pod_id=self.pod.id, network=self.ippool.network, ip_address=int(self.ip)).save()
def free_hosts_and_busy(self, as_int=None, page=None): pods = PodIP.filter_by(network=self.network) allocated_ips = {int(pod): pod.get_pod() for pod in pods} blocked_ips = self.get_blocked_set(as_int=True) hosts = self.hosts(as_int=True, page=page) def get_ip_state(ip, pod): if pod: return 'busy' if ip in blocked_ips: return 'blocked' return 'free' def get_ip_info(ip): pod = allocated_ips.get(ip) state = get_ip_state(ip, pod) return ip if as_int else str(ipaddress.ip_address(ip)), pod, state return [get_ip_info(ip) for ip in hosts]
def get_free_host(cls, as_int=None, node=None, ip=None): """Return free host if available. :param as_int: return ip as long int :param node: if set, get free host only for this node :param ip: if set, first try to check if this ip available, if not, then return any other available ip :return: ip address, as str or int, depends on as_int value """ if ip: network = cls.get_network_by_ip(ip) if network and network.is_ip_available(ip, node): return int(ipaddress.ip_address(ip)) if as_int else ip if node is None: networks = cls.all() else: networks = cls.filter(cls.node.has(hostname=node)) for n in networks: free_host = n.get_first_free_host(as_int=as_int) if free_host is not None: return free_host raise NoFreeIPs()
def init(): config = read_config_ini('/run/flannel/subnet.env') config_network = config['flannel_network'] config_subnet = config['flannel_subnet'] _update_ipset('kuberdock_flannel', [config_network], set_type='hash:net') _update_ipset('kuberdock_nodes', []) _update_ipset('kuberdock_cluster', ['kuberdock_flannel', 'kuberdock_nodes'], set_type='list:set') network = ipaddress.ip_network(unicode(config_network)) subnet = ipaddress.ip_network(unicode(config_subnet), strict=False) etcd = ETCD() for user in etcd.users(): for pod in etcd.pods(user): pod_ip = ipaddress.ip_address(pod) if pod_ip not in network or pod_ip in subnet: etcd.delete_user(user, pod) update_ipset()
def get_local_bricks(volume: str) -> Result: """ Return all bricks that are being served locally in the volume volume: Name of the volume to get local bricks for """ try: vol_info = volume_info(volume) local_ip = get_local_ip() if local_ip.is_err(): return Err(local_ip.value) local_brick_list = [] for vol in vol_info: for brick in vol.bricks: if ip_address(brick.peer.hostname) == local_ip.value: local_brick_list.append(brick) return Ok(local_brick_list) except GlusterCmdOutputParseError: raise
def main(): ''' You are here. ''' parser = argparse.ArgumentParser() parser.add_argument('--bind-address', type=ipaddress.ip_address, default='::', help='IPv6 or IPv4 address to listen on') parser.add_argument('--bind-port', type=int, default=9196, help='Port to listen on') parser.add_argument('--bind-v6only', type=int, choices=[0, 1], help='If 1, prevent IPv6 sockets from accepting IPv4 connections; if 0, allow; if unspecified, use OS default') parser.add_argument('--thread-count', type=int, help='Number of request-handling threads to spawn') args = parser.parse_args() server = wsgiext.Server((args.bind_address, args.bind_port), wsgiext.SilentRequestHandler, args.thread_count, args.bind_v6only) server.set_app(exporter.wsgi_app) wsgi_thread = threading.Thread(target=functools.partial(server.serve_forever, 86400), name='wsgi') def handle_sigterm(signum, frame): server.shutdown() signal.signal(signal.SIGTERM, handle_sigterm) wsgi_thread.start() wsgi_thread.join() server.server_close()
def get_external_url_status(self, finalUrl): external_url = False if finalUrl.startswith('"http') or finalUrl.startswith('http'): try: ip_addr = finalUrl.split('/')[2] if ':' in ip_addr: ip_addr = ip_addr.split(':')[0] except Exception as e: print(e) ip_addr = 'none' private_ip = False try: ip_obj = ipaddress.ip_address(ip_addr) print(ip_obj) if ip_obj.is_private: private_ip = True except Exception as e: print(e) if not private_ip: external_url = True return external_url
def build_trace_req_header(oam_type, sil, remote_ip, remote_port): trace_req_header_values = TRACEREQHEADER() trace_req_header_values.oam_type = oam_type trace_req_header_values.sil = sil trace_req_header_values.port = int(remote_port) s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect((remote_ip, trace_req_header_values.port)) # print(s.getsockname()[0]) src_addr = ipaddress.ip_address(s.getsockname()[0]) if src_addr.version == 4: trace_req_header_values.ip_1 = 0x00000000 trace_req_header_values.ip_2 = 0x00000000 trace_req_header_values.ip_3 = 0x0000FFFF trace_req_header_values.ip_4 = int(ipaddress.IPv4Address(src_addr)) elif src_addr.version == 6: int_addr6 = int(ipaddress.IPv6Address(src_addr)) trace_req_header_values.ip_1 = int_addr6 >> 96 trace_req_header_values.ip_2 = (int_addr6 >> 64) & 0x0FFFFFFFF trace_req_header_values.ip_3 = (int_addr6 >> 32) & 0x0FFFFFFFF trace_req_header_values.ip_4 = int_addr6 & 0x0FFFFFFFF return trace_req_header_values
def _get_current_ip_version(self, ip): """ Get current IP address version :param ip: IP address :type ip: str :return int """ if '/' in ip: ip_parts = ip.split('/') ip = ip_parts[0] ip = ipaddress.ip_address(ip) return ip.version
def resolve(self, host, port, **kwargs): with stats.record_latency('fetcher DNS lookup', url=host): with stats.coroutine_state('fetcher DNS lookup'): addrs = await super().resolve(host, port, **kwargs) ret = [] for a in addrs: try: ip = ipaddress.ip_address(a.host) except ValueError: continue if not self._crawllocalhost and ip.is_localhost: continue if not self._crawlprivate and ip.is_private: continue ret.append(a) if len(addrs) != len(ret): LOGGER.info('threw out some ip addresses for %s', host) return ret
def is_authorized(ip, authorized_ips): ip = ip_address(ip) for item in authorized_ips: try: if ip == ip_address(item): return True except ValueError: try: if ip in ip_network(item): return True except ValueError: logger.warn('The "authorized_ip" list (settings.HEARTBEAT)' 'contains an item that is neither an ip address ' 'nor an ip network: {}'.format(item))
def _ipaddress_match(ipname, host_ip): """Exact matching of IP addresses. RFC 6125 explicitly doesn't define an algorithm for this (section 1.7.2 - "Out of Scope"). """ # OpenSSL may add a trailing newline to a subjectAltName's IP address # Divergence from upstream: ipaddress can't handle byte str ip = ipaddress.ip_address(_to_unicode(ipname).rstrip()) return ip == host_ip
def check_access(clientip): global BANNED_IPS if clientip in BANNED_IPS: return False if not ALLOWED_NETWORKS: return True for net in ALLOWED_NETWORKS: ipobject = ipaddress.ip_address(clientip) if ipobject in ipaddress.ip_network(net): return True BANNED_IPS.append(clientip) return False
def test_restricted(remote_address): addr = ipaddress.ip_address(remote_address) if args.restrict is not None: allowed = False for restrict_address in restricted_addresses: if test_address_pair(restrict_address, addr): allowed = True break return allowed else: return True
def parse_listen_addr(listen_addr): """ Parse an address of the form [ipaddress]:port into a tcp or tcp6 Twisted endpoint description string for use with ``twisted.internet.endpoints.serverFromString``. """ if ':' not in listen_addr: raise ValueError( "'%s' does not have the correct form for a listen address: " '[ipaddress]:port' % (listen_addr,)) host, port = listen_addr.rsplit(':', 1) # Validate the host if host == '': protocol = 'tcp' interface = None else: if host.startswith('[') and host.endswith(']'): # IPv6 wrapped in [] host = host[1:-1] ip_address = ipaddress.ip_address(_to_unicode(host)) protocol = 'tcp6' if ip_address.version == 6 else 'tcp' interface = str(ip_address) # Validate the port if not port.isdigit() or int(port) < 1 or int(port) > 65535: raise ValueError( "'%s' does not appear to be a valid port number" % (port,)) args = [protocol, port] kwargs = {'interface': interface} if interface is not None else {} return _create_tx_endpoints_string(args, kwargs)
def force_convert(self): import ipaddress try: return ipaddress.ip_address(six.text_type(self._value)) except ValueError: raise TypeConversionError( "failed to force_convert to dictionary: type={}".format( type(self._value)))
def public_interface(self): """ Check that all minions have an address on the public network """ for node in self.data.keys(): if ('roles' in self.data[node] and 'master' in self.data[node]['roles']): continue found = False public_network = self.data[node].get("public_network", "") net_list = Util.parse_list_from_string(public_network) for address in self.grains[node]['ipv4']: try: for network in net_list: addr = ipaddress.ip_address(u'{}'.format(address)) net = ipaddress.ip_network(u'{}'.format(network)) if addr in net: found = True except ValueError: # Don't care about reporting a ValueError here if # public_network is malformed, because the # previous validation in public_network() will do that. pass if not found: msg = "minion {} missing address on public network {}".format(node, public_network) self.errors.setdefault('public_interface', []).append(msg) self._set_pass_status('public_network')
def cluster_interface(self): """ Check that storage nodes have an interface on the cluster network """ # pylint: disable=too-many-nested-blocks for node in self.data.keys(): if ('roles' in self.data[node] and 'storage' in self.data[node]['roles']): found = False cluster_network = self.data[node].get("cluster_network", "") net_list = Util.parse_list_from_string(cluster_network) for address in self.grains[node]['ipv4']: try: for network in net_list: addr = ipaddress.ip_address(u'{}'.format(address)) net = ipaddress.ip_network(u'{}'.format(network)) if addr in net: found = True except ValueError: # Don't care about reporting a ValueError here if # cluster_network is malformed, because the # previous validation in cluster_network() will do that. pass if not found: msg = "minion {}".format(node) msg += " missing address on cluster network {}".format(cluster_network) self.errors.setdefault('cluster_interface', []).append(msg) self._set_pass_status('cluster_interface')
def address(): """ Find the public address for a minion """ if 'public_network' not in __pillar__: return "" log.debug("pillar: {}".format(type(__pillar__['public_network']))) if isinstance(__pillar__['public_network'], str): networks = re.split(', *', __pillar__['public_network']) else: networks = __pillar__['public_network'] log.debug("networks: {}".format(pprint.pformat(networks))) for public_network in networks: log.info('public_network: {}'.format(public_network)) interfaces = __salt__['network.interfaces']() log.debug("interfaces: {}".format(pprint.pformat(interfaces))) for interface in interfaces: log.info("interface: {}".format(pprint.pformat(interface))) if 'inet' in interfaces[interface]: for entry in interfaces[interface]['inet']: _address = entry['address'] log.info("Checking address {}".format(_address)) if (ipaddress.ip_address(u'{}'.format(_address)) in ipaddress.ip_network(u'{}'.format(public_network))): return _address return ""
def with_host(self, host): """Return a new URL with host replaced. Autoencode host if needed. Changing host for relative URLs is not allowed, use .join() instead. """ # N.B. doesn't cleanup query/fragment if not isinstance(host, str): raise TypeError("Invalid host type") if not self.is_absolute(): raise ValueError("host replacement is not allowed " "for relative URLs") if not host: raise ValueError("host removing is not allowed") try: ip = ip_address(host) except ValueError: host = host.encode('idna').decode('ascii') else: if ip.version == 6: host = '[' + host + ']' val = self._val return URL( self._val._replace(netloc=self._make_netloc(val.username, val.password, host, val.port)), encoded=True)
def _normalize_url_with_hostname(url): # type: (Text) -> Text # TODO: replace urlparse by more robust utf-8 handling code parsed = urlparse(url.encode('utf-8')) hostname = parsed.hostname.decode('utf-8') # type: ignore try: ipaddress.ip_address(hostname) hostname = socket.gethostbyaddr(hostname)[0].encode('utf-8') except ValueError: return url return parsed._replace(netloc=b'%s:%d' % (hostname, parsed.port)).geturl()
def generate_ip_addresses(self, num_ips, starting_ip): start_ip_address = ip_address(starting_ip) ips = [str(start_ip_address + i) for i in range(num_ips)] return ips