对于多个独立的任务,可以以并发的方式执行任务,以提高 CPU 利用率,提高处理效率。
在一个线程池中,开启指定数量的线程,每个线程从任务队列中获取任务执行。
执行的过程中,判断当前线程是否在执行任务的状态,如果没有执行任务,取一条任务执行,如果正在执行,则跳过,下轮再判断。
在所有任务执行完后,关闭线程池。
需要注意的是数据结构的选择,须选择并发类的数据结构,不然可能出现阻塞,死锁等情况。
(具体逻辑参考源码)
/** * 并发执行器示例 */ public class ConcurrentExecutorTest { /** * 测试 */ public static void main(String[] args) { for (int i = 0; i < 100; i++) { test(); } } private static void test() { Map<String, String> paramMap = new LinkedHashMap<>(); for (int i = 0; i < 10; i++) { paramMap.put("key:" + i, "value:" + i); } final ConcurrentExecutor<String, String, Integer> executor = new ConcurrentExecutor<>(5, paramMap, (k, v) -> { ThreadUtil.sleep(10); System.out.println(Thread.currentThread().getName() + "-" + v); final int abs = Math.abs(Objects.hash(v)); if (abs % 3 == 0) { int i = 1 / 0; } return abs; }); executor.execute(); System.out.println("success result: " + executor.getSuccessResultMap()); System.out.println("error result: " + executor.getErrorResultMap()); } }
测试结果
pool-1-thread-1-value:0 pool-1-thread-2-value:1 pool-1-thread-4-value:3 pool-1-thread-3-value:2 pool-1-thread-3-value:8 pool-1-thread-1-value:5 pool-1-thread-5-value:4 pool-1-thread-2-value:6 pool-1-thread-4-value:7 pool-1-thread-3-value:9 success result: {key:2=231604360, key:0=231604358, key:6=231604364, key:5=231604363, key:3=231604361, key:9=231604367, key:8=231604366} error result: {key:1=java.lang.ArithmeticException: / by zero, key:4=java.lang.ArithmeticException: / by zero, key:7=java.lang.ArithmeticException: / by zero}
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.ConcurrentHashSet; import cn.hutool.core.lang.Assert; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.BooleanUtil; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.function.BiFunction; /** * 并发执行器 * <p> * 适用场景:每个任务是独立的,不耦合的 * * @author lilou * @since 2022/6/9 9:05 */ public class ConcurrentExecutor<K, V, R> { /** * 任务参数映射(K:key的类型,V:值的类型) */ private final Map<K, V> paramMap; /** * 成功的任务结果映射(R:结果类型) */ private final Map<K, R> successResultMap; /** * 失败的任务结果映射 */ private final Map<K, Throwable> errorResultMap; /** * 当前运行中的key集合 */ private final Set<K> runningKeySet; /** * 候选任务key队列 */ private final Queue<K> candidateKeyQueue; /** * 同时运行的最大线程数量 */ private final int maxThreadNum; /** * 执行器 */ private final ExecutorService executorService; /** * 具体任务策略 */ private final BiFunction<K, V, R> biFunction; /** * 当前index线程的运行状态,可依据此状态,判断是否立刻从任务参数中获取任务执行 */ private final Map<Integer, Boolean> currentIndexThreadRunningStatusMap; public ConcurrentExecutor(int maxThreadNum, Map<K, V> paramMap, BiFunction<K, V, R> biFunction) { Assert.notNull(paramMap, "paramMap不可为空"); Assert.isTrue(maxThreadNum > 0, "maxThreadNum不可小于1"); final int paramSize = paramMap.size(); this.maxThreadNum = Math.min(maxThreadNum, paramSize); // tips: 须转换成同步类的map数据结构,如果错误地使用 this.paramMap = paramMap; 且外部使用了HashMap 或 LinkedHashMap,多测试几遍会发现,偶尔会陷入了阻塞 this.paramMap = Collections.synchronizedMap(paramMap); this.candidateKeyQueue = new ConcurrentLinkedQueue<>(paramMap.keySet()); this.runningKeySet = new ConcurrentHashSet<>(paramSize); this.biFunction = biFunction; this.executorService = ThreadUtil.newExecutor(this.maxThreadNum, this.maxThreadNum, Integer.MAX_VALUE); this.currentIndexThreadRunningStatusMap = new ConcurrentHashMap<>(this.maxThreadNum); this.successResultMap = new ConcurrentHashMap<>(this.paramMap.size()); this.errorResultMap = new ConcurrentHashMap<>(); } public void execute() { while (CollUtil.isNotEmpty(paramMap)) { // 最多同时有 maxRunningThreadNumber 同时消费 taskMap 中的数据 for (int i = 0; i < this.maxThreadNum; i++) { int currentIndex = i; // 当前线程上次还未执行完,暂时跳过 final Boolean isRunning = currentIndexThreadRunningStatusMap.getOrDefault(currentIndex, false); if (BooleanUtil.isTrue(isRunning)) { continue; } // 每个线程只处理和自己相关的 final K candidateKey = pickCandidateKey(); // 当前没有对应key的任务 if (Objects.isNull(candidateKey)) { continue; } // 在线程池中运行任务 executorService.submit(() -> { try { currentIndexThreadRunningStatusMap.put(currentIndex, true); final V data = paramMap.get(candidateKey); // 开始执行任务 final R result = biFunction.apply(candidateKey, data); // 存入正常结果 successResultMap.put(candidateKey, result); } catch (Exception e) { // 存入异常结果 errorResultMap.put(candidateKey, e); } finally { paramMap.remove(candidateKey); candidateKeyQueue.remove(candidateKey); currentIndexThreadRunningStatusMap.remove(currentIndex); } }); } } executorService.shutdown(); } /** * 从候选任务key队列中选择一个任务key */ private K pickCandidateKey() { for (K candidateKey : candidateKeyQueue) { if (!runningKeySet.contains(candidateKey)) { runningKeySet.add(candidateKey); return candidateKey; } } return null; } public Map<K, R> getSuccessResultMap() { return successResultMap; } public Map<K, Throwable> getErrorResultMap() { return errorResultMap; } }