协程粉碎计划 | 线程调度原理解析

本系列专栏#Kotlin协程

前言

前面文章我们说了协程启动的原理,其中block这段lambda就是协程,然后launch就是对基础API做了封装,其中一个特性就是可以指定其上下文。

而在更早的时候,我们就说过协程可以看成运行在线程上的轻量级Task,所以协程在创建出来以后,是如何和线程产生关联的呢 本章就来重点介绍一下。

注意本篇文章重点涉及到上一篇文章的协程启动原理,所以这里最好阅读一下前几篇文章。

正文

在前面文章有限状态机我们知道,指定其运行的线程其实就设置上下文为Dispatchers.IO、Dispatcher状态机模式s.Default等,具体使用可以查看文章:

juejin.cn/post/709185…

这里就来先分析一下线程撕裂者这个Dispatchers。

Dispa接口类型tchers

几个重要的类

在说具体流程前,我让天秤难以放弃的星座们必须先捋清楚几个关系密线程池面试题切的类,不然后面说流程时容易混淆,他们分别是Dispatchers、CoroutineDispatcher、ContinuationInterceptor和Coroutin人头攒动的读音eContext。

首先就是D状态机ispatc接口crc错误计数hers,它是一个单例类:

public actual object Dispatchers {
    public actual val Default: CoroutineDispatcher = DefaultScheduler
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
    public val IO: CoroutineDispatcher = DefaultIoScheduler
}

可以发现Dispatchers是一个单例,同时其中的Default、Main等类的类型是CoroutineDiandroid/harmonyosspatcher,如下:

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {}
public interface ContinuationInterceptor : CoroutineContext.Element {}
public interface Element : CoroutineContext{}

可以发现CoroutineDispatcher是Coandroid的drawable类ntinuatiandroid手机onInterceptorandroid的drawable类的子类,而它又是CoroutRTCineContext的子类,所以上述几个类的关系如下:

协程粉碎计划 | 线程调度原理解析

这个关系图在后面分析时有很大的作用。

intercepted()

在上一篇文章我们介绍启动协程的原理时,有一个刻意忽略的地方,就是下面:

public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

上面的intercepted(),上一android什么意思篇文章我们说过这个interce接口英文pted()方法是用来设接口卡置协人头攒动程运行的线程的,那接口英文就来看一下这个接口intercepted()方法是如接口卡何生效的。

获取android平板电脑价格CoroutineDispatcher

这里我们还是先来看一下launch的源码,其中第一个参数就人头攒动的近义词是我们设置的上下文:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    //注释1
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

这里第一个参数的默认值是EmptyCoroutineConte让天秤难以放弃的星座xt,在前面介绍CoroutineContext文章我们让天秤难以放弃的星座说过,Coroutine状态机Context就是一个Map,而这里的EmptyCoroutineContext就是一个空Map,也就等于没有传递;只不过Ko线程安全tlin使用这个来代替null。

这里的注释1会对传入的context进行处理android的drawable类,代码如下:

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    //注释1 合并
    val combined = coroutineContext.foldCopiesForChildCoroutine() + context
    //注释2 添加唯一ID
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    //注释3 返回具体Dispatchers.X
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

首先这个函数返回的是CoroutineContext,上面有3点需要说明一下:

  • 由于newCoroutineContext是CoroutineScope的扩展函数,所以注释1的coroutineContext就是该Sco接口类型pe的coroutineContext对象,即CoroutineScope对应的上下文。(这个关系可以看之前CoroutineContex状态机模式t的文章)
  • 还是注释1的foldCopie状态机编程sChildCoroutine函数作用就是将CoroutineScope状态机模式当中的所有上下文元素都拷线程池的七个参数贝出来,然后跟传入的context进行合并。这行代码,可以让子协程继承父协程的上下文元素
  • 注释2的作用是在调试模式下,给协程对象增加唯一的I状态机图D,比如之前代码的打印”@coroutine#1″中的1。
  • 注释3直接把comb人体肠道结构示意图ined看成Map来使用,通过Key Conandroid是什么手机牌子tinuationInterceptor可以获取其Dispatcher类型,默认的话是Dispatchers.Default

这里也就会发现当我们不设置Dispatchers时,默认是Defau线程安全lt线程池,因为Kotlin是支持多平台的,只有UI编程的平台比如Android才有Main主线程的概念,所以这里默认是Default一定不会错。

CoroutineDispatcher拦截器

上面知道了子线程能继承父协程的上下文元素,同时得到默认的Coroutine闰土刺猹Dispatcher,那接着我们来看一下interceptedandroidstudio安装教程()方法是如何处理的,线程池的七个参数源代码如下:

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

这里我们知道当创建完协程,会调用intandroid是什么手机牌子ercept线程数是什么ed()方法,这里就相当于拦截器,对Continuation进行拦截处理,这里代码很简单,把Continuaandroid的drawable类tion强转为ContinuationImpl,然后调用其intercepandroid下载安装ted()方法状态机我们找到该类:

public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }

在这里我们会获取其上下文context,然后通过context[ContinuationInterceptor]获得其Dispatcher对象,以默认逻辑的话,这里就是Dispatchers.D状态机设计模式efault线程池了。

我们在前面知道Dispatcher状态机设计模式s.Default是CoroutineDispatcher的子类,所以interceptContinuation方法有限状态机如下:

public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
    DispatchedContinuation(this, continuation)

根据前面我们知道DispatchedContinuation()中的thi有限状态机s就是Dispatchers.Default了。

到这里我们为了逻辑更清晰、分层,可以把前面的startCoroutineCancellable()方法改写一下:

public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
    createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
    // 1
    val continuation = createCoroutineUnintercepted(completion)
    // 2
    val dispatchedContinuation = continuation.intercepted()
    // 3
    dispatchedContinuation.resumeCancellableWith(Result.success(Unit))
}

上面代码的1就是创建了一个协程,2就是前面刚刚分析的,返回一个DispatchedContinuation状态机对象,构造函数中this表示Dispa接口测试用例设计tchers中具体的线程池,co让天秤倒追的星座ntinuation还是那个协程,那现在就是药分析注释3的内容了,这也是真正将协程任务分发到状态机的作用线程上的逻辑线程安全

DispatchedContinuation

先来看一下这个DispatchedCandroid平板电脑价格ontinuation类的定义:

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {}

会发现这个类的定义还是非常复杂的,首先它实现了Continuation接口,但是使用了”类委托”,把接口实现都委托给了传递进来的continuation参数。其次参数dispatcher就是前面分析的默认参数Dispatchers.Default,continuation就是b线程安全lock协程实现类,具体类型是SuspendLaandroid平板电脑价格mbda人体承受的最大电压

然后发现这接口crc错误计数个类还继承至DispatchedTask,我们来看一下这个类:

internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {}
internal actual typealias SchedulerTask = Task
internal abstract class Task(
    @JvmField var submissionTime: Long,
    @JvmField var taskContext: TaskContext
) : Runnable{}

这里会发现DispatchedTask继承至SchedulerTask,该类继承至Ta线程数越多越好吗sk,而Task则继承至我们非常熟悉的Runnable接口,这也就意味着android的drawable类它可以被分到Java的线程当中去执行了。

既然它是Runnable,我们就来看看如何进行分配,来看看resumeCancellableWith()的代码实现:

inline fun resumeCancellableWith(
    result: Result<T>,
    noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
    val state = result.toState(onCancellation)
    //注释1
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_CANCELLABLE
        //注释2
        dispatcher.dispatch(context, this)
    } else {
        //注释3
        executeUnconfined(state, MODE_CANCELLABLE) {
            if (!resumeCancelled(state)) {
                resumeUndispatchedWith(result)
            }
        }
    }
}
  • 这里注释1的isDispatchNeeded的意思是是否需要分发,这里只有当是Dispatchers.Unconfined时,才返回false,所以本例中代码是Dispatchers.Default,所以会进入注释2的逻辑。
  • 注释2就是使用线程池来进线程撕裂者行分发,其中把this(实现了Runnable接口)传递。
  • 注释3是设置了Dispatcher.Unconfined的情况接口文档下,这里会直接在当前线程执行。

这里调用的是dispatcher.dispatcher方法,其实我们知道就是调用rtc是什么意思了Dispatchers.Defa接口ult.dispatch()方法,下面是源码:

public actual val Default: CoroutineDispatcher = DefaultScheduler

这里发现Default是DefaultScheduler的实例线程和进程的区别是什么

internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {}

而DefaultSchedu人头攒动的读音ler则是一个单例,继承至SchedulerCoroutineDisp状态机编程atcher,

internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
    private fun createScheduler() =
        CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
}

而这里我们发现dispatch方法中会调用Coro状态机设计模式utin线程数是什么eScheduler中的dispatch()方法,

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
    override fun execute(command: Runnable) = dispatch(command)
    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask() 
        // 1
        val task = createTask(block, taskContext)
        // 2
        val currentWorker = currentWorker()
        // 3
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        if (task.mode == TASK_NON_BLOCKING) {
            if (skipUnpark) return
            signalCpuWork()
        } else {
            signalBlockingWork(skipUnpark = skipUnpark)
        }
    }
    private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
    // 内部类 Worker
    internal inner class Worker private constructor() : Thread() {
    }
}

这里我们发现CoroutineScheduler其实就是Java并发包Executor的子类,它的ex接口测试用例设计ecute方法也被转到了dispatch方法,所以终于到了Java线程处理部分了,上面代码android是什么系统有3个注释:

  • 注释1,将传入的Runnable类型的blo状态机ck,其实也就是Dandroid下载ispatchedContinuation,包装成Task
  • 注释2,curre接口是什么ntWorker()就是拿到当前执行的线程,这里的Wor线程池ker是一个内部类,它的本质是Java的Thread。
  • 注释3,currentWorker.submitToLocalQueue(),将当前的Task添加到Worker线程的本地队列线程的几种状态中,等待执行

这里我们来分析一下Worker是如何执行Task的。下面是Worker代码,代码较多,只列出有用信息:

internal inner class Worker private constructor() : Thread() {
    override fun run() = runWorker()
    @JvmField
    var mayHaveLocalTasks = false
    private fun runWorker() {
        var rescanned = false
        while (!isTerminated && state != WorkerState.TERMINATED) {
            // 1
            val task = findTask(mayHaveLocalTasks)
            if (task != null) {
                rescanned = false
                minDelayUntilStealableTaskNs = 0L
                // 2
                executeTask(task)
                continue
            } else {
                mayHaveLocalTasks = false
            }
            if (minDelayUntilStealableTaskNs != 0L) {
                if (!rescanned) {
                    rescanned = true
                } else {
                    rescanned = false
                    tryReleaseCpu(WorkerState.PARKING)
                    interrupted()
                    LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                    minDelayUntilStealableTaskNs = 0L
                }
                continue
            }
            tryPark()
        }
        tryReleaseCpu(WorkerState.TERMINATED)
    }
}

这里Worker会重写Thread的run()方法,然后把执行流程交由给runWorkder(),这里代码注意2点:

  • 注释1,会在接口类型while循环中,一直尝试从W线程orker的本地队列中取出Ta线程是什么意思sk。
  • 注释2,executeTask方法android的drawable类,来执行其对应的Task。

接下来就是关键的执行Task代码:

internal inner class Worker private constructor() : Thread() {
    private fun executeTask(task: Task) {
        val taskMode = task.mode
        idleReset(taskMode)
        beforeTask(taskMode)
        // 1
        runSafely(task)
        afterTask(taskMode)
    }
}
fun runSafely(task: Task) {
    try {
        // 2
        task.run()
    } catch (e: Throwable) {
        val thread = Thread.currentThread()
        thread.uncaughtExceptionHandler.uncaughtException(thread, e)
    } finally {
        unTrackTask()
    }
}
internal abstract class Task(
    @JvmField var submissionTime: Long,
    @JvmField var taskContext: TaskContext
) : Runnable {
    constructor() : this(0, NonBlockingContext)
    inline val mode: Int get() = taskContext.taskMode // TASK_XXX
}

这里我们调用runSafely方法,人头攒动的近义词然后在这个方法中我们执行了ta人头攒动的读音sk.run(),而Task的本质是Runnable,到目前就代表了我们的协程任务真正执行了

注意又回到前面了,这里的run执行的具体逻辑是啥,从前面类的继承关系来看接口文档,这里执行的是DispatchedTasandroidstudio安装教程k.run()方法,而这个让天秤难以放弃的星座类实际线程的几种状态上是Dispatchandroid下载edContinuation的子类,所以会调用下面代码:


internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
    public final override fun run() {
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val delegate = delegate as DispatchedContinuation<T>
            val continuation = delegate.continuation
            withContinuationContext(continuation, delegate.countOrElement) {
                val context = continuation.context
                val state = takeState() 
                val exception = getExceptionalResult(state)
                val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                if (job != null && !job.isActive) {
                    // 1
                    val cause = job.getCancellationException()
                    cancelCompletedResult(state, cause)
                    continuation.resumeWithStackTrace(cause)
                } else {
                    if (exception != null) {
                        // 2
                        continuation.resumeWithException(exception)
                    } else {
                        // 3
                        continuation.resume(getSuccessfulResult(state))
                    }
                }
            }
        } catch (e: Throwable) {
            fatalException = e
        } finally {
            val result = runCatching { taskContext.afterTask() }
            handleFatalException(fatalException, result.exceptionOrNull())
        }
    }
}

上面代码主要就看3个注释点:

  • 注释1状态机,在协程代码执行之前,状态机模式首先判断协程是否已经被取消,如果已经取消,则通过resumeWithStackTrace把具体原因传出去。
  • 注释2,判断协程是否发生了异常,如果发生了异常,则通过resumeWithException将异常传递出去。
  • 注释3,如果一切正常,则调用resumandroidstudio安装教程e启动协程,并且执行launch中传android是什么系统入的lam人体肠道结构示意图bda表达式。

到这里,我们就完全分析完了整个流程。

总结

本篇文章分析了launch的流程状态机编程,而其中与线程交互重点就是Dispatche接口测试用例设计rs,主要有下面几个步骤android手机

  • 第一步,createCorouandroid是什么系统tineUnintercepted(completion)创建了协程的C状态机的作用ontinuation实例,接状态机模式着调用intercepted()方法,将其封装为D接口自动化ispatchedContinuation对象。
  • 第二步,DispatchedContinuaRTCtion会持有Coroutin肉跳测吉凶eDiRTCspatcher、以及前面创建的Continuation对象,比如文中的CoroutineDispatcher就是Default线程池。
  • 第三步,执行Dispat线程是什么意思chedContinuation的res人头攒动umeCancellableWith()方法,会执行dispatcher.dispatch()方法,这个会将Continuation封装为Task,添加到线程中去执行。在这一步,协程就已经完成了线程切换
  • 第四步,线程run方法会调用DispatchedContinuation的run方法,会调用continuation.resume方法,它将执行原本launch当中生成的SuspendLambda子类,这时候协程的代码就在线程上执行了线程数越多越好吗
  • 第五步,当协状态机的作用程执行完成后状态机的作用,根据CPS转换,会进入主线程(调用者线程)状态机来执行后续操作。

发表评论

提供最优质的资源集合

立即查看 了解详情