相对于 RxJava 多线程的学习曲线,Flow 对线程的切换友好地多。
在之前的 Kotlin Coroutines Flow 系列(一) Flow 基本使用 一文中曾经介绍过 Flow 的切换线程,以及 flowOn 操作符。
Flow 只需使用 flowOn 操作符,而不必像 RxJava 需要去深入理解 observeOn、subscribeOn 之间的区别。
RxJava 的 observeOn 操作符,接收一个 Scheduler 参数,用来指定下游操作运行在特定的线程调度器 Scheduler 上。
Flow 的 flowOn 操作符,接收一个 CoroutineContext 参数,影响的是上游的操作。
例如:
fun main() = runBlocking { flow { for (i in 1..5) { delay(100) emit(i) } }.map { it * it }.flowOn(Dispatchers.IO) .collect { println("${Thread.currentThread().name}: $it") } }
flow builder 和 map 操作符都会受到flowOn
的影响,并使用 Dispatchers.io 线程池。
再例如:
val customerDispatcher = Executors.newFixedThreadPool(5).asCoroutineDispatcher() fun main() = runBlocking { flow { for (i in 1..5) { delay(100) emit(i) } }.map { it * it }.flowOn(Dispatchers.IO) .map { it+1 } .flowOn(customerDispatcher) .collect { println("${Thread.currentThread().name}: $it") } }
flow builder 和两个 map 操作符都会受到两个flowOn
的影响,其中 flow builder 和第一个 map 操作符跟上面的例子一样,第二个 map 操作符会切换到指定的 customerDispatcher 线程池。
在 Kotlin Coroutines Flow 系列(二) Flow VS RxJava2 一文中,曾介绍 buffer 操作符对应 RxJava Backpressure 中的 BUFFER 策略。
事实上 buffer 操作符也可以并发地执行任务,它是除了使用 flowOn 操作符之外的另一种方式,只是不能显示地指定 Dispatchers。
例如:
fun main() = runBlocking { val time = measureTimeMillis { flow { for (i in 1..5) { delay(100) emit(i) } } .buffer() .collect { value -> delay(300) println(value) } } println("Collected in $time ms") }
执行结果:
1 2 3 4 5 Collected in 1676 ms
在上述例子中,所有的 delay 所花费的时间是2000ms。然而通过 buffer 操作符并发
地执行 emit,再顺序地执行 collect 函数后,所花费的时间在 1700ms 左右。
如果去掉 buffer 操作符。
fun main() = runBlocking { val time = measureTimeMillis { flow { for (i in 1..5) { delay(100) emit(i) } } .collect { value -> delay(300) println(value) } } println("Collected in $time ms") }
执行结果:
1 2 3 4 5 Collected in 2039 ms
所花费的时间比刚才多了300多ms。
在讲解并行操作之前,先来了解一下并发和并行的区别。
并发(concurrency):是指一个处理器同时处理多个任务。
并行(parallelism):是多个处理器或者是多核的处理器同时处理多个不同的任务。并行是同时发生的多个并发事件,具有并发的含义,而并发则不一定是并行。
RxJava 可以借助 flatMap 操作符实现并行,亦可以使用 ParallelFlowable 类实现并行操作。
下面,以 flatMap 操作符为例实现 RxJava 的并行:
Observable.range(1,100) .flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.just(integer) .subscribeOn(Schedulers.io()) .map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer.toString(); } }); } }) .subscribe(new Consumer<String>() { @Override public void accept(String str) throws Exception { System.out.println(str); } });
Flow 也有相应的操作符 flatMapMerge 可以实现并行。
fun main() = runBlocking { val result = arrayListOf<Int>() for (index in 1..100){ result.add(index) } result.asFlow() .flatMapMerge { flow { emit(it) } .flowOn(Dispatchers.IO) } .collect { println("$it") } }
总体而言,Flow 相比于 RxJava 更加简洁一些。
该系列的相关文章: