消息队列MQ

kafka入门

本文主要是介绍kafka入门,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Apache Kafka 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序!

kafka官网:http://kafka.apache.org/

他的核心功能:

1.高吞吐量:使用延迟低至 2 毫秒的机器集群以网络有限的吞吐量传递消息

2.可扩展:将生产集群扩展到多达一千个代理、每天数万亿条消息、PB 级数据、数十万个分区。弹性扩展和收缩存储和处理。

3.永久存储:将数据流安全地存储在分布式、持久、容错的集群中。

4.高可用性:在可用区上有效地扩展集群或跨地理区域连接单独的集群。

kafka名词解释:

  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)

  • 批次:为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。
  • 分区Partition:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性。topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个文件进行存储。partition中的数据是有序的,partition之间的数据是没有顺序的。如果topic有多个partition,消费数据时就不能保证数据的顺序(比如A 和B 中 数据都有序,那把A和B的数据放在一块就无序了)。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1,默认是多个。
  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)

  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)

  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

  • Broker 集群:broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来),相当于leader。
  • 副本Replica: Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica);所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从ISR列表(保持同步的副本列表)中删除,重新创建一个Follower。
  • Zookeeper:  kafka对与zookeeper是强依赖的,是以zookeeper作为基础的,即使不做集群,也需要zk的支持。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行重平衡。
  • 消费者群组Consumer Group: 生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。
  • 偏移量Consumer Offset:  偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据,与下面某个消费者挂掉后相关联,比如这个消费者已经消费了3条消息,然后挂掉了,那么偏移量就会记录下来,在重平衡的时候其他的消费者也会从4开始平均分配数据。
  • 重平衡Rebalance:  消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程,就是说其中一个挂了,那么他的数据会平均分配给其他的消费者。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

在是用kafka的前提下必须是java环境,也就是jdk,其次就是Zookpeer;在启动kafka前必须先启动zk

Zookeeper是安装Kafka集群的必要组件,Kafka通过Zookeeper来实施对元数据信息的管理,包括集群、主题、分区等内容。

这里不做环境准备的演示,直接进入demo

1.添加依赖  做一个简单的生产者与消费组 只需要依赖kafka-clients即可

<!--kafka依赖-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.1</version>
        </dependency>

2.创建一个Kafka生产者

package com.xiaoteng.kafka.simple;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * 消息生产者
 */
public class ProducerFastStart {
    //topic
    private static final String TOPIC = "kafkaDemo";

    public static void main(String[] args) {

        //添加kafka的配置信息
        Properties properties = new Properties();
        //配置broker信息
        properties.put("bootstrap.servers","192.168.200.130:9092");
        //key  和 value 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //重试次数为10  当发送消息失败时 重试  10次后不成功会报错
        properties.put(ProducerConfig.RETRIES_CONFIG,10);

        //生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

        //封装消息  第一个为主题 topic  第二个发送消息的key  第三个就是我们发送的消息
        ProducerRecord<String,String> record = new ProducerRecord<String, String>(TOPIC,"msg","hello kafka!");
        //发送消息
        try {
            producer.send(record);
        }catch (Exception e){
            e.printStackTrace();
        }

        //关系消息通道
        producer.close();
    }
}

3.创建消费者

package com.xiaoteng.kafka.simple;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * 消息消费者
 */
public class ConsumerFastStart {

    //topic
    private static final String TOPIC = "kafkaDemo";

    public static void main(String[] args) {

        //添加配置信息
        Properties properties = new Properties();
        //ip端口
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        //key  value 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        //设置分组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");

        //创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        //订阅主题  必须与生产者保持一致
        consumer.subscribe(Collections.singletonList(TOPIC));

        while (true){
            //收消息间隔  与 消息载体
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key());
                System.out.println(record.value());
            }
        }

    }
}

这就已经实现了简单的生产者发送消息,消费者接收消息。 

1.可以看到消费者当前的组为group1,当我启动两个消费者,接收同一个topic,组名都为group1时,生产者去发送消息,同一个组内,只要一个消费者接收到消息。

