kafka作为消息队列领域的中流砥柱,作为后端开发,除了会使用以外,还需要深入的研究和分析其内在实现和设计,借鉴其优秀的设计理念。同时也可以帮助后续在实际问题定位和解决上提供帮助。本文主要介绍kafka发送到底是如何实现的
下面的代码中使用最简单的配置来展示如何通过kafka发送一条数据:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class KafkaProducerTest { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); producer.send(new ProducerRecord<String, String>("test-topic", "this is a test message")); producer.close(); } } 复制代码
KafkaProducer
实例send
函数发送数据根据入口方法send,一步步的对调用链的核心内容进行解析,来揭开kafka发送的神秘面纱
send(ProducerRecord<K, V> record, Callback callback)
函数解析该方法主要是kafka发送消息的入口函数
话不多说,源码走起:
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // 先走一波拦截器,拦截器是提供给用户做消息定制化处理逻辑。 ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); //核心调用都在这个方法实现里面 return doSend(interceptedRecord, callback); } 复制代码
ProducerInterceptors
接口)主要是给用户提供一个钩子,让用户在消息记录发送之前,或者producer回调方法执行之前,对消息或者回调信息做一些定制的逻辑处理。
,
分隔的多个类全量名。doSend(ProducerRecord<K, V> record, Callback callback)
函数,核心调用逻辑应该在这里面doSend(ProducerRecord<K, V> record, Callback callback)
函数解析该方法封装了kafka发送的核心逻辑,包含了到sender实际发送之前的所有处理流程。
先描述以下整体流程,然后再看源码,这样大家更有带入感:
让我们通过源码,来看一下是否是上述的流程:
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null; try { //第一步,确认当前的topic是有效的,然后获取集群信息(内部包含topic的信息) ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); //上述调用中,存在阻塞等待更新的场景,需要减去这部分耗时 long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); Cluster cluster = clusterAndWaitTime.cluster; //第二步,对key和value进行序列化 byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { ... } byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { ... } // 第三步,确认当前数据的分区号 int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); //计算包装后的kafka消息体的大小 setReadOnly(record.headers()); Header[] headers = record.headers().toArray(); int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers); ensureValidRecordSize(serializedSize); long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); //将拦截器和回调钩子形成一个调用链,在异步回调结果的时候会进行调用 Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp); //第四步,通过消息追加器(RecordAccumulator)追加数据 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs); //第五步, 如果满足发送时机,则唤醒sender函数。由他执行真正的发送逻辑 if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future; } //异常捕获处理逻辑 ... } 复制代码
整个代码的实现流程还是比较清晰的,整体流程上文已经描述,需要注意以下几个核心依赖类:
ProducerInterceptors
拦截器接口MetaData
元数据实体,里面封装和包含了kafka集群的整个元数据信息org.apache.kafka.common.serialization.Serializer
序列化接口,包含key序列化和value序列化Partitioner
分区策略接口和PartitionInfo分区信息实体RecordAccumulator
消息累加器,主要负责消息的合并和加入到缓存中ProducerBatch
kafka实际发送的一条消息实体,是由一条或者多条Record合并而成。Sender
其实是一个Runnable接口的实现类,不断的循环,读取ProducerBatch,然后真正执行发送流程他们的组合关系,如图所示
核心功能是获取topic的详细信息,内部封装了阻塞更新,核心依赖是
org.apache.kafka.clients.Metadata
类
//只保留核心代码,去除不在本文解析范围内的代码 private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException { //在Metadata中维护了topic的过期更新时间映射关系,add操作更新topic的过期更新时间,如果第一次加入,则需要立即更新(更新标记为) metadata.add(topic); //fetch是一个synchronized同步方法,保障了线程安全,获取一个最新可用的集群信息 Cluster cluster = metadata.fetch(); // 从集群的topic数据中,获取当前topic的分区数,主要是判断该topic的信息是否有效 Integer partitionsCount = cluster.partitionCountForTopic(topic); //有效的话就直接返回 if (partitionsCount != null && (partition == null || partition < partitionsCount)) return new ClusterAndWaitTime(cluster, 0); // 下面就是阻塞等待topic元数据信息更新成功或者超时 long begin = time.milliseconds(); long remainingWaitMs = maxWaitMs; long elapsed; do { metadata.add(topic); // 返回当前版本号,初始值为0,每次更新完成后会自增,并将 needUpdate 设置为 true int version = metadata.requestUpdate(); //将sender唤醒,向broker发送更新metadata请求 sender.wakeup(); try { //阻塞等待更新 metadata.awaitUpdate(version, remainingWaitMs); } catch (TimeoutException ex) { //处理超时异常 } cluster = metadata.fetch(); //kafka存在一个最大阻塞时间,需要减去这部分阻塞耗时。如果超时,则直接返回异常,消息发送失败 elapsed = time.milliseconds() - begin; if (elapsed >= maxWaitMs) throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); if (cluster.unauthorizedTopics().contains(topic)) throw new TopicAuthorizationException(topic); remainingWaitMs = maxWaitMs - elapsed; //判断是否更新完成 partitionsCount = cluster.partitionCountForTopic(topic); } while (partitionsCount == null); if (partition != null && partition >= partitionsCount) { throw new KafkaException( String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount)); } return new ClusterAndWaitTime(cluster, elapsed); } 复制代码
needUpdate
来控制是否需要更新最终函数的返回是ClusterAndWaitTime
,最新的集群信息和阻塞耗时
key和value的序列化需要实现
org.apache.kafka.common.serialization.Serializer
接口
kafka已经内置了很多序列化,在org.apache.kafka.common.serialization
包下面,最为常用的就是StringSerializer。一般Serializer 和Deserializer 是对应的
通过实现
org.apache.kafka.clients.producer.Partitioner
接口来设计自定义消息分区规则。
kafka 会提供一个默认的分区策略,一般都使用这个分区策略org.apache.kafka.clients.producer.internals.DefaultPartitioner
,分区规则如下:
kafka为了提高效率,会对多条消息进行合并,同时也为了防止消息量比较少时,无法达到合并大小,会有一个最大发送周期的配置,任意一个触发,都会将消息发送出去
看一下RecordAccumulator.append方法的实现逻辑
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException { //有一个计数器,来确认并发追加的数量。来确保消息不会处理丢失 appendsInProgress.incrementAndGet(); ByteBuffer buffer = null; if (headers == null) headers = Record.EMPTY_HEADERS; try { //为每一个topic+partionerNum维护一个双端队列,用来保存已经合并后的批量消息体(ProducerBatch) Deque<ProducerBatch> dq = getOrCreateDeque(tp); //防止并发执行 synchronized (dq) { ... //尝试往最后一个ProducerBatch追加消息,如果有足够的内存,则返回结果 RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) return appendResult; } //如果队列为空或者ProducerBatch没有足够空间了,则需要新建一个ProducerBatch byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); //kafka会要求设置一个标准批消息的大小(batchSize),为了提供NIO的Buffer重复利用,省去重新分配和回收的开销,如果消息超过标准大小,则以当前消息大小申请空间 int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); //从总缓存池中申请缓存块,这个申请的过程会在后面的文章中专门描述 buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { //这里重复尝试的原因是free.allocate会有阻塞等待,可能其他线程已经新建了一个批消息,则直接追加,提高利用效率 if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... return appendResult; } //根据buffer + record新建一个ProducerBatch MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic); ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds())); //追加到双端队列中 dq.addLast(batch); incomplete.add(batch); //主要是finally代码块中需要回收没有使用的缓存块 buffer = null; return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true); } } finally { //如果在第二次synchronized (dq),然后追加到别的线程创建的ProducerBatch,则需要将申请到的buffer给回收掉 if (buffer != null) free.deallocate(buffer); appendsInProgress.decrementAndGet(); } } 复制代码
sender是Runnable接口的实现类,独立一个线程不断的从RecordAccumulator的双端队列中,取符合发送条件的批消息进行发送
kafkaProducer中的主要流程已经介绍完成了,相信大家对kafkaProducer如何对数据进行发送有了一个整体的了解,后续关于合并,实际如何发送会做专门的讲解