
Writing static multi-datasource code and using a scheduled task to synchronize database data

This article walks through writing static multi-datasource code and using a scheduled task to synchronize data between two databases. It should be a useful reference if you are tackling a similar problem, so let's work through it together.

First, we declare the two data sources in the Spring Boot configuration file:

server:
  port: 8083

spring:
  datasource:
    remote:
      driver-class-name: com.mysql.cj.jdbc.Driver
      jdbc-url: jdbc:mysql://21.33.322.22/greer?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC
      username: 333
      password: 22@2
    mine:
      driver-class-name: com.mysql.cj.jdbc.Driver
      jdbc-url: jdbc:mysql://23.33.212.22/ferer?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC
      username: w22
      password: 222
  profiles:
    active: dev

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # enable SQL logging

logging:
  level:
    com.starcpdk.mapper: debug

Next, we can start from the mapper layer; below is the structure of my mapper layer.
[Figure omitted: the mapper layer is split into com.starcpdk.mapper.remote and com.starcpdk.mapper.mine, each with an xml subdirectory holding the corresponding mapper XML files.]
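
The figure itself is not reproduced here. As a rough orientation, one of the mapper interfaces might look like the sketch below; the article never shows the mapper code, so the method names are only inferred from the service calls made later in the scheduled task and should be treated as assumptions.

// Hypothetical sketch -- the real mapper interfaces are not shown in this article.
package com.starcpdk.mapper.mine;

import java.util.Map;

public interface MaindataMineMapper {

    // Latest record time already present in the local (mine) table;
    // the SQL would live in com/starcpdk/mapper/mine/xml/MaindataMineMapper.xml
    String maxMineDatetime();

    // Insert one synchronized row, passed as a column-name -> value map
    void insertData(Map<String, Object> row);
}

The remote-side package (com.starcpdk.mapper.remote) would mirror this with the query methods for the source database; the xml subdirectories are exactly what the mapper-location settings in the configuration classes below point at.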
Now let's write the configuration classes that set up the data sources.
First, a configuration class that reads the database settings from the configuration file and builds a data source for each:

package com.starcpdk.config;

import com.zaxxer.hikari.HikariDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;

@Configuration
public class DataSourceConfig {
 
    // Mark this data source as the default
    @Primary
    // Register the object as a bean in the Spring container (managed by Spring)
    @Bean(name="remoteDataSource")
    // Bind the matching configuration properties from application.yml onto this object
    @ConfigurationProperties(prefix = "spring.datasource.remote")
    public DataSource getDataSource1(){
        // Build the data source
        return DataSourceBuilder.create().type(HikariDataSource.class).build();
    }
 
    // No @Primary here: only one DataSource bean may be primary, otherwise
    // injection by type fails with "more than one 'primary' bean" at startup
    @Bean(name="mineDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.mine")
    public DataSource getDataSource2(){
        return DataSourceBuilder.create().type(HikariDataSource.class).build();
    }
}

Next, we map the mapper XML files to each of the two data sources:

package com.starcpdk.config;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import javax.sql.DataSource;

/**
 * MyBatis configuration for the mine data source (Config 2)
 */
@Configuration
@MapperScan(basePackages = {"com.starcpdk.mapper.mine"}, sqlSessionFactoryRef = "mineSqlSessionFactory")
public class MybatisMineConfig {
 
    @Autowired
    @Qualifier("mineDataSource")
    private DataSource dataSource;
 
    /**
     * Create the SqlSessionFactory for the mine data source
     * @return
     * @throws Exception
     */
    @Bean(name="mineSqlSessionFactory")
//    @Primary
    public SqlSessionFactory mineSqlSessionFactory() throws Exception{
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        // Location of the MyBatis mapper XML files for this data source
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().
                getResources("classpath*:com/starcpdk/mapper/mine/xml/*.xml"));
        return bean.getObject();
    }
 
    /**
     * Create a SqlSessionTemplate from the SqlSessionFactory
     * @param sqlSessionFactory
     * @return
     */
    @Bean(name="mineSqlSessionTemplate")
//    @Primary
    public SqlSessionTemplate mineSqlSessionTemplate(@Qualifier("mineSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
        // SqlSessionTemplate is thread-safe and can be shared by multiple DAOs
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

package com.starcpdk.config;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import javax.sql.DataSource;

/**
 * MyBatis configuration for the remote (primary) data source (Config 1)
 */
@Configuration
@MapperScan(basePackages = {"com.starcpdk.mapper.remote"}, sqlSessionFactoryRef  = "remoteSqlSessionFactory")
public class MybatisRemoteConfig {
 
    @Autowired
    @Qualifier("remoteDataSource")
    private DataSource dataSource;
 
    /**
     * Create the SqlSessionFactory for the remote data source
     * @return
     * @throws Exception
     */
    @Bean(name="remoteSqlSessionFactory")
    @Primary
    // The @Qualifier on the field above looks up the bean named remoteDataSource in the Spring container
    public SqlSessionFactory remoteSqlSessionFactory() throws Exception{
        // Creates the SqlSessionFactory; equivalent to the XML configuration below:
//        <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
//            <property name="dataSource" ref="dataSource" />
//            <property name="mapperLocations" value="classpath:mybatis-mapper/*.xml"/>
//        </bean>
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        // Location of the MyBatis mapper XML files (scanned and wired into the factory)
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().
                getResources("classpath*:com/starcpdk/mapper/remote/xml/*.xml"));
        return bean.getObject();
    }
 
    /**
     * Create a SqlSessionTemplate from the SqlSessionFactory
     * @param sqlSessionFactory
     * @return
     */
    @Bean(name="remoteSqlSessionTemplate")
    @Primary
    public SqlSessionTemplate remoteSqlSessionTemplate(@Qualifier("remoteSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
        // SqlSessionTemplate is thread-safe and can be shared by multiple DAOs
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
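
Before moving on: the scheduled task below calls two service beans, MaindataMineService and MaindataRemoteService, whose source is not included in this article. As a minimal sketch (the signatures are inferred from how the services are called, so treat them as assumptions), the remote-side service might look like this:

// Hypothetical sketch -- the real service code is not shown in this article.
package com.starcpdk.service.remote;

import com.starcpdk.pojo.Maindata;
import java.util.List;
import java.util.Map;

public interface MaindataRemoteService {

    // Latest record time present in the remote (source) database
    String maxRemoteDatetime();

    // Earliest record time in the remote database, used when the local table is still empty
    String minRemoteDatetime();

    // Records within the window defined by the map's "mineDatetime" / "remoteDatetime" entries
    List<Maindata> getAllData(Map<String, String> map);

    // Records to load when the local table has no data yet
    List<Maindata> getAllDataWithMineIsNull(Map<String, String> map);
}

The mine-side service would mirror this with maxMineDatetime() and insertData(Map<String, Object> row), delegating to the mappers registered for com.starcpdk.mapper.mine.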

Finally, we write one more configuration class of our own, and that is where the scheduled task goes. Here the scheduling is done with Spring Boot's built-in task scheduling.

package com.starcpdk.config;

import com.baomidou.mybatisplus.extension.plugins.PerformanceInterceptor;
import com.starcpdk.pojo.Maindata;
import com.starcpdk.service.mine.MaindataMineService;
import com.starcpdk.service.remote.MaindataRemoteService;
import com.starcpdk.util.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;

@Configuration
@EnableScheduling   // 2. enable scheduled task support
@Slf4j
public class MyConfig {

    @Resource
    MaindataMineService maindataMineService;

    @Resource
    MaindataRemoteService maindataRemoteService;


    @Bean
    @Profile({"dev", "test"})// 设置 dev test 环境开启
    public PerformanceInterceptor performanceInterceptor() {
        PerformanceInterceptor performanceInterceptor = new PerformanceInterceptor();
        performanceInterceptor.setMaxTime(100); // in ms; SQL exceeding this limit is aborted
        performanceInterceptor.setFormat(true);
        return performanceInterceptor;
    }

    // 3. the scheduled task itself
    @Scheduled(cron = "0 27 9 * * ?")
    private void insertData() {
        log.info("定时任务开始时间: " + LocalDateTime.now());
        log.info("mineDateTimeMax" + maindataMineService.maxMineDatetime());
        log.info("remoteDateTimeMax" + maindataRemoteService.maxRemoteDatetime());

        String mineDatetime = maindataMineService.maxMineDatetime();
        String remoteDatetime = maindataRemoteService.maxRemoteDatetime();
        HashMap<String, String> map = new HashMap<>();
        map.put("mineDatetime", mineDatetime);
        map.put("remoteDatetime", remoteDatetime);


        if (map.get("mineDatetime") != null){
            List<Maindata> list = maindataRemoteService.getAllData(map);
            log.info("list:", list);
            for (Maindata maindata : list) {
                HashMap<String, Object> insertMap = new HashMap<>();
                insertMap.put("mdId", maindata.getMdId());
                insertMap.put("sid", maindata.getSid());
                insertMap.put("sId", "");
                insertMap.put("stationId", maindata.getStationId());
                insertMap.put("mdValue", maindata.getMdValue());
                insertMap.put("mdDatetime", maindata.getMdDatetime());
                insertMap.put("mdSn", maindata.getMdSn());
                maindataMineService.insertData(insertMap);
            }
        }else {
            List<Maindata> list = maindataRemoteService.getAllDataWithMineIsNull(map);

            log.info("list:", list);
            
            for (Maindata maindata : list) {
            	HashMap<String, Object> insertMap = new HashMap<>();
                insertMap.put("mdId", maindata.getMdId());
                insertMap.put("sid", maindata.getSid());
                insertMap.put("sId", "");
                insertMap.put("stationId", maindata.getStationId());
                insertMap.put("mdValue", maindata.getMdValue());
                insertMap.put("mdDatetime", maindata.getMdDatetime());
                insertMap.put("mdSn", maindata.getMdSn());
                maindataMineService.insertData(insertMap);
            }
        }

        log.info("定时任务结束时间: " + LocalDateTime.now());
    }

}

The code I originally wrote in this scheduled task caused an out-of-memory error.
My first thought on seeing the heap overflow was that the HashMap was being created inside the loop, so a very large number of iterations allocates a very large number of maps. Since the keys are the same on every iteration and the values simply get overwritten, we can move the HashMap creation outside the loop and keep a single map instance for the whole method.

However, it still ran out of memory, so we turned our attention to the List.
The query returned more than three million rows, which means more than three million objects held in the list at once, and that alone is enough to exhaust the heap.

So the only real fix was to restructure the code.
I use time as the limiting condition: each pass queries two days' worth of data and synchronizes it, and only then does the next query window get processed.

package com.starcpdk.config;

import com.baomidou.mybatisplus.extension.plugins.PerformanceInterceptor;
import com.starcpdk.pojo.Maindata;
import com.starcpdk.service.mine.MaindataMineService;
import com.starcpdk.service.remote.MaindataRemoteService;
import com.starcpdk.util.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;

@Configuration
@EnableScheduling   // 2. enable scheduled task support
@Slf4j
public class MyConfig {

    @Resource
    MaindataMineService maindataMineService;

    @Resource
    MaindataRemoteService maindataRemoteService;


    @Bean
    @Profile({"dev", "test"})// 设置 dev test 环境开启
    public PerformanceInterceptor performanceInterceptor() {
        PerformanceInterceptor performanceInterceptor = new PerformanceInterceptor();
        performanceInterceptor.setMaxTime(100); // in ms; SQL exceeding this limit is aborted
        performanceInterceptor.setFormat(true);
        return performanceInterceptor;
    }

    // 3. the scheduled task itself
    @Scheduled(cron = "0 27 9 * * ?")
    private void insertData() {
        log.info("定时任务开始时间: " + LocalDateTime.now());
        log.info("mineDateTimeMax" + maindataMineService.maxMineDatetime());
        log.info("remoteDateTimeMax" + maindataRemoteService.maxRemoteDatetime());

        String mineDatetime = maindataMineService.maxMineDatetime();
        String remoteDatetime = maindataRemoteService.maxRemoteDatetime();
        HashMap<String, String> map = new HashMap<>();
        map.put("mineDatetime", mineDatetime);
        map.put("remoteDatetime", remoteDatetime);


        if (map.get("mineDatetime") != null) {
            List<Maindata> list = maindataRemoteService.getAllData(map);
            // log.info("list:", list);
            HashMap<String, Object> insertMap = new HashMap<>();
            for (Maindata maindata : list) {
                insertMap.put("mdId", maindata.getMdId());
                insertMap.put("sid", maindata.getSid());
                insertMap.put("sId", "");
                insertMap.put("stationId", maindata.getStationId());
                insertMap.put("mdValue", maindata.getMdValue());
                insertMap.put("mdDatetime", maindata.getMdDatetime());
                insertMap.put("mdSn", maindata.getMdSn());
                maindataMineService.insertData(insertMap);
            }
        } else {

            String maxMineDateTime = maindataMineService.maxMineDatetime();
            String maxRemoteDatetime = maindataRemoteService.maxRemoteDatetime();

            // Compare by value -- != compares String references and was a bug in the original code
            while (!Objects.equals(maxMineDateTime, maxRemoteDatetime)){
                if (maxMineDateTime == null || "".equals(maxMineDateTime)){
                    map.put("remoteDatetime", maindataRemoteService.minRemoteDatetime());

                    List<Maindata> list = maindataRemoteService.getAllDataWithMineIsNull(map);
                    // log.info("list:", list);
                    HashMap<String, Object> insertMap = new HashMap<>();

                    for (Maindata maindata : list) {
                        insertMap.put("mdId", maindata.getMdId());
                        insertMap.put("sid", maindata.getSid());
                        insertMap.put("sId", "");
                        insertMap.put("stationId", maindata.getStationId());
                        insertMap.put("mdValue", maindata.getMdValue());
                        insertMap.put("mdDatetime", maindata.getMdDatetime());
                        insertMap.put("mdSn", maindata.getMdSn());
                        maindataMineService.insertData(insertMap);
                    }
                    // Refresh the local max after the initial load so the windowed query below starts from real data
                    maxMineDateTime = maindataMineService.maxMineDatetime();
                }
                map.put("mineDatetime", maxMineDateTime);
                map.put("remoteDatetime", DateUtils.getNextDay(maxMineDateTime, "2"));
                List<Maindata> list = maindataRemoteService.getAllData(map);
                HashMap<String, Object> insertMap = new HashMap<>();

                for (Maindata maindata : list) {
                    insertMap.put("mdId", maindata.getMdId());
                    insertMap.put("sid", maindata.getSid());
                    insertMap.put("sId", "");
                    insertMap.put("stationId", maindata.getStationId());
                    insertMap.put("mdValue", maindata.getMdValue());
                    insertMap.put("mdDatetime", maindata.getMdDatetime());
                    insertMap.put("mdSn", maindata.getMdSn());
                    maindataMineService.insertData(insertMap);
                }
                // Advance the window; re-reading the local max assumes each two-day batch actually contains data
                maxMineDateTime = maindataMineService.maxMineDatetime();
            }
        }

        log.info("定时任务结束时间: " + LocalDateTime.now());
    }

}
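
One piece that is referenced above but never shown is DateUtils.getNextDay. Below is a minimal sketch of what it might look like, assuming the datetime strings use the yyyy-MM-dd HH:mm:ss format and that the second argument is the number of days to add (both assumptions, since the real utility class is not included).

package com.starcpdk.util;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch of DateUtils -- the real implementation is not shown in this article.
// Assumes "yyyy-MM-dd HH:mm:ss" datetime strings and that the second argument is a day count.
public class DateUtils {

    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Returns the given datetime shifted forward by `days` days,
    // e.g. getNextDay("2022-01-01 00:00:00", "2") -> "2022-01-03 00:00:00"
    public static String getNextDay(String datetime, String days) {
        LocalDateTime base = LocalDateTime.parse(datetime, FMT);
        return base.plusDays(Long.parseLong(days)).format(FMT);
    }
}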

The code above can still be improved: instead of inserting row by row, we can import the data in batches, which is noticeably faster; a sketch of that idea follows below.
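
As a rough illustration of that batching idea, the per-row insert loop could be replaced by a helper like the one below. The BatchSink callback stands in for a batch insert method such as a hypothetical insertDataBatch backed by a <foreach> INSERT in the mapper XML; no such method exists in the code above, so this is a sketch rather than the article's implementation.

import java.util.ArrayList;
import java.util.List;

public final class BatchInsertSketch {

    private static final int BATCH_SIZE = 1000; // chunk size; tune as needed

    // Functional stand-in for a batch mapper/service method, e.g. a hypothetical insertDataBatch(List<T>)
    public interface BatchSink<T> {
        void insertBatch(List<T> batch);
    }

    // Splits the source rows into fixed-size chunks and hands each chunk to the sink,
    // so each database round trip carries BATCH_SIZE rows instead of one
    public static <T> void insertInBatches(List<T> rows, BatchSink<T> sink) {
        List<T> buffer = new ArrayList<>(BATCH_SIZE);
        for (T row : rows) {
            buffer.add(row);
            if (buffer.size() == BATCH_SIZE) {
                sink.insertBatch(buffer);             // one round trip per chunk
                buffer = new ArrayList<>(BATCH_SIZE); // start a fresh chunk
            }
        }
        if (!buffer.isEmpty()) {
            sink.insertBatch(buffer);                 // flush the remainder
        }
    }
}

In the scheduled task the loop body would then shrink to insertInBatches(list, rows -> maindataMineService.insertDataBatch(rows)), assuming such a batch method is added; alternatively, building the SqlSessionTemplate with ExecutorType.BATCH gives a similar speed-up without changing the mapper interface.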

That wraps up this article on writing static multi-datasource code and using a scheduled task to synchronize database data. I hope it helps, and thank you for supporting 为之网!