我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 serial.serial_for_url()。
def __init__(self, port=DEFAULT_PORT, baud=ESP_ROM_BAUD):
    """Set up the shared state used to talk to an ESP bootloader.

    This is the base constructor and is not meant to be called directly:
    instantiate ESP8266ROM or ESP32ROM, or use ESPLoader.detect_chip().
    The base class carries every instance method for bootloader
    functionality that is supported across the various chips and stub
    loaders; subclasses replace the ones they do not support with
    versions that throw NotImplementedInROMError().

    Args:
        port: An existing ``serial.Serial`` instance (used as-is), or any
            other value, which is passed to ``serial.serial_for_url()``.
        baud: Baud rate assigned to the port once it has been obtained.
    """
    is_serial_instance = isinstance(port, serial.Serial)
    self._port = port if is_serial_instance else serial.serial_for_url(port)
    self._slip_reader = slip_reader(self._port)
    # The baud rate is applied as a separate step on purpose: this works
    # around the CH341 driver on some Linux versions (the port opens at
    # 9600 and is then switched). It should not matter for other
    # platforms/drivers. See
    # https://github.com/espressif/esptool/issues/44#issuecomment-107094446
    self._port.baudrate = baud
def __init__(self, port=DEFAULT_PORT, baud=ESP_ROM_BAUD, trace_enabled=False):
    """Set up the shared state used to talk to an ESP bootloader.

    This is the base constructor and is not meant to be called directly:
    instantiate ESP8266ROM or ESP32ROM, or use ESPLoader.detect_chip().
    The base class carries every instance method for bootloader
    functionality that is supported across the various chips and stub
    loaders; subclasses replace the ones they do not support with
    versions that throw NotImplementedInROMError().

    Args:
        port: An existing ``serial.Serial`` instance (used as-is), or any
            other value, which is passed to ``serial.serial_for_url()``.
        baud: Baud rate handed to ``self._set_port_baudrate()``.
        trace_enabled: Stored on the instance as ``_trace_enabled``.
    """
    if isinstance(port, serial.Serial):
        connection = port
    else:
        connection = serial.serial_for_url(port)
    self._port = connection
    self._slip_reader = slip_reader(connection, self.trace)
    # The baud rate is applied as a separate step on purpose: this works
    # around the CH341 driver on some Linux versions (the port opens at
    # 9600 and is then switched). It should not matter for other
    # platforms/drivers. See
    # https://github.com/espressif/esptool/issues/44#issuecomment-107094446
    self._set_port_baudrate(baud)
    self._trace_enabled = trace_enabled
def serialPoller(self):
    """Poll the serial connection forever and show printable input in the console.

    While ``self.serialConnection`` is set, one ``read()`` result is taken
    per iteration, reduced to the characters found in
    ``string.printable``, and appended to the ``self.consoleBody`` text
    buffer through ``gobject.idle_add``. While it is not set, the method
    tries to open ``self.serialLocation`` and apply ``self.baudrate``.

    The loop never returns.
    NOTE(review): presumably meant to run on a worker thread, since UI
    updates are deferred via ``gobject.idle_add`` - confirm at the caller.
    """
    def append_to_console(text):
        buf = self.consoleBody.get_buffer()
        buf.insert(buf.get_end_iter(), text)

    while True:
        if self.serialConnection:
            try:
                data = self.serialConnection.read()
                printable = ''.join(ch for ch in data if ch in string.printable)
                gobject.idle_add(append_to_console, printable)
            except Exception:
                # Best effort: a failed read must not end the poller, but
                # SystemExit/KeyboardInterrupt are no longer swallowed.
                pass
        else:
            try:
                self.serialConnection = serial.serial_for_url(self.serialLocation)
                self.serialConnection.baudrate = self.baudrate
            except Exception:
                # Port not available (yet); try again on the next pass.
                pass
        # NOTE(review): the original layout of this snippet was lost; the
        # pause is assumed to belong to the loop body - confirm.
        time.sleep(0.1)
