之前也学习过消息队列,但一直没有使用的场景,今天项目中遇到了 kafka 那便有了应用场景
Kafka 是一个分布式、支持分区,多副本的基于 zookeeper 的消息队列。使用消息队列,是应用 A 将要处理的信息发送到消息队列然后继续下面的任务,需要该信息的应用 B 从消息队列里面获取信息再做处理,这样做像是多此一举,应用 A 直接发信息给应用 B 不就可以了吗?存在即合理,使用消息队列其作用如下:
之前 笔者也写过 RabbitMQ 的笔记,传送门
结合 kafka 的下面这些名词来解释其模型会更加容易理解
名称 | 解释 |
---|---|
Broker | kafka 的实例,部署多台 kafka 就是有多个 broker |
Topic | 消息订阅的话题,是这些消息的分类,类似于消息订阅的频道 |
Producer | 生产者,负责往 kafka 发送消息 |
Consumer | 消费者,从 kafka 读取消息来进行消费 |
kafka 和依赖的 zookeeper 是 java 编写的工具,其需要 jdk8 及其以上。笔者这里使用 Docker 安装,偷懒了贪图方便快捷
# 使用 wurstmeister 制作的镜像 docker pull wurstmeister/zookeeper docker pull wurstmeister/kafka # 启动 zookeeper docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper # 单机启动 kafka docker run -d --name kafka -p 9092:9092 \ -e KAFKA_BROKER_ID=0 \ -e KAFKA_ZOOKEEPER_CONNECT=xxx.xxx.xxx.xxx:2181 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxx.xxx.xxx.xxx:9092 \ -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
kafka 官网也有很好的介绍,quickstart
# 进入kafka容器 docker exec -it kafka /bin/sh # 进入 bin 目录 cd /opt/kafka_2.13-2.8.1/bin # partitions 分区 # replication 副本因子 # 创建一个主题(参数不懂可直接填写,后面会讲解说明) ./kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092 # 查看 ./kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092 # 写入 topic(回车表示一条消息,ctrl + c 结束输入) # 消息默认存储 7 天,下一步的消费可以验证 ./kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 This is my first event This is my second event # 读取 topic(运行多次可以读取消息,因为默认存储 7 天) ./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092 ./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
SpringBoot 集成了 Kafka,添加依赖后可使用内置的 KafkaTemplate 模板方法来操作 kafka 消息队列
<!-- sprinboot版本管理中有kafka可不写版本号 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
server: port: 8080 spring: # 消息队列 kafka: producer: # broker地址,重试次数,确认接收个数,消息的编解码方式 bootstrap-servers: 101.200.197.22:9092 retries: 3 acks: 1 key-serializer: org.springframework.kafka.support.serializer.StringSerializer value-serializer: org.springframework.kafka.support.serializer.StringSerializer consumer: # broker地址,自动提交,分区offset设置 bootstrap-servers: 101.200.197.22:9092 enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
@RestController @RequestMapping("/kafka") public class Producer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; @GetMapping("/producer1") public String sendMessage1(@RequestParam(value = "message", defaultValue = "123") String message) throws ExecutionException, InterruptedException { ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic1", message); SendResult<String, Object> sendResult = future.get(); return sendResult.toString(); } @GetMapping("/producer2") public String sendMessage2(@RequestParam(value = "message", defaultValue = "123") String message) throws ExecutionException, InterruptedException { ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic1", message); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable ex) { System.out.println("faile"); } @Override public void onSuccess(SendResult<String, Object> result) { System.out.println("success"); } }); return ""; } }
@Component public class Consumer { @KafkaListener(topics = {"topic1"}) public void onMessage(ConsumerRecord<?, ?> record) { System.out.println(record.value()); } }
kafka |____kafka-logs |____topic1 | |____00000000000000000000.log(存储接收的消息) | |____consumer_offsets-01(消费者偏移量) | |____consumer_offsets-02 |____topic2 |____00000000000000000000.log |____consumer_offsets-01 |____consumer_offsets-02
每台 broker 实例接收到消息后将之存储到 00000.log 里面,保存的方式是先入先出。消息被消费后不会被删除,相反可以设置 topic 的消息保留时间,重要的是 Kafka 的性能在数据大小方面实际上是恒定的,因此长时间存储数据是完全没问题的
消费者会将自己消费偏移量 offset 提交给 topic 在 _consumer_offsets 里面保存,然后通过偏移量来确定消息的位置,默认从上次消费的位置开始,添加参数 --frombeginning 则从头开始消费,可获取之前所有存储的消息。kafka 也会定期清除内部的消息,直到保存最新的一条(文件保存的消息默认保存 7 天)
这个在笔者配置消费者的时候发现的问题,启动时报错说没有指定消费组
topic 消息保存的文件 0000.log 可以进行物理切分,这就是分区的概念,类似于数据库的分库分表。这样做的好处在于单个保存的文件不会太大从而影响性能,最重要的是分区后不是单个文件串行执行了,而是多区多文件可并行执行提高了并发能力
分区:消费者会消费同一 topic 的不同分区,所以会保存不同分区的偏移量,其格式为:GroupId + topic + 分区号
副本:副本是对分区的备份,集群中不同的分区在不同的 broker 上,但副本会对该分区备份到指定数量的 broker 上,这些副本有 leader 和 follower 的区别,leader负责读写,挂了再重新选举,副本为了保持数据一致性
生产者发送消息给 broker,之后 broker 会响应 ack 给生产者,生产者等待接收 ack 信号 3 秒,超时则重试 3 次
生产者 ack 确认配置: