我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用serial.SerialException()。
def find_port(self): # Finds the serial port names if sys.platform.startswith('win'): ports = ['COM%s' % (i+1) for i in range(256)] elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'): ports = glob.glob('/dev/ttyUSB*') elif sys.platform.startswith('darwin'): ports = glob.glob('/dev/tty.usbserial*') else: raise EnvironmentError('Error finding ports on your operating system') openbci_port = '' for port in ports: try: s = serial.Serial(port= port, baudrate = self.baudrate, timeout=self.timeout) s.write(b'v') openbci_serial = self.openbci_id(s) s.close() if openbci_serial: openbci_port = port; except (OSError, serial.SerialException): pass if openbci_port == '': raise OSError('Cannot find OpenBCI port') else: return openbci_port
def reader(self): """loop and copy serial->console""" try: while self.alive and self._reader_alive: # read all that is there or wait for one byte data = self.serial.read(self.serial.in_waiting or 1) if data: if self.raw: self.console.write_bytes(data) else: text = self.rx_decoder.decode(data) for transformation in self.rx_transformations: text = transformation.rx(text) self.console.write(text) except serial.SerialException: self.alive = False # XXX would be nice if the writer could be interrupted at this # point... to exit completely raise
def serial_class_for_url(url): """extract host and port from an URL string""" parts = urlparse.urlsplit(url) if parts.scheme != 'alt': raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,)) class_name = 'Serial' try: for option, values in urlparse.parse_qs(parts.query, True).items(): if option == 'class': class_name = values[0] else: raise ValueError('unknown option: %r' % (option,)) except ValueError as e: raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e) return (''.join([parts.netloc, parts.path]), getattr(serial, class_name)) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url): """extract host and port from an URL string""" parts = urlparse.urlsplit(url) if parts.scheme != 'spy': raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,)) # process options now, directly altering self formatter = FormatHexdump color = False output = sys.stderr try: for option, values in urlparse.parse_qs(parts.query, True).items(): if option == 'file': output = open(values[0], 'w') elif option == 'color': color = True elif option == 'raw': formatter = FormatRaw elif option == 'all': self.show_all = True else: raise ValueError('unknown option: %r' % (option,)) except ValueError as e: raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e) self.formatter = formatter(output, color) return ''.join([parts.netloc, parts.path])
def setDeviceID(dev, id): if id < 0 or id > 255: raise ValueError("ID must be an unsigned byte!") try: com = serial.Serial(dev, timeout=5) packet = DriverSerial._generateHeader(CMDTYPE.SETID, 1) packet.append(id) com.write(packet) resp = com.read(1) if len(resp) == 0: DriverSerial._comError() else: if ord(resp) != RETURN_CODES.SUCCESS: DriverSerial._printError(ord(resp)) except serial.SerialException: log.error("Problem connecting to serial device.") raise IOError("Problem connecting to serial device.")
def dump_port_settings(self): """Write current settings to sys.stderr""" sys.stderr.write("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format( p=self.serial)) sys.stderr.write('--- RTS: {:8} DTR: {:8} BREAK: {:8}\n'.format( ('active' if self.serial.rts else 'inactive'), ('active' if self.serial.dtr else 'inactive'), ('active' if self.serial.break_condition else 'inactive'))) try: sys.stderr.write('--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n'.format( ('active' if self.serial.cts else 'inactive'), ('active' if self.serial.dsr else 'inactive'), ('active' if self.serial.ri else 'inactive'), ('active' if self.serial.cd else 'inactive'))) except serial.SerialException: # on RFC 2217 ports, it can happen if no modem state notification was # yet received. ignore this error. pass sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive')) sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive')) sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding)) sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding)) sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper())) sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
def reader(self): """loop and copy serial->console""" try: while self.alive and self._reader_alive: # read all that is there or wait for one byte data = self.serial.read(self.serial.in_waiting or 1) if data: if self.raw: self.console.write_bytes(data) else: text = self.rx_decoder.decode(data) for transformation in self.rx_transformations: text = transformation.rx(text) self.console.write(text) except serial.SerialException: self.alive = False self.console.cancel() raise # XXX handle instead of re-raise?
def serial_ports(): if sys.platform.startswith('win'): # windows COM port enumerations ports = ['COM%s' % (i + 1) for i in range(256)] elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'): # linux and cygwin support # this excludes your current terminal "/dev/tty" ports = glob.glob('/dev/tty[A-Za-z]*') elif sys.platform.startswith('darwin'): # OSX support ports = glob.glob('/dev/tty.*') else: raise EnvironmentError('Unsupported platform') result = [] for port in ports: # for every possible port name try: s = serial.Serial(port) # open it and close it s.close() result.append(port) # if nothing went wrong, the port exists on the system except (OSError, serial.SerialException): # if something went wrong, the port does not exist pass return result # return the list of ports that currently actually exist on the system
def __init__(self): if sys.platform.startswith('win'): ports = ['COM%s' % (i + 1) for i in range(256)] elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'): # this excludes your current terminal "/dev/tty" ports = glob.glob('/dev/tty[A-Za-z]*') elif sys.platform.startswith('darwin'): ports = glob.glob('/dev/tty.*') else: raise EnvironmentError('Unsupported platform') self.result = [] for port in ports: try: s = serial.Serial(port) s.close() self.result.append(port) except (OSError, serial.SerialException): pass
def connect(self, port_name='/dev/ttyACM0', baud_rate=9600): self.serial_connection = None try: self.serial_connection = serial.Serial( port=port_name, baudrate=baud_rate ) except serial.SerialException as e: print "Could not connect", e time.sleep(2) # no idea why but robot does not move if speed commands are sent # directly after opening serial while self.current_status is None: print "Waiting for status message" time.sleep(1) print "received first status message with position", self.current_status.position
def findPorts(): result=[] temp_list = glob.glob('/dev/ttyA[A-Za-z]*') for a_port in temp_list: try: s = serial.Serial(a_port) s.close() result.append(a_port) except serial.SerialException: pass temp_list = glob.glob('/dev/ttyUSB[A-Za-z]*') for a_port in temp_list: try: s = serial.Serial(a_port) s.close() result.append(a_port) except serial.SerialException: pass return result
def run(self): logger.debug("Serial listening thread started") incoming = '' while True: try: incoming = self.Listen2.readline().rstrip() # use [:-1]? except serial.SerialException as e: logger.debug("Serial exception occured: " + str(e)) if not self.Stop: logger.warning("serial exception ocurred: " + str(e)) break if self.Stop: break else: if is_hex_string(incoming): Send2ParentThread(self.Network, incoming).start() else: logger.debug("Serial port said: " + str(incoming)) logger.info("Serial listening thread shutting down")
def connect_serial_gps(): """ Check if the device is available. If its not, disconnect any currently running gpsd using killall. Check availability again. If still not available, raise an error. When device is available, connect to the L80GPS object. If device does not become available after killall on gpsd, another process such as kismet_server may be forcing it to remain connected. :return: L80GPS object """ device = get_device_gps() if not device_available(device): cmd = 'sudo killall gpsd' os.system(cmd) #rpi_utils.run_program(cmd) time.sleep(1) if not device_available(device): raise serial.SerialException('Device busy ' + device + '. Check your processes.') gps = l80gps.L80GPS() return gps
def _setup_connection(self, device: str, speed: int): ''' todo: doc ''' try: self._conn = serial.Serial(device, speed, timeout=2) except serial.SerialException as ex: if ex.errno == 2: raise device_error("there's no device '%s'" % device) raise self.disconnect() self._conn.flush() # todo: refactor _conn_settings = self._conn.getSettingsDict() for _ in range(2): _conn_settings['rtscts'] = not _conn_settings['rtscts'] self._conn.applySettingsDict(_conn_settings) # connection._assert_token(connection._read_byte(self._conn), _token.SYNC) self._handler_thread = threading.Thread(target=self._handler_thread_fn, daemon=True) self._handler_thread.start() LOG.info('connection initialized')
def find_port(self): try: temp_port_list = serial.tools.list_ports.comports() except OSError: raise OSError('Serial port not found! Try entering your port manually.') ports = [i[0] for i in temp_port_list][::-1] openbci_port = '' for port in ports: try: s = serial.Serial(port= port, baudrate = self.baudrate, timeout=self.timeout) s.write(b'v') openbci_serial = self.openbci_id(s) s.close() if openbci_serial: openbci_port = port; except (OSError, serial.SerialException): pass if openbci_port == '': raise OSError('Cannot find OpenBCI port') else: return openbci_port
def _connectSerial(self): """connect via serial interface""" #DeviceID = {"Globalsat 580p" : "0483:5740", # "Timex Cycle Trainer" : "0484:5741" # } DeviceID = {"Globalsat 580p" : "0483:5740"} # search for the port first based on device ids if self.port is None: for key, value in DeviceID.iteritems(): ports = list(list_ports.grep(value)) if len(ports) > 0: self.port = ports[0][0] self.logger.debug("USB virtual serial port found on " + self.port) break # didnt find anything fall back to config.ini if self.port is None: self.port = self.config.get("serial", "comport") self.logger.debug("Virtual serial port not found. Reverting to config.ini: " + self.port) try: self.serial = serial.Serial(port=self.port, baudrate=self.config.get("serial", "baudrate"), timeout=self.config.getint("serial", "timeout"), xonxoff=0, rtscts=1) self.logger.debug("serial connection on " + self.serial.portstr) except serial.SerialException: self.logger.critical("error establishing serial connection") raise GB500SerialException
def connect(self, port = 'COM22', speed = 115200): if self.serial != None: self.close() try: self.serial = Serial(str(port), speed, timeout=1, writeTimeout=10000) except SerialException as e: raise ispBase.IspError("Failed to open serial port") except: raise ispBase.IspError("Unexpected error while connecting to serial port:" + port + ":" + str(sys.exc_info()[0])) self.seq = 1 #Reset the controller self.serial.setDTR(1) time.sleep(0.1) self.serial.setDTR(0) time.sleep(0.2) self.sendMessage([1]) if self.sendMessage([0x10, 0xc8, 0x64, 0x19, 0x20, 0x00, 0x53, 0x03, 0xac, 0x53, 0x00, 0x00]) != [0x10, 0x00]: self.close() raise ispBase.IspError("Failed to enter programming mode")
def write(self, data): """Write some data to the transport. This method does not block; it buffers the data and arranges for it to be sent out asynchronously. Writes made after the transport has been closed will be ignored.""" if self._closing: return if self.get_write_buffer_size() == 0: # Attempt to send it right away first try: n = self._serial.write(data) except serial.SerialException as exc: self._fatal_error(exc, 'Fatal write error on serial transport') return if n == len(data): return # Whole request satisfied assert n > 0 data = data[n:] self._ensure_writer() self._write_buffer.append(data) self._maybe_pause_protocol()
def run(self): while not self.exit_event.is_set(): try: bytes_array = bytearray(self.serial_file.read(1)) except serial.SerialException: time.sleep(rate) continue if not bytes_array: time.sleep(rate) continue byte = bytes_array[0] with serial_lock: try: order = Order(byte) except ValueError: continue if order == Order.RECEIVED: n_received_semaphore.release() decodeOrder(self.serial_file, byte) time.sleep(rate) print("Listener Thread Exited")
def spOpen(serialName): # 115200/8/N/1 try: if serialName: serialFd = serial.Serial(serialName, timeout = 3) serialFd.baudrate = 115200 #serialFd.baudrate = 9600 serialFd.bytesize = 8 serialFd.parity = serial.PARITY_NONE; serialFd.stopbits = 1 serialFd.timeout = 3 serialFd.writeTimeout = 1 if serialFd and serialFd.readable(): return serialFd else: return None except serial.SerialException: return None
def connect(self): """ ????? ??????????? ? ??????????. """ if not self.connected: self.serial.port = self.port if not self.serial.isOpen(): try: self.serial.open() except serial.SerialException as exc: raise excepts.NoConnectionError( u'?? ??????? ??????? ???? {} ({})'.format( self.port, exc ) ) for r in self.check(self.CHECK_NUM, True): if r: self.connected = True return else: self.serial.close() raise excepts.NoConnectionError()
def listen(self): eol = b'\n' eol_len = len(eol) line = bytearray() while True: try: c = self.serial.read(1) if c: line += c if line[-eol_len:] == eol: print(line) sample = struct.unpack('<' + 'h'*(len(line[:-eol_len])//2), line[:-eol_len]) print(sample) self.emitter.emit(sample[-2:]) line = bytearray() except SerialException: return except AttributeError: return except struct.error: continue
def setPort(self, port:Port): if port.isOpen(): port.close() port.setBackground(QtGui.QColor(0, 0, 0, 0)) else: try: port.open(self.baudrate_combo.currentText(), self.parity_combo.currentText(), self.bytesize_combo.currentText()) port.setBackground(QtGui.QColor(0, 255, 0, 127)) except SerialException as e: print(e) port.setBackground(QtGui.QColor(255, 0, 0, 127)) self.check_availability_list.append(port) except ValueError as e: print(e) port.setBackground(QtGui.QColor(255, 0, 0, 127)) port.setSelected(False) print(self.getPortList())
def __init__(self, port=None): """Opens a connection to a Removinator controller on the specified serial port. If a port is not specified, an attempt will be made to auto-discover the Removinator controller. :param port: port that the Removinator is connected to :type port: str :raises: :exc:`ConnectError` :ivar last_result: The result output from the last executed command :ivar last_response: The response output from the last executed command :ivar port: The port that the Removinator is connected to """ self.port = port self.last_result = '' self.last_response = '' # Attempt to discover the Removinator controller if a # port was not specified. if port is None: port = _discover_removinator() # Open a connection to the Removinator controller. try: self.connection = serial.Serial( port, 9600, serial.EIGHTBITS, serial.PARITY_NONE, serial.STOPBITS_ONE, 1) self.sio = io.TextIOWrapper(io.BufferedRWPair(self.connection, self.connection)) except serial.SerialException as e: raise ConnectError('Unable to open connection to Removinator ' 'controller on port {0}'.format(port))
def __init__(self, port, baud, delay): self.port = port self.baud = baud self.serial = None self.delay = delay try: self.serial = serial.Serial(port, baud) except serial.SerialException as e: raise TransportError(e.strerror) self.serial.timeout = 3 self.serial.interCharTimeout = 3
def __init__(self, port, baud_rate, terminal=None, reset=False): Connection.__init__(self, terminal) self._port = port self._baud_rate = baud_rate try: self._serial = serial.Serial(None, self._baud_rate, timeout=0, write_timeout=0.2) self._serial.dtr = False self._serial.rts = False self._serial.port = port self._serial.open() if reset: self._serial.rts = True time.sleep(0.1) self._serial.rts = False x = "" while not x.endswith(">>>"): x += self._serial.read().decode('utf-8', errors="ignore") self.send_kill() except (OSError, serial.SerialException) as e: self._serial = None return except Exception as e: return self._reader_thread = Thread(target=self._reader_thread_routine) self._reader_thread.start()
def _serial_ports(with_wifi): """ Lists serial port names :raises EnvironmentError: On unsupported or unknown platforms :returns: A list of the serial ports available on the system """ if sys.platform.startswith('win'): ports = ['COM%s' % (i + 1) for i in range(256)] elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'): # this excludes your current terminal "/dev/tty" ports = glob.glob('/dev/tty[A-Za-z]*') elif sys.platform.startswith('darwin'): ports = glob.glob('/dev/tty.*') else: raise EnvironmentError('Unsupported platform') result = [] for port in ports: try: s = serial.Serial() s.dtr = False s.rts = False s.port = port s.open() s.close() result.append(port) except (OSError, serial.SerialException): pass if with_wifi: result.append("wifi") return result
def _flash_finished(self, code): Logger.log("Flash output contents:\r\n") Logger.log(self._flash_output) if code == 0: self._flash_output.extend(b"Rebooting from flash mode...\n") self._update_output() try: s = serial.Serial(self._port, 115200) s.dtr = False s.rts = True time.sleep(0.1) s.rts = False time.sleep(0.1) self._flash_output.extend(b"Done, you may now use the device.\n") self._update_output() except (OSError, serial.SerialException): QMessageBox.critical(self, "Flashing Error", "Failed to reboot into working mode.") elif code == -1: QMessageBox.critical(self, "Flashing Error", "Failed to run script.\nCheck that path to python is correct.") else: QMessageBox.critical(self, "Flashing Error", "Failed to flash new firmware") self.eraseButton.setEnabled(True) self.flashButton.setEnabled(True) self._flashing = False
def open(self): """\ Open port with current settings. This may throw a SerialException if the port cannot be opened.""" if self._port is None: raise SerialException("Port must be configured before it can be used.") if self.is_open: raise SerialException("Port is already open.") self.fd = None # open try: self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) except OSError as msg: self.fd = None raise SerialException(msg.errno, "could not open port %s: %s" % (self._port, msg)) #~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # set blocking try: self._reconfigure_port(force_update=True) except: try: os.close(self.fd) except: # ignore any exception when closing the port # also to keep original exception that happened when setting up pass self.fd = None raise else: self.is_open = True if not self._dsrdtr: self._update_dtr_state() if not self._rtscts: self._update_rts_state() self.reset_input_buffer()
def write(self, data): """Output the given byte string over the serial port.""" if not self.is_open: raise portNotOpenError d = to_bytes(data) tx_len = len(d) if self._write_timeout is not None and self._write_timeout > 0: timeout = time.time() + self._write_timeout else: timeout = None while tx_len > 0: try: n = os.write(self.fd, d) if timeout: # when timeout is set, use select to wait for being ready # with the time left as timeout timeleft = timeout - time.time() if timeleft < 0: raise writeTimeoutError _, ready, _ = select.select([], [self.fd], [], timeleft) if not ready: raise writeTimeoutError else: # wait for write operation _, ready, _ = select.select([], [self.fd], [], None) if not ready: raise SerialException('write failed (select)') d = d[n:] tx_len -= n except SerialException: raise except OSError as v: if v.errno != errno.EAGAIN: raise SerialException('write failed: %s' % (v,)) # still calculate and check timeout if timeout and timeout - time.time() < 0: raise writeTimeoutError return len(data)
def read(self, size=1): """\ Read size bytes from the serial port. If a timeout is set it may return less characters as requested. With no timeout it will block until the requested number of bytes is read. """ if self.fd is None: raise portNotOpenError read = bytearray() poll = select.poll() poll.register(self.fd, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL) if size > 0: while len(read) < size: # print "\tread(): size",size, "have", len(read) #debug # wait until device becomes ready to read (or something fails) for fd, event in poll.poll(self._timeout * 1000): if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL): raise SerialException('device reports error (poll)') # we don't care if it is select.POLLIN or timeout, that's # handled below buf = os.read(self.fd, size - len(read)) read.extend(buf) if ((self._timeout is not None and self._timeout >= 0) or (self._inter_byte_timeout is not None and self._inter_byte_timeout > 0)) and not buf: break # early abort on timeout return bytes(read)
def _reconfigure_port(self, force_update=True): """Set communication parameters on opened port.""" super(VTIMESerial, self)._reconfigure_port() fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # clear O_NONBLOCK if self._inter_byte_timeout is not None: vmin = 1 vtime = int(self._inter_byte_timeout * 10) else: vmin = 0 vtime = int(self._timeout * 10) try: orig_attr = termios.tcgetattr(self.fd) iflag, oflag, cflag, lflag, ispeed, ospeed, cc = orig_attr except termios.error as msg: # if a port is nonexistent but has a /dev file, it'll fail here raise serial.SerialException("Could not configure port: %s" % msg) if vtime < 0 or vtime > 255: raise ValueError('Invalid vtime: %r' % vtime) cc[termios.VTIME] = vtime cc[termios.VMIN] = vmin termios.tcsetattr( self.fd, termios.TCSANOW, [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
def dump_port_settings(self): sys.stderr.write("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format( p=self.serial)) sys.stderr.write('--- RTS: {:8} DTR: {:8} BREAK: {:8}\n'.format( ('active' if self.serial.rts else 'inactive'), ('active' if self.serial.dtr else 'inactive'), ('active' if self.serial.break_condition else 'inactive'))) try: sys.stderr.write('--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n'.format( ('active' if self.serial.cts else 'inactive'), ('active' if self.serial.dsr else 'inactive'), ('active' if self.serial.ri else 'inactive'), ('active' if self.serial.cd else 'inactive'))) except serial.SerialException: # on RFC 2217 ports, it can happen if no modem state notification was # yet received. ignore this error. pass sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive')) sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive')) #~ sys.stderr.write('--- data escaping: %s linefeed: %s\n' % ( #~ REPR_MODES[self.repr_mode], #~ LF_MODES[self.convert_outgoing])) sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding)) sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding)) sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper())) sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))