Python types 模块,ClassType() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用types.ClassType()。
def struct_fields(value):
    """Return the property objects that describe the fields of a struct.

    ``value`` may be a struct class or an instance of one; anything that is
    not an old-style class and has no ``__bases__`` attribute is reduced to
    its type.  When the class carries a ``__fields`` attribute, that list is
    used as the starting point; otherwise the class ``__dict__`` is scanned,
    collecting every ``simple_property`` first and every
    ``simpleseq_property`` after them.  Finally, any property reported by
    ``inspect.getmembers(value)`` whose ``id_`` is not present yet is added
    at the end.
    """
    looks_like_class = (isinstance(value, types.ClassType)
                        or hasattr(value, '__bases__'))
    clazz = value if looks_like_class else type(value)

    # Prefer the explicit field ordering when the class provides one.
    fields = getattr(clazz, '__fields', None)
    if fields is None:
        fields = [p for p in clazz.__dict__.itervalues()
                  if isinstance(p, simple_property)]
        fields += [p for p in clazz.__dict__.itervalues()
                   if isinstance(p, simpleseq_property)]

    # NOTE(review): when ``__fields`` exists, the loop below extends that
    # very list in place -- confirm that sharing it with the class is intended.
    for _, candidate in inspect.getmembers(value):
        if not isinstance(candidate, (simple_property, simpleseq_property)):
            continue
        already_known = any(candidate.id_ == known.id_ for known in fields)
        if not already_known:
            fields += [candidate]
    return fields
def runTests(self):
    """Run ``self.test`` and keep the outcome in ``self.result``.

    ``self.testRunner`` may be ``None`` (``runner.TextTestRunner`` is used),
    a runner class, or anything else, which is used as-is.  A runner class
    is first instantiated with the verbosity/failfast/buffer options and,
    if that raises ``TypeError``, without arguments.  When ``self.exit`` is
    true the process exits with a status that is truthy on failure.
    """
    if self.catchbreak:
        installHandler()

    if self.testRunner is None:
        self.testRunner = runner.TextTestRunner

    if not isinstance(self.testRunner, (type, types.ClassType)):
        # Not a class: it is assumed to be a TestRunner instance.
        testRunner = self.testRunner
    else:
        try:
            testRunner = self.testRunner(verbosity=self.verbosity,
                                         failfast=self.failfast,
                                         buffer=self.buffer)
        except TypeError:
            # The class did not accept verbosity, buffer or failfast.
            testRunner = self.testRunner()

    self.result = testRunner.run(self.test)
    if self.exit:
        sys.exit(not self.result.wasSuccessful())
def runTests(self):
    """Execute the collected tests with the configured test runner.

    Installs the break handler when ``self.catchbreak`` is set, falls back
    to ``runner.TextTestRunner`` when no runner was given, instantiates the
    runner if it is a class, stores the run result on ``self.result`` and,
    when ``self.exit`` is true, calls ``sys.exit`` with the negated success
    flag.
    """
    if self.catchbreak:
        installHandler()
    if self.testRunner is None:
        self.testRunner = runner.TextTestRunner

    runner_is_class = isinstance(self.testRunner, (type, types.ClassType))
    if runner_is_class:
        try:
            testRunner = self.testRunner(
                verbosity=self.verbosity,
                failfast=self.failfast,
                buffer=self.buffer,
            )
        except TypeError:
            # Constructor rejected the verbosity, buffer or failfast
            # keywords; retry with no arguments at all.
            testRunner = self.testRunner()
    else:
        # Anything else is assumed to be a ready-made TestRunner instance.
        testRunner = self.testRunner

    self.result = testRunner.run(self.test)
    if not self.exit:
        return
    sys.exit(not self.result.wasSuccessful())
def latestVersionOf(self, anObject):
    """Return the most recent version of ``anObject``.

    Dispatches on the exact type of the argument: functions go through
    ``latestFunction``, methods are looked up again by name on their
    instance (or on their class when unbound), old-style instances are
    poked with a ``getattr`` and returned, and old-style classes go through
    ``latestClass``.  Anything else is logged and returned unchanged.
    """
    kind = type(anObject)

    if kind == types.FunctionType:
        return latestFunction(anObject)

    if kind == types.MethodType:
        owner = anObject.im_self
        if owner is None:
            # Unbound method: fetch the attribute from the class instead.
            owner = anObject.im_class
        return getattr(owner, anObject.__name__)

    if kind == types.InstanceType:
        # Kick it, if it's out of date.
        getattr(anObject, 'nothing', None)
        return anObject

    if kind == types.ClassType:
        return latestClass(anObject)

    log.msg('warning returning anObject!')
    return anObject
def loadAnything(self, thing, recurse=False):
    """
    Load whatever tests can be found in C{thing}.

    @param thing: A Python object: a module, package, class or method.
    @param recurse: Passed on to C{loadPackage} when C{thing} is a package.
        Defaults to False.
    @return: The result of the matching C{load*} method.
    @raise TypeError: If C{thing} is none of the supported kinds.
    """
    if isinstance(thing, types.ModuleType):
        if not isPackage(thing):
            return self.loadModule(thing)
        return self.loadPackage(thing, recurse)

    # Old-style and new-style classes are loaded the same way.
    if isinstance(thing, (types.ClassType, type)):
        return self.loadClass(thing)

    if isinstance(thing, types.MethodType):
        return self.loadMethod(thing)

    raise TypeError("No loader for %r. Unrecognized type" % (thing,))
def minimalBases(classes):
    """Reduce a list of base classes to its ordered minimum equivalent.

    ``ClassType`` itself is dropped from the input.  A class is kept only
    if no *other* class in the input is a subclass of it; a class that
    occurs several times ends up at the position of its last occurrence.
    """
    pool = [c for c in classes if c is not ClassType]
    minimal = []
    for base in pool:
        has_subclass = any(issubclass(other, base) and base is not other
                           for other in pool)
        if has_subclass:
            continue
        # Re-insert duplicates so the entry moves to the later position.
        if base in minimal:
            minimal.remove(base)
        minimal.append(base)
    return minimal
def get_formatted_content(self, pyobj):
    """Find a serializer for ``pyobj`` and return its formatted content.

    The object's own ``typecode`` attribute is preferred; otherwise
    ``Any.serialmap`` is consulted, first by class and then by the
    ``(types.ClassType, <class name>)`` key.  Raises ``EvaluateException``
    when no serializer is found.
    """
    # NOTE(review): isinstance(pyobj, object) looks always true, which would
    # leave the ``else`` branch (including the struct_time fallback)
    # unreachable -- confirm the intended condition.
    if isinstance(pyobj, object):
        klass = pyobj.__class__
        if hasattr(pyobj, 'typecode'):
            serializer = pyobj.typecode
        else:
            serializer = Any.serialmap.get(klass)
        if not serializer:
            fallback_key = (types.ClassType, pyobj.__class__.__name__)
            serializer = Any.serialmap.get(fallback_key)
    else:
        serializer = Any.serialmap.get(type(pyobj))
        if not serializer and isinstance(pyobj, time.struct_time):
            from pysphere.ZSI.TCtimes import gDateTime
            serializer = gDateTime()

    if not serializer:
        raise EvaluateException('Failed to find serializer for pyobj %s' %pyobj)
    return serializer.get_formatted_content(pyobj)
def load_component(self, class_name):
    """
    Load the component module named ``class_name`` and add its component
    to the editor.

    A component that was loaded before is returned from the cache.
    Otherwise the module ``scc.gui.ae.<class_name>`` is imported and the
    first name in its ``__all__`` that is a strict subclass of
    ``AEComponent`` is instantiated, remembered and returned.  Returns
    ``None`` when the module exports no such class.
    """
    try:
        return self.loaded_components[class_name]
    except KeyError:
        pass

    mod = importlib.import_module("scc.gui.ae.%s" % (class_name,))
    for exported_name in mod.__all__:
        candidate = getattr(mod, exported_name)
        if not isinstance(candidate, (type, types.ClassType)):
            continue
        if not issubclass(candidate, AEComponent):
            continue
        if candidate is AEComponent:
            # The base class itself is not a loadable component.
            continue
        instance = candidate(self.app, self)
        self.loaded_components[class_name] = instance
        self.components.append(instance)
        return instance
def register(cls, action):
    """Register ``action`` on ``cls`` under its (upper-cased) name.

    Functions and ``staticmethod`` objects are stored as they are.  Callable
    classes (old-style, or new-style types) are instantiated first and
    registered under the class name.  Other callable objects are registered
    under the name of their class.  Anything else is logged as an error and
    rejected with ``UnknownAction``.
    """
    action_logger.info("Registering action :%s" % action)

    if isinstance(action, (types.FunctionType, staticmethod)):
        name = action.__name__
    elif isinstance(action, types.ClassType) and hasattr(action, "__call__"):
        name = action.__name__
        action = action()
    elif (isinstance(action, (types.InstanceType, types.ObjectType))
            and hasattr(action, "__call__")):
        if isinstance(action, type):
            name = action.__name__
            action = action()
        else:
            name = action.__class__.__name__
    else:
        name = str(action)
        action_logger.error("Error registering action :%s" % action)
        raise UnknownAction("unable to register action %s!!" %name)

    # Every accepted kind is stored the same way.
    cls._actions[name.upper()] = action
    setattr(cls, name, action)
def register(action_klass):
    """
    Build a decorator that registers a chaos action on ``action_klass``.

    Args:
        action_klass: class that receives the action as an attribute and
            as an entry in its ``_actions`` mapping.

    Returns:
        A decorator.  When the decorated object's ``enabled`` attribute is
        true, the object (instantiated first if it is a class) is stored on
        ``action_klass`` and returned; otherwise the object is returned
        untouched.
    """
    def _register(chaos_klass):
        if not chaos_klass.enabled:
            return chaos_klass

        name = chaos_klass.__name__
        if isinstance(chaos_klass, (type, types.ClassType)):
            chaos_klass = chaos_klass()
        setattr(action_klass, name, chaos_klass)
        action_klass._actions.update([(name.upper(), chaos_klass)])
        return chaos_klass

    return _register
def register(cls, event):
    """Register ``event`` on ``cls`` under its (upper-cased) name.

    Functions and ``staticmethod`` objects are stored unchanged.  Callable
    classes are instantiated and registered under the class name; other
    callable objects are registered under the name of their class.  Any
    other value is logged as an error and rejected with
    ``UnknownChaosEvent``.
    """
    exe_logger.info("Registering event :%s" % event)

    if isinstance(event, (types.FunctionType, staticmethod)):
        name = event.__name__
    elif isinstance(event, types.ClassType) and hasattr(event, "__call__"):
        name = event.__name__
        event = event()
    elif (isinstance(event, (types.InstanceType, types.ObjectType))
            and hasattr(event, "__call__")):
        if isinstance(event, type):
            name = event.__name__
            event = event()
        else:
            name = event.__class__.__name__
    else:
        name = str(event)
        exe_logger.error("Error registering event :%s" % event)
        raise UnknownChaosEvent("unable to register event %s!!" % name)

    # Common tail for every accepted kind of event.
    cls._chaos_events[name.upper()] = event
    setattr(cls, name, event)
def register(executor_klass):
    """
    Build a decorator that registers a chaos event on ``executor_klass``.

    Args:
        executor_klass: class that receives the event as an attribute and
            through its ``update_events`` method.

    Returns:
        A decorator.  When the decorated object's ``enabled`` attribute is
        true, the object (instantiated first if it is a class) is attached
        to ``executor_klass`` and returned; otherwise it is returned
        untouched.
    """
    def _register(chaos_klass):
        if not chaos_klass.enabled:
            return chaos_klass

        name = chaos_klass.__name__
        if isinstance(chaos_klass, (type, types.ClassType)):
            chaos_klass = chaos_klass()
        setattr(executor_klass, name, chaos_klass)
        executor_klass.update_events([(name.upper(), chaos_klass)])
        return chaos_klass

    return _register
def simplefilter(self, action, category=Warning, lineno=0, append=0, caller_id = 0, severity=0):
    """Add a filter entry that matches on category and line number only.

    The entry is the tuple ``(action, None, category, None, lineno,
    caller_id, severity)``.  An identical existing entry is removed first;
    the new one goes to the end of ``self.filters`` when ``append`` is true
    and to the front otherwise.  The whole update runs under
    ``self.warn_lock``.
    """
    with self.warn_lock:
        assert action in ("error", "ignore", "always", "default", "module",
                          "once"), "invalid action: %r" % (action,)
        assert isinstance(category, (type, types.ClassType)), \
            "category must be a class"
        assert issubclass(category, Warning), "category must be a Warning subclass"
        assert isinstance(lineno, int) and lineno >= 0, \
            "lineno must be an int >= 0"

        entry = (action, None, category, None, lineno, caller_id, severity)
        if entry in self.filters:
            self.filters.remove(entry)
        if not append:
            self.filters.insert(0, entry)
        else:
            self.filters.append(entry)
def filterwarnings(self, action, message="", category=Warning, module="", lineno=0, append=0, caller_id = 0, severity=0):
    """Add a filter entry that matches on message, category, module and line.

    ``message`` is compiled case-insensitively and ``module`` is compiled
    as given.  An identical existing entry is removed first; the new one
    goes to the end of ``self.filters`` when ``append`` is true and to the
    front otherwise.  The whole update runs under ``self.warn_lock``.
    """
    with self.warn_lock:
        assert action in ("error", "ignore", "always", "default", "module",
                          "once"), "invalid action: %r" % (action,)
        assert isinstance(message, basestring), "message must be a string"
        assert isinstance(category, (type, types.ClassType)), \
            "category must be a class"
        assert issubclass(category, Warning), "category must be a Warning subclass"
        assert isinstance(module, basestring), "module must be a string"
        assert isinstance(lineno, int) and lineno >= 0, \
            "lineno must be an int >= 0"

        message_re = re.compile(message, re.I)
        module_re = re.compile(module)
        entry = (action, message_re, category, module_re, lineno,
                 caller_id, severity)
        if entry in self.filters:
            self.filters.remove(entry)
        if not append:
            self.filters.insert(0, entry)
        else:
            self.filters.append(entry)
# end _Warnings()
def call_async(self, fn, *args, **kwargs):
    """Send a pickled function-call message and return the receiver.

    The message carries ``(module, class name, function name, args,
    kwargs)``.  The class name is filled in only when ``fn`` is a method
    whose ``im_self`` is itself a class; otherwise it is ``None``.  The
    returned receiver has ``raise_channelerror`` switched off.
    """
    LOG.debug('%r.call_async(%r, *%r, **%r)',
              self, fn, args, kwargs)

    klass = None
    if isinstance(fn, types.MethodType):
        if isinstance(fn.im_self, (type, types.ClassType)):
            klass = fn.im_self.__name__

    call_spec = (fn.__module__, klass, fn.__name__, args, kwargs)
    message = mitogen.core.Message.pickled(
        call_spec,
        handle=mitogen.core.CALL_FUNCTION,
    )
    recv = self.send_async(message)
    recv.raise_channelerror = False
    return recv
def addDataSource(self, source=SampleDataSource, name='default', sourceModule=None, sourceArgs=[], sourceKwargs={}):
    """Register a data source under ``name``.

    ``source`` may be:

    * a string -- looked up in ``sourceModule`` (itself imported first when
      given as a string); when no module is given the string is kept as-is;
    * a class -- instantiated with ``sourceArgs`` / ``sourceKwargs``;
    * anything else -- used unchanged.

    The source is stored together with a fresh ``weakref.WeakSet``, its
    ``initialSamples()`` are recorded, and every entry of
    ``self._lostCurveDatas`` that ``_tryAddToDataSource`` accepts for this
    name is dropped from that collection.

    Raises ``Exception`` when ``name`` is already registered.
    """
    if name in self._sources:
        raise Exception('Data source "%s" already exists!' % name)

    if utils.isstr(source):
        if utils.isstr(sourceModule):
            sourceModule = __import__(sourceModule)
        if sourceModule is not None:
            source = sourceModule.__dict__[source]
    elif type(source) in [types.ClassType, types.TypeType]:
        source = source(*sourceArgs, **sourceKwargs)

    cds = weakref.WeakSet()
    self._sources[name] = (source, cds)
    self._lastSamples[name] = source.initialSamples()

    # Iterate over a snapshot because matching entries are removed from the
    # collection while looping.
    for cd in list(self._lostCurveDatas):
        if self._tryAddToDataSource(cd, name):
            # Fixed: the original called ``.remote(cds)`` -- a misspelled
            # method name applied to the new WeakSet rather than to the
            # entry that was just re-attached.
            self._lostCurveDatas.remove(cd)
def skip(reason):
    """Unconditionally skip a test.

    Compatibility shim: in older versions of Python, tracking skipped
    tests is problematic since the test loader and runner handle this
    internally.  The decorated item is therefore replaced by a function
    that accepts any arguments and passes silently.  ``reason`` is accepted
    for interface compatibility but is not recorded anywhere.
    """
    def decorator(test_item):
        @functools.wraps(test_item)
        def skip_wrapper(*args, **kwargs):
            pass
        return skip_wrapper
    return decorator
def validate_type(arg_name, arg, valid_types):
    """Check that ``arg`` is an instance of ``valid_types``.

    ``valid_types`` may be a single type, an old-style class, or a list or
    tuple of types.  When ``long_exists`` is true, ``long`` is accepted
    wherever ``int`` is.

    Returns ``None`` when the check passes.  Raises ``STLTypeError`` when
    ``arg`` has the wrong type and ``STLError`` when ``valid_types`` itself
    is not a type, list or tuple.
    """
    if long_exists:
        if valid_types is int:
            valid_types = (int, long)
        elif type(valid_types) is list and int in valid_types and long not in valid_types:
            # Build a new list instead of appending, so the list object
            # passed in by the caller is left untouched.
            valid_types = valid_types + [long]

    if type(valid_types) is list:
        valid_types = tuple(valid_types)

    if (type(valid_types) is type or               # single type, not array of types
            type(valid_types) is tuple or          # several valid types as tuple
            type(valid_types) is types.ClassType): # old style class
        if isinstance(arg, valid_types):
            return
        raise STLTypeError(arg_name, type(arg), valid_types)

    raise STLError('validate_type: valid_types should be type or list or tuple of types')
