这是《使用Kotlin开发一个现代的APP》系列文章的第三部分,还没看过前2部分的,可以先看一下:
【译】使用Kotlin从零开始写一个现代Android 项目-Part1
【译】使用Kotlin从零开始写一个现代Android 项目-Part2
正文开始!
关于RxJava,一个广泛的概念是-RxJava是用于异步编程的API的Java实现,它具有可观察流和响应式的API。实际上,它是这三个概念的结合:观察者模式、迭代器模式和函数式编程。这里也有其他编程语言实现的库,如:RxSwift、RxJs 、RxNet等。
我RxJava上手很难,有时,它确实很令人困惑,如果实施不当,可能会给您带来一些问题。尽管如此,我们还是值得花时间学习它。我将尝试通过简单的步骤来解释RxJava。
首先,让我们回答一些简单的问题,当您开始阅读有关RxJava时,可能会问自己:
答案是否定的,RxJava只是可以在Android开发中使用的又一个库。如果使用Kotlin开发,它也不是必须的,我希望你明白我说的,它只一个很帮助你的库,就像你使用的所以其他库一样。
你可以直接从RxJava2开始,不过,作为Android开发人员,知道这两种情况对你还是有好处的,因为你可能会参与维护其他人的RxJava1代码。
RxJava能用在任何Java开发平台,不仅仅是Android,比如,对于后端开发来说,RxJava 可以与Spring等框架一起使用,RxAndroid是一个库,其中包含在Android中使用RxJava所需的库。因此,如果要在Android开发中使用RxJava,则必须再添加RxAndroid。稍后,我将解释RxAndroid基于RxJava所添加的内容。
我们没有必要另外再添加一个Rx 库了,因为Kotlin与Java是完全兼容的,这里确实有一个RxKotin库:https://github.com/ReactiveX/... ,不过该库是在RxJava之上编写的。它只是将Kotlin功能添加到RxJava。您可以将RxJava与Kotlin一起使用,而无需使用RxKotlin库。为了简单起见,在这一部分中我将不使用RxKotlin。
要使用RxJava,你需要在build.gradle
中添加如下代码:
dependencies { ... implementation "io.reactivex.rxjava2:rxjava:2.1.8" implementation "io.reactivex.rxjava2:rxandroid:2.0.1" ... }
然后,点击sync
,下载Rxjava库。
我想把RxJava分为以下三部分:
Observables
和 Observers
Observables
和 Observers
我们已经解释了这种模式。您可以将Observable视为数据的源(被观察者
),将Observer视为接收数据的源(观察者
)。
有很多创建Observables的方式,最简单的方法是使用Observable.just()
来获取一个项目并创建Observable来发射该项目。
让我们转到GitRepoRemoteDataSource
类并更改getRepositories
方法,以返回Observable:
class GitRepoRemoteDataSource { fun getRepositories() : Observable<ArrayList<Repository>> { var arrayList = ArrayList<Repository>() arrayList.add(Repository("First from remote", "Owner 1", 100, false)) arrayList.add(Repository("Second from remote", "Owner 2", 30, true)) arrayList.add(Repository("Third from remote", "Owner 3", 430, false)) return Observable.just(arrayList).delay(2,TimeUnit.SECONDS) } }
Observable <ArrayList <Repository >>
表示Observable发出Repository对象的数组列表。如果要创建发出Repository对象的Observable <Repository>,则应使用Observable.from(arrayList)
。
.delay(2,TimeUnit.SECONDS)
表示延迟2s后才开始发射数据。
但是,等等!我们并没有高数Observable何时发射数据啊?Observables通常在一些Observer订阅后就开始发出数据。
请注意,我们不再需要以下接口了
interface OnRepoRemoteReadyCallback { fun onRemoteDataReady(data: ArrayList<Repository>) }
在GitRepoLocalDataSource:
类中做同样的更改
class GitRepoLocalDataSource { fun getRepositories() : Observable<ArrayList<Repository>> { var arrayList = ArrayList<Repository>() arrayList.add(Repository("First From Local", "Owner 1", 100, false)) arrayList.add(Repository("Second From Local", "Owner 2", 30, true)) arrayList.add(Repository("Third From Local", "Owner 3", 430, false)) return Observable.just(arrayList).delay(2, TimeUnit.SECONDS) } fun saveRepositories(arrayList: ArrayList<Repository>) { //todo save repositories in DB } }
同样的,也不需要这个接口了:
interface OnRepoLocalReadyCallback { fun onLocalDataReady(data: ArrayList<Repository>) }
现在,我们需要在repository
中返回Observable
class GitRepoRepository(private val netManager: NetManager) { private val localDataSource = GitRepoLocalDataSource() private val remoteDataSource = GitRepoRemoteDataSource() fun getRepositories(): Observable<ArrayList<Repository>> { netManager.isConnectedToInternet?.let { if (it) { //todo save those data to local data store return remoteDataSource.getRepositories() } } return localDataSource.getRepositories() } }
如果网络已连接,我们从远程数据源返回Observable,否则,从本地数据源返回Observable,同样的,我们也不再需要OnRepositoryReadyCallback
接口。
如你所料,我们需要更改在MainViewModel中获取数据的方式。现在我们应该从gitRepoRepository
获取Observable并订阅它。一旦我们向Observer订阅了该Observable,Observable将开始发出数据:
class MainViewModel(application: Application) : AndroidViewModel(application) { ... fun loadRepositories() { isLoading.set(true) gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{ override fun onSubscribe(d: Disposable) { //todo } override fun onError(e: Throwable) { //todo } override fun onNext(data: ArrayList<Repository>) { repositories.value = data } override fun onComplete() { isLoading.set(false) } }) } }
一旦Observer订阅了Observable,onSubscribe
方法将被调用,主要onSubscribe
的参数Disposable
,稍后将讲到它。
每当Observable发出数据时,将调用onNext()
方法。当Observable完成s数据发射时,onComplete()
将被调用一次。之后,Observable终止。
如果发生某些异常,onError()
方法将被回调,然后Observable终止。这意味着Observable将不再发出数据,因此onNext()
不会被调用,也不会调用onComplete()
。
另外,请注意。如果尝试订阅已终止的Observable,则将收到IllegalStateException
。
那么,RxJava如何帮助我们?
onError()
方法中返回,因此我们可以向用户显示适当的错误消息。我们再一次看一下ViewModel的生命周期图
一旦Activity销毁,ViewModel的onCleared
方法将被调用,在onCleared
方法中,我们需要取消所有订阅
class MainViewModel(application: Application) : AndroidViewModel(application) { ... lateinit var disposable: Disposable fun loadRepositories() { isLoading.set(true) gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{ override fun onSubscribe(d: Disposable) { disposable = d } override fun onError(e: Throwable) { //if some error happens in our data layer our app will not crash, we will // get error here } override fun onNext(data: ArrayList<Repository>) { repositories.value = data } override fun onComplete() { isLoading.set(false) } }) } override fun onCleared() { super.onCleared() if(!disposable.isDisposed){ disposable.dispose() } } }
我们可以优化一下上面的代码:
首先,使用DisposableObserver
替换Observer
,它实现了Disposable并且有dispose()
方法,我们不再需要onSubscribe()
方法,因为我们可以直接在DisposableObserver实例上调用dispose()
。
第二步,替换掉返回Void的.subscribe()
方法,使用.subscribeWith()
方法,他能返回指定的Observer
class MainViewModel(application: Application) : AndroidViewModel(application) { ... lateinit var disposable: Disposable fun loadRepositories() { isLoading.set(true) disposable = gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() { override fun onError(e: Throwable) { // todo } override fun onNext(data: ArrayList<Repository>) { repositories.value = data } override fun onComplete() { isLoading.set(false) } }) } override fun onCleared() { super.onCleared() if(!disposable.isDisposed){ disposable.dispose() } } }
上面的代码还可以继续优化:
我们保存了一个Disposable实例,因此,我们才可以在onCleared()
回调中调用dispose()
,但是等等!我们需要为每一个调用都这样做吗?如果有10个回调,那么我们得保存10个实例,在onCleared()
中取消10次订阅?显然不可能,这里有更好的方法,我们应该将它们全部保存在一个存储桶中,并在调用onCleared()
方法时,将它们全部一次处理。我们可以使用CompositeDisposable
。
CompositeDisposable
:可容纳多个Disposable的容器
因此,每次创建一个Disposable,都需要将其添加到CompositeDisposable
中:
class MainViewModel(application: Application) : AndroidViewModel(application) { ... private val compositeDisposable = CompositeDisposable() fun loadRepositories() { isLoading.set(true) compositeDisposable.add(gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() { override fun onError(e: Throwable) { //if some error happens in our data layer our app will not crash, we will // get error here } override fun onNext(data: ArrayList<Repository>) { repositories.value = data } override fun onComplete() { isLoading.set(false) } })) } override fun onCleared() { super.onCleared() if(!compositeDisposable.isDisposed){ compositeDisposable.dispose() } } }
感谢Kotlin的扩展函数,我们还可以更进一步:
与C#和Gosu相似,Kotlin提供了使用新功能扩展类的能力,而不必继承该类,也就是扩展函数。
让我们创建一个新的包,叫做extensions
,并且添加一个新的文件RxExtensions.kt
import io.reactivex.disposables.CompositeDisposable import io.reactivex.disposables.Disposable operator fun CompositeDisposable.plusAssign(disposable: Disposable) { add(disposable) }
现在我们可以使用+ =
符号将Disposable对象添加到CompositeDisposable实例:
class MainViewModel(application: Application) : AndroidViewModel(application) { ... private val compositeDisposable = CompositeDisposable() fun loadRepositories() { isLoading.set(true) compositeDisposable += gitRepoRepository.getRepositories().subscribeWith(object : DisposableObserver<ArrayList<Repository>>() { override fun onError(e: Throwable) { //if some error happens in our data layer our app will not crash, we will // get error here } override fun onNext(data: ArrayList<Repository>) { repositories.value = data } override fun onComplete() { isLoading.set(false) } }) } override fun onCleared() { super.onCleared() if (!compositeDisposable.isDisposed) { compositeDisposable.dispose() } } }
现在,我们运行程序,当你点击Load Data
按钮,2s之后,程序crash,然后,如果查看日志,您将看到onNext
方法内部发生错误,并且异常的原因是:
java.lang.IllegalStateException: Cannot invoke setValue on a background thread
为何会发生这个异常?
RxJava附带有调度器(Schedulers),使我们可以选择在哪个线程代码上执行。更准确地说,我们可以选择使用subscribeOn()
方在哪个线程执行,observeOn()
方法可以观察哪个线程观察者。通常情况下,我们所有的数据层代码都应该在后台线程执行,例如,如果我们使用Schedulers.newThread()
,每当我们调用它时,调度器都会给我们分配一个新的线程,为了简单起见,Scheduler中还有其他一些方法,我将不在本博文中介绍。
可能您已经知道所有UI代码都是在Android 主线程上完成的。 RxJava是Java库,它不了解Android主线程,这就是我们使用RxAndroid的原因。 RxAndroid使我们可以选择Android Main线程作为执行代码的线程。显然,我们的Observer应该在Android Main线程上运行。
让我们更改一下代码:
... fun loadRepositories() { isLoading.set(true) compositeDisposable += gitRepoRepository .getRepositories() .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(object : DisposableObserver<ArrayList<Repository>>() { ... }) } ...
然后再运行代码,一切都正常了,nice~
这里还有一些其他的observable 类型
onSuccess()
事件或者异常 Observable<T>
一样,不发射数据,或者发射n个数据,或者发射异常,但是Observable不支持背压,而Flowable却支持。为了记住一些概念,我喜欢将它们与现实中的一些例子类比
把它类比成一个通道,如果你向通道中塞入瓶颈能够接受的最多的商品,这将会变得很糟,这里也是同样的,有时,你的观察者无法处理其收到的事件数量,因此需要放慢速度。
你可以看看RxJava 关于背压的文档:https://github.com/ReactiveX/...
RxJava中,最牛逼的就是它的操作符了,仅用一行代码即可在RxJava中解决一些通常需要10行或更多行的问题。这些是操作符可以帮我们做的:
我给你举一个例子,让我们将数据保存到GitRepoLocalDataSource中。因为我们正在保存数据,所以我们需要Completable来模拟它。假设我们还想模拟1秒的延迟。天真的方法是:
fun saveRepositories(arrayList: ArrayList<Repository>): Completable { return Completable.complete().delay(1,TimeUnit.SECONDS) }
为什么说天真?
Completable.complete()
返回一个Completable实例,该实例在订阅后立即完成。
一旦Completable 完成后,它将终止。因此,之后将不执行任何运算符(延迟是运算符之一)。在这种情况下,我们的Completable不会有任何延迟。让我们找解决方法:
fun saveRepositories(arrayList: ArrayList<Repository>): Completable { return Single.just(1).delay(1,TimeUnit.SECONDS).toCompletable() }
为什么是这种方式?
Single.just(1)
创建一个Single实例,并且仅发射一个数字1,因为我们用了delay(1,TimeUnit.SECONDS)
,因此发射操作延迟1s。
toCompletable()
返回一个Completable,它丢弃Single的结果,并在此Single调用onSuccess
时调用onComplete
。
因此,上面的代码将返回Completable,并且1s后调用onComplete()
。
现在,我们应该更改我们的GitRepoRepository。让我们回顾一下逻辑。我们检查互联网连接。如果有互联网连接,我们从远程数据源获取数据,将其保存在本地数据源中并返回数据。否则,我们仅从本地数据源获取数据。看一看:
fun getRepositories(): Observable<ArrayList<Repository>> { netManager.isConnectedToInternet?.let { if (it) { return remoteDataSource.getRepositories().flatMap { return@flatMap localDataSource.saveRepositories(it) .toSingleDefault(it) .toObservable() } } } return localDataSource.getRepositories() }
使用了.flatMap
,一旦remoteDataSource.getRepositories()
发射数据,该项目将被映射到发出相同项目的新Observable。我们从Completable创建的新Observable发射的相同项目保存在本地数据存储中,并且将其转换为发出相同发射项的Single。因为我们需要返回Observable,所以我们必须将Single转换为Observable。
很疯狂,huh? 想象一下RxJava还能为我们做些啥!
RxJava是一个非常有用的工具,去使用它,探索它,我相信你会爱上它的!
以上就是本文得全部内容,下一篇文章将是本系列的最后一篇文章,敬请期待!
本系列已更新完毕:
【译】使用Kotlin从零开始写一个现代Android 项目-Part1
【译】使用Kotlin从零开始写一个现代Android 项目-Part2
【译】使用Kotlin从零开始写一个现代Android 项目-Part3
【译】使用Kotlin从零开始写一个现代Android 项目-Part4
文章首发于公众号:「 技术最TOP 」
,每天都有干货文章持续更新,可以微信搜索「 技术最TOP 」
第一时间阅读,回复【思维导图】【面试】【简历】有我准备一些Android进阶路线、面试指导和简历模板送给你