我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.htonl()。
def netmask(self, interface_name, netmask):
    """
    Apply a new IPv4 netmask to a network interface.

    The interface name is looked up in ``Get().interfaces`` and the netmask
    is validated with ``validators.ipv4``; only then is the mask written
    with the ``SIOCSIFNETMASK`` ioctl on ``self.sock``.

    interface_name = interface the netmask is applied to
    netmask = new netmask, as a dotted IPv4 string

    Raises WrongInterfaceName if the interface is not listed, and
    NotValidIPv4Address if the netmask does not validate.
    """
    known_interfaces = Get().interfaces
    is_ipv4 = validators.ipv4(netmask)
    if interface_name not in known_interfaces:
        raise WrongInterfaceName("Wrong Interface Name %s" % interface_name)
    if is_ipv4 is not True:
        raise NotValidIPv4Address("Not Valid IPv4 Address %s" % netmask)

    # get_net_size() presumably returns the prefix length (number of leading
    # one bits) for the dotted mask -- TODO confirm against its definition.
    prefix_len = self.get_net_size(netmask.split('.'))
    ifname = interface_name.encode(encoding='UTF-8')

    # Rebuild the mask as an unsigned 32-bit value: clear the host bits.
    host_bits = (2 ** (32 - prefix_len)) - 1
    mask_value = ctypes.c_uint32(~host_bits).value

    # Pack the request: 16-byte name, address family, 2 padding bytes,
    # the mask after htonl(), and 8 trailing zero bytes.
    ifreq = struct.pack(
        '16sH2sI8s',
        ifname,
        AF_INET,
        b'\x00' * 2,
        socket.htonl(mask_value),
        b'\x00' * 8,
    )
    fcntl.ioctl(self.sock, SIOCSIFNETMASK, ifreq)
def encrypt(self,text,appid):
    """Encrypt a plaintext message.

    @param text: the plaintext to encrypt
    @param appid: identifier appended after the plaintext
    @return: a ``(status, data)`` pair -- ``(ierror.WXBizMsgCrypt_OK,
             base64 ciphertext)`` on success, or
             ``(ierror.WXBizMsgCrypt_EncryptAES_Error, None)`` if the AES
             step raises
    """
    # Frame the message: random prefix, then the text length (converted with
    # htonl and packed as a native unsigned int), then the text and appid.
    text = self.get_random_str() + struct.pack("I",socket.htonl(len(text))) + text + appid
    # Pad the framed message with the project's PKCS7 encoder.
    pkcs7 = PKCS7Encoder()
    text = pkcs7.encode(text)
    # Encrypt; the first 16 bytes of the key are passed as the third
    # argument of AES.new (presumably the IV -- confirm for the AES library
    # in use).
    cryptor = AES.new(self.key,self.mode,self.key[:16])
    try:
        ciphertext = cryptor.encrypt(text)
        # Base64-encode the ciphertext before returning it.
        return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
    except Exception:
        # ``except Exception, e`` is a syntax error on Python 3 and the
        # bound name was never used, so the binding is dropped.
        return ierror.WXBizMsgCrypt_EncryptAES_Error,None
def encrypt(self, text, appid):
    """Encrypt a plaintext message.

    @param text: the plaintext to encrypt
    @param appid: identifier appended after the plaintext
    @return: the base64-encoded ciphertext
    @raise EncryptAESError: if the AES encryption step raises
    """
    # Convert to bytes BEFORE measuring: the length field must describe the
    # bytes that are actually embedded, not the number of characters of a
    # text string (these differ for non-ASCII input).
    # NOTE(review): assumes to_binary() returns bytes unchanged -- confirm.
    text = to_binary(text)
    # Frame the message: random prefix, then the length (converted with
    # htonl and packed as a native unsigned int), then the text and appid.
    text = self.get_random_str() + struct.pack("I", socket.htonl(len(text))) + text + appid
    # Pad the framed message with the project's PKCS7 encoder.
    pkcs7 = PKCS7Encoder()
    text = pkcs7.encode(text)
    # Encrypt; the first 16 bytes of the key are passed as the third
    # argument of AES.new (presumably the IV -- confirm for the AES library
    # in use).
    cryptor = AES.new(self.key, self.mode, self.key[:16])
    try:
        ciphertext = cryptor.encrypt(text)
        # Base64-encode the ciphertext before returning it.
        return base64.b64encode(ciphertext)
    except Exception as e:
        raise EncryptAESError(e)
def encrypt(self, text, appid):
    """Encrypt a plaintext message.

    @param text: the plaintext to encrypt
    @param appid: identifier appended after the plaintext
    @return: a ``(status, data)`` pair -- ``(WXBizMsgCrypt_OK, base64
             ciphertext)`` on success, or
             ``(WXBizMsgCrypt_EncryptAES_Error, None)`` if the AES step
             raises
    """
    # Convert to bytes BEFORE measuring: the length field must describe the
    # bytes that are actually embedded, not the number of characters of a
    # text string (these differ for non-ASCII input).
    # NOTE(review): assumes smart_bytes() returns bytes unchanged -- confirm.
    text = smart_bytes(text)
    # Length converted with htonl and packed as a native unsigned int.
    pack_str = struct.pack(b"I", socket.htonl(len(text)))
    # Frame the message: random prefix, length field, text, appid.
    text = smart_bytes(self.get_random_str()) + pack_str + text + smart_bytes(appid)
    # Pad the framed message with the project's PKCS7 encoder.
    pkcs7 = PKCS7Encoder()
    text = pkcs7.encode(text)
    # Encrypt; the first 16 bytes of the key are passed as the third
    # argument of AES.new (presumably the IV -- confirm for the AES library
    # in use).
    cryptor = AES.new(self.key, self.mode, self.key[:16])
    try:
        ciphertext = cryptor.encrypt(text)
        # Base64-encode the ciphertext before returning it.
        return WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
    except Exception:
        return WXBizMsgCrypt_EncryptAES_Error, None
def encrypt(self,text,corpid):
    """Encrypt a plaintext message.

    @param text: the plaintext to encrypt
    @param corpid: identifier appended after the plaintext
    @return: a ``(status, data)`` pair -- ``(ierror.WXBizMsgCrypt_OK,
             base64 ciphertext)`` on success, or
             ``(ierror.WXBizMsgCrypt_EncryptAES_Error, None)`` if the AES
             step raises
    """
    # Frame the message: random prefix, then the text length (converted with
    # htonl and packed as a native unsigned int), then the text and corpid.
    text = self.get_random_str() + struct.pack("I",socket.htonl(len(text))) + text + corpid
    # Pad the framed message with the project's PKCS7 encoder.
    pkcs7 = PKCS7Encoder()
    text = pkcs7.encode(text)
    # Encrypt; the first 16 bytes of the key are passed as the third
    # argument of AES.new (presumably the IV -- confirm for the AES library
    # in use).
    cryptor = AES.new(self.key,self.mode,self.key[:16])
    try:
        ciphertext = cryptor.encrypt(text)
        # Base64-encode the ciphertext before returning it.
        return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
    except Exception as e:
        # ``except ... as`` and ``print(...)`` with a single argument behave
        # the same on Python 2.6+ and Python 3; the old spellings are syntax
        # errors on Python 3.
        print(e)
        return ierror.WXBizMsgCrypt_EncryptAES_Error,None
def send(channel, *args):
    """Send ``args`` over ``channel`` as a length-prefixed message.

    The arguments are serialized with ``marshall``; the length of the
    result is sent first, followed by the serialized data itself.
    """
    payload = marshall(args)
    # NOTE(review): "L" is the platform's native unsigned long, so the size
    # of this prefix varies between platforms; the receiving side must
    # unpack it with the same format.
    header = struct.pack("L", htonl(len(payload)))
    channel.send(header)
    channel.send(payload)
def toString(self, noItemData=True):
    """Serialize the firmware into one string, refreshing the header fields.

    The output is the header, the extra header (when
    ``header.extraHeaderLength`` is truthy), every item's ``toString()``
    and, unless ``noItemData`` is true, every item's ``data``.

    Side effects: ``self.header.fileLength``, ``self.header.headerCrc`` and
    ``self.header.fileCrc`` are overwritten. Statement order matters,
    because ``self.header.toString()`` is called three times while those
    fields are being updated.

    noItemData -- when true (the default) item data is neither appended to
                  the result nor covered by the file CRC; ``item.size`` is
                  still added to ``fileLength`` either way.
    """
    # Start from the fixed part: header plus one item record per item.
    self.header.fileLength = (
        self.header._FORMAT.size
        + self.header.itemCount * HuaweiFirmwareItem._FORMAT.size
        - 0x4c  # FIXME: Can not find where does this bias come from.
    )
    strs = [
        self.header.toString()[20:],  # Partial header used for calculate CRC32 value.
    ]
    if self.header.extraHeaderLength:
        strs.append(self.extraHeader)
        self.header.fileLength += len(self.extraHeader)
    data = []
    for item in self.items:
        strs.append(item.toString())
        data.append(item.data)
        self.header.fileLength += item.size
    # Convert to big endian.
    self.header.fileLength = socket.htonl(self.header.fileLength)
    # Update header CRC32 value.
    self.header.headerCrc = seqCrc32(strs)
    if not noItemData:
        strs.extend(data)
    # All data are present, now update file CRC32 value.
    # The first chunk is re-serialized from offset 12 so that it reflects
    # the header CRC and file length just stored.
    strs[0] = self.header.toString()[12:]
    self.header.fileCrc = seqCrc32(strs)
    # Using the latest header with correct CRC32 value and file length.
    strs[0] = self.header.toString()
    return ''.join(strs)
def route_to_dbus(route, family):
    """Convert a ``(addr, netmask, gateway, metric)`` route to a 4-item list.

    The address and gateway go through ``fixups.addr_to_dbus`` (with
    ``family``), the netmask through ``fixups.mask_to_dbus``, and the
    metric through ``socket.htonl``.
    """
    addr, netmask, gateway, metric = route
    dbus_addr = fixups.addr_to_dbus(addr, family)
    dbus_mask = fixups.mask_to_dbus(netmask)
    dbus_gateway = fixups.addr_to_dbus(gateway, family)
    return [dbus_addr, dbus_mask, dbus_gateway, socket.htonl(metric)]


# Constants below are generated with makeconstants.py. Do not edit manually.
def send(channel, *args):
    """
    Send a message to a channel.

    ``args`` is pickled; the pickle's length is sent first (passed through
    ``socket.htonl`` and packed with the native "L" format), then the
    pickle itself.
    """
    payload = pickle.dumps(args)
    # NOTE(review): "L" is the platform's native unsigned long, so the size
    # of this prefix varies between platforms; the receiver must match it.
    header = struct.pack("L", socket.htonl(len(payload)))
    channel.send(header)
    channel.send(payload)
def send(channel, *args):
    """Pickle ``args`` and send them over ``channel`` with a length prefix.

    Two ``channel.send`` calls are made: first the packed length of the
    pickle, then the pickle itself.
    """
    pickled = pickle.dumps(args)
    length = socket.htonl(len(pickled))
    # Native "L" format: the prefix width depends on the platform.
    channel.send(struct.pack("L", length))
    channel.send(pickled)
def convert_integer():
    """Print 1234 after the 32-bit and the 16-bit byte-order conversions."""
    data = 1234

    # 32-bit
    long_host = socket.ntohl(data)
    long_network = socket.htonl(data)
    print("Original: %s => Long host byte order: %s, Network byte order: %s"
          % (data, long_host, long_network))

    # 16-bit
    short_host = socket.ntohs(data)
    short_network = socket.htons(data)
    print("Original: %s => Short host byte order: %s, Network byte order: %s"
          % (data, short_host, short_network))
def encrypt(self, xml):
    """Encrypt an XML message.

    @param xml: the message text; it is encoded to UTF-8 before framing
    @return: a ``(status, data)`` pair -- ``(Crypt_OK, base64 ciphertext as
             str)`` on success, or ``(Crypt_EncryptAES_Error, None)`` if
             encrypting or encoding raises
    """
    xml = xml.encode('utf-8')
    prefix = self.get_random_str()
    if not PY2:
        # Outside Python 2 the random prefix is encoded to bytes.
        prefix = prefix.encode('utf-8')
    # Length of the encoded XML, converted with htonl and packed as a
    # native unsigned int.
    length_field = struct.pack("I", socket.htonl(len(xml)))
    suffix = self.appid if PY2 else self.appid.encode('utf-8')
    text = prefix + length_field + xml + suffix

    # Pad the framed message with the project's PKCS7 encoder.
    text = PKCS7Encoder().encode(text)

    # Encrypt; the first 16 bytes of the key are passed as the third
    # argument of AES.new (presumably the IV -- confirm for the AES library
    # in use).
    cryptor = AES.new(self.key, self.mode, self.key[:16])
    try:
        ciphertext = cryptor.encrypt(text)
        # Base64-encode the ciphertext and return it as text.
        encoded = base64.b64encode(ciphertext).decode("utf8")
        return Crypt_OK, encoded
    except Exception:
        return Crypt_EncryptAES_Error, None
