@Override
public BlockLocation[] getFileBlockLocations(FileStatus stat,
    long start, long len) throws IOException {
  if (stat.isDir()) {
    return null;
  }
  System.out.println("File " + stat.getPath());
  String name = stat.getPath().toUri().getPath();
  BlockLocation[] locs =
      super.getFileBlockLocations(stat, start, len);
  if (name.equals(fileWithMissingBlocks)) {
    System.out.println("Returning missing blocks for " + fileWithMissingBlocks);
    locs[0] = new HdfsBlockLocation(new BlockLocation(new String[0],
        new String[0], locs[0].getOffset(), locs[0].getLength()),
        null);
  }
  return locs;
}
/**
 * Get block location info about a file
 *
 * getBlockLocations() returns a list of hostnames that store
 * data for a specific file region. It returns a set of hostnames
 * for every block within the indicated region.
 *
 * This function is very useful when writing code that considers
 * data-placement when performing operations. For example, the
 * MapReduce system tries to schedule tasks on the same machines
 * as the data-block the task processes.
 */
public BlockLocation[] getBlockLocations(String src, long start,
    long length) throws IOException, UnresolvedLinkException {
  TraceScope scope = getPathTraceScope("getBlockLocations", src);
  try {
    LocatedBlocks blocks = getLocatedBlocks(src, start, length);
    BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
    HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length];
    for (int i = 0; i < locations.length; i++) {
      hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));
    }
    return hdfsLocations;
  } finally {
    scope.close();
  }
}
@Test(timeout=60000)
public void testUncacheUnknownBlock() throws Exception {
  // Create a file
  Path fileName = new Path("/testUncacheUnknownBlock");
  int fileLen = 4096;
  DFSTestUtil.createFile(fs, fileName, fileLen, (short)1, 0xFDFD);
  HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(
      fileName, 0, fileLen);

  // Try to uncache it without caching it first
  setHeartbeatResponse(uncacheBlocks(locs));

  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      return fsd.getNumBlocksFailedToUncache() > 0;
    }
  }, 100, 10000);
}
@Test(timeout=60000)
public void testPageRounder() throws Exception {
  // Write a small file
  Path fileName = new Path("/testPageRounder");
  final int smallBlocks = 512; // This should be smaller than the page size
  assertTrue("Page size should be greater than smallBlocks!",
      PAGE_SIZE > smallBlocks);
  final int numBlocks = 5;
  final int fileLen = smallBlocks * numBlocks;
  FSDataOutputStream out =
      fs.create(fileName, false, 4096, (short)1, smallBlocks);
  out.write(new byte[fileLen]);
  out.close();
  HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(
      fileName, 0, fileLen);
  // Cache the file and check the sizes match the page size
  setHeartbeatResponse(cacheBlocks(locs));
  DFSTestUtil.verifyExpectedCacheUsage(PAGE_SIZE * numBlocks, numBlocks, fsd);
  // Uncache and check that it decrements by the page size too
  setHeartbeatResponse(uncacheBlocks(locs));
  DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
}
private static long[] getBlockSizes(HdfsBlockLocation[] locs) throws Exception {
  long[] sizes = new long[locs.length];
  for (int i = 0; i < locs.length; i++) {
    HdfsBlockLocation loc = locs[i];
    String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId();
    Block block = loc.getLocatedBlock().getBlock().getLocalBlock();
    ExtendedBlock extBlock = new ExtendedBlock(bpid, block);
    FileInputStream blockInputStream = null;
    FileChannel blockChannel = null;
    try {
      blockInputStream =
          (FileInputStream)fsd.getBlockInputStream(extBlock, 0);
      blockChannel = blockInputStream.getChannel();
      sizes[i] = blockChannel.size();
    } finally {
      IOUtils.cleanup(LOG, blockChannel, blockInputStream);
    }
  }
  return sizes;
}
/**
 * Creates a cache or uncache DatanodeCommand from an array of locations
 */
private static DatanodeCommand getResponse(HdfsBlockLocation[] locs,
    int action) {
  String bpid = locs[0].getLocatedBlock().getBlock().getBlockPoolId();
  long[] blocks = new long[locs.length];
  for (int i = 0; i < locs.length; i++) {
    blocks[i] = locs[i].getLocatedBlock().getBlock().getBlockId();
  }
  return new BlockIdCommand(action, bpid, blocks);
}
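The cacheBlocks() and uncacheBlocks() helpers called by the tests above are not included in this excerpt. A plausible sketch, assuming they simply wrap getResponse() with the DNA_CACHE and DNA_UNCACHE action codes from DatanodeProtocol and live in the same test class, could look like this:

// Sketch only: the real helpers may differ. Assumes getResponse() from the
// snippet above and the DatanodeProtocol.DNA_CACHE / DNA_UNCACHE action codes.
private static DatanodeCommand[] cacheBlocks(HdfsBlockLocation[] locs) {
  // Ask the datanode to cache the blocks described by locs
  return new DatanodeCommand[] {
      getResponse(locs, DatanodeProtocol.DNA_CACHE)
  };
}

private static DatanodeCommand[] uncacheBlocks(HdfsBlockLocation[] locs) {
  // Ask the datanode to drop the blocks described by locs from its cache
  return new DatanodeCommand[] {
      getResponse(locs, DatanodeProtocol.DNA_UNCACHE)
  };
}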
/**
 * Get block location info about a file
 *
 * getBlockLocations() returns a list of hostnames that store
 * data for a specific file region. It returns a set of hostnames
 * for every block within the indicated region.
 *
 * This function is very useful when writing code that considers
 * data-placement when performing operations. For example, the
 * MapReduce system tries to schedule tasks on the same machines
 * as the data-block the task processes.
 */
public BlockLocation[] getBlockLocations(String src, long start,
    long length) throws IOException {
  checkOpen();
  try (TraceScope ignored = newPathTraceScope("getBlockLocations", src)) {
    LocatedBlocks blocks = getLocatedBlocks(src, start, length);
    BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks);
    HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length];
    for (int i = 0; i < locations.length; i++) {
      hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));
    }
    return hdfsLocations;
  }
}
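To show how the HdfsBlockLocations returned above are consumed by applications, here is a minimal, hypothetical client-side sketch that lists the hosts storing each block of a file through the public FileSystem API. The path and configuration are placeholders, not taken from the source.

// Illustrative sketch: prints the hosts storing each block of a file.
// The path "/user/example/data.txt" is a placeholder.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListBlockHosts {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/user/example/data.txt");
    FileStatus stat = fs.getFileStatus(file);
    // Each BlockLocation covers one block within the requested region.
    BlockLocation[] locs = fs.getFileBlockLocations(stat, 0, stat.getLen());
    for (BlockLocation loc : locs) {
      System.out.println("offset=" + loc.getOffset()
          + " length=" + loc.getLength()
          + " hosts=" + java.util.Arrays.toString(loc.getHosts()));
    }
    fs.close();
  }
}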
private static long[] getBlockSizes(HdfsBlockLocation[] locs) throws Exception {
  long[] sizes = new long[locs.length];
  for (int i = 0; i < locs.length; i++) {
    HdfsBlockLocation loc = locs[i];
    String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId();
    Block block = loc.getLocatedBlock().getBlock().getLocalBlock();
    ExtendedBlock extBlock = new ExtendedBlock(bpid, block);
    FileChannel blockChannel =
        ((FileInputStream)fsd.getBlockInputStream(extBlock, 0)).getChannel();
    sizes[i] = blockChannel.size();
  }
  return sizes;
}
private static String getFirstBlockId(FileSystem fileSystem, Path realFile)
    throws IOException {
  FileStatus fileStatus = fileSystem.getFileStatus(realFile);
  BlockLocation[] locations =
      fileSystem.getFileBlockLocations(fileStatus, 0, 1);
  HdfsBlockLocation location = (HdfsBlockLocation) locations[0];
  LocatedBlock locatedBlock = location.getLocatedBlock();
  ExtendedBlock block = locatedBlock.getBlock();
  return toNiceString(block.getBlockId());
}
/**
 * Get block location information about a list of {@link HdfsBlockLocation}.
 * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
 * get {@link BlockStorageLocation}s for blocks returned by
 * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}.
 *
 * This is done by making a round of RPCs to the associated datanodes, asking
 * for the volume of each block replica. The returned array of
 * {@link BlockStorageLocation} exposes this information as a
 * {@link VolumeId}.
 *
 * @param blockLocations
 *          target blocks on which to query volume location information
 * @return volumeBlockLocations original block array augmented with additional
 *         volume location information for each replica.
 */
public BlockStorageLocation[] getBlockStorageLocations(
    List<BlockLocation> blockLocations) throws IOException,
    UnsupportedOperationException, InvalidBlockTokenException {
  if (!getConf().getHdfsBlocksMetadataEnabled) {
    throw new UnsupportedOperationException("Datanode-side support for " +
        "getVolumeBlockLocations() must also be enabled in the client " +
        "configuration.");
  }
  // Downcast blockLocations and fetch out required LocatedBlock(s)
  List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
  for (BlockLocation loc : blockLocations) {
    if (!(loc instanceof HdfsBlockLocation)) {
      throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
          "expected to be passed HdfsBlockLocations");
    }
    HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
    blocks.add(hdfsLoc.getLocatedBlock());
  }

  // Re-group the LocatedBlocks to be grouped by datanodes, with the values
  // a list of the LocatedBlocks on the datanode.
  Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks =
      new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
  for (LocatedBlock b : blocks) {
    for (DatanodeInfo info : b.getLocations()) {
      if (!datanodeBlocks.containsKey(info)) {
        datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
      }
      List<LocatedBlock> l = datanodeBlocks.get(info);
      l.add(b);
    }
  }

  // Make RPCs to the datanodes to get volume locations for its replicas
  TraceScope scope =
      Trace.startSpan("getBlockStorageLocations", traceSampler);
  Map<DatanodeInfo, HdfsBlocksMetadata> metadatas;
  try {
    metadatas = BlockStorageLocationUtil.
        queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
            getConf().getFileBlockStorageLocationsNumThreads,
            getConf().getFileBlockStorageLocationsTimeoutMs,
            getConf().connectToDnViaHostname);
    if (LOG.isTraceEnabled()) {
      LOG.trace("metadata returned: "
          + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
    }
  } finally {
    scope.close();
  }

  // Regroup the returned VolumeId metadata to again be grouped by
  // LocatedBlock rather than by datanode
  Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
      .associateVolumeIdsWithBlocks(blocks, metadatas);

  // Combine original BlockLocations with new VolumeId information
  BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
      .convertToVolumeBlockLocations(blocks, blockVolumeIds);

  return volumeBlockLocations;
}
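For context, a minimal hypothetical caller of this API, going through DistributedFileSystem#getFileBlockStorageLocations as the javadoc above mentions, might look like the sketch below. The path and cluster configuration are assumptions, the datanode-side metadata feature must be enabled as the exception message above indicates, and this API was deprecated and removed in newer Hadoop releases.

// Illustrative sketch: map each block of a file to the VolumeIds of its
// replicas. Assumes fs.defaultFS points at an HDFS cluster and that the
// hdfs-blocks-metadata feature is enabled on the client and datanodes.
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class ListBlockVolumes {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    Path file = new Path("/user/example/data.txt"); // placeholder path
    FileStatus stat = dfs.getFileStatus(file);
    BlockLocation[] locs = dfs.getFileBlockLocations(stat, 0, stat.getLen());
    // Augment the ordinary BlockLocations with per-replica volume information.
    BlockStorageLocation[] storageLocs =
        dfs.getFileBlockStorageLocations(Arrays.asList(locs));
    for (BlockStorageLocation loc : storageLocs) {
      System.out.println("offset=" + loc.getOffset()
          + " volumes=" + Arrays.toString(loc.getVolumeIds()));
    }
    dfs.close();
  }
}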