def inlineSerialPoller(self):
    """Poll the serial connection forever and show printable input in the inline console.

    While ``self.serialConnection`` is set, one ``read()`` result is taken
    per iteration, reduced to the characters found in
    ``string.printable``, and appended to the ``self.serialConsoleBody``
    text buffer through ``gobject.idle_add``. While it is not set, the
    method tries to open ``self.serialLocation`` and apply
    ``self.baudrate``.

    The loop never returns.
    NOTE(review): presumably meant to run on a worker thread, since UI
    updates are deferred via ``gobject.idle_add`` - confirm at the caller.
    """
    def append_to_console(text):
        buf = self.serialConsoleBody.get_buffer()
        buf.insert(buf.get_end_iter(), text)

    while True:
        if self.serialConnection:
            try:
                data = self.serialConnection.read()
                printable = ''.join(ch for ch in data if ch in string.printable)
                gobject.idle_add(append_to_console, printable)
            except Exception:
                # Best effort: a failed read must not end the poller, but
                # SystemExit/KeyboardInterrupt are no longer swallowed.
                pass
        else:
            try:
                self.serialConnection = serial.serial_for_url(self.serialLocation)
                self.serialConnection.baudrate = self.baudrate
            except Exception:
                # Port not available (yet); try again on the next pass.
                pass
        # NOTE(review): the original layout of this snippet was lost; the
        # pause is assumed to belong to the loop body - confirm.
        time.sleep(0.1)
def test_init_sets_the_correct_attrs(self):
    """Constructor keyword arguments appear as attributes and in get_settings()."""
    cases = (
        ('baudrate', 57600),
        ('timeout', 7),
        ('write_timeout', 12),
        ('inter_byte_timeout', 15),
        ('stopbits', serial.STOPBITS_TWO),
        ('bytesize', serial.SEVENBITS),
        ('parity', serial.PARITY_ODD),
        ('xonxoff', True),
        ('rtscts', True),
        ('dsrdtr', True),
    )
    for name, expected in cases:
        # Each case passes exactly one setting plus do_not_open.
        options = {'do_not_open': True}
        options[name] = expected
        port = serial.serial_for_url(PORT, **options)
        snapshot = port.get_settings()
        self.assertEqual(getattr(port, name), expected)
        self.assertEqual(snapshot[name], expected)
def test_framed_packet(self):
    """Packets sent through a FramedPacket protocol are received back in order."""

    class RecordingFramedPacket(serial.threaded.FramedPacket):
        """FramedPacket that records received packets and can send framed ones."""

        def __init__(self):
            super(RecordingFramedPacket, self).__init__()
            self.received_packets = []

        def handle_packet(self, packet):
            self.received_packets.append(packet)

        def send_packet(self, packet):
            # Frame the payload with the protocol's START/STOP markers.
            for chunk in (self.START, packet, self.STOP):
                self.transport.write(chunk)

    port = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
    with serial.threaded.ReaderThread(port, RecordingFramedPacket) as protocol:
        for payload in (b'1', b'2', b'3'):
            protocol.send_packet(payload)
        # Leave the reader thread time to pick the packets up.
        time.sleep(1)
        self.assertEqual(protocol.received_packets, [b'1', b'2', b'3'])
def __init__(self, port=DEFAULT_PORT, baud=ESP_ROM_BAUD):
    """Set up the shared state used to talk to an ESP bootloader.

    This is the base constructor and is not meant to be called directly:
    instantiate ESP8266ROM or ESP32ROM, or use ESPLoader.detect_chip().
    The base class carries every instance method for bootloader
    functionality that is supported across the various chips and stub
    loaders; subclasses replace the ones they do not support with
    versions that throw NotImplementedInROMError().

    Args:
        port: An existing ``serial.Serial`` instance (used as-is), or any
            other value, which is passed to ``serial.serial_for_url()``.
        baud: Baud rate handed to ``self._set_port_baudrate()``.
    """
    if not isinstance(port, serial.Serial):
        port = serial.serial_for_url(port)
    self._port = port
    self._slip_reader = slip_reader(port)
    # The baud rate is applied as a separate step on purpose: this works
    # around the CH341 driver on some Linux versions (the port opens at
    # 9600 and is then switched). It should not matter for other
    # platforms/drivers. See
    # https://github.com/espressif/esptool/issues/44#issuecomment-107094446
    self._set_port_baudrate(baud)
