SqlServer教程

KAFKA数据源同步到SQL SERVER数据库代码实现

本文主要是介绍KAFKA数据源同步到SQL SERVER数据库代码实现,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
package com.ruoyi.quartz.controller;

import com.ruoyi.quartz.domain.LfoSbomP;
import com.ruoyi.quartz.domain.LfoSbomS;
import com.ruoyi.quartz.sbom.model.LogStatus;
import com.ruoyi.quartz.sbom.process.bean.receive.ReceiveJsonLFOSBBBOMBean;
import com.ruoyi.quartz.sbom.process.bean.receive.ReceiveJsonRootBean;
import com.ruoyi.quartz.sbom.process.bean.send.SendJsonLFOServiceBOMBean;
import com.ruoyi.quartz.service.ILfoSbomLogService;
import com.ruoyi.quartz.service.ILfoSbomSService;
import com.ruoyi.quartz.util.LongToDateUtils;
import com.ruoyi.quartz.util.WCLocationConstants;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.alibaba.fastjson.JSONObject;
import java.io.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import com.ruoyi.quartz.service.ILfoSbomPService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author zhulm7
 * @date 2021-09-18 09:45:00
 * 这个方法主要是消费资源,调用资源路径,消费者拉取kafka中的数据,写入到文件中
 * http://localhost/dev-api/message/receive/file/delta
 */

@RestController
@RequestMapping("message/receive")
public class ConsumerFileDeltaController {

    @Autowired
    private ILfoSbomPService lfoPService;

    @Autowired
    private ILfoSbomSService lfosService;


    private static String KAFKA_ACCOUNT_NAME = "kaf_fineReport";
    private static String KAFKA_ACCOUNT_PWD = "XpyO8MBtxC';";
    private static String KAFKA_PRODUCER_SERVER = "n1.ikp.tcp.com:8092,n2.ikp.tcp.com:8092";
    private static String CONSUMER_GROUP_ID = "fineReport";
    private static String CONSUMER_ENABLE_AUTO_COMMIT = "false";
    private static String CONSUMER_AUTO_OFFSET_RESET = "earliest";
    private static String CONSUMER_AUTO_COMMIT_INTERVAL_MS = "1000";
    private static String CONSUMER_SESSION_TIMEOUT_MS = "10000";
    private static String CONSUMER_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
    private static String CONSUMER_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
    private static String KAFKA_SECURITY_PROTOCOL = "SASL_SSL";
    private static String KAFKA_SSL_TRUSTSTORE_LOCATION = "";
    private static String KAFKA_SSL_TRUSTSTORE_PWD = "";
    private static String KAFKA_SASL_MECHANISM = "SCRAM-SHA-512";
    private static String CONSUMER_ELOIS_TOPIC = "ebgwc";
    private static KafkaConsumer<String, String> consumer=null;

    /**
     * Consumed data from KAFKA
     * @return    增量方式消费kafka中流式数据处理逻辑
     * @throws Exception    2021-09-26 13:37:00
     */
    @RequestMapping("/file/delta")
    public void receive(String msg) {

        //业务逻辑(增量数据)书写的方式。。。。。。
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "n1.ikp.lenovo.com:9092,n2.ikp.lenovo.com:9092,n3.ikp.lenovo.com:9092,n4.ikp.lenovo.com:9092,n5.ikp.lenovo.com:9092,n6.ikp.lenovo.com:9092");
        props.put("group.id", "fineReport");
        props.put("enable.auto.commit", "false");    // 设置不自动提交
        props.put("auto.offset.reset", "earliest");  // 从头开始记录信息  earliest latest none
        props.put("auto.commit.interval.ms", "1000");// 自动提交间隔
        props.put("session.timeout.ms", "10000");    // 超时时间30秒
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("security.protocol", "SASL_SSL");
        props.put("ssl.endpoint.identification.algorithm", "");
        props.put("ssl.truststore.location", "D:/kafka-2.8.0-src/client_truststore.jks");
        props.put("ssl.truststore.password", "WSO2_sp440");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        //XpyO8MBt
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf_fineReport' password='XpyO8MBtcx';");

        final int minBatchSize = 50;
        final int minBuffer_length = 420000000;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        int buffer_length = 0;
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //ebgwc_lfo-sbom-delta 增量
        consumer.subscribe(Collections.singletonList("ebgwc_lfo-sbom-delta"));

        try {
            int whileIndex = 0;
            DateFormat df= new SimpleDateFormat("yyyyMMdd");
            String rootFolder="delta";
            String parentFolder=df.format(new Date());
            String folder= WCLocationConstants.WT_HOME+ File.separator+"cluster"+File.separator+rootFolder+File.separator+parentFolder;
            File dir=new File(folder);

            //创建文件夹
            if(!dir.exists()) { dir.mkdirs(); }

            //一次性把lfoNumber加载数据库中的数据,方便后续插入使用,减少连接数据库次数
            Set<String> lst =new HashSet<String>();
            //List<LfoSbomP> lfoplist = lfoPService.initLfoNumberData();//查询的数据量过大
            //for(int i=0;i<lfoplist.size();i++){
                //String lfopnumbernc= lfoplist.get(i).getLfoNumber();
                //lst.add(lfopnumbernc);
            //}
            
            List<String> lfonumberplist = lfoPService.initAllLfoNumberData();
            for(int i=0;i<lfonumberplist.size();i++){
                String lfopnumbernc= lfonumberplist.get(i);
                lst.add(lfopnumbernc);
            }

            DateFormat df1= new SimpleDateFormat("yyyyMMddHHmmss");

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                whileIndex += 1;

                int record_counter = 0;
                int forIndex = 1;

                for (ConsumerRecord<String, String> record : records) {

                    buffer.add(record); //数据添加到缓存里
                    record_counter += 1;//计数器
                    buffer_length += record.value().length();//字符串长度

                    //if(buffer.size() >= minBatchSize){//原来通过消息
                    if(buffer_length > minBuffer_length || record_counter == records.count()){
                        for (int k = 0; k < buffer.size(); k++) {

                            ConsumerRecord<String, String> record_buffer = buffer.get(k);

                            //获取kafka生产者消息时间
                            String datestr = LongToDateUtils.longToDate(record_buffer.timestamp());
                            String topic_name = record_buffer.topic();//主题
                            String partition = record_buffer.partition()+"";//分区
                            String offset=record_buffer.offset()+"";//偏移量
                            String key=record_buffer.key();//日志主键
                            String value=record_buffer.value();//日志value

                            if(StringUtils.isBlank(value)) {
                                continue; //跳过
                            }

                            if(!(value.startsWith("{") && value.endsWith("}"))) {
                                if(StringUtils.isBlank(value)) {
                                    continue;//跳过
                                }
                            }


                            //正常情况写入txt日志文件
                            ReceiveJsonRootBean receiveJsonRootBean = JSONObject.parseObject(value, ReceiveJsonRootBean.class);
                            String LFONumber = receiveJsonRootBean.getLfo().getLfoNumber();

                            //判断LFONumber字符串是否在内存lst中存在,如果存在,那么跳过,如果不存在,那么写入数据库操作
                            boolean contains = lst.contains(LFONumber);
                            if(!contains){//如果不存在,那么执行插入条件
                                //循环遍历结果集

                                //最后把这个number记录数据库中
                                lst.add(LFONumber);

                                //文件格式
                                String filePath = folder + File.separator +"ebgwc_lfo_sbom_delta"+ whileIndex + "_" + forIndex + ".txt";
                                //向文件尾部追加数据  (保留向文件中插入数据的功能)
                                appendToFile(filePath, value+"\r\n");

                                int limit = 200;//分批处理,每次处理200个
                                List<LfoSbomS> buffer_lfosbom = new ArrayList<>();
                                List<SendJsonLFOServiceBOMBean> lfosbom = receiveJsonRootBean.getLfosbom();
                                System.out.printf("测试delta:"+lfosbom.toString());
                                for (int m = 0; m < lfosbom.size(); m++) {

                                    //向数据库中增量插入业务数据
                                    LfoSbomS lfo_s = new LfoSbomS();
                                    lfo_s.setLfoNumber(lfosbom.get(m).getLfoNumber());
                                    lfo_s.setChangeType(lfosbom.get(m).getChangeType());
                                    lfo_s.setServicePart(lfosbom.get(m).getServicePart());
                                    lfo_s.setBacCode(lfosbom.get(m).getBacCode());
                                    lfo_s.setComposedQty(lfosbom.get(m).getComposedQty());
                                    lfo_s.setDescription(lfosbom.get(m).getDescription());
                                    lfo_s.setLongdescription(lfosbom.get(m).getLongdescription());
                                    lfo_s.setOffSet(topic_name+"_"+partition+"_"+offset+"_"+LFONumber+"_"+datestr);//主键
                                    lfo_s.setInsertDate(datestr);//推送日期

                                    //向缓存中插入数据
                                    buffer_lfosbom.add(lfo_s);

                                    //当没存满的时候遍历结束了,也要执行插入操作
                                    if(limit == buffer_lfosbom.size() || m == lfosbom.size()-1){
                                        //要分成200一批次插入增量的分块数据
                                        lfosService.insertListLfoSbomS(buffer_lfosbom);
                                        //清空缓存中的数据
                                        buffer_lfosbom.clear();
                                    }
                                }

                                // 正常增量数据记录日志
                                LfoSbomP lfo_p = new LfoSbomP();
                                lfo_p.setLfoNumber(LFONumber);
                                lfo_p.setChangeType("INS");
                                lfo_p.setPartition(partition);
                                lfo_p.setOffSet(offset);
                                lfo_p.setFilePath(filePath);//文件所在路径
                                lfo_p.setStatus(LogStatus.SUCCESS);
                                lfo_p.setMessage("ebgwc_lfo-sbom-delta");//存放主题
                                //插入日志,调用方法
                                lfoPService.insertLfoSbomP(lfo_p);

                            }else{
                                //如果set集合中存在lfonumber,那么不错处理,非常重要!!!
                            }
                        }

                        //清空缓存
                        buffer.clear();
                        //计数器清零
                        buffer_length = 0;
                        //索引值加1
                        forIndex += 1;

                    }
                }
            }

        } catch (Exception e) {
            System.out.print("commit failed");
            System.out.print(e);
        } finally {
            //consumer.commitSync();
        }
    }



    /**
     * 方法追加文件:使用FileWriter
     */
    public static void appendToFile(String fileName, String content) {
        try {
            //打开一个写文件器,构造函数中的第二个参数true表示以追加形式写文件
            BufferedWriter writer = new BufferedWriter(new FileWriter(fileName, true));
            writer.write(content);
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
这篇关于KAFKA数据源同步到SQL SERVER数据库代码实现的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!