Python dbus 模块,service() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用dbus.service()

项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def hr_msrmt_cb(self):
        value = []
        value.append(dbus.Byte(0x06))

        value.append(dbus.Byte(randint(90, 130)))

        if self.hr_ee_count % 10 == 0:
            value[0] = dbus.Byte(value[0] | 0x08)
            value.append(dbus.Byte(self.service.energy_expended & 0xff))
            value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff))

        self.service.energy_expended = \
                min(0xffff, self.service.energy_expended + 1)
        self.hr_ee_count += 1

        print('Updating value: ' + repr(value))

        self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])

        return self.notifying
项目:dell-recovery    作者:dell    | 项目源码 | 文件源码
def __init__(self):
        dbus.service.Object.__init__(self)

        #initialize variables that will be used during create and run
        self.bus = None
        self.package_dir = None
        self.main_loop = None
        self._timeout = False
        self.dbus_name = None
        self.xml_obj = BTOxml()

        # cached D-BUS interfaces for _check_polkit_privilege()
        self.dbus_info = None
        self.polkit = None
        self.progress_thread = None
        self.enforce_polkit = True

        #Enable translation for strings used
        bindtextdomain(DOMAIN, LOCALEDIR)
        textdomain(DOMAIN)
项目:dell-recovery    作者:dell    | 项目源码 | 文件源码
def create_dbus_server(cls, session_bus=False):
        '''Return a D-BUS server backend instance.

        Normally this connects to the system bus. Set session_bus to True to
        connect to the session bus (for testing).

        '''
        backend = Backend()
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        if session_bus:
            backend.bus = dbus.SessionBus()
            backend.enforce_polkit = False
        else:
            backend.bus = dbus.SystemBus()
        try:
            backend.dbus_name = dbus.service.BusName(DBUS_BUS_NAME, backend.bus)
        except dbus.exceptions.DBusException as msg:
            logging.error("Exception when spawning dbus service")
            logging.error(msg)
            return None
        return backend

    #
    # Internal methods
    #
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self, device_id=None):
        """Default initialiser.

        1. Initialises the program loop using ``GObject``.
        2. Registers the Application on the D-Bus.
        3. Initialises the list of services offered by the application.

        """
        # Initialise the D-Bus path and register it
        self.bus = dbus.SystemBus()
        self.path = '/ukBaz/bluezero'
        self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus)
        dbus.service.Object.__init__(self, self.bus_name, self.path)

        # Objects to be associated with this service
        self.managed_objs = []
        self.eventloop = async_tools.EventLoop()
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self, service_id, uuid, primary):
        """Default initialiser.

        1. Registers the service on the D-Bus.
        2. Sets up the service UUID and primary flags.

        :param service_id:
        :param uuid: service BLE UUID
        :param primary: whether or not the service is a primary service
        """
        # Setup D-Bus object paths and register service
        self.path = self.PATH_BASE + str('{0:04d}'.format(service_id))
        self.bus = dbus.SystemBus()
        self.interface = constants.GATT_SERVICE_IFACE
        dbus.service.Object.__init__(self, self.bus, self.path)
        self.props = {
            constants.GATT_SERVICE_IFACE: {
                'UUID': uuid,
                'Primary': primary}
        }
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def GetAll(self, interface_name):
        """Return the service properties.

        This method is registered with the D-Bus at
        ``org.freedesktop.DBus.Properties``

        :param interface: interface to get the properties of.

        The interface must be ``org.bluez.GattService1`` otherwise an
        exception is raised.
        """
        if interface_name != constants.GATT_SERVICE_IFACE:
            raise InvalidArgsException()

        try:
            return self.props[interface_name]
        except KeyError:
            raise dbus.exceptions.DBusException(
                'no such interface ' + interface_name,
                name=interface_name + '.UnknownInterface')
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self, device_id=None):
        """Default initialiser.

        1. Initialises the program loop using ``GObject``.
        2. Registers the Application on the D-Bus.
        3. Initialises the list of services offered by the application.

        """
        # Initialise the loop that the application runs in
        GObject.threads_init()
        dbus.mainloop.glib.threads_init()
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.mainloop = GObject.MainLoop()

        # Initialise the D-Bus path and register it
        self.bus = dbus.SystemBus()
        self.path = '/ukBaz/bluezero/application{}'.format(id(self))
        self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus)
        dbus.service.Object.__init__(self, self.bus_name, self.path)

        # Initialise services within the application
        self.services = []

        self.dongle = adapter.Adapter(device_id)
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def GetManagedObjects(self):
        """Get all objects that are managed by the application.

        Return type is a dictionary whose keys are each registered object and
        values the properties of the given object.
        """
        response = {}
        print('GetManagedObjects')

        for service in self.services:
            response[service.get_path()] = service.get_properties()
            chrcs = service.get_characteristics()
            for chrc in chrcs:
                response[chrc.get_path()] = chrc.get_properties()
                descs = chrc.get_descriptors()
                for desc in descs:
                    response[desc.get_path()] = desc.get_properties()

        return response
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def get_properties(self):
        """Return a dictionary of the service properties.

        The dictionary has the following keys:

        - UUID: the service UUID
        - Primary: whether the service is the primary service
        - Characteristics: D-Bus array of the characteristic object paths
          associated with the service.

        """
        return {
            constants.GATT_SERVICE_IFACE: {
                'UUID': self.uuid,
                'Primary': self.primary,
                'Characteristics': dbus.Array(
                    self.get_characteristic_paths(),
                    signature='o')
            }
        }
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def GetManagedObjects(self):
        """Get all objects that are managed by the service.

        Return type is a dictionary whose keys are each registered object and
        values the properties of the given object.
        """
        response = {}
        # print('GetManagedObjects')

        response[self.get_path()] = self.get_properties()
        chrcs = self.get_characteristics()
        for chrc in chrcs:
            response[chrc.get_path()] = chrc.get_properties()
            descs = chrc.get_descriptors()
            for desc in descs:
                response[desc.get_path()] = desc.get_properties()

        return response
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def get_properties(self):
        """Return a dictionary of the characteristic properties.

        The dictionary has the following keys:

        - Service: the characteristic's service
        - UUID: the characteristic UUID
        - Flags: any characteristic flags
        - Descriptors: D-Bus array of the descriptor object paths
          associated with the characteristic.

        """
        return {
            constants.GATT_CHRC_IFACE: {
                'Service': self.service.get_path(),
                'UUID': self.uuid,
                'Flags': self.flags,
                'Descriptors': dbus.Array(
                    self.get_descriptor_paths(),
                    signature='o')
            }
        }
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self, uuid, flags, characteristic):
        """"Default initialiser.

        1. Registers the descriptor on the characteristic path.
        2. Sets up initial values.

        :param uuid: descriptor BLE UUID.
        :param flags: descriptor flags.
        :param characteristic: characteristic that the descriptor is associated
                               with.
        """
        # Register the descriptor on the characteristic path
        self.index = id(self)
        self.path = characteristic.path + '/desc' + str(self.index)
        self.bus = characteristic.bus
        dbus.service.Object.__init__(self, self.bus, self.path)

        self.uuid = uuid
        self.flags = flags
        self.chrc = characteristic
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self, advert_id, ad_type):
        """Default initialiser.

        Creates the interface to the specified advertising data.
        The DBus path must be specified.

        :param advert_id: Unique ID of advertisement.
        :param ad_type: Possible values: "broadcast" or "peripheral"
        """
        # Setup D-Bus object paths and register service
        self.path = '/ukBaz/bluezero/advertisement{0:04d}'.format(advert_id)
        self.bus = dbus.SystemBus()
        self.eventloop = async_tools.EventLoop()
        self.interface = constants.LE_ADVERTISEMENT_IFACE
        dbus.service.Object.__init__(self, self.bus, self.path)
        self.props = {
            constants.LE_ADVERTISEMENT_IFACE: {
                'Type': ad_type,
                'ServiceUUIDs': None,
                'ManufacturerData': None,
                'SolicitUUIDs': None,
                'ServiceData': None,
                'IncludeTxPower': False
            }
        }
