Note that this process does not guarantee data is evenly distributed across each broker's disks; manual adjustment of the reassignment plan is often needed to achieve better balance, especially when partition sizes are highly skewed. The entry point is kafka-reassign-partitions.sh, which simply delegates to ReassignPartitionsCommand:
```bash
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand "$@"
```
The main method dispatches on the parsed options: generateAssignment produces a reassignment plan, executeAssignment applies one, and verifyAssignment checks progress:
```scala
def main(args: Array[String]): Unit = {
  val opts = validateAndParseArgs(args)
  val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
  val time = Time.SYSTEM
  val zkClient = KafkaZkClient(zkConnect, JaasUtils.isZkSecurityEnabled, 30000, 30000, Int.MaxValue, time)
  val adminClientOpt = createAdminClient(opts)

  try {
    if (opts.options.has(opts.verifyOpt))
      verifyAssignment(zkClient, adminClientOpt, opts)
    else if (opts.options.has(opts.generateOpt))
      generateAssignment(zkClient, opts)
    else if (opts.options.has(opts.executeOpt))
      executeAssignment(zkClient, adminClientOpt, opts)
  } catch {
    case e: Throwable =>
      println("Partitions reassignment failed due to " + e.getMessage)
      println(Utils.stackTrace(e))
  } finally zkClient.close()
}
```
generateAssignment looks like this:
```scala
def generateAssignment(zkClient: KafkaZkClient, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String,
                       disableRackAware: Boolean): (Map[TopicPartition, Seq[Int]], Map[TopicPartition, Seq[Int]]) = {
  val topicsToReassign = parseTopicsData(topicsToMoveJsonString)
  val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
  if (duplicateTopicsToReassign.nonEmpty)
    throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".
      format(duplicateTopicsToReassign.mkString(",")))
  val currentAssignment = zkClient.getReplicaAssignmentForTopics(topicsToReassign.toSet)

  val groupedByTopic = currentAssignment.groupBy { case (tp, _) => tp.topic }
  val rackAwareMode = if (disableRackAware) RackAwareMode.Disabled else RackAwareMode.Enforced
  val adminZkClient = new AdminZkClient(zkClient)
  val brokerMetadatas = adminZkClient.getBrokerMetadatas(rackAwareMode, Some(brokerListToReassign))

  val partitionsToBeReassigned = mutable.Map[TopicPartition, Seq[Int]]()
  groupedByTopic.foreach { case (topic, assignment) =>
    val (_, replicas) = assignment.head
    val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
    partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) =>
      new TopicPartition(topic, partition) -> replicas
    }
  }
  (partitionsToBeReassigned, currentAssignment)
}
```
The heavy lifting is done by AdminUtils.assignReplicasToBrokers:
```scala
/**
 * There are 3 goals of replica assignment:
 *
 * 1. Spread the replicas evenly among brokers.
 * 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
 * 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible
 *
 * To achieve this goal for replica assignment without considering racks, we:
 * 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
 * 2. Assign the remaining replicas of each partition with an increasing shift.
 *
 * Here is an example of assigning
 * broker-0  broker-1  broker-2  broker-3  broker-4
 * p0        p1        p2        p3        p4       (1st replica)
 * p5        p6        p7        p8        p9       (1st replica)
 * p4        p0        p1        p2        p3       (2nd replica)
 * p8        p9        p5        p6        p7       (2nd replica)
 * p3        p4        p0        p1        p2       (3rd replica)
 * p7        p8        p9        p5        p6       (3rd replica)
 * ...
 */
def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                            nPartitions: Int,
                            replicationFactor: Int,
                            fixedStartIndex: Int = -1,
                            startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
  if (nPartitions <= 0)
    throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
  if (replicationFactor <= 0)
    throw new InvalidReplicationFactorException("Replication factor must be larger than 0.")
  if (replicationFactor > brokerMetadatas.size)
    throw new InvalidReplicationFactorException(s"Replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
  if (brokerMetadatas.forall(_.rack.isEmpty))
    assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
      startPartitionId)
  else {
    if (brokerMetadatas.exists(_.rack.isEmpty))
      throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
    assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
      startPartitionId)
  }
}
```
It dispatches on whether rack information is available; in most deployments the rack-unaware path, assignReplicasToBrokersRackUnaware, is taken:
```scala
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
```
From this the assignment logic can be read off:
1. Pick a start index into the broker list (random unless fixedStartIndex is given) and place the first replica of each partition round-robin from that position.
2. Place each remaining replica of a partition at an offset from the first replica, computed by replicaIndex from nextReplicaShift, so followers land on different brokers.
3. Each time the partition id wraps around the broker list (currentPartitionId % brokerArray.length == 0), increment nextReplicaShift so follower placement rotates rather than always hitting the same brokers.
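To make the layout concrete, here is a self-contained, runnable re-implementation of the rack-unaware algorithm. The replicaIndex helper mirrors the private one in AdminUtils; broker ids, partition count, and start parameters are made-up example values:

```scala
import scala.collection.mutable

object RackUnawareAssignmentDemo {
  // Same formula as AdminUtils.replicaIndex: the shift grows with the replica's
  // position and stays in 1..nBrokers-1, so a follower never collides with the leader.
  private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int,
                           replicaIndexInPartition: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + replicaIndexInPartition) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }

  def assign(brokers: Seq[Int], nPartitions: Int, replicationFactor: Int,
             startIndex: Int = 0, startShift: Int = 0): Map[Int, Seq[Int]] = {
    val brokerArray = brokers.toArray
    val ret = mutable.Map[Int, Seq[Int]]()
    var nextReplicaShift = startShift
    for (p <- 0 until nPartitions) {
      // bump the shift each time the partition id wraps around the broker list
      if (p > 0 && p % brokerArray.length == 0) nextReplicaShift += 1
      val first = (p + startIndex) % brokerArray.length
      val replicas = mutable.ArrayBuffer(brokerArray(first))
      for (j <- 0 until replicationFactor - 1)
        replicas += brokerArray(replicaIndex(first, nextReplicaShift, j, brokerArray.length))
      ret.put(p, replicas)
    }
    ret.toMap
  }

  def main(args: Array[String]): Unit =
    assign(brokers = Seq(0, 1, 2, 3, 4), nPartitions = 10, replicationFactor = 3)
      .toSeq.sortBy(_._1)
      .foreach { case (p, r) => println(s"p$p -> brokers ${r.mkString(",")}") }
}
```

With a fixed startIndex of 0 this reproduces the table from the source comment above.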
Now for executeAssignment:
```scala
def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], reassignmentJsonString: String,
                      throttle: Throttle, timeoutMs: Long = 10000L) {
  val (partitionAssignment, replicaAssignment) = parseAndValidate(zkClient, reassignmentJsonString)
  val adminZkClient = new AdminZkClient(zkClient)
  val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, adminClientOpt, partitionAssignment.toMap,
    replicaAssignment, adminZkClient)

  // If there is an existing rebalance running, attempt to change its throttle
  if (zkClient.reassignPartitionsInProgress()) {
    println("There is an existing assignment running.")
    reassignPartitionsCommand.maybeLimit(throttle)
  } else {
    printCurrentAssignment(zkClient, partitionAssignment.map(_._1.topic))
    if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0)
      println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))
    if (reassignPartitionsCommand.reassignPartitions(throttle, timeoutMs)) {
      println("Successfully started reassignment of partitions.")
    } else
      println("Failed to reassign partitions %s".format(partitionAssignment))
  }
}
```
It mainly delegates to the reassignPartitions method:
```scala
def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean = {
  maybeThrottle(throttle)
  try {
    val validPartitions = proposedPartitionAssignment.filter { case (p, _) =>
      validatePartition(zkClient, p.topic, p.partition)
    }
    if (validPartitions.isEmpty) false
    else {
      if (proposedReplicaAssignment.nonEmpty && adminClientOpt.isEmpty)
        throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign replica to the specified log directory")
      val startTimeMs = System.currentTimeMillis()

      // Send AlterReplicaLogDirsRequest to allow broker to create replica in the right log dir later
      // if the replica has not been created yet.
      if (proposedReplicaAssignment.nonEmpty)
        alterReplicaLogDirsIgnoreReplicaNotAvailable(proposedReplicaAssignment, adminClientOpt.get, timeoutMs)

      // Create reassignment znode so that controller will send LeaderAndIsrRequest to create replica in the broker
      zkClient.createPartitionReassignment(validPartitions.map({ case (key, value) =>
        (new TopicPartition(key.topic, key.partition), value)
      }).toMap)

      // Send AlterReplicaLogDirsRequest again to make sure broker will start to move replica to the specified
      // log directory. It may take some time for controller to create replica in the broker.
      // Retry if the replica has not been created.
      var remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
      val replicasAssignedToFutureDir = mutable.Set.empty[TopicPartitionReplica]
      while (remainingTimeMs > 0 && replicasAssignedToFutureDir.size < proposedReplicaAssignment.size) {
        replicasAssignedToFutureDir ++= alterReplicaLogDirsIgnoreReplicaNotAvailable(
          proposedReplicaAssignment.filterKeys(replica => !replicasAssignedToFutureDir.contains(replica)),
          adminClientOpt.get, remainingTimeMs)
        Thread.sleep(100)
        remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
      }
      replicasAssignedToFutureDir.size == proposedReplicaAssignment.size
    }
  } catch {
    case _: NodeExistsException =>
      val partitionsBeingReassigned = zkClient.getPartitionReassignment
      throw new AdminCommandFailedException("Partition reassignment currently in " +
        "progress for %s. Aborting operation".format(partitionsBeingReassigned))
  }
}
```
The key step is zkClient.createPartitionReassignment, which creates the znode /admin/reassign_partitions (seen as /kafka/admin/reassign_partitions when the cluster uses a /kafka chroot) whose value is the reassignment plan. At this point the tool's control flow ends; the rest happens in the controller. The payload written to the znode looks like the sketch below, after which we pick up the controller side, starting with initializeControllerContext.
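For reference, a minimal sketch of the JSON this znode carries, assuming the standard {"version":1,"partitions":[...]} layout and hand-rolling the encoding rather than using Kafka's internal ReassignPartitionsZNode serializer:

```scala
object ReassignJsonDemo {
  // Build the reassignment JSON for a plan keyed by (topic, partition).
  def toJson(plan: Map[(String, Int), Seq[Int]]): String = {
    val entries = plan.map { case ((topic, partition), replicas) =>
      s"""{"topic":"$topic","partition":$partition,"replicas":[${replicas.mkString(",")}]}"""
    }
    s"""{"version":1,"partitions":[${entries.mkString(",")}]}"""
  }

  def main(args: Array[String]): Unit = {
    // prints: {"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[1,2,3]}]}
    println(toJson(Map(("test-topic", 0) -> Seq(1, 2, 3))))
  }
}
```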
```scala
private def initializeControllerContext() {
  // update controller cache with delete topic information
  controllerContext.liveBrokers = zkClient.getAllBrokersInCluster.toSet
  controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
  registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
  zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet).foreach {
    case (topicPartition, assignedReplicas) =>
      controllerContext.updatePartitionReplicaAssignment(topicPartition, assignedReplicas)
  }
  controllerContext.partitionLeadershipInfo.clear()
  controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
  // register broker modifications handlers
  registerBrokerModificationsHandler(controllerContext.liveBrokers.map(_.id))
  // update the leader and isr cache for all existing partitions from Zookeeper
  updateLeaderAndIsrCache()
  // start the channel manager
  startChannelManager()
  initializePartitionReassignment()
  info(s"Currently active brokers in the cluster: ${controllerContext.liveBrokerIds}")
  info(s"Currently shutting brokers in the cluster: ${controllerContext.shuttingDownBrokerIds}")
  info(s"Current list of topics in the cluster: ${controllerContext.allTopics}")
}
```
initializeControllerContext runs when a broker becomes controller; the call relevant to us is initializePartitionReassignment:
```scala
private def initializePartitionReassignment() {
  // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
  val partitionsBeingReassigned = zkClient.getPartitionReassignment
  info(s"Partitions being reassigned: $partitionsBeingReassigned")

  controllerContext.partitionsBeingReassigned ++= partitionsBeingReassigned.iterator.map { case (tp, newReplicas) =>
    val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, tp)
    tp -> new ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler)
  }
}
```
This sets up a PartitionReassignmentIsrChangeHandler per partition being reassigned, i.e. a watcher on that partition's state znode; the logic that fires when the ISR changes lives in PartitionReassignmentIsrChange:
```scala
case class PartitionReassignmentIsrChange(partition: TopicPartition) extends ControllerEvent {
  override def state: ControllerState = ControllerState.PartitionReassignment

  override def process(): Unit = {
    if (!isActive) return
    // check if this partition is still being reassigned or not
    controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext =>
      val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
      zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
        case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
          val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
          val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
          if (caughtUpReplicas == reassignedReplicas) {
            // resume the partition reassignment process
            info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
              s"partition $partition being reassigned. Resuming partition reassignment")
            onPartitionReassignment(partition, reassignedPartitionContext)
          } else {
            info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
              s"partition $partition being reassigned. Replica(s) " +
              s"${(reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")} still need to catch up")
          }
        case None => error(s"Error handling reassignment of partition $partition to replicas " +
          s"${reassignedReplicas.mkString(",")} as it was never created")
      }
    }
  }
}
```
In the normal flow, onPartitionReassignment is only reached once every replica in the plan has joined the ISR; the check the watcher performs boils down to a subset test, sketched below. Besides this watcher, one more callback can re-trigger a reassignment: onBrokerStartup, shown after the sketch.
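A minimal sketch of that catch-up predicate, with made-up replica ids:

```scala
object IsrCatchUpCheck {
  // Reassignment resumes only when every replica in the plan (RAR) is in the ISR.
  def caughtUp(reassignedReplicas: Set[Int], isr: Set[Int]): Boolean =
    (reassignedReplicas & isr) == reassignedReplicas // i.e. reassignedReplicas.subsetOf(isr)

  def main(args: Array[String]): Unit = {
    val plan = Set(1, 4, 5)                              // target replicas
    println(caughtUp(plan, isr = Set(1, 2, 3)))          // false: 4 and 5 still fetching
    println(caughtUp(plan, isr = Set(1, 2, 3, 4, 5)))    // true: onPartitionReassignment proceeds
  }
}
```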
```scala
private def onBrokerStartup(newBrokers: Seq[Int]) {
  info(s"New broker startup callback for ${newBrokers.mkString(",")}")
  newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
  val newBrokersSet = newBrokers.toSet
  // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
  // broker via this update.
  // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
  // common controlled shutdown case, the metadata will reach the new brokers faster
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
  // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
  val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
  replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers.toSeq, OnlineReplica)
  // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
  // to see if these brokers can become leaders for some/all of those
  partitionStateMachine.triggerOnlinePartitionStateChange()
  // check if reassignment of some partitions need to be restarted
  val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
    case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains)
  }
  partitionsWithReplicasOnNewBrokers.foreach { case (tp, context) => onPartitionReassignment(tp, context) }
  // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
  // on the newly restarted brokers, there is a chance that topic deletion can resume
  val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
  if (replicasForTopicsToBeDeleted.nonEmpty) {
    info(s"Some replicas ${replicasForTopicsToBeDeleted.mkString(",")} for topics scheduled for deletion " +
      s"${topicDeletionManager.topicsToBeDeleted.mkString(",")} are on the newly restarted brokers " +
      s"${newBrokers.mkString(",")}. Signaling restart of topic deletion for these topics")
    topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
  }
  registerBrokerModificationsHandler(newBrokers)
}
```
Now the details of onPartitionReassignment:
```scala
private def onPartitionReassignment(topicPartition: TopicPartition,
                                    reassignedPartitionContext: ReassignedPartitionsContext) {
  val reassignedReplicas = reassignedPartitionContext.newReplicas
  if (!areReplicasInIsr(topicPartition, reassignedReplicas)) {
    info(s"New replicas ${reassignedReplicas.mkString(",")} for partition $topicPartition being reassigned not yet " +
      "caught up with the leader")
    val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicPartition).toSet
    val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicPartition)).toSet
    // 1. Update AR in ZK with OAR + RAR.
    updateAssignedReplicasForPartition(topicPartition, newAndOldReplicas.toSeq)
    // 2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
    updateLeaderEpochAndSendRequest(topicPartition, controllerContext.partitionReplicaAssignment(topicPartition),
      newAndOldReplicas.toSeq)
    // 3. replicas in RAR - OAR -> NewReplica
    startNewReplicasForReassignedPartition(topicPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
    info(s"Waiting for new replicas ${reassignedReplicas.mkString(",")} for partition ${topicPartition} being " +
      "reassigned to catch up with the leader")
  }
```
Focus on the first half: the crucial part is step 2, which sends a LeaderAndIsr request to every replica in OAR + RAR (the original assigned replicas plus the reassignment targets).
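To make steps 1-3 concrete, a small worked example of the set arithmetic with made-up broker ids:

```scala
object OarRarDemo {
  def main(args: Array[String]): Unit = {
    val oar = Seq(1, 2, 3)                 // OAR: original assigned replicas
    val rar = Seq(4, 5, 6)                 // RAR: reassignment targets
    val newAndOld = (rar ++ oar).toSet     // step 1: AR written to ZK becomes OAR + RAR
    val toCreate = rar.toSet -- oar.toSet  // step 3: RAR - OAR replicas enter the NewReplica state
    println(s"AR in ZK     : ${newAndOld.toSeq.sorted.mkString(",")}") // 1,2,3,4,5,6
    println(s"new replicas : ${toCreate.toSeq.sorted.mkString(",")}")  // 4,5,6
  }
}
```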
Finally, here is how the broker handles the LeaderAndIsr request:
```scala
def becomeLeaderOrFollower(correlationId: Int,
                           leaderAndIsrRequest: LeaderAndIsrRequest,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
  leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
    stateChangeLogger.trace(s"Received LeaderAndIsr request $stateInfo " +
      s"correlation id $correlationId from controller ${leaderAndIsrRequest.controllerId} " +
      s"epoch ${leaderAndIsrRequest.controllerEpoch} for partition $topicPartition")
  }
  replicaStateChangeLock synchronized {
    if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
      stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller ${leaderAndIsrRequest.controllerId} with " +
        s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
        s"Latest known controller epoch is $controllerEpoch")
      leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
    } else {
      val responseMap = new mutable.HashMap[TopicPartition, Errors]
      val controllerId = leaderAndIsrRequest.controllerId
      controllerEpoch = leaderAndIsrRequest.controllerEpoch

      // First check partition's leader epoch
      val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
      val newPartitions = leaderAndIsrRequest.partitionStates.asScala.keys.filter(topicPartition => getPartition(topicPartition).isEmpty)

      leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
        val partition = getOrCreatePartition(topicPartition)
        val partitionLeaderEpoch = partition.getLeaderEpoch
        if (partition eq ReplicaManager.OfflinePartition) {
          stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
            s"controller $controllerId with correlation id $correlationId " +
            s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
            "partition is in an offline log directory")
          responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
        } else if (partitionLeaderEpoch < stateInfo.basePartitionState.leaderEpoch) {
          // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
          // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
          if (stateInfo.basePartitionState.replicas.contains(localBrokerId))
            partitionState.put(partition, stateInfo)
          else {
            stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
              s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
              s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
            responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
          }
        } else {
          // Otherwise record the error code in response
          stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
            s"controller $controllerId with correlation id $correlationId " +
            s"epoch $controllerEpoch for partition $topicPartition since its associated " +
            s"leader epoch ${stateInfo.basePartitionState.leaderEpoch} is not higher than the current " +
            s"leader epoch $partitionLeaderEpoch")
          responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
        }
      }

      val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
        stateInfo.basePartitionState.leader == localBrokerId
      }
      val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys

      val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
        makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
      else
        Set.empty[Partition]
      val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
        makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap)
      else
        Set.empty[Partition]

      leaderAndIsrRequest.partitionStates.asScala.keys.foreach(topicPartition =>
        /*
         * If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
         * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
         * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
         * we need to map this topic-partition to OfflinePartition instead.
         */
        if (getReplica(topicPartition).isEmpty && (allPartitions.get(topicPartition) ne ReplicaManager.OfflinePartition))
          allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
      )

      // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
      // have been completely populated before starting the checkpointing there by avoiding weird race conditions
      if (!hwThreadInitialized) {
        startHighWaterMarksCheckPointThread()
        hwThreadInitialized = true
      }

      val newOnlineReplicas = newPartitions.flatMap(topicPartition => getReplica(topicPartition))
      // Add future replica to partition's map
      val futureReplicasAndInitialOffset = newOnlineReplicas.filter { replica =>
        logManager.getLog(replica.topicPartition, isFuture = true).isDefined
      }.map { replica =>
        replica.topicPartition -> BrokerAndInitialOffset(BrokerEndPoint(config.brokerId, "localhost", -1), replica.highWatermark.messageOffset)
      }.toMap
      futureReplicasAndInitialOffset.keys.foreach(tp => getPartition(tp).get.getOrCreateReplica(Request.FutureLocalReplicaId))

      // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move replica from source dir to destination dir
      futureReplicasAndInitialOffset.keys.foreach(logManager.abortAndPauseCleaning)
      replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)

      replicaFetcherManager.shutdownIdleFetcherThreads()
      replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
      onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
      new LeaderAndIsrResponse(Errors.NONE, responseMap.asJava)
    }
  }
}
```
The processing goes as follows: the broker first creates the Partition object if needed (getOrCreatePartition), then validates the controller epoch, the leader epoch, and that it actually appears in the replica list. Partitions that pass are split into those this broker should lead (makeLeaders) and those it should follow (makeFollowers); a follower starts fetching from the leader and, once caught up, joins the partition's ISR. A simplified model of that split is sketched below.
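The real code keys on Partition objects and LeaderAndIsrRequest.PartitionState; the case class in this hypothetical sketch is invented just to show the decision:

```scala
object LeaderAndIsrSplit {
  // Invented, minimal stand-in for LeaderAndIsrRequest.PartitionState.
  final case class PartitionState(leader: Int, leaderEpoch: Int, replicas: Seq[Int])

  // A partition whose designated leader equals the local broker id becomes a leader;
  // everything else that passed the epoch/replica checks becomes a follower.
  def split(states: Map[String, PartitionState], localBrokerId: Int)
      : (Map[String, PartitionState], Map[String, PartitionState]) =
    states.partition { case (_, s) => s.leader == localBrokerId }

  def main(args: Array[String]): Unit = {
    val states = Map(
      "t-0" -> PartitionState(leader = 1, leaderEpoch = 5, replicas = Seq(1, 2)),
      "t-1" -> PartitionState(leader = 2, leaderEpoch = 7, replicas = Seq(2, 1)))
    val (lead, follow) = split(states, localBrokerId = 1)
    println(s"makeLeaders: ${lead.keys.mkString(",")}, makeFollowers: ${follow.keys.mkString(",")}")
  }
}
```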
Local log creation is reached starting from this line:
```scala
val newOnlineReplicas = newPartitions.flatMap(topicPartition => getReplica(topicPartition))
```
getReplica ends up calling Partition.getReplica:
```scala
def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(allReplicasMap.get(replicaId))

def getReplicaOrException(replicaId: Int = localBrokerId): Replica =
  getReplica(replicaId).getOrElse(
    throw new ReplicaNotAvailableException(s"Replica $replicaId is not available for partition $topicPartition"))
```
Eventually this chain reaches LogManager.getOrCreateLog:
```scala
def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false, isFuture: Boolean = false): Log = {
  logCreationOrDeletionLock synchronized {
    getLog(topicPartition, isFuture).getOrElse {
      // create the log if it has not already been created in another thread
      if (!isNew && offlineLogDirs.nonEmpty)
        throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline")

      val logDir = {
        val preferredLogDir = preferredLogDirs.get(topicPartition)

        if (isFuture) {
          if (preferredLogDir == null)
            throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory")
          else if (getLog(topicPartition).get.dir.getParent == preferredLogDir)
            throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition")
        }

        if (preferredLogDir != null)
          preferredLogDir
        else
          nextLogDir().getAbsolutePath
      }
      if (!isLogDirOnline(logDir))
        throw new KafkaStorageException(s"Can not create log for $topicPartition because log directory $logDir is offline")

      try {
        val dir = {
          if (isFuture)
            new File(logDir, Log.logFutureDirName(topicPartition))
          else
            new File(logDir, Log.logDirName(topicPartition))
        }
        Files.createDirectories(dir.toPath)
        ...
```
Only the first half matters here. When creating a log, the broker first consults preferredLogDirs; if no preferred directory is recorded for the partition, nextLogDir() selects one of the live log directories, the one currently hosting the fewest logs, as sketched below. preferredLogDirs itself is populated by maybeUpdatePreferredLogDir, shown after the sketch:
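A simplified sketch of nextLogDir()'s selection rule, with made-up directory names and counts:

```scala
object NextLogDirDemo {
  // Among the live log directories, pick the one currently hosting the fewest logs.
  def nextLogDir(liveLogDirs: Seq[String], logCountByDir: Map[String, Int]): String =
    liveLogDirs.minBy(dir => logCountByDir.getOrElse(dir, 0)) // empty dirs count as zero

  def main(args: Array[String]): Unit = {
    val dirs = Seq("/data/kafka-1", "/data/kafka-2", "/data/kafka-3")
    val counts = Map("/data/kafka-1" -> 12, "/data/kafka-2" -> 7, "/data/kafka-3" -> 9)
    println(nextLogDir(dirs, counts)) // /data/kafka-2
  }
}
```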
```scala
def maybeUpdatePreferredLogDir(topicPartition: TopicPartition, logDir: String): Unit = {
  // Do not cache the preferred log directory if either the current log or the future log for this partition
  // exists in the specified logDir
  if (!getLog(topicPartition).exists(_.dir.getParent == logDir) &&
      !getLog(topicPartition, isFuture = true).exists(_.dir.getParent == logDir))
    preferredLogDirs.put(topicPartition, logDir)
}
```
maybeUpdatePreferredLogDir records the target log directory for a partition in preferredLogDirs (unless a log for that partition already lives there), so that a later getOrCreateLog creates the replica in the requested directory.
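On the client side, this is driven by the AlterReplicaLogDirsRequest that the tool sends (see reassignPartitions above). A hedged sketch using the public AdminClient API; the topic name, broker id, and directory path are example values:

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.common.TopicPartitionReplica

object AlterReplicaLogDirsDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    val admin = AdminClient.create(props)
    try {
      // Ask broker 1 to host (or move) test-topic-0 in /data/kafka-2; on the broker
      // this ends up in maybeUpdatePreferredLogDir before the log is (re)created.
      val assignment = Map(
        new TopicPartitionReplica("test-topic", 0, 1) -> "/data/kafka-2"
      ).asJava
      admin.alterReplicaLogDirs(assignment).all().get()
    } finally admin.close()
  }
}
```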