Python gi.repository.GObject 模块,threads_init() 实例源码

我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用gi.repository.GObject.threads_init()

项目:python-eduvpn-client    作者:eduvpn    | 项目源码 | 文件源码
def main():
    format_ = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
    logging.basicConfig(level=logging.INFO, format=format_)
    logger = logging.getLogger(__name__)

    if geteuid() == 0:
        logger.error("Running eduVPN client as root is not supported (yet)")
        exit(1)

    GObject.threads_init()

    if have_dbus():
        import dbus.mainloop.glib
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # import this later so the logging is properly configured
    from eduvpn.ui import EduVpnApp

    edu_vpn_app = EduVpnApp()
    edu_vpn_app.run()
    Gtk.main()
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self, device_id=None):
        """Default initialiser.

        1. Initialises the program loop using ``GObject``.
        2. Registers the Application on the D-Bus.
        3. Initialises the list of services offered by the application.

        """
        # Initialise the loop that the application runs in
        GObject.threads_init()
        dbus.mainloop.glib.threads_init()
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.mainloop = GObject.MainLoop()

        # Initialise the D-Bus path and register it
        self.bus = dbus.SystemBus()
        self.path = '/ukBaz/bluezero/application{}'.format(id(self))
        self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus)
        dbus.service.Object.__init__(self, self.bus_name, self.path)

        # Initialise services within the application
        self.services = []

        self.dongle = adapter.Adapter(device_id)
项目:spoppy    作者:sindrig    | 项目源码 | 文件源码
def run(self):
            GObject.threads_init()
            dbus.mainloop.glib.threads_init()
            dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
            bus_name = dbus.service.BusName(
                "com.spoppy",
                dbus.SessionBus()
            )
            super(SpoppyDBusService, self).__init__(
                bus_name, "/com/spoppy"
            )

            self._loop = GObject.MainLoop()
            while self.running:
                try:
                    logger.debug('Starting dbus loop')
                    self._loop.run()
                except KeyboardInterrupt:
                    logger.debug('Loop interrupted, will restart')
项目:matriz    作者:stressfm    | 项目源码 | 文件源码
def __init__(self, config=None):
        self.config_server_url = config["url"]
        self.port = config["port"]
        self.python = config["python"]
        self.receive = config["receive"]
        self.name = config["name"]
        self.key = config["key"]
        self.record = config["record"]
        self.upnp_client = miniupnpc.UPnP()
        self.connection_attempts = 0
        self.receive_from_ip = config["receive_from_ip"]
        self.receive_from_port = config["receive_from_port"]
        atexit.register(self.cleanup)
        signal.signal(signal.SIGINT, self.cleanup)
        GObject.threads_init()
        Gst.init(None)
项目:gstation-edit    作者:fengalin    | 项目源码 | 文件源码
def __init__( self ):
        GObject.threads_init()

        self.config = SafeConfigParser(allow_no_value=True)
        config_base_path = os.path.expanduser('~/.config/gstation-edit')
        if not os.path.isdir(config_base_path):
            os.makedirs(config_base_path)
        self.config_path = os.path.join(config_base_path, 'settings.cfg')
        self.config.read(self.config_path)


        gtk_builder_file = os.path.join('gstation_edit/resources',
                                        'gstation-edit-one-window.ui')

        self.gtk_builder = Gtk.Builder()
        self.gtk_builder.add_from_file(gtk_builder_file)

        self.js_sniffer = JStationSniffer(sys.argv[0])
        self.midi_dlg = MidiSelectDlg(self.gtk_builder, self.config, self,
                                      self.js_sniffer,
                                      self.on_connected, self.quit)
        self.midi_dlg.gtk_dlg.connect('destroy', self.quit)
项目:gstation-edit    作者:fengalin    | 项目源码 | 文件源码
def __init__( self ):
        GObject.threads_init()

        self.config = SafeConfigParser(allow_no_value=True)
        config_base_path = os.path.expanduser('~/.config/gstation-edit')
        if not os.path.isdir(config_base_path):
            os.makedirs(config_base_path)
        self.config_path = os.path.join(config_base_path, 'settings.cfg')
        self.config.read(self.config_path)

        gtk_builder_file = os.path.join(DATA_ROOT_DIR,
                                        'gstation-edit-one-window.ui')

        self.gtk_builder = Gtk.Builder()
        self.gtk_builder.add_from_file(gtk_builder_file)

        self.main_window = MainWindow(sys.argv[0], self.config, self.gtk_builder)
        self.main_window.gtk_window.connect('destroy', self.quit)
