Zookeeper源码学习记录
知识点记录:
static class PurgeTask extends TimerTask { private File logsDir; private File snapsDir; private int snapRetainCount; public PurgeTask(File dataDir, File snapDir, int count) { logsDir = dataDir; snapsDir = snapDir; snapRetainCount = count; } @Override public void run() { LOG.info("Purge task started."); try { PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount); } catch (Exception e) { LOG.error("Error occurred while purging.", e); } LOG.info("Purge task completed."); } }
Properties类的使用
集群模式启动:runFromConfig(config);
单机模式启动:ZooKeeperServerMain.main(args);
https://blog.csdn.net/jpf254/article/details/80787018
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
startServerCnxnFactory();
public ZooKeeperThread(String threadName) { super(threadName); setUncaughtExceptionHandler(uncaughtExceptionalHandler); }
This class manages the quorum protocol.
//构建选票,并确定选举算法
真正的选举过程是FastLeaderElection.java中的lookForLeader();
选票比较规则:
/* * We return true if one of the following three cases hold: * 1- New epoch is higher * 2- New epoch is the same as current epoch, but new zxid is higher * 3- New epoch is the same as current epoch, new zxid is the same as current zxid, but server id is higher. */
/** * Check if a pair (server id, zxid) succeeds our current vote. * 比较选票的大小 */ protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug( "id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}", newId, curId, Long.toHexString(newZxid), Long.toHexString(curZxid)); if (self.getQuorumVerifier().getWeight(newId) == 0) { return false; } /* * We return true if one of the following three cases hold: * 1- New epoch is higher * 2- New epoch is the same as current epoch, but new zxid is higher * 3- New epoch is the same as current epoch, new zxid is the same as current zxid, but server id is higher. */ return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }
在public void run() {} 中的
try { /* * Main loop */ while (running) { switch (getPeerState()) { case LOOKING: LOG.info("LOOKING"); ServerMetrics.getMetrics().LOOKING_COUNT.add(1); if (Boolean.getBoolean("readonlymode.enabled")) { LOG.info("Attempting to start ReadOnlyZooKeeperServer"); // Create read-only server but don't start it immediately final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb); // Instead of starting roZk immediately, wait some grace // period before we decide we're partitioned. // // Thread is used here because otherwise it would require // changes in each of election strategy classes which is // unnecessary code coupling. Thread roZkMgr = new Thread() { public void run() { try { // lower-bound grace period to 2 secs sleep(Math.max(2000, tickTime)); if (ServerState.LOOKING.equals(getPeerState())) { roZk.startup(); } } catch (InterruptedException e) { LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started"); } catch (Exception e) { LOG.error("FAILED to start ReadOnlyZooKeeperServer", e); } } }; try { roZkMgr.start(); reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } finally { // If the thread is in the the grace period, interrupt // to come out of waiting. roZkMgr.interrupt(); roZk.shutdown(); } } else { try { reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } } break; case OBSERVING: try { LOG.info("OBSERVING"); setObserver(makeObserver(logFactory)); observer.observeLeader(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { observer.shutdown(); setObserver(null); updateServerState(); // Add delay jitter before we switch to LOOKING // state to reduce the load of ObserverMaster if (isRunning()) { Observer.waitForObserverElectionDelay(); } } break; case FOLLOWING: try { LOG.info("FOLLOWING"); setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { follower.shutdown(); setFollower(null); updateServerState(); } break; case LEADING: LOG.info("LEADING"); try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } updateServerState(); } break; } } } finally { LOG.warn("QuorumPeer main thread exited"); MBeanRegistry instance = MBeanRegistry.getInstance(); instance.unregister(jmxQuorumBean); instance.unregister(jmxLocalPeerBean); for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) { instance.unregister(remotePeerBean); } jmxQuorumBean = null; jmxLocalPeerBean = null; jmxRemotePeerBean = null; }