我们从 Python 开源项目中，提取了以下 17 个代码示例，用于说明如何使用 serial.SerialTimeoutException()。
def close_can_channel(self):
    """Close CAN channel.

    Stops the receive thread, writes the ASCII command ``C`` followed by a
    carriage return to the serial port, and then resets the cached firmware
    and hardware version numbers to 0.

    Raises:
        USBtinException: wraps a ``serial.SerialTimeoutException`` raised
            while stopping the receive thread or writing the command. The
            version numbers are left untouched in that case.
    """
    try:
        self.stop_rx_thread()
        close_command = "C\r".encode('ascii')
        self.serial_port.write(close_command)
    except serial.SerialTimeoutException as timeout_error:
        raise USBtinException(timeout_error)
    else:
        # Only reached when the command went out without a timeout.
        self.firmware_version = 0
        self.hardware_version = 0
def write(self, data):
    """Queue ``data`` on the incoming queue and report how much was accepted.

    Behaviour, in order of precedence:

    * ``_debug_awol`` set: nothing is queued, ``len(data)`` is returned.
    * ``_debug_drop_connection`` set: ``SerialTimeoutException`` is raised.
    * either queue is ``None``: 0 is returned.
    * ``data`` contains ``"M112"`` and ``_supportM112`` is truthy: the line
      is logged, ``_kill()`` is called and ``len(data)`` is returned.
    * otherwise ``data`` is put on ``incoming`` (bounded by
      ``_write_timeout``) and the value returned by ``put`` is returned.

    Raises:
        SerialTimeoutException: on a requested debug drop, or when the
            incoming queue raises ``queue.Full``.
    """
    if self._debug_awol:
        return len(data)

    if self._debug_drop_connection:
        self._logger.info("Debug drop of connection requested, raising SerialTimeoutException")
        raise SerialTimeoutException()

    with self._incoming_lock:
        queues_ready = self.incoming is not None and self.outgoing is not None
        if not queues_ready:
            return 0

        emergency_stop = "M112" in data and self._supportM112
        if emergency_stop:
            self._seriallog.info("<<< {}".format(data.strip()))
            self._kill()
            return len(data)

        try:
            accepted = self.incoming.put(data, timeout=self._write_timeout, partial=True)
            self._seriallog.info("<<< {}".format(data.strip()))
            return accepted
        except queue.Full:
            self._logger.info("Incoming queue is full, raising SerialTimeoutException")
            raise SerialTimeoutException()
def send_byte(self, b):
    """Write a single byte to the serial connection.

    ``b`` must be exactly a ``str`` of length 1 or exactly an ``int``; an
    ``int`` is converted with ``chr`` before being written.

    Returns:
        ``None`` after a write that did not time out, ``0`` when the write
        raised ``SerialTimeoutException`` (the timeout is logged).

    Raises:
        UnsupportedPayloadType: ``b`` is neither a one-character ``str``
            nor an ``int``.
    """
    try:
        # Normalise the payload first so there is a single send path.
        if type(b) is int:
            b = chr(b)
        elif type(b) is not str:
            raise TypeError

        if type(b) is not str or len(b) != 1:
            raise TypeError

        log.debug("sending byte: %s (%s)" % (b, ord(b)))
        self._ser.write(b)
    except TypeError:
        # NOTE(review): a TypeError raised by write() itself is reported
        # through this branch as well.
        raise UnsupportedPayloadType("byte type should be str length 1 or int but %s found" % type(b))
    except SerialTimeoutException:
        log.exception("write timeout")
        return 0
def send_first_tx_fifo_message(self):
    """Send first message in tx fifo.

    Does nothing when the fifo is empty. Otherwise the message at index 0
    is rendered with ``to_string()``, terminated with a carriage return,
    encoded as ASCII and written to the serial port. The message is not
    removed from the fifo by this method.

    Raises:
        USBtinException: wraps a ``serial.SerialTimeoutException`` raised
            while building or writing the frame.
    """
    pending = self.tx_fifo
    if len(pending) == 0:
        return

    head = pending[0]
    try:
        frame = '{}\r'.format(head.to_string())
        self.serial_port.write(frame.encode('ascii'))
    except serial.SerialTimeoutException as timeout_error:
        raise USBtinException(timeout_error)
def write_mcp_register(self, register, value):
    """Write given register of MCP2515.

    The command is ``W`` followed by the register address and the value,
    each rendered as at least two lower-case hex digits, and is handed to
    ``transmit``.

    Keyword arguments:
    register -- Register address
    value -- Value to write

    Raises:
        USBtinException: wraps a ``serial.SerialTimeoutException`` raised
            while transmitting.
    """
    try:
        self.transmit("W{:02x}{:02x}".format(register, value))
    except serial.SerialTimeoutException as timeout_error:
        raise USBtinException(timeout_error)
def write(self, frame):
    """Write ``frame`` to the tty, if one is attached.

    When ``self.tty`` is ``None`` the call is a no-op. Otherwise the frame
    is logged as hex one level below DEBUG, the tty input buffer is
    flushed, and ``str(frame)`` is written.

    Raises:
        IOError: with ``errno.EIO`` when the write raises
            ``serial.SerialTimeoutException``.
    """
    if self.tty is None:
        return

    log.log(logging.DEBUG-1, ">>> %s", hexlify(frame))
    # Flush the input buffer before the frame goes out.
    self.tty.flushInput()
    try:
        self.tty.write(str(frame))
    except serial.SerialTimeoutException:
        io_error_code = errno.EIO
        raise IOError(io_error_code, os.strerror(io_error_code))
def readline(self):
    """Return the next line from the outgoing queue, or ``""`` on timeout.

    Debug switches are honoured first:

    * ``_debug_awol``: sleep for the read timeout and return ``""``.
    * ``_debug_drop_connection``: raise ``SerialTimeoutException``.
    * ``_debug_sleep`` > 0: sleep for at most one read timeout, deducting
      the time slept from ``_debug_sleep``; if sleep time is still owed,
      return ``""``, otherwise only the remainder of the read timeout is
      spent waiting on the queue.

    Returns:
        The line taken from ``outgoing`` (logged and marked done), or an
        empty string when nothing arrived in time.
    """
    if self._debug_awol:
        time.sleep(self._read_timeout)
        return ""

    if self._debug_drop_connection:
        raise SerialTimeoutException()

    # by default the whole read timeout is available for the queue wait
    timeout = self._read_timeout

    if self._debug_sleep > 0:
        # never sleep longer than one read timeout per call; any remainder
        # is slept off on subsequent calls
        sleep_for = min(self._debug_sleep, self._read_timeout)
        self._debug_sleep -= sleep_for
        time.sleep(sleep_for)

        if self._debug_sleep > 0:
            # the full read timeout was spent sleeping: report an empty line
            return ""

        # what is left of the read timeout after the sleep
        timeout = self._read_timeout - sleep_for

    try:
        # wait no longer than timeout for a line
        received = self.outgoing.get(timeout=timeout)
        self._seriallog.info(">>> {}".format(received.strip()))
        self.outgoing.task_done()
        return received
    except queue.Empty:
        # nothing arrived in time
        return ""
