你会怎么实现下面这个场景?应用首页有三个优先级从高到低的弹窗,展示内容依赖网络请求,若低优先级弹窗请求先返回则需等待,让高优先级先展示。
串行请求是最容易想到的解决方案,即先请求最高优先级的弹窗,当它返回展示后再请求第二优先级弹窗。但这样会拉长所有弹窗的展示时间。
性能更好的方案是同时并行发出三个请求,但网络请求时长的不确定性使得最高优先级的弹窗不一定优先返回,所以得设计一种优先级阻塞机制。本文使用 协程 + 链式队列 实现这个机制。
把单个异步任务进行抽象:
// 单个异步任务 class Item { companion object { // 默认异步优先级 const val PRIORITY_DEFAULT = 0 } // 异步操作:耗时的异步操作 var suspendAction: (suspend () -> Any?)? = null set(value) { field = value value?.let { // 启动协程执行异步操作 GlobalScope.launch { deferred = async { it.invoke() } } } } // 异步响应:异步操作结束后要做的事情 var resumeAction: ((Any?) -> Unit)? = null // 异步结果:异步操作返回值 var deferred: Deferred<*>? = null // 异步优先级 var priority: Int = PRIORITY_DEFAULT } 复制代码
异步任务有三个主要的属性,分别是异步操作suspendAction
、异步响应resumeAction
、异步结果deferred
。每当异步操作被赋值时,就启动协程执行它,并将其返回值保存在deferred
中。
为了确保异步任务的优先级,把多个异步任务用链的方式串起来,组成异步任务链:
class Item { companion object { const val PRIORITY_DEFAULT = 0 } var suspendAction: (suspend () -> Any?)? = null set(value) { field = value value?.let { GlobalScope.launch { deferred = async { it.invoke() } } } } var resumeAction: ((Any?) -> Unit)? = null var deferred: Deferred<*>? = null var priority: Int = PRIORITY_DEFAULT // 异步任务前结点(Item 包含 Item) internal var next: Item? = null // 异步任务后结点(Item 包含 Item) internal var pre: Item? = null // 在当前结点后插入新结点 internal fun addNext(item: Item) { next?.let { it.pre = item item.next = it item.pre = this this.next = item } ?: also { // 尾结点插入 this.next = item item.pre = this item.next = null } } // 在当前结点前插入新结点 internal fun addPre(item: Item) { pre?.let { it.next = item item.pre = it item.next = this this.pre = item } ?: also { // 头结点插入 item.next = this item.pre = null this.pre = item } } } 复制代码
用 自己包含自己 的方式就能实现链式结构。Android 消息列表也用同样的结构:
public final class Message implements Parcelable { Message next; ... } 复制代码
链必须有个头结点,存放头结点的类就是存放整个链的类,就好像消息列表MessageQueue
一样:
public final class MessageQueue { Message mMessages; } 复制代码
模仿消息列表,写一个异步任务链:
// 异步任务链 class SuspendList { // 头结点 private var head: Item = emptyItem() // 向异步任务链插入结点 fun add(item: Item) { // 从头结点向后查找插入位置,找到再后插入 head.findItem(item.priority).addNext(item) } // 根据优先级向后查找插入位置(优先级升序) private fun Item.findItem(priority: Int): Item { // 当前结点 var p: Item? = this // 当前结点的后续结点 var next: Item? = p?.next // 从当前结点开始向后遍历异步任务链 while (next != null) { // 若优先级介于当前结点和其后续结点之间,则表示找到插入位置 if (priority in p!!.priority until next.priority) { break } p = p.next next = p?.next } return p!! } // 观察异步任务链并按优先级阻塞 fun observe() = GlobalScope.launch(Dispatchers.Main) { // 从头结点向后遍历异步任务链 var p: Item? = head.next while (p != null) { // 在每个异步结果上阻塞,直到异步任务完成后执行异步响应 p.resumeAction?.invoke(p.deferred?.await()) p = p.next } } // 异步任务(已讲解不再赘述) class Item { ... } } // 空结点(头结点) fun emptyItem(): SuspendList.Item = SuspendList.Item().apply { priority = -1 } 复制代码
SuspendList
持有链的头结点,为了使“头插入”和“中间插入”复用一套代码,将头结点设置为“空结点”。
异步任务链上的任务按优先级升序排列(优先级数字越小优先级越高)。这保证了优先级最高的异步任务总是在链表头。
优先级阻塞:当所有异步任务都被添加到链之后,调用observe()
观察整个异步任务链。该方法启动了一个协程,在协程中从头结点向后遍历链,并在每个异步任务的Deferred
上阻塞。因为链表已按优先级排序,所以阻塞时也是按优先级从高到低进行的。
真实业务场景中,需要统一安排优先级的异步任务可能是跨界面的。这就要求异步任务链能全局访问,单例是一个最直接的选择,但它限制了整个 App 中异步任务链的数量:
// 私有构造函数 class SuspendList private constructor() { companion object { // 静态 map 存放所有异步任务链 var map = ArrayMap<String, SuspendList>() // 根据 key 构建异步任务链 fun of(key: String): SuspendList = map[key] ?: let { val p = SuspendList() map[key] = p p } } ... } 复制代码
然后就可以像这样使用异步任务链:
// 构建异步任务链 SuspendList.of("dialog").apply { // 向链添加异步任务1 add(Item { suspendAction = { fetchUser() } resumeAction = { user: Any? -> onUserResume(user) } priority = 3 }) // 向链添加异步任务2 add(Item { suspendAction = { fetchActivity() } resumeAction = { activity: Any? -> onActivityResume(activity) } priority = 1 }) }.observe() suspend fun fetchUser(): String { delay(4000) return "User Taylor" } suspend fun fetchActivity(): String { delay(5000) return "Activity Bonus" } private fun onActivityResume(activity: Any?) { Log.v("test", "activity(${activity.toString()}) resume") } private fun onUserResume(user: Any?) { Log.v("test", "user(${user.toString()}) resume") } 复制代码
上述代码构建了一个名为 dialog 的异步任务链,向其中添加了两个异步任务,并按优先级观察它们的结果。
其中Item()
是一个顶层方法,用于构建单个异步任务:
fun Item(init: SuspendList.Item.() -> Unit): SuspendList.Item = SuspendList.Item().apply(init) 复制代码
这是构建对象 DSL 的标准写法,详细讲解可以参见这里。
运用 DSL 的思路还可以进一步将构建代码简化成这样:
SuspendList.of("dialog") { Item { suspendAction = { fetchUser() } resumeAction = { user: Any? -> onUserResume(user) } priority = 3 } Item { suspendAction = { fetchActivity() } resumeAction = { activity: Any? -> onActivityResume(activity) } priority = 1 } }.observe() 复制代码
不过需要对原先的of()
和Item()
做一些调整:
// 新增接收者为SuspendList的 lambda 参数,为构建异步任务提供外层环境 fun of(key: String, init: SuspendList.() -> Unit): SuspendList = (map[key] ?: let { val p = SuspendList() map[key] = p p }).apply(init) // 将构建异步任务声明为SuspendList的扩展方法 // 构建异步任务后将其插入到异步任务链中 fun SuspendList.Item(init: SuspendList.Item.() -> Unit): SuspendList.Item = SuspendList.Item().apply(init).also { add(it) } 复制代码
若某个高优先级的异步任务迟迟不能结束,其它任务只能都被阻塞?
得加个超时参数:
class Item { // 为异步操作赋值时,不再立刻构建协程 var suspendAction: (suspend () -> Any?)? = null // 超时时长 var timeout: Long = -1 ... } 复制代码
为单个异步任务添加超时时长参数,还得重构一下异步任务的构建函数:
fun SuspendList.Item(init: SuspendList.Item.() -> Unit): SuspendList.Item = SuspendList.Item().apply { // 为异步任务设置各种参数 init() // 启动协程 GlobalScope.launch { // 将异步任务结果包装成 Deferred deferred = async { // 若需要超时机制 if (timeout > 0) { withTimeoutOrNull(timeout) { suspendAction?.invoke() } } // 不需要超时机制 else { suspendAction?.invoke() } } } }.also { add(it) } 复制代码
原本在suspendAction
赋值时就立马启动协程,现在将其延后,等所有参数都设置完毕后才启动。这样可以避免“先为 suspendAction 赋值,再为 timeout 赋值”case 下超时无效的 bug。
使用withTimeoutOrNull()
实现超时机制,当超时发生时,业务会从resumeAction
中获得null
。
构建异步任务链时使用了GlobalScope.launch()
启动协程,其创建的协程不符合structured-concurrency
。所以需要手动管理生命周期:
class SuspendList private constructor() { class Item { // 为异步任务新增 Job 属性,指向其对应的协程 internal var job:Job? = null ... } // observer()返回类型为 Job,业务层可以在需要的时候取消它 fun observe() = GlobalScope.launch(Dispatchers.Main) { var p: Item? = head.next while (p != null) { p.resumeAction?.invoke(p.deferred?.await()) // 当异步任务响应被处理后,取消其协程以释放资源 p.job?.cancel() p = p.next } } } fun SuspendList.Item(init: SuspendList.Item.() -> Unit): SuspendList.Item = SuspendList.Item().apply { init() // 将该异步任务的协程存储在 job 中 job = GlobalScope.launch { deferred = async { if (timeout > 0) { withTimeoutOrNull(timeout) { suspendAction?.invoke() } } else { suspendAction?.invoke() } } } }.also { add(it) } 复制代码
本篇的完整源码可以点击这里
这是该系列的第十二篇,系列文章目录如下:
Kotlin基础 | 白话文转文言文般的Kotlin常识
Kotlin基础 | 望文生义的Kotlin集合操作
Kotlin实战 | 用实战代码更深入地理解预定义扩展函数
Kotlin实战 | 使用DSL构建结构化API去掉冗余的接口方法
Kotlin基础 | 抽象属性的应用场景
Kotlin进阶 | 动画代码太丑,用DSL动画库拯救,像说话一样写代码哟!
Kotlin基础 | 用约定简化相亲
Kotlin基础 | 2 = 12 ?泛型、类委托、重载运算符综合应用
Kotlin实战 | 语法糖,总有一颗甜到你(持续更新)
Kotlin 实战 | 干掉 findViewById 和 Activity 中的业务逻辑
Kotlin基础 | 为什么要这样用协程?
Kotlin 应用 | 用协程控制多个并行异步结果的优先级