Java教程

spring-kafka消费者源码分析

本文主要是介绍spring-kafka消费者源码分析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

https://blog.csdn.net/qq_26323323/article/details/84938892

这篇文章对spring-kafka消费端源码分析较为详细,可查看其customer初始化的过程。

整个初始化的开始是从@EnableKafka开始讲起的

初始化的工程归纳如下:

       得到一个含有KafkaListener基本信息的Endpoint,最后Endpoint被封装到KafkaListenerEndpointDescriptor,KafkaListenerEndpointDescriptor被添加到KafkaListenerEndpointRegistrar.endpointDescriptors中,也就是一个list中

在afterPropertiesSet方法中遍历endpointDescriptors,并执行对应的方法。

  1. 创建Endpoint对应的MessageListenerContainer,将创建好的MessageListenerContainer放入listenerContainers
    1. 初始化Container的"topics", "topicPartitions", "topicPattern","messageListener", "ackCount", "ackTime"等参数
      1. 默认ConcurrentKafkaListenerContainerFactory.createContainerInstance()

      2.  由该方法可知,最终创建的容器是ConcurrentMessageListenerContainer,根据用户设定的参数

  2. 如果我们的KafkaListener注解中有对应的group信息,则将container添加到对应的group中

@KafkaListener注解变为ConcurrentMessageListenerContainer类,这个Container中包含了我们所需要的topic相关信息

ConcurrentMessageListenerContainer的分析

该类实现了lifeCycle接口

启动

  1. 将concurrency设置为topicPartitions大小
  2. 根据分片数创建KafkaMessageListenerContainer,每一个分片创建一个Container
  3. 启动Container,并添加其到containers中
    1. 获取对应的监听类,被包装到BatchMessagingMessageListenerAdapter中
    2. 根据this.listener, this.acknowledgingMessageListener创建对应的listenerConsumer
    3. 将listenerConsumer作为一个任务提交

最终将我们@KafkaListener中的topicPartitions

 @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0","1","2" })})
    转换为ListenerConsumer,当concurrency的值大于partitions的数量时,每一个partition生成一个ListenerConsumer。

并发消息监听,相当于创建消费者;其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理;从实现上看就是在KafkaMessageListenerContainer上做了层包装,有多少的concurrency就创建多个KafkaMessageListenerContainer,也就是concurrency个消费者

if (topicPartitions != null
					&& this.concurrency > topicPartitions.length) {
				this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
						+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
						+ topicPartitions.length);
				this.concurrency = topicPartitions.length;
			}

ListenerConsumer的工作

  1. 构造Consumer
  2. subscribe对应的topic

总结:通过构造方法,ListenerConsumer完成了Consumer的创建以及topic和partition的监听

轮询拉取的业务逻辑

  1. 不停轮询实现topic的消费
    1. 使用poll方法拉取数据
    2. 获取到数据后执行监听器的回调方法

提交偏移量:

那么消费者是如何提交偏移量的呢?消费者往一个 叫作 _consumer_offset 的特殊主题发送 消息,消息里包含每个分区的偏移量。 如果消费者一直处于运行状态,那么偏移量就没有 什么用处,因为每个消费者都会在内存中记录自己消费到哪里了。不过,如果悄费者发生崩溃或者有新 的消费者加入群组,就会触发再均衡,完 成再均衡之后,每个消费者可能分配到新 的分区,而不是之前处理的那个。为了能够继续 之前的工作,消费者需要读取每个分区最后一次提交 的偏移量,然后从偏移量指定的地方 继续处理。

提交偏移量有很多种方式

kafka自动提交

最简单的提交方式是让悄费者自动提交偏移量。如果enable.auto.commit被设为 true,那么每过5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。与消费者里的其他东西 一样,自动提交也是在轮询(poll() )里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那 么就会提交从上一次轮询返回的偏移量。所以这里auto.commit.interval.ms参数是两次提交偏移量的最小时间间隔,因为提交偏移量也是在poll中实现的,是每次poll的时候判断当前时间和上次提交的时间差是否大于等于5s,是,则需要提交偏移量,否则,本次轮询不需要提交偏移量。源码如下:

不过,在使用这种简便的方式之前,需要知道它将会带来怎样的结果。

假设我们仍然使用默认的 5s提交时间间隔,在最近一次提交之后的 3s发生了再均衡,再 均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无也完全避免的 。

在使用自动提交时 ,每次调用轮询方法都会把上一次调用返 回的偏移量提交上去,它并不 知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回 的消息 都已经处理完毕(在调用 close() 方法之前也会进行自动提交)。 一般情况下不会有什么问 题,不过在处理异常或提前退出轮询时要格外小心

提交偏移量除了自动提交,还有手动提交,AckMode中是所有的提交模式枚举
public enum AckMode {
        // 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
        RECORD,
        // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
        BATCH,
        // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
        TIME,
        // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
        COUNT,
        // TIME | COUNT 有一个条件满足时提交
        COUNT_TIME,
        // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
        MANUAL,
        // 手动调用Acknowledgment.acknowledge()后立即提交
        MANUAL_IMMEDIATE,

拉取消息的过程

这篇博文写消费者分析很不错 https://blog.csdn.net/liyiming2017/article/details/89187474

KafkaConsumer有几个主要的组件:

1、消费者要自己记录消费的位置(但也需要提交到服务端保存,为了rebalance后的消费能衔接上),所以我们需要SubScriptionState来保存消费的状态。

2、ConsumerCoordinator负责和GroupCoordinator通讯,例如在leader选举,入组,分区分配等过程。

3、ConsumerNetworkClient是对NetworkClient的封装,他对nio的组件进行封装,实现网络IO。

4、PartitionAssignor,这是分区分配策略,在进行分区分配的时候会用到。

5、Fetcher负责组织拉取消息的请求,以及处理返回。不过需要注意它并不做网络IO,网络IO还是由ConsumerNetworkClient完成。它其实对应生产者中的Sender。
 

可参考的文献:

源码分析Kafka 消息拉取流程

https://cloud.tencent.com/developer/article/1551705

https://blog.csdn.net/evasnowind/article/details/108534598

kafka消费者--加入consumergroup流程

https://blog.csdn.net/asdfsadfasdfsa/article/details/104883173

自动提交是调用poll方法的时候顺便提交的,如果没有调用poll,时间到了也不会提交.

https://zhuanlan.zhihu.com/p/112745985

Kafka consumer消息的拉取及偏移的管理

https://blog.csdn.net/E_Possible/article/details/109564700

Kafka消费者源码解析之二Fetcher

https://blog.csdn.net/lt793843439/article/details/89634643

concurrency问题

https://blog.csdn.net/u010634066/article/details/109778491

https://hengheng.blog.csdn.net/article/details/107468648

https://blog.csdn.net/weixin_39672680/article/details/111744351

这篇关于spring-kafka消费者源码分析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!