1. 初步知道Flow(冷流)

    1.Flow上游

    a.先看一下flow的简单运用

    private fun flowTest() = runBlocking {
        flow {
            emit(0)
            emit(1)
            emit(2)
            emit(3)
            emit(4)
        }.collect {
            Log.d(TAG, "it:$it")
        }
    }
    输出成果:
    //输出成果:
    //it:0
    //it:1
    //it:2
    //it:3
    //it:4
    

    Flow从字面意思了解便是流,Flow除了有发送方和接纳方之外还能够有中转站,什么是中转站呢,中转站便是咱们常用的中心操作符。比方 : .filter() .map()等等

    b.中转站用法

    private fun flowTest() = runBlocking {
        flow {
            //发送方
            emit(0)
            emit(1)
            emit(2)
            emit(3)
            emit(4)
        }.filter {
            //中转站(1)
            it > 2
        }.map {
            //中转站(2)
            it * 2
        }.collect {
            //承受方
            Log.d(TAG, "it:$it")
        }
    }
    //输出成果:
    //it:6
    //it:8
    

    (1) flow{}: 是一个高阶函数,作用便是创立一个新的Flow,创立好后就要把音讯发送出去,这儿的emit是发射、发送的意思,那么flow{}的作用便是创立一个数据流而且将数据发送出去;

    (2) filter{}、map{}: 这是中心操作符,都是高阶函数,就像中转站相同对数据进行处理后向下传递;

    (3) collect{}: 中止操作符,中止Flow数据流并接纳从上游传递的数据。

    (4) 除了经过flow{}创立Flow之外还有flowOf{},也能够创立一个Flow

    private fun flowOfTest() {
        runBlocking {
            flowOf(0, 1, 2, 3, 4)
                .filter {
                    it > 2
                }.map {
                    it * 2
                }.collect {
                    Log.d(TAG, "it:$it")
                }
        }
    }
    //输出成果:
    //it:6
    //it:8
    

    (5)collect ->是中止操作符, 作用是接纳从上游传递的数据,那要是不接纳会怎样样?

    答案是:运转上面的代码会发现什么都没有做就结束了,而添加**collect函数后filtermap的日志便是正常输出的,因而得出一个定论:只要调用中止操作符collect**之后,Flow 才会开端作业。所以Flow是一个 冷流

    没有承受方Flow是不会作业的。

    (6)上面两段代码都发送了5条数据,然后由collect接纳,那么是一次发送完毕仍是逐条发送呢?

    private fun flowOfTest() {
        runBlocking {
            flowOf(0, 1, 2, 3, 4)
                .filter {
                    Log.d(TAG, "filter:$it")
                    it > 2
                }.map {
                    Log.d(TAG, "map:$it")
                    it * 2
                }.collect {
                    Log.d(TAG, "collect:$it")
                }
        }
    }
    输出成果:
    //filter:0
    //filter:1
    //filter:2
    //filter:3
    //map:3
    //collect:6
    //filter:4
    //map:4
    //collect:8
    

    从输出成果能够很清楚的知道Flow一次只会处理一条数据。

    (7)Kotlin还供给了Flow转List、List转Flow的API….Flow创立的几种办法

    Flow创立办法 运用场景 用法
    flow 未知数据集 flow{ emit() }.collect{ }
    flowOf 已知数据集 flowOf(T).collect{ }
    listOf 已知数据来历的调集 listOf(T).asFlow().collect{ }

    2.Flow中转站(中心操作符)

    A.中转站便是咱们常用的中心操作符,跟调集相同操作符(这边只列出了一部分。更多的自己点击查看源码)

    /**
     * 回来只包括与给定[predicate]匹配的原始流的值的流
     */
    public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
        if (predicate(value)) return@transform emit(value)
    }
    /**
     * 回来只包括与给定[predicate]值不匹配的原始流的值的流
     */
    public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
        if (!predicate(value)) return@transform emit(value)
    }
    /**
     * 回来一个只包括原始流的非空值的流
     */
    public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T> = transform<T?, T> { value ->
        if (value != null) return@transform emit(value)
    }
    /**
     * 回来一个流,其间包括对原始流的每个值运用给定[transform]函数的成果。
     */
    public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
        return@transform emit(transform(value))
    }
    /**
     * 回来一个流,将每个元素包装成[IndexedValue],包括value和它的索引(从0开端)。
     */
    public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow {
        var index = 0
        collect { value ->
            emit(IndexedValue(checkIndexOverflow(index++), value))
        }
    }
    /**
     * 回来一个流,在上游流的每个值被下流宣布之前调用给定的[action]。
     */
    public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
        action(value)
        return@transform emit(value)
    }
    

    B.Flow特有的操作符.生命周期onStart, onCompletion

    private fun flowTest() = runBlocking {
        flow {
            //发送方
            emit(0)
            emit(1)
            emit(2)
            emit(3)
            emit(4)
        }.filter {
            //中转站(1)
            Log.d(TAG, "filter:$it")
            it > 2
        }.map {
            Log.d(TAG, "map:$it")
            //中转站(2)
            it * 2
        }.onStart {
            Log.d(TAG, "onStart:")
        }.onCompletion {
            Log.d(TAG, "onCompletion:")
        }
            .collect {
                //承受方
                Log.d(TAG, "collect:$it")
            }
    }
    输出:
    //onStart:
    //filter:0
    //filter:1
    //filter:2
    //filter:3
    //map:3
    //collect:6
    //filter:4
    //map:4
    //collect:8
    //onCompletion
    

    (1)能够看到onStart函数的履行数序与它在代码中界说的次序没有联系,而其他两个操作符filtermap的履行流程则跟它们界说的次序息息相关。

    (2)onCompletion在它的注释中也标注的比较清楚,类似于finally,都是在终究履行。

    C. Flow特有的操作符->catch反常处理

    Flow中的catch反常处理时要遵从上下流规矩的,由于Flow是具有上下流之分的,详细来讲便是catch只能管理自己上游产生的反常,对于它下流的反常则无能为力,用代码来展示一下他们的差异:

    private fun flowOfTest() {
        runBlocking {
            flowOf(0, 1, 2, 3, 4)
                .filter {
                    Log.d(TAG, "filter:$it")
                    it > 2
                }.map {
                    Log.d(TAG, "map:$it")
                    it * 2
                }.map {
                    it / 0
                }.catch {
                    Log.d(TAG, "catch:$it")
                }.collect {
                    Log.d(TAG, "collect:$it")
                }
        }
    }
    输出成果:
    //filter:0
    //filter:1
    //filter:2
    //filter:3
    //map:3
    //catch:java.lang.ArithmeticException: divide by zero
    

    上游产生 反常 ,在反常后捕获

    private fun flowOfTest() {
        runBlocking {
            flowOf(0, 1, 2, 3, 4)
                .filter {
                    Log.d(TAG, "filter:$it")
                    it > 2
                }.map {
                    Log.d(TAG, "map:$it")
                    it * 2
                }.catch {
                    Log.d(TAG, "catch:$it")
                }.map {
                    it / 0
                }.collect {
                    Log.d(TAG, "collect:$it")
                }
        }
    }
    //直接奔溃了
    // java.lang.ArithmeticException: divide by zero
    

    上游捕获反常,下流产生 反常

    (1)从两段代码能够非常清楚的总结出:上游产生反常并在反常后捕获是不会形成程序中止的,而在上游捕获反常,下流产生反常时则会形成程序中止。

    (2)那么下流的反常就无法捕获了吗?并不是,对于下流的反常能够考虑选用最传统的做法try catch办法

    (3)总结:Flow中的catch操作符的作用与它地点的方位是强相关的,catch无法捕获的能够选用try catch捕获。

    D.Flow特有的操作符——切换Context:flowOn、launchIn

    Flow由于它具有上游、中心操作符、下流的特性,使得它能够处理杂乱且异步履行的使命,那么异步履行的使命中大多又涉及到线程切换,Flow也恰好供给了线程切换的API。

    flowOn

    private fun flowTest2() {
        //flow切换线程
        runBlocking {
            flow {
                emit(0)
                emit(1)
                emit(2)
                emit(3)
                emit(4)
            }.filter {
                Log.d(TAG, "filter:$it")
                Log.d(TAG, "thread = ${Thread.currentThread().name}")
                it > 2
            }.flowOn(Dispatchers.IO).map {
                Log.d(TAG, "map:$it")
                Log.d(TAG, "thread = ${Thread.currentThread().name}")
                it * 2
            }.collect {
                Log.d(TAG, "collect:$it")
                Log.d(TAG, "thread = ${Thread.currentThread().name}")
            }
        }
    }
    //输出成果
    //filter:0
    //thread = DefaultDispatcher-worker-2
    //filter:1
    //thread = DefaultDispatcher-worker-2
    //filter:2
    //thread = DefaultDispatcher-worker-2
    //filter:3
    //thread = DefaultDispatcher-worker-2
    //map:3
    //map thread = main
    //collect: 6
    //collect thread = main
    //map:4
    //map thread = main
    //collect: 8
    //collect thread = main
    

    flowOn线程的切换范围与catch相同仅针对上游,那么要拟定collect中的Context该怎样办?

    能够运用withContext,假如除了collect之外还想让其他操作符也运转在collect地点的线程中就会遇到问题,尽管依旧能够运用withContext可是这样的写法就会很丑恶,就像下面这样失去了本来简洁的链式调用。那么解决这个问题的另一种计划launchIn就派上用场了。

    launchIn

    public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
        collect() 
    }
    

    运用了launchIn操作符的flow无法再调用collect,从launchIn源码中可知,launchIn调用了collect()

    private fun flowTest3() {
        runBlocking {
            flow {
                emit(0)
                emit(1)
                emit(2)
                emit(3)
                emit(4)
            }.filter {
                Log.d(TAG, "filter:$it")
                Log.d(TAG, "thread = ${Thread.currentThread().name}")
                it > 2
            }.flowOn(Dispatchers.IO).map {
                Log.d(TAG, "map:$it")
                Log.d(TAG, "map thread = ${Thread.currentThread().name}")
                it * 2
            }.onEach {
                Log.d(TAG, "onEach:$it")
                Log.d(TAG, "onEach thread = ${Thread.currentThread().name}")
            }.launchIn(CoroutineScope(SupervisorJob() + Dispatchers.Main.immediate))
            //当然你能够自界说的dispatcher
            //lifecycleScope == CoroutineScope(SupervisorJob() + Dispatchers.Main.immediate)
        }
    }
    //输出成果:
    //map{}、onEach{}、flow{}运转在主线程
    //filter{}运转在DefaultDispatcher -> DefaultDispatcher-worker-1
    

    3.Flow下流->中止操作符

    A. 在Flow中最常见的操作符便是collect(),除此之外还有first()Last()single()fold{}reduce{}。还有一个特殊的当Flow调用toList转化成调集后toList后边的API都不再属于Flow因而这也就阐明toList也算是一种中止操作符

    4.Flow总结&特色

    Flow,SharedFlow,StateFlow的使用及原理

  1. Flow的原理

