在使用 transform 操作符时,可以任意多次调用 emit ,这是 transform 跟 map 最大的区别:
fun main() = runBlocking { (1..5).asFlow() .transform { emit(it * 2) delay(100) emit(it * 4) } .collect { println(it) } }
transform 也可以使用 emit 发射任意值:
fun main() = runBlocking { (1..5).asFlow() .transform { emit(it * 2) delay(100) emit("emit $it") } .collect { println(it) } }
take 操作符只取前几个 emit 发射的值。
fun main() = runBlocking { (1..5).asFlow() .take(2) .collect { println(it) } }
在 Kotlin Coroutines Flow 系列(一) Flow 基本使用 一文最后,我整理了 Flow 相关的 Terminal 操作符。本文介绍 reduce 和 fold 两个操作符。
类似于 Kotlin 集合中的 reduce 函数,能够对集合进行计算操作。
例如,对平方数列求和:
fun main() = runBlocking { val sum = (1..5).asFlow() .map { it * it } .reduce { a, b -> a + b } println(sum) }
例如,计算阶乘:
fun main() = runBlocking { val sum = (1..5).asFlow().reduce { a, b -> a * b } println(sum) }
也类似于 Kotlin 集合中的 fold 函数,fold 也需要设置初始值。
fun main() = runBlocking { val sum = (1..5).asFlow() .map { it * it } .fold(0) { a, b -> a + b } println(sum) }
在上述代码中,初始值为0就类似于使用 reduce 函数实现对平方数列求和。
而对于计算阶乘:
fun main() = runBlocking { val sum = (1..5).asFlow().fold(1) { a, b -> a * b } println(sum) }
初始值为1就类似于使用 reduce 函数实现计算阶乘。
zip 是可以将2个 flow 进行合并的操作符。
fun main() = runBlocking { val flowA = (1..5).asFlow() val flowB = flowOf("one", "two", "three","four","five") flowA.zip(flowB) { a, b -> "$a and $b" } .collect { println(it) } }
执行结果:
1 and one 2 and two 3 and three 4 and four 5 and five
zip 操作符会把 flowA 中的一个 item 和 flowB 中对应的一个 item 进行合并。即使 flowB 中的每一个 item 都使用了 delay() 函数,在合并过程中也会等待 delay() 执行完后再进行合并。
fun main() = runBlocking { val flowA = (1..5).asFlow() val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(100) } val time = measureTimeMillis { flowA.zip(flowB) { a, b -> "$a and $b" } .collect { println(it) } } println("Cost $time ms") }
执行结果:
1 and one 2 and two 3 and three 4 and four 5 and five Cost 561 ms
如果 flowA 中 item 个数大于 flowB 中 item 个数:
fun main() = runBlocking { val flowA = (1..6).asFlow() val flowB = flowOf("one", "two", "three","four","five") flowA.zip(flowB) { a, b -> "$a and $b" } .collect { println(it) } }
执行合并后新的 flow 的 item 个数 = 较小的 flow 的 item 个数。
执行结果:
1 and one 2 and two 3 and three 4 and four 5 and five
combine 虽然也是合并,但是跟 zip 不太一样。
使用 combine 合并时,每次从 flowA 发出新的 item ,会将其与 flowB 的最新的 item 合并。
fun main() = runBlocking { val flowA = (1..5).asFlow().onEach { delay(100) } val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) } flowA.combine(flowB) { a, b -> "$a and $b" } .collect { println(it) } }
执行结果:
1 and one 2 and one 3 and one 3 and two 4 and two 5 and two 5 and three 5 and four 5 and five
其实,flattenMerge 不会组合多个 flow ,而是将它们作为单个流执行。
fun main() = runBlocking { val flowA = (1..5).asFlow() val flowB = flowOf("one", "two", "three","four","five") flowOf(flowA,flowB) .flattenConcat() .collect{ println(it) } }
执行结果:
1 2 3 4 5 one two three four five
为了能更清楚地看到 flowA、flowB 作为单个流的执行,对他们稍作改动。
fun main() = runBlocking { val flowA = (1..5).asFlow().onEach { delay(100) } val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) } flowOf(flowA,flowB) .flattenMerge(2) .collect{ println(it) } }
执行结果:
1 one 2 3 two 4 5 three four five
flatMapConcat、flatMapMerge 类似于 RxJava 的 concatMap、flatMap 操作符。
flatMapConcat 由 map、flattenConcat 操作符实现。
@FlowPreview public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> = map(transform).flattenConcat()
在调用 flatMapConcat 后,collect 函数在收集新值之前会等待 flatMapConcat 内部的 flow 完成。
fun currTime() = System.currentTimeMillis() var start: Long = 0 fun main() = runBlocking { (1..5).asFlow() .onStart { start = currTime() } .onEach { delay(100) } .flatMapConcat { flow { emit("$it: First") delay(500) emit("$it: Second") } } .collect { println("$it at ${System.currentTimeMillis() - start} ms from start") } }
执行结果:
1: First at 114 ms from start 1: Second at 619 ms from start 2: First at 719 ms from start 2: Second at 1224 ms from start 3: First at 1330 ms from start 3: Second at 1830 ms from start 4: First at 1932 ms from start 4: Second at 2433 ms from start 5: First at 2538 ms from start 5: Second at 3041 ms from start
flatMapMerge 由 map、flattenMerge 操作符实现。
@FlowPreview public fun <T, R> Flow<T>.flatMapMerge( concurrency: Int = DEFAULT_CONCURRENCY, transform: suspend (value: T) -> Flow<R> ): Flow<R> = map(transform).flattenMerge(concurrency)
flatMapMerge 是顺序调用内部代码块,并且并行地执行 collect 函数。
fun currTime() = System.currentTimeMillis() var start: Long = 0 fun main() = runBlocking { (1..5).asFlow() .onStart { start = currTime() } .onEach { delay(100) } .flatMapMerge { flow { emit("$it: First") delay(500) emit("$it: Second") } } .collect { println("$it at ${System.currentTimeMillis() - start} ms from start") } }
执行结果:
1: First at 116 ms from start 2: First at 216 ms from start 3: First at 319 ms from start 4: First at 422 ms from start 5: First at 525 ms from start 1: Second at 618 ms from start 2: Second at 719 ms from start 3: Second at 822 ms from start 4: Second at 924 ms from start 5: Second at 1030 ms from start
flatMapMerge 操作符有一个参数 concurrency ,它默认使用DEFAULT_CONCURRENCY
,如果想更直观地了解 flatMapMerge 的并行,可以对这个参数进行修改。例如改成2,就会发现不一样的执行结果。
当发射了新值之后,上个 flow 就会被取消。
fun currTime() = System.currentTimeMillis() var start: Long = 0 fun main() = runBlocking { (1..5).asFlow() .onStart { start = currTime() } .onEach { delay(100) } .flatMapLatest { flow { emit("$it: First") delay(500) emit("$it: Second") } } .collect { println("$it at ${System.currentTimeMillis() - start} ms from start") } }
执行结果:
1: First at 114 ms from start 2: First at 220 ms from start 3: First at 321 ms from start 4: First at 422 ms from start 5: First at 524 ms from start 5: Second at 1024 ms from start
由于 Kotlin 语言自身对多平台的支持,使得 Flow 也可以在多平台上使用。
Flow 仍然属于响应式范畴。开发者通过 kotlinx-coroutines-reactive 模块中 Flow.asPublisher() 和 Publisher.asFlow() ,可以方便地将 Flow 跟 Reactive Streams 进行互操作。
该系列的相关文章:
Kotlin Coroutines Flow 系列(一) Flow 基本使用
Kotlin Coroutines Flow 系列(二) Flow VS RxJava2
Kotlin Coroutines Flow 系列(三) 异常处理
Kotlin Coroutines Flow 系列(四) 线程操作