池化技术应该是最常用的提高程序性能的手段,包括线程池与数据库连接池,常量池等等
创建与销毁线程是比较耗费时间的,不利于处理Java程序的高并发,因此引入线程池,也就是维护一组可用的线程,如果有任务,就立即将线程池的空闲线程分配给任务,提升性能,如果线程池内所有的线程都是忙状态的话,可以将任务放到任务队列,或者创建一个新的线程并放入线程池,用于处理新的任务
使用线程池的好处
降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。
为什么呢?
使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。(最原始粗放的服务器实现就是请求绑定一个套接字后就新开一个线程去处理,如果请求量巨大的时候,服务器是肯定要崩的,因为缺乏对线程资源的管理)
线程池监控的方法:
SpringBoot 中的 Actuator
组件
通过ThreadPoolExecutor
的自有接口获取线程池信息
线程池一般用于执行多个不相关联的耗时任务,没有多线程的情况下,任务顺序执行,使用了线程池的话可让多个不相关联的任务同时执行。
举个项目中实际使用的例子:
实际使用时要注意的一般规则
使用线程池,而不是创建单个线程
使用ThreadPoolExecutor
构造函数而不是Executors
工具类,下文有具体的解释
显式的定义线程池名字,以业务名字作区分,便于定位问题
可以使用自定义的ThreadFactory
import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; /** * 线程工厂,它设置线程名称,有利于我们定位问题。 */ public final class NamingThreadFactory implements ThreadFactory { private final AtomicInteger threadNum = new AtomicInteger(); private final ThreadFactory delegate; private final String name; /** * 创建一个带名字的线程池生产工厂 */ public NamingThreadFactory(ThreadFactory delegate, String name) { this.delegate = delegate; this.name = name; // TODO consider uniquifying this } @Override public Thread newThread(Runnable r) { Thread t = delegate.newThread(r); t.setName(name + " [#" + threadNum.incrementAndGet() + "]"); return t; } }
使用guava的ThreadFactoryBuilder
ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat(threadNamePrefix + "-%d") .setDaemon(true).build(); ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)
不同的业务使用不同的线程池
有依赖关系的任务在使用同一个线程池在稍高的并发状况下可能会出现一种逻辑上的死锁,大概来说就是父任务A中调用了子任务B,父任务与子任务共用一个线程池,当父任务占据了全部的核心线程资源,并且子任务仍未执行时,无法退出对核心线程的占用,而与此同时子任务只能堆积在任务队列中,无法获得线程资源,如果又使用了无界队列的话,则会一直堆积直到OOM,具体的参考线程池运用不当的一次线上事故
Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用Thread
的 start
方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免this
逃逸问题。
补充:this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用. 调用尚未构造完全的对象的方法可能引发令人疑惑的错误,如果用
volatile
修饰的话应该就能解决这个问题了,不知道Executor框架的出现是如何有助于解决此问题的呢?---不是很清楚
Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单
实际上在Executor框架中,还有一个线程池ForkJoinPool
可能用的不太多,此类继承AbstractExecutorService
,文章末尾会介绍到
除了说Executor框架,还有一种说法就是JUC框架
,也就是java.util.concurrent
这个包下的所有的多线程相关类的总称
向线程池提交任务有两种方法:
execute
方法
Runnable
的任务,不提供返回值,源码分析见下文(作为线程池的入口一定是要仔细分析的)submit
方法
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
ThreadPoolExecutor
没有实现自己的submit方法,而是沿用的父类AbstractExecutorService
的实现
接受Runnable
或Callable
的任务,并提供Future
类型返回值
submit
内部将传入的任务统一封装为RunnableFuture
类型,此类型实现了Runnable
与Future
接口,老缝合怪了~
不同之处就在于传入Runnable
的任务得到的Future
可能无法得到有效的返回值,而Callable
的任务能够得到返回结果
提交Runnable
任务时也可以指定一个返回结果,作为Future
的返回结果,但是这个结果显然并不是任务执行完成的返回值,而是程序员事先传入的值,其作用类似于是一个flag值
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
Runnable
任务会被转换为Callable
类型,如果有传入预期的返回值,call
函数中就会原封不动的返回,但是如果没有传入,就是返回null了submit
内部实际上仍然调用了execute
方法
此处补充Callable
与Runnable
的差异:
补充Future接口的作用
isDone
判断任务是否执行完get方法获得
执行结果
shutdown方法
关闭线程池,线程池的状态变为 SHUTDOWN
。线程池不再接受新任务了,但是队列里的任务得执行完毕。
执行shutdown
方法后,可以执行awaitTermination
方法,则会等待指定的时间让线程池关闭,若在指定时间内关闭则返回true,否则false
shutdown源码分析
public void shutdown() { final ReentrantLock mainLock = this.mainLock; // 上锁 mainLock.lock(); try { // 判断调用者是否有权限shutdown线程池 checkShutdownAccess(); // CAS 设置线程池状态为SHUTDOWN advanceRunState(SHUTDOWN); // 中断所有空闲线程 interruptIdleWorkers(); // 钩子函数 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { // 解锁 mainLock.unlock(); } // 尝试终止线程池 tryTerminate(); }
shutdownNow
方法 闭线程池,线程的状态变为 STOP
。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的任务列表。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; // 上锁 mainLock.lock(); try { // 判断调用者是否有权限shutdown线程池 checkShutdownAccess(); // CAS 设置线程池状态为STOP advanceRunState(STOP); // 中断所有线程 interruptWorkers(); // 从队列中获取剩余的未执行的工作列表 tasks = drainQueue(); } finally { mainLock.unlock(); } // 尝试终止线程池 tryTerminate(); // 返回未执行的任务列表 return tasks; }
interruptWorkers
的解析放到了后文中使用如下两个方法来判断线程池是否完全关闭
isTerminated
() 当调用 shutdown()
方法后,并且所有提交的任务完成后返回为 true,或者是执行shutdownNow
后,线程池内的线程全部被中断,工作线程数量为0后返回trueisShutdown()
当调用 shutdown()
方法后返回为 true。Runnable
或者 Callable
接口的任务对象。Runnable
/Callable
接口的 对象直接交给 execute
执行: ExecutorService.execute(Runnable command)
)或者也可以把 Runnable
对象或Callable
对象提交给 submit
执行(ExecutorService.submit(Runnable task)
或 ExecutorService.submit(Callable <T> task)
)。ExecutorService.submit(…)
,ExecutorService
将返回一个实现Future
接口的对象(刚刚也提到过了执行 execute()
方法和 submit()
方法的区别,submit()
会返回一个 FutureTask
对象FutureTask.get()
方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)
来取消此任务的执行。Executors
工具类,可以创建普通的线程池与可以执行定时任务的线程池,但是简单的创建方法意味着封装的程度高,就会导致自由度低,甚至有一些风险固定线程数量的线程池
该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } // 默认任务队列的长度是Integer.MAX_VALUE public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
maximumPoolSize
将是事实上的无效参数,因为不可能存在任务队列满的情况(可以将任务队列视作系统内最大,所以不用设置最大线程数,因为再多的任务也完全可以缓存在队列中)。所以,通过创建 FixedThreadPool
的源码可以看出创建的 FixedThreadPool
的 corePoolSize
和 maximumPoolSize
被设置为同一个值。
keepAliveTime
将是一个无效参数(因为不会有核心线程之外的其余线程)(当然,如果空闲核心线程被允许超时回收的话,就是有用的了,即是,如果空闲就会立即展开回收)corePoolSize
后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize;所以一旦corePoolSize设置不对的话,将会有大量任务干等着,并且性能也没有完全发挥仅有一个线程的线程池
可以视为是固定线程数量线程池的特值情况,即nThreads为1的情况
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { // 使用包装类包装过的,用来保证: return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
动态分配线程数量的线程池
该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);}
SynchronousQueue.offer(Runnable task)
提交任务到任务队列。如果当前线程池中有闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
,那么主线程执行 offer 操作与空闲线程执行的 poll
操作配对成功,主线程把任务交给空闲线程执行,execute()
方法执行完成。SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
。这种情况下,offer方法将返回false,此时 CachedThreadPool
会创建新线程执行任务,execute 方法执行完成;Integer.MAX_VALUE
,这意味着线程池可以不受控的一直接受任务,直到栈空间OOMpublic static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);}public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);}
虽然队列使用的是有界队列,但是最大线程数量是Integer.MAX_VALUE
,这意味着线程池可以不受控的一直接受任务,直到栈空间OOM
需要注意的是,尽管ScheduledExecutorService是内部调用了父类ThreadPoolExecutord的构造方法,但是其内部实现的核心入口方法不再是ThreadPoolExecutor的execute方法,而是ScheduledThreadPoolExecutor中的delayExecute方法
定时任务的实现依赖于延迟队列DelayedWorkQueue
可以发现执行定时任务可以使用springboot中的@Scheduled注解,也可以使用底层的定时任务线程池,实际上本线程池基本不会用,因为实现定时任务有其他的方案,比如springboot的注解与quartz等等
备注: Quartz 是一个由 java 编写的任务调度库,由 OpenSymphony 组织开源出来。在实际项目开发中使用 Quartz 的还是居多,比较推荐使用 Quartz。因为 Quartz 理论上能够同时对上万个任务进行调度,拥有丰富的功能特性,包括任务调度、任务持久化、可集群化、插件等等
一次性的延迟任务 schedule
方法
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; }
ScheduledThreadPoolExecutor
重写了execute
与submit
方法,两个方法内部实际上都是简单地调用schedule
方法来实现的以上一次任务开始为基准固定间隔循环执行任务 scheduleAtFixedRate
方法
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
以上一次任务结束为基准固定间隔循环执行任务 scheduleWithFixedDelay
方法
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
后两者的区别见图
引出两个问题:
上边说过了,定时任务线程池的核心入口就是上边三种类型的任务方法中都有的一个方法--就是delayedExecute
,但是在说这个关键的入口方法之前,不得说下,调用方法前对于提交的任务的包装,包装这一块设计到的类比较多,先用一张类图大致把握
首先包装为ScheduledFutureTask
// 用于包装schedule(Runnable)提交的任务// result为null,ns是纳秒为单位的,要触发执行任务的系统时间ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement();}// 包装scheduleWithFixedDelay和scheduleAtFixedRate提交的任务// result 为null// ns是纳秒为单位的,下一次要触发执行任务的系统时间// period是以纳秒为单位的任务循环周期ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement();}// 包装schedule(Callable)提交的任务// ns是纳秒为单位的,要触发执行任务的系统时间ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement();}// 关键的run方法public void run() { // 首先判断是不是周期性执行的任务 boolean periodic = isPeriodic(); // 判断当前的线程池能否执行定时任务,如果不能则取消任务 if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) // 如果不是周期性任务,也就是一次性的定时任务的话,直接执行提交的任务 ScheduledFutureTask.super.run(); // 如果是周期性执行的任务,首先执行提交的任务,并将任务的状态重置为初始化状态,以备下一次执行 else if (ScheduledFutureTask.super.runAndReset()) { // 执行完毕后计算下一次执行的时间 setNextRunTime(); // 重新提交当前的任务到延时队列中,用于下一个周期的执行 reExecutePeriodic(outerTask); }}// 计算下一次要执行任务的时间// time表示下一次执行任务的时间,period是用来计算time的周期时间private void setNextRunTime() { long p = period; if (p > 0) // scheduleAtFixedRate // 在第一次执行完任务后,下一次要执行的时间就是完全按照周期来执行,不管到底什么时候执行完的(也就是now),之后的每次执行都是如此 time += p; else // scheduleWithFixedDelay // 第一次执行完任务后,下一次要执行的时间是以当前时间为基准计算的,也就是上一次完成任务的时间为基准计算的,之后的每次执行都是如此 time = triggerTime(-p);}// 用于在延迟队列中按照下一次触发的顺序进行排序public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; // 触发时间一致的,按照提交的顺序来 else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}// 计算从当前时刻到下次执行任务还有多长时间public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS);}
scheduleWithFixedDelay
与scheduleAtFixedRate
在实现时的区别就在于此次包装过程中,前者传入的周期是unit.toNanos(-delay)
而后者是unit.toNanos(perioid)
setNextRunTime
方法中,详见方法注释period
之外还有一个区别就是outerTask
reExecutePeriodic
方法getDelay
方法最主要的应用就是在延时队列的take
poll
这两个获取任务的方法中,起到了控制获取任务的时间的作用
getDelay
方法来控制获取任务的时延--这两个特性是直观上的延迟任务线程池起作用的关键其次包装为RunnableScheduleFuture
protected <V> RunnableScheduledFuture<V> decorateTask( Runnable runnable, RunnableScheduledFuture<V> task) { return task;}protected <V> RunnableScheduledFuture<V> decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task) { return task;}
RunnableScheduledFuture
,但是没有看懂为什么要用这样的一个方法类型提升定时任务线程池的入口方法delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) { // 1. 判断线程池是不是shutdown状态,如果是执行拒绝策略 if (isShutdown()) reject(task); else { // 2. 首先就是向DelayedWorkQueue中添加任务 super.getQueue().add(task); // 3. 不管是一般的线程池还是执行定时任务的线程池,都会在向队列中添加完任务后执行re-check if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 4. 如果通过了recheck,执行此方法 // 确保线程池内有线程运行 ensurePrestart(); }}void ensurePrestart() { int wc = workerCountOf(ctl.get()); // 对于Executors创建的线程池来说,核心线程数量为0,所以会保证有非核心线程执行 if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false);}
SHUTDOWN
的话,直接向队列中添加任务,而没有直接让线程去执行任务的场景从addWorker
开始,后续的就是标准的线程池的线程管理与任务获取的流程了,也就是说定时任务线程池与一般线程池的主要区别在于任务调度部分,而连接任务管理与线程管理的通道--延时队列也需要大致了解下
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { // 任务调度时提交任务的方法就是add方法 public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { // 按照排序规则,选择合适的位置插入到队列中 siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; } // 按照排序规则,选择合适的位置插入到队列中 private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; // 按照RunnableScheduledFuture的time属性进行排序 if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); } // getTask中,核心线程取任务(无超时时间) // 如果当前不能获取,就阻塞等待 public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { // 调用getDelay方法得到需要延时等待的时间 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } } // getTask中,非核心线程取任务或则核心线程获取任务(允许超时回收) public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); if (nanos <= 0) return null; first = null; // don't retain ref while waiting if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } } }
DelayedWorkQueue
的内部存储是RunnableScheduledFuture
类型的数组在关闭线程池章节中,查看源码实际上会发现线程池有许多状态:
/** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indicating the effective number of threads * runState, indicating whether running, shutting down etc * * In order to pack them into one int, we limit workerCount to * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 * billion) otherwise representable. If this is ever an issue in * the future, the variable can be changed to be an AtomicLong, * and the shift/mask constants below adjusted. But until the need * arises, this code is a bit faster and simpler using an int. * * The workerCount is the number of workers that have been * permitted to start and not permitted to stop. The value may be * transiently different from the actual number of live threads, * for example when a ThreadFactory fails to create a thread when * asked, and when exiting threads are still performing * bookkeeping before terminating. The user-visible pool size is * reported as the current size of the workers set. * * The runState provides the main lifecycle control, taking on values: * * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed * * The numerical order among these values matters, to allow * ordered comparisons. The runState monotonically increases over * time, but need not hit each state. The transitions are: * * RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() * SHUTDOWN -> TIDYING * When both queue and pool are empty * STOP -> TIDYING * When pool is empty * TIDYING -> TERMINATED * When the terminated() hook method has completed * * Threads waiting in awaitTermination() will return when the * state reaches TERMINATED. * * Detecting the transition from SHUTDOWN to TIDYING is less * straightforward than you'd like because the queue may become * empty after non-empty and vice versa during SHUTDOWN state, but * we can only terminate if, after seeing that it is empty, we see * that workerCount is 0 (which sometimes entails a recheck -- see * below). */// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
将注释翻译下来就是对着几个线程池状态的具体描述:
其中五个状态:
RUNNING:接收新的任务,处理队列中的任务;
SHUTDOWN:不接收新的任务,但处理队列中的任务;
STOP:不接收新的任务,不处理队列中的任务,中断正在执行的任务;
TIDYING:所有任务都终止,有效线程数为0, 线程过度到TIDYING时会调用terminated钩子方法;
TERMINATED:terminated()方法执行完毕后进入该状态;
状态之间的转换:
RUNNING -> SHUTDOWN:调用shutdown方法;
(RUNNING or SHUTDOWN) -> STOP:调用shutdownNow方法;
SHUTDOWN -> TIDYING:当线程池和任务队列都为空(队列中没有未执行的任务了,并且所有线程都完成了工作处于赋闲状态);
STOP -> TIDYING:当线程池中工作线程数量为0(其实就是变为stop状态,对所有正在执行任务的线程执行中断,也不再处理队列中未处理的任务,一旦中断全部完成,所有工作线程数量就为0了,直接进入tidying状态,也不管队列中的任务了);
TIDYING -> TERMINATED:当terminated方法执行完毕;
状态转换示意图
线程池状态是由命名为ctl的AtomicIntegr的成员变量持有的(共32位),包含以下两个信息:
线程池状态-最高3位
线程池中线程数量-低29位
// 初始化线程池状态-RUNNING 0工作线程private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 设置位数 高3位与低29位分别表示线程池状态与线程池工作线程数量private static final int COUNT_BITS = Integer.SIZE - 3;// 线程的最大数量大概是5亿多private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS; // 111private static final int SHUTDOWN = 0 << COUNT_BITS; // 000private static final int STOP = 1 << COUNT_BITS; // 001private static final int TIDYING = 2 << COUNT_BITS; // 010private static final int TERMINATED = 3 << COUNT_BITS; // 011// Packing and unpacking ctl// 根据ctl获取线程池状态private static int runStateOf(int c) { return c & ~CAPACITY; }// 根据ctl获取线程池中工作线程数量private static int workerCountOf(int c) { return c & CAPACITY; }// 使用runstate与workercount组装ctl,初始状态下rs 为RUNNING wc为0private static int ctlOf(int rs, int wc) { return rs | wc; }private static boolean runStateLessThan(int c, int s) { return c < s;}private static boolean runStateAtLeast(int c, int s) { return c >= s;}// 注意大小比较关系private static boolean isRunning(int c) { return c < SHUTDOWN;}
生产者消费者模型
,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收ThreadPoolExecutor
构造方法(针对参数最多的学习)public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
ThreadPoolExecutor 3 个最重要的参数:
allowCoreThreadTimeOut
,如果设置为true,则在超过keepAliveTime
之后,空闲的核心线程也会被回收
corePoolSize
个线程,也是按需求来的,最先分配任务的corePoolSize
自动成为核心线程execute
方法提交的Runnable
任务,而不是其他的什么复杂的结构ThreadPoolExecutor其他常见参数:
getTask
函数中调用的阻塞队列的poll函数的超时设置饱和策略(对应的是任务拒绝模块)
定义:饱和就是当任务队列满了,并且线程池当前同时运行的线程数量已经达到设定的最大值时的状态,更准确地定义应该是任务拒绝策略,而不仅仅是饱和策略,因为线程池饱和的时候会执行拒绝,线程池状态不是running状态时,也要对新提交的任务执行拒绝策略
任务的拒绝是通过reject函数完成的, 默认提供4个拒绝策略,当然也可以实现自己的拒绝策略
final void reject(Runnable command) { handler.rejectedExecution(command, this);}
ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理,此policy也是使用Executors工具类创建线程池以及我们不指定饱和策略使用ThreadPoolExecutor构造函数时的默认的饱和策略
ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程(一般是主线程)中运行(run)被拒绝的任务,如果执行程序(线程池)已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能(因为Main线程去处理新提交的任务去了,就无法处理新的请求了)。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略
ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。(直接丢弃掉,甚至不会抛出异常)
ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求(所谓的抛弃最早的未处理的任务请求,就是抛弃下一个待处理的任务,处于头部的任务),丢弃后再次尝试提交新的任务
对于以上几种饱和策略的理解补充:
DiscardPolicy
策略)判断线程是不是shutdown状态,如果是会直接忽略新的任务任务队列(线程安全的阻塞队列)
BlockingQueue<Runnable>
,但是这个是一个接口类,真正可以使用的实现类有如下几种(具体的描述看自己总结的线程安全的容器这个文章):
Integr.MAX_VALUE
,使用工具类创建线程池时,newFixedThreadPool与SingleThreadExecutor都是使用的此类型的队列线程工厂
线程工厂
,默认情况下的线程池创建线程的过程都是其内部的DefaultThreadFactory
,但是如果要用自定义的方式创建线程,以实现对于线程池创建的线程的监控与控制的话,就需要用到这个线程工厂的参数任务调度是整个线程池的入口,是整个线程池的核心所在,而这个任务调度对应的实际上就是execute
方法
execute
作为任务调度方法的大致运作流程是根据线程池的运行状态,工作线程的数量与运行策略来决定新提交的任务的三种可能的去向:
线程池中最重要的方法一定是任务的提交执行方法,又由于submit内部实际调用了execute方法,所以直接查看ThreadPoolExecutor的execute方法
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 获取当前线程池内正在运行的线程数量private static int workerCountOf(int c) { return c & CAPACITY;}// 任务队列private final BlockingQueue<Runnable> workQueue;public void execute(Runnable command) { // 首先肯定是检查的任务的有效性,如果为null就要报空指针异常 if (command == null) throw new NullPointerException(); /* \* Proceed in 3 steps: * \* 1. If fewer than corePoolSize threads are running, try to \* start a new thread with the given command as its first \* task. The call to addWorker atomically checks runState and \* workerCount, and so prevents false alarms that would add \* threads when it shouldn't, by returning false. * \* 2. If a task can be successfully queued, then we still need \* to double-check whether we should have added a thread \* (because existing ones died since last checking) or that \* the pool shut down since entry into this method. So we \* recheck state and if necessary roll back the enqueuing if \* stopped, or start a new thread if there are none. * \* 3. If we cannot queue task, then we try to add a new \* thread. If it fails, we know we are shut down or saturated \* and so reject the task. */ // 检查完提交的任务的有效性之后就要执行如上英文注释的三个步骤的处理了 // 获得线程池的状态与当前运行的线程的数量的记录(ThreadPoolExecutor类中定义了五种线程池状态) // ctl更像是一个线程池的运行时上下文的状态维护变量 int c = ctl.get(); // 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // addWorker失败,重新获得线程池状态,以进行下一步判断 c = ctl.get(); } // 2.如果当前之行的线程数量大于等于 corePoolSize 或者addWorker失败(失败的原因可能是有效线程的数量已经大于corePoolSize,所以需要缓存任务)后就会走到这里 // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会将任务加入到任务队列中,并且判断是否能加入到任务队列中 if (isRunning(c) && workQueue.offer(command)) { // 成功将任务添加到任务队列中 // 再次检查线程池中的线程状态,并再次检查线程池中是否有可用的线程,因为自从上一次检查后,可能有线程已经完成了工作或者线程池已经shutdown了 int recheck = ctl.get(); // 如果线程池状态不是running状态,就要从任务队列中移除任务,相当于一次回滚,并执行构造函数中参数指定的饱和策略 if (! isRunning(recheck) && remove(command)) // 执行回滚 reject(command); // 如果当前线程池是running状态并且工作线程为0(之前运行的工作线程被回收了,corePoolSize也有可能被回收)就新创建一个线程,其中worker的初始任务为null // else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3. 任务队列已经满了,或者线程池已经不是running状态了 因此做最后的尝试,在创建一个新的线程试试,如果创建失败,表示线程池已经满了,因此执行饱和策略 else if (!addWorker(command, false)) reject(command);}
事实上,源代码的注释上就已经说的很清楚了。
reject
函数corePoolSize
说明还可以无脑添加新的线程并使其执行新的任务(这也说明corePoolSize数量的线程也是懒创建的,不是默认就自动维护这么多数量的线程)maximumPoolSize
如果小于,则创建新的线程并用来执行新的任务;如果有效线程数量已经大于maximumPoolSize
,只能去执行拒绝策略了线程池为了方便的掌握线程的状态与维护线程的周期,设计了工作线程对象Worker
,Worker起作用的最关键的就是实现了Runnable
的接口(使得Worker可以作为线程任务被执行,相当于将提交的任务做了包装)和继承了AQS
类(控制线程的中断,维护线程的生命周期)
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ final Thread thread; Runnable firstTask; // ...}
Worker中最重要的两个成员变量
thread,在Worker的构造函数中被创建,使用的是ThreadPoolExecutor创建时传入的threadFactory去执行线程的构建
firstTask,firstTask用来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况,执行完firstTask
后再去队列中取后续的任务;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建
firstTask
也是为null
的addWorker函数(任务调度时创建核心线程执行任务或者创建非核心线程)
// 全局锁,并发操作必备private final ReentrantLock mainLock = new ReentrantLock();// 跟踪线程池的最大大小,应该只有在持有全局锁mainLock的前提下才访问此属性private int largestPoolSize;// 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合private final HashSet<Worker> workers = new HashSet<>();//获取线程池状态private static int runStateOf(int c) { return c & ~CAPACITY; }//判断线程池的状态是否为 Runningprivate static boolean isRunning(int c) { return c < SHUTDOWN;}// 返回true表示创建并启动线程成功// firstaTask就是这个线程的初始任务// 第二个参数为true表示新的worker也就是工作线程是尝试加到corePool中还是maximumPool中private boolean addWorker(Runnable firstTask, boolean core) { // retry标志位常用于多循环嵌套的流程控制 retry: for (;;) { // 获取线程池状态 int c = ctl.get(); int rs = runStateOf(c); // 如果状态>= SHUTDOWN 表示线程池是正在关闭(SHUTDOWN)或者已经关闭(>SHUTDOWN)状态的处置方法: // 如果此时是shutdown状态,并且没有分配初始任务,并且任务队列不为空,则是允许创建新的worker的(此时新创建的worker用来在SHUTDOWN状态下,执行任务队列中剩余的任务),违反任一则是不允许的,比如线程池已经关闭(>SHUTDOWN)或者是SHUTDOWN状态,但是附加了自己的初始任务,是不允许的,只能执行队列中剩余的任务,或者队列已经为空了,不再需要新的worker了,也会创建失败 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 获取线程池中的线程数量 int wc = workerCountOf(c); // 判断当前线程池的数量与哪个值比较,如果已经达到最终的最大值CAPACITY,立即返回false // 否则根据参数判断与哪个值比较,是最小值还是最大值比较,如果目标是创建核心线程,就和corePoolSize比较,如果已经达到设计大小了,就创建失败,如果目标是创建非核心线程,就和maximumPoolSize比较 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 上一步判断后还可以添加worker,就使用CAS增加worker计数 if (compareAndIncrementWorkerCount(c)) // 跳出整个循环 break retry; // CAS 失败 c = ctl.get(); // 如果线程池状态发生了改变, if (runStateOf(c) != rs) // 从头开始执行整个外部的for循环,重新根据线程池状态进行判决 continue retry; // 省略的else表示的就是CAS失败的原因是线程数量被同步修改了,只需要重新执行内部的for循环,根据线程数量进行判决即可 } } // 线程数量成功更新, // 初始化工作线程启动成功标志 boolean workerStarted = false; // 初始化工作线程创建成功标志 boolean workerAdded = false; Worker w = null; try { // 创建worker实例 w = new Worker(firstTask); // 获得worker持有的线程实例 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 加锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 获取线程池状态 int rs = runStateOf(ctl.get()); // < shutdown也就是running状态下执行操作 // 或者是在SHUTDOWN状态下,并且firstTask为空时执行下述操作 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 判断线程是否已经启动了,若已经启动了,抛出异常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 将新创建的worker实例添加到worker集合中,是一个HashSet集合 workers.add(w); // 更新largestPoolSize int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 设置worker添加成功标志位 workerAdded = true; } } finally { // 释放锁 mainLock.unlock(); } if (workerAdded) { // 启动worker t.start(); // 设置worker启动成功标志位 workerStarted = true; } } } finally { if (! workerStarted) // worker启动失败,要执行回滚 // 从工作线程集合中移除新添加的Worker实例 // 线程池状态中线程池数量-1 // tryTerminate addWorkerFailed(w); } // 返回worker是否启动的状态 return workerStarted; }
addWorker
源码中,也能看见线程池状态与线程池数量共同决定流程走向的场景,这就是为什么要把这两个状态维护在一个变量中的原因workers
这个集合来维护线程不被回收,当需要回收时,只需要将其引用消除,也就是将Worker
对象消除即可,jvm
会完成后续的回收(详见线程回收小节)runWorker函数,worker开始执行任务
/** 如何addWorker中启动线程的语句t.start()转到了runWorker方法呢?*/// Worker的线程实例是在Worker构造函数中完成初始化的,注意,传入newThread的是this,也就是Worker实例本身被当做一个Runnable任务提交到了线程中,所以调用线程实例的start方法时,就会执行Runnable任务也就是Worker实例的run方法Worker(Runnable firstTask) { setState(-1); // 将AQS计数设置为-1,目的是为了在worker初始化导致runWorker被执行的期间内不被中断 this.firstTask = firstTask; // thread成员变量由默认的或者指定的线程工厂创建,传入的Runnable参数是Worker实例本身 this.thread = getThreadFactory().newThread(this);}// 新线程启动后执行此方法public void run() { runWorker(this);}// addWorker: // 创建worker实例w = new Worker(firstTask);final Thread t = w.thread;// 开启新线程后执行Runnbale参数的run方法,也就是Worker实例的run方法t.start() /** 上述代码是addWorker中的启动线程的代码* 下边的代码是runWorker中的代码*/ // 实际在新线程中执行的方法final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 首先保存初始任务。再清空初始任务 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // worker 允许中断,此时worker的状态是空闲状态,可以被回收(中断) // 初始化线程异常退出标志位 boolean completedAbruptly = true; try { // 执行初始化任务,或者while循环不断的阻塞以从任务队列获得新的任务,除非getTask返回null,表示已经无法获得任务,需要执行线程回收 while (task != null || (task = getTask()) != null) { // 成功的获取了任务 // 开始执行任务,不允许中断,此时线程是非空闲状态 w.lock(); // 执行recheck 如果线程池已经关闭了,并且当前线程还没有中断,就要执行对当前线程的中断,否则要保证当前线程不是中断状态 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 钩子函数,默认的钩子函数的函数体为空,可以去构造ThreadPoolExecutor的子类去复写此钩子函数 beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 钩子函数与beforeExecute同理 afterExecute(task, thrown); } } finally { task = null; // worker的完成的任务数量加1,注意此时是线程安全的 w.completedTasks++; // 释放锁 w.unlock(); } } // 线程不是因为异常退出的,而是因为无法获得任务导致退出的 completedAbruptly = false; } finally { // while循环已无法通过getTask获得新的任务了,具体的原因参考后续的getTask方法 // 执行线程回收 // 如果是因为task执行时出现异常,completedAbruptly为true,否则为false processWorkerExit(w, completedAbruptly); } }
addWorker
中启动线程的语句t.start()
转到了runWorker
方法呢?这一块比较绕,可以直接看代码的注释
Worker
类实现Runnable
接口的作用,就是将自己包装为可执行任务geTask
方法getTask方法,也就是从队列中获取任务的方法也是很重要的,主要功能是核心线程获取任务或保持阻塞,非核心线程获取任务,或超时返回null,进而线程生命周期结束
private Runnable getTask() { // 获取任务超时的标志位 boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); //线程池状态是STOP之后的状态,表示已经不处理任务了,或者是SHUTDOWN时,任务队列已经为空,想处理也没的处理了,就直接返回null,worker会被直接回收 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 工作线程数量-1 decrementWorkerCount(); return null; } int wc = workerCountOf(c); //是否超时控制,allowCoreThreadTimeOut默认false,代表不对核心线程做超时限制,对于超出核心线程的线程需要控制超时 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //当线程数大于最大线程数,即线程池已经满了,或者需要做超时控制且上次获取任务就已经超时这两个任一的条件下 //且线程数大于1或者队列为空,尝试将线程数减一并返回null if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; // 失败重试,重新根据线程池状态与线程池中线程数量做判断 continue; } try { //当需要超时控制时,在keepAliveTime时间内没有获取到任务的话会设置超时标志位,如果没有超时限制,则调用take获取任务,此时线程是阻塞等待获取任务的 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { // 阻塞等待获取任务时,整个worker并没有加锁,也就是被认为是空闲状态,可能会被回收掉 timedOut = false; } } }
这里需要补充的就是任务队列的poll与take方法虽然名称差异比较大,但是唯一的差异在于前者是加了超时时间,后者是阻塞
getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收
比如在下边这段代码中
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; // 失败重试,重新根据线程池状态与线程池中线程数量做判断 continue;}
decrementWorkerCount
与compareAndDecrementWorkerCount
二者的区别是什么
decrementWorkerCount
内部是在循环调用compareAndDecrementWorkerCount
,换句话说就是,必须要尝试将工作线程数量-1,因为确实不需要此线程了,而compareAndDecrementWorkerCount
直接拿来用,只是尝试一次将工作线程-1,如果失败的话,就要重新根据状态做出可能与之前不同的判断线程回收,processWorkerExit
实际上,关于线程回收,是有两种场景的:1. 主动的线程回收,比如processWorkerExit
函数这样的 2. 探查式的回收,或者说是被动的回收,比如interruptIdleWorkers
主动回收:在runWorker
函数中,如果无法再获得任务,就会跳出执行此线程回收函数,实际上线程池中线程的回收依赖的是JVM的自动回收,线程池要做的只是把线程的引用消除而已
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 一个标志位,是否是因为发生线程异常,所以进入的此方法 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted // 工作线程数-1 decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; // 加锁,因为要进行审计计数了 mainLock.lock(); try { // 统计此worker的完成的任务数目 completedTaskCount += w.completedTasks; // 从线程池中移除此线程 // 执行remove方法完毕后,实际上已经完成了线程的回收,但是由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程-----即所谓的线程状态自适应的过程 workers.remove(w); } finally { mainLock.unlock(); } // 尝试中断、回收空闲线程 tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { // 线程池状态是RUNNING或SHUTDOWN状态并且并非因为异常导致线程关闭的情况下 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; // 如果线程够用,就直接返回,否则还要添加一个worker到线程池 if (workerCountOf(c) >= min) return; // replacement not needed } // 如果因为线程异常导致的线程关闭的话,还需要再向线程池中补充一个worker // 或者是此时线程数量不能满足最小要求时也要再添加一个worker addWorker(null, false); } }
被动回收:上述代码中提到的tryTerminate方法,也就是在某worker结束生命周期后判断线程池是否要关闭以及回收空闲线程,以便有效的管理线程池的生命周期,在所有可能导致线程池终止的地方都调用了此方法
final void tryTerminate() { for (;;) { int c = ctl.get(); //当线程池状态是RUNNING(状态正常)或者已经TIDYING或者已经TERMINATED(线程已经快关闭了)或者SHUTDOWN且还有任务没有被执行(SHUTDOWN状态需要处理完队列中的任务),直接返回 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 当前线程池状态是STOP状态或是SHUTDOWN状态但任务列表为空时,如果线程数量不为0,需要最多终止1个空闲的线程,上边所述的stop状态或者shutdown状态并且queue为空统称为终止流程开始的状态 // 如果线程数不为0,则中断一个阻塞等待任务的空闲的工作线程 if (workerCountOf(c) != 0) { // 尝试中断最多一个阻塞等待任务的空闲的工作线程 interruptIdleWorkers(ONLY_ONE); return; } // 如果当前工作线程数量为0就准备关闭线程池 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 尝试设置线程池状态为tidying状态 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 如果设置成功调用此钩子方法 terminated(); } finally { // 钩子方法执行完毕后,设置状态为TERMINATED,并设置线程数量为0 ctl.set(ctlOf(TERMINATED, 0)); // 通知调用awaitTermination的主线程,已经进入了TERMINATION状态 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // CAS失败的话,就重新根据状态进行判断 } }
调用了tryTerminate
方法的地方有
addWorkerFailed
processWorkerExit
shutdown
shutdownNow
remove
从队列中移除某任务purge
从队列中移除所有被取消的任务在被动回收过程中,最重要的就是能了解线程的当前状态,在主动回收中尚且可以知道线程是需要回收的,但是被动回收时实际上并不清楚线程池中线程的状态,Worker通过继承AQS,使用AQS来实现不可重入的独占锁(使用AQS的独占模式)这个功能
interruptIdleWorkers,中断空闲线程,使其不再阻塞等待任务
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // 判断线程是否已经被中断,是的话就什么都不做 // 若未被中断,还要尝试获取worker的锁,此时如果worker如果已经通过lock方法获取了锁,则因为其不可重入的特性,导致此处为false,即对该worker不做任务处理 // 使用tryLock方法来判断线程池中的线程是否是空闲状态 if (!t.isInterrupted() && w.tryLock()) { try { // 执行线程中断 t.interrupt(); } catch (SecurityException ignore) { } finally { // worker释放锁 w.unlock(); } } // 如果未true,最多只会中断一个空闲线程,也可能一个线程也没有中断 // 如果为false,则会持续遍历全部的worker,并尝试中断所有的空闲的线程 if (onlyOne) break; } } finally { mainLock.unlock(); }}// shutdownNow函数中调用的中断所有工作线程的方法private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) // 粗暴的中断所有线程 w.interruptIfStarted(); } finally { mainLock.unlock(); }}// 定义在worker类中,粗暴的打断所有的已经执行过runWorker方法的workervoid interruptIfStarted() { Thread t; // getState() >= 0即state != -1,也就是不是刚初始化的Worker,而是已经运行runWorker的Worker // 直接在线程层面执行中断,而不管worker此时是否是正在运行的状态(不用去获取worker的锁) if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } }}// shutdown中调用了中断所有空闲线程的方法private void interruptIdleWorkers() { interruptIdleWorkers(false);}
只有在线程池终止流程开始状态下(线程池状态准备转入TIDYING状态,但是还有空闲线程的时候),传入的参数为true,其余调用都是false,也就是中断所有的空闲线程
为什么仅在tryTerminate
方法中,传入的参数为true,也就是最多中断一个空闲的线程呢?(解释的不是很清除,自己不是很懂.......)
当前线程池状态是STOP状态或是SHUTDOWN状态但任务列表为空时,如果线程数量还不为0,这说明,有可能是剩余的所有线程都是阻塞,而不能传递shutdown的指令,在线程池终止流程开始的状态下,必须最多使一个阻塞在等待获取任务的线程中断,才能传播shutdown信号,以免所有的线程陷入等待而无法关闭线程池
中断一个空闲线程,也能保证在线程池已经是SHUTDOWN
状态后,新来的Worker也能最终退出
综上,为了保证线程池未来最终能够终止,总是仅中断一个空闲的工作程序就足够了,但是shutdown
会中断所有空闲的工作程序,以便多余的工作程序迅速退出
参考interruptIdleWorkers
的注释
Interrupts threads that might be waiting for tasks (as indicated by not being locked) so they can check for termination or configuration changes. Ignores SecurityExceptions (in which case some threads may remain uninterrupted).
Params:
onlyOne – If true, interrupt at most one worker. This is called only from tryTerminate when termination is otherwise enabled but there are still other workers. In this case, at most one waiting worker is interrupted to propagate shutdown signals in case all threads are currently waiting. Interrupting any arbitrary thread ensures that newly arriving workers since shutdown began will also eventually exit. To guarantee eventual termination, it suffices to always interrupt only one idle worker, but shutdown() interrupts all idle workers so that redundant workers exit promptly, not waiting for a straggler task to finish.
从AQS的角度理解Worker的生命周期
Worker
使用的是AQS
的独占模式,使用独占的特性来判断Worker本身是空闲状态(未上锁)还是工作状态(上锁)
//1. worker初始化Worker(Runnable firstTask) { setState(-1); // 设置AQS计数标志为-1,其目的是为了防止初始化到runWorker执行这段时间内被中断 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this);}// 2. runWorker函数final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 至此worker是被以被中断的,也就是进入了空闲状态 w.unlock(); // ... w.lock();}// worker释放锁public void unlock() { release(1); }// 独占模式下释放资源public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}protected boolean tryRelease(int unused) { // 设置独占的线程为null setExclusiveOwnerThread(null); // 设置状态为0 setState(0); return true;}// worker上锁public void lock() { acquire(1); }public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}// worker的实现,其实根本没有用到参数--1 因为规定就是状态1为上锁的状态,所以直接用的常量1protected boolean tryAcquire(int unused) { // 尝试获得worker的锁,必须保证锁状态的旧状态是0,才能设置状态为1 if (compareAndSetState(0, 1)) { // 设置当前线程为独占线程 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false;}// interruptIdleWorkers函数执行时尝试中断空闲的线程,会通过尝试获取锁的方法来判断线程的状态// 在tryAcquire方法中尝试设置状态为1,但是状态的当前值应是0(即执行unlock()之后),才能设置成功// 这一点也保证了,在Worker初始化设置状态为-1到runWorker的状态设置为0时,是能够保证不被中断的public boolean tryLock() { return tryAcquire(1); }
线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。
很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了上下文切换成本。不清楚什么是上下文切换的话,可以看我下面的介绍。
上下文切换:
多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。
上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。
Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。
类比于实现世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。
如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。
有一个简单并且适用面比较广的公式:
如何判断是 CPU 密集任务还是 IO 密集任务?
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。单凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
Runnable vs Callable
Runnable
接口不会返回结果或抛出检查异常,但是Callable
接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable
接口Executors
可以实现 Runnable
对象和 Callable
对象之间的相互转换。(Executors.callable(Runnable task
)或 Executors.callable(Runnable task,Object resule)
)execute vs submit
execute()
方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;submit()
方法用于提交需要返回值的任务。线程池会返回一个 Future
类型的对象,通过这个 Future
对象可以判断任务是否执行成功 ,并且可以通过 Future
的 get()
方法来获取返回值,get()
方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)
方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完shutdown vs shutdownNow
shutdown()
:关闭线程池,线程池的状态变为 SHUTDOWN
。线程池不再接受新任务了,但是队列里的任务得执行完毕。shutdownNow()
:关闭线程池,线程的状态变为 STOP
。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 ListisTerminated vs isShutdown
isShutDown
当调用 shutdown()
方法后返回为 true
public boolean isShutdown() { return ! isRunning(ctl.get());}private static boolean isRunning(int c) { return c < SHUTDOWN;}
>= SHUTDOWN
的状态都返回trueisTerminated
当调用 shutdown()
方法后,并且所有提交的任务完成后返回为 true
TIDYING
状态之后才在tryTerminate
方法中进入TERMINATED
状态public boolean isTerminated() { return runStateAtLeast(ctl.get(), TERMINATED);}