RocketMQ消息中间件是由阿里巴巴开源的一款高性能分布式消息队列,广泛应用于异步处理、流量削峰、解耦合等场景,支持多种编程语言,具有高可用、高性能、高可扩展性等特点。RocketMQ基于Java语言实现,主要用于解决大量的异步处理场景,例如订单系统、秒杀系统等。它支持亿级并发的消息生产与消费,具有毫秒级延迟,支持每秒百万级消息的吞吐量,确保消息不丢失,并提供消息持久化功能。
RocketMQ消息中间件简介RocketMQ是由阿里巴巴开源的一款分布式消息中间件。它基于Java语言实现,主要用于解决大量的异步处理场景,例如订单系统、秒杀系统等。RocketMQ支持亿级并发的消息生产与消费,具有毫秒级延迟,支持每秒百万级消息的吞吐量,确保消息不丢失,并提供消息持久化功能。
RocketMQ的特点主要体现在以下几个方面:
RocketMQ广泛应用于以下场景:
在安装RocketMQ之前,需要确保已经安装了Java环境。RocketMQ支持Java 8及更高版本。可以通过以下命令检查Java是否已安装:
java -version
如果未安装Java,可以前往Oracle官网下载安装包,或者使用以下命令安装Java:
# Ubuntu系统 sudo apt-get update sudo apt-get install openjdk-8-jdk # CentOS系统 sudo yum install java-1.8.0-openjdk
此外,还需要确保RocketMQ的依赖项已正确安装。RocketMQ本身依赖于一些Java库,这些库在解压的lib
目录中提供,因此在启动RocketMQ之前,确保这些依赖项已正确配置。
访问RocketMQ官网下载页面,下载最新版本的RocketMQ。
tar -zxvf rocketmq-all-4.9.3-bin-release.tar.gz cd rocketmq-all-4.9.3
rocketmq-all-4.9.3/ ├── bin ├── lib ├── namesrv.log ├── runserver.sh ├── server.jvmopts └── storage
在RocketMQ中,NameServer是一个轻量级的队列管理器,用于管理Broker的地址信息。
nohup sh bin/mqnamesrv &
Broker是消息存储和转发的实体。RocketMQ支持单机模式和集群模式部署。这里以单机模式为例:
nohup sh bin/mqbroker -n localhost:9876 &
使用mqadmin
命令行工具测试RocketMQ是否正常启动:
sh bin/mqadmin topicList localhost:9876
如果输出了topic列表,说明RocketMQ已经成功启动。
RocketMQ消息生产者开发在RocketMQ中,消息发送方被称为生产者。我们可以使用以下代码创建一个生产者实例:
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.protocol.NamesrvAddressing; public class Producer { public static void main(String[] args) throws Exception { // 实例化生产者 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 设置NameServer地址 producer.setNamesrvAddr("localhost:9876"); // 启动生产者 producer.start(); } }
在创建好生产者实例之后,可以通过以下代码发送消息:
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.DefaultMQProducer; public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("localhost:9876"); producer.start(); // 创建消息 Message message = new Message("TopicTest", // topic "TagA", // tag "OrderID001".getBytes(RemotingHelper.DEFAULT_CHARSET), // body 100 // properties ); // 发送消息 SendResult sendResult = producer.send(message); System.out.println(sendResult.getSendStatus()); } }
RocketMQ支持同步发送和异步发送两种模式。
同步发送是指发送消息时,生产者会等待Broker返回响应信息之后才返回给调用者,这种方式适合应用需要保证消息发送成功的情况。同步发送的代码示例如下:
public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message( "TopicTest", "TagA", "OrderID001".getBytes(RemotingHelper.DEFAULT_CHARSET), 100 ); SendResult sendResult = producer.send(msg); System.out.println("发送结果:" + sendResult.getSendStatus()); } }
异步发送是指发送消息时,生产者不会等待Broker返回响应信息,而是直接返回给调用者,这种方式适合应用不需要等待消息发送结果的情况。异步发送的代码示例如下:
public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message( "TopicTest", "TagA", "OrderID001".getBytes(RemotingHelper.DEFAULT_CHARSET), 100 ); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("发送成功:" + sendResult); } @Override public void onException(Throwable e) { System.out.println("发送失败:" + e.getMessage()); } }); } }RocketMQ消息消费者开发
在RocketMQ中,消息接收方被称为消费者。我们可以通过以下代码创建一个消费者实例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderMessageContext; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.consumer.ConsumeOrderMessageContext; import org.apache.rocketmq.common.protocol.ResponseCode; public class Consumer { public static void main(String[] args) throws Exception { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); // 设置NameServer地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅主题 consumer.subscribe("TopicTest", "TagA"); // 注册消息回调处理器 consumer.registerMessageListener((msgs, context) -> { for (org.apache.rocketmq.common.message.MessageExt msg : msgs) { System.out.println("接收到消息:" + new String(msg.getBody())); } return ConsumeOrderMessageContext.CONSUME_SUCCESS; }); // 启动消费者 consumer.start(); } }
在创建好消费者实例之后,可以通过以下代码消费消息:
public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TopicTest", "TagA"); consumer.registerMessageListener((msgs, context) -> { for (MessageExt msg : msgs) { System.out.println("接收到消息:" + new String(msg.getBody())); } return ConsumeOrderMessageContext.CONSUME_SUCCESS; }); consumer.start(); } }
在RocketMQ中,消费者配置主要涉及以下几个方面:
RocketMQ的消息模型主要包含以下几个部分:
RocketMQ的消息路由主要涉及到以下几个概念:
在RocketMQ中,消息路由的过程如下:
RocketMQ支持消息过滤功能,可以通过设置Filter表达式来过滤接收到的消息。例如,以下代码展示了如何设置Filter表达式:
consumer.subscribe("TopicTest", "TagA", new MessageSelector() { @Override public boolean filterMessage(final String topic, final String tags, final String properties, final byte[] body) { return tags.equals("TagA") && body.toString().startsWith("OrderID"); } });
当消费者消费消息失败时,RocketMQ会自动将消息重新投递到队列中,消费者可以设置重试次数和重试间隔。例如,以下代码展示了如何设置重试次数和重试间隔:
consumer.setMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyResult consumeMessage(List<MessageExt> msgs, ConsumeOrderMessageContext context) { for (MessageExt msg : msgs) { try { // 消费消息 System.out.println("接收到消息:" + new String(msg.getBody())); return new ConsumeOrderlyResult(true, "继续消费"); } catch (Exception e) { // 消费失败,进入重试逻辑 return new ConsumeOrderlyResult(false, "重试"); } } return new ConsumeOrderlyResult(true, "继续消费"); } });常见问题与解决方案
RocketMQ提供了丰富的日志和监控功能,可以通过以下方式查看日志和监控信息: