partition 数据路由规则:
- 指定了 partition ,则直接使用。
- 未指定 partition 但指定 key,通过对 key 的 value 进行 hash 选出一个 partition。
- partition 和 key 都未指定,使用轮询选出一个 partition。
- producer 先从 zookeeper 的 “/brokers/.../state”节点找到该 partition 的 leader。
- producer 将消息发送给该 leader。
- leader 将消息写入本地 log。
- followers 从 leader pull 消息,写入本地 log 后 leader 发送 ack。
- leader 收到所有 ISR 中的 relica 的 ack 后,增加 HW 并向 producer 发送 ack。
Leader 负责写入和读取数据;Follower 只负责备份;保证了数据的一致性。
Kafka 分配 Replication 的算法如下:
- 将所有 broker(假设供 n 个 broker) 和 待分配的 partition 排序。
- 将低 i 个 partition 分配到第 (i mod n)个 broker 上
- 将第 i 个 partition 的第 j 个 replica 分配到第 ((i+j) mod n)个 broker 上
- The high-level Consumer API
- The SimpleConsumer API
基于 Zookeeper 搭建并开启
./zkServer.sh start
若不知道如何安装Zookeeper可参考 Zookeeper 安装教程链接
配置Kafka
1)长传压缩包,解压到 /usr/soft 目录,文件夹名称为 kafka
Kafka 压缩包连接
2)修改配置文件【server.properties】
vim server.properties
修改如下对应的配置项 ,注意修改
#集群中,每个broker.id都不能相同 broker.id=0 #根据端口占用情况修改 port=9092 #开启远程监听,让第三方连接,改自己本机ip地址 listeners=PLAINTEXT://192.168.65.129:9092 #允许彻底删除 delete.enable.topic=true # log文件目录 log.dirs=/usr/soft/kafka-logs # 配置 zookeeper 服务器的 ip:port,使用英文逗号分隔 zookeeper.connect=192.168.65.128:2181,192.168.65.129:2181,192.168.65.130:2181
3)修改环境变量
vim /etc/profile
在文件末尾添加如下内容
export KAFKA_HOME=/usr/soft/kafka export PATH=$KAFKA_HOME/bin:$PATH
配置文件生效
source /etc/profile
4)另外两台机器按照 1,2,3的步骤进行操作。
注意:server.properties 中的 broker.id 分别改成 1、2。
5)启动集群。
kafka-server-start.sh /usr/soft/kafka/config/server.properties
注意:需要启动前要关闭防火墙
- systemctl stop firewalld.service
- 永久关闭 systemctl disable firewalld.service
创建 Topic
kafka-topics.sh -zookeeper 192.168.65.128:2181,192.168.65.129:2181,192.168.65.130:2181 --create --replication-factor 2 --partitions 2 --topic trans
删除 tpoic
kafka-topics.sh --delete --zookeeper 192.168.65.128:2181,192.168.65.129:2181,192.168.65.130:2181 --topic trans
查看主题
kafka-topics.sh -zookeeper 192.168.65.128:2181,192.168.65.129:2181,192.168.65.130:2181 --describe --topic trans
创建生产者
kafka-console-producer.sh --broker-list 192.168.65.128:9092,192.168.65.129:9092,192.168.65.130:9092 --topic trans
创建消费者
kafka-console-consumer.sh --bootstrap-server 192.168.65.128:9092,192.168.65.129:9092,192.168.65.130:9092 --topic trans --from-beginning
在生产者窗口中输入信息,在消费者窗口中可以看到,如下图,【虚拟机1】是生产者,【虚拟机2】是消费者。
segment 的参数有两个:
- log.segment.bytes:单个 segment 可容纳的最大数据量,默认为 1GB。
- log.segment.ms:Kafka 在 commit 一个未写满的 segment 前,所等待的时间(默认为 7 天)
partition 全局的第一个 Segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。
数值大小为 64 位,20位数字长度,没有数字用 0 填充
- 第一个 segment
00000000000000000000.index
00000000000000000000.log
- 第二个 segment,文件名以第一个 Segment 的最后一条消息的 offset 组成
00000000000000170410.index
00000000000000170410.log
- 第三个 segment,文件名以上一个 Segment 的最后一条消息的 offset 组成
00000000000000239430.index
00000000000000239430.log
0:At least one 消息不会丢,但可能会重复传输
1:At most once 消息可能会丢,但绝不会重复传输
3:Exactly once 每条消息肯定会被传输一次且仅传输一次
producer 可以选择是否为数据的写入接收 ack,有以下几种选项:request.required.acks
关键词:
我们备份数据就是防止数据丢失,当主节点挂掉时,可以启用备份节点
ISR(in sync Replicas)
物理消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:
如果 consumer 在 commit 后还没来得及处理消息就 crash 了,下次重新开始工作就无法读取到刚刚已提交而未处理的消息。这对应于 At most once。
如果在处理消息之后 commit 之前 consumer crash 了,下次重新开始工作时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。这就对应于 At least once。
partition_num = 2,启动一个 consumer 进程订阅这个 topic,对应的, stream_num 设置为 2,也就是说启动连个线程并行处理 message。
如果 auto.commit.enable=true
如果 auto.commit.enable=false
解决办法1:将多线程转成单线程
手动 commit offset,并且对 partition_num 启同样数目的 consumer 进程,这样就能保证一个 consumer 进程占有一个 partition,commit offset的时候不会影响别的 partition 的 offset。但这个方法比较局限,因为 partition 和 consumer 的数目必须严格对应
解决办法2:批量 commit
手动 commit offset,另外在 consumer 端将将所有 拉取的数据缓存到 queue 里,当把queue 里所有的数据处理完之后,再批量提交 offset,这样就能保证只有处理完才被 commit。
注意,需要先导入依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.0</version> </dependency>
public class Hello01Producer extends Thread { private Producer<String,String> producer; /** * 构造器函数 */ public Hello01Producer(String threadName) { //设置线程名称 super.setName(threadName); //创建配置文件列表 Properties properties = new Properties(); //kafka地址,多个地址用逗号分隔 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.65.128:9092,192.168.65.129:9092,192.168.65.130:9092"); //key 值的序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getTypeName()); //VALUE 值的序列化 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getTypeName()); //写出的应答方式 properties.put(ProducerConfig.ACKS_CONFIG,"1"); //在重试发送失败的request前的等待时间 properties.put(ProducerConfig.RETRIES_CONFIG,"0"); //批量写出 properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); //创建生产者对象 producer = new KafkaProducer<>(properties); } @Override public void run() { //初始化一个计数器 int count = 0; System.out.println("Hello01Producer.run()——开始发送数据"); while (count < 100000) { String key = String.valueOf(count++); String value = Thread.currentThread().getName() + "--" + count; //封装消息对象 ProducerRecord<String,String> msg = new ProducerRecord<>("trans",key,value); //发送消息到 Kafka producer.send(msg); //打印消息 System.out.println("Producer.send--" +key + "--"+value); //每隔0.1秒发送一次 try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { Hello01Producer producer = new Hello01Producer("测试发送"); producer.start(); Thread.sleep(Long.MAX_VALUE); } }
public class Hello01Consumer extends Thread { private Consumer<String,String> consumer; public Hello01Consumer(String name) { super.setName(name); //创建配置文件列表 Properties properties = new Properties(); //kafka地址,多个地址用逗号分隔 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.65.128:9092,192.168.65.129:9092,192.168.65.130:9092"); //创建组 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-data"); //key 值的序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getTypeName()); //VALUE 值的序列化 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getTypeName()); //开启自动提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true"); //自动提交时间间隔 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); //自动重置的偏移量 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //实例化消费组 consumer = new KafkaConsumer<String, String>(properties); } @Override public void run() { //创建list ,元素为 topic 和对应分区 List<TopicPartition> list = new ArrayList<>(); list.add(new TopicPartition("trans",0)); list.add(new TopicPartition("trans",1)); //assign和subscribe 都可以消费 topic consumer.assign(list); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String,String> record : records) { System.out.println("接收到数据:" + record.value() + " partition:"+record.partition() + " offset:"+record.offset()); } } } public static void main(String[] args) throws InterruptedException { Hello01Consumer consumer = new Hello01Consumer("测试接收"); consumer.start(); Thread.sleep(Long.MAX_VALUE); } }
有可能一个消费者取出了一条数据,但是还没有处理完,但是消费者被关闭了。
消费者自动提交时间间隔
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000")
若提交间隔 > 单条数据执行时间 (重复)
若提交间隔 < 单条数据执行时间 (丢失)
一般来说,每个 partition 能处理的吞吐为 几 MB/s (具体需要根据本地环境测试),增加更多的 partition 意味着:
调整准则:
此参数决定的是 records 复制的数目,建议至少设置为 2,一般是 3,最高设置为 4。
更高的 replication factor(假设数目为 N),意味着:
调整准则:
为了大幅度提高 producer 写入吞吐量,需要定期批量写文件:
log.flush.interval.message=10000
log.flush.interval.ms=1000