还是先来小demo
@Test public void testFuture() throws ExecutionException, InterruptedException { FutureTask<String> test = new FutureTask<>(() -> { logMessage("要睡了"); TimeUnit.SECONDS.sleep(5); logMessage("睡醒了"); return "1"; }); Thread thread = new Thread(test); thread.start(); logMessage(test.get()); } public static void logMessage(Object o) { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println(simpleDateFormat.format(new Date()) + "-" + Thread.currentThread().getName() + ":" + o); }
还是从他的继承关系来看,Future接口是什么?Runnable我知道,但是Future是什么?
/* * @see FutureTask * @see Executor * @since 1.5 * @author Doug Lea * @param <V> The result type returned by this Future's {@code get} method * Future表示异步执行。提供了几个方法,用于,检查计算是否完成、等待计算完成以及获取计算结果。 * 主要记住,他就是用来用来执行异步任务的。并且提供了一些有用的api */ public interface Future<V> { /* 1. 尝试取消任务的执行,如果这个任务还没有执行,直接取消,这个任务就不会再运行了 2. 如果任务已经完成,或者已经被取消了,或者说因为一些其他的原因导致不能取消,返回false 3. 如果任务已经开始运行,mayInterruptIfRunning参数表示是否要中断执行这个任务的线程来停止任务 在这个方法调用之后,后面调用isDone的方法就会返回true,如果这方法返回true,后序调用isCancelled方法也会返回true */ boolean cancel(boolean mayInterruptIfRunning); /* 判断这个任务是否被取消掉 */ boolean isCancelled(); /** 如果这个任务完成,或者在完成的时候被正常的终止,或者异常,或者取消,这些情况下,都会返回true */ boolean isDone(); /** 等待结果,一直等 */ V get() throws InterruptedException, ExecutionException; /** 等待结果,增加超时时间 */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Future表示异步获取结果,规定了一些方法,还得看具体的实现是怎么做的。所以,这里直接看FutureTask
// 表示当前这个异步计算的状态, private volatile int state; // 下面都是一些不同状态的码值。 private static final int NEW = 0; //新建 private static final int COMPLETING = 1; // 完成中 private static final int NORMAL = 2; // 正常 private static final int EXCEPTIONAL = 3;// 异常 private static final int CANCELLED = 4;// 取消 private static final int INTERRUPTING = 5;// 中断中 private static final int INTERRUPTED = 6;// 中断 // 等会要运行的callable private Callable<V> callable; //结果存放的地方,并且不是volatile修饰的。是为了保护读和写的状态 // 在有结果的时候,这就是结果,如果异常的话,这就是异常。所以在Future里面异常的话,得通过get方法获取到 // 这让我想到了线程池,submit方法也会返回一个future,线程池异常异常了会怎么办? private Object outcome; // 运行的线程 private volatile Thread runner; // 等待的节点队列, private volatile WaitNode waiters;
这些属性下面的代码和文章里面会不间断的引用。
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
这种构造方法肯定是见过的最多的,只是一个赋值操作,将传递进来的callable赋值,并且将state
变为NEW。
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
这我可没用过,之前也没在意过,这里确实很特殊。有一个resutl,并且将这个两个参数包装成Executors.callable(runnable, result);
,那就看看这个方法。
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
运行原来的task,返回的是result,那是不是意味着,传递一个Runnable
,对result一顿操作,最后又将这个result返回来。比如下面的这个样子
@Test public void testFuture() throws ExecutionException, InterruptedException { HashMap<String, String> map = new HashMap<>(); FutureTask<Map> test = new FutureTask<>(() -> { logMessage("要睡了"); map.put("a","a"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } logMessage("睡醒了"); },map); Thread thread = new Thread(test); thread.start(); logMessage(test.get()); }
传递一个空map进去,然后一顿操作,返回返回。么的一点点的问题。
要知道,他还是继承了Runnable接口,并且传递给了Thread类,那么Runnable肯定要看run方法,并且Future肯定实在run方法里面做手脚的。
public void run() { //如果当前的状态不是NEw或者替换cas将 runner变为当前线程失败,直接返回,不需要运行。 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // 拿到callable Callable<V> c = callable; // 准备开始运行了 if (c != null && state == NEW) { // 接受结果 V result; boolean ran; try { // 运行方法, result = c.call(); ran = true; } catch (Throwable ex) { //如果出现了任何的异常,将result变为null,将ran变为false。 result = null; ran = false; // setException(ex); } // 如果除了异常,就不会走到这个方法, // 正常的话肯定是这个方法,这里肯定是设置结果给outcome。 if (ran) set(result); } } finally { // 这里用的就不是cas操作了,直接变为null,防止再次调用。 runner = null; // 在runner变为null,之后,重新读取state的值。 // 这里为啥要再次读取,state是volatile修饰的,难道说这里是为了防止发生指令重排序。 int s = state; // 如果说状态是 INTERRUPTING之前的状态,(new,COMPLETING,NORMAL,EXCEPTIONAL,CANCELLED,INTERRUPTING)只要不是中断。 if (s >= INTERRUPTING) // 在看看这个方法是干了什么事情? handlePossibleCancellationInterrupt(s); } }
// 出现异常了之后,就调用这个方法,这个操作也很清晰了, protected void setException(Throwable t) { // 使用cas操作将当前的future的状态变为完成中 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 将异常信息赋值给outcome outcome = t; //将state变为exception UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state // 再看看这个干了什么事情 finishCompletion(); } }
//这个方法的作用没有看懂。 // 看看源码上面的注释写的把 // 这个方法是为了确保调用cancel(true)时候产生的中断,只有在task run或者task runAndReset的时候才能投递。 private void handlePossibleCancellationInterrupt(int s) { // 如果说当前future的状态是INTERRUPTING,就说明,当前的future正在中断中,那么直接就等等。 if (s == INTERRUPTING) while (state == INTERRUPTING) // 当前线程放弃执行,让出cpu的使用权。 Thread.yield(); // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
// 还是同样的操作,state先变为COMPLETING,最后变为NORMAL,设置值,之后调用finishCompletion方法,唤醒等待。 protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
// 看看这个方法里面做了什么事情。 // 看了这个操作是拿到了等待的所有线程,并且唤醒移除他们,并且调用done方法。 private void finishCompletion() { // assert state > COMPLETING; // 拿到等待队列,从头开始,循环遍历,,unpark,,移除是从头开始的, for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; // 移除,help gc, q.next = null; // unlink to help gc q = next; } break; } } //这个方法是留给拓展的 done(); callable = null; // to reduce footprint }
总结:
Future里面本质还是一个Runnable,在run方法里面,会将当前线程设置为执行future的线程。调用callable的call方法,在出现异常的时候会将state先变为完成中,然后将异常设置为结果,之后将state变为异常。并且唤醒等待的线程(从头开始循环,唤醒),从等待队列里面移除掉(从头开始移除)。如果没有异常,state先是完成中,设置值,最后变为normal(正常)。
上面执行之后,在最后,将 runner 变为null,并且判断如果状态是中断中(INTERRUPTING)当前线程等待,等等。
问题:
在finally方法里面,为什么要再次判断获取state。
这个我着实没有看懂,是不是为了防止指令重排序。
在finally方法里面,为什么要在这里面将 runner 变为null
因为 finally 在方法的最后执行,所以,不管成功还是失败,这里都会将 runner 变为null。能保证任务执行一次之后,就将当前执行的线程赋值为null
handlePossibleCancellationInterrupt 方法是在什么样具体的场景下面才会出现。
// get方法,如果state<= COMPLETING,说明还没有运行结束。就要等待,如果不是,就report public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
// 这个方法就是将当前线程添加到等待队列里面去,注意,WaitNode是用 volatile修饰的。我猜肯定cas的操作。 // 需要注意的是,这个方法返回的state,真正获取值的操作是通过s的不同的值,在report方法里面获取的。 private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; // 死循环 for (;;) { // 当前的线程是否中断了。interrupted的方法是属于Thread类的,并且这个方法会清楚掉thread intercept states的状态。 // 如果当前线程中断过, if (Thread.interrupted()) { // 移除掉,并且抛出异常 removeWaiter(q); throw new InterruptedException(); } // future已经完成了。直接返回s int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // 就再等等,放弃cpu的执行权。 Thread.yield(); //如果q是null,这就是第一次循环吗,到这里说明他 stats <= COMPLETING,说明任务还没有执行或者才new出来, // 构建等待节点,连接队列,入队, // 第一次循环来,q肯定是null,所以构建等待队列 // 第一次循环就结束了, // 第二次循环来的时候就不会q就!=null了 else if (q == null) //如果说第一次循环构建了q,然后发送中断了,将他传递到removeWaiter方法里面,会造成无效的链表的查找。 q = new WaitNode(); // 第二次循环来了就开始入队了。 else if (!queued) //这里采用的头插入法。 // 如果插入成功,queued就是true,并且第三次循环来的时候就不会走到这里了。 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 在入队之后,如果有time,就unpark住当前线程,将当前线程阻塞在当前对象上面, else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } //堵塞在当前对象上面 LockSupport.parkNanos(this, nanos); } else // 一直堵塞,等待unpark。 LockSupport.park(this); } }
//通过传递进来的状态判断,如果当前的状态是 NORMAL, private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; // 如果取消,中断中,中断,抛异常 if (s >= CANCELLED) throw new CancellationException(); // 如果不是,就抛出ExecutionException,在ExecutionException里面包装原始异常 throw new ExecutionException((Throwable)x); }
// 这个前提是当前当前的节点已经构建好了, 如果node为null那就确实没有意义 /** 这个前提是,当前线程已经中断了。 1. 一开始先把node的thread变为null。在awaitDone方法拿到WaitNode的节点的引用了,先把他变为null,这个WaitNode的thread肯定是volatile修饰的,所以,这里能直接变为null。 2. 一开始变为null,在后面的操作里面,才会循环遍历找出来,然后从等待队列里面移除。 3. 这里是单链表的删除操作,肯定有两个指针(前置节点和当前节点) - 分为两种情况 1. 要删除的节点是头结点,如果要删除是头结点,前两个if都不会进去,直接到第四个,将q变为s,也就是将头结点变为头结点的下一个。 2. 如果不是头结点 那第一个if一直可以进去,一直找到了,就会进去第二个if,先直接删除,pred.next = s;(要删除的是p)。 在看看后面的判断,如果前置节点变为null,这意味着,此时此刻也有别的线程要删除pred节点,那当前的这个就直接放弃 */ private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
总结
get方法,调用的时候,会通过 state 判断状态,如果 state完成,就会走到
report
方法,还是继续通过 state判断,如果是正常的结果,就直接返回,如果不是正常的,就抛出异常。没有完成,就会到awaitDone方法里面,在这个方法里面会构建等待节点,通过cas操作添加到等待队列里面,然后park住。在这个过程中,还会判断当前线程是否发生中断,如果发送中断,还会将当前线程从头结点中移除。
也就是说,在get里面一个线程到阻塞会循环最少会循环两次,一个是构建节点,一次是cas替换,之后才是堵塞,如果cas替换失败了,还会继续,所以,最少有两次尝试。
public boolean cancel(boolean mayInterruptIfRunning) { // 不满足条件 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // if (mayInterruptIfRunning) { try { //调用运行future线程的中断方法 Thread t = runner; if (t != null) t.interrupt(); } finally { // final state // 状态变为INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 上面已经分析过了 finishCompletion(); } return true; }
关于Future的分析就分析到这里了。 如有不正确的地方,欢迎指出。谢谢。