Stream 接口提供了parallelStream方法来将集合转换为并行流。即将一个集合分为多个数据块,并用不同的线程分别处理每个数据块的流。
并且使用parallelStream 时无需担心内部变量控制,线程数量等问题。
如使用并行流计算1至100000累加之和:
Stream.iterate(1L, param1 -> Math.addExact(param1, 1)) .limit(100000) .parallel() .sequential() .parallel() .reduce(0L, Math::addExact) .longValue();
通常我们认为在数据量到达一定程度时,使用多线程计算会获得更好的性能。但实际效果应该在测量比较之后才直到。
使用并行流和顺序流别计算1至100000 的累加之和,在我的四核英特尔机器上运行结果如下:
long start = System.currentTimeMillis(); Stream.iterate(1L, param1 -> Math.addExact(param1, 1)) .limit(100000) .parallel() .reduce(0L, Math::addExact) .longValue(); System.out.println(String.format("Parallel accumulate sum, used %d ms.", System.currentTimeMillis() - start)); start = System.currentTimeMillis(); LongStream.rangeClosed(1, 100000) .reduce(0L, Math::addExact); System.out.println(String.format("Sequential accumulate sum, used %d ms.", System.currentTimeMillis() - start)); Parallel accumulate sum, used 64 ms. Sequential accumulate sum, used 8 ms.
通过以上结果可以看到,并行流计算的耗时竟然是顺序流的好几倍,这与我们的预期结果差距十分的大。
要想明白这差距的原因,首先得明白影响上面并行流的速度的因素有那些:
修复上面两个影响并行流的速度的问题后,重新运行结果如下:
long start = System.currentTimeMillis(); LongStream.rangeClosed(1, 100000) .parallel() .reduce(0L, Math::addExact); System.out.println(String.format("Parallel accumulate sum, used %d ms.", System.currentTimeMillis() - start)); start = System.currentTimeMillis(); LongStream.rangeClosed(1, 100000) .reduce(0L, Math::addExact); System.out.println(String.format("Sequential accumulate sum, used %d ms.", System.currentTimeMillis() - start)); Parallel accumulate sum, used 7 ms. Sequential accumulate sum, used 3 ms.
并行流的速度得到了很大提升,这表明并行化时需要使用正确的数据结构。
但是顺序流的速度却仍然更快,这说明并行化也是有代价的,如下:
而并行过程需要对流要递归划分,再把每个子流的归纳操作分配到不同的线程,最后把这些操作的结果合并成一个值。
在子流归纳操作时间过短时,并行化并没有带来性能提升,反而是更加慢了。
再将数据提升至上亿级别进行运算,并行流终于取得了一些领先。
long start = System.currentTimeMillis(); LongStream.rangeClosed(1, 100000000) .parallel() .reduce(0L, Math::addExact); System.out.println(String.format("Parallel accumulate sum, used %d ms.", System.currentTimeMillis() - start)); start = System.currentTimeMillis(); LongStream.rangeClosed(1, 100000000) .reduce(0L, Math::addExact); System.out.println(String.format("Sequential accumulate sum, used %d ms.", System.currentTimeMillis() - start)); Parallel accumulate sum, used 79 ms. Sequential accumulate sum, used 264 ms.
关于在什么地方使用parallelStream 没有绝对的建议,而是只能做定性分析。下列是一些可能影响性能的地方:
ParallelStream流背后使用的基础架构是Java 7中引入的Fork/Join分支合并框架。
分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。
这其实就是分治算法的并行版本。