我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用gi.repository.GObject.timeout_add()。
def update_queue_banners(self):
    """Rescale every banner image in the download queue to the current page width.

    The width is taken from ``self.goglib_box_width`` unless that value is 1,
    in which case ``self.mylib_box_width`` is used instead.
    Each banner file is looked up under ``goglib_banners`` first and
    ``mylib_banners`` second.
    """
    # NOTE(review): a width of 1 presumably means "page not allocated yet" - confirm.
    if self.goglib_box_width != 1:
        box_width = self.goglib_box_width
    else:
        box_width = self.mylib_box_width

    # scale_simple() takes integer pixel sizes; under Python 3 plain "/" would
    # hand it floats, so both dimensions are converted explicitly.
    banner_width = int((box_width - 60) // 5)
    # 518 and 240 are presumably the native banner width/height - confirm.
    scale_level = banner_width / 518.0
    banner_height = int(240 * scale_level)
    if banner_width <= 0 or banner_height <= 0:
        # Nothing sensible can be rendered into a zero or negative area.
        return

    for banner in queue_game_image_list or ():
        file_name = banner.get_name() + '.jpg'
        banner_path = data_dir + '/images/goglib_banners/' + file_name
        if not os.path.exists(banner_path):
            banner_path = data_dir + '/images/mylib_banners/' + file_name
        pixbuf = GdkPixbuf.Pixbuf.new_from_file(banner_path)
        pixbuf = pixbuf.scale_simple(banner_width, banner_height, InterpType.BILINEAR)
        banner.set_from_pixbuf(pixbuf)

#~ def timer_check_for_new_games(self):
#~ self.check_for_new_games()
#~ GObject.timeout_add(30000, self.timer_check_for_new_games)
def timer(self):
    """Track detached tab names and toggle the "add tab" button once per second.

    Page names found in the notebooks of ``self.additional_windows_list`` are
    appended to ``self.detached_tabs_names``. The button is hidden when the
    detached tabs plus the main notebook pages total exactly 5. The method
    then schedules itself again in 1000 ms.
    """
    for extra_window in self.additional_windows_list:
        notebook = extra_window.get_child()
        if notebook is None:
            continue
        for page_index in range(notebook.get_n_pages()):
            tab_name = notebook.get_nth_page(page_index).get_name()
            if tab_name not in self.detached_tabs_names:
                self.detached_tabs_names.append(tab_name)

    tab_total = len(self.detached_tabs_names) + self.notebook.get_n_pages()
    self.button_add_tab.set_visible(tab_total != 5)

    # Returning None drops the current source, so a new one is queued each time.
    GObject.timeout_add(1000, self.timer)
def timerFunc(self):
    """Alternate between test images and the fixation image, then stop recording.

    While images remain, each call shows either the next image (recording the
    sample count in ``self.timestamps``) or the fixation image, flips
    ``self.showFixation`` and re-schedules itself. Once all images have been
    shown, the fixation image is displayed, two more seconds elapse and the
    data thread is stopped.

    Returns:
        False, so the timeout source that invoked this call is not repeated.
    """
    if self.imageIndex < len(self.filenames):
        if not self.showFixation:
            pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(
                './P300Photos/' + self.filenames[self.imageIndex],
                width=700, height=700, preserve_aspect_ratio=False)
            self.image.set_from_pixbuf(pixbuf)
            self.imageIndex += 1
            gobject.timeout_add(self.testImageDuration, self.timerFunc)
            self.timestamps.append(self.dataThread.samplesRead)
        else:
            self.image.set_from_pixbuf(self.fixation)
            gobject.timeout_add(self.fixationDuration, self.timerFunc)
        # Logical toggle: "~" is bitwise NOT (~True == -2), which only
        # alternated correctly when the flag happened to start as False.
        self.showFixation = not self.showFixation
    else:
        self.image.set_from_pixbuf(self.fixation)
        # gather an extra ~2 seconds of data at the end for window completion reasons
        # NOTE(review): sleep() blocks the caller for the whole two seconds.
        sleep(2)
        self.dataThread.stop()
    return False
def load_book_data(self, filename):
    """
    Show the spinner and start loading a book into the viewer.

    Files whose upper-cased name ends with one of ``constants.NATIVE`` are
    loaded directly. Any other file is first imported on a background thread
    while ``__check_on_work`` is polled every 100 ms.

    :param filename: path of the book file to load
    """
    self.spinner.start()
    self.viewer.hide()
    self.right_box.add(self.spinner)
    self.filename = filename

    native_suffixes = tuple(constants.NATIVE)
    if filename.upper().endswith(native_suffixes):
        self.__continiue_book_loading(filename)
        return

    importer = threading.Thread(target=self.__bg_import_book, args=(filename,))
    self.job_running = True
    importer.start()
    GObject.timeout_add(100, self.__check_on_work)
def start_app(datadir):
    """Show the optional splash window, load the CSS and enter the GTK main loop.

    ``start_gui(datadir)`` is scheduled through a 1 ms timeout so that it runs
    once ``Gtk.main()`` is iterating.

    Args:
        datadir: value handed through unchanged to ``start_gui``.
    """
    global splash_win
    if not options.no_splash:
        solfege.splash_win = splash_win = SplashWin()
        time.sleep(0.1)
        Gdk.flush()
        # Drain pending events so the splash is processed before the main loop starts.
        while Gtk.events_pending():
            Gtk.main_iteration()
    else:
        solfege.splash_win = splash_win = None

    style_provider = Gtk.CssProvider()
    with open("solfege.css", "r") as f:
        css = f.read()
    try:
        style_provider.load_from_data(css)
    except GObject.GError as e:
        # A broken stylesheet is reported but does not stop the start-up.
        # "except ... as" and print(e) are valid on Python 2.6+ and Python 3.
        print(e)
    Gtk.StyleContext.add_provider_for_screen(
        Gdk.Screen.get_default(), style_provider,
        Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION)
    GObject.timeout_add(1, start_gui, datadir)
    Gtk.main()
def __init__(self):
    """Create the tray indicator, schedule the price updates and run gtk.main()."""
    global indicator

    # Applet icon
    icon = self.exchange_app.dogecoin.icon
    category = appindicator.IndicatorCategory.SYSTEM_SERVICES
    indicator = appindicator.Indicator.new(APPINDICATOR_ID, icon, category)
    indicator.set_status(appindicator.IndicatorStatus.ACTIVE)
    indicator.set_label('€ *.****', '100%')

    # Set the menu of the indicator (default: gtk.Menu())
    menu = self.build_menu()
    indicator.set_menu(menu)

    # Ctrl-C behaviour
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    # Setup the refresh every 5 minutes
    refresh_interval_ms = 1000*60*5
    gobject.timeout_add(refresh_interval_ms, self.exchange_app.update_price, "timeout")

    # First price update within 1 second
    first_update_ms = 1000
    gobject.timeout_add(first_update_ms, self.exchange_app.first_update_price, "first_update")

    # Main loop
    gtk.main()
def __init__(self, title):
    """Build an undecorated, always-on-top window holding one pulsing progress bar.

    ``self.on_timeout`` is scheduled every 50 ms; the source id is kept in
    ``self.timeout_id``.
    """
    Gtk.Window.__init__(self, title=title)
    self.set_border_width(10)

    layout = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
    self.add(layout)

    bar = Gtk.ProgressBar()
    self.progressbar = bar
    bar.pulse()
    bar.set_show_text(True)
    layout.pack_start(bar, True, True, 0)

    self.timeout_id = GObject.timeout_add(50, self.on_timeout, None)

    self.set_position(Gtk.WindowPosition.CENTER_ALWAYS)
    for switch_off in (self.set_deletable, self.set_decorated, self.set_resizable):
        switch_off(False)
    self.set_keep_above(True)

    self.fraction = 0.0
    self.pulse = True
def arm(self, manager):
    """Create a Trigger for this event and schedule it to fire at ``self.when``.

    Args:
        manager: object that receives the "Arming"/"Armed" notifications and
            the ``InterfacesAdded`` call for the new trigger.

    Returns:
        The created trigger.

    Raises:
        Any exception raised while creating or announcing the trigger is
        re-raised after a diagnostic line has been printed.
    """
    self.manager = manager
    props = dict(obj=self.remote.path, name=self.remote.item.name,
                 expected=self.when.isoformat(), **self.remote.item.fields)
    self.props = props
    new_path = PATH + '/Scheduler/Armed/' + self.hashed
    remaining_ms = (self.when - datetime.datetime.now()).total_seconds() * 1000
    # timeout_add() expects a non-negative integer interval; the raw value is a
    # float and becomes negative when self.when is already in the past.
    delay_ms = max(0, int(round(remaining_ms)))
    self.remote.bus.add_signal_receiver(self.cleanup, "Remove",
                                        dbus_interface=Trigger.OWN_IFACE,
                                        bus_name=BUS, path=new_path)
    # manager.bus.add_signal_receiver(self.attrs, ack=self.on_success, error=self.on_error)
    trigger = None
    try:
        trigger = Trigger(new_path, manager, props, self)
        if trigger:
            trigger.Armed()
            self.manager.Trigger("Arming", trigger.path)
            self.trigger = trigger
            print("DELAYING %s" % delay_ms)
            gobject.timeout_add(delay_ms, trigger.Fire)
            manager.InterfacesAdded(trigger.path, {Trigger.OWN_IFACE: props})
            self.manager.Trigger("Armed", trigger.path)
    except Exception:
        print("already exited?")
        raise
    return trigger
def __init__(self, win_id=None):
    """
    Builds a new GStreamer pipeline. If `win_id` is specified, it is used
    as a window ID to embed the video sink using the GstOverlay interface.

    On every platform except Darwin the pipeline is built synchronously. On
    Darwin ``_build`` is scheduled through a 1 ms timeout and this constructor
    blocks until the event passed to it is set; an error reported through the
    shared one-element list is re-raised as ``PlaybinError``.
    """
    super().__init__()
    self._async_loop = asyncio.get_event_loop()
    self._async_response = []

    if platform.system() != 'Darwin':
        self._build(win_id, None, None)
        return

    build_done = threading.Event()
    failure = [None]
    GObject.timeout_add(1, self._build, win_id, build_done, failure)
    build_done.wait()
    if failure[0]:
        raise PlaybinError from failure[0]
def _send_next_ssid(self):
    """Send the first SSID that has not been sent in the current round.

    Returns:
        ``self.is_notifying`` when no SSIDs exist or after one was sent;
        ``False`` when the round is complete, in which case the sent list is
        reset and ``_start_send_ssids`` is scheduled after ``self._wait_time``.
    """
    if not self.ssids:
        logger.debug("No SSIDs available.")
        return self.is_notifying

    next_ssid = next(
        (candidate for candidate in self.ssids if candidate not in self._ssids_sent),
        '')

    if next_ssid:
        logger.debug("Sending next SSID: %s" % next_ssid)
        self.value_update(string_to_dbus_array(next_ssid))
        self._ssid_last_sent = next_ssid
        self._ssids_sent.append(next_ssid)
        return self.is_notifying

    # all SSIDs have been sent at least once, repeat:
    self._ssids_sent = []
    GObject.timeout_add(self._wait_time, self._start_send_ssids)
    return False
def praise_update(self):
    """Refresh the praise widgets for the first playlist row, every 2 seconds.

    The next run is scheduled first, so polling continues even when this run
    returns early. The displayed count is capped at 100.
    """
    GObject.timeout_add(2000, self.praise_update)
    if not self.in_mv:
        return None

    row_id = self.play_list_first_row_id()
    if row_id is None:
        return None

    praise_data = self.req.praise_fetch(row_id)
    logging.debug(praise_data)

    count = praise_data[row_id]
    if count > 100:
        count = 100
    self.label_praise.set_label(str(count))
    self.lb_praise.set_value(count)
    self.comment_fetch()
    # logging.debug(dir(self.lb_praise))
def test_availablepane(self):
    """Search twice and check the sort mode and the shrinking result count."""
    from softwarecenter.ui.gtk3.panes.availablepane import get_test_window
    window = get_test_window()
    pane = window.get_data("pane")
    self._p()

    pane.on_search_terms_changed(None, "the")
    self._p()
    self.assertEqual(
        pane.app_view.sort_methods_combobox.get_active_text(),
        "By Relevance")
    model = pane.app_view.tree_view.get_model()
    count_for_match = len(model)

    pane.on_search_terms_changed(None, "nosuchsearchtermforsure")
    self._p()
    count_for_miss = len(model)
    self.assertTrue(count_for_miss < count_for_match)

    # Close the window after TIMEOUT; what ends Gtk.main() is not visible here.
    GObject.timeout_add(TIMEOUT, window.destroy)
    Gtk.main()
def test_custom_lists(self):
    """Search for a comma-separated package list and check the resulting view."""
    from softwarecenter.ui.gtk3.panes.availablepane import get_test_window
    window = get_test_window()
    pane = window.get_data("pane")
    self._p()
    pane.on_search_terms_changed(None, "ark,artha,software-center")
    self._p()
    model = pane.app_view.tree_view.get_model()

    # custom list should return three items
    self.assertTrue(len(model) == 3)

    # check package names, ordering is default "by relevance"
    expected_order = ("ark", "software-center", "artha")
    for position, pkgname in enumerate(expected_order):
        self.assertPkgInListAtIndex(position, model, pkgname)

    # check that the status bar offers to install the packages
    install_button = pane.action_bar.get_button(ActionButtons.INSTALL)
    self.assertNotEqual(install_button, None)

    GObject.timeout_add(TIMEOUT, window.destroy)
    Gtk.main()
def run(self, args):
    """Show the main window, schedule the cache open and display the given packages.

    Args:
        args: mutable list of package names. Entries may themselves be
            comma-separated lists ("pkg1,pkg2"); these are expanded in place,
            with the expanded names placed after the plain entries.
    """
    # show window as early as possible
    self.window_main.show_all()
    # delay cache open
    GObject.timeout_add(1, self.cache.open)
    # support both "pkg1 pkg" and "pkg1,pkg2" (and pkg1,pkg2 pkg3)
    if args:
        # Build the result from scratch. Deleting by an index taken from a copy
        # removed the wrong element once an earlier deletion had shifted the
        # list (i.e. whenever two or more entries contained a comma).
        plain = [arg for arg in args if "," not in arg]
        expanded = [name for arg in args if "," in arg for name in arg.split(",")]
        args[:] = plain + expanded
    # FIXME: make this more predictable and less random
    # show args when the app is ready
    self.show_available_packages(args)
    atexit.register(self.save_state)
def on_banner_rendered(self, renderer):
    """Take the rendered banner pixbuf, update the accessible info and fade it in.

    When the pixbuf is only 1 pixel wide the call is retried after 500 ms.

    Returns:
        ``None`` when a retry was scheduled, otherwise ``False``.
    """
    self.image = renderer.get_pixbuf()
    rendered_properly = self.image.get_width() != 1
    if not rendered_properly:
        # the offscreen window is not really as such content not
        # correctly rendered
        GObject.timeout_add(500, self.on_banner_rendered, renderer)
        return

    from gi.repository import Atk
    current_exhibit = self.exhibits[self.cursor]
    self.get_accessible().set_name(current_exhibit.title_translated)
    self.get_accessible().set_role(Atk.Role.PUSH_BUTTON)
    self._fade_in()
    self.queue_next()
    return False
def _fade_in(self, step=0.05):
    """Raise ``self.alpha`` from 0.0 to 1.0 in increments of ``step`` every 50 ms.

    A redraw is queued on every tick; ``self.old_image`` is dropped when the
    fade completes.
    """
    self.alpha = 0.0

    def fade_step():
        self.alpha += step
        finished = self.alpha >= 1.0
        if finished:
            self.alpha = 1.0
            self.old_image = None
        self.queue_draw()
        # A True result keeps the timeout source alive for the next tick.
        return not finished

    GObject.timeout_add(50, fade_step)
def _on_size_allocate(self, widget, allocation):
    """Start the matching slide animation step, or just redraw when not sliding."""
    # FIXME: mvo: allocation.height is no longer accessable with gtk3
    # THIS SEEMS TO BREAK THE ANIMATION
    #height = allocation.height
    height = widget.get_allocation().height

    if self._is_sliding_in:
        step_callback = self._slide_in_cb
    elif self._is_sliding_out:
        step_callback = self._slide_out_cb
    else:
        self.queue_draw()
        return

    self._current_height = height
    GObject.timeout_add(self.ANIMATE_STEP_INTERVAL, step_callback, priority=100)
def wait_for_apt_cache_ready(f):
    """Decorator that delays a method call until ``self.cache.ready`` is true.

    While the cache is not ready the call is retried every 500 ms through
    ``GObject.timeout_add``, and the busy cursor is shown on the ``app_view``
    window when there is one. The wrapped method's return value is discarded:
    the wrapper always returns ``False`` so that it can serve as a one-shot
    timeout callback.

    Args:
        f: method whose first positional argument is an object with a
            ``cache`` attribute.
    """
    import functools

    @functools.wraps(f)  # keep the wrapped method's __name__ and __doc__
    def wrapper(*args, **kwargs):
        self = args[0]
        # check if the cache is ready and
        window = None
        if hasattr(self, "app_view"):
            window = self.app_view.get_window()
        if not self.cache.ready:
            if window:
                window.set_cursor(self.busy_cursor)
            GObject.timeout_add(500, lambda: wrapper(*args, **kwargs))
            return False
        # cache ready now
        if window:
            window.set_cursor(None)
        f(*args, **kwargs)
        return False
    return wrapper
def interrupt_build_and_wait(f):
    """Decorator that interrupts a running build before starting a new one.

    Expects ``self._build_in_progress`` and ``self._halt_build`` on the
    instance. While a build is in progress ``_halt_build`` is set and the call
    is retried every 200 ms. Otherwise ``_halt_build`` is cleared and the
    wrapped method runs. The wrapper always returns ``False``, so it can serve
    as a one-shot timeout callback; the wrapped method's result is discarded.

    Args:
        f: method whose first positional argument is the instance.
    """
    import functools

    @functools.wraps(f)  # keep the wrapped method's __name__ and __doc__
    def wrapper(*args, **kwargs):
        self = args[0]
        if self._build_in_progress:
            LOG.debug('Waiting for build to exit...')
            self._halt_build = True
            GObject.timeout_add(200, lambda: wrapper(*args, **kwargs))
            return False
        # ready now
        self._halt_build = False
        f(*args, **kwargs)
        return False
    return wrapper
def __init__(self):
    """Set up the battery monitor, the indicator and the periodic update timers."""
    monitor = battery.Battery()
    self.battery = monitor
    monitor.new_params = None
    monitor.register_callback(self.battery_update_callback)
    monitor.update()

    self.indicator = appindicator.Indicator.new(
        APPINDICATOR_ID, self.get_icon(), CATEGORY)
    self.indicator.set_status(appindicator.IndicatorStatus.ACTIVE)
    self.indicator.set_menu(self.build_menu())
    self.window = None

    # Back-date the last log update by one full period.
    self.log_update_period = timedelta(minutes=5)
    self.log_last_update = datetime.now() - self.log_update_period
    self.battery_data = None

    self.update_battery()
    self.update_chart()

    ms_per_second = 1000
    schedule = (
        (1, self.update_battery),
        (1, self.update_log),
        (30, self.update_chart),
    )
    for seconds, callback in schedule:
        gobject.timeout_add(seconds * ms_per_second, callback)
def __init__(self, tokens_filename, audio, log):
    """Store the collaborators, open the alert stores and start background work.

    Args:
        tokens_filename: stored as ``self._tokens_filename``.
        audio: stored as ``self._audio``.
        log: stored as ``self._log``.
    """
    self._log = log
    self._audio = audio
    self._tokens_filename = tokens_filename
    self._eventQueue = queue.Queue()

    persist_path = "/tmp"
    for directory in ("alerts", "alerts/all", "alerts/active"):
        # exist_ok avoids the check-then-create race of os.path.exists() + os.mkdir().
        os.makedirs(os.path.join(persist_path, directory), exist_ok=True)

    # would prefer to use sqlite, but that complains about
    # our threads accessing the same connection - and dbm seems to not
    # store any changes.
    self.allAlerts = Shove("file:///tmp/alerts/all")
    self.activeAlerts = Shove("file:///tmp/alerts/active")
    #print(list(self.allAlerts.values()))

    self._last_user_activity = datetime.datetime.now()

    t = threading.Thread(target=self.eventQueueThread, daemon=True)
    t.start()
    GObject.timeout_add(500, self.alertCheck)
def Listen(self, milliseconds=None, dialogid=None):
    """Switch the microphone pipeline to PLAYING unless it already is.

    Args:
        milliseconds: optional timeout after which ``cancelListen`` is called.
        dialogid: stored on the microphone pipeline as ``dialogid``.

    Returns:
        ``False`` when the microphone pipeline was already playing,
        otherwise ``None``.
    """
    _success, mic_state, _pending = self._mic_pipeline.get_state(Gst.CLOCK_TIME_NONE)
    already_listening = mic_state == Gst.State.PLAYING
    if already_listening:
        self._log.info("Was already listening")
        aplay(TIMEOUT_BEEP, _bg=True)
        return False

    self._log.info("Starting listening, timeout milliseconds=%s, dialog=%s", milliseconds, dialogid)
    for pipeline in (self.audio_pipeline, self.audio_player):
        pipeline.set_state(Gst.State.READY)
    self._waitForNothingPlaying()

    if milliseconds:
        self._listening_timeout_timer = GObject.timeout_add(milliseconds, self.cancelListen)

    aplay(LISTENING_BEEP, _bg=True)
    self._mic_pipeline.dialogid = dialogid
    self.speaking_started = False
    self.silence_count = 0
    self._mic_pipeline.set_state(Gst.State.PLAYING)
def check_gogcom_tab(self):
    """Keep the embedded page on gog.com and hand any other URI to the web browser.

    Runs once per second by re-scheduling itself.
    """
    current_uri = self.webpage.get_uri()
    left_gog = (
        current_uri is not None
        and not current_uri.startswith('https://www.gog.com')
    )
    if left_gog:
        self.webpage.go_back()
        self.webbrowser.open_new(current_uri)

    GObject.timeout_add(1000, self.check_gogcom_tab)
def update_mylib_interface(self):
    """Periodic (1 s) refresh of the "mylib" page.

    Starts the next queued installation when the head of
    ``mylib_installation_queue`` changed, resizes the filter area, rebuilds
    the grid when the page width changed and shows or hides the installation
    status box.
    """
    head_changed = (
        len(mylib_installation_queue) > 0
        and mylib_installation_queue[0] != self.mylib_now_installing
    )
    if head_changed:
        self.mylib_now_installing = mylib_installation_queue[0]
        self.mylib_install_game(mylib_installation_queue[0])
        for setup_button in mylib_setup_buttons_list:
            if setup_button.get_name() == self.mylib_now_installing:
                setup_button.set_label(_("Installing"))

    filters_height = self.mylib_filters_box.get_allocation().height
    if filters_height != 1:
        self.scrolledwindow_mylib_filters.set_property(
            'height_request', filters_height + 20)

    page_width = self.box_mylib_page.get_allocation().width
    if page_width != self.mylib_box_width:
        self.mylib_box_width = page_width
        self.update_mylib_grid()
        self.update_queue_banners()

    queue_is_empty = len(mylib_installation_queue) == 0
    self.mylib_installation_status_box.set_visible(not queue_is_empty)

    GObject.timeout_add(1000, self.update_mylib_interface)
def update_goglib_interface(self):
    """Periodic (1 s) refresh of the "goglib" page.

    Updates the library when new games are pending and the main window is
    visible, starts the next queued download when the head of
    ``goglib_installation_queue`` changed, resizes the filter area, rebuilds
    the grid when the page width changed and shows or hides the installation
    status box.
    """
    if self.goglib_new_games_list and self.main_window.get_visible():
        self.update_goglib()

    head_changed = (
        len(goglib_installation_queue) > 0
        and goglib_installation_queue[0] != self.goglib_now_installing
    )
    if head_changed:
        self.goglib_now_installing = goglib_installation_queue[0]
        self.goglib_download_game(goglib_installation_queue[0])
        for setup_button in goglib_setup_buttons_list:
            if setup_button.get_name() == self.goglib_now_installing:
                setup_button.set_label(_("Installing"))

    filters_height = self.box_goglib_filters.get_allocation().height
    if filters_height != 1:
        self.scrolledwindow_filters.set_property(
            'height_request', filters_height + 20)

    page_width = self.box_goglib_page.get_allocation().width
    if page_width != self.goglib_box_width:
        self.goglib_box_width = page_width
        self.update_goglib_grid()
        self.update_queue_banners()

    queue_is_empty = len(goglib_installation_queue) == 0
    self.box_goglib_installation_status.set_visible(not queue_is_empty)

    GObject.timeout_add(1000, self.update_goglib_interface)
def _set_repeat(self, delay=None, interval=None):
    """Store the new repeat settings, starting or stopping the 10 ms tick as needed.

    The tick starts when repeating is switched on (``delay`` given while none
    was stored) and is removed when it is switched off (``delay`` is None
    while one was stored).
    """
    was_repeating = self.__repeat[0] is not None
    wants_repeat = delay is not None

    if wants_repeat and not was_repeating:
        self.__tick_id = GObject.timeout_add(10, self._tick_cb)
    elif was_repeating and not wants_repeat:
        GObject.source_remove(self.__tick_id)

    self.__repeat = (delay, interval)
def activate_polling_of_property_on_element(self, element_name="whatever", property="property", interval_ms=1000):
    """Schedule ``poll_property(element_name, property)`` every ``interval_ms`` ms.

    Also sets ``self.do_poll`` to True.
    """
    poll_args = (element_name, property)
    GObject.timeout_add(interval_ms, self.poll_property, *poll_args)
    self.do_poll = True
def go_clicked(self, widget):
    """Handle the "go" button: start the clock and let the side to move play.

    For a human player only the status bar is updated. Otherwise the computer
    move is started on a separate thread.
    """
    time_control = gv.tc
    gui = gv.gui

    # update time control prior to move
    time_control.update_gui_time_control(self.stm)

    # side to move
    self.stm = self.get_side_to_move()

    # start a timer to display the time left while the player is thinking
    time_control.start_clock(self.stm)
    if not self.timer_active:
        GObject.timeout_add(1000, time_control.show_time)

    gui.disable_menu_items()
    gui.disable_go_button()
    gui.enable_stop_button()
    self.stopped = False

    if gv.verbose:
        for line in ("#",
                     "# " + self.get_side_to_move_string(self.stm) + " to move",
                     "#"):
            print(line)

    gui.apply_drag_and_drop_settings(self.player[self.stm], self.stm)
    # gv.board.reduce_board_history(self.movelist)
    for side in ("w", "b"):
        self.engine_output.clear(side, " ")

    if self.player[self.stm] == "Human":
        gui.set_status_bar_msg(_("ready"))
        return

    gui.set_status_bar_msg(_("Thinking ..."))
    # it's the computers turn to move. kick off a separate thread for
    # computers move so that gui is still useable
    self.ct = _thread.start_new_thread(self.computer_move, ())
def run_background(self, interval=1):
    """Schedule ``update_reddit_data`` to run periodically.

    Args:
        interval: period in minutes. It is converted to whole milliseconds,
            so fractional values such as 0.5 are accepted.

    The timeout source id is stored in ``self.services['timeout']``.
    """
    # timeout_add() takes an integer number of milliseconds.
    timeout_ms = int(interval * 60 * 1000)
    self.services['timeout'] = GObject.timeout_add(timeout_ms, self.update_reddit_data)
def _update_hr_msrmt_simulation(self):
    """Schedule ``hr_msrmt_cb`` in 1000 ms, but only while notifications are on."""
    print('Update HR Measurement Simulation')
    if self.notifying:
        GObject.timeout_add(1000, self.hr_msrmt_cb)
def __init__(self, bus, index, service):
    """Register the battery-level characteristic and schedule ``drain_battery``.

    The characteristic is created with the 'read' and 'notify' flags, starts
    at level 100 with notifications off, and ``drain_battery`` is scheduled
    with a 5000 ms timeout.
    """
    flags = ['read', 'notify']
    Characteristic.__init__(self, bus, index, self.BATTERY_LVL_UUID, flags, service)
    self.notifying = False
    self.battery_lvl = 100
    GObject.timeout_add(5000, self.drain_battery)
def scan_devices( duration = 10 ):
    """
    Start device discovery on the adapter found by ``DeviceManager`` and run
    a GObject main loop while the scan is in progress.

    Args:
        duration (int): the amount of time that the scan should run for in
            seconds.
    """
    bt_adapter = DeviceManager().find_adapter()
    bt_adapter.StartDiscovery()

    loop = GObject.MainLoop()
    duration_ms = duration * 1000
    # NOTE(review): `quit` is not defined in this block - presumably a
    # module-level helper that stops the loop it is given; confirm.
    GObject.timeout_add(duration_ms, quit, loop)
    loop.run()
def main():
    """Create the smiley tray indicator with its menu and run the GTK main loop."""
    # Restore the default SIGINT handler.
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    icon_path = os.path.abspath('/usr/share/icons/gnome/24x24/emotes/face-smile-big.png')
    indicator = appindicator.Indicator.new(
        APPINDICATOR_ID,
        icon_path,
        appindicator.IndicatorCategory.SYSTEM_SERVICES)
    indicator.set_status(appindicator.IndicatorStatus.ACTIVE)

    menu = gtk.Menu()
    indicator.set_menu(build_menu(menu))
    # GObject.timeout_add(1000, timespent, indicator, menu)
    gtk.main()
def set_play(self, pos, time):
    """Move the song scale to track ``pos`` plus ``time`` and restart the 1 s timer.

    Args:
        pos: index of the current track; the lengths of all tracks before it
            are added to the offset.
        time: offset inside the current track.
    """
    if self._timer is not None:
        GObject.source_remove(self._timer)
        self._timer = None

    album_tracks = self._current_album.get_tracks()
    elapsed = time
    for track_index in range(pos):
        elapsed += album_tracks[track_index].get_length()

    self._songs_scale.set_value(elapsed + 1)
    self._timer = GObject.timeout_add(1000, self._playing)
def maybe_auto_new_question(self):
    """Schedule a new question if the option is enabled and none is pending.

    The delay is the 'seconds_before_new_question' setting converted to
    milliseconds. The pending handle is cleared just before the new question
    is requested.
    """
    if not self.get_bool('new_question_automatically'):
        return
    if self.m_timeout_handle is not None:
        return

    def ask_new_question():
        self.m_timeout_handle = None
        self.g_view.new_question()

    delay_ms = int(self.get_float('seconds_before_new_question')*1000)
    self.m_timeout_handle = GObject.timeout_add(delay_ms, ask_new_question)
def on_start_practise(self):
    """Open the DSP device for recording and schedule ``update_view`` at 300 ms."""
    soundcard.solfege_c_midi.dsp_open_record()
    #self.__idle_tag = GObject.idle_add(self.update_view)
    refresh_ms = 300
    self.__idle_tag = GObject.timeout_add(refresh_ms, self.update_view)
def flash(self, s):
    """Show ``s`` as a left-aligned feedback label and schedule ``unflash`` in 2 s.

    The box size request is raised, if needed, to fit the label plus padding.
    """
    self.clear()

    label = Gtk.Label(label=s)
    label.set_name("Feedback")
    label.set_alignment(0.0, 0.5)
    label.show()
    self.g_box.pack_start(label, True, True, 0)

    label_req = label.size_request()
    box_req = self.g_box.size_request()
    padding = gu.PAD * 2
    self.g_box.set_size_request(
        max(label_req.width + padding, box_req.width),
        max(label_req.height + padding, box_req.height))

    self.__timeout = GObject.timeout_add(2000, self.unflash)
def delayed_flash(self, milliseconds, msg):
    """Call ``self.flash(msg)`` after ``milliseconds`` ms."""
    def show_message():
        return self.flash(msg)

    GObject.timeout_add(milliseconds, show_message)
def on_up_press(self, eb, ev):
    """Step the value up once on a button press and arm the auto-repeat timer.

    Any pending repeat timer is cancelled first. Values are capped at 127.
    """
    if self.m_timeout:
        GObject.source_remove(self.m_timeout)
        self.m_timeout = None

    if ev.type != Gdk.EventType.BUTTON_PRESS:
        return

    if self.m_value < 127:
        self.up()
    # Checked again because up() may have reached the limit.
    if self.m_value < 127:
        self.m_timeout = GObject.timeout_add(DELAY1, self.on_up_timeout)
def on_up_timeout(self, *v):
    """Auto-repeat step: increase the value and re-arm the timer until 127 is reached.

    Returns:
        False, so the timeout source that invoked this call is not repeated;
        a new source with ``DELAY2`` is created while the limit is not reached.
    """
    if self.m_value < 127:
        self.up()
        self.m_timeout = GObject.timeout_add(DELAY2, self.on_up_timeout)
    else:
        # The source that called us is removed when we return; drop its id so
        # that no later source_remove() is attempted on a stale id.
        self.m_timeout = None
    return False
def on_down_press(self, eb, ev):
    """Step the value down once on a button press and arm the auto-repeat timer.

    Any pending repeat timer is cancelled first. Values do not go below 0.
    """
    if self.m_timeout:
        GObject.source_remove(self.m_timeout)
        self.m_timeout = None

    if ev.type != Gdk.EventType.BUTTON_PRESS:
        return

    if self.m_value > 0:
        self.down()
    # Checked again because down() may have reached the limit.
    if self.m_value > 0:
        self.m_timeout = GObject.timeout_add(DELAY1, self.on_down_timeout)
def on_down_timeout(self, *v):
    """Auto-repeat step: decrease the value and re-arm the timer until 0 is reached.

    Returns:
        False, so the timeout source that invoked this call is not repeated;
        a new source with ``DELAY2`` is created while the limit is not reached.
    """
    if self.m_value > 0:
        self.down()
        self.m_timeout = GObject.timeout_add(DELAY2, self.on_down_timeout)
    else:
        # The source that called us is removed when we return; drop its id so
        # that no later source_remove() is attempted on a stale id.
        self.m_timeout = None
    return False
def do_real_clean_task(self):
    # Run the next queued clean task on a worker thread, or finish when the
    # queue is empty.
    # NOTE(review): the nesting of the loops below was reconstructed from a
    # whitespace-collapsed source - confirm against the original file.
    if len(self.clean_tasks) != 0:
        plugin, cruft_dict = self.clean_tasks.pop(0)
        plugin.set_property('clean_finished', False)
        # Locate the plugin's row among the children of the janitor model.
        # NOTE(review): plugin_iter stays unbound if no child row matches.
        for row in self.janitor_model:
            for child_row in row.iterchildren():
                if child_row[self.JANITOR_PLUGIN] == plugin:
                    plugin_iter = child_row.iter
        log.debug("Call %s to clean cruft" % plugin)
        # Handler ids are kept on the instance.
        self._object_clean_handler = plugin.connect('object_cleaned', self.on_plugin_object_cleaned, (plugin_iter, cruft_dict))
        self._all_clean_handler = plugin.connect('all_cleaned', self.on_plugin_cleaned, plugin_iter)
        self._error_handler = plugin.connect('clean_error', self.on_clean_error, plugin_iter)
        self.janitor_view.scroll_to_cell(self.janitor_model.get_path(plugin_iter))
        # The thread is created here but only started after the spinner timeout is armed.
        t = threading.Thread(target=plugin.clean_cruft, kwargs={'cruft_list': cruft_dict.keys(), 'parent': self.get_toplevel()})
        # Select, reveal and relabel the plugin's row in the result view.
        for row in self.result_model:
            if row[self.RESULT_PLUGIN] == plugin:
                self.result_view.get_selection().select_path(row.path)
                self.result_view.scroll_to_cell(row.path)
                row[self.RESULT_DISPLAY] = '<b>%s</b>' % _('Cleaning cruft for "%s"...') % plugin.get_title()
                self.result_view.expand_row(self.result_model.get_path(row.iter), True)
        self.janitor_model[plugin_iter][self.JANITOR_SPINNER_ACTIVE] = True
        self.janitor_model[plugin_iter][self.JANITOR_SPINNER_PULSE] = 0
        # The spinner callback receives the row iter and the worker thread every 50 ms.
        GObject.timeout_add(50, self._on_clean_spinner_timeout, plugin_iter, t)
        t.start()
    else:
        # Nothing left to clean: trigger the scan handler and clear the busy state.
        self.on_scan_button_clicked()
        self.unset_busy()
def askInstallPackage(self, package, summary, description, homepage):
    """Ask the user whether ``package`` should be installed.

    Args:
        package: package name shown in the question.
        summary: short text shown above the description.
        description: long text, passed through ``Helpers.format_description``.
        homepage: not used by this method.

    Returns:
        True when the dialog was answered with YES. Otherwise the dialog is
        hidden and False is returned.
    """
    # populate the dialog
    dialog = self.dia
    builder = self.dia_xml
    header = _("Install additional software?")
    body = _("Do you want to install package '%s'?") % package
    dialog.set_title('')

    builder.get_object('header_label').set_markup("<b><big>%s</big></b>" % header)
    builder.get_object('body_label').set_label(body)

    description_view = builder.get_object('description_text_view')
    text_buffer = Gtk.TextBuffer()
    text_buffer.set_text("%s\n\n%s" % (summary, Helpers.format_description(description)))
    description_view.set_buffer(text_buffer)

    icon = Gtk.IconTheme.get_default().load_icon('package-x-generic', 16, False)
    dialog.set_icon(icon)

    # check if another package manager is already running
    # FIXME: just checking for the existance of the file is
    #        not sufficient, it need to be tested if it can
    #        be locked via apt_pkg.get_lock()
    #        - but that needs to run as root
    #        - a dbus helper might be the best answer here
    #args = (update_button_status, dia_xml.get_object("yes_button"),
    #        dia_xml.get_object("infolabel"))
    #args[0](*args[1:])
    #timer_id = GObject.timeout_add(750, *args )

    # show the dialog
    response = dialog.run()
    #GObject.source_remove(timer_id)
    if response == Gtk.ResponseType.YES:
        return True
    dialog.hide()
    return False

# progress etc
def _update_temp_value(self):
    """Schedule ``temperature_cb`` in 500 ms while the characteristic is notifying."""
    characteristic_props = self.props[constants.GATT_CHRC_IFACE]
    if characteristic_props['Notifying']:
        print('Starting timer event')
        GObject.timeout_add(500, self.temperature_cb)
def _update(self, widget, data=None):
    """Debounce ``_refresh``: restart a 500 ms timer on every call."""
    # This makes sure that we only call _find every 500 ms .
    pending_id = self.timer_id
    if pending_id:
        # We destroy the last event source and create another one.
        GObject.source_remove(pending_id)
    self.timer_id = GObject.timeout_add(500, self._refresh, widget, data)
def _validate_cb(self, widget):
    """Schedule ``_validate`` on the widget's child in 500 ms."""
    # We create a timeout object to avoid fast lookups on disk when typing.
    # NOTE(review): a previously scheduled timer is not cancelled here - confirm
    # whether every call is meant to trigger its own validation.
    entry = widget.get_child()
    self.timer_id = GObject.timeout_add(500, self._validate, entry)