我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用gi.repository.GObject.source_remove()。
def new_question(self):
    """Build a new question from random solmisation elements.

    A pending timeout stored in ``self.m_timeout_handle`` is removed first.

    Returns:
        self.ERR_PICKY: 'config/picky_on_new_question' is set and the
            current question is still new or was answered wrongly.
        self.ERR_NO_ELEMS: the lesson header lists no solmisation elements.
        self.OK: a new question was stored in ``self.m_question``.
    """
    if self.m_timeout_handle:
        GObject.source_remove(self.m_timeout_handle)
        self.m_timeout_handle = None
    picky = self.get_bool('config/picky_on_new_question')
    if picky and self.q_status in (self.QSTATUS_NEW, self.QSTATUS_WRONG):
        return self.ERR_PICKY
    self.q_status = self.QSTATUS_NO
    elements = self.m_P.header.solmisation_elements
    if not elements:
        return self.ERR_NO_ELEMS
    note_count = self.get_int("num_notes")
    self.m_question = [random.choice(elements) for _ in range(note_count)]
    self.q_status = self.QSTATUS_NEW
    # Random transposition in the inclusive range -5..6.
    self.m_transp = random.randint(-5, 6)
    return self.OK
def new_question(self):
    """Select the next question.

    The UI never calls this function unless a usable lesson file is loaded.
    A pending timeout stored in ``self.m_timeout_handle`` is removed first.

    Returns:
        Teacher.ERR_PICKY: not in test mode, 'config/picky_on_new_question'
            is set and the current question is still new or wrong.
        self.OK: a question was selected (always the case in test mode).
    """
    pending = self.m_timeout_handle
    if pending:
        GObject.source_remove(pending)
        self.m_timeout_handle = None
    if solfege.app.m_test_mode:
        self.m_P.next_test_question()
    else:
        picky = self.get_bool('config/picky_on_new_question')
        if picky and self.q_status in (self.QSTATUS_NEW, self.QSTATUS_WRONG):
            return Teacher.ERR_PICKY
        self.q_status = self.QSTATUS_NO
        assert self.m_P
        self.m_P.select_random_question()
    self.q_status = self.QSTATUS_NEW
    return self.OK
def new_question(self):
    """Pick two random intervals and two random tonics for a new question.

    A pending timeout stored in ``self.m_timeout_handle`` is removed first.
    For each of the first and last interval, the "up" list is used; when the
    matching ``*_interval_type`` setting is 'melodic', the negated "down"
    intervals are added as candidates too.

    Returns:
        Teacher.ERR_PICKY: 'config/picky_on_new_question' is set and the
            current question is still new or was answered wrongly.
        self.ERR_CONFIGURE: the first or the last candidate list is empty.
        self.OK: a new question was created.
    """
    if self.m_timeout_handle:
        GObject.source_remove(self.m_timeout_handle)
        self.m_timeout_handle = None
    if self.get_bool('config/picky_on_new_question') \
            and self.q_status in [self.QSTATUS_NEW, self.QSTATUS_WRONG]:
        return Teacher.ERR_PICKY
    first = self.get_list('first_interval_up')
    if self.get_string('first_interval_type') == 'melodic':
        # A list comprehension instead of map(): on Python 3 map() returns an
        # iterator, and ``list + iterator`` raises TypeError.
        first = first + [-a for a in self.get_list('first_interval_down')]
    last = self.get_list('last_interval_up')
    if self.get_string('last_interval_type') == 'melodic':
        last = last + [-a for a in self.get_list('last_interval_down')]
    if not (first and last):
        return self.ERR_CONFIGURE
    self.m_intervals = [random.choice(first), random.choice(last)]
    self.m_tonikas = [mpd.MusicalPitch().randomize("f", "f'"),
                      mpd.MusicalPitch().randomize("f", "f'")]
    self.q_status = self.QSTATUS_NEW
    return self.OK
def flash(self, txt, **kwargs):
    """Display a message that is automatically removed after some time.

    If a new message is flashed before the previous flashed message has
    been removed, the previous timeout is cancelled and the new message
    replaces the old one.

    Args:
        txt: the message text, passed on to ``self.display``.
        **kwargs: passed on to ``self.display`` unchanged.
    """
    if self.__timeout:
        GObject.source_remove(self.__timeout)
        # Forget the id right away so that nothing tries to remove the same
        # source a second time.
        self.__timeout = None
    self.display(txt, **kwargs)

    def f(self=self):
        # Runs when the 2000 ms timeout fires: fall back to the message on
        # top of the stack, or clear the bar when the stack is empty.
        self.__timeout = None
        if self.__stack:
            self.display(self.__stack[-1][0], **self.__stack[-1][1])
        else:
            self.empty()
    self.__timeout = GObject.timeout_add(2000, f)
def _mic_level_watcher(self, bus, message, *args):
    """Track the microphone peak level to detect start and end of speech.

    The first 'peak' value of the message structure is compared against the
    two bounds in SILENCE_DETECTION_RANGE. Always returns True.
    """
    level = message.get_structure().get_value('peak')[0]
    self._log.info("Peak level %s", level)

    # Level rose above the upper bound while idle: start a new recording.
    if level > SILENCE_DETECTION_RANGE[0] and not self.speaking_started:
        self.audio_from_mic = queue.Queue()
        self._log.info("Speaking started into queue %s", self.audio_from_mic)
        self.speaking_started = True
        if self._listening_timeout_timer:
            GObject.source_remove(self._listening_timeout_timer)
        self.speaking_finished_event = threading.Event()
        self.RecognizeCallback(self.audio_from_mic,
                               self._mic_pipeline.dialogid,
                               self.speaking_finished_event)

    # Level below the lower bound while speaking: count quiet messages and
    # finish once more than two have been seen.
    if level < SILENCE_DETECTION_RANGE[1] and self.speaking_started:
        self.silence_count += 1
        if self.silence_count > 2:
            self._log.info("Speaking finished")
            self.speaking_started = False
            self.silence_count = 0
            self._mic_pipeline.set_state(Gst.State.PAUSED)
            self.speaking_finished_event.set()
            aplay(ACK_BEEP, _bg=True)
    return True
def _set_repeat(self, delay=None, interval=None):
    """Store the repeat settings and start or stop the tick timeout.

    The 10 ms tick timeout is added when a delay is set and none was stored
    before; it is removed when the delay is cleared and one was stored.
    """
    had_delay = self.__repeat[0] is not None
    has_delay = delay is not None
    if has_delay and not had_delay:
        self.__tick_id = GObject.timeout_add(10, self._tick_cb)
    elif had_delay and not has_delay:
        GObject.source_remove(self.__tick_id)
    self.__repeat = (delay, interval)
def set_refresh_interval(self, widget):
    """Apply a new refresh interval taken from the widget's name.

    Nothing happens when the interval equals the configured
    'refresh_interval'. Otherwise the existing timeout (if there is one) is
    removed, ``run_background`` is called with the new interval and the new
    value is stored through ``cfg_cls.set_key``.

    Args:
        widget: object whose ``get_name()`` returns the interval as text.
    """
    interval = int(widget.get_name())
    if interval != self.config.get('refresh_interval'):
        # Only remove a timeout that actually exists.
        if self.services['timeout']:
            GObject.source_remove(self.services['timeout'])
        self.run_background(interval)
        self.cfg_cls.set_key('refresh_interval', interval)
def quit(self, widget):
    """Remove the timeout (if any), delete the lock file and leave Gtk's main loop."""
    timeout_id = self.services['timeout']
    if timeout_id:
        GObject.source_remove(timeout_id)
    lockfile = self.cfg_cls.LOCKFILE
    os.unlink(lockfile)
    Gtk.main_quit()
def on_songs_start_change(self, widget, event):
    """Remove the running timer, if there is one, and forget its id."""
    if not self._timer:
        return
    GObject.source_remove(self._timer)
    self._timer = None
def set_play(self, pos, time):
    """Move the scale to the playback position and restart the 1 s timer.

    The scale value is *time* plus the lengths of the first *pos* tracks of
    the current album, plus one.
    """
    if self._timer is not None:
        GObject.source_remove(self._timer)
        self._timer = None
    tracks = self._current_album.get_tracks()
    # Start the sum at *time* so the additions happen in track order.
    elapsed = sum((tracks[index].get_length() for index in range(pos)), time)
    self._songs_scale.set_value(elapsed + 1)
    self._timer = GObject.timeout_add(1000, self._playing)
def set_pause(self):
    """Remove the running timer, if one is set, and forget its id."""
    if self._timer is None:
        return
    GObject.source_remove(self._timer)
    self._timer = None
def end_practise(self):
    """Cancel the pending timeout, reset the question status and stop the synth."""
    handle = self.m_timeout_handle
    if handle:
        GObject.source_remove(handle)
        self.m_timeout_handle = None
    self.q_status = self.QSTATUS_NO
    soundcard.synth.stop()
def new_question(self):
    """Build a new question from random rhythm elements.

    A pending timeout stored in ``self.m_timeout_handle`` is removed first.
    The first element is never one whose const.RHYTHMS entry starts with
    "r" when the 'not_start_with_rest' option is set.

    Returns:
        self.ERR_PICKY: 'config/picky_on_new_question' is set and the
            current question is still new or was answered wrongly.
        self.ERR_NO_ELEMS: there are no elements to choose from, or none
            that may be used as the first element.
        self.OK: a new question was stored in ``self.m_question``.
    """
    if self.m_timeout_handle:
        GObject.source_remove(self.m_timeout_handle)
        self.m_timeout_handle = None
    picky = self.get_bool('config/picky_on_new_question')
    if picky and self.q_status in (self.QSTATUS_NEW, self.QSTATUS_WRONG):
        return self.ERR_PICKY
    self.q_status = self.QSTATUS_NO
    v = list(self.m_P.header.rhythm_elements)
    # Candidates for the first beat.
    norest_v = [
        x for x in v
        if not (const.RHYTHMS[x][0] == "r"
                and self.get_bool("not_start_with_rest"))
    ]
    if not v or not norest_v:
        return self.ERR_NO_ELEMS
    opening = random.choice(norest_v)
    self.m_question = [opening] + [
        random.choice(v) for _ in range(1, self.get_int("num_beats"))
    ]
    self.q_status = self.QSTATUS_NEW
    return self.OK
def on_end_practise(self):
    """Remove the idle source and call ``dsp_close()`` on the MIDI module."""
    idle_tag = self.__idle_tag
    GObject.source_remove(idle_tag)
    soundcard.solfege_c_midi.dsp_close()
def create_holders(self):
    """Create the |__| 'holder' images, one per beat.

    A pending timeout is removed, the view is cleared, ``self.m_num_beats``
    holder images are packed into ``self.g_box`` and ``self.m_data`` is
    reset to an empty list.
    """
    pending = self.__timeout
    if pending:
        GObject.source_remove(pending)
    self.clear()
    for _ in range(self.m_num_beats):
        holder = gu.create_png_image('holder')
        self.g_box.pack_start(holder, False, False, 0)
    self.m_data = []
def create_holders(self):
    """Create the |__| 'holder' images, one per note.

    A pending timeout is removed, the view is cleared, ``self.m_num_notes``
    holder images are packed into ``self.g_box`` and ``self.m_data`` is
    reset to an empty list.
    """
    pending = self.__timeout
    if pending:
        GObject.source_remove(pending)
    self.clear()
    for _ in range(self.m_num_notes):
        holder = gu.create_png_image('holder')
        self.g_box.pack_start(holder, False, False, 0)
    self.m_data = []
def display(self, txt, **kwargs):
    """Show *txt*, replacing whatever is displayed now.

    *txt* is split on ``{name}`` placeholders. Each placeholder is replaced
    by a widget built from ``kwargs[name]``: a string becomes a label,
    anything else is handed to ``lessonfilegui.new_labelobject``. Non-empty
    text between placeholders becomes a label; empty chunks are skipped.
    Afterwards the requested size is raised to the largest size seen so far
    and a pending timeout is removed.

    Args:
        txt: message text, optionally containing ``{name}`` placeholders.
        **kwargs: values for the placeholders used in *txt*.

    Raises:
        KeyError: if *txt* uses a placeholder that is missing from *kwargs*.
    """
    try:
        string_types = basestring  # Python 2
    except NameError:
        string_types = str  # Python 3
    self.empty()
    # Raw string: "\{" and "\w" are not valid string escapes.
    placeholder_re = re.compile(r"(\{\w+\})")  # Unicode??
    self.set_size_request(-1, -1)
    for child in placeholder_re.split(txt):
        if placeholder_re.match(child):
            varname = child[1:-1]
            from solfege import lessonfilegui
            if isinstance(kwargs[varname], string_types):
                w = Gtk.Label(label=kwargs[varname])
                w.set_name("FlashBarLabel")
            else:
                w = lessonfilegui.new_labelobject(kwargs[varname])
        elif child:
            w = Gtk.Label(label=child)
            w.set_name("FlashBarLabel")
        else:
            # split() yields empty strings around placeholders; there is no
            # widget to pack for those.
            continue
        self.__content.pack_start(w, False, False, 0)
        w.show()
    self.m_sx = max(self.size_request().width, self.m_sx)
    self.m_sy = max(self.size_request().height, self.m_sy)
    self.set_size_request(self.m_sx, self.m_sy)
    if self.__timeout:
        GObject.source_remove(self.__timeout)
        self.__timeout = None
