int rs = runStateOf(c); // Check if queue empty only if necessary. // 线程池状态 >= SHUTDOWN if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 内层自旋 int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 工作中的线程数大于线程池的容量,或者已经大于等于核心线程数,或者大于等于最大线程数 // core为true,表示要创建核心线程,false表示要创建非核心线程 // 为什么大于等核心线程数的时候要返回false,因为要添加到缓冲队列,或者创建非核心线程来执行,不能创建核心线程了 return false; if (compareAndIncrementWorkerCount(c)) // 以CAS的方式尝试把线程数加1 // 注意这里只是把线程池中的线程数加1,并没有在线程池中真正的创建线程 // 成功后跳出内层自旋 break retry; // CAS失败,再次获取ctl,检查线程池状态 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 线程池状态被改变了,从外层自旋开始再次执行之前的逻辑 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 可以看到两层自旋 + CAS,仅仅是为了把线程池中的线程数加1,还没有新建线程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 把task包装成Worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 加锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 获取锁之后,再次检查线程池的状态 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable // 检查线程状态 throw new IllegalThreadStateException(); // 添加到worders workers.add(w); int s = workers.size(); if (s > largestPoolSize) // 维护largestPoolSize变量 largestPoolSize = s; workerAdded = true; } } finally { // 解锁 mainLock.unlock(); } if (workerAdded) { // 添加成功 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 执行worker的线程启动失败 addWorkerFailed(w); } return workerStarted;
}
可以看到`addWorker`方法前一部分,用了外层**自旋**判断线程池的状态,内层**自旋 + CAS**给线程池中的线程数加1。后半部分用了`ReentrantLock`保证创建`Worker`对象,以及启动线程的线程安全。一个方法中三次获取了线程池的状态(不包含该方法调用的其他方法),因为每两次之间,线程池的状态都有可能被改变。 ### runWorker 前文在介绍`Worker`内部类时说过,`Worker`会把自己传递给`ThreadFactory`创建的线程执行,最终执行`Worker`的`run`方法,而`Worker`类的`run`方法只有一行代码:
runWorker(this);
所以接下来看看`runWorker`方法是如何实现了
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 允许外部中断
w.unlock(); // allow interrupts
// 记录worker是不是异常退出的
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// 自旋,如果task不为空,或者能从缓冲队列(阻塞队列)中获取任务就继续执行,不能就一直阻塞
// 加锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 如果线程池正在停止,并且当前线程没有被中断,就中断当前线程
wt.interrupt();
try {
// 钩子函数,处理task执行前的逻辑
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子函数,处理task执行后的逻辑
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成的任务数加1
w.completedTasks++;
// 解锁
w.unlock();
}
}
// 运行到这里,说明worker没有异常退出
completedAbruptly = false;
} finally {
// 自旋操作被打断了,说明线程需要被回收
processWorkerExit(w, completedAbruptly);
}
}
第10行代码中,task为null时,会通过`getTask()`方法从缓冲队列中取任务,因为缓冲队列是阻塞队列,所以如果获取不到任务会一直被阻塞,接下来看看`getTask`方法的内部实现 ### getTask `getTask`用于**阻塞**式的从缓冲队列中获取任务。
private Runnable getTask() {
// 线程是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) { // 自旋 // 获取线程池状态 int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 线程池终止了,或者线程池停止了,且缓冲队列中没有任务了 // 自旋 + CAS方式减少线程计数 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 根据allowCoreThreadTimeOut参数来判断,要不要给核心线程设置等待超时时间 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 当前线程数大于了maximumPoolSize(因为maximumPoolSize可以动态修改)或者当前线程设置了超时时间且已经超时了 // 且线程数大于1或者缓冲队列为空 // 这个条件的意思就是:当前线程需要被回收 if (compareAndDecrementWorkerCount(c)) // 返回null后,上层runWorker方法中断循环,执行processWorkerExit方法回收线程 return null; continue; } try { // 从阻塞队列中获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) // 成功获取任务 return r; // 没有获取到任务,超时 timedOut = true; } catch (InterruptedException retry) { // 线程被中断,重试 timedOut = false; } }
}
理解该方法的前提,是要理解**阻塞队列**提供的阻塞式API。 这个方法重点关注两点: * 从缓冲队列取任务时,`poll`非阻塞,`take`阻塞,调用哪个由当前线程需不需要被回收来决定 * 该方法返回null之后,上层方法会回收**当前线程** 除了这几个核心方法之外,往线程池提交任务还有一个方法叫`submit`
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public Future submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
`submit`方法可以接收线程池返回的结果,也就是`Futrue`对象,可以接收`Runnable`对象和`Callable`对象。 至于`Future`、`FutureTask`、`Runnable`、`Callable`之间的关系此处不再赘述。 至此`ThreadPoolExecutor`的核心方法的源码以及执行逻辑已经讲解完毕,再来看一些非核心方法,了解一下即可 * `public void shutdown()`:关闭线程池,已经提交过的任务还会执行(线程池中未运行完毕的,缓冲队列中的) * `public List<Runnable> shutdownNow()`:停止线程池,试图停止正在执行的任务,暂停缓冲队列中的任务,并且返回 * `public void allowCoreThreadTimeOut(boolean value)`:设置**核心线程**是否允许回收 * `protected void beforeExecute(Thread t, Runnable r)`:钩子函数,处理线程执行任务前的逻辑,这里是**空实现** * `protected void afterExecute(Runnable r, Throwable t)`:钩子函数,处理线程执行任务后的逻辑,这里是**空实现** * `public int getActiveCount()`:返回正在执行任务的线程的**大致数量** * `public long getCompletedTaskCount()`:返回执行完成的任务的**大致数量** 除此之外还需要了解的是,构造方法中的七个参数,除了`BlockingQueue`是不能动态设置外,其余六个参数都可以动态设置,分别调用对于的`setXxx`方法即可,当然也可以通过对于的`getXxx`方法获取对应的信息。 鉴于此,我们再来看一个常见的问题 > Java有几种线程池? JDK(准确的说是`java.util.concurrent.Executors`工具类)提供了四种线程池: * `CachedThreadPool`:缓冲线程池 ``` public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } ``` * `FixedThreadPool`:固定线程数的线程池 ``` public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } ``` * `SingleThreadExecutor`:单线程的线程池 ``` public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } } ``` * `ScheduledThreadPool`:可定时调度的线程池 ``` public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } // ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以super()还是调用ThreadPoolExecutor的构造方法 public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } } ``` 仔细看下这四种线程池,最终都调用了ThreadPoolExecutor的构造方法,只是传递的参数有所不同。 * `CachedThreadPool`和`ScheculedThreadPool`设置的最大线程数都是`Integer.MAX_VALUE`,可能线程数过多而产生OOM * `SingleThreadExecutor`和`FixedThreadPool`使用的都是无界队列,最大元素个数为`Integer.MAX_VALUE`,可能缓冲队列中堆积的任务过多,而产生OOM 这两点正是**阿里巴巴代码规范**里禁止使用这四种线程池的原因。 想要使用线程池,必须通过`ThreadPoolExecutor`的方法来**创建线程池**。