项目中用到了kafka消息队列,在开发测试过程中发现了消息端设置的最大重试次数失效的情况,具体信息如下:
消费者config文件
@Configuration @EnableKafka @Slf4j public class KafkaConsumerConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> demoContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); // 设置消费者工厂 factory.setConsumerFactory(demoContainerFactory()); // 消费者组中线程数量 factory.setConcurrency(3); // 当使用批量监听器时需要设置为true factory.setBatchListener(false); // 拉取超时时间 factory.getContainerProperties().setPollTimeout(3000); // 最大重试次数3次 SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> { log.error("消费消息异常.抛弃这个消息,{}", consumerRecord.toString(), e); }, 3); factory.setErrorHandler(seekToCurrentErrorHandler); return factory; }
消费者业务代码
@Component @Slf4j public class DemoSingleConsumer { @Autowired private DemoHandler demoHandler; /** * 监听 topic 进行单条消费 */ @KafkaListener(topics = {KafkaConstants.TOPIC}, groupId = KafkaConstants.GROUPID, containerFactory = "demoContainerFactory", errorHandler = "listenErrorHandler") public void kafkaListener(ConsumerRecord<String, String> message) { log.info("消费消息开始 msg={}", JSONUtil.toJSONString(message.value())); SendMessage message = JSONUtil.parseObject(message.value(), ASendMessage.class); try { demoHandler.process(message); } catch (Throwable e) { log.error("消息消费异常,messageBody={}", JSONObject.toJSONString(message.value()), e); } }
1.kafkatemplate无法注入_kafka消费无限重试问题排查
2.kafka专题:kafka的消息丢失、重复消费、消息积压等线上问题汇总及优化
3.Kafka常见的导致重复消费原因和解决方案