Python ipaddress 模块,IPv4Address() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ipaddress.IPv4Address()

项目:typepy    作者:thombashi    | 项目源码 | 文件源码
def exeute(self, method, value):
        """Call ``method`` on a type-class wrapper of ``value`` and format the result.

        The wrapper is built with ``self.typeclass(value, self.strict_level)``
        and ``method`` is looked up on it by name and called without arguments.

        :param method: name of the method to invoke on the wrapper.
        :param value: raw value handed to ``self.typeclass``.
        :return: ``"NOP [#f1]_"`` when ``method`` is ``"validate"``; the result
            wrapped in double-backtick/double-quote markup when it is a text,
            ``IPv4Address`` or ``IPv6Address`` object; ``"E [#f2]_"`` on
            ``TypeError``; ``"E [#f3]_"`` on ``typepy.TypeConversionError``;
            otherwise the unmodified result.
        """
        quoted_types = (
            six.text_type, ipaddress.IPv4Address, ipaddress.IPv6Address)

        try:
            checker = self.typeclass(value, self.strict_level)
            outcome = getattr(checker, method)()

            if method == "validate":
                return "NOP [#f1]_"
            if isinstance(outcome, quoted_types):
                return '``"{}"``'.format(outcome)
            return outcome
        except TypeError:
            return "E [#f2]_"
        except typepy.TypeConversionError:
            return "E [#f3]_"
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after checking that it is an ``ipaddress`` object.

        :param value: address or network to wrap; must be an ``IPv4Address``,
            ``IPv6Address``, ``IPv4Network`` or ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is of any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after checking that it is an ``ipaddress`` object.

        :param value: address or network to wrap; must be an ``IPv4Address``,
            ``IPv6Address``, ``IPv4Network`` or ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is of any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:validus    作者:shopnilsazal    | 项目源码 | 文件源码
def isipv4(value):
    """
    Tell whether ``value`` can be parsed as an IP version 4 address.

    Returns ``True`` when ``ipaddress.IPv4Address`` accepts the value and
    ``False`` when it rejects it with ``AddressValueError``.

    Examples::

        >>> isipv4('127.0.0.1')
        True

        >>> isipv4('::1')
        False

    :param value: string to validate IP version 4
    """
    try:
        return ipaddress.IPv4Address(value).version == 4
    except ipaddress.AddressValueError:
        return False
项目:nettool    作者:heyglen    | 项目源码 | 文件源码
def ip(cls, value, raise_exception=False):
            """Validate that ``value`` is a single IPv4 address.

            A value accepted by ``cls.network`` is first normalised to CIDR
            text; only a ``/32`` network counts as an address, any other
            prefix length yields ``False``.

            :param value: candidate address (text) or ``/32`` network.
            :param raise_exception: when true, raise instead of returning
                ``False`` for a non-string value or an unparsable address.
            :return: ``True`` if the value parses as an IPv4 address.
            :raises TypeError: non-string value and ``raise_exception`` set.
            """
            if cls.network(value):
                value = NetTest.convert.string.cidr(value)
                if not value.endswith('/32'):
                    return False
                # Drop the prefix length, keep the host part only.
                value = value.split('/')[0]

            if not isinstance(value, basestring):
                if raise_exception:
                    raise TypeError('Invalid type \'{}\''.format(type(value)))
                return False

            value = unicode(value)
            try:
                ipaddress.IPv4Address(value)
            except (ValueError, TypeError):
                if raise_exception:
                    raise
                return False
            return True
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after checking that it is an ``ipaddress`` object.

        :param value: address or network to wrap; must be an ``IPv4Address``,
            ``IPv6Address``, ``IPv4Network`` or ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is of any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:telepresence    作者:datawire    | 项目源码 | 文件源码
def test_covering_cidr(ips):
    """
    covering_cidr() gets the minimal CIDR that covers given IPs.

    The result must be a string naming a network of at most 24 prefix bits
    that contains every input address; when the prefix is shorter than 24
    bits, none of its immediate subnets may contain all the addresses.
    """
    result = telepresence.vpn.covering_cidr(ips)
    assert isinstance(result, str)
    network = ipaddress.IPv4Network(result)
    assert network.prefixlen <= 24

    # Every given address lies inside the returned network.
    addresses = [ipaddress.IPv4Address(raw) for raw in ips]
    assert all(address in network for address in addresses)

    # Minimality: a wider-than-/24 result cannot be narrowed to a subnet.
    if network.prefixlen < 24:
        for subnet in network.subnets():
            assert not all(address in subnet for address in addresses)
项目:sonic-snmpagent    作者:Azure    | 项目源码 | 文件源码
def reinit_data(self):
        """
        Rebuild ``self.loips`` from the loopback keys of the application DB.

        Keys matching ``INTF_TABLE:lo:*`` are read; the text after that prefix
        is parsed as an IP address and kept only when it is IPv4. The mapping
        goes from the address text to the parsed ``IPv4Address``.
        """
        self.loips = {}

        self.db_conn.connect(mibs.APPL_DB)
        loopback_keys = self.db_conn.keys(mibs.APPL_DB, "INTF_TABLE:lo:*")
        if not loopback_keys:
            return

        prefix_length = len("INTF_TABLE:lo:")
        for raw_key in loopback_keys:
            address_text = raw_key.decode()[prefix_length:]
            parsed = ipaddress.ip_address(address_text)
            # IPv6 loopbacks are deliberately left out.
            if isinstance(parsed, ipaddress.IPv4Address):
                self.loips[address_text] = parsed
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after checking that it is an ``ipaddress`` object.

        :param value: address or network to wrap; must be an ``IPv4Address``,
            ``IPv6Address``, ``IPv4Network`` or ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is of any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after checking that it is an ``ipaddress`` object.

        :param value: address or network to wrap; must be an ``IPv4Address``,
            ``IPv6Address``, ``IPv4Network`` or ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is of any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after checking that it is an ``ipaddress`` object.

        :param value: address or network to wrap; must be an ``IPv4Address``,
            ``IPv6Address``, ``IPv4Network`` or ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is of any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after checking that it is an ``ipaddress`` object.

        :param value: address or network to wrap; must be an ``IPv4Address``,
            ``IPv6Address``, ``IPv4Network`` or ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is of any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def get_free_ip_list(self, net):
        # type: (oca.VirtualNetworkPool) -> list[str]
        """
        Return the unleased IP addresses of the given network in random order.

        Every address range of ``net`` is expanded into its individual IPv4
        addresses (``size`` addresses starting at the range's ``ip``); the
        addresses referenced by the range's leases are then removed.

        :param net: oca.VirtualNetworkPool
        :return: a shuffled list of IPv4 address strings
        """
        all_ips, leased_ips = set(), set()

        for address_range in net.address_ranges:
            first = int(IPv4Address(unicode(address_range.ip)))
            last_exclusive = first + address_range.size

            all_ips.update(
                str(IPv4Address(number))
                for number in range(first, last_exclusive))
            leased_ips.update(lease.ip for lease in address_range.leases)

        available = list(all_ips - leased_ips)
        random.shuffle(available)
        return available
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def parse_autoblock(data):
        # type: (str) -> set
        """Expand a comma-separated exclusion list into a set of IP strings.

        Each item is either a single IPv4 address or an inclusive range
        written as ``ip1-ip2``.

        :param data: text such as ``10.0.0.1,10.1.0.10-10.1.1.54``.
        :return: union of all addresses named by the items.
        :raises APIError: when an item is neither an address nor a range.
        """
        def _expand(token):
            # A token that parses as one address stands for itself.
            try:
                ipaddress.IPv4Address(token)
            except ipaddress.AddressValueError:
                pass
            else:
                return {token}

            # Otherwise it has to be an inclusive range: ip1-ip2
            try:
                low, high = [utils.ip2int(part) for part in token.split('-')]
                return {utils.int2ip(number)
                        for number in range(low, high + 1)}
            except ValueError:
                raise APIError(
                    'Exclude IP\'s are expected to be in the form of '
                    '10.0.0.1,10.0.0.4 or 10.1.0.10-10.1.1.54 or both '
                    'comma-separated')

        expanded = (_expand(unicode(chunk)) for chunk in data.split(','))
        return reduce(operator.or_, expanded)
项目:archive-Holmes-Totem-Service-Library    作者:HolmesProcessing    | 项目源码 | 文件源码
def ValidateIP(ip):
    """
    Check if an IP is public or not.

    This is a thin public entry point: the check itself is delegated to the
    module-level helper ``__validateIP`` and its result is returned unchanged.

    Public IP is any IP that is neither of:

    - private
    - loopback
    - broadcast / multicast
    - link-local

    Returns True on success and False on failure.

    Raises a ValueError exception if the input is not of type
    ipaddress.IPv4Address or ipaddress.IPv6Address.

    NOTE(review): the return values and the ValueError described above are
    properties of ``__validateIP``, which is not visible from this function;
    confirm them against that helper.
    """
    return __validateIP(ip)
项目:pytestlab    作者:sangoma    | 项目源码 | 文件源码
def generate_packet(src, data):
    """Assemble an Ethernet/IPv4/UDP broadcast frame carrying ``data``.

    The three headers are laid out over one shared ``bytearray``: Ethernet
    first, IPv4 in the Ethernet payload, UDP in the IPv4 payload. The frame
    is addressed to ``ff:ff:ff:ff:ff:ff`` / ``255.255.255.255`` and sent from
    UDP port 68 to port 67.

    :param src: source hardware address stored in the Ethernet header.
    :param data: UDP payload.
    :return: the ``bytearray`` holding the complete frame.
    """
    udp_length = len(data) + udphdr.min_size
    ip_length = udp_length + ip4hdr.min_size
    frame = bytearray(ip_length + ethhdr.min_size)

    eth_fields = {'src': src,
                  'dst': pnet.HWAddress(u'ff:ff:ff:ff:ff:ff'),
                  'type': pnet.ETH_P_IP}
    eth = ethhdr(eth_fields, buf=frame)

    ip_fields = {'dst': ipaddress.IPv4Address(u'255.255.255.255'),
                 'proto': socket.IPPROTO_UDP,
                 'len': ip_length}
    ip4 = ip4hdr(ip_fields, buf=eth.payload)

    udp_fields = {'sport': 68,
                  'dport': 67,
                  'len': udp_length}
    udp = udphdr(udp_fields, buf=ip4.payload)

    # Checksums are filled in before the payload is copied into the frame;
    # the UDP checksum receives ``data`` explicitly.
    ip4['csum'] = pnet.checksum(ip4.tobytes())
    udp['csum'] = pnet.ipv4_checksum(ip4, udp, data)
    udp.payload = data
    return frame
项目:microProxy    作者:mike820324    | 项目源码 | 文件源码
def test_socks_request_ipv4(self):
        """A CONNECT request to an IPv4 address succeeds and opens a stream.

        The layer must emit a SUCCESS response of IPv4 address type and
        resolve to an open destination stream plus the requested host/port.
        """
        mocked_conn = Mock()
        mocked_conn.send = Mock(side_effect=self.collect_send_event)
        self.layer.socks_conn = mocked_conn

        connect_request = Request(
            REQ_COMMAND["CONNECT"], ADDR_TYPE["IPV4"],
            u"127.0.0.1", self.port)
        addr_future = self.layer.handle_request_and_create_destination(
            connect_request)

        dest_stream, host, port = yield addr_future

        # The response captured from the mocked connection.
        self.assertIsNotNone(self.event)
        self.assertIsInstance(self.event, Response)
        self.assertEqual(self.event.status, RESP_STATUS["SUCCESS"])
        self.assertEqual(self.event.atyp, ADDR_TYPE["IPV4"])

        # The destination that was opened.
        self.assertIsNotNone(dest_stream)
        self.assertFalse(dest_stream.closed())
        self.assertEqual(host, IPv4Address(u"127.0.0.1"))
        self.assertEqual(port, self.port)

        dest_stream.close()
