前言

在了解了kotlin协程的基本原理之后咱们接下来就需求重视下协程的线程切换。咱们这篇文章就深入源码的角度来剖析一波协程中的线程切换。

CoroutineContext

要了解Kotlin的线程切换,那咱们首要必须要先了解协程的CoroutineContext这个东西。

咱们都知道,每一个挂起函数在最终编译转换的时分都会变成一个携带CoroutineContext参数的函数,这也是为什么非suspend函数不能够调用suspend函数的原因。那么CoroutineContext究竟是干啥的呢?

CoroutineContext望文生义便是协程的上下文,这个上下文是跟协程绑定的联系。每发动一个协程,就对应一个CoroutineContext。他规则了当时这个要发动的协程的运行环境。咱们看下代码中CoroutineContext的界说

public interface CoroutineContext {
    public operator fun <E : Element> get(key: Key<E>): E?
    //重载加号操作符
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    val interceptor = removed[ContinuationInterceptor]
                    //假如拦截器为空就直接组合新和老的上下文回来
                    if (interceptor == null) CombinedContext(removed, element) else {
                        //假如有拦截器,那就取出拦截器放在新组合的上下文右边。也便是优先访问到
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(
                            element,
                            interceptor
                        ) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }
    public interface Key<E : Element>
    public interface Element : CoroutineContext {
        public val key: Key<*>
        public override operator fun <E : Element> get(key: Key<E>): E? =
            if (this.key == key) this as E else null
    }
}

能够看到CoroutineContext仅仅一个接口,实质上他的数据结构是一个数组形式。他的完结类和间接完结类有很多

kotlin协程中的线程切换

CoroutineContextElement、ContinuationInterceptor、CoroutineDispatcher等等。而这儿每一个子类其实就代表协程上下文的一种才能。例如:

  • Job:控制协程的生命周期,例如引发或许取消。
  • CoroutineDispatcher:将工作分派到适当的线程。
  • CoroutineName:协程的名称,可用于调试。
  • CoroutineExceptionHandler:处理未捕获的反常。

咱们能够参阅Kotlin协程:协程上下文与上下文元素这篇文章深入了解下CoroutineContext,本文仍是顺着线程切换思路持续往下看。

withContext

上末节咱们说到CoroutineDispatcher实质也是一个CoroutineContext。他用来分发使命到具体线程上。那他具体又是怎么分发的呢?咱们以下边一个比方打开剖析下。

suspend fun withContextTest() {
    withContext(Dispatchers.IO) {
        println("==========!!!!!io============== ${Thread.currentThread().name}")
    }
}

这儿一个很常见的线程切换写法。调用withContext函数,然后传一个Dispatchers.IO,然后用个协程发动下就OK了。这样咱们withContext里的代码块就能在IO线程里履行了。

运行结果如下,一个名叫DefaultDispatcher-worker-1 @coroutine#1的线程履行了咱们这次的使命。

kotlin协程中的线程切换

所以咱们顺藤摸瓜看下withContext函数的界说:

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        val oldContext = uCont.context
        //用新的上下文和老的上下兼并下一个终究上下文。新上下文的装备会掩盖替换掉老的上下文装备。
        val newContext = oldContext.newCoroutineContext(context)
        newContext.ensureActive()
        //终究新的上下文跟老的完全一致,调用非startUndispatchedOrReturn分发逻辑
        if (newContext === oldContext) {
            val coroutine = ScopeCoroutine(newContext, uCont)
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
        //走到这步那便是两个上下文不相同了,可是拦截器是相同的
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            val coroutine = UndispatchedCoroutine(newContext, uCont)
            withCoroutineContext(newContext, null) {
                //仍旧走了不分发逻辑,咱们没有拦截器能够先不考虑这个
                return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
            }
        }
        // 终究策略,运用DispatchedCoroutine分发使命。
        val coroutine = DispatchedCoroutine(newContext, uCont)
        block.startCoroutineCancellable(coroutine, coroutine)
        coroutine.getResult()
    }
}

首要咱们先看withContext这个办法的签名,榜首个参数是CoroutineContext,协程上下文。Dispatchers.IO便是传递给了CoroutineContext这个参数。也是说Dispatchers.IO实质上也是CoroutineContext。

当咱们运用Dispatchers.IO切换线程的时分,终究是由DispatchedCoroutine组件了一个新的上下文进行使命分发。那咱们持续看DispatchedCoroutine处理逻辑。

DispatchedCoroutine

咱们直接定位DispatchedCoroutine的startCoroutineCancellable这个办法。它是一个扩展函数。用runSafely语法糖包装了下。

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

这个函数首要做了两步:

  1. 创立一个非拦截器的上下文,然后调用拦截办法。怪怪的,可是它便是这样。
  2. 这个上下文调用resumeCancellableWith办法。

咱们持续盯梢resumeCancellableWith办法。

inline fun resumeCancellableWith(
    result: Result<T>,
    noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
    val state = result.toState(onCancellation)
    //假如判别使命需求分发
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_CANCELLABLE
        //那就调用dispatcher进行分发
        dispatcher.dispatch(context, this)
    } else {
        executeUnconfined(state, MODE_CANCELLABLE) {
            if (!resumeCancelled(state)) {
                resumeUndispatchedWith(result)
            }
        }
    }
}

到这就很明晰了,用dispatcher校验下是否需求分发。假如需求的就去调用dispatch,假如不必则履行resumeUndispatchedWith恢复挂起点。

那这个dispatcher这个大局变量又是啥?

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
    @JvmField
    @Suppress("PropertyName")
    }

他是DispatchedContinuation的一个结构参数,也便是咱们上边剖析的withContext函数里的newContext。而newContext实践上便是咱们比方里传递的Dispatchers.IO这个东西。

 val coroutine = DispatchedCoroutine(newContext, uCont)

到这儿咱们梳理下逻辑应该是这样的:

kotlin协程中的线程切换

所以基于以上剖析咱们能够总结以下几点:

  • Dispatchers.Main和Dispatchers.IO实质也是CoroutineContext,而且他们担任实践的线程切换操作。
  • withContext函数会比照新旧两个上下文的差异,只要不一致的时分才会走从头分发逻辑。所以并不是调用一次withContext就做一次上下文切换。

Dispatchers.Main

首要仍是咱们上边的比方,咱们只把Dispatchers.IO换成Dispatchers.Main,然后把代码放到一般单元测试类里,代码便是这样。

suspend fun withContextTest() {
    withContext(Dispatchers.Main) {
        println("==========!!!!!main============== ${Thread.currentThread().name}")
    }
}
@Test
fun startWithContext() {
    runBlocking{
        withContextTest()
    }
}

然后履行下你就会发现代码会报错了,报错信息:

Exception in thread "Test worker" java.lang.IllegalStateException: Module with the Main dispatcher had failed to initialize. For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used
	at kotlinx.coroutines.internal.MissingMainCoroutineDispatcher.missing(MainDispatchers.kt:118)
	at kotlinx.coroutines.internal.MissingMainCoroutineDispatcher.isDispatchNeeded(MainDispatchers.kt:96)
	at kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:319)
	at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
	at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable$default(Cancellable.kt:25)
	at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:110)
	at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:126)
	at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch(Builders.common.kt:56)
	at kotlinx.coroutines.BuildersKt.launch(Unknown Source)
	at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch$default(Builders.common.kt:47)
	at kotlinx.coroutines.BuildersKt.launch$default(Unknown Source)
	at com.wuba.coroutinedemo.CoroutineDispatchDemo.addition_isCorrect(CoroutineDispatchDemo.kt:27)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
	at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
	Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [CoroutineId(1), "coroutine#1":StandaloneCoroutine{Cancelling}@5f77d0f9, Dispatchers.Main[missing, cause=java.lang.RuntimeException: Method getMainLooper in android.os.Looper not mocked. See http://g.co/androidstudio/not-mocked for details.]]

能够看到实践上是由MissingMainCoroutineDispatcher这个分发器分发了主线程使命,而且报了一个主线程没有初始化的使命。

在这儿贴这个报错信息有两个目的。榜首个咱们能够根据这个报错仓库温习下上一节讲的分发逻辑,这个报错仓库很明晰反应了整个分发流程。第二个便是引出咱们的Main线程。

咱们先看下Main的界说。

public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

能够看出main线程是需求初始化加载的,究竟每个渠道的的主线程是不相同的。比方安卓中主线程便是MainLooper。这也是上边报错仓库的原因。在单元测试模块中没有指定装备主线程,所以终究指定了MissingMainCoroutineDispatcher来报错。

咱们剖析下loadMainDispatcher这个函数

private fun loadMainDispatcher(): MainCoroutineDispatcher {
    return try {
        val factories = if (FAST_SERVICE_LOADER_ENABLED) {
            //了解便是个后门能够快速初始化一个测试级的主线程
            FastServiceLoader.loadMainDispatcherFactory()
        } else {
            /MainDispatcherFactory是个接口,反射加载MainDispatcherFactory完结类
            ServiceLoader.load(
                    MainDispatcherFactory::class.java,
                    MainDispatcherFactory::class.java.classLoader
            ).iterator().asSequence().toList()
        }
        @Suppress("ConstantConditionIf")
        //经过上边那个工厂创立实践的分发器
        factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
            ?: createMissingDispatcher()
    } catch (e: Throwable) {
        // 这便是咱们报错信息说到的MissingMainCoroutineDispatcher
        createMissingDispatcher(e)
    }
}

咱们再看下MainDispatcherFactory的完结类都有哪些。

kotlin协程中的线程切换

咱们一眼就看到了AndroidDispatcherFactory,对他便是在安卓渠道上实践的主线程分发器。赶紧点开看下完结。

internal class AndroidDispatcherFactory : MainDispatcherFactory {
    override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
        //是不是很亲切很耳熟
        val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
        return HandlerContext(mainLooper.asHandler(async = true))
    }
}

奥,实践的回来者又是HandlerContext,而且给他传递了Looper.getMainLooper()。然后再翻下HandlerContext的继承联系,没错是CoroutineContext。上节咱们剖析分发逻辑的时分说终究分发的是dispatch办法。那咱们就看下HandlerContext的dispatch办法。其实不必看咱们也知道怎么回事。

override fun dispatch(context: CoroutineContext, block: Runnable) {
    if (!handler.post(block)) {
        cancelOnRejection(context, block)
    }
}

很简略调用looper的post办法扔到主线程里。可是这儿有个兜底逻辑,便是假如扔主线程失利了会兜底运用Dispatchers.IO.dispatch(context, block)进行分发。

Dispatchers.IO

接下来咱们就来剖析下Dispatchers.IO。有了之前剖析Dispatchers.Main的经验咱们很快就找到了相关的界说。

public val IO: CoroutineDispatcher = DefaultIoScheduler

能够看到Dispatchers.IO便是DefaultIoScheduler。

在老的版本上Dispatchers.IO其实是和Dispatchers.Default相同运用的是DefaultScheduler。默许最大线程数为64。新版优化成了DefaultIoScheduler,支持扩展这个最大线程数。具体能够看下新版的DefaultIoScheduler注释。

咱们来看下DefaultIoScheduler的界说:

//看办法名字和继承类,大约就能够知道这是一个线程池,不过这个线程池跟Java线程池没啥联系,完全是协程自己完结的一套
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {
    //一个没有约束的IO调度器,调至调度办法初始化了一个调度器?
    private val default = UnlimitedIoScheduler.limitedParallelism(
        //读取默许装备
        systemProp(
            IO_PARALLELISM_PROPERTY_NAME,
            64.coerceAtLeast(AVAILABLE_PROCESSORS)
        )
    )
    override val executor: Executor
        get() = this
    //线程池履行使命的办法
    override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command)
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        return UnlimitedIoScheduler.limitedParallelism(parallelism)
    }
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        //分发调度使命
        default.dispatch(context, block)
    }
}

能够看到UnlimitedIoScheduler.limitedParallelism这个办法创立了一个调度器,然后由这个调度器来履行dispatch办法。而这个办法终究回来的是LimitedDispatcher。

public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
    parallelism.checkParallelism()
    return LimitedDispatcher(this, parallelism)
}

看到这你是不是会想当然的认为终究dispatch分发的是LimitedDispatcher这个东西?假如你这样想就堕入了骗局中。

咱们再放出LimitedDispatcher的源码:

//留意看参数
internal class LimitedDispatcher(
    private val dispatcher: CoroutineDispatcher,
    private val parallelism: Int
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        if (parallelism >= this.parallelism) return this
        return super.limitedParallelism(parallelism)
    }
    //中心使命履行
    override fun run() {
        var fairnessCounter = 0
        while (true) {
            val task = queue.removeFirstOrNull()
            if (task != null) {
                try {
                    task.run()
                } catch (e: Throwable) {
                    handleCoroutineException(EmptyCoroutineContext, e)
                }
                if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) {
                    dispatcher.dispatch(this, this)
                    return
                }
                continue
            }
            synchronized(workerAllocationLock) {
                --runningWorkers
                if (queue.size == 0) return
                ++runningWorkers
                fairnessCounter = 0
            }
        }
    }
    //使命分发
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) {
            //留意看这,看dispatcher哪来的!!
            dispatcher.dispatch(this, this)
        }
    }
    private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) {
        if (addAndTryDispatching(block)) return
        if (!tryAllocateWorker()) return
        dispatch()
    }
    private fun tryAllocateWorker(): Boolean {
        synchronized(workerAllocationLock) {
            if (runningWorkers >= parallelism) return false
            ++runningWorkers
            return true
        }
    }
    private fun addAndTryDispatching(block: Runnable): Boolean {
        queue.addLast(block)
        return runningWorkers >= parallelism
    }
}

首要咱们仍是看dispatch办法,他调用了dispatchInternal做后续的分发逻辑。

dispatchInternal首要逻辑:

  1. 把使命加到LockFreeTaskQueue行列里,判别下正在履行的使命数量是否现已大于约好的约束数量。假如大于,那证明现已没有可用的闲暇线程去履行当时使命了。所以只用回来就好。
  2. 假如小于,那证明还有可用闲暇线程来履行当时的这个使命。那么就调用tryAllocateWorker请求资源。留意这儿仅仅同步办法改变下计数,并非真实的去请求线程池资源。
  3. 最终调用dispatch()办法,也便是override fun dispatch(context: CoroutineContext, block: Runnable) 这个办法里的 dispatcher.dispatch(this, this) 这一行。
  4. 最终便是调用dispatcher.dispatch(this, this)
  • 第4条这句很简略让人堕入误解。其实它并不是递归调用LimitedDispatcher的dispatch办法。
  • 这儿的dispatcher是LimitedDispatcher结构办法里传来的CoroutineDispatcher。
  • 是limitedParallelism办法LimitedDispatcher(this, parallelism) 的this。
  • 也是DefaultIoScheduler里的default变量赋值句子里的UnlimitedIoScheduler.limitedParallelism
  • 也便是说实践上是UnlimitedIoScheduler的dispatch办法在起作用。

咱们再看下UnlimitedIoScheduler的dispatch办法:

private object UnlimitedIoScheduler : CoroutineDispatcher() {
    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        DefaultScheduler.dispatchWithContext(block, BlockingContext, true)
    }
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
    }
}

他调用的是DefaultScheduler的dispatchWithContext分发使命。咱们再看下DefaultScheduler的界说。

//看参数,又是中心池巨细,最大池子巨细的大约也能猜出这是个线程池。可是这个类办法里就shutdown和close两个函数。所以中心完结在SchedulerCoroutineDispatcher里。
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
    internal fun shutdown() {
        super.close()
    }
    override fun close() {
        throw UnsupportedOperationException("Dispatchers.Default cannot be closed")
    }
}

看这阵仗咱们好像也应该猜到什么了。对,这便是个线程池。哎,问题来了,为什么不直接复用java的线程池?要自己完结呢?咱们先把这个问题放下持续剖析源码。最终回过头来再思考这个问题。

DefaultScheduler类里就两个函数,所以中心逻辑肯定在父类SchedulerCoroutineDispatcher里。所以咱们持续看SchedulerCoroutineDispatcher这个类

internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
    override val executor: Executor
        get() = coroutineScheduler
    private var coroutineScheduler = createScheduler()
    private fun createScheduler() =
        //奥,SchedulerCoroutineDispatcher也不是实践的线程池,CoroutineScheduler才是。
        CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
   }

dispatch在用coroutineScheduler进行分发,coroutineScheduler又是CoroutineScheduler。

到这儿咱们咱们要先休憩休憩了,咱们总结下Dispatchers.IO,其实就三点:

  • Dispatchers.IO也是一个CoroutineContext,在老版本对应的是DefaultScheduler,新版本是DefaultIOScheduler。
  • 新版本DefaultIOScheduler相对于DefaultScheduler增加了最大线程数量的扩展。实质上仍是运用DefaultScheduler做分发。
  • DefaultScheduler的实质其实是CoroutineScheduler,他是一个自界说的线程池。咱们的Dispatchers.IO实质是交给了CoroutineScheduler去履行调度使命了。

咱们能够以一个更简略的图来描绘下他们的联系。

kotlin协程中的线程切换

CoroutineScheduler

接下来便是咱们IO线程池的中心部分,CoroutineScheduler。或许我剖析的有些当地不行透彻,咱们能够先看一遍我的剖析文章然后自行去源码里剖析剖析这个类。也能够直接越过我的剖析直接自己动手丰衣足食。

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
    //线程池的大局行列,CpuQueue能够了解为中心线程使命。
    @JvmField
    val globalCpuQueue = GlobalQueue()
    //第二条使命行列,BlockingQueue用来存放优先级较低的使命。便是中心线程把CpuQueue使命做完之后才会调度到这儿。
    @JvmField
    val globalBlockingQueue = GlobalQueue()
    //增加使命行列
    private fun addToGlobalQueue(task: Task): Boolean {
        return if (task.isBlocking) {
            globalBlockingQueue.addLast(task)
        } else {
            globalCpuQueue.addLast(task)
        }
    }
   override fun execute(command: Runnable) = dispatch(command)
   fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask()
        //创立使命
        val task = createTask(block, taskContext)
        //判别现在是否现已在一个Worker线程中,假如在的话那就能够进行复用了,算是一个小优化。
        val currentWorker = currentWorker()
        //将使命增加到到Worker线程自己的使命行列里。留意不是上边的大局行列
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        //假如增加失利,那就增加到分发器的大局行列里。
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        //假如是尾调模式,而且当时是worker线程,也便是说使命被增加到了复用线程使命里了
        val skipUnpark = tailDispatch && currentWorker != null
        if (task.mode == TASK_NON_BLOCKING) {
            //假如是中心线程就等候当时线程履行结束,不在引发或许创立新的线程。以希望使命能够在这个线程中按序履行结束。便是说不要在发动非中心线程来抢占这个中心使命。
            if (skipUnpark) return
            signalCpuWork()
        } else {
            //非中心使命履行逻辑,其实大体跟中心使命逻辑相同
            signalBlockingWork(skipUnpark = skipUnpark)
        }
    }
}

咱们能够再重视下signalBlockingWork办法的界说:

private fun signalBlockingWork(skipUnpark: Boolean) {
    //一个状态值的获取
    val stateSnapshot = incrementBlockingTasks()
    //方才咱们说到的尾调,直接回来
    if (skipUnpark) return
    //从线程池引发一个线程
    if (tryUnpark()) return
    //引发失利,那就预备创立一个线程
    if (tryCreateWorker(stateSnapshot)) return
    //创立失利了,那在尝试引发一遍,万一这时分线程池又有线程了呢
    tryUnpark()
}

以上便是协程自界说线程池的大约逻辑。咱们能够只重视两点内容:

  • 这个自界说线程池有两个大局使命行列,一个中心线程使命,一个非中心线程使命。
  • 优先复用已有的线程使命,假如有就会把使命加到已有的work使命的本地行列里。否则会从头引发或许创立线程。

比方有个协程使命接连两次调withContext(Dispatchers.IO)切换子线程分发使命,那么第二个withContext(Dispatchers.IO)就在榜首个子线程中持续分发履行,而非从头创立线程使命。

Worker

接下来咱们就要剖析真实担任干活的线程使命Worker。

//看继承类,哦是个线程。既然是线程咱们就要重视run办法
internal inner class Worker private constructor() : Thread() {
        inline val scheduler get() = this@CoroutineScheduler
        //本地使命行列,也便是每个线程的使命表。
        @JvmField
        val localQueue: WorkQueue = WorkQueue()
        //要害办法
        override fun run() = runWorker()
        //中心使命
        private fun runWorker() {
            var rescanned = false
            while (!isTerminated && state != WorkerState.TERMINATED) {
                //找活干!
                val task = findTask(mayHaveLocalTasks)
                //找到活了
                if (task != null) {
                    rescanned = false
                    minDelayUntilStealableTaskNs = 0L
                    executeTask(task)
                    //continue下持续找活干
                    continue
                } else {
                    mayHaveLocalTasks = false
                }
                //活都干完了,先别走。在延迟一会,从头continue下。万一这时分又有活来了呢?
                if (minDelayUntilStealableTaskNs != 0L) {
                    if (!rescanned) {
                        rescanned = true
                    } else {
                        rescanned = false
                        tryReleaseCpu(WorkerState.PARKING)
                        interrupted()
                        LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                        minDelayUntilStealableTaskNs = 0L
                    }
                    continue
                }
                //活真的干完了,也没新活来,那这个线程就能够被回收了。收工!
                tryPark()
            }
            //开释资源,其实便是改符号位
            tryReleaseCpu(WorkerState.TERMINATED)
        }
        //找活干
        fun findTask(scanLocalQueue: Boolean): Task? {
            //获取符号位,获取成功后就开端找使命。假如答应扫描本地行列,那就先扫描本地行列。假如不答应扫描本地行列就去大局行列里查找。
            if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
            //没有获取到cpu令牌,仍是从本地大局行列里去查询。
            val task = if (scanLocalQueue) {
                localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
            } else {
                globalBlockingQueue.removeFirstOrNull()
            }
            //唉,有意思的来了。假如以上都没查询到使命,那就尝试盗取一个使命(Steal=偷)
            return task ?: trySteal(blockingOnly = true)
        }
    }

那这Work线程又去哪偷使命去呢?咱们来看下trySteal办法的界说:

