目录
前言
一、状态
二、运行流程分析
1.run
2.get
3.cancel
4.runAndReset
三、ListenableFutureTask
总结
实现了Runnable接口的类能够新建线程运行,Future接口规范了线程的生命周期,Callable接口能够获得方法的返回值。FutureTask实现了Runnable和Future接口,同时有Callable属性,能够实现三者的功能。
FutureTask有NEW,COMPLETING,NORMAL,EXCEPTIONAL,CANCELLED,INTERRUPTING和INTERRUPTED七种状态,创建时状态为NEW,结果未赋值时更新为COMPLETING,结果赋值后更新为NORMAL,发生异常后会将状态置为EXCEPTIONAL,调用cancel方法后更新为CANCELLED或者INTERRUPTING状态。
* NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED
public class FutureTask<V> implements RunnableFuture<V> { private static final int NEW = 0; //任务尚未开始或处于执行期间 private static final int COMPLETING = 1; //任务即将执行完成 private static final int NORMAL = 2; //任务执行完毕 private static final int EXCEPTIONAL = 3; //任务执行期间出现未捕获异常 private static final int CANCELLED = 4; //任务被取消 private static final int INTERRUPTING = 5; //任务正在被中断 private static final int INTERRUPTED = 6; //任务已被中断
/** The underlying callable; nulled out after running */ private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters;
run方法负责任务的执行,在该方法中,执行Callable对象的call方法,如果在运行过程中发生异常,会将异常写入outcome中,正常执行完成将运行结果写入outcome中。
//运行完成之后将返回值设置为outcome,可以通过get方法获取,call方法有返回值。FutureTask实现了runniable接口,重写run方法如下, public void run() { if (state != NEW || //如果状态不属于NEW,那么通过CAS将runner变量由null设为当前线程, !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { //获取构造时传入的Callable任务对象 Callable<V> c = callable; if (c != null && state == NEW) { //如果状态为NEW V result; boolean ran; try { result = c.call(); //调用Callable的call方法执行任务 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); //设置异常对象,由调用get方法的线程处理这个异常 } if (ran) //如果任务正常结束则设置返回值和state变量 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) //如果处于任务正在中断状态,则等待直到任务处于已中断状态位置 handlePossibleCancellationInterrupt(s); } }
当运行call方法出现异常之后,会调用setException方法,在该方法中,首先尝试将状态更新为COMPLETING,再将异常赋值给outcome,然后将状态更新为EXCEPTIONAL。
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //将状态更新为将要完成状态 outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state //将状态更新为发生异常状态 finishCompletion(); } }
方法运行结束之后会调用set方法,在该方法中首先将当前状态更新为COMPLETING,然后将运行结果赋值给outcome,之后将状态更新为NORMAL。
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //将状态更新为将要完成状态 outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state //将状态更新为正常完成状态 finishCompletion(); } }
*///在中断者中断线程之前可能会延迟,所以我们只需要让出CPU时间片自旋等待 private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt }
在对结果进行赋值之后,会调用finishCompletion方法,在该方法中,首先对等待获取结果的线程进行唤醒,然后执行done方法。
*///唤醒因为等待结果而阻塞的线程 private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { //移除等待线程 for (;;) {//自旋遍历等待线程 Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t);//唤醒等待线程 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //结束之后调用,可以进行重写 done(); callable = null; // to reduce footprint }
在get方法中,如果当前状态小于等于COMPLETING状态,即当前Task还在运行中,使用awaitdown方法,当task执行完成之后调用report方法。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
在awaitDone方法中,还是判断当前任务有没有执行完成,执行完成后将状态返回,多次判断后仍然没有执行完成会将当前线程加入等待队列中,并阻塞。
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) {//如果线程中断 if (Thread.interrupted()) { //获取并清除中断状态 removeWaiter(q);//移除等待WaitNode throw new InterruptedException(); } int s = state;//如果任务执行结束或被取消(中断),方法结束,返回结果 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet //如果任务即将完成,让当前线程让步,让出cpu Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) //如果没有入队,将这个WaitNode加入到FutureTask的等待队列尾部 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { //如果设置了超时时间 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else //阻塞当前线程 LockSupport.park(this); } }
在report方法中,如果任务正常执行完成,就返回结果,如果任务被取消,就抛出异常,如果任务执行过程中出现异常,即处于EXCPTIONAL状态,就抛出所发生的异常。
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) //如果任务正常执行完成直接返回结果 return (V)x; if (s >= CANCELLED) //如果任务被取消或被中断抛出CancellationException(运行时异常的子类) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
cancel方法负责将任务取消,更改线程状态,最后还会唤醒等待结果的线程。
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && //如果任务状态为NEW并且成功通过CAS将state状态由NEW改为INTERRUPTING或CANCELLED(视参数而定) UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); //调用interrupt中断 } finally { // final state //将state状态设为INTERRUPTED(已中断) UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); //激活所有在等待队列中的线程 } return true; }
runAndReset方法会在运行完任务之后,将任务状态重置,但不会对结果进行赋值,在定时任务线程池中有所应用。
*///任务执行完之后会重置stat的状态为NEW protected boolean runAndReset() { //和run方法类似,通过CAS操作将成员变量runner设置为当前线程 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try {//不会调用set方法设置返回值(outcome成员变量) c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } //这个方法不会修改state变量的值 return ran && s == NEW; }
FutureTask想要获取任务运行的结果,需要等待任务执行完成,如果调用时间太早,会导致调用线程阻塞,影响效率。在有些情况下,我们需要在任务执行完成之后执行相应的操作,同时结果不会影响主业务逻辑,我们希望在任务执行完成之后自动根据结果进行操作,而不是等主线程调用。spring对FutureTask进行了继承,重写了done方法,如果程序正常执行完成,会调用success方法,异常退出,会执行failure方法。
public class ListenableFutureTask<T> extends FutureTask<T> implements ListenableFuture<T> { protected void done() { Throwable cause; try { T result = get(); this.callbacks.success(result); return; } catch (InterruptedException ex) { Thread.currentThread().interrupt(); return; } catch (ExecutionException ex) { cause = ex.getCause(); if (cause == null) { cause = ex; } } catch (Throwable ex) { cause = ex; } this.callbacks.failure(cause); } }
本文对FutureTask的源码进行了分析,主要分析run,get,cancel和runAndSet方法,同时,对FutureTask的子类ListenableFutureTask进行了分析,其实现了异步回调功能。