如果保证消息的可靠性?需要解决如下问题
问题1:生产者能百分之百将消息发送给消息队列!
两种意外情况:
第一,消费者发送消息给MQ失败,消息丢失;
第二,交换机路由到队列失败,路由键写错;
问题2:消费者能百分百接收到请求,且业务执行过程中还不能出错!
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
confirm 确认模式
return 退回模式
rabbitmq 整个消息投递的路径为:
消息从生产者(producer)发送消息到交换机(exchange),不论是否成功,都会执行一个确认回调方法confirmCallback 。
消息从交换机(exchange)到消息队列( queue )投递失败则会执行一个返回回调方法 returnCallback。
我们将利用这两个 callback 控制消息的可靠性投递
演示消息确认模式效果
生产者发布消息确认模式特点,不论消息是否进入交换机均执行回调方法
1. 在配置文件中,开启生产者发布消息确认模式
2. 编写生产者确认回调方法
3. 在RabbitTemplate中,设置消息发布确认回调方法
4. 请求测试:
测试成功回调:
测试失败回调:
1. 在配置文件中,开启生产者发布消息确认模式
# 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调 spring.rabbitmq.publisher-confirms=true
2. 编写生产者确认回调方法
//发送消息回调确认类,实现回调接口ConfirmCallback,重写其中confirm()方法 @Component public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback { /** * 投递到交换机,不论投递成功还是失败都回调次方法 * @param correlationData 投递相关数据 * @param ack 是否投递到交换机 * @param cause 投递失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ System.out.println("消息进入交换机成功{}"); } else { System.out.println("消息进入交换机失败{} , 失败原因:" + cause); } } }
3. 在RabbitTemplate中,设置消息发布确认回调方法
@Component public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback{ @Autowired private RabbitTemplate rabbitTemplate; /** * 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法 * 设置消息确认回调方法 * 设置消息回退回调方法 */ @PostConstruct public void initRabbitTemplate(){ //设置消息确认回调方法 rabbitTemplate.setConfirmCallback(this::confirm); } /** * 投递到交换机,不论投递成功还是失败都回调次方法 * @param correlationData 投递相关数据 * @param ack 是否投递到交换机 * @param cause 投递失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ System.out.println("消息进入交换机成功{}"); } else { System.out.println("消息进入交换机失败{} , 失败原因:" + cause); } } }
4. 请求测试:
测试成功回调: http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingkey=order.A&msg=购买苹果手机
测试失败回调: http://localhost:8080/direct/sendMsg?
exchange=order_xxxxxxx&routingkey=order.A&msg=购买苹果手机
演示消息回退模式效果
消息回退模式特点:消息进入交换机,路由到队列过程中出现异常则执行回调方法
1. 在配置文件中,开启生产者发布消息回退模式
2. 在MessageConfirmCallback类中,实现接口RabbitTemplate.ReturnCallback
3. 并重写RabbitTemplate.ReturnCallback接口中returnedMessage()方法
4. 在RabbitTemplate中,设置消息发布回退回调方法
5. 请求测试:
测试成功回调:
测试失败回调:
1. 在配置文件中,开启生产者发布消息回退模式
# 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调 spring.rabbitmq.publisher-returns=true
2. 在MessageConfirmCallback类中,实现接口RabbitTemplate.ReturnCallback
@Component public class RabbitConfirm implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback { //..省略 }
3. 并重写RabbitTemplate.ReturnCallback接口中returnedMessage()方法
/** * 当消息投递到交换机,交换机路由到消息队列中出现异常,执行returnedMessaged方 法 * @param message 投递消息内容 * @param replyCode 返回错误状态码 * @param replyText 返回错误内容 * @param exchange 交换机名称 * @param routingKey 路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("交换机路由至消息队列出错:>>>>>>>"); System.out.println("交换机:"+exchange); System.out.println("路由键:"+routingKey); System.out.println("错误状态码:"+replyCode); System.out.println("错误原因:"+replyText); System.out.println("发送消息内容:"+message.toString()); System.out.println("<<<<<<<<"); }
4. 在RabbitTemplate中,设置消息发布回退回调方法
@PostConstruct public void initRabbitTemplate(){ //设置消息确认回调方法 rabbitTemplate.setConfirmCallback(this::confirm); //设置消息回退回调方法 rabbitTemplate.setReturnCallback(this::returnedMessage); }
5. 请求测试失败执行returnedMessage方法: http://localhost:8080/direct/sendMsg?
exchange=order_exchange&routingkey=xxxxx&msg=购买苹果手机
设置publisher-confirms="true" 开启 确认模式。
实现RabbitTemplate.ConfirmCallback接口,重写confirm方法
特点:不论消息是否成功投递至交换机,都回调confirm方法,只有在发送失败时需要写业务代码进行处理。
设置publisher-returns="true" 开启 退回模式。
实现RabbitTemplate.ReturnCallback接口,重写returnedMessage方法
特点:消息进入交换机后,只有当从exchange路由到queue失败,才去回调returnedMessage方法;
ack指 Acknowledge,拥有确认的含义,是消费端收到消息的一种确认机制;
自动确认:acknowledge="none"
手动确认:acknowledge="manual"
根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从
RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认方式,则需要在业务处理成功后,调用 channel.basicAck() ,手动签收,如果出现异常,则调用 channel.basicNack() 方法,让其自动重新发送消息。
自己实现消费者签收代码:自定义监听器涉及三个对象:三个对象必须注入Spring容器
1. 自定义监听器对象
2. 自定义监听器的适配器Adaptor对象
3. 监听器的容器对象
可以使用RabbitTemplate中提供的签收方式:
演示消费者手动确认效果
自定义消费者接收消息监听器,监听收到消息的内容,手动进行签收;当业务系统抛出异常则拒绝签收,重回队列
1. 搭建新的案例工程consumer-received-ack,用于演示ack消费者签收
2. 在消费者工程中,创建自定义监听器类CustomAckConsumerListener,实现
ChannelAwareMessageListener接口
3. 编写监听器配置类ListenerConfiguration,配置自定义监听器绑定消息队列 order.A
注入消息队列监听器适配器对象到ioc容器
注入消息队列监听器容器对象到ioc容器:
配置连接工厂
配置自定义监听器适配器对象
配置消息队列
开启手动签收
4. 启动消费者服务,观察控制台,消费者监听器是否与RabbitMQ建立Connection
5. 测试发送消息手动签收
6. 模拟业务逻辑出现异常情况
7. 测试异常情况,演示拒绝签收消息,消息重回队列
1. 搭建新的案例工程consumer-received-ack,搭建过程类似于生产者确认
2. 在消费者工程中,创建自定义监听器类CustomAckConsumerListener,实现
ChannelAwareMessageListener接口
/** * 自定义监听器,监听到消息之后,立即执行onMessage方法 */ @Component public class CustomAckConsumerListener implements ChannelAwareMessageListener { /** * 监听到消息之后执行的方法 * @param message 消息内容 * @param channel 消息所在频道 */ @Override public void onMessage(Message message, Channel channel) throws Exception { //获取消息内容 byte[] messageBody = message.getBody(); String msg = new String(messageBody, "utf-8"); System.out.println("接收到消息,执行具体业务逻辑{} 消息内容:"+msg); //获取投递标签 MessageProperties messageProperties = message.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag(); /** * 签收消息,前提条件,必须在监听器的配置中,开启手动签收模式 * 参数1:消息投递标签 * 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息 */ channel.basicAck(deliveryTag,false); System.out.println("手动签收完成:{}"); } }
3. 编写监听器配置类ListenerConfiguration,配置自定义监听器绑定消息队列 order.A
注入消息队列监听器适配器对象到ioc容器
注入消息队列监听器容器对象到ioc容器:
配置连接工厂
配置自定义监听器
配置消息队列
开启手动签收
/** * 消费者监听器配置,将监听器绑定到消息队列上 */ @Configuration public class ListenerConfiguration { /** * 注入消息监听器适配器 * @param customAckConsumerListener 自定义监听器对象 */ @Bean public MessageListenerAdapter messageListenerAdapter(CustomAckConsumerListener customAckConsumerListener){ //创建自定义监听器适配器对象 return new MessageListenerAdapter(customAckConsumerListener); } /** * 注入消息监听器容器 * @param connectionFactory 连接工厂 * @param messageListenerAdapter 自定义的消息监听器适配器 */ @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer( ConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter){ //简单的消息监听器容器对象 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); //绑定消息队列 container.setQueueNames("order.A"); //设置连接工厂对象 container.setConnectionFactory(connectionFactory); //设置消息监听器适配器 container.setMessageListener(messageListenerAdapter); //设置手动确认消息:NONE(不确认消息),MANUAL(手动确认消息),AUTO(自 动确认消息) container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } }
4. 启动消费者控制,观察控制台,消费者监听器是否与RabbitMQ建立Connection
5. 测试发送消息手动签收,请求地址http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingkey=order.A&msg=购买苹果手机
6. 模拟业务逻辑出现异常情况,修改自定义监听器
@Override public void onMessage(Message message, Channel channel) throws Exception { //获取消息内容 byte[] messageBody = message.getBody(); String msg = new String(messageBody, "utf-8"); System.out.println("接收到消息,执行具体业务逻辑{} 消息内容:"+msg); //获取投递标签 MessageProperties messageProperties = message.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag(); try { if (msg.contains("苹果")){ throw new RuntimeException("不允许卖苹果手机!!!"); } /** * 手动签收消息 * 参数1:消息投递标签 * 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息 */ channel.basicAck(deliveryTag,false); System.out.println("手动签收完成:{}"); } catch (Exception ex){ /** * 手动拒绝签收 * 参数1:当前消息的投递标签 * 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息 * 参数3:是否重回队列,true为重回队列,false为不重回 */ channel.basicNack(deliveryTag,false,true); System.out.println("拒绝签收,重回队列:{}"+ex); } }
7. 测试异常情况,演示拒绝签收消息,消息重回队列
请求地址包含苹果,抛出异常:http://localhost:8080/direct/sendMsg?exchange=order_ex
change&routingkey=order.A&msg=购买苹果手机
控制台打印结果
如果想手动签收消息,那么需要自定义实现消息接收监听器,实现
ChannelAwareMessageListener接口
设置AcknowledgeMode模式
none:自动
auto:异常模式
manual:手动
调用channel.basicAck方法签收消息
调用channel.basicNAck方法拒签消息