def on_up_release(self, eb, ev):
    """Remove the pending timeout source, if there is one, and forget its id."""
    if not self.m_timeout:
        return
    GObject.source_remove(self.m_timeout)
    self.m_timeout = None
def on_down_press(self, eb, ev):
    """Step down once on a button press and arm the repeat timeout.

    A pending timeout is removed first. Events other than
    Gdk.EventType.BUTTON_PRESS do nothing more. The timeout calling
    ``self.on_down_timeout`` is only added while the value is above zero.
    """
    previous = self.m_timeout
    if previous:
        GObject.source_remove(previous)
        self.m_timeout = None
    if ev.type != Gdk.EventType.BUTTON_PRESS:
        return
    if self.m_value > 0:
        self.down()
    # Checked again because down() may have changed the value.
    if self.m_value > 0:
        self.m_timeout = GObject.timeout_add(DELAY1, self.on_down_timeout)
def on_down_release(self, eb, ev):
    """Remove the pending timeout source, if there is one, and forget its id."""
    if not self.m_timeout:
        return
    GObject.source_remove(self.m_timeout)
    self.m_timeout = None
def askInstallPackage(self, package, summary, description, homepage):
    """Ask the user whether *package* should be installed.

    Fills the dialog with a header, a question naming the package and a
    description made of *summary* and the formatted *description*, then
    runs it. *homepage* is accepted but not used here.

    Returns:
        True when the dialog response is Gtk.ResponseType.YES; otherwise
        the dialog is hidden and False is returned.
    """
    # populate the dialog
    dialog = self.dia
    builder = self.dia_xml
    header = _("Install additional software?")
    body = _("Do you want to install package '%s'?") % package
    dialog.set_title('')
    builder.get_object('header_label').set_markup(
        "<b><big>%s</big></b>" % header)
    builder.get_object('body_label').set_label(body)
    description_text_view = builder.get_object('description_text_view')
    text_buffer = Gtk.TextBuffer()
    text_buffer.set_text(
        "%s\n\n%s" % (summary, Helpers.format_description(description)))
    description_text_view.set_buffer(text_buffer)
    icon = Gtk.IconTheme.get_default().load_icon('package-x-generic', 16, False)
    dialog.set_icon(icon)

    # FIXME: check if another package manager is already running. Just
    #        checking for the existence of the lock file is not sufficient,
    #        it needs to be tested if it can be locked via
    #        apt_pkg.get_lock()
    #        - but that needs to run as root
    #        - a dbus helper might be the best answer here
    #        (a 750 ms timer updating the "yes" button and the info label
    #        was planned around dia.run() but is disabled)

    # show the dialog
    if dialog.run() == Gtk.ResponseType.YES:
        return True
    dialog.hide()
    return False
# progress etc
def _update(self, widget, data=None):
    """Schedule ``self._refresh(widget, data)`` 500 ms from now.

    A timeout that is still pending is removed first, so each call restarts
    the 500 ms wait.
    """
    pending = self.timer_id
    if pending:
        # Drop the previous event source before creating the new one.
        GObject.source_remove(pending)
    self.timer_id = GObject.timeout_add(500, self._refresh, widget, data)
def _find_cb(self, widget, data):
    """Schedule ``self._find(widget, data)`` 500 ms from now.

    A timeout that is still pending is removed first, so each call restarts
    the 500 ms wait.
    """
    pending = self.timer_id
    if pending:
        # Drop the previous event source before creating the new one.
        GObject.source_remove(pending)
    self.timer_id = GObject.timeout_add(500, self._find, widget, data)
