Work Queues,也被称为(Task Queues)任务模型
。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work
模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
以上的角色分别为如下所解释的:
代码如下所示:
java/** * @author: BNZeng **/ public class Producer { @Test public void sendMessage() throws Exception { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); for (int i = 1; i <= 100; i++) { channel.basicPublish("", "hello", null, ("hello rabbitmq Work Queue → " + i).getBytes()); } // 关闭通道和连接 RabbitMQUtil.closeChannelAndConnection(channel, connection); System.out.println("消息发送成功"); } }
代码如下所示:
java/** * @author BNZeng */ public class Consumer1 { @Test public void receiveMessage() throws Exception { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); channel.basicConsume("hello", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者【1】收到消息 → " + new String(body)); } }); System.out.println("消费者【1】启动成功"); // 不能让程序结束 System.in.read(); // 释放资源 RabbitMQUtil.closeChannelAndConnection(channel, connection); } }
代码如下所示:
java/** * @author BNZeng */ public class Consumer2 { @Test public void receiveMessage() throws Exception { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); // 把签收模式变成 false channel.basicConsume("hello", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者【2】收到消息 → " + new String(body)); } }); System.out.println("消费者【2】启动成功"); // 不能让程序结束 System.in.read(); // 释放资源 RabbitMQUtil.closeChannelAndConnection(channel, connection); } }
先启动消费者1和消费者2,再启动消息生产者发送消息,发现结果如下:
他们是平均消费的,官网有说明:https://www.rabbitmq.com/tutorials/tutorial-two-java.html
那么实际开发中可能有消费者处理的慢,有的处理的快,那么如何配置呢,引入自动确认机制,
完成一项任务可能需要几秒钟。您可能想知道,如果一个消费者开始了一项很长的任务,但只完成了一部分就去世了,会发生什么。在我们当前的代码中,一旦 RabbitMQ 向使用者传递了一条消息,它就会立即将其标记为删除。在这种情况下,如果你杀死一个工人,我们就会丢失它正在处理的信息。我们还将丢失所有发送给这个特定 worker 但尚未处理的消息。
但我们不想失去任何任务。如果一个工人死亡,我们希望任务被交付给另一个工人。
接下来改造消费者2用来模拟一下某一个消费者消费慢的情况下会怎么样,改造之后的代码如下:
java/** * @author BNTang */ public class Consumer2 { @Test public void receiveMessage() throws Exception { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); // 一次只处理一条消息 channel.basicQos(1); channel.queueDeclare("hello", false, false, false, null); // 把签收模式变成 false channel.basicConsume("hello", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } // 手动签收 channel.basicAck(envelope.getDeliveryTag(), false); System.out.println("消费者【2】收到消息 → " + new String(body)); } }); System.out.println("消费者【2】启动成功"); // 不能让程序结束 System.in.read(); // 释放资源 RabbitMQUtil.closeChannelAndConnection(channel, connection); } }
运行起来进行测试,结果如下所示: