我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用gi.repository.GObject.timeout_add_seconds()。
def __init__(self, oneconfviewpickler):
    """Controller of the installed pane.

    Opens the OneConf D-Bus connection, subscribes to the host-related
    signals and schedules the first sync-timestamp lookup and host refresh.

    :param oneconfviewpickler: stored unchanged on
        ``self.oneconfviewpickler``.
    """
    LOG.debug("OneConf Handler init")
    super(OneConfHandler, self).__init__()

    # OneConf stuff: route every D-Bus signal to its handler
    self.oneconf = DbusConnect()
    signal_handlers = (
        ('hostlist_changed', self.refresh_hosts),
        ('packagelist_changed', self._on_store_packagelist_changed),
        ('latestsync_changed', self.on_new_latest_oneconf_sync_timestamp),
    )
    for signal_name, handler in signal_handlers:
        self.oneconf.hosts_dbus_object.connect_to_signal(signal_name,
                                                         handler)

    self.already_registered_hostids = []
    self.is_current_registered = False
    self.oneconfviewpickler = oneconfviewpickler

    # refresh host list
    self._refreshing_hosts = False
    GObject.timeout_add_seconds(MIN_TIME_WITHOUT_ACTIVITY,
                                self.get_latest_oneconf_sync)
    GObject.idle_add(self.refresh_hosts)
def main():
    """Set up the app indicator with a 'Quit' menu and run the GTK loop.

    The indicator and its menu are stored in the module-level globals
    ``indicator`` and ``menu``. ``check_caps`` is scheduled once per second
    with the pubnub object as its argument.
    """
    global indicator, menu

    indicator = appindicator.Indicator.new(
        APPINDICATOR_ID,
        os.path.abspath('closed.svg'),
        appindicator.IndicatorCategory.SYSTEM_SERVICES,
    )
    indicator.set_status(appindicator.IndicatorStatus.ACTIVE)

    pubnub = set_up_pubnub()

    # Menu with a single entry that hands the pubnub object to ``die``.
    menu = gtk.Menu()
    quit_item = gtk.MenuItem('Quit')
    quit_item.connect('activate', die, pubnub)
    menu.append(quit_item)
    menu.show_all()

    indicator.set_menu(menu)
    indicator.set_icon(os.path.abspath("closed.svg"))

    notify.init(APPINDICATOR_ID)
    GObject.timeout_add_seconds(1, check_caps, pubnub)
    gtk.main()
def set_sbrick(self, sbrick):
    """Attach *sbrick* to this widget (or detach it when ``None``).

    The widget is sensitive only while an sbrick is set. When one is set,
    all values are refreshed immediately and ``refresh_updating_values`` is
    scheduled on a 5 second timer.

    :param sbrick: the sbrick object to show, or ``None``.
    """
    has_sbrick = sbrick is not None
    self.sbrick = sbrick
    self.set_sensitive(has_sbrick)
    if not has_sbrick:
        return

    self.refresh_all_values()
    GObject.timeout_add_seconds(5, self.refresh_updating_values)
def Delay(self, seconds, reply_handler, error_handler):
    """Reply to the caller after *seconds* seconds without blocking.

    :param seconds: delay before replying; also the value passed to
        *reply_handler*.
    :param reply_handler: callable invoked once with *seconds* when the
        delay has elapsed.
    :param error_handler: accepted for the asynchronous calling convention;
        not used by this method.
    """
    print("Sleeping for %ds" % seconds)

    def send_reply():
        reply_handler(seconds)
        # A timeout source is re-armed whenever its callback returns a true
        # value; return False explicitly so the reply is sent exactly once
        # regardless of what reply_handler returns.
        return False

    gobject.timeout_add_seconds(seconds, send_reply)
def _ask_and_repair_broken_cache(self):
    """Ask the user whether to repair a broken cache and do so if confirmed.

    While the main window is not visible yet, the call re-schedules itself
    one second later instead of showing the dialog.
    """
    # wait until the main window is available
    if not self.window_main.props.visible:
        GObject.timeout_add_seconds(1, self._ask_and_repair_broken_cache)
        return

    confirmed = dialogs.confirm_repair_broken_cache(self.window_main,
                                                    self.datadir)
    if confirmed:
        self.backend.fix_broken_depends()
def initiate_purchase(self, app, iconname, url=None, html=None):
    """Start the purchase workflow in the embedded webkit view.

    :param app: stored on ``self.app``.
    :param iconname: stored on ``self.iconname``.
    :param url: if given, this URI is loaded into the view.
    :param html: used when *url* is empty; loaded as an HTML string.
        When both are empty, ``DUMMY_HTML`` is loaded instead.
    :returns: ``False`` when the terms of service were not accepted
        (after emitting "terms-of-service-declined"), ``True`` otherwise.
    """
    tos_accepted = self._ask_for_tos_acceptance_if_needed()
    if not tos_accepted:
        self.emit("terms-of-service-declined")
        return False

    self.init_view()
    self.app = app
    self.iconname = iconname

    # Show the loading page and let Gtk process pending events so that it
    # is displayed before the real content starts loading.
    webkit = self.wk.webkit
    webkit.load_html_string(self.LOADING_HTML, "file:///")
    self.wk.show()
    while Gtk.events_pending():
        Gtk.main_iteration()

    if url:
        webkit.load_uri(url)
    else:
        content = html if html else DUMMY_HTML
        webkit.load_html_string(content, "file:///")
    self.pack_start(self.wk, True, True, 0)

    # only for debugging
    if os.environ.get("SOFTWARE_CENTER_DEBUG_BUY"):
        GObject.timeout_add_seconds(1, _generate_events, self)
    return True
def get_test_window_purchaseview():
    """Build a test window hosting a ``PurchaseView`` and start a purchase.

    The purchase URL points at the subscriptions page below
    ``BUY_SOMETHING_HOST`` unless a URL is given as the first command line
    argument.

    :returns: the shown ``Gtk.Window`` containing the purchase view.
    """
    from mock import Mock
    from softwarecenter.enums import BUY_SOMETHING_HOST

    query = urllib.urlencode({
        'archive_id': "mvo/private-test",
        'arch': "i386",
    })
    url = BUY_SOMETHING_HOST + "/subscriptions/en/ubuntu/maverick/+new/?%s" % query
    # use cmdline if available
    if len(sys.argv) > 1:
        url = sys.argv[1]

    widget = PurchaseView()
    widget.config = Mock()

    win = Gtk.Window()
    win.set_data("view", widget)
    win.add(widget)
    win.set_size_request(600, 500)
    win.set_position(Gtk.WindowPosition.CENTER)
    win.show_all()
    win.connect('destroy', Gtk.main_quit)

    # NOTE: pass html=DUMMY_HTML instead of url to test with static content.
    widget.initiate_purchase(app=None, iconname=None, url=url)
    return win
def queue_next(self):
    """Schedule ``next_exhibit`` after ``TIMEOUT_SECONDS``.

    Any previous timeout is cleaned up first via ``cleanup_timeout``.

    :returns: the id of the newly added timeout source, which is also
        stored on ``self._timeout``.
    """
    self.cleanup_timeout()
    self._timeout = GObject.timeout_add_seconds(self.TIMEOUT_SECONDS,
                                                self.next_exhibit)
    return self._timeout
def get_test_window():
    """Build a test window around an ``OneConfViews`` pane.

    Several computers are registered up front, one more ("EEEEE") is
    registered by a 5 second timer, and "DDDDD" is removed again. Every
    "computer-changed" signal is printed.

    :returns: the shown ``Gtk.Window`` containing the pane.
    """
    def print_selected_hostid(widget, hostid, hostname):
        print("%s selected for %s" % (hostid, hostname))

    w = OneConfViews(Gtk.IconTheme.get_default())
    w.show()

    win = Gtk.Window()
    win.set_data("pane", w)
    win.add(w)
    win.set_size_request(400, 600)
    win.connect("destroy", lambda x: Gtk.main_quit())

    # init the view
    initial_computers = (
        ("AAAAA", "NameA"),
        ("ZZZZZ", "NameZ"),
        ("DDDDD", "NameD"),
        ("CCCCC", "NameC"),
        ("", "This computer should be first"),
    )
    for computer_id, computer_name in initial_computers:
        w.register_computer(computer_id, computer_name)
    w.select_first()

    GObject.timeout_add_seconds(5, w.register_computer, "EEEEE", "NameE")

    w.connect("computer-changed", print_selected_hostid)

    w.remove_computer("DDDDD")
    win.show_all()
    return win
def _on_apt_finished_stamp_changed(self, monitor, afile, other_file, event):
    """File-monitor callback: (re)schedule ``self.open`` in 10 seconds.

    Only ``CHANGES_DONE_HINT`` events are acted upon. A timeout that is
    still pending is removed first, so a burst of events results in a
    single delayed ``open`` call.
    """
    if event != Gio.FileMonitorEvent.CHANGES_DONE_HINT:
        return

    pending_id = self._timeout_id
    if pending_id:
        GObject.source_remove(pending_id)
        self._timeout_id = None
    self._timeout_id = GObject.timeout_add_seconds(10, self.open)
def clear_token_from_ubuntu_sso_sync(appname):
    """Ask the ubuntu-sso D-Bus service to clear the credentials of *appname*.

    Blocks in a private main loop until the service reports that the
    credentials were cleared, were not found or that an error occurred,
    or until 2 seconds have passed.

    :param appname: application name whose credentials are cleared,
        e.g. _("Ubuntu Software Center").
    """
    from ubuntu_sso import (
        DBUS_BUS_NAME,
        DBUS_CREDENTIALS_IFACE,
        DBUS_CREDENTIALS_PATH,
    )

    # clean
    loop = GObject.MainLoop()
    session_bus = dbus.SessionBus()
    sso_object = session_bus.get_object(bus_name=DBUS_BUS_NAME,
                                        object_path=DBUS_CREDENTIALS_PATH,
                                        follow_name_owner_changes=True)
    proxy = dbus.Interface(object=sso_object,
                           dbus_interface=DBUS_CREDENTIALS_IFACE)

    # any of these outcomes ends the wait
    for signal_name in ("CredentialsCleared",
                        "CredentialsNotFound",
                        "CredentialsError"):
        proxy.connect_to_signal(signal_name, loop.quit)
    proxy.clear_credentials(appname, {})

    # ensure we don't hang forever here
    GObject.timeout_add_seconds(2, loop.quit)
    # run the mainloop until the credentials are clear
    loop.run()
def __init__(self, db):
    """Set up the channel state and start watching for channel changes.

    :param db: stored unchanged on ``self.db``.
    """
    self.db = db
    # extra channels from e.g. external sources
    self.extra_channels = []
    self._logger = LOG

    self.distro = get_distro()
    self.backend = get_install_backend()
    self.backend.connect(
        "channels-changed",
        self._remove_no_longer_needed_extra_channels)

    # kick off a background check for changes that may have been made
    # in the channels list
    GObject.timeout_add_seconds(60, self._check_for_channel_updates_timer)
def get_test_spinner_window():
    """Build a test window showing a ``SpinnerNotebook`` for one second.

    The spinner is shown immediately and hidden again by a 1 second timer.

    :returns: the shown ``Gtk.Window`` containing the notebook.
    """
    content = Gtk.Label("foo")
    spinner_notebook = SpinnerNotebook(content, "random msg")

    window = Gtk.Window()
    window.add(spinner_notebook)
    window.set_size_request(600, 500)
    window.set_position(Gtk.WindowPosition.CENTER)
    window.show_all()
    window.connect('destroy', Gtk.main_quit)

    spinner_notebook.show_spinner("Loading for 1s ...")
    GObject.timeout_add_seconds(1, spinner_notebook.hide_spinner)
    return window
def change_app_icon(self):
    """Apply the current icon and schedule the next status-icon update.

    The indicator icon is set to whatever ``get_icon`` returns, then
    ``update_server_status_icon`` is scheduled on a 60 second timer.
    """
    current_icon = self.get_icon()
    self.indicator.set_icon(current_icon)
    gobject.timeout_add_seconds(60, self.update_server_status_icon)
def start_session(self, host, port, session, username, password, wait):
    """Start a session using qtnx.

    Writes an ``.nxml`` configuration file for the connection below
    ``~/.qtnx``, starts the progress countdown and launches the qtnx
    binary.

    :param host: server host, substituted for ``WL_SERVER`` in the template.
    :param port: server port, substituted for ``WL_PORT`` in the template.
    :param session: session name; any ``/`` is replaced by ``_`` when it is
        used in the connection and file name.
    :param username: passed to qtnx on its command line.
    :param password: passed to qtnx on its command line.
    :param wait: if false, qtnx is spawned in the background and watched
        for its exit; otherwise this call blocks until qtnx finishes.
    """
    self.state = "connecting"

    config_dir = os.path.expanduser('~/.qtnx')
    if not os.path.exists(config_dir):
        os.mkdir(config_dir)

    # Generate qtnx's configuration file
    connection_name = "%s-%s-%s" % (host, port, session.replace("/", "_"))
    # Join the file name onto the expanded directory instead of
    # %-formatting the expanded path, which would break on a home
    # directory that contains a '%'.
    filename = os.path.join(config_dir, connection_name + ".nxml")

    config = self.NXML_TEMPLATE
    config = config.replace("WL_NAME", connection_name)
    config = config.replace("WL_SERVER", host)
    config = config.replace("WL_PORT", str(port))
    config = config.replace("WL_COMMAND", "weblive-session %s" % session)
    # The context manager closes the file even if the write fails.
    with open(filename, "w") as nxml:
        nxml.write(config)

    # Prepare qtnx call
    cmd = [self.BINARY_PATH, connection_name, username, password]

    def qtnx_countdown():
        """ Send progress events every two seconds """
        if self.helper_progress == 10:
            self.state = "connected"
            self.emit("connected", False)
            # returning False stops the timer
            return False
        self.emit("progress", self.helper_progress * 10)
        self.helper_progress += 1
        return True

    def qtnx_start_timer():
        """ As we don't have a way of knowing the connection status,
            we countdown from 20s
        """
        self.helper_progress = 0
        qtnx_countdown()
        GObject.timeout_add_seconds(2, qtnx_countdown)

    qtnx_start_timer()

    if not wait:
        # Start in the background and attach a watch for when it exits
        (self.helper_pid, stdin, stdout, stderr) = GObject.spawn_async(
            cmd, standard_input=True, standard_output=True,
            standard_error=True, flags=GObject.SPAWN_DO_NOT_REAP_CHILD)
        GObject.child_watch_add(self.helper_pid, self._on_qtnx_exit,
                                filename)
    else:
        # Start it and wait till it finishes
        p = subprocess.Popen(cmd)
        p.wait()