项目:voctomix-outcasts    作者:CarlFK    | 项目源码 | 文件源码
def main():
    GObject.threads_init()
    Gst.init([])

    args = get_args()
    core_ip = socket.gethostbyname(args.host)

    server_caps, args = get_server_conf(core_ip, args.source_id, args)

    pipeline = mk_pipeline(args, server_caps, core_ip)

    clock = get_clock(core_ip)

    run_pipeline(pipeline, clock, args.audio_delay, args.video_delay)
项目:pomodoro-indicator    作者:atareao    | 项目源码 | 文件源码
def main():
    if dbus.SessionBus().request_name(
        'es.atareao.PomodoroIndicator') !=\
            dbus.bus.REQUEST_NAME_REPLY_PRIMARY_OWNER:
        print("application already running")
        exit(0)
    GObject.threads_init()
    Gst.init(None)
    Gst.init_check(None)
    Notify.init('pomodoro-indicator')
    Pomodoro_Indicator()
    Gtk.main()
项目:jcchess    作者:johncheetham    | 项目源码 | 文件源码
def run():
    Game()
    GObject.threads_init()
    Gtk.main()
    return 0
项目:adblockradio    作者:quasoft    | 项目源码 | 文件源码
def __init__(self):
        GObject.threads_init()

        self._loop = GObject.MainLoop()
        self._player = Player()
项目:btcwidget    作者:rafalh    | 项目源码 | 文件源码
def main():
    config.load()
    GObject.threads_init()
    main_win = MainWindow()
    thread = UpdateThread(main_win)
    thread.start()
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    Gtk.main()
项目:budgie-extras    作者:UbuntuBudgie    | 项目源码 | 文件源码
def __init__(self, uuid):
        Budgie.Applet.__init__(self)
        self.app = WsOverviewWin()
        GObject.threads_init()
        self.button = self.app.appbutton
        self.add(self.button)
        self.show_all()
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def run(self):
        GObject.threads_init()
        Gdk.threads_init()
        Gtk.main()
        gtkthread.quit.set()
项目:Ebook-Viewer    作者:michaldaniel    | 项目源码 | 文件源码
def do_activate(self):
        GObject.threads_init()
        gettext.install('easy-ebook-viewer', '/usr/share/easy-ebook-viewer/locale')
        # We only allow a single window and raise any existing ones
        if not self.window:
            # Windows are associated with the application
            # when the last one is closed the application shuts down
            self.window = MainWindow(file_path=self.file_path)
            self.window.connect("delete-event", self.on_quit)
            self.window.set_wmclass("easy-ebook-viewer", "easy-ebook-viewer")
        self.window.show_all()
        if not self.window.book_loaded:
            self.window.header_bar_component.hide_jumping_navigation()
        Gtk.main()
项目:testindicator    作者:logileifs    | 项目源码 | 文件源码
def run(self):
        GObject.threads_init()
        self.fsmonitor.start()
        self.run_tests_in_background()
        gtk.main()
项目:Specdrums_Python_BLE    作者:specdrums    | 项目源码 | 文件源码
def initialize(self):
        """Initialize bluez DBus communication.  Must be called before any other
        calls are made!
        """
        # Ensure GLib's threading is initialized to support python threads, and
        # make a default mainloop that all DBus objects will inherit.  These
        # commands MUST execute before any other DBus commands!
        GObject.threads_init()
        dbus.mainloop.glib.threads_init()
        # Set the default main loop, this also MUST happen before other DBus calls.
        self._mainloop = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        # Get the main DBus system bus and root bluez object.
        self._bus = dbus.SystemBus()
        self._bluez = dbus.Interface(self._bus.get_object('org.bluez', '/'),
                                     'org.freedesktop.DBus.ObjectManager')
项目:Specdrums_Python_BLE    作者:specdrums    | 项目源码 | 文件源码
def initialize(self):
        """Initialize bluez DBus communication.  Must be called before any other
        calls are made!
        """
        # Ensure GLib's threading is initialized to support python threads, and
        # make a default mainloop that all DBus objects will inherit.  These
        # commands MUST execute before any other DBus commands!
        GObject.threads_init()
        dbus.mainloop.glib.threads_init()
        # Set the default main loop, this also MUST happen before other DBus calls.
        self._mainloop = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        # Get the main DBus system bus and root bluez object.
        self._bus = dbus.SystemBus()
        self._bluez = dbus.Interface(self._bus.get_object('org.bluez', '/'),
                                     'org.freedesktop.DBus.ObjectManager')
项目:cryptobar    作者:akibabu    | 项目源码 | 文件源码
def run(self):
        try:
            time.sleep(10) # Give WiFi a chance to connect 
            signal.signal(signal.SIGINT, signal.SIG_DFL)
            self.indicator.set_menu(self.__menu())
            self.refresh_source = GObject.timeout_add(REFRESH_RATES[0] * 60 * 1000, self.__update_price)
            self.__update_price()
            GObject.threads_init()
            gtk.main()
        except Exception:
            pass
项目:pyplaybin    作者:fraca7    | 项目源码 | 文件源码
def start_glib_loop(cls):
        """
        If your program does not use the GLib loop, call this
        first. It initializes threads, GStreamer, and starts the GLib
        loop in a separate thread.
        """
        GObject.threads_init()
        Gst.init(None)

        cls.glib_loop = GObject.MainLoop()
        cls.glib_thread = threading.Thread(target=cls.glib_loop.run)
        cls.glib_thread.start()
项目:KTV    作者:RainMark    | 项目源码 | 文件源码
def __init__(self, obj, obj_hook):
        GObject.threads_init()
        Gst.init(None)
        self.obj = obj
        self.obj_hook = obj_hook
        self.state = Gst.State.NULL
        self.pipeline = Gst.Pipeline()
项目:cozy    作者:geigi    | 项目源码 | 文件源码
def __init__(self, **kwargs):
        self.ui = None

        GObject.threads_init()
        listen()

        Gtk.Application.__init__(self, application_id='com.github.geigi.cozy')

        GLib.setenv("PULSE_PROP_media.role", "music", True)

        import gettext

        locale.bindtextdomain('cozy', localedir)
        locale.textdomain('cozy')
        gettext.install('cozy', localedir)
项目:python-sense-emu    作者:RPi-Distro    | 项目源码 | 文件源码
def main():
    init_i18n()
    # threads_init isn't required since PyGObject 3.10.2, but just in case
    # we're on something ancient...
    GObject.threads_init()
    app = EmuApplication()
    app.run(sys.argv)
项目:budgie-extras    作者:UbuntuBudgie    | 项目源码 | 文件源码
def __init__(self, uuid):
        Budgie.Applet.__init__(self)
        self.uuid = uuid
        self.connect("destroy", Gtk.main_quit)
        self.settings = cw.settings
        icon = Gtk.Image.new_from_icon_name(
            "budgie-clockworks-panel", Gtk.IconSize.MENU
        )
        self.provider = Gtk.CssProvider.new()
        self.provider.load_from_data(css_data.encode())
        # maingrid
        self.maingrid = Gtk.Grid()
        self.maingrid.set_row_spacing(2)
        self.maingrid.attach(Gtk.Label(" " * 10), 0, 0, 1, 1)
        self.maingrid.set_column_spacing(5)
        self.clocklist = {}
        # create initial clock if it does not exists
        currcl_data = self.read_datafile()
        if currcl_data:
            for cl in currcl_data:
                off = cl[1]
                clname = cl[2]
                self.create_newclock(offset=off, clockname=clname)
        else:
            self.create_newclock(offset=0)
        self.dashbuttonbox = Gtk.Box()
        self.add_button = Gtk.Button()
        self.add_button.set_relief(Gtk.ReliefStyle.NONE)
        self.add_icon = Gtk.Image.new_from_icon_name(
            "list-add-symbolic", Gtk.IconSize.MENU
        )
        self.add_button.set_image(self.add_icon)
        self.search_button = Gtk.Button()
        self.search_button.set_relief(Gtk.ReliefStyle.NONE)
        self.search_icon = Gtk.Image.new_from_icon_name(
            "system-search-symbolic", Gtk.IconSize.MENU
        )
        self.search_button.set_image(self.search_icon)
        self.add_button.connect("clicked", self.create_newclock)
        self.search_button.connect("clicked", self.run_search)
        self.maingrid.attach(self.add_button, 100, 3, 1, 1)
        self.maingrid.attach(self.search_button, 100, 4, 1, 1)
        # throw it in popover
        self.box = Gtk.EventBox()
        self.box.add(icon)
        self.add(self.box)
        self.popover = Budgie.Popover.new(self.box)
        self.popover.add(self.maingrid)
        self.popover.get_child().show_all()
        self.box.show_all()
        self.show_all()
        self.box.connect("button-press-event", self.on_press)
        self.settings.connect("changed", self.update_clockcolor)
        # thread
        GObject.threads_init()
        self.update = Thread(target=self.update_gmt)
        # daemonize the thread to make the indicator stopable
        self.update.setDaemon(True)
        self.update.start()
项目:any_ping_indicator    作者:leggedrobotics    | 项目源码 | 文件源码
def __init__(self, id, name, address, update_rate, number_of_pings,
                 show_indicator, show_text, is_activated=None):
        """Initialize.
        :param id:
        :param address:
        :param update_rate:
        :param number_of_pings:
        :param show_indicator:
        :param is_activated:
        """
        # init gobject
        GObject.GObject.__init__(self)
        GObject.type_register(PingObject)
        GObject.threads_init()
        # ping object properties
        self.id = id
        self.name = name
        if not self.name:
            self.name = address
        self.address = address
        self.update_rate = update_rate
        self.number_of_pings = number_of_pings
        self.show_indicator = show_indicator
        self.show_text = show_text
        if is_activated is None:
            self.is_activated = True
        else:
            self.is_activated = is_activated
        self.icon = "icon_red"
        if not self.is_activated:
            self.icon = "icon_grey"
        self.ping_warning = 50.0
        # indicator menu item
        self.menu_item = gtk.ImageMenuItem("Ping: " + address)
        # result
        self.result = PingStruct(RESULT_NO_RESPONSE, 0.0, 0.0, 0.0, 0.0)
        # gtk image for menu item
        self.image = gtk.Image()
        self.image.set_from_file(resource.image_path("icon_gray", theme.THEME))
        # ping state
        self.state = "Ping: " + self.address + " initializing."
        # store subprocess
        self.process = None
        # mutex
        self.mutex = threading.Lock()
        # thread
        self.thread = None
        self.is_running = False
        # interruptable sleep function
        self.stop_event = threading.Event()
        # current time
        self.time_last = time.time()

    # signal definition returns (id, name, icon name, show_indicator)
项目:any_ping_indicator    作者:leggedrobotics    | 项目源码 | 文件源码
def __init__(self):
        """Initialize the indicator.
        """
        # initialize gobject
        GObject.GObject.__init__(self)
        GObject.type_register(AnyPingIndicator)
        GObject.threads_init()
        # initialize mutex
        self.mutex = threading.Lock()
        # icon counter to get different icon names
        self.icon_count = 0
        # list of icon tuples
        self.list_of_icon_tuple = []
        # get ping objects from config
        self.ping_objects_tuple = config.ping_object_tuples
        self.ping_objects = []
        count = 0
        for item in self.ping_objects_tuple:
            self.ping_objects.append(PingObject(count,
                                                item.name,
                                                item.address,
                                                item.update_rate,
                                                item.number_of_pings,
                                                item.show_indicator,
                                                item.show_text,
                                                item.is_activated))
            self.ping_objects[count].set_ping_warning(config.ping_warning)
            count += 1
        # update list of icon tuples
        self.update_list_of_icon_tuples()
        # init windows variables
        self.preferences_window = None
        self.about_dialog = None
        # check autostart
        self.check_autostart()
        # initialize notification
        notify.init(APPINDICATOR_ID)
        # initialize and build indicator menu
        self.menu = gtk.Menu()
        self.build_menu()
        # initialize indicator
        self.indicator = appindicator.Indicator.new(APPINDICATOR_ID,
                resource.image_path("icon_red", theme.THEME),
                appindicator.IndicatorCategory.SYSTEM_SERVICES)
        self.indicator.set_status(appindicator.IndicatorStatus.ACTIVE)
        self.indicator.set_menu(self.menu)
        # update indicator icon
        self.update_indicator_icon()
        # start ping objects
        self.start_ping_objects()
项目:cloudcam    作者:revmischa    | 项目源码 | 文件源码
def gst_rtp_pipeline(test_params):
    print("starting gstreamer rtp pipeline..")
    GObject.threads_init()
    Gst.init(None)

    # setup a pipeline which will receive RTP video and decode it, calling new_gst_buffer() on every decoded frame
    udpsrc = Gst.ElementFactory.make("udpsrc", None)
    udpsrc.set_property('port', test_params.rtp_port)
    udpsrc.set_property('caps', Gst.caps_from_string('application/x-rtp, encoding-name=H264, payload=96'))
    rtph264depay = Gst.ElementFactory.make("rtph264depay", None)
    h264parse = Gst.ElementFactory.make("h264parse", None)
    avdec_h264 = Gst.ElementFactory.make("avdec_h264", None)
    sink = Gst.ElementFactory.make("appsink", None)

    pipeline = Gst.Pipeline.new("rtp-pipeline")
    pipeline.add(udpsrc)
    pipeline.add(rtph264depay)
    pipeline.add(h264parse)
    pipeline.add(avdec_h264)
    pipeline.add(sink)

    udpsrc.link(rtph264depay)
    rtph264depay.link(h264parse)
    h264parse.link(avdec_h264)
    avdec_h264.link(sink)

    # gst event loop/bus
    loop_thread = GstMainLoopThread()

    sink.set_property("emit-signals", True)
    sink.connect("new-sample", loop_thread.gst_new_buffer, sink)

    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", loop_thread.gst_bus_call, loop_thread.loop)

    # start play back and listed to events
    pipeline.set_state(Gst.State.PLAYING)
    loop_thread.start()

    yield loop_thread

    print("stopping gstreamer rtp pipeline..")
    loop_thread.loop.quit()
    pipeline.set_state(Gst.State.NULL)