Java教程

Java代码测试Kafka集群收发消息

本文主要是介绍Java代码测试Kafka集群收发消息,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

consumer:

package cn.miaoying.consumer;


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
public class TestConsumer {
    private final ConsumerConnector consumer;

    private TestConsumer() {
        Properties props = new Properties();
        // zookeeper 配置
        props.put("zookeeper.connect", "20.21.1.xxx:2182,20.21.1.xxx:2183,20.21.1.xxx:2184");
        // group 代表一个消费组
        props.put("group.id", "jd-group");
        // zk连接超时
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        // 序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    void consume() {
        Map topicCountMap = new HashMap();
        //topicCountMap.put(KafkaProducerDemo.TOPIC, new Integer(1));
        topicCountMap.put("test_miaoying", new Integer(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap,
                keyDecoder, valueDecoder);
        KafkaStream stream = consumerMap.get("test_miaoying").get(0);
        ConsumerIterator it = stream.iterator();
        while (it.hasNext())
            System.out.println(it.next().message());
    }

    public static void main(String[] args) {
        new TestConsumer().consume();
    }
}

 

provider:

package cn.miaoying.consumer;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProvider {
    public static void main(String[] args) {

        long events = Long.parseLong("1");
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "20.21.1.xxx:9093,20.21.1.xxx:9091,20.21.1.xxx:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(properties);
        Producer producer = new Producer(config);
        for (int i = 0; i < 5; i++) {
            long runtime = new Date().getTime();
            String ip = "127.0.0.1";
            String msg = "test~test~test";
            KeyedMessage keyedMessage = new KeyedMessage("test_miaoying", ip, msg);
            System.out.println(events + "---" + runtime);
            producer.send(keyedMessage);
        }
        producer.close();
    }
}

 

这篇关于Java代码测试Kafka集群收发消息的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!