消息的消费一般有两种模式,推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。kakfa采用的是拉模式,这样可以很好的控制消费速率。那么kafka消费的具体工作流程是什么样的呢?kafka的位移管理又是怎么样的呢?
kafka
是以消费者组进行消费,一个消费者组,由多个consumer
组成,他们和topic
的消费规则如下:
topic
的一个分区只能被消费组中的一个消费者消费。通过这种分组、分区的消费方式,可以提高消费者的吞吐量,同时也能够实现消息的发布/订阅模式和点对点两种模式。
消费者消费总体分为两个步骤,第一步是制定消费的方案,就是这个组下哪个消费者消费哪个分区,第二个是建立网络连接,获取消息数据。
一、制定消费方案
consumerA
,consumerB
, consumerC
向kafka
集群中的协调器coordinator
发送JoinGroup
的请求。coordinator
主要是用来辅助实现消费者组的初始化和分区的分配。coordinator
老大节点选择 = groupid
的hashcode
值 % 50( __consumer_offsets
内置主题位移的分区数量)例如: groupid
的hashcode值 为1,1% 50 = 1
,那么__consumer_offsets
主题的1号分区,在哪个broker
上,就选择这个节点的coordinator
作为这个消费者组的老大。消费者组下的所有的消费者提交offset
的时候就往这个分区去提交offset
。consumer
作为消费中的leader
,比如上图中的consumerB
。leader
制定出消费方案,比如谁来消费哪个分区等,有Range
分区策略、RoundRobin
分区策略等。coordinator
coordinator
就把消费方案下发给各个consumer
, 图中只画了一条线,实际上是会下发到各个consumer
。二、消费者消费细节
现在已经初始化消费者组信息,知道哪个消费者消费哪个分区,接着我们来看看消费者细节。
ConsumerNetworkClient
, 发送消费请求,可以进行如下配置:fetch.min.bytes
: 每批次最小抓取大小,默认1字节fetch.max.bytes
: 每批次最大抓取大小,默认50M
fetch.max.wait.ms
:最大超时时间,默认500ms
kafka
集群completedFetches
队列中max.poll.records
一次拉取数据返回消息的最大条数,默认500条。offset
,也就是这个消费者消费到什么位置了,这样下次重启也可以继续从这个位置开始消费,关于offset
的管理后面详细介绍。前面简单提到了消费者组初始化的时候会对分区进行分配,那么具体的分配策略是什么呢,也就是哪个消费者消费哪个分区数据?
kafka有四种主流的分区分配策略: Range
、RoundRobin
、Sticky
、CooperativeSticky
。可以通过配置参数partition.assignment.strategy
,修改分区的分配策略。默认策略是Range + CooperativeSticky
。Kafka可以同时使用多个分区分配策略。
Range
分区策略
Range
分区 是对每个 topic
而言的。对同一个 topic
里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。partitions
数/consumer
数 来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。如上图所示:有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。7/3 = 2 余 1 ,除不尽,那么 消费者 C0 便会多消费 1 个分区。 8/3=2余2,除不尽,那么C0和C1分别多消费一个。
这种方式容易造成数据倾斜!如果有 N 多个 topic
,那么针对每个 topic
,消费者 C0都将多消费 1 个分区,topic
越多,C0消费的分区会比其他消费者明显多消费 N 个分区。
RoundRobin
针对集群中所有topic
而言,RoundRobin
轮询分区策略,是把所有的 partition
和所有的consumer
都列出来,然后按照 hashcode
进行排序,最后通过轮询算法来分配 partition
给到各个消费者。
Sticky
是粘性的意思,它是从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,在rebalance
会尽量保持原有分配的分区不变化,这样可以节省开销。
Cooperative Sticky
和Sticky
类似,但是它会将原来的一次大规模rebalance
操作,拆分成了多次小规模的rebalance
,直至最终平衡完成,所以体验上会更好。
关于什么是rebalance
继续往下看你就知道了。
上面也提到了rebalance
,也就是再均衡。当kafka发生下面的情况会进行在均衡,也就是重新给消费者分配分区:
Group Coordinator
发送心跳等情况时,GroupCoordinato
r 会认为消费者己下线。 Group Coorinator
节点发生了变更。 消费者需要保存当前消费到分区的什么位置了,这样哪怕消费者故障,重启后也能继续消费,这就是消费者的维护offset管理。
一、消费者位移offset存储位置
消费者位移offset
存储在哪呢?
kafka0.9
版本之前,consumer
默认将offset
保存在Zookeeper
中consumer
默认将offset
保存在Kafka
一个内置的topic
中,该topic
为__consumer_offsets
,这样可以大量减少和zookeeper
的交互。__consumer_offsets
主题里面采用 key
和 value
的方式存储数据。key
是 group.id+topic+
分区号,value
就是当前 offset
的值。如何查看__consumer_offsets
主题内容?
config/consumer.properties
中添加配置 exclude.internal.topics=false
,默认是 true
,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false
。__consumer_offsets
。bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 -- consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageForm atter" --from-beginning ## topic1 1号分区 [offset,topic1,1]::OffsetAndMetadata(offset=7, leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, expireTimestamp=None) ## topic1 0号分区 [offset,topic1,0]::OffsetAndMetadata(offset=8, leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, expireTimestamp=None)
二、消费者位移offset提交保存模式
消费者是如何提交保存位移offset呢?
为了使我们能够专注于自己的业务逻辑,kafka默认提供了自动提交offset
的功能。这个由消费者客户端参数 enable.auto.commit
配置, 默认值为 true
。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms
配置,默认值为 5 秒。
poll()
方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。自动提交会带来什么问题?
自动提交消费位移的方式非常简便,但会带来是重复消费的问题。
假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象。
我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样 并不能避免重复消费的发送,而且也会使位移提交更加频繁。
很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更 加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费。手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。
// 是否自动提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer
中的 commitSync()
和 commitAsync()
两种类型的方法。
同步提交会阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败),它必须等待offset
提交完毕,再去消费下一批数据。
// 同步提交 offset consumer.commitSync();
异步提交则没有失败重试机制,故有可能提交失败。它发送完提交offset请求后,就开始消费下一批数据了。
// 异步提交 offset consumer.commitAsync();
那么手动提交会带来什么问题呢?可能会出现"漏消息"的情况。
设置offset
为手动提交,当offset
被提交时,数据还在内存中未落盘,此时刚好消费者线程被kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。
我们可以通过消费者事物来解决这样的问题。
其实无论是手动提交还是自动提交,都有可能出现消息重复和是漏消息,与我们的编程模型有关,需要我们开发的时候根据消息的重要程度来选择合适的消费方案。
一个正常的消费逻辑需要具备以下几个步骤:
(1)配置消费者客户端参数及创建相应的消费者实例;
(2)订阅主题;
(3)拉取消息并消费;
(4)提交消费位移 offset
;
(5)关闭消费者实例。
public class MyConsumer { public static void main(String[] args) { Properties props = new Properties(); // 定义 kakfa 服务的地址,不需要将所有 broker 指定上 props.put("bootstrap.servers", "doitedu01:9092"); // 制定 consumer group props.put("group.id", "g1"); // 是否自动提交 offset props.put("enable.auto.commit", "true"); // 自动提交 offset 的时间间隔 props.put("auto.commit.interval.ms", "1000"); // key 的反序列化类 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value 的反序列化类 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, none props.put("auto.offset.reset","earliest"); // 定义 consumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 消费者订阅的 topic, 可同时订阅多个 consumer.subscribe(Arrays.asList("first", "test","test1")); while (true) { // 读取数据,读取超时时间为 100ms ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }
consumer.subscribe(Arrays.asList(topicl )); consumer subscribe(Arrays.asList(topic2))
如果消费者采用的是正则表达式的方式(subscribe(Pattern)
)订阅, 在之后的过程中,如果 有人又创建了新的主题,并且主题名字与正表达式相匹配,那么这个消费者就可以消费到 新添加的主题中的消息。
consumer.subscribe(Pattern.compile ("topic.*" ));
消费者不仅可以通过 KafkaConsumer.subscribe()
方法订阅主题,还可直接订阅某些主题的指定分区。
consumer.assign(Arrays.asList(new TopicPartition ("tpc_1" , 0),new TopicPartition(“tpc_2”,1))) ;
通过unsubscribe()
方法采取消主题的订阅。
consumer.unsubscribe();
kafka 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用 poll()
方法, poll()
方法返回的是所订阅的主题(分区)上的一组消息。
对于 poll ()
方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空。
public ConsumerRecords<K, V> poll(final Duration timeout)
超时时间参数 timeout
,用来控制 poll()
方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发生阻塞。
有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer
中的 seek(
方法正好提供了这个功能,让我们可以追前消费或回溯消费。
public void seek(TopicPartiton partition,long offset)
最后我们总结一下消费者中重要的参数配置。
参数名称 | 描述 |
---|---|
bootstrap.servers | 向 Kafka 集群建立初始连接用到的 host/port 列表。 |
key.deserializer 和value.deserializer | 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。 |
group.id | 标记消费者所属的消费者组。 |
enable.auto.commit | 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 |
auto.offset.reset | 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
offsets.topic.num.partitions | __consumer_offsets 的分区数,默认是 50 个分区。 |
heartbeat.interval.ms | Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3。 |
session.timeout.ms | Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
fetch.min.bytes | 默认 1 个字节。消费者获取服务器端一批消息最小的字节数。 |
fetch.max.wait.ms | 默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。 |
fetch.max.bytes | 默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
max.poll.records | 一次 poll 拉取数据返回消息的最大条数,默认是 500 条。 |
kafka消费是很重要的一个环节,本文总结kafka消费者的一些重要机制,包括消费者的整个流程,消费的分区策略,消费的再平衡以及消费的位移管理。在明白这些机制以后,简单讲解了如何使用消费者consumer
的API以及消费者中重要的参数。
欢迎关注个人公众号【JAVA旭阳】交流学习!