Flow 库是在 Kotlin Coroutines 1.3.2 发布之后新增的库。
官方文档给予了一句话简单的介绍:
Flow — cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc);
Flow 从文档的介绍来看,它有点类似 RxJava 的 Observable。因为 Observable 也有 Cold 、Hot 之分。
Flow 能够返回多个异步计算的值,例如下面的 flow builder :
flow { for (i in 1..5) { delay(100) emit(i) } }.collect{ println(it) }
其中 Flow 接口,只有一个 collect 函数
public interface Flow<out T> { @InternalCoroutinesApi public suspend fun collect(collector: FlowCollector<T>) }
如果熟悉 RxJava 的话,则可以理解为 collect() 对应subscribe()
,而 emit() 对应onNext()
。
除了刚刚展示的 flow builder 可以用于创建 flow,还有其他的几种方式:
flowOf()
flowOf(1,2,3,4,5) .onEach { delay(100) } .collect{ println(it) }
asFlow()
listOf(1, 2, 3, 4, 5).asFlow() .onEach { delay(100) }.collect { println(it) }
channelFlow()
channelFlow { for (i in 1..5) { delay(100) send(i) } }.collect{ println(it) }
最后的 channelFlow builder 跟 flow builder 是有一定差异的。
flow 是 Cold Stream。在没有切换线程的情况下,生产者和消费者是同步非阻塞的。
channel 是 Hot Stream。而 channelFlow 实现了生产者和消费者异步非阻塞模型。
下面的代码,展示了使用 flow builder 的情况,大致花费1秒:
fun main() = runBlocking { val start = System.currentTimeMillis() flow { for (i in 1..5) { delay(100) emit(i) } }.collect{ delay(100) println(it) } print("cost ${System.currentTimeMillis()-start}") }
使用 channelFlow builder 的情况,大致花费700毫秒:
fun main() = runBlocking { val start = System.currentTimeMillis() channelFlow { for (i in 1..5) { delay(100) send(i) } }.collect{ delay(100) println(it) } print("cost ${System.currentTimeMillis()-start}") }
当然,flow 如果切换线程的话,花费的时间也是大致700毫秒,跟使用 channelFlow builder 效果差不多。
fun main() = runBlocking { val start = System.currentTimeMillis() flow { for (i in 1..5) { delay(100) emit(i) } }.flowOn(Dispatchers.IO) .collect { delay(100) println(it) } print("cost ${System.currentTimeMillis() - start}") }
相比于 RxJava 需要使用 observeOn、subscribeOn 来切换线程,flow 会更加简单。只需使用 flowOn
,下面的例子中,展示了 flow builder 和 map 操作符都会受到 flowOn 的影响。
flow { for (i in 1..5) { delay(100) emit(i) } }.map { it * it }.flowOn(Dispatchers.IO) .collect { println(it) }
而 collect() 指定哪个线程,则需要看整个 flow 处于哪个 CoroutineScope 下。
例如,下面的代码 collect() 则是在 main 线程:
fun main() = runBlocking { flow { for (i in 1..5) { delay(100) emit(i) } }.map { it * it }.flowOn(Dispatchers.IO) .collect { println("${Thread.currentThread().name}: $it") } }
执行结果:
main: 1 main: 4 main: 9 main: 16 main: 25
值得注意的地方,不要使用 withContext() 来切换 flow 的线程。
如果 flow 是在一个挂起函数内被挂起了,那么 flow 是可以被取消的,否则不能取消。
fun main() = runBlocking { withTimeoutOrNull(2500) { flow { for (i in 1..5) { delay(1000) emit(i) } }.collect { println(it) } } println("Done") }
执行结果:
1 2 Done
Flow 的 API 有点类似于 Java Stream 的 API。它也同样拥有 Intermediate Operations、Terminal Operations。
Flow 的 Terminal 运算符可以是 suspend 函数,如 collect、single、reduce、toList 等;也可以是 launchIn 运算符,用于在指定 CoroutineScope 内使用 flow。
@ExperimentalCoroutinesApi // tentatively stable in 1.3.0 public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch { collect() // tail-call }
整理一下 Flow 的 Terminal 运算符