Kotlin协程系列文章:《一、Kotlin协程系列:协程的创立与发动,挂起和恢复》
Kotlin协程系列文章:《二、Kotlin协程系列:协程的线程调度》
在榜首篇文章中咱们剖析了协程发动创立进程发动进程,在本文中,咱们将着重剖析协程中协程调度的逻辑流程。主要是剖析解答如下2个问题:
-
涉及到协程办法器是怎么将协程代码调度到特定的线程履行?
-
子协程履行完又是怎么切换回父协程的线程环境?
一、协程的分发器作用
1.1 测验代码
GlobalScope.launch {
//协程体1
Log.d(TAG, "before suspend job.")
withContext(Dispatchers.Main) {
//协程体2
Log.d(TAG, "print in Main thread.")
}
Log.d(TAG, "after suspend job.")
}
- 此次的协程测验用例中,咱们默许的
launch
一个协程,咱们简单的将launch
需求履行的这外层逻辑为协程体1。 - 在协程体1中,咱们运用
withContext
将协程切换到主线程履行,打印日志。咱们将这儿边履行的协程逻辑为协程体2。 - 协程体2履行完成后,切回协程体1中履行并打印Log。
- 留意,依据咱们之前《协程的创立与发动》文章中剖析的,Kotlin编译器针对协程体1和协程体2分别生成一个承继与
SuspenLamabda
的类型,比方:class MainActivity#onCreate$1 : SuspenLambda{...}
。咱们在讲协程体时,也一起代指这个类实例。
持续盯梢launch()
函数履行逻辑,这次盯梢进程不同与《协程的创立与发动》篇章,咱们会将侧重点放在发动进程中协程调度器是怎么起作用的?接下来见1.2
1.2 CoroutineScope.launch
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
//1. 见1.2.1
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
//2. 详见1.3
coroutine.start(start, coroutine, block)
return coroutine
}
- 这儿会新建一个
CoroutineContext
,详见1.2.1 - 依据之前的剖析,这个里终究会调用到
startCoroutineCancellable()
办法,详见1.3流程。
1.2.1 newCoroutineContext
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = foldCopies(coroutineContext, context, true)
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return
if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default
else
debug
}
-
coroutineContext
:coroutineContext
是CoroutineScope
的成员变量,当此刻为GlobalScope.coroutineContext==EmptyCoroutineContext
-
context
:因为调用launch
时没有指定Context
,所以传到此处也是EmptyCoroutineContext
。foldCopies()
函数将2个context相加并复制,终究combied==EmptyCoroutineContext
。
而在return这最终判别回来的是debug+Dispatchers.Defatult
,所以此刻默许的分发器为Dispatchers.Defatult
。
这儿涉及到的协程Context运算不做深化剖析,简单能够以为协程重写了“+”运算,使得Context之间能够运用“+”来叠加,没有的Element类型会被添加到Element调集,调集中已有的Element类型会被覆盖。
1.3 startCoroutineCancellable
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
//1. 创立SuspendLambda协程体
createCoroutineUnintercepted(receiver, completion)
//2. 阻拦:取出分发器,并构建办法器Continuation。详见1.3.1
.intercepted()
//3. 调用办法器Continuation的resume办法,详见1.4
.resumeCancellableWith(Result.success(Unit), onCancellation)
}
- 这儿的构建协程体在《协程的创立与发动》一节中现已剖析,不再赘述。
- 进行阻拦,留意:这儿其实会依据办法器再构建出一个
DispatchedContinuation
目标,它也是一个续体类型,这是对协程体的一次包装。详见1.3.1末节。 - 调用阻拦器续体的
resumeCancellableWith()
开端状况机流转,履行分发流程详见1.4末节。
1.3.1 intercepted()
public fun intercepted(): Continuation<Any?> =
intercepted?: (
//1. 取出阻拦器
context[ContinuationInterceptor]?
//2.构建阻拦器续体
.interceptContinuation(this)?: this)
.also { intercepted = it }
- 取出当时上下文中的阻拦器类型,依据之前1.2.1末节的剖析,这儿取出来的是
Dispatchers.Defatult
。 -
interceptContinuation(this)
为构建阻拦器续体,留意这儿传入的this
是协程体1。 详见1.3.2。
1.3.2 CoroutineDispatcher
//Base class to be extended by all coroutine dispatcher implementations.
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public final override fun <T> interceptContinuation(continuation: Continuation<T>):
//详见1.4
Continuation<T> = DispatchedContinuation(this, continuation)
}
直接新建了一个DispatchedContinuation
目标实例这儿需求留意传入的构建参数:
- this:当时
Dispatcher
,也便是Dispatchers.Defatult
。 - continuation:协程体1。
1.3.3 小结
自此Continuation.intercepted()
办法就剖析完毕,终究的结果是:用上下文中的Dispatcher
和当时Contination
目标也便是协程体1,一起作为构建参数,新建了一个DispatchedContinuation
目标。接下来接着1.3中的第三点,调用DispatchedContinuation.resumeCancellableWith()
办法开端剖析。
1.4 DispatchedContinuation
internal class DispatchedContinuation<in T>(
//1. 分发器
@JvmField val dispatcher: CoroutineDispatcher,
//2. 留意这儿将Continuation的完成托付给了continuation成员变量。
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED)
, CoroutineStackFrame,
Continuation<T> by continuation {
//3. 复写特点delegate为自己
override val delegate: Continuation<T>
get() = this
...
// We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
// It is used only in Continuation<T>.resumeCancellableWith
@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
//默许为true
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//4. 详细见
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
}
这儿的dispatcher==Dispatchers.Defatult
,所以接下来需求解析Dispatchers.Defatult
到底是什么东西。详见1.5
- 成员变量
dispatcher==Dispatchers.Default
。 - 成员变量
continucation==协程体1(SuspenLambda类型实例)
。一起DispatchedContinuation
承继于Continuation
接口,它将Continuation
接口的完成托付给了成员变量continuation
。 -
deleagte
为复写了DispatchedTask.delegate
特点,将其回来自己。 - 调用分发器也便是
Dispatchers.Defatult
的dispatch()
办法,留意这儿传入的参数:
-
-
context
:来自Continuation
接口的特点,因为托付给了成员变量continuation
,所以此context
==continuation.context
。 -
this
:分发器自身Dispatchers.Defatult
-
自此这个办法的剖析完毕:调用分发器的进行分发,接下来剖析就开端剖析协程办法器CoroutineDispatcher
1.5 DefaultScheduler
//Dispathcer.kt
@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
//Dispathcer.kt
// Instance of Dispatchers.Default
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
CORE_POOL_SIZE, MAX_POOL_SIZE,
IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
...
}
实践上是承继 SchedulerCoroutineDispatcher
类型。详见1.5.1
1.5.1 SchedulerCoroutineDispatcher
internal open class SchedulerCoroutineDispatcher(
private val corePoolSize: Int = CORE_POOL_SIZE,
private val maxPoolSize: Int = MAX_POOL_SIZE,
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
override val executor: Executor
get() = coroutineScheduler
// This is variable for test purposes, so that we can reinitialize from clean state
private var coroutineScheduler = createScheduler()
private fun createScheduler() =
//1. 详见1.5.2
CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
//2. 详见1.5.2
override fun dispatch(context: CoroutineContext, block: Runnable): Unit
= coroutineScheduler.dispatch(block)
...
}
//Executors.kt
//2. 实践上是承继ExecutorCoroutineDispatcher
public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
...
}
- 能够看到实践上调用了
CoroutineScheduler.dispatch
办法。此刻发现,第二个参数是Runnable
类型的,而在1.4末节中,咱们知道传入的是this
也便是DispatchedContinuation
,所以DispatchedContinuation
承继的父类中,必定有承继了Runnable
接口,而他的run办法的完成也在父类中,这块咱们暂时按下不表,接着看持续盯梢coroutineScheduler.dispatch(block)
。
1.5.2 CoroutineScheduler
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
...
override fun execute(command: Runnable) = dispatch(command)
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
trackTask() // this is needed for virtual time support
val task = createTask(block, taskContext)
// try to submit the task to the local queue and act depending on the result
val currentWorker = currentWorker()
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
if (!addToGlobalQueue(notAdded)) {
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
// Checking 'task' instead of 'notAdded' is completely okay
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
signalCpuWork()
} else {
// Increment blocking tasks anyway
signalBlockingWork(skipUnpark = skipUnpark)
}
}
}
- 该类承继了
Executor
类,并且它的构建参数可看到是线程池的参数,所以能够知道这个其实是Kotlin协程完成的一个线程池,详细就不跟进去了。 -
execute()
进程也是dispatch
进程:将使命投递到使命队列,然后通知线程去取使命履行,自此完成了线程切换动作。 - 而在新线程里履行的
Runnable
为1.4中的调用代码:dispatcher.dispatch(context, this)
中的this
,也便是DispatchedContinuation
。DispatchedContinuation.kt
并没有完成run
办法,那么一定是他承继的父类完成了Runnable
接口并完成,所以需求接着看它承继的父类:DispatchedTask
类。
1.6 DispatchedTask.run()
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
...
internal abstract val delegate: Continuation<T>
@Suppress("UNCHECKED_CAST")
internal open fun <T> getSuccessfulResult(state: Any?): T =
state as T
internal open fun getExceptionalResult(state: Any?): Throwable? =
(state as? CompletedExceptionally)?.cause
public final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
//1. 取出代理商的续体
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
val exception = getExceptionalResult(state)
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
continuation.resumeWithException(exception)
} else {
//1. 被包装的续体的resume办法,真正的开端动身其协程状况机代码。
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
// This instead of runCatching to have nicer stacktrace and debug experience
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(fatalException, result.exceptionOrNull())
}
}
}
- 将
delegate
转为DispatchedContinuation
,应该留意1.4 末节中DispatchedContinuation
承继DispatchTask
时,便对此delegate
进行了复写:
override val delegate: Continuation
get() = this
而此delegate.continucation
便是当初newDispatchedContinuation(this)
时传入的this,此this便是Kotlin编译器一开端为协程体生成的SuspendLambda
类型目标。详细能够回看1.3末节。
- 调用了
continuation.resume()
办法触发了协程的状况机从而开端履行协程事务逻辑代码,结合之前1.5.2的剖析能够知道,这个办法的调用现已是被dispatch
到特定线程,完成线程切换后履行的。所以协程状况机的代码也是跑在新线程上的。
1.7 总结
至此,协程的线程调度剖析完毕,关键有如下几个关键:
- 创立
SuspendLambda
时,他的协程上下文目标来自于comletion.context
,默许便是Dispatcher.Default
。 -
SuspendLambda
发动时调用了intercept()
进行一层包装,得到DispatchedContinuation
,后续协程发动是发动的DispatchedContinuation
协程。 -
DispatchedContinuation
承继于Runnable
接口,协程发动时将自己投递到分发器dispatcher
履行run
办法,从而到达了线程切换作用。 - 在
DispatchedContinuation
的run
办法中,调用SuspendLambda.resume()
发动状况机。在新线程履行协程状况机代码。
这一末节中,介绍了怎么将协程调度到意图线程履行,接下来剖析怎么做到随意切换线程后,然后再恢复到本来线程的。
二、协程中的线程切换
在榜首末节中,咱们搞清楚了协程发动时,协程调度器是怎么在其间起作用的。这一末节旨在剖析在协程用分发器切换线程履行新的挂起函数后,是怎么切换会本来线程持续履行剩下的逻辑的。
为此,咱们需求将1.1的测验代码反编译出来实践代码从而剖析。
2.1 反编译代码
2.1.1 MainActivityonCreateonCreate1
final class MainActivity$onCreate$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
...
@Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
public final Object invokeSuspend(Object $result) {
Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4147xf96cab04());
this.label = 1;
//1. 新建编译器主动生成的承继于SuspendLambda的类型。
AnonymousClass1 anonymousClass1 = new AnonymousClass1(null);
//2. 调用withContext
Object res = BuildersKt.withContext(Dispatchers.getIO(), anonymousClass1, this);
if (res != coroutine_suspended) {
break;
} else {
//挂起
return coroutine_suspended;
}
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4148xe0c1b328());
return Unit.INSTANCE;
}
}
-
依据之前的文章剖析,这儿
suspend lambda
的类型都主动生成承继于SuspendLambda
的类型。详见2.1.2。 -
将
anonymousClass1
传入withContext
,并且留意这儿传入了this==MainActivity$onCreate$1
,详见2.2。
2.1.2 AnonymousClass1
/* compiled from: MainActivity.kt */
public static final class AnonymousClass1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Integer>, Object> {
int label
...
@Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
public final Object invokeSuspend(Object obj) {
IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure(obj);
return Boxing.boxInt(Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4146x7c0f011f()));
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
}
2.2 withContext
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
//1. 获取当时协程, 留意这儿的uCont便是当时续体,也便是MainActivity$onCreate$1
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
//2. 核算获的新的协程上下文
val oldContext = uCont.context
val newContext = oldContext + context
//3. 快速判别:新上下文和旧上下文共同的情况快速处理。
// always check for cancellation of new context
newContext.ensureActive()
// FAST PATH #1 -- new context is the same as the old one
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- use new dispatcher
//4. 新建一个DispatchedCoroutine
val coroutine = DispatchedCoroutine(newContext, uCont)
//5. 发动协程
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}
-
suspendCoroutineUninterceptedOrReturn
这个函数直接步进是看不到完成的,它的完成是由Kotlin编译器生成的,它的作用是用来获取当时续体的,并且经过uCont
回来,这儿便是MainActivity$onCreate$1
。 -
将旧协程上下文和新的上下文一起。核算得到终究的上下文。这儿的
context==Dispatchers.getIO()
。 -
快速判别,不用看。
-
新建一个
DispatchedCoroutine
,留意这儿传入了新的协程上下文和当时续体目标。 -
调用
startCoroutineCancellable()
发动协程。这儿的同1.3.2末节剖析一样,详见 2.2.1
2.2.1 startCoroutineCancellable
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
//1. 创立SuspendLambda协程体
createCoroutineUnintercepted(receiver, completion)
//2. 阻拦:取出分发器,并构建办法器Continuation。详见1.3.1
.intercepted()
//3. 调用办法器Continuation的resume办法,详见1.4
.resumeCancellableWith(Result.success(Unit), onCancellation)
}
此办法在之前1.3末节现已剖析过,针对此此次调用,其间的改动是协程上下文中的分发器现已被设置为Dispatchers.Main
。
-
创立了
SuspendLambda
目标,此目标的CoroutineContext
为completion.context
。而其间的ContinuationInterceptor
类型Element
便是咱们之前传入的Dispatchers.Main
。 -
创立一个
DispatchedContinuation
。 -
将协程
SuspendLambda
的状况机逻辑经过Dispatcher.Main
调度到主线程履行,调度进程参考榜首下节。分发逻辑详见2.7末节。 -
当
SuspendLambda
的状况机invokeSuspend()
逻辑履行完成后,会回来到BaseContinuationImpl.resumeWith()
,咱们需求接此办法剖析,来得到协程在切换到主线程履行后,又是怎么切回协程体1的履行线程的,详见2.3。
2.3 resumeWith
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
//1. 进入此判别
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
当状况机履行完后, 后进入到completion的类型判别,由2.2和2.2.1能够知道,当初传入的completion是DispatchedCoroutine
类型,所以加入到else分支,调用了DispatchedCoroutine.resumeWith()
,接下来剖析此办法。
在此之前,咱们需求看下DispatchedCoroutine
的承继联系,详见2.4.1。假如想直接盯梢流程,能够直接看2.4.2。
2.4 DispatchedCoroutine
2.4.1 DispatchedCoroutine 的承继联系
internal class DispatchedCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
}
承继于ScopeCoroutine
internal open class ScopeCoroutine<in T>(
context: CoroutineContext,
@JvmField val uCont: Continuation<T> // unintercepted continuation
) : AbstractCoroutine<T>(context, true, true), CoroutineStackFrame {
}
承继于AbstractCoroutine
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
}
2.5 协程线程的恢复
2.5.1 AbstractCoroutine.resumeWith()
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
调用了afterResume
办法,此办法在DispatchedCoroutine
类型有详细完成。见2.5.2
2.5.2 afterResume
//DispatchedCoroutine
override fun afterResume(state: Any?) {
if (tryResume()) return // completed before getResult invocation -- bail out
// Resume in a cancellable way because we have to switch back to the original dispatcher
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
-
取出当时续体
uCont
,这个续体依据之前的剖析:2.2末节,能够知道它等于MainActivity$onCreate$1
。 -
intercepted()
:取出其分发阻拦器 -
resumeCancellableWith
:运用办法阻拦器协程体,将uCont续体的状况机逻辑调度到相对应的线程环境履行,这儿便是之前的Dispatcher.Default
。留意其注释:“将其切换到原先的分发器”。2⃣而这一进程其实和1.3末节的进程共同。 -
恢复到
Dispatcher.Default
持续履行状况机时,因为label现已被更新,所以会往下持续履行,打印最终一句log。
2.6 总结
-
withContext(Dispatcher.Main)
发动的协程时,获得当时协程续体uCount
也便是MainActivity$onCreate$1
,会核算出新的协程context
,然后用它们创立一个DispatchedCoroutine
。 -
AnonymousClass1
协程发动时,用DispatchedCoroutine
作为completion
参数,然后发动,此刻会调度主线程履行协程。 -
当协程履行完成后,
AnonymousClass1.resumeWith()
办法会调用completion.resumeWith()
。 -
DispatchedCoroutine.resumeWith()
办法会调用uCount.intercepted().resumeCancellableWith()
,使得父协程进行调度并接着履行状况机逻辑。
2.7 Dispatchers.Main
@JvmStatic
public actual val Main: MainCoroutineDispatcher get()
= MainDispatcherLoader.dispatcher
直接详见2.7.1
2.7.1 MainDispatcherLoader
internal object MainDispatcherLoader {
private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
val factories = if (FAST_SERVICE_LOADER_ENABLED) {
FastServiceLoader.loadMainDispatcherFactory()
} else {
// We are explicitly using the
// `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
// form of the ServiceLoader call to enable R8 optimization when compiled on Android.
// 1.获得MainDispatcherFactory的完成类
ServiceLoader.load(
MainDispatcherFactory::class.java,
MainDispatcherFactory::class.java.classLoader
).iterator().asSequence().toList()
}
@Suppress("ConstantConditionIf")
factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
?: createMissingDispatcher()
} catch (e: Throwable) {
// Service loader can throw an exception as well
createMissingDispatcher(e)
}
}
}
- 经过ServiceLoad机制获取
MainDispatcherFactory
的完成类,而在源码里边,其完成类为AndroidDispatcherFactory
- 调用
tryCreateDispatcher()
创立分发器,详见2.7.2。
2.7.2 AndroidDispatcherFactory
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true))
override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
override val loadPriority: Int
get() = Int.MAX_VALUE / 2
}
依据createDispatcher
分发,主线程分发器的完成类为HandlerContext
类型,传入用MainLooper
构建的Handler
。详见2.7.3。
2.7.3 HandlerContext
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
/**
* Creates [CoroutineDispatcher] for the given Android [handler].
*
* @param handler a handler.
* @param name an optional name for debugging.
*/
constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!handler.post(block)) {
cancelOnRejection(context, block)
}
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
} else {
cancelOnRejection(continuation.context, block)
}
}
...
}
HandlerContext
承继于HandlerDispatcher
,而他的dispatch
办法,能够看到,便是将block丢到设置MainLooper
的handler
履行。所以续体将会在主线程履行状况机,到达切换到主线程履行协程的意图。