我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用socket.IPPROTO_ESP。
def _create_sa(self, src_selector, dst_selector, src_port, dst_port, spi,
               ip_proto, ipsec_proto, mode, src, dst, enc_algorith, sk_e,
               auth_algorithm, sk_a):
    """Describe one XFRM security association and submit it with ``send_recv``.

    Both the traffic selector and the SA itself are built for
    ``socket.AF_INET`` and the lifetime is ``XfrmLifetimeCfg.infinite()``.

    Args:
        src_selector: Source selector; element ``[0]`` supplies the address
            and ``.prefixlen`` the prefix length.
        dst_selector: Destination selector, read the same way.
        src_port: Selector source port. ``0`` produces an all-zero port
            mask, any other value a ``0xFFFF`` mask.
        dst_port: Selector destination port, masked the same way.
        spi: SPI value, converted with ``create_byte_array``.
        ip_proto: Protocol number stored in the selector.
        ipsec_proto: Compared against ``Proposal.Protocol.ESP``; any other
            value selects ``socket.IPPROTO_AH``.
        mode: Passed through unchanged to ``XfrmUserSaInfo``.
        src: SA source address, converted with ``XfrmAddress.from_ipaddr``.
        dst: SA destination address, converted the same way.
        enc_algorith: Key into ``self._cipher_names``; only read for ESP.
        sk_e: Encryption key; only used for ESP.
        auth_algorithm: Key into ``self._auth_names``.
        sk_a: Authentication key.
    """
    # Traffic selector: which flows this SA applies to.
    selector = XfrmSelector(
        family=socket.AF_INET,
        daddr=XfrmAddress.from_ipaddr(dst_selector[0]),
        saddr=XfrmAddress.from_ipaddr(src_selector[0]),
        dport=dst_port,
        sport=src_port,
        dport_mask=0xFFFF if dst_port != 0 else 0,
        sport_mask=0xFFFF if src_port != 0 else 0,
        prefixlen_d=dst_selector.prefixlen,
        prefixlen_s=src_selector.prefixlen,
        proto=ip_proto,
    )

    wants_esp = ipsec_proto == Proposal.Protocol.ESP
    sa_id = XfrmId(
        daddr=XfrmAddress.from_ipaddr(dst),
        proto=socket.IPPROTO_ESP if wants_esp else socket.IPPROTO_AH,
        spi=create_byte_array(spi),
    )

    sa_info = XfrmUserSaInfo(
        sel=selector,
        id=sa_id,
        family=socket.AF_INET,
        saddr=XfrmAddress.from_ipaddr(src),
        mode=mode,
        lft=XfrmLifetimeCfg.infinite(),
    )

    # The cipher attribute is inserted first and only for ESP.
    # NOTE(review): the source was flattened onto one line; the auth
    # attribute is taken to apply to ESP and AH alike -- confirm.
    algo_attrs = {}
    if wants_esp:
        cipher_name = self._cipher_names[enc_algorith]
        algo_attrs[XFRMA_ALG_CRYPT] = XfrmAlgo.build(alg_name=cipher_name, key=sk_e)
    auth_name = self._auth_names[auth_algorithm]
    algo_attrs[XFRMA_ALG_AUTH] = XfrmAlgo.build(alg_name=auth_name, key=sk_a)

    self.send_recv(XFRM_MSG_NEWSA, (NLM_F_REQUEST | NLM_F_ACK), sa_info, algo_attrs)
