Java example source code for class org.apache.hadoop.hdfs.DistributedRaidFileSystem
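This page collects usage examples of org.apache.hadoop.hdfs.DistributedRaidFileSystem from the RAID-enabled Hadoop trees (hadoop-EAR, RDFS, mapreduce-fork). A pattern that repeats throughout the snippets is obtaining a FileSystem from a Configuration and, when the client is configured with the RAID wrapper, unwrapping the DistributedRaidFileSystem to reach the underlying DistributedFileSystem before calling DFS-only APIs. The helper below is a minimal sketch of that recurring pattern, not code from any of the listed projects; the class and method names are made up for illustration.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DistributedRaidFileSystem;

public class RaidFsExample {
  // Resolve the configured FileSystem and unwrap the RAID layer if present,
  // so that DistributedFileSystem-specific APIs can be called safely.
  public static DistributedFileSystem underlyingDfs(Configuration conf)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    if (fs instanceof DistributedRaidFileSystem) {
      fs = ((DistributedRaidFileSystem) fs).getFileSystem();
    }
    return (DistributedFileSystem) fs;
  }
}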

Project: hadoop-EAR    File: DirectoryStripeReader.java
/**
 * Helper function of Encoder.
 */
public List<List<Block>> getSrcStripes() {
  List<List<Block>> srcStripes = new ArrayList<List<Block>>();
  for (int i = 0; i < srcStripeList.size(); i++) {
    List<BlockInfo> biList = srcStripeList.get(i);
    List<Block> curSrcStripe = new ArrayList<Block>();
    for (int j = 0; j < biList.size(); j++) {
      int fileIdx = biList.get(j).fileIdx;
      int blockId = biList.get(j).blockId;
      FileStatus curFs = lfs.get(fileIdx);

      try {
        if (fs instanceof DistributedRaidFileSystem) {
          curSrcStripe.add(
              ((DistributedRaidFileSystem) fs).toDistributedFileSystem()
                  .getLocatedBlocks(curFs.getPath(), 0L, curFs.getLen())
                  .get(blockId).getBlock());
        }
      } catch (IOException e) {
        // ignore blocks whose locations cannot be fetched; skip this block
      }
    }
    srcStripes.add(curSrcStripe);
  }
  return srcStripes;
}
Project: hadoop-EAR    File: GenThread.java
public GenThread(Configuration conf, Path input, Path output,
    RunTimeConstants rtc) throws IOException {
  this.inputPath = input;
  this.outputPath = output;
  this.fs = FileSystem.newInstance(conf);
  if (fs instanceof DistributedRaidFileSystem) {
    fs = ((DistributedRaidFileSystem)fs).getFileSystem();
  }
  this.buffer = new byte[rtc.buffer_size];
  if (test_buffer_size > rtc.buffer_size) {
    test_buffer_size = rtc.buffer_size;
  }
}
Project: hadoop-EAR    File: StatisticsCollector.java
public double getEffectiveReplication() {
  if (lastRaidStatistics == null) {
    return -1;
  }
  DFSClient dfs;
  double totalPhysical;
  try {
    /* Add by RH start */
    if (FileSystem.get(conf) instanceof DistributedRaidFileSystem) {
      dfs = ((DistributedRaidFileSystem) FileSystem.get(conf)).getClient();
    } else {
      dfs = ((DistributedFileSystem) FileSystem.get(conf)).getClient();
    }
    /* Add by RH end */
    /* Commented by RH start */
    //dfs = ((DistributedFileSystem)FileSystem.get(conf)).getClient();
    /* Commented by RH end */
    totalPhysical = dfs.getNSDiskStatus().getDfsUsed();
  } catch (IOException e) {
    return -1;
  }
  double notRaidedPhysical = totalPhysical;
  double totalLogical = 0;
  for (Codec codec : Codec.getCodecs()) {
    String code = codec.id;
    Statistics st = lastRaidStatistics.get(code);
    totalLogical += st.getSourceCounters(RaidState.RAIDED).getNumLogical();
    notRaidedPhysical -= st.getSourceCounters(RaidState.RAIDED).getNumBytes();
    notRaidedPhysical -= st.getParityCounters().getNumBytes();
  }
  totalLogical += notRaidedPhysical / dfs.getDefaultReplication();
  if (totalLogical == 0) {
    // avoid division by zero
    return -1;
  }
  return totalPhysical / totalLogical;
}
Project: hadoop-EAR    File: RaidShell.java
public void purgeParity(String cmd, String[] args, int startIndex)
    throws IOException {
  if (startIndex + 1 >= args.length) {
    printUsage(cmd);
    throw new IllegalArgumentException("Insufficient arguments");
  }
  Path parityPath = new Path(args[startIndex]);
  AtomicLong entriesProcessed = new AtomicLong(0);
  System.err.println("Starting recursive purge of " + parityPath);

  Codec codec = Codec.getCodec(args[startIndex + 1]);
  FileSystem srcFs = parityPath.getFileSystem(conf);
  if (srcFs instanceof DistributedRaidFileSystem) {
    srcFs = ((DistributedRaidFileSystem)srcFs).getFileSystem();
  }
  FileSystem parityFs = srcFs;
  String parityPrefix = codec.parityDirectory;
  DirectoryTraversal obsoleteParityFileRetriever =
    new DirectoryTraversal(
      "Purge File ",
      java.util.Collections.singletonList(parityPath),
      parityFs,
      new PurgeMonitor.PurgeParityFileFilter(conf, codec, null,
          srcFs, parityFs,
        parityPrefix, null, entriesProcessed),
      1,
      false);
  FileStatus obsolete = null;
  while ((obsolete = obsoleteParityFileRetriever.next()) !=
            DirectoryTraversal.FINISH_TOKEN) {
    PurgeMonitor.performDelete(parityFs, obsolete.getPath(), false);
  }
}
Project: hadoop-EAR    File: RaidUtils.java
/**
 * gets the parity blocks corresponding to file
 * returns the parity blocks in case of DFS
 * and the part blocks containing parity blocks
 * in case of HAR FS
 */
private static BlockLocation[] getParityBlocks(final Path filePath,
                                        final long blockSize,
                                        final long numStripes,
                                        final RaidInfo raidInfo) 
  throws IOException {
  FileSystem parityFS = raidInfo.parityPair.getFileSystem();

  // get parity file metadata
  FileStatus parityFileStatus = raidInfo.parityPair.getFileStatus(); 
  long parityFileLength = parityFileStatus.getLen();

  if (parityFileLength != numStripes * raidInfo.parityBlocksPerStripe *
      blockSize) {
    throw new IOException("expected parity file of length" + 
                          (numStripes * raidInfo.parityBlocksPerStripe *
                           blockSize) +
                          " but got parity file of length " + 
                          parityFileLength);
  }

  BlockLocation[] parityBlocks = 
    parityFS.getFileBlockLocations(parityFileStatus, 0L, parityFileLength);

  if (parityFS instanceof DistributedFileSystem ||
      parityFS instanceof DistributedRaidFileSystem) {
    long parityBlockSize = parityFileStatus.getBlockSize();
    if (parityBlockSize != blockSize) {
      throw new IOException("file block size is " + blockSize + 
                            " but parity file block size is " + 
                            parityBlockSize);
    }
  } else if (parityFS instanceof HarFileSystem) {
    LOG.debug("HAR FS found");
  } else {
    LOG.warn("parity file system is not of a supported type");
  }

  return parityBlocks;
}
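The length check in getParityBlocks above is a plain consistency test: the parity file must contain exactly parityBlocksPerStripe blocks for every stripe of the source file. The sketch below just spells out that arithmetic with made-up numbers; none of the values come from the projects on this page.

// Illustrative values only (hypothetical).
long blockSize = 64L * 1024 * 1024;          // 64 MB source block size
long numStripes = 10;                        // stripes in the source file
long parityBlocksPerStripe = 2;              // codec-dependent
long expectedParityLength = numStripes * parityBlocksPerStripe * blockSize;
// getParityBlocks throws an IOException unless the parity file is exactly
// this long: 10 * 2 * 64 MB = 1,342,177,280 bytes.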
Project: hadoop-EAR    File: TestDirectoryReadConstruction.java
private DistributedRaidFileSystem getRaidFS() throws IOException {
  DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
  Configuration clientConf = new Configuration(conf);
  clientConf.set("fs.hdfs.impl", 
      "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
  clientConf.set("fs.raid.underlyingfs.impl", 
      "org.apache.hadoop.hdfs.DistributedFileSystem");
  clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
  URI dfsUri = dfs.getUri();
  return (DistributedRaidFileSystem)FileSystem.get(dfsUri, clientConf);
}
Project: hadoop-EAR    File: TestParityMovement.java
private DistributedRaidFileSystem getRaidFS() throws IOException {
  DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
  Configuration clientConf = new Configuration(conf);
  clientConf.set("fs.hdfs.impl", 
           "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
  clientConf.set("fs.raid.underlyingfs.impl", 
           "org.apache.hadoop.hdfs.DistributedFileSystem");
  clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
  URI dfsUri = dfs.getUri();
  return (DistributedRaidFileSystem)FileSystem.get(dfsUri, clientConf);
}
Project: hadoop-EAR    File: TestParityMovement.java
public void testRenameHar() throws Exception {
  try {
    mySetup("xor", 1);

    Path[] testPathList = new Path[] {
        new Path("/user/dikang/raidtest/rename/f1"),
        new Path("/user/dikang/raidtest/rename/f2"),
        new Path("/user/dikang/raidtest/rename/f3")};

    Path destHarPath = new Path("/destraid/user/dikang/raidtest/rename");

    DistributedRaidFileSystem raidFs = getRaidFS();
    for (Path srcPath : testPathList) {
      TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 
          1, 8, 8192L);
    }

    raidFs.mkdirs(destHarPath);
    raidFs.mkdirs(new Path(destHarPath, "rename" + RaidNode.HAR_SUFFIX));

    raidFs.rename(new Path("/user/dikang/raidtest"), 
        new Path("/user/dikang/raidtest1"));
    fail("Expected fail for HAR rename");
  } catch (IOException ie) {
    String message = ie.getMessage();
    assertTrue(message.contains("HAR dir"));
  } finally {
    stopCluster();
  }
}
Project: hadoop-EAR    File: TestDirectoryRaidParityMovement.java
private DistributedRaidFileSystem getRaidFS() throws IOException {
  DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
  Configuration clientConf = new Configuration(conf);
  clientConf.set("fs.hdfs.impl", 
           "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
  clientConf.set("fs.raid.underlyingfs.impl", 
           "org.apache.hadoop.hdfs.DistributedFileSystem");
  clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
  URI dfsUri = dfs.getUri();
  return (DistributedRaidFileSystem)FileSystem.get(dfsUri, clientConf);
}
Project: hadoop-EAR    File: TestDirectoryRaidParityMovement.java
public void testRename() throws Exception {
  try {

    long[] crcs = new long[3];
    int[] seeds = new int[3];
    short repl = 1;
    Path dirPath = new Path("/user/dikang/raidtest");

    mySetup();
    DistributedRaidFileSystem raidFs = getRaidFS();
    Path[] files = TestRaidDfs.createTestFiles(dirPath,
        fileSizes, blockSizes, crcs, seeds, fileSys, (short)1);
    FileStatus stat = raidFs.getFileStatus(dirPath); 
    Codec codec = Codec.getCodec("dir-rs");
    RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec,
        new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
      false, repl, repl);

    Path destPath = new Path("/user/dikang/raidtest_new");

    assertTrue(raidFs.exists(dirPath));
    assertFalse(raidFs.exists(destPath));

    ParityFilePair parity = ParityFilePair.getParityFile(
        codec, stat, conf);
    Path srcParityPath = parity.getPath();
    assertTrue(raidFs.exists(srcParityPath));
    // do the rename file
    assertTrue(raidFs.rename(dirPath, destPath));
    // verify the results.
    assertFalse(raidFs.exists(dirPath));
    assertTrue(raidFs.exists(destPath));
    assertFalse(raidFs.exists(srcParityPath));
    FileStatus srcDest = raidFs.getFileStatus(destPath);
    parity = ParityFilePair.getParityFile(codec, srcDest, conf);
    assertTrue(raidFs.exists(parity.getPath()));
  } finally {
    stopCluster();
  }
}
Project: hadoop-EAR    File: TestDirectoryRaidParityMovement.java
public void testDeleteOneFile() throws Exception {
  try {
    long[] crcs = new long[3];
    int[] seeds = new int[3];
    short repl = 1;
    Path dirPath = new Path("/user/dikang/raidtest");

    mySetup();
    DistributedRaidFileSystem raidFs = getRaidFS();
    Path[] files = TestRaidDfs.createTestFiles(dirPath,
        fileSizes, blockSizes, crcs, seeds, fileSys, (short)1);
    FileStatus stat = raidFs.getFileStatus(dirPath); 
    Codec codec = Codec.getCodec("dir-rs");
    RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec,
        new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
      false, repl, repl);

    ParityFilePair parity = ParityFilePair.getParityFile(
        codec, stat, conf);
    Path srcParityPath = parity.getPath();
    assertTrue(raidFs.exists(srcParityPath));

    // delete one file
    assertTrue(raidFs.delete(files[0]));
    // verify the results
    assertFalse(raidFs.exists(files[0]));
    // we still have the parity file
    assertTrue(raidFs.exists(srcParityPath));

    // delete the remaining files
    assertTrue(raidFs.delete(files[1]));
    assertTrue(raidFs.delete(files[2]));

    // we will not touch the parity file.
    assertTrue(raidFs.exists(srcParityPath));
  } finally {
    stopCluster();
  }
}
Project: hadoop-EAR    File: TestDirectoryRaidParityMovement.java
public void testDeleteDirRaidedFile() throws Exception {

  long[] crcs = new long[3];
  int[] seeds = new int[3];
  short repl = 1;
  Path dirPath = new Path("/user/dikang/raidtest");

  mySetup();
  //disable trash
  conf.setInt("fs.trash.interval", 0);

  DistributedRaidFileSystem raidFs = getRaidFS();
  Path[] files = TestRaidDfs.createTestFiles(dirPath,
      fileSizes, blockSizes, crcs, seeds, fileSys, (short)1);
  FileStatus stat = raidFs.getFileStatus(dirPath); 
  Codec codec = Codec.getCodec("dir-rs");
  RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec,
      new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
    false, repl, repl);

  try {
    raidFs.delete(files[0]);
    fail();
  } catch (Exception ex) {
    LOG.warn("Excepted error: " + ex.getMessage(), ex);
  } finally {
    stopCluster();
  }
}
Project: hadoop-EAR    File: TestDirectoryRaidDfs.java
static private DistributedRaidFileSystem getRaidFS(FileSystem fileSys,
    Configuration conf)
    throws IOException {
  DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
  Configuration clientConf = new Configuration(conf);
  clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
  clientConf.set("fs.raid.underlyingfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
  clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
  URI dfsUri = dfs.getUri();
  return (DistributedRaidFileSystem)FileSystem.get(dfsUri, clientConf);
}
Project: hadoop-EAR    File: TestReadConstruction.java
private DistributedRaidFileSystem getRaidFS() throws IOException {
  DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
  Configuration clientConf = new Configuration(conf);
  clientConf.set("fs.hdfs.impl", 
      "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
  clientConf.set("fs.raid.underlyingfs.impl", 
      "org.apache.hadoop.hdfs.DistributedFileSystem");
  clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
  URI dfsUri = dfs.getUri();
  return (DistributedRaidFileSystem)FileSystem.get(dfsUri, clientConf);
}
Project: RDFS    File: GenThread.java
public GenThread(Configuration conf, Path input, Path output,
    RunTimeConstants rtc) throws IOException {
  this.inputPath = input;
  this.outputPath = output;
  this.fs = FileSystem.newInstance(conf);
  if (fs instanceof DistributedRaidFileSystem) {
    fs = ((DistributedRaidFileSystem)fs).getFileSystem();
  }
  this.buffer = new byte[rtc.buffer_size];
  if (test_buffer_size > rtc.buffer_size) {
    test_buffer_size = rtc.buffer_size;
  }
}
Project: RDFS    File: RaidShell.java
public void purgeParity(String cmd, String[] args, int startIndex)
    throws IOException {
  if (startIndex + 1 >= args.length) {
    printUsage(cmd);
    throw new IllegalArgumentException("Insufficient arguments");
  }
  Path parityPath = new Path(args[startIndex]);
  AtomicLong entriesProcessed = new AtomicLong(0);
  System.err.println("Starting recursive purge of " + parityPath);

  Codec codec = Codec.getCodec(args[startIndex + 1]);
  FileSystem srcFs = parityPath.getFileSystem(conf);
  if (srcFs instanceof DistributedRaidFileSystem) {
    srcFs = ((DistributedRaidFileSystem)srcFs).getFileSystem();
  }
  FileSystem parityFs = srcFs;
  String parityPrefix = codec.parityDirectory;
  DirectoryTraversal obsoleteParityFileRetriever =
    new DirectoryTraversal(
      "Purge File ",
      java.util.Collections.singletonList(parityPath),
      parityFs,
      new PurgeMonitor.PurgeParityFileFilter(conf, codec, srcFs, parityFs,
        parityPrefix, null, entriesProcessed),
      1,
      false);
  FileStatus obsolete = null;
  while ((obsolete = obsoleteParityFileRetriever.next()) !=
            DirectoryTraversal.FINISH_TOKEN) {
    PurgeMonitor.performDelete(parityFs, obsolete.getPath(), false);
  }
}
Project: RDFS    File: TestParityMovement.java
private DistributedRaidFileSystem getRaidFS() throws IOException {
  DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
  Configuration clientConf = new Configuration(conf);
  clientConf.set("fs.hdfs.impl", 
           "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
  clientConf.set("fs.raid.underlyingfs.impl", 
           "org.apache.hadoop.hdfs.DistributedFileSystem");
  clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
  URI dfsUri = dfs.getUri();
  return (DistributedRaidFileSystem)FileSystem.get(dfsUri, clientConf);
}
Project: RDFS    File: TestParityMovement.java
public void testRenameHar() throws Exception {
  try {
    mySetup("xor", 1);

    Path[] testPathList = new Path[] {
        new Path("/user/dikang/raidtest/rename/f1"),
        new Path("/user/dikang/raidtest/rename/f2"),
        new Path("/user/dikang/raidtest/rename/f3")};

    Path destHarPath = new Path("/destraid/user/dikang/raidtest/rename");

    DistributedRaidFileSystem raidFs = getRaidFS();
    for (Path srcPath : testPathList) {
      TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 
          1, 8, 8192L);
    }

    raidFs.mkdirs(destHarPath);
    raidFs.mkdirs(new Path(destHarPath, "rename" + RaidNode.HAR_SUFFIX));

    raidFs.rename(new Path("/user/dikang/raidtest"), 
        new Path("/user/dikang/raidtest1"));
    fail("Expected fail for HAR rename");
  } catch (IOException ie) {
    String message = ie.getMessage();
    assertTrue(message.contains("HAR dir"));
  } finally {
    stopCluster();
  }
}
Project: RDFS    File: TestDirectoryRaidDfs.java
static private DistributedRaidFileSystem getRaidFS(FileSystem fileSys,
    Configuration conf)
    throws IOException {
  DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
  Configuration clientConf = new Configuration(conf);
  clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
  clientConf.set("fs.raid.underlyingfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
  clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
  URI dfsUri = dfs.getUri();
  return (DistributedRaidFileSystem)FileSystem.get(dfsUri, clientConf);
}
Project: RDFS    File: TestReadConstruction.java
private DistributedRaidFileSystem getRaidFS() throws IOException {
  DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
  Configuration clientConf = new Configuration(conf);
  clientConf.set("fs.hdfs.impl", 
      "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
  clientConf.set("fs.raid.underlyingfs.impl", 
      "org.apache.hadoop.hdfs.DistributedFileSystem");
  clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
  URI dfsUri = dfs.getUri();
  return (DistributedRaidFileSystem)FileSystem.get(dfsUri, clientConf);
}
Project: hadoop-EAR    File: DatanodeBenThread.java
private static DistributedFileSystem getDFS(FileSystem fs)
  throws IOException {
  if (fs instanceof DistributedRaidFileSystem)
    fs = ((DistributedRaidFileSystem)fs).getFileSystem();
  return (DistributedFileSystem)fs;
}
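Helpers like getDFS above let callers use DistributedFileSystem-only APIs whether or not the client goes through the RAID wrapper. The following is a small usage sketch, not code from the project: conf and the test path are placeholders, and the corrupt-file listing mirrors what RaidShell.checkFile does below.

// Usage sketch; conf and the test path are assumptions, not from the source.
Path p = new Path("/user/dikang/raidtest");
FileSystem fs = p.getFileSystem(conf);
DistributedFileSystem dfs = getDFS(fs);  // unwraps DistributedRaidFileSystem if needed
RemoteIterator<Path> corrupt = dfs.listCorruptFileBlocks(p);
while (corrupt.hasNext()) {
  System.out.println("corrupt: " + corrupt.next());
}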
Project: hadoop-EAR    File: RaidShell.java
public void checkFile(String cmd, String[] args, int startIndex)
    throws IOException {
  if (startIndex >= args.length) {
    printUsage(cmd);
    throw new IllegalArgumentException("Insufficient arguments");
  }
  for (int i = startIndex; i < args.length; i++) {
    Path p = new Path(args[i]);
    FileSystem fs = p.getFileSystem(conf);
    // if we got a raid fs, get the underlying fs 
    if (fs instanceof DistributedRaidFileSystem) {
      fs = ((DistributedRaidFileSystem) fs).getFileSystem();
    }
    // We should be able to cast at this point.
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    RemoteIterator<Path> corruptIt = dfs.listCorruptFileBlocks(p);
    int count = 0;
    while (corruptIt.hasNext()) {
      count++;
      Path corruptFile = corruptIt.next();
      // Result of checking.
      String result = null;
      FileStatus stat = fs.getFileStatus(corruptFile);
      if (stat.getReplication() < fs.getDefaultReplication()) {
        RaidInfo raidInfo = RaidUtils.getFileRaidInfo(stat, conf);
        if (raidInfo.codec == null) {
          result = "Below default replication but no parity file found";
        } else {
          boolean notRecoverable = isFileCorrupt(dfs, stat);
          if (notRecoverable) {
            result = "Missing too many blocks to be recovered " + 
              "using parity file " + raidInfo.parityPair.getPath();
          } else {
            result = "Has missing blocks but can be read using parity file " +
              raidInfo.parityPair.getPath();
          }
        }
      } else {
        result = "At default replication, not raided";
      }
      out.println("Result of checking " + corruptFile + " : " +
        result);
    }
    out.println("Found " + count + " files with missing blocks");
  }
}
Project: hadoop-EAR    File: TestParityMovement.java
public void testRename() throws Exception {
  try {
    mySetup("xor", 1);

    Path srcPath = new Path("/user/dhruba/raidtest/rename/f1");
    Path destPath = new Path("/user/dhruba/raidtest/rename/f2");

    Path srcPath2 = new Path("/user/dhruba/raidtest/rename/f3");
    Path destDirPath = new Path("/user/dhruba/raidtest/rename2");
    Path destPath2 = new Path("/user/dhruba/raidtest/rename2/f3");

    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 
                          1, 8, 8192L);
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath2, 
                          1, 8, 8192L);
    DistributedRaidFileSystem raidFs = getRaidFS();
    assertTrue(raidFs.exists(srcPath));
    assertFalse(raidFs.exists(destPath));
    // generate the parity files.
    doRaid(srcPath, Codec.getCodec("xor"));
    doRaid(srcPath2, Codec.getCodec("xor"));

    FileStatus srcStat = fileSys.getFileStatus(srcPath); 
    ParityFilePair parity = ParityFilePair.getParityFile(
        Codec.getCodec("xor"), 
        srcStat, 
        conf);
    Path srcParityPath = parity.getPath();
    assertTrue(raidFs.exists(srcParityPath));
    assertFalse(raidFs.exists(destPath));

    // do the rename file
    assertTrue(raidFs.rename(srcPath, destPath));
    // verify the results.
    assertFalse(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(destPath));
    assertFalse(raidFs.exists(srcParityPath));
    FileStatus srcDest = fileSys.getFileStatus(destPath);
    parity = ParityFilePair.getParityFile(Codec.getCodec("xor"),
        srcDest,
        conf);
    assertTrue(raidFs.exists(parity.getPath()));

    // rename the dir
    assertFalse(raidFs.exists(destDirPath));
    assertTrue(raidFs.rename(srcPath2.getParent(), destDirPath));
    // verify the results.
    assertFalse(raidFs.exists(srcPath2.getParent()));
    assertTrue(raidFs.exists(destDirPath));
    FileStatus srcDest2 = fileSys.getFileStatus(destPath2);
    parity = ParityFilePair.getParityFile(Codec.getCodec("xor"),
        srcDest2,
        conf);
    assertTrue(raidFs.exists(parity.getPath()));

    // try to rename a non-existent file.
    Path notExistedPath = new Path("/user/dhruba/raidtest/raidnotexist");
    Path notExistedPath2 = new Path("/user/dhruba/raidtest/raidnotexist2");
    assertFalse(raidFs.rename(notExistedPath, notExistedPath2));
  } finally {
    stopCluster();
  }
}
Project: hadoop-EAR    File: TestParityMovement.java
public void testIgnoreCheckingParity() throws Exception {
  try {
    mySetup("xor", 1);
    Path srcPath = new Path("/tmp/raidtest/delete/f1");
    Path srcPath2 = new Path("/tmp/raidtest/rename/f2");
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 
        1, 8, 8192L);
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath2, 
        1, 8, 8192L);
    DistributedRaidFileSystem raidFs = getRaidFS();
    assertTrue(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(srcPath2));

    // generate the parity files.
    doRaid(srcPath, Codec.getCodec("xor"));
    doRaid(srcPath2, Codec.getCodec("xor"));

    FileStatus srcStat = fileSys.getFileStatus(srcPath);
    FileStatus srcStat2 = fileSys.getFileStatus(srcPath2);
    ParityFilePair parity = ParityFilePair.getParityFile(
        Codec.getCodec("xor"),
        srcStat, 
        conf);
    Path srcParityPath = parity.getPath();
    ParityFilePair parity2 = ParityFilePair.getParityFile(
        Codec.getCodec("xor"),
        srcStat2, 
        conf);
    Path srcParityPath2 = parity2.getPath();
    assertTrue(raidFs.exists(srcParityPath));
    assertTrue(raidFs.exists(srcParityPath2));

    // test delete file
    raidFs.delete(srcPath);
    // verify we did not delete the parityPath
    assertFalse(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(srcParityPath));

    // test rename file
    raidFs.rename(srcPath2, new Path("/tmp/raidtest/rename/f3"));
    // verify we did not rename the parityPath
    assertFalse(raidFs.exists(srcPath2));
    assertTrue(raidFs.exists(srcParityPath2));
  } finally {
    stopCluster();
  }
}
Project: hadoop-EAR    File: TestParityMovement.java
public void testIgnoreCheckingParity2() throws Exception {
  try {
    mySetup("xor", 1);
    conf.set(DistributedRaidFileSystem.DIRECTORIES_IGNORE_PARITY_CHECKING_KEY,
        "/ignore1/test/,/ignore2/test/");
    Path srcPath = new Path("/ignore1/test/rename/f1");
    Path srcPath2 = new Path("/ignore2/test/rename/f1");
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 
        1, 8, 8192L);
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath2, 
        1, 8, 8192L);
    DistributedRaidFileSystem raidFs = getRaidFS();
    assertTrue(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(srcPath2));

    // generate the parity files.
    doRaid(srcPath, Codec.getCodec("xor"));
    doRaid(srcPath2, Codec.getCodec("xor"));

    FileStatus srcStat = fileSys.getFileStatus(srcPath);
    FileStatus srcStat2 = fileSys.getFileStatus(srcPath2);
    ParityFilePair parity = ParityFilePair.getParityFile(
        Codec.getCodec("xor"),
        srcStat, 
        conf);
    Path srcParityPath = parity.getPath();
    ParityFilePair parity2 = ParityFilePair.getParityFile(
        Codec.getCodec("xor"),
        srcStat2, 
        conf);
    Path srcParityPath2 = parity2.getPath();
    assertTrue(raidFs.exists(srcParityPath));
    assertTrue(raidFs.exists(srcParityPath2));

    // test rename files
    raidFs.rename(srcPath, new Path("/ignore1/test/rename/f2"));
    raidFs.rename(srcPath2, new Path("/ignore2/test/rename/f2"));
    // verify we did not rename the parityPath
    assertFalse(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(srcParityPath));
    assertFalse(raidFs.exists(srcPath2));
    assertTrue(raidFs.exists(srcParityPath2));
  } finally {
    stopCluster();
  }
}
Project: hadoop-EAR    File: TestDirectoryRaidParityMovement.java
public void testRenameOneFile() throws Exception {
  try {
    long[] crcs = new long[3];
    int[] seeds = new int[3];
    short repl = 1;
    Path dirPath = new Path("/user/dikang/raidtest");

    mySetup();
    DistributedRaidFileSystem raidFs = getRaidFS();
    Path[] files = TestRaidDfs.createTestFiles(dirPath,
        fileSizes, blockSizes, crcs, seeds, fileSys, (short)1);
    FileStatus stat = raidFs.getFileStatus(dirPath); 
    Codec codec = Codec.getCodec("dir-rs");
    RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec,
        new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
      false, repl, repl);

    Path destPath = new Path("/user/dikang/raidtest_new");

    assertTrue(raidFs.exists(dirPath));
    assertFalse(raidFs.exists(destPath));

    ParityFilePair parity = ParityFilePair.getParityFile(
        codec, stat, conf);
    Path srcParityPath = parity.getPath();
    assertTrue(raidFs.exists(srcParityPath));

    raidFs.mkdirs(destPath);
    // do the rename file
    assertTrue(raidFs.rename(files[0], new Path(destPath, "file0")));
    // verify the results.
    assertFalse(raidFs.exists(files[0]));
    assertTrue(raidFs.exists(new Path(destPath, "file0")));
    assertTrue(raidFs.exists(srcParityPath));

    // rename the remaining files
    assertTrue(raidFs.rename(files[1], new Path(destPath, "file1")));
    assertTrue(raidFs.rename(files[2], new Path(destPath, "file2")));
    assertFalse(raidFs.exists(srcParityPath));

    Path newParityPath = new Path(codec.parityDirectory, 
        "user/dikang/raidtest_new");
    assertTrue(raidFs.exists(newParityPath));
  } finally {
    stopCluster();
  }
}
Project: hadoop-EAR    File: TestDirectoryRaidParityMovement.java
public void testDeleteAndUndelete() throws Exception {
  try {
    long[] crcs = new long[3];
    int[] seeds = new int[3];
    short repl = 1;
    Path dirPath = new Path("/user/dikang/raidtest");

    mySetup();
    DistributedRaidFileSystem raidFs = getRaidFS();
    Path[] files = TestRaidDfs.createTestFiles(dirPath,
        fileSizes, blockSizes, crcs, seeds, fileSys, (short)1);
    FileStatus stat = raidFs.getFileStatus(dirPath); 
    Codec codec = Codec.getCodec("dir-rs");
    RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec,
        new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
      false, repl, repl);

    ParityFilePair parity = ParityFilePair.getParityFile(
        codec, stat, conf);
    Path srcParityPath = parity.getPath();
    assertTrue(raidFs.exists(srcParityPath));

    // do the delete file
    assertTrue(raidFs.delete(dirPath));

    // verify the results.
    assertFalse(raidFs.exists(dirPath));
    assertFalse(raidFs.exists(srcParityPath));

    // do the undelete using a non-existent userName
    String nonExistedUser = UUID.randomUUID().toString();
    assertFalse(raidFs.undelete(dirPath, nonExistedUser));

    // verify the results
    assertFalse(raidFs.exists(dirPath));
    assertFalse(raidFs.exists(srcParityPath));

    // do the undelete file using current userName
    assertTrue(raidFs.undelete(dirPath, null));
    //verify the results.
    assertTrue(raidFs.exists(dirPath));
    assertTrue(raidFs.exists(srcParityPath));
  } finally {
    stopCluster();
  }
}
Project: hadoop-EAR    File: TestReadConstruction.java
private boolean doThePartialTest(Codec codec,
                              int blockNum,
                              int[] corruptBlockIdxs) throws Exception {
  long blockSize = 8192 * 1024L;
  int bufferSize = 4192 * 1024;

  Path srcPath = new Path("/user/dikang/raidtest/file" + 
                          UUID.randomUUID().toString());

  long crc = TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 
      1, blockNum, blockSize);

  DistributedRaidFileSystem raidFs = getRaidFS();
  assertTrue(raidFs.exists(srcPath));

  // generate the parity files.
  doRaid(srcPath, codec);

  FileStatus file1Stat = fileSys.getFileStatus(srcPath);
  long length = file1Stat.getLen();
  LocatedBlocks file1Loc =
      RaidDFSUtil.getBlockLocations((DistributedFileSystem)fileSys, 
          srcPath.toUri().getPath(),
          0, length);
  // corrupt file1

  for (int idx: corruptBlockIdxs) {
    corruptBlock(file1Loc.get(idx).getBlock(), 
                              dfs);
  }
  RaidDFSUtil.reportCorruptBlocks((DistributedFileSystem)fileSys, srcPath,
                       corruptBlockIdxs, blockSize);

  // verify the partial read
  byte[] buffer = new byte[bufferSize];
  FSDataInputStream in = raidFs.open(srcPath);

  long numRead = 0;
  CRC32 newcrc = new CRC32();

  int num = 0;
  while (num >= 0) {
    num = in.read(numRead, buffer, 0, bufferSize);
    if (num < 0) {
      break;
    }
    numRead += num;
    newcrc.update(buffer, 0, num);
  }
  in.close();

  if (numRead != length) {
    LOG.info("Number of bytes read " + numRead +
             " does not match file size " + length);
    return false;
  }

  LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc);
  if (newcrc.getValue() != crc) {
    LOG.info("CRC mismatch of file " + srcPath.toUri().getPath() + ": " + 
              newcrc.getValue() + " vs. " + crc);
    return false;
  }
  return true;
}
Project: RDFS    File: DatanodeBenThread.java
private static DistributedFileSystem getDFS(FileSystem fs)
  throws IOException {
  if (fs instanceof DistributedRaidFileSystem)
    fs = ((DistributedRaidFileSystem)fs).getFileSystem();
  return (DistributedFileSystem)fs;
}
Project: RDFS    File: RaidShell.java
/**
 * gets the parity blocks corresponding to file
 * returns the parity blocks in case of DFS
 * and the part blocks containing parity blocks
 * in case of HAR FS
 */
private BlockLocation[] getParityBlocks(final Path filePath,
                                        final long blockSize,
                                        final long numStripes,
                                        final RaidInfo raidInfo) 
  throws IOException {


  final String parityPathStr = raidInfo.parityPair.getPath().toUri().
    getPath();
  FileSystem parityFS = raidInfo.parityPair.getFileSystem();

  // get parity file metadata
  FileStatus parityFileStatus = parityFS.
    getFileStatus(new Path(parityPathStr));
  long parityFileLength = parityFileStatus.getLen();

  if (parityFileLength != numStripes * raidInfo.parityBlocksPerStripe *
      blockSize) {
    throw new IOException("expected parity file of length" + 
                          (numStripes * raidInfo.parityBlocksPerStripe *
                           blockSize) +
                          " but got parity file of length " + 
                          parityFileLength);
  }

  BlockLocation[] parityBlocks = 
    parityFS.getFileBlockLocations(parityFileStatus, 0L, parityFileLength);

  if (parityFS instanceof DistributedFileSystem ||
      parityFS instanceof DistributedRaidFileSystem) {
    long parityBlockSize = parityFileStatus.getBlockSize();
    if (parityBlockSize != blockSize) {
      throw new IOException("file block size is " + blockSize + 
                            " but parity file block size is " + 
                            parityBlockSize);
    }
  } else if (parityFS instanceof HarFileSystem) {
    LOG.debug("HAR FS found");
  } else {
    LOG.warn("parity file system is not of a supported type");
  }

  return parityBlocks;
}
Project: RDFS    File: RaidShell.java
public void checkFile(String cmd, String[] args, int startIndex)
    throws IOException {
  if (startIndex >= args.length) {
    printUsage(cmd);
    throw new IllegalArgumentException("Insufficient arguments");
  }
  for (int i = startIndex; i < args.length; i++) {
    Path p = new Path(args[i]);
    FileSystem fs = p.getFileSystem(conf);
    // if we got a raid fs, get the underlying fs 
    if (fs instanceof DistributedRaidFileSystem) {
      fs = ((DistributedRaidFileSystem) fs).getFileSystem();
    }
    // We should be able to cast at this point.
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    RemoteIterator<Path> corruptIt = dfs.listCorruptFileBlocks(p);
    int count = 0;
    while (corruptIt.hasNext()) {
      count++;
      Path corruptFile = corruptIt.next();
      // Result of checking.
      String result = null;
      FileStatus stat = fs.getFileStatus(p);
      if (stat.getReplication() < fs.getDefaultReplication()) {
        RaidInfo raidInfo = getFileRaidInfo(corruptFile);
        if (raidInfo.codec == null) {
          result = "Below default replication but no parity file found";
        } else {
          boolean notRecoverable = isFileCorrupt(dfs, corruptFile);
          if (notRecoverable) {
            result = "Missing too many blocks to be recovered " + 
              "using parity file " + raidInfo.parityPair.getPath();
          } else {
            result = "Has missing blocks but can be read using parity file " +
              raidInfo.parityPair.getPath();
          }
        }
      } else {
        result = "At default replication, not raided";
      }
      out.println("Result of checking " + corruptFile + " : " +
        result);
    }
    out.println("Found " + count + " files with missing blocks");
  }
}
Project: RDFS    File: TestParityMovement.java
public void testRename() throws Exception {
  try {
    mySetup("xor", 1);

    Path srcPath = new Path("/user/dhruba/raidtest/rename/f1");
    Path destPath = new Path("/user/dhruba/raidtest/rename/f2");

    Path srcPath2 = new Path("/user/dhruba/raidtest/rename/f3");
    Path destDirPath = new Path("/user/dhruba/raidtest/rename2");
    Path destPath2 = new Path("/user/dhruba/raidtest/rename2/f3");

    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 
                          1, 8, 8192L);
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath2, 
                          1, 8, 8192L);
    DistributedRaidFileSystem raidFs = getRaidFS();
    assertTrue(raidFs.exists(srcPath));
    assertFalse(raidFs.exists(destPath));
    // generate the parity files.
    doRaid(srcPath, Codec.getCodec("xor"));
    doRaid(srcPath2, Codec.getCodec("xor"));
    ParityFilePair parity = ParityFilePair.getParityFile(
        Codec.getCodec("xor"), 
        srcPath, 
        conf);
    Path srcParityPath = parity.getPath();
    assertTrue(raidFs.exists(srcParityPath));
    parity = ParityFilePair.getParityFile(Codec.getCodec("xor"),
        destPath,
        conf);
    assertNull(parity);
    // do the rename file
    assertTrue(raidFs.rename(srcPath, destPath));
    // verify the results.
    assertFalse(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(destPath));
    assertFalse(raidFs.exists(srcParityPath));
    parity = ParityFilePair.getParityFile(Codec.getCodec("xor"),
        destPath,
        conf);
    assertTrue(raidFs.exists(parity.getPath()));

    // rename the dir
    assertFalse(raidFs.exists(destDirPath));
    assertTrue(raidFs.rename(srcPath2.getParent(), destDirPath));
    // verify the results.
    assertFalse(raidFs.exists(srcPath2.getParent()));
    assertTrue(raidFs.exists(destDirPath));

    parity = ParityFilePair.getParityFile(Codec.getCodec("xor"),
        destPath2,
        conf);
    assertTrue(raidFs.exists(parity.getPath()));
  } finally {
    stopCluster();
  }
}
Project: RDFS    File: TestParityMovement.java
public void testDeleteAndUndelete() throws Exception {
  try {
    mySetup("xor", 1);

    Path srcPath = new Path("/user/dhruba/raidtest/rename/f1");
    Path srcPath2 = new Path("/user/dhruba/raidtest/rename/f2");

    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 
        1, 8, 8192L);
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath2, 
        1, 8, 8192L);
    DistributedRaidFileSystem raidFs = getRaidFS();
    assertTrue(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(srcPath2));

    // generate the parity files.
    doRaid(srcPath, Codec.getCodec("xor"));
    doRaid(srcPath2, Codec.getCodec("xor"));
    ParityFilePair parity = ParityFilePair.getParityFile(
        Codec.getCodec("xor"),
        srcPath, 
        conf);
    Path srcParityPath = parity.getPath();
    ParityFilePair parity2 = ParityFilePair.getParityFile(
        Codec.getCodec("xor"),
        srcPath2, 
        conf);
    Path srcParityPath2 = parity2.getPath();
    assertTrue(raidFs.exists(srcParityPath));
    assertTrue(raidFs.exists(srcParityPath2));

    // do the delete file
    assertTrue(raidFs.delete(srcPath));

    // verify the results.
    assertFalse(raidFs.exists(srcPath));
    assertFalse(raidFs.exists(srcParityPath));
    assertTrue(raidFs.exists(srcParityPath2));

    // do the undelete using a non-existent userName
    String nonExistedUser = UUID.randomUUID().toString();
    assertFalse(raidFs.undelete(srcPath, nonExistedUser));

    // verify the results
    assertFalse(raidFs.exists(srcPath));
    assertFalse(raidFs.exists(srcParityPath));
    assertTrue(raidFs.exists(srcParityPath2));

    // do the undelete file using current userName
    assertTrue(raidFs.undelete(srcPath, null));
    //verify the results.
    assertTrue(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(srcParityPath));
    assertTrue(raidFs.exists(srcParityPath2));

    // delete the dir
    assertTrue(raidFs.delete(srcPath2.getParent()));
    // verify the results.
    assertFalse(raidFs.exists(srcPath2.getParent()));
    assertFalse(raidFs.exists(srcParityPath));
    assertFalse(raidFs.exists(srcParityPath2));
    assertFalse(raidFs.exists(srcParityPath.getParent()));

    // undelete the dir
    assertTrue(raidFs.undelete(srcPath2.getParent(), null));
    // verify the results.
    assertTrue(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(srcPath2));
    assertTrue(raidFs.exists(srcParityPath));
    assertTrue(raidFs.exists(srcParityPath2));
  } finally {
    stopCluster();
  }
}
Project: RDFS    File: TestReadConstruction.java
private boolean doThePartialTest(Codec codec,
                              int blockNum,
                              int[] corruptBlockIdxs) throws Exception {
  long blockSize = 8192L;
  int bufferSize = 4192;

  Path srcPath = new Path("/user/dikang/raidtest/file" + 
                          UUID.randomUUID().toString());

  long crc = TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 
      1, blockNum, blockSize);

  DistributedRaidFileSystem raidFs = getRaidFS();
  assertTrue(raidFs.exists(srcPath));

  // generate the parity files.
  doRaid(srcPath, codec);

  FileStatus file1Stat = fileSys.getFileStatus(srcPath);
  long length = file1Stat.getLen();
  LocatedBlocks file1Loc =
      RaidDFSUtil.getBlockLocations((DistributedFileSystem)fileSys, 
          srcPath.toUri().getPath(),
          0, length);
  // corrupt file1

  for (int idx: corruptBlockIdxs) {
    corruptBlock(file1Loc.get(idx).getBlock().getBlockName(), 
                              dfs);
  }
  RaidDFSUtil.reportCorruptBlocks((DistributedFileSystem)fileSys, srcPath,
                       corruptBlockIdxs, blockSize);

  // verify the partial read
  byte[] buffer = new byte[bufferSize];
  FSDataInputStream in = raidFs.open(srcPath);

  long numRead = 0;
  CRC32 newcrc = new CRC32();

  int num = 0;
  while (num >= 0) {
    num = in.read(numRead, buffer, 0, bufferSize);
    if (num < 0) {
      break;
    }
    numRead += num;
    newcrc.update(buffer, 0, num);
  }
  in.close();

  if (numRead != length) {
    LOG.info("Number of bytes read " + numRead +
             " does not match file size " + length);
    return false;
  }

  LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc);
  if (newcrc.getValue() != crc) {
    LOG.info("CRC mismatch of file " + srcPath.toUri().getPath() + ": " + 
              newcrc.getValue() + " vs. " + crc);
    return false;
  }
  return true;
}
Project: mapreduce-fork    File: RaidShell.java
/**
 * gets the parity blocks corresponding to file
 * returns the parity blocks in case of DFS
 * and the part blocks containing parity blocks
 * in case of HAR FS
 */
private BlockLocation[] getParityBlocks(final Path filePath,
                                        final long blockSize,
                                        final long fileStripes,
                                        final RaidInfo raidInfo)
  throws IOException {


  final String parityPathStr = raidInfo.parityPair.getPath().toUri().
    getPath();
  FileSystem parityFS = raidInfo.parityPair.getFileSystem();

  // get parity file metadata
  FileStatus parityFileStatus = parityFS.
    getFileStatus(new Path(parityPathStr));
  long parityFileLength = parityFileStatus.getLen();

  if (parityFileLength != fileStripes * raidInfo.parityBlocksPerStripe *
      blockSize) {
    throw new IOException("expected parity file of length" +
                          (fileStripes * raidInfo.parityBlocksPerStripe *
                           blockSize) +
                          " but got parity file of length " +
                          parityFileLength);
  }

  BlockLocation[] parityBlocks =
    parityFS.getFileBlockLocations(parityFileStatus, 0L, parityFileLength);

  if (parityFS instanceof DistributedFileSystem ||
      parityFS instanceof DistributedRaidFileSystem) {
    long parityBlockSize = parityFileStatus.getBlockSize();
    if (parityBlockSize != blockSize) {
      throw new IOException("file block size is " + blockSize +
                            " but parity file block size is " +
                            parityBlockSize);
    }
  } else if (parityFS instanceof HarFileSystem) {
    LOG.debug("HAR FS found");
  } else {
    LOG.warn("parity file system is not of a supported type");
  }

  return parityBlocks;
}
Project: mapreduce-fork    File: RaidShell.java
/**
 * checks the raided file system, prints a list of corrupt files to
 * System.out and returns the number of corrupt files
 */
public int fsck(final String path) throws IOException {

  FileSystem fs = (new Path(path)).getFileSystem(conf);

  // if we got a raid fs, get the underlying fs
  if (fs instanceof DistributedRaidFileSystem) {
    fs = ((DistributedRaidFileSystem) fs).getFileSystem();
  }

  // check that we have a distributed fs
  if (!(fs instanceof DistributedFileSystem)) {
    throw new IOException("expected DistributedFileSystem but got " +
              fs.getClass().getName());
  }
  final DistributedFileSystem dfs = (DistributedFileSystem) fs;

  // get conf settings
  String xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
  String rsPrefix = RaidNode.rsDestinationPath(conf).toUri().getPath();
  if (!xorPrefix.endsWith("/")) {
    xorPrefix = xorPrefix + "/";
  }
  if (!rsPrefix.endsWith("/")) {
    rsPrefix = rsPrefix + "/";
  }
  LOG.debug("prefixes: " + xorPrefix + ", " + rsPrefix);

  // get a list of corrupted files (not considering parity blocks just yet)
  // from the name node
  // these are the only files we need to consider:
  // if a file has no corrupted data blocks, it is OK even if some
  // of its parity blocks are corrupted, so no further checking is
  // necessary
  final String[] files = RaidDFSUtil.getCorruptFiles(dfs);
  final List<Path> corruptFileCandidates = new LinkedList<Path>();
  for (final String f: files) {
    final Path p = new Path(f);
    // if this file is a parity file
    // or if it does not start with the specified path,
    // ignore it
    if (!p.toString().startsWith(xorPrefix) &&
        !p.toString().startsWith(rsPrefix) &&
        p.toString().startsWith(path)) {
      corruptFileCandidates.add(p);
    }
  }
  // filter files marked for deletion
  RaidUtils.filterTrash(conf, corruptFileCandidates);

  int numberOfCorruptFiles = 0;

  for (final Path corruptFileCandidate: corruptFileCandidates) {
    if (isFileCorrupt(dfs, corruptFileCandidate)) {
      System.out.println(corruptFileCandidate.toString());
      numberOfCorruptFiles++;
    }
  }

  return numberOfCorruptFiles;
}
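fsck counts only the files that remain unreadable even after RAID reconstruction is taken into account; files whose missing blocks can still be rebuilt from parity are not reported. A trivial invocation sketch on an already-constructed RaidShell instance (shell and the path are placeholders, not from the source):

// shell is an existing RaidShell; the path is a placeholder.
int unrecoverable = shell.fsck("/user/dikang/raidtest");
System.out.println(unrecoverable + " file(s) cannot be recovered through RAID");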