ForkJoinPool就是JDK7提供的一种“分治算法”的多线程并行计算框架。Fork意为分叉,Join意为合并,一分一合,相互配合,形成分治算法。此外,也可以将ForkJoinPool看作一个单机版的Map/Reduce,多个线程并行计算。
相比于ThreadPoolExecutor,ForkJoinPool可以更好地实现计算的负载均衡,提高资源利用率。
假设有5个任务,在ThreadPoolExecutor中有5个线程并行执行,其中一个任务的计算量很大,其余4个任务的计算量很小,这会导致1个线程很忙,其他4个线程则处于空闲状态。
利用ForkJoinPool,可以把大的任务拆分成很多小任务,然后这些小任务被所有的线程执行,从而实现任务计算的负载均衡。
例子1:快排
快排有2个步骤:
利用数组的第1个元素把数组划分成两半,左边数组里面的元素小于或等于该元素,右边数组里面的元素比该元素大;
对左右的两个子数组分别排序。
左右两个子数组相互独立可以并行计算。利用ForkJoinPool:
例子2:求1到n个数的和
上面的代码用到了 RecursiveAction 和 RecursiveTask 两个类,它们都继承自抽象类ForkJoinTask,用到了其中关键的接口 fork()、join()。二者的区别是一个有返回值,一个没有返回值。
RecursiveAction/RecursiveTask类继承关系:
在ForkJoinPool中,对应的接口如下:
与ThreadPoolExector不同的是,除一个全局的任务队列之外,每个线程还有一个自己的局部队列。
核心数据结构如下所示:
下面看一下这些核心数据结构的构造过程:
关于上面的全局队列,有一个关键点需要说明:它并非使用BlockingQueue,而是基于一个普通的数组得以实现。
这个队列又名工作窃取队列,为 ForkJoinPool 的工作窃取算法提供服务。在 ForkJoinPool开篇的注释中,Doug Lea 特别提到了工作窃取队列的实现,其陈述来自如下两篇论文:"Dynamic CircularWork-Stealing Deque" by Chase and Lev,SPAA 2005与"Idempotent work stealing" byMichael,Saraswat,and Vechev,PPoPP 2009。读者可以在网上查阅相应论文。
所谓工作窃取算法,是指一个Worker线程在执行完毕自己队列中的任务之后,可以窃取其他线程队列中的任务来执行,从而实现负载均衡,以防有的线程很空闲,有的线程很忙。这个过程要用到工作窃取队列。
这个队列只有如下几个操作:
Worker线程自己,在队列头部,通过对top指针执行加、减操作,实现入队或出队,这是单线程的。
其他Worker线程,在队列尾部,通过对base进行累加,实现出队操作,也就是窃取,这是多线程的,需要通过CAS操作。
这个队列,在Dynamic Circular Work-Stealing Deque这篇论文中被称为dynamic-cyclic-array。之所以这样命名,是因为有两个关键点:
整个队列是环形的,也就是一个数组实现的RingBuffer。并且base会一直累加,不会减小;top会累加、减小。最后,base、top的值都会大于整个数组的长度,只是计算数组下标的时候,会取top&(queue.length-1),base&(queue.length-1)。因为queue.length是2的整数次方,这里也就是对queue.length进行取模操作。
当top-base=queue.length-1 的时候,队列为满,此时需要扩容;
当top=base的时候,队列为空,Worker线程即将进入阻塞状态。
当队列满了之后会扩容,所以被称为是动态的。但这就涉及一个棘手的问题:多个线程同时在读写这个队列,如何实现在不加锁的情况下一边读写、一边扩容呢?
通过分析工作窃取队列的特性,我们会发现:在 base 一端,是多线程访问的,但它们只会使base变大,也就是使队列中的元素变少。所以队列为满,一定发生在top一端,对top进行累加的时候,这一端却是单线程的!队列的扩容恰好利用了这个单线程的特性!即在扩容过程中,不可能有其他线程对top进行修改,只有线程对base进行修改。
下图为工作窃取队列扩容示意图。扩容之后,数组长度变成之前的二倍,但top、base的值是不变的!通过top、base对新的数组长度取模,仍然可以定位到元素在新数组中的位置。
下面结合WorkQueue扩容的代码进一步分析。
类似于ThreadPoolExecutor,在ForkJoinPool中也有一个ctl变量负责表达ForkJoinPool的整个生命周期和相关的各种状态。不过ctl变量更加复杂,是一个long型变量,代码如下所示。
ctl变量的64个比特位被分成五部分:
AC:最高的16个比特位,表示Active线程数-parallelism,parallelism是上面的构造方法传进去的参数;
TC:次高的16个比特位,表示Total线程数-parallelism;
ST:1个比特位,如果是1,表示整个ForkJoinPool正在关闭;
EC:15个比特位,表示阻塞栈的栈顶线程的wait count(关于什么是wait count,接下来解释);
ID:16个比特位,表示阻塞栈的栈顶线程对应的id。
什么叫阻塞栈呢?
要实现多个线程的阻塞、唤醒,除了park/unpark这一对操作原语,还需要一个无锁链表实现的阻塞队列,把所有阻塞的线程串在一起。
在ForkJoinPool中,没有使用阻塞队列,而是使用了阻塞栈。把所有空闲的Worker线程放在一个栈里面,这个栈同样通过链表来实现,名为Treiber Stack。前面讲解Phaser的实现原理的时候,也用过这个数据结构。
下图为所有阻塞的Worker线程组成的Treiber Stack。
首先,WorkQueue有一个id变量,记录了自己在WorkQueue[]数组中的下标位置,id变量就相当于每个WorkQueue或ForkJoinWorkerThread对象的地址;
其次,ForkJoinWorkerThread还有一个stackPred变量,记录了前一个阻塞线程的id,这个stackPred变量就相当于链表的next指针,把所有的阻塞线程串联在一起,组成一个Treiber Stack。
最后,ctl变量的最低16位,记录了栈的栈顶线程的id;中间的15位,记录了栈顶线程被阻塞的次数,也称为wait count。
构造方法中,有如下的代码:
因为在初始的时候,ForkJoinPool 中的线程个数为 0,所以 AC=0-parallelism,TC=0-parallelism。这意味着只有高32位的AC、TC 两个部分填充了值,低32位都是0填充。
在ThreadPoolExecutor中,有corePoolSize和maxmiumPoolSize 两个参数联合控制总的线程数,而在ForkJoinPool中只传入了一个parallelism参数,且这个参数并不是实际的线程数。那么,ForkJoinPool在实际的运行过程中,线程数究竟是由哪些因素决定的呢?
要回答这个问题,先得明白ForkJoinPool中的线程都可能有哪几种状态?可能的状态有三种:
空闲状态(放在Treiber Stack里面)。
活跃状态(正在执行某个ForkJoinTask,未阻塞)。
阻塞状态(正在执行某个ForkJoinTask,但阻塞了,于是调用join,等待另外一个任务的结果返回)。
ctl变量很好地反映出了三种状态:
高32位:u=(int) (ctl >>> 32),然后u又拆分成tc、ac 两个16位;
低32位:c=(int) ctl。
c>0,说明Treiber Stack不为空,有空闲线程;c=0,说明没有空闲线程;
ac>0,说明有活跃线程;ac<=0,说明没有空闲线程,并且还未超出parallelism;
tc>0,说明总线程数 >parallelism。
在提交任务的时候:
在通知工作线程的时候,需要判断ctl的状态,如果没有闲置的线程,则开启新线程:
ForkerJoinPool 没有使用 BlockingQueue,也就不利用其阻塞/唤醒机制,而是利用了park/unpark原语,并自行实现了Treiber Stack。
下面进行详细分析ForkerJoinPool,在阻塞和唤醒的时候,分别是如何入栈的。
当一个线程窃取不到任何任务,也就是处于空闲状态时就会阻塞入栈。
在新的任务到来之后,空闲的线程被唤醒,其核心逻辑在signalWork方法里面。
在明白了工作窃取队列、ctl变量的各种状态、Worker的各种状态,以及线程阻塞—唤醒机制之后,接下来综合这些知识,详细分析任务的提交和执行过程。
关于任务的提交,ForkJoinPool最外层的接口如下所示。
如何区分一个任务是内部任务,还是外部任务呢?
可以通过调用该方法的线程类型判断。
如果线程类型是ForkJoinWorkerThread,说明是线程池内部的某个线程在调用该方法,则把该任务放入该线程的局部队列;
否则,是外部线程在调用该方法,则将该任务加入全局队列。
内部提交任务,即上面的q.push(task),会放入该线程的工作窃取队列中,代码如下所示。
由于工作窃取队列的特性,操作是单线程的,所以此处不需要执行CAS操作。
lockedPush(task)方法的实现:
外部多个线程会调用该方法,所以要加锁,入队列和扩容的逻辑和线程内部的队列基本相同。最后,调用signalWork(),通知一个空闲线程来取。
全局队列有任务,局部队列也有任务,每一个Worker线程都会不间断地扫描这些队列,窃取任务来执行。下面从Worker线程的run方法开始分析:
run()方法调用的是所在ForkJoinPool的runWorker方法,如下所示。
下面详细看扫描过程scan(w, a)。