作为toc端的核心展示页面,对外为不同的用户提供各种入口数据,活动有效性校验.对内调度各个下游服务获取数据进行聚合,因此需要将同步改成异步并行加载
我们通过引入CompletableFuture(下文简称CF)对业务流程进行编排,降低依赖之间的阻塞。本文主要讲述CompletableFuture的使用和原理。
CompletableFuture是由Java 8引入的,对Future进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排,同时一定程度解决了回调地狱的问题。
CompletableFuture实现了两个接口(如上图所示):Future、CompletionStage。Future表示异步计算的结果,CompletionStage用于表示异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个CompletionStage触发的,随着当前步骤的完成,也可能会触发其他一系列CompletionStage的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage接口正是定义了这样的能力,我们可以通过其提供的thenAppy、thenCompose等函数式编程方法来组合编排这些步骤。
CompletableFuture 通常有3种实现过程
静态方法supplyAsync异步调用
CompletableFuture<String > completableFuture = CompletableFuture.supplyAsync(()-> "hello");
返回一个已经用给定值完成的新的CompletableFuture
CompletableFuture<String > completableFuture = CompletableFuture.completedFuture( "hello");
先初始化.再调用各种complete方法
CompletableFuture<String> completableFuture = new CompletableFuture<>(); completableFuture.complete("hello");
我们从常用的第一个方法作为入口来仔细分析学习Doug Lea 老爷子的代码
首先看下类中自带的代码块
static { try { final sun.misc.Unsafe u; UNSAFE = u = sun.misc.Unsafe.getUnsafe(); Class<?> k = CompletableFuture.class; RESULT = u.objectFieldOffset(k.getDeclaredField("result")); STACK = u.objectFieldOffset(k.getDeclaredField("stack")); NEXT = u.objectFieldOffset (Completion.class.getDeclaredField("next")); } catch (Exception x) { throw new Error(x); } }
主要是获取CompletableFuture类中的关键字段偏移量,用于后面的原子操作.可以看出来主要的字段有3个,result 是用来存储当前计算出来的结果值,stack是表示当前完成之后需要触发的Completion动作,next是获取下一个Completion操作的栈指针
从supplyAsync追寻调用链往下,可以看到调用的java.util.concurrent.CompletableFuture.AsyncSupply
public void run() { CompletableFuture<T> d; Supplier<T> f; if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; if (d.result == null) { try { d.completeValue(f.get()); } catch (Throwable ex) { d.completeThrowable(ex); } } d.postComplete(); } } }
可见在方法执行到最后会去调用postComplete
方法,去触发所有可以访问的依赖项Completion
final void postComplete() { /* * On each step, variable f holds current dependents to pop * and run. It is extended along only one path at a time, * pushing others to avoid unbounded recursion. 在每个步骤中,变量f持有pop和run的当前依赖项。它每次只沿着一条路径扩展,以避免其他路径的无限递归。 */ CompletableFuture<?> f = this; Completion h; while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { CompletableFuture<?> d; Completion t; if (f.casStack(h, t = h.next)) { if (t != null) { if (f != this) { pushStack(h); continue; } h.next = null; // detach } f = (d = h.tryFire(NESTED)) == null ? this : d; } } }
这里有个压栈和弹栈,详细可以见图
执行tryFire
方法,触发不同的操作,并返回对应Completion.
异步方法(即带Async后缀的方法)可以选择是否传递线程池参数Executor运行在指定线程池中;当不传递Executor时,会使用ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的应用,线程数可能成为瓶颈)。
在使用时,最好是传入设立好的线程池,方便资源隔离和日志追溯,下面是在首页查询使用到的线程池demo
ThreadPoolExecutor DISTRIBUTION_POOL = new ThreadPoolExecutor(8, 16, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(500), new ThreadFactoryBuilder().setNameFormat("distribution-pool-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy());
因为都是查询相关的数据,属于io密集的线程,核心线程数建议为cpu * 2,最大线程数建议为核心线程数 * 2,拒绝策略由于是当前用户查询相关,建议使用调用主线程执行策略.
如果有多个需要执行的模块,比如调用rpc 和查询数据库,可以使用CompletableFuture.allOf()
方法合并执行
CompletableFuture 也提供了一个类似try catch的方法来处理异常的问题exceptionally
,在实践过程中使用该方法来优雅处理对应异常.
CompletableFuture 作为java原生的模块,功能性可能比不上Reactor这种netty等都在用的NIO框架,但是胜在简单易用,在最大可能满足需求的前提下,功能简单意味着上手难度的简单,而且Reactor模型的追溯和理解难度比较高,不常用在业务的代码中,通过使用CompletableFuture首页接口查询性能得到显著提升,不过由于在java8的CompletableFuture并没有时间参数控制执行和返回时间,整体营销,因此某单个rpc超时接口影响整个首页接口超时的情况,只能手动设置rpc超时时间人为去控制