Python operator 模块,__and__() 实例源码
我们从Python开源项目中,提取了以下32个代码示例,用于说明如何使用operator.__and__()。
def merge_conditions_array(conds):
    """Collapse redundant genotype conditions to at most one per sample.

    If there are multiple affected samples sharing the same parents, the
    same sample can appear in several conditions.  Conditions are grouped by
    sample index and the genotype bits of each group are combined with a
    bitwise AND, so only the bits common to all of them are kept.

    No explicit compatibility check is performed: conditions with no bit in
    common simply merge to ``0``.

    The input sequence is not modified; grouping works on a sorted copy.

    :param conds: an array of couples [sample_index, genotype_bit]
    :return: a list of ``(sample_index, common_bits)`` tuples ordered by
        sample index; an empty list when ``conds`` is empty
    """
    if not conds:
        return []
    # groupby() only merges adjacent items, so order by sample index first.
    # sorted() (stable, like list.sort) avoids reordering the caller's list.
    by_sample = sorted(conds, key=itemgetter(0))
    return [
        (idx, reduce(__and__, (cond[1] for cond in group)))
        for idx, group in itertools.groupby(by_sample, key=itemgetter(0))
    ]
def finish(self, event):
    """Record the final event of one minion and refresh the overall state.

    The entry of ``self.targets`` keyed by ``event.minion`` is marked as
    finished, its success value is set to
    ``event.success and event.retcode == 0`` and the event itself is stored.
    Once every target is finished, ``self.success`` becomes the AND of all
    per-target success values and ``self.finished`` is set to True.

    :param event: object exposing ``minion``, ``success`` and ``retcode``
    """
    entry = self.targets[event.minion]
    entry['finished'] = True
    entry['success'] = event.success and event.retcode == 0
    entry['event'] = event

    finished_flags = [target['finished'] for target in self.targets.values()]
    if not reduce(operator.__and__, finished_flags):
        # At least one target is still pending.
        return
    success_flags = [target['success'] for target in self.targets.values()]
    self.success = reduce(operator.__and__, success_flags)
    # NOTE(review): the source's indentation was lost; this assignment is
    # assumed to belong to the "all targets finished" branch - confirm.
    self.finished = True
def intersection_update(self, other):
    """Restrict ``self`` to the members that are also in ``other``.

    For a ``BitSet`` operand the work is delegated to ``self._logic`` with
    ``operator.__and__`` and its result is returned.  For any other
    container, each member of ``self`` missing from ``other`` is discarded
    and nothing is returned.
    """
    if not isinstance(other, BitSet):
        # NOTE(review): members are discarded while iterating self -
        # confirm that BitSet iteration tolerates this.
        drop = self.discard
        for member in self:
            if member not in other:
                drop(member)
        return None
    return self._logic(self, operator.__and__, other)
def intersection(self, other):
    """Return the intersection of ``self`` with ``other``.

    For a ``BitSet`` operand the result comes from ``self._logic`` applied
    to a copy of ``self`` with ``operator.__and__``.  For any other
    container a ``BitSet`` is built from the members of ``self`` that are
    also in ``other``.
    """
    if not isinstance(other, BitSet):
        shared = (n for n in self if n in other)
        return BitSet(source=shared)
    # Hand a copy to _logic rather than self itself.
    return self._logic(self.copy(), operator.__and__, other)
def extract(main_summary, new_profile, start_ds, period, slack, is_sampled):
    """
    Extract data from the main summary table taking into account the
    retention period and submission latency.

    The same row filter is applied to the main summary and to the cleaned
    new-profile data; the two filtered results are then unioned.

    :param main_summary: dataframe pointing to main_summary.v4
    :param new_profile: dataframe pointing to new_profile_ping_parquet
    :param start_ds: start date of the retention period
    :param period: length of the retention period
    :param slack: slack added to account for submission latency
    :param is_sampled: when truthy, also restrict rows to sample_id "57"
    :return: a dataframe containing the raw subset of data
    """
    start = arrow.get(start_ds, DS_NODASH)
    subsession_date = F.col("subsession_start_date")
    submission_date = F.col("submission_date_s3")

    predicates = [
        subsession_date >= utils.format_date(start, DS),
        subsession_date < utils.format_date(start, DS, period),
        submission_date >= utils.format_date(start, DS_NODASH),
        submission_date < utils.format_date(start, DS_NODASH, period + slack),
    ]
    if is_sampled:
        predicates.append(F.col("sample_id") == "57")

    # One combined condition, shared by both sources.
    row_filter = reduce(operator.__and__, predicates)

    extract_ms = main_summary.where(row_filter).select(SOURCE_COLUMNS)

    cleaned_profile = clean_new_profile(new_profile)
    extract_np = cleaned_profile.where(row_filter).select(SOURCE_COLUMNS)

    coalesced_ms = coalesce_new_profile_attribution(extract_ms, cleaned_profile)
    return coalesced_ms.union(extract_np)
def valid_pcd(pcd, client_date):
    """Determine if the profile creation date column is valid given the date
    reported by the client.

    Returns a column expression that yields ``pcd`` when all checks hold
    (formatted date is not null, formatted date is after ``DEFAULT_DATE``,
    and ``pcd`` is not later than ``client_date``) and a null literal
    otherwise.
    """
    formatted = F.date_format(pcd, 'yyyy-MM-dd')
    checks = [
        formatted.isNotNull(),
        formatted > DEFAULT_DATE,
        pcd <= client_date,
    ]
    all_checks_pass = reduce(operator.__and__, checks)
    return F.when(all_checks_pass, pcd).otherwise(F.lit(None))
def __and__(self, rhs):
    """Implement ``self & rhs`` by passing both operands and
    ``operator.__and__`` to ``op_binary``."""
    bitwise_and = operator.__and__
    return op_binary(self, rhs, bitwise_and)
def __rand__(self, lhs):
    """Implement the reflected ``lhs & self``: ``op_binary`` receives
    ``lhs`` as the left operand and ``self`` as the right one."""
    bitwise_and = operator.__and__
    return op_binary(lhs, self, bitwise_and)
def __iand__(self, v):
    """Implement ``self &= v`` by delegating to ``self.op_binary_inplace``
    with ``operator.__and__`` and returning whatever it returns."""
    bitwise_and = operator.__and__
    updated = self.op_binary_inplace(v, bitwise_and)
    return updated
def __and__(self, other):
    """Implement ``self & other`` as ``self.intersection(other)``."""
    common = self.intersection(other)
    return common
def intersection_update(self, other):
    """Keep in ``self`` only the members that ``other`` contains as well.

    A ``BitSet`` operand is handled by ``self._logic`` with
    ``operator.__and__`` (its result is returned).  Any other container is
    handled member by member through ``self.discard``; that path returns
    None.
    """
    if isinstance(other, BitSet):
        bitwise_and = operator.__and__
        return self._logic(self, bitwise_and, other)
    remove = self.discard
    # NOTE(review): self is modified while it is being iterated - confirm
    # that BitSet iteration tolerates this.
    for candidate in self:
        if candidate in other:
            continue
        remove(candidate)
def intersection(self, other):
    """Compute the members common to ``self`` and ``other``.

    With a ``BitSet`` operand, ``self._logic`` is called on a copy of
    ``self`` with ``operator.__and__`` and its result is returned.
    Otherwise a ``BitSet`` is constructed from the members of ``self``
    found in ``other``.
    """
    if isinstance(other, BitSet):
        duplicate = self.copy()
        return self._logic(duplicate, operator.__and__, other)
    common_members = (n for n in self if n in other)
    return BitSet(source=common_members)
def __and__(self, other):
    """Support the ``&`` operator by returning ``self.intersection(other)``."""
    result = self.intersection(other)
    return result
def intersection_update(self, other):
    """Drop from ``self`` every member that is not in ``other``.

    If ``other`` is a ``BitSet``, the result of
    ``self._logic(self, operator.__and__, other)`` is returned.  Otherwise
    the members are checked one at a time and the method returns None.
    """
    if not isinstance(other, BitSet):
        discard_member = self.discard
        # NOTE(review): discarding while iterating self - confirm BitSet
        # iteration tolerates this.
        for n in self:
            absent = n not in other
            if absent:
                discard_member(n)
        return None
    return self._logic(self, operator.__and__, other)
def intersection(self, other):
    """Return the intersection of ``self`` and ``other``.

    ``BitSet`` operands go through ``self._logic`` (called with a copy of
    ``self`` and ``operator.__and__``); any other container is filtered
    member by member into a ``BitSet``.
    """
    if not isinstance(other, BitSet):
        members_in_both = (n for n in self if n in other)
        return BitSet(source=members_in_both)
    working_copy = self.copy()
    return self._logic(working_copy, operator.__and__, other)
def __and__(self, other):
    """Return ``self.__wrapped__ & other``, evaluated while ``self._lock``
    is held."""
    with self._lock:
        result = self.__wrapped__ & other
    return result
def __rand__(self, other):
    """Return the reflected ``other & self.__wrapped__``, evaluated while
    ``self._lock`` is held."""
    with self._lock:
        result = other & self.__wrapped__
    return result
def __eq__(self, other):
    """
    Determine whether two objects are equal.

    Two objects are equal when they have the same type, the same length,
    and the elements at each position compare equal.

    :other: other object
    :returns: boolean
    """
    if not (type(self) == type(other) and len(self) == len(other)):
        return False
    # all() stops at the first mismatching pair instead of comparing every
    # pair and AND-ing the results afterwards.
    return all(mine == theirs for mine, theirs in zip(self, other))
def __and__(self, other):
    """Make ``self & other`` an alias for ``self.intersection(other)``."""
    intersected = self.intersection(other)
    return intersected
def intersection_update(self, other):
    """Reduce ``self`` to its intersection with ``other``.

    ``BitSet`` operands are delegated to ``self._logic`` together with
    ``operator.__and__`` and the result of that call is returned.  For
    other containers, members of ``self`` that ``other`` does not contain
    are discarded and None is returned.
    """
    other_is_bitset = isinstance(other, BitSet)
    if other_is_bitset:
        return self._logic(self, operator.__and__, other)
    drop = self.discard
    # NOTE(review): members are discarded during iteration over self -
    # confirm that BitSet iteration tolerates this.
    for item in self:
        if item not in other:
            drop(item)
    return None
def intersection(self, other):
    """Return what ``self`` and ``other`` have in common.

    When ``other`` is a ``BitSet``, returns the result of ``self._logic``
    called with a copy of ``self``, ``operator.__and__`` and ``other``.
    Otherwise returns a ``BitSet`` built from the members of ``self`` that
    are contained in ``other``.
    """
    other_is_bitset = isinstance(other, BitSet)
    if other_is_bitset:
        return self._logic(self.copy(), operator.__and__, other)
    kept = (n for n in self if n in other)
    return BitSet(source=kept)
def __and__(self, regex):
    """
    Create new optimized version of a & b.

    If the right operand is a RegexEmpty, self is returned unchanged.
    Otherwise self._and(regex) is tried; when it yields nothing usable,
    the result is a plain RegexAnd of both operands.

    >>> RegexEmpty() & RegexString('a')
    <RegexString 'a'>
    """
    if regex.__class__ == RegexEmpty:
        return self
    optimized = self._and(regex)
    # Fall back to the generic pairing when _and() gave a falsy result.
    return optimized if optimized else RegexAnd((self, regex))
def join(cls, regex):
    """
    Delegate to ``_join`` with ``operator.__and__`` as the combining
    function and ``regex`` as the items to combine.

    >>> RegexAnd.join( (RegexString('Big '), RegexString('fish')) )
    <RegexString 'Big fish'>
    """
    combine = operator.__and__
    return _join(combine, regex)