def handle_write(self):
    """Drain ``self.buffer``, sending each queued entry via ``self.sctp_send``.

    Each entry is a ``(payload, params)`` pair; ``params['ppid']``
    (default 0) is passed through ``socket.htonl`` and used as the ppid.
    """
    while self.buffer:
        payload, params = self.buffer.pop(0)
        info = sctp.sndrcvinfo()
        info.ppid = socket.htonl(params.get('ppid', 0))
        # Only the first 1460 bytes of the payload are sent; the flags are
        # those of the freshly created sndrcvinfo object.
        self.sctp_send(payload[:1460], ppid=info.ppid, flags=info.flags)
def testNtoH(self): # This just checks that htons etc. are their own inverse, # when looking at the lower 16 or 32 bits. sizes = {socket.htonl: 32, socket.ntohl: 32, socket.htons: 16, socket.ntohs: 16} for func, size in sizes.items(): mask = (1<<size) - 1 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): self.assertEqual(i & mask, func(func(i&mask)) & mask) swapped = func(mask) self.assertEqual(swapped & mask, mask) self.assertRaises(OverflowError, func, 1<<34)
def testNtoHErrors(self): good_values = [ 1, 2, 3, 1, 2, 3 ] bad_values = [ -1, -2, -3, -1, -2, -3 ] for k in good_values: socket.ntohl(k) socket.ntohs(k) socket.htonl(k) socket.htons(k) for k in bad_values: self.assertRaises((OverflowError, ValueError), socket.ntohl, k) self.assertRaises((OverflowError, ValueError), socket.ntohs, k) self.assertRaises((OverflowError, ValueError), socket.htonl, k) self.assertRaises((OverflowError, ValueError), socket.htons, k)
def send(channel,*args):
    """Send ``args`` over ``channel`` as a length-prefixed pickle.

    The packed pickle length goes out first, the pickle itself second.
    """
    data = pickle.dumps(args)
    # The length goes through htonl() and is packed with the native "L"
    # format, whose width depends on the platform.
    prefix = struct.pack("L", socket.htonl(len(data)))
    for chunk in (prefix, data):
        channel.send(chunk)
def testNtoH(self): # This just checks that htons etc. are their own inverse, # when looking at the lower 16 or 32 bits. sizes = {socket.htonl: 32, socket.ntohl: 32, socket.htons: 16, socket.ntohs: 16} for func, size in sizes.items(): mask = (1L<<size) - 1 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): self.assertEqual(i & mask, func(func(i&mask)) & mask) swapped = func(mask) self.assertEqual(swapped & mask, mask) self.assertRaises(OverflowError, func, 1L<<34)
def testNtoHErrors(self): good_values = [ 1, 2, 3, 1L, 2L, 3L ] bad_values = [ -1, -2, -3, -1L, -2L, -3L ] for k in good_values: socket.ntohl(k) socket.ntohs(k) socket.htonl(k) socket.htons(k) for k in bad_values: self.assertRaises(OverflowError, socket.ntohl, k) self.assertRaises(OverflowError, socket.ntohs, k) self.assertRaises(OverflowError, socket.htonl, k) self.assertRaises(OverflowError, socket.htons, k)
def num2ip(self, num):
    """Convert a decimal integer to a dotted IPv4 address string.

    Args:
        num: the address as an unsigned 32-bit integer

    Return:
        the dotted IP string, for example: 10.10.10.11

    Raise:
        nothing explicitly; errors from ``socket.htonl`` / ``struct.pack``
        propagate for values that do not fit in 32 bits
    """
    network_order = socket.htonl(num)
    packed = struct.pack('I', network_order)
    return socket.inet_ntoa(packed)
