通过前文 线程的创建与使用 ,我们对线程有了一定了解。线程的创建与销毁需要依赖操作系统,其代价是比较高昂的,频繁地创建与销毁线程对系统性能影响较大。
出于线程管理的需要,线程池应运而生。线程池是一种多线程处理形式,处理过程中将任务提交到线程池,任务的执行交由线程池来管理。使用线程池的好处在于:
在Java中,线程池是由Executor框架实现的,Executor是最顶层的接口定义,其子类和实现类包括:ExecutorService,ScheduledExecutorService,ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool等。
类图如下:
Executors中提供了一系列静态方法创建线程池:
观察源码发现,这些静态方法其实还是调用了 ThreadPoolExecutor 这个类,下面是一部分源码:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
前面提到的java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,线程池的诸多功能都是在这个类中实现的,值得我们好好研究一番。
一开始,我们先用一个通俗的例子来帮助我们理解线程池的运行机制:
假如我们建了一家加工厂,那么第一个问题来了:工人编制规模是多少?(这个数字就对应线程池的 corePoolSize,即线程池核心线程数量)
接下来,假定工厂满编20个工人,那么第二个问题就是工人怎么招?(也就是线程池的线程初始化策略)根据老板的豪气程度无非有三种方式:
不管怎么样,工人总是要来的,那么工人的劳动合同该如何签呢?是终身劳动合同,厂子不倒,人员不散?还是铁打的营盘流水的兵?
接下来,工厂的生产线也建设起来了(在线程池里称之为 HashSet<Worker> workers),工人进入这条生产线进行生产。
完事俱备,工厂开始接受订单。为了更好地调度生产,一个调度员入职,英文名字execute。在拿到订单后,调度员execute按既定流程开始工作:
工厂当然不能稀里糊涂地一门心思生产,毕竟工厂业绩老板是很关心的,于是生产总量要被统计(即 completedTaskCount,线程池已完成的任务数)。工厂最多有过多少工人也被顺手统计了(即 largestPoolSize,线程池出现过的最大线程数)
当有一天,工厂因为某原因关闭时,会有两种情形:
根据上面的例子,提炼出线程池执行任务的流程图,当然这个流程图比较简略。
更多教程请访问码农之家在ThreadPoolExecutor类中提供了四个构造方法:
public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler); }
下面解释下一下构造器中各个参数的含义:
1. 天:TimeUnit.DAYS; 2. 小时:TimeUnit.HOURS; 3. 分钟:TimeUnit.MINUTES; 4. 秒:TimeUnit.SECONDS; 5. 毫秒:TimeUnit.MILLISECONDS; 6. 微秒:TimeUnit.MICROSECONDS; 7. 纳秒:TimeUnit.NANOSECONDS;
1. **ArrayBlockingQueue**:基于数组结构的有界阻塞队列,按FIFO排序任务; 2. **LinkedBlockingQuene**:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene; 3. **SynchronousQuene**:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene; 4. **priorityBlockingQuene**:具有优先级的无界阻塞队列;
1. **ThreadPoolExecutor.AbortPolicy**:丢弃任务并抛出RejectedExecutionException异常。 2. **ThreadPoolExecutor.DiscardPolicy**:也是丢弃任务,但是不抛出异常。 3. **ThreadPoolExecutor.DiscardOldestPolicy**:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) 4. **ThreadPoolExecutor.CallerRunsPolicy**:由调用线程处理该任务
ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系:
接下来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:
// 任务缓存队列,用来存放等待执行的任务 private final BlockingQueue<Runnable> workQueue; //线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁 private final ReentrantLock mainLock = new ReentrantLock(); //用来存放工作集 private final HashSet<Worker> workers = new HashSet<Worker>(); //线程存活时间 private volatile long keepAliveTime; //是否允许为核心线程设置存活时间 private volatile boolean allowCoreThreadTimeOut; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列) private volatile int corePoolSize; //线程池最大能容忍的线程数 private volatile int maximumPoolSize; //线程池中当前的线程数 private volatile int poolSize; //任务拒绝策略 private volatile RejectedExecutionHandler handler; //线程工厂,用来创建线程 private volatile ThreadFactory threadFactory; //用来记录线程池中曾经出现过的最大线程数 private int largestPoolSize; //用来记录已经执行完毕的任务个数 private long completedTaskCount;
在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
runState表示当前线程池的状态,它是一个 volatile 变量用来保证线程之间的可见性。
下面的几个static final变量表示runState可能的几个取值,有以下几个状态:
诸多函数的调用关系图:
1. 成功调用了addWorker()(剩下的执行任务要交给后续方法去完成了) 2. 未能调用addWorker并拒绝本次任务,返回null。参数说明:
1. Runnable command:待执行的任务执行流程:
1. 通过 ctl.get() 得到线程池的当前线程数,如果线程数小于corePoolSize,则调用 **addWorker(commond,true)** 方法创建新的线程执行任务,否则执行步骤2; 2. 步骤1失败,说明已经无法再创建新线程,那么考虑将任务放入阻塞队列,等待执行完任务的线程来处理。基于此,判断线程池是否处于Running状态(只有Running状态的线程池可以接受新任务),如果任务添加到任务队列成功则进入步骤3,失败则进入步骤4; 3. 来到这一步需要说明任务已经加入任务队列,这时要二次校验线程池的状态,会有以下情形: - 线程池不再是Running状态了,需要将任务从任务队列中移除,如果移除成功则**拒绝本次任务**。 - 线程池是Running状态,则判断线程池工作线程是否为0,是则调用 **addWorker(commond,true)** 添加一个没有初始任务的线程(这个线程将去获取已经加入任务队列的本次任务并执行),否则进入步骤4; - 线程池不是Running状态,但从任务队列移除任务失败(可能已被某线程获取?),进入步骤4; 4. 将线程池扩容至maximumPoolSize并调用 **addWorker(commond,false)** 方法创建新的线程执行任务,失败则**拒绝本次任务**。流程图:
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * 在将来的某个时候执行给定的任务。任务可以在新线程中执行,也可以在现有的池线程中执行。 * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * 如果由于此执行器已关闭或已达到其容量而无法提交任务以供执行,则由当前的{@code RejectedExecutionHandler}处理该任务。 * @param command the task to execute 待执行的任务命令 * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * 如果运行的线程少于corePoolSize,请尝试以给定的命令作为第一个任务启动新线程。 * 对addWorker的调用以原子方式检查runState和workerCount, * 因此可以通过返回false来防止在不应该添加线程时出现错误警报。 * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * 如果一个任务可以成功排队,那么我们仍然需要仔细检查两点,其一,我们是否应该添加一个线程 * (因为自从上次检查至今,一些存在的线程已经死亡),其二,线程池状态此时已改变成非运行态。因此,我们重新检查状态,如果检查不通过,则移除已经入列的任务,如果检查通过且线程池线程数为0,则启动新线程。 * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. * 3. 如果无法将任务加入任务队列,则将线程池扩容到极限容量并尝试创建一个新线程, * 如果失败则拒绝任务。 */ int c = ctl.get(); // 步骤1:判断线程池当前线程数是否小于线程池大小 if (workerCountOf(c) < corePoolSize) { // 增加一个工作线程并添加任务,成功则返回,否则进行步骤2 // true代表使用coreSize作为边界约束,否则使用maximumPoolSize if (addWorker(command, true)) return; c = ctl.get(); } // 步骤2:不满足workerCountOf(c) < corePoolSize或addWorker失败,进入步骤2 // 校验线程池是否是Running状态且任务是否成功放入workQueue(阻塞队列) if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次校验,如果线程池非Running且从任务队列中移除任务成功,则拒绝该任务 if (! isRunning(recheck) && remove(command)) reject(command); // 如果线程池工作线程数量为0,则新建一个空任务的线程 else if (workerCountOf(recheck) == 0) // 如果线程池不是Running状态,是加入不进去的 addWorker(null, false); } // 步骤3:如果线程池不是Running状态或任务入列失败,尝试扩容maxPoolSize后再次addWorker,失败则拒绝任务 else if (!addWorker(command, false)) reject(command); }
1. Runnable firstTask:新创建的线程应该首先运行的任务(如果没有,则为空)。 2. boolean core:该参数决定了线程池容量的约束条件,即当前线程数量以何值为极限值。参数为 true 则使用corePollSize 作为约束值,否则使用maximumPoolSize。执行流程:
1. 外层循环判断线程池的状态是否可以新增工作线程。这层校验基于下面两个原则: - 线程池为Running状态时,既可以接受新任务也可以处理任务 - 线程池为关闭状态时只能新增空任务的工作线程(worker)处理任务队列(workQueue)中的任务不能接受新任务 2. 内层循环向线程池添加工作线程并返回是否添加成功的结果。 - 首先校验线程数是否已经超限制,是则返回false,否则进入下一步 - 通过CAS使工作线程数+1,成功则进入步骤3,失败则再次校验线程池是否是运行状态,是则继续内层循环,不是则返回外层循环 3. 核心线程数量+1成功的后续操作:添加到工作线程集合,并启动工作线程 - 首先获取锁之后,再次校验线程池状态(具体校验规则见代码注解),通过则进入下一步,未通过则添加线程失败 - 线程池状态校验通过后,再检查线程是否已经启动,是则抛出异常,否则尝试将线程加入线程池 - 检查线程是否启动成功,成功则返回true,失败则进入 addWorkerFailed 方法流程图:
private boolean addWorker(Runnable firstTask, boolean core) { // 外层循环:判断线程池状态 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); /** * 1.线程池为非Running状态(Running状态则既可以新增核心线程也可以接受任务) * 2.线程为shutdown状态且firstTask为空且队列不为空 * 3.满足条件1且条件2不满足,则返回false * 4.条件2解读:线程池为shutdown状态时且任务队列不为空时,可以新增空任务的线程来处理队列中的任务 */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 内层循环:线程池添加核心线程并返回是否添加成功的结果 for (;;) { int wc = workerCountOf(c); // 校验线程池已有线程数量是否超限: // 1.线程池最大上限CAPACITY // 2.corePoolSize或maximumPoolSize(取决于入参core) if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 通过CAS操作使工作线程数+1,跳出外层循环 if (compareAndIncrementWorkerCount(c)) break retry; // 线程+1失败,重读ctl c = ctl.get(); // Re-read ctl // 如果此时线程池状态不再是running,则重新进行外层循环 if (runStateOf(c) != rs) continue retry; // 其他 CAS 失败是因为工作线程数量改变了,继续内层循环尝试CAS对线程数+1 // else CAS failed due to workerCount change; retry inner loop } } /** * 核心线程数量+1成功的后续操作:添加到工作线程集合,并启动工作线程 */ boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 下面代码需要加锁:线程池主锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 持锁期间重新检查,线程工厂创建线程失败或获取锁之前关闭的情况发生时,退出 int c = ctl.get(); int rs = runStateOf(c); // 再次检验线程池是否是running状态或线程池shutdown但线程任务为空 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 线程已经启动,则抛出非法线程状态异常 // 为什么会存在这种状态呢?未解决 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //加入线程池 int s = workers.size(); // 如果当前工作线程数超过线程池曾经出现过的最大线程数,刷新后者值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); // 释放锁 } if (workerAdded) { // 工作线程添加成功,启动该线程 t.start(); workerStarted = true; } } } finally { //线程启动失败,则进入addWorkerFailed if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker类是内部类,既实现了Runnable,又继承了AbstractQueuedSynchronizer(以下简称AQS),所以其既是一个可执行的任务,又可以达到锁的效果。
Worker类主要维护正在运行任务的线程的中断控制状态,以及其他次要的记录。这个类适时地继承了AbstractQueuedSynchronizer类,以简化获取和释放锁(该锁作用于每个任务执行代码)的过程。这样可以防止去中断正在运行中的任务,只会中断在等待从任务队列中获取任务的线程。
我们实现了一个简单的不可重入互斥锁,而不是使用可重入锁,因为我们不希望工作任务在调用setCorePoolSize之类的池控制方法时能够重新获取锁。另外,为了在线程真正开始运行任务之前禁止中断,我们将锁状态初始化为负值,并在启动时清除它(在runWorker中)。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ // 通过构造函数初始化, Worker(Runnable firstTask) { //设置AQS的同步状态 // state:锁状态,-1为初始值,0为unlock状态,1为lock状态 setState(-1); // inhibit interrupts until runWorker 在调用runWorker前,禁止中断 this.firstTask = firstTask; // 线程工厂创建一个线程 this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); //runWorker()是ThreadPoolExecutor的方法 } // Lock methods // The value 0 represents the unlocked state. 0代表“没被锁定”状态 // The value 1 represents the locked state. 1代表“锁定”状态 protected boolean isHeldExclusively() { return getState() != 0; } /** * 尝试获取锁的方法 * 重写AQS的tryAcquire(),AQS本来就是让子类来实现的 */ protected boolean tryAcquire(int unused) { // 判断原值为0,且重置为1,所以state为-1时,锁无法获取。 // 每次都是0->1,保证了锁的不可重入性 if (compareAndSetState(0, 1)) { // 设置exclusiveOwnerThread=当前线程 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /** * 尝试释放锁 * 不是state-1,而是置为0 */ protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } /** * 中断(如果运行) * shutdownNow时会循环对worker线程执行 * 且不需要获取worker锁,即使在worker运行时也可以中断 */ void interruptIfStarted() { Thread t; //如果state>=0、t!=null、且t没有被中断 //new Worker()时state==-1,说明不能中断 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
1. Worker w:封装的Worker,携带了工作线程的诸多要素,包括 **Runnable**(待处理任务)、lock(锁)、completedTasks(记录线程池已完成任务数)执行流程:
1. 判断当前任务或者从任务队列中获取的任务是否不为空,都为空则进入步骤2,否则进入步骤3 2. 任务为空,则将completedAbruptly置为false(即线程不是突然终止),并执行processWorkerExit(w,completedAbruptly) 方法进入线程退出程序 3. 任务不为空,则进入循环,并加锁 4. 判断是否为线程添加中断标识,以下两个条件满足其一则添加中断标识:线程池状态>=STOP,即STOP或TERMINATED一开始判断线程池状态<STOP,接下来检查发现Thread.interrupted()为true,即线程已经被中断,再次检查线程池状态是否>=STOP(以消除该瞬间shutdown方法生效,使线程池处于STOP或TERMINATED)执行前置方法 beforeExecute(wt, task)(该方法为空方法,由子类实现)后执行task.run() 方法执行任务(执行不成功抛出相应异常)执行后置方法 afterExecute(task, thrown)(该方法为空方法,由子类实现)后将线程池已完成的任务数+1,并释放锁。再次进行循环条件判断。流程图:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // allow interrupts // new Worker()是state==-1,此处是调用Worker类的tryRelease()方法,将state置为0,而interruptIfStarted()中只有state>=0才允许调用中断 w.unlock(); // 线程退出的原因,true是任务导致,false是线程正常退出 boolean completedAbruptly = true; try { // 当前任务和从任务队列中获取的任务都为空,方停止循环 while (task != null || (task = getTask()) != null) { //上锁可以防止在shutdown()时终止正在运行的worker,而不是应对并发 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt /** * 判断1:确保只有在线程处于stop状态且wt未中断时,wt才会被设置中断标识 * 条件1:线程池状态>=STOP,即STOP或TERMINATED * 条件2:一开始判断线程池状态<STOP,接下来检查发现Thread.interrupted()为true,即线程已经被中断,再次检查线程池状态是否>=STOP(以消除该瞬间shutdown方法生效,使线程池处于STOP或TERMINATED), * 条件1与条件2任意满意一个,且wt不是中断状态,则中断wt,否则进入下一步 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); //当前线程调用interrupt()中断 try { //执行前(空方法,由子类重写实现) beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //执行后(空方法,由子类重写实现) afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; //完成任务数+1 w.unlock(); //释放锁 } } // completedAbruptly = false; } finally { //处理worker的退出 processWorkerExit(w, completedAbruptly); } }
private Runnable getTask() { // 最新一次poll是否超时 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * 条件1:线程池状态SHUTDOWN、STOP、TERMINATED状态 * 条件2:线程池STOP、TERMINATED状态或workQueue为空 * 条件1与条件2同时为true,则workerCount-1,并且返回null * 注:条件2是考虑到SHUTDOWN状态的线程池不会接受任务,但仍会处理任务 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? /** * 下列两个条件满足任意一个,则给当前正在尝试获取任务的工作线程设置阻塞时间限制(超时会被销毁?不太确定这点),否则线程可以一直保持活跃状态 * 1.allowCoreThreadTimeOut:当前线程是否以keepAliveTime为超时时限等待任务 * 2.当前线程数量已经超越了核心线程数 */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 两个条件全部为true,则通过CAS使工作线程数-1,即剔除工作线程 // 条件1:工作线程数大于maximumPoolSize,或(工作线程阻塞时间受限且上次在任务队列拉取任务超时) // 条件2:wc > 1或任务队列为空 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 移除工作线程,成功则返回null,不成功则进入下轮循环 if (compareAndDecrementWorkerCount(c)) return null; continue; } // 执行到这里,说明已经经过前面重重校验,开始真正获取task了 try { // 如果工作线程阻塞时间受限,则使用poll(),否则使用take() // poll()设定阻塞时间,而take()无时间限制,直到拿到结果为止 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // r不为空,则返回该Runnable if (r != null) return r; // 没能获取到Runable,则将最近获取任务是否超时设置为true timedOut = true; } catch (InterruptedException retry) { // 响应中断,进入下一次循环前将最近获取任务超时状态置为false timedOut = false; } } }
1. Worker w:要结束的工作线程。 2. boolean completedAbruptly: 是否突然完成(异常导致),如果工作线程因为用户异常死亡,则completedAbruptly参数为 true。执行流程:
1. 如果 completedAbruptly 为 true,即工作线程因为异常突然死亡,则执行工作线程-1操作。 2. 主线程获取锁后,线程池已经完成的任务数追加 w(当前工作线程) 完成的任务数,并从worker的set集合中移除当前worker。 3. 根据线程池状态进行判断是否执行tryTerminate()结束线程池。 4. 是否需要增加工作线程,如果线程池还没有完全终止,仍需要保持一定数量的线程。 1. 如果当前线程是突然终止的,调用addWorker()创建工作线程 2. 当前线程不是突然终止,但当前工作线程数量小于线程池需要维护的线程数量,则创建工作线程。需要维护的线程数量为corePoolSize(取决于成员变量 allowCoreThreadTimeOut是否为 false)或1。源码详读:
/** * Performs cleanup and bookkeeping for a dying worker. Called * only from worker threads. Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit. This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers. * * @param w the worker * @param completedAbruptly if the worker died due to user exception */ private void processWorkerExit(Worker w, boolean completedAbruptly) { /** * 1.工作线程-1操作 * 1)如果completedAbruptly 为true,说明工作线程发生异常,那么将正在工作的线程数量-1 * 2)如果completedAbruptly 为false,说明工作线程无任务可以执行,由getTask()执行worker-1操作 */ if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); // 2.从线程set集合中移除工作线程,该过程需要加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 将该worker已完成的任务数追加到线程池已完成的任务数 completedTaskCount += w.completedTasks; // HashSet<Worker>中移除该worker workers.remove(w); } finally { mainLock.unlock(); } // 3.根据线程池状态进行判断是否结束线程池 tryTerminate(); /** * 4.是否需要增加工作线程 * 线程池状态是running 或 shutdown * 如果当前线程是突然终止的,addWorker() * 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker() * 故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程 */ int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); //注意传进去的参数是null } public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null))//注意传进去的参数是null ++n; return n; }
注意上面传进去的参数是null,根据第2小节的分析可知如果传进去的参数为null,则最后执行线程会阻塞在getTask方法中的
r = workQueue.take();
即等待任务队列中有任务。
在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。workQueue的类型为BlockingQueue,通常可以取下面三种类型:
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略: