// 表示当前任务状态 private volatile int state; // state的一个常量:当前任务尚未执行 private static final int NEW = 0; // state的一个常量:当前任务正在结束,尚未完全结束,一种临界状态 private static final int COMPLETING = 1; // state的一个常量:当前任务正常结束 private static final int NORMAL = 2; // state的一个常量:当前任务执行过程中出现了异常,内部封装的Callable的run方法向上抛出了异常 private static final int EXCEPTIONAL = 3; // state的一个常量:当前任务呗取消 private static final int CANCELLED = 4; // state的一个常量:当前任务中断中 private static final int INTERRUPTING = 5; // state的一个常量:当前任务已中断 private static final int INTERRUPTED = 6; // 线程池的submit方法提交的Runnable或Callable对象,Runnable是通过适配器模式变成Callable类型的 private Callable<V> callable; /* * 1. 正常情况:任务正常执行结束,保存执行结果,Callable的返回值 * 2. 异常情况:Callable向上抛出异常,保存异常 */ private Object outcome; // 当前任务被线程执行期间,保存当前执行任务的线程引用 private volatile Thread runner; // 头插队列,当有多个线程等待结果的时候,进入这个队列挂起 private volatile WaitNode waiters;
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
public FutureTask(Callable<V> callable) { // 判空判断 if (callable == null) throw new NullPointerException(); // 初始化成员对象 this.callable = callable; // 初始化任务状态为NEW this.state = NEW; }
初始化属性callable与state
protected void set(V v) { // 使用CAS设置状态从NEW->COMPLETING,如果失败说明任务被cancel了 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 设置outcome为callable返回的值 outcome = v; // 设置状态从COMPLETING->NORMAL UNSAFE.putOrderedInt(this, stateOffset, NORMAL); finishCompletion(); } }
protected void setException(Throwable t) { // 使用CAS设置状态NEW->COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 让outcome保存异常 outcome = t; // 设置状态为EXCEPTIONAL UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); finishCompletion(); } }
public void run() { /* * 1. state != NEW——条件成立说明当前任务已经被执行过了 * 2. !UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread())——使用CAS设置runner属性条件成立说明有竞争 * */ if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // 引用属性callable Callable<V> c = callable; /* * 1. c != null——基本都成立,主要为了防止空指针 * 2. state == NEW——为了确保这个任务没有被 cancel掉 */ if (c != null && state == NEW) { // 结果引用 V result; /* * 1. true——表示callable.run执行成功 * 2. false——表示callable.run 抛出了异常执行失败 */ boolean ran; try { // 调用程序员实现的callable代码 result = c.call(); // 设置执行成功,因为如果出现了异常走不到这里 ran = true; } catch (Throwable ex) { // 返回结果是null,因为执行出现了异常 result = null; // 执行失败 ran = false; setException(ex); } // 如果执行成功 if (ran) // 设置结果到outcome属性 set(result); } } finally { // 逻辑上类似于释放锁,也是清空执行当前任务的线程 runner = null; // int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
线程池执行当前任务的入口方法
public V get() throws InterruptedException, ExecutionException { // 引用属性state int s = state; // 如果任务还没执行完或还未执行 if (s <= COMPLETING) // 调用当前方法的线程去阻塞 s = awaitDone(false, 0L); return report(s); }
阻塞获取任务执行结果
private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 如果不带超时功能那么就是0 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 当前线程封装的WaitNode对象 WaitNode q = null; // 是否入队 boolean queued = false; // 自选 for (;;) { // 检查中断状态,如果中断过 if (Thread.interrupted()) { // 从队列中移除当前节点 removeWaiter(q); // 抛出中断异常 throw new InterruptedException(); } // 引用属性state int s = state; // 条件成立:说明当前任务已经有结果了 if (s > COMPLETING) { // 条件成立:说明已经为当前线程创建过node了,此时需要将Node的thread指向空 if (q != null) q.thread = null; // 返回状态 return s; } // 条件成立:说明任务快完成了,调用yield方法让出一些CPU else if (s == COMPLETING) Thread.yield(); // 条件成立:说明是第一次进入自选 else if (q == null) // 创建WaitNode q = new WaitNode(); // 条件成立:第二次自选会来到这里,表示当前线程创建了WaitNode对象但是还没有入队 else if (!queued) // 当前节点的next指向waiter,然后使用CAS修改waiters为当前节点,总结来说就是入队 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 条件成立:只有要加超时功能才会走这里 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else // 挂起当前线程 LockSupport.park(this); } }
阻塞调用线程
private void removeWaiter(WaitNode node) { // 判断当前节点是不是null,如果不是说明当前节点根本没入队不用出对了 if (node != null) { // 属性为null node.thread = null; // 外部循环的命名 retry: // 自选 for (;;) { /* * 作用:遍历队列 * 1. pred——上一个节点 * 2. q——遍历时的节点,从头结点开始 */ for (WaitNode pred = null, q = waiters, s; q != null; q = s) { // 获取当前结点的下一个节点 s = q.next; // 如果不是要被出队的节点 if (q.thread != null) // 保存当前节点为pred pred = q; // 如果出队的节点不是头结点 else if (pred != null) { // 出队节点的上一个节点的next指向s pred.next = s; if (pred.thread == null) continue retry; } // 如果出队的节点是头结点,使用CAS让waiters指向头结点的下一个节点 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
出队操作,如果有其他线程因为任务完成或抛出异常导致p.thread=null,也会帮助它们出队
private V report(int s) throws ExecutionException { // 引用属性outcome Object x = outcome; // 条件成立:当前任务正常执行结束 if (s == NORMAL) // 返回结果 return (V)x; // 条件成立:任务被取消 if (s >= CANCELLED) throw new CancellationException(); // 条件成立:任务执行有bug throw new ExecutionException((Throwable)x); }
根据状态返回结果
private void finishCompletion() { // q引用属性waiters for (WaitNode q; (q = waiters) != null;) { // 使用CAS设置 waiters为null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 自选 for (;;) { // 拿到遍历的当前节点的thread属性 Thread t = q.thread; // 条件成立:说明当前线程不是null if (t != null) { // 唤醒 q.thread = null; LockSupport.unpark(t); } // 获得当前节点的下一个节点 WaitNode next = q.next; // 条件成立:当前节点是最后一个节点 if (next == null) break; // help GC q.next = null; // 推进遍历 q = next; } break; } } // 扩展方法,里面啥都没有 done(); // help GC callable = null; }
走到这里说明任务已经结束,这个方法负责唤醒等待结果所有线程,并清空队列
public boolean cancel(boolean mayInterruptIfRunning) { /* * 如果状态不是NEW或者CAS State失败都会直接返回 * 1. state == NEW——如果状态是NEW * 2. UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)——根据入参把state改为INTERRUPTING或CANCELLED */ if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // 如果是true说明需要给运行中的任务发一个中断信号 if (mayInterruptIfRunning) { try { Thread t = runner; // 条件成立:说明当前任务中在运行 if (t != null) // 给runner线程一个中断 t.interrupt(); } finally { // 修改状态为INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 唤醒所有get方法阻塞的线程 finishCompletion(); } return true; }
取消任务或中断任务 并唤醒所有get方法阻塞的线程