我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.int2byte()。
def set_alpn_protos(self, protos): """ Specify the clients ALPN protocol list. These protocols are offered to the server during protocol negotiation. :param protos: A list of the protocols to be offered to the server. This list should be a Python list of bytestrings representing the protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``. """ # Take the list of protocols and join them together, prefixing them # with their lengths. protostr = b''.join( chain.from_iterable((int2byte(len(p)), p) for p in protos) ) # Build a C string from the list. We don't need to save this off # because OpenSSL immediately copies the data out. input_str = _ffi.new("unsigned char[]", protostr) input_str_len = _ffi.cast("unsigned", len(protostr)) _lib.SSL_CTX_set_alpn_protos(self._context, input_str, input_str_len)
def set_alpn_protos(self, protos): """ Specify the client's ALPN protocol list. These protocols are offered to the server during protocol negotiation. :param protos: A list of the protocols to be offered to the server. This list should be a Python list of bytestrings representing the protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``. """ # Take the list of protocols and join them together, prefixing them # with their lengths. protostr = b''.join( chain.from_iterable((int2byte(len(p)), p) for p in protos) ) # Build a C string from the list. We don't need to save this off # because OpenSSL immediately copies the data out. input_str = _ffi.new("unsigned char[]", protostr) input_str_len = _ffi.cast("unsigned", len(protostr)) _lib.SSL_set_alpn_protos(self._ssl, input_str, input_str_len)
def _truncate_digest(digest, order_bits): digest_len = len(digest) if 8 * digest_len > order_bits: digest_len = (order_bits + 7) // 8 digest = digest[:digest_len] if 8 * digest_len > order_bits: rshift = 8 - (order_bits & 0x7) assert 0 < rshift < 8 mask = 0xFF >> rshift << rshift # Set the bottom rshift bits to 0 digest = digest[:-1] + six.int2byte(six.indexbytes(digest, -1) & mask) return digest
def _SignedVarintEncoder(): """Return an encoder for a basic signed varint value (does not include tag).""" def EncodeSignedVarint(write, value): if value < 0: value += (1 << 64) bits = value & 0x7f value >>= 7 while value: write(six.int2byte(0x80|bits)) bits = value & 0x7f value >>= 7 return write(six.int2byte(bits)) return EncodeSignedVarint
def check(self, fmt, value): from random import randrange # build a buffer which is surely big enough to contain what we need # and check: # 1) that we correctly write the bytes we expect # 2) that we do NOT write outside the bounds # pattern = [six.int2byte(randrange(256)) for _ in range(256)] pattern = b''.join(pattern) buf = bytearray(pattern) buf2 = bytearray(pattern) offset = 16 pack_into(ord(fmt), buf, offset, value) struct.pack_into(fmt, buf2, offset, value) assert buf == buf2 # # check that it raises if it's out of bound out_of_bound = 256-struct.calcsize(fmt)+1 pytest.raises(IndexError, "pack_into(ord(fmt), buf, out_of_bound, value)")
def _bytes_text(code_point_iter, quote, prefix=b'', suffix=b''): quote_code_point = six.byte2int(quote) buf = BytesIO() buf.write(prefix) buf.write(quote) for code_point in code_point_iter: if code_point == quote_code_point: buf.write(b'\\' + quote) elif code_point == six.byte2int(b'\\'): buf.write(b'\\\\') elif _is_printable_ascii(code_point): buf.write(six.int2byte(code_point)) else: buf.write(_escape(code_point)) buf.write(quote) buf.write(suffix) return buf.getvalue()
def encode(self, b64=False, always_bytes=True): """Encode the packet for transmission.""" if self.binary and not b64: encoded_packet = six.int2byte(self.packet_type) else: encoded_packet = six.text_type(self.packet_type) if self.binary and b64: encoded_packet = 'b' + encoded_packet if self.binary: if b64: encoded_packet += base64.b64encode(self.data).decode('utf-8') else: encoded_packet += self.data elif isinstance(self.data, six.string_types): encoded_packet += self.data elif isinstance(self.data, dict) or isinstance(self.data, list): encoded_packet += self.json.dumps(self.data, separators=(',', ':')) elif self.data is not None: encoded_packet += str(self.data) if always_bytes and not isinstance(encoded_packet, binary_types): encoded_packet = encoded_packet.encode('utf-8') return encoded_packet
def encode(self, b64=False): """Encode the payload for transmission.""" encoded_payload = b'' for pkt in self.packets: encoded_packet = pkt.encode(b64=b64) packet_len = len(encoded_packet) if b64: encoded_payload += str(packet_len).encode('utf-8') + b':' + \ encoded_packet else: binary_len = b'' while packet_len != 0: binary_len = six.int2byte(packet_len % 10) + binary_len packet_len = int(packet_len / 10) if not pkt.binary: encoded_payload += b'\0' else: encoded_payload += b'\1' encoded_payload += binary_len + b'\xff' + encoded_packet return encoded_payload
def _initialize_telnet(connection): logger.info('Initializing telnet connection') # Iac Do Linemode connection.send(IAC + DO + LINEMODE) # Suppress Go Ahead. (This seems important for Putty to do correct echoing.) # This will allow bi-directional operation. connection.send(IAC + WILL + SUPPRESS_GO_AHEAD) # Iac sb connection.send(IAC + SB + LINEMODE + MODE + int2byte(0) + IAC + SE) # IAC Will Echo connection.send(IAC + WILL + ECHO) # Negotiate window size connection.send(IAC + DO + NAWS)
def test_getbytes(self): # Test getbytes method. all_bytes = b''.join(six.int2byte(n) for n in range(256)) with self.fs.open('foo', 'wb') as f: f.write(all_bytes) self.assertEqual(self.fs.getbytes('foo'), all_bytes) _all_bytes = self.fs.getbytes('foo') self.assertIsInstance(_all_bytes, bytes) self.assertEqual(_all_bytes, all_bytes) with self.assertRaises(errors.ResourceNotFound): self.fs.getbytes('foo/bar') self.fs.makedir('baz') with self.assertRaises(errors.FileExpected): self.fs.getbytes('baz')
def _send_command(self, command, parameters): try: if self._serial.inWaiting() != 0: raise SIReaderException( 'Input buffer must be empty before sending command. Currently %s bytes in the input buffer.' % self._serial.inWaiting()) command_string = command + int2byte(len(parameters)) + parameters crc = SIReader._crc(command_string) cmd = SIReader.STX + command_string + crc + SIReader.ETX if self._debug: print("==>> command '%s', parameters %s, crc %s" % (hexlify(command).decode('ascii'), ' '.join( [hexlify(int2byte(c)).decode('ascii') for c in parameters]), hexlify(crc).decode('ascii'), )) self._serial.write(cmd) except (SerialException, OSError) as msg: raise SIReaderException('Could not send command: %s' % msg) if self._logfile: self._logfile.write('s %s %s\n' % (datetime.now(), cmd)) self._logfile.flush() os.fsync(self._logfile) return self._read_command()
def serialize(self): # fixup byte_length = (self.length + 7) // 8 bin_addr = self._to_bin(self.addr) if (self.length % 8) == 0: bin_addr = bin_addr[:byte_length] else: # clear trailing bits in the last octet. # rfc doesn't require this. mask = 0xff00 >> (self.length % 8) last_byte = six.int2byte( six.indexbytes(bin_addr, byte_length - 1) & mask) bin_addr = bin_addr[:byte_length - 1] + last_byte self.addr = self._from_bin(bin_addr) buf = bytearray() msg_pack_into(self._PACK_STR, buf, 0, self.length) return buf + bytes(bin_addr)
def encode(self, b64=False, always_bytes=True): """Encode the packet for transmission.""" if self.binary and not b64: encoded_packet = six.int2byte(self.packet_type) else: encoded_packet = six.text_type(self.packet_type) if self.binary and b64: encoded_packet = 'b' + encoded_packet if self.binary: if b64: encoded_packet += base64.b64encode(self.data).decode('utf-8') else: encoded_packet += self.data elif isinstance(self.data, six.string_types): encoded_packet += self.data elif isinstance(self.data, dict) or isinstance(self.data, list): encoded_packet += self.json.dumps(self.data, separators=(',', ':')) elif self.data is not None: encoded_packet += str(self.data) if always_bytes and not isinstance(encoded_packet, six.binary_type): encoded_packet = encoded_packet.encode('utf-8') return encoded_packet
def test_read_rain_collector_type(self, mock_read_config_setting): mock_read_config_setting.return_value = six.int2byte(0b10101110) collector_type = RainCollectorTypeSerial(self.communicator.read_rain_collector_type()) self.assertEqual(RainCollectorTypeSerial.millimeters_0_1, collector_type) mock_read_config_setting.assert_called_once_with('2B', '01') mock_read_config_setting.reset_mock() mock_read_config_setting.return_value = six.int2byte(0b10011110) collector_type = RainCollectorTypeSerial(self.communicator.read_rain_collector_type()) self.assertEqual(RainCollectorTypeSerial.millimeters_0_2, collector_type) mock_read_config_setting.assert_called_once_with('2B', '01') mock_read_config_setting.reset_mock() mock_read_config_setting.return_value = six.int2byte(0b10001110) collector_type = RainCollectorTypeSerial(self.communicator.read_rain_collector_type()) self.assertEqual(RainCollectorTypeSerial.inches_0_01, collector_type) mock_read_config_setting.assert_called_once_with('2B', '01')
def decode(value): """Decodes bytes from a base65536 string.""" stream = io.BytesIO() done = False for ch in value: code_point = ord(ch) b1 = code_point & ((1 << 8) - 1) try: b2 = B2[code_point - b1] except KeyError: raise ValueError('Invalid base65536 code ' 'point: %d' % code_point) b = int2byte(b1) if b2 == -1 else int2byte(b1) + int2byte(b2) if len(b) == 1: if done: raise ValueError('base65536 sequence ' 'continued after final byte') done = True stream.write(b) return stream.getvalue()
def encode(self, b64=False): """Encode the payload for transmission.""" encoded_payload = b'' for pkt in self.packets: encoded_packet = pkt.encode(b64=b64) packet_len = len(encoded_packet) if b64: encoded_payload += str(packet_len).encode('utf-8') + b':' + \ encoded_packet else: binary_len = b'' while packet_len != 0: binary_len = six.int2byte(packet_len % 10) + binary_len packet_len = int(packet_len / 10) encoded_payload += b'\0' + binary_len + b'\xff' + \ encoded_packet return encoded_payload