消息队列MQ

RabbitMQ消息队列

本文主要是介绍RabbitMQ消息队列,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

一、引言

消息队列作用:解耦异步削峰

为什么使用消息队列?消息对列有什么好处? - 爱笑的Terry - 博客园https://www.cnblogs.com/terry-love/p/11492397.html

二、RabbitMQ介绍


市面上比较火爆的几款MQ:

ActiveMQ,RocketMQ,Kafka,RabbitMQ。

  • 语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。

  • 效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。

  • 消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。

  • 学习成本:RabbitMQ非常简单。

RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal。

RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。

三、RabbitMQ安装(例子为阿里云服务器部署)


首先查看一下docker中有没有在运行的容器,是否占用了端口号

接着创建文件夹,文件夹中创建 docker-compose.yaml文件

在ymal文件中配置如下
 

version: "3.1"
services:
  rabbitmq:
   image: daocloud.io/library/rabbitmq:management
   restart: always
   container_name: rabbitmq
   ports:
     - 5672:5672
     - 15672:15672
   volumes:
     - ./data:/var/lib/rabbitmq

启动(第一次启动建议前台启动)

docker-compose up

登陆 (注意Google浏览器有兼容问题,使用IE)

注意!我的阿里云的服务器ip为8.130.166.101,大家注意根据自己的ip地址进行更改

后面的15672为端口号

http://8.130.166.101:15672/http://8.130.166.101:15672/

用户名guest 密码guest

输入之后就可以进来了

 

四、RabbitMQ架构【重点


4.1 官方的简单架构图

  • Publisher - 生产者:发布消息到RabbitMQ中的Exchange

  • Consumer - 消费者:监听RabbitMQ中的Queue中的消息

  • Exchange - 交换机:和生产者建立连接并接收生产者的消息

  • Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互

  • Routes - 路由:交换机以什么样的策略将消息发布到Queue

简单架构图

4.2 RabbitMQ的完整架构图

完整架构图

完整架构图

五、RabbitMQ的使用【重点


5.1 RabbitMQ的通讯方式

通讯方式

 

 

 

 

 

5.2 Java连接RabbitMQ

5.2.1 创建maven项目

5.2.2 导入依赖

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.6.0</version>
    </dependency>
​
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>

5.2.3 创建工具类连接RabbitMQ

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMqUtils {

    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.130.166.101");
        connectionFactory.setPort(5672);
        //设置登录用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //存储在哪个位置
        connectionFactory.setVirtualHost("/");
        return connectionFactory.newConnection();
    }

}

 

5.3 Hello-World

一个生产者,一个默认的交换机,一个队列,一个消费者

结构图

 

创建生产者,创建一个channel,发布消息到exchange,指定路由规则。

@Test
    public void Publisher() throws IOException, TimeoutException {
        Channel channel = connection.createChannel();
        // 参数1:指定exchange,使用""。
        // 参数2:指定路由的规则,使用具体的队列名称。
        // 参数3:指定传递的消息所携带的properties,使用null。
        // 参数4:指定发布的具体消息,byte[]类型

        // 向队列 发送消息  " hello-queue"
        channel.basicPublish("","hello-queue",null,"hello-queue".getBytes());
        channel.close();
    }

    @After
    public void destroy() throws IOException {
        connection.close();
    }

创建消费者,创建一个channel,创建一个队列,并且去消费当前队列

/*
    * 消费者*/
    @Test
    public void consumerTest() throws IOException {
        //创建管道
        Channel channel = connection.createChannel();
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        //channel和队列绑定
        channel.queueDeclare("hello-queue",true,true,false,null);

        //抱着每次消费者 消费一条数据
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //从队列中获取信息
                System.out.println("接受消息:"+new String(body,"utf-8"));
            }
        };

        // channel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
        // 参数3  消费者

        channel.basicConsume("hello-queue",true,consumer);

        //让程序卡住
        System.in.read();

    }

运行两次生产者

 可以在消费者这里获取到消息

手动ack机制

手动ack 机制:保证消息对应的业务 已经真正的处理了,而不是仅仅接收到该消息

 

 /*
     * 消费者*/
    @Test
    public void consumerTest() throws IOException {
        //创建管道
        Channel channel = connection.createChannel();
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        //channel和队列绑定
        channel.queueDeclare("hello-queue",true,true,false,null);

        //抱着每次消费者 消费一条数据
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //从队列中获取信息
                System.out.println("接受消息:"+new String(body,"utf-8"));
                // 所有的业务都完成之后 可以手动的ack
                // envelope.getDeliveryTag() // 消息标记 0  1  2
                // false ack 之后 不删除
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // channel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
        // 参数3  消费者

        channel.basicConsume("hello-queue",false,consumer);

        //让程序卡住
        System.in.read();

    }

 

5.4 Work

一个生产者,一个默认的交换机,一个队列,两个消费者

结构图

 

只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange,并指定routing

消费者指定Qoa和手动ack

两个消费者消费同一个消息队列

  
public class WorkerQueueTest {
    private Connection connection;


    @Before
    public void init() throws IOException, TimeoutException {
        connection = RabbitMqUtils.getConnection();
    }

    /*
     * 消费者*/
    @Test
    public void consumer1Test() throws IOException {
        //创建管道
        Channel channel = connection.createChannel();
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        //channel和队列绑定
        channel.queueDeclare("work-queue",true,false,false,null);

        //抱着每次消费者 消费一条数据
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //从队列中获取信息
                System.out.println("接受消息:"+new String(body,"utf-8"));
            }
        };

        // channel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
        // 参数3  消费者

        channel.basicConsume("work-queue",true,consumer);

        //让程序卡住
        System.in.read();

    }

    @Test
    public void consumer2Test() throws IOException {
        //创建管道
        Channel channel = connection.createChannel();
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        //channel和队列绑定
        channel.queueDeclare("work-queue",true,false,false,null);

        //抱着每次消费者 消费一条数据
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //从队列中获取信息
                System.out.println("接受消息:"+new String(body,"utf-8"));
            }
        };

        // channel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
        // 参数3  消费者

        channel.basicConsume("work-queue",true,consumer);

        //让程序卡住
        System.in.read();

    }


    @Test
    public void Publisher() throws IOException, TimeoutException {
        Channel channel = connection.createChannel();
        //参数1: 交换机名称 没有就是默认 ""
        // 参数2 :队列名称  或者是 消息类型信息 路由规则进行匹配
        // 参数3: 指定传递的消息所携带的properties,使用null。
        // 参数4:消息

        // 向队列 发送十条消息  " hello-queue"
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("","work-queue",null,("work-queue"+i).getBytes());
        }
        channel.close();
    }

    @After
    public void destroy() throws IOException {
        connection.close();
    }
}

首先运行两个消费者,然后运行一个生产者

可以看到结果如下


 

 

以下一定要先启动消费者在启动生产者

5.5 Publish/Subscribe

一个生产者,一个交换机,两个队列,两个消费者

结构图

 

声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。

让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。

public class PublishSubTest {
    private Connection connection;


    @Before // 在@Test 之前调用初始化数据
    public void init() throws IOException, TimeoutException {

        connection = RabbitMqUtils.getConnection();
    }


    /**
     * 测试时,一定要先启动 消费者,在启动生产者
     */
    @Test // 进行单元测试
    public void consumer1Test() throws IOException {

        // channel 管道 连接 消费者和队列
        final Channel channel = connection.createChannel();

        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        // chanel 和 队列绑定
        channel.queueDeclare("pubsub-queue1",true,false,false,null);

        // 抱着每次消费者 消费一条数据
        channel.basicQos(1);


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 从队列中获取消息  并处理器
                System.out.println("消费者1  接受到消息:"+new String(body,"utf-8") );



                // 所有的业务都完成之后 可以手动的ack
                // envelope.getDeliveryTag() // 消息标记 0  1  2
                // false ack 之后 不删除
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // chanel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
//                  false 需要手动ack
        // 参数3  消费者
        channel.basicConsume("pubsub-queue1", false,consumer);


        // 让程序一致卡在这里 消费者 可以一致消费消息
        System.in.read();// 等待客户端命令行 的输入

    }

    @Test // 进行单元测试
    public void consumer2Test() throws IOException {

        // channel 管道 连接 消费者和队列
        final Channel channel = connection.createChannel();

        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        // chanel 和 队列绑定
        channel.queueDeclare("pubsub-queue2",true,false,false,null);

        // 抱着每次消费者 消费一条数据
        channel.basicQos(1);


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 从队列中获取消息  并处理器
                System.out.println("消费者2  接受到消息:"+new String(body,"utf-8") );



                // 所有的业务都完成之后 可以手动的ack
                // envelope.getDeliveryTag() // 消息标记 0  1  2
                // false ack 之后 不删除
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // chanel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
//                  false 需要手动ack
        // 参数3  消费者
        channel.basicConsume("pubsub-queue2", false,consumer);


        // 让程序一致卡在这里 消费者 可以一致消费消息
        System.in.read();// 等待客户端命令行 的输入

    }

    /**
     * 生产者 模式
     */
    @Test
    public void publishTest() throws IOException, TimeoutException {

        Channel channel = connection.createChannel();


        //将 chanel 和 自定义的交换机 绑定 "pubsub-exchange"
        //参数1: exchange的名称
        //参数2: 指定exchange的类型  FANOUT - pubsub ,   DIRECT - Routing , TOPIC - Topics
        //FANOUT - pubsub 交换机 会将消息发送到 所有的队列中
        channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);

        //参数1 队列名
        //参数2 交换机名
        //参数3  路由规则
        channel.queueBind("pubsub-queue1", "pubsub-exchange", "");
        channel.queueBind("pubsub-queue2", "pubsub-exchange", "");

        //参数1: 交换机名称 没有就是默认 ""
        // 参数2 :队列名称  或者是 消息类型信息 真的会路由规则进行匹配
        // 参数3: 指定传递的消息所携带的properties,使用null。
        // 参数4:消息

        // 向队列 发送消息

        // 发送10条数据 每个消费者得到5条数据
        for (int i = 0; i < 10; i++) {

            // 消息没有发送到默认的交换机 ,而是发送到 自定义交换机pubsub-exchange
            // "pubsub-exchange" 交换机名称
            // ""  路由规则
            channel.basicPublish("pubsub-exchange", "",null,("pubsub--i:" +i).getBytes());

        }


        channel.close();

    }



    @After// 在@Test 之后进行 ,是数据销毁
    public void destroy() throws IOException {

        connection.close();
    }
}

消费者还是正常的监听某一个队列即可。

5.6 Routing

以下一定要先启动消费者在启动生产者

一个生产者,一个交换机,两个队列,两个消费者

结构图

 

生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体RoutingKey即可。


消费者没有变化

public class RoutingTest {

    private Connection connection;

    @Before
    public void getConnection() throws IOException, TimeoutException {
        connection = RabbitMqUtils.getConnection();
    }

