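This tutorial demonstrates how to configure Spring Kafka using Spring Boot. Spring Boot configures Spring Kafka with sensible defaults, and those defaults can be overridden in the application.yml properties file.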
Project setup
Spring Kafka: 2.1.4.RELEASE
Spring Boot: 2.0.0.RELEASE
Apache Kafka: kafka_2.11-1.0.0
Maven: 3.5
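An earlier tutorial covered how to create a Kafka consumer and producer by configuring them manually. In this example, Spring Boot configures both of them using sensible defaults.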
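Download and install Apache Kafka
To download and install Apache Kafka, follow the official documentation ( https://kafka.apache.org/quickstart ). This tutorial assumes the server is started with its default configuration and that the server port has not been changed.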
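Maven dependencies
This project uses Apache Maven to manage its dependencies. Make sure the following dependencies are on the classpath. The contents of pom.xml are shown below: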
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zyiz.spring.kafka</groupId>
    <artifactId>springboot-config</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://www.zyiz.net</url>
    <description>Spring Kafka Spring Boot</description>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <defaultGoal>compile</defaultGoal>
    </build>
</project>
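The project consists of pom.xml, the classes Sender, Receiver and SpringKafkaApplication in the com.zyiz.kafka package, and the application.yml file under src/main/resources.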
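Sending Spring Kafka messages with Spring Boot
Spring Boot automatically configures and initializes a KafkaTemplate based on the properties set in the application.yml properties file. Annotating the Sender class with @Service makes it eligible for detection by Spring's component scanning.
The code of Sender.java is shown below: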
package com.zyiz.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    // Auto-configured by Spring Boot from the spring.kafka.* properties
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Topic name taken from app.topic.foo in application.yml
    @Value("${app.topic.foo}")
    private String topic;

    public void send(String message) {
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}
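The topic name in Sender comes from the ${app.topic.foo} placeholder. As a rough illustration of what resolving such a placeholder involves, here is a JDK-only toy model. It is a sketch, not Spring's implementation: the class name PlaceholderSketch and the resolve method are hypothetical, and throwing on an unknown key is a choice made for this sketch.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy model of ${...} placeholder resolution; NOT Spring's implementation.
class PlaceholderSketch {

    // Matches ${some.key} and captures the key between the braces
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    static String resolve(String text, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String value = properties.get(key);
            if (value == null) {
                // Sketch-level choice: fail loudly on an unknown key
                throw new IllegalArgumentException("Could not resolve placeholder '" + key + "'");
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> props = Collections.singletonMap("app.topic.foo", "foo.t");
        System.out.println(resolve("${app.topic.foo}", props)); // prints foo.t
    }
}
```

With the application.yml used in this tutorial, the placeholder resolves to foo.t, which matches the topic name in the log output.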
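Receiving Kafka messages with Spring Boot
Spring Boot also auto-configures a ConcurrentKafkaListenerContainerFactory bean, which in turn creates the underlying KafkaMessageListenerContainer instances. These can optionally be tuned through the application.yml properties file.
When a method is annotated with @KafkaListener, Spring Kafka automatically creates a message listener container for it.
The implementation of Receiver.java is shown below: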
package com.zyiz.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    // Listens on the topic configured as app.topic.foo in application.yml
    @KafkaListener(topics = "${app.topic.foo}")
    public void receive(@Payload String message, @Headers MessageHeaders headers) {
        LOG.info("received message='{}'", message);
        // Log every message header (offset, partition, topic, timestamp, ...)
        headers.keySet().forEach(key -> LOG.info("{}: {}", key, headers.get(key)));
    }
}
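To give a feel for what a listener container does on behalf of the annotated method, the following JDK-only toy model runs a background thread that polls for records and hands each one to a listener callback. This is only a conceptual sketch: ToyListenerContainer, deliver and the in-memory queue standing in for a topic are all hypothetical, and the real containers additionally handle consumer groups, offsets, rebalancing and error handling.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Toy stand-in for a listener container: one thread polls, then calls the listener.
class ToyListenerContainer {

    private final BlockingQueue<String> topic;   // stands in for a Kafka topic
    private final Consumer<String> listener;     // stands in for the @KafkaListener method
    private final Thread worker;
    private volatile boolean running;

    ToyListenerContainer(BlockingQueue<String> topic, Consumer<String> listener) {
        this.topic = topic;
        this.listener = listener;
        this.worker = new Thread(this::pollLoop, "toy-container");
    }

    void start() {
        running = true;
        worker.start();
    }

    void stop() throws InterruptedException {
        running = false;
        worker.join();
    }

    // Poll with a timeout so the loop can notice a stop request
    private void pollLoop() {
        try {
            while (running) {
                String record = topic.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    listener.accept(record);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Publishes the messages, waits until the listener has seen them, returns what it saw
    static List<String> deliver(String... messages) {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.length);
        ToyListenerContainer container = new ToyListenerContainer(topic, m -> {
            received.add(m);
            done.countDown();
        });
        container.start();
        try {
            for (String m : messages) {
                topic.add(m);
            }
            done.await(5, TimeUnit.SECONDS);
            container.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(deliver("first", "second")); // prints [first, second]
    }
}
```

The point of the model is the division of labour: the application code only supplies the callback, while the container owns the thread and the polling loop.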
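Configuring the application with application.yml
Spring Boot attempts to auto-configure the application based on the dependencies declared in pom.xml and sets sensible defaults. No Consumer, Producer or KafkaTemplate beans have been configured here; Spring Boot auto-configures them using its defaults. These values can be overridden in the application.yml properties file. More information is available in the Spring Boot Kafka properties documentation.
An application.yml properties file is also created in the src/main/resources folder. Spring Boot injects these properties into its configuration classes.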
spring:
  kafka:
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

app:
  topic:
    foo: foo.t

logging:
  level:
    root: WARN
    org.springframework.web: INFO
    com.zyiz: DEBUG
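The nested YAML keys above correspond to dotted property names such as spring.kafka.consumer.group-id and app.topic.foo, which is why the code can refer to ${app.topic.foo}. The JDK-only sketch below illustrates that flattening on a hand-built nested map. It is not Spring Boot's YAML loader; FlattenSketch, sample and nest are hypothetical names, and only two of the keys are modelled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: flattens nested maps into dotted keys, as a model of YAML-to-property mapping.
class FlattenSketch {

    static Map<String, Object> flatten(Map<String, Object> nested) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, Object> node, Map<String, Object> out) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            if (entry.getValue() instanceof Map) {
                // Descend into nested sections, extending the dotted prefix
                flattenInto(key, (Map<String, Object>) entry.getValue(), out);
            } else {
                out.put(key, entry.getValue());
            }
        }
    }

    // Hand-built subset of the application.yml shown above
    static Map<String, Object> sample() {
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("spring", nest("kafka", nest("consumer", nest("group-id", "foo"))));
        root.put("app", nest("topic", nest("foo", "foo.t")));
        return root;
    }

    private static Map<String, Object> nest(String key, Object value) {
        Map<String, Object> map = new LinkedHashMap<>();
        map.put(key, value);
        return map;
    }

    public static void main(String[] args) {
        // prints {spring.kafka.consumer.group-id=foo, app.topic.foo=foo.t}
        System.out.println(flatten(sample()));
    }
}
```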
Running the application
Before running the project, ZooKeeper and Kafka need to be running, as shown below.
Start the ZooKeeper service:
D:\software\kafka_2.12-1.0.1\bin\windows> zookeeper-server-start.bat D:\software\kafka_2.12-1.0.1\config\zookeeper.properties
Start the Kafka service:
D:\software\kafka_2.12-1.0.1\bin\windows> kafka-server-start.bat D:\software\kafka_2.12-1.0.1\config\server.properties
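Finally, a simple Spring Boot application demonstrates the setup. For the demo to work, a Kafka server must be running on localhost on port 9092, which is Kafka's default configuration.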
package com.zyiz.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Autowired
    private Sender sender;

    // Runs once after the application context has started
    @Override
    public void run(String... strings) throws Exception {
        sender.send("Spring Kafka and Spring Boot Configuration Example");
    }
}
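Because the demo depends on something listening on localhost:9092, a quick pre-flight check can save some debugging. The sketch below uses only the JDK and simply attempts a TCP connection; BrokerPortCheck and isReachable are hypothetical names, and a successful connection only shows that some process accepts connections on that port, not that it is a healthy Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Minimal TCP reachability check; it does not speak the Kafka protocol.
class BrokerPortCheck {

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved
            return false;
        }
    }

    public static void main(String[] args) {
        // 9092 is the port this tutorial assumes for the local broker
        System.out.println("localhost:9092 reachable: " + isReachable("localhost", 9092, 1000));
    }
}
```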
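Example
Running the application should produce output similar to the following. The LEADER_NOT_AVAILABLE warnings typically appear while the foo.t topic is being created on first use, and they stop once the topic is available.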
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.0.RELEASE)

2018-03-14 11:22:25.177 INFO 2892 --- [ main] com.zyiz.kafka.SpringKafkaApplication : Starting SpringKafkaApplication on MY-PC with PID 2892 (F:\worksp\spring-kafka\springboot-config\target\classes started by Administrator in F:\worksp\spring-kafka\springboot-config)
2018-03-14 11:22:25.181 DEBUG 2892 --- [ main] com.zyiz.kafka.SpringKafkaApplication : Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
2018-03-14 11:22:25.182 INFO 2892 --- [ main] com.zyiz.kafka.SpringKafkaApplication : No active profile set, falling back to default profiles: default
2018-03-14 11:22:26.869 INFO 2892 --- [ main] com.zyiz.kafka.SpringKafkaApplication : Started SpringKafkaApplication in 2.208 seconds (JVM running for 2.751)
2018-03-14 11:22:26.871 INFO 2892 --- [ main] com.zyiz.kafka.Sender : sending message='Spring Kafka and Spring Boot Configuration Example' to topic='foo.t'
... ...
2018-03-14 11:22:36.035 WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 10 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.156 WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 7 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.163 WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 12 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.433 WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 8 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.436 WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 14 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:38.559 WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 16 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:38.559 WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 9 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:40.028 WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 10 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:56.203 INFO 2892 --- [ntainer#0-0-C-1] com.zyiz.kafka.Receiver : received message='Spring Kafka and Spring Boot Configuration Example'
2018-03-14 11:22:56.205 INFO 2892 --- [ntainer#0-0-C-1] com.zyiz.kafka.Receiver : kafka_offset: 0
2018-03-14 11:22:56.206 INFO 2892 --- [ntainer#0-0-C-1] com.zyiz.kafka.Receiver : kafka_nativeHeaders: RecordHeaders(headers = [], isReadOnly = false)
2018-03-14 11:22:56.206 INFO 2892 --- [ntainer#0-0-C-1] com.zyiz.kafka.Receiver : kafka_consumer: org.apache.kafka.clients.consumer.KafkaConsumer@68cba188
2018-03-14 11:22:56.206 INFO 2892 --- [ntainer#0-0-C-1] com.zyiz.kafka.Receiver : kafka_timestampType: CREATE_TIME
2018-03-14 11:22:56.206 INFO 2892 --- [ntainer#0-0-C-1] com.zyiz.kafka.Receiver : kafka_receivedMessageKey: null
2018-03-14 11:22:56.207 INFO 2892 --- [ntainer#0-0-C-1] com.zyiz.kafka.Receiver : kafka_receivedPartitionId: 0
2018-03-14 11:22:56.207 INFO 2892 --- [ntainer#0-0-C-1] com.zyiz.kafka.Receiver : kafka_receivedTopic: foo.t
2018-03-14 11:22:56.207 INFO 2892 --- [ntainer#0-0-C-1] com.zyiz.kafka.Receiver : kafka_receivedTimestamp: 1520997760772
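The kafka_receivedTimestamp header is a Kafka record timestamp, expressed in milliseconds since the Unix epoch. A small JDK-only sketch shows how the value from the log can be turned into a readable time. HeaderTimestampSketch is a hypothetical name, and the UTC+8 offset is an assumption about the machine that produced the log, chosen because it lines up with the 11:22 wall-clock times printed above.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Converts an epoch-millisecond record timestamp into readable date-time values.
class HeaderTimestampSketch {

    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // Assumption: the logging machine ran at UTC+8
    static OffsetDateTime toAssumedLocalTime(long epochMillis) {
        return toInstant(epochMillis).atOffset(ZoneOffset.ofHours(8));
    }

    public static void main(String[] args) {
        long receivedTimestamp = 1520997760772L; // value of kafka_receivedTimestamp in the log
        System.out.println(toInstant(receivedTimestamp));          // prints 2018-03-14T03:22:40.772Z
        System.out.println(toAssumedLocalTime(receivedTimestamp)); // prints 2018-03-14T11:22:40.772+08:00
    }
}
```

Under that assumption, the record was created at about 11:22:40, roughly when the last producer warning was logged, and was delivered to the listener at 11:22:56.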