def __init__(self, port, baud_rate):
    """Open the serial connection for *port* and configure it.

    Args:
        port: Value passed to ``serial.serial_for_url()``.
        baud_rate: Baud rate assigned to the connection after it is
            created.
    """
    # Imported locally, as in the original, so the dependency is only
    # needed when an instance is created.
    import serial

    self.serial = serial.serial_for_url(port)
    # The original assigned the undefined name ``baud`` here; the
    # parameter is called ``baud_rate``.
    self.serial.baudrate = baud_rate
    self.serial.timeout = 1
def __init__(self, rt):
    """Initialise the faceplate component and open its serial link.

    Args:
        rt: Object forwarded to the parent initialiser together with the
            ``'.faceplate'`` suffix. The serial settings are read from
            ``self.rt.config['enclosure']`` (keys ``port``, ``rate`` and
            ``timeout``).
    """
    super().__init__(rt, '.faceplate')
    settings = self.rt.config['enclosure']
    port, rate, timeout = (settings[key] for key in ('port', 'rate', 'timeout'))
    self.serial = serial.serial_for_url(url=port, baudrate=rate, timeout=timeout)
    self.queue = Queue()
    self.timers = []
def __init__(self, port=0, baud=ESP_ROM_BAUD):
    """Open the serial port and prepare the SLIP reader.

    Args:
        port: Value passed to ``serial.serial_for_url()``.
            NOTE(review): a numeric default such as 0 is presumably only
            accepted by older pyserial releases - confirm.
        baud: Baud rate assigned to the port after it has been opened.
    """
    connection = serial.serial_for_url(port)
    self._port = connection
    self._slip_reader = slip_reader(connection)
    # The baud rate is applied as a separate step on purpose: this works
    # around the CH341 driver on some Linux versions (the port opens at
    # 9600 and is then switched). It should not matter for other
    # platforms/drivers. See
    # https://github.com/espressif/esptool/issues/44#issuecomment-107094446
    connection.baudrate = baud
def connect(self, pyelk, address, ratelimit):
    """Connect to the Elk panel and start the reader thread.

    Args:
        pyelk: Object handed to both the input protocol and the output
            handler via ``set_pyelk()``.
        address: Host to connect to, either in
            "socket://IP.Add.re.ss:Port" or "/dev/ttyUSB0" format.
        ratelimit: Rate limit for outgoing events.
    """
    link = serial.serial_for_url(address, timeout=1)
    self._connection = link

    reader = serial.threaded.ReaderThread(link, SerialInputHandler)
    self._connection_thread = reader
    reader.start()

    transport, protocol = reader.connect()
    self._connection_transport = transport
    self._connection_protocol = protocol
    protocol.set_pyelk(pyelk)

    output = SerialOutputHandler(ratelimit)
    self._connection_output = output
    output.set_pyelk(pyelk)

    _LOGGER.debug('ReaderThread created')
def setUp(self):
    """Create the serial instance for PORT with ``timeout=10``."""
    port = serial.serial_for_url(PORT, timeout=10)
    self.s = port
def __init__(self, port, baudrate, parity, rtscts, xonxoff, echo=False, convert_outgoing=CONVERT_CRLF, repr_mode=0):
    """Open the serial port and store the terminal options.

    Args:
        port: Port name or URL to open.
        baudrate: Baud rate, passed positionally to the port constructor.
        parity, rtscts, xonxoff: Forwarded to the port constructor.
        echo: Stored as ``self.echo``.
        convert_outgoing: Key into ``NEWLINE_CONVERISON_MAP`` that selects
            ``self.newline``.
        repr_mode: Stored as ``self.repr_mode``.
    """
    port_options = dict(parity=parity, rtscts=rtscts, xonxoff=xonxoff, timeout=1)
    try:
        self.serial = serial.serial_for_url(port, baudrate, **port_options)
    except AttributeError:
        # Happens when the installed pyserial is older than 2.5; fall
        # back to the Serial class directly.
        self.serial = serial.Serial(port, baudrate, **port_options)

    self.echo = echo
    self.repr_mode = repr_mode
    self.convert_outgoing = convert_outgoing
    self.newline = NEWLINE_CONVERISON_MAP[self.convert_outgoing]

    # Initial control-line states tracked by this object.
    self.dtr_state = True
    self.rts_state = True
    self.break_state = False
