fork-join
框架允许在几个工作进程中断某个任务,然后等待结果组合它们。 它在很大程度上利用了多处理器机器的生产能力。 以下是fork-join
框架中使用的核心概念和对象。
Fork是一个进程,其中任务将其分成可以并发执行的较小且独立的子任务。
语法
Sum left = new Sum(array, low, mid); left.fork();
这里Sum
是RecursiveTask
的子类,left.fork()
方法将任务分解为子任务。
连接(Join
)是子任务完成执行后任务加入子任务的所有结果的过程,否则它会持续等待。
语法
left.join();
这里剩下的是Sum
类的一个对象。
它是一个特殊的线程池,旨在使用fork-and-join
任务拆分。
语法
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
这里有一个新的ForkJoinPool
,并行级别为4
个CPU。
RecursiveAction
表示不返回任何值的任务。
语法
class Writer extends RecursiveAction { @Override protected void compute() { } }
RecursiveTask
表示返回值的任务。
语法
class Sum extends RecursiveTask { @Override protected Long compute() { return null; } }
以下TestThread
程序显示了基于线程的环境中Fork-Join
框架的使用。
import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class TestThread { public static void main(final String[] arguments) throws InterruptedException, ExecutionException { int nThreads = Runtime.getRuntime().availableProcessors(); System.out.println(nThreads); int[] numbers = new int[1000]; for(int i=0; i< numbers.length; i++){ numbers[i] = i; } ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads); Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length)); System.out.println(result); } static class Sum extends RecursiveTask<Long> { int low; int high; int[] array; Sum(int[] array, int low, int high) { this.array = array; this.low = low; this.high = high; } protected Long compute() { if(high - low <= 10) { long sum = 0; for(int i=low; i < high; ++i) sum += array[i]; return sum; } else { int mid = low + (high - low) / 2; Sum left = new Sum(array, low, mid); Sum right = new Sum(array, mid, high); left.fork(); long rightResult = right.compute(); long leftResult = left.join(); return leftResult + rightResult; } } } }
这将产生以下结果 -
4