我们从Python开源项目中，提取了以下34个代码示例，用于说明如何使用socket.IPPROTO_ICMP。
def serve(self):
    """Run this module in server mode on a raw ICMP socket.

    Announces the module name and the configured "Global"/"serverbind"
    value, opens an AF_INET raw ICMP socket, stores it on
    ``self.comms_socket`` and then hands over to
    ``communication_initialization()`` and ``communication(False)``.

    ``self.cleanup()`` is called both when the communication call returns
    normally and when a KeyboardInterrupt is raised; the interrupt is not
    propagated. Any other exception propagates without cleanup.

    Opening a raw socket normally requires elevated privileges.
    """
    try:
        module_name = self.get_module_name()
        bind_ip = self.config.get("Global", "serverbind")
        common.internal_print("Starting server: {0} on {1}".format(module_name, bind_ip))

        icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
        # Built but not used further in this method; kept so the same
        # config / attribute lookups (and their possible errors) still occur.
        bind_target = (self.config.get("Global", "serverbind"), self.ICMP_fake_serverport)

        self.comms_socket = icmp_socket
        self.serverorclient = 1  # presumably 1 marks the server role -- confirm
        self.authenticated = False

        self.communication_initialization()
        self.communication(False)
    except KeyboardInterrupt:
        # An interrupt ends up on the same path as a normal return:
        # clean up below and return None.
        pass

    self.cleanup()
    return
def answers(self, other):
    """Decide whether this IP packet is an answer to ``other``.

    Returns 0 when ``other`` is not an IP instance, when address checking
    (``conf.checkIPaddr``) is on and the addresses do not mirror each
    other, or when the protocol numbers differ. Otherwise the decision is
    delegated to the payload's own ``answers()``.

    For ICMP payloads of type 3, 4, 5, 11 or 12 (the ICMP error messages:
    destination unreachable, source quench, redirect, time exceeded,
    parameter problem per RFC 792) the packet carried inside the ICMP
    message is compared against ``other`` itself.
    """
    if not isinstance(other, IP):
        return 0
    if conf.checkIPaddr and self.dst != other.src:
        return 0

    is_icmp_error = (
        self.proto == socket.IPPROTO_ICMP
        and isinstance(self.payload, ICMP)
        and self.payload.type in [3, 4, 5, 11, 12]
    )
    if is_icmp_error:
        # ICMP error message: match the embedded packet against the request.
        return self.payload.payload.answers(other)

    if conf.checkIPaddr and self.src != other.dst:
        return 0
    if self.proto != other.proto:
        return 0
    return self.payload.answers(other.payload)
def init_func(self, creator_fd):
    """Create the non-blocking raw IPv4 socket and register it for reads.

    @param creator_fd  stored as-is on the instance; not otherwise used here
    @return            ``self.fileno`` after the socket has been registered

    The socket is opened with IP_HDRINCL enabled, so callers are expected
    to supply their own IP headers when sending.
    """
    self.__creator_fd = creator_fd
    self.__sent = []

    # NOTE(review): the protocol numbers are OR-ed together, which produces
    # one integer (153 where IPPROTO_UDP == 17 and IPPROTO_ICMP == 1), not a
    # set of protocols. The original also OR-ed IPPROTO_UDP in twice, which
    # does not change the value. Confirm the intended protocol argument.
    protocol = socket.IPPROTO_UDP | socket.IPPROTO_ICMP | 136

    raw_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, protocol)
    raw_sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
    raw_sock.setblocking(False)

    self.__socket = raw_sock
    self.set_fileno(raw_sock.fileno())
    self.register(self.fileno)
    self.add_evt_read(self.fileno)

    return self.fileno
def answers(self, other):
    """Decide whether this IP packet is an answer to ``other``.

    Same contract as a plain IP answer check, with one extra rule: when
    address checking (``conf.checkIPaddr``) is on and both packets are
    addressed to 224.0.0.251 (mDNS), the payloads are compared directly
    without any further address or protocol checks.

    Returns 0 for a non-IP ``other``, for mismatching addresses (when
    address checking is on) or for mismatching protocol numbers; otherwise
    returns whatever the payload's ``answers()`` returns.
    """
    if not isinstance(other, IP):
        return 0

    if conf.checkIPaddr:
        both_mdns = other.dst == "224.0.0.251" and self.dst == "224.0.0.251"
        if both_mdns:
            # mDNS: the reply goes to the multicast group, not to the
            # requester, so src/dst cannot be expected to mirror.
            return self.payload.answers(other.payload)
        if self.dst != other.src:
            return 0

    is_icmp_error = (
        self.proto == socket.IPPROTO_ICMP
        and isinstance(self.payload, ICMP)
        and self.payload.type in [3, 4, 5, 11, 12]
    )
    if is_icmp_error:
        # ICMP error message: match the embedded packet against the request.
        return self.payload.payload.answers(other)

    if conf.checkIPaddr and self.src != other.dst:
        return 0
    if self.proto != other.proto:
        return 0
    return self.payload.answers(other.payload)
