Kakfa消费滞后程度有个专门的名称:消费者 Lag 或 Consumer Lag。所谓滞后程度,就是指消费者当前落后于生产者的程度。
比方说,Kafka 生产者向某主题成功生产了 100 万条消息,你的消费者当前消费了 80 万条消息,那么我们就说你的消费者滞后了 20 万条消息,即 Lag 等于 20 万。
一个正常工作的消费者,它的 Lag 值应该很小,甚至是接近于 0 的,这表示该消费者能够及时地消费生产者生产出来的消息,滞后程度很小。反之,如果一个消费者 Lag 值很大,通常就表明它无法跟上生产者的速度,最终 Lag 会越来越大,从而拖慢下游消息的处理速度。
更可怕的是,由于消费者的速度无法匹及生产者的速度,极有可能导致它消费的数据已经不在操作系统的页缓存中了,那么这些数据就会失去享有 Zero Copy 技术的资格。这样的话,消费者就不得不从磁盘上读取它们,这就进一步拉大了与生产者的差距,进而出现马太效应,即那些 Lag 原本就很大的消费者会越来越慢,Lag 也会越来越大。
有 3 种方法监控kafka消费进度。
1.使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本。
Kafka 连接信息就是 < 主机名:端口 > 对,而 group 名称就是你的消费者程序中设置的 group.id 值。
bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker 连接信息 > --describe --group <group 名称 >
主题、分区等信息外,它会汇报每个分区当前最新生产的消息的位移值(即 LOG-END-OFFSET 列值)、该消费者组当前最新消费消息的位移值(即 CURRENT-OFFSET 值)、LAG 值(前两者的差值)、消费者实例 ID、消费者连接 Broker 的主机名以及消费者的 CLIENT-ID 信息
有时候会有这种,说明运行 kafka-consumer-groups 脚本时没有启动消费者程序,消费者组里没有活跃的成员。但一样可以看到lag值。
2.使用 Kafka Java Consumer API 编程。
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException { Properties props = new Properties(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); try (AdminClient client = AdminClient.create(props)) { ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID); try { Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet()); return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset())); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 处理中断异常 // ... return Collections.emptyMap(); } catch (ExecutionException e) { // 处理 ExecutionException // ... return Collections.emptyMap(); } catch (TimeoutException e) { throw new TimeoutException("Timed out when getting lag for consumer group " + groupID); } } }
注意第 1 处是调用 AdminClient.listConsumerGroupOffsets 方法获取给定消费者组的最新消费消息的位移;第 2 处则是获取订阅分区的最新消息位移;最后 1 处就是执行相应的减法操作,获取 Lag 值并封装进一个 Map 对象。这段代码只适用于 Kafka 2.0.0 及以上的版本,2.0.0 之前的版本中没有 AdminClient.listConsumerGroupOffsets 方法。
3.使用 Kafka 自带的 JMX 监控指标。
Kafka 消费者提供了一个名为 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指标,里面有很多属性。和我们今天所讲内容相关的有两组属性:records-lag-max 和 records-lead-min,它们分别表示此消费者在测试窗口时间内曾经达到的最大的 Lag 值和最小的 Lead 值。
Lag 值的含义我们已经反复讲过了,我就不再重复了。这里的 Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值。很显然,Lag 和 Lead 是一体的两个方面:Lag 越大的话,Lead 就越小,反之也是同理。
为什么要有lead值?监控到 Lag 越来越大,可能只会给你一个感受,那就是消费者程序变得越来越慢了,至少是追不上生产者程序了,除此之外,你可能什么都不会做。毕竟,有时候这也是能够接受的。但反过来,一旦你监测到 Lead 越来越小,甚至是快接近于 0 了,你就一定要小心了,这可能预示着消费者端要丢消息了。
Kafka 消费者还在分区级别提供了额外的 JMX 指标,用于单独监控分区级别的 Lag 和 Lead 值。JMX 名称为:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”。
所以比较推荐用自带的 JMX 监控指标监控