Python socket 模块,IPPROTO_ICMP 实例源码

我们从Python开源项目中,提取了以下34个代码示例,用于说明如何使用socket.IPPROTO_ICMP

项目:XFLTReaT    作者:earthquake    | 项目源码 | 文件源码
def serve(self):
        """Run the module in server mode on a raw ICMP socket.

        Announces the start, opens the raw socket, stores it on the instance
        and enters the communication loop.  ``cleanup()`` is called both when
        the loop ends normally and when it is interrupted with Ctrl-C; any
        other exception propagates without cleanup.
        """
        try:
            module_name = self.get_module_name()
            bind_addr = self.config.get("Global", "serverbind")
            common.internal_print("Starting server: {0} on {1}".format(module_name, bind_addr))

            raw_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
            # NOTE(review): this address tuple is built but never used in this
            # method; it is kept so the same lookups still happen.
            whereto = (self.config.get("Global", "serverbind"), self.ICMP_fake_serverport)

            self.comms_socket = raw_sock
            self.serverorclient = 1
            self.authenticated = False

            self.communication_initialization()
            self.communication(False)
        except KeyboardInterrupt:
            # Ctrl-C is a normal way to stop: fall through to the cleanup.
            pass

        self.cleanup()
        return
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def answers(self, other):
        """Return a true value when this IP packet is a reply to ``other``.

        The result is 0 if ``other`` is not an ``IP`` instance, or if
        ``conf.checkIPaddr`` is enabled and ``self.dst`` differs from
        ``other.src``.  Otherwise the verdict comes from the upper layers'
        own ``answers`` methods.
        """
        if not isinstance(other, IP):
            return 0
        if conf.checkIPaddr and self.dst != other.src:
            return 0

        upper = self.payload
        icmp_error = (
            self.proto == socket.IPPROTO_ICMP
            and isinstance(upper, ICMP)
            and upper.type in (3, 4, 5, 11, 12)
        )
        if icmp_error:
            # ICMP error message: the layer carried by the ICMP payload
            # decides whether it matches ``other``.
            return upper.payload.answers(other)

        # Regular reply: source address (when address checking is on) and
        # protocol must line up before the payloads are compared.
        if conf.checkIPaddr and self.src != other.dst:
            return 0
        if self.proto != other.proto:
            return 0
        return upper.answers(other.payload)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def answers(self, other):
        """Return a true value when this IP packet is a reply to ``other``.

        The result is 0 if ``other`` is not an ``IP`` instance, or if
        ``conf.checkIPaddr`` is enabled and ``self.dst`` differs from
        ``other.src``.  Otherwise the verdict comes from the upper layers'
        own ``answers`` methods.
        """
        if not isinstance(other, IP):
            return 0
        if conf.checkIPaddr and self.dst != other.src:
            return 0

        upper = self.payload
        icmp_error = (
            self.proto == socket.IPPROTO_ICMP
            and isinstance(upper, ICMP)
            and upper.type in (3, 4, 5, 11, 12)
        )
        if icmp_error:
            # ICMP error message: the layer carried by the ICMP payload
            # decides whether it matches ``other``.
            return upper.payload.answers(other)

        # Regular reply: source address (when address checking is on) and
        # protocol must line up before the payloads are compared.
        if conf.checkIPaddr and self.src != other.dst:
            return 0
        if self.proto != other.proto:
            return 0
        return upper.answers(other.payload)
项目:CVE-2016-6366    作者:RiskSense-Ops    | 项目源码 | 文件源码
def answers(self, other):
        """Return a true value when this IP packet is a reply to ``other``.

        The result is 0 if ``other`` is not an ``IP`` instance, or if
        ``conf.checkIPaddr`` is enabled and ``self.dst`` differs from
        ``other.src``.  Otherwise the verdict comes from the upper layers'
        own ``answers`` methods.
        """
        if not isinstance(other, IP):
            return 0
        if conf.checkIPaddr and self.dst != other.src:
            return 0

        upper = self.payload
        icmp_error = (
            self.proto == socket.IPPROTO_ICMP
            and isinstance(upper, ICMP)
            and upper.type in (3, 4, 5, 11, 12)
        )
        if icmp_error:
            # ICMP error message: the layer carried by the ICMP payload
            # decides whether it matches ``other``.
            return upper.payload.answers(other)

        # Regular reply: source address (when address checking is on) and
        # protocol must line up before the payloads are compared.
        if conf.checkIPaddr and self.src != other.dst:
            return 0
        if self.proto != other.proto:
            return 0
        return upper.answers(other.payload)
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def answers(self, other):
        """Return a true value when this IP packet is a reply to ``other``.

        The result is 0 if ``other`` is not an ``IP`` instance, or if
        ``conf.checkIPaddr`` is enabled and ``self.dst`` differs from
        ``other.src``.  Otherwise the verdict comes from the upper layers'
        own ``answers`` methods.
        """
        if not isinstance(other, IP):
            return 0
        if conf.checkIPaddr and self.dst != other.src:
            return 0

        upper = self.payload
        icmp_error = (
            self.proto == socket.IPPROTO_ICMP
            and isinstance(upper, ICMP)
            and upper.type in (3, 4, 5, 11, 12)
        )
        if icmp_error:
            # ICMP error message: the layer carried by the ICMP payload
            # decides whether it matches ``other``.
            return upper.payload.answers(other)

        # Regular reply: source address (when address checking is on) and
        # protocol must line up before the payloads are compared.
        if conf.checkIPaddr and self.src != other.dst:
            return 0
        if self.proto != other.proto:
            return 0
        return upper.answers(other.payload)
项目:fdslight    作者:fdslight    | 项目源码 | 文件源码
def init_func(self, creator_fd):
        """Open a non-blocking raw IPv4 socket and register it for reads.

        Stores ``creator_fd``, resets the sent list, creates the raw socket
        with ``IP_HDRINCL`` enabled, records its file descriptor through
        ``set_fileno`` and registers that descriptor for read events.

        Returns ``self.fileno``.
        """
        self.__creator_fd = creator_fd
        self.__sent = []

        # NOTE(review): protocol numbers are OR-ed together here as if they
        # were bit flags, which yields a single combined number rather than a
        # set of protocols (136 is presumably UDP-Lite - confirm).  The value
        # is left exactly as it was.
        proto = socket.IPPROTO_UDP | socket.IPPROTO_ICMP | socket.IPPROTO_UDP | 136

        raw_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, proto)
        # IP_HDRINCL is switched on for this socket.
        raw_sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
        raw_sock.setblocking(0)
        self.__socket = raw_sock

        self.set_fileno(raw_sock.fileno())
        self.register(self.fileno)
        self.add_evt_read(self.fileno)

        return self.fileno
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def answers(self, other):
        """Return a true value when this IP packet is a reply to ``other``.

        The result is 0 if ``other`` is not an ``IP`` instance, or if
        ``conf.checkIPaddr`` is enabled and ``self.dst`` differs from
        ``other.src``.  Otherwise the verdict comes from the upper layers'
        own ``answers`` methods.
        """
        if not isinstance(other, IP):
            return 0
        if conf.checkIPaddr and self.dst != other.src:
            return 0

        upper = self.payload
        icmp_error = (
            self.proto == socket.IPPROTO_ICMP
            and isinstance(upper, ICMP)
            and upper.type in (3, 4, 5, 11, 12)
        )
        if icmp_error:
            # ICMP error message: the layer carried by the ICMP payload
            # decides whether it matches ``other``.
            return upper.payload.answers(other)

        # Regular reply: source address (when address checking is on) and
        # protocol must line up before the payloads are compared.
        if conf.checkIPaddr and self.src != other.dst:
            return 0
        if self.proto != other.proto:
            return 0
        return upper.answers(other.payload)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def answers(self, other):
        """Return a true value when this IP packet is a reply to ``other``.

        The result is 0 if ``other`` is not an ``IP`` instance, or if
        ``conf.checkIPaddr`` is enabled and ``self.dst`` differs from
        ``other.src``.  Otherwise the verdict comes from the upper layers'
        own ``answers`` methods.
        """
        if not isinstance(other, IP):
            return 0
        if conf.checkIPaddr and self.dst != other.src:
            return 0

        upper = self.payload
        icmp_error = (
            self.proto == socket.IPPROTO_ICMP
            and isinstance(upper, ICMP)
            and upper.type in (3, 4, 5, 11, 12)
        )
        if icmp_error:
            # ICMP error message: the layer carried by the ICMP payload
            # decides whether it matches ``other``.
            return upper.payload.answers(other)

        # Regular reply: source address (when address checking is on) and
        # protocol must line up before the payloads are compared.
        if conf.checkIPaddr and self.src != other.dst:
            return 0
        if self.proto != other.proto:
            return 0
        return upper.answers(other.payload)
项目:scapy-bpf    作者:guedou    | 项目源码 | 文件源码
def answers(self, other):
        """Return a true value when this IP packet is a reply to ``other``.

        The result is 0 if ``other`` is not an ``IP`` instance.  With
        ``conf.checkIPaddr`` enabled, two packets both addressed to
        224.0.0.251 are compared on their payloads alone, and any other pair
        must have ``self.dst`` equal to ``other.src``.  Otherwise the verdict
        comes from the upper layers' own ``answers`` methods.
        """
        if not isinstance(other, IP):
            return 0

        upper = self.payload
        if conf.checkIPaddr:
            mdns_group = "224.0.0.251"  # mDNS
            if other.dst == mdns_group and self.dst == mdns_group:
                return upper.answers(other.payload)
            if self.dst != other.src:
                return 0

        icmp_error = (
            self.proto == socket.IPPROTO_ICMP
            and isinstance(upper, ICMP)
            and upper.type in (3, 4, 5, 11, 12)
        )
        if icmp_error:
            # ICMP error message: the layer carried by the ICMP payload
            # decides whether it matches ``other``.
            return upper.payload.answers(other)

        # Regular reply: source address (when address checking is on) and
        # protocol must line up before the payloads are compared.
        if conf.checkIPaddr and self.src != other.dst:
            return 0
        if self.proto != other.proto:
            return 0
        return upper.answers(other.payload)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def answers(self, other):
        """Return a true value when this IP packet is a reply to ``other``.

        The result is 0 if ``other`` is not an ``IP`` instance, or if
        ``conf.checkIPaddr`` is enabled and ``self.dst`` differs from
        ``other.src``.  Otherwise the verdict comes from the upper layers'
        own ``answers`` methods.
        """
        if not isinstance(other, IP):
            return 0
        if conf.checkIPaddr and self.dst != other.src:
            return 0

        upper = self.payload
        icmp_error = (
            self.proto == socket.IPPROTO_ICMP
            and isinstance(upper, ICMP)
            and upper.type in (3, 4, 5, 11, 12)
        )
        if icmp_error:
            # ICMP error message: the layer carried by the ICMP payload
            # decides whether it matches ``other``.
            return upper.payload.answers(other)

        # Regular reply: source address (when address checking is on) and
        # protocol must line up before the payloads are compared.
        if conf.checkIPaddr and self.src != other.dst:
            return 0
        if self.proto != other.proto:
            return 0
        return upper.answers(other.payload)
项目:scapy-radio    作者:BastilleResearch    | 项目源码 | 文件源码
def answers(self, other):
        """Return a true value when this IP packet is a reply to ``other``.

        The result is 0 if ``other`` is not an ``IP`` instance, or if
        ``conf.checkIPaddr`` is enabled and ``self.dst`` differs from
        ``other.src``.  Otherwise the verdict comes from the upper layers'
        own ``answers`` methods.
        """
        if not isinstance(other, IP):
            return 0
        if conf.checkIPaddr and self.dst != other.src:
            return 0

        upper = self.payload
        icmp_error = (
            self.proto == socket.IPPROTO_ICMP
            and isinstance(upper, ICMP)
            and upper.type in (3, 4, 5, 11, 12)
        )
        if icmp_error:
            # ICMP error message: the layer carried by the ICMP payload
            # decides whether it matches ``other``.
            return upper.payload.answers(other)

        # Regular reply: source address (when address checking is on) and
        # protocol must line up before the payloads are compared.
        if conf.checkIPaddr and self.src != other.dst:
            return 0
        if self.proto != other.proto:
            return 0
        return upper.answers(other.payload)
项目:isf    作者:w3h    | 项目源码 | 文件源码
def answers(self, other):
        """Return a true value when this IP packet is a reply to ``other``.

        The result is 0 if ``other`` is not an ``IP`` instance, or if
        ``conf.checkIPaddr`` is enabled and ``self.dst`` differs from
        ``other.src``.  Otherwise the verdict comes from the upper layers'
        own ``answers`` methods.
        """
        if not isinstance(other, IP):
            return 0
        if conf.checkIPaddr and self.dst != other.src:
            return 0

        upper = self.payload
        icmp_error = (
            self.proto == socket.IPPROTO_ICMP
            and isinstance(upper, ICMP)
            and upper.type in (3, 4, 5, 11, 12)
        )
        if icmp_error:
            # ICMP error message: the layer carried by the ICMP payload
            # decides whether it matches ``other``.
            return upper.payload.answers(other)

        # Regular reply: source address (when address checking is on) and
        # protocol must line up before the payloads are compared.
        if conf.checkIPaddr and self.src != other.dst:
            return 0
        if self.proto != other.proto:
            return 0
        return upper.answers(other.payload)
项目:scapy-vxlan    作者:p4lang    | 项目源码 | 文件源码
def answers(self, other):
        """Return a true value when this IP packet is a reply to ``other``.

        The result is 0 if ``other`` is not an ``IP`` instance, or if
        ``conf.checkIPaddr`` is enabled and ``self.dst`` differs from
        ``other.src``.  Otherwise the verdict comes from the upper layers'
        own ``answers`` methods.
        """
        if not isinstance(other, IP):
            return 0
        if conf.checkIPaddr and self.dst != other.src:
            return 0

        upper = self.payload
        icmp_error = (
            self.proto == socket.IPPROTO_ICMP
            and isinstance(upper, ICMP)
            and upper.type in (3, 4, 5, 11, 12)
        )
        if icmp_error:
            # ICMP error message: the layer carried by the ICMP payload
            # decides whether it matches ``other``.
            return upper.payload.answers(other)

        # Regular reply: source address (when address checking is on) and
        # protocol must line up before the payloads are compared.
        if conf.checkIPaddr and self.src != other.dst:
            return 0
        if self.proto != other.proto:
            return 0
        return upper.answers(other.payload)
项目:XFLTReaT    作者:earthquake    | 项目源码 | 文件源码
def client(self):
        """Run the module in client mode on a raw ICMP socket.

        Opens the raw socket, records the remote server tuple, authenticates
        and enters the communication loop.  On Ctrl-C the client logs off and
        cleans up before re-raising ``KeyboardInterrupt``; on a normal exit
        only ``cleanup()`` is called.
        """
        try:
            common.internal_print("Starting client: {0}".format(self.get_module_name()))

            raw_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
            remote_ip = self.config.get("Global", "remoteserverip")
            self.server_tuple = (remote_ip, self.ICMP_fake_serverport)
            self.comms_socket = raw_sock
            self.serverorclient = 0
            self.authenticated = False

            self.communication_initialization()
            self.do_auth()
            self.communication(False)
        except KeyboardInterrupt:
            self.do_logoff()
            self.cleanup()
            raise
        else:
            # Reached only when the communication loop ended without raising.
            self.cleanup()

        return
项目:XFLTReaT    作者:earthquake    | 项目源码 | 文件源码
def check(self):
        """Probe the remote server to see whether this module works.

        Opens a raw ICMP socket, marks the session as already authenticated,
        sends the check through ``do_check()`` and runs the communication
        loop in check mode.  A ``socket.timeout`` is reported as a failed
        check; ``KeyboardInterrupt`` is re-raised after cleanup.
        """
        try:
            common.internal_print("Checking module on server: {0}".format(self.get_module_name()))

            raw_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
            remote_ip = self.config.get("Global", "remoteserverip")
            self.server_tuple = (remote_ip, self.ICMP_fake_serverport)
            self.comms_socket = raw_sock
            self.serverorclient = 0
            self.authenticated = True

            self.communication_initialization()
            self.do_check()
            self.communication(True)
        except KeyboardInterrupt:
            self.cleanup()
            raise
        except socket.timeout:
            failure = "Checking failed: {0}".format(self.get_module_name())
            common.internal_print(failure, -1)

        # Runs after a normal finish and after a reported timeout alike.
        self.cleanup()

        return
项目:PyQYT    作者:collinsctk    | 项目源码 | 文件源码
def answers(self, other):
        """Return a true value when this IP packet is a reply to ``other``.

        The result is 0 if ``other`` is not an ``IP`` instance, or if
        ``conf.checkIPaddr`` is enabled and ``self.dst`` differs from
        ``other.src``.  Otherwise the verdict comes from the upper layers'
        own ``answers`` methods.
        """
        if not isinstance(other, IP):
            return 0
        if conf.checkIPaddr and self.dst != other.src:
            return 0

        upper = self.payload
        icmp_error = (
            self.proto == socket.IPPROTO_ICMP
            and isinstance(upper, ICMP)
            and upper.type in (3, 4, 5, 11, 12)
        )
        if icmp_error:
            # ICMP error message: the layer carried by the ICMP payload
            # decides whether it matches ``other``.
            return upper.payload.answers(other)

        # Regular reply: source address (when address checking is on) and
        # protocol must line up before the payloads are compared.
        if conf.checkIPaddr and self.src != other.dst:
            return 0
        if self.proto != other.proto:
            return 0
        return upper.answers(other.payload)
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def hashret(self):
        """Compute this layer's ``hashret`` value.

        For an ICMP payload whose type is 3, 4, 5, 11 or 12 the value comes
        from the layer carried inside the ICMP payload.  Otherwise it is the
        protocol number packed as one byte followed by the payload's own
        ``hashret``, prefixed with ``strxor`` of the packed source and
        destination addresses when both ``conf.checkIPsrc`` and
        ``conf.checkIPaddr`` are set.
        """
        upper = self.payload
        if (self.proto == socket.IPPROTO_ICMP
                and isinstance(upper, ICMP)
                and upper.type in (3, 4, 5, 11, 12)):
            return upper.payload.hashret()

        if conf.checkIPsrc and conf.checkIPaddr:
            # Address-dependent prefix: both addresses are folded in first.
            prefix = strxor(inet_aton(self.src), inet_aton(self.dst)) + struct.pack("B", self.proto)
        else:
            prefix = struct.pack("B", self.proto)
        return prefix + upper.hashret()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def hashret(self):
        """Compute this layer's ``hashret`` value.

        For an ICMP payload whose type is 3, 4, 5, 11 or 12 the value comes
        from the layer carried inside the ICMP payload.  Otherwise it is the
        protocol number packed as one byte followed by the payload's own
        ``hashret``, prefixed with ``strxor`` of the packed source and
        destination addresses when both ``conf.checkIPsrc`` and
        ``conf.checkIPaddr`` are set.
        """
        upper = self.payload
        if (self.proto == socket.IPPROTO_ICMP
                and isinstance(upper, ICMP)
                and upper.type in (3, 4, 5, 11, 12)):
            return upper.payload.hashret()

        if conf.checkIPsrc and conf.checkIPaddr:
            # Address-dependent prefix: both addresses are folded in first.
            prefix = strxor(inet_aton(self.src), inet_aton(self.dst)) + struct.pack("B", self.proto)
        else:
            prefix = struct.pack("B", self.proto)
        return prefix + upper.hashret()
项目:pyng    作者:jdowner    | 项目源码 | 文件源码
def ping(addr, size=16):
    """Send one ICMP echo request to ``addr`` and wait for its reply.

    This is a generator-based coroutine: it delegates to ``recv`` with
    ``yield from`` and returns the duration that ``recv`` produces.

    ``size`` is forwarded unchanged to ``send``.  The raw socket is now
    closed when the coroutine finishes, whether it returns or raises.
    """
    # Open a raw socket to send the request. NB: you need to be root to have
    # permission to open a raw socket.
    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    try:
        # Generate a positive, random 16-bit integer to serve as the id for
        # this ping (named ping_id so the builtin ``id`` is not shadowed).
        ping_id = random.randint(1, 2**16 - 1)

        # Send the message and wait for the reply
        send(sock, addr, ping_id, size)
        duration = yield from recv(sock, ping_id)
    finally:
        # The socket is local to this call, so release it here instead of
        # leaving it to the garbage collector.
        sock.close()

    return duration
项目:CVE-2016-6366    作者:RiskSense-Ops    | 项目源码 | 文件源码
def hashret(self):
        """Compute this layer's ``hashret`` value.

        For an ICMP payload whose type is 3, 4, 5, 11 or 12 the value comes
        from the layer carried inside the ICMP payload.  Otherwise it is the
        protocol number packed as one byte followed by the payload's own
        ``hashret``, prefixed with ``strxor`` of the packed source and
        destination addresses when both ``conf.checkIPsrc`` and
        ``conf.checkIPaddr`` are set.
        """
        upper = self.payload
        if (self.proto == socket.IPPROTO_ICMP
                and isinstance(upper, ICMP)
                and upper.type in (3, 4, 5, 11, 12)):
            return upper.payload.hashret()

        if conf.checkIPsrc and conf.checkIPaddr:
            # Address-dependent prefix: both addresses are folded in first.
            prefix = strxor(inet_aton(self.src), inet_aton(self.dst)) + struct.pack("B", self.proto)
        else:
            prefix = struct.pack("B", self.proto)
        return prefix + upper.hashret()
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def hashret(self):
        """Compute this layer's ``hashret`` value.

        For an ICMP payload whose type is 3, 4, 5, 11 or 12 the value comes
        from the layer carried inside the ICMP payload.  Otherwise it is the
        protocol number packed as one byte followed by the payload's own
        ``hashret``, prefixed with ``strxor`` of the packed source and
        destination addresses when both ``conf.checkIPsrc`` and
        ``conf.checkIPaddr`` are set.
        """
        upper = self.payload
        if (self.proto == socket.IPPROTO_ICMP
                and isinstance(upper, ICMP)
                and upper.type in (3, 4, 5, 11, 12)):
            return upper.payload.hashret()

        if conf.checkIPsrc and conf.checkIPaddr:
            # Address-dependent prefix: both addresses are folded in first.
            prefix = strxor(inet_aton(self.src), inet_aton(self.dst)) + struct.pack("B", self.proto)
        else:
            prefix = struct.pack("B", self.proto)
        return prefix + upper.hashret()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def hashret(self):
        """Compute this layer's ``hashret`` value.

        For an ICMP payload whose type is 3, 4, 5, 11 or 12 the value comes
        from the layer carried inside the ICMP payload.  Otherwise it is the
        protocol number packed as one byte followed by the payload's own
        ``hashret``, prefixed with ``strxor`` of the packed source and
        destination addresses when both ``conf.checkIPsrc`` and
        ``conf.checkIPaddr`` are set.
        """
        upper = self.payload
        if (self.proto == socket.IPPROTO_ICMP
                and isinstance(upper, ICMP)
                and upper.type in (3, 4, 5, 11, 12)):
            return upper.payload.hashret()

        if conf.checkIPsrc and conf.checkIPaddr:
            # Address-dependent prefix: both addresses are folded in first.
            prefix = strxor(inet_aton(self.src), inet_aton(self.dst)) + struct.pack("B", self.proto)
        else:
            prefix = struct.pack("B", self.proto)
        return prefix + upper.hashret()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def hashret(self):
        """Compute this layer's ``hashret`` value.

        For an ICMP payload whose type is 3, 4, 5, 11 or 12 the value comes
        from the layer carried inside the ICMP payload.  Otherwise it is the
        protocol number packed as one byte followed by the payload's own
        ``hashret``, prefixed with ``strxor`` of the packed source and
        destination addresses when both ``conf.checkIPsrc`` and
        ``conf.checkIPaddr`` are set.
        """
        upper = self.payload
        if (self.proto == socket.IPPROTO_ICMP
                and isinstance(upper, ICMP)
                and upper.type in (3, 4, 5, 11, 12)):
            return upper.payload.hashret()

        if conf.checkIPsrc and conf.checkIPaddr:
            # Address-dependent prefix: both addresses are folded in first.
            prefix = strxor(inet_aton(self.src), inet_aton(self.dst)) + struct.pack("B", self.proto)
        else:
            prefix = struct.pack("B", self.proto)
        return prefix + upper.hashret()
项目:fabric-oftest    作者:opencord    | 项目源码 | 文件源码
def parse(self):
        """
        Update the headers in self.match based on self.data

        Parses the relevant header features out of the packet, using
        the table outlined in the OF1.1 spec, Figure 4

        @return self.match on success, or None when a parse_error is
        raised while walking the headers (a warning is logged).
        """
        self.bytes = len(self.data)
        self.match.in_port = self.in_port
        self.match.type = ofp.OFPMT_STANDARD
        self.match.length = ofp.OFPMT_STANDARD_LENGTH
        self.match.wildcards = 0
        # Clear both network-address masks.  The source mask used to be
        # skipped because nw_dst_mask was assigned twice.
        self.match.nw_src_mask = 0
        self.match.nw_dst_mask = 0
        self.match.dl_dst_mask = [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
        self.match.dl_src_mask = [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
        self.mpls_tag_offset = None
        self.ip_header_offset = None
        # NOTE(review): tcp_header_offset is only assigned on the
        # IP + TCP/UDP/ICMP path below and is not reset here - confirm it is
        # initialised elsewhere.

        idx = 0
        try:
            idx = self._parse_l2(idx)

            if self.match.dl_type == ETHERTYPE_IP:
                self.ip_header_offset = idx
                idx = self._parse_ip(idx)
                if self.match.nw_proto in [ socket.IPPROTO_TCP,
                                            socket.IPPROTO_UDP,
                                            socket.IPPROTO_ICMP]:
                    # The same offset attribute is used for TCP, UDP and
                    # ICMP headers alike.
                    self.tcp_header_offset = idx
                    if self.match.nw_proto != socket.IPPROTO_ICMP:
                        idx = self._parse_l4(idx)
                    else:
                        idx = self._parse_icmp(idx)
            elif self.match.dl_type == ETHERTYPE_ARP:
                self._parse_arp(idx)
        except parse_error as e:
            # "as" form is accepted by Python 2.6+ and Python 3 alike.
            self.logger.warn("Giving up on parsing packet, got %s" %
                             (str(e)))
            return None
        return self.match
项目:fabric-oftest    作者:opencord    | 项目源码 | 文件源码
def set_tp_src(self, tp_src):
        """Write ``tp_src`` into the transport header and into the match.

        Does nothing when ``self.tcp_header_offset`` is None.  For TCP and
        UDP the value is written as two bytes at the start of the header; for
        ICMP it is written as one byte at the same offset.  The L4 checksum
        is refreshed and ``self.match.tp_src`` updated afterwards.
        """
        l4_start = self.tcp_header_offset
        if l4_start is None:
            return

        proto = self.match.nw_proto
        if proto in (socket.IPPROTO_TCP, socket.IPPROTO_UDP):
            self._set_2bytes(l4_start, tp_src)
        elif proto == socket.IPPROTO_ICMP:
            self._set_1bytes(l4_start, tp_src)

        # Checksum and match are refreshed for every protocol, including
        # ones for which no bytes were written above.
        self._update_l4_checksum()
        self.match.tp_src = tp_src
项目:fabric-oftest    作者:opencord    | 项目源码 | 文件源码
def set_tp_dst(self, tp_dst):
        """Write ``tp_dst`` into the transport header and into the match.

        Does nothing when ``self.tcp_header_offset`` is None.  For TCP and
        UDP the value is written as two bytes at header offset + 2; for ICMP
        it is written as one byte at header offset + 1.  The L4 checksum is
        refreshed and ``self.match.tp_dst`` updated afterwards.
        """
        l4_start = self.tcp_header_offset
        if l4_start is None:
            return

        proto = self.match.nw_proto
        if proto in (socket.IPPROTO_TCP, socket.IPPROTO_UDP):
            self._set_2bytes(l4_start + 2, tp_dst)
        elif proto == socket.IPPROTO_ICMP:
            self._set_1bytes(l4_start + 1, tp_dst)

        # Checksum and match are refreshed for every protocol, including
        # ones for which no bytes were written above.
        self._update_l4_checksum()
        self.match.tp_dst = tp_dst
项目:scapy-bpf    作者:guedou    | 项目源码 | 文件源码
def hashret(self):
        """Compute this layer's ``hashret`` value.

        For an ICMP payload whose type is 3, 4, 5, 11 or 12 the value comes
        from the layer carried inside the ICMP payload.  Otherwise it is the
        protocol number packed as one byte followed by the payload's own
        ``hashret``.  Unless the destination is 224.0.0.251, that value is
        prefixed with ``strxor`` of the packed source and destination
        addresses when both ``conf.checkIPsrc`` and ``conf.checkIPaddr`` are
        set.
        """
        upper = self.payload
        if (self.proto == socket.IPPROTO_ICMP
                and isinstance(upper, ICMP)
                and upper.type in (3, 4, 5, 11, 12)):
            return upper.payload.hashret()

        # mDNS destination: addresses are left out of the hash.
        if self.dst == "224.0.0.251" or not (conf.checkIPsrc and conf.checkIPaddr):
            prefix = struct.pack("B", self.proto)
        else:
            prefix = strxor(inet_aton(self.src), inet_aton(self.dst)) + struct.pack("B", self.proto)
        return prefix + upper.hashret()
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def hashret(self):
        """Compute this layer's ``hashret`` value.

        For an ICMP payload whose type is 3, 4, 5, 11 or 12 the value comes
        from the layer carried inside the ICMP payload.  Otherwise it is the
        protocol number packed as one byte followed by the payload's own
        ``hashret``, prefixed with ``strxor`` of the packed source and
        destination addresses when both ``conf.checkIPsrc`` and
        ``conf.checkIPaddr`` are set.
        """
        upper = self.payload
        if (self.proto == socket.IPPROTO_ICMP
                and isinstance(upper, ICMP)
                and upper.type in (3, 4, 5, 11, 12)):
            return upper.payload.hashret()

        if conf.checkIPsrc and conf.checkIPaddr:
            # Address-dependent prefix: both addresses are folded in first.
            prefix = strxor(inet_aton(self.src), inet_aton(self.dst)) + struct.pack("B", self.proto)
        else:
            prefix = struct.pack("B", self.proto)
        return prefix + upper.hashret()
项目:scapy-radio    作者:BastilleResearch    | 项目源码 | 文件源码
def hashret(self):
        """Compute this layer's ``hashret`` value.

        For an ICMP payload whose type is 3, 4, 5, 11 or 12 the value comes
        from the layer carried inside the ICMP payload.  Otherwise it is the
        protocol number packed as one byte followed by the payload's own
        ``hashret``, prefixed with ``strxor`` of the packed source and
        destination addresses when both ``conf.checkIPsrc`` and
        ``conf.checkIPaddr`` are set.
        """
        upper = self.payload
        if (self.proto == socket.IPPROTO_ICMP
                and isinstance(upper, ICMP)
                and upper.type in (3, 4, 5, 11, 12)):
            return upper.payload.hashret()

        if conf.checkIPsrc and conf.checkIPaddr:
            # Address-dependent prefix: both addresses are folded in first.
            prefix = strxor(inet_aton(self.src), inet_aton(self.dst)) + struct.pack("B", self.proto)
        else:
            prefix = struct.pack("B", self.proto)
        return prefix + upper.hashret()
项目:isf    作者:w3h    | 项目源码 | 文件源码
def hashret(self):
        """Compute this layer's ``hashret`` value.

        For an ICMP payload whose type is 3, 4, 5, 11 or 12 the value comes
        from the layer carried inside the ICMP payload.  Otherwise it is the
        protocol number packed as one byte followed by the payload's own
        ``hashret``, prefixed with ``strxor`` of the packed source and
        destination addresses when both ``conf.checkIPsrc`` and
        ``conf.checkIPaddr`` are set.
        """
        upper = self.payload
        if (self.proto == socket.IPPROTO_ICMP
                and isinstance(upper, ICMP)
                and upper.type in (3, 4, 5, 11, 12)):
            return upper.payload.hashret()

        if conf.checkIPsrc and conf.checkIPaddr:
            # Address-dependent prefix: both addresses are folded in first.
            prefix = strxor(inet_aton(self.src), inet_aton(self.dst)) + struct.pack("B", self.proto)
        else:
            prefix = struct.pack("B", self.proto)
        return prefix + upper.hashret()
项目:scapy-vxlan    作者:p4lang    | 项目源码 | 文件源码
def hashret(self):
        """Compute this layer's ``hashret`` value.

        For an ICMP payload whose type is 3, 4, 5, 11 or 12 the value comes
        from the layer carried inside the ICMP payload.  Otherwise it is the
        protocol number packed as one byte followed by the payload's own
        ``hashret``, prefixed with ``strxor`` of the packed source and
        destination addresses when both ``conf.checkIPsrc`` and
        ``conf.checkIPaddr`` are set.
        """
        upper = self.payload
        if (self.proto == socket.IPPROTO_ICMP
                and isinstance(upper, ICMP)
                and upper.type in (3, 4, 5, 11, 12)):
            return upper.payload.hashret()

        if conf.checkIPsrc and conf.checkIPaddr:
            # Address-dependent prefix: both addresses are folded in first.
            prefix = strxor(inet_aton(self.src), inet_aton(self.dst)) + struct.pack("B", self.proto)
        else:
            prefix = struct.pack("B", self.proto)
        return prefix + upper.hashret()
项目:duct    作者:ducted    | 项目源码 | 文件源码
def createInternetSocket(self):
        """Create a non-blocking raw ICMP socket with close-on-exec set.

        Returns the new socket object.
        """
        raw_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
        raw_sock.setblocking(0)

        # Set close-on-exec: read the current descriptor flags and write
        # them back with FD_CLOEXEC added.
        descriptor = raw_sock.fileno()
        current_flags = fcntl.fcntl(descriptor, fcntl.F_GETFD)
        fcntl.fcntl(descriptor, fcntl.F_SETFD, current_flags | fcntl.FD_CLOEXEC)

        return raw_sock
项目:PyQYT    作者:collinsctk    | 项目源码 | 文件源码
def hashret(self):
        """Compute this layer's ``hashret`` value.

        For an ICMP payload whose type is 3, 4, 5, 11 or 12 the value comes
        from the layer carried inside the ICMP payload.  Otherwise it is the
        protocol number packed as one byte followed by the payload's own
        ``hashret``, prefixed with ``strxor`` of the packed source and
        destination addresses when both ``conf.checkIPsrc`` and
        ``conf.checkIPaddr`` are set.
        """
        upper = self.payload
        if (self.proto == socket.IPPROTO_ICMP
                and isinstance(upper, ICMP)
                and upper.type in (3, 4, 5, 11, 12)):
            return upper.payload.hashret()

        if conf.checkIPsrc and conf.checkIPaddr:
            # Address-dependent prefix: both addresses are folded in first.
            prefix = strxor(inet_aton(self.src), inet_aton(self.dst)) + struct.pack("B", self.proto)
        else:
            prefix = struct.pack("B", self.proto)
        return prefix + upper.hashret()
项目:fabric-oftest    作者:opencord    | 项目源码 | 文件源码
def simple_icmp_packet(self,
                          pktlen=100, 
                          dl_dst='00:01:02:03:04:05',
                          dl_src='00:06:07:08:09:0a',
                          dl_vlan_enable=False,
                          dl_vlan_type=ETHERTYPE_VLAN,
                          dl_vlan=0,
                          dl_vlan_pcp=0,
                          dl_vlan_cfi=0,
                          mpls_type=None,
                          mpls_tags=None,
                          ip_src='192.168.0.1',
                          ip_dst='192.168.0.2',
                          ip_tos=0,
                          ip_ttl=64,
                          icmp_type=8, # ICMP_ECHO_REQUEST
                          icmp_code=0, 
                          ):
        """
        Return a simple dataplane ICMP packet

        Supports a few parameters:
        @param pktlen Length of packet in bytes w/o CRC
        @param dl_dst Destination MAC
        @param dl_src Source MAC
        @param dl_vlan_enable True if the packet is with vlan, False otherwise
        @param dl_vlan_type Ether type for VLAN
        @param dl_vlan VLAN ID
        @param dl_vlan_pcp VLAN priority
        @param dl_vlan_cfi VLAN CFI value, passed through to _make_ip_packet
        @param mpls_type Passed through to _make_ip_packet
        @param mpls_tags Passed through to _make_ip_packet
        @param ip_src IP source
        @param ip_dst IP destination
        @param ip_tos IP ToS
        @param ip_ttl IP TTL
        @param icmp_type ICMP type (default 8, echo request)
        @param icmp_code ICMP code

        Generates a simple ICMP message.  The ICMP checksum field is packed
        as 0 here, the id is the low 16 bits of the process id and the
        sequence number comes from the class-wide Packet.icmp_counter.

        @return self, with self.data holding the packet bytes
        """
        self.data = ""
        self._make_ip_packet(dl_dst, dl_src, dl_vlan_enable, dl_vlan_type, 
                             dl_vlan, dl_vlan_pcp, dl_vlan_cfi, 
                             mpls_type, mpls_tags,
                             ip_tos,
                              ip_ttl, ip_src, ip_dst, socket.IPPROTO_ICMP)

        # Add ICMP header
        self.data += struct.pack("!BBHHH",
                                 icmp_type,
                                 icmp_code,
                                 0,  # icmp.checksum
                                 os.getpid() & 0xffff,  # icmp.echo.id
                                 # The counter grows without bound; mask it so
                                 # the 16-bit "H" field wraps instead of
                                 # raising struct.error past 65535.
                                 Packet.icmp_counter & 0xffff  # icmp.echo.seq
                                 )
        Packet.icmp_counter += 1

        # Fill out packet; nothing is appended when the headers already
        # reach pktlen (a non-positive repeat count yields an empty string).
        self.data += "D" * (pktlen - len(self.data))

        return self