def _create_policy(self, src_selector, dst_selector, src_port, dst_port,
                   ip_proto, direction, ipsec_proto, mode, src, dst, index=0):
    """Describe one XFRM policy plus its template and submit it with ``send_recv``.

    The policy action is ``XFRM_POLICY_ALLOW``, the lifetime is
    ``XfrmLifetimeCfg.infinite()`` and every address family is
    ``socket.AF_INET``.

    Args:
        src_selector: Source selector; element ``[0]`` supplies the address
            and ``.prefixlen`` the prefix length.
        dst_selector: Destination selector, read the same way.
        src_port: Selector source port. ``0`` produces an all-zero port
            mask, any other value a ``0xFFFF`` mask.
        dst_port: Selector destination port, masked the same way.
        ip_proto: Protocol number stored in the selector.
        direction: Stored as the policy ``dir``.
        ipsec_proto: Compared against ``Proposal.Protocol.ESP``; any other
            value selects ``socket.IPPROTO_AH`` for the template.
        mode: Passed through unchanged to the template.
        src: Template source address, converted with
            ``XfrmAddress.from_ipaddr``.
        dst: Template destination address, converted the same way.
        index: Policy index, ``0`` by default.
    """
    # Traffic selector: which flows the policy matches.
    selector = XfrmSelector(
        family=socket.AF_INET,
        daddr=XfrmAddress.from_ipaddr(dst_selector[0]),
        saddr=XfrmAddress.from_ipaddr(src_selector[0]),
        dport=dst_port,
        sport=src_port,
        dport_mask=0xFFFF if dst_port != 0 else 0,
        sport_mask=0xFFFF if src_port != 0 else 0,
        prefixlen_d=dst_selector.prefixlen,
        prefixlen_s=src_selector.prefixlen,
        proto=ip_proto,
    )
    policy_info = XfrmUserPolicyInfo(
        sel=selector,
        dir=direction,
        index=index,
        action=XFRM_POLICY_ALLOW,
        lft=XfrmLifetimeCfg.infinite(),
    )

    if ipsec_proto == Proposal.Protocol.ESP:
        transform_proto = socket.IPPROTO_ESP
    else:
        transform_proto = socket.IPPROTO_AH
    tmpl_id = XfrmId(daddr=XfrmAddress.from_ipaddr(dst), proto=transform_proto)

    # All three algorithm masks have every bit set (presumably "accept any
    # algorithm" -- confirm against the XFRM template definition).
    tmpl = XfrmUserTmpl(
        id=tmpl_id,
        family=socket.AF_INET,
        saddr=XfrmAddress.from_ipaddr(src),
        aalgos=0xFFFFFFFF,
        ealgos=0xFFFFFFFF,
        calgos=0xFFFFFFFF,
        mode=mode,
    )

    self.send_recv(XFRM_MSG_NEWPOLICY, (NLM_F_REQUEST | NLM_F_ACK), policy_info,
                   {XFRMA_TMPL: tmpl})
def delete_sa(self, daddr, proto, spi):
    """Request deletion of the SA identified by destination, protocol and SPI.

    A ``NetlinkError`` raised by ``send_recv`` is logged and swallowed, so
    the caller does not see it.

    Args:
        daddr: Destination address, converted with
            ``XfrmAddress.from_ipaddr``.
        proto: Compared against ``Proposal.Protocol.ESP``; any other value
            selects ``socket.IPPROTO_AH``.
        spi: SPI value, converted with ``create_byte_array``.
    """
    target = XfrmAddress.from_ipaddr(daddr)
    if proto == Proposal.Protocol.ESP:
        wire_proto = socket.IPPROTO_ESP
    else:
        wire_proto = socket.IPPROTO_AH
    sa_ref = XfrmUserSaId(
        daddr=target,
        family=socket.AF_INET,
        proto=wire_proto,
        spi=create_byte_array(spi),
    )

    try:
        self.send_recv(XFRM_MSG_DELSA, (NLM_F_REQUEST | NLM_F_ACK), sa_ref)
    except NetlinkError as ex:
        # Best effort: report the failure and carry on.
        message = 'Could not delete IPsec SA with SPI: {}. {}'.format(hexstring(spi), ex)
        logging.error(message)
def _encrypt_esp(self, pkt, seq_num=None, iv=None):
    """Wrap ``pkt`` in an encrypted and signed ESP payload.

    Args:
        pkt: Packet to protect.
        seq_num: Sequence number to put in the ESP header. When falsy,
            ``self.seq_num`` is used instead; the stored counter is
            advanced only when this argument is ``None``.
        iv: Initialisation vector. When ``None`` one is obtained from
            ``self.crypt_algo.generate_iv()``.

    Returns:
        The (possibly rebuilt) IP header stacked on the ESP payload.

    Raises:
        TypeError: If ``iv`` is supplied and its length differs from
            ``self.crypt_algo.iv_size``.
    """
    if iv is None:
        iv = self.crypt_algo.generate_iv()
    else:
        if len(iv) != self.crypt_algo.iv_size:
            raise TypeError('iv length must be %s' % self.crypt_algo.iv_size)

    esp = _ESPPlain(spi=self.spi, seq=seq_num or self.seq_num, iv=iv)

    if self.tunnel_header:
        tunnel = self.tunnel_header.copy()

        # Drop the fields that depend on the payload (presumably so they
        # are recomputed when the packet is rebuilt below -- confirm).
        if tunnel.version == 4:
            del tunnel.proto
            del tunnel.len
            del tunnel.chksum
        else:
            del tunnel.nh
            del tunnel.plen

        # bytes(), not str(): on Python 3 str() produces text rather than
        # the packet's wire bytes, so re-parsing it would corrupt the
        # packet. On Python 2 bytes is str, so nothing changes there.
        pkt = tunnel.__class__(bytes(tunnel / pkt))

    ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_ESP)
    esp.data = payload
    esp.nh = nh

    esp = self.crypt_algo.pad(esp)
    esp = self.crypt_algo.encrypt(esp, self.crypt_key)

    # The return value of sign() is not captured (presumably it updates
    # ``esp`` in place -- confirm).
    self.auth_algo.sign(esp, self.auth_key)

    if self.nat_t_header:
        nat_t_header = self.nat_t_header.copy()
        nat_t_header.chksum = 0
        del nat_t_header.len
        if ip_header.version == 4:
            del ip_header.proto
        else:
            del ip_header.nh
        ip_header /= nat_t_header

    if ip_header.version == 4:
        ip_header.len = len(ip_header) + len(esp)
        del ip_header.chksum
        # Same str() -> bytes() fix as for the tunnel header above.
        ip_header = ip_header.__class__(bytes(ip_header))
    else:
        ip_header.plen = len(ip_header.payload) + len(esp)

    # sequence number must always change, unless specified by the user
    if seq_num is None:
        self.seq_num += 1

    return ip_header / esp
def _encrypt_esp(self, pkt, seq_num=None, iv=None):
    """Wrap ``pkt`` in an encrypted and signed ESP payload (salted-IV variant).

    Args:
        pkt: Packet to protect.
        seq_num: Sequence number to put in the ESP header. When falsy,
            ``self.seq_num`` is used instead; the stored counter is
            advanced only when this argument is ``None``.
        iv: Initialisation vector. When ``None`` one is generated and, if
            ``self.crypt_salt`` is truthy, the salt is prepended to it.

    Returns:
        The (possibly rebuilt) IP header stacked on the ESP payload.

    Raises:
        TypeError: If ``iv`` is supplied and its length differs from
            ``self.crypt_algo.iv_size``.
    """
    cipher = self.crypt_algo

    # NOTE(review): the source was flattened onto one line; the salt is
    # taken to apply only to a generated IV, and the length check only to
    # a caller-supplied one -- confirm.
    if iv is None:
        iv = cipher.generate_iv()
        if self.crypt_salt:
            iv = self.crypt_salt + iv
    elif len(iv) != cipher.iv_size:
        raise TypeError('iv length must be %s' % cipher.iv_size)

    sequence = seq_num or self.seq_num
    esp = _ESPPlain(spi=self.spi, seq=sequence, iv=iv)

    if self.tunnel_header:
        outer = self.tunnel_header.copy()
        # Drop the payload-dependent fields (presumably so they are
        # recomputed when the packet is rebuilt -- confirm).
        if outer.version == 4:
            stale_fields = ('proto', 'len', 'chksum')
        else:
            stale_fields = ('nh', 'plen')
        for field_name in stale_fields:
            delattr(outer, field_name)
        stacked = outer / pkt
        pkt = outer.__class__(bytes(stacked))

    ip_header, next_header, body = split_for_transport(pkt, socket.IPPROTO_ESP)
    esp.data = body
    esp.nh = next_header

    esp = cipher.encrypt(cipher.pad(esp), self.crypt_key)

    # The return value of sign() is not captured (presumably it updates
    # ``esp`` in place -- confirm).
    self.auth_algo.sign(esp, self.auth_key)

    if self.nat_t_header:
        encap = self.nat_t_header.copy()
        encap.chksum = 0
        del encap.len
        delattr(ip_header, 'proto' if ip_header.version == 4 else 'nh')
        ip_header /= encap

    if ip_header.version == 4:
        ip_header.len = len(ip_header) + len(esp)
        del ip_header.chksum
        ip_header = ip_header.__class__(bytes(ip_header))
    else:
        ip_header.plen = len(ip_header.payload) + len(esp)

    # The stored counter moves on only when the caller did not choose the
    # sequence number.
    if seq_num is None:
        self.seq_num += 1

    return ip_header / esp
def _encrypt_esp(self, pkt, seq_num=None, iv=None):
    """Wrap ``pkt`` in an encrypted and signed ESP payload.

    Args:
        pkt: Packet to protect.
        seq_num: Sequence number to put in the ESP header. When falsy,
            ``self.seq_num`` is used instead; the stored counter is
            advanced only when this argument is ``None``.
        iv: Initialisation vector. When ``None`` one is obtained from
            ``self.crypt_algo.generate_iv()``.

    Returns:
        The (possibly rebuilt) IP header stacked on the ESP payload.

    Raises:
        TypeError: If ``iv`` is supplied and its length differs from
            ``self.crypt_algo.iv_size``.
    """
    cipher = self.crypt_algo

    if iv is None:
        iv = cipher.generate_iv()
    elif len(iv) != cipher.iv_size:
        raise TypeError('iv length must be %s' % cipher.iv_size)

    sequence = seq_num or self.seq_num
    esp = _ESPPlain(spi=self.spi, seq=sequence, iv=iv)

    if self.tunnel_header:
        outer = self.tunnel_header.copy()
        # Drop the payload-dependent fields (presumably so they are
        # recomputed when the packet is rebuilt -- confirm).
        if outer.version == 4:
            stale_fields = ('proto', 'len', 'chksum')
        else:
            stale_fields = ('nh', 'plen')
        for field_name in stale_fields:
            delattr(outer, field_name)
        stacked = outer / pkt
        pkt = outer.__class__(bytes(stacked))

    ip_header, next_header, body = split_for_transport(pkt, socket.IPPROTO_ESP)
    esp.data = body
    esp.nh = next_header

    esp = cipher.encrypt(cipher.pad(esp), self.crypt_key)

    # The return value of sign() is not captured (presumably it updates
    # ``esp`` in place -- confirm).
    self.auth_algo.sign(esp, self.auth_key)

    if self.nat_t_header:
        encap = self.nat_t_header.copy()
        encap.chksum = 0
        del encap.len
        delattr(ip_header, 'proto' if ip_header.version == 4 else 'nh')
        ip_header /= encap

    if ip_header.version == 4:
        ip_header.len = len(ip_header) + len(esp)
        del ip_header.chksum
        ip_header = ip_header.__class__(bytes(ip_header))
    else:
        ip_header.plen = len(ip_header.payload) + len(esp)

    # The stored counter moves on only when the caller did not choose the
    # sequence number.
    if seq_num is None:
        self.seq_num += 1

    return ip_header / esp