Python functools 模块，wraps() 实例源码
我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 functools.wraps()。
def auth(func):
    """Protect a view with the configured access rules and HTTP Basic auth.

    The wrapped view is called when any of the following holds:

    * the configuration returned by ``get_auth()`` has ``disable`` set to
      ``True``;
    * the configuration lists ``authorized_ips`` and ``is_authorized``
      accepts the client IP;
    * the request carries a ``Basic`` ``Authorization`` header whose
      credentials equal ``HEARTBEAT['auth']``.

    Otherwise a 401 response carrying a ``WWW-Authenticate`` challenge is
    returned.  A malformed ``Authorization`` header counts as a failed
    login instead of raising.
    """
    @wraps(func)
    def _decorator(request, *args, **kwargs):
        config = get_auth()
        if config.get('disable', False) is True:
            return func(request, *args, **kwargs)
        if 'authorized_ips' in config:
            ip = get_client_ip(request)
            if is_authorized(ip, config['authorized_ips']):
                return func(request, *args, **kwargs)
        prepare_credentials(config)
        header = request.META.get('HTTP_AUTHORIZATION')
        if header:
            # Split on the first space only, so a header without a credential
            # part cannot fail while unpacking.
            authmeth, _, encoded = header.partition(' ')
            if authmeth.lower() == 'basic':
                try:
                    decoded = base64.b64decode(encoded).decode('utf-8')
                except (TypeError, ValueError):
                    # Undecodable credentials are simply a failed login
                    # (binascii.Error and UnicodeDecodeError are ValueErrors).
                    decoded = ''
                # Only the first colon separates user and password; the
                # password itself may contain further colons.
                username, separator, password = decoded.partition(':')
                if (separator and
                        username == HEARTBEAT['auth']['username'] and
                        password == HEARTBEAT['auth']['password']):
                    return func(request, *args, **kwargs)
        response = HttpResponse("Authentication failed", status=401)
        response['WWW-Authenticate'] = 'Basic realm="Welcome to 1337"'
        return response
    return _decorator
def meta_compile(compile):
    """Wrap a ``compile()`` method with offset bookkeeping, see `meta_shellcode`.

    The returned wrapper creates a ``CompilerState`` when no state is given,
    copies ``state.offset`` to ``self.offset``, produces the bytes through
    ``self._compile_hook(compile, state)`` when the instance has that
    attribute (``compile(self, state)`` otherwise), reports their length via
    ``state.next_piece`` and returns them.
    """
    @functools.wraps(compile)
    def compile_wrapper(self, state=None):
        state = CompilerState() if state is None else state
        self.offset = state.offset
        if not hasattr(self, '_compile_hook'):
            output = compile(self, state)
        else:
            output = self._compile_hook(compile, state)
        state.next_piece(len(output))
        return output
    return compile_wrapper
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    """Return a decorator that aborts the decorated call after ``seconds``.

    A SIGALRM handler raising ``TimeoutError(error_message)`` is installed
    for the duration of the call and ``signal.alarm(seconds)`` is armed.
    When the call ends (normally or not) the alarm is cancelled and the
    SIGALRM handler that was active before the call is put back, so the
    decorator does not leak its handler into unrelated code.
    """
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        @wraps(func)
        def wrapper(*args, **kwargs):
            previous_handler = signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                signal.alarm(0)
                # signal.signal() reports None for a handler that was not
                # installed from Python; that value cannot be re-installed.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)
        return wrapper
    return decorator
def locked(path, timeout=None):
    """Build a decorator that runs the decorated function under a file lock.

    Arguments:
      - path: path for lockfile.
      - timeout (optional): Timeout for acquiring lock.

    Usage:
        @locked('/var/run/myname', timeout=0)
        def myname(...):
            ...
    """
    def decor(func):
        def wrapper(*args, **kwargs):
            # A new FileLock is created for every call and always released,
            # whether the call returns or raises.
            file_lock = FileLock(path, timeout=timeout)
            file_lock.acquire()
            try:
                result = func(*args, **kwargs)
            finally:
                file_lock.release()
            return result
        return functools.wraps(func)(wrapper)
    return decor
def set_package(fxn):
    """Set __package__ on the module returned by ``fxn``.

    This function is deprecated; every call of the wrapper emits a
    DeprecationWarning.  ``__package__`` is only filled in when it is
    missing or None: it becomes the module name for objects that have a
    ``__path__`` and the name up to the last dot otherwise.
    """
    @functools.wraps(fxn)
    def set_package_wrapper(*args, **kwargs):
        warnings.warn('The import system now takes care of this automatically.',
                      DeprecationWarning, stacklevel=2)
        module = fxn(*args, **kwargs)
        if getattr(module, '__package__', None) is not None:
            return module
        package = module.__name__
        if not hasattr(module, '__path__'):
            # Not a package: the parent package is everything before the
            # last dot of the module name.
            package = package.rpartition('.')[0]
        module.__package__ = package
        return module
    return set_package_wrapper
