我们从Python开源项目中，提取了以下5个代码示例，用于说明如何使用scapy.all.sr()。
def detect_inactive_hosts(scan_hosts):
    """
    Ping ``scan_hosts`` with ICMP and report which hosts answered.

    ``scan_hosts`` is passed straight to scapy's ``IP(dst=...)``, so it can
    be a single address or a range such as ``10.0.2.2-4``. See the Scapy
    docs for specifying targets.

    The function re-registers itself on the module-level ``scheduler`` to
    run again after ``RUN_FREQUENCY``, prints one line per live and per
    inactive host plus a total, and returns nothing. A KeyboardInterrupt
    during the scan terminates the process with exit status 0.
    """
    # Queue the next run before scanning.
    scheduler.enter(RUN_FREQUENCY, 1, detect_inactive_hosts, (scan_hosts, ))
    inactive_hosts = []
    try:
        ans, unans = sr(IP(dst=scan_hosts)/ICMP(), retry=0, timeout=1)
        # Iterate the (sent, received) pairs directly instead of passing a
        # one-argument callback to ans.summary(): depending on the scapy
        # version the callback receives either a tuple or two positional
        # arguments, and a single-parameter lambda calling .sprintf() on its
        # argument works with neither.
        for _sent, reply in ans:
            print(reply.sprintf("%IP.src% is alive"))
        # unans holds the probes that got no reply; .dst is the target.
        for inactive in unans:
            print("%s is inactive" % inactive.dst)
            inactive_hosts.append(inactive.dst)
        print("Total %d hosts are inactive" % len(inactive_hosts))
    except KeyboardInterrupt:
        # exit() is injected by the site module and is not always present;
        # raising SystemExit has the same effect everywhere.
        raise SystemExit(0)
def cmd_ipscan(ip, iface, sleeptime, timeout, verbose):
    """Probe ``ip`` with every IP protocol number (0-255) and print replies.

    One packet per protocol number is sent with the payload ``"SCAPY"``.
    A summary line is printed for each reply; replies that do not contain
    an ICMP layer are additionally dumped in full with ``show()``.

    Args:
        ip: Destination handed to scapy's ``IP(dst=...)``.
        iface: Interface name; when truthy it is stored in ``conf.iface``.
        sleeptime: Accepted for interface compatibility; not used here.
        timeout: Seconds to wait for replies after the last packet is
            sent. ``None`` falls back to 2 seconds, the value this
            function used before it honoured the parameter.
        verbose: When truthy, configures INFO-level logging and lets
            ``sr()`` print its progress output.
    """
    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    conf.verb = False

    if iface:
        conf.iface = iface

    # timeout/verbose used to be hard-coded (2 / True) regardless of what
    # the caller passed; honour the arguments, keeping 2s as the default.
    wait = 2 if timeout is None else timeout

    ans, _unans = sr(
        IP(dst=ip, proto=(0, 255))/"SCAPY",
        retry=0,
        timeout=wait,
        verbose=bool(verbose),
    )

    for _sent, reply in ans:
        print(reply.summary())
        if ICMP not in reply:
            reply.show()
def detect_inactive_hosts(scan_hosts):
    """
    Scan the network to find which of ``scan_hosts`` are live or dead.

    ``scan_hosts`` can be like 10.0.2.2-4 to cover a range; it is handed
    unchanged to scapy's ``IP(dst=...)``. See the Scapy docs for specifying
    targets.

    Before scanning, the function schedules itself on the module-level
    ``scheduler`` to run again after ``RUN_FREQUENCY``. It prints one line
    per answering host, one line per silent host and a final count, and
    returns nothing. A KeyboardInterrupt during the scan exits the process
    with status 0.
    """
    scheduler.enter(RUN_FREQUENCY, 1, detect_inactive_hosts, (scan_hosts, ))
    try:
        ans, unans = sr(IP(dst=scan_hosts)/ICMP(), retry=0, timeout=1)
    except KeyboardInterrupt:
        # exit() comes from the site module and may be missing; SystemExit
        # is always available and does the same job.
        raise SystemExit(0)

    # The original used a tuple-parameter lambda (``lambda(s, r): ...``) and
    # print statements, both of which are syntax errors on Python 3. Walking
    # the (sent, received) pairs and calling print() with a single
    # pre-formatted string behaves the same on Python 2 and 3.
    for _sent, received in ans:
        print(received.sprintf("%IP.src% is alive"))

    # Unanswered probes are the sent packets; .dst is the host that stayed
    # silent.
    inactive_hosts = [probe.dst for probe in unans]
    for host in inactive_hosts:
        print("%s is inactive" % host)
    print("Total %d hosts are inactive" % len(inactive_hosts))
def heartbeat_call(self, event):
    """Ping every WiFiTrilat server returned by ``self.client.discover()``.

    Each discovered entry is reduced to a host by dropping its first
    character and everything from the first ``'/WiFi'`` onwards; a
    ``ping -c 4`` command for that host is then handed to ``cli.execute``.
    ``event`` is not used.
    """
    # NOTE(review): str.find() returns -1 when '/WiFi' is absent, in which
    # case the slice silently drops the last character instead - confirm
    # discover() always yields entries containing that marker.
    hosts = [entry[1:entry.find('/WiFi')] for entry in self.client.discover()]

    # An ICMP probe through scapy's sr() on self.interface was tried here
    # before and left disabled; the system ping command is used instead.
    for host in hosts:
        # NOTE(review): host is interpolated into a shell command line
        # (shellexec=True) - verify discover() output is trusted.
        ping_command = 'ping -c 4 %s' % host
        cli.execute(command=ping_command, wait=False, shellexec=True)
def _parse_ports(spec):
    """Expand a port specification such as ``"22,80-82"`` into a list of ints.

    Raises ValueError when the specification contains anything other than
    digits, commas and dashes, has an empty or malformed item (``"1-"``,
    ``"1-2-3"``, ``","``), a reversed range, or a port outside 0-65535.
    """
    text = str(spec)
    if not re.match(r'^[0-9,-]+$', text):
        raise ValueError("invalid characters in port specification")

    ports = []
    for item in text.split(','):
        first, dash, last = item.partition('-')
        # int() raises ValueError on empty or malformed pieces.
        low = int(first)
        high = int(last) if dash else low
        if not 0 <= low <= high <= 65535:
            raise ValueError("port range out of bounds: %s" % item)
        ports.extend(range(low, high + 1))
    return ports


def cmd_tcpscan(ip, port, iface, flags, sleeptime, timeout, show_all, verbose):
    """Send TCP probes to ``ip`` and print the flags of each reply.

    Args:
        ip: Destination handed to scapy's ``IP(dst=...)``.
        port: Port specification: comma-separated ports and/or inclusive
            ``first-last`` ranges, e.g. ``"22,80-82"``. Converted with
            ``str()`` first, so a bare int is accepted too.
        iface: Interface name; when truthy it is stored in ``conf.iface``.
        flags: TCP flags for the probes, passed to ``TCP(flags=...)``.
        sleeptime: When truthy, probes are sent one at a time with
            ``sr1()`` and each probe/reply is logged at INFO level;
            otherwise all probes are sent in one ``sr()`` call.
            NOTE(review): no delay is actually inserted between probes -
            confirm whether one was intended.
        timeout: Seconds to wait for replies, forwarded to ``sr()`` /
            ``sr1()``. ``None`` keeps scapy's default of no timeout.
        show_all: When truthy, print every reply; otherwise only replies
            whose TCP flags contain ``S``.
        verbose: When truthy, configures INFO-level logging and enables
            ``sr()`` progress output.

    Returns:
        ``False`` if the port specification is invalid, otherwise ``None``.
    """
    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    # Validate the input before touching scapy's global configuration, so
    # a bad specification is rejected without side effects. Malformed specs
    # that used to slip past the character check and crash with ValueError
    # are now reported the same way as any other invalid specification.
    try:
        ports = _parse_ports(port)
    except ValueError:
        logging.critical("Invalid port specification")
        return False

    conf.verb = False

    if iface:
        conf.iface = iface

    out = "{port} {sflags} -> {rflags}"

    pkts = IP(dst=ip)/TCP(flags=flags, dport=ports)

    if sleeptime:
        res = []
        for pkt in pkts:
            logging.info(pkt.summary())
            # Forward the timeout so a filtered port cannot block forever.
            reply = sr1(pkt, timeout=timeout)
            if reply is not None:
                logging.info(reply.summary())
                res.append((pkt, reply))
    else:
        res, _unans = sr(pkts, timeout=timeout, verbose=verbose)

    for sent, received in res:
        rflags = received.sprintf(r"%TCP.flags%")
        if show_all or 'S' in rflags:
            print(out.format(
                port=sent[TCP].dport,
                sflags=sent.sprintf(r"%TCP.flags%"),
                rflags=rflags,
            ))