def Stop(self):
    """Remove the event source and mark this object as inactive.

    Safe to call more than once: this method resets ``self.handle`` to
    None, so the source is only removed while a handle is stored.
    """
    if self.handle is not None:
        gobject.source_remove(self.handle)
    self.active = False
    self.handle = None
def __update_refresh_rate(self, menuitem, minutes):
    """Restart the price refresh timeout with a new interval.

    Does nothing unless *menuitem* is active. *minutes* is converted to
    milliseconds for the new timeout.
    """
    if not menuitem.get_active():
        return
    GObject.source_remove(self.refresh_source)
    interval_ms = minutes * 60 * 1000
    self.refresh_source = GObject.timeout_add(interval_ms, self.__update_price)
def stop_timeout(self):
    """Cancel the pending timeout, if there is one.

    This is best effort: a failure to remove the source is logged at debug
    level and otherwise ignored. The stored id is cleared even then, so a
    stale id is not retried on the next call.

    Returns:
        Always True.
    """
    import logging

    timeout_id = getattr(self, 'timeout_id', 0)
    if timeout_id:
        self.timeout_id = 0
        try:
            GObject.source_remove(timeout_id)
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must still propagate.
            logging.getLogger(__name__).debug(
                "could not remove timeout source %r", timeout_id,
                exc_info=True)
    return True
def stop_ac_timeout(self, query, liststore):
    """Cancel the pending autocomplete timeout and run the autocompletion.

    Removing the source is best effort: a failure is logged at debug level
    and otherwise ignored. The stored id is cleared even then, so a stale
    id is not retried on the next call.

    Args:
        query: the text to complete.
        liststore: the store handed to ``autocomplete``.
            ``autocomplete(query, liststore)`` is only called when both
            arguments are truthy.

    Returns:
        Always True.
    """
    import logging

    timeout_id = getattr(self, 'ac_timeout_id', 0)
    if timeout_id:
        self.ac_timeout_id = 0
        try:
            GObject.source_remove(timeout_id)
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must still propagate.
            logging.getLogger(__name__).debug(
                "could not remove timeout source %r", timeout_id,
                exc_info=True)
    if query and liststore:
        autocomplete(query, liststore)
    return True
def pointer_position_watch(self, widget, event):
    """Show the pointer on movement in fullscreen and re-arm the hide timer.

    Does nothing when the window is not fullscreen. Otherwise a pending
    hide timeout is removed, the pointer is shown and a new 3000 ms timeout
    calling ``self.fullscreen_hide_pointer`` is added. The toolbar is shown
    while ``event.y`` is below 20 and hidden otherwise.
    """
    if self.gui_window_fullscreen == False:  # noqa: E712
        return
    hide_id = self.fullscreen_hide_pointer_id
    if hide_id is not None:
        GObject.source_remove(hide_id)
    self.fullscreen_show_pointer()
    self.fullscreen_hide_pointer_id = GObject.timeout_add(
        3000, self.fullscreen_hide_pointer)
    near_top = event.y < 20
    if near_top:
        self.gui_toolbar.show()
    else:
        self.gui_toolbar.hide()
def test_apthistory_rescan_big(self):
    """Create a big history file and ensure that events are still
    processed during the rescan.

    A 100 ms timeout records timestamps through ``self._glib_timeout``
    while the rescan runs; two consecutive timestamps must not be more
    than 0.2 s apart.
    """
    self._timeouts = []
    new_history = os.path.join(self.basedir, "history.log.2")
    try:
        os.remove(new_history + ".gz")
    except OSError:
        pass
    history = self._get_apt_history()
    self.assertEqual(len(history.transactions), 186)
    self._generate_big_history_file(new_history)
    timer_id = GObject.timeout_add(100, self._glib_timeout)
    with ExecutionTime("rescan %s byte file" %
                       os.path.getsize(new_history + ".gz")):
        history._rescan(use_cache=False)
    GObject.source_remove(timer_id)
    # verify rescan
    self.assertTrue(len(history.transactions) > 186)
    # check the timeouts
    self.assertTrue(len(self._timeouts) > 0)
    for earlier, later in zip(self._timeouts, self._timeouts[1:]):
        # check that we get a max timeout of 0.2s; report it as a regular
        # test failure (a bare ``raise`` has no active exception here)
        gap = abs(earlier - later)
        self.assertTrue(gap <= 0.2,
                        "gap of %s s between two timeouts" % gap)
    os.remove(new_history + ".gz")
def clear(self):
    """Clear the store and remove every source id kept in ``self._signals``."""
    super(PendingStore, self).clear()
    for source_id in self._signals:
        GObject.source_remove(source_id)
    self._signals = []
def on_entry_changed(widget, data):
    """Run the search for the entry text after a 250 ms delay.

    A timeout already stored in ``widget.stamp`` is removed first, so each
    call restarts the 250 ms wait.

    Args:
        widget: the entry; its text is the search term.
        data: a ``(view, enquirer)`` pair.
    """
    def _work():
        # Query the enquirer with the current text and refill the view's
        # model, timing each step.
        search_text = widget.get_text()
        view, enquirer = data
        with ExecutionTime("total time"):
            with ExecutionTime("enquire.set_query()"):
                enquirer.set_query(
                    get_query_from_search_entry(search_text),
                    limit=100 * 1000,
                    nonapps_visible=NonAppVisibility.ALWAYS_VISIBLE)
            model = view.tree_view.get_model()
            with ExecutionTime("store.clear()"):
                model.clear()
            with ExecutionTime("store.set_from_matches()"):
                model.set_from_matches(enquirer.matches)
            with ExecutionTime("model settle (size=%s)" % len(model)):
                while Gtk.events_pending():
                    Gtk.main_iteration()
        return

    pending = widget.stamp
    if pending:
        GObject.source_remove(pending)
    widget.stamp = GObject.timeout_add(250, _work)
def cleanup_timeout(self):
    """Remove the timeout source when an id above zero is stored, then reset it to 0."""
    has_timeout = self._timeout > 0
    if has_timeout:
        GObject.source_remove(self._timeout)
        self._timeout = 0
def set_exhibits(self, exhibits_list):
    """Replace the exhibits and rebuild the paging dots.

    Does nothing when *exhibits_list* is empty. Otherwise the cursor is
    reset to 0, all children of ``self.index_hbox`` are destroyed and, when
    there is more than one exhibit, one paging dot per exhibit is added.
    Finally the exhibit at the cursor is rendered.

    Args:
        exhibits_list: the exhibits to show.
    """
    if not exhibits_list:
        return
    self.exhibits = exhibits_list
    self.cursor = 0
    for child in self.index_hbox:
        child.destroy()
    # The ids in self._dotsigs come from dot.connect() below, i.e. they are
    # signal handler ids. They must not be passed to
    # GObject.source_remove(): that function takes event source ids and
    # could remove an unrelated timeout/idle source with the same number.
    # The buttons owning the handlers were destroyed above, so only the
    # bookkeeping list needs to be reset.
    self._dotsigs = []
    count = len(self.exhibits)
    if count > 1:
        for i in range(count):
            dot = ExhibitButton()
            dot.set_size_request(StockEms.LARGE, StockEms.LARGE)
            self._dotsigs.append(
                dot.connect("clicked",
                            self.on_paging_dot_clicked,
                            count - 1 - i)  # index
            )
            self.index_hbox.pack_end(dot, False, False, 0)
        self.index_hbox.show_all()
    self._render_exhibit_at_cursor()
def _on_changed(self, widget):
    """Emit the changed search terms after a short delay.

    The delay (``self.SEARCH_TIMEOUT``) allows the user to enter a longer
    search term; a timeout that is still pending is removed first.
    """
    self._check_style()
    previous = self._timeout_id
    if previous > 0:
        GObject.source_remove(previous)
    self._timeout_id = GObject.timeout_add(self.SEARCH_TIMEOUT,
                                           self._emit_terms_changed)
def reset(self):
    """Remove every stored handler, destroy every label and empty both lists."""
    pairs = zip(self._labels, self._handlers)
    for label, handler_id in pairs:
        GObject.source_remove(handler_id)
        label.destroy()
    self._labels, self._handlers = [], []