def __getattr__(self, name):
    """Delegate an attribute lookup to the underlying file object.

    Callable attributes are wrapped in a function that also holds a
    reference to ``self._closer``.  Every result that is not an ``int`` is
    stored on ``self`` (i.e. methods are cached, closed and friends are
    not).
    """
    target = getattr(self.__dict__['file'], name)
    if hasattr(target, '__call__'):
        func = target

        def func_wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        func_wrapper = _functools.wraps(func)(func_wrapper)
        # Avoid closing the file as long as the wrapper is alive,
        # see issue #18879.
        func_wrapper._closer = self._closer
        target = func_wrapper
    if isinstance(target, int):
        return target
    setattr(self, name, target)
    return target
# The underlying __enter__ method returns the wrong object
# (self.file) so override it to return the wrapper
def login_required(func):
    """Require a valid ``token`` form field before calling ``func``.

    The token is read from ``request.form`` and posted to ``/authtoken/``
    through ``post_to_user``.  Outcomes:

    * no token: a JSON string with ``success`` set to ``'false'`` is
      returned;
    * the lookup result does not report ``success == 'true'``: that result
      is returned unchanged;
    * otherwise ``func(username, beans, request.form, *args, **kwargs)`` is
      called with the ``username`` and ``beans`` entries of the result.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        logger.info("get request, path: %s", request.path)
        token = request.form.get("token", None)
        if token is None:
            logger.info("get request without token, path: %s", request.path)
            return json.dumps({'success': 'false', 'message': 'user or key is null'})
        result = post_to_user("/authtoken/", {'token': token})
        if result.get('success') != 'true':
            return result
        username = result.get('username')
        beans = result.get('beans')
        return func(username, beans, request.form, *args, **kwargs)
    return wrapper
def test_wraps(self):
    """Run a small ``functools.wraps`` decorator program through ``assert_ok``."""
    # The embedded program starts at column 0 and only nested blocks are
    # indented, so it is valid Python whether or not assert_ok dedents it.
    self.assert_ok("""\
from functools import wraps
def my_decorator(f):
    dec = wraps(f)
    def wrapper(*args, **kwds):
        print('Calling decorated function')
        return f(*args, **kwds)
    wrapper = dec(wrapper)
    return wrapper
@my_decorator
def example():
    '''Docstring'''
    return 17
assert example() == 17
""")
def locked(path, timeout=None):
    """Return a decorator that serialises calls through a lock file.

    Arguments:
      - path: path for lockfile.
      - timeout (optional): Timeout for acquiring lock.

    Usage:
        @locked('/var/run/myname', timeout=0)
        def myname(...):
            ...
    """
    def decor(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            guard = FileLock(path, timeout=timeout)
            guard.acquire()
            try:
                outcome = func(*args, **kwargs)
            finally:
                # Released on success and on error alike.
                guard.release()
            return outcome
        return wrapper
    return decor
def ensure_empty_collections(*collections):
    """
    Assert that the listed collections are empty before the function runs.

    For every name in ``collections`` the wrapper lists ``db[name].find()``
    on the connection from ``api.common.get_conn()`` and raises an
    AssertionError when anything is found.  The decorated function is then
    called and its result returned; nothing is cleared.
    """
    def clear(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            db = api.common.get_conn()
            for name in collections:
                documents = list(db[name].find())
                assert len(documents) == 0, "Collection was not empty: " + name
            return f(*args, **kwargs)
        return wrapper
    return clear
def clear_collections(*collections):
    """
    Clear the listed collections after the function has completed.

    The clean-up runs in a ``finally`` clause, so it also happens when the
    decorated function raises.  After calling ``remove()`` on every listed
    collection, each one is read back and an AssertionError is raised if
    any document is still found.
    """
    def clear(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            db = api.common.get_conn()
            try:
                return f(*args, **kwargs)
            finally:
                # Clear the collections.
                for name in collections:
                    db[name].remove()
                # Ensure they are then empty.
                for name in collections:
                    leftovers = list(db[name].find())
                    assert len(leftovers) == 0, "Collection: {} was not able to be cleared.".format(name)
        return wrapper
    return clear
def block_before_competition(return_result):
    """
    Wraps a routing function that should be blocked before the start time
    of the competition.

    Until the timestamp of ``datetime.utcnow()`` exceeds the timestamp of
    the ``start_time`` setting, calls return ``return_result`` instead of
    invoking the wrapped function.
    """
    def decorator(f):
        """
        Inner decorator
        """
        @wraps(f)
        def wrapper(*args, **kwds):
            now = datetime.utcnow().timestamp()
            start = api.config.get_settings()["start_time"].timestamp()
            if now > start:
                return f(*args, **kwds)
            return return_result
        return wrapper
    return decorator
def _silent_connection_failure(func):
    """Decorator used to avoid raising an exception when the database
    times out.

    Parameters:
        func: Function to decorate.

    When ``FLAGS.disable_mongodb_exception`` is set, a
    ``pymongo.errors.ServerSelectionTimeoutError`` raised by ``func`` is
    logged and ``None`` is returned; without the flag the call is passed
    through untouched.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        """Wraps the function to catch timeout exception.
        """
        if FLAGS.disable_mongodb_exception:
            try:
                return func(*args, **kwargs)
            except pymongo.errors.ServerSelectionTimeoutError as e:
                logging.error("Unable to reach the caching server: %s", e)
                return None
        return func(*args, **kwargs)
    return wrapper
def array_stream(func):
    """
    Decorate a streaming function so that its first argument is always a
    stream of ndarrays.

    A single ndarray is repackaged into a sequence of length 1.  Every item
    of the stream is passed through ``atleast_1d`` lazily (via ``map``), so
    objects that are not arrays are converted as they are consumed.

    The first argument of the decorated function is assumed to be an
    iterable of arrays, or an iterable of objects that can be cast to
    arrays.
    """
    @wraps(func)
    def decorated(arrays, *args, **kwargs):
        stream = (arrays,) if isinstance(arrays, ndarray) else arrays
        return func(map(atleast_1d, stream), *args, **kwargs)
    return decorated
def call(f):
    """
    Build a processor that replaces an argument with ``f(argument)``.

    Useful for creating simple arguments to the `@preprocess` decorator.

    Parameters
    ----------
    f : function
        Function accepting a single argument and returning a replacement.

    Usage
    -----
    >>> @preprocess(x=call(lambda x: x + 1))
    ... def foo(x):
    ...     return x
    ...
    >>> foo(1)
    2
    """
    def processor(func, argname, arg):
        # Only the argument value matters; the function and the argument
        # name handed to the processor are ignored.
        replacement = f(arg)
        return replacement
    return wraps(f)(processor)
def require_initialized(exception):
    """
    Decorator for API methods that should only be called after
    TradingAlgorithm.initialize.  `exception` is raised whenever the method
    is called while ``self.initialized`` is falsy.

    Usage
    -----
    @require_initialized(SomeException("Don't do that!"))
    def method(self):
        # Do stuff that should only be allowed after initialize.
    """
    def decorator(method):
        @wraps(method)
        def wrapped_method(self, *args, **kwargs):
            if self.initialized:
                return method(self, *args, **kwargs)
            raise exception
        return wrapped_method
    return decorator
def coerce_numbers_to_my_dtype(f):
    """
    A decorator for methods whose signature is f(self, other) that coerces
    ``other`` to ``self.dtype`` when ``other`` is a ``Number``.

    This is used to make comparison operations between numbers and `Factor`
    instances work independently of whether the user supplies a float or
    integer literal.

    For example, if I write::

        my_filter = my_factor > 3

    my_factor probably has dtype float64, but 3 is an int, so we want to
    coerce to float64 before doing the comparison.
    """
    @wraps(f)
    def method(self, other):
        # Non-numeric operands are handed to ``f`` untouched.
        operand = coerce_to_dtype(self.dtype, other) if isinstance(other, Number) else other
        return f(self, operand)
    return method
def with_algo(f):
    """Turn a ``test_<name>`` function into a test that runs an algorithm.

    ``f`` must be named ``test_...``; otherwise ValueError is raised.  The
    returned test looks up ``(sim_params, source)`` for the requested data
    frequency in ``self.sim_and_source``, builds a ``TradingAlgorithm``
    whose ``initialize`` comes from ``initialize_with(self, <name>, days)``
    and whose ``handle_data`` wraps ``f``, and runs it on the source.
    """
    prefix = 'test_'
    name = f.__name__
    if not name.startswith(prefix):
        raise ValueError('This must decorate a test case')
    tfm_name = name[len(prefix):]

    def wrapper(self, data_frequency, days=None):
        sim_params, source = self.sim_and_source[data_frequency]
        TradingAlgorithm(
            initialize=initialize_with(self, tfm_name, days),
            handle_data=handle_data_wrapper(f),
            sim_params=sim_params,
            env=self.env,
        ).run(source)
    return wraps(f)(wrapper)
def authorized(admin_only=False):
    """Return a decorator restricting a request handler to logged-in users.

    Arguments:
      - admin_only (optional): also require ``self.is_admin``.

    Behaviour of the decorated handler:

    * the response is marked non-public through ``self.set_cache``;
    * without ``self.current_user_id`` a GET request is redirected to the
      login URL (302 for HTTP/1.0 clients, 303 otherwise) and any other
      method raises ``HTTPError(403)``;
    * with ``admin_only`` a non-admin user gets ``HTTPError(403)``;
    * otherwise the handler is called and its result is returned, so a
      value produced by the handler is not lost.
    """
    def wrap(user_handler):
        @wraps(user_handler)
        def authorized_handler(self, *args, **kwargs):
            self.set_cache(is_public=False)
            request = self.request
            if not self.current_user_id:
                if request.method != 'GET':
                    raise HTTPError(403)
                next_url = self.get_argument('next', '/')
                status = 302 if request.version == 'HTTP/1.0' else 303
                # NOTE(review): next_url is appended without URL-escaping,
                # exactly as before; confirm whether callers rely on that.
                self.redirect(self.get_login_url() + "?next=" + next_url, status=status)
                return None
            if admin_only and not self.is_admin:
                raise HTTPError(403)
            return user_handler(self, *args, **kwargs)
        return authorized_handler
    return wrap
