以用户下单购买商品的行为举例,在使用微服务架构时,我们需要调用多个服务。传统的调用方式是同步调用,这会存在一定的性能问题
使用消息队列可以实现异步的通信方式,相比于同步的通信⽅式,异步的⽅式可以让上游快速成功,极大提高系统的吞吐量。在分布式系统中,通过下游多个服务的分布式事务的保障,也能保障业务执行之后的最终⼀致性
Kafka 是⼀个分布式的、⽀持分区的(partition)、多副本的 (replica),基于 zookeeper 协调的分布式消息系统,它最大的特性就是可以实时处理大量数据以满足各类需求场景:
名称 | 解释 |
---|---|
Broker | 消息中间件处理节点,⼀个 Kafka 节点就是⼀个 broker,⼀个或者多个 Broker 可以组成⼀个 Kafka 集群 |
Topic | Kafka 根据 topic 对消息进行归类,发布到 Kafka 集群的每条消息都需要指定⼀个 topic |
Producer | 消息生产者,向 Broker 发送消息的客户端 |
Consumer | 消息消费者,从 Broker 读取消息的客户端 |
ConsumerGroup | 每个 Consumer 属于⼀个特定的 Consumer Group,⼀条消息可以被多个不同的 Consumer Group 消费,但是⼀个 Consumer Group 中只能有⼀个 Consumer 能够消费该消息 |
Partition | 物理上的概念,⼀个 topic 可以分为多个 partition,每个 partition 内部消息是有序的 |
安装 Kafka 之前需要先安装 JDK 和 Zookeeper,在官网下载 Kafka 安装包:http://kafka.apache.org/downloads,直接解压即可
需要修改配置文件,进⼊到 config 目录内,修改 server.properties
# broker.id 属性在 kafka 集群中必须唯一 broker.id= 0 # kafka 部署的机器 ip 和提供服务的端口号 listeners=PLAINTEXT://192.168.65.60:9092 # kafka 的消息存储文件 log.dir=/usr/local/data/kafka-logs # kafka 连接 zookeeper 的地址 zookeeper.connect= 192.168.65.60:2181
server.properties 核心配置详解:
Property | Default | Description |
---|---|---|
broker.id | 0 | 每个 broker 都可以用⼀个唯⼀的非负整数 id 进行标识,作为 broker 的 名字 |
log.dirs | /tmp/kafka-logs | kafka 存放数据的路径,这个路径并不是唯⼀的,可以是多个,路径之间只需要使⽤逗号分隔即可;每当创建新 partition 时,都会选择在包含最少 partitions 的路径下进行 |
listeners | PLAINTEXT://192.168.65.60:9092 | server 接受客户端连接的端⼝,ip 配置 kafka 本机 ip 即可 |
zookeeper.connect | localhost:2181 | zooKeeper 连接字符串的格式为:hostname:port,此处 hostname 和 port 分别是 ZooKeeper 集群中某个节点的 host 和 port;zookeeper 如果是集群,连接⽅式为 hostname1:port1,hostname2:port2,hostname3:port3 |
log.retention.hours | 168 | 每个日志文件删除之前保存的时间,默认数据保存时间对所有 topic 都⼀样 |
num.partitions | 1 | 创建 topic 的默认分区数 |
default.replication.factor | 1 | ⾃动创建 topic 的默认副本数量,建议设置为⼤于等于 2 |
min.insync.replicas | 1 | 当 producer 设置 acks 为 -1 时,min.insync.replicas 指定 replicas 的最小数目(必须确认每⼀个 repica 的写数据都是成功的),如果这个数目没有达到,producer 发送消息会产生异常 |
delete.topic.enable | false | 是否允许删除主题 |
进入到 bin 目录下,使用命令来启动
./kafka-server-start.sh -daemon../config/server.properties
验证是否启动成功:进入到 zk 中的节点看 id 是 0 的 broker 有没有存在(上线)
ls /brokers/ids/
topic 可以实现消息的分类,不同消费者订阅不同的 topic
执行以下命令创建名为 test
的 topic,这个 topic 只有一个 partition,并且备份因子也设置为 1
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 1 --partitions 1 --topic test
查看当前 kafka 内有哪些 topic
./kafka-topics.sh --list --zookeeper 172.16.253.35:2181
把消息发送给 broker 中的某个 topic,打开⼀个 kafka 发送消息的客户端,然后开始⽤客户端向 kafka 服务器发送消息
kafka 自带了一个 producer 命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到 kafka 集群中。在默认情况下,每一个行会被当做成一个独立的消息
./kafka-console-producer.sh --broker-list 172.16.253.38:9092 --topic test
对于 consumer,kafka 同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息。使用 kafka 的消费者客户端,从指定 kafka 服务器的指定 topic 中消费消息
方式一:从最后一条消息的 偏移量+1 开始消费
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --topic test
方式二:从头开始消费
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --from-beginning --topic test
消息的发送方会把消息发送到 broker 中,broker 会存储消息,消息是按照发送的顺序进行存储。因此消费者在消费消息时可以指明主题中消息的偏移量。默认情况下,是从最后一个消息的下一个偏移量开始消费
一个消费组里只有一个消费者能消费到某一个 topic 中的消息,可以创建多个消费者,这些消费者在同一个消费组中
./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup --topic test
在一些业务场景中需要让一条消息被多个消费者消费,那么就可以使用多播模式。kafka 实现多播,只需要让不同的消费者处于不同的消费组即可
./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup1 --topic test ./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup2 --topic test
# 查看当前主题下有哪些消费组 ./kafka-consumer-groups.sh --bootstrap-server 10.31.167.10:9092 --list # 查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量 ./kafka-consumer-groups.sh --bootstrap-server 172.16.253.38:9092 --describe --group testGroup
生产者将消息发送给 broker,broker 会将消息保存在本地的日志文件中
/usr/local/kafka/data/kafka-logs/主题-分区/00000000.log
消息的保存是有序的,通过 offset 偏移量来描述消息的有序性
消费者消费消息时也是通过 offset 来描述当前要消费的那条消息的位置
主题 Topic 在 kafka 中是⼀个逻辑概念,kafka 通过 topic 将消息进行分类。不同的 topic 会被订阅该 topic 的消费者消费。但是有⼀个问题,如果说这个 topic 的消息非常多,消息是会被保存到 log 日志文件中的,这会出现文件过大的问题,因此,kafka 提出了 Partition 分区的概念
通过 partition 将⼀个 topic 中的消息分区来存储,这样的好处有多个:
为⼀个主题创建多个分区
./kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --topic test1
通以下命令查看 topic 的分区信息
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1
分区的作用:
了解了 Partition,再补充一个 Kafka 细节:在消息日志文件中,kafka 内部创建了 __consumer_offsets 主题包含了 50 个分区。这个主题用来存放消费者某个主题的偏移量,每个消费者会把消费的主题的偏移量自主上报给 kafka 中的默认主题:consumer_offsets。因此 kafka 为了提升这个主题的并发性,默认设置了 50 个分区
hash(consumerGroupId) % __consumer_offsets 主题的分区数
创建三个 server.properties 文件
# 0 1 2 broker.id=2 # 9092 9093 9094 listeners=PLAINTEXT://192.168.65.60:9094 # kafka-logs kafka-logs-1 kafka-logs-2 log.dir=/usr/local/data/kafka-logs-2
通过命令启动三台 broker
./kafka-server-start.sh -daemon../config/server0.properties ./kafka-server-start.sh -daemon../config/server1.properties ./kafka-server-start.sh -daemon../config/server2.properties
搭建完后通过查看 zk 中的 /brokers/ids 看是否启动成功
下面的命令,在创建主题时,除了指明了主题的分区数以外,还指明了副本数,分别是:一个主题,两个分区、三个副本
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
通过查看主题信息,其中的关键数据:
replicas:当前副本所存在的 broker 节点
leader:副本里的概念
follower:leader 处理所有针对这个 partition 的读写请求,而 follower 被动复制 leader,不提供读写(主要是为了保证多副本数据与消费的一致性),如果 leader 所在的 broker 挂掉,那么就会进行新 leader 的选举
isr:可以同步的 broker 节点和已同步的 broker 节点,存放在 isr 集合中
Kafka 集群中由多个 broker 组成,⼀个 broker 存放⼀个 topic 的不同 partition 以及它们的副本
./kafka-console-producer.sh --broker-list 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --topic my-replicated-topic
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --topic my-replicated-topic
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency>
/** * 消息的发送方 */ public class MyProducer { private final static String TOPIC_NAME = "my-replicated-topic"; public static void main(String[] args) throws ExecutionException, InterruptedException { // 1.设置参数 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094"); // 把发送的 key 从字符串序列化为字节数组 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 把发送消息 value 从字符串序列化为字节数组 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 2.创建⽣产消息的客户端,传⼊参数 Producer<String, String> producer = new KafkaProducer<String, String>(props); // 3.创建消息 // key: 作⽤是决定了往哪个分区上发 // value: 具体要发送的消息内容 ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "mykeyvalue", "hellokafka"); // 4.发送消息,得到消息发送的元数据并输出 RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("同步⽅式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); } }
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getOrderId().toString(), JSON.toJSONString(order));
如果未指定分区,则会通过业务 Key 的 hash 运算,得出要发送的分区,公式为:hash(key)%partitionNum
⽣产者同步发消息,在收到 kafka 的 ack 告知发送成功之前将⼀直处于阻塞状态
// 等待消息发送成功的同步阻塞方法 RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("同步方式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" +metadata.offset());
异步发送,⽣产者发送完消息后就可以执⾏之后的业务,broker 在收到消息后异步调用生产者提供的 callback 回调方法
// 指定发送分区 ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getOrderId().toString(),JSON.toJSONString(order)); // 异步回调方式发送消息 producer.send(producerRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.err.println("发送消息失败:" + exception.getStackTrace()); } if (metadata != null) { System.out.println("异步方式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset()); } } });
在同步发送的前提下,生产者在获得集群返回的 ack 之前会⼀直阻塞,那么集群什么时候返回 ack 呢?此时 ack 有三个配置:
min.insync.replicas
(默认为 1 ,推荐配置大于等于2)这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证,一般是金融级别,或跟钱打交道的场景才会使用这种配置props.put(ProducerConfig.ACKS_CONFIG, "1"); // 发送失败,默认会重试三次,每次间隔 100ms props.put(ProducerConfig.RETRIES_CONFIG, 3); props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100)
kafka 默认会创建⼀个消息缓冲区,用来存放要发送的消息,缓冲区是 32m
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
kafka 本地线程会在缓冲区中⼀次拉 16k 的数据,发送到 broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
如果线程拉不到 16k 的数据,间隔 10ms 也会将已拉到的数据发到 broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
public class MySimpleConsumer { private final static String TOPIC_NAME = "my-replicated-topic"; private final static String CONSUMER_GROUP_NAME = "testGroup"; public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094"); // 消费分组名 props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 1.创建⼀个消费者的客户端 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); // 2.消费者订阅主题列表 consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { /* * 3. poll() API 是拉取消息的⻓轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { // 4.打印消息 System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } } } }
无论是自动提交还是手动提交,都需要把所属的 消费组 + 消费的某个主题 + 消费的某个分区 + 消费的偏移量
提交到集群的 _consumer_offsets 主题里面
自动提交:消费者 poll 消息下来以后自动提交 offset
// 是否自动提交 offset,默认就是 true props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交 offset 的间隔时间 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
注意:如果消费者还没消费完 poll 下来的消息就自动提交了偏移量,此时消费者挂了,于是下⼀个消费者会从已提交的 offset 的下⼀个位置开始消费消息,之前未被消费的消息就丢失掉了
手动提交:需要把自动提交的配置改成 false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
手动提交又分成了两种:
手动同步提交
在消费完消息后调用同步提交的方法,当集群返回 ack 前⼀直阻塞,返回 ack 后表示提交成功,执行之后的逻辑
while (true) { /* * poll() API 是拉取消息的⻓轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value()); } // 所有的消息已消费完 if (records.count() > 0) { // 有消息 // ⼿动同步提交 offset, 当前线程会阻塞直到 offset 提交成功 // ⼀般使⽤同步提交, 因为提交之后⼀般也没有什么逻辑代码了 consumer.commitSync(); // ====阻塞=== 提交成功 } }
手动异步提交
在消息消费完后提交,不需要等到集群 ack,直接执行之后的逻辑,可以设置⼀个回调方法,供集群调用
while (true) { /* * poll() API 是拉取消息的⻓轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } // 所有的消息已消费完 if (records.count() > 0) { // 手动异步提交 offset,当前线程提交 offset 不会阻塞,可以继续处理后⾯的程序逻辑 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { System.err.println("Commit failed for " + offsets); System.err.println("Commit failed exception: " + exception.getStackTrace()); } } }); } }
消费者建立与 broker 之间的长连接,开始 poll 消息,默认⼀次 poll 五百条消息
// ⼀次 poll 最⼤拉取消息的条数,可以根据消费速度的快慢来设置 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500)
可以根据消费速度的快慢来设置,如果两次 poll 的时间超出了 30s 的时间间隔,kafka 会认为其消费能力过弱,将其踢出消费组,将分区分配给其他消费者
代码中设置了长轮询的时间是 1000 毫秒
while (true) { /* * poll() API 是拉取消息的⻓轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } }
消费者每隔 1s 向 Kafka 集群发送心跳,集群发现如果有超过 10s 没有续约的消费者,将被踢出消费组,触发该消费组的 rebalance 机制,将该分区交给消费组里的其他消费者进行消费
// consumer 给 broker 发送⼼跳的间隔时间 props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); // kafka 如果超过 10 秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000)
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
也即从头开始消费消息
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
根据时间,去所有的 partition 中确定该时间对应的 offset,然后去所有的 partition 中找到该 offset 之后的消息开始消费
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME); // 从一小时前开始消费 long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; Map<TopicPartition, Long> map = new HashMap<>(); for (PartitionInfo par : topicPartitions) { map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime); } Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map); for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) { TopicPartition key = entry.getKey(); OffsetAndTimestamp value = entry.getValue(); if (key == null || value == null) { continue; } // 根据消费⾥的 timestamp 确定 offset Long offset = value.offset(); System.out.println("partition-" + key.partition() + "|offset-" + offset); if (value != null) { consumer.assign(Arrays.asList(key)); consumer.seek(key, offset); } }
新消费组中的消费者在启动以后,默认会从当前分区的最后⼀条消息的 offset+1 开始消费(消费新消息),可以通过以下的设置,让新的消费者第⼀次从头开始消费,之后开始消费新消息(最后消费的位置的偏移量 +1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
server: port: 8080 spring: kafka: bootstrap-servers: 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 producer: # ⽣产者 retries: 3 # 设置⼤于0的值,则客户端会将发送失败的记录重新发送 batch-size: 16384 buffer-memory: 33554432 acks: 1 # 指定消息key和消息体的编解码⽅式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 500 listener: # 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交 # RECORD # 当每⼀批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后提交 # BATCH # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交 # TIME # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交 # COUNT # TIME | COUNT 有⼀个条件满⾜时提交 # COUNT_TIME # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤Acknowledgment.acknowledge()后提交 # MANUAL # 手动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种 # MANUAL_IMMEDIATE ack-mode: MANUAL_IMMEDIATE redis: host: 172.16.253.21
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/msg") public class MyKafkaController { private final static String TOPIC_NAME = "my-replicated-topic"; @Autowired private KafkaTemplate<String,String> kafkaTemplate; @RequestMapping("/send") public String sendMessage(){ kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!"); return "send success!"; } }
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @Component public class MyConsumer { @KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1") public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); // 手动提交offset ack.acknowledge(); } }
配置消费主题、分区和偏移量
@KafkaListener(groupId = "testGroup", topicPartitions = { @TopicPartition(topic = "topic1", partitions = {"0", "1"}), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))},concurrency = "3") // concurrency 就是同组下的消费者个数,就是并发消费数,建议⼩于等于分区总数 public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); // 手动提交offset ack.acknowledge(); }
Kafka 集群中的 broker 在 zookeeper 中创建临时序号节点,序号最小的节点(最先创建的节点)将作为集群的 controller,负责管理整个集群中的所有分区和副本的状态:
如果消费者没有指明分区消费,那么当消费组里消费者和分区的关系发生变化,就会触发 rebalance 机制,重新调整消费者该消费哪个分区
在触发 rebalance 机制之前,消费者消费哪个分区有三种分配策略:
(分区总数/消费者数量)+1
,之后的消费者是 分区总数/消费者数量
LEO 是某个副本最后消息的消息位置(log-end-offset),HW 是已完成同步的位置。消息在写入 broker 时,且每个 broker 完成这条消息的同步后,HW 才会变化。在这之前,消费者是消费不到这条消息的,在同步完成之后,HW 更新之后,消费者才能消费到这条消息,这样的目的是防止消息的丢失
生产者:
消费者:
如果生产者发送完消息后,却因为网络抖动,没有收到 ack,但实际上 broker 已经收到了。此时生产者会进行重试,于是 broker 就会收到多条相同的消息,而造成消费者的重复消费
解决方案:
生产者:使用同步发送,ack 设置成非 0 的值
消费者:主题只能设置⼀个分区,消费组中只能有⼀个消费者
所谓消息积压,就是消息的消费者的消费速度远赶不上生产者的生产消息的速度,导致 kafka 中有大量的数据没有被消费。随着没有被消费的数据堆积越多,消费者寻址的性能会越来越差,最后导致整个 kafka 对外提供的服务的性能很差,从而造成其他服务也访问速度变慢,造成服务雪崩
解决方案:
假设一个应用场景:订单创建后,超过 30 分钟没有支付,则需要取消订单,这种场景可以通过延时队列来实现,实现方案如下: