异步编程是让程序并发运行的一种手段。它允许多个事情 同时发生
,当程序调用需要长时间运行的方法时,它不会阻塞当前的执行流程,程序可以继续运行,当方法执行完成时通知给主线程根据需要获取其执行结果或者失败异常的原因。使用异步编程可以大大提高我们程序的吞吐量,可以更好的面对更高的并发场景并更好的利用现有的系统资源,同时也会一定程度上减少用户的等待时间等。本文我们一起来看看在 Java
语言中使用异步编程有哪些方式。
在 Java
语言中最简单使用异步编程的方式就是创建一个 Thread
来实现,如果你使用的 JDK
版本是 8 以上的话,可以使用 Lambda 表达式 会更加简洁。为了能更好的体现出异步的高效性,下面提供同步版本和异步版本的示例作为对照:
<pre class="prettyprint hljs java" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto;">/** * @author mghio * @since 2021-08-01 */ public class SyncWithAsyncDemo { public static void doOneThing() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("doOneThing ---->>> success"); } public static void doOtherThing() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("doOtherThing ---->>> success"); } public synchronized static void main(String[] args) throws InterruptedException { StopWatch stopWatch = new StopWatch("SyncWithAsyncDemo"); stopWatch.start(); // 同步调用版本 // testSynchronize(); // 异步调用版本 testAsynchronize(); stopWatch.stop(); System.out.println(stopWatch); } private static void testAsynchronize() throws InterruptedException { System.out.println("-------------------- testAsynchronize --------------------"); // 创建一个线程执行 doOneThing Thread doOneThingThread = new Thread(SyncWithAsyncDemo::doOneThing, "doOneThing-Thread"); doOneThingThread.start(); doOtherThing(); // 等待 doOneThing 线程执行完成 doOneThingThread.join(); } private static void testSynchronize() { System.out.println("-------------------- testSynchronize --------------------"); doOneThing(); doOtherThing(); } } </pre>
同步执行的运行如下:
注释掉同步调用版本的代码,得到异步执行的结果如下:
从两次的运行结果可以看出,同步版本耗时 4002 ms
,异步版本执行耗时 2064 ms
,异步执行耗时减少将近一半,可以看出使用异步编程后可以大大缩短程序运行时间。
上面的示例的异步线程代码在 main
方法内开启了一个线程 doOneThing-Thread
用来异步执行doOneThing
任务,在这时该线程与 main
主线程并发运行,也就是任务 doOneThing
与任务 doOtherThing
并发运行,则等主线程运行完 doOtherThing
任务后同步等待线程 doOneThing
运行完毕,整体还是比较简单的。
但是这个示例只能作为示例使用,如果用到了生产环境发生事故后果自负,使用上面这种 Thread
方式异步编程存在两个明显的问题。
FutureTask
自 JDK 1.5
开始,引入了 Future
接口和实现 Future
接口的 FutureTask
类来表示异步计算结果。这个 FutureTask
类不仅实现了 Future
接口还实现了 Runnable
接口,表示一种可生成结果的 Runnable
。其可以处于这三种状态:
FutureTask
没有执行 FutureTask.run()
方法之前FutureTask.run()
方法执行的过程中FutureTask.run()
方法正常执行结果或者调用了 FutureTask.cancel(boolean mayInterruptIfRunning)
方法以及在调用 FutureTask.run()
方法的过程中发生异常结束后FutureTask
类实现了 Future
接口的开启和取消任务、查询任务是否完成、获取计算结果方法。要获取 FutureTask
任务的结果,我们只能通过调用 getXXX()
系列方法才能获取,当结果还没出来时候这些方法会被阻塞,同时这了任务可以是 Callable
类型(有返回结果),也可以是 Runnable
类型(无返回结果)。我们修改上面的示例把两个任务方法修改为返回 String
类型,使用 FutureTask
的方法如下:
<pre class="prettyprint hljs gradle" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto;">private static void testFutureTask() throws ExecutionException, InterruptedException { System.out.println("-------------------- testFutureTask --------------------"); // 创建一个 FutureTask(doOneThing 任务) FutureTask<String> futureTask = new FutureTask<>(FutureTaskDemo::doOneThing); // 使用线程池执行 doOneThing 任务 ForkJoinPool.commonPool().execute(futureTask); // 执行 doOtherThing 任务 String doOtherThingResult = doOtherThing(); // 同步等待线程执行 doOneThing 任务结束 String doOneThingResult = futureTask.get(); // 任务执行结果输出 System.out.println("doOneThingResult ---->>> " + doOneThingResult); System.out.println("doOtherThingResult ---->>> " + doOtherThingResult); } </pre>
使用 FutureTask
异步编程方式的耗时和上面的 Thread
方式是差不多的,其本质都是另起一个线程去做 doOneThing
任务然后等待返回,运行结果如下:
这个示例中, doOneThing
和 doOtherThing
都是有返回值的任务(都返回 String
类型结果),我们在主线程 main
中创建一个异步任务 FutureTask
来执行 doOneThing
,然后使用 ForkJoinPool.commonPool()
创建线程池(有关 ForkJoinPool
的介绍见这里),然后调用了线程池的 execute
方法把 futureTask
提交到线程池来执行。
通过示例可以看到,虽然 FutureTask
提供了一些方法让我们获取任务的执行结果、任务是否完成等,但是使用还是比较复杂,在一些较为复杂的场景(比如多个 FutureTask
之间的关系表示)的编码还是比较繁琐,还是当我们调用 getXXX()
系列方法时还是会在任务执行完毕前阻塞调用线程,达不到异步编程的效果,基于这些问题,在 JDK 8
中引入了 CompletableFuture
类,下面来看看如何使用 CompletableFuture
来实现异步编程。
JDK 8
中引入了 CompletableFuture
类,实现了 Future
和 CompletionStage
接口,为异步编程提供了一些列方法,如 supplyAsync
、 runAsync
和 thenApplyAsync
等,除此之外 CompletableFuture
还有一个重要的功能就是可以让两个或者多个 CompletableFuture
进行运算来产生结果。代码如下:
<pre class="prettyprint hljs livescript" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto;">/** * @author mghio * @since 2021-08-01 */ public class CompletableFutureDemo { public static CompletableFuture<String> doOneThing() { return CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "doOneThing"; }); } public static CompletableFuture<String> doOtherThing(String parameter) { return CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return parameter + " " + "doOtherThing"; }); } public static void main(String[] args) throws ExecutionException, InterruptedException { StopWatch stopWatch = new StopWatch("CompletableFutureDemo"); stopWatch.start(); // 异步执行版本 testCompletableFuture(); stopWatch.stop(); System.out.println(stopWatch); } private static void testCompletableFuture() throws InterruptedException, ExecutionException { // 先执行 doOneThing 任务,后执行 doOtherThing 任务 CompletableFuture<String> resultFuture = doOneThing().thenCompose(CompletableFutureDemo::doOtherThing); // 获取任务结果 String doOneThingResult = resultFuture.get(); // 获取执行结果 System.out.println("DoOneThing and DoOtherThing execute finished. result = " + doOneThingResult); } } </pre>
执行结果如下:
在主线程 main
中首先调用了方法 doOneThing()
方法开启了一个异步任务,并返回了对应的CompletableFuture
对象,我们取名为 doOneThingFuture
,然后在 doOneThingFuture
的基础上使用 CompletableFuture
的 thenCompose()
方法,让 doOneThingFuture
方法执行完成后,使用其执行结果作为 doOtherThing(String parameter)
方法的参数创建的异步任务返回。
我们不需要显式使用 ExecutorService
,在 CompletableFuture
内部使用的是 Fork/Join
框架异步处理任务,因此,它使我们编写的异步代码更加简洁。此外, CompletableFuture
类功能很强大其提供了和很多方便的方法,更多关于 CompletableFuture
的使用请见这篇。