一个服务发送一个消息给 MQ,即消息中间件,比如 RocketMQ、RabbitMQ、Kafka、ActiveMQ 等等。然后,另一个服务从 MQ 消费到一条消息后进行处理。这就成了基于 MQ 的异步调用。
一、可靠消息最终一致性方案的核心流程
1、上游服务投递消息
如果要实现可靠消息最终一致性方案,一般可以自己写一个可靠消息服务,实现一些业务逻辑。
首先,上游服务需要发送一条消息给可靠消息服务。这条消息一般就是对下游一个服务接口的调用,里面包含了对应的一些请求参数。
然后,可靠消息服务就得把这条消息存储到自己的数据库里面取,状态为“待确认”。
接着,上游服务就可以执行自己本地的数据库操作,根据自己的执行结果,再次调用可靠消息服务接口。
如果本地数据库操作执行成功了,那么就找可靠消息服务确认那条消息。如果本地数据库操作失败了,那么就找可靠消息服务删除那条消息。
此时如果是确认消息,那么可靠消息服务就把数据库里的消息状态更新为“已发送”,同时将消息发送给 MQ。
这里有一个很关键的点,就是更新数据库里的消息状态和投递消息到 MQ。这俩操作,需要放在一个方法里,且开启本地事务。
1)如果数据库更新消息的状态失败了,那么就抛异常退出,就别投递到 MQ。
2)如果投递 MQ 失败报错了,那么就要抛异常让本地数据库事务回滚。
3)这俩操作必须一起成功,或者一起失败。
2、下游服务接收消息
下游服务就一直等着从 MQ 消费消息就好了,如果消费到了消息,就操作本地数据库。
如果操作成功了。就反过来通知可靠消息服务,说自己处理成功了,然后可靠消息服务就会把消息状态设置为“已完成”。
3、确保上游服务对消息的100%可靠投递
上面的流程的一个问题是,如果在上述投递消息的过程中各个环节出现了问题该怎么办?
1)如果上游服务给消息服务发送待确认消息的过程出错来了,那上游服务感知到调用异常,就不会执行下面的流程了。
2)如果上游服务操作完本地数据库之后,通知可靠消息服务确认消息或者删除消息的时候,出现了问题:比如没通知成功,或者没有执行很高,或者是可靠消息服务器没成功的投递到 MQ。这些的情况下,可靠消息服务的数据库里的状态会一直是“待确认”。此时,我们可以在可靠消息服务里开发一个后台定时运行的线程,不停的检查各个消息的状态。如果一直是“待确认”状态,就认为这个消息出了点什么问题。此时可以回调上游服务提供的接口,问问这个老是这个状态对应的数据执行成功没有,如果是执行成功了就将消息的发送状态改成“已发送”,同时投递消息到 MQ,不过这种情况下更多的可能是没有执行成功,此时将可靠消息服务将数据库中的这条消息删除即可。
通过以上两步,可以保证可靠消息服务一定会尝试完成消息到 MQ 的投递。
4、保证下游服务对消息的 100% 可靠接收
如果下游服务消费消息时出了问题,没有消费到,或者是下游服务对消息的处理失败了,怎么办?
其实也没有关系,在可靠消息服务里开发一个后台线程,不断的检查消息状态。如果消息状态一直是“已发送”,始终没有变成“已完成”,那就说明下游服务始终没有处理成功。此时可靠消息服务就可以再次尝试重新投递消息到 MQ,让下游服务再次处理。只要下游服务的接口逻辑实现幂等性,保证多次处理一个消息,不会插入重复数据即可。
5、基于 MQ 实现可靠消息最终一致性方案
在上面的通用方案里,完全依赖可靠消息服务的各种自检机制来确保:
1)如果上游服务的数据库操作没有成功,下游服务是不会收到任何通知的
2)如果上游服务的数据库操作成功了,可靠消息服务会确保将一个调用消息投递给下游服务,而且一定会确保下游服务一定会成功处理这条消息。
通过这套机制,保证了基于 MQ 的异步调用/通知的服务间的分布式事务保障。
其中,阿里开源的 RocketMQ,就是实现了可靠消息服务的所有功能,核心思想跟上面类似。只是 RocketMQ 为了保证高并发、高可用、高性能,做了比较复杂的架构实现,非常优秀。。。
二、可靠消息最终一致性方案的高可用保障生产实践
实际落地生产的时候,如果没有高并发场景的,完全可以参照上面的思路自己基于某个 MQ 中间件开发一个可靠消息服务,如果有高并发场景的,可以用 RocketMQ 的分布式事务支持,上面的那套流程都可以实现。
这套方案里保障高可用性最大的一个依赖点,就是 MQ 的高可用。
任何一种 MQ 中间件都有一整套的高可用保障机制,无论是 RabbitMQ、RocketMQ 还是 Kafka。
如果 MQ 集群整体故障,完全不可用时,就会导致业务系统的各个服务之间无法通过 MQ 来投递消息,导致业务流程中断。
MQ 服务不可用时,服务降级怎么做?
1、故障感知
比如,连续10次尝试投递 MQ 都是异常出错,网络无法连接等问题,说明 MQ 故障不可用,此时触发降级开关。然后根据降级开关来判断这次是写 MQ 还是写 Redis。
2、基于KV存储中队列的降级方案
用 Redis 作为消息继续投递的替代品。
3、下游服务消费 MQ 的降级感知
下游服务消费 MQ 也是通过判断降级开关是不是打开了,来判断自己是从 MQ 消费数据还是从 Redis 取数据。
4、故障恢复
如果降级开关打开以后,需要每隔一段时间尝试给 MQ 投递一个消息,以判断其是否已经恢复。
如果 MQ 已经恢复可以正常投递消息,此时就可以通过关闭降级开关,然后消息继续投递到 MQ,下游服务在确认 KV 存储的各个队列中已经没有数据之后,就可以重新切换为从 MQ 消费消息。
5、更多
上面说的那套方案是通用的降级方案,具体可以根据业务特点来设计。
比如在投入 MQ 时,尽可能的确保数据符合规范,可以被下游服务正确消费,否则下游服务会一直出错。这个就像是 Redis 的事务原理一样,保证数据的正确性,那他就可以大概率的被成功执行,增加 redis 事务的成功率。
比如在服务降级时可以根据业务和代码特点做开关,开关可以用 zookeeper 监听,可以每次使用前从 redis 获取降级开关值。
各种步骤,其实可以达到相同目的的,都可以 “平替”。
摘自于:https://blog.csdn.net/weixin_30978387/article/details/112166141