def memoized(func):
    """Decorator that caches a function's return value each time it is called.

    If called later with the same arguments, the cached value is returned,
    and the function is not re-evaluated.  The cache key is the pickle of
    the positional arguments and the sorted keyword arguments, so the
    arguments must be picklable but need not be hashable, and the order in
    which keyword arguments are written does not matter.

    Based upon http://wiki.python.org/moin/PythonDecoratorLibrary#Memoize

    Nota bene: this decorator memoizes /all/ calls to the function.
    For a memoization decorator with limited cache size, consider:
    http://code.activestate.com/recipes/496879-memoize-decorator-function-with-cache-size-limit/
    """
    try:
        import cPickle as pickle  # Python 2
    except ImportError:
        import pickle  # Python 3 has no separate cPickle module
    cache = {}

    @wraps(func)
    def func_wrapper(*args, **kwargs):
        # Keyword names are unique, so sorting the items never has to
        # compare two argument values.
        key = pickle.dumps((args, sorted(kwargs.items())))
        if key not in cache:
            cache[key] = func(*args, **kwargs)
        return cache[key]
    return func_wrapper
def calculate_mantl_vars(func):
    """calculate Mantl vars

    ``func`` must return ``(name, attrs, groups)``.  ``attrs`` gains a
    boolean ``consul_is_server`` (True when ``role`` is ``'control'``) and
    ``'publicly_routable'`` is appended to ``groups`` when the attribute of
    that name is truthy.  Both containers are modified in place.
    """
    @wraps(func)
    def inner(*args, **kwargs):
        name, attrs, groups = func(*args, **kwargs)

        # attrs
        attrs['consul_is_server'] = attrs.get('role', '') == 'control'

        # groups
        if attrs.get('publicly_routable', False):
            groups.append('publicly_routable')

        return name, attrs, groups
    return inner
def _check_inputs_and_outputs(fcn):
    """Validate and normalise inputs/outputs before calling ``fcn``.

    ``inputs`` and ``outputs`` must be lists, tuples or sets that do not
    contain the empty string.  ``fcn`` is called as
    ``fcn(conn, locks, graph, str(identifier), *a, **kw)`` where ``locks``
    is the stringified inputs and outputs separated by ``''``.  ``graph``
    is ``['', '']`` unless a truthy ``history`` keyword was passed and at
    least one rewritten name does not start with ``'test.'``; the
    ``history`` keyword is consumed and never forwarded.
    """
    @functools.wraps(fcn)
    def call(conn, inputs, outputs, identifier, *a, **kw):
        assert isinstance(inputs, (list, tuple, set)), inputs
        assert isinstance(outputs, (list, tuple, set)), outputs
        assert '' not in inputs, inputs
        assert '' not in outputs, outputs
        # this is for actually locking inputs/outputs
        inputs = [str(item) for item in inputs]
        outputs = [str(item) for item in outputs]
        locks = inputs + [''] + outputs
        graph = ['', '']
        if kw.pop('history', None):
            igraph = [EDGE_RE.sub('*', inp) for inp in inputs]
            ograph = [EDGE_RE.sub('*', out) for out in outputs]
            graph_id = EDGE_RE.sub('*', str(identifier))
            if not all(x.startswith('test.') for x in igraph + ograph):
                graph = igraph + [''] + ograph + ['', graph_id]
        return fcn(conn, locks, graph, str(identifier), *a, **kw)
    return call
def immutable(cls):
    """Class decorator that stops new attributes appearing after ``__init__``.

    Once the original ``__init__`` has returned, the instance is flagged as
    frozen.  Assigning to an attribute that already exists still works;
    assigning to a new name is not performed and a message is printed
    instead (no exception is raised).
    """
    def frozensetattr(self, key, value):
        if not self.__frozen or hasattr(self, key):
            object.__setattr__(self, key, value)
            return
        print("Class {} is frozen. Cannot set {} = {}"
              .format(cls.__name__, key, value))

    def init_decorator(func):
        def wrapper(self, *args, **kwargs):
            func(self, *args, **kwargs)
            self.__frozen = True
        return wraps(func)(wrapper)

    cls.__frozen = False
    cls.__setattr__ = frozensetattr
    cls.__init__ = init_decorator(cls.__init__)
    return cls
def set_self_blocking(function):
    """Run a method of a socket-like object in blocking mode.

    The first positional argument is taken as the socket.  When its
    ``gettimeout()`` is 0 it is switched to blocking mode for the duration
    of the call and switched back afterwards, even if the call raises.
    An error raised by ``gettimeout()`` itself propagates unchanged.
    """
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        self = args[0]
        # Query the timeout before entering the try block: if this raises
        # there is nothing to restore, and the finally clause must not run
        # against an unset flag.
        was_non_blocking = self.gettimeout() == 0
        try:
            if was_non_blocking:
                self.setblocking(True)
            return function(*args, **kwargs)
        finally:
            # set origin blocking
            if was_non_blocking:
                self.setblocking(False)
    return wrapper
def adapt_rgb(apply_to_rgb):
    """Return decorator that adapts a gray-scale filter to RGB images.

    This function is only intended to be used for functions that don't
    accept volumes as input, since checking an image's shape is fragile.

    Parameters
    ----------
    apply_to_rgb : function
        Function that returns a filtered image from an image-filter and RGB
        image. This will only be called if the image is RGB-like.
    """
    def decorator(image_filter):
        @functools.wraps(image_filter)
        def image_filter_adapted(image, *args, **kwargs):
            if not is_rgb_like(image):
                return image_filter(image, *args, **kwargs)
            return apply_to_rgb(image_filter, image, *args, **kwargs)
        return image_filter_adapted
    return decorator
