定义
CompletableFuture
,实现了Future
和CompletionStage
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { ... }
用法
supplyAsync(Supplier<U> supplier, Executor executor)
异步,有返回值
Callable
就是Supplier
的一个实现,代表这个方法的结果有返回值
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("子线程:" + Thread.currentThread() + "执行时间:" + LocalTime.now()); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程:" + Thread.currentThread() + "结束时间:" + LocalTime.now()); return 10; }); System.out.println("主线程:" + Thread.currentThread() + "执行时间:" + LocalTime.now()); System.out.println(future.get()); System.out.println("主线程:" + Thread.currentThread() + "结束时间:" + LocalTime.now()); // 子线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]执行时间:15:41:10.852 // 主线程:Thread[main,5,main]执行时间:15:41:10.852 // 子线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]结束时间:15:41:12.854 // 10 // 主线程:Thread[main,5,main]结束时间:15:41:12.854
可以看到,效果相当于ExecutorService submit(Callable<T> task)
方法,有返回值
runAsync(Runnable runnable)
异步,无返回值CompletableFuture<?> future = CompletableFuture.runAsync(() -> { System.out.println("子线程:" + Thread.currentThread() + "执行时间:" + LocalTime.now()); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程:" + Thread.currentThread() + "结束时间:" + LocalTime.now()); }); System.out.println("主线程:" + Thread.currentThread() + "执行时间:" + LocalTime.now()); System.out.println(future.get()); System.out.println("主线程:" + Thread.currentThread() + "结束时间:" + LocalTime.now()); // 主线程:Thread[main,5,main]执行时间:15:47:26.958 // 子线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]执行时间:15:47:26.958 // 子线程:Thread[ForkJoinPool.commonPool-worker-1,5,main]结束时间:15:47:28.958 // null // 主线程:Thread[main,5,main]结束时间:15:47:28.958
可以看到,效果相当于ExecutorService submit(Runnable task)
方法,无返回值
这两方法各有一个重载版本,可以指定执行异步任务的Executor实现(线程池),如果不指定,默认使用ForkJoinPool.commonPool(),如果机器是单核的,则默认使用ThreadPerTaskExecutor,该类是一个内部类,每次执行execute都会创建一个新线程
传入自定义的线程池
ExecutorService executorService = Executors.newCachedThreadPool(); CompletableFuture<?> future = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } }, executorService); System.out.println(future.get());
或者使用默认的ForkJoinPool线程池
ForkJoinPool pool = new ForkJoinPool(); CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return 10.1; }, pool); System.out.println(future.get());
表示将上一步的操作执行完后接下来执行下一步操作,并且将上一步的结果作为入参
ForkJoinPool pool = new ForkJoinPool(); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { int sum = 0; try { TimeUnit.SECONDS.sleep(3); for (int i = 1; i <= 100; i++) { sum += i; } } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("(1)我是线程:" + Thread.currentThread()); return sum; }, pool); CompletableFuture<Integer> f2 = f1.thenApply((result) -> { System.out.println("(2)我是线程:" + Thread.currentThread()); return result + 1; }); System.out.println("(3)我是线程:" + Thread.currentThread()); System.out.println("f1结果:" + f1.get()); System.out.println("f2结果:" + f2.get());
执行结果
(3)我是线程:Thread[main,5,main] (1)我是线程:Thread[ForkJoinPool-1-worker-1,5,main] (2)我是线程:Thread[ForkJoinPool-1-worker-1,5,main] f1结果:5050 f2结果:5051
可以看到,f1和f2共用的一个线程
我们把上一步的f1.thenApply
改为 f1.thenApplyAsync
来看看执行结果
(3)我是线程:Thread[main,5,main] (1)我是线程:Thread[ForkJoinPool-1-worker-1,5,main] f1结果:5050 (2)我是线程:Thread[ForkJoinPool.commonPool-worker-1,5,main] f2结果:5051
可以看到和 thenApply 不一样的地方是,f2里是新开了一个线程来执行
跟 thenApply 一样,接受上一步的执行结果作为入参,但是区别就是 thenAccept 没有返回值
ForkJoinPool pool = new ForkJoinPool(); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { int sum = 0; try { TimeUnit.SECONDS.sleep(3); for (int i = 1; i <= 100; i++) { sum += i; } } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("(1)我是线程:" + Thread.currentThread()); return sum; }, pool); CompletableFuture<Void> f2 = f1.thenAccept((result) -> { System.out.println("(2)我是线程:" + Thread.currentThread()); System.out.println("(2)中的result:" + result); }); System.out.println("(3)我是线程:" + Thread.currentThread()); System.out.println("f1结果:" + f1.get()); System.out.println("f2结果:" + f2.get());
执行结果
(3)我是线程:Thread[main,5,main] (1)我是线程:Thread[ForkJoinPool-1-worker-1,5,main] f1结果:5050 (2)我是线程:Thread[ForkJoinPool-1-worker-1,5,main] (2)中的result:5050 f2结果:null
可以看到f2是void无返回,所有获取到的结果为null,跟 thenApply 一样,f1和f2也是用的同一个线程
和 thenAccept 不一样,它没有入参也没有返回值,获取f2的结果时也是null
CompletableFuture<Void> f2 = f1.thenRun(() -> { System.out.println("(2)我是线程:" + Thread.currentThread()); });
ForkJoinPool pool = new ForkJoinPool(); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { int sum = 0; try { TimeUnit.SECONDS.sleep(3); for (int i = 1; i <= 100; i++) { sum += i; } } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("(1)我是线程:" + Thread.currentThread()); return sum; }, pool); CompletableFuture<Integer> f2 = f1.exceptionally((ex) -> { System.out.println("(2)我是线程:" + Thread.currentThread()); ex.printStackTrace(); return 0; }); System.out.println("(3)我是线程:" + Thread.currentThread()); System.out.println("f1结果:" + f1.get()); System.out.println("f2结果:" + f2.get());
执行结果
(3)我是线程:Thread[main,5,main] (1)我是线程:Thread[ForkJoinPool-1-worker-1,5,main] f1结果:5050 f2结果:5050
f1无异常的时候,f2返回的是f1的结果
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("(1)我是线程:" + Thread.currentThread()); if (true) { throw new RuntimeException("test ex"); } return 5050; }).exceptionally((ex) -> { System.out.println("(2)我是线程:" + Thread.currentThread()); ex.printStackTrace(); return 0; }); System.out.println("(3)我是线程:" + Thread.currentThread()); System.out.println("f1结果:" + f1.get());
执行结果
可以看到,当f1中有异常的时候,触发exceptionally
中的执行逻辑
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("(1)我是线程:" + Thread.currentThread()); return 5050; }); CompletableFuture<Integer> f2 = f1.whenComplete((result, ex) -> { System.out.println("(2)我是线程:" + Thread.currentThread()); System.out.println("(2)中result:" + result); if (ex != null) { ex.printStackTrace(); } }); System.out.println("(3)我是线程:" + Thread.currentThread()); System.out.println("f1结果:" + f1.get()); System.out.println("f2结果:" + f2.get());
执行结果
(1)我是线程:Thread[ForkJoinPool.commonPool-worker-1,5,main] (2)我是线程:Thread[main,5,main] (2)中result:5050 (3)我是线程:Thread[main,5,main] f1结果:5050 f2结果:5050
可以看到正常无异常的情况下,f1、f2都能获取到结果,其实可以理解为f2就是f1,因为正常使用的话都是连着一起写的,比如:CompletableFuture.supplyAsync().whenComplete()
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("(1)我是线程:" + Thread.currentThread()); if (true) { throw new RuntimeException("test ex"); } return 5050; }); CompletableFuture<Integer> f2 = f1.whenComplete((result, ex) -> { System.out.println("(2)我是线程:" + Thread.currentThread()); System.out.println("(2)中result:" + result); if (ex != null) { ex.printStackTrace(); } }); System.out.println("(3)我是线程:" + Thread.currentThread()); System.out.println("f1结果:" + f1.get()); // get时抛出了异常 System.out.println("f2结果:" + f2.get());
执行结果
可以看到,f1中发生了异常情况,f2中捕获到了异常,但是result没值,
因为result和ex是互斥的关系,有异常时没result,有result时没异常,同时主线程中执行f1.get()方法时,抛出了异常,导致主线程退出;
f2.get()一样会抛出异常,导致主线程中断
无异常的情况
f1的代码复用whenComplete
中无异常的代码
CompletableFuture<String> f2 = f1.handle((result, ex) -> { System.out.println("(2)我是线程:" + Thread.currentThread()); System.out.println("(2)中result:" + result); if (ex != null) { ex.printStackTrace(); } return "我是(2)中的执行结果"; });
执行结果
(1)我是线程:Thread[ForkJoinPool.commonPool-worker-1,5,main] (2)我是线程:Thread[main,5,main] (2)中result:5050 (3)我是线程:Thread[main,5,main] f1结果:5050 f2结果:我是(2)中的执行结果
正常无异常的时候执行结果和
whenComplete
是一样的,只不过f2中可以返回自己的结果,跟f1完全不相关,类型都可以不一样
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("(1)我是线程:" + Thread.currentThread()); if (true) { throw new RuntimeException("test ex"); } return 5050; }); CompletableFuture<String> f2 = f1.handle((result, ex) -> { System.out.println("(2)我是线程:" + Thread.currentThread()); System.out.println("(2)中result:" + result); if (ex != null) { ex.printStackTrace(); } return "我是(2)中的执行结果"; }); System.out.println("(3)我是线程:" + Thread.currentThread()); // System.out.println("f1结果:" + f1.get()); // f1触发了异常,会中断主线程 System.out.println("f2结果:" + f2.get());
执行结果
f1中触发了异常,异常以ex参数的形式传入到f2中,因为result和异常互斥,此时result为空,f2在打印了异常信息后,返回了自定义的结果,所以主线程中如果直接执行f2.get()能获取到结果,但是当f1出现异常后,在f2调用get方法之前调用了f1的get方法会结束主线程,f2相当于一个新的对象,消化了异常,返回了自己的信息,所以主线程能正常处理f2的get方法
allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只要有一个任务执行异常,则返回的CompletableFuture对象执行get方法时会抛出异常中断主线程的执行,如果都是正常执行,则get返回null
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("(1)我是线程:" + Thread.currentThread()); int i = 1 / 0; return 100; }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> { System.out.println("(2)我是线程:" + Thread.currentThread()); return 200; }); CompletableFuture<Void> f3 = CompletableFuture.allOf(f1, f2).whenComplete((result, ex) -> { System.out.println("(3)我是线程:" + Thread.currentThread()); System.out.println("(3)中result:" + result); if (ex != null) { ex.printStackTrace(); } }); System.out.println("(4)我是线程:" + Thread.currentThread()); System.out.println("f2结果:" + f3.get()); .junit.Test ic void test3() throws Exception { CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("(1)我是线程:" + Thread.currentThread()); if (true) { throw new RuntimeException("test ex"); } return 5050; }); CompletableFuture<Integer> f2 = f1.whenComplete((result, ex) -> { System.out.println("(2)我是线程:" + Thread.currentThread()); System.out.println("(2)中result:" + result); if (ex != null) { ex.printStackTrace(); } }); System.out.println("(3)我是线程:" + Thread.currentThread()); // System.out.println("f1结果:" + f1.get()); // get时抛出了异常 System.out.println("f2结果:" + f2.get());
执行结果