MySql教程

多线程导入数据到mysql 一

本文主要介绍如何使用多线程将数据导入 MySQL(第一部分),对大家解决编程问题具有一定的参考价值,需要的程序员们跟着小编一起来学习吧!

导入入口controller

import com.hanxiaozhang.dictonecode.domain.TestDO;
import com.hanxiaozhang.dictonecode.service.TestService;
import com.hanxiaozhang.utils.EntityListToExcelUtil;
import com.hanxiaozhang.utils.JsonUtil;
import com.hanxiaozhang.utils.R;
import com.hanxiaozhang.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;

/**
 * Web entry point of the Excel import demo.
 *
 * <p>Serves the upload page, hands an uploaded workbook to {@link TestService}, and streams
 * rejected rows back to the browser as an {@code .xlsx} download.
 */
@Slf4j
@Controller
@RequestMapping()
public class TestController {

    @Autowired
    private TestService testService;

    /**
     * Renders the upload page.
     *
     * @return the view name {@code importExcel}
     */
    @GetMapping
    public String excelTest() {
        return "importExcel";
    }

    /**
     * Imports an uploaded Excel file.
     *
     * <p>The start timestamp doubles as a log id so that the log lines of one import can be
     * correlated; when the service reports row-level errors (code {@code 1}) the id is added to
     * the error payload before it is returned.
     *
     * @param file the uploaded workbook
     * @return the service result, or an error result when the file is empty or rows were rejected
     */
    @ResponseBody
    @PostMapping("/importExcel")
    public R importExcel(@RequestParam(value = "file") MultipartFile file) {
        if (file == null || StringUtil.isBlank(file.getOriginalFilename()) || file.getSize() == 0) {
            return R.error(1, "文件不能为空");
        }

        long startTime = System.currentTimeMillis();
        log.info("Excel开始导入,logId:[{}]", startTime);
        R r = testService.importExcel(file);

        // String.valueOf tolerates a missing "code" entry instead of throwing an NPE.
        if ("1".equals(String.valueOf(r.get("code")))) {
            @SuppressWarnings("unchecked")
            Map<String, Object> map = (Map<String, Object>) r.get("map");
            if (map == null) {
                // An error result without a payload must not crash the request.
                map = new java.util.HashMap<>();
            }
            map.put("logId", startTime);
            log.info("Excel导入出错,logId:[{}]", startTime);
            return R.error(1, map, "导入时有错误信息");
        }
        long endTime = System.currentTimeMillis();
        log.info("Excel导入成功,logId:[{}],导入Excel耗时(ms):[{}]", startTime, endTime - startTime);
        return r;
    }

    /**
     * Writes the rejected rows of a previous import to the response as an Excel workbook.
     *
     * @param data     JSON string; the keys {@code fileName}, {@code title} and {@code errorData}
     *                 are required, {@code logId} is only logged
     * @param response servlet response the workbook is written to
     * @throws IllegalArgumentException if {@code data} cannot be parsed or a required key is missing
     */
    @ResponseBody
    @PostMapping("/exportExcel")
    public void exportExcel(@RequestParam("data") String data, HttpServletResponse response) throws IOException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
        Map<String, Object> params = JsonUtil.jsonToMapSO(data);
        if (params == null) {
            throw new IllegalArgumentException("export request parameter 'data' could not be parsed");
        }
        // The placeholder formats the value itself, so a missing logId no longer throws.
        log.info("Excel导出错误信息,logId:[{}]", params.get("logId"));

        // Read every required parameter before touching the response headers.
        String fileName = requireParam(params, "fileName");
        String title = requireParam(params, "title");
        String errorData = requireParam(params, "errorData");

        setDownloadExcelResponse(response, fileName);
        EntityListToExcelUtil.getInstance().
                executeXLSX(JsonUtil.jsonToLinkedHashMapSS(title),
                        JsonUtil.jsonToList(errorData, TestDO.class),
                        response.getOutputStream());
    }

    /**
     * Returns the string form of a mandatory export parameter.
     */
    private static String requireParam(Map<String, Object> params, String key) {
        Object value = params.get(key);
        if (value == null) {
            throw new IllegalArgumentException("missing export parameter: " + key);
        }
        return value.toString();
    }

    /**
     * Sets the response headers for an Excel file download.
     *
     * @param response servlet response to configure
     * @param fileName download file name offered to the browser
     */
    private void setDownloadExcelResponse(HttpServletResponse response, String fileName) {
        // The name comes from the client: strip line breaks so it cannot inject extra headers.
        String safeName = fileName.replaceAll("[\\r\\n]", "");
        // Encode with an explicit charset; the platform default made the header bytes depend on
        // the server's locale.
        String headerName = new String(
                safeName.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                java.nio.charset.StandardCharsets.ISO_8859_1);

        response.setContentType("application/vnd.ms-excel;charset=UTF-8");
        response.setHeader("Content-Disposition", "attachment;filename=" + headerName);
        // NOTE(review): the original comment warned that caching must NOT be disabled when using
        // Content-Disposition, yet these headers disable it. Kept as-is; confirm the intent.
        response.setHeader("Pragma", "No-cache");
        response.setHeader("Cache-Control", "No-cache");
        response.setDateHeader("Expires", 0);
    }

}

导入模板

/**
 * 〈一句话功能简述〉<br>
 * 〈数据字典数据导入〉
 */
@Slf4j
public abstract class SaveExcelTemplateHandle<T> implements SaveExcelService<T> {

    @Override
    @Transactional(rollbackFor = Exception.class)
    public ErrorInfoEntity batchSave(List<T> list, MultiThreadEndFlag flag) throws Exception {
        int resultFlag = 0;
        try {
            log.info("batchSave(),当前线程名称:[{}]", Thread.currentThread().getName());
            //创建返回错误信息实体
            ErrorInfoEntity errorInfoEntity = new ErrorInfoEntity();
            //业务操作
            List<T> errorList = handle(list);
            //赋值错误数据
            errorInfoEntity.setErrorList(errorList);
            //操作成功
            resultFlag = 1;
            //等待其他线程完成操作
            flag.waitForEnd(resultFlag);
            //其他线程异常手工回滚
            if (resultFlag == 1 && !flag.allSuccessFlag()) {
                String message = "子线程未全部执行成功,对线程[" + Thread.currentThread().getName() + "]进行回滚";
                log.info(message);
                throw new Exception(message);
            }
            return errorInfoEntity;
        } catch (Exception e) {
            log.info("batchSave() error,当前线程名称:[{}]", Thread.currentThread().getName());
            log.error(e.toString());
            throw e;
        } finally {
            //本身线程异常抛出异常,并且没有调用flag.waitForEnd()时触发
            if (resultFlag == 0) {
                flag.waitForEnd(resultFlag);
            }
        }
    }

    protected abstract List<T> handle(List<T> list);


}
import com.hanxiaozhang.sourcecode.executor.MyThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

import static java.util.concurrent.Executors.*;

/**
 * Splits an imported row list into batches and saves the batches in parallel.
 *
 * <p>All batches share one {@link MultiThreadEndFlag}; the calling thread releases the flag
 * once every batch has reported its outcome.
 */
@Slf4j
public class ImportExcelExecutor {

    /** Upper bound for the number of batches, and therefore of worker threads. */
    private static final int MAX_THREAD = 10;

    /**
     * Saves {@code lists} in parallel batches.
     *
     * @param saveService service that persists one batch
     * @param lists       rows to import
     * @param groupLen    desired rows per batch; enlarged when it would need more than
     *                    {@code MAX_THREAD} batches, and a non-positive value spreads the rows
     *                    over at most {@code MAX_THREAD} batches
     * @return the rows rejected by the batches, or {@code null} when {@code lists} is null or empty
     * @throws ExecutionException   if a batch failed
     * @throws InterruptedException if the calling thread is interrupted while collecting results
     */
    public static <T> List<T> execute(SaveExcelService<T> saveService, List<T> lists, int groupLen) throws ExecutionException, InterruptedException {

        if (lists == null || lists.size() == 0) {
            return null;
        }

        int size = lists.size();

        // Enlarge the batch size when the requested one needs more than MAX_THREAD batches.
        int batchSize = groupLen;
        if (batchSize <= 0 || ceilDiv(size, batchSize) > MAX_THREAD) {
            batchSize = ceilDiv(size, MAX_THREAD);
        }
        // Derive the batch count from the final batch size. Rounding the size up can need
        // fewer than MAX_THREAD batches; counting a batch that never runs would make
        // flag.end() wait forever.
        int batches = ceilDiv(size, batchSize);
        log.info("总条数:[{}],批次数量:[{}],每批数据量:[{}]", size, batches, batchSize);

        List<T> errorList = new ArrayList<>();
        MultiThreadEndFlag flag = new MultiThreadEndFlag(batches);
        List<Future<ErrorInfoEntity>> futures = new ArrayList<>(batches);

        ExecutorService executorService = new MyThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 0, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(), defaultThreadFactory(),
                new MyThreadPoolExecutor.MyAbortPolicy());
        try {
            for (int i = 0; i < batches; i++) {
                int startIndex = i * batchSize;
                int toIndex = Math.min(size, startIndex + batchSize);
                List<T> batch = lists.subList(startIndex, toIndex);
                futures.add(executorService.submit(new ImportExcelTask<T>(saveService, batch, flag)));
            }
            // Blocks until every batch has reported, then releases them all.
            flag.end();

            // A failed batch surfaces here as an ExecutionException.
            for (Future<ErrorInfoEntity> future : futures) {
                ErrorInfoEntity result = future.get();
                if (result != null && result.getErrorList() != null) {
                    errorList.addAll(result.getErrorList());
                }
            }
        } finally {
            // Shut down on the failure path too, otherwise the core threads are leaked.
            executorService.shutdown();
        }

        return errorList;
    }

    /**
     * Integer division rounding up; the arguments are expected to be positive.
     */
    private static int ceilDiv(int dividend, int divisor) {
        return (int) (((long) dividend + divisor - 1) / divisor);
    }

    /**
     * Prints the pool's counters to standard output (debugging aid).
     *
     * @param tpe pool to inspect
     * @return number of tasks currently queued
     */
    private static int getPoolInfo(ThreadPoolExecutor tpe) {

        int queueSize = tpe.getQueue().size();
        System.out.println("当前排队线程数:" + queueSize);

        int activeCount = tpe.getActiveCount();
        System.out.println("当前活动线程数:" + activeCount);

        long completedTaskCount = tpe.getCompletedTaskCount();
        System.out.println("执行完成线程数:" + completedTaskCount);

        long taskCount = tpe.getTaskCount();
        System.out.println("总线程数:" + taskCount);

        // Current number of threads in the pool; never exceeds the maximum pool size.
        System.out.println("当前线程的数量:" + tpe.getPoolSize());

        // Core pool size; fewer threads exist while fewer tasks than this have been submitted.
        System.out.println("线程池的初始线程数量:" + tpe.getCorePoolSize());

        // Maximum number of threads the pool may create.
        System.out.println("线程池可允许最大的线程数" + tpe.getMaximumPoolSize());

        return queueSize;
    }

}
import java.util.List;
import java.util.concurrent.Callable;

/**
 * Callable that saves one batch of imported rows through a {@link SaveExcelService}.
 *
 * @param <T> row type
 */
public class ImportExcelTask<T> implements Callable<ErrorInfoEntity> {

    /**
     * Service that persists the batch.
     */
    private final SaveExcelService<T> excelService;

    /**
     * Rows of this batch.
     */
    private final List<T> list;

    /**
     * Coordination object shared with the other batches of the same import.
     */
    private final MultiThreadEndFlag flag;

    /**
     * Creates a task for one batch.
     *
     * @param excelService service that persists the batch
     * @param list         rows of this batch
     * @param flag         coordination object shared by all batches
     */
    public ImportExcelTask(SaveExcelService<T> excelService, List<T> list, MultiThreadEndFlag flag) {
        this.excelService = excelService;
        this.list = list;
        this.flag = flag;
    }

    /**
     * Delegates to {@link SaveExcelService#batchSave}.
     *
     * @return the result holder produced by the service
     * @throws Exception whatever the service throws
     */
    @Override
    public ErrorInfoEntity call() throws Exception {
        return excelService.batchSave(list, flag);
    }

}
import lombok.extern.slf4j.Slf4j;
/**
 * Rendezvous point for the worker threads of one multi-threaded import.
 *
 * <p>Each worker calls {@link #waitForEnd(int)} exactly once to report its outcome and then
 * blocks. The coordinating thread calls {@link #end()}, which waits until every worker has
 * reported and then releases them all; afterwards {@link #allSuccessFlag()} tells whether any
 * worker reported a failure. All state changes happen while holding this object's monitor.
 */
@Slf4j
public class MultiThreadEndFlag {

    /**
     * Set once the workers may stop waiting.
     */
    private volatile boolean releaseWaitFlag = false;

    /**
     * Whether every worker reported success; meaningful only after {@link #go()}.
     */
    private volatile boolean allSuccessFlag = false;

    /**
     * Number of workers that have not reported yet.
     */
    private volatile int threadCount = 0;

    /**
     * Number of workers that reported a failure.
     */
    private volatile int failCount = 0;

    /**
     * Creates a flag for the given number of workers.
     *
     * @param count total number of workers expected to call {@link #waitForEnd(int)}
     */
    public MultiThreadEndFlag(int count) {
        threadCount = count;
    }

    /**
     * @return {@code true} if the flag has been released and no worker reported a failure
     */
    public boolean allSuccessFlag() {
        return allSuccessFlag;
    }

    /**
     * Reports this worker's outcome and blocks until the flag is released.
     *
     * <p>An interrupt does not end the wait, because leaving early would break the
     * all-or-nothing protocol. The interrupt status is restored before returning.
     *
     * @param resultFlag {@code 0} for failure, any other value for success
     */
    public synchronized void waitForEnd(int resultFlag) {
        if (resultFlag == 0) {
            failCount++;
        }
        threadCount--;
        // Wake the coordinator blocked in end(). Workers woken as well re-check the release
        // flag and go back to waiting.
        notifyAll();

        boolean interrupted = false;
        while (!releaseWaitFlag) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true;
                log.warn("waitForEnd() interrupted, still waiting for release, thread:[{}]",
                        Thread.currentThread().getName());
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Releases all waiting workers and publishes the overall result.
     */
    public synchronized void go() {
        releaseWaitFlag = true;
        allSuccessFlag = (failCount == 0);
        notifyAll();
    }

    /**
     * Blocks until every worker has reported, then releases them via {@link #go()}.
     *
     * <p>An interrupt does not end the wait; the interrupt status is restored before returning.
     */
    public void end() {
        boolean interrupted = false;
        synchronized (this) {
            while (threadCount > 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                    log.warn("end() interrupted, still waiting for [{}] worker(s)", threadCount);
                }
            }
        }
        log.info("线程全部执行完毕通知");
        go();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

}
import java.util.List;


/**
 * Contract for persisting one batch of imported Excel rows.
 *
 * @param <T> row type
 */

public interface SaveExcelService<T> {


    /**
     * Saves one batch of rows.
     *
     * @param list rows belonging to this batch
     * @param flag coordination object passed along with the batch; how it is used is up to
     *             the implementation
     * @return result holder for this batch (an {@code ErrorInfoEntity})
     * @throws Exception if the batch cannot be saved
     */
    ErrorInfoEntity batchSave(List<T> list, MultiThreadEndFlag flag) throws Exception;

}
@Slf4j
@Service
public class TestSaveExcelServiceImpl extends SaveExcelTemplateHandle<TestDO> {


    @Resource
    private TestDao testDao;

    /**
     * 处理相关数据(抛异常则全部回滚,否则则返回前端未上传列表)
     * 业务类
     * @param list
     * @return
     */
    protected List<TestDO> handle(List<TestDO> list) {
        List<TestDO> errorList = new ArrayList<>();
        list.forEach(x -> {
            boolean flag=true;
            List<String> errorMsg=new ArrayList<>();
            //模拟一个业务数据错误,姓名不能为空
            if (StringUtil.isBlank(x.getName())) {
                errorMsg.add("姓名不能为空!");
                flag=false;
                throw new RuntimeException();
            }
            //模拟一个业务数据错误,类型不能为空
            if (StringUtil.isBlank(x.getType())){
                errorMsg.add("类型不能为空!");
                flag=false;
            }
            if (flag){
                testDao.save(x);
            }else {
                x.setRemarks(String.join("\n",errorMsg));
                errorList.add(x);
            }
        });

        return errorList;
    }


}
import com.hanxiaozhang.dictonecode.dao.TestDao;
import com.hanxiaozhang.dictonecode.domain.TestDO;
import com.hanxiaozhang.dictonecode.service.TestService;
import com.hanxiaozhang.importexcel.TestSaveExcelServiceImpl;
import com.hanxiaozhang.importexcel.ImportExcelExecutor;
import com.hanxiaozhang.importexcelnew.TestSaveExcelNewServiceImpl;
import com.hanxiaozhang.utils.ExcelToEntityListUtil;
import com.hanxiaozhang.utils.R;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;


@Slf4j
@Service
public class TestServiceImpl implements TestService {

    @Resource
    private TestDao testDao;

    @Autowired
    private TestSaveExcelServiceImpl testSaveExcelService;

    @Autowired
    private TestSaveExcelNewServiceImpl dictSaveExcelNewService;

    @Override
    public R importExcel(MultipartFile file) {

        try {
            //读取Excel中数据
            ArrayList<TestDO> list = ExcelToEntityListUtil.getInstance().execute(TestDO.class, file.getInputStream(), initTitleToAttr());
            log.info("读取Excel中数据的条数:[{}]",list.size());
            //多线程处理数据,并导出错误数据
             List<TestDO> errorList = ImportExcelExecutor.execute(testSaveExcelService, list, 14);
            // List<TestDO> errorList = ImportExcelNewExecutor.execute(dictSaveExcelNewService, list, 14);
            //封装错误数据
            if (errorList!=null&&!errorList.isEmpty()) {
                Map<String, Object> map = new HashMap<String, Object>();
                map.put("errorData", errorList);
                map.put("title", initAttrToTitle());
                map.put("fileName", "有问题数据.xlsx");
                return R.error(map);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvalidFormatException e) {
            e.printStackTrace();
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return R.ok();
    }

    private Map<String,String> initTitleToAttr(){
        Map<String, String> map = new LinkedHashMap<>(8);
        map.put("姓名","name");
        map.put("值","value");
        map.put("类型","type");
        map.put("描述","description");
        map.put("时间","createDate");
        return map;
    }

    private Map<String,String> initAttrToTitle(){
        Map<String, String> map = new LinkedHashMap<>(8);
        map.put("name","姓名");
        map.put("value","值");
        map.put("type","类型");
        map.put("description","描述");
        map.put("createDate","时间");
        map.put("remarks","数据问题备注");
        return map;
    }


}
/**
 * Service-layer contract of the Excel import demo.
 */
public interface TestService {



    /**
     * Imports the rows of an uploaded Excel workbook.
     *
     * @param file uploaded workbook
     * @return result wrapper describing the outcome of the import
     */
    R importExcel(MultipartFile file);


}

这篇关于多线程导入数据到mysql 一的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!