本文详细介绍了RocketMQ源码的环境搭建、核心模块结构和重要类的解析,帮助读者全面理解RocketMQ的工作原理。通过详细的步骤和示例代码,读者可以轻松掌握RocketMQ的消息发送和接收流程。文章还提供了RocketMQ源码阅读的技巧和调试方法,帮助开发者深入探究RocketMQ源码。
RocketMQ源码环境搭建git clone https://github.com/apache/rocketmq.git cd rocketmq
mvn clean install -DskipTests
这个命令会下载所有依赖并编译RocketMQ源码,同时跳过单元测试以加快构建速度。
ROCKETMQ_HOME
指向RocketMQ的根目录。sh bin/mqbroker -n localhost:9876 &
这个命令启动RocketMQ的Broker服务,并绑定到指定的地址localhost:9876
。
RocketMQ源码结构清晰,主要包括以下几个核心模块:
Client模块: 提供客户端发送和接收消息的功能。
org.apache.rocketmq.client
: 包含客户端相关的类,如DefaultMQProducer
和DefaultMQPullConsumer
等。// 创建Producer实例 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 设置Producer配置 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start();
org.apache.rocketmq.remoting
: 包含网络传输相关的类,如RemotingCommand
和RemotingServer
等。// 发送消息到Broker RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SINGLE_SEND, null); request.setBody(msg.getBody()); request.setExtFields(MessageExtFieldHelper.createExtFields(msg));
3..
Store模块: 提供持久化存储功能。
org.apache.rocketmq.store
: 包含消息存储相关的类,如DefaultMessageStore
和MessageQueue
等。// DefaultMessageStore类消息存储实现 DefaultMessageStore store = new DefaultMessageStore(); store.load();
MQClientAPI模块: 提供客户端API。
org.apache.rocketmq.client.impl
: 包含消息发送和消费的具体实现类,如MQClientAPIImpl
等。// MQClientAPIImpl类中的消息发送和消费实现 MQClientAPIImpl impl = new MQClientAPIImpl(); impl.send(msg);
MQClientHa模块: 提供客户端容错机制。
org.apache.rocketmq.client.impl
: 包含容错相关类,如MQClientManager
和RebalanceService
等。// MQClientManager类中的容错机制实现 MQClientManager manager = new MQClientManager(); manager.rebalance();
MQClientFactory模块: 提供客户端工厂模式。
org.apache.rocketmq.client.impl.factory
: 包含客户端工厂类,如MQClientFactory
等。// MQClientFactory类客户端工厂模式实现 MQClientFactory factory = new MQClientFactory(); factory.createMQClient();
MQClientAPIOneway模块: 提供单向消息发送。
org.apache.rocketmq.client.impl.MQClientAPIOneway
: 包含单向消息发送的功能。// 单向消息发送实现 MQClientAPIOneway.oneway(msg);
org.apache.rocketmq.admin
: 包含管理相关的类,如MQAdminImpl
等。// MQAdminImpl类中的管理功能实现 MQAdminImpl admin = new MQAdminImpl(); admin.describeBrokerInfo();
各个模块之间相互协作,共同完成分布式消息系统的核心功能。例如:
消息发送流程主要由以下几个步骤构成:
// 创建Producer实例 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 设置Producer配置 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start(); // 创建消息对象 Message msg = new Message("TestTopic", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送消息到Broker SendResult sendResult = producer.send(msg); // 关闭Producer实例 producer.shutdown();
消息消费流程主要由以下几个步骤构成:
// 创建Consumer实例 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); // 设置Consumer配置 consumer.setNamesrvAddr("localhost:9876"); // 订阅Topic consumer.subscribe("TestTopic", "*"); // 注册消息处理回调函数 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (MessageExt msg : msgs) { System.out.printf("Received message: %s %n", new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 启动Consumer实例 consumer.start();RocketMQ重要类分析
Message
类是RocketMQ中表示消息的核心类。它包含了消息的所有必要信息,如消息主题、标签、消息体、键等。
Message(String topic, String body)
: 构造函数,创建一个包含主题和消息体的消息对象。setMessageId(String msgId)
: 设置消息ID。setTopic(String topic)
: 设置主题。setBody(byte[] body)
: 设置消息体。toString()
: 返回消息的字符串表示形式。// 创建消息对象 Message msg = new Message("TestTopic", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 设置消息ID msg.setMessageId("123456"); // 设置主题 msg.setTopic("TestTopic"); // 设置消息体 msg.setBody("Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 输出消息信息 System.out.println(msg.toString());
Consumer
类是RocketMQ中表示消费者的抽象类。它提供了订阅和消费消息的功能。
DefaultMQPushConsumer
: 推模式消费者,消息由Broker主动推送给消费者。DefaultMQPullConsumer
: 拉模式消费者,消费者主动从Broker拉取消息。// 创建推模式消费者实例 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); // 设置Consumer配置 consumer.setNamesrvAddr("localhost:9876"); // 订阅Topic consumer.subscribe("TestTopic", "*"); // 注册消息处理回调函数 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (MessageExt msg : msgs) { System.out.printf("Received message: %s %n", new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 启动Consumer实例 consumer.start();
DefaultMessageStore
类是RocketMQ中负责持久化存储的基础类。
load()
: 加载持久化存储中的数据。commitLog()
:提交消息到日志文件。pullMessage()
:从持久化存储中拉取消息。// DefaultMessageStore类中的消息存储实现 DefaultMessageStore store = new DefaultMessageStore(); store.load();
MessageQueue
类是RocketMQ中表示消息队列的类,用于维护消息的队列信息。
putMessage(Message msg)
: 将消息放入队列。pollMessage()
: 从队列中获取消息。size()
: 获取队列中消息的数量。// MessageQueue类中的消息队列操作 MessageQueue queue = new MessageQueue(); queue.putMessage(new Message("Hello RocketMQ")); System.out.println(queue.size());RocketMQ源码阅读技巧
DefaultMQProducer
和Message
类。DefaultMQProducer.send
方法开始,追踪其调用链。RemotingClient
和RemotingCommand
类的网络通信逻辑。MessageStore
类的消息存储实现。// DefaultMQProducer.send方法 public <T> SendResult send(T msg) throws MQClientException { Message message = this.getMQMessage(msg); SendResult sendResult = this.getMQProducerImpl().sendMessage(message, this.defaultMQProducer.getDefaultTopicQueueNums()); return sendResult; } // RemotingCommand发送消息 public CompletableFuture<SendResult> sendDefaultImpl(final String topic, final Message msg) throws RemotingException, MQClientException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SINGLE_SEND, null); request.setBody(msg.getBody()); request.setExtFields(MessageExtFieldHelper.createExtFields(msg)); return this.remotingSendRequest(request, this.namesrvAddr, this.defaultMQProducer.getSendMsgTimeout()); }
DefaultMQPushConsumer
和MessageListenerConcurrently
类。DefaultMQPushConsumer.subscribe
方法开始,追踪其调用链。RemotingCommand
和MessageQueue
类的消息拉取实现。MessageListenerConcurrently
类的消息处理逻辑。// DefaultMQPushConsumer.subscribe方法 public void subscribe(final String topic, final String selector) throws MQClientException { this.subscribeInner(topic, selector, MessageSelector.byTag(selector), null); } // RemotingCommand拉取消息 public CompletableFuture<List<MessageExt>> pull(final String topic, final String consumerGroup, final String queueId, final String beginMessageOffset, final int maxNums) throws RemotingException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL, null); request.setBody(MessageExtFieldHelper.createExtFields(topic, queueId, beginMessageOffset, maxNums)); return this.remotingSendRequest(request, this.namesrvAddr, this.defaultMQPushConsumer.getPullMsgTimeout()); } // MessageListenerConcurrently处理消息 public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs, final ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.printf("Received message: %s %n", new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }常见问题与解答
MessageStore
类将消息写入磁盘。MessageQueue
类维护消息的队列信息。// 示例单元测试 public class RocketMQTest { @Test public void sendMessageTest() throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("TestTopic", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); producer.shutdown(); assertNotNull(sendResult); } }