# throws STLError if not exactly one argument is present
def configure_custom(self, config):
    """Configure an object with a user-supplied factory.

    The factory is taken from the ``'()'`` key and passed through
    ``self.resolve`` when it is neither callable nor an old-style class.
    Remaining keys accepted by ``valid_ident`` become keyword arguments of
    the factory call; the optional ``'.'`` mapping is then applied to the
    result with ``setattr``.  Both special keys are removed from ``config``.
    """
    factory = config.pop('()')
    needs_resolving = (not hasattr(factory, '__call__')
                       and hasattr(types, 'ClassType')
                       and type(factory) != types.ClassType)
    if needs_resolving:
        factory = self.resolve(factory)

    extra_attrs = config.pop('.', None)

    # Only keys that are valid identifiers are forwarded to the factory.
    call_kwargs = {}
    for key in config:
        if valid_ident(key):
            call_kwargs[key] = config[key]

    built = factory(**call_kwargs)
    if extra_attrs:
        for attr_name, attr_value in extra_attrs.items():
            setattr(built, attr_name, attr_value)
    return built
def configure_custom(self, config):
    """Build an object by calling the factory stored under ``'()'``.

    A factory that is not callable (and not an old-style class) is first
    turned into one with ``self.resolve``.  Keys of ``config`` that pass
    ``valid_ident`` are passed as keyword arguments; entries of the
    optional ``'.'`` mapping are set as attributes on the new object.
    """
    target = config.pop('()')
    if not hasattr(target, '__call__'):
        if hasattr(types, 'ClassType') and type(target) != types.ClassType:
            target = self.resolve(target)

    props = config.pop('.', None)

    # Check for valid identifiers
    kwargs = dict([(k, config[k]) for k in config if valid_ident(k)])
    result = target(**kwargs)

    if not props:
        return result
    for name, value in props.items():
        setattr(result, name, value)
    return result
def loadTestsFromName(self, name, module=None):
    """Return a suite of all test cases given a string specifier.

    The name is resolved with ``self._resolveName`` (optionally relative
    to ``module``) and may refer to a module, a ``TestCase`` class, a test
    method of such a class, a ``TestSuite`` instance, or a callable that
    returns a ``TestCase`` or ``TestSuite``.  ``TypeError`` is raised for
    anything else, including a callable that returns something that is not
    a test.
    """
    parent, obj = self._resolveName(name, module)
    class_kinds = (type, types.ClassType)

    if type(obj) == types.ModuleType:
        return self.loadTestsFromModule(obj)

    if isinstance(obj, class_kinds) and issubclass(obj, unittest.TestCase):
        return self.loadTestsFromTestCase(obj)

    if (type(obj) == types.UnboundMethodType and
            isinstance(parent, class_kinds) and
            issubclass(parent, unittest.TestCase)):
        return self.loadSpecificTestFromTestCase(parent, obj.__name__)

    if isinstance(obj, unittest.TestSuite):
        return obj

    if not hasattr(obj, '__call__'):
        raise TypeError("don't know how to make test from: %s" % obj)

    # A factory: call it and wrap the result as needed.
    test = obj()
    if isinstance(test, unittest.TestSuite):
        return test
    if isinstance(test, unittest.TestCase):
        return self.suiteClass([test])
    raise TypeError("calling %s returned %s, not a test" %
                    (obj, test))
def __init__(self,
             id_,
             structdef,
             name=None,
             mode="readwrite",
             configurationkind=("configure","property"),
             description=None,
             fget=None,
             fset=None,
             fval=None):
    """Construct a property of type struct.

    @param structdef the structure definition, which is any new-style class
                     with simple_property / simpleseq_property attributes
    @raise ValueError if structdef is an old-style class
    """
    # Struct properties support multiple kinds, like simple and
    # simplesequence properties.  A bare string is still accepted for
    # backwards compatibility and wrapped into a one-element tuple.
    if isinstance(configurationkind, str):
        configurationkind = (configurationkind,)

    _property.__init__(self, id_, None, name, None, mode, "external", configurationkind, description, fget, fset, fval)

    if type(structdef) is types.ClassType:
        raise ValueError("structdef must be a new-style python class (i.e. inherits from object)")
    self.structdef = structdef

    # Map field ids to (attribute name, property object) pairs.
    self.fields = {}
    for attr_name, attr in self.structdef.__dict__.items():
        attr_type = type(attr)
        if attr_type is simple_property or attr_type is simpleseq_property:
            self.fields[attr.id_] = (attr_name, attr)
def __init__(self,
             id_,
             structdef,
             name=None,
             defvalue=None,
             mode="readwrite",
             configurationkind=("configure","property"),
             description=None,
             fget=None,
             fset=None,
             fval=None):
    """Construct a property holding a sequence of structs.

    @param structdef the structure definition, which must be a new-style
                     class; its simple_property / simpleseq_property
                     attributes become the fields
    @param defvalue  stored unchanged on self.defvalue
    @raise ValueError if structdef is an old-style class
    """
    # Structsequence properties support multiple kinds, like simple and
    # simplesequence properties.  A bare string is still accepted for
    # backwards compatibility and wrapped into a one-element tuple.
    if isinstance(configurationkind, str):
        configurationkind = (configurationkind,)

    _sequence_property.__init__(self, id_, None, name, None, mode, "external", configurationkind, description, fget, fset, fval)

    if type(structdef) is types.ClassType:
        raise ValueError("structdef must be a new-style python class (i.e. inherits from object)")
    self.structdef = structdef

    # Map field ids to (attribute name, property object) pairs.
    self.fields = {}
    for attr_name, attr in self.structdef.__dict__.items():
        attr_type = type(attr)
        if attr_type is simple_property or attr_type is simpleseq_property:
            self.fields[attr.id_] = (attr_name, attr)

    self.defvalue = defvalue
def str_to_class(s):
    """Return the old-style class bound to the global name ``s``.

    Returns ``None`` when this module has no global of that name or when
    the global is not an instance of ``types.ClassType``.
    """
    namespace = globals()
    if s not in namespace:
        return None
    if not isinstance(namespace[s], types.ClassType):
        return None
    return namespace[s]
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
                   append=0):
    """Insert an entry into the list of warnings filters (at the front).

    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'message' -- a regex that the warning message must match
                 (compiled case-insensitively)
    'category' -- a class that the warning must be a subclass of
    'module' -- a regex that the module name must match
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters instead
    """
    import re
    assert action in ("error", "ignore", "always", "default", "module",
                      "once"), "invalid action: %r" % (action,)
    assert isinstance(message, basestring), "message must be a string"
    assert isinstance(category, (type, types.ClassType)), \
        "category must be a class"
    assert issubclass(category, Warning), "category must be a Warning subclass"
    assert isinstance(module, basestring), "module must be a string"
    assert isinstance(lineno, int) and lineno >= 0, \
        "lineno must be an int >= 0"

    message_re = re.compile(message, re.I)
    module_re = re.compile(module)
    entry = (action, message_re, category, module_re, lineno)
    if not append:
        filters.insert(0, entry)
    else:
        filters.append(entry)
def build_opener(*handlers):
    """Create an opener object from a list of handlers.

    The opener starts from a set of default handler classes (HTTPS support
    is included when ``httplib`` provides it).  A default class is left out
    whenever one of the given handlers is a subclass or an instance of it.
    The remaining defaults are instantiated and added first, followed by
    the given handlers; handlers passed as classes are instantiated.
    """
    import types

    def isclass(obj):
        return isinstance(obj, (types.ClassType, type))

    def overrides(candidate, default):
        # A handler class overrides by inheritance, an instance by type.
        if isclass(candidate):
            return issubclass(candidate, default)
        return isinstance(candidate, default)

    opener = OpenerDirector()
    default_classes = [ProxyHandler, UnknownHandler, HTTPHandler,
                       HTTPDefaultErrorHandler, HTTPRedirectHandler,
                       FTPHandler, FileHandler, HTTPErrorProcessor]
    if hasattr(httplib, 'HTTPS'):
        default_classes.append(HTTPSHandler)

    overridden = set()
    for default in default_classes:
        for candidate in handlers:
            if overrides(candidate, default):
                overridden.add(default)
    for default in overridden:
        default_classes.remove(default)

    for default in default_classes:
        opener.add_handler(default())

    for handler in handlers:
        if isclass(handler):
            handler = handler()
        opener.add_handler(handler)
    return opener
def configure_custom(self, config):
    """Configure an object with a user-supplied factory.

    The factory is taken from the ``'()'`` key and passed through
    ``self.resolve`` when it is neither callable nor an old-style class.
    Remaining keys accepted by ``valid_ident`` become keyword arguments of
    the factory call; the optional ``'.'`` mapping is then applied to the
    result with ``setattr``.  Both special keys are removed from ``config``.
    """
    factory = config.pop('()')
    needs_resolving = (not hasattr(factory, '__call__')
                       and hasattr(types, 'ClassType')
                       and type(factory) != types.ClassType)
    if needs_resolving:
        factory = self.resolve(factory)

    extra_attrs = config.pop('.', None)

    # Only keys that are valid identifiers are forwarded to the factory.
    call_kwargs = {}
    for key in config:
        if valid_ident(key):
            call_kwargs[key] = config[key]

    built = factory(**call_kwargs)
    if extra_attrs:
        for attr_name, attr_value in extra_attrs.items():
            setattr(built, attr_name, attr_value)
    return built
def __str__(self):
    """Describe the problem as ``problem in <filename> - <exc>: <value>``.

    When ``self.exc`` is an old-style class, its ``__name__`` is shown
    instead of the class object itself.
    """
    shown = self.exc
    if type(shown) is types.ClassType:
        shown = shown.__name__
    parts = (self.filename, shown, self.value)
    return 'problem in %s - %s: %s' % parts
def register(cls, subclass):
    """Register a virtual subclass of an ABC.

    Raises ``TypeError`` when ``subclass`` is not a class and
    ``RuntimeError`` when registering it would create an inheritance
    cycle.  Registering something that already is a subclass is a no-op.
    """
    is_class = isinstance(subclass, (type, types.ClassType))
    if not is_class:
        raise TypeError("Can only register classes")

    # Subtle: the "already a subclass" test must come *before* the cycle
    # test, so that X.register(X) is accepted and treated as a no-op.
    if issubclass(subclass, cls):
        return
    if issubclass(cls, subclass):
        # This would create a cycle, which is bad for the subclass check.
        raise RuntimeError("Refusing to create an inheritance cycle")

    cls._abc_registry.add(subclass)
    ABCMeta._abc_invalidation_counter += 1  # Invalidate negative cache
def configure_custom(self, config):
    """Build an object by calling the factory stored under ``'()'``.

    A factory that is not callable (and not an old-style class) is first
    turned into one with ``self.resolve``.  Keys of ``config`` that pass
    ``valid_ident`` are passed as keyword arguments; entries of the
    optional ``'.'`` mapping are set as attributes on the new object.
    """
    target = config.pop('()')
    if not hasattr(target, '__call__'):
        if hasattr(types, 'ClassType') and type(target) != types.ClassType:
            target = self.resolve(target)

    props = config.pop('.', None)

    # Check for valid identifiers
    kwargs = dict([(k, config[k]) for k in config if valid_ident(k)])
    result = target(**kwargs)

    if not props:
        return result
    for name, value in props.items():
        setattr(result, name, value)
    return result
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
                   append=0):
    """Insert an entry into the list of warnings filters (at the front).

    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'message' -- a regex that the warning message must match
                 (compiled case-insensitively)
    'category' -- a class that the warning must be a subclass of
    'module' -- a regex that the module name must match
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters instead
    """
    import re
    assert action in ("error", "ignore", "always", "default", "module",
                      "once"), "invalid action: %r" % (action,)
    assert isinstance(message, basestring), "message must be a string"
    assert isinstance(category, (type, types.ClassType)), \
        "category must be a class"
    assert issubclass(category, Warning), "category must be a Warning subclass"
    assert isinstance(module, basestring), "module must be a string"
    assert isinstance(lineno, int) and lineno >= 0, \
        "lineno must be an int >= 0"

    new_filter = (action, re.compile(message, re.I), category,
                  re.compile(module), lineno)
    if append:
        filters.append(new_filter)
        return
    filters.insert(0, new_filter)