项目:microProxy    作者:mike820324    | 项目源码 | 文件源码
def test_handle_connection_timeout(self):
        """A timeout while opening the destination is reported as unreachable.

        ``create_dest_stream`` is replaced by a mock whose side effect is built
        from ``TimeoutError``; waiting on the request must raise
        ``DestNotConnectedError`` and a NETWORK_UNREACHABLE response carrying
        the requested address and port must have been sent.
        """
        mocked_conn = Mock()
        mocked_conn.send = Mock(side_effect=self.collect_send_event)
        self.layer.socks_conn = mocked_conn

        socks_request = Request(
            REQ_COMMAND["CONNECT"], ADDR_TYPE["IPV4"],
            u"1.2.3.4", self.port)

        failing_factory = self.create_raise_exception_function(TimeoutError)
        self.layer.create_dest_stream = Mock(side_effect=failing_factory)
        addr_future = self.layer.handle_request_and_create_destination(
            socks_request)

        with self.assertRaises(DestNotConnectedError):
            yield addr_future

        self.assertIsNotNone(self.event)
        self.assertIsInstance(self.event, Response)
        self.assertEqual(self.event.status, RESP_STATUS["NETWORK_UNREACHABLE"])
        self.assertEqual(self.event.atyp, ADDR_TYPE["IPV4"])
        self.assertEqual(self.event.addr, IPv4Address(u"1.2.3.4"))
        self.assertEqual(self.event.port, self.port)
项目:babeltrace-zipkin    作者:vears91    | 项目源码 | 文件源码
def create_endpoint(ipv4, port, service_name):
    """Creates an endpoint object

    The address is converted to its integer form. An empty or unparsable
    address falls back to ``0``, which is the integer form of ``0.0.0.0``,
    so the endpoint always receives an integer.

    :param ipv4: ipv4 address of the endpoint
    :param port: port of the endpoint
    :service_name: human readable name that identifies the service of the endpoint
    :returns: zipkin endpoint object
    """
    try:
        ipv4 = int(ipaddress.IPv4Address(ipv4))
    except (ValueError, TypeError):
        # AddressValueError is a ValueError subclass. Keep the fallback the
        # same type as the success path instead of the string "0.0.0.0".
        ipv4 = 0
    port = int(port)

    return zipkin_core.Endpoint(
        ipv4=ipv4, port=port, service_name=service_name)
项目:nfvOrchestrator    作者:uestcNFVproject    | 项目源码 | 文件源码
def build_trace_req_header(oam_type, sil, remote_ip, remote_port):
    """Build a trace request header carrying the local source address.

    A UDP socket is connected to ``(remote_ip, remote_port)`` only to learn
    which local address would be used to reach the remote end; the socket is
    always closed afterwards.

    The source address is stored as four 32-bit words ``ip_1`` .. ``ip_4``:
    an IPv4 address is written as ``0, 0, 0x0000FFFF, <address>`` and an IPv6
    address is split into its four 32-bit words, most significant first.

    :param oam_type: value stored in the header's ``oam_type`` field.
    :param sil: value stored in the header's ``sil`` field.
    :param remote_ip: address of the remote end.
    :param remote_port: port of the remote end; converted with ``int``.
    :return: the populated ``TRACEREQHEADER`` instance.
    """
    trace_req_header_values = TRACEREQHEADER()
    trace_req_header_values.oam_type = oam_type
    trace_req_header_values.sil = sil
    trace_req_header_values.port = int(remote_port)

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect((remote_ip, trace_req_header_values.port))
        src_addr = ipaddress.ip_address(s.getsockname()[0])
    finally:
        # The socket is only a probe; do not leak its descriptor.
        s.close()

    if src_addr.version == 4:
        trace_req_header_values.ip_1 = 0x00000000
        trace_req_header_values.ip_2 = 0x00000000
        trace_req_header_values.ip_3 = 0x0000FFFF
        trace_req_header_values.ip_4 = int(src_addr)
    elif src_addr.version == 6:
        int_addr6 = int(src_addr)
        trace_req_header_values.ip_1 = int_addr6 >> 96
        trace_req_header_values.ip_2 = (int_addr6 >> 64) & 0x0FFFFFFFF
        trace_req_header_values.ip_3 = (int_addr6 >> 32) & 0x0FFFFFFFF
        trace_req_header_values.ip_4 = int_addr6 & 0x0FFFFFFFF

    return trace_req_header_values
项目:nfvOrchestrator    作者:uestcNFVproject    | 项目源码 | 文件源码
def process_context_headers(ctx1=0, ctx2=0, ctx3=0, ctx4=0):
    """
    Encode context header values in NSH header.

    Each value is first tried as an IPv4 address and stored as the integer
    form of that address. A value that is not an address is converted with
    ``int`` and truncated to 32 bits. A value that cannot be converted at
    all is logged and left out, so the result may hold fewer than four
    entries.

    :param ctx1: NSH context header 1
    :param ctx2: NSH context header 2
    :param ctx3: NSH context header 3
    :param ctx4: NSH context header 4
    :return: Array of encoded headers
    """
    context_headers = []
    for ctx in (ctx1, ctx2, ctx3, ctx4):
        try:
            context_headers.append(int(ipaddress.IPv4Address(ctx)))
            continue
        except ValueError:
            pass

        try:
            context_headers.append(int(ctx) & 0xFFFFFFFF)
        except (TypeError, ValueError):
            # ``%r`` rather than ``%d``: the value just failed int(), so a
            # numeric placeholder would itself fail while formatting.
            logger.error(
                "Context header %r can not be represented as an integer",
                ctx)

    return context_headers
项目:pysnmp    作者:etingof    | 项目源码 | 文件源码
def getRecvFrom(addressType):
        """Build a ``recvfrom`` replacement that also reports the local address.

        The returned function reads one message with ``recvmsg`` and scans the
        ancillary data for an ``IP_PKTINFO`` / ``IPV6_PKTINFO`` entry. The
        address found there, paired with the socket's own port, is attached to
        the peer address through ``setLocalAddress``; ``None`` is attached
        when no such entry is present.

        :param addressType: callable wrapping the peer address; its result
            must offer ``setLocalAddress``.
        :return: function ``recvfrom(s, sz)`` returning ``(data, address)``.
        """
        def recvfrom(s, sz):
            local_address = None
            data, ancdata, msg_flags, _from = s.recvmsg(sz, socket.CMSG_LEN(sz))
            for anc in ancdata:
                level, kind = anc[0], anc[1]
                if level == socket.SOL_IP and kind == socket.IP_PKTINFO:
                    info = in_pktinfo.from_buffer_copy(anc[2])
                    destination = ipaddress.IPv4Address(
                        memoryview(info.ipi_addr).tobytes())
                elif level == socket.SOL_IPV6 and kind == socket.IPV6_PKTINFO:
                    info = in6_pktinfo.from_buffer_copy(anc[2])
                    destination = ipaddress.ip_address(
                        memoryview(info.ipi6_addr).tobytes())
                else:
                    continue
                local_address = (str(destination), s.getsockname()[1])
            return data, addressType(_from).setLocalAddress(local_address)

        return recvfrom
