While reading the Kafka idempotent-producer source code recently, a question came to mind: since idempotence is enforced by checking producerId + Sequence Number for duplicates, the broker must be caching producerId and Sequence Number somewhere. If a cluster keeps running for a long time, could the growth of producerIds and Sequence Numbers eventually cause an OOM? I could not find an answer online, so this article digs through the source code to find out.
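To make the question concrete, here is a minimal toy model of the broker-side dedup idea, for illustration only (this is not Kafka's actual implementation; ToyProducerCache and its members are made-up names):

import scala.collection.mutable

// Toy model: the broker remembers, per producerId, the sequence range of the last accepted batch.
// A resend with the same sequence range is treated as a duplicate and is not appended again.
object ToyProducerCache {
  // hypothetical cache: producerId -> (firstSeq, lastSeq) of the last accepted batch
  private val lastBatch = mutable.Map.empty[Long, (Int, Int)]

  def isDuplicate(producerId: Long, firstSeq: Int, lastSeq: Int): Boolean =
    lastBatch.get(producerId).contains((firstSeq, lastSeq))

  def record(producerId: Long, firstSeq: Int, lastSeq: Int): Unit =
    lastBatch.update(producerId, (firstSeq, lastSeq))
}

The worry is exactly this kind of map: it is keyed by producerId, so does it ever get cleaned up?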
This article does not start from the producer-side code; it jumps straight to the ApiKeys handling on the broker and traces down from there. If anything before that point is unclear, see: 【转载】万字长文干货 | Kafka 事务性之幂等性实现
【Step 1】kafka.server.KafkaApis#handle
def handle(request: RequestChannel.Request) {
  try {
    ...
    request.header.apiKey match {
      // handle the incoming produce request
      case ApiKeys.PRODUCE => handleProduceRequest(request)
      ...
    }
  } catch {
    ...
  } finally {
    ...
  }
}
【Step 2】kafka.server.KafkaApis#handleProduceRequest
def handleProduceRequest(request: RequestChannel.Request) {
  ...

  if (authorizedRequestInfo.isEmpty)
    ...
  else {
    val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId

    // 【KEY】append the records
    replicaManager.appendRecords(
      timeout = produceRequest.timeout.toLong,
      requiredAcks = produceRequest.acks,
      internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = true,
      entriesPerPartition = authorizedRequestInfo,
      responseCallback = sendResponseCallback,
      processingStatsCallback = processingStatsCallback)

    ...
  }
}
【Step 3】kafka.server.ReplicaManager#appendRecords
def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  isFromClient: Boolean,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Lock] = None,
                  processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
  if (isValidRequiredAcks(requiredAcks)) {
    ...
    // 【KEY】append to the local log
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = isFromClient, entriesPerPartition, requiredAcks)
    ...
  } else {
    ...
  }
}
【Step 4】kafka.server.ReplicaManager#appendToLocalLog
private def appendToLocalLog(internalTopicsAllowed: Boolean,
                             isFromClient: Boolean,
                             entriesPerPartition: Map[TopicPartition, MemoryRecords],
                             requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
  trace(s"Append [$entriesPerPartition] to local log")
  entriesPerPartition.map { case (topicPartition, records) =>
    brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
    brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()

    // reject appending to internal topics if it is not allowed
    if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
      ...
    } else {
      try {
        // get the partition object
        val partitionOpt = getPartition(topicPartition)
        val info = partitionOpt match {
          case Some(partition) =>
            if (partition eq ReplicaManager.OfflinePartition)
              throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
            // 【KEY】append the records through the partition
            partition.appendRecordsToLeader(records, isFromClient, requiredAcks)

          case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
            .format(topicPartition, localBrokerId))
        }
        ...
      } catch {
        ...
      }
    }
  }
}
【Step 5】kafka.cluster.Partition#appendRecordsToLeader
def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        ...

        // 【KEY】
        val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
        ...
      case None =>
        ...
    }
  }

  ...
}
【Step 6】kafka.log.Log#appendAsLeader
def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true): LogAppendInfo = {
  // 【KEY】append the records
  append(records, isFromClient, assignOffsets = true, leaderEpoch)
}
【Step 7】kafka.log.Log#append
private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
  maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
    // sanity-check and validate the incoming records
    val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)
    ...
    // 【KEY】further down, append validates the producer state; this is where the duplicate check happens
    val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)
    ...
  }
}
【Step 8】kafka.log.Log#analyzeAndValidateProducerState
private def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean):
  (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
  ...

  // iterate over all batches
  for (batch <- records.batches.asScala if batch.hasProducerId) {
    // 【KEY】this is the producerId cache; the lookup returns a ProducerStateEntry
    val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
    ...
    if (isFromClient) {
      // 【KEY】findDuplicateBatch checks whether this batch was already written; it lives in ProducerStateEntry
      maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate =>
        return (updatedProducers, completedTxns.toList, Some(duplicate))
      }
    }

    ...
  }
  ...
}
【Step 9】kafka.log.ProducerStateEntry#findDuplicateBatch
def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {
  // if the batch's producerEpoch differs from the current epoch, it cannot be a duplicate
  if (batch.producerEpoch != producerEpoch)
    None
  else
    // 【KEY】check for a duplicate sequence range
    batchWithSequenceRange(batch.baseSequence, batch.lastSequence)
}
【Step 10】kafka.log.ProducerStateEntry#batchWithSequenceRange
def batchWithSequenceRange(firstSeq: Int, lastSeq: Int): Option[BatchMetadata] = {
  // 【KEY】a batch counts as a duplicate if its sequence range exactly matches one of the cached batches
  val duplicate = batchMetadata.filter { metadata =>
    firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq
  }
  duplicate.headOption
}
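A side note on the OOM question: batchMetadata is the per-producer cache of recent batches, and, if I am reading ProducerStateEntry correctly, it is a small bounded queue (older entries are dequeued as new batches arrive), so a single entry does not grow as the sequence number increases. Below is a minimal standalone sketch of the duplicate check above, using made-up sequence values (CachedBatch and the sample data are hypothetical, not Kafka code):

// Simplified stand-in for BatchMetadata: just the sequence range of a cached batch.
case class CachedBatch(firstSeq: Int, lastSeq: Int)

object DuplicateCheckDemo extends App {
  // pretend these are the last few batches the broker has cached for one producerId
  val batchMetadata = Seq(CachedBatch(0, 9), CachedBatch(10, 19), CachedBatch(20, 29))

  // same logic as batchWithSequenceRange: exact match on the sequence range
  def batchWithSequenceRange(firstSeq: Int, lastSeq: Int): Option[CachedBatch] =
    batchMetadata.find(m => firstSeq == m.firstSeq && lastSeq == m.lastSeq)

  println(batchWithSequenceRange(10, 19)) // Some(...) -> duplicate, the resend is rejected
  println(batchWithSequenceRange(30, 39)) // None -> new batch, it gets appended
}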
From here we know that this line in 【Step 8】 is where the producerId cache is consulted; let's step into it:
// 【KEY】this is the producerId cache; the lookup returns a ProducerStateEntry
val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
kafka.log.ProducerStateManager#lastEntry
Here we can see that producerId is cached in producers:
// producerId is the key of the producers map
def lastEntry(producerId: Long): Option[ProducerStateEntry] = producers.get(producerId)
Let's check what type producers is:
private val producers = mutable.Map.empty[Long, ProducerStateEntry]
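So the cache is an in-memory mutable.Map keyed by producerId. Its footprint is therefore driven mainly by the number of distinct producerIds the broker still retains, not by how large any single producer's sequence numbers get. A toy illustration of that scaling (names and numbers are made up for demonstration):

import scala.collection.mutable

object ProducerCacheGrowthDemo extends App {
  // stand-in for ProducerStateEntry: a bounded number of cached batch ranges per producer
  case class Entry(recentBatches: List[(Int, Int)])

  val producers = mutable.Map.empty[Long, Entry]

  // one long-lived producer sending forever: its entry is overwritten, the map stays at size 1
  for (seq <- 0 until 1000000 by 10)
    producers.update(1L, Entry(List((seq, seq + 9))))
  println(producers.size) // 1

  // many distinct producers: each new producerId adds a map entry -- this is what could grow
  for (pid <- 2L to 10001L)
    producers.update(pid, Entry(List((0, 9))))
  println(producers.size) // 10001
}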
Searching the surrounding code for places that remove producerIds from this map turns up the following three call sites.
kafka.log.ProducerStateManager#truncateHead
def truncateHead(logStartOffset: Long) {
  val evictedProducerEntries = producers.filter { case (_, producerState) =>
    !isProducerRetained(producerState, logStartOffset)
  }

  val evictedProducerIds = evictedProducerEntries.keySet

  // 【KEY】remove the producerIds
  producers --= evictedProducerIds
  removeEvictedOngoingTransactions(evictedProducerIds)
  removeUnreplicatedTransactions(logStartOffset)

  if (lastMapOffset < logStartOffset)
    lastMapOffset = logStartOffset

  deleteSnapshotsBefore(logStartOffset)
  lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset)
}
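The eviction condition lives in isProducerRetained, which is not shown here; conceptually, a producer is dropped once none of its cached state refers to data at or beyond the new logStartOffset. A simplified sketch of that filter-and-evict step (the retention predicate below is a guess at the idea, not the real isProducerRetained):

import scala.collection.mutable

object TruncateHeadSketch {
  // stand-in for ProducerStateEntry: remember the last offset this producer wrote
  case class Entry(lastDataOffset: Long)

  val producers = mutable.Map[Long, Entry](
    1L -> Entry(lastDataOffset = 50),
    2L -> Entry(lastDataOffset = 500))

  // hypothetical simplification of isProducerRetained: keep the producer only if its
  // cached state still refers to data at or beyond the new log start offset
  def isProducerRetained(entry: Entry, logStartOffset: Long): Boolean =
    entry.lastDataOffset >= logStartOffset

  def truncateHead(logStartOffset: Long): Unit = {
    val evicted = producers.filter { case (_, state) => !isProducerRetained(state, logStartOffset) }.keySet
    producers --= evicted
  }

  def main(args: Array[String]): Unit = {
    truncateHead(logStartOffset = 100)
    println(producers.keySet) // only producerId 2 remains; 1 was evicted along with its truncated data
  }
}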
Tracing upwards through the callers:
kafka.log.ProducerStateManager#truncateAndReload
def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {
  // remove all out of range snapshots
  deleteSnapshotFiles(logDir, { snapOffset =>
    snapOffset > logEndOffset || snapOffset <= logStartOffset
  })

  if (logEndOffset != mapEndOffset) {
    producers.clear()
    ongoingTxns.clear()

    // since we assume that the offset is less than or equal to the high watermark, it is
    // safe to clear the unreplicated transactions
    unreplicatedTxns.clear()
    loadFromSnapshot(logStartOffset, currentTimeMs)
  } else {
    // 【KEY】
    truncateHead(logStartOffset)
  }
}
kafka.log.Log#recoverSegment
This method is only called while the Log is being initialized, so it can be ruled out.
kafka.log.Log#loadProducerState
Tracing this method upwards, one caller comes from Log initialization, so that path can also be ruled out.
The other caller is kafka.log.Log#truncateTo, which is triggered while a follower replica syncs with the leader; this one looks promising.
kafka.log.Log#truncateTo
private[log] def truncateTo(targetOffset: Long): Boolean = {
  maybeHandleIOException(s"Error while truncating log to offset $targetOffset for $topicPartition in dir ${dir.getParent}") {
    if (targetOffset < 0)
      throw new IllegalArgumentException(s"Cannot truncate partition $topicPartition to a negative offset (%d).".format(targetOffset))
    if (targetOffset >= logEndOffset) {
      info(s"Truncating to $targetOffset has no effect as the largest offset in the log is ${logEndOffset - 1}")
      false
    } else {
      info(s"Truncating to offset $targetOffset")
      lock synchronized {
        checkIfMemoryMappedBufferClosed()
        if (segments.firstEntry.getValue.baseOffset > targetOffset) {
          truncateFullyAndStartAt(targetOffset)
        } else {
          val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
          deletable.foreach(deleteSegment)

          activeSegment.truncateTo(targetOffset)
          updateLogEndOffset(targetOffset)
          this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
          this.logStartOffset = math.min(targetOffset, this.logStartOffset)
          _leaderEpochCache.clearAndFlushLatest(targetOffset)
          // 【KEY】reload the producer state, which ends up in truncateAndReload / truncateHead
          loadProducerState(targetOffset, reloadFromCleanShutdown = false)
        }
        true
      }
    }
  }
}
kafka.cluster.Partition#truncateTo
def truncateTo(offset: Long, isFuture: Boolean) {
  // The read lock is needed to prevent the follower replica from being truncated while ReplicaAlterDirThread
  // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
  inReadLock(leaderIsrUpdateLock) {
    // 【KEY】
    logManager.truncateTo(Map(topicPartition -> offset), isFuture = isFuture)
  }
}
kafka.server.ReplicaFetcherThread#maybeTruncate
override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = {
  val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, Long]
  val partitionsWithError = mutable.Set[TopicPartition]()

  fetchedEpochs.foreach { case (tp, epochOffset) =>
    try {
      val replica = replicaMgr.getReplicaOrException(tp)
      val partition = replicaMgr.getPartition(tp).get

      // if the epochOffset carries an error
      if (epochOffset.hasError) {
        info(s"Retrying leaderEpoch request for partition ${replica.topicPartition} as the leader reported an error: ${epochOffset.error}")
        partitionsWithError += tp
      } else {
        val fetchOffset =
          if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) {
            warn(s"Based on follower's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. " +
              s"The initial fetch offset ${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.")
            partitionStates.stateValue(tp).fetchOffset
          } else if (epochOffset.endOffset >= replica.logEndOffset.messageOffset)
            logEndOffset(replica, epochOffset)
          else
            epochOffset.endOffset

        // 【KEY】
        partition.truncateTo(fetchOffset, isFuture = false)
        replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp, fetchOffset)
        fetchOffsets.put(tp, fetchOffset)
      }
    } catch {
      case e: KafkaStorageException =>
        info(s"Failed to truncate $tp", e)
        partitionsWithError += tp
    }
  }

  ResultWithPartitions(fetchOffsets, partitionsWithError)
}
kafka.server.AbstractFetcherThread#doWork
override def doWork() {
  // 【KEY】possibly truncate, which may evict producer state
  maybeTruncate()
  val fetchRequest = inLock(partitionMapLock) {
    val ResultWithPartitions(fetchRequest, partitionsWithError) = buildFetchRequest(states)
    if (fetchRequest.isEmpty) {
      trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
    }
    handlePartitionsWithErrors(partitionsWithError)
    fetchRequest
  }
  if (!fetchRequest.isEmpty)
    processFetchRequest(fetchRequest)
}
kafka.utils.ShutdownableThread#run
override def run(): Unit = {
  info("Starting")
  try {
    while (isRunning)
      // 【KEY】keep calling doWork() until isRunning becomes false
      doWork()
  } catch {
    case e: FatalExitError =>
      shutdownInitiated.countDown()
      shutdownComplete.countDown()
      info("Stopped")
      Exit.exit(e.statusCode())
    case e: Throwable =>
      if (isRunning)
        error("Error due to", e)
  } finally {
    shutdownComplete.countDown()
  }
  info("Stopped")
}
ShutdownableThread is extended by kafka.server.AbstractFetcherThread, which in turn is extended by kafka.server.ReplicaFetcherThread. So once a ReplicaFetcherThread is created and started, this cleanup runs automatically as part of its fetch loop (a condensed imitation of the pattern is sketched below).
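The sketch below is not Kafka's code, just the shape of the pattern: a shutdownable thread whose run loop keeps calling doWork(), so any cleanup triggered inside doWork() recurs on every iteration (all class and method names here are made up):

// A condensed imitation of the ShutdownableThread pattern.
abstract class ToyShutdownableThread(name: String) extends Thread(name) {
  @volatile private var running = true
  def doWork(): Unit
  override def run(): Unit = while (running) doWork()
  def shutdown(): Unit = running = false
}

class ToyFetcherThread extends ToyShutdownableThread("toy-fetcher") {
  override def doWork(): Unit = {
    maybeTruncate()    // in Kafka this is where producer state can end up being evicted
    Thread.sleep(1000) // stand-in for the fetch round trip
  }
  private def maybeTruncate(): Unit = println("maybeTruncate() called")
}

object ToyFetcherDemo extends App {
  val t = new ToyFetcherThread
  t.start() // once started, the cleanup opportunity recurs on every loop iteration
  Thread.sleep(3000)
  t.shutdown()
  t.join()
}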
kafka.server.ReplicaFetcherManager#createFetcherThread
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
  val prefix = threadNamePrefix.map(tp => s"${tp}:").getOrElse("")
  val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
  // create the ReplicaFetcherThread
  new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig, replicaManager, metrics, time, quotaManager)
}
kafka.server.AbstractFetcherManager#addFetcherForPartitions
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]) {
  lock synchronized {
    val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicPartition, brokerAndInitialFetchOffset) =>
      BrokerAndFetcherId(brokerAndInitialFetchOffset.broker, getFetcherId(topicPartition.topic, topicPartition.partition))
    }

    def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId, brokerIdAndFetcherId: BrokerIdAndFetcherId) {
      // 【KEY 2】create the fetcher thread
      val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
      fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)
      // 【KEY 3】start it
      fetcherThread.start
    }

    for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
      val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
      fetcherThreadMap.get(brokerIdAndFetcherId) match {
        case Some(f) if f.sourceBroker.host == brokerAndFetcherId.broker.host && f.sourceBroker.port == brokerAndFetcherId.broker.port =>
          // reuse the fetcher thread
        case Some(f) =>
          f.shutdown()
          addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
        case None =>
          // 【KEY 1】
          addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
      }

      fetcherThreadMap(brokerIdAndFetcherId).addPartitions(initialFetchOffsets.map { case (tp, brokerAndInitOffset) =>
        tp -> brokerAndInitOffset.initOffset
      })
    }
  }

  info("Added fetcher for partitions %s".format(partitionAndOffsets.map { case (topicPartition, brokerAndInitialOffset) =>
    "[" + topicPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] " }))
}
kafka.server.AbstractFetcherManager#resizeThreadPool
def resizeThreadPool(newSize: Int): Unit = {
  def migratePartitions(newSize: Int): Unit = {
    fetcherThreadMap.foreach { case (id, thread) =>
      val removedPartitions = thread.partitionsAndOffsets
      removeFetcherForPartitions(removedPartitions.keySet)
      if (id.fetcherId >= newSize)
        thread.shutdown()
      // 【KEY】
      addFetcherForPartitions(removedPartitions)
    }
  }
  lock synchronized {
    val currentSize = numFetchersPerBroker
    info(s"Resizing fetcher thread pool size from $currentSize to $newSize")
    numFetchersPerBroker = newSize
    if (newSize != currentSize) {
      // We could just migrate some partitions explicitly to new threads. But this is currently
      // reassigning all partitions using the new thread size so that hash-based allocation
      // works with partition add/delete as it did before.
      migratePartitions(newSize)
    }
    shutdownIdleFetcherThreads()
  }
}
kafka.server.DynamicThreadPool#reconfigure
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
  if (newConfig.numIoThreads != oldConfig.numIoThreads) {
    server.requestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
  }
  if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
    server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
  if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
    // 【KEY】resize the replica fetcher thread pool, which is the path we are tracing
    server.replicaManager.replicaFetcherManager.resizeThreadPool(newConfig.numReplicaFetchers)
  if (newConfig.numRecoveryThreadsPerDataDir != oldConfig.numRecoveryThreadsPerDataDir)
    server.getLogManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir)
  if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
    server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads)
}
kafka.server.DynamicBrokerConfig#updateCurrentConfig
private def updateCurrentConfig(): Unit = {
  val newProps = mutable.Map[String, String]()
  newProps ++= staticBrokerConfigs
  overrideProps(newProps, dynamicDefaultConfigs)
  overrideProps(newProps, dynamicBrokerConfigs)
  val oldConfig = currentConfig
  val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)
  if (newConfig ne currentConfig) {
    currentConfig = newConfig
    kafkaConfig.updateCurrentConfig(newConfig)

    // Process BrokerReconfigurable updates after current config is updated
    // 【KEY】
    brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig))
  }
}
kafka.server.DynamicBrokerConfig#updateBrokerConfig
private[server] def updateBrokerConfig(brokerId: Int, persistentProps: Properties): Unit = CoreUtils.inWriteLock(lock) {
  try {
    val props = fromPersistentProps(persistentProps, perBrokerConfig = true)
    dynamicBrokerConfigs.clear()
    dynamicBrokerConfigs ++= props.asScala
    // 【KEY】
    updateCurrentConfig()
  } catch {
    case e: Exception => error(s"Per-broker configs of $brokerId could not be applied: $persistentProps", e)
  }
}
kafka.server.DynamicBrokerConfig#initialize
private[server] def initialize(zkClient: KafkaZkClient): Unit = {
  currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
  val adminZkClient = new AdminZkClient(zkClient)
  updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
  val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString)
  val brokerConfig = maybeReEncodePasswords(props, adminZkClient)
  // 【KEY】
  updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)
}
kafka.server.KafkaServer#startup
In KafkaServer's startup method, the broker's dynamic configuration is initialized:
def startup() {
  try {
    ...

    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      ...

      // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
      // applied after DynamicConfigManager starts.
      // 【KEY】
      config.dynamicConfig.initialize(zkClient)

      ...
    }
  }
  catch {
    ...
  }
}
With that, we have traced the code path that cleans up cached producerIds. After KafkaServer starts up and the broker configuration is initialized, replica fetcher threads get created and started, and each one runs a while loop that keeps calling doWork(); through maybeTruncate() and log truncation, that loop keeps evicting stale producer state. So there is no need to worry about the producerId cache growing until it causes an OOM.