1.响应式编程的产生背景: 为了解决异步编程过程中出现的种种难题,人们提出了各种各样方法来规避这些问题,这些方法称为响应式编程(Reactive Programming),就像面向对象编程、函数式编程一样,响应式编程也是另一种编程范式。响应式编程,本质上是对数据流或某种变化所作出的反应,但是这个变化什么时候发生是未知的,所以他是一种基于异步、回调的方式在处理问题。 2.Reactive Streams组件简介、解决的问题、解决问题原理、解决方法: 1)组件简介: Reactive Streams 组件通过一组最小的接口对象、方法和协议来描述必要的操作从而实现具有非阻塞背压的异步数据流。在传统异步编程中,不同任务分别在不同的线程中执行,协调这些线程执行的先后顺序、线程间的依赖顺序是一件非常麻烦的事情,而Reactive Streams就是为了解决该问题。另外,Reactive Streams规范引入了回压(Back Pressure),可以动态控制线程间消息交换的速率,避免生产者产生过多的消息,消费者消费不完等类似问题。 备注:另种理解 回压(背压)是为了解决上游组件了过量的消息,导致下游组件无法及时处理,从而导致程序崩溃。 该组件的信息如下: <dependency> <groupId>org.reactivestreams</groupId> <artifactId>reactive-streams</artifactId> <version>1.0.2</version> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.2.10.RELEASE</version> </dependency> 2)解决的问题: 系统之间高并发的大量数据流交互通常采用异步的发布-订阅模式。数据由发布者推送给订阅者的过程中,容易产生的一个问题是,当发布者即生产者产生的数据速度远远大于订阅者即消费者的消费速度时,消费者会承受巨大的资源压力(pressure)而有可能崩溃。 3)解决问题的原理: 为解决上述问题,数据流的速度需要被控制,即流量控制(flow control),以防止快速的数据流不会压垮目标。因此需要反压即背压(back pressure),生产者和消费者之间需要通过实现一种背压机制来互操作。实现这种背压机制要求是异步非阻塞的,如果是同步阻塞的,消费者在处理数据时生产者必须等待,会产生性能问题。 4)解决问题的方法: 响应式流(Reactive Streams)组件通过定义一组实体,接口和操作的方法规范,给出了实现非阻塞背压的标准。第三方遵循这个标准来实现具体的解决方案,常见的有Reactor,RxJava,Akka Streams,Ratpack等。 4、Reactive Streams的实现源码详解:主要学习组件中的接口定义、规范、及操作规范:
Reactive Streams 组件的源码非常清爽,就四个接口,本章将围绕着每个接口的定义,及其操作规范,原理实现来进行解读,具体如下:
4.1 发布者(Publisher): 发布者是潜在无限数量的序列元素的提供者,根据从订阅者接收到的请求发布这些元素。发布者可以在不同的时间点动态地为多个订阅者(Subscriber)提供服务。发布者的定义如下: public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); } 对其理解: 1)subscribe(Subscriber<? super T> s):该方法表示请求发布者开始流式传输数据。这是一个“工厂方法”,可以多次调用,每次启动一个新订阅(Subscription); 2)每个订阅(Subscription)只适用于一个订阅者(Subscriber); 3)订阅者只能向单个发布者订阅一次。如果发布者拒绝订阅尝试或以其他方式失败,它将通过Subscriber.onError发出错误信号。参数-s,表示:将使用来自此发布服务器的信号的订户; 4.2 订阅者(Subscriber): 有四个事件方法,分别在开启订阅、接收数据、发生错误和数据传输结束时被调用。其具体定义如下: public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void one rror(Throwable t); public void onComplete(); } 对其理解: 1)订阅对象(Subscription)是发布者和订阅者之间交互的操作对象,在发布者(Publisher)通过subscribe方法加入订阅者时,会通过调用订阅者(Subscriber)的onSubscribe把订阅对象(Subscription)传给订阅者。 4.3 订阅对象(Subscription): 订阅对象的定义如下: interface Subscription { void request(long n); void cancel(); } 对其理解: 1)订阅者(Subscriber)拿到订阅对象后,通过调用订阅对象的request方法,根据自身消费能力请求n条数据,或者调用cancel方法来停止接收数据。 2)订阅对象(Subscription)的request方法被调用时,会触发订阅者(Subscriber)的onNext事件方法,把数据传输给订阅者。如果数据全部传输完成,则触发订阅者的onComplete事件方法。如果数据传输发生错误,则触发订阅者的onError事件方法。 4.4处理者(Processor) 处理者对象的定义如下: public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { } 对其理解: 1)处理者既是发布者又是订阅者,用于在发布者和订阅者之间转换数据格式,把发布者的T类型数据转换为订阅者接受的R类型数据。处理者作为数据转换的中介不是必须的。 由以上的接口可以看出,核心在于订阅者可以通过request(long n)方法来控制接收的数据量,达到了实现背压的目的。 参看博文: https://blog.csdn.net/wudaoshihun/article/details/83070086 https://www.cnblogs.com/flydean/p/13935084.html