Spring消费者和生产者

Spring消费者和生产者

本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。

下载并安装Apache Kafka

要下载并安装Apache Kafka,请阅读官方文档( https://kafka.apache.org/quickstart )。 本教程假设服务器使用默认配置启动,并且没有更改服务器端口。

Maven依赖

这个项目中,使用Apache Maven来管理项目依赖关系。 确保以下依赖关系在类路径中。pom.xml 文件的内容如下所示 -

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zyiz.spring.kafka</groupId>
    <artifactId>producer-consumer</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://www.zyiz.net/kafka</url>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <defaultGoal>compile</defaultGoal>
    </build>

</project>

Spring Kafka发送消息到主题

使用ProducerKafkaTemplate类发送消息,并提供将数据发送到Kafka主题的高级操作。 提供异步和同步方法,异步方法返回Future

package com.zyiz.kafka.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.topic.foo}")
    private String topic;

    public void send(String message){
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}

为了能成功地发送消息给Kafka主题,我们需要配置KafkaTemplate。 此配置由SenderConfig类处理。

使用ProducerFactory的实现来配置KafkaTemplate,更具体地说是使用DefaultKafkaProducerFactory。可以使用Map <String,Object>来初始化这个生产者工厂,从ProducerConfig类获取的键。

  • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG指定用于建立到Kafka集群的初始连接的主机/端口对列表。 客户端将使用所有服务器,而不管这里指定哪些服务器用于引导/此列表仅影响用于发现全套服务器的初始主机。 此列表应采用host1:port1host2:port2...的形式。由于这些服务器仅用于初始连接以发现完整集群成员资格(可能会动态更改),因此此列表不需包含完整集合 的服务器(不过,如果服务器停机,可能需要多个服务器)。

  • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG指定实现org.apache.kafka.common.serialization.Serializer接口的键的序列化程序类。

  • ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG指定实现org.apache.kafka.common.serialization.Serializer接口的值的序列化程序类。

有关配置选项的完整列表,请查看ProducerConfig类(https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html )。

package com.zyiz.kafka.producer;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

Spring Kafka监听来自主题的消息

接下来,我们将演示如何监听来自Kafka主题的消息。 Receiver类将使用来自Kafka主题的消息。 创建Listen()方法并使用@KafkaListener注解对其进行了注释,该注释将方法标记为指定主题上的Kafka消息侦听器的目标。

package com.zyiz.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    @KafkaListener(topics = "${app.topic.foo}")
    public void listen(@Payload String message) {
        LOG.info("received message='{}'", message);
    }

}

该机制需要在@Configuration类和侦听器容器工厂之一上使用@EnableKafka注解,该工厂用于配置底层ConcurrentMessageListenerContainer

使用SenderConfig类中使用的相同类型的键/值反序列化器是非常重要的。

  • ConsumerConfig.GROUP_ID_CONFIG指定一个唯一的字符串,标识此用户所属的用户组。
  • ConsumerConfig.AUTO_OFFSET_RESET_CONFIG指定在Kafka中没有初始偏移量或当前偏移量不再存在于服务器上(例如,因为该数据已被删除)时要执行的操作:
    • earliest: 自动将偏移重置为最早的偏移量
    • latest: 自动将偏移量重置为最新的偏移量
    • none: 如果未找到消费者组的前一个偏移量,则向消费者抛出异常
    • anything else: 向消费者抛出异常。

消费者用消费者组名称标记自己,并且发布到主题的每个记录都被传送到每个订阅消费者组中的一个消费者实例。 消费者实例可以在单独的进程中或在单独的机器上。

如果所有消费者实例具有相同的消费者组,则记录将有效地在消费者实例上进行负载均衡。 如果所有消费者实例具有不同的消费者组,则每个记录将被广播到所有消费者进程。

有关配置选项的完整列表,请查看ConsumerConfig类(https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html )。

package com.zyiz.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

使用application.yml配置应用程序

创建一个在src/main/resources文件夹中的application.yml属性文件。 这些属性通过spring引导注入到配置类中。

spring:
  kafka:
    bootstrap-servers: localhost:9092

app:
  topic:
    foo: foo.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.zyiz: DEBUG

运行应用程序

最后,编写一个简单的Spring Boot应用程序来演示应用程序。 为了使这个演示工作,需要在端口9092上运行的本地主机上的Kafka服务器,这是Kafka的默认配置。

在运行这个项目程序之前,需要运行 zookeeper 和 kafka ,如下所示 -

启动zookeeper服务 -

D:\software\kafka_2.12-1.0.1\bin\windows> zookeeper-server-start.bat D:\software\kafka_2.12-1.0.1\config\zookeeper.properties

启动kafka服务 -

D:\software\kafka_2.12-1.0.1\bin\windows> kafka-server-start.bat D:\software\kafka_2.12-1.0.1\config\server.properties

应用程序的实现 -

package com.zyiz.kafka;

import com.zyiz.kafka.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerConsumerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(ProducerConsumerApplication.class, args);
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... strings) throws Exception {
        sender.send("Spring Kafka Producer and Consumer Example");
    }
}

当我们运行应用程序时,应该会得到类似下面的输出。

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.0.RELEASE)

2018-03-14 14:40:41.454  INFO 9740 --- [           main] c.y.kafka.ProducerConsumerApplication    : Starting ProducerConsumerApplication on MY-PC with PID 9740 (F:\worksp\spring-kafka\producer-consumer\target\classes started by Administrator in F:\worksp\spring-kafka\producer-consumer)
2018-03-14 14:40:41.458 DEBUG 9740 --- [           main] c.y.kafka.ProducerConsumerApplication    : Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
2018-03-14 14:40:41.458  INFO 9740 --- [           main] c.y.kafka.ProducerConsumerApplication    : No active profile set, falling back to default profiles: default
2018-03-14 14:40:47.512  INFO 9740 --- [           main] c.y.kafka.ProducerConsumerApplication    : Started ProducerConsumerApplication in 6.567 seconds (JVM running for 7.084)
2018-03-14 14:40:47.514  INFO 9740 --- [           main] com.zyiz.kafka.producer.Sender         : sending message='Spring Kafka Producer and Consumer Example' to topic='foo.t'
2018-03-14 14:40:49.413  INFO 9740 --- [ntainer#0-0-C-1] com.zyiz.kafka.consumer.Receiver       : received message='Spring Kafka Producer and Consumer Example'