private fun trySteal(blockingOnly: Boolean): Task? {
    assert { localQueue.size == 0 }
    val created = createdWorkers
    // 看下当时有几个线程呢,小于两个那便是只要一个。奥那不便是我自己么,那还偷啥,不偷了。
    if (created < 2) {
        return null
    }
    var currentIndex = nextInt(created)
    var minDelay = Long.MAX_VALUE
    //有多少个线程我就重复多少次
    repeat(created) {
        ++currentIndex
        if (currentIndex > created) currentIndex = 1
        val worker = workers[currentIndex]
        //取出来线程,而且这个线程不是我自己
        if (worker !== null && worker !== this) {
            assert { localQueue.size == 0 }
            //从其他Work线程使命里去偷他的本地使命。
            val stealResult = if (blockingOnly) {
                localQueue.tryStealBlockingFrom(victim = worker.localQueue)
            } else {
                localQueue.tryStealFrom(victim = worker.localQueue)
            }
            if (stealResult == TASK_STOLEN) {
                return localQueue.poll()
            } else if (stealResult > 0) {
                minDelay = min(minDelay, stealResult)
            }
        }
    }
    minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0
    return null
}

经过咱们剖析,本来Worker真是个敬业好职工(卷王)。自己没活了(本地使命行列),领导那也没活了(大局使命行列),又自动去帮同事完结一部分工作(偷使命)。而且在所有使命完结之后也不立马下班,而是自动加班,等候分配新工作(等候复用机制)。

尾调机制

咱们大体对这个IO线程池有个开始了解了,然后咱们回头看下上边说的那个“尾调”这个逻辑currentWorker.submitToLocalQueue(task, tailDispatch)

咱们盯梢这个办法最终定位到WorkQueue类。其间fair参数便是tailDispatch。

fun add(task: Task, fair: Boolean = false): Task? {
        //尾调便是规规矩矩的放在使命行列尾部
        if (fair) return addLast(task)
        //不是尾调,就把新使命发在高优出队使命里,然后把本来要出队的使命放在队尾。
        val previous = lastScheduledTask.getAndSet(task) ?: return null
        return addLast(previous)
    }

结合以上CoroutineScheduler和Worker末节学到的知识点。咱们能够总结出这个尾调逻辑具体要做啥。

在传统的线程池的线程充足情况下,一个使命到来时,会被分配一个线程。假设前后两个使命A与B有依靠联系,需求在履行A再履行B,这时假如两个使命一起到来,履行A使命的线程会直接履行,而履行B线程的使命或许需求被堵塞。而一旦线程堵塞会造成线程资源的糟蹋。而协程实质上便是多个小段程序的相互协作,因此这种场景会非常多,经过这种机制能够确保使命的履行次序,一起减少资源糟蹋,而且能够最大限度的确保一个接连的使命履行在同一个线程中。

所以基于咱们也很简略了解谷歌doc关于withContext的这段描绘。

kotlin协程中的线程切换

总结

至此咱们基本现已剖析完了协程线程切换的大体流程。咱们总结本篇文章的几个中心知识点吧

  • 什么是协程上下文?他的作用是什么?
    • 协程上下文是规则了此次协程使命的工作环境,比方在什么线程里,反常处理机制等操作。
  • 协程IO线程池为什么不复用java线程池?
    • 针对协程多个小段程序的相互协作,线程切换场景频频的特色,协程运用尾回调机制和线程使命盗取机制来优化IO线程池性能。
  • 协程IO线程池做了那些优化?
    • 尾回调机制和线程使命盗取机制
  • 什么是尾回调机制?
    • 假如有当时活跃线程,协程会把使命放到这个线程的本地使命行列里,并等候线程履行完使命,而非从头创立或引发新使命。以此来确保有前后依靠使命的场景能够次序履行。以防止线程资源的糟蹋。
  • 什么是线程使命盗取?
    • 当一个线程的本地使命和大局行列使命都履行结束后,会尝试去其他线程里的本地使命行列里盗取一个使命拿来履行,以完结线程的最大复用。

以上是我的总结和答案,咱们也能够参阅他人的文章得到自己的总结和答案。

参阅文章

【深入了解Kotlin协程】协程的上下文 CoroutineContext

Kotlin协程:协程上下文与上下文元素

Kotlin协程:Dispatchers.IO线程池原理