Python scapy.all 模块,ARP 实例源码


项目:raw-packet    作者:Vladimir-Ivanov-Git    | 项目源码 | 文件源码
def send_arp_reply(request):
    if request.haslayer(ARP):
        global _current_number_of_packets
        global _current_network_interface
        global _current_mac_address
        global _arp

        SOCK = socket(AF_PACKET, SOCK_RAW)
        SOCK.bind((_current_network_interface, 0))

        if request[ARP].op == 1:
            if request[Ether].dst == "ff:ff:ff:ff:ff:ff" and request[ARP].hwdst == "00:00:00:00:00:00":
                print Base.c_info + "ARP request from MAC: " + request[ARP].hwsrc + " IP: " + request[ARP].pdst
                reply = _arp.make_response(ethernet_src_mac=_current_mac_address,
                                           sender_mac=_current_mac_address, sender_ip=request[ARP].pdst,
                                           target_mac=request[ARP].hwsrc, target_ip=request[ARP].psrc)
                _current_number_of_packets += 1
                if _current_number_of_packets >= _number_of_packets:
项目:pentesting-multitool    作者:ffmancera    | 项目源码 | 文件源码
def mac_getter(self, IP):

        # Sending ARP for take the MAC address
        ans, unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=IP), timeout=2, iface=self.interface, inter=0.2)

        for send, receive in ans:
            return receive.sprintf(r"%Ether.src%")
项目:pentesting-multitool    作者:ffmancera    | 项目源码 | 文件源码
def ARPing(self):

        victimMAC = self.mac_getter(self.victimIP)
        AP_MAC = self.mac_getter(self.gatewayIP)

        # Creating and sending ARP packets for try to hide the attack
        send(ARP(op=2, pdst=self.victimIP, psrc=self.gatewayIP, hwdst="ff:ff:ff:ff:ff:ff", hwsrc=AP_MAC), count=10)
        send(ARP(op=2, pdst=self.gatewayIP, psrc=self.victimIP, hwdst="ff:ff:ff:ff:ff:ff", hwsrc=victimMAC), count=10)

        # Disabling IP Forwarding
        os.system("echo 0 > /proc/sys/net/ipv4/ip_forward")

项目:pentesting-multitool    作者:ffmancera    | 项目源码 | 文件源码
def sending_arp(self):

        victim = self.mac_getter(self.victimIP)
        AP_MAC = self.mac_getter(self.gatewayIP)

        # Those replies places us between them (ARP Spoofing)
        send(ARP(op=2, pdst=self.victimIP, psrc=self.gatewayIP, hwdst=victim))
        send(ARP(op=2, pdst=self.gatewayIP, psrc=self.victimIP, hwdst=AP_MAC))
项目:habu    作者:portantier    | 项目源码 | 文件源码
def procpkt(pkt):

    now = time()
    output = '{seconds}\t{ip}\t{hwaddr}\t{vendor}'

    if 'ARP' in pkt:
        hosts[pkt[ARP].psrc] = {}
        hosts[pkt[ARP].psrc]['hwaddr'] = pkt[ARP].hwsrc
        hosts[pkt[ARP].psrc]['vendor'] = mac2vendor.get_comment(pkt[ARP].hwsrc)
        hosts[pkt[ARP].psrc]['time'] = time()


        for ip in sorted(hosts):
                seconds = int(now - hosts[ip]['time']),
                ip = ip,
                hwaddr = hosts[ip]['hwaddr'],
                vendor = hosts[ip]['vendor']
项目:raw-packet    作者:Vladimir-Ivanov-Git    | 项目源码 | 文件源码
def get_mac(iface, ip):
        gw_ip = ""
        gws = gateways()
        for gw in gws.keys():
                if str(gws[gw][AF_INET][1]) == iface:
                    gw_ip = str(gws[gw][AF_INET][0])
            except IndexError:
                if str(gws[gw][0][1]) == iface:
                    gw_ip = str(gws[gw][0][0])
            alive, dead = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=ip), iface=iface, timeout=10, verbose=0)
            return str(alive[0][1].hwsrc)
        except IndexError:
                alive, dead = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=gw_ip), iface=iface, timeout=10, verbose=0)
                return str(alive[0][1].hwsrc)
                return "ff:ff:ff:ff:ff:ff"
            return "ff:ff:ff:ff:ff:ff"
项目:WMD    作者:ThomasTJdev    | 项目源码 | 文件源码
def scapy_mon_arp_callback(pkt):
    """CALLBACK: Monitor ARP table on all interfaces."""
    global seen_arp
    if ARP in pkt and pkt[ARP].op in (1, 2):
        mon_data = (pkt[ARP].hwsrc + ' ' + pkt[ARP].psrc)
        arp_data = mon_data.split(' ')
        if (arp_data[0]) not in seen_arp:
            seen_arp[arp_data[0]] = arp_data[1]
            print(bc.OKGREEN + '\tAdding: ' + arp_data[0] + ' - ' + arp_data[1])
        elif (arp_data[0]) in seen_arp:
            for key, value in seen_arp.items():
                if key == arp_data[0] and value != arp_data[1]:
                    print(bc.WARN + '\tWARNING - POSSIBLE ARP-ATTACK!')
                    print(bc.WARN + '\tDuplicated mac address found: ' + key + ' != ' + value)
                    print('\tThe ARP data will be deleted within 10 seconds. If this warning persists, you are under attack!')
                    seen_arp = {}
