rabbitmq的消费端限流模式
rabbitmq-high-producer项目
application.properties文件
server.port=8081 # ip spring.rabbitmq.host=127.0.0.1 #默认5672 spring.rabbitmq.port=5672 #用户名 spring.rabbitmq.username=guest #密码 spring.rabbitmq.password=guest #连接到代理时用的虚拟主机 spring.rabbitmq.virtual-host=/ #是否启用【发布确认】,默认false #spring.rabbitmq.publisher-confirm-type=correlated替换spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-confirm-type=correlated #是否启用【发布返回】,默认false spring.rabbitmq.publisher-returns=true #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto spring.rabbitmq.listener.simple.acknowledge-mode=manual #rabbitmq限流,必须在ack确认才能使用 #消费者最小数量 spring.rabbitmq.listener.simple.concurrency=1 #最大的消费者数量 spring.rabbitmq.listener.simple.max-concurrency=10 #在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量) spring.rabbitmq.listener.simple.prefetch=2
controller层
package com.qingfeng.rabbitmqhighproducer.qsq.controller; import com.qingfeng.rabbitmqhighproducer.qsq.service.QsqRabbitService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; /** * 消费端限流 */ @RestController @RequestMapping("/qsq") public class QsqDirectSendMessageController { @Autowired private QsqRabbitService qsqRabbitService; //http://127.0.0.1:8081/qsq/sendDirectMessage @GetMapping("/sendDirectMessage") public String sendDirectMessage() { for (int i=1;i<100;i++) { String messageId = String.valueOf(UUID.randomUUID()); //将消息携带绑定键值:路由键MqConst.ROUTING_MSM_ITEM 发送到交换机MqConst.EXCHANGE_DIRECT_MSM qsqRabbitService.sendMessageQsq("qsq_direct_exchange", "qsq_direct_routing", messageId); } return "ok"; } @GetMapping("/sendDirectMessage02") public String sendDirectMessage02() { String messageId = String.valueOf(UUID.randomUUID()); //将消息携带绑定键值:路由键MqConst.ROUTING_MSM_ITEM 发送到交换机MqConst.EXCHANGE_DIRECT_MSM qsqRabbitService.sendMessageQsq("qsq_direct_exchange","qsq_direct_routing",messageId); return "ok"; } }
QsqRabbitConfig类
package com.qingfeng.rabbitmqhighproducer.qsq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** *限流机制 */ @Configuration public class QsqRabbitConfig { //交换机名称 public static final String QSQ_DIRECT_EXCHANGE = "qsq_direct_exchange"; //队列名称 public static final String QSQ_DIRECT__QUEUE = "qsq_direct_queue"; //路由 public static final String QSQ_DIRECT__ROUTING = "qsq_direct_routing"; /** * 交换机 */ @Bean(name = "qsqDirectExchange") public DirectExchange qsqDirectExchange() { // 参数意义: // name: 名称 // durable: true // autoDelete: 自动删除 return new DirectExchange(QSQ_DIRECT_EXCHANGE, true, false); } /** * 队列 */ @Bean(name ="qsqDirectQueue" ) public Queue qsqDirectQueue() { // 参数意义: // name: 名称 // durable: true为持久化 return new Queue(QSQ_DIRECT__QUEUE, true); } /** * 绑定队列和交换机 */ @Bean public Binding bindingDirectQsq() { return BindingBuilder.bind(qsqDirectQueue()) .to(qsqDirectExchange()) .with(QSQ_DIRECT__ROUTING); } }
QsqRabbitService类
package com.qingfeng.rabbitmqhighproducer.qsq.service; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.UUID; @Service public class QsqRabbitService implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; /** * 消息从 producer生产者 到 exchange交换机 则会返回一个 confirmCallback 。 * producer发送消息confirm 确认模式 * @param exchange 交换机 * @param routingKey 路由键 * @param msg 消息 */ public void sendMessageQsq(String exchange, String routingKey, String msg){ /** * 确认模式: * 步骤: * 1. 确认模式开启:spring.rabbitmq.publisher-confirm-type=correlated * 2. 在rabbitTemplate定义ConfirmCallBack回调函数 */ //2. 定义回调 rabbitTemplate.setConfirmCallback(this); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); System.out.println("发送的消息为"+msg); this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,correlationData); } /** * * @param correlationData 相关配置信息 * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败 * @param cause 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("消息id:" + correlationData.getId()); if (ack) { System.out.println("消息发送确认成功"); } else { System.out.println("消息发送确认失败:" + cause); } } }
rabbitmq-high-consumer项目
application.properties文件
server.port=8082 # ip spring.rabbitmq.host=127.0.0.1 #默认5672 spring.rabbitmq.port=5672 #用户名 spring.rabbitmq.username=guest #密码 spring.rabbitmq.password=guest #连接到代理时用的虚拟主机 spring.rabbitmq.virtual-host=/ #是否启用【发布确认】,默认false #spring.rabbitmq.publisher-confirm-type=correlated替换spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-confirm-type=correlated #是否启用【发布返回】,默认false spring.rabbitmq.publisher-returns=true #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto spring.rabbitmq.listener.simple.acknowledge-mode=manual #rabbitmq限流,必须在ack确认才能使用 #消费者最小数量 spring.rabbitmq.listener.simple.concurrency=1 #最大的消费者数量 spring.rabbitmq.listener.simple.max-concurrency=10 #在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量) spring.rabbitmq.listener.simple.prefetch=2
QsqRabbitConfig类
package com.qingfeng.rabbitmqhighconsumer.qsq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** *限流机制 */ @Configuration public class QsqRabbitConfig { //交换机名称 public static final String QSQ_DIRECT_EXCHANGE = "qsq_direct_exchange"; //队列名称 public static final String QSQ_DIRECT__QUEUE = "qsq_direct_queue"; //路由 public static final String QSQ_DIRECT__ROUTING = "qsq_direct_routing"; /** * 交换机 */ @Bean(name = "qsqDirectExchange") public DirectExchange qsqDirectExchange() { // 参数意义: // name: 名称 // durable: true // autoDelete: 自动删除 return new DirectExchange(QSQ_DIRECT_EXCHANGE, true, false); } /** * 队列 */ @Bean(name ="qsqDirectQueue" ) public Queue qsqDirectQueue() { // 参数意义: // name: 名称 // durable: true为持久化 return new Queue(QSQ_DIRECT__QUEUE, true); } /** * 绑定队列和交换机 */ @Bean("bindingDirectQsq") public Binding bindingDirectQsq() { return BindingBuilder.bind(qsqDirectQueue()) .to(qsqDirectExchange()) .with(QSQ_DIRECT__ROUTING); } }
QsqListener类
package com.qingfeng.rabbitmqhighconsumer.qsq; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Consumer 限流机制 * 1. 确保ack机制为手动确认。 spring.rabbitmq.listener.simple.acknowledge-mode=manual * 2. listener-container配置属性 spring.rabbitmq.listener.simple.prefetch=1 * perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。 */ @Component public class QsqListener { //限流机制 @RabbitHandler @RabbitListener(queues = "qsq_direct_queue") public void onMessage(Message message, Channel channel) throws Exception { Thread.sleep(5000); //1.获取消息 System.out.println(new String(message.getBody())); //2. 处理业务逻辑 System.out.println("处理业务逻辑"); //3. 签收 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); } }
启动两个项目测试:正确的测试 访问http://127.0.0.1:8081/qsq/sendDirectMessage
rabbitmq-high-provider输出结果 会把生产的消息全发送到MQ服务器
rabbitmq-high-consumer输出结果
在配置文件里配置了spring.rabbitmq.listener.simple.prefetch=2
consumer两个两个的消费