def client(self):
    """Run this module in client mode over a raw ICMP socket.

    Opens an AF_INET raw ICMP socket, records the remote endpoint
    ("Global"/"remoteserverip" plus ``self.ICMP_fake_serverport``) in
    ``self.server_tuple``, then runs ``communication_initialization()``,
    ``do_auth()`` and ``communication(False)``.

    On KeyboardInterrupt the method calls ``do_logoff()`` and
    ``cleanup()`` and re-raises. On a normal return ``cleanup()`` is
    called once. Other exceptions propagate without cleanup.
    """
    try:
        module_name = self.get_module_name()
        common.internal_print("Starting client: {0}".format(module_name))

        icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
        remote_ip = self.config.get("Global", "remoteserverip")
        self.server_tuple = (remote_ip, self.ICMP_fake_serverport)

        self.comms_socket = icmp_socket
        self.serverorclient = 0  # presumably 0 marks the client role -- confirm
        self.authenticated = False

        self.communication_initialization()
        self.do_auth()
        self.communication(False)
    except KeyboardInterrupt:
        # Log off before tearing down, then let the interrupt propagate.
        self.do_logoff()
        self.cleanup()
        raise

    self.cleanup()
    return
def check(self):
    """Probe the remote server for this module over a raw ICMP socket.

    Sets up the socket and ``self.server_tuple`` like the client path,
    marks the instance as already authenticated, then runs
    ``communication_initialization()``, ``do_check()`` and
    ``communication(True)``.

    A ``socket.timeout`` is reported through ``common.internal_print``
    (with -1 as second argument) and is not re-raised. A
    KeyboardInterrupt triggers ``cleanup()`` and is re-raised. In the
    normal and timeout cases ``cleanup()`` runs once before returning.
    """
    try:
        module_name = self.get_module_name()
        common.internal_print("Checking module on server: {0}".format(module_name))

        icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
        remote_ip = self.config.get("Global", "remoteserverip")
        self.server_tuple = (remote_ip, self.ICMP_fake_serverport)

        self.comms_socket = icmp_socket
        self.serverorclient = 0  # presumably 0 marks the client role -- confirm
        self.authenticated = True

        self.communication_initialization()
        self.do_check()
        self.communication(True)
    except KeyboardInterrupt:
        self.cleanup()
        raise
    except socket.timeout:
        failure = "Checking failed: {0}".format(self.get_module_name())
        common.internal_print(failure, -1)

    self.cleanup()
    return
def hashret(self):
    """Build the hash key for this IP packet.

    For ICMP payloads of type 3, 4, 5, 11 or 12 (ICMP error messages) the
    key of the packet embedded in the ICMP message is returned instead.

    Otherwise the key is the one-byte protocol number followed by the
    payload's own ``hashret()``; when both ``conf.checkIPsrc`` and
    ``conf.checkIPaddr`` are set it is prefixed with
    ``strxor(inet_aton(src), inet_aton(dst))``.
    """
    is_icmp_error = (
        self.proto == socket.IPPROTO_ICMP
        and isinstance(self.payload, ICMP)
        and self.payload.type in [3, 4, 5, 11, 12]
    )
    if is_icmp_error:
        return self.payload.payload.hashret()

    if conf.checkIPsrc and conf.checkIPaddr:
        # Combining src and dst through strxor gives the same prefix for
        # both directions of a flow, assuming strxor is a byte-wise XOR.
        addr_mix = strxor(inet_aton(self.src), inet_aton(self.dst))
        return addr_mix + struct.pack("B", self.proto) + self.payload.hashret()

    return struct.pack("B", self.proto) + self.payload.hashret()
def ping(addr, size=16):
    """Send one ICMP request to ``addr`` and wait for the matching reply.

    This is a generator-based coroutine: it delegates to ``recv`` with
    ``yield from`` and returns whatever ``recv`` produces.

    :param addr: destination passed straight through to ``send``.
    :param size: size argument passed straight through to ``send``.
    :returns: the value produced by ``recv(sock, ping_id)``.

    The raw socket is closed when the coroutine finishes, fails or is
    discarded; previously it was never closed explicitly.
    """
    # Open a raw socket to send the request. NB: you need to be root to have
    # permission to open a raw socket. The context manager guarantees the
    # descriptor is released on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) as sock:
        # Generate a positive, random 16-bit integer to serve as the id for
        # this ping (named ping_id so the builtin ``id`` is not shadowed).
        ping_id = random.randint(1, 2**16 - 1)

        # Send the message and wait for the reply
        send(sock, addr, ping_id, size)
        duration = yield from recv(sock, ping_id)

    return duration
def parse(self):
    """
    Update the headers in self.match based on self.data

    Parses the relevant header features out of the packet, using
    the table outlined in the OF1.1 spec, Figure 4

    Side effects: sets self.bytes, resets the match masks and records
    self.ip_header_offset / self.tcp_header_offset as headers are found.

    @return self.match on success, None if a parse_error was raised
    """
    self.bytes = len(self.data)

    self.match.in_port = self.in_port
    self.match.type = ofp.OFPMT_STANDARD
    self.match.length = ofp.OFPMT_STANDARD_LENGTH
    self.match.wildcards = 0
    # Reset both network-address masks. nw_dst_mask used to be assigned
    # twice, leaving nw_src_mask untouched, unlike the dl_src/dl_dst pair
    # below.
    self.match.nw_src_mask = 0
    self.match.nw_dst_mask = 0
    self.match.dl_dst_mask = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
    self.match.dl_src_mask = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00]

    self.mpls_tag_offset = None
    self.ip_header_offset = None
    # NOTE(review): tcp_header_offset is not reset here, unlike the two
    # offsets above -- confirm it is initialised elsewhere.

    idx = 0
    try:
        idx = self._parse_l2(idx)

        if self.match.dl_type == ETHERTYPE_IP:
            self.ip_header_offset = idx
            idx = self._parse_ip(idx)
            if self.match.nw_proto in [socket.IPPROTO_TCP,
                                       socket.IPPROTO_UDP,
                                       socket.IPPROTO_ICMP]:
                # For ICMP this offset points at the ICMP header; the
                # attribute name is shared by all three protocols.
                self.tcp_header_offset = idx
                if self.match.nw_proto != socket.IPPROTO_ICMP:
                    idx = self._parse_l4(idx)
                else:
                    idx = self._parse_icmp(idx)
        elif self.match.dl_type == ETHERTYPE_ARP:
            self._parse_arp(idx)
    except parse_error as e:
        # "as" form is accepted by Python 2.6+ and Python 3 alike.
        self.logger.warn("Giving up on parsing packet, got %s" %
                         (str(e)))
        return None
    return self.match
def set_tp_src(self, tp_src):
    """
    Write tp_src into the packet at the transport header offset

    Does nothing when self.tcp_header_offset is None. For TCP and UDP the
    value is written with _set_2bytes at the header offset; for ICMP it is
    written with _set_1bytes at the same offset. Afterwards the L4
    checksum is refreshed and self.match.tp_src is updated.

    @param tp_src  Value to store (source port for TCP/UDP; presumably
                   the ICMP type for ICMP -- confirm against _parse_icmp)
    """
    offset = self.tcp_header_offset
    if offset is None:
        return

    proto = self.match.nw_proto
    if proto in (socket.IPPROTO_TCP, socket.IPPROTO_UDP):
        self._set_2bytes(offset, tp_src)
    elif proto == socket.IPPROTO_ICMP:
        self._set_1bytes(offset, tp_src)

    self._update_l4_checksum()
    self.match.tp_src = tp_src
def set_tp_dst(self, tp_dst):
    """
    Write tp_dst into the packet just after the tp_src field

    Does nothing when self.tcp_header_offset is None. For TCP and UDP the
    value is written with _set_2bytes at header offset + 2; for ICMP it is
    written with _set_1bytes at header offset + 1. Afterwards the L4
    checksum is refreshed and self.match.tp_dst is updated.

    @param tp_dst  Value to store (destination port for TCP/UDP;
                   presumably the ICMP code for ICMP -- confirm against
                   _parse_icmp)
    """
    offset = self.tcp_header_offset
    if offset is None:
        return

    proto = self.match.nw_proto
    if proto in (socket.IPPROTO_TCP, socket.IPPROTO_UDP):
        self._set_2bytes(offset + 2, tp_dst)
    elif proto == socket.IPPROTO_ICMP:
        self._set_1bytes(offset + 1, tp_dst)

    self._update_l4_checksum()
    self.match.tp_dst = tp_dst
def hashret(self):
    """Build the hash key for this IP packet.

    For ICMP payloads of type 3, 4, 5, 11 or 12 (ICMP error messages) the
    key of the packet embedded in the ICMP message is returned instead.

    Packets addressed to 224.0.0.251 (mDNS) never include the addresses in
    the key. For everything else the key is the one-byte protocol number
    plus the payload's ``hashret()``, prefixed with
    ``strxor(inet_aton(src), inet_aton(dst))`` when both
    ``conf.checkIPsrc`` and ``conf.checkIPaddr`` are set.
    """
    is_icmp_error = (
        self.proto == socket.IPPROTO_ICMP
        and isinstance(self.payload, ICMP)
        and self.payload.type in [3, 4, 5, 11, 12]
    )
    if is_icmp_error:
        return self.payload.payload.hashret()

    if self.dst == "224.0.0.251":
        # mDNS: key on protocol and payload only, addresses are left out.
        return struct.pack("B", self.proto) + self.payload.hashret()

    if conf.checkIPsrc and conf.checkIPaddr:
        addr_mix = strxor(inet_aton(self.src), inet_aton(self.dst))
        return addr_mix + struct.pack("B", self.proto) + self.payload.hashret()

    return struct.pack("B", self.proto) + self.payload.hashret()
def createInternetSocket(self):
    """Create a non-blocking raw ICMP socket with close-on-exec set.

    Returns the new AF_INET / SOCK_RAW / IPPROTO_ICMP socket object.
    Opening a raw socket normally requires elevated privileges.
    """
    icmp_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    icmp_sock.setblocking(False)

    # Set close-on-exec so the descriptor is not inherited across exec().
    descriptor = icmp_sock.fileno()
    current_flags = fcntl.fcntl(descriptor, fcntl.F_GETFD)
    fcntl.fcntl(descriptor, fcntl.F_SETFD, current_flags | fcntl.FD_CLOEXEC)

    return icmp_sock
def simple_icmp_packet(self,
                       pktlen=100,
                       dl_dst='00:01:02:03:04:05',
                       dl_src='00:06:07:08:09:0a',
                       dl_vlan_enable=False,
                       dl_vlan_type=ETHERTYPE_VLAN,
                       dl_vlan=0,
                       dl_vlan_pcp=0,
                       dl_vlan_cfi=0,
                       mpls_type=None,
                       mpls_tags=None,
                       ip_src='192.168.0.1',
                       ip_dst='192.168.0.2',
                       ip_tos=0,
                       ip_ttl=64,
                       icmp_type=8,  # ICMP_ECHO_REQUEST
                       icmp_code=0,
                       ):
    """
    Return a simple dataplane ICMP packet

    Supports a few parameters:
    @param pktlen Length of packet in bytes w/o CRC
    @param dl_dst Destination MAC
    @param dl_src Source MAC
    @param dl_vlan_enable True if the packet is with vlan, False otherwise
    @param dl_vlan_type Ether type for VLAN
    @param dl_vlan VLAN ID
    @param dl_vlan_pcp VLAN priority
    @param dl_vlan_cfi VLAN CFI value, passed through to _make_ip_packet
    @param mpls_type MPLS ether type, passed through to _make_ip_packet
    @param mpls_tags MPLS tags, passed through to _make_ip_packet
    @param ip_src IP source
    @param ip_dst IP destination
    @param ip_tos IP ToS
    @param ip_ttl IP TTL
    @param icmp_type ICMP type (default 8, echo request)
    @param icmp_code ICMP code

    Rebuilds self.data from scratch: the Ethernet/IP part comes from
    _make_ip_packet, followed by an 8-byte ICMP header and "D" padding up
    to pktlen (no truncation if the headers are already longer).
    The ICMP checksum field is written as 0 and is not computed here.
    Increments the class-level Packet.icmp_counter used as the echo
    sequence number.

    @return self
    """
    self.data = ""
    self._make_ip_packet(dl_dst, dl_src, dl_vlan_enable, dl_vlan_type,
                         dl_vlan, dl_vlan_pcp, dl_vlan_cfi,
                         mpls_type, mpls_tags, ip_tos,
                         ip_ttl, ip_src, ip_dst, socket.IPPROTO_ICMP)

    # Add ICMP header
    icmp_header = struct.pack("!BBHHH",
                              icmp_type,
                              icmp_code,
                              0,                     # icmp.checksum
                              os.getpid() & 0xffff,  # icmp.echo.id
                              Packet.icmp_counter    # icmp.echo.seq
                              )
    self.data += icmp_header
    Packet.icmp_counter += 1

    # Fill out packet
    pad_len = pktlen - len(self.data)
    self.data += "D" * pad_len

    return self