在构建好Kafka对象之后，如果我们想要发送消息，就要调用producer的send方法，参数包括消息体和回调函数。在上一篇文章中我们看到了KafkaProducer的整体架构，了解了发送消息主要涉及到三个点。
首先来看一个interceptor的例子
public class KafkaIntercepter implements ProducerInterceptor { @Override public ProducerRecord onSend(ProducerRecord producerRecord) { System.out.println("拦截器执行"); return null; } @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { //用于正常返回时的回调方法 } @Override public void close() { } @Override public void configure(Map<String, ?> map) { //自己配置的类 kafka都是通过反射来进行加载的 利用的是无餐构造 不可能会初始化你所有的数据 所以kafka在反射生成对象之后 会调用configure初始化数据 } } 复制代码
当我们调用send方法的时候，producer做的第一件事就是去执行拦截器。注意拦截器是一个list，被封装在ProducerInterceptors里面，并且会执行每一个拦截器的onSend方法。
这一段代码有点长，我们首先看看源码，然后再分开说明。看不懂没关系，先看看上面的时序图，再结合源码看，就会有个大概的了解，再看后面具体的讲解。
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null; try { throwIfProducerClosed(); // first make sure the metadata for the topic is available long nowMs = time.milliseconds(); ClusterAndWaitTime clusterAndWaitTime; try { '//等待元数据更新' clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs); } catch (KafkaException e) { if (metadata.isClosed()) throw new KafkaException("Producer closed while send in progress", e); throw e; } nowMs += clusterAndWaitTime.waitedOnMetadataMs; long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); Cluster cluster = clusterAndWaitTime.cluster; '//开始序列化key和value' byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer", cce); } byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce); } '//执行分区器' int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); setReadOnly(record.headers()); Header[] headers = record.headers().toArray(); int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers); '//校验消息大小' ensureValidRecordSize(serializedSize); long timestamp = record.timestamp() == null ? 
nowMs : record.timestamp(); if (log.isTraceEnabled()) { log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); } // producer callback will make sure to call both 'callback' and interceptor callback Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); if (transactionManager != null && transactionManager.isTransactional()) { transactionManager.failIfNotReadyForSend(); } '//调用append最追加消息' RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs); if (result.abortForNewBatch) { int prevPartition = partition; partitioner.onNewBatch(record.topic(), cluster, prevPartition); partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); if (log.isTraceEnabled()) { log.trace("Retrying append due to new batch creation for topic {} partition {}. 
The old partition was {}", record.topic(), partition, prevPartition); } // producer callback will make sure to call both 'callback' and interceptor callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs); } if (transactionManager != null && transactionManager.isTransactional()) transactionManager.maybeAddPartitionToTransaction(tp); if (result.batchIsFull || result.newBatchCreated) { '//如果消息满了 就唤醒sender线程起来执行' log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future; // handling exceptions and record the errors; // for API exceptions return them in the future, // for other exceptions throw directly } catch (ApiException e) { log.debug("Exception occurred during message send:", e); if (callback != null) callback.onCompletion(null, e); this.errors.record(); this.interceptors.onSendError(record, tp, e); return new FutureFailure(e); } catch (InterruptedException e) { this.errors.record(); this.interceptors.onSendError(record, tp, e); throw new InterruptException(e); } catch (BufferExhaustedException e) { this.errors.record(); this.metrics.sensor("buffer-exhausted-records").record(); this.interceptors.onSendError(record, tp, e); throw e; } catch (KafkaException e) { this.errors.record(); this.interceptors.onSendError(record, tp, e); throw e; } catch (Exception e) { // we notify interceptor about all exceptions, since onSend is called before anything else in this method this.interceptors.onSendError(record, tp, e); throw e; } } 复制代码
这个方法首先会从缓存中获取元数据 然后获取当前topic的分区 判断分区是否存在 如果存在就直接返回 如果不存在 就唤醒sender线程 并且等待更新 更新之后返回
Cluster cluster = metadata.fetch(); if (cluster.invalidTopics().contains(topic)) throw new InvalidTopicException(topic); '//将topic添加进元数据缓存' metadata.add(topic, nowMs); Integer partitionsCount = cluster.partitionCountForTopic(topic); // Return cached metadata if we have it, and if the record's partition is either undefined' // or within the known partition range //返回缓存的数据 if (partitionsCount != null && (partition == null || partition < partitionsCount)) return new ClusterAndWaitTime(cluster, 0); long remainingWaitMs = maxWaitMs; long elapsed = 0; // Issue metadata requests until we have metadata for the topic and the requested partition, // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata // is stale and the number of partitions for this topic has increased in the meantime. do { if (partition != null) { log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic); } else { log.trace("Requesting metadata update for topic {}.", topic); } metadata.add(topic, nowMs + elapsed); '//获取元数据版本号 并且设置needUpdate为true 同时唤醒sender线程' int version = metadata.requestUpdate(); sender.wakeup(); try { '//等待更新元数据' metadata.awaitUpdate(version, remainingWaitMs); } catch (TimeoutException ex) { // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs throw new TimeoutException( String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs)); } '//重新从缓存中获取' cluster = metadata.fetch(); elapsed = time.milliseconds() - nowMs; if (elapsed >= maxWaitMs) { throw new TimeoutException(partitionsCount == null ? 
String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs) : String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.", partition, topic, partitionsCount, maxWaitMs)); } metadata.maybeThrowExceptionForTopic(topic); remainingWaitMs = maxWaitMs - elapsed; partitionsCount = cluster.partitionCountForTopic(topic); } while (partitionsCount == null || (partition != null && partition >= partitionsCount)); return new ClusterAndWaitTime(cluster, elapsed); } 复制代码
2.metadata.add 方法 将topic添加进topic map中 value为过期时间
3.cluster.partitionCountForTopic 当前主题下的分区数量
4.metadata.requestUpdate() 更新needUpdate标识为true ,sender会检查这个字段 如果为true 则更新元数据集合。然后就是返回当前元数据集合的版本号 ,sender线程在更新完元数据之后 会对当前版本号自加 我们判断元数据是否更新完成其主要就是依赖这个版本号来做判断的
public synchronized int requestUpdate() { this.needUpdate = true; return this.updateVersion; } 复制代码
public synchronized void awaitUpdate(final int lastVersion, final long timeoutMs) throws InterruptedException { long currentTimeMs = time.milliseconds(); long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : currentTimeMs + timeoutMs; time.waitObject(this, () -> { // Throw fatal exceptions, if there are any. Recoverable topic errors will be handled by the caller. maybeThrowFatalException(); //版本号作为是否更新完成的依据 return updateVersion() > lastVersion || isClosed(); }, deadlineMs); if (isClosed()) throw new KafkaException("Requested metadata update after close"); } 复制代码
waitObject方法 真正执行等待的方法 while死循环做检查
@Override public void waitObject(Object obj, Supplier<Boolean> condition, long deadlineMs) throws InterruptedException { synchronized (obj) { while (true) { if (condition.get()) return; long currentTimeMs = milliseconds(); if (currentTimeMs >= deadlineMs) throw new TimeoutException("Condition not satisfied before deadline"); //wait等待 obj.wait(deadlineMs - currentTimeMs); } } } 复制代码