消息队列是一种应用程序对应用程序的通信方法,是分布式系统的重要组件,可以解决一些应用场景的高并发问题,当不需要立即获得结果,但是并发量又需要进行控制的时候,就需要使用MQ来处理。
多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间
比如直播平台的送礼操作,一个操作会跟随产生许多其他操作,也就是一个操作的后续业务链很长。
此时为了避免用户送一次礼就要响应很久,影响送礼心情,就可以只做主流业务——扣钱,而其他后续业务链都保存在消息队列里,逐步消化,只保证最终一致性;而前端实时显示特效来响应用户的操作,比如用户送礼连击就会产生连击特效,以此增强给用户的反馈。
假设整个业务链需要10秒完成,但是扣钱只需要200ms,此时通过使用消息队列,就提升了整体业务的响应效率,所以消息队列能用来处理这种需要异步处理的场景
具体哪个业务需要提取出来即时处理,哪些功能存到消息队列里异步处理,就得看产品经理的需求文档以及技术主管与之互撕的结果,这里面的小心思就不属于后端技术领域了。
比如突然有10W个请求,但是后端只能同时处理1W个请求,此时就可以把9W个请求放在消息队列里,后端逐步消化这些请求。
也因此,MQ广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况。
多个应用要对同一消息进行处理,比如用户送礼获得经验,送礼了完成任务也获得经验,这些操作都要调用处理用户经验的接口,此时就可以通过设立请求链,也就是使用消息队列来处理,避免调用接口失败导致整个过程失败。
如果某一操作调用接口失败了,就把这个操作放回队列里重新发送请求,要是一直失败,就需要人工介入,做人工信息补偿。
一个系统的各个模块可分为消息队列、消息生产者和消息消费者,生产者负责产生消息,生产的消息储存在消息队列中,消费者(可能有多个)负责对消息队列中的消息进行处理
例如一个项目,有订单服务、用户服务、商品服务,每当前端发送请求到订单服务,订单服务收到后立马返回响应,并把后续任务放到一个地方,供其他模块使用,这里的订单服务就是生产者,而用户管理、商品管理去消息队列中获取信息并处理,它们就是消费者。
这种系统中,供生产者保存信息的地方就是消息队列
当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ等,而部分数据库如Redis、Mysql也可实现消息队列的功能。
Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
kafka之所以对消息的重复、丢失、错误有容忍度,是因为kafka应用于数据吞吐量高的场景,追求极致的速度。
所有的软件都不是完美的,要在某方便做到极致,就要有所损失,追求极致的速度,就要在功能上让步,追求功能的完善,就要在速度上让步。
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
ActiveMQ是由Apache出品,ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。
RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
使用RocketMQ的一个好处是,可以很轻松的把项目移到阿里云上。
rocketmq/concept.md at master · apache/rocketmq · GitHub
下面以RocketMQ官方GitHub的文档说明为例,说明消息队列的基本概念
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
broker就是我们需要安装和运行的东西
topic是一类消息的集合,比如赠送礼物、购买商品,所以topic相当于业务分类
“ConsumerGroup 由多个Consumer 实例构成”就是指可以有多个消费者,且消费者之间可以分组
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
生产者负责发布消息,比如订单服务负责发布订单,而发布的消息会被broker服务器接收,发送的方式很多,如果需要broker返回确认信息,那么broker要告诉消息生产者它是否接收到数据。
这些发送方式没有绝对的好和坏,都有各自的优点,比如单向发送不需要等待,所以快
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
消费者负责订阅消息以及消费消息,两种消费形式就是主动和被动的区别。
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
我发送一条消息,这条消息肯定得有自己的主题,比如赠送礼物操作,相关消息就应该订阅赠送礼物的topic,即每一条消息都要挂在一个topic下,用topic描述一条消息是干嘛的
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
broker会记录生产者存放在这里的消息,有没有被消费者拿去消费
名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
注意点:
“Broker IP列表”,说明一个项目中可以设置多个broker
名字服务相当于Nginx,各个broker就相当于tomcat,由于broker要挂名字服务下,所以名字服务要先启动
但是集群之间不进行数据互通,不存在master、slaver的概念,就是普通的集群
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
就是消费者主动调用,通常需要做定时任务,每隔一段时间去查询一次broker
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
就是broker主动推送给各个消费者,这种方式实时性自然最高
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
就是说几个生产者构成一个集合,要是某个生产者挂了,其他生产者顶上
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
就是说几个消费者构成一个集合,要是某个消费者挂了,其他消费者顶上
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
普通顺序消费模式下,消费者通过同一个消息队列( Topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
只需要知道每条消息有唯一的Message ID即可
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
就是在同一个topic下挂载的消息,继续细分消息所属业务的东西,标签可有可无,仅仅是topic的一个扩展
RocketMQ属于java应用,需要jdk环境
如图,准备好maven、rocketmq的源码、rocketmq的web控制台
由于安装rocketMQ需要编译源码,所以需要maven
将压缩包放在/opt目录下,解压缩包
tar xvf apache-maven-3.6.0-bin.tar.gz
移动文件夹到/usr/local目录下
mv apache-maven-3.6.0 /usr/local/
修改linux系统的环境变量,目的是为了能像win系统那样,在各处都能使用maven的命令
vim /etc/profile
来到环境变量的文件中,在文件末尾追加如下代码,export就是linux的环境变量命令
export MAVEN_HOME=/usr/local/apache-maven-3.6.0 export MAVEN_HOME export PATH=${PATH}:${MAVEN_HOME}/bin
接着重新刷新配置文件
source /etc/profile
然后进入maven的配置文件setting.xml
vim /usr/local/apache-maven-3.6.0/conf/settings.xml
需要在配置文件中配置镜像源,如果在公司,公司有自己的远程仓库,通常设为自己公司的,如果不在公司或者公司没有相关服务器,就设为阿里云的镜像源
<mirror> <id>aliyun</id> <mirrorOf>central</mirrorOf> <name>aliyun maven</name> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </mirror>
将RocketMQ解压
unzip rocketmq-all-4.3.0-source-release.zip
移动解压出来的文件夹到/user/local目录下
mv rocketmq-all-4.3.0 /usr/local/
进入rocketMQ的目录
cd /usr/local/rocketmq-all-4.3.0
在rocketMQ目录下编译RocketMQ,执行命令后,maven会帮助我们把所有的项目都编译,并且把依赖都准备好
mvn -Prelease-all -DskipTests clean install -U 说明: mvn 表示用maven进行编译 -P 表示要引入的properties -Prelease-all 表示要编译的是正式版本所有东西 -D 表示要指定的profile -DskipTests 表示跳过测试用例 不加这个的话RocketMQ会运行一遍,有点浪费时间
RocketMQ的默认设置中需要占用虚拟机8G的内存,对于生成环境下只是进行测试的虚拟机来说,有点过大,所以需要手动修改bin目录下的两个文件runbroker.sh和runserver.sh
首先进入编译后的路径下
cd distribution/target/apache-rocketmq
然后使用vim编辑器打开runbroker.sh
默认设置:
修改后:
不过如图的设置有点小,可能导致运行时内存溢出,所以要结合个人硬件设施去修改
接着使用vim编辑器打开runserver.sh
默认设置:
修改后:
首先需要启动Name Server
> nohup sh bin/mqnamesrv &
使用了nohup命令,简单来说就是会把日志打印到当前目录的nohup.out文件中
运行后的提示
此时可以通过下面代码查看name server的日志
tail -f ~/logs/rocketmqlogs/namesrv.log
使用下面代码就能启动broker
nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true & 说明 -n 表示要连接的ip和端口号,如果要是再启动一台broker,那么这里-n就不能再写localhost,需要写当前虚拟机的地址 autoCreateTopicEnable=true 表示开启自动创建topic的功能,不开启的话要是创建新的topic会失败
运行后的提示
可以通过下面的代码查看broker的日志
tail -f ~/logs/rocketmqlogs/broker.log
从 Windows 上的开发环境连接到虚拟机中的 nameServer 时要经过 Linux 系统的防火墙,而防火墙一般都会有超时的机制,在网络连接长时间不传输数据时,会关闭这个 TCP 的会话,关闭后再读写,就有可能导致异常(RemotingTooMuchRequestException: sendDefaultImpl call timeout)
对于配置差的电脑来说,linux的防火墙也是一个不小的负荷,更容易造成这种超时情况,所以生产环境下只是为了测试的话,可以把防火墙关闭
systemctl stop firewalld
安装RocketMQ时跳过了测试案例,但不代表不需要测试,现在要手动测试RocketMQ
首先将localhost:9876临时加入环境变量 便于下面的两行代码使用这个ip和端口号
export NAMESRV_ADDR=localhost:9876
然后测试发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
可以看出发送的消息是很简单的单向消息,每一条消息都有自己的消息ID
接着测试接收信息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
接收到的消息有一个body值,这就是正文,是字节数组
转为字符串,可以看到就是类似hello world的信息
准备好压缩包
首先要解压文件
unzip rocketmq-externals-master.zip
然后移动文件夹到/user/local下
mv rocketmq-externals-master /usr/local/
进入文件夹,可以看到有许多版本,我们需要进入的是console
/usr/local/rocketmq-externals-master/rocketmq-console/src/main/resources
由于后端用springboot项目,所以要修改配置文件,首先进入文件夹
cd src/main/resources/
然后使用vim编译器修改配置文件
vim application.properties
默认application.properties中设置为空,需要修改为当前name server所在端口和ip
现在可以执行编译安装了,编译完成后可以生成一个jar文件
mvn clean package -Dmaven.test.skip=true
执行启动命令
//这个命令会默认使用8080端口 java -jar target/rocketmq-console-ng-1.0.0.jar & //如果8080端口被占用,就要换一个端口 java -jar target/rocketmq-console-ng-1.0.0.jar --server.port=8085 &
通过访问虚拟机ip+8085端口,就能进入监控平台
通过cluster可以看到集群情况,以及消息的生产和消费情况
通过topic可以看到目前有多少topic
通过message可以查看目前有哪些信息,可以通过topic进行搜索
根据官网文档,编写一个简单实例
常量类
public interface Constants { /** * 默认编码 */ String DEFAULT_ENCODE = "UTF-8"; /** * 系统为一个组 */ String MQ_PRODUCER_GROUP_NAME = "CRM"; /** * RocketMQ的NameServ地址 */ String MQ_NAMESRV_ADDR = "192.168.2.53:9876"; /** * RocketMQ Topic - 创建订单 */ String MQ_TOPIC_CREATE_ORDER = "CREATE_ORDER"; }
实例代码
public class RocketTests { Logger logger = LoggerFactory.getLogger(RocketTests.class); public void test(){ try { //创建消息生产者 DefaultMQProdicer producer = new DefaultMQProducer(MQ_PRODUCER_GROUP_NAME); //设置名称服务的地址 producer.setNamesrvAddr(MQ_NAMESRV_ADDR); //开始消息服务 producer.start(); String mgs = "需要发送的消息内容"; //传入Message的构造函数的是topic和body,其中body是一个字节数组,所以需要将字符串改为数组,并指定编码格式 Message message = new Message(MQ_TOPIC_CREATE_ORDER,msg.getBytes(DEFAULT_ENCODE)); SendResult send = producer.send(message); //SendStatus是一个内置的枚举类,通过比较发送状态判断是否发送成功 if(SendStatus.SEND_OK.equals(send.getSendStatus())){ String msgId = send.getMsgId(); logger.info("消息发送成功,消息ID:{}",msgId) } } catch (MQClientException e) { e.printStackTrace(); } } }
注意点
“new DefaultMQProducer( )”时,传入构造函数的是生产者组名字,这个名字可以随意起名,只需要保证唯一,可以以系统命名或者组命名,像这种固定值通过写在常量类中
“new Message( )”时,需要传入Message的构造函数的是topic和body,其中body是一个字节数组,所以需要将字符串改为数组,并指定编码格式
下面提到的几种问题的解决方案,已经有很成熟的代码了,如果以后开发的项目需要使用消息队列,直接照着网上找到的或者公司给出的代码用就行了,下面只是给出常见的解决方案的原理,能看懂就行了,不需要能自己写出来——反之,真的能自己写出来,那对于刚毕业的学生来说就很厉害了,随便拿个12~13K不是问题
没有哪个中间件敢说自己100%没问题,即便是阿里云提供的收费产品也不敢说
一个产品做的好坏,有时候并不与产品有关,还和产品的战斗体系数值、货币体系有关,要是崩坏了这个产品就没了,所以一个产品不能出错,因此必须具备处理可能发生的错误的途径
虽然MQ说消息不会重复,但是我们要自己确定消息不会被重复消费——把消息存在一个地方,每次消费后删除,每次消费前先查找是否有这个消息,如果没有就说明消息已经被消费,这就避免了重复消费
类似解决重复消费的问题,再创建一个表,把创建的信息ID记录下来,每次发送前先去这个表根据信息id查一下待发送信息是否已经发送,这就解决了重复发送的问题
上述简单实例中只有发送成功的操作,实际开发中如果发送失败,需要写一个循环重新发送,如果在规定的循环次数内都发送失败,就要记录这个信息到数据库或者redis中,并通知客服或者是技术人员,等人工检查并重启服务后,手动发送信息,确保信息一定能送达——即人工介入,人工信息补偿
注意点:
要记录的信息通常是该业务信息的唯一元素,比如订单id
将发送失败的信息记录到数据库或reids后,还需要配套一个后台管理系统给客服以及技术人员使用
通过设置定时任务,实现定时扫描表中记录的失败数据并通知客服和技术人员,这就是自动纠错手段
除了自动手段,通常还需要配置人工反馈通道,也就是客服,当用户找到客服时,客服通过这个后台管理系统查业务信息,比如订单id,发现确实存在业务信息发送失败的情况后,就会通知技术人员,等技术人员解决完毕,客服再手动点击后台管理系统的功能按钮,重新创建订单,解决问题。
如果只配置了定时任务进行定时扫描和通知,那么可以把数据放在redis表中,但如果还配备了人工渠道,那么还是把数据放在mysql中吧,处理完毕就把订单状态改掉
下面是为了解决上述三个问题的修正代码
首先准备一个常量类,记录要使用的reids的库名,这个库用于保存生产的消息id
public interface RedisConstants { /** * 保存MQ生产的消息id * key : 消息id * value : 任意值 */ String MQ_CREATE_ORDER = "mq:create_order"; }
然后修改简单实例的代码
@SpringBootTest public class RocketTests { Logger logger = LoggerFactory.getLogger(RocketTests.class); @Resource private StringRedisTemplate stringRedisTemplate; public static DefaultMQProducer producer; static { try { //创建消息生产者 producer = new DefaultMQProducer(MQ_PRODUCER_GROUP_NAME); //设置名称服务的地址 producer.setNamesrvAddr(MQ_NAMESRV_ADDR); //开始消息服务 producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 同步发送消息 - 会获取消息的回执 */ @Test public void sendMsg(){ try { for (int i = 0; i < 3; i++) { //发送失败最多重试2次. String msg = "需要发送的消息内容"; Message message = new Message(MQ_TOPIC_CREATE_ORDER,msg.getBytes(DEFAULT_ENCODE)); SendResult send = producer.send(message); //SendStatus是一个内置的枚举类,通过比较发送状态判断是否发送成功 if(SendStatus.SEND_OK.equals(send.getSendStatus())){ String msgId = send.getMsgId(); //保存生产的消息id到redis指定的库中,key就是消息id,value随便给1 stringRedisTemplate.opsForHash().put(MQ_CREATE_ORDER,msgId,"1"); logger.info("消息发送成功 , 消息ID : {}",msgId); break; } if(i == 2){ //1.将错误的业务消息保存到消息发送失败的表中[创建发送失败的表或者保存到redis的list中]. //2.创建一个定时任务,每隔5分钟查询一次失败列表,并重新发送消息. //3.创建一个后台功能,允许人工触发失败列表发送消息. } } } catch (Exception e) { e.printStackTrace(); } } }
上面代码中还差一点功能没写,有点麻烦,下面直接说思路
现在已经把生产的消息保存到redis指定的库中,以后要消费消息前,先来到这个redis库中直接执行删除键值对的操作,如果删除成功则继续执行后面的消息消费操作,如果删除失败则说明消息已经被消费,这就避免了重复消费的问题
如果消息消费的过程中失败回滚,那就把这个消息的id重新存回redis中
这种消息模式会获取消息的回执,确保消息送达,具体代码见上面的简单实例,发送的方法如下
.send(消息)
不等待返回结果,不需要确保是否发送成功
常用于保存不重要的数据,比如收集用户经常点击的功能,一段时间后统计一下哪些是高频功能哪些是很少用的功能,不好用的功能就删除或者折叠
发送的方法如下
.sendOneway(消息);
具体代码如下
@SpringBootTest public class RocketTests { Logger logger = LoggerFactory.getLogger(RocketTests.class); public static DefaultMQProducer producer; static { try { //创建消息生产者 producer = new DefaultMQProducer(MQ_PRODUCER_GROUP_NAME); //设置名称服务的地址 producer.setNamesrvAddr(MQ_NAMESRV_ADDR); //开始消息服务 producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 单向发送消息- 不等待返回结果, 不需要确保是否发送成功 */ @Test public void sendOneWay(){ try { String str = "单向消息"; Message message = new Message(MQ_TOPIC_CREATE_ORDER, str.getBytes(DEFAULT_ENCODE)); producer.sendOneway(message); } catch (Exception e) { e.printStackTrace(); } }
异步发送消息的方法通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待broker的响应。
具体代码如下
@SpringBootTest public class RocketTests { Logger logger = LoggerFactory.getLogger(RocketTests.class); public static DefaultMQProducer producer; static { try { //创建消息生产者 producer = new DefaultMQProducer(MQ_PRODUCER_GROUP_NAME); //设置名称服务的地址 producer.setNamesrvAddr(MQ_NAMESRV_ADDR); //开始消息服务 producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 异步消息 */ @Test public void sendAsyncMsg(){ logger.info("========开始发送消息========="); try { //CountDownLatch 允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助类,构造函数需要传入一个与线程数相同的计数器 CountDownLatch countDownLatch = new CountDownLatch(10); for (int i = 0; i < 10; i++) { String str = "异步消息 " + i; Message message = new Message(MQ_TOPIC_CREATE_ORDER, str.getBytes(DEFAULT_ENCODE)); producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { logger.info("消息发送成功,消息id : {}" ,sendResult.getMsgId()); } @Override public void onException(Throwable e) { logger.error("消息发送失败!"); } }); } //设定等待5秒 countDownLatch.await(5,TimeUnit.SECONDS); } catch (Exception e) { e.printStackTrace(); } logger.info("========发送消息结束========="); }
注意点:
为了解决主线程被提前释放的问题,就要主线程执行等待操作,但是并不能人为写死主线程的等待时间,所以就要使用CountDownLatch类,这个类允许一个或多个线程等待直到在其他线程中执行的一组操作完成,这个类的构造函数需要传入一个值作为计数器依据,这个值要与消息数相同,这里的for循环10次,那么计数器的值就是10
“countDownLatch.await(5,TimeUnit.SECONDS)”代表所有信息发送完毕后,再等待5秒钟,然后再执行这条代码之后的代码
消费者要去消息队列中获取数据,整个过程包括实例化消费者对象、设定nameserver地址、订阅topic和tag、创建监听、设置单次推送的信息数量上限以及各种操作失败时的回滚代码、防止重复消费的代码
具体代码如下,部分代码简单的用文字说明(反正实际开发也不写这种底层代码)
@SpringBootTest public class RocketTests { Logger logger = LoggerFactory.getLogger(RocketTests.class); /** * 消费消息,创建消息监听 * @param args */ public static void main(String[] args) { try { //创建消费者对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MQ_PRODUCER_GROUP_NAME); //设置nameserv地址 consumer.setNamesrvAddr(MQ_NAMESRV_ADDR); /** * 订阅需要订阅的topic和tag * 使用 * 匹配所有的tag * 使用 tagA || tagB || tagC 监听多个指定的tag消息 * 通过调用多个.subscribe()方法来订阅多个topic */ consumer.subscribe(MQ_TOPIC_CREATE_ORDER,"*"); //设定每次MQ给这个消费者对象发的数据的上限条数 consumer.setConsumeMessageBatchMaxSize(2); //订阅消息后,创建监听 根据registerMessageListener方法的提示信息,new一个MessageListenerConcurrently //MessageListenerConcurrently是一个接口,所以需要去实现这个接口 重写consumeMessage方法 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //遍历msgs数组,从中获取到消息id和消息内容,即便消息内容是对象也能转为字节数组,但是需要对象的类实现序列化接口 msgs.forEach(messageExt -> { String msgId = messageExt.getMsgId(); // 这里有一步操作 用文字替代了:根据获取到的消息id前往redis删除消息记录,避免消息重复消费 byte[] body = messageExt.getBody(); try { System.out.printf("消息id : %s , 消息内容 : %s \n" , msgId,new String(body,DEFAULT_ENCODE)); //在这里实现业务,也就是“消费消息” //解析消息内容 //根据消息内容中的参数进行业务处理 //处理完毕.... } catch (UnsupportedEncodingException e) { //这里有一步操作 用文字替代了:报错后,将消息id重新放回redis存放生成的消息的库中 因为redis没办法跟着spring事务回滚 只能手动回滚 e.printStackTrace(); } }); //告诉MQ 当前消息消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } catch (MQClientException e) { e.printStackTrace(); } } }
注意点:
订阅topic和tag时,使用 * 匹配所有的tag,使用 tagA || tagB || tagC 监听多个指定的tag消息,通过调用多个.subscribe()方法来订阅多个topic
订阅消息后,使用registerMessageListener方法创建监听,并实现MessageListenerConcurrently接口,重写consumeMessage方法
consumeMessage方法的返回值是一个枚举类,其实就是根据操作结果告诉MQ,是“操作成功”还是“稍后重试”
从list数组中取值时,即便消息内容是对象也可以转为byte数组,只要对象的类实现了序列化接口
如果消费消息的过程中报错,需要将消息id重新放回redis存放生成的消息的库中,因为redis没办法跟着spring事务回滚,只能手动回滚
使用setConsumeMessageBatchMaxSize方法可以设置MQ每次给这个消费者对象发送的数据的上限条数