项目:pysnmp    作者:etingof    | 项目源码 | 文件源码
def getSendTo(addressType):
        """Build a ``sendto`` replacement that pins the source address.

        The returned function picks the source address from the destination's
        ``getLocalAddress()`` when the destination is exactly of type
        ``addressType``, and from the socket's own name otherwise. It then
        passes that address as ``IP_PKTINFO`` / ``IPV6_PKTINFO`` ancillary data
        to ``sendmsg``. IPv6 ancillary data is only added on ``AF_INET6``
        sockets.

        :param addressType: address class whose instances carry a local address.
        :return: function ``sendto(s, _data, _to)`` returning what
            ``s.sendmsg`` returns.
        """
        def sendto(s, _data, _to):
            if type(_to) == addressType:
                source_host = _to.getLocalAddress()[0]
            else:
                source_host = s.getsockname()[0]
            addr = ipaddress.ip_address(source_host)

            ancdata = []
            if type(addr) == ipaddress.IPv4Address:
                info = in_pktinfo()
                info.ipi_spec_dst = in_addr.from_buffer_copy(addr.packed)
                ancdata = [(socket.SOL_IP, socket.IP_PKTINFO, memoryview(info).tobytes())]
            elif s.family == socket.AF_INET6 and type(addr) == ipaddress.IPv6Address:
                info = in6_pktinfo()
                info.ipi6_addr = in6_addr.from_buffer_copy(addr.packed)
                ancdata = [(socket.SOL_IPV6, socket.IPV6_PKTINFO, memoryview(info).tobytes())]
            return s.sendmsg([_data], ancdata, 0, _to)

        return sendto
项目:mmoserver    作者:kukfa    | 项目源码 | 文件源码
def serverList(self):
        """Build the server-list packet from the gateway constants and send it.

        The packet starts with ID ``0x04``, the gateway count and the
        last-used server, followed by one server record. The finished bytes
        are handed to ``self.sendPacket``.
        """
        header = [
            b'\x04',                        # packet ID
            bytes([AUTH_NUMGATEWAYS]),      # total num game servers available
            b'\x01',                        # last game server used
        ]

        # the following is repeated for each server:
        record = [
            bytes([GATEWAY_ID]),            # ID of each server (starting at 1)
            # gameserver IP, packed in big-endian order
            ipaddress.IPv4Address(GATEWAY_IP).packed,
            # gameserver port, little-endian order
            struct.pack("<I", GATEWAY_PORT),
            bytes([GATEWAY_AGELIMIT]),      # unsure what this is used for
            bytes([GATEWAY_PVP]),           # 1 if GATEWAY_PVP server, otherwise 0
            struct.pack("<H", GATEWAY_NUMPLAYERS),   # current # of players
            struct.pack("<H", GATEWAY_MAXPLAYERS),   # max # of players
            bytes([GATEWAY_ONLINE]),        # 1 if server should be listed, otherwise 0
        ]

        if GATEWAY_ONLINE == 1:
            trailer = b'\x04\x00\x00\x00\x00'
        else:
            # TODO doesn't list server if it isn't a test server
            trailer = b'\x00\x00\x00\x00\x00'

        self.sendPacket(b''.join(header + record + [trailer]))
项目:socks5    作者:mike820324    | 项目源码 | 文件源码
def test_recv_in_response(self):
        """A client connection in the "response" state parses a server reply.

        Feeding a packed SOCKS5 success reply for 127.0.0.1:8080 must move the
        connection to the "end" state and yield an event carrying the status,
        address type, address and port from the reply.
        """
        conn = Connection(our_role="client")
        inner = conn._conn
        inner.machine.set_state("response")
        inner._version = 5
        inner._addr_type = ADDR_TYPE["IPV4"]
        inner._addr = ipaddress.IPv4Address("127.0.0.1")
        inner._port = 8080

        reply = struct.pack("!BBxB4BH", 0x5, 0x0, 0x1, 127, 0, 0, 1, 8080)
        event = conn.recv(reply)

        self.assertEqual(conn._conn.state, "end")
        self.assertEqual(event, "Response")
        self.assertEqual(event.status, 0)
        self.assertEqual(event.atyp, 1)
        self.assertEqual(event.addr, ipaddress.IPv4Address("127.0.0.1"))
        self.assertEqual(event.port, 8080)
项目:socks5    作者:mike820324    | 项目源码 | 文件源码
def test_greeting_request_socks4(self):
        """SOCKS4 greetings parse with and without a trailing domain name.

        The first packet carries a plain IPv4 destination; the second uses the
        address 0.0.0.1 and appends a NUL-terminated domain name after the
        user name.
        """
        user_name = "Johnny".encode("ascii")

        # Plain request: destination given as an IPv4 address.
        plain_packet = struct.pack(
            "!BBH4B6sB", 0x4, 0x1, 5580, 127, 0, 0, 1, user_name, 0)

        request = read_greeting_request(plain_packet)
        self.assertIsInstance(request, Socks4Request)
        self.assertEqual(request.cmd, 1)
        self.assertEqual(request.port, 5580)
        self.assertEqual(request.addr, ipaddress.IPv4Address("127.0.0.1"))
        self.assertEqual(request.name, "Johnny")

        # Request with a domain name appended after the user name.
        domain_packet = struct.pack(
            "!BBH4B6sB14sB", 0x4, 0x1, 5580, 0, 0, 0, 1, user_name, 0,
            "www.google.com".encode("idna"), 0)

        request = read_greeting_request(domain_packet)
        self.assertIsInstance(request, Socks4Request)
        self.assertEqual(request.cmd, 1)
        self.assertEqual(request.port, 5580)
        self.assertEqual(request.addr, ipaddress.IPv4Address("0.0.0.1"))
        self.assertEqual(request.name, "Johnny")
        self.assertEqual(request.domainname, "www.google.com")
项目:socks5    作者:mike820324    | 项目源码 | 文件源码
def __init__(self, cmd, atyp, addr, port):
        """Validate and store the fields of a request.

        :param cmd: request command; must be one of ``REQ_COMMAND``'s values.
        :param atyp: address type; must be one of ``ADDR_TYPE``'s values.
        :param addr: destination address. For the IPv4/IPv6 address types it
            is converted to the matching ``ipaddress`` object; for the
            domain-name type it must be an instance of ``string_func``.
        :param port: destination port, stored as given.
        :raises ValueError: on an unsupported command or address type, an
            unparsable IP address, or a domain name of the wrong type.
        """
        if cmd not in REQ_COMMAND.values():
            raise ValueError("Unsupported request command {}".format(cmd))

        if atyp not in ADDR_TYPE.values():
            raise ValueError("Unsupported address type {}".format(atyp))

        ip_parsers = (
            ("IPV4", ipaddress.IPv4Address, "Invalid ipaddress format for IPv4"),
            ("IPV6", ipaddress.IPv6Address, "Invalid ipaddress format for IPv6"),
        )
        for type_key, parse, failure_message in ip_parsers:
            if atyp != ADDR_TYPE[type_key]:
                continue
            try:
                addr = parse(addr)
            except ipaddress.AddressValueError:
                raise ValueError(failure_message)
            break
        else:
            # Not an IP address type: only domain names are type-checked.
            if atyp == ADDR_TYPE["DOMAINNAME"] and not isinstance(addr, string_func):
                raise ValueError("Domain name expect to be unicode string")

        self.cmd = cmd
        self.atyp = atyp
        self.addr = addr
        self.port = port
项目:socks5    作者:mike820324    | 项目源码 | 文件源码
def __init__(self, status, atyp, addr, port):
        """Validate and store the fields of a response.

        :param status: status code; must be one of ``RESP_STATUS``'s values.
        :param atyp: address type; must be one of ``ADDR_TYPE``'s values.
        :param addr: bound address. For the IPv4/IPv6 address types it is
            converted to the matching ``ipaddress`` object; for the
            domain-name type it must be an instance of ``string_func``.
        :param port: bound port, stored as given.
        :raises ValueError: on an unsupported status or address type, an
            unparsable IP address, or a domain name of the wrong type.
        """
        if status not in RESP_STATUS.values():
            raise ValueError("Unsupported status code {}".format(status))

        if atyp not in ADDR_TYPE.values():
            raise ValueError("Unsupported address type {}".format(atyp))

        ip_parsers = (
            ("IPV4", ipaddress.IPv4Address, "Invalid ipaddress format for IPv4"),
            ("IPV6", ipaddress.IPv6Address, "Invalid ipaddress format for IPv6"),
        )
        for type_key, parse, failure_message in ip_parsers:
            if atyp != ADDR_TYPE[type_key]:
                continue
            try:
                addr = parse(addr)
            except ipaddress.AddressValueError:
                raise ValueError(failure_message)
            break
        else:
            # Not an IP address type: only domain names are type-checked.
            if atyp == ADDR_TYPE["DOMAINNAME"] and not isinstance(addr, string_func):
                raise ValueError("Domain name expect to be unicode string")

        self.status = status
        self.atyp = atyp
        self.addr = addr
        self.port = port
项目:peerme    作者:cooperlees    | 项目源码 | 文件源码
def validate_ip_address(self, address, af=6):
        """Check that ``address`` parses as an address of the given family.

        :param address: candidate address.
        :param af: address family, ``4`` or ``6`` (default ``6``).
        :return: ``True`` when the address parses; ``False`` (after logging
            the parse error) when it does not.
        :raises Exception: if ``af`` is neither ``4`` nor ``6``.
        """
        if af == 4:
            parse = ipaddress.IPv4Address
        elif af == 6:
            parse = ipaddress.IPv6Address
        else:
            raise Exception("Invalid AF: {}".format(af))

        try:
            parse(address)
        except ipaddress.AddressValueError as ex:
            logging.error(ex)
            return False
        return True
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testTeredo(self):
        """The ``teredo`` property decodes Teredo addresses only.

        A well-formed Teredo address yields a ``(server, client)`` pair of
        IPv4 addresses; addresses outside the Teredo prefix yield a false
        value.
        """
        # stolen from wikipedia
        expected_pair = (ipaddress.IPv4Address('65.54.227.120'),
                         ipaddress.IPv4Address('192.0.2.45'))
        teredo_text = '2001:0000:4136:e378:8000:63bf:3fff:fdd2'
        self.assertEqual(expected_pair,
                         ipaddress.ip_address(teredo_text).teredo)

        non_teredo = '2000::4136:e378:8000:63bf:3fff:fdd2'
        self.assertFalse(ipaddress.ip_address(non_teredo).teredo)
        non_teredo = '2001:0001:4136:e378:8000:63bf:3fff:fdd2'
        self.assertFalse(ipaddress.ip_address(non_teredo).teredo)

        # i77
        teredo_address = ipaddress.IPv6Address('2001:0:5ef5:79fd:0:59d:a0e5:ba1')
        expected_pair = (ipaddress.IPv4Address('94.245.121.253'),
                         ipaddress.IPv4Address('95.26.244.94'))
        self.assertEqual(expected_pair, teredo_address.teredo)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after checking that it is an ``ipaddress`` object.

        :param value: address or network to wrap; must be an ``IPv4Address``,
            ``IPv6Address``, ``IPv4Network`` or ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is of any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:telnet-iot-honeypot    作者:Phype    | 项目源码 | 文件源码
def parse_ip4(pack):
    """Split a raw IPv4 packet into a header dict and the bytes after the header.

    The first 20 bytes are unpacked with ``struct_ip4``. The first unpacked
    field holds the version in its high nibble and the header length
    (``ihl``) in its low nibble; the payload starts ``ihl * 4`` bytes into
    ``pack``.

    :param pack: raw packet bytes.
    :return: ``(hdr, rest)`` where ``hdr`` maps field names to values (source
        and destination as ``IPv4Address``) and ``rest`` is ``pack`` past the
        header.
    """
    fields = struct_ip4.unpack(pack[:20])
    version_and_ihl = fields[0]

    hdr = {
        "version": version_and_ihl >> 4,    # should be 4 ...
        "ihl": version_and_ihl & 0xF,
    }
    plain_names = ("tos", "tot_len", "id", "frag_off", "ttl", "protcol", "check")
    for position, name in enumerate(plain_names, start=1):
        hdr[name] = fields[position]
    hdr["saddr"] = ipaddress.IPv4Address(fields[8])
    hdr["daddr"] = ipaddress.IPv4Address(fields[9])

    payload_offset = hdr["ihl"] * 4
    return hdr, pack[payload_offset:]
项目:MgmtAppForLinuxMachines_flask    作者:yugokato    | 项目源码 | 文件源码
def clean(self):
        """Replace any stored entry for this IP and fill in derived fields.

        Existing ``MachineData`` documents with the same ``ip_address`` are
        deleted. ``ip_address_decimal`` and ``aws`` are computed when they are
        still ``None``; for AWS machines the ``ec2`` field is set, with state
        ``"running"`` when ``status`` is ``'OK'`` and ``None`` otherwise.
        """
        # delete existing entry if exists
        existing = MachineData.objects(ip_address=self.ip_address)
        if existing:
            existing.delete()

        # add ip_address_decimal, aws, ec2 fields depending on the ip_address
        if self.ip_address_decimal is None:
            self.ip_address_decimal = int(ipaddress.IPv4Address(self.ip_address))
        if self.aws is None:
            self.aws = Validation.is_aws(self.ip_address)
        if not self.aws:
            return

        state = "running" if self.status == 'OK' else None
        # NOTE(review): get_instance_id() is evaluated even when the map
        # already has the address, because it is passed as the default.
        instance_id = EC2Client.ip_instance_map.get(
            self.ip_address, EC2Client.get_instance_id(self.ip_address))
        self.ec2 = {
            'instance_id': instance_id,
            'state': state
        }
项目:recipe-catalog    作者:inocybe    | 项目源码 | 文件源码
def build_trace_req_header(oam_type, sil, remote_ip, remote_port):
    """Build a trace request header carrying the local source address.

    A UDP socket is connected to ``(remote_ip, remote_port)`` only to learn
    which local address would be used to reach the remote end; the socket is
    always closed afterwards.

    The source address is stored as four 32-bit words ``ip_1`` .. ``ip_4``:
    an IPv4 address is written as ``0, 0, 0x0000FFFF, <address>`` and an IPv6
    address is split into its four 32-bit words, most significant first.

    :param oam_type: value stored in the header's ``oam_type`` field.
    :param sil: value stored in the header's ``sil`` field.
    :param remote_ip: address of the remote end.
    :param remote_port: port of the remote end; converted with ``int``.
    :return: the populated ``TRACEREQHEADER`` instance.
    """
    trace_req_header_values = TRACEREQHEADER()
    trace_req_header_values.oam_type = oam_type
    trace_req_header_values.sil = sil
    trace_req_header_values.port = int(remote_port)

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect((remote_ip, trace_req_header_values.port))
        src_addr = ipaddress.ip_address(s.getsockname()[0])
    finally:
        # The socket is only a probe; do not leak its descriptor.
        s.close()

    if src_addr.version == 4:
        trace_req_header_values.ip_1 = 0x00000000
        trace_req_header_values.ip_2 = 0x00000000
        trace_req_header_values.ip_3 = 0x0000FFFF
        trace_req_header_values.ip_4 = int(src_addr)
    elif src_addr.version == 6:
        int_addr6 = int(src_addr)
        trace_req_header_values.ip_1 = int_addr6 >> 96
        trace_req_header_values.ip_2 = (int_addr6 >> 64) & 0x0FFFFFFFF
        trace_req_header_values.ip_3 = (int_addr6 >> 32) & 0x0FFFFFFFF
        trace_req_header_values.ip_4 = int_addr6 & 0x0FFFFFFFF

    return trace_req_header_values
项目:recipe-catalog    作者:inocybe    | 项目源码 | 文件源码
def process_context_headers(ctx1=0, ctx2=0, ctx3=0, ctx4=0):
    """
    Encode context header values in NSH header.

    Each value is first tried as an IPv4 address and stored as the integer
    form of that address. A value that is not an address is converted with
    ``int`` and truncated to 32 bits. A value that cannot be converted at
    all is logged and left out, so the result may hold fewer than four
    entries.

    :param ctx1: NSH context header 1
    :param ctx2: NSH context header 2
    :param ctx3: NSH context header 3
    :param ctx4: NSH context header 4
    :return: Array of encoded headers
    """
    context_headers = []
    for ctx in (ctx1, ctx2, ctx3, ctx4):
        try:
            context_headers.append(int(ipaddress.IPv4Address(ctx)))
            continue
        except ValueError:
            pass

        try:
            context_headers.append(int(ctx) & 0xFFFFFFFF)
        except (TypeError, ValueError):
            # ``%r`` rather than ``%d``: the value just failed int(), so a
            # numeric placeholder would itself fail while formatting.
            logger.error(
                "Context header %r can not be represented as an integer",
                ctx)

    return context_headers
项目:nat64check    作者:sjm-steffann    | 项目源码 | 文件源码
def get_addresses(hostname) -> List[str]:
    """Resolve ``hostname`` with ``dig`` and return its addresses as strings.

    Both A and AAAA records are queried with ``dig +short``. Every
    whitespace-separated token of the combined output that parses as an IP
    address is returned in its normalised string form; other tokens are
    skipped.

    :param hostname: name to resolve.
    :return: list of IPv4/IPv6 address strings; empty when ``dig`` exits
        with an error.
    """
    # Get DNS info
    try:
        a_records = subprocess.check_output(args=['dig', '+short', 'a', hostname],
                                            stderr=subprocess.DEVNULL)
        aaaa_records = subprocess.check_output(args=['dig', '+short', 'aaaa', hostname],
                                               stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        return []

    dns_results = []
    for token in (a_records + aaaa_records).decode('utf-8').split():
        try:
            dns_results.append(str(ip_address(token)))
        except ValueError:
            # Not an address; ignore this token.
            pass

    return dns_results
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after checking that it is an ``ipaddress`` object.

        :param value: address or network to wrap; must be an ``IPv4Address``,
            ``IPv6Address``, ``IPv4Network`` or ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is of any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after checking that it is an ``ipaddress`` object.

        :param value: address or network to wrap; must be an ``IPv4Address``,
            ``IPv6Address``, ``IPv4Network`` or ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is of any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:dcos-e2e    作者:mesosphere    | 项目源码 | 文件源码
def _nodes(self, container_base_name: str) -> Set[Node]:
        """
        Build a ``Node`` for every Docker container matching a name filter.

        Args:
            container_base_name: The start of the container names.

        Returns: ``Node``s corresponding to containers with names starting
            with ``container_base_name``. Each node takes its IP address from
            the container's network settings and uses the SSH key under
            ``self._path``.
        """
        docker_client = docker.from_env(version='auto')
        matching_containers = docker_client.containers.list(
            filters={'name': container_base_name},
        )

        return {
            Node(
                ip_address=IPv4Address(
                    container.attrs['NetworkSettings']['IPAddress']
                ),
                ssh_key_path=self._path / 'include' / 'ssh' / 'id_rsa',
            )
            for container in matching_containers
        }
项目:smarthome    作者:smarthomeNG    | 项目源码 | 文件源码
def is_ipv4(string):
        """
        Tell whether a string can be parsed as an IPv4 address.

        :param string: String to check
        :type string: str

        :return: True if ``ipaddress.IPv4Address`` accepts the string, False
            if it rejects it with ``AddressValueError``.
        :rtype: bool
        """
        try:
            ipaddress.IPv4Address(string)
        except ipaddress.AddressValueError:
            return False
        return True
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def getSendTo(addressType):
        """Build a ``sendto`` replacement that pins the source address.

        The returned function picks the source address from the destination's
        ``getLocalAddress()`` when the destination is exactly of type
        ``addressType``, and from the socket's own name otherwise. It then
        passes that address as ``IP_PKTINFO`` / ``IPV6_PKTINFO`` ancillary data
        to ``sendmsg``. IPv6 ancillary data is only added on ``AF_INET6``
        sockets.

        :param addressType: address class whose instances carry a local address.
        :return: function ``sendto(s, _data, _to)`` returning what
            ``s.sendmsg`` returns.
        """
        def sendto(s, _data, _to):
            if type(_to) == addressType:
                source_host = _to.getLocalAddress()[0]
            else:
                source_host = s.getsockname()[0]
            addr = ipaddress.ip_address(source_host)

            ancdata = []
            if type(addr) == ipaddress.IPv4Address:
                info = in_pktinfo()
                info.ipi_spec_dst = in_addr.from_buffer_copy(addr.packed)
                ancdata = [(socket.SOL_IP, socket.IP_PKTINFO, memoryview(info).tobytes())]
            elif s.family == socket.AF_INET6 and type(addr) == ipaddress.IPv6Address:
                info = in6_pktinfo()
                info.ipi6_addr = in6_addr.from_buffer_copy(addr.packed)
                ancdata = [(socket.SOL_IPV6, socket.IPV6_PKTINFO, memoryview(info).tobytes())]
            return s.sendmsg([_data], ancdata, 0, _to)
        return sendto
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testTeredo(self):
        # stolen from wikipedia
        server = ipaddress.IPv4Address('65.54.227.120')
        client = ipaddress.IPv4Address('192.0.2.45')
        teredo_addr = '2001:0000:4136:e378:8000:63bf:3fff:fdd2'
        self.assertEqual((server, client),
                         ipaddress.ip_address(teredo_addr).teredo)
        bad_addr = '2000::4136:e378:8000:63bf:3fff:fdd2'
        self.assertFalse(ipaddress.ip_address(bad_addr).teredo)
        bad_addr = '2001:0001:4136:e378:8000:63bf:3fff:fdd2'
        self.assertFalse(ipaddress.ip_address(bad_addr).teredo)

        # i77
        teredo_addr = ipaddress.IPv6Address('2001:0:5ef5:79fd:0:59d:a0e5:ba1')
        self.assertEqual((ipaddress.IPv4Address('94.245.121.253'),
                          ipaddress.IPv4Address('95.26.244.94')),
                         teredo_addr.teredo)
项目:vCloudDirectorManager    作者:blackms    | 项目源码 | 文件源码
def get_less_used_network(num_of_requested_ip, global_variable, config_obj):
    """
    Get the network with more IP available.

    Waits while ``global_variable.free_ip_lock`` is ``True``, picks the key of
    ``global_variable.free_ips`` with the longest list of free IPs, and looks
    up the external network whose gateway lies inside that network.

    :param num_of_requested_ip: number of IPs the caller needs
    :param global_variable: object exposing ``free_ip_lock``, ``free_ips``
        (mapping of network -> list of free IPs) and ``vcs.get_network()``
    :param config_obj: configuration object passed to ``config_section_map``;
        the ``vcloud`` section must provide ``min_spare_ip``
    :return: ``None`` when the chosen network cannot supply the requested IPs
        while keeping ``min_spare_ip`` spare; otherwise a dict with the
        matching external network under ``'object'`` and the reserved
        addresses under ``'iplist'``
    :raises IndexError: if ``free_ips`` is empty or no external network has
        its gateway inside the chosen network
    """
    from ipaddress import IPv4Address, IPv4Network

    # Poll once per second until the free-IP table is no longer locked.
    while global_variable.free_ip_lock is True:
        time.sleep(1)
    free_ip = global_variable.free_ips
    # identify less used network
    less_used_network = sorted(free_ip, key=lambda k: len(free_ip[k]), reverse=True)[0]
    # network must have enough IPs
    min_spare_ip = int(config_section_map(config_obj, 'vcloud')['min_spare_ip'])
    if len(free_ip[less_used_network]) < num_of_requested_ip + min_spare_ip:
        return None
    ip_list = free_ip[less_used_network][:num_of_requested_ip]
    pvdc_external_networks = global_variable.vcs.get_network()

    # Build the network object once instead of once per candidate.
    chosen_subnet = IPv4Network(less_used_network)
    # A list (not filter()) so that indexing works on Python 3 as well, where
    # filter() returns a non-subscriptable iterator.
    external_network = [
        candidate for candidate in pvdc_external_networks
        if IPv4Address(candidate.gateway) in chosen_subnet
    ]
    return {'object': external_network[0],
            'iplist': ip_list}
项目:sonic-mgmt    作者:Azure    | 项目源码 | 文件源码
def is_ipv4_address(self, ipaddr):
        '''
        @summary: Check address is valid IPv4 address.
        @param ipaddr IP address to check; it is converted to text before parsing
        @return Boolean True if ipaddr parses as an IPv4 address, False otherwise
        '''
        # Pick the text type of the running interpreter: ``unicode`` exists
        # only on Python 2, on Python 3 ``str`` is already unicode text.
        try:
            text_type = unicode
        except NameError:
            text_type = str

        try:
            # building ipaddress fails for some of addresses unless unicode(ipaddr) is specified for both ipv4/ipv6
            # Example - 192.168.156.129, it is valid IPV4 address, send_packet works with it.
            if isinstance(ipaddr, bytes):
                # Byte strings are decoded so they are parsed as text rather
                # than as a packed address.
                ipaddr = ipaddr.decode('ascii')
            ipaddress.IPv4Address(text_type(ipaddr))
        except Exception:
            # Deliberately broad: any failure to convert or parse means the
            # value is not a valid IPv4 address.
            return False

        return True
    #---------------------------------------------------------------------
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after checking it is an ipaddress object.

        Raises:
            TypeError: if ``value`` is not an IPv4/IPv6 address or network
                instance from the ``ipaddress`` module.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:djangolg    作者:wolcomm    | 项目源码 | 文件源码
def __init__(self, value):
        """Initialise new IPPrefix instance.

        ``value`` is parsed as a host address first and, if that fails, as a
        network (host bits allowed). A failure of the network parse
        propagates to the caller unchanged.

        Sets ``prefix`` (``None`` for a host, the network address for a
        network), ``type`` (``self.HOST`` or ``self.PREFIX``), ``version``
        and ``txt`` (the compressed textual form).
        """
        try:
            parsed = ipaddress.ip_address(value)
        except Exception:
            # Not a host address: fall back to a network parse.
            parsed = ipaddress.ip_network(value, strict=False)

        host_types = (ipaddress.IPv4Address, ipaddress.IPv6Address)
        network_types = (ipaddress.IPv4Network, ipaddress.IPv6Network)
        if isinstance(parsed, host_types):
            self.prefix = None
            self.type = self.HOST
        elif isinstance(parsed, network_types):
            self.prefix = parsed.network_address
            self.type = self.PREFIX

        self.version = parsed.version
        self.txt = parsed.compressed
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testTeredo(self):
        # stolen from wikipedia
        server = ipaddress.IPv4Address('65.54.227.120')
        client = ipaddress.IPv4Address('192.0.2.45')
        teredo_addr = '2001:0000:4136:e378:8000:63bf:3fff:fdd2'
        self.assertEqual((server, client),
                         ipaddress.ip_address(teredo_addr).teredo)
        bad_addr = '2000::4136:e378:8000:63bf:3fff:fdd2'
        self.assertFalse(ipaddress.ip_address(bad_addr).teredo)
        bad_addr = '2001:0001:4136:e378:8000:63bf:3fff:fdd2'
        self.assertFalse(ipaddress.ip_address(bad_addr).teredo)

        # i77
        teredo_addr = ipaddress.IPv6Address('2001:0:5ef5:79fd:0:59d:a0e5:ba1')
        self.assertEqual((ipaddress.IPv4Address('94.245.121.253'),
                          ipaddress.IPv4Address('95.26.244.94')),
                         teredo_addr.teredo)
项目:RemoteTree    作者:deNULL    | 项目源码 | 文件源码
def __init__(self, value):
        """Keep ``value`` in ``self._value`` once its type has been verified.

        Raises:
            TypeError: when ``value`` is not one of the ``ipaddress``
                IPv4/IPv6 address or network classes.
        """
        is_ip_object = isinstance(
            value,
            (
                ipaddress.IPv4Address,
                ipaddress.IPv6Address,
                ipaddress.IPv4Network,
                ipaddress.IPv6Network,
            ),
        )
        if not is_ip_object:
            raise TypeError(
                "value must be an instance of ipaddress.IPv4Address, "
                "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
                "ipaddress.IPv6Network"
            )

        self._value = value
项目:quickstart-git2s3    作者:aws-quickstart    | 项目源码 | 文件源码
def __init__(self, value):
        """Validate and store an ``ipaddress`` address or network object.

        Raises:
            TypeError: if ``value`` is not an instance of IPv4Address,
                IPv6Address, IPv4Network or IPv6Network.
        """
        address_types = (ipaddress.IPv4Address, ipaddress.IPv6Address)
        network_types = (ipaddress.IPv4Network, ipaddress.IPv6Network)

        # isinstance() with the concatenated tuple accepts any of the four.
        if not isinstance(value, address_types + network_types):
            message = (
                "value must be an instance of ipaddress.IPv4Address, "
                "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
                "ipaddress.IPv6Network"
            )
            raise TypeError(message)

        self._value = value
项目:llama    作者:dropbox    | 项目源码 | 文件源码
def validate_ip(addr):
    """Pass-through function for validating an IPv4 address.

    Args:
        addr: (str) IP address

    Returns:
        unicode string with the address in its ``compressed`` form

    Raises:
        Error: if IPv4 address is not valid
    """
    try:
        parsed = ipaddress.IPv4Address(unicode(addr))
    except ipaddress.AddressValueError as exc:
        # Re-raise as the project's own error type, keeping the parser's
        # explanation in the message.
        raise Error('Invalid IPv4 address "%s"; %s' % (addr, exc))
    else:
        return parsed.compressed