2021SC@SDUSC
本次继续syncWithLeader的分析
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); sock.setSoTimeout(self.tickTime * self.syncLimit); zk.startup();
以上代码中,follower 向 leader 发送一个 ACK(其 zxid 为 makeZxid(newEpoch, 0)),随后设置 socket 超时并启动 zk。
qp = new QuorumPacket(); ia.readRecord(qp, "packet"); if(qp.getType() != Leader.ACK){ LOG.error("Next packet was supposed to be an ACK"); return; } LOG.info("Received NEWLEADER-ACK message from " + getSid()); leader.waitForNewLeaderAck(getSid(), qp.getZxid());
leader 读取 follower 发送过来的包,校验其类型为 ACK 后,交给 leader.waitForNewLeaderAck 处理:
/**
 * Records a NEWLEADER acknowledgement from a server and blocks the caller
 * until the acknowledgements collected in {@code newLeaderProposal.ackSet}
 * form a quorum, or until the wait times out.
 *
 * @param sid  id of the server that sent the ACK; it is added to the ack set
 *             only if {@code isParticipant(sid)} is true
 * @param zxid zxid carried by the ACK; it must equal the zxid of the
 *             NEWLEADER proposal packet, otherwise the ACK is logged and ignored
 * @throws InterruptedException if the quorum is not formed within
 *             {@code getInitLimit() * getTickTime()} of elapsed time, or if
 *             the thread is interrupted while waiting on the ack set
 */
public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException {
    // The ack set doubles as the monitor: every check, update, wait and
    // notify below happens while holding its lock.
    synchronized (newLeaderProposal.ackSet) {
        // A quorum was already reached by earlier ACKs; nothing to do.
        if (quorumFormed) {
            return;
        }
        long currentZxid = newLeaderProposal.packet.getZxid();
        // An ACK whose zxid differs from the NEWLEADER proposal is rejected:
        // it is logged and the method returns without touching the ack set.
        // NOTE(review): "receieved" in the message below is a typo for
        // "received"; it is runtime text and is left unchanged here.
        if (zxid != currentZxid) {
            LOG.error("NEWLEADER ACK from sid: " + sid
                    + " is from a different epoch - current 0x"
                    + Long.toHexString(currentZxid)
                    + " receieved 0x"
                    + Long.toHexString(zxid));
            return;
        }
        // Only participants are counted towards the quorum.
        if (isParticipant(sid)) {
            newLeaderProposal.ackSet.add(sid);
        }
        if (self.getQuorumVerifier().containsQuorum(
                newLeaderProposal.ackSet)) {
            // This ACK completed the quorum: set the flag and wake every
            // thread currently waiting on the ack set in the branch below.
            quorumFormed = true;
            newLeaderProposal.ackSet.notifyAll();
        } else {
            // No quorum yet: wait on the ack set until another caller sets
            // quorumFormed, or until initLimit * tickTime has elapsed
            // (presumably milliseconds, since the remainder is passed to
            // Object.wait(long) - confirm against Time.currentElapsedTime).
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.getInitLimit() * self.getTickTime();
            // Loop so that an early or spurious wakeup re-checks the flag
            // and waits only for the remaining time.
            while (!quorumFormed && cur < end) {
                newLeaderProposal.ackSet.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            // The deadline passed without a quorum: report the timeout
            // to the caller as an InterruptedException.
            if (!quorumFormed) {
                throw new InterruptedException(
                        "Timeout while waiting for NEWLEADER to be acked by quorum");
            }
        }
    }
}
leader 利用 newLeaderProposal.ackSet.add(sid);
把发送 ACK 的 follower(仅限 participant)加入到 ackSet 集合中;当集合满足法定人数(containsQuorum 返回 true,通常即超过半数)之后,leader 会把 UPTODATE 包加入发送队列:
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
该 UPTODATE 包用于通知 follower 数据同步完毕。