上一篇文章,我们总体介绍了下线程池相关的概念,知道线程池的整体流程,接下来,一起读一下线程池的源码吗,了解一下线程池是如何运行的,源码的入口是java.util.concurrent.ThreadPoolExecutor#execute,同学可以跟着IDEA一起跟下代码
查看UML图我们可以看到,最顶层为Executor,类的作用如下
线程池运行模型,其实就是生产者消费者模型,用户提交任务,任务提供到线程池中供线程池进行分配,线程池会根据线程池的运行状态交给不同线程执行,线程充当消费者(在线程池中以Work形式表现)
日常开发过程中,我们会使用java.util.concurrent.ThreadPoolExecutor#execute方法,我们跟着源码一起看下当我们提交一个任务的时候,线程池都干了啥,源码阅读主流程execute()->addWorker()->run()->getTask()->proxcessWorkExit()
流程图
源码
线程池的核心源码,如流程图所示首先会判断是否小于核心线程池,小于就调用addWorker()创建一个线程并执行存入的任务—>添加队列–>创建非核心->拒绝策略
public void execute(Runnable command) { //1:任务非空校验 if (command == null) throw new NullPointerException(); //2:获取当前线程数 int c = ctl.get(); //线程的数据小于核心线程池,可以直接提交任务并且创建核心线程来执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) //aaddWork true就行核心线程,false为非核心线程的创建 return; //创建任务执行失败了,可能线程池执行了shutDown()函数或者核心线程处理已满,执行下面流程重新获取线程 c = ctl.get(); } //3:判断线程池的状态&&等待队列中添加任务 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //当前线程池不是运行状态,将任务从阻塞移除并且执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); //线程池中线程的数量为0 else if (workerCountOf(recheck) == 0) //如果之前的线程已经被销毁完,阻塞队列中会有任务,这里会创建创建一个新的线程回阻塞队列中拿任务 //新线程是核心线程还是非核心现在我那么确定(个人感觉是非核心线程),欢迎去公众号程序员fly留言 addWorker(null, false); } //4:队列也满了,核心线程池也满了,那就创建非核心的线程池吧 else if (!addWorker(command, false)) //队列满了无法再将任务加入队列中,创建普通的线程去执行任务也失败,有可能线程池已经关闭或者线程池已经饱和,拒觉 reject(command); }
上面源码中,我们看到经过有一系列校验之后,主要是调用addWork方法,addWork内部会创建一个Work,封装传入的线程以及相应的任务,一系列校验之后然后调用run()方法,run()方法又被Work重写了。主干流程execute()->addWorker()->run()->getTask()->proxcessWorkExit()从队列中不断获取任务
作用是尝试去线程池登记一个线程,两个for循环是条件筛选,下面的才是启动一个线程
流程图
源码
//firstTask:当前添加的线程需要执行的首个任务 //core:是否标记当前执行的线程是否是核心线程还是普通线程 private boolean addWorker(Runnable firstTask, boolean core) { retry:// 标记点 //这两个for循环是怕是否能够启动一个线程 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. //rs >SHUTDOWN意思线程状态处于stop,tidying,terminated状态。这样只会存在一种情况还需要继续添加线程,这种情况线程 //池已经SHUTDOWN,但是队列中的任务还在排队,firstTask=null代表着不接受新任务,这里继续添加工作线程的原因是消费阻塞队 //列中任务 if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&!workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) //7 if条件判断的这几种情况 wc >= CAPACITY 判断工作线程数量是否超过可表示的最大容量CAPACITY wc >= (core ? corePoolSize : maximumPoolSize))//如果添加的是核心线程池,看是否超过核心线程的容 //量,超过就拒绝。如果添加的是普通线程,看超过线程池的最大容量,超过这两个边界就不接受 return false; //符合上面筛选条件,线程条数+1 成功之后就跳出循环 if (compareAndIncrementWorkerCount(c)) break retry;// break后面跟着retry的原因是再次校验线程池状态是否发生了变化,发生变化继续双层for循环,没变化 的话执行for循环下面的 c = ctl.get(); //线程+1失败,如果这个时候线程池不是running,重新进行外层循环 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //这里是真正要启动一个线程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //开启可重入锁,独占 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //获取线程池的运行状态 int rs = runStateOf(ctl.get()); //rs<shutdown:线程池处于ruunning状态 //rs == SHUTDOWN && firstTask == null)线程池关了,没有接受新的任务,这里需要处理阻塞队列中的线程 if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) { //isAlive() 方法的作用是判断当前的线程是否处于活动状态。活动状态就是线程处于正在运行或准备开始运行 //你这个线程是活的,还创建啥玩意,扔出异常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //将Work实例添加到线程池中,我们后续源码解读中可以看到Work implements Runnable,它就是个线程 workers.add(w); int s = workers.size(); //当前工作线程数超过线程池出现的最大线程数,刷新最大值 if (s > largestPoolSize) largestPoolSize = s; //添加线程成功,标志位为true workerAdded = true; } } finally { mainLock.unlock(); } //线程创建成功,启动创建线程 if (workerAdded) { //工作线程创建成功,启动该线程,线程处理任务 //start我们可以查看源码可以看到其实是调用Work类中的run方法,run方法其实调用runWork() 好东西在这,读者重点关注 t.start(); workerStarted = true; } } } finally { if (! workerStarted) //失败的话进行回滚前面的操作,将线程池中移除Work实例,通过CAS将工作线程数量workerCount-1 addWorkerFailed(w); } return workerStarted; }
addWork中run方法是调用的Work里面的,Work的run()又调runWork()方法,execute()和addWork()都是做预备队 ,runWork()是线程池真正处理任务的方法。
public void run() { runWorker(this); }
流程图
源码
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //先执行第一次传进来的任务 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts 可以中断,Work在初始化的时候AQS的时候设置为-1.不能中断 //线程退出的原因,true是出现异常了,false是线程正常退出 //写线程池的人为啥不false出现异常true是正常 boolean completedAbruptly = true; try { //线程池复用的核心代码实现 //(task != null || (task = getTask()) != null) task为非空,或者去非阻塞队列取任务不为空,一直循环 //比如当这个线程执行完当前的任务,就会去阻塞队列中取任务来执行,完成线程的复用 while (task != null || (task = getTask()) != null) { w.lock();//加锁 //判断线程池状态为>STOP,表示状态为STOP和TERMINATED状态||线程已经被中断了,检查线程池状态>STOP //线程不是中断状态 if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //线程池都中断了,线程老老实实的中断的了,自己想的,不一样意见还请关注程序员fly发表看法,其实我也是菜鸡 wt.interrupt(); try { //执行前 beforeExecute(wt, task); Throwable thrown = null; try { //调用业务中重写的run方法执行业务逻辑 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //执行后 调用钩子函数 afterExecute(task, thrown); } } finally { task = null; //完成任务数+1 w.completedTasks++; w.unlock(); } } //while循环结束 completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
从上面源码中我们可以看到线程池真正处理流程其实就是一个While循环,通过调用getTask不断的从队列中获取任务,实现线程和任务之间的通信,这不就关联到一块了,While循环结束之后,Work会主动消除自己在线程池的引用,获取不到任务的时候就主动调用processWorkerExit()方法,核心线程池可以无限等待,非核心线程池需要限时获取任务,着急的小伙伴直接跳到processWorkerExit()即可查看到这个逻辑,我们先看下While里面的getTask()干了啥
getTask方法从阻塞队列中按照先进先出获取执行的任务,这里会进行多次判断,判断目的是为了控制线程池的数量,线程池不需要多余的线程的时候,就会返回null,为null的话,就会进入回收程序
流程图
源码
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary 仅在必要时检查队列是否为空 //rs >= SHUTDOWN 线程池状态为STOP,tidying,terminated||队列为空 //rs >= STOP || workQueue.isEmpty())线程池状态为STOP, tidying,terminated||队里为空 //这里两种情况下满足1个,工作线程减1 , if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //工作线程减1 decrementWorkerCount(); return null; } int wc = workerCountOf(c);//获取线程池中线程数 //1:allowCoreThreadTimeOut 默认为false, wc > corePoolSize,线程数没有到达最大,就会新建线程执行任务 //1:处 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //工作线程数>最大线程数||timed==true&&(核心工作线程对象) 创建新线程来加速队列中任务 if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } //开始获取Task try { //time为true,如果允许超时,则调用poll方法,在指定超时时间后即返回 //2:time为false,通过take()阻塞拉取(非核心线程阻塞),会阻塞到有下一个有效任务时候再返回 //2处 Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
getTask()重点在上面的1,2处,存在非核心线程,time=true,就会调用队列的poll()方法限时获取任务,指定时间获取不到就gg。如果是核心线程,会调用take()方法,一直到获取获取任务之后才会被唤醒,通过While获取下一个任务。接下来我们看下线程回收
processWorkerExit销毁的时候会销毁线程的引用,维护核心线程池的数量不销毁
流程图
源码
private void processWorkerExit(Worker w, boolean completedAbruptly) { //completedAbruptly:true时候说明线程池发生异常,正在工作线程数-1 //completedAbruptly:false,说明工作线程无任务执行, if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //追加已经完成的线程数 completedTaskCount += w.completedTasks; //从hashSet中缓慢移除该worker workers.remove(w); } finally { mainLock.unlock(); } //是否结束线程池 tryTerminate(); //是否增加工作线程 int c = ctl.get(); if (runStateLessThan(c, STOP)) { //线程池状态处于runing或者shutdown状态 if (!completedAbruptly) { //正常结束的, //allowCoreThreadTimeOut:默认为false,核心线程即使在空闲时也保持活动状态。 //如果为true,则核心线程使用 keepAliveTime超时等待工作。 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //在等待队列等于NUll之前,线程值都会维护核心线程池数量 if (min == 0 && ! workQueue.isEmpty()) min = 1; //先看这里,当前运行的线程>核心线程数量,不添加Work线程 if (workerCountOf(c) >= min) return; // replacement not needed } //线程是因为异常终止的,添加Work addWorker(null, false); } }
整个源码都读完啦,看完之后问下自己这几个问题吧
自己的情况是这样的,本人学历是双非本科,enen,嗯嗯就是突然感觉或许可以把自己变的更好一点,也希望想找一些小伙伴一起学习,所以把自己平常自己学习到的一些东西分享出来,能帮助各位更好