本文将描述RxJava的设计原理,为了简化,本文并非完全参照RxJava的源码,也不讨论使用RxJava的作用,而从实现角度分析RxJava。本文不讨论RxJava的设计来源,具体请参考“函数式编程”的无副作用。
我们来看一个RxJava的一个简单使用示例:
Observable.just(123) .map(new Function<Integer, String>() { @Override public String apply(Integer i) { return "" + i; }}) .doOnNext(new Consumer<String>() { @Override public void accept(String s) { System.out.println("log:" + s + " Thread:" + Thread.currentThread().getName()); }}) .filter(new Filter<String>() { @Override public boolean filter(String s) { return s != null && s.length() > 0; }}) .subscribeOn(Schedules.ASYNC) .observeOn(Schedules.MAIN) .subscribe(new Consumer<String>() { @Override public void accept(String s) { System.out.println("result:" + s + " Thread:" + Thread.currentThread().getName()); }});
运行得到结果:
I/System.out: log:123 Thread:Thread-2
I/System.out: result:123 Thread:main
上述RxJava并非使用官方源库,而是本文自定义的RxJava,也能达到官网RxJava一样的效果。
RxJava并非在调用map、doNext、filter、subscribeOn、ObserveOn等操作符时,立即调用内部的方法,基于函数式编程无副作用理论,我们对其进行包一层Observable,在subscribe的使用,也并非立即去消费对应Observer的内容,也调用上一层Observable对应的Observer。如图所示:
从上图可知,RxJava调用操作符时,并没有直接调用到其内部的方法。它每调用一次操作符就new了一个与之对应的Observable对象,调到最后开始subscribe的时候,就new了一个与之对应的Observer传参,最后调到最开始ObservableJust的时候,就开始进行onNext、onError、onComplete等操作,注意,现在在ObservableJust中最开始调用的是最外层的ObserveObserver的onNext,之后再层层往内调用,最后调用到我们传递的Observer。每次调用时,我们可以对其进行线程切换,如在ObservableSubscribeOn层subscribeOn(ASYN)时,就对后续的操作都放到了子线程中执行,再在ObservableObsereOn层中的onNext时,又可以将线程切换到Main线程。
简单起见,我们简化一下上述:
Observable.just(123) .map(new Function<Integer, String>() { @Override public String apply(Integer i) { return "" + i; }}) .subscribeOn(Schedules.ASYNC) .observeOn(Schedules.MAIN) .subscribe(new Observer<String>() { @Override public void onNext(String s) { System.out.println(s); } @Override public void onComplete() { System.out.println("onComplete"); } @Override public void one rror(Throwable r) { System.out.println("onError"); } @Override public void onSubscribe() { System.out.println("onSubscribe"); } });
针对上述案例,我们抽象出两个接口,1、Observer,2、ObservableSource。通过Observable开始分发事件。
public interface ObservableSource<T> { void subscribe(Observer<T> observer); } public interface Observer<T> { void onNext(T t); void onComplete(); void one rror(Throwable r); void onSubscribe(); } public abstract class Observable<T> implements ObservableSource<T> { public static <T> Observable<T> just(T item) { return new ObservableJust<>(item); } public <R> Observable<R> map(Function<T, R> function) { return (Observable<R>) new ObservableMap<>(this, function); } public Observable<T> subscribeOn(Schedules schedules) { return new ObservableSubscribeOn<>(this, schedules); } public Observable<T> observeOn(Schedules schedules) { return new ObservableObserveOn<>(this, schedules); } public void subscribe(Consumer<T> consumer) { this.subscribe(new LambdaObserver<>(consumer, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.EMPTY_ACTION)); } }
从ObservableJust出发,本源码简化了RxJava官方的源码,如下:
public class ObservableJust<T> extends Observable<T> { private final T value; public ObservableJust(T value) { this.value = value; } @Override public void subscribe(Observer<T> observer) { // 最后调用这里,才开始onNext等 observer.onSubscribe(); try { observer.onNext(value); observer.onComplete(); } catch (Throwable r) { observer.onError(r); } } }
之后就是map
// 主要用来保存srouce Observable,如ObservableJust.map之后,就new了一个ObservableMap,在该ObservableMap中保存了ObservableJust的引用,这就是装饰器模式,可以参考JVM的IOStream源码理解。 // 这样就能在sbscribe的时候,调用source.subscribe了,并进行功能增强,如线程切换等。 public abstract class AbstractObservableWithUpStream<T, R> extends Observable<T> { 是 protected final ObservableSource<T> source; protected AbstractObservableWithUpStream(ObservableSource<T> source) { this.source = source; } } public class ObservableMap<T, R> extends AbstractObservableWithUpStream<T, R>{ private final Function<T, R> function; public ObservableMap(ObservableSource<T> source, Function<T, R> function) { super(source); this.function = function; } @Override public void subscribe(Observer<T> observer) { source.subscribe(new MapObserver<T, R>((Observer<R>) observer, function)); } private static class MapObserver<T, R> extends BasicObserver<T, R> { final Function<T,R> mapper; public MapObserver(Observer<R> actual, Function<T,R> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { R r = mapper.apply(t); // 这里调用了Function中的apply方法,以进行业务能力扩展 actual.onNext(r); } } }
之后subscribeOn
public class ObservableSubscribeOn<T> extends AbstractObservableWithUpStream<T, T> { private final Schedules schedules; protected ObservableSubscribeOn(ObservableSource<T> source, Schedules schedules) { super(source); this.schedules = schedules; } @Override public void subscribe(Observer<T> observer) { // 在subscribe的时候进行线程切换 if (schedules == Schedules.MAIN) { source.subscribe(new SubscribeOnObserver<>(observer)); } else if(schedules == Schedules.ASYNC) { Schedules.executorService.submit(() -> source.subscribe(new SubscribeOnObserver<>(observer))); } } private static class SubscribeOnObserver<T> extends BasicObserver<T, T> { public SubscribeOnObserver(Observer<T> actual) { super(actual); } @Override public void onNext(T t) { actual.onNext(t); } } }
onServeOn
public class ObservableObserveOn<T> extends AbstractObservableWithUpStream<T, T> { private final Schedules schedules; protected ObservableObserveOn(ObservableSource<T> source, Schedules schedules) { super(source); this.schedules = schedules; } @Override public void subscribe(Observer<T> observer) { source.subscribe(new ObserveOnObserver<T>(observer, schedules)); } private static class ObserveOnObserver<T> extends BasicObserver<T, T> { private final Schedules schedules; public ObserveOnObserver(Observer<T> actual, Schedules schedules) { super(actual); this.schedules = schedules; } @Override public void onNext(T t) { // 在onNext的时候切换线程 if (schedules == Schedules.MAIN) { new Handler(Looper.getMainLooper()).post(() -> { actual.onNext(t); }); } else if (schedules == Schedules.ASYNC) { Schedules.executorService.submit(() -> actual.onNext(t)); } } } }
本文通过图解,源码,以及调用示例进行RxJava分析,同时,我们也可以如何自定义操作符,继承Observable,之后构建对应的操作符的Observable类和Observer类。