项目:HomeAssistant-CustomComponents    作者:pilotak    | 项目源码 | 文件源码
def restore(self, index):
            victimIP = self._devices[index][0]
            victimMAC = self._devices[index][1]

  "Enabling internet for device IP: %s MAC: %s",
                         victimIP, victimMAC)

            del self._devices[index]

            send(ARP(op=2, pdst=victimIP, hwdst=victimMAC, psrc=self._router_ip,
                     hwsrc=self._router_mac), count=4, iface=self._interface, verbose=False)
            send(ARP(op=2, pdst=self._router_ip, hwdst=self._router_mac, psrc=victimIP,
                     hwsrc=victimMAC), count=4, iface=self._interface, verbose=False)

            _LOGGER.error("Error when restoring IP: %s", victimIP)
项目:steth    作者:openstack    | 项目源码 | 文件源码
def get_arp_op(self, buff):
        ether_packet = scapy.Ether(buff)
        arp_packet = ether_packet[scapy.ARP]
        return constants.ARP_OP_TYPE[arp_packet.op]
项目:habu    作者:portantier    | 项目源码 | 文件源码
def cmd_arpsniff(iface):
    """ ARP Sniffer """

    conf.verb = False

    if iface:
        conf.iface = iface

    sniff(filter="arp", store=False, prn=procpkt)
项目:habu    作者:portantier    | 项目源码 | 文件源码
def cmd_arpoison(t1, t2, iface, verbose):
    """ARP cache poison"""

    conf.verb = False

    if iface:
        conf.iface = iface

    mac1 = getmacbyip(t1)
    mac2 = getmacbyip(t2)

    pkt1 = Ether(dst=mac1)/ARP(op="is-at", psrc=t2, pdst=t1, hwdst=mac1)
    pkt2 = Ether(dst=mac2)/ARP(op="is-at", psrc=t1, pdst=t2, hwdst=mac2)

        while 1:

            if verbose:


    except KeyboardInterrupt:
项目:raw-packet    作者:Vladimir-Ivanov-Git    | 项目源码 | 文件源码
def make_arp_reply_packet():
    return (Ether(src=sender_mac_address, dst=args.target_mac) /
            ARP(hwsrc=sender_mac_address, psrc=args.sender_ip,
                hwdst=args.target_mac, pdst=args.target_ip))
项目:WMD    作者:ThomasTJdev    | 项目源码 | 文件源码
def scapy_mon_arp():
    """Monitor ARP table on all interfaces."""
    mon_data = sniff(prn=scapy_mon_arp_callback, filter="arp", store=0)
    return mon_data
项目:netmon    作者:bullerian    | 项目源码 | 文件源码
def ping_host(self, ip):
        """method to send ICMP/ARP requests and receive response
        from the host with @ip address.
        Returns a tuple (ip/host name, ONLINE/OFFLINE, response time)"""

        # form an ICMP or ARP packet
        packet = self.__gen_packet(ip)

            # send and wait for response
            answers, unanswers = self.__send_recv(packet)
        except PermissionError:
            raise PermissionException

        if self.__resolve_names:
            # resolve host name by ip if resolve_names
            # flag was set
                host = gethostbyaddr(ip)[HOST_NAME_INDEX]
            except herror:
                host = ip
            # otherwise show ip
            host = ip

        if answers:
            answer = answers[FIRST_INDEX]
            # get the request object
            req = answer[REQUEST_INDEX]
            # get the response object
            resp = answer[RESPONSE_INDEX]
            # calculate response time and round it
            delta = resp.time - req.sent_time
            return host, ONLINE, delta
            # return unansered results
            unanswer = unanswers[FIRST_INDEX]
            resp = unanswer[RESPONSE_INDEX]
            return host, OFFLINE, None
项目:home-security    作者:icode-co-il    | 项目源码 | 文件源码
def is_device_connected(mac_addr):
    answer, _ = scapy.srp(scapy.Ether(dst="ff:ff:ff:ff:ff:ff") / scapy.ARP(pdst=SUBNET), timeout=2)
    return mac_addr in (rcv.src for _, rcv in answer)
项目:ansible    作者:crombeen    | 项目源码 | 文件源码
def arping(mac, ip, timeout):
    """Arping function takes IP Address or Network, returns nested mac/ip list"""
    for i in range(timeout/2):
        ans = srp1(Ether(dst=mac)/ARP(pdst=ip), timeout=2, verbose=0)
        if ans: break
    return ans
项目:HomeAssistant-CustomComponents    作者:pilotak    | 项目源码 | 文件源码
def update_cache(self):
            ans, unans = arping(self._router_ip + "/24",
                                iface=self._interface, verbose=False)

            self._arp_cache = []

            if ans:
                for s, r in ans:
                    self._arp_cache.append([r[ARP].psrc, r[Ether].src.lower()])

            _LOGGER.debug("ARP cache: %s", self._arp_cache)
        except Exception as e:
            _LOGGER.error("Error when trying update ARP cache: %s", str(e))
项目:HomeAssistant-CustomComponents    作者:pilotak    | 项目源码 | 文件源码
def spoof(self, index):
            victimIP = self._devices[index][0]
            victimMAC = self._devices[index][1]

            send(ARP(op=2, pdst=victimIP, psrc=self._router_ip,
                     hwdst=victimMAC), iface=self._interface, verbose=False)
            send(ARP(op=2, pdst=self._router_ip, psrc=victimIP,
                     hwdst=self._router_mac), iface=self._interface, verbose=False)
            _LOGGER.error("Error when trying to spoof device IP: %s MAC: %s",
                     victimIP, victimMAC)
项目:habu    作者:portantier    | 项目源码 | 文件源码
def cmd_arping(ip, iface, verbose):

    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    conf.verb = False

    if iface:
        conf.iface = iface

    res, unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=ip), timeout=2)