在之前的文章中我们知道了 RocketMQ 里面的核心功能、架构和概念。并且也介绍了它的简单使用与 Spring Boot 的集成。下面开始我们对 RocketMQ 的源码探索,首先我们先在自己本地搭建 RocketMQ 的源代码环境。
首先我们可以在 rocketmq github 网站下载 Rocket MQ 的源代码,它是以 maven 进行项目管理的。接着把项目导入到自己的开发工具中,博主使用的是 Idea。导入 idea 如下图显示:
下面我们来介绍一下项目中的各个 Module:
名称 | 作用 |
---|---|
acl | RocketMQ 的访问权限控制 |
broker | RocketMQ 的核心模块之一:用于接收消息发送方的请求,主要包括消息处理、存储、以及高可用相关的功能 |
client | RocketMQ 的核心模块之一:消息发送方依赖该模块把消息发送至 broker |
common | RocketMQ 的基础模块:提供一些基础工具以及功能定义 |
distribution | RocketMQ 中的一些核心配置以及服务的功能脚本 |
example | RocketMQ 的功能示例 |
filter | RocketMQ 的消息过滤机制 |
logappender | RocketMQ 的日志 Appender 适配 |
logging | RocketMQ 的日志框架适配 |
namesrv | RocketMQ 的名称服务,主要保存服务的元信息,包括: broker、主题相关的信息 |
openmessaging | RocketMQ 对于 openmessaging 规范的实现 |
remoting | RocketMQ 各个进程之间的相互通信模块,主要是通过 NIO 框架 Netty 来实现的 |
srvutil | 服务的一些工具类 |
store | RocketMQ 消息的日志持久化 |
test | RocketMQ 项目的测试用例 |
tools | RocketMQ 项目提供的一些工具类 |
步骤一:在本地创建一个目录用于 RocketMQ 的运行环境,/Users/zhaoyong/rocketmq
,然后在这个目录之下创建:conf
、logs
以及 store
目录。
步骤二:从 distribution
把 broker.conf
、logback_broker.xml
和 logback_namesrv.xm
配置Copy 到 /Users/zhaoyong/rocketmq/conf
目录下。并且修改 broker.conf
:
broker.conf
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 #nameServer 地址,分号分割 namesrvAddr=127.0.0.1:9876 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH # 存储路径 storePathRootDir=/Users/zhaoyong/rocketmq/store #commitLog 存储路径 storePathCommitLog=/Users/zhaoyong/rocketmq/store/commitlog # 消费队列存储路径 storePathConsumeQueue=/Users/zhaoyong/rocketmq/store/consumequeue # 消息索引|存储路径 storePathindex=/Users/zhaoyong/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/Users/zhaoyong/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/Users/zhaoyong/rocketmq/store/abort
步骤三:配置 Name Server 的启动信息:
主要是配置以下二点:
org.apache.rocketmq.namesrv.NamesrvStartup
ROCKETMQ_HOME=/Users/zhaoyong/rocketmq
步骤四:启动 Name Server,运行结果如下:
步骤一:配置 Broker 的启动信息
主要配置为以下几点:
org.apache.rocketmq.broker.BrokerStartup
-c /Users/zhaoyong/rocketmq/conf/broker.conf
ROCKETMQ_HOME=/Users/zhaoyong/rocketmq
步骤二:启动 Broker,运行结果如下:
修改 org.apache.rocketmq.example.quickstart.Producer
类,在 在 producer.start()
之前设置命名服务地址:
producer.setNamesrvAddr("localhost:9876"); producer.start();
然后运行 org.apache.rocketmq.example.quickstart.Producer
,运行效果如下:
修改 org.apache.rocketmq.example.quickstart.Consumer
类,在 在 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
之前设置命名服务地址:
producer.setNamesrvAddr("localhost:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
然后运行 org.apache.rocketmq.example.quickstart.Consumer
,运行效果如下:
下面是我们运行消息发送与消息消费后的 RocketMQ 的工具目录,其中 +
后面代表目录,└
后面代码文件。
/Users/zhaoyong/rocketmq └ abort + conf └ broker.conf └ logback_broker.xml └ logback_namesrv.xml + logs + store └ abort └ checkpoint + commitlog └ 00000000000000000000 + config └ consumerFilter.json └ consumerFilter.json.bak └ consumerOffset.json └ consumerOffset.json.bak └ delayOffset.json └ delayOffset.json.bak └ subscriptionGroup.json └ topics.json └ topics.json.bak + consumequeue + TopicTest + 0 └ 00000000000000000000 + 1 └ 00000000000000000000 + 2 └ 00000000000000000000 + 3 └ 00000000000000000000 + index └ 20211222211320060 └ lock
下面是一个消息的运动轨迹: