/**
 * Creates a quorum peer from explicit settings.
 *
 * @param quorumPeers map of server id to server description for the ensemble
 * @param dataDir snapshot directory handed to {@link FileTxnSnapLog}
 * @param dataLogDir transaction log directory handed to {@link FileTxnSnapLog}
 * @param electionType election algorithm selector
 * @param myid this server's id
 * @param tickTime tick length
 * @param initLimit init limit
 * @param syncLimit sync limit
 * @param quorumListenOnAllIPs stored as-is in the corresponding field
 * @param cnxnFactory client connection factory
 * @param quorumConfig quorum verifier; when {@code null} a {@link QuorumMaj}
 *        over the participants of {@code quorumPeers} is used instead
 * @throws IOException if the transaction/snapshot log cannot be set up
 */
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
        File dataLogDir, int electionType, long myid, int tickTime,
        int initLimit, int syncLimit, boolean quorumListenOnAllIPs,
        ServerCnxnFactory cnxnFactory, QuorumVerifier quorumConfig)
        throws IOException {
    this();
    this.cnxnFactory = cnxnFactory;
    this.quorumPeers = quorumPeers;
    this.electionType = electionType;
    this.myid = myid;
    this.tickTime = tickTime;
    this.initLimit = initLimit;
    this.syncLimit = syncLimit;
    this.quorumListenOnAllIPs = quorumListenOnAllIPs;
    this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
    this.zkDb = new ZKDatabase(this.logFactory);
    // Default to a majority verifier when the caller supplies none.
    this.quorumConfig = (quorumConfig != null)
            ? quorumConfig
            : new QuorumMaj(countParticipants(quorumPeers));
}
/**
 * Send notifications to all peers upon a change in our vote.
 *
 * For every voter of the current and next configuration, a LOOKING-state
 * notification carrying the current proposal (leader, zxid, logical clock,
 * epoch) and the string form of the current quorum configuration is placed
 * on the send queue.
 */
private void sendNotifications() {
    for (long recipientSid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier currentConfig = self.getQuorumVerifier();
        ToSend notification = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                recipientSid,
                proposedEpoch,
                currentConfig.toString().getBytes());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending Notification: "
                    + proposedLeader + " (n.leader), 0x"
                    + Long.toHexString(proposedZxid) + " (n.zxid), 0x"
                    + Long.toHexString(logicalclock.get()) + " (n.round), "
                    + recipientSid + " (recipient), "
                    + self.getId() + " (myid), 0x"
                    + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        sendqueue.offer(notification);
    }
}
/**
 * Creates a quorum peer from explicit settings.
 *
 * @param quorumPeers map of server id to server description for the ensemble
 * @param dataDir snapshot directory handed to {@link FileTxnSnapLog}
 * @param dataLogDir transaction log directory handed to {@link FileTxnSnapLog}
 * @param electionType election algorithm selector
 * @param myid this server's id
 * @param tickTime tick length
 * @param initLimit init limit
 * @param syncLimit sync limit
 * @param cnxnFactory client connection factory
 * @param quorumConfig quorum verifier; when {@code null} a {@link QuorumMaj}
 *        over the participants of {@code quorumPeers} is used instead
 * @throws IOException if the transaction/snapshot log cannot be set up
 */
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
        File dataLogDir, int electionType, long myid, int tickTime,
        int initLimit, int syncLimit, ServerCnxnFactory cnxnFactory,
        QuorumVerifier quorumConfig) throws IOException {
    this();
    this.cnxnFactory = cnxnFactory;
    this.quorumPeers = quorumPeers;
    this.electionType = electionType;
    this.myid = myid;
    this.tickTime = tickTime;
    this.initLimit = initLimit;
    this.syncLimit = syncLimit;
    this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
    this.zkDb = new ZKDatabase(this.logFactory);
    // Default to a majority verifier when the caller supplies none.
    this.quorumConfig = (quorumConfig != null)
            ? quorumConfig
            : new QuorumMaj(countParticipants(quorumPeers));
}
/**
 * Registers {@code sid} as a connecting server and returns the epoch the
 * leader will propose, blocking until the registered servers form a quorum.
 *
 * <p>The proposed epoch is kept strictly greater than every
 * {@code lastAcceptedEpoch} reported so far.
 *
 * <p>NOTE(review): later revisions also require the leader's own id to be in
 * {@code connectingFollowers} before declaring a quorum. That depends on the
 * leader calling this method for itself, so it is not added here.
 *
 * @param sid id of the server reporting its accepted epoch
 * @param lastAcceptedEpoch the last epoch that server accepted
 * @return the epoch to propose
 * @throws InterruptedException if interrupted, or if no quorum connects
 *         within initLimit * tickTime milliseconds
 */
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException {
    synchronized (connectingFollowers) {
        if (!waitingForNewEpoch) {
            return epoch;
        }
        // Must be strictly greater than any accepted epoch. With '>' an
        // epoch equal to one already accepted would be re-proposed.
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1;
        }
        connectingFollowers.add(sid);
        QuorumVerifier verifier = self.getQuorumVerifier();
        if (verifier.containsQuorum(connectingFollowers)) {
            waitingForNewEpoch = false;
            connectingFollowers.notifyAll();
        } else {
            // long arithmetic so large limits cannot overflow int. The monotonic
            // clock means wall-clock jumps cannot stretch or cut short the wait.
            long timeoutMs = (long) self.getInitLimit() * self.getTickTime();
            long deadlineNanos = System.nanoTime() + timeoutMs * 1000000L;
            long remainingMs = timeoutMs;
            // wait() can return spuriously, so re-check the flag until the deadline.
            while (waitingForNewEpoch && remainingMs > 0) {
                connectingFollowers.wait(remainingMs);
                remainingMs = (deadlineNanos - System.nanoTime()) / 1000000L;
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Out of time to propose an epoch");
            }
        }
        return epoch;
    }
}
/**
 * Records an epoch acknowledgement from server {@code id} and blocks until
 * the acknowledging servers form a quorum (and {@code readyToStart} is set).
 *
 * @param id id of the acknowledging server
 * @param ss that server's state summary; a current epoch of -1 means the
 *        server is not counted towards the quorum
 * @throws IOException if the server's state is more recent than the leader's
 * @throws InterruptedException if interrupted, or if no quorum acknowledges
 *         within initLimit * tickTime milliseconds
 */
