package com.asiainfo.group.kafka.consumer;

import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

/**
 * Demo Kafka consumer: subscribes to topic {@code okk}, prints every record it
 * receives, and commits offsets manually.
 *
 * <p>The poll loop runs on a worker thread; the main thread sleeps for ten
 * seconds and then calls {@link KafkaConsumer#wakeup()} to stop the loop.
 */
public class ConsumerDemo {

    private static final String PATH = "D:/development-software/eclipse-mars/default/java_test/src/main/java/com/asiainfo/group/kafka/consumer/";

    private static final KafkaConsumer<String, String> consumer = createConsumer();

    /**
     * Builds the shared consumer from {@code consumer.properties} under {@link #PATH}.
     *
     * @return the configured consumer, never {@code null}
     * @throws IllegalStateException if the properties file cannot be read; failing
     *         here avoids a later NullPointerException on a half-initialised class
     */
    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        // try-with-resources so the reader is closed even when load() fails
        try (FileReader reader = new FileReader(PATH + "consumer.properties")) {
            props.load(reader);
        } catch (Exception e) {
            throw new IllegalStateException("Cannot load " + PATH + "consumer.properties", e);
        }
        return new KafkaConsumer<String, String>(props);
    }

    /**
     * Starts the poll loop on a new thread, waits ten seconds, then wakes the
     * consumer up so the loop terminates and the consumer is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                pollLoop();
            }
        }).start();

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // restore the interrupt flag for whoever owns this thread
            Thread.currentThread().interrupt();
        }
        // wakeup() makes the blocked poll() throw WakeupException, which breaks the
        // while(true) loop and leads to the consumer being closed in finally.
        consumer.wakeup();
    }

    /**
     * Subscribes to {@code okk} and polls until {@link KafkaConsumer#wakeup()} is
     * called or an error occurs; always closes the consumer before returning.
     */
    private static void pollLoop() {
        List<String> topics = new ArrayList<String>();
        topics.add("okk");
        try {
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(3000);
                System.err.println("收到了"+records.count()+"条消息");
                for (ConsumerRecord<String, String> record : records) {
                    System.err.println("topic:"+record.topic());
                    System.err.println("partition:"+record.partition());
                    System.err.println("offset:"+record.offset());
                    System.err.println("key:"+record.key());
                    System.err.println("value:"+record.value());
                }
                // Synchronous commit (commitSync) keeps retrying and blocks until it
                // succeeds. Asynchronous commit does not retry, because a larger
                // offset may already have been committed while a retry was pending.
                // During normal operation the async commit is enough: if one commit
                // fails, a later one covers it.
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        // exception is null only when the commit succeeded
                        if (exception == null) {
                            System.err.println("异步提交偏移量成功!");
                        } else {
                            System.err.println("异步提交偏移量失败:" + exception);
                        }
                    }
                });
            }
        } catch (WakeupException expected) {
            // Normal shutdown path. There is no "next commit" once the consumer is
            // closed, so commit synchronously one last time before closing.
            try {
                consumer.commitSync();
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
2、生产者
package com.asiainfo.group.kafka.producer; import java.io.FileReader; import java.util.Properties; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class ProductDemo { private static final String PATH="D:/development-software/eclipse-mars/default/java_test/src/main/java/com/asiainfo/group/kafka/producer/"; private static KafkaProducer<String, String> producer; static{ try { Properties p = new Properties(); p.load(new FileReader(PATH+"producer.properties")); producer = new KafkaProducer<String,String>(p); } catch (Exception e) { e.printStackTrace(); } } /** * 发送并忘记(不关心是否到达) * @throws ExecutionException * @throws InterruptedException */ public void sendAndForget() throws Exception{ for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("okk", "key"+i, "sendAndForget"+i); producer.send(record); } producer.close(); } public void syncSend() throws Exception{ long start = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key"+i, "syncSend"+i); RecordMetadata recordMetadata = producer.send(record).get(); System.err.println("同步发送成功!"); } System.err.println("同步发送耗时:"+(System.currentTimeMillis()-start)); producer.close(); } public void asyncSend(){ long start = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key"+i, "asyncSend"+i); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata arg0, Exception arg1) { System.err.println("异步发送成功!"); } }); } System.err.println("异步发送耗时:"+(System.currentTimeMillis()-start)); producer.close(); } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ new 
ProductDemo().sendAndForget(); //new ProductDemo().syncSend(); //new ProductDemo().asyncSend(); } }3、生产者配置文件
bootstrap.servers=192.168.0.108:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
4、消费者配置文件
bootstrap.servers=192.168.0.108:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=groupByJava1
enable.auto.commit=false