Python gtk 模块,main_iteration() 实例源码
我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用gtk.main_iteration()。
def gui_ask_for_api():
    """Ask for an API key in a GTK dialog and store the answer on disk.

    The text of the entry widget is written to
    ``colorize.HOME + colorize.API_KEY_FILE`` as a single
    ``YOUR_API_KEY=<text>`` line.  Nothing is returned.
    """
    message = gtk.MessageDialog(type=gtk.MESSAGE_QUESTION,
                                buttons=gtk.BUTTONS_OK_CANCEL)
    message.set_markup(
        colorize.MSG_ASK_API.replace(colorize.URL,
                                     "<u>" + colorize.URL + "</u>"))

    entry = gtk.Entry(max=64)
    entry.set_text("Enter your API key")
    entry.show()
    message.vbox.pack_end(entry)
    # Pressing Enter inside the entry answers the dialog with OK.  The
    # handler must address the dialog object itself; it used to reference an
    # undefined name and raised NameError as soon as Enter was pressed.
    entry.connect("activate", lambda _: message.response(gtk.RESPONSE_OK))
    message.set_default_response(gtk.RESPONSE_OK)

    # NOTE(review): the response code is not inspected, so the entry text is
    # saved even when the dialog is cancelled -- confirm this is intended.
    message.run()
    api_key = entry.get_text().decode('utf8')

    # The context manager closes the file even if the write fails.
    with open(colorize.HOME + colorize.API_KEY_FILE, 'w') as fp:
        fp.write("YOUR_API_KEY={0}{1}".format(api_key, os.linesep))

    # Process the button click immediately.
    message.destroy()
    while gtk.events_pending():
        gtk.main_iteration()
def stopall_collectors(self, button):
    """Terminate all enabled automatic collectors while showing progress.

    Flips the start/stop menu items and buttons, then walks over
    ``self.engine.collectors``, updating a ProgressBarDetails window and
    letting GTK handle pending events after every collector.

    Args:
        button: not used by the body.
    """
    def drain_gtk_events():
        while gtk.events_pending():
            gtk.main_iteration()

    menu = self.status_context_menu
    menu.tray_ind.set_icon(gtk.STOCK_NO)
    menu.stopall_menu_item.set_sensitive(False)
    menu.startall_menu_item.set_sensitive(True)
    self.stopall_button.set_sensitive(False)
    self.startall_button.set_sensitive(True)

    progress = ProgressBarDetails()
    progress.setValue(0.0)
    drain_gtk_events()

    for position, collector in enumerate(self.engine.collectors):
        if collector.is_enabled() and isinstance(collector, engine.collector.AutomaticCollector):
            collector.terminate()
        # The fraction is based on the position in the full collector list,
        # not only on the collectors that were actually terminated.
        progress.setValue(float(position) / len(self.engine.collectors))
        progress.appendText("stopping " + collector.name)
        drain_gtk_events()

    progress.setValue(1.0)
    # Destroy the window only when no "delete-event" handler claimed it.
    handled = progress.emit("delete-event", gtk.gdk.Event(gtk.gdk.DELETE))
    if not handled:
        progress.destroy()
def fetch_picture(self):
    """Download the poster at ``self.image_url`` into the temp location.

    On success ``self.image`` holds the part of the temporary file name that
    follows the ``poster_`` prefix.  When there is no image URL, or the
    download fails, ``self.image`` is set to an empty string.
    """
    if not self.image_url:
        self.image = ""
        return

    # NOTE(review): tempfile.mktemp only picks a name and is race-prone; it
    # is kept because the Retriever is handed a path to write to.
    tmp_dest = tempfile.mktemp(prefix='poster_', dir=self.locations['temp'])
    self.image = tmp_dest.split('poster_', 1)[1]
    dest = "%s.jpg" % tmp_dest
    try:
        self.progress.set_data(self.parent_window, _("Fetching poster"), _("Wait a moment"), False)
        retriever = Retriever(self.image_url, self.parent_window, self.progress, dest, useurllib2=self.useurllib2)
        retriever.start()
        # Keep the UI alive while the retriever is still running.
        while retriever.isAlive():
            self.progress.pulse()
            if self.progress.status:
                retriever.join()
            while gtk.events_pending():
                gtk.main_iteration()
        urlcleanup()
    except Exception:
        # Exception (not a bare except) so that KeyboardInterrupt and
        # SystemExit are no longer swallowed here.
        log.exception('')
        self.image = ""
        try:
            os.remove(dest)
        except OSError:
            # Report the path that was actually passed to os.remove.
            log.info("Can't remove %s file" % dest)
def start(self):
    """Step through the timeline for as long as it reports ``is_playing``.

    Each pass sleeps for one frame period, moves to the next frame, keeps
    moving while the active frame is flagged ``fixed``, calls
    ``on_toggle_play`` once the last frame is reached without replay, and
    finally lets GTK handle its pending events.
    """
    def on_last_frame_without_replay():
        timeline = self.timeline
        return (not timeline.is_replay
                and timeline.active >= len(timeline.frames) - 1)

    while self.timeline.is_playing:
        time.sleep(1.0 / self.timeline.framerate)
        self.timeline.on_goto(None, NEXT)

        # While the active frame is fixed, jump to the next one.
        while self.timeline.frames[self.timeline.active].fixed:
            if on_last_frame_without_replay():
                self.timeline.on_toggle_play(self.play_button)
            self.timeline.on_goto(None, NEXT)

        # If all frames are already played and there is no replay, stop
        # the loop.
        if on_last_frame_without_replay():
            self.timeline.on_toggle_play(self.play_button)

        # Call the gtk event handler.
        while gtk.events_pending():
            gtk.main_iteration()
def gui_message(text, message_type):
    """Display *text* in a GTK message dialog that has a Close button.

    Args:
        text: markup passed to the dialog's ``set_markup``.
        message_type: value used as the dialog's ``type`` argument.
    """
    dialog = gtk.MessageDialog(buttons=gtk.BUTTONS_CLOSE, type=message_type)
    dialog.set_markup(text)
    dialog.run()
    dialog.destroy()

    # Let GTK work through whatever events are still queued.
    has_pending = gtk.events_pending
    while has_pending():
        gtk.main_iteration()
def __init__(self, useGtk=True):
    """Initialise the reactor and pick glib or gtk main-loop primitives.

    Args:
        useGtk: when true, the gtk functions are always used; when false,
            the glib context/loop is used if pygtk is recent enough.
    """
    self.context = gobject.main_context_default()
    self.loop = gobject.MainLoop()
    posixbase.PosixReactorBase.__init__(self)

    # Before pygtk 2.3.91 the glib iteration and mainloop functions did not
    # release the global interpreter lock, which broke thread and signal
    # support.
    glib_is_usable = (hasattr(gobject, "pygtk_version")
                      and gobject.pygtk_version >= (2, 3, 91))

    if useGtk or not glib_is_usable:
        import gtk
        self.__pending = gtk.events_pending
        self.__iteration = gtk.main_iteration
        self.__crash = _our_mainquit
        self.__run = gtk.main
    else:
        self.__pending = self.context.pending
        self.__iteration = self.context.iteration
        self.__crash = self.loop.quit
        self.__run = self.loop.run
# The input_add function in pygtk1 checks for objects with a
# 'fileno' method and, if present, uses the result of that method
# as the input source. The pygtk2 input_add does not do this. The
# function below replicates the pygtk1 functionality.
# In addition, pygtk maps gtk.input_add to _gobject.io_add_watch, and
# g_io_add_watch() takes different condition bitfields than
# gtk_input_add(). We use g_io_add_watch() here in case pygtk fixes this
# bug.
def update_view_timeout(self):
    """Refresh the view once the simulation lock could be acquired.

    Returns:
        True after setting ``self.simulation.go`` when the play button is
        active; False (after clearing ``self._update_timeout_id``) when it
        is not.
    """
    # While the simulator is busy, run the gtk event loop.
    while not self.simulation.lock.acquire(False):
        while gtk.events_pending():
            gtk.main_iteration()

    # Take over the queued pause messages and leave an empty list behind.
    queued_messages = self.simulation.pause_messages
    self.simulation.pause_messages = []
    try:
        self.update_view()
        now_seconds = ns.core.Simulator.Now().GetSeconds()
        self.simulation.target_time = now_seconds + self.sample_period
    finally:
        self.simulation.lock.release()

    if queued_messages:
        warning = gtk.MessageDialog(
            parent=self.window,
            flags=0,
            type=gtk.MESSAGE_WARNING,
            buttons=gtk.BUTTONS_OK,
            message_format='\n'.join(queued_messages))
        warning.connect("response", lambda dlg, response_id: dlg.destroy())
        warning.show()
        self.play_button.set_active(False)

    if self.play_button.get_active():
        self.simulation.go.set()
        return True

    # We are paused: stop the update timer.
    self._update_timeout_id = None
    return False
