项目遇到一个问题,kafka服务器被下电了4天,消息保留策略设置的24小时。服务器上电后,消息过期,消费者拿不到消息。模拟这种场景,复现下
log.retention.minutes=1
设置消息保留时间为1分钟
log.retention.hours
也可以设置多个小时,默认是168个小时,7天。
log.retention.check.interval.ms=10000
同时设置检查过期消息间隔为10秒,为了测试
public void sendMsg() { int messageNo = 1; try { for(;;) { String messageStr="你好,这是第"+messageNo+"条数据"; producer.send(new ProducerRecord<String, String>(topic, messageStr)); System.out.println("发送的信息:" + messageStr); Thread.sleep(1000); messageNo++; } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } }
通过java生产者想kafka发送消息
过两分钟后,消费者消费
public void receiveMsg() { int messageNo = 1; System.out.println("---------开始消费---------"); try { for (;;) { msgList = consumer.poll(1000); if(null!=msgList&&msgList.count()>0){ for (ConsumerRecord<String, String> record : msgList) { System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset()); } }else{ Thread.sleep(1000); } consumer.commitSync(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { consumer.close(); } }
可以看到消费者获取不到消息,应为消息已经过期