Python operator 模块,or_() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用operator.or_()。
def get_foreign(self, queryset, search, filters):
    """Filter ``queryset`` by a search string and an optional category.

    A row matches when its ``code`` contains ``search`` or when the
    ``<lang>__name`` lookup contains it for any language listed in
    ``settings.LANGUAGES_DATABASES`` (all case-insensitive).

    :param queryset: base queryset to filter.
    :param search: text to look for.
    :param filters: mapping of submitted values; the category is read from
        the first of three form-specific keys whose value is not ``None``.
    :return: the filtered queryset sliced to ``settings.LIMIT_FOREIGNKEY``.
    """
    lookups = [Q(code__icontains=search)]
    lookups.extend(
        Q(**{"{}__name__icontains".format(lang.lower()): search})
        for lang in settings.LANGUAGES_DATABASES
    )
    matches = queryset.filter(reduce(operator.or_, lookups))

    # Several forms can supply the category; the first non-None value wins.
    category = None
    for form_key in (
        'ProductForm_category',
        'ProductFormCreate_category',
        'ProductFormCreateCustom_category',
    ):
        category = filters.get(form_key, None)
        if category is not None:
            break

    if category:
        matches = matches.filter(category__pk=category)
    return matches[:settings.LIMIT_FOREIGNKEY]
def query_or(cls, query, *values_list, **annotations):
    """Run an OR-combined filter and return annotated ``values()`` rows.

    :param query: a single query object, or an iterable of them that is
        folded together with ``|``.
    :param values_list: field names forwarded to ``values()``.
    :param annotations: ``alias=field_name`` pairs; each alias is annotated
        as ``F(field_name)``.  The special keyword ``pop_annotations``
        (default false) is consumed here and, when truthy, removes every
        source ``field_name`` key from each returned row.
    :return: the evaluated-on-demand result of
        ``cls.objects.filter(...).values(...).annotate(...)``.
    """
    drop_sources = annotations.pop('pop_annotations', False)
    source_fields = list(annotations.values())
    aliases = {alias: F(field) for alias, field in annotations.items()}

    if isinstance(query, Iterable):
        query = reduce(or_, query)

    rows = cls.objects.filter(query).values(*values_list).annotate(**aliases)
    if drop_sources:
        for row in rows:
            for field in source_fields:
                row.pop(field)
    return rows
# Tax types applicable to products
def apply_query_in(cls, queryset, options, value):
    """Apply `in` functional query.

    ``value`` is split with ``cls.split_lookup_value``; one ``term`` query on
    ``options['field']`` is built per resulting value and the queries are
    OR-ed together.  When the split yields nothing, ``queryset`` is returned
    untouched.

    :param queryset: Original queryset.
    :param options: Filter options.
    :param value: value to filter on.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :type value: str
    :return: Modified queryset.
    :rtype: elasticsearch_dsl.search.Search
    """
    term_queries = [
        Q('term', **{options['field']: lookup_value})
        for lookup_value in cls.split_lookup_value(value)
    ]
    if not term_queries:
        return queryset
    return queryset.query(six.moves.reduce(operator.or_, term_queries))
def test_ror_scalar(self):
    """Run the array/scalar operator check for ``operator.or_`` with ``swap=True``.

    NOTE(review): ``swap=True`` presumably exercises the reflected
    (scalar on the left) form -- confirm against ``check_array_scalar_op``.
    """
    bitwise_or = operator.or_
    self.check_array_scalar_op(bitwise_or, swap=True)
def get_generic_filters(filters):
    """
        Returns `django.db.models.query.QuerySet` filters based on request query filters

        The result is a list that always starts with an empty ``Q()``.  For
        every ``where.in`` entry one OR-group over the listed values is
        appended; all ``where.like`` entries are merged into a single
        OR-group of ``__icontains`` lookups on the first value of each entry.
    """
    where = [Q()]
    criteria = filters.get('where')
    if not criteria:
        return where

    def to_model_field(name):
        # Apply each (old, new) pair of TICKET_FILTER_MAPPING, in order.
        for replacement in TICKET_FILTER_MAPPING:
            name = name.replace(*replacement)
        return name

    if 'in' in criteria:
        for param in criteria['in']:
            for key, values in param.iteritems():
                field = to_model_field(key)
                alternatives = [Q(**{field: item}) for item in values]
                where.append(reduce(operator.or_, alternatives))

    if 'like' in criteria:
        like = [
            Q(**{to_model_field(key) + '__icontains': values[0]})
            for param in criteria['like']
            for key, values in param.iteritems()
        ]
        if like:
            where.append(reduce(operator.or_, like))

    return where
def toolbar(**kwargs):
    """ Get reports/tickets stats

        Expects ``kwargs['user']``.  Raises ``Forbidden`` when the user has no
        ``AbusePermission`` row; otherwise OR-combines the user-specific
        filters, AND-s them with an empty ``Q()`` and returns the result of
        ``_get_toolbar_count`` for that filter.
    """
    user = kwargs['user']
    permission_count = AbusePermission.objects.filter(user=user.id).count()
    if permission_count == 0:
        raise Forbidden('You are not allowed to see any category')

    allowed = reduce(operator.or_, _get_user_specific_where(user))
    # Aggregate all filters
    combined = Q() & allowed
    return _get_toolbar_count(combined, user)
def parse_autoblock(data):
    # type: (str) -> set
    """Expand a comma-separated list of IPs and IP ranges into one set.

    Each item is either a single IPv4 address or a range ``ip1-ip2``
    (inclusive on both ends).  Raises ``APIError`` when an item is neither.
    """
    def _expand(entry):
        # Try to parse entry as a single IP
        try:
            ipaddress.IPv4Address(entry)
        except ipaddress.AddressValueError:
            pass
        else:
            return {entry}

        # Try to parse entry as an ip range: ip1-ip2
        try:
            bounds = [utils.ip2int(part) for part in entry.split('-')]
            first_ip, last_ip = bounds
            return {utils.int2ip(number)
                    for number in range(first_ip, last_ip + 1)}
        except ValueError:
            raise APIError(
                "Exclude IP's are expected to be in the form of "
                "10.0.0.1,10.0.0.4 or 10.1.0.10-10.1.1.54 or both "
                "comma-separated")

    expanded = (_expand(unicode(chunk)) for chunk in data.split(','))
    return reduce(operator.or_, expanded)
def _build_logical_expression(self, grammar, terminal_component_names):
    """Build a simplified boolean expression from a component tree.

    :param grammar: root component, a dict with an ``'id'`` key; components
        whose id is not a terminal name also carry ``'components'`` (the
        children) and ``'operation'`` (``'not'``, ``'concat'``, ``'gap'``,
        ``'union'`` or ``'intersect'``).
    :param terminal_component_names: sequence of leaf component names; one
        symbol is created per name.
    :return: ``simplify`` applied to the combined expression.
    :raises TypeError: if a ``'not'`` component does not have exactly one
        child, or a binary operation has no children.
    :raises KeyError: if a component uses an unknown operation.
    """
    # Call symbols() directly: routing the names through eval() on a
    # formatted string added nothing and executed arbitrary text whenever a
    # name contained a quote.
    terminal_component_symbols = symbols(' '.join(terminal_component_names))
    if isinstance(terminal_component_symbols, Symbol):
        # symbols() returns a bare Symbol when given a single name.
        terminal_component_symbols = [terminal_component_symbols]
    name_to_symbol = {
        terminal_component_names[i]: symbol
        for i, symbol in enumerate(terminal_component_symbols)
    }
    terminal_component_names = set(terminal_component_names)
    binary_operations = {
        'concat': operator.and_,
        'gap': operator.and_,
        'union': operator.or_,
        'intersect': operator.and_,
    }

    def logical_expression_builder(component):
        if component['id'] in terminal_component_names:
            return name_to_symbol[component['id']]
        operands = [logical_expression_builder(child)
                    for child in component['components']]
        operation = component['operation']
        if operation == 'not':
            # Negation is unary.  reduce() never calls its function for a
            # one-element sequence, so folding with operator.invert used to
            # return the child un-negated.
            if len(operands) != 1:
                raise TypeError(
                    "'not' expects exactly one operand, got %d" % len(operands))
            return operator.invert(operands[0])
        return reduce(binary_operations[operation], operands)

    return simplify(logical_expression_builder(grammar))
def normalize(self, newvars=None):
    """Rename auto-generated unique variables

    Individual variables found in the expression are sorted by their
    ``variable`` and replaced, in that order, by ``e0<n>`` (event variables)
    or ``z<n>`` (other individual variables), with ``n`` counting from 1.
    ``newvars`` is accepted but not used here.
    """
    def merge(parts):
        return reduce(operator.or_, parts, set())

    def get_indiv_vars(e):
        if isinstance(e, IndividualVariableExpression):
            return {e}
        if isinstance(e, AbstractVariableExpression):
            return set()
        return e.visit(get_indiv_vars, merge)

    ordered = sorted(get_indiv_vars(self), key=lambda e: e.variable)
    result = self
    for position, expr in enumerate(ordered, 1):
        if isinstance(expr, EventVariableExpression):
            replacement = expr.__class__(Variable('e0%s' % position))
        elif isinstance(expr, IndividualVariableExpression):
            replacement = expr.__class__(Variable('z%s' % position))
        else:
            replacement = expr
        result = result.replace(expr.variable, replacement, True)
    return result