public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
    synchronized (electingFollowers) {
        if (electionFinished) {
            return;
        }
        if (ss.getCurrentEpoch() != -1) {
            if (ss.isMoreRecentThan(leaderStateSummary)) {
                throw new IOException("Follower is ahead of the leader");
            }
            electingFollowers.add(id);
        }
        QuorumVerifier verifier = self.getQuorumVerifier();
        if (readyToStart && verifier.containsQuorum(electingFollowers)) {
            electionFinished = true;
            electingFollowers.notifyAll();
        } else {
            // long arithmetic avoids int overflow; the monotonic clock is
            // immune to wall-clock changes.
            long timeoutMs = (long) self.getInitLimit() * self.getTickTime();
            long deadlineNanos = System.nanoTime() + timeoutMs * 1000000L;
            long remainingMs = timeoutMs;
            // Guard against spurious wakeups by re-checking until the deadline.
            while (!electionFinished && remainingMs > 0) {
                electingFollowers.wait(remainingMs);
                remainingMs = (deadlineNanos - System.nanoTime()) / 1000000L;
            }
            // Check the flag this method waits on. The previous check of
            // waitingForNewEpoch never fired, so timeouts were silently ignored.
            if (!electionFinished) {
                throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
            }
        }
    }
}
public synchronized void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){ if (lastSeenQuorumVerifier!=null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) { LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() + ". Current version: " + quorumVerifier.getVersion()); } // assuming that a version uniquely identifies a configuration, so if // version is the same, nothing to do here. if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() == qv.getVersion()) { return; } lastSeenQuorumVerifier = qv; connectNewPeers(); if (writeToDisk) { try { QuorumPeerConfig.writeDynamicConfig( getNextDynamicConfigFilename(), qv, true); } catch(IOException e){ LOG.error("Error writing next dynamic config file to disk: ", e.getMessage()); } } }
/**
 * Creates a quorum peer from explicit settings.
 *
 * @param quorumPeers map of server id to server description for the ensemble
 * @param dataDir snapshot directory handed to {@link FileTxnSnapLog}
 * @param dataLogDir transaction log directory handed to {@link FileTxnSnapLog}
 * @param electionType election algorithm selector
 * @param myid this server's id
 * @param tickTime tick length
 * @param initLimit init limit
 * @param syncLimit sync limit
 * @param cnxnFactory NIO client connection factory
 * @param quorumConfig quorum verifier; when {@code null} a {@link QuorumMaj}
 *        over the participants of {@code quorumPeers} is used instead
 * @throws IOException if the transaction/snapshot log cannot be set up
 */
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
        File dataLogDir, int electionType, long myid, int tickTime,
        int initLimit, int syncLimit, NIOServerCnxn.Factory cnxnFactory,
        QuorumVerifier quorumConfig) throws IOException {
    this();
    this.cnxnFactory = cnxnFactory;
    this.quorumPeers = quorumPeers;
    this.electionType = electionType;
    this.myid = myid;
    this.tickTime = tickTime;
    this.initLimit = initLimit;
    this.syncLimit = syncLimit;
    this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
    this.zkDb = new ZKDatabase(this.logFactory);
    // Default to a majority verifier when the caller supplies none.
    this.quorumConfig = (quorumConfig != null)
            ? quorumConfig
            : new QuorumMaj(countParticipants(quorumPeers));
}
/**
 * Registers {@code sid} as a connecting server and returns the epoch the
 * leader will propose. Blocks until this server and the other registered
 * servers form a quorum, at which point the epoch is also stored as accepted.
 *
 * @param sid id of the server reporting its accepted epoch
 * @param lastAcceptedEpoch the last epoch that server accepted
 * @return the epoch to propose, strictly greater than every reported accepted epoch
 * @throws InterruptedException if interrupted, or if no quorum connects
 *         within initLimit * tickTime milliseconds
 * @throws IOException if storing the accepted epoch fails
 */
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
    synchronized (connectingFollowers) {
        if (!waitingForNewEpoch) {
            return epoch;
        }
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1;
        }
        connectingFollowers.add(sid);
        QuorumVerifier verifier = self.getQuorumVerifier();
        // Only finish once this server itself has registered, so its own
        // accepted epoch has been taken into account.
        if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) {
            waitingForNewEpoch = false;
            self.setAcceptedEpoch(epoch);
            connectingFollowers.notifyAll();
        } else {
            // System.currentTimeMillis() follows the wall clock, so clock
            // adjustments could stretch or truncate the wait. Use the
            // monotonic clock, and long arithmetic to avoid int overflow.
            long timeoutMs = (long) self.getInitLimit() * self.getTickTime();
            long deadlineNanos = System.nanoTime() + timeoutMs * 1000000L;
            long remainingMs = timeoutMs;
            while (waitingForNewEpoch && remainingMs > 0) {
                connectingFollowers.wait(remainingMs);
                remainingMs = (deadlineNanos - System.nanoTime()) / 1000000L;
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for epoch from quorum");
            }
        }
        return epoch;
    }
}
/**
 * Records an epoch acknowledgement from server {@code id} and blocks until
 * this server and the other acknowledging servers form a quorum.
 *
 * @param id id of the acknowledging server
 * @param ss that server's state summary; a current epoch of -1 means the
 *        server is not counted towards the quorum
 * @throws IOException if the server's state is more recent than the leader's
 * @throws InterruptedException if interrupted, or if no quorum acknowledges
 *         within initLimit * tickTime milliseconds
 */
public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
    synchronized (electingFollowers) {
        if (electionFinished) {
            return;
        }
        if (ss.getCurrentEpoch() != -1) {
            if (ss.isMoreRecentThan(leaderStateSummary)) {
                throw new IOException("Follower is ahead of the leader, leader summary: "
                        + leaderStateSummary.getCurrentEpoch() + " (current epoch), "
                        + leaderStateSummary.getLastZxid() + " (last zxid)");
            }
            electingFollowers.add(id);
        }
        QuorumVerifier verifier = self.getQuorumVerifier();
        if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
            electionFinished = true;
            electingFollowers.notifyAll();
        } else {
            // Monotonic deadline (immune to wall-clock changes) computed in
            // long arithmetic to avoid int overflow of initLimit * tickTime.
            long timeoutMs = (long) self.getInitLimit() * self.getTickTime();
            long deadlineNanos = System.nanoTime() + timeoutMs * 1000000L;
            long remainingMs = timeoutMs;
            while (!electionFinished && remainingMs > 0) {
                electingFollowers.wait(remainingMs);
                remainingMs = (deadlineNanos - System.nanoTime()) / 1000000L;
            }
            if (!electionFinished) {
                throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
            }
        }
    }
}
/**
 * Convenience constructor used only by the existing unit test code.
 *
 * <p>Delegates to the full constructor with {@code quorumListenOnAllIPs}
 * set to {@code false}. The client connection factory is bound to
 * {@code clientPort} on the wildcard address.
 *
 * @param quorumPeers map of server id to server description for the ensemble
 * @param snapDir snapshot directory
 * @param logDir transaction log directory
 * @param clientPort port the client connection factory listens on
 * @param electionAlg election algorithm selector
 * @param myid this server's id
 * @param tickTime tick length
 * @param initLimit init limit
 * @param syncLimit sync limit
 * @param quorumConfig quorum verifier, or {@code null} for the default
 * @throws IOException if the factory or the logs cannot be created
 */
public QuorumPeer(Map<Long,QuorumServer> quorumPeers, File snapDir,
        File logDir, int clientPort, int electionAlg, long myid,
        int tickTime, int initLimit, int syncLimit,
        QuorumVerifier quorumConfig) throws IOException {
    this(quorumPeers,
            snapDir,
            logDir,
            electionAlg,
            myid,
            tickTime,
            initLimit,
            syncLimit,
            false, // quorumListenOnAllIPs
            ServerCnxnFactory.createFactory(
                    new InetSocketAddress(clientPort), -1),
            quorumConfig);
}
/**
 * Checks whether a quorum of {@code qv} is connected and synced with the leader.
 *
 * <p>The candidate set holds this server (if it is a voting member of
 * {@code qv}) plus every forwarding follower that reports itself synced and
 * is a voting member of {@code qv}.
 *
 * @param qv the quorum verifier to evaluate against
 * @return {@code true} if the candidate set forms a quorum of {@code qv}
 */
public boolean isQuorumSynced(QuorumVerifier qv) {
    HashSet<Long> syncedVoters = new HashSet<Long>();
    long myId = self.getId();
    if (qv.getVotingMembers().containsKey(myId)) {
        syncedVoters.add(myId);
    }
    synchronized (forwardingFollowers) {
        for (LearnerHandler handler : forwardingFollowers) {
            if (!handler.synced()) {
                continue;
            }
            long learnerSid = handler.getSid();
            if (qv.getVotingMembers().containsKey(learnerSid)) {
                syncedVoters.add(learnerSid);
            }
        }
    }
    return qv.containsQuorum(syncedVoters);
}
/**
 * Registers {@code sid} as a connecting server and returns the epoch the
 * leader will propose. Blocks until this server and the other registered
 * servers form a quorum, at which point the epoch is also stored as accepted.
 *
 * @param sid id of the server reporting its accepted epoch
 * @param lastAcceptedEpoch the last epoch that server accepted
 * @return the epoch to propose, strictly greater than every reported accepted epoch
 * @throws InterruptedException if interrupted, or if no quorum connects
 *         within initLimit * tickTime milliseconds
 * @throws IOException if storing the accepted epoch fails
 */
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
    synchronized (connectingFollowers) {
        if (!waitingForNewEpoch) {
            return epoch;
        }
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1;
        }
        connectingFollowers.add(sid);
        QuorumVerifier verifier = self.getQuorumVerifier();
        if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) {
            waitingForNewEpoch = false;
            self.setAcceptedEpoch(epoch);
            connectingFollowers.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            long cur = start;
            // Widen before multiplying: int * int can overflow and yield a
            // deadline in the past, causing an immediate timeout.
            long end = start + (long) self.getInitLimit() * self.getTickTime();
            // Loop to tolerate spurious wakeups until the deadline passes.
            while (waitingForNewEpoch && cur < end) {
                connectingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for epoch from quorum");
            }
        }
        return epoch;
    }
}
/**
 * Records an epoch acknowledgement from server {@code id} and blocks until
 * this server and the other acknowledging servers form a quorum.
 *
 * @param id id of the acknowledging server
 * @param ss that server's state summary; the server is counted only when
 *        both its current epoch and its last zxid are not -1
 * @throws IOException if the server's state is more recent than the leader's
 * @throws InterruptedException if interrupted, or if no quorum acknowledges
 *         within initLimit * tickTime milliseconds
 */
public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
    synchronized (electingFollowers) {
        if (electionFinished) {
            return;
        }
        if (ss.getCurrentEpoch() != -1) {
            if (ss.isMoreRecentThan(leaderStateSummary)) {
                throw new IOException("Follower is ahead of the leader, leader summary: "
                        + leaderStateSummary.getCurrentEpoch() + " (current epoch), "
                        + leaderStateSummary.getLastZxid() + " (last zxid)");
            }
            if (ss.getLastZxid() != -1) {
                electingFollowers.add(id);
            }
        }
        QuorumVerifier verifier = self.getQuorumVerifier();
        if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
            electionFinished = true;
            electingFollowers.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            long cur = start;
            // Widen before multiplying to avoid int overflow of the timeout.
            long end = start + (long) self.getInitLimit() * self.getTickTime();
            while (!electionFinished && cur < end) {
                electingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (!electionFinished) {
                throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
            }
        }
    }
}
/**
 * Start up Leader ZooKeeper server and initialize zxid to the new epoch
 *
 * Sequence (order matters and is preserved):
 * 1. record the server's current zxid as lastCommitted and log the ack sets;
 * 2. apply the last seen configuration via processReconfig, using the
 *    designated leader computed for newLeaderProposal;
 * 3. clear allowedToCommit if this server is not the designated leader;
 * 4. start the server, update the election vote and set the database's
 *    last processed zxid.
 */
private synchronized void startZkServer() {
    // Update lastCommitted and Db's zxid to a value representing the new epoch
    lastCommitted = zk.getZxid();
    LOG.info("Have quorum of supporters, sids: [ "
            + newLeaderProposal.ackSetsToString()
            + " ]; starting up and setting last processed zxid: 0x{}",
            Long.toHexString(zk.getZxid()));

    /*
     * ZOOKEEPER-1324. the leader sends the new config it must complete
     * to others inside a NEWLEADER message (see LearnerHandler where
     * the NEWLEADER message is constructed), and once it has enough
     * acks we must execute the following code so that it applies the
     * config to itself.
     */
    QuorumVerifier newQV = self.getLastSeenQuorumVerifier();

    Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid());

    self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
    // NOTE(review): this compares by value only if self.getId() returns a
    // primitive long (the Long is then unboxed); confirm. A null
    // designatedLeader would throw NullPointerException on unboxing.
    if (designatedLeader != self.getId()) {
        allowedToCommit = false;
    }

    zk.startup();
    /*
     * Update the election vote here to ensure that all members of the
     * ensemble report the same vote to new servers that start up and
     * send leader election notifications to the ensemble.
     *
     * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
     */
    self.updateElectionVote(getEpoch());

    zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
}
/**
 * Convenience constructor used only by the existing unit test code.
 *
 * <p>Delegates to the full constructor with {@code quorumListenOnAllIPs}
 * set to {@code false}. The client connection factory is bound to the
 * address returned by {@code getClientAddress(quorumPeers, myid, clientPort)}.
 *
 * @param quorumPeers map of server id to server description for the ensemble
 * @param snapDir snapshot directory
 * @param logDir transaction log directory
 * @param clientPort client port used to derive the listen address
 * @param electionAlg election algorithm selector
 * @param myid this server's id
 * @param tickTime tick length
 * @param initLimit init limit
 * @param syncLimit sync limit
 * @param quorumConfig quorum verifier, or {@code null} for the default
 * @throws IOException if the factory or the logs cannot be created
 */
public QuorumPeer(Map<Long,QuorumServer> quorumPeers, File snapDir,
        File logDir, int clientPort, int electionAlg, long myid,
        int tickTime, int initLimit, int syncLimit,
        QuorumVerifier quorumConfig) throws IOException {
    this(quorumPeers,
            snapDir,
            logDir,
            electionAlg,
            myid,
            tickTime,
            initLimit,
            syncLimit,
            false, // quorumListenOnAllIPs
            ServerCnxnFactory.createFactory(
                    getClientAddress(quorumPeers, myid, clientPort), -1),
            quorumConfig);
}
/**
 * Shuts down and restarts leader election, unless the old configuration is
 * non-null and equal to the new one.
 *
 * @param qvOLD previous configuration; {@code null} always triggers a restart
 * @param qvNEW new configuration
 */
