大家都知道, kafka的客户端是重构过一版的. 之前0.8的producer和consumer是使用scala开发的,后来因为各种原因, 实在是改不动了. 到了0.9版本的时候,使用java重构了kafka的客户端.
虽然现在java版的客户端还在广泛使用,而且没有什么太大的性能问题. 但是根据我这些天对kafka客户端的api的研究, 我总觉得, 总有一天, kafka的客户端还得来一次彻底的重构.因为什么呢? 因为实在是太–乱--啦:
1\. 多版本问题. 每个api都有好几个版本, 但是每个api使用的版本都不一致. 举个例子, 在kafka-client 1.0.0中,broker的版本是2.3.0时: METADATA(拉取topic元数据)的api有1个version, 当前使用版本是1. PRODUCE(生产消息)的api有6个version, 当前使用版本是6 FETCH(拉取消息)的api有5个version, 当前使用版本是5 2\. 报文的数据结构巨复杂 等下实现生产消息的报文的时候,你们会看到,这个报文嵌套了6层,即有6个子结构体.
以上我对kafka api的小小吐槽. 当然也可能是我水平不够,未能理解到它这么设计的用意和深意~~
kafka一直在不断地优化自身, 因此它的消息格式也是一直在变.
在<<Apache kafka实战 (胡夕著)>> 一书中(基于kafka 1.0.0), 作者介绍了到目前为止, 一共有3种消息格式V0,V1,V2. 其中V0和V1由于各种弊端, 早就逐渐的被淘汰了. 现在新版kafka使用的都是V2版本的消息格式. 本文就是在kafka2.3.0上实现的, 使用V2格式的消息能测试通过.
因此这里介绍的消息格式都是V2版本的.
在开始介绍kafka的消息格式之前, 大家还要理解一个概念: 可变长度. 常规的长度字段要么就是使用4个字节,要么就是使用8个字节来表示, 总之这个字段使用的字节数一般都是固定的. 但是在kafka的v2版本的消息里就不一样了. 它参考了Zig-zag的编码方式, 可以使用不同长度的字段来表示不同的数值. 简单来说就是这样: 用 0 来表示 0 用 1 来表示 1 用 2 来表示 -1 用 3 来表示 2 用 4 来表示 -2 ..... 这样的好处就是可以用比较少的字节数来表示绝对值比较小的数字, 不用每个数字都占用4个或8个字节, 从而可以节省很大的空间
了解了"可变长度"这个概念后就可以来看kafka的v2版本的消息格式了. 如下图(截图自<<Apache kafka实战 (胡夕著)>> 一书):
我们来一个一个了解这些字段
1\. 消息总长度. 顾名思义, 就是这条消息的总长度啦. 用的是Zig-zag编码表示 2\. 属性. 一个字节表示(8位), 其中第三位用来表示压缩方式.高5位保留,没有用到 由于我这里的实现没有用到压缩,所以这个字段总是0 3\. 时间戳增量.也是用Zig-zag编码. 所谓增量, 是指针对该消息batch的第一条消息的时间戳的增量. 消息batch接下来会介绍. 4\. 位移增量. 跟时间戳增量含义差不多 5\. key length. 每条kafka消息都可以有key, 这个就表示key的字节数 6\. key. 这个字段就是kafka消息里面的key. 7\. value size. 更key length含义差不多 8 value. 就是kafka消息的内容 9\. header个数. kafka消息也可以带有header 10\. header. kafka的header
看到第3和第4个字段是不是有点一脸懵?没关系, 继续往下看你就明白了.
kafka发送消息的时候并不是有一条发送一条的, 而是把多条消息集中在一起, 然后再一并发送的. 这就是所谓的kafka 消息batch.
而且这消息batch发送到kafka的broker之后, 它也同样不会拆开, 而是原封不动地把这个消息batch发给消费者,或存储到日志文件中.
因此理解这个消息batch对我们实现发送消息和消费消息都是必要的.
消息batch的格式如下图所示:
是不是一下子有点奔溃, 一下子冒出了这么多的字段. 没得办法, 我们再来一个个地看.
首先最后的"消息"就是上面介绍的v2版本格式的消息,可能会有x个, x就是倒数第二个字段"消息个数".
剩下的字段:
1\. 起始位移 最后面的"消息"中第一条消息的位移offset 2\. 长度 表示接下来的报文的长度, 即"消息batch的总长度" - 8Byte(起始位移字段) - 4Byte(长度字段) 3\. 分区leader版本号 我这里的实现写死为-1 4\. 版本号 就是magic. 我们这里是V2,所以是2 5\. CRC 是指接下来的所有字段的CRC码 6\. 属性 跟上面消息中的属性的含义一致 7\. 最大位移增量 就是最后一条消息的"位置增量"的值 8\. 起始时间戳 就是第一条消息的时间戳 9\. 最大时间戳 最后一条消息的时间戳 10\. 后面三个pid epoch, seq三个字段都是跟事务等相关的,我们这里没有用到, 所以都写死为-1
这里的"消息"和"消息batch"我在代码中定义的bean分别是KafkaMsgRecordV2和KafkaMsgRecordBatch. 如果看上面的文字和图片确实不好理解的话, 可以跟着代码看, 或者可以理解得更加深刻. 代码请见文末的github地址.
当然如果你理解了这一段, 那很好.不过也别开心太早.因为上面说了, kafak发送消息的数据结构嵌套了6层, 而富贵论坛才两层. 也就是还有4层等着我们去理解. 当然, 那4层相对是比较简单的. 最难理解的部分已经过去了
kafka每个api的请求都必须带有一个请求的header, 而每个api的响应体中也都带有一个响应的header.requestHeader和responseHeader分别如图所示:
响应的header比较简单, 就是一个correlationId,这个id其实是客户端发送给服务端, 服务端原封不动的返回了. 作用跟zookeeper的xid一样.
我们来看看requestHeader
public enum ApiKeys { /** * 发送消息 */ PRODUCE(0, "Produce", (short) 5), /** * fetch 消息 */ FETCH(1, "Fetch", (short)6), /** * 拉取元数据 */ METADATA(3, "Metadata", (short) 1); public final short id; public final String name; public short apiVersion; ApiKeys(int id, String name, short apiVersion) { this.id = (short) id; this.name = name; this.apiVersion = apiVersion; } }
代码中的id字段就是apiKey, apiVersion对应的就是header中的apiVersion. 正如我们开头吐槽的一样, 每个api的版本都是不一样的. 在这次实现里, 我只实现了3个api. 但实际上kafka提供十几个api.
关联性Id和zkClient中的xid作用是一样的, 主要是把请求和响应对应起来. kafka的响应报文中会包含这个字段.
不管是kafka生产者还是消费者, 都需要指定一个clientId. 在官方的客户端中,如果我们不指定的话, 也会自动生成一个clientId.
最后值得一提的是, 这里的clientIdLen是用两个字节表示的. kafka里面都是用2个字节表示字符串长度的. 这个跟zookeeper里面是不一样的.
生产者的逻辑实现在KafkaClient的send方法:
public ProduceResponse send(KafkaProduceConfig config, String topic , String key, String val)
正如上面一直提到的, 生产者的请求报文一共嵌套了6层, 具体表现为:
1\. ProduceRequest继承KafkaRequestHeader, 持有TopicProduceData对象 2\. TopicProduceData 持有PartitionData对象 3\. PartitionData持有Record对象 4\. Record持有KafkaMsgRecordBatch对象 5\. KafkaMsgRecordBatch持有KafkaMsgRecordV2对象
可以看到, 其实是以"broker信息" => “topic信息” =>“分区信息” => “记录信息” => “消息batch” => "消息"等层次逐渐包装的.
报文的的字段和图示这里就不再给出了,有兴趣的同学可以跟一下代码, 直接从序列化入手, 就可以理解kafka生产者的通讯协议了, 大体逻辑如下所示:
- ProduceRequest.serializable() - KafkaRequestHeader.serializable() - TopicProduceData.serializable() - PartitionData.serializable() - Record.serializable() - KafkaMsgRecordBatch.serializable() - KafkaMsgRecordV2.serializable()
经过上面的一系列serializable, 最终把一个ProduceRequest对象转换成一个ByteBuf,发往kafka的broker, 一条消息就成功的产生了.
生产者的逻辑实现在KafkaClient的poll方法:
public Map<Integer, List<ConsumerRecord>> poll(KafkaConsumerConfig consumerConfig, String topic, int partition, long fetchOffset)
相对于生产者来说, 消费者的请求报文相对简单,也是一个从"broker配置"=>“topic信息” => "分区信息"的包装过程
如下所示:
1\. FetchRequest 继承KafkaRequestHeader, 持有FetchTopicRequest对象 2\. FetchTopicRequest持有FetchTopicPartitionRequest对象
然而, 消费者的响应体就相对比生产者的响应体要复杂的多了.
因为上面说过, 生产者发送broker的"消息batch", broker是不会把它解析成具体的消息的. 而且原封不动地把它保存到日志中去, 从而也是原封不动的被消费者消费到. 因此这个解析消息的工作自然而然地就落到了消费者的肩上.
具体请参见KafkaClient#parseResp()方法
和之前的ZkClient和RedisClient一样, 这里也同样实现了一个kafkaClientTest,方便体验和调试.
这次针对了几种场景进行测试:
生产消息:
private final static String host = "localhost"; private final static int port = 9092; private final static String topic = "testTopic1"; @Test public void testProducer(){ KafkaClient kafkaClient = new KafkaClient("producer-111", host, port); KafkaProduceConfig kafkaConfig = new KafkaProduceConfig(); // 注意这里设置为0时, broker不会响应任何数据, 但是消息实际上是发送到broker了的 short ack = -1; kafkaConfig.setAck(ack); kafkaConfig.setTimeout(30000); ProduceResponse response = kafkaClient.send(kafkaConfig, topic,"testKey","helloWorld1113"); assert ack == 0 || response != null; System.out.println(new Gson().toJson(response)); }
可以在控制台看到消息被消费了:
lhhMacBook-Air:bin$ sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic1 helloWorld1113
private final static String host = "localhost"; private final static int port = 9092; private final static String topic = "testTopic1"; @Test public void testConsumer(){ // 如果broker上不存在这个topic的话, 直接消费可能会报错, 可以fetch一下metadata, 或先生产消息 // testMetaData(); // testProducer(); KafkaClient kafkaClient = new KafkaClient("consumer-111", host, port); KafkaConsumerConfig consumerConfig = new KafkaConsumerConfig(); consumerConfig.setMaxBytes(Integer.MAX_VALUE); consumerConfig.setMaxWaitTime(30000); consumerConfig.setMinBytes(1); Map<Integer, List<ConsumerRecord>> response = kafkaClient.poll(consumerConfig, topic, 0, 0L); assert response != null && response.size() > 0; Set<Map.Entry<Integer, List<ConsumerRecord>>> entrySet =response.entrySet(); for(Map.Entry<Integer, List<ConsumerRecord>> entry : entrySet){ Integer partition = entry.getKey(); System.out.println("partition" + partition + "的数据:"); for(ConsumerRecord consumerRecord : entry.getValue()){ System.out.println(new Gson().toJson(consumerRecord)); } } }
控制台打印出刚刚生产的消息(包含了之前测试的消息), 说明消费成功:
partition0的数据: {"offset":0,"timeStamp":1573896186007,"key":"testKey","val":"helloWorld"} {"offset":1,"timeStamp":1573896202787,"key":"testKey","val":"helloWorld"} {"offset":2,"timeStamp":1573896309808,"key":"testKey","val":"helloWorld111"} {"offset":3,"timeStamp":1573899639313,"key":"testKey","val":"helloWorld1113"} {"offset":4,"timeStamp":1574011584095,"key":"testKey","val":"helloWorld1113"}
生产消息:
lhhMacBook-Air:bin$ sh kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic222 >hi >h
消费消息输出, 说明消费成功
partition0的数据: {"offset":0,"timeStamp":1574012251856,"val":"hi"} {"offset":1,"timeStamp":1574012270368,"val":"h"}