为什么使用消息队列?消息对列有什么好处? - 爱笑的Terry - 博客园https://www.cnblogs.com/terry-love/p/11492397.html
市面上比较火爆的几款MQ:
ActiveMQ,RocketMQ,Kafka,RabbitMQ。
语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。
效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。
消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。
学习成本:RabbitMQ非常简单。
RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal。
RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。
version: "3.1" services: rabbitmq: image: daocloud.io/library/rabbitmq:management restart: always container_name: rabbitmq ports: - 5672:5672 - 15672:15672 volumes: - ./data:/var/lib/rabbitmq
docker-compose up
登陆 (注意Google浏览器有兼容问题,使用IE)
http://8.130.166.101:15672/http://8.130.166.101:15672/
用户名guest 密码guest
输入之后就可以进来了
重点
】
Publisher - 生产者:发布消息到RabbitMQ中的Exchange
Consumer - 消费者:监听RabbitMQ中的Queue中的消息
Exchange - 交换机:和生产者建立连接并接收生产者的消息
Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互
Routes - 路由:交换机以什么样的策略将消息发布到Queue
简单架构图 |
---|
完整架构图
完整架构图 |
---|
重点
】通讯方式 |
---|
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies>
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitMqUtils { public static Connection getConnection() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("8.130.166.101"); connectionFactory.setPort(5672); //设置登录用户名和密码 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //存储在哪个位置 connectionFactory.setVirtualHost("/"); return connectionFactory.newConnection(); } }
一个生产者,一个默认的交换机,一个队列,一个消费者
结构图 |
---|
创建生产者,创建一个channel,发布消息到exchange,指定路由规则。
@Test public void Publisher() throws IOException, TimeoutException { Channel channel = connection.createChannel(); // 参数1:指定exchange,使用""。 // 参数2:指定路由的规则,使用具体的队列名称。 // 参数3:指定传递的消息所携带的properties,使用null。 // 参数4:指定发布的具体消息,byte[]类型 // 向队列 发送消息 " hello-queue" channel.basicPublish("","hello-queue",null,"hello-queue".getBytes()); channel.close(); } @After public void destroy() throws IOException { connection.close(); }
创建消费者,创建一个channel,创建一个队列,并且去消费当前队列
/* * 消费者*/ @Test public void consumerTest() throws IOException { //创建管道 Channel channel = connection.createChannel(); //参数1:queue - 指定队列的名称 //参数2:durable - 当前队列是否需要持久化(true) //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费) //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除 //参数5:arguments - 指定当前队列的其他信息 //channel和队列绑定 channel.queueDeclare("hello-queue",true,true,false,null); //抱着每次消费者 消费一条数据 channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //从队列中获取信息 System.out.println("接受消息:"+new String(body,"utf-8")); } }; // channel 和 消费者绑定 // 参数1 队列名称 // 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送 // 参数3 消费者 channel.basicConsume("hello-queue",true,consumer); //让程序卡住 System.in.read(); }
运行两次生产者
可以在消费者这里获取到消息
手动ack 机制:保证消息对应的业务 已经真正的处理了,而不是仅仅接收到该消息
/* * 消费者*/ @Test public void consumerTest() throws IOException { //创建管道 Channel channel = connection.createChannel(); //参数1:queue - 指定队列的名称 //参数2:durable - 当前队列是否需要持久化(true) //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费) //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除 //参数5:arguments - 指定当前队列的其他信息 //channel和队列绑定 channel.queueDeclare("hello-queue",true,true,false,null); //抱着每次消费者 消费一条数据 channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //从队列中获取信息 System.out.println("接受消息:"+new String(body,"utf-8")); // 所有的业务都完成之后 可以手动的ack // envelope.getDeliveryTag() // 消息标记 0 1 2 // false ack 之后 不删除 channel.basicAck(envelope.getDeliveryTag(),false); } }; // channel 和 消费者绑定 // 参数1 队列名称 // 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送 // 参数3 消费者 channel.basicConsume("hello-queue",false,consumer); //让程序卡住 System.in.read(); }
一个生产者,一个默认的交换机,一个队列,两个消费者
结构图 |
---|
只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange,并指定routing
消费者指定Qoa和手动ack
两个消费者消费同一个消息队列
public class WorkerQueueTest { private Connection connection; @Before public void init() throws IOException, TimeoutException { connection = RabbitMqUtils.getConnection(); } /* * 消费者*/ @Test public void consumer1Test() throws IOException { //创建管道 Channel channel = connection.createChannel(); //参数1:queue - 指定队列的名称 //参数2:durable - 当前队列是否需要持久化(true) //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费) //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除 //参数5:arguments - 指定当前队列的其他信息 //channel和队列绑定 channel.queueDeclare("work-queue",true,false,false,null); //抱着每次消费者 消费一条数据 channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //从队列中获取信息 System.out.println("接受消息:"+new String(body,"utf-8")); } }; // channel 和 消费者绑定 // 参数1 队列名称 // 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送 // 参数3 消费者 channel.basicConsume("work-queue",true,consumer); //让程序卡住 System.in.read(); } @Test public void consumer2Test() throws IOException { //创建管道 Channel channel = connection.createChannel(); //参数1:queue - 指定队列的名称 //参数2:durable - 当前队列是否需要持久化(true) //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费) //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除 //参数5:arguments - 指定当前队列的其他信息 //channel和队列绑定 channel.queueDeclare("work-queue",true,false,false,null); //抱着每次消费者 消费一条数据 channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //从队列中获取信息 System.out.println("接受消息:"+new String(body,"utf-8")); } }; // channel 和 消费者绑定 // 参数1 队列名称 // 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送 // 参数3 消费者 channel.basicConsume("work-queue",true,consumer); //让程序卡住 System.in.read(); } @Test public void Publisher() throws IOException, TimeoutException { Channel channel = connection.createChannel(); //参数1: 交换机名称 没有就是默认 "" // 参数2 :队列名称 或者是 消息类型信息 路由规则进行匹配 // 参数3: 指定传递的消息所携带的properties,使用null。 // 参数4:消息 // 向队列 发送十条消息 " hello-queue" for (int i = 0; i < 10; i++) { channel.basicPublish("","work-queue",null,("work-queue"+i).getBytes()); } channel.close(); } @After public void destroy() throws IOException { connection.close(); } }
以下一定要先启动消费者在启动生产者
一个生产者,一个交换机,两个队列,两个消费者
结构图 |
---|
声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。
让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。
public class PublishSubTest { private Connection connection; @Before // 在@Test 之前调用初始化数据 public void init() throws IOException, TimeoutException { connection = RabbitMqUtils.getConnection(); } /** * 测试时,一定要先启动 消费者,在启动生产者 */ @Test // 进行单元测试 public void consumer1Test() throws IOException { // channel 管道 连接 消费者和队列 final Channel channel = connection.createChannel(); //参数1:queue - 指定队列的名称 //参数2:durable - 当前队列是否需要持久化(true) //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费) //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除 //参数5:arguments - 指定当前队列的其他信息 // chanel 和 队列绑定 channel.queueDeclare("pubsub-queue1",true,false,false,null); // 抱着每次消费者 消费一条数据 channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 从队列中获取消息 并处理器 System.out.println("消费者1 接受到消息:"+new String(body,"utf-8") ); // 所有的业务都完成之后 可以手动的ack // envelope.getDeliveryTag() // 消息标记 0 1 2 // false ack 之后 不删除 channel.basicAck(envelope.getDeliveryTag(),false); } }; // chanel 和 消费者绑定 // 参数1 队列名称 // 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送 // false 需要手动ack // 参数3 消费者 channel.basicConsume("pubsub-queue1", false,consumer); // 让程序一致卡在这里 消费者 可以一致消费消息 System.in.read();// 等待客户端命令行 的输入 } @Test // 进行单元测试 public void consumer2Test() throws IOException { // channel 管道 连接 消费者和队列 final Channel channel = connection.createChannel(); //参数1:queue - 指定队列的名称 //参数2:durable - 当前队列是否需要持久化(true) //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费) //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除 //参数5:arguments - 指定当前队列的其他信息 // chanel 和 队列绑定 channel.queueDeclare("pubsub-queue2",true,false,false,null); // 抱着每次消费者 消费一条数据 channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 从队列中获取消息 并处理器 System.out.println("消费者2 接受到消息:"+new String(body,"utf-8") ); // 所有的业务都完成之后 可以手动的ack // envelope.getDeliveryTag() // 消息标记 0 1 2 // false ack 之后 不删除 channel.basicAck(envelope.getDeliveryTag(),false); } }; // chanel 和 消费者绑定 // 参数1 队列名称 // 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送 // false 需要手动ack // 参数3 消费者 channel.basicConsume("pubsub-queue2", false,consumer); // 让程序一致卡在这里 消费者 可以一致消费消息 System.in.read();// 等待客户端命令行 的输入 } /** * 生产者 模式 */ @Test public void publishTest() throws IOException, TimeoutException { Channel channel = connection.createChannel(); //将 chanel 和 自定义的交换机 绑定 "pubsub-exchange" //参数1: exchange的名称 //参数2: 指定exchange的类型 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics //FANOUT - pubsub 交换机 会将消息发送到 所有的队列中 channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT); //参数1 队列名 //参数2 交换机名 //参数3 路由规则 channel.queueBind("pubsub-queue1", "pubsub-exchange", ""); channel.queueBind("pubsub-queue2", "pubsub-exchange", ""); //参数1: 交换机名称 没有就是默认 "" // 参数2 :队列名称 或者是 消息类型信息 真的会路由规则进行匹配 // 参数3: 指定传递的消息所携带的properties,使用null。 // 参数4:消息 // 向队列 发送消息 // 发送10条数据 每个消费者得到5条数据 for (int i = 0; i < 10; i++) { // 消息没有发送到默认的交换机 ,而是发送到 自定义交换机pubsub-exchange // "pubsub-exchange" 交换机名称 // "" 路由规则 channel.basicPublish("pubsub-exchange", "",null,("pubsub--i:" +i).getBytes()); } channel.close(); } @After// 在@Test 之后进行 ,是数据销毁 public void destroy() throws IOException { connection.close(); } }
消费者还是正常的监听某一个队列即可。
以下一定要先启动消费者在启动生产者
一个生产者,一个交换机,两个队列,两个消费者
结构图 |
---|
生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体RoutingKey即可。
消费者没有变化
public class RoutingTest { private Connection connection; @Before public void getConnection() throws IOException, TimeoutException { connection = RabbitMqUtils.getConnection(); } @Test public void consumer1Test() throws IOException { final Channel channel = connection.createChannel(); channel.queueDeclare("routing-info-queue",true,false,false,null); channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("routing-info-queue接收到消息"+new String(body,"utf-8")); channel.basicAck(envelope.getDeliveryTag(),false); } }; // chanel 和 消费者绑定 // 参数1 队列名称 // 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送 // false 需要手动ack // 参数3 消费者 channel.basicConsume("routing-info-queue",false,consumer); System.in.read(); } @Test public void consumer2Test() throws IOException { final Channel channel = connection.createChannel(); channel.queueDeclare("routing-error-queue",true,false,false,null); channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("routing-error-queue接受消息:"+new String(body,"utf-8")); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("routing-error-queue",false,consumer); System.in.read(); } @Test public void publisherTest() throws IOException, TimeoutException { Channel channel = connection.createChannel(); //将 chanel 和 自定义的交换机 绑定 "Routing-exchange" //参数1: exchange的名称 //参数2: 指定exchange的类型 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics // DIRECT - Routing 交换机 会将消息发送到 所有的队列中 channel.exchangeDeclare("Routing-exchange",BuiltinExchangeType.DIRECT); //参数1 队列名 //参数2 交换机名 //参数3 路由规则 // 所有消息为 info 的消息都会 由Routing-exchange发送到routing-info-queue 队列中 channel.queueBind("routing-info-queue","Routing-exchange","info"); channel.queueBind("routing-error-queue","Routing-exchange","error"); //参数1: 交换机名称 没有就是默认 "" // 参数2 :队列名称 或者是 消息类型信息 真的会路由规则进行匹配 // 参数3: 指定传递的消息所携带的properties,使用null。 // 参数4:消息 // 向队列 发送消息 // 发送10条数据 每个消费者得到5条数据 for (int i = 0; i < 10; i++) { if (i%2==1){//奇数 channel.basicPublish("Routing-exchange","info",null,("Routing--i="+i).getBytes()); }else { channel.basicPublish("Routing-exchange","error",null,("Routing--i="+i).getBytes()); } } channel.close(); } @After public void destroy() throws IOException { connection.close(); } }
运行结果
以下一定要先启动消费者在启动生产者
一个生产者,一个交换机,两个队列,两个消费者
结构图 |
---|
生产者创建Topic的exchange并且绑定到队列中,这次绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx去编写, * -> 一个xxx,而# -> 代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底是什么。
//2. 创建exchange并指定绑定方式 channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC); channel.queueBind("topic-queue-1","topic-exchange","*.red.*"); channel.queueBind("topic-queue-2","topic-exchange","fast.#"); channel.queueBind("topic-queue-2","topic-exchange","*.*.rabbit"); //3. 发布消息到exchange,同时指定路由的规则 channel.basicPublish("topic-exchange","fast.red.monkey",null,"红快猴子".getBytes()); channel.basicPublish("topic-exchange","slow.black.dog",null,"黑漫狗".getBytes()); channel.basicPublish("topic-exchange","fast.white.cat",null,"快白猫".getBytes());
消费者只是监听队列,没变化。
public class TopicTest { private Connection connection; @Before // 在@Test 之前调用初始化数据 public void init() throws IOException, TimeoutException { connection = RabbitMqUtils.getConnection(); } /** * 测试时,一定要先启动 消费者,在启动生产者 */ @Test // 进行单元测试 public void consumer1Test() throws IOException { // channel 管道 连接 消费者和队列 final Channel channel = connection.createChannel(); //参数1:queue - 指定队列的名称 //参数2:durable - 当前队列是否需要持久化(true) //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费) //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除 //参数5:arguments - 指定当前队列的其他信息 // chanel 和 队列绑定 channel.queueDeclare("topic-queue-1",true,false,false,null); // 抱着每次消费者 消费一条数据 channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 从队列中获取消息 并处理器 System.out.println("消费者1 接受到消息:"+new String(body,"utf-8") ); // 所有的业务都完成之后 可以手动的ack // envelope.getDeliveryTag() // 消息标记 0 1 2 // false ack 之后 不删除 channel.basicAck(envelope.getDeliveryTag(),false); } }; // chanel 和 消费者绑定 // 参数1 队列名称 // 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送 // false 需要手动ack // 参数3 消费者 channel.basicConsume("topic-queue-1", false,consumer); // 让程序一致卡在这里 消费者 可以一致消费消息 System.in.read();// 等待客户端命令行 的输入 } @Test // 进行单元测试 public void consumer2Test() throws IOException { // channel 管道 连接 消费者和队列 final Channel channel = connection.createChannel(); //参数1:queue - 指定队列的名称 //参数2:durable - 当前队列是否需要持久化(true) //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费) //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除 //参数5:arguments - 指定当前队列的其他信息 // chanel 和 队列绑定 channel.queueDeclare("topic-queue-2",true,false,false,null); // 抱着每次消费者 消费一条数据 channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 从队列中获取消息 并处理器 System.out.println("消费者2 接受到消息:"+new String(body,"utf-8") ); // 所有的业务都完成之后 可以手动的ack // envelope.getDeliveryTag() // 消息标记 0 1 2 // false ack 之后 不删除 channel.basicAck(envelope.getDeliveryTag(),false); } }; // chanel 和 消费者绑定 // 参数1 队列名称 // 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送 // false 需要手动ack // 参数3 消费者 channel.basicConsume("topic-queue-2", false,consumer); // 让程序一致卡在这里 消费者 可以一致消费消息 System.in.read();// 等待客户端命令行 的输入 } /** * 生产者 模式 */ @Test public void publishTest() throws IOException, TimeoutException { Channel channel = connection.createChannel(); //将 chanel 和 自定义的交换机 绑定 "pubsub-exchange" //参数1: exchange的名称 //参数2: 指定exchange的类型 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics // DIRECT - Routing 交换机 会将消息发送到 所有的队列中 channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC); //参数1 队列名 //参数2 交换机名 //参数3 路由规则 // 所有消息为 info 的消息都会 由Routing-exchange发送到topic-queue-1 队列中 channel.queueBind("topic-queue-1", "topic-exchange", "*.orange.*"); channel.queueBind("topic-queue-2", "topic-exchange", "big.*.*"); //参数1: 交换机名称 没有就是默认 "" // 参数2 :队列名称 或者是 消息类型信息 真的会路由规则进行匹配 // 参数3: 指定传递的消息所携带的properties,使用null。 // 参数4:消息 // 向队列 发送消息 // 发送10条数据 每个消费者得到5条数据 for (int i = 0; i < 10; i++) { // 消息没有发送到默认的交换机 ,而是发送到 自定义交换机pubsub-exchange //参数1: "pubsub-exchange" 交换机名称 //参数2: "" 路由规则 if (i%2==1){// 奇数 orange channel.basicPublish("topic-exchange", "xxxasdasd.orange.xfsdf",null,("topic--i:" +i).getBytes()); }else{ // 偶数 error channel.basicPublish("topic-exchange", "big.xxxx.uii",null,("routing--i:" +i).getBytes()); } } channel.close(); } @After// 在@Test 之后进行 ,是数据销毁 public void destroy() throws IOException { connection.close(); } }
运行结果:
重点
】<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <!--<scope>test</scope>--> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> </dependencies>
spring.rabbitmq.host=8.130.166.101 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
@Configuration public class RabbitConfig { @Bean public TopicExchange topicExchange(){ TopicExchange topicExchange = new TopicExchange("springboot-topic-exchange", true, false); return topicExchange; } @Bean public Queue queue(){ Queue queue = new Queue("springboot-queue", true, false, false, null); return queue; } @Bean public Binding binding(TopicExchange topicExchange,Queue queue){ Binding binding = BindingBuilder.bind(queue).to(topicExchange).with("*.java.*"); return binding; } }
@SpringBootTest @RunWith(SpringRunner.class) public class Mytest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void publisherTest(){ rabbitTemplate.convertAndSend("springboot-topic-exchange","xxxx.java.12dssad","是这样的吗?"); System.out.println("发送消息"); } }
@Component public class Consumer { @RabbitListener(queues = "springboot-queue") public void consumer1(String msg, Channel channel, Message message){ System.out.println("消费者得到:"+msg); System.out.println("msg = "+message); } }
结果如下:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
此时,多次运行Test 则会收到多条未消费的消息
@Component public class Consumer { @RabbitListener(queues = "springboot-queue") public void consumer1(String msg, Channel channel, Message message) throws IOException { System.out.println("消费者得到:"+msg); System.out.println("msg = "+message+" "+message.getMessageProperties().getDeliveryTag()); int i = 1/0; // 手动ack // 将该消息的序号 ack message.getMessageProperties().getDeliveryTag() // 多条消息是否一起ack false channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }
这是运行了三次,在第三次的时候会直接出来多个未消费的对象
RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。
RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。
#解决数据安全问题 spring.rabbitmq.publisher-confirm-type=simple spring.rabbitmq.publisher-returns=true
@Component public class ConfirmReturnCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; // 在容器中 加入该bean 会调用 该方法( init 方法 , @PostConstruct 标记方法) @PostConstruct//相当于bean生命周期的init public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("CorrelationData"+correlationData); System.out.println("s="+s); if (b){ System.out.println("消息到达交换机"); } } // return 机制 ,一般情况下 不会回调,只有在交换机的消息不能写入队列才会调用 @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("message"+message); } }
重复消费消息,会对非幂等行操作造成问题
重复消费消息的原因是,消费者没有给RabbitMQ一个ack
重复消费 |
---|
为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,
id-0(正在执行业务)
id-1(执行业务成功)
如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。
生产者,发送消息时,指定messageId
消费者,在消费消息时,根据具体业务逻辑去操作redis
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
#配置redis spring.redis.host=8.130.166.101 spring.redis.port=6379
@SpringBootTest @RunWith(SpringRunner.class) public class Mytest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void publisherTest(){ CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("springboot-topic-exchange","xxxx.java.12dssad","是这样的吗?"+System.currentTimeMillis(),correlationData); System.out.println("发送消息"); } }
@Component public class Consumer { @Autowired private RedisTemplate redisTemplate; @RabbitListener(queues = "springboot-queue") public void consumer1(String msg, Channel channel, Message message) throws IOException { System.out.println("消费者得到:"+msg); System.out.println("msg = "+message+" "+message.getMessageProperties().getDeliveryTag()); // 得到处理消息的唯一id String id = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"); System.out.println("id="+id); // 如果能设置成功说明 该消息没有被处理过 // 该 id 值为0 代表 正在处理 1代表处理完成 if (redisTemplate.opsForValue().setIfAbsent(id,"0",10, TimeUnit.SECONDS)){ System.out.println("消费者,处理该业务:"+msg); //处理完之后,变为1 redisTemplate.opsForValue().set(id,"1",10,TimeUnit.SECONDS); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }else { // 如果不能设置key 说明 已经有消费者在处理 if (redisTemplate.opsForValue().get(id).equals("1")){ // 手动ack // 将该消息的序号 ack message.getMessageProperties().getDeliveryTag() // 多条消息是否一起ack false channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } } } }