compositeDisposable = new CompositeDisposable(); List<String> list = new ArrayList<>(); list.add("hello"); list.add("world"); list.add("ni"); list.add("hao"); Disposable disposable = Observable.fromIterable(list) .subscribeWith(new DisposableObserver<String>() { @Override public void onNext(@NonNull String s) { LogUtilsKt.logcat("onNext:"+s); } @Override public void one rror(@NonNull Throwable e) { LogUtilsKt.logcat("onError"); } @Override public void onComplete() { LogUtilsKt.logcat("onComplete"); } }); compositeDisposable.add(disposable);
日志输出如下, 从日志中可以看到, 是在主线程中运行的
2021-05-31 15:04:17.985 21851-21851/com.plbear.lxc E/imlog: onNext:hello
2021-05-31 15:04:17.985 21851-21851/com.plbear.lxc E/imlog: onNext:world
2021-05-31 15:04:17.985 21851-21851/com.plbear.lxc E/imlog: onNext:ni
2021-05-31 15:04:17.985 21851-21851/com.plbear.lxc E/imlog: onNext:hao
2021-05-31 15:04:17.985 21851-21851/com.plbear.lxc E/imlog: onComplete
改到子线程中
Disposable disposable = Observable.fromIterable(list) .observeOn(Schedulers.io()) .subscribeWith(new DisposableObserver<String>() { @Override public void onNext(@NonNull String s) { LogUtilsKt.logcat("onNext:"+s); } @Override public void one rror(@NonNull Throwable e) { LogUtilsKt.logcat("onError"); } @Override public void onComplete() { LogUtilsKt.logcat("onComplete"); } });
输出
2021-05-31 15:16:58.204 26646-30647/com.plbear.lxc E/imlog: onNext:hello
2021-05-31 15:16:58.204 26646-30647/com.plbear.lxc E/imlog: onNext:world
2021-05-31 15:16:58.204 26646-30647/com.plbear.lxc E/imlog: onNext:ni
2021-05-31 15:16:58.204 26646-30647/com.plbear.lxc E/imlog: onNext:hao
2021-05-31 15:16:58.204 26646-30647/com.plbear.lxc E/imlog: onComplete
Disposable disposable = Observable.fromIterable(list) .observeOn(Schedulers.io()) .map(new Function<String, String>() { @Override public String apply(@NonNull String s) throws Exception { LogUtilsKt.logcat("map:"+s); return s + "map after"; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableObserver<String>() { @Override public void onNext(@NonNull String s) { LogUtilsKt.logcat("onNext:"+s); } @Override public void one rror(@NonNull Throwable e) { LogUtilsKt.logcat("onError"); } @Override public void onComplete() { LogUtilsKt.logcat("onComplete"); } });
我们上面通过map对结果进行了转换, 并修改运行的线程, 看下输出
2021-05-31 15:21:06.277 31109-2644/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:21:06.277 31109-2644/com.plbear.lxc E/imlog: map:world
2021-05-31 15:21:06.277 31109-2644/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:21:06.277 31109-2644/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:21:06.311 31109-31109/com.plbear.lxc E/imlog: onNext:hellomap after
2021-05-31 15:21:06.311 31109-31109/com.plbear.lxc E/imlog: onNext:worldmap after
2021-05-31 15:21:06.311 31109-31109/com.plbear.lxc E/imlog: onNext:nimap after
2021-05-31 15:21:06.311 31109-31109/com.plbear.lxc E/imlog: onNext:haomap after
2021-05-31 15:21:06.311 31109-31109/com.plbear.lxc E/imlog: onComplete
Disposable disposable = Observable.fromIterable(list) .observeOn(AndroidSchedulers.mainThread()) .map(new Function<String, String>() { @Override public String apply(@NonNull String s) throws Exception { LogUtilsKt.logcat("map:"+s); return s + "map after"; } }) .observeOn(Schedulers.io()) .flatMap(new Function<String, ObservableSource<String>>() { @Override public ObservableSource<String> apply(@NonNull String s) throws Exception { return Observable.just(s + " flatMap"); } }) .subscribeWith(new DisposableObserver<String>() { @Override public void onNext(@NonNull String s) { LogUtilsKt.logcat("onNext:"+s); } @Override public void one rror(@NonNull Throwable e) { LogUtilsKt.logcat("onError"); } @Override public void onComplete() { LogUtilsKt.logcat("onComplete"); } });
看输出
2021-05-31 15:27:38.678 3165-3165/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:27:38.679 3165-3165/com.plbear.lxc E/imlog: map:world
2021-05-31 15:27:38.679 3165-3165/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:27:38.679 3165-3165/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:27:38.679 3165-6960/com.plbear.lxc E/imlog: onNext:hellomap after flatMap
2021-05-31 15:27:38.679 3165-6960/com.plbear.lxc E/imlog: onNext:worldmap after flatMap
2021-05-31 15:27:38.679 3165-6960/com.plbear.lxc E/imlog: onNext:nimap after flatMap
2021-05-31 15:27:38.679 3165-6960/com.plbear.lxc E/imlog: onNext:haomap after flatMap
2021-05-31 15:27:38.680 3165-6960/com.plbear.lxc E/imlog: onComplete
重复发射多次
Disposable disposable = Observable.fromIterable(list) .observeOn(AndroidSchedulers.mainThread()) .map(new Function<String, String>() { @Override public String apply(@NonNull String s) throws Exception { LogUtilsKt.logcat("map:"+s); return s + " -map "; } }) .observeOn(Schedulers.io()) .flatMap(new Function<String, ObservableSource<String>>() { @Override public ObservableSource<String> apply(@NonNull String s) throws Exception { return Observable.just(s + "-flatMap"); } }) .repeat(2) .subscribeWith(new DisposableObserver<String>() { @Override public void onNext(@NonNull String s) { LogUtilsKt.logcat("onNext:"+s); } @Override public void one rror(@NonNull Throwable e) { LogUtilsKt.logcat("onError"); } @Override public void onComplete() { LogUtilsKt.logcat("onComplete"); } });
2021-05-31 15:36:28.582 5157-5157/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:36:28.582 5157-5157/com.plbear.lxc E/imlog: map:world
2021-05-31 15:36:28.582 5157-5157/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:36:28.582 5157-5157/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:36:28.583 5157-10063/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:36:28.583 5157-10063/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:36:28.583 5157-10063/com.plbear.lxc E/imlog: onNext:ni -map -flatMap
2021-05-31 15:36:28.583 5157-10063/com.plbear.lxc E/imlog: onNext:hao -map -flatMap
2021-05-31 15:36:28.591 5157-5157/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:36:28.591 5157-5157/com.plbear.lxc E/imlog: map:world
2021-05-31 15:36:28.591 5157-5157/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:36:28.591 5157-5157/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:36:28.591 5157-10064/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:36:28.591 5157-10064/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:36:28.591 5157-10064/com.plbear.lxc E/imlog: onNext:ni -map -flatMap
2021-05-31 15:36:28.591 5157-10064/com.plbear.lxc E/imlog: onNext:hao -map -flatMap
2021-05-31 15:36:28.591 5157-10064/com.plbear.lxc E/imlog: onComplete
Disposable disposable = Observable.fromIterable(list) .observeOn(AndroidSchedulers.mainThread()) .map(new Function<String, String>() { @Override public String apply(@NonNull String s) throws Exception { LogUtilsKt.logcat("map:"+s); return s + " -map "; } }) .observeOn(Schedulers.io()) .flatMap(new Function<String, ObservableSource<String>>() { @Override public ObservableSource<String> apply(@NonNull String s) throws Exception { return Observable.just(s + "-flatMap"); } }) .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception { return objectObservable.delay(2, TimeUnit.SECONDS); } }) .subscribeWith(new DisposableObserver<String>() { @Override public void onNext(@NonNull String s) { LogUtilsKt.logcat("onNext:"+s); } @Override public void one rror(@NonNull Throwable e) { LogUtilsKt.logcat("onError"); } @Override public void onComplete() { LogUtilsKt.logcat("onComplete"); } });
2021-05-31 15:41:09.082 8172-8172/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:41:09.083 8172-8172/com.plbear.lxc E/imlog: map:world
2021-05-31 15:41:09.083 8172-8172/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:41:09.083 8172-11269/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:41:09.083 8172-8172/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:41:09.083 8172-11269/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:41:09.083 8172-11269/com.plbear.lxc E/imlog: onNext:ni -map -flatMap
2021-05-31 15:41:09.084 8172-11269/com.plbear.lxc E/imlog: onNext:hao -map -flatMap
2021-05-31 15:41:11.086 8172-8172/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:41:11.087 8172-8172/com.plbear.lxc E/imlog: map:world
2021-05-31 15:41:11.087 8172-8172/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:41:11.087 8172-8172/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:41:11.087 8172-11269/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:41:11.088 8172-11269/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:41:11.088 8172-11269/com.plbear.lxc E/imlog: onNext:ni -map -flatMap
2021-05-31 15:41:11.088 8172-11269/com.plbear.lxc E/imlog: onNext:hao -map -flatMap
从上面日志可以看到, repeatWhen会不断的重复执行.
延迟发射
Disposable disposable = Observable.fromIterable(list) .observeOn(AndroidSchedulers.mainThread()) .map(new Function<String, String>() { @Override public String apply(@NonNull String s) throws Exception { LogUtilsKt.logcat("map:"+s); return s + " -map "; } }) .observeOn(Schedulers.io()) .delay(1,TimeUnit.SECONDS) .flatMap(new Function<String, ObservableSource<String>>() { @Override public ObservableSource<String> apply(@NonNull String s) throws Exception { return Observable.just(s + "-flatMap"); } }) .subscribeWith(new DisposableObserver<String>() { @Override public void onNext(@NonNull String s) { LogUtilsKt.logcat("onNext:"+s); } @Override public void one rror(@NonNull Throwable e) { LogUtilsKt.logcat("onError"); } @Override public void onComplete() { LogUtilsKt.logcat("onComplete"); } });
2021-05-31 15:46:41.644 8178-8178/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:46:41.644 8178-8178/com.plbear.lxc E/imlog: map:world
2021-05-31 15:46:41.644 8178-8178/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:46:41.644 8178-8178/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:46:42.645 8178-13359/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:46:42.645 8178-13359/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:46:42.645 8178-13359/com.plbear.lxc E/imlog: onNext:ni -map -flatMap
2021-05-31 15:46:42.645 8178-13359/com.plbear.lxc E/imlog: onNext:hao -map -flatMap
2021-05-31 15:46:42.645 8178-13359/com.plbear.lxc E/imlog: onComplete
当发生时候后重试, 可以设置重试的次数
Disposable disposable = Observable.fromIterable(list) .observeOn(AndroidSchedulers.mainThread()) .map(new Function<String, String>() { @Override public String apply(@NonNull String s) throws Exception { LogUtilsKt.logcat("map:" + s); return s + " -map "; } }) .observeOn(Schedulers.io()) .delay(1, TimeUnit.SECONDS) .flatMap(new Function<String, ObservableSource<String>>() { @Override public ObservableSource<String> apply(@NonNull String s) throws Exception { if (s.contains("ni")) { throw new RuntimeException("ni exception"); } return Observable.just(s + "-flatMap"); } }) .retry(2) .subscribeWith(new DisposableObserver<String>() { @Override public void onNext(@NonNull String s) { LogUtilsKt.logcat("onNext:" + s); } @Override public void one rror(@NonNull Throwable e) { LogUtilsKt.logcat("onError"); } @Override public void onComplete() { LogUtilsKt.logcat("onComplete"); } });
2021-05-31 15:52:32.740 14532-14532/com.plbear.lxc E/imlog: apple
2021-05-31 15:52:34.974 14532-14532/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:52:34.975 14532-14532/com.plbear.lxc E/imlog: map:world
2021-05-31 15:52:34.975 14532-14532/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:52:34.975 14532-14532/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:52:35.977 14532-15969/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:52:35.977 14532-15969/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:52:35.980 14532-14532/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:52:35.980 14532-14532/com.plbear.lxc E/imlog: map:world
2021-05-31 15:52:35.980 14532-14532/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:52:35.980 14532-14532/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:52:36.981 14532-15973/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:52:36.983 14532-15973/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:52:36.984 14532-14532/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:52:36.985 14532-14532/com.plbear.lxc E/imlog: map:world
2021-05-31 15:52:36.985 14532-14532/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:52:36.985 14532-14532/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:52:37.986 14532-15984/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:52:37.987 14532-15984/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:52:37.987 14532-15984/com.plbear.lxc E/imlog: one rror
retryWhen
当遇到失败的时候,就一直重试
Disposable disposable = Observable.fromIterable(list) .observeOn(AndroidSchedulers.mainThread()) .map(new Function<String, String>() { @Override public String apply(@NonNull String s) throws Exception { LogUtilsKt.logcat("map:" + s); return s + " -map "; } }) .observeOn(Schedulers.io()) .delay(1, TimeUnit.SECONDS) .flatMap(new Function<String, ObservableSource<String>>() { @Override public ObservableSource<String> apply(@NonNull String s) throws Exception { if (s.contains("ni")) { throw new RuntimeException("ni exception"); } return Observable.just(s + "-flatMap"); } }) .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception { return throwableObservable.delay(1, TimeUnit.SECONDS); } }) .subscribeWith(new DisposableObserver<String>() { @Override public void onNext(@NonNull String s) { LogUtilsKt.logcat("onNext:" + s); } @Override public void one rror(@NonNull Throwable e) { LogUtilsKt.logcat("onError"); } @Override public void onComplete() { LogUtilsKt.logcat("onComplete"); } });
合并多个请求
Disposable disposable = Observable.zip(Observable.fromIterable(list), Observable.fromIterable(list2), new BiFunction<String, String, String>() { @NonNull @Override public String apply(@NonNull String s, @NonNull String s2) throws Exception { LogUtilsKt.logcat("apply:"+s+":"+s2); return s + s; } }).subscribeWith(new DisposableObserver<String>(){ @Override public void onNext(@NonNull String s) { LogUtilsKt.logcat("onNext"+s); } @Override public void one rror(@NonNull Throwable e) { } @Override public void onComplete() { } });
zipWith
Disposable disposable = Observable.fromIterable(list) .zipWith(Observable.fromIterable(list2), new BiFunction<String, String, String>() { @NonNull @Override public String apply(@NonNull String o, @NonNull String s) throws Exception { LogUtilsKt.logcat("apply:" + s + ":" + s); return o + ":" + s; } }) .subscribeWith(new DisposableObserver<String>() { @Override public void onNext(@NonNull String s) { LogUtilsKt.logcat("onNext" + s); } @Override public void one rror(@NonNull Throwable e) { } @Override public void onComplete() { } });
输出
2021-05-31 19:37:32.812 28110-28110/com.plbear.lxc E/imlog: apple
2021-05-31 19:37:34.166 28110-28110/com.plbear.lxc E/imlog: apply:nihao:nihao
2021-05-31 19:37:34.166 28110-28110/com.plbear.lxc E/imlog: onNexthello:nihao