首先接到项目反馈使用 RocketMQ 会出现如下错误:
错误信息关键点:MQBrokerException:CODE:2 DESC:[TIMEOUT_CLEAN_QUEUE]broker busy,start flow control for a while,period in queue:205ms,size of queue:880。
由于项目组并没有对消息发送失败做任何补偿,导致丢失消息丢失,故需要对这个问题进行深层次的探讨,并加以解决。
首先我们根据关键字:TIMEOUT_CLEAN_QUEUE 去 RocketMQ 中查询,去探究在什么时候会抛出如上错误。根据全文搜索如下图所示:
该方法是在 BrokerFastFailure 中定义的,通过名称即可以看成其设计目的:Broker端快速失败机制。
Broker 端快速失败其原理图如下:
消息发送者向 Broker 发送消息写入请求,Broker 端在接收到请求后会首先放入一个队列中(SendThreadPoolQueue),默认容量为 10000。
Broker 会专门使用一个线程池(SendMessageExecutor)去从队列中获取任务并执行消息写入请求,为了保证消息的顺序处理,该线程池默认线程个数为1。
如果 Broker 端受到内存抖动等因素造成单条写入数据发生抖动,单个 Broker 端积压的请求太多还得不到及时处理,会极大的造成客户端消息发送的延长时间,设想一下,如果由于 Broker 压力增大,写入一条消息需要500ms甚至超过1s,并且队列中积压了5000条消息,消息发送端的默认超时时间为3s,如果按照这样的速度,这些请求在轮到 Broker 执行写入请求时,客户端已经将这个请求超时了,这样不仅会造成大量的无效处理,还会导致客户端发送超时。
故 RocketMQ 为了解决该问题,引入 Broker 端快速失败机制,即开启一个定时调度线程,每隔10毫秒去检查队列中的第一个排队节点,如果该节点的排队时间已经超过了 200 ms,就会取消该队列中所有已超过200ms的请求,立即向客户端返回失败,这样客户端能尽快进行重试,因为 Broker 都是集群部署,下次重试可以发送到其他 Broker 上,这样能最大程度保证消息发送在默认3s的时间内经过重试机制,能有效避免某一台 Broker 由于瞬时压力大而造成的消息发送不可用,从而实现消息发送的高可用。
从 Broker 端快速失败机制引入的初衷来看,快速失败后会发起重试,除非同一深刻集群内所有的 Broker 都繁忙,不然消息会发送成功,用户是不会感知这个错误的,那为什么用户感知了呢?难道 TIMEOUT_CLEAN_QUEUE 错误,Broker 不重试?
为了解开这个谜团,接下来会采用源码分析的手段去探究真相。接下来将以消息同步发送为例揭示其消息发送处理流程中的核心关键点。
MQ Client 消息发送端首先会利用网络通道将请求发送到 Broker,然后接收到请求结果后并调用 processSendResponse 方法对响应结果进行解析,如下图所示:
在这里,RemotingCommand 的 code 为
RemotingSysResponseCode.SYSTEM_BUSY。
我们从 proccessSendResponse 方法中可以得知,如果 code 为 SYSTEM_BUSY,该方法会抛出 MQBrokerException,响应 code 为 SYSTEM_BUSY,其错误描述为开头部分的错误信息。
那我们沿着该方法的调用链,可以找到其直接调用方为:DefaultMQProducerImpl 的 sendKernelImpl,我们重点考虑如果底层方法抛出 MQBrokerException 该方法会如何处理。
其关键代码如下图所示:
可以看出在 sendKernelImpl 方法中首先会捕捉异常,先执行注册的钩子函数,即就算执行失败,对应的消息发送后置钩子函数也会执行,然后再原封不动的将该异常向上抛出。
sendKernelImpl 方法被 DefaultMQProducerImpl 的 sendDefaultImpl 方法调用,下面是其核心实现截图:
从这里可以看出 RocketMQ 消息发送高可用设计一个非常关键的点,重试机制,其实现是在 for 循环中 使用 try catch 将 sendKernelImpl 方法包裹,就可以保证该方法抛出异常后能继续重试。从上文可知,如果 SYSTEM_BUSY 会抛出 MQBrokerException,但发现只有上述几个错误码才会重试,因为如果不是上述错误码,会继续向外抛出异常,此时 for 循环会被中断,即不会重试。
这里非常令人意外的是连 SYSTEM_ERROR 都会重试,却没有包含 SYSTEM_BUSY,显然违背了快速失败的设计初衷,故笔者断定,这是 RocketMQ 的一个BUG,将 SYSTEM_BUSY 遗漏了,后面与 RocketMQ 核心成员进行过沟通,也印证了这点,后续会提一个PR,在上面增加一行代码,将 SYSTEM_BUSY 加上即可。
问题分析到这里,该问题应该就非常明了。
如果大家在网上搜索 TIMEOUT_CLEAN_QUEUE 的解决方法,大家不约而同提出的解决方案是增加 waitTimeMillsInSendQueue 的值,该值默认为200ms,例如将其设置为1000s等等,以前我是反对的,因为我的认知里 Broker 会重试,但现在发现 Broker 不会重试,故提高该值能有效的缓解。
但这是并不是好的解决方案,我会在近期向官方提交一个PR,将这个问题修复,建议大家在公司尽量对自己使用的版本进行修改,重新打一个包即可,因为这已经违背了 Broker 端快速失败的设计初衷。
但在消息发送的业务方,尽量自己实现消息的重试机制,即不依赖 RocketMQ 本身提供的重试机制,因为受制与网络等因素,消息发送不可能百分之百成功,建议大家在消息发送时捕获一下异常,如果发送失败,可以将消息存入数据库,再结合定时任务对消息进行重试,尽最大程度保证消息不丢失。
声明本文为转载文章,版本归原创所有|侵删
作者:丁威
来源:中间件兴趣圈
链接:https://my.oschina.net/u/4052033/blog/4287177
如果消费者处理消息失败后不重试,然后发送应答给rabbitmq,rabbitmq就会将队列中的消息删除,从而造成消息的丢失。所以我们要在消费者处理消息失败的时候,重试一定的次数。比如重试3次,如果重试3次之后还是失败,则把这条消息发送到死信队列。
根据业务需求处理失败了的数据。比如保存到失败文件或者数据库等,也可以人工处理后,重新发送给exchange@normal。
如何学习java ,推荐两个白嫖学习的资料:
1、书籍:
codeGoogler/ProgramBooks
2:视频教程:
全网免费Java资源下载SpringBoot、Spring、Mybatis、Redis、RabbitMQ、SpringCloud、高并发(持续更新)