Apache Kafka 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序!
kafka官网:http://kafka.apache.org/
他的核心功能:
1.高吞吐量:使用延迟低至 2 毫秒的机器集群以网络有限的吞吐量传递消息
2.可扩展:将生产集群扩展到多达一千个代理、每天数万亿条消息、PB 级数据、数十万个分区。弹性扩展和收缩存储和处理。
3.永久存储:将数据流安全地存储在分布式、持久、容错的集群中。
4.高可用性:在可用区上有效地扩展集群或跨地理区域连接单独的集群。
kafka名词解释:
topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
producer:发布消息的对象称之为主题生产者(Kafka topic producer)
consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
在是用kafka的前提下必须是java环境,也就是jdk,其次就是Zookpeer;在启动kafka前必须先启动zk
Zookeeper是安装Kafka集群的必要组件,Kafka通过Zookeeper来实施对元数据信息的管理,包括集群、主题、分区等内容。
这里不做环境准备的演示,直接进入demo
1.添加依赖 做一个简单的生产者与消费组 只需要依赖kafka-clients
即可
<!--kafka依赖--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.1</version> </dependency>
2.创建一个Kafka生产者
package com.xiaoteng.kafka.simple; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * 消息生产者 */ public class ProducerFastStart { //topic private static final String TOPIC = "kafkaDemo"; public static void main(String[] args) { //添加kafka的配置信息 Properties properties = new Properties(); //配置broker信息 properties.put("bootstrap.servers","192.168.200.130:9092"); //key 和 value 序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //重试次数为10 当发送消息失败时 重试 10次后不成功会报错 properties.put(ProducerConfig.RETRIES_CONFIG,10); //生产者对象 KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties); //封装消息 第一个为主题 topic 第二个发送消息的key 第三个就是我们发送的消息 ProducerRecord<String,String> record = new ProducerRecord<String, String>(TOPIC,"msg","hello kafka!"); //发送消息 try { producer.send(record); }catch (Exception e){ e.printStackTrace(); } //关系消息通道 producer.close(); } }
3.创建消费者
package com.xiaoteng.kafka.simple; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; /** * 消息消费者 */ public class ConsumerFastStart { //topic private static final String TOPIC = "kafkaDemo"; public static void main(String[] args) { //添加配置信息 Properties properties = new Properties(); //ip端口 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092"); //key value 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //设置分组 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group1"); //创建消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); //订阅主题 必须与生产者保持一致 consumer.subscribe(Collections.singletonList(TOPIC)); while (true){ //收消息间隔 与 消息载体 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.key()); System.out.println(record.value()); } } } }
这就已经实现了简单的生产者发送消息,消费者接收消息。
1.可以看到消费者当前的组为group1,当我启动两个消费者,接收同一个topic,组名都为group1时,生产者去发送消息,同一个组内,只要一个消费者接收到消息。
2.当已经启动了一个group1时,再启动一个gruop2,接收同一个topic时,两个都能接收到生产者发送的消息。
3.当要实现广播功能时,参考2就行,每个消费者都有自己的一个组。每个组只能有一个消费者接收到消息!
生产者的工作原理:
1.连接上kafka集群
2.发消息
3.leader将消息写入本地文件
4.将消息同步一份给追随者follower 副本
5.追随者follwer 将数据写入本地文件,并将ack(确认消息)
6.leader收到所有副本的确认消息后,向生产者发送确认消息
ack也分不同配置,不同工作原理。
1.发送类型:
1.发送并忘记(fire-and-forget): 把消息发送给服务器,并不关心它是否正常到达,大多数情况下,消息会正常到达,因为kafka是高可用的,而且生产者会自动尝试重发,使用这种方式有时候会丢失一些信息,因为他没有返回ack
2.同步发送: 使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功
try { RecordMetadata recordMetadata = producer.send(record).get(); long offset = recordMetadata.offset();//偏移量 System.out.println(offset); }catch (Exception e){ e.printStackTrace(); }
如果服务器返回错误,get()方法会抛出异常,如果没有发生错误,我们就会得到一个RecordMetadata对象,可以用它来获取消息的偏移量,并且偏移量是一次递增的。
3.异步发送: 调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数。如下代码
try { producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { //如果异常为空,说明异步发送成功 if (null!=exception){ exception.printStackTrace(); } //获取偏移量 System.out.println(metadata.offset()); } }); }catch (Exception e){ e.printStackTrace(); }
如果kafka返回一个错误,onCompletion()方法会抛出一个非空(non null)异常,可以根据实际情况处理,比如记录错误日志,或者把消息写入“错误消息”文件中,方便后期进行分析。
2.参数详解:
通过上面的入门代码,已经看到了几个必要的参数(bootstrap.servers、序列化器,重试等) 基本上都在producerConfig这个类中能找到
bootstrap.servers:就是kafka连接地址与端口号。
retries重试:生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms
生产者还有很多可配置的参数,在kafka官方文档中都有说明,大部分都有合理的默认值,所以没有必要去修改它们,不过有几个参数在内存使用,性能和可靠性方法对生产者有影响
acks:指的是producer的消息发送确认机制
acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应,也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。高吞吐量,低可靠性。
acks=1: 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应,如果消息无法到达首领节点,生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。默认情况下是等于1,也是吞吐量与可靠性的一个折中方案。 但是等于1的情况下,也有可能出现数据丢失,就是leader返回一个acks给producer后,还没得及同步消息给follwoer就挂了,导致数据丢失了。
acks=all : 只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过他的延迟比acks=1时更高。低吞吐量,高可靠性。
消费者工作原理:
1.连接kafka集群
2.从kafka集群中拉取信息
3.kafka集群根据偏移量查询信息 offser就是偏移量 一个long型数值
4.将数据同步给消费者
1.参数详解:
与生产者类似 基本上都在ConsumerConfig这个类中能找到,向上面的入门代码中,连接kafka,反序列化,以及分组都有 或者直接去kafka文档看。
eauto.commit: 该属性指定了消费者是否自动提交偏移量,默认值是true。为了尽量避免出现重复数据和数据丢失,可以把它设置为false,由自己控制何时提交偏移量。如果把它设置为true,还可以通过配置auto.commit.interval.ms
属性来控制提交的频率(就是多长时间提交一次)。
auto.offset.reset:
1.earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
2.latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
3.none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
4.anything else(其他情况,不是上面三种情况就抛出异常): 向consumer抛出异常
2.提交和偏移量:
每次调用poll()方法,它会返回由生产者写入kafka但还没有被消费者读取过来的记录,我们由此可以追踪到哪些记录是被群组里的哪个消费者读取的,kafka不会像其他JMS队列那样需要得到消费者的确认,这是kafka的一个独特之处,相反,消费者可以使用kafka来追踪消息在分区的位置(偏移量)
消费者会往一个叫做_consumer_offset
的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
如果提交偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。如下图:
消费者当前提交的偏移量为2,而他已经消费到11了,当他挂了时,在进行重平衡的时候,是从提交的偏移量开始继续消费的,导致了数据的重复消费。
如果提交的偏移量大于客户端的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。如下图:
当他提交的偏移量为11时,而他只消费到了3,在他挂了一个进行重平衡时,是从提交的偏移量开始消费的,导致数据的丢失。
3.自动提交偏移量:
当enable.auto.commit
被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去。提交时间间隔有auto.commot.interval.ms
控制,默认值是5秒。
需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。
4.提交当前偏移量(同步提交)
把enable.auto.commit
设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。
只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。
主要分为两部:1.把enable.auto.commit
设置为false 2.
//设置同步提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); while (true){ //收消息间隔 与 消息载体 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.key()); System.out.println(record.value()); try { //手动/同步 提交 偏移量 consumer.commitSync(); } catch (Exception e) { e.printStackTrace(); } } }
5.提交当前偏移量(异步提交)
手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。
while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); System.out.println(record.key()); } consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { if(e!=null){ System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e); } } }); }
5.提交当前偏移量(同步和异步组合提交)
异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。
举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。
try { while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); System.out.println(record.key()); } //异步提交 consumer.commitAsync(); } }catch (Exception e){ e.printStackTrace(); System.out.println("记录错误信息:"+e); }finally { try { //不管异步成功与否,都同步提交一下,确保消息每一次都提交了 consumer.commitSync(); }finally { consumer.close(); } }
SpringBoot集成Kafka
1.依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.xiaoteng.kafka</groupId> <artifactId>kafka</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafka</name> <description>Demo project for Spring Boot</description> <!-- 继承Spring boot工程 --> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.5.RELEASE</version> </parent> <properties> <kafka.version>2.2.7.RELEASE</kafka.version> <kafka.client.version>2.0.1</kafka.client.version> <fastjson.version>1.2.58</fastjson.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${kafka.version}</version> <exclusions> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>${kafka.client.version}</version> <exclusions> <exclusion> <artifactId>connect-json</artifactId> <groupId>org.apache.kafka</groupId> </exclusion> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.client.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> </dependencies> </project>
2.application.yml 配置文件,我这是偷懒,把两者都放到一块了,既是生产者也是消费者
server: port: 9991 spring: application: name: kafka-demo kafka: bootstrap-servers: 192.168.200.130:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: test-hello-group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.创建生产者
package com.xiaoteng.kafka.controller; import com.alibaba.fastjson.JSON; import com.xiaoteng.kafka.entity.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; @RestController public class HelloController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @GetMapping("/hello") public String hello() { //第一个参数:topics //第二个参数:消息内容 字符串信息 kafkaTemplate.send("hello-kafka", "哈喽kafka"); return "ok"; } @GetMapping("/hello2") public String hello2() { User u = new User(); u.setName("张三"); //第一个参数:topics [] "{}" //第二个参数:消息内容 对象转为json 把对象发过去 kafkaTemplate.send("hello-kafka2", JSON.toJSONString(u));//发送消息 return "消息ok"; } }
4.消费者
package com.xiaoteng.kafka.listener; import com.alibaba.fastjson.JSON; import com.xiaoteng.kafka.entity.User; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Optional; @Component public class HelloListener { @KafkaListener(topics = {"hello-kafka"}) public void receiverMessage(ConsumerRecord<?, ?> record) { //接收信息,判断是否为空 Optional<? extends ConsumerRecord<?, ?>> optional = Optional.ofNullable(record); //如果不为空 if (optional.isPresent()) { //拿到对象信息 Object value = record.value(); System.out.println(value); //展示 哈喽kafka } } @KafkaListener(topics = {"hello-kafka2"}) public void listener(String record) { //拿到信息 转为对象 User user = JSON.parseObject(record, User.class); System.out.println("record:" + record); System.out.println("对象" + user); //展示 record:{"name":"张三"} //展示 对象User{name='张三'} //我这里的user对象 只设置了一个Name属性 } }