public ShortCircuitReplica(ExtendedBlockId key, FileInputStream dataStream,
    FileInputStream metaStream, ShortCircuitCache cache,
    long creationTimeMs, Slot slot) throws IOException {
  this.key = key;
  this.dataStream = dataStream;
  this.metaStream = metaStream;
  this.metaHeader = BlockMetadataHeader.preadHeader(metaStream.getChannel());
  if (metaHeader.getVersion() != 1) {
    throw new IOException("invalid metadata header version " +
        metaHeader.getVersion() + ". Can only handle version 1.");
  }
  this.cache = cache;
  this.creationTimeMs = creationTimeMs;
  this.slot = slot;
}
public TestFileDescriptorPair() throws IOException {
  fis = new FileInputStream[2];
  for (int i = 0; i < 2; i++) {
    String name = dir.getDir() + "/file" + i;
    FileOutputStream fos = new FileOutputStream(name);
    if (i == 0) {
      // write 'data' file
      fos.write(1);
    } else {
      // write 'metadata' file
      BlockMetadataHeader header =
          new BlockMetadataHeader((short)1,
              DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4));
      DataOutputStream dos = new DataOutputStream(fos);
      BlockMetadataHeader.writeHeader(dos, header);
      dos.close();
    }
    fos.close();
    fis[i] = new FileInputStream(name);
  }
}
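// Hedged sketch (not part of the fixture above): reading back the header the
// fixture wrote, using the same BlockMetadataHeader.readHeader call that the
// validateIntegrity snippets below rely on. The helper name is an assumption
// for illustration; the BlockMetadataHeader and DataChecksum calls all appear
// elsewhere in this section.
static void checkFixtureHeader(String name) throws IOException {
  try (DataInputStream in = new DataInputStream(
      new BufferedInputStream(new FileInputStream(name)))) {
    BlockMetadataHeader header = BlockMetadataHeader.readHeader(in);
    if (header.getVersion() != 1 ||
        header.getChecksum().getChecksumType().id !=
            DataChecksum.CHECKSUM_NULL) {
      throw new IOException("unexpected fixture header");
    }
    // A NULL checksum records no per-chunk sums, so the meta file should
    // end right after the fixed header written above.
  }
}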
private void doShortCircuitReadAfterEvictionTest()
    throws IOException, InterruptedException, TimeoutException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path path1 = new Path("/" + METHOD_NAME + ".01.dat");

  final int SEED = 0xFADED;
  makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
  ensureFileReplicasOnStorageType(path1, RAM_DISK);
  waitForMetric("RamDiskBlocksLazyPersisted", 1);

  // Verify short-circuit read from RAM_DISK.
  File metaFile = cluster.getBlockMetadataFile(0,
      DFSTestUtil.getFirstBlock(fs, path1));
  assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
  assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));

  triggerEviction(cluster.getDataNodes().get(0));

  // Verify short-circuit read still works from DEFAULT storage. This time,
  // we'll have a checksum written during lazy persistence.
  ensureFileReplicasOnStorageType(path1, DEFAULT);
  metaFile = cluster.getBlockMetadataFile(0,
      DFSTestUtil.getFirstBlock(fs, path1));
  assertTrue(metaFile.length() > BlockMetadataHeader.getHeaderSize());
  assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
}
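// Hedged sketch: the arithmetic behind the two metaFile.length() assertions
// above. While a replica is on RAM_DISK (transient storage), checksums are
// skipped, so the meta file holds only the fixed header; after lazy
// persistence to DEFAULT storage, one checksum value per chunk follows it.
// The helper is an illustration, not HDFS code; it assumes the usual layout
// of a fixed header plus one checksum per bytesPerChecksum-sized chunk.
static long expectedMetaFileLength(long blockLen, int bytesPerChecksum,
    int checksumSize) {
  long numChunks = (blockLen + bytesPerChecksum - 1) / bytesPerChecksum;
  return BlockMetadataHeader.getHeaderSize() + numChunks * checksumSize;
}
// e.g. 4-byte CRCs over 512-byte chunks of a 5 MB block give
// getHeaderSize() + 10240 * 4 bytes, comfortably larger than the bare header.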
private BlockReaderLocal(Builder builder) {
  this.replica = builder.replica;
  this.dataIn = replica.getDataStream().getChannel();
  this.dataPos = builder.dataPos;
  this.checksumIn = replica.getMetaStream().getChannel();
  BlockMetadataHeader header = builder.replica.getMetaHeader();
  this.checksum = header.getChecksum();
  this.verifyChecksum = builder.verifyChecksum &&
      (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
  this.filename = builder.filename;
  this.block = builder.block;
  this.bytesPerChecksum = checksum.getBytesPerChecksum();
  this.checksumSize = checksum.getChecksumSize();
  this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 :
      ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
  // Calculate the effective maximum readahead.
  // We can't do more readahead than there is space in the buffer.
  int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
      ((Math.min(builder.bufferSize, builder.maxReadahead) +
          bytesPerChecksum - 1) / bytesPerChecksum);
  if (maxReadaheadChunks == 0) {
    this.zeroReadaheadRequested = true;
    maxReadaheadChunks = 1;
  } else {
    this.zeroReadaheadRequested = false;
  }
  this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
  this.storageType = builder.storageType;
}
private BlockReaderLocal(Builder builder) {
  this.replica = builder.replica;
  this.dataIn = replica.getDataStream().getChannel();
  this.dataPos = builder.dataPos;
  this.checksumIn = replica.getMetaStream().getChannel();
  BlockMetadataHeader header = builder.replica.getMetaHeader();
  this.checksum = header.getChecksum();
  this.verifyChecksum = builder.verifyChecksum &&
      (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
  this.filename = builder.filename;
  this.block = builder.block;
  this.bytesPerChecksum = checksum.getBytesPerChecksum();
  this.checksumSize = checksum.getChecksumSize();
  this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 :
      ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
  // Calculate the effective maximum readahead.
  // We can't do more readahead than there is space in the buffer.
  int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
      ((Math.min(builder.bufferSize, builder.maxReadahead) +
          bytesPerChecksum - 1) / bytesPerChecksum);
  if (maxReadaheadChunks == 0) {
    this.zeroReadaheadRequested = true;
    maxReadaheadChunks = 1;
  } else {
    this.zeroReadaheadRequested = false;
  }
  this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
  this.storageType = builder.storageType;
  this.tracer = builder.tracer;
}
private BlockReaderLocal(Builder builder) {
  this.replica = builder.replica;
  this.dataIn = replica.getDataStream().getChannel();
  this.dataPos = builder.dataPos;
  this.checksumIn = replica.getMetaStream().getChannel();
  BlockMetadataHeader header = builder.replica.getMetaHeader();
  this.checksum = header.getChecksum();
  this.verifyChecksum = builder.verifyChecksum &&
      (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
  this.filename = builder.filename;
  this.block = builder.block;
  this.bytesPerChecksum = checksum.getBytesPerChecksum();
  this.checksumSize = checksum.getChecksumSize();
  this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 :
      ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
  // Calculate the effective maximum readahead.
  // We can't do more readahead than there is space in the buffer.
  int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
      ((Math.min(builder.bufferSize, builder.maxReadahead) +
          bytesPerChecksum - 1) / bytesPerChecksum);
  if (maxReadaheadChunks == 0) {
    this.zeroReadaheadRequested = true;
    maxReadaheadChunks = 1;
  } else {
    this.zeroReadaheadRequested = false;
  }
  this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
}
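// Hedged sketch: the readahead rounding from the constructors above, worked
// through on concrete numbers (the values are illustrative only).
// bufferSize = 1 MiB, maxReadahead = 4 MiB, bytesPerChecksum = 512:
//   effective readahead = min(1 MiB, 4 MiB) = 1 MiB
//   maxReadaheadChunks  = ceil(1 MiB / 512) = 2048
//   maxReadaheadLength  = 2048 * 512       = 1 MiB
// maxReadahead = 0 yields 0 chunks, which the constructor records as
// zeroReadaheadRequested and then bumps to one chunk so a read can still
// make progress.
static int readaheadChunks(int bufferSize, int maxReadahead,
    int bytesPerChecksum) {
  if (bytesPerChecksum == 0) {
    return 0;
  }
  return (Math.min(bufferSize, maxReadahead) + bytesPerChecksum - 1)
      / bytesPerChecksum;
}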
public BlockMetadataHeader getMetaHeader() {
  return metaHeader;
}
static private void truncateBlock(File blockFile, File metaFile,
    long oldlen, long newlen) throws IOException {
  LOG.info("truncateBlock: blockFile=" + blockFile
      + ", metaFile=" + metaFile
      + ", oldlen=" + oldlen
      + ", newlen=" + newlen);

  if (newlen == oldlen) {
    return;
  }
  if (newlen > oldlen) {
    throw new IOException("Cannot truncate block from oldlen (=" + oldlen
        + ") to a larger newlen (=" + newlen + ")");
  }

  DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
  int checksumsize = dcs.getChecksumSize();
  int bpc = dcs.getBytesPerChecksum();
  long n = (newlen - 1) / bpc + 1;
  long newmetalen = BlockMetadataHeader.getHeaderSize() + n * checksumsize;
  long lastchunkoffset = (n - 1) * bpc;
  int lastchunksize = (int)(newlen - lastchunkoffset);
  byte[] b = new byte[Math.max(lastchunksize, checksumsize)];

  RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
  try {
    // truncate blockFile
    blockRAF.setLength(newlen);

    // read last chunk
    blockRAF.seek(lastchunkoffset);
    blockRAF.readFully(b, 0, lastchunksize);
  } finally {
    blockRAF.close();
  }

  // compute checksum
  dcs.update(b, 0, lastchunksize);
  dcs.writeValue(b, 0, false);

  // update metaFile
  RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
  try {
    metaRAF.setLength(newmetalen);
    metaRAF.seek(newmetalen - checksumsize);
    metaRAF.write(b, 0, checksumsize);
  } finally {
    metaRAF.close();
  }
}
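// Hedged sketch: truncateBlock's sizing math on concrete numbers (values
// illustrative only). With bytesPerChecksum = 512, checksumSize = 4, and
// newlen = 1000:
//   n               = (1000 - 1) / 512 + 1 = 2 chunks remain
//   newmetalen      = getHeaderSize() + 2 * 4
//   lastchunkoffset = (2 - 1) * 512 = 512
//   lastchunksize   = 1000 - 512   = 488 bytes
// The last chunk is now partial, so its checksum must be recomputed from the
// surviving 488 data bytes and written over the final checksum slot.
static long lastChunkOffset(long newlen, int bytesPerChecksum) {
  long n = (newlen - 1) / bytesPerChecksum + 1;
  return (n - 1) * bytesPerChecksum;
}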
private void doShortCircuitReadAfterEvictionTest()
    throws IOException, InterruptedException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
  Path path2 = new Path("/" + METHOD_NAME + ".02.dat");

  final int SEED = 0xFADED;
  makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);

  // Verify short-circuit read from RAM_DISK.
  ensureFileReplicasOnStorageType(path1, RAM_DISK);
  File metaFile = cluster.getBlockMetadataFile(0,
      DFSTestUtil.getFirstBlock(fs, path1));
  assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
  assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));

  // Sleep for a short time to allow the lazy writer thread to do its job.
  Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);

  // Verify short-circuit read from RAM_DISK once again.
  ensureFileReplicasOnStorageType(path1, RAM_DISK);
  metaFile = cluster.getBlockMetadataFile(0,
      DFSTestUtil.getFirstBlock(fs, path1));
  assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
  assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));

  // Create another file with a replica on RAM_DISK, which evicts the first.
  makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
  Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
  triggerBlockReport();

  // Verify short-circuit read still works from DEFAULT storage. This time,
  // we'll have a checksum written during lazy persistence.
  ensureFileReplicasOnStorageType(path1, DEFAULT);
  metaFile = cluster.getBlockMetadataFile(0,
      DFSTestUtil.getFirstBlock(fs, path1));
  assertTrue(metaFile.length() > BlockMetadataHeader.getHeaderSize());
  assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));

  // In the legacy short-circuit read implementation, any failure is trapped
  // silently, falls back to a remote read, and also disables all subsequent
  // legacy short-circuit reads in the ClientContext. If the test uses the
  // legacy path, assert that it did not get disabled.
  ClientContext clientContext = client.getClientContext();
  if (clientContext.getUseLegacyBlockReaderLocal()) {
    Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal());
  }
}
/**
 * The only way this object can be instantiated.
 */
static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf,
    UserGroupInformation userGroupInformation,
    Configuration configuration, String file, ExtendedBlock blk,
    Token<BlockTokenIdentifier> token, DatanodeInfo node,
    long startOffset, long length, StorageType storageType,
    Tracer tracer) throws IOException {
  final ShortCircuitConf scConf = conf.getShortCircuitConf();
  LocalDatanodeInfo localDatanodeInfo =
      getLocalDatanodeInfo(node.getIpcPort());
  // check the cache first
  BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
  if (pathinfo == null) {
    if (userGroupInformation == null) {
      userGroupInformation = UserGroupInformation.getCurrentUser();
    }
    pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
        configuration, conf.getSocketTimeout(), token,
        conf.isConnectToDnViaHostname(), storageType);
  }

  // Check whether the file exists. It may happen that the HDFS file has
  // been deleted and this block lookup is occurring on behalf of a new HDFS
  // file; the block file could then reside in a different portion of the
  // fs.data.dir directory. In that case we remove this entry from the
  // cache, and the next call to this method will re-populate it.
  FileInputStream dataIn = null;
  FileInputStream checksumIn = null;
  BlockReaderLocalLegacy localBlockReader = null;
  final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums() ||
      storageType.isTransient();
  try {
    // get a local file system
    File blkfile = new File(pathinfo.getBlockPath());
    dataIn = new FileInputStream(blkfile);

    LOG.debug("New BlockReaderLocalLegacy for file {} of size {} "
        + "startOffset {} length {} short circuit checksum {}",
        blkfile, blkfile.length(), startOffset, length, !skipChecksumCheck);

    if (!skipChecksumCheck) {
      // get the metadata file
      File metafile = new File(pathinfo.getMetaPath());
      checksumIn = new FileInputStream(metafile);

      final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
          new DataInputStream(checksumIn), blk);
      long firstChunkOffset = startOffset
          - (startOffset % checksum.getBytesPerChecksum());
      localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk,
          startOffset, checksum, true, dataIn, firstChunkOffset, checksumIn,
          tracer);
    } else {
      localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk,
          startOffset, dataIn, tracer);
    }
  } catch (IOException e) {
    // remove from cache
    localDatanodeInfo.removeBlockLocalPathInfo(blk);
    LOG.warn("BlockReaderLocalLegacy: Removing " + blk
        + " from cache because local file " + pathinfo.getBlockPath()
        + " could not be opened.");
    throw e;
  } finally {
    if (localBlockReader == null) {
      if (dataIn != null) {
        dataIn.close();
      }
      if (checksumIn != null) {
        checksumIn.close();
      }
    }
  }
  return localBlockReader;
}
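// Hedged sketch: why the reader starts at firstChunkOffset rather than
// startOffset. Checksums cover whole chunks, so verification must begin at
// the chunk boundary containing startOffset. Illustrative helper, not HDFS
// code; the arithmetic is the same line used in newBlockReader above.
static long firstChunkOffset(long startOffset, int bytesPerChecksum) {
  // e.g. startOffset = 1300, bytesPerChecksum = 512:
  // 1300 % 512 = 276, so the read rewinds to offset 1024.
  return startOffset - (startOffset % bytesPerChecksum);
}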
/**
 * Find out the number of bytes in the block that match its crc.
 *
 * This algorithm assumes that data corruption caused by unexpected
 * datanode shutdown occurs only in the last crc chunk. So it checks
 * only the last chunk.
 *
 * @param blockFile the block file
 * @param genStamp generation stamp of the block
 * @return the number of valid bytes
 */
private long validateIntegrityAndSetLength(File blockFile, long genStamp) {
  DataInputStream checksumIn = null;
  InputStream blockIn = null;
  try {
    final File metaFile = FsDatasetUtil.getMetaFile(blockFile, genStamp);
    long blockFileLen = blockFile.length();
    long metaFileLen = metaFile.length();
    int crcHeaderLen = DataChecksum.getChecksumHeaderSize();
    if (!blockFile.exists() || blockFileLen == 0 ||
        !metaFile.exists() || metaFileLen < crcHeaderLen) {
      return 0;
    }
    checksumIn = new DataInputStream(new BufferedInputStream(
        new FileInputStream(metaFile), ioFileBufferSize));

    // read and handle the common header here. For now just a version
    final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
        checksumIn, metaFile);
    int bytesPerChecksum = checksum.getBytesPerChecksum();
    int checksumSize = checksum.getChecksumSize();
    long numChunks = Math.min(
        (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum,
        (metaFileLen - crcHeaderLen) / checksumSize);
    if (numChunks == 0) {
      return 0;
    }
    IOUtils.skipFully(checksumIn, (numChunks - 1) * checksumSize);
    blockIn = new FileInputStream(blockFile);
    long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum;
    IOUtils.skipFully(blockIn, lastChunkStartPos);
    int lastChunkSize = (int)Math.min(
        bytesPerChecksum, blockFileLen - lastChunkStartPos);
    byte[] buf = new byte[lastChunkSize + checksumSize];
    checksumIn.readFully(buf, lastChunkSize, checksumSize);
    IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
    checksum.update(buf, 0, lastChunkSize);
    long validFileLength;
    if (checksum.compare(buf, lastChunkSize)) {
      // the last chunk matches its crc
      validFileLength = lastChunkStartPos + lastChunkSize;
    } else {
      // the last chunk is corrupt
      validFileLength = lastChunkStartPos;
    }

    // truncate if extra bytes are present without CRC
    if (blockFile.length() > validFileLength) {
      RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
      try {
        // truncate blockFile
        blockRAF.setLength(validFileLength);
      } finally {
        blockRAF.close();
      }
    }
    return validFileLength;
  } catch (IOException e) {
    FsDatasetImpl.LOG.warn(e);
    return 0;
  } finally {
    IOUtils.closeStream(checksumIn);
    IOUtils.closeStream(blockIn);
  }
}
/**
 * The only way this object can be instantiated.
 */
static BlockReaderLocalLegacy newBlockReader(DFSClient.Conf conf,
    UserGroupInformation userGroupInformation,
    Configuration configuration, String file, ExtendedBlock blk,
    Token<BlockTokenIdentifier> token, DatanodeInfo node,
    long startOffset, long length, StorageType storageType)
    throws IOException {
  LocalDatanodeInfo localDatanodeInfo =
      getLocalDatanodeInfo(node.getIpcPort());
  // check the cache first
  BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
  if (pathinfo == null) {
    if (userGroupInformation == null) {
      userGroupInformation = UserGroupInformation.getCurrentUser();
    }
    pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
        configuration, conf.socketTimeout, token,
        conf.connectToDnViaHostname, storageType);
  }

  // Check whether the file exists. It may happen that the HDFS file has
  // been deleted and this block lookup is occurring on behalf of a new HDFS
  // file; the block file could then reside in a different portion of the
  // fs.data.dir directory. In that case we remove this entry from the
  // cache, and the next call to this method will re-populate it.
  FileInputStream dataIn = null;
  FileInputStream checksumIn = null;
  BlockReaderLocalLegacy localBlockReader = null;
  boolean skipChecksumCheck = conf.skipShortCircuitChecksums ||
      storageType.isTransient();
  try {
    // get a local file system
    File blkfile = new File(pathinfo.getBlockPath());
    dataIn = new FileInputStream(blkfile);

    if (LOG.isDebugEnabled()) {
      LOG.debug("New BlockReaderLocalLegacy for file " + blkfile
          + " of size " + blkfile.length() + " startOffset " + startOffset
          + " length " + length + " short circuit checksum "
          + !skipChecksumCheck);
    }

    if (!skipChecksumCheck) {
      // get the metadata file
      File metafile = new File(pathinfo.getMetaPath());
      checksumIn = new FileInputStream(metafile);

      final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
          new DataInputStream(checksumIn), blk);
      long firstChunkOffset = startOffset
          - (startOffset % checksum.getBytesPerChecksum());
      localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
          startOffset, length, pathinfo, checksum, true, dataIn,
          firstChunkOffset, checksumIn);
    } else {
      localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
          startOffset, length, pathinfo, dataIn);
    }
  } catch (IOException e) {
    // remove from cache
    localDatanodeInfo.removeBlockLocalPathInfo(blk);
    DFSClient.LOG.warn("BlockReaderLocalLegacy: Removing " + blk
        + " from cache because local file " + pathinfo.getBlockPath()
        + " could not be opened.");
    throw e;
  } finally {
    if (localBlockReader == null) {
      if (dataIn != null) {
        dataIn.close();
      }
      if (checksumIn != null) {
        checksumIn.close();
      }
    }
  }
  return localBlockReader;
}
/**
 * Find out the number of bytes in the block that match its crc.
 *
 * This algorithm assumes that data corruption caused by unexpected
 * datanode shutdown occurs only in the last crc chunk. So it checks
 * only the last chunk.
 *
 * @param blockFile the block file
 * @param genStamp generation stamp of the block
 * @return the number of valid bytes
 */
private long validateIntegrityAndSetLength(File blockFile, long genStamp) {
  DataInputStream checksumIn = null;
  InputStream blockIn = null;
  try {
    final File metaFile = FsDatasetUtil.getMetaFile(blockFile, genStamp);
    long blockFileLen = blockFile.length();
    long metaFileLen = metaFile.length();
    int crcHeaderLen = DataChecksum.getChecksumHeaderSize();
    if (!blockFile.exists() || blockFileLen == 0 ||
        !metaFile.exists() || metaFileLen < crcHeaderLen) {
      return 0;
    }
    checksumIn = new DataInputStream(new BufferedInputStream(
        new FileInputStream(metaFile), HdfsConstants.IO_FILE_BUFFER_SIZE));

    // read and handle the common header here. For now just a version
    final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
        checksumIn, metaFile);
    int bytesPerChecksum = checksum.getBytesPerChecksum();
    int checksumSize = checksum.getChecksumSize();
    long numChunks = Math.min(
        (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum,
        (metaFileLen - crcHeaderLen) / checksumSize);
    if (numChunks == 0) {
      return 0;
    }
    IOUtils.skipFully(checksumIn, (numChunks - 1) * checksumSize);
    blockIn = new FileInputStream(blockFile);
    long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum;
    IOUtils.skipFully(blockIn, lastChunkStartPos);
    int lastChunkSize = (int)Math.min(
        bytesPerChecksum, blockFileLen - lastChunkStartPos);
    byte[] buf = new byte[lastChunkSize + checksumSize];
    checksumIn.readFully(buf, lastChunkSize, checksumSize);
    IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
    checksum.update(buf, 0, lastChunkSize);
    long validFileLength;
    if (checksum.compare(buf, lastChunkSize)) {
      // the last chunk matches its crc
      validFileLength = lastChunkStartPos + lastChunkSize;
    } else {
      // the last chunk is corrupt
      validFileLength = lastChunkStartPos;
    }

    // truncate if extra bytes are present without CRC
    if (blockFile.length() > validFileLength) {
      RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
      try {
        // truncate blockFile
        blockRAF.setLength(validFileLength);
      } finally {
        blockRAF.close();
      }
    }
    return validFileLength;
  } catch (IOException e) {
    FsDatasetImpl.LOG.warn(e);
    return 0;
  } finally {
    IOUtils.closeStream(checksumIn);
    IOUtils.closeStream(blockIn);
  }
}
public BlockReaderLocal(DFSClient.Conf conf, String filename,
    ExtendedBlock block, long startOffset, long length,
    FileInputStream dataIn, FileInputStream checksumIn,
    DatanodeID datanodeID, boolean verifyChecksum,
    FileInputStreamCache fisCache) throws IOException {
  this.dataIn = dataIn;
  this.checksumIn = checksumIn;
  this.startOffset = Math.max(startOffset, 0);
  this.filename = filename;
  this.datanodeID = datanodeID;
  this.block = block;
  this.fisCache = fisCache;

  // read and handle the common header here. For now just a version
  checksumIn.getChannel().position(0);
  BlockMetadataHeader header = BlockMetadataHeader
      .readHeader(new DataInputStream(
          new BufferedInputStream(checksumIn,
              BlockMetadataHeader.getHeaderSize())));
  short version = header.getVersion();
  if (version != BlockMetadataHeader.VERSION) {
    throw new IOException("Wrong version (" + version + ") of the " +
        "metadata file for " + filename + ".");
  }
  this.verifyChecksum = verifyChecksum && !conf.skipShortCircuitChecksums;
  long firstChunkOffset;
  if (this.verifyChecksum) {
    this.checksum = header.getChecksum();
    this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
    this.checksumSize = this.checksum.getChecksumSize();
    firstChunkOffset = startOffset
        - (startOffset % checksum.getBytesPerChecksum());
    this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);

    int chunksPerChecksumRead = getSlowReadBufferNumChunks(
        conf.shortCircuitBufferSize, bytesPerChecksum);
    slowReadBuff = bufferPool.getBuffer(
        bytesPerChecksum * chunksPerChecksumRead);
    checksumBuff = bufferPool.getBuffer(
        checksumSize * chunksPerChecksumRead);
    // Initially the buffers have nothing to read.
    slowReadBuff.flip();
    checksumBuff.flip();
    long checkSumOffset = (firstChunkOffset / bytesPerChecksum)
        * checksumSize;
    IOUtils.skipFully(checksumIn, checkSumOffset);
  } else {
    firstChunkOffset = startOffset;
    this.checksum = null;
    this.bytesPerChecksum = 0;
    this.checksumSize = 0;
    this.offsetFromChunkBoundary = 0;
  }

  boolean success = false;
  try {
    // Reposition both input streams to the beginning of the chunk
    // containing startOffset
    this.dataIn.getChannel().position(firstChunkOffset);
    success = true;
  } finally {
    if (success) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Created BlockReaderLocal for file " + filename
            + " block " + block + " in datanode " + datanodeID);
      }
    } else {
      if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff);
      if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff);
    }
  }
}
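// Hedged sketch: the matching seek into the checksum stream performed by the
// constructor above. Chunk k's checksum is the k-th fixed-size entry after
// the header, so the stream is skipped forward to the entry for the first
// chunk being read. Illustrative helper, not HDFS code.
static long checksumOffsetFor(long firstChunkOffset, int bytesPerChecksum,
    int checksumSize) {
  // e.g. firstChunkOffset = 1024, bytesPerChecksum = 512, checksumSize = 4:
  // chunk index 2, so skip 2 * 4 = 8 bytes of checksum data.
  return (firstChunkOffset / bytesPerChecksum) * checksumSize;
}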
/**
 * Find out the number of bytes in the block that match its crc.
 *
 * This algorithm assumes that data corruption caused by unexpected
 * datanode shutdown occurs only in the last crc chunk. So it checks
 * only the last chunk.
 *
 * @param blockFile the block file
 * @param genStamp generation stamp of the block
 * @return the number of valid bytes
 */
private long validateIntegrity(File blockFile, long genStamp) {
  DataInputStream checksumIn = null;
  InputStream blockIn = null;
  try {
    final File metaFile = FsDatasetUtil.getMetaFile(blockFile, genStamp);
    long blockFileLen = blockFile.length();
    long metaFileLen = metaFile.length();
    int crcHeaderLen = DataChecksum.getChecksumHeaderSize();
    if (!blockFile.exists() || blockFileLen == 0 ||
        !metaFile.exists() || metaFileLen < crcHeaderLen) {
      return 0;
    }
    checksumIn = new DataInputStream(new BufferedInputStream(
        new FileInputStream(metaFile), HdfsConstants.IO_FILE_BUFFER_SIZE));

    // read and handle the common header here. For now just a version
    BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
    short version = header.getVersion();
    if (version != BlockMetadataHeader.VERSION) {
      FsDatasetImpl.LOG.warn("Wrong version (" + version
          + ") for metadata file " + metaFile + " ignoring ...");
    }
    DataChecksum checksum = header.getChecksum();
    int bytesPerChecksum = checksum.getBytesPerChecksum();
    int checksumSize = checksum.getChecksumSize();
    long numChunks = Math.min(
        (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum,
        (metaFileLen - crcHeaderLen) / checksumSize);
    if (numChunks == 0) {
      return 0;
    }
    IOUtils.skipFully(checksumIn, (numChunks - 1) * checksumSize);
    blockIn = new FileInputStream(blockFile);
    long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum;
    IOUtils.skipFully(blockIn, lastChunkStartPos);
    int lastChunkSize = (int)Math.min(
        bytesPerChecksum, blockFileLen - lastChunkStartPos);
    byte[] buf = new byte[lastChunkSize + checksumSize];
    checksumIn.readFully(buf, lastChunkSize, checksumSize);
    IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
    checksum.update(buf, 0, lastChunkSize);
    if (checksum.compare(buf, lastChunkSize)) {
      // the last chunk matches its crc
      return lastChunkStartPos + lastChunkSize;
    } else {
      // the last chunk is corrupt
      return lastChunkStartPos;
    }
  } catch (IOException e) {
    FsDatasetImpl.LOG.warn(e);
    return 0;
  } finally {
    IOUtils.closeStream(checksumIn);
    IOUtils.closeStream(blockIn);
  }
}
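// Hedged sketch: the chunk-count clamp shared by the validateIntegrity
// variants above, worked through on concrete numbers (illustrative only).
// With bytesPerChecksum = 512, checksumSize = 4, blockFileLen = 1500, and
// metaFileLen = crcHeaderLen + 8:
//   ceil(1500 / 512)                 = 3 chunks of data
//   (metaFileLen - crcHeaderLen) / 4 = 2 checksums on disk
//   numChunks = min(3, 2)            = 2
// Only chunk 1 (bytes 512..1023) can be verified; if its checksum matches,
// the valid length is 1024 and anything past it is treated as unverifiable.
static long numChunks(long blockFileLen, long metaFileLen, int crcHeaderLen,
    int bytesPerChecksum, int checksumSize) {
  return Math.min((blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum,
      (metaFileLen - crcHeaderLen) / checksumSize);
}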