日志分段文件切分包含以下4个条件,满足其一即可:
log.segment.bytes
配置的值。log.segment.bytes
参数的默认值为 1073741824
,即1GBlog.roll.ms
或log.roll.hours
参数配置的值。如果同时配置了log.roll.ms
和log.roll.hours
参数,那么log.roll.ms
的优先级高,默认情况下,只配置了log.roll.hours
参数,其值为168,即7天。log.index.size.max.bytes
配置的值。log.index.size .max.bytes
的默认值为10485760
,即10MBInteger.MAX_VALUE
, 即要追加的消息的偏移量不能转变为相对偏移量(offset - baseOffset > Integer.MAX_VALUE)。Controller作为Kafka集群中的核心组件,它的主要作用是在Apache ZooKeeper的帮助下管理和协调整个Kafka集群。
Controller与Zookeeper进行交互,获取与更新集群中的元数据信息。其他broker并不直接与zookeeper进行通信,而是与Controller进行通信并同步Controller中的元数据信息。
Kafka集群中每个节点都可以充当Controller节点,但集群中同时只能有一个Controller节点。
Controller简单来说,就是kafka集群的状态管理者
controller竞选机制:简单说,先来先上!
Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。Kafka 当前选举控制器的规则是:第一个成功创建 /controller 节点的 Broker 会被指定为控制器。
在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责维护整个集群中所有分区和副本的状态及分区leader的选举。
当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:
{"version":1,"brokerid":0,"timestamp":"1529210278988"}
其中version在目前版本中固定为1,brokerid表示成为控制器的broker的id编号,timestamp表示竞选成为控制器时的时间戳。
在任意时刻,集群中有且仅有一个控制器。每个broker启动的时候会去尝试去读取zookeeper上的/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所以当前broker就会放弃竞选;如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller这个节点,当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker则表示竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。
对Zookeeper中的/admin/reassign_partitions节点注册PartitionReassignmentListener,用来处理分区重分配的动作。 对Zookeeper中的/isr_change_notification节点注册IsrChangeNotificetionListener,用来处理ISR集合变更的动作。 对Zookeeper中的/admin/preferred-replica-election节点添加PreferredReplicaElectionListener,用来处理优先副本选举。
对Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减的变化; 对Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作
对Zookeeper中的/brokers/ids/节点添加BrokerChangeListener,用来处理broker增减的变化
从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。 对各topic所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的分区分配变化。并将最新信息同步给其他所有broker。
客户端请求创建一个topic时,每一个分区副本在broker上的分配,是由集群controller来决定;
结论:里面会创建出来两个随机数
第一个随机数确定0号分区leader的位置,往后1号分区2号分区的leader依次往后顺延1
第二个随机数确定每个分区的第一个副本的位置 在leader所在机器上往后顺延(随机数+1)台机器,该台机器就是第一个副本的位置,剩余副本依次往后顺延1
// 举例: // broker_id = 0~19 一共20台机器 // 分区数20,副本数10 // 第一个随机数:19 // 第二个随机数:0 (0,ArrayBuffer(19, 0, 1, 2, 3, 4, 5, 6, 7, 8)) (1,ArrayBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) (2,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) (3,ArrayBuffer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)) (4,ArrayBuffer(3, 4, 5, 6, 7, 8, 9, 10, 11, 12)) (5,ArrayBuffer(4, 5, 6, 7, 8, 9, 10, 11, 12, 13)) (6,ArrayBuffer(5, 6, 7, 8, 9, 10, 11, 12, 13, 14)) (7,ArrayBuffer(6, 7, 8, 9, 10, 11, 12, 13, 14, 15)) (8,ArrayBuffer(7, 8, 9, 10, 11, 12, 13, 14, 15, 16)) (9,ArrayBuffer(8, 9, 10, 11, 12, 13, 14, 15, 16, 17)) (10,ArrayBuffer(9, 10, 11, 12, 13, 14, 15, 16, 17, 18)) (11,ArrayBuffer(10, 11, 12, 13, 14, 15, 16, 17, 18, 19)) (12,ArrayBuffer(11, 12, 13, 14, 15, 16, 17, 18, 19, 0)) (13,ArrayBuffer(12, 13, 14, 15, 16, 17, 18, 19, 0, 1)) (14,ArrayBuffer(13, 14, 15, 16, 17, 18, 19, 0, 1, 2)) (15,ArrayBuffer(14, 15, 16, 17, 18, 19, 0, 1, 2, 3)) (16,ArrayBuffer(15, 16, 17, 18, 19, 0, 1, 2, 3, 4)) (17,ArrayBuffer(16, 17, 18, 19, 0, 1, 2, 3, 4, 5)) (18,ArrayBuffer(17, 18, 19, 0, 1, 2, 3, 4, 5, 6)) (19,ArrayBuffer(18, 19, 0, 1, 2, 3, 4, 5, 6, 7)) // 其分布策略源码如下: private def assignReplicasToBrokersRackUnaware( nPartitions: Int, //分区的个数 10 replicationFactor: Int, //副本的个数 5 brokerList: Seq[Int],//broker的集合 8 0~7 fixedStartIndex: Int//默认值是-1 固定开始的索引位置 startPartitionId: Int): Map[Int, Seq[Int]] //默认值是-1 分区开始的位置 = { val ret = mutable.Map[Int, Seq[Int]]() val brokerArray = brokerList.toArray val startIndex = if (fixedStartIndex >= 0) { fixedStartIndex }else { rand.nextInt(brokerArray.length) } var currentPartitionId = math.max(0, startPartitionId) var nextReplicaShift = if (fixedStartIndex >= 0) { fixedStartIndex }else { rand.nextInt(brokerArray.length) } for (_ <- 0 until nPartitions) { if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0)){ nextReplicaShift += 1 } val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex)) for (j <- 0 until replicationFactor - 1) { replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length)) } ret.put(currentPartitionId, replicaBuffer) currentPartitionId += 1 } ret } private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = { val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1) (firstReplicaIndex + shift) % nBrokers }
副本因子不能大于 Broker 的个数(报错:Replication factor: 4 larger than available brokers: 3.);
partition_0的第1个副本(leader副本)放置位置是随机从 brokerList 选择的;
其他分区的第1个副本(leader)放置位置相对于paritition_0分区依次往后移(也就是如果我们有5个 Broker,5个分区,假设partition0分区放在broker4上,那么partition1将会放在broker5上;patition2将会放在broker1上;partition3在broker2,依次类);
各分区剩余的副本相对于分区前一个副本偏移随机数nextReplicaShift+1,然后后面的副本依次加1
分区 leader 副本的选举由控制器controller负责具体实施。
当创建分区(创建主题或增加分区都有创建分区的动作)或Leader下线(此时分区需要选举一个新的leader上线来对外提供服务)的时候都需要执行 leader 的选举动作。
选举策略:按照 ISR集合中副本的顺序查找第一个存活的副本,并且这个副本在 ISR 集合中
一个分区的AR集合在partition分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的 ISR 集合中副本的顺序可能会改变;
生产者工作流程图:
一个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程 。
在主线程中由kafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator, 也称为消息收集器)中。
Sender 线程负责从RecordAccumulator 获取消息并将其发送到 Kafka 中;
RecordAccumulator主要用来缓存消息以便Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory
配置,默认值为33554432B
,即32M。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer.send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms
的配置,此参数的默认值为60000
,即60秒。
主线程中发送过来的消息都会被迫加到 RecordAccumulator 的某个双端队列( Deque )中,
RecordAccumulator内部为每个分区都维护了一个双端队列,即Deque<ProducerBatch>。
消息写入缓存时,追加到双端队列的尾部;
Sender读取消息时,从双端队列的头部读取。注意:ProducerBatch 是指一个消息批次;
与此同时,会将较小的 ProducerBatch 凑成一个较大 ProducerBatch ,也可以减少网络请求的次数以提升整体的吞吐量。
ProducerBatch 大小和batch.size
参数也有着密切的关系。当一条消息(ProducerRecord ) 流入 RecordAccumulator 时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch (如果没有则新建),查看 ProducerBatch中是否还可以写入这个ProducerRecord,如果可以写入就直接写入,如果不可以则需要创建一个新的Producer Batch。在新建 ProducerBatch时评估这条消息的大小是否超过 batch.size
参数大小,如果不超过,那么就以 batch.size
参数的大小来创建 ProducerBatch。
如果生产者客户端需要向很多分区发送消息, 则可以将buffer.memory参数适当调大以增加整体的吞吐量。
Sender从 RecordAccumulator 获取缓存的消息之后,会进一步将<分区,Deque<Producer Batch>的形式转变成<Node,List<ProducerBatch>>的形式,其中Node表示Kafka集群broker节点。对于网络连接来说,生产者客户端是与具体broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
在转换成<Node, List<ProducerBatch>>的形式之后, Sender会进一步封装成<Node,Request> 的形式,这样就可以将 Request 请求发往各个Node了,这里的Request是Kafka各种协议请求;
请求在从sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<Nodeld, Deque<request>>,它的主要作用是缓存了已经发出去但还没有收到服务端响应的请求(Nodeld 是一个 String 类型,表示节点的 id 编号)。与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.request.per.connection
,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应( Response )。通过比较 Deque<Request> 的size与这个参数的大小来判断对应的 Node中是否己经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续发送请求会增大请求超时的可能。
kafka 在 producer 里面提供了消息确认机制。我们可以通过配置来决定消息发送到对应分区的几个副本才算消息发送成功。可以在构造producer 时通过acks参数指定(在 0.8.2.X 前是通过 request.required.acks 参数设置的)。这个参数支持以下三种值:
acks = 0:意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 kafka 。在这种情况下还是有可能发生错误,比如发送的对象不能被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式,大概率会丢失一些消息。
acks = 1:意味着leader 在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。在这个模式下,如果发生正常的 leader 选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送悄息,最终消息会安全到达新的 leader 那里。不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入 leader,但在消息被复制到 follower 副本之前 leader发生崩溃。
acks = all(这个和 request.required.acks = -1 含义一样):意味着 leader 在返回确认或错误响应之前,会等待所有同步副本都收到悄息。如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息,生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,因为生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。
acks | 含义 |
---|---|
0 | Producer往集群发送数据不需要等到集群的确认信息,不确保消息发送成功。安全性最低但是效率最高。 |
1 | Producer往集群发送数据只要 leader成功写入消息就可以发送下一条,只确保Leader 接收成功。 |
-1 或 all | Producer往集群发送数据需要所有的ISR Follower 都完成从 Leader 的同步才会发送下一条,确保 Leader发送成功和所有的副本都成功接收。安全性最高,但是效率最低。 |
生产者将acks设置为all,是否就一定不会丢数据呢?
否!如果在某个时刻ISR列表只剩leader自己了,那么就算acks=all,收到这条数据还是只有一个点;
可以配合另外一个参数缓解此情况: 最小同步副本数>=2
BROKER端参数: min.insync.replicas(默认1)
生产者的ack=all,也不能完全保证数据发送的100%可靠性
为什么?因为,如果服务端目标partition的同步副本只有leader自己了,此时,它收到数据就会给生产者反馈成功!
可以修改服务端的一个参数(分区最小ISR数[min.insync.replicas]>=2),来避免此问题;
acks是控制kafka服务端向生产者应答消息写入成功的条件;生产者根据得到的确认信息,来判断消息发送是否成功;
这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B ,即 lMB
一般情况下,这个默认值就可以满足大多数的应用场景了。
这个参数还涉及一些其它参数的联动,比如 broker 端(topic级别参数)的 message.max.bytes参数(默认1000012),如果配置错误可能会引起一些不必要的异常;比如将 broker 端的 message.max.bytes 参数配置为10B ,而 max.request.size参数配置为20B,那么当发送一条大小为 15B 的消息时,生产者客户端就会报出异常;
retries参数用来配置生产者重试的次数,默认值为2147483647,即在发生异常的时候不进行任何重试动作。
消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、 leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试 。如果将 retries参数配置为非零值,并且 max .in.flight.requests.per.connection 参数配置为大于1的值,那可能会出现错序的现象:如果批次1消息写入失败,而批次2消息写入成功,那么生产者会重试发送批次1的消息,此时如果批次1的消息写入成功,那么这两个批次的消息就出现了错序。
对于某些应用来说,顺序性非常重要 ,比如MySQL binlog的传输,如果出现错误就会造成非常严重的后果;一般而言,在需要保证消息顺序的场合建议把参数max.in.flight.requests.per.connection 配置为1 ,而不是把retries配置为0,不过这样也会影响整体的吞吐。
这个参数用来指定消息的压缩方式,默认值为“none",即默认情况下,消息不会被压缩。该参数还可以配置为 "gzip","snappy" 和 "lz4"。对消息进行压缩可以极大地减少网络传输、降低网络I/O,从而提高整体的性能 。消息压缩是一种以时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩;
每个Batch要存放batch.size大小的数据后,才可以发送出去。比如说batch.size默认值是16KB,那么里面凑够16KB的数据才会发送。理论上来说,提升batch.size的大小,可以允许更多的数据缓冲在recordAccumulator里面,那么一次Request发送出去的数据量就更多了,这样吞吐量可能会有所提升。但是batch.size也不能过大,要是数据老是缓冲在Batch里迟迟不发送出去,那么发送消息的延迟就会很高。一般可以尝试把这个参数调节大些,利用生产环境发消息负载测试一下。
这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息( ProducerRecord )加入
ProducerBatch 时间,默认值为0。生产者客户端会在ProducerBatch填满或等待时间超过linger.ms 值时发送出去。
增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。
幂等性,就是一个操作重复做,也不会影响最终的结果!
int a = 1;
a++; // 非幂等操作
val map = new HashMap()
map.put(“a”,1); // 幂等操作
在kafka中,同一条消息,生产者如果多次重试发送,在服务器中的结果如果还是只有一条,这就是具备幂等性;否则,就不具备幂等性!
用来指定分区器,默认:org.apache.kafka.internals.DefaultPartitioner
默认分区器的分区规则:
自定义partitioner需要实现org.apache.kafka.clients.producer.Partitioner接口
会触发rebalance(消费者)的事件可能是如下任意一种:
将分区的消费权从一个消费者移到另一个消费者称为再均衡(rebalance),如何rebalance也涉及到分区分配策略。
kafka有两种的分区分配策略:range(默认) 和 roundrobin(新版本中又新增了另外2种)
我们可以通过partition.assignment.strategy
参数选择 range 或 roundrobin。partition.assignment.strategy
参数默认的值是range。
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor
例1:
假设有TOPIC_A有5个分区,由3个consumer(C1,C2,C3)来消费;5/3得到商1,余2,则每个消费者至少分1个分区,前两个消费者各多1个分区C1: 2个分区,C2:2个分区,C3:1个分区
接下来,就按照“区间”进行分配:
TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 TOPIC_A_3 TOPIC_A-4
C1: TOPIC_A-0, TOPIC_A-1
C2 : TOPIC_A-2, TOPIC_A_3
C3: TOPIC_A-4
例2:
假设TOPIC_A有5个分区,TOPIC_B有3个分区,由2个consumer(C1,C2)来消费
5/2得到商2,余1,则C1有3个分区,C2有2个分区,得到结果
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2
C2: TOPIC_A-3 TOPIC_A-4
3/2得到商1,余1,则C1有2个分区,C2有1个分区,得到结果
C1: TOPIC_B-0 TOPIC_B-1
C2: TOPIC_B-2
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-0 TOPIC_B-1
C2: TOPIC_A-3 TOPIC_A-4 TOPIC_B-2
以上述“例2”来举例:
TOPIC_A-0 TOPIC_B-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-1 TOPIC_A-3 TOPIC_A-4 TOPIC_B-2
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_B-1
C2: TOPIC_B-0 TOPIC_A-2 TOPIC_A-3
C3 TOPIC_A-4
对应的类叫做: org.apache.kafka.clients.consumer.StickyAssignor
sticky策略的特点:
再均衡的过程中,还是会让各消费者先取消自身的分区,然后再重新分配(只不过是分配过程中会尽量让原来属于谁的分区依然分配给谁)
对应的类叫做: org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
sticky策略的特点:
消费组在消费数据的时候,有两个角色进行组内的各事务的协调;
角色1: Group Coordinator (组协调器) 位于服务端(就是某个broker)
组协调器的定位:
coordinator在我们组记偏移量的__consumer_offsets分区的leader所在broker上 查找Group Coordinator的方式: 先根据消费组groupid的hashcode值计算它应该所在__consumer_offsets 中的分区编号; 分区数 Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount groupMetadataTopicPartitionCount为__consumer_offsets的分区总数,这个可以通过broker端参数offset.topic.num.partitions来配置,默认值是50; 找到对应的分区号后,再寻找此分区leader副本所在broker节点,则此节点即为自己的Grouping Coordinator;
角色2: Group Leader (组长) 位于消费端(就是消费组中的某个消费者)
组长的定位:随机选的哦!!!
每个消费组在服务端对应一个GroupCoordinator其进行管理,GroupCoordinator是Kafka服务端中用于管理消费组的组件。
消费者客户端中由ConsumerCoordinator组件负责与GroupCoordinator行交互;
ConsumerCoordinator和GroupCoordinator最重要的职责就是负责执行消费者rebalance操作
如果想控制消费者在发生再均衡时执行一些特定的工作,可以通过订阅主题时注册“再均衡监听器”来实现;
场景举例:在发生再均衡时,处理消费位移
如果A消费者消费掉的一批消息还没来得及提交offset,而它所负责的分区在rebalance中转移给了B消费者,则有可能发生数据的重复消费处理。此情形下,可以通过再均衡监听器做一定程度的补救;
代码示例:
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Optional; import java.util.Properties; /** * 消费组再均衡观察 */ public class ConsumerDemo2 { public static void main(String[] args) { //1.创建kafka的消费者对象,附带着把配置文件搞定 Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"g01"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //2.订阅主题(确定需要消费哪一个或者多个主题) //我现在想看看如果我的消费者组里面,多了一个消费者或者少了一个消费者,他有没有给我做再均衡 consumer.subscribe(Arrays.asList("reb-1", "reb-2"), new ConsumerRebalanceListener() { /** * 这个方法是将原来的分配情况全部取消,或者说把所有的分区全部回收了 * 这个全部取消很恶心,原来的消费者消费的好好的,他一下子就给他全部停掉了 * @param collection */ @Override public void onPartitionsRevoked(Collection<TopicPartition> collection) { System.out.println("我原来的均衡情况是:"+collection + "我已经被回收了!!"); } /** * 这个方法是当上面的分配情况全部取消以后,调用这个方法,来再次分配,这是在均衡分配后的情况 * @param collection */ @Override public void onPartitionsAssigned(Collection<TopicPartition> collection) { System.out.println("我是重新分配后的结果:"+collection); } }); while (true){ consumer.poll(Duration.ofMillis(Integer.MAX_VALUE)); } } }
CAP理论作为分布式系统的基础理论,它描述的是一个分布式系统在以下三个特性中:
最多满足其中的两个特性。也就是下图所描述的。分布式系统要么满足CA,要么CP,要么AP。无法同时满足CAP。
分区容错性:指的分布式系统中的某个节点或者网络分区出现了故障的时候,整个系统仍然能对外提供满足一致性和可用性的服务。也就是说部分故障不影响整体使用。事实上我们在设计分布式系统时都会考虑到bug,硬件,网络等各种原因造成的故障,所以即使部分节点或者网络出现故障,我们要求整个系统还是要继续使用的(不继续使用,相当于只有一个分区,那么也就没有后续的一致性和可用性了)
可用性:一直可以正常的做读写操作。简单而言就是客户端一直可以正常访问并得到系统的正常响应。用户角度来看就是不会出现系统操作失败或者访问超时等问题。
一致性:在分布式系统完成某写操作后任何读操作,都应该获取到该写操作写入的那个最新的值。相当于要求分布式系统中的各节点时时刻刻保持数据的一致性。
Kafka 作为一个商业级消息中间件,数据可靠性和可用性是优先考虑的重点,兼顾数据一致性;
参考文档:https://www.cnblogs.com/lilpig/p/16840963.html
Kafka 0.11.0.0 版本开始引入了幂等性与事务这两个特性,以此来实现 EOS ( exactly once semantics ,精确一次处理语义)
生产者在进行发送失败后的重试时(retries),有可能会重复写入消息,而使用 Kafka幂等性功能之后就可以避免这种情况。
开启幂等性功能,只需要显式地将生产者参数 enable.idempotence
设置为 true (默认值为 false):
props.put("enable.idempotence",true);
在开启幂等性功能时,如下几个参数必须正确配置:
如有违反,则会抛出ConfigException异常
;
producer.send(“aaa”)
消息aaa就拥有了一个唯一的序列号, 如果这条消息发送失败,producer内部自动重试(retry),此时序列号不变;
producer.send(“bbb”)
消息bbb拥有一个新的序列号
注意:kafka只保证producer单个会话中的单个分区幂等;
主要原理:
开始事务-->发送一个ControlBatch消息(事务开始)
提交事务-->发送一个ControlBatch消息(事务提交)
放弃事务-->发送一个ControlBatch消息(事务终止)
Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"doit01:9092"); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // acks props.setProperty(ProducerConfig.ACKS_CONFIG,"-1"); // 生产者的重试次数 props.setProperty(ProducerConfig.RETRIES_CONFIG,"3"); // 飞行中的请求缓存最大数量 props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"3"); // 开启幂等性 props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true"); // 设置事务id props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"trans_001");
事务控制的代码模板
// 初始化事务 producer.initTransaction( ) // 开启事务 producer.beginTransaction( ) // 干活 // 提交事务 producer.commitTransaction( ) // 异常回滚(放弃事务) catch里面 producer.abortTransaction( )
消费者api是会拉取到尚未提交事务的数据的;只不过可以选择是否让用户看到!
是否让用户看到未提交事务的数据,可以通过消费者参数来配置:
isolation.level=read_uncommitted(默认值) isolation.level=read_committed
用户的程序,要从kafka读取源数据,数据处理的结果又要写入kafka
kafka能实现端到端的事务控制(比起上面的“基础”事务,多了一个功能,通过producer可以将consumer的消费偏移量绑定到事务上提交)
producer.sendOffsetsToTransaction(offsets,consumer_id)
为了实现事务,应用程序必须提供唯一transactional.id,并且开启生产者的幂等性
properties.put ("transactional.id","transactionid00001"); properties.put ("enable.idempotence",true);
“消费kafka-处理-生产结果到kafka”典型场景下的代码结构示例:
package com.doit.day04; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class Exercise_kafka2kafka { public static void main(String[] args) { Properties props = new Properties(); //消费者的 props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "shouwei"); //自动提交偏移量 props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //写生产者的一些属性 props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092"); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //设置ack 开启幂等性必须设置的三个参数 props.setProperty(ProducerConfig.ACKS_CONFIG,"-1"); props.setProperty(ProducerConfig.RETRIES_CONFIG,"3"); props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"3"); //开启幂等性 props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true"); //开启事务 props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"id_fro_39_19"); //消费数据 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); KafkaProducer<String, String> producer = new KafkaProducer<>(props); //初始化事务 producer.initTransactions(); //订阅主题 consumer.subscribe(Arrays.asList("eventlog")); while (true){ //拉取数据 ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE)); try { //开启事务 producer.beginTransaction(); for (ConsumerRecord<String, String> record : poll) { String value = record.value(); //将value的值写入到另外一个topic中 producer.send(new ProducerRecord<String,String>("k2k",value)); } producer.flush(); //提交偏移量 consumer.commitAsync(); //提交事务 producer.commitTransaction(); } catch (ProducerFencedException e) { //放弃事务 producer.abortTransaction(); } } } }
使用Zero-Copy (零拷贝)技术来进一步提升性能;
零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手;
零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换;对于Linux系统而言,零拷贝技术依赖于底层的 sendfile( )方法实现;对应于Java 语言,FileChannal.transferTo( )方法的底层实现就是 sendfile( )方法;