当选举结束后,每个节点都需要根据自己的角色更新自己的状态。选举出的Leader更新自己状态为Leader,其他节点更新自己状态为Follower。
Leader更新状态入口:leader.lead()
Follower更新状态入口:follower.followerLeader()
注意:
(1)follower必须要让leader知道自己的状态:epoch、zxid、sid
必须要找出谁是leader;
发起请求连接leader;
发送自己的信息给leader;
leader接收到信息,必须要返回对应的信息给follower。
(2)当leader得知follower的状态了,就确定需要做何种方式的数据同步DIFF、TRUNC、SNAP
(3)执行数据同步
(4)当leader接收到超过半数follower的ack之后,进入正常工作状态,集群启动完成了
最终总结同步的方式:
(1)DIFF咱两一样,不需要做什么
(2)TRUNC follower的zxid比leader的zxid大,所以Follower要回滚
(3)COMMIT leader的zxid比follower的zxid大,发送Proposal给foloower提交执行
(4)如果follower并没有任何数据,直接使用SNAP的方式来执行数据同步(直接把数据全部序列到follower)
1)在Leader.java种查找lead()方法
void lead() throws IOException, InterruptedException { self.end_fle = Time.currentElapsedTime(); long electionTimeTaken = self.end_fle - self.start_fle; self.setElectionTimeTaken(electionTimeTaken); LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT); self.start_fle = 0; self.end_fle = 0; zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean); try { self.tick.set(0); // 恢复数据到内存,启动时,其实已经加载过了 zk.loadData(); leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid()); // Start thread that waits for connection requests from // new followers. // 等待其他follower节点向leader节点发送同步状态 cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start(); long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch()); … … } finally { zk.unregisterJMX(this); } }
class LearnerCnxAcceptor extends ZooKeeperCriticalThread { private volatile boolean stop = false; public LearnerCnxAcceptor() { super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk .getZooKeeperServerListener()); } @Override public void run() { try { while (!stop) { Socket s = null; boolean error = false; try { // 等待接收follower的状态同步申请 s = ss.accept(); // start with the initLimit, once the ack is processed // in LearnerHandler switch to the syncLimit s.setSoTimeout(self.tickTime * self.initLimit); s.setTcpNoDelay(nodelay); BufferedInputStream is = new BufferedInputStream( s.getInputStream()); // 一旦接收到follower的请求,就创建LearnerHandler对象,处理请求 LearnerHandler fh = new LearnerHandler(s, is, Leader.this); // 启动线程 fh.start(); } catch (SocketException e) { ... ... } ... ... } } catch (Exception e) { LOG.warn("Exception while accepting follower", e.getMessage()); handleException(this.getName(), e); } } public void halt() { stop = true; } }
其中ss的初始化是在创建Leader对象时,创建的socket
private final ServerSocket ss; Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException { this.self = self; this.proposalStats = new BufferStats(); try { if (self.shouldUsePortUnification() || self.isSslQuorum()) { boolean allowInsecureConnection = self.shouldUsePortUnification(); if (self.getQuorumListenOnAllIPs()) { ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, self.getQuorumAddress().getPort()); } else { ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection); } } else { if (self.getQuorumListenOnAllIPs()) { ss = new ServerSocket(self.getQuorumAddress().getPort()); } else { ss = new ServerSocket(); } } ss.setReuseAddress(true); if (!self.getQuorumListenOnAllIPs()) { ss.bind(self.getQuorumAddress()); } } catch (BindException e) { ... ... } this.zk = zk; this.learnerSnapshotThrottler = createLearnerSnapshotThrottler( maxConcurrentSnapshots, maxConcurrentSnapshotTimeout); }
1)在Follower.java种查找followLeader()方法
void followLeader() throws InterruptedException { self.end_fle = Time.currentElapsedTime(); long electionTimeTaken = self.end_fle - self.start_fle; self.setElectionTimeTaken(electionTimeTaken); LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT); self.start_fle = 0; self.end_fle = 0; fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean); try { // 查找leader QuorumServer leaderServer = findLeader(); try { // 连接leader connectToLeader(leaderServer.addr, leaderServer.hostname); // 向leader注册 long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); if (self.isReconfigStateChange()) throw new Exception("learned about role change"); //check to see if the leader zxid is lower than ours //this should never happen but is just a safety check long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid); if (newEpoch < self.getAcceptedEpoch()) { LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid) + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch())); throw new IOException("Error: Epoch of leader is lower"); } syncWithLeader(newEpochZxid); QuorumPacket qp = new QuorumPacket(); while (this.isRunning()) { readPacket(qp); processPacket(qp); } } catch (Exception e) { LOG.warn("Exception when following the leader", e); try { sock.close(); } catch (IOException e1) { e1.printStackTrace(); } // clear pending revalidations pendingRevalidations.clear(); } } finally { zk.unregisterJMX((Learner)this); } } / * Returns the address of the node we think is the leader. */ protected QuorumServer findLeader() { QuorumServer leaderServer = null; // Find the leader by id // 选举投票的时候记录的,最后推荐的leader的sid Vote current = self.getCurrentVote(); // 如果这个sid在启动的所有服务器范围中 for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { // Ensure we have the leader's correct IP address before // attempting to connect. // 尝试连接leader的正确IP地址 s.recreateSocketAddresses(); leaderServer = s; break; } } if (leaderServer == null) { LOG.warn("Couldn't find the leader with id = " + current.getId()); } return leaderServer; } protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception { this.sock = createSocket(); int initLimitTime = self.tickTime * self.initLimit; int remainingInitLimitTime = initLimitTime; long startNanoTime = nanoTime(); for (int tries = 0; tries < 5; tries++) { try { // recalculate the init limit time because retries sleep for 1000 milliseconds remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000); if (remainingInitLimitTime <= 0) { LOG.error("initLimit exceeded on retries."); throw new IOException("initLimit exceeded on retries."); } sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime)); if (self.isSslQuorum()) { ((SSLSocket) sock).startHandshake(); } sock.setTcpNoDelay(nodelay); break; } catch (IOException e) { ... ... } Thread.sleep(1000); } self.authLearner.authenticate(sock, hostname); leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }
void lead() throws IOException, InterruptedException { self.end_fle = Time.currentElapsedTime(); long electionTimeTaken = self.end_fle - self.start_fle; self.setElectionTimeTaken(electionTimeTaken); LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT); self.start_fle = 0; self.end_fle = 0; zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean); try { self.tick.set(0); zk.loadData(); leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid()); // Start thread that waits for connection requests from // new followers. cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start(); ...... } finally { zk.unregisterJMX(this); } } class LearnerCnxAcceptor extends ZooKeeperCriticalThread { private volatile boolean stop = false; ... ... @Override public void run() { try { while (!stop) { Socket s = null; boolean error = false; try { s = ss.accept(); // start with the initLimit, once the ack is processed // in LearnerHandler switch to the syncLimit s.setSoTimeout(self.tickTime * self.initLimit); s.setTcpNoDelay(nodelay); BufferedInputStream is = new BufferedInputStream( s.getInputStream()); LearnerHandler fh = new LearnerHandler(s, is, Leader.this); fh.start(); } catch (SocketException e) { ... ... } } } catch (Exception e) { LOG.warn("Exception while accepting follower", e.getMessage()); handleException(this.getName(), e); } } public void halt() { stop = true; } } 由于public class LearnerHandler extends ZooKeeperThread{},说明LearnerHandler是一个线程。所以fh.start()执行的是LearnerHandler中的run()方法。 public void run() { try { leader.addLearnerHandler(this); // 心跳处理 tickOfNextAckDeadline = leader.self.tick.get() + leader.self.initLimit + leader.self.syncLimit; ia = BinaryInputArchive.getArchive(bufferedInput); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); oa = BinaryOutputArchive.getArchive(bufferedOutput); // 从网络中接收消息,并反序列化为packet QuorumPacket qp = new QuorumPacket(); ia.readRecord(qp, "packet"); // 选举结束后,observer和follower都应该给leader发送一个标志信息:FOLLOWERINFO或者OBSERVERINFO if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){ LOG.error("First packet " + qp.toString() + " is not FOLLOWERINFO or OBSERVERINFO!"); return; } byte learnerInfoData[] = qp.getData(); if (learnerInfoData != null) { ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData); if (learnerInfoData.length >= 8) { this.sid = bbsid.getLong(); } if (learnerInfoData.length >= 12) { this.version = bbsid.getInt(); // protocolVersion } if (learnerInfoData.length >= 20) { long configVersion = bbsid.getLong(); if (configVersion > leader.self.getQuorumVerifier().getVersion()) { throw new IOException("Follower is ahead of the leader (has a later activated configuration)"); } } } else { this.sid = leader.followerCounter.getAndDecrement(); } if (leader.self.getView().containsKey(this.sid)) { LOG.info("Follower sid: " + this.sid + " : info : " + leader.self.getView().get(this.sid).toString()); } else { LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion())); } if (qp.getType() == Leader.OBSERVERINFO) { learnerType = LearnerType.OBSERVER; } // 读取Follower发送过来的lastAcceptedEpoch // 选举过程中,所使用的epoch,其实还是上一任leader的epoch long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); long peerLastZxid; StateSummary ss = null; // 读取follower发送过来的zxid long zxid = qp.getZxid(); // Leader根据从Follower获取sid和旧的epoch,构建新的epoch long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0); if (this.getVersion() < 0x10000) { // we are going to have to extrapolate the epoch information long epoch = ZxidUtils.getEpochFromZxid(zxid); ss = new StateSummary(epoch, zxid); // fake the message leader.waitForEpochAck(this.getSid(), ss); } else { byte ver[] = new byte[4]; ByteBuffer.wrap(ver).putInt(0x10000); // Leader向Follower发送信息(包含:zxid和newEpoch) QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); oa.writeRecord(newEpochPacket, "packet"); bufferedOutput.flush(); QuorumPacket ackEpochPacket = new QuorumPacket(); ia.readRecord(ackEpochPacket, "packet"); if (ackEpochPacket.getType() != Leader.ACKEPOCH) { LOG.error(ackEpochPacket.toString() + " is not ACKEPOCH"); return; } ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData()); ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid()); leader.waitForEpochAck(this.getSid(), ss); } peerLastZxid = ss.getLastZxid(); // Take any necessary action if we need to send TRUNC or DIFF // startForwarding() will be called in all cases boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader); /* if we are not truncating or sending a diff just send a snapshot */ if (needSnap) { boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER; LearnerSnapshot snapshot = leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle); try { long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid(); oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet"); bufferedOutput.flush(); LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, " + "send zxid of db as 0x{}, {} concurrent snapshots, " + "snapshot was {} from throttle", Long.toHexString(peerLastZxid), Long.toHexString(leaderLastZxid), Long.toHexString(zxidToSend), snapshot.getConcurrentSnapshotNumber(), snapshot.isEssential() ? "exempt" : "not exempt"); // Dump data to peer leader.zk.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); bufferedOutput.flush(); } finally { snapshot.close(); } } LOG.debug("Sending NEWLEADER message to " + sid); // the version of this quorumVerifier will be set by leader.lead() in case // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if // we got here, so the version was set if (getVersion() < 0x10000) { QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null); oa.writeRecord(newLeaderQP, "packet"); } else { QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, leader.self.getLastSeenQuorumVerifier() .toString().getBytes(), null); queuedPackets.add(newLeaderQP); } bufferedOutput.flush(); // Start thread that blast packets in the queue to learner startSendingPackets(); /* * Have to wait for the first ACK, wait until * the leader is ready, and only then we can * start processing messages. */ qp = new QuorumPacket(); ia.readRecord(qp, "packet"); if(qp.getType() != Leader.ACK){ LOG.error("Next packet was supposed to be an ACK," + " but received packet: {}", packetToString(qp)); return; } if(LOG.isDebugEnabled()){ LOG.debug("Received NEWLEADER-ACK message from " + sid); } leader.waitForNewLeaderAck(getSid(), qp.getZxid()); syncLimitCheck.start(); // now that the ack has been processed expect the syncLimit sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit); /* * Wait until leader starts up */ synchronized(leader.zk){ while(!leader.zk.isRunning() && !this.isInterrupted()){ leader.zk.wait(20); } } // Mutation packets will be queued during the serialize, // so we need to mark when the peer can actually start // using the data // LOG.debug("Sending UPTODATE message to " + sid); queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); while (true) { qp = new QuorumPacket(); ia.readRecord(qp, "packet"); long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK; if (qp.getType() == Leader.PING) { traceMask = ZooTrace.SERVER_PING_TRACE_MASK; } if (LOG.isTraceEnabled()) { ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp); } tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit; ByteBuffer bb; long sessionId; int cxid; int type; switch (qp.getType()) { case Leader.ACK: if (this.learnerType == LearnerType.OBSERVER) { if (LOG.isDebugEnabled()) { LOG.debug("Received ACK from Observer " + this.sid); } } syncLimitCheck.updateAck(qp.getZxid()); leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); break; case Leader.PING: // Process the touches ByteArrayInputStream bis = new ByteArrayInputStream(qp .getData()); DataInputStream dis = new DataInputStream(bis); while (dis.available() > 0) { long sess = dis.readLong(); int to = dis.readInt(); leader.zk.touch(sess, to); } break; case Leader.REVALIDATE: bis = new ByteArrayInputStream(qp.getData()); dis = new DataInputStream(bis); long id = dis.readLong(); int to = dis.readInt(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); dos.writeLong(id); boolean valid = leader.zk.checkIfValidGlobalSession(id, to); if (valid) { try { //set the session owner // as the follower that // owns the session leader.zk.setOwner(id, this); } catch (SessionExpiredException e) { LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e); } } if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "Session 0x" + Long.toHexString(id) + " is valid: "+ valid); } dos.writeBoolean(valid); qp.setData(bos.toByteArray()); queuedPackets.add(qp); break; case Leader.REQUEST: bb = ByteBuffer.wrap(qp.getData()); sessionId = bb.getLong(); cxid = bb.getInt(); type = bb.getInt(); bb = bb.slice(); Request si; if(type == OpCode.sync){ si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo()); } else { si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo()); } si.setOwner(this); leader.zk.submitLearnerRequest(si); break; default: LOG.warn("unexpected quorum packet, type: {}", packetToString(qp)); break; } } } catch (IOException e) { ... ... } finally { ... ... } }
void followLeader() throws InterruptedException { self.end_fle = Time.currentElapsedTime(); long electionTimeTaken = self.end_fle - self.start_fle; self.setElectionTimeTaken(electionTimeTaken); LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT); self.start_fle = 0; self.end_fle = 0; fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean); try { // 查找leader QuorumServer leaderServer = findLeader(); try { // 连接leader connectToLeader(leaderServer.addr, leaderServer.hostname); // 向leader注册 long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); if (self.isReconfigStateChange()) throw new Exception("learned about role change"); //check to see if the leader zxid is lower than ours //this should never happen but is just a safety check long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid); if (newEpoch < self.getAcceptedEpoch()) { LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid) + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch())); throw new IOException("Error: Epoch of leader is lower"); } syncWithLeader(newEpochZxid); QuorumPacket qp = new QuorumPacket(); // 循环等待接收消息 while (this.isRunning()) { // 读取packet信息 readPacket(qp); // 处理packet消息 processPacket(qp); } } catch (Exception e) { LOG.warn("Exception when following the leader", e); try { sock.close(); } catch (IOException e1) { e1.printStackTrace(); } // clear pending revalidations pendingRevalidations.clear(); } } finally { zk.unregisterJMX((Learner)this); } } protected long registerWithLeader(int pktType) throws IOException{ /* * Send follower info, including last zxid and sid */ long lastLoggedZxid = self.getLastLoggedZxid(); QuorumPacket qp = new QuorumPacket(); qp.setType(pktType); qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0)); /* * Add sid to payload */ LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion()); ByteArrayOutputStream bsid = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); boa.writeRecord(li, "LearnerInfo"); qp.setData(bsid.toByteArray()); // 发送FollowerInfo给Leader writePacket(qp, true); // 读取Leader返回的结果:LeaderInfo readPacket(qp); final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); // 如果接收到LeaderInfo if (qp.getType() == Leader.LEADERINFO) { // we are connected to a 1.0 server so accept the new epoch and read the next packet leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt(); byte epochBytes[] = new byte[4]; final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes); // 接收leader的epoch if (newEpoch > self.getAcceptedEpoch()) { // 把自己原来的epoch保存在wrappedEpochBytes里 wrappedEpochBytes.putInt((int)self.getCurrentEpoch()); // 把leader发送过来的epoch保存起来 self.setAcceptedEpoch(newEpoch); } else if (newEpoch == self.getAcceptedEpoch()) { // since we have already acked an epoch equal to the leaders, we cannot ack // again, but we still need to send our lastZxid to the leader so that we can // sync with it if it does assume leadership of the epoch. // the -1 indicates that this reply should not count as an ack for the new epoch wrappedEpochBytes.putInt(-1); } else { throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch()); } // 发送ackepoch给leader(包含了自己的:epoch和zxid) QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null); writePacket(ackNewEpoch, true); return ZxidUtils.makeZxid(newEpoch, 0); } else { if (newEpoch > self.getAcceptedEpoch()) { self.setAcceptedEpoch(newEpoch); } if (qp.getType() != Leader.NEWLEADER) { LOG.error("First packet should have been NEWLEADER"); throw new IOException("First packet should have been NEWLEADER"); } return qp.getZxid(); } }
public void run() { try { leader.addLearnerHandler(this); // 心跳处理 tickOfNextAckDeadline = leader.self.tick.get() + leader.self.initLimit + leader.self.syncLimit; ia = BinaryInputArchive.getArchive(bufferedInput); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); oa = BinaryOutputArchive.getArchive(bufferedOutput); // 从网络中接收消息,并反序列化为packet QuorumPacket qp = new QuorumPacket(); ia.readRecord(qp, "packet"); // 选举结束后,observer和follower都应该给leader发送一个标志信息:FOLLOWERINFO 或者OBSERVERINFO if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){ LOG.error("First packet " + qp.toString() + " is not FOLLOWERINFO or OBSERVERINFO!"); return; } byte learnerInfoData[] = qp.getData(); if (learnerInfoData != null) { ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData); if (learnerInfoData.length >= 8) { this.sid = bbsid.getLong(); } if (learnerInfoData.length >= 12) { this.version = bbsid.getInt(); // protocolVersion } if (learnerInfoData.length >= 20) { long configVersion = bbsid.getLong(); if (configVersion > leader.self.getQuorumVerifier().getVersion()) { throw new IOException("Follower is ahead of the leader (has a later activated configuration)"); } } } else { this.sid = leader.followerCounter.getAndDecrement(); } if (leader.self.getView().containsKey(this.sid)) { LOG.info("Follower sid: " + this.sid + " : info : " + leader.self.getView().get(this.sid).toString()); } else { LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion())); } if (qp.getType() == Leader.OBSERVERINFO) { learnerType = LearnerType.OBSERVER; } // 读取Follower发送过来的lastAcceptedEpoch // 选举过程中,所使用的epoch,其实还是上一任leader的epoch long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); long peerLastZxid; StateSummary ss = null; // 读取follower发送过来的zxid long zxid = qp.getZxid(); // 获取leader的最新epoch // 新的leader会构建一个新的epoch long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0); if (this.getVersion() < 0x10000) { // we are going to have to extrapolate the epoch information long epoch = ZxidUtils.getEpochFromZxid(zxid); ss = new StateSummary(epoch, zxid); // fake the message leader.waitForEpochAck(this.getSid(), ss); } else { byte ver[] = new byte[4]; ByteBuffer.wrap(ver).putInt(0x10000); // Leader向Follower发送信息(包含:zxid和newEpoch) QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); oa.writeRecord(newEpochPacket, "packet"); bufferedOutput.flush(); // 接收到Follower应答的ackepoch QuorumPacket ackEpochPacket = new QuorumPacket(); ia.readRecord(ackEpochPacket, "packet"); if (ackEpochPacket.getType() != Leader.ACKEPOCH) { LOG.error(ackEpochPacket.toString() + " is not ACKEPOCH"); return; } ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData()); // 保存了对方follower或者observer的状态:epoch和zxid ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid()); leader.waitForEpochAck(this.getSid(), ss); } peerLastZxid = ss.getLastZxid(); // Take any necessary action if we need to send TRUNC or DIFF // startForwarding() will be called in all cases // 方法判断Leader和Follower是否需要同步 boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader); /* if we are not truncating or sending a diff just send a snapshot */ if (needSnap) { boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER; LearnerSnapshot snapshot = leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle); try { long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid(); oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet"); bufferedOutput.flush(); … … // Dump data to peer leader.zk.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); bufferedOutput.flush(); } finally { snapshot.close(); } } if (getVersion() < 0x10000) { QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null); oa.writeRecord(newLeaderQP, "packet"); } else { QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, leader.self.getLastSeenQuorumVerifier() .toString().getBytes(), null); queuedPackets.add(newLeaderQP); } … … while (true) { … … } } catch (IOException e) { ... ... } finally { ... ... } } public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) { /* * When leader election is completed, the leader will set its * lastProcessedZxid to be (epoch < 32). There will be no txn associated * with this zxid. * * The learner will set its lastProcessedZxid to the same value if * it get DIFF or SNAP from the leader. If the same learner come * back to sync with leader using this zxid, we will never find this * zxid in our history. In this case, we will ignore TRUNC logic and * always send DIFF if we have old enough history */ boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0; // Keep track of the latest zxid which already queued long currentZxid = peerLastZxid; boolean needSnap = true; boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled(); ReentrantReadWriteLock lock = db.getLogLock(); ReadLock rl = lock.readLock(); try { rl.lock(); long maxCommittedLog = db.getmaxCommittedLog(); long minCommittedLog = db.getminCommittedLog(); long lastProcessedZxid = db.getDataTreeLastProcessedZxid(); LOG.info("Synchronizing with Follower sid: {} maxCommittedLog=0x{}" + " minCommittedLog=0x{} lastProcessedZxid=0x{}" + " peerLastZxid=0x{}", getSid(), Long.toHexString(maxCommittedLog), Long.toHexString(minCommittedLog), Long.toHexString(lastProcessedZxid), Long.toHexString(peerLastZxid)); if (db.getCommittedLog().isEmpty()) { /* * It is possible that committedLog is empty. In that case * setting these value to the latest txn in leader db * will reduce the case that we need to handle * * Here is how each case handle by the if block below * 1. lastProcessZxid == peerZxid -> Handle by (2) * 2. lastProcessZxid < peerZxid -> Handle by (3) * 3. lastProcessZxid > peerZxid -> Handle by (5) */ minCommittedLog = lastProcessedZxid; maxCommittedLog = lastProcessedZxid; } /* * Here are the cases that we want to handle * * 1. Force sending snapshot (for testing purpose) * 2. Peer and leader is already sync, send empty diff * 3. Follower has txn that we haven't seen. This may be old leader * so we need to send TRUNC. However, if peer has newEpochZxid, * we cannot send TRUNC since the follower has no txnlog * 4. Follower is within committedLog range or already in-sync. * We may need to send DIFF or TRUNC depending on follower's zxid * We always send empty DIFF if follower is already in-sync * 5. Follower missed the committedLog. We will try to use on-disk * txnlog + committedLog to sync with follower. If that fail, * we will send snapshot */ if (forceSnapSync) { // Force leader to use snapshot to sync with follower LOG.warn("Forcing snapshot sync - should not see this in production"); } else if (lastProcessedZxid == peerLastZxid) { // Follower is already sync with us, send empty diff LOG.info("Sending DIFF zxid=0x" + Long.toHexString(peerLastZxid) + " for peer sid: " + getSid()); queueOpPacket(Leader.DIFF, peerLastZxid); needOpPacket = false; needSnap = false; } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) { // Newer than committedLog, send trunc and done LOG.debug("Sending TRUNC to follower zxidToSend=0x" + Long.toHexString(maxCommittedLog) + " for peer sid:" + getSid()); queueOpPacket(Leader.TRUNC, maxCommittedLog); currentZxid = maxCommittedLog; needOpPacket = false; needSnap = false; } else if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) { // Follower is within commitLog range LOG.info("Using committedLog for peer sid: " + getSid()); Iterator<Proposal> itr = db.getCommittedLog().iterator(); currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog); needSnap = false; } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) { // Use txnlog and committedLog to sync // Calculate sizeLimit that we allow to retrieve txnlog from disk long sizeLimit = db.calculateTxnLogSizeLimit(); // This method can return empty iterator if the requested zxid // is older than on-disk txnlog Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog( peerLastZxid, sizeLimit); if (txnLogItr.hasNext()) { LOG.info("Use txnlog and committedLog for peer sid: " + getSid()); currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog); LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid)); Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator(); currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog); needSnap = false; } // closing the resources if (txnLogItr instanceof TxnLogProposalIterator) { TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr; txnProposalItr.close(); } } else { LOG.warn("Unhandled scenario for peer sid: " + getSid()); } LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) + " for peer sid: " + getSid()); leaderLastZxid = leader.startForwarding(this, currentZxid); } finally { rl.unlock(); } if (needOpPacket && !needSnap) { // This should never happen, but we should fall back to sending // snapshot just in case. LOG.error("Unhandled scenario for peer sid: " + getSid() + " fall back to use snapshot"); needSnap = true; } return needSnap; }
protected void processPacket(QuorumPacket qp) throws Exception{ switch (qp.getType()) { case Leader.PING: ping(qp); break; case Leader.PROPOSAL: TxnHeader hdr = new TxnHeader(); Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr); if (hdr.getZxid() != lastQueued + 1) { LOG.warn("Got zxid 0x" + Long.toHexString(hdr.getZxid()) + " expected 0x" + Long.toHexString(lastQueued + 1)); } lastQueued = hdr.getZxid(); if (hdr.getType() == OpCode.reconfig){ SetDataTxn setDataTxn = (SetDataTxn) txn; QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData())); self.setLastSeenQuorumVerifier(qv, true); } fzk.logRequest(hdr, txn); break; case Leader.COMMIT: fzk.commit(qp.getZxid()); break; … … case Leader.UPTODATE: LOG.error("Received an UPTODATE message after Follower started"); break; case Leader.REVALIDATE: revalidate(qp); break; case Leader.SYNC: fzk.sync(); break; default: LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp)); break; } } public void commit(long zxid) { if (pendingTxns.size() == 0) { LOG.warn("Committing " + Long.toHexString(zxid) + " without seeing txn"); return; } long firstElementZxid = pendingTxns.element().zxid; if (firstElementZxid != zxid) { LOG.error("Committing zxid 0x" + Long.toHexString(zxid) + " but next pending txn 0x" + Long.toHexString(firstElementZxid)); System.exit(12); } Request request = pendingTxns.remove(); commitProcessor.commit(request); }
由于public class LearnerHandler extends ZooKeeperThread{},说明LearnerHandler是一个线程。所以fh.start()执行的是LearnerHandler中的run()方法。
public void run() { … … // LOG.debug("Sending UPTODATE message to " + sid); queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); while (true) { … … } } catch (IOException e) { ... ... } finally { ... ... } }