我们从 Python 开源项目中提取了以下 48 个代码示例，用于说明如何使用 serial.write()。
def __checkType__(self, serial):
    """Identify the device on ``serial`` from a one-byte handshake.

    One read is performed on the port, the decoded result is echoed to
    stdout and then acted upon:

    * ``"1"`` -- reply ``b'0'``, create a ``SensorThread`` for the port and
      store it in ``self.threads["Sensor"]``.
    * ``"2"`` -- reply ``b'0'``, create a ``ControlThread`` for the port and
      store it in ``self.threads["Control"]``.
    * anything else -- reply ``b'1'`` and leave ``self.threads`` untouched.

    Args:
        serial: Port object providing ``read()``, ``write()`` and a ``port``
            attribute (the port name used in the log lines).

    Returns:
        The thread object that was just registered, or ``None`` when the
        handshake answer is not recognised.
    """
    # A byte that is not valid UTF-8 (line noise, an unrelated device) has to
    # be rejected like any other unknown answer.  A strict decode would raise
    # UnicodeDecodeError here and skip the b'1' rejection reply entirely.
    byte = serial.read().decode("utf-8", "replace")
    print(byte)

    # NOTE(review): the log texts below look like they lost their original
    # non-ASCII wording to an encoding problem; they are kept verbatim.
    if byte == "1":
        print("??? ???? ?? : " + serial.port)
        serial.write(b'0')
        self.threads["Sensor"] = SensorThread(serial)
        return self.threads["Sensor"]

    if byte == "2":
        print("??? ???? ?? : " + serial.port)
        serial.write(b'0')
        self.threads["Control"] = ControlThread(serial)
        return self.threads["Control"]

    print("? ? ?? ???? : " + serial.port)
    serial.write(b'1')
    return None
def write(self, value):
    """Echo ``value`` and forward it to the serial port when it is a code 0-4.

    ``value`` is always printed.  If it compares equal to one of the integers
    0, 1, 2, 3 or 4, the matching ASCII digit is written to ``serial`` as a
    single byte; any other value is printed but nothing is sent.

    ``serial`` is not a parameter: it is resolved from the enclosing module
    scope (presumably an already opened pySerial port -- confirm against the
    rest of the file).

    Args:
        value: Command code to send.
    """
    print(value)

    # The payloads are bytes literals rather than str: pySerial refuses text
    # strings on Python 3 ("unicode strings are not supported"), while b'..'
    # is the same object type as '..' on Python 2, so both keep working.
    payloads = (b'0', b'1', b'2', b'3', b'4')
    for code, payload in enumerate(payloads):
        # Same ``==`` test as an if/elif ladder; the first match wins.
        if value == code:
            serial.write(payload)
            break

# Disconnects the Arduino
def showReady(self):
    """Send the one-byte command ``b'6'`` over the module-level ``serial`` port.

    ``serial`` is resolved from the enclosing module scope (presumably an
    already opened pySerial port -- confirm against the rest of the file).
    """
    # bytes, not str: pySerial rejects text strings on Python 3, and b'6' is
    # identical to '6' on Python 2.
    serial.write(b'6')
def write(self, data):
    """Send ``data`` to the remote port through the socket.

    The payload is converted with ``to_bytes`` and every occurrence of
    ``IAC`` in it is replaced by ``IAC_DOUBLED`` before it is handed to
    ``sendall``.  The send happens while holding ``self._write_lock`` and can
    block for as long as the socket blocks.

    Raises ``portNotOpenError`` when ``self.is_open`` is false and
    ``SerialException`` when the socket reports an error.

    Returns the length of ``data`` as passed in (before escaping).
    """
    if not self.is_open:
        raise portNotOpenError

    with self._write_lock:
        escaped = to_bytes(data).replace(IAC, IAC_DOUBLED)
        try:
            self._socket.sendall(escaped)
        except socket.error as err:
            raise SerialException("connection failed (socket error): %s" % (err,))

    return len(data)
def _internal_raw_write(self, data):
    """Send ``data`` on the socket exactly as given, without any escaping.

    Used for telnet control sequences.  The send is performed while holding
    ``self._write_lock``.
    """
    with self._write_lock:
        sock = self._socket
        sock.sendall(data)
def telnetSendOption(self, action, option):
    """Transmit a telnet option negotiation (DO, DONT, WILL or WONT).

    The three-element sequence ``IAC, action, option`` is converted with
    ``to_bytes`` and written to ``self.connection``.
    """
    send = self.connection.write
    send(to_bytes([IAC, action, option]))
def rfc2217SendSubnegotiation(self, option, value=b''):
    """Send an RFC 2217 subnegotiation for ``option`` carrying ``value``.

    Every ``IAC`` inside ``value`` is replaced by ``IAC_DOUBLED``; the result
    is framed as ``IAC SB COM_PORT_OPTION option <value> IAC SE``, converted
    with ``to_bytes`` and written to ``self.connection``.
    """
    escaped = value.replace(IAC, IAC_DOUBLED)

    frame = [IAC, SB, COM_PORT_OPTION, option]
    frame.extend(escaped)
    frame.extend([IAC, SE])

    self.connection.write(to_bytes(frame))

# - check modem lines, needs to be called periodically from user to
#   establish polling
def write(self, data):
    """Write ``data`` to the remote port via the socket.

    ``data`` is converted with ``to_bytes``; each ``IAC`` it contains is
    replaced by ``IAC_DOUBLED`` and the result is passed to ``sendall`` while
    ``self._write_lock`` is held.  The call blocks as long as the socket
    does.

    Raises ``portNotOpenError`` if ``self.is_open`` is false, and
    ``SerialException`` if the socket raises ``socket.error``.

    Returns ``len(data)`` -- the size of the caller's data, not of the
    escaped stream.
    """
    if not self.is_open:
        raise portNotOpenError

    with self._write_lock:
        try:
            outgoing = to_bytes(data)
            self._socket.sendall(outgoing.replace(IAC, IAC_DOUBLED))
        except socket.error as err:
            raise SerialException("connection failed (socket error): {}".format(err))

    return len(data)
def telnet_send_option(self, action, option):
    """Send a telnet negotiation command (DO, DONT, WILL or WONT).

    Writes ``to_bytes([IAC, action, option])`` to ``self.connection``.
    """
    link = self.connection
    link.write(to_bytes([IAC, action, option]))
def rfc2217_send_subnegotiation(self, option, value=b''):
    """Send an RFC 2217 subnegotiation for ``option`` with payload ``value``.

    ``IAC`` inside ``value`` is replaced by ``IAC_DOUBLED``.  The message
    written to ``self.connection`` is the ``to_bytes`` conversion of
    ``IAC SB COM_PORT_OPTION option <value> IAC SE``.
    """
    escaped_value = value.replace(IAC, IAC_DOUBLED)
    header = [IAC, SB, COM_PORT_OPTION, option]
    trailer = [IAC, SE]
    self.connection.write(to_bytes(header + list(escaped_value) + trailer))

# - check modem lines, needs to be called periodically from user to
#   establish polling
def telnet_send_option(self, action, option):
    """Send a telnet negotiation command (DO, DONT, WILL or WONT).

    ``IAC``, ``action`` and ``option`` are concatenated in that order and the
    result is written to ``self.connection``.
    """
    command = IAC + action + option
    self.connection.write(command)
def rfc2217_send_subnegotiation(self, option, value=b''):
    """Send an RFC 2217 subnegotiation for ``option`` with payload ``value``.

    ``IAC`` inside ``value`` is replaced by ``IAC_DOUBLED``; the message
    ``IAC SB COM_PORT_OPTION option <value> IAC SE`` is then built by
    concatenation and written to ``self.connection``.
    """
    escaped = value.replace(IAC, IAC_DOUBLED)
    opening = IAC + SB + COM_PORT_OPTION + option
    self.connection.write(opening + escaped + IAC + SE)

# - check modem lines, needs to be called periodically from user to
#   establish polling
def filter(self, data):
    """\
    Handle a bunch of incoming bytes. This is a generator. It will yield
    all characters not of interest for Telnet/RFC 2217.

    The idea is that the reader thread pushes data from the socket through
    this filter:

    for byte in filter(socket.recv(1024)):
        # do things like CR/LF conversion/whatever
        # and write data to the serial port
        serial.write(byte)

    (socket error handling code left as exercise for the reader)

    The parser state (``self.mode``, ``self.suboption`` and
    ``self.telnet_command``) lives on the instance, so a command split across
    two calls is still recognised.

    An ``IAC SE`` that arrives without a preceding ``IAC SB`` is ignored:
    there is no sub option buffer to process in that case.
    """
    for byte in iterbytes(data):
        if self.mode == M_NORMAL:
            # interpret as command or as data
            if byte == IAC:
                self.mode = M_IAC_SEEN
            else:
                # store data in sub option buffer or pass it to our
                # consumer depending on state
                if self.suboption is not None:
                    self.suboption += byte
                else:
                    yield byte
        elif self.mode == M_IAC_SEEN:
            if byte == IAC:
                # interpret as command doubled -> insert character
                # itself
                if self.suboption is not None:
                    self.suboption += byte
                else:
                    yield byte
                self.mode = M_NORMAL
            elif byte == SB:
                # sub option start
                self.suboption = bytearray()
                self.mode = M_NORMAL
            elif byte == SE:
                # sub option end -> process it now
                # Guard against a stray SE: without a preceding SB the
                # buffer is None and bytes(None) raises TypeError on
                # Python 3, which would terminate this generator.
                if self.suboption is not None:
                    self._telnetProcessSubnegotiation(bytes(self.suboption))
                self.suboption = None
                self.mode = M_NORMAL
            elif byte in (DO, DONT, WILL, WONT):
                # negotiation
                self.telnet_command = byte
                self.mode = M_NEGOTIATE
            else:
                # other telnet commands
                self._telnetProcessCommand(byte)
                self.mode = M_NORMAL
        elif self.mode == M_NEGOTIATE:
            # DO, DONT, WILL, WONT was received, option now following
            self._telnetNegotiateOption(self.telnet_command, byte)
            self.mode = M_NORMAL

