For how the Kafka reassign process works, see the separate write-up on the reassign process.
Recently one of our Kafka clusters hit a case where the reassign process got stuck. The problem unfolded as follows.
Problem log
2021-07-16 10:35:41,193 INFO kafka.controller.KafkaController: [Controller id=3] 0/2 replicas have caught up with the leader for partition kafka-9 being reassigned. Replica(s) 3,2 still need to catch up
Corresponding code logic
case class PartitionReassignmentIsrChange(partition: TopicPartition) extends ControllerEvent {
  override def state: ControllerState = ControllerState.PartitionReassignment

  override def process(): Unit = {
    if (!isActive) return
    // check if this partition is still being reassigned or not
    controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext =>
      val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
      zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
        case Some(leaderIsrAndControllerEpoch) =>
          // check if new replicas have joined ISR
          val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
          val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
          if (caughtUpReplicas == reassignedReplicas) {
            // resume the partition reassignment process
            info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
              s"partition $partition being reassigned. Resuming partition reassignment")
            onPartitionReassignment(partition, reassignedPartitionContext)
          } else {
            info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
              s"partition $partition being reassigned. Replica(s) " +
              s"${(reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")} still need to catch up")
          }
        case None =>
          error(s"Error handling reassignment of partition $partition to replicas " +
            s"${reassignedReplicas.mkString(",")} as it was never created")
      }
    }
  }
}
At this point the replicas planned by the reassignment are not in the ISR, so the controller can only wait; the problem is that it keeps waiting forever.
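Whether the planned replicas have joined the ISR can also be checked from the client side. A minimal sketch, assuming Scala 2.13 with the Kafka clients library, a broker reachable at localhost:9092, topic kafka partition 9 and target replicas 3 and 2 as in the log above (the object name, address and values are illustrative, not from the original incident):

import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import scala.jdk.CollectionConverters._

object IsrCatchUpCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumption: adjust to the real cluster
    val admin = AdminClient.create(props)
    try {
      // Describe the topic and locate partition 9 (the partition from the log above)
      val description = admin.describeTopics(Seq("kafka").asJava).all().get().get("kafka")
      description.partitions().asScala.find(_.partition() == 9).foreach { p =>
        val isr = p.isr().asScala.map(_.id()).toSet
        val targetReplicas = Set(3, 2) // replicas the controller reports as "still need to catch up"
        println(s"ISR = $isr, still catching up: ${targetReplicas -- isr}")
      }
    } finally {
      admin.close()
    }
  }
}

As long as the printed "still catching up" set is non-empty, the controller keeps logging the message shown above and the reassignment does not move forward.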
The log on the broker being waited for looks like this:
2021-07-16 10:35:41,150 WARN state.change.logger: [Broker id=3] Ignoring LeaderAndIsr request from controller 3 with correlation id 969 epoch 44 for partition kafka-9 as the local replica for the partition is in an offline log directory
Corresponding code
leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
  val partition = getOrCreatePartition(topicPartition)
  val partitionLeaderEpoch = partition.getLeaderEpoch
  if (partition eq ReplicaManager.OfflinePartition) {
    stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
      s"controller $controllerId with correlation id $correlationId " +
      s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
      "partition is in an offline log directory")
    responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
  }
Because the partition is in the offline state, the LeaderAndIsr request is not actually processed, so the replica never joins the ISR and the reassignment keeps waiting.
So why is the partition offline? Following the logs further turned up:
2021-07-16 00:25:17,836 INFO kafka.log.LogManager: Stopping serving logs in dir /data/kafka/data
At the same time there were a large number of disk-full errors:
java.io.IOException: No space left on device
Because the disk had filled up earlier, Kafka marked this log directory as offline. Every partition located in that directory is then also marked offline, and LeaderAndIsr requests for them are not processed.
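One way to confirm which log directories a broker considers offline, without logging into the machine, is the AdminClient describeLogDirs call. A rough sketch, assuming Kafka clients 2.7+ (for allDescriptions) and broker id 3 from the log above; treating a non-null error() as "directory offline" is an assumption, not something verified in the original incident:

import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import scala.jdk.CollectionConverters._

object LogDirCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumption: adjust to the real cluster
    val admin = AdminClient.create(props)
    try {
      // Ask broker 3 (the broker stuck in the log above) about the state of its log dirs
      val dirsByBroker = admin.describeLogDirs(Seq(Int.box(3)).asJava).allDescriptions().get()
      dirsByBroker.asScala.foreach { case (brokerId, dirs) =>
        dirs.asScala.foreach { case (path, desc) =>
          // assumption: a non-null error() (e.g. a KafkaStorageException) marks the dir as offline
          println(s"broker $brokerId dir $path error=${desc.error()}")
        }
      }
    } finally {
      admin.close()
    }
  }
}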
To diagnose this, search the controller log for 'still need to catch up' to see which brokers the reassignment is still waiting for, then check on those servers whether a 'Stopping serving logs' entry was written after startup. If there is one, the reassignment will stay stuck. The fix is to restart the waiting brokers, after which the reassignment can proceed. A scripted version of these two checks is sketched below.
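A minimal sketch of the two log searches, assuming the controller and broker log files are readable locally; the file paths and the object name are illustrative and depend on the log4j configuration:

import scala.io.Source

object StuckReassignScan {
  // Return the lines of a log file that contain the given marker string.
  def grep(path: String, marker: String): Seq[String] = {
    val source = Source.fromFile(path)
    try source.getLines().filter(_.contains(marker)).toList
    finally source.close()
  }

  def main(args: Array[String]): Unit = {
    // Step 1: on the controller, find partitions whose reassignment is still waiting
    grep("/var/log/kafka/controller.log", "still need to catch up").foreach(println)

    // Step 2: on each waiting broker, check whether a log dir went offline after startup
    grep("/var/log/kafka/server.log", "Stopping serving logs in dir").foreach(println)
    // If step 2 matches anything, restart that broker; the reassignment then resumes.
  }
}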