@Test(timeout = 30000) public void testGetReconfigureStatus() throws IOException, InterruptedException { ReconfigurationUtil ru = mock(ReconfigurationUtil.class); datanode.setReconfigurationUtil(ru); List<ReconfigurationUtil.PropertyChange> changes = new ArrayList<ReconfigurationUtil.PropertyChange>(); File newDir = new File(cluster.getDataDirectory(), "data_new"); newDir.mkdirs(); changes.add(new ReconfigurationUtil.PropertyChange( DFS_DATANODE_DATA_DIR_KEY, newDir.toString(), datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY))); changes.add(new ReconfigurationUtil.PropertyChange( "randomKey", "new123", "old456")); when(ru.parseChangedProperties(any(Configuration.class), any(Configuration.class))).thenReturn(changes); final int port = datanode.getIpcPort(); final String address = "localhost:" + port; assertThat(admin.startReconfiguration("datanode", address), is(0)); List<String> outputs = null; int count = 100; while (count > 0) { outputs = getReconfigureStatus("datanode", address); if (!outputs.isEmpty() && outputs.get(0).contains("finished")) { break; } count--; Thread.sleep(100); } assertTrue(count > 0); assertThat(outputs.size(), is(8)); // 3 (SUCCESS) + 4 (FAILED) List<StorageLocation> locations = DataNode.getStorageLocations( datanode.getConf()); assertThat(locations.size(), is(1)); assertThat(locations.get(0).getFile(), is(newDir)); // Verify the directory is appropriately formatted. assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory()); int successOffset = outputs.get(1).startsWith("SUCCESS:") ? 1 : 5; int failedOffset = outputs.get(1).startsWith("FAILED:") ? 1: 4; assertThat(outputs.get(successOffset), containsString("Change property " + DFS_DATANODE_DATA_DIR_KEY)); assertThat(outputs.get(successOffset + 1), is(allOf(containsString("From:"), containsString("data1"), containsString("data2")))); assertThat(outputs.get(successOffset + 2), is(not(anyOf(containsString("data1"), containsString("data2"))))); assertThat(outputs.get(successOffset + 2), is(allOf(containsString("To"), containsString("data_new")))); assertThat(outputs.get(failedOffset), containsString("Change property randomKey")); assertThat(outputs.get(failedOffset + 1), containsString("From: \"old456\"")); assertThat(outputs.get(failedOffset + 2), containsString("To: \"new123\"")); }
/** * Test reconfiguration and check the status outputs. * @param expectedSuccuss set true if the reconfiguration task should success. * @throws IOException * @throws InterruptedException */ private void testGetReconfigurationStatus(boolean expectedSuccuss) throws IOException, InterruptedException { ReconfigurationUtil ru = mock(ReconfigurationUtil.class); datanode.setReconfigurationUtil(ru); List<ReconfigurationUtil.PropertyChange> changes = new ArrayList<>(); File newDir = new File(cluster.getDataDirectory(), "data_new"); if (expectedSuccuss) { newDir.mkdirs(); } else { // Inject failure. newDir.createNewFile(); } changes.add(new ReconfigurationUtil.PropertyChange( DFS_DATANODE_DATA_DIR_KEY, newDir.toString(), datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY))); changes.add(new ReconfigurationUtil.PropertyChange( "randomKey", "new123", "old456")); when(ru.parseChangedProperties(any(Configuration.class), any(Configuration.class))).thenReturn(changes); final int port = datanode.getIpcPort(); final String address = "localhost:" + port; assertThat(admin.startReconfiguration("datanode", address), is(0)); List<String> outputs = null; int count = 100; while (count > 0) { outputs = getReconfigureStatus("datanode", address); if (!outputs.isEmpty() && outputs.get(0).contains("finished")) { break; } count--; Thread.sleep(100); } assertTrue(count > 0); if (expectedSuccuss) { assertThat(outputs.size(), is(4)); } else { assertThat(outputs.size(), is(6)); } List<StorageLocation> locations = DataNode.getStorageLocations( datanode.getConf()); if (expectedSuccuss) { assertThat(locations.size(), is(1)); assertThat(locations.get(0).getFile(), is(newDir)); // Verify the directory is appropriately formatted. assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory()); } else { assertTrue(locations.isEmpty()); } int offset = 1; if (expectedSuccuss) { assertThat(outputs.get(offset), containsString("SUCCESS: Changed property " + DFS_DATANODE_DATA_DIR_KEY)); } else { assertThat(outputs.get(offset), containsString("FAILED: Change property " + DFS_DATANODE_DATA_DIR_KEY)); } assertThat(outputs.get(offset + 1), is(allOf(containsString("From:"), containsString("data1"), containsString("data2")))); assertThat(outputs.get(offset + 2), is(not(anyOf(containsString("data1"), containsString("data2"))))); assertThat(outputs.get(offset + 2), is(allOf(containsString("To"), containsString("data_new")))); }