zookeeper投票选举源码分析

2023-11-07 01:48

本文主要是介绍zookeeper投票选举源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 leader 选举

Leader 选举会分两个过程
启动的时候的 leader 选举、 leader 崩溃的时候的的选举

1. 服务器启动时的 leader

选举每个节点启动的时候状态都是 LOOKING,处于观望状态,接下来就开始进行选主流程进行 Leader 选举,至少需要两台机器(具体原因前面已经讲过了),我们选取 3 台机器组成的服务器集群为例。在集群初始化阶段,当有一台服务器 Server1 启动时,它本身是无法进行和完成 Leader 选举,当第二台服务器 Server2 启动时,这个时候两台机器可以相互通信,每台机器都试图找到 Leader,于是进入 Leader 选举过程。选举过程如下

(1) 每个 Server 发出一个投票。由于是初始情况,Server1和 Server2 都会将自己作为 Leader 服务器来进行投票,每次投票会包含所推举的服务器的myid和ZXID、epoch,使用(myid, ZXID,epoch)来表示,此时 Server1的投票为(1, 0),Server2 的投票为(2, 0),然后各自将这个投票发给集群中其他机器。
(2) 接受来自各个服务器的投票。集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本
轮投票(epoch)、是否来自LOOKING状态的服务器。
(3) 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行 PK,PK 规则如下
i. 优先检查 ZXID。ZXID 比较大的服务器优先作为Leader
ii. 如果 ZXID 相同,那么就比较 myid。myid 较大的服务器作为 Leader 服务器。
对于 Server1 而言,它的投票是(1, 0),接收 Server2的投票为(2, 0),首先会比较两者的 ZXID,均为 0,再
比较 myid,此时 Server2 的 myid 最大,于是更新自己的投票为(2, 0),然后重新投票,对于Server2而言,
它不需要更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。
(4) 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对
于 Server1、Server2 而言,都统计出集群中已经有两台机器接受了(2, 0)的投票信息,此时便认为已经选出
了 Leader。
(5) 改变服务器状态。一旦确定了 Leader,每个服务器就会更新自己的状态,如果是 Follower,那么就变更为
FOLLOWING,如果是 Leader,就变更为 LEADING。

2. 运行过程中的 leader 选举

当集群中的 leader 服务器出现宕机或者不可用的情况时,那么整个集群将无法对外提供服务,而是进入新一轮的
Leader 选举,服务器运行期间的 Leader 选举和启动时期的 Leader 选举基本过程是一致的。
(1) 变更状态。Leader 挂后,余下的非 Observer 服务器都会将自己的服务器状态变更为 LOOKING,然后开
始进入 Leader 选举过程。
(2) 每个 Server 会发出一个投票。在运行期间,每个服务器上的 ZXID 可能不同,此时假定 Server1 的 ZXID 为
123,Server3的ZXID为122;在第一轮投票中,Server1和 Server3 都会投自己,产生投票(1, 123),(3, 122),
然后各自将投票发送给集群中所有机器。接收来自各个服务器的投票。与启动时过程相同。
(3) 处理投票。与启动时过程相同,此时,Server1 将会成为 Leader。
(4) 统计投票。与启动时过程相同。
(5) 改变服务器的状态。与启动时过程相同

二 Leader 选举源码分析

1.从入口函数 QUORUMPEERMAIN 开始
2. QuorumPeer 重写了 Thread.start 方法
3. 初始化 LEADERELECTION 调用startLeaderElection
4. QuorumPeer 类的startLeaderElection方法中调用选举算法createElectionAlgorithm
5. createElectionAlgorithm方法 创建FastLeaderElection对象 构造函数中调用FastLeaderElection类的starter方法
6. 接下来调用 fle.start() , 也就是会调用 FastLeaderElection
start()方法,该方法主要是对发送线程和接收线程的初始
化 , 左 边 是 FastLeaderElection 的 start , 右 边 是
messager.start()
7. wsThread 和 wrThread 的 初 始 化 动 作 在
FastLeaderElection 的 starter 方法里面进行,这里面有两
个内部类,一个是 WorkerSender,一个是 WorkerReceiver,
负责发送投票信息和接收投票信息
8. 然后再回到 QuorumPeer.java。 FastLeaderElection 初始
化完成以后,调用 super.start(),最终运行 QuorumPeer 的
run 方法
9. 最重要的就是run方法 run方法中调用setCurrentVote(makeLEStrategy().lookForLeader());,最终根据策略应该运行 FastLeaderElection 中的选举算法

从入口函数 QuorumPeerMain开始

/*** To start the replicated server specify the configuration file name on* the command line.* @param args path to the configfile*/
public static void main(String[] args) {QuorumPeerMain main = new QuorumPeerMain();try {main.initializeAndRun(args);} catch (IllegalArgumentException e) {LOG.error("Invalid arguments, exiting abnormally", e);LOG.info(USAGE);System.err.println(USAGE);ZKAuditProvider.addServerStartFailureAuditLog();ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());} catch (ConfigException e) {LOG.error("Invalid config, exiting abnormally", e);System.err.println("Invalid config, exiting abnormally");ZKAuditProvider.addServerStartFailureAuditLog();ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());} catch (DatadirException e) {LOG.error("Unable to access datadir, exiting abnormally", e);System.err.println("Unable to access datadir, exiting abnormally");ZKAuditProvider.addServerStartFailureAuditLog();ServiceUtils.requestSystemExit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());} catch (AdminServerException e) {LOG.error("Unable to start AdminServer, exiting abnormally", e);System.err.println("Unable to start AdminServer, exiting abnormally");ZKAuditProvider.addServerStartFailureAuditLog();ServiceUtils.requestSystemExit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());} catch (Exception e) {LOG.error("Unexpected exception, exiting abnormally", e);ZKAuditProvider.addServerStartFailureAuditLog();ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());}LOG.info("Exiting normally");ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
}protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {QuorumPeerConfig config = new QuorumPeerConfig();if (args.length == 1) {config.parse(args[0]);}// Start and schedule the the purge taskDatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(),config.getDataLogDir(),config.getSnapRetainCount(),config.getPurgeInterval());purgeMgr.start();//判断是standalone模式还是集群模式if (args.length == 1 && config.isDistributed()) {//集群runFromConfig(config);} else {LOG.warn("Either no config or no quorum defined in config, running in standalone mode");// there is only server in the quorum -- run as standaloneZooKeeperServerMain.main(args);}
}public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {try {ManagedUtil.registerLog4jMBeans();} catch (JMException e) {LOG.warn("Unable to register log4j JMX control", e);}LOG.info("Starting quorum peer");MetricsProvider metricsProvider;try {metricsProvider = MetricsProviderBootstrap.startMetricsProvider(config.getMetricsProviderClassName(),config.getMetricsProviderConfiguration());} catch (MetricsProviderLifeCycleException error) {throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);}try {ServerMetrics.metricsProviderInitialized(metricsProvider);ServerCnxnFactory cnxnFactory = null;ServerCnxnFactory secureCnxnFactory = null;//为客户端提供读写的server 也就是2181的端口访问功能if (config.getClientPortAddress() != null) {cnxnFactory = ServerCnxnFactory.createFactory();cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);}if (config.getSecureClientPortAddress() != null) {secureCnxnFactory = ServerCnxnFactory.createFactory();secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);}quorumPeer = getQuorumPeer();quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());//quorumPeer.setQuorumPeers(config.getAllMembers());quorumPeer.setElectionType(config.getElectionAlg());quorumPeer.setMyid(config.getServerId());quorumPeer.setTickTime(config.getTickTime());quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());quorumPeer.setInitLimit(config.getInitLimit());quorumPeer.setSyncLimit(config.getSyncLimit());quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());quorumPeer.setObserverMasterPort(config.getObserverMasterPort());quorumPeer.setConfigFileName(config.getConfigFilename());quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);if (config.getLastSeenQuorumVerifier() != null) {quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);}quorumPeer.initConfigInZKDatabase();quorumPeer.setCnxnFactory(cnxnFactory);quorumPeer.setSecureCnxnFactory(secureCnxnFactory);quorumPeer.setSslQuorum(config.isSslQuorum());quorumPeer.setUsePortUnification(config.shouldUsePortUnification());quorumPeer.setLearnerType(config.getPeerType());quorumPeer.setSyncEnabled(config.getSyncEnabled());quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());if (config.sslQuorumReloadCertFiles) {quorumPeer.getX509Util().enableCertFileReloading();}quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());// sets quorum sasl authentication configurationsquorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);if (quorumPeer.isQuorumSaslAuthEnabled()) {quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);}quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);quorumPeer.initialize();if (config.jvmPauseMonitorToRun) {quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));}//启动主线程quorumPeer.start();ZKAuditProvider.addZKStartStopAuditLog();quorumPeer.join();} catch (InterruptedException e) {// warn, but generally this is okLOG.warn("Quorum Peer interrupted", e);} finally {if (metricsProvider != null) {try {metricsProvider.stop();} catch (Throwable error) {LOG.warn("Error while stopping metrics", error);}}}
}

调用 QuorumPeer 的 start方法

@Override
public synchronized void start() {if (!getView().containsKey(myid)) {throw new RuntimeException("My id " + myid + " not in the peer list");}//恢复db  loaddatabase主要是从本地文件中恢复数据,以及获取最新的 zxidloadDataBase();startServerCnxnFactory();try {adminServer.start();} catch (AdminServerException e) {LOG.warn("Problem starting AdminServer", e);System.out.println(e);}//选举初始化startLeaderElection();startJvmPauseMonitor();super.start();
}ResponderThread responder;public synchronized void stopLeaderElection() {responder.running = false;responder.interrupt();
}public synchronized void startLeaderElection() {try {//如果当前节点状态是LOOKING 投票给自己if (getPeerState() == ServerState.LOOKING) {currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());}} catch (IOException e) {RuntimeException re = new RuntimeException(e.getMessage());re.setStackTrace(e.getStackTrace());throw re;}//根据配置获取选举算法 配置选举算法,选举算法有 3 种,可以通过在 zoo.cfg 里
//面进行配置,默认是 fast 选举this.electionAlg = createElectionAlgorithm(electionType);
}@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm) {Election le = null;//TODO: use a factory rather than a switchswitch (electionAlgorithm) {case 1:throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");case 2:throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");case 3://leader选举io负责类QuorumCnxManager qcm = createCnxnManager();QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);if (oldQcm != null) {LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");oldQcm.halt();}QuorumCnxManager.Listener listener = qcm.listener;if (listener != null) {//启动已绑定的选举线程 等待集群中其他机器连接listener.start();//基于TCP的选举算法FastLeaderElection fle = new FastLeaderElection(this, qcm);fle.start();le = fle;} else {LOG.error("Null listener when initializing cnx manager");}break;default:assert false;}return le;
}@SuppressWarnings("deprecation")
protected Election makeLEStrategy() {LOG.debug("Initializing leader election protocol...");return electionAlg;
}protected synchronized void setLeader(Leader newLeader) {leader = newLeader;
}protected synchronized void setFollower(Follower newFollower) {follower = newFollower;
}protected synchronized void setObserver(Observer newObserver) {observer = newObserver;
}public synchronized ZooKeeperServer getActiveServer() {if (leader != null) {return leader.zk;} else if (follower != null) {return follower.zk;} else if (observer != null) {return observer.zk;}return null;
}boolean shuttingDownLE = false;@Override
public void run() {updateThreadName();LOG.debug("Starting quorum peer");try {jmxQuorumBean = new QuorumBean(this);MBeanRegistry.getInstance().register(jmxQuorumBean, null);for (QuorumServer s : getView().values()) {ZKMBeanInfo p;if (getId() == s.id) {p = jmxLocalPeerBean = new LocalPeerBean(this);try {MBeanRegistry.getInstance().register(p, jmxQuorumBean);} catch (Exception e) {LOG.warn("Failed to register with JMX", e);jmxLocalPeerBean = null;}} else {RemotePeerBean rBean = new RemotePeerBean(this, s);try {MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);jmxRemotePeerBean.put(s.id, rBean);} catch (Exception e) {LOG.warn("Failed to register with JMX", e);}}}} catch (Exception e) {LOG.warn("Failed to register with JMX", e);jmxQuorumBean = null;}try {/** Main loop*/while (running) {//判断当前节点状态switch (getPeerState()) {case LOOKING://如果是LOOKING 则进入选举流程LOG.info("LOOKING");ServerMetrics.getMetrics().LOOKING_COUNT.add(1);if (Boolean.getBoolean("readonlymode.enabled")) {LOG.info("Attempting to start ReadOnlyZooKeeperServer");// Create read-only server but don't start it immediatelyfinal ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);// Instead of starting roZk immediately, wait some grace// period before we decide we're partitioned.//// Thread is used here because otherwise it would require// changes in each of election strategy classes which is// unnecessary code coupling.Thread roZkMgr = new Thread() {public void run() {try {// lower-bound grace period to 2 secssleep(Math.max(2000, tickTime));if (ServerState.LOOKING.equals(getPeerState())) {roZk.startup();}} catch (InterruptedException e) {LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");} catch (Exception e) {LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);}}};try {roZkMgr.start();reconfigFlagClear();if (shuttingDownLE) {shuttingDownLE = false;startLeaderElection();}//此处通过策略模式来决定当前用哪个选举算法来进行领导选举setCurrentVote(makeLEStrategy().lookForLeader());} catch (Exception e) {LOG.warn("Unexpected exception", e);setPeerState(ServerState.LOOKING);} finally {// If the thread is in the the grace period, interrupt// to come out of waiting.roZkMgr.interrupt();roZk.shutdown();}} else {try {reconfigFlagClear();if (shuttingDownLE) {shuttingDownLE = false;startLeaderElection();}//此处通过策略模式决定当前用哪个选举算法来进行领导选举setCurrentVote(makeLEStrategy().lookForLeader());} catch (Exception e) {LOG.warn("Unexpected exception", e);setPeerState(ServerState.LOOKING);}}break;case OBSERVING:try {LOG.info("OBSERVING");setObserver(makeObserver(logFactory));observer.observeLeader();} catch (Exception e) {LOG.warn("Unexpected exception", e);} finally {observer.shutdown();setObserver(null);updateServerState();// Add delay jitter before we switch to LOOKING// state to reduce the load of ObserverMasterif (isRunning()) {Observer.waitForObserverElectionDelay();}}break;case FOLLOWING:try {LOG.info("FOLLOWING");setFollower(makeFollower(logFactory));follower.followLeader();} catch (Exception e) {LOG.warn("Unexpected exception", e);} finally {follower.shutdown();setFollower(null);updateServerState();}break;case LEADING:LOG.info("LEADING");try {setLeader(makeLeader(logFactory));leader.lead();setLeader(null);} catch (Exception e) {LOG.warn("Unexpected exception", e);} finally {if (leader != null) {leader.shutdown("Forcing shutdown");setLeader(null);}updateServerState();}break;}}} finally {LOG.warn("QuorumPeer main thread exited");MBeanRegistry instance = MBeanRegistry.getInstance();instance.unregister(jmxQuorumBean);instance.unregister(jmxLocalPeerBean);for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {instance.unregister(remotePeerBean);}jmxQuorumBean = null;jmxLocalPeerBean = null;jmxRemotePeerBean = null;}
}

FastLeaderElection 构造方法中调用starter方法

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {this.stop = false;this.manager = manager;starter(self, manager);
}/*** This method is invoked by the constructor. Because it is a* part of the starting procedure of the object that must be on* any constructor of this class, it is probably best to keep as* a separate method. As we have a single constructor currently,* it is not strictly necessary to have it separate.** @param self      QuorumPeer that created this object* @param manager   Connection manager*/
private void starter(QuorumPeer self, QuorumCnxManager manager) {this.self = self;proposedLeader = -1;proposedZxid = -1;//业务层发送队列 业务对象ToSendsendqueue = new LinkedBlockingQueue<ToSend>();//业务层接收队列 业务对象Notificationrecvqueue = new LinkedBlockingQueue<Notification>();this.messenger = new Messenger(manager);
}/*** This method starts the sender and receiver threads.*/
public void start() {this.messenger.start();
}/*** Constructor of class Messenger.** @param manager   Connection manager*/Messenger(QuorumCnxManager manager) {//创建发送投票信息this.ws = new WorkerSender(manager);this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");this.wsThread.setDaemon(true);//创建接收投票信息this.wr = new WorkerReceiver(manager);this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");this.wrThread.setDaemon(true);}/*** Starts instances of WorkerSender and WorkerReceiver*/void start() {//启动业务层发送线程 将消息发送给IO负责类QuorumCnxManagerthis.wsThread.start();//启动业务层接收线程 从IO负责类QuorumCnxManager 接收消息this.wrThread.start();}/*** Starts a new round of leader election. Whenever our QuorumPeer* changes its state to LOOKING, this method is invoked, and it* sends notifications to all other peers. LOOKFORLEADER 开始选举*/public Vote lookForLeader() throws InterruptedException {try {self.jmxLeaderElectionBean = new LeaderElectionBean();MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);} catch (Exception e) {LOG.warn("Failed to register with JMX", e);self.jmxLeaderElectionBean = null;}self.start_fle = Time.currentElapsedTime();try {/** The votes from the current leader election are stored in recvset. In other words, a vote v is in recvset* if v.electionEpoch == logicalclock. The current participant uses recvset to deduce on whether a majority* of participants has voted for it.*///收到的投票Map<Long, Vote> recvset = new HashMap<Long, Vote>();/** The votes from previous leader elections, as well as the votes from the current leader election are* stored in outofelection. Note that notifications in a LOOKING state are not stored in outofelection.* Only FOLLOWING or LEADING notifications are stored in outofelection. The current participant could use* outofelection to learn which participant is the leader if it arrives late (i.e., higher logicalclock than* the electionEpoch of the received notifications) in a leader election.*///存储选举结果Map<Long, Vote> outofelection = new HashMap<Long, Vote>();int notTimeout = minNotificationInterval;synchronized (this) {//增加逻辑时钟logicalclock.incrementAndGet();//更新自己的zxid和epochupdateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());}LOG.info("New election. My id = {}, proposed zxid=0x{}",self.getId(),Long.toHexString(proposedZxid));//发送投票 包括发送给自己sendNotifications();SyncedLearnerTracker voteSet;/** Loop in which we exchange notifications until we find a leader*///进行while循环 直到选举出leaderwhile ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {/** Remove next notification from queue, times out after 2 times* the termination time*///从IO线程里拿到投票信息 自己的投票也在这里处理Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);/** Sends more notifications if haven't received enough.* Otherwise processes new notification.*///如果为空 消息发完了 继续发送 一直到选出leader为止if (n == null) {if (manager.haveDelivered()) {sendNotifications();} else {//消息还没投递出去 可能是其他server还没启动 尝试再连接manager.connectAll();}/** Exponential backoff*///延长超时时间int tmpTimeOut = notTimeout * 2;notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);LOG.info("Notification time out: {}", notTimeout);//收到投票消息 判断收到的消息是不是属于这个集群内} else if (validVoter(n.sid) && validVoter(n.leader)) {/** Only proceed if the vote comes from a replica in the current or next* voting view for a replica in the current or next voting view.*///判断收到的消息的节点的状态switch (n.state) {case LOOKING:if (getInitLastLoggedZxid() == -1) {LOG.debug("Ignoring notification as our zxid is -1");break;}if (n.zxid == -1) {LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);break;}// If notification > current, replace and send messages out//判断接收到的节点epoch大于logicalclock  则表示当前是新一轮的选举if (n.electionEpoch > logicalclock.get()) {//更新本地logicalclocklogicalclock.set(n.electionEpoch);//清空接收队列recvset.clear();//检查收到的消息是否可以胜出 依次比较epoch zxid myidif (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {//胜出后 把投票改为对方的票据updateProposal(n.leader, n.zxid, n.peerEpoch);} else {//否则 票据不变updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());}//继续广播 让其他节点知道我现在的票据sendNotifications();//如果收到的消息epoch小于当前节点的epoch 则忽略这条消息} else if (n.electionEpoch < logicalclock.get()) {LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",Long.toHexString(n.electionEpoch),Long.toHexString(logicalclock.get()));break;//如果epoch相同 继续比较zxid myid 如果胜出 则更新自己的票据 并发出广播} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {updateProposal(n.leader, n.zxid, n.peerEpoch);sendNotifications();}LOG.debug("Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",n.sid,n.leader,Long.toHexString(n.zxid),Long.toHexString(n.electionEpoch));// don't care about the version if it's in LOOKING state//添加到本机投票集合 用来做选举终结判断recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));//判断选举是否结束 默认算法是超过半数server同意if (voteSet.hasAllQuorums()) {// Verify if there is any change in the proposed leader//一直等到新的通知到达 直到超时while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {recvqueue.put(n);break;}}/** This predicate is true once we don't read any new* relevant message from the reception queue*///确定leaderif (n == null) {//修改状态setPeerState(proposedLeader, voteSet);Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);leaveInstance(endVote);return endVote;}}break;//OBSERVING 不参与选举投票case OBSERVING:LOG.debug("Notification from observer: {}", n.sid);break;//这两种需要参与选举case FOLLOWING:case LEADING:/** Consider all notifications from the same epoch* together.*///判断epoch是否相同if (n.electionEpoch == logicalclock.get()) {//如果相同 加入本机的投票集合recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));//判断是否结束 如果结束 确认leader是否有效if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {//修改自己的状态并返回投票结果setPeerState(n.leader, voteSet);Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);leaveInstance(endVote);return endVote;}}/** Before joining an established ensemble, verify that* a majority are following the same leader.** Note that the outofelection map also stores votes from the current leader election.* See ZOOKEEPER-1732 for more information.*/outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {synchronized (this) {logicalclock.set(n.electionEpoch);setPeerState(n.leader, voteSet);}Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);leaveInstance(endVote);return endVote;}break;default:LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);break;}} else {if (!validVoter(n.leader)) {LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);}if (!validVoter(n.sid)) {LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);}}}return null;} finally {try {if (self.jmxLeaderElectionBean != null) {MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);}} catch (Exception e) {LOG.warn("Failed to unregister with JMX", e);}self.jmxLeaderElectionBean = null;LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());}}/*** Send notifications to all peers upon a change in our vote*//***   广播消息*/private void sendNotifications() {//循环发送for (long sid : self.getCurrentAndNextConfigVoters()) {QuorumVerifier qv = self.getQuorumVerifier();//消息实体ToSend notmsg = new ToSend(ToSend.mType.notification,proposedLeader,proposedZxid,logicalclock.get(),QuorumPeer.ServerState.LOOKING,sid,proposedEpoch,qv.toString().getBytes());LOG.debug("Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"+ " {} (myid), 0x{} (n.peerEpoch) ",proposedLeader,Long.toHexString(proposedZxid),Long.toHexString(logicalclock.get()),sid,self.getId(),Long.toHexString(proposedEpoch));//添加到发送队列 这个队列会被workersender消费sendqueue.offer(notmsg);}}class WorkerSender extends ZooKeeperThread {volatile boolean stop;QuorumCnxManager manager;WorkerSender(QuorumCnxManager manager) {super("WorkerSender");this.stop = false;this.manager = manager;}public void run() {while (!stop) {try {//从发送队列中获取消息实体ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);if (m == null) {continue;}process(m);} catch (InterruptedException e) {break;}}LOG.info("WorkerSender is down");}/*** Called by run() once there is a new message to send.** @param m     message to send*/void process(ToSend m) {ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData);manager.toSend(m.sid, requestBuffer);}}

消息如何广播,看 sendNotifications 方法

  1. 调用sendNotifications sendqueue.offer(notmsg);
  2. WorkerSender 的run方法里执行process方法 进行发送广播

FastLeaderElection 选举过程

其实在这个投票过程中就涉及到几个类,
FastLeaderElection:FastLeaderElection实现了Election接口,实现各服务器之间基于 TCP 协议进行选举
Notification:内部类,Notification 表示收到的选举投票信息(其他服务器发来的选举投票信息),其包含了被选举者的 id、zxid、选举周期等信息
ToSend:ToSend表示发送给其他服务器的选举投票信息,也包含了被选举者的 id、zxid、选举周期等信息
Messenger : Messenger 包 含 了 WorkerReceiver 和WorkerSender 两个内部类;
WorkerReceiver:实现了 Runnable 接口,是选票接收器。其会不断地从 QuorumCnxManager 中获取其他服务器发来的选举消息,并将其转换成一个选票,然后保存到recvqueue 中
WorkerSender:也实现了 Runnable 接口,为选票发送器,其会不断地从 sendqueue 中获取待发送的选票,并将其传递到底层 QuorumCnxManager 中

这篇关于zookeeper投票选举源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/360426

相关文章

关于MyISAM和InnoDB对比分析

《关于MyISAM和InnoDB对比分析》:本文主要介绍关于MyISAM和InnoDB对比分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录开篇:从交通规则看存储引擎选择理解存储引擎的基本概念技术原理对比1. 事务支持:ACID的守护者2. 锁机制:并发控制的艺

MyBatis Plus 中 update_time 字段自动填充失效的原因分析及解决方案(最新整理)

《MyBatisPlus中update_time字段自动填充失效的原因分析及解决方案(最新整理)》在使用MyBatisPlus时,通常我们会在数据库表中设置create_time和update... 目录前言一、问题现象二、原因分析三、总结:常见原因与解决方法对照表四、推荐写法前言在使用 MyBATis

Python主动抛出异常的各种用法和场景分析

《Python主动抛出异常的各种用法和场景分析》在Python中,我们不仅可以捕获和处理异常,还可以主动抛出异常,也就是以类的方式自定义错误的类型和提示信息,这在编程中非常有用,下面我将详细解释主动抛... 目录一、为什么要主动抛出异常?二、基本语法:raise关键字基本示例三、raise的多种用法1. 抛

github打不开的问题分析及解决

《github打不开的问题分析及解决》:本文主要介绍github打不开的问题分析及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、找到github.com域名解析的ip地址二、找到github.global.ssl.fastly.net网址解析的ip地址三

SpringBoot读取ZooKeeper(ZK)属性的方法实现

《SpringBoot读取ZooKeeper(ZK)属性的方法实现》本文主要介绍了SpringBoot读取ZooKeeper(ZK)属性的方法实现,强调使用@ConfigurationProperti... 目录1. 在配置文件中定义 ZK 属性application.propertiesapplicati

Mysql的主从同步/复制的原理分析

《Mysql的主从同步/复制的原理分析》:本文主要介绍Mysql的主从同步/复制的原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录为什么要主从同步?mysql主从同步架构有哪些?Mysql主从复制的原理/整体流程级联复制架构为什么好?Mysql主从复制注意

java -jar命令运行 jar包时运行外部依赖jar包的场景分析

《java-jar命令运行jar包时运行外部依赖jar包的场景分析》:本文主要介绍java-jar命令运行jar包时运行外部依赖jar包的场景分析,本文给大家介绍的非常详细,对大家的学习或工作... 目录Java -jar命令运行 jar包时如何运行外部依赖jar包场景:解决:方法一、启动参数添加: -Xb

Apache 高级配置实战之从连接保持到日志分析的完整指南

《Apache高级配置实战之从连接保持到日志分析的完整指南》本文带你从连接保持优化开始,一路走到访问控制和日志管理,最后用AWStats来分析网站数据,对Apache配置日志分析相关知识感兴趣的朋友... 目录Apache 高级配置实战:从连接保持到日志分析的完整指南前言 一、Apache 连接保持 - 性

Linux中的more 和 less区别对比分析

《Linux中的more和less区别对比分析》在Linux/Unix系统中,more和less都是用于分页查看文本文件的命令,但less是more的增强版,功能更强大,:本文主要介绍Linu... 目录1. 基础功能对比2. 常用操作对比less 的操作3. 实际使用示例4. 为什么推荐 less?5.

spring-gateway filters添加自定义过滤器实现流程分析(可插拔)

《spring-gatewayfilters添加自定义过滤器实现流程分析(可插拔)》:本文主要介绍spring-gatewayfilters添加自定义过滤器实现流程分析(可插拔),本文通过实例图... 目录需求背景需求拆解设计流程及作用域逻辑处理代码逻辑需求背景公司要求,通过公司网络代理访问的请求需要做请