概述

关于协程的创立,以及挂起和康复,之前有写过一篇文章 Kotlin协程之深入了解协程作业原理 收拾这个流程,最近再看这篇文章的时分,感觉看起来比较费劲,不是说写得有问题,仅仅看起来比较臃肿。假如想再复习这块的常识,或许需求看几遍后才干懂,所以想别的再收拾一篇文章写写协程发动,挂起和康复的原理,适合在读完上篇文章后再看看,这篇文章的目的在于希望读完后能够明晰明了地了解 Kotlin 这部分的原理,进步效率。Kotlin 协程系列:

  • Kotlin协程之根底运用
  • Kotlin协程之深入了解协程作业原理
  • Kotlin协程之协程撤销与反常处理

Kotlin 因为本身灵活的语法和特性,导致有些时分盯梢它的源码时,容易跟着跟着就迷路了,记得我刚开端尝试阅览协程源码的时分,也是头大了一圈,后边 Kotlin 用的看的多了,现在再阅览就显得轻松了不少。

前置常识

在阅览 Kotlin 源码之前,能够先了解一些前置常识。

Function

Function 是 Kotlin 对函数类型的封装,关于函数类型,它会被编译成 FunctionX 系列的类:

// 0 个参数
public interface Function0<out R> : Function<R> {
    public operator fun invoke(): R
}
// 1 个参数
public interface Function1<in P1, out R> : Function<R> {
    public operator fun invoke(p1: P1): R
}
// X 个参数

Kotlin 供给了从 Function0 到 Function22 之间的接口这意味着咱们的 lambda 函数最多能够支撑 22 个参数,别的 Function 接口有一个 invoke 操作符重载,因此咱们能够直接经过 () 调用 lambda 函数:

val sum = { a: Int, b: Int ->
    a + b
}
sum(10, 12)
sum.invoke(10, 12)

编译成 Java 代码后:

Function2 sum = (Function2)null.INSTANCE;
sum.invoke(10, 12);
sum.invoke(10, 12);
// lambda 编译后的类
final class KotlinTest$main$sum$1 extends Lambda implements Function2<Integer, Integer, Integer> {
    public static final KotlinTest$main$sum$1 INSTANCE = new KotlinTest$main$sum$1();
    KotlinTest$main$sum$1() {
        super(2);
    }
    @Override // kotlin.jvm.functions.Function2
    public /* bridge */ /* synthetic */ Integer invoke(Integer num, Integer num2) {
        return invoke(num.intValue(), num2.intValue());
    }
    public final Integer invoke(int a, int b) {
        return Integer.valueOf(a + b);
    }
}

能够看到关于 lambda 函数,在编译后会生成一个完成 Function 接口的类,并在运用 lambda 函数时创立一个单例目标来调用,创立目标的进程是编译器主动生成的代码

而关于协程里的 lambda 代码块,也会为其创立一个目标,它完成 FunctionX 接口,并承继 SuspendLambda 类,不相同的地方在于它会主动增加一个 Continuation 类型的参数。

Continuation Passing Style(CPS)

Continuation Passing Style(续体传递风格): 约定一种编程标准,函数不直接回来成果值,而是在函数终究一个参数位置传入一个 callback 函数参数,并在函数履行完结时经过 callback 来处理成果。回调函数 callback 被称为续体(Continuation),它决议了程序接下来的行为,整个程序的逻辑经过一个个 Continuation 拼接在一起。

Kotlin 协程实质便是运用 CPS 来完成对进程的操控,并处理了 CPS 会产生的问题(如回调阴间,栈空间占用)

  • Kotlin suspend 挂起函数写法与一般函数相同,但编译器会对 suspend 关键字的函数做 CPS 改换,这便是咱们常说的用看起来同步的办法写出异步的代码,消除回调阴间(callback hell)。
  • 别的为了防止栈空间过大的问题, Kotlin 编译器并没有把代码转换成函数回调的形式,而是运用状况机模型。每两个挂起点之间能够看为一个状况,每次进入状况机时都有一个当时的状况,然后履行该状况对应的代码;假如程序履行结束则回来成果值,否则回来一个特殊值,表明从这个状况退出并等待下次进入。相当于创立了一个可复用的回调,每次都运用这同一个回调,依据不同状况来履行不同的代码。

Continuation

Kotlin 续体有两个接口: Continuation 和 CancellableContinuation, 望文生义 CancellableContinuation 是一个能够撤销的 Continuation。

Continuation 成员

  • val context: CoroutineContext: 当时协程的 CoroutineContext 上下文
  • fun resumeWith(result: Result<T>): 传递 result 康复协程

CancellableContinuation 成员

  • isActive, isCompleted, isCancelled: 表明当时 Continuation 的状况
  • fun cancel(cause: Throwable? = null): 可选经过一个反常 cause 来撤销当时 Continuation 的履行

能够将 Continuation 看成是在挂起点康复后需求履行的代码封装(经过之前的文章能够知道是经过状况机完成的),比方说对如下逻辑:

suspend fun request() = suspendCoroutine<Response> {
    val response = doRequest()
    it.resume(response)
}
fun test() = runBlocking {
    val response = request()
    handle(response)
}

用下面的伪代码简略描述 Continuation 的作业:

// 假装是 Continuation 接口
interface Continuation<T> {
    fun resume(t: T)
}
fun request(continuation: Continuation<Response>) {
    val response = doRequest()
    continuation.resume(response)
}
fun test() {
    request(object :Continuation<Response>{
        override fun resume(response: Response) {
            handle(response)
        }
    })
}

关于 suspend 关键词润饰的挂起函数,编译器会为其增加一个 Continuation 续体类型的参数(相当于 CPS 中的回调),能够经过这个 Continuation 续体目标的 resume 办法回来成果值来康复协程的履行

协程创立与发动

SuspendLambda

Kotlin 编译时会将 lambda 协程代码块编译成 SuspendLambda 的子类:

fun main() {
    GlobalScope.launch {
        val id = getId()
        val avatar = getAvatar(id)
        println("${Thread.currentThread().name} - $id - $avatar")
    }
}

对应的字节码能够看到:

final class Main$main$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2

SuspendLambda 完成了 Continuation 续体接口,其 resume 办法能够康复协程的履行;别的它将协程体封装成 SuspendLambda 目标,其内以状况机的形式消除回调阴间,并完成逻辑的次序履行

承继联系

- Continuation: 续体,康复协程的履行
    - BaseContinuationImpl: 完成 resumeWith(Result) 办法,操控状况机的履行,定义了 invokeSuspend 抽象办法
        - ContinuationImpl: 增加 intercepted 阻拦器,完成线程调度等
            - SuspendLambda: 封装协程体代码块
                - 协程体代码块生成的子类: 完成 invokeSuspend 办法,其内完成状况机流通逻辑

这下子,是不是就明晰了许多?那咱们接下来看协程是怎样开端发动的。

协程发动流程

CoroutineScope.launch

CoroutineScope.launch 开端盯梢协程发动流程:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // newContext = scope效果域上下文 + context参数上下文 + Dispatchers.Default(未指定则增加)
    val newContext = newCoroutineContext(context)
    // 创立协程目标
    val coroutine = if (start.isLazy) {
        LazyStandaloneCoroutine(newContext, block)
    } else {
        StandaloneCoroutine(newContext, active = true)
    }
    // 发动协程
    coroutine.start(start, coroutine, block)
    return coroutine
}
// 发动协程
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    start(block, receiver, this)
}

上面 coroutine.start 的调用涉及到运算符重载,实践上会调到 CoroutineStart.invoke() 办法:

public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
    when (this) {
        DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        ATOMIC -> block.startCoroutine(receiver, completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        LAZY -> Unit // will start lazily
    }

咱们能够注意下 completion 参数,它是一个续体 Continuation 类型,此刻传入的实参为 StandaloneCoroutine/LazyStandaloneCoroutine 目标,在协程体的逻辑履行完后会调用到其 resume 办法(CPS),做一些收尾作业,比方说修改状况等

此刻 receiver 和 completion 都是 launch() 中创立的 StandaloneCoroutine 协程目标。接着往下看:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) = runSafely(completion) {
    // 重新创立 SuspendLambda 子类目标
    createCoroutineUnintercepted(receiver, completion)
        // 调用阻拦器逻辑,进行线程调度等
        .intercepted()
        // 真实履行协程逻辑
        .resumeCancellableWith(Result.success(Unit), onCancellation)
}

创立SuspendLambda

看看上面 createCoroutineUnintercepted 中的代码:

public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

咱们在前面说过,这个协程体会被编译成 SuspendLambda 的子类,其也是 BaseContinuationImpl 的子类目标,因此会走上面的 create() 办法,经过 completion 续体参数创立一个新的 SuspendLambda 目标,这是之前说的 协程的三层包装 里的第二层包装,它持有的 completion 目标是第一层封装(AbstractCoroutine)。

所以在协程发动进程中针对一个协程体会创立两个 SuspendLambda 的子类目标:

  1. 调用 launch() 时创立第一个,传入 null 作为参数,作为一个一般的 Function 目标运用
  2. 调用 create() 时创立第二个,传入 completion 续体作为参数
BuildersKt.launch$default(/*...*/ (Function2)(new Function2((Continuation)null))

线程调度

接着调用 SuspendLambda.intercepted() 办法履行阻拦器逻辑,从上下文中获取阻拦器(Dispatcher调度器)阻拦当时 continuation 目标,将其包装成 DispatchedContinuation 类型,这便是协程的第三层包装,封装了线程调度等逻辑,其 continuation 参数便是第二层包装(SuspendLambda)实例。

关于线程调度的详细逻辑,后边再单独写篇文章收拾,此处略过。

发动协程

在经过 SuspendLambda 目标创立了 DispatchedContinuation 续体后,接着履行其 resumeCancellableWith() 办法,详细履行代码不贴出了,终究会调用到 continuation.resumeWith(result) 办法,而这个 continuation 便是之前传入的第二层封装 SuspendLambda 目标,其 resumeWith() 办法在父类 BaseContinuationImpl 中:

// BaseContinuationImpl
public final override fun resumeWith(result: Result<Any?>) {
    // ...
    val outcome = invokeSuspend(param)
    // ...
}

上面的 invokeSuspend() 是一个抽象办法,它的完成在编译器生成的 SuspendLambda 子类中,详细逻辑是经过状况机来履行协程体中的逻辑,详细见下章解析。

到这里咱们 launch() 里的协程体逻辑就开端真实履行了。

协程挂起与康复

协程的发动,挂起和康复有两个关键办法: invokeSuspend()resumeWith(Result)invokeSuspend() 办法是对协程代码块的封装,内部参加状况机机制将整个逻辑分为多块,分隔点便是每个挂起点。协程发动时会先调用一次 invokeSuspend() 函数触发协程体的开端履行,后边每逢调用到一个挂起函数时,挂起函数会回来 COROUTINE_SUSPENDED 标识,然后 return 停掉 invokeSuspend() 函数的履行,即非堵塞挂起。编译器会为挂起函数主动增加一个 continuation 续体目标参数,表明调用它的那个协程代码块,在该挂起函数履行完结后,就会调用到续体 continuation.resumeWith() 办法来回来成果(或反常),而在 resumeWith() 中又调用了 invokeSuspend() 办法,其内依据状况机的状况来康复协程的履行。这便是整个协程的挂起和康复进程。

接下来看详细解析。

协程的状况机

在之前 协程的状况机 一文里曾经剖析过协程的状况机,并且贴出了对应的 Java 代码,剖析其状况的流通进程,这次换个思路来看看,对如下代码:

fun main() = CoroutineScope(Dispatchers.Main).launch {
    println("label 0")
    val isLogin = checkLogin() // suspend
    println("label 1")
    println(isLogin)
    val login = login() // suspend
    println("label 2")
    println(login)
    val id = getId() // suspend
    println("label 3")
    println(id)
}

关于协程体中的代码,首个挂起点前的代码可看为初始状况, 其后每两个挂起点之间都是一个新的状况,终究一个挂起点到结束是终究的状况。其对应的状况机伪代码如下,协程体被编译成 SuspendLambda 子类,它完成父类中的 invokeSuspend() 办法,是协程的真实履行逻辑:

final class KotlinTest$main$1 extends SuspendLambda implements Function2 {
    int label = 0;  // 状况码
    public final Object invokeSuspend(Object result) {
        switch(this.label) {
            case 0:
                println("label 0");
                label = 1;
                result = checkLogin(this); // this 是编译器增加的续体参数
                if (result == COROUTINE_SUSPENDED) {
                    return COROUTINE_SUSPENDED;
                }
                break;
            case 1:
                // 此刻传入的 result 是 checkLogin() 的成果
                println("label 1")
                val isLogin = result;
                println(isLogin)
                label = 2;
                result = login(this); // this 是编译器增加的续体参数
                if (result == COROUTINE_SUSPENDED) {
                    return COROUTINE_SUSPENDED;
                }
                break;
            case 2:
                // 此刻传入的 result 是 login() 的成果
                println("label 2")
                val login = result;
                println(login)
                label = 3;
                result = getId(this); // this 是编译器增加的续体参数
                if (result == COROUTINE_SUSPENDED) {
                    return COROUTINE_SUSPENDED;
                }
                break;
            case 3:
                // 此刻传入的 result 是 getId() 的成果
                println("label 3")
                val id = result;
                println(id)
                return;
        }
    }
}

看上面每次调用 suspend 函数时都会传一个 this 参数(continuation),这个参数是编译器增加的续体参数,表明的是协程体自身,在 suspend 挂起函数履行结束后会调用 continuation.resumeWith() -> invokeSuspend(result) 来康复该状况机的履行。

协程挂起

上面给出了协程体 SuspendLambda.invokeSuspend() 办法的状况机伪代码,那再看下 SuspendLambda 父类 BaseContinuationImpl 中的 resumeWith() 办法:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            with(current) {
                val outcome: Result<Any?> = try {
                    // invokeSuspend() 履行续体下一个状况的逻辑
                    val outcome = invokeSuspend(param)
                    // 假如续体里调用到了挂起函数,则直接 return
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
                if (completion is BaseContinuationImpl) {
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    // 关于 launch 发动的协程体,传入的 completion 是 AbstractCoroutine 子类目标
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}

咱们说过协程发动后会调用到上面这个 resumeWith() 办法,接着调用其 invokeSuspend() 办法:

  1. 当 invokeSuspend() 回来 COROUTINE_SUSPENDED 后,就直接 return 终止履行了,此刻协程被挂起。
  2. 当 invokeSuspend() 回来非 COROUTINE_SUSPENDED 后,说明协程体履行结束了,关于 launch 发动的协程体,传入的 completion 是 AbstractCoroutine 子类目标,终究会调用其 AbstractCoroutine.resumeWith() 办法做一些状况改变之类的收尾逻辑。至此协程便履行结束了。

协程康复

这里咱们接着看上面第一条:协程履行到挂起函数被挂起后,当这个挂起函数履行结束后是怎样康复协程的,以下面挂起函数为例:

private suspend fun login() = withContext(Dispatchers.IO) {
    Thread.sleep(1000)
    return@withContext true
}

经过反编译能够看到上面挂起函数中的函数体也被编译成了 SuspendLambda 的子类,创立其实例时也需求传入 Continuation 续体参数(调用该挂起函数的协程所在续体)。贴下 withContext 的源码:

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // compute new context
        val oldContext = uCont.context
        val newContext = oldContext + context
        // always check for cancellation of new context
        newContext.ensureActive()
        // FAST PATH #1 -- new context is the same as the old one
        if (newContext === oldContext) {
            val coroutine = ScopeCoroutine(newContext, uCont)
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
        // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
        // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            val coroutine = UndispatchedCoroutine(newContext, uCont)
            // There are changes in the context, so this thread needs to be updated
            withCoroutineContext(newContext, null) {
                return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
            }
        }
        // SLOW PATH -- use new dispatcher
        val coroutine = DispatchedCoroutine(newContext, uCont)
        block.startCoroutineCancellable(coroutine, coroutine)
        coroutine.getResult()
    }
}

首要调用了 suspendCoroutineUninterceptedOrReturn 办法,看注释知道能够经过它来获取到当时的续体目标 uCont, 接着有几条分支调用,但终究都是会经过续体目标来创立挂起函数体对应的 SuspendLambda 目标,并履行其 invokeSuspend() 办法,在其履行结束后调用 uCont.resume() 来康复协程,详细逻辑大家感兴趣能够自己跟代码,与前面迥然不同

至于其他的顶层挂起函数如 await(), suspendCoroutine(), suspendCancellableCoroutine() 等,其内部也是经过 suspendCoroutineUninterceptedOrReturn() 来获取到当时的续体目标,以便在挂起函数体履行结束后,能经过这个续体目标康复协程履行。

协程库没有直接供给创立续体目标的办法,一般都是经过 suspendCoroutineUninterceptedOrReturn() 函数获取的,感兴趣的同学能够看看这个办法的注释: Obtains the current continuation instance inside suspend functions and either suspends currently running coroutine or returns result immediately without suspension...

总结

Kotlin 协程实质便是运用 CPS 来完成对进程的操控,并处理了 CPS 会产生的问题(如回调阴间,栈空间占用)。

Kotlin suspend 挂起函数写法与一般函数相同,但编译器会对 suspend 关键字的函数做 CPS 改换;Kotlin 编译器并没有把代码转换成函数回调的形式,而是运用状况机模型,消除 callback hell, 处理栈空间占用问题。

行将协程代码块编译成 SuspendLambda 子类,完成 invokeSuspend() 办法。

invokeSuspend() 办法是对协程代码块的封装,内部参加状况机机制将整个逻辑分为多块,分隔点便是每个挂起点。协程发动时会先调用一次 invokeSuspend() 函数触发协程体的开端履行,后边每逢调用到一个挂起函数时,挂起函数会回来 COROUTINE_SUSPENDED 标识,然后 return 停掉 invokeSuspend() 函数的履行,即非堵塞挂起。编译器会为挂起函数主动增加一个 continuation 续体目标参数,表明调用它的那个协程代码块,在该挂起函数履行完结后,就会调用到续体 continuation.resumeWith() 办法来回来成果(或反常),而在 resumeWith() 中又调用了 invokeSuspend() 办法,其内依据状况机的状况来康复协程的履行。

Kotlin 协程中存在三层包装,每层包装都持有上层包装的引证,用来履行其 resumeWith() 办法做一些处理:

  • 第一层包装: launch & async 回来的 Job, Deferred 承继自 AbstractCoroutine, 里边封装了协程的状况,供给了 cancel 等接口;
  • 第二层包装: 编译器生成的 SuspendLambda 子类,封装了协程的真实履行逻辑,其承继联系为 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl, 它的 completion 参数便是第一层包装实例;
  • 第三层包装: DispatchedContinuation, 封装了线程调度逻辑,它的 continuation 参数便是第二层包装实例。

这三层包装都完成了 Continuation 续体接口,经过代理模式将协程的各层包装组合在一起,每层担任不同的功用。

下图的 resumeWith() 或许表明 resume(), 也或许表明 resumeCancellableWith() 等系列办法:

Kotlin协程之再次读懂协程工作原理

博文链接

文中内容如有错误欢迎指出,共同进步!觉得不错的同学留个赞再走哈~