Python itertools module - ifilter() example source code
The following code examples, extracted from open-source Python projects, illustrate how itertools.ifilter() is used in practice.
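itertools.ifilter(predicate, iterable) lazily yields the items of iterable for which predicate(item) is truthy, and it exists only in Python 2; in Python 3 it was removed because the built-in filter() already returns a lazy iterator. As a quick orientation before the project examples, here is a minimal, self-contained sketch (the values are made up for illustration):

from itertools import ifilter

# Lazily keep only the even numbers; nothing is evaluated until iteration.
evens = ifilter(lambda n: n % 2 == 0, range(10))
print(list(evens))            # [0, 2, 4, 6, 8]

# A common Python 2/3 compatibility shim seen in portable code:
try:
    from itertools import ifilter   # Python 2
except ImportError:
    ifilter = filter                # Python 3: filter() is already lazy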
def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
    """
    Return a new DStream in which each RDD contains the count of distinct elements in
    RDDs in a sliding window over this DStream.
    @param windowDuration: width of the window; must be a multiple of this DStream's
                           batching interval
    @param slideDuration:  sliding interval of the window (i.e., the interval after which
                           the new DStream will generate RDDs); must be a multiple of this
                           DStream's batching interval
    @param numPartitions:  number of partitions of each RDD in the new DStream.
    """
    keyed = self.map(lambda x: (x, 1))
    counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
                                         windowDuration, slideDuration, numPartitions)
    return counted.filter(lambda kv: kv[1] > 0)
def lookup(self, key):
    """
    Return the list of values in the RDD for key `key`. This operation
    is done efficiently if the RDD has a known partitioner by only
    searching the partition that the key maps to.
    >>> l = range(1000)
    >>> rdd = sc.parallelize(zip(l, l), 10)
    >>> rdd.lookup(42)  # slow
    [42]
    >>> sorted = rdd.sortByKey()
    >>> sorted.lookup(42)  # fast
    [42]
    >>> sorted.lookup(1024)
    []
    >>> rdd2 = sc.parallelize([(('a', 'b'), 'c')]).groupByKey()
    >>> list(rdd2.lookup(('a', 'b'))[0])
    ['c']
    """
    values = self.filter(lambda kv: kv[0] == key).values()
    if self.partitioner is not None:
        return self.ctx.runJob(values, lambda x: x, [self.partitioner(key)])
    return values.collect()
def load_into_db(self, table):
    def callback(response):
        """ Loads the CSV data contained in the response body and inserts it into the appropriate table
        """
        data = itertools.ifilter(lambda x: len(x) == 2,
                                 (row.split(',', 1) for row in response.body_as_unicode().split('\r\n')[1:]))
        cur = self.db_connection.cursor()
        try:
            cur.executemany('INSERT INTO %s(pk, data) VALUES (?, ?)' % table, data)
        except sqlite3.Error:
            self.db_connection.rollback()
            raise
        else:
            self.db_connection.commit()
        finally:
            cur.close()
        for each in self.finalize_data(table):
            yield each
    return callback
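The callback above combines a generator expression with ifilter() to keep only well-formed two-field CSV rows before handing them to executemany(). A standalone sketch of just that filtering step, using made-up sample data instead of a Scrapy response:

import itertools

body = u'pk,data\r\n1,foo\r\n2,bar\r\nmalformed-row\r\n'
rows = (row.split(',', 1) for row in body.split('\r\n')[1:])
data = itertools.ifilter(lambda x: len(x) == 2, rows)
print(list(data))   # [[u'1', u'foo'], [u'2', u'bar']]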
def everything(use_cache=False):
    '''Return all the tags within the database as (globals, contents, frames).'''
    if use_cache:
        g, f = cached()
    else:
        print >>output, '--> Grabbing globals...'
        g = {ea : d for ea, d in globals()}
        print >>output, '--> Grabbing contents from all functions...'
        res = (function(ea) for ea in db.functions())
        f = {}
        map(f.update, itertools.imap(dict, itertools.ifilter(None, res)))
    print >>output, '--> Grabbing frames from all functions...'
    h = {ea : d for ea, d in frames()}
    return (g, f, h)
def __and__(self, other):
    ''' Intersection is the minimum of corresponding counts.
    >>> Counter('abbb') & Counter('bcc')
    Counter({'b': 1})
    '''
    if not isinstance(other, Counter):
        return NotImplemented
    _min = min
    result = Counter()
    if len(self) < len(other):
        self, other = other, self
    for elem in ifilter(self.__contains__, other):
        newcount = _min(self[elem], other[elem])
        if newcount > 0:
            result[elem] = newcount
    return result
def phase1(self):  # Compute common names
    a = dict(izip(imap(os.path.normcase, self.left_list), self.left_list))
    b = dict(izip(imap(os.path.normcase, self.right_list), self.right_list))
    self.common = map(a.__getitem__, ifilter(b.__contains__, a))
    self.left_only = map(a.__getitem__, ifilterfalse(b.__contains__, a))
    self.right_only = map(b.__getitem__, ifilterfalse(a.__contains__, b))
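phase1() (from Python 2's filecmp.dircmp) normalises file names with os.path.normcase and then uses ifilter/ifilterfalse with dict.__contains__ to split them into common, left-only, and right-only names. A small sketch of the same idea on two hypothetical directory listings (sorted() is used only to make the output deterministic):

import os
from itertools import ifilter, ifilterfalse, imap, izip

left_list = ['README.txt', 'setup.py', 'docs']
right_list = ['README.txt', 'tests']
a = dict(izip(imap(os.path.normcase, left_list), left_list))
b = dict(izip(imap(os.path.normcase, right_list), right_list))
print(sorted(map(a.__getitem__, ifilter(b.__contains__, a))))        # common:     ['README.txt']
print(sorted(map(a.__getitem__, ifilterfalse(b.__contains__, a))))   # left only:  ['docs', 'setup.py']
print(sorted(map(b.__getitem__, ifilterfalse(a.__contains__, b))))   # right only: ['tests']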
def glob(self, prefix, pattern):
    '''
    Given a path prefix and a pattern, iterate over matching paths.
    e.g.
    paths = list(s3.glob(
        prefix='s3://bodylabs-ants-go-marching/output/feet_on_floor/eff2a0e/',
        pattern='*_alignment.ply'
    ))
    '''
    import fnmatch
    import functools
    import itertools
    predicate = functools.partial(fnmatch.fnmatch, pat=prefix + pattern)
    listing = self.ls(prefix, return_full_urls=True)
    return itertools.ifilter(predicate, listing)
def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
    """
    Return a new DStream in which each RDD contains the count of distinct elements in
    RDDs in a sliding window over this DStream.
    @param windowDuration: width of the window; must be a multiple of this DStream's
                           batching interval
    @param slideDuration:  sliding interval of the window (i.e., the interval after which
                           the new DStream will generate RDDs); must be a multiple of this
                           DStream's batching interval
    @param numPartitions:  number of partitions of each RDD in the new DStream.
    """
    keyed = self.map(lambda x: (x, 1))
    counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
                                         windowDuration, slideDuration, numPartitions)
    return counted.filter(lambda kv: kv[1] > 0).count()
def updateStateByKey(self, updateFunc, numPartitions=None):
    """
    Return a new "state" DStream where the state for each key is updated by applying
    the given function on the previous state of the key and the new values of the key.
    @param updateFunc: State update function. If this function returns None, then
                       corresponding state key-value pair will be eliminated.
    """
    if numPartitions is None:
        numPartitions = self._sc.defaultParallelism
    def reduceFunc(t, a, b):
        if a is None:
            g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None))
        else:
            g = a.cogroup(b.partitionBy(numPartitions), numPartitions)
            g = g.mapValues(lambda ab: (list(ab[1]), list(ab[0])[0] if len(ab[0]) else None))
        state = g.mapValues(lambda vs_s: updateFunc(vs_s[0], vs_s[1]))
        return state.filter(lambda k_v: k_v[1] is not None)
    jreduceFunc = TransformFunction(self._sc, reduceFunc,
                                    self._sc.serializer, self._jrdd_deserializer)
    dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc)
    return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
def testSVM(clf, dataSet):
    labels, instances = dataSet.getLabelsAndInstances()
    predictionsLoL = map(clf.predict, instances)
    numWrong = 0
    numPred = 0
    for i in range(len(labels)):
        l = labels[i]
        predList = predictionsLoL[i]
        # materialize the ifilter iterator so that len() can be taken below
        wrongList = list(it.ifilter(lambda x: x != l, predList))
        numWrong += len(wrongList)
        numPred += len(predList)
    print "Wrong: ", numWrong
    print "Predicted: ", numPred
    print "Percent: ", float(numWrong)/numPred
def visit_ClassDef(self, node):
    obj = {
        'type': 'class',
        'name': node.name,
        'docstring': self.get_docstring(node),
        'bases': list(ifilter(lambda k: k.get('name') != 'object', [
            {'name': i.id} if isinstance(i, ast.Name) else self.visit(i) for i in node.bases
        ])),
        'attributes': [],
        'functions': [],
    }
    for node in imap(self.visit, node.body):
        if node['type'] == 'function':
            obj['functions'].append(node)
        elif node['type'] == 'assign':
            obj['attributes'].append(node)
    return obj
def __load_classes( self ):
    classes = {}  # unique symbol id : class decl
    is_udt = lambda smbl: smbl.symTag == msdia.SymTagUDT
    self.logger.info( 'building udt objects' )
    for udt_smbl in itertools.ifilter( is_udt, self.symbols.itervalues() ):
        classes[udt_smbl.symIndexId] = self.__create_class(udt_smbl)
    self.logger.info( 'building udt objects(%d) - done', len(classes) )
    self.logger.info( 'integrating udt objects with namespaces' )
    does_parent_exists = self.parent_exists_t( self.global_ns, classes, self.__id2decl )
    while classes:
        to_be_integrated = len( classes )
        self.logger.info( 'there are %d classes to go', len( classes ) )
        to_be_deleted = filter( does_parent_exists, classes.itervalues() )
        map( self.__update_decls_tree, to_be_deleted )
        map( lambda decl: classes.pop( decl.dia_symbols[0].symIndexId )
             , to_be_deleted )
        if not ( to_be_integrated - len( classes ) ):
            for cls in classes.itervalues():
                self.logger.warning( 'unable to integrate class "%s" into declarations tree', cls.dia_symbols[0].uname )
            break
    self.logger.info( 'integrating udt objects with namespaces - done' )
def __load_base_classes( self ):
    make_hi = declarations.hierarchy_info_t
    is_base_class = lambda smbl: smbl.symTag == msdia.SymTagBaseClass \
                                 and False == smbl.indirectVirtualBaseClass
    self.logger.info( 'building class hierarchies' )
    for count, smbl in enumerate( itertools.ifilter( is_base_class, self.symbols.itervalues() ) ):
        base_id = smbl.type.symIndexId
        derived_id = smbl.classParentId
        hi_base = make_hi( self.__id2decl[base_id]
                         , self.__guess_access_type( smbl )
                         , bool( smbl.virtualBaseClass ) )
        self.__id2decl[ derived_id ].bases.append( hi_base )
        hi_derived = make_hi( self.__id2decl[derived_id]
                            , self.__guess_access_type( smbl )
                            , bool( smbl.virtualBaseClass ) )
        self.__id2decl[ base_id ].derived.append( hi_derived )
    self.logger.info( 'building class hierarchies(%d) - done', count )
def intersection(self, other):
    """Return the intersection of two sets as a new set.
    (I.e. all elements that are in both sets.)
    """
    if not isinstance(other, BaseSet):
        other = Set(other)
    if len(self) <= len(other):
        little, big = self, other
    else:
        little, big = other, self
    common = ifilter(big._data.__contains__, little)
    return self.__class__(common)
def difference_update(self, other):
    """Remove all elements of another set from this set."""
    data = self._data
    if not isinstance(other, BaseSet):
        other = Set(other)
    if self is other:
        self.clear()
    for elt in ifilter(data.__contains__, other):
        del data[elt]

# Python dict-like mass mutations: update, clear
def iteritems(self):
    definitions = type(self).configuration_setting_definitions
    version = self.command.protocol_version
    return ifilter(
        lambda (name, value): value is not None, imap(
            lambda setting: (setting.name, setting.__get__(self)), ifilter(
                lambda setting: setting.is_supported_by_protocol(version), definitions)))
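The splunklib snippets above and below rely on Python 2's tuple parameter unpacking in lambdas (lambda (name, value): ...), which was removed in Python 3 by PEP 3113. A sketch of the same filter written without tuple parameters, on an illustrative pairs list (the lambda form also works on Python 3, where the built-in filter() replaces ifilter()):

from itertools import ifilter

pairs = [('count', 10), ('limit', None), ('type', 'streaming')]
# Python 2 only: ifilter(lambda (name, value): value is not None, pairs)
# Portable form: index into the pair instead of unpacking it in the signature.
present = ifilter(lambda item: item[1] is not None, pairs)
print(list(present))   # [('count', 10), ('type', 'streaming')]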
def iteritems(self):
    iteritems = SearchCommand.ConfigurationSettings.iteritems(self)
    version = self.command.protocol_version
    if version == 1:
        if self.required_fields is None:
            iteritems = ifilter(lambda (name, value): name != 'clear_required_fields', iteritems)
    else:
        iteritems = ifilter(lambda (name, value): name != 'distributed', iteritems)
        if self.distributed:
            iteritems = imap(
                lambda (name, value): (name, 'stateful') if name == 'type' else (name, value), iteritems)
    return iteritems

# endregion
def iteritems(self):
    iteritems = SearchCommand.ConfigurationSettings.iteritems(self)
    version = self.command.protocol_version
    if version == 2:
        iteritems = ifilter(lambda (name, value): name != 'distributed', iteritems)
        if self.distributed and self.type == 'streaming':
            iteritems = imap(
                lambda (name, value): (name, 'stateful') if name == 'type' else (name, value), iteritems)
    return iteritems
def _compile_object(self, schema):
    """Validate an object.
    Has the same behavior as dictionary validator but work with object
    attributes.
    For example:
        >>> class Structure(object):
        ...     def __init__(self, one=None, three=None):
        ...         self.one = one
        ...         self.three = three
        ...
        >>> validate = Schema(Object({'one': 'two', 'three': 'four'}, cls=Structure))
        >>> with raises(MultipleInvalid, "not a valid value for object value @ data['one']"):
        ...     validate(Structure(one='three'))
    """
    base_validate = self._compile_mapping(schema,
                                          invalid_msg='for object value')

    def validate_object(path, data):
        if schema.cls is not UNDEFINED and not isinstance(data, schema.cls):
            raise Invalid('expected a {0!r}'.format(schema.cls), path)
        iterable = _iterate_object(data)
        iterable = ifilter(lambda item: item[1] is not None, iterable)
        out = base_validate(path, iterable, {})
        return type(data)(**out)

    return validate_object
def get_origins(self, name):
    return ', '.join(set(itertools.ifilter(None,
        (l.origin_name for l in self.lines))))
def get_origins(self, name):
    return ', '.join(set(itertools.ifilter(None,
        (m.origin_name for m in self.moves))))
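These get_origins() implementations pass None as the predicate: ifilter(None, iterable) simply drops falsy items, here empty or missing origin names, before deduplicating with set() and joining. A tiny sketch with made-up origin names:

import itertools

origin_names = ['SO123', '', None, 'SO123', 'INV45']
print(', '.join(set(itertools.ifilter(None, origin_names))))   # e.g. 'SO123, INV45' (set order is arbitrary)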
def formatException(self, ei, strip_newlines=True):
    lines = traceback.format_exception(*ei)
    if strip_newlines:
        lines = [itertools.ifilter(
            lambda x: x,
            line.rstrip().splitlines()) for line in lines]
        lines = list(itertools.chain(*lines))
    return lines
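formatException() uses ifilter(lambda x: x, ...) to discard the empty strings that splitlines() can produce, flattening a formatted traceback into non-blank lines. A standalone sketch of that cleanup on a small made-up traceback list:

import itertools

formatted = ['Traceback (most recent call last):\n',
             '  File "demo.py", line 3, in <module>\n\n    main()\n',   # contains an embedded blank line
             'ValueError: boom\n']
lines = [itertools.ifilter(lambda x: x, line.rstrip().splitlines()) for line in formatted]
print(list(itertools.chain(*lines)))
# ['Traceback (most recent call last):', '  File "demo.py", line 3, in <module>', '    main()', 'ValueError: boom']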
def build_optimizer(opt, model, infos):
    opt.pre_ft = getattr(opt, 'pre_ft', 1)
    # model_parameters = itertools.ifilter(lambda p: p.requires_grad, model.parameters())
    optimize = opt.optim
    if optimize == 'adam':
        optimizer = torch.optim.Adam(model.parameters(), lr=opt.learning_rate, weight_decay=0.0005)
    elif optimize == 'sgd':
        optimizer = torch.optim.SGD(model.parameters(), lr=opt.learning_rate, momentum=0.999, weight_decay=0.0005)
    elif optimize == 'Adadelta':
        optimizer = torch.optim.Adadelta(model.parameters(), lr=opt.learning_rate, weight_decay=0.0005)
    elif optimize == 'Adagrad':
        optimizer = torch.optim.Adagrad(model.parameters(), lr=opt.learning_rate, weight_decay=0.0005)
    elif optimize == 'Adamax':
        optimizer = torch.optim.Adamax(model.parameters(), lr=opt.learning_rate, weight_decay=0.0005)
    elif optimize == 'ASGD':
        optimizer = torch.optim.ASGD(model.parameters(), lr=opt.learning_rate, weight_decay=0.0005)
    elif optimize == 'LBFGS':
        # torch.optim.LBFGS takes no weight_decay argument
        optimizer = torch.optim.LBFGS(model.parameters(), lr=opt.learning_rate)
    elif optimize == 'RMSprop':
        optimizer = torch.optim.RMSprop(model.parameters(), lr=opt.learning_rate, weight_decay=0.0005)
    infos['optimized'] = True
    # Load the optimizer
    if len(opt.start_from) != 0:
        if os.path.isfile(os.path.join(opt.start_from, opt.model_id + '.optimizer.pth')):
            optimizer.load_state_dict(torch.load(os.path.join(opt.start_from, opt.model_id + '.optimizer.pth')))
    return optimizer, infos
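The commented-out line at the top of build_optimizer() hints at a common use of ifilter with PyTorch: passing only trainable parameters (those with requires_grad set) to the optimizer, for instance when part of the model is frozen. A hedged sketch of that pattern, assuming a small model whose first layer has been frozen; the model and hyperparameters here are illustrative only:

import itertools
import torch

model = torch.nn.Sequential(torch.nn.Linear(4, 8), torch.nn.Linear(8, 2))
for p in model[0].parameters():     # freeze the first layer
    p.requires_grad = False

# Hand only the still-trainable parameters to the optimizer.
trainable = itertools.ifilter(lambda p: p.requires_grad, model.parameters())
optimizer = torch.optim.Adam(trainable, lr=1e-3, weight_decay=0.0005)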
def filter(self, f):
    """
    Return a new DStream containing only the elements that satisfy predicate.
    """
    def func(iterator):
        return filter(f, iterator)
    return self.mapPartitions(func, True)
def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None):
    """
    Return a new "state" DStream where the state for each key is updated by applying
    the given function on the previous state of the key and the new values of the key.
    @param updateFunc: State update function. If this function returns None, then
                       corresponding state key-value pair will be eliminated.
    """
    if numPartitions is None:
        numPartitions = self._sc.defaultParallelism
    if initialRDD and not isinstance(initialRDD, RDD):
        initialRDD = self._sc.parallelize(initialRDD)
    def reduceFunc(t, a, b):
        if a is None:
            g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None))
        else:
            g = a.cogroup(b.partitionBy(numPartitions), numPartitions)
            g = g.mapValues(lambda ab: (list(ab[1]), list(ab[0])[0] if len(ab[0]) else None))
        state = g.mapValues(lambda vs_s: updateFunc(vs_s[0], vs_s[1]))
        return state.filter(lambda k_v: k_v[1] is not None)
    jreduceFunc = TransformFunction(self._sc, reduceFunc,
                                    self._sc.serializer, self._jrdd_deserializer)
    if initialRDD:
        initialRDD = initialRDD._reserialize(self._jrdd_deserializer)
        dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc,
                                                   initialRDD._jrdd)
    else:
        dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc)
    return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
def filter(self, f):
    """
    Return a new RDD containing only the elements that satisfy a predicate.
    >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
    >>> rdd.filter(lambda x: x % 2 == 0).collect()
    [2, 4]
    """
    def func(iterator):
        return filter(f, iterator)
    return self.mapPartitions(func, True)
def subtractByKey(self, other, numPartitions=None):
    """
    Return each (key, value) pair in C{self} that has no pair with matching
    key in C{other}.
    >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
    >>> y = sc.parallelize([("a", 3), ("c", None)])
    >>> sorted(x.subtractByKey(y).collect())
    [('b', 4), ('b', 5)]
    """
    def filter_func(pair):
        key, (val1, val2) = pair
        return val1 and not val2
    return self.cogroup(other, numPartitions).filter(filter_func).flatMapValues(lambda x: x[0])