/**
 * Writes a segment of {@code numTxns} transactions starting at {@code txid},
 * then closes the stream and finalizes the segment. Any failure along the way
 * is captured into {@code thrown} instead of being propagated.
 *
 * @return the highest txid successfully acked before any failure
 */
private long writeSegmentUntilCrash(MiniJournalCluster cluster,
    QuorumJournalManager qjm, long txid, int numTxns, Holder<Throwable> thrown) {
  final long segmentStartTxId = txid;
  long highestAckedTxId = txid - 1;
  try {
    EditLogOutputStream out =
        qjm.startLogSegment(txid, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    int remaining = numTxns;
    while (remaining-- > 0) {
      // Write one transaction at a time so a crash leaves a partial segment.
      QJMTestUtil.writeTxns(out, txid, 1);
      txid++;
      highestAckedTxId++;
    }
    out.close();
    qjm.finalizeLogSegment(segmentStartTxId, highestAckedTxId);
  } catch (Throwable t) {
    // Capture rather than propagate; the caller inspects the holder.
    thrown.held = t;
  }
  return highestAckedTxId;
}
/**
 * Run a simple workload of becoming the active writer and writing
 * two log segments: 1-3 and 4-6.
 *
 * @return the last txid known to have been acked (0 if nothing succeeded)
 */
private static int doWorkload(MiniJournalCluster cluster,
    QuorumJournalManager qjm) throws IOException {
  int acked = 0;
  try {
    qjm.recoverUnfinalizedSegments();
    // First segment: txids 1-3.
    writeSegment(cluster, qjm, 1, 3, true);
    acked = 3;
    // Second segment: txids 4-6.
    writeSegment(cluster, qjm, 4, 3, true);
    acked = 6;
  } catch (QuorumException qe) {
    LOG.info("Failed to write at txid " + acked, qe);
  }
  return acked;
}
/**
 * Builds a mini journal cluster and a spying QJM on top of it, formats the
 * shared journal, and claims epoch 1 via segment recovery. Statement order
 * matters: format must precede recovery.
 */
@Before
public void setup() throws Exception {
  conf = new Configuration();
  // Don't retry connections - it just slows down the tests.
  conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
  cluster = new MiniJournalCluster.Builder(conf)
    .build();
  qjm = createSpyingQJM();
  // Keep direct handles on the per-node logger spies for later stubbing.
  spies = qjm.getLoggerSetForTests().getLoggersForTests();
  qjm.format(QJMTestUtil.FAKE_NSINFO);
  qjm.recoverUnfinalizedSegments();
  // Recovery is the first epoch-claiming operation, so the epoch should be 1.
  assertEquals(1, qjm.getLoggerSetForTests().getEpoch());
}
/**
 * Asserts that a quorum of journal nodes hold a finalized log segment starting
 * at {@code segmentTxId}, and that every finalized copy ends exactly at
 * {@code expectedEndTxId}.
 */
private void checkRecovery(MiniJournalCluster cluster,
    long segmentTxId, long expectedEndTxId) throws IOException {
  int finalizedCount = 0;
  for (int node = 0; node < cluster.getNumNodes(); node++) {
    File logDir = cluster.getCurrentDir(node, JID);
    EditLogFile elf = FileJournalManager.getLogFile(logDir, segmentTxId);
    if (elf == null || elf.isInProgress()) {
      // Missing or still in-progress on this node: doesn't count toward quorum.
      continue;
    }
    finalizedCount++;
    if (elf.getLastTxId() != expectedEndTxId) {
      fail("File " + elf + " finalized to wrong txid, expected " + expectedEndTxId);
    }
  }
  if (finalizedCount < cluster.getQuorumSize()) {
    fail("Did not find a quorum of finalized logs starting at " + segmentTxId);
  }
}
/**
 * Builds a mini journal cluster, waits for it to become active, then creates
 * a spying QJM, formats the journal, and claims epoch 1 via segment recovery.
 * Statement order matters: format must precede recovery.
 */
@Before
public void setup() throws Exception {
  conf = new Configuration();
  // Don't retry connections - it just slows down the tests.
  conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
  cluster = new MiniJournalCluster.Builder(conf)
    .build();
  // Block until all journal nodes are up before talking to them.
  cluster.waitActive();
  qjm = createSpyingQJM();
  // Keep direct handles on the per-node logger spies for later stubbing.
  spies = qjm.getLoggerSetForTests().getLoggersForTests();
  qjm.format(QJMTestUtil.FAKE_NSINFO);
  qjm.recoverUnfinalizedSegments();
  // Recovery is the first epoch-claiming operation, so the epoch should be 1.
  assertEquals(1, qjm.getLoggerSetForTests().getEpoch());
}
private void simulateFailute(InjectionEventI event, Object... args) throws IOException { // get the journal node ServletContext context = (ServletContext) args[0]; JournalNode jn = (JournalNode) context .getAttribute(JournalNodeHttpServer.JN_ATTRIBUTE_KEY); // configuration stores the index of the node Configuration conf = jn.getConf(); // check which node this is int jid = conf.getInt(MiniJournalCluster.DFS_JOURNALNODE_TEST_ID, 0); // fail if we are supposed to fail on this event if (event == failOn[jid]) { exceptionsThrown.incrementAndGet(); throw new IOException("Testing failures"); } }
/**
 * Writes a segment of {@code numTxns} transactions starting at {@code txid},
 * then closes the stream and finalizes the segment. Any failure is captured
 * into {@code thrown} instead of being propagated.
 *
 * @return the highest txid successfully acked before any failure
 */
private long writeSegmentUntilCrash(MiniJournalCluster cluster,
    QuorumJournalManager qjm, long txid, int numTxns, Holder<Throwable> thrown) {
  final long segmentStartTxId = txid;
  long maxAckedTxId = txid - 1;
  try {
    EditLogOutputStream out = qjm.startLogSegment(txid);
    for (int written = 0; written < numTxns; written++) {
      // One transaction per call so a crash leaves a partial segment behind.
      QJMTestUtil.writeTxns(out, txid, 1);
      txid++;
      maxAckedTxId++;
    }
    out.close();
    qjm.finalizeLogSegment(segmentStartTxId, maxAckedTxId);
  } catch (Throwable t) {
    // Capture rather than propagate; the caller inspects the holder.
    thrown.held = t;
  }
  return maxAckedTxId;
}
/**
 * Builds a mini journal cluster with fast-fail IPC settings, creates a spying
 * QJM, formats the journal via the transition API, and claims epoch 1 through
 * segment recovery. Statement order matters: format must precede recovery.
 */
@Before
public void setup() throws Exception {
  conf = new Configuration();
  // Don't retry connections - it just slows down the tests.
  conf.setInt("ipc.client.connect.max.retries", 0);
  // Short connect timeout keeps failure-injection tests fast.
  conf.setLong(JournalConfigKeys.DFS_QJOURNAL_CONNECT_TIMEOUT_KEY, 100);
  cluster = new MiniJournalCluster.Builder(conf)
    .build();
  qjm = createSpyingQJM();
  // Keep direct handles on the per-node logger spies for later stubbing.
  spies = qjm.getLoggerSetForTests().getLoggersForTests();
  qjm.transitionJournal(QJMTestUtil.FAKE_NSINFO, Transition.FORMAT, null);
  qjm.recoverUnfinalizedSegments();
  // Recovery is the first epoch-claiming operation, so the epoch should be 1.
  assertEquals(1, qjm.getLoggerSetForTests().getEpoch());
}
/**
 * Asserts that a quorum of journal nodes hold a finalized log segment starting
 * at {@code segmentTxId}, and that every finalized copy ends exactly at
 * {@code expectedEndTxId}.
 */
private void checkRecovery(MiniJournalCluster cluster,
    long segmentTxId, long expectedEndTxId) throws IOException {
  int finalizedCount = 0;
  for (int node = 0; node < cluster.getNumNodes(); node++) {
    File logDir = cluster.getJournalCurrentDir(node, JID);
    EditLogFile elf = FileJournalManager.getLogFile(logDir, segmentTxId);
    if (elf == null || elf.isInProgress()) {
      // Missing or still in-progress on this node: doesn't count toward quorum.
      continue;
    }
    finalizedCount++;
    if (elf.getLastTxId() != expectedEndTxId) {
      fail("File " + elf + " finalized to wrong txid, expected " + expectedEndTxId);
    }
  }
  if (finalizedCount < cluster.getQuorumSize()) {
    fail("Did not find a quorum of finalized logs starting at " + segmentTxId);
  }
}
/**
 * Builds a mini journal cluster with fast-fail IPC settings, creates a spying
 * QJM via the shared test helper, formats the journal via the transition API,
 * and claims epoch 1 through segment recovery. Statement order matters:
 * format must precede recovery.
 */
@Before
public void setup() throws Exception {
  conf = new Configuration();
  // Don't retry connections - it just slows down the tests.
  conf.setInt("ipc.client.connect.max.retries", 0);
  // Short connect timeout keeps failure-injection tests fast.
  conf.setLong(JournalConfigKeys.DFS_QJOURNAL_CONNECT_TIMEOUT_KEY, 100);
  cluster = new MiniJournalCluster.Builder(conf)
    .build();
  qjm = TestQuorumJournalManager.createSpyingQJM(conf, cluster, JID, FAKE_NSINFO);
  qjm.transitionJournal(QJMTestUtil.FAKE_NSINFO, Transition.FORMAT, null);
  qjm.recoverUnfinalizedSegments();
  // Recovery is the first epoch-claiming operation, so the epoch should be 1.
  assertEquals(1, qjm.getLoggerSetForTests().getEpoch());
}
/**
 * Starts a single stand-alone JournalNode over a fresh edits directory, then
 * creates and formats a uniquely-named journal (and its image storage) on it.
 * Statement order matters: the node must be started before journals are
 * created on it.
 */
@Before
public void setup() throws Exception {
  // Use a clean per-test edits directory under the mini-cluster base dir.
  File editsDir = new File(MiniDFSCluster.getBaseDirectory(null) +
      File.separator + "TestJournalNode");
  FileUtil.fullyDelete(editsDir);
  conf.set(JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY,
      editsDir.getAbsolutePath());
  // Bind RPC to an ephemeral port so parallel tests don't collide.
  conf.set(JournalConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0");
  int port = MiniJournalCluster.getFreeHttpPortAndUpdateConf(conf, true);
  // Remember the HTTP endpoint for tests that hit the servlet interface.
  httpAddress = "http://localhost:" + port;
  jn = new JournalNode();
  jn.setConf(conf);
  jn.start();
  // Unique journal id so repeated runs never share on-disk state.
  journalId = "test-journalid-" + QJMTestUtil.uniqueSequenceId();
  journal = jn.getOrCreateJournal(QuorumJournalManager
      .journalIdStringToBytes(journalId));
  journal.transitionJournal(FAKE_NSINFO, Transition.FORMAT, null);
  journal.transitionImage(FAKE_NSINFO, Transition.FORMAT, null);
}
/**
 * Starts a single stand-alone JournalNode over a fresh edits directory,
 * creates and formats a uniquely-named journal on it, and opens an IPC logger
 * channel to the node's RPC address. Statement order matters: the node must
 * be started before the journal is created and the channel is opened.
 */
@Before
public void setup() throws Exception {
  // Use a clean per-test edits directory under the mini-cluster base dir.
  File editsDir = new File(MiniDFSCluster.getBaseDirectory(null) +
      File.separator + "TestJournalNode");
  FileUtil.fullyDelete(editsDir);
  conf.set(JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY,
      editsDir.getAbsolutePath());
  // Bind RPC to an ephemeral port so parallel tests don't collide.
  conf.set(JournalConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0");
  MiniJournalCluster.getFreeHttpPortAndUpdateConf(conf, true);
  jn = new JournalNode();
  jn.setConf(conf);
  jn.start();
  // Unique journal id so repeated runs never share on-disk state.
  journalId = "test-journalid-" + QJMTestUtil.uniqueSequenceId();
  journal = jn.getOrCreateJournal(QuorumJournalManager
      .journalIdStringToBytes(journalId));
  journal.transitionJournal(FAKE_NSINFO, Transition.FORMAT, null);
  // Channel talks to the actual bound address (ephemeral port resolved above).
  ch = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId,
      jn.getBoundIpcAddress());
}
@Test public void testFailToStartWithBadConfig() throws Exception { Configuration conf = new Configuration(); conf.set(JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY, "non-absolute-path"); MiniJournalCluster.getFreeHttpPortAndUpdateConf(conf, true); assertJNFailsToStart(conf, "should be an absolute path"); // Existing file which is not a directory conf.set(JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY, "/dev/null"); assertJNFailsToStart(conf, "is not a directory"); // Directory which cannot be created conf.set(org.apache.hadoop.hdfs.qjournal.protocol.JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY, "/proc/does-not-exist"); assertJNFailsToStart(conf, "Could not create"); }
/**
 * Common test setup: installs the failure-injection handler, then builds a
 * one-datanode avatar cluster with QJM enabled, reusing the supplied journal
 * cluster when one is provided.
 */
public void setUp(Configuration confg, MiniJournalCluster jCluster, String name)
    throws Exception {
  LOG.info("START TEST : " + name);

  handler = new TestAvatarQJMFailuresHandler();
  InjectionHandler.set(handler);
  FSEditLog.setRuntimeForTesting(Runtime.getRuntime());
  conf = confg;

  // Configure the builder once; attach the journal cluster only when given.
  MiniAvatarCluster.Builder builder =
      new MiniAvatarCluster.Builder(conf).numDataNodes(1).enableQJM(true);
  if (jCluster != null) {
    builder = builder.setJournalCluster(jCluster);
  }
  cluster = builder.build();

  fs = cluster.getFileSystem();
  journalCluster = cluster.getJournalCluster();
}
/**
 * This test simulates the scenario where the upgrade fails after saving image
 * and ensures that the recovery on the journal nodes work correctly.
 */
@Test
public void testUpgradeFailureAfterSaveImage() throws Exception {
  // Injection flag: aborts the upgrade right after the image is saved.
  h.failAfterSaveImage = true;
  // Snapshot checksums of the pre-upgrade state for verification below.
  long[] checksums = getChecksums();
  // Upgrade the cluster.
  MiniJournalCluster journalCluster = cluster.getJournalCluster();
  // This upgrade will fail after saving the image.
  try {
    cluster = new MiniAvatarCluster.Builder(conf).numDataNodes(1)
        .format(false).startOpt(StartupOption.UPGRADE)
        .setJournalCluster(journalCluster).instantionRetries(1).build();
    fail("Upgrade did not throw exception");
  } catch (IOException ie) {
    // ignore. The injected post-save-image failure is expected to surface
    // as an IOException here.
  }
  // This will correctly recover the upgrade directories.
  cluster = new MiniAvatarCluster.Builder(conf).numDataNodes(1).format(false)
      .setJournalCluster(cluster.getJournalCluster()).build();
  verifyUpgrade(checksums, true);
}