def __init__(self,port=None,logFileName=False): print("\n### TENMA MULTIMETER INTERFACE ###") devList=self.device_list() self.port=port self.logFileName=logFileName if self.logFileName: self.log_init() if port is None: self.port=self.guessPort if not self.port in devList: print("!!! %s isn't in the list of serial ports !!!"%self.port) print("Preparing to use",self.port) self.ser = serial.Serial() self.ser.port = self.port self.ser.baudrate = 19230 self.ser.bytesize = serial.SEVENBITS self.ser.parity = serial.PARITY_ODD self.ser.stopbits = serial.STOPBITS_ONE self.ser.timeout = 3 self.ser.xonxoff = False self.ser.rtscts = False self.ser.dsrdtr = False self.ser.writeTimeout = 3
def test_init_sets_the_correct_attrs(self): """__init__ sets the fields that get_settings reads""" for setting, value in ( ('baudrate', 57600), ('timeout', 7), ('write_timeout', 12), ('inter_byte_timeout', 15), ('stopbits', serial.STOPBITS_TWO), ('bytesize', serial.SEVENBITS), ('parity', serial.PARITY_ODD), ('xonxoff', True), ('rtscts', True), ('dsrdtr', True)): kwargs = {'do_not_open': True, setting: value} ser = serial.serial_for_url(PORT, **kwargs) d = ser.get_settings() self.assertEqual(getattr(ser, setting), value) self.assertEqual(d[setting], value)
def log(options): # configure the serial connections (the parameters differs on the device you are connecting to) ser = serial.Serial( port=options.device, baudrate=9600, parity=serial.PARITY_ODD, stopbits=serial.STOPBITS_TWO, bytesize=serial.SEVENBITS ) prog = Progress() with open(options.outfile, 'w') as the_file: the_file.write('timestamp;value\n') while True: # \r\n is for device terminators set to CR LF ser.write((':FETCh?\r\n')) # wait one second before reading output. time.sleep(options.interval) out = '' while ser.inWaiting() > 0: out += ser.read(1) if out != '': out = out.rstrip() res = "%s;%s\n" % (time.time(), float(out)) the_file.write(res) the_file.flush() prog.show()
def __init__(self, serial_name): self.port = serial_name self.baudrate = 9600 # Default baud rate self.timeout = 0.1 # Default timeout, seconds self.parity = serial.PARITY_ODD # Default parity self.stopbits = serial.STOPBITS_ONE # Default stop bits self.bytesize = serial.EIGHTBITS self.ser = serial.Serial(self.port, self.baudrate, self.bytesize, self.parity, self.stopbits, timeout=self.timeout) # serial port self.max_voltage = 42.0 # Volts self.max_current = 6.0 # Amps self.max_power = 100.0 # Watts
def __init__(self, serial_name): self.port = serial_name self.baudrate = 57600 # Default baud rate self.timeout = 0.1 # Default timeout, seconds self.parity = serial.PARITY_ODD # Default parity self.stopbits = serial.STOPBITS_ONE # Default stop bits self.bytesize = serial.EIGHTBITS self.ser = serial.Serial(self.port, self.baudrate, self.bytesize, self.parity, self.stopbits, timeout=self.timeout) # serial port self.max_voltage = 360.0 # Volts self.max_current = 10.0 # Amps self.max_power = 1000.0 # Watts
def __init__(self,addr='COM1',ndacs=8,polarity=('BIP','BIP'),verb=True,timeout = 2,reset=False): #Directly using pyserial interface and skipping pyvisa self.serialport = serial.Serial(addr,baudrate=115200,bytesize=serial.EIGHTBITS, parity=serial.PARITY_ODD, stopbits=serial.STOPBITS_ONE,timeout=timeout) if ndacs!=8 and ndacs!=16: print('DAC WARNING, non-standard number of dacs. Should be 8 or 16 but %d was given' % ndacs) self.ndacs = ndacs self.verb = verb self.SetPolarity(polarity) if reset: self.RampAllZero(tt=20.) return self.lastmessage = () #Function to set polarity. This just informs the driver what polarities are in use so it can correctly set the voltages. #The driver cannot physically set the polarity. The real polarity of the DACs can only be set form the hardware switches.
def __init__(self, tty, timeout=2): self.conn = serial.Serial(tty, baudrate=19200, bytesize=serial.SEVENBITS, parity=serial.PARITY_ODD, stopbits=serial.STOPBITS_ONE, timeout=timeout, xonxoff=1, rtscts=0, dsrdtr=0 ) # switch on power for hardware RS232 # transmitter via handshake-signals self.conn.dtr = True self.conn.rts = False
def connect(self): if self.conn is None: self.conn = serial.Serial(port = self.port, baudrate = 19200, parity = serial.PARITY_ODD, stopbits = serial.STOPBITS_ONE, bytesize = serial.EIGHTBITS, timeout = 5, xonxoff = 1) elif not self.is_connected(): self.conn.open()
def __init__(self, port): self.ser = serial.Serial( port=port, baudrate=baud_rate, parity=serial.PARITY_ODD, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS ) self.ser.write(b'\xF6') res = self.ser.read(2) if res == b'e\xfb': click.secho('nejemojo is go', fg='green') else: click.secho('bad mojo: {}'.format(res), fg='red')
def test_ParitySetting(self): for parity in (serial.PARITY_NONE, serial.PARITY_EVEN, serial.PARITY_ODD): self.s.parity = parity # test get method self.assertEqual(self.s.parity, parity) # test internals self.assertEqual(self.s._parity, parity) # test illegal values for illegal_value in (0, 57, 'a', None): self.assertRaises(ValueError, setattr, self.s, 'parity', illegal_value)
def __init__(self, port: str, baudrate: int, bytesize: int, stopbits: float, parity: str, timeout: float, xonxoff: bool, rtscts: bool): """Converts data from JSON format to serial.Serial format.""" self._port = port self._baudrate = baudrate self._bytesize = { 5: serial.FIVEBITS, 6: serial.SIXBITS, 7: serial.SEVENBITS, 8: serial.EIGHTBITS }[bytesize] self._stopbits = { 1: serial.STOPBITS_ONE, 1.5: serial.STOPBITS_ONE_POINT_FIVE, 2: serial.STOPBITS_TWO }[stopbits] self._parity = { 'none': serial.PARITY_NONE, 'even': serial.PARITY_EVEN, 'odd': serial.PARITY_ODD, 'mark': serial.PARITY_MARK, 'space': serial.PARITY_SPACE }[parity] self._timeout = timeout self._xonxoff = xonxoff self._rtscts = rtscts
def test_ParitySetting(self): for parity in (serial.PARITY_NONE, serial.PARITY_EVEN, serial.PARITY_ODD): self.s.parity = parity # test get method self.failUnlessEqual(self.s.parity, parity) # test internals self.failUnlessEqual(self.s._parity, parity) # test illegal values for illegal_value in (0, 57, 'a', None): self.failUnlessRaises(ValueError, self.s.setParity, illegal_value)
def serial_hook(self, comm_instance, port, baudrate, connection_timeout, *args, **kwargs): if port == 'VIRTUAL': return None if port is None or port == 'AUTO': # no known port, try auto detection comm_instance._changeState(comm_instance.STATE_DETECT_SERIAL) serial_obj = comm_instance._detectPort(False) if serial_obj is None: comm_instance._log("Failed to autodetect serial port") comm_instance._errorValue = 'Failed to autodetect serial port.' comm_instance._changeState(comm_instance.STATE_ERROR) eventManager().fire(Events.ERROR, {"error": comm_instance.getErrorString()}) return None else: # connect to regular serial port comm_instance._log("Connecting to: %s" % port) if baudrate == 0: serial_obj = serial.Serial(str(port), 115200, timeout=connection_timeout, writeTimeout=10000, parity=serial.PARITY_ODD) else: serial_obj = serial.Serial(str(port), baudrate, timeout=connection_timeout, writeTimeout=10000, parity=serial.PARITY_ODD) serial_obj.close() serial_obj.parity = serial.PARITY_NONE serial_obj.open() self._logger.info("########################################") self._logger.info("Restarting Marlin") #Reset the controller serial_obj.setDTR(1) time.sleep(0.1) serial_obj.setDTR(0) time.sleep(0.2) self._logger.info("Marlin Reset") #Flush input and output self._logger.info("Flushing Input and Output!") serial_obj.flushOutput() serial_obj.flushInput() self._logger.info("Finished Flushing") #write something to the serial line to get rid of any bad characters in the buffer self._logger.info("Writing M105") first_write = "M105\n" serial_obj.write(first_write) self._logger.info("########################################") return serial_obj
def serial_hook(self, comm_instance, port, baudrate, connection_timeout, *args, **kwargs): if port is None or port == 'AUTO': # no known port, try auto detection comm_instance._changeState(comm_instance.STATE_DETECT_SERIAL) serial_obj = comm_instance._detectPort(False) if serial_obj is None: comm_instance._log("Failed to autodetect serial port") comm_instance._errorValue = 'Failed to autodetect serial port.' comm_instance._changeState(comm_instance.STATE_ERROR) eventManager().fire(Events.ERROR, {"error": comm_instance.getErrorString()}) return None else: # connect to regular serial port comm_instance._log("Connecting to: %s" % port) if baudrate == 0: serial_obj = serial.Serial(str(port), 115200, timeout=connection_timeout, writeTimeout=10000, parity=serial.PARITY_ODD) else: serial_obj = serial.Serial(str(port), baudrate, timeout=connection_timeout, writeTimeout=10000, parity=serial.PARITY_ODD) serial_obj.close() serial_obj.parity = serial.PARITY_NONE serial_obj.open() self._logger.info("########################################") self._logger.info("Restarting Marlin") #Reset the controller serial_obj.setDTR(1) time.sleep(0.1) serial_obj.setDTR(0) time.sleep(0.2) self._logger.info("Marlin Reset") #Flush input and output self._logger.info("Flushing Input and Output!") serial_obj.flushOutput() serial_obj.flushInput() self._logger.info("Finished Flushing") #write something to the serial line to get rid of any bad characters in the buffer self._logger.info("Writing M105") first_write = "M105\n" serial_obj.write(first_write) self._logger.info("########################################") return serial_obj
def openSerial(self): #Set up the relationship between what the user enters and what the API calls for bytedic = {'5':serial.FIVEBITS, '6':serial.SIXBITS, '7':serial.SEVENBITS, '8':serial.EIGHTBITS} bytesize = bytedic[str(self.root.variables['databits'])] paritydict = {'None':serial.PARITY_NONE, 'Even':serial.PARITY_EVEN, 'Odd' :serial.PARITY_ODD, 'Mark':serial.PARITY_MARK, 'Space':serial.PARITY_SPACE} parity=paritydict[self.root.variables['parity']] stopbitsdict = {'1':serial.STOPBITS_ONE, '2':serial.STOPBITS_TWO} stopbits = stopbitsdict[str(self.root.variables['stopbits'])] #Open the serial port given the settings, store under the root if os.name == 'nt': port = self.root.variables['COMport'][0:5].strip() self.root.ser = serial.Serial(\ port=port,\ baudrate=str(self.root.variables['baud']),\ bytesize=bytesize, parity=parity, stopbits=stopbits, timeout=0.5) else: first_space = self.root.variables['COMport'].index(' ') port = self.root.variables['COMport'][0:first_space].strip() # Parameters necessary due to https://github.com/pyserial/pyserial/issues/59 self.root.ser = serial.Serial(\ port=port,\ baudrate=str(self.root.variables['baud']),\ bytesize=bytesize, parity=parity, stopbits=stopbits, timeout=0.5, rtscts=True, dsrdtr=True) io.DEFAULT_BUFFER_SIZE = 5000 #Purge the buffer of any previous data if float(serial.VERSION[0:3]) < 3: #If we're executing with pySerial 2.x serial.Serial.flushInput(self.root.ser) serial.Serial.flushOutput(self.root.ser) else: #Otherwise we're using pySerial 3.x serial.Serial.reset_input_buffer(self.root.ser) serial.Serial.reset_output_buffer(self.root.ser)
def predict_result(): ser = serial.Serial( # ???????????? port='COM6', baudrate=1200, parity=serial.PARITY_ODD, stopbits=serial.STOPBITS_TWO, bytesize=serial.SEVENBITS ) serial_data = [] plt.xlim(0, 100) plt.ylim(300, 700) plt.title('GSR') plt.ion() i = 0 j = 0 id = 0 while True: line = ser.readline() line = int(line) serial_data.append(line) if i > 100: plt.xlim(i - 100, i) plt.plot(serial_data) i += 1 j += 1 if j >= 50: clf = joblib.load('model\\happy_model.m') select = joblib.load('model\\vector_select.m') vector = getattr.get_vector(serial_data) new_vector = select.transform(vector) print(new_vector) result = clf.predict(new_vector) if result[0] == '2': clf = joblib.load('model\\sad_model.m') result = clf.predict(new_vector) j = 0 plt.plot([i, i], [300, 700], 'r--') if result[0] == '1': plt.annotate('happy', xy=(i, 600), xytext=(i - 10, 600), arrowprops=dict(facecolor='red', shrink=0.05)) res = 1 database.insert(id, res) elif result[0] == '2': plt.annotate('normal', xy=(i, 600), xytext=(i - 10, 600), arrowprops=dict(facecolor='blue', shrink=0.05)) res = 0 database.insert(id, res) else: plt.annotate('sad', xy=(i, 600), xytext=(i - 10, 600),arrowprops=dict(facecolor='black', shrink=0.05)) res = 2 database.insert(id, res) print(result) id += 1 plt.pause(0.001)