def merge(old, new):
    """Merge ``new`` into ``old`` field by field.

    ``prims``, ``arrs`` and ``tainted`` are each merged with
    ``ImmutableTreeList.merge``, using ``operator.__and__``,
    ``arrays.merge`` and ``operator.__or__`` respectively as the per-field
    merge function.  If all three merged values are the very objects
    already held by ``old``, ``old`` itself is returned; otherwise a new
    ``TypeInfo`` is built from the merged values.
    """
    merge_lists = ImmutableTreeList.merge
    prims = merge_lists(old.prims, new.prims, operator.__and__)
    arrs = merge_lists(old.arrs, new.arrs, arrays.merge)
    tainted = merge_lists(old.tainted, new.tainted, operator.__or__)

    changed = (
        prims is not old.prims
        or arrs is not old.arrs
        or tainted is not old.tainted
    )
    # Reuse the existing object when no field changed identity.
    return TypeInfo(prims, arrs, tainted) if changed else old
def all(items):
    """Return the ``&`` of every element of ``items``.

    The elements are folded left to right with ``operator.__and__``, so for
    booleans the result is True only when every element is True.  An empty
    iterable yields True, matching the builtin of the same name.

    :param items: iterable of values supporting the ``&`` operator
    :return: the combined value, or True when ``items`` is empty
    """
    iterator = iter(items)
    try:
        first = next(iterator)
    except StopIteration:
        # reduce() without an initial value raises TypeError on an empty
        # iterable; treat the empty case as vacuously true instead.
        return True
    # Seeding with the first element keeps non-empty results unchanged.
    return reduce(operator.__and__, iterator, first)
# --- test if interpreter supports yield keyword ---
def intersection_update(self, other):
    """Update ``self`` so that it only keeps members found in ``other``.

    With a ``BitSet`` operand, returns the result of ``self._logic`` called
    with ``self``, ``operator.__and__`` and ``other``.  With any other
    container, discards the members of ``self`` that are not in ``other``
    and returns None.
    """
    if isinstance(other, BitSet):
        return self._logic(self, operator.__and__, other)
    throw_away = self.discard
    # NOTE(review): self shrinks while being iterated - confirm that BitSet
    # iteration tolerates this.
    for element in self:
        if element in other:
            continue
        throw_away(element)
    return None
def intersection(self, other):
    """Return the members shared by ``self`` and ``other``.

    A ``BitSet`` operand is combined through ``self._logic`` using a copy
    of ``self`` and ``operator.__and__``.  Any other container is handled
    by building a ``BitSet`` from the members of ``self`` that it contains.
    """
    if isinstance(other, BitSet):
        bitwise_and = operator.__and__
        return self._logic(self.copy(), bitwise_and, other)
    overlap = (n for n in self if n in other)
    return BitSet(source=overlap)
def test_and(self):
    """Check the ``&`` operator on flag values.

    First runs the shared incompatible-types check for
    ``operator.__and__``, then asserts the result of ``&`` for
    ``no_flags``, ``f0``, ``f01`` and ``all_flags`` as left operand against
    each of the eight flag fixtures as right operand.
    """
    self._test_incompatible_types_fail(operator.__and__)

    # (left operand, right operand, expected value of left & right)
    cases = (
        (no_flags, no_flags, no_flags),
        (no_flags, all_flags, no_flags),
        (no_flags, f0, no_flags),
        (no_flags, f1, no_flags),
        (no_flags, f2, no_flags),
        (no_flags, f01, no_flags),
        (no_flags, f02, no_flags),
        (no_flags, f12, no_flags),

        (f0, no_flags, no_flags),
        (f0, all_flags, f0),
        (f0, f0, f0),
        (f0, f1, no_flags),
        (f0, f2, no_flags),
        (f0, f01, f0),
        (f0, f02, f0),
        (f0, f12, no_flags),

        (f01, no_flags, no_flags),
        (f01, all_flags, f01),
        (f01, f0, f0),
        (f01, f1, f1),
        (f01, f2, no_flags),
        (f01, f01, f01),
        (f01, f02, f0),
        (f01, f12, f1),

        (all_flags, no_flags, no_flags),
        (all_flags, all_flags, all_flags),
        (all_flags, f0, f0),
        (all_flags, f1, f1),
        (all_flags, f2, f2),
        (all_flags, f01, f01),
        (all_flags, f02, f02),
        (all_flags, f12, f12),
    )
    for lhs, rhs, expected in cases:
        self.assertEqual(lhs & rhs, expected)