工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
启动两个线程,一个消息发送线程,来看看这两个工作线程是如何工作的。
package com.uin; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author wanglufei * @description: 工具类 * @date 2022/1/24/12:29 AM */ public class RabbitMQUtils { public static Channel getChannel() throws IOException, TimeoutException { //引入连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
package com.uin.work_queues; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.uin.utils.RabbitMQUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author wanglufei * @description: TODO * @date 2022/1/24/12:40 AM */ public class Consumer_work01 { public static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); //接受消息的回调 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("接受到的消息:" + new String(message.getBody())); }; //取消消息的回调 CancelCallback cancelCallback = (consumerTag) -> { System.out.println(consumerTag + "消息被取消消费者接口的回调逻辑!"); }; /** * 消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答 * 3.未成功消费的一个回调 * 4.消费者取消消费的回调 */ System.out.println("第一个工作线程!等待接受消息。。。。"); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
package com.uin.work_queues; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.uin.utils.RabbitMQUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author wanglufei * @description: TODO * @date 2022/1/24/1:03 AM */ public class Consumer_work02 { public static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); //接受消息的回调 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("接受到的消息:" + new String(message.getBody())); }; //取消消息的回调 CancelCallback cancelCallback = (consumerTag) -> { System.out.println(consumerTag + "消息被取消消费者接口的回调逻辑!"); }; /** * 消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答 * 3.未成功消费的一个回调 * 4.消费者取消消费的回调 */ System.out.println("第二个工作线程!等待接受消息。。。。"); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
package com.uin.work_queues; import com.rabbitmq.client.Channel; import com.uin.utils.RabbitMQUtils; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; /** * @author wanglufei * @description: TODO * @date 2022/1/24/1:09 AM */ public class Producer_task01 { public static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //发送消息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes());//以二进制传输 System.out.println("发送消息完成:" + message); } } }