延迟队列概念
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列
延迟队列使用场景
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下
TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL:
HashMap<String, Object> args = new HashMap<>(3); // 设置TTL args.put("x-message-ttl", 10000);
rabbitTemplate.convertAndSend(TtlQueueConfig.NORMAL_EXCHANGE, "normalCKey", TtlQueueConfig.NORMAL_C_QUEUE + ":" + message, msg -> { // 发送消息时设置消息过期时间 msg.getMessageProperties().setExpiration(ttlTime); return msg; });
两者的区别
如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间
另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃
情景
创建两个普通队列 NORMAL_A_QUEUE和NORMAL_B_QUEUE,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个普通交换机 NORMAL_EXCHANGE 和死信交换机DEAD_EXCHANGE,它们的类型都是direct,创建一个死信队列 DEAD_QUEUE,它们的绑定关系如下:
NORMAL_A_QUEUE队列实现消息延迟10s进入死信队列被消费,NORMAL_B_QUEUE队列实现消息延迟40s进入死信队列被消费
这样一个简单的延迟队列就完成了,不过,如果这样使用的话,每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL 为一个小时的队列,这样队列随着需求不断增加,代码实用性并不高
优化
在上面的场景下增加一个普通队列NORMAL_C_QUEUE,该队列不设置TTL时间,而是由生产者发送消息时设置消息TTL,这样在发送消息时指定TTL就不用使队列随着需求的增加而增加了
但是这样还存在一个问题:当发送两条消息到NORMAL_C_QUEUE,第一条消息设置TTL为20s,第二条消息设置TTL为2s,会出现第二条消息过了22s才被消费的情况,造成这种情况的原因是,在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行,这样显然不符合我们的要求
再优化:基于插件实现延迟队列
rabbitmq_delayed_message_exchange插件可以解决上述问题,实现RabbitMQ检查每个消息是否过期,如果过期则丢入死信队列被消费
插件下载地址:https://www.rabbitmq.com/community-plugins.html
官网下载插件后放置到 RabbitMQ 的安装目录下的 plugins 文件夹里面,笔者下载插件后上传到linux,然后通过以下命令将插件拷贝到指定路径:
cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.11/plugins
安装插件之前停止rabbitmq服务
/sbin/service rabbitmq-server stop
进入 RabbitMQ 的安装目录下的 plugins 文件夹,运行安装命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
启动rabbitmq服务
/sbin/service rabbitmq-server start
访问rabbitmq,添加交换机时多出x-delayed-message选项说明安装成功
导包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--RabbitMQ 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--RabbitMQ 测试依赖--> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>
配置文件
spring: rabbitmq: host: 192.168.84.131 port: 5672 username: admin password: 123
延迟队列配置类
package com.yl.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; /** * 延迟队列配置类 * * @author Y-wee */ @Configuration public class TtlQueueConfig { /** * 普通交换机名称 */ public static final String NORMAL_EXCHANGE = "NORMALEXCHANGE"; /** * 死信交换机名称 */ public static final String DEAD_EXCHANGE = "DEADEXCHANGE"; /** * 普通队列A名称 */ public static final String NORMAL_A_QUEUE = "NORMALAQUEUE"; /** * 普通队列B名称 */ public static final String NORMAL_B_QUEUE = "NORMALBQUEUE"; /** * 普通队列C名称 */ public static final String NORMAL_C_QUEUE="NORMALCQUEUE"; /** * 死信队列名称 */ public static final String DEAD_QUEUE = "DEADQUEUE"; /** * 声明普通交换机 * * @return */ @Bean public DirectExchange normalExchange() { return new DirectExchange(NORMAL_EXCHANGE); } /** * 声明死信交换机 * * @return */ @Bean public DirectExchange deadExchange() { return new DirectExchange(DEAD_EXCHANGE); } /** * 声明普通队列A * * @return */ @Bean public Queue normalAQueue() { HashMap<String, Object> args = new HashMap<>(3); // 设置死信交换机 args.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 设置死信交换机routingKey args.put("x-dead-letter-routing-key", "deadKey"); // 设置TTL args.put("x-message-ttl", 10000); return QueueBuilder.durable(NORMAL_A_QUEUE).withArguments(args).build(); } /** * 声明普通队列B * * @return */ @Bean public Queue normalBQueue() { HashMap<String, Object> args = new HashMap<>(3); args.put("x-dead-letter-exchange", DEAD_EXCHANGE); args.put("x-dead-letter-routing-key", "deadKey"); args.put("x-message-ttl", 40000); return QueueBuilder.durable(NORMAL_B_QUEUE).withArguments(args).build(); } /** * 声明普通队列C * * @return */ @Bean public Queue normalCQueue() { HashMap<String, Object> args = new HashMap<>(3); args.put("x-dead-letter-exchange", DEAD_EXCHANGE); args.put("x-dead-letter-routing-key", "deadKey"); return QueueBuilder.durable(NORMAL_C_QUEUE).withArguments(args).build(); } /** * 声明死信队列 * * @return */ @Bean public Queue deadQueue() { return QueueBuilder.durable(DEAD_QUEUE).build(); } /** * 绑定普通队列A与普通交换机 * * @param normalExchange * @param normalAQueue * @return */ @Bean public Binding aQueueBinding(@Qualifier("normalExchange") DirectExchange normalExchange, @Qualifier("normalAQueue") Queue normalAQueue) { return BindingBuilder.bind(normalAQueue).to(normalExchange).with("normalAKey"); } /** * 绑定普通队列B与普通交换机 * * @param normalExchange * @param normalBQueue * @return */ @Bean public Binding bQueueBinding(@Qualifier("normalExchange") DirectExchange normalExchange, @Qualifier("normalBQueue") Queue normalBQueue) { return BindingBuilder.bind(normalBQueue).to(normalExchange).with("normalBKey"); } /** * 绑定普通队列C与普通交换机 * * @param normalExchange * @param normalCQueue * @return */ @Bean public Binding cQueueBinding(@Qualifier("normalExchange") DirectExchange normalExchange, @Qualifier("normalCQueue") Queue normalCQueue) { return BindingBuilder.bind(normalCQueue).to(normalExchange).with("normalCKey"); } /** * 绑定死信队列与死信交换机 * * @param deadExchange * @param deadQueue * @return */ @Bean public Binding deadQueueBinding(@Qualifier("deadExchange") DirectExchange deadExchange, @Qualifier("deadQueue") Queue deadQueue) { return BindingBuilder.bind(deadQueue).to(deadExchange).with("deadKey"); } }
基于插件实现延迟队列配置类
package com.yl.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; /** * 基于插件的延迟队列配置类 * * @author Y-wee */ @Configuration public class DelayedQueueConfig { /** * 交换机名称 */ public static final String DELAYED_EXCHANGE = "DELAYEDEXCHANGE"; /** * 队列名称 */ public static final String DELAYED_QUEUE = "DELAYEDQUEUE"; /** * routingKey */ public static final String DELAYED_ROUTINGKEY = "DELAYEDROUTINGKEY"; /** * 声明自定义交换机 * * @return */ @Bean public CustomExchange delayedExchange() { HashMap<String, Object> args = new HashMap<>(1); // 设置延迟类型为直接类型 args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args); } /** * 声明延迟队列 * * @return */ @Bean public Queue delayedQueue() { return new Queue(DELAYED_QUEUE); } /** * 将延迟交换机和延迟队列进行绑定 * * @param delayedExchange * @param delayedQueue * @return */ @Bean public Binding delayedBinding(@Qualifier("delayedExchange") CustomExchange delayedExchange, @Qualifier("delayedQueue") Queue delayedQueue) { return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTINGKEY).noargs(); } }
生产者
package com.yl.controller; import com.yl.config.DelayedQueueConfig; import com.yl.config.TtlQueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; /** * 消息生产者 * * @author Y-wee */ @Slf4j @RestController @RequestMapping("/ttl") public class SendMessageController { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息 * * @param message */ @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message) { log.info("{}开始发送消息{}到两个普通队列", new Date().toString(), message); rabbitTemplate.convertAndSend(TtlQueueConfig.NORMAL_EXCHANGE, "normalAKey", TtlQueueConfig.NORMAL_A_QUEUE + ":" + message); rabbitTemplate.convertAndSend(TtlQueueConfig.NORMAL_EXCHANGE, "normalBKey", TtlQueueConfig.NORMAL_A_QUEUE + ":" + message); } /** * 发送消息,发送时设置消息过期时间 * * @param message */ @GetMapping("/sendExpiredMessage/{message}/{ttlTime}") public void sendExpiredMessage(@PathVariable String message, @PathVariable String ttlTime) { log.info("{}开始发送消息{}到普通C队列", new Date().toString(), message); rabbitTemplate.convertAndSend(TtlQueueConfig.NORMAL_EXCHANGE, "normalCKey", TtlQueueConfig.NORMAL_C_QUEUE + ":" + message, msg -> { // 发送消息时设置消息过期时间 msg.getMessageProperties().setExpiration(ttlTime); return msg; }); } /** * 基于插件发送延迟消息 * * @param message */ @GetMapping("/sendDelayedMessage/{message}/{ttlTime}") public void sendDelayedMessage(@PathVariable String message, @PathVariable Integer ttlTime) { log.info("{}开始发送消息{}到延迟队列", new Date().toString(), message); rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE, DelayedQueueConfig.DELAYED_ROUTINGKEY, DelayedQueueConfig.DELAYED_QUEUE + ":" + message, msg -> { // 发送消息时设置消息过期时间 msg.getMessageProperties().setDelay(ttlTime); return msg; }); } }
消费者
package com.yl.consumer; import com.rabbitmq.client.Channel; import com.yl.config.DelayedQueueConfig; import com.yl.config.TtlQueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; /** * 延迟队列消费者 * * @Y-wee */ @Slf4j @Component public class TtlConsumer { /** * 接收延迟队列消息 * * @param message * @param channel */ @RabbitListener(queues = TtlQueueConfig.DEAD_QUEUE) public void receiveMessage(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("{}收到消息:{}", new Date().toString(), msg); } /** * 接收基于插件延迟队列消息 * * @param message */ @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE) public void receiveDelayedMessage(Message message) { String msg = new String(message.getBody()); log.info("{}收到消息:{}", new Date().toString(), msg); } }