大家好,我是小黑,一个在互联网苟且偷生的农民工。
在创建线程时,可以通过new Thread(Runnable)
方式,将任务代码封装在Runnable
的run()
方法中,将Runnable
作为任务提交给Thread
,或者使用线程池的execute(Runnable)
方法处理。
public class RunnableDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); executorService.submit(new MyRunnable()); } } class MyRunnable implements Runnable { @Override public void run() { System.out.println("runnable正在执行"); } }
如果你之前有看过或者写过Runnable相关的代码,肯定会看到有说Runnable不能获取任务执行结果的说法,这就是Runnable存在的问题,那么可不可以改造一下来满足使用Runnable并获取到任务的执行结果呢?答案是可以的,但是会比较麻烦。
首先我们不能修改run()
方法让它有返回值,这违背了接口实现的原则;我们可以通过如下三步完成:
Runnable
中定义变量,存储计算结果;如果你有看过我之前的文章,相信要做到功能并不复杂,具体实现可以看我下面的代码。
public class RunnableDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { MyRunnable<String> myRunnable = new MyRunnable<>(); new Thread(myRunnable).start(); System.out.println(LocalDateTime.now() + " myRunnable启动~"); MyRunnable.Result<String> result = myRunnable.getResult(); System.out.println(LocalDateTime.now() + " " + result.getValue()); } } class MyRunnable<T> implements Runnable { // 使用result作为返回值的存储变量,使用volatile修饰防止指令重排 private volatile Result<T> result; @Override public void run() { // 因为在这个过程中会对result进行赋值,保证在赋值时外部线程不能获取,所以加锁 synchronized (this) { try { TimeUnit.SECONDS.sleep(2); System.out.println(LocalDateTime.now() + " run方法正在执行"); result = new Result("这是返回结果"); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 赋值结束后唤醒等待线程 this.notifyAll(); } } } // 方法加锁,只能有一个线程获取 public synchronized Result<T> getResult() throws InterruptedException { // 循环校验是否已经给结果赋值 while (result == null) { // 如果没有赋值则等待 this.wait(); } return result; } // 使用内部类包装结果而不直接使用T作为返回结果 // 可以支持返回值等于null的情况 static class Result<T> { T value; public Result(T value) { this.value = value; } public T getValue() { return value; } } }
从运行结果我们可以看出,确实能够在主线程中获取到Runnable的返回结果。
以上代码看似从功能上可以满足了我们的要求,但是存在很多并发情况的问题,实际开发中极不建议使用。在我们实际的工作场景中这样的情况非常多,我们不能每次都这样自定义搞一套,并且很容易出错,造成线程安全问题,那么在JDK
中已经给我们提供了专门的API来满足我们的要求,它就是Callable。
我们通过Callable来完成我们上面说的1-1亿的累加功能。
public class CallableDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { Long max = 100_000_000L; Long avgCount = max % 3 == 0 ? max / 3 : max / 3 + 1; // 在FutureTask中存放结果 List<FutureTask<Long>> tasks = new ArrayList<>(); for (int i = 0; i < 3; i++) { Long begin = 1 + avgCount * i; Long end = 1 + avgCount * (i + 1); if (end > max) { end = max; } FutureTask<Long> task = new FutureTask<>(new MyCallable(begin, end)); tasks.add(task); new Thread(task).start(); } for (FutureTask<Long> task : tasks) { // 从task中获取任务处理结果 System.out.println(task.get()); } } } class MyCallable implements Callable<Long> { private final Long min; private final Long max; public MyCallable(Long min, Long max) { this.min = min; this.max = max; } @Override public Long call() { System.out.println("min:" + min + ",max:" + max); Long sum = 0L; for (Long i = min; i < max; i++) { sum = sum + i; } // 可以返回计算结果 return sum; } }
运行结果:
可以在创建线程时将Callable
对象封装在FutureTask
对象中,交给Thread
对象执行。
FutureTask
之所以可以作为Thread
创建的参数,是因为FutureTask
是Runnable
接口的一个实现类。
既然FutureTask也是Runnable接口的实现类,那一定也有run()方法,我们来通过源码看一下是怎么做到有返回值的。
首先在FutureTask中有如下这些信息。
public class FutureTask<V> implements RunnableFuture<V> { // 任务的状态 private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; // 具体任务对象 private Callable<V> callable; // 任务返回结果或者异常时返回的异常对象 private Object outcome; // 当前正在运行的线程 private volatile Thread runner; // private volatile WaitNode waiters; private static final sun.misc.Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; }
public void run() { // 任务状态的校验 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 执行callable的call方法获取结果 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 有异常则设置返回值为ex setException(ex); } // 执行过程没有异常则将结果set if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
在这个方法中的核心逻辑就是执行callable的call()方法,将结果赋值,如果有异常则封装异常。
然后我们看一下get方法如何获取结果的。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 这里会阻塞等待 s = awaitDone(false, 0L); // 返回结果 return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) // 状态异常情况会抛出异常 throw new CancellationException(); throw new ExecutionException((Throwable)x); }
在FutureTask中除了get()
方法还提供有一些其他方法。
get(timeout,unit):获取结果,但只等待指定的时间;
cancel(boolean mayInterruptIfRunning):取消当前任务;
isDone():判断任务是否已完成。
在使用FutureTask来完成异步任务,通过get()方法获取结果时,会让获取结果的线程进入阻塞等待,这种方式并不是最理想的状态。
在JDK8
中引入了CompletableFuture
,对Future进行了改进,可以在定义CompletableFuture
传入回调对象,任务在完成或者异常时,自动回调。
public class CompletableFutureDemo { public static void main(String[] args) throws InterruptedException { // 创建CompletableFuture时传入Supplier对象 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new MySupplier()); //执行成功时 future.thenAccept(new MyConsumer()); // 执行异常时 future.exceptionally(new MyFunction()); // 主任务可以继续处理,不用等任务执行完毕 System.out.println("主线程继续执行"); Thread.sleep(5000); System.out.println("主线程执行结束"); } } class MySupplier implements Supplier<Integer> { @Override public Integer get() { try { // 任务睡眠3s TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return 3 + 2; } } // 任务执行完成时回调Consumer对象 class MyConsumer implements Consumer<Integer> { @Override public void accept(Integer integer) { System.out.println("执行结果" + integer); } } // 任务执行异常时回调Function对象 class MyFunction implements Function<Throwable, Integer> { @Override public Integer apply(Throwable type) { System.out.println("执行异常" + type); return 0; } }
以上代码可以通过lambda表达式进行简化。
public class CompletableFutureDemo { public static void main(String[] args) throws InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { // 任务睡眠3s TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return 3 + 2; }); //执行成功时 future.thenAccept((x) -> { System.out.println("执行结果" + x); }); future.exceptionally((type) -> { System.out.println("执行异常" + type); return 0; }); System.out.println("主线程继续执行"); Thread.sleep(5000); System.out.println("主线程执行结束"); } }
通过示例我们发现CompletableFuture
的优点:
当然这些优点还不足以体现CompletableFuture的强大,还有更厉害的功能。
多个CompletableFuture
可以串行执行,如第一个任务先进行查询,第二个任务再进行更新
public class CompletableFutureDemo { public static void main(String[] args) throws InterruptedException { // 第一个任务 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1234); // 第二个任务 CompletableFuture<Integer> secondFuture = future.thenApplyAsync((num) -> { System.out.println("num:" + num); return num + 100; }); secondFuture.thenAccept(System.out::println); System.out.println("主线程继续执行"); Thread.sleep(5000); System.out.println("主线程执行结束"); } }
CompletableFuture除了可以串行,还支持并行处理。
public class CompletableFutureDemo { public static void main(String[] args) throws InterruptedException { // 第一个任务 CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> 1234); // 第二个任务 CompletableFuture<Integer> twoFuture = CompletableFuture.supplyAsync(() -> 5678); // 通过anyOf将两个任务合并为一个并行任务 CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(oneFuture, twoFuture); anyFuture.thenAccept(System.out::println); System.out.println("主线程继续执行"); Thread.sleep(5000); System.out.println("主线程执行结束"); } }
通过anyOf()
可以实现多个任务只有一个成功,CompletableFuture
还有一个allOf()
方法实现了多个任务必须都成功之后的合并任务。
Runnable接口实现的异步线程默认不能返回任务运行的结果,当然可以通过改造实现返回,但是复杂度高,不适合进行改造;
Callable接口配合FutureTask可以满足异步任务结果的返回,但是存在一个问题,主线程在获取不到结果时会阻塞等待;
CompletableFuture进行了增强,只需要指定任务执行结束或异常时的回调对象,在结束后会自动执行,并且支持任务的串行,并行和多个任务都执行完毕后再执行等高级方法。
以上就是本期的全部内容,我们下期见,如果觉得有用点个关注呗。