Broker启动的源码入口是BrokerStartup.java
主要流程和NameServer启动有些类似
BrokerController
BrokerController
public static void main(String[] args) { start(createBrokerController(args)); } public static BrokerController start(BrokerController controller) { try { controller.start(); String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", " + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); if (null != controller.getBrokerConfig().getNamesrvAddr()) { tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr(); } log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
public static BrokerController createBrokerController(String[] args) { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { NettySystemConfig.socketSndbufSize = 131072; } // 设置socket 发送缓冲大小 if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { NettySystemConfig.socketRcvbufSize = 131072; } try { //PackageConflictDetect.detectFastjson(); // 解析启动参数 args Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); } // Broker核心配置类 final BrokerConfig brokerConfig = new BrokerConfig(); // Netty服务器配置 final NettyServerConfig nettyServerConfig = new NettyServerConfig(); // Netty客户端配置TLS final NettyClientConfig nettyClientConfig = new NettyClientConfig(); // 设置Netty客户端是否使用 nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE, String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING)))); // 设置Netty服务器监听端口为 10911 nettyServerConfig.setListenPort(10911); // Broker 消息配置类 final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); // broker是否为 slave broker 如果是 则将 消息占用内存百度分 减10%, 默认40%,超过内存的消息将置换出内存 if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) { int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10; messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio); } // 读取解析启动配置信息 if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { configFile = file; InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); properties2SystemEnv(properties); MixAll.properties2Object(properties, brokerConfig); MixAll.properties2Object(properties, nettyServerConfig); MixAll.properties2Object(properties, nettyClientConfig); MixAll.properties2Object(properties, messageStoreConfig); BrokerPathConfigHelper.setBrokerConfigPath(file); in.close(); } } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig); // 检查ROCKETMQ_HOME环境变量, 如果没有直接退出 if (null == brokerConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } // 获取Nameserver地址,;分割解析为数组 因为Nameserver可能为集群,所以有多个 String namesrvAddr = brokerConfig.getNamesrvAddr(); if (null != namesrvAddr) { try { String[] addrArray = namesrvAddr.split(";"); for (String addr : addrArray) { RemotingUtil.string2SocketAddress(addr); } } catch (Exception e) { System.out.printf( "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n", namesrvAddr); System.exit(-3); } } // 判断Broker角色,作相应的处理 master有两种同步消息方式 switch (messageStoreConfig.getBrokerRole()) { // 异步复制:生产者写入消息到Master后无需等待消息复制到slave即可返回,消息的复制由旁路线程进行异步复制 case ASYNC_MASTER: /* 同步复制的方式,表现出来的是类似同步双写的策略。即Master写入完消息之后,需要等待Slave的复制成功。 注,这里只需要有一个Slave复制成功并成功应答即算成功,所以在这种模式下,如果有3个Slave,当生产者获得SEND_OK的应答时, 代表消息已经达到Maser和一个Slave(注:这里并不代表已经持久化到磁盘,而只能证明肯定到了PageCache, 是否能刷到磁盘取决于刷盘策略是同步刷盘还是异步刷盘),而还有两个Slave实际上是无法保证的, 并且这里也不支持配置,即不支持如“”同步半数以上”之类的设置 */ case SYNC_MASTER: brokerConfig.setBrokerId(MixAll.MASTER_ID); break; /** * 消息发送的状态除了SEND_OK外,还会多出以下的状态: * FLUSH_SLAVE_TIMEOUT : 同步到slave等待超时,即一直等Slave上报同步的进度,但过了超时时间都没有成功没有同步完。 * SLAVE_NOT_AVAILABLE:当前没有可用的Slave。注:如果slave落后master实在太多, * 那个slave也会认为是暂时不可用的slave,直到它同步到接近的范围为止, * 这个不可用的阈值由broker配置haSlaveFallbehindMax(默认是1024 * 1024 * 256)决定 */ case SLAVE: if (brokerConfig.getBrokerId() <= 0) { System.out.printf("Slave's brokerId must be > 0"); System.exit(-3); } break; default: break; } // 是否基于dleger技术管理主从同步和CommitLog if (messageStoreConfig.isEnableDLegerCommitLog()) { brokerConfig.setBrokerId(-1); } // 设置HA监听端口 messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1); // 日志相关 LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml"); // 打印启动参数信息 if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME); MixAll.printObjectProperties(console, brokerConfig); MixAll.printObjectProperties(console, nettyServerConfig); MixAll.printObjectProperties(console, nettyClientConfig); MixAll.printObjectProperties(console, messageStoreConfig); System.exit(0); } else if (commandLine.hasOption('m')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME); MixAll.printObjectProperties(console, brokerConfig, true); MixAll.printObjectProperties(console, nettyServerConfig, true); MixAll.printObjectProperties(console, nettyClientConfig, true); MixAll.printObjectProperties(console, messageStoreConfig, true); System.exit(0); } log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); MixAll.printObjectProperties(log, brokerConfig); MixAll.printObjectProperties(log, nettyServerConfig); MixAll.printObjectProperties(log, nettyClientConfig); MixAll.printObjectProperties(log, messageStoreConfig); // 基于上面的配置参数 创建 BrokerController final BrokerController controller = new BrokerController( brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig); // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); // 初始化 BrokerController boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } // 钩子线程 JVM退出时打印消费时间 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; private AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run() { synchronized (this) { log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet()); if (!this.hasShutdown) { this.hasShutdown = true; long beginTime = System.currentTimeMillis(); controller.shutdown(); long consumingTimeTotal = System.currentTimeMillis() - beginTime; log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal); } } } }, "ShutdownHook")); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
上面代码可以看到主要是读取一些配置文件,根据配置文件设置不同的一些枚举状态,然后在上面代码中又有一个比较核心的方法
boolean initResult = controller.initialize();
我们进去看看
下面代码很长,可以简单过一下,这里先作一个总结吧
下面的代码主要是配置启动一些线程池,然后作内存持久化到磁盘的操作,然后就是一些Netty的配置,大致就是干了这些事。这里不用太注意细节,源码每个点都看懂太难了,我们主要是挑核心的看,下面代码我也作了一些注释
public boolean initialize() throws CloneNotSupportedException { // 从磁盘中初始化 topicConfigManager boolean result = this.topicConfigManager.load(); // 加载消费偏移 result = result && this.consumerOffsetManager.load(); // 加载消费组信息 result = result && this.subscriptionGroupManager.load(); // 加载过滤器配置 result = result && this.consumerFilterManager.load(); if (result) { try { // 创建消息存储管理组件,默认使用commitLog 管理磁盘消息 this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); // 如果使用Dleger 管理 commitlog 初始化相关信息 if (messageStoreConfig.isEnableDLegerCommitLog()) { DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); } this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //load plugin MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); this.messageStore = MessageStoreFactory.build(context, this.messageStore); this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager)); } catch (IOException e) { result = false; log.error("Failed to initialize", e); } } result = result && this.messageStore.load(); if (result) { // 启动Netty服务器 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getSendMessageThreadPoolNums(), this.brokerConfig.getSendMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.sendThreadPoolQueue, new ThreadFactoryImpl("SendMessageThread_")); this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getPullMessageThreadPoolNums(), this.brokerConfig.getPullMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_")); this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getProcessReplyMessageThreadPoolNums(), this.brokerConfig.getProcessReplyMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.replyThreadPoolQueue, new ThreadFactoryImpl("ProcessReplyMessageThread_")); this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getQueryMessageThreadPoolNums(), this.brokerConfig.getQueryMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.queryThreadPoolQueue, new ThreadFactoryImpl("QueryMessageThread_")); this.adminBrokerExecutor = Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( "AdminBrokerThread_")); this.clientManageExecutor = new ThreadPoolExecutor( this.brokerConfig.getClientManageThreadPoolNums(), this.brokerConfig.getClientManageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientManagerThreadPoolQueue, new ThreadFactoryImpl("ClientManageThread_")); this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getHeartbeatThreadPoolNums(), this.brokerConfig.getHeartbeatThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.heartbeatThreadPoolQueue, new ThreadFactoryImpl("HeartbeatThread_", true)); this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getEndTransactionThreadPoolNums(), this.brokerConfig.getEndTransactionThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.endTransactionThreadPoolQueue, new ThreadFactoryImpl("EndTransactionThread_")); this.consumerManageExecutor = Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( "ConsumerManageThread_")); this.registerProcessor(); final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis(); final long period = 1000 * 60 * 60 * 24; // 定时记录broker状态 默认 86400s 一次 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.getBrokerStats().record(); } catch (Throwable e) { log.error("schedule record error.", e); } } }, initialDelay, period, TimeUnit.MILLISECONDS); // 定时持久化 offset, 5s 持久化一次 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); // 定时持久化filter, 10s同步一次 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerFilterManager.persist(); } catch (Throwable e) { log.error("schedule persist consumer filter error.", e); } } }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.protectBroker(); } catch (Throwable e) { log.error("protectBroker error.", e); } } }, 3, 3, TimeUnit.MINUTES); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.printWaterMark(); } catch (Throwable e) { log.error("printWaterMark error.", e); } } }, 10, 1, TimeUnit.SECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); } catch (Throwable e) { log.error("schedule dispatchBehindBytes error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); if (this.brokerConfig.getNamesrvAddr() != null) { this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr()); } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); } catch (Throwable e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } if (!messageStoreConfig.isEnableDLegerCommitLog()) { if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) { this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress()); this.updateMasterHAServerAddrPeriodically = false; } else { this.updateMasterHAServerAddrPeriodically = true; } } else { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.printMasterAndSlaveDiff(); } catch (Throwable e) { log.error("schedule printMasterAndSlaveDiff error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); } } if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged = keyChanged = false; reloadServerSslContext(); } } private void reloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); ((NettyRemotingServer) fastRemotingServer).loadSslContext(); } }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } } initialTransaction(); initialAcl(); initialRpcHooks(); } return result; }
比较关键的一个点就是Broker是如何注册到NameServer的,我们来看看这段代码吧
在创建出BrokerController
后调用了start方法
我们进入这个方法会看到一个定时线程池
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
然后进入方法registerBrokerAll
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { // Topic 相关配置信息 TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); // 处理 TopicConfig信息 if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { TopicConfig tmp = new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), this.brokerConfig.getBrokerPermission()); topicConfigTable.put(topicConfig.getTopicName(), tmp); } topicConfigWrapper.setTopicConfigTable(topicConfigTable); } // 判断是否进行Broker注册 if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.brokerConfig.getRegisterBrokerTimeoutMills())) { // 真正注册Broker关键入口 doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); } }
我们进入到doRegisterBrokerAll
看看,这下面就是注册Broker到NameServer的核心代码,涉及到Netty网络请求相关的代码被封装在registerBrokerAll()
方法中,这段代码看完了我们可以进去看看
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, TopicConfigSerializeWrapper topicConfigWrapper) { // 将Broker 注册到所有NameServer上,NameServer可能是集群有多个,所以这里是List List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.getHAServerAddr(), topicConfigWrapper, this.filterServerManager.buildNewFilterServerList(), oneway, this.brokerConfig.getRegisterBrokerTimeoutMills(), this.brokerConfig.isCompressedRegister()); // 如果注册结果大于 0 则继续处理相关逻辑 if (registerBrokerResultList.size() > 0) { RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0); if (registerBrokerResult != null) { if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); } this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); if (checkOrderConfig) { this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); } } } }
这里我们来看看registerBrokerAll()
方法
public List<RegisterBrokerResult> registerBrokerAll( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final boolean oneway, final int timeoutMills, final boolean compressed) { // 用于存放每个远程注册的返回结果 final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>(); // NameServer的注册地址 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { // 构建网络请求头 final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed); // 构建网络请求 body RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); final byte[] body = requestBody.encode(compressed); final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); // 多线程注册提高效率 final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { //自定义线程池 brokerOuterExecutor.execute(() -> { try { RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); } log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { countDownLatch.countDown(); } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } } return registerBrokerResultList; }
我们这里就把Broker的启动流程分析完了,我们这里并没有一行代码一行代码去看,因为有些代码不知道实际的业务场景,我们也不知道是干嘛的,总的来说看了整个源码的一个核心流程,相关细节并没有很深入,主要是对Broker的启动有了一个大致的了解,后续如果需要修改或者遇到了相关问题,可以很好的找到入库,问题排查也会更轻松