我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用gi.repository.GObject.threads_init()。
def main(): format_ = '%(asctime)s - %(levelname)s - %(name)s - %(message)s' logging.basicConfig(level=logging.INFO, format=format_) logger = logging.getLogger(__name__) if geteuid() == 0: logger.error("Running eduVPN client as root is not supported (yet)") exit(1) GObject.threads_init() if have_dbus(): import dbus.mainloop.glib dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) # import this later so the logging is properly configured from eduvpn.ui import EduVpnApp edu_vpn_app = EduVpnApp() edu_vpn_app.run() Gtk.main()
def __init__(self, device_id=None): """Default initialiser. 1. Initialises the program loop using ``GObject``. 2. Registers the Application on the D-Bus. 3. Initialises the list of services offered by the application. """ # Initialise the loop that the application runs in GObject.threads_init() dbus.mainloop.glib.threads_init() dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) self.mainloop = GObject.MainLoop() # Initialise the D-Bus path and register it self.bus = dbus.SystemBus() self.path = '/ukBaz/bluezero/application{}'.format(id(self)) self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus) dbus.service.Object.__init__(self, self.bus_name, self.path) # Initialise services within the application self.services = [] self.dongle = adapter.Adapter(device_id)
def run(self): GObject.threads_init() dbus.mainloop.glib.threads_init() dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus_name = dbus.service.BusName( "com.spoppy", dbus.SessionBus() ) super(SpoppyDBusService, self).__init__( bus_name, "/com/spoppy" ) self._loop = GObject.MainLoop() while self.running: try: logger.debug('Starting dbus loop') self._loop.run() except KeyboardInterrupt: logger.debug('Loop interrupted, will restart')
def __init__(self, config=None): self.config_server_url = config["url"] self.port = config["port"] self.python = config["python"] self.receive = config["receive"] self.name = config["name"] self.key = config["key"] self.record = config["record"] self.upnp_client = miniupnpc.UPnP() self.connection_attempts = 0 self.receive_from_ip = config["receive_from_ip"] self.receive_from_port = config["receive_from_port"] atexit.register(self.cleanup) signal.signal(signal.SIGINT, self.cleanup) GObject.threads_init() Gst.init(None)
def __init__( self ): GObject.threads_init() self.config = SafeConfigParser(allow_no_value=True) config_base_path = os.path.expanduser('~/.config/gstation-edit') if not os.path.isdir(config_base_path): os.makedirs(config_base_path) self.config_path = os.path.join(config_base_path, 'settings.cfg') self.config.read(self.config_path) gtk_builder_file = os.path.join('gstation_edit/resources', 'gstation-edit-one-window.ui') self.gtk_builder = Gtk.Builder() self.gtk_builder.add_from_file(gtk_builder_file) self.js_sniffer = JStationSniffer(sys.argv[0]) self.midi_dlg = MidiSelectDlg(self.gtk_builder, self.config, self, self.js_sniffer, self.on_connected, self.quit) self.midi_dlg.gtk_dlg.connect('destroy', self.quit)
def __init__( self ): GObject.threads_init() self.config = SafeConfigParser(allow_no_value=True) config_base_path = os.path.expanduser('~/.config/gstation-edit') if not os.path.isdir(config_base_path): os.makedirs(config_base_path) self.config_path = os.path.join(config_base_path, 'settings.cfg') self.config.read(self.config_path) gtk_builder_file = os.path.join(DATA_ROOT_DIR, 'gstation-edit-one-window.ui') self.gtk_builder = Gtk.Builder() self.gtk_builder.add_from_file(gtk_builder_file) self.main_window = MainWindow(sys.argv[0], self.config, self.gtk_builder) self.main_window.gtk_window.connect('destroy', self.quit)
def main(): GObject.threads_init() Gst.init([]) args = get_args() core_ip = socket.gethostbyname(args.host) server_caps, args = get_server_conf(core_ip, args.source_id, args) pipeline = mk_pipeline(args, server_caps, core_ip) clock = get_clock(core_ip) run_pipeline(pipeline, clock, args.audio_delay, args.video_delay)
def main(): if dbus.SessionBus().request_name( 'es.atareao.PomodoroIndicator') !=\ dbus.bus.REQUEST_NAME_REPLY_PRIMARY_OWNER: print("application already running") exit(0) GObject.threads_init() Gst.init(None) Gst.init_check(None) Notify.init('pomodoro-indicator') Pomodoro_Indicator() Gtk.main()
def run(): Game() GObject.threads_init() Gtk.main() return 0
def __init__(self): GObject.threads_init() self._loop = GObject.MainLoop() self._player = Player()
def main(): config.load() GObject.threads_init() main_win = MainWindow() thread = UpdateThread(main_win) thread.start() signal.signal(signal.SIGINT, signal.SIG_DFL) Gtk.main()
def __init__(self, uuid): Budgie.Applet.__init__(self) self.app = WsOverviewWin() GObject.threads_init() self.button = self.app.appbutton self.add(self.button) self.show_all()
def run(self): GObject.threads_init() Gdk.threads_init() Gtk.main() gtkthread.quit.set()
def do_activate(self): GObject.threads_init() gettext.install('easy-ebook-viewer', '/usr/share/easy-ebook-viewer/locale') # We only allow a single window and raise any existing ones if not self.window: # Windows are associated with the application # when the last one is closed the application shuts down self.window = MainWindow(file_path=self.file_path) self.window.connect("delete-event", self.on_quit) self.window.set_wmclass("easy-ebook-viewer", "easy-ebook-viewer") self.window.show_all() if not self.window.book_loaded: self.window.header_bar_component.hide_jumping_navigation() Gtk.main()
def run(self): GObject.threads_init() self.fsmonitor.start() self.run_tests_in_background() gtk.main()
def initialize(self): """Initialize bluez DBus communication. Must be called before any other calls are made! """ # Ensure GLib's threading is initialized to support python threads, and # make a default mainloop that all DBus objects will inherit. These # commands MUST execute before any other DBus commands! GObject.threads_init() dbus.mainloop.glib.threads_init() # Set the default main loop, this also MUST happen before other DBus calls. self._mainloop = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) # Get the main DBus system bus and root bluez object. self._bus = dbus.SystemBus() self._bluez = dbus.Interface(self._bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
def run(self): try: time.sleep(10) # Give WiFi a chance to connect signal.signal(signal.SIGINT, signal.SIG_DFL) self.indicator.set_menu(self.__menu()) self.refresh_source = GObject.timeout_add(REFRESH_RATES[0] * 60 * 1000, self.__update_price) self.__update_price() GObject.threads_init() gtk.main() except Exception: pass
def start_glib_loop(cls): """ If your program does not use the GLib loop, call this first. It initializes threads, GStreamer, and starts the GLib loop in a separate thread. """ GObject.threads_init() Gst.init(None) cls.glib_loop = GObject.MainLoop() cls.glib_thread = threading.Thread(target=cls.glib_loop.run) cls.glib_thread.start()
def __init__(self, obj, obj_hook): GObject.threads_init() Gst.init(None) self.obj = obj self.obj_hook = obj_hook self.state = Gst.State.NULL self.pipeline = Gst.Pipeline()
def __init__(self, **kwargs): self.ui = None GObject.threads_init() listen() Gtk.Application.__init__(self, application_id='com.github.geigi.cozy') GLib.setenv("PULSE_PROP_media.role", "music", True) import gettext locale.bindtextdomain('cozy', localedir) locale.textdomain('cozy') gettext.install('cozy', localedir)
def main(): init_i18n() # threads_init isn't required since PyGObject 3.10.2, but just in case # we're on something ancient... GObject.threads_init() app = EmuApplication() app.run(sys.argv)
def __init__(self, uuid): Budgie.Applet.__init__(self) self.uuid = uuid self.connect("destroy", Gtk.main_quit) self.settings = cw.settings icon = Gtk.Image.new_from_icon_name( "budgie-clockworks-panel", Gtk.IconSize.MENU ) self.provider = Gtk.CssProvider.new() self.provider.load_from_data(css_data.encode()) # maingrid self.maingrid = Gtk.Grid() self.maingrid.set_row_spacing(2) self.maingrid.attach(Gtk.Label(" " * 10), 0, 0, 1, 1) self.maingrid.set_column_spacing(5) self.clocklist = {} # create initial clock if it does not exists currcl_data = self.read_datafile() if currcl_data: for cl in currcl_data: off = cl[1] clname = cl[2] self.create_newclock(offset=off, clockname=clname) else: self.create_newclock(offset=0) self.dashbuttonbox = Gtk.Box() self.add_button = Gtk.Button() self.add_button.set_relief(Gtk.ReliefStyle.NONE) self.add_icon = Gtk.Image.new_from_icon_name( "list-add-symbolic", Gtk.IconSize.MENU ) self.add_button.set_image(self.add_icon) self.search_button = Gtk.Button() self.search_button.set_relief(Gtk.ReliefStyle.NONE) self.search_icon = Gtk.Image.new_from_icon_name( "system-search-symbolic", Gtk.IconSize.MENU ) self.search_button.set_image(self.search_icon) self.add_button.connect("clicked", self.create_newclock) self.search_button.connect("clicked", self.run_search) self.maingrid.attach(self.add_button, 100, 3, 1, 1) self.maingrid.attach(self.search_button, 100, 4, 1, 1) # throw it in popover self.box = Gtk.EventBox() self.box.add(icon) self.add(self.box) self.popover = Budgie.Popover.new(self.box) self.popover.add(self.maingrid) self.popover.get_child().show_all() self.box.show_all() self.show_all() self.box.connect("button-press-event", self.on_press) self.settings.connect("changed", self.update_clockcolor) # thread GObject.threads_init() self.update = Thread(target=self.update_gmt) # daemonize the thread to make the indicator stopable self.update.setDaemon(True) self.update.start()
def __init__(self, id, name, address, update_rate, number_of_pings, show_indicator, show_text, is_activated=None): """Initialize. :param id: :param address: :param update_rate: :param number_of_pings: :param show_indicator: :param is_activated: """ # init gobject GObject.GObject.__init__(self) GObject.type_register(PingObject) GObject.threads_init() # ping object properties self.id = id self.name = name if not self.name: self.name = address self.address = address self.update_rate = update_rate self.number_of_pings = number_of_pings self.show_indicator = show_indicator self.show_text = show_text if is_activated is None: self.is_activated = True else: self.is_activated = is_activated self.icon = "icon_red" if not self.is_activated: self.icon = "icon_grey" self.ping_warning = 50.0 # indicator menu item self.menu_item = gtk.ImageMenuItem("Ping: " + address) # result self.result = PingStruct(RESULT_NO_RESPONSE, 0.0, 0.0, 0.0, 0.0) # gtk image for menu item self.image = gtk.Image() self.image.set_from_file(resource.image_path("icon_gray", theme.THEME)) # ping state self.state = "Ping: " + self.address + " initializing." # store subprocess self.process = None # mutex self.mutex = threading.Lock() # thread self.thread = None self.is_running = False # interruptable sleep function self.stop_event = threading.Event() # current time self.time_last = time.time() # signal definition returns (id, name, icon name, show_indicator)
def __init__(self): """Initialize the indicator. """ # initialize gobject GObject.GObject.__init__(self) GObject.type_register(AnyPingIndicator) GObject.threads_init() # initialize mutex self.mutex = threading.Lock() # icon counter to get different icon names self.icon_count = 0 # list of icon tuples self.list_of_icon_tuple = [] # get ping objects from config self.ping_objects_tuple = config.ping_object_tuples self.ping_objects = [] count = 0 for item in self.ping_objects_tuple: self.ping_objects.append(PingObject(count, item.name, item.address, item.update_rate, item.number_of_pings, item.show_indicator, item.show_text, item.is_activated)) self.ping_objects[count].set_ping_warning(config.ping_warning) count += 1 # update list of icon tuples self.update_list_of_icon_tuples() # init windows variables self.preferences_window = None self.about_dialog = None # check autostart self.check_autostart() # initialize notification notify.init(APPINDICATOR_ID) # initialize and build indicator menu self.menu = gtk.Menu() self.build_menu() # initialize indicator self.indicator = appindicator.Indicator.new(APPINDICATOR_ID, resource.image_path("icon_red", theme.THEME), appindicator.IndicatorCategory.SYSTEM_SERVICES) self.indicator.set_status(appindicator.IndicatorStatus.ACTIVE) self.indicator.set_menu(self.menu) # update indicator icon self.update_indicator_icon() # start ping objects self.start_ping_objects()
def gst_rtp_pipeline(test_params): print("starting gstreamer rtp pipeline..") GObject.threads_init() Gst.init(None) # setup a pipeline which will receive RTP video and decode it, calling new_gst_buffer() on every decoded frame udpsrc = Gst.ElementFactory.make("udpsrc", None) udpsrc.set_property('port', test_params.rtp_port) udpsrc.set_property('caps', Gst.caps_from_string('application/x-rtp, encoding-name=H264, payload=96')) rtph264depay = Gst.ElementFactory.make("rtph264depay", None) h264parse = Gst.ElementFactory.make("h264parse", None) avdec_h264 = Gst.ElementFactory.make("avdec_h264", None) sink = Gst.ElementFactory.make("appsink", None) pipeline = Gst.Pipeline.new("rtp-pipeline") pipeline.add(udpsrc) pipeline.add(rtph264depay) pipeline.add(h264parse) pipeline.add(avdec_h264) pipeline.add(sink) udpsrc.link(rtph264depay) rtph264depay.link(h264parse) h264parse.link(avdec_h264) avdec_h264.link(sink) # gst event loop/bus loop_thread = GstMainLoopThread() sink.set_property("emit-signals", True) sink.connect("new-sample", loop_thread.gst_new_buffer, sink) bus = pipeline.get_bus() bus.add_signal_watch() bus.connect("message", loop_thread.gst_bus_call, loop_thread.loop) # start play back and listed to events pipeline.set_state(Gst.State.PLAYING) loop_thread.start() yield loop_thread print("stopping gstreamer rtp pipeline..") loop_thread.loop.quit() pipeline.set_state(Gst.State.NULL)