消息队列MQ

rabbitmq

本文主要是介绍rabbitmq,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

RabbitMQ

文章目录

  • RabbitMQ
    • 概述
    • 交换机四种类型
    • 六大模式
      • 简单模式
      • 工作模式
        • 轮询分发
        • 不公平分发
        • 预取值
      • 发布订阅模式
      • 路由模式
      • 主题模式
      • 发布确认模式
    • 可靠消费
      • 消息应答
      • 消息自动重新入队
      • 代码实现
    • 持久化
    • 发布确认
      • 单个确认发布
      • 批量确认发布
      • 异步确认发布
    • 死信
      • 死信的来源
      • 死信队列处理机制
    • 延迟队列
      • 插件优化
    • SpringBoot中确认发布
      • mandatory 参数获取无法投递消息的感知能力并及时处理(回退消息)
      • 设置备份交换机
    • RabbitMQ其他知识点
      • 幂等、重复消费问题
      • 优先级队列
      • 惰性队列
    • 集群
      • 镜像队列
      • 负载均衡

概述

Message Queue, 消息队列, 先入先出,通信机制。

  • 流量消峰
  • 应用解耦
  • 异步处理

四大MQ对比

  • ActiveMQ
  • RabbitMQ:
  • RocketMQ:高并发
  • Kafka:大数据

四大组件:

  • 生产者:生产数据发送消息
  • 消费者:等待接收消息,并消费处理消息
  • 交换机:是RabbitMQ中非常重要的一个部件,一方面接收来自生产者的消息,另一方面将消息推送到队列中。交换机必须确切直到如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或将消息丢弃,这个是由交换机类型决定的。与队列时1对多关系
  • 队列:RabbitMQ中的一种数据结构,存储消息数据

名词解释:

  • Broker:接收和分发消息的应用, RabbitMQ Server就是Message Broker
  • Connection:publisher/consummer和broker直接的TCP连接
  • Channel:是Connection内部建立的逻辑连接,Channel作为轻量级Connection极大减少了操作系统建立TCP连接的开销
  • Exchange:消息到达broker的第一站,根据规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型由:direct(point-to-point),topic(publisher-subscribe),fanout(multicast)
  • binding:交换机与队列间的绑定

在这里插入图片描述

交换机四种类型

生产者将消息推给交换机,交换机再将消息发送到队列。如果不指定交换机,默认使用默认交换机。

  • direct:处理路由键,需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
  • fanout:Fanout 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到该类型交换机的消息都会被广播到与该交换机绑定的所有队列上。
  • topic:将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”只能匹配一个词。
  • header:不处理路由键,而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。

六大模式

  • 简单模式
  • 工作模式
    • 轮询分发
    • 按需分配,非公平的
  • 发布订阅模式
  • 路由模式
  • 主题模式
  • 发布确认模式

简单模式

  • 简单模式如果不指定交换机,则使用默认交换机。

  • 一般要求指定交换机

  • 简单模式的交换机类型是direct

在这里插入图片描述

  • 生产者
public class Producer {
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.80.130");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 创建连接
            connection = connectionFactory.newConnection("生产者");
            // 通过连接获取channel
            channel = connection.createChannel();
            // 通过创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息
            String queueName = "queue1";

            /*
            * 队列名字
            * 是否要持久化
            * 排他性,是否为独占
            * 是否自动删除,最后一个消息被消费后队列是否自动删除
            * 携带一些附加参数
            * */
            channel.queueDeclare(queueName, false, false, false, null);
            // 准备消息内容
            String message = "hello rabbit!";
            // 发送消息给队列
            channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 关闭channel
            if (channel != null && channel.isOpen()) {

                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }
            // 关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
  • 消费者
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.80.130");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        System.out.println("等待接收消息...");
        // 推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println(message);
        };

        // 取消消费的一个回调接口, 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) ->{
            System.out.println("消息消费被中断了");
        };
        /*
         * 消费者消费消息
         * 1. 消费哪个队列
         * 2. 消费成功后是否自动挡应答,true自动,false手动
         * 3. 消费者成功消费的回调
         * 4. 消费者未成功消费的回调
         */
        channel.basicConsume("simple", true, deliverCallback, cancelCallback);
    }
}

工作模式

多个消费者轮流从消息队列中取出消息进行消费

  • 工作模式如果不指定交换机,则使用默认交换机
  • 一般要求指定交换机
  • 工作模式的交换机类型是direct

在这里插入图片描述

轮询分发

  • 抽取工具类
public class RabbitMQUtils {
    public static Channel getChannel () throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.80.130");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}
  • 启动两个消费者
public class Consumer {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("接收到消息:" + message);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费");
        };
        System.out.println("消费者启动等待消费...");
        // 工作模式,轮询消费
        channel.basicConsume("work", true, deliverCallback, cancelCallback);
    }
}
  • 生产者
public class Producer {
    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMQUtils.getChannel()) {
            channel.queueDeclare("work", false, false, false, null);
            // 从控制台当中接收信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish("", "work", null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println("发送消息完成:" + message);
            }
        }
    }
}

不公平分发

int prefetchCount = 1;
channel.basicQos(prefetchCount);

预取值

发布订阅模式

交换机的类型为Fanout,即不处理路由键,只需要简单的将队列绑定到交换机上。

在这里插入图片描述

路由模式

交换机类型是direct,而且需要添加路由将队列绑定到交换机上

在这里插入图片描述

主题模式

支持模糊匹配的路由。交换机类型是topic,需要添加模糊路由进行队列和交换机的绑定

在这里插入图片描述

主题模式的路由不能随意写,必须是一个单词列表,以点分隔开

模糊匹配规则

  • # 表示没有、一个或多个
  • * 表示一个

发布确认模式

见后面

可靠消费

为了保证消息都被消费者消费了,没有丢失,引入了消息应答机制

消息应答

消费应答机制:消费者在接收到消息并且处理该消息后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了

  • 自动应答
  • 手动应答✔️
    • Channel.basicAck 肯定确认,已成功处理该消息,可以将其丢弃
    • Channel.basicNack 否定确认
    • Channel.basicReject 否定确认,不处理该消息了,可以将其丢弃

消息自动重新入队

如果消费者没有发送ACK确认,RabbitMQ将知道该消息未完全处理,并将其重新排队。如果此时其他消费者可以处理,该消息可以重新分发给另一个消费者。这样可以确保消息不会丢失。

在这里插入图片描述

代码实现

  • 生产者
public void produce() throws Exception {
    try (Channel channel = RabbitMQUtils.getChannel()) {
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        Scanner sc = new Scanner(System.in);
        System.out.println("请输入信息:");
        while (sc.hasNext()){
            String message = sc.nextLine();
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息" + message);
        }
    }
}
  • 消费者
public void consume() throws Exeception{
    Channel channel = RabbitUtils.getChannel();
    channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
    System.out.println("c1等待接收消息...");
    DeleverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        SleepUtils.sleep(1);
        System.out.println("接收到消息" + message);
        // 消息标记tag
        // false 代表直营到接收到的哪个传递的消息,true应答所有消息
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
    };
    // 手动应答
    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {}); // 最后一个参数未失败了怎么处理
}
  • 工具类
public class SleepUtils {    public static void sleep(int second){        try {            Thread.sleep(second * 1000);        } catch (InterruptedException e){            Thread.currentThread().interrupt();        }    }}

持久化

为确保消息不会丢失,需要将队列和消息都持久化

发布确认

当消息被投递到匹配的队列后,broker就会发送一个确认给生产者,这就使得生产者直到消息已经正确到达目的地了。

单个确认发布

这是一种同步确认发布地方式,发布一个消息后只有它被确认发布,后续消息才能继续发布

public void publishMessageIndividually() throws Exception {    try (Channel channel = RabbitMQUtils.getChannel()){        String queueName = "confim_individually";        channel.queueDeclare(queueName, false, false, false, null);        channel.confirmSelect(); // 开启确认发布        long begin = System.currentTimeMills();        for (int i = 0; i < MESSAGE_COUNT; i++){            String message = "第" + i + "条消息";            channel.basicPublish("", queueName, null, message.getBytes());            // 服务端返回false或超时时间内未返回,生产者可以重发消息            boolean flag = channel.waitForConfirm();            if (flag){                System.out.println("消息发送成功");            }        }        long end = System.currentTimeMillis();        System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms")    }}

批量确认发布

先发布一批消息,然后一起确认可以极大提高吞吐量,缺点是,如果发生故障,不知道哪个消息出现了问题,也是同步的

public void publishMessageBatch() throws Exception {    try (Channel channel = RabbitMQUtils.getChannel()){        String queueName = "batch_confim";        channel.queueDeclare(queueName, false, false, false, null);        channel.confirmSelect(); // 开启确认发布        int batchSize = 100; // 批量确认消息大小        int messageCount = 0; // 未确认消息个数        long begin = System.currentTimeMills();        for (int i = 0; i < MESSAGE_COUNT; i++){            String message = "第" + i + "条消息";            channel.basicPublish("", queueName, null, message.getBytes());            // 服务端返回false或超时时间内未返回,生产者可以重发消息            messageCount++;            if (messageCount == batchSize){                channel.waitForConfirms();                messageCount = 0;            }        }        // 为了确保还有剩余没有确认消息,再次确认        if (messageCount > 0){            chennel.waitForConfirms();        }        long end = System.currentTimeMillis();        System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms")    }}

异步确认发布

利用回调函数来达到消息可靠性传递

在这里插入图片描述

public void publishMessageAsync() throws Exception {    try (Channel channel = RabbitMQUtils.getChannel()){        String name = "async_confirm";        channel.queueDeclare(queueName, false, false, false, null);        // 开启确认发布        channel.confirmSelect();        // 线程安全有序的哈希表,适用于高并发的情况        ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();                // 确认收到消息的一个回调        // sequence 消息序列号        // true/false 小于或等于当前序列号的消息/等于当前序列号的消息        ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {            if (multiple) {                // 返回的是小于等于当前序列号的未确认消息,是一个map            ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);            // 清除该部分未确认消息            confirmed.clear();            } else {                // 只清除当前序列号的消息                outstandingConfirms.remove(sequenceNumber);            }                    };                ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {            String message = oustandingConfrims.get(sequenceNumber);            System.out.println("发布的消息" + message + "未被确认,序列号:" + sequenceNumber);        };                // 添加一个异步确认的监听器        channel.addConfirmListener(ackCallback, nackCallback);                long begin = System.currentTimeMillis();        for (int i = 0; i < MESSAGE_COUNT; i++){            String message = "消息" + i;            // 关联序列号和消息体            // 全都是还未确认的消息体            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);            channel.basicPublish("", queueName, null, message.getBytes());        }        long end = System.currentTimeMillis();        System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin));    }}

死信

一般来说,producer将消息投递到broker或者queue里,consumer从queue中取出消息进行消费,但是某些是偶由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续处理,就成为死信。

为了保证消息不丢失,需要用到死信队列来处理死信。

死信的来源

  • 消息TTL过期
  • 队列达到最大长度
  • 消息被拒绝

死信队列处理机制

在这里插入图片描述

演示消息TTL过期,处理死信队列

  • 生产者
public void produce() throws Exception {    try (Channel channe = RabbitUtils.getChannel()){        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);        // 设置TTL时间        AMQP.BasicProperties().builder().expiration("10000").build();        for (int i = 0; i < 11 ; i++){            String message = "info" +i;            channel.basicPublish(NORMAL_EXCHANGE, "normal", properties, message.getBytes());            System.out.println("生产者发送消息" + message);        }    }}
  • 消费者,开启后关闭,模拟其接收不到消息
public void consumer() throws Exception{    Channel channel = RabbitUtils.getChannel();    // 声明死信和普通交换机类型为direct    channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);    channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);        // 声明死信队列    String deadQueue = "dead-queue";    channel.queueDeclare(deadQueue, false, false, false, null);    // 死信队列绑定死信交换机    channel.queueBind(deadQueue, DEAD_EXCHANGE, "dead");        // 正常队列绑定死信队列信息    Map<String, Object> params = new HashMap<>();    // 正常队列设置死信交换机    params.put("x-dead-letter-exchange", DEAD_EXCHANGE);    // 正常队列设置死信routing-key    params.put("x-dead-letter-routing-key", "dead");        String normalQueue = "normal-queue";    channel.queueDeclare(normalQueue, false, false, false, params);    channel.queueBind(normalQueue, NORMAL_EXCHANGE, "normal");    System.out.println("等待接收消息...");    DeliverCallback deliverCallback = (consumerTag, delivery) -> {        String message = new String(delivery.getBody(), "UTF-8");        System.out.println("Consumer01接收到消息" + message);    };    channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});    }

延迟队列

延迟队列就是存放需要在指定时间被处理的元素的队列

TTL是消息或队列的属性,表面该消息或队列中所有消息的最大存活时间。

如果消息在TTL设定的时间内没有被消费,则消息会变为死信。

创建两个队列QA和QB,分别设置TTL为10s和40s,然后创建一个交换机X和死信交换机Y,再创建一个死信队列QD

插件优化

RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列;如果第一个消息过期时间很长,而第二个消息过期时间很短,第二个消息并不会优先得到执行。

rabbtimq_delayed_message_exchange插件加成放到RabbtiMQ目录下

这样可以定义一个自定义的delay交换机,可以让第二个消息先得到执行

SpringBoot中确认发布

mandatory 参数获取无法投递消息的感知能力并及时处理(回退消息)

设置备份交换机

对于无法投递的消息,既不想丢失消息,又不想设置mandatory参数增加生产者的复杂性,该怎么做呢?

可以利用备份交换机,当交换机接收到一条不可路由的消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,备份交换机类型常常为Fanout

在这里插入图片描述

RabbitMQ其他知识点

幂等、重复消费问题

用户对于同一操作发起的一次请求或多次请求的结果是一致的,不会因为多次点击而产生副作用。

消费者再消费消息后,给MQ返回ack时网络中断,故MQ没有收到确认消息,该条消息会重写发送给消费者,而实际上该消息已经被消费,造成了重复消费问题。

解决办法:利用全局唯一ID,每次消费消息时,用该id判断,该消息是否已经被消费过了

优先级队列

RabbitMQ支持优先级队列,可以在往优先级队列中添加消息时设置优先级,让优先级高的消息先执行

惰性队列

惰性队列会尽可能将消息存入磁盘中,而在消费者消费到相应消息时,才会被加载到内存中,它的一个中要设计目标时能够支持更长的队列,即支持更多的消息存储。

集群

镜像队列

将队列镜像到集群中的其他Broker节点上,如果集群中的一个节点失效了,队列能自动切换到镜像中的另一个节点上以保证服务的可用性。

负载均衡

  • Haproxy + Keepalive
  • Nignx

在这里插入图片描述

这篇关于rabbitmq的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!