项目:spoppy    作者:sindrig    | 项目源码 | 文件源码
def run(self):
            GObject.threads_init()
            dbus.mainloop.glib.threads_init()
            dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
            bus_name = dbus.service.BusName(
                "com.spoppy",
                dbus.SessionBus()
            )
            super(SpoppyDBusService, self).__init__(
                bus_name, "/com/spoppy"
            )

            self._loop = GObject.MainLoop()
            while self.running:
                try:
                    logger.debug('Starting dbus loop')
                    self._loop.run()
                except KeyboardInterrupt:
                    logger.debug('Loop interrupted, will restart')
项目:spoppy    作者:sindrig    | 项目源码 | 文件源码
def run(self):
        self.service = SpoppyDBusService(self.lifecycle)
        self.service_thread = threading.Thread(
            target=self.service.run
        )
        self.service_thread.start()

        logger.debug('Service started, waiting for kill signal')
        self.stop_event.wait()
        logger.debug('Kill signal received, stopping service')
        self.service.stop()
        while True:
            logger.debug('Joining service thread with timeout 10s')
            self.service_thread.join(10)
            if not self.service_thread.is_alive():
                break
项目:BlogCode    作者:yaptb    | 项目源码 | 文件源码
def read_sdp_service_record(self):

        print("Reading service record")

        try:
            fh = open(BTKbDevice.SDP_RECORD_PATH, "r")
        except:
            sys.exit("Could not open the sdp record. Exiting...")

        return fh.read()   



    #listen for incoming client connections
    #ideally this would be handled by the Bluez 5 profile 
    #but that didn't seem to work
项目:phony    作者:littlecraft    | 项目源码 | 文件源码
def __init__(self, adapter, path):
    ClassLogger.__init__(self)
    dbus.service.Object.__init__(self, adapter.bus(), path)

    self.__path = path
    self.__capability = 'DisplayYesNo'

    #
    # These profile modes appear to cause the agent to ignore
    # pin and passcode requests...
    #
    #self.__capability = 'NoInputNoOutput'
    #self.__capability = 'DisplayOnly'
    #self.__capability = 'KeyboardOnly'

    adapter.adapter().RegisterAgent(path, self.__capability)
项目:co2monitor    作者:nobodyinperson    | 项目源码 | 文件源码
def __init__(self):
        # initially set the standard logger
        self.set_logger(logging.getLogger(__name__))
        # initially set an empty configuration
        self.set_config(configparser.ConfigParser())
        # set up the quit signals
        self.setup_signals(
            signals = [signal.SIGINT, signal.SIGTERM, signal.SIGHUP],
            handler = self.please_stop_now
        )

        # use glib as default mailoop for dbus
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        dbus.mainloop.glib.threads_init()
        GLib.threads_init()

        self.loop = GLib.MainLoop() # create mainloop

        self.systembus = dbus.SystemBus() # the system bus
        self.systembus.request_name(CO2MONITOR_BUSNAME) # request the bus name
        bus_name = dbus.service.BusName(CO2MONITOR_BUSNAME, self.systembus) # create bus name
        # register the object on the bus name
        dbus.service.Object.__init__(self, bus_name, CO2MONITOR_OBJECTPATH)
项目:co2monitor    作者:nobodyinperson    | 项目源码 | 文件源码
def start_device_logging(self, devicefile):
        self.logger.info(_("received request to start logging on device {}").format(
            devicefile))
        # create the object path name on the bus name
        objectpath = "/".join([CO2MONITOR_OBJECTPATH,
            utils.devicefile2objectname(devicefile)])
        # check if device is already monitored
        monitored_devices = self.get_monitored_devices_objects()
        if objectpath in monitored_devices:
            self.logger.warning(" ".join([
            _("There is already a logging thread for device {}."),
            _("This is odd... I better do nothing.")
            ]).format(devicefile))
            return False
        else:
            try:
                thread = LogThread(devicefile = devicefile) # logger object
                self.logger.info(_("starting logging thread for device {}").format(
                    devicefile))
                # same logger as service
                thread.set_logger(self.logger)
                # same config as service
                thread.set_config(self.config)
                thread.daemon = True # let this thread be a daemon thread
                thread.start()
            except OSError:
                self.logger.critical(_("Could not access the device file '{}'."
                    ).format(devicefile))
                return False
            except:
                self.logger.critical(_(
                    "Something went wrong with device file '{}'."
                    ).format(devicefile))
                return False
            return True
