Python oslo_utils.importutils 模块,import_module() 实例源码
我们从 Python 开源项目中提取了以下 34 个代码示例,用于说明如何使用 oslo_utils.importutils.import_module()。
def _import_modules_from_package():
    """Yield each non-test module found under the ``os_faults`` package.

    Walks the directory holding ``os_faults.__file__`` and skips files
    whose name starts with ``__`` or ``test`` or does not end in ``.py``.
    Every remaining path is turned into a dotted module name relative to
    the parent of the package directory.  A module already present in
    ``sys.modules`` is reused; otherwise it is imported through
    ``importutils.import_module`` and stored in ``sys.modules``.
    """
    package_dir = os.path.dirname(os_faults.__file__)
    # Paths are made relative to the parent directory so they begin with
    # the package's own directory name.
    base_dir = os.path.normpath(os.path.join(package_dir, os.pardir))

    for current_dir, _dirs, filenames in os.walk(package_dir):
        for filename in filenames:
            is_candidate = (filename.endswith('.py') and
                            not filename.startswith(('__', 'test')))
            if not is_candidate:
                continue

            rel_path = os.path.relpath(os.path.join(current_dir, filename),
                                       base_dir)
            # Drop the extension, then turn path separators into dots.
            dotted = os.path.splitext(rel_path)[0].replace(os.sep, '.')

            if dotted in sys.modules:
                loaded = sys.modules[dotted]
            else:
                loaded = importutils.import_module(dotted)
                sys.modules[dotted] = loaded

            yield loaded
def make_plugin_base_section(plugin_group):
    """Build one category element per plugin base in *plugin_group*.

    :param plugin_group: mapping read through its ``"group"`` string and
        its ``"items"`` list; each item provides ``"name"`` and a
        ``"base"`` string of the form ``module:ClassName``
    :returns: list of the category elements that were created
    """
    sections = []

    for entry in plugin_group["items"]:
        raw_name = entry["name"]
        # "SLA" keeps its capitalisation; every other name is title-cased.
        display_name = raw_name if raw_name == "SLA" else raw_name.title()
        heading = "%s %ss" % (plugin_group["group"].title(), display_name)
        section = category(heading)
        sections.append(section)

        module_path, class_name = entry["base"].split(":")
        base_cls = getattr(importutils.import_module(module_path), class_name)

        for plugin in base_cls.get_all():
            section.append(make_plugin_section(plugin, entry["name"]))

    return sections
def __init__(self, host, **kwargs):
    """Initialise the NWFilter firewall driver.

    The module-level ``libvirt`` global is populated on first use.  An
    ``ImportError`` is logged as a warning and construction continues.

    :param host: nova.virt.libvirt.host.Host instance
    :param kwargs: currently unused
    """
    global libvirt

    libvirt_missing = libvirt is None
    if libvirt_missing:
        try:
            libvirt = importutils.import_module('libvirt')
        except ImportError:
            # Best effort: the driver object is still created.
            LOG.warning(_LW("Libvirt module could not be loaded. "
                            "NWFilterFirewall will not work correctly."))

    self._host = host
    self.static_filters_configured = False
    self.handle_security_groups = False
def __init__(self, virtapi, read_only=False):
    """Initialise the Ironic driver and load ``ironicclient`` on demand.

    :param virtapi: forwarded to the parent class initialiser
    :param read_only: accepted but not referenced in this initialiser
    """
    global ironic

    super(IronicDriver, self).__init__(virtapi)

    if ironic is None:
        ironic = importutils.import_module('ironicclient')
        # NOTE(deva): work around a lack of symbols in the current version.
        fallback_submodules = (
            ('exc', 'ironicclient.exc'),
            ('client', 'ironicclient.client'),
        )
        for attr_name, module_path in fallback_submodules:
            if not hasattr(ironic, attr_name):
                setattr(ironic, attr_name,
                        importutils.import_module(module_path))

    self.firewall_driver = firewall.load_driver(
        default='nova.virt.firewall.NoopFirewallDriver')
    self.node_cache = {}
    self.node_cache_time = 0

    # Apply the configured log level to the 'ironicclient' logger, if any.
    configured_level = CONF.ironic.client_log_level
    if configured_level:
        resolved_level = py_logging.getLevelName(configured_level)
        client_logger = py_logging.getLogger('ironicclient')
        client_logger.setLevel(resolved_level)

    self.ironicclient = client_wrapper.IronicClientWrapper()
def __init__(self):
    """Initialise the IronicClientWrapper for use.

    ironicclient is loaded dynamically into the module-level ``ironic``
    global so that it is not a hard dependency for Nova.
    """
    global ironic

    if ironic is None:
        ironic = importutils.import_module('ironicclient')
        # NOTE(deva): work around a lack of symbols in the current version.
        if not hasattr(ironic, 'exc'):
            exc_module = importutils.import_module('ironicclient.exc')
            ironic.exc = exc_module
        if not hasattr(ironic, 'client'):
            client_module = importutils.import_module('ironicclient.client')
            ironic.client = client_module

    self._cached_client = None
def __init__(self, image, partition=None):
    """Create a new local VFS instance.

    The module-level ``guestfs`` global is populated on first use; any
    exception raised while importing it is re-raised as a
    ``NovaException``.

    :param image: instance of nova.virt.image.model.Image
    :param partition: the partition number of access
    """
    global guestfs

    super(VFSGuestFS, self).__init__(image, partition)

    if guestfs is None:
        try:
            guestfs = importutils.import_module('guestfs')
        except Exception as import_error:
            raise exception.NovaException(
                _("libguestfs is not installed (%s)") % import_error)

    self.handle = None
    self.mount = False
def try_append_module(name, modules):
    """Import *name* and store it in *modules* unless already present.

    :param name: dotted name of the module to import
    :param modules: mapping of module name to module, updated in place
    """
    if name in modules:
        return
    modules[name] = importutils.import_module(name)
def __init__(self, db_driver=None):
    """Import the database driver module and expose it as ``self.db``.

    :param db_driver: import path of the driver module; when falsy,
        ``CONF.db_driver`` is used instead
    """
    super(Base, self).__init__()
    driver_path = db_driver or CONF.db_driver
    self.db = importutils.import_module(driver_path)  # pylint: disable=C0103
def Client(version, *args, **kwargs):
    """Instantiate the ``Client`` class for the given API version.

    :param version: interpolated into ``pankoclient.v%s.client``
    :param args: positional arguments forwarded to the client class
    :param kwargs: keyword arguments forwarded to the client class
    :returns: the constructed client instance
    """
    client_module = importutils.import_module(
        'pankoclient.v%s.client' % version)
    return client_module.Client(*args, **kwargs)
def import_versioned_module(version, submodule=None):
    """Import ``bileanclient.v<version>`` or one of its submodules.

    :param version: interpolated into the package name
    :param submodule: optional submodule name appended after a dot
    :returns: the imported module
    """
    name_parts = ['bileanclient.v%s' % version]
    if submodule:
        name_parts.append(submodule)
    return importutils.import_module('.'.join(name_parts))
def _get_classes_from_module(self, module_name):
    """Return the matching classes exposed by the module *module_name*.

    Attributes whose name starts with an underscore are treated as
    private and skipped; the rest are kept when
    ``self._is_correct_class`` accepts them.
    """
    module = importutils.import_module(module_name)
    public_objects = (getattr(module, attr_name)
                      for attr_name in dir(module)
                      if not attr_name.startswith('_'))
    return [obj for obj in public_objects if self._is_correct_class(obj)]
def test_all_public_methods_are_traced(self):
    """Check that public methods of the listed classes are traced."""
    profiler_opts.set_defaults(conf.CONF)
    self.config(enabled=True, group='profiler')

    traced_class_names = (
        'zun.compute.api.API',
        'zun.compute.rpcapi.API',
    )

    for class_name in traced_class_names:
        # Reload the owning module so the metaclass and the trace_cls()
        # decorator get a chance to patch the methods of the class.
        module_path = class_name.rsplit('.', 1)[0]
        six.reload_module(importutils.import_module(module_path))
        cls = importutils.import_class(class_name)

        for attr_name, member in cls.__dict__.items():
            # Only public callables are expected to be traced.
            is_public = not attr_name.startswith('_')
            is_callable = (inspect.ismethod(member) or
                           inspect.isfunction(member))
            if not (is_public and is_callable):
                continue
            # osprofiler skips static methods
            if isinstance(member, staticmethod):
                continue

            self.assertTrue(getattr(member, '__traced__', False), member)
def test_import_module(self):
    """import_module() returns the module object held in sys.modules."""
    imported = importutils.import_module('datetime')
    self.assertEqual(sys.modules['datetime'], imported)
def import_versioned_module(version, submodule=None):
    """Import ``picassoclient.<version>`` or one of its submodules.

    :param version: interpolated into the package name
    :param submodule: optional submodule name appended after a dot
    :returns: the imported module
    """
    module_path = 'picassoclient.%s' % version
    if submodule:
        module_path = module_path + '.' + submodule
    return importutils.import_module(module_path)
