Java 类org.apache.hadoop.fs.PathFilter 实例源码
项目:hadoop-oss
文件:ViewFileSystem.java
/**
 * Lists located statuses for {@code f} after resolving it through the mount table.
 *
 * <p>When the resolved entry is an internal directory, the target file system's
 * iterator is returned unchanged. Otherwise every status is passed through
 * {@code fixFileStatus} together with the path computed by {@code getChrootedPath}.
 *
 * <p>NOTE(review): {@code filter} is not consulted anywhere in this method;
 * confirm whether that is intended.
 */
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
    final PathFilter filter) throws FileNotFoundException, IOException {
  final InodeTree.ResolveResult<FileSystem> resolved =
      fsState.resolve(getUriPath(f), true);
  final RemoteIterator<LocatedFileStatus> delegate =
      resolved.targetFileSystem.listLocatedStatus(resolved.remainingPath);

  if (!resolved.isInternalDir()) {
    // Statuses come from a mounted file system: rewrite each one on the way out.
    return new RemoteIterator<LocatedFileStatus>() {
      @Override
      public boolean hasNext() throws IOException {
        return delegate.hasNext();
      }

      @Override
      public LocatedFileStatus next() throws IOException {
        final LocatedFileStatus raw = delegate.next();
        return (LocatedFileStatus) fixFileStatus(raw,
            getChrootedPath(resolved, raw, f));
      }
    };
  }
  return delegate;
}
项目:kafka-connect-hdfs
文件:FileUtils.java
/**
 * Recursively collects the non-directory entries under {@code path} whose
 * paths are accepted by {@code filter}.
 *
 * <p>Directories are always descended into; the filter is applied only to
 * non-directory entries. A path that does not exist yields an empty list.
 *
 * @param storage storage used for the existence check and the listing
 * @param path root of the traversal
 * @param filter filter applied to each non-directory entry's path
 * @return the accepted entries, in listing order
 * @throws IOException if the storage calls fail
 */
private static ArrayList<FileStatus> traverseImpl(Storage storage, Path path, PathFilter filter)
    throws IOException {
  ArrayList<FileStatus> collected = new ArrayList<>();
  if (!storage.exists(path.toString())) {
    return collected;
  }
  for (FileStatus entry : storage.listStatus(path.toString())) {
    if (entry.isDirectory()) {
      collected.addAll(traverseImpl(storage, entry.getPath(), filter));
      continue;
    }
    if (filter.accept(entry.getPath())) {
      collected.add(entry);
    }
  }
  return collected;
}
项目:QDrill
文件:ParquetFormatPlugin.java
/**
 * Decides whether {@code dir} can be read by this format plugin.
 *
 * <p>Returns {@code true} as soon as the Parquet metadata file exists under the
 * directory or {@code metaDataFileExists} reports true. Otherwise the directory
 * is listed through a {@code DrillPathFilter} and the first listed entry is
 * handed to {@code super.isFileReadable}. An {@link IOException} is logged and
 * treated as "not readable".
 */
boolean isDirReadable(DrillFileSystem fs, FileStatus dir) {
  Path metadataPath = new Path(dir.getPath(), ParquetFileWriter.PARQUET_METADATA_FILE);
  try {
    if (fs.exists(metadataPath) || metaDataFileExists(fs, dir)) {
      return true;
    }
    FileStatus[] candidates = fs.listStatus(dir.getPath(), new DrillPathFilter());
    if (candidates.length == 0) {
      return false;
    }
    // Only the first listed entry is probed.
    return super.isFileReadable(fs, candidates[0]);
  } catch (IOException e) {
    logger.info("Failure while attempting to check for Parquet metadata file.", e);
    return false;
  }
}
项目:hadoop
文件:FileInputFormat.java
/**
 * Walks {@code path} recursively and appends every accepted file to {@code result}.
 *
 * <p>An entry rejected by {@code inputFilter} is skipped entirely, so a rejected
 * directory is not descended into.
 *
 * @param result
 *          The list that receives the accepted files.
 * @param fs
 *          The FileSystem used for listing.
 * @param path
 *          The directory to walk.
 * @param inputFilter
 *          The filter applied to both files and directories.
 * @throws IOException if listing fails
 */
protected void addInputPathRecursively(List<FileStatus> result,
    FileSystem fs, Path path, PathFilter inputFilter)
    throws IOException {
  for (RemoteIterator<LocatedFileStatus> entries = fs.listLocatedStatus(path);
      entries.hasNext();) {
    LocatedFileStatus entry = entries.next();
    if (!inputFilter.accept(entry.getPath())) {
      continue;
    }
    if (entry.isDirectory()) {
      addInputPathRecursively(result, fs, entry.getPath(), inputFilter);
    } else {
      result.add(entry);
    }
  }
}
项目:hadoop
文件:LocatedFileStatusFetcher.java
/**
* @param conf configuration for the job
* @param dirs the initial list of paths
* @param recursive whether to traverse the patchs recursively
* @param inputFilter inputFilter to apply to the resulting paths
* @param newApi whether using the mapred or mapreduce API
* @throws InterruptedException
* @throws IOException
*/
public LocatedFileStatusFetcher(Configuration conf, Path[] dirs,
boolean recursive, PathFilter inputFilter, boolean newApi) throws InterruptedException,
IOException {
int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS,
FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
rawExec = Executors.newFixedThreadPool(
numThreads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("GetFileInfo #%d").build());
exec = MoreExecutors.listeningDecorator(rawExec);
resultQueue = new LinkedBlockingQueue<List<FileStatus>>();
this.conf = conf;
this.inputDirs = dirs;
this.recursive = recursive;
this.inputFilter = inputFilter;
this.newApi = newApi;
}
项目:hadoop
文件:FileInputFormat.java
/**
 * Walks {@code path} recursively and appends every accepted file to {@code result}.
 *
 * <p>An entry rejected by {@code inputFilter} is skipped entirely, so a rejected
 * directory is not descended into.
 *
 * @param result
 *          The list that receives the accepted files.
 * @param fs
 *          The FileSystem used for listing.
 * @param path
 *          The directory to walk.
 * @param inputFilter
 *          The filter applied to both files and directories.
 * @throws IOException if listing fails
 */
protected void addInputPathRecursively(List<FileStatus> result,
    FileSystem fs, Path path, PathFilter inputFilter)
    throws IOException {
  for (RemoteIterator<LocatedFileStatus> entries = fs.listLocatedStatus(path);
      entries.hasNext();) {
    LocatedFileStatus entry = entries.next();
    if (!inputFilter.accept(entry.getPath())) {
      continue;
    }
    if (entry.isDirectory()) {
      addInputPathRecursively(result, fs, entry.getPath(), inputFilter);
    } else {
      result.add(entry);
    }
  }
}
项目:hadoop
文件:HistoryFileManager.java
/**
 * Lists {@code path} (qualified against {@code fc}) and returns the entries that
 * are files and whose paths {@code pathFilter} accepts.
 *
 * <p>A {@link FileNotFoundException} raised while listing is logged and whatever
 * was collected up to that point is returned.
 */
@VisibleForTesting
protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
    PathFilter pathFilter) throws IOException {
  path = fc.makeQualified(path);
  List<FileStatus> matches = new ArrayList<FileStatus>();
  try {
    for (RemoteIterator<FileStatus> entries = fc.listStatus(path); entries.hasNext();) {
      FileStatus candidate = entries.next();
      if (!candidate.isFile()) {
        continue;
      }
      if (pathFilter.accept(candidate.getPath())) {
        matches.add(candidate);
      }
    }
  } catch (FileNotFoundException fe) {
    LOG.error("Error while scanning directory " + path, fe);
  }
  return matches;
}
项目:hadoop
文件:GenerateData.java
/**
 * Recursively lists the files under {@code inputDir}, sums the length and count
 * of those accepted by an {@code OutputFilesFilter}, logs both totals and
 * returns them as a {@code DataStatistics}.
 *
 * @param conf configuration used to obtain the file system
 * @param inputDir directory whose files are measured
 * @return statistics built from the total size, the file count and {@code false}
 * @throws IOException if the file system calls fail
 */
static DataStatistics publishPlainDataStatistics(Configuration conf,
    Path inputDir)
    throws IOException {
  final FileSystem fs = inputDir.getFileSystem(conf);
  final RemoteIterator<LocatedFileStatus> files = fs.listFiles(inputDir, true);
  final PathFilter outputFilesOnly = new Utils.OutputFileUtils.OutputFilesFilter();

  long totalBytes = 0;
  long totalFiles = 0;
  while (files.hasNext()) {
    LocatedFileStatus file = files.next();
    if (!outputFilesOnly.accept(file.getPath())) {
      continue;
    }
    totalBytes += file.getLen();
    totalFiles++;
  }

  LOG.info("Total size of input data : "
      + StringUtils.humanReadableInt(totalBytes));
  LOG.info("Total number of input data files : " + totalFiles);
  return new DataStatistics(totalBytes, totalFiles, false);
}
项目:ditb
文件:MasterFileSystem.java
/**
 * Base split method: splits the WAL files of the given servers that match
 * {@code filter}, then records the elapsed time and split size in the master
 * file system metrics when a metrics object is present.
 *
 * <p>The meta metric is used only when {@code filter} is the very same instance
 * as {@code META_FILTER} (reference comparison).
 *
 * @param serverNames servers whose logs are split
 * @param filter filter selecting which WAL files to split
 * @throws IOException if splitting fails
 */
