我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用serial.PARITY_NONE。
def __init__(self,N): self.time = 0 self.samplesPerUSec = 0 self.data = [] self.corr = np.zeros((4,4)) self.shifts = np.zeros((4,4), dtype=np.int) self.tdoa = np.zeros((4,4)) self.event_index = -1 spacing = 0.217 self.speed_of_sound = 343.0 self.ser = serial.Serial( port='/dev/ttyACM0',\ baudrate=9600,\ parity=serial.PARITY_NONE,\ stopbits=serial.STOPBITS_ONE,\ bytesize=serial.EIGHTBITS,\ timeout=None) self.ser.flushInput() self.ser.flushOutput() self.locator = LocationCalculator() self.locator.simulate() time.sleep(5)
def __init__(self, port='/dev/ttyACM0', baudrate=9600,timeout=10): """Initialize internal variables and serial connection :param port: The COM port to open. See the documentation for `pyserial <http://pyserial.sourceforge.net/>`_ for an explanation of the possible value. The default value is '/dev/ttyACM0'. :type port: str or int :param baudrate: 9600, 19200, 38400 where 9600 is the default :type baudrate: int """ # The serial connection should be setup with the following parameters: # 1 start bit, 8 data bits, No parity bit, 1 stop bit, no hardware # handshake. These are all default for Serial and therefore not input # below #self.serial = serial.Serial(port=port, baudrate=baudrate, timeout=1) self.serial = serial.Serial(port = port, baudrate=baudrate, timeout=timeout, writeTimeout=timeout,stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE)
def __init__(self, port='/dev/ttyACM0', baudrate=9600,timeout=10): """Initialize internal variables and serial connection :param port: The COM port to open. See the documentation for `pyserial <http://pyserial.sourceforge.net/>`_ for an explanation of the possible value. The default value is '/dev/ttyUSB0'. :type port: str or int :param baudrate: 9600, 19200, 38400 where 9600 is the default :type baudrate: int """ # The serial connection should be setup with the following parameters: # 1 start bit, 8 data bits, No parity bit, 1 stop bit, no hardware # handshake. These are all default for Serial and therefore not input # below #self.serial = serial.Serial(port=port, baudrate=baudrate, timeout=1) self.serial = serial.Serial(port = port, baudrate=baudrate, timeout=timeout, writeTimeout=timeout,stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE) self.string = DataHandler()
def setup_serial(ser, port, lock): """Setup all parameters for the serial communication""" try: with lock: ser.baudrate = 4800 ser.port = port ser.bytesize = serial.EIGHTBITS ser.stopbits = serial.STOPBITS_ONE ser.parity = serial.PARITY_NONE ser.timeout = 0.1 # Read timeout in seconds ser.write_timeout = 0.1 # Write timeout in seconds except Exception as e: helper.show_error("Problem initializing serial port " + port + ".\n\n" "Exception:\n" + str(e) + "\n\n" "The application will now exit.") sys.exit(0)
def __init__(self, protocol, deviceNameOrPortNumber, reactor, baudrate = 9600, bytesize = EIGHTBITS, parity = PARITY_NONE, stopbits = STOPBITS_ONE, timeout = 3, xonxoff = 0, rtscts = 0): # do NOT use timeout = 0 !! self._serial = serial.Serial(deviceNameOrPortNumber, baudrate = baudrate, bytesize = bytesize, parity = parity, stopbits = stopbits, timeout = timeout, xonxoff = xonxoff, rtscts = rtscts) javareactor.JConnection.__init__(self, self._serial.sPort, protocol, None) self.flushInput() self.flushOutput() self.reactor = reactor self.protocol = protocol self.protocol.makeConnection(self) wb = javareactor.WriteBlocker(self, reactor.q) wb.start() self.writeBlocker = wb javareactor.ReadBlocker(self, reactor.q).start()
def __init__(self, clientID, bootstrapURI, port, serialPort, debug=False, verbose=False): super(ConstrainedClientSerial, self).__init__(clientID, bootstrapURI, port, debug, verbose) print("Creating serial") serialConnection = serial.Serial( port=serialPort, baudrate=57600, # 115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, # STOPBITS_TWO bytesize=serial.EIGHTBITS ) #self._clientSession = fdpexpect.fdspawn(self._serial, logfile=open("constrained_client_serial.log", "w")) self._clientSession = SerialSession(serialConnection) self._sendline("") # FIXME - This should be a reboot? self.expect("Contiki>") self.init()
def __init__(self, port, key=None, shift_baud_rate=False): """Initialization function. Follows RAII, so creating the object opens the port.""" super(ET312SerialSync, self).__init__(key) # Allow derived classes to set up a port to mock serial ports for # tests. There are cleaner ways to mock this, but this will do for now. if not hasattr(self, "port"): # Check argument validity import serial if not port or type(port) is not str: raise ButtshockIOError("Serial port name is missing or is not string!") self.port = serial.Serial(port, 19200, timeout=1, parity=serial.PARITY_NONE, bytesize=8, stopbits=1, xonxoff=0, rtscts=0) self.needs_bytes = False # Test for python 3 if not isinstance(bytes([0]), str): self.needs_bytes = True self.shift_baud_rate = shift_baud_rate
def connect(port, baudrate, application, loop=None): if loop is None: loop = asyncio.get_event_loop() connection_future = asyncio.Future() protocol = Gateway(application, connection_future) transport, protocol = yield from serial_asyncio.create_serial_connection( loop, lambda: protocol, url=port, baudrate=baudrate, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, xonxoff=True, ) yield from connection_future return protocol
def spOpen(serialName): # 115200/8/N/1 try: if serialName: serialFd = serial.Serial(serialName, timeout = 3) serialFd.baudrate = 115200 #serialFd.baudrate = 9600 serialFd.bytesize = 8 serialFd.parity = serial.PARITY_NONE; serialFd.stopbits = 1 serialFd.timeout = 3 serialFd.writeTimeout = 1 if serialFd and serialFd.readable(): return serialFd else: return None except serial.SerialException: return None
def get_dsmr_connection_parameters(): """ Returns the communication settings required for the DSMR version set. """ DSMR_VERSION_MAPPING = { DataloggerSettings.DSMR_VERSION_2: { 'baudrate': 9600, 'bytesize': serial.SEVENBITS, 'parity': serial.PARITY_EVEN, 'crc': False, }, DataloggerSettings.DSMR_VERSION_4_PLUS: { 'baudrate': 115200, 'bytesize': serial.EIGHTBITS, 'parity': serial.PARITY_NONE, 'crc': True, }, } datalogger_settings = DataloggerSettings.get_solo() connection_parameters = DSMR_VERSION_MAPPING[datalogger_settings.dsmr_version] connection_parameters['com_port'] = datalogger_settings.com_port return connection_parameters
def __init__(self, strip_df_start=True, com_port=None, baud=38400, parity=serial.PARITY_NONE, stop_bits=serial.STOPBITS_ONE, byte_size=serial.EIGHTBITS): super(KissSerial, self).__init__(strip_df_start) self.com_port = com_port self.baud = baud self.parity = parity self.stop_bits = stop_bits self.byte_size = byte_size self.serial = None self.logger.info('Using interface_mode=Serial')
def __init__(self, port, baudrate, timeout, fs=False): """ ????? ??????????? ???????? ?????????????? ? ???????????. :type port: str :param port: ???? ?????????????? ? ??????????? :type baudrate: int :param baudrate: ???????? ?????????????? ? ??????????? :type timeout: float :param timeout: ????? ???????? ?????? ?????????? :type fs: bool :param fs: ??????? ??????? ?? (?????????? ??????????) """ self.port = port self.serial = serial.Serial( baudrate=baudrate, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=timeout, writeTimeout=timeout ) self.fs = fs self.connected = False
def main(): opts = dict( baudrate=115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, rtscts=False, dsrdtr=False, xonxoff=False ) with serial.Serial('/dev/ttyUSB0', **opts) as port: while True: time.sleep(0.2) port.write(b'\x1b') res = port.read(6) led, segs = extract(res) print(json.dumps({ 'led': led, 'segs': segs })) sys.stdout.flush()
def __init__(self, com_port): ################################################ # Communications ################################################ self.is_open = False self.timeout = 30000 self.ser = serial.Serial( port=com_port, baudrate=115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=self.timeout/1000 ) ############################################### # Printer parameters ############################################### self.x = None # position of x axis self.y = None # position of y axis self.z = None # position of z axis self.e = None # position of extruder self.feedrate = 10 # feedrate (mm/s)
def connect(self): try: if not self.isnet: #print "connect to %s %d" % (self.port, self.speed) self.conn = serial.Serial(self.port, self.speed,\ parity=serial.PARITY_NONE,\ stopbits=serial.STOPBITS_ONE,\ timeout=0.7,\ writeTimeout=0.7) """ self._exitmode() self.prn_devstatus() """ else: self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.conn.connect((self.ip, self.port)) self.MAX_WIDTH=self.DEVICE['dev_wchar'] self.MAX_PIXELS_WIDTH=self.DEVICE['dev_wpix'] self.debug("connected") return True except: return False
def connect(self): try: self.ser = serial.Serial(self.port, int(self.speed),\ parity=serial.PARITY_NONE,\ stopbits=serial.STOPBITS_ONE,\ timeout=0.7,\ writeTimeout=0.7) except: return False try: self.kkm=kkmdrv.KKM(self.ser,password=self.password,wide=self.MAX_WIDTH) err=0 self._status_ready() #print "connect frk" except: err=const_error self._status_error() #print "not connect frk" return True
def __init__(self, port="/dev/ttyUSB0", baudrate = 57600, timeout=None): #initialization and open the port #possible timeout values: # 1. None: wait forever, block call # 2. 0: non-blocking mode, return immediately # 3. x, x is bigger than 0, float allowed, timeout block call self.ser = serial.Serial() self.ser.port = port self.ser.baudrate = baudrate self.ser.timeout = timeout self.ser.bytesize = serial.EIGHTBITS #number of bits per bytes self.ser.parity = serial.PARITY_NONE #set parity check: no parity self.ser.stopbits = serial.STOPBITS_ONE #number of stop bits self.ser.xonxoff = False #disable software flow control self.ser.rtscts = False #disable hardware (RTS/CTS) flow control self.ser.dsrdtr = False #disable hardware (DSR/DTR) flow control self.ser.writeTimeout = 2 #timeout for write self.ser.open()
def __init__(self, port, baudrate=115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=0.5): self.port = port self.baudrate = baudrate self.parity = parity self.stopbits = stopbits self.bytesize = bytesize self.timeout = timeout self.ser = serial.Serial( port=self.port, baudrate=self.baudrate, parity=self.parity, stopbits=self.stopbits, bytesize=self.bytesize, timeout=self.timeout ) atexit.register(self.close) for msg in """24 45 49 47 50 51 2c 44 54 4d 2a 33 42 0d 0a b5 62 06 01 03 00 f0 0a 00 04 23 24 45 49 47 50 51 2c 47 42 53 2a 33 30 0d 0a b5 62 06 01 03 00 f0 09 00 03 21 24 45 49 47 50 51 2c 47 4c 4c 2a 32 31 0d 0a b5 62 06 01 03 00 f0 01 00 fb 11 24 45 49 47 50 51 2c 47 52 53 2a 32 30 0d 0a b5 62 06 01 03 00 f0 06 00 00 1b 24 45 49 47 50 51 2c 47 53 41 2a 33 33 0d 0a b5 62 06 01 03 00 f0 02 00 fc 13 24 45 49 47 50 51 2c 47 53 54 2a 32 36 0d 0a b5 62 06 01 03 00 f0 07 00 01 1d 24 45 49 47 50 51 2c 47 53 56 2a 32 34 0d 0a b5 62 06 01 03 00 f0 03 00 fd 15 24 45 49 47 50 51 2c 56 54 47 2a 32 33 0d 0a b5 62 06 01 03 00 f0 05 00 ff 19 24 45 49 47 50 51 2c 5a 44 41 2a 33 39 0d 0a b5 62 06 01 03 00 f0 08 00 02 1f 24 45 49 47 50 51 2c 47 47 41 2a 32 37 0d 0a b5 62 06 01 03 00 f0 00 01 fb 10 24 45 49 47 50 51 2c 52 4d 43 2a 33 41 0d 0a b5 62 06 01 03 00 f0 04 01 ff 18 B5 62 06 08 06 00 64 00 01 00 01 00 7A 12 B5 62 06 08 00 00 0E 30 """.splitlines(): self.ser.write(bytes(map(lambda x: int(x, 16), msg.strip().split(' ')))) time.sleep(0.1) pass
def __init__(self, port, baud_rate, byte_size=serial.EIGHTBITS, parity=serial.PARITY_NONE, stop_bit=serial.STOPBITS_ONE): # initialize the connection and if anything ok open the port self.__main_conn = serial.Serial(port, baud_rate, bytesize=byte_size, parity=parity, stopbits=stop_bit) # Queue self.dataQueue = queue.Queue() self.checkStatusQueue = queue.Queue() # Define Variables self.out = '' self.__flag_out = True
def __init__(self, port=None): """Opens a connection to a Removinator controller on the specified serial port. If a port is not specified, an attempt will be made to auto-discover the Removinator controller. :param port: port that the Removinator is connected to :type port: str :raises: :exc:`ConnectError` :ivar last_result: The result output from the last executed command :ivar last_response: The response output from the last executed command :ivar port: The port that the Removinator is connected to """ self.port = port self.last_result = '' self.last_response = '' # Attempt to discover the Removinator controller if a # port was not specified. if port is None: port = _discover_removinator() # Open a connection to the Removinator controller. try: self.connection = serial.Serial( port, 9600, serial.EIGHTBITS, serial.PARITY_NONE, serial.STOPBITS_ONE, 1) self.sio = io.TextIOWrapper(io.BufferedRWPair(self.connection, self.connection)) except serial.SerialException as e: raise ConnectError('Unable to open connection to Removinator ' 'controller on port {0}'.format(port))
def __init__(self, protocol, deviceNameOrPortNumber, reactor, baudrate = 9600, bytesize = EIGHTBITS, parity = PARITY_NONE, stopbits = STOPBITS_ONE, xonxoff = 0, rtscts = 0): self._serial = serial.Serial(deviceNameOrPortNumber, baudrate=baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits, timeout=None, xonxoff=xonxoff, rtscts=rtscts) self.flushInput() self.flushOutput() self.reactor = reactor self.protocol = protocol self.outQueue = [] self.closed = 0 self.closedNotifies = 0 self.writeInProgress = 0 self.protocol = protocol self._overlappedRead = win32file.OVERLAPPED() self._overlappedRead.hEvent = win32event.CreateEvent(None, 1, 0, None) self._overlappedWrite = win32file.OVERLAPPED() self._overlappedWrite.hEvent = win32event.CreateEvent(None, 0, 0, None) self.reactor.addEvent(self._overlappedRead.hEvent, self, 'serialReadEvent') self.reactor.addEvent(self._overlappedWrite.hEvent, self, 'serialWriteEvent') self.protocol.makeConnection(self) flags, comstat = win32file.ClearCommError(self._serial.hComPort) rc, self.read_buf = win32file.ReadFile(self._serial.hComPort, win32file.AllocateReadBuffer(1), self._overlappedRead)
def __init__(self, protocol, deviceNameOrPortNumber, reactor, baudrate = 9600, bytesize = EIGHTBITS, parity = PARITY_NONE, stopbits = STOPBITS_ONE, timeout = 0, xonxoff = 0, rtscts = 0): abstract.FileDescriptor.__init__(self, reactor) self._serial = serial.Serial(deviceNameOrPortNumber, baudrate = baudrate, bytesize = bytesize, parity = parity, stopbits = stopbits, timeout = timeout, xonxoff = xonxoff, rtscts = rtscts) self.reactor = reactor self.flushInput() self.flushOutput() self.protocol = protocol self.protocol.makeConnection(self) self.startReading()
def __init__(self): self.ser = serial.Serial( port='/dev/ttyAMA0', # number of device, numbering starts at baudrate=9600, # baud rate bytesize=serial.EIGHTBITS, # number of databits parity=serial.PARITY_NONE, # enable parity checking stopbits=serial.STOPBITS_ONE, # number of stopbits timeout=2, # set a timeout value, None for waiting forever xonxoff=0, # enable software flow control rtscts=0, # enable RTS/CTS flow control )
def __init__(self, path='/dev/ttyACM0', baudrate=9600, bytesize=EIGHTBITS, parity=PARITY_NONE, stopbits=STOPBITS_ONE, timeout=1, open_on_create=True, debugging=False): """ Create Telium device instance :param str path: str Path to serial emulated device :param int baudrate: Set baud rate :param int timeout: Maximum delai before hanging out. :param bool open_on_create: Define if device has to be opened on instance creation :param bool debugging: Enable print device <-> host com trace. (stdout) """ self._path = path self._baud = baudrate self._debugging = debugging self._device_timeout = timeout self._device = None self._device = Serial( self._path if open_on_create else None, baudrate=self._baud, bytesize=bytesize, parity=parity, stopbits=stopbits, timeout=timeout ) if not open_on_create: self._device.setPort(self._path)
def open_port(self, port, listening_serial_thread): if not self.fake: self.ser = serial.Serial(port, baudrate=115200, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_TWO, #rtscts=True, timeout=1000) if listening_serial_thread: SerialThread.SerialThread(self.ser).start()
def __init__(self, data_q, error_q, port, port_baud, port_stopbits=serial.STOPBITS_ONE, port_parity=serial.PARITY_NONE, port_timeout = 0.01, listener=False, virtual=False, infer_limit=15, encoding="UTF-8", delimiter=r"\s"): threading.Thread.__init__(self) self.port = port self.serial_port = None self.serial_arg = dict(port = port, baudrate = int(port_baud), stopbits = float(port_stopbits), parity = port_parity, timeout = float(port_timeout)) if virtual: msg.std("Running in virtual serial port mode.", 2) self.serial_arg["dsrdtr"] = True self.serial_arg["rtscts"] = True self.data_q = data_q self.error_q = error_q self.listener = listener self.encoding = encoding import re self.delimiter = delimiter global rxdelims rxdelims[port] = re.compile(delimiter) self.sensors = {} self._manual_sensors = False """bool: when True, the sensors list was constructed manually using a configuration file; otherwise, it was inferred from the first few lines of data from the serial port. """ if infer_limit is not None: self.inferrer = FormatInferrer(infer_limit) else: # pragma: no cover self.inferrer = None self.alive = threading.Event() self.alive.set()
def tryReconnect(self): self.ser=None try: helper.internalLogger.info("Try to reconnect btmac: {0} to port: {1}".format(self.mac,self.port)) aux=subprocess.check_output([EMEM_DEPLOY_DIR+'/scsem/scripts/bindBTmac.sh',self.mac,self.port,'verbose']) helper.internalLogger.debug("Bind script output:" + aux) except subprocess.CalledProcessError as e: helper.internalLogger.debug("Bind script return error {0} {1}.".format(e.returncode, e.message)) return False except KeyboardInterrupt: print("Ok ok, quitting") sys.exit(1) except Exception as e: e = sys.exc_info()[0] helper.internalLogger.error('Unexpected error binding to btmac. It will be retried later.') helper.einternalLogger.exception(e) return False helper.internalLogger.info("Device visible {0} now trying setup serial interface towards it...".format(self.mac)) try: helper.internalLogger.info("Try open serial port:" + self.port) self.ser = serial.Serial( port=self.port,\ baudrate=self.speed,\ parity=serial.PARITY_NONE,\ stopbits=serial.STOPBITS_ONE,\ bytesize=serial.EIGHTBITS,\ timeout=self.timeout) except KeyboardInterrupt: print("Ok ok, quitting") sys.exit(1) except Exception as e: e = sys.exc_info()[0] helper.internalLogger.error('Unexpected error accesing to serial port. It will be retried later.') helper.einternalLogger.exception(e) self.ser=None return False return True
def serial_proc(): global acc_x global acc_y global acc_z if os.name == "nt": PORT = "COM7" else: PORT = "/dev/ttyACM0" # BAUD = 115200 s = serial.Serial(PORT) s.baudrate = BAUD s.parity = serial.PARITY_NONE s.databits = serial.EIGHTBITS s.stopbits = serial.STOPBITS_ONE while True: data = s.readline().decode('UTF-8') data_list = data.rstrip().split(' ') try: x, y, z, a, b = data_list acc_x = x acc_y = y acc_z = z print(x, y, z) except: pass s.close()
def __init__(self): self.ser = None try: self.ser = serial.Serial(self.DEVICE, self.BAUD_RATE, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=.01) except serial.serialutil.SerialException, err: raise CannotLoadSensor("Cannot load sensor: %s" % str(err)) except Exception as err: raise CannotLoadSensor("Cannot load sensor: %s" % str(err))
def setupSerial(devName, baudrate): serialDev = serial.Serial(port=devName, baudrate=baudrate, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE) if serialDev is None: raise RuntimeError('[%s]: Serial port %s not found!\n' % (rospy.get_name(), devDame)) serialDev.write_timeout = .05 serialDev.timeout= 0.05 serialDev.flushOutput() serialDev.flushInput() return serialDev
def test(): PORT = '/dev/tty.usbmodem1412' BAUD = 115200 def get_serial(): s = serial.Serial(PORT) s.baudrate = BAUD s.parity = serial.PARITY_NONE s.databits = serial.EIGHTBITS s.stopbits = serial.STOPBITS_ONE s.timeout = 0 # non blocking mode s.close() s.port = PORT s.open() return s s = get_serial() repl = REPL(s) repl.to_raw() repl.send_command("print('hello')") print(repl.wait_response()) repl.send_command("a=1") repl.wait_response() repl.send_command("print(a)") print(repl.wait_response()) # FINISHED s.close()
def get_serial(): s = serial.Serial(PORT) s.baudrate = BAUD s.parity = serial.PARITY_NONE s.databits = serial.EIGHTBITS s.stopbits = serial.STOPBITS_ONE s.timeout = 0 # non blocking mode s.close() s.port = PORT s.open() return s
def __init__(self): existing_ports = SerialInterface.serial_ports() if len(existing_ports) > 1: print('WARN! more than one serial port is active on the computer. Blindly using first one...') if len(existing_ports) is 0: print('ERR! no serial ports found on the computer! Exiting...') exit(-1) print('using port: ' + existing_ports[0]) port_name = existing_ports[0] self.ser = serial.Serial(port=port_name, baudrate=9600, bytesize=BYTE_SIZE, parity=serial.PARITY_NONE, stopbits=STOP_BITS, write_timeout=10, timeout=10) self.sampler = Sampler()
def __init__(self, serial_name): self.port = serial_name self.baudrate = 9600 # Default baud rate self.timeout = 1 # Default timeout, seconds self.parity = serial.PARITY_NONE # Default parity self.stopbits = serial.STOPBITS_ONE # Default stop bits self.bytesize = serial.EIGHTBITS self.ser = serial.Serial(self.port, self.baudrate, self.bytesize, self.parity, self.stopbits, timeout=self.timeout) # serial port self.max_voltage = 80.0 # Volts self.max_current = 10.0 # Amps self.max_power = 320.0 # Watts
def __init__(self, serial_name): """ Initialize the power supply. Each children an here define non-default values for his specific case Input : serial_name, String, is the serial port name (e.g. COM2) """ self.name = "Generic Power Supply" self.port = serial_name self.baudrate = 9600 # Default baud rate """ Possible baudrate values : 50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, 9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, 576000, 921600, 1000000, 1152000, 1500000, 2000000, 2500000, 3000000, 3500000, 4000000 """ self.timeout = 1 # Default timeout, seconds self.parity = serial.PARITY_NONE # Default parity """ Possible parities : PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE """ self.stopbits = serial.STOPBITS_ONE # Default stop bits """ Possible stopbits : STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO """ self.bytesize = serial.EIGHTBITS """ Possible bytesizes : FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS """ self.ser = serial.Serial(self.port, self.baudrate, self.bytesize, self.parity, self.stopbits, timeout=self.timeout) # serial port self.max_voltage = 0.0 # Volts self.max_current = 0.0 # Amps self.max_power = 0.0 # Watts
def connect(self): """ Open tty connection to sensor """ if self.link is not None: self.disconnect() self.link = serial.Serial(self.port, 9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, dsrdtr=True, timeout=5, interCharTimeout=0.1)
def GoodFETSerialPort(**kwargs): "Return a Serial port using default values possibly overriden by caller" import serial port = os.environ.get('GOODFET') or "/dev/ttyUSB0" args = dict(port=port, baudrate=115200, parity=serial.PARITY_NONE, timeout=2) args.update(kwargs) return serial.Serial(**args)
def main(args): global sock global ser sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) sock.bind(("0.0.0.0",UDPport)) ser = serial.Serial(serport,baudrate,timeout=0,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,bytesize=8,rtscts=False,xonxoff=False) data = sock.recv(10000) print "start loop, listen-port %u" % UDPport print "sending to %s" % serport try: while(data): ## #print "UDP data %s" % repr(data)[:55] proc_input(data) data = sock.recv(10000) except socket.timeout: sock.close() return 0 # Eine Zeile Input verarbeiten: Auf 5-bit kuerzen, RGB Komponenten umordnen, alles an tty senden.
def __init__(self, device='/dev/ttyAMA0', baud=115200, parity_=serial.PARITY_NONE, stop_bits=serial.STOPBITS_ONE, byte_size=serial.EIGHTBITS, time_out=0): # ACK header for data recive self.Header = bytearray([0x00, 0x00, 0xff, 0x00, 0xff, 0x00, 0x00, 0x00, 0xff]) # time out. if time_out != 0: self.connectUART = serial.Serial(port=device, baudrate=baud, stopbits=stop_bits, bytesize=byte_size, timeout=time_out) else: self.connectUART = serial.Serial(port=device, baudrate=baud, stopbits=stop_bits, bytesize=byte_size) # manually close connection
def __init__(self, device='/dev/ttyAMA0', baud=115200, parity_=serial.PARITY_NONE, stop_bits=serial.STOPBITS_ONE, byte_size=serial.EIGHTBITS, time_out=0): super(PN532, self).__init__(device, baud, parity_, stop_bits, byte_size, time_out)
def __init__(self, port='/dev/serial0'): self.ser = serial.Serial() self.ser.port = port self.ser.baudrate = 115200 self.ser.bytesize = serial.EIGHTBITS self.ser.parity = serial.PARITY_NONE self.ser.stopbits = serial.STOPBITS_ONE self.open_serial() self.rc_data = [0] * N_CHAN
def __init__(self, port="/dev/ttyACM0", bitrate=115200, rx_protocol=msp.RX_MSP, firmware=msp.FIRMWARE_BF): assert rx_protocol in msp.RX_OPTIONS, ( "unsupported rx protocol indicated") self.rx_protocol = rx_protocol self.rx_protocol_ch_count = len( MSP_PAYLOAD_FMT[msp.MSP_RC][rx_protocol][1:]) assert firmware in msp.FIRMWARE_OPTIONS, ( "unsupported firmware indicated") self.firmware = firmware self.firmware_motor_count = len( MSP_PAYLOAD_FMT[msp.MSP_MOTOR][firmware][1:]) self.ser = serial.Serial() self.ser.port = port self.ser.baudrate = bitrate self.ser.bytesize = serial.EIGHTBITS self.ser.parity = serial.PARITY_NONE self.ser.stopbits = serial.STOPBITS_ONE self.ser.timeout = None self.ser.xonxoff = False self.ser.rtscts = False self.ser.dsrdtr = False self.ser.writeTimeout = 2 self.open_serial()
def MakeSerialDevice(port="/dev/ttyUSB0"): dev = serial.Serial( port=port, baudrate=115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, # blocking timeout=5) # dev.open() return dev
def __init__(self, serialdevice, configuration, power_control=DTR_ON, scan_interval=0): """Initialize the data collector based on the given parameters.""" self.record_length = configuration[RECORD_LENGTH] self.start_sequence = configuration[STARTBLOCK] self.byte_order = configuration[BYTE_ORDER] self.multiplier = configuration[MULTIPLIER] self.timeout = configuration[TIMEOUT] self.scan_interval = scan_interval self.listeners = [] self.power_control = power_control self.sensordata = {} self.config = configuration self.data = None self.last_poll = None self.start_func = None self.stop_func = None self.ser = serial.Serial(port=serialdevice, baudrate=configuration[BAUD_RATE], parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS) # Update date in using a background thread if self.scan_interval > 0: thread = threading.Thread(target=self.refresh, args=()) thread.daemon = True thread.start()
def read_mh_z19_with_temperature(serial_device): """ Read the CO2 PPM concenration and temperature from a MH-Z19 sensor""" logger = logging.getLogger(__name__) ser = serial.Serial(port=serial_device, baudrate=9600, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS) sbuf = bytearray() starttime = time.time() finished = False timeout = 2 res = None ser.write(MZH19_READ) while not finished: mytime = time.time() if mytime - starttime > timeout: logger.error("read timeout after %s seconds, read %s bytes", timeout, len(sbuf)) return None if ser.inWaiting() > 0: sbuf += ser.read(1) if len(sbuf) == MHZ19_SIZE: # TODO: check checksum res = (sbuf[2]*256 + sbuf[3], sbuf[4] - 40) logger.debug("Finished reading data %s", sbuf) finished = True else: time.sleep(.1) logger.debug("Serial waiting for data, buffer length=%s", len(sbuf)) return res