1.Flow流程中为什么是冷流?

咱们来剖析下flow的源码

public fun <T> flow(
    @BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = 
    SafeFlow(block)
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

flow是一个高阶函数,参数类型是FlowCollector.() -> Unit,FlowCollector是它的扩展或许成员办法,没有参数也没有回来值,flow()函数的回来值是Flow,详细到回来类型是SafeFlow()SafeFlow()AbstractFlow()的子类。

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    //注释1
    public final override suspend fun collect(collector: FlowCollector<T>) {
        //注释2
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            //注释3
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

AbstractFlow源码中能够知道它完成了Flow接口,这儿进一步知道了SafeFlow间接的完成了Flow接口。这儿深入了解一下AbstractFlow做了哪些作业,经过上面三个注释进行解释:

  • 注释1:这个collect便是demo中的collect的调用。这儿做一个猜想collect的调用会触发上游Lambda中的emit()函数的履行,然后将数据传递给collect

  • 注释2:这儿首要便是进行了安全查看,SafeCollector仅仅把collect中的参数FlowCollector<T>重新进行了封装,详细的安全查看是怎样样的稍后进行剖析;

  • 注释3:这儿调用了collectSafely这个笼统办法,而这儿的详细完成是在SafeFlow中的collectSafely,然后调用了collector.block(),这儿其实便是调用了flow()中的emit办法,或许说collector.block()调用了4次emit()函数。

现在来总结一下为什么Flow是冷的:FLow之所以是冷的是由于它的结构进程,在它的结构进程中结构了一个SafeFlow目标但并不会触发Lambda表达式的履行,只要当collect() 被调用后Lambda表达式才开端履行所以它是冷的。

2.SafeCollector剖析

@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST")
internal actual class SafeCollector<T> actual constructor(
    //注释1
    @JvmField internal actual val collector: FlowCollector<T>,
    @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
    override val callerFrame: CoroutineStackFrame? get() = completion as? CoroutineStackFrame
    override fun getStackTraceElement(): StackTraceElement? = null
    @JvmField // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
    internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
    private var lastEmissionContext: CoroutineContext? = null
    private var completion: Continuation<Unit>? = null
    // ContinuationImpl
    override val context: CoroutineContext
        get() = completion?.context ?: EmptyCoroutineContext
    override fun invokeSuspend(result: Result<Any?>): Any {
        result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
        completion?.resumeWith(result as Result<Unit>)
        return COROUTINE_SUSPENDED
    }
    // Escalate visibility to manually release intercepted continuation
    public actual override fun releaseIntercepted() {
        super.releaseIntercepted()
    }
    /**
     *  这是状况机重用的奇妙完成。
     * 首要它查看它没有被并发运用(这儿是清晰制止的),
     * 然后只缓存一个完成实例以防止在每次宣布时进行额定分配,使其在热路径上消除垃圾。
     */
    //注释2
    override suspend fun emit(value: T) {
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                //注释3
                emit(uCont, value)
            } catch (e: Throwable) {
                //保存已抛出emit反常的现实,便于被上游 
                lastEmissionContext = DownstreamExceptionElement(e)
                throw e
            }
        }
    }
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        val currentContext = uCont.context
        currentContext.ensureActive()
        //注释4
        val previousContext = lastEmissionContext
        if (previousContext !== currentContext) {
            checkContext(currentContext, previousContext, value)
        }
        completion = uCont
        //注释5
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }
    private fun checkContext(
        currentContext: CoroutineContext,
        previousContext: CoroutineContext?,
        value: T
    ) {
        if (previousContext is DownstreamExceptionElement) {
            exceptionTransparencyViolated(previousContext, value)
        }
        checkContext(currentContext)
        lastEmissionContext = currentContext
    }
    private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
        error("""
            Flow exception transparency is violated:
                Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
                Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
                For a more detailed explanation, please refer to Flow documentation.
            """.trimIndent())
    }
}
//注释6
private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

注释1:这儿的collect参数对应的便是Flow在结构时的匿名内部类FlowCollector,在AS中看到的this:FlowCollector<Int>便是它;

注释2:这个emit()函数其实便是demo中用来发送数据的emit(0)...emit(3),这儿能够了解为Flow上游发送的数据终究会传递到这个emit()中;

注释3:这儿的emit(uCont, value)函数中多了两个参数,其间第一个是suspendCoroutineUninterceptedOrReturn,它是一个高阶函数,是把挂起函数的Continuation暴露出来并作为参数进行传递,第二个则是上游发送过来的数据。这儿还有一个反常捕获,反常被捕获后存储lastEmissionContext,作用是:在下流发送反常今后能够让上游感知到。下面会有一个对比;

注释4:这儿对当时协程上下文与之前的协程上下文进行对比,假如两者不一致就会履行checkContext(),在它里边做出进一步的判别和提示;

注释5:这儿调用的便是下流的emit(),也便是FlowCollector中的emit函数,这儿接纳的那个value便是上游的emit(0)...emit(3)传递过来的,能够了解成下面demo中的代码:

private fun flowOtherTest() {
    runBlocking {
        flow {
            emit(0)
            emit(1)
            emit(2)
            emit(3)
        }.collect(object : FlowCollector<Int> {
            //注释5调用的便是这个emit办法。
            override suspend fun emit(value: Int) {
                Log.d(TAG, "collect:$value")
            }
        })
    }
}

注释6:这是函数引证的语法,代表了它便是FlowCollectoremit()办法

3.FlowCollector 上游和下流之间的链接

public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}
public fun interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

Flow供给了一个办法,让下流触发一个搜集动作collect,天经地义下流作为搜集者的FlowCollector也供给了一个emit办法,让下流来接纳上游发送数据emit()

在Flow的结构进程中结构了SafeFlow目标而且间接的完成了Flow,Flow中的collect便是中止操作,而collect函数中的参数FlowCollector中的emit()函数则是下流用来接纳上游发送数据的emit()

这儿再来回顾一下SafeCollector办法中所做的工作:首要collect中止符的调用会触发上游Lambda中的emit()函数履行,它将数据发送出去,然后进入到SafeCollector中的emit()函数,在这个函数中又将从上游数据发送到下流collect中的emitFun()函数中,这样就完成了衔接。所以说FlowCollector是上下流之间的桥梁。(也能够了解为回调)

Flow,SharedFlow,StateFlow的使用及原理

4.总结

1.Flow的调用进程分为三个步骤:

  • 上游的Flow创立SafeFlow目标,下流的Flow的collect()函数触发上游的emit()函数履行开端发送数据;
  • 上游的emit()发送的数据进入到SafeCollector,其实上游的emit()函数调用的便是SafeCollector中的emit()函数;
  • SafeCollector中调用emitFun()其实便是调用了下流的emit()函数将数据传递给下流。

2.中心操作符:

假如有中心操作符的话,每个操作符都会有上游和下流,而且都是被下流触发履行,也会触发自己的上游,一起还会接纳来自上游的数据并传递给自己的下流;

  1. ShardFlow与StateFlow(暖流)

上面咱们介绍到Flow是冷流。那么假如咱们要运用暖流,该怎样办呢?

答案是Flow 有供给相关完成: 那便是StateFlow 和 SharedFlow 。

StateFlow 和 SharedFlow 是暖流,出产数据不依赖顾客消费,暖流与顾客是一对多的联系,当有多个顾客时,它们之间的数据都是同一份。

1.SharedFlow与StateFlow的特色

A.SharedFlow
  • 对于同一个数据流,能够答应有多个订阅者同享。
  • 不调用collect搜集数据,也会开端发送数据。
  • 答应缓存前史数据(默许是非粘性,当设置缓存数据后即可完成为粘性, replay = 1)
  • 发送数据函数都是线程安全的。
  • 不防抖。能够发送相同的值
  • 无初始化值
B.StateFlow
  • 都答应多个顾客
  • 都有只读与可变类型
  • 永久只保存一个状况值,会把最新值重现给订阅者,即粘性。
  • 防抖。设置重复的值不会重新发送给订阅者
  • 有必要传入初始值,确保值的空安全,永久有值
C. StateFlow与LiveData不同的是
  • 强制要求初始默许值
  • 支撑CAS形式赋值
  • 默许支撑防抖 过滤
  • value的空安全校验
  • Flow丰厚的异步数据流操作
  • 默许没有Lifecycle支撑,flowcollect是挂起函数,会一向等候数据流传递数据
  • 线程安全LiveDatapostValue尽管也可在异步运用,但会导致数据丢失。

LiveData除了对于Lifecycle的支撑,StateFlow基本都是处于优势

2.SharedFlow创立及运用

public fun <T> MutableSharedFlow(
    //1.缓存的前史数据容量
    replay: Int = 0,
    //2.除前史数据外的额定缓存区容量
    extraBufferCapacity: Int = 0,
    //3.背压战略
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    ...
    val bufferCapacity0 = replay + extraBufferCapacity
    val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0
    return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    //4.线程安全的挂机函数发送数据
    override suspend fun emit(value: T)
    //5.线程安全的测验发送数据
    public fun tryEmit(value: T): Boolean
    //6.同享数据流的订阅者数量
    public val subscriptionCount: StateFlow<Int>
    ...
}

注释1 replay 表明前史元素缓存区容量。

  • 能够将最新的数据缓存到调集内,当时史缓存区满后,会移除最早的元素。
  • 当在新顾客订阅了该数据流,会先将前史缓存区元素顺次发送给新的顾客,然后才发送新元素。

注释2:extraBufferCapacity:MutableSharedFlow 缓存的数据个数为 replay + extraBufferCapacity; 缓存一方面用于粘性事情的发送,另一方面也为了处理背压问题,既下流的顾客的消费速度低于上游出产者的出产速度时,数据会被放在缓存中。。

注释3:onBufferOverflow:背压处理战略,缓存区满后怎样处理(挂起或丢掉数据),默许挂起。留意,当没有订阅者时,只要最近 replay 个数的数据会存入缓存区,不会触发 onBufferOverflow 战略。

    private fun sharedFlow(){
        runBlocking {
            val _sharedFlow = MutableSharedFlow<Int>()
            val sharedFlow = _sharedFlow.asSharedFlow()
            lifecycleScope.launch(Dispatchers.IO) {
                for(i in 0..50){
                    Log.d(TAG, "emit:$i")
                    _sharedFlow.emit(i)
                    delay(50)
                }
            }
        }
    }
    输出成果:
    //emit:0
    //emit:1
    // ...
    //emit:50

尽管此时并没有顾客订阅,但依旧会履行发送数据操作,仅仅现在没有设置前史缓存,一切数据都被”扔掉”了。

再让咱们看看设置replay = 3的状况。这时分订阅者会收到3个值.

private fun sharedFlow(){
    runBlocking {
        val _sharedFlow = MutableSharedFlow<Int>(replay = 3)
        val sharedFlow = _sharedFlow.asSharedFlow()
        lifecycleScope.launch(Dispatchers.IO) {
            for(i in 0..50){
                Log.d(TAG, "emit:$i")
                _sharedFlow.emit(i)
                delay(50)
            }
        }
            delay(5000)
            //推迟5000毫秒去订阅,其实这时分流现已发送完了.相当于新的订阅者
            sharedFlow.onEach {
                Log.d(TAG, "onEach:$it")
            }.launchIn(lifecycleScope)
    }
}
输出成果:
//emit:0
//emit:1
// ...
//emit:50
//onEach:48
//onEach:49
//onEach:50

Flow冷流转化成SharedFlow暖流

private fun shareIn(){
    runBlocking {
        flowOf(0,1,2,3,4).shareIn(lifecycleScope, SharingStarted.WhileSubscribed())
    }
}
public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    val config = configureSharing(replay)
    val shared = MutableSharedFlow<T>(
        replay = replay,
        extraBufferCapacity = config.extraBufferCapacity,
        onBufferOverflow = config.onBufferOverflow
    )
    @Suppress("UNCHECKED_CAST")
    val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
    return ReadonlySharedFlow(shared, job)
}

这儿started表明新创立的同享数据流的发动与中止战略。

  • Eagerly->当即开端发送数据源。而且消费端永久搜集数据,只会收到前史缓存和后续新数据,直到地点协程撤销。
  • Lazily->a.等候呈现第一个顾客订阅后,才开端发送数据源。确保第一个顾客能收到一切数据,但后续顾客只能收到前史缓存和后续数据。b.顾客会永久等候搜集数据,直到地点协程撤销
  • WhileSubscribed->a.能够说是Lazily战略的进阶版,同样是等候第一个顾客订阅后,才开端发送数据源。

b.但其能够装备在终究一个订阅者关闭后,同享数据流上游中止的时刻(默许为当即中止),与前史数据缓存清 空时刻(默许为永久保存)。

需求留意,在运用shareIn每次都会创立一个新 SharedFlow 实例,而且该实例会一向保存在 内存 ,直到被垃圾回收。所以最好减少转化流的履行次数,不要在函数内每次都调用这类函数。

3.SharedFlow原理

经过MutableSharedFlow工厂函数创立的SharedFlow,内部实践是创立了SharedFlowImpl目标,是运用数组缓存一切数据。

internal open class SharedFlowImpl<T>(
    private val replay: Int, // replayCache的最大容量
    private val bufferCapacity: Int, // buffered values的最大容量
    private val onBufferOverflow: BufferOverflow // 溢出战略
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    // 缓存数组,用于保存emit办法发射的数据,在需求时进行初始化
    private var buffer: Array<Any?>? = null
    // 新的订阅者从replayCache中获取数据的开端方位
    private var replayIndex = 0L
    // 当时一切的订阅者从缓存数组中获取的数据中,对应方位最小的索引
    // 假如没有订阅者,则minCollectorIndex的值等于replayIndex
    private var minCollectorIndex = 0L
    // 缓存数组中buffered values缓存数据的数量
    private var bufferSize = 0
    // 缓存数组中queued emitters缓存数据的数量
    private var queueSize = 0
    // 当时缓存数组的开端方位
    private val head: Long get() = minOf(minCollectorIndex, replayIndex)
    // 当时缓存数组中replayCache缓存数据的数量
    private val replaySize: Int get() = (head + bufferSize - replayIndex).toInt()
    // 当时缓存数组中现已缓存的数据的数量
    private val totalSize: Int get() = bufferSize + queueSize
    // 当时缓存数组中buffered values的最末尾方位索引的后一位
    private val bufferEndIndex: Long get() = head + bufferSize
    // 当时数组中queued emitters的最末尾方位索引的后一位
    private val queueEndIndex: Long get() = head + bufferSize + queueSize
    ...
   //2.tryEmit 
   override fun tryEmit(value: T): Boolean {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val emitted = synchronized(this) {
        if (tryEmitLocked(value)) {
            resumes = findSlotsToResumeLocked(resumes) //寻觅需求康复的挂起函数
            true
        } else {
            false
        }
    }
        for (cont in resumes) cont?.resume(Unit) //康复履行挂起函数,发送数据
        return emitted
    }
    //1.emit
     override suspend fun emit(value: T) {
            if (tryEmit(value)) return // fast-path 先测验发送数据
            emitSuspend(value)  //无法发送数据时先创立挂起函数
     }
}
  1. 发送数据

SharedFlowImpl这个类第一次看起来有点多,这儿先从emittryEmit看起,看看是如何完成发送数据。

tryEmit内部经过synchronized加锁,是线程安全的。

注释1:emit办法会先进行一次tryEmit的处理,当回来false的时分再进行suspend的发送操作

注释2:经过前面对tryEmit办法的注释判别缓存战略为 BufferOverflow.SUSPEND才有可能为true,所以再看tryEmitLocked办法

//从这儿能够看到,当战略选用suspended一起缓存溢出的时分,回来false,否则,永久回来true,一起做一些事情上的入队处理等
@Suppress("UNCHECKED_CAST")
private fun tryEmitLocked(value: T): Boolean {
    //注释1.--------没有检测到订阅者(collect搜集器)
    if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
    //注释2.--------检测到订阅者(collect搜集器)假如当时有订阅者,一起buffered values已达到最大容量。
    if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
        when (onBufferOverflow) {
            BufferOverflow.SUSPEND -> return false // will suspend,只要这儿才会回来true
            BufferOverflow.DROP_LATEST -> return true // just drop incoming
            BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
        }
    }
    //注释3.--------buffered values还能够持续添加数据, 将数据加入到缓存数组中
    // 这儿由于tryEmit办法不会挂起emit办法地点的协程, 
    // 所以value没有被封装成Emitter类型的目标
    enqueueLocked(value)
    bufferSize++ // value was added to buffer
    // drop oldest from the buffer if it became more than bufferCapacity
    //注释4.--------  假如buffered values的数据数量超越最大容量的约束, 
    //则调用dropOldestLocked办法,丢掉最旧的数据
    if (bufferSize > bufferCapacity) dropOldestLocked()
    // keep replaySize not larger that needed
    //注释5.--------   假如replayCache中数据的数量超越了最大容量
    if (replaySize > replay) { // increment replayIndex by one
        // 更新replayIndex的值,replayIndex向前移动一位
        updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
    }
    return true
}
  • tryEmitLocked办法代码注释1处可知: 由于是暖流tryEmitNoCollectorsLocked办法便是没有订阅者的状况
private fun tryEmitNoCollectorsLocked(value: T): Boolean {
    assert { nCollectors == 0 }
    //replay == 0直接回来,不需求去缓存
    if (replay == 0) return true // no need to replay, just forget it now
    //将值入队
    enqueueLocked(value) // enqueue to replayCache
    //缓存加1
    bufferSize++ // value was added to buffer
    // drop oldest from the buffer if it became more than replay
    //假如缓存满了,就删除最老的那一条数据
    // 假如buffered values的数据数量超越了replayCache的最大容量 
    // 则丢掉最旧的数据 
    // 由于新订阅者只会从replayCache中取数据, 
    // 假如没有订阅者,buffered values的数据数量超越replayCache的最大容量没有意义
    if (bufferSize > replay) dropOldestLocked()
    //重新设置订阅者的值方位索引
    minCollectorIndex = head + bufferSize // a default value (max allowed)
    return true
}
// enqueues item to buffer array, caller shall increment either bufferSize or queueSize
private fun enqueueLocked(item: Any?) {
    //totalSize = 缓冲值的数目+排队发射器的数量
    val curSize = totalSize
    val buffer = when (val curBuffer = buffer) {
        // 缓存数组为空,则进行初始化,初始化容量为2
        null -> growBuffer(null, 0, 2)
        // 假如超越了当时缓存数组的最大容量,则进行扩容,新的缓存数组的容量为之前的2倍 
        // growBuffer办法会把原来缓存数组的数据填充到新的缓存数组中
        else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize,curBuffer.size * 2) else curBuffer
    }
    buffer.setBufferAt(head + curSize, item)
}

从tryEmitLocked办法代码注释2处可知现在是有订阅者的状况:

1.bufferSize >= bufferCapacity 缓存数组中buffered values缓存数据的数量 >= buffered values的最大容量。

2.minCollectorIndex <= replayIndex 当时一切的订阅者从缓存数组中获取的数据中,对应方位最小的索引<=新的订阅者从replayCache中获取数据的开端方位

到此tryEmitLocked办法剖析完了。回到tryEmit办法

override fun tryEmit(value: T): Boolean {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val emitted = synchronized(this) {
        if (tryEmitLocked(value)) {
            //搜集现已挂起的订阅者的续体
            resumes = findSlotsToResumeLocked(resumes)
            true
        } else {
            false
        }
    }
    // 遍历,引发挂起的订阅者
    for (cont in resumes) cont?.resume(Unit)
    return emitted
}

findSlotsToResumeLocked

private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> {
    // 引证参数中的续体数组
    var resumes: Array<Continuation<Unit>?> = resumesIn
    // 用于记载需求康复的续体的数量
    var resumeCount = resumesIn.size
    // 遍历订阅者数组
    forEachSlotLocked loop@{ slot ->
        // 获取续体,假如续体为空,阐明对应订阅者的协程没有挂起,本次循环回来
        val cont = slot.cont ?: return@loop
        // 判别slot中index是否符合要求
        // 假如不符合要求,则本次循环回来
        if (tryPeekLocked(slot) < 0) return@loop
        // 假如需求康复的续体的数量超越续体数组的容量,则进行扩容
        // 新的续体数组的容量是之前续体数组容量的2倍
        if (resumeCount >= resumes.size) resumes = resumes.copyOf(maxOf(2, 2 * resumes.size))
        // 保存续体到续体数组中
        resumes[resumeCount++] = cont
        // 清空slot中保存的续体
        slot.cont = null
    }
    // 回来搜集完的续体数组
    return resumes
}

到这儿非 挂起 的发送流程结束,下面看看挂起发送流程

override suspend fun emit(value: T) {
    //非挂起
    if (tryEmit(value)) return // fast-path
    //挂起
    emitSuspend(value)
}

挂起调用emitSuspend办法,这儿也调用了上面提到的findSlotsToResumeLocked办法