项目:co2monitor    作者:nobodyinperson    | 项目源码 | 文件源码
def __init__(self, devicefile):
        # by default, log!
        self.do_data_logging = True
        # initially set the standard logger
        self.set_logger(logging.getLogger(__name__))
        # initially set an empty configuration
        self.set_config(configparser.ConfigParser())

        # use glib as default mailoop for dbus
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        dbus.mainloop.glib.threads_init()
        GLib.threads_init()

        self.systembus = dbus.SystemBus() # the system bus
        self.systembus.request_name(CO2MONITOR_BUSNAME) # request the bus name
        bus_name = dbus.service.BusName(CO2MONITOR_BUSNAME, self.systembus) # create bus name

        self.devicefile = devicefile

        # set up the device
        self.device = device.co2device(self.devicefile)

        # register the object on the bus name
        objectpath = "/".join([CO2MONITOR_OBJECTPATH,
            utils.devicefile2objectname(self.devicefile)])
        dbus.service.Object.__init__(self, bus_name, objectpath)
        self.update_status(_("idle"))
        threading.Thread.__init__(self)


    # set the config
项目:simLAB    作者:kamwar    | 项目源码 | 文件源码
def __init__(self, taskQueue, resultQueue):
        self.taskQueue = taskQueue
        self.resultQueue = resultQueue
        bus_name = dbus.service.BusName('org.sim.simlab', bus=dbus.SessionBus())
        dbus.service.Object.__init__(self, bus_name, '/org/sim/simlab')
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def __init__(self, bus, index, advertising_type):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.ad_type = advertising_type
        self.service_uuids = None
        self.manufacturer_data = None
        self.solicit_uuids = None
        self.service_data = None
        self.include_tx_power = None
        dbus.service.Object.__init__(self, bus, self.path)
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def __init__(self, bus):
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)
        self.add_service(HeartRateService(bus, 0))
        self.add_service(BatteryService(bus, 1))
        self.add_service(TestService(bus, 2))
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def add_service(self, service):
        self.services.append(service)
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def GetManagedObjects(self):
        response = {}
        print('GetManagedObjects')

        for service in self.services:
            response[service.get_path()] = service.get_properties()
            chrcs = service.get_characteristics()
            for chrc in chrcs:
                response[chrc.get_path()] = chrc.get_properties()
                descs = chrc.get_descriptors()
                for desc in descs:
                    response[desc.get_path()] = desc.get_properties()

        return response
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def __init__(self, bus, index, uuid, primary):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        dbus.service.Object.__init__(self, bus, self.path)
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def get_properties(self):
        return {
                GATT_CHRC_IFACE: {
                        'Service': self.service.get_path(),
                        'UUID': self.uuid,
                        'Flags': self.flags,
                        'Descriptors': dbus.Array(
                                self.get_descriptor_paths(),
                                signature='o')
                }
        }
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def __init__(self, bus, index, uuid, flags, characteristic):
        self.path = characteristic.path + '/desc' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.chrc = characteristic
        dbus.service.Object.__init__(self, bus, self.path)
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.HR_MSRMT_UUID,
                ['notify'],
                service)
        self.notifying = False
        self.hr_ee_count = 0
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.BODY_SNSR_LOC_UUID,
                ['read'],
                service)
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def WriteValue(self, value, options):
        print('Heart Rate Control Point WriteValue called')

        if len(value) != 1:
            raise exceptions.InvalidValueLengthException()

        byte = value[0]
        print('Control Point value: ' + repr(byte))

        if byte != 1:
            raise exceptions.FailedException("0x80")

        print('Energy Expended field reset!')
        self.service.energy_expended = 0
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.BATTERY_LVL_UUID,
                ['read', 'notify'],
                service)
        self.notifying = False
        self.battery_lvl = 100
        GObject.timeout_add(5000, self.drain_battery)
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['read', 'write', 'writable-auxiliaries'],
                service)
        self.value = []
        self.add_descriptor(TestDescriptor(bus, 0, self))
        self.add_descriptor(
                CharacteristicUserDescriptionDescriptor(bus, 1, self))
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['encrypt-read', 'encrypt-write'],
                service)
        self.value = []
        self.add_descriptor(TestEncryptDescriptor(bus, 2, self))
        self.add_descriptor(
                CharacteristicUserDescriptionDescriptor(bus, 3, self))
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['secure-read', 'secure-write'],
                service)
        self.value = []
        self.add_descriptor(TestSecureDescriptor(bus, 2, self))
        self.add_descriptor(
                CharacteristicUserDescriptionDescriptor(bus, 3, self))
项目:python-eduvpn-client    作者:eduvpn    | 项目源码 | 文件源码
def __new__(cls, name, bases, attrs):
        if bases != (dbus.service.Object,):
            attrs['GetSecretsImpl'] = attrs.pop('GetSecrets')
        return super(SecretAgentType, cls).__new__(cls, name, bases, attrs)
项目:python-eduvpn-client    作者:eduvpn    | 项目源码 | 文件源码
def __init__(self, identifier):
        self.identifier = identifier
        dbus.service.Object.__init__(self, dbus.SystemBus(), self.object_path)
        AgentManager.Register(self.identifier)
项目:Steam-Supervisor-for-Linux    作者:prozacgod    | 项目源码 | 文件源码
def run(self):
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus_name = dbus.service.BusName("com.prozacville.steam_monitor", dbus.SessionBus())
    dbus.service.Object.__init__(self, bus_name, "/com/prozacville/steam_monitor")

    self._loop = gobject.MainLoop()
    print "Service running..."
    self._loop.run()
    print "Service stopped"
项目:bjarkan    作者:GetWellNetwork    | 项目源码 | 文件源码
def __init__( self ):
        bus_name = dbus.service.BusName( BUSNAME, bus = dbus.SystemBus() )
        super().__init__( bus_name = bus_name, object_path = SUPPORT_OBJECTPATH )
项目:bjarkan    作者:GetWellNetwork    | 项目源码 | 文件源码
def GetState( self ):
        """
        Return the current runtime state of the service.

        Returns:
            str: json encoded, free-form-ish dictionary of runtime state information
        """
        return json.dumps( {} )
项目:kawaii-player    作者:kanishka-linux    | 项目源码 | 文件源码
def __init__(self, ui, home, tr_ay, new_tr):
        global tray, new_tray_widget
        tray = tr_ay
        new_tray_widget = new_tr
        bus = dbus.service.BusName(
            AW_MPRIS_BUS_NAME,
            bus=dbus.SessionBus())
        super().__init__(bus, MPRIS_OBJECT_PATH)

        self._properties = dbus.Dictionary({
            'DesktopEntry': 'kawaii-player',
            'Identity': 'kawaii-player', 
            }, signature='sv')

        self._player_properties = dbus.Dictionary({
            'Metadata': dbus.Dictionary({
                'mpris:artUrl': '',
                'xesam:artist': ['None'],
                'xesam:title': 'None',
                'xesam:album': 'None'
            }, signature='sv', variant_level=1), 
            'CanGoNext': True,
            'CanGoPrevious': True,
            'CanPause': True,
            'CanPlay': True,
            'CanControl': True,
            'CanStop': True,
        }, signature='sv', variant_level=2)

        self.ui = ui
        self.home = home
项目:ubuntu-cleaner    作者:gerardpuig    | 项目源码 | 文件源码
def __init__(self, bus, mainloop):
        bus_name = dbus.service.BusName(INTERFACE, bus=bus)
        PolicyKitService.__init__(self, bus_name, PATH)
        self.mainloop = mainloop
项目:dell-recovery    作者:dell    | 项目源码 | 文件源码
def run_dbus_service(self, timeout=None, send_usr1=False):
        '''Run D-BUS server.

        If no timeout is given, the server will run forever, otherwise it will
        return after the specified number of seconds.

        If send_usr1 is True, this will send a SIGUSR1 to the parent process
        once the server is ready to take requests.
        '''
        dbus.service.Object.__init__(self, self.bus, '/RecoveryMedia')
        self.main_loop = GLib.MainLoop()
        self._timeout = False
        if timeout:
            def _quit():
                """This function is ran at the end of timeout"""
                self.main_loop.quit()
                return True
            GLib.timeout_add(timeout * 1000, _quit)

        # send parent process a signal that we are ready now
        if send_usr1:
            os.kill(os.getppid(), signal.SIGUSR1)

        # run until we time out
        while not self._timeout:
            if timeout:
                self._timeout = True
            self.main_loop.run()
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def add_managed_object(self, object):
        """Add a service to the list of services offered by the Application.

        :param object: Python object of dbus path to be managed
        """
        self.managed_objs.append(object)
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self, characteristic_id,
                 uuid,
                 service_obj,
                 value,
                 notifying,
                 flags):
        """Default initialiser.

        1. Registers the characteristc on the D-Bus.
        2. Sets up the service UUID and primary flags.

        :param service_id:
        :param uuid: service BLE UUID
        :param service_obj: the service that this characteristic is part of
        :param value: the initial value of this characteristic
        :param notifying: boolean representing the state of notification
        :param flags:
        """
        # Setup D-Bus object paths and register service
        PATH_BASE = service_obj.get_path() + '/char'
        self.path = PATH_BASE + str('{0:04d}'.format(characteristic_id))
        self.bus = dbus.SystemBus()
        dbus.service.Object.__init__(self, self.bus, self.path)
        self.props = {
            constants.GATT_CHRC_IFACE: {
                'UUID': uuid,
                'Service': service_obj.get_path(),
                'Value': value,
                'Notifying': notifying,
                'Flags': flags}
        }

        for prop in self.props[constants.GATT_CHRC_IFACE].keys():
            self.Set(constants.GATT_CHRC_IFACE,
                     prop,
                     self.props[constants.GATT_CHRC_IFACE][prop])
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self,
                 descriptor_id,
                 uuid,
                 characteristic_obj,
                 value,
                 flags):
        """Default initialiser.

        1. Registers the descriptor on the D-Bus.
        2. Sets up the service UUID and primary flags.

        :param descriptor_id: A unique identifier for this descriptor
        :param uuid: descriptor BLE UUID
        :param characteristic_obj: The characteristic that this descriptor
                                   is related to
        :param value: The initial value of the descriptor
        :param flags: Flags specifying access permissions
        """
        # Setup D-Bus object paths and register service
        PATH_BASE = characteristic_obj.get_path() + '/desc'
        self.path = PATH_BASE + str('{0:04d}'.format(descriptor_id))
        self.bus = dbus.SystemBus()
        dbus.service.Object.__init__(self, self.bus, self.path)
        self.props = {
            constants.GATT_DESC_IFACE: {
                'UUID': uuid,
                'Characteristic': characteristic_obj.get_path(),
                'Value': value,
                'Flags': flags}
        }
        for prop in self.props[constants.GATT_DESC_IFACE].keys():
            self.Set(constants.GATT_DESC_IFACE,
                     prop,
                     self.props[constants.GATT_DESC_IFACE][prop])
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def add_service(self, service):
        """Add a service to the list of services offered by the Application.

        :param service: the service to be added (type: peripheral.Service)
        """
        self.services.append(service)
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def get_primary_service(self):
        """Get the *primary* service registered with the Application."""
        # services = self.GetManagedObjects()
        primary_uuid = None
        for service in self.services:
            if service.primary:
                logger.debug(service.uuid)
                primary_uuid = service.uuid
        return primary_uuid
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def UUID(self):
        """Return Service UUID"""
        return self.service.Get(constants.GATT_SERVICE_IFACE, 'UUID')
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def add_characteristic(self, characteristic):
        """Add a characteristic.

        Adds a characteristic to the list of characteristics offered by the
        service.

        :param characteristic: the characteristic to be added.
                               (type: peripheral.Characteristic)

        """
        self.characteristics.append(characteristic)