consumer:
package cn.miaoying.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

/**
 * Minimal high-level Kafka consumer demo: connects through ZooKeeper, subscribes to a single
 * topic with one stream, and prints every message value to standard output.
 */
public class TestConsumer {

    /** Topic this demo reads from. */
    private static final String TOPIC = "test_miaoying";

    private final ConsumerConnector consumer;

    /** Builds the consumer configuration and opens the connector. */
    private TestConsumer() {
        Properties props = new Properties();
        // ZooKeeper connection string
        props.put("zookeeper.connect", "20.21.1.xxx:2182,20.21.1.xxx:2183,20.21.1.xxx:2184");
        // group.id identifies the consumer group this consumer belongs to
        props.put("group.id", "jd-group");
        // ZooKeeper session timeout
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        // NOTE: "serializer.class" is a producer-side setting and is intentionally not set here;
        // message decoding is done by the StringDecoder instances passed in consume().

        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    /**
     * Opens one stream on {@link #TOPIC} and prints each message value as it arrives.
     * The connector is shut down when iteration ends or fails.
     *
     * @throws IllegalStateException if the connector returns no stream for the topic
     */
    void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // one stream (consumer thread) for the topic
        topicCountMap.put(TOPIC, Integer.valueOf(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        try {
            Map<String, List<KafkaStream<String, String>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

            List<KafkaStream<String, String>> streams = consumerMap.get(TOPIC);
            if (streams == null || streams.isEmpty()) {
                throw new IllegalStateException("No stream created for topic " + TOPIC);
            }

            ConsumerIterator<String, String> it = streams.get(0).iterator();
            // NOTE(review): presumably blocks indefinitely while waiting for messages because
            // consumer.timeout.ms is not configured — confirm against the Kafka consumer docs.
            while (it.hasNext()) {
                System.out.println(it.next().message());
            }
        } finally {
            // Release the ZooKeeper session and fetcher resources even if iteration fails.
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        new TestConsumer().consume();
    }
}
provider:
package cn.miaoying.consumer;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Minimal Kafka producer demo: sends a fixed number of identical string messages, keyed by a
 * fixed IP string, to a single topic and logs a line to standard output before each send.
 */
public class TestProvider {

    /** Topic this demo writes to. */
    private static final String TOPIC = "test_miaoying";

    /** Number of messages sent per run. */
    private static final int MESSAGE_COUNT = 5;

    public static void main(String[] args) {
        // Printed as the first field of every log line; not used to bound the loop.
        long events = 1L;

        Properties properties = new Properties();
        properties.put("metadata.broker.list", "20.21.1.xxx:9093,20.21.1.xxx:9091,20.21.1.xxx:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(properties);

        // Key and payload are the same for every message, so build them once.
        String ip = "127.0.0.1";
        String msg = "test~test~test";

        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                long runtime = new Date().getTime();
                KeyedMessage<String, String> keyedMessage =
                        new KeyedMessage<String, String>(TOPIC, ip, msg);
                System.out.println(events + "---" + runtime);
                producer.send(keyedMessage);
            }
        } finally {
            // Close the producer even when a send fails so broker connections are released.
            producer.close();
        }
    }
}