2.当已经启动了一个group1时,再启动一个gruop2,接收同一个topic时,两个都能接收到生产者发送的消息。

3.当要实现广播功能时,参考2就行,每个消费者都有自己的一个组。每个组只能有一个消费者接收到消息!

 

生产者的工作原理:

 

1.连接上kafka集群  

2.发消息

3.leader将消息写入本地文件

4.将消息同步一份给追随者follower 副本

5.追随者follwer 将数据写入本地文件,并将ack(确认消息)

6.leader收到所有副本的确认消息后,向生产者发送确认消息

ack也分不同配置,不同工作原理。

1.发送类型:

1.发送并忘记(fire-and-forget): 把消息发送给服务器,并不关心它是否正常到达,大多数情况下,消息会正常到达,因为kafka是高可用的,而且生产者会自动尝试重发,使用这种方式有时候会丢失一些信息,因为他没有返回ack

2.同步发送: 使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

try {
            RecordMetadata recordMetadata = producer.send(record).get();
            long offset = recordMetadata.offset();//偏移量
            System.out.println(offset);
        }catch (Exception e){
            e.printStackTrace();
        }

如果服务器返回错误,get()方法会抛出异常,如果没有发生错误,我们就会得到一个RecordMetadata对象,可以用它来获取消息的偏移量,并且偏移量是一次递增的。

3.异步发送: 调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数。如下代码

   try {       producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    //如果异常为空,说明异步发送成功
                    if (null!=exception){
                        exception.printStackTrace();
                    }
                    //获取偏移量
                    System.out.println(metadata.offset());
                }
            });
        }catch (Exception e){
            e.printStackTrace();
        }

如果kafka返回一个错误,onCompletion()方法会抛出一个非空(non null)异常,可以根据实际情况处理,比如记录错误日志,或者把消息写入“错误消息”文件中,方便后期进行分析。

2.参数详解:

通过上面的入门代码,已经看到了几个必要的参数(bootstrap.servers、序列化器,重试等)     基本上都在producerConfig这个类中能找到

bootstrap.servers:就是kafka连接地址与端口号。

retries重试:生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms

生产者还有很多可配置的参数,在kafka官方文档中都有说明,大部分都有合理的默认值,所以没有必要去修改它们,不过有几个参数在内存使用,性能和可靠性方法对生产者有影响

acks:指的是producer的消息发送确认机制

acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应,也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。高吞吐量,低可靠性。

acks=1: 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应,如果消息无法到达首领节点,生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。默认情况下是等于1,也是吞吐量与可靠性的一个折中方案。 但是等于1的情况下,也有可能出现数据丢失,就是leader返回一个acks给producer后,还没得及同步消息给follwoer就挂了,导致数据丢失了。

acks=all :  只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过他的延迟比acks=1时更高。低吞吐量,高可靠性。

消费者工作原理:

1.连接kafka集群

2.从kafka集群中拉取信息

3.kafka集群根据偏移量查询信息 offser就是偏移量 一个long型数值

4.将数据同步给消费者

1.参数详解:

与生产者类似 基本上都在ConsumerConfig这个类中能找到,向上面的入门代码中,连接kafka,反序列化,以及分组都有  或者直接去kafka文档看。

 eauto.commit:  该属性指定了消费者是否自动提交偏移量,默认值是true。为了尽量避免出现重复数据和数据丢失,可以把它设置为false,由自己控制何时提交偏移量。如果把它设置为true,还可以通过配置auto.commit.interval.ms属性来控制提交的频率(就是多长时间提交一次)。

auto.offset.reset: 

 1.earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

 2.latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

 3.none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

 4.anything else(其他情况,不是上面三种情况就抛出异常): 向consumer抛出异常

2.提交和偏移量: 

每次调用poll()方法,它会返回由生产者写入kafka但还没有被消费者读取过来的记录,我们由此可以追踪到哪些记录是被群组里的哪个消费者读取的,kafka不会像其他JMS队列那样需要得到消费者的确认,这是kafka的一个独特之处,相反,消费者可以使用kafka来追踪消息在分区的位置(偏移量)

消费者会往一个叫做_consumer_offset的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

 

如果提交偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。如下图:

 

 

消费者当前提交的偏移量为2,而他已经消费到11了,当他挂了时,在进行重平衡的时候,是从提交的偏移量开始继续消费的,导致了数据的重复消费。

 

 

 如果提交的偏移量大于客户端的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。如下图:

 

 

当他提交的偏移量为11时,而他只消费到了3,在他挂了一个进行重平衡时,是从提交的偏移量开始消费的,导致数据的丢失。

 

3.自动提交偏移量:

enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去。提交时间间隔有auto.commot.interval.ms控制,默认值是5秒。

 

需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。

 

 

4.提交当前偏移量(同步提交)

enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。

只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。

主要分为两部:1.把enable.auto.commit设置为false 2.

//设置同步提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");



        while (true){
            //收消息间隔  与 消息载体
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key());
                System.out.println(record.value());
                try {
                    //手动/同步 提交 偏移量
                    consumer.commitSync();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

5.提交当前偏移量(异步提交)

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。

 

while (true){
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
        System.out.println(record.key());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
            if(e!=null){
                System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
            }
        }
    });
}

5.提交当前偏移量(同步和异步组合提交)

异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。

举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

try {
    while (true){
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            System.out.println(record.key());
        }
      //异步提交
        consumer.commitAsync();
    }
}catch (Exception e){
    e.printStackTrace();
    System.out.println("记录错误信息:"+e);
}finally {
    try {
      //不管异步成功与否,都同步提交一下,确保消息每一次都提交了
        consumer.commitSync();
    }finally {
        consumer.close();
    }
}

SpringBoot集成Kafka

1.依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xiaoteng.kafka</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka</name>
    <description>Demo project for Spring Boot</description>
    <!-- 继承Spring boot工程 -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
    </parent>
    <properties>
        <kafka.version>2.2.7.RELEASE</kafka.version>
        <kafka.client.version>2.0.1</kafka.client.version>
        <fastjson.version>1.2.58</fastjson.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- kafkfa -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.client.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>connect-json</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.client.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
    </dependencies>

</project>

2.application.yml 配置文件,我这是偷懒,把两者都放到一块了,既是生产者也是消费者

server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test-hello-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.创建生产者

package com.xiaoteng.kafka.controller;

import com.alibaba.fastjson.JSON;
import com.xiaoteng.kafka.entity.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RestController
public class HelloController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/hello")
    public String hello() {
        //第一个参数:topics  
        //第二个参数:消息内容  字符串信息
        kafkaTemplate.send("hello-kafka", "哈喽kafka");
        return "ok";
    }

    @GetMapping("/hello2")
    public String hello2() {
        User u = new User();
        u.setName("张三");
        //第一个参数:topics     []   "{}"
        //第二个参数:消息内容   对象转为json 把对象发过去
        kafkaTemplate.send("hello-kafka2", JSON.toJSONString(u));//发送消息
        return "消息ok";
    }
}

4.消费者

package com.xiaoteng.kafka.listener;

import com.alibaba.fastjson.JSON;
import com.xiaoteng.kafka.entity.User;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Optional;

@Component
public class HelloListener {

    @KafkaListener(topics = {"hello-kafka"})
    public void receiverMessage(ConsumerRecord<?, ?> record) {
        //接收信息,判断是否为空
        Optional<? extends ConsumerRecord<?, ?>> optional = Optional.ofNullable(record);
        //如果不为空
        if (optional.isPresent()) {
            //拿到对象信息
            Object value = record.value();
            System.out.println(value);
            //展示 哈喽kafka
        }
    }

    @KafkaListener(topics = {"hello-kafka2"})
    public void listener(String record) {
        //拿到信息  转为对象
        User user = JSON.parseObject(record, User.class);
        System.out.println("record:" + record);
        System.out.println("对象" + user);
        //展示   record:{"name":"张三"}
        //展示    对象User{name='张三'}
        //我这里的user对象 只设置了一个Name属性
    }
}

 

这篇关于kafka入门的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!