/**
 * Verify that the given checksums match the given data.
 *
 * The 'mark' of the ByteBuffer parameters may be modified by this function,
 * but the position is maintained.
 *
 * @param data the DirectByteBuffer pointing to the data to verify.
 * @param checksums the DirectByteBuffer pointing to a series of stored
 *                  checksums
 * @param fileName the name of the file being read, for error-reporting
 * @param basePos the file position to which the start of 'data' corresponds
 * @throws ChecksumException if the checksums do not match
 */
public void verifyChunkedSums(ByteBuffer data, ByteBuffer checksums,
    String fileName, long basePos) throws ChecksumException {
  if (type.size == 0) {
    return;
  }

  if (data.hasArray() && checksums.hasArray()) {
    final int dataOffset = data.arrayOffset() + data.position();
    final int crcsOffset = checksums.arrayOffset() + checksums.position();
    verifyChunked(type, summer, data.array(), dataOffset, data.remaining(),
        bytesPerChecksum, checksums.array(), crcsOffset, fileName, basePos);
    return;
  }

  if (NativeCrc32.isAvailable()) {
    NativeCrc32.verifyChunkedSums(bytesPerChecksum, type.id, checksums, data,
        fileName, basePos);
  } else {
    verifyChunked(type, summer, data, bytesPerChecksum, checksums,
        fileName, basePos);
  }
}
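// Usage sketch (an illustration, not part of the class above): exercising
// verifyChunkedSums through Hadoop's org.apache.hadoop.util.DataChecksum.
// The file name "example.dat" and the buffer sizes are assumptions made
// for the example.
import java.nio.ByteBuffer;
import org.apache.hadoop.util.DataChecksum;

public class VerifyChunkedSumsExample {
  public static void main(String[] args) throws Exception {
    final int bytesPerChecksum = 512;
    DataChecksum sum = DataChecksum.newDataChecksum(
        DataChecksum.Type.CRC32C, bytesPerChecksum);

    // Two chunks of data and room for one CRC per chunk.
    ByteBuffer data = ByteBuffer.allocate(2 * bytesPerChecksum);
    for (int i = 0; i < data.capacity(); i++) {
      data.put((byte) i);
    }
    data.flip();
    ByteBuffer sums = ByteBuffer.allocate(2 * sum.getChecksumSize());

    sum.calculateChunkedSums(data, sums);
    sum.verifyChunkedSums(data, sums, "example.dat", 0); // passes

    data.put(0, (byte) (data.get(0) ^ 0xFF)); // corrupt one byte in chunk 0
    sum.verifyChunkedSums(data, sums, "example.dat", 0); // throws ChecksumException
  }
}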
@Override
public int doRead(BlockReader blockReader, int off, int len)
    throws ChecksumException, IOException {
  int oldpos = buf.position();
  int oldlimit = buf.limit();
  boolean success = false;
  try {
    int ret = blockReader.read(buf);
    success = true;
    updateReadStatistics(readStatistics, ret, blockReader);
    return ret;
  } finally {
    if (!success) {
      // Reset to original state so that retries work correctly.
      buf.position(oldpos);
      buf.limit(oldlimit);
    }
  }
}
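// Standalone illustration (not from the source above) of the save/restore
// pattern in doRead: if the read fails partway through, restoring the saved
// position and limit leaves the buffer exactly as it was, so a retry
// against another replica starts from a clean state.
import java.nio.ByteBuffer;

public class BufferResetExample {
  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(16);
    int oldpos = buf.position();
    int oldlimit = buf.limit();
    boolean success = false;
    try {
      buf.put((byte) 1);          // a partial read lands bytes in the buffer
      simulateChecksumFailure();  // then the read aborts mid-way
      success = true;             // never reached in this simulation
    } catch (RuntimeException e) {
      // swallow for the demo; the finally block performs the reset
    } finally {
      if (!success) {
        buf.position(oldpos); // rewind past the partial data
        buf.limit(oldlimit);
      }
    }
    System.out.println("position restored to " + buf.position()); // prints 0
  }

  private static void simulateChecksumFailure() {
    throw new RuntimeException("simulated checksum failure mid-read");
  }
}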
/**
 * Verify multiple CRC chunks.
 */
private void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf)
    throws IOException {
  try {
    clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0);
  } catch (ChecksumException ce) {
    LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
    // No need to report to namenode when client is writing.
    if (srcDataNode != null && isDatanode) {
      try {
        LOG.info("report corrupt " + block + " from datanode " +
            srcDataNode + " to namenode");
        datanode.reportRemoteBadBlock(srcDataNode, block);
      } catch (IOException e) {
        LOG.warn("Failed to report bad " + block + " from datanode " +
            srcDataNode + " to namenode");
      }
    }
    throw new IOException("Unexpected checksum mismatch while writing " +
        block + " from " + inAddr);
  }
}
/**
 * Compute checksum for chunks and verify the checksum that is read from
 * the metadata file is correct.
 *
 * @param buf buffer that has checksum and data
 * @param dataOffset position where data is written in the buf
 * @param datalen length of data
 * @param numChunks number of chunks corresponding to data
 * @param checksumOffset offset where checksum is written in the buf
 * @throws ChecksumException on failed checksum verification
 */
public void verifyChecksum(final byte[] buf, final int dataOffset,
    final int datalen, final int numChunks, final int checksumOffset)
    throws ChecksumException {
  int dOff = dataOffset;
  int cOff = checksumOffset;
  int dLeft = datalen;

  for (int i = 0; i < numChunks; i++) {
    checksum.reset();
    int dLen = Math.min(dLeft, chunkSize);
    checksum.update(buf, dOff, dLen);
    if (!checksum.compare(buf, cOff)) {
      long failedPos = offset + datalen - dLeft;
      throw new ChecksumException("Checksum failed at " + failedPos,
          failedPos);
    }
    dLeft -= dLen;
    dOff += dLen;
    cOff += checksumSize;
  }
}
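// Self-contained sketch making the chunk/offset arithmetic concrete. It
// mirrors the loop above using java.util.zip.CRC32; the 512-byte chunk
// size and 4-byte big-endian stored CRC layout are assumptions for the
// example, not facts about the original class.
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class ChunkedVerifyExample {
  static final int CHUNK_SIZE = 512; // bytes of data per checksum
  static final int CRC_SIZE = 4;     // width of one stored CRC32 value

  static void verify(byte[] buf, int dataOffset, int datalen,
      int numChunks, int checksumOffset) throws Exception {
    CRC32 crc = new CRC32();
    int dOff = dataOffset;
    int cOff = checksumOffset;
    int dLeft = datalen;
    for (int i = 0; i < numChunks; i++) {
      crc.reset();
      int dLen = Math.min(dLeft, CHUNK_SIZE);
      crc.update(buf, dOff, dLen);
      int stored = ByteBuffer.wrap(buf, cOff, CRC_SIZE).getInt();
      if ((int) crc.getValue() != stored) {
        throw new Exception("Checksum failed at chunk " + i);
      }
      dLeft -= dLen;
      dOff += dLen;
      cOff += CRC_SIZE;
    }
  }

  public static void main(String[] args) throws Exception {
    // One 512-byte chunk of data followed by its CRC in a single buffer.
    byte[] buf = new byte[CHUNK_SIZE + CRC_SIZE];
    for (int i = 0; i < CHUNK_SIZE; i++) {
      buf[i] = (byte) i;
    }
    CRC32 crc = new CRC32();
    crc.update(buf, 0, CHUNK_SIZE);
    ByteBuffer.wrap(buf, CHUNK_SIZE, CRC_SIZE).putInt((int) crc.getValue());
    verify(buf, 0, CHUNK_SIZE, 1, CHUNK_SIZE); // passes
    System.out.println("chunk verified");
  }
}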
private long tailFile(Path file, long startPos) throws IOException {
  long numRead = 0;
  FSDataInputStream inputStream = fileSystem.open(file);
  inputStream.seek(startPos);

  int len = 4 * 1024;
  byte[] buf = new byte[len];
  int read;
  while ((read = inputStream.read(buf)) > -1) {
    LOG.info(String.format("read %d bytes", read));

    if (!validateSequentialBytes(buf, (int) (startPos + numRead), read)) {
      LOG.error(String.format("invalid bytes: [%s]\n", Arrays.toString(buf)));
      throw new ChecksumException("unable to validate bytes", startPos);
    }

    numRead += read;
  }

  inputStream.close();
  return numRead + startPos - 1;
}
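// validateSequentialBytes is not shown above. A hypothetical sketch of such
// a helper (the byte pattern is an assumption for illustration): it treats
// the file as containing byte (i % 256) at offset i and checks 'len' bytes
// of 'buf' against that pattern.
private static boolean validateSequentialBytes(byte[] buf, int startPos,
    int len) {
  for (int i = 0; i < len; i++) {
    byte expected = (byte) ((startPos + i) % 256);
    if (buf[i] != expected) {
      return false;
    }
  }
  return true;
}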
/**
 * Ask dfs client to read the file
 */
private void dfsClientReadFile(Path corruptedFile)
    throws IOException, UnresolvedLinkException {
  DFSInputStream in = dfs.dfs.open(corruptedFile.toUri().getPath());
  byte[] buf = new byte[buffersize];
  int nRead = 0; // total number of bytes read

  try {
    do {
      nRead = in.read(buf, 0, buf.length);
    } while (nRead > 0);
  } catch (ChecksumException ce) {
    // caught ChecksumException if all replicas are bad, ignore and continue.
    LOG.debug("DfsClientReadFile caught ChecksumException.");
  } catch (BlockMissingException bme) {
    // caught BlockMissingException, ignore.
    LOG.debug("DfsClientReadFile caught BlockMissingException.");
  }
}
public void doShortCircuitReadBlockFileCorruptionTest()
    throws IOException, InterruptedException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
  Path path2 = new Path("/" + METHOD_NAME + ".02.dat");

  final int SEED = 0xFADED;
  makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
  ensureFileReplicasOnStorageType(path1, RAM_DISK);

  // Create another file with a replica on RAM_DISK, which evicts the first.
  makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);

  // Sleep for a short time to allow the lazy writer thread to do its job.
  Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
  triggerBlockReport();

  // Corrupt the lazy-persisted block file, and verify that checksum
  // verification catches it.
  ensureFileReplicasOnStorageType(path1, DEFAULT);
  cluster.corruptReplica(0, DFSTestUtil.getFirstBlock(fs, path1));
  exception.expect(ChecksumException.class);
  DFSTestUtil.readFileBuffer(fs, path1);
}
public void doShortCircuitReadMetaFileCorruptionTest()
    throws IOException, InterruptedException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
  Path path2 = new Path("/" + METHOD_NAME + ".02.dat");

  final int SEED = 0xFADED;
  makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
  ensureFileReplicasOnStorageType(path1, RAM_DISK);

  // Create another file with a replica on RAM_DISK, which evicts the first.
  makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);

  // Sleep for a short time to allow the lazy writer thread to do its job.
  Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
  triggerBlockReport();

  // Corrupt the lazy-persisted checksum file, and verify that checksum
  // verification catches it.
  ensureFileReplicasOnStorageType(path1, DEFAULT);
  File metaFile = cluster.getBlockMetadataFile(0,
      DFSTestUtil.getFirstBlock(fs, path1));
  MiniDFSCluster.corruptBlock(metaFile);
  exception.expect(ChecksumException.class);
  DFSTestUtil.readFileBuffer(fs, path1);
}
public void doTest(BlockReaderLocal reader, byte original[])
    throws IOException {
  byte buf[] = new byte[TEST_LENGTH];
  try {
    reader.readFully(buf, 0, 10);
    assertArrayRegionsEqual(original, 0, buf, 0, 10);
    reader.readFully(buf, 10, 100);
    assertArrayRegionsEqual(original, 10, buf, 10, 100);
    reader.readFully(buf, 110, 700);
    assertArrayRegionsEqual(original, 110, buf, 110, 700);
    reader.skip(1); // skip from offset 810 to offset 811
    reader.readFully(buf, 811, 5);
    assertArrayRegionsEqual(original, 811, buf, 811, 5);
    reader.readFully(buf, 816, 900);
    if (usingChecksums) {
      // We should detect the corruption when using a checksum file.
      Assert.fail("did not detect corruption");
    }
  } catch (ChecksumException e) {
    if (!usingChecksums) {
      Assert.fail("didn't expect to get ChecksumException: not " +
          "using checksums.");
    }
  }
}
/** check if local FS can handle corrupted blocks properly */
@Test
public void testLocalFileCorruption() throws Exception {
  Configuration conf = new HdfsConfiguration();
  Path file = new Path(PathUtils.getTestDirName(getClass()), "corruptFile");
  FileSystem fs = FileSystem.getLocal(conf);
  DataOutputStream dos = fs.create(file);
  dos.writeBytes("original bytes");
  dos.close();
  // Now deliberately corrupt the file
  dos = new DataOutputStream(new FileOutputStream(file.toString()));
  dos.writeBytes("corruption");
  dos.close();
  // Now attempt to read the file
  DataInputStream dis = fs.open(file, 512);
  try {
    LOG.info("A ChecksumException is expected to be logged.");
    dis.readByte();
  } catch (ChecksumException ignore) {
    // expect this exception but let any NPE get thrown
  }
  fs.delete(file, true);
}
public void doShortCircuitReadBlockFileCorruptionTest()
    throws IOException, InterruptedException, TimeoutException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
  makeTestFile(path1, BLOCK_SIZE, true);
  ensureFileReplicasOnStorageType(path1, RAM_DISK);
  waitForMetric("RamDiskBlocksLazyPersisted", 1);
  triggerEviction(cluster.getDataNodes().get(0));

  // Corrupt the lazy-persisted block file, and verify that checksum
  // verification catches it.
  ensureFileReplicasOnStorageType(path1, DEFAULT);
  cluster.corruptReplica(0, DFSTestUtil.getFirstBlock(fs, path1));
  exception.expect(ChecksumException.class);
  DFSTestUtil.readFileBuffer(fs, path1);
}
public void doShortCircuitReadMetaFileCorruptionTest()
    throws IOException, InterruptedException, TimeoutException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
  makeTestFile(path1, BLOCK_SIZE, true);
  ensureFileReplicasOnStorageType(path1, RAM_DISK);
  waitForMetric("RamDiskBlocksLazyPersisted", 1);
  triggerEviction(cluster.getDataNodes().get(0));

  // Corrupt the lazy-persisted checksum file, and verify that checksum
  // verification catches it.
  ensureFileReplicasOnStorageType(path1, DEFAULT);
  cluster.corruptMeta(0, DFSTestUtil.getFirstBlock(fs, path1));
  exception.expect(ChecksumException.class);
  DFSTestUtil.readFileBuffer(fs, path1);
}
/** check if local FS can handle corrupted blocks properly */
@Test
public void testLocalFileCorruption() throws Exception {
  Configuration conf = new HdfsConfiguration();
  Path file = new Path(PathUtils.getTestDirName(getClass()), "corruptFile");
  FileSystem fs = FileSystem.getLocal(conf);
  DataOutputStream dos = fs.create(file);
  dos.writeBytes("original bytes");
  dos.close();
  // Now deliberately corrupt the file
  dos = new DataOutputStream(new FileOutputStream(file.toString()));
  dos.writeBytes("corruption");
  dos.close();
  // Now attempt to read the file
  DataInputStream dis = fs.open(file, 512);
  try {
    System.out.println("A ChecksumException is expected to be logged.");
    dis.readByte();
  } catch (ChecksumException ignore) {
    // expect this exception but let any NPE get thrown
  }
  fs.delete(file, true);
}
protected static boolean checkCrc(FileSystem fs, Path file, Path crc)
    throws IOException {
  byte[] buf = new byte[512 * 1024 * 1024];
  long len = fs.getFileStatus(file).getLen();
  long pos = 0;
  boolean gotException = false;
  FSDataInputStream in = fs.open(file);
  try {
    // Positional reads verify checksums on the client side as a side
    // effect. A manual loop is used because IOUtils.readFully can't
    // process files larger than 2 GB.
    while (pos < len) {
      int read = in.read(pos, buf, 0, buf.length);
      if (read < 0) {
        break; // unexpected EOF; stop rather than loop forever
      }
      pos += read;
    }
  } catch (ChecksumException e) {
    gotException = true;
  }
  Logger.DEBUG("checksum of " + file + " is " +
      (gotException ? "incorrect, needs to be redownloaded" : "correct"));
  in.close();
  return !gotException;
  // Timing note from the original author: the real check takes 52m 16s
  // on chr1; just returning true here would skip that cost.
}
private void testFileCorruptionHelper(Configuration conf) throws Exception {
  Path file = new Path(TEST_ROOT_DIR, "corruptFile");
  FileSystem fs = FileSystem.getLocal(conf);
  DataOutputStream dos = fs.create(file);
  dos.writeBytes("original bytes");
  dos.close();
  // Now deliberately corrupt the file
  dos = new DataOutputStream(new FileOutputStream(file.toString()));
  dos.writeBytes("corruption");
  dos.close();
  // Now attempt to read the file
  DataInputStream dis = fs.open(file, 512);
  try {
    System.out.println("A ChecksumException is expected to be logged.");
    dis.readByte();
  } catch (ChecksumException ignore) {
    // expect this exception but let any NPE get thrown
  }
  fs.delete(file, true);
}
public void testSmallFileCorruption() throws Exception {
  long fileSize = 1L;
  Configuration conf = new Configuration();
  conf.setInt("dfs.replication", 1);
  MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  try {
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    Path file = new Path("/test");
    DFSTestUtil.createFile(fs, file, fileSize, (short) 1, 12345L);
    Block block = DFSTestUtil.getFirstBlock(fs, file);
    cluster.corruptBlockOnDataNodes(block);
    try {
      IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
          conf, true);
      fail("Didn't get checksum exception");
    } catch (ChecksumException ioe) {
      DFSClient.LOG.info("Got expected checksum exception", ioe);
    }
  } finally {
    cluster.shutdown();
  }
}
@Override
public int doRead(BlockReader blockReader, int off, int len)
    throws ChecksumException, IOException {
  int oldpos = buf.position();
  int oldlimit = buf.limit();
  boolean success = false;
  try {
    int ret = blockReader.read(buf);
    success = true;
    return ret;
  } finally {
    if (!success) {
      // Reset to original state so that retries work correctly.
      buf.position(oldpos);
      buf.limit(oldlimit);
    }
  }
}