@Test public void testGetBlockLocations() throws IOException { Path targetFilePath = new Path(targetTestRoot,"data/largeFile"); FileContextTestHelper.createFile(fcTarget, targetFilePath, 10, 1024); Path viewFilePath = new Path("/data/largeFile"); checkFileStatus(fcView, viewFilePath.toString(), fileType.isFile); BlockLocation[] viewBL = fcView.getFileBlockLocations(viewFilePath, 0, 10240+100); Assert.assertEquals(SupportsBlocks ? 10 : 1, viewBL.length); BlockLocation[] targetBL = fcTarget.getFileBlockLocations(targetFilePath, 0, 10240+100); compareBLs(viewBL, targetBL); // Same test but now get it via the FileStatus Parameter fcView.getFileBlockLocations(viewFilePath, 0, 10240+100); targetBL = fcTarget.getFileBlockLocations(targetFilePath, 0, 10240+100); compareBLs(viewBL, targetBL); }
/**
 * Builds a mapping from file byte ranges to the block locations holding them. The map is
 * cached in {@code blockMapMap} under the file's path and then returned.
 *
 * @param status status of the file whose blocks are mapped
 * @return an immutable map from each block's {@code [offset, offset + length)} range to its location
 * @throws IOException if the block locations cannot be fetched
 */
private ImmutableRangeMap<Long,BlockLocation> buildBlockMap(FileStatus status) throws IOException {
  final Timer.Context context = metrics.timer(BLOCK_MAP_BUILDER_TIMER).time();
  try {
    BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
    ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder =
        new ImmutableRangeMap.Builder<Long, BlockLocation>();
    for (BlockLocation block : blocks) {
      long start = block.getOffset();
      long end = start + block.getLength();
      blockMapBuilder.put(Range.closedOpen(start, end), block);
    }
    ImmutableRangeMap<Long, BlockLocation> blockMap = blockMapBuilder.build();
    blockMapMap.put(status.getPath(), blockMap);
    return blockMap;
  } finally {
    // Stop the timer even if fetching the locations fails, so the timing context is not left open.
    context.stop();
  }
}
/**
 * Get the host affinity for a row group: for each host storing part of the row group, the
 * fraction of the row group's bytes found on that host (summed over the blocks it holds).
 *
 * @param fileStatus the parquet file
 * @param start the start of the row group
 * @param length the length of the row group
 * @return map from host name to its affinity; empty when {@code length <= 0}
 * @throws IOException if the block locations cannot be fetched
 */
private Map<String,Float> getHostAffinity(FileStatus fileStatus, long start, long length) throws IOException {
  Map<String,Float> hostAffinityMap = Maps.newHashMap();
  if (length <= 0) {
    // An empty row group has no bytes to attribute; this also avoids dividing by zero below.
    return hostAffinityMap;
  }
  BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, start, length);
  long rowGroupEnd = start + length;
  for (BlockLocation blockLocation : blockLocations) {
    long blockStart = blockLocation.getOffset();
    long blockEnd = blockStart + blockLocation.getLength();
    // Compute the overlap in long arithmetic. A float cannot represent byte offsets beyond
    // 2^24 exactly, which skewed affinities for row groups deep inside large files.
    long overlap = Math.max(0L, Math.min(blockEnd, rowGroupEnd) - Math.max(blockStart, start));
    float newAffinity = (float) overlap / length;
    for (String host : blockLocation.getHosts()) {
      Float currentAffinity = hostAffinityMap.get(host);
      hostAffinityMap.put(host, currentAffinity == null ? newAffinity : currentAffinity + newAffinity);
    }
  }
  return hostAffinityMap;
}
/**
 * Builds four single-replica test block locations of {@code blockSize} bytes each.
 *
 * <p>Side effect: overwrites {@code hosts[i]} with {@code "host" + i}. Every replica name is
 * {@code "host:" + port}. Slot {@code i} is served by host {@code i}. Note that slot 2 holds the
 * block at offset {@code 3 * blockSize} and slot 3 the block at {@code 2 * blockSize}, so the
 * returned array is NOT sorted by offset. Requires {@code hosts.length >= 4}.
 */
