上一篇详细分析了 NameServer的启动流程 (不包括底层服务端的启动, 仅限于 NamesrvController层面的启动)。 这一篇 主要 针对NameServer在RocketMQ中的角色原理做介绍。
我们知道 RocektMQ中的组件分为: producer、consumer、broker、nameserver。
RocketMQ身为消息队列,那么 producer 与 consumer 之间必然是要存在数据交互的,那么它们之间的数据交互是需要通过 broker做为中间的代理来实现的。 那么producer、consumer 是如何与broker交互呢? 或者说它们是如何得知 对方的状态呢?
维护状态? 这个我们就想到了注册中心, 也雀食如此,在Kafka中有Zookeeper作为注册中心来维护状态,在Dubbo中同样也是通过Zookeeper来维护的。
有了上面的猜想,那么接下来,我们看一张图:
从上面的图中我们可以得知 nameserver 则是在RocketMQ中起到了 注册中心的作用, 但是它并没有完全像在 Kafka与Dubbo中的Zookeeper 起到至高无上大脑的作用。 因为在RocketMQ中 消息存储的功能是交给了broker来做的。
producer ,consumer, broker 跟 NameServer的联系概要:
producer 发送消息, 需要先通过NameServer 获取到broker和Topic 的对应关系, 以此来获取到broker的地址信息。 之后再根据broker的地址信息与其建立长连接发送消息。同时自己也会定时的向 NameServer拉去最新的路由信息,consumer也是如此。
broker在启动时会将 broker信息,topic信息,QueueData信息注册到 所有的NameServer上(注意:这里是所有的NameServer,也就是分别向每一个NameServer注册这些信息),并和所有NameServer保持长连接,同时也会通过定时任务推送 定时推送这些信息到NameServer上以维护broker状态。
因此 NameServer 可以说在 RocketMQ中实际上起到的是 伪大脑 的作用。
由上一篇介绍的 NamesrvController 可知, 其内部有一个属性 是 RouteInfoManager ,该类则是NameServer中 管理元数据的 核心类, 其内部 几乎维护了NameServer 在RocketMQ中起到核心作用的所有元数据信息。
对照着上图,来看下 RouteInfoManager 的核心属性和构造方法:
public class RouteInfoManager { // Topic 消息队列路由信息,消息发送时根据路由表进行负 载均衡 private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; // Broker 基础信息, 包含 brokerName、 所属集群名称 、 主备 Broker地址 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // Broker 集群信息,存储集群中所有 Broker 名称 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; // Broker 状态信息 。 NameServer 每次 收到心跳包时会 替换该信息 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // Broker上的 FilterServer列表,用于类模式消息过滤 private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; // 给元数据变量初始化 public RouteInfoManager() { this.topicQueueTable = new HashMap<String, List<QueueData>>(1024); this.brokerAddrTable = new HashMap<String, BrokerData>(128); this.clusterAddrTable = new HashMap<String, Set<String>>(32); this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256); this.filterServerTable = new HashMap<String, List<String>>(256); } }
RocketMQ 基于订阅发布机制 , 一个 Topic 拥有 多 个消息队 列 ,一 个 Broker 为每一主 题默 认创建 4 个读队列 4 个写队列 。 多个 Broker 组成 一个集群 , BrokerName 由相同的多台 Broker 组 成 Master-Slave 架构 , brokerId 为 0 代表 Master, 大于 0 表示 Slave。 BrokerLivelnfo 中 的 lastUpdateTimestamp 存储上次收到 Broker 心跳包的时间 。