private long writeSegmentUntilCrash(MiniJournalCluster cluster, QuorumJournalManager qjm, long txid, int numTxns, Holder<Throwable> thrown) { long firstTxId = txid; long lastAcked = txid - 1; try { EditLogOutputStream stm = qjm.startLogSegment(txid, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); for (int i = 0; i < numTxns; i++) { QJMTestUtil.writeTxns(stm, txid++, 1); lastAcked++; } stm.close(); qjm.finalizeLogSegment(firstTxId, lastAcked); } catch (Throwable t) { thrown.held = t; } return lastAcked; }
/** * Test the case where the NN crashes after starting a new segment * on all nodes, but before writing the first transaction to it. */ @Test public void testCrashAtBeginningOfSegment() throws Exception { writeSegment(cluster, qjm, 1, 3, true); waitForAllPendingCalls(qjm.getLoggerSetForTests()); EditLogOutputStream stm = qjm.startLogSegment(4, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); try { waitForAllPendingCalls(qjm.getLoggerSetForTests()); } finally { stm.abort(); } // Make a new QJM qjm = closeLater(new QuorumJournalManager( conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO)); qjm.recoverUnfinalizedSegments(); checkRecovery(cluster, 1, 3); writeSegment(cluster, qjm, 4, 3, true); }
/** * Set up the loggers into the following state: * - JN0: edits 1-3 in progress * - JN1: edits 1-4 in progress * - JN2: edits 1-5 in progress * * None of the loggers have any associated paxos info. */ private void setupLoggers345() throws Exception { EditLogOutputStream stm = qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); failLoggerAtTxn(spies.get(0), 4); failLoggerAtTxn(spies.get(1), 5); writeTxns(stm, 1, 3); // This should succeed to 2/3 loggers writeTxns(stm, 4, 1); // This should only succeed to 1 logger (index 2). Hence it should // fail try { writeTxns(stm, 5, 1); fail("Did not fail to write when only a minority succeeded"); } catch (QuorumException qe) { GenericTestUtils.assertExceptionContains( "too many exceptions to achieve quorum size 2/3", qe); } }
@Test public void testWriteEditsOneSlow() throws Exception { EditLogOutputStream stm = createLogSegment(); writeOp(stm, 1); stm.setReadyToFlush(); // Make the first two logs respond immediately futureReturns(null).when(spyLoggers.get(0)).sendEdits( anyLong(), eq(1L), eq(1), Mockito.<byte[]>any()); futureReturns(null).when(spyLoggers.get(1)).sendEdits( anyLong(), eq(1L), eq(1), Mockito.<byte[]>any()); // And the third log not respond SettableFuture<Void> slowLog = SettableFuture.create(); Mockito.doReturn(slowLog).when(spyLoggers.get(2)).sendEdits( anyLong(), eq(1L), eq(1), Mockito.<byte[]>any()); stm.flush(); Mockito.verify(spyLoggers.get(0)).setCommittedTxId(1L); }
public static EditLogOutputStream writeSegment(MiniJournalCluster cluster, QuorumJournalManager qjm, long startTxId, int numTxns, boolean finalize) throws IOException { EditLogOutputStream stm = qjm.startLogSegment(startTxId, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); // Should create in-progress assertExistsInQuorum(cluster, NNStorage.getInProgressEditsFileName(startTxId)); writeTxns(stm, startTxId, numTxns); if (finalize) { stm.close(); qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1); return null; } else { return stm; } }
@Test public void testSimpleWrite() throws Exception { NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi); bkjm.format(nsi); EditLogOutputStream out = bkjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); for (long i = 1 ; i <= 100; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); op.setTransactionId(i); out.write(op); } out.close(); bkjm.finalizeLogSegment(1, 100); String zkpath = bkjm.finalizedLedgerZNode(1, 100); assertNotNull(zkc.exists(zkpath, false)); assertNull(zkc.exists(bkjm.inprogressZNode(1), false)); }
@Test public void testNumberOfTransactions() throws Exception { NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi); bkjm.format(nsi); EditLogOutputStream out = bkjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); for (long i = 1 ; i <= 100; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); op.setTransactionId(i); out.write(op); } out.close(); bkjm.finalizeLogSegment(1, 100); long numTrans = bkjm.getNumberOfTransactions(1, true); assertEquals(100, numTrans); }
@Test public void testTwoWriters() throws Exception { long start = 1; NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi); bkjm1.format(nsi); BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi); EditLogOutputStream out1 = bkjm1.startLogSegment(start, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); try { bkjm2.startLogSegment(start, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); fail("Shouldn't have been able to open the second writer"); } catch (IOException ioe) { LOG.info("Caught exception as expected", ioe); }finally{ out1.close(); } }
private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm, int startTxid, int endTxid) throws IOException, KeeperException, InterruptedException { EditLogOutputStream out = bkjm.startLogSegment(startTxid, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); for (long i = startTxid; i <= endTxid; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); op.setTransactionId(i); out.write(op); } out.close(); // finalize the inprogress_1 log segment. bkjm.finalizeLogSegment(startTxid, endTxid); String zkpath1 = bkjm.finalizedLedgerZNode(startTxid, endTxid); assertNotNull(zkc.exists(zkpath1, false)); assertNull(zkc.exists(bkjm.inprogressZNode(startTxid), false)); return zkpath1; }
private long writeSegmentUntilCrash(MiniJournalCluster cluster, QuorumJournalManager qjm, long txid, int numTxns, Holder<Throwable> thrown) { long firstTxId = txid; long lastAcked = txid - 1; try { EditLogOutputStream stm = qjm.startLogSegment(txid); for (int i = 0; i < numTxns; i++) { QJMTestUtil.writeTxns(stm, txid++, 1); lastAcked++; } stm.close(); qjm.finalizeLogSegment(firstTxId, lastAcked); } catch (Throwable t) { thrown.held = t; } return lastAcked; }
/** * Test the case where the NN crashes after starting a new segment * on all nodes, but before writing the first transaction to it. */ @Test public void testCrashAtBeginningOfSegment() throws Exception { writeSegment(cluster, qjm, 1, 3, true); waitForAllPendingCalls(qjm.getLoggerSetForTests()); EditLogOutputStream stm = qjm.startLogSegment(4); try { waitForAllPendingCalls(qjm.getLoggerSetForTests()); } finally { stm.abort(); } // Make a new QJM qjm = new QuorumJournalManager( conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, null, false); qjm.recoverUnfinalizedSegments(); checkRecovery(cluster, 1, 3); writeSegment(cluster, qjm, 4, 3, true); }
/** * Set up the loggers into the following state: * - JN0: edits 1-3 in progress * - JN1: edits 1-4 in progress * - JN2: edits 1-5 in progress * * None of the loggers have any associated paxos info. */ private void setupLoggers345() throws Exception { EditLogOutputStream stm = qjm.startLogSegment(1); failLoggerAtTxn(spies.get(0), 4); failLoggerAtTxn(spies.get(1), 5); writeTxns(stm, 1, 3); // This should succeed to 2/3 loggers writeTxns(stm, 4, 1); // This should only succeed to 1 logger (index 2). Hence it should // fail try { writeTxns(stm, 5, 1); fail("Did not fail to write when only a minority succeeded"); } catch (QuorumException qe) { GenericTestUtils.assertExceptionContains( "too many exceptions to achieve quorum size 2/3", qe); } }
@Test public void testWriteEditsOneSlow() throws Exception { EditLogOutputStream stm = createLogSegment(); writeOp(stm, 1); stm.setReadyToFlush(); // Make the first two logs respond immediately futureReturns(null).when(spyLoggers.get(0)).sendEdits( anyLong(), eq(1L), eq(1), Mockito.<byte[]>any()); futureReturns(null).when(spyLoggers.get(1)).sendEdits( anyLong(), eq(1L), eq(1), Mockito.<byte[]>any()); // And the third log not respond SettableFuture<Void> slowLog = SettableFuture.<Void>create(); Mockito.doReturn(slowLog).when(spyLoggers.get(2)).sendEdits( anyLong(), eq(1L), eq(1), Mockito.<byte[]>any()); stm.flush(); Mockito.verify(spyLoggers.get(0)).setCommittedTxId(1L, false); }