Version info: Kafka 1.0.2
Higher spring-kafka versions are compatible with lower Kafka broker versions.
pom
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
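The code below relies on Spring Boot's Kafka auto-configuration, so the connection and (de)serialization settings live in application.properties. Here is a minimal sketch, assuming a local broker at localhost:9092 and a made-up consumer group gc-demo-group; the serializers must match the KafkaTemplate<Integer, String> used by the controller:

# Broker address (assumed; adjust to your environment)
spring.kafka.bootstrap-servers=localhost:9092

# Producer side: keys are Integer, values are String
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# Consumer side for the @KafkaListener (group id is a made-up example)
spring.kafka.consumer.group-id=gc-demo-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer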
KafkaProducerController
package com.lew.sp.controller;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

/**
 * @Author llewcg
 * @Description Producer endpoints that send messages to Kafka
 */
@RestController
public class KafkaProducerController {

    @Autowired
    KafkaTemplate<Integer, String> kafkaTemplate;

    /**
     * Synchronous send: block on the future until the broker acknowledges the record.
     */
    @RequestMapping("/syncSendMess/{msg}")
    public String syncSendMess(@PathVariable String msg) {
        ListenableFuture<SendResult<Integer, String>> future =
                kafkaTemplate.send(new ProducerRecord<Integer, String>("gc_spring_1", 1, msg));
        try {
            SendResult<Integer, String> sendResult = future.get();
            RecordMetadata recordMetadata = sendResult.getRecordMetadata();
            System.out.println(recordMetadata.topic() + "\t" + recordMetadata.offset());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return "success";
    }

    /**
     * Asynchronous send: register a callback instead of blocking on the future.
     */
    @RequestMapping("/asyncSendMess/{msg}")
    public String asyncSendMess(@PathVariable String msg) {
        ListenableFuture<SendResult<Integer, String>> future =
                kafkaTemplate.send(new ProducerRecord<Integer, String>("gc_spring_1", 1, msg));
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("send failed");
            }

            @Override
            public void onSuccess(SendResult<Integer, String> sendResult) {
                RecordMetadata recordMetadata = sendResult.getRecordMetadata();
                System.out.println(recordMetadata.topic() + "\t" + recordMetadata.offset());
            }
        });
        return "success";
    }
}
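syncSendMess blocks on future.get(), so the HTTP response is only returned once the broker has acknowledged (or rejected) the record; asyncSendMess registers a ListenableFutureCallback and returns immediately, with the send outcome printed to the console later. With the application running on the default port (an assumption), a request such as http://localhost:8080/asyncSendMess/hello sends "hello" as the value of a record keyed 1 to the gc_spring_1 topic.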
CusConsumer
package com.lew.sp.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * @Author llewcg
 * @Description Consumes records and prints their metadata and payload
 */
@Component
public class CusConsumer {

    // Listen on the same topic the producer controller writes to.
    @KafkaListener(topics = "gc_spring_1")
    public void consumerMess(ConsumerRecord<Integer, String> consumerRecord) {
        Optional<ConsumerRecord<Integer, String>> consumerRecordOptional = Optional.ofNullable(consumerRecord);
        if (consumerRecordOptional.isPresent()) {
            System.out.println(consumerRecord.topic() + "\t"
                    + consumerRecord.partition() + "\t"
                    + consumerRecord.offset() + "\t"
                    + consumerRecord.key() + "\t"
                    + consumerRecord.value());
        }
    }
}
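The listener above takes its consumer group from the auto-configured consumer properties. If you prefer to pin the group on the annotation itself, spring-kafka also exposes a groupId attribute (available since spring-kafka 1.3). A minimal sketch follows; the class name, listener id, and group name are hypothetical:

package com.lew.sp.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical variant: pin the consumer group on the annotation
// instead of relying on spring.kafka.consumer.group-id.
@Component
public class CusConsumerWithGroup {

    @KafkaListener(id = "gc-demo-listener", groupId = "gc-demo-group", topics = "gc_spring_1")
    public void onMessage(ConsumerRecord<Integer, String> record) {
        // Print the record coordinates and payload, mirroring CusConsumer.
        System.out.println(record.topic() + "\t" + record.partition() + "\t"
                + record.offset() + "\t" + record.key() + "\t" + record.value());
    }
}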
KafkaConfig
Override the auto-configured defaults
package com.lew.sp.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author lewcg
 * @Description Override the default configuration
 */
@Configuration
public class KafkaConfig {

    // NewTopic beans are picked up by the auto-configured KafkaAdmin, which creates the topics on startup.
    @Bean
    public NewTopic topic1() {
        return new NewTopic("ntp-1", 2, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("ntp-02", 3, (short) 1);
    }

    /*
    @Bean
    public KafkaAdmin newAdmin() {
        Map<String, Object> config = new HashMap<>();
        config.put("xxx", "xxx");
        return new KafkaAdmin(config);
    }
    */

    /*
    @Bean
    public KafkaTemplate<Integer, String> newTemplate(ProducerFactory<Integer, String> producerFactory) {
        Map<String, Object> config = new HashMap<>();
        // Override the original settings
        config.put("xxx", "xxx");
        return new KafkaTemplate<Integer, String>(producerFactory, config);
    }
    */
}
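To make the commented-out KafkaAdmin override concrete, here is a minimal sketch of an explicit admin bean that points topic creation at a specific broker; the class name and the broker address localhost:9092 are assumptions, and the bean simply replaces the auto-configured KafkaAdmin that creates the NewTopic beans above.

package com.lew.sp.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

/**
 * Hypothetical example: an explicit KafkaAdmin so the NewTopic beans are
 * created against a specific broker instead of the auto-configured one.
 */
@Configuration
public class KafkaAdminConfig {

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> config = new HashMap<>();
        // "bootstrap.servers" via the AdminClientConfig constant; the address is an assumption.
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaAdmin(config);
    }
}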
Demo
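Start the Kafka broker and the Spring Boot application, then call the two producer endpoints with a message as the path variable. The producer side prints the topic and offset acknowledged by the broker, and CusConsumer prints the topic, partition, offset, key, and value of every record it receives from gc_spring_1.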