我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.moves.filter()。
def getAndAssertTableRowAction(self, response, table_name, action_name, row_id): table = response.context[table_name + '_table'] rows = list(moves.filter(lambda x: x.id == row_id, table.data)) self.assertEqual(1, len(rows), "Did not find a row matching id '%s'" % row_id) row_actions = table.get_row_actions(rows[0]) actions = list(moves.filter(lambda x: x.name == action_name, row_actions)) msg_args = (action_name, table_name, row_id) self.assertTrue( len(actions) > 0, "No action named '%s' found in '%s' table for id '%s'" % msg_args) self.assertEqual( 1, len(actions), "Multiple actions named '%s' found in '%s' table for id '%s'" % msg_args) return actions[0]
def getAndAssertTableAction(self, response, table_name, action_name): table = response.context[table_name + '_table'] table_actions = table.get_table_actions() actions = list(moves.filter(lambda x: x.name == action_name, table_actions)) msg_args = (action_name, table_name) self.assertTrue( len(actions) > 0, "No action named '%s' found in '%s' table" % msg_args) self.assertEqual( 1, len(actions), "More than one action named '%s' found in '%s' table" % msg_args) return actions[0]
def _get_volume_row_action_from_ajax(self, res, action_name, row_id): def _matches_row_id(context_row): return (len(context_row.dicts) > 1 and isinstance(context_row.dicts[1], dict) and context_row.dicts[1].get('row_id', None) == row_id) matching = list(moves.filter(lambda r: _matches_row_id(r), res.context)) self.assertTrue(len(matching) > 1, "Expected at least one row matching %s" % row_id) row = matching[-1].dicts[1] matching_actions = list(moves.filter(lambda a: a.name == action_name, row['row_actions'])) self.assertEqual(1, len(matching_actions), "Expected one row action named '%s'" % action_name) return matching_actions[0]
def find_config_files(pdir=None, pfile=None, exten='.conf'): global _BINFILE config_files = [] confdir = os.path.expanduser('~/.') binfile = _get_binary_name() + exten if not _BINFILE: _BINFILE = binfile pdir = pdir or confdir if not os.path.exists(pdir): os.mkdir(pdir, 0o755) if pfile is None: pfile = os.path.join(confdir, binfile) if os.path.exists(pfile): config_files.append(pfile) config_files.extend(_find_default_config()) return list(moves.filter(bool, config_files))
def paths_on_pythonpath(paths): """ Add the indicated paths to the head of the PYTHONPATH environment variable so that subprocesses will also see the packages at these paths. Do this in a context that restores the value on exit. """ nothing = object() orig_pythonpath = os.environ.get('PYTHONPATH', nothing) current_pythonpath = os.environ.get('PYTHONPATH', '') try: prefix = os.pathsep.join(paths) to_join = filter(None, [prefix, current_pythonpath]) new_path = os.pathsep.join(to_join) if new_path: os.environ['PYTHONPATH'] = new_path yield finally: if orig_pythonpath is nothing: os.environ.pop('PYTHONPATH', None) else: os.environ['PYTHONPATH'] = orig_pythonpath
def find_data_files(self, package, src_dir): """Return filenames for package's data files in 'src_dir'""" patterns = self._get_platform_patterns( self.package_data, package, src_dir, ) globs_expanded = map(glob, patterns) # flatten the expanded globs into an iterable of matches globs_matches = itertools.chain.from_iterable(globs_expanded) glob_files = filter(os.path.isfile, globs_matches) files = itertools.chain( self.manifest_files.get(package, []), glob_files, ) return self.exclude_data_files(package, src_dir, files)
def exclude_data_files(self, package, src_dir, files): """Filter filenames for package's data files in 'src_dir'""" files = list(files) patterns = self._get_platform_patterns( self.exclude_package_data, package, src_dir, ) match_groups = ( fnmatch.filter(files, pattern) for pattern in patterns ) # flatten the groups of matches into an iterable of matches matches = itertools.chain.from_iterable(match_groups) bad = set(matches) keepers = ( fn for fn in files if fn not in bad ) # ditch dupes return list(_unique_everseen(keepers))
def respond(self): """Process the current request.""" """ From PEP 333: The start_response callable must not actually transmit the response headers. Instead, it must store them for the server or gateway to transmit only after the first iteration of the application return value that yields a NON-EMPTY string, or upon the application's first invocation of the write() callable. """ response = self.req.server.wsgi_app(self.env, self.start_response) try: for chunk in filter(None, response): if not isinstance(chunk, six.binary_type): raise ValueError('WSGI Applications must yield bytes') self.write(chunk) finally: if hasattr(response, 'close'): response.close()
def _get_digests(images): images = list(images) if not images: return {} for image in images: fabricio.run( 'docker pull %s' % image, ignore_errors=True, quiet=False, use_cache=True, ) command = ( "docker inspect --type image --format '{{index .RepoDigests 0}}' %s" ) % ' '.join(images) digests = fabricio.run(command, ignore_errors=True, use_cache=True) return dict(zip_longest(images, filter(None, digests.splitlines())))
def _leaf_versions(tree, rc): ''' Recursively traverse the versions tree in a depth-first fashion, and collect the last node of each branch, i.e. leaf versions. ''' versions = [] if _is_iterable(tree): for subtree in tree: versions.extend(_leaf_versions(subtree, rc)) if not versions: if rc: last_rc = next(filter(lambda v: v.is_rc, reversed(tree)), None) last_prod = next( filter(lambda v: not v.is_rc, reversed(tree)), None) if last_rc and last_prod and (last_prod < last_rc): versions.extend([last_prod, last_rc]) elif not last_prod: versions.append(last_rc) else: # Either there is no RC, or we ignore the RC as older than # the latest production version: versions.append(last_prod) else: versions.append(tree[-1]) return versions
def apply_move(cls, state, move): new_board = copy(state.board) if move not in range(9) or state.board[move]: raise ValueError('Illegal move') new_board[move] = state.current_player next_player = state.current_player == 'X' and 'O' or 'X' winner = None for player in ['X', 'O']: score = sum([2 ** i for i, spot in enumerate(new_board) if spot == player]) for winning_score in cls.winning_scores: if winning_score & score == winning_score: winner = player if winner is None and len(list(filter(None, new_board))) == 9: winner = Draw return cls.State(board=new_board, current_player=next_player, winner=winner)