笔记内容取自尚硅谷Kafka3.0教程,以及《深入理解Kafka核心设计与实践原理》
内容还会不断充实~
传统定义:
Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域
最新定义:
Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
解耦
允许你独立的扩展或修改两边的处理过程,只要确保他们遵守同样的接口约束
可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理
缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的的处理速度不一致的情况
灵活性&峰值处理能力
使系统能够应对突发的高峰流量
异步通信
允许用户把一个消息放入队列,但不立即处理它,在需要的时候再去处理
点对点模式
一对一,消费者主动拉取数据,收到消息后,队列中的消息清除。队列可以有多个消费者,但对于一个消息而言,只能被一个消费者消费。
发布/订阅模式
一对多,消费者消费消息后,队列不会清除消息。消息生产者将消息发布到topic中,同时有多个消费者消费该消息。和点对点模式不同,发布到topic的消息会被所有订阅者消费
发布/订阅模式中,又分为两种:
消费者主动拉取消息
Kafka就是属于这种类型
优势:速度取决于消费者,可以根据消费能力以适当的速率消费消息
弊端:需要轮询,查看队列中是否有消息,浪费资源
队列推送消息
类似于公众号推送
弊端:
推送消息的速度取决于队列,各个消费者处理消息的速度可能不一致,造成消费者崩掉(推送速度 >消费者处理速度)或者资源浪费(推送速度 < 消费者处理速度)
zk在这里的作用:
查看该机器上所有topic:
kafka-topics.sh --list --zookeeper ip:zk端口
创建topic:
kafka-topics.sh --create --topic topic名称 --zookeeper ip:zk端口 --partitions 分区数 --replication-factor 副本数 #注:副本数不能大于当前可用的Broker数,分区数可以大于当前可用的Broker数 #副本数 包括 leader 和 follower
删除topic:
kafka-topics.sh --delete --topic first --zookeeper ip:zk端口 #注:执行效果: #Topic first is marked for deletion. 标记为删除 #Note: This will have no impact if delete.topic.enable is not set to true. 只有当delete.topic.enable设为true时才会真正删除
查看topic详情:
kafka-topics.sh --describe --zookeeper ip:zk端口 --topic topic名称
生产者发送消息
kafka-console-producer.sh --topic first --broker-list kafkaIP:kafka端口
消费者消费消息
kafka-console-consumer.sh --topic first --bootstrap-server kafkaIP:kafka端口 #从当前开始消费 #或者: kafka-console-consumer.sh --topic first --bootstrap-server kafkaIP:kafka端口 --from-begining #从头开始消费
说明:
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。 RecordAccumulator的大小可通过生产者客户端参数buffer.memory配置,默认32MB。如果生产者发送消息的速度超过发送到服务器端的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数默认60秒。
主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。
注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一致多个ProducerRecord
如果生产者要向很多分区发送消息,则可将buffer.memory参数适当调大以增加整体吞吐量(buffer.memory大,RecordAccumulator则大,因RecordAccumulator中为每个分区都维护了一个双端队列,所以,RecordAccumulator大,每个分区分到的空间就大,可缓存的消息就多)。
在Kafka生产者客户端中,使用java.io.ByteBuffer实现消息内存的创建和释放,不过频繁的创建和释放比较耗费资源,故RecordAccumulator内部有一个BufferPool,主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数指定,默认16KB。
ProducerBatch的大小和batch.size的关系:
当一条消息(ProducerRecord)进入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数设定的大小,如果不超过,则以batch.size参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。
Sender从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node, List<ProducerBatch>>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息是属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个topic中发送哪些消息,所以这里需要做一个应用逻辑层到网络I/O层面的转换。
在转换成<Node, List<ProducerBatch>>的形式之后,Sender还会进一步封装成<Node, Request>的形式,这样就可以将Request请求发往各个Node了。这里的Request是指Kafka的各种协议请求,对于消息发送而言就是具体的ProduceRequest。
请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<NodeId, Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String类型,表示节点的id编号)。通过max.in.flight.requests.per.connection参数可限制每个连接(也就是客户端与每个Node之间的连接)最多缓存的请求数,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多请求了,除非有缓存的请求已经收到了响应(Response)。
如果响应成功,则会清理InFlightRequests中的请求,以及RecordAAccumulater中对应分区中的数据;
如果响应失败,则会进行重试,重试次数可通过retries参数进行设置,默认为int类型的最大值。
我们发送消息通常只指定了topic,那么生产者客户端如何知道要发往哪个broker节点呢?这就需要元数据
元数据是指kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR,ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
元数据的更新(二者满足其一即可触发更新):
当需要更新元数据时,会先挑选出latestLoadedNode(即InFlightRequests中还未确认的请求个数最小的Node),然后向这个Node发送MeteDataRequest请求来获取具体的元数据信息。这个更新操作由Sender线程发起,在创建完MeteDataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时类似。元数据由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步由synchronized和final关键字来保障。
参数名称 | 说明 |
---|---|
bootstrap.servers | 生 产 者 连 接 集 群 所 需 的 broker 地 址 清 单 。 例如ip:port,ip1:port,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者可以从给定的 broker里查找到其他 broker 信息。 |
key.serializer 和 value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名 |
buffer.memory | RecordAccumulator 缓冲区总大小, 默认 32m。 |
batch.size | 缓冲区一批数据最大值, 默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加 |
linger.ms | 如果数据迟迟未达到 batch.size, sender 等待 linger.time之后就会发送数据。单位 ms, 默认值是 0ms,表示没有延迟。 生产环境建议该值大小为 5-100ms 之间。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据, Leader 收到数据后应答。 -1(all):生产者发送过来的数据, Leader+和 isr 队列里面的所有节点收齐数据后应答。-1 和all 是等价的。 Kafka3.0中默认值是-1,之前版本默认是1。 |
max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数, 默认为 5,开启幂等性要保证该值是 1-5 的数字 |
retries | 当消息发送出现错误的时候,系统会重发消息。 retries表示重试次数。 默认是 int 最大值, 2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms。 |
enable.idempotence | 是否开启幂等性, 默认 true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。 默认是 none,也就是不压缩。支持压缩类型: none、 gzip、 snappy、 lz4 和 zstd。 |
分区原则
发送的数据要封装成一个ProduceRecord对象,该对象中有partition、key、value等属性
源码
//这个方法是默认的分区策略类里的,能进到这个方法,说明肯定没有指定partition public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //获取当前topic的partition数目 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); //没有指定key if (keyBytes == null) { int nextValue = this.nextValue(topic); //当前topic存活的partition数 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { //从存活的partition里选取一个partition返回 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return ((PartitionInfo)availablePartitions.get(part)).partition(); } else { //选取一个partition返回 return Utils.toPositive(nextValue) % numPartitions; } } else { //有key值,将key的hash值与当前topic的partition总数进行取余得到partition值 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
分区原则
源码
DefaultPartitioner:
/** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size()); } /** * Compute the partition for the given record. * * @param topic The topic name * @param numPartitions The number of partitions of the given {@code topic} * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) { if (keyBytes == null) { //没有指定key,采用粘性分区策略 return stickyPartitionCache.partition(topic, cluster); } // hash the keyBytes to choose a partition //指定了key,使用key的哈希值与【分区总数】进行求模 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; }
StickyPartitionCache:
/** * An internal class that implements a cache used for sticky partitioning behavior. The cache tracks the current sticky * partition for any given topic. This class should not be used externally. */ public class StickyPartitionCache { private final ConcurrentMap<String, Integer> indexCache; public StickyPartitionCache() { this.indexCache = new ConcurrentHashMap<>(); } public int partition(String topic, Cluster cluster) { //尽可能使用上一个分区(所以叫黏性分区策略) Integer part = indexCache.get(topic); if (part == null) { //没办法了(该分区batch已满或已完成),找下一个分区 return nextPartition(topic, cluster, -1); } return part; } public int nextPartition(String topic, Cluster cluster, int prevPartition) { //当前topic的分区数 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); Integer oldPart = indexCache.get(topic); Integer newPart = oldPart; // Check that the current sticky partition for the topic is either not set or that the partition that // triggered the new batch matches the sticky partition that needs to be changed. if (oldPart == null || oldPart == prevPartition) { //当前topic可用分区数 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() < 1) { //没有可用分区,从所有分区里随机选一个算了 Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt()); newPart = random % partitions.size(); } else if (availablePartitions.size() == 1) { //就一个可用的,别无选择 newPart = availablePartitions.get(0).partition(); } else { //有多个可用的,这就得挑三拣四一下了 while (newPart == null || newPart.equals(oldPart)) { //新分区不能和上一个分区一样,若是一样,就继续选!就是这么倔 int random = Utils.toPositive(ThreadLocalRandom.current().nextInt()); //现在有多个可用分区,当然是从可用分区里选 newPart = availablePartitions.get(random % availablePartitions.size()).partition(); } } // Only change the sticky partition if it is null or prevPartition matches the current sticky partition. if (oldPart == null) { indexCache.putIfAbsent(topic, newPart); } else { indexCache.replace(topic, prevPartition, newPart); } return indexCache.get(topic); } return indexCache.get(topic); } }
为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。因此,kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡
acks参数配置:
图示
ISR
老版本中,follower进入ISR有两个条件:
0.9版本移除了条数限制,原因:
生产者一般都是批量发送数据,假设条数阈值为10条,但生产者一次就发来了12条,这时leader中比所有follower都多12条数据,所有follower都会被移除ISR,但很快一些follower同步完成,又会把他们移入ISR,ISR存在内存中,这就会导致频繁操作内存。而且kafka会将ISR信息写入zookeeper,这也会导致kafka频繁请求zookeeper。
开启参数 enable.idempotence ,默认为 true
// 1 初始化事务 void initTransactions(); // 2 开启事务 void beginTransaction() throws ProducerFencedException; // 3 在事务内提交已经消费的偏移量(主要用于消费者) void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws ProducerFencedException; // 4 提交事务 void commitTransaction() throws ProducerFencedException; // 5 放弃事务(类似于回滚事务的操作) void abortTransaction() throws ProducerFencedException;
如果未开启幂等性,且max.in.flight.requests.per.connection大于1的话,可能会出现:其中某条消息发送失败,在重试时,该消息后面的消息发送成功,导致乱序。
单分区内有序条件:
参数名称 | 说明 |
---|---|
replica.lag.time.max.ms | ISR 中, 如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值, 默认 30s。 |
auto.leader.rebalance.enable | 默认是 true。 自动 Leader Partition 平衡。 |
leader.imbalance.per.broker.percentage | 默认是 10%。 每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡 |
leader.imbalance.check.interval.seconds | 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。 |
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小, 默认值 1G。 |
log.index.interval.bytes | 默认 4kb, kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 |
log.retention.hours | Kafka 中数据保存的时间, 默认 7 天。 |
log.retention.minutes | Kafka 中数据保存的时间, 分钟级别,默认关闭。 |
log.retention.ms | Kafka 中数据保存的时间, 毫秒级别,默认关闭。 |
log.retention.check.interval.ms | 检查数据是否保存超时的间隔, 默认是 5 分钟 |
log.retention.bytes | 默认等于-1,表示无穷大。 超过设置的所有日志总大小,删除最早的 segment。 |
log.cleanup.policy | 默认是 delete,表示所有数据启用删除策略; 如果设置值为 compact,表示所有数据启用压缩策略 |
num.io.threads | 默认是 8。 负责写磁盘的线程数。整个参数值要占总核数的 50%。 |
num.replica.fetchers | 副本拉取线程数,这个参数占总核数的 50%的 1/3 |
num.network.threads | 默认是 3。 数据传输线程数,这个参数占总核数的50%的 2/3 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是 long 的最大值, 9223372036854775807。一般不建议修改,交给系统自己管理。 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理 |
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。
Controller 的信息同步工作是依赖于 Zookeeper 的。
注:
这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。数据不丢失或不重复还得看ack、幂等性和事务
注:index文件中存储的是相对offset,绝对offset那一列只是为了方便看图加上的
.index文件中存储:
.log文件中存储:
注:.indx文件中存的内容是固定的,就是存消息偏移量、存储地址偏移量、消息大小等信息,所以在根据消息偏移量找对应消息时,可以直接采用 消息偏移量 * .index中单个内容的大小,快速找到要读取的消息信息地址,加快查询速度
参数名称 | 说明 |
---|---|
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小, 默认值 1G。 |
log.index.interval.bytes | 默认 4kb, kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 稀疏索引 |
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间:
日志超时后,Kafka提供的清理策略:
Kafka能够高效读写的原因
Kafka本身是分布式集群,可以采用分区技术,并行度高
读数据采用稀疏索引,可以快速定位要消费的数据
顺序写磁盘
页缓存 + 零拷贝技术
相关参数
参数名称 | 说明 |
---|---|
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。 一般不建议修改,交给系统自己管理。 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。 |
Consumer采用pull(拉)的方式从Broker中读取数据
kafka中没有数据时,pull模式可能会使消费者陷入空转。针对这一点,kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时间即为timeout
一个消费者可以订阅一个或多个主题
subscribe
定义
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) public void subscribe(Collection<String> topics) public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) public void subscribe(Pattern pattern)
对于该方法,可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题
如果前后两次订阅了不同的主题,那么消费者以最后一次的为准
如果采用正则表达式的方式订阅,在之后的过程中,如果有人又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息
assign
定义
public void assign(Collection<TopicPartition> partitions)
可直接订阅某些主题的特定分区
只有一个参数,用来指定订阅的分区集合
TopicPartition类
public class TopicPartition { private final int partition; private final String topic; //构造器、hashCode... }
注:
通过subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费者组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。assign()方法订阅时无该功能。
KafkaConsumer中有方法:
public List<PartitionInfo> partitionsFor(String topic)
PartitionInfo:
public class PartitionInfo { private final String topic; private final int partition; private final Node leader; // AR集合 private final Node[] replicas; //ISR集合 private final Node[] inSyncReplicas; //OSR集合 private final Node[] offlineReplicas; }
public void unsubscribe()
该方法可以取消通过subscribe(Collection)方式实现的订阅,也可以取消通过subscribe(Pattern)方式实现的订阅,还可以取消通过assign(Collection)方式实现的订阅。
注:如果将subscribe(Collection)或assign(Collection)中的集合参数设置为空集合,那么作用等同于unsubscribe()方法
订阅状态:
这些状态是互斥的,一个消费者只能出现其中一种,否则会报出IllegalStateException
public ConsumerRecords<K, V> poll(final Duration timeout)
timeout方法用来控制poll()方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发生阻塞
timeout的设置取决于应用程序对响应速度的要求,比如需要在多长时间内将控制权移交给执行轮询的应用线程。可以直接将timeout设置为0,这样poll()方法会立刻返回,而不管是否已经拉取到了消息。如果应用线程唯一的工作就是从Kafka中拉取并消费消息,则可以将这个参数设置为最大值Long.MAX_VALUE
poll()方法返回值类型是ConsumerRecords,表示一次拉取操作所获得的消息集,内部包含了若干ConsumerRecord,它提供了iterator()方法来遍历消息集内部的消息:
public Iterator<ConsumerRecord<K, V>> iterator()
参数名称 | 说明 |
---|---|
bootstrap.servers | 向 Kafka 集群建立初始连接用到的 host/port 列表。 |
key.deserializer 和value.deserializer | 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名 |
group.id | 标记消费者所属的消费者组 |
enable.auto.commit | 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率, 默认 5s。 |
auto.offset.reset | 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest: 默认, 自动重置偏移量为最新的偏移量。 none:如果消费组原来的( previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
offsets.topic.num.partitions | __consumer_offsets 的分区数, 默认是 50 个分区。 |
heartbeat.interval.ms | Kafka 消费者和 coordinator 之间的心跳时间, 默认 3s。该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3。 |
session.timeout.ms | Kafka 消费者和 coordinator 之间连接超时时间, 默认 45s。超过该值,该消费者被移除,消费者组执行再平衡 |
max.poll.interval.ms | 消费者处理消息的最大时长, 默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡 |
fetch.min.bytes | 默认 1 个字节。消费者获取服务器端一批消息最小的字节数 |
fetch.max.wait.ms | 默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据 |
fetch.max.bytes | 默认 Default: 52428800( 50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes ( brokerconfig) or max.message.bytes (topic config) 影响 |
max.poll.records | 一次 poll 拉取数据返回消息的最大条数, 默认是 500 条 |
一个Consumer Group中有多个Consumer,一个Topic有多个Partition,所以必然会涉及到Partition的分配问题,即确定哪个Partition由哪个Consumer来消费
kafka分配策略:
可通过partition.assignment.strategy参数来设置分区策略,默认是Range + CooperativeSticky.(Kafka可同时使用多个分区策略)
当消费者组中消费者个数发生变更时,就会触发重新分配。即使消费者数目增加到大于分区数,也会重新分配
粘性分区定义: 可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略, 首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化
由于Consumer在消费过程中可能宕机,Consumer恢复后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费到了哪个offset,以便故障后能够继续消费。
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。 key 是 group.id+topic+分区号, value 就是当前 offset 的值。 每隔一段时间, kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。
参数auto.offset.reset = earliest | latest | none
默认是 latest
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量
时(例如该数据已被删除),该怎么办?
kafkaConsumer.seek(TopicPartition, 指定消费的offset);
注意:每次执行完,需要修改消费者组名;
重复消费: 已经消费了数据,但是 offset 没提交。
漏消费: 先提交 offset 后消费,有可能会造成数据的漏消费。
需要用到的类:
/** * 简单发送 */ @Test public void testProducer() { KafkaProducer<String, String> producer = getKafkaProducer(); // 异步发送 producer.send(new ProducerRecord<>("first", "test", "hello kafka")); //同步发送 producer.send(new ProducerRecord<>("first", "test", "hello kafka")).get(); //关闭资源 producer.close(); } /** * 发送后触发回调函数 */ @Test public void testProducerWithCallBack() { KafkaProducer<String, String> producer = getKafkaProducer(); producer.send(new ProducerRecord<>("first", "test", "hello kafka"), new Callback() { //回调方法,会在producer收到ack时触发 @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (Objects.isNull(e)) { //没有异常 System.out.println(recordMetadata.toString()); } else { e.printStackTrace(); } } }); //关闭资源 producer.close(); } private KafkaProducer<String, String> getKafkaProducer() { //配置 Properties properties = new Properties(); properties.put("bootstrap.servers","shangxiaoying.cn:9092"); properties.put("acks", "all"); //配置的key可以在ProducerConfig中找到 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //这里指定要使用的分区策略 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"cn.shangxiaoying.kafka.partitioner.MyPartitioner"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); return producer; }
public class CustomProducerTransactions { public static void main(String[] args) throws InterruptedException { // 1. 创建 kafka 生产者的配置对象 Properties properties = new Properties(); // 2. 给 kafka 配置对象添加配置信息 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); // key,value 序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置事务 id(必须),事务 id 任意起名 properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0"); // 3. 创建 kafka 生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 初始化事务 kafkaProducer.initTransactions(); // 开启事务 kafkaProducer.beginTransaction(); try { // 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { // 发送消息 kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i)); } // int i = 1 / 0; // 提交事务 kafkaProducer.commitTransaction(); } catch (Exception e) { // 终止事务 kafkaProducer.abortTransaction(); } finally { // 5. 关闭资源 kafkaProducer.close(); } } }
编写类实现Partitioner接口,重写partition()方法
public class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 自定义选择分区的逻辑 return 0; } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } }
在生产者配置中指定分区策略
private KafkaProducer<String, String> getKafkaProducer() { //配置 ... properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //这里指定要使用的分区策略 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"cn.shangxiaoying.kafka.partitioner.MyPartitioner"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); return producer; } //发送消息 ...
Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。 由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。 所以 offset 的维护是 Consumer 消费数据时必须考虑的问题。
需要用到的类:
为了使我们能够专注于自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。 自动提交 offset 的相关参数:
@Test public void testConsumer() { KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer(); //订阅topic,可以订阅多个 kafkaConsumer.subscribe(Collections.singletonList("first")); while (true) { //拉取消息,参数为没有消息时的等待时间 ConsumerRecords<String, String> records = kafkaConsumer.poll(1000); for (ConsumerRecord<String, String> record : records) { System.out.println(record.toString()); } } } private KafkaConsumer<String, String> getKafkaConsumer() { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "shangxiaoying.cn:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_01"); //开启自动提交(这里提交是指提交offset) properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); //这里是反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties); return kafkaConsumer; }
虽然自动提交 offset 十分简洁便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。
因此 Kafka 还提供了手动提交 offset 的 API。 手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步 提交)。两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是, commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。
@Test public void testConsumer() { KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer(); //订阅topic,可以订阅多个 kafkaConsumer.subscribe(Collections.singletonList("first")); while (true) { //拉取消息,参数为没有消息时的等待时间 ConsumerRecords<String, String> records = kafkaConsumer.poll(1000); for (ConsumerRecord<String, String> record : records) { System.out.println(record.toString()); } //手动提交,不要忘记关掉自动提交 //同步提交,当前线程会阻塞直到 offset 提交成功 // kafkaConsumer.commitSync(); //异步提交 kafkaConsumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { //业务逻辑 } }); } } private KafkaConsumer<String, String> getKafkaConsumer() { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "shangxiaoying.cn:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_01"); //关闭自动提交(这里提交是指提交offset) properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //这里是反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties); return kafkaConsumer; }
数据漏消费和重复消费:
无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。
要解决上述问题,关键是要保证offset存储和数据处理的一致性。我们可以将offset存入MySQL,使得消息处理和offset存储在同一个事务中,从而保证一致性。
但是:
offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalance。 当有新的消费者加入消费者组、已有的消费者退出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。 消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。
如何感知到是否发生Rebalance?
思路:
实现:
private static Map<TopicPartition, Long> currentOffset = new HashMap<>(); /** * 自定义存储offset */ @Test public void testCustomOffset() { KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer(); //订阅topic,ConsumerRebalanceListener帮助我们实现自定义存储offset kafkaConsumer.subscribe(Collections.singletonList("first"), new ConsumerRebalanceListener() { //该方法会在 Rebalance 之前调用 @Override public void onPartitionsRevoked(Collection<TopicPartition> collection) { //Rebalance之前,将每个分区的最新offset提交,这里可以自定义提交至MySQL commitOffset(currentOffset); } //该方法会在 Rebalance 之后调用 @Override public void onPartitionsAssigned(Collection<TopicPartition> collection) { currentOffset.clear(); for (TopicPartition partition : collection) { kafkaConsumer.seek(partition, getOffset(partition));//定位到最近提交的 offset 位置继续消费 } } }); while (true) { //最佳实践:将数据处理和offset保存放在一个事务中,从而保证数据不会丢失或重复 ConsumerRecords<String, String> records = kafkaConsumer.poll(100);//消费者拉取数据 for (ConsumerRecord<String, String> record : records) { System.out.printf(record.toString()); //维护currentOffset currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset()); } commitOffset(currentOffset); } } //获取某分区的最新 offset private static long getOffset(TopicPartition partition) { //业务逻辑,比如去MySQL中查询最新offset return 0; } //提交该消费者所有分区的 offset private static void commitOffset(Map<TopicPartition, Long> currentOffset) { //保存offset }
Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。
对于 producer而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor 按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;
第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。
时间戳拦截器
public class TimeInterceptor implements ProducerInterceptor { @Override public ProducerRecord onSend(ProducerRecord producerRecord) { return new ProducerRecord(producerRecord.topic(), producerRecord.partition(), producerRecord.timestamp(), producerRecord.key(), System.currentTimeMillis() + "-" + producerRecord.value()); } @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } }
计数拦截器
public class CountInterceptor implements ProducerInterceptor { private Integer successCount = 0; private Integer errorCount = 0; @Override public ProducerRecord onSend(ProducerRecord producerRecord) { return producerRecord; } @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { if (Objects.isNull(e)) { successCount ++; } else { errorCount ++; } } @Override public void close() { System.out.println("success: " + successCount + ", error: " + errorCount); } @Override public void configure(Map<String, ?> map) { } }
使用拦截器
@Test public void testInterceptorChain() { //配置 Properties properties = new Properties(); properties.put("bootstrap.servers","shangxiaoying.cn:9092"); properties.put("aacks", "all"); //配置的key可以在ProducerConfig中找到 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //这里指定要使用的分区策略 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"cn.shangxiaoying.kafka.partitioner.MyPartitioner"); //这里可以指定拦截器 List<String> interceptorList = new ArrayList<>(); interceptorList.add("cn.shangxiaoying.kafka.interceptors.TimeInterceptor"); interceptorList.add("cn.shangxiaoying.kafka.interceptors.CountInterceptor"); properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorList); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); producer.send(new ProducerRecord<>("first", 0, "testInterceptor", "hello,Interceptor")); //该close方法也会调用拦截器的close producer.close(); }
application.properties
# 应用名称 spring.application.name=kafka-learn # 指定 kafka 的地址 spring.kafka.bootstrapservers=ip1:port, ip2:port #指定 key 和 value 的序列化器 spring.kafka.producer.keyserializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.valueserializer=org.apache.kafka.common.serialization.StringSerializer
@RestController public class ProducerController { // Kafka 模板用来向 kafka 发送数据 @Autowired KafkaTemplate<String, String> kafka; @RequestMapping("/send") public String send(String msg) { kafka.send("first", msg); return "ok"; } }
application.properties
# 指定 kafka 的地址 spring.kafka.bootstrapservers=ip1:port, ip2:port # 指定 key 和 value 的反序列化器 spring.kafka.consumer.keydeserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.valuedeserializer=org.apache.kafka.common.serialization.StringDeserializer #指定消费者组的 group_id spring.kafka.consumer.group-id=kafka-test
@Configuration public class KafkaConsumer { // 指定要监听的 topic @KafkaListener(topics = "first") public void consumeTopic(String msg) { // 参数: 收到的 value System.out.println("收到的信息: " + msg); } }