【 好书分享:《Spring 响应式编程》-- 京东】
Spring 响应式编程 随记 – C1 为什么选择响应式 Spring
Spring 响应式编程 随记 – C2 Spring 响应式编程基本概念 (一)
Spring 响应式编程 随记 – C2 Spring 响应式编程基本概念 (二)
Spring 响应式编程 随记 – C2 Spring 响应式编程基本概念 (三)
Spring 响应式编程 随记 – C2 Spring 响应式编程基本概念 (四)
Spring 响应式编程 随记 – C2 Spring 响应式编程基本概念 (五)
Spring 响应式编程 随记 – C3 响应式流 新的标准流 (一)
Spring 响应式编程 随记 – C3 响应式流 新的标准流 (二)
RxJava 能够帮助快速的启动响应式编程。
3.1.1 API 不一致性的问题
大量的同类型响应式库给了更多不同的选择,但也容易使其过度复杂。
如果存在依赖于同一个异步非阻塞通信的两个不一样的API库,我们不得不使用额外的工具/工具类,来讲一个回调转换成另一个回调。
下面举一个例子,展示 ListenableFuture
和 CompletionStage
的兼容实现。
interface AsyncDatabaseClient { <T> CompletionStage<T> store(CompletionStage<T> stage); } final class AsyncAdapters{ public static <T> CompletionStage toCompletion(ListenableFuture<T> future){ CompletableFuture<T> completableFuture = new CompletableFuture<>(); future.addCallback( completableFuture::complete, completableFuture::completeExceptionally ); return future; } public static <T> ListenableFuture toListenable(CompletableFuture<T> stage){ SettableListenableFuture<T> future = new SettableListenableFuture<>(); stage.whenComplete((v,t)->{ if(t==null){ future.set(v); }else{ future.setException(t); } }); return future; } }
在 toCompletion()
方法里面, future.addCallback()
添加回调,提供了和 completableFuture 的集成,并且在这些回调中直接重用 CompletableFuture 的 API。
同理,在toListenable()
方法里面, stage.whenComplete
添加回调,把执行结果提供到 future 。
接着我们来看一下调用的实例。
@RestController public class MyController { //.... @RequestMapping public ListenableFuture<> requestData(){ AsyncRestTemplate httpClient = getHttpClient(); // execute => ListenableFuture AsyncDatabaseClient dbClient = getDbClient(); // store => CompletionStage // [httpClient.execute() is ListenableFuture] // ListenableFuture => CompletionStage => ListenableFuture // in order to trigger complete in the CompletionStage CompletionStage<String> completableStage = toCompletion( httpClient.execute(); ); // in order to trigger future return toListenable( dbClient.store(completableStage); ); } }
这个接口会异步返回 ListenableFuture ,为了存储 AsyncDatabaseClient 的执行结果,我们不得不做两次转换来让两种流进行适配。
Spring 4.x 框架的 ListenableFuture 和 CompletionStage 之间没有直接集成,Spring 5.x 则是拓展了 ListenableFuture 提供 Conpletable 的方法。
3.1.2 消息的拉与推
为了更好的理解 API 不一致性的问题,我们先回顾历史并且分析一个数据订阅交互模型。
在响应式的场景中,多数思路是把数据从源头去推给订阅者,因为很多场景用拉模型的效率不够高。
下面举一个例子,假设我们用拉模型通信,场景是我们需要请求数据库并且过滤一些条件,只取其中的前10个元素。
final AsyncDatabaseClient dbClient = getDbClient(); public CompletionStage<Queue<Item>> list(int count){ BlockingQueue<Item> storage = new ArrayBlockingQueue<>(count); CompletableFuture<Queue<Item>> result = new CompletableFuture<>(); pull("1", storage, result, count); return result; } private void pull(String elementId, Queue<Item> queue, CompletableFuture resultFuture, int count ) { dbClient.getNextAfterId(elementId) .thenAccept(item -> { if (isValid(item)) { queue.offer(item); if (queue.size() == count){ resultFuture.complete(queue); return; } } // again pull(item.getId(),queue,resultFuture,count); }) }
list 方法这里先声明了 Queue 和 CompletableFuture 来存储接受的值,然后开始第一次拉数据。
pull 方法里面请求数据库获取id之后的数据,异步接受结果,如果符合条件则放入队列,队列满则返回resultFuture。队列不满则掉入下方递归继续拉数据。
如果我们把整个请求流程按照时间线来看,就会看到这种拉模式的缺陷所在。
虽然,在服务和数据库之间,用了异步非阻塞的交互,但是这种逐个逐个去请求下一个元素,站在数据库服务的一端来看,大部分的空闲时间都在等待请求,服务端对于要生成数据的数量,也是不知道的,大多数时间都在等。
另外,当有大量的数据需要生成的时候,压力也会集中到服务端。
为了优化整体执行过程并将模型维持为 first class citizen 无限制的一等对象,可以进一步优化代码,把拉取操作和批处理结合起来。
private void pull(String elementId, Queue<Item> queue, CompletableFuture resultFuture, int count ) { dbClient.getNextAfterId(elementId,count) .thenAccept(allItems -> { for( Item item : allItems){ if (isValid(item)) { queue.offer(item); if (queue.size() == count){ resultFuture.complete(queue); return; } } } // again String lastItemId = allItems.get(allItems.size() - 1).getId(); pull(lastItemId,queue,resultFuture,count); }) }
这种改进优化了原先逐个逐个的请求流程,变成一批一批的元素请求。但是这种交互还是可能会存在一些效率低下的情况。
当数据库查询数据的时候,客户端在闲等。
另外发送一批需要更长的处理时间,这导致最后一个数据块可能是请求了超出所需要的数据。
这说明了拉模型的缺陷。
于是,我们可以进一步优化,通过推模型,使其只请求一次,然后当数据项变为可用的时候,由数据源异步推送数据。
public Observable<Item> list(int count){ return dbClient.getStreamOfItems() .filter(item -> isValid(item)) .take(count); }
这里的 list()
方法会返回 Observable 来推送 Item 元素,调用 getStreamOfItems 会对数据库完成一次订阅,take用来在客户端获取特定数量。当达到数量要求,就会发送完成信号,也就是取消订阅信号来关闭数据库的链接。
数据库服务只有在等待第一个响应的时候会有一段时间,接着就会一直连续工作发送Item,直到接受到取消信号。不过数据库可能会生成多余预定数量的元素。