C/C++教程

RocketMQ高阶业务问题及解决方案

本文主要是介绍RocketMQ高阶业务问题及解决方案,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

RocketMq全链路消息零丢失方案

  • 发送消息到mq零丢失: 事务消息
  • Broker 存储消息零丢失:同步刷盘+主从机制
  • Consumer 消费消息零丢失:手动提交offset + 自动故障转移

Broker消息零丢失方案:同步刷盘 + Raft协议主从同步

Broker 是负责存储消息的,怎么保证消息发送到Broker后,一定不会丢失呢?

刷盘失败

首先RocketMq一般情况下,为了保证高吞吐量,使用的是异步刷盘策略。但是这种策略会出现消息写入os cache成功,但是异步写入磁盘的时候失败了。那么这条消息就丢失了。 所以需要改成同步刷盘,使用这种策略后,只要Broker 告诉我们消息发送成功,那么消息就一定被写入磁盘了。

磁盘损坏

只要消息写入到磁盘,消息就一定不会丢失吗? 显然不是的,如果磁盘文件损坏的话,这些消息也就丢失了。 所以必须使用 Broker 主从架构,也就是说让一个 Master Broker 有一个 Slave Broker去同步主节点的数据,而一条消息写入成功,必须让Slave Broker也写入成功,保证数据有多个副本冗余。 这样的话,就算Master Broker的磁盘损坏了,也还有Slave Broker的数据

Consumer消息零丢失方案:手动提交offset + 自动故障转移

Consumer消费消息的时候,理论上将也是有可能丢失消息的。 比如当 Consumer 收到消息后,还没进行业务处理,就直接返回成功,提交offset,接着Consumer 就崩溃了,业务还没处理完,Broker 收到 offset 却以为 Consumer 消费成功了,那么这条消息就丢失了。 所以,在RocketMq中,是先处理业务,然后最后在返回CONSUME_SUCCESS,这样的话,即使处理业务的时候,消费者挂了,只要没返回CONSUME_SUCCESS,Broker都认为这个消息还没被消费,还会再次push。 当Broker 感知到消费者挂掉的时候,它会把该机器没处理完的消息,交给消费组里的另一台机器去消费,这种方式实现了故障转移。

顺序消息

通常情况下MQ的消息是无序的,因为MQ会根据算法把消息发送到不同的MessageQueueConsumer也会开启多线程去进行消费,因此并不难保证消息有序。 但是,在一些场景,我们又需要消息有序。例如订单create update delete这几个状态要发送消息给其他业务方,业务方希望消息严格按照订单这几个状态的顺序来发送,否则可能会出现先更新订单,发下订单不存在的情况。

同一个MessageQueue

如果要想保证消息有序,首先要让同一个订单的消息都进入到同一个MessageQueue中,MessageQueue是先进先出的,可以保证订单的消息在该队列中有序。 那么如何让同一个订单的消息都进入到同一个MessageQueue中呢? RocketMq提供了MessageQueueSelector类,我们可以按照订单ID对MessageQueue的数量取模,然后发送消息的时候指定这个MessageQueue。

Consumer单线程消费

虽然我们保证了同一个订单ID下的消息都在同一个MessageQueue中,但是Consumer默认是开启多线程消费的,如果消费订单创建消息的时候超时了,那么还是不能保证消息有序。 RocketMq提供了MessageListenerOrderly监听器,该类可以保证对每一个MessageQueue都使用一个线程去消费。底层是通过ConcurrentHashMap来加锁,使得同一时间只有一个线程可以消费。

public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable =
        new ConcurrentHashMap<MessageQueue, Object>();

    public Object fetchLockObject(final MessageQueue mq) {
        Object objLock = this.mqLockTable.get(mq);
        if (null == objLock) {
            objLock = new Object();
            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
            if (prevLock != null) {
                objLock = prevLock;
            }
        }

        return objLock;
    }
}
复制代码
//ConsumeMessageOrderlyService.java
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
            //...
            }
复制代码

Consumer消费失败怎么办

思考一个问题,如果consumer消费的时候失败了,我们返回重试,这时候消息的顺序就乱了,这种情况要如何处理呢? RocketMQ提供了SUSPEND_CURRENT_QUEUE_A_MOMENT状态,当返回这个状态后,MQ会暂停一段时间再消息,不会把消息放入重试队列。

延迟消息

RocketMQ提供了延迟消息的功能,非常方便。其实它的实现原理就是给延迟的消息新开一个队列。

  • 消息生产者发送消息,如果发送的消息DelayTimeLevel大于0,则改变消息主题为SCHEDULE_TOPIC_XXXX,消息的队列为DelayTimeLevel-1
  • 消息存储到SCHEDULE_TOPIC_XXXX上,把原有主题设置成属性。
  • 定时任务DeliverDelayedMessageTimerTask每隔1秒从SCHEDULE_TOPIC_XXXX获取消息。一个 task 处理一个级别的延时消息
  • 根据消息的属性重新创建消息,并恢复原主题TopicTest、原消息队列ID,清除DelayTimeLevel属性存入Commitlog中,供消费者消费。

MQ消息中有百万积压怎么处理

假设在订单场景中,我们的消费者挂掉了,而订单量是很巨大的,在短时间内就堆积了几百万条消息,这种情况该怎么处理呢?

  • 根据MessageQueue的数量,扩充消费者机器 需要注意的是,机器和线程数量增大后,可能会对数据库造成成倍的压力!
  • 增加消费者线程数
  • 如果不能增加机器,则修改代码,新增一个 Topic,把积压的消息写入新的Topic中,部署多台Consumer去消费新的Topic

消息队列崩溃怎么办

在一些金融级场景,由于涉及到金钱,因此服务一定要高可用,但是如果我们的消息队列崩溃了,服务却依赖消息队列发送消息,这时要怎么处理呢? 针对这种场景,通常要在生产者的系统中设计高可用的降级方案,比如在发送MQ的代码里try catch捕获异常,如果发现有异常,进行重试。 如果发现超过3次都是失败的,这时候可能MQ已经崩溃了,此时必须把这条消息进行持久化,可以存储到DB、nosql(如redis的list结构)、磁盘文件中等等。 然后开启一个后台定时任务,去尝试把失败持久化的消息重新发送到MQ。

这里必须按照顺序发送,存储时也要按照顺序存储

为什么要给RocketMQ增加消息限流功能保证其高可用性

限流功能其实是对MQ系统的一种保护。 如果某个程序员代码里写了个bug,死循环不停的往MQ里写消息,并且如果有10台机器的话,那可能没一会MQ系统就被打挂了,影响到其他业务系统的使用。 因此,一般可以先通过压测测一下你的MQ最多可以抗多少QPS,然后做好限流。

这篇关于RocketMQ高阶业务问题及解决方案的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!