public BlockLocation[] buildBlockLocations2(String[] hosts, long blockSize) {
  String[] names = new String[hosts.length];
  int idx = 0;
  while (idx < hosts.length) {
    hosts[idx] = "host" + idx;
    names[idx] = "host:" + port;
    idx++;
  }

  // slotOrder: the order in which slots are filled. offsetInBlocks[slot]: the block index of
  // that slot's offset.
  int[] slotOrder = {0, 1, 3, 2};
  long[] offsetInBlocks = {0, 1, 3, 2};
  BlockLocation[] result = new BlockLocation[4];
  for (int slot : slotOrder) {
    result[slot] = new BlockLocation(new String[] {names[slot]}, new String[] {hosts[slot]},
        offsetInBlocks[slot] * blockSize, blockSize);
  }
  return result;
}
@Test public void testGetBlockLocations() throws IOException { Path targetFilePath = new Path(targetTestRoot,"data/largeFile"); FileSystemTestHelper.createFile(fsTarget, targetFilePath, 10, 1024); Path viewFilePath = new Path("/data/largeFile"); Assert.assertTrue("Created File should be type File", fsView.isFile(viewFilePath)); BlockLocation[] viewBL = fsView.getFileBlockLocations(fsView.getFileStatus(viewFilePath), 0, 10240+100); Assert.assertEquals(SupportsBlocks ? 10 : 1, viewBL.length); BlockLocation[] targetBL = fsTarget.getFileBlockLocations(fsTarget.getFileStatus(targetFilePath), 0, 10240+100); compareBLs(viewBL, targetBL); // Same test but now get it via the FileStatus Parameter fsView.getFileBlockLocations( fsView.getFileStatus(viewFilePath), 0, 10240+100); targetBL = fsTarget.getFileBlockLocations( fsTarget.getFileStatus(targetFilePath), 0, 10240+100); compareBLs(viewBL, targetBL); }
/**
 * Returns a single mocked, non-directory file of {@code length} bytes with block size
 * {@code splitSize}. Its path resolves to a mocked FileSystem that reports
 * {@code mockBlockLocations(length, splitSize)} for the range {@code [0, length)}.
 */
@Override
protected FileStatus[] listStatus(JobConf job) throws IOException {
  FileStatus status = mock(FileStatus.class);
  when(status.getBlockSize()).thenReturn(splitSize);
  when(status.isDirectory()).thenReturn(false);

  Path path = mock(Path.class);
  FileSystem fileSystem = mock(FileSystem.class);
  BlockLocation[] locations = mockBlockLocations(length, splitSize);
  when(fileSystem.getFileBlockLocations(status, 0, length)).thenReturn(locations);
  when(path.getFileSystem(any(Configuration.class))).thenReturn(fileSystem);

  when(status.getPath()).thenReturn(path);
  when(status.getLen()).thenReturn(length);
  return new FileStatus[] {status};
}
/**
 * Delegates to the parent implementation. For the file named {@code fileWithMissingBlocks},
 * the first location is replaced with one that has no hosts or names, simulating a missing block.
 *
 * @return {@code null} for directories; otherwise the (possibly modified) parent result
 */
@Override
public BlockLocation[] getFileBlockLocations(
    FileStatus stat, long start, long len) throws IOException {
  if (stat.isDirectory()) {
    return null;
  }
  System.out.println("File " + stat.getPath());
  String name = stat.getPath().toUri().getPath();
  BlockLocation[] locs = super.getFileBlockLocations(stat, start, len);
  // Only rewrite when there is a first location to replace. An empty result (for example an
  // empty file or a range past EOF) would otherwise fail with ArrayIndexOutOfBoundsException.
  if (name.equals(fileWithMissingBlocks) && locs != null && locs.length > 0) {
    System.out.println("Returning missing blocks for " + fileWithMissingBlocks);
    locs[0] = new HdfsBlockLocation(new BlockLocation(new String[0],
        new String[0], locs[0].getOffset(), locs[0].getLength()), null);
  }
  return locs;
}
/** * Helper method to combine a list of {@link LocatedBlock} with associated * {@link VolumeId} information to form a list of {@link BlockStorageLocation} * . */ static BlockStorageLocation[] convertToVolumeBlockLocations( List<LocatedBlock> blocks, Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException { // Construct the final return value of VolumeBlockLocation[] BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks); List<BlockStorageLocation> volumeBlockLocs = new ArrayList<BlockStorageLocation>(locations.length); for (int i = 0; i < locations.length; i++) { LocatedBlock locBlock = blocks.get(i); List<VolumeId> volumeIds = blockVolumeIds.get(locBlock); BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i], volumeIds.toArray(new VolumeId[0])); volumeBlockLocs.add(bsLoc); } return volumeBlockLocs.toArray(new BlockStorageLocation[] {}); }
private void checkFile(FileSystem fileSys, Path name) throws IOException { BlockLocation[] locations = fileSys.getFileBlockLocations( fileSys.getFileStatus(name), 0, fileSize); assertEquals("Number of blocks", fileSize, locations.length); FSDataInputStream stm = fileSys.open(name); byte[] expected = new byte[fileSize]; if (simulatedStorage) { for (int i = 0; i < expected.length; ++i) { expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE; } } else { Random rand = new Random(seed); rand.nextBytes(expected); } // do a sanity check. Read the file byte[] actual = new byte[fileSize]; stm.readFully(0, actual); checkAndEraseData(actual, 0, expected, "Read Sanity Test"); stm.close(); }
public void testGetFileBlockLocations() throws IOException { final String f = "/test/testGetFileBlockLocations"; createFile(path(f)); final BlockLocation[] computed = fs.getFileBlockLocations(new Path(f), 0L, 1L); final BlockLocation[] expected = cluster.getFileSystem().getFileBlockLocations( new Path(f), 0L, 1L); assertEquals(expected.length, computed.length); for (int i = 0; i < computed.length; i++) { assertEquals(expected[i].toString(), computed[i].toString()); // Check names String names1[] = expected[i].getNames(); String names2[] = computed[i].getNames(); Arrays.sort(names1); Arrays.sort(names2); Assert.assertArrayEquals("Names differ", names1, names2); // Check topology String topos1[] = expected[i].getTopologyPaths(); String topos2[] = computed[i].getTopologyPaths(); Arrays.sort(topos1); Arrays.sort(topos2); Assert.assertArrayEquals("Topology differs", topos1, topos2); } }
private void waitForBlocks(FileSystem fileSys, Path name) throws IOException { // wait until we have at least one block in the file to read. boolean done = false; while (!done) { try { Thread.sleep(1000); } catch (InterruptedException e) { } done = true; BlockLocation[] locations = fileSys.getFileBlockLocations( fileSys.getFileStatus(name), 0, blockSize); if (locations.length < 1) { done = false; continue; } } }
@Test(timeout=180000) public void testFavoredNodesEndToEnd() throws Exception { //create 10 files with random preferred nodes for (int i = 0; i < NUM_FILES; i++) { Random rand = new Random(System.currentTimeMillis() + i); //pass a new created rand so as to get a uniform distribution each time //without too much collisions (look at the do-while loop in getDatanodes) InetSocketAddress datanode[] = getDatanodes(rand); Path p = new Path("/filename"+i); FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true, 4096, (short)3, 4096L, null, datanode); out.write(SOME_BYTES); out.close(); BlockLocation[] locations = getBlockLocations(p); //verify the files got created in the right nodes for (BlockLocation loc : locations) { String[] hosts = loc.getNames(); String[] hosts1 = getStringForInetSocketAddrs(datanode); assertTrue(compareNodes(hosts, hosts1)); } } }
@Test(timeout = 180000) public void testFavoredNodesEndToEndForAppend() throws Exception { // create 10 files with random preferred nodes for (int i = 0; i < NUM_FILES; i++) { Random rand = new Random(System.currentTimeMillis() + i); // pass a new created rand so as to get a uniform distribution each time // without too much collisions (look at the do-while loop in getDatanodes) InetSocketAddress datanode[] = getDatanodes(rand); Path p = new Path("/filename" + i); // create and close the file. dfs.create(p, FsPermission.getDefault(), true, 4096, (short) 3, 4096L, null, null).close(); // re-open for append FSDataOutputStream out = dfs.append(p, EnumSet.of(CreateFlag.APPEND), 4096, null, datanode); out.write(SOME_BYTES); out.close(); BlockLocation[] locations = getBlockLocations(p); // verify the files got created in the right nodes for (BlockLocation loc : locations) { String[] hosts = loc.getNames(); String[] hosts1 = getStringForInetSocketAddrs(datanode); assertTrue(compareNodes(hosts, hosts1)); } } }
void writeFile(Path file, FSDataOutputStream stm, int size) throws IOException { long blocksBefore = stm.getPos() / BLOCK_SIZE; TestFileCreation.writeFile(stm, BLOCK_SIZE); // need to make sure the full block is completely flushed to the DataNodes // (see FSOutputSummer#flush) stm.flush(); int blocksAfter = 0; // wait until the block is allocated by DataStreamer BlockLocation[] locatedBlocks; while(blocksAfter <= blocksBefore) { locatedBlocks = DFSClientAdapter.getDFSClient(hdfs).getBlockLocations( file.toString(), 0L, BLOCK_SIZE*NUM_BLOCKS); blocksAfter = locatedBlocks == null ? 0 : locatedBlocks.length; } }
/** * Read and write some JSON * @throws IOException */ @Test(timeout = SWIFT_TEST_TIMEOUT) public void testRWJson() throws IOException { final String message = "{" + " 'json': { 'i':43, 'b':true}," + " 's':'string'" + "}"; final Path filePath = new Path("/test/file.json"); writeTextFile(fs, filePath, message, false); String readJson = readBytesToString(fs, filePath, message.length()); assertEquals(message,readJson); //now find out where it is FileStatus status = fs.getFileStatus(filePath); BlockLocation[] locations = fs.getFileBlockLocations(status, 0, 10); }
/**
 * Get the host affinity for a row group: for each host storing part of the row group, the
 * fraction of the row group's bytes found on that host (summed over the blocks it holds).
 *
 * @param fileStatus the parquet file
 * @param start the start of the row group
 * @param length the length of the row group
 * @return map from host name to its affinity; empty when {@code length <= 0}
 * @throws IOException if the block locations cannot be fetched
 */
private Map<String, Float> getHostAffinity(FileStatus fileStatus, long start, long length)
    throws IOException {
  Map<String, Float> hostAffinityMap = Maps.newHashMap();
  if (length <= 0) {
    // An empty row group has no bytes to attribute; this also avoids dividing by zero below.
    return hostAffinityMap;
  }
  BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, start, length);
  long rowGroupEnd = start + length;
  for (BlockLocation blockLocation : blockLocations) {
    long blockStart = blockLocation.getOffset();
    long blockEnd = blockStart + blockLocation.getLength();
    // Compute the overlap in long arithmetic. A float cannot represent byte offsets beyond
    // 2^24 exactly, which skewed affinities for row groups deep inside large files.
    long overlap = Math.max(0L, Math.min(blockEnd, rowGroupEnd) - Math.max(blockStart, start));
    float newAffinity = (float) overlap / length;
    for (String host : blockLocation.getHosts()) {
      Float currentAffinity = hostAffinityMap.get(host);
      hostAffinityMap.put(host, currentAffinity == null ? newAffinity : currentAffinity + newAffinity);
    }
  }
  return hostAffinityMap;
}
/**
 * Builds a mapping from file byte ranges to the block locations holding them. The map is
 * cached in {@code blockMapMap} under the file's path and then returned.
 *
 * @param status status of the file whose blocks are mapped
 * @return an immutable map from each block's {@code [offset, offset + length)} range to its location
 * @throws IOException if the block locations cannot be fetched
 */
private ImmutableRangeMap<Long,BlockLocation> buildBlockMap(FileStatus status) throws IOException {
  final Timer.Context context = metrics.timer(BLOCK_MAP_BUILDER_TIMER).time();
  try {
    BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
    ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new ImmutableRangeMap.Builder<>();
    for (BlockLocation block : blocks) {
      long start = block.getOffset();
      long end = start + block.getLength();
      blockMapBuilder.put(Range.closedOpen(start, end), block);
    }
    ImmutableRangeMap<Long, BlockLocation> blockMap = blockMapBuilder.build();
    blockMapMap.put(status.getPath(), blockMap);
    return blockMap;
  } finally {
    // Stop the timer even if fetching the locations fails, so the timing context is not left open.
    context.stop();
  }
}
/**
 * Waits until every block of every file in {@code waitList} is reported on exactly
 * {@code newRep} hosts. It polls every 10 seconds and prints progress to {@code out}.
 * A one-time warning is printed per file if a block currently has more replicas than requested.
 *
 * @throws java.io.InterruptedIOException if the thread is interrupted while waiting; the
 *     interrupt flag is restored first
 * @throws IOException if a file's status or block locations cannot be fetched
 */
private void waitForReplication() throws IOException {
  for (PathData item : waitList) {
    out.print("Waiting for " + item + " ...");
    out.flush();

    boolean printedWarning = false;
    while (true) {
      item.refreshStatus();
      BlockLocation[] locations =
          item.fs.getFileBlockLocations(item.stat, 0, item.stat.getLen());
      boolean done = true;
      for (BlockLocation location : locations) {
        int currentRep = location.getHosts().length;
        if (currentRep != newRep) {
          if (!printedWarning && currentRep > newRep) {
            out.println("\nWARNING: the waiting time may be long for "
                + "DECREASING the number of replications.");
            printedWarning = true;
          }
          done = false;
          break;
        }
      }
      if (done) {
        break;
      }
      out.print(".");
      out.flush();
      try {
        Thread.sleep(10000);
      } catch (InterruptedException e) {
        // Do not swallow the interrupt: restore the flag and stop waiting.
        Thread.currentThread().interrupt();
        java.io.InterruptedIOException iie = new java.io.InterruptedIOException(
            "Interrupted while waiting for replication of " + item);
        iie.initCause(e);
        throw iie;
      }
    }
    out.println(" done");
  }
}
/**
 * Resolves the status's view path through the mount table. It then asks the resolved target
 * file system for block locations, passing a {@link ViewFsFileStatus} that wraps {@code fs}
 * with the resolution's remaining path.
 */
@Override
public BlockLocation[] getFileBlockLocations(FileStatus fs,
    long start, long len) throws IOException {
  final InodeTree.ResolveResult<FileSystem> resolved =
      fsState.resolve(getUriPath(fs.getPath()), true);
  final FileSystem target = resolved.targetFileSystem;
  final FileStatus targetStatus = new ViewFsFileStatus(fs, resolved.remainingPath);
  return target.getFileBlockLocations(targetStatus, start, len);
}
/**
 * Resolves {@code f} through the mount table and delegates the block-location lookup to the
 * resolved target file system, using the resolution's remaining path.
 */
@Override
public BlockLocation[] getFileBlockLocations(final Path f, final long start,
    final long len) throws AccessControlException, FileNotFoundException,
    UnresolvedLinkException, IOException {
  final InodeTree.ResolveResult<AbstractFileSystem> resolved =
      fsState.resolve(getUriPath(f), true);
  final AbstractFileSystem target = resolved.targetFileSystem;
  return target.getFileBlockLocations(resolved.remainingPath, start, len);
}
/**
 * Compute HDFS blocks distribution of a given file, or a portion of the file.
 * Each block's length is added as weight to every host that holds a replica of it.
 *
 * @param fs file system
 * @param status file status of the file
 * @param start start position of the portion
 * @param length length of the portion
 * @return The HDFS blocks distribution; empty if the file system returns no block locations
 * @throws IOException if the block locations cannot be fetched
 */
public static HDFSBlocksDistribution computeHDFSBlocksDistribution(
    final FileSystem fs, FileStatus status, long start, long length)
    throws IOException {
  HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
  BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length);
  // getFileBlockLocations may return null (for example, overrides that return null for
  // directories); treat that as "no blocks" instead of failing with an NPE.
  if (blockLocations == null) {
    return blocksDistribution;
  }
  for (BlockLocation bl : blockLocations) {
    blocksDistribution.addHostsAndBlockWeight(bl.getHosts(), bl.getLength());
  }
  return blocksDistribution;
}
/**
 * Asserts that two block-location arrays have the same length and, position by position,
 * the same string form, offset and length.
 */
void compareBLs(BlockLocation[] viewBL, BlockLocation[] targetBL) {
  Assert.assertEquals(targetBL.length, viewBL.length);
  for (int idx = 0; idx < viewBL.length; idx++) {
    final BlockLocation viewLoc = viewBL[idx];
    final BlockLocation targetLoc = targetBL[idx];
    Assert.assertEquals(viewLoc.toString(), targetLoc.toString());
    Assert.assertEquals(targetLoc.getOffset(), viewLoc.getOffset());
    Assert.assertEquals(targetLoc.getLength(), viewLoc.getLength());
  }
}
/**
 * Converts a {@link LocatedBlocks} into an array of {@link BlockLocation}s by delegating to the
 * list-based overload.
 *
 * @param blocks the located blocks; may be {@code null}
 * @return the block locations, or an empty array when {@code blocks} is {@code null}
 */
public static BlockLocation[] locatedBlocks2Locations(LocatedBlocks blocks) {
  return blocks == null
      ? new BlockLocation[0]
      : locatedBlocks2Locations(blocks.getLocatedBlocks());
}
/**
 * Prints the entries directly under "/" of an HDFS cluster. It then logs metadata and block
 * locations for every file found recursively under "/".
 *
 * @param args optional overrides: {@code args[0]} is the file system URI (default
 *     {@code hdfs://hadoop-master:9000/}) and {@code args[1]} is the user to connect as
 *     (default {@code root})
 * @throws Exception if connecting to or listing the file system fails
 */
public static void main(String[] args) throws Exception {
  String uri = args.length > 0 ? args[0] : "hdfs://hadoop-master:9000/";
  String user = args.length > 1 ? args[1] : "root";
  Configuration config = new Configuration();
  // Close the FileSystem when done so its client resources are released.
  try (FileSystem fs = FileSystem.get(URI.create(uri), config, user)) {
    FileStatus[] listStatus = fs.listStatus(new Path("/"));
    for (FileStatus file : listStatus) {
      System.out.println("[" + (file.isFile() ? "file" : "dir") + "] " + file.getPath().getName());
    }

    RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
    while (listFiles.hasNext()) {
      LocatedFileStatus fileStatus = listFiles.next();
      log.info("block size:{}", fileStatus.getBlockSize());
      log.info("owner : {}", fileStatus.getOwner());
      log.info("replication : {}", fileStatus.getReplication());
      log.info("permission : {}", fileStatus.getPermission());
      log.info("path name : {}", fileStatus.getPath().getName());
      log.info("========block info=========");
      for (BlockLocation blockLocation : fileStatus.getBlockLocations()) {
        log.info("block offset : {}", blockLocation.getOffset());
        log.info("block length : {}", blockLocation.getLength());
        for (String dataNode : blockLocation.getHosts()) {
          log.info("dataNode :{}", dataNode);
        }
      }
    }
  }
}
/**
 * Delegates to the underlying file system. Any {@link FSError} is converted by
 * {@code propagateFSError} and the result is thrown.
 */
@Override
public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
    throws IOException {
  final BlockLocation[] locations;
  try {
    locations = underlyingFs.getFileBlockLocations(p, start, len);
  } catch (FSError err) {
    throw propagateFSError(err);
  }
  return locations;
}
/**
 * Returns the block map cached in {@code blockMapMap} for {@code path}. On a cache miss it
 * returns the result of {@code buildBlockMap(path)} (presumably that also populates the
 * cache; confirm).
 */
private ImmutableRangeMap<Long,BlockLocation> getBlockMap(Path path) throws IOException {
  final ImmutableRangeMap<Long,BlockLocation> cached = blockMapMap.get(path);
  return cached != null ? cached : buildBlockMap(path);
}
/**
 * Returns the block map cached in {@code blockMapMap} for the status's path, building it with
 * {@code buildBlockMap(status)} on a cache miss.
 */
private ImmutableRangeMap<Long,BlockLocation> getBlockMap(FileStatus status) throws IOException {
  final ImmutableRangeMap<Long,BlockLocation> cached = blockMapMap.get(status.getPath());
  if (cached != null) {
    return cached;
  }
  return buildBlockMap(status);
}
/**
 * Builds three consecutive test blocks of {@code blockSize} bytes, each replicated on three
 * hosts: block 0 on hosts 0,1,2; block 1 on hosts 0,2,3; block 2 on hosts 0,1,3.
 *
 * <p>Side effect: overwrites {@code hosts[i]} with {@code "host" + i}. Every replica name is
 * {@code "host:" + port}. Requires {@code hosts.length >= 4}.
 */
public BlockLocation[] buildBlockLocations(String[] hosts, long blockSize) {
  String[] names = new String[hosts.length];
  for (int h = 0; h < hosts.length; h++) {
    hosts[h] = "host" + h;
    names[h] = "host:" + port;
  }

  int[][] replicaHosts = {{0, 1, 2}, {0, 2, 3}, {0, 1, 3}};
  BlockLocation[] result = new BlockLocation[replicaHosts.length];
  for (int b = 0; b < replicaHosts.length; b++) {
    int[] replicas = replicaHosts[b];
    String[] blockNames = new String[replicas.length];
    String[] blockHosts = new String[replicas.length];
    for (int r = 0; r < replicas.length; r++) {
      blockNames[r] = names[replicas[r]];
      blockHosts[r] = hosts[replicas[r]];
    }
    result[b] = new BlockLocation(blockNames, blockHosts, b * blockSize, blockSize);
  }
  return result;
}
@Override protected List<CompleteFileWork> runInner() throws Exception { final List<CompleteFileWork> work = Lists.newArrayList(); boolean error = false; if (blockify && !compressed(status)) { try { ImmutableRangeMap<Long, BlockLocation> rangeMap = getBlockMap(status); for (Entry<Range<Long>, BlockLocation> l : rangeMap.asMapOfRanges().entrySet()) { work.add(new CompleteFileWork(getEndpointByteMap(new FileStatusWork(status, l.getValue().getOffset(), l.getValue().getLength())), l.getValue().getOffset(), l.getValue().getLength(), status)); } } catch (IOException e) { logger.warn("failure while generating file work.", e); error = true; } } if (!blockify || error || compressed(status)) { work.add(new CompleteFileWork(getEndpointByteMap(new FileStatusWork(status)), 0, status.getLen(), status)); } // This if-condition is specific for empty CSV file // For CSV files, the global variable blockify is set as true // And if this CSV file is empty, rangeMap would be empty also // Therefore, at the point before this if-condition, work would not be populated if(work.isEmpty()) { work.add(new CompleteFileWork(getEndpointByteMap(new FileStatusWork(status)), 0, 0, status)); } return work; }
/**
 * Builds one block location per {@code splitSize}-byte split of a {@code size}-byte file.
 * Location {@code i} has name {@code "b" + i} and host {@code "host" + i}. A trailing partial
 * split gets its own, shorter location.
 */
private BlockLocation[] mockBlockLocations(long size, long splitSize) {
  int numLocations = (int) (size / splitSize) + (size % splitSize == 0 ? 0 : 1);
  BlockLocation[] locations = new BlockLocation[numLocations];
  long offset = 0;
  for (int i = 0; i < numLocations; i++, offset += splitSize) {
    long blockLength = Math.min(splitSize, size - offset);
    locations[i] = new BlockLocation(
        new String[] {"b" + i}, new String[] {"host" + i}, offset, blockLength);
  }
  return locations;
}
/**
 * Drains an {@link InputStriper} over a file pool in randomly sized splits, then takes the
 * whole pool in one split from a fresh striper. Each split is checked with {@code checkSplitEq}.
 * Every pool file reports a single default {@link BlockLocation}. The Random is unseeded, so
 * split sizes differ between runs.
 */
@Test
public void testStriper() throws Exception {
  final Random r = new Random();
  final Configuration conf = new Configuration();
  final FileSystem fs = FileSystem.getLocal(conf).getRaw();
  conf.setLong(FilePool.GRIDMIX_MIN_FILE, 3 * 1024);
  final FilePool pool = new FilePool(conf, base) {
    @Override
    public BlockLocation[] locationsFor(FileStatus stat, long start, long len)
        throws IOException {
      return new BlockLocation[] { new BlockLocation() };
    }
  };
  pool.refresh();

  final int expectedPoolSize = (NFILES / 2 * (NFILES / 2 + 1) - 6) * 1024;
  final InputStriper striper = new InputStriper(pool, expectedPoolSize);
  // The first split is empty; each later split size is random but capped at what remains.
  int last = 0;
  int consumed = 0;
  while (consumed < expectedPoolSize) {
    checkSplitEq(fs, striper.splitFor(pool, last, 0), last);
    consumed += last;
    last = Math.min(expectedPoolSize - consumed, r.nextInt(expectedPoolSize));
  }

  final InputStriper striper2 = new InputStriper(pool, expectedPoolSize);
  checkSplitEq(fs, striper2.splitFor(pool, expectedPoolSize, 0), expectedPoolSize);
}
protected int getBlockIndex(BlockLocation[] blkLocations, long offset) { for (int i = 0 ; i < blkLocations.length; i++) { // is the offset inside this block? if ((blkLocations[i].getOffset() <= offset) && (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){ return i; } } BlockLocation last = blkLocations[blkLocations.length -1]; long fileLength = last.getOffset() + last.getLength() -1; throw new IllegalArgumentException("Offset " + offset + " is outside of file (0.." + fileLength + ")"); }
/**
 * Collects the hosts of the first block of each path in {@code getPaths()}. Paths with no
 * block locations are skipped.
 *
 * @return the distinct host names
 * @throws IOException if a file's status or block locations cannot be fetched
 */
public String[] getLocations() throws IOException {
  final HashSet<String> hostSet = new HashSet<String>();
  for (Path file : getPaths()) {
    final FileSystem fs = file.getFileSystem(getJob());
    final FileStatus status = fs.getFileStatus(file);
    final BlockLocation[] blkLocations = fs.getFileBlockLocations(status, 0, status.getLen());
    if (blkLocations == null || blkLocations.length == 0) {
      continue;
    }
    // Only the first block's hosts are considered for each path.
    addToSet(hostSet, blkLocations[0].getHosts());
  }
  return hostSet.toArray(new String[hostSet.size()]);
}