pom.xml
<!-- Compile with Java 8 as both the source level and the bytecode target. -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
</properties>

<!-- Inherit build defaults from the Spring Boot parent POM. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.5.RELEASE</version>
</parent>

<!-- None of the dependencies below declares a version; presumably they are managed by the parent POM. -->
<dependencies>
    <!-- Core Spring Boot starter. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka (KafkaTemplate, @KafkaListener). -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Spring Boot test support; only on the test classpath. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
application.properties
# Brokers used to bootstrap the connection to the Kafka cluster.
spring.kafka.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

# ---- Producer ----
spring.kafka.producer.retries=5
# Wait for acknowledgement from all in-sync replicas.
spring.kafka.producer.acks=all
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# Setting a transaction id prefix makes the producer transactional.
spring.kafka.producer.transaction-id-prefix=transaction-id-
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.enable.idempotence=true

# ---- Consumer ----
spring.kafka.consumer.group-id=group1
spring.kafka.consumer.auto-offset-reset=earliest
# Auto-commit is disabled on purpose: with a transactional producer, consumed offsets
# must not be committed on a timer independently of the transaction. Otherwise a record
# can be marked as consumed even though the transaction that forwards it was aborted.
spring.kafka.consumer.enable-auto-commit=false
# Only takes effect when auto-commit is enabled; retained so re-enabling it keeps the old interval.
spring.kafka.consumer.auto-commit-interval=100
# Only read records from committed transactions.
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
日志配置(logback)
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- Console appender: prints level, timestamp and message, encoded as UTF-8. -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%p %d{yyyy-MM-dd HH:mm:ss} - %m%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!-- Log level for console output -->
    <root level="ERROR">
        <appender-ref ref="STDOUT" />
    </root>

    <!-- Spring Kafka logs at INFO; additivity is off so events are not also passed to the root logger. -->
    <logger name="org.springframework.kafka" level="INFO" additivity="false">
        <appender-ref ref="STDOUT" />
    </logger>

    <!-- Transaction control: debug output for the Kafka transaction package -->
    <logger name="org.springframework.kafka.transaction" level="debug" additivity="false">
        <appender-ref ref="STDOUT" />
    </logger>
</configuration>
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListeners; import org.springframework.messaging.handler.annotation.SendTo; import java.io.IOException; /** * 程序启动入口 */ @SpringBootApplication @EnableKafka public class KafkaSpringBootApplication { public static void main(String[] args) throws IOException { SpringApplication.run(KafkaSpringBootApplication.class, args); System.in.read(); } /** * 消费者topic04,同时将消息发送给topic05 * @param cr * @return */ @KafkaListeners(value = {@KafkaListener(topics = {"topic04"})}) @SendTo(value = {"topic05"}) public String listenner(ConsumerRecord<?, ?> cr) { System.out.println("消费到数据:"+cr.value()); return cr.value() + "demo"; } }
service类
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * Sends order messages to Kafka through {@link KafkaTemplate}.
 *
 * <p>The class-level {@link Transactional} annotation makes every public method
 * run inside a Spring-managed transaction. NOTE(review): this is presumably the
 * Kafka transaction enabled by the producer's transaction-id-prefix; confirm
 * which transaction manager is active.
 */
@Transactional
@Service
public class OrderService {

    /** Topic that order messages are published to. */
    private static final String ORDER_TOPIC = "topic04";

    // Parameterized rather than raw so that send(...) is type-checked by the compiler.
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Publishes one order message, keyed by the order id.
     *
     * @param id      record key
     * @param message record value; NOTE(review): the configured value serializer
     *                must be able to handle its runtime type (the sample
     *                configuration uses StringSerializer, i.e. expects a String)
     */
    public void saveOrder(String id, Object message) {
        kafkaTemplate.send(new ProducerRecord<>(ORDER_TOPIC, id, message));
    }
}
测试类
import com.baizhi.KafkaSpringBootApplication; import com.baizhi.OrderService; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.test.context.junit4.SpringRunner; /** * 通过单元测试类来发送消息 */ @SpringBootTest(classes = {KafkaSpringBootApplication.class}) @RunWith(SpringRunner.class) public class KafkaTempolateTests { @Autowired private KafkaTemplate kafkaTemplate; @Autowired private OrderService orderService; @Test public void testOrderService() { orderService.saveOrder("001", "诗和远方"); } @Test public void testKafkaTemplate() { kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() { @Override public Object doInOperations(KafkaOperations kafkaOperations) { return kafkaOperations.send(new ProducerRecord("topic04", "002", "this is a demo")); } }); } }