消息队列MQ

【架构师面试-消息队列-5】-MQ消息可靠性实战源码解决方案

本文主要是介绍【架构师面试-消息队列-5】-MQ消息可靠性实战源码解决方案,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1:引言

如果保证消息的可靠性?需要解决如下问题

问题1:生产者能百分之百将消息发送给消息队列!

两种意外情况:

第一,消费者发送消息给MQ失败,消息丢失;

第二,交换机路由到队列失败,路由键写错;

问题2:消费者能百分百接收到请求,且业务执行过程中还不能出错!

2:生产者确认

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

confirm 确认模式

return 退回模式

rabbitmq 整个消息投递的路径为:

消息从生产者(producer)发送消息到交换机(exchange),不论是否成功,都会执行一个确认回调方法confirmCallback 。

消息从交换机(exchange)到消息队列( queue )投递失败则会执行一个返回回调方法 returnCallback。

我们将利用这两个 callback 控制消息的可靠性投递

1:confirm 确认模式

目标

演示消息确认模式效果

生产者发布消息确认模式特点,不论消息是否进入交换机均执行回调方法

实现步骤

1. 在配置文件中,开启生产者发布消息确认模式

2. 编写生产者确认回调方法

3. 在RabbitTemplate中,设置消息发布确认回调方法

4. 请求测试:

测试成功回调:

测试失败回调:

实现过程

1. 在配置文件中,开启生产者发布消息确认模式

# 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调
spring.rabbitmq.publisher-confirms=true

2. 编写生产者确认回调方法

//发送消息回调确认类,实现回调接口ConfirmCallback,重写其中confirm()方法
@Component
public class MessageConfirmCallback implements
RabbitTemplate.ConfirmCallback {
/**
* 投递到交换机,不论投递成功还是失败都回调次方法
* @param correlationData 投递相关数据
* @param ack 是否投递到交换机
* @param cause 投递失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
if (ack){
System.out.println("消息进入交换机成功{}");
} else {
System.out.println("消息进入交换机失败{} , 失败原因:" + cause);
}
}
}

3. 在RabbitTemplate中,设置消息发布确认回调方法

@Component
public class MessageConfirmCallback implements
RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法
* 设置消息确认回调方法
* 设置消息回退回调方法
*/
@PostConstruct
public void initRabbitTemplate(){
//设置消息确认回调方法
rabbitTemplate.setConfirmCallback(this::confirm);
}
/**
* 投递到交换机,不论投递成功还是失败都回调次方法
* @param correlationData 投递相关数据
* @param ack 是否投递到交换机
* @param cause 投递失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
if (ack){
System.out.println("消息进入交换机成功{}");
} else {
System.out.println("消息进入交换机失败{} , 失败原因:" + cause);
}
}
}

4. 请求测试:

测试成功回调: http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingkey=order.A&msg=购买苹果手机

测试失败回调: http://localhost:8080/direct/sendMsg?

exchange=order_xxxxxxx&routingkey=order.A&msg=购买苹果手机

2:return 退回模式

目标

演示消息回退模式效果

消息回退模式特点:消息进入交换机,路由到队列过程中出现异常则执行回调方法

实现步骤

1. 在配置文件中,开启生产者发布消息回退模式

2. 在MessageConfirmCallback类中,实现接口RabbitTemplate.ReturnCallback

3. 并重写RabbitTemplate.ReturnCallback接口中returnedMessage()方法

4. 在RabbitTemplate中,设置消息发布回退回调方法

5. 请求测试:

测试成功回调:

测试失败回调:

实现过程

1. 在配置文件中,开启生产者发布消息回退模式

# 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调
spring.rabbitmq.publisher-returns=true

2. 在MessageConfirmCallback类中,实现接口RabbitTemplate.ReturnCallback

@Component
public class RabbitConfirm implements RabbitTemplate.ConfirmCallback
,RabbitTemplate.ReturnCallback {
//..省略
}

3. 并重写RabbitTemplate.ReturnCallback接口中returnedMessage()方法

/**
* 当消息投递到交换机,交换机路由到消息队列中出现异常,执行returnedMessaged方
法
* @param message 投递消息内容
* @param replyCode 返回错误状态码
* @param replyText 返回错误内容
* @param exchange 交换机名称
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String
replyText, String exchange, String routingKey) {
System.out.println("交换机路由至消息队列出错:>>>>>>>");
System.out.println("交换机:"+exchange);
System.out.println("路由键:"+routingKey);
System.out.println("错误状态码:"+replyCode);
System.out.println("错误原因:"+replyText);
System.out.println("发送消息内容:"+message.toString());
System.out.println("<<<<<<<<");
}

4. 在RabbitTemplate中,设置消息发布回退回调方法

@PostConstruct

public void initRabbitTemplate(){

//设置消息确认回调方法

rabbitTemplate.setConfirmCallback(this::confirm);

//设置消息回退回调方法

rabbitTemplate.setReturnCallback(this::returnedMessage);

}

5. 请求测试失败执行returnedMessage方法: http://localhost:8080/direct/sendMsg?

exchange=order_exchange&routingkey=xxxxx&msg=购买苹果手机

3:小结

确认模式

设置publisher-confirms="true" 开启 确认模式。

实现RabbitTemplate.ConfirmCallback接口,重写confirm方法

特点:不论消息是否成功投递至交换机,都回调confirm方法,只有在发送失败时需要写业务代码进行处理。

退回模式

设置publisher-returns="true" 开启 退回模式。

实现RabbitTemplate.ReturnCallback接口,重写returnedMessage方法

特点:消息进入交换机后,只有当从exchange路由到queue失败,才去回调returnedMessage方法;

3:消费者确认(ACK)

ack指 Acknowledge,拥有确认的含义,是消费端收到消息的一种确认机制;

1:消息确认的三种类型

自动确认:acknowledge="none"

手动确认:acknowledge="manual"

根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从

RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。

如果设置了手动确认方式,则需要在业务处理成功后,调用 channel.basicAck() ,手动签收,如果出现异常,则调用 channel.basicNack() 方法,让其自动重新发送消息。

 

自己实现消费者签收代码:自定义监听器涉及三个对象:三个对象必须注入Spring容器

1. 自定义监听器对象

2. 自定义监听器的适配器Adaptor对象

3. 监听器的容器对象

可以使用RabbitTemplate中提供的签收方式:

2:代码实现

目标

演示消费者手动确认效果

自定义消费者接收消息监听器,监听收到消息的内容,手动进行签收;当业务系统抛出异常则拒绝签收,重回队列

实现步骤

1. 搭建新的案例工程consumer-received-ack,用于演示ack消费者签收

2. 在消费者工程中,创建自定义监听器类CustomAckConsumerListener,实现

ChannelAwareMessageListener接口

3. 编写监听器配置类ListenerConfiguration,配置自定义监听器绑定消息队列 order.A

注入消息队列监听器适配器对象到ioc容器

注入消息队列监听器容器对象到ioc容器:

配置连接工厂

配置自定义监听器适配器对象

配置消息队列

开启手动签收

4. 启动消费者服务,观察控制台,消费者监听器是否与RabbitMQ建立Connection

5. 测试发送消息手动签收

6. 模拟业务逻辑出现异常情况

7. 测试异常情况,演示拒绝签收消息,消息重回队列

实现过程

1. 搭建新的案例工程consumer-received-ack,搭建过程类似于生产者确认

2. 在消费者工程中,创建自定义监听器类CustomAckConsumerListener,实现

ChannelAwareMessageListener接口

/**
* 自定义监听器,监听到消息之后,立即执行onMessage方法
*/
@Component
public class CustomAckConsumerListener implements
ChannelAwareMessageListener {
/**
* 监听到消息之后执行的方法
* @param message 消息内容
* @param channel 消息所在频道
*/
@Override
public void onMessage(Message message, Channel channel) throws
Exception {
//获取消息内容
byte[] messageBody = message.getBody();
String msg = new String(messageBody, "utf-8");
System.out.println("接收到消息,执行具体业务逻辑{} 消息内容:"+msg);
//获取投递标签
MessageProperties messageProperties =
message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
/**
* 签收消息,前提条件,必须在监听器的配置中,开启手动签收模式
* 参数1:消息投递标签
* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
*/
channel.basicAck(deliveryTag,false);
System.out.println("手动签收完成:{}");
}
}

3. 编写监听器配置类ListenerConfiguration,配置自定义监听器绑定消息队列 order.A

注入消息队列监听器适配器对象到ioc容器

注入消息队列监听器容器对象到ioc容器:

配置连接工厂

配置自定义监听器

配置消息队列

开启手动签收

/**
* 消费者监听器配置,将监听器绑定到消息队列上
*/
@Configuration
public class ListenerConfiguration {
/**
* 注入消息监听器适配器
* @param customAckConsumerListener 自定义监听器对象
*/
@Bean
public MessageListenerAdapter
messageListenerAdapter(CustomAckConsumerListener
customAckConsumerListener){
//创建自定义监听器适配器对象
return new
MessageListenerAdapter(customAckConsumerListener);
}
/**
* 注入消息监听器容器
* @param connectionFactory 连接工厂
* @param messageListenerAdapter 自定义的消息监听器适配器
*/
@Bean
public SimpleMessageListenerContainer
simpleMessageListenerContainer(
ConnectionFactory connectionFactory,
MessageListenerAdapter messageListenerAdapter){
//简单的消息监听器容器对象
SimpleMessageListenerContainer container = new
SimpleMessageListenerContainer();
//绑定消息队列
container.setQueueNames("order.A");
//设置连接工厂对象
container.setConnectionFactory(connectionFactory);
//设置消息监听器适配器
container.setMessageListener(messageListenerAdapter);
//设置手动确认消息:NONE(不确认消息),MANUAL(手动确认消息),AUTO(自
动确认消息)
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
}

4. 启动消费者控制,观察控制台,消费者监听器是否与RabbitMQ建立Connection

5. 测试发送消息手动签收,请求地址http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingkey=order.A&msg=购买苹果手机

6. 模拟业务逻辑出现异常情况,修改自定义监听器

@Override
public void onMessage(Message message, Channel channel) throws Exception
{
//获取消息内容
byte[] messageBody = message.getBody();
String msg = new String(messageBody, "utf-8");
System.out.println("接收到消息,执行具体业务逻辑{} 消息内容:"+msg);
//获取投递标签
MessageProperties messageProperties =
message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
try {
if (msg.contains("苹果")){
throw new RuntimeException("不允许卖苹果手机!!!");
}
/**
* 手动签收消息
* 参数1:消息投递标签
* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
*/
channel.basicAck(deliveryTag,false);
System.out.println("手动签收完成:{}");
} catch (Exception ex){
/**
* 手动拒绝签收
* 参数1:当前消息的投递标签
* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
* 参数3:是否重回队列,true为重回队列,false为不重回
*/
channel.basicNack(deliveryTag,false,true);
System.out.println("拒绝签收,重回队列:{}"+ex);
}
}

7. 测试异常情况,演示拒绝签收消息,消息重回队列

请求地址包含苹果,抛出异常:http://localhost:8080/direct/sendMsg?exchange=order_ex

change&routingkey=order.A&msg=购买苹果手机

控制台打印结果

2:小结

如果想手动签收消息,那么需要自定义实现消息接收监听器,实现

ChannelAwareMessageListener接口

设置AcknowledgeMode模式

none:自动

auto:异常模式

manual:手动

调用channel.basicAck方法签收消息

调用channel.basicNAck方法拒签消息

这篇关于【架构师面试-消息队列-5】-MQ消息可靠性实战源码解决方案的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!