/**
 * Helper method to combine a list of {@link LocatedBlock} with associated
 * {@link VolumeId} information to form a list of
 * {@link BlockStorageLocation}.
 */
static BlockStorageLocation[] convertToVolumeBlockLocations(
    List<LocatedBlock> blocks,
    Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
  // Construct the final return value of VolumeBlockLocation[]
  BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
  List<BlockStorageLocation> volumeBlockLocs =
      new ArrayList<BlockStorageLocation>(locations.length);
  for (int i = 0; i < locations.length; i++) {
    LocatedBlock locBlock = blocks.get(i);
    List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
    BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i],
        volumeIds.toArray(new VolumeId[0]));
    volumeBlockLocs.add(bsLoc);
  }
  return volumeBlockLocs.toArray(new BlockStorageLocation[] {});
}
/**
 * Returns a disk id (0-based) index from the Hdfs VolumeId object. There is
 * currently no public API to get at the volume id. We'll have to get it by
 * accessing the internals.
 */
public static int getDiskId(VolumeId hdfsVolumeId) {
  // Initialize the diskId as -1 to indicate it is unknown
  int diskId = -1;
  if (hdfsVolumeId != null) {
    String volumeIdString = hdfsVolumeId.toString();
    byte[] volumeIdBytes = StringUtils.hexStringToByte(volumeIdString);
    if (volumeIdBytes != null && volumeIdBytes.length == 4) {
      diskId = Utils.toInt(volumeIdBytes);
    } else if (volumeIdBytes != null && volumeIdBytes.length == 1) {
      // Null check added: the original dereferenced volumeIdBytes here and
      // would throw a NullPointerException when the hex parse failed.
      diskId = (int) volumeIdBytes[0]; // support hadoop-2.0.2
    }
  }
  return diskId;
}
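A rough usage sketch of the method above; printDiskIds is a hypothetical helper, not part of the original code, and assumes the VolumeIds came from a BlockStorageLocation as in the snippets below:

// Hypothetical helper: map each replica's VolumeId to a disk index,
// printing "unknown" where the id cannot be decoded (getDiskId returns -1).
static void printDiskIds(VolumeId[] volumeIds) {
  for (VolumeId id : volumeIds) {
    int diskId = DistributedFileSystemMetadata.getDiskId(id);
    System.out.println(id + " -> "
        + (diskId == -1 ? "unknown" : String.valueOf(diskId)));
  }
}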
/**
 * Helper method to combine a list of {@link LocatedBlock} with associated
 * {@link VolumeId} information to form a list of
 * {@link BlockStorageLocation}.
 */
static BlockStorageLocation[] convertToVolumeBlockLocations(
    List<LocatedBlock> blocks,
    Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
  // Construct the final return value of VolumeBlockLocation[]
  BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
  List<BlockStorageLocation> volumeBlockLocs =
      new ArrayList<>(locations.length);
  for (int i = 0; i < locations.length; i++) {
    LocatedBlock locBlock = blocks.get(i);
    List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
    BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i],
        volumeIds.toArray(new VolumeId[0]));
    volumeBlockLocs.add(bsLoc);
  }
  return volumeBlockLocs.toArray(new BlockStorageLocation[]{});
}
private void printBlockMetadata(BlockLocation blockLocation, String[] dataDirs)
    throws IOException {

  System.out.println(" Offset: " + blockLocation.getOffset());
  System.out.println(" Length: " + blockLocation.getLength());

  String[] cachedHosts = blockLocation.getCachedHosts();
  if (cachedHosts.length == 0) {
    System.out.println(" No cached hosts");
  }

  System.out.println(" Replicas:");
  VolumeId[] volumeIds = blockLocation instanceof BlockStorageLocation ?
      ((BlockStorageLocation) blockLocation).getVolumeIds() : null;
  String[] hosts = blockLocation.getHosts();
  String[] names = blockLocation.getNames();
  String[] topologyPaths = blockLocation.getTopologyPaths();
  for (int i = 0; i < topologyPaths.length; i++) {
    int diskId = volumeIds != null ?
        DistributedFileSystemMetadata.getDiskId(volumeIds[i]) : -1;

    System.out.println(" Replica (" + i + "):");
    System.out.println(" Host: " + hosts[i]);

    if (diskId == -1) {
      System.out.println(" DiskId: unknown");
    } else if (dataDirs != null && diskId < dataDirs.length) {
      System.out.println(" Location: " + dataDirs[diskId]
          + " (DiskId: " + diskId + ")");
    } else {
      System.out.println(" DiskId: " + diskId);
    }

    System.out.println(" Name: " + names[i]);
    System.out.println(" TopologyPaths: " + topologyPaths[i]);
  }

  if (cachedHosts.length > 0) {
    System.out.println(" Cached hosts:");
    for (String cachedHost : cachedHosts) {
      System.out.println(" Host: " + cachedHost);
    }
  }
}
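For context, a rough sketch of how printBlockMetadata might be driven; the file path and dataDirs values are made up, and FileSystem#getFileBlockLocations is the standard Hadoop API for obtaining the locations:

// Sketch: print metadata for every block of a (made-up) file. When the
// FileSystem is a DistributedFileSystem with volume metadata enabled, the
// locations can first be upgraded to BlockStorageLocations (see the
// getBlockStorageLocations variants below).
private void printAllBlockMetadata(FileSystem fs, String[] dataDirs)
    throws IOException {
  FileStatus status = fs.getFileStatus(new Path("/user/demo/file"));
  BlockLocation[] locations =
      fs.getFileBlockLocations(status, 0, status.getLen());
  for (BlockLocation location : locations) {
    printBlockMetadata(location, dataDirs);
  }
}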
@Test
public void computeHostsDiskIdsCount() throws IOException {
  List<BlockLocation> blockStorageLocations = new LinkedList<>();
  blockStorageLocations.add(new BlockStorageLocation(
      new BlockLocation(null, new String[]{"host1", "host2"}, 0, 0),
      new VolumeId[]{new TVolumeId("3"), new TVolumeId("4")}));
  blockStorageLocations.add(new BlockStorageLocation(
      new BlockLocation(null, new String[]{"host2", "host3"}, 0, 0),
      new VolumeId[]{new TVolumeId("4"), new TVolumeId("5")}));
  blockStorageLocations.add(new BlockStorageLocation(
      new BlockLocation(null, new String[]{"host10", "host2"}, 0, 0),
      new VolumeId[]{new TVolumeId("3"), new TVolumeId("4")}));
  blockStorageLocations.add(new BlockStorageLocation(
      new BlockLocation(null, new String[]{"host10", "host3"}, 0, 0),
      new VolumeId[]{new TVolumeId("8"), new TVolumeId("5")}));
  blockStorageLocations.add(new BlockLocation(null,
      new String[]{"host10", "host3", "host3"}, 0, 0));

  HashMap<String, HashMap<Integer, Integer>> hosts_diskids =
      DistributedFileSystemMetadata
          .computeHostsDiskIdsCount(blockStorageLocations);

  Assert.assertEquals(1, hosts_diskids.get("host1").get(3).intValue());
  Assert.assertEquals(3, hosts_diskids.get("host2").get(4).intValue());
  Assert.assertEquals(2, hosts_diskids.get("host3").get(5).intValue());
  Assert.assertEquals(2, hosts_diskids.get("host3").get(-1).intValue());
  Assert.assertEquals(1, hosts_diskids.get("host10").get(3).intValue());
  Assert.assertEquals(1, hosts_diskids.get("host10").get(8).intValue());
  Assert.assertEquals(1, hosts_diskids.get("host10").get(-1).intValue());
}
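The implementation under test is not shown here. A minimal sketch of what the assertions imply it does (a host -> diskId -> replica-count map, with -1 for replicas lacking volume information); treat this as an assumption, not the project's actual code, and note that how each TVolumeId string decodes to an int is delegated to getDiskId above:

// Assumed shape: count, per host, how many replicas live on each disk id.
// Plain BlockLocations (no VolumeIds) are tallied under disk id -1.
static HashMap<String, HashMap<Integer, Integer>> computeHostsDiskIdsCount(
    List<BlockLocation> blockLocations) throws IOException {
  HashMap<String, HashMap<Integer, Integer>> counts = new HashMap<>();
  for (BlockLocation location : blockLocations) {
    VolumeId[] volumeIds = location instanceof BlockStorageLocation ?
        ((BlockStorageLocation) location).getVolumeIds() : null;
    String[] hosts = location.getHosts();
    for (int i = 0; i < hosts.length; i++) {
      int diskId = (volumeIds != null && i < volumeIds.length) ?
          getDiskId(volumeIds[i]) : -1;
      counts.computeIfAbsent(hosts[i], h -> new HashMap<>())
          .merge(diskId, 1, Integer::sum);
    }
  }
  return counts;
}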
/**
 * Get block location information about a list of {@link HdfsBlockLocation}.
 * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
 * get {@link BlockStorageLocation}s for blocks returned by
 * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}.
 *
 * This is done by making a round of RPCs to the associated datanodes, asking
 * for the volume of each block replica. The returned array of
 * {@link BlockStorageLocation} exposes this information as a
 * {@link VolumeId}.
 *
 * @param blockLocations
 *          target blocks on which to query volume location information
 * @return volumeBlockLocations original block array augmented with additional
 *         volume location information for each replica
 */
public BlockStorageLocation[] getBlockStorageLocations(
    List<BlockLocation> blockLocations) throws IOException,
    UnsupportedOperationException, InvalidBlockTokenException {
  if (!getConf().getHdfsBlocksMetadataEnabled) {
    throw new UnsupportedOperationException("Datanode-side support for " +
        "getVolumeBlockLocations() must also be enabled in the client " +
        "configuration.");
  }
  // Downcast blockLocations and fetch out required LocatedBlock(s)
  List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
  for (BlockLocation loc : blockLocations) {
    if (!(loc instanceof HdfsBlockLocation)) {
      throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
          "expected to be passed HdfsBlockLocations");
    }
    HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
    blocks.add(hdfsLoc.getLocatedBlock());
  }

  // Group the LocatedBlocks by datanode, mapping each datanode to the list
  // of LocatedBlocks it holds.
  Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks =
      new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
  for (LocatedBlock b : blocks) {
    for (DatanodeInfo info : b.getLocations()) {
      if (!datanodeBlocks.containsKey(info)) {
        datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
      }
      List<LocatedBlock> l = datanodeBlocks.get(info);
      l.add(b);
    }
  }

  // Make RPCs to the datanodes to get volume locations for their replicas
  TraceScope scope =
      Trace.startSpan("getBlockStorageLocations", traceSampler);
  Map<DatanodeInfo, HdfsBlocksMetadata> metadatas;
  try {
    metadatas = BlockStorageLocationUtil.queryDatanodesForHdfsBlocksMetadata(
        conf, datanodeBlocks,
        getConf().getFileBlockStorageLocationsNumThreads,
        getConf().getFileBlockStorageLocationsTimeoutMs,
        getConf().connectToDnViaHostname);
    if (LOG.isTraceEnabled()) {
      LOG.trace("metadata returned: " +
          Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
    }
  } finally {
    scope.close();
  }

  // Regroup the returned VolumeId metadata to again be grouped by
  // LocatedBlock rather than by datanode
  Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
      .associateVolumeIdsWithBlocks(blocks, metadatas);

  // Combine original BlockLocations with new VolumeId information
  BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
      .convertToVolumeBlockLocations(blocks, blockVolumeIds);

  return volumeBlockLocations;
}
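The UnsupportedOperationException above points at the feature toggle for per-volume block metadata (HDFS-3672). A hedged configuration sketch; these key names are the standard Hadoop 2.x ones, but verify them against the release in use:

// Both the datanodes and the client must have the feature enabled.
Configuration conf = new Configuration();
conf.setBoolean("dfs.datanode.hdfs-blocks-metadata.enabled", true);
// Optional client-side tuning; key names assumed from Hadoop 2.x defaults.
conf.setInt("dfs.client.file-block-storage-locations.num-threads", 10);
conf.setInt("dfs.client.file-block-storage-locations.timeout.millis", 1000);
FileSystem fs = FileSystem.get(conf);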
@Override
public int compareTo(VolumeId arg0) {
  return 0;
}
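This compareTo stub presumably belongs to the TVolumeId test helper used in the test above. A minimal sketch of such a stub; the field, constructor, and toString are assumptions, and depending on the Hadoop version VolumeId may require further overrides (e.g. isValid()):

// Assumed shape of the test stub: wraps a fixed string and exposes it via
// toString(), which is what getDiskId() ultimately parses.
private static class TVolumeId implements VolumeId {
  private final String id;

  TVolumeId(String id) {
    this.id = id;
  }

  @Override
  public String toString() {
    return id;
  }

  @Override
  public int compareTo(VolumeId arg0) {
    return 0;
  }
}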
/**
 * Get block location information about a list of {@link HdfsBlockLocation}.
 * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
 * get {@link BlockStorageLocation}s for blocks returned by
 * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}.
 *
 * This is done by making a round of RPCs to the associated datanodes, asking
 * for the volume of each block replica. The returned array of
 * {@link BlockStorageLocation} exposes this information as a
 * {@link VolumeId}.
 *
 * @param blockLocations
 *          target blocks on which to query volume location information
 * @return volumeBlockLocations original block array augmented with additional
 *         volume location information for each replica
 */
public BlockStorageLocation[] getBlockStorageLocations(
    List<BlockLocation> blockLocations) throws IOException,
    UnsupportedOperationException, InvalidBlockTokenException {
  if (!getConf().getHdfsBlocksMetadataEnabled) {
    throw new UnsupportedOperationException("Datanode-side support for " +
        "getVolumeBlockLocations() must also be enabled in the client " +
        "configuration.");
  }
  // Downcast blockLocations and fetch out required LocatedBlock(s)
  List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
  for (BlockLocation loc : blockLocations) {
    if (!(loc instanceof HdfsBlockLocation)) {
      throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
          "expected to be passed HdfsBlockLocations");
    }
    HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
    blocks.add(hdfsLoc.getLocatedBlock());
  }

  // Group the LocatedBlocks by datanode, mapping each datanode to the list
  // of LocatedBlocks it holds.
  Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks =
      new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
  for (LocatedBlock b : blocks) {
    for (DatanodeInfo info : b.getLocations()) {
      if (!datanodeBlocks.containsKey(info)) {
        datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
      }
      List<LocatedBlock> l = datanodeBlocks.get(info);
      l.add(b);
    }
  }

  // Make RPCs to the datanodes to get volume locations for their replicas
  Map<DatanodeInfo, HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil
      .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
          getConf().getFileBlockStorageLocationsNumThreads,
          getConf().getFileBlockStorageLocationsTimeoutMs,
          getConf().connectToDnViaHostname);
  if (LOG.isTraceEnabled()) {
    LOG.trace("metadata returned: " +
        Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
  }

  // Regroup the returned VolumeId metadata to again be grouped by
  // LocatedBlock rather than by datanode
  Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
      .associateVolumeIdsWithBlocks(blocks, metadatas);

  // Combine original BlockLocations with new VolumeId information
  BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
      .convertToVolumeBlockLocations(blocks, blockVolumeIds);

  return volumeBlockLocations;
}
/**
 * Get block location information about a list of {@link HdfsBlockLocation}.
 * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
 * get {@link BlockStorageLocation}s for blocks returned by
 * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}.
 *
 * This is done by making a round of RPCs to the associated datanodes, asking
 * for the volume of each block replica. The returned array of
 * {@link BlockStorageLocation} exposes this information as a
 * {@link VolumeId}.
 *
 * @param blockLocations
 *          target blocks on which to query volume location information
 * @return volumeBlockLocations original block array augmented with additional
 *         volume location information for each replica
 */
public BlockStorageLocation[] getBlockStorageLocations(
    List<BlockLocation> blockLocations) throws IOException,
    UnsupportedOperationException, InvalidBlockTokenException {
  if (!getConf().getHdfsBlocksMetadataEnabled) {
    throw new UnsupportedOperationException("Datanode-side support for " +
        "getVolumeBlockLocations() must also be enabled in the client " +
        "configuration.");
  }
  // Downcast blockLocations and fetch out required LocatedBlock(s)
  List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
  for (BlockLocation loc : blockLocations) {
    if (!(loc instanceof HdfsBlockLocation)) {
      throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
          "expected to be passed HdfsBlockLocations");
    }
    HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
    blocks.add(hdfsLoc.getLocatedBlock());
  }

  // Group the LocatedBlocks by datanode, mapping each datanode to the list
  // of LocatedBlocks it holds.
  Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks =
      new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
  for (LocatedBlock b : blocks) {
    for (DatanodeInfo info : b.getLocations()) {
      if (!datanodeBlocks.containsKey(info)) {
        datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
      }
      List<LocatedBlock> l = datanodeBlocks.get(info);
      l.add(b);
    }
  }

  // Make RPCs to the datanodes to get volume locations for their replicas
  List<HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil
      .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
          getConf().getFileBlockStorageLocationsNumThreads,
          getConf().getFileBlockStorageLocationsTimeout,
          getConf().connectToDnViaHostname);

  // Regroup the returned VolumeId metadata to again be grouped by
  // LocatedBlock rather than by datanode
  Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
      .associateVolumeIdsWithBlocks(blocks, datanodeBlocks, metadatas);

  // Combine original BlockLocations with new VolumeId information
  BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
      .convertToVolumeBlockLocations(blocks, blockVolumeIds);

  return volumeBlockLocations;
}
/**
 * Get block location information about a list of {@link HdfsBlockLocation}.
 * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
 * get {@link BlockStorageLocation}s for blocks returned by
 * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}.
 * <p/>
 * This is done by making a round of RPCs to the associated datanodes, asking
 * for the volume of each block replica. The returned array of
 * {@link BlockStorageLocation} exposes this information as a
 * {@link VolumeId}.
 *
 * @param blockLocations
 *          target blocks on which to query volume location information
 * @return volumeBlockLocations original block array augmented with additional
 *         volume location information for each replica
 */
public BlockStorageLocation[] getBlockStorageLocations(
    List<BlockLocation> blockLocations) throws IOException,
    UnsupportedOperationException, InvalidBlockTokenException {
  if (!getConf().getHdfsBlocksMetadataEnabled) {
    throw new UnsupportedOperationException("Datanode-side support for " +
        "getVolumeBlockLocations() must also be enabled in the client " +
        "configuration.");
  }
  // Downcast blockLocations and fetch out required LocatedBlock(s)
  List<LocatedBlock> blocks = new ArrayList<>();
  for (BlockLocation loc : blockLocations) {
    if (!(loc instanceof HdfsBlockLocation)) {
      throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
          "expected to be passed HdfsBlockLocations");
    }
    HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
    blocks.add(hdfsLoc.getLocatedBlock());
  }

  // Group the LocatedBlocks by datanode, mapping each datanode to the list
  // of LocatedBlocks it holds.
  Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = new LinkedHashMap<>();
  for (LocatedBlock b : blocks) {
    for (DatanodeInfo info : b.getLocations()) {
      if (!datanodeBlocks.containsKey(info)) {
        datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
      }
      List<LocatedBlock> l = datanodeBlocks.get(info);
      l.add(b);
    }
  }

  // Make RPCs to the datanodes to get volume locations for their replicas
  List<HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil
      .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
          getConf().getFileBlockStorageLocationsNumThreads,
          getConf().getFileBlockStorageLocationsTimeout,
          getConf().connectToDnViaHostname);

  // Regroup the returned VolumeId metadata to again be grouped by
  // LocatedBlock rather than by datanode
  Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
      .associateVolumeIdsWithBlocks(blocks, datanodeBlocks, metadatas);

  // Combine original BlockLocations with new VolumeId information
  BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
      .convertToVolumeBlockLocations(blocks, blockVolumeIds);

  return volumeBlockLocations;
}
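Putting the variants above to use: a hedged end-to-end sketch against the Hadoop 2.x client API. The path is illustrative, the cluster is assumed to have dfs.datanode.hdfs-blocks-metadata.enabled set, and getFileBlockStorageLocations was removed in later Hadoop releases:

// Sketch: resolve per-replica volume ids for a file's blocks by first
// fetching plain BlockLocations, then asking the datanodes for volume info.
void printVolumeIds(DistributedFileSystem dfs, Path path) throws IOException {
  FileStatus status = dfs.getFileStatus(path);
  BlockLocation[] locations =
      dfs.getFileBlockLocations(status, 0, status.getLen());
  BlockStorageLocation[] storageLocations =
      dfs.getFileBlockStorageLocations(Arrays.asList(locations));
  for (BlockStorageLocation loc : storageLocations) {
    for (VolumeId id : loc.getVolumeIds()) {
      System.out.println(loc.getOffset() + " -> " + id);
    }
  }
}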