/**
 * Injects blocks into a single data node and then schedules a full block
 * report on that node (with a delay argument of 0).
 * This method is valid only if the data nodes have simulated data.
 *
 * @param dataNodeIndex - data node in which to inject - the index is the
 *          same as for getDataNodes(); must satisfy
 *          {@code 0 <= dataNodeIndex < dataNodes.size()}
 * @param blocksToInject - the blocks
 * @param bpid - (optional) the block pool id to use for injecting blocks.
 *          If {@code null} then it is queried from the in-process NameNode.
 * @throws IndexOutOfBoundsException if dataNodeIndex is negative or not
 *           smaller than the number of data nodes
 * @throws IOException
 *           if not SimulatedFSDataset
 *           if any of blocks already exist in the data node
 */
public void injectBlocks(int dataNodeIndex,
    Iterable<Block> blocksToInject, String bpid) throws IOException {
  // Valid indices are 0 .. size() - 1, so size() itself must be rejected too.
  if (dataNodeIndex < 0 || dataNodeIndex >= dataNodes.size()) {
    throw new IndexOutOfBoundsException("dataNodeIndex " + dataNodeIndex
        + " is outside [0, " + dataNodes.size() + ")");
  }
  final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
  final FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
  if (!(dataSet instanceof SimulatedFSDataset)) {
    throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
  }
  if (bpid == null) {
    // No pool supplied by the caller: fall back to the in-process NameNode's.
    bpid = getNamesystem().getBlockPoolId();
  }
  final SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
  sdataset.injectBlocks(bpid, blocksToInject);
  // Reuse the data node looked up above instead of indexing the list again.
  dn.scheduleAllBlockReport(0);
}
/**
 * Gathers one {@link File} per block listed in the cluster's block reports
 * for the NameNode's block pool.
 * The i-th entry of the report list is paired with the i-th data node
 * returned by {@code cluster.getDataNodes()}.
 *
 * @param cluster the cluster whose block files are looked up
 * @return the files, in report order
 * @throws IOException declared by this method; propagated from the calls it
 *           makes
 */
static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
  final List<File> result = new ArrayList<File>();
  final List<DataNode> nodes = cluster.getDataNodes();
  final String bpid = cluster.getNamesystem().getBlockPoolId();
  final List<Map<DatanodeStorage, BlockListAsLongs>> reports =
      cluster.getAllBlockReports(bpid);

  int nodeIndex = 0;
  for (Map<DatanodeStorage, BlockListAsLongs> perStorage : reports) {
    final DataNode owner = nodes.get(nodeIndex);
    nodeIndex++;
    // Only the block lists matter here; the storage keys are not used.
    for (BlockListAsLongs blockList : perStorage.values()) {
      for (Block block : blockList) {
        result.add(
            DataNodeTestUtils.getFile(owner, bpid, block.getBlockId()));
      }
    }
  }
  return result;
}
/**
 * Blocks until no datanode in the cluster reports pending asynchronous
 * block deletions, i.e. until deletions that were already queued have been
 * processed.
 *
 * @param cluster the cluster whose datanodes are polled
 * @throws TimeoutException declared by this method; presumably raised by
 *           the wait helper when the condition is not met in time
 * @throws InterruptedException declared by this method
 */
public static void waitForDNDeletions(final MiniDFSCluster cluster)
    throws TimeoutException, InterruptedException {
  final Supplier<Boolean> nothingPending = new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      for (DataNode datanode : cluster.getDataNodes()) {
        final boolean stillDeleting =
            DataNodeTestUtils.getPendingAsyncDeletions(datanode) > 0;
        if (stillDeleting) {
          // One busy datanode is enough to keep waiting.
          return Boolean.FALSE;
        }
      }
      return Boolean.TRUE;
    }
  };
  GenericTestUtils.waitFor(nothingPending, 1000, 10000);
}
/**
 * Registers a repeating test thread with {@code testCtx}. Each action
 * triggers a deletion report and a heartbeat on every datanode, computes
 * all pending block work on NameNodes 0 and 1, and then sleeps.
 *
 * @param interval millisecond period on which to run
 */
public void addReplicationTriggerThread(final int interval) {
  final RepeatingTestThread trigger = new RepeatingTestThread(testCtx) {
    @Override
    public void doAnAction() throws Exception {
      // DN side: push deletion reports and heartbeats.
      for (DataNode datanode : cluster.getDataNodes()) {
        DataNodeTestUtils.triggerDeletionReport(datanode);
        DataNodeTestUtils.triggerHeartbeat(datanode);
      }
      // NN side: compute pending work on both NameNodes.
      int nnIndex = 0;
      while (nnIndex < 2) {
        final NameNode nameNode = cluster.getNameNode(nnIndex);
        BlockManagerTestUtil.computeAllPendingWork(
            nameNode.getNamesystem().getBlockManager());
        nnIndex++;
      }
      Thread.sleep(interval);
    }
  };
  testCtx.addThread(trigger);
}
private void verifyStats(NameNode namenode, FSNamesystem fsn, DatanodeInfo info, DataNode node, boolean decommissioning) throws InterruptedException, IOException { // Do the stats check over 10 heartbeats for (int i = 0; i < 10; i++) { long[] newStats = namenode.getRpcServer().getStats(); // For decommissioning nodes, ensure capacity of the DN is no longer // counted. Only used space of the DN is counted in cluster capacity assertEquals(newStats[0], decommissioning ? info.getDfsUsed() : info.getCapacity()); // Ensure cluster used capacity is counted for both normal and // decommissioning nodes assertEquals(newStats[1], info.getDfsUsed()); // For decommissioning nodes, remaining space from the DN is not counted assertEquals(newStats[2], decommissioning ? 0 : info.getRemaining()); // Ensure transceiver count is same as that DN assertEquals(fsn.getTotalLoad(), info.getXceiverCount()); DataNodeTestUtils.triggerHeartbeat(node); } }
/**
 * Multiple-NameNode version of injectBlocks. The block pool id is taken
 * from the namesystem of the NameNode at {@code nameNodeIndex}; the blocks
 * are injected into the data node at {@code dataNodeIndex}, and a full block
 * report is then scheduled on that node (with a delay argument of 0).
 *
 * @param nameNodeIndex index of the NameNode whose block pool id is used
 * @param dataNodeIndex index of the data node to inject into; must satisfy
 *          {@code 0 <= dataNodeIndex < dataNodes.size()}
 * @param blocksToInject the blocks to inject
 * @throws IndexOutOfBoundsException if dataNodeIndex is negative or not
 *           smaller than the number of data nodes
 * @throws IOException if the data node's dataset is not a
 *           SimulatedFSDataset, or if raised by the injection itself
 */
public void injectBlocks(int nameNodeIndex, int dataNodeIndex,
    Iterable<Block> blocksToInject) throws IOException {
  // Valid indices are 0 .. size() - 1, so size() itself must be rejected too.
  if (dataNodeIndex < 0 || dataNodeIndex >= dataNodes.size()) {
    throw new IndexOutOfBoundsException("dataNodeIndex " + dataNodeIndex
        + " is outside [0, " + dataNodes.size() + ")");
  }
  final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
  final FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
  if (!(dataSet instanceof SimulatedFSDataset)) {
    throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
  }
  final String bpid = getNamesystem(nameNodeIndex).getBlockPoolId();
  final SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
  sdataset.injectBlocks(bpid, blocksToInject);
  // Reuse the data node looked up above instead of indexing the list again.
  dn.scheduleAllBlockReport(0);
}
@Test public void testClose() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); try { cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn); // set up replicasMap String bpid = cluster.getNamesystem().getBlockPoolId(); ExtendedBlock[] blocks = setup(bpid, dataSet); // test close testClose(dataSet, blocks); } finally { cluster.shutdown(); } }
@Test public void testAppend() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); try { cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn); // set up replicasMap String bpid = cluster.getNamesystem().getBlockPoolId(); ExtendedBlock[] blocks = setup(bpid, dataSet); // test append testAppend(bpid, dataSet, blocks); } finally { cluster.shutdown(); } }
@Test public void testWriteToRbw() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); try { cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn); // set up replicasMap String bpid = cluster.getNamesystem().getBlockPoolId(); ExtendedBlock[] blocks = setup(bpid, dataSet); // test writeToRbw testWriteToRbw(dataSet, blocks); } finally { cluster.shutdown(); } }
@Test public void testWriteToTemporary() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); try { cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn); // set up replicasMap String bpid = cluster.getNamesystem().getBlockPoolId(); ExtendedBlock[] blocks = setup(bpid, dataSet); // test writeToTemporary testWriteToTemporary(dataSet, blocks); } finally { cluster.shutdown(); } }
@Test(timeout=60000) public void testNoBackingReplica() throws Exception { // Cache all three replicas for a file. final Path filename = new Path("/noback"); final short replication = (short) 3; DFSTestUtil.createFile(dfs, filename, 1, replication, 0x0BAC); dfs.addCachePool(new CachePoolInfo("pool")); dfs.addCacheDirective( new CacheDirectiveInfo.Builder().setPool("pool").setPath(filename) .setReplication(replication).build()); waitForCachedBlocks(namenode, 1, replication, "testNoBackingReplica:1"); // Pause cache reports while we change the replication factor. // This will orphan some cached replicas. DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, true); try { dfs.setReplication(filename, (short) 1); DFSTestUtil.waitForReplication(dfs, filename, (short) 1, 30000); // The cache locations should drop down to 1 even without cache reports. waitForCachedBlocks(namenode, 1, (short) 1, "testNoBackingReplica:2"); } finally { DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, false); } }
@Test public void testReleaseOnFileDeletion() throws IOException, TimeoutException, InterruptedException { getClusterBuilder().setNumDatanodes(1) .setMaxLockedMemory(BLOCK_SIZE).build(); final String METHOD_NAME = GenericTestUtils.getMethodName(); final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset(); Path path = new Path("/" + METHOD_NAME + ".dat"); makeTestFile(path, BLOCK_SIZE, true); ensureFileReplicasOnStorageType(path, RAM_DISK); assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE)); // Delete the file and ensure that the locked memory is released. fs.delete(path, false); DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0)); waitForLockedBytesUsed(fsd, 0); }
@Test public void testClose() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); try { cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn); // set up replicasMap String bpid = cluster.getNamesystem().getBlockPoolId(); ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn)); // test close testClose(dataSet, blocks); } finally { cluster.shutdown(); } }
@Test public void testAppend() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); try { cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn); // set up replicasMap String bpid = cluster.getNamesystem().getBlockPoolId(); ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn)); // test append testAppend(bpid, dataSet, blocks); } finally { cluster.shutdown(); } }
@Test public void testWriteToRbw() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); try { cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn); // set up replicasMap String bpid = cluster.getNamesystem().getBlockPoolId(); ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn)); // test writeToRbw testWriteToRbw(dataSet, blocks); } finally { cluster.shutdown(); } }
@Test public void testWriteToTemporary() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); try { cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn); // set up replicasMap String bpid = cluster.getNamesystem().getBlockPoolId(); ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn)); // test writeToTemporary testWriteToTemporary(dataSet, blocks); } finally { cluster.shutdown(); } }
private void verifyStats(NameNode namenode, FSNamesystem fsn, DatanodeInfo info, DataNode node, boolean decommissioning) throws InterruptedException, IOException { // Do the stats check over 10 heartbeats for (int i = 0; i < 10; i++) { long[] newStats = namenode.getRpcServer().getStats(); // For decommissioning nodes, ensure capacity of the DN and dfsUsed // is no longer counted towards total assertEquals(newStats[0], decommissioning ? 0 : info.getCapacity()); // Ensure cluster used capacity is counted for normal nodes only assertEquals(newStats[1], decommissioning ? 0 : info.getDfsUsed()); // For decommissioning nodes, remaining space from the DN is not counted assertEquals(newStats[2], decommissioning ? 0 : info.getRemaining()); // Ensure transceiver count is same as that DN assertEquals(fsn.getTotalLoad(), info.getXceiverCount()); DataNodeTestUtils.triggerHeartbeat(node); } }
@Test public void testWriteToRbw() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); try { cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); FsDatasetImpl dataSet = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn); // set up replicasMap String bpid = cluster.getNamesystem().getBlockPoolId(); ExtendedBlock[] blocks = setup(bpid, dataSet); // test writeToRbw testWriteToRbw(dataSet, blocks); } finally { cluster.shutdown(); } }
@Test public void testWriteToTempoary() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); try { cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); FsDatasetImpl dataSet = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn); // set up replicasMap String bpid = cluster.getNamesystem().getBlockPoolId(); ExtendedBlock[] blocks = setup(bpid, dataSet); // test writeToTemporary testWriteToTemporary(dataSet, blocks); } finally { cluster.shutdown(); } }
/**
 * Multiple-NameNode version of {@link #injectBlocks(Iterable[])}.
 * The block pool id is taken from the namesystem of the NameNode at
 * {@code nameNodeIndex}; the blocks are injected into the data node at
 * {@code dataNodeIndex}, and a full block report is then scheduled on that
 * node (with a delay argument of 0).
 *
 * @param nameNodeIndex index of the NameNode whose block pool id is used
 * @param dataNodeIndex index of the data node to inject into; must satisfy
 *          {@code 0 <= dataNodeIndex < dataNodes.size()}
 * @param blocksToInject the blocks to inject
 * @throws IndexOutOfBoundsException if dataNodeIndex is negative or not
 *           smaller than the number of data nodes
 * @throws IOException if the data node's dataset is not a
 *           SimulatedFSDataset, or if raised by the injection itself
 */
public void injectBlocks(int nameNodeIndex, int dataNodeIndex,
    Iterable<Block> blocksToInject) throws IOException {
  // Valid indices are 0 .. size() - 1, so size() itself must be rejected too.
  if (dataNodeIndex < 0 || dataNodeIndex >= dataNodes.size()) {
    throw new IndexOutOfBoundsException("dataNodeIndex " + dataNodeIndex
        + " is outside [0, " + dataNodes.size() + ")");
  }
  final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
  final FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
  if (!(dataSet instanceof SimulatedFSDataset)) {
    throw new IOException(
        "injectBlocks is valid only for SimilatedFSDataset");
  }
  final String bpid = getNamesystem(nameNodeIndex).getBlockPoolId();
  final SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
  sdataset.injectBlocks(bpid, blocksToInject);
  // Reuse the data node looked up above instead of indexing the list again.
  dn.scheduleAllBlockReport(0);
}