首先我们在SpringBoot的配置文件中将两个数据源配置出来
server:
  port: 8083

spring:
  datasource:
    # Remote (source) database — read side of the sync job.
    remote:
      driver-class-name: com.mysql.cj.jdbc.Driver
      jdbc-url: jdbc:mysql://21.33.322.22/greer?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC
      username: 333
      password: 22@2
    # Local (target) database — write side of the sync job.
    mine:
      driver-class-name: com.mysql.cj.jdbc.Driver
      jdbc-url: jdbc:mysql://23.33.212.22/ferer?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC
      username: w22
      password: 222
  profiles:
    active: dev

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # enable SQL logging to stdout

logging:
  level:
    com.starcpdk.mapper: debug
然后我们可以先从mapper层开始写起 , 下面是我的mapper层结构
下面我们来写配置类进行数据源的配置
首先我们准备一个配置类读取配置文件中的数据库信息,并创建数据源
package com.starcpdk.config; import com.zaxxer.hikari.HikariDataSource; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.jdbc.DataSourceBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import javax.sql.DataSource; @Configuration public class DataSourceConfig { // 表示这个数据源是默认数据源 @Primary // 将这个对象放入spring容器中(交给Spring管理) @Bean(name="remoteDataSource") // 读取 application.yml 中的配置参数映射成为一个对象 @ConfigurationProperties(prefix = "spring.datasource.remote") public DataSource getDataSource1(){ // 创建一个数据源 return DataSourceBuilder.create().type(HikariDataSource.class).build(); } @Primary @Bean(name="mineDataSource") @ConfigurationProperties(prefix = "spring.datasource.mine") public DataSource getDataSource2(){ return DataSourceBuilder.create().type(HikariDataSource.class).build(); } }
接着我们需要针对这两个数据源进行mapper文件的映射
package com.starcpdk.config; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.SqlSessionTemplate; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import javax.sql.DataSource; /** * 数据源Config2 */ @Configuration @MapperScan(basePackages = {"com.starcpdk.mapper.mine"}, sqlSessionFactoryRef = "mineSqlSessionFactory") public class MybatisMineConfig { @Autowired @Qualifier("mineDataSource") private DataSource dataSource; /** * 创建 SqlSessionFactory * @return * @throws Exception */ @Bean(name="mineSqlSessionFactory") // @Primary public SqlSessionFactory mineSqlSessionFactory() throws Exception{ SqlSessionFactoryBean bean = new SqlSessionFactoryBean(); bean.setDataSource(dataSource); // 设置mybatis的xml所在位置 bean.setMapperLocations(new PathMatchingResourcePatternResolver(). getResources("classpath*:com/starcpdk/mapper/mine/xml/*.xml")); return bean.getObject(); } /** * 通过 SqlSessionFactory 来创建 SqlSessionTemplate * @param sqlSessionFactory * @return */ @Bean(name="mineSqlSessionTemplate") // @Primary public SqlSessionTemplate mineSqlSessionTemplate(@Qualifier("mineSqlSessionFactory") SqlSessionFactory sqlSessionFactory){ // SqlSessionTemplate是线程安全的,可以被多个DAO所共享使用 return new SqlSessionTemplate(sqlSessionFactory); } }
package com.starcpdk.config; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.SqlSessionTemplate; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import javax.sql.DataSource; /** * 数据源Config1 */ @Configuration @MapperScan(basePackages = {"com.starcpdk.mapper.remote"}, sqlSessionFactoryRef = "remoteSqlSessionFactory") public class MybatisRemoteConfig { @Autowired @Qualifier("remoteDataSource") private DataSource dataSource; /** * 创建 SqlSessionFactory * @return * @throws Exception */ @Bean(name="remoteSqlSessionFactory") @Primary // @Qualifier表示查找Spring容器中名字为 preDataSource 的对象 public SqlSessionFactory remoteSqlSessionFactory() throws Exception{ // 用来创建 SqlSessionFactory 等同于下面配置 // <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"> // <property name="dataSource" ref="dataSource" /> // <property name="mapperLocations" value="classpath:mybatis-mapper/*.xml"/> // </bean> SqlSessionFactoryBean bean = new SqlSessionFactoryBean(); bean.setDataSource(dataSource); // 设置mybatis的xml所在位置(扫描mybatis的相关xml文件,装配到容器中) bean.setMapperLocations(new PathMatchingResourcePatternResolver(). 
getResources("classpath*:com/starcpdk/mapper/remote/xml/*.xml")); return bean.getObject(); } /** * 通过 SqlSessionFactory 来创建 SqlSessionTemplate * @param sqlSessionFactory * @return */ @Bean(name="remoteSqlSessionTemplate") @Primary public SqlSessionTemplate remoteSqlSessionTemplate(@Qualifier("remoteSqlSessionFactory") SqlSessionFactory sqlSessionFactory){ // SqlSessionTemplate是线程安全的,可以被多个DAO所共享使用 return new SqlSessionTemplate(sqlSessionFactory); } }
最后我们再写一个自己的config配置类就可以了 , 在这个配置类中我们可以写定时任务 , 这里我写的定时任务是靠springboot自带的定时任务实现的
package com.starcpdk.config; import com.baomidou.mybatisplus.extension.plugins.PerformanceInterceptor; import com.starcpdk.pojo.Maindata; import com.starcpdk.service.mine.MaindataMineService; import com.starcpdk.service.remote.MaindataRemoteService; import com.starcpdk.util.DateUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import javax.annotation.Resource; import java.time.LocalDateTime; import java.util.HashMap; import java.util.List; @Configuration @EnableScheduling // 2.开启定时任务 @Slf4j public class MyConfig { @Resource MaindataMineService maindataMineService; @Resource MaindataRemoteService maindataRemoteService; @Bean @Profile({"dev", "test"})// 设置 dev test 环境开启 public PerformanceInterceptor performanceInterceptor() { PerformanceInterceptor performanceInterceptor = new PerformanceInterceptor(); performanceInterceptor.setMaxTime(100);//ms,超过此处设置的ms则sql不执行 performanceInterceptor.setFormat(true); return performanceInterceptor; } //3.添加定时任务 @Scheduled(cron = "0 27 9 * * ?") private void insertData() { log.info("定时任务开始时间: " + LocalDateTime.now()); log.info("mineDateTimeMax" + maindataMineService.maxMineDatetime()); log.info("remoteDateTimeMax" + maindataRemoteService.maxRemoteDatetime()); String mineDatetime = maindataMineService.maxMineDatetime(); String remoteDatetime = maindataRemoteService.maxRemoteDatetime(); HashMap<String, String> map = new HashMap<>(); map.put("mineDatetime", mineDatetime); map.put("remoteDatetime", remoteDatetime); if (map.get("mineDatetime") != null){ List<Maindata> list = maindataRemoteService.getAllData(map); log.info("list:", list); for (Maindata maindata : list) { HashMap<String, Object> insertMap = new HashMap<>(); insertMap.put("mdId", 
maindata.getMdId()); insertMap.put("sid", maindata.getSid()); insertMap.put("sId", ""); insertMap.put("stationId", maindata.getStationId()); insertMap.put("mdValue", maindata.getMdValue()); insertMap.put("mdDatetime", maindata.getMdDatetime()); insertMap.put("mdSn", maindata.getMdSn()); maindataMineService.insertData(insertMap); } }else { List<Maindata> list = maindataRemoteService.getAllDataWithMineIsNull(map); log.info("list:", list); for (Maindata maindata : list) { HashMap<String, Object> insertMap = new HashMap<>(); insertMap.put("mdId", maindata.getMdId()); insertMap.put("sid", maindata.getSid()); insertMap.put("sId", ""); insertMap.put("stationId", maindata.getStationId()); insertMap.put("mdValue", maindata.getMdValue()); insertMap.put("mdDatetime", maindata.getMdDatetime()); insertMap.put("mdSn", maindata.getMdSn()); maindataMineService.insertData(insertMap); } } log.info("定时任务结束时间: " + LocalDateTime.now()); } }
在定时任务中我写的代码会导致内存溢出
首先看到堆内存溢出的问题时 , 我考虑到HashMap的new过程放在了循环里面 , 大量循环会创建大量对象从而导致内存溢出 ; 又因为map对象键相同 , 值会互相覆盖 , 所以我们可以将HashMap的new对象过程放到循环外面 , 这样整个程序中就只会存在一个map对象了
但是 , 我们发现它还是会内存溢出 , 于是我们就定位到了list集合
我们查询数据库得到的list数据集合中有三百多万条数据 , 也就是说有三百多万个对象存在list集合中 , 这样同样会导致内存溢出
所以我只能通过优化代码实现
我以时间作为限制条件 , 让他每次查询两天的数据进行同步 , 同步完之后再进行下一天数据查询
package com.starcpdk.config; import com.baomidou.mybatisplus.extension.plugins.PerformanceInterceptor; import com.starcpdk.pojo.Maindata; import com.starcpdk.service.mine.MaindataMineService; import com.starcpdk.service.remote.MaindataRemoteService; import com.starcpdk.util.DateUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import javax.annotation.Resource; import java.time.LocalDateTime; import java.util.HashMap; import java.util.List; @Configuration @EnableScheduling // 2.开启定时任务 @Slf4j public class MyConfig { @Resource MaindataMineService maindataMineService; @Resource MaindataRemoteService maindataRemoteService; @Bean @Profile({"dev", "test"})// 设置 dev test 环境开启 public PerformanceInterceptor performanceInterceptor() { PerformanceInterceptor performanceInterceptor = new PerformanceInterceptor(); performanceInterceptor.setMaxTime(100);//ms,超过此处设置的ms则sql不执行 performanceInterceptor.setFormat(true); return performanceInterceptor; } //3.添加定时任务 @Scheduled(cron = "0 27 9 * * ?") private void insertData() { log.info("定时任务开始时间: " + LocalDateTime.now()); log.info("mineDateTimeMax" + maindataMineService.maxMineDatetime()); log.info("remoteDateTimeMax" + maindataRemoteService.maxRemoteDatetime()); String mineDatetime = maindataMineService.maxMineDatetime(); String remoteDatetime = maindataRemoteService.maxRemoteDatetime(); HashMap<String, String> map = new HashMap<>(); map.put("mineDatetime", mineDatetime); map.put("remoteDatetime", remoteDatetime); if (map.get("mineDatetime") != null) { List<Maindata> list = maindataRemoteService.getAllData(map); // log.info("list:", list); HashMap<String, Object> insertMap = new HashMap<>(); for (Maindata maindata : list) { insertMap.put("mdId", 
maindata.getMdId()); insertMap.put("sid", maindata.getSid()); insertMap.put("sId", ""); insertMap.put("stationId", maindata.getStationId()); insertMap.put("mdValue", maindata.getMdValue()); insertMap.put("mdDatetime", maindata.getMdDatetime()); insertMap.put("mdSn", maindata.getMdSn()); maindataMineService.insertData(insertMap); } } else { String maxMineDateTime = maindataMineService.maxMineDatetime(); String maxRemoteDatetime = maindataRemoteService.maxRemoteDatetime(); while (maxMineDateTime != maxRemoteDatetime){ if (maxMineDateTime == null || "".equals(maxMineDateTime)){ map.put("remoteDatetime", maindataRemoteService.minRemoteDatetime()); List<Maindata> list = maindataRemoteService.getAllDataWithMineIsNull(map); // log.info("list:", list); HashMap<String, Object> insertMap = new HashMap<>(); for (Maindata maindata : list) { insertMap.put("mdId", maindata.getMdId()); insertMap.put("sid", maindata.getSid()); insertMap.put("sId", ""); insertMap.put("stationId", maindata.getStationId()); insertMap.put("mdValue", maindata.getMdValue()); insertMap.put("mdDatetime", maindata.getMdDatetime()); insertMap.put("mdSn", maindata.getMdSn()); maindataMineService.insertData(insertMap); } } map.put("mineDatetime", maxMineDateTime); map.put("remoteDatetime", DateUtils.getNextDay(maxMineDateTime, "2")); List<Maindata> list = maindataRemoteService.getAllData(map); HashMap<String, Object> insertMap = new HashMap<>(); for (Maindata maindata : list) { insertMap.put("mdId", maindata.getMdId()); insertMap.put("sid", maindata.getSid()); insertMap.put("sId", ""); insertMap.put("stationId", maindata.getStationId()); insertMap.put("mdValue", maindata.getMdValue()); insertMap.put("mdDatetime", maindata.getMdDatetime()); insertMap.put("mdSn", maindata.getMdSn()); maindataMineService.insertData(insertMap); } } } log.info("定时任务结束时间: " + LocalDateTime.now()); } }
如上代码依旧可以优化 , 我们可以采用批量数据导入的方式进行 , 这样的话我们分批进行批量数据导入效率会更高~