在JDK7之前,并行处理数据集合非常麻烦。首先需要自己明确的把包含数据的数据结构分成若干个子部分,第二需要给每个子部分分配一个独立的线程;第三需要在恰当的时候对它们进行同步来避免不希望出现的竞争条件,等待所有线程完成,最后把这些部分合并起来。
Doug Lea 在JDK7中引入了fork/join框架,让这些操作更稳定,更不易出错。
本节主要内容:
1. 用并行流并行处理数据
2. 并行流的性能分析
3. fork/join框架
4. 使用Spliterator分割流
学完本节期望能达到:
1. 熟练使用并行流,来加速业务性能
2. 了解流内部的工作原理,以防止误用的情况
3. 通过Spliterator控制数据块的划分方式
可以通过对数据源调用parallelStream方法来将源转换为并行流。并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样可以自动将工作负荷转到多核中并行处理。
考虑下面一个实现:给定正整数n,计算 1 + 2 + … n的和。
使用stream的实现:
private static long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum); }
将上面的顺序流转换为并行流,实现如下:
private static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum); }
即通过调用方法parallel可将顺序流转换为并行流。
但需要注意的是流仅在终端操作时才开始执行,所以当前流是顺序流还是并行流以最靠近终端操作的流类型为准,示例:
list.stream().parallel().filter(e -> e.age > 20).sequential().map(...).parallel().collect(...);
此种情况并不会按预想的先使用并行流执行过滤,再按顺序流执行映射转换。而是整个流水线操作都按并行流执行。
并行流内部使用了默认的ForkJoinPool, 它默认的线程数量就是处理器的数量(Runtime.getRuntime().availableProcessors())。也可以通过设置系统属性来改变它(System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”, “12”))。但它是一个全局设置,会影响所有的并行流,一般而言线程数等于处理器数量是一个合理的数值,不需要修改。
一般而言,同一个功能给我们的感觉是并行流性能会比顺序流性能更好。然而在软件工程中,优化性能的黄金准则是:测量。我们开发了程序,用来测量4种写法的累加,看看性能如何:
@Slf4j public class SumSample { /** * 顺序流、并行流性能测试 * 实现1~1亿整型数字累加 * */ public static void main(String[] args) { CostUtil.cost(() -> log.info("==> for: 1 + ... + 100_000_000, result: {}", forSum(100_000_000))); log.info("================================================================================"); CostUtil.cost(() -> log.info("==> sequential: 1 + ... + 100_000_000, result: {}", sequentialSum(100_000_000))); log.info("================================================================================"); CostUtil.cost(() -> log.info("==> parallel: 1 + ... + 100_000_000, result: {}", parallelSum(100_000_000))); log.info("================================================================================"); CostUtil.cost(() -> log.info("==> longParallel: 1 + ... + 100_000_000, result: {}", longParallelSum(100_000_000))); } /** * 内部迭代方式实现累加 */ private static long forSum(long n) { long result = 0; for (int i = 1; i <= n; i ++) { result += i; } return result; } /** * 顺序流实现累加 */ private static long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum); } /** * 并行流实现累加 */ private static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum); } /** * long原生流范围实现累加 */ private static long longParallelSum(long n) { return LongStream.rangeClosed(1L, n).parallel().reduce(0L, Long::sum); } } // result: 2022-01-18 10:53:59.035 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> for: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:53:59.039 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 58 2022-01-18 10:53:59.039 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-================================================================================ 2022-01-18 10:54:00.459 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> sequential: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:54:00.459 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 1420 2022-01-18 10:54:00.459 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-================================================================================ 2022-01-18 10:54:04.627 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> parallel: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:54:04.628 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 4167 2022-01-18 10:54:04.628 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-================================================================================ 2022-01-18 10:54:04.688 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> longParallel: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:54:04.688 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 60
使用四种方法实现1~1亿个数的累加,这是在i7 2.4GHz 6core/12threads CPU的执行结果。让人很意外,并非是并行流性能最好,反而是最差的,最朴实的for循环单线程性能最佳。
原因:
通过上面的比较需要意识到:并行编程比较复杂,有时候甚至违反直觉。如果用的不对(如本例,采用了一个不易并行化的操作iterate),甚至会让性能更差。所以了解parallel方法背后的执行细节非常必要。
仅高效求和的示例,可用LongStream.rangeClosed高效替代iterate实现并行计算。它的优点是:
通过示例演示它的并行执行性能比同样是并行流的iterate版本要快了70倍。可见它有效利用了并行。
上面的执行结果可以看出LongStream.rangeClosed的性能还是比for略慢一点,原因是:
并行化是有代价的,并行过程中需要对流做递归划分,把流的归纳操作分配到不同的线程,最后合并。且多个核心之间移动数据的代价也很大。
使用并行流加速性能需要确保用对,如果计算结果是错误的,再快也没意义。
误用并行流而产生错误的首要原因是使用的算法改变了某些共享状态。 如下面示例:
class Accumulator { public long total = 0; public void add(long value) { total += value; } } public static long sideEffectSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); return accumulator.total; } //result: 2022-01-18 11:40:16.943 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> sideEffectSum: 1 + ... + 100_000_000, result: 1037016191509285 2022-01-18 11:40:16.944 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 40
从上面示例看出虽然很快,但结果是错误的。 原因是total += value非原子操作,出现了竞态条件。如果使用同步来修复,就失去了并行的意义。 所以写并行流时一定要考虑多个线程是否会修改共享对象的可变状态。
一些高效使用并行流的建议:
一些常见的数据源的可分解性汇总:
想要正确的使用并行流,了解它背后的实现原理至关重要。 并行流背后就是采用的Fork/Join框架。
// TODO: 待补充
// TODO: 待补充