def init(self):
    """Probe the device with ENQ and deal with whatever it answers.

    Writes ``ENQ`` and reads one byte back:

    * ``NAK`` -- returns True without further action;
    * ``ACK`` -- calls ``handle_response()`` and then returns True;
    * anything else -- keeps reading until ``read()`` returns an empty
      result, then returns False.

    Returns:
        bool: True for a NAK/ACK answer, False for any other answer.

    Raises:
        excepts.NoConnectionError: nothing was read back after ENQ.
            NOTE(review): it is raised inside the ``try`` block, so it
            would be converted to ProtocolError if it derives from
            ``serial.SerialException`` -- confirm.
        excepts.ProtocolError: on ``serial.SerialTimeoutException`` (the
            output buffer is flushed first) or any other
            ``serial.SerialException`` (the input buffer is flushed first).
    """
    try:
        self.serial.write(ENQ)
        answer = self.serial.read()
        if not answer:
            raise excepts.NoConnectionError()

        if answer == NAK:
            return True

        if answer == ACK:
            self.handle_response()
            return True

        # Unexpected byte: drain the line until nothing more arrives.
        while self.serial.read():
            pass
        return False

    except serial.SerialTimeoutException:
        self.serial.flushOutput()
        # NOTE(review): this literal looks mis-encoded (question marks
        # only); it is kept byte-for-byte because it is runtime text.
        raise excepts.ProtocolError(u'?? ??????? ???????? ???? ? ???')
    except serial.SerialException as exc:
        self.serial.flushInput()
        raise excepts.ProtocolError(unicode(exc))
def test_WriteTimeout(self):
    """Test write() timeout."""
    # use xonxoff setting and the loop-back adapter to switch traffic on hold
    self.s.port = PORT
    self.s.write_timeout = 1.0
    self.s.xonxoff = True
    self.s.open()
    self.s.write(serial.XOFF)
    # some systems need a little delay so that they can react on XOFF
    time.sleep(0.5)

    started = time.time()
    with self.assertRaises(serial.SerialTimeoutException):
        self.s.write(b"timeout please" * 200)
    finished = time.time()

    # the write must give up after roughly the configured 1.0 s timeout
    within_window = 0.9 <= (finished - started) < 2.1
    self.assertTrue(
        within_window,
        "Timeout not in the given interval ({})".format(finished - started))
def test_WriteTimeout(self):
    """Test write() timeout.

    Puts the loop-back link on hold with XOFF, then expects a large write
    to raise ``serial.SerialTimeoutException`` after roughly the configured
    one-second write timeout.
    """
    # use xonxoff setting and the loop-back adapter to switch traffic on hold
    self.s.port = PORT
    self.s.writeTimeout = 1
    self.s.xonxoff = 1
    self.s.open()
    self.s.write(serial.XOFF)
    time.sleep(0.5)  # some systems need a little delay so that they can react on XOFF
    t1 = time.time()
    # assertRaises/assertTrue replace the failUnlessRaises/failUnless
    # aliases, which newer unittest versions no longer provide.
    self.assertRaises(serial.SerialTimeoutException, self.s.write, data("timeout please"*200))
    t2 = time.time()
    self.assertTrue(0.9 <= (t2-t1) < 2.1, "Timeout not in the given interval (%s)" % (t2-t1))
def _serial_write(self):
    """Send pending control characters and the next chunk of the tx buffer.

    Step 1 -- control characters: a requested status (1) or super status
    (2), stop and resume are each sent through ``_send_char`` and their
    request flags cleared. A resume additionally zeroes ``firmbuf_used``,
    calls ``reset_status()`` and schedules a super status request.

    Step 2 -- data: while unsent data remains in ``tx_buffer``, and the
    sender is not paused, and the tracked firmware buffer has more than
    ``TX_CHUNK_SIZE`` free, up to ``TX_CHUNK_SIZE`` characters are written
    with every character doubled. ``tx_pos`` and ``firmbuf_used`` advance
    only when the device reports the complete (doubled) length as written;
    an incomplete write or a write timeout leaves both unchanged so the
    chunk is retried on the next call. Once ``tx_pos`` has reached the end
    of a non-empty buffer, ``job_size``, ``tx_buffer`` and ``tx_pos`` are
    reset.

    Diagnostics go to stdout via ``print(...)`` with a single argument,
    which prints the same text on Python 2 and Python 3.
    """
    ### sending super commands (handled in serial rx interrupt)
    if self.request_status == 1:
        self._send_char(CMD_STATUS)
        self.request_status = 0
    elif self.request_status == 2:
        self._send_char(CMD_SUPERSTATUS)
        self.request_status = 0

    if self.request_stop:
        self._send_char(CMD_STOP)
        self.request_stop = False

    if self.request_resume:
        self._send_char(CMD_RESUME)
        self.firmbuf_used = 0  # a resume resets the hardware's rx buffer
        self.request_resume = False
        self.reset_status()
        self.request_status = 2  # super request

    ### send buffer chunk
    if self.tx_buffer and len(self.tx_buffer) > self.tx_pos:
        if not self._paused and (self.FIRMBUF_SIZE - self.firmbuf_used) > self.TX_CHUNK_SIZE:
            try:
                chunk = self.tx_buffer[self.tx_pos:self.tx_pos+self.TX_CHUNK_SIZE]
                expectedSent = len(chunk)
                # by protocol duplicate every char
                to_send = ''.join(c + c for c in chunk)
                t_prewrite = time.time()
                actuallySent = self.device.write(to_send)
                if actuallySent != expectedSent*2:
                    print("ERROR: write did not complete")
                    assumedSent = 0
                else:
                    assumedSent = expectedSent
                    self.firmbuf_used += assumedSent
                    if self.firmbuf_used > self.FIRMBUF_SIZE:
                        print("ERROR: firmware buffer tracking too high")
                if time.time() - t_prewrite > 0.1:
                    print("WARN: write delay 1")
            except serial.SerialTimeoutException:
                assumedSent = 0
                print("ERROR: writeTimeoutError 2")
            self.tx_pos += assumedSent
    elif self.tx_buffer:
        # job finished sending
        self.job_size = 0
        self.tx_buffer = []
        self.tx_pos = 0
def _send_char(self, char):
    """Write one control character to the device, doubled per protocol.

    A ``serial.SerialTimeoutException`` raised by the write is not
    propagated; an error line is printed instead. The message is printed
    with single-argument ``print(...)``, which prints the same text on
    Python 2 and Python 3.
    """
    try:
        t_prewrite = time.time()
        self.device.write(char+char)  # by protocol send twice
        if time.time() - t_prewrite > 0.1:
            # slow-write warning is disabled in the original code:
            # print "WARN: write delay 2"
            pass
    except serial.SerialTimeoutException:
        print("ERROR: writeTimeoutError 1")


###########################################################################
### API ###################################################################
###########################################################################

# def find_controller(baudrate=conf['baudrate'], verbose=True):
#     if os.name == 'posix':
#         iterator = sorted(serial.tools.list_ports.grep('tty'))
#         for port, desc, hwid in iterator:
#             # print "Looking for controller on port: " + port
#             try:
#                 s = serial.Serial(port=port, baudrate=baudrate, timeout=2.0)
#                 lasaur_hello = s.read(8)
#                 if lasaur_hello.find(INFO_HELLO) > -1:
#                     return port
#                 s.close()
#             except serial.SerialException:
#                 pass
#     else:
#         # windows hack because pyserial does not enumerate USB-style com ports
#         if verbose:
#             print "Trying to find controller ..."
#         for i in range(24):
#             try:
#                 s = serial.Serial(port=i, baudrate=baudrate, timeout=2.0)
#                 lasaur_hello = s.read(8)
#                 if lasaur_hello.find(INFO_HELLO) > -1:
#                     return s.portstr
#                 s.close()
#             except serial.SerialException:
#                 pass
#     if verbose:
#         print "ERROR: No controller found."
#     return None
def _do_send_without_checksum(self, cmd):
    """Write ``cmd`` followed by a newline to the serial port.

    Returns immediately when no serial connection is set. Otherwise the
    command is logged and written in a loop until the number of bytes
    accounted for reaches the length of the newline-terminated command.

    Per pass:

    * A write result that is ``None`` or not an ``int`` is treated as
      "everything was written".
    * A ``serial.SerialTimeoutException`` is logged and the same write is
      attempted once more.
    * Any other exception (or any exception during that retry) ends the
      loop; unless the connection is already closing, it is logged, stored
      in ``_errorValue`` and the connection is closed as an error.
    * A pass that made no progress increments ``passes``; once it exceeds
      ``_max_write_passes`` the failure is logged, ``_errorValue`` is set
      and the connection is closed as an error.

    NOTE(review): block nesting was reconstructed from a whitespace-mangled
    source; in particular each ``break`` is assumed to sit at the level of
    its ``except`` body rather than inside the ``_connection_closing``
    check -- confirm against the upstream file.
    """
    if self._serial is None:
        return

    self._log("Send: " + str(cmd))

    cmd += "\n"
    written = 0
    passes = 0
    while written < len(cmd):
        # only the part that has not been accounted for yet is (re)sent
        to_send = cmd[written:]
        old_written = written

        try:
            result = self._serial.write(to_send)
            if result is None or not isinstance(result, int):
                # probably some plugin not returning the written bytes, assuming all of them
                written += len(cmd)
            else:
                written += result
        except serial.SerialTimeoutException:
            self._log("Serial timeout while writing to serial port, trying again.")
            # one immediate retry of the very same slice
            try:
                result = self._serial.write(to_send)
                if result is None or not isinstance(result, int):
                    # probably some plugin not returning the written bytes, assuming all of them
                    written += len(cmd)
                else:
                    written += result
            except:
                # NOTE(review): bare except -- also catches
                # KeyboardInterrupt/SystemExit
                if not self._connection_closing:
                    self._logger.exception("Unexpected error while writing to serial port")
                    self._log("Unexpected error while writing to serial port: %s" % (get_exception_string()))
                    self._errorValue = get_exception_string()
                    self.close(is_error=True)
                break
        except:
            # NOTE(review): bare except -- also catches
            # KeyboardInterrupt/SystemExit
            if not self._connection_closing:
                self._logger.exception("Unexpected error while writing to serial port")
                self._log("Unexpected error while writing to serial port: %s" % (get_exception_string()))
                self._errorValue = get_exception_string()
                self.close(is_error=True)
            break

        if old_written == written:
            # nothing written this pass
            # NOTE(review): passes is never reset after a successful pass,
            # so it counts all passes without progress, not only
            # consecutive ones as the comment below suggests.
            passes += 1
            if passes > self._max_write_passes:
                # nothing written in max consecutive passes, we give up
                message = "Could not write anything to the serial port in {} tries, something appears to be wrong with the printer communication".format(self._max_write_passes)
                self._logger.error(message)
                self._log(message)
                self._errorValue = "Could not write to serial port"
                self.close(is_error=True)
                break

##~~ command handlers
def command_nopass(self, cmd, params=bytearray()):
    """Send command ``cmd`` with ``params`` and return the handled response.

    The frame written to the serial port is ``STX`` + payload + checksum,
    where the payload concatenates ``misc.CAST_SIZE['1']`` of the combined
    command/parameter length, ``misc.CAST_CMD[cmd_len]`` of the command and
    the parameters, and the checksum is ``misc.CAST_SIZE['1']`` of
    ``misc.lrc(payload)``.

    For the first truthy result yielded by ``self.check(self.CHECK_NUM)``
    the frame is written up to ``MAX_ATTEMPTS`` times; as soon as the
    device answers ``ACK`` the result of ``handle_response()`` is returned.

    :type cmd: int
    :param cmd: command number
    :type params: bytearray
    :param params: command parameters

    :rtype: dict
    :return: response parameters as a dictionary

    :raises TypeError: ``params`` is not a ``bytearray``.
    :raises excepts.ProtocolError: on ``serial.SerialTimeoutException``
        (output flushed first) or any other ``serial.SerialException``
        (input flushed first).
    :raises excepts.NoConnectionError: no attempt was acknowledged, or
        ``check`` yielded no truthy result.
    """
    if not isinstance(params, bytearray):
        raise TypeError(u'{} expected, got {} instead'.format(bytearray, type(params)))

    cmd_len = len(misc.int_to_bytes(cmd))
    payload = misc.bytearray_concat(
        misc.CAST_SIZE['1'](cmd_len + len(params)),
        misc.CAST_CMD[cmd_len](cmd),
        params
    )
    checksum = misc.CAST_SIZE['1'](misc.lrc(payload))
    frame = misc.bytearray_concat(STX, payload, checksum)

    for ready in self.check(self.CHECK_NUM):
        if not ready:
            continue

        for _ in xrange(self.MAX_ATTEMPTS):
            try:
                self.serial.write(frame)
                if self.serial.read() == ACK:
                    return self.handle_response()
            except serial.SerialTimeoutException:
                self.serial.flushOutput()
                # NOTE(review): this literal looks mis-encoded (question
                # marks only); kept byte-for-byte because it is runtime text.
                raise excepts.ProtocolError(u'?? ??????? ???????? ???? ? ???')
            except serial.SerialException as exc:
                self.serial.flushInput()
                raise excepts.ProtocolError(unicode(exc))

        # The attempt loop has no break, so getting here means every
        # attempt went unacknowledged.
        raise excepts.NoConnectionError()

    # check() yielded no truthy result.
    raise excepts.NoConnectionError()
def do_pub(self, arg):
    """
    Publishes a (value | string) on <topic>.

    Usage: pub (--u8 | --u16 | --u32 | --i8 | --i16 | --i32 | --f32 | --s) <topic> <value>
    """
    # NOTE(review): the docstring above looks like a docopt usage pattern
    # (arg is indexed with '--u8', '<topic>', '<value>'), so its wording is
    # presumably parsed at runtime -- do not reword it without checking.
    #
    # Step 1: convert the raw value according to the selected flag.
    if arg['--f32']:
        arg['<value>'] = float(arg['<value>'])
    elif not arg['--s']:
        try:
            arg['<value>'] = int(arg['<value>'])
        except (TypeError, ValueError):
            # Narrowed from a bare except so KeyboardInterrupt/SystemExit
            # are no longer swallowed.
            # User most likely entered a float with an integer flag
            # NOTE(review): a non-numeric value makes float() raise
            # ValueError here, which is not handled.
            inter = float(arg['<value>'])
            rounded = int(inter)
            if isclose(inter, rounded):
                arg['<value>'] = rounded
            else:
                s = "Aborted : Wrote decimal number ({0}) with integer flag.".format(arg['<value>'])
                self.stdout.write(s + "\n")
                logger.warning(s)
                return

    # Step 2: map the selected flag to its payload type. If several flags
    # were truthy, the last one in this order wins.
    type_flags = ("--u8", "--u16", "--u32", "--i8", "--i16", "--i32", "--f32", "--s")
    valtype = None
    for flag in type_flags:
        if arg[flag]:
            valtype = self.types_lookup[flag]

    if not valtype:
        logger.error("Payload type [{0}] unkown.".format(arg))
        return

    # Step 3: publish and report the outcome on stdout and in the log.
    try:
        self.telemetry.publish(arg['<topic>'], arg['<value>'], valtype)
    except SerialTimeoutException as e:
        self.stdout.write("Pub failed. Connection most likely terminated.")
        logger.error("Pub failed. Connection most likely terminated. exception : %s" % e)
        return
    except AttributeError as e:
        self.stdout.write("Pub failed because you are not connected to any device. Connect first using `serial` command.")
        logger.warning("Trying to publish while not connected. exception : %s" % e)
        return

    s = "Published on topic '{0}' : {1} [{2}]".format(arg['<topic>'], arg['<value>'], valtype)
    self.stdout.write(s + "\n")
    logger.info(s)