/**
 * Starts a quorum peer in pseudo-cluster mode.
 *
 * <p>An NIO connection factory is configured with the client address and the
 * connection limit from {@code config}; a {@link QuorumPeer} is then constructed
 * from the configuration, the remaining settings are applied through its setters,
 * and the peer is started.
 *
 * @param config the quorum peer configuration every setting is read from
 * @throws IOException propagated from the connection factory / quorum peer set-up calls
 */
public void startFakeCluster(QuorumPeerConfig config) throws IOException {
    ServerCnxnFactory connectionFactory = new NIOServerCnxnFactory();
    connectionFactory.configure(
            config.getClientPortAddress(),
            config.getMaxClientCnxns());

    QuorumPeer peer = new QuorumPeer(
            config.getServers(),
            config.getDataDir(),
            config.getDataLogDir(),
            config.getElectionAlg(),
            config.getServerId(),
            config.getTickTime(),
            config.getInitLimit(),
            config.getSyncLimit(),
            config.getQuorumListenOnAllIPs(),
            connectionFactory,
            config.getQuorumVerifier());

    // Storage and identity
    peer.setClientAddress(config.getClientPortAddress());
    peer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
    peer.setElectionType(config.getElectionAlg());
    peer.setMyid(config.getServerId());

    // Timing and session limits
    peer.setTickTime(config.getTickTime());
    peer.setMinSessionTimeout(config.getMinSessionTimeout());
    peer.setMaxSessionTimeout(config.getMaxSessionTimeout());
    peer.setInitLimit(config.getInitLimit());
    peer.setSyncLimit(config.getSyncLimit());

    // Quorum wiring
    peer.setQuorumVerifier(config.getQuorumVerifier(), true);
    peer.setCnxnFactory(connectionFactory);
    peer.setZKDatabase(new ZKDatabase(peer.getTxnFactory()));
    peer.setLearnerType(config.getPeerType());
    peer.setSyncEnabled(config.getSyncEnabled());
    peer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

    peer.start();
    LOGGER.info("ZkServerCluster Started! ClientPortAddress={}", config.getClientPortAddress());
}
/**
 * Creates a quorum peer from explicit settings.
 *
 * <p>Delegates to the no-argument constructor, stores the supplied values, and
 * builds the transaction/snapshot log plus the database on top of it. When no
 * quorum verifier is supplied, a {@link QuorumMaj} sized by
 * {@code countParticipants(quorumPeers)} is used instead.
 *
 * @param quorumPeers the servers of the ensemble, keyed by id
 * @param dataDir second argument passed to {@link FileTxnSnapLog}
 * @param dataLogDir first argument passed to {@link FileTxnSnapLog}
 * @param electionType election algorithm identifier
 * @param myid id of this peer
 * @param tickTime tick time value
 * @param initLimit init limit value
 * @param syncLimit sync limit value
 * @param quorumListenOnAllIPs stored as-is on this peer
 * @param cnxnFactory client connection factory
 * @param quorumConfig quorum verifier, or {@code null} to fall back to a majority verifier
 * @throws IOException declared by this constructor; propagated from the calls it makes
 */
public QuorumPeer(Map<Long, QuorumServer> quorumPeers,
                  File dataDir,
                  File dataLogDir,
                  int electionType,
                  long myid,
                  int tickTime,
                  int initLimit,
                  int syncLimit,
                  boolean quorumListenOnAllIPs,
                  ServerCnxnFactory cnxnFactory,
                  QuorumVerifier quorumConfig) throws IOException {
    this();
    this.cnxnFactory = cnxnFactory;
    this.quorumPeers = quorumPeers;
    this.electionType = electionType;
    this.myid = myid;
    this.tickTime = tickTime;
    this.initLimit = initLimit;
    this.syncLimit = syncLimit;
    this.quorumListenOnAllIPs = quorumListenOnAllIPs;
    this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
    this.zkDb = new ZKDatabase(this.logFactory);

    if (quorumConfig != null) {
        this.quorumConfig = quorumConfig;
    } else {
        // No verifier given: default to a majority over the participants.
        this.quorumConfig = new QuorumMaj(countParticipants(quorumPeers));
    }
}
/**
 * Tests purge where the data directory contains old snapshots and data logs
 * followed by the newest snapshots and data logs.
 *
 * <p>Two file pairs are created first (the purge candidates), then four more
 * (the recent ones). After {@code PurgeTxnLog.retainNRecentSnapshots} runs on
 * the recent snapshots, the candidate lists are verified with {@code false}
 * and the recent lists with {@code true}.
 */
@Test
public void testSnapFilesLessThanToRetain() throws Exception {
    int nRecentCount = 4;
    int fileToPurgeCount = 2;
    AtomicInteger offset = new AtomicInteger(0);

    tmpDir = ClientBase.createTmpDir();
    File versionDir = new File(tmpDir.toString(), "version-2");
    Assert.assertTrue("Failed to create version_2 dir:" + versionDir, versionDir.mkdir());

    List<File> purgedSnaps = new ArrayList<File>();
    List<File> purgedLogs = new ArrayList<File>();
    List<File> recentSnaps = new ArrayList<File>();
    List<File> recentLogs = new ArrayList<File>();
    createDataDirFiles(offset, fileToPurgeCount, versionDir, purgedSnaps, purgedLogs);
    createDataDirFiles(offset, nRecentCount, versionDir, recentSnaps, recentLogs);

    FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
    PurgeTxnLog.retainNRecentSnapshots(snapLog, recentSnaps);
    snapLog.close();

    verifyFilesAfterPurge(purgedSnaps, false);
    verifyFilesAfterPurge(purgedLogs, false);
    verifyFilesAfterPurge(recentSnaps, true);
    verifyFilesAfterPurge(recentLogs, true);
}
/**
 * Builds a {@link Leader} whose server assembles its own request-processor
 * chain, using a {@code MockProposalRequestProcessor} as the proposal stage.
 */
@Override
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
    LeaderZooKeeperServer leaderServer = new LeaderZooKeeperServer(logFactory, this, this.getZkDb()) {
        @Override
        protected void setupRequestProcessors() {
            // Overridden to make a place where the mock processor can be injected.
            // Chain tail: to-be-applied -> final.
            RequestProcessor terminal = new FinalRequestProcessor(this);
            RequestProcessor toBeApplied =
                    new Leader.ToBeAppliedRequestProcessor(terminal, getLeader());

            commitProcessor = new CommitProcessor(
                    toBeApplied,
                    Long.toString(getServerId()),
                    false,
                    getZooKeeperServerListener());
            commitProcessor.start();

            // The mocked proposal stage sits in front of the commit processor.
            ProposalRequestProcessor proposals =
                    new MockProposalRequestProcessor(this, commitProcessor);
            proposals.initialize();

            prepRequestProcessor = new PrepRequestProcessor(this, proposals);
            prepRequestProcessor.start();

            firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
        }
    };
    return new Leader(this, leaderServer);
}
/**
 * Builds a {@link Follower} that delays every outgoing ACK packet by 100 ms
 * and records, via {@code newLeaderMessage}, that such a packet was seen.
 *
 * @param logFactory transaction/snapshot log handed to the follower's server
 * @return the follower with the delaying {@code writePacket} override
 * @throws IOException declared by this factory method; propagated from the calls it makes
 */
@Override
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
    return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {
        @Override
        void writePacket(QuorumPacket pp, boolean flush) throws IOException {
            if (pp != null && pp.getType() == Leader.ACK) {
                newLeaderMessage = true;
                try {
                    // Delay the ACK a follower sends in response to a NEWLEADER
                    // message, so that the leader has a chance to send the
                    // reconfig and only then the UPTODATE message.
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt status so that callers further up
                    // the stack can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
            }
            super.writePacket(pp, flush);
        }
    };
}
/**
 * Builds a {@link Follower} whose {@code readPacket} can simulate a socket
 * timeout: after the real read completes, a {@link SocketTimeoutException} is
 * thrown when {@code injectError} is set and the packet is a PROPOSAL.
 */
@Override
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
    FollowerZooKeeperServer followerServer =
            new FollowerZooKeeperServer(logFactory, this, this.getZkDb());

    return new Follower(this, followerServer) {
        @Override
        void readPacket(QuorumPacket pp) throws IOException {
            // In a real scenario the SocketTimeoutException comes from a
            // network problem while reading the packet from the leader; here
            // it is raised depending on whether an error has been injected.
            super.readPacket(pp);

            boolean failThisRead = injectError && pp.getType() == Leader.PROPOSAL;
            if (!failThisRead) {
                return;
            }

            String type = LearnerHandler.packetToString(pp);
            throw new SocketTimeoutException(
                    "Socket timeout while reading the packet for operation " + type);
        }
    };
}
/**
 * Creates three snapshot/log file sets, purges relative to the last snapshot
 * in the created list and verifies both lists with {@code true}.
 *
 * @param testWithPrecedingLogFile forwarded unchanged to {@code createDataDirFiles}
 */
public void internalTestSnapFilesEqualsToRetain(boolean testWithPrecedingLogFile) throws Exception {
    int nRecentCount = 3;
    AtomicInteger offset = new AtomicInteger(0);

    tmpDir = ClientBase.createTmpDir();
    File versionDir = new File(tmpDir.toString(), "version-2");
    Assert.assertTrue("Failed to create version_2 dir:" + versionDir, versionDir.mkdir());

    List<File> snapFiles = new ArrayList<File>();
    List<File> logFiles = new ArrayList<File>();
    createDataDirFiles(offset, nRecentCount, testWithPrecedingLogFile, versionDir, snapFiles, logFiles);

    FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
    File lastSnap = snapFiles.get(snapFiles.size() - 1);
    PurgeTxnLog.purgeOlderSnapshots(snapLog, lastSnap);
    snapLog.close();

    verifyFilesAfterPurge(snapFiles, true);
    verifyFilesAfterPurge(logFiles, true);
}
/** * Creates a ZooKeeperServer instance. It sets everything up, but doesn't * actually start listening for clients until run() is invoked. * * @param dataDir the directory to put the data */ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb) { serverStats = new ServerStats(this); this.txnLogFactory = txnLogFactory; this.zkDb = zkDb; this.tickTime = tickTime; this.minSessionTimeout = minSessionTimeout; this.maxSessionTimeout = maxSessionTimeout; listener = new ZooKeeperServerListenerImpl(this); LOG.info("Created server with tickTime " + tickTime + " minSessionTimeout " + getMinSessionTimeout() + " maxSessionTimeout " + getMaxSessionTimeout() + " datadir " + txnLogFactory.getDataDir() + " snapdir " + txnLogFactory.getSnapDir()); }
/**
 * Test verifies that the No Auth enabled Learner is connecting to a No Auth
 * Leader server. Learner should be able to establish a connection with
 * Leader as auth is not required.
 */
@Test(timeout = 30000)
public void testNoAuthLearnerConnectsToServerWithAuthNotRequired() throws Exception {
    // Learner side: a temp-file name is reserved and the file itself removed
    // before the path is handed to FileTxnSnapLog.
    File learnerRoot = ClientBase.createTmpDir();
    File learnerDir = File.createTempFile("test", ".dir", learnerRoot);
    learnerDir.delete();
    FileTxnSnapLog learnerLog = new FileTxnSnapLog(learnerDir, learnerDir);
    QuorumPeer learnerPeer = createQuorumPeer(
            learnerDir, true, false, false, "QuorumLearner", "QuorumServer", "");
    SimpleLearner learner = new SimpleLearner(learnerLog, learnerPeer);

    // Leader side: same trick, but the directory is created explicitly.
    File leaderRoot = ClientBase.createTmpDir();
    File leaderDir = File.createTempFile("test", ".dir", leaderRoot);
    leaderDir.delete();
    leaderDir.mkdir();
    QuorumPeer leaderPeer = createQuorumPeer(
            leaderDir, true, false, false, "QuorumLearner", "QuorumServer",
            QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE);
    CountDownLatch learnerLatch = new CountDownLatch(1);
    Leader leader = createSimpleLeader(leaderDir, leaderPeer, learnerLatch);
    leaderPeer.leader = leader;

    startLearnerCnxAcceptorThread(leader);
    LOG.info("Start establishing a connection with the Leader");
    String hostname = getLeaderHostname(leaderPeer);
    learner.connectToLeader(leaderPeer.getQuorumAddress(), hostname);

    long waitMillis = leader.self.tickTime * leader.self.initLimit + 1000;
    Assert.assertTrue("Leader should accept no auth learner connection",
            learnerLatch.await(waitMillis, TimeUnit.MILLISECONDS));

    ClientBase.recursiveDelete(learnerRoot);
    ClientBase.recursiveDelete(leaderRoot);
}
/**
 * Creates a follower server, taking tick time and session timeouts from the
 * owning quorum peer, and initialises the queue of pending syncs.
 *
 * @param logFactory transaction/snapshot log passed to the superclass
 * @param self the quorum peer this server belongs to
 * @param treeBuilder passed through to the superclass
 * @param zkDb database passed to the superclass
 * @throws IOException declared by this constructor; propagated from the superclass constructor
 */
FollowerZooKeeperServer(FileTxnSnapLog logFactory,
                        QuorumPeer self,
                        DataTreeBuilder treeBuilder,
                        ZKDatabase zkDb) throws IOException {
    super(
            logFactory,
            self.tickTime,
            self.minSessionTimeout,
            self.maxSessionTimeout,
            treeBuilder,
            zkDb,
            self);
    this.pendingSyncs = new ConcurrentLinkedQueue<Request>();
}
/**
 * Creates a learner server; every argument is forwarded unchanged to the
 * superclass constructor.
 *
 * @throws IOException declared by this constructor; propagated from the superclass constructor
 */
public LearnerZooKeeperServer(FileTxnSnapLog logFactory,
                              int tickTime,
                              int minSessionTimeout,
                              int maxSessionTimeout,
                              DataTreeBuilder treeBuilder,
                              ZKDatabase zkDb,
                              QuorumPeer self) throws IOException {
    super(
            logFactory,
            tickTime,
            minSessionTimeout,
            maxSessionTimeout,
            treeBuilder,
            zkDb,
            self);
}
/**
 * Wires a {@code ConversableObserver} for the given peer: a snapshot log over
 * {@code tmpDir} is installed on the peer, an observer server is built on a
 * fresh database, and that database is then installed on the peer as well.
 */
private ConversableObserver createObserver(File tmpDir, QuorumPeer peer) throws IOException {
    FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
    peer.setTxnFactory(snapLog);

    ZKDatabase database = new ZKDatabase(snapLog);
    ObserverZooKeeperServer observerServer = new ObserverZooKeeperServer(
            snapLog, peer, new ZooKeeperServer.BasicDataTreeBuilder(), database);
    peer.setZKDatabase(database);

    return new ConversableObserver(peer, observerServer);
}
/**
 * Creates a quorum-aware server: all arguments except {@code self} go to the
 * superclass constructor, and {@code self} is kept as the owning quorum peer.
 */
protected QuorumZooKeeperServer(FileTxnSnapLog logFactory,
                                int tickTime,
                                int minSessionTimeout,
                                int maxSessionTimeout,
                                DataTreeBuilder treeBuilder,
                                ZKDatabase zkDb,
                                QuorumPeer self) {
    super(
            logFactory,
            tickTime,
            minSessionTimeout,
            maxSessionTimeout,
            treeBuilder,
            zkDb);
    this.self = self;
}
/**
 * Convenience constructor: uses {@code DEFAULT_TICK_TIME}, passes {@code -1}
 * for both session timeouts and creates a new {@link ZKDatabase} on top of
 * the given transaction log factory.
 *
 * @param txnLogFactory transaction/snapshot log used by this server
 * @param treeBuilder forwarded to the full constructor
 * @throws IOException declared by this constructor
 */
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder)
        throws IOException {
    this(
            txnLogFactory,
            DEFAULT_TICK_TIME,
            -1,
            -1,
            treeBuilder,
            new ZKDatabase(txnLogFactory));
}
/** * Test verifies that the Leader should authenticate the connecting learner * quorumpeer. After the successful authentication it should add this * learner to the learnerHandler list. */ @Test(timeout = 30000) public void testAuthLearnerConnectsToServerWithAuthNotRequired() throws Exception { File testDataLearner = ClientBase.createTmpDir(); File tmpDir = File.createTempFile("test", ".dir", testDataLearner); tmpDir.delete(); FileTxnSnapLog ftsl = new FileTxnSnapLog(tmpDir, tmpDir); QuorumPeer learnerPeer = createQuorumPeer(tmpDir, true, true, true, "QuorumLearner", "QuorumServer", QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE); SimpleLearner sl = new SimpleLearner(ftsl, learnerPeer); File testDataLeader = ClientBase.createTmpDir(); tmpDir = File.createTempFile("test", ".dir", testDataLeader); tmpDir.delete(); tmpDir.mkdir(); Leader leader = null; QuorumPeer peer = createQuorumPeer(tmpDir, true, true, false, "QuorumLearner", "QuorumServer", QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE); CountDownLatch learnerLatch = new CountDownLatch(1); leader = createSimpleLeader(tmpDir, peer, learnerLatch); peer.leader = leader; startLearnerCnxAcceptorThread(leader); LOG.info("Start establishing a connection with the Leader"); String hostname = getLeaderHostname(peer); sl.connectToLeader(peer.getQuorumAddress(), hostname); // wait till leader socket soTimeout period Assert.assertTrue("Leader should accept the auth learner connection", learnerLatch.await(leader.self.tickTime * leader.self.initLimit + 1000, TimeUnit.MILLISECONDS)); Assert.assertEquals("Failed to added the learner", 1, leader.getLearners().size()); ClientBase.recursiveDelete(testDataLearner); ClientBase.recursiveDelete(testDataLeader); }
/**
 * Prepares a {@code LeaderZooKeeperServer} for the given peer.
 *
 * <p>A snapshot log over {@code tmpDir} is installed on the peer, the peer's
 * {@code myQuorumAddr} field is set reflectively to an address on a unique
 * port, and a leader server is created on a fresh database.
 */
private LeaderZooKeeperServer prepareLeader(File tmpDir, QuorumPeer peer)
        throws IOException, NoSuchFieldException, IllegalAccessException {
    FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
    peer.setTxnFactory(snapLog);

    // The field is reached via getDeclaredField + setAccessible, i.e. it is
    // written through reflection rather than through a setter.
    Field quorumAddrField = peer.getClass().getDeclaredField("myQuorumAddr");
    quorumAddrField.setAccessible(true);
    quorumAddrField.set(peer, new InetSocketAddress(PortAssignment.unique()));

    ZKDatabase database = new ZKDatabase(snapLog);
    return new LeaderZooKeeperServer(
            snapLog, peer, new ZooKeeperServer.BasicDataTreeBuilder(), database);
}
/**
 * Appends one transaction whose zxid is {@code makeZxid(3, 3)} to a fresh log,
 * loads it through a {@code QuorumPeer}, and expects the accepted and current
 * epoch to be 3 both on the peer and in the two epoch files under
 * {@code version-2}.
 */
@Test
public void testInitialAcceptedCurrent() throws Exception {
    File workDir = File.createTempFile("test", ".dir", testData);
    workDir.delete();
    workDir.mkdir();
    try {
        FileTxnSnapLog snapLog = new FileTxnSnapLog(workDir, workDir);
        File versionDir = new File(workDir, "version-2");
        versionDir.mkdir();

        // Build and persist a single error transaction.
        long zxid = ZxidUtils.makeZxid(3, 3);
        TxnHeader header = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error);
        ErrorTxn errorTxn = new ErrorTxn(1);
        byte[] payload = Util.marshallTxnEntry(header, errorTxn);
        Request request = new Request(
                null, 1, 1, ZooDefs.OpCode.error, ByteBuffer.wrap(payload), null);
        request.hdr = header;
        request.txn = errorTxn;
        snapLog.append(request);
        snapLog.commit();

        ZKDatabase database = new ZKDatabase(snapLog);
        QuorumPeer peer = new QuorumPeer();
        peer.setZKDatabase(database);
        peer.setTxnFactory(snapLog);
        peer.getLastLoggedZxid();

        Assert.assertEquals(3, peer.getAcceptedEpoch());
        Assert.assertEquals(3, peer.getCurrentEpoch());

        int currentEpochOnDisk = Integer.parseInt(
                readContentsOfFile(new File(versionDir, QuorumPeer.CURRENT_EPOCH_FILENAME)));
        Assert.assertEquals(3, currentEpochOnDisk);

        int acceptedEpochOnDisk = Integer.parseInt(
                readContentsOfFile(new File(versionDir, QuorumPeer.ACCEPTED_EPOCH_FILENAME)));
        Assert.assertEquals(3, acceptedEpochOnDisk);
    } finally {
        recursiveDelete(workDir);
    }
}
/**
 * Appends one transaction whose zxid is {@code makeZxid(3, 3)} to a fresh log,
 * loads it through a peer obtained from {@code QuorumPeer.testingQuorumPeer()},
 * and expects the accepted and current epoch to be 3 both on the peer and in
 * the two epoch files under {@code version-2}.
 */
@Test
public void testInitialAcceptedCurrent() throws Exception {
    File workDir = File.createTempFile("test", ".dir", testData);
    workDir.delete();
    workDir.mkdir();
    try {
        FileTxnSnapLog snapLog = new FileTxnSnapLog(workDir, workDir);
        File versionDir = new File(workDir, "version-2");
        versionDir.mkdir();

        // Build and persist a single error transaction.
        long zxid = ZxidUtils.makeZxid(3, 3);
        TxnHeader header = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error);
        ErrorTxn errorTxn = new ErrorTxn(1);
        byte[] payload = Util.marshallTxnEntry(header, errorTxn);
        Request request = new Request(
                null, 1, 1, ZooDefs.OpCode.error, ByteBuffer.wrap(payload), null);
        request.hdr = header;
        request.txn = errorTxn;
        snapLog.append(request);
        snapLog.commit();

        ZKDatabase database = new ZKDatabase(snapLog);
        QuorumPeer peer = QuorumPeer.testingQuorumPeer();
        peer.setZKDatabase(database);
        peer.setTxnFactory(snapLog);
        peer.getLastLoggedZxid();

        Assert.assertEquals(3, peer.getAcceptedEpoch());
        Assert.assertEquals(3, peer.getCurrentEpoch());

        int currentEpochOnDisk = Integer.parseInt(
                readContentsOfFile(new File(versionDir, QuorumPeer.CURRENT_EPOCH_FILENAME)));
        Assert.assertEquals(3, currentEpochOnDisk);

        int acceptedEpochOnDisk = Integer.parseInt(
                readContentsOfFile(new File(versionDir, QuorumPeer.ACCEPTED_EPOCH_FILENAME)));
        Assert.assertEquals(3, acceptedEpochOnDisk);
    } finally {
        recursiveDelete(workDir);
    }
}
/** * Tests purge where the data directory contains old snapshots and data * logs, newest snapshots and data logs */ @Test public void testSnapFilesLessThanToRetain() throws Exception { int nRecentCount = 4; int fileToPurgeCount = 2; AtomicInteger offset = new AtomicInteger(0); tmpDir = ClientBase.createTmpDir(); File version2 = new File(tmpDir.toString(), "version-2"); Assert.assertTrue("Failed to create version_2 dir:" + version2.toString(), version2.mkdir()); List<File> snapsToPurge = new ArrayList<File>(); List<File> logsToPurge = new ArrayList<File>(); List<File> snaps = new ArrayList<File>(); List<File> logs = new ArrayList<File>(); createDataDirFiles(offset, fileToPurgeCount, false, version2, snapsToPurge, logsToPurge); createDataDirFiles(offset, nRecentCount, false, version2, snaps, logs); logs.add(logsToPurge.remove(0)); // log that precedes first retained snapshot is also retained /** * The newest log file preceding the oldest retained snapshot is not removed as it may * contain transactions newer than the oldest snapshot. */ logsToPurge.remove(0); FileTxnSnapLog txnLog = new FileTxnSnapLog(tmpDir, tmpDir); PurgeTxnLog.purgeOlderSnapshots(txnLog, snaps.get(snaps.size() - 1)); txnLog.close(); verifyFilesAfterPurge(snapsToPurge, false); verifyFilesAfterPurge(logsToPurge, false); verifyFilesAfterPurge(snaps, true); verifyFilesAfterPurge(logs, true); }
/** * Tests finding n recent snapshots from set of snapshots and data logs */ @Test public void testFindNRecentSnapshots() throws Exception { int nRecentSnap = 4; // n recent snap shots int nRecentCount = 30; int offset = 0; tmpDir = ClientBase.createTmpDir(); File version2 = new File(tmpDir.toString(), "version-2"); Assert.assertTrue("Failed to create version_2 dir:" + version2.toString(), version2.mkdir()); List<File> expectedNRecentSnapFiles = new ArrayList<File>(); int counter = offset + (2 * nRecentCount); for (int i = 0; i < nRecentCount; i++) { // simulate log file File logFile = new File(version2 + "/log." + Long.toHexString(--counter)); Assert.assertTrue("Failed to create log File:" + logFile.toString(), logFile.createNewFile()); // simulate snapshot file File snapFile = new File(version2 + "/snapshot." + Long.toHexString(--counter)); Assert.assertTrue("Failed to create snap File:" + snapFile.toString(), snapFile.createNewFile()); // add the n recent snap files for assertion if(i < nRecentSnap){ expectedNRecentSnapFiles.add(snapFile); } } FileTxnSnapLog txnLog = new FileTxnSnapLog(tmpDir, tmpDir); List<File> nRecentSnapFiles = txnLog.findNRecentSnapshots(nRecentSnap); txnLog.close(); Assert.assertEquals("exactly 4 snapshots ", 4, nRecentSnapFiles.size()); expectedNRecentSnapFiles.removeAll(nRecentSnapFiles); Assert.assertEquals("Didn't get the recent snap files", 0, expectedNRecentSnapFiles.size()); }
/**
 * Tests purge where the data directory contains old snapshots and data logs,
 * newest snapshots and data logs, and (newest + n) snapshots and data logs.
 *
 * <p>Three groups of files are created in order: two purge candidates, four
 * recent ones and four more above those. Only the candidate lists are
 * verified with {@code false}; the other four lists are verified with
 * {@code true}.
 */
@Test
public void testSnapFilesGreaterThanToRetain() throws Exception {
    int nRecentCount = 4;
    int fileAboveRecentCount = 4;
    int fileToPurgeCount = 2;
    AtomicInteger offset = new AtomicInteger(0);

    tmpDir = ClientBase.createTmpDir();
    File versionDir = new File(tmpDir.toString(), "version-2");
    Assert.assertTrue("Failed to create version_2 dir:" + versionDir, versionDir.mkdir());

    List<File> purgedSnaps = new ArrayList<File>();
    List<File> purgedLogs = new ArrayList<File>();
    List<File> recentSnaps = new ArrayList<File>();
    List<File> recentLogs = new ArrayList<File>();
    List<File> newerSnaps = new ArrayList<File>();
    List<File> newerLogs = new ArrayList<File>();
    createDataDirFiles(offset, fileToPurgeCount, versionDir, purgedSnaps, purgedLogs);
    createDataDirFiles(offset, nRecentCount, versionDir, recentSnaps, recentLogs);
    createDataDirFiles(offset, fileAboveRecentCount, versionDir, newerSnaps, newerLogs);

    FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
    PurgeTxnLog.retainNRecentSnapshots(snapLog, recentSnaps);
    snapLog.close();

    verifyFilesAfterPurge(purgedSnaps, false);
    verifyFilesAfterPurge(purgedLogs, false);
    verifyFilesAfterPurge(recentSnaps, true);
    verifyFilesAfterPurge(recentLogs, true);
    verifyFilesAfterPurge(newerSnaps, true);
    verifyFilesAfterPurge(newerLogs, true);
}
/** * For ZOOKEEPER-1046. Verify if cversion and pzxid if incremented * after create/delete failure during restore. */ @Test public void testTxnFailure() throws Exception { long count = 1; File tmpDir = ClientBase.createTmpDir(); FileTxnSnapLog logFile = new FileTxnSnapLog(tmpDir, tmpDir); DataTree dt = new DataTree(); dt.createNode("/test", new byte[0], null, 0, -1, 1, 1); for (count = 1; count <= 3; count++) { dt.createNode("/test/" + count, new byte[0], null, 0, -1, count, System.currentTimeMillis()); } DataNode zk = dt.getNode("/test"); // Make create to fail, then verify cversion. LOG.info("Attempting to create " + "/test/" + (count - 1)); doOp(logFile, OpCode.create, "/test/" + (count - 1), dt, zk, -1); LOG.info("Attempting to create " + "/test/" + (count - 1)); doOp(logFile, OpCode.create, "/test/" + (count - 1), dt, zk, zk.stat.getCversion() + 1); LOG.info("Attempting to create " + "/test/" + (count - 1)); doOp(logFile, OpCode.multi, "/test/" + (count - 1), dt, zk, zk.stat.getCversion() + 1); LOG.info("Attempting to create " + "/test/" + (count - 1)); doOp(logFile, OpCode.multi, "/test/" + (count - 1), dt, zk, -1); // Make delete fo fail, then verify cversion. // this doesn't happen anymore, we only set the cversion on create // LOG.info("Attempting to delete " + "/test/" + (count + 1)); // doOp(logFile, OpCode.delete, "/test/" + (count + 1), dt, zk); }
@Test public void testTruncationStreamReset() throws Exception { File tmpdir = ClientBase.createTmpDir(); FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir); ZKDatabase zkdb = new ZKDatabase(snaplog); for (int i = 1; i <= 100; i++) { append(zkdb, i); } zkdb.truncateLog(1); append(zkdb, 200); zkdb.close(); // verify that the truncation and subsequent append were processed // correctly FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2")); TxnIterator iter = txnlog.read(1); TxnHeader hdr = iter.getHeader(); Record txn = iter.getTxn(); Assert.assertEquals(1, hdr.getZxid()); Assert.assertTrue(txn instanceof SetDataTxn); iter.next(); hdr = iter.getHeader(); txn = iter.getTxn(); Assert.assertEquals(200, hdr.getZxid()); Assert.assertTrue(txn instanceof SetDataTxn); iter.close(); ClientBase.recursiveDelete(tmpdir); }
/**
 * Appends transactions 1 to 100, closes the database, deletes every file in
 * the log's data directory and then expects {@code truncateLog(1)} to fail
 * with an {@link IOException} rather than a {@link NullPointerException}.
 */
@Test
public void testTruncationNullLog() throws Exception {
    File workDir = ClientBase.createTmpDir();
    FileTxnSnapLog snapLog = new FileTxnSnapLog(workDir, workDir);
    ZKDatabase database = new ZKDatabase(snapLog);

    for (int zxid = 1; zxid <= 100; zxid++) {
        append(database, zxid);
    }
    database.close();

    File[] logFiles = snapLog.getDataDir().listFiles();
    for (File logFile : logFiles) {
        LOG.debug("Deleting: {}", logFile.getName());
        Assert.assertTrue("Failed to delete log file: " + logFile.getName(), logFile.delete());
    }

    try {
        database.truncateLog(1);
        Assert.fail("Should not get here");
    } catch (IOException e) {
        // This is the expected outcome.
        Assert.assertTrue("Should have received an IOException", true);
    } catch (NullPointerException npe) {
        Assert.fail("This should not throw NPE!");
    }

    ClientBase.recursiveDelete(workDir);
}
/** * Tests that the ZooKeeper server will fail to start if the * snapshot directory is read only. * * This test will fail if it is executed as root user. */ @Test(timeout = 30000) public void testReadOnlySnapshotDir() throws Exception { ClientBase.setupTestEnv(); final int CLIENT_PORT = PortAssignment.unique(); // Start up the ZK server to automatically create the necessary directories // and capture the directory where data is stored MainThread main = new MainThread(CLIENT_PORT, true); File tmpDir = main.tmpDir; main.start(); Assert.assertTrue("waiting for server being up", ClientBase .waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT / 2)); main.shutdown(); // Make the snapshot directory read only File snapDir = new File(main.dataDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION); snapDir.setWritable(false); // Restart ZK and observe a failure main = new MainThread(CLIENT_PORT, false, tmpDir); main.start(); Assert.assertFalse("waiting for server being up", ClientBase .waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT / 2)); main.shutdown(); snapDir.setWritable(true); main.deleteDirs(); }
/**
 * Purges the snapshot and logs keeping the last num snapshots and the
 * corresponding logs. If logs are rolling or a new snapshot is created
 * during this process, these newest N snapshots or any data logs will be
 * excluded from current purging cycle.
 *
 * @param dataDir the dir that has the logs
 * @param snapDir the dir that has the snapshots
 * @param num the number of snapshots to keep; must be at least 3
 * @throws IllegalArgumentException if {@code num} is less than 3
 * @throws IOException declared by this method; propagated from the calls it makes
 */
public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog snapLog = new FileTxnSnapLog(dataDir, snapDir);
    List<File> recentSnaps = snapLog.findNRecentSnapshots(num);
    if (recentSnaps.isEmpty()) {
        // Nothing was found, so there is nothing to purge against.
        return;
    }

    // The last element of the returned list is the purge reference point.
    File lastOfRecent = recentSnaps.get(recentSnaps.size() - 1);
    purgeOlderSnapshots(snapLog, lastOfRecent);
}
/** * test the purge * @throws Exception an exception might be thrown here */ @Test public void testPurge() throws Exception { tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); SyncRequestProcessor.setSnapCount(100); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); Assert.assertTrue("waiting for server being up ", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); try { for (int i = 0; i< 2000; i++) { zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } finally { zk.close(); } f.shutdown(); zks.getTxnLogFactory().close(); Assert.assertTrue("waiting for server to shutdown", ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT)); // now corrupt the snapshot PurgeTxnLog.purge(tmpDir, tmpDir, 3); FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpDir, tmpDir); List<File> listLogs = snaplog.findNRecentSnapshots(4); int numSnaps = 0; for (File ff: listLogs) { if (ff.getName().startsWith("snapshot")) { numSnaps++; } } Assert.assertTrue("exactly 3 snapshots ", (numSnaps == 3)); snaplog.close(); zks.shutdown(); }
/**
 * Creates a quorum-aware server: all arguments except {@code self} go to the
 * superclass constructor, and {@code self} is kept as the owning quorum peer.
 */
protected QuorumZooKeeperServer(FileTxnSnapLog logFactory,
                                int tickTime,
                                int minSessionTimeout,
                                int maxSessionTimeout,
                                ZKDatabase zkDb,
                                QuorumPeer self) {
    super(
            logFactory,
            tickTime,
            minSessionTimeout,
            maxSessionTimeout,
            zkDb);
    this.self = self;
}
/** * Test verifies that the Leader should authenticate the connecting learner * quorumpeer. After the successful authentication it should add this * learner to the learnerHandler list. */ @Test(timeout = 30000) public void testAuthLearnerConnectsToServerWithAuthRequired() throws Exception { File testDataLearner = ClientBase.createTmpDir(); File tmpDir = File.createTempFile("test", ".dir", testDataLearner); tmpDir.delete(); FileTxnSnapLog ftsl = new FileTxnSnapLog(tmpDir, tmpDir); QuorumPeer learnerPeer = createQuorumPeer(tmpDir, true, true, true, "QuorumLearner", "QuorumServer", QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE); SimpleLearner sl = new SimpleLearner(ftsl, learnerPeer); File testDataLeader = ClientBase.createTmpDir(); tmpDir = File.createTempFile("test", ".dir", testDataLeader); tmpDir.delete(); tmpDir.mkdir(); Leader leader = null; QuorumPeer peer = createQuorumPeer(tmpDir, true, true, true, "QuorumLearner", "QuorumServer", QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE); CountDownLatch learnerLatch = new CountDownLatch(1); leader = createSimpleLeader(tmpDir, peer, learnerLatch); peer.leader = leader; startLearnerCnxAcceptorThread(leader); LOG.info("Start establishing a connection with the Leader"); String hostname = getLeaderHostname(peer); sl.connectToLeader(peer.getQuorumAddress(), hostname); // wait till leader socket soTimeout period Assert.assertTrue("Leader should accept the auth learner connection", learnerLatch.await(leader.self.tickTime * leader.self.initLimit + 1000, TimeUnit.MILLISECONDS)); Assert.assertEquals("Failed to added the learner", 1, leader.getLearners().size()); ClientBase.recursiveDelete(testDataLearner); ClientBase.recursiveDelete(testDataLeader); }
/** * Tests that the ZooKeeper server will fail to start if the * transaction log directory is read only. * * This test will fail if it is executed as root user. */ @Test(timeout = 30000) public void testReadOnlyTxnLogDir() throws Exception { ClientBase.setupTestEnv(); final int CLIENT_PORT = PortAssignment.unique(); // Start up the ZK server to automatically create the necessary directories // and capture the directory where data is stored MainThread main = new MainThread(CLIENT_PORT, true); File tmpDir = main.tmpDir; main.start(); Assert.assertTrue("waiting for server being up", ClientBase .waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT / 2)); main.shutdown(); // Make the transaction log directory read only File logDir = new File(main.logDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION); logDir.setWritable(false); // Restart ZK and observe a failure main = new MainThread(CLIENT_PORT, false, tmpDir); main.start(); Assert.assertFalse("waiting for server being up", ClientBase .waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT / 2)); main.shutdown(); logDir.setWritable(true); main.deleteDirs(); }
/**
 * Builds a {@link Follower} that, while {@code stopPing} is set, logs a
 * message and throws a {@link SocketException} for every PING packet instead
 * of processing it. All other packets are processed normally.
 */
@Override
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
    FollowerZooKeeperServer followerServer =
            new FollowerZooKeeperServer(logFactory, this, this.getZkDb());

    return new Follower(this, followerServer) {
        @Override
        protected void processPacket(QuorumPacket qp) throws Exception {
            boolean dropPing = stopPing && qp.getType() == Leader.PING;
            if (!dropPing) {
                super.processPacket(qp);
                return;
            }

            LOG.info("Follower skipped ping");
            throw new SocketException("Socket time out while sending the ping response");
        }
    };
}
/**
 * Prepares a {@code LeaderZooKeeperServer} for the given peer: a snapshot
 * log over {@code tmpDir} is installed on the peer and the server is created
 * on a fresh database backed by that log.
 */
private LeaderZooKeeperServer prepareLeader(File tmpDir, QuorumPeer peer)
        throws IOException, NoSuchFieldException, IllegalAccessException {
    FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
    peer.setTxnFactory(snapLog);

    ZKDatabase database = new ZKDatabase(snapLog);
    return new LeaderZooKeeperServer(snapLog, peer, database);
}
/**
 * Wires a {@code ConversableFollower} for the given peer: a snapshot log over
 * {@code tmpDir} is installed on the peer, a follower server is built on a
 * fresh database, and that database is then installed on the peer as well.
 */
private ConversableFollower createFollower(File tmpDir, QuorumPeer peer) throws IOException {
    FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
    peer.setTxnFactory(snapLog);

    ZKDatabase database = new ZKDatabase(snapLog);
    FollowerZooKeeperServer followerServer =
            new FollowerZooKeeperServer(snapLog, peer, database);
    peer.setZKDatabase(database);

    return new ConversableFollower(peer, followerServer);
}
/** * Tests that the ZooKeeper server will fail to start if the * snapshot directory is read only. * * This test will fail if it is executed as root user. */ @Test(timeout = 30000) public void testReadOnlySnapshotDir() throws Exception { ClientBase.setupTestEnv(); final int CLIENT_PORT = PortAssignment.unique(); // Start up the ZK server to automatically create the necessary directories // and capture the directory where data is stored MainThread main = new MainThread(CLIENT_PORT, true, null); File tmpDir = main.tmpDir; main.start(); Assert.assertTrue("waiting for server being up", ClientBase .waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT / 2)); main.shutdown(); // Make the snapshot directory read only File snapDir = new File(main.dataDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION); snapDir.setWritable(false); // Restart ZK and observe a failure main = new MainThread(CLIENT_PORT, false, tmpDir, null); main.start(); Assert.assertFalse("waiting for server being up", ClientBase .waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT / 2)); main.shutdown(); snapDir.setWritable(true); main.deleteDirs(); }
/** * Tests that the ZooKeeper server will fail to start if the * transaction log directory is read only. * * This test will fail if it is executed as root user. */ @Test(timeout = 30000) public void testReadOnlyTxnLogDir() throws Exception { ClientBase.setupTestEnv(); final int CLIENT_PORT = PortAssignment.unique(); // Start up the ZK server to automatically create the necessary directories // and capture the directory where data is stored MainThread main = new MainThread(CLIENT_PORT, true, null); File tmpDir = main.tmpDir; main.start(); Assert.assertTrue("waiting for server being up", ClientBase .waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT / 2)); main.shutdown(); // Make the transaction log directory read only File logDir = new File(main.logDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION); logDir.setWritable(false); // Restart ZK and observe a failure main = new MainThread(CLIENT_PORT, false, tmpDir, null); main.start(); Assert.assertFalse("waiting for server being up", ClientBase .waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT / 2)); main.shutdown(); logDir.setWritable(true); main.deleteDirs(); }
/**
 * Checks the elapsed-sync-time value reported by {@code ZooKeeperServerBean}:
 * it must be -1 before anything is written, different from -1 after one
 * transaction has been appended and committed, and unchanged when read a
 * second time.
 */
@Test
public void testTxnLogElapsedSyncTime() throws IOException {
    File workDir = ClientBase.createEmptyTestDir();
    FileTxnSnapLog snapLog = new FileTxnSnapLog(
            new File(workDir, "data"),
            new File(workDir, "data_txnlog"));

    ZooKeeperServer server = new ZooKeeperServer();
    server.setTxnLogFactory(snapLog);
    ZooKeeperServerBean bean = new ZooKeeperServerBean(server);

    long elapsed = bean.getTxnLogElapsedSyncTime();
    assertEquals(-1, elapsed);

    TxnHeader header = new TxnHeader(1, 1, 1, 1, ZooDefs.OpCode.setData);
    Record setData = new SetDataTxn("/foo", new byte[0], 1);
    Request request = new Request(0, 0, 0, header, setData, 0);

    try {
        server.getTxnLogFactory().append(request);
        server.getTxnLogFactory().commit();

        elapsed = bean.getTxnLogElapsedSyncTime();
        assertNotEquals(-1, elapsed);
        // A second read without further writes must report the same value.
        assertEquals(elapsed, bean.getTxnLogElapsedSyncTime());
    } finally {
        snapLog.close();
    }
}
/** * test the purge * @throws Exception an exception might be thrown here */ @Test public void testPurge() throws Exception { tmpDir = ClientBase.createTmpDir(); ClientBase.setupTestEnv(); ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); SyncRequestProcessor.setSnapCount(100); final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); f.startup(zks); Assert.assertTrue("waiting for server being up ", ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); ZooKeeper zk = ClientBase.createZKClient(HOSTPORT); try { for (int i = 0; i< 2000; i++) { zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } finally { zk.close(); } f.shutdown(); zks.getTxnLogFactory().close(); Assert.assertTrue("waiting for server to shutdown", ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT)); // now corrupt the snapshot PurgeTxnLog.purge(tmpDir, tmpDir, 3); FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpDir, tmpDir); List<File> listLogs = snaplog.findNRecentSnapshots(4); int numSnaps = 0; for (File ff: listLogs) { if (ff.getName().startsWith("snapshot")) { numSnaps++; } } Assert.assertTrue("exactly 3 snapshots ", (numSnaps == 3)); snaplog.close(); zks.shutdown(); }