def testNtoHErrors(self): good_values = [ 1, 2, 3, 1, 2, 3 ] bad_values = [ -1, -2, -3, -1, -2, -3 ] for k in good_values: socket.ntohl(k) socket.ntohs(k) socket.htonl(k) socket.htons(k) for k in bad_values: self.assertRaises(OverflowError, socket.ntohl, k) self.assertRaises(OverflowError, socket.ntohs, k) self.assertRaises(OverflowError, socket.htonl, k) self.assertRaises(OverflowError, socket.htons, k)
def testNtoHErrors(self): good_values = [ 1, 2, 3, 1L, 2L, 3L ] bad_values = [ -1, -2, -3, -1L, -2L, -3L ] for k in good_values: socket.ntohl(k) socket.ntohs(k) socket.htonl(k) socket.htons(k) for k in bad_values: self.assertRaises((OverflowError, ValueError), socket.ntohl, k) self.assertRaises((OverflowError, ValueError), socket.ntohs, k) self.assertRaises((OverflowError, ValueError), socket.htonl, k) self.assertRaises((OverflowError, ValueError), socket.htons, k)
def int_to_ip(int_ip):
    """Return the dotted IPv4 string for an unsigned 32-bit integer address."""
    network_order = socket.htonl(int_ip)
    packed = struct.pack('I', network_order)
    return socket.inet_ntoa(packed)
def int_to_ip(self, int_ip):
    """Return the dotted IPv4 string for an unsigned 32-bit integer address.

    The integer is converted with ``socket.htonl``, packed as a native
    unsigned int and formatted with ``socket.inet_ntoa``.
    """
    raw_bytes = struct.pack('I', socket.htonl(int_ip))
    dotted = socket.inet_ntoa(raw_bytes)
    return dotted
def _encrypt(self, text, _id):
    """Frame, pad and encrypt ``text``; return the base64-encoded ciphertext.

    The framed message is: a random string, the length of the binary text
    (converted with htonl and packed as a native unsigned int), the text
    itself, and ``_id`` -- all converted with ``to_binary``.
    """
    text = to_binary(text)
    parts = [
        to_binary(self.get_random_string()),
        struct.pack(b'I', socket.htonl(len(text))),
        text,
        to_binary(_id),
    ]
    # Pad the framed message, then encrypt it with the instance's cipher.
    padded = PKCS7Encoder.encode(b''.join(parts))
    ciphertext = to_binary(self.cipher.encrypt(padded))
    return base64.b64encode(ciphertext)
def int_to_ipstr(int_ip):
    """Format an unsigned 32-bit integer as a dotted IPv4 string."""
    as_network_order = socket.htonl(int_ip)
    return socket.inet_ntoa(struct.pack('I', as_network_order))
def num2ip(arg, int_ip):
    """
    IP address and number conversion.

    arg == 'ip': ``int_ip`` is an integer and the dotted IP string is
    returned. Any other ``arg``: ``int_ip`` is a dotted IP string and its
    integer value is returned as a decimal string.
    """
    if arg != 'ip':
        # Dotted string -> number (returned as text).
        packed = socket.inet_aton(int_ip)
        (network_order,) = struct.unpack('I', packed)
        return str(socket.ntohl(network_order))

    # Number -> dotted string.
    return socket.inet_ntoa(struct.pack('I', socket.htonl(int_ip)))
def num2ip(arg, int_ip):
    """
    IP address and number conversion.

    With ``arg == 'ip'`` the integer ``int_ip`` becomes a dotted IP string;
    with any other ``arg`` the dotted string ``int_ip`` becomes its integer
    value, returned as a decimal string.
    """
    to_dotted = arg == 'ip'
    if to_dotted:
        network_order = socket.htonl(int_ip)
        result = socket.inet_ntoa(struct.pack('I', network_order))
    else:
        network_order = struct.unpack('I', socket.inet_aton(int_ip))[0]
        result = str(socket.ntohl(network_order))
    return result
def convert_integer():
    """Print 1234 after the 32-bit and the 16-bit byte-order conversions.

    ``print`` is called with a single parenthesized argument, which
    produces the same output on Python 2 and Python 3; the bare print
    statement is a syntax error on Python 3.
    """
    data = 1234
    # 32-bit
    print("Original: %s => Long host byte order: %s, Network byte order: %s"
          % (data, socket.ntohl(data), socket.htonl(data)))
    # 16-bit
    print("Original: %s => Short host byte order: %s, Network byte order: %s"
          % (data, socket.ntohs(data), socket.htons(data)))
def send(channel, *args):
    """Serialize ``args`` with cPickle and send them with a length prefix.

    The packed length of the serialized data is sent first, then the data.
    """
    serialized = cPickle.dumps(args)
    # The length goes through htonl() and is packed with the native "L"
    # format, whose width depends on the platform.
    size_field = struct.pack("L", socket.htonl(len(serialized)))
    channel.send(size_field)
    channel.send(serialized)
def read_routes():
    """Read the IPv4 routing table from /proc/net/route.

    Returns a list of ``(dst, msk, gateway, interface, ifaddr)`` tuples.
    ``dst`` and ``msk`` are integers, ``gateway`` and ``ifaddr`` are dotted
    strings. The first entry describes the loopback interface (when its
    address family is AF_INET); the rest come from the route file, skipping
    routes that are not up or that are marked as reject.

    The route file and the helper socket are now closed on every exit
    path, including when an ioctl or a parse step raises.
    """
    f = open("/proc/net/route", "r")
    s = None
    routes = []
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        # Loopback entry: query its address and netmask with ioctl.
        ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", LOOPBACK_NAME))
        addrfamily = struct.unpack("h", ifreq[16:18])[0]
        if addrfamily == socket.AF_INET:
            ifreq2 = ioctl(s, SIOCGIFNETMASK, struct.pack("16s16x", LOOPBACK_NAME))
            msk = socket.ntohl(struct.unpack("I", ifreq2[20:24])[0])
            dst = socket.ntohl(struct.unpack("I", ifreq[20:24])[0]) & msk
            ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
            routes.append((dst, msk, "0.0.0.0", LOOPBACK_NAME, ifaddr))
        else:
            warning("Interface lo: unkown address family (%i)" % addrfamily)

        # The first line of the file is skipped (header line).
        for l in f.readlines()[1:]:
            iff, dst, gw, flags, x, x, x, msk, x, x, x = l.split()
            flags = int(flags, 16)
            if flags & RTF_UP == 0:
                continue
            if flags & RTF_REJECT:
                continue
            try:
                ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", iff))
            except IOError:
                # interface is present in routing tables but does not have
                # any assigned IP
                ifaddr = "0.0.0.0"
            else:
                addrfamily = struct.unpack("h", ifreq[16:18])[0]
                if addrfamily == socket.AF_INET:
                    ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
                else:
                    warning("Interface %s: unkown address family (%i)" % (iff, addrfamily))
                    continue
            # int() replaces the Python 2-only long(); it promotes to long
            # automatically there, so the values are unchanged.
            routes.append((socket.htonl(int(dst, 16)) & 0xffffffff,
                           socket.htonl(int(msk, 16)) & 0xffffffff,
                           scapy.utils.inet_ntoa(struct.pack("I", int(gw, 16))),
                           iff, ifaddr))
    finally:
        f.close()
        if s is not None:
            s.close()
    return routes

