将Runable和Callable包装成RunnableFuture对象,调用子类实现的execute(RunableFuture)防范】
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); // 钩子函数,由子类实现具体的调度逻辑 execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
// 用高3位表示线程池的状态, 总共5个状态,3位正好可以表示 private static final int COUNT_BITS = Integer.SIZE - 3; private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // CAS 设置保证执行状态为SHUTDOWN advanceRunState(SHUTDOWN); // 中断空闲线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 这里会尝试终止,实际不一定能终止,最后一个线程会调用终止 tryTerminate(); }
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // CAS 设置保证执行状态为STOP advanceRunState(STOP); // 中断所有的执行线程 interruptWorkers(); // 取出所有未执行的任务返回,给业务线程机会是否处理该线程 tasks = drainQueue(); } finally { mainLock.unlock(); } // 这里会尝试终止,实际不一定能终止,最后一个线程会调用终止 tryTerminate(); return tasks; }
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { // 线程池为Terminated才会正常结束 if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; // 利用条件变量,类似wait notify,但是这里支持等待时长 nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } } final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } // 只有一个线程会执行到下面的代码,其他线程在上面return了 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // tidying状态才能终止,线程数为0,队列是空 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 钩子方法 terminated(); } finally { // 状态标记为终止 ctl.set(ctlOf(TERMINATED, 0)); // 条件标量通知等待结束的线程可以放行了,之所以是signall->多个线程等待都会被放行 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
执行下面的操作:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 小于核心线程数,需要启动新任务 if (workerCountOf(c) < corePoolSize) { // 会自动检测runState和workerCount, // 如果添加失败,如果返回false,要么线程数超过核心线程数,要么runState已经变更,执行后续的处理 if (addWorker(command, true)) return; c = ctl.get(); } // 如果是运行状态说明,添加失败的原因是超过核心线线程数,先添加到队列中 if (isRunning(c) && workQueue.offer(command)) { // 多线程场景,double-check int recheck = ctl.get(); // 不在运行态,直接回滚 if (! isRunning(recheck) && remove(command)) // 执行拒绝策略 reject(command); // 运行态,运行线程数等于0 else if (workerCountOf(recheck) == 0) // 第一个任务为null, 会从队列中取1个任务作为第一个任务执行 addWorker(null, false); } // 添加到队列失败,创建非核心线程,执行任务 else if (!addWorker(command, false)) // 执行拒绝策略 reject(command); }
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 创建线程的runable对象传的是this,即worker对象,t.start会执行worker的run方法,调用runWorker(this) this.thread = getThreadFactory().newThread(this); } /** * firstTask: 第一个需要执行的任务 * core: 是否创建核心线程数 **/ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { // 如果线程池runState 为Stop状态,直接返回false // shutDown状态,会执行队列中的任务,但不会执行新的任务,所以不需要创建新线程 if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; for (;;) { // 大于核心线程数还是大雨最大线程数,取决于core参数,超过了就不能创建新线程了,返回false,外层调用者会执行拒绝策略 if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; // CAS保证线程安全+1,即workerCount+1 if (compareAndIncrementWorkerCount(c)) // break跳出循环,执行循环后面的内容,continue不会跳出循环 break retry; // CAS失败,说明workerCount已经被其他线程变更, 重新取值判断 c = ctl.get(); // Re-read ctl // 运行状态到了SHUT DOWN以后(STOP, TIDYing)重新跳出到外层循环 if (runStateAtLeast(c, SHUTDOWN)) continue retry; // 其他情况运行状态不变,只需要重新执行下内层循环判断数量 } } // 工作线程数已经+1, 如果真正启动失败,会回滚 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException(); // 向容器中添加工作对象 workers.add(w); workerAdded = true; int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { mainLock.unlock(); } if (workerAdded) { // 启动的时候,会执行Worker对象的run方法 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 添加失败,这里会回滚线程数 addWorkerFailed(w); } return workerStarted; }
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // keepAliveTime, getTask会调用阻塞队列的poll方法一直到取到为止, // 如果超时(keepAliveTime)未取到,会抛出中断异常,processWorkerExit会执行,删除工作线程,由GC回收 while (task != null || (task = getTask()) != null) { w.lock(); // shutdown的时候需要清除中断标志位,因为当前线程还要执行线程中的任务 // shutDownNow, 需要确保处于中断状态, 所以在任务中调用中断后,下一次任务会清除中断标志位 // 中断的时候,join,wait, notify等都可以响应中断标志位 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //子类实现钩子函数 beforeExecute(wt, task); try { // 执行任务 task.run(); // 子类实现 afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; w.completedTasks++; w.unlock(); } } // 有异常的情况下,该值位true completedAbruptly = false; } finally { // 有异常的情况下,该值位true, 会减少workerCount, // 以便能重新创建线程,所有抛出异常并不会导致没有线程可用 processWorkerExit(w, completedAbruptly); } } private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 保证至少一个线程运行 if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }