Python scapy.all 模块,sniff() 实例源码

我们从 Python 开源项目中提取了以下 17 个代码示例,用于说明如何使用 scapy.all.sniff()。

项目:WMD    作者:ThomasTJdev    | 项目源码 | 文件源码
def sniff_aps(int_mon):
    """Print the access-point table header, start channel hopping and sniff.

    Args:
        int_mon: Interface name passed to ``sniff`` as ``iface``
            (presumably a wireless interface in monitor mode -- confirm
            against the caller).

    Blocks inside ``sniff`` until capturing ends, then calls
    ``sniff_aps_signal_handler()`` without arguments.
    """
    # Print the program header
    print("-=-=-=-=-=-= AIROSCAPY =-=-=-=-=-=-")
    print("CH ENC BSSID             SSID")

    # Start the channel hopper in a separate process
    global channel_hop
    channel_hop = True
    hopper = Process(target=channel_hopper)
    hopper.start()

    # Capture CTRL-C
    #signal.signal(signal.SIGINT, sniff_aps_signal_handler)

    # Start the sniffer. The returned packet list is never used, so tell
    # scapy not to accumulate packets (store=0); otherwise memory grows for
    # as long as the capture runs. The callback still sees every packet.
    sniff(iface=int_mon, prn=sniff_aps_callback, store=0)
    sniff_aps_signal_handler()
项目:DET    作者:sensepost    | 项目源码 | 文件源码
def listen():
    """Log a start message and hand every ICMP echo request to ``analyze``.

    Blocks inside ``scapy.sniff`` until the capture is interrupted.
    """
    app_exfiltrate.log_message('info', "[icmp] Listening for ICMP packets..")
    # Filter for echo requests only to prevent capturing generated replies.
    # store=0: the returned packet list is discarded, so do not let scapy
    # keep every packet in memory for the lifetime of the listener.
    scapy.sniff(filter="icmp and icmp[0]=8", prn=analyze, store=0)
项目:smart_sniffer    作者:ScarWar    | 项目源码 | 文件源码
def update_next_packet(self):
        """Capture one packet and dispatch it to the session it belongs to.

        The packet is ignored when it has no IP layer or when neither its
        source nor its destination address equals ``self.our_ip``.  Otherwise
        ``make_stamp`` supplies the (sender, receiver, protocol) triple that
        ``self.decide_stamp`` turns into a session key:

        * unknown key  -> ``self.set_session`` is started in a new thread;
        * known key    -> the session's ``update_session`` is started in a
          new thread, and if the session already reports ``got_fin`` it is
          removed from ``self.sessions``, its packet collections are sorted
          with ``key_func`` and it is added to the module-level ``lst``.

        Returns:
            None.
        """
        packet = s.sniff(count=1)[0]  # filter = "tcp.len > 0",

        if s.IP not in packet:
            return

        ip_send = packet[s.IP].src
        ip_rec = packet[s.IP].dst

        # Skip traffic that neither originates from nor targets this host.
        if ip_send != self.our_ip and ip_rec != self.our_ip:
            return
        print(packet.summary())

        # Call make_stamp once and reuse the result.
        stamp_fields = make_stamp(packet)
        if stamp_fields is None:
            print("[Error] - Shit happens")
            return

        ip_send, ip_rec, protocol = stamp_fields
        stamp = self.decide_stamp([ip_send, ip_rec, protocol])

        session = self.sessions.get(stamp)
        if session is None:
            threading.Thread(target=self.set_session, args=[packet, stamp, self.our_ip]).start()
            return

        threading.Thread(target=session.update_session, args=[packet]).start()

        # In case we are done working on the connection, order its packets
        # (by time, per the original comment) and add it to the global list.
        if session.got_fin is True:
            to_add = self.sessions.pop(stamp)
            # sorted() returns a new list; the original discarded it, so the
            # collections were never actually ordered. Store the result.
            to_add.income = sorted(to_add.income, key=key_func)
            to_add.outcome = sorted(to_add.outcome, key=key_func)
            to_add.combined = sorted(to_add.combined, key=key_func)
            lst.add(to_add)
项目:voltha    作者:opencord    | 项目源码 | 文件源码
def run(self):
        """Sniff on ``self.iface`` and forward every packet until finished.

        Opens an L2 listening socket filtered to 'inbound', keeps it on
        ``self.sock`` and polls it one packet (or one second) at a time while
        ``self.finished`` is false.  Each captured packet goes to
        ``self.forward_packet``; any exception raised during a poll is logged
        and the loop continues.
        """
        # TODO this loop could be reconciled with the ofp Connection to become a
        # single select loop.
        listen_socket = conf.L2listen(type=ETH_P_ALL, iface=self.iface, filter='inbound')
        self.sock = listen_socket

        while not self.finished:
            try:
                captured = sniff(
                    count=1,
                    iface=self.iface,
                    timeout=1,
                    opened_socket=listen_socket,
                )
                print('Sniffer received %d packet(s)' % len(captured))
                for frame in captured:
                    self.forward_packet(frame)
            except Exception as err:
                logging.error("scapy.sniff error: %s" % err)
项目:Web-Stalker    作者:Dylan-halls    | 项目源码 | 文件源码
def main():
    """Create a ``Sniffer`` and feed every captured packet to ``pkt_handle``.

    Blocks inside ``sniff`` until the capture is interrupted.
    """
    sn = Sniffer()
    # The sniff() result was bound to an unused local; with scapy's default
    # storage that list grows for as long as the capture runs. store=0 keeps
    # the per-packet callback but drops each packet afterwards.
    sniff(prn=sn.pkt_handle, store=0)
项目:nfvOrchestrator    作者:uestcNFVproject    | 项目源码 | 文件源码
def StartSF():
    """Sniff UDP port 6633 traffic addressed to ``ingress`` and forward it.

    Sets ``conf.verb`` to 0, then captures on the ``ingress`` interface with
    a filter built from that interface's address (``get_ip_address``), and
    hands each packet to ``NSHForward``.

    Returns:
        Whatever ``sniff`` returns once capturing stops.
    """
    conf.verb = 0

    def forward(packet):
        # Looked up at call time, exactly like the original lambda.
        return NSHForward(packet)

    capture_filter = 'ip dst ' + get_ip_address(ingress) + ' and (udp port 6633)'
    return sniff(iface=ingress, filter=capture_filter, prn=forward)
项目:wifi-probe-hack    作者:kings-way    | 项目源码 | 文件源码
def capture(mon_interface):
    """Sniff on *mon_interface* and pass each packet to ``on_receiving``.

    :type mon_interface: string,  name of the interface operating in monitor mode

    Any exception raised while configuring or sniffing is written to stderr
    and the process exits.
    """
    # The docstring used to follow a ``global flag_test`` statement, which
    # made it a plain expression rather than a docstring; that declaration
    # was a no-op here (the name is never read or assigned) and is dropped.
    try:
        scapy.conf.iface = mon_interface
        scapy.sniff(iface=mon_interface, prn=on_receiving, store=0, filter="subtype probereq")
    except Exception as info:
        sys.stderr.write("\nError: " + str(info) + "\n")
        # sys.exit instead of quit(): quit is only injected by the ``site``
        # module. NOTE(review): exit status 0 after an error is kept from the
        # original -- confirm whether a non-zero status is wanted.
        sys.exit(0)
项目:recipe-catalog    作者:inocybe    | 项目源码 | 文件源码
def StartSF():
    """Capture UDP/6633 packets destined for ``ingress`` and forward them.

    ``conf.verb`` is set to 0 first.  The capture filter is assembled from
    the address ``get_ip_address`` reports for the ``ingress`` interface, and
    every captured packet is passed to ``NSHForward``.

    Returns:
        The value returned by ``sniff`` when capturing ends.
    """
    conf.verb = 0

    def forward(packet):
        # Resolves NSHForward on each call, matching the original lambda.
        return NSHForward(packet)

    capture_filter = 'ip dst ' + get_ip_address(ingress) + ' and (udp port 6633)'
    return sniff(iface=ingress, filter=capture_filter, prn=forward)
项目:EvilTwinFramework    作者:Esser420    | 项目源码 | 文件源码
def __init__(self, config):
        """Initialise the credential sniffer from its configuration mapping.

        Args:
            config: Passed to the parent initialiser together with the name
                "credentialsniffer"; afterwards ``self.config`` is read for
                the keys "sniffing_interface", "bssid", "ssid", "log_dir"
                and, optionally, "fixed_sniffing_channel" and "timeout".

        Raises:
            Whatever ``self.config[...]`` raises when one of the four
            required keys is missing.
        """
        super(CredentialSniffer, self).__init__(config, "credentialsniffer")
        self.running_interface = self.config["sniffing_interface"]
        self.running_bssid = self.config["bssid"]
        self.running_ssid = self.config["ssid"]
        self.log_dir = self.config["log_dir"]
        self.wifi_clients = {}
        self.wpa_handshakes = {}
        self.broadcasted_bssids = {}  # bssid: beacon_packet

        self.sniffer_thread = None
        self.should_stop = False
        self.log_lock = Lock()

        # Optional integer settings: fall back to a default when the key is
        # missing or the value cannot be converted. Only the errors a lookup
        # or int() can raise are caught, so KeyboardInterrupt / SystemExit
        # are no longer swallowed as they were by the bare ``except:``.
        try:
            self.fixed_channel = int(self.config["fixed_sniffing_channel"])
        except (KeyError, TypeError, ValueError):
            self.fixed_channel = 7

        try:
            self.timeout = int(self.config["timeout"])
        except (KeyError, TypeError, ValueError):
            self.timeout = 30

        # When sniffing for credentials on interface running in Master mode
        # scapy will only be able to sniff for layer 3 packets (Networking)
        # so it never receives a Beacon packet (layer2) to verify the access point ssid
        # best to pass it as parameter since we are running the access point we know the ssid
        self.is_ap = False

    # This will be called by the AirSniffer
项目:EvilTwinFramework    作者:Esser420    | 项目源码 | 文件源码
def start_credential_sniffing(self):
        """Sniff without storing packets until ``self._stop`` ends the capture.

        Every packet is handed to ``self.extract_credential_info``.  Any
        exception raised while sniffing is printed instead of propagated.
        """
        # TODO map packets to interface with threads
        try:
            sniff(store=0, prn=self.extract_credential_info, stop_filter=self._stop)
        except Exception as error:
            print("Error Occurred while sniffing.")
            print(str(error))
项目:WMD    作者:ThomasTJdev    | 项目源码 | 文件源码
def scapy_mon_arp():
    """Sniff ARP traffic and pass each packet to ``scapy_mon_arp_callback``.

    No interface is given, so scapy's default capture interface selection
    applies.  Packets are not stored (``store=0``).

    Returns:
        The value returned by ``sniff`` once capturing stops.
    """
    return sniff(filter="arp", store=0, prn=scapy_mon_arp_callback)
项目:WMD    作者:ThomasTJdev    | 项目源码 | 文件源码
def sniff_wifi_ssid_mac():
    """Sniff indefinitely, passing each packet to the SSID/MAC callback.

    Nothing is returned; results are whatever
    ``sniff_wifi_ssid_mac_callback`` does with each packet.
    """
    sniff(
        prn=sniff_wifi_ssid_mac_callback,
        store=0,  # do not keep packets in memory
        count=0,  # 0 = no packet limit
    )
项目:WMD    作者:ThomasTJdev    | 项目源码 | 文件源码
def sniff_wifi_ssid_mac_vendor():
    """Sniff indefinitely, passing each packet to the SSID/MAC callback.

    Nothing is returned by this function itself.
    """
    # NOTE(review): despite the "_vendor" suffix this uses
    # sniff_wifi_ssid_mac_callback, the same callback as the non-vendor
    # variant -- confirm whether a vendor-aware callback was intended.
    sniff(
        prn=sniff_wifi_ssid_mac_callback,
        store=0,  # do not keep packets in memory
        count=0,  # 0 = no packet limit
    )
项目:WMD    作者:ThomasTJdev    | 项目源码 | 文件源码
def detect_deauth(fm):
    """Sniff indefinitely and pass each packet to ``detect_deauth_callback``.

    Args:
        fm: Not used by this function; kept so existing callers keep working.
            (The original docstring refers to the deauth frame,
            ``fm.subtype==12`` -- presumably checked in the callback.)
    """
    # The capture result is discarded, so do not let scapy accumulate every
    # packet in memory; this also matches the other sniff helpers here.
    sniff(prn=detect_deauth_callback, store=0)
项目:sonic-mgmt    作者:Azure    | 项目源码 | 文件源码
def Sniffer(self, interface):
        """Sniff on *interface* for 3 seconds, counting via ``pkt_callback``.

        Args:
            interface: Name of the interface to capture on.

        Resets ``self.sniffed_cnt`` to 0 before capturing; packets are handed
        to ``self.pkt_callback`` and not stored.
        """
        self.sniffed_cnt = 0
        # Honour the caller's interface instead of a hard-coded "eth2"
        # (the existing caller passes "eth2", so its behaviour is unchanged).
        scapy2.sniff(iface=interface, prn=self.pkt_callback, store=0, timeout=3)
项目:DET    作者:Exploit-install    | 项目源码 | 文件源码
def listen():
    """Log a start message and hand every captured ICMP packet to ``analyze``.

    Blocks inside ``scapy.sniff`` until the capture is interrupted.
    """
    app_exfiltrate.log_message('info', "[icmp] Listening for ICMP packets..")
    # store=0: the returned packet list is discarded, so do not let scapy
    # keep every packet in memory for the lifetime of the listener.
    scapy.sniff(filter="icmp", prn=analyze, store=0)
项目:sonic-mgmt    作者:Azure    | 项目源码 | 文件源码
def runTest(self):
        """Send one TCP SYN on eth2 and fail if the sniffer counted nothing.

        A background thread runs ``self.Sniffer("eth2")`` while the packet
        is sent; the test then checks ``self.sniffed_cnt``.
        """
        pkt = scapy2.Ether()
        pkt /= scapy2.IP(src="21.0.0.2", dst="22.0.0.2")
        pkt /= scapy2.TCP(dport=80, flags="S", seq=42)
        pkt /= ("badabadaboom")

        t = Thread(target=self.Sniffer, args=("eth2",))
        t.start()
        scapy2.sendp(pkt, iface='eth2')
        # Wait for the sniffer thread itself (bounded by the same 4 seconds
        # the original slept) so the counter is read as soon as it is final.
        t.join(4)
        # fail if no reply
        self.assertNotEqual(
            self.sniffed_cnt, 0,
            "no packet was sniffed on eth2 after sending the probe")


        #res = scapy2.sniff(iface="eth2", timeout=3)
        #print res
        #if res:
        #    raise

        #if reply:
        #    raise
        #print "================______====\n"
        #print reply
        #print error
        #print "================______====\n"
        #if reply:
        #    reply.show()
        #(rcv_device, rcv_port, rcv_pkt, pkt_time) = dp_poll(self, device_number=0, port_number=0, timeout=5)
        #send_packet(self, 0, pkt)
        #(rcv_device, rcv_port, rcv_pkt, pkt_time) = dp_poll(self, device_number=0, port_number=0, timeout=5)


#        verify_packet(self, masked_exp_pkt, 1)


        #mpkt = Mask(pkt)
        #mpkt.set_do_not_care(0, 14*8)
        #mpkt.set_do_not_care(16*8, 49*8)
        #verify_packet(self, mpkt, 0)
        #(rcv_device, rcv_port, rcv_pkt, pkt_time) = dp_poll(self, device_number=0, port_number=0, timeout=5)
        #print "================______====\n"
        #y = 0
        #for x in rcv_pkt:
        #    print "%d - %X" % (y, ord(x))
        #    y +=1