异步调用的本质,其实是通过开启一个新的线程来执行。如以下例子:
public static void main(String[] args) throws Exception{ System.out.println("主线程 =====> 开始 =====> " + System.currentTimeMillis()); new Thread(() -> { System.out.println("异步线程 =====> 开始 =====> " + System.currentTimeMillis()); try{ Thread.sleep(5000); }catch (InterruptedException e){ e.printStackTrace(); } System.out.println("异步线程 =====> 结束 =====> " + System.currentTimeMillis()); }).start(); Thread.sleep(2000); System.out.println("主线程 =====> 结束 =====> " + System.currentTimeMillis()); }
数据结果如下所示:
主线程 =====> 开始 =====> 1627893837146 异步线程 =====> 开始 =====> 1627893837200 主线程 =====> 结束 =====> 1627893839205 异步线程 =====> 结束 =====> 1627893842212
因为异步任务的实现本质的由新线程来执行任务,所以通过线程池的也可以实现异步执行。写法同我们利用线程池开启多线程一样。
但由于我们的目的不是执行多线程,而是异步执行任务,所以一般需要另外一个线程就够了。
因此区别于执行多线程任务的我们常用的newFixedThreadPool
,在执行异步任务时,我们用newSingleThreadExecutor
来创建一个单个线程的线程池。
public static void main(String[] args) throws Exception{ System.out.println("主线程 =====> 开始 =====> " + System.currentTimeMillis()); ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(()->{ System.out.println("异步线程 =====> 开始 =====> " + System.currentTimeMillis()); try{ Thread.sleep(5000); }catch (InterruptedException e){ e.printStackTrace(); } System.out.println("异步线程 =====> 结束 =====> " + System.currentTimeMillis()); }); executorService.shutdown(); // 回收线程池 Thread.sleep(2000); System.out.println("主线程 =====> 结束 =====> " + System.currentTimeMillis()); }
执行结果如下:
主线程 =====> 开始 =====> 1627895467578 异步线程 =====> 开始 =====> 1627895467635 主线程 =====> 结束 =====> 1627895469644 异步线程 =====> 结束 =====> 1627895472649
SpringBoot项目有一个的很重要的特点就是的注解化。如果你的项目是SpringBoot,那就又多了一种选择——@Async注解。
使用起来也非常简单,将要异步执行的代码封装成一个方法,然后用@Async注解该方法,然后在主方法中直接调用就行。
@Test public void mainThread() throws Exception{ System.out.println("主线程 =====> 开始 =====> " + System.currentTimeMillis()); collectionBill.asyncThread(); Thread.sleep(2000); System.out.println("主线程 =====> 结束 =====> " + System.currentTimeMillis()); Thread.sleep(4000); // 用于防止jvm停止,导致异步线程中断 }
@Async public void asyncThread(){ System.out.println("异步线程 =====> 开始 =====> " + System.currentTimeMillis()); try{ Thread.sleep(5000); }catch (InterruptedException e){ e.printStackTrace(); } System.out.println("异步线程 =====> 结束 =====> " + System.currentTimeMillis()); }
执行结果如下:
主线程 =====> 开始 =====> 1627897539948 异步线程 =====> 开始 =====> 1627897539956 主线程 =====> 结束 =====> 1627897541965 异步线程 =====> 结束 =====> 1627897544966
有以下两点需要注意:
@Tranctional
注解,@Async
注解的方法与调用方法不能在同一个类中,否则不生效
CompletableFuture是JDK1.8的新特性,是对Future的扩展。CompletableFuture实现了CompletionStage接口和Future接口,增加了异步回调、流式处理、多个Future组合处理的能力。
实现代码如下:
public static void main(String[] args) throws Exception{ System.out.println("主线程 =====> 开始 =====> " + System.currentTimeMillis()); ExecutorService executorService = Executors.newSingleThreadExecutor(); CompletableFuture.runAsync(() ->{ System.out.println("异步线程 =====> 开始 =====> " + System.currentTimeMillis()); try{ Thread.sleep(5000); }catch (InterruptedException e){ e.printStackTrace(); } System.out.println("异步线程 =====> 结束 =====> " + System.currentTimeMillis()); },executorService); executorService.shutdown(); // 回收线程池 Thread.sleep(2000); System.out.println("主线程 =====> 结束 =====> " + System.currentTimeMillis()); }
同样可以实现类似的结果如下:
主线程 =====> 开始 =====> 1627898354914 异步线程 =====> 开始 =====> 1627898354977 主线程 =====> 结束 =====> 1627898356980 异步线程 =====> 结束 =====> 1627898359979
通过消息队列中间件实现异步处理,也是很常见的一种方式,比如:RabbitMQ、RocketMQ等,这里就不举例了。
我们项目中,某些场景下使用的是Google的 EventBus。EventBus 是 Google.Guava 的事件处理机制,是观察者模式(生产/消费模型)的一种实现。
类似的还有Spring的Event。
EventBus 优点:
EventBus 缺点:
如果需要分布式使用还是需要使用 MQ。
具体实现请参考:https://juejin.cn/post/6864940197269667853