def get_subcommand_parser(self, version, do_help=False, argv=None):
    """Build the argument parser including every subcommand.

    Actions are collected first from the versioned
    ``harborclient.v<major>.shell`` module and then from this object.

    :param version: object whose ``ver_major`` selects the shell module
    :param do_help: forwarded to ``self._find_actions``
    :param argv: forwarded to ``self.get_base_parser``
    :returns: the populated parser
    """
    parser = self.get_base_parser(argv)
    self.subcommands = {}
    subparsers = parser.add_subparsers(metavar='<subcommand>')

    shell_module_name = "harborclient.v%s.shell" % version.ver_major
    actions_module = importutils.import_module(shell_module_name)

    for action_source in (actions_module, self):
        self._find_actions(subparsers, action_source, version, do_help)
    self._add_bash_completion_subparser(subparsers)

    return parser
def load_policy_modules():
    """Yield every policy module that defines ``list_rules``.

    Each name returned by ``_list_module_names()`` is appended to
    ``_BASE_MOD_PATH`` and imported; modules without a ``list_rules``
    attribute are skipped.
    """
    for module_name in _list_module_names():
        candidate = importutils.import_module(_BASE_MOD_PATH + module_name)
        if not hasattr(candidate, 'list_rules'):
            continue
        yield candidate
def load_conf_modules():
    """Load all modules that contain configuration.

    Method iterates over modules of :py:mod:`monasca_events_api.conf`
    and returns only those that contain all of the following functions:

    - list_opts (required by oslo_config.genconfig)
    - register_opts

    A warning is logged for every required function a module lacks.
    Each qualifying module appears exactly once in the result; a module
    missing any required function is left out.

    :returns: list of imported configuration modules
    """
    required_funcs = ('register_opts', 'list_opts')
    imported_modules = []

    for modname in _list_module_names():
        mod = importutils.import_module('monasca_events_api.conf.' + modname)

        missing = [func for func in required_funcs if not hasattr(mod, func)]
        for func in missing:
            LOG.warning("The module 'monasca_events_api.conf.%s' should have a"
                        " '%s' function which returns"
                        " the config options.",
                        modname, func)

        # Append once per module, and only when nothing is missing.
        # Appending per function would list a complete module twice and
        # would still return an incomplete one.
        if not missing:
            imported_modules.append(mod)

    LOG.debug('Found %d modules that contain configuration',
              len(imported_modules))
    return imported_modules
def test__verify_driver(self):
    """An enabled driver verifies cleanly; a non-enabled one raises."""
    cfg.CONF.set_override('enabled_drivers',
                          ['kuryr.lib.binding.drivers.veth'],
                          group='binding')

    # The enabled driver must pass verification without an exception.
    veth_driver = importutils.import_module('kuryr.lib.binding.drivers.veth')
    binding._verify_driver(veth_driver)

    # A driver outside 'enabled_drivers' must be rejected.
    vlan_driver = importutils.import_module('kuryr.lib.binding.drivers.vlan')
    self.assertRaises(exceptions.DriverNotEnabledException,
                      binding._verify_driver, vlan_driver)
def import_versioned_module(version, submodule=None):
    """Import ``heatclient.v<version>`` or one of its submodules.

    :param version: interpolated into the package name
    :param submodule: optional submodule name appended after a dot
    :returns: the imported module
    """
    package = 'heatclient.v%s' % version
    if not submodule:
        return importutils.import_module(package)
    return importutils.import_module('.'.join((package, submodule)))
def import_versioned_module(version, submodule=None):
    """Import ``monitoringclient.v<version>`` or one of its submodules.

    :param version: interpolated into the package name
    :param submodule: optional submodule name appended after a dot
    :returns: the imported module
    """
    package = 'monitoringclient.v%s' % version
    target = '.'.join((package, submodule)) if submodule else package
    return importutils.import_module(target)
def get_subcommand_parser(self):
    """Build the argument parser including every subcommand.

    Actions are collected first from the ``zaqar_demo`` module and then
    from this object.

    :returns: the populated parser
    """
    parser = self.get_base_parser()
    self.subcommands = {}
    subparsers = parser.add_subparsers(metavar='<subcommand>')

    demo_module = importutils.import_module('zaqar_demo')
    for action_source in (demo_module, self):
        self._find_actions(subparsers, action_source)

    return parser
def import_zhmcclient():
    """Lazily initialise and return the ``zhmcclient`` module.

    The module-level ``zhmcclient`` global is imported only while it is
    still ``None``, so it can otherwise be preset (for example to
    fakezhmcclient for the unittest framework).  urllib3 warnings are
    disabled on every call.
    """
    global zhmcclient

    LOG.debug("get_zhmcclient")
    requests.packages.urllib3.disable_warnings()

    needs_import = zhmcclient is None
    if needs_import:
        zhmcclient = importutils.import_module('zhmcclient')
    return zhmcclient
def import_versioned_module(version, submodule=None):
    """Import ``glareclient.v<version>`` or one of its submodules.

    :param version: interpolated into the package name
    :param submodule: optional submodule name appended after a dot
    :returns: the imported module
    """
    segments = ('glareclient.v%s' % version, submodule)
    # A falsy submodule is dropped; the package name is always kept.
    module_path = '.'.join(segments) if submodule else segments[0]
    return importutils.import_module(module_path)
def __init__(self, db_driver=None):
    """Import the database driver module and store it on ``self.db``.

    :param db_driver: import path of the driver module; a falsy value
        selects ``CONF.db_driver``
    """
    super(Base, self).__init__()
    if db_driver:
        selected_driver = db_driver
    else:
        selected_driver = CONF.db_driver
    self.db = importutils.import_module(selected_driver)
def _extract_all_actions():
    """Collect the actions of every public module under ``mistral_actions``.

    Walks the package directory, imports each ``.py`` file whose name
    does not start with an underscore, and gathers what
    ``_extract_actions_from_module`` returns for it.

    :returns: list of all extracted actions
    """
    path = os.path.dirname(mistral_actions.__file__)
    # Length of the prefix before the package's own directory name, so
    # the slice below starts at the package name.
    base_len = len(path) - len(mistral_actions.__name__)
    action_list = []

    for root, _dirs, files in os.walk(path):
        for f in files:
            if not f.endswith('.py') or f.startswith('_'):
                continue
            # Use the platform separator: os.walk() returns native paths,
            # so a hard-coded '/' gives invalid module names wherever the
            # separator differs.
            relative = os.path.join(root, f)[base_len:-len('.py')]
            module_name = relative.replace(os.sep, '.')
            module = importutils.import_module(module_name)
            action_list.extend(_extract_actions_from_module(module))

    return action_list
def get_subcommand_parser(self, do_help=False, argv=None):
    """Build the argument parser including every subcommand.

    Actions are collected first from ``mistral_actions.client.shell``
    and then from this object.

    :param do_help: forwarded to ``self._find_actions``
    :param argv: forwarded to ``self.get_base_parser``
    :returns: the populated parser
    """
    parser = self.get_base_parser(argv)
    self.subcommands = {}
    subparsers = parser.add_subparsers(metavar='<subcommand>')

    shell_module = importutils.import_module("mistral_actions.client.shell")
    for action_source in (shell_module, self):
        self._find_actions(subparsers, action_source, do_help)
    self._add_bash_completion_subparser(subparsers)

    return parser
def import_versioned_module(version, submodule=None):
    """Import ``masakariclient.v<version>`` or one of its submodules.

    :param version: interpolated into the package name
    :param submodule: optional submodule name appended after a dot
    :returns: the imported module
    """
    dotted_name = 'masakariclient.v%s' % version
    if submodule:
        dotted_name = '.'.join([dotted_name, submodule])
    return importutils.import_module(dotted_name)
def Client(version, *args, **kwargs):
    """Instantiate the ``Client`` class for the given API version.

    :param version: interpolated into ``tricircleclient.v%s.client``
    :param args: positional arguments forwarded to the client class
    :param kwargs: keyword arguments forwarded to the client class
    :returns: the constructed client instance
    """
    module_path = 'tricircleclient.v%s.client' % version
    versioned_module = importutils.import_module(module_path)
    return versioned_module.Client(*args, **kwargs)
def load_network_driver(network_driver=None):
    """Import and return the network driver module.

    :param network_driver: import path of the driver; when falsy,
        ``CONF.network_driver`` is used
    :returns: the imported driver module

    Logs an error and calls ``sys.exit(1)`` when no driver is configured.
    """
    driver_path = network_driver or CONF.network_driver

    if not driver_path:
        LOG.error(_LE("Network driver option required, but not specified"))
        sys.exit(1)

    LOG.info(_LI("Loading network driver '%s'"), driver_path)
    return importutils.import_module(driver_path)
def _get_classes_from_module(self, module_name):
    """Collect the classes of the wanted type from *module_name*.

    Names beginning with an underscore are considered private and are
    ignored; the other attributes are filtered through
    ``self._is_correct_class``.
    """
    module = importutils.import_module(module_name)
    matches = []
    public_names = (n for n in dir(module) if not n.startswith('_'))
    for attr_name in public_names:
        candidate = getattr(module, attr_name)
        if self._is_correct_class(candidate):
            matches.append(candidate)
    return matches
def __init__(self, db_driver=None):
    """Import the configured database driver and keep it as ``self.db``.

    :param db_driver: import path of the driver module; defaults to
        ``CONF.db_driver`` when falsy
    """
    super(Base, self).__init__()
    module_path = db_driver if db_driver else CONF.db_driver
    self.db = importutils.import_module(module_path)
def __init__(self, uri, read_only=False,
             conn_event_handler=None,
             lifecycle_event_handler=None):
    """Store connection settings and reset the cached host state.

    The module-level ``libvirt`` global is imported on first use.

    :param uri: stored as ``self._uri``
    :param read_only: stored as ``self._read_only``
    :param conn_event_handler: stored as ``self._conn_event_handler``
    :param lifecycle_event_handler: stored as
        ``self._lifecycle_event_handler``
    """
    global libvirt
    if libvirt is None:
        libvirt = importutils.import_module('libvirt')

    # Settings supplied by the caller.
    self._uri = uri
    self._read_only = read_only
    self._conn_event_handler = conn_event_handler
    self._lifecycle_event_handler = lifecycle_event_handler

    # State that starts out empty.
    self._skip_list_all_domains = False
    self._caps = None
    self._hostname = None

    self._wrapped_conn = None
    self._wrapped_conn_lock = threading.Lock()

    self._event_queue = None
    self._events_delayed = {}
    # Note(toabctl): During a reboot of a domain, STOPPED and
    #                STARTED events are sent. To prevent shutting
    #                down the domain during a reboot, delay the
    #                STOPPED lifecycle event some seconds.
    self._lifecycle_delay = 15
def __init__(self):
    """Load Libosinfo and cache its loader, database and OS list.

    When ``gi.repository.Libosinfo`` cannot be imported, the failure is
    logged at info level and none of the attributes are set.
    """
    global libosinfo

    try:
        if libosinfo is None:
            libosinfo = importutils.import_module(
                'gi.repository.Libosinfo')
    except ImportError as exp:
        LOG.info(_LI("Cannot load Libosinfo: (%s)"), exp)
        return

    loader = libosinfo.Loader()
    self.loader = loader
    self.loader.process_default_path()
    self.db = self.loader.get_db()
    self.oslist = self.db.get_os_list()
def deserialize_remote_exception(data, allowed_remote_exmods):
    """Rebuild an exception object from its serialized JSON form.

    :param data: serialized failure; decoded with ``jsonutils.loads``
    :param allowed_remote_exmods: module names whose exception classes
        may be imported and instantiated
    :returns: the reconstructed exception, or a ``messaging.RemoteError``
        when the module is not allowed or the class cannot be rebuilt
    """
    payload = jsonutils.loads(str(data))

    trace = payload.get('tb', [])
    message = payload.get('message', "") + "\n" + "\n".join(trace)
    name = payload.get('class')
    module = payload.get('module')

    def _as_remote_error():
        # Generic fallback carrying the class name, message and trace.
        return messaging.RemoteError(name, payload.get('message'), trace)

    # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
    # order to prevent arbitrary code execution.
    module_allowed = module == 'exceptions' or module in allowed_remote_exmods
    if not module_allowed:
        return _as_remote_error()

    try:
        mod = importutils.import_module(module)
        klass = getattr(mod, name)
        if not issubclass(klass, Exception):
            raise TypeError("Can only deserialize Exceptions")
        failure = klass(*payload.get('args', []),
                        **payload.get('kwargs', {}))
    except (AttributeError, TypeError, ImportError):
        return _as_remote_error()

    ex_type = type(failure)

    def str_override(self):
        return message

    new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
                       {'__str__': str_override,
                        '__unicode__': str_override})
    new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)

    try:
        # NOTE(ameade): Dynamically create a new exception type and swap it in
        # as the new type for the exception. This only works on user defined
        # Exceptions and not core python exceptions. This is important because
        # we cannot necessarily change an exception message so we must override
        # the __str__ method.
        failure.__class__ = new_ex_type
    except TypeError:
        # NOTE(ameade): If a core exception then just add the traceback to the
        # first exception argument.
        failure.args = (message,) + failure.args[1:]

    return failure