我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用gi.repository.GObject.GObject()。
def __init__(self, app, db): Thread.__init__(self) GObject.GObject.__init__(self) self.db = db self.counter_max = 30 self.counter = self.counter_max self.account_id = app[0] self.account_name = app[1] self.secret_code = Database.fetch_secret_code(app[2]) if self.secret_code: self.code = Code(self.secret_code) else: self.code_generated = False logging.error("Could not read the secret code," "the keyring keys were reset manually") self.logo = app[3] self.start()
def __init__(self, parent, main_window): self.parent = parent Thread.__init__(self) GObject.GObject.__init__(self) self.nom = "applications-db-reader" self.builder = Gtk.Builder.new_from_resource("/org/gnome/Authenticator/applications.ui") self.builder.connect_signals({ "on_close" : self.close_window, "on_key_press": self.on_key_press, "on_apply" : self.select_application }) self.window = self.builder.get_object("ApplicationsWindow") self.window.set_transient_for(main_window) self.listbox = self.builder.get_object("ApplicationsList") self.generate_search_bar() self.stack = self.builder.get_object("ApplicationsStack") self.stack.set_visible_child(self.stack.get_child_by_name("loadingstack")) self.builder.get_object("ApplicationListScrolled").add_with_viewport(self.listbox) self.db = [] self.start()
def import_scc(self, filename): """ Imports simple, single-file scc-profile. Just loads it, checks for shell() actions and asks user to enter name. """ files = self.builder.get_object("lstImportPackage") # Load profile profile = Profile(GuiActionParser()) try: profile.load(filename) except Exception, e: # Profile cannot be parsed. Display error message and let user to quit # Error message reuses page from VDF import, because they are # basically the same log.error(e) self.error(str(e)) return name = ".".join(os.path.split(filename)[-1].split(".")[0:-1]) files.clear() o = GObject.GObject() o.obj = profile files.append(( 2, name, name, _("(profile)"), o )) self.check_shell_commands()
def load_autoswitch(self): """ Transfers autoswitch settings from config to UI """ tvItems = self.builder.get_object("tvItems") cbShowOSD = self.builder.get_object("cbShowOSD") conditions = AutoSwitcher.parse_conditions(self.app.config) model = tvItems.get_model() model.clear() for cond in conditions.keys(): o = GObject.GObject() o.condition = cond o.action = conditions[cond] a_str = o.action.describe(Action.AC_SWITCHER) model.append((o, o.condition.describe(), a_str)) self._recursing = True self.on_tvItems_cursor_changed() cbShowOSD.set_active(bool(self.app.config['autoswitch_osd'])) self._recursing = False
def __init__(self): GObject.Object.__init__(self) settings = Gio.Settings.new("org.ubuntubudgie.plugins.budgie-extras") settings.bind("ws-overview-index", self, 'mode_index', Gio.SettingsBindFlags.DEFAULT) # general self.mode = modes[self.mode_index] self.appbutton = Gtk.Button.new() self.appbutton.set_relief(Gtk.ReliefStyle.NONE) icon = Gtk.Image.new_from_icon_name("1-wso", Gtk.IconSize.MENU) self.appbutton.set_image(icon) self.menu = Gtk.Menu() self.create_menu() self.update = Thread(target=self.show_seconds) # daemonize the thread to make the indicator stopable self.update.setDaemon(True) self.update.start()
def on_mcg_connect(self, connected): if connected: GObject.idle_add(self._connect_connected) self._mcg.load_playlist() self._mcg.load_albums() self._mcg.get_status() self._connect_action.set_state(GLib.Variant.new_boolean(True)) self._play_action.set_enabled(True) self._clear_playlist_action.set_enabled(True) self._panel_action.set_enabled(True) else: GObject.idle_add(self._connect_disconnected) self._connect_action.set_state(GLib.Variant.new_boolean(False)) self._play_action.set_enabled(False) self._clear_playlist_action.set_enabled(False) self._panel_action.set_enabled(False)
def on_mcg_status(self, state, album, pos, time, volume, error): # Album GObject.idle_add(self._panels[Window._PANEL_INDEX_COVER].set_album, album) # State if state == 'play': GObject.idle_add(self._header_bar.set_play) GObject.idle_add(self._panels[Window._PANEL_INDEX_COVER].set_play, pos, time) self._play_action.set_state(GLib.Variant.new_boolean(True)) elif state == 'pause' or state == 'stop': GObject.idle_add(self._header_bar.set_pause) GObject.idle_add(self._panels[Window._PANEL_INDEX_COVER].set_pause) self._play_action.set_state(GLib.Variant.new_boolean(False)) # Volume GObject.idle_add(self._header_bar.set_volume, volume) # Error if error is None: self._infobar.hide() else: self._show_error(error)
def __init__(self, builder): GObject.GObject.__init__(self) self._changing_volume = False self._setting_volume = False # Widgets self._header_bar = builder.get_object('headerbar') self._stack_switcher = StackSwitcher(builder) self._button_connect = builder.get_object('headerbar-connection') self._button_playpause = builder.get_object('headerbar-playpause') self._button_volume = builder.get_object('headerbar-volume') # Signals self._stack_switcher.connect('stack-switched', self.on_stack_switched) self._button_handlers = { 'on_headerbar-connection_active_notify': self.on_connection_active_notify, 'on_headerbar-connection_state_set': self.on_connection_state_set, 'on_headerbar-playpause_toggled': self.on_playpause_toggled, 'on_headerbar-volume_value_changed': self.on_volume_changed, 'on_headerbar-volume_button_press_event': self.on_volume_press, 'on_headerbar-volume_button_release_event': self.on_volume_release }
def _set_tracks(self, album): self._songs_scale.clear_marks() self._songs_scale.set_range(0, album.get_length()) length = 0 for track in album.get_tracks(): cur_length = length if length > 0 and length < album.get_length(): cur_length = cur_length + 1 self._songs_scale.add_mark( cur_length, Gtk.PositionType.RIGHT, GObject.markup_escape_text( Utils.create_track_title(track) ) ) length = length + track.get_length() self._songs_scale.add_mark(length, Gtk.PositionType.RIGHT, "{0[0]:02d}:{0[1]:02d} minutes".format(divmod(length, 60)))
def _change_tracklist_size(self, size, notify=True, store=True): # Set tracklist size if size == TracklistSize.LARGE: self._panel.set_homogeneous(True) self._info_revealer.set_reveal_child(True) elif size == TracklistSize.SMALL: self._panel.set_homogeneous(False) self._info_revealer.set_reveal_child(True) else: self._panel.set_homogeneous(False) self._info_revealer.set_reveal_child(False) # Store size if store: self._tracklist_size = size # Notify signals if notify: self.emit('tracklist-size-changed', size) # Resize image GObject.idle_add(self._resize_image)
def __init__(self, services=[], filters=[avahi.LOOKUP_RESULT_LOCAL], interface=avahi.IF_UNSPEC, protocol=avahi.PROTO_INET): GObject.GObject.__init__(self) self.filters = filters self.services = services self.interface = interface self.protocol = protocol try: self.system_bus = dbus.SystemBus() self.system_bus.add_signal_receiver( self.avahi_dbus_connect_cb, "NameOwnerChanged", "org.freedesktop.DBus", arg0="org.freedesktop.Avahi") except dbus.DBusException as e: logger.error("Error Owning name on D-Bus: %s", e) sys.exit(1) self.db = ServiceTypeDatabase() self.service_browsers = {} self.started = False
def update_menu_item(self): """Update the menu item. :return: """ GObject.idle_add( self.menu_item.set_label, "Ping: " + self.address + self.print_name() + self.state, priority=GObject.PRIORITY_HIGH ) GObject.idle_add( self.menu_item.set_image, self.image, priority=GObject.PRIORITY_HIGH ) GObject.idle_add( self.menu_item.show, priority=GObject.PRIORITY_HIGH )
def __init__(self): try: GObject.GObject.__init__ (self) threading.Thread.__init__ (self) self.setDaemon (True) self.keymap = Gdk.Keymap().get_default() self.display = Display() self.screen = self.display.screen() self.window = self.screen.root self.showscreen = Wnck.Screen.get_default() self.ignored_masks = self.get_mask_combinations(X.LockMask | X.Mod2Mask | X.Mod5Mask) self.map_modifiers() self.raw_keyval = None self.keytext = "" except Exception as cause: print (("init keybinding error: \n", str(cause))) self.display = None return None
def __init__(self, git_uri, window): GObject.GObject.__init__(self) self._git = git_uri self._builder = Gtk.Builder() self._builder.add_from_resource('/com/nautilus/git/ui/branch.ui') self._builder.connect_signals({ "on_cancel": self._close_window, "on_apply": self._update_branch, "branch_changed": self._validate_branch_name }) self._window = self._builder.get_object("window") self._window.set_transient_for(window) self._build_main_widget() self._window.show_all()
def __init__(self, folders): GObject.GObject.__init__(self) Thread.__init__(self) Gtk.Window.__init__(self) # Here i assume that all folders got the same icon... self._folders = folders self.model = [] self._flowbox = Gtk.FlowBox() # Threading stuff self.setDaemon(True) self.run() # Window configurations self.set_default_size(650, 500) self.set_size_request(650, 500) self.set_resizable(True) self.set_position(Gtk.WindowPosition.CENTER_ON_PARENT) # Widgets & Accelerators self._build_header_bar() self._build_content() self._setup_accels()
def __init__(self, db, cache, icons, icon_size=48, global_icon_cache=False): GObject.GObject.__init__(self) self.db = db self.cache = cache # get all categories cat_parser = CategoriesParser(db) self.all_categories = cat_parser.parse_applications_menu( softwarecenter.paths.APP_INSTALL_PATH) # reviews stats loader self.review_loader = get_review_loader(cache, db) # icon jazz self.icons = icons self.icon_size = icon_size self._missing_icon = None # delay this until actually needed if global_icon_cache: self.icon_cache = _app_icon_cache else: self.icon_cache = {}
def _category_translate(self, catname): """ helper that will look into the categories we got from the parser and returns the translated name if it find it, otherwise it resorts to plain gettext """ # look into parsed categories that use .directory translation for cat in self.all_categories: if cat.untranslated_name == catname: return cat.name # try normal translation first translated_catname = _(catname) if translated_catname == catname: # if no normal translation is found, try to find a escaped # translation (LP: #872760) translated_catname = _(GObject.markup_escape_text(catname)) # the parent expect the string unescaped translated_catname = unescape(translated_catname) return translated_catname
def __init__(self, notebook_view, options=None): GObject.GObject.__init__(self) self.notebook_view = notebook_view self.search_entry = SearchEntry() self.search_entry.connect( "terms-changed", self.on_search_terms_changed) self.search_entry.connect( "key-press-event", self.on_search_entry_key_press_event) self.back_forward = BackForwardButton() self.back_forward.connect( "left-clicked", self.on_nav_back_clicked) self.back_forward.connect( "right-clicked", self.on_nav_forward_clicked) self.navhistory = NavigationHistory(self.back_forward, options) self.spinner = Gtk.Spinner() self.all_views = {} self.view_to_pane = {} self._globalise_instance()
def __init__(self, pathname=None, cache=None): GObject.GObject.__init__(self) # initialize at creation time to avoid spurious AttributeError self._use_agent = False self._use_axi = False if pathname is None: pathname = softwarecenter.paths.XAPIAN_PATH self._db_pathname = pathname if cache is None: cache = get_pkg_info() self._aptcache = cache self._additional_databases = [] # the xapian values as read from /var/lib/apt-xapian-index/values self._axi_values = {} # we open one db per thread, thread names are reused eventually # so no memory leak self._db_per_thread = {} self._parser_per_thread = {} self._axi_stamp_monitor = None
def _gio_screenshots_json_download_complete_cb(self, source, result, path): try: res, content, etag = source.load_contents_finish(result) except GObject.GError: # ignore read errors, most likely transient return if content is not None: try: content = json.loads(content) except ValueError as e: LOG.error("can not decode: '%s' (%s)" % (content, e)) content = None if isinstance(content, dict): # a list of screenshots as listsed online screenshot_list = content['screenshots'] else: # fallback to a list of screenshots as supplied by the axi screenshot_list = [] # save for later and emit self._screenshot_list = self._sort_screenshots_by_best_version( screenshot_list) self.emit("screenshots-available", self._screenshot_list)
def __init__(self, untranslated_name, name, iconname, query, only_unallocated=True, dont_display=False, flags=[], subcategories=[], sortmode=SortMethods.BY_ALPHABET, item_limit=0): GObject.GObject.__init__(self) if type(name) == str: self.name = unicode(name, 'utf8').encode('utf8') else: self.name = name.encode('utf8') self.untranslated_name = untranslated_name self.iconname = iconname for subcategory in subcategories: query = xapian.Query(xapian.Query.OP_OR, query, subcategory.query) self.query = query self.only_unallocated = only_unallocated self.subcategories = subcategories self.dont_display = dont_display self.flags = flags self.sortmode = sortmode self.item_limit = item_limit
def _threaded_perform_search(self): self._perform_search_complete = False # generate a name and ensure we never have two threads # with the same name names = [thread.name for thread in threading.enumerate()] for i in range(threading.active_count() + 1, 0, -1): thread_name = 'ThreadedQuery-%s' % i if not thread_name in names: break # create and start it t = threading.Thread( target=self._blocking_perform_search, name=thread_name) t.start() # don't block the UI while the thread is running context = GObject.main_context_default() while not self._perform_search_complete: time.sleep(0.02) # 50 fps while context.pending(): context.iteration() t.join() # call the query-complete callback self.emit("query-complete")
def wait_for_apt_cache_ready(f): """ decorator that ensures that self.cache is ready using a gtk idle_add - needs a cache as argument """ def wrapper(*args, **kwargs): self = args[0] # check if the cache is ready and window = None if hasattr(self, "app_view"): window = self.app_view.get_window() if not self.cache.ready: if window: window.set_cursor(self.busy_cursor) GObject.timeout_add(500, lambda: wrapper(*args, **kwargs)) return False # cache ready now if window: window.set_cursor(None) f(*args, **kwargs) return False return wrapper
def __init__(self): GObject.GObject.__init__(self) InstallBackend.__init__(self) # transaction details for setting as meta self.new_pkgname, self.new_appname, self.new_iconname = '', '', '' # this is public exposed self.pending_transactions = {} self.client = packagekit.Client() self.pkginfo = get_pkg_info() self.pkginfo.open() self._transactions_watcher = PackagekitTransactionsWatcher() self._transactions_watcher.connect('lowlevel-transactions-changed', self._on_lowlevel_transactions_changed)
def __init__(self, oneconfviewpickler): '''Controller of the installed pane''' LOG.debug("OneConf Handler init") super(OneConfHandler, self).__init__() # OneConf stuff self.oneconf = DbusConnect() self.oneconf.hosts_dbus_object.connect_to_signal('hostlist_changed', self.refresh_hosts) self.oneconf.hosts_dbus_object.connect_to_signal('packagelist_changed', self._on_store_packagelist_changed) self.oneconf.hosts_dbus_object.connect_to_signal('latestsync_changed', self.on_new_latest_oneconf_sync_timestamp) self.already_registered_hostids = [] self.is_current_registered = False self.oneconfviewpickler = oneconfviewpickler # refresh host list self._refreshing_hosts = False GObject.timeout_add_seconds(MIN_TIME_WITHOUT_ACTIVITY, self.get_latest_oneconf_sync) GObject.idle_add(self.refresh_hosts)
def __init__(self, cache, db): """ Init a AppEnquire object :Parameters: - `cache`: apt cache (for stuff like the overlay icon) - `db`: a xapian.Database that contians the applications """ GObject.GObject.__init__(self) self.cache = cache self.db = db self.distro = get_distro() self.search_query = SearchQuery(None) self.nonblocking_load = True self.sortmode = SortMethods.UNSORTED self.nonapps_visible = NonAppVisibility.MAYBE_VISIBLE self.limit = DEFAULT_SEARCH_LIMIT self.filter = None self.exact = False self.nr_pkgs = 0 self.nr_apps = 0 self._matches = [] self.match_docids = set()
def _check_url_reachable_and_then_download_cb(self, f, result, user_data=None): self.LOG.debug("_check_url_reachable_and_then_download_cb: %s" % f) try: info = f.query_info_finish(result) etag = info.get_etag() self.emit('file-url-reachable', True) self.LOG.debug("file reachable %s %s %s" % (self.url, info, etag)) # url is reachable, now download the file f.load_contents_async( self._cancellable, self._file_download_complete_cb, None) except GObject.GError as e: self.LOG.debug("file *not* reachable %s" % self.url) self.emit('file-url-reachable', False) self.emit('error', GObject.GError, e) del f
def emit(self, *args): GLib.idle_add(GObject.GObject.emit, self, *args)
def __init__(self): GObject.GObject.__init__(self) TimerManager.__init__(self) self._direction = 0
def __init__(self): GObject.GObject.__init__(self) self.alive = None self.connection = None self.connecting = False self.buffer = "" self._connect() self._requests = [] self._controllers = [] # Ordered as daemon says self._controller_by_id = {} # Source of memory leak
def __init__(self, daemon_manager, controller_id): GObject.GObject.__init__(self) self._dm = daemon_manager self._controller_id = controller_id self._config_file = None self._profile = None self._type = None self._connected = False
def _add_gesture(self, gstr, action, select=False): lstGestures = self.builder.get_object("lstGestures") o = GObject.GObject() o.gstr = gstr o.action = action iter = lstGestures.append( ( GestureComponent.nice_gstr(gstr), action.describe(Action.AC_MENU), o ) ) if select: tvGestures = self.builder.get_object("tvGestures") tvGestures.get_selection().select_iter(iter) self.on_tvGestures_cursor_changed() self.on_btEditAction_clicked()
def __init__(self, icon_path, popupmenu, force=False): GObject.GObject.__init__(self) self.__icon_path = os.path.normpath(os.path.abspath(icon_path)) self.__popupmenu = popupmenu self.__active = True self.__visible = False self.__hidden = False self.__icon = "si-syncthing-unknown" self.__text = "" self.__force = force
def __init__(self, driver, device_id): GObject.GObject.__init__(self) self.buffer = b"" self.buttons = [] self.axes = [] self.subprocess = None self.driver = driver self.device_id = device_id self.errorred = False # To prevent sending 'error' signal multiple times
def import_scc_tar(self, filename): """ Imports packaged profiles. Checks for shell() actions everywhere and ask user to enter main name, check generated ones and optionaly change them as he wish. """ files = self.builder.get_object("lstImportPackage") try: # Open tar tar = tarfile.open(filename, "r:gz") files.clear() # Grab 1st profile name = tar.extractfile(Export.PN_NAME).read() main_profile = "%s.sccprofile" % name parser = GuiActionParser() o = GObject.GObject() o.obj = Profile(parser).load_fileobj(tar.extractfile(main_profile)) files.append(( 2, name, name, _("(profile)"), o )) for x in tar: name = ".".join(x.name.split(".")[0:-1]) if x.name.endswith(".sccprofile") and x.name != main_profile: o = GObject.GObject() o.obj = Profile(parser).load_fileobj(tar.extractfile(x)) files.append(( True, name, name, _("(profile)"), o )) elif x.name.endswith(".menu"): o = GObject.GObject() o.obj = MenuData.from_fileobj(tar.extractfile(x), parser) files.append(( True, name, name, _("(menu)"), o )) except Exception, e: # Either entire tar or some profile cannot be parsed. # Display error message and let user to quit # Error message reuses same page as above. log.error(e) self.error(str(e)) return self.check_shell_commands()
def on_btAdd_clicked(self, *a): """ Handler for "Add Item" button """ tvItems = self.builder.get_object("tvItems") model = tvItems.get_model() o = GObject.GObject() o.condition = Condition() o.action = NoAction() iter = model.append((o, o.condition.describe(), "None")) tvItems.get_selection().select_iter(iter) self.on_tvItems_cursor_changed() self.btEdit_clicked_cb()
def __init__(self,cua,folder): threading.Thread.__init__(self) GObject.GObject.__init__(self) self.setDaemon(True) self.cua = cua self.folder = folder
def __init__(self): GObject.GObject.__init__(self) self.pw = 0 # self.player = Gst.ElementFactory.make("playbin", "player") # self.player.connect("about-to-finish", self.on_player_finished) # bus = self.player.get_bus() # bus.connect("message", self.on_player_message) self.icon = comun.ICON self.active_icon = None self.about_dialog = None self.active = False self.animate = False self.frame = 0 self.pomodoros = 0 self.player = Player() self.notification = Notify.Notification.new('', '', None) self.read_preferences() # self.indicator = appindicator.Indicator.new('Pomodoro-Indicator', self.active_icon, appindicator. IndicatorCategory. HARDWARE) self.indicator.set_status(appindicator.IndicatorStatus.ACTIVE) self.indicator.connect('scroll-event', self.on_scroll) menu = self.get_menu() self.indicator.set_menu(menu) self.connect('session_end', self.on_session_end) self.connect('break_end', self.on_break_end)
def main(): if dbus.SessionBus().request_name( 'es.atareao.PomodoroIndicator') !=\ dbus.bus.REQUEST_NAME_REPLY_PRIMARY_OWNER: print("application already running") exit(0) GObject.threads_init() Gst.init(None) Gst.init_check(None) Notify.init('pomodoro-indicator') Pomodoro_Indicator() Gtk.main()
def __init__(self): GObject.GObject.__init__(self) Gst.init_check(None) self.status = Status.STOPPED self.player = None self.speed = 1.0 self.volume = 1.0 self.removesilence = False self.equalizer = {'band0': 0, 'band1': 0, 'band2': 0, 'band3': 0, 'band4': 0, 'band5': 0, 'band6': 0, 'band7': 0, 'band8': 0, 'band9': 0} self.lastpos = 0