MySql教程

kafka自定义producer从mysql获取数据存储至kafka

本文主要介绍如何自定义 Kafka producer,从 MySQL 获取数据并存储至 Kafka,对大家解决类似编程问题具有一定的参考价值,需要的程序员们可以随小编一起学习。
package test;

/**
 * @Description 细节决定成败
 * @Date 2021/5/23 14:45
 * @Author liaoxuan
 **/

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
import util.GlobalConfigUtil;

import java.util.Properties;

/**
 * Kafka生产消息工具类
 */
/**
 * Utility for producing messages to Kafka (legacy 0.8.x producer API).
 *
 * <p>Fix over the original: {@code sendMessage} used to build a brand-new
 * {@link Producer} on every call and never closed it, leaking broker
 * connections. A single producer is now created lazily and reused.
 */
public class KafkaSender {

    // Shared producer, created on first use. Guarded by the class lock in
    // sendMessage so concurrent callers cannot create two instances.
    private static Producer<String, String> producer;

    private String topic;

    public KafkaSender(String topic){
        super();
        this.topic = topic;
    }

    /**
     * Sends a message to the given Kafka topic.
     * @param topic topic name
     * @param data  message payload
     */
    public static synchronized void sendMessage(String topic ,  String data){
        if (producer == null) {
            producer = createProducer();
        }
        producer.send(new KeyedMessage<String , String>(topic , data));
    }

    /**
     * Builds a producer from the global configuration.
     * Uses the old 0.8.x settings: broker list, ZooKeeper connect string and
     * a String encoder for message values.
     */
    private static Producer<String , String> createProducer(){
        Properties properties = new Properties();

        properties.put("metadata.broker.list" , GlobalConfigUtil.kafkaBootstrapServers);
        properties.put("zookeeper.connect" , GlobalConfigUtil.kafkaZookeeperConnect);
        properties.put("serializer.class" , StringEncoder.class.getName());

        return new Producer<String, String>(new ProducerConfig(properties));
    }
}
package test;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import util.GlobalConfigUtil;

import java.io.UnsupportedEncodingException;
import java.sql.*;

/**
 * @Description 细节决定成败
 * @Date 2021/5/23 14:46
 * @Author liaoxuan
 **/
/**
 * Reads every row of the {@code userconfig} table, converts each row to a
 * JSON object and publishes it to the configured Kafka input topic.
 *
 * <p>Fixes over the original: {@code main} leaked the JDBC
 * {@code Connection}/{@code Statement}/{@code ResultSet} (now closed via
 * try-with-resources), and {@code resultSetToJson} read each column up to
 * three times per row (now a single {@code getBytes} read).
 */
public class MysqlToKafkaTest {
    public static void main(String[] args) throws Exception{
        MysqlToKafkaTest a = new MysqlToKafkaTest();
        String sql = "select * from userconfig";
        // try-with-resources ensures the JDBC objects are closed even if
        // sending to Kafka throws. Closing the already-closed ResultSet
        // (resultSetToJson closes it) is a documented JDBC no-op.
        try (Connection con = a.getCon();
             Statement statement = con.createStatement();
             ResultSet rs = statement.executeQuery(sql)) {
            JSONArray objects = resultSetToJson(rs);
            for (int i = 0; i < objects.size(); i++) {
                System.out.println(objects.get(i));
                KafkaSender.sendMessage(GlobalConfigUtil.kafkaInputTopic, String.valueOf(objects.get(i)));
            }
        }
    }

    /**
     * Opens a MySQL connection using the settings from application.properties.
     * @return an open connection, or {@code null} if connecting failed
     *         (original best-effort behavior preserved)
     */
    public  Connection getCon() {
        // Database credentials / driver / URL from the global config.
        String username = GlobalConfigUtil.mysqlUsername;
        String password = GlobalConfigUtil.mysqlPassword;
        String driver = GlobalConfigUtil.mysqlDriver;
        String url = GlobalConfigUtil.mysqlUrl;
        Connection conn = null;
        try {
            Class.forName(driver);
            conn = DriverManager.getConnection(url, username, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * Converts a ResultSet into a JSONArray: one JSONObject per row, keyed by
     * column label. NULL or empty columns become the empty string; non-empty
     * values are decoded from the raw bytes as UTF-8.
     *
     * <p>Closes the ResultSet before returning (original contract kept).
     *
     * @param rs the result set to convert; closed by this method
     * @return one JSON object per row
     */
    public static JSONArray resultSetToJson(ResultSet rs)
            throws SQLException, JSONException, UnsupportedEncodingException {
        JSONArray array = new JSONArray();
        ResultSetMetaData metaData = rs.getMetaData();
        int columnCount = metaData.getColumnCount();
        while (rs.next()) {
            JSONObject jsonObj = new JSONObject();
            for (int i = 1; i <= columnCount; i++) {
                String columnName = metaData.getColumnLabel(i);
                // Read the column once; the original called rs.getString twice
                // and then rs.getBytes for the same column.
                byte[] raw = rs.getBytes(columnName);
                String value = (raw == null || raw.length == 0) ? "" : new String(raw, "UTF-8");
                jsonObj.put(columnName, value);
            }
            array.add(jsonObj);
        }
        rs.close();
        return array;
    }
}

 

 

package util;
/*
        读取配置文件的一个工具类
 */

import java.util.ResourceBundle;

/**
 * Reads configuration values from {@code application.properties} on the
 * classpath, once, at class-initialization time.
 *
 * <p>Fix over the original: the config fields were public static and
 * mutable; they are now {@code final} so callers cannot reassign them, and
 * the utility class gets a private constructor.
 */
public class GlobalConfigUtil {
    // ResourceBundle automatically locates application.properties on the
    // CLASSPATH; a missing file throws MissingResourceException at load time.
    private static final ResourceBundle resourceBundle = ResourceBundle.getBundle("application");

    // Configuration values, read once via ResourceBundle.getString.
    public static final String mysqlUsername = resourceBundle.getString("mysql.username");
    public static final String mysqlPassword = resourceBundle.getString("mysql.password");
    public static final String mysqlDriver = resourceBundle.getString("mysql.driver");
    public static final String mysqlUrl = resourceBundle.getString("mysql.url");
    public static final String kafkaBootstrapServers = resourceBundle.getString("kafka.bootstrap.servers");
    public static final String kafkaZookeeperConnect = resourceBundle.getString("kafka.zookeeper.connect");
    public static final String kafkaInputTopic = resourceBundle.getString("kafka.input.topic");

    /** Utility class; not meant to be instantiated. */
    private GlobalConfigUtil() {
    }

    /** Smoke test: prints every loaded configuration value. */
    public static void main(String[] args) {
        System.out.println(mysqlUsername);
        System.out.println(mysqlPassword);
        System.out.println(mysqlDriver);
        System.out.println(mysqlUrl);
        System.out.println(kafkaBootstrapServers);
        System.out.println(kafkaZookeeperConnect);
        System.out.println(kafkaInputTopic);
    }
}

 

application.properties:

mysql.username=root
mysql.password=root
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://localhost:3306/test


#
#kafka的配置
#
kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
kafka.zookeeper.connect=node01:2181,node02:2181,node03:2181
kafka.input.topic=MysqlToKafka
这篇关于自定义 Kafka producer 从 MySQL 获取数据并存储至 Kafka 的文章就介绍到这里,希望本文对大家有所帮助,也希望大家多多支持为之网!