private void removeZombieReplicas(BlockReportContext context,
    DatanodeStorageInfo zombie) {
  LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
      "longer exists on the DataNode.",
      Long.toHexString(context.getReportId()), zombie.getStorageID());
  assert(namesystem.hasWriteLock());
  Iterator<BlockInfoContiguous> iter = zombie.getBlockIterator();
  int prevBlocks = zombie.numBlocks();
  while (iter.hasNext()) {
    BlockInfoContiguous block = iter.next();
    // We assume that a block can be on only one storage in a DataNode.
    // That's why we pass in the DatanodeDescriptor rather than the
    // DatanodeStorageInfo.
    // TODO: remove this assumption in case we want to put a block on
    // more than one storage on a datanode (and because it's a difficult
    // assumption to really enforce)
    removeStoredBlock(block, zombie.getDatanodeDescriptor());
    invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
  }
  assert(zombie.numBlocks() == 0);
  LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
      "which no longer exists on the DataNode.",
      Long.toHexString(context.getReportId()), prevBlocks,
      zombie.getStorageID());
}
void register() throws IOException {
  // get versions from the namenode
  nsInfo = nameNodeProto.versionRequest();
  dnRegistration = new DatanodeRegistration(
      new DatanodeID(DNS.getDefaultIP("default"),
          DNS.getDefaultHost("default", "default"),
          DataNode.generateUuid(), getNodePort(dnIdx),
          DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
          DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
          DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT),
      new DataStorage(nsInfo),
      new ExportedBlockKeys(), VersionInfo.getVersion());
  // register datanode
  dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
  // first block reports
  storage = new DatanodeStorage(DatanodeStorage.generateUuid());
  final StorageBlockReport[] reports = {
      new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
  };
  nameNodeProto.blockReport(dnRegistration,
      nameNode.getNamesystem().getBlockPoolId(), reports,
      new BlockReportContext(1, 0, System.nanoTime()));
}
private void waitForBlockReport(
    final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      try {
        Mockito.verify(mockNN).blockReport(
            Mockito.<DatanodeRegistration>anyObject(),
            Mockito.eq(FAKE_BPID),
            Mockito.<StorageBlockReport[]>anyObject(),
            Mockito.<BlockReportContext>anyObject());
        return true;
      } catch (Throwable t) {
        LOG.info("waiting on block report: " + t.getMessage());
        return false;
      }
    }
  }, 500, 10000);
}
private void waitForBlockReport(
    final DatanodeProtocolClientSideTranslatorPB mockNN1,
    final DatanodeProtocolClientSideTranslatorPB mockNN2)
    throws Exception {
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      return get(mockNN1) || get(mockNN2);
    }

    private Boolean get(DatanodeProtocolClientSideTranslatorPB mockNN) {
      try {
        Mockito.verify(mockNN).blockReport(
            Mockito.<DatanodeRegistration>anyObject(),
            Mockito.eq(FAKE_BPID),
            Mockito.<StorageBlockReport[]>anyObject(),
            Mockito.<BlockReportContext>anyObject());
        return true;
      } catch (Throwable t) {
        LOG.info("waiting on block report: " + t.getMessage());
        return false;
      }
    }
  }, 500, 10000);
}
private void waitForBlockReport(
    final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      try {
        Mockito.verify(mockNN).blockReport(
            Mockito.eq(datanodeRegistration),
            Mockito.eq(POOL_ID),
            Mockito.<StorageBlockReport[]>anyObject(),
            Mockito.<BlockReportContext>anyObject());
        return true;
      } catch (Throwable t) {
        LOG.info("waiting on block report: " + t.getMessage());
        return false;
      }
    }
  }, 500, 100000);
}
void register() throws IOException {
  // get versions from the namenode
  nsInfo = nameNodeProto.versionRequest();
  dnRegistration = new DatanodeRegistration(
      new DatanodeID(DNS.getDefaultIP("default"),
          DNS.getDefaultHost("default", "default"),
          DataNode.generateUuid(), getNodePort(dnIdx),
          DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
          DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
          DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT),
      new DataStorage(nsInfo),
      new ExportedBlockKeys(), VersionInfo.getVersion());
  // register datanode
  dnRegistration = dataNodeProto.registerDatanode(dnRegistration);
  dnRegistration.setNamespaceInfo(nsInfo);
  // first block reports
  storage = new DatanodeStorage(DatanodeStorage.generateUuid());
  final StorageBlockReport[] reports = {
      new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
  };
  dataNodeProto.blockReport(dnRegistration, bpid, reports,
      new BlockReportContext(1, 0, System.nanoTime(), 0L));
}
private void removeZombieReplicas(BlockReportContext context,
    DatanodeStorageInfo zombie) {
  LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
      "longer exists on the DataNode.",
      Long.toHexString(context.getReportId()), zombie.getStorageID());
  assert(namesystem.hasWriteLock());
  Iterator<BlockInfo> iter = zombie.getBlockIterator();
  int prevBlocks = zombie.numBlocks();
  while (iter.hasNext()) {
    BlockInfo block = iter.next();
    // We assume that a block can be on only one storage in a DataNode.
    // That's why we pass in the DatanodeDescriptor rather than the
    // DatanodeStorageInfo.
    // TODO: remove this assumption in case we want to put a block on
    // more than one storage on a datanode (and because it's a difficult
    // assumption to really enforce)
    removeStoredBlock(block, zombie.getDatanodeDescriptor());
    invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
  }
  assert(zombie.numBlocks() == 0);
  LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
      "which no longer exists on the DataNode.",
      Long.toHexString(context.getReportId()), prevBlocks,
      zombie.getStorageID());
}
@Override
public DatanodeCommand blockReport(DatanodeRegistration registration,
    String poolId, StorageBlockReport[] reports, BlockReportContext context)
    throws IOException {
  BlockReportRequestProto.Builder builder = BlockReportRequestProto
      .newBuilder().setRegistration(PBHelper.convert(registration))
      .setBlockPoolId(poolId);
  for (StorageBlockReport r : reports) {
    StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
        .newBuilder().setStorage(PBHelper.convert(r.getStorage()));
    long[] blocks = r.getBlocks();
    for (int i = 0; i < blocks.length; i++) {
      reportBuilder.addBlocks(blocks[i]);
    }
    builder.addReports(reportBuilder.build());
  }
  builder.setContext(PBHelper.convert(context));
  BlockReportResponseProto resp;
  try {
    resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build());
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
  return resp.hasCmd() ? PBHelper.convert(resp.getCmd()) : null;
}
void register() throws IOException {
  // get versions from the namenode
  nsInfo = nameNodeProto.versionRequest();
  dnRegistration = new DatanodeRegistration(
      new DatanodeID(DNS.getDefaultIP("default"),
          DNS.getDefaultHost("default", "default"),
          DataNode.generateUuid(), getNodePort(dnIdx),
          DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
          DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
          DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT),
      new DataStorage(nsInfo),
      new ExportedBlockKeys(), VersionInfo.getVersion());
  // register datanode
  dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
  // first block reports
  storage = new DatanodeStorage(DatanodeStorage.generateUuid());
  final StorageBlockReport[] reports = {
      new StorageBlockReport(storage,
          new BlockListAsLongs(null, null).getBlockListAsLongs())
  };
  nameNodeProto.blockReport(dnRegistration,
      nameNode.getNamesystem().getBlockPoolId(), reports,
      new BlockReportContext(1, 0, System.nanoTime()));
}
@Override // DatanodeProtocol
public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
    String poolId, StorageBlockReport[] reports,
    BlockReportContext context) throws IOException {
  checkNNStartup();
  verifyRequest(nodeReg);
  if (blockStateChangeLog.isDebugEnabled()) {
    blockStateChangeLog.debug("*BLOCK* NameNode.blockReport: "
        + "from " + nodeReg + ", reports.length=" + reports.length);
  }
  final BlockManager bm = namesystem.getBlockManager();
  boolean noStaleStorages = false;
  for (int r = 0; r < reports.length; r++) {
    final BlockListAsLongs blocks = reports[r].getBlocks();
    //
    // BlockManager.processReport accumulates information of prior calls
    // for the same node and storage, so the value returned by the last
    // call of this loop is the final updated value for noStaleStorages.
    //
    noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(),
        blocks, context, (r == reports.length - 1));
    metrics.incrStorageBlockReportOps();
  }

  if (nn.getFSImage().isUpgradeFinalized() &&
      !namesystem.isRollingUpgrade() &&
      !nn.isStandbyState() &&
      noStaleStorages) {
    return new FinalizeCommand(poolId);
  }
  return null;
}
public int updateBlockReportContext(BlockReportContext context) {
  if (curBlockReportId != context.getReportId()) {
    curBlockReportId = context.getReportId();
    curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
  }
  curBlockReportRpcsSeen.set(context.getCurRpc());
  return curBlockReportRpcsSeen.cardinality();
}
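// Hypothetical usage sketch, not from the original source: how a caller
// holding the object that owns updateBlockReportContext() above might
// detect the final RPC of a multi-RPC block report. The helper name and
// the "node" parameter are illustrative assumptions.
private static boolean isLastRpcOfReport(DatanodeDescriptor node,
    BlockReportContext context) {
  // cardinality() counts the distinct RPC ids seen so far for the current
  // report id, so the report is complete once every id in
  // [0, getTotalRpcs()) has been observed.
  return node.updateBlockReportContext(context) >= context.getTotalRpcs();
}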
@Override
public DatanodeCommand blockReport(DatanodeRegistration registration,
    String poolId, StorageBlockReport[] reports, BlockReportContext context)
    throws IOException {
  BlockReportRequestProto.Builder builder = BlockReportRequestProto
      .newBuilder().setRegistration(PBHelper.convert(registration))
      .setBlockPoolId(poolId);
  boolean useBlocksBuffer = registration.getNamespaceInfo()
      .isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS);
  for (StorageBlockReport r : reports) {
    StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
        .newBuilder().setStorage(PBHelper.convert(r.getStorage()));
    BlockListAsLongs blocks = r.getBlocks();
    if (useBlocksBuffer) {
      reportBuilder.setNumberOfBlocks(blocks.getNumberOfBlocks());
      reportBuilder.addAllBlocksBuffers(blocks.getBlocksBuffers());
    } else {
      for (long value : blocks.getBlockListAsLongs()) {
        reportBuilder.addBlocks(value);
      }
    }
    builder.addReports(reportBuilder.build());
  }
  builder.setContext(PBHelper.convert(context));
  BlockReportResponseProto resp;
  try {
    resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build());
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
  return resp.hasCmd() ? PBHelper.convert(resp.getCmd()) : null;
}
public static BlockReportContextProto convert(BlockReportContext context) {
  return BlockReportContextProto.newBuilder().
      setTotalRpcs(context.getTotalRpcs()).
      setCurRpc(context.getCurRpc()).
      setId(context.getReportId()).
      build();
}
/**
 * Test that DNA_INVALIDATE commands from the standby are ignored.
 */
@Test
public void testIgnoreDeletionsFromNonActive() throws Exception {
  BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);

  // Ask to invalidate FAKE_BLOCK when block report hits the
  // standby
  Mockito.doReturn(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
      FAKE_BPID, new Block[] { FAKE_BLOCK.getLocalBlock() }))
      .when(mockNN2).blockReport(
          Mockito.<DatanodeRegistration>anyObject(),
          Mockito.eq(FAKE_BPID),
          Mockito.<StorageBlockReport[]>anyObject(),
          Mockito.<BlockReportContext>anyObject());

  bpos.start();
  try {
    waitForInitialization(bpos);
    // Should get block reports from both NNs
    waitForBlockReport(mockNN1);
    waitForBlockReport(mockNN2);
  } finally {
    bpos.stop();
  }

  // Should ignore the delete command from the standby
  Mockito.verify(mockFSDataset, Mockito.never())
      .invalidate(Mockito.eq(FAKE_BPID), (Block[]) Mockito.anyObject());
}
/**
 * Test that if splitThreshold is zero, then we always get a separate
 * call per storage.
 */
@Test(timeout=300000)
public void testAlwaysSplit() throws IOException, InterruptedException {
  startUpCluster(0);
  NameNode nn = cluster.getNameNode();
  DataNode dn = cluster.getDataNodes().get(0);

  // Create a file with a few blocks.
  createFile(GenericTestUtils.getMethodName(), BLOCKS_IN_FILE);

  // Insert a spy object for the NN RPC.
  DatanodeProtocolClientSideTranslatorPB nnSpy =
      DataNodeTestUtils.spyOnBposToNN(dn, nn);

  // Trigger a block report so there is an interaction with the spy
  // object.
  DataNodeTestUtils.triggerBlockReport(dn);

  ArgumentCaptor<StorageBlockReport[]> captor =
      ArgumentCaptor.forClass(StorageBlockReport[].class);

  Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
      any(DatanodeRegistration.class),
      anyString(),
      captor.capture(),
      Mockito.<BlockReportContext>anyObject());

  verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
}
/**
 * Tests the behavior when the count of blocks is exactly one less than
 * the threshold.
 */
@Test(timeout=300000)
public void testCornerCaseUnderThreshold()
    throws IOException, InterruptedException {
  startUpCluster(BLOCKS_IN_FILE + 1);
  NameNode nn = cluster.getNameNode();
  DataNode dn = cluster.getDataNodes().get(0);

  // Create a file with a few blocks.
  createFile(GenericTestUtils.getMethodName(), BLOCKS_IN_FILE);

  // Insert a spy object for the NN RPC.
  DatanodeProtocolClientSideTranslatorPB nnSpy =
      DataNodeTestUtils.spyOnBposToNN(dn, nn);

  // Trigger a block report so there is an interaction with the spy
  // object.
  DataNodeTestUtils.triggerBlockReport(dn);

  ArgumentCaptor<StorageBlockReport[]> captor =
      ArgumentCaptor.forClass(StorageBlockReport[].class);

  Mockito.verify(nnSpy, times(1)).blockReport(
      any(DatanodeRegistration.class),
      anyString(),
      captor.capture(),
      Mockito.<BlockReportContext>anyObject());

  verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(),
      BLOCKS_IN_FILE);
}
/**
 * Tests the behavior when the count of blocks is exactly equal to the
 * threshold.
 */
@Test(timeout=300000)
public void testCornerCaseAtThreshold()
    throws IOException, InterruptedException {
  startUpCluster(BLOCKS_IN_FILE);
  NameNode nn = cluster.getNameNode();
  DataNode dn = cluster.getDataNodes().get(0);

  // Create a file with a few blocks.
  createFile(GenericTestUtils.getMethodName(), BLOCKS_IN_FILE);

  // Insert a spy object for the NN RPC.
  DatanodeProtocolClientSideTranslatorPB nnSpy =
      DataNodeTestUtils.spyOnBposToNN(dn, nn);

  // Trigger a block report so there is an interaction with the spy
  // object.
  DataNodeTestUtils.triggerBlockReport(dn);

  ArgumentCaptor<StorageBlockReport[]> captor =
      ArgumentCaptor.forClass(StorageBlockReport[].class);

  Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
      any(DatanodeRegistration.class),
      anyString(),
      captor.capture(),
      Mockito.<BlockReportContext>anyObject());

  verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
}
@Override
protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
    StorageBlockReport[] reports) throws IOException {
  int i = 0;
  for (StorageBlockReport report : reports) {
    LOG.info("Sending block report for storage " +
        report.getStorage().getStorageID());
    StorageBlockReport[] singletonReport = { report };
    cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
        new BlockReportContext(reports.length, i, System.nanoTime()));
    i++;
  }
}
@Override
protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
    StorageBlockReport[] reports) throws IOException {
  LOG.info("Sending combined block reports for " + dnR);
  cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
      new BlockReportContext(1, 0, System.nanoTime()));
}
/** Test that a full block report is sent after hot swapping volumes */
@Test(timeout=100000)
public void testFullBlockReportAfterRemovingVolumes()
    throws IOException, ReconfigurationException {
  Configuration conf = new Configuration();
  conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);

  // Similar to TestTriggerBlockReport, set a really long value for
  // dfs.heartbeat.interval, so that incremental block reports and
  // heartbeats won't be sent during this test unless they're triggered
  // manually.
  conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10800000L);
  conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L);

  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
  cluster.waitActive();

  final DataNode dn = cluster.getDataNodes().get(0);
  DatanodeProtocolClientSideTranslatorPB spy =
      DataNodeTestUtils.spyOnBposToNN(dn, cluster.getNameNode());

  // Remove a data dir from the datanode, keeping only data1.
  File dataDirToKeep = new File(cluster.getDataDirectory(), "data1");
  dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY,
      dataDirToKeep.toString());

  // We should get 1 full report
  Mockito.verify(spy, timeout(60000).times(1)).blockReport(
      any(DatanodeRegistration.class),
      anyString(),
      any(StorageBlockReport[].class),
      any(BlockReportContext.class));
}
private void removeZombieReplicas(BlockReportContext context,
    DatanodeStorageInfo zombie) {
  LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
      "longer exists on the DataNode.",
      Long.toHexString(context.getReportId()), zombie.getStorageID());
  assert(namesystem.hasWriteLock());
  Iterator<BlockInfo> iter = zombie.getBlockIterator();
  int prevBlocks = zombie.numBlocks();
  while (iter.hasNext()) {
    BlockInfo block = iter.next();
    // We assume that a block can be on only one storage in a DataNode.
    // That's why we pass in the DatanodeDescriptor rather than the
    // DatanodeStorageInfo.
    // TODO: remove this assumption in case we want to put a block on
    // more than one storage on a datanode (and because it's a difficult
    // assumption to really enforce)
    removeStoredBlock(block, zombie.getDatanodeDescriptor());
    Block b = getBlockOnStorage(block, zombie);
    if (b != null) {
      invalidateBlocks.remove(zombie.getDatanodeDescriptor(), b);
    }
  }
  assert(zombie.numBlocks() == 0);
  LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
      "which no longer exists on the DataNode.",
      Long.toHexString(context.getReportId()), prevBlocks,
      zombie.getStorageID());
}
@Override
public DatanodeCommand blockReport(DatanodeRegistration registration,
    String poolId, StorageBlockReport[] reports, BlockReportContext context)
    throws IOException {
  BlockReportRequestProto.Builder builder = BlockReportRequestProto
      .newBuilder().setRegistration(PBHelper.convert(registration))
      .setBlockPoolId(poolId);
  boolean useBlocksBuffer = registration.getNamespaceInfo()
      .isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS);
  for (StorageBlockReport r : reports) {
    StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
        .newBuilder().setStorage(PBHelperClient.convert(r.getStorage()));
    BlockListAsLongs blocks = r.getBlocks();
    if (useBlocksBuffer) {
      reportBuilder.setNumberOfBlocks(blocks.getNumberOfBlocks());
      reportBuilder.addAllBlocksBuffers(blocks.getBlocksBuffers());
    } else {
      for (long value : blocks.getBlockListAsLongs()) {
        reportBuilder.addBlocks(value);
      }
    }
    builder.addReports(reportBuilder.build());
  }
  builder.setContext(PBHelper.convert(context));
  BlockReportResponseProto resp;
  try {
    resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build());
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
  return resp.hasCmd() ? PBHelper.convert(resp.getCmd()) : null;
}
public static BlockReportContextProto convert(BlockReportContext context) {
  return BlockReportContextProto.newBuilder().
      setTotalRpcs(context.getTotalRpcs()).
      setCurRpc(context.getCurRpc()).
      setId(context.getReportId()).
      setLeaseId(context.getLeaseId()).
      build();
}
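// Hedged round-trip sketch, not from the original source: constructing a
// single-RPC BlockReportContext with an explicit lease id, as the
// register() and sendBlockReports() snippets elsewhere in this section do,
// and converting it with the PBHelper.convert() shown directly above.
BlockReportContext context =
    new BlockReportContext(1, 0, System.nanoTime(), 0L);
BlockReportContextProto proto = PBHelper.convert(context);
// totalRpcs=1 and curRpc=0 mark this as the only RPC of the report;
// leaseId=0L matches the value passed by the snippets above.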
/**
 * Test that DNA_INVALIDATE commands from the standby are ignored.
 */
@Test
public void testIgnoreDeletionsFromNonActive() throws Exception {
  BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);

  // Ask to invalidate FAKE_BLOCK when block report hits the
  // standby
  Mockito.doReturn(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
      FAKE_BPID, new Block[] { FAKE_BLOCK.getLocalBlock() }))
      .when(mockNN2).blockReport(
          Mockito.<DatanodeRegistration>anyObject(),
          Mockito.eq(FAKE_BPID),
          Mockito.<StorageBlockReport[]>anyObject(),
          Mockito.<BlockReportContext>anyObject());

  bpos.start();
  try {
    waitForInitialization(bpos);
    // Should get block reports from both NNs
    waitForBlockReport(mockNN1);
    waitForBlockReport(mockNN2);
  } finally {
    bpos.stop();
    bpos.join();
  }

  // Should ignore the delete command from the standby
  Mockito.verify(mockFSDataset, Mockito.never())
      .invalidate(Mockito.eq(FAKE_BPID), (Block[]) Mockito.anyObject());
}
@Override
protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
    StorageBlockReport[] reports) throws IOException {
  int i = 0;
  for (StorageBlockReport report : reports) {
    LOG.info("Sending block report for storage " +
        report.getStorage().getStorageID());
    StorageBlockReport[] singletonReport = { report };
    cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
        new BlockReportContext(reports.length, i, System.nanoTime(), 0L));
    i++;
  }
}
@Override
protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
    StorageBlockReport[] reports) throws IOException {
  LOG.info("Sending combined block reports for " + dnR);
  cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
      new BlockReportContext(1, 0, System.nanoTime(), 0L));
}