/** * Tests that when a quorum of followers send LearnerInfo but do not ack the epoch (which is sent * by the leader upon receipt of LearnerInfo from a quorum), the leader does not start using this epoch * as it would in the normal case (when a quorum do ack the epoch). This tests ZK-1192 * @throws Exception */ @Test public void testAbandonBeforeACKEpoch() throws Exception { testLeaderConversation(new LeaderConversation() { public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException { /* we test a normal run. everything should work out well. */ LearnerInfo li = new LearnerInfo(1, 0x10000); byte liBytes[] = new byte[12]; ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes)); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.LEADERINFO, qp.getType()); Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); Thread.sleep(l.self.getInitLimit()*l.self.getTickTime() + 5000); // The leader didn't get a quorum of acks - make sure that leader's current epoch is not advanced Assert.assertEquals(0, l.self.getCurrentEpoch()); } }); }
/**
 * Builds a setData request for the fixed zxid {@code makeZxid(3, 7)} and passes it to
 * {@code syncProcessor}. If the transaction cannot be marshalled, the error is logged
 * and the test is failed.
 */
private void addRequestToSyncProcessor() {
    final long txnZxid = ZxidUtils.makeZxid(3, 7);
    final TxnHeader header = new TxnHeader(1, 1, txnZxid, 1, ZooDefs.OpCode.setData);
    final Record payload = new SetDataTxn("/foo" + txnZxid, new byte[0], 1);

    byte[] marshalled;
    try {
        marshalled = Util.marshallTxnEntry(header, payload);
    } catch (IOException ioe) {
        LOG.error("IOException while adding request to SyncRequestProcessor", ioe);
        Assert.fail("IOException while adding request to SyncRequestProcessor!");
        // Assert.fail is expected to throw; this return only lets the compiler
        // prove that 'marshalled' is assigned on every path that continues.
        return;
    }

    // The request is attached to a mock Netty connection backed by a fresh factory.
    final NettyServerCnxnFactory cnxnFactory = new NettyServerCnxnFactory();
    final MockNettyServerCnxn mockCnxn = new MockNettyServerCnxn(null, this, cnxnFactory);
    final ByteBuffer wrapped = ByteBuffer.wrap(marshalled);

    final Request request = new Request(mockCnxn, 1, 1, ZooDefs.OpCode.setData, wrapped, null);
    request.hdr = header;
    request.txn = payload;

    syncProcessor.processRequest(request);
}
/** * Tests that when a quorum of followers send LearnerInfo but do not ack the epoch (which is sent * by the leader upon receipt of LearnerInfo from a quorum), the leader does not start using this epoch * as it would in the normal case (when a quorum do ack the epoch). This tests ZK-1192 * @throws Exception */ @Test public void testAbandonBeforeACKEpoch() throws Exception { testLeaderConversation(new LeaderConversation() { public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException { /* we test a normal run. everything should work out well. */ LearnerInfo li = new LearnerInfo(1, 0x10000, 0); byte liBytes[] = new byte[20]; ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes)); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); Assert.assertEquals(Leader.LEADERINFO, qp.getType()); Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); Thread.sleep(l.self.getInitLimit()*l.self.getTickTime() + 5000); // The leader didn't get a quorum of acks - make sure that leader's current epoch is not advanced Assert.assertEquals(0, l.self.getCurrentEpoch()); } }); }
/**
 * Checks that {@code termPredicate} returns true for the proposed vote when the recorded
 * votes are all in FOLLOWING or LEADING state, although their zxids and third numeric
 * argument differ between entries.
 * NOTE(review): the Vote argument roles (version, id, zxid, electionEpoch, peerEpoch, state)
 * are assumed from the constructor shape - confirm against Vote.
 */
@Test
public void testDontCare() {
    final MockFLE election = new MockFLE(peer, new QuorumCnxManager(peer));

    final int version = 0x1;
    final long id = 4L;
    final long latestZxid = ZxidUtils.makeZxid(2, 1);

    final HashMap<Long, Vote> received = new HashMap<Long, Vote>();
    received.put(0L, new Vote(version, id, ZxidUtils.makeZxid(1, 1), 1, 2, ServerState.FOLLOWING));
    received.put(1L, new Vote(version, id, ZxidUtils.makeZxid(1, 2), 1, 2, ServerState.FOLLOWING));
    received.put(3L, new Vote(version, id, latestZxid, 2, 2, ServerState.FOLLOWING));
    received.put(4L, new Vote(version, id, latestZxid, 2, 2, ServerState.LEADING));

    final Vote proposal = new Vote(id, latestZxid, 2, 2, ServerState.FOLLOWING);
    final boolean accepted = election.termPredicate(received, proposal);
    Assert.assertTrue(accepted);
}
/**
 * Checks that {@code termPredicate} returns true when the recorded votes mix the
 * six-argument Vote constructor (with an explicit leading 0x1) and the five-argument one.
 * NOTE(review): the leading 0x1 is presumably a vote version - confirm against Vote.
 */
