我们从 Python 开源项目中提取了以下 36 个代码示例，用于说明如何使用 gi.repository.GObject.MainLoop()。
def __init__(self, device_id=None):
    """Set up the main loop, the D-Bus registration and the service list.

    Steps performed, in order:

    1. Initialise GLib/D-Bus threading, install the GLib main loop as the
       D-Bus default and create the ``GObject`` main loop.
    2. Export this object on the system bus under the ``ukBaz.bluezero``
       bus name.
    3. Start with an empty list of services and create the adapter.

    :param device_id: passed unchanged to ``adapter.Adapter``.
    """
    # Main loop the application runs in.
    GObject.threads_init()
    dbus.mainloop.glib.threads_init()
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.mainloop = GObject.MainLoop()

    # The object path is made unique per instance by appending id(self).
    self.bus = dbus.SystemBus()
    self.path = '/ukBaz/bluezero/application' + str(id(self))
    self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus)
    dbus.service.Object.__init__(self, self.bus_name, self.path)

    # Nothing is offered until services are added to this list.
    self.services = []
    self.dongle = adapter.Adapter(device_id)
def run(self):
    """Export the service on the session bus and keep its loop running.

    The object is registered as ``/com/spoppy`` under the ``com.spoppy``
    bus name. The GLib main loop is then (re)started for as long as
    ``self.running`` is truthy; a ``KeyboardInterrupt`` only restarts it.
    """
    GObject.threads_init()
    dbus.mainloop.glib.threads_init()
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    session_bus = dbus.SessionBus()
    service_name = dbus.service.BusName("com.spoppy", session_bus)
    super(SpoppyDBusService, self).__init__(service_name, "/com/spoppy")

    self._loop = GObject.MainLoop()
    while self.running:
        try:
            logger.debug('Starting dbus loop')
            self._loop.run()
        except KeyboardInterrupt:
            # Not fatal: the while condition decides whether to go again.
            logger.debug('Loop interrupted, will restart')
def main():
    """Start the Bluetooth and IBUS services and block in the GLib main loop.

    The IBUS service runs in a daemon thread while the main thread runs the
    ``GObject`` main loop. Ctrl-C ends the loop quietly; any other error is
    reported on stdout. ``shutdown()`` always runs before the process exits
    with status 0.
    """
    global bluetooth
    global ibus

    bluetooth = bt_.BluetoothService(onBluetoothConnected, onPlayerChanged)

    ibus = ibus_.IBUSService(onIBUSready, onIBUSpacket)
    ibus.cmd = ibus_.IBUSCommands(ibus)

    # Daemon thread so it cannot keep the process alive once main() exits.
    ibus.main_thread = threading.Thread(target=ibus.start)
    ibus.main_thread.daemon = True
    ibus.main_thread.start()

    try:
        mainloop = GObject.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the program.
        pass
    except Exception as exc:
        # Catch Exception rather than everything, so SystemExit and friends
        # are not misreported as a main loop failure.
        print("Unable to run the gobject main loop")
        print(exc)
        print("")
    finally:
        # Clean up even when a non-Exception error propagates.
        shutdown()

    sys.exit(0)
def main():
    """Start the IBUS service and block in the GLib main loop.

    The IBUS service runs in a daemon thread while the main thread runs the
    ``GObject`` main loop. Ctrl-C ends the loop quietly; any other error is
    reported on stdout. ``shutdown()`` always runs before the process exits
    with status 0.
    """
    global ibus

    ibus = ibus_.IBUSService(onIBUSready, onIBUSpacket)
    ibus.cmd = ibus_.IBUSCommands(ibus)

    # Daemon thread so it cannot keep the process alive once main() exits.
    ibus.main_thread = threading.Thread(target=ibus.start)
    ibus.main_thread.daemon = True
    ibus.main_thread.start()

    try:
        mainloop = GObject.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the program.
        pass
    except Exception as exc:
        # Catch Exception rather than everything, so SystemExit and friends
        # are not misreported as a main loop failure.
        print("Unable to run the gobject main loop")
        print(exc)
        print("")
    finally:
        # Clean up even when a non-Exception error propagates.
        shutdown()

    sys.exit(0)
def run(self):
    """Register with D-Bus and block in the main loop until it ends.

    Does nothing when a main loop already exists. Otherwise the loop is
    created, ``_disconnect_all()`` and ``_register()`` are called, and the
    loop runs. On exit, however it happens, ``_unregister()`` is called and
    the loop is quit and dropped.
    """
    if self._main_loop:
        return

    self._main_loop = GObject.MainLoop()
    self._disconnect_all()
    self._register()

    logger.info("--- Mainloop started ---")
    try:
        self._main_loop.run()
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop; fall through to the cleanup.
        pass
    except Exception as err:
        logger.error(err)
    finally:
        logger.info("--- Mainloop finished ---")
        self._unregister()
        self._main_loop.quit()
        self._main_loop = None
def main():
    """Parse the adapter name, start advertising and the GATT server, then loop.

    The optional ``-a`` / ``--adapter-name`` argument (default: empty string)
    is handed to both ``advertising_main`` and ``gatt_server_main`` together
    with the system bus and the shared main loop.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '-a', '--adapter-name',
        type=str,
        help='Adapter name',
        default='',
    )
    adapter_name = arg_parser.parse_args().adapter_name

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    loop = GObject.MainLoop()

    advertising.advertising_main(loop, system_bus, adapter_name)
    gatt_server.gatt_server_main(loop, system_bus, adapter_name)

    loop.run()
def __init__(self):
    """Initialise GObject threading, then create the main loop and the player."""
    GObject.threads_init()

    self._loop = GObject.MainLoop()
    self._player = Player()
def pair(args):
    """Pair with the device given on the command line.

    Pairing is asynchronous: the request is started through
    ``DeviceManager.pair_device`` and a main loop runs until either the
    success or the error callback stops it. The outcome is reported through
    ``format_results`` as a dict with ``result`` and ``code`` keys.

    Args:
        args: parsed command-line arguments; ``args.device`` identifies the
            device and is handed to ``DeviceManager``.

    Returns:
        None. The result is emitted by ``format_results``, not returned.
    """
    authentication_errors = (
        'org.bluez.Error.AuthenticationCanceled',
        'org.bluez.Error.AuthenticationFailed',
        'org.bluez.Error.AuthenticationRejected',
        'org.bluez.Error.AuthenticationTimeout',
    )

    def success():
        # Trust and connect once paired; always stop the loop afterwards.
        try:
            device_manager.trust_device()
            device_manager.connect_device()
            format_results({'result': 'Success', 'code': ''})
        finally:
            mainloop.quit()

    def error(err):
        # Map the D-Bus error name to a result code. The branches are
        # exclusive: previously the timeout code was always overwritten by
        # the final ``else`` of a second, independent ``if``.
        try:
            # ``args.device`` replaces an undefined ``self.dev`` reference
            # (this is a plain function, there is no ``self`` in scope).
            if err == 'org.freedesktop.DBus.Error.NoReply' and args.device:
                code = 'Timeout'
                device_manager.cancel_device()
            elif err in authentication_errors:
                code = 'AuthenticationError'
            else:
                code = 'CreatingDeviceFailed'
            format_results({'result': 'Error', 'code': code})
        finally:
            mainloop.quit()

    mainloop = MainLoop()
    device_manager = DeviceManager(args.device)
    device_manager.pair_device(success, error)
    mainloop.run()
def scan_devices(duration=10):
    """Run a Bluetooth discovery scan for a fixed amount of time.

    Discovery is started on the adapter returned by
    ``DeviceManager().find_adapter()`` and a main loop then runs with a
    timeout callback scheduled after ``duration`` seconds.

    Args:
        duration (int): how long the scan should run, in seconds.
    """
    bt_adapter = DeviceManager().find_adapter()
    bt_adapter.StartDiscovery()

    loop = GObject.MainLoop()
    timeout_ms = duration * 1000
    # NOTE(review): ``quit`` is not defined in this block; presumably a
    # module-level helper that stops the loop it is given -- confirm it is
    # not the interpreter's built-in ``quit``.
    GObject.timeout_add(timeout_ms, quit, loop)
    loop.run()
def run(self):
    """Run the main loop that delivers Bluetooth events from the adapter.

    Does nothing when a main loop already exists. Otherwise it subscribes to
    ``InterfacesAdded`` and to ``PropertiesChanged`` for ``org.bluez.Device1``
    on the bus, then blocks until the main loop is stopped.

    When the loop ends, for any reason including ``KeyboardInterrupt``, all
    known devices are invalidated and both signal receivers are removed.
    Exceptions raised by the loop propagate to the caller after cleanup.
    """
    if self._main_loop:
        return

    self._interface_added_signal = self._bus.add_signal_receiver(
        self._interfaces_added,
        dbus_interface='org.freedesktop.DBus.ObjectManager',
        signal_name='InterfacesAdded')

    # TODO: Also listen to 'interfaces removed' events?

    self._properties_changed_signal = self._bus.add_signal_receiver(
        self._properties_changed,
        dbus_interface=dbus.PROPERTIES_IFACE,
        signal_name='PropertiesChanged',
        arg0='org.bluez.Device1',
        path_keyword='path')

    self._main_loop = GObject.MainLoop()
    try:
        self._main_loop.run()
    finally:
        # ``finally`` instead of ``except Exception`` so the receivers are
        # also detached when the loop is interrupted with Ctrl-C.
        for device in self._devices.values():
            device.invalidate()
        self._properties_changed_signal.remove()
        self._interface_added_signal.remove()
def __init__(self):
    """Connect to the system bus and set the initial (off) wifi state."""
    # Install the GLib main loop as D-Bus default, then open the bus.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.bus = dbus.SystemBus()
    self._mainloop = GObject.MainLoop()

    self.wifi_manager = WiFiControl()
    self.callbacks = {}

    # Start in the off state with no known network.
    self.current_state = self.OFF_STATE
    self.current_ssid = None

    self.reconnect_worker = DaemonTreeObj(WORKER_NAME)
def __init__(self,
             client_class,
             timeout=180,
             capability="KeyboardDisplay",
             path="/org/bluez/my_bluetooth_agent"):
    """Store the agent settings and connect to the system bus.

    :param client_class: stored as ``self.client_class``.
    :param timeout: stored as ``self.timeout``.
    :param capability: stored as ``self.capability``.
    :param path: stored as ``self.path``.

    Discoverability is switched off as the last step.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    self.client_class = client_class
    self.timeout = timeout
    self.capability = capability
    self.path = path

    self._bus = dbus.SystemBus()
    self._mainloop = GObject.MainLoop()

    _bluetooth.make_discoverable(False)
def __init__(self, tcp_port_in=8043, tcp_port_out=None, channel=1):
    """Create the serial port profile object and export it on the system bus.

    :param tcp_port_in: stored as ``self.tcp_port_in``.
    :param tcp_port_out: stored as ``self.tcp_port_out``.
    :param channel: passed to ``SerialPort``.
    """
    self._spp = SerialPort(channel)

    # The D-Bus object is exported at the profile path of the serial port.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    dbus.service.Object.__init__(self, system_bus, self._spp.profile_path)

    self.tcp_port_in = tcp_port_in
    self.tcp_port_out = tcp_port_out
    self._mainloop = GObject.MainLoop()
def main():
    """Register the LED GATT application and advertisement, then run the loop.

    ``mainloop`` and ``display`` are published as module globals. On Ctrl-C
    the display is cleared and written out once more.
    """
    global mainloop
    global display

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()

    # Managers used for the two registrations below.
    service_manager = get_service_manager(system_bus)
    ad_manager = get_ad_manager(system_bus)

    # GATT services.
    display = setup_display()
    app = LedApplication(system_bus, display)

    # Advertisement.
    test_advertisement = LedAdvertisement(system_bus, 0)

    mainloop = GObject.MainLoop()

    # Both registrations are asynchronous; results arrive in the callbacks.
    service_manager.RegisterApplication(
        app.get_path(), {},
        reply_handler=register_app_cb,
        error_handler=register_app_error_cb)
    ad_manager.RegisterAdvertisement(
        test_advertisement.get_path(), {},
        reply_handler=register_ad_cb,
        error_handler=register_ad_error_cb)

    try:
        mainloop.run()
    except KeyboardInterrupt:
        display.clear()
        display.write_display()
def __init__(self, sony_av_indicator, device_service, state_service,
             command_service):
    """Initialise the thread, keep the collaborators and export on D-Bus.

    The four arguments are stored under attributes of the same name. The
    property tables for the root and player interfaces are built once, then
    the object is exported at ``OBJECT_PATH`` on the session bus.
    """
    threading.Thread.__init__(self)

    self.sony_av_indicator = sony_av_indicator
    self.device_service = device_service
    self.state_service = state_service
    self.command_service = command_service

    root_properties = self._get_root_iface_properties()
    player_properties = self._get_player_iface_properties()
    self.properties = {
        ROOT_INTERFACE: root_properties,
        PLAYER_INTERFACE: player_properties,
    }

    # ``main_loop`` holds the D-Bus GLib loop integration object, not a
    # GObject.MainLoop instance.
    self.main_loop = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.bus = dbus.SessionBus(mainloop=self.main_loop)
    self.bus_name = self._connect_to_dbus()

    dbus.service.Object.__init__(self, self.bus_name, OBJECT_PATH)
def start_loop(self):
    """Run a new GObject main loop in a daemon thread kept as ``self.loop``."""
    glib_loop = GObject.MainLoop()

    worker = threading.Thread(target=glib_loop.run)
    worker.daemon = True
    self.loop = worker

    worker.start()
def bluetoothConnection():
    """Register a Bluetooth profile with BlueZ and block in the main loop.

    The profile object is created at ``/foo/bar/profile`` and registered
    under UUID ``1101`` with auto-connect disabled.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()

    bluez_object = system_bus.get_object(BUS_NAME, "/org/bluez")
    profile_manager = dbus.Interface(bluez_object, "org.bluez.ProfileManager1")

    profile_path = "/foo/bar/profile"
    profile_uuid = "1101"
    options = {"AutoConnect": False}

    # Keep a reference for as long as the loop runs.
    profile = Profile(system_bus, profile_path)
    profile_manager.RegisterProfile(profile_path, profile_uuid, options)

    GObject.MainLoop().run()
def bluetoothConnection():
    """Register a Bluetooth profile with BlueZ and block in the main loop.

    The profile object is created at ``/foo/bar/profile`` and registered
    under UUID ``1101`` with auto-connect disabled. ``closeThreads()`` is
    called when the loop is left through Ctrl-C or ``SystemExit``.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()

    bluez_object = system_bus.get_object(BUS_NAME, "/org/bluez")
    profile_manager = dbus.Interface(bluez_object, "org.bluez.ProfileManager1")

    profile_path = "/foo/bar/profile"
    profile_uuid = "1101"
    options = {"AutoConnect": False}

    # Keep a reference for as long as the loop runs.
    profile = Profile(system_bus, profile_path)
    profile_manager.RegisterProfile(profile_path, profile_uuid, options)

    loop = GObject.MainLoop()
    try:
        loop.run()
    except (KeyboardInterrupt, SystemExit):
        closeThreads()
def run_mainloop_with(self, target):
    """Run ``target`` in a background thread while the main loop runs here.

    Starts the OS main loop needed to process asynchronous BLE events and
    runs the given target function in a daemon thread. ``target`` should
    take no parameters and may return an integer response code.

    Once the main loop ends, the program exits with the return code recorded
    in ``self._return_code``, or the exception recorded in
    ``self._exception`` is re-raised with its original traceback. Ctrl-C
    stops the loop and exits with status 0.

    This is a convenience for simple tools and scripts. A GUI application
    that already has a main loop (a GTK GLib main loop on Linux, or a Cocoa
    main loop on OSX) does not need to call this function.

    :param target: callable handed to ``self._user_thread_main``.
    :raises SystemExit: always, unless a recorded exception is re-raised.
    """
    # Spin up a background thread to run the target code.
    self._user_thread = threading.Thread(target=self._user_thread_main,
                                         args=(target,))
    self._user_thread.daemon = True  # Don't let the user thread block exit.
    self._user_thread.start()

    # Spin up a GLib main loop in the main thread to process async BLE events.
    self._gobject_mainloop = GObject.MainLoop()
    try:
        self._gobject_mainloop.run()  # Doesn't return until the mainloop ends.
    except KeyboardInterrupt:
        self._gobject_mainloop.quit()
        sys.exit(0)

    # Main loop finished. Re-raise a recorded exception, otherwise exit with
    # the status code from the user code.
    if self._exception is not None:
        # presumably a sys.exc_info() style tuple: index 1 is the exception
        # instance, index 2 its traceback. The old three-argument ``raise``
        # statement is a syntax error on Python 3.
        exc_value = self._exception[1]
        raise exc_value.with_traceback(self._exception[2])
    else:
        sys.exit(self._return_code)
def start_glib_loop(cls):
    """Start a GLib main loop in its own thread.

    Call this first when the program does not otherwise run a GLib loop. It
    initialises GObject threading and GStreamer, then stores the loop and
    its thread on the class as ``glib_loop`` and ``glib_thread``.
    """
    GObject.threads_init()
    Gst.init(None)

    loop = GObject.MainLoop()
    cls.glib_loop = loop

    loop_thread = threading.Thread(target=loop.run)
    cls.glib_thread = loop_thread
    loop_thread.start()
def _listen_for_wifi_state_changes(self):
    """Track NetworkManager state changes as a wifi connection status.

    The current state is evaluated once immediately, then a state-change
    handler is registered. No main loop is started here (see the note at
    the end of the function).
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    def print_status(status, nm_state):
        # The SSID is only included while connecting or connected.
        with_ssid = status in (WifiConnectionState.CONNECTING,
                               WifiConnectionState.CONNECTED)
        if with_ssid:
            text = "Wifi status changed: %s (%d) to %s" % (
                status, nm_state, self._current_ssid)
        else:
            text = "Wifi status changed: %s (%d)" % (status, nm_state)
        logger.info(text)

    def on_state_changed(nm_instance, nm_state, **kwargs):
        # Map the NetworkManager state onto the three statuses used here.
        if nm_state >= NetworkManager.NM_STATE_CONNECTED_GLOBAL:
            new_status = WifiConnectionState.CONNECTED
        elif nm_state > NetworkManager.NM_STATE_DISCONNECTING:
            new_status = WifiConnectionState.CONNECTING
        else:
            new_status = WifiConnectionState.DISCONNECTED

        self._update_current_ssid()

        # Only report and propagate real transitions.
        if new_status != self._wifi_status:
            print_status(new_status, nm_state)
            self._wifi_status = new_status
            self._on_wifi_status_changed()

    # Check the initial status.
    on_state_changed(None, NetworkManager.NetworkManager.State)

    # Listen for changes.
    NetworkManager.NetworkManager.OnStateChanged(on_state_changed)
    logger.debug("Start listening to network status changes")

    # Attention: a GObject.MainLoop() is required for this to work;
    # in this case it is started by the BLE Peripheral object.
def run(self):
    """Report the current network state, then listen for changes forever.

    ``self._on_state_changed`` is called once with the current state and is
    then registered as the state-change handler. The call blocks in a new
    GObject main loop.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # Check the initial status.
    initial_state = NetworkManager.NetworkManager.State
    self._on_state_changed(None, initial_state)

    # Listen for changes.
    NetworkManager.NetworkManager.OnStateChanged(self._on_state_changed)
    logger.debug("Start listening to network status changes")

    GObject.MainLoop().run()
def __init__(self):
    """Build the appsrc playback pipeline.

    Elements: appsrc, decodebin, queue, audioconvert and alsasink. The
    source is linked to the decoder, and the queue through the converter to
    the sink. The decoder is not linked to the queue here; a ``pad-added``
    handler is connected instead.
    """
    self._mainloop = GObject.MainLoop()
    self._pipeline = Gst.Pipeline()

    # Make elements.
    make = Gst.ElementFactory.make
    self._src = make('appsrc', 'appsrc')
    decoder = make("decodebin", "decode")
    self._queueaudio = make('queue', 'queueaudio')
    converter = make('audioconvert', 'audioconvert')
    audio_sink = make('alsasink', 'sink')

    self._src.set_property('stream-type', 'stream')

    # Add to pipeline.
    for element in (self._src, decoder, self._queueaudio, converter,
                    audio_sink):
        self._pipeline.add(element)

    # Link elements.
    self._src.link(decoder)
    self._queueaudio.link(converter)
    converter.link(audio_sink)

    decoder.connect('pad-added', self._decode_src_created)
def setUp(self):
    """Create a main loop on the default context and reset the error flag."""
    default_context = GObject.main_context_default()
    self.loop = GObject.MainLoop(default_context)
    self.error = False
def setUp(self):
    """Create the main loop and point the recommender at the staging server.

    The previous value of ``SOFTWARE_CENTER_RECOMMENDER_HOST`` (or ``None``)
    is remembered in ``self.orig_host``. The staging server is only set when
    the variable is not already present in the environment.
    """
    default_context = GObject.main_context_default()
    self.loop = GObject.MainLoop(default_context)
    self.error = False

    host_variable = "SOFTWARE_CENTER_RECOMMENDER_HOST"
    self.orig_host = os.environ.get(host_variable)

    # Staging server; the production one would be "https://rec.ubuntu.com".
    os.environ.setdefault(host_variable, "https://rec.staging.ubuntu.com")
def __init__(self, xid=0):
    """Start without an oauth value and with a loop on the default context.

    :param xid: stored as ``self.xid``.
    """
    self.oauth = None
    self.xid = xid

    default_context = GObject.main_context_default()
    self.loop = GObject.MainLoop(default_context)
def clear_token_from_ubuntu_sso_sync(appname):
    """Ask the ubuntu-sso D-Bus service to clear credentials and wait.

    The credentials for ``appname`` (e.g. _("Ubuntu Software Center")) are
    cleared through the credentials interface. The call blocks in a main
    loop until the service signals a result, or for at most 2 seconds.
    """
    from ubuntu_sso import (
        DBUS_BUS_NAME,
        DBUS_CREDENTIALS_IFACE,
        DBUS_CREDENTIALS_PATH,
    )

    loop = GObject.MainLoop()
    session_bus = dbus.SessionBus()
    credentials_object = session_bus.get_object(
        bus_name=DBUS_BUS_NAME,
        object_path=DBUS_CREDENTIALS_PATH,
        follow_name_owner_changes=True)
    proxy = dbus.Interface(object=credentials_object,
                           dbus_interface=DBUS_CREDENTIALS_IFACE)

    # Any of the three outcomes ends the wait.
    for signal_name in ("CredentialsCleared",
                        "CredentialsNotFound",
                        "CredentialsError"):
        proxy.connect_to_signal(signal_name, loop.quit)

    proxy.clear_credentials(appname, {})

    # Ensure we don't hang forever here.
    GObject.timeout_add_seconds(2, loop.quit)

    # Run the mainloop until the credentials are clear.
    loop.run()
def run(self):
    """Export the config object on the session bus and block in the loop.

    The object is published at ``/org/qoverview/config`` under the
    ``org.qoverview.config`` bus name.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    session_bus = dbus.SessionBus()
    service_name = dbus.service.BusName("org.qoverview.config", session_bus)
    dbus.service.Object.__init__(self, service_name, "/org/qoverview/config")

    self._loop = GObject.MainLoop()
    self._loop.run()
def run_pipeline(pipeline, clock, audio_delay=0, video_delay=0):
    """Launch a GStreamer pipeline and run it until Ctrl-C, EOS or an error.

    :param pipeline: pipeline description handed to ``Gst.parse_launch``.
    :param clock: clock the pipeline is told to use.
    :param audio_delay: delay in milliseconds applied to the ``src`` pad of
        the element named ``audiosrc``; ignored unless greater than 0.
    :param video_delay: delay in milliseconds applied to the ``src`` pad of
        the element named ``videosrc``; ignored unless greater than 0.

    End-of-stream exits the process with status 1 and a pipeline error with
    status 2. On Ctrl-C the pipeline is set to ``NULL`` and the function
    returns.
    """
    # Pad offsets are given in nanoseconds. The previous factor of 100000
    # applied only a tenth of the requested delay.
    NS_PER_MS = 1000000

    def on_eos(bus, message):
        print('Received EOS-Signal')
        sys.exit(1)

    def on_error(bus, message):
        print('Received Error-Signal')
        (error, debug) = message.parse_error()
        print('Error-Details: #%u: %s' % (error.code, debug))
        sys.exit(2)

    def apply_delay(label, element_name, delay_ms):
        # Shift the 'src' pad of the named element by delay_ms milliseconds.
        if delay_ms <= 0:
            return
        print('Adjusting {} sync: [{} milliseconds]'.format(label, delay_ms))
        source = senderPipeline.get_by_name(element_name)
        source.get_static_pad('src').set_offset(delay_ms * NS_PER_MS)

    print('starting pipeline...')
    senderPipeline = Gst.parse_launch(pipeline)
    senderPipeline.use_clock(clock)

    # Delay video/audio if required.
    apply_delay('video', 'videosrc', video_delay)
    apply_delay('audio', 'audiosrc', audio_delay)

    # Bind the end-of-stream and error signals of the pipeline bus.
    senderPipeline.bus.add_signal_watch()
    senderPipeline.bus.connect("message::eos", on_eos)
    senderPipeline.bus.connect("message::error", on_error)

    print("playing...")
    senderPipeline.set_state(Gst.State.PLAYING)

    mainloop = GObject.MainLoop()
    try:
        mainloop.run()
    except KeyboardInterrupt:
        print('Terminated via Ctrl-C')

    print('Shutting down...')
    senderPipeline.set_state(Gst.State.NULL)
    print('Done.')
    return
def update_from_software_center_agent(db, cache, ignore_cache=False,
                                      include_sca_qa=False):
    """Update the index from software-center-agent data.

    The available entries are requested asynchronously and a main loop runs
    until either the ``available`` or the ``error`` callback stops it. Each
    entry is then parsed and indexed; a failing entry is logged and skipped.

    :param db: passed to ``index_app_info_from_parser``.
    :param cache: passed to ``index_app_info_from_parser``.
    :param ignore_cache: passed to ``SoftwareCenterAgent``.
    :param include_sca_qa: query the QA list instead of the regular one.
    :return: True when the agent delivered data (possibly an empty list),
        False when it reported an error.
    """

    def _available_cb(sca, available):
        LOG.debug("available: '%s'" % available)
        sca.available = available
        sca.good_data = True
        loop.quit()

    def _error_cb(sca, error):
        # Logger.warn is a deprecated alias; use warning() like the
        # processing loop below does.
        LOG.warning("error: %s" % error)
        sca.available = []
        sca.good_data = False
        loop.quit()

    # use the anonymous interface to s-c-agent, scales much better and is
    # much cache friendlier
    from softwarecenter.backend.scagent import SoftwareCenterAgent
    # FIXME: honor ignore_etag here somehow with the new piston based API
    sca = SoftwareCenterAgent(ignore_cache)
    sca.connect("available", _available_cb)
    sca.connect("error", _error_cb)
    sca.available = None
    if include_sca_qa:
        sca.query_available_qa()
    else:
        sca.query_available()

    # create event loop and run it until data is available
    # (the _available_cb and _error_cb will quit it)
    context = GObject.main_context_default()
    loop = GObject.MainLoop(context)
    loop.run()

    # process data
    for entry in sca.available:
        # drain pending events between entries
        while context.pending():
            context.iteration()
        try:
            # now the normal parser
            parser = SCAApplicationParser(entry)
            index_app_info_from_parser(parser, db, cache)
        except Exception as e:
            LOG.warning("error processing: %s " % e)

    # return true if we have updated entries (this can also be an empty
    # list) but only if we did not get an error from the agent
    return sca.good_data
def im_incoming(queues: Dict[bytes, 'Queue']) -> None:
    """Run the receiver loop that forwards Pidgin messages to the RxM queue.

    A signal receiver for ``ReceivedImMsg`` is attached to a private session
    bus and a GObject main loop is run. The ignored exceptions listed in the
    loop restart the whole setup.
    """

    def pidgin_to_rxm(account: str, sender: str, message: str, *_: Any) -> None:
        """Validate one message received from Pidgin and queue it for RxM."""
        sender = sender.split('/')[0]
        timestamp = datetime.now().strftime("%m-%d / %H:%M:%S")

        purple_bus = dbus.SessionBus(private=True)
        purple_object = purple_bus.get_object("im.pidgin.purple.PurpleService",
                                              "/im/pidgin/purple/PurpleObject")
        purple = dbus.Interface(purple_object, "im.pidgin.purple.PurpleInterface")

        # Look up the user name of the receiving account.
        user = ''
        for active_account in purple.PurpleAccountsGetAllActive():
            if active_account == account:
                user = purple.PurpleAccountGetUsername(active_account)[:-1]

        if not message.startswith(TFC):
            return None

        try:
            __, header, payload = message.split('|')  # type: Tuple[str, str, str]
        except ValueError:
            return None

        header_bytes = header.encode()
        if header_bytes == PUBLIC_KEY_PACKET_HEADER:
            print("{} - pub key {} > {} > RxM".format(timestamp, sender, user))
        elif header_bytes == MESSAGE_PACKET_HEADER:
            print("{} - message {} > {} > RxM".format(timestamp, sender, user))
        else:
            print("Received invalid packet from {}".format(sender))
            return None

        # Packet layout: header, decoded payload, origin header, sender.
        decoded = base64.b64decode(payload)
        packet = header_bytes + decoded + ORIGIN_CONTACT_HEADER + sender.encode()
        queues[RXM_OUTGOING_QUEUE].put(packet)

    while True:
        with ignored(dbus.exceptions.DBusException, EOFError, KeyboardInterrupt):
            session_bus = dbus.SessionBus(private=True, mainloop=DBusGMainLoop())
            session_bus.add_signal_receiver(
                pidgin_to_rxm,
                dbus_interface="im.pidgin.purple.PurpleInterface",
                signal_name="ReceivedImMsg")
            GObject.MainLoop().run()