我们都知道在java 使用strem流做多线程处理是非常方便的。
list.parallelStream().forEach(s -> { // 业务处理 });
但是parallelStream是如何实现多线程处理的呢?其实看源码我们会发现parallelStream是使用线程池ForkJoin来调度的,这里我们可以看源码看到
由于我们使用的 forEach,所以直接看类ForEachOps
即可.当然其他方法比如reduce等会有不同的实现类,总的实现类是如下几个
java.util.stream.FindOps.FindOp java.util.stream.ForEachOps java.util.stream.MatchOps.MatchOp java.util.stream.ReduceOps.ReduceOp
里面基本都是调用 evaluateParallel
方法
而 ForEachTask 继承关系如下
这里就找到了我们的答案,核心原理就是Fork Join 框架
而ForkJoinPool的默认线程数是CPU核数 - 1,无参构造方法就可以看出
如何设置 ForkJoinPool的线程数呢
这里有如下几种方法
CountDownLatch countDownLatch = new CountDownLatch(20); int cpu = Runtime.getRuntime().availableProcessors(); System.out.println(cpu); ForkJoinPool pool = new ForkJoinPool(2); List<Integer> list = IntStream.range(0, 20).boxed().collect(Collectors.toList()); pool.submit(() -> { list.parallelStream().forEach(s -> { // 业务处理 System.out.println("thread:" + Thread.currentThread().getName() + "value" + s); countDownLatch.countDown(); }); }); countDownLatch.await();
可以看到始终只有2两个线程
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "5");
demo验证
System.setProperty( "java.util.concurrent.ForkJoinPool.common.parallelism", "5"); CountDownLatch countDownLatch = new CountDownLatch(5); List<Integer> list = IntStream.range(0, 20).boxed().collect(Collectors.toList()); list.parallelStream().forEach(s -> { // 业务处理 System.out.println("thread:" + Thread.currentThread().getName() + "value" + s); countDownLatch.countDown(); }); countDownLatch.await(); int poolSize = ForkJoinPool.commonPool().getPoolSize(); System.out.println("Pool size: " + poolSize);
这里比较奇怪是设置了5个线程,虽然只有5个,但是主线程也加入了执行,这里暂时不做详细研究,有知道的欢迎留言