Flow 可以使用传统的 try…catch 来捕获异常:
fun main() = runBlocking { flow { emit(1) try { throw RuntimeException() } catch (e: Exception) { e.stackTrace } }.onCompletion { println("Done") } .collect { println(it) } }
另外,也可以使用 catch 操作符来捕获异常。
上一篇文章Flow VS RxJava2曾讲述过 onCompletion 操作符。
但是 onCompletion 不能捕获异常,只能用于判断是否有异常。
fun main() = runBlocking { flow { emit(1) throw RuntimeException() }.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") else println("Done") }.collect { println(it) } }
执行结果:
1 Flow completed exceptionally Exception in thread "main" java.lang.RuntimeException ......
catch 操作符可以捕获来自上游的异常
fun main() = runBlocking { flow { emit(1) throw RuntimeException() } .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") else println("Done") } .catch{ println("catch exception") } .collect { println(it) } }
执行结果:
1 Flow completed exceptionally catch exception
上面的代码如果把 onCompletion、catch 交换一下位置,则 catch 操作符捕获到异常后,不会影响到下游。因此,onCompletion 操作符不再打印"Flow completed exceptionally"
fun main() = runBlocking { flow { emit(1) throw RuntimeException() } .catch{ println("catch exception") } .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") else println("Done") } .collect { println(it) } }
执行结果:
1 catch exception Done
catch 操作符用于实现异常透明化处理。例如在 catch 操作符内,可以使用 throw 再次抛出异常、可以使用 emit() 转换为发射值、可以用于打印或者其他业务逻辑的处理等等。
但是,catch 只是中间操作符不能捕获下游的异常,类似 collect 内的异常。
对于下游的异常,可以多次使用 catch 操作符来解决。
对于 collect 内的异常,除了传统的 try…catch 之外,还可以借助 onEach 操作符。把业务逻辑放到 onEach 操作符内,在 onEach 之后是 catch 操作符,最后是 collect()。
fun main() = runBlocking<Unit> { flow { ...... } .onEach { ...... } .catch { ... } .collect() }
像 RxJava 一样,Flow 也有重试的操作符。
如果上游遇到了异常,并使用了 retry 操作符,则 retry 会让 Flow 最多重试 retries 指定的次数。
public fun <T> Flow<T>.retry( retries: Long = Long.MAX_VALUE, predicate: suspend (cause: Throwable) -> Boolean = { true } ): Flow<T> { require(retries > 0) { "Expected positive amount of retries, but had $retries" } return retryWhen { cause, attempt -> attempt < retries && predicate(cause) } }
例如,下面打印了三次"Emitting 1"、“Emitting 2”,最后两次是通过 retry 操作符打印出来的。
fun main() = runBlocking { (1..5).asFlow().onEach { if (it == 3) throw RuntimeException("Error on $it") }.retry(2) { if (it is RuntimeException) { return@retry true } false } .onEach { println("Emitting $it") } .catch { it.printStackTrace() } .collect() }
执行结果:
Emitting 1 Emitting 2 Emitting 1 Emitting 2 Emitting 1 Emitting 2 java.lang.RuntimeException: Error on 3 ......
retry 操作符最终调用的是 retryWhen 操作符。下面的代码跟刚才的执行结果一致:
fun main() = runBlocking { (1..5).asFlow().onEach { if (it == 3) throw RuntimeException("Error on $it") } .onEach { println("Emitting $it") } .retryWhen { cause, attempt -> attempt < 2 } .catch { it.printStackTrace() } .collect() }
因为 retryWhen 操作符的参数是谓词,当谓词返回 true 时才会进行重试。谓词还接收一个 attempt 作为参数表示尝试的次数,该次数是从0开始的。
RxJava 的 do 操作符能够监听 Observables 的生命周期的各个阶段。
Flow 并没有多那么丰富的操作符来监听其生命周期的各个阶段,目前只有 onStart、onCompletion 来监听 Flow 的创建和结束。
fun main() = runBlocking { (1..5).asFlow().onEach { if (it == 3) throw RuntimeException("Error on $it") } .onStart { println("Starting flow") } .onEach { println("On each $it") } .catch { println("Exception : ${it.message}") } .onCompletion { println("Flow completed") } .collect() }
执行结果:
Starting flow On each 1 On each 2 Flow completed Exception : Error on 3
例举他们的使用场景:
比如,在 Android 开发中使用 Flow 创建网络请求时,通过 onStart 操作符调用 loading 动画以及网络请求结束后通过 onCompletion 操作符取消动画。
再比如,在借助这些操作符做一些日志的打印。
fun <T> Flow<T>.log(opName: String) = onStart { println("Loading $opName") }.onEach { println("Loaded $opName : $it") }.onCompletion { maybeErr -> maybeErr?.let { println("Error $opName: $it") } ?: println("Completed $opName") }
该系列的相关文章: