消息队列MQ

RabbitMQ-任务模式

本文主要是介绍RabbitMQ-任务模式,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

概述

Work Queues,也被称为(Task Queues)任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

以上的角色分别为如下所解释的:

  • P:生产者:任务的发布者
  • C1:消费者1,领取任务并且完成任务,假设完成速度较慢
  • C2:消费者2:领取任务并完成任务,假设完成速度较快

创建生产者

代码如下所示:

java
/**
 * @author: BNZeng
 **/
public class Producer {
    
    @Test
    public void sendMessage() throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        
        Channel channel = connection.createChannel();
        
        channel.queueDeclare("hello", false, false, false, null);
        
        for (int i = 1; i <= 100; i++) {
            channel.basicPublish("", "hello", null, ("hello rabbitmq Work Queue → " + i).getBytes());
        }
        
        // 关闭通道和连接
        RabbitMQUtil.closeChannelAndConnection(channel, connection);
        
        System.out.println("消息发送成功");
    }
}

创建消费者 1

代码如下所示:

java
/**
 * @author BNZeng
 */
public class Consumer1 {

    @Test
    public void receiveMessage() throws Exception {

        Connection connection = RabbitMQUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare("hello", false, false, false, null);
        channel.basicConsume("hello", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者【1】收到消息 → " + new String(body));
            }
        });

        System.out.println("消费者【1】启动成功");

        // 不能让程序结束
        System.in.read();

        // 释放资源
        RabbitMQUtil.closeChannelAndConnection(channel, connection);
    }
}

创建消费者 2

代码如下所示:

java
/**
 * @author BNZeng
 */
public class Consumer2 {

    @Test
    public void receiveMessage() throws Exception {

        Connection connection = RabbitMQUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare("hello", false, false, false, null);

        // 把签收模式变成 false
        channel.basicConsume("hello", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者【2】收到消息 → " + new String(body));
            }
        });

        System.out.println("消费者【2】启动成功");

        // 不能让程序结束
        System.in.read();

        // 释放资源
        RabbitMQUtil.closeChannelAndConnection(channel, connection);
    }
}

进行测试

先启动消费者1和消费者2,再启动消息生产者发送消息,发现结果如下:

他们是平均消费的,官网有说明:https://www.rabbitmq.com/tutorials/tutorial-two-java.html

那么实际开发中可能有消费者处理的慢,有的处理的快,那么如何配置呢,引入自动确认机制,

消息的自动确认机制

  • 官方的说明

完成一项任务可能需要几秒钟。您可能想知道,如果一个消费者开始了一项很长的任务,但只完成了一部分就去世了,会发生什么。在我们当前的代码中,一旦 RabbitMQ 向使用者传递了一条消息,它就会立即将其标记为删除。在这种情况下,如果你杀死一个工人,我们就会丢失它正在处理的信息。我们还将丢失所有发送给这个特定 worker 但尚未处理的消息。

但我们不想失去任何任务。如果一个工人死亡,我们希望任务被交付给另一个工人。

接下来改造消费者2用来模拟一下某一个消费者消费慢的情况下会怎么样,改造之后的代码如下:

java
/**
 * @author BNTang
 */
public class Consumer2 {

    @Test
    public void receiveMessage() throws Exception {

        Connection connection = RabbitMQUtil.getConnection();

        Channel channel = connection.createChannel();

        // 一次只处理一条消息
        channel.basicQos(1);
        channel.queueDeclare("hello", false, false, false, null);

        // 把签收模式变成 false
        channel.basicConsume("hello", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                // 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
                System.out.println("消费者【2】收到消息 → " + new String(body));
            }
        });

        System.out.println("消费者【2】启动成功");

        // 不能让程序结束
        System.in.read();

        // 释放资源
        RabbitMQUtil.closeChannelAndConnection(channel, connection);
    }
}

运行起来进行测试,结果如下所示:

这篇关于RabbitMQ-任务模式的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!