@Test
public void testDontCareVersion() {
    final MockFLE election = new MockFLE(peer, new QuorumCnxManager(peer));

    final long id = 4L;
    final long earlierZxid = ZxidUtils.makeZxid(1, 1);
    final long latestZxid = ZxidUtils.makeZxid(2, 1);

    final HashMap<Long, Vote> received = new HashMap<Long, Vote>();
    // Two entries built with the explicit leading 0x1 ...
    received.put(0L, new Vote(0x1, id, earlierZxid, 1, 1, ServerState.FOLLOWING));
    received.put(1L, new Vote(0x1, id, earlierZxid, 1, 1, ServerState.FOLLOWING));
    // ... and two built without it.
    received.put(3L, new Vote(id, latestZxid, 2, 2, ServerState.FOLLOWING));
    received.put(4L, new Vote(id, latestZxid, 2, 2, ServerState.LEADING));

    final Vote proposal = new Vote(id, latestZxid, 2, 2, ServerState.FOLLOWING);
    final boolean accepted = election.termPredicate(received, proposal);
    Assert.assertTrue(accepted);
}
/**
 * Checks that {@code termPredicate} returns true when three LOOKING entries and one
 * LEADING entry all carry the same id, zxid and numeric arguments as the proposed vote.
 */
@Test
public void testLookingNormal() {
    final MockFLE election = new MockFLE(peer, new QuorumCnxManager(peer));

    final long id = 4L;
    final long zxid = ZxidUtils.makeZxid(2, 1);

    final HashMap<Long, Vote> received = new HashMap<Long, Vote>();
    // Keys 0, 1 and 3 hold identical LOOKING votes (a fresh Vote object per key).
    for (long key : new long[] {0L, 1L, 3L}) {
        received.put(key, new Vote(id, zxid, 1, 1, ServerState.LOOKING));
    }
    received.put(4L, new Vote(id, zxid, 1, 1, ServerState.LEADING));

    final Vote proposal = new Vote(id, zxid, 1, 1, ServerState.LOOKING);
    final boolean accepted = election.termPredicate(received, proposal);
    Assert.assertTrue(accepted);
}
/**
 * Checks that {@code termPredicate} returns false when the recorded votes disagree in
 * their third constructor argument (1, 2, 3 and 3) while the proposed vote uses 2.
 * NOTE(review): that argument is presumably the election round - confirm against Vote.
 */
@Test
public void testLookingDiffRounds() {
    final MockFLE election = new MockFLE(peer, new QuorumCnxManager(peer));

    final long id = 4L;
    final long latestZxid = ZxidUtils.makeZxid(2, 1);

    final HashMap<Long, Vote> received = new HashMap<Long, Vote>();
    received.put(0L, new Vote(id, ZxidUtils.makeZxid(1, 1), 1, 1, ServerState.LOOKING));
    received.put(1L, new Vote(id, latestZxid, 2, 2, ServerState.LOOKING));
    received.put(3L, new Vote(id, latestZxid, 3, 2, ServerState.LOOKING));
    received.put(4L, new Vote(id, latestZxid, 3, 2, ServerState.LEADING));

    final Vote proposal = new Vote(id, latestZxid, 2, 2, ServerState.LOOKING);
    final boolean accepted = election.termPredicate(received, proposal);
    Assert.assertFalse(accepted);
}
/**
 * Runs a FOLLOWERINFO / LEADERINFO / ACKEPOCH exchange against a populated leader and
 * checks that the leader then answers with a DIFF packet. It also checks that the
 * leader's accepted epoch moves from 1 to 2 while its current epoch stays at 1.
 *
 * @throws Exception if the conversation with the leader fails
 */