# - incoming telnet commands and options
def filter(self, data):
    """\
    Handle a bunch of incoming bytes. This is a generator. It will yield
    all characters not of interest for Telnet/RFC 2217.

    The idea is that the reader thread pushes data from the socket through
    this filter:

    for byte in filter(socket.recv(1024)):
        # do things like CR/LF conversion/whatever
        # and write data to the serial port
        serial.write(byte)

    (socket error handling code left as exercise for the reader)

    Parser state is kept on the instance (``self.mode``, ``self.suboption``,
    ``self.telnet_command``), so a sequence that is split across two calls
    is continued on the next call.

    A sub option end (``IAC SE``) received while no sub option is open is
    skipped instead of being processed.
    """
    for byte in iterbytes(data):
        if self.mode == M_NORMAL:
            # interpret as command or as data
            if byte == IAC:
                self.mode = M_IAC_SEEN
            else:
                # store data in sub option buffer or pass it to our
                # consumer depending on state
                if self.suboption is not None:
                    self.suboption += byte
                else:
                    yield byte
        elif self.mode == M_IAC_SEEN:
            if byte == IAC:
                # interpret as command doubled -> insert character
                # itself
                if self.suboption is not None:
                    self.suboption += byte
                else:
                    yield byte
                self.mode = M_NORMAL
            elif byte == SB:
                # sub option start
                self.suboption = bytearray()
                self.mode = M_NORMAL
            elif byte == SE:
                # sub option end -> process it now
                # self.suboption is None unless an SB was seen first;
                # bytes(None) raises TypeError on Python 3, so a stray SE
                # from the peer must not reach the conversion.
                if self.suboption is not None:
                    self._telnet_process_subnegotiation(bytes(self.suboption))
                self.suboption = None
                self.mode = M_NORMAL
            elif byte in (DO, DONT, WILL, WONT):
                # negotiation
                self.telnet_command = byte
                self.mode = M_NEGOTIATE
            else:
                # other telnet commands
                self._telnet_process_command(byte)
                self.mode = M_NORMAL
        elif self.mode == M_NEGOTIATE:
            # DO, DONT, WILL, WONT was received, option now following
            self._telnet_negotiate_option(self.telnet_command, byte)
            self.mode = M_NORMAL

# - incoming telnet commands and options
def run(self):
    """Send the desired fan, humidifier and Peltier states over ``self.serial``.

    For each of the three devices, in that order:

    1. ``{"Comm": "on<Device>"}`` is sent when the matching flag on
       ``self.log`` equals ``True``, otherwise ``{"Comm": "off<Device>"}``.
       The command is JSON, UTF-8 encoded, and followed by ``b'\\n'``.
    2. One reply is read from ``self.serial``, decoded and printed.
    3. A status line is printed depending on whether the reply was ``"0"``.

    The flags read are ``self.log.fan_out``, ``self.log.humidifier`` and
    ``self.log.peltier``.
    """
    # (flag attribute on self.log, command when set, command when not set)
    devices = (
        ("fan_out", "onFan", "offFan"),
        ("humidifier", "onHumidifier", "offHumidifier"),
        ("peltier", "onPeltier", "offPeltier"),
    )

    for flag_name, on_command, off_command in devices:
        # Each flag is looked up right before its command goes out, so a
        # change made while an earlier device is being handled is honoured.
        # The explicit ``== True`` is kept on purpose: a truthy value that is
        # not equal to True still selects the "off" command.
        enabled = getattr(self.log, flag_name) == True  # noqa: E712
        jsonString = json.dumps({"Comm": on_command if enabled else off_command})
        self.serial.write(bytes(jsonString, encoding="utf-8"))
        self.serial.write(b'\n')

        # "replace" keeps a reply that is not valid UTF-8 from raising
        # UnicodeDecodeError and aborting the remaining commands; such a
        # reply simply does not compare equal to "0".
        byte = self.serial.read().decode("utf-8", "replace")
        print(byte)

        # NOTE(review): both messages are identical in this copy of the file;
        # the original wording looks lost to an encoding problem.  The branch
        # is kept so distinct texts can be restored.
        if byte == "0":
            print("?? ?? ??")
        else:
            print("?? ?? ??")