本文咱们来聊聊协程是如何实现切换线程的。要搞清楚这个问题,我觉得需求搞懂这几个常识点:

  1. Continuation,简单讲能够把它看成是Callback,回调。当协程调用suspend函数,协程会被挂起,当suspend函数履行完成后,会经过Continuation的resumeWith办法,将履行成果回来给协程让协程持续履行。
  2. ContinuationInterceptor顾名思义是Continuation拦截器,也便是Callback拦截器,它的效果便是让回调在指定的线程中履行。假定有这样一个场景,在主线程中敞开协程,在子线程中履行耗时操作,当耗时操作履行结束时,它需求履行回调办法,而回调是需求在主线程中履行的,协程结构内部在协程敞开的时分就会经过拦截器将回调与主线程绑定在一起,让回调一定在主线程中履行
  3. CoroutineDispatcher是拦截器的子类,除了具有拦截器的功用之外,它还有两个重要效果,isDispatchNeeded(context:CoroutineContext)决议回调是否需求分发到其它线程中履行, dispatch(context: CoroutineContext, block: Runnable)将回调分发到指定线程中履行
  4. ThreadContextElement处理协程中的线程的ThreadLocal相关的变量。ThreadLocal很好理解,便是线程私有变量,只能被当时线程访问。那么协程中为什么会有这ThreadLocal呢?举个例子,在同一个线程中履行两个协程,将协程命名为A、B,在履行协程时打印出协程的称号。由于是同一个线程,并且协程的称号是存储在ThreadLocal中的。所以在协程履行的时分,需求将ThreadLocal中保存协程称号的变量修改为当时协程的称号,协程履行结束时,将变量重置。

首要经过一个例子来解说Continuation

fun main() = runBlocking(Dispatchers.Main) { // 花括号中是Continuation
    suspendNoChangeThread()
    suspendChangeToIOThread()
    normalFunc()
}
fun normalFunc() {
    // do something
}
suspend fun suspendNoChangeThread() {
    suspendCoroutine<Unit> {
        it.resume(Unit)
    }
}
suspend fun suspendChangeToIOThread(): String {
    return withContext(Dispatchers.IO) {
        Thread.sleep(1000)
        return@withContext "OK"
    }
}

咱们聚焦到main函数。咱们能够把整个函数体当成Continuation

// 命名为namedContinuation避免混淆
val namedContinuation = {
    suspendNoChangeThread()
    suspendChangeToIOThread()
    normalFunc()
}

整个函数都是在runBlocking(Dispatchers.Main)中的Dispatchers.Main主线程中履行的。咱们看到函数体中有两个suspend的函数,并且suspendChangeToIOThread是切换到IO线程履行的,那么当它履行完,会紧接着履行normalFunc()办法,而该办法是需求在主线程中调用的。那么阐明在suspendChangeToIOThread()和normalFunc()之间有一次切换线程的进程。

咱们都知道suspend修饰的函数,编译器会加上Continuation参数的。所以咱们能够把suspendChangeToIOThread()等价成:

fun suspendChangeToIOThread(continuation:Continuation):String {
    return withContext(Dispatchers.IO) {
            Thread.sleep(1000)
            return@withContext "OK"
        }
}

continuation需求在主线程中履行,那么continuation参数是什么时分与主线程绑在一起的呢?咱们需求跟踪runBlocking的调用链。最终会调用到Callable.kt中的startCoroutineCancellable办法

/**
 * Use this function to start coroutine in a cancellable way, so that it can be cancelled
 * while waiting to be dispatched.
 */
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
    createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}

intercepted()办法便是把一个continuation回调和回调所需求履行的线程绑定在一起

//ContinuationImpl.kt
public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

以上代码,经过协程上下文获取到ContinuationInterceptor,调用interceptContinuation(this)办法,生成DispatchedContinuation,很显然DispatchedContinuation是一个回调的同时,还经过CoroutineDispatcher指明回调在哪个线程中履行。

// CoroutineDispatcher.kt
 public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
// DispatchedContinuation.kt
internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) 

接下来咱们经过isDispatchNeeded和dispatch办法是如何实现回调在指定线程中履行。

isDispatchNeeded判断是否有必要切换线程。假定当时在履行的函数地点的线程与回调方针线程是同一个线程,那就没必要切线程了,否则是需求切线程的。

咱们以Android中的主线程为例,解说

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        if (!handler.post(block)) {
            cancelOnRejection(context, block)
        }
    }
}

HandlerContext的功用便是协程Android库将线程切换到Handler地点的线程。一个特例,切换到主线程履行。

从isDispatchNeeded办法中咱们能够看到,当时线程与方针线程不同时需求切换线程(Looper.myLooper() != handler.looper)。

经过dispatch办法,咱们看到只需求履行handler.post(block)办法,就能把block切到指定线程中履行。

最终咱们再简单解说下withContext是如何实现线程切换的。

fun main() = runBlocking {
    withContext(Dispatchers.IO) {
        println("withContext " + coroutineContext[Job])
        delay(1000)
    }
    println("runBlocking" + coroutineContext[Job])
}

withContext内部会创立一个新的协程,并且会阻塞外部的协程。只要内部的协程履行完成后,外部的协程才会履行。所以咱们看到的打印日志是 先打印withContext 然后再打印runBlocking。

看下源码,此处需求理解协程上下文常识,具体能够看下我之前写的文章

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // compute new context
        val oldContext = uCont.context
        val newContext = oldContext.newCoroutineContext(context)
        newContext.ensureActive()
        // FAST PATH #1 -- new context is the same as the old one
        if (newContext === oldContext) {
           // 省略代码
        }
        // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed) 
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            // 省略代码
        }
        // SLOW PATH -- use new dispatcher
        val coroutine = DispatchedCoroutine(newContext, uCont)
        block.startCoroutineCancellable(coroutine, coroutine)
        coroutine.getResult()
    }
}

withContext源码将线程切换分成三种情况:

  1. 无需切换线程,并且协程上下文都没改变(newContext === oldContext)
  2. 无需切换线程,但是协程上下文其它的内容发现变化,比如说CorotineName发生变化了。会发动UndispatchedCoroutine,该协程会更新ThreadContextElement,由于线程没发生变化,只需求改变ThreadLocal中的内容即可
  3. 需求切换线程。发动DispatchedCoroutine,block.startCoroutineCancellable办规律最终会调用到intercepted(),后续流程与上文相同。

下面代码别离对应上述3个case:

fun main() = runBlocking {
    // case1 
    withContext(EmptyCoroutineContext) {
        println("withContext " + coroutineContext[Job])
        delay(1000)
    }
    // case2
    withContext(CoroutineName("newName")) {
        println("withContext " + coroutineContext[Job])
        delay(1000)
    }
    // case3
    withContext(Dispatchers.IO) {
        println("withContext " + coroutineContext[Job])
        delay(1000)
    }
    println("runBlocking" + coroutineContext[Job])
}