原文地址:Kotlin Coroutines 幕後那一兩件事

【译文】扒一扒Kotlin协程的幕后实现

前言

假如你能看完本文并把一切内容都弄懂,你对协程的理解也现已超越大部分人了。

Coroutines是近几年在Kotlin上Google主推的异步问题处理方案,至少在Android R Asynctask被抛弃后,翻开Android Document看到最显目的提示项目便是扶引你至Coroutine的页面教训你怎样运用Coroutine。

Emm….那假如把一切问题简略化,其实大多数碰上异步问题,处理的办法基本上都是callback。

fun longTimeProcess(parameter1 : String, callBack:CallBack<String>){
    val result = ""
    //Do something long time
    callBack.onResponse(result)
}

其实咱们一般履行回调,本质上都是callback,所以许多异步处理方案,追根溯源,会发现他的完结办法仍旧是callback。

不过callback的运用情境、context还有许许多多的用法状况都不同,全体概念也会有收支,所以咱们会需求更多的名词来代表这样的状况,因此延伸出更多样的词汇,不过这段就题外话了。

话说回来,上面那段简易的callback,换作是Coroutine会变成是这样:

suspend fun longTimeProcess(parameter1:String):String{
val result =“”
//Do something long time
return result
}

这样写的优点是能够不必自己操控Thread的运用,上面的代码假如直接在主线程履行,或许会形成主线程卡顿,超越5秒喷Exception直接让Process out,所以还会需求额定自己开thread + handler或是运用Rxjava之类第三方套件去处理。换作是Coroutine,运用起来就简略许多了,被suspend修饰的函数longTimeProcess,有自己的效果域(Scope),用scope launch里头履行该function,使用这个function回传的数据做该在main thread上处理的工作,问题处理,便是如此的简略。

那问题来了。

Coroutine究竟是怎样运作的?究竟是甚么神奇的魔法让他能够这么的方便能够不必写那么多东西呢?

记得某次面试里有说到这个问题,但我只知道他是个有限状况机,然后就…

【译文】扒一扒Kotlin协程的幕后实现

恩,我那时的表情应该跟King Crimson有那么几分神似便是了。

Coroutine概念

维基百科上其实有解说了Coroutine的实作概念:

var q := new queue
coroutine produce
    loop
        while q is not full
            create some new items
            add the items to q
        yield to consume
coroutine consume
    loop
        while q is not empty
            remove some items from q
            use the items
        yield to produce

概念是,这有个queue是空的,那是先跑coroutine product仍是coroutine consume其实无所谓,总之随意跑一个,先从coroutine product开端好了。

coroutine produce在queue没满时,会发生一些items,然后参加queue里头,直到queue满停止,接着把程序让给coroutine consume。

coroutine consume在queue不是空的时分,会移除(消费)一些items,直到queue空停止,接着把程序让给coroutine produce,如此重复,这个世界的经济得以保持。

那这边能够看出,当coroutine produce碰到queue是满的时分会直接把程序让给coroutine consume;相对的,若coroutine consume在碰到queue是空的时分,会直接把程序让给coroutine produce

那么,以Kotlin Coroutine来说,queue的是空是满的条件会变成是method的状况是否suspend,那由于上面这个程序很明显会是无限循环,多数咱们在开发时会不需求无限的循环,那怎样样才能让这种来回传接球的形式有个终点呢?

答案便是有限状况机,接下来这篇文章会慢慢地解说。

有这么个东西叫做 Continuation

许多时分,本来很费事的工作突然变得简略了,其实不是什么都不必做,而是工作有人帮你做了,Coroutine也是,它帮你把写一堆callback的费事事给做掉了。

等等,Compiler把写一堆的callback的费事事给做掉了,那意思是…

没错,Coroutine本质上仍是callback,仅仅编译器帮你写了。

我本来是想说从CoroutineScope.Launch下去追的,追到IntrinsicsJvm,这东西叫Intrinsic这东西有很大的机率是给编译器用的,追到这儿,大约就能够知道,suspend fun会在编译的过程转成Continuation.

但后来换个方向去想,其实也不必这么费事,由于Kotlin是能够给Java呼叫的,那Java比较少这种语法糖转译的东西,也便是说,透过Java呼叫suspend fun,就能够知道suspend fun真实的模样。

这边先随意写一个suspend fun。

suspend fun getUserDescription(name:String,id:Int):String{
    return ""
}

在 Java 中调用的時候是如下这样:

instance.getUserDescription("name", 0, new Continuation<String>() {
    @NotNull
    @Override
    public CoroutineContext getContext() {
        return null;
    }
    @Override
    public void resumeWith(@NotNull Object o) {
    }
});
return 0;

咱们能够看到,其实suspend fun便是一般的function后头加上一个Continuation

总之得到一个头绪,这个头绪便是Continuation,它是个什么玩意呢?

它是一个 interface

public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}

它代表的是CoroutinerunBlocksuspend状况中,要被唤醒的callback

那注意这边说到状况了,大伙都知道Coroutine会是个状况机,那具体是怎样个状况呢?这个稍后提。

那假如硬要在java file里头运用GlobalScope.launch,那会长成这样:

BuildersKt.launch(GlobalScope.INSTANCE,
        Dispatchers.getMain(),//context to be ran on
        CoroutineStart.DEFAULT,
        new Function2<CoroutineScope,Continuation<? super Unit>,String>() {
            @Override
            public String invoke(CoroutineScope coroutineScope, Continuation<? super Unit> continuation) {
                return "";
            }
        }
);

这样就行了吗?这样如同没啥效果最后会回一个空字串便是了,但这儿就会发现,假如用lanuch会需求用到一个Function去传递一个continuation。这样看仍是蒙,不要紧,咱们持续看下去。

Continuation究竟怎样运转?

那这边简略用一个suspend:

fun main() {
    GlobalScope.launch {
        val text = suspendFunction("text")
        println(text) // print after delay
    }
}
suspend fun suspendFunction(text:String) = withContext(Dispatchers.IO){
    val result = doSomethingLongTimeProcess(text)
    result
}

Kotlin Bytecodedecompile 會得到這個:

public static final void main() {
   BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
      private CoroutineScope p$;
      Object L$0;
      int label;
      @Nullable
      public final Object invokeSuspend(@NotNull Object $result) {
         Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
         Object var10000;
         CoroutineScope $this$launch;
         switch(this.label) {
         case 0:
            ResultKt.throwOnFailure($result);
            $this$launch = this.p$;
            this.L$0 = $this$launch;
            this.label = 1;
            var10000 = CoroutineTestKt.suspendFunction("text", this);
            if (var10000 == var5) {
               return var5;
            }
            break;
         case 1:
            $this$launch = (CoroutineScope)this.L$0;
            ResultKt.throwOnFailure($result);
            var10000 = $result;
            break;
         default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
         }
         String text = (String)var10000;
         boolean var4 = false;
         System.out.println(text);
         return Unit.INSTANCE;
      }
      @NotNull
      public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
         Intrinsics.checkParameterIsNotNull(completion, "completion");
         Function2 var3 = new <anonymous constructor>(completion);
         var3.p$ = (CoroutineScope)value;
         return var3;
      }
      public final Object invoke(Object var1, Object var2) {
         return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
      }
   }), 3, (Object)null);
} 

别的一个是 suspendFunctiondecompile code

public static final Object suspendFunction(@NotNull final String text, @NotNull Continuation $completion) {
   return BuildersKt.withContext((CoroutineContext)Dispatchers.getIO(), (Function2)(new Function2((Continuation)null) {
      private CoroutineScope p$;
      int label;
      @Nullable
      public final Object invokeSuspend(@NotNull Object $result) {
         Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
         switch(this.label) {
         case 0:
            ResultKt.throwOnFailure($result);
            CoroutineScope $this$withContext = this.p$;
            String result = CoroutineTestKt.doSomethingLongTimeProcess(text);
            return result;
         default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
         }
      }
      @NotNull
      public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
         Intrinsics.checkParameterIsNotNull(completion, "completion");
         Function2 var3 = new <anonymous constructor>(completion);
         var3.p$ = (CoroutineScope)value;
         return var3;
      }
      public final Object invoke(Object var1, Object var2) {
         return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
      }
   }), $completion);
}

字节码反编译成 Java 这种事,咱们干过许多次了。跟往常不同的是,这次我不会直接贴反编译后的代码,由于假如我直接贴出反编译后的 Java 代码,估计会吓退一大波人。协程反编译后的代码,逻辑真实是太绕了,可读性真实太差了。不要紧,咱们直接梳理解说一下流程。

反编译代码中咱们看到一个 switch(this.label) , 这便是大名鼎鼎的 Coroutine状况机了,Kotlin编译器会在编译时发生一个label,这个label便是runBlock里边履行到第几段的状况了。

那具领会有几个状况呢?其真实runBlock里边有几个suspend就会对应有几个状况机,举个例子:

GlobalScope.launch {
        test()
        test()
        test()
        test()
}
fun test(){}

如上代码会有几个呢?

答案是一个,因為這 test() 不是挂起函数(suspend function),它不需求挂起操作(suspended)。

假如换成是这样?

GlobalScope.launch {
        test()
        test()
        test()
        test()
}
suspend fun test(){}

答案是五个。

GlobalScope.launch {
        // case 0
        test() // case 1 receive result
        test() // case 2 receive result
        test() // case 3 receive result
        test() // case 4 receive result
}

因為四个 test() 都有或许获得 suspended 的状况,所以需求五个履行状况的,case 0 用于初始化case 1– 4 用于成果获取。

那状况何时会改动呢?

答案是:invokeSuspend 履行时。

label34: {
   label33: {
      var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch(this.label) {
      case 0:
         ResultKt.throwOnFailure($result);
         $this$launch = this.p$;
         this.L$0 = $this$launch;
         this.label = 1;
         if (CoroutineTestKt.test(this) == var3) {
            return var3;
         }
         break;
      case 1://...ignore
         break;
      case 2://...ignore
         break label33;
      case 3://...ignore
         break label34;
      case 4://...ignore
         return Unit.INSTANCE;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }
      this.L$0 = $this$launch;
      this.label = 2;
      if (CoroutineTestKt.test(this) == var3) {
         return var3;
      }
   }
   this.L$0 = $this$launch;
   this.label = 3;
   if (CoroutineTestKt.test(this) == var3) {
      return var3;
   }
}
this.L$0 = $this$launch;
this.label = 4;
if (CoroutineTestKt.test(this) == var3) {
   return var3;
} else {
   return Unit.INSTANCE;
}

这部分比较有意思的地方是,这些状况还有 call method 的都不在 switch case 里边,这其实跟 Bytecode 有关,首要是由于这个成果是 反编译 出來的东西,所以会是这样的叠加办法。

咱们能够看到,在状况机改动时:

Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
//...ignore
    this.label = 1;
    if (CoroutineTestKt.test(this) == var3) {
       return var3;
    }

依据上述代码能够看出, 编译器内部有一个函数IntrinsicsKt.getCOROUTINE_SUSPENDED() 该函数代表当时的状况是否挂起。假如它回传的是 getCOROUTINE_SUSPENDED,代表这个 function 处在 挂起(suspended)的状况,意味着它或许当时正在进行耗时操作。这时分直接回来 挂起状况,等候下一次被 调用(invoke)

那什么时分会再一次被 调用(invoke) 呢?

这时分就要看传入到该挂起函数的的 Continuation ,這裡能够觀察一下 BaseContinuationImplresumeWith 的操作:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
 public final override fun resumeWith(result: Result<Any?>) {
    var current = this
    var param = result
    while (true) {
        probeCoroutineResumed(current)
        with(current) {
            val completion = completion!!
            val outcome: Result<Any?> =
                try {
                    val outcome = invokeSuspend(param)
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
            releaseIntercepted()
            if (completion is BaseContinuationImpl) {
                current = completion
                param = outcome
            } else {
                completion.resumeWith(outcome)
                return
            }
        }
    }
 }
//...ignore
}

原则上 resumeWith 在一开端 Coroutine 被创立时就会履行(所以需求 case 0 做初始化),能够看到 invokeSuspend会被履行到。(probeCoroutineResumed 那個看起來是 debug 用的請無視),经过履行 invokeSuspend 开端履行状况机,假如该 continuation 的状况是挂起,就会履行return,从头履行 invokeSuspend,等下一次被唤醒,再次被唤醒后,持续履行,直到得到成果,而且将成果经过 continuation((name in completion)resumeWith回来成果,完毕此次履行,接着持续履行挂起函数的的 invokeSuspend ,如此重复直至终究完毕。

到這裡,我們知道了, suspend标记的函数内部是经过状况机才完结的挂起恢复的,而且使用状况机来记录Coroutine履行的状况

履行挂起函数时能够得到它的状况为:getCOROUTINE_SUSPENDED

不过又有问题来了,当挂起函数判别条件为:getCOROUTINE_SUSPENDED时履行了 return,代表它现已完毕了,那它怎样能持续履行呢?而且还有办法在履行完后通知协程。

这儿咱们拿一段代码来看看:

suspend fun suspendFunction(text:String) = withContext(Dispatchers.IO){
    val result = doSomethingLongTimeProcess(text) 
    result //result 是個 String
}

它 decompile 後:

public static final Object suspendFunction(@NotNull final String text, @NotNull Continuation $completion) {
   return BuildersKt.withContext(
(CoroutineContext)Dispatchers.getIO(), (Function2)(new Function2((Continuation)null) {
//...ignore
   }), $completion);
}

会发现,该函数 return 的不是 String而是一个Object,那这个Object是什么呢?其实便是COROUTINE_SUSPENDED

要证明这点其实很简略,如下代码,调用该 suspendFunction 就能够了

Object text = instance.suspendFunction("", new Continuation<String>() {
    @NotNull
    @Override
    public CoroutineContext getContext() {
        return Dispatchers.getMain();
    }
    @Override
    public void resumeWith(@NotNull Object o) {
    }
});
System.out.println(text);

結果:

COROUTINE_SUSPENDED
Process finished with exit code 0

PS:假如该函数时一个普通函数,没有标记suspend 则会直接回来成果。

依据上边咱们这么多的剖析,咱们能够解说那段代码了。

fun main() {
    GlobalScope.launch {
        val text = suspendFunction("text")
        println(text) // print after delay
    }
}
suspend fun suspendFunction(text:String) = withContext(Dispatchers.IO){
    val result = doSomethingLongTimeProcess(text)
    result
}

首先,Kotlin编译器会把 main() 里边的代码反编译生成一个Continuation,而 launch block 的部分生成一個有限的状况机,并包装进 Continuation 里边那个叫 invokeSuspend(result) 的办法里头,并做为初度 resumeWith

Continuation { // GlobalScope.Lanuch()
    var label = 0
    fun invokeSuspend(result:Any):Any{
        when(label){
            0->{
                val functionResult = suspendFunction("text",this)
                lable = 1
                if(functionResult == COROUTINE_SUSPENDED){
                    return functionResult
                }
            }
            1->{
                throwOnFailure(result)
                break
            }
        }
        val text = result as String
        print(text)
    }
}

invokeSuspend(result) 会在该 ContinuationresumeWith 履行的时分履行。

Continuation { // GlobalScope.Lanuch()
    var label = 0
    fun invokeSuspend(result:Any):Any{
        when(label){
            0->{
                val functionResult = suspendFunction("text",this)
                lable = 1
                if(functionResult == COROUTINE_SUSPENDED){
                    return functionResult
                }
            }
            1->{
                throwOnFailure(result)
                break
            }
        }
        val text = result as String
        print(text)
    }
}

第一次履行 invokeSuspend(result) 的时分,会履行到 suspendFunction(String),并传入包裝好的 Continuation

Continuation { // suspendFunction(text)
    fun invokeSuspend(result:Any):Any{
        when(label){
            0->{
                val text = doSomethingLongTimeProcess(context)
                return 後執行 continuation.resultWith(text)
            }
        }
    }
}

suspendFunction 自己自身也是一個挂起函数,所以它也会包裝成一个 Continuation (但这边就单纯许多,尽管也会生成状况机,但其实便是直接跑doSomethingLongTimeProcess())。

Continuation { // GlobalScope.Lanuch()
    var label = 0
    fun invokeSuspend(result:Any):Any{
        when(label){
            0->{
                val functionResult = suspendFunction("text",this)
                lable = 1
                if(functionResult == COROUTINE_SUSPENDED){
                    return functionResult
                }
            }
            1->{
                throwOnFailure(result)
                break
            }
        }
        val text = result as String
        print(text)
    }
}

由于会进行耗时操作,所以直接回传COROUTINE_SUSPENDED,让原先履行该挂起函数的Threadreturn 并履行其他东西,而 suspendFunction则在另一条 Thread上把耗时使命完结。

Continuation { // GlobalScope.Lanuch()
    var label = 0
    fun invokeSuspend(result:Any):Any{
        when(label){
            0->{
                val functionResult = suspendFunction("text",this)
                lable = 1
                if(functionResult == COROUTINE_SUSPENDED){
                    return functionResult
                }
            }
            1->{
                throwOnFailure(result)
                break
            }
        }
        val text = result as String
        print(text)
    }
}

等候 suspendFunction 的耗时使命完结后,使用传入的 ContinuationresumeWith 把成果传入,这个动作会履行到挂起函数的invokeSuspend(result),并传入成果,该动作就能让挂起函数得到suspendFunction(String)的成果。

PS:上面那段代码实际上是伪代码,实际事务会比这复杂的多

所以事实上,挂起函数便是我把我的 callback 給你,等你完毕后再用我之前给你的 callback 回调给我,你把你的 callback 給我,等我完毕后我用之前你给我的 callback 通知你。

挂起函数时怎么自行切换线程的?

原则上,挂起函数在履行时,就会决议好要用哪个 Dispatcher,然后就会树立挂起点,一般状况下,会走到 startCoroutineCancellable,然后履行createCoroutineUnintercepted,也便是上面说到的:resumeWithinvokeSuspend

咱们进入到startCoroutineCancellable内部再看看:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
    }

createCoroutineUnintercepted 最后会产出一个 Continuation ,而resumeCancellableWith 其实便是咱们前面说到的初始化操作, 這行会去履行状况机 case 0

至于 intercepted() ,究竟要阻拦啥,其实便是把生成的 Continuation 阻拦给指定的 ContinuationInterceptor (这东西包裝在 CoroutineContext 里边,原则上在指定 Dispatcher 的时分就现已树立好了)

public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }

这儿能够注意到 interceptContinuation(Continuation) ,能够用他追下去,发现他是 ContinuationInterceptor 的办法 ,再追下去能够发现CoroutineDispatcher 继承了他:

public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
    DispatchedContinuation(this, continuation)

能够发现该动作发生了一个 DispatchedContinuation,看看 DispatchedContinuation ,能够注意到刚才有说到的 resumeCancellableWith

inline fun resumeCancellableWith(result: Result<T>) {
    val state = result.toState()
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatch(context, this)
    } else {
        executeUnconfined(state, MODE_CANCELLABLE) {
            if (!resumeCancelled()) {
                resumeUndispatchedWith(result)
            }
        }
    }
}

原则上便是使用 dispatcher 来決定需不需求 dispatch,沒有就直接履行了 resumeUndispatchedWith

@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeUndispatchedWith(result: Result<T>) {
    withCoroutineContext(context, countOrElement) {
        continuation.resumeWith(result)
    }
}

其实便是直接跑 continuationresumeWith

那回头看一下,其实就能够发现是 CoroutineDispatcher 决议要用什么 Thread 了。

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    @InternalCoroutinesApi
    public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block)
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
    @InternalCoroutinesApi
    public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
    }
}

其实知道这个东西后,就能够向下去找它的 Child ,就能找到 HandlerDispatcher 了。

isDispatchNeeded 便是说是否需求切换线程

dispatch 则是切换线程的操作

能够看到这两个办法在 HandlerDispatcher 的履行:

override fun isDispatchNeeded(context: CoroutineContext): Boolean {
    return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
    handler.post(block)
}

能够看到CoroutineContext底子没有用到。

为什么呢?其实原因首要是: 挂起函数是规划给 Kotlin 用的,并不是专门规划给 Android用的,所以 Android 要用的话,仍是需求完结 CoroutineDispatcher 的部分,这实际上是两个系统的东西。那 CoroutineDispatcherdispatch 有供给 CoroutineContext,但不见的 Android 这边会用到,所以就有这个情況了。

其他诸如 Dispatcher.Default ,他用到了 线程池(Executor)Dispatcher.IO 则是用到了一个叫 工作队列(WorkQueue) 的东西。

所以每一个 Dispatcher 都有自己的一套完结,目前有供给四种 Dispatcher