def __init__(self, port=0, baud=ESP_ROM_BAUD):
    """Open the serial port and prepare the SLIP reader.

    Args:
        port: Value passed to ``serial.serial_for_url()``.
            NOTE(review): a numeric default such as 0 is presumably only
            accepted by older pyserial releases - confirm.
        baud: Baud rate assigned to the port after it has been opened.
    """
    connection = serial.serial_for_url(port)
    self._port = connection
    self._slip_reader = slip_reader(connection)
    # The baud rate is applied as a separate step on purpose: this works
    # around the CH341 driver on some Linux versions (the port opens at
    # 9600 and is then switched). It should not matter for other
    # platforms/drivers. See
    # https://github.com/themadinventor/esptool/issues/44#issuecomment-107094446
    connection.baudrate = baud
def refresh(self, *args):
    """Re-scan the serial ports and repopulate the port selector.

    Extra positional arguments are accepted and ignored. The serial
    connection itself is not opened here.
    """
    self.ports = list(list_ports.grep(''))

    selector = self.gtkserialloc
    selector.get_model().clear()
    for entry in self.ports:
        # The first field of each entry is used as the label.
        selector.append_text(entry[0])

    # Select the first port when there is one, otherwise select nothing.
    if self.ports:
        selector.set_active(0)
    else:
        selector.set_active(-1)
def brtchange(self, widget, *args):
    """Apply the baud rate chosen in *widget* to this object and its connection.

    Args:
        widget: Selector exposing ``get_model()`` and ``get_active()``;
            the first column of the active row is parsed as an integer
            baud rate.
        *args: Ignored.

    When no row is active (index outside the model, e.g. -1) nothing is
    changed, which matches the guard used by ``portchange``.
    """
    model = widget.get_model()
    index = widget.get_active()
    if not 0 <= index < len(model):
        return

    new_rate = int(model[index][0])
    self.baudrate = new_rate
    if not self.serialConnection:
        self.serialConnection = serial.serial_for_url(self.serialLocation)
    # NOTE(review): the original layout of this snippet was lost; the
    # rate is assumed to be applied to existing connections too - confirm.
    self.serialConnection.baudrate = new_rate
def portchange(self, widget, *args):
    """Switch to the serial port chosen in *widget*.

    Args:
        widget: Selector exposing ``get_model()`` and ``get_active()``;
            the first column of the active row is the port name.
        *args: Ignored.

    Nothing happens when the active index lies outside the model.
    """
    rows = widget.get_model()
    active = widget.get_active()
    if not 0 <= active < len(rows):
        return

    chosen = rows[active][0]
    self.serialLocation = chosen
    if not self.serialConnection:
        self.serialConnection = serial.serial_for_url(self.serialLocation)
    connection = self.serialConnection
    connection.port = chosen
    connection.baudrate = self.baudrate
def __init__(self, port, baudrate=115200):
    """Create the serial connection for *port* and keep it as ``self.port``.

    Args:
        port: Value passed to ``serial.serial_for_url()``.
        baudrate: Passed positionally after *port*; defaults to 115200.
    """
    connection = serial.serial_for_url(port, baudrate)
    self.port = connection
