【 好书分享:《Spring 响应式编程》-- 京东】
RxJava 库 是 Reactive Extensions 的 Java虚拟机实现,近似于观察者模式,迭代器模式,函数式编程的组合。
2.2.1 响应式流 = 观察者 + 迭代器
通过事件分离生产者和消费者。
迭代器模式:不希望生产者在消费者出现之前生产数据的场景。
public interface Iterator<T> { T next(); boolean hasNext(); } // 结合观察者 public interface RxObserver<T> { void onNext(T next); // 通知新值 void onComplete(); // 通知结束 void one rror(Exception e); // 通知出错 }
RxObserver 类似于前面介绍的观察者模式中的 Observer。
对于订阅的内容和订阅者,我们可以定义 Observale 和 Subcriber
同时,我们定义 Subcription 来控制 Observable 和 Subscriber 之间的运行时关系。Subscription 可以检查订阅状态,并且在必要的时候取消订阅。
生产者和消费者之间的Subcription契约如下:
---------| — onNext —> | -----------
| Observable | – onComplete -> | Observer |
--------- | — one rror —> |------------
根据RxJava中的规则,Observable 事件源可以发送0-N个元素,通过声明成功和引发错误来指示结束。
所以 Observable 事件源会对相关联的 Subscriber 多次调用 onNext,最后调用 onComplete 或 one rror。
2.2.2 生产和消费流数据
把 Observable 视为一个事件生成器,在订阅时会给订阅者传播事件。
Observable<String> observable = Obervable.create( new Observable.OnSubcribe<String>(){ @Override public void call(Subcriber<? super String> sub){ sub.onNext("Hello, reactive world!"); sub.onCompleted(); } } ); // 当订阅者出现时立刻会触发 call() // lambda写法 Observable<String> observable = Obervable.create( sub -> { sub.onNext("Hello, reactive world!"); sub.onCompleted(); } );
Observable 是可重用的,每个订阅者在订阅之后就会立刻收到这个消息事件。RxJava 1.2.7开始,Observable的创建因为不安全被弃用,因为它可能生成太多元素导致订阅者负载过多。即这种方法不支持背压。
对应的订阅者代码如下:
Subcriber<String> subscriber = new Subcriber<String>(){ @Override public void onNext(String s){ System.out.println("receive: " + s); } @Override public void onCompleted(){ System.out.println("Done."); } @Override public void one rror(Throwable e){ System.err.println(e); } }
所以现在订阅者 Subcriber 和 消息源 Observable 可以一起工作了。Subcriber 必须要实现Observer观察者的方法,定义 onNext 来响应新事件,定义 onCompleted 来响应流完成,定义 one rror 来响应错误。
最后只要在 Observable 添加订阅关系即可完成这个响应式demo程序。
observable.subscribe(subcriber); // output: // receive: Hello, reactive world! // Done. // 更简化的lambda写法 Observable<String> observable = Obervable.create( sub -> { sub.onNext("Hello, reactive world!"); sub.onCompleted(); } ); observable.subscribe( s -> System.out.println("receive: " + s), () -> System.out.println("Done"), System.err::println; ); // output: // receive: Hello, reactive world! // Done.
Rxjava 库对于创建消息源 Observable 实例很灵活。
Observable.just("1","2","3","4"); Observable.from(new String[]{"A", "B", "C", "D"}); Observable.from(Collections.emptyList()); Observale<String> msg1 = Observable.fromCallable(()->"hello-"); Future<String> future = Executors.newCachedThreadPool().submit(() -> "world"); Observale<String> msg2 = Observable.from(future); //组合多个流 处理顺序按照参数顺序 Observale<String> msg = Observable.concat(msg1, msg2, Observable.just(".")); msg.forEach(System.out::print); // output: hello-world.
虽然有onError信号来处理出错,看起来不用去为异常定义处理程序,但是在发生错误的时候,默认的 Subscriber 实现仍然会抛出一个 rx.exceptions.OnErrorNotImplementedException