消息队列MQ

kafka消息保留策略设置

本文主要是介绍kafka消息保留策略设置,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

项目遇到一个问题,kafka服务器被下电了4天,消息保留策略设置的24小时。服务器上电后,消息过期,消费者拿不到消息。模拟这种场景,复现下

log.retention.minutes=1

设置消息保留时间为1分钟

log.retention.hours

也可以设置多个小时,默认是168个小时,7天。

log.retention.check.interval.ms=10000

同时设置检查过期消息间隔为10秒,为了测试

    public void sendMsg() {
        int messageNo = 1;
        try {
            for(;;) {
                String messageStr="你好,这是第"+messageNo+"条数据";
                producer.send(new ProducerRecord<String, String>(topic,  messageStr));
                System.out.println("发送的信息:" + messageStr);
                Thread.sleep(1000);
                messageNo++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

通过java生产者想kafka发送消息

过两分钟后,消费者消费

    public void receiveMsg() {
        int messageNo = 1;
        System.out.println("---------开始消费---------");
        try {
            for (;;) {
                msgList = consumer.poll(1000);
                if(null!=msgList&&msgList.count()>0){
                    for (ConsumerRecord<String, String> record : msgList) {
                        System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
                    }
                }else{
                    Thread.sleep(1000);
                }
                consumer.commitSync();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

可以看到消费者获取不到消息,应为消息已经过期

这篇关于kafka消息保留策略设置的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!