消息队列MQ

rabbitmq延迟队列

本文主要是介绍rabbitmq延迟队列,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

rabbitmq延迟队列

延迟队列概念

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列

延迟队列使用场景

  • 订单在十分钟之内未支付则自动取消
  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
  • 用户注册成功后,如果三天内没有登陆则进行短信提醒
  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员
  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下

1、RabbitMQ中的TTL

TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL:

  • 队列设置TTL
HashMap<String, Object> args = new HashMap<>(3);
// 设置TTL
args.put("x-message-ttl", 10000);
  • 消息设置TTL
rabbitTemplate.convertAndSend(TtlQueueConfig.NORMAL_EXCHANGE, "normalCKey", TtlQueueConfig.NORMAL_C_QUEUE + ":" + message, msg -> {
            // 发送消息时设置消息过期时间
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });

两者的区别

如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间

另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃

2、延迟队列应用

情景

创建两个普通队列 NORMAL_A_QUEUE和NORMAL_B_QUEUE,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个普通交换机 NORMAL_EXCHANGE 和死信交换机DEAD_EXCHANGE,它们的类型都是direct,创建一个死信队列 DEAD_QUEUE,它们的绑定关系如下:

NORMAL_A_QUEUE队列实现消息延迟10s进入死信队列被消费,NORMAL_B_QUEUE队列实现消息延迟40s进入死信队列被消费

这样一个简单的延迟队列就完成了,不过,如果这样使用的话,每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL 为一个小时的队列,这样队列随着需求不断增加,代码实用性并不高

优化

在上面的场景下增加一个普通队列NORMAL_C_QUEUE,该队列不设置TTL时间,而是由生产者发送消息时设置消息TTL,这样在发送消息时指定TTL就不用使队列随着需求的增加而增加了

但是这样还存在一个问题:当发送两条消息到NORMAL_C_QUEUE,第一条消息设置TTL为20s,第二条消息设置TTL为2s,会出现第二条消息过了22s才被消费的情况,造成这种情况的原因是,在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行,这样显然不符合我们的要求

再优化:基于插件实现延迟队列

rabbitmq_delayed_message_exchange插件可以解决上述问题,实现RabbitMQ检查每个消息是否过期,如果过期则丢入死信队列被消费

插件下载地址:https://www.rabbitmq.com/community-plugins.html

官网下载插件后放置到 RabbitMQ 的安装目录下的 plugins 文件夹里面,笔者下载插件后上传到linux,然后通过以下命令将插件拷贝到指定路径:

cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.11/plugins

安装插件之前停止rabbitmq服务

/sbin/service rabbitmq-server stop

进入 RabbitMQ 的安装目录下的 plugins 文件夹,运行安装命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

启动rabbitmq服务

/sbin/service rabbitmq-server start

访问rabbitmq,添加交换机时多出x-delayed-message选项说明安装成功

3、整合springboot实现延迟队列应用

导包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<!--RabbitMQ 依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

配置文件

spring:
  rabbitmq:
    host: 192.168.84.131
    port: 5672
    username: admin
    password: 123

延迟队列配置类

package com.yl.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * 延迟队列配置类
 *
 * @author Y-wee
 */
@Configuration
public class TtlQueueConfig {
    /**
     * 普通交换机名称
     */
    public static final String NORMAL_EXCHANGE = "NORMALEXCHANGE";
    /**
     * 死信交换机名称
     */
    public static final String DEAD_EXCHANGE = "DEADEXCHANGE";
    /**
     * 普通队列A名称
     */
    public static final String NORMAL_A_QUEUE = "NORMALAQUEUE";
    /**
     * 普通队列B名称
     */
    public static final String NORMAL_B_QUEUE = "NORMALBQUEUE";
    /**
     * 普通队列C名称
     */
    public static final String NORMAL_C_QUEUE="NORMALCQUEUE";
    /**
     * 死信队列名称
     */
    public static final String DEAD_QUEUE = "DEADQUEUE";


    /**
     * 声明普通交换机
     *
     * @return
     */
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE);
    }

    /**
     * 声明死信交换机
     *
     * @return
     */
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE);
    }

    /**
     * 声明普通队列A
     *
     * @return
     */
    @Bean
    public Queue normalAQueue() {
        HashMap<String, Object> args = new HashMap<>(3);
        // 设置死信交换机
        args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 设置死信交换机routingKey
        args.put("x-dead-letter-routing-key", "deadKey");
        // 设置TTL
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(NORMAL_A_QUEUE).withArguments(args).build();
    }

    /**
     * 声明普通队列B
     *
     * @return
     */
    @Bean
    public Queue normalBQueue() {
        HashMap<String, Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        args.put("x-dead-letter-routing-key", "deadKey");
        args.put("x-message-ttl", 40000);
        return QueueBuilder.durable(NORMAL_B_QUEUE).withArguments(args).build();
    }

    /**
     * 声明普通队列C
     *
     * @return
     */
    @Bean
    public Queue normalCQueue() {
        HashMap<String, Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        args.put("x-dead-letter-routing-key", "deadKey");
        return QueueBuilder.durable(NORMAL_C_QUEUE).withArguments(args).build();
    }


    /**
     * 声明死信队列
     *
     * @return
     */
    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    /**
     * 绑定普通队列A与普通交换机
     *
     * @param normalExchange
     * @param normalAQueue
     * @return
     */
    @Bean
    public Binding aQueueBinding(@Qualifier("normalExchange") DirectExchange normalExchange,
                                 @Qualifier("normalAQueue") Queue normalAQueue) {
        return BindingBuilder.bind(normalAQueue).to(normalExchange).with("normalAKey");
    }

    /**
     * 绑定普通队列B与普通交换机
     *
     * @param normalExchange
     * @param normalBQueue
     * @return
     */
    @Bean
    public Binding bQueueBinding(@Qualifier("normalExchange") DirectExchange normalExchange,
                                 @Qualifier("normalBQueue") Queue normalBQueue) {
        return BindingBuilder.bind(normalBQueue).to(normalExchange).with("normalBKey");
    }

    /**
     * 绑定普通队列C与普通交换机
     *
     * @param normalExchange
     * @param normalCQueue
     * @return
     */
    @Bean
    public Binding cQueueBinding(@Qualifier("normalExchange") DirectExchange normalExchange,
                                 @Qualifier("normalCQueue") Queue normalCQueue) {
        return BindingBuilder.bind(normalCQueue).to(normalExchange).with("normalCKey");
    }

    /**
     * 绑定死信队列与死信交换机
     *
     * @param deadExchange
     * @param deadQueue
     * @return
     */
    @Bean
    public Binding deadQueueBinding(@Qualifier("deadExchange") DirectExchange deadExchange,
                                 @Qualifier("deadQueue") Queue deadQueue) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with("deadKey");
    }

}

基于插件实现延迟队列配置类

package com.yl.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * 基于插件的延迟队列配置类
 *
 * @author Y-wee
 */
@Configuration
public class DelayedQueueConfig {

    /**
     * 交换机名称
     */
    public static final String DELAYED_EXCHANGE = "DELAYEDEXCHANGE";
    /**
     * 队列名称
     */
    public static final String DELAYED_QUEUE = "DELAYEDQUEUE";
    /**
     * routingKey
     */
    public static final String DELAYED_ROUTINGKEY = "DELAYEDROUTINGKEY";

    /**
     * 声明自定义交换机
     *
     * @return
     */
    @Bean
    public CustomExchange delayedExchange() {
        HashMap<String, Object> args = new HashMap<>(1);
        // 设置延迟类型为直接类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
    }

    /**
     * 声明延迟队列
     *
     * @return
     */
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE);
    }

    /**
     * 将延迟交换机和延迟队列进行绑定
     *
     * @param delayedExchange
     * @param delayedQueue
     * @return
     */
    @Bean
    public Binding delayedBinding(@Qualifier("delayedExchange") CustomExchange delayedExchange,
                                  @Qualifier("delayedQueue") Queue delayedQueue) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTINGKEY).noargs();
    }
}

生产者

package com.yl.controller;

import com.yl.config.DelayedQueueConfig;
import com.yl.config.TtlQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 消息生产者
 *
 * @author Y-wee
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     *
     * @param message
     */
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        log.info("{}开始发送消息{}到两个普通队列", new Date().toString(), message);

        rabbitTemplate.convertAndSend(TtlQueueConfig.NORMAL_EXCHANGE, "normalAKey", TtlQueueConfig.NORMAL_A_QUEUE + ":" + message);
        rabbitTemplate.convertAndSend(TtlQueueConfig.NORMAL_EXCHANGE, "normalBKey", TtlQueueConfig.NORMAL_A_QUEUE + ":" + message);
    }

    /**
     * 发送消息,发送时设置消息过期时间
     *
     * @param message
     */
    @GetMapping("/sendExpiredMessage/{message}/{ttlTime}")
    public void sendExpiredMessage(@PathVariable String message, @PathVariable String ttlTime) {
        log.info("{}开始发送消息{}到普通C队列", new Date().toString(), message);

        rabbitTemplate.convertAndSend(TtlQueueConfig.NORMAL_EXCHANGE, "normalCKey", TtlQueueConfig.NORMAL_C_QUEUE + ":" + message, msg -> {
            // 发送消息时设置消息过期时间
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }

    /**
     * 基于插件发送延迟消息
     *
     * @param message
     */
    @GetMapping("/sendDelayedMessage/{message}/{ttlTime}")
    public void sendDelayedMessage(@PathVariable String message, @PathVariable Integer ttlTime) {
        log.info("{}开始发送消息{}到延迟队列", new Date().toString(), message);

        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE, DelayedQueueConfig.DELAYED_ROUTINGKEY, DelayedQueueConfig.DELAYED_QUEUE + ":" + message, msg -> {
            // 发送消息时设置消息过期时间
            msg.getMessageProperties().setDelay(ttlTime);
            return msg;
        });
    }

}

消费者

package com.yl.consumer;

import com.rabbitmq.client.Channel;
import com.yl.config.DelayedQueueConfig;
import com.yl.config.TtlQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 延迟队列消费者
 *
 * @Y-wee
 */
@Slf4j
@Component
public class TtlConsumer {

    /**
     * 接收延迟队列消息
     *
     * @param message
     * @param channel
     */
    @RabbitListener(queues = TtlQueueConfig.DEAD_QUEUE)
    public void receiveMessage(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("{}收到消息:{}", new Date().toString(), msg);
    }

    /**
     * 接收基于插件延迟队列消息
     *
     * @param message
     */
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE)
    public void receiveDelayedMessage(Message message) {
        String msg = new String(message.getBody());
        log.info("{}收到消息:{}", new Date().toString(), msg);
    }

}
这篇关于rabbitmq延迟队列的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!