############
### IPv6 ###
############
def read_routes():
    """Read the IPv4 routing table from /proc/net/route.

    Returns a list of ``(dst, msk, gateway, interface, ifaddr)`` tuples, or
    an empty list (after a warning) when the route file cannot be opened.
    ``dst`` and ``msk`` are integers, ``gateway`` and ``ifaddr`` are dotted
    strings. The first entry describes the loopback interface (when its
    address family is AF_INET); the rest come from the route file, skipping
    routes that are not up or that are marked as reject.

    The route file and the helper socket are now closed on every exit
    path, including when an ioctl or a parse step raises.
    """
    try:
        f = open("/proc/net/route", "r")
    except IOError:
        warning("Can't open /proc/net/route !")
        return []
    s = None
    routes = []
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        # Loopback entry: query its address and netmask with ioctl.
        ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", LOOPBACK_NAME))
        addrfamily = struct.unpack("h", ifreq[16:18])[0]
        if addrfamily == socket.AF_INET:
            ifreq2 = ioctl(s, SIOCGIFNETMASK, struct.pack("16s16x", LOOPBACK_NAME))
            msk = socket.ntohl(struct.unpack("I", ifreq2[20:24])[0])
            dst = socket.ntohl(struct.unpack("I", ifreq[20:24])[0]) & msk
            ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
            routes.append((dst, msk, "0.0.0.0", LOOPBACK_NAME, ifaddr))
        else:
            warning("Interface lo: unkown address family (%i)" % addrfamily)

        # The first line of the file is skipped (header line).
        for l in f.readlines()[1:]:
            iff, dst, gw, flags, x, x, x, msk, x, x, x = l.split()
            flags = int(flags, 16)
            if flags & RTF_UP == 0:
                continue
            if flags & RTF_REJECT:
                continue
            try:
                ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", iff))
            except IOError:
                # interface is present in routing tables but does not have
                # any assigned IP
                ifaddr = "0.0.0.0"
            else:
                addrfamily = struct.unpack("h", ifreq[16:18])[0]
                if addrfamily == socket.AF_INET:
                    ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
                else:
                    warning("Interface %s: unkown address family (%i)" % (iff, addrfamily))
                    continue
            # int() replaces the Python 2-only long(); it promotes to long
            # automatically there, so the values are unchanged.
            routes.append((socket.htonl(int(dst, 16)) & 0xffffffff,
                           socket.htonl(int(msk, 16)) & 0xffffffff,
                           scapy.utils.inet_ntoa(struct.pack("I", int(gw, 16))),
                           iff, ifaddr))
    finally:
        f.close()
        if s is not None:
            s.close()
    return routes

############
### IPv6 ###
############
def read_routes():
    """Read the IPv4 routing table from /proc/net/route.

    Returns a list of ``(dst, msk, gateway, interface, ifaddr)`` tuples, or
    an empty list (after a warning) when the route file cannot be opened.
    ``dst`` and ``msk`` are integers, ``gateway`` and ``ifaddr`` are dotted
    strings. The first entry describes the loopback interface (when its
    address family is AF_INET); the rest come from the route file, skipping
    routes that are not up or that are marked as reject.

    The file is read in binary mode, so the interface name parsed from each
    line is bytes; it is decoded as UTF-8 before being stored in the result.

    The route file and the helper socket are now closed on every exit
    path, including when an ioctl or a parse step raises.
    """
    try:
        f = open("/proc/net/route", "rb")
    except IOError:
        warning("Can't open /proc/net/route !")
        return []
    s = None
    routes = []
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        # Loopback entry: query its address and netmask with ioctl.
        ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", LOOPBACK_NAME.encode('utf-8')))
        addrfamily = struct.unpack("h", ifreq[16:18])[0]
        if addrfamily == socket.AF_INET:
            ifreq2 = ioctl(s, SIOCGIFNETMASK, struct.pack("16s16x", LOOPBACK_NAME.encode('utf-8')))
            msk = socket.ntohl(struct.unpack("I", ifreq2[20:24])[0])
            dst = socket.ntohl(struct.unpack("I", ifreq[20:24])[0]) & msk
            ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
            routes.append((dst, msk, "0.0.0.0", LOOPBACK_NAME, ifaddr))
        else:
            warning("Interface lo: unkown address family (%i)" % addrfamily)

        # The first line of the file is skipped (header line).
        for l in f.readlines()[1:]:
            iff, dst, gw, flags, x, x, x, msk, x, x, x = l.split()
            flags = int(flags, 16)
            if flags & RTF_UP == 0:
                continue
            if flags & RTF_REJECT:
                continue
            try:
                ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", iff))
            except IOError:
                # interface is present in routing tables but does not have
                # any assigned IP
                ifaddr = "0.0.0.0"
            else:
                addrfamily = struct.unpack("h", ifreq[16:18])[0]
                if addrfamily == socket.AF_INET:
                    ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
                else:
                    warning("Interface %s: unkown address family (%i)" % (iff, addrfamily))
                    continue
            routes.append((socket.htonl(int(dst, 16)) & 0xffffffff,
                           socket.htonl(int(msk, 16)) & 0xffffffff,
                           scapy.utils.inet_ntoa(struct.pack("I", int(gw, 16))),
                           iff.decode('utf-8'),
                           ifaddr))
    finally:
        f.close()
        if s is not None:
            s.close()
    return routes

############
### IPv6 ###
############