前面初步了解了下线程池,包括如何定义一个线程池,线程池的常用构造参数以及jdk的默认实现,今天想结合源码来聊一聊线程池加载任务的顺序、线程池如何回收线程等问题;
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 不合理参数校验 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); // 任务队列、线程工厂以及拒绝策略的非空校验 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); // 成员变量的赋值 this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; // 将keepAliveTime转换成纳秒 this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
构造方法就进行了一些校验和赋值操作
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // ctl是一个线程安全的Integer,共32位,高3位表示线程池状态,低29位表示活跃线程数的个数; int c = ctl.get(); // 如果活跃线程的个数小于核心线程,则直接创建线程执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 如果线程池是RUNNING且当前任务可以成功加入任务队列,在双重check后则尝试加入线程池执行 if (isRunning(c) && workQueue.offer(command)) { /** * 重新获取ctl并再次检查线程池状态是为了防止并发环境下,command(任务)加入队列后线程池的状态可能由RUNNING改为STOP或 * SHUTDOWN。当这种情况发生时需要将任务从任务队列移除并且拒绝 */ int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); // 如果线程池的活跃线程数为0了,则向池内添加非核心线程任务(由于核心线程允许为0所以会存在活跃线程为0的情况) else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果线程池不是RUNNING状态或者加入队列失败了,这时会尝试直接向池内添加线程任务;如果addWorker失败则直接拒绝 else if (!addWorker(command, false)) reject(command); }
execute方法指明了线程池执行任务顺序的大致方向,做了一些线程状态的校验拒绝工作,但是真正干活的还是下面addWorker方法
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** 执行任务的线程 */ final Thread thread; /** 线程需要执行的任务,可能为null */ Runnable firstTask; /** 完成的任务数 */ volatile long completedTasks; /** * 构建Worker(addWorker方法里添加worker时会调用) * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { // 设置AQS的state为-1(即同步状态) setState(-1); // inhibit interrupts until runWorker // 给任务赋值 this.firstTask = firstTask; // 通过线程工厂创建执行任务的线程 this.thread = getThreadFactory().newThread(this); } /** 重写Runnable里的run方法 */ public void run() { runWorker(this); } /** * 是否获取到锁 * 0表示处于未被锁定状态 * 1表示被锁定状态 * @return */ protected boolean isHeldExclusively() { return getState() != 0; } /** * 使用AQS设置线程状态 * @param unused * @return */ protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /** * 释放锁 * @param unused * @return */ protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
Worker类既继承了AQS又实现了Runnable接口,所以使其既是一个同步器又是一个线程类
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { // 获取线程状态 int c = ctl.get(); int rs = runStateOf(c); /*******************************第一部分(校验线程池当前状态是否能新增线程任务)**********************/ /** * rs >= SHUTDOWN 表示STOP、TIDYING、TERMINATED状态 * !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())表示三者只要有一个不成立则条件成立 * 总结: * 1、当rs是非RUNNING状态直接返回false * 2、当rs是非RUNNING状态且firstTask不等于空时返回false; * 3、当rs是非RUNNING状态且workQueue等于空时返回false; * 此处自己琢磨半天总结,如有错误还望大佬指正 */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 获取线程池的活跃线程数 int wc = workerCountOf(c); // 如果线程池的活跃线程数大于线程池的最大容量CAPACITY(536870911)或者大于等于核心线程数或最大线程数时返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 通过cas添加活跃线程数量,添加成功则认为当前环境允许添加线程任务所以跳过校验直接进行添加和执行的操作 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // cas失败则继续内存自旋直到成功为止 } } /*******************************第二部分(校验通过进行添加和执行线程的操作)**********************/ boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 通过用户指定的线程工厂构建一个活跃线程(没指定走默认工厂) w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 添加活跃线程需要保证线程安全,所以需要加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 再次获取线程池的状态并检查 int rs = runStateOf(ctl.get()); /** * 当线程池的状态为RUNNING或者是SHUTDOWN但是任务为空则进行添加活跃(工作)线程操作 */ if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // 事先检查线程是否能够启动 throw new IllegalThreadStateException(); // 添加活跃(工作)线程 workers.add(w); // 实时更新线程池的最大活跃线程数 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 如果活跃(工作)线程添加成功则启动线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 如果线程启动失败则从活跃线程中移除 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
addWorker方法定义了如何将线程任务添加到线程池中去并执行
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 设置state=0调用tryRelease尝试释放锁,允许中断任务的执行 w.unlock(); // allow interrupts // 下面的循环出现异常则该值为true否则会被值为false boolean completedAbruptly = true; try { // 如果task不为null,或者task为空但是从任务队列里能取到任务则一直循环取出任务并执行 while (task != null || (task = getTask()) != null) { // 加锁; w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt /** * 如果线程池正在被终止(处于STOP/TIDYING/TERMINATED状态),则需要确保线程被中断 * 如果线程池为RUNNING/SHUTDOWN状态则清楚中断状态 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 设置线程为中断 wt.interrupt(); try { // 无内容的钩子方法留给子类实现 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 无内容的钩子方法留给子类实现 afterExecute(task, thrown); } } finally { task = null; // 完成任务数加1 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { /** * 线程退出 * 如果completedAbruptly=true表示task.run()出了异常,如果如果completedAbruptly=false * 表示while循环拿不到任务了 */ processWorkerExit(w, completedAbruptly); } }
runWorker方法不断循环从队列里取任务并执行,
结合上面的execute方法我们大概可以得出线程池加载任务的顺序:
差不多两天时间的源码读下来越发有种蒙蔽的感觉,线程池涉及的知识点还是比较广泛的,包括AQS、多线程的知识、CAS等等,此篇博文暂且先到这了,其中不完善的地方只能后续慢慢学习补充了,不过从源码中倒是让我学到了一个编码小技巧,
就是当你的代码里有多个变量需要同时进行判断时,并不用让变量一字排开进行或、与的操作,而是对变量进行巧妙的设计,将这些变量定义成整形,相互之间的大小根据业务需求来定,然后可能就一个范围的判断即可替代之前的一堆或与操作了