我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用builtins.type()。
def __init__(self, maxlen=132, indent=4, method='smart', prefix='&', suffix='&'): # Line length should be long enough that contintuation lines can host at # east one character apart of indentation and two continuation signs minmaxlen = indent + len(prefix) + len(suffix) + 1 if maxlen < minmaxlen: msg = 'Maximal line length less than {0} when using an indentation'\ ' of {1}'.format(minmaxlen, indent) raise FyppFatalError(msg) self._maxlen = maxlen self._indent = indent self._prefix = ' ' * self._indent + prefix self._suffix = suffix if method not in ['brute', 'smart', 'simple']: raise FyppFatalError('invalid folding type') if method == 'brute': self._inherit_indent = False self._fold_position_finder = self._get_maximal_fold_pos elif method == 'simple': self._inherit_indent = True self._fold_position_finder = self._get_maximal_fold_pos elif method == 'smart': self._inherit_indent = True self._fold_position_finder = self._get_smart_fold_pos
def _get_callable_argspec_py3(func): sig = inspect.signature(func) args = [] defaults = {} vararg = None for param in sig.parameters.values(): if param.kind == param.POSITIONAL_OR_KEYWORD: args.append(param.name) if param.default != param.empty: defaults[param.name] = param.default elif param.kind == param.VAR_POSITIONAL: vararg = param.name else: msg = "argument '{0}' has invalid argument type".format(param.name) raise FyppFatalError(msg) return args, defaults, vararg # Signature objects are available from Python 3.3 (and deprecated from 3.5)
def validate(self, val) : if self.code.value in DBUS.int_convert : val = DBUS.int_convert[self.code.value](val) elif self.code == TYPE.DOUBLE : if not isinstance(val, float) : raise TypeError("expecting a float, not %s: %s" % (type(val).__name__, repr(val))) #end if elif self.code == TYPE.UNIX_FD : val = DBUS.subtype_uint32(val) elif DBUS.basic_to_ctypes[self.code.value] == ct.c_char_p : if not isinstance(val, str) : raise TypeError("expecting a string, not %s: %s" % (type(val).__name__, repr(val))) #end if else : raise RuntimError("unknown basic type %s" % repr(self.code)) #end if return \ val #end validate #end BasicType
def bus_get_unix_user_async(self, name, error = None, timeout = DBUS.TIMEOUT_USE_DEFAULT) : message = Message.new_method_call \ ( destination = DBUS.SERVICE_DBUS, path = DBUS.PATH_DBUS, iface = DBUS.INTERFACE_DBUS, method = "GetConnectionUnixUser" ) message.append_objects("s", name) reply = await self.send_await_reply(message, timeout = timeout) if error != None and reply.type == DBUS.MESSAGE_TYPE_ERROR : reply.set_error(error) result = None else : result = reply.expect_return_objects("u")[0] #end if return \ result #end bus_get_unix_user_async
def bus_request_name_async(self, name, flags, error = None, timeout = DBUS.TIMEOUT_USE_DEFAULT) : "asks the D-Bus daemon to register the specified bus name on your behalf. flags is" \ " a combination of NAME_FLAG_xxx bits. Result will be a REQUEST_NAME_REPLY_xxx value" \ " or None on error." message = Message.new_method_call \ ( destination = DBUS.SERVICE_DBUS, path = DBUS.PATH_DBUS, iface = DBUS.INTERFACE_DBUS, method = "RequestName" ) message.append_objects("su", name, flags) reply = await self.send_await_reply(message, timeout = timeout) if error != None and reply.type == DBUS.MESSAGE_TYPE_ERROR : reply.set_error(error) result = None else : result = reply.expect_return_objects("u")[0] #end if return \ result #end bus_request_name_async
def bus_release_name_async(self, name, error = None, timeout = DBUS.TIMEOUT_USE_DEFAULT) : "asks the D-Bus daemon to release your registration of the specified bus name." message = Message.new_method_call \ ( destination = DBUS.SERVICE_DBUS, path = DBUS.PATH_DBUS, iface = DBUS.INTERFACE_DBUS, method = "ReleaseName" ) message.append_objects("s", name) reply = await self.send_await_reply(message, timeout = timeout) if error != None and reply.type == DBUS.MESSAGE_TYPE_ERROR : reply.set_error(error) result = None else : result = reply.expect_return_objects("u")[0] #end if return \ result #end bus_release_name_async
def bus_name_has_owner_async(self, name, error = None, timeout = DBUS.TIMEOUT_USE_DEFAULT) : "asks the D-Bus daemon if anybody has claimed the specified bus name." message = Message.new_method_call \ ( destination = DBUS.SERVICE_DBUS, path = DBUS.PATH_DBUS, iface = DBUS.INTERFACE_DBUS, method = "NameHasOwner" ) message.append_objects("s", name) reply = await self.send_await_reply(message, timeout = timeout) if error != None and reply.type == DBUS.MESSAGE_TYPE_ERROR : reply.set_error(error) result = None else : result = reply.expect_return_objects("b")[0] #end if return \ result #end bus_name_has_owner_async
def bus_start_service_by_name_async(self, name, flags = 0, error = None, timeout = DBUS.TIMEOUT_USE_DEFAULT) : message = Message.new_method_call \ ( destination = DBUS.SERVICE_DBUS, path = DBUS.PATH_DBUS, iface = DBUS.INTERFACE_DBUS, method = "StartServiceByName" ) message.append_objects("su", name, flags) reply = await self.send_await_reply(message, timeout = timeout) if error != None and reply.type == DBUS.MESSAGE_TYPE_ERROR : reply.set_error(error) result = None else : result = reply.expect_return_objects("u")[0] #end if return \ result #end bus_start_service_by_name
def bus_remove_match_async(self, rule, error = None, timeout = DBUS.TIMEOUT_USE_DEFAULT) : "removes a previously-added match rule for messages you previously wanted" \ " to receive." message = Message.new_method_call \ ( destination = DBUS.SERVICE_DBUS, path = DBUS.PATH_DBUS, iface = DBUS.INTERFACE_DBUS, method = "RemoveMatch" ) message.append_objects("s", rule) reply = await self.send_await_reply(message, timeout = timeout) if error != None and reply.type == DBUS.MESSAGE_TYPE_ERROR : reply.set_error(error) else : reply.expect_return_objects("") #end if #end bus_remove_match_async
def basic(self) : "returns the argument value, assuming it is of a non-container type." argtype = self.arg_type c_result_type = DBUS.basic_to_ctypes[argtype] c_result = c_result_type() dbus.dbus_message_iter_get_basic(self._dbobj, ct.byref(c_result)) if c_result_type == ct.c_char_p : result = c_result.value.decode() else : result = c_result.value #end if if argtype in DBUS.basic_subclasses : result = DBUS.basic_subclasses[argtype](result) #end if return \ result #end basic
def fixed_array(self) : "returns the array elements, assuming the current argument is an array" \ " with a non-container element type." c_element_type = DBUS.basic_to_ctypes[self.element_type] c_result = ct.POINTER(c_element_type)() c_nr_elts = ct.c_int() dbus.dbus_message_iter_get_fixed_array(self._dbobj, ct.byref(c_result), ct.byref(c_nr_elts)) result = [] for i in range(c_nr_elts.value) : elt = c_result[i] if c_element_type == ct.c_char_p : elt = elt.value.decode() else : elt = elt.value #end if result.append(elt) #end for return \ result #end fixed_array #end ExtractIter
def append_basic(self, type, value) : "appends a single value of a non-container type." if type in DBUS.int_convert : value = DBUS.int_convert[type](value) #end if c_type = DBUS.basic_to_ctypes[type] if c_type == ct.c_char_p : if not isinstance(value, str) : raise TypeError \ ( "expecting type %s, got %s" % (TYPE(type), builtins.type(value).__name__) ) #end if value = value.encode() #end if c_value = c_type(value) if not dbus.dbus_message_iter_append_basic(self._dbobj, type, ct.byref(c_value)) : raise CallFailed("dbus_message_iter_append_basic") #end if return \ self #end append_basic
def append_fixed_array(self, element_type, values) : "appends an array of elements of a non-container type." c_elt_type = DBUS.basic_to_ctypes[element_type] nr_elts = len(values) c_arr = (nr_elts * c_elt_type)() for i in range(nr_elts) : if c_elt_type == ct.c_char_p : c_arr[i] = values[i].encode() else : c_arr[i] = values[i] #end if #end for c_arr_ptr = ct.pointer(c_arr) if not dbus.dbus_message_iter_append_fixed_array(self._dbobj, element_type, ct.byref(c_arr_ptr), nr_elts) : raise CallFailed("dbus_message_iter_append_fixed_array") #end if return \ self #end append_fixed_array
def open_container(self, type, contained_signature) : "starts appending an argument of a container type, returning a sub-iterator" \ " for appending the contents of the argument. Can be called recursively for" \ " containers of containers etc." if contained_signature != None : c_sig = contained_signature.encode() else : c_sig = None #end if subiter = builtins.type(self)(self) if not dbus.dbus_message_iter_open_container(self._dbobj, type, c_sig, subiter._dbobj) : raise CallFailed("dbus_message_iter_open_container") #end if return \ subiter #end open_container
def add_conversion(self, predicate, conversion): """ Add a validation on values assigned to the field predicate: Only perform the conversion if this returns True. conversion: A conversion function. The returned value must be of the type of the field. Calling this method will modify the preprocess function. """ if isinstance(predicate, type): typ = predicate def predicate(obj): return isinstance(obj, typ) orig_preprocess = self.preprocess def new_preprocess(obj): if predicate(obj): obj = conversion(obj) assert isinstance(obj, self.type), 'Conversion %s converted to %s - expected %s' % ( conversion, type(obj), self.type) return orig_preprocess(obj) self.preprocess = new_preprocess
def __init__(self, **kwargs): for field in self._fields: if field.collection_type is not None: value = field.collection_type(self, field) try: raw_value = kwargs.pop(field.name) except KeyError: if field.default is not MANDATORY: value._set(deepcopy(field.default)) else: value._set(raw_value) else: try: value = kwargs.pop(field.name) except KeyError: if field.default is MANDATORY: raise MissingField(typed_struct=type(self), field=field) from None value = deepcopy(field.default) field.__set__(self, value) if kwargs: raise NotFields(typed_struct=type(self), field_names=', '.join(kwargs.keys()))
def empty_here(self, path): print('EMPTY here:', path); RETURNED_MESSAGE = ''' <html> <head/> <body> <h3>No file nor folder here.Here 's empty.Click [new-folder] to creat a folder here:<br/>%s</h3> </body> </html> ''' % (self.path); ENC_RETURNED_MESSAGE = RETURNED_MESSAGE.encode(DEFAULT_ENC, 'surrogateescape') f = io.BytesIO(); f.write(ENC_RETURNED_MESSAGE); f.seek(0); self.send_response(200); self.send_header("Content-type", "text/html; charset=%s" % DEFAULT_ENC); self.send_header("Content-Length", str(len(ENC_RETURNED_MESSAGE))); self.end_headers(); return f;
def is_basic(self) : "does this code represent a basic (non-container) type." return \ self.value in DBUS.basic_to_ctypes #end is_basic #end TYPE
def __repr__(self) : return \ "%s(sig = %s)" % (type(self).__name__, repr(self.signature)) #end __repr__ #end Type
def __repr__(self) : return \ "%s(%s)" % (type(self).__name__, repr(self.code)) #end __repr__
def __repr__(self) : return \ "%s()" % type(self).__name__ #end __repr__
def __init__(self, *types) : if len(types) == 0 : raise TypeError("must have at least one element type") #end if if not all(isinstance(t, Type) for t in types) : raise TypeError("struct elements must be Types") #end if self.elttypes = tuple(types) #end __init__
def __repr__(self) : return \ "%s(%s)" % (type(self).__name__, repr(self.elttypes)) #end __repr__
def validate(self, val) : if not isinstance(val, (tuple, list)) or len(val) != len(self.elttypes) : raise TypeError \ ( "need a list or tuple of %d elements, not %s" % (len(self.elttypes), repr(val)) ) #end if return \ type(val)(elttype.validate(elt) for elttype, elt in zip(self.elttypes, val)) #end validate #end StructType
def __init__(self, elttype) : if not isinstance(elttype, Type) : raise TypeError("invalid array element type") #end if self.elttype = elttype #end __init__
def __repr__(self) : return \ "%s[%s]" % (type(self).__name__, repr(self.elttype)) #end __repr__
def __init__(self, keytype, valuetype) : if not isinstance(keytype, BasicType) or not isinstance(valuetype, Type) : raise TypeError("invalid dict key/value type") #end if self.keytype = keytype self.valuetype = valuetype #end keytype
def __repr__(self) : return \ "%s[%s : %s]" % (type(self).__name__, repr(self.keytype), repr(self.valuetype)) #end __repr__
def validate(self, val) : if not isinstance(val, dict) : raise TypeError("need a dict, not %s: %s" % (type(val).__name__, repr(val))) #end if return \ type(val) \ ( (self.keytype.validate(key), self.valuetype.validate(val[key])) for key in val ) #end validate #end DictType
def bus_get(celf, type, private, error = None) : "returns a Connection to one of the predefined D-Bus buses; type is a BUS_xxx value." error, my_error = _get_error(error) result = (dbus.dbus_bus_get, dbus.dbus_bus_get_private)[private](type, error._dbobj) my_error.raise_if_set() if result != None : result = celf(result) #end if return \ result #end bus_get
def new(celf, type) : "type is one of the DBUS.MESSAGE_TYPE_xxx codes. Using one of the type-specific" \ " calls--new_error, new_method_call, new_method_return, new_signal--is probably" \ " more convenient." result = dbus.dbus_message_new(type) if result == None : raise CallFailed("dbus_message_new") #end if return \ celf(result) #end new
def new_error(self, name, message) : "creates a new DBUS.MESSAGE_TYPE_ERROR message that is a reply to this Message." result = dbus.dbus_message_new_error(self._dbobj, name.encode(), (lambda : None, lambda : message.encode())[message != None]()) if result == None : raise CallFailed("dbus_message_new_error") #end if return \ type(self)(result) #end new_error # probably not much point trying to use new_error_printf
def new_method_return(self) : "creates a new DBUS.MESSAGE_TYPE_METHOD_RETURN that is a reply to this Message." result = dbus.dbus_message_new_method_return(self._dbobj) if result == None : raise CallFailed("dbus_message_new_method_return") #end if return \ type(self)(result) #end new_method_return
def copy(self) : "creates a copy of this Message." result = dbus.dbus_message_copy(self._dbobj) if result == None : raise CallFailed("dbus_message_copy") #end if return \ type(self)(result) #end copy
def arg_type(self) : "the type code for this argument." return \ dbus.dbus_message_iter_get_arg_type(self._dbobj) #end arg_type
def element_type(self) : "the contained element type of this argument, assuming it is of a container type." return \ dbus.dbus_message_iter_get_element_type(self._dbobj) #end element_type
def recurse(self) : "creates a sub-iterator for recursing into a container argument." subiter = type(self)(self) dbus.dbus_message_iter_recurse(self._dbobj, subiter._dbobj) return \ subiter #end recurse
def object(self) : "returns the current iterator item as a Python object. Will recursively" \ " process container objects." argtype = self.arg_type if argtype in DBUS.basic_to_ctypes : result = self.basic elif argtype == DBUS.TYPE_ARRAY : if self.element_type == DBUS.TYPE_DICT_ENTRY : result = {} subiter = self.recurse() while True : entry = next(subiter, None) if entry == None or entry.arg_type == DBUS.TYPE_INVALID : # TYPE_INVALID can be returned for an empty dict break assert entry.arg_type == DBUS.TYPE_DICT_ENTRY, "invalid dict entry type %d" % entry.arg_type key, value = tuple(x.object for x in entry.recurse()) result[key] = value #end while else : result = list(x.object for x in self.recurse()) if len(result) != 0 and result[-1] == None : # fudge for iterating into an empty array result = result[:-1] #end if #end if elif argtype == DBUS.TYPE_STRUCT : result = list(x.object for x in self.recurse()) elif argtype == DBUS.TYPE_VARIANT : subiter = self.recurse() subiter = next(subiter) result = (DBUS.Signature(subiter.signature), subiter.object) elif argtype == DBUS.TYPE_INVALID : # fudge for iterating into an empty array result = None else : raise RuntimeError("unrecognized argtype %d" % argtype) #end if return \ result #end object
def expect_return_objects(self, signature) : "expects the Message to be of type DBUS.MESSAGE_TYPE_METHOD_RETURN and its" \ " arguments to conform to the given signature. Raises the appropriate DBusError" \ " if the Message is of type DBUS.MESSAGE_TYPE_ERROR." if self.type == DBUS.MESSAGE_TYPE_METHOD_RETURN : result = self.expect_objects(signature) elif self.type == DBUS.MESSAGE_TYPE_ERROR : raise DBusError(self.error_name, self.expect_objects("s")[0]) else : raise ValueError("unexpected message type %d" % self.type) #end if return \ result #end expect_return_objects
def __getitem__(self, index) : if not isinstance(index, int) or index < 0 or index >= self._nrelts : raise IndexError("AddressEntries[%d] out of range" % index) #end if return \ type(self).Entry(self, index) #end __getitem__ #end AddressEntries
def recurse(self) : subiter = type(self)() dbus.dbus_signature_iter_recurse(self._dbobj, subiter._dbobj) return \ subiter #end recurse
def parse_single_signature(signature) : result = parse_signature(signature) if len(result) != 1 : raise ValueError("only single type expected") #end if return \ result[0] #end parse_single_signature
def signature_validate_single(signature, error = None) : "is signature a single valid type." error, my_error = _get_error(error) result = dbus.dbus_signature_validate_single(signature.encode(), error._dbobj) != 0 my_error.raise_if_set() return \ result #end signature_validate_single
def __init__(self, *, name = None, type, direction, annotations = ()) : if not isinstance(direction, Introspection.DIRECTION) : raise TypeError("direction must be an Introspection.DIRECTION.xxx enum") #end if self.name = name self.type = parse_single_signature(type) self.direction = direction self.annotations = Introspection._get_annotations(annotations) #end __init__ #end Arg
def in_signature(self) : return \ list(a.type for a in self.args if a.direction == Introspection.DIRECTION.IN) #end in_signature
def out_signature(self) : return \ list \ (a.type for a in self.args if a.direction == Introspection.DIRECTION.OUT) #end out_signature
def __init__(self, *, name = None, type, direction = None, annotations = ()) : if direction != None and direction != Introspection.DIRECTION.OUT : raise ValueError("direction can only be Introspection.DIRECTION.OUT") #end if self.name = name self.type = parse_single_signature(type) self.direction = direction self.annotations = Introspection._get_annotations(annotations) #end __init__ #end Arg
def in_signature(self) : return \ list(a.type for a in self.args) #end in_signature #end Signal
def _validate_type(self, value): if not isinstance(value, self.type): raise FieldTypeMismatch(field=self, expected_type=self.type, received_type=type(value)) from None return value
def _process_new_key(self, key): if not isinstance(key, self._field.key_type): raise FieldKeyTypeMismatch(field=self._field, expected_type=self._field.key_type, received_type=type(key)) return key