消息队列MQ

Kafka生产者源码初识

本文主要是介绍Kafka生产者源码初识,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Kafka生产者整体架构

在这里插入图片描述

  1. 整个生产者客户端主要有两个线程:主线程和Sender线程
  2. RecordAccumulator用来缓存消息,如果创建消息的速度过快,超过sender发给Kafka服务器的速度,会导致缓存空间不足

实现上述架构图的源码

  1. 在初始化生产者时,会初始化一个Sender线程并启动(下截图为KafkaProducer构造方法)
    在这里插入图片描述
  2. 在发送前先经过拦截器过滤
    在这里插入图片描述
  3. 序列化消息、选择分区以及添加消息累加器
    在这里插入图片描述
    3.1 在选择分区时,需要注意分区信息是通过元数据中获取(下面为详细分析)

分区消息源码分析

  1. 根据主题topic查询对应的分区,初始化时没有对应的分区,需要sender线程获取到后才会进行填充
    在这里插入图片描述
  2. 没有获取到分区信息,需要等待更新
    在这里插入图片描述
    在这里插入图片描述
    2.1 通过上图发现,需要等待updateVersion发生变化后才会结束等待,否则直到超时

Sender获取元数据

在这里插入图片描述

  1. KafkaProducer通过Sender进行相应的IO操作,而Sender又调用NetworkClient进行IO操作,NetworkClient底层是对Java NIO进行相应的封装
    在这里插入图片描述
  2. Sender的整体流程
    2.1 连接
    (1)通过SocketChannel连接节点
    (2)将soecketChannel向Selector注册连接事件SelectionKey.OP_CONNECT
    (3)为连接事件Key附属信息KafkaChannel
    2.2 循环Selector上的可读的key
    2.3 读取Selector上的可读的key
    (1)将数据从socketChannel读到ByteBuffer中
    (2)解析响应ByteBuffe
  3. 处理元数据响应:NetworkClient#handleCompletedMetadataResponse
    3.1 更新版本号:this.updateVersion += 1;
    3.2 将响应的主题的分区信息放在Cluster属性中

Sender的整体流程伪代码

连接

SocketChannel socketChannel = SocketChannel.open();
1. 配置SocketChannel 
socketChannel.configureBlocking(false);
Socket socket = socketChannel.socket();
socket.setKeepAlive(true);
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
    socket.setSendBufferSize(sendBufferSize);
if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
    socket.setReceiveBufferSize(receiveBufferSize);
socket.setTcpNoDelay(true);
2. 连接node
channel.connect(address);
3. 注册事件
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);//构造自定义附属信息
key.attach(channel);//附属key信息

轮询可读Key

int numReadyKeys = this.nioSelector.select(timeoutMs);
if (numReadyKeys > 0){
	Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
	 for (SelectionKey key:readyKeys){
			KafkaChannel channel =  (KafkaChannel) key.attachment();
			if (channel.ready()){
				 while ((networkReceive = channel.read()) != null) {
				 	addToStagedReceives(channel, networkReceive);
				 }
			}
	}
}
这篇关于Kafka生产者源码初识的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!