@Test
public void testUnnecessarySnap() throws Exception {
    testPopulatedLeaderConversation(new PopulatedLeaderConversation() {
        @Override
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long zxid)
                throws Exception {
            // Starting point: both epochs are 1.
            Assert.assertEquals(1, l.self.getAcceptedEpoch());
            Assert.assertEquals(1, l.self.getCurrentEpoch());

            // Send FOLLOWERINFO carrying a serialized LearnerInfo.
            LearnerInfo li = new LearnerInfo(1, 0x10000);
            byte[] liBytes = new byte[12];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1, liBytes, null);
            oa.writeRecord(qp, null);

            // Expect LEADERINFO for zxid (2, 0) whose payload starts with 0x10000.
            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.LEADERINFO, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid());
            Assert.assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());

            // Accepted epoch has advanced; current epoch has not.
            Assert.assertEquals(2, l.self.getAcceptedEpoch());
            Assert.assertEquals(1, l.self.getCurrentEpoch());

            // Acknowledge with ACKEPOCH whose payload is the int 1 and whose zxid is the
            // one handed to this conversation.
            byte[] epochBytes = new byte[4];
            final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
            wrappedEpochBytes.putInt(1);
            qp = new QuorumPacket(Leader.ACKEPOCH, zxid, epochBytes, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.DIFF, qp.getType());
        }
    }, 2);
}
/**
 * Talks to the leader as a follower that claims to have last acked epoch 20 and checks
 * the full handshake: LEADERINFO for epoch 21, then DIFF, NEWLEADER for zxid (21, 0)
 * and, after our ACK, UPTODATE.
 *
 * @throws Exception if the conversation with the leader fails
 */
@Test
public void testLeaderBehind() throws Exception {
    testLeaderConversation(new LeaderConversation() {
        @Override
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                throws IOException {
            // Send FOLLOWERINFO carrying a serialized LearnerInfo.
            LearnerInfo li = new LearnerInfo(1, 0x10000);
            byte[] liBytes = new byte[12];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
            /* we are going to say we last acked epoch 20 */
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, ZxidUtils.makeZxid(20, 0), liBytes, null);
            oa.writeRecord(qp, null);

            // Expect LEADERINFO for zxid (21, 0) whose payload starts with 0x10000.
            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.LEADERINFO, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());
            Assert.assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());

            // Acknowledge the epoch with an all-zero four-byte payload.
            qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.DIFF, qp.getType());

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.NEWLEADER, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());

            // ACK the NEWLEADER using the zxid it carried.
            qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.UPTODATE, qp.getType());
        }
    });
}
/**
 * Writes a single transaction with zxid {@code makeZxid(3, 3)} into a fresh transaction
 * log, attaches that log to a new QuorumPeer, and checks that the accepted and current
 * epochs are both 3, in memory and in the epoch files under {@code version-2}.
 *
 * @throws Exception if the temporary directory or the log cannot be set up
 */
@Test
public void testInitialAcceptedCurrent() throws Exception {
    // createTempFile only reserves a unique name; swap the file for a directory.
    // Fail right here if either step does not succeed instead of erroring later.
    File tmpDir = File.createTempFile("test", ".dir", testData);
    Assert.assertTrue("could not delete temp file " + tmpDir, tmpDir.delete());
    Assert.assertTrue("could not create temp dir " + tmpDir, tmpDir.mkdir());
    try {
        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
        File version2 = new File(tmpDir, "version-2");
        // Result deliberately not asserted: the directory may already exist.
        // NOTE(review): confirm whether FileTxnSnapLog creates it.
        version2.mkdir();

        // Log one error transaction with zxid (3, 3).
        long zxid = ZxidUtils.makeZxid(3, 3);
        TxnHeader hdr = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error);
        ErrorTxn txn = new ErrorTxn(1);
        byte[] buf = Util.marshallTxnEntry(hdr, txn);
        Request req = new Request(null, 1, 1, ZooDefs.OpCode.error, ByteBuffer.wrap(buf), null);
        req.hdr = hdr;
        req.txn = txn;
        logFactory.append(req);
        logFactory.commit();

        ZKDatabase zkDb = new ZKDatabase(logFactory);
        QuorumPeer peer = new QuorumPeer();
        peer.setZKDatabase(zkDb);
        peer.setTxnFactory(logFactory);
        // Return value unused; presumably called to make the peer load its
        // state and initialise the epochs - confirm.
        peer.getLastLoggedZxid();

        Assert.assertEquals(3, peer.getAcceptedEpoch());
        Assert.assertEquals(3, peer.getCurrentEpoch());
        Assert.assertEquals(3, Integer
                .parseInt(readContentsOfFile(new File(version2, QuorumPeer.CURRENT_EPOCH_FILENAME))));
        Assert.assertEquals(3, Integer
                .parseInt(readContentsOfFile(new File(version2, QuorumPeer.ACCEPTED_EPOCH_FILENAME))));
    } finally {
        recursiveDelete(tmpDir);
    }
}
/**
 * Runs a FOLLOWERINFO / LEADERINFO / ACKEPOCH exchange against a populated leader and
 * checks that the leader then answers with a DIFF packet. It also checks that the
 * leader's accepted epoch moves from 1 to 2 while its current epoch stays at 1.
 *
 * @throws Exception if the conversation with the leader fails
 */
