(1)一个人只要自己不放弃自己,整个世界也不会放弃你.
(2)天生我才必有大用
(3)不能忍受学习之苦就一定要忍受生活之苦,这是多么痛苦而深刻的领悟.
(4)做难事必有所得
(5)精神乃真正的刀锋
(6)战胜对手有两次,第一次在内心中.
(7)好好活就是做有意义的事情.
(8)亡羊补牢,为时未晚
(9)科技领域,没有捷径与投机取巧。
(10)有实力,一年365天都是应聘的旺季,没实力,天天都是应聘的淡季。
(11)基础不牢,地动天摇
(12)编写实属不易,若喜欢或者对你有帮助记得点赞+关注或者收藏哦~
(1)微信公众号与关注公众号的用户
(2)是一个被观察者有多个观察者的情况
(1)抽象被观察者角色
public interface Observable { //关注 添加观察者 void addObServer(Observer observer); //取消关注 删除观察者 void removeObserver(Observer observer); //被观察者发出了改变通知观察者 void notifyObservers(); //被观察者发布一条消息的功能 void pushMessage(String message); }
(2)抽象观察者角色
public interface Observer { //被观察者改变了,收到改变通知,观察者做出相应响应 void update(String message); }
(3)具体被观察者角色
public class WechatServerObservable implements Observable{ /** * 容器管理观察者 */ private List<Observer> observerList = new ArrayList<>(); private String message; @Override public void addObServer(Observer observer) { observerList.add(observer); } @Override public void removeObserver(Observer observer) { if(null != observerList){ observerList.remove(observer); } } @Override public void notifyObservers() { for(Observer observer : observerList){ observer.update(message); } } @Override public void pushMessage(String message) { this.message = message; notifyObservers(); } }
(4)具体观察者角色
public class Person implements Observer{ private String name; private String message; public Person(String name) { this.name = name; } @Override public void update(String message) { this.message = message; readMessage(); } private void readMessage(){ System.out.println(String.format("%s收到了推送消息:%s",name,message)); } }
(5)客户端
public class ObserverModelActivity extends AppCompatActivity { @InjectView(R.id.btn_observer_test) private Button btn_observer_test; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_observer_model); InjectUtils.injectViewAndEvent(this); } @OnClick(R.id.btn_observer_test) public void onViewClick(View view){ testObserverModel(); } public void testObserverModel(){ String msg = "重大消息:国家改革委发布智慧农业转型号召"; //1.被观察者 Observable observable = new WechatServerObservable(); //2.观察者 Observer observer1 = new Person("张三"); Observer observer2 = new Person("张化"); Observer observer3 = new Person("张丽"); Observer observer4 = new Person("张雪"); observable.addObServer(observer1); observable.addObServer(observer2); observable.addObServer(observer3); observable.addObServer(observer4); observable.pushMessage(msg); } }
(1)什么时候用到Hook?
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; }
可以让程序员插入自己的Hook,即先执行程序员自己写的Function,然后再执行RxJava自身的Hook.
如何让onObservableAssembly不为空,满足程序员Hook先执行的条件呢?可以查看此值唯一的赋值处.
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } RxJavaPlugins.onObservableAssembly = onObservableAssembly; }
@Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_source1); RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() { @Override public Observable apply(Observable observable) throws Exception { Log.d(Flag.TAG, "apply: 整个项目 全局 监听 到底有多少地方使用 RxJava:" + observable); //不破坏人家的功能 return observable; } }); }
(1)Hook即钩子
(2)程序在执行过程中,想个办法,先让程序执行自己写的一部分功能,然后再执行正常的程序。
public interface Observer<T> { /** * Provides the Observer with the means of cancelling (disposing) the * connection (channel) with the Observable in both * synchronous (from within {@link #onNext(Object)}) and asynchronous manner. * @param d the Disposable instance whose {@link Disposable#dispose()} can * be called anytime to cancel the connection * @since 2.0 */ void onSubscribe(@NonNull Disposable d); /** * Provides the Observer with a new item to observe. * <p> * The {@link Observable} may call this method 0 or more times. * <p> * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or * {@link #onError}. * * @param t * the item emitted by the Observable */ void onNext(@NonNull T t); /** * Notifies the Observer that the {@link Observable} has experienced an error condition. * <p> * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or * {@link #onComplete}. * * @param e * the exception encountered by the Observable */ void one rror(@NonNull Throwable e); /** * Notifies the Observer that the {@link Observable} has finished sending push-based notifications. * <p> * The {@link Observable} will not call this method if it calls {@link #onError}. */ void onComplete(); }
(1)抽象观察者Observer为一个泛型,即在构建具体的观察者时传什么类型就是什么类型
(2)onSubscribe为订阅函数,即在subscribe执行的时候就立即得到执行
(3)onNext拿到上一个事件(卡片或功能)流下来的数据.
(4)onError拿到上一个事件(卡片或功能)流下来的错误数据.
(5)onComplete事件结束
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
(1)ObservableOnSubscribe:自定义source
public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; }
(2)create创建的过程即将自定义的source赋给了io.reactivex.internal.operators.observable.ObservableCreate#source成员变量。
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call one rror because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
(1)实际上是ObservableCreate.subscribe
(2)将自定义观察者传给了它。
(3)进入到被观察者的io.reactivex.Observable#subscribeActual
protected abstract void subscribeActual(Observer<? super T> observer);
(4)这个方法执行之后,会直接回调到
io.reactivex.internal.operators.observable.ObservableCreate#subscribeActual
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
(5)将自定义观察者丢进来,并创建发射器,并且传入自定义观察者。
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
(6)然后执行onSubscribe方法,这也就是为什么执行了subscribe方法之后,这个方法马上会得到执行的原因。
observer.onSubscribe(parent);
(7)自定义source将发射器传入进去
source.subscribe(parent);
(8)调发射器的onNext
new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { //2.2发射器.onNext e.onNext("A"); } }
(9)再由发射器调用自定义观察者onNext
(10)整体调用图,即为一个U型结构。
(1)在标准的观察者设计模式中,是一个“被观察者”,多个“观察者”,并且需要“被观察者”发出改变通知后,所有的“观察者”才能观察者。
(2)在RxJava观察者设计模式中,是多个“被观察者”,一个“观察者”,并且需要事件起点与终点在“订阅”一次之后,才发出改变通知,终点(观察者)才能观察到。
因为可以有多个操作符如flatMap,map操作符,这就意味着有多个观察者
严格意义上来讲RxJava应用的是发布订阅模式。
(3)RxJava观察者设计模式,没有容器的概念。
(1)对数据进行变换
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
(2)订阅触发之后,调用终点,它是通过ObservableMap.subscribe
io.reactivex.internal.operators.observable.ObservableMap#subscribeActual
@Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); }
(3)添加一个MapObserver(终点)包裹
(4)调用了subscribe后
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
(5)io.reactivex.internal.operators.observable.ObservableCreate.CreateEmitter#onNext
(6)io.reactivex.internal.operators.observable.ObservableMap.MapObserver#onNext
(7)io.reactivex.functions.Function#apply
(8)封装包裹拆包裹的过程
这即为卡片式思维
(13)map流程分析
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("Derry"); e.onComplete(); } }) // ↓ObservableCreate.map 包裹模型中 最里面 .map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return 45454; } }) // ObservableMap.map .map(new Function<Integer, Boolean>() { @Override public Boolean apply(Integer integer) throws Exception { return true; } }) // ↓包裹模型中最外面 往上走↑的时候在一层 层的剥开 // ObservableMap.subscribe .subscribe(new Observer<Boolean>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Boolean bool) { Log.d(Flag.TAG, "onNext bool:" + bool); } @Override public void one rror(Throwable e) { } @Override public void onComplete() { } });
(1)起点到终点发射10000个事件
(2)终点处理不过来,这时候采取背压策略。
(3)即不停的生产产品,生产的速度远远的超过消费的速度。内存会不停的消耗增长。
(4)使用Flowable解决背压问题。
(5)map操作符只能发射一次事件,flapMap可以发射多次事件。