消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
消息队列是一种应用间的异步协作机制,常用于以下场景:
可以将一些非核心流程,如日志,短信,邮件等,通过MQ的方式异步去处理。这样做的好处是缩短主流程的响应时间,提升用户体验。
按照不同的功能, 把一个服务, 拆分成多个系统。比如下订单的过程:扣减库存、生成相应单据、发红包、发短信通知等。每个不同的过程互不影响,通过 MQ 来实现任务的推送和消费。
当一个活动瞬时流量过大,对于不需要及时反馈,并且服务处理比较耗时可能会造成服务响应的缓慢甚至奔溃时,可以将对应的需求先放入队列中,服务顺序去处理对应的请求,后续通知用户结果即可。
异步,解耦,消峰,MQ的三大主要应用场景。
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Connection
网络连接,比如一个 TCP 连接。
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Virtual Host
虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
Broker
表示消息队列服务器实体。
Exchange 分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多。
创建交换机
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
交换机具有以下属性:
直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个 routing_key
,当消息被发送的时候,需要指定一个 binding_key
,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个 binding_key
也是支持应用到多个队列中的。
这样当一个交换机绑定多个队列,就会被送到对应的队列去处理。
扇形交换机是最基本的交换机类型,它所能做的事情非常简单 ——— 广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要 “思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。
主题交换机的 routing_key
需要有一定的规则,交换机和队列的 binding_key
需要采用 .#...... 的格式,每个部分用. 分开,其中:
*
表示一个单词
#
表示任意数量(零个或多个)单词。
假设有一条消息的 routing_key 为 fast.rabbit.white, 那么带有这样 binding_key 的几个队列都会接收这条消息:
fast..
..white
fast.#
……
当一个队列的绑定键为#的时候,这个队列将会无视消息的路由键,接收所有的消息。
首部交换机是忽略 routing_key
的一种路由方式。路由器和交换机路由的规则是通过 Headers
信息来交换的,这个有点像 HTTP
的 Headers
。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个 Hash
的数据结构,消息发送的时候,会携带一组 hash 数据结构的信息,当 Hash 的内容匹配上的时候,消息就会被写入队列。
绑定交换机和队列的时候,Hash
结构中要求携带一个键 “x-match”,这个键的 Value
可以是 any
或者 all
,这代表消息携带的 Hash
是需要全部匹配 (all),还是仅匹配一个键 (any) 就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串 (string)。
匹配规则 x-match 有下列两种类型:
Headers Exchange 不同于上面三种 Exchange,它是根据 Message 的一些头部信息来分发过滤 Message,忽略routing key 的属性,如果 Header 信息和 message 消息的头信息相匹配,那么这条消息就匹配上了。
pom.xml
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> </dependency>
Producer
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * rabbitmq 生产者 */ public class Producer { public static void main(String[] args) throws Exception { //1. 创建一个 ConnectionFactory 并进行设置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("user"); factory.setPassword("user"); //也可以使用url来配置 //factory.setUri("amqp://guest:guest@localhost:5672"); //2. 通过连接工厂来创建连接 Connection connection = factory.newConnection(); //3. 通过 Connection 来创建 Channel Channel channel = connection.createChannel(); //4. 发送消息 String exchangeName = "exchangeName"; String routingKey1 = "cat"; String routingKey2 = "dong"; for (int i = 0; i < 100; i++) { String msg = "this is msg"; if (i % 2 == 0) { msg = "cat:" + msg; channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes()); } else { msg = "dong:" + msg; channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes()); } System.out.println("发送了消息:" + msg); Thread.sleep(1000); } //5. 关闭连接 channel.close(); connection.close(); } }
Consumer
import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; /** * rabbitmq 消费者 */ public class Consumer { public static void main(String[] args) throws Exception { //1. 创建一个 ConnectionFactory 并进行设置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("user"); factory.setPassword("user"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); //2. 通过连接工厂来创建连接 Connection connection = factory.newConnection(); //3. 通过 Connection 来创建 Channel Channel channel = connection.createChannel(); //4. 创建交换机 String exchangeName = "exchangeName"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, false, true, null); // 5.创建队列 String queueName1 = "queueCat"; String queueName2 = "queueDong"; channel.queueDeclare(queueName1, false, false, true, null); channel.queueDeclare(queueName2, false, false, true, null); //5. 通过不同的key 绑定队列 String routingKey1 = "cat"; String routingKey2 = "dong"; channel.queueBind(queueName1, exchangeName, routingKey1); channel.queueBind(queueName2, exchangeName, routingKey2); // 消费者处理 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String format = String.format("[%s]-收到消息:【%s】", envelope.getRoutingKey(), new String(body, StandardCharsets.UTF_8)); System.out.println(format); //消息消费确认 channel.basicAck(envelope.getDeliveryTag(), false); } }; // 6.绑定消费者进行消费 while (true) { channel.basicConsume(queueName1, false, defaultConsumer); channel.basicConsume(queueName2, false, defaultConsumer); Thread.sleep(1000); } } }
先启动 消费者 创建交换机、队列,进行指定队列的监听,然后启动生产者进行生产消息。
pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>${rabbitmq.version}</version> </dependency>
application.yml
# rabbitmq spring: rabbitmq: host: localhost port: 5672 virtual-host: / username: guest password: guest
SpringAMQP 项目对 RabbitMQ 做了很好的封装,可以很方便的手动声明队列,交换器,绑定。
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; /** * 声明一个直连交换机 * 声明两个队列 * 将两个队列通过 routingKey 绑定到交换机 */ @Component public class DirectDeclare { @Bean(RabbitConfig.Direct.EXCHANGE) public DirectExchange directExchange() { return ExchangeBuilder.directExchange(RabbitConfig.Direct.EXCHANGE).durable(true).build(); } @Bean(RabbitConfig.Direct.QUEUE_ONE) public Queue directQueueOne() { return QueueBuilder.durable(RabbitConfig.Direct.QUEUE_ONE).build(); } @Bean(RabbitConfig.Direct.QUEUE_TWO) public Queue directQueueTwo() { return QueueBuilder.durable(RabbitConfig.Direct.QUEUE_TWO).build(); } @Bean public Binding bindingQueueOne(@Qualifier(RabbitConfig.Direct.EXCHANGE) DirectExchange exchange, @Qualifier(RabbitConfig.Direct.QUEUE_ONE) Queue queue) { return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.Direct.QUEUE_ONE_KEY); } @Bean public Binding bindingQueueTwo(@Qualifier(RabbitConfig.Direct.EXCHANGE) DirectExchange exchange, @Qualifier(RabbitConfig.Direct.QUEUE_TWO) Queue queue) { return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.Direct.QUEUE_TWO_KEY); } }
RabbitTemplate 是 Spring 用于简化同步 RabbitMQ 访问(发送和接收消息)的 Helper 类。
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class RabbitService { @Autowired RabbitTemplate rabbitTemplate; public void send(RabbitSendVo sendVo) { String exchange = sendVo.getExchange(); String routingKey = sendVo.getRoutingKey(); String message = sendVo.getMessage(); rabbitTemplate.convertAndSend(exchange, routingKey, message); } }
消息的监听使用 @RabbitListener 来完成,使用 queues 指定需要监听的队列。
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.io.IOException; @Slf4j @Component public class DirectListener { @RabbitListener(queues = {RabbitConfig.Direct.QUEUE_ONE}) public void listenerOne(Message message) throws IOException { log.info("【directQueueOne】收到消息:{}", new String(message.getBody())); } }
同时,@RabbitListener 也可以作用于 class 上,然后使用 @RabbitHandler 配合使用。
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
@Slf4j @RabbitListener(queues = {RabbitConfig.Direct.QUEUE_ONE}) public class DirectListener { @RabbitHandler public void listenerOne(Message message) throws IOException { log.info("【directQueueOne】收到消息:{}", new String(message.getBody())); } }
使用 @RabbitListener 可以搭配 @Exchange、@Queue、@QueueBinding 等注解简化使用的过程,exchange、queue 的一些属性也可以在这里配置。
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.io.IOException; /** * 通过注解实现一个主题交换机 */ @Slf4j @Component public class TopicListener { /** * 主题交换机 * 处理 Cat 相关消息 * 通过注解方式创建,命名的都是持久的 */ @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(name = RabbitConfig.Topic.EXCHANGE, type = ExchangeTypes.TOPIC), key = RabbitConfig.Topic.QUEUE_CAT_KEY, value = @Queue(RabbitConfig.Topic.QUEUE_CAT) )) public void topicCat(Channel channel, Message message) throws IOException { log.info("【topic-cat】收到消息:{}", new String(message.getBody()); } }
涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte [] 数组的解析
RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等
当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化
SimpleMessageConverter 对于要发送的消息体 body 为 byte [] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含 class 类名,类相应方法等信息。因此性能较差
当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能
public interface MessageConverter { /** * Convert a Java object to a Message. * @param object the object to convert * @param messageProperties The message properties. * @return the Message * @throws MessageConversionException in case of conversion failure */ Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException; /** * Convert a Java object to a Message. * The default implementation calls {@link #toMessage(Object, MessageProperties)}. * @param object the object to convert * @param messageProperties The message properties. * @param genericType the type to use to populate type headers. * @return the Message * @throws MessageConversionException in case of conversion failure * @since 2.1 */ default Message toMessage(Object object, MessageProperties messageProperties, @Nullable Type genericType) throws MessageConversionException { return toMessage(object, messageProperties); } /** * Convert from a Message to a Java object. * @param message the message to convert * @return the converted Java object * @throws MessageConversionException in case of conversion failure */ Object fromMessage(Message message) throws MessageConversionException; }
SpringAMQP 默认使用 SimpleMessageConverter 来实现 消息的转换,是一个 可以处理字符串、可序列化实例或字节数组的MessageConverter实现。
SimpleMessageConverter.createMessage:
// SimpleMessageConverter.createMessage protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes = null; if (object instanceof byte[]) { bytes = (byte[]) object; messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES); } else if (object instanceof String) { try { bytes = ((String) object).getBytes(this.defaultCharset); } catch (UnsupportedEncodingException e) { throw new MessageConversionException( "failed to convert to Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); messageProperties.setContentEncoding(this.defaultCharset); } else if (object instanceof Serializable) { try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException e) { throw new MessageConversionException( "failed to convert to serialized Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT); } if (bytes != null) { messageProperties.setContentLength(bytes.length); return new Message(bytes, messageProperties); } throw new IllegalArgumentException(getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName()); }
消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:
普通消息实例:
contentType=text/plain
(Body:'测试消息' MessageProperties [headers={spring_listener_return_correlation=d4e4e756-e85e-4ec3-accb-e6b08a7c2f80}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=direct, receivedRoutingKey=one, deliveryTag=2, consumerTag=amq.ctag-7eT4L6V5TKzC4GMDE_hOcg, consumerQueue=directQueueOne])
Java Bean 消息实例:
contentType=application/x-java-serialized-object
(Body:'[B@3823efd5(byte[151])' MessageProperties [headers={spring_listener_return_correlation=0dc30aa6-b8cd-4f2f-9129-0fc8b729e8ea}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=direct, receivedRoutingKey=one, deliveryTag=1, consumerTag=amq.ctag-U0u4SdugbrO9-SbQ3bD6gg, consumerQueue=directQueueOne])
消息处理方法参数是由 MessageConverter
转化,若使用自定义 MessageConverter 则需要在
RabbitListenerContainerFactory实例中去设置(默认 Spring 使用的实现是
SimpleRabbitListenerContainerFactory`)
使用 SpringAMQP
提供的 Jackson2JsonMessageConverter
实现 消息的 Json 转换。
publisher 和 consumers 使用相同的 MessageConverter。
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { /** * 消息序列化方式 */ @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } }
直接注入是最方便的方法了,版本 spring-boot-starter-amqp 2.4.4
.
当然也可以使用另一种方式,在生产者和消费者两边都设置同样的 MessageConverter, 同时可以配置一些其他业务相关的属性。
@Configuration public class RabbitConfig { /** * consumers 监听容器 * @param listenerContainerFactoryConfigurer listener configurer * @param connectionFactory 连接工厂 * @return RabbitListenerContainerFactory */ @Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, CachingConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory(); //使用配置 listenerContainerFactoryConfigurer.configure(listenerContainerFactory, connectionFactory); //配置消息反序列方式 listenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter()); return listenerContainerFactory; } /** * publisher template * @return RabbitTemplate */ @Bean public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory, RabbitTemplateConfigurer templateConfigurer) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); //使用配置 templateConfigurer.configure(rabbitTemplate, connectionFactory); //消息序列化方式 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); //避免阻塞,将 publish 和 consumer 的 connection 分开 rabbitTemplate.setUsePublisherConnection(true); return rabbitTemplate; } }
使用 @Payload 和 @Headers 注解可以获取消息中的 body 与 headers 信息
@RabbitListener(queues = "debug") public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) { System.out.println("body:"+body); System.out.println("Headers:"+headers); }
也可以获取单个 Header 属性
@RabbitListener(queues = "debug") public void processMessage1(@Payload String body, @Header String token) { System.out.println("body:"+body); System.out.println("token:"+token); }
RabbitMQ 的消息确认有两种。
一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。
第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。
通过实现 ConfirmCallBack 接口,消息发送到交换器 Exchange 后触发回调。
使用该功能需要开启确认,spring-boot 中配置如下:
spring.rabbitmq.publisher-confirm-type= correlated
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; /** * mq 消息投递到exchange 确认回调 * 处理失败消息 */ @Slf4j @Component("rabbitConfirmCallback") public class ConfirmCallback implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.error("消息投递到exchange失败,cause={}", cause); } } }
correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
ack:消息投递到 broker 的状态,true 表示成功。
cause:表示投递失败的原因。
消息投递到exchange失败,cause=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'direct1' in vhost '/', class-id=60, method-id=40)
不管有没有成功发送到 exchange
,都会走 ConfirmCallback
的回调,参数 boolean ack 表示是否成功,case 是失败的原因,例如:NOT_FOUND - no exchange 'direct1' in vhost '/'
通过实现 ReturnCallback
接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的 routingKey
找不到队列时会触发)
使用该功能需要开启确认,spring-boot 中配置如下:
spring.rabbitmq.template.mandatory: true
/** * 消息没有匹配到合适的队列时,退回回调处理 */ @Slf4j @Component("rabbitReturnsCallback") public class ReturnsCallback implements RabbitTemplate.ReturnsCallback { @Override public void returnedMessage(ReturnedMessage returned) { String exchange = returned.getExchange(); String routingKey = returned.getRoutingKey(); Message message = returned.getMessage(); log.error("消息丢失[exchange={},routingKey={},message={}]", exchange, routingKey, message); } }
[exchange=direct,routingKey=one1,message=(Body:'[B@669a964c(byte[151])' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])]
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { /** * publisher template * @param connectionFactory 连接工厂 * @param templateConfigurer configurer * @param confirmCallback confimCallback * @param returnsCallback returnsCallback * @return RabbitTemplate */ @Bean public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory, RabbitTemplateConfigurer templateConfigurer, RabbitTemplate.ConfirmCallback confirmCallback, RabbitTemplate.ReturnsCallback returnsCallback) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); //使用配置 templateConfigurer.configure(rabbitTemplate, connectionFactory); //消息序列化方式 //rabbitTemplate.setMessageConverter(jsonMessageConverter()); //避免阻塞,将 publish 和 consumer 的 connection 分开 rabbitTemplate.setUsePublisherConnection(true); //confirm 回调,开启publisher-confirm-type:correlated //connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); rabbitTemplate.setConfirmCallback(confirmCallback); //return 回调,开启template.mandatory=true //消息根据 routingKey 无法找到合适的 queue 时,将消息退回,而不是丢失。 //rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnsCallback(returnsCallback); return rabbitTemplate; } /** * 消息序列化方式 */ @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } }
spring-boot 中配置方法:
spring.rabbitmq.listener.simple.acknowledge-mode = manual
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag: 该消息的 index
multiple:是否批量. true:将一次性 ack 所有小于 deliveryTag
的消息。
假设我先发送三条消息 deliveryTag 分别是 5、6、7,可它们都没有被确认,当我发第四条消息此时 deliveryTag 为 8,multiple 设置为 true,会将 5、6、7、8 的消息全部进行确认。
消费者成功处理后,调用 channel.basicAck (message.getMessageProperties ().getDeliveryTag (), false);
方法对消息进行确认。
@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_ONE}) public void listenerOne(Channel channel, Message message, @Payload Mail mail) throws IOException { log.info("【directQueueOne】收到消息:{}", mail); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
确认失败,可以使用 requeue参数
选择是否将消息重新放回队列。
//第一种方式 void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
deliveryTag: 该消息的 index,递增的。
multiple:是否批量. true:将一次性拒绝所有小于 deliveryTag
的消息。
requeue:被拒绝的是否重新入队列。
//第二种方式 void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag: 该消息的 index。
requeue:被拒绝的是否重新入队列。
如果没有其他接收者监控这个
queue
的话,要注意一直无限循环发送的危险。
当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行 回滚,如此反复进行。
建议为队列设置私信队列,当出现异常时,由死信队列监听后再进行处理。
@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_TWO}) public void listenerTwo(Channel channel, Message message, @Payload Mail mail) throws Exception { try { log.info("【directQueueTwo】收到消息:{}", mail); int i = 1 / 0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } }
basicNack、basicReject区别:
channel.basicNack
与 channel.basicReject
的区别在于 basicNack
可以批量拒绝多条消息(一次性拒绝所有小于 deliveryTag 的消息),而 basicReject 一次只能拒绝一条消息。
为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ
的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
死信
,在官网中对应的单词为 “Dead Letter”。
死信
是 RabbitMQ
中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:
channel.basicNack
或 channel.basicReject
,并且此时 requeue
属性被设置为 false
。那么该消息将成为 “死信”。
TTL - 通过 channel.queueDeclare 方法中的 x-expires 参数可以控制队列被自动删除前处于未使用状态的时间。
设置了 TTL 后,在设置的时间内,没有消费者消费这条消息,那么判定这条消息为过期。
“死信” 消息会被 RabbitMQ 进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
大概可以分为以下步骤:
注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由 key。
有了死信交换机和路由 key 后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由 key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。
声明 exchange、queue,将 exchange、queue 绑定,然后在其他队列创建时,指定扩展参数设定其死信消息都发送到该队列。
import com.yohaps.mq.rabbittemplate.config.RabbitConfig; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; /** * 声明死信队列 * 就是普通的交换机和队列,接收其他队列的死信消息 */ @Component public class DeadLetterDeclare { @Bean(RabbitConfig.DeadLetter.EXCHANGE) public DirectExchange deadLetterExchange() { return ExchangeBuilder.directExchange(RabbitConfig.DeadLetter.EXCHANGE).durable(true).build(); } @Bean(RabbitConfig.DeadLetter.QUEUE_ONE) public Queue deadLetterQueueOne() { return QueueBuilder.durable(RabbitConfig.DeadLetter.QUEUE_ONE).build(); } @Bean(RabbitConfig.DeadLetter.QUEUE_TWO) public Queue deadLetterQueueTwo() { return QueueBuilder.durable(RabbitConfig.DeadLetter.QUEUE_TWO).build(); } @Bean public Binding bindingDeadLetterQueueOne(@Qualifier(RabbitConfig.DeadLetter.EXCHANGE) DirectExchange exchange, @Qualifier(RabbitConfig.DeadLetter.QUEUE_ONE) Queue queue) { return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.DeadLetter.QUEUE_ONE_KEY); } @Bean public Binding bindingDeadLetterQueueTwo(@Qualifier(RabbitConfig.DeadLetter.EXCHANGE) DirectExchange exchange, @Qualifier(RabbitConfig.DeadLetter.QUEUE_TWO) Queue queue) { return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.DeadLetter.QUEUE_TWO_KEY); } }
关键:
在需要配置死信队列的队列创建时,指定死信交换机和key.
队列属性:
import com.yohaps.mq.rabbittemplate.config.RabbitConfig; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; /** * 声明一个直连交换机 * 声明两个队列 * 将两个队列通过 routingKey 绑定到交换机 */ @Component public class DirectDeclare { @Bean(RabbitConfig.Direct.EXCHANGE) public DirectExchange directExchange() { return ExchangeBuilder.directExchange(RabbitConfig.Direct.EXCHANGE).durable(true).build(); } @Bean(RabbitConfig.Direct.QUEUE_ONE) public Queue directQueueOne() { //声明当前队列绑定的死信交换机与指定的key return QueueBuilder.durable(RabbitConfig.Direct.QUEUE_ONE) .withArgument("x-dead-letter-exchange", RabbitConfig.DeadLetter.EXCHANGE) .withArgument("x-dead-letter-routing-key", RabbitConfig.DeadLetter.QUEUE_ONE_KEY) .build(); } @Bean(RabbitConfig.Direct.QUEUE_TWO) public Queue directQueueTwo() { //声明当前队列绑定的死信交换机与指定的key return QueueBuilder.durable(RabbitConfig.Direct.QUEUE_TWO) .withArgument("x-dead-letter-exchange", RabbitConfig.DeadLetter.EXCHANGE) .withArgument("x-dead-letter-routing-key", RabbitConfig.DeadLetter.QUEUE_TWO_KEY) .withArgument("x-message-ttl",20000) .build(); } @Bean public Binding bindingQueueOne(@Qualifier(RabbitConfig.Direct.EXCHANGE) DirectExchange exchange, @Qualifier(RabbitConfig.Direct.QUEUE_ONE) Queue queue) { return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.Direct.QUEUE_ONE_KEY); } @Bean public Binding bindingQueueTwo(@Qualifier(RabbitConfig.Direct.EXCHANGE) DirectExchange exchange, @Qualifier(RabbitConfig.Direct.QUEUE_TWO) Queue queue) { return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.Direct.QUEUE_TWO_KEY); } }
死信队列的监听,也是跟普通队列没区别
@Slf4j @Component public class DeadLetterListener { @RabbitListener(queues = {RabbitConfig.DeadLetter.QUEUE_ONE}) public void listenerOne(Channel channel, Message message, @Payload Mail mail) throws IOException { log.info("【DeadLetterListener-One】收到消息:{}", mail); String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey(); System.out.println(receivedRoutingKey); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = {RabbitConfig.DeadLetter.QUEUE_TWO}) public void listenerTwo(Channel channel, Message message, @Payload Mail mail) throws IOException { log.info("【DeadLetterListener-Two】收到消息:{}", mail); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
测试:在业务处理中,发生异常,捕捉到异常后,设置 queues=false
,消息会进入到死信队列。
@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_TWO}) public void listenerTwo(Channel channel, Message message, @Payload Mail mail) throws Exception { try { log.info("【directQueueTwo】收到消息:{}", mail); int i = 1 / 0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } }
利用 死信队列 + TTL 的特性,可以实现延迟消息
比如:登录后,发送一条消息(推荐商品,发邮件通知等),设置 TTL 而没有消费者进行消费,过期后就会进入死信队列,在死信队列处理业务。
在消费端在处理消息过程中发生异常,进行重新尝试。
开启重试机制:
# rabbitmq spring: rabbitmq: listener: simple: acknowledge-mode: auto # 自动ack retry: enabled: true max-attempts: 5 # 重试次数 max-interval: 10000 # 重试最大间隔时间 initial-interval: 2000 # 重试初始间隔时间 multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
消费端代码模拟发生异常:
@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_TWO}) public void listenerTwo(Channel channel, Message message, @Payload Mail mail) throws Exception { log.info("【directQueueTwo】收到消息:{}", mail); int i = 1 / 0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
此时启动程序,发送消息后可以看到控制台输出内容如下:
可以看到重试次数是 5 次(包含自身消费的一次),重试时间依次是 2s,4s,8s,10s(上一次间隔时间 * 间隔时间乘子),最后一次重试时间理论上是 16s,但是由于设置了最大间隔时间是 10s,因此最后一次间隔时间只能是 10s,和配置相符合。
注意:
重试并不是 RabbitMQ 重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟 mq 没有任何关系;
因此上述消费者代码不能添加 try {} catch (){},一旦捕获了异常,在自动 ack 模式下,就相当于消息正确处理了,消息直接被确认掉了,不会触发重试的;
重试次数耗尽后,依然没有正确消费的处理查看 《MessageReCoverer》
在消费端处理业务的过程中发生异常,会使用 MessageRecoverer
处理。
public interface MessageRecoverer { /** * 已消费但所有重试尝试失败的消息的回调。 * Callback for message that was consumed but failed all retry attempts. * * @param message the message to recover * @param cause the cause of the error */ void recover(Message message, Throwable cause); }
MessageRecoverer 有以下几个实现类:
默认使用 RejectAndDontRequeueRecoverer 配置。
/** * 消费端处理消息,发生异常后,将消息发送到新的队列 */ @Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate, "errorExchange","errorRoutingKey"); }
/** * 消费端处理消息,发生异常后,将消息重新放入队列 * 周而复始直到不抛出异常为止,这样还是会影响后续的消息消费。 */ @Bean public MessageRecoverer messageRecoverer(){ return new ImmediateRequeueMessageRecoverer(); }
在自动 ACK 模式下,消费端发生异常并且在重试次数耗尽后,默认情况 下使用 RejectAndDontRequeueRecoverer
处理(拒绝消息并且不会将消息重新发回队列),也就是 requeue=false
,如果配置了 死信队列 会将消息发送到 死信队列。
一般来说重试实在 自动 ACK
模式下进行的,如果是 手动 ACK
,重试也是正常进行的,但是尝试将消息使用 RepublishMessageRecoverer
重新发布到新的队列会失败,配置了死信队列也不会进入,而在自动 ACK 模式下,三种实习方式都是正常的。
在手动ACK模式下,尝试在重试次数耗尽后,将其放入新的队列处理,出现异常。于是,手动实现重试机制:
@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_TWO}) public void listenerTwo(Channel channel, Message message, @Payload Mail mail) throws IOException, Int try { log.info("【directQueueTwo】收到消息:{}", mail); int i = 1 / 0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { //发生异常后,拒绝消息 //channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); System.out.println("catch"); /** * 以下手动实现重试,耗尽次数后再进行处理 * 发生异常后,先 ACK ,然后在消息的 header 中记录重试次数,重新将消息发回队列 */ Map<String, Object> headers = message.getMessageProperties().getHeaders(); //重试次数 Integer retryCount; String mapKey = "retry-count"; if (!headers.containsKey(mapKey)) { retryCount = 0; } else { retryCount = (Integer) headers.get(mapKey); } if (++retryCount < 3) { /** * 当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部。 * 这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行 回滚,如此反复进行 * 而比较理想的方式是,出现异常时,消息到达消息队列尾部,这样既保证消息不回丢失,又保证了正常业务的进行。 * 因此我们采取的解决方案是,将消息进行应答。 * 这时消息队列会删除该消息,同时我们再次发送该消息 到消息队列,这时就实现了错误消息进行消息队列尾部的方案 */ log.info("已经重试 " + retryCount + " 次"); headers.put("retry-count", retryCount); //1.应答 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //2.重新发送到MQ中 AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().contentType(" channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), basicProperties, message.getBody()); } else { log.info("现在重试次数为:" + retryCount); /** * 不重要的操作放入 死信队列 * 消息异常处理:消费出现异常后,延时几秒,然后从新入队列消费,直到达到ttl超时时间,再转到死信,证明这个信息有问题需要人工干预 */ //休眠2s 延迟写入队列,触发转入死信队列 Thread.sleep(2000); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
目前实现的最多的就是 At least one
语义,也就是保证消费者至少接收到一次消息,所以会存在重复消息的情况,需要我们业务代码进行幂等的处理.
可以给发送的消息一个唯一 ID,将 ID 存储在分布式缓存(或数据库)中,这样在每次接收到消息之后,进行比较,看是否是重复的消息。
@Service public class RabbitService { @Autowired RabbitTemplate rabbitTemplate; public void send(RabbitSendVo sendVo) { String exchange = sendVo.getExchange(); String routingKey = sendVo.getRoutingKey(); Mail mail = sendVo.getMail(); //默认使用 UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(); //correlationData.setId("1"); rabbitTemplate.convertAndSend(exchange, routingKey, mail,correlationData); } }
需要保证缓存一定是高可用的
一个 queue
,有多个 consumer
去消费,这样就会造成顺序的错误,consumer
从 MQ 里面读取数据是有序的,但是每个 consumer
的执行时间是不固定的,无法保证先读到消息的 consumer
一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
这时,可以采用单消费者模式,一个队列只绑定一个消费者。
在订单高峰期,rabbitmq
上已经堆积了很多消息等待消费,如果没有任何限流措施,贸然启动一个消费者时,如此多的消息瞬间推送给消费者,消费者可能因无法处理这么多的消息而承受巨大压力,甚至崩溃!
在 SpringBoot 中:
spring.rabbitmq.listener.prefetch=1
// 表示不限制消息大小, 一次只处理一条消息, 限制只是当前消费者有效 channel.basicQos(0, 1, false);
默认情况下,rabbitmq
中的 queue
的最大长度和总字节数不受限制的(仅受全局内存,磁盘阈值的影响)。
当 queue
达到限制的阈值时,如果此 queue
配置了 dead-lettered
,则 queue
最前面的消息将被转发到 dead-lettered
,如果没有配置则最前面的消息将会被直接丢弃。从而达到通过转发或丢弃 queue
最前面的消息,来为新消息腾出空间的目的。
可以通过参数来配置 queue 的最大长度和最大总字节数:
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-max-length", 10); // 设置queue的最大长度10 args.put("x-max-length-bytes", 1024); // 设置最大总字节数1KB channel.queueDeclare("myqueue", false, false, false, args);