我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用dbus.service()。
def hr_msrmt_cb(self): value = [] value.append(dbus.Byte(0x06)) value.append(dbus.Byte(randint(90, 130))) if self.hr_ee_count % 10 == 0: value[0] = dbus.Byte(value[0] | 0x08) value.append(dbus.Byte(self.service.energy_expended & 0xff)) value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff)) self.service.energy_expended = \ min(0xffff, self.service.energy_expended + 1) self.hr_ee_count += 1 print('Updating value: ' + repr(value)) self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, []) return self.notifying
def __init__(self): dbus.service.Object.__init__(self) #initialize variables that will be used during create and run self.bus = None self.package_dir = None self.main_loop = None self._timeout = False self.dbus_name = None self.xml_obj = BTOxml() # cached D-BUS interfaces for _check_polkit_privilege() self.dbus_info = None self.polkit = None self.progress_thread = None self.enforce_polkit = True #Enable translation for strings used bindtextdomain(DOMAIN, LOCALEDIR) textdomain(DOMAIN)
def create_dbus_server(cls, session_bus=False): '''Return a D-BUS server backend instance. Normally this connects to the system bus. Set session_bus to True to connect to the session bus (for testing). ''' backend = Backend() dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) if session_bus: backend.bus = dbus.SessionBus() backend.enforce_polkit = False else: backend.bus = dbus.SystemBus() try: backend.dbus_name = dbus.service.BusName(DBUS_BUS_NAME, backend.bus) except dbus.exceptions.DBusException as msg: logging.error("Exception when spawning dbus service") logging.error(msg) return None return backend # # Internal methods #
def __init__(self, device_id=None): """Default initialiser. 1. Initialises the program loop using ``GObject``. 2. Registers the Application on the D-Bus. 3. Initialises the list of services offered by the application. """ # Initialise the D-Bus path and register it self.bus = dbus.SystemBus() self.path = '/ukBaz/bluezero' self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus) dbus.service.Object.__init__(self, self.bus_name, self.path) # Objects to be associated with this service self.managed_objs = [] self.eventloop = async_tools.EventLoop()
def __init__(self, service_id, uuid, primary): """Default initialiser. 1. Registers the service on the D-Bus. 2. Sets up the service UUID and primary flags. :param service_id: :param uuid: service BLE UUID :param primary: whether or not the service is a primary service """ # Setup D-Bus object paths and register service self.path = self.PATH_BASE + str('{0:04d}'.format(service_id)) self.bus = dbus.SystemBus() self.interface = constants.GATT_SERVICE_IFACE dbus.service.Object.__init__(self, self.bus, self.path) self.props = { constants.GATT_SERVICE_IFACE: { 'UUID': uuid, 'Primary': primary} }
def GetAll(self, interface_name): """Return the service properties. This method is registered with the D-Bus at ``org.freedesktop.DBus.Properties`` :param interface: interface to get the properties of. The interface must be ``org.bluez.GattService1`` otherwise an exception is raised. """ if interface_name != constants.GATT_SERVICE_IFACE: raise InvalidArgsException() try: return self.props[interface_name] except KeyError: raise dbus.exceptions.DBusException( 'no such interface ' + interface_name, name=interface_name + '.UnknownInterface')
def __init__(self, device_id=None): """Default initialiser. 1. Initialises the program loop using ``GObject``. 2. Registers the Application on the D-Bus. 3. Initialises the list of services offered by the application. """ # Initialise the loop that the application runs in GObject.threads_init() dbus.mainloop.glib.threads_init() dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) self.mainloop = GObject.MainLoop() # Initialise the D-Bus path and register it self.bus = dbus.SystemBus() self.path = '/ukBaz/bluezero/application{}'.format(id(self)) self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus) dbus.service.Object.__init__(self, self.bus_name, self.path) # Initialise services within the application self.services = [] self.dongle = adapter.Adapter(device_id)
def GetManagedObjects(self): """Get all objects that are managed by the application. Return type is a dictionary whose keys are each registered object and values the properties of the given object. """ response = {} print('GetManagedObjects') for service in self.services: response[service.get_path()] = service.get_properties() chrcs = service.get_characteristics() for chrc in chrcs: response[chrc.get_path()] = chrc.get_properties() descs = chrc.get_descriptors() for desc in descs: response[desc.get_path()] = desc.get_properties() return response
def get_properties(self): """Return a dictionary of the service properties. The dictionary has the following keys: - UUID: the service UUID - Primary: whether the service is the primary service - Characteristics: D-Bus array of the characteristic object paths associated with the service. """ return { constants.GATT_SERVICE_IFACE: { 'UUID': self.uuid, 'Primary': self.primary, 'Characteristics': dbus.Array( self.get_characteristic_paths(), signature='o') } }
def GetManagedObjects(self): """Get all objects that are managed by the service. Return type is a dictionary whose keys are each registered object and values the properties of the given object. """ response = {} # print('GetManagedObjects') response[self.get_path()] = self.get_properties() chrcs = self.get_characteristics() for chrc in chrcs: response[chrc.get_path()] = chrc.get_properties() descs = chrc.get_descriptors() for desc in descs: response[desc.get_path()] = desc.get_properties() return response
def get_properties(self): """Return a dictionary of the characteristic properties. The dictionary has the following keys: - Service: the characteristic's service - UUID: the characteristic UUID - Flags: any characteristic flags - Descriptors: D-Bus array of the descriptor object paths associated with the characteristic. """ return { constants.GATT_CHRC_IFACE: { 'Service': self.service.get_path(), 'UUID': self.uuid, 'Flags': self.flags, 'Descriptors': dbus.Array( self.get_descriptor_paths(), signature='o') } }
def __init__(self, uuid, flags, characteristic): """"Default initialiser. 1. Registers the descriptor on the characteristic path. 2. Sets up initial values. :param uuid: descriptor BLE UUID. :param flags: descriptor flags. :param characteristic: characteristic that the descriptor is associated with. """ # Register the descriptor on the characteristic path self.index = id(self) self.path = characteristic.path + '/desc' + str(self.index) self.bus = characteristic.bus dbus.service.Object.__init__(self, self.bus, self.path) self.uuid = uuid self.flags = flags self.chrc = characteristic
def __init__(self, advert_id, ad_type): """Default initialiser. Creates the interface to the specified advertising data. The DBus path must be specified. :param advert_id: Unique ID of advertisement. :param ad_type: Possible values: "broadcast" or "peripheral" """ # Setup D-Bus object paths and register service self.path = '/ukBaz/bluezero/advertisement{0:04d}'.format(advert_id) self.bus = dbus.SystemBus() self.eventloop = async_tools.EventLoop() self.interface = constants.LE_ADVERTISEMENT_IFACE dbus.service.Object.__init__(self, self.bus, self.path) self.props = { constants.LE_ADVERTISEMENT_IFACE: { 'Type': ad_type, 'ServiceUUIDs': None, 'ManufacturerData': None, 'SolicitUUIDs': None, 'ServiceData': None, 'IncludeTxPower': False } }
def run(self): GObject.threads_init() dbus.mainloop.glib.threads_init() dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus_name = dbus.service.BusName( "com.spoppy", dbus.SessionBus() ) super(SpoppyDBusService, self).__init__( bus_name, "/com/spoppy" ) self._loop = GObject.MainLoop() while self.running: try: logger.debug('Starting dbus loop') self._loop.run() except KeyboardInterrupt: logger.debug('Loop interrupted, will restart')
def run(self): self.service = SpoppyDBusService(self.lifecycle) self.service_thread = threading.Thread( target=self.service.run ) self.service_thread.start() logger.debug('Service started, waiting for kill signal') self.stop_event.wait() logger.debug('Kill signal received, stopping service') self.service.stop() while True: logger.debug('Joining service thread with timeout 10s') self.service_thread.join(10) if not self.service_thread.is_alive(): break
def read_sdp_service_record(self): print("Reading service record") try: fh = open(BTKbDevice.SDP_RECORD_PATH, "r") except: sys.exit("Could not open the sdp record. Exiting...") return fh.read() #listen for incoming client connections #ideally this would be handled by the Bluez 5 profile #but that didn't seem to work
def __init__(self, adapter, path): ClassLogger.__init__(self) dbus.service.Object.__init__(self, adapter.bus(), path) self.__path = path self.__capability = 'DisplayYesNo' # # These profile modes appear to cause the agent to ignore # pin and passcode requests... # #self.__capability = 'NoInputNoOutput' #self.__capability = 'DisplayOnly' #self.__capability = 'KeyboardOnly' adapter.adapter().RegisterAgent(path, self.__capability)
def __init__(self): # initially set the standard logger self.set_logger(logging.getLogger(__name__)) # initially set an empty configuration self.set_config(configparser.ConfigParser()) # set up the quit signals self.setup_signals( signals = [signal.SIGINT, signal.SIGTERM, signal.SIGHUP], handler = self.please_stop_now ) # use glib as default mailoop for dbus dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) dbus.mainloop.glib.threads_init() GLib.threads_init() self.loop = GLib.MainLoop() # create mainloop self.systembus = dbus.SystemBus() # the system bus self.systembus.request_name(CO2MONITOR_BUSNAME) # request the bus name bus_name = dbus.service.BusName(CO2MONITOR_BUSNAME, self.systembus) # create bus name # register the object on the bus name dbus.service.Object.__init__(self, bus_name, CO2MONITOR_OBJECTPATH)
def start_device_logging(self, devicefile): self.logger.info(_("received request to start logging on device {}").format( devicefile)) # create the object path name on the bus name objectpath = "/".join([CO2MONITOR_OBJECTPATH, utils.devicefile2objectname(devicefile)]) # check if device is already monitored monitored_devices = self.get_monitored_devices_objects() if objectpath in monitored_devices: self.logger.warning(" ".join([ _("There is already a logging thread for device {}."), _("This is odd... I better do nothing.") ]).format(devicefile)) return False else: try: thread = LogThread(devicefile = devicefile) # logger object self.logger.info(_("starting logging thread for device {}").format( devicefile)) # same logger as service thread.set_logger(self.logger) # same config as service thread.set_config(self.config) thread.daemon = True # let this thread be a daemon thread thread.start() except OSError: self.logger.critical(_("Could not access the device file '{}'." ).format(devicefile)) return False except: self.logger.critical(_( "Something went wrong with device file '{}'." ).format(devicefile)) return False return True
def __init__(self, devicefile): # by default, log! self.do_data_logging = True # initially set the standard logger self.set_logger(logging.getLogger(__name__)) # initially set an empty configuration self.set_config(configparser.ConfigParser()) # use glib as default mailoop for dbus dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) dbus.mainloop.glib.threads_init() GLib.threads_init() self.systembus = dbus.SystemBus() # the system bus self.systembus.request_name(CO2MONITOR_BUSNAME) # request the bus name bus_name = dbus.service.BusName(CO2MONITOR_BUSNAME, self.systembus) # create bus name self.devicefile = devicefile # set up the device self.device = device.co2device(self.devicefile) # register the object on the bus name objectpath = "/".join([CO2MONITOR_OBJECTPATH, utils.devicefile2objectname(self.devicefile)]) dbus.service.Object.__init__(self, bus_name, objectpath) self.update_status(_("idle")) threading.Thread.__init__(self) # set the config
def __init__(self, taskQueue, resultQueue): self.taskQueue = taskQueue self.resultQueue = resultQueue bus_name = dbus.service.BusName('org.sim.simlab', bus=dbus.SessionBus()) dbus.service.Object.__init__(self, bus_name, '/org/sim/simlab')
def __init__(self, bus, index, advertising_type): self.path = self.PATH_BASE + str(index) self.bus = bus self.ad_type = advertising_type self.service_uuids = None self.manufacturer_data = None self.solicit_uuids = None self.service_data = None self.include_tx_power = None dbus.service.Object.__init__(self, bus, self.path)
def __init__(self, bus): self.path = '/' self.services = [] dbus.service.Object.__init__(self, bus, self.path) self.add_service(HeartRateService(bus, 0)) self.add_service(BatteryService(bus, 1)) self.add_service(TestService(bus, 2))
def add_service(self, service): self.services.append(service)
def GetManagedObjects(self): response = {} print('GetManagedObjects') for service in self.services: response[service.get_path()] = service.get_properties() chrcs = service.get_characteristics() for chrc in chrcs: response[chrc.get_path()] = chrc.get_properties() descs = chrc.get_descriptors() for desc in descs: response[desc.get_path()] = desc.get_properties() return response
def __init__(self, bus, index, uuid, primary): self.path = self.PATH_BASE + str(index) self.bus = bus self.uuid = uuid self.primary = primary self.characteristics = [] dbus.service.Object.__init__(self, bus, self.path)
def get_properties(self): return { GATT_CHRC_IFACE: { 'Service': self.service.get_path(), 'UUID': self.uuid, 'Flags': self.flags, 'Descriptors': dbus.Array( self.get_descriptor_paths(), signature='o') } }
def __init__(self, bus, index, uuid, flags, characteristic): self.path = characteristic.path + '/desc' + str(index) self.bus = bus self.uuid = uuid self.flags = flags self.chrc = characteristic dbus.service.Object.__init__(self, bus, self.path)
def __init__(self, bus, index, service): Characteristic.__init__( self, bus, index, self.HR_MSRMT_UUID, ['notify'], service) self.notifying = False self.hr_ee_count = 0
def __init__(self, bus, index, service): Characteristic.__init__( self, bus, index, self.BODY_SNSR_LOC_UUID, ['read'], service)
def WriteValue(self, value, options): print('Heart Rate Control Point WriteValue called') if len(value) != 1: raise exceptions.InvalidValueLengthException() byte = value[0] print('Control Point value: ' + repr(byte)) if byte != 1: raise exceptions.FailedException("0x80") print('Energy Expended field reset!') self.service.energy_expended = 0
def __init__(self, bus, index, service): Characteristic.__init__( self, bus, index, self.BATTERY_LVL_UUID, ['read', 'notify'], service) self.notifying = False self.battery_lvl = 100 GObject.timeout_add(5000, self.drain_battery)
def __init__(self, bus, index, service): Characteristic.__init__( self, bus, index, self.TEST_CHRC_UUID, ['read', 'write', 'writable-auxiliaries'], service) self.value = [] self.add_descriptor(TestDescriptor(bus, 0, self)) self.add_descriptor( CharacteristicUserDescriptionDescriptor(bus, 1, self))
def __init__(self, bus, index, service): Characteristic.__init__( self, bus, index, self.TEST_CHRC_UUID, ['encrypt-read', 'encrypt-write'], service) self.value = [] self.add_descriptor(TestEncryptDescriptor(bus, 2, self)) self.add_descriptor( CharacteristicUserDescriptionDescriptor(bus, 3, self))
def __init__(self, bus, index, service): Characteristic.__init__( self, bus, index, self.TEST_CHRC_UUID, ['secure-read', 'secure-write'], service) self.value = [] self.add_descriptor(TestSecureDescriptor(bus, 2, self)) self.add_descriptor( CharacteristicUserDescriptionDescriptor(bus, 3, self))
def __new__(cls, name, bases, attrs): if bases != (dbus.service.Object,): attrs['GetSecretsImpl'] = attrs.pop('GetSecrets') return super(SecretAgentType, cls).__new__(cls, name, bases, attrs)
def __init__(self, identifier): self.identifier = identifier dbus.service.Object.__init__(self, dbus.SystemBus(), self.object_path) AgentManager.Register(self.identifier)
def run(self): dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus_name = dbus.service.BusName("com.prozacville.steam_monitor", dbus.SessionBus()) dbus.service.Object.__init__(self, bus_name, "/com/prozacville/steam_monitor") self._loop = gobject.MainLoop() print "Service running..." self._loop.run() print "Service stopped"
def __init__( self ): bus_name = dbus.service.BusName( BUSNAME, bus = dbus.SystemBus() ) super().__init__( bus_name = bus_name, object_path = SUPPORT_OBJECTPATH )
def GetState( self ): """ Return the current runtime state of the service. Returns: str: json encoded, free-form-ish dictionary of runtime state information """ return json.dumps( {} )
def __init__(self, ui, home, tr_ay, new_tr): global tray, new_tray_widget tray = tr_ay new_tray_widget = new_tr bus = dbus.service.BusName( AW_MPRIS_BUS_NAME, bus=dbus.SessionBus()) super().__init__(bus, MPRIS_OBJECT_PATH) self._properties = dbus.Dictionary({ 'DesktopEntry': 'kawaii-player', 'Identity': 'kawaii-player', }, signature='sv') self._player_properties = dbus.Dictionary({ 'Metadata': dbus.Dictionary({ 'mpris:artUrl': '', 'xesam:artist': ['None'], 'xesam:title': 'None', 'xesam:album': 'None' }, signature='sv', variant_level=1), 'CanGoNext': True, 'CanGoPrevious': True, 'CanPause': True, 'CanPlay': True, 'CanControl': True, 'CanStop': True, }, signature='sv', variant_level=2) self.ui = ui self.home = home
def __init__(self, bus, mainloop): bus_name = dbus.service.BusName(INTERFACE, bus=bus) PolicyKitService.__init__(self, bus_name, PATH) self.mainloop = mainloop
def run_dbus_service(self, timeout=None, send_usr1=False): '''Run D-BUS server. If no timeout is given, the server will run forever, otherwise it will return after the specified number of seconds. If send_usr1 is True, this will send a SIGUSR1 to the parent process once the server is ready to take requests. ''' dbus.service.Object.__init__(self, self.bus, '/RecoveryMedia') self.main_loop = GLib.MainLoop() self._timeout = False if timeout: def _quit(): """This function is ran at the end of timeout""" self.main_loop.quit() return True GLib.timeout_add(timeout * 1000, _quit) # send parent process a signal that we are ready now if send_usr1: os.kill(os.getppid(), signal.SIGUSR1) # run until we time out while not self._timeout: if timeout: self._timeout = True self.main_loop.run()
def add_managed_object(self, object): """Add a service to the list of services offered by the Application. :param object: Python object of dbus path to be managed """ self.managed_objs.append(object)
def __init__(self, characteristic_id, uuid, service_obj, value, notifying, flags): """Default initialiser. 1. Registers the characteristc on the D-Bus. 2. Sets up the service UUID and primary flags. :param service_id: :param uuid: service BLE UUID :param service_obj: the service that this characteristic is part of :param value: the initial value of this characteristic :param notifying: boolean representing the state of notification :param flags: """ # Setup D-Bus object paths and register service PATH_BASE = service_obj.get_path() + '/char' self.path = PATH_BASE + str('{0:04d}'.format(characteristic_id)) self.bus = dbus.SystemBus() dbus.service.Object.__init__(self, self.bus, self.path) self.props = { constants.GATT_CHRC_IFACE: { 'UUID': uuid, 'Service': service_obj.get_path(), 'Value': value, 'Notifying': notifying, 'Flags': flags} } for prop in self.props[constants.GATT_CHRC_IFACE].keys(): self.Set(constants.GATT_CHRC_IFACE, prop, self.props[constants.GATT_CHRC_IFACE][prop])
def __init__(self, descriptor_id, uuid, characteristic_obj, value, flags): """Default initialiser. 1. Registers the descriptor on the D-Bus. 2. Sets up the service UUID and primary flags. :param descriptor_id: A unique identifier for this descriptor :param uuid: descriptor BLE UUID :param characteristic_obj: The characteristic that this descriptor is related to :param value: The initial value of the descriptor :param flags: Flags specifying access permissions """ # Setup D-Bus object paths and register service PATH_BASE = characteristic_obj.get_path() + '/desc' self.path = PATH_BASE + str('{0:04d}'.format(descriptor_id)) self.bus = dbus.SystemBus() dbus.service.Object.__init__(self, self.bus, self.path) self.props = { constants.GATT_DESC_IFACE: { 'UUID': uuid, 'Characteristic': characteristic_obj.get_path(), 'Value': value, 'Flags': flags} } for prop in self.props[constants.GATT_DESC_IFACE].keys(): self.Set(constants.GATT_DESC_IFACE, prop, self.props[constants.GATT_DESC_IFACE][prop])
def add_service(self, service): """Add a service to the list of services offered by the Application. :param service: the service to be added (type: peripheral.Service) """ self.services.append(service)
def get_primary_service(self): """Get the *primary* service registered with the Application.""" # services = self.GetManagedObjects() primary_uuid = None for service in self.services: if service.primary: logger.debug(service.uuid) primary_uuid = service.uuid return primary_uuid
def UUID(self): """Return Service UUID""" return self.service.Get(constants.GATT_SERVICE_IFACE, 'UUID')
def add_characteristic(self, characteristic): """Add a characteristic. Adds a characteristic to the list of characteristics offered by the service. :param characteristic: the characteristic to be added. (type: peripheral.Characteristic) """ self.characteristics.append(characteristic)