RouteInfoManager基本结构:
public class RouteInfoManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; //topic和queue列表的映射 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;//brokerName和Broker信息的映射 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;//集群名和BrokerName的映射 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;//broker地址和broker运行状态的映射 private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;//broker地址和过滤器列表的映射 public RouteInfoManager() { this.topicQueueTable = new HashMap<String, List<QueueData>>(1024); this.brokerAddrTable = new HashMap<String, BrokerData>(128); this.clusterAddrTable = new HashMap<String, Set<String>>(32); this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256); this.filterServerTable = new HashMap<String, List<String>>(256); } }
其他角色会主动向NamerServer更新状态,其主要逻辑在DefaultRequestProcessor类中,根据不同的请求码做对应的处理:
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { if (ctx != null) { log.debug("receive request, {} {} {}", request.getCode(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), request); } switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: return this.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG: return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); case RequestCode.QUERY_DATA_VERSION: return queryBrokerTopicConfig(ctx, request); case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } case RequestCode.UNREGISTER_BROKER: return this.unregisterBroker(ctx, request); case RequestCode.GET_ROUTEINFO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request); case RequestCode.GET_BROKER_CLUSTER_INFO: return this.getBrokerClusterInfo(ctx, request); case RequestCode.WIPE_WRITE_PERM_OF_BROKER: return this.wipeWritePermOfBroker(ctx, request); case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: return getAllTopicListFromNameserver(ctx, request); case RequestCode.DELETE_TOPIC_IN_NAMESRV: return deleteTopicInNamesrv(ctx, request); case RequestCode.GET_KVLIST_BY_NAMESPACE: return this.getKVListByNamespace(ctx, request); case RequestCode.GET_TOPICS_BY_CLUSTER: return this.getTopicsByCluster(ctx, request); case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: return this.getSystemTopicListFromNs(ctx, request); case RequestCode.GET_UNIT_TOPIC_LIST: return this.getUnitTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: return this.getHasUnitSubTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST: return this.getHasUnitSubUnUnitTopicList(ctx, request); case RequestCode.UPDATE_NAMESRV_CONFIG: return this.updateConfig(ctx, request); case RequestCode.GET_NAMESRV_CONFIG: return this.getConfig(ctx, request); default: break; } return null; }
此外,断开的事件也会触发状态更新,具体逻辑在BrokerHousekeepingService类中
此外,NameServer还有定时检查时间戳的逻辑,Broker向NameServer发送心跳包会更新时间戳,档NameServer检查到时间戳长时间没有更新后,便会触发清理逻辑,如下:
public void scanNotActiveBroker() { Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } }
每10秒检测一次,超过2分钟,会将Broker从RouteInfoManager维护的列表中去除。