def patch(obj, func_name, before_func, after_func):
    """Replace ``obj.<func_name>`` with a wrapper that calls hooks around it.

    Arguments:
      - obj: object holding the function.
      - func_name: attribute name of the function to wrap.
      - before_func: called with the call's arguments before the target
        runs; skipped when falsy.
      - after_func: called with the target's return value; skipped when
        falsy.

    Nothing happens when the attribute is missing or already patched.  The
    wrapper records the original function as ``__stackimpact_orig__``,
    which is the marker the already-patched check looks for, and also as
    ``__orig__`` as before.
    """
    if not hasattr(obj, func_name):
        return

    target_func = getattr(obj, func_name)

    # already patched
    if hasattr(target_func, '__stackimpact_orig__'):
        return

    @wraps(target_func)
    def wrapper(*args, **kwds):
        if before_func:
            before_func(*args, **kwds)
        ret = target_func(*args, **kwds)
        if after_func:
            after_func(ret)
        return ret

    # The marker must use the name tested above, otherwise a second patch()
    # call would wrap the wrapper again.
    wrapper.__stackimpact_orig__ = target_func
    wrapper.__orig__ = target_func
    setattr(obj, func_name, wrapper)
def adapted(func):
    """Convert keyword arguments and the result using ``func``'s annotations.

    The wrapper accepts keyword arguments only.  Every argument whose name
    has an annotation is passed through that annotation (called as a
    converter) before ``func`` runs; the result is passed through the
    ``return`` annotation when there is one.  Values without an annotation
    are left as they are.
    """
    @wraps(func)
    def wrapper(**kwargs):
        hints = func.__annotations__
        converted = {}
        for name, value in kwargs.items():
            convert = hints.get(name)
            converted[name] = value if convert is None else convert(value)
        result = func(**converted)
        convert = hints.get('return')
        return result if convert is None else convert(result)
    return wrapper
def admin_access():
    """Return a decorator that restricts a handler method to admin users.

    The handler's owner is expected to expose ``admins`` (list of user
    ids).  When that attribute is missing or None a warning is logged and
    the handler runs for everybody.  Users whose id is not listed are
    rejected: the attempt is logged and the wrapper returns None without
    calling the handler.
    """
    def access(func):
        @wraps(func)
        def wrapped(self, bot: Bot, update: Update, *args, **kwargs):
            user = update.effective_user
            admins = getattr(self, 'admins', None)

            if admins is None:
                self.logger.warning('Specify self.admins (list of users ids) parameter in '
                                    'manager or channel classes in order to restrict access to bot.')
            elif user.id not in admins:
                self.logger.info(f"Unauthorized access denied for {user}.")
                return

            return func(self, bot, update, *args, **kwargs)
        return wrapped
    return access
def __call__(self, func_or_msg=None, **kwargs):
    """ If the first argument is a function, return a decorator which runs
    the wrapped function inside Timer's context manager.
    Otherwise, treat the first argument as a 'msg' string and return an updated
    Timer instance, referencing the same logger.
    A 'level' keyword can also be passed to override self.level.
    """
    # callable() is used because the ``collections.Callable`` alias is no
    # longer available in current Python versions.
    if not callable(func_or_msg):
        msg = func_or_msg or kwargs.get("msg")
        level = kwargs.get("level", self.level)
        return self.__class__(self.logger, msg, level)

    func = func_or_msg
    # use the function name when no explicit 'msg' is provided
    if not self.msg:
        self.msg = "run '%s'" % func.__name__

    @wraps(func)
    def wrapper(*args, **kwds):
        with self:
            return func(*args, **kwds)
    return wrapper
def retry(times=3):
    """Return a decorator that retries the decorated call on any Exception.

    Arguments:
      - times (optional): maximum number of attempts.

    The first successful result is returned.  When every attempt fails the
    failure is logged and a generic ``Exception`` is raised whose message
    names the configured number of attempts and the call arguments; the
    error of the last attempt is attached to it as ``__cause__``.
    """
    def wrapper(func):
        @wraps(func)
        def fun(*args, **kwargs):
            last_error = None
            count = 0
            while count < times:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    count = count + 1
            # Report the real attempt limit rather than a fixed "3".
            message = "connection failed after retried {} times. {} {}".format(
                times, args, kwargs)
            logger.error(message)
            failure = Exception(message)
            failure.__cause__ = last_error
            raise failure
        return fun
    return wrapper