@Test
public void testUnnecessarySnap() throws Exception {
    testPopulatedLeaderConversation(new PopulatedLeaderConversation() {
        @Override
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long zxid)
                throws Exception {
            // Starting point: both epochs are 1.
            Assert.assertEquals(1, l.self.getAcceptedEpoch());
            Assert.assertEquals(1, l.self.getCurrentEpoch());

            // Send FOLLOWERINFO carrying a serialized three-field LearnerInfo (20 bytes).
            LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
            byte[] liBytes = new byte[20];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1, liBytes, null);
            oa.writeRecord(qp, null);

            // Expect LEADERINFO for zxid (2, 0) whose payload starts with 0x10000.
            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.LEADERINFO, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid());
            Assert.assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());

            // Accepted epoch has advanced; current epoch has not.
            Assert.assertEquals(2, l.self.getAcceptedEpoch());
            Assert.assertEquals(1, l.self.getCurrentEpoch());

            // Acknowledge with ACKEPOCH whose payload is the int 1 and whose zxid is the
            // one handed to this conversation.
            byte[] epochBytes = new byte[4];
            final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
            wrappedEpochBytes.putInt(1);
            qp = new QuorumPacket(Leader.ACKEPOCH, zxid, epochBytes, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.DIFF, qp.getType());
        }
    }, 2);
}
/**
 * Talks to the leader as a follower that claims to have last acked epoch 20 and checks
 * the full handshake: LEADERINFO for epoch 21, then DIFF, NEWLEADER for zxid (21, 0)
 * and, after our ACK, UPTODATE.
 *
 * @throws Exception if the conversation with the leader fails
 */
@Test
public void testLeaderBehind() throws Exception {
    testLeaderConversation(new LeaderConversation() {
        @Override
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                throws IOException {
            // Send FOLLOWERINFO carrying a serialized three-field LearnerInfo (20 bytes).
            LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
            byte[] liBytes = new byte[20];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
            /* we are going to say we last acked epoch 20 */
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, ZxidUtils.makeZxid(20, 0), liBytes, null);
            oa.writeRecord(qp, null);

            // Expect LEADERINFO for zxid (21, 0) whose payload starts with 0x10000.
            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.LEADERINFO, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());
            Assert.assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());

            // Acknowledge the epoch with an all-zero four-byte payload.
            qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.DIFF, qp.getType());

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.NEWLEADER, qp.getType());
            Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());

            // ACK the NEWLEADER using the zxid it carried.
            qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            Assert.assertEquals(Leader.UPTODATE, qp.getType());
        }
    });
}
/**
 * Saves an empty data tree, appends a single transaction with zxid {@code makeZxid(3, 3)}
 * to a fresh transaction log, attaches that log to a new QuorumPeer, and checks that the
 * accepted and current epochs are both 3, in memory and in the epoch files under
 * {@code version-2}.
 *
 * @throws Exception if the temporary directory or the log cannot be set up
 */
