ZooKeeperServer
Ctrl + N 全局查找 Leader,然后 Ctrl + F 查找 lead(
Leader.java
/**
 * Excerpt of the leader's main routine. Only the step relevant to this
 * walkthrough is shown: the call to {@link #startZkServer()}. The
 * surrounding logic is elided ("... ...").
 *
 * @throws IOException          declared by the method; the throwing code is elided here
 * @throws InterruptedException declared by the method; the throwing code is elided here
 */
void lead() throws IOException, InterruptedException {
    ... ...
    // Start the ZooKeeper service
    startZkServer();
    ... ...
}

/** The leader-side server instance that {@link #startZkServer()} starts. */
final LeaderZooKeeperServer zk;

/**
 * Excerpt: starts the leader's ZooKeeper server by calling
 * {@code zk.startup()}. Runs while holding this object's monitor
 * (the method is {@code synchronized}). Other steps are elided.
 */
private synchronized void startZkServer() {
    ... ...
    // Start ZooKeeper
    zk.startup();
    ... ...
}
LeaderZooKeeperServer.java
/**
 * Runs the inherited startup sequence first, then starts the container
 * manager if one has been configured.
 */
@Override
public synchronized void startup() {
    super.startup();

    // Nothing more to do when no container manager is present.
    if (containerManager == null) {
        return;
    }
    containerManager.start();
}
ZooKeeperServer.java
public synchronized void startup() { if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); // setupRequestProcessors(); registerJMX(); setState(State.RUNNING); notifyAll(); } protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); ((SyncRequestProcessor)syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor)firstProcessor).start(); }
点击PrepRequestProcessor,并查找它的run方法
/**
 * Main loop of this processor: repeatedly takes the next request from
 * {@code submittedRequests} and passes it to {@link #pRequest(Request)}.
 * The loop ends when the {@code Request.requestOfDeath} sentinel is taken
 * or when an exception escapes; exceptions are passed to
 * {@code handleException}.
 */
public void run() {
    try {
        while (true) {
            Request request = submittedRequests.take();
            // Pings are traced under their own mask; everything else uses the client-request mask.
            long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
            if (request.type == OpCode.ping) {
                traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
            }
            // Identity comparison against the sentinel request: stop processing.
            if (Request.requestOfDeath == request) {
                break;
            }
            pRequest(request);
        }
    } catch (RequestProcessorException e) {
        // An xid-rollover cause is additionally logged at INFO before the generic handling.
        if (e.getCause() instanceof XidRolloverException) {
            LOG.info(e.getCause().getMessage());
        }
        handleException(this.getName(), e);
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("PrepRequestProcessor exited loop!");
}

/**
 * Prepares a single request and forwards it to {@code nextProcessor}.
 *
 * <p>The request's header and txn are first reset to {@code null}. Depending
 * on {@code request.type}, the request is then either handed to
 * {@code pRequest2Txn} together with a zxid obtained from
 * {@code zks.getNextZxid()}, or (for the read-style op codes listed below)
 * only has its session checked. After the switch, {@code request.zxid} is
 * set from {@code zks.getZxid()} and the request is passed on.
 *
 * <p>NOTE(review): the last boolean passed to {@code pRequest2Txn} is
 * {@code true} for top-level requests and {@code false} for the sub-ops of a
 * multi; presumably it controls whether the record is deserialized from
 * {@code request.request} -- confirm against {@code pRequest2Txn}.
 *
 * @param request the request to prepare; mutated in place (hdr, txn, zxid,
 *                and possibly exception)
 * @throws RequestProcessorException declared by the method; the handlers
 *                                   that may raise it are elided in this excerpt
 */
protected void pRequest(Request request) throws RequestProcessorException {
    // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
    // request.type + " id = 0x" + Long.toHexString(request.sessionId));
    request.setHdr(null);
    request.setTxn(null);

    try {
        switch (request.type) {
        // The three create variants share one empty CreateRequest record.
        case OpCode.createContainer:
        case OpCode.create:
        case OpCode.create2:
            CreateRequest create2Request = new CreateRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
            break;
        case OpCode.createTTL:
            CreateTTLRequest createTtlRequest = new CreateTTLRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
            break;
        case OpCode.deleteContainer:
        case OpCode.delete:
            DeleteRequest deleteRequest = new DeleteRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
            break;
        case OpCode.setData:
            SetDataRequest setDataRequest = new SetDataRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
            break;
        case OpCode.reconfig:
            // Unlike the cases above, the record is populated from the raw request bytes here.
            ReconfigRequest reconfigRequest = new ReconfigRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
            pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
            break;
        case OpCode.setACL:
            SetACLRequest setAclRequest = new SetACLRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
            break;
        case OpCode.check:
            CheckVersionRequest checkRequest = new CheckVersionRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
            break;
        case OpCode.multi:
            MultiTransactionRecord multiRequest = new MultiTransactionRecord();
            try {
                ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
            } catch(IOException e) {
                // Give the request a header (with a fresh zxid) before rethrowing,
                // so the outer handlers see a populated header.
                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                        Time.currentWallTime(), OpCode.multi));
                throw e;
            }
            List<Txn> txns = new ArrayList<Txn>();
            //Each op in a multi-op must have the same zxid!
            long zxid = zks.getNextZxid();
            // First KeeperException hit by any sub-op; non-null means the multi has failed.
            KeeperException ke = null;

            //Store off current pending change records in case we need to rollback
            Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

            for(Op op: multiRequest) {
                Record subrequest = op.toRequestRecord();
                int type;
                Record txn;

                /* If we've already failed one of the ops, don't bother
                 * trying the rest as we know it's going to fail and it
                 * would be confusing in the logfiles.
                 */
                if (ke != null) {
                    type = OpCode.error;
                    txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                }

                /* Prep the request and convert to a Txn */
                else {
                    try {
                        pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                        // pRequest2Txn leaves its result on the shared request; read it back per op.
                        type = request.getHdr().getType();
                        txn = request.getTxn();
                    } catch (KeeperException e) {
                        ke = e;
                        type = OpCode.error;
                        txn = new ErrorTxn(e.code().intValue());

                        if (e.code().intValue() > Code.APIERROR.intValue()) {
                            LOG.info("Got user-level KeeperException when processing {} aborting" +
                                    " remaining multi ops. Error Path:{} Error:{}",
                                    request.toString(), e.getPath(), e.getMessage());
                        }

                        request.setException(e);

                        /* Rollback change records from failed multi-op */
                        rollbackPendingChanges(zxid, pendingChanges);
                    }
                }

                //FIXME: I don't want to have to serialize it here and then
                //       immediately deserialize in next processor. But I'm
                //       not sure how else to get the txn stored into our list.
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                txn.serialize(boa, "request") ;
                ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

                txns.add(new Txn(type, bb.array()));
            }

            // One header for the whole multi, carrying the zxid shared by all sub-ops.
            request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                    Time.currentWallTime(), request.type));
            request.setTxn(new MultiTxn(txns));

            break;

        //create/close session don't require request record
        case OpCode.createSession:
        case OpCode.closeSession:
            // Local sessions skip pRequest2Txn entirely.
            if (!request.isLocalSession()) {
                pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
            }
            break;

        //All the rest don't need to create a Txn - just verify session
        case OpCode.sync:
        case OpCode.exists:
        case OpCode.getData:
        case OpCode.getACL:
        case OpCode.getChildren:
        case OpCode.getChildren2:
        case OpCode.ping:
        case OpCode.setWatches:
        case OpCode.checkWatches:
        case OpCode.removeWatches:
            zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
            break;
        default:
            // Unknown types are only logged; the request is still forwarded below.
            LOG.warn("unknown type " + request.type);
            break;
        }
    } catch (KeeperException e) {
        ... ...
    } catch (Exception e) {
        ... ...
    }
    // Stamp the request with the server's current zxid and hand it to the next processor.
    request.zxid = zks.getZxid();
    nextProcessor.processRequest(request);
}