public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {
  List<Path> logDirs = getLogDirs(serverNames);
  splitLogManager.handleDeadWorkers(serverNames);

  final long startedAt = EnvironmentEdgeManager.currentTime();
  final long splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
  final long elapsed = EnvironmentEdgeManager.currentTime() - startedAt;

  if (this.metricsMasterFilesystem == null) {
    return;
  }
  if (filter == META_FILTER) {
    this.metricsMasterFilesystem.addMetaWALSplit(elapsed, splitLogSize);
  } else {
    this.metricsMasterFilesystem.addSplit(elapsed, splitLogSize);
  }
}
项目:ditb
文件:SplitLogManager.java
/**
 * Collects the file statuses found in each of {@code logDirs}, listed through
 * {@code filter}.
 *
 * <p>Directories that do not exist are skipped with a warning; directories whose
 * listing is {@code null} or empty are reported at info level.
 *
 * See {@link DefaultWALProvider#getServerNameFromWALDirectoryName} for more info on directory
 * layout.
 *
 * Should be package-private, but is needed by
 * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem,
 *     Configuration, WALFactory)} for tests.
 */
@VisibleForTesting
public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
    final PathFilter filter)
    throws IOException {
  List<FileStatus> collected = new ArrayList<FileStatus>();
  for (Path logDir : logDirs) {
    final FileSystem fs = logDir.getFileSystem(conf);
    if (!fs.exists(logDir)) {
      LOG.warn(logDir + " doesn't exist. Nothing to do!");
      continue;
    }
    FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter);
    if (logfiles != null && logfiles.length > 0) {
      Collections.addAll(collected, logfiles);
    } else {
      LOG.info(logDir + " is empty dir, no logs to split");
    }
  }
  return collected.toArray(new FileStatus[collected.size()]);
}
项目:ditb
文件:FSVisitor.java
/**
 * Visits every store file of a region, family by family.
 *
 * <p>The visitor receives the region directory name, the family directory name
 * and the store file name. Families (or the whole region) whose listing comes
 * back {@code null} are skipped, with a trace message when trace is enabled.
 *
 * @param fs {@link FileSystem}
 * @param regionDir {@link Path} to the region directory
 * @param visitor callback object to get the store files
 * @throws IOException if an error occurred while scanning the directory
 */
public static void visitRegionStoreFiles(final FileSystem fs, final Path regionDir,
    final StoreFileVisitor visitor) throws IOException {
  final FileStatus[] familyDirs =
      FSUtils.listStatus(fs, regionDir, new FSUtils.FamilyDirFilter(fs));
  if (familyDirs == null) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("No families under region directory:" + regionDir);
    }
    return;
  }

  final PathFilter filesOnly = new FSUtils.FileFilter(fs);
  for (FileStatus familyStatus : familyDirs) {
    final Path familyDir = familyStatus.getPath();
    final String familyName = familyDir.getName();

    // get all the storeFiles in the family
    final FileStatus[] storeFiles = FSUtils.listStatus(fs, familyDir, filesOnly);
    if (storeFiles != null) {
      for (FileStatus storeFile : storeFiles) {
        visitor.storeFile(regionDir.getName(), familyName, storeFile.getPath().getName());
      }
    } else if (LOG.isTraceEnabled()) {
      LOG.trace("No hfiles found for family: " + familyDir + ", skipping.");
    }
  }
}
项目:ditb
文件:IndexFile.java
/**
 * Collects the paths of all non-directory entries found one level below the
 * directories of {@code regionDir} that pass {@code FSUtils.DirFilter}. The
 * result may be empty.
 *
 * @param fs The file system reference.
 * @param regionDir The region directory to scan.
 * @return The list of files found.
 * @throws IOException When scanning the files fails.
 */
static List<Path> getStoreFiles(FileSystem fs, Path regionDir)
    throws IOException {
  List<Path> storeFiles = new ArrayList<Path>();
  PathFilter directoriesOnly = new FSUtils.DirFilter(fs);
  for (FileStatus familyDir : fs.listStatus(regionDir, directoriesOnly)) {
    for (FileStatus entry : fs.listStatus(familyDir.getPath())) {
      if (entry.isDir()) {
        continue;
      }
      storeFiles.add(entry.getPath());
    }
  }
  return storeFiles;
}
项目:big-c
文件:HistoryFileManager.java
/**
 * Lists {@code path} (qualified against {@code fc}) and returns the entries that
 * are files and whose paths {@code pathFilter} accepts.
 *
 * <p>A {@link FileNotFoundException} raised while listing is logged and whatever
 * was collected up to that point is returned.
 */
@VisibleForTesting
protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
    PathFilter pathFilter) throws IOException {
  path = fc.makeQualified(path);
  List<FileStatus> matches = new ArrayList<FileStatus>();
  try {
    for (RemoteIterator<FileStatus> entries = fc.listStatus(path); entries.hasNext();) {
      FileStatus candidate = entries.next();
      if (!candidate.isFile()) {
        continue;
      }
      if (pathFilter.accept(candidate.getPath())) {
        matches.add(candidate);
      }
    }
  } catch (FileNotFoundException fe) {
    LOG.error("Error while scanning directory " + path, fe);
  }
  return matches;
}
项目:hadoop-manta
文件:MantaRemoteIterator.java
/**
 * Builds an iterator over {@code stream}, optionally narrowed by {@code filter}.
 *
 * <p>With a non-null filter, each object's path is wrapped in a {@link Path}
 * and offered to the filter; with a null filter the stream is iterated as-is.
 * The first acceptable element is fetched eagerly into {@code nextRef}.
 *
 * @param filter filter object that will filter out results, may be null
 * @param stream backing stream
 * @param path base path that is being iterated
 * @param fs reference to the underlying filesystem
 * @param autocloseWhenFinished flag indicate whether or not to close all
 *                              resources when we have finished iterating
 */
public MantaRemoteIterator(final PathFilter filter,
                           final Stream<MantaObject> stream,
                           final Path path,
                           final FileSystem fs,
                           final boolean autocloseWhenFinished) {
  this.filter = filter;
  this.inner = (filter != null)
      ? stream.filter(obj -> filter.accept(new Path(obj.getPath()))).iterator()
      : stream.iterator();
  this.closeableStream = stream;
  this.path = path;
  this.fs = fs;
  this.autocloseWhenFinished = autocloseWhenFinished;
  // Must run last: it relies on the fields assigned above.
  this.nextRef.set(nextAcceptable());
}
项目:aliyun-oss-hadoop-fs
文件:FileInputFormat.java
/**
 * Walks {@code path} recursively and appends every accepted file to {@code result}.
 *
 * <p>An entry rejected by {@code inputFilter} is skipped entirely, so a rejected
 * directory is not descended into.
 *
 * @param result
 *          The list that receives the accepted files.
 * @param fs
 *          The FileSystem used for listing.
 * @param path
 *          The directory to walk.
 * @param inputFilter
 *          The filter applied to both files and directories.
 * @throws IOException if listing fails
 */
protected void addInputPathRecursively(List<FileStatus> result,
    FileSystem fs, Path path, PathFilter inputFilter)
    throws IOException {
  for (RemoteIterator<LocatedFileStatus> entries = fs.listLocatedStatus(path);
      entries.hasNext();) {
    LocatedFileStatus entry = entries.next();
    if (!inputFilter.accept(entry.getPath())) {
      continue;
    }
    if (entry.isDirectory()) {
      addInputPathRecursively(result, fs, entry.getPath(), inputFilter);
    } else {
      result.add(entry);
    }
  }
}
项目:aliyun-oss-hadoop-fs
文件:LocatedFileStatusFetcher.java
/**
* @param conf configuration for the job
* @param dirs the initial list of paths
* @param recursive whether to traverse the patchs recursively
* @param inputFilter inputFilter to apply to the resulting paths
* @param newApi whether using the mapred or mapreduce API
* @throws InterruptedException
* @throws IOException
*/
public LocatedFileStatusFetcher(Configuration conf, Path[] dirs,
boolean recursive, PathFilter inputFilter, boolean newApi) throws InterruptedException,
IOException {
int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS,
FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
rawExec = Executors.newFixedThreadPool(
numThreads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("GetFileInfo #%d").build());
exec = MoreExecutors.listeningDecorator(rawExec);
resultQueue = new LinkedBlockingQueue<List<FileStatus>>();
this.conf = conf;
this.inputDirs = dirs;
this.recursive = recursive;
this.inputFilter = inputFilter;
this.newApi = newApi;
}
项目:aliyun-oss-hadoop-fs
文件:MapFileOutputFormat.java
/**
 * Opens one {@code MapFile.Reader} per entry of {@code dir} whose name starts
 * with neither {@code "_"} nor {@code "."}.
 *
 * <p>The entries are sorted before the readers are created, so the returned
 * array is in sorted path order.
 */
public static MapFile.Reader[] getReaders(Path dir,
    Configuration conf) throws IOException {
  FileSystem fs = dir.getFileSystem(conf);
  PathFilter visibleOnly = new PathFilter() {
    @Override
    public boolean accept(Path path) {
      String name = path.getName();
      return !(name.startsWith("_") || name.startsWith("."));
    }
  };
  Path[] names = FileUtil.stat2Paths(fs.listStatus(dir, visibleOnly));

  // sort names, so that hash partitioning works
  Arrays.sort(names);

  MapFile.Reader[] parts = new MapFile.Reader[names.length];
  int slot = 0;
  for (Path name : names) {
    parts[slot++] = new MapFile.Reader(fs, name.toString(), conf);
  }
  return parts;
}
项目:hadoop-manta
文件:MantaFileSystem.java
/**
 * Lists the entries under {@code path} as located file statuses.
 *
 * <p>The existence/accessibility of the Manta path is checked exactly once,
 * before anything else is done.
 *
 * @param path path to list
 * @param filter filter handed to the returned {@link MantaRemoteIterator};
 *               it is not applied to the single-entry root listing
 * @return iterator over the listing
 * @throws FileNotFoundException if the Manta path does not exist or is not accessible
 * @throws IOException declared by the overridden method
 */
@Override
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(
    final Path path, final PathFilter filter) throws IOException {
  LOG.debug("List located status for path: {}", path);

  String mantaPath = mantaPath(path);

  if (!client.existsAndIsAccessible(mantaPath)) {
    throw new FileNotFoundException(mantaPath);
  }

  /* We emulate a normal filesystem by showing the home directory under root
   * in order to provide compatibility with consumers that expect this behavior. */
  if (mantaPath.equals(SEPARATOR)) {
    LocatedFileStatus singleEntry = new LocatedFileStatus(new MantaFileStatus(true, path), null);
    return new SingleEntryRemoteIterator<>(singleEntry);
  }

  // The path was already verified above; a second identical check would only
  // cost another remote call.
  Stream<MantaObject> stream = client.listObjects(mantaPath);
  return new MantaRemoteIterator(filter, stream, path, this, true);
}
项目:aliyun-oss-hadoop-fs
文件:FileInputFormat.java
/**
 * Walks {@code path} recursively and appends every accepted file to {@code result}.
 *
 * <p>An entry rejected by {@code inputFilter} is skipped entirely, so a rejected
 * directory is not descended into.
 *
 * @param result
 *          The list that receives the accepted files.
 * @param fs
 *          The FileSystem used for listing.
 * @param path
 *          The directory to walk.
 * @param inputFilter
 *          The filter applied to both files and directories.
 * @throws IOException if listing fails
 */
protected void addInputPathRecursively(List<FileStatus> result,
    FileSystem fs, Path path, PathFilter inputFilter)
    throws IOException {
  for (RemoteIterator<LocatedFileStatus> entries = fs.listLocatedStatus(path);
      entries.hasNext();) {
    LocatedFileStatus entry = entries.next();
    if (!inputFilter.accept(entry.getPath())) {
      continue;
    }
    if (entry.isDirectory()) {
      addInputPathRecursively(result, fs, entry.getPath(), inputFilter);
    } else {
      result.add(entry);
    }
  }
}
项目:aliyun-oss-hadoop-fs
文件:GenerateData.java
/**
 * Recursively lists the files under {@code inputDir}, sums the length and count
 * of those accepted by an {@code OutputFilesFilter}, logs both totals and
 * returns them as a {@code DataStatistics}.
 *
 * @param conf configuration used to obtain the file system
 * @param inputDir directory whose files are measured
 * @return statistics built from the total size, the file count and {@code false}
 * @throws IOException if the file system calls fail
 */
static DataStatistics publishPlainDataStatistics(Configuration conf,
    Path inputDir)
    throws IOException {
  final FileSystem fs = inputDir.getFileSystem(conf);
  final RemoteIterator<LocatedFileStatus> files = fs.listFiles(inputDir, true);
  final PathFilter outputFilesOnly = new Utils.OutputFileUtils.OutputFilesFilter();

  long totalBytes = 0;
  long totalFiles = 0;
  while (files.hasNext()) {
    LocatedFileStatus file = files.next();
    if (!outputFilesOnly.accept(file.getPath())) {
      continue;
    }
    totalBytes += file.getLen();
    totalFiles++;
  }

  LOG.info("Total size of input data : "
      + StringUtils.humanReadableInt(totalBytes));
  LOG.info("Total number of input data files : " + totalFiles);
  return new DataStatistics(totalBytes, totalFiles, false);
}
项目:aliyun-oss-hadoop-fs
文件:ViewFileSystem.java
/**
 * Lists located statuses for {@code f} after resolving it through the mount table.
 *
 * <p>When the resolved entry is an internal directory, the target file system's
 * iterator is returned unchanged. Otherwise every status is passed through
 * {@code fixFileStatus} together with the path computed by {@code getChrootedPath}.
 *
 * <p>NOTE(review): {@code filter} is not consulted anywhere in this method;
 * confirm whether that is intended.
 */
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
    final PathFilter filter) throws FileNotFoundException, IOException {
  final InodeTree.ResolveResult<FileSystem> resolved =
      fsState.resolve(getUriPath(f), true);
  final RemoteIterator<LocatedFileStatus> delegate =
      resolved.targetFileSystem.listLocatedStatus(resolved.remainingPath);

  if (!resolved.isInternalDir()) {
    // Statuses come from a mounted file system: rewrite each one on the way out.
    return new RemoteIterator<LocatedFileStatus>() {
      @Override
      public boolean hasNext() throws IOException {
        return delegate.hasNext();
      }

      @Override
      public LocatedFileStatus next() throws IOException {
        final LocatedFileStatus raw = delegate.next();
        return (LocatedFileStatus) fixFileStatus(raw,
            getChrootedPath(resolved, raw, f));
      }
    };
  }
  return delegate;
}
项目:drill
文件:FileSystemUtil.java
/**
 * Combines the given filters into a single filter that accepts a path only when
 * every one of them accepts it; evaluation stops at the first rejection.
 * An empty array yields {@link #DUMMY_FILTER}.
 *
 * @param filters array of filters
 * @return one filter that combines all given filters
 */
public static PathFilter mergeFilters(final PathFilter... filters) {
  if (filters.length != 0) {
    return new PathFilter() {
      @Override
      public boolean accept(Path path) {
        for (int i = 0; i < filters.length; i++) {
          if (!filters[i].accept(path)) {
            return false;
          }
        }
        return true;
      }
    };
  }
  return DUMMY_FILTER;
}
项目:streamx
文件:FileUtils.java
/**
 * Recursively collects the non-directory entries under {@code path} whose
 * paths are accepted by {@code filter}.
 *
 * <p>Directories are always descended into; the filter is applied only to
 * non-directory entries. A path that does not exist yields an empty list.
 *
 * @param storage storage used for the existence check and the listing
 * @param path root of the traversal
 * @param filter filter applied to each non-directory entry's path
 * @return the accepted entries, in listing order
 * @throws IOException if the storage calls fail
 */
private static ArrayList<FileStatus> traverseImpl(Storage storage, Path path, PathFilter filter)
    throws IOException {
  ArrayList<FileStatus> collected = new ArrayList<>();
  if (!storage.exists(path.toString())) {
    return collected;
  }
  for (FileStatus entry : storage.listStatus(path.toString())) {
    if (entry.isDirectory()) {
      collected.addAll(traverseImpl(storage, entry.getPath(), filter));
      continue;
    }
    if (filter.accept(entry.getPath())) {
      collected.add(entry);
    }
  }
  return collected;
}
项目:reair
文件:FsUtils.java
/**
 * Copies modification times from the files under {@code src} onto the files at
 * the same relative paths under {@code dest}. The access time argument passed
 * to {@code setTimes} is always {@code -1}.
 *
 * @param conf configuration object
 * @param src source directory
 * @param dest destination directory
 * @param filter a filter for excluding some files from modification
 *
 * @throws IOException if there's an error, including when the source file
 *     statuses are rejected as invalid
 */
public static void syncModificationTimes(Configuration conf, Path src, Path dest,
    Optional<PathFilter> filter) throws IOException {
  Set<FileStatus> srcFileStatuses = getFileStatusesRecursive(conf, src, filter);

  final Map<String, Long> modificationTimes;
  try {
    modificationTimes = getRelativePathToModificationTime(src, srcFileStatuses);
  } catch (ArgumentException e) {
    throw new IOException("Invalid file statuses!", e);
  }

  FileSystem destFs = dest.getFileSystem(conf);
  for (Map.Entry<String, Long> entry : modificationTimes.entrySet()) {
    destFs.setTimes(new Path(dest, entry.getKey()), entry.getValue(), -1);
  }
}
项目:big-c
文件:FileInputFormat.java
/**
 * Walks {@code path} recursively and appends every accepted file to {@code result}.
 *
 * <p>An entry rejected by {@code inputFilter} is skipped entirely, so a rejected
 * directory is not descended into.
 *
 * @param result
 *          The list that receives the accepted files.
 * @param fs
 *          The FileSystem used for listing.
 * @param path
 *          The directory to walk.
 * @param inputFilter
 *          The filter applied to both files and directories.
 * @throws IOException if listing fails
 */
protected void addInputPathRecursively(List<FileStatus> result,
    FileSystem fs, Path path, PathFilter inputFilter)
    throws IOException {
  for (RemoteIterator<LocatedFileStatus> entries = fs.listLocatedStatus(path);
      entries.hasNext();) {
    LocatedFileStatus entry = entries.next();
    if (!inputFilter.accept(entry.getPath())) {
      continue;
    }
    if (entry.isDirectory()) {
      addInputPathRecursively(result, fs, entry.getPath(), inputFilter);
    } else {
      result.add(entry);
    }
  }
}
项目:big-c
文件:LocatedFileStatusFetcher.java
/**
* @param conf configuration for the job
* @param dirs the initial list of paths
* @param recursive whether to traverse the patchs recursively
* @param inputFilter inputFilter to apply to the resulting paths
* @param newApi whether using the mapred or mapreduce API
* @throws InterruptedException
* @throws IOException
*/
public LocatedFileStatusFetcher(Configuration conf, Path[] dirs,
boolean recursive, PathFilter inputFilter, boolean newApi) throws InterruptedException,
IOException {
int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS,
FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
rawExec = Executors.newFixedThreadPool(
numThreads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("GetFileInfo #%d").build());
exec = MoreExecutors.listeningDecorator(rawExec);
resultQueue = new LinkedBlockingQueue<List<FileStatus>>();
this.conf = conf;
this.inputDirs = dirs;
this.recursive = recursive;
this.inputFilter = inputFilter;
this.newApi = newApi;
}
项目:kafka-connect-hdfs
文件:MemoryStorage.java
/**
 * Returns a status for every stored key that starts with {@code path} and whose
 * {@link Path} form is accepted by {@code filter}.
 *
 * <p>When the pending failure is {@code Failure.listStatusFailure}, it is reset
 * to {@code Failure.noFailure} and an {@link IOException} is thrown instead.
 */
@Override
public FileStatus[] listStatus(String path, PathFilter filter) throws IOException {
  if (failure == Failure.listStatusFailure) {
    // One-shot failure injection: clear the flag before throwing.
    failure = Failure.noFailure;
    throw new IOException("listStatus failed.");
  }
  List<FileStatus> matches = new ArrayList<>();
  for (String key : data.keySet()) {
    if (!key.startsWith(path)) {
      continue;
    }
    if (!filter.accept(new Path(key))) {
      continue;
    }
    matches.add(
        new FileStatus(data.get(key).size(), false, 1, 0, 0, 0, null, null, null, new Path(key)));
  }
  return matches.toArray(new FileStatus[matches.size()]);
}
项目:aliyun-maxcompute-data-collectors
文件:AvroUtil.java
/**
 * Get the schema of AVRO files stored in a directory.
 *
 * <p>When {@code path} is a directory, the first listed entry whose name starts
 * with neither {@code "_"} nor {@code "."} is read; otherwise {@code path}
 * itself is read. The opened input is always closed before returning, also
 * when opening the reader or reading the schema fails.
 *
 * @param path a file, or a directory to pick a file from
 * @param conf configuration used to obtain the file system and open the file
 * @return the schema of the chosen file, or {@code null} when the directory
 *     has no eligible entry
 * @throws IOException if listing or reading fails
 */
public static Schema getAvroSchema(Path path, Configuration conf)
    throws IOException {
  FileSystem fs = path.getFileSystem(conf);
  Path fileToTest;
  if (fs.isDirectory(path)) {
    FileStatus[] fileStatuses = fs.listStatus(path, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    });
    if (fileStatuses.length == 0) {
      return null;
    }
    fileToTest = fileStatuses[0].getPath();
  } else {
    fileToTest = path;
  }

  SeekableInput input = new FsInput(fileToTest, conf);
  FileReader<GenericRecord> fileReader = null;
  try {
    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
    fileReader = DataFileReader.openReader(input, reader);
    return fileReader.getSchema();
  } finally {
    if (fileReader != null) {
      fileReader.close();
    } else {
      // The reader was never created, so the raw input must be closed directly.
      input.close();
    }
  }
}
项目:aliyun-maxcompute-data-collectors
文件:CombineFileInputFormat.java
/**
 * Registers a new pool made of the given filters.
 * A pathname belongs to the pool when it satisfies any one of the filters.
 * A split cannot have files from different pools.
 */
protected void createPool(PathFilter... filters) {
  MultiPathFilter pool = new MultiPathFilter();
  for (int i = 0; i < filters.length; i++) {
    pool.add(filters[i]);
  }
  pools.add(pool);
}
项目:aliyun-maxcompute-data-collectors
文件:CombineFileInputFormat.java
/**
 * Accepts {@code path} when at least one of the held filters accepts it;
 * evaluation stops at the first accepting filter.
 */
public boolean accept(Path path) {
  boolean matched = false;
  for (PathFilter candidate : filters) {
    if (candidate.accept(path)) {
      matched = true;
      break;
    }
  }
  return matched;
}
项目:aliyun-maxcompute-data-collectors
文件:CombineFileInputFormat.java
/**
 * Renders the held filters as {@code "[f1,f2,]"}: each filter's string form
 * followed by a comma, all wrapped in square brackets.
 */
public String toString() {
  StringBuilder text = new StringBuilder("[");
  for (PathFilter f : filters) {
    text.append(f).append(",");
  }
  return text.append("]").toString();
}
项目:hadoop
文件:FileSystemRMStateStore.java
/**
 * Lists {@code path} through {@code filter} on {@code fs}, wrapping the call in
 * an {@code FSAction} and executing it via {@code runWithRetries()}.
 */
private FileStatus[] listStatusWithRetries(final Path path,
    final PathFilter filter) throws Exception {
  FSAction<FileStatus[]> listing = new FSAction<FileStatus[]>() {
    @Override
    public FileStatus[] run() throws Exception {
      return fs.listStatus(path, filter);
    }
  };
  return listing.runWithRetries();
}
项目:hadoop
文件:JobHistoryUtils.java
/**
 * Lists {@code root} and keeps the entries whose paths {@code filter} accepts.
 * A {@code null} filter returns the full listing unchanged.
 */
private static List<FileStatus> listFilteredStatus(FileContext fc, Path root,
    PathFilter filter) throws IOException {
  List<FileStatus> everything = remoteIterToList(fc.listStatus(root));
  if (filter == null) {
    return everything;
  }
  List<FileStatus> accepted = new LinkedList<FileStatus>();
  for (FileStatus status : everything) {
    if (filter.accept(status.getPath())) {
      accepted.add(status);
    }
  }
  return accepted;
}
项目:hadoop
文件:FileInputFormat.java
/**
 * Accepts {@code path} only when every held filter accepts it;
 * evaluation stops at the first rejecting filter.
 */
public boolean accept(Path path) {
  boolean allAccepted = true;
  for (PathFilter candidate : filters) {
    if (!candidate.accept(path)) {
      allAccepted = false;
      break;
    }
  }
  return allAccepted;
}
项目:hadoop
文件:FileInputFormat.java
/**
 * Instantiates the PathFilter class configured for the input paths, if any.
 *
 * @param conf job configuration that is read for the filter class
 * @return a new instance of the configured PathFilter class, or {@code null}
 *     if none has been set.
 */
public static PathFilter getInputPathFilter(JobConf conf) {
  Class<? extends PathFilter> filterClass = conf.getClass(
      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS,
      null, PathFilter.class);
  if (filterClass == null) {
    return null;
  }
  return ReflectionUtils.newInstance(filterClass, conf);
}
项目:hadoop
文件:FileInputFormat.java
/**
 * Expands each input path as a glob and collects the matching statuses.
 *
 * <p>Matched files are added directly. Matched directories are listed and each
 * accepted child is added; when {@code recursive} is set, accepted child
 * directories are walked with {@code addInputPathRecursively} instead. Paths
 * that do not exist or match nothing are collected as errors and reported
 * together after all paths have been processed.
 *
 * @throws IOException an {@code InvalidInputException} carrying all collected
 *     errors, or a failure from the file system calls
 */
private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
    PathFilter inputFilter, boolean recursive) throws IOException {
  List<FileStatus> result = new ArrayList<FileStatus>();
  List<IOException> errors = new ArrayList<IOException>();

  for (Path p : dirs) {
    FileSystem fs = p.getFileSystem(job);
    FileStatus[] matches = fs.globStatus(p, inputFilter);
    if (matches == null) {
      errors.add(new IOException("Input path does not exist: " + p));
      continue;
    }
    if (matches.length == 0) {
      errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      continue;
    }
    for (FileStatus globStat : matches) {
      if (!globStat.isDirectory()) {
        result.add(globStat);
        continue;
      }
      RemoteIterator<LocatedFileStatus> children =
          fs.listLocatedStatus(globStat.getPath());
      while (children.hasNext()) {
        LocatedFileStatus child = children.next();
        if (!inputFilter.accept(child.getPath())) {
          continue;
        }
        if (recursive && child.isDirectory()) {
          addInputPathRecursively(result, fs, child.getPath(), inputFilter);
        } else {
          result.add(child);
        }
      }
    }
  }

  if (!errors.isEmpty()) {
    throw new InvalidInputException(errors);
  }
  return result;
}
项目:hadoop
文件:LocatedFileStatusFetcher.java
/**
 * Captures the arguments in the matching fields; nothing else is done here.
 *
 * @param fs stored in {@code this.fs}
 * @param fileStatus stored in {@code this.fileStatus}
 * @param recursive stored in {@code this.recursive}
 * @param inputFilter stored in {@code this.inputFilter}
 */
ProcessInputDirCallable(FileSystem fs, FileStatus fileStatus,
    boolean recursive, PathFilter inputFilter) {
  this.inputFilter = inputFilter;
  this.recursive = recursive;
  this.fileStatus = fileStatus;
  this.fs = fs;
}
项目:hadoop
文件:CombineFileInputFormat.java
/**
 * Registers a new pool made of the given filters.
 * A pathname belongs to the pool when it satisfies any one of the filters.
 * A split cannot have files from different pools.
 */
protected void createPool(PathFilter... filters) {
  MultiPathFilter pool = new MultiPathFilter();
  for (int i = 0; i < filters.length; i++) {
    pool.add(filters[i]);
  }
  pools.add(pool);
}
项目:hadoop
文件:CombineFileInputFormat.java
/**
 * Accepts {@code path} when at least one of the held filters accepts it;
 * evaluation stops at the first accepting filter.
 */
public boolean accept(Path path) {
  boolean matched = false;
  for (PathFilter candidate : filters) {
    if (candidate.accept(path)) {
      matched = true;
      break;
    }
  }
  return matched;
}
项目:hadoop
文件:CombineFileInputFormat.java
/**
 * Renders the held filters as {@code "[f1,f2,]"}: each filter's string form
 * followed by a comma, all wrapped in square brackets.
 */
public String toString() {
  StringBuilder text = new StringBuilder("[");
  for (PathFilter f : filters) {
    text.append(f).append(",");
  }
  return text.append("]").toString();
}