def __attrs_post_init__(self):
    """Finish initialisation: set up the logger and an unopened serial object.

    The serial class is chosen from ``self.port``: a ``SerialPort``
    instance gets a plain ``serial.Serial``; otherwise the ``protocol``
    attribute selects between an RFC 2217 client ("rfc2217") and a
    socket-based port ("raw").

    Raises:
        Exception: If ``self.port.protocol`` is neither value.
    """
    super().__attrs_post_init__()
    self.logger = logging.getLogger("{}({})".format(self, self.target))

    if isinstance(self.port, SerialPort):
        self.serial = serial.Serial()
    elif self.port.protocol == "rfc2217":
        self.serial = serial.rfc2217.Serial()
    elif self.port.protocol == "raw":
        self.serial = serial.serial_for_url("socket://", do_not_open=True)
    else:
        raise Exception("SerialDriver: unknown protocol")

    self.status = 0
def create_serial_connection(loop, protocol_factory, *args, **kwargs):
    """Create a serial transport/protocol pair.

    Args:
        loop: Passed through to ``SerialTransport``.
        protocol_factory: Zero-argument callable returning the protocol.
        *args, **kwargs: Forwarded unchanged to
            ``serial.serial_for_url()``.

    Returns:
        A ``(transport, protocol)`` tuple.
    """
    port = serial.serial_for_url(*args, **kwargs)
    handler = protocol_factory()
    return SerialTransport(loop, handler, port), handler
def __init__(self, dev_name, timeout_secs, control_char_cb, logger):
    """Store the callbacks and open the serial device.

    Args:
        dev_name: String name of the device, e.g. /dev/cu.usbserial. The
            special name 'fake' skips opening a device (debugging aid).
        timeout_secs: Timeout in fractional seconds; e.g. 0.25 = 250
            milliseconds.
        control_char_cb: Stored as ``self.control_char_cb``.
        logger: Stored as ``self.logger`` and used for debug output.
    """
    self.control_char_cb = control_char_cb
    self.logger = logger
    self.logger.debug("SerialInterface Starting")

    # Ugly debugging hack: with the 'fake' device no port is opened.
    if dev_name != 'fake':
        port_options = dict(
            baudrate=CONCORD_BAUD,
            bytesize=CONCORD_BYTESIZE,
            parity=CONCORD_PARITY,
            stopbits=CONCORD_STOPBITS,
            timeout=timeout_secs,
            xonxoff=False,
            rtscts=False,
            dsrdtr=False,
        )
        self.serdev = serial.serial_for_url(dev_name, **port_options)
def setUp(self):
    """Create a serial instance for PORT without opening it."""
    closed_port = serial.serial_for_url(PORT, do_not_open=True)
    self.s = closed_port
def test_RtsCtsSetting(self):
    """Setting ``rtscts`` is visible via the property and the ``_rtscts`` field."""
    port = self.s
    for flag in (True, False):
        port.rtscts = flag
        # Check the public getter first, then the internal attribute.
        self.assertEqual(port.rtscts, flag)
        self.assertEqual(port._rtscts, flag)
    # There are no illegal values to test here: the normal rules for the
    # boolean value of an object apply, so every object has a truth value.
    # this test does not work anymore since serial_for_url that is used
    # now, already sets a port
def setUp(self):
    """Create the serial instance for PORT and a text wrapper around it."""
    self.s = serial.serial_for_url(PORT, timeout=1)
    # The port serves as both the reader and the writer of the pair.
    duplex = io.BufferedRWPair(self.s, self.s)
    self.io = io.TextIOWrapper(duplex)
def test_getsettings(self):
    """The dict from get_settings() matches the instance attributes."""
    port = serial.serial_for_url(PORT, do_not_open=True)
    snapshot = port.get_settings()
    for name in SETTINGS:
        self.assertEqual(getattr(port, name), snapshot[name])
def test_partial_settings(self):
    """apply_settings() accepts a dictionary with some keys missing."""
    port = serial.serial_for_url(PORT, do_not_open=True)
    partial = port.get_settings()
    for dropped in ('baudrate', 'bytesize'):
        del partial[dropped]
    port.apply_settings(partial)
    for name in partial:
        self.assertEqual(getattr(port, name), partial[name])
def setUp(self):
    """Create the serial instance for PORT and require ``cancel_read`` support."""
    # Note: no do_not_open flag is passed here.
    port = serial.serial_for_url(PORT)
    self.s = port
    self.assertTrue(hasattr(port, 'cancel_read'), "serial instance has no cancel_read")
    port.timeout = 10
    self.cancel_called = 0
def setUp(self):
    """Create a slow serial instance for PORT and require ``cancel_write`` support."""
    # Note: no do_not_open flag is passed here.
    # Extra slow on purpose: ~30B/s => 1kb ~ 34s.
    port = serial.serial_for_url(PORT, baudrate=300)
    self.s = port
    self.assertTrue(hasattr(port, 'cancel_write'), "serial instance has no cancel_write")
    port.write_timeout = 10
    self.cancel_called = 0
def test_failed_connection(self):
    """Failed RFC 2217 opens raise SerialException and leave the port closed."""
    failing_urls = (
        'rfc2217://127.99.99.99:2217',  # connection to closed port
        'rfc2217://127goingtofail',     # invalid address
    )
    for url in failing_urls:
        s = serial.serial_for_url(url, do_not_open=True)
        with self.assertRaises(serial.SerialException):
            s.open()
        self.assertFalse(s.is_open)
        s.close()  # no errors expected

    # close w/o open is also OK
    s = serial.serial_for_url('rfc2217://irrelevant', do_not_open=True)
    self.assertFalse(s.is_open)
    s.close()  # no errors expected
def setUp(self):
    """Create the serial instance for PORT at BAUDRATE with ``timeout=10``."""
    port = serial.serial_for_url(PORT, BAUDRATE, timeout=10)
    self.s = port
def test_loop(self):
    """The loop:// interface can be instantiated without being opened."""
    url = 'loop://'
    serial.serial_for_url(url, do_not_open=True)
def test_bad_url(self):
    """An unknown protocol in the URL raises ValueError."""
    with self.assertRaises(ValueError):
        serial.serial_for_url("imnotknown://")
def test_custom_url(self):
    """Custom protocol handlers are found only while their package is registered."""
    # Without the extra search path the protocol is unknown.
    self.assertRaises(ValueError, serial.serial_for_url, "test://")
    # Add the search path; now the URL should resolve.
    serial.protocol_handler_packages.append('handlers')
    try:
        serial.serial_for_url("test://")
    finally:
        # Always unregister the handler package, even when the call above
        # fails, so the module-level search path does not leak into other
        # tests.
        serial.protocol_handler_packages.remove('handlers')
    # With the handler removed the protocol is unknown again.
    self.assertRaises(ValueError, serial.serial_for_url, "test://")
def setUp(self):
    """Create the serial instance for PORT with ``timeout=1``."""
    port = serial.serial_for_url(PORT, timeout=1)
    self.s = port
def setUp(self):
    """Create the serial instance for PORT using this test case's ``timeout``."""
    configured_timeout = self.timeout
    self.s = serial.serial_for_url(PORT, timeout=configured_timeout)
def setUp(self):
    """Create the serial instance for PORT (``timeout=None``) and its SendEvent."""
    port = serial.serial_for_url(PORT, timeout=None)
    self.s = port
    self.event = SendEvent(self.s)
def setUp(self):
    """Create the serial instance for PORT with ``timeout=None``."""
    port = serial.serial_for_url(PORT, timeout=None)
    self.s = port
def setUp(self):
    """Create the serial instance for PORT with default settings."""
    port = serial.serial_for_url(PORT)
    self.s = port
def test_RtsCtsSetting(self):
    """Setting ``rtscts`` is visible via the property and the ``_rtscts`` field."""
    for rtscts in (True, False):
        self.s.rtscts = rtscts
        # test get method
        # (assertEqual replaces the deprecated failUnlessEqual alias,
        # which newer unittest versions no longer provide)
        self.assertEqual(self.s.rtscts, rtscts)
        # test internals
        self.assertEqual(self.s._rtscts, rtscts)
    # no illegal values here, normal rules for the boolean value of an
    # object are used thus all objects have a truth value.
    # this test does not work anymore since serial_for_url that is used
    # now, already sets a port