这三种场景都可能会产生消息的丢失,如下图所示:
那么如何保证消息的零丢失呢?
//注册消息监听器处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ //对消息进行处理 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
上面这段代码中,RocketMQ在消费者中注册了一个监听器,当消费者获取到了消息,就会去回调这个监听器函数,去处理里面的消息
当你的消息处理完毕之后,才会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
只有返回了CONSUME_SUCCESS,消费者才会告诉RocketMQ我已经消费完了,此时如果消费者宕机,消息已经处理完了,也就不会丢失消息了
如果消费者还没有返回CONSUME_SUCCESS时就宕机了,那么RocketMQ就会认为你这个消费者节点挂掉了,会自动故障转移,将消息交给消费者组的其他消费者去消费这个消息,保证消息不会丢失
为了保证消息不会丢失,在consumeMessage方法中就直接写消息消费的业务逻辑就可以了,如果非要搞一些骚操作,比如下面的代码
//注册消息监听器处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ //开启子线程异步处理消息 new Thread() { ### 最后 给大家送一个小福利 ![](https://www.www.zyiz.net/i/ll/?i=img_convert/58031cadf4a2194c51223083c9f1b164.png) 资料附送高清脑图,高清知识点讲解教程,以及一些面试真题及答案解析。送给需要的提升技术、准备面试跳槽、自身职业规划迷茫的朋友们。 **[CodeChina开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频】](https://codechina.csdn.net/m0_60958482/java-p7)** ![](https://www.www.zyiz.net/i/ll/?i=img_convert/ec33e17b2f22b76ca9e289ad5520f21e.png) 0_60958482/java-p7)** [外链图片转存中...(img-UWBp8l2i-1630839381190)]