def startall_collectors(self, button):
    """Run all enabled automatic collectors while showing progress.

    Flips the start/stop menu items and buttons, then walks over
    ``self.engine.collectors``, updating a ProgressBarDetails window and
    letting GTK handle pending events after every collector.

    Args:
        button: not used by the body.
    """
    def drain_gtk_events():
        while gtk.events_pending():
            gtk.main_iteration()

    menu = self.status_context_menu
    menu.tray_ind.set_icon(gtk.STOCK_MEDIA_RECORD)
    menu.startall_menu_item.set_sensitive(False)
    menu.stopall_menu_item.set_sensitive(True)
    self.startall_button.set_sensitive(False)
    self.stopall_button.set_sensitive(True)

    progress = ProgressBarDetails()
    progress.setValue(0.0)
    drain_gtk_events()

    for position, collector in enumerate(self.engine.collectors):
        if collector.is_enabled() and isinstance(collector, engine.collector.AutomaticCollector):
            collector.run()
        # The fraction is based on the position in the full collector list.
        progress.setValue(float(position) / len(self.engine.collectors))
        progress.appendText("processing " + collector.name)
        progress.pbar.set_text("Starting " + collector.name)
        drain_gtk_events()

    progress.setValue(1.0)
    # Destroy the window only when no "delete-event" handler claimed it.
    handled = progress.emit("delete-event", gtk.gdk.Event(gtk.gdk.DELETE))
    if not handled:
        progress.destroy()
def __init__(self):
    """Prompt for screenshot details and, when accepted, capture the screen.

    The entry texts start out empty and ``self.save_shot()`` is called for
    the response.  On ``gtk.RESPONSE_ACCEPT`` the screen is captured to
    ``plugins/collectors/manualscreenshot/raw`` below the current working
    directory and a hidden metadata file with the same base name is written
    next to it.
    """
    self.ip_entry_text = ""
    self.port_entry_text = ""
    self.initial_entry_text = ""
    self.comment_entry_text = ""
    response = self.save_shot()
    if response != gtk.RESPONSE_ACCEPT:
        return

    # Handle pending GTK events, then wait 1 second before taking the
    # snapshot.
    while gtk.events_pending():
        gtk.main_iteration()
    sleep(1)
    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

    # Take the snapshot.
    bitmap = autopy.bitmap.capture_screen()
    savepath = os.path.join(os.getcwd(), 'plugins', 'collectors', 'manualscreenshot', 'raw')
    # Create the raw folder when it is missing so that saving the bitmap and
    # the metadata file cannot fail on a non-existent directory.
    if not os.path.isdir(savepath):
        os.makedirs(savepath)
    savefilename = self.comment_entry_text.strip().replace("-", "") + "-" + timestamp
    savefull = os.path.join(savepath, savefilename + ".png")
    bitmap.save(savefull)

    # Write the metadata file.
    # NOTE(review): the entries are not comma separated and the values are
    # not escaped, so this is not valid JSON; the layout is kept unchanged
    # because whatever reads the file is outside this block.
    fields = (
        ("ip", self.ip_entry_text),
        ("port", self.port_entry_text),
        ("initial", self.initial_entry_text),
        ("timestamp", timestamp),
        ("savepath", savefull),
        ("comment", self.comment_entry_text),
    )
    metadata = "[{\n"
    for key, value in fields:
        metadata += " \"" + key + "\" : \"" + value + "\"\n"
    metadata += "}]\n"

    # The context manager closes the file even if the write fails.
    with open(os.path.join(savepath, "." + savefilename), 'w') as meta_file:
        meta_file.write(metadata)
def refresh(self):
    """Run GTK main-loop iterations until no events are pending."""
    has_pending = gtk.events_pending
    while has_pending():
        gtk.main_iteration()
def __init__(self, useGtk=True):
    """Initialise the reactor and pick glib or gtk main-loop primitives.

    Args:
        useGtk: when true, the gtk functions are always used; when false,
            the glib context/loop is used if pygtk is recent enough.
    """
    self.context = gobject.main_context_default()
    self.loop = gobject.MainLoop()
    posixbase.PosixReactorBase.__init__(self)

    # Before pygtk 2.3.91 the glib iteration and mainloop functions did not
    # release the global interpreter lock, which broke thread and signal
    # support.
    glib_is_usable = (hasattr(gobject, "pygtk_version")
                      and gobject.pygtk_version >= (2, 3, 91))

    if useGtk or not glib_is_usable:
        import gtk
        self.__pending = gtk.events_pending
        self.__iteration = gtk.main_iteration
        self.__crash = _our_mainquit
        self.__run = gtk.main
    else:
        self.__pending = self.context.pending
        self.__iteration = self.context.iteration
        self.__crash = self.loop.quit
        self.__run = self.loop.run
# The input_add function in pygtk1 checks for objects with a
# 'fileno' method and, if present, uses the result of that method
# as the input source. The pygtk2 input_add does not do this. The
# function below replicates the pygtk1 functionality.
# In addition, pygtk maps gtk.input_add to _gobject.io_add_watch, and
# g_io_add_watch() takes different condition bitfields than
# gtk_input_add(). We use g_io_add_watch() here in case pygtk fixes this
# bug.
def popup_message(message):
"""shows popup message while executing decorated function"""
def wrap(f):
def wrapped_f(*args, **kwargs):
if gtk:
window = gtk.Window()
window.set_title('Griffith info')
window.set_position(gtk.WIN_POS_CENTER)
window.set_keep_above(True)
window.stick()
window.set_default_size(200, 50)
label = gtk.Label()
label.set_markup("""<big><b>Griffith:</b>
%s</big>""" % message)
window.add(label)
window.set_modal(True)
window.set_type_hint(gtk.gdk.WINDOW_TYPE_HINT_DIALOG)
window.show_all()
while gtk.events_pending(): # give GTK some time for updates
gtk.main_iteration()
else:
print message,
res = f(*args, **kwargs)
if gtk:
window.destroy()
else:
print ' [done]'
return res
return wrapped_f
return wrap
def _get(self, url, parent_window, decode=True):
    """Fetch *url* through a Retriever and return what was downloaded.

    Args:
        url: address to fetch; it is encoded to UTF-8 before use.
        parent_window: passed to the progress dialog and the Retriever.
        decode: when true the data is decoded from UTF-8 with 'replace'.

    Returns:
        The downloaded data, or None when ``retriever.html`` is empty or an
        IOError occurred while reading the file.
    """
    # Initialize the progress dialog once for the following search process.
    if not self.progress:
        self.progress = Progress(parent_window)

    encoded_url = url.encode('utf-8', 'replace')
    log.debug('Using url <%s>', encoded_url)
    self.progress.set_data(parent_window, _('Searching'), _('Wait a moment'), True)

    data = None
    try:
        retriever = Retriever(encoded_url, parent_window, self.progress)
        retriever.start()
        # Keep the UI alive while the retriever is still running.
        while retriever.isAlive():
            self.progress.pulse()
            if self.progress.status:
                retriever.join()
            while gtk.events_pending():
                gtk.main_iteration()
        try:
            if retriever.html:
                downloaded_path = retriever.html[0]
                with open(downloaded_path, 'rb') as ifile:
                    data = ifile.read()
                # Check for gzip compressed pages before decoding to
                # unicode.
                if len(data) > 2 and data[:2] == '\037\213':
                    data = gutils.decompress(data)
                if decode:
                    data = data.decode('utf-8', 'replace')
                os.remove(downloaded_path)
        except IOError:
            log.exception('')
    finally:
        self.progress.hide()
    urlcleanup()
    return data
def search(self, parent_window):
    """Search Amazon for ``self.title`` through the Amazon Web API.

    The base search functionality is not used.

    Args:
        parent_window: passed on to the AmazonRetriever.

    Returns:
        False when the access key or secret key is not configured,
        otherwise ``self.page`` (assigned from ``retriever.result`` when the
        retrieval succeeds).
    """
    locale_codes = {
        '1': 'uk', 'UK': 'uk',
        '2': 'de', 'DE': 'de',
        '3': 'ca', 'CA': 'ca',
        '4': 'fr', 'FR': 'fr',
        '5': 'jp', 'JP': 'jp',
    }
    self.titles = [""]
    self.ids = [""]
    # Bound up front so the error handler below can always refer to it.
    retriever = None
    try:
        accesskey = self.config.get('amazon_accesskey', None, section='extensions')
        secretkey = self.config.get('amazon_secretkey', None, section='extensions')
        if not accesskey or not secretkey:
            gutils.error(_('Please configure your Amazon Access Key ID and Secret Key correctly in the preferences dialog.'))
            return False
        amazon.setLicense(accesskey, secretkey)

        # Any value that is not a known code falls back to None.
        locale = locale_codes.get(
            self.config.get('amazon_locale', 0, section='extensions'))

        retriever = AmazonRetriever(self.title, locale, parent_window, self.progress)
        retriever.start()
        while retriever.isAlive():
            self.progress.pulse()
            while gtk.events_pending():
                gtk.main_iteration()
        self.page = retriever.result
    except Exception:
        # Exception (not a bare except) so that KeyboardInterrupt and
        # SystemExit are no longer swallowed here.
        log.exception('Error retrieving results from amazon.')
        try:
            log.error(retriever.result.Request.Errors.Error.Message)
        except AttributeError:
            # Best effort only: there may be no retriever or no error
            # details attached to its result.
            pass
    return self.page
def do_events(self):
    """Process any outstanding gtk events."""
    has_pending = gtk.events_pending
    while has_pending():
        # False is passed positionally, exactly as before.
        gtk.main_iteration(False)
def run(self):
    """Initialise gdk threading and the D-Bus main loop, then iterate forever.

    This method does not return: it keeps calling ``gtk.main_iteration()``.
    """
    gtk.gdk.threads_init()
    DBusGMainLoop(set_as_default=True)
    # The instance is bound to a local name although it is not used again;
    # presumably this keeps the service object alive for the loop below.
    service = HMIService()
    while 1:
        gtk.main_iteration()
def do_dmrmarc(self, do_import):
    """Query DMR-MARC and import the result or open it as a live radio.

    Args:
        do_import: when true the result is imported into the current
            editor set; otherwise a read-only DMRMARCRadio is opened.
    """
    self.window.set_cursor(gtk.gdk.Cursor(gtk.gdk.WATCH))
    try:
        if not self.do_dmrmarc_prompt():
            return

        city = CONF.get("city", "dmrmarc")
        state = CONF.get("state", "dmrmarc")
        country = CONF.get("country", "dmrmarc")

        # Do this in case the import process is going to take a while
        # to make sure we process events leading up to this
        gtk.gdk.window_process_all_updates()
        while gtk.events_pending():
            gtk.main_iteration(False)

        if do_import:
            eset = self.get_current_editorset()
            dmrmarcstr = "dmrmarc://%s/%s/%s" % (city, state, country)
            eset.do_import(dmrmarcstr)
        else:
            try:
                from chirp import dmrmarc
                radio = dmrmarc.DMRMARCRadio(None)
                radio.set_params(city, state, country)
                self.do_open_live(radio, read_only=True)
            except errors.RadioError as e:
                common.show_error(e)
    finally:
        # Restore the default cursor on every exit path, including an
        # unexpected exception, so the busy cursor cannot get stuck.
        self.window.set_cursor(None)
def do_rfinder(self, do_import):
    """Query RFinder and import the result or open it as a live radio.

    Args:
        do_import: when true the result is imported into the current
            editor set; otherwise a read-only RFinderRadio is opened.
    """
    self.window.set_cursor(gtk.gdk.Cursor(gtk.gdk.WATCH))
    try:
        if not self.do_rfinder_prompt():
            return

        lat = CONF.get_float("Latitude", "rfinder")
        lon = CONF.get_float("Longitude", "rfinder")
        passwd = CONF.get("Password", "rfinder")
        email = CONF.get("Email", "rfinder")
        miles = CONF.get_int("Range_in_Miles", "rfinder")

        # Do this in case the import process is going to take a while
        # to make sure we process events leading up to this
        gtk.gdk.window_process_all_updates()
        while gtk.events_pending():
            gtk.main_iteration(False)

        if do_import:
            eset = self.get_current_editorset()
            rfstr = "rfinder://%s/%s/%f/%f/%i" % \
                (email, passwd, lat, lon, miles)
            # The value returned by do_import was never used, so it is no
            # longer bound to a name.
            eset.do_import(rfstr)
        else:
            from chirp.drivers import rfinder
            radio = rfinder.RFinderRadio(None)
            radio.set_params((lat, lon), miles, email, passwd)
            self.do_open_live(radio, read_only=True)
    finally:
        # Restore the default cursor on every exit path, including an
        # unexpected exception, so the busy cursor cannot get stuck.
        self.window.set_cursor(None)
def do_radioreference(self, do_import):
    """Query RadioReference and import the result or open it live.

    Args:
        do_import: when true the result is imported into the current
            editor set; otherwise a read-only RadioReferenceRadio is
            opened.
    """
    self.window.set_cursor(gtk.gdk.Cursor(gtk.gdk.WATCH))
    try:
        if not self.do_radioreference_prompt():
            return

        username = CONF.get("Username", "radioreference")
        passwd = CONF.get("Password", "radioreference")
        zipcode = CONF.get("Zipcode", "radioreference")

        # Do this in case the import process is going to take a while
        # to make sure we process events leading up to this
        gtk.gdk.window_process_all_updates()
        while gtk.events_pending():
            gtk.main_iteration(False)

        if do_import:
            eset = self.get_current_editorset()
            rrstr = "radioreference://%s/%s/%s" % (zipcode, username, passwd)
            # The value returned by do_import was never used, so it is no
            # longer bound to a name.
            eset.do_import(rrstr)
        else:
            try:
                from chirp import radioreference
                radio = radioreference.RadioReferenceRadio(None)
                radio.set_params(zipcode, username, passwd)
                self.do_open_live(radio, read_only=True)
            except errors.RadioError as e:
                common.show_error(e)
    finally:
        # Restore the default cursor on every exit path, including an
        # unexpected exception, so the busy cursor cannot get stuck.
        self.window.set_cursor(None)
def grind(self):
    """Handle pending GTK events, then pulse ``self.prog`` once."""
    has_pending = gtk.events_pending
    while has_pending():
        gtk.main_iteration(False)
    self.prog.pulse()
def open_page(self, parent_window=None, url=None):
    """Fetch a page through a Retriever and return its content.

    Args:
        parent_window: when not None it replaces ``self.parent_window``.
        url: address to fetch; ``self.url`` is used when it is None, and in
            that case the result is also stored in ``self.page``.

    Returns:
        The page data (decoded with ``self.encode`` when that is set), or
        None when nothing could be read.
    """
    url_to_fetch = self.url if url is None else url
    if parent_window is not None:
        self.parent_window = parent_window

    self.progress.set_data(parent_window, _("Fetching data"), _("Wait a moment"), False)
    retriever = Retriever(url_to_fetch, self.parent_window, self.progress, useurllib2=self.useurllib2)
    retriever.start()
    # Keep the UI alive while the retriever is still running.
    while retriever.isAlive():
        self.progress.pulse()
        if self.progress.status:
            retriever.join()
        while gtk.events_pending():
            gtk.main_iteration()

    data = None
    try:
        if retriever.exception is not None:
            gutils.urllib_error(_("Connection error"), self.parent_window)
        elif retriever.html:
            with open(retriever.html[0], "rb") as ifile:
                data = ifile.read()
            # Check for gzip compressed pages before decoding to unicode.
            if len(data) > 2 and data[:2] == '\037\213':
                data = gutils.decompress(data)
            try:
                # Try to decode it strictly first.
                if self.encode:
                    data = data.decode(self.encode)
            except UnicodeDecodeError as exc:
                # Something is wrong, perhaps a wrong character set, or the
                # page is not as strict as it should be (OFDb, for example,
                # mixes utf8 with iso8859-1).  The error is logged so it
                # can be found, but the program should not terminate.
                log.error(exc)
                data = data.decode(self.encode, 'ignore')
    except IOError:
        log.exception('')

    if url is None:
        self.page = data
    urlcleanup()
    return data
def open_search(self, parent_window, destination=None):
    """Run the search request and load the result into ``self.page``.

    Args:
        parent_window: passed to the progress dialog, the Retriever and the
            connection error dialog.
        destination: optional target file handed to the Retriever; when
            given, the content is not read back.

    Returns:
        False on a connection error or when ``retriever.html`` is empty,
        True otherwise.
    """
    self.titles = [""]
    self.ids = [""]

    # Build the final URL: fill the placeholder, or append the title unless
    # the search is done with a POST request.
    if self.url.find('%s') > 0:
        self.url = self.url % self.title
        self.url = self.url.replace(' ', '%20')
    elif not self.usepostrequest:
        self.url = (self.url + self.title).replace(' ', '%20')

    try:
        url = self.url.encode(self.encode)
    except UnicodeEncodeError:
        url = self.url.encode('utf-8')

    self.progress.set_data(parent_window, _("Searching"), _("Wait a moment"), True)
    if self.usepostrequest:
        postdata = self.get_postdata()
        retriever = Retriever(url, parent_window, self.progress, destination, useurllib2=self.useurllib2, postdata=postdata)
    else:
        retriever = Retriever(url, parent_window, self.progress, destination, useurllib2=self.useurllib2)
    retriever.start()
    # Keep the UI alive while the retriever is still running.
    while retriever.isAlive():
        self.progress.pulse()
        if self.progress.status:
            retriever.join()
        while gtk.events_pending():
            gtk.main_iteration()

    try:
        if retriever.exception is not None:
            self.progress.hide()
            gutils.urllib_error(_("Connection error"), parent_window)
            return False
        if destination:
            # The caller gave an explicit destination file and does not
            # care about the content.
            return True
        if not retriever.html:
            return False

        with open(retriever.html[0], 'rb') as ifile:
            self.page = ifile.read()
        # Check for gzip compressed pages before decoding to unicode.
        if len(self.page) > 2 and self.page[:2] == '\037\213':
            self.page = gutils.decompress(self.page)
        self.page = self.page.decode(self.encode, 'replace')
    except IOError:
        log.exception('')
    finally:
        urlcleanup()
    return True
def on_terminal_keypress(self, widget, event, *args):
    """Handle a key press in a terminal widget according to ``shortcuts``.

    Args:
        widget: the terminal widget that received the key press.
        event: the key event; its name is looked up in ``shortcuts``.
        *args: not used by the body.

    Returns:
        True when the key is a known shortcut, False otherwise.
    """
    key_name = get_key_name(event)
    if key_name not in shortcuts:
        return False

    cmd = shortcuts[key_name]
    if type(cmd) != list:
        # User-defined command: feed it to the terminal.
        widget.feed_child(cmd)
        return True

    # Predefined commands.
    if cmd == _COPY:
        self.terminal_copy(widget)
    elif cmd == _PASTE:
        self.terminal_paste(widget)
    elif cmd == _COPY_ALL:
        self.terminal_copy_all(widget)
    elif cmd == _SAVE:
        self.show_save_buffer(widget)
    elif cmd == _FIND:
        search_box = self.get_widget('txtSearch')
        search_box.select_region(0, -1)
        self.get_widget('txtSearch').grab_focus()
    elif cmd == _FIND_NEXT:
        if hasattr(self, 'search'):
            self.find_word()
    elif cmd == _CLEAR:
        widget.reset(True, True)
    elif cmd == _FIND_BACK:
        if hasattr(self, 'search'):
            self.find_word(backwards=True)
    elif cmd == _CONSOLE_PREV:
        widget.get_parent().get_parent().prev_page()
    elif cmd == _CONSOLE_NEXT:
        widget.get_parent().get_parent().next_page()
    elif cmd == _CONSOLE_CLOSE:
        wid = widget.get_parent()
        # The grandparent holds the pages (presumably a gtk.Notebook).
        notebook = widget.get_parent().get_parent()
        page = notebook.page_num(wid)
        if page != -1:
            widget.get_parent().get_parent().remove_page(page)
            wid.destroy()
    elif cmd == _CONSOLE_RECONNECT:
        if hasattr(widget, "command"):
            widget.fork_command(widget.command[0], widget.command[1])
            while gtk.events_pending():
                gtk.main_iteration(False)
            # Wait 2 seconds before sending the password, to give expect
            # time to start and to prevent the password from being shown.
            if widget.command[2] != None and widget.command[2] != '':
                gobject.timeout_add(2000, self.send_data, widget, widget.command[2])
        else:
            widget.fork_command(SHELL)
        tab_child = widget.get_parent()
        widget.get_parent().get_parent().get_tab_label(tab_child).mark_tab_as_active()
        return True
    elif cmd == _CONNECT:
        self.on_btnConnect_clicked(None)
    elif cmd[0][0:8] == "console_":
        # "console_<n>" switches to page n (1-based in the shortcut).
        page = int(cmd[0][8:]) - 1
        widget.get_parent().get_parent().set_current_page(page)
    return True
def do_repeaterbook_proximity(self, do_import):
    """Run a RepeaterBook proximity query and import or open the result.

    The query result is downloaded to a temporary CSV file, validated by
    building an RBRadio from it, and then either imported into the current
    editor set or opened as a read-only live radio.

    Args:
        do_import: when true the CSV is imported; otherwise the radio is
            opened live.
    """
    self.window.set_cursor(gtk.gdk.Cursor(gtk.gdk.WATCH))
    if not self.do_repeaterbook_proximity_prompt():
        self.window.set_cursor(None)
        return

    loc = CONF.get("location", "repeaterbook")

    # int() raises TypeError/ValueError for a missing or malformed setting;
    # fall back to the defaults in that case only.
    try:
        dist = int(CONF.get("distance", "repeaterbook"))
    except (TypeError, ValueError):
        dist = 20

    try:
        # A band of 0 is replaced by the '%' wildcard.
        band = int(CONF.get("band", "repeaterbook")) or '%'
        band = str(band)
    except (TypeError, ValueError):
        band = '%'

    query = "https://www.repeaterbook.com/repeaters/downloads/CHIRP/" \
            "app_direct.php?loc=%s&band=%s&dist=%s" % (loc, band, dist)
    print(query)

    # Do this in case the import process is going to take a while
    # to make sure we process events leading up to this
    gtk.gdk.window_process_all_updates()
    while gtk.events_pending():
        gtk.main_iteration(False)

    # NOTE(review): tempfile.mktemp is race-prone, but the existence check
    # below relies on the file not being created in advance, so it is kept.
    fn = tempfile.mktemp(".csv")
    filename, headers = urllib.urlretrieve(query, fn)
    if not os.path.exists(filename):
        LOG.error("Failed, headers were: %s", headers)
        common.show_error(_("RepeaterBook query failed"))
        self.window.set_cursor(None)
        return

    try:
        # Validate CSV
        radio = repeaterbook.RBRadio(filename)
        if radio.errors:
            reporting.report_misc_error("repeaterbook",
                                        ("query=%s\n" % query) +
                                        ("\n") +
                                        ("\n".join(radio.errors)))
    except errors.InvalidDataError as e:
        common.show_error(str(e))
        self.window.set_cursor(None)
        return
    except Exception:
        # No usable radio object exists at this point.  Falling through
        # used to raise NameError on ``radio`` and leave the busy cursor
        # on, so log, tell the user and stop here instead.
        common.log_exception()
        common.show_error(_("RepeaterBook query failed"))
        self.window.set_cursor(None)
        return

    reporting.report_model_usage(radio, "import", True)

    self.window.set_cursor(None)
    if do_import:
        eset = self.get_current_editorset()
        eset.do_import(filename)
    else:
        self.do_open_live(radio, read_only=True)