def pickle(ob_type, pickle_function, constructor_ob=None):
    """Register ``pickle_function`` as the reduction function for ``ob_type``.

    Raises ``TypeError`` when ``ob_type`` is an old-style class or when
    ``pickle_function`` is not callable.  ``constructor_ob``, when given,
    is passed on to ``constructor``.
    """
    ob_type_is_classic = type(ob_type) is _ClassType
    if ob_type_is_classic:
        raise TypeError("copy_reg is not intended for use with classes")

    reducer_is_callable = hasattr(pickle_function, '__call__')
    if not reducer_is_callable:
        raise TypeError("reduction functions must be callable")

    dispatch_table[ob_type] = pickle_function

    # constructor_ob is a leftover from the old "safe for unpickling"
    # mechanism; callers have no reason to pass it any more.
    if constructor_ob is None:
        return
    constructor(constructor_ob)
def register(cls, subclass):
    """Register a virtual subclass of an ABC.

    Only classes can be registered (``TypeError`` otherwise).  A class
    that is already a subclass is silently accepted; a class that ``cls``
    itself derives from is refused with ``RuntimeError``.
    """
    if not isinstance(subclass, (type, types.ClassType)):
        raise TypeError("Can only register classes")

    already_subclass = issubclass(subclass, cls)
    if already_subclass:
        # Checked first on purpose: X.register(X) is allowed and is a no-op.
        return

    would_create_cycle = issubclass(cls, subclass)
    if would_create_cycle:
        raise RuntimeError("Refusing to create an inheritance cycle")

    cls._abc_registry.add(subclass)
    ABCMeta._abc_invalidation_counter += 1  # Invalidate negative cache
def configure_custom(self, config):
    """Configure an object with a user-supplied factory.

    The factory is taken from the ``'()'`` key and passed through
    ``self.resolve`` when it is neither callable nor an old-style class.
    Remaining keys accepted by ``valid_ident`` become keyword arguments of
    the factory call; the optional ``'.'`` mapping is then applied to the
    result with ``setattr``.  Both special keys are removed from ``config``.
    """
    factory = config.pop('()')
    needs_resolving = (not hasattr(factory, '__call__')
                       and hasattr(types, 'ClassType')
                       and type(factory) != types.ClassType)
    if needs_resolving:
        factory = self.resolve(factory)

    extra_attrs = config.pop('.', None)

    # Only keys that are valid identifiers are forwarded to the factory.
    call_kwargs = {}
    for key in config:
        if valid_ident(key):
            call_kwargs[key] = config[key]

    built = factory(**call_kwargs)
    if extra_attrs:
        for attr_name, attr_value in extra_attrs.items():
            setattr(built, attr_name, attr_value)
    return built
def configure_custom(self, config):
    """Build an object by calling the factory stored under ``'()'``.

    A factory that is not callable (and not an old-style class) is first
    turned into one with ``self.resolve``.  Keys of ``config`` that pass
    ``valid_ident`` are passed as keyword arguments; entries of the
    optional ``'.'`` mapping are set as attributes on the new object.
    """
    target = config.pop('()')
    if not hasattr(target, '__call__'):
        if hasattr(types, 'ClassType') and type(target) != types.ClassType:
            target = self.resolve(target)

    props = config.pop('.', None)

    # Check for valid identifiers
    kwargs = dict([(k, config[k]) for k in config if valid_ident(k)])
    result = target(**kwargs)

    if not props:
        return result
    for name, value in props.items():
        setattr(result, name, value)
    return result
def configure_custom(self, config):
    """Configure an object with a user-supplied factory.

    The factory is taken from the ``'()'`` key and passed through
    ``self.resolve`` when it is neither callable nor an old-style class.
    Remaining keys accepted by ``valid_ident`` become keyword arguments of
    the factory call; the optional ``'.'`` mapping is then applied to the
    result with ``setattr``.  Both special keys are removed from ``config``.
    """
    factory = config.pop('()')
    needs_resolving = (not hasattr(factory, '__call__')
                       and hasattr(types, 'ClassType')
                       and type(factory) != types.ClassType)
    if needs_resolving:
        factory = self.resolve(factory)

    extra_attrs = config.pop('.', None)

    # Only keys that are valid identifiers are forwarded to the factory.
    call_kwargs = {}
    for key in config:
        if valid_ident(key):
            call_kwargs[key] = config[key]

    built = factory(**call_kwargs)
    if extra_attrs:
        for attr_name, attr_value in extra_attrs.items():
            setattr(built, attr_name, attr_value)
    return built
def configure_custom(self, config):
    """Build an object by calling the factory stored under ``'()'``.

    A factory that is not callable (and not an old-style class) is first
    turned into one with ``self.resolve``.  Keys of ``config`` that pass
    ``valid_ident`` are passed as keyword arguments; entries of the
    optional ``'.'`` mapping are set as attributes on the new object.
    """
    target = config.pop('()')
    if not hasattr(target, '__call__'):
        if hasattr(types, 'ClassType') and type(target) != types.ClassType:
            target = self.resolve(target)

    props = config.pop('.', None)

    # Check for valid identifiers
    kwargs = dict([(k, config[k]) for k in config if valid_ident(k)])
    result = target(**kwargs)

    if not props:
        return result
    for name, value in props.items():
        setattr(result, name, value)
    return result
def configure_custom(self, config):
    """Configure an object with a user-supplied factory.

    The factory is taken from the ``'()'`` key and passed through
    ``self.resolve`` when it is neither callable nor an old-style class.
    Remaining keys accepted by ``valid_ident`` become keyword arguments of
    the factory call; the optional ``'.'`` mapping is then applied to the
    result with ``setattr``.  Both special keys are removed from ``config``.
    """
    factory = config.pop('()')
    needs_resolving = (not hasattr(factory, '__call__')
                       and hasattr(types, 'ClassType')
                       and type(factory) != types.ClassType)
    if needs_resolving:
        factory = self.resolve(factory)

    extra_attrs = config.pop('.', None)

    # Only keys that are valid identifiers are forwarded to the factory.
    call_kwargs = {}
    for key in config:
        if valid_ident(key):
            call_kwargs[key] = config[key]

    built = factory(**call_kwargs)
    if extra_attrs:
        for attr_name, attr_value in extra_attrs.items():
            setattr(built, attr_name, attr_value)
    return built
def _get_service_func(self, method, params):
    """
    Return the callable that serves C{method}.

    When C{self.service} is a class it is instantiated first.  With a
    method name, the attribute of that name is looked up on the service;
    without one, the service itself must be callable and is returned.

    @raise InvalidServiceMethodError: Calls to private methods are not
        allowed.
    @raise UnknownServiceMethodError: Unknown method.
    @raise InvalidServiceMethodError: Service method must be callable.
    """
    service = self.service
    if isinstance(service, (type, types.ClassType)):
        service = self.service()

    if method is None:
        # No method requested: the service object is the callable.
        if python.callable(service):
            return service
        raise UnknownServiceMethodError(
            "Unknown method %s" % str(self.service))

    method = str(method)
    if method.startswith('_'):
        raise InvalidServiceMethodError(
            "Calls to private methods are not allowed")

    try:
        func = getattr(service, method)
    except AttributeError:
        raise UnknownServiceMethodError(
            "Unknown method %s" % str(method))

    if python.callable(func):
        return func
    raise InvalidServiceMethodError(
        "Service method %s must be callable" % str(method))
def register_entry_points_plugins(entry_point="girlfriend.plugin"):
    """Register every plugin found under an entry-point namespace.

    Each extension's ``plugin`` object is turned into a ``Plugin`` (used
    directly if it already is one, otherwise wrapped according to whether
    it is a module, a function or a class) and handed to
    ``plugin_manager.register``.  ``InvalidPluginException`` is raised for
    any other kind of object.

    :param entry_point: namespace passed to ``extension.ExtensionManager``
    """
    global plugin_manager

    third_party_plugin_mgr = extension.ExtensionManager(
        namespace=entry_point, invoke_on_load=False)

    for ext in third_party_plugin_mgr:
        plugin_object = ext.plugin

        if isinstance(plugin_object, Plugin):
            # Already a Plugin instance.
            plugin = plugin_object
        elif isinstance(plugin_object, types.ModuleType):
            # A module.
            plugin = Plugin.wrap_module(plugin_object)
        elif isinstance(plugin_object, types.FunctionType):
            # A plain function.
            plugin = Plugin.wrap_function(
                ext.name, plugin_object.__doc__, plugin_object)
        elif isinstance(plugin_object, (types.ClassType, types.TypeType)):
            # A class (old-style or new-style).
            plugin = Plugin.wrap_class(plugin_object)
        else:
            # Unsupported kind of object.
            raise InvalidPluginException(
                u"?? '{}' ??? '{}'????".format(
                    ext.name,
                    type(plugin_object).__name__)
            )

        plugin_manager.register(plugin)
def configure_custom(self, config):
    """Build an object by calling the factory stored under ``'()'``.

    A factory that is not callable (and not an old-style class) is first
    turned into one with ``self.resolve``.  Keys of ``config`` that pass
    ``valid_ident`` are passed as keyword arguments; entries of the
    optional ``'.'`` mapping are set as attributes on the new object.
    """
    target = config.pop('()')
    if not hasattr(target, '__call__'):
        if hasattr(types, 'ClassType') and type(target) != types.ClassType:
            target = self.resolve(target)

    props = config.pop('.', None)

    # Check for valid identifiers
    kwargs = dict([(k, config[k]) for k in config if valid_ident(k)])
    result = target(**kwargs)

    if not props:
        return result
    for name, value in props.items():
        setattr(result, name, value)
    return result
def skip(reason):
    """
    Unconditionally skip a test.

    Classes are only tagged with the skip markers.  Any other item is
    replaced by a wrapper that raises ``SkipTest(reason)`` when called, and
    the wrapper is tagged instead.
    """
    def decorator(test_item):
        is_class = isinstance(test_item, (type, types.ClassType))
        if not is_class:
            original = test_item

            @functools.wraps(original)
            def skip_wrapper(*args, **kwargs):
                raise SkipTest(reason)

            test_item = skip_wrapper

        setattr(test_item, '__unittest_skip__', True)
        setattr(test_item, '__unittest_skip_why__', reason)
        return test_item
    return decorator
def configure_custom(self, config):
    """Configure an object with a user-supplied factory.

    The factory is taken from the ``'()'`` key and passed through
    ``self.resolve`` when it is neither callable nor an old-style class.
    Remaining keys accepted by ``valid_ident`` become keyword arguments of
    the factory call; the optional ``'.'`` mapping is then applied to the
    result with ``setattr``.  Both special keys are removed from ``config``.
    """
    factory = config.pop('()')
    needs_resolving = (not hasattr(factory, '__call__')
                       and hasattr(types, 'ClassType')
                       and type(factory) != types.ClassType)
    if needs_resolving:
        factory = self.resolve(factory)

    extra_attrs = config.pop('.', None)

    # Only keys that are valid identifiers are forwarded to the factory.
    call_kwargs = {}
    for key in config:
        if valid_ident(key):
            call_kwargs[key] = config[key]

    built = factory(**call_kwargs)
    if extra_attrs:
        for attr_name, attr_value in extra_attrs.items():
            setattr(built, attr_name, attr_value)
    return built
def isinst(inst,clazz):
    """Instance check that follows classes to their current version.

    Unless ``inst`` is an old-style instance and ``clazz`` an old-style
    class, this is plain ``isinstance``.  Otherwise both the instance's
    class and ``clazz`` are mapped through ``getcurrent``; the result is
    ``ISNT`` when the instance's current class is not a subclass,
    ``WAS`` when the instance's class already equals its current version,
    and ``IS`` after the instance's ``__class__`` has been switched to the
    current version.
    """
    if type(inst) != types.InstanceType or type(clazz)!=types.ClassType:
        return isinstance(inst,clazz)

    stored_class = inst.__class__
    current_class = getcurrent(stored_class)
    clazz = getcurrent(clazz)

    if not issubclass(current_class, clazz):
        return ISNT
    if stored_class == current_class:
        return WAS
    # Bring the instance up to date before reporting a match.
    inst.__class__ = current_class
    return IS
def __call__(self, request, node, model, viewName):
    """Get a name from my namespace.

    The view name ``"None"`` yields a ``DefaultWidget`` for the model.
    Otherwise the attribute called ``viewName`` is fetched from
    ``self.namespace`` and, only if it is a ``Widget`` subclass, is
    instantiated with the model.  In every other case ``None`` is returned.
    """
    from twisted.web.woven import widgets

    if viewName == "None":
        return widgets.DefaultWidget(model)

    vc = getattr(self.namespace, viewName, None)
    # Modules and other unrelated objects in the namespace must never be
    # called; only widget classes are.
    if not vc:
        return None
    if not isinstance(vc, (type, ClassType)):
        return None
    if not issubclass(vc, widgets.Widget):
        return None
    return vc(model)
def loadClass(self, klass):
    """
    Build a suite from the test methods of a test case class.

    One case is made per name returned by C{getTestCaseNames}, using
    C{methodPrefix + name} as the method name; the cases are passed through
    C{self.sort} and wrapped with C{self.suiteFactory}.

    @raise TypeError: If C{klass} is not a class.
    @raise ValueError: If C{klass} is not a test case.
    """
    if not isinstance(klass, (type, types.ClassType)):
        raise TypeError("%r is not a class" % (klass,))
    if not isTestCase(klass):
        raise ValueError("%r is not a test case" % (klass,))

    cases = []
    for name in self.getTestCaseNames(klass):
        cases.append(self._makeCase(klass, self.methodPrefix+name))
    return self.suiteFactory(self.sort(cases))
def instance(klass, d):
    """Create an instance of ``klass`` with ``d`` as its ``__dict__``.

    ``__init__`` is not called.  Old-style classes are built with
    ``new.instance``; new-style classes with ``object.__new__`` followed by
    assigning ``d`` as the instance dictionary.

    Raises ``TypeError`` when ``klass`` is not a class.
    """
    # ``types.ClassType`` only exists where old-style classes do, so look
    # it up defensively instead of assuming it is there.
    old_style_class = getattr(types, 'ClassType', None)
    if old_style_class is not None and isinstance(klass, old_style_class):
        return new.instance(klass, d)

    if isinstance(klass, type):
        o = object.__new__(klass)
        o.__dict__ = d
        return o

    # Call form of ``raise`` (the statement form ``raise E, msg`` is
    # Python-2-only syntax); the 1-tuple keeps the formatting correct even
    # when ``klass`` is itself a tuple.
    raise TypeError("%s is not a class" % (klass,))
def _maybeClass(classnamep):
    """Return ``qual(classnamep)`` for classes, the argument itself otherwise.

    Both old-style classes (``ClassType``) and, when the ``object`` builtin
    exists, new-style types count as classes.
    """
    try:
        object
    except NameError:
        # No ``object`` builtin: nothing can be a new-style class.
        is_new_style = 0
    else:
        is_new_style = isinstance(classnamep, type)

    is_class = isinstance(classnamep, ClassType) or is_new_style
    if not is_class:
        return classnamep
    return qual(classnamep)
def setUnjellyableForClassTree(module, baseClass, prefix=None):
    """
    Register every class in C{module} derived from C{baseClass} with
    C{setUnjellyableForClass}.

    Use this when one side has a hierarchy of Copyable (or Cacheable)
    classes and the other side mirrors it with Copied (or RemoteCache)
    classes.  Each class is registered under its attribute name in the
    module, prefixed with C{prefix} and a dot.  Only old-style classes
    (C{types.ClassType}) are considered.

    @param module: a module object from which to pull the Copied classes.
        (passing sys.modules[__name__] might be useful)

    @param baseClass: the base class from which all your Copied classes
        derive.

    @param prefix: the string prefixed to classnames to form the
        registered name.  Defaults to C{module.__name__}; pass the empty
        string to register under the bare class names.
    """
    if prefix is None:
        prefix = module.__name__
    if prefix:
        prefix = "%s." % prefix

    for attr_name in dir(module):
        candidate = getattr(module, attr_name)
        if type(candidate) != types.ClassType:
            continue
        if not issubclass(candidate, baseClass):
            continue
        setUnjellyableForClass('%s%s' % (prefix, attr_name), candidate)
def _unjelly_class(self, rest):
    """Resolve the dotted name in ``rest[0]`` to an old-style class.

    Raises ``InsecureJelly`` when the module part of the name is not
    allowed by ``self.taster``, when the name does not resolve to an
    old-style class, or when the class itself is not allowed.
    """
    dotted_name = rest[0]
    name_parts = dotted_name.split('.')
    modName = '.'.join(name_parts[:-1])
    if not self.taster.isModuleAllowed(modName):
        raise InsecureJelly("module %s not allowed" % modName)

    klaus = namedObject(rest[0])
    if type(klaus) is not types.ClassType:
        raise InsecureJelly("class %s unjellied to something that isn't a class: %s" % (repr(rest[0]), repr(klaus)))

    if self.taster.isClassAllowed(klaus):
        return klaus
    raise InsecureJelly("class not allowed: %s" % qual(klaus))