private void validateNumberReplicas(int expectedReplicas) throws IOException {
  NumberReplicas numberReplicas = blockManager.countNodes(block);
  assertThat(numberReplicas.liveReplicas(), is(expectedReplicas));
  assertThat(numberReplicas.excessReplicas(), is(0));
  assertThat(numberReplicas.corruptReplicas(), is(0));
  assertThat(numberReplicas.decommissionedReplicas(), is(0));
  assertThat(numberReplicas.replicasOnStaleNodes(), is(0));

  BlockManagerTestUtil.updateState(blockManager);
  assertThat(blockManager.getUnderReplicatedBlocksCount(), is(0L));
  assertThat(blockManager.getExcessBlocksCount(), is(0L));
}
/**
 * Verify that the NameNode is able to still use <tt>READ_ONLY_SHARED</tt> replicas even
 * when the single NORMAL replica is offline (and the effective replication count is 0).
 */
@Test
public void testNormalReplicaOffline() throws Exception {
  // Stop the datanode hosting the NORMAL replica
  cluster.stopDataNode(normalDataNode.getXferAddr());

  // Force NameNode to detect that the datanode is down
  BlockManagerTestUtil.noticeDeadDatanode(
      cluster.getNameNode(), normalDataNode.getXferAddr());

  // The live replica count should now be zero (since the NORMAL replica is offline)
  NumberReplicas numberReplicas = blockManager.countNodes(block);
  assertThat(numberReplicas.liveReplicas(), is(0));

  // The block should be reported as under-replicated
  BlockManagerTestUtil.updateState(blockManager);
  assertThat(blockManager.getUnderReplicatedBlocksCount(), is(1L));

  // The BlockManager should be able to heal the replication count back to 1
  // by triggering an inter-datanode replication from one of the READ_ONLY_SHARED replicas
  BlockManagerTestUtil.computeAllPendingWork(blockManager);
  DFSTestUtil.waitForReplication(cluster, extendedBlock, 1, 1, 0);

  // There should now be 2 *locations* for the block, and 1 *replica*
  assertThat(getLocatedBlock().getLocations().length, is(2));
  validateNumberReplicas(1);
}
/**
 * Verify that corrupt <tt>READ_ONLY_SHARED</tt> replicas aren't counted
 * towards the corrupt replicas total.
 */
@Test
public void testReadOnlyReplicaCorrupt() throws Exception {
  // "Corrupt" a READ_ONLY_SHARED replica by reporting it as a bad replica
  client.reportBadBlocks(new LocatedBlock[] {
      new LocatedBlock(extendedBlock, new DatanodeInfo[] { readOnlyDataNode })
  });

  // There should now be only 1 *location* for the block as the READ_ONLY_SHARED is corrupt
  waitForLocations(1);

  // However, the corrupt READ_ONLY_SHARED replica should *not* affect the overall corrupt replicas count
  NumberReplicas numberReplicas = blockManager.countNodes(block);
  assertThat(numberReplicas.corruptReplicas(), is(0));
}
private void validateNumberReplicas(int expectedReplicas) throws IOException {
  NumberReplicas numberReplicas = blockManager.countNodes(storedBlock);
  assertThat(numberReplicas.liveReplicas(), is(expectedReplicas));
  assertThat(numberReplicas.excessReplicas(), is(0));
  assertThat(numberReplicas.corruptReplicas(), is(0));
  assertThat(numberReplicas.decommissionedAndDecommissioning(), is(0));
  assertThat(numberReplicas.replicasOnStaleNodes(), is(0));

  BlockManagerTestUtil.updateState(blockManager);
  assertThat(blockManager.getUnderReplicatedBlocksCount(), is(0L));
  assertThat(blockManager.getExcessBlocksCount(), is(0L));
}
/**
 * Verify that the NameNode is able to still use <tt>READ_ONLY_SHARED</tt> replicas even
 * when the single NORMAL replica is offline (and the effective replication count is 0).
 */
@Test
public void testNormalReplicaOffline() throws Exception {
  // Stop the datanode hosting the NORMAL replica
  cluster.stopDataNode(normalDataNode.getXferAddr());

  // Force NameNode to detect that the datanode is down
  BlockManagerTestUtil.noticeDeadDatanode(
      cluster.getNameNode(), normalDataNode.getXferAddr());

  // The live replica count should now be zero (since the NORMAL replica is offline)
  NumberReplicas numberReplicas = blockManager.countNodes(storedBlock);
  assertThat(numberReplicas.liveReplicas(), is(0));

  // The block should be reported as under-replicated
  BlockManagerTestUtil.updateState(blockManager);
  assertThat(blockManager.getUnderReplicatedBlocksCount(), is(1L));

  // The BlockManager should be able to heal the replication count back to 1
  // by triggering an inter-datanode replication from one of the READ_ONLY_SHARED replicas
  BlockManagerTestUtil.computeAllPendingWork(blockManager);
  DFSTestUtil.waitForReplication(cluster, extendedBlock, 1, 1, 0);

  // There should now be 2 *locations* for the block, and 1 *replica*
  assertThat(getLocatedBlock().getLocations().length, is(2));
  validateNumberReplicas(1);
}
/**
 * Verify that corrupt <tt>READ_ONLY_SHARED</tt> replicas aren't counted
 * towards the corrupt replicas total.
 */
@Test
public void testReadOnlyReplicaCorrupt() throws Exception {
  // "Corrupt" a READ_ONLY_SHARED replica by reporting it as a bad replica
  client.reportBadBlocks(new LocatedBlock[] {
      new LocatedBlock(extendedBlock, new DatanodeInfo[] { readOnlyDataNode })
  });

  // There should now be only 1 *location* for the block as the READ_ONLY_SHARED is corrupt
  waitForLocations(1);

  // However, the corrupt READ_ONLY_SHARED replica should *not* affect the overall corrupt replicas count
  NumberReplicas numberReplicas = blockManager.countNodes(storedBlock);
  assertThat(numberReplicas.corruptReplicas(), is(0));
}
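// A minimal sketch of the waitForLocations(int) helper referenced by testReadOnlyReplicaCorrupt
// above, assuming the surrounding test class exposes a getLocatedBlock() accessor and that
// org.apache.hadoop.test.GenericTestUtils and com.google.common.base.Supplier are available.
private void waitForLocations(final int locations) throws Exception {
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      try {
        // Poll the NameNode until the block reports the expected number of locations
        return getLocatedBlock().getLocations().length == locations;
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }, 500, 60000);
}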
private static NumberReplicas countReplicas(final FSNamesystem namesystem,
    final ExtendedBlock block) throws IOException {
  return (NumberReplicas) new HopsTransactionalRequestHandler(
      HDFSOperationType.COUNT_NODES) {
    INodeIdentifier inodeIdentifier;

    @Override
    public void setUp() throws StorageException, IOException {
      inodeIdentifier = INodeUtil.resolveINodeFromBlock(block.getLocalBlock());
    }

    @Override
    public void acquireLock(TransactionLocks locks) throws IOException {
      LockFactory lf = LockFactory.getInstance();
      locks
          .add(lf.getIndividualBlockLock(block.getBlockId(), inodeIdentifier))
          .add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER,
              LockFactory.BLK.CR));
    }

    @Override
    public Object performTask() throws StorageException, IOException {
      return namesystem.getBlockManager().countNodes(block.getLocalBlock());
    }
  }.handle(namesystem);
}
/**
 * Check block information given a blockId number
 *
 */
public void blockIdCK(String blockId) {
  if (blockId == null) {
    out.println("Please provide valid blockId!");
    return;
  }

  BlockManager bm = namenode.getNamesystem().getBlockManager();
  try {
    // get blockInfo
    Block block = new Block(Block.getBlockId(blockId));
    // find which file this block belongs to
    BlockInfoContiguous blockInfo = bm.getStoredBlock(block);
    if (blockInfo == null) {
      out.println("Block " + blockId + " " + NONEXISTENT_STATUS);
      LOG.warn("Block " + blockId + " " + NONEXISTENT_STATUS);
      return;
    }
    BlockCollection bc = bm.getBlockCollection(blockInfo);
    INode iNode = (INode) bc;
    NumberReplicas numberReplicas = bm.countNodes(block);
    out.println("Block Id: " + blockId);
    out.println("Block belongs to: " + iNode.getFullPathName());
    out.println("No. of Expected Replica: " + bc.getBlockReplication());
    out.println("No. of live Replica: " + numberReplicas.liveReplicas());
    out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
    out.println("No. of stale Replica: " +
        numberReplicas.replicasOnStaleNodes());
    out.println("No. of decommission Replica: " +
        numberReplicas.decommissionedReplicas());
    out.println("No. of corrupted Replica: " +
        numberReplicas.corruptReplicas());
    // record datanodes that have corrupted block replica
    Collection<DatanodeDescriptor> corruptionRecord = null;
    if (bm.getCorruptReplicas(block) != null) {
      corruptionRecord = bm.getCorruptReplicas(block);
    }

    // report block replicas status on datanodes
    for (int idx = (blockInfo.numNodes() - 1); idx >= 0; idx--) {
      DatanodeDescriptor dn = blockInfo.getDatanode(idx);
      out.print("Block replica on datanode/rack: " + dn.getHostName() +
          dn.getNetworkLocation() + " ");
      if (corruptionRecord != null && corruptionRecord.contains(dn)) {
        out.print(CORRUPT_STATUS + "\t ReasonCode: " +
            bm.getCorruptReason(block, dn));
      } else if (dn.isDecommissioned()) {
        out.print(DECOMMISSIONED_STATUS);
      } else if (dn.isDecommissionInProgress()) {
        out.print(DECOMMISSIONING_STATUS);
      } else {
        out.print(HEALTHY_STATUS);
      }
      out.print("\n");
    }
  } catch (Exception e) {
    String errMsg = "Fsck on blockId '" + blockId;
    LOG.warn(errMsg, e);
    out.println(e.getMessage());
    out.print("\n\n" + errMsg);
    LOG.warn("Error in looking up block", e);
  }
}
private static NumberReplicas countReplicas(final FSNamesystem namesystem,
    ExtendedBlock block) {
  return namesystem.getBlockManager().countNodes(block.getLocalBlock());
}
/**
 * Check block information given a blockId number
 *
 */
public void blockIdCK(String blockId) {
  if (blockId == null) {
    out.println("Please provide valid blockId!");
    return;
  }

  try {
    // get blockInfo
    Block block = new Block(Block.getBlockId(blockId));
    // find which file this block belongs to
    BlockInfo blockInfo = blockManager.getStoredBlock(block);
    if (blockInfo == null) {
      out.println("Block " + blockId + " " + NONEXISTENT_STATUS);
      LOG.warn("Block " + blockId + " " + NONEXISTENT_STATUS);
      return;
    }
    final INodeFile iNode = namenode.getNamesystem().getBlockCollection(blockInfo);
    NumberReplicas numberReplicas = blockManager.countNodes(blockInfo);
    out.println("Block Id: " + blockId);
    out.println("Block belongs to: " + iNode.getFullPathName());
    out.println("No. of Expected Replica: " +
        blockManager.getExpectedReplicaNum(blockInfo));
    out.println("No. of live Replica: " + numberReplicas.liveReplicas());
    out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
    out.println("No. of stale Replica: " +
        numberReplicas.replicasOnStaleNodes());
    out.println("No. of decommissioned Replica: " +
        numberReplicas.decommissioned());
    out.println("No. of decommissioning Replica: " +
        numberReplicas.decommissioning());
    out.println("No. of corrupted Replica: " +
        numberReplicas.corruptReplicas());
    // record datanodes that have corrupted block replica
    Collection<DatanodeDescriptor> corruptionRecord = null;
    if (blockManager.getCorruptReplicas(block) != null) {
      corruptionRecord = blockManager.getCorruptReplicas(block);
    }

    // report block replicas status on datanodes
    for (int idx = (blockInfo.numNodes() - 1); idx >= 0; idx--) {
      DatanodeDescriptor dn = blockInfo.getDatanode(idx);
      out.print("Block replica on datanode/rack: " + dn.getHostName() +
          dn.getNetworkLocation() + " ");
      if (corruptionRecord != null && corruptionRecord.contains(dn)) {
        out.print(CORRUPT_STATUS + "\t ReasonCode: " +
            blockManager.getCorruptReason(block, dn));
      } else if (dn.isDecommissioned()) {
        out.print(DECOMMISSIONED_STATUS);
      } else if (dn.isDecommissionInProgress()) {
        out.print(DECOMMISSIONING_STATUS);
      } else {
        out.print(HEALTHY_STATUS);
      }
      out.print("\n");
    }
  } catch (Exception e) {
    String errMsg = "Fsck on blockId '" + blockId;
    LOG.warn(errMsg, e);
    out.println(e.getMessage());
    out.print("\n\n" + errMsg);
    LOG.warn("Error in looking up block", e);
  }
}
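// Usage note: blockIdCK backs the -blockId option of fsck, so the report assembled above is
// what an operator sees when running fsck against the NameNode with a block id argument
// (for example "hdfs fsck -blockId blk_<id>").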
private static NumberReplicas countReplicas(final FSNamesystem namesystem,
    ExtendedBlock block) {
  final BlockManager blockManager = namesystem.getBlockManager();
  return blockManager.countNodes(blockManager.getStoredBlock(
      block.getLocalBlock()));
}
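// A minimal usage sketch of the countReplicas helper above, assuming a MiniDFSCluster (cluster)
// and a test file (path) written with replication factor 3 elsewhere in the test.
ExtendedBlock block = DFSTestUtil.getFirstBlock(cluster.getFileSystem(), path);
NumberReplicas replicas = countReplicas(cluster.getNamesystem(), block);
// With a healthy 3-replica file, all replicas should be live and none corrupt
assertEquals(3, replicas.liveReplicas());
assertEquals(0, replicas.corruptReplicas());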