def filter_queryset(self, request, queryset, view):
    """Narrow ``queryset`` so every search term matches at least one field.

    The view's ``search_fields`` are converted to ORM lookups with
    ``self.construct_search``.  For each term from
    ``self.get_search_terms(request)`` the queryset is filtered by the OR of
    all lookups.  Without search fields or terms the queryset is returned
    unchanged.
    """
    search_fields = getattr(view, 'search_fields', None)
    search_terms = self.get_search_terms(request)
    if not (search_fields and search_terms):
        return queryset

    orm_lookups = [
        self.construct_search(six.text_type(name)) for name in search_fields
    ]

    base = queryset
    for term in search_terms:
        per_field = [models.Q(**{lookup: term}) for lookup in orm_lookups]
        queryset = queryset.filter(reduce(operator.or_, per_field))

    # Filtering against a many-to-many field requires us to
    # call queryset.distinct() in order to avoid duplicate items
    # in the resulting queryset.
    return distinct(queryset, base)
def __init__(self, code, objects=None):
    """Store the source ``code`` and set up the operator tables.

    :param code: source text kept on ``self.code``.
    :param objects: optional mapping kept on ``self._objects``; a new empty
        dict is used when omitted.
    """
    binary_operators = [
        ('|', operator.or_),
        ('^', operator.xor),
        ('&', operator.and_),
        ('>>', operator.rshift),
        ('<<', operator.lshift),
        ('-', operator.sub),
        ('+', operator.add),
        ('%', operator.mod),
        ('/', operator.truediv),
        ('*', operator.mul),
    ]
    # Every binary operator has an augmented-assignment twin ("|=", "+=", ...);
    # plain "=" simply yields the right-hand side.
    assignments = [(symbol + '=', func) for symbol, func in binary_operators]
    assignments.append(('=', lambda cur, right: right))

    self._OPERATORS = binary_operators
    self._ASSIGN_OPERATORS = assignments
    self._VARNAME_PATTERN = r'[a-zA-Z_$][a-zA-Z_$0-9]*'

    self.code = code
    self._functions = {}
    self._objects = {} if objects is None else objects
def number_of_args(fn):
    """Return the number of positional arguments for a function, or None if the number is variable.

    Looks inside any decorated functions (follows ``__wrapped__``).

    :param fn: callable to inspect.
    :return: count of positional-only and positional-or-keyword parameters,
        or ``None`` when ``fn`` accepts ``*args``.
    :raises NotImplementedError: if ``fn`` has no introspectable signature
        and is not one of the built-in operators listed below.
    """
    if hasattr(fn, '__wrapped__'):
        return number_of_args(fn.__wrapped__)
    try:
        # Build the signature once and reuse it for both checks.
        parameters = list(signature(fn).parameters.values())
    except ValueError:
        # signatures don't work for built-in operators, so check for a few explicitly
        unary_ops = (len, op.not_, op.truth, op.abs, op.index, op.inv,
                     op.invert, op.neg, op.pos)
        binary_ops = (op.lt, op.le, op.gt, op.ge, op.eq, op.ne, op.is_,
                      op.is_not, op.add, op.and_, op.floordiv, op.lshift,
                      op.mod, op.mul, op.or_, op.pow, op.rshift, op.sub,
                      op.truediv, op.xor, op.concat, op.contains, op.countOf,
                      op.delitem, op.getitem, op.indexOf)
        ternary_ops = (op.setitem,)
        if fn in unary_ops:
            return 1
        if fn in binary_ops:
            return 2
        if fn in ternary_ops:
            return 3
        raise NotImplementedError(
            "Built-in operator {} not supported".format(fn))

    if any(p.kind == p.VAR_POSITIONAL for p in parameters):
        return None
    return sum(p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
               for p in parameters)
async def _set_actions(self, ctx, op, flags, *, colour):
    """Combine ``flags`` into the stored event mask with ``op`` and report it.

    Declared ``async`` because the body awaits; as a plain ``def`` the
    ``await`` expressions are a syntax error.

    :param ctx: command context; provides ``session``, ``command`` and ``send``.
    :param op: binary callable applied as ``op(config.events, combined_flags)``.
    :param flags: flags to combine; de-duplicated with ``unique`` first.
    :param colour: colour of the confirmation embed (keyword-only).
    """
    flags = unique(flags)
    config = await self._check_config(ctx)

    # Fold all requested flags into a single mask before applying ``op``.
    reduced = reduce(operator.or_, flags)
    config.events = op(config.events, reduced)
    await ctx.session.add(config)

    enabled_flags = ', '.join(f.name for f in ActionFlag if config.events & f)
    embed = (discord.Embed(colour=colour, description=', '.join(f.name for f in flags))
             .set_author(name=f'Successfully {ctx.command.name}d the following actions')
             .add_field(name='The following mod actions will now be logged',
                        value=enabled_flags, inline=False)
             )
    await ctx.send(embed=embed)
def registrations_matching_complex_pattern():
    """
    Q objects generation can be also automated

    Each ``(lookup, value)`` pair becomes one ``Q``; the objects are folded
    with ``OR`` and the number of matching registrations is printed.
    """
    lookups = (
        ('event__name__endswith', 'python'),
        ('member__community__name__contains', 'python'),
    )
    pattern = reduce(OR, [Q(lookup) for lookup in lookups])

    registration_number = Registration.objects.filter(pattern).count()
    print("{nbr} match the pattern 'python'".format(nbr=registration_number))
## When(), Case() expressions
##############################
def filter(self, *filters):
    """Apply filters to the stream.

    Returns the result of piping this stream through each filter in turn
    with the ``|`` operator, so::

        stream.filter(filter1, filter2)

    is equivalent to::

        stream | filter1 | filter2

    :param filters: one or more objects to combine with the stream via ``|``
    :return: the filtered stream (the stream itself when no filter is given)
    :rtype: `Stream`
    """
    stream = self
    for apply_filter in filters:
        stream = stream | apply_filter
    return stream
def _translate_query(self, query):
    """Translate a query object into an ``elasticsearch_dsl`` filter.

    Compound queries are translated recursively and folded with ``|``
    (``queries.Or``) or ``&`` (``queries.And``).  A leaf query must be a
    ``queries.Equal`` (anything else raises ``KeyError``) and becomes a
    ``term`` filter; string values under a ``data.`` key are matched against
    the key with a ``.raw`` suffix.
    """
    if isinstance(query, queries.CompoundQuery):
        combiners = {
            queries.Or: operator.or_,
            queries.And: operator.and_,
        }
        combine = combiners[query.__class__]
        translated = [self._translate_query(child) for child in query.queries]
        return functools.reduce(combine, translated)

    key = query.key
    if key.startswith('data.') and isinstance(query.value, str):
        key += '.raw'

    filter_kinds = {queries.Equal: 'term'}
    return elasticsearch_dsl.F(filter_kinds[query.__class__], **{key: query.value})
def list(self, filter, sort, page, page_size, user):
    """List entries, restricting non-admin users by read permission.

    For users without ``Permissions.ADMIN`` a ``uid`` is required
    (``exceptions.Unauthorized`` otherwise) and ``filter`` is AND-ed with the
    OR of four ``Q(<permission key>, 'and', Permissions.READ)`` queries, from
    the wildcard key down to the user's exact ``type-provider-id`` key.
    The call is then delegated to the parent class.
    """
    if not user.permissions & Permissions.ADMIN:
        if not user.uid:
            raise exceptions.Unauthorized()

        permission_keys = (
            'data.permissions.*',
            'data.permissions.{0.type}-*'.format(user),
            'data.permissions.{0.type}-{0.provider}-*'.format(user),
            'data.permissions.{0.type}-{0.provider}-{0.id}'.format(user),
        )
        query = functools.reduce(
            operator.or_,
            [Q(key, 'and', Permissions.READ) for key in permission_keys],
        )

        if filter:
            filter &= query
        else:
            filter = query

    return super().list(filter, sort, page, page_size, user)
def normalize(self, newvars=None):
    """Rename auto-generated unique variables

    Individual variables found in the expression are sorted by their
    ``variable`` and replaced, in that order, by ``e0<n>`` (event variables)
    or ``z<n>`` (other individual variables), with ``n`` counting from 1.
    ``newvars`` is accepted but not used here.
    """
    def merge(parts):
        return reduce(operator.or_, parts, set())

    def get_indiv_vars(e):
        if isinstance(e, IndividualVariableExpression):
            return {e}
        if isinstance(e, AbstractVariableExpression):
            return set()
        return e.visit(get_indiv_vars, merge)

    ordered = sorted(get_indiv_vars(self), key=lambda e: e.variable)
    result = self
    for position, expr in enumerate(ordered, 1):
        if isinstance(expr, EventVariableExpression):
            replacement = expr.__class__(Variable('e0%s' % position))
        elif isinstance(expr, IndividualVariableExpression):
            replacement = expr.__class__(Variable('z%s' % position))
        else:
            replacement = expr
        result = result.replace(expr.variable, replacement, True)
    return result
def get_searched_queryset(self, qs):
    """Restrict ``qs`` to rows matching every word of the ``term`` parameter.

    The term is read from ``self.GET["term"]`` and, when the model defines
    ``autocomplete_term_adjust``, passed through it.  For each word a fresh
    queryset over the model is filtered by the OR of all autocomplete search
    fields and intersected with ``qs``.  With no search fields configured an
    empty queryset is returned.
    """
    model = self.model
    term = self.GET["term"]

    try:
        term = model.autocomplete_term_adjust(term)
    except AttributeError:
        pass

    search_fields = get_autocomplete_search_fields(self.model)
    if not search_fields:
        return model.objects.none()

    for word in term.split():
        per_field = [
            models.Q(**{smart_text(field): smart_text(word)})
            for field in search_fields
        ]
        word_qs = QuerySet(model)
        # Carry over select_related so the combined querysets stay compatible.
        word_qs.query.select_related = qs.query.select_related
        word_qs = word_qs.filter(reduce(operator.or_, per_field))
        qs &= word_qs
    return qs
def normalize(self, newvars=None):
    """Rename auto-generated unique variables

    Individual variables found in the expression are sorted by their
    ``variable`` and replaced, in that order, by ``e0<n>`` (event variables)
    or ``z<n>`` (other individual variables), with ``n`` counting from 1.
    ``newvars`` is accepted but not used here.
    """
    def merge(parts):
        return reduce(operator.or_, parts, set())

    def get_indiv_vars(e):
        if isinstance(e, IndividualVariableExpression):
            return {e}
        if isinstance(e, AbstractVariableExpression):
            return set()
        return e.visit(get_indiv_vars, merge)

    ordered = sorted(get_indiv_vars(self), key=lambda e: e.variable)
    result = self
    for position, expr in enumerate(ordered, 1):
        if isinstance(expr, EventVariableExpression):
            replacement = expr.__class__(Variable('e0%s' % position))
        elif isinstance(expr, IndividualVariableExpression):
            replacement = expr.__class__(Variable('z%s' % position))
        else:
            replacement = expr
        result = result.replace(expr.variable, replacement, True)
    return result
def __or__(self, trc):
    """Implement ``self | trc`` by delegating to ``apply_op2`` with ``operator.or_``."""
    combine = operator.or_
    return self.apply_op2(trc, combine)
def __invert__(self):
    """Return the complement of this flag value within its class.

    The result OR-s together every member of the class that is neither part
    of this value's decomposition nor shares any bit with it, starting from
    the zero value of the class.
    """
    cls = self.__class__
    members, _ = _decompose(cls, self._value_)
    inverted_members = []
    for member in cls:
        if member in members:
            continue
        if member._value_ & self._value_:
            continue
        inverted_members.append(member)
    inverted = reduce(_or_, inverted_members, cls(0))
    return cls(inverted)
def get_foreign(self, queryset, search, filters):
    """Filter ``queryset`` by a search string and an optional family.

    A row matches when its ``code`` contains ``search`` or when the
    ``<lang>__name`` lookup contains it for any language listed in
    ``settings.LANGUAGES_DATABASES`` (all case-insensitive).

    :param queryset: base queryset to filter.
    :param search: text to look for.
    :param filters: mapping of submitted values; the family is read from the
        first of six form-specific keys whose value is not ``None``.
    :return: the filtered queryset sliced to ``settings.LIMIT_FOREIGNKEY``.
    """
    lookups = [Q(code__icontains=search)]
    lookups.extend(
        Q(**{"{}__name__icontains".format(lang.lower()): search})
        for lang in settings.LANGUAGES_DATABASES
    )
    matches = queryset.filter(reduce(operator.or_, lookups))

    # Several forms can supply the family; the first non-None value wins.
    family = None
    for form_key in (
        'FeatureForm_family',
        'AttributeForm_family',
        'FeatureSpecialForm_family',
        'ProductForm_family',
        'ProductFormCreate_family',
        'ProductFormCreateCustom_family',
    ):
        family = filters.get(form_key, None)
        if family is not None:
            break

    if family:
        matches = matches.filter(family__pk=family)
    return matches[:settings.LIMIT_FOREIGNKEY]
# ###########################################
def has_result_of_status(self, status, results):
    """Search for records of this inbound with the given status and any listed result.

    :param status: object whose ``get_status_id()`` is compared with
        ``inbound_status_id``.
    :param results: non-empty iterable of accepted ``result`` values.
    :return: whatever ``self.search_db`` returns for the combined query.
    """
    inbound = self.items["operation"]["inbound"]
    record = Query()
    any_result = reduce(or_, (record.result == wanted for wanted in results))
    combined_query = (
        (record.inbound == inbound)
        & (record.inbound_status_id == status.get_status_id())
        & any_result
    )
    return self.search_db(combined_query)
def get_result_summaries_by_results(self, results):
    """Search for records whose ``result`` equals any value in ``results``.

    :param results: non-empty iterable of accepted ``result`` values.
    :return: whatever ``self.search_db`` returns for the OR-combined query.
    """
    record = Query()
    alternatives = (record.result == wanted for wanted in results)
    return self.search_db(reduce(or_, alternatives))
def exec(self, proc: Processor):
    """Execute this instruction on ``proc``.

    Expands the operands, folds them with ``self.operator``, adds the carry
    for ``addc``/``subc``, stores the result in ``self.register``, clears the
    carry after a logical AND/OR/XOR and advances the program counter.

    :param proc: processor whose memory, carry flag and manager are used.
    """
    self.proc = proc
    # Materialise the operands: a bare map() would be exhausted by reduce()
    # and leave self.args empty for any later reader.
    self.args = list(map(self.expand, self.o_args))  # load register values
    val = reduce(self.operator, self.args)  # apply operator

    # Compare the instruction's own operator; the bare name ``operator`` is
    # the stdlib module and never identical to any of these functions.
    if self.operator is addc or self.operator is subc:
        val += int(self.proc.external.carry)

    proc.memory.set_register(self.register, val)  # set result

    if (self.operator is operator.and_
            or self.operator is operator.or_
            or self.operator is operator.xor):
        self.proc.set_carry(False)

    # increment pc
    self.proc.manager.next()
def test(cls, nodelist):
    """OR together the interpretation of every odd-indexed node of ``nodelist``.

    All operands are interpreted before any result is combined.
    """
    # MUST NOT short-circuit evaluation, or invalid syntax can be skipped!
    operand_positions = range(1, len(nodelist), 2)
    outcomes = [cls.interpret(nodelist[position]) for position in operand_positions]
    return functools.reduce(operator.or_, outcomes)
def test(cls, nodelist):
    """OR together the interpretation of every odd-indexed node of ``nodelist``.

    All operands are interpreted before any result is combined.
    """
    # MUST NOT short-circuit evaluation, or invalid syntax can be skipped!
    operand_positions = range(1, len(nodelist), 2)
    outcomes = [cls.interpret(nodelist[position]) for position in operand_positions]
    return functools.reduce(operator.or_, outcomes)
def construct_nested_search(self, request, view):
    """Construct nested search.

    For every search term and every ``path -> fields`` entry of
    ``view.search_nested_fields`` one ``nested`` query is built whose inner
    query is the OR of ``match`` queries on ``<path>.<field>``.

    :param request: Django REST framework request.
    :param view: View.
    :type request: rest_framework.request.Request
    :type view: rest_framework.viewsets.ReadOnlyModelViewSet
    :return: List of nested queries; empty when the view does not define
        ``search_nested_fields``.
    :rtype: list
    """
    if not hasattr(view, 'search_nested_fields'):
        return []

    nested_queries = []
    for search_term in self.get_search_query_params(request):
        for path, fields in view.search_nested_fields.items():
            field_matches = [
                Q("match", **{"{}.{}".format(path, field): search_term})
                for field in fields
            ]
            nested_queries.append(
                Q(
                    "nested",
                    path=path,
                    query=six.moves.reduce(operator.or_, field_matches),
                )
            )
    return nested_queries
def apply_query_exclude(cls, queryset, options, value):
    """Apply `exclude` functional query.

    ``value`` is split with ``cls.split_lookup_value``; a document is kept
    only when it matches none of the resulting values, i.e. the negated
    ``term`` queries are AND-ed.  (OR-ing them, as before, keeps any document
    that lacks at least one of the values, which for a single-valued field
    is every document as soon as two distinct values are given.)

    :param queryset: Original queryset.
    :param options: Filter options.
    :param value: value to filter on.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :type value: str
    :return: Modified queryset.
    :rtype: elasticsearch_dsl.search.Search
    """
    __values = cls.split_lookup_value(value)

    __queries = [
        ~Q('term', **{options['field']: __value})
        for __value in __values
    ]

    if __queries:
        # NOT a AND NOT b == NOT (a OR b): exclude every listed value.
        queryset = queryset.query(
            six.moves.reduce(operator.and_, __queries)
        )

    return queryset
def test(cls, nodelist):
    """OR together the interpretation of every odd-indexed node of ``nodelist``.

    All operands are interpreted before any result is combined.
    """
    # MUST NOT short-circuit evaluation, or invalid syntax can be skipped!
    outcomes = []
    for position in range(1, len(nodelist), 2):
        outcomes.append(cls.interpret(nodelist[position]))
    return functools.reduce(operator.or_, outcomes)
def test(cls, nodelist):
    """OR together the interpretation of every odd-indexed node of ``nodelist``.

    All operands are interpreted before any result is combined.
    """
    # MUST NOT short-circuit evaluation, or invalid syntax can be skipped!
    outcomes = []
    for position in range(1, len(nodelist), 2):
        outcomes.append(cls.interpret(nodelist[position]))
    return functools.reduce(operator.or_, outcomes)
def test_menu_action(self):
    """Test that menu actions are accessible to menu's group.

    For every menu of ``self.module`` that has action keywords, the groups
    of the menu must be a subset of the union of the groups of its
    ``tree_open`` actions.  Menus without keywords, or whose ``tree_open``
    actions carry no groups at all, are skipped.
    """
    with Transaction().start(DB_NAME, USER, context=CONTEXT):
        pool = Pool()
        Menu = pool.get('ir.ui.menu')
        ModelData = pool.get('ir.model.data')

        module_menus = ModelData.search([
                ('model', '=', 'ir.ui.menu'),
                ('module', '=', self.module),
                ])
        menus = Menu.browse([mm.db_id for mm in module_menus])
        for menu, module_menu in zip(menus, module_menus):
            if not menu.action_keywords:
                continue
            menu_groups = set(menu.groups)
            # Start from an empty set: without the initializer reduce()
            # raises TypeError when the menu has no 'tree_open' keyword,
            # although that case is meant to be skipped just below.
            actions_groups = reduce(operator.or_,
                (set(k.action.groups) for k in menu.action_keywords
                    if k.keyword == 'tree_open'),
                set())
            if not actions_groups:
                continue
            assert menu_groups <= actions_groups, (
                'Menu "%(menu_xml_id)s" actions are not accessible to '
                '%(groups)s' % {
                    'menu_xml_id': module_menu.fs_id,
                    'groups': ','.join(g.name
                        for g in menu_groups - actions_groups),
                    })
def get_search_results(self, request, queryset, search_term):
    """
    Returns a tuple containing a queryset to implement the search,
    and a boolean indicating if the results may contain duplicates.

    Each whitespace-separated word of ``search_term`` must match at least
    one search field.  A field prefix selects the lookup: ``^`` istartswith,
    ``=`` iexact, ``@`` search, none icontains.
    """
    prefix_lookups = (('^', 'istartswith'), ('=', 'iexact'), ('@', 'search'))

    # Apply keyword searches.
    def construct_search(field_name):
        for prefix, lookup in prefix_lookups:
            if field_name.startswith(prefix):
                return "%s__%s" % (field_name[1:], lookup)
        return "%s__icontains" % field_name

    search_fields = self.get_search_fields(request)
    if not (search_fields and search_term):
        return queryset, False

    orm_lookups = [construct_search(str(name)) for name in search_fields]
    for word in search_term.split():
        per_field = [models.Q(**{lookup: word}) for lookup in orm_lookups]
        queryset = queryset.filter(reduce(operator.or_, per_field))

    use_distinct = any(
        lookup_needs_distinct(self.opts, lookup) for lookup in orm_lookups
    )
    return queryset, use_distinct
def test(cls, nodelist):
    """OR together the interpretation of every odd-indexed node of ``nodelist``.

    All operands are interpreted before any result is combined.
    """
    # MUST NOT short-circuit evaluation, or invalid syntax can be skipped!
    outcomes = []
    for position in range(1, len(nodelist), 2):
        outcomes.append(cls.interpret(nodelist[position]))
    return functools.reduce(operator.or_, outcomes)
def get_search_results(self, request, queryset, search_term):
    """
    Returns a tuple containing a queryset to implement the search,
    and a boolean indicating if the results may contain duplicates.

    Each whitespace-separated word of ``search_term`` must match at least
    one search field.  A field prefix selects the lookup: ``^`` istartswith,
    ``=`` iexact, ``@`` search, none icontains.
    """
    prefix_lookups = (('^', 'istartswith'), ('=', 'iexact'), ('@', 'search'))

    # Apply keyword searches.
    def construct_search(field_name):
        for prefix, lookup in prefix_lookups:
            if field_name.startswith(prefix):
                return "%s__%s" % (field_name[1:], lookup)
        return "%s__icontains" % field_name

    search_fields = self.get_search_fields(request)
    if not (search_fields and search_term):
        return queryset, False

    orm_lookups = [construct_search(str(name)) for name in search_fields]
    for word in search_term.split():
        per_field = [models.Q(**{lookup: word}) for lookup in orm_lookups]
        queryset = queryset.filter(reduce(operator.or_, per_field))

    use_distinct = any(
        lookup_needs_distinct(self.opts, lookup) for lookup in orm_lookups
    )
    return queryset, use_distinct
def orwhere(self, *expressions):
    """Add ``expressions`` to the stored where-clause using ``operator.or_``.

    The merged clause returned by ``_add_query_clauses`` replaces
    ``self._where``; nothing is returned.
    """
    merged = self._add_query_clauses(self._where, expressions, operator.or_)
    self._where = merged
def test(cls, nodelist):
    """OR together the interpretation of every odd-indexed node of ``nodelist``.

    All operands are interpreted before any result is combined.
    """
    # MUST NOT short-circuit evaluation, or invalid syntax can be skipped!
    operand_positions = range(1, len(nodelist), 2)
    outcomes = [cls.interpret(nodelist[position]) for position in operand_positions]
    return functools.reduce(operator.or_, outcomes)
def union(dfa1, dfa2):
    """Combine two automata through ``product`` using ``operator.or_``.

    NOTE(review): presumably ``product`` builds the product automaton and
    uses the operator to merge the acceptance of both sides -- confirm
    against ``product``.
    """
    accept_either = operator.or_
    return product(dfa1, accept_either, dfa2)
