消息队列MQ

RabbitMQ学习

本文主要是介绍RabbitMQ学习,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

RabbitMQ学习

使用场景

  1. 消息队列解决什么问题?

    1. 异步处理
    2. 应用解耦
    3. 流量削锋
    4. 日志处理

安装与配置

用户及vhost配置

添加用户

在这里插入图片描述

virtual host管理

在这里插入图片描述

开发指南

Simple简单队列

模型

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ItrbYtxD-1632320692103)(images/python-one-overall.png)]

P:消息生产者

红色:阶列

C:消息消费者

不足

耦合性高,生产者—消费者一一对应。队列名变更都得变理

工作队列

模型

在这里插入图片描述

为什么会出现工作队列

Simple队列是一一对应的,而且我们实际开发,生产者发送消息是不费力的,而消费者一般是跟业务相结合的。,消息者接收到消息之后就需要处理。需要花费时间。队列就需要更多的消费者。

现象:

消费者1和2处理的消息是一样的

消费者1:奇数

消费者2:偶数

其实是一个轮询分发(roun-robin)

公平分发(Fail dispatch)

使用公平发分要关闭autoACK,改成手动。

公共的消费类

public class BHFailCustomConsumer extends DefaultConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(BHFailCustomConsumer.class);

    private String consumerName = "defaultConsumer";

    private long delayTime = 100L;

    public BHFailCustomConsumer(Channel channel,long delayTime, String ...name) {
        super(channel);
        try {
            //接收消息,在没有应答前只接收1条消息
            channel.basicQos(1);
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.delayTime = delayTime;
        if(name!=null) {
            this.consumerName = name[0];
        }else{
            this.consumerName = "defaultConsumer "+ UUID.randomUUID().toString();
        }
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body,"utf-8");
        LOGGER.debug(this.consumerName +"_"+msg);
        try {
            Thread.sleep(delayTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //手动应答
        getChannel().basicAck(envelope.getDeliveryTag(),false);
    }
}

C1,C2,使用不是的delayTime延迟

Connection connection = messageBrokerHelper.getConnection();
        Channel channel = messageBrokerHelper.getChannel(connection,QUEUE_NAME);
        Consumer consumer = new BHFailCustomConsumer(channel,delayTime,name);
        try {
            LOGGER.info("Consumer {} waiting consume,delay time {}",name,delayTime);
            //关掉自动应答,由消费者手动处理应答
            channel.basicConsume(QUEUE_NAME,false,consumer);
        } catch (IOException e) {
            LOGGER.debug("consumer listener failure",e);
        }

W 生产者

 public void sender(int senderCount){
        Connection connection = messageBrokerHelper.getConnection();
        Channel channel = messageBrokerHelper.getChannel(connection,QUEUE_NAME);
        try {
            int senderTime = senderCount < 1 ? 1 : senderCount;
            for (int i=0;i<senderTime;i++) {
                String body = "hello,rabbit mq" + i;
                channel.basicPublish("", QUEUE_NAME, null, body.getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            messageBrokerHelper.closeChannel(channel);
            messageBrokerHelper.closeConnection(connection);
        }

    }

消息应答与持久化

订阅模式

模型

在这里插入图片描述

  1. 一个生产者,多个消费者,每个消费者有自己报价列
  2. 生产者没有直接把消息发送到队列,而是发到了交换机,转发器exchange
  3. 每个队列都要绑定到交换机上
  4. 生产者发送的消息,经过交换机,到达队列。实现一个消息被多个消费者所消费。

场景:

注册->邮件->短信

生产者

消费者

exchange(交换机 转发器)

接收生产者消息,并接收到的消转发给队列

fanout:不处理路由键
在这里插入图片描述

Direct:处理路由键

在这里插入图片描述

路由模式

在这里插入图片描述
在这里插入图片描述

Topic模式

在这里插入图片描述

‘# 匹配一个或者多个

*匹配一个

RPC模式

在这里插入图片描述

消息确认机制(事务+confirm)

两种方式:

AMQP实现了事务机制

Confirm模式

事务机制

  • txselect

    用户将当前channel设置成transation模式

  • txCommit

用于提交事务

  • txRollback

回滚事务

Confirm模式

生产者的实现原理

开启confirm模式

这篇关于RabbitMQ学习的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!