@Override
public void reconfigurePropertyImpl(String property, String newVal)
    throws ReconfigurationException {
  if (property.equals(DFS_DATANODE_DATA_DIR_KEY)) {
    try {
      LOG.info("Reconfiguring " + property + " to " + newVal);
      this.refreshVolumes(newVal);
    } catch (IOException e) {
      throw new ReconfigurationException(property, newVal,
          getConf().get(property), e);
    }
  } else {
    throw new ReconfigurationException(
        property, newVal, getConf().get(property));
  }
}
/**
 * Reconfigure a DataNode by setting a new list of volumes.
 *
 * @param dn DataNode to reconfigure
 * @param newVols new volumes to configure
 * @throws Exception if there is any failure
 */
private static void reconfigureDataNode(DataNode dn, File... newVols)
    throws Exception {
  StringBuilder dnNewDataDirs = new StringBuilder();
  for (File newVol : newVols) {
    if (dnNewDataDirs.length() > 0) {
      dnNewDataDirs.append(',');
    }
    dnNewDataDirs.append(newVol.getAbsolutePath());
  }
  try {
    dn.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
        dnNewDataDirs.toString());
  } catch (ReconfigurationException e) {
    // This can be thrown if reconfiguration tries to use a failed volume.
    // We need to swallow the exception, because some of our tests want to
    // cover this case.
    LOG.warn("Could not reconfigure DataNode.", e);
  }
}
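// A minimal usage sketch for the helper above (hypothetical test body; the
// "cluster" MiniDFSCluster and "baseDir" File are assumptions, not from the
// source). The helper joins the absolute paths with commas, so this call is
// equivalent to reconfiguring dfs.datanode.data.dir to
// "<baseDir>/data_new1,<baseDir>/data_new2" on the running DataNode in one
// shot.
DataNode dn = cluster.getDataNodes().get(0);
reconfigureDataNode(dn,
    new File(baseDir, "data_new1"),
    new File(baseDir, "data_new2"));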
@Test(timeout=60000)
public void testReplicatingAfterRemoveVolume()
    throws InterruptedException, TimeoutException, IOException,
    ReconfigurationException {
  startDFSCluster(1, 2);

  final FileSystem fs = cluster.getFileSystem();
  final short replFactor = 2;
  Path testFile = new Path("/test");
  createFile(testFile, 4, replFactor);

  DataNode dn = cluster.getDataNodes().get(0);
  Collection<String> oldDirs = getDataDirs(dn);
  String newDirs = oldDirs.iterator().next();  // Keep the first volume.
  dn.reconfigurePropertyImpl(
      DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
  assertFileLocksReleased(
      new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));

  triggerDeleteReport(dn);

  waitReplication(fs, testFile, 1, 1);
  DFSTestUtil.waitReplication(fs, testFile, replFactor);
}
private void testAcquireWithMaxConcurrentMoversShared(
    int maxConcurrentMovers)
    throws IOException, ReconfigurationException {
  DataNode[] dns = null;
  try {
    dns = createDNsForTest(1);
    testAcquireOnMaxConcurrentMoversReconfiguration(dns[0],
        maxConcurrentMovers);
  } finally {
    // The original caught IOException and ReconfigurationException only to
    // rethrow them unchanged; a plain try/finally is equivalent.
    shutDownDNs(dns);
  }
}
/**
 * Test that changing the number of volumes does not impact the disk failure
 * tolerance.
 */
@Test
public void testTolerateVolumeFailuresAfterAddingMoreVolumes()
    throws InterruptedException, ReconfigurationException, IOException {
  final File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
  final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2));
  final File dn0VolNew = new File(dataDir, "data_new");
  final DataNode dn0 = cluster.getDataNodes().get(0);
  final String oldDataDirs = dn0.getConf().get(
      DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);

  // Add a new volume to DN0.
  dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
      oldDataDirs + "," + dn0VolNew.getAbsolutePath());

  // Fail dn0Vol1 first and hot swap it.
  DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
  checkDiskErrorSync(dn0);
  assertTrue(dn0.shouldRun());

  // Fail dn0Vol2. Now dn0 should stop, because we only tolerate one disk
  // failure.
  DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
  checkDiskErrorSync(dn0);
  assertFalse(dn0.shouldRun());
}
@Override
protected void reconfigurePropertyImpl(String property, String newVal)
    throws ReconfigurationException {
  try {
    if (property.equals(MAX_TRACKER_BLACKLISTS_PROPERTY)) {
      // The original log message printed initialJobRefreshTimeoutMs here;
      // the old value of the field being changed is the intended one.
      LOG.info("changing maxTrackerBlacklists to " + newVal +
          " from " + maxTrackerBlacklists);
      maxTrackerBlacklists = Integer.parseInt(newVal);
    } else if (property.equals(MAX_UNIQUE_COUNTER_NAMES)) {
      LOG.info("changing maxUniqueCounterNames to " + newVal +
          " from " + maxUniqueCounterNames);
      maxUniqueCounterNames = Integer.parseInt(newVal);
    } else if (property.equals(INITIAL_JOB_REFRESH_TIMEOUT_MS_PROPERTY)) {
      LOG.info("changing initialJobRefreshTimeoutMs to " + newVal +
          " from " + initialJobRefreshTimeoutMs);
      initialJobRefreshTimeoutMs = Long.parseLong(newVal);
    }
  } catch (NumberFormatException e) {
    LOG.warn("reconfigurePropertyImpl: Invalid property " + property +
        " or newVal " + newVal);
  }
}
@Override
public void reconfigurePropertyImpl(String property, String newVal)
    throws ReconfigurationException {
  String expectation = "";
  switch (instance) {
  case NODEZERO:
    expectation = DFS_NAMENODE_RPC_ADDRESS1_KEY;
    break;
  case NODEONE:
    expectation = DFS_NAMENODE_RPC_ADDRESS0_KEY;
    break;
  }

  if (property.equals(expectation)) {
    getConf().set(property, newVal);
    return;
  }

  super.reconfigurePropertyImpl(property, newVal);
}
@Override
public final String reconfigureProperty(String property, String newVal)
    throws ReconfigurationException {
  if (isPropertyReconfigurable(property)) {
    String oldVal;
    synchronized (getConf()) {
      oldVal = getConf().get(property);
      reconfigurePropertyImpl(property, newVal);
      if (newVal != null) {
        getConf().set(property, newVal);
      } else {
        getConf().unset(property);
      }
    }
    return oldVal;
  } else {
    throw new ReconfigurationException(property, newVal,
        getConf().get(property));
  }
}
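// A minimal sketch of how a subclass plugs into the template method above.
// The ThrottleServer class and the "example.max.threads" property are
// hypothetical, and this assumes the ReconfigurableBase API shown here
// (void reconfigurePropertyImpl, reconfigureProperty() committing the value
// to the Configuration afterwards) plus java.util.* and
// org.apache.hadoop.conf.* imports. Throwing from the override aborts the
// update, so getConf() keeps the old value.
public class ThrottleServer extends ReconfigurableBase {
  private volatile int maxThreads;

  public ThrottleServer(Configuration conf) {
    super(conf);
  }

  @Override
  public Collection<String> getReconfigurableProperties() {
    // Only properties listed here pass isPropertyReconfigurable().
    return Collections.singletonList("example.max.threads");
  }

  @Override
  public void reconfigurePropertyImpl(String property, String newVal)
      throws ReconfigurationException {
    if (property.equals("example.max.threads")) {
      try {
        maxThreads = Integer.parseInt(newVal);
      } catch (NumberFormatException e) {
        // Rejecting the value here prevents reconfigureProperty() from
        // committing it to the Configuration.
        throw new ReconfigurationException(property, newVal,
            getConf().get(property), e);
      }
    }
  }
}
// Example: server.reconfigureProperty("example.max.threads", "16") would
// return the previous value and update both the field and the Configuration.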
/**
 * {@inheritDoc}
 */
@Override
public void reconfigurePropertyImpl(String property, String newVal)
    throws ReconfigurationException {
  if (property.equals("dfs.data.dir")) {
    try {
      LOG.info("Reconfigure " + property + " to " + newVal);
      this.refreshVolumes(newVal);
    } catch (Exception e) {
      throw new ReconfigurationException(property, newVal,
          getConf().get(property), e);
    }
  } else {
    throw new ReconfigurationException(property, newVal,
        getConf().get(property));
  }
}
@Override
public void reconfigurePropertyImpl(String property, String newVal)
    throws ReconfigurationException {
  if (property.equals(DFS_DATANODE_DATA_DIR_KEY)) {
    try {
      LOG.info("Reconfiguring " + property + " to " + newVal);
      this.refreshVolumes(newVal);
    } catch (Exception e) {
      throw new ReconfigurationException(property, newVal,
          getConf().get(property), e);
    }
  } else {
    throw new ReconfigurationException(
        property, newVal, getConf().get(property));
  }
}
/**
 * Test adding one volume on a running MiniDFSCluster with only one NameNode.
 */
@Test(timeout=60000)
public void testAddOneNewVolume()
    throws IOException, ReconfigurationException,
    InterruptedException, TimeoutException {
  startDFSCluster(1, 1);
  String bpid = cluster.getNamesystem().getBlockPoolId();
  final int numBlocks = 10;

  addVolumes(1);

  Path testFile = new Path("/test");
  createFile(testFile, numBlocks);

  List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
      cluster.getAllBlockReports(bpid);
  assertEquals(1, blockReports.size());  // 1 DataNode
  assertEquals(3, blockReports.get(0).size());  // 3 volumes

  // FSVolumeList uses a round-robin block chooser by default. Thus the new
  // blocks should be evenly located across all volumes.
  int minNumBlocks = Integer.MAX_VALUE;
  int maxNumBlocks = Integer.MIN_VALUE;
  for (BlockListAsLongs blockList : blockReports.get(0).values()) {
    minNumBlocks = Math.min(minNumBlocks, blockList.getNumberOfBlocks());
    maxNumBlocks = Math.max(maxNumBlocks, blockList.getNumberOfBlocks());
  }
  assertTrue(Math.abs(maxNumBlocks - minNumBlocks) <= 1);
  verifyFileLength(cluster.getFileSystem(), testFile, numBlocks);
}
@Test(timeout=60000)
public void testAddVolumesDuringWrite()
    throws IOException, InterruptedException, TimeoutException,
    ReconfigurationException {
  startDFSCluster(1, 1);
  String bpid = cluster.getNamesystem().getBlockPoolId();
  Path testFile = new Path("/test");
  createFile(testFile, 4);  // Each volume has 2 blocks.

  addVolumes(2);

  // Continue to write the same file, thus the new volumes will have blocks.
  DFSTestUtil.appendFile(cluster.getFileSystem(), testFile, BLOCK_SIZE * 8);
  verifyFileLength(cluster.getFileSystem(), testFile, 8 + 4);

  // After appending data, there should be [2, 2, 4, 4] blocks in each volume
  // respectively.
  List<Integer> expectedNumBlocks = Arrays.asList(2, 2, 4, 4);

  List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
      cluster.getAllBlockReports(bpid);
  assertEquals(1, blockReports.size());  // 1 DataNode
  assertEquals(4, blockReports.get(0).size());  // 4 volumes
  Map<DatanodeStorage, BlockListAsLongs> dnReport = blockReports.get(0);
  List<Integer> actualNumBlocks = new ArrayList<Integer>();
  for (BlockListAsLongs blockList : dnReport.values()) {
    actualNumBlocks.add(blockList.getNumberOfBlocks());
  }
  Collections.sort(actualNumBlocks);
  assertEquals(expectedNumBlocks, actualNumBlocks);
}
@Test(timeout=60000)
public void testAddVolumesToFederationNN()
    throws IOException, TimeoutException, InterruptedException,
    ReconfigurationException {
  // Start a cluster with 2 NameNodes and 1 DataNode. The DataNode has 2
  // volumes. (The original comment said 3 DataNodes, contradicting
  // numDataNodes below.)
  final int numNameNodes = 2;
  final int numDataNodes = 1;
  startDFSCluster(numNameNodes, numDataNodes);
  Path testFile = new Path("/test");
  // Create a file on the first namespace with 4 blocks.
  createFile(0, testFile, 4);
  // Create a file on the second namespace with 4 blocks.
  createFile(1, testFile, 4);

  // Add 2 volumes to the first DataNode.
  final int numNewVolumes = 2;
  addVolumes(numNewVolumes);

  // Append to the file on the first namespace.
  DFSTestUtil.appendFile(cluster.getFileSystem(0), testFile, BLOCK_SIZE * 8);

  List<List<Integer>> actualNumBlocks = getNumBlocksReport(0);
  assertEquals(cluster.getDataNodes().size(), actualNumBlocks.size());
  List<Integer> blocksOnFirstDN = actualNumBlocks.get(0);
  Collections.sort(blocksOnFirstDN);
  assertEquals(Arrays.asList(2, 2, 4, 4), blocksOnFirstDN);

  // Verify that the second namespace also sees the new volumes and that
  // they are empty.
  actualNumBlocks = getNumBlocksReport(1);
  assertEquals(4, actualNumBlocks.get(0).size());
  assertEquals(numNewVolumes,
      Collections.frequency(actualNumBlocks.get(0), 0));
}
@Test(timeout=60000)
public void testRemoveOneVolume()
    throws ReconfigurationException, InterruptedException, TimeoutException,
    IOException {
  startDFSCluster(1, 1);
  final short replFactor = 1;
  Path testFile = new Path("/test");
  createFile(testFile, 10, replFactor);

  DataNode dn = cluster.getDataNodes().get(0);
  Collection<String> oldDirs = getDataDirs(dn);
  String newDirs = oldDirs.iterator().next();  // Keep the first volume.
  dn.reconfigurePropertyImpl(
      DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
  assertFileLocksReleased(
      new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
  dn.scheduleAllBlockReport(0);

  try {
    DFSTestUtil.readFile(cluster.getFileSystem(), testFile);
    fail("Expect to throw BlockMissingException.");
  } catch (BlockMissingException e) {
    GenericTestUtils.assertExceptionContains("Could not obtain block", e);
  }

  Path newFile = new Path("/newFile");
  createFile(newFile, 6);

  String bpid = cluster.getNamesystem().getBlockPoolId();
  List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
      cluster.getAllBlockReports(bpid);
  assertEquals((int) replFactor, blockReports.size());

  BlockListAsLongs blocksForVolume1 =
      blockReports.get(0).values().iterator().next();
  // The remaining volume has half of testFile's blocks and all of newFile's.
  assertEquals(10 / 2 + 6, blocksForVolume1.getNumberOfBlocks());
}
@Test(timeout=180000)
public void testRemoveVolumeBeingWritten()
    throws InterruptedException, TimeoutException, ReconfigurationException,
    IOException, BrokenBarrierException {
  // Test removing volumes on each different DataNode in the pipeline.
  for (int i = 0; i < 3; i++) {
    testRemoveVolumeBeingWrittenForDatanode(i);
  }
}
@Test(timeout=60000)
public void testAddBackRemovedVolume()
    throws IOException, TimeoutException, InterruptedException,
    ReconfigurationException {
  startDFSCluster(1, 2);
  // Create some data on every volume.
  createFile(new Path("/test"), 32);

  DataNode dn = cluster.getDataNodes().get(0);
  Configuration conf = dn.getConf();
  String oldDataDir = conf.get(DFS_DATANODE_DATA_DIR_KEY);
  String keepDataDir = oldDataDir.split(",")[0];
  String removeDataDir = oldDataDir.split(",")[1];

  dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, keepDataDir);
  for (int i = 0; i < cluster.getNumNameNodes(); i++) {
    String bpid = cluster.getNamesystem(i).getBlockPoolId();
    BlockPoolSliceStorage bpsStorage = dn.getStorage().getBPStorage(bpid);
    // Make sure that there is no block pool level storage under
    // removeDataDir.
    for (int j = 0; j < bpsStorage.getNumStorageDirs(); j++) {
      Storage.StorageDirectory sd = bpsStorage.getStorageDir(j);
      assertFalse(sd.getRoot().getAbsolutePath().startsWith(
          new File(removeDataDir).getAbsolutePath()));
    }
    assertEquals(1, dn.getStorage().getBPStorage(bpid).getNumStorageDirs());
  }

  // Bring the removed directory back. This only succeeds if all metadata
  // about this directory was removed in the previous step.
  dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir);
}
/**
 * Verify that {@link DataNode#checkDiskErrors()} removes all metadata in the
 * DataNode upon a volume failure. Thus we can run reconfig with the same
 * configuration to reload a new volume on the same directory as the failed
 * one.
 */
@Test(timeout=60000)
public void testDirectlyReloadAfterCheckDiskError()
    throws IOException, TimeoutException, InterruptedException,
    ReconfigurationException {
  startDFSCluster(1, 2);
  createFile(new Path("/test"), 32, (short) 2);

  DataNode dn = cluster.getDataNodes().get(0);
  final String oldDataDir = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY);
  File dirToFail = new File(cluster.getDataDirectory(), "data1");

  FsVolumeImpl failedVolume = getVolume(dn, dirToFail);
  assertTrue("No FsVolume was found for " + dirToFail,
      failedVolume != null);
  long used = failedVolume.getDfsUsed();

  DataNodeTestUtils.injectDataDirFailure(dirToFail);
  // Trigger the check and wait for the DataNode to detect the disk failure.
  long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
  dn.checkDiskErrorAsync();
  while (dn.getLastDiskErrorCheck() == lastDiskErrorCheck) {
    Thread.sleep(100);
  }

  createFile(new Path("/test1"), 32, (short) 2);
  assertEquals(used, failedVolume.getDfsUsed());

  DataNodeTestUtils.restoreDataDirFromFailure(dirToFail);
  dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir);

  createFile(new Path("/test2"), 32, (short) 2);
  FsVolumeImpl restoredVolume = getVolume(dn, dirToFail);
  assertTrue(restoredVolume != null);
  assertTrue(restoredVolume != failedVolume);
  // More data has been written to this volume.
  assertTrue(restoredVolume.getDfsUsed() > used);
}
/** Test that a full block report is sent after hot swapping volumes. */
@Test(timeout=100000)
public void testFullBlockReportAfterRemovingVolumes()
    throws IOException, ReconfigurationException {

  Configuration conf = new Configuration();
  conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);

  // Similar to TestTriggerBlockReport, set a really long value for
  // dfs.heartbeat.interval, so that incremental block reports and heartbeats
  // won't be sent during this test unless they're triggered manually.
  conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10800000L);
  conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L);

  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
  cluster.waitActive();

  final DataNode dn = cluster.getDataNodes().get(0);
  DatanodeProtocolClientSideTranslatorPB spy =
      DataNodeTestUtils.spyOnBposToNN(dn, cluster.getNameNode());

  // Remove a data dir from the DataNode.
  File dataDirToKeep = new File(cluster.getDataDirectory(), "data1");
  dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY,
      dataDirToKeep.toString());

  // We should get 1 full report.
  Mockito.verify(spy, timeout(60000).times(1)).blockReport(
      any(DatanodeRegistration.class),
      anyString(),
      any(StorageBlockReport[].class),
      any(BlockReportContext.class));
}
private void testAcquireOnMaxConcurrentMoversReconfiguration(
    DataNode dataNode, int maxConcurrentMovers)
    throws IOException, ReconfigurationException {
  int defaultMaxThreads = dataNode.getConf().getInt(
      DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
      DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
  for (int i = 0; i < defaultMaxThreads; i++) {
    assertEquals("should be able to get thread quota", true,
        dataNode.xserver.balanceThrottler.acquire());
  }

  assertEquals("should not be able to get thread quota", false,
      dataNode.xserver.balanceThrottler.acquire());

  // Change properties.
  dataNode.reconfigureProperty(
      DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
      String.valueOf(maxConcurrentMovers));

  assertEquals("thread quota is wrong", maxConcurrentMovers,
      dataNode.xserver.balanceThrottler.getMaxConcurrentMovers());

  // Verify the thread quota against the new limit.
  int val = Math.abs(maxConcurrentMovers - defaultMaxThreads);
  if (defaultMaxThreads < maxConcurrentMovers) {
    for (int i = 0; i < val; i++) {
      assertEquals("should be able to get thread quota", true,
          dataNode.xserver.balanceThrottler.acquire());
    }
  } else if (defaultMaxThreads > maxConcurrentMovers) {
    for (int i = 0; i < val; i++) {
      assertEquals("should not be able to get thread quota", false,
          dataNode.xserver.balanceThrottler.acquire());
    }
  }

  assertEquals("should not be able to get thread quota", false,
      dataNode.xserver.balanceThrottler.acquire());
}
/**
 * Test that the DN does not shut down as long as failed volumes are being
 * hot swapped out.
 */
@Test
public void testVolumeFailureRecoveredByHotSwappingVolume()
    throws InterruptedException, ReconfigurationException, IOException {
  final File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
  final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2));
  final DataNode dn0 = cluster.getDataNodes().get(0);
  final String oldDataDirs = dn0.getConf().get(
      DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);

  // Fail dn0Vol1 first.
  DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
  checkDiskErrorSync(dn0);

  // Hot swap out the failed volume.
  String dataDirs = dn0Vol2.getPath();
  dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
      dataDirs);

  // Fix the failed volume dn0Vol1 and remount it.
  DataNodeTestUtils.restoreDataDirFromFailure(dn0Vol1);
  dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
      oldDataDirs);

  // Fail dn0Vol2. Now that dn0Vol1 has been fixed, DN0 has sufficient
  // resources, so it should keep running.
  DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
  checkDiskErrorSync(dn0);
  assertTrue(dn0.shouldRun());
}