Java教程

【28】RxJava模式与原理

本文主要是介绍【28】RxJava模式与原理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

(1)一个人只要自己不放弃自己,整个世界也不会放弃你.
(2)天生我才必有大用
(3)不能忍受学习之苦就一定要忍受生活之苦,这是多么痛苦而深刻的领悟.
(4)做难事必有所得
(5)精神乃真正的刀锋
(6)战胜对手有两次,第一次在内心中.
(7)好好活就是做有意义的事情.
(8)亡羊补牢,为时未晚
(9)科技领域,没有捷径与投机取巧。
(10)有实力,一年365天都是应聘的旺季,没实力,天天都是应聘的淡季。
(11)基础不牢,地动天摇
(12)编写实属不易,若喜欢或者对你有帮助记得点赞+关注或者收藏哦~

RxJava模式与原理

文章目录

  • RxJava模式与原理
    • 1.标准观察者与RxJava观察者
      • 1.1标准的观察者设计模式
        • 1.1.1生活中案例
        • 1.1.2生活中案例代码实现
      • 1.2扩展的RxJava观察者设计模式
        • 1.2.1RxJava Hook点
        • 1.2.2RxJava Hook机制
        • 1.2.3RxJava观察者模式
          • 1.2.3.1 Observer源码看看
          • 1.2.3.2 Observable创建过程源码分析
          • 1.2.3.3 subscribe订阅过程源码分析
          • 1.2.3.3Observable创建过程时序图
          • 1.2.3.4Observable与Observer订阅过程时序图
        • 1.2.4.标准观察者设计模式与RxJava观察者设计模式对比
    • 2.map变换操作符原理
    • 3.装饰模型
    • 4.背压

1.标准观察者与RxJava观察者

1.1标准的观察者设计模式

1.1.1生活中案例

(1)微信公众号与关注公众号的用户
(2)是一个被观察者有多个观察者的情况

在这里插入图片描述

1.1.2生活中案例代码实现

(1)抽象被观察者角色

public interface Observable {

    //关注 添加观察者
    void addObServer(Observer observer);
    //取消关注 删除观察者
    void removeObserver(Observer observer);
    //被观察者发出了改变通知观察者
    void notifyObservers();

    //被观察者发布一条消息的功能
    void pushMessage(String message);
}

(2)抽象观察者角色

public interface Observer {

    //被观察者改变了,收到改变通知,观察者做出相应响应
    void update(String message);
}

(3)具体被观察者角色

public class WechatServerObservable implements Observable{
    /**
     * 容器管理观察者
     */
    private List<Observer> observerList = new ArrayList<>();
    private String         message;

    @Override
    public void addObServer(Observer observer) {
        observerList.add(observer);
    }

    @Override
    public void removeObserver(Observer observer) {
        if(null != observerList){
           observerList.remove(observer);
        }
    }

    @Override
    public void notifyObservers() {
        for(Observer observer : observerList){
            observer.update(message);
        }
    }

    @Override
    public void pushMessage(String message) {
        this.message = message;

        notifyObservers();
    }
}

(4)具体观察者角色

public class Person implements Observer{

    private String name;
    private String message;

    public Person(String name) {
        this.name = name;
    }

    @Override
    public void update(String message) {
        this.message = message;
        readMessage();
    }

    private void readMessage(){
        System.out.println(String.format("%s收到了推送消息:%s",name,message));
    }
}

(5)客户端

public class ObserverModelActivity extends AppCompatActivity {

    @InjectView(R.id.btn_observer_test)
    private Button btn_observer_test;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_observer_model);
        InjectUtils.injectViewAndEvent(this);

    }

    @OnClick(R.id.btn_observer_test)
    public void onViewClick(View view){
        testObserverModel();
    }

    public void testObserverModel(){
        String msg = "重大消息:国家改革委发布智慧农业转型号召";

        //1.被观察者
        Observable observable = new WechatServerObservable();

        //2.观察者
        Observer observer1 = new Person("张三");
        Observer observer2 = new Person("张化");
        Observer observer3 = new Person("张丽");
        Observer observer4 = new Person("张雪");

        observable.addObServer(observer1);
        observable.addObServer(observer2);
        observable.addObServer(observer3);
        observable.addObServer(observer4);

        observable.pushMessage(msg);
    }
}

1.2扩展的RxJava观察者设计模式

1.2.1RxJava Hook点

(1)什么时候用到Hook?

  • 整个项目都在使用RxJava,想对RxJava做监听,此时就会使用到Hook技术

在这里插入图片描述

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    
        @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
  • RxJavaPlugins.onAssembly为全局RxJava的Hook,create与map还有其他操作符都有这样一个方法。
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
  • 可以让程序员插入自己的Hook,即先执行程序员自己写的Function,然后再执行RxJava自身的Hook.

  • 如何让onObservableAssembly不为空,满足程序员Hook先执行的条件呢?可以查看此值唯一的赋值处.

    public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
        if (lockdown) {
            throw new IllegalStateException("Plugins can't be changed anymore");
        }
        RxJavaPlugins.onObservableAssembly = onObservableAssembly;
    }
  • 即直接调用setOnObservableAssembly函数设置一个值就可以了。
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_source1);

        RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
            @Override
            public Observable apply(Observable observable) throws Exception {
                Log.d(Flag.TAG, "apply: 整个项目 全局 监听 到底有多少地方使用 RxJava:" + observable);


                //不破坏人家的功能
                return observable;
            }
        });

    }

1.2.2RxJava Hook机制

(1)Hook即钩子
(2)程序在执行过程中,想个办法,先让程序执行自己写的一部分功能,然后再执行正常的程序。

1.2.3RxJava观察者模式

在这里插入图片描述

1.2.3.1 Observer源码看看
public interface Observer<T> {

    /**
     * Provides the Observer with the means of cancelling (disposing) the
     * connection (channel) with the Observable in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(@NonNull T t);

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void one rror(@NonNull Throwable e);

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}

(1)抽象观察者Observer为一个泛型,即在构建具体的观察者时传什么类型就是什么类型
(2)onSubscribe为订阅函数,即在subscribe执行的时候就立即得到执行
(3)onNext拿到上一个事件(卡片或功能)流下来的数据.
(4)onError拿到上一个事件(卡片或功能)流下来的错误数据.
(5)onComplete事件结束

1.2.3.2 Observable创建过程源码分析
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

(1)ObservableOnSubscribe:自定义source

  • io.reactivex.internal.operators.observable.ObservableCreate
public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

(2)create创建的过程即将自定义的source赋给了io.reactivex.internal.operators.observable.ObservableCreate#source成员变量。

1.2.3.3 subscribe订阅过程源码分析
public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call one rror because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

(1)实际上是ObservableCreate.subscribe
(2)将自定义观察者传给了它。
(3)进入到被观察者的io.reactivex.Observable#subscribeActual

protected abstract void subscribeActual(Observer<? super T> observer);

(4)这个方法执行之后,会直接回调到
io.reactivex.internal.operators.observable.ObservableCreate#subscribeActual

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

(5)将自定义观察者丢进来,并创建发射器,并且传入自定义观察者。

 CreateEmitter<T> parent = new CreateEmitter<T>(observer);

(6)然后执行onSubscribe方法,这也就是为什么执行了subscribe方法之后,这个方法马上会得到执行的原因。

observer.onSubscribe(parent);

(7)自定义source将发射器传入进去

source.subscribe(parent);

(8)调发射器的onNext

new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(ObservableEmitter<String> e) throws Exception {
                                //2.2发射器.onNext
                                e.onNext("A");
                            }
                        }

(9)再由发射器调用自定义观察者onNext

(10)整体调用图,即为一个U型结构。

在这里插入图片描述

1.2.3.3Observable创建过程时序图

在这里插入图片描述

1.2.3.4Observable与Observer订阅过程时序图

在这里插入图片描述

1.2.4.标准观察者设计模式与RxJava观察者设计模式对比

(1)在标准的观察者设计模式中,是一个“被观察者”,多个“观察者”,并且需要“被观察者”发出改变通知后,所有的“观察者”才能观察者。

(2)在RxJava观察者设计模式中,是多个“被观察者”,一个“观察者”,并且需要事件起点与终点在“订阅”一次之后,才发出改变通知,终点(观察者)才能观察到。

  • 为什么是多个“被观察者”?

因为可以有多个操作符如flatMap,map操作符,这就意味着有多个观察者

严格意义上来讲RxJava应用的是发布订阅模式。

在这里插入图片描述

(3)RxJava观察者设计模式,没有容器的概念。

2.map变换操作符原理

在这里插入图片描述

(1)对数据进行变换

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

(2)订阅触发之后,调用终点,它是通过ObservableMap.subscribe
io.reactivex.internal.operators.observable.ObservableMap#subscribeActual

  • source是上层事件
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

(3)添加一个MapObserver(终点)包裹

(4)调用了subscribe后

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

(5)io.reactivex.internal.operators.observable.ObservableCreate.CreateEmitter#onNext

(6)io.reactivex.internal.operators.observable.ObservableMap.MapObserver#onNext

(7)io.reactivex.functions.Function#apply

(8)封装包裹拆包裹的过程

在这里插入图片描述

在这里插入图片描述

这即为卡片式思维

(13)map流程分析

在这里插入图片描述

3.装饰模型

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("Derry");
                e.onComplete();
            }
        })

        // ↓ObservableCreate.map   包裹模型中 最里面
        .map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return 45454;
            }
        })

        // ObservableMap.map
        .map(new Function<Integer, Boolean>() {
            @Override
            public Boolean apply(Integer integer) throws Exception {
                return true;
            }
        })

        // ↓包裹模型中最外面   往上走↑的时候在一层 层的剥开
        // ObservableMap.subscribe
        .subscribe(new Observer<Boolean>() {
            @Override
            public void onSubscribe(Disposable d) { }

            @Override
            public void onNext(Boolean bool) {
                Log.d(Flag.TAG, "onNext bool:" + bool);
            }

            @Override
            public void one rror(Throwable e) { }

            @Override
            public void onComplete() { }
        });

在这里插入图片描述

在这里插入图片描述

4.背压

(1)起点到终点发射10000个事件
(2)终点处理不过来,这时候采取背压策略。
(3)即不停的生产产品,生产的速度远远的超过消费的速度。内存会不停的消耗增长。
(4)使用Flowable解决背压问题。

(5)map操作符只能发射一次事件,flapMap可以发射多次事件。

这篇关于【28】RxJava模式与原理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!