我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用serial.Serial()。
def find_port(self): # Finds the serial port names if sys.platform.startswith('win'): ports = ['COM%s' % (i+1) for i in range(256)] elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'): ports = glob.glob('/dev/ttyUSB*') elif sys.platform.startswith('darwin'): ports = glob.glob('/dev/tty.usbserial*') else: raise EnvironmentError('Error finding ports on your operating system') openbci_port = '' for port in ports: try: s = serial.Serial(port= port, baudrate = self.baudrate, timeout=self.timeout) s.write(b'v') openbci_serial = self.openbci_id(s) s.close() if openbci_serial: openbci_port = port; except (OSError, serial.SerialException): pass if openbci_port == '': raise OSError('Cannot find OpenBCI port') else: return openbci_port
def ThreadWorker(contextz,port,baud): print("ThreadProcess Start") btnState = "btn_a_0" obj = serial.Serial(port,baud,timeout=0.1) buf = b'' while SerialListener.isActive: data = obj.readline() if data.__len__() > 0: buf += data if b'\n' in buf: tmp = buf.decode().strip(' \t\n\r') buf = b'' CmdActions.parse(tmp) obj.close() print("ThreadProcess End") SerialListener.isActive = False return #====================================================================================
def open(self): print("i am the", self._port_) #self._port_ = 'COM4' print(self._baud_) #print("the open") self._serial_port_ = serial.Serial(self._port_,int(self._baud_)) print("thread") #self._serial_port_.setRTS(self._RTS_) #self._serial_port_.setDTR(self._DTR_) self._received_thread_ = threading.Thread(target = self.__recv_func__,args=(self,)) #print("thread") self._is_running_ = True #print("thread1") self._received_thread_.setDaemon(True) #print("thread2") self._received_thread_.setName("SerialPortRecvThread") self._received_thread_.start()
def __init__(self, port='/dev/ttyACM0', baudrate=9600,timeout=10): """Initialize internal variables and serial connection :param port: The COM port to open. See the documentation for `pyserial <http://pyserial.sourceforge.net/>`_ for an explanation of the possible value. The default value is '/dev/ttyACM0'. :type port: str or int :param baudrate: 9600, 19200, 38400 where 9600 is the default :type baudrate: int """ # The serial connection should be setup with the following parameters: # 1 start bit, 8 data bits, No parity bit, 1 stop bit, no hardware # handshake. These are all default for Serial and therefore not input # below #self.serial = serial.Serial(port=port, baudrate=baudrate, timeout=1) self.serial = serial.Serial(port = port, baudrate=baudrate, timeout=timeout, writeTimeout=timeout,stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE)
def __init__(self, port='/dev/ttyACM0', baudrate=9600,timeout=10): """Initialize internal variables and serial connection :param port: The COM port to open. See the documentation for `pyserial <http://pyserial.sourceforge.net/>`_ for an explanation of the possible value. The default value is '/dev/ttyUSB0'. :type port: str or int :param baudrate: 9600, 19200, 38400 where 9600 is the default :type baudrate: int """ # The serial connection should be setup with the following parameters: # 1 start bit, 8 data bits, No parity bit, 1 stop bit, no hardware # handshake. These are all default for Serial and therefore not input # below #self.serial = serial.Serial(port=port, baudrate=baudrate, timeout=1) self.serial = serial.Serial(port = port, baudrate=baudrate, timeout=timeout, writeTimeout=timeout,stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE) self.string = DataHandler()
def __init__(self, port=DEFAULT_PORT, baud=ESP_ROM_BAUD): """Base constructor for ESPLoader bootloader interaction Don't call this constructor, either instantiate ESP8266ROM or ESP32ROM, or use ESPLoader.detect_chip(). This base class has all of the instance methods for bootloader functionality supported across various chips & stub loaders. Subclasses replace the functions they don't support with ones which throw NotImplementedInROMError(). """ if isinstance(port, serial.Serial): self._port = port else: self._port = serial.serial_for_url(port) self._slip_reader = slip_reader(self._port) # setting baud rate in a separate step is a workaround for # CH341 driver on some Linux versions (this opens at 9600 then # sets), shouldn't matter for other platforms/drivers. See # https://github.com/espressif/esptool/issues/44#issuecomment-107094446 self._port.baudrate = baud
def __init__(self, protocol, deviceNameOrPortNumber, reactor, baudrate = 9600, bytesize = EIGHTBITS, parity = PARITY_NONE, stopbits = STOPBITS_ONE, timeout = 3, xonxoff = 0, rtscts = 0): # do NOT use timeout = 0 !! self._serial = serial.Serial(deviceNameOrPortNumber, baudrate = baudrate, bytesize = bytesize, parity = parity, stopbits = stopbits, timeout = timeout, xonxoff = xonxoff, rtscts = rtscts) javareactor.JConnection.__init__(self, self._serial.sPort, protocol, None) self.flushInput() self.flushOutput() self.reactor = reactor self.protocol = protocol self.protocol.makeConnection(self) wb = javareactor.WriteBlocker(self, reactor.q) wb.start() self.writeBlocker = wb javareactor.ReadBlocker(self, reactor.q).start()
def write(self, b): """Write to port, controlling RTS before and after transmitting.""" if self._alternate_rs485_settings is not None: # apply level for TX and optional delay self.setRTS(self._alternate_rs485_settings.rts_level_for_tx) if self._alternate_rs485_settings.delay_before_tx is not None: time.sleep(self._alternate_rs485_settings.delay_before_tx) # write and wait for data to be written super(RS485, self).write(b) super(RS485, self).flush() # optional delay and apply level for RX if self._alternate_rs485_settings.delay_before_rx is not None: time.sleep(self._alternate_rs485_settings.delay_before_rx) self.setRTS(self._alternate_rs485_settings.rts_level_for_rx) else: super(RS485, self).write(b) # redirect where the property stores the settings so that underlying Serial # instance does not see them
def __init__(self, port='/dev/ttyAMA0', baudrate=115200, verbose=True, connected=None): self.verbose = verbose self.version = None self.connectedCV = threading.Condition() self.responseQueue = queue.Queue() self.port = serial.Serial(port, baudrate=baudrate) self._transactionLock = threading.Lock() self.tx = self.transmitThreadClass(self.port, verbose=self.verbose) self.rx = self.receiverThreadClass(self.port, callback=self._receive, verbose=self.verbose) self.rx.start() self.tx.start() if connected is None: self.connected = True self.connected = self._testForExistingConnection() else: self.connected = connected if self.verbose: if self.connected: print("Already connected to gimbal, version %s" % self.version) else: print("Waiting for gimbal to power on")
def __init__(self, dev, debug = 0): self.name = dev self.debug = debug if self.debug: print (self.name + " init") # Work-around for OXM-43: Intermittently Telit devices doesn't exist when system boot up attempt = 1 while (attempt <= xcModemSer.MAX_ATTEMPT): try: self.freeser = serial.Serial(self.name, 115200, timeout=0, rtscts=0) except IOError as e: LOG.error("%s %s - Work-around attempt %s" % (self.name, e, attempt)) cmd = ('xc_modem_gsm.sh -R 0; sleep 3; xc_modem_gsm.sh -R 1; xc_modem_gsm.sh -w "%s"' % dev) # LOG.debug("issuing: " + cmd) subprocess.call(cmd, shell=True) attempt += 1 else: break;
def find_finger(): ps = request.vars.ps import serial ser = serial.Serial('/dev/ttyACM0', 9600) ser.write('5') # ser.write(b'5') #Prefixo b necessario se estiver utilizando Python 3.X while True: line = ser.readline() print line if "FINGERFOUND" in line: id = line.split(",")[1] ser.close() r = requests.get("http://174.138.34.125:8081/walletapi/customer/%s/" % id) obj = json.loads(r.text) if ps == obj['password']: return r.text else: return 'error'
def __init__(self): res = Xlib.display.Display().screen().root.get_geometry() self.w = res.width self.h = res.height self.p = Popen("/usr/bin/xte", stdin=PIPE) self.down = False self.state = 0 self.min_x = 63081 self.max_x = 65449 self.min_y = 63480 self.max_y = 65402 self.simulate = False if not self.simulate: port = "/dev/ttyS3" speed = 38400 self.serial = serial.Serial(port, speed, timeout=0)
def __init__(self, mode, port, baud=None, hostname=None, verbose=None): self.rtu = None self.mode = mode if self.mode == 'rtu' and baud and port: self.rtu = serial.Serial(port=port, baudrate=baud) Simulator.__init__(self, ModbusRtuServer(self.rtu)) # timeout is too fast for 19200 so increase a little bit self.server._serial.timeout *= 2 self.server._serial.interCharTimeout *= 2 LOGGER.info('Initializing modbus %s simulator: baud = %d port = %s parity = %s' % (self.mode, baud, port, self.rtu.parity)) LOGGER.info('stop bits = %d xonxoff = %d' % (self.rtu.stopbits, self.rtu.xonxoff)) elif self.mode == 'tcp' and hostname and port: Simulator.__init__(self, TcpServer(address=hostname, port=port)) LOGGER.info('Initializing modbus %s simulator: addr = %s port = %s' % (self.mode, hostname, port)) else: raise ModbusSimError('Unknown mode: %s' % (mode)) self.server.set_verbose(True)
def __init__(self, port=DEFAULT_PORT, baud=ESP_ROM_BAUD, trace_enabled=False): """Base constructor for ESPLoader bootloader interaction Don't call this constructor, either instantiate ESP8266ROM or ESP32ROM, or use ESPLoader.detect_chip(). This base class has all of the instance methods for bootloader functionality supported across various chips & stub loaders. Subclasses replace the functions they don't support with ones which throw NotImplementedInROMError(). """ if isinstance(port, serial.Serial): self._port = port else: self._port = serial.serial_for_url(port) self._slip_reader = slip_reader(self._port, self.trace) # setting baud rate in a separate step is a workaround for # CH341 driver on some Linux versions (this opens at 9600 then # sets), shouldn't matter for other platforms/drivers. See # https://github.com/espressif/esptool/issues/44#issuecomment-107094446 self._set_port_baudrate(baud) self._trace_enabled = trace_enabled
def openserialport(): ''' Attempt to open a serial connection using the baud rate defined in the BAUD global variable and continue retrying until successful. On successful connection, send a message out the port, return the connection to the calling function, and register with atexit to close the serial port when the program exits ''' while True: try: ser = Serial(port=getportname(), baudrate=BAUD, bytesize=8, parity='N', stopbits=1, timeout=None, xonxoff=0, rtscts=0) atexit.register(closeserialport, connection=ser) logger.info('-'*30 + ' Serial Port opened ' + '-'*30) logger.info('Serial port opened successfully.\n\tPort Configuration: {}'.format(ser)) return ser except Exception as e: logger.critical('Exception opening serial port. Retrying...\n\tException Message: {}'.format(e)) time.sleep(10) return ser
def openserialport(): ''' Attempt to open a serial connection using the baud rate defined in the BAUD global variable and continue retrying until successful. On successful connection, send a message out the port, return the connection to the calling function, and register with atexit to close the serial port when the program exits ''' while True: try: ser = Serial(port=getportname(), baudrate=BAUD, bytesize=8, parity='N', stopbits=1, timeout=None, xonxoff=0, rtscts=0) atexit.register(closeserialport, connection=ser) logger.info('-'*30 + ' Serial Port opened ' + '-'*30) logger.info('Serial port opened successfully. Port Configuration: {}'.format(ser)) sendmessage(ser, 'Startup - Serial Port {} Opened\n'.format(ser.port).encode()) return ser except Exception as e: logger.critical('Exception opening serial port. Retrying...\n\tException Message: {}'.format(e)) time.sleep(10) return ser
def __init__(self,port=None,logFileName=False): print("\n### TENMA MULTIMETER INTERFACE ###") devList=self.device_list() self.port=port self.logFileName=logFileName if self.logFileName: self.log_init() if port is None: self.port=self.guessPort if not self.port in devList: print("!!! %s isn't in the list of serial ports !!!"%self.port) print("Preparing to use",self.port) self.ser = serial.Serial() self.ser.port = self.port self.ser.baudrate = 19230 self.ser.bytesize = serial.SEVENBITS self.ser.parity = serial.PARITY_ODD self.ser.stopbits = serial.STOPBITS_ONE self.ser.timeout = 3 self.ser.xonxoff = False self.ser.rtscts = False self.ser.dsrdtr = False self.ser.writeTimeout = 3
def __init__(self, port="lscom-w", sensors=["W", None, "K"], dataform=[(int, float), (float, float, float), (int, float)], seed=42): threading.Thread.__init__(self) self.dataform = {s: d for s, d in zip(sensors, dataform)} self.sensors = sensors from os import name if name == 'nt': # pragma: no cover self.port = port else: self.port = "/dev/tty.{}".format(port) from serial import Serial self.serial = Serial(self.port, 9600, dsrdtr=True, rtscts=True) self.seed = seed self.alive = threading.Event() self.alive.set()
def __init__(self, shotmgr): # initialize shotmanager object self.shotmgr = shotmgr self.vehicle = self.shotmgr.vehicle self.sPort = None self.registerCallbacks() if not os.path.exists(LOG_DIR): os.makedirs(LOG_DIR) self.dataFile = time.strftime("%Y%m%d-%H%M%S") + ".log" temp_list = glob.glob ('/dev/tty[A-Za-z]*') for t in temp_list: if "ttyACM" in t: self.sPort = serial.Serial(t, 19200, timeout=1)
def camera_feedback_callback(self, vehicle, name, msg): self.logFile = open(LOG_DIR + self.dataFile, "a") if self.sPort == None: print "No Serial Relay installed" else: cSerial(0.25, self.sPort) # p = Process(target=cSerial, args=(0.2, self.sPort)) # p.start() self.logFile.write(str(msg.time_usec) + "," + str(float(msg.lat) / 10000000.0) + "," + str(float(msg.lng) / 10000000.0) + "," + str(msg.alt_rel) + "\n") print msg.time_usec, msg.lat * 10000000, msg.lng * 10000000, msg.alt_rel self.logFile.flush() self.logFile.close() #Have Camera Take Picture on Mode Change #This helps in testing and also ensures camera does not fall asleep before timeing out #Also creates a picture at a known event in the log file incase camera clock is off
def setDeviceID(dev, id): if id < 0 or id > 255: raise ValueError("ID must be an unsigned byte!") try: com = serial.Serial(dev, timeout=5) packet = DriverSerial._generateHeader(CMDTYPE.SETID, 1) packet.append(id) com.write(packet) resp = com.read(1) if len(resp) == 0: DriverSerial._comError() else: if ord(resp) != RETURN_CODES.SUCCESS: DriverSerial._printError(ord(resp)) except serial.SerialException: log.error("Problem connecting to serial device.") raise IOError("Problem connecting to serial device.")
def read(self): """ Read complete DSMR telegram's from the serial interface and parse it into CosemObject's and MbusObject's :rtype: generator """ with serial.Serial(**self.serial_settings) as serial_handle: while True: data = serial_handle.readline() self.telegram_buffer.append(data.decode('ascii')) for telegram in self.telegram_buffer.get_all(): try: yield self.telegram_parser.parse(telegram) except InvalidChecksumError as e: logger.warning(str(e)) except ParseError as e: logger.error('Failed to parse telegram: %s', e)
def alternatingApply(self): try: if self.alternatingList.count() != 2: self.error("Alternating must have two colors") else: with serial.Serial(self.portTxt.text(), 256000) as ser: if self.getChannel() == None: hue.power(ser, 0, "off") else: speed = self.alternatingSpeed.value() size = self.alternatingSize.value() direction = 1 if self.alternatingBackwards.isChecked() else 0 moving = self.alternatingMoving.isChecked() hue.alternating(ser, 0, self.getChannel(), self.getColors(self.alternatingList), speed, size, moving, direction) except serial.serialutil.SerialException: self.error("Serial port is invalid. Try /dev/ttyACM0 for Linux or COM3 or COM4 for Windows") ## candle
def connect(self): try: print "Connecting to Arduino on port", self.port, "..." self.port = Serial(port=self.port, baudrate=self.baudrate, timeout=self.timeout, writeTimeout=self.writeTimeout) # The next line is necessary to give the firmware time to wake up. time.sleep(1) test = self.get_baud() if test != self.baudrate: time.sleep(1) test = self.get_baud() if test != self.baudrate: raise SerialException print "Connected at", self.baudrate print "Arduino is ready." except SerialException: print "Serial Exception:" print sys.exc_info() print "Traceback follows:" traceback.print_exc(file=sys.stdout) print "Cannot connect to Arduino!" os._exit(1)
def openSerialPort(self, device_name=None): if device_name is None: device_name = self.device_name try: self.serial_port = serial.Serial(device_name, baudrate=self.baud_rate, bytesize=8, parity='N', stopbits=1, timeout=self.timeout, write_timeout=1) except Exception as e: return False if not self.serial_port.isOpen(): return False return True
def calsource_init(self,source=None): ''' setup communication to the Low Frequency source ''' if source==None: print('Please enter the calibration source: HF or LF') return None if source.upper()=='LF': dev='/dev/calsource-LF' which_freq='Low' else: dev='/dev/calsource-HF' which_freq='High' if not os.path.exists(dev): print('ERROR! could not connect to the %s Frequency Calibration Source.' % which_freq) return None connection=serial.Serial(dev) return connection
def __init__(self, clientID, bootstrapURI, port, serialPort, debug=False, verbose=False): super(ConstrainedClientSerial, self).__init__(clientID, bootstrapURI, port, debug, verbose) print("Creating serial") serialConnection = serial.Serial( port=serialPort, baudrate=57600, # 115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, # STOPBITS_TWO bytesize=serial.EIGHTBITS ) #self._clientSession = fdpexpect.fdspawn(self._serial, logfile=open("constrained_client_serial.log", "w")) self._clientSession = SerialSession(serialConnection) self._sendline("") # FIXME - This should be a reboot? self.expect("Contiki>") self.init()
def __init__(self, device): self.ser = None self.ser = serial.Serial(device, baudrate=921600, # 1152000, bytesize=serial.EIGHTBITS, parity=serial.PARITY_EVEN, stopbits=serial.STOPBITS_ONE, timeout=0.1, xonxoff=False, rtscts=False, dsrdtr=False) self._connect = False self._lock() self._speed_up() self.reset_input_buffer = self.ser.reset_input_buffer self.reset_output_buffer = self.ser.reset_output_buffer self.write = self.ser.write self.read = self.ser.read self.flush = self.ser.flush
def __init__(self, port, baudrate=31250, timeout=None): """ ``port`` must be string, representing the path used for connecting to the machine's serial interface Example: on Raspberry3, the path to serial port is '/dev/serial0' ``baudrate`` is an int set up at 31250 by default and should not be changed. This is the standard baudrate, used by all MIDI devices. ``timeout`` default=None. If you *don't want* the MidiConnector.read() method to block for ever if it receives nothing, use this keyword argument to set up a maximum duration of blocking. The timeout is only used for reading, not writing. """ if timeout and not (isinstance(timeout, float) or isinstance(timeout, int)): raise TypeError('Specified timeout must be float or integer ({} given)'.format(type(timeout))) self.timeout = timeout self.baudrate = baudrate self.port = port self.connector = serial.Serial(port=self.port, baudrate=self.baudrate, timeout=self.timeout)
def __init__(self, port, key=None, shift_baud_rate=False): """Initialization function. Follows RAII, so creating the object opens the port.""" super(ET312SerialSync, self).__init__(key) # Allow derived classes to set up a port to mock serial ports for # tests. There are cleaner ways to mock this, but this will do for now. if not hasattr(self, "port"): # Check argument validity import serial if not port or type(port) is not str: raise ButtshockIOError("Serial port name is missing or is not string!") self.port = serial.Serial(port, 19200, timeout=1, parity=serial.PARITY_NONE, bytesize=8, stopbits=1, xonxoff=0, rtscts=0) self.needs_bytes = False # Test for python 3 if not isinstance(bytes([0]), str): self.needs_bytes = True self.shift_baud_rate = shift_baud_rate
def __init__(self): self.ser = serial.Serial('/dev/ttyACM0', 9600) #geometrical calibration self.rs = [50, 50, 50] self.ls = [95, 130, 95] self.pot_rad_per_unit = 1./3000.*math.pi angles = [2./3.*math.pi, 0., -2./3.*math.pi] #placements of the 3 joysticks self.placements = [] #attach point on the ball self.attach_ps = [] for r,l,a in zip(self.rs, self.ls, angles): p_init = pose.exp(col([0, 0, 0, 0, 0, -(r+l)])) p_rot = pose.exp(col([0, a, 0, 0, 0, 0])) placement = p_rot * p_init self.placements.append(placement) attach_p = placement * col([0, 0, l]) self.attach_ps.append(attach_p) #last calculated pose in logarithmic coordinates self.last_x = col([0, 0, 0, 0, 0, 0]) #definition of the numerical solver f = lambda x: self.getValuesFromPose(pose.exp(x)) self.solver = solver.Solver(f)
def __init__(self): self.tty = '/dev/ttyUSB0' self.serial = serial.Serial(self.tty, timeout=0.01) self.count = 0 self.listo = False self.reset()
def __init__(self): self.tty = '/dev/ttyUSB0' self.serial = serial.Serial(self.tty, timeout=0.01)
def wait_on_serial(): p = serial.Serial(SERIAL_PORT, 115200, timeout=3) while True: new_data.append(p.readline().rstrip('\n\r'))
def connect(self, port): global serial try: serial = serial.Serial(port, 9600) except SerialException: print("port already open") return serial # Writes to the Arduino
def __init__(self, tty_port): self.arduino = Arduino() self.port = self.arduino.getPort() self.serial = self.arduino.connect(self.port) self.ser = serial.Serial(port=tty_port, baudrate=9600, timeout=3, dsrdtr=1) self.buffer = [] self.listeners = [] # Recieves Packet
def __init__(self, device="/dev/tty.usbmodem1411", accelerometer=False, magnetometer=False, gyroscope=False, euler=False, quaternion=False, heading=False ): """ Opens a connection to the VMU931 device :param device: Serial device name (on Windows) or path (nix, including OS X). :param accelerometer: Enable/disable accelerometer data streaming. :param magnetometer: Enable/disable magnetometer data streaming. :param gyroscope: Enable/disable gyroscope data streaming. :param euler: Enable/disable euler angle data streaming. :param quaternion: Enable/disable quaternion data streaming. :param heading: Enable/disable compass heading data streaming. """ self.ser = serial.Serial(device) self.device_status = None self.parse() self.set_accelerometer(accelerometer) self.set_magnetometer(magnetometer) self.set_gyroscope(gyroscope) self.set_euler(euler) self.set_quaternion(quaternion) self.set_heading(heading)
def __init__(self, port, baudrate=115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=0.5): self.port = port self.baudrate = baudrate self.parity = parity self.stopbits = stopbits self.bytesize = bytesize self.timeout = timeout self.ser = serial.Serial( port=self.port, baudrate=self.baudrate, parity=self.parity, stopbits=self.stopbits, bytesize=self.bytesize, timeout=self.timeout ) atexit.register(self.close) for msg in """24 45 49 47 50 51 2c 44 54 4d 2a 33 42 0d 0a b5 62 06 01 03 00 f0 0a 00 04 23 24 45 49 47 50 51 2c 47 42 53 2a 33 30 0d 0a b5 62 06 01 03 00 f0 09 00 03 21 24 45 49 47 50 51 2c 47 4c 4c 2a 32 31 0d 0a b5 62 06 01 03 00 f0 01 00 fb 11 24 45 49 47 50 51 2c 47 52 53 2a 32 30 0d 0a b5 62 06 01 03 00 f0 06 00 00 1b 24 45 49 47 50 51 2c 47 53 41 2a 33 33 0d 0a b5 62 06 01 03 00 f0 02 00 fc 13 24 45 49 47 50 51 2c 47 53 54 2a 32 36 0d 0a b5 62 06 01 03 00 f0 07 00 01 1d 24 45 49 47 50 51 2c 47 53 56 2a 32 34 0d 0a b5 62 06 01 03 00 f0 03 00 fd 15 24 45 49 47 50 51 2c 56 54 47 2a 32 33 0d 0a b5 62 06 01 03 00 f0 05 00 ff 19 24 45 49 47 50 51 2c 5a 44 41 2a 33 39 0d 0a b5 62 06 01 03 00 f0 08 00 02 1f 24 45 49 47 50 51 2c 47 47 41 2a 32 37 0d 0a b5 62 06 01 03 00 f0 00 01 fb 10 24 45 49 47 50 51 2c 52 4d 43 2a 33 41 0d 0a b5 62 06 01 03 00 f0 04 01 ff 18 B5 62 06 08 06 00 64 00 01 00 01 00 7A 12 B5 62 06 08 00 00 0E 30 """.splitlines(): self.ser.write(bytes(map(lambda x: int(x, 16), msg.strip().split(' ')))) time.sleep(0.1) pass
def __init__(self, port, baud_rate, byte_size=serial.EIGHTBITS, parity=serial.PARITY_NONE, stop_bit=serial.STOPBITS_ONE): # initialize the connection and if anything ok open the port self.__main_conn = serial.Serial(port, baud_rate, bytesize=byte_size, parity=parity, stopbits=stop_bit) # Queue self.dataQueue = queue.Queue() self.checkStatusQueue = queue.Queue() # Define Variables self.out = '' self.__flag_out = True
def send_command(self, command): """Send a command and check if it is positively acknowledged :param command: The command to send :type command: str :raises IOError: if the negative acknowledged or a unknown response is returned <STX>[FN][TEXT]<ETB>[C1][C2]<CR><LF> """ self.serial.write(self._astm_string(string=command)) response = self.serial.readline() print response if response == NAK: message = 'Serial communication returned negative acknowledge' message = response.encode('hex') return IOError(message) elif response != ACK: message = response.encode('hex') return IOError(message)
def __init__(self, port=None): """Opens a connection to a Removinator controller on the specified serial port. If a port is not specified, an attempt will be made to auto-discover the Removinator controller. :param port: port that the Removinator is connected to :type port: str :raises: :exc:`ConnectError` :ivar last_result: The result output from the last executed command :ivar last_response: The response output from the last executed command :ivar port: The port that the Removinator is connected to """ self.port = port self.last_result = '' self.last_response = '' # Attempt to discover the Removinator controller if a # port was not specified. if port is None: port = _discover_removinator() # Open a connection to the Removinator controller. try: self.connection = serial.Serial( port, 9600, serial.EIGHTBITS, serial.PARITY_NONE, serial.STOPBITS_ONE, 1) self.sio = io.TextIOWrapper(io.BufferedRWPair(self.connection, self.connection)) except serial.SerialException as e: raise ConnectError('Unable to open connection to Removinator ' 'controller on port {0}'.format(port))
def open(self): logging.debug("Opening serial port %s (read_timeout=%s)" % (self.serialport, self.read_timeout)) self.stream = serial.Serial(port=self.serialport, timeout=self.read_timeout) assert self.stream is not None
def __init__(self, inport): self.serial = serial.Serial(port=inport, baudrate=9600)
def __init__(self, port, baud, delay): self.port = port self.baud = baud self.serial = None self.delay = delay try: self.serial = serial.Serial(port, baud) except serial.SerialException as e: raise TransportError(e.strerror) self.serial.timeout = 3 self.serial.interCharTimeout = 3
def log(options): # configure the serial connections (the parameters differs on the device you are connecting to) ser = serial.Serial( port=options.device, baudrate=9600, parity=serial.PARITY_ODD, stopbits=serial.STOPBITS_TWO, bytesize=serial.SEVENBITS ) prog = Progress() with open(options.outfile, 'w') as the_file: the_file.write('timestamp;value\n') while True: # \r\n is for device terminators set to CR LF ser.write((':FETCh?\r\n')) # wait one second before reading output. time.sleep(options.interval) out = '' while ser.inWaiting() > 0: out += ser.read(1) if out != '': out = out.rstrip() res = "%s;%s\n" % (time.time(), float(out)) the_file.write(res) the_file.flush() prog.show()
def Start(self): self._Serial = serial.Serial(port = self._Port, baudrate = self._Baudrate, timeout = 1) self._KeepRunning = True self._ReceiverThread = threading.Thread(target=self._Listen) self._ReceiverThread.setDaemon(True) self._ReceiverThread.start()
def __init__(self, port=9): #port 9 is for the USB dongle. self.port = port self.ser =serial.Serial(self.port, baudrate=9600, stopbits=serial.STOPBITS_ONE,timeout=1) time.sleep(2)
def __init__(self, path): self.device = serial.Serial(*path) self.device.timeout=0 #nonblocking fcntl.ioctl(self.device.fileno(), TIOCEXCL) self.in_buffer = '' self.in_lines = []