我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用serial.to_bytes()。
def encode(self, input, final=False): state = self.state encoded = [] for c in input.upper(): if c in HEXDIGITS: z = HEXDIGITS.index(c) if state: encoded.append(z + (state & 0xf0)) state = 0 else: state = 0x100 + (z << 4) elif c == ' ': # allow spaces to separate values if state and self.errors == 'strict': raise UnicodeError('odd number of hex digits') state = 0 else: if self.errors == 'strict': raise UnicodeError('non-hex digit found: %r' % c) self.state = state return serial.to_bytes(encoded)
def encode(self, data, final=False): """\ Incremental encode, keep track of digits and emit a byte when a pair of hex digits is found. The space is optional unless the error handling is defined to be 'strict'. """ state = self.state encoded = [] for c in data.upper(): if c in HEXDIGITS: z = HEXDIGITS.index(c) if state: encoded.append(z + (state & 0xf0)) state = 0 else: state = 0x100 + (z << 4) elif c == ' ': # allow spaces to separate values if state and self.errors == 'strict': raise UnicodeError('odd number of hex digits') state = 0 else: if self.errors == 'strict': raise UnicodeError('non-hex digit found: {!r}'.format(c)) self.state = state return serial.to_bytes(encoded)
def handle_socket_read(self): """Read from socket""" try: # read a chunk from the serial port data = self.socket.recv(1024) if data: # Process RFC 2217 stuff when enabled if self.rfc2217: data = serial.to_bytes(self.rfc2217.filter(data)) # add data to buffer self.buffer_net2ser += data else: # empty read indicates disconnection self.handle_disconnect() except socket.error: self.handle_socket_error()
def reader(self): """loop forever and copy serial->socket""" self.log.debug('reader thread started') while self.alive: try: data = self.serial.read(1) # read one, blocking n = self.serial.inWaiting() # look if there is more if n: data = data + self.serial.read(n) # and get as much as possible if data: # escape outgoing data when needed (Telnet IAC (0xff) character) data = serial.to_bytes(self.rfc2217.escape(data)) self._write_lock.acquire() try: self.socket.sendall(data) # send it over TCP finally: self._write_lock.release() except socket.error, msg: self.log.error('%s' % (msg,)) # probably got disconnected break self.alive = False self.log.debug('reader thread terminated')
def hex_encode(input, errors='strict'): return (serial.to_bytes([int(h, 16) for h in input.split()]), len(input))
def encode(self, input, errors='strict'): return serial.to_bytes([int(h, 16) for h in input.split()])
def hex_encode(data, errors='strict'): """'40 41 42' -> b'@ab'""" return (serial.to_bytes([int(h, 16) for h in data.split()]), len(data))
def encode(self, data, errors='strict'): """'40 41 42' -> b'@ab'""" return serial.to_bytes([int(h, 16) for h in data.split()])
def reader(self): """loop forever and copy serial->socket""" self.log.debug('reader thread started') while self.alive: try: data = self.serial.read(self.serial.in_waiting or 1) if data: # escape outgoing data when needed (Telnet IAC (0xff) character) self.write(serial.to_bytes(self.rfc2217.escape(data))) except socket.error as msg: self.log.error('{}'.format(msg)) # probably got disconnected break self.alive = False self.log.debug('reader thread terminated')
def writer(self): """loop forever and copy socket->serial""" while self.alive: try: data = self.socket.recv(1024) if not data: break self.serial.write(serial.to_bytes(self.rfc2217.filter(data))) except socket.error as msg: self.log.error('{}'.format(msg)) # probably got disconnected break self.stop()
def handle_serial_read(self): """Reading from serial port""" try: data = os.read(self.serial.fileno(), 1024) if data: # store data in buffer if there is a client connected if self.socket is not None: # escape outgoing data when needed (Telnet IAC (0xff) character) if self.rfc2217: data = serial.to_bytes(self.rfc2217.escape(data)) self.buffer_ser2net += data else: self.handle_serial_error() except Exception as msg: self.handle_serial_error(msg)
def test_to_bytes(self): self.assertEqual(serial.to_bytes([1, 2, 3]), b'\x01\x02\x03') self.assertEqual(serial.to_bytes(b'\x01\x02\x03'), b'\x01\x02\x03') self.assertEqual(serial.to_bytes(bytearray([1,2,3])), b'\x01\x02\x03') # unicode is not supported test. use decode() instead of u'' syntax to be # compatible to Python 3.x < 3.4 self.assertRaises(TypeError, serial.to_bytes, b'hello'.decode('utf-8'))
def test_readlines(self): """Test readlines method""" self.s.write(serial.to_bytes([0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a])) self.assertEqual( self.s.readlines(), [serial.to_bytes([0x31, 0x0a]), serial.to_bytes([0x32, 0x0a]), serial.to_bytes([0x33, 0x0a])] )
def test_xreadlines(self): """Test xreadlines method (skipped for io based systems)""" if hasattr(self.s, 'xreadlines'): self.s.write(serial.to_bytes([0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a])) self.assertEqual( list(self.s.xreadlines()), [serial.to_bytes([0x31, 0x0a]), serial.to_bytes([0x32, 0x0a]), serial.to_bytes([0x33, 0x0a])] )
def test_for_in(self): """Test for line in s""" self.s.write(serial.to_bytes([0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a])) lines = [] for line in self.s: lines.append(line) self.assertEqual( lines, [serial.to_bytes([0x31, 0x0a]), serial.to_bytes([0x32, 0x0a]), serial.to_bytes([0x33, 0x0a])] )
def test_alternate_eol(self): """Test readline with alternative eol settings (skipped for io based systems)""" if hasattr(self.s, 'xreadlines'): # test if it is our FileLike base class self.s.write(serial.to_bytes("no\rno\nyes\r\n")) self.assertEqual( self.s.readline(eol=serial.to_bytes("\r\n")), serial.to_bytes("no\rno\nyes\r\n"))
def writer(self): """loop forever and copy socket->serial""" while self.alive: try: data = self.socket.recv(1024) if not data: break self.serial.write(serial.to_bytes(self.rfc2217.filter(data))) except socket.error, msg: self.log.error('%s' % (msg,)) # probably got disconnected break self.stop()
def handle_serial_read(self): """Reading from serial port""" try: data = os.read(self.serial.fileno(), 1024) if data: # store data in buffer if there is a client connected if self.socket is not None: # escape outgoing data when needed (Telnet IAC (0xff) character) if self.rfc2217: data = serial.to_bytes(self.rfc2217.escape(data)) self.buffer_ser2net += data else: self.handle_serial_error() except Exception, msg: self.handle_serial_error(msg)
def test_readline(self): """Test readline method""" self.s.write(serial.to_bytes("1\n2\n3\n")) self.failUnlessEqual(self.s.readline(), serial.to_bytes("1\n")) self.failUnlessEqual(self.s.readline(), serial.to_bytes("2\n")) self.failUnlessEqual(self.s.readline(), serial.to_bytes("3\n")) # this time we will get a timeout self.failUnlessEqual(self.s.readline(), serial.to_bytes(""))
def test_xreadlines(self): """Test xreadlines method (skipped for io based systems)""" if hasattr(self.s, 'xreadlines'): self.s.write(serial.to_bytes("1\n2\n3\n")) self.failUnlessEqual( list(self.s.xreadlines()), [serial.to_bytes("1\n"), serial.to_bytes("2\n"), serial.to_bytes("3\n")] )
def test_for_in(self): """Test for line in s""" self.s.write(serial.to_bytes("1\n2\n3\n")) lines = [] for line in self.s: lines.append(line) self.failUnlessEqual( lines, [serial.to_bytes("1\n"), serial.to_bytes("2\n"), serial.to_bytes("3\n")] )
def test_alternate_eol(self): """Test readline with alternative eol settings (skipped for io based systems)""" if hasattr(self.s, 'xreadlines'): # test if it is our FileLike base class self.s.write(serial.to_bytes("no\rno\nyes\r\n")) self.failUnlessEqual( self.s.readline(eol=serial.to_bytes("\r\n")), serial.to_bytes("no\rno\nyes\r\n"))
def yatta_txData(dataBuff): global yatta_mode , uart if(yatta_mode == 0): uart.write(serial.to_bytes(dataBuff))
def initialize_grid(ser, lock): """Initialize the Grid by sending "0xC0", expected response is "0x21" Returns: - True for successful initialization - False otherwise """ try: with lock: # Flush input and output buffers ser.reset_input_buffer() ser.reset_output_buffer() # Write data to serial port to initialize the Grid bytes_written = ser.write(serial.to_bytes([0xC0])) # Wait before checking response time.sleep(WAIT_GRID) # Read response, one byte = 0x21 is expected for a successful initialization response = ser.read(size=1) # Check if the Grid responded with any data if response: # Check for correct response (should be 0x21) if response[0] == int("0x21", 16): print("Grid initialized") return True # Incorrect response received from the grid else: helper.show_error("Problem initializing the Grid unit.\n\n" "Response 0x21 expected, got " + hex(ord(response)) + ".\n\n" "Please check serial port " + ser.port +".\n") return False # In case no response (0 bytes) from the Grid else: helper.show_error("Problem initializing the Grid unit.\n\n" "Response 0x21 expected, no response received.\n\n" "Please check serial port " + ser.port +".\n") return False except Exception as e: helper.show_error("Problem initializing the Grid unit.\n\n" "Exception:\n" + str(e) + "\n\n" "The application will now exit.") sys.exit(0)
def set_fan(ser, fan, voltage, lock): """Sets voltage of a specific fan. Note: The Grid only supports voltages between 4.0V and 12.0V in 0.5V steps (e.g. 4.0, 7.5. 12.0) Configuring "0V" stops a fan. """ # Valid voltages and corresponding data (two bytes) speed_data = {0: [0x00, 0x00], # 0% (Values below 4V is not supported by the Grid, fans will be stopped) 4.0: [0x04, 0x00], # 33.3% (4.0V) 4.5: [0x04, 0x50], # 37.5% (4.5V) 5.0: [0x05, 0x00], # 41.7% (5V) 5.5: [0x05, 0x50], # 45.8% (5.5V) 6.0: [0x06, 0x00], # 50.0% (6V) 6.5: [0x06, 0x50], # 54.2% (6.5V) 7.0: [0x07, 0x00], # 58.3% (7V) 7.5: [0x07, 0x50], # 62.5% (7.5V) 8.0: [0x08, 0x00], # 66.7% (8V) 8.5: [0x08, 0x50], # 70.1% (8.5V) 9.0: [0x09, 0x00], # 75.0% (9.0V) 9.5: [0x09, 0x50], # 79.2% (9.5V) 10.0: [0x0A, 0x00], # 83.3% (10.0V) 10.5: [0x0A, 0x50], # 87.5% (10.5V) 11.0: [0x0B, 0x00], # 91.7% (11.0V) 11.5: [0x0B, 0x50], # 95.8% (11.5V) 12.0: [0x0C, 0x00]} # 100.0% (12.0V) fan_data = {1: 0x01, # Fan 1 2: 0x02, # Fan 2 3: 0x03, # Fan 3 4: 0x04, # Fan 4 5: 0x05, # Fan 5 6: 0x06} # Fan 6 # Define bytes to be sent to the Grid for configuring a specific fan's voltage # Format is seven bytes: # 44 <fan id> C0 00 00 <voltage integer> <voltage decimal> # # Example configuring "7.5V" for fan "1": # 44 01 C0 00 00 07 50 serial_data = [0x44, fan_data[fan], 0xC0, 0x00, 0x00, speed_data[voltage][0], speed_data[voltage][1]] try: with lock: bytes_written = ser.write(serial.to_bytes(serial_data)) time.sleep(WAIT_GRID) # TODO: Check reponse # Expected response is one byte response = ser.read(size=1) print("Fan " + str(fan) + " updated") except Exception as e: helper.show_error("Could not set speed for fan " + str(fan) + ".\n\n" "Please check settings for serial port " + str(ser.port) + ".\n\n" "Exception:\n" + str(e) + "\n\n" "The application will now exit.") sys.exit(0)
def read_fan_rpm(ser, lock): """Reads the current rpm of each fan. Returns: - If success: A list with rpm data for each fan - If failure to read data: An empty list """ # List to hold fan rpm data to be returned fans = [] with lock: for fan in [0x01, 0x02, 0x03, 0x04, 0x05, 0x06]: try: # Define bytes to be sent to the Grid for reading rpm for a specific fan # Format is two bytes: # 8A <fan id> serial_data = [0x8A, fan] ser.reset_output_buffer() # TODO: Check bytes written bytes_written = ser.write(serial.to_bytes(serial_data)) # Wait before checking response time.sleep(WAIT_GRID) # Expected response is 5 bytes # Example response: C0 00 00 03 00 = 0x0300 = 768 rpm (two bytes unsigned) response = ser.read(size=5) # Check if the Grid responded with any data if response: # Check for correct response, first three bytes should be C0 00 00 if response[0] == int("0xC0", 16) and response[1] == response[2] == int("0x00", 16): # Convert rpm from 2-bytes unsigned value to decimal rpm = response[3] * 256 + response[4] fans.append(rpm) # An incorrect response was received, return an empty list else: return [] # In case no response (0 bytes) received, return an empty list else: return [] except Exception as e: helper.show_error("Could not read rpm for fan " + str(fan) + ".\n\n" "Please check serial port settings.\n\n" "Exception:\n" + str(e) + "\n\n" "The application will now exit.") print(str(e)) sys.exit(0) # Fan return fans
def read_fan_voltage(ser, lock): """Reads the current voltage of each fan. Returns: - If success: a list with voltage data for each fan - If failure to read data: An empty list """ # List to hold fan voltage data to be returned fans = [] with lock: for fan in [0x01, 0x02, 0x03, 0x04, 0x05, 0x06]: try: # Define bytes to be sent to the Grid for reading voltage for a specific fan # Format is two bytes, e.g. [0x84, <fan id>] serial_data = [0x84, fan] ser.reset_output_buffer() bytes_written = ser.write(serial.to_bytes(serial_data)) # Wait before checking response time.sleep(WAIT_GRID) # Expected response is 5 bytes # Example response: 00 00 00 0B 01 = 0x0B 0x01 = 11.01 volt response = ser.read(size=5) # Check if the Grid responded with any data if response: # Check for correct response (first three bytes should be 0x00) if response[0] == int("0xC0", 16) and response[1] == response[2] == int("0x00", 16): # Convert last two bytes to a decimal float value voltage = float(str(response[3]) + "." + str(response[4])) # Add the voltage for the current fan to the list fans.append(voltage) # An incorrect response was received else: print("Error reading fan voltage, incorrect response") return [] # In case no response (0 bytes) is returned from the Grid else: print("Error reading fan voltage, no data returned") return [] except Exception as e: helper.show_error("Could not read fan voltage.\n\n" "Please check serial port " + ser.port + ".\n\n" "Exception:\n" + str(e) + "\n\n" "The application will now exit.") print(str(e)) sys.exit(0) return fans
def initialize_grid(ser, lock): """Initialize the Grid by sending "0xC0", expected response is "0x21" Returns: - True for successful initialization - False otherwise """ try: with lock: # Flush input and output buffers ser.reset_input_buffer() ser.reset_output_buffer() # Write data to serial port to initialize the Grid bytes_written = ser.write(serial.to_bytes([0xC0])) # Wait before checking response time.sleep(WAIT_GRID) # Read response, one byte = 0x21 is expected for a successful initialization response = ser.read(size=1) # Check if the Grid responded with any data if response: # Check for correct response (should be 0x21) if response[0] == int("0x21", 16): print("Grid initialized") return True # Incorrect response received from the grid else: helper.show_error("Problem initializing the Grid unit.\n\n" "Response 0x21 expected, got " + hex(ord(response)) + ".\n\n" "Please check serial port " + ser.port +".\n") return False # In case no response (0 bytes) from the Grid else: helper.show_error("Problem initializing the Grid unit.\n\n" "Response 0x21 expected, no response received.\n\n" "Please check serial port " + ser.port +".\n") return False except Exception as e: pass