ForkJoin是由JDK1.7后提供多线并发处理框架,ForkJoin的框架的基本思想是分而治之。使用ForkJoin将相同的计算任务通过多线程的进行执行。从而能提高数据的计算速度。
分而治之就是将一个复杂的计算,按照设定的阈值进行分解成多个计算,然后将各个计算结果进行汇总。相应的ForkJoin将复杂的计算当做一个任务。而分解的多个计算则是当做一个子任务。
使用ForkJoin框架,需要创建一个ForkJoin的任务。因为ForkJoin框架为我们提供了RecursiveAction和RecursiveTask。
我们只需要继承ForkJoin为我们提供的抽象类的其中一个并且实现compute方法。
RecursiveTask在进行exec之后会使用一个result的变量进行接受返回的结果。而RecursiveAction在exec后是不会保存返回结果。
ForkJoinTask
: 基本任务,使用forkjoin框架必须创建的对象,提供fork,join操作,常用的两个子类
RecursiveAction
: 无结果返回的任务RecursiveTask
: 有返回结果的任务说明:
fork
: 让task异步执行join
: 让task同步执行,可以获取返回值ForkJoinPool
执行 ForkJoinTask
,
三中提交方式:
execute
异步,无返回结果submit
异步,有返回结果 (返回Future<T>
)invoke
同步,有返回结果 (会阻塞)package com.bxwell.hj360.device.module.devicedatamanagement.service.impl; import com.bxwell.dc.service.api.data.vo.*; import com.bxwell.hj360.common.base.common.until.StringUtil; import com.bxwell.hj360.common.model.device.BxWellDeviceFactor; import org.springframework.util.ObjectUtils; import java.math.BigDecimal; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.stream.Collectors; import java.util.stream.IntStream; public class TimeForkJoinPoolCalcRate { class CalcTask extends RecursiveAction{ private final Integer MAX = 100; private String deviceId; String factorId; private Map<String, BigDecimal> unitRateMap; private Map<String, BxWellDeviceFactor> cunit; private List<MinDataShowVO> listMinDataShowVO; private List<HourDataShowVO> listHourDataShowVO; private List<DayDataShowVO> listDayDataShowVO; private BigDecimal rateBig; private BigDecimal totalUnitIdRateBig; public CalcTask(String deviceId,String factorId ,Map<String, BigDecimal> unitRateMap,Map<String, BxWellDeviceFactor> cunit, List<MinDataShowVO> listMinDataShowVO, List<HourDataShowVO> listHourDataShowVO, List<DayDataShowVO> listDayDataShowVO,BigDecimal rateBig,BigDecimal totalUnitIdRateBig){ this.deviceId = deviceId; this.factorId = factorId; this.unitRateMap = unitRateMap; this.cunit = cunit; this.listMinDataShowVO = listMinDataShowVO; this.listHourDataShowVO = listHourDataShowVO; this.listDayDataShowVO = listDayDataShowVO; this.rateBig = rateBig; this.totalUnitIdRateBig = totalUnitIdRateBig; } @Override protected void compute() { if(listMinDataShowVO != null){ if(listMinDataShowVO.size() <= MAX){ handleMinData(listMinDataShowVO); }else { handleThreadNum(listMinDataShowVO); } }else if(listHourDataShowVO != null){ if(listHourDataShowVO.size() <= MAX){ handleHourData(listHourDataShowVO); }else { handleThreadNum(listHourDataShowVO); } }else if(listDayDataShowVO != null){ if(listDayDataShowVO.size() <= MAX){ handleDayData(listDayDataShowVO); }else { handleThreadNum(listDayDataShowVO); } } } private void handleThreadNum(List<?> list){ int res = list.size() <= 1000 ? 1 : list.size() <= 10000 ? 2 : list.size() <= 50000 ? 3 : list.size() <= 100000 ? 4 : 5; switch (res){ case 1 : CreateCalcTask(list,res + 5); break; case 2 : CreateCalcTask(list,res + 10); break; case 3 : CreateCalcTask(list,res + 15); break; case 4 : CreateCalcTask(list,res + 20); break; case 5 : CreateCalcTask(list,res + 30); break; } } private void handleMinData(List<MinDataShowVO> list){ SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); for(MinDataShowVO vo : list){ if(!StringUtil.isEmpty(vo.getEndTime())){ vo.setTimeStr(format.format(vo.getTime())+"-"+vo.getEndTime()); } if(null != rateBig){ if(null != vo.getMin()){ vo.setMin(new BigDecimal(SampleForkJoinPoolCalcRate.handleRateNum(vo.getMin().multiply(rateBig)))); } if(null != vo.getMax()){ vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax().multiply(rateBig)))); } if(null != vo.getAvg()){ vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg().multiply(rateBig)))); } }else{//如果转换率的两个单位一致 导致取不到转换率 也需要保留有效小数点 if(null != vo.getMin()){ vo.setMin(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMin()))); } if(null != vo.getMax()){ vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax()))); } if(null != vo.getAvg()){ vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg()))); } } if(null != totalUnitIdRateBig && null != vo.getCou()){ vo.setCou(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getCou().multiply(totalUnitIdRateBig)))); } if(null == totalUnitIdRateBig || null == vo.getCou()){ //前端需处理成斜杠 vo.setCou(null); } } } private void handleHourData(List<HourDataShowVO> list){ for(HourDataShowVO vo : list){ if(null != rateBig){ if(null != vo.getMin()){ vo.setMin(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMin().multiply(rateBig)))); } if(null != vo.getMax()){ vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax().multiply(rateBig)))); } if(null != vo.getAvg()){ vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg().multiply(rateBig)))); } }else{//如果转换率的两个单位一致 导致取不到转换率 也需要保留有效小数点 if(null != vo.getMin()){ vo.setMin(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMin()))); } if(null != vo.getMax()){ vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax()))); } if(null != vo.getAvg()){ vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg()))); } } if(null != totalUnitIdRateBig){ if(null != vo.getCou()){ vo.setCou(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getCou().multiply(totalUnitIdRateBig)))); } }else{ //前端需处理成斜杠 vo.setCou(null); } } } private void handleDayData(List<DayDataShowVO> list){ for(DayDataShowVO vo : list){ if(null != rateBig){ if(null != vo.getMin()){ vo.setMin(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMin().multiply(rateBig)))); } if(null != vo.getMax()){ vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax().multiply(rateBig)))); } if(null != vo.getAvg()){ vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg().multiply(rateBig)))); } }else{//如果转换率的两个单位一致 导致取不到转换率 也需要保留有效小数点 if(null != vo.getMin()){ vo.setMin(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMin()))); } if(null != vo.getMax()){ vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax()))); } if(null != vo.getAvg()){ vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg()))); } } if(null != totalUnitIdRateBig){ if(null != vo.getCou()){ vo.setCou(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getCou().multiply(totalUnitIdRateBig)))); } }else{ //前端需处理成斜杠 vo.setCou(null); } } } private void CreateCalcTask(List<?> list,int num){ int cou = 0; if(list != null){ List<? extends List<?>> lists = rangeList(list, list.size() / (num - 1)); while (cou < num){ CalcTask calcTask = new CalcTask(deviceId,factorId, unitRateMap, cunit, listMinDataShowVO != null ? (List<MinDataShowVO>)lists.get(cou) : null, listHourDataShowVO != null ? (List<HourDataShowVO>)lists.get(cou) : null, listDayDataShowVO != null ? (List<DayDataShowVO>)lists.get(cou) : null, rateBig,totalUnitIdRateBig); cou++; calcTask.fork(); calcTask.join(); } } } /** * 平均分割集合 最后一个角标数据取余 * @param list 数据集合 * @param size [] * @param <T> * @return */ private <T> List<List<T>> rangeList(List<T>list , int size){ if(ObjectUtils.isEmpty(list)){ return Collections.emptyList(); } int block = (list.size() + size -1) / size; return IntStream.range(0,block). boxed().map(i->{ int start = i*size; int end = Math.min(start + size,list.size()); return list.subList(start,end); }).collect(Collectors.toList()); } } /** * 返回保留几位小数 * @param str * @return */ private static int countNum(String str){ int d = str.indexOf("."); String str2 = str.substring(d+1,str.length()); String[] split = str2.split(""); int num = 2; for (int a = 0;a < split.length;a++) { if(!split[a].equals("0")){ if(a > 1){ num = a+2; break; } } } return num; } /** * 根据规则保留有效数字 * @param val * @return */ public static String handleRateNum(BigDecimal val){ if(val == null){ return null; } String strVal = val.toPlainString(); BigDecimal vall = new BigDecimal(strVal); BigDecimal bigdecimal = null; //小于0.01跟-0.01需要保留有效位数两位 if(vall.compareTo(new BigDecimal(0.01)) == -1){ bigdecimal = vall.setScale(countNum(strVal),BigDecimal.ROUND_HALF_UP); }else{ //大于0.01需要保留两位小数 bigdecimal = vall.setScale(2,BigDecimal.ROUND_HALF_UP); } //如果保留的两位小数是00的话直接取整 if(bigdecimal.intValue() > 0 && String.valueOf(bigdecimal).contains("00")){ return String.valueOf(bigdecimal.intValue()); }else{ return bigdecimal.toPlainString(); } } /** * * @param deviceId * @param factorId * @param unitRateMap 单位转换map * @param cunit 自定义单位 基准单位map 累计值单位 * @param listMinDataShowVO * @param listHourDataShowVO * @param listDayDataShowVO * @throws InterruptedException */ public static void CalcRate(String deviceId,String factorId, Map<String, BigDecimal> unitRateMap, Map<String, BxWellDeviceFactor> cunit, List<MinDataShowVO> listMinDataShowVO, List<HourDataShowVO> listHourDataShowVO, List<DayDataShowVO> listDayDataShowVO) throws InterruptedException { //自定义单位 BigDecimal rateBig = unitRateMap.get(cunit.get(deviceId +"_"+ factorId)); //自定义累计单位 String factorTotalUnitId = cunit.get(deviceId + "_" + factorId).getFactorTotalUnitId(); String totalUnitId = cunit.get(deviceId + "_" + factorId).getTotalUnitId(); BigDecimal totalUnitIdRateBig = unitRateMap.get(factorTotalUnitId + "_" + totalUnitId); ForkJoinPool pool = new ForkJoinPool(); pool.invoke(new TimeForkJoinPoolCalcRate().new CalcTask(deviceId,factorId, unitRateMap, cunit, listMinDataShowVO,listHourDataShowVO,listDayDataShowVO, rateBig,totalUnitIdRateBig)); pool.shutdown(); } }