关于协程,我在网上看到最多的说法是协程是轻量级的线程。那么协程首先应该解决的问题就是程序中我们常常遇到的 “异步” 的问题。我们看看官网介绍的几个使用例子。
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3' implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.3' 复制代码
import kotlinx.coroutines.* fun main() { GlobalScope.launch { // 在后台启动一个新的协程并继续 delay(1000L) println("World!") } println("Hello,") // 主线程中的代码会立即执行 runBlocking { // 但是这个表达式阻塞了主线程 delay(2000L) // ……我们延迟 2 秒来保证 JVM 的存活 } } 复制代码
suspend fun doSomethingUsefulOne(): Int { delay(1000L) // 假设我们在这里做了一些有用的事 return 13 } suspend fun doSomethingUsefulTwo(): Int { delay(1000L) // 假设我们在这里也做了一些有用的事 return 29 } val time = measureTimeMillis { val one = doSomethingUsefulOne() val two = doSomethingUsefulTwo() println("The answer is ${one + two}") } println("Completed in $time ms") 复制代码
The answer is 42 Completed in 2015 ms 复制代码
val time = measureTimeMillis { val one = async { doSomethingUsefulOne() } val two = async { doSomethingUsefulTwo() } println("The answer is ${one.await() + two.await()}") } println("Completed in $time ms") 复制代码
The answer is 42 Completed in 1017 ms 复制代码
class MyTest { @Test fun testMySuspendingFunction() = runBlocking<Unit> { // 这里我们可以使用任何喜欢的断言风格来使用挂起函数 } } 复制代码
Thread(Runnable { try { val qrCode: Bitmap = CodeCreator.createQRCode(this@ShareActivity, SHARE_QR_CODE) runOnUiThread { img_qr_code.setImageBitmap(qrCode) } } catch (e: WriterException) { e.printStackTrace() } }).start() } 复制代码
Executors.newSingleThreadExecutor().execute { try { val qrCode: Bitmap = CodeCreator.createQRCode(this@ShareActivity, SHARE_QR_CODE) runOnUiThread { img_qr_code.setImageBitmap(qrCode) } } catch (e: WriterException) { e.printStackTrace() } } 复制代码
Observable.just(SHARE_QR_CODE) .map(new Function<String, Bitmap>() { @Override public Bitmap apply(String s) throws Exception { return CodeCreator.createQRCode(ShareActivity.this, s); } }) .subscribe(new Consumer<Bitmap>() { @Override public void accept(Bitmap bitmap) throws Exception { img_qr_code.setImageBitmap(bitmap); } }); 复制代码
val job = GlobalScope.launch(Dispatchers.IO) { val bitmap = CodeCreator.createQRCode(ShareActivity.this, SHARE_QR_CODE) launch(Dispatchers.Main) { img_qr_code.setImageBitmap(bitmap) } } } 复制代码
通过这个例子,可以看出使用协程的非常方便解决 "异步回调" 问题。 相比传统的Thread及Excutors,RxJava将嵌套回调转换成链式调用的形式,提高了代码可读性。协程直接将链式调用转换成了协程内的顺序调用,"代码更加精简"。
启动了 1000个协程,并且为每个协程都输出一个点
var startTime = System.currentTimeMillis() repeat(times) { i -> // 启动大量的协程 GlobalScope.launch(Dispatchers.IO) { Log.d(this@MainActivity.toString(), "$i=.") } } var endTime = System.currentTimeMillis() - startTime; Log.d(this@MainActivity.toString(), "endTime=$endTime") 复制代码
执行结果:endTime=239 ms
var startTime = System.currentTimeMillis() repeat(times) { i ->// 启动大量的线程 Thread(Runnable { Log.d(this@MainActivity.toString(), "$i=.") }).start() } var endTime = System.currentTimeMillis() - startTime; 复制代码
执行结果:endTime=3161 ms
var startTime = System.currentTimeMillis() var executors = Executors.newCachedThreadPool() repeat(times) { i -> // 使用线程池 executors.execute { Log.d(this@MainActivity.toString(), "$i=.") } } var endTime = System.currentTimeMillis() - startTime; Log.d(this@MainActivity.toString(), "endTime=$endTime") 复制代码
执行结果:endTime=143 ms
var startTime = System.currentTimeMillis() repeat(times) { i -> // 启动Rxjava Observable.just("").subscribeOn(Schedulers.io()) .subscribe { Log.d(this@MainActivity.toString(), "$i=.") } } var endTime = System.currentTimeMillis() - startTime; Log.d(this@MainActivity.toString(), "endTime=$endTime") 复制代码
执行结果:endTime=241 ms
GlobalScope.launch(Dispatchers.IO) { print("hello world") } 复制代码
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine } 复制代码
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { initParentJob() start(block, receiver, this) } 复制代码
@InternalCoroutinesApi public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) = when (this) { CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion) CoroutineStart.ATOMIC -> block.startCoroutine(completion) CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion) CoroutineStart.LAZY -> Unit // will start lazily } 复制代码
@InternalCoroutinesApi public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit)) } 复制代码
@InternalCoroutinesApi public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>) = when (this) { is DispatchedContinuation -> resumeCancellableWith(result) else -> resumeWith(result) } 复制代码
inline fun resumeCancellableWith(result: Result<T>) { val state = result.toState() if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE dispatcher.dispatch(context, this) } else { executeUnconfined(state, MODE_CANCELLABLE) { if (!resumeCancelled()) { resumeUndispatchedWith(result) } } } } 复制代码
dispatcher.dispatch(context, this) 复制代码
我们看 DefaultScheduler.IO最后的dispatch方法:
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = try { coroutineScheduler.dispatch(block) } catch (e: RejectedExecutionException) { DefaultExecutor.dispatch(context, block) } 复制代码
internal inner class Worker private constructor() : Thread() 复制代码
suspend fun hello(){ delay(100) print("hello world") } 复制代码
通过Kotlin Bytecode转换为java 代码如下:
@Nullable public final Object hello(@NotNull Continuation $completion) { Object $continuation; label20: { if ($completion instanceof <undefinedtype>) { $continuation = (<undefinedtype>)$completion; if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) { ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE; break label20; } } $continuation = new ContinuationImpl($completion) { // $FF: synthetic field Object result; int label; Object L$0; @Nullable public final Object invokeSuspend(@NotNull Object $result) { this.result = $result; this.label |= Integer.MIN_VALUE; return Test.this.hello(this); } }; } Object $result = ((<undefinedtype>)$continuation).result; Object var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch(((<undefinedtype>)$continuation).label) { case 0: ResultKt.throwOnFailure($result); ((<undefinedtype>)$continuation).L$0 = this; ((<undefinedtype>)$continuation).label = 1; if (DelayKt.delay(100L, (Continuation)$continuation) == var6) { return var6; } break; case 1: Test var7 = (Test)((<undefinedtype>)$continuation).L$0; ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } String var2 = "hello world"; boolean var3 = false; System.out.print(var2); return Unit.INSTANCE; } 复制代码
这里首先我们发现方法的参数多了一个Continuation completion并且内部回定义一个 Object continuation,看看Continuation的定义。
@SinceKotlin("1.3") public interface Continuation<in T> { /** * The context of the coroutine that corresponds to this continuation. */ public val context: CoroutineContext /** * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the * return value of the last suspension point. */ public fun resumeWith(result: Result<T>) } 复制代码
这是一个回调接口,里面有一个关键的方法为resumeWith。 这个方法的具体调用通过上面的协程调用流程可以知道 ,在DispatchedContinuation的resumeCancellableWith会触发。
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>) = when (this) { is DispatchedContinuation -> resumeCancellableWith(result) else -> resumeWith(result) } 复制代码
public final override fun resumeWith(result: Result<Any?>) { // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume var current = this var param = result while (true) { // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure // can precisely track what part of suspended callstack was already resumed probeCoroutineResumed(current) with(current) { val completion = completion!! // fail fast when trying to resume continuation without completion val outcome: Result<Any?> = try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { // unrolling recursion via loop current = completion param = outcome } else { // top-level completion reached -- invoke and return completion.resumeWith(outcome) return } } } } 复制代码