def get_query(self, request, term):
    """Return the objects matching ``term``.

    An empty term returns the base queryset.  A single-word term is matched
    against ``self.search_fields`` (OR-ed); when no search fields are set the
    queryset is left unfiltered.  A multi-word term is lower-cased and
    matched in Python against ``get_full_name()`` of every object, so in that
    case a plain list is returned instead of a queryset.
    """
    qs = self.get_queryset()
    if not term:
        return qs

    if len(term.split()) == 1:
        if self.search_fields:
            search_filters = [Q(**{field: term}) for field in self.search_fields]
            qs = qs.filter(reduce(operator.or_, search_filters))
        return qs

    # Accounts for 'John Doe' term; will compare against get_full_name
    needle = term.lower()
    return [x for x in qs if needle in x.get_full_name().lower()]
def get_search_filter(self, keyword):
    """Build the OR of ``<field>__icontains=keyword`` over the search fields.

    Uses ``self.search_fields`` when that attribute exists and is not
    ``None``; otherwise every entry of ``self.fields``.
    """
    search_fields = getattr(self, 'search_fields', None)
    if search_fields is None:
        search_fields = self.fields  # Assume all fields

    lookup_names = ['%s__icontains' % name for name in search_fields]
    keywords = (keyword,) * len(search_fields)
    lookups = [Q(pair) for pair in zip(lookup_names, keywords)]
    return reduce(operator.or_, lookups)
def _zero_mantissa(dval):
    """Determine whether the mantissa bits of the given double are all
    zero.

    Checks the low four bits of byte 1 and every byte from index 2 onwards
    of ``_double_as_bytes(dval)``.
    NOTE(review): assumes those bytes are ordered most-significant first --
    confirm against ``_double_as_bytes``.
    """
    raw = _double_as_bytes(dval)
    high_nibble = raw[1] & 0x0f
    remaining = reduce(operator.or_, raw[2:])
    return (high_nibble | remaining) == 0
##
## Functions to test for IEEE 754 special values
##
def get_search_results(self, request, queryset, search_term):
    """
    Returns a tuple containing a queryset to implement the search,
    and a boolean indicating if the results may contain duplicates.

    Each whitespace-separated word of ``search_term`` must match at least
    one search field.  A field prefix selects the lookup: ``^`` istartswith,
    ``=`` iexact, ``@`` search, none icontains.
    """
    prefix_lookups = (('^', 'istartswith'), ('=', 'iexact'), ('@', 'search'))

    # Apply keyword searches.
    def construct_search(field_name):
        for prefix, lookup in prefix_lookups:
            if field_name.startswith(prefix):
                return "%s__%s" % (field_name[1:], lookup)
        return "%s__icontains" % field_name

    search_fields = self.get_search_fields(request)
    if not (search_fields and search_term):
        return queryset, False

    orm_lookups = [construct_search(str(name)) for name in search_fields]
    for word in search_term.split():
        per_field = [models.Q(**{lookup: word}) for lookup in orm_lookups]
        queryset = queryset.filter(reduce(operator.or_, per_field))

    use_distinct = any(
        lookup_needs_distinct(self.opts, lookup) for lookup in orm_lookups
    )
    return queryset, use_distinct
def test_or_scalarzero(self):
    """Run the array/zero-scalar operator check for ``operator.or_``."""
    bitwise_or = operator.or_
    self.check_array_scalarzero_op(bitwise_or)
def test_ror_scalarzero(self):
    """Run the array/zero-scalar operator check for ``operator.or_`` with ``swap=True``.

    NOTE(review): ``swap=True`` presumably exercises the reflected
    (scalar on the left) form -- confirm against ``check_array_scalarzero_op``.
    """
    bitwise_or = operator.or_
    self.check_array_scalarzero_op(bitwise_or, swap=True)
def test_doubly_broadcasted_or(self):
    """Run the doubly-broadcasted array operator check for ``operator.or_``."""
    bitwise_or = operator.or_
    self.check_array_doubly_broadcasted_op(bitwise_or)
def get_user_filters(user):
    """
        Returns `django.db.models.query.QuerySet` filters based on allowed category for given user

        The returned list holds an empty ``Q()`` followed by one more entry:
        the OR of the per-permission constraints, or ``Q(category=None)``
        when no permission yields a constraint.  The constraint depends on
        the permission's profile name: 'Expert' (category only), 'Advanced'
        (category, not confidential) or 'Beginner' (additionally limited by
        priority and excluding escalated and moderated items).  Other
        profiles add nothing.
    """
    per_permission = []
    for perm in AbusePermission.objects.filter(user=user.id):
        profile = perm.profile.name
        if profile == 'Expert':
            constraints = {'category': perm.category}
        elif profile == 'Advanced':
            constraints = {'category': perm.category, 'confidential': False}
        elif profile == 'Beginner':
            constraints = {
                'priority__in': USER_FILTERS_BEGINNER_PRIORITY,
                'category': perm.category,
                'confidential': False,
                'escalated': False,
                'moderation': False,
            }
        else:
            continue
        per_permission.append(Q(**constraints))

    where = [Q()]
    if per_permission:
        where.append(reduce(operator.or_, per_permission))
    else:
        # If no category allowed
        where.append(Q(category=None))
    return where