/**
 * Helper function of Encoder.
 */
public List<List<Block>> getSrcStripes() {
  List<List<Block>> srcStripes = new ArrayList<List<Block>>();
  for (int i = 0; i < srcStripeList.size(); i++) {
    List<BlockInfo> biList = srcStripeList.get(i);
    List<Block> curSrcStripe = new ArrayList<Block>();
    for (int j = 0; j < biList.size(); j++) {
      int fileIdx = biList.get(j).fileIdx;
      int blockId = biList.get(j).blockId;
      FileStatus curFs = lfs.get(fileIdx);
      try {
        if (fs instanceof DistributedRaidFileSystem) {
          curSrcStripe.add(((DistributedRaidFileSystem) fs)
              .toDistributedFileSystem()
              .getLocatedBlocks(curFs.getPath(), 0L, curFs.getLen())
              .get(blockId).getBlock());
        }
      } catch (IOException e) {
        // Ignore blocks whose locations cannot be fetched; the stripe is
        // simply left without this block.
      }
    }
    srcStripes.add(curSrcStripe);
  }
  return srcStripes;
}
public GenThread(Configuration conf, Path input, Path output,
    RunTimeConstants rtc) throws IOException {
  this.inputPath = input;
  this.outputPath = output;
  this.fs = FileSystem.newInstance(conf);
  if (fs instanceof DistributedRaidFileSystem) {
    fs = ((DistributedRaidFileSystem) fs).getFileSystem();
  }
  this.buffer = new byte[rtc.buffer_size];
  if (test_buffer_size > rtc.buffer_size) {
    test_buffer_size = rtc.buffer_size;
  }
}
public double getEffectiveReplication() {
  if (lastRaidStatistics == null) {
    return -1;
  }
  DFSClient dfs;
  double totalPhysical;
  try {
    FileSystem fs = FileSystem.get(conf);
    if (fs instanceof DistributedRaidFileSystem) {
      dfs = ((DistributedRaidFileSystem) fs).getClient();
    } else {
      dfs = ((DistributedFileSystem) fs).getClient();
    }
    totalPhysical = dfs.getNSDiskStatus().getDfsUsed();
  } catch (IOException e) {
    return -1;
  }
  double notRaidedPhysical = totalPhysical;
  double totalLogical = 0;
  for (Codec codec : Codec.getCodecs()) {
    String code = codec.id;
    Statistics st = lastRaidStatistics.get(code);
    totalLogical += st.getSourceCounters(RaidState.RAIDED).getNumLogical();
    notRaidedPhysical -= st.getSourceCounters(RaidState.RAIDED).getNumBytes();
    notRaidedPhysical -= st.getParityCounters().getNumBytes();
  }
  totalLogical += notRaidedPhysical / dfs.getDefaultReplication();
  if (totalLogical == 0) {
    // avoid division by zero
    return -1;
  }
  return totalPhysical / totalLogical;
}
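To make the arithmetic in getEffectiveReplication() concrete, here is a minimal standalone sketch with hypothetical numbers; the class name and every figure are invented for illustration and mirror the ratio computed above (physical bytes used divided by logical bytes stored).

/** Worked example (hypothetical numbers) of the effective-replication ratio. */
public class EffectiveReplicationExample {
  public static void main(String[] args) {
    double totalPhysical = 500e12;   // dfs.getNSDiskStatus().getDfsUsed(): 500 TB
    double raidedLogical = 200e12;   // logical bytes of RAIDed source files
    double raidedPhysical = 220e12;  // physical bytes of RAIDed source files
    double parityPhysical = 80e12;   // physical bytes of parity files
    int defaultReplication = 3;
    // Bytes covered by no codec are assumed stored at default replication.
    double notRaidedPhysical = totalPhysical - raidedPhysical - parityPhysical; // 200 TB
    double totalLogical = raidedLogical + notRaidedPhysical / defaultReplication; // ~266.7 TB
    System.out.println(totalPhysical / totalLogical); // prints ~1.875
  }
}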
public void purgeParity(String cmd, String[] args, int startIndex)
    throws IOException {
  if (startIndex + 1 >= args.length) {
    printUsage(cmd);
    throw new IllegalArgumentException("Insufficient arguments");
  }
  Path parityPath = new Path(args[startIndex]);
  AtomicLong entriesProcessed = new AtomicLong(0);
  System.err.println("Starting recursive purge of " + parityPath);

  Codec codec = Codec.getCodec(args[startIndex + 1]);
  FileSystem srcFs = parityPath.getFileSystem(conf);
  if (srcFs instanceof DistributedRaidFileSystem) {
    srcFs = ((DistributedRaidFileSystem) srcFs).getFileSystem();
  }
  FileSystem parityFs = srcFs;
  String parityPrefix = codec.parityDirectory;
  DirectoryTraversal obsoleteParityFileRetriever =
      new DirectoryTraversal(
          "Purge File ",
          java.util.Collections.singletonList(parityPath),
          parityFs,
          new PurgeMonitor.PurgeParityFileFilter(conf, codec, null,
              srcFs, parityFs, parityPrefix, null, entriesProcessed),
          1,
          false);
  FileStatus obsolete = null;
  while ((obsolete = obsoleteParityFileRetriever.next()) !=
      DirectoryTraversal.FINISH_TOKEN) {
    PurgeMonitor.performDelete(parityFs, obsolete.getPath(), false);
  }
}
/**
 * Gets the parity blocks corresponding to a file: the parity blocks
 * themselves in the case of DFS, and the part blocks containing the
 * parity blocks in the case of HAR FS.
 */
private static BlockLocation[] getParityBlocks(final Path filePath,
    final long blockSize, final long numStripes, final RaidInfo raidInfo)
    throws IOException {
  FileSystem parityFS = raidInfo.parityPair.getFileSystem();

  // get parity file metadata
  FileStatus parityFileStatus = raidInfo.parityPair.getFileStatus();
  long parityFileLength = parityFileStatus.getLen();

  if (parityFileLength != numStripes * raidInfo.parityBlocksPerStripe *
      blockSize) {
    throw new IOException("expected parity file of length " +
        (numStripes * raidInfo.parityBlocksPerStripe * blockSize) +
        " but got parity file of length " + parityFileLength);
  }

  BlockLocation[] parityBlocks =
      parityFS.getFileBlockLocations(parityFileStatus, 0L, parityFileLength);

  if (parityFS instanceof DistributedFileSystem ||
      parityFS instanceof DistributedRaidFileSystem) {
    long parityBlockSize = parityFileStatus.getBlockSize();
    if (parityBlockSize != blockSize) {
      throw new IOException("file block size is " + blockSize +
          " but parity file block size is " + parityBlockSize);
    }
  } else if (parityFS instanceof HarFileSystem) {
    LOG.debug("HAR FS found");
  } else {
    LOG.warn("parity file system is not of a supported type");
  }

  return parityBlocks;
}
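A worked instance of the parity-length invariant checked above, with hypothetical numbers (the class name and all figures are invented): a parity file must be exactly numStripes * parityBlocksPerStripe * blockSize bytes long.

public class ParityLengthExample {
  public static void main(String[] args) {
    long blockSize = 64L * 1024 * 1024; // 64 MB source block size (hypothetical)
    long numStripes = 3;                // e.g. 12 source blocks, 4 blocks per stripe
    long parityBlocksPerStripe = 2;     // e.g. a code emitting 2 parity blocks per stripe
    // Any other parity file length fails the "expected parity file" check above.
    System.out.println(numStripes * parityBlocksPerStripe * blockSize); // 402653184 (384 MB)
  }
}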
private DistributedRaidFileSystem getRaidFS() throws IOException {
  DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
  Configuration clientConf = new Configuration(conf);
  clientConf.set("fs.hdfs.impl",
      "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
  clientConf.set("fs.raid.underlyingfs.impl",
      "org.apache.hadoop.hdfs.DistributedFileSystem");
  clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
  URI dfsUri = dfs.getUri();
  return (DistributedRaidFileSystem) FileSystem.get(dfsUri, clientConf);
}
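For context, a minimal usage sketch of the handle getRaidFS() returns; the helper name and path are hypothetical. Reads go through DistributedRaidFileSystem like ordinary HDFS reads, with corrupt blocks reconstructed from parity on the fly, which is what doThePartialTest below relies on.

// Hypothetical usage sketch: reading a file through the RAID-aware client.
void readThroughRaidFS() throws IOException {
  DistributedRaidFileSystem raidFs = getRaidFS();
  FSDataInputStream in = raidFs.open(new Path("/user/test/somefile")); // hypothetical path
  byte[] buf = new byte[4096];
  int n;
  while ((n = in.read(buf)) > 0) {
    // consume buf[0..n); missing blocks are fixed up from parity inside the stream
  }
  in.close();
}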
public void testRenameHar() throws Exception {
  try {
    mySetup("xor", 1);
    Path[] testPathList = new Path[] {
        new Path("/user/dikang/raidtest/rename/f1"),
        new Path("/user/dikang/raidtest/rename/f2"),
        new Path("/user/dikang/raidtest/rename/f3")};
    Path destHarPath = new Path("/destraid/user/dikang/raidtest/rename");

    DistributedRaidFileSystem raidFs = getRaidFS();
    for (Path srcPath : testPathList) {
      TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath,
          1, 8, 8192L);
    }
    raidFs.mkdirs(destHarPath);
    raidFs.mkdirs(new Path(destHarPath, "rename" + RaidNode.HAR_SUFFIX));

    raidFs.rename(new Path("/user/dikang/raidtest"),
        new Path("/user/dikang/raidtest1"));
    fail("Expected fail for HAR rename");
  } catch (IOException ie) {
    String message = ie.getMessage();
    assertTrue(message.contains("HAR dir"));
  } finally {
    stopCluster();
  }
}
public void testRename() throws Exception {
  try {
    long[] crcs = new long[3];
    int[] seeds = new int[3];
    short repl = 1;
    Path dirPath = new Path("/user/dikang/raidtest");

    mySetup();
    DistributedRaidFileSystem raidFs = getRaidFS();
    Path[] files = TestRaidDfs.createTestFiles(dirPath,
        fileSizes, blockSizes, crcs, seeds, fileSys, (short) 1);
    FileStatus stat = raidFs.getFileStatus(dirPath);
    Codec codec = Codec.getCodec("dir-rs");
    RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec,
        new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
        false, repl, repl);

    Path destPath = new Path("/user/dikang/raidtest_new");

    assertTrue(raidFs.exists(dirPath));
    assertFalse(raidFs.exists(destPath));

    ParityFilePair parity = ParityFilePair.getParityFile(codec, stat, conf);
    Path srcParityPath = parity.getPath();
    assertTrue(raidFs.exists(srcParityPath));

    // do the rename file
    assertTrue(raidFs.rename(dirPath, destPath));
    // verify the results.
    assertFalse(raidFs.exists(dirPath));
    assertTrue(raidFs.exists(destPath));
    assertFalse(raidFs.exists(srcParityPath));
    FileStatus srcDest = raidFs.getFileStatus(destPath);
    parity = ParityFilePair.getParityFile(codec, srcDest, conf);
    assertTrue(raidFs.exists(parity.getPath()));
  } finally {
    stopCluster();
  }
}
public void testDeleteOneFile() throws Exception {
  try {
    long[] crcs = new long[3];
    int[] seeds = new int[3];
    short repl = 1;
    Path dirPath = new Path("/user/dikang/raidtest");

    mySetup();
    DistributedRaidFileSystem raidFs = getRaidFS();
    Path[] files = TestRaidDfs.createTestFiles(dirPath,
        fileSizes, blockSizes, crcs, seeds, fileSys, (short) 1);
    FileStatus stat = raidFs.getFileStatus(dirPath);
    Codec codec = Codec.getCodec("dir-rs");
    RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec,
        new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
        false, repl, repl);

    ParityFilePair parity = ParityFilePair.getParityFile(codec, stat, conf);
    Path srcParityPath = parity.getPath();
    assertTrue(raidFs.exists(srcParityPath));

    // delete one file
    assertTrue(raidFs.delete(files[0]));
    // verify the results
    assertFalse(raidFs.exists(files[0]));
    // we still have the parity file
    assertTrue(raidFs.exists(srcParityPath));

    // delete the remaining files
    assertTrue(raidFs.delete(files[1]));
    assertTrue(raidFs.delete(files[2]));
    // we will not touch the parity file.
    assertTrue(raidFs.exists(srcParityPath));
  } finally {
    stopCluster();
  }
}
public void testDeleteDirRaidedFile() throws Exception {
  long[] crcs = new long[3];
  int[] seeds = new int[3];
  short repl = 1;
  Path dirPath = new Path("/user/dikang/raidtest");

  mySetup();
  // disable trash
  conf.setInt("fs.trash.interval", 0);

  DistributedRaidFileSystem raidFs = getRaidFS();
  Path[] files = TestRaidDfs.createTestFiles(dirPath,
      fileSizes, blockSizes, crcs, seeds, fileSys, (short) 1);
  FileStatus stat = raidFs.getFileStatus(dirPath);
  Codec codec = Codec.getCodec("dir-rs");
  RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec,
      new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
      false, repl, repl);

  try {
    raidFs.delete(files[0]);
    fail();
  } catch (Exception ex) {
    LOG.warn("Expected error: " + ex.getMessage(), ex);
  } finally {
    stopCluster();
  }
}
static private DistributedRaidFileSystem getRaidFS(FileSystem fileSys,
    Configuration conf) throws IOException {
  DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
  Configuration clientConf = new Configuration(conf);
  clientConf.set("fs.hdfs.impl",
      "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
  clientConf.set("fs.raid.underlyingfs.impl",
      "org.apache.hadoop.hdfs.DistributedFileSystem");
  clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
  URI dfsUri = dfs.getUri();
  return (DistributedRaidFileSystem) FileSystem.get(dfsUri, clientConf);
}
public void purgeParity(String cmd, String[] args, int startIndex)
    throws IOException {
  if (startIndex + 1 >= args.length) {
    printUsage(cmd);
    throw new IllegalArgumentException("Insufficient arguments");
  }
  Path parityPath = new Path(args[startIndex]);
  AtomicLong entriesProcessed = new AtomicLong(0);
  System.err.println("Starting recursive purge of " + parityPath);

  Codec codec = Codec.getCodec(args[startIndex + 1]);
  FileSystem srcFs = parityPath.getFileSystem(conf);
  if (srcFs instanceof DistributedRaidFileSystem) {
    srcFs = ((DistributedRaidFileSystem) srcFs).getFileSystem();
  }
  FileSystem parityFs = srcFs;
  String parityPrefix = codec.parityDirectory;
  DirectoryTraversal obsoleteParityFileRetriever =
      new DirectoryTraversal(
          "Purge File ",
          java.util.Collections.singletonList(parityPath),
          parityFs,
          new PurgeMonitor.PurgeParityFileFilter(conf, codec, srcFs,
              parityFs, parityPrefix, null, entriesProcessed),
          1,
          false);
  FileStatus obsolete = null;
  while ((obsolete = obsoleteParityFileRetriever.next()) !=
      DirectoryTraversal.FINISH_TOKEN) {
    PurgeMonitor.performDelete(parityFs, obsolete.getPath(), false);
  }
}
private static DistributedFileSystem getDFS(FileSystem fs) throws IOException {
  if (fs instanceof DistributedRaidFileSystem) {
    fs = ((DistributedRaidFileSystem) fs).getFileSystem();
  }
  return (DistributedFileSystem) fs;
}
public void checkFile(String cmd, String[] args, int startIndex)
    throws IOException {
  if (startIndex >= args.length) {
    printUsage(cmd);
    throw new IllegalArgumentException("Insufficient arguments");
  }
  for (int i = startIndex; i < args.length; i++) {
    Path p = new Path(args[i]);
    FileSystem fs = p.getFileSystem(conf);
    // if we got a raid fs, get the underlying fs
    if (fs instanceof DistributedRaidFileSystem) {
      fs = ((DistributedRaidFileSystem) fs).getFileSystem();
    }
    // We should be able to cast at this point.
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    RemoteIterator<Path> corruptIt = dfs.listCorruptFileBlocks(p);
    int count = 0;
    while (corruptIt.hasNext()) {
      count++;
      Path corruptFile = corruptIt.next();
      // Result of checking.
      String result = null;
      FileStatus stat = fs.getFileStatus(corruptFile);
      if (stat.getReplication() < fs.getDefaultReplication()) {
        RaidInfo raidInfo = RaidUtils.getFileRaidInfo(stat, conf);
        if (raidInfo.codec == null) {
          result = "Below default replication but no parity file found";
        } else {
          boolean notRecoverable = isFileCorrupt(dfs, stat);
          if (notRecoverable) {
            result = "Missing too many blocks to be recovered " +
                "using parity file " + raidInfo.parityPair.getPath();
          } else {
            result = "Has missing blocks but can be read using parity file " +
                raidInfo.parityPair.getPath();
          }
        }
      } else {
        result = "At default replication, not raided";
      }
      out.println("Result of checking " + corruptFile + " : " + result);
    }
    out.println("Found " + count + " files with missing blocks");
  }
}
public void testRename() throws Exception {
  try {
    mySetup("xor", 1);
    Path srcPath = new Path("/user/dhruba/raidtest/rename/f1");
    Path destPath = new Path("/user/dhruba/raidtest/rename/f2");

    Path srcPath2 = new Path("/user/dhruba/raidtest/rename/f3");
    Path destDirPath = new Path("/user/dhruba/raidtest/rename2");
    Path destPath2 = new Path("/user/dhruba/raidtest/rename2/f3");

    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 1, 8, 8192L);
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath2, 1, 8, 8192L);

    DistributedRaidFileSystem raidFs = getRaidFS();
    assertTrue(raidFs.exists(srcPath));
    assertFalse(raidFs.exists(destPath));

    // generate the parity files.
    doRaid(srcPath, Codec.getCodec("xor"));
    doRaid(srcPath2, Codec.getCodec("xor"));
    FileStatus srcStat = fileSys.getFileStatus(srcPath);
    ParityFilePair parity = ParityFilePair.getParityFile(
        Codec.getCodec("xor"), srcStat, conf);
    Path srcParityPath = parity.getPath();
    assertTrue(raidFs.exists(srcParityPath));
    assertFalse(raidFs.exists(destPath));

    // do the rename file
    assertTrue(raidFs.rename(srcPath, destPath));
    // verify the results.
    assertFalse(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(destPath));
    assertFalse(raidFs.exists(srcParityPath));
    FileStatus srcDest = fileSys.getFileStatus(destPath);
    parity = ParityFilePair.getParityFile(Codec.getCodec("xor"),
        srcDest, conf);
    assertTrue(raidFs.exists(parity.getPath()));

    // rename the dir
    assertFalse(raidFs.exists(destDirPath));
    assertTrue(raidFs.rename(srcPath2.getParent(), destDirPath));
    // verify the results.
    assertFalse(raidFs.exists(srcPath2.getParent()));
    assertTrue(raidFs.exists(destDirPath));
    FileStatus srcDest2 = fileSys.getFileStatus(destPath2);
    parity = ParityFilePair.getParityFile(Codec.getCodec("xor"),
        srcDest2, conf);
    assertTrue(raidFs.exists(parity.getPath()));

    // try to rename a file that does not exist
    Path notExistedPath = new Path("/user/dhruba/raidtest/raidnotexist");
    Path notExistedPath2 = new Path("/user/dhruba/raidtest/raidnotexist2");
    assertFalse(raidFs.rename(notExistedPath, notExistedPath2));
  } finally {
    stopCluster();
  }
}
public void testIgnoreCheckingParity() throws Exception {
  try {
    mySetup("xor", 1);
    Path srcPath = new Path("/tmp/raidtest/delete/f1");
    Path srcPath2 = new Path("/tmp/raidtest/rename/f2");
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 1, 8, 8192L);
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath2, 1, 8, 8192L);

    DistributedRaidFileSystem raidFs = getRaidFS();
    assertTrue(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(srcPath2));

    // generate the parity files.
    doRaid(srcPath, Codec.getCodec("xor"));
    doRaid(srcPath2, Codec.getCodec("xor"));
    FileStatus srcStat = fileSys.getFileStatus(srcPath);
    FileStatus srcStat2 = fileSys.getFileStatus(srcPath2);
    ParityFilePair parity = ParityFilePair.getParityFile(
        Codec.getCodec("xor"), srcStat, conf);
    Path srcParityPath = parity.getPath();
    ParityFilePair parity2 = ParityFilePair.getParityFile(
        Codec.getCodec("xor"), srcStat2, conf);
    Path srcParityPath2 = parity2.getPath();
    assertTrue(raidFs.exists(srcParityPath));
    assertTrue(raidFs.exists(srcParityPath2));

    // test delete file
    raidFs.delete(srcPath);
    // verify we did not delete the parityPath
    assertFalse(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(srcParityPath));

    // test rename file
    raidFs.rename(srcPath2, new Path("/tmp/raidtest/rename/f3"));
    // verify we did not rename the parityPath
    assertFalse(raidFs.exists(srcPath2));
    assertTrue(raidFs.exists(srcParityPath2));
  } finally {
    stopCluster();
  }
}
public void testIgnoreCheckingParity2() throws Exception {
  try {
    mySetup("xor", 1);
    conf.set(DistributedRaidFileSystem.DIRECTORIES_IGNORE_PARITY_CHECKING_KEY,
        "/ignore1/test/,/ignore2/test/");
    Path srcPath = new Path("/ignore1/test/rename/f1");
    Path srcPath2 = new Path("/ignore2/test/rename/f1");
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 1, 8, 8192L);
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath2, 1, 8, 8192L);

    DistributedRaidFileSystem raidFs = getRaidFS();
    assertTrue(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(srcPath2));

    // generate the parity files.
    doRaid(srcPath, Codec.getCodec("xor"));
    doRaid(srcPath2, Codec.getCodec("xor"));
    FileStatus srcStat = fileSys.getFileStatus(srcPath);
    FileStatus srcStat2 = fileSys.getFileStatus(srcPath2);
    ParityFilePair parity = ParityFilePair.getParityFile(
        Codec.getCodec("xor"), srcStat, conf);
    Path srcParityPath = parity.getPath();
    ParityFilePair parity2 = ParityFilePair.getParityFile(
        Codec.getCodec("xor"), srcStat2, conf);
    Path srcParityPath2 = parity2.getPath();
    assertTrue(raidFs.exists(srcParityPath));
    assertTrue(raidFs.exists(srcParityPath2));

    // test rename files
    raidFs.rename(srcPath, new Path("/ignore1/test/rename/f2"));
    raidFs.rename(srcPath2, new Path("/ignore2/test/rename/f2"));
    // verify we did not rename the parityPath
    assertFalse(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(srcParityPath));
    assertFalse(raidFs.exists(srcPath2));
    assertTrue(raidFs.exists(srcParityPath2));
  } finally {
    stopCluster();
  }
}
public void testRenameOneFile() throws Exception {
  try {
    long[] crcs = new long[3];
    int[] seeds = new int[3];
    short repl = 1;
    Path dirPath = new Path("/user/dikang/raidtest");

    mySetup();
    DistributedRaidFileSystem raidFs = getRaidFS();
    Path[] files = TestRaidDfs.createTestFiles(dirPath,
        fileSizes, blockSizes, crcs, seeds, fileSys, (short) 1);
    FileStatus stat = raidFs.getFileStatus(dirPath);
    Codec codec = Codec.getCodec("dir-rs");
    RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec,
        new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
        false, repl, repl);

    Path destPath = new Path("/user/dikang/raidtest_new");

    assertTrue(raidFs.exists(dirPath));
    assertFalse(raidFs.exists(destPath));

    ParityFilePair parity = ParityFilePair.getParityFile(codec, stat, conf);
    Path srcParityPath = parity.getPath();
    assertTrue(raidFs.exists(srcParityPath));

    raidFs.mkdirs(destPath);
    // do the rename file
    assertTrue(raidFs.rename(files[0], new Path(destPath, "file0")));
    // verify the results.
    assertFalse(raidFs.exists(files[0]));
    assertTrue(raidFs.exists(new Path(destPath, "file0")));
    assertTrue(raidFs.exists(srcParityPath));

    // rename the remaining files
    assertTrue(raidFs.rename(files[1], new Path(destPath, "file1")));
    assertTrue(raidFs.rename(files[2], new Path(destPath, "file2")));
    assertFalse(raidFs.exists(srcParityPath));

    Path newParityPath = new Path(codec.parityDirectory,
        "user/dikang/raidtest_new");
    assertTrue(raidFs.exists(newParityPath));
  } finally {
    stopCluster();
  }
}
public void testDeleteAndUndelete() throws Exception {
  try {
    long[] crcs = new long[3];
    int[] seeds = new int[3];
    short repl = 1;
    Path dirPath = new Path("/user/dikang/raidtest");

    mySetup();
    DistributedRaidFileSystem raidFs = getRaidFS();
    Path[] files = TestRaidDfs.createTestFiles(dirPath,
        fileSizes, blockSizes, crcs, seeds, fileSys, (short) 1);
    FileStatus stat = raidFs.getFileStatus(dirPath);
    Codec codec = Codec.getCodec("dir-rs");
    RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec,
        new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
        false, repl, repl);

    ParityFilePair parity = ParityFilePair.getParityFile(codec, stat, conf);
    Path srcParityPath = parity.getPath();
    assertTrue(raidFs.exists(srcParityPath));

    // do the delete file
    assertTrue(raidFs.delete(dirPath));
    // verify the results.
    assertFalse(raidFs.exists(dirPath));
    assertFalse(raidFs.exists(srcParityPath));

    // do the undelete using a non-existent userName
    String nonExistedUser = UUID.randomUUID().toString();
    assertFalse(raidFs.undelete(dirPath, nonExistedUser));
    // verify the results
    assertFalse(raidFs.exists(dirPath));
    assertFalse(raidFs.exists(srcParityPath));

    // do the undelete using the current userName
    assertTrue(raidFs.undelete(dirPath, null));
    // verify the results.
    assertTrue(raidFs.exists(dirPath));
    assertTrue(raidFs.exists(srcParityPath));
  } finally {
    stopCluster();
  }
}
private boolean doThePartialTest(Codec codec, int blockNum,
    int[] corruptBlockIdxs) throws Exception {
  long blockSize = 8192 * 1024L;
  int bufferSize = 4192 * 1024;

  Path srcPath = new Path("/user/dikang/raidtest/file" +
      UUID.randomUUID().toString());
  long crc = TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath,
      1, blockNum, blockSize);

  DistributedRaidFileSystem raidFs = getRaidFS();
  assertTrue(raidFs.exists(srcPath));

  // generate the parity files.
  doRaid(srcPath, codec);
  FileStatus file1Stat = fileSys.getFileStatus(srcPath);
  long length = file1Stat.getLen();
  LocatedBlocks file1Loc = RaidDFSUtil.getBlockLocations(
      (DistributedFileSystem) fileSys, srcPath.toUri().getPath(),
      0, length);

  // corrupt file1
  for (int idx : corruptBlockIdxs) {
    corruptBlock(file1Loc.get(idx).getBlock(), dfs);
  }
  RaidDFSUtil.reportCorruptBlocks((DistributedFileSystem) fileSys, srcPath,
      corruptBlockIdxs, blockSize);

  // verify the partial read
  byte[] buffer = new byte[bufferSize];
  FSDataInputStream in = raidFs.open(srcPath);

  long numRead = 0;
  CRC32 newcrc = new CRC32();

  int num = 0;
  while (num >= 0) {
    num = in.read(numRead, buffer, 0, bufferSize);
    if (num < 0) {
      break;
    }
    numRead += num;
    newcrc.update(buffer, 0, num);
  }
  in.close();

  if (numRead != length) {
    LOG.info("Number of bytes read " + numRead +
        " does not match file size " + length);
    return false;
  }

  LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc);
  if (newcrc.getValue() != crc) {
    LOG.info("CRC mismatch of file " + srcPath.toUri().getPath() +
        ": " + newcrc.getValue() + " vs. " + crc);
    return false;
  }
  return true;
}
/**
 * Gets the parity blocks corresponding to a file: the parity blocks
 * themselves in the case of DFS, and the part blocks containing the
 * parity blocks in the case of HAR FS.
 */
private BlockLocation[] getParityBlocks(final Path filePath,
    final long blockSize, final long numStripes, final RaidInfo raidInfo)
    throws IOException {
  final String parityPathStr = raidInfo.parityPair.getPath().toUri().
      getPath();
  FileSystem parityFS = raidInfo.parityPair.getFileSystem();

  // get parity file metadata
  FileStatus parityFileStatus = parityFS.
      getFileStatus(new Path(parityPathStr));
  long parityFileLength = parityFileStatus.getLen();

  if (parityFileLength != numStripes * raidInfo.parityBlocksPerStripe *
      blockSize) {
    throw new IOException("expected parity file of length " +
        (numStripes * raidInfo.parityBlocksPerStripe * blockSize) +
        " but got parity file of length " + parityFileLength);
  }

  BlockLocation[] parityBlocks =
      parityFS.getFileBlockLocations(parityFileStatus, 0L, parityFileLength);

  if (parityFS instanceof DistributedFileSystem ||
      parityFS instanceof DistributedRaidFileSystem) {
    long parityBlockSize = parityFileStatus.getBlockSize();
    if (parityBlockSize != blockSize) {
      throw new IOException("file block size is " + blockSize +
          " but parity file block size is " + parityBlockSize);
    }
  } else if (parityFS instanceof HarFileSystem) {
    LOG.debug("HAR FS found");
  } else {
    LOG.warn("parity file system is not of a supported type");
  }

  return parityBlocks;
}
public void checkFile(String cmd, String[] args, int startIndex)
    throws IOException {
  if (startIndex >= args.length) {
    printUsage(cmd);
    throw new IllegalArgumentException("Insufficient arguments");
  }
  for (int i = startIndex; i < args.length; i++) {
    Path p = new Path(args[i]);
    FileSystem fs = p.getFileSystem(conf);
    // if we got a raid fs, get the underlying fs
    if (fs instanceof DistributedRaidFileSystem) {
      fs = ((DistributedRaidFileSystem) fs).getFileSystem();
    }
    // We should be able to cast at this point.
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    RemoteIterator<Path> corruptIt = dfs.listCorruptFileBlocks(p);
    int count = 0;
    while (corruptIt.hasNext()) {
      count++;
      Path corruptFile = corruptIt.next();
      // Result of checking.
      String result = null;
      // check the replication of the corrupt file, not of the argument path
      FileStatus stat = fs.getFileStatus(corruptFile);
      if (stat.getReplication() < fs.getDefaultReplication()) {
        RaidInfo raidInfo = getFileRaidInfo(corruptFile);
        if (raidInfo.codec == null) {
          result = "Below default replication but no parity file found";
        } else {
          boolean notRecoverable = isFileCorrupt(dfs, corruptFile);
          if (notRecoverable) {
            result = "Missing too many blocks to be recovered " +
                "using parity file " + raidInfo.parityPair.getPath();
          } else {
            result = "Has missing blocks but can be read using parity file " +
                raidInfo.parityPair.getPath();
          }
        }
      } else {
        result = "At default replication, not raided";
      }
      out.println("Result of checking " + corruptFile + " : " + result);
    }
    out.println("Found " + count + " files with missing blocks");
  }
}
public void testRename() throws Exception {
  try {
    mySetup("xor", 1);
    Path srcPath = new Path("/user/dhruba/raidtest/rename/f1");
    Path destPath = new Path("/user/dhruba/raidtest/rename/f2");

    Path srcPath2 = new Path("/user/dhruba/raidtest/rename/f3");
    Path destDirPath = new Path("/user/dhruba/raidtest/rename2");
    Path destPath2 = new Path("/user/dhruba/raidtest/rename2/f3");

    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 1, 8, 8192L);
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath2, 1, 8, 8192L);

    DistributedRaidFileSystem raidFs = getRaidFS();
    assertTrue(raidFs.exists(srcPath));
    assertFalse(raidFs.exists(destPath));

    // generate the parity files.
    doRaid(srcPath, Codec.getCodec("xor"));
    doRaid(srcPath2, Codec.getCodec("xor"));
    ParityFilePair parity = ParityFilePair.getParityFile(
        Codec.getCodec("xor"), srcPath, conf);
    Path srcParityPath = parity.getPath();
    assertTrue(raidFs.exists(srcParityPath));
    parity = ParityFilePair.getParityFile(Codec.getCodec("xor"),
        destPath, conf);
    assertNull(parity);

    // do the rename file
    assertTrue(raidFs.rename(srcPath, destPath));
    // verify the results.
    assertFalse(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(destPath));
    assertFalse(raidFs.exists(srcParityPath));
    parity = ParityFilePair.getParityFile(Codec.getCodec("xor"),
        destPath, conf);
    assertTrue(raidFs.exists(parity.getPath()));

    // rename the dir
    assertFalse(raidFs.exists(destDirPath));
    assertTrue(raidFs.rename(srcPath2.getParent(), destDirPath));
    // verify the results.
    assertFalse(raidFs.exists(srcPath2.getParent()));
    assertTrue(raidFs.exists(destDirPath));
    parity = ParityFilePair.getParityFile(Codec.getCodec("xor"),
        destPath2, conf);
    assertTrue(raidFs.exists(parity.getPath()));
  } finally {
    stopCluster();
  }
}
public void testDeleteAndUndelete() throws Exception {
  try {
    mySetup("xor", 1);
    Path srcPath = new Path("/user/dhruba/raidtest/rename/f1");
    Path srcPath2 = new Path("/user/dhruba/raidtest/rename/f2");
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 1, 8, 8192L);
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath2, 1, 8, 8192L);

    DistributedRaidFileSystem raidFs = getRaidFS();
    assertTrue(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(srcPath2));

    // generate the parity files.
    doRaid(srcPath, Codec.getCodec("xor"));
    doRaid(srcPath2, Codec.getCodec("xor"));
    ParityFilePair parity = ParityFilePair.getParityFile(
        Codec.getCodec("xor"), srcPath, conf);
    Path srcParityPath = parity.getPath();
    ParityFilePair parity2 = ParityFilePair.getParityFile(
        Codec.getCodec("xor"), srcPath2, conf);
    Path srcParityPath2 = parity2.getPath();
    assertTrue(raidFs.exists(srcParityPath));
    assertTrue(raidFs.exists(srcParityPath2));

    // do the delete file
    assertTrue(raidFs.delete(srcPath));
    // verify the results.
    assertFalse(raidFs.exists(srcPath));
    assertFalse(raidFs.exists(srcParityPath));
    assertTrue(raidFs.exists(srcParityPath2));

    // do the undelete using a non-existent userName
    String nonExistedUser = UUID.randomUUID().toString();
    assertFalse(raidFs.undelete(srcPath, nonExistedUser));
    // verify the results
    assertFalse(raidFs.exists(srcPath));
    assertFalse(raidFs.exists(srcParityPath));
    assertTrue(raidFs.exists(srcParityPath2));

    // do the undelete using the current userName
    assertTrue(raidFs.undelete(srcPath, null));
    // verify the results.
    assertTrue(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(srcParityPath));
    assertTrue(raidFs.exists(srcParityPath2));

    // delete the dir
    assertTrue(raidFs.delete(srcPath2.getParent()));
    // verify the results.
    assertFalse(raidFs.exists(srcPath2.getParent()));
    assertFalse(raidFs.exists(srcParityPath));
    assertFalse(raidFs.exists(srcParityPath2));
    assertFalse(raidFs.exists(srcParityPath.getParent()));

    // undelete the dir
    assertTrue(raidFs.undelete(srcPath2.getParent(), null));
    // verify the results.
    assertTrue(raidFs.exists(srcPath));
    assertTrue(raidFs.exists(srcPath2));
    assertTrue(raidFs.exists(srcParityPath));
    assertTrue(raidFs.exists(srcParityPath2));
  } finally {
    stopCluster();
  }
}
private boolean doThePartialTest(Codec codec, int blockNum,
    int[] corruptBlockIdxs) throws Exception {
  long blockSize = 8192L;
  int bufferSize = 4192;

  Path srcPath = new Path("/user/dikang/raidtest/file" +
      UUID.randomUUID().toString());
  long crc = TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath,
      1, blockNum, blockSize);

  DistributedRaidFileSystem raidFs = getRaidFS();
  assertTrue(raidFs.exists(srcPath));

  // generate the parity files.
  doRaid(srcPath, codec);
  FileStatus file1Stat = fileSys.getFileStatus(srcPath);
  long length = file1Stat.getLen();
  LocatedBlocks file1Loc = RaidDFSUtil.getBlockLocations(
      (DistributedFileSystem) fileSys, srcPath.toUri().getPath(),
      0, length);

  // corrupt file1
  for (int idx : corruptBlockIdxs) {
    corruptBlock(file1Loc.get(idx).getBlock().getBlockName(), dfs);
  }
  RaidDFSUtil.reportCorruptBlocks((DistributedFileSystem) fileSys, srcPath,
      corruptBlockIdxs, blockSize);

  // verify the partial read
  byte[] buffer = new byte[bufferSize];
  FSDataInputStream in = raidFs.open(srcPath);

  long numRead = 0;
  CRC32 newcrc = new CRC32();

  int num = 0;
  while (num >= 0) {
    num = in.read(numRead, buffer, 0, bufferSize);
    if (num < 0) {
      break;
    }
    numRead += num;
    newcrc.update(buffer, 0, num);
  }
  in.close();

  if (numRead != length) {
    LOG.info("Number of bytes read " + numRead +
        " does not match file size " + length);
    return false;
  }

  LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc);
  if (newcrc.getValue() != crc) {
    LOG.info("CRC mismatch of file " + srcPath.toUri().getPath() +
        ": " + newcrc.getValue() + " vs. " + crc);
    return false;
  }
  return true;
}
/**
 * Gets the parity blocks corresponding to a file: the parity blocks
 * themselves in the case of DFS, and the part blocks containing the
 * parity blocks in the case of HAR FS.
 */
private BlockLocation[] getParityBlocks(final Path filePath,
    final long blockSize, final long fileStripes, final RaidInfo raidInfo)
    throws IOException {
  final String parityPathStr = raidInfo.parityPair.getPath().toUri().
      getPath();
  FileSystem parityFS = raidInfo.parityPair.getFileSystem();

  // get parity file metadata
  FileStatus parityFileStatus = parityFS.
      getFileStatus(new Path(parityPathStr));
  long parityFileLength = parityFileStatus.getLen();

  if (parityFileLength != fileStripes * raidInfo.parityBlocksPerStripe *
      blockSize) {
    throw new IOException("expected parity file of length " +
        (fileStripes * raidInfo.parityBlocksPerStripe * blockSize) +
        " but got parity file of length " + parityFileLength);
  }

  BlockLocation[] parityBlocks =
      parityFS.getFileBlockLocations(parityFileStatus, 0L, parityFileLength);

  if (parityFS instanceof DistributedFileSystem ||
      parityFS instanceof DistributedRaidFileSystem) {
    long parityBlockSize = parityFileStatus.getBlockSize();
    if (parityBlockSize != blockSize) {
      throw new IOException("file block size is " + blockSize +
          " but parity file block size is " + parityBlockSize);
    }
  } else if (parityFS instanceof HarFileSystem) {
    LOG.debug("HAR FS found");
  } else {
    LOG.warn("parity file system is not of a supported type");
  }

  return parityBlocks;
}
/**
 * Checks the raided file system, prints a list of corrupt files to
 * System.out and returns the number of corrupt files.
 */
public int fsck(final String path) throws IOException {
  FileSystem fs = (new Path(path)).getFileSystem(conf);

  // if we got a raid fs, get the underlying fs
  if (fs instanceof DistributedRaidFileSystem) {
    fs = ((DistributedRaidFileSystem) fs).getFileSystem();
  }

  // check that we have a distributed fs
  if (!(fs instanceof DistributedFileSystem)) {
    throw new IOException("expected DistributedFileSystem but got " +
        fs.getClass().getName());
  }
  final DistributedFileSystem dfs = (DistributedFileSystem) fs;

  // get conf settings
  String xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
  String rsPrefix = RaidNode.rsDestinationPath(conf).toUri().getPath();
  if (!xorPrefix.endsWith("/")) {
    xorPrefix = xorPrefix + "/";
  }
  if (!rsPrefix.endsWith("/")) {
    rsPrefix = rsPrefix + "/";
  }
  LOG.debug("prefixes: " + xorPrefix + ", " + rsPrefix);

  // Get a list of corrupted files (not considering parity blocks just yet)
  // from the name node. These are the only files we need to consider:
  // if a file has no corrupted data blocks, it is OK even if some of its
  // parity blocks are corrupted, so no further checking is necessary.
  final String[] files = RaidDFSUtil.getCorruptFiles(dfs);
  final List<Path> corruptFileCandidates = new LinkedList<Path>();
  for (final String f : files) {
    final Path p = new Path(f);
    // skip parity files, and keep only files under the specified path
    if (!p.toString().startsWith(xorPrefix) &&
        !p.toString().startsWith(rsPrefix) &&
        p.toString().startsWith(path)) {
      corruptFileCandidates.add(p);
    }
  }
  // filter files marked for deletion
  RaidUtils.filterTrash(conf, corruptFileCandidates);

  int numberOfCorruptFiles = 0;
  for (final Path corruptFileCandidate : corruptFileCandidates) {
    if (isFileCorrupt(dfs, corruptFileCandidate)) {
      System.out.println(corruptFileCandidate.toString());
      numberOfCorruptFiles++;
    }
  }
  return numberOfCorruptFiles;
}