首先我们要思考一下为什么要使用线程池。Java提供了多线程机制让我们能够同时运行多个任务,就像多个任务由多个人同时执行,而不是一个人依次执行这些任务。但是如果我们每次执行任务都创建一个线程,导致的问题有
因此Java提供了默认的线程池,帮助大家解决这些问题,通过ThreadPoolExecutor,我们可以实现多种线程创建回收策略,以适应不同的需求场景。
线程池可以使用的场景有
要正确使用线程池,我们需要理解其中的几个重要参数,通过ThreadPoolExecutor的构造函数可以看到参数如下
(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
了解了这些参数之后我们就可以创建一个线程池并使用了,通过下面的注释先对线程池的使用和机制有一个初步认识,后面会进行详细分析。
// 通过构造函数创建一个核心线程数为1,最大线程数为4,keepAliveTime为1分钟,任务队列是容量为10的数组阻塞队列 // 拒绝策略是CallerRunsPolicy,即会由调用线程来执行任务 ExecutorService executorService = new ThreadPoolExecutor(1, 4, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.CallerRunsPolicy()); // 通过submit接口提交一个Callable任务,返回一个字符串,这个任务会先sleep 1秒 Future<String> task1Result = executorService.submit(() -> { Thread.sleep(1000); return "hello"; }); // 再通过submit接口提交一个Callable任务,返回一个字符串,这个任务会先sleep 1秒 Future<String> task2Result = executorService.submit(() -> { Thread.sleep(1000); return "hello"; }); // 通过Future.get获取第一个任务的结果 System.out.println(task1Result.get()); // 通过Future.get获取第二个任务的结果 System.out.println(task2Result.get()); // 线程池使用完了,我们需要关闭,不然JVM不会退出,因为JVM退出的条件是当前没有非daemon状态的线程了 // 调用完shutdown之后再提交的线程会被reject,由拒绝策略处理。线程池会继续处理执行任务队列中的任务 executorService.shutdown(); // 等待线程池结束 if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) { // 如果执行时间内还没结束,调用shutdownNow情况任务队列 executorService.shutdownNow(); // 再等一分钟 executorService.awaitTermination(1, TimeUnit.MINUTES); }
了解了ThreadPoolExecutor的使用之后,我们通过一张图看一下线程池的内部大体架构
通过上图可以看到ThreadPoolExecutor比较重要的组件是workerPool(工作线程池)、workQueue(任务队列)、rejectionExecutionHandler(拒绝策略)。
整个ThreadPoolExecutor比较重要的部分有
先看一下线程池的状态,线程池一共有5个状态 每个状态的描述为
状态机如下
首先看一下ThreadPoolExecutor的execute的执行逻辑,流程图如下 核心逻辑为
更多细节可以通过代码中的注释查看。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 先读取control state int c = ctl.get(); // 如果当前的worker数量比corePoolSize核心线程数少 if (workerCountOf(c) < corePoolSize) { // 则尝试添加一个core worker,并且传入command作为firstTask执行 if (addWorker(command, true)) // 添加成功,则直接返回,没有添加成功,说明可能其他execute线程触发了addWorker并争抢成功或者 return; // 再重新判断下状态,这段时间内,线程池状态可能出现变化 c = ctl.get(); } // 这时候说明workerCount已经大于等于corePoolSize了,则需要添加到workQueue中,如果添加不了 // 则需要尝试增加非核心线程worker if (isRunning(c) && workQueue.offer(command)) { // 再检查下线程池是不是关闭了 int recheck = ctl.get(); // 如果已经在关闭,则重workQueue里删掉,并调用reject拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 如果现在线程池的worker数量为0了,说明核心线程回收了,则添加一个worker来执行,避免出现任务没有worker执行的情况 else if (workerCountOf(recheck) == 0) // 添加一个worker,core参数为false addWorker(null, false); } // 如果线程池关闭或队列已满,都会走到这里 // 1. 关闭的情况在addWorker的时候会失败,交由rejectHandler处理 // 2. 如果队列已满,则会尝试添加非核心线程worker,添加失败交由rejectHandler处理 else if (!addWorker(command, false)) reject(command); }
addWorker方法负责创建Worker对象。 首先看一下Worker类的内容。 Worker类需要区分当前是在等待获取任务还是在执行任务中,Worker通过一个不可重入的锁来实现的,先获取到锁才能执行任务。
这是为了把等待任务和执行任务的interrupt区分开。
为了防止worker处理的task中调用corePoolSize的时候会加锁后去interrupt各个worker,如果能重入,则也会把自己的线程中断状态设置成interrupted导致运行中的任务后面可能被中断。 只有在线程池处于STOP之后的状态,才能够interrupt在运行任务中的worker线程。
看一下Worker类的定义
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 负责执行任务的线程,如果ThreadFactory失败则会是null final Thread thread; // 通过execute方法创建时,可能会传入初始的任务 Runnable firstTask; Worker(Runnable firstTask) { // 防止被其他线程设置interrupt状态影响任务执行 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } // worker的执行逻辑,也是线程start之后调用的run方法 public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } // 实现AQS的tryAcquire来实现加锁功能。把state 从0cas到1说明加锁成功 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 实现AQS的tryRelease来实现释放锁功能,释放锁实现为把锁状态改为0 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } // 加锁方法 public void lock() { acquire(1); } // 尝试加锁方法 public boolean tryLock() { return tryAcquire(1); } // 释放锁方法 public void unlock() { release(1); } // 判断是否在加锁中 public boolean isLocked() { return isHeldExclusively(); } // 这个是给shutdownNow方法用的,可以在不获取锁的情况下interrupt线程 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
private boolean addWorker(Runnable firstTask, boolean core) { retry: // 不断循环重试 for (int c = ctl.get();;) { // SHUTDOWN状态时,如果任务队列已经为空了,则不需要新增worker,并且也不能创建Worker执行firstTask // 如果是STOP状态,则肯定返回false,不创建Worker if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; // 下面这个cas重试抢占添加worker的机会,区分创建核心还是非核心线程 for (;;) { if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) // 已经超过对应的线程的数量,直接返回 return false; // cas失败的重试 if (compareAndIncrementWorkerCount(c)) break retry; // 如果SHUTDOWN,退出到外层的循环重试 c = ctl.get(); // Re-read ctl if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 创建Worker对象 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加锁,works这个HashSet通过mainLock加锁实现线程安全 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); // 再次check线程池状态 if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException(); workers.add(w); workerAdded = true; int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { // 释放锁 mainLock.unlock(); } if (workerAdded) { // 启动worker中的线程,开始运行run方法,也就是runWorker t.start(); workerStarted = true; } } } finally { // 如果启动失败,比如出现OOM,回滚workerCount等 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
再看一下runWorker方法,里面包含的就是worker的任务获取、执行逻辑。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 为什么在这之前不能interrupt? w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 如果有firstTask,直接运行firstTask,否则通过getTask从任务队列中阻塞等待获取新任务,如果从队列中获取的是null说明被interrupt了,worker需要退出 while (task != null || (task = getTask()) != null) { // 执行任务之前,先加锁 w.lock(); // 如果现在在STOP状态,则任务需要interrupt // 如果不是,则可能是因为调整参数导致的interrupt需要调用Thread.interrupted方法清理掉中断状态,避免影响任务执行 // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 任务执行前的回调方法 beforeExecute(wt, task); try { // 传入的Runnable的run方法被执行。 task.run(); // 任务执行后的回调方法 afterExecute(task, null); } catch (Throwable ex) { // 任务执行后的回调方法 afterExecute(task, ex); throw ex; } } finally { task = null; w.completedTasks++; // 执行完当前任务后或任务异常退出后,释放锁 w.unlock(); } } // 如果执行到这里,说明是从while循环条件中退出的 completedAbruptly = false; } finally { // 调用processWorkerExit,如果是异常退出会导致worker线程挂掉,会重新创建一个新的worker代替当前worker processWorkerExit(w, completedAbruptly); } }
getTask方法负责从任务队列中不断获取任务,其中可以看到当线程能回收时,会使用keepAliveTime时间进行阻塞队列poll等待来实现的Worker线程超过一定idle空闲时间后回收功能。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); // 如果线程池处于SHUTDOWN状态,并且任务队列空了,或者处于STOP状态,则当前worker需要退出,因此会返回null if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // allowCoreThreadTimeOut为true说明核心线程也可以回收,否则只回收非核心线程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 从任务阻塞队列中poll任务,可以回收时加上等待时间,否则无限期等待。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // timedOut = true; } catch (InterruptedException retry) { // 线程池关闭或者调整线程池配置的时候会被interrupt, // 关闭的情况下次循环中会退出,调整配置则不会影响worker下次获取task timedOut = false; } } }
已有的task会继续执行,但是不会接受新的task
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 把线程池状态改为SHUTDOWN advanceRunState(SHUTDOWN); // 对各个idle worker也就是没有在执行任务的worker的线程调用interrupt方法 interruptIdleWorkers(); // shutdown 回调 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
shutdownNow方法,和shutdown的区别是会修改状态为STOP,并且把队列中的task drain出来
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改线程池状态为STOP advanceRunState(STOP); // interrupt所有的worker interruptWorkers(); // 情况队列 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
tryTerminate方法会尝试关闭线程池
final void tryTerminate() { for (;;) { int c = ctl.get(); // 如果是RUNNING状态,不需要关闭 if (isRunning(c) || // 如果是TIDYING,说明有其他线程在terminate,当前线程不需要处理,也return runStateAtLeast(c, TIDYING) || // 如果是SHUTDOWN状态,并且任务队列中还有任务,还需要等待任务执行完 (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) return; // 走到这里,说明是如下两种情况中的一种 // 1. SHUTDOWN状态并且队列已经为空 // 2. STOP状态 // 判断如果还有worker存在,则尝试interrupt if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } // 走到这里,说明worker和任务队列都空了,则需要修改状态为TIDYING并调用terminated回调方法。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // cas修改状态,保证terminated方法不会重复调用 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { // 修改成TERMINATED状态 ctl.set(ctlOf(TERMINATED, 0)); // awaitTerminate会await这个condition termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
队列类型 | 特性 |
---|---|
ArrayBlockingQueue | 基于数组的阻塞队列,有界队列 |
LinkedBlockingQueue | 基于链表的阻塞队列,和ArrayBlockingQueue功能上的区别在于可以创建一个无界队列,例如Executors.newFixedThreadPool(int)创建的线程池的队列就是无界的,这种情况下可能出现队列堆积导致OOM的问题 |
SynchronizedQueue | 同步阻塞队列,这个队列是一个没有长度的队列,可以保证任务最快被处理,减少在队列中的停留时间 |
PriorityBlockingQueue | 带有优先级的阻塞队列 |
DelayQueue | 延迟队列,ScheduledThreadPoolExecutor就是使用这个队列实现定时执行和延迟执行功能的 |
这里介绍一下常见的RejectedExecutionHandler
RejectedExecutionHandler | |
---|---|
AbortPolicy | 拒绝时抛出RejectedExecutionException异常,这是默认的拒绝策略 |
DiscardPolicy | 会忽略任务,提交时没有异常 |
DiscardOldestPolicy | 会从任务队列中移除最早的任务并重试提交当前任务 |
CallerRunsPolicy | 使用提交任务的线程也就是调用execute方法的线程去执行这个任务 |
当然我们也可以自定义自己的拒绝策略,例如实现一个阻塞提交线程的拒绝策略,这个和CallerRunsPolicy一样都能让提交者慢下来,但是不会用提交线程去执行任务。
class BlockSubmitRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { e.printStackTrace(); } } }
ThreadPoolExecutor还提供了修改corePoolSize和maximumPoolSize等参数的方法,使得我们可以动态调整线程池的参数。
public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); int delta = corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; if (workerCountOf(ctl.get()) > corePoolSize) interruptIdleWorkers(); else if (delta > 0) { // We don't really know how many new threads are "needed". // As a heuristic, prestart enough new workers (up to new // core size) to handle the current number of tasks in // queue, but stop if queue becomes empty while doing so. int k = Math.min(delta, workQueue.size()); while (k-- > 0 && addWorker(null, true)) { if (workQueue.isEmpty()) break; } } } public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); this.maximumPoolSize = maximumPoolSize; if (workerCountOf(ctl.get()) > maximumPoolSize) interruptIdleWorkers(); }
如何修改队列长度呢?我们可以实现一个可变长度的阻塞队列即可,通过在LinkedBlockingQueue基础上增加一个加锁修改capacity的队列比较容易实现,因为LinkedBlockingQueue中capacity只作为一个int字段存储没有像ArrayBlockingQueue那样会影响数组长度。所以我们加锁修改capacity后调用notFull.signalAll即可。