private suspend fun emitSuspend(value: T) =
  // 直接挂起emit办法地点的协程,获取续体
  suspendCancellableCoroutine<Unit> sc@{ cont ->
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    // 加锁
    val emitter = synchronized(this) lock@{
        // 这儿再次测验以tryEmit的办法发射数据
        if (tryEmitLocked(value)) {
            // 假如发射成功,则康复续体的履行
            cont.resume(Unit)
            // 搜集现已挂起的订阅者的续体
            resumes = findSlotsToResumeLocked(resumes)
            // 回来
            return@lock null
        }
        // 将续体、待发射的数据等封装成Emitter类型的目标
        Emitter(this, head + totalSize, value, cont).also {
            // 加入到缓存数组中
            enqueueLocked(it)
            // queued emitters的数据的数量加1
            queueSize++
            // 假如buffered values的最大容量为0,即不存在
            // 则搜集现已挂起的订阅者的续体,保存到局部变量resumes中
            if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
        }
    }
    // emitter目标监听emit办法地点协程的撤销
    // 产生撤销时会调用emitter目标的dispose办法
    emitter?.let { cont.disposeOnCancellation(it) }
    // 遍历,引发挂起的订阅者
    for (cont in resumes) cont?.resume(Unit)
}

sharedFlow设置了缓存优先从replayCache取数据.SharedFlowImpl类完成了SharedFlow接口,重写了其间的常量replayCache,当有新订阅者呈现时,假如replayCache存在,而且有缓存数据,则优先从replayCache中获取,代码如下:

override val replayCache: List<T>
    // 只能获取,不能设置,加锁
    get() = synchronized(this) {
        // 获取当时replayCache中缓存数据的数量
        val replaySize = this.replaySize
        // 假如数量为0,则回来一个空列表
        if (replaySize == 0) return emptyList()
        // 若数量不为0,则依据容量创立一个列表
        val result = ArrayList<T>(replaySize)
        // 获取缓存数组
        val buffer = buffer!!
        // 遍历replayCache,将数据进行类型转化,并添加到列表中
        @Suppress("UNCHECKED_CAST")
        for (i in 0 until replaySize) result += buffer.getBufferAt(replayIndex + i) as T
        // 回来列表
        result
    }
  1. collect流程(承受数据)
SharedFlowImpl.kt
@Suppress("UNCHECKED_CAST")
override suspend fun collect(collector: FlowCollector<T>) {
    // 为当时的订阅者分配一个SharedFlowSlot类型的目标
    val slot = allocateSlot()
    try {
        // 假如collector类型为SubscribedFlowCollector,
        // 阐明订阅者监听了订阅进程的发动,则先回调
        if (collector is SubscribedFlowCollector) collector.onSubscription()
        // 获取订阅者地点的协程
        val collectorJob = currentCoroutineContext()[Job]
        // 死循环
        while (true) {
            var newValue: Any?
            // 死循环
            while (true) {
                // 从缓存数组中获取数据
                newValue = tryTakeValue(slot)
                // 假如获取数据成功,则跳出循环
                if (newValue !== NO_VALUE) break
                // 走到这儿,阐明获取数据失利,
                // 挂起订阅者地点协程,等候新数据的到来
                awaitValue(slot)
            }
            // 走到这儿,阐明现已获取到了数据
            // 判别订阅者地点协程是否是存活的,假如不是则抛出反常
            collectorJob?.ensureActive()
            // 进行类型转化,并向下流发射数据
            collector.emit(newValue as T)
        }
    } finally {
        // 开释已分配的SharedFlowSlot类型的目标
        freeSlot(slot)
    }
}
@SharedImmutable
@JvmField
internal val NO_VALUE = Symbol("NO_VALUE")

在collect办法中,经过tryTakeValue办法获取数据,代码如下:

private fun tryTakeValue(slot: SharedFlowSlot): Any? {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    // 加锁
    val value = synchronized(this) {
        // 从slot中获取index
        // index表明当时应该从缓存数组的index方位中获取数据
        val index = tryPeekLocked(slot)
        // 假如index小于0,阐明没有数据
        if (index < 0) {
            // 回来空数据标识
            NO_VALUE
        } else { // 假如有数据
            // 获取当时的slot的index
            val oldIndex = slot.index
            // 从缓存数组的index处获取数据
            val newValue = getPeekedValueLockedAt(index)
            // 计算下一次获取数据的方位,并保存到slot中
            slot.index = index + 1
            // 更新缓存数组的方位,并获取缓存数组与订阅者数组中可康复的续体
            resumes = updateCollectorIndexLocked(oldIndex)
            // 回来获取的数据
            newValue
        }
    }
    // 遍历,康复续体
    for (resume in resumes) resume?.resume(Unit)
    // 回来获取的数据
    return value
}
@JvmField
@SharedImmutable
internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)

在tryTakeValue办法,获取数据之前,首要会调用tryPeekLocked办法,判别数据地点的方位是否符合要求,代码如下:

private fun tryPeekLocked(slot: SharedFlowSlot): Long {
    // 从slot中获取index
    val index = slot.index
    // 假如是在buffered values中获取,则直接回来
    if (index < bufferEndIndex) return index
    // 走到这儿阐明是要在queued emitters中获取,
    // 假如buffered values的最大容量大于0,则回来-1
    // 在buffered values能够存在的状况下,制止发射者和订阅者接触
    if (bufferCapacity > 0) return -1L
    // 走到这儿阐明要在queued emitters中获取,一起buffered values的最大容量为0
    // 这种状况缓存数组只能有queued emitters,
    // 因而,只能处理queued emitters中的第一个Emitter类型的目标
    // 假如当时订阅者想要处理下一个Emitter类型的目标,则回来-1
    if (index > head) return -1L
    // 走到这儿阐明要在queued emitters中获取,一起buffered values的最大容量为0
    // 而且要获取当时的正在处理的Emmiter类型的目标
    // 假如queued emitters为空,阐明当时没有Emmiter类型的目标,则回来-1
    if (queueSize == 0) return -1L
    // 满意上述要求,回来index
    return index
}

假如数据地点的方位符合要求,则会调用getPeekedValueLockedAt办法获取数据,代码如下:

private fun getPeekedValueLockedAt(index: Long): Any? =
    // 从缓存数组中index方位获取数据
    when (val item = buffer!!.getBufferAt(index)) {
        // 假如是Emitter类型的,则进行拆箱,获取数据
        is Emitter -> item.value
        // 直接回来
        else -> item
    }

Emitter类是SharedFlowImpl类的内部类,用于在挂起调用emit办法地点的协程后,对emit办法发射的数据及挂起后的续体进行封装,代码如下:

private class Emitter(
    @JvmField val flow: SharedFlowImpl<*>,
    @JvmField var index: Long, // 当时目标在缓存数组中的方位
    @JvmField val value: Any?,// emit办法发射的数据
    @JvmField val cont: Continuation<Unit> // 挂起的续体
) : DisposableHandle {
    override fun dispose() = flow.cancelEmitter(this)
}

在collect办法中,当订阅者无数据可获取时,则会调用awaitValue办法,挂起订阅者地点的协程,代码如下:

private suspend fun awaitValue(slot: SharedFlowSlot): Unit =
  // 直接挂起订阅者地点的协程
  suspendCancellableCoroutine { cont ->
    // 加锁
    synchronized(this) lock@{
        // 再次查看当时的index是否满意要求
        val index = tryPeekLocked(slot)
        // 假如确实不满意要求
        if (index < 0) {
            // 保存续体到slot中
            slot.cont = cont
        } else { // 假如再次查看发现index这时满意要求
            // 则康复挂起,并回来
            cont.resume(Unit)
            return@lock
        }
        // 保存续体到slot中
        slot.cont = cont
    }
}

到此SharedFlow就剖析完成了。

咱们知道SharedFlow内部是依据数组+synchronized,订阅分发的逻辑则是轮询取出内部缓存数组的数据,没有数据则将订阅挂起,直到上游再次发送数据

  1. StateFlow创立及运用

作为SharedFlow的子类,StateFlow在运用上与其父类基本相同。

public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    public override var value: T
    public fun compareAndSet(expect: T, update: T): Boolean
}

StateFlow是一种单数据更新的暖流,经过emit办法更新StateFlow的数据,经过value特点能够获取当时的数据

上面同样是利用同名工厂函数的进行创立,仅仅比较SharedFlow,StateFlow有必要设置默许初始值

private fun stateFlow(){
    runBlocking {
        val _stateFlow = MutableStateFlow(3)
        val stateFlow = _stateFlow.asStateFlow()
        //没有发送数据时当即订阅
        stateFlow.onEach {
            Log.d(TAG, "onEach:$it")
        }.launchIn(lifecycleScope)
        //模拟发送数据
        lifecycleScope.launch(Dispatchers.IO) {
            for(i in 3..50){
                Log.d(TAG, "emit:$i")
                _stateFlow.emit(i)
                delay(50)
            }
        }
        //推迟5000毫秒去订阅,其实这时分流现已发送完了.相当于新的订阅者
        delay(5000)
        //新的订阅者
        stateFlow.onEach {
            Log.d(TAG, "new onEach:$it")
        }.launchIn(lifecycleScope)
    }
}
//输出成果:
//onEach:3
//emit:3
//emit:4
//onEach:4
//emit:5
//onEach:5
//...
//新的订阅者直接输出
//new onEach:50

从上面的输出成果看出:

  1. 没有发送数据时订阅会先接纳默许值

  2. 而新发送的数据后,由于第一个值与原有值相同,直接被过滤掉了。在StateFlow中,经过Any#equals办法来判别前后两个数据是否持平。当时后两个数据持平时,数据不会被更新,订阅者也不会处理。

  3. StateFlow有必要要有一个初始值。当新订阅者呈现时,StateFlow会将最新的数据发射给订阅者。StateFlow只保存终究发射的数据,除此之外不会缓存任何其他的数据。一起,StateFlow不支撑resetReplayCache办法。

Flow 冷流 转化成StateFlow暖流

StateFlow同样也有由Flow冷流转化为暖流的操作符函数stateIn。

private fun stateIn(){
    flowOf(0,1,2,3,4).stateIn(lifecycleScope, SharingStarted.WhileSubscribed(),0)
}

shareIn函数的差异仅仅有必要设置默许值stateIn转化的同享数据流只缓存一个最新值。这儿不多说

  1. StateFlow的原理

1.订阅者的管理类->StateFlowSlot

上面剖析SharedFlow原理剖析中没有单独列出来说SharedFlowSlot是SharedFlowImpl的订阅者的管理类。这儿阐明一下:SharedFlowImpl类中,订阅者数组中存储的目标类型为SharedFlowSlot,而在StateFlowImpl类中,订阅者数组存储的目标类型为StateFlowSlot。

@SharedImmutable
private val NONE = Symbol("NONE") // 状况标识
@SharedImmutable
private val PENDING = Symbol("PENDING") // 状况标识
private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
    // _state中默许值为null
    private val _state = atomic<Any?>(null)
    ...
}

依据_state保存目标的不同,能够确认StateFlowSlot类型的目标的状况。StateFlowSlot类型的目标共有四种状况:

  • null:假如_state保存的目标为空,表明当时StateFlowSlot类型的目标没有被任何订阅者运用。
  • NONE:假如_state保存的目标为NONE标识,表明当时StateFlowSlot类型的目标现已被对应的订阅者运用,但既没有挂起,也没有在处理当时的数据。
  • PENDING:假如_state保存的目标为PENDING标识,表明当时StateFlowSlot类型的目标现已被对应的订阅者运用,而且将开端处理当时的数据。
  • CancellableContinuationImpl :假如_state保存的目标为续体,表明当时StateFlowSlot类型的目标现已被对应的订阅者运用,可是订阅者已处理完当时的数据,地点的协程已被挂起,等候新的数据到来。

在StateFlowSlot类中,重写了AbstractSharedFlowSlot类的allocateLocked办法与freeLocked办法,两个办法会对订阅者的初始状况和终究状况进行改变,代码如下:

// 新订阅者申请运用当时的StateFlowSlot类型的目标
override fun allocateLocked(flow: StateFlowImpl<*>): Boolean {
    // 假如_state保存的目标不为空,
    // 阐明当时StateFlowSlot类型的目标现已被其他订阅者运用
    // 回来false
    if (_state.value != null) return false
    // 走到这儿,阐明没有被其他订阅者运用,分配成功
    // 修正状况值为NONE
    _state.value = NONE
    // 回来true
    return true
}
// 订阅者开释现已运用的StateFlowSlot类型的目标
override fun freeLocked(flow: StateFlowImpl<*>): Array<Continuation<Unit>?> {
    // 修正状况值为null
    _state.value = null
    // 回来空数组
    return EMPTY_RESUMES
}
@JvmField
@SharedImmutable
internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)

为了完成上述对订阅者状况的管理,在StateFlowSlot类中,还额定供给了三个办法用于完成对订阅者的状况的切换,代码如下:

// 当有状况更新成功时,会调用makePending办法,告诉订阅者能够开端处理新数据
@Suppress("UNCHECKED_CAST")
fun makePending() {
    // 依据当时状况判别
    _state.loop { state ->
        when {
            // 假如未被订阅者运用,则直接回来
            state == null -> return
            // 假如现已处于PENDING状况,则直接回来
            state === PENDING -> return
            // 假如当时状况为NONE
            state === NONE -> {
                // 经过CAS的办法,将状况修正为PENDPENDING,并回来
                if (_state.compareAndSet(state, PENDING)) return
            }
            // 假如为挂起状况
            else -> {
                // 经过CAS的办法,将状况修正为NONE
                if (_state.compareAndSet(state, NONE)) {
                    // 假如修正成功,则康复对应续体的履行,并回来
                    (state as CancellableContinuationImpl<Unit>).resume(Unit)
                    return
                }
            }
        }
    }
}
// 当订阅者每次处理完新数据(不一定处理成功)后,会调用takePending办法,表明完成处理
// 获取当时的状况,并修正新状况为NONE
fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state ->
    assert { state !is CancellableContinuationImpl<*> }
    // 假如之前的状况为PENDING,则回来true
    return state === PENDING
}
// 当订阅者没有新数据需求处理时,会调用awaitPending办法挂起
@Suppress("UNCHECKED_CAST")
// 直接挂起,获取续体
suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont ->
    assert { _state.value !is CancellableContinuationImpl<*> }
    // 经过CAS的办法,将当时的状况修正为挂起,并回来
    if (_state.compareAndSet(NONE, cont)) return@sc
    // 走到这儿代表状况修正失利,阐明又发射了新数据,当时的状况被修正为PENDING
    assert { _state.value === PENDING }
    // 引发订阅者续体的履行
    cont.resume(Unit)
}
  1. 发送数据

在StateFlowImpl类中,当需求发射数据时,能够调用emit办法、tryEmit办法、compareAndSet办法,代码如下:

override fun tryEmit(value: T): Boolean {
    this.value = value
    return true
}
override suspend fun emit(value: T) {
    this.value = value
}
override fun compareAndSet(expect: T, update: T): Boolean =
    updateState(expect ?: NULL, update ?: NULL)
@Suppress("UNCHECKED_CAST")
public override var value: T
    // 拆箱
    get() = NULL.unbox(_state.value)
    // 更新数据
    set(value) { updateState(null, value ?: NULL) }
// 拆箱操作
@Suppress("UNCHECKED_CAST", "NOTHING_TO_INLINE")
inline fun <T> unbox(value: Any?): T = if (value === this) null as T else value as T

能够发现,无论是经过emit办法、tryEmit办法仍是compareAndSet办法,终究都是经过updateState办法完成数据的更新,代码如下:

// sequence是一个全局变量,当新的数据更新时,sequence会产生变化
// 当sequence为奇数时,表明当时数据正在更新
private var sequence = 0
// CAS办法更新当时数据的值
private fun updateState(expectedState: Any?, newState: Any): Boolean {
    var curSequence = 0
    // 获取一切的订阅者
    var curSlots: Array<StateFlowSlot?>? = this.slots
    // 加锁
    synchronized(this) {
        // 获取当时数据的值
        val oldState = _state.value
        // 假如等待数据不为空,一起当时数据不等于等待数据,则回来false
        if (expectedState != null && oldState != expectedState) return false
        // 假如新数据与老数据相同,即前后数据没有产生变化,则直接回来true
        if (oldState == newState) return true
        // 更新当时数据
        _state.value = newState
        // 获取全局变量
        curSequence = sequence
        // 假如为偶数,阐明updateState办法没有被其他协程调用,没有并发
        if (curSequence and 1 == 0) {
            // 自添加1,表明当时正在更新数据
            curSequence++
            // 将新值保存到全局变量中
            sequence = curSequence
        } else { // 假如为奇数,阐明updateState办法正在被其他协程调用,处于并发中
            // 加2后不改变奇偶性,仅仅表明当时数据产生了变化
            sequence = curSequence + 2
            // 回来true
            return true
        }
        // 获取当时一切的订阅者
        curSlots = slots
    }
    // 走到这儿,阐明上面不是并发调用updateState办法的状况
    // 循环,告诉订阅者
    while (true) {
        // 遍历,修正订阅者的状况,告诉订阅者
        curSlots?.forEach {
            it?.makePending()
        }
        // 加锁,判别在告诉订阅者的进程中,数据是否又被更新了
        synchronized(this) {
            // 假如数据没有被更新
            if (sequence == curSequence) {
                // 加1,让sequence变成偶数,表明更新完毕
                sequence = curSequence + 1
                // 回来true
                return true
            }
            // 假如数据有被更新,则获取sequence和订阅者
            // 再次循环
            curSequence = sequence
            curSlots = slots
        }
    }
}
  1. 承受数据

调用StateFlow类型目标的collect办法,会触发订阅进程,接纳emit办法发送的数据,这部分在 StateFlowImpl中完成,代码如下:

override suspend fun collect(collector: FlowCollector<T>) {
    // 为当时的订阅者分配一个StateFlowSlot类型的目标
    val slot = allocateSlot()
    try {
        // 假如collector类型为SubscribedFlowCollector,
        // 阐明订阅者监听了订阅进程的发动,则先回调
        if (collector is SubscribedFlowCollector) collector.onSubscription()
        // 获取订阅者地点的协程
        val collectorJob = currentCoroutineContext()[Job]
        // 局部变量,保存上一次发射的数据,初始值为null
        var oldState: Any? = null
        // 死循环
        while (true) {
            // 获取当时的数据
            val newState = _state.value
            // 判别订阅者地点协程是否是存活的,假如不是则抛出反常
            collectorJob?.ensureActive()
            // 假如订阅者是第一次处理数据或许当时数据与上一次数据不同
            if (oldState == null || oldState != newState) {
                // 将数据发送给下流
                collector.emit(NULL.unbox(newState))
                // 保存当时发射数据到局部变量
                oldState = newState
            }
            // 修正状况,假如之前不是PENGDING状况
            if (!slot.takePending()) {
                // 则挂起等候新数据更新
                slot.awaitPending()
            }
        }
    } finally {
        // 开释已分配的StateFlowSlot类型的目标
        freeSlot(slot)
    }
}
  1. 新订阅者怎样获取获取缓存数据呢

当新订阅者呈现时,StateFlow会将当时最新的数据发送给订阅者。能够经过调用StateFlowImpl类重写的常量replayCache获取当时最新的数据,代码如下:

override val replayCache: List<T>
    get() = listOf(value)

看到这儿就能明白为什么新订阅者能拿到数据。由于每次都是获取最新的value..而且还不能清楚缓存。假如清楚就会报错

override fun resetReplayCache() {
    throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
}

到此咱们常用的sharedFlow和stateFlow就剖析完了。。。。

当然我这儿剖析sharedFlow和stateFlow里边的原理仍是没有剖析很细心。仅仅依据注释和流程对它们的有所了解。后续假如咱们开发起来遇到问题就能很快去翻看源码找到答案。。用起来也能很随心应手~~~