我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用serial.EIGHTBITS。
def __init__(self,N): self.time = 0 self.samplesPerUSec = 0 self.data = [] self.corr = np.zeros((4,4)) self.shifts = np.zeros((4,4), dtype=np.int) self.tdoa = np.zeros((4,4)) self.event_index = -1 spacing = 0.217 self.speed_of_sound = 343.0 self.ser = serial.Serial( port='/dev/ttyACM0',\ baudrate=9600,\ parity=serial.PARITY_NONE,\ stopbits=serial.STOPBITS_ONE,\ bytesize=serial.EIGHTBITS,\ timeout=None) self.ser.flushInput() self.ser.flushOutput() self.locator = LocationCalculator() self.locator.simulate() time.sleep(5)
def __init__(self, port='/dev/ttyACM0', baudrate=9600,timeout=10): """Initialize internal variables and serial connection :param port: The COM port to open. See the documentation for `pyserial <http://pyserial.sourceforge.net/>`_ for an explanation of the possible value. The default value is '/dev/ttyACM0'. :type port: str or int :param baudrate: 9600, 19200, 38400 where 9600 is the default :type baudrate: int """ # The serial connection should be setup with the following parameters: # 1 start bit, 8 data bits, No parity bit, 1 stop bit, no hardware # handshake. These are all default for Serial and therefore not input # below #self.serial = serial.Serial(port=port, baudrate=baudrate, timeout=1) self.serial = serial.Serial(port = port, baudrate=baudrate, timeout=timeout, writeTimeout=timeout,stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE)
def __init__(self, port='/dev/ttyACM0', baudrate=9600,timeout=10): """Initialize internal variables and serial connection :param port: The COM port to open. See the documentation for `pyserial <http://pyserial.sourceforge.net/>`_ for an explanation of the possible value. The default value is '/dev/ttyUSB0'. :type port: str or int :param baudrate: 9600, 19200, 38400 where 9600 is the default :type baudrate: int """ # The serial connection should be setup with the following parameters: # 1 start bit, 8 data bits, No parity bit, 1 stop bit, no hardware # handshake. These are all default for Serial and therefore not input # below #self.serial = serial.Serial(port=port, baudrate=baudrate, timeout=1) self.serial = serial.Serial(port = port, baudrate=baudrate, timeout=timeout, writeTimeout=timeout,stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE) self.string = DataHandler()
def setup_serial(ser, port, lock): """Setup all parameters for the serial communication""" try: with lock: ser.baudrate = 4800 ser.port = port ser.bytesize = serial.EIGHTBITS ser.stopbits = serial.STOPBITS_ONE ser.parity = serial.PARITY_NONE ser.timeout = 0.1 # Read timeout in seconds ser.write_timeout = 0.1 # Write timeout in seconds except Exception as e: helper.show_error("Problem initializing serial port " + port + ".\n\n" "Exception:\n" + str(e) + "\n\n" "The application will now exit.") sys.exit(0)
def __init__(self, protocol, deviceNameOrPortNumber, reactor, baudrate = 9600, bytesize = EIGHTBITS, parity = PARITY_NONE, stopbits = STOPBITS_ONE, timeout = 3, xonxoff = 0, rtscts = 0): # do NOT use timeout = 0 !! self._serial = serial.Serial(deviceNameOrPortNumber, baudrate = baudrate, bytesize = bytesize, parity = parity, stopbits = stopbits, timeout = timeout, xonxoff = xonxoff, rtscts = rtscts) javareactor.JConnection.__init__(self, self._serial.sPort, protocol, None) self.flushInput() self.flushOutput() self.reactor = reactor self.protocol = protocol self.protocol.makeConnection(self) wb = javareactor.WriteBlocker(self, reactor.q) wb.start() self.writeBlocker = wb javareactor.ReadBlocker(self, reactor.q).start()
def __init__(self, clientID, bootstrapURI, port, serialPort, debug=False, verbose=False): super(ConstrainedClientSerial, self).__init__(clientID, bootstrapURI, port, debug, verbose) print("Creating serial") serialConnection = serial.Serial( port=serialPort, baudrate=57600, # 115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, # STOPBITS_TWO bytesize=serial.EIGHTBITS ) #self._clientSession = fdpexpect.fdspawn(self._serial, logfile=open("constrained_client_serial.log", "w")) self._clientSession = SerialSession(serialConnection) self._sendline("") # FIXME - This should be a reboot? self.expect("Contiki>") self.init()
def __init__(self, device): self.ser = None self.ser = serial.Serial(device, baudrate=921600, # 1152000, bytesize=serial.EIGHTBITS, parity=serial.PARITY_EVEN, stopbits=serial.STOPBITS_ONE, timeout=0.1, xonxoff=False, rtscts=False, dsrdtr=False) self._connect = False self._lock() self._speed_up() self.reset_input_buffer = self.ser.reset_input_buffer self.reset_output_buffer = self.ser.reset_output_buffer self.write = self.ser.write self.read = self.ser.read self.flush = self.ser.flush
def get_dsmr_connection_parameters(): """ Returns the communication settings required for the DSMR version set. """ DSMR_VERSION_MAPPING = { DataloggerSettings.DSMR_VERSION_2: { 'baudrate': 9600, 'bytesize': serial.SEVENBITS, 'parity': serial.PARITY_EVEN, 'crc': False, }, DataloggerSettings.DSMR_VERSION_4_PLUS: { 'baudrate': 115200, 'bytesize': serial.EIGHTBITS, 'parity': serial.PARITY_NONE, 'crc': True, }, } datalogger_settings = DataloggerSettings.get_solo() connection_parameters = DSMR_VERSION_MAPPING[datalogger_settings.dsmr_version] connection_parameters['com_port'] = datalogger_settings.com_port return connection_parameters
def __init__(self, strip_df_start=True, com_port=None, baud=38400, parity=serial.PARITY_NONE, stop_bits=serial.STOPBITS_ONE, byte_size=serial.EIGHTBITS): super(KissSerial, self).__init__(strip_df_start) self.com_port = com_port self.baud = baud self.parity = parity self.stop_bits = stop_bits self.byte_size = byte_size self.serial = None self.logger.info('Using interface_mode=Serial')
def main(): opts = dict( baudrate=115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, rtscts=False, dsrdtr=False, xonxoff=False ) with serial.Serial('/dev/ttyUSB0', **opts) as port: while True: time.sleep(0.2) port.write(b'\x1b') res = port.read(6) led, segs = extract(res) print(json.dumps({ 'led': led, 'segs': segs })) sys.stdout.flush()
def __init__(self, com_port): ################################################ # Communications ################################################ self.is_open = False self.timeout = 30000 self.ser = serial.Serial( port=com_port, baudrate=115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=self.timeout/1000 ) ############################################### # Printer parameters ############################################### self.x = None # position of x axis self.y = None # position of y axis self.z = None # position of z axis self.e = None # position of extruder self.feedrate = 10 # feedrate (mm/s)
def __init__(self, port="/dev/ttyUSB0", baudrate = 57600, timeout=None): #initialization and open the port #possible timeout values: # 1. None: wait forever, block call # 2. 0: non-blocking mode, return immediately # 3. x, x is bigger than 0, float allowed, timeout block call self.ser = serial.Serial() self.ser.port = port self.ser.baudrate = baudrate self.ser.timeout = timeout self.ser.bytesize = serial.EIGHTBITS #number of bits per bytes self.ser.parity = serial.PARITY_NONE #set parity check: no parity self.ser.stopbits = serial.STOPBITS_ONE #number of stop bits self.ser.xonxoff = False #disable software flow control self.ser.rtscts = False #disable hardware (RTS/CTS) flow control self.ser.dsrdtr = False #disable hardware (DSR/DTR) flow control self.ser.writeTimeout = 2 #timeout for write self.ser.open()
def __init__(self, port, baudrate=115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=0.5): self.port = port self.baudrate = baudrate self.parity = parity self.stopbits = stopbits self.bytesize = bytesize self.timeout = timeout self.ser = serial.Serial( port=self.port, baudrate=self.baudrate, parity=self.parity, stopbits=self.stopbits, bytesize=self.bytesize, timeout=self.timeout ) atexit.register(self.close) for msg in """24 45 49 47 50 51 2c 44 54 4d 2a 33 42 0d 0a b5 62 06 01 03 00 f0 0a 00 04 23 24 45 49 47 50 51 2c 47 42 53 2a 33 30 0d 0a b5 62 06 01 03 00 f0 09 00 03 21 24 45 49 47 50 51 2c 47 4c 4c 2a 32 31 0d 0a b5 62 06 01 03 00 f0 01 00 fb 11 24 45 49 47 50 51 2c 47 52 53 2a 32 30 0d 0a b5 62 06 01 03 00 f0 06 00 00 1b 24 45 49 47 50 51 2c 47 53 41 2a 33 33 0d 0a b5 62 06 01 03 00 f0 02 00 fc 13 24 45 49 47 50 51 2c 47 53 54 2a 32 36 0d 0a b5 62 06 01 03 00 f0 07 00 01 1d 24 45 49 47 50 51 2c 47 53 56 2a 32 34 0d 0a b5 62 06 01 03 00 f0 03 00 fd 15 24 45 49 47 50 51 2c 56 54 47 2a 32 33 0d 0a b5 62 06 01 03 00 f0 05 00 ff 19 24 45 49 47 50 51 2c 5a 44 41 2a 33 39 0d 0a b5 62 06 01 03 00 f0 08 00 02 1f 24 45 49 47 50 51 2c 47 47 41 2a 32 37 0d 0a b5 62 06 01 03 00 f0 00 01 fb 10 24 45 49 47 50 51 2c 52 4d 43 2a 33 41 0d 0a b5 62 06 01 03 00 f0 04 01 ff 18 B5 62 06 08 06 00 64 00 01 00 01 00 7A 12 B5 62 06 08 00 00 0E 30 """.splitlines(): self.ser.write(bytes(map(lambda x: int(x, 16), msg.strip().split(' ')))) time.sleep(0.1) pass
def __init__(self, port, baud_rate, byte_size=serial.EIGHTBITS, parity=serial.PARITY_NONE, stop_bit=serial.STOPBITS_ONE): # initialize the connection and if anything ok open the port self.__main_conn = serial.Serial(port, baud_rate, bytesize=byte_size, parity=parity, stopbits=stop_bit) # Queue self.dataQueue = queue.Queue() self.checkStatusQueue = queue.Queue() # Define Variables self.out = '' self.__flag_out = True
def __init__(self, port=None): """Opens a connection to a Removinator controller on the specified serial port. If a port is not specified, an attempt will be made to auto-discover the Removinator controller. :param port: port that the Removinator is connected to :type port: str :raises: :exc:`ConnectError` :ivar last_result: The result output from the last executed command :ivar last_response: The response output from the last executed command :ivar port: The port that the Removinator is connected to """ self.port = port self.last_result = '' self.last_response = '' # Attempt to discover the Removinator controller if a # port was not specified. if port is None: port = _discover_removinator() # Open a connection to the Removinator controller. try: self.connection = serial.Serial( port, 9600, serial.EIGHTBITS, serial.PARITY_NONE, serial.STOPBITS_ONE, 1) self.sio = io.TextIOWrapper(io.BufferedRWPair(self.connection, self.connection)) except serial.SerialException as e: raise ConnectError('Unable to open connection to Removinator ' 'controller on port {0}'.format(port))
def __init__(self, protocol, deviceNameOrPortNumber, reactor, baudrate = 9600, bytesize = EIGHTBITS, parity = PARITY_NONE, stopbits = STOPBITS_ONE, xonxoff = 0, rtscts = 0): self._serial = serial.Serial(deviceNameOrPortNumber, baudrate=baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits, timeout=None, xonxoff=xonxoff, rtscts=rtscts) self.flushInput() self.flushOutput() self.reactor = reactor self.protocol = protocol self.outQueue = [] self.closed = 0 self.closedNotifies = 0 self.writeInProgress = 0 self.protocol = protocol self._overlappedRead = win32file.OVERLAPPED() self._overlappedRead.hEvent = win32event.CreateEvent(None, 1, 0, None) self._overlappedWrite = win32file.OVERLAPPED() self._overlappedWrite.hEvent = win32event.CreateEvent(None, 0, 0, None) self.reactor.addEvent(self._overlappedRead.hEvent, self, 'serialReadEvent') self.reactor.addEvent(self._overlappedWrite.hEvent, self, 'serialWriteEvent') self.protocol.makeConnection(self) flags, comstat = win32file.ClearCommError(self._serial.hComPort) rc, self.read_buf = win32file.ReadFile(self._serial.hComPort, win32file.AllocateReadBuffer(1), self._overlappedRead)
def __init__(self, protocol, deviceNameOrPortNumber, reactor, baudrate = 9600, bytesize = EIGHTBITS, parity = PARITY_NONE, stopbits = STOPBITS_ONE, timeout = 0, xonxoff = 0, rtscts = 0): abstract.FileDescriptor.__init__(self, reactor) self._serial = serial.Serial(deviceNameOrPortNumber, baudrate = baudrate, bytesize = bytesize, parity = parity, stopbits = stopbits, timeout = timeout, xonxoff = xonxoff, rtscts = rtscts) self.reactor = reactor self.flushInput() self.flushOutput() self.protocol = protocol self.protocol.makeConnection(self) self.startReading()
def __init__(self): self.ser = serial.Serial( port='/dev/ttyAMA0', # number of device, numbering starts at baudrate=9600, # baud rate bytesize=serial.EIGHTBITS, # number of databits parity=serial.PARITY_NONE, # enable parity checking stopbits=serial.STOPBITS_ONE, # number of stopbits timeout=2, # set a timeout value, None for waiting forever xonxoff=0, # enable software flow control rtscts=0, # enable RTS/CTS flow control )
def __init__(self, path='/dev/ttyACM0', baudrate=9600, bytesize=EIGHTBITS, parity=PARITY_NONE, stopbits=STOPBITS_ONE, timeout=1, open_on_create=True, debugging=False): """ Create Telium device instance :param str path: str Path to serial emulated device :param int baudrate: Set baud rate :param int timeout: Maximum delai before hanging out. :param bool open_on_create: Define if device has to be opened on instance creation :param bool debugging: Enable print device <-> host com trace. (stdout) """ self._path = path self._baud = baudrate self._debugging = debugging self._device_timeout = timeout self._device = None self._device = Serial( self._path if open_on_create else None, baudrate=self._baud, bytesize=bytesize, parity=parity, stopbits=stopbits, timeout=timeout ) if not open_on_create: self._device.setPort(self._path)
def open_port(self, port, listening_serial_thread): if not self.fake: self.ser = serial.Serial(port, baudrate=115200, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_TWO, #rtscts=True, timeout=1000) if listening_serial_thread: SerialThread.SerialThread(self.ser).start()
def tryReconnect(self): self.ser=None try: helper.internalLogger.info("Try to reconnect btmac: {0} to port: {1}".format(self.mac,self.port)) aux=subprocess.check_output([EMEM_DEPLOY_DIR+'/scsem/scripts/bindBTmac.sh',self.mac,self.port,'verbose']) helper.internalLogger.debug("Bind script output:" + aux) except subprocess.CalledProcessError as e: helper.internalLogger.debug("Bind script return error {0} {1}.".format(e.returncode, e.message)) return False except KeyboardInterrupt: print("Ok ok, quitting") sys.exit(1) except Exception as e: e = sys.exc_info()[0] helper.internalLogger.error('Unexpected error binding to btmac. It will be retried later.') helper.einternalLogger.exception(e) return False helper.internalLogger.info("Device visible {0} now trying setup serial interface towards it...".format(self.mac)) try: helper.internalLogger.info("Try open serial port:" + self.port) self.ser = serial.Serial( port=self.port,\ baudrate=self.speed,\ parity=serial.PARITY_NONE,\ stopbits=serial.STOPBITS_ONE,\ bytesize=serial.EIGHTBITS,\ timeout=self.timeout) except KeyboardInterrupt: print("Ok ok, quitting") sys.exit(1) except Exception as e: e = sys.exc_info()[0] helper.internalLogger.error('Unexpected error accesing to serial port. It will be retried later.') helper.einternalLogger.exception(e) self.ser=None return False return True
def serial_proc(): global acc_x global acc_y global acc_z if os.name == "nt": PORT = "COM7" else: PORT = "/dev/ttyACM0" # BAUD = 115200 s = serial.Serial(PORT) s.baudrate = BAUD s.parity = serial.PARITY_NONE s.databits = serial.EIGHTBITS s.stopbits = serial.STOPBITS_ONE while True: data = s.readline().decode('UTF-8') data_list = data.rstrip().split(' ') try: x, y, z, a, b = data_list acc_x = x acc_y = y acc_z = z print(x, y, z) except: pass s.close()
def __init__(self): self.ser = None try: self.ser = serial.Serial(self.DEVICE, self.BAUD_RATE, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=.01) except serial.serialutil.SerialException, err: raise CannotLoadSensor("Cannot load sensor: %s" % str(err)) except Exception as err: raise CannotLoadSensor("Cannot load sensor: %s" % str(err))
def __init__(self, port = '/dev/ttyUSB0', baudRate = 9600): """ Constructor @param string port @param integer baudRate """ ## Validates port if ( os.path.exists(port) == False ): raise Exception('The RFID sensor port "' + port + '" was not found!') ## Initializes connection self.__serial = serial.Serial(port = port, baudrate = baudRate, bytesize = serial.EIGHTBITS, timeout = 1)
def test(): PORT = '/dev/tty.usbmodem1412' BAUD = 115200 def get_serial(): s = serial.Serial(PORT) s.baudrate = BAUD s.parity = serial.PARITY_NONE s.databits = serial.EIGHTBITS s.stopbits = serial.STOPBITS_ONE s.timeout = 0 # non blocking mode s.close() s.port = PORT s.open() return s s = get_serial() repl = REPL(s) repl.to_raw() repl.send_command("print('hello')") print(repl.wait_response()) repl.send_command("a=1") repl.wait_response() repl.send_command("print(a)") print(repl.wait_response()) # FINISHED s.close()
def get_serial(): s = serial.Serial(PORT) s.baudrate = BAUD s.parity = serial.PARITY_NONE s.databits = serial.EIGHTBITS s.stopbits = serial.STOPBITS_ONE s.timeout = 0 # non blocking mode s.close() s.port = PORT s.open() return s
def __init__(self, serial_name): self.port = serial_name self.baudrate = 9600 # Default baud rate self.timeout = 0.1 # Default timeout, seconds self.parity = serial.PARITY_ODD # Default parity self.stopbits = serial.STOPBITS_ONE # Default stop bits self.bytesize = serial.EIGHTBITS self.ser = serial.Serial(self.port, self.baudrate, self.bytesize, self.parity, self.stopbits, timeout=self.timeout) # serial port self.max_voltage = 42.0 # Volts self.max_current = 6.0 # Amps self.max_power = 100.0 # Watts
def __init__(self, serial_name): self.port = serial_name self.baudrate = 9600 # Default baud rate self.timeout = 1 # Default timeout, seconds self.parity = serial.PARITY_NONE # Default parity self.stopbits = serial.STOPBITS_ONE # Default stop bits self.bytesize = serial.EIGHTBITS self.ser = serial.Serial(self.port, self.baudrate, self.bytesize, self.parity, self.stopbits, timeout=self.timeout) # serial port self.max_voltage = 80.0 # Volts self.max_current = 10.0 # Amps self.max_power = 320.0 # Watts
def __init__(self, serial_name): """ Initialize the power supply. Each children an here define non-default values for his specific case Input : serial_name, String, is the serial port name (e.g. COM2) """ self.name = "Generic Power Supply" self.port = serial_name self.baudrate = 9600 # Default baud rate """ Possible baudrate values : 50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, 9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, 576000, 921600, 1000000, 1152000, 1500000, 2000000, 2500000, 3000000, 3500000, 4000000 """ self.timeout = 1 # Default timeout, seconds self.parity = serial.PARITY_NONE # Default parity """ Possible parities : PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE """ self.stopbits = serial.STOPBITS_ONE # Default stop bits """ Possible stopbits : STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO """ self.bytesize = serial.EIGHTBITS """ Possible bytesizes : FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS """ self.ser = serial.Serial(self.port, self.baudrate, self.bytesize, self.parity, self.stopbits, timeout=self.timeout) # serial port self.max_voltage = 0.0 # Volts self.max_current = 0.0 # Amps self.max_power = 0.0 # Watts
def __init__(self, serial_name): self.port = serial_name self.baudrate = 57600 # Default baud rate self.timeout = 0.1 # Default timeout, seconds self.parity = serial.PARITY_ODD # Default parity self.stopbits = serial.STOPBITS_ONE # Default stop bits self.bytesize = serial.EIGHTBITS self.ser = serial.Serial(self.port, self.baudrate, self.bytesize, self.parity, self.stopbits, timeout=self.timeout) # serial port self.max_voltage = 360.0 # Volts self.max_current = 10.0 # Amps self.max_power = 1000.0 # Watts
def connect(self): """ Open tty connection to sensor """ if self.link is not None: self.disconnect() self.link = serial.Serial(self.port, 9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, dsrdtr=True, timeout=5, interCharTimeout=0.1)
def __init__(self, device='/dev/ttyAMA0', baud=115200, parity_=serial.PARITY_NONE, stop_bits=serial.STOPBITS_ONE, byte_size=serial.EIGHTBITS, time_out=0): # ACK header for data recive self.Header = bytearray([0x00, 0x00, 0xff, 0x00, 0xff, 0x00, 0x00, 0x00, 0xff]) # time out. if time_out != 0: self.connectUART = serial.Serial(port=device, baudrate=baud, stopbits=stop_bits, bytesize=byte_size, timeout=time_out) else: self.connectUART = serial.Serial(port=device, baudrate=baud, stopbits=stop_bits, bytesize=byte_size) # manually close connection
def __init__(self, device='/dev/ttyAMA0', baud=115200, parity_=serial.PARITY_NONE, stop_bits=serial.STOPBITS_ONE, byte_size=serial.EIGHTBITS, time_out=0): super(PN532, self).__init__(device, baud, parity_, stop_bits, byte_size, time_out)
def __init__(self, port='/dev/serial0'): self.ser = serial.Serial() self.ser.port = port self.ser.baudrate = 115200 self.ser.bytesize = serial.EIGHTBITS self.ser.parity = serial.PARITY_NONE self.ser.stopbits = serial.STOPBITS_ONE self.open_serial() self.rc_data = [0] * N_CHAN
def __init__(self, port="/dev/ttyACM0", bitrate=115200, rx_protocol=msp.RX_MSP, firmware=msp.FIRMWARE_BF): assert rx_protocol in msp.RX_OPTIONS, ( "unsupported rx protocol indicated") self.rx_protocol = rx_protocol self.rx_protocol_ch_count = len( MSP_PAYLOAD_FMT[msp.MSP_RC][rx_protocol][1:]) assert firmware in msp.FIRMWARE_OPTIONS, ( "unsupported firmware indicated") self.firmware = firmware self.firmware_motor_count = len( MSP_PAYLOAD_FMT[msp.MSP_MOTOR][firmware][1:]) self.ser = serial.Serial() self.ser.port = port self.ser.baudrate = bitrate self.ser.bytesize = serial.EIGHTBITS self.ser.parity = serial.PARITY_NONE self.ser.stopbits = serial.STOPBITS_ONE self.ser.timeout = None self.ser.xonxoff = False self.ser.rtscts = False self.ser.dsrdtr = False self.ser.writeTimeout = 2 self.open_serial()
def MakeSerialDevice(port="/dev/ttyUSB0"): dev = serial.Serial( port=port, baudrate=115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, # blocking timeout=5) # dev.open() return dev
def __init__(self, serialdevice, configuration, power_control=DTR_ON, scan_interval=0): """Initialize the data collector based on the given parameters.""" self.record_length = configuration[RECORD_LENGTH] self.start_sequence = configuration[STARTBLOCK] self.byte_order = configuration[BYTE_ORDER] self.multiplier = configuration[MULTIPLIER] self.timeout = configuration[TIMEOUT] self.scan_interval = scan_interval self.listeners = [] self.power_control = power_control self.sensordata = {} self.config = configuration self.data = None self.last_poll = None self.start_func = None self.stop_func = None self.ser = serial.Serial(port=serialdevice, baudrate=configuration[BAUD_RATE], parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS) # Update date in using a background thread if self.scan_interval > 0: thread = threading.Thread(target=self.refresh, args=()) thread.daemon = True thread.start()
def read_mh_z19_with_temperature(serial_device): """ Read the CO2 PPM concenration and temperature from a MH-Z19 sensor""" logger = logging.getLogger(__name__) ser = serial.Serial(port=serial_device, baudrate=9600, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS) sbuf = bytearray() starttime = time.time() finished = False timeout = 2 res = None ser.write(MZH19_READ) while not finished: mytime = time.time() if mytime - starttime > timeout: logger.error("read timeout after %s seconds, read %s bytes", timeout, len(sbuf)) return None if ser.inWaiting() > 0: sbuf += ser.read(1) if len(sbuf) == MHZ19_SIZE: # TODO: check checksum res = (sbuf[2]*256 + sbuf[3], sbuf[4] - 40) logger.debug("Finished reading data %s", sbuf) finished = True else: time.sleep(.1) logger.debug("Serial waiting for data, buffer length=%s", len(sbuf)) return res
def __init__(self, device='/dev/ttyUSB0', baudrate=9600, rst='-rts', debug=False): self._sl = serial.Serial( port = device, parity = serial.PARITY_EVEN, bytesize = serial.EIGHTBITS, stopbits = serial.STOPBITS_TWO, timeout = 1, xonxoff = 0, rtscts = 0, baudrate = baudrate, ) self._rst_pin = rst self._debug = debug
def __init__(self,addr='COM1',ndacs=8,polarity=('BIP','BIP'),verb=True,timeout = 2,reset=False): #Directly using pyserial interface and skipping pyvisa self.serialport = serial.Serial(addr,baudrate=115200,bytesize=serial.EIGHTBITS, parity=serial.PARITY_ODD, stopbits=serial.STOPBITS_ONE,timeout=timeout) if ndacs!=8 and ndacs!=16: print('DAC WARNING, non-standard number of dacs. Should be 8 or 16 but %d was given' % ndacs) self.ndacs = ndacs self.verb = verb self.SetPolarity(polarity) if reset: self.RampAllZero(tt=20.) return self.lastmessage = () #Function to set polarity. This just informs the driver what polarities are in use so it can correctly set the voltages. #The driver cannot physically set the polarity. The real polarity of the DACs can only be set form the hardware switches.
def connect(self): if self.conn is None: self.conn = serial.Serial(port = self.port, baudrate = 19200, parity = serial.PARITY_ODD, stopbits = serial.STOPBITS_ONE, bytesize = serial.EIGHTBITS, timeout = 5, xonxoff = 1) elif not self.is_connected(): self.conn.open()
def __init__(self, port): self.ser = serial.Serial( port=port, baudrate=baud_rate, parity=serial.PARITY_ODD, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS ) self.ser.write(b'\xF6') res = self.ser.read(2) if res == b'e\xfb': click.secho('nejemojo is go', fg='green') else: click.secho('bad mojo: {}'.format(res), fg='red')
def __init__(self, usb_pathname = '/dev/ttyUSB0', _read_timeout = 3.0): # Open with Pyserial self.read_timeout = _read_timeout self.serial_port = serial.Serial( port = usb_pathname, baudrate = 2400, parity = serial.PARITY_NONE, stopbits = serial.STOPBITS_ONE, bytesize = serial.EIGHTBITS, timeout = self.read_timeout)