@Test
public void testInitialAcceptedCurrent() throws Exception {
    // createTempFile only reserves a unique name; swap the file for a directory.
    // Fail right here if either step does not succeed instead of erroring later.
    File tmpDir = File.createTempFile("test", ".dir", testData);
    Assert.assertTrue("could not delete temp file " + tmpDir, tmpDir.delete());
    Assert.assertTrue("could not create temp dir " + tmpDir, tmpDir.mkdir());
    try {
        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
        File version2 = new File(tmpDir, "version-2");
        // Result deliberately not asserted: the directory may already exist.
        // NOTE(review): confirm whether FileTxnSnapLog creates it.
        version2.mkdir();

        // Persist an empty tree with no sessions before logging the transaction.
        logFactory.save(new DataTree(), new ConcurrentHashMap<Long, Integer>(), false);

        // Log one error transaction with zxid (3, 3).
        long zxid = ZxidUtils.makeZxid(3, 3);
        logFactory.append(new Request(1, 1, ZooDefs.OpCode.error,
                new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error), new ErrorTxn(1), zxid));
        logFactory.commit();

        ZKDatabase zkDb = new ZKDatabase(logFactory);
        QuorumPeer peer = new QuorumPeer();
        peer.setZKDatabase(zkDb);
        peer.setTxnFactory(logFactory);
        // Return value unused; presumably called to make the peer load its
        // state and initialise the epochs - confirm.
        peer.getLastLoggedZxid();

        Assert.assertEquals(3, peer.getAcceptedEpoch());
        Assert.assertEquals(3, peer.getCurrentEpoch());
        Assert.assertEquals(3, Integer
                .parseInt(readContentsOfFile(new File(version2, QuorumPeer.CURRENT_EPOCH_FILENAME))));
        Assert.assertEquals(3, Integer
                .parseInt(readContentsOfFile(new File(version2, QuorumPeer.ACCEPTED_EPOCH_FILENAME))));
    } finally {
        TestUtils.deleteFileRecursively(tmpDir);
    }
}
/**
 * Checks that {@code termPredicate} returns true for the proposed vote when the recorded
 * votes are all in FOLLOWING or LEADING state, although their zxids and third numeric
 * argument differ between entries. The connection manager comes from the peer itself.
 * NOTE(review): the Vote argument roles (version, id, zxid, electionEpoch, peerEpoch, state)
 * are assumed from the constructor shape - confirm against Vote.
 */
@Test
public void testDontCare() {
    final MockFLE election = new MockFLE(peer, peer.createCnxnManager());

    final int version = 0x1;
    final long id = 4L;
    final long latestZxid = ZxidUtils.makeZxid(2, 1);

    final HashMap<Long, Vote> received = new HashMap<Long, Vote>();
    received.put(0L, new Vote(version, id, ZxidUtils.makeZxid(1, 1), 1, 2, ServerState.FOLLOWING));
    received.put(1L, new Vote(version, id, ZxidUtils.makeZxid(1, 2), 1, 2, ServerState.FOLLOWING));
    received.put(3L, new Vote(version, id, latestZxid, 2, 2, ServerState.FOLLOWING));
    received.put(4L, new Vote(version, id, latestZxid, 2, 2, ServerState.LEADING));

    final Vote proposal = new Vote(id, latestZxid, 2, 2, ServerState.FOLLOWING);
    final boolean accepted = election.termPredicate(received, proposal);
    Assert.assertTrue(accepted);
}
/**
 * Checks that {@code termPredicate} returns true when the recorded votes mix the
 * six-argument Vote constructor (with an explicit leading 0x1) and the five-argument one.
 * The connection manager comes from the peer itself.
 * NOTE(review): the leading 0x1 is presumably a vote version - confirm against Vote.
 */
@Test
public void testDontCareVersion() {
    final MockFLE election = new MockFLE(peer, peer.createCnxnManager());

    final long id = 4L;
    final long earlierZxid = ZxidUtils.makeZxid(1, 1);
    final long latestZxid = ZxidUtils.makeZxid(2, 1);

    final HashMap<Long, Vote> received = new HashMap<Long, Vote>();
    // Two entries built with the explicit leading 0x1 ...
    received.put(0L, new Vote(0x1, id, earlierZxid, 1, 1, ServerState.FOLLOWING));
    received.put(1L, new Vote(0x1, id, earlierZxid, 1, 1, ServerState.FOLLOWING));
    // ... and two built without it.
    received.put(3L, new Vote(id, latestZxid, 2, 2, ServerState.FOLLOWING));
    received.put(4L, new Vote(id, latestZxid, 2, 2, ServerState.LEADING));

    final Vote proposal = new Vote(id, latestZxid, 2, 2, ServerState.FOLLOWING);
    final boolean accepted = election.termPredicate(received, proposal);
    Assert.assertTrue(accepted);
}
/**
 * Checks that {@code termPredicate} returns true when three LOOKING entries and one
 * LEADING entry all carry the same id, zxid and numeric arguments as the proposed vote.
 * The connection manager comes from the peer itself.
 */
@Test
public void testLookingNormal() {
    final MockFLE election = new MockFLE(peer, peer.createCnxnManager());

    final long id = 4L;
    final long zxid = ZxidUtils.makeZxid(2, 1);

    final HashMap<Long, Vote> received = new HashMap<Long, Vote>();
    // Keys 0, 1 and 3 hold identical LOOKING votes (a fresh Vote object per key).
    for (long key : new long[] {0L, 1L, 3L}) {
        received.put(key, new Vote(id, zxid, 1, 1, ServerState.LOOKING));
    }
    received.put(4L, new Vote(id, zxid, 1, 1, ServerState.LEADING));

    final Vote proposal = new Vote(id, zxid, 1, 1, ServerState.LOOKING);
    final boolean accepted = election.termPredicate(received, proposal);
    Assert.assertTrue(accepted);
}
/**
 * Checks that {@code termPredicate} returns false when the recorded votes disagree in
 * their third constructor argument (1, 2, 3 and 3) while the proposed vote uses 2.
 * The connection manager comes from the peer itself.
 * NOTE(review): that argument is presumably the election round - confirm against Vote.
 */
@Test
public void testLookingDiffRounds() {
    final MockFLE election = new MockFLE(peer, peer.createCnxnManager());

    final long id = 4L;
    final long latestZxid = ZxidUtils.makeZxid(2, 1);

    final HashMap<Long, Vote> received = new HashMap<Long, Vote>();
    received.put(0L, new Vote(id, ZxidUtils.makeZxid(1, 1), 1, 1, ServerState.LOOKING));
    received.put(1L, new Vote(id, latestZxid, 2, 2, ServerState.LOOKING));
    received.put(3L, new Vote(id, latestZxid, 3, 2, ServerState.LOOKING));
    received.put(4L, new Vote(id, latestZxid, 3, 2, ServerState.LEADING));

    final Vote proposal = new Vote(id, latestZxid, 2, 2, ServerState.LOOKING);
    final boolean accepted = election.termPredicate(received, proposal);
    Assert.assertFalse(accepted);
}
/**
 * Writes a single transaction with zxid {@code makeZxid(3, 3)} into a fresh transaction
 * log, attaches that log to a peer obtained from {@code QuorumPeer.testingQuorumPeer()},
 * and checks that the accepted and current epochs are both 3, in memory and in the epoch
 * files under {@code version-2}.
 *
 * @throws Exception if the temporary directory or the log cannot be set up
 */
@Test
public void testInitialAcceptedCurrent() throws Exception {
    // createTempFile only reserves a unique name; swap the file for a directory.
    // Fail right here if either step does not succeed instead of erroring later.
    File tmpDir = File.createTempFile("test", ".dir", testData);
    Assert.assertTrue("could not delete temp file " + tmpDir, tmpDir.delete());
    Assert.assertTrue("could not create temp dir " + tmpDir, tmpDir.mkdir());
    try {
        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
        File version2 = new File(tmpDir, "version-2");
        // Result deliberately not asserted: the directory may already exist.
        // NOTE(review): confirm whether FileTxnSnapLog creates it.
        version2.mkdir();

        // Log one error transaction with zxid (3, 3).
        long zxid = ZxidUtils.makeZxid(3, 3);
        TxnHeader hdr = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error);
        ErrorTxn txn = new ErrorTxn(1);
        byte[] buf = Util.marshallTxnEntry(hdr, txn);
        Request req = new Request(null, 1, 1, ZooDefs.OpCode.error, ByteBuffer.wrap(buf), null);
        req.hdr = hdr;
        req.txn = txn;
        logFactory.append(req);
        logFactory.commit();

        ZKDatabase zkDb = new ZKDatabase(logFactory);
        QuorumPeer peer = QuorumPeer.testingQuorumPeer();
        peer.setZKDatabase(zkDb);
        peer.setTxnFactory(logFactory);
        // Return value unused; presumably called to make the peer load its
        // state and initialise the epochs - confirm.
        peer.getLastLoggedZxid();

        Assert.assertEquals(3, peer.getAcceptedEpoch());
        Assert.assertEquals(3, peer.getCurrentEpoch());
        Assert.assertEquals(3, Integer
                .parseInt(readContentsOfFile(new File(version2, QuorumPeer.CURRENT_EPOCH_FILENAME))));
        Assert.assertEquals(3, Integer
                .parseInt(readContentsOfFile(new File(version2, QuorumPeer.ACCEPTED_EPOCH_FILENAME))));
    } finally {
        recursiveDelete(tmpDir);
    }
}