目录
1、引入 Maven 依赖
2、基本的生产者和代码注释
3、最简单的消费者
<!-- Apache Kafka Java client library (provides the org.apache.kafka.clients producer/consumer classes used below), version 2.3.0. -->
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency>
package cn.enjoyedu.hellokafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/**
 * Minimal Kafka producer example: sends a single String key/value record to the
 * {@code my-hello} topic and then closes the producer.
 *
 * @author honry.guan
 * @date 2021-05-06 19:16
 */
public class MyHelloProducer {

    /**
     * Builds the producer configuration, sends one record and releases the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Broker address (host:port) to connect to.
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        // Kafka's built-in String serializer. Passing StringSerializer.class instead of the
        // fully-qualified class name also works; the string form is used here for demonstration.
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer on every exit path, replacing the manual
        // try/finally + close() pair.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // Record for topic "my-hello" with key "name" and value "tom".
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>("my-hello", "name", "tom");
            // The value returned by send() is intentionally not inspected in this basic example.
            producer.send(producerRecord);
        }
    }
}
package cn.enjoyedu.hellokafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Minimal Kafka consumer example: subscribes to the {@code my-hello} topic and prints
 * every record it polls, looping until an exception terminates the program.
 *
 * @author honry.guan
 * @date 2021-05-06 19:17
 */
public class MyHelloConsumer {

    /**
     * Builds the consumer configuration, subscribes to the topic and polls in an endless loop.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Broker address (host:port) to connect to.
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        // Kafka's built-in String deserializer. Both the Class object and the fully-qualified
        // class name are accepted; one of each is used here for demonstration.
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // A consumer group must be specified, otherwise an error is reported.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");

        // try-with-resources closes the consumer when the loop is left via an exception,
        // replacing the manual try/finally + close() pair.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
            // Subscribe to the topic.
            kafkaConsumer.subscribe(Collections.singleton("my-hello"));
            while (true) {
                // Fetch messages, passing a 500 ms timeout to poll().
                ConsumerRecords<String, String> consumerRecords =
                        kafkaConsumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    System.out.println(String.format(
                            "topic:%s,分区:%d,偏移量:%d,key:%s,value:%s",
                            record.topic(), record.partition(), record.offset(),
                            record.key(), record.value()));
                }
            }
        }
    }
}