我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用util.byte2int()。
def dump_packet(data): def is_ascii(data): if byte2int(data) >= 65 and byte2int(data) <= 122: #data.isalnum(): return data return '.' try: print "packet length %d" % len(data) print "method call[1]: %s" % sys._getframe(1).f_code.co_name print "method call[2]: %s" % sys._getframe(2).f_code.co_name print "method call[3]: %s" % sys._getframe(3).f_code.co_name print "method call[4]: %s" % sys._getframe(4).f_code.co_name print "method call[5]: %s" % sys._getframe(5).f_code.co_name print "-" * 88 except ValueError: pass dump_data = [data[i:i+16] for i in xrange(len(data)) if i%16 == 0] for d in dump_data: print ' '.join(map(lambda x:"%02X" % byte2int(x), d)) + \ ' ' * (16 - len(d)) + ' ' * 2 + \ ' '.join(map(lambda x:"%s" % is_ascii(x), d)) print "-" * 88 print ""
def _scramble_323(password, message): hash_pass = _hash_password_323(password) hash_message = _hash_password_323(message[:SCRAMBLE_LENGTH_323]) hash_pass_n = struct.unpack(">LL", hash_pass) hash_message_n = struct.unpack(">LL", hash_message) rand_st = RandStruct_323(hash_pass_n[0] ^ hash_message_n[0], hash_pass_n[1] ^ hash_message_n[1]) outbuf = StringIO.StringIO() for _ in xrange(min(SCRAMBLE_LENGTH_323, len(message))): outbuf.write(int2byte(int(rand_st.my_rnd() * 31) + 64)) extra = int2byte(int(rand_st.my_rnd() * 31)) out = outbuf.getvalue() outbuf = StringIO.StringIO() for c in out: outbuf.write(int2byte(byte2int(c) ^ byte2int(extra))) return outbuf.getvalue()
def __recv_packet(self): """Parse the packet header and read entire packet payload into buffer.""" packet_header = self.connection.rfile.read(4) if len(packet_header) < 4: raise OperationalError(2013, "Lost connection to MySQL server during query") if DEBUG: dump_packet(packet_header) packet_length_bin = packet_header[:3] self.__packet_number = byte2int(packet_header[3]) # TODO: check packet_num is correct (+1 from last packet) bin_length = packet_length_bin + int2byte(0) # pad little-endian number bytes_to_read = struct.unpack('<I', bin_length)[0] recv_data = self.connection.rfile.read(bytes_to_read) if len(recv_data) < bytes_to_read: raise OperationalError(2013, "Lost connection to MySQL server during query") if DEBUG: dump_packet(recv_data) self.__data = recv_data
def read_length_coded_binary(self): """Read a 'Length Coded Binary' number from the data buffer. Length coded numbers can be anywhere from 1 to 9 bytes depending on the value of the first byte. """ c = byte2int(self.read(1)) if c == NULL_COLUMN: return None if c < UNSIGNED_CHAR_COLUMN: return c elif c == UNSIGNED_SHORT_COLUMN: return unpack_uint16(self.read(UNSIGNED_SHORT_LENGTH)) elif c == UNSIGNED_INT24_COLUMN: return unpack_int24(self.read(UNSIGNED_INT24_LENGTH)) elif c == UNSIGNED_INT64_COLUMN: # TODO: what was 'longlong'? confirm it wasn't used? return unpack_int64(self.read(UNSIGNED_INT64_LENGTH))
def __parse_field_descriptor(self): """Parse the 'Field Descriptor' (Metadata) packet. This is compatible with MySQL 4.1+ (not compatible with MySQL 4.0). """ self.catalog = self.read_length_coded_string() self.db = self.read_length_coded_string() self.table_name = self.read_length_coded_string() self.org_table = self.read_length_coded_string() self.name = self.read_length_coded_string().decode(self.connection.charset) self.org_name = self.read_length_coded_string() self.advance(1) # non-null filler self.charsetnr = struct.unpack('<H', self.read(2))[0] self.length = struct.unpack('<I', self.read(4))[0] self.type_code = byte2int(self.read(1)) self.flags = struct.unpack('<H', self.read(2))[0] self.scale = byte2int(self.read(1)) # "decimals" self.advance(2) # filler (always 0x00) # 'default' is a length coded binary and is still in the buffer? # not used for normal result sets...
def _hash_password_323(password): nr = 1345345333L add = 7L nr2 = 0x12345671L for c in [byte2int(x) for x in password if x not in (' ', '\t')]: nr^= (((nr & 63)+add)*c)+ (nr << 8) & 0xFFFFFFFF nr2= (nr2 + ((nr2 << 8) ^ nr)) & 0xFFFFFFFF add= (add + c) & 0xFFFFFFFF r1 = nr & ((1L << 31) - 1L) # kill sign bits r2 = nr2 & ((1L << 31) - 1L) # pack return struct.pack(">LL", r1, r2)
def is_eof_packet(self): return byte2int(self.get_bytes(0)) == 254 # 'fe'
def is_resultset_packet(self): field_count = byte2int(self.get_bytes(0)) return field_count >= 1 and field_count <= 250
def is_error_packet(self): return byte2int(self.get_bytes(0)) == 255
def _get_server_information(self): i = 0 packet = MysqlPacket(self) data = packet.get_all_data() if DEBUG: dump_packet(data) #packet_len = byte2int(data[i:i+1]) #i += 4 self.protocol_version = byte2int(data[i:i+1]) i += 1 server_end = data.find(int2byte(0), i) # TODO: is this the correct charset? should it be default_charset? self.server_version = data[i:server_end].decode(self.charset) i = server_end + 1 self.server_thread_id = struct.unpack('<h', data[i:i+2]) i += 4 self.salt = data[i:i+8] i += 9 if len(data) >= i + 1: i += 1 self.server_capabilities = struct.unpack('<h', data[i:i+2])[0] i += 1 self.server_language = byte2int(data[i:i+1]) self.server_charset = charset_by_id(self.server_language).name i += 16 if len(data) >= i+12-1: rest_salt = data[i:i+12] self.salt += rest_salt
def _read_result_packet(self): self.field_count = byte2int(self.first_packet.read(1)) self._get_descriptions() self._read_rowdata_packet()