Python itertools 模块,filterfalse() 实例源码
我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用itertools.filterfalse()。
def display_reviews(self, message):
def filter_predicate(x):
return self.__cache.IsInCache(self.__get_cachekey(x, message))
reviews = self.__crucible_regex.findall(message.body['text'])
reviews = filterfalse(filter_predicate, reviews)
if reviews:
attachments = []
for reviewid in filterfalse(filter_predicate, reviews):
self.__cache.AddToCache(self.__get_cachekey(reviewid, message))
try:
msg = self.__get_review_message(reviewid, message._client)
if msg is None:
msg = self.__get_reviewnotfound_message(reviewid)
attachments.append(msg)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
print('Invalid auth')
raise
if attachments:
message.send_webapi('', json.dumps(attachments))
def display_issues(self, message):
attachments = []
issues = self.__jira_regex.findall(message.body['text'])
def filter_predicate(x):
return self.__cache.IsInCache(self.__get_cachekey(x, message))
for issue in filterfalse(filter_predicate, issues):
self.__cache.AddToCache(self.__get_cachekey(issue, message))
issue_message = self.get_issue_message(issue)
if issue_message is None:
issue_message = self.__get_issuenotfound_message(issue)
attachments.append(issue_message)
if attachments:
message.send_webapi('', json.dumps(attachments))
def unique_everseen(iterable, key=None):
"List unique elements, preserving order. Remember all elements ever seen."
# unique_everseen('AAAABBBCCDAABBB') --> A B C D
# unique_everseen('ABBCcAD', str.lower) --> A B C D
seen = set()
seen_add = seen.add
if key is None:
for element in itertools.filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def unique_everseen(iterable, key=None):
"""List unique elements, preserving order. Remember all elements ever seen.
The snippet is taken form https://docs.python.org/3.6/library/itertools.html#itertools-recipes
>>> unique_everseen('AAAABBBCCDAABBB')
A B C D
>>> unique_everseen('ABBCcAD', str.lower)
A B C D
"""
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def main():
# ????????????
# ??accumulate()????????????,????islice()?????????len(items)-2?????None????????
for n in itertools.islice(itertools.accumulate(cost(item) for item in items[1:]), len(items) - 2, None):
print(n)
# ?????????????
print(n / (len(items) - 1))
# ?????????
owners = {}
for ky, grp in itertools.groupby(sorted(items[1:], key=owner), key=owner):
owners[ky] = len(list(grp))
for member in members[1:]:
print(member[1], " : ", owners[member[0]])
# ??????returned()???,???????????????.???filterfalse()??????,???????????????,????????????
print([items[int(loan[1])] for loan in itertools.filterfalse(returned, loans)])
def make_node_set(node_set, reverse=False):
ids = set()
def is_unique_id(node):
node_id = id(node)
if node_id in ids:
return False
else:
ids.add(node_id)
return True
if not isinstance(node_set, list):
node_set = [node_set]
non_node_member = next(filterfalse(is_any_node, node_set), False)
if non_node_member:
format_str = 'Constructed node set that includes {0} object "{1}"'
raise HqueryEvaluationError(format_str.format(object_type_name(non_node_member), non_node_member))
node_set = list(sorted(filter(is_unique_id, node_set), key=lambda n: n.hq_doc_index, reverse=reverse))
return node_set
def download_manga(manga_name, range_start=1, range_end=None, b_make_cbz=False, remove=False):
"""Download a range of a chapters"""
chapter_urls = get_chapter_urls(manga_name)
if range_end == None : range_end = max(chapter_urls.keys())
for chapter, url in filterfalse (lambda chapter_url:
chapter_url[0] < range_start
or chapter_url[0] > range_end,
chapter_urls.items()):
chapter_number = get_chapter_number(url)
print('===============================================')
print('Chapter ' + chapter_number)
print('===============================================')
image_urls = get_chapter_image_urls(url)
download_urls(image_urls, manga_name, chapter_number)
download_dir = './{0}/{1}'.format(manga_name, chapter_number)
if b_make_cbz is True:
make_cbz(download_dir)
if remove is True: shutil.rmtree(download_dir)
def members():
all_users = json.loads(database.get_all_users())
if not all_users['ok']:
flask.abort(500)
users = [User(username=user['username'], skills=user['skills'], points=user['points'], last_seen='Not Available')
for user in all_users['response']]
order = flask.request.args.get('order')
lang = flask.request.args.get('lang', '').lower()
if order == 'points':
users.sort(key=attrgetter('points'), reverse=True)
else:
users.sort(key=attrgetter('username'))
if lang:
t1, t2 = tee(users)
lang_yes = filter(lambda user: lang in user.skills.lower(), t1)
lang_no = filterfalse(lambda user: lang in user.skills.lower(), t2)
users = list(lang_yes) + list(lang_no)
return flask.render_template('members.html', members=users)
def uniq(iterable, key=None):
"List unique elements, preserving order. Remember all elements ever seen."
# unique_everseen('AAAABBBCCDAABBB') --> A B C D
# unique_everseen('ABBCcAD', str.lower) --> A B C D
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def partition(iterable, predicate):
"""Divide the iterable into two iterables according to the predicate.
>>> evens, odds = partition(range(10), lambda x: not x % 2)
>>> list(evens)
[0, 2, 4, 6, 8]
>>> list(odds)
[1, 3, 5, 7, 9]
"""
t1, t2 = tee(iterable)
return filter(predicate, t1), filterfalse(predicate, t2)
def unique(*iterables, key=None):
"""Yield unique elements, preserving order.
>>> ''.join(unique('AAAABBBCCDAABBB'))
'ABCD'
>>> ''.join(unique('AAAA', 'BBBC', 'CDA', 'ABBB'))
'ABCD'
>>> ''.join(unique('ABBCcAD', key=str.casefold))
'ABCD'
"""
combined = chain.from_iterable(iterables)
yielded = set()
# Avoid inner-loop name lookups
already_yielded = yielded.__contains__
remember = yielded.add
if key is None:
for element in filterfalse(already_yielded, combined):
remember(element)
yield element
else:
for element in combined:
k = key(element)
if not already_yielded(k):
remember(k)
yield element
def remove_duplicates(iterable, key=None):
"""
Renvoie un generateur sur un iterable qui enleve tous les elements en double dans une liste, conservant l'ordre."""
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def remove_duplicates(iterable, key=None):
"""
Renvoie un generateur sur un iterable qui enleve tous les elements en double dans une liste, conservant l'ordre."""
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def _parse_merged_entities(self):
"""set self._merged_entities to the longest possible(wrapping) tokens
"""
self._merged_entities = list(filterfalse(
lambda token: self._is_wrapped(token, self.entities),
self.entities))
def _parse_all_merged_entities(self):
"""set self._all_merged_entities to the longest possible(wrapping)
tokens including non-entity tokens
"""
self._all_merged_entities = list(filterfalse(
lambda token: self._is_wrapped(token, self.all_entities),
self.all_entities))
def phase1(self): # Compute common names
a = dict(zip(map(os.path.normcase, self.left_list), self.left_list))
b = dict(zip(map(os.path.normcase, self.right_list), self.right_list))
self.common = list(map(a.__getitem__, filter(b.__contains__, a)))
self.left_only = list(map(a.__getitem__, filterfalse(b.__contains__, a)))
self.right_only = list(map(b.__getitem__, filterfalse(a.__contains__, b)))
def _filter(flist, skip):
return list(filterfalse(skip.__contains__, flist))
# Demonstration and testing.
#
def test_itertools_filterfalse(self):
"""
Tests whether itertools.filterfalse is available.
"""
from itertools import filterfalse
not_div_by_3 = filterfalse(lambda x: x % 3 == 0, range(8))
self.assertEqual(list(not_div_by_3), [1, 2, 4, 5, 7])
def test_install_aliases(self):
"""
Does the install_aliases() interface monkey-patch urllib etc. successfully?
"""
from future.standard_library import remove_hooks, install_aliases
remove_hooks()
install_aliases()
from collections import Counter, OrderedDict # backported to Py2.6
from collections import UserDict, UserList, UserString
# Requires Python dbm support:
# import dbm
# import dbm.dumb
# import dbm.gnu
# import dbm.ndbm
from itertools import filterfalse, zip_longest
from subprocess import check_output # backported to Py2.6
from subprocess import getoutput, getstatusoutput
from sys import intern
# test_support may not be available (e.g. on Anaconda Py2.6):
# import test.support
import urllib.error
import urllib.parse
import urllib.request
import urllib.response
import urllib.robotparser
self.assertTrue('urlopen' in dir(urllib.request))
def riffle_shuffle(iterable, n=2):
"""Generator that performs a perfect riffle shuffle on the input, using a given number of subdecks."""
return itertools.filterfalse(non, itertools.chain.from_iterable(zip(*list(itertools.zip_longest(*[iter(iterable)]*n)))))
# Mappings
def unique_everseen(iterable, key=None):
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def unique_everseen(iterable, key=None):
"""
The generator to list unique elements, preserving the order. Remember all
elements ever seen. This was taken from the itertools recipes.
Args:
iterable: An iterable to process.
key: Optional function to run when checking elements (e.g., str.lower)
Returns:
Generator: Yields a generator object.
"""
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def unique_everseen(iterable, key=None):
"""
The generator to list unique elements, preserving the order. Remember all
elements ever seen. This was taken from the itertools recipes.
Args:
iterable: An iterable to process.
key: Optional function to run when checking elements (e.g., str.lower)
Returns:
Generator: Yields a generator object.
"""
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def __get_preview_window(self):
return next(filterfalse(lambda x:
not x.options['previewwindow'],
self.vim.windows), None)
def __get_preview_window(self):
return next(filterfalse(lambda x:
not x.options['previewwindow'],
self.vim.windows), None)
# Needed for openable actions
def parse_lines(msg_list):
msg_list = filterfalse(lambda x: '#' in x, msg_list)
new_list = []
for line in msg_list:
try:
new_list.append(parse_line(line))
except (ValueError, TypeError):
logger.debug('Error parsing {}'.format(line))
return convert_to_df(new_list)
def partition(predicate, iterable):
"""Use a predicate to partition true and false entries.
Reference
---------
Python itertools documentation.
"""
t1, t2 = tee(iterable)
return filterfalse(predicate, t1), filter(predicate, t2)
def phase1(self): # Compute common names
a = dict(zip(map(os.path.normcase, self.left_list), self.left_list))
b = dict(zip(map(os.path.normcase, self.right_list), self.right_list))
self.common = list(map(a.__getitem__, filter(b.__contains__, a)))
self.left_only = list(map(a.__getitem__, filterfalse(b.__contains__, a)))
self.right_only = list(map(b.__getitem__, filterfalse(a.__contains__, b)))
def _filter(flist, skip):
return list(filterfalse(skip.__contains__, flist))
# Demonstration and testing.
#
def send_game_start(self, session):
conn_in_room = itertools.filterfalse(
lambda x: x.room is None or x.spectate is True,
self.server.connections)
conns = sorted(conn_in_room, key=lambda x: x.room)
for room_id, room_conns in itertools.groupby(conns, key=lambda x: x.room):
if not room_id:
continue
self.check_song_start(session, room_id, room_conns)
def unique_everseen(iterable, key=None):
"""
The generator to list unique elements, preserving the order. Remember all
elements ever seen. This was taken from the itertools recipes.
Args:
iterable: An iterable to process.
key: Optional function to run when checking elements (e.g., str.lower)
Returns:
Generator: Yields a generator object.
"""
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def unique_everseen(iterable, key=None):
"""
The generator to list unique elements, preserving the order. Remember all
elements ever seen. This was taken from the itertools recipes.
Args:
iterable: An iterable to process.
key: Optional function to run when checking elements (e.g., str.lower)
Returns:
Generator: Yields a generator object.
"""
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def phase1(self): # Compute common names
a = dict(zip(map(os.path.normcase, self.left_list), self.left_list))
b = dict(zip(map(os.path.normcase, self.right_list), self.right_list))
self.common = list(map(a.__getitem__, filter(b.__contains__, a)))
self.left_only = list(map(a.__getitem__, filterfalse(b.__contains__, a)))
self.right_only = list(map(b.__getitem__, filterfalse(a.__contains__, b)))
def _filter(flist, skip):
return list(filterfalse(skip.__contains__, flist))
# Demonstration and testing.
#
def difference(a, b):
"""
Equivalent to A-B or A\B in set theory. Difference/Relative Complement
:param a: First list of dicts
:param b: Second list of dicts
:return: List of elements in a but not in b
"""
return list(filterfalse(lambda x: x in b, a))
def partition(pred, iterable):
"""Use a predicate to partition entries into false entries and true entries."""
# https://stackoverflow.com/questions/8793772/how-to-split-a-sequence-according-to-a-predicate
# NOTE: this might iterate over the collection twice
# NOTE: need to use filter(s) here because we're lazily dealing with iterators
it1, it2 = itertools.tee(iterable)
return itertools.filterfalse(pred, it1), filter(pred, it2) # pylint: disable=bad-builtin
def unique_everseen(iterable, key=None):
"""
The generator to list unique elements, preserving the order. Remember all
elements ever seen. This was taken from the itertools recipes.
Args:
iterable: An iterable to process.
key: Optional function to run when checking elements (e.g., str.lower)
Returns:
Generator: Yields a generator object.
"""
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def to_python(self, value):
if not value:
return []
if not isinstance(value, str):
raise ValidationError(
'%s is not a valid string value.' % str(value))
result = [item.strip() for item in filterfalse(
lambda item: item.strip() == '', value.split(self.delimiter))]
return result
def parse_address(branch):
"""
expected address structure:
required:
organization : str
add_no : int
optional:
full_address : str
organization synonyms : list of str
country : str
city : str
state : str
zipcode : str
street : str
"""
success = True
try:
org_names = branch.findall(org_path)
def condition(x):
return x.attrib and 'pref' in x.attrib and x.attrib['pref'] == 'Y'
# find first org with pref='Y'
orgs_pref = list(filter(condition, org_names))
orgs_pref = list(map(lambda x: x.text, filterfalse(lambda x: x is None, orgs_pref)))
result_dict = {'organizations_pref': orgs_pref}
orgs_rest = list(filterfalse(condition, org_names))
orgs_rest = list(map(lambda x: x.text, filterfalse(lambda x: x is None, orgs_rest)))
result_dict.update({'organizations': orgs_rest})
suborg_names = branch.findall(suborg_path)
suborgs = list(map(lambda x: x.text, filterfalse(lambda y: y is None, suborg_names)))
result_dict.update({'suborganizations': suborgs})
if branch.attrib:
if add_no_key in branch.attrib:
# TODO add try-catch-raise with logging
# if not int-able : exception triggered
addr_number = int(branch.attrib[add_no_key])
result_dict.update({add_no_key: addr_number})
else:
result_dict.update({add_no_key: 1})
# entries below are optional
add_entry(result_dict, branch, full_address_path)
add_entry(result_dict, branch, country_path)
add_entry(result_dict, branch, city_path)
add_entry(result_dict, branch, state_path)
add_entry(result_dict, branch, zipcode_path)
add_entry(result_dict, branch, street_path)
except:
success = False
result_dict = etree_to_dict(branch)
return success, result_dict
def test_future_moves(self):
"""
Ensure everything is available from the future.moves interface that we
claim and expect. (Issue #104).
"""
from future.moves.collections import Counter, OrderedDict # backported to Py2.6
from future.moves.collections import UserDict, UserList, UserString
from future.moves import configparser
from future.moves import copyreg
from future.moves.itertools import filterfalse, zip_longest
from future.moves import html
import future.moves.html.entities
import future.moves.html.parser
from future.moves import http
import future.moves.http.client
import future.moves.http.cookies
import future.moves.http.cookiejar
import future.moves.http.server
from future.moves import queue
from future.moves import socketserver
from future.moves.subprocess import check_output # even on Py2.6
from future.moves.subprocess import getoutput, getstatusoutput
from future.moves.sys import intern
from future.moves import urllib
import future.moves.urllib.error
import future.moves.urllib.parse
import future.moves.urllib.request
import future.moves.urllib.response
import future.moves.urllib.robotparser
try:
# Is _winreg available on Py2? If so, ensure future.moves._winreg is available too:
import _winreg
except ImportError:
pass
else:
from future.moves import winreg
from future.moves import xmlrpc
import future.moves.xmlrpc.client
import future.moves.xmlrpc.server
from future.moves import _dummy_thread
from future.moves import _markupbase
from future.moves import _thread
def filter_candidates(self, context):
for source in self._current_sources:
ctx = source.context
ctx['matchers'] = context['matchers']
ctx['input'] = context['input']
if context['smartcase']:
ctx['ignorecase'] = re.search(r'[A-Z]', ctx['input']) is None
ctx['mode'] = context['mode']
ctx['async_timeout'] = 0.03 if ctx['mode'] != 'insert' else 0.02
if ctx['prev_input'] != ctx['input'] and ctx['is_interactive']:
ctx['event'] = 'interactive'
ctx['all_candidates'] = self._gather_source_candidates(
ctx, source)
ctx['prev_input'] = ctx['input']
entire = ctx['all_candidates']
if ctx['is_async']:
ctx['event'] = 'async'
entire += self._gather_source_candidates(ctx, source)
if not entire:
yield source.name, entire, [], []
continue
partial = []
ctx['candidates'] = entire
for i in range(0, len(entire), 1000):
ctx['candidates'] = entire[i:i+1000]
matchers = [self._filters[x] for x in
(ctx['matchers'].split(',') if ctx['matchers']
else source.matchers)
if x in self._filters]
self.match_candidates(ctx, matchers)
partial += ctx['candidates']
if len(partial) >= 1000:
break
ctx['candidates'] = partial
for f in [self._filters[x]
for x in source.sorters + source.converters
if x in self._filters]:
ctx['candidates'] = f.filter(ctx)
partial = ctx['candidates']
for c in partial:
c['source'] = source.name
ctx['candidates'] = []
patterns = filterfalse(lambda x: x == '', (
self._filters[x].convert_pattern(context['input'])
for x in source.matchers if self._filters[x]))
yield source.name, entire, partial, patterns
def add_many(self, name, timestamp_pairs, chunks_size=2000, *args, **kwargs):
"""
:param name:
:param timestamp_pairs: [("timestamp",data)]
:param chunks_size:
:param args:
:param kwargs:
:return:
"""
incr_key = self.incr_format.format(key=name)
hash_key = self.hash_format.format(key=name)
# remove exist data
# todo maybe other way to optimize this filter code
sorted_timestamps = sorted(timestamp_pairs, key=itemgetter(0))
max_timestamp = sorted_timestamps[-1][0] # max
min_timestamp = sorted_timestamps[0][0] # min
filter_data = self.get_slice(name, start=min_timestamp, end=max_timestamp)
if filter_data:
timestamp_set = set(map(lambda x: x[0], filter_data))
filter_results = itertools.filterfalse(lambda x: x[0] in timestamp_set, sorted_timestamps)
else:
filter_results = sorted_timestamps
chunks_data = helper.chunks(filter_results, chunks_size)
with self._pipe_acquire() as pipe:
for chunks in chunks_data:
start_id = self.client.get(incr_key) or 1 # if key not exist id equal 0
end_id = self.client.incrby(incr_key, amount=len(chunks)) # incr the add length
start_id = int(start_id)
end_id = int(end_id)
ids_range = range(start_id, end_id)
dumps_results = map(lambda x: (x[0], self.serializer.dumps(x[1])), chunks)
mix_data = itertools.zip_longest(dumps_results, ids_range) # [(("timestamp",data),id),...]
mix_data = list(mix_data) # need converted as list
timestamp_ids = map(lambda seq: (seq[0][0], seq[1]), mix_data) # [("timestamp",id),...]
ids_pairs = map(lambda seq: (seq[1], seq[0][1]), mix_data) # [("id",data),...]
timestamp_ids = itertools.chain.from_iterable(timestamp_ids)
ids_values = {k: v for k, v in ids_pairs}
pipe.multi()
pipe.zadd(name, *timestamp_ids)
pipe.hmset(hash_key, ids_values)
pipe.execute()