开启生长之旅!这是我参与「日新方案 12 月更文应战」的第29天,点击查看活动概况

1.Flow流程中为什么是【冷】的

先看一段Flow的运用代码:

fun main() = runBlocking {
    flow {
        emit(0)
        emit(1)
        emit(2)
        emit(3)
    }.collect{
        println("it:$it")
    }
    println("end")
}
//输出成果:
//it:0
//it:1
//it:2
//it:3
//end

这是Flow的最简略的运用方法,经过调用collect达到我所希望的成果,那么在弄清楚Flow为什么是冷的之前先看一下Flow的创立流程:

public fun <T> flow(
    @BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = 
    SafeFlow(block)
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

flow是一个高阶函数,参数类型是FlowCollector.() -> Unit,FlowCollector是它的扩展或者成员办法,没有参数也没有返回值,flow()函数的返回值是Flow,详细到返回类型是SafeFlow()SafeFlow()AbstractFlow()的子类。

@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    //1
    public final override suspend fun collect(collector: FlowCollector<T>) {
        //2
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            //3
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

AbstractFlow源码中能够知道它完成了Flow接口,这儿进一步知道了SafeFlow间接的完成了Flow接口。这儿深化了解一下AbstractFlow做了哪些工作,经过上面三个注释进行解说:

  • 注释1:这个collect便是demo中的collect的调用。这儿我做一个猜想collect的调用会触发上游Lambda中的emit()函数的履行,然后将数据传递给collect
  • 注释2:这儿主要便是进行了安全查看,SafeCollector只是把collect中的参数FlowCollector<T>重新进行了封装,详细的安全查看是怎么样的稍后进行剖析;
  • 注释3:这儿调用了collectSafely这个抽象办法,而这儿的详细完成是在SafeFlow中的collectSafely,然后调用了collector.block(),这儿其实便是调用了flow()中的emit办法,或者说collector.block()调用了4次emit()函数。

到这儿Flow的创立就完毕了,那么现在来总结一下为什么Flow是冷的:FLow之所以是冷的是由于它的结构进程,在它的结构进程中结构了一个SafeFlow目标但并不会触发Lambda表达式的履行,只有当collect() 被调用后Lambda表达式才开始履行所以它是冷的。

  • SafeCollector是怎么进行安全查看的
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST")
internal actual class SafeCollector<T> actual constructor(
    //1
    @JvmField internal actual val collector: FlowCollector<T>,
    @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
    override val callerFrame: CoroutineStackFrame? get() = completion as? CoroutineStackFrame
    override fun getStackTraceElement(): StackTraceElement? = null
    @JvmField // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
    internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
    private var lastEmissionContext: CoroutineContext? = null
    private var completion: Continuation<Unit>? = null
    // ContinuationImpl
    override val context: CoroutineContext
        get() = completion?.context ?: EmptyCoroutineContext
    override fun invokeSuspend(result: Result<Any?>): Any {
        result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
        completion?.resumeWith(result as Result<Unit>)
        return COROUTINE_SUSPENDED
    }
    // Escalate visibility to manually release intercepted continuation
    public actual override fun releaseIntercepted() {
        super.releaseIntercepted()
    }
    /**
     * 这是状态机重用的巧妙完成。
	 * 首先它查看它没有被并发运用(这儿是明确制止的),
	 * 然后只缓存一个完成实例以避免在每次宣布时进行额定分配,使其在热路径上消除垃圾。
     */
    //2
    override suspend fun emit(value: T) {
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                //3
                emit(uCont, value)
            } catch (e: Throwable) {
                //保存已抛出emit反常的事实,便于被上游 
                lastEmissionContext = DownstreamExceptionElement(e)
                throw e
            }
        }
    }
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        val currentContext = uCont.context
        currentContext.ensureActive()
        //4
        val previousContext = lastEmissionContext
        if (previousContext !== currentContext) {
            checkContext(currentContext, previousContext, value)
        }
        completion = uCont
        //5
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }
    private fun checkContext(
        currentContext: CoroutineContext,
        previousContext: CoroutineContext?,
        value: T
    ) {
        if (previousContext is DownstreamExceptionElement) {
            exceptionTransparencyViolated(previousContext, value)
        }
        checkContext(currentContext)
        lastEmissionContext = currentContext
    }
    private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
        error("""
            Flow exception transparency is violated:
                Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
                Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
                For a more detailed explanation, please refer to Flow documentation.
            """.trimIndent())
    }
}
//6
private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
  • 注释1:这儿的collect参数对应的便是Flow在结构时的匿名内部类FlowCollector,在AS中看到的this:FlowCollector<Int>便是它;
  • 注释2:这个emit()函数其实便是demo中用来发送数据的emit(0)...emit(3),这儿能够了解为Flow上游发送的数据终究会传递到这个emit()中;
  • 注释3:这儿的emit(uCont, value)函数中多了两个参数,其间第一个是suspendCoroutineUninterceptedOrReturn,它是一个高阶函数,是把挂起函数的Continuation暴露出来并作为参数进行传递,第二个则是上游发送过来的数据。这儿还有一个反常捕获,反常被捕获后存储在lastEmissionContext,作用是:在下流发送反常今后能够让上游感知到。下面会有一个比照;
  • 注释4:这儿对当时协程上下文与之前的协程上下文进行比照,如果两者不一致就会履行checkContext(),在它里面做出进一步的判别和提示;
  • 注释5:这儿调用的便是下流的emit(),也便是FlowCollector中的emit函数,这儿接纳的那个value便是上游的emit(0)...emit(3)传递过来的,这儿可能不太好了解,我将demo中的代码换个写法就理解了:
fun main() = runBlocking {
    flow {
        emit(0)
        emit(1)
        emit(2)
        emit(3)
    }.collect{
        println("it:$it")
    }
    println("换个写法")
    flow{
        emit(0)
        emit(1)
        emit(2)
        emit(3)
    }.collect(object :FlowCollector<Int>{
        //注释5调用的便是这个emit办法。
        override suspend fun emit(value: Int) {
            println("value:$value")
        }
    })
    println("end")
}

两种写法的到的成果是如出一辙的,第一种写法其实便是第二种写法的简写方法。

  • 注释6:这是函数引用的语法,代表了它便是FlowCollectoremit()办法,它的类型是Function3, Any?, Continuation, Any?>,而这个Function3在前面了解挂起函数原理的时分将Kotlin代码反编译后有提过相似的,Function3代表的是三个参数的函数类型。

2.FlowCollector:上游与下流之间的桥梁

Flow创立的时分结构了一个SafeFlow目标,间接完成了Flow接口,因而能够直接调用collect()停止操作符,然后获取到Flow中的emit发送出来的数据。

这儿其实就验证了我上面的那个猜想:下流collect的调用会触发上游Lambda中的emit()函数的履行,上游的emit将数据传递给下流的emit()

那么上游和下流又是怎么进行衔接的呢?

先来看一段源码

public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}
public fun interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

在Flow的结构进程中结构了SafeFlow目标而且间接的完成了Flow,Flow中的collect便是停止操作,而collect函数中的参数FlowCollector中的emit()函数则是下流用来接纳上游发送数据的emit()

这儿再来回忆一下SafeCollector办法中所做的事情:首先collect停止符的调用会触发上游Lambda中的emit()函数履行,它将数据发送出去,然后进入到SafeCollector中的emit()函数,在这个函数中又将从上游数据发送到下流collect中的emitFun()函数中,这样就完成了衔接。所以说FlowCollector是上下流之间的桥梁。

3.中心操作符

在前面剖析Kotlin协程—Flow时咱们知道在Flow的上下流之间还能够添加一些操作符(中转站)的。

这儿我用之前的代码进行剖析:

fun flowTest() = runBlocking {
    flow {
        emit(0)
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.filter {			//中转站①
        it > 2
    }.map {				//中转站②
        it * 2
    }.collect {			//接纳
        println("it:$it")
    }
}
//输出成果:
//it:6
//it:8

由于上面咱们现已知道了上下流之间调用进程,所以这儿我先用一张图来表示有了中心操作符的进程。

【Kotlin回顾】22.Kotlin协程—Flow原理

从图中能够看到当Flow中呈现中心操作符的时分,上下流之间就会多出2个中转站,关于每一个中转站都会有上游和下流,而且都是被下流触发履行,也会触发自己的上游,一起还会接纳来自上游的数据并传递给自己的下流。为什么会是这样一个流程,咱们对中心操作符进行剖析:

  • filter
//1
public inline fun <T> Flow<T>.filter(
    crossinline predicate: suspend (T) -> Boolean
): Flow<T> = transform { value ->
    //8
    if (predicate(value)) return@transform emit(value)
}
//2
internal inline fun <T, R> Flow<T>.unsafeTransform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow { 
    //6
    collect { value ->
        //7
        return@collect transform(value)
    }
}
//3
internal inline fun <T> unsafeFlow(
    @BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit
): Flow<T> {
    //4
    return object : Flow<T> {
        //5
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}
    • 注释1、2、3:返回值均为Flow,也便是说Flow.filter的返回值也是Flow,这个进程只是一个Flow的再次被封装的进程;
    • 注释4:这儿会变成一个普通的Flow匿名内部类目标;
    • 注释5:这儿应该比较熟悉了,完好代码应该是flow{}.filter{}.collect{},依据之前的剖析很容易知道这便是触发上游Lambda代码的履行,也便是履行注释6、注释7;
    • 注释6:collect{},这儿是在调用上游 Flow 的 collect{},触发上游的 Lambda 履行了,也便是注释5触发的Lambda的履行,然后注释 7 就会被履行;
    • 注释7:这儿的transform中的额value便是上游传递下来的值,至于怎么传递下来的就要看注释8了;
    • 注释8:首先这儿有一个条件if (predicate(value)),这个判别便是filter传入的it > 2这个条件,符合这个条件的才会持续履行,也便是经过emit()函数向下流传递数据。
  • map
public inline fun <T, R> Flow<T>.map(
    crossinline transform: suspend (value: T) -> R
): Flow<R> = transform { value ->
    return@transform emit(transform(value))
}
internal inline fun <T, R> Flow<T>.unsafeTransform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use
    collect { value ->
        // kludge, without it Unit will be returned and TCE won't kick in, KT-28938
        return@collect transform(value)
    }
}
internal inline fun <T> unsafeFlow(
    @BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit
): Flow<T> {
    return object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}
    • 能够看到map和前面的filter是如出一辙的流程。

4.上下文维护

在前面剖析Kotlin协程—Flow时有提到withContext,当时仅仅是提到了运用withContext会使代码变得丑陋,其实还有另一层原因——如果调用withContext改变协程上下文的话,Flow上游与下流的协程上下文就会变得不一致。在默许情况下Flow下流的协程上下文终究会成为上游的履行环境,也会变成中心操作符的履行环境,所以才让Flow本身就支持协程的「结构化并发」的特性,例如结构化撤销。而withContext参加会使Flow上游与下流的协程上下文变得不一致,它们的整体结构也会被损坏,然后导致「结构化并发」的特性被损坏,所以不要容易运用withContext,而Flow本身是带有上下文维护的。

Flow 源码中关于上下文的检测,称之为上下文维护,前面剖析SafeCollector时没有深化checkContext剖析,这儿持续来剖析一下,上下文维护是怎样的流程。

private fun emit(uCont: Continuation<Unit>, value: T): Any? {
    val currentContext = uCont.context
    currentContext.ensureActive()
    val previousContext = lastEmissionContext
    if (previousContext !== currentContext) {
        checkContext(currentContext, previousContext, value)
    }
    completion = uCont
    return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}
internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) {
    val result = currentContext.fold(0) fold@{ count, element ->
        val key = element.key
        val collectElement = collectContext[key]
        if (key !== Job) {
            return@fold if (element !== collectElement) Int.MIN_VALUE
            else count + 1
        }
        val collectJob = collectElement as Job?
        val emissionParentJob = (element as Job).transitiveCoroutineParent(collectJob)
        if (emissionParentJob !== collectJob) {
            error(
                "Flow invariant is violated:\n" +
                        "\t\tEmission from another coroutine is detected.\n" +
                        "\t\tChild of $emissionParentJob, expected child of $collectJob.\n" +
                        "\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" +
                        "\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
            )
        }
        if (collectJob == null) count else count + 1
    }
    //判别上游、下流的Context
    if (result != collectContextSize) {
        error(
            "Flow invariant is violated:\n" +
                    "\t\tFlow was collected in $collectContext,\n" +
                    "\t\tbut emission happened in $currentContext.\n" +
                    "\t\tPlease refer to 'flow' documentation or use 'flowOn' instead"
        )
    }
}

所以,总的来说,Flow 不允许直接运用 withContext{} 的原因,是为了“结构化并发”,它并不是不允许切换线程,而是不允许随意损坏协程的上下文。Kotlin 提供的操作符 flowOn{},官方现已帮咱们处理好了上下文的问题,所以咱们能够放心地切换线程。

5.总结:

1.Flow的调用进程分为三个过程:
  • 上游的Flow创立SafeFlow目标,下流的Flow的collect()函数触发上游的emit()函数履行开始发送数据;
  • 上游的emit()发送的数据进入到SafeCollector,其实上游的emit()函数调用的便是SafeCollector中的emit()函数;
  • SafeCollector中调用emitFun()其实便是调用了下流的emit()函数将数据传递给下流。
2.中心操作符:

如果有中心操作符的话,每个操作符都会有上游和下流,而且都是被下流触发履行,也会触发自己的上游,一起还会接纳来自上游的数据并传递给自己的下流;

3.上下文维护:

Flow的上游和中心操作符并不需要协程作用域,所以他们都是共用Flow下流的协程上下文,也正是由于这种规划所以Flow具有天然的『结构化并发』的特色,因而Kotlin官方也限制了开发者不能随意在上游与中转站阶段改变Flow的上下文。