RUNNABLE:运行状态,接受新任务,持续处理任务队列里的任务
SHUTDOWN:不再接受新任务,但要处理任务队列里的任务
STOP:不再接受新任务,不再处理任务队列里的任务,中断正在进行中的任务
TIDYING:表示线程池正在停止运作,中止所有任务,销毁所有工作线程
TERMINATED:表示线程池已停止运作,所有工作线程已被销毁,所有任务已被清空或执行完毕
源码中ThreadPoolExecutor类的第一个成员变量ctl就是用来表示线程池状态以及线程池中的线程数量,其中它的高3位表示线程池的状态,低29位表示线程池中现有的线程数。AtomicInteger这个类可以通过CAS达到无锁并发,效率比较高,并且用一个变量标识两种含义,企图用最少的变量来减少锁竞争,提高并发效率,Doug Lea大佬这里的设计是值得我们学习的地方。至于为什么用int类型来表示线程池状态信息,Doug Lea大佬也做出了解释:为了将两种含义组装成一个int数,我们将workerCount限制为(2^29)-1(约5亿)线程而不是(2^31)-1 (20亿),如果将来出现问题(线程数不够用),可以将变量更改为一个AtomicLong,并调整上面的shift/mask常量。但是在出现问题之前,使用int可以使这段代码更快、更简单。
corePoolSize:核心线程数,也就是线程池期望池化的最大线程数
maximumPoolSize:最大线程数,也就是允许线程池临时超载后达到的最大线程数,虽然期望将并发状态保持在一定范围,但是在任务过多的情况下,增加非核心线程来临时处理任务。非核心线程数 = maximumPoolSize - corePoolSize
workQueue:阻塞队列,存储实现Runnable接口的线程任务
keepAliveTime:在没有任务时线程的存活时间
threadFactory:线程工厂,用来创建新线程
handler:当任务已满,并且无法再增加线程数时,或拒绝添加任务时,所执行的策略
主要步骤:
1.提交一个任务,线程池里存活的核心线程数小于线程数corePoolSize时,线程池会创建一个核心线程去处理提交的任务。
2.如果线程池核心线程数已满,即线程数已经等于corePoolSize,一个新提交的任务,会被放进任务队列workQueue排队等待执行。
3.当线程池里面存活的线程数已经等于corePoolSize了,并且任务队列workQueue已满,再判断线程数是否达到maximumPoolSize,即最大线程数是否已满,如果没到达,创建一个非核心线程执行提交的任务。
4.如果当前的线程数达到了maximumPoolSize,还有新的任务过来的话,直接采用拒绝策略处理。
以上是execute/submit方法的源码实现逻辑,可以看出方法内部做了更加精细的判断。在每个执行分支处都判断了当前线程池的状态ctl,主要原因在于execute方法中没有用到重量级锁,ctl虽然可以保证本身变化的原子性,但是不能保证方法内部的代码块的原子性,这里的多次判断能尽量避免并发带来的影响。这里还需要着重指出创建线程方法addWorker中是否可以创建线程的条件:当线程池的状态达到了SHUTDOWN或者之上的状态时候,只有一种情况还需要继续添加线程,那就是线程池已经SHUTDOWN,但是队列中还有任务在排队,而且不接受新任务,这里还可以继续添加线程的初衷是加快执行等待队列中的任务,尽快让线程池关闭。
从runWorker方法可以看出,当getTask方法中获取不到执行任务的时候就会执行线程退出的操作
从下图可以看出返回null值的位置。
结论:非核心线程延迟死亡的条件为线程池处于运行状态&&当前线程数大于最大核心线程数&&获取任务时间超过keepAliveTime。
结论:通过阻塞队列take()方法让线程一直等待,使得Worker的run()方法一直阻塞,直到获取到执行任务,执行完任务后继续阻塞等待,使得线程生命周期一直在RUNNABLE和WAITING状态之间流转,保证核心线程一直存活。
结论:allowCoreThreadTimeOut是ThreadPoolExecutor类的成员属性,将这个变量设置为true就可以跟非核心线程一样,在执行keepAliveTime还未获取到执行任务时就会移出线程池。
从下图线程池添加线程的代码中可以看出,core参数仅用于判断当前线程总数是与最大核心线程数比较还是与最大线程数比较。创建Worker线程类中并没有任何标识来表示一个是否是核心线程还是非核心线程。
结论:核心线程和非核心线程的定义只是线程池设计逻辑上的抽象,在源码实现的角度来看是没有加以区分的。如果从设计逻辑上考虑,非核心线程的创建时机是当前线程池线程总数超过了corePoolSize,且非核心线程超过keepAliveTime后会执行线程退出操作,而核心线程一般会在线程池中一直保活,所以非核心线程也没有机会成为核心线程;从源码角度来看,两者的不同“身份”影响的是当前线程池能达到的并发状态。
线程池并发安全主要有以下三点需要讨论:
1、线程池状态和工作线程数量的并发;
2、Worker容器变更的并发(添加或删除线程);
ThreadPoolExecutor中使用HashSet用于存放线程对象,当添加一个线程时:
当删除一个线程时:
3、任务队列中任务数量的并发(多线程同时操作一个任务)。
以阻塞队列ArrayBlockingQueue为例:
添加对象操作:
取出对象操作:
结论:对于问题1,上面已经说明过了由一个AtomicInteger变量ctl来保证操作的原子性,且通过CAS乐观锁来来保证吞吐量;对于问题2,对于workers集合的操作都是采用ReentrantLock锁来保证的,同一时间只有一个线程可以操作集合对象;对于问题3,这个并发安全性由工作队列BlockingQueue本身来保证线程安全,内部实现同样使用了ReentrantLock锁。以上结论都是基于在写并发程序的时候,尽可能的缩小锁的范围,提高代码的吞吐率为基准的。
创建多少线程合适,要看多线程具体的应用场景。我们的程序一般都是 CPU 计算和 I/O 操作交叉执行的,由于 I/O 设备的速度相对于 CPU 来说都很慢,所以大部分情况下,I/O 操作执行的时间相对于 CPU 计算来说都非常长,这种场景我们一般都称为 I/O 密集型计算;和 I/O 密集型计算相对的就是 CPU 密集型计算了,CPU 密集型计算大部分场景下都是纯 CPU 计算。I/O 密集型程序和 CPU 密集型程序,计算最佳线程数的方法是不同的。
对于 CPU 密集型计算,多线程本质上是提升多核 CPU 的利用率,所以对于一个 4 核的 CPU,每个核一个线程,理论上创建 4 个线程就可以了,再多创建线程也只是增加线程切换的成本。所以,对于 CPU 密集型的计算场景,理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。
对于 I/O 密集型的计算场景,如果 CPU 计算和 I/O 操作的耗时是1:1,那么 2 个线程是最合适的。如果 CPU 计算和 I/O 操作的耗时是 1:2,那多少个线程合适呢?是 3 个线程,如下图所示:CPU 在 A、B、C 三个线程之间切换,对于线程 A,当 CPU 从B、C 切换回来时,线程 A 正好执行完 I/O 操作。这样 CPU 和 I/O 设备的利用率都达到了100%。
通过上面这个例子,我们会发现,对于 I/O 密集型计算场景,最佳的线程数是与程序中 CPU 计算和 I/O 操作的耗时比相关的,我们可以总结出这样一个公式:我们令 R=I/O 耗时 / CPU 耗时,综合上图,可以这样理解:当线程 A 执行 IO 操作时,另外 R个线程正好执行完各自的 CPU 计算。这样 CPU 的利用率就达到了 100%。故线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]
根据前面的一些知识我们已经知道了在ThreadPoolExecutor源码中的工作线程类是Worker(实现了Runnable接口),它是对线程池中工作线程的抽象。那为什么存放工作线程的集合要使用HashSet呢?我们翻看源码就知道ThreadPoolExecutor中对worker集合(HashSet)只有add和remove操作。我们知道Java中HashSet基于HashMap来实现的,只要hash函数和负载因子factor设计的足够好(最大化减少hash碰撞的概率),这些操作对于HashSet来说时间复杂度均为O(1),而且线程数中对线程进入集合的顺序和优先级都没有要求,跟其他集合类相比,在空间复杂度一致的情况下,当然是时间复杂度最好的集合类优先考虑。
1、任务处理策略:线程池优先使用核心线程来处理任务,从任务的添加策略可以看出,先考虑创建核心线程处理,再考虑放到阻塞队列,然后考虑创建非核心线程处理,达到饱和状态时则使用任务拒绝策略
2、线程生命周期管理:通过向阻塞队列取任务的不同操作,能确保线程的存活,take()保证核心线程不死,poll()保证非核心线程存活等待一定时间后释放
3、容量管理:线程池不区分核心线程和非核心线程,线程池是期望达到corePoolSize的并发状态,并允许在不得已情况下超载,达到corePoolSize ~ maximumPoolSize 的并发状态
4、状态管理:线程池状态和线程数量用ctl表示,高3位为状态标识,低29位为当前线程池线程数量。线程池对状态的检测非常苛刻,几乎在所有稍微耗时或影响下一步操作正确性的代码前都校验ctl