我们从Python开源项目中,提取了以下19个代码示例,用于说明如何使用scapy.all.ARP。
def send_arp_reply(request): if request.haslayer(ARP): global _current_number_of_packets global _current_network_interface global _current_mac_address global _arp SOCK = socket(AF_PACKET, SOCK_RAW) SOCK.bind((_current_network_interface, 0)) if request[ARP].op == 1: if request[Ether].dst == "ff:ff:ff:ff:ff:ff" and request[ARP].hwdst == "00:00:00:00:00:00": print Base.c_info + "ARP request from MAC: " + request[ARP].hwsrc + " IP: " + request[ARP].pdst reply = _arp.make_response(ethernet_src_mac=_current_mac_address, ethernet_dst_mac=request[ARP].hwsrc, sender_mac=_current_mac_address, sender_ip=request[ARP].pdst, target_mac=request[ARP].hwsrc, target_ip=request[ARP].psrc) SOCK.send(reply) _current_number_of_packets += 1 if _current_number_of_packets >= _number_of_packets: SOCK.close() exit(0)
def mac_getter(self, IP): # Sending ARP for take the MAC address ans, unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=IP), timeout=2, iface=self.interface, inter=0.2) for send, receive in ans: return receive.sprintf(r"%Ether.src%")
def ARPing(self): victimMAC = self.mac_getter(self.victimIP) AP_MAC = self.mac_getter(self.gatewayIP) # Creating and sending ARP packets for try to hide the attack send(ARP(op=2, pdst=self.victimIP, psrc=self.gatewayIP, hwdst="ff:ff:ff:ff:ff:ff", hwsrc=AP_MAC), count=10) send(ARP(op=2, pdst=self.gatewayIP, psrc=self.victimIP, hwdst="ff:ff:ff:ff:ff:ff", hwsrc=victimMAC), count=10) # Disabling IP Forwarding os.system("echo 0 > /proc/sys/net/ipv4/ip_forward") exit()
def sending_arp(self): victim = self.mac_getter(self.victimIP) AP_MAC = self.mac_getter(self.gatewayIP) # Those replies places us between them (ARP Spoofing) send(ARP(op=2, pdst=self.victimIP, psrc=self.gatewayIP, hwdst=victim)) send(ARP(op=2, pdst=self.gatewayIP, psrc=self.victimIP, hwdst=AP_MAC))
def procpkt(pkt): now = time() output = '{seconds}\t{ip}\t{hwaddr}\t{vendor}' if 'ARP' in pkt: hosts[pkt[ARP].psrc] = {} hosts[pkt[ARP].psrc]['hwaddr'] = pkt[ARP].hwsrc hosts[pkt[ARP].psrc]['vendor'] = mac2vendor.get_comment(pkt[ARP].hwsrc) hosts[pkt[ARP].psrc]['time'] = time() click.clear() for ip in sorted(hosts): print(output.format( seconds = int(now - hosts[ip]['time']), ip = ip, hwaddr = hosts[ip]['hwaddr'], vendor = hosts[ip]['vendor'] ))
def get_mac(iface, ip): gw_ip = "" gws = gateways() for gw in gws.keys(): try: if str(gws[gw][AF_INET][1]) == iface: gw_ip = str(gws[gw][AF_INET][0]) except IndexError: if str(gws[gw][0][1]) == iface: gw_ip = str(gws[gw][0][0]) try: alive, dead = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=ip), iface=iface, timeout=10, verbose=0) return str(alive[0][1].hwsrc) except IndexError: try: alive, dead = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=gw_ip), iface=iface, timeout=10, verbose=0) return str(alive[0][1].hwsrc) except: return "ff:ff:ff:ff:ff:ff" except: return "ff:ff:ff:ff:ff:ff"
def scapy_mon_arp_callback(pkt): """CALLBACK: Monitor ARP table on all interfaces.""" global seen_arp if ARP in pkt and pkt[ARP].op in (1, 2): mon_data = (pkt[ARP].hwsrc + ' ' + pkt[ARP].psrc) arp_data = mon_data.split(' ') if (arp_data[0]) not in seen_arp: seen_arp[arp_data[0]] = arp_data[1] print(bc.OKGREEN + '\tAdding: ' + arp_data[0] + ' - ' + arp_data[1]) elif (arp_data[0]) in seen_arp: for key, value in seen_arp.items(): if key == arp_data[0] and value != arp_data[1]: print(bc.WARN + '\tWARNING - POSSIBLE ARP-ATTACK!') print(bc.WARN + '\tDuplicated mac address found: ' + key + ' != ' + value) print('') print('\tThe ARP data will be deleted within 10 seconds. If this warning persists, you are under attack!') sleep(10) seen_arp = {}
def restore(self, index): try: victimIP = self._devices[index][0] victimMAC = self._devices[index][1] _LOGGER.info("Enabling internet for device IP: %s MAC: %s", victimIP, victimMAC) del self._devices[index] send(ARP(op=2, pdst=victimIP, hwdst=victimMAC, psrc=self._router_ip, hwsrc=self._router_mac), count=4, iface=self._interface, verbose=False) send(ARP(op=2, pdst=self._router_ip, hwdst=self._router_mac, psrc=victimIP, hwsrc=victimMAC), count=4, iface=self._interface, verbose=False) except: _LOGGER.error("Error when restoring IP: %s", victimIP)
def get_arp_op(self, buff): ether_packet = scapy.Ether(buff) arp_packet = ether_packet[scapy.ARP] return constants.ARP_OP_TYPE[arp_packet.op]
def cmd_arpsniff(iface): """ ARP Sniffer """ conf.verb = False if iface: conf.iface = iface sniff(filter="arp", store=False, prn=procpkt)
def cmd_arpoison(t1, t2, iface, verbose): """ARP cache poison""" conf.verb = False if iface: conf.iface = iface mac1 = getmacbyip(t1) mac2 = getmacbyip(t2) pkt1 = Ether(dst=mac1)/ARP(op="is-at", psrc=t2, pdst=t1, hwdst=mac1) pkt2 = Ether(dst=mac2)/ARP(op="is-at", psrc=t1, pdst=t2, hwdst=mac2) try: while 1: sendp(pkt1) sendp(pkt2) if verbose: pkt1.show2() pkt2.show2() else: print(pkt1.summary()) print(pkt2.summary()) time.sleep(1) except KeyboardInterrupt: pass
def make_arp_reply_packet(): return (Ether(src=sender_mac_address, dst=args.target_mac) / ARP(hwsrc=sender_mac_address, psrc=args.sender_ip, hwdst=args.target_mac, pdst=args.target_ip))
def scapy_mon_arp(): """Monitor ARP table on all interfaces.""" mon_data = sniff(prn=scapy_mon_arp_callback, filter="arp", store=0) return mon_data
def ping_host(self, ip): """method to send ICMP/ARP requests and receive response from the host with @ip address. Returns a tuple (ip/host name, ONLINE/OFFLINE, response time)""" # form an ICMP or ARP packet packet = self.__gen_packet(ip) try: # send and wait for response answers, unanswers = self.__send_recv(packet) except PermissionError: raise PermissionException if self.__resolve_names: # resolve host name by ip if resolve_names # flag was set try: host = gethostbyaddr(ip)[HOST_NAME_INDEX] except herror: host = ip else: # otherwise show ip host = ip if answers: answer = answers[FIRST_INDEX] # get the request object req = answer[REQUEST_INDEX] # get the response object resp = answer[RESPONSE_INDEX] # calculate response time and round it delta = resp.time - req.sent_time return host, ONLINE, delta else: # return unansered results unanswer = unanswers[FIRST_INDEX] resp = unanswer[RESPONSE_INDEX] return host, OFFLINE, None
def is_device_connected(mac_addr): answer, _ = scapy.srp(scapy.Ether(dst="ff:ff:ff:ff:ff:ff") / scapy.ARP(pdst=SUBNET), timeout=2) return mac_addr in (rcv.src for _, rcv in answer)
def arping(mac, ip, timeout): """Arping function takes IP Address or Network, returns nested mac/ip list""" for i in range(timeout/2): ans = srp1(Ether(dst=mac)/ARP(pdst=ip), timeout=2, verbose=0) if ans: break return ans
def update_cache(self): try: ans, unans = arping(self._router_ip + "/24", iface=self._interface, verbose=False) self._arp_cache = [] if ans: for s, r in ans: self._arp_cache.append([r[ARP].psrc, r[Ether].src.lower()]) _LOGGER.debug("ARP cache: %s", self._arp_cache) except Exception as e: _LOGGER.error("Error when trying update ARP cache: %s", str(e))
def spoof(self, index): try: victimIP = self._devices[index][0] victimMAC = self._devices[index][1] send(ARP(op=2, pdst=victimIP, psrc=self._router_ip, hwdst=victimMAC), iface=self._interface, verbose=False) send(ARP(op=2, pdst=self._router_ip, psrc=victimIP, hwdst=self._router_mac), iface=self._interface, verbose=False) except: _LOGGER.error("Error when trying to spoof device IP: %s MAC: %s", victimIP, victimMAC)
def cmd_arping(ip, iface, verbose): if verbose: logging.basicConfig(level=logging.INFO, format='%(message)s') conf.verb = False if iface: conf.iface = iface res, unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=ip), timeout=2) res.show()