def verify_request_signature(func):
    """Check the ``x-hub-signature`` header before calling ``func``.

    The header is expected as ``<digest name>=<hex digest>``.  The HMAC of
    the request body is computed with ``APP_SECRET`` and the named digest
    and compared in constant time.  A header that is malformed, names an
    unsupported digest or does not match yields an empty 403 response.

    NOTE(review): a request without the header is only logged and still
    reaches ``func``, exactly as before - confirm that this is intended.
    """
    @wraps(func)
    def decorated(*args, **kwargs):
        signature = request.headers.get('x-hub-signature', None)
        if not signature:
            LOGGER.error('Could not validate the signature')
            return func(*args, **kwargs)

        method, separator, signature_hash = signature.partition('=')
        expected_hash = None
        if separator:
            try:
                expected_hash = hmac.new(
                    APP_SECRET, msg=request.get_data(), digestmod=method).hexdigest()
            except ValueError:
                # Digest name not supported: treated as an invalid signature.
                expected_hash = None

        # Compare as bytes: compare_digest() rejects non-ASCII text, and the
        # header content is supplied by the client.
        if expected_hash is None or not hmac.compare_digest(
                signature_hash.encode('utf-8'), expected_hash.encode('utf-8')):
            LOGGER.error('Signature was invalid')
            return make_response('', 403)
        return func(*args, **kwargs)
    return decorated
def logger(command):
    """
    Add a logger to the decorated command.

    After the command has run, one info line built from ``LOG_TEMPLATE`` is
    logged with the user's first name and the command name.  For the
    handler named ``unknown`` the name is derived from the message text on
    every call; the handler's own ``__name__`` is left untouched so that
    later calls are still recognised as ``unknown``.
    """
    # pylint: disable=unused-argument
    @wraps(command)
    def wrapper(bot, update, **kwargs):
        command(update, **kwargs)
        name = command.__name__
        if name == 'unknown':
            name = _get_command_name(update.message.text)
        message = LOG_TEMPLATE.format(user=update.user.first_name,
                                      command=name)
        logging.info(message)
    return wrapper
def run_in_executor(f):
    """
    A decorator to run the given method in the ThreadPoolExecutor.

    The call is submitted to ``self.executor`` and ``_future_completed`` is
    attached as done-callback.  Nothing is submitted when
    ``self.is_shutdown`` is set.  Errors raised while submitting are logged
    instead of propagated, and the wrapper always returns None.
    """
    @wraps(f)
    def new_f(self, *args, **kwargs):
        if not self.is_shutdown:
            try:
                submitted = self.executor.submit(f, self, *args, **kwargs)
                submitted.add_done_callback(_future_completed)
            except Exception:
                log.exception("Failed to submit task to executor")
    return new_f
def handle_bad_input_mc(f):
    """Run ``f`` through ``handle_bad_input`` and report invalid PRE operations.

    The first positional argument must expose ``pre_type``.  For
    ``PreType.None`` the call is only wrapped by ``handle_bad_input``.  For
    the ``SimplePre`` and ``SimplePreLAG`` types the matching
    ``InvalidMcOperation`` exception is additionally caught; its error code
    name is printed and the wrapper returns None.
    """
    @wraps(f)
    def handle(*args, **kwargs):
        pre_type = args[0].pre_type
        # ``PreType.None`` cannot be written as an attribute access on
        # Python 3 (None is a keyword), so the member is fetched by name.
        if pre_type == getattr(PreType, 'None'):
            return handle_bad_input(f)(*args, **kwargs)
        EType = {
            PreType.SimplePre: SimplePre.InvalidMcOperation,
            PreType.SimplePreLAG: SimplePreLAG.InvalidMcOperation,
        }[pre_type]
        Codes = {
            PreType.SimplePre: SimplePre.McOperationErrorCode,
            PreType.SimplePreLAG: SimplePreLAG.McOperationErrorCode,
        }[pre_type]
        try:
            return handle_bad_input(f)(*args, **kwargs)
        except EType as e:
            error = Codes._VALUES_TO_NAMES[e.code]
            # Parenthesised single argument: prints the same text on
            # Python 2 and Python 3.
            print("Invalid PRE operation (%s)" % error)
    return handle
def validate(validation_class, silent=not settings.DEBUG):
    """Return a decorator that validates request parameters for a view.

    For GET requests the parameters are the request values as a dict; for
    other methods the request body is decoded and parsed as JSON (an empty
    body is passed on as-is).  A body that cannot be parsed raises
    ``exceptions.WrongParametersException``.  The parameters are handed to
    ``validation_class``; with ``silent`` a ``PyCombValidationError`` is
    turned into ``WrongParametersException``.  The view receives the
    parameters as the ``params`` keyword.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*a, **kw):
            if flask.request.method == 'GET':
                values = flask.request.values
                p = dict(values.items()) if values else {}
            else:
                try:
                    body = flask.request.data
                    p = body and json.loads(flask.request.data.decode())
                except Exception:
                    raise exceptions.WrongParametersException

            if not silent:
                validation_class(p)
            else:
                try:
                    validation_class(p)
                except PyCombValidationError:
                    raise exceptions.WrongParametersException
            return fun(*a, params=p, **kw)
        return wrapper
    return decorator
def handle_errors(f):
    """A decorator that allows to ignore certain types of errors.

    The wrapper accepts an extra ``ignore_errors`` keyword (an exception
    class or tuple of classes) which is removed before ``f`` is called.
    When ``f`` raises one of those errors the wrapper returns None.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        ignored_errors = kwargs.pop('ignore_errors', ())
        try:
            return f(*args, **kwargs)
        except ignored_errors:
            # Silently ignore errors
            return None
    return wrapper
def add_arg_scope(func):
    """Decorates a function with args so it can be used within an arg_scope.

    Args:
      func: function to decorate.

    Returns:
      The decorated function func_with_args().  When the current arg scope
      holds defaults for ``func`` they are merged under the keyword
      arguments of the call (explicit keywords win).
    """
    @functools.wraps(func)
    def func_with_args(*args, **kwargs):
        scope = _current_arg_scope()
        key_func = (func.__module__, func.__name__)
        if key_func not in scope:
            return func(*args, **kwargs)
        merged = scope[key_func].copy()
        merged.update(kwargs)
        return func(*args, **merged)
    _add_op(func)
    return func_with_args
def locked(path, timeout=None):
    """Create a decorator that wraps every call in a ``FileLock``.

    Arguments:
      - path: path for lockfile.
      - timeout (optional): Timeout for acquiring lock.

    Usage:
        @locked('/var/run/myname', timeout=0)
        def myname(...):
            ...
    """
    def decor(func):
        def run_locked(*args, **kwargs):
            lockfile = FileLock(path, timeout=timeout)
            lockfile.acquire()
            try:
                value = func(*args, **kwargs)
            finally:
                # The lock is given back even when func raises.
                lockfile.release()
            return value
        wrapper = functools.wraps(func)(run_locked)
        return wrapper
    return decor
def overloaded(func):
    """Turn ``func`` into a function that dispatches over several overloads.

    Overloads are tried in registration order; the first one that does not
    raise ``TypeError`` provides the result.  When all of them raise
    ``TypeError`` a ``TypeError("No compatible signatures")`` is raised.
    Further overloads are registered with ``<function>.overload_with``,
    which returns the dispatching function.
    """
    def dispatch(*args, **kwargs):
        for candidate in overloaded_func.overloads:
            try:
                return candidate(*args, **kwargs)
            except TypeError:
                continue
        # it will be nice if the error message prints a list of
        # possible signatures here
        raise TypeError("No compatible signatures")

    overloaded_func = wraps(func)(dispatch)

    def overload_with(func):
        overloaded_func.overloads.append(func)
        return overloaded_func

    overloaded_func.overloads = [func]
    overloaded_func.overload_with = overload_with
    return overloaded_func
#############
def consumer(func):
    """A decorator, advances func to its first yield point when called.

    Modified from the original example code in PEP 342 to use the
    functools.wraps decorator.  This convenience function makes it look
    like the original function, which is almost always what we want,
    especially if we designed the original function to be wrapped in
    the first place!

    The generator is advanced with the built-in ``next()``, which works on
    both Python 2.6+ and Python 3.

    Maybe `consumer` should go into functools too!
    """
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kw):
        gen = func(*args, **kw)
        next(gen)
        return gen
    return wrapper
def retry_with(handle_retry, exceptions, conditions, max_attempts):
    """Return a decorator that retries a call on given errors or results.

    Arguments:
      - handle_retry: object whose ``retry(func, args, kwargs, error,
        attempt)`` is called before every retry.
      - exceptions: exception classes that trigger a retry.
      - conditions: predicates on the result; a truthy answer triggers a
        retry.
      - max_attempts: maximum number of calls.

    A result accepted by all conditions is returned immediately.  An
    exception not listed in ``exceptions`` stops the loop.  When the loop
    ends without an accepted result, the outcome of the last attempt is
    reported: its exception is raised, or its (rejected) result returned.
    """
    def wrapper(func):
        @functools.wraps(func)
        def decorated(*args, **kwargs):
            error = None
            result = None
            # Pre-set so the log line below works when no attempt is made
            # (max_attempts < 1).
            attempt = 0
            for attempt in range(1, max_attempts + 1):
                # Forget the error of the previous attempt: only the last
                # attempt decides between raising and returning.
                error = None
                try:
                    result = func(*args, **kwargs)
                    if any(guard(result) for guard in conditions):
                        handle_retry.retry(func, args, kwargs, None, attempt)
                        continue
                    return result
                except Exception as err:
                    error = err
                    if any(isinstance(err, exc) for exc in exceptions):
                        handle_retry.retry(func, args, kwargs, err, attempt)
                        continue
                    break
            logger.info('failed after {} attempts'.format(attempt))
            if error is not None:
                raise error
            return result
        return decorated
    return wrapper
def restart_on_change(restart_map, stopstart=False, restart_functions=None):
    """Restart services based on configuration files changing

    This function is used as a decorator, for example::

        @restart_on_change({
            '/etc/ceph/ceph.conf': [ 'cinder-api', 'cinder-volume' ],
            '/etc/apache/sites-enabled/*': [ 'apache2' ]
            })
        def config_changed():
            pass  # your code here

    In this example, the cinder-api and cinder-volume services
    would be restarted if /etc/ceph/ceph.conf is changed by the
    decorated function. The apache2 service would be
    restarted if any file matching the pattern got changed, created
    or removed. Standard wildcards are supported, see documentation
    for the 'glob' module for more information.

    The work itself is delegated to ``restart_on_change_helper``, which
    receives a zero-argument callable running the decorated function.

    @param restart_map: {path_file_name: [service_name, ...]
    @param stopstart: DEFAULT false; whether to stop, start OR restart
    @param restart_functions: nonstandard functions to use to restart services
                              {svc: func, ...}
    @returns result from decorated function
    """
    def wrap(f):
        @functools.wraps(f)
        def wrapped_f(*args, **kwargs):
            def run_hook():
                return f(*args, **kwargs)
            return restart_on_change_helper(
                run_hook, restart_map, stopstart, restart_functions)
        return wrapped_f
    return wrap
