上一篇文章我们有介绍过线程池的一个基本执行流程《【Java并发编程】面试必备之线程池》以及它的7个核心参数,以及每个参数的作用、以及如何去使用线程池 还留了几个小问题。。建议看这篇文章之前可先看下前面那篇文章。这篇文章我们就来分析下上篇文章的几个小问题
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 线程池当前线程数小于 corePoolSize 时进入if条件调用 addWorker 创建核心线程来执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 线程池当前线程数大于或等于 corePoolSize ,就将任务添加到 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { // 获取到当前线程的状态,赋值给 recheck ,是为了重新检查状态 int recheck = ctl.get(); // 如果 isRunning 返回 false ,那就 remove 掉这个任务,然后执行拒绝策略,也就是回滚重新排队 if (! isRunning(recheck) && remove(command)) reject(command); // 线程池处于 running 状态,但是没有线程,那就创建线程执行任务 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果任务放入 workQueue 失败,则尝试通过创建非核心线程来执行任务 // 创建非核心线程失败,则说明线程池已经关闭或者已经饱和,会执行拒绝策略 else if (!addWorker(command, false)) reject(command); }
「excute」方法主要业务逻辑
上述方法的核心主要就是addWorker方法,
private boolean addWorker(Runnable firstTask, boolean core) { // 前面还有一部分就省略了。。。。 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
这个方法我们先看看这个「work」类吧
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); }
「work」类实现了「Runnable」接口,然后run方法里面调用了「runWorker」方法
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 新增创建 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 判断 task 是否为空,如果不为空直接执行 // 如果 task 为空,调用 getTask() 方法,从 workQueue 中取出新的 task 执行 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
这个runwork方法中会优先取worker绑定的任务,如果创建这个worker的时候没有给worker绑定任务,worker就会从队列里面获取任务来执行,执行完之后worker并不会销毁,而是通过while循环不停的执行getTask方法从阻塞队列中获取任务调用task.run()来执行任务,这样的话就达到了线程复用的目的。while (task != null || (task = getTask()) != null) 这个循环条件只要getTask 返回获取的值不为空这个循环就不会终止, 这样线程也就会一直在运行。「那么任务执行完怎么保证核心线程不销毁?非核心线程销毁?」答案就在这个getTask()方法里面
private Runnable getTask() { // 超时标记,默认为false,如果调用workQueue.poll()方法超时了,会标记为true // 这个标记非常之重要,下面会说到 boolean timedOut = false; for (;;) { // 获取ctl变量值 int c = ctl.get(); int rs = runStateOf(c); // 如果当前状态大于等于SHUTDOWN,并且workQueue中的任务为空或者状态大于等于STOP // 则操作AQS减少工作线程数量,并且返回null,线程被回收 // 也说明假设状态为SHUTDOWN的情况下,如果workQueue不为空,那么线程池还是可以继续执行剩下的任务 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 操作AQS将线程池中的线程数量减一 decrementWorkerCount(); return null; } // 获取线程池中的有效线程数量 int wc = workerCountOf(c); // 如果主动开启allowCoreThreadTimeOut,或者获取当前工作线程大于corePoolSize,那么该线程是可以被超时回收的 // allowCoreThreadTimeOut默认为false,即默认不允许核心线程超时回收 // 这里也说明了在核心线程以外的线程都为“临时”线程,随时会被线程池回收 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 这里说明了两点销毁线程的条件: // 1.原则上线程池数量不可能大于maximumPoolSize,但可能会出现并发时操作了setMaximumPoolSize方法,如果此时将最大线程数量调少了,很可能会出现当前工作线程大于最大线程的情况,这时就需要线程超时回收,以维持线程池最大线程小于maximumPoolSize, // 2.timed && timedOut 如果为true,表示当前操作需要进行超时控制,这里的timedOut为true,说明该线程已经从workQueue.poll()方法超时了 // 以上两点满足其一,都可以触发线程超时回收 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 尝试用AQS将线程池线程数量减一 if (compareAndDecrementWorkerCount(c)) // 减一成功后返回null,线程被回收 return null; // 否则循环重试 continue; } try { // 如果timed为true,阻塞超时获取任务,否则阻塞获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 如果poll超时获取任务超时了, 将timeOut设置为true // 继续循环执行,如果碰巧开发者开启了allowCoreThreadTimeOut,那么该线程就满足超时回收了 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
所以保证线程不被销毁的关键代码就是这一句代码
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
只要timed为false这个workQueue.take()就会一直阻塞,也就保证了线程不会被销毁。timed的值又是通过allowCoreThreadTimeOut和正在运行的线程数量是否大于coreSize控制的。
http://objcoding.com/2019/04/25/threadpool-running/
最近面试BAT,整理一份面试资料《Java面试BATJ通关手册》,覆盖了Java核心技术、JVM、Java并发、SSM、微服务、数据库、数据结构、等等。