public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW) {
    boolean unchanged = qvOLD != null && qvOLD.equals(qvNEW);
    if (unchanged) {
        return;
    }
    LOG.warn("Restarting Leader Election");
    getElectionAlg().shutdown();
    shuttingDownLE = false;
    startLeaderElection();
}
public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){ synchronized (QV_LOCK) { if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) { LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() + ". Current version: " + quorumVerifier.getVersion()); } // assuming that a version uniquely identifies a configuration, so if // version is the same, nothing to do here. if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() == qv.getVersion()) { return; } lastSeenQuorumVerifier = qv; connectNewPeers(); if (writeToDisk) { try { String fileName = getNextDynamicConfigFilename(); if (fileName != null) { QuorumPeerConfig.writeDynamicConfig(fileName, qv, true); } } catch (IOException e) { LOG.error("Error writing next dynamic config file to disk: ", e.getMessage()); } } } }
private boolean updateLearnerType(QuorumVerifier newQV) { //check if I'm an observer in new config if (newQV.getObservingMembers().containsKey(getId())) { if (getLearnerType()!=LearnerType.OBSERVER){ setLearnerType(LearnerType.OBSERVER); LOG.info("Becoming an observer"); reconfigFlagSet(); return true; } else { return false; } } else if (newQV.getVotingMembers().containsKey(getId())) { if (getLearnerType()!=LearnerType.PARTICIPANT){ setLearnerType(LearnerType.PARTICIPANT); LOG.info("Becoming a voting participant"); reconfigFlagSet(); return true; } else { return false; } } // I'm not in the view if (getLearnerType()!=LearnerType.PARTICIPANT){ setLearnerType(LearnerType.PARTICIPANT); LOG.info("Becoming a non-voting participant"); reconfigFlagSet(); return true; } return false; }
/** * Try to establish a connection to server with id sid. * * @param sid server id */ synchronized void connectOne(long sid){ if (senderWorkerMap.get(sid) != null) { LOG.debug("There is a connection already for server " + sid); return; } synchronized (self.QV_LOCK) { boolean knownId = false; // Resolve hostname for the remote server before attempting to // connect in case the underlying ip address has changed. self.recreateSocketAddresses(sid); Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView(); QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier(); Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers(); if (lastCommittedView.containsKey(sid)) { knownId = true; if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) return; } if (lastSeenQV != null && lastProposedView.containsKey(sid) && (!knownId || (lastProposedView.get(sid).electionAddr != lastCommittedView.get(sid).electionAddr))) { knownId = true; if (connectOne(sid, lastProposedView.get(sid).electionAddr)) return; } if (!knownId) { LOG.warn("Invalid server id: " + sid); return; } } }
/**
 * Writes the dynamic configuration of {@code qv} to a file via
 * {@link AtomicFileWritingIdiom}.
 *
 * <p>{@code qv.toString()} is parsed as a properties document. Each entry is
 * written as a trimmed {@code key=value} line, the lines sorted and joined
 * with newlines.
 *
 * @param dynamicConfigFilename path of the file to write
 * @param qv configuration to persist
 * @param needKeepVersion when {@code false}, keys starting with "version" are omitted
 * @throws IOException if the file cannot be written
 */
public static void writeDynamicConfig(final String dynamicConfigFilename,
        final QuorumVerifier qv, final boolean needKeepVersion) throws IOException {
    final File target = new File(dynamicConfigFilename);
    new AtomicFileWritingIdiom(target, new WriterStatement() {
        @Override
        public void write(Writer out) throws IOException {
            Properties parsed = new Properties();
            parsed.load(new StringReader(qv.toString()));
            List<String> lines = new ArrayList<String>();
            for (Entry<Object, Object> property : parsed.entrySet()) {
                String name = property.getKey().toString().trim();
                boolean include = needKeepVersion || !name.startsWith("version");
                if (include) {
                    lines.add(name + "=" + property.getValue().toString().trim());
                }
            }
            Collections.sort(lines);
            out.write(StringUtils.joinStrings(lines, "\n"));
        }
    });
}
private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException{ if(isHierarchical){ return new QuorumHierarchical(dynamicConfigProp); } else { /* * The default QuorumVerifier is QuorumMaj */ //LOG.info("Defaulting to majority quorums"); return new QuorumMaj(dynamicConfigProp); } }
public synchronized void initConfigInZKDatabase(QuorumVerifier qv) { if (qv == null) return; // only happens during tests try { if (this.dataTree.getNode(ZooDefs.CONFIG_NODE) == null) { // should only happen during upgrade LOG.warn("configuration znode missing (should only happen during upgrade), creating the node"); this.dataTree.addConfigNode(); } this.dataTree.setData(ZooDefs.CONFIG_NODE, qv.toString().getBytes(), -1, qv.getVersion(), Time.currentWallTime()); } catch (NoNodeException e) { System.out.println("configuration node missing - should not happen"); } }
@Test public void testInitialConfigHasPositiveVersion() throws Exception { qu = new QuorumUtil(1); // create 3 servers qu.disableJMXTest = true; qu.startAll(); ZooKeeper[] zkArr = createHandles(qu); testNormalOperation(zkArr[1], zkArr[2]); for (int i=1; i<4; i++) { String configStr = testServerHasConfig(zkArr[i], null, null); QuorumVerifier qv = qu.getPeer(i).peer.configFromString(configStr); long version = qv.getVersion(); Assert.assertTrue(version == 0x100000000L); } }