Start reading from the main() method in the user's own producer code.
package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.stream.IntStream;

public class CustomProducer {

    public static void main(String[] args) {
        // 1. Create the configuration object for the Kafka producer
        Properties properties = new Properties();

        // 2. Add configuration: bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

        // Key and value serializers (required): key.serializer, value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 3. Create the Kafka producer object
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            // 4. Call the send method to send messages
            // ProducerRecord(topic, partition, key, value): partition i, key "test", value "atguigu"
            IntStream.range(0, 7)
                    .mapToObj(i -> new ProducerRecord<>("first", i, "test", "atguigu"))
                    .forEach(kafkaProducer::send);
        }
    }
}
Click KafkaProducer() in the main() method to jump to the KafkaProducer constructor.
org.apache.kafka.clients.producer.KafkaProducer#KafkaProducer(org.apache.kafka.clients.producer.ProducerConfig, org.apache.kafka.common.serialization.Serializer<K>, org.apache.kafka.common.serialization.Serializer<V>, org.apache.kafka.clients.producer.internals.ProducerMetadata, org.apache.kafka.clients.KafkaClient, org.apache.kafka.clients.producer.internals.ProducerInterceptors<K, V>, org.apache.kafka.common.utils.Time)
// visible for testing
@SuppressWarnings("unchecked")
KafkaProducer(ProducerConfig config,
              Serializer<K> keySerializer,
              Serializer<V> valueSerializer,
              ProducerMetadata metadata,
              KafkaClient kafkaClient,
              ProducerInterceptors<K, V> interceptors,
              Time time) {
    try {
        this.producerConfig = config;
        this.time = time;

        // Read the transactional id
        String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
        // Read the client id
        this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);

        LogContext logContext;
        if (transactionalId == null)
            logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
        else
            logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
        log = logContext.logger(KafkaProducer.class);
        log.trace("Starting the Kafka producer");

        Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                .tags(metricTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
        // JMX reporter for monitoring the running producer
        JmxReporter jmxReporter = new JmxReporter();
        jmxReporter.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)));
        reporters.add(jmxReporter);
        MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
                config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
        this.producerMetrics = new KafkaProducerMetrics(metrics);

        // Instantiate the partitioner
        this.partitioner = config.getConfiguredInstance(
                ProducerConfig.PARTITIONER_CLASS_CONFIG,
                Partitioner.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);

        // Key and value serializers
        if (keySerializer == null) {
            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
            this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
        } else {
            config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
            this.keySerializer = keySerializer;
        }
        if (valueSerializer == null) {
            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
            this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
        } else {
            config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
            this.valueSerializer = valueSerializer;
        }

        // Interceptors (there can be more than one)
        List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ProducerInterceptor.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
        if (interceptors != null)
            this.interceptors = interceptors;
        else
            this.interceptors = new ProducerInterceptors<>(interceptorList);
        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
                valueSerializer, interceptorList, reporters);

        // Maximum size of a single request (and thus of a single record), default 1 MB
        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
        // Total size of the RecordAccumulator buffer, default 32 MB
        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
        // Compression type, default none
        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

        this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
        int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

        this.apiVersions = new ApiVersions();
        this.transactionManager = configureTransactionState(config, logContext);

        // Create the accumulator (buffer), default total size 32 MB
        // batch.size: maximum size of one batch in the buffer, default 16 KB
        // compression: default none
        // linger.ms: default 0
        // retry.backoff.ms: interval between retries, default 100 ms
        // delivery.timeout.ms: default 2 minutes
        // request.timeout.ms: default 30 s
        this.accumulator = new RecordAccumulator(logContext,
                config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                this.compressionType,
                lingerMs(config),
                retryBackoffMs,
                deliveryTimeoutMs,
                metrics,
                PRODUCER_METRIC_GROUP_NAME,
                time,
                apiVersions,
                transactionManager,
                new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));

        // Addresses used to connect to the Kafka cluster
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
                config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));

        // Obtain the metadata object
        if (metadata != null) {
            this.metadata = metadata;
        } else {
            // metadata.max.age.ms: default 5 minutes; how often the producer refreshes its metadata
            // metadata.max.idle.ms: default 5 minutes; metadata for a topic idle longer than this is dropped
            this.metadata = new ProducerMetadata(retryBackoffMs,
                    config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                    config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
                    logContext,
                    clusterResourceListeners,
                    Time.SYSTEM);
            this.metadata.bootstrap(addresses);
        }
        this.errors = this.metrics.sensor("errors");

        // Create the sender
        this.sender = newSender(logContext, kafkaClient, this.metadata);
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
        // Wrap the sender in a daemon (background) thread
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        // Start the sender thread
        this.ioThread.start();
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
        close(Duration.ofMillis(0), true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}
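Every default called out in the comments above (batch.size, linger.ms, compression.type, buffer.memory) is an ordinary producer config, so it can be overridden where the Properties object is built in CustomProducer. A minimal sketch; the values below are illustrative, not recommendations:

    // Batch up to 32 KB per partition before sending (batch.size, default 16 KB)
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);
    // Let a batch wait up to 5 ms to fill (linger.ms, default 0)
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
    // Compress batches (compression.type, default none)
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    // Grow the RecordAccumulator from the default 32 MB to 64 MB (buffer.memory)
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024L);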
Click the newSender() method to see how the sender is initialized.
org.apache.kafka.clients.producer.KafkaProducer#newSender
// visible for testing
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
    // Maximum number of in-flight (unacknowledged) requests per connection, default 5
    int maxInflightRequests = configureInflightRequests(producerConfig);
    // Request timeout, default 30 s
    int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
    ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
    ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
    Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);

    // Create the network client:
    // clientId: client id
    // maxInflightRequests: in-flight request limit, default 5
    // RECONNECT_BACKOFF_MS_CONFIG: default 50 ms; base wait before reconnecting
    // RECONNECT_BACKOFF_MAX_MS_CONFIG: default 1000 ms; the backoff grows exponentially on each consecutive failure, up to this maximum
    // SEND_BUFFER_CONFIG: default 128 KB; size of the socket send buffer
    // RECEIVE_BUFFER_CONFIG: default 32 KB; size of the socket receive buffer
    // requestTimeoutMs: default 30 s
    // SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG: default 10 s; time allowed to establish a connection to the broker; if no connection is established in time, it is closed
    // SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG: default 30 s; the connection setup timeout grows exponentially on each consecutive failure, up to this maximum
    KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
            new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                    this.metrics, time, "producer", channelBuilder, logContext),
            metadata,
            clientId,
            maxInflightRequests,
            producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
            producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
            producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
            producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
            requestTimeoutMs,
            producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
            producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
            time,
            true,
            apiVersions,
            throttleTimeSensor,
            logContext);

    // acks: 0 = the producer does not wait for any acknowledgement;
    // 1 = the leader acknowledges once it has the record;
    // -1 (all) = the leader and every replica in the ISR must have received the record before acknowledging
    short acks = configureAcks(producerConfig, log);
    // Create the Sender runnable
    return new Sender(logContext,
            client,
            metadata,
            this.accumulator,
            maxInflightRequests == 1,
            producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
            acks,
            producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
            metricsRegistry.senderMetrics,
            time,
            requestTimeoutMs,
            producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
            this.transactionManager,
            apiVersions);
}
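The acks and retry settings read here also come straight from the user configuration. A sketch of the corresponding overrides (values are illustrative):

    // Wait for the leader and all in-sync replicas (acks = -1 / "all")
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    // Number of resend attempts (retries; recent clients default to Integer.MAX_VALUE)
    properties.put(ProducerConfig.RETRIES_CONFIG, 3);
    // Unacknowledged requests allowed per connection, default 5
    properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);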
The Sender object is started on a thread of its own, so click Sender inside the newSender() method and find the run() method of the Sender class.
org.apache.kafka.clients.producer.internals.Sender#run
/**
 * The main run loop for the sender thread
 */
@Override
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            // The sender thread tries to pull data from the buffer;
            // right after startup there is nothing to pull yet
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // ...
}
Click the send() method in your own CustomProducer.java.
IntStream.range(0, 7)
        .mapToObj(i -> new ProducerRecord<>("first", i, "test", "atguigu"))
        .forEach(kafkaProducer::send);
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    // Pass the record through the interceptor chain before sending
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
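For reference, this two-argument overload is the one to call when you want a per-record acknowledgement. A minimal usage sketch against the example producer above:

    kafkaProducer.send(new ProducerRecord<>("first", "atguigu"),
            (metadata, exception) -> {
                // Runs once the broker responds (or the send fails)
                if (exception == null) {
                    System.out.println("topic=" + metadata.topic() + ", partition=" + metadata.partition());
                } else {
                    exception.printStackTrace();
                }
            });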
Click the onSend() method to see the interceptor handling.
org.apache.kafka.clients.producer.internals.ProducerInterceptors#onSend
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> interceptRecord = record;
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
        try {
            // Let each interceptor process (and possibly replace) the record
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception e) {
            // do not propagate interceptor exception, log and continue calling other interceptors
            // be careful not to throw exception from here
            if (record != null)
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
            else
                log.warn("Error executing interceptor onSend callback", e);
        }
    }
    return interceptRecord;
}
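To see this chain do something, register an interceptor of your own. A minimal sketch; the class name and the prefix it adds are invented for illustration:

    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class PrefixInterceptor implements ProducerInterceptor<String, String> {

        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            // This is the hook invoked from ProducerInterceptors#onSend above
            return new ProducerRecord<>(record.topic(), record.partition(), record.key(),
                    "prefix-" + record.value());
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

        @Override
        public void close() { }

        @Override
        public void configure(Map<String, ?> configs) { }
    }

Register it when building the producer config: properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, PrefixInterceptor.class.getName());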
Back from the interceptor handling, click the doSend() method.
org.apache.kafka.clients.producer.KafkaProducer#doSend
/**
 * Implementation of asynchronously send a record to a topic.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        long nowMs = time.milliseconds();
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            // Fetch (or wait for) the cluster metadata
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;

        // Serialize the key and the value
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }

        // Choose the partition
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        // Make sure the record (serialized, possibly compressed) is small enough to be sent
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (log.isTraceEnabled()) {
            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        }
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionManager.failIfNotReadyForSend();
        }
        // Append the record to the accumulator; result says whether the append succeeded
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

        if (result.abortForNewBatch) {
            int prevPartition = partition;
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        // If the batch is full, or a new batch was created, wake up the sender thread
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
A closer look at the default partitioning rules.
org.apache.kafka.clients.producer.KafkaProducer#partition
/**
 * computes partition for given record.
 * if the record has partition returns the value otherwise
 * calls configured partitioner class to compute the partition.
 */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    // If the record specifies a partition, use it;
    // otherwise let the partitioner choose one
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
Click partition to jump to the Partitioner interface. With partition selected, press Ctrl+H to list the implementations of the interface.
/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes The serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes The serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
Choose the default partitioner, DefaultPartitioner.
org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition(java.lang.String, java.lang.Object, byte[], java.lang.Object, byte[], org.apache.kafka.common.Cluster, int)
/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param numPartitions The number of partitions of the given {@code topic}
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                     int numPartitions) {
    // No key specified: use the sticky partitioning strategy
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    }
    // A key is specified: hash the key and take it modulo the partition count
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
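If neither rule fits, you can implement the Partitioner interface yourself and register it via partitioner.class. A minimal sketch; the class name and routing rule are invented for illustration:

    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    public class ValueBasedPartitioner implements Partitioner {

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            // Route values containing "atguigu" to partition 0, everything else to partition 1
            return String.valueOf(value).contains("atguigu") ? 0 : 1;
        }

        @Override
        public void close() { }

        @Override
        public void configure(Map<String, ?> configs) { }
    }

Registered with: properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, ValueBasedPartitioner.class.getName());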
A closer look at the record size and buffer limits.
org.apache.kafka.clients.producer.KafkaProducer#ensureValidRecordSize
/**
 * Validate that the record size isn't too large
 */
private void ensureValidRecordSize(int size) {
    // maxRequestSize: maximum size of a single record/request, default 1 MB
    if (size > maxRequestSize)
        throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than " + maxRequestSize + ", which is the value of the " +
                ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
    // totalMemorySize: total accumulator memory, default 32 MB
    if (size > totalMemorySize)
        throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                ProducerConfig.BUFFER_MEMORY_CONFIG +
                " configuration.");
}
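Both limits are plain client configs. If you genuinely need larger records, raise the ceiling when building the Properties; a sketch (note that the broker-side message.max.bytes and the topic-level max.message.bytes must allow the same size, or the broker will reject the batch):

    // Raise the single-request ceiling from the default 1 MB to 5 MB (max.request.size)
    properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5 * 1024 * 1024);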
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
/**
 * Add a record to the accumulator, return the append result
 * <p>
 * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
 * <p>
 *
 * @param tp The topic/partition to which this record is being sent
 * @param timestamp The timestamp of the record
 * @param key The key for the record
 * @param value The value for the record
 * @param headers the Headers for the record
 * @param callback The user-supplied callback to execute when the request is complete
 * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
 * @param abortOnNewBatch A boolean that indicates returning before a new batch is created and
 *                        running the partitioner's onNewBatch method before trying to append again
 * @param nowMs The current time, in milliseconds
 */
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch,
                                 long nowMs) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null)
        headers = Record.EMPTY_HEADERS;
    try {
        // check if we have an in-progress batch
        // Get or create the deque for this topic-partition (one per partition)
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // Try to append to an existing batch
            // (the first attempt normally fails because no batch exists yet)
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null)
                return appendResult;
        }

        // we don't have an in-progress record batch try to allocate a new batch
        if (abortOnNewBatch) {
            // Return a result that will cause another call to append.
            return new RecordAppendResult(null, false, false, true);
        }

        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        // Take the larger of batch.size (default 16 KB) and the estimated record size
        // (bounded by max.request.size, default 1 MB); a single record may exceed the batch size.
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
        // Allocate memory from the buffer pool
        buffer = free.allocate(size, maxTimeToBlock);

        // Update the current time in case the buffer allocation blocked above.
        nowMs = time.milliseconds();
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new KafkaException("Producer closed while send in progress");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }

            // Wrap the allocated buffer in a records builder
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            // Wrap again into a ProducerBatch (the actual batch)
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
            FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                    callback, nowMs));

            // Add the batch to the end of the deque
            dq.addLast(batch);
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
        }
    } finally {
        // Release the memory if it was not handed off to a batch
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}
A closer look at the sender thread.
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
Step into the run() method of the sender thread.
// main loop, runs until close is called
while (running) {
    try {
        runOnce();
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}
/**
 * Run a single iteration of sending
 */
void runOnce() {
    if (transactionManager != null) {
        try {
            // Transaction-related bookkeeping
            transactionManager.maybeResolveSequences();

            // do not continue sending if the transaction manager is in a failed state
            if (transactionManager.hasFatalError()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, time.milliseconds());
                return;
            }

            // Check whether we need a new producerId. If so, we will enqueue an InitProducerId
            // request which will be sent below
            transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();

            if (maybeSendAndPollTransactionalRequest()) {
                return;
            }
        } catch (AuthenticationException e) {
            // This is already logged as error, but propagated here to perform any clean ups.
            log.trace("Authentication exception while processing transactional request", e);
            transactionManager.authenticationFailed(e);
        }
    }

    long currentTimeMs = time.milliseconds();
    // Send the accumulated data
    long pollTimeout = sendProducerData(currentTimeMs);
    // Poll for the send results
    client.poll(pollTimeout, currentTimeMs);
}
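The transactionManager branch only does real work for a transactional producer. For context, this is the user-facing transactional API that drives it; a sketch assuming transactional.id is set in the Properties:

    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-producer-01");
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    producer.initTransactions();
    producer.beginTransaction();
    try {
        producer.send(new ProducerRecord<>("first", "atguigu"));
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
    } finally {
        producer.close();
    }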
org.apache.kafka.clients.producer.internals.Sender#sendProducerData
private long sendProducerData(long now) {
    // Fetch the cluster metadata
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    // Check which partitions in the accumulator (the 32 MB buffer) have batches ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic, now);

        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }

    // create produce requests
    // Collate the batches headed for each broker node
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    addToInflightBatches(batches);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    accumulator.resetNextBatchExpiryTime();
    List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
    expiredBatches.addAll(expiredInflightBatches);

    // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
    // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
    // we need to reset the producer id here.
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
        failBatch(expiredBatch, new TimeoutException(errorMessage), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.
            transactionManager.markSequenceUnresolved(expiredBatch);
        }
    }
    sensors.updateProduceRequestMetrics(batches);

    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
    // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
    // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
    // that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
    pollTimeout = Math.max(pollTimeout, 0);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        pollTimeout = 0;
    }
    // Send the produce requests
    sendProduceRequests(batches, now);
    return pollTimeout;
}
org.apache.kafka.clients.producer.internals.RecordAccumulator#ready
/**
 * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
 * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
 * partition batches.
 * <p>
 * A destination node is ready to send data if:
 * <ol>
 * <li>There is at least one partition that is not backing off its send
 * <li><b>and</b> those partitions are not muted (to prevent reordering if
 *     {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
 *     is set to one)</li>
 * <li><b>and <i>any</i></b> of the following are true</li>
 * <ul>
 *     <li>The record set is full</li>
 *     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
 *     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
 *     are immediately considered ready).</li>
 *     <li>The accumulator has been closed</li>
 * </ul>
 * </ol>
 */
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        Deque<ProducerBatch> deque = entry.getValue();
        synchronized (deque) {
            // When producing to a large number of partitions, this path is hot and deques are often empty.
            // We check whether a batch exists first to avoid the more expensive checks whenever possible.
            ProducerBatch batch = deque.peekFirst();
            if (batch != null) {
                TopicPartition part = entry.getKey();
                Node leader = cluster.leaderFor(part);
                if (leader == null) {
                    // This is a partition for which leader is not known, but messages are available to send.
                    // Note that entries are currently not removed from batches when deque is empty.
                    unknownLeaderTopics.add(part.topic());
                } else if (!readyNodes.contains(leader) && !isMuted(part)) {
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    // backingOff is true when this batch has been tried before
                    // and has not yet waited out the retry backoff
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    // For a retry, wait retryBackoffMs; otherwise wait lingerMs
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    // The batch is large enough to send
                    boolean full = deque.size() > 1 || batch.isFull();
                    // The batch has waited long enough (timed out), so it must be sent too
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
                    boolean sendable = full
                            || expired
                            || exhausted
                            || closed
                            || flushInProgress()
                            || transactionCompleting;
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
org.apache.kafka.clients.producer.internals.RecordAccumulator#drain
/**
 * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
 * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
 *
 * @param cluster The current cluster metadata
 * @param nodes The list of node to drain
 * @param maxSize The maximum number of bytes to drain
 * @param now The current unix time in milliseconds
 * @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize.
 */
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    // Batches headed for the same broker node are packed into one request
    for (Node node : nodes) {
        List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
        batches.put(node.id(), ready);
    }
    return batches;
}
org.apache.kafka.clients.producer.internals.Sender#sendProduceRequests
/**
 * Transfer the record batches into a list of produce requests on a per-node basis
 */
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}
org.apache.kafka.clients.producer.internals.Sender#sendProduceRequest
/**
 * Create a produce request from the given record batches
 */
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;

    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

    // find the minimum magic version used when creating the record sets
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }
    ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();

        // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
        // that the producer starts building the batch and the time that we send the request, and we may have
        // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
        // the new message format, but found that the broker didn't support it, so we need to down-convert on the
        // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
        // not all support the same message format version. For example, if a partition migrates from a broker
        // which is supporting the new magic version to one which doesn't, then we will need to convert.
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
        ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
        if (tpData == null) {
            tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
            tpd.add(tpData);
        }
        tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
                .setIndex(tp.partition())
                .setRecords(records));
        recordsByPartition.put(tp, batch);
    }

    String transactionalId = null;
    if (transactionManager != null && transactionManager.isTransactional()) {
        transactionalId = transactionManager.transactionalId();
    }

    ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
            new ProduceRequestData()
                    .setAcks(acks)
                    .setTimeoutMs(timeout)
                    .setTransactionalId(transactionalId)
                    .setTopicData(tpd));
    RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());

    String nodeId = Integer.toString(destination);
    // Build the client request
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    // Send the request
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
org.apache.kafka.clients.NetworkClient#doSend(org.apache.kafka.clients.ClientRequest, boolean, long)
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    ensureActive();
    String nodeId = clientRequest.destination();
    if (!isInternalRequest) {
        // If this request came from outside the NetworkClient, validate
        // that we can send data. If the request is internal, we trust
        // that internal code has done this validation. Validation
        // will be slightly different for some internal requests (for
        // example, ApiVersionsRequests can be sent prior to being in
        // READY state.)
        if (!canSendRequest(nodeId, now))
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    }
    AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
    try {
        NodeApiVersions versionInfo = apiVersions.get(nodeId);
        short version;
        // Note: if versionInfo is null, we have no server version information. This would be
        // the case when sending the initial ApiVersionRequest which fetches the version
        // information itself. It is also the case when discoverBrokerVersions is set to false.
        if (versionInfo == null) {
            version = builder.latestAllowedVersion();
            if (discoverBrokerVersions && log.isTraceEnabled())
                log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                        "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
        } else {
            version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                    builder.latestAllowedVersion());
        }
        // The call to build may also throw UnsupportedVersionException, if there are essential
        // fields that cannot be represented in the chosen version.
        // Send the request with the negotiated API version
        doSend(clientRequest, isInternalRequest, now, builder.build(version));
    } catch (UnsupportedVersionException unsupportedVersionException) {
        // If the version is not supported, skip sending the request over the wire.
        // Instead, simply add it to the local queue of aborted requests.
        log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
                clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
        ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
                clientRequest.callback(), clientRequest.destination(), now, now,
                false, unsupportedVersionException, null, null);

        if (!isInternalRequest)
            abortedSends.add(clientResponse);
        else if (clientRequest.apiKey() == ApiKeys.METADATA)
            metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
    }
}
org.apache.kafka.clients.NetworkClient#doSend(org.apache.kafka.clients.ClientRequest, boolean, long, org.apache.kafka.common.requests.AbstractRequest)
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    if (log.isDebugEnabled()) {
        log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
                clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
    }
    Send send = request.toSend(header);
    InFlightRequest inFlightRequest = new InFlightRequest(
            clientRequest,
            header,
            isInternalRequest,
            request,
            send,
            now);
    // Track the request in inFlightRequests until a response arrives
    this.inFlightRequests.add(inFlightRequest);
    // Hand the bytes to the selector for sending
    selector.send(new NetworkSend(clientRequest.destination(), send));
}
Receiving the broker's response.
client.poll(pollTimeout, currentTimeMs);
/**
 * Do actual reads and writes to sockets.
 *
 * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
 *                must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
 *                metadata timeout
 * @param now The current time in milliseconds
 * @return The list of responses received
 */
@Override
public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();

    if (!abortedSends.isEmpty()) {
        // If there are aborted sends because of unsupported version exceptions or disconnects,
        // handle them immediately without waiting for Selector#poll.
        List<ClientResponse> responses = new ArrayList<>();
        handleAbortedSends(responses);
        completeResponses(responses);
        return responses;
    }

    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // Collect the responses that came back for the requests sent above
    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutConnections(responses, updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    completeResponses(responses);

    return responses;
}