消息队列MQ

kafka教程

本文主要是介绍kafka教程,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1.kafka 系统架构

1.1 broker

  • kafka 集群包含一个或多个服务器,服务器节点称为 broker。

1.2 Tpoic

  • 每条发布到 kafka 集群的消息都有一个类别,这个类别称为 Topic。
  • 类似于数据的表名。
  • 物理上不同的 Topic 的消息分开存储。
  • 逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存储于何处。

1.3 Partition

  • Topic 中的数据分割为一个或多个 partition。
  • 每个 Topic 至少有一个 partition,当生产者生产数据的时候,根据分配策略,选择分区,然后将消息追加到指定的分区的末尾(队列)。

partition 数据路由规则:

  1. 指定了 partition ,则直接使用。
  2. 未指定 partition 但指定 key,通过对 key 的 value 进行 hash 选出一个 partition。
  3. partition 和 key 都未指定,使用轮询选出一个 partition。
  • 每个消息都会有一个自增的编号:标识顺序;用于标识消息的偏移量。
  • 每个 partition 中的数据使用多个 segment 文件存储。
  • partition 中的数据是有序的,不同 partition 间的数据丢失了数据的顺序。
  • 如果 topic 有多个 partition,消费时就不能保证数据的顺序。严格保证消息的消费顺序的场景下,需要将 partition 数目设为 1。

1.4 Leader

  • 每个 partition 有多个副本,其中有且仅有一个作为 leader,leader 是当前负责数据的读写 partition。
  1. producer 先从 zookeeper 的 “/brokers/.../state”节点找到该 partition 的 leader。
  2. producer 将消息发送给该 leader。
  3. leader 将消息写入本地 log。
  4. followers 从 leader pull 消息,写入本地 log 后 leader 发送 ack。
  5. leader 收到所有 ISR 中的 relica 的 ack 后,增加 HW 并向 producer 发送 ack。

1.5 Follower

  • follower 跟随 leader,所有写请求都通过 leader 路由,数据变更会广播给所有 Follower,Follower 与 Leader 保持数据同步。
  • 如果 Leader 失效,则从 Follower 中选举出一个新的 leader。
  • 当 Follower 挂掉、卡主或者同步太慢,leader 会把这个 follower 从 “in sync replicas” (ISR)列表中删除,重新创建一个 Follower。

1.6 replication

  • 数据会存放到 topic 的 partition 中,但是有可能分区会损坏。
  • 我们需要分兑取的数据进行备份(备份多少取决于你对数据的重视程度)
  • 我们将分区 分为 Leader(1) 和 Follower(N)

Leader 负责写入和读取数据;Follower 只负责备份;保证了数据的一致性。

  • 备份数为 N,表示 主+备=(N)

Kafka 分配 Replication 的算法如下:

  1. 将所有 broker(假设供 n 个 broker) 和 待分配的 partition 排序。
  2. 将低 i 个 partition 分配到第 (i mod n)个 broker 上
  3. 将第 i 个 partition 的第 j 个 replica 分配到第 ((i+j) mod n)个 broker 上

1.7 producer

  • 生产者即数据的发布者,该角色将消息发布到 Kafka 的 Topic 中
  • broker 接收到生产者发送的消息后,broker 将该消息追加到当前用于追加数据的 segment 文件中。
  • 生产者发送的消息,存储到一个 partition 中,生产者也可以指定数据存储的 partition。

1.8 consumer

  • 消费者可以从 broker 中读取数据。消费者可以消费多个 topic 数据。
  • Kafka 提供了两套 consumer API
  1. The high-level Consumer API
  2. The SimpleConsumer API
  • high-level Consumer API 提供了一个从 Kafka 消费数据的高层抽象,而 SimpleConsumer API 则需要开发人员更多的关注细节。

1.9 consumer Group

  • 每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer  指定 Consumer  Group,若不指定 group 则属于默认的 group)
  • 将多个消费者集中到一起去处理某个 Topic 的数据,可以更快的提高数据的消费能力。
  • 整个消费者组共享一组偏移量(防止数据被重复读取),因为 Topic 有多个分区。

1.10 offset 偏移量

  • 可以唯一的标识一条消息
  • 偏移量决定读取数据的位置,不会有线程安全的问题,消费者通过偏移量来决定下次读取的消息。
  • 消息被消费之后,并不被马上删除,这样多个业务就可以重复使用 Kafka 的消息
  • 我们某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制。
  • 消息最终还是会被删除的,默认声明周期为 1 周。

1.11 zookeeper

  • Kafka 通过 zookeeper 来存储集群的 meta 信息。
Kafka拓扑结构

 2.Kafka 环境搭建

基于 Zookeeper 搭建并开启

./zkServer.sh start

若不知道如何安装Zookeeper可参考 Zookeeper 安装教程链接

配置Kafka

1)长传压缩包,解压到 /usr/soft 目录,文件夹名称为 kafka

Kafka 压缩包连接

2)修改配置文件【server.properties】

vim server.properties

修改如下对应的配置项 ,注意修改

#集群中,每个broker.id都不能相同
broker.id=0

#根据端口占用情况修改
port=9092

#开启远程监听,让第三方连接,改自己本机ip地址
listeners=PLAINTEXT://192.168.65.129:9092

#允许彻底删除
delete.enable.topic=true


# log文件目录
log.dirs=/usr/soft/kafka-logs

# 配置 zookeeper 服务器的 ip:port,使用英文逗号分隔
zookeeper.connect=192.168.65.128:2181,192.168.65.129:2181,192.168.65.130:2181

3)修改环境变量

vim /etc/profile

 在文件末尾添加如下内容

export KAFKA_HOME=/usr/soft/kafka
export PATH=$KAFKA_HOME/bin:$PATH

配置文件生效

source /etc/profile

4)另外两台机器按照 1,2,3的步骤进行操作。

注意:server.properties 中的 broker.id 分别改成 1、2。

5)启动集群。

kafka-server-start.sh /usr/soft/kafka/config/server.properties

 注意:需要启动前要关闭防火墙

  • systemctl stop firewalld.service
  • 永久关闭  systemctl disable firewalld.service

3.常用命令

创建 Topic

kafka-topics.sh -zookeeper 192.168.65.128:2181,192.168.65.129:2181,192.168.65.130:2181 --create --replication-factor 2 --partitions 2 --topic trans
  • --create: 指定创建topic动作
  • --topic:指定新建topic的名称
  • --zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
  • --partitions:指定当前创建的kafka分区数量,默认为1个
  • --replication-factor:指定每个分区的复制因子个数,默认1个 

删除 tpoic

kafka-topics.sh --delete --zookeeper 192.168.65.128:2181,192.168.65.129:2181,192.168.65.130:2181 --topic trans

查看主题

kafka-topics.sh -zookeeper 192.168.65.128:2181,192.168.65.129:2181,192.168.65.130:2181 --describe --topic trans

创建生产者

kafka-console-producer.sh --broker-list 192.168.65.128:9092,192.168.65.129:9092,192.168.65.130:9092 --topic trans

创建消费者

kafka-console-consumer.sh --bootstrap-server 192.168.65.128:9092,192.168.65.129:9092,192.168.65.130:9092  --topic trans --from-beginning

在生产者窗口中输入信息,在消费者窗口中可以看到,如下图,【虚拟机1】是生产者,【虚拟机2】是消费者。

4.Kafka 数据检索机制

  •  topic 在物理层面以 partition 为分组,一个 topic 可以分成若干个 partition
  • partition 还可以细分为 Segment,一个 partition 物理上由多个 Segment 组成

segment 的参数有两个:

  1. log.segment.bytes:单个 segment 可容纳的最大数据量,默认为 1GB。
  2. log.segment.ms:Kafka 在 commit 一个未写满的 segment 前,所等待的时间(默认为 7 天)
  • LogSegment 文件由两部分组成,分别为 “.index” 和 “.log” 文件文件,分别表示为 Segment 所有文件和数据文件。

partition 全局的第一个 Segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。

数值大小为 64 位,20位数字长度,没有数字用 0 填充

  • 第一个 segment

00000000000000000000.index

00000000000000000000.log

  • 第二个 segment,文件名以第一个 Segment 的最后一条消息的 offset 组成

00000000000000170410.index

00000000000000170410.log

  • 第三个 segment,文件名以上一个 Segment 的最后一条消息的 offset 组成

00000000000000239430.index

00000000000000239430.log

  • 消息都具有固定的物理结构,包括:offset(8 bytes)、消息体的大小(4 bytes)、crc32(4 bytes)、magic(1 Byte)、attribute(1 Byte)、key length(4 Bytes)、key(4 Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大小,即读取到哪里截止。

5.数据的安全性

0:At least one 消息不会丢,但可能会重复传输

1:At most once 消息可能会丢,但绝不会重复传输

3:Exactly once 每条消息肯定会被传输一次且仅传输一次

5.1 producer delivery guarantee

producer 可以选择是否为数据的写入接收 ack,有以下几种选项:request.required.acks

  • asks=0:Producer 在 ISR 中的 Leader 已经成功收到的数据并得到确认后发送下一条 Message。若 Producer 没有收到确认,可能会重复发消息。
  • asks=1:这意味着 Producer 无需等待来自 Broker 的确认而继续发送下一条消息。
  • asks=all:Producer 需要等待 ISR 中的所有 Follower 都确认接收到数据后才算一次发送完成,可靠性最高。 

5.2 ISR 机制

关键词:

  • AR :Assigned Replicas 用来标识副本的全集。
  • OSR:out sync Replicas 离开同步队列的副本。
  • ISR:in sync Replicas 加入同步队列的副本。
  • ISR = Leader + 没有落后太多的副本(follower);AR = OSR + ISR

 我们备份数据就是防止数据丢失,当主节点挂掉时,可以启用备份节点

  • producer--push-->leader
  • leader<--pull--follower
  • follower 每隔一定时间去 Leader 拉取数据,来保证数据的同步

ISR(in sync Replicas)

  • 当主节点挂掉,并不是去 follower 选择主,而是从 ISR 中选择主
  • 判断标准:超过 10 秒钟没有同步数据(replica.lag.time.max.ms=10000);主副节点差 4000 条数据(replica.lag.time.max.messages=4000)
  • 脏节点选举:kafka 采用一种降级措施来处理;选举第一个恢复的 node 作为 leader 提供服务,以它的数据为基准,这个措施被称为脏 leader 选举。

5.3 broker 数据存储机制

物理消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:

  • 基于时间:log.retention.hours=168
  • 基于大小:log.retention.bytes=1073741824

5.4 consumer delivery guarantee

  • 如果 consumer 设置为 autocommit,consumer 一旦读到数据立即自动 commit。如果只讨论这一读取过程,那 kafka 确保了 Exactly once。
  • 读完消息先 commit 再处理消息

如果 consumer 在 commit 后还没来得及处理消息就 crash 了,下次重新开始工作就无法读取到刚刚已提交而未处理的消息。这对应于 At most once。

  • 读完消息先处理再 commit 数据

如果在处理消息之后 commit 之前 consumer crash 了,下次重新开始工作时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。这就对应于 At least once。

  • 如果一定要做到 Exactly once,就需要协调 offset 和实际操作的输出。经典的做法是引入两阶段提交。
  • Kafka 默认保证 At least once。

5.5 数据的消费

partition_num = 2,启动一个 consumer 进程订阅这个 topic,对应的, stream_num 设置为 2,也就是说启动连个线程并行处理 message。

如果 auto.commit.enable=true

  • 当 consumer 拉取了一些数据但还没有处完全处理掉的时候
  • 刚刚到 commit interval 发出了提交 offset 操作,接着 consumer crash 掉了。
  • 这是 拉取的数据还没未完成处理但已经被 commit 了,因此没有机会再次被处理,数据丢失

如果 auto.commit.enable=false

  • 假设 consumer 的两个拉取各自拿了一条数据,并且由两个线程同时处理
  • 这时线程 t1 处理完 partition1 的数据,手动提交 offset。
  • 这里需要说明的是,当手动执行 commit 的时候,实际上是对这个 consumer 进程所占有 partition 进行 commit。也就是说即使 t2 没有处理完 partition2的数据,offset 也被 t1 提交了。这时如果 consumer crash 掉,t2 正在处理的这条数据就丢失了。

解决办法1:将多线程转成单线程

手动 commit offset,并且对 partition_num 启同样数目的 consumer 进程,这样就能保证一个 consumer 进程占有一个 partition,commit offset的时候不会影响别的 partition 的 offset。但这个方法比较局限,因为 partition 和 consumer 的数目必须严格对应

解决办法2:批量 commit

手动 commit offset,另外在 consumer 端将将所有 拉取的数据缓存到 queue 里,当把queue 里所有的数据处理完之后,再批量提交 offset,这样就能保证只有处理完才被 commit。 

6.Java API

注意,需要先导入依赖

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>

6.1 生产者代码

public class Hello01Producer extends Thread {
    private Producer<String,String> producer;
    /**
     * 构造器函数
     */
    public Hello01Producer(String threadName) {
        //设置线程名称
        super.setName(threadName);
        //创建配置文件列表
        Properties properties = new Properties();
        //kafka地址,多个地址用逗号分隔
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.65.128:9092,192.168.65.129:9092,192.168.65.130:9092");
        //key 值的序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getTypeName());
        //VALUE 值的序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getTypeName());
        //写出的应答方式
        properties.put(ProducerConfig.ACKS_CONFIG,"1");
        //在重试发送失败的request前的等待时间
        properties.put(ProducerConfig.RETRIES_CONFIG,"0");
        //批量写出
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //创建生产者对象
        producer = new KafkaProducer<>(properties);
    }

    @Override
    public void run() {
        //初始化一个计数器
        int count = 0;
        System.out.println("Hello01Producer.run()——开始发送数据");
        while (count < 100000) {
            String key = String.valueOf(count++);
            String value = Thread.currentThread().getName() + "--" + count;
            //封装消息对象
            ProducerRecord<String,String> msg = new ProducerRecord<>("trans",key,value);
            //发送消息到 Kafka
            producer.send(msg);
            //打印消息
            System.out.println("Producer.send--" +key + "--"+value);
            //每隔0.1秒发送一次
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Hello01Producer producer = new Hello01Producer("测试发送");
        producer.start();
        Thread.sleep(Long.MAX_VALUE);
    }
}

6.2 消费者代码

public class Hello01Consumer extends Thread {
    private Consumer<String,String> consumer;

    public Hello01Consumer(String name) {
        super.setName(name);
        //创建配置文件列表
        Properties properties = new Properties();
        //kafka地址,多个地址用逗号分隔
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.65.128:9092,192.168.65.129:9092,192.168.65.130:9092");
        //创建组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-data");
        //key 值的序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getTypeName());
        //VALUE 值的序列化
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getTypeName());
        //开启自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        //自动提交时间间隔
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        //自动重置的偏移量
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        //实例化消费组
        consumer = new KafkaConsumer<String, String>(properties);
    }

    @Override
    public void run() {
        //创建list ,元素为 topic 和对应分区
        List<TopicPartition> list = new ArrayList<>();
        list.add(new TopicPartition("trans",0));
        list.add(new TopicPartition("trans",1));
        //assign和subscribe 都可以消费 topic
        consumer.assign(list);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String,String> record : records) {
                System.out.println("接收到数据:" + record.value() + " partition:"+record.partition() + " offset:"+record.offset());
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Hello01Consumer consumer = new Hello01Consumer("测试接收");
        consumer.start();
        Thread.sleep(Long.MAX_VALUE);
    }
}

6.3 重复消费和数据的丢失

有可能一个消费者取出了一条数据,但是还没有处理完,但是消费者被关闭了。

消费者自动提交时间间隔 

properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000")

若提交间隔 > 单条数据执行时间 (重复)

若提交间隔 < 单条数据执行时间 (丢失)

7.Kafka 优化

9.1 Partition 数目

一般来说,每个 partition 能处理的吞吐为 几 MB/s (具体需要根据本地环境测试),增加更多的 partition 意味着:

  • 更高的并发与吞吐
  • 可以扩展更多的 consumers(同一个 consumer group 中)
  • 若是集群中有较多的 brokers,则可更大程度上利用闲置的 brokers。
  • 但是会造成 Zookeeper 的更多选举。
  • 也会在 Kafka 中打开更多的文件。

调整准则:

  • 一般来说,若是集群较小(小于 6 个 brokers),则 partition_num = 2 x broker_num。这里主要考虑的是之后的扩展。若是集群扩展了一般(例如 12 个),则不用担心会有 partition 不足的现象发生。
  • 一般来说,若是集群较大(超过 12 个 brokers),则  partition_num = broker_num。因为这里不再需要再考虑集群的扩展情况,与 broker 相同的 partition 数已经足够应付常规场景。若有必要再手动调整。
  • 考虑最高峰吞吐需要的并行 consumer 数,调整 partition 的数目。若是应用场景需要 20 个consumer(同一个 consumer group)并行消费,则据此设置为 20 个 partition
  • 考虑 producer 所需的吞吐,调整 partition 数目(如果 producer 的吞吐非常高,或者是在接下来两年内都比较高,则增加 partition 的数目)

9.2 Replication factor

此参数决定的是 records 复制的数目,建议至少设置为 2,一般是 3,最高设置为 4。

更高的 replication factor(假设数目为 N),意味着:

  • 系统更未定(允许 N-1 个 broker 宕机)
  • 更多的副本(如果 acks=all,则会造成较高的延时)
  • 系统磁盘的使用率会更高(一般若是 RF 为 3,则相对于 RF 为 2 时,会占据更多 50% 的磁盘空间)

调整准则:

  • 以 3 为起始(当然至少需要有 3 个 broker,同时也不建议 一个 Kafka 集群中节点数少于 3 个节点)
  • 如果 replication 性能称为了性能瓶颈或是一个 issue,则建议使用性能更好的 broker,而不是降低 RF 的数目
  • 永远不要在生产环境中设置 RF 为 1

9.3 批量写入

为了大幅度提高 producer 写入吞吐量,需要定期批量写文件:

  • 每当 producer 写入 10000 条消息时,刷数据到磁盘

log.flush.interval.message=10000

  • 每隔1秒钟,刷数据到磁盘

log.flush.interval.ms=1000

这篇关于kafka教程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!