    @Test
    public void consumer1Test() throws IOException {
        final Channel channel = connection.createChannel();

        channel.queueDeclare("routing-info-queue",true,false,false,null);

        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("routing-info-queue接收到消息"+new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // chanel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
//                  false 需要手动ack
        // 参数3  消费者
        channel.basicConsume("routing-info-queue",false,consumer);

        System.in.read();
    }

    @Test
    public void  consumer2Test() throws IOException {
        final Channel channel = connection.createChannel();
        channel.queueDeclare("routing-error-queue",true,false,false,null);
        channel.basicQos(1);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("routing-error-queue接受消息:"+new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("routing-error-queue",false,consumer);
        System.in.read();
    }

    @Test
    public void publisherTest() throws IOException, TimeoutException {
        Channel channel = connection.createChannel();
        //将 chanel 和 自定义的交换机 绑定 "Routing-exchange"
        //参数1: exchange的名称
        //参数2: 指定exchange的类型  FANOUT - pubsub ,   DIRECT - Routing , TOPIC - Topics
        // DIRECT - Routing 交换机 会将消息发送到 所有的队列中
        channel.exchangeDeclare("Routing-exchange",BuiltinExchangeType.DIRECT);
        //参数1 队列名
        //参数2 交换机名
        //参数3  路由规则
        // 所有消息为 info 的消息都会 由Routing-exchange发送到routing-info-queue 队列中
        channel.queueBind("routing-info-queue","Routing-exchange","info");
        channel.queueBind("routing-error-queue","Routing-exchange","error");
        //参数1: 交换机名称 没有就是默认 ""
        // 参数2 :队列名称  或者是 消息类型信息 真的会路由规则进行匹配
        // 参数3: 指定传递的消息所携带的properties,使用null。
        // 参数4:消息

        // 向队列 发送消息
        // 发送10条数据 每个消费者得到5条数据
        for (int i = 0; i < 10; i++) {
            if (i%2==1){//奇数
                channel.basicPublish("Routing-exchange","info",null,("Routing--i="+i).getBytes());
            }else {

                channel.basicPublish("Routing-exchange","error",null,("Routing--i="+i).getBytes());
            }
        }
        channel.close();
    }

    @After
    public void destroy() throws IOException {
        connection.close();
    }
}

运行结果

 

 

5.7 Topic

以下一定要先启动消费者在启动生产者

一个生产者,一个交换机,两个队列,两个消费者

结构图

 

生产者创建Topic的exchange并且绑定到队列中,这次绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx去编写, * -> 一个xxx,而# -> 代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底是什么。

举个栗子

//2. 创建exchange并指定绑定方式
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topic-queue-1","topic-exchange","*.red.*");
channel.queueBind("topic-queue-2","topic-exchange","fast.#");
channel.queueBind("topic-queue-2","topic-exchange","*.*.rabbit");
​
//3. 发布消息到exchange,同时指定路由的规则
channel.basicPublish("topic-exchange","fast.red.monkey",null,"红快猴子".getBytes());
channel.basicPublish("topic-exchange","slow.black.dog",null,"黑漫狗".getBytes());
channel.basicPublish("topic-exchange","fast.white.cat",null,"快白猫".getBytes());

消费者只是监听队列,没变化。

public class TopicTest {

    private Connection connection;


    @Before // 在@Test 之前调用初始化数据
    public void init() throws IOException, TimeoutException {

        connection = RabbitMqUtils.getConnection();
    }


    /**
     * 测试时,一定要先启动 消费者,在启动生产者
     */
    @Test // 进行单元测试
    public void consumer1Test() throws IOException {

        // channel 管道 连接 消费者和队列
        final Channel channel = connection.createChannel();

        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        // chanel 和 队列绑定
        channel.queueDeclare("topic-queue-1",true,false,false,null);

        // 抱着每次消费者 消费一条数据
        channel.basicQos(1);


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 从队列中获取消息  并处理器
                System.out.println("消费者1  接受到消息:"+new String(body,"utf-8") );



                // 所有的业务都完成之后 可以手动的ack
                // envelope.getDeliveryTag() // 消息标记 0  1  2
                // false ack 之后 不删除
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // chanel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
//                  false 需要手动ack
        // 参数3  消费者
        channel.basicConsume("topic-queue-1", false,consumer);


        // 让程序一致卡在这里 消费者 可以一致消费消息
        System.in.read();// 等待客户端命令行 的输入

    }

    @Test // 进行单元测试
    public void consumer2Test() throws IOException {

        // channel 管道 连接 消费者和队列
        final Channel channel = connection.createChannel();

        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        // chanel 和 队列绑定
        channel.queueDeclare("topic-queue-2",true,false,false,null);

        // 抱着每次消费者 消费一条数据
        channel.basicQos(1);


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 从队列中获取消息  并处理器
                System.out.println("消费者2  接受到消息:"+new String(body,"utf-8") );



                // 所有的业务都完成之后 可以手动的ack
                // envelope.getDeliveryTag() // 消息标记 0  1  2
                // false ack 之后 不删除
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // chanel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
//                  false 需要手动ack
        // 参数3  消费者
        channel.basicConsume("topic-queue-2", false,consumer);


        // 让程序一致卡在这里 消费者 可以一致消费消息
        System.in.read();// 等待客户端命令行 的输入

    }

    /**
     * 生产者 模式
     */
    @Test
    public void publishTest() throws IOException, TimeoutException {

        Channel channel = connection.createChannel();


        //将 chanel 和 自定义的交换机 绑定 "pubsub-exchange"
        //参数1: exchange的名称
        //参数2: 指定exchange的类型  FANOUT - pubsub ,   DIRECT - Routing , TOPIC - Topics
        // DIRECT - Routing 交换机 会将消息发送到 所有的队列中
        channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);

        //参数1 队列名
        //参数2 交换机名
        //参数3  路由规则
        // 所有消息为 info 的消息都会 由Routing-exchange发送到topic-queue-1 队列中
        channel.queueBind("topic-queue-1", "topic-exchange", "*.orange.*");
        channel.queueBind("topic-queue-2", "topic-exchange", "big.*.*");

        //参数1: 交换机名称 没有就是默认 ""
        // 参数2 :队列名称  或者是 消息类型信息 真的会路由规则进行匹配
        // 参数3: 指定传递的消息所携带的properties,使用null。
        // 参数4:消息

        // 向队列 发送消息
        // 发送10条数据 每个消费者得到5条数据
        for (int i = 0; i < 10; i++) {

            // 消息没有发送到默认的交换机 ,而是发送到 自定义交换机pubsub-exchange
            //参数1: "pubsub-exchange" 交换机名称
            //参数2: ""  路由规则

            if (i%2==1){// 奇数 orange
                channel.basicPublish("topic-exchange", "xxxasdasd.orange.xfsdf",null,("topic--i:" +i).getBytes());

            }else{ // 偶数 error
                channel.basicPublish("topic-exchange", "big.xxxx.uii",null,("routing--i:" +i).getBytes());

            }

        }


        channel.close();

    }



    @After// 在@Test 之后进行 ,是数据销毁
    public void destroy() throws IOException {

        connection.close();
    }

}

运行结果:

 

 

六、RabbitMQ整合SpringBoot【重点


6.1 SpringBoot整合RabbitMQ

6.1.1 创建SpringBoot工程

6.1.2 导入依赖

  
  <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
​
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
​
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
​
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
​
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <!--<scope>test</scope>-->
        </dependency>
​
​
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    </dependencies>

6.1.3 application.properties中编写配置文件

spring.rabbitmq.host=8.130.166.101
spring.rabbitmq.port=5672

spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.rabbitmq.virtual-host=/

6.1.4 声明exchange、queue

@Configuration
public class RabbitConfig {
    @Bean
    public TopicExchange topicExchange(){
        TopicExchange topicExchange = new TopicExchange("springboot-topic-exchange", true, false);
        return topicExchange;
    }

    @Bean
    public Queue queue(){
        Queue queue = new Queue("springboot-queue", true, false, false, null);
        return queue;
    }

    @Bean
    public Binding binding(TopicExchange topicExchange,Queue queue){
        Binding binding = BindingBuilder.bind(queue).to(topicExchange).with("*.java.*");
        return binding;
    }
}

6.1.5 发布消息到RabbitMQ,创建测试类,生产者

@SpringBootTest
@RunWith(SpringRunner.class)
public class Mytest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void publisherTest(){
        rabbitTemplate.convertAndSend("springboot-topic-exchange","xxxx.java.12dssad","是这样的吗?");
        System.out.println("发送消息");
    }
}

6.1.6 创建消费者监听消息

@Component
public class Consumer {
    @RabbitListener(queues = "springboot-queue")
    public void consumer1(String msg, Channel channel, Message message){
        System.out.println("消费者得到:"+msg);
        System.out.println("msg = "+message);
    }
}

结果如下:

 

6.2 手动Ack

6.2.1 添加配置文件

spring.rabbitmq.listener.simple.acknowledge-mode=manual

6.2.2 手动ack

时,多次运行Test 则会收到多条未消费的消息

@Component
public class Consumer {
    @RabbitListener(queues = "springboot-queue")
    public void consumer1(String msg, Channel channel, Message message) throws IOException {
        System.out.println("消费者得到:"+msg);
        System.out.println("msg = "+message+" "+message.getMessageProperties().getDeliveryTag());
        int i = 1/0;
        // 手动ack
        // 将该消息的序号 ack message.getMessageProperties().getDeliveryTag()
        // 多条消息是否一起ack   false
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

这是运行了三次,在第三次的时候会直接出来多个未消费的对象

 

 

 

七、RabbitMQ的其他操作


7.1 消息的可靠性

RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。

RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。

 

7.2 SpringBoot实现

7.2.1 编写配置文件

#解决数据安全问题
spring.rabbitmq.publisher-confirm-type=simple
spring.rabbitmq.publisher-returns=true

7.2.2 开启Confirm和Return

@Component
public class ConfirmReturnCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 在容器中 加入该bean 会调用 该方法( init 方法 , @PostConstruct 标记方法)
    @PostConstruct//相当于bean生命周期的init
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        System.out.println("CorrelationData"+correlationData);
        System.out.println("s="+s);
        if (b){
            System.out.println("消息到达交换机");
        }
    }
    // return 机制  ,一般情况下 不会回调,只有在交换机的消息不能写入队列才会调用
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        System.out.println("message"+message);
    }
}

 

7.3 避免消息重复消费()

重复消费消息,会对非幂等行操作造成问题

重复消费消息的原因是,消费者没有给RabbitMQ一个ack

重复消费

为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,

id-0(正在执行业务)

id-1(执行业务成功)

如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。

极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。

生产者,发送消息时,指定messageId

消费者,在消费消息时,根据具体业务逻辑去操作redis

7.4 SpringBoot如何实现消息不重复

7.4.1 导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

7.4.2 编写配置文件

#配置redis
spring.redis.host=8.130.166.101
spring.redis.port=6379

7.4.3 修改生产者

@SpringBootTest
@RunWith(SpringRunner.class)
public class Mytest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void publisherTest(){
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("springboot-topic-exchange","xxxx.java.12dssad","是这样的吗?"+System.currentTimeMillis(),correlationData);
        System.out.println("发送消息");
    }
}

7.4.4 修改消费者

@Component
public class Consumer {
    @Autowired
    private RedisTemplate redisTemplate;
    @RabbitListener(queues = "springboot-queue")
    public void consumer1(String msg, Channel channel, Message message) throws IOException {

        System.out.println("消费者得到:"+msg);
        System.out.println("msg = "+message+" "+message.getMessageProperties().getDeliveryTag());
        // 得到处理消息的唯一id
        String id = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
        System.out.println("id="+id);
 // 如果能设置成功说明 该消息没有被处理过
        // 该 id 值为0 代表 正在处理       1代表处理完成
        if (redisTemplate.opsForValue().setIfAbsent(id,"0",10, TimeUnit.SECONDS)){
            System.out.println("消费者,处理该业务:"+msg);
            //处理完之后,变为1
            redisTemplate.opsForValue().set(id,"1",10,TimeUnit.SECONDS);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }else {
            // 如果不能设置key   说明 已经有消费者在处理
            if (redisTemplate.opsForValue().get(id).equals("1")){

                // 手动ack
                // 将该消息的序号 ack message.getMessageProperties().getDeliveryTag()
                // 多条消息是否一起ack   false
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }
        }
    }
}

如果运行第一遍出不来的话,就允许第二遍,如果第二遍还不出来就是写错了

 

这篇关于RabbitMQ消息队列的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!