def cached(func):
    """Cache return values for multiple executions of func + args

    For example::

        @cached
        def unit_get(attribute):
            pass

        unit_get('test')

    will cache the result of unit_get + 'test' for future calls.

    Results are stored in the module-level ``cache`` dict under the string
    form of ``(func, args, kwargs)``.  The undecorated function stays
    reachable as ``<wrapper>._wrapped``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        key = str((func, args, kwargs))
        if key not in cache:
            cache[key] = func(*args, **kwargs)
        return cache[key]
    wrapper._wrapped = func
    return wrapper
def translate_exc(from_exc, to_exc):
    """Return a decorator that re-raises ``from_exc`` errors as ``to_exc``.

    Any exception matching ``from_exc`` raised by the decorated function is
    replaced by ``to_exc``; other exceptions and normal results pass
    through unchanged.
    """
    def decorate(f):
        def translating(*args, **kwargs):
            try:
                outcome = f(*args, **kwargs)
            except from_exc:
                raise to_exc
            return outcome
        return wraps(f)(translating)
    return decorate
def os_requires_version(ostack_release, pkg):
    """
    Decorator for hook to specify minimum supported release

    The release reported by ``os_release(pkg)`` is compared with
    ``ostack_release``; when it is lower an Exception is raised instead of
    running the hook.  Otherwise the hook is called with all positional
    and keyword arguments and its result is returned.
    """
    def wrap(f):
        @wraps(f)
        def wrapped_f(*args, **kwargs):
            if os_release(pkg) < ostack_release:
                raise Exception("This hook is not supported on releases"
                                " before %s" % ostack_release)
            return f(*args, **kwargs)
        return wrapped_f
    return wrap
def os_workload_status(configs, required_interfaces, charm_func=None):
    """
    Decorator to set workload status based on complete contexts

    The decorated function runs first; afterwards
    ``set_os_workload_status(configs, required_interfaces, charm_func)`` is
    called.  The wrapper itself returns None.
    """
    def wrap(f):
        def wrapped_f(*args, **kwargs):
            # Run the original function first
            f(*args, **kwargs)
            # Set workload status now that contexts have been
            # acted on
            set_os_workload_status(configs, required_interfaces, charm_func)
        return wraps(f)(wrapped_f)
    return wrap
def admin_login_required(method):
    """Restrict a view to authenticated admin users.

    ``current_user`` must be authenticated and expose ``is_admin`` either
    as a boolean attribute or as a method.  When a check fails, a warning
    is flashed and the user is redirected to ``redberry.home``.
    """
    def is_admin(user):
        flag = user.is_admin
        return flag if isinstance(flag, bool) else user.is_admin()

    def deny(message):
        flash(message, 'warning')
        return redirect(url_for('redberry.home'))

    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        if not current_user.is_authenticated:
            return deny("This section is for logged in users only.")

        if not hasattr(current_user, 'is_admin'):
            return deny("Redberry expects your user instance to implement an `is_admin` boolean attribute "
                        "or an `is_admin()` method.")

        if not is_admin(current_user):
            return deny("This section is for admin users only.")

        return method(*args, **kwargs)
    return wrapper
############
# CMS ROUTES
############
def with_context_manager(ctx_manager):
    """Return a decorator that runs the decorated function inside ``ctx_manager``.

    The same context manager object is entered on every call.  The wrapper
    is marked with ``djpt_patched = True``.
    """
    def decorator(fn):
        def context_manager_wrapper(*a, **kw):
            with ctx_manager:
                outcome = fn(*a, **kw)
            return outcome
        patched = wraps(fn)(context_manager_wrapper)
        patched.djpt_patched = True
        return patched
    return decorator
def lazy_property(func):
    """Turn a method into a property that is computed once per instance.

    The value is stored on the instance as ``_lazy_<method name>`` at the
    first access and read from there afterwards.
    """
    attribute = '_lazy_' + func.__name__

    def wrapper(self):
        if hasattr(self, attribute):
            return getattr(self, attribute)
        setattr(self, attribute, func(self))
        return getattr(self, attribute)

    return property(functools.wraps(func)(wrapper))
# ?bias_shape ? None??????bias
def parse_args(**k):
    """Return a decorator that parses declared keyword arguments.

    Each keyword of ``parse_args`` maps an argument name to an
    ``(arg_type, default_val)`` pair.  Before the decorated function runs,
    every declared name is resolved with
    ``parse_single_arg(name, kwargs, arg_type, default_val)`` and the
    results replace or extend the keyword arguments of the call.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            pv = {}
            # dict.items() exists on Python 2 and 3 (iteritems() does not).
            for key, (arg_type, default_val) in k.items():
                pv[key] = parse_single_arg(key, kwargs, arg_type, default_val)
            kwargs.update(pv)
            return func(*args, **kwargs)
        return wrapper
    return decorator