本文详细介绍了RocketMQ的架构和功能,包括生产者和消费者的手写实现,以及RocketMQ在集群模式下的配置与启动。此外,还探讨了RocketMQ的日志查看与监控方法。手写RocketMQ学习涵盖了从环境搭建到实际应用的全过程。
RocketMQ是由阿里巴巴开源的消息中间件,它被广泛应用于分布式应用的异步通信和流量削峰填谷。RocketMQ基于高可用的设计,提供了丰富的消息功能,包括消息发布与订阅、消息过滤、消息回溯、消息重试等。
RocketMQ的主要特点包括:
RocketMQ适用于以下场景:
RocketMQ的安装步骤如下:
可以通过以下命令下载RocketMQ的源码:
git clone https://github.com/apache/rocketmq.git cd rocketmq
tar -zvxf rocketmq-all-4.9.2-release.tar.gz cd rocketmq-all-4.9.2-release
编辑系统环境变量文件(如Linux的/etc/profile
),添加以下内容:
export ROCKETMQ_HOME=/path/to/your/rocketmq export PATH=$PATH:$ROCKETMQ_HOME/bin
执行以下命令使环境变量生效:
source /etc/profile
启动NameServer:
sh bin/mqnamesrv
启动Broker:
sh bin/mqbroker -n localhost:9876
启动完成后,可以通过浏览器访问http://localhost:9876
查看NameServer的监控页面。
在RocketMQ的安装目录中,可以通过修改conf
目录下的配置文件来配置RocketMQ的环境变量。主要的配置文件包括:
broker.properties
:Broker的配置文件,主要用于设置Broker的名称、IP地址、端口等。namesrv.properties
:NameServer的配置文件,主要用于设置NameServer的端口等信息。logback
:日志配置文件,可以设置日志的输出格式和存储位置。tools.sh
:脚本文件,用于启动、停止RocketMQ服务。例如,修改broker.properties
文件,设置Broker名称和IP地址:
brokerName=broker-a brokerId=0 brokerAddr=127.0.0.1:10911
在conf
目录下的tools.sh
文件中,可以配置RocketMQ的启动参数,例如:
# broker的配置 export BROKER_HOME=/path/to/broker export JAVA_HOME=/path/to/java # 启动Broker sh mqbroker -c ../conf/broker.properties
启动RocketMQ服务的步骤如下:
sh bin/mqnamesrv
sh bin/mqbroker -n localhost:9876
http://localhost:9876
查看NameServer的监控页面。生产者代码主要包括以下几个部分:
DefaultMQProducer
创建生产者实例,并设置生产者名称。start
方法启动生产者。send
方法发送消息。shutdown
方法关闭生产者。示例如下:
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws Exception { // 创建生产者实例 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 设置NameServer地址 producer.setNamesrvAddr("localhost:9876"); // 启动生产者 producer.start(); // 创建消息 String message = "Hello RocketMQ"; Message msg = new Message("TopicTest", // topic "TagA", // tag message.getBytes() // message body ); // 发送消息 SendResult sendResult = producer.send(msg); System.out.println(sendResult); // 关闭生产者 producer.shutdown(); } }
创建生产者实例:创建DefaultMQProducer
实例,并设置生产者名称。
设置NameServer地址:通过setNamesrvAddr
方法设置NameServer的地址。
启动生产者:调用start
方法启动生产者,生产者会连接到NameServer,并获取到Broker的地址信息。
创建消息:创建Message
对象,设置消息的主题(Topic)、标签(Tag)、消息体(Message Body)。
发送消息:调用send
方法发送消息,此方法是一个阻塞方法,会等待消息发送完成。
shutdown
方法关闭生产者,释放所有资源。RocketMQ提供了多种异常处理和重试机制。以下是一些常见的异常处理和重试机制:
消息发送异常:如果消息发送过程中发生异常,可以通过捕获MQClientException
和MQBrokerException
来处理。
例如,设置生产者发送消息的最大重试次数:
producer.setSendMsgTimeout(3000); // 设置发送超时时间 producer.setRetryTimesWhenSendFailed(2); // 设置重试次数
例如,异步发送消息的代码:
producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("Message sent successfully"); } @Override public void onException(Throwable e) { System.out.println("Message sent failed, reason: " + e.getMessage()); } });
通过以上步骤,可以实现一个简单的RocketMQ生产者,并处理各种异常情况。
消费者代码主要包括以下几个部分:
DefaultMQPushConsumer
或DefaultMQQueueingConsumer
创建消费者实例,并设置消费者名称。subscribe
方法订阅消息。start
方法启动消费者。示例如下:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageQueue; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws Exception { // 创建消费者实例 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); // 设置NameServer地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅消息 consumer.subscribe("TopicTest", "*"); // 设置消费位置 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 注册消息处理回调函数 consumer.registerMessageListener((MessageQueue mq, List<MessageExt> msgs) -> { for (MessageExt msg : msgs) { System.out.println("Received message: " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 启动消费者 consumer.start(); } }
RocketMQ提供了两种主要的消费者接收消息的方式:
DefaultMQPushConsumer
类创建的消费者,消费者会主动从Broker拉取消息。这种方式适用于需要实时处理消息的应用场景。DefaultMQQueueingConsumer
类创建的消费者,消费者会从Broker拉取消息并存入队列中。这种方式适用于需要处理大量消息的应用场景。例如,使用QueueingConsumer的代码示例:
import org.apache.rocketmq.client.consumer.DefaultMQQueueingConsumer; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageQueue; import org.apache.rocketmq.common.message.MessageExt; public class QueueingConsumerExample { public static void main(String[] args) throws Exception { // 创建QueueingConsumer实例 DefaultMQQueueingConsumer consumer = new DefaultMQQueueingConsumer("QueueingConsumerGroupName"); // 设置NameServer地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅消息 consumer.subscribe("TopicTest", "*"); // 设置消费位置 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 注册消息处理回调函数 consumer.registerMessageListener((MessageQueue mq, List<MessageExt> msgs) -> { for (MessageExt msg : msgs) { System.out.println("Received message: " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 启动消费者 consumer.start(); } } `` ### 消费者消费失败的处理方式 RocketMQ提供了多种消费失败的处理方式: 1. **消息重试**:如果消息消费失败,可以通过配置消息的最大重试次数来实现自动重试。默认情况下,消费者会自动重试消费失败的消息。 2. **消息回溯**:如果消费失败的消息需要重新消费,可以通过设置`consumeFromWhere`来实现消息的回溯消费。 例如,设置消费者消费失败的消息的最大重试次数: ```java consumer.setConsumeRetryMax(2); // 设置最大重试次数
例如,异步消费消息的代码:
consumer.registerMessageListener((MessageQueue mq, List<MessageExt> msgs) -> { for (MessageExt msg : msgs) { System.out.println("Received message: " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });
通过以上步骤,可以实现一个简单的RocketMQ消费者,并处理各种消费失败的情况。
RocketMQ支持集群模式,通过配置NameServer和Broker的集群模式,可以实现高可用和负载均衡。
NameServer支持集群模式,通过配置多个NameServer实例可以实现负载均衡和故障转移。
namesrv.properties
文件,设置NameServer的端口:# namesrv.properties listenPort=9876
sh bin/mqnamesrv -n localhost:9877 sh bin/mqnamesrv -n localhost:9878
# broker.properties namesrvAddr=localhost:9877;localhost:9878
Broker也支持集群模式,通过配置多个Broker实例可以实现负载均衡和故障转移。
broker.properties
文件,设置Broker的名称、IP地址、端口等信息:# broker.properties brokerName=broker-a brokerId=0 brokerAddr=127.0.0.1:10911
sh bin/mqbroker -n localhost:9877 -c ../conf/broker-a.properties sh bin/mqbroker -n localhost:9878 -c ../conf/broker-b.properties
# broker-a.properties brokerClusterName=ClusterA brokerName=broker-a brokerId=0 brokerAddr=127.0.0.1:10911 # broker-b.properties brokerClusterName=ClusterA brokerName=broker-b brokerId=1 brokerAddr=127.0.0.1:10912
构建NameServer集群的步骤如下:
启动多个NameServer实例,每个实例可以运行在不同的机器上。
例如,启动两个NameServer实例:
sh bin/mqnamesrv -n localhost:9877 sh bin/mqnamesrv -n localhost:9878
配置NameServer集群地址:
# broker.properties namesrvAddr=localhost:9877;localhost:9878
构建Broker集群的步骤如下:
启动多个Broker实例,每个实例可以运行在不同的机器上。
例如,启动两个Broker实例:
sh bin/mqbroker -n localhost:9877 -c ../conf/broker-a.properties sh bin/mqbroker -n localhost:9878 -c ../conf/broker-b.properties
配置Broker集群地址:
# broker-a.properties brokerClusterName=ClusterA brokerName=broker-a brokerId=0 brokerAddr=127.0.0.1:10911 # broker-b.properties brokerClusterName=ClusterA brokerName=broker-b brokerId=1 brokerAddr=127.0.0.1:10912
通过以上步骤,可以构建一个简单的RocketMQ集群模式,实现高可用和负载均衡。
RocketMQ提供了详细的日志输出,通过查看和分析这些日志,可以了解RocketMQ的运行状态和问题排查。
RocketMQ的日志文件通常位于logs
目录下,包括以下几个主要的日志文件:
broker.log
:Broker的运行日志,记录Broker的启动、停止、消息处理等信息。namesrv.log
:NameServer的运行日志,记录NameServer的启动、停止、路由信息更新等信息。consumer.log
:消费者的运行日志,记录消费者的启动、停止、消息消费等信息。RocketMQ支持多种日志级别,可以通过修改logback
配置文件来设置日志级别。常用的日志级别包括:
DEBUG
:调试级别,记录详细的调试信息。INFO
:信息级别,记录重要的信息。WARN
:警告级别,记录警告信息。ERROR
:错误级别,记录错误信息。FATAL
:致命错误级别,记录致命错误信息。例如,设置日志级别为INFO
:
<configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> <logger name="org.apache.rocketmq" level="INFO" /> <root level="INFO"> <appender-ref ref="STDOUT" /> </root> </configuration>
可以使用各种日志分析工具来帮助分析RocketMQ的日志,例如:
grep
:命令行工具,可以通过正则表达式搜索日志中的关键字。awk
:命令行工具,可以通过脚本分析日志中的字段。logstash
:日志分析工具,可以将日志文件导入到Elasticsearch中进行分析。ELK
:日志分析套件,包括Elasticsearch、Logstash和Kibana,可以实现日志的收集、分析和可视化。通过以上步骤,可以查看和分析RocketMQ的日志,了解RocketMQ的运行状态和问题排查。
RocketMQ提供了多种监控指标,通过监控这些指标可以了解RocketMQ的运行状态和性能。
RocketMQ的主要监控指标包括:
Broker
:Broker的运行状态和性能指标,包括消息的发送、接收、存储、转发等。NameServer
:NameServer的运行状态和性能指标,包括路由信息的更新、查询等。Consumer
:消费者的运行状态和性能指标,包括消息的消费、处理等。RocketMQ提供了多种监控工具,包括:
RocketMQ Console
:RocketMQ自带的监控控制台,可以通过浏览器访问监控页面。RocketMQ Dashboard
:RocketMQ的控制台插件,提供更多的监控和管理功能。RocketMQ-Tools
:RocketMQ的命令行工具,提供了多种监控和管理命令。Prometheus
:开源的监控系统,可以通过RocketMQ的Prometheus插件进行监控。Grafana
:开源的可视化工具,可以将RocketMQ的监控数据导入Grafana进行可视化。例如,RocketMQ Console的监控页面可以查看Broker的运行状态和性能指标:
http://localhost:9876
可以通过多种方式获取RocketMQ的监控指标,例如:
RocketMQ Console
:通过RocketMQ Console的监控页面获取监控指标。RocketMQ-Tools
:通过RocketMQ-Tools的命令行工具获取监控指标。Prometheus
:通过RocketMQ的Prometheus插件获取监控指标。JMX
:通过JMX获取RocketMQ的监控指标。例如,通过RocketMQ-Tools命令行工具获取Broker的监控指标:
sh bin/mqadmin topics -n localhost:9876
通过以上步骤,可以监控和管理RocketMQ的运行状态和性能。
可以使用脚本进行RocketMQ的监控,以下是一些示例脚本:
可以编写脚本监控Broker的运行状态,例如:
#!/bin/bash # 获取Broker的运行状态 status=$(sh bin/mqadmin brokerList -n localhost:9876 | grep "broker-a") if [ "$status" == "broker-a" ]; then echo "Broker running" else echo "Broker not running" fi
可以编写脚本监控Broker的性能指标,例如:
#!/bin/bash # 获取Broker的性能指标 metrics=$(sh bin/mqadmin topicList -n localhost:9876) if [ "$(echo "$metrics" | grep "TopicTest")" != "" ]; then echo "TopicTest running" else echo "TopicTest not running" fi
可以编写脚本监控NameServer的运行状态,例如:
#!/bin/bash # 获取NameServer的运行状态 status=$(sh bin/mqadmin cluster -n localhost:9876 | grep "ClusterA") if [ "$status" == "ClusterA" ]; then echo "NameServer running" else echo "NameServer not running" fi
可以编写脚本监控NameServer的性能指标,例如:
#!/bin/bash # 获取NameServer的性能指标 metrics=$(sh bin/mqadmin clusterList -n localhost:9876) if [ "$(echo "$metrics" | grep "broker-a")" != "" ]; then echo "TopicTest running" else echo "TopicTest not running" fi
通过以上脚本,可以实现对RocketMQ的监控,并通过脚本进行自动化管理。