ZooKeeperServer实现了单机版ZooKeeper服务端功能,其子类在此基础上实现了更加丰富的分布式集群功能:
ZooKeeperServer
|-- QuorumZooKeeperServer
|   |-- LeaderZooKeeperServer
|   |-- LearnerZooKeeperServer
|       |-- FollowerZooKeeperServer
|       |-- ObserverZooKeeperServer
|-- ReadOnlyZooKeeperServer
// tickTime参数默认值 public static final int DEFAULT_TICK_TIME = 3000; protected int tickTime = DEFAULT_TICK_TIME; // 默认tickTime * 2 protected int minSessionTimeout = -1; // 默认tickTime * 20 protected int maxSessionTimeout = -1; // 会话跟踪 protected SessionTracker sessionTracker; // 存储组件 private FileTxnSnapLog txnLogFactory = null; private ZKDatabase zkDb; // 缓存数据 private ResponseCache readResponseCache; private ResponseCache getChildrenResponseCache; // zxid会在启动阶段设置为最新lastZxid private final AtomicLong hzxid = new AtomicLong(0); // 请求处理器链入口 protected RequestProcessor firstProcessor; // 缓存变化的数据 final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>(); final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>(); protected ServerCnxnFactory serverCnxnFactory; protected ServerCnxnFactory secureServerCnxnFactory; // 大请求判断使用的参数 private volatile int largeRequestMaxBytes = 100 * 1024 * 1024; private volatile int largeRequestThreshold = -1;
// Outline of ZooKeeperServer methods (signatures only; bodies omitted).

// Removes the connection's watchers from the DataTree, via zkDb.
void removeCnxn(ServerCnxn cnxn);
// Creates zkDb (when it is null), then calls loadData() to load it.
public void startdata() throws IOException, InterruptedException;
// Loads data, cleans up dead sessions and takes a snapshot.
public void loadData() throws IOException, InterruptedException;
// Saves a snapshot of zkDb's current state.
public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere, boolean fastForwardFromEdits) throws IOException;
// Parses data from the given input stream and builds a new zkDb and SessionTracker from it.
public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException;
// Truncates persisted data back to the given zxid via zkDb.truncateLog(zxid).
public void truncateLog(long zxid) throws IOException;
// Returns dataTree.lastProcessedZxid, read through zkDb.
public long getLastProcessedZxid();
// Closes a session by submitting a closeSession Request.
private void close(long sessionId);
// Kills a session through zkDb.
protected void killSession(long sessionId, long zxid);
// Starts the server components.
private void startupWithServerState(State state);
// Builds the RequestProcessor chain that handles requests.
protected void setupRequestProcessors();
// Creates the SessionTracker.
protected void createSessionTracker();
// Generates a password for the given session.
byte[] generatePasswd(long id);
// Verifies a session password.
protected boolean checkPasswd(long sessionId, byte[] passwd);
// Creates a session via sessionTracker, generates its password and submits a createSession request.
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout);
// Binds an owner to the given session.
public void setOwner(long id, Object owner) throws SessionExpiredException;
// Validates the session, then calls finishSessionInit to either keep communicating or close the connection.
protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException;
public void finishSessionInit(ServerCnxn cnxn, boolean valid);
// checkPasswd -> revalidateSession -> finishSessionInit
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException;
// Hands the request to requestThrottler, which later calls submitRequestNow for it.
public void enqueueRequest(Request si);
// Processes the request with firstProcessor.
public void submitRequestNow(Request si);
// Handles a connect request; called from the network I/O layer.
public void processConnectRequest(
    ServerCnxn cnxn, ConnectRequest request) throws
    IOException, ClientCnxnLimitException;
// Handles a regular client request; called from the network I/O layer.
public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException;
// SASL authentication.
private void processSasl(
    RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException;
// Transaction processing.
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn);
public ProcessTxnResult processTxn(Request request);
private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn);
private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest);
// Grants or denies authorization for an operation on a node.
public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids, String path, List<ACL> setAcls) throws KeeperException.NoAuthException;
// Checks whether an operation on a path would exceed its quota.
public void checkQuota(String path, byte[] lastData, byte[] data, int type) throws KeeperException.QuotaExceededException;
private void checkQuota(String lastPrefix, long bytesDiff, long countDiff, String namespace) throws KeeperException.QuotaExceededException;
// Returns the parent path of the given path.
private String parentPath(String path) throws KeeperException.BadArgumentsException;
// Determines the path whose ACL applies to the Request.
private String effectiveACLPath(
    Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException;
// Determines the permission type the Request needs.
private int effectiveACLPerms(Request request);
// Checks write permission.
public boolean authWriteRequest(Request request);
加载数据、清理session、生成快照:
public void loadData() throws IOException, InterruptedException { // 初始化zxid if (zkDb.isInitialized()) { setZxid(zkDb.getDataTreeLastProcessedZxid()); } else { setZxid(zkDb.loadDataBase()); } // 使用killSession方法杀死过期会话 zkDb.getSessions().stream() .filter(session -> zkDb.getSessionWithTimeOuts().get(session) == null) .forEach(session -> killSession(session, zkDb.getDataTreeLastProcessedZxid())); // 保存快照 // txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap) takeSnapshot(); }
protected void killSession(long sessionId, long zxid) { // 需要清理临时节点 zkDb.killSession(sessionId, zxid); if (sessionTracker != null) { // 删除会话跟踪信息 sessionTracker.removeSession(sessionId); } }
/**
 * Starts the server components in a fixed order, then switches the server to {@code state}.
 * NOTE(review): notifyAll() requires the caller to hold this object's monitor;
 * presumably this is only invoked from a synchronized method — confirm.
 */
private void startupWithServerState(State state) {
    // Create the session tracker only if one has not been created yet.
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    // Build the RequestProcessor chain that handles requests.
    setupRequestProcessors();
    // Rate-limiting component; not analyzed in these notes.
    startRequestThrottler();
    registerJMX();
    startJvmPauseMonitor();
    registerMetrics();
    setState(state);
    requestPathMetricsCollector.start();
    localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
    // Wake up any threads waiting on this object's monitor.
    notifyAll();
}
/**
 * Builds the standalone processing chain:
 * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.
 * The chain is wired from tail to head, and each stage is started as soon as it is built.
 */
protected void setupRequestProcessors() {
    final RequestProcessor tail = new FinalRequestProcessor(this);

    final SyncRequestProcessor logStage = new SyncRequestProcessor(this, tail);
    logStage.start();

    final PrepRequestProcessor head = new PrepRequestProcessor(this, logStage);
    firstProcessor = head;
    head.start();
}
RequestProcessor接口:以处理器链的方式处理事务,请求总是按顺序处理。standalone server、follower和leader各自串联了不同的处理器链。请求通过processRequest方法传递给链上的下一个RequestProcessor对象,该方法通常只由单个线程调用。当调用shutdown时,RequestProcessor还应关闭与其关联的其他RequestProcessor对象。
FinalRequestProcessor类:处理与请求相关的事务,并提供查询服务,给客户端发响应,位于RequestProcessor链末尾。
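SyncRequestProcessor类:将请求记录到磁盘,并对请求进行批处理,以便高效地执行IO操作。在日志同步到磁盘之前,请求不会传递给下一个RequestProcessor对象。SyncRequestProcessor用于3种不同的情况:
1. Leader:请求写入磁盘后,转发给AckRequestProcessor(见ProposalRequestProcessor的构造方法);
2. Follower:请求写入磁盘后,转发给SendAckRequestProcessor,由它向leader发送ACK(见FollowerZooKeeperServer的setupRequestProcessors方法);
3. Observer:下游处理器为null,即请求只写入磁盘、不再转发(见ObserverZooKeeperServer的setupRequestProcessors方法)。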
PrepRequestProcessor类:通常位于RequestProcessor链开头,为更新请求关联的事务做设置。
/**
 * Creates the standalone SessionTracker (a SessionTrackerImpl).
 * It is seeded with the session-timeout map held by zkDb and with this server's tickTime.
 */
protected void createSessionTracker() {
    sessionTracker = new SessionTrackerImpl(
            this,
            zkDb.getSessionWithTimeOuts(),
            tickTime,
            createSessionTrackerServerId,
            getZooKeeperServerListener());
}
不同的子类使用了不同的SessionTracker实现类(例如单机版使用SessionTrackerImpl,leader节点使用LeaderSessionTracker)。
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) { if (passwd == null) { passwd = new byte[0]; } // 创建一个session long sessionId = sessionTracker.createSession(timeout); // 生成session密码 Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); // 提交createSession请求,该请求会被RequestProcessor处理 CreateSessionTxn txn = new CreateSessionTxn(timeout); cnxn.setSessionId(sessionId); Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null); submitRequest(si); return sessionId; }
public void submitRequestNow(Request si) { try { touch(si.cnxn); boolean validpacket = Request.isValid(si.type); if (validpacket) { setLocalSessionFlag(si); // 使用firstProcessor处理请求 firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); } } else { // Update request accounting/throttling limits requestFinished(si); new UnimplementedRequestProcessor().processRequest(si); } } catch (MissingSessionException e) { // Update request accounting/throttling limits requestFinished(si); } catch (RequestProcessorException e) { // Update request accounting/throttling limits requestFinished(si); } }
public void processConnectRequest( ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException { long sessionId = request.getSessionId(); // 略 if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { // zxid参数有误 throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD); } int sessionTimeout = request.getTimeOut(); byte[] passwd = request.getPasswd(); int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; } int maxSessionTimeout = getMaxSessionTimeout(); if (sessionTimeout > maxSessionTimeout) { sessionTimeout = maxSessionTimeout; } cnxn.setSessionTimeout(sessionTimeout); // We don't want to receive any packets until we are sure that the session is setup cnxn.disableRecv(); if (sessionId == 0) { // 创建session long id = createSession(cnxn, passwd, sessionTimeout); } else { validateSession(cnxn, sessionId); // do nothing // 关闭旧的ServerCnxn if (serverCnxnFactory != null) { serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); } if (secureServerCnxnFactory != null) { secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); } cnxn.setSessionId(sessionId); // 开启新session reopenSession(cnxn, sessionId, passwd, sessionTimeout); } }
/**
 * Handles a regular (non-connect) client packet; called from the network I/O layer.
 * <ul>
 *   <li>auth packets are authenticated here; that logic is omitted in these notes.</li>
 *   <li>sasl packets are delegated to processSasl.</li>
 *   <li>Everything else, once authentication is enforced, is wrapped in a Request and
 *       submitted to the processor chain.</li>
 * </ul>
 */
public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
    // Count the request as outstanding on the connection (and, per the name, check throttling).
    cnxn.incrOutstandingAndCheckThrottle(h);
    if (h.getType() == OpCode.auth) {
        AuthPacket authPacket = request.readRecord(AuthPacket::new);
        String scheme = authPacket.getScheme();
        ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
        Code authReturn = KeeperException.Code.AUTHFAILED;
        // Authenticate, then continue communicating or close the connection (omitted here);
        // ap and authReturn are used by that omitted code.
        return;
    } else if (h.getType() == OpCode.sasl) {
        processSasl(request, cnxn, h);
    } else {
        if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
            // NOTE(review): presumably enforceAuthentication already handled the failure — confirm.
            return;
        } else {
            Request si = new Request(
                cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
            int length = request.limit();
            if (isLargeRequest(length)) { // large-request check
                checkRequestSizeWhenMessageReceived(length);
                si.setLargeRequestSize(length);
            }
            si.setOwner(ServerCnxn.me);
            // Submit the request; firstProcessor will handle it.
            submitRequest(si);
        }
    }
}
// entry point for quorum/Learner.java
// Applies a txn that has no associated Request: session events first, then the database.
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    processTxnForSessionEvents(null, hdr, txn);
    return processTxnInDB(hdr, txn, null);
}

// entry point for FinalRequestProcessor.java
// Applies the request's txn (if any) to the database and prunes outstandingChanges.
public ProcessTxnResult processTxn(Request request) {
    TxnHeader hdr = request.getHdr();
    processTxnForSessionEvents(request, hdr, request.getTxn());
    // Only write requests carry a txn header.
    final boolean writeRequest = (hdr != null);
    final boolean quorumRequest = request.isQuorum();

    // return fast w/o synchronization when we get a read
    if (!writeRequest && !quorumRequest) {
        return new ProcessTxnResult();
    }
    // Apply the txn and prune pending changes while holding the outstandingChanges monitor.
    synchronized (outstandingChanges) {
        ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());

        // request.hdr is set for write requests, which are the only ones
        // that add to outstandingChanges.
        if (writeRequest) {
            long zxid = hdr.getZxid();
            // Drop every pending change with zxid <= the zxid just applied.
            while (!outstandingChanges.isEmpty()
                    && outstandingChanges.peek().zxid <= zxid) {
                ChangeRecord cr = outstandingChanges.remove();
                ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
                // Only clear the per-path entry if it still points at this exact record
                // (it may have been replaced, e.g. by a later change to the same path).
                if (outstandingChangesForPath.get(cr.path) == cr) {
                    outstandingChangesForPath.remove(cr.path);
                }
            }
        }

        // do not add non quorum packets to the queue.
        if (quorumRequest) {
            getZKDatabase().addCommittedProposal(request);
        }
        return rc;
    }
}

// Applies the session side effects of createSession / closeSession to sessionTracker.
// When request is null, the opcode and session id are read from hdr instead.
// NOTE(review): with request == null, hdr must be non-null or this throws NPE;
// presumably the Learner entry point always passes a header — confirm.
private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
    int opCode = (request == null) ? hdr.getType() : request.type;
    long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;

    if (opCode == OpCode.createSession) {
        if (hdr != null && txn instanceof CreateSessionTxn) {
            CreateSessionTxn cst = (CreateSessionTxn) txn;
            // Add the session to the local session map or global one in zkDB.
            sessionTracker.commitSession(sessionId, cst.getTimeOut());
        }
    } else if (opCode == OpCode.closeSession) {
        sessionTracker.removeSession(sessionId);
    }
}

// Applies the txn to the ZK database; a request without a header yields an empty result.
private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) {
    if (hdr == null) {
        return new ProcessTxnResult();
    } else {
        return getZKDatabase().processTxn(hdr, txn, digest);
    }
}
QuorumZooKeeperServer:集群模式下的ZooKeeperServer基类。
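Just like the standard ZooKeeperServer. We just replace the request processors: PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor
(注:这是leader节点的处理器链;在下面的setupRequestProcessors代码中,PrepRequestProcessor之前还有一个LeaderRequestProcessor作为firstProcessor。)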
集群模式下leader节点使用的ZooKeeperServer实现类:
继承QuorumZooKeeperServer
使用的RequestProcessor与父类不同:
// 构建处理器链 protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); commitProcessor = new CommitProcessor( toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); commitProcessor.start(); ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); proposalProcessor.initialize(); prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor); prepRequestProcessor.start(); firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor); setupContainerManager(); // 启动ContainerManager用于删除ttl节点和container节点 }
使用LeaderSessionTracker做会话追踪
与learner节点通信
FinalRequestProcessor - 处理与请求相关的事务,并提供查询服务,给客户端发响应,位于RequestProcessor链末尾
ToBeAppliedRequestProcessor - 维护toBeApplied列表
CommitProcessor - 等待commit完成之后调用下游RequestProcessor处理器
ProposalRequestProcessor - 发起proposal并将Request转发给内部的SyncRequestProcessor和AckRequestProcessor
/**
 * Creates the leader's proposal stage.
 * <p>Besides forwarding to {@code nextProcessor}, this stage owns an internal pipeline:
 * a SyncRequestProcessor whose downstream is an AckRequestProcessor bound to the leader.
 *
 * @param zks           the leader server
 * @param nextProcessor the next stage in the main chain
 */
public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
    this.zks = zks;
    this.nextProcessor = nextProcessor;
    // Internal pipeline: log to disk, then hand over to the leader's ack processor.
    syncProcessor = new SyncRequestProcessor(zks, new AckRequestProcessor(zks.getLeader()));
    // True only if the system property with this name is set to "true".
    forwardLearnerRequestsToCommitProcessorDisabled =
            Boolean.getBoolean(FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED);
}
PrepRequestProcessor - 通常位于RequestProcessor链开头,为更新请求关联的事务做设置
LeaderRequestProcessor - 负责执行本地会话升级,只有直接提交给leader的Request才能通过这个处理器
Learner基类:
FollowerZooKeeperServer:与ZooKeeperServer类似,只是处理器链不同:
FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
此外还会单独启动一个SyncRequestProcessor(下游为SendAckRequestProcessor),用来把leader的提案记录到磁盘。
setupRequestProcessors方法:
/**
 * Builds the follower's processing chain:
 * FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor.
 * <p>Also builds a separate SyncRequestProcessor that logs the leader's proposals.
 * Its downstream is a SendAckRequestProcessor for the follower.
 */
protected void setupRequestProcessors() {
    final RequestProcessor tail = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(
            tail, Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();

    final FollowerRequestProcessor head = new FollowerRequestProcessor(this, commitProcessor);
    firstProcessor = head;
    head.start();

    // Side pipeline for proposals coming from the leader.
    syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
    syncProcessor.start();
}
ObserverZooKeeperServer:Observer类型节点的ZooKeeperServer实现。
setupRequestProcessors方法:
protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); commitProcessor = new CommitProcessor( finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); firstProcessor = new ObserverRequestProcessor(this, commitProcessor); ((ObserverRequestProcessor) firstProcessor).start(); // 默认false if (syncRequestProcessorEnabled) { syncProcessor = new SyncRequestProcessor(this, null); syncProcessor.start(); } }