消息队列MQ

rabbitmq的消费端限流模式

本文主要是介绍rabbitmq的消费端限流模式,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

rabbitmq的消费端限流模式

 

 

rabbitmq-high-producer项目

application.properties文件

server.port=8081
# ip
spring.rabbitmq.host=127.0.0.1
#默认5672
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#连接到代理时用的虚拟主机
spring.rabbitmq.virtual-host=/
#是否启用【发布确认】,默认false
#spring.rabbitmq.publisher-confirm-type=correlated替换spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
#是否启用【发布返回】,默认false
spring.rabbitmq.publisher-returns=true
#表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#rabbitmq限流,必须在ack确认才能使用
#消费者最小数量
spring.rabbitmq.listener.simple.concurrency=1
#最大的消费者数量
spring.rabbitmq.listener.simple.max-concurrency=10
#在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
spring.rabbitmq.listener.simple.prefetch=2

controller层

package com.qingfeng.rabbitmqhighproducer.qsq.controller;

import com.qingfeng.rabbitmqhighproducer.qsq.service.QsqRabbitService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * 消费端限流
 */
@RestController
@RequestMapping("/qsq")
public class QsqDirectSendMessageController {

    @Autowired
    private QsqRabbitService qsqRabbitService;

    //http://127.0.0.1:8081/qsq/sendDirectMessage
    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        for (int i=1;i<100;i++) {
            String messageId = String.valueOf(UUID.randomUUID());
            //将消息携带绑定键值:路由键MqConst.ROUTING_MSM_ITEM 发送到交换机MqConst.EXCHANGE_DIRECT_MSM
            qsqRabbitService.sendMessageQsq("qsq_direct_exchange", "qsq_direct_routing", messageId);
        }
        return "ok";
    }

    @GetMapping("/sendDirectMessage02")
    public String sendDirectMessage02() {
        String messageId = String.valueOf(UUID.randomUUID());

        //将消息携带绑定键值:路由键MqConst.ROUTING_MSM_ITEM 发送到交换机MqConst.EXCHANGE_DIRECT_MSM
        qsqRabbitService.sendMessageQsq("qsq_direct_exchange","qsq_direct_routing",messageId);
        return "ok";
    }

}
QsqRabbitConfig类
package com.qingfeng.rabbitmqhighproducer.qsq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 *限流机制
 */
@Configuration
public class QsqRabbitConfig {

    //交换机名称
    public static final String QSQ_DIRECT_EXCHANGE = "qsq_direct_exchange";
    //队列名称
    public static final String QSQ_DIRECT__QUEUE = "qsq_direct_queue";
    //路由
    public static final String QSQ_DIRECT__ROUTING = "qsq_direct_routing";

    /**
     * 交换机
     */
    @Bean(name = "qsqDirectExchange")
    public DirectExchange qsqDirectExchange() {
        // 参数意义:
        // name: 名称
        // durable: true
        // autoDelete: 自动删除
        return new DirectExchange(QSQ_DIRECT_EXCHANGE, true, false);
    }

    /**
     * 队列
     */
    @Bean(name ="qsqDirectQueue" )
    public Queue qsqDirectQueue() {
        // 参数意义:
        // name: 名称
        // durable: true为持久化
        return new Queue(QSQ_DIRECT__QUEUE, true);
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingDirectQsq() {
        return BindingBuilder.bind(qsqDirectQueue())
                .to(qsqDirectExchange())
                .with(QSQ_DIRECT__ROUTING);
    }


}
QsqRabbitService类
package com.qingfeng.rabbitmqhighproducer.qsq.service;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class QsqRabbitService implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 消息从 producer生产者 到 exchange交换机 则会返回一个 confirmCallback 。
     *  producer发送消息confirm 确认模式
     * @param exchange 交换机
     * @param routingKey 路由键
     * @param msg 消息
     */
    public void sendMessageQsq(String exchange, String routingKey, String msg){
        /**
         * 确认模式:
         * 步骤:
         * 1. 确认模式开启:spring.rabbitmq.publisher-confirm-type=correlated
         * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
         */
        //2. 定义回调
        rabbitTemplate.setConfirmCallback(this);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        System.out.println("发送的消息为"+msg);
        this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,correlationData);
    }

    /**
     *
     * @param correlationData 相关配置信息
     * @param ack   exchange交换机 是否成功收到了消息。true 成功,false代表失败
     * @param cause 失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("消息id:" + correlationData.getId());
        if (ack) {
            System.out.println("消息发送确认成功");
        } else {
            System.out.println("消息发送确认失败:" + cause);
        }
    }

}

 

 

rabbitmq-high-consumer项目

application.properties文件

server.port=8082

# ip
spring.rabbitmq.host=127.0.0.1
#默认5672
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#连接到代理时用的虚拟主机
spring.rabbitmq.virtual-host=/
#是否启用【发布确认】,默认false
#spring.rabbitmq.publisher-confirm-type=correlated替换spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
#是否启用【发布返回】,默认false
spring.rabbitmq.publisher-returns=true
#表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#rabbitmq限流,必须在ack确认才能使用
#消费者最小数量
spring.rabbitmq.listener.simple.concurrency=1
#最大的消费者数量
spring.rabbitmq.listener.simple.max-concurrency=10
#在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
spring.rabbitmq.listener.simple.prefetch=2
QsqRabbitConfig类
package com.qingfeng.rabbitmqhighconsumer.qsq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 *限流机制
 */
@Configuration
public class QsqRabbitConfig {

    //交换机名称
    public static final String QSQ_DIRECT_EXCHANGE = "qsq_direct_exchange";
    //队列名称
    public static final String QSQ_DIRECT__QUEUE = "qsq_direct_queue";
    //路由
    public static final String QSQ_DIRECT__ROUTING = "qsq_direct_routing";

    /**
     * 交换机
     */
    @Bean(name = "qsqDirectExchange")
    public DirectExchange qsqDirectExchange() {
        // 参数意义:
        // name: 名称
        // durable: true
        // autoDelete: 自动删除
        return new DirectExchange(QSQ_DIRECT_EXCHANGE, true, false);
    }

    /**
     * 队列
     */
    @Bean(name ="qsqDirectQueue" )
    public Queue qsqDirectQueue() {
        // 参数意义:
        // name: 名称
        // durable: true为持久化
        return new Queue(QSQ_DIRECT__QUEUE, true);
    }

    /**
     * 绑定队列和交换机
     */
    @Bean("bindingDirectQsq")
    public Binding bindingDirectQsq() {
        return BindingBuilder.bind(qsqDirectQueue())
                .to(qsqDirectExchange())
                .with(QSQ_DIRECT__ROUTING);
    }




}
QsqListener类
package com.qingfeng.rabbitmqhighconsumer.qsq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 限流机制
 *  1. 确保ack机制为手动确认。 spring.rabbitmq.listener.simple.acknowledge-mode=manual
 *  2. listener-container配置属性  spring.rabbitmq.listener.simple.prefetch=1
 *      perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
 */
@Component
public class QsqListener {

    //限流机制
    @RabbitHandler
    @RabbitListener(queues = "qsq_direct_queue")
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.sleep(5000);
        //1.获取消息
        System.out.println(new String(message.getBody()));

        //2. 处理业务逻辑
        System.out.println("处理业务逻辑");

        //3. 签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}

 

 

启动两个项目测试:正确的测试
访问http://127.0.0.1:8081/qsq/sendDirectMessage
rabbitmq-high-provider输出结果
会把生产的消息全发送到MQ服务器



rabbitmq-high-consumer输出结果
 
在配置文件里配置了spring.rabbitmq.listener.simple.prefetch=2
 
consumer两个两个的消费

 


 

 

 

 



 

这篇关于rabbitmq的消费端限流模式的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!