def set_app_details(self, app_details):
    """Switch to a new application and query its screenshots.

    The "screenshots-available" handler connected to the previous
    ``app_details`` object is disconnected first. Then the name fields are
    copied, the handler is connected to the new object, the screenshot list
    is reset and ``query_multiple_screenshots()`` is called.

    Args:
        app_details: object providing ``display_name``, ``pkgname``,
            ``connect`` and ``query_multiple_screenshots``.
    """
    if self._sig > 0 and self.app_details is not None:
        # self._sig is a signal handler id returned by connect() below, so
        # it has to be released on the object it was connected to.
        # GObject.source_remove() takes event source ids instead.
        self.app_details.handler_disconnect(self._sig)
    self.app_details = app_details
    self.appname = app_details.display_name
    self.pkgname = app_details.pkgname
    self._sig = self.app_details.connect(
        "screenshots-available", self._on_screenshots_available)
    self._screenshots = []
    self.app_details.query_multiple_screenshots()
def clear(self):
    """Cancel and reset ``self.cancel``, remove the stored handlers and destroy all children."""
    canceller = self.cancel
    canceller.cancel()
    canceller.reset()
    for handler_id in self._handlers:
        GObject.source_remove(handler_id)
    for child in self:
        child.destroy()
def _on_axi_stamp_changed(self, monitor, afile, otherfile, event):
    """Schedule ``self.reopen`` 500 ms after an attribute change of the stamp file.

    A timeout that is still pending is removed first.
    """
    # we only care about the utime() update from update-a-x-i
    if event != Gio.FileMonitorEvent.ATTRIBUTE_CHANGED:
        return
    LOG.info("afile '%s' changed" % afile)
    pending = self._timeout_id
    if pending:
        GObject.source_remove(pending)
        self._timeout_id = None
    self._timeout_id = GObject.timeout_add(500, self.reopen)
def _on_lowlevel_transactions_changed(self, watcher, current, pending):
    """Rebuild ``self.pending_transactions`` from the given transaction ids.

    The progress signal is re-attached to the *current* transaction, the
    pending transactions are cleared and refilled from *current* plus
    *pending*, and the changed signal is emitted.

    Args:
        watcher: not used here.
        current: id of the current transaction; falsy when there is none.
        pending: list of ids of the pending transactions.
    """
    # cleanup progress signal (to be sure to not leave dbus
    # matchers around)
    # NOTE(review): the id stored in self._progress_signal comes from
    # trans.connect() below but is released with GObject.source_remove()
    # -- confirm that this is intended.
    old_signal = self._progress_signal
    if old_signal:
        GObject.source_remove(old_signal)
        self._progress_signal = None
    # attach progress-changed signal for current transaction
    if current:
        try:
            current_trans = client.get_transaction(current)
            self._progress_signal = current_trans.connect(
                "progress-changed", self._on_progress_changed)
        except dbus.DBusException:
            pass
    # now update pending transactions
    self.pending_transactions.clear()
    for tid in [t for t in [current] + pending if t]:
        try:
            trans = client.get_transaction(tid, error_handler=lambda x: True)
        except dbus.DBusException:
            continue
        trans_progress = TransactionProgress(trans)
        try:
            self.pending_transactions[trans_progress.pkgname] = \
                trans_progress
        except KeyError:
            # if its not a transaction from us (sc_pkgname) still
            # add it with the tid as key to get accurate results
            # (the key of pending_transactions is never directly
            # exposed in the UI)
            self.pending_transactions[trans.tid] = trans_progress
    # emit signal
    self.inject_fake_transactions_and_emit_changed_signal()
def hide_spinner(self):
    """Hide the spinner page again and show the content page.

    A timeout stored in ``self._last_timeout_id`` is removed first.
    """
    pending = self._last_timeout_id
    if pending is not None:
        GObject.source_remove(pending)
        self._last_timeout_id = None
    self.spinner_view.stop_and_hide()
    self.set_current_page(self.CONTENT_PAGE)
def _on_apt_finished_stamp_changed(self, monitor, afile, other_file, event):
    """Schedule ``self.open`` 10 seconds after the stamp file finished changing.

    Only Gio.FileMonitorEvent.CHANGES_DONE_HINT events are handled. A
    timeout that is still pending is removed first.
    """
    if event != Gio.FileMonitorEvent.CHANGES_DONE_HINT:
        return
    pending = self._timeout_id
    if pending:
        GObject.source_remove(pending)
        self._timeout_id = None
    self._timeout_id = GObject.timeout_add_seconds(10, self.open)