我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用time.ticks_diff()。
def templog(sleep=True): """Log voltage and temperature to MQTT.""" start = time.ticks_ms() # get sensor values values = read_voltage() values.update(read_temps()) print(values) # send values over MQTT if connected if wait_connect(): if not mqtt_send(values): machine.reset() else: # failed to connect, reboot machine.reset() if sleep: delta = time.ticks_diff(start, time.ticks_ms()) deepsleep(delta)
def __init__(self,pno=4,bsz=1024,dbgport=None): self.BUFSZ=int(pow(2, math.ceil(math.log(bsz)/math.log(2)))) self.MASK=self.BUFSZ-1 self.pin=pno self.ir=Pin(pno,Pin.IN) self.buf=array.array('i',0 for i in range(self.BUFSZ)) self.ledge=0 self.wptr=0 self.rptr=0 self.dbgport=dbgport if dbgport is None: self.dbgport=DummyPort() self.dbgport.low() # cache ticks functions for native call self.ticks_diff = time.ticks_diff self.ticks_us = time.ticks_us self.start()
def timed_function(f, *args, **kwargs): import time myname = str(f).split(' ')[1] def new_func(*args, **kwargs): t = time.ticks_us() result = f(*args, **kwargs) delta = time.ticks_diff(t, time.ticks_us()) log.debug('GC: {} Function {} Time = {:6.3f}ms'.format(str(gc.mem_free()), myname, delta/1000)) return result return new_func # @timed_function
def _read_timeout(cnt, timeout_ms=2000): time_support = "ticks_ms" in dir(time) s_time = time.ticks_ms() if time_support else 0 data = sys.stdin.read(cnt) if len(data) != cnt or (time_support and time.ticks_diff(time.ticks_ms(), s_time) > timeout_ms): return None return data
def sleep_from_until (start, delay): while time.ticks_diff(start, time.ticks_ms()) < delay: idle_func() return start + delay
def sleep_from_until (start, delay): ## while time.ticks_diff(start, time.ticks_ms()) < delay: while time.ticks_diff(time.ticks_ms(),start) < delay: machine.idle() return start + delay
def _update(self): if self.turn_start is not None: # turn in process current=time.ticks_ms() if time.ticks_diff(current, self.turn_start) >= self.turn_time_ms: if len(self.angle_list) > 0: self._trigger_next_turn() else: self._release()
def _publish_status(device_list=None, ignore_time=False): global _client, _last_publish if device_list is None: device_list = _devlist.values() current_time = time.ticks_us() if ignore_time or \ time.ticks_diff(current_time, _last_publish) >= MIN_PUBLISH_TIME_US: _last_publish = current_time try: for d in device_list: v = d.value() if v is not None: rt = (_topic + "/" + d.name).encode() for s in d.getters: if s == "": t = rt else: t = rt + "/" + s.encode() my_value = d.getters[s]() print('Publishing', t, my_value) if type(my_value) is bytes or type( my_value) is bytearray: _client.publish(t, my_value) else: _client.publish(t, str(my_value).encode()) except Exception as e: print('Trouble publishing, re-init network.') print(e) _init_mqtt() # ======= Setup and execution
def run(updates=5, sleepms=1, poll_rate_inputs=4, poll_rate_network=10): # updates: send out a status every this amount of seconds. # If 0, never send time based status updates only when change happened. # sleepms: going o sleep for how long between each loop run # poll_rate_network: how often to evaluate incoming network commands # poll_rate_inputs: how often to evaluate inputs _init_mqtt() if updates == 0: overflow = 1000 else: overflow = updates * 1000 overflow *= poll_rate_network * poll_rate_inputs / sleepms overflow = int(overflow) # print("Overflow:",overflow) counter = 0 while True: if counter % poll_rate_network == 0: _poll_subscripton() if counter % poll_rate_inputs == 0: device_list = [] for d in _devlist.values(): if d.update(): device_list.append(d) if len(device_list) > 0: _publish_status(device_list) if updates != 0 and counter % (updates * 1000) == 0: print("Publishing full update.") _publish_status(ignore_time=True) # execute things on timestack now = time.ticks_ms() for id in list(_timestack): t, cb = _timestack[id] if time.ticks_diff(now, t) >= 0: del (_timestack[id]) cb(id) time.sleep_ms(sleepms) counter += 1 if counter >= overflow: counter = 0
def _update(self): t = time.ticks_ms() #if self.suspend_start is not None \ # and time.ticks_diff(t,self.suspend_start) <= self.suspend_time: # print("Bus access suspended.") if self.suspend_start is None \ or time.ticks_diff(t,self.suspend_start) > self.suspend_time: self.suspend_start = None # done waiting try: if self.msgq is not None: self.pin.writeto(self.addr, self.msgq) self.msgq = None s = self.pin.readfrom(self.addr,self.BUFFER_SIZE) except: print("Trouble accessing i2c.") else: if s[0]==255 and s[1]==255: # illegal read print("Got garbage, waiting 1s before accessing bus again.") print("data:", s) self.suspend_time=1000 # take a break self.suspend_start = time.ticks_ms(); else: count = s[0]*256+s[1] if self.count != count: l = s[2]; self.suspend_time = s[3]; # scale timer if self.suspend_time & 128: self.suspend_time = (self.suspend_time & 127) * 100 if self.suspend_time>5000: # should not happen print("Garbage time -> waiting 1s before accessing bus again.") print("data:",s) self.suspend_time=1000 if self.suspend_time > 0: self.suspend_start = time.ticks_ms(); print("Bus suspend for",self.suspend_time,"ms requested.") if l <= self.BUFFER_SIZE: # discard if too big self.count = count self.current_value = s[4:4+l]
def _update(self): if ticks_diff(ticks_ms(), self._last_measured) > self.INTERVAL: value = self._measure() if abs(value - self._distance) >= self.precision: self._distance = value self._last_measured = ticks_ms()
def flush(self): t = time.ticks_ms() if self.out_fill == 0: # reset time, if there is nothing to send self.out_last_sent = t # debug dp.println("rt {}".format(t)) elif time.ticks_diff(t, self.out_last_sent) > self.INTERVAL: # debug dp.println("t {},{},{}".format(time.ticks_diff(t,self.out_last_sent), # t,self.out_last_sent)) self.acquire_out_buffer() self._send() self.release_out_buffer()
def ticks_diff(a, b): return a - b
def receive(self, request=0, timeoutms=None): # receive into network buffer, # fill buffer once and decrypt # if request>0 wait blocking for request number of bytes (if timeout # given interrupt after timeoutms ms) # timeout==None: block # timeout==0: return immediately after trying to read something from # network buffer # timeout>0: try for time specified to read something before returning # this function always returns a pointer to the buffer and # number of bytes read (could be 0) data = self.netbuf_in data_mv = self.netbuf_in_mv readbytes = 0 start_t = ticks_ms() while readbytes < self.netbuf_size: try: if self.sock_read(data_mv[readbytes:readbytes+1]): readbytes += 1 # result was not 0 or none else: if readbytes >= request: break # break if not blocking to request size except OSError as e: if len(e.args) > 0 \ and (e.args[0] == errno.EAGAIN or e.args[0] == errno.ETIMEDOUT): if readbytes >= request: break # break if not blocking to request size else: raise if timeoutms is not None \ and ticks_diff(ticks_ms(), start_t) >= timeoutms: break sleep_ms(1) # prevent busy waiting if readbytes>0: self.crypt_in.decrypt(data,length=readbytes) return data,readbytes
def isr(self, pin): # debounce if time.ticks_diff(time.ticks_ms(), self.last_isr) < 10: return print('! reset gyro request') self.flag_reset_gyro = True self.last_isr = time.ticks_ms()
def serve(self): print('starting mpu server on port {}'.format(self.port)) lastgc = lastsent = lastread = time.ticks_ms() while True: now = time.ticks_ms() write_dt = time.ticks_diff(now, lastsent) read_dt = time.ticks_diff(now, lastread) gc_dt = time.ticks_diff(now, lastgc) time.sleep_ms(max(0, 1-read_dt)) if self.flag_reset_gyro: self.mpu.filter.reset_gyro() self.flag_reset_gyro = False values = self.mpu.read_position() lastread = now if write_dt >= self.write_interval: lastsent = time.ticks_ms() self.sock.sendto(tojson(values), ('192.168.4.2', 8000)) if gc_dt >= self.gc_interval: gc.collect()
def input(self, vals): now = time.ticks_ms() # unpack sensor readings accel_data = vals[0:3] gyro_data = vals[4:7] # convert accelerometer reading to degrees self.accel_pos = self.calculate_accel_pos(*accel_data) # if this is our first chunk of data, simply accept # the accelerometer reads and move on. if self.last == 0: self.filter_pos = self.gyro_pos = self.accel_pos self.last = now return # calculate the elapsed time (in seconds) since last data. # we need this because the gyroscope measures movement in # degrees/second. dt = time.ticks_diff(now, self.last)/1000 self.last = now # calculate change in position from gyroscope readings gyro_delta = [i * dt for i in gyro_data] self.gyro_pos = [i + j for i, j in zip(self.gyro_pos, gyro_delta)] # pitch self.filter_pos[0] = ( self.gyro_weight * (self.filter_pos[0] + gyro_delta[0]) + (1-self.gyro_weight) * self.accel_pos[0]) # roll self.filter_pos[1] = ( self.gyro_weight * (self.filter_pos[1] + gyro_delta[1]) + (1-self.gyro_weight) * self.accel_pos[1])
def waiting(self): acum = time.ticks_ms() delta = time.ticks_diff(time.ticks_ms(),acum) while delta < 5000: delta = time.ticks_diff(time.ticks_ms(), acum) time.sleep_ms(5) self.DO_SLEEP = True print("Ordem para dormir")
def timeseq_isr(self,p): '''compute edge to edge transition time, ignore polarity''' e=self.ticks_us() self.dbgport.high() s=2*p.value()-1 d=self.ticks_diff(e,self.ledge) self.ledge=e self.buf[self.wptr]=d*s self.wptr=(self.wptr+1)&self.MASK self.dbgport.low()
def __call__(self,pin): self.stop=time.ticks_us() self.flight=time.ticks_diff(self.stop,self.start)