前言

协程系列文章:

  • 一个小故事讲理解进程、线程、Kotlin 协程到底啥关系?
  • 少年,你可知 Kotlin 协程开始的样子?
  • 讲真,Kotlin 协程的挂起/康复没那么奥秘(故事篇)
  • 讲真,Kotlin 协程的挂起/康复没那么奥秘(原理篇)
  • Kotlin 协程调度切换线程是时分解开本相了
  • Kotlin 协程之线程池探索之旅(与Java线程池PK)
  • Kotlin 协程之撤销与反常处理探索之旅(上)
  • Kotlin 协程之撤销与反常处理探索之旅(下)
  • 来,跟我一同撸Kotlin runBlocking/launch/join/async/delay 原理&运用
  • 继续来,同我一同撸Kotlin Channel 深水区
  • Kotlin 协程 Select:看我怎么多路复用
  • Kotlin Sequence 是时分派上用场了
  • Kotlin Flow啊,你将流向何方?
  • Kotlin Flow 背压和线程切换居然如此类似
  • Kotlin SharedFlow&StateFlow 暖流到底有多热?

前面剖析的都是冷流,冷热是对应的,有冷就有热,本篇将要点剖析暖流SharedFlow&StateFlow的运用及其原理,探究其”热度”。
经过本篇文章,你将了解到:

  1. 冷流与暖流差异
  2. SharedFlow 运用方法与运用场景
  3. SharedFlow 原理不一样的视点剖析
  4. StateFlow 运用方法与运用场景
  5. StateFlow 原理一看就会
  6. StateFlow/SharedFlow/LiveData 差异与运用

1. 冷流与暖流差异

Kotlin SharedFlow&StateFlow 热流到底有多热?

2. SharedFlow 运用方法与运用场景

运用方法

流的两头别离是顾客(观察者/订阅者),出产者(被观察者/被订阅者),因而只需求重视两头的行为即可。

1. 出产者先发送数据

    fun test1() {
        runBlocking {
            //结构暖流
            val flow = MutableSharedFlow<String>()
            //发送数据(出产者)
            flow.emit("hello world")
            //敞开协程
            GlobalScope.launch {
                //接纳数据(顾客)
                flow.collect {
                    println("collect: $it")
                }
            }
        }
    }

Q:先猜想一下成果?
A:没有任何打印

咱们猜想:出产者先发送了数据,由于此刻顾客还没来得及接纳,因而数据被丢掉了。

2. 出产者拖延发送数据
咱们很容易想到变换一下机遇,让顾客先注册等候:

    fun test2() {
        runBlocking {
            //结构暖流
            val flow = MutableSharedFlow<String>()
            //敞开协程
            GlobalScope.launch {
                //接纳数据(顾客)
                flow.collect {
                    println("collect: $it")
                }
            }
            //发送数据(出产者)
            delay(200)//确保顾客现已注册上
            flow.emit("hello world")
        }
    }

这个时分顾客成功打印数据。

3. 历史数据的保存(重放)
虽然2的方法连通了出产者和顾客,可是你对1的失败耿耿于怀:觉得SharedFlow有点弱啊,限制有点狠,LiveData每次新的观察者到来都能收到当时的数据,而SharedFlow不行。
实际上,SharedFlow关于历史数据的重放比LiveData更强壮,LiveData一直只需个值,也便是每次只重放1个值,而SharedFlow可装备重放恣意值(当然不能超过Int的规模)。
换一下运用姿势:

    fun test3() {
        runBlocking {
            //结构暖流
            val flow = MutableSharedFlow<String>(1)
            //发送数据(出产者)
            flow.emit("hello world")
            //敞开协程
            GlobalScope.launch {
                //接纳数据(顾客)
                flow.collect {
                    println("collect: $it")
                }
            }
        }
    }

此刻达到的效果与2共同,MutableSharedFlow(1)表明设定出产者保存1个值,当有新的顾客来了之后将会获取到这个保存的值。
当然也能够保存更多的值:

    fun test3() {
        runBlocking {
            //结构暖流
            val flow = MutableSharedFlow<String>(4)
            //发送数据(出产者)
            flow.emit("hello world1")
            flow.emit("hello world2")
            flow.emit("hello world3")
            flow.emit("hello world4")
            //敞开协程
            GlobalScope.launch {
                //接纳数据(顾客)
                flow.collect {
                    println("collect: $it")
                }
            }
        }
    }

此刻顾客将打印出”hell world1~hello world4″,此刻也说明晰不管有没有顾客,出产者都出产了数据,由此说明:

SharedFlow 是暖流

4. collect是挂起函数
在2里,咱们敞开了协程去履行顾客逻辑:flow.collect,不独自敞开协程履行会怎样?

    fun test4() {
        runBlocking {
            //结构暖流
            val flow = MutableSharedFlow<String>()
            //接纳数据(顾客)
            flow.collect {
                println("collect: $it")
            }
            println("start emit")//①
            flow.emit("hello world")
        }
    }

最终发现①没打印出来,由于collect是挂起函数,此刻由于出产者还没来得及出产数据,顾客调用collect时发现没数据后便挂起协程。

因而出产者和顾客要处在不同的协程里

5. emit是挂起函数
顾客要等候出产者出产数据,所以collect规划为挂起函数,反过来出产者是否要等候顾客消费完数据才进行下一次emit呢?

fun test5() {
    runBlocking {
        //结构暖流
        val flow = MutableSharedFlow<String>()
        //敞开协程
        GlobalScope.launch {
            //接纳数据(顾客)
            flow.collect {
                println("collect: $it")
                delay(2000)
            }
        }
        //发送数据(出产者)
        delay(200)//确保顾客先履行
        println("emit 1 ${System.currentTimeMillis()}")
        flow.emit("hello world1")
        delay(100)
        println("emit 2 ${System.currentTimeMillis()}")
        flow.emit("hello world2")
        delay(100)
        println("emit 3 ${System.currentTimeMillis()}")
        flow.emit("hello world3")
        delay(100)
        println("emit 4 ${System.currentTimeMillis()}")
        flow.emit("hello world4")
    }
}
从打印能够看出,出产者每次emit都需求等候顾客消费完成之后才干进行下次emit。
**6. 缓存的设定**    
在之前剖析Flow的时分有说过Flow的背压问题以及运用Buffer来处理它,相同的在SharedFlow里也有缓存的概念。    
```kotlin
    fun test6() {
        runBlocking {
            //结构暖流
            val flow = MutableSharedFlow<String>(0, 10)
            //敞开协程
            GlobalScope.launch {
                //接纳数据(顾客)
                flow.collect {
                    delay(2000)
                    println("collect: $it")
                }
            }
            //发送数据(出产者)
            delay(200)//确保顾客先履行
            println("emit 1 ${System.currentTimeMillis()}")
            flow.emit("hello world1")
            println("emit 2 ${System.currentTimeMillis()}")
            flow.emit("hello world2")
            println("emit 3 ${System.currentTimeMillis()}")
            flow.emit("hello world3")
            println("emit 4 ${System.currentTimeMillis()}")
            flow.emit("hello world4")
        }
    }

MutableSharedFlow(0, 10) 第2个参数10表明额定的缓存巨细为10,出产者经过emit先将数据放到缓存里,此刻它并没有被顾客的速度连累。

7. 重放与额定缓存个数

public fun <T> MutableSharedFlow(
    replay: Int = 0,//重放个数
    extraBufferCapacity: Int = 0,//额定的缓存个数
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
):

重放主要用来给新进的顾客重放特定个数的历史数据,而额定的缓存个数是为了敷衍背压问题,总的缓存个数=重放个数+额定的缓存个数。

运用场景

如有以下需求,可用SharedFlow

  1. 需求重放历史数据
  2. 能够装备缓存
  3. 需求重复发射/接纳相同的值

3. SharedFlow 原理不一样的视点剖析

带着问题找答案

要点重视的无非是emit和collect函数,它俩都是挂起函数,而是否挂起取决于是否满足条件。一同出产者和消费呈现的机遇也会影响这个条件,因而列举出产者、顾客呈现的机遇即可。

只需出产者

当只需出产者没有顾客,此刻出产者调用emit会挂起协程吗?假如不是,那么什么情况会挂起?
从emit函数源码入手:

    override suspend fun emit(value: T) {
        //假如发射成功,则直接退出函数
        if (tryEmit(value)) return // fast-path
        //不然挂起协程
        emitSuspend(value)
    }

先看tryEmit(xx):

    override fun tryEmit(value: T): Boolean {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val emitted = kotlinx.coroutines.internal.synchronized(this) {
            //测验emit
            if (tryEmitLocked(value)) {
                //遍历所有顾客,找到需求唤醒的顾客协程
                resumes = findSlotsToResumeLocked(resumes)
                true
            } else {
                false
            }
        }
        //康复顾客协程
        for (cont in resumes) cont?.resume(Unit)
        //emitted==true表明发射成功
        return emitted
    }
    private fun tryEmitLocked(value: T): Boolean {
        //nCollectors 表明顾客个数,若是没有顾客则不管怎么都会发射成功
        if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
        if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
            //假如缓存现已满而且有顾客没有消费最旧的数据(replayIndex),则进入此处
            when (onBufferOverflow) {
                //挂起出产者
                BufferOverflow.SUSPEND -> return false // will suspend
                //直接丢掉最新数据,认为发射成功
                BufferOverflow.DROP_LATEST -> return true // just drop incoming
                //丢掉最旧的数据
                BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
            }
        }
        //将数据参加到缓存行列里
        enqueueLocked(value)
        //缓存数据行列长度
        bufferSize++ // value was added to buffer
        // drop oldest from the buffer if it became more than bufferCapacity
        if (bufferSize > bufferCapacity) dropOldestLocked()
        // keep replaySize not larger that needed
        if (replaySize > replay) { // increment replayIndex by one
            updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
        }
        return true
    }
    private fun tryEmitNoCollectorsLocked(value: T): Boolean {
        kotlinx.coroutines.assert { nCollectors == 0 }
        //没有设置重放,则直接退出,丢掉发射的值
        if (replay == 0) return true // no need to replay, just forget it now
        //参加到缓存里
        enqueueLocked(value) // enqueue to replayCache
        bufferSize++ // value was added to buffer
        // drop oldest from the buffer if it became more than replay
        //若是超出了重放个数,则丢掉最旧的值
        if (bufferSize > replay) dropOldestLocked()
        minCollectorIndex = head + bufferSize // a default value (max allowed)
        //发射成功
        return true
    }

再看emitSuspend(value):

    private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val emitter = kotlinx.coroutines.internal.synchronized(this) lock@{
            ...
            //结构为Emitter,参加到buffer里
            SharedFlowImpl.Emitter(this, head + totalSize, value, cont).also {
                enqueueLocked(it)
                //独自记录挂起的emit
                queueSize++ // added to queue of waiting emitters
                // synchronous shared flow might rendezvous with waiting emitter
                if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
            }
        }
    }

用图表明整个emit流程:

Kotlin SharedFlow&StateFlow 热流到底有多热?

现在能够回到上面的问题了。

  1. 假如没有顾客,出产者调用emit函数永远不会挂起
  2. 有顾客注册了而且缓存容量已满而且最旧的数据没有被消费,则出产者emit函数有机会被挂起,假如设定了挂起模式,则会被挂起

最旧的数据下面会剖析

只需顾客

当只需顾客时,顾客调用collect会被挂起吗?
从collect函数源码入手。

    override suspend fun collect(collector: FlowCollector<T>) {
        //分配slot
        val slot = allocateSlot()//①
        try {
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            val collectorJob = currentCoroutineContext()[Job]
            while (true) {
                //死循环
                var newValue: Any?
                while (true) {
                    //测验获取值 ②
                    newValue = tryTakeValue(slot) // attempt no-suspend fast path first
                    if (newValue !== NO_VALUE) 
                        break//拿到值,退出内层循环
                    //没拿到值,挂起等候 ③
                    awaitValue(slot) // await signal that the new value is available
                }
                collectorJob?.ensureActive()
                //拿到值,消费数据
                collector.emit(newValue as T)
            }
        } finally {
            freeSlot(slot)
        }
    }

要点看三点:
① allocateSlot()
先看Slot数据结构:

    private class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
        //顾客当时应该消费的数据在出产者缓存里的索引
        var index = -1L // current "to-be-emitted" index, -1 means the slot is free now
        //挂起的顾客协程体
        var cont: Continuation<Unit>? = null // collector waiting for new value
    }

每此调用collect都会为其生成一个AbstractSharedFlowSlot目标,该目标存储在AbstractSharedFlowSlot目标数组:slots里

allocateSlot() 有两个效果:

  1. 给slots数组扩容
  2. 往slots数组里存放AbstractSharedFlowSlot目标

② tryTakeValue(slot)
创立了slot之后就能够去取值了

    private fun tryTakeValue(slot: SharedFlowSlot): Any? {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val value = kotlinx.coroutines.internal.synchronized(this) {
            //找到slot对应的buffer里的数据索引
            val index = tryPeekLocked(slot)
            if (index < 0) {
                //没找到
                NO_VALUE
            } else {
                //找到
                val oldIndex = slot.index
                //依据索引,从buffer里获取值
                val newValue = getPeekedValueLockedAt(index)
                //slot索引增加,指向buffer里的下个数据
                slot.index = index + 1 // points to the next index after peeked one
                //更新游标等信息,并回来挂起的出产者协程
                resumes = updateCollectorIndexLocked(oldIndex)
                newValue
            }
        }
        //假如能够,则引发出产者协程
        for (resume in resumes) resume?.resume(Unit)
        return value
    }

该函数有或许取到值,也或许取不到。

③ awaitValue

    private suspend fun awaitValue(slot: kotlinx.coroutines.flow.SharedFlowSlot): Unit = suspendCancellableCoroutine { cont ->
        kotlinx.coroutines.internal.synchronized(this) lock@{
            //再次测验获取
            val index = tryPeekLocked(slot) // recheck under this lock
            if (index < 0) {
                //说明没数据可取,此刻记录当时协程,后续康复时才干找到
                slot.cont = cont // Ok -- suspending
            } else {
                //有数据了,则唤醒
                cont.resume(Unit) // has value, no need to suspend
                return@lock
            }
            slot.cont = cont // suspend, waiting
        }
    }

Kotlin SharedFlow&StateFlow 热流到底有多热?

对比出产者emit和顾客collect流程,显然collect流程比emit流程简略多了。

现在能够回到上面的问题了。

不管是否有出产者,只需没拿到数据,collect都会被挂起

slot与buffer

以上别离剖析了emit和collect流程,咱们知道了emit或许被挂起,被挂起后能够经过collect唤醒,相同的collect也或许被挂起,挂起后经过emit唤醒。
要点在于两者是怎么交换数据的,也便是slot目标和buffer是怎么相关的?

Kotlin SharedFlow&StateFlow 热流到底有多热?

如上图,简介其流程:
  1. SharedFlow设定重放个数为4,额定容量为3,总容量为4+3=7
  2. 出产者将数据堆到buffer里,此刻顾客还没开始collect
  3. 顾客开始collect,由于设置了重放个数,因而结构Slot目标时,slot.index=0,依据index找到buffer下标为0的元素即为能够消费的元素
  4. 拿到0号数据后,slot.index=1,找到buffer下标为1的元素
  5. index++,重复4的进程

由于collect消费了数据,因而emit能够继续放新的数据,此刻又有新的collect参加进来:

Kotlin SharedFlow&StateFlow 热流到底有多热?

  1. 新参加的顾客collect时结构Slot目标,由于此刻的buffer最旧的值为buffer下标为2,因而Slot初始化Slot.index = 2,取第2个数据
  2. 相同的,继续往后取值

此刻有了2个顾客,假定顾客2消费速度很慢,它停留在了index=3,而顾客1消费速度快,变成了如下图:

Kotlin SharedFlow&StateFlow 热流到底有多热?

  1. 顾客1在取index=4的值(能够继续往后消费数据),顾客2在取index=3的值
  2. 出产者此刻现已填充满buffer了,buffer里最旧的值为index=4,为了确保顾客2能够获取到index=4的值,此刻它不能再emit新的数据了,于是出产者被挂起
  3. 等到顾客2消费了index=4的值,就会唤醒正在挂起的出产者继续出产数据

由此得出一个结论:

SharedFlow的emit或许会被最慢的collect连累然后挂起

该现象用代码查看打印比较直观:

    fun test7() {
        runBlocking {
            //结构暖流
            val flow = MutableSharedFlow<String>(4, 3)
            //敞开协程
            GlobalScope.launch {
                //接纳数据(顾客1)
                flow.collect {
                    println("collect1: $it")
                }
            }
            GlobalScope.launch {
                //接纳数据(顾客2)
                flow.collect {
                    //模拟消费慢
                    delay(10000)
                    println("collect2: $it")
                }
            }
            //发送数据(出产者)
            delay(200)//确保顾客先履行
            var count = 0
            while (true) {
                flow.emit("emit:${count++}")
            }
        }
    }

4. StateFlow 运用方法与运用场景

运用方法

1. 重放功用
上面花了很大篇幅剖析SharedFlow,而StateFlow是SharedFlow的特例,先来看其简略运用。

    fun test8() {
        runBlocking {
            //结构暖流
            val flow = MutableStateFlow("")
            flow.emit("hello world")
            flow.collect {
                //顾客
                println(it)
            }
        }
    }

咱们发现,并没有给Flow设置重放,此刻顾客仍然能够消费到数据,说明StateFlow默认支持历史数据重放。

2. 重放个数
具体能重放几个值呢?

    fun test10() {
        runBlocking {
            //结构暖流
            val flow = MutableStateFlow("")
            flow.emit("hello world")
            flow.emit("hello world1")
            flow.emit("hello world2")
            flow.emit("hello world3")
            flow.emit("hello world4")
            flow.collect {
                //顾客
                println(it)
            }
        }
    }

最终发现顾客只需1次打印,说明StateFlow只重放1次,而且是最新的值。

3. 防抖

    fun test9() {
        runBlocking {
            //结构暖流
            val flow = MutableStateFlow("")
            flow.emit("hello world")
            GlobalScope.launch {
                flow.collect {
                    //顾客
                    println(it)
                }
            }
            //再发送
            delay(1000)
            flow.emit("hello world")
//            flow.emit("hello world1")
        }
    }

出产者发送了两次数据,猜猜此刻顾客有几次打印?
答案是只需1次,由于StateFlow规划了防抖,当emit时会检测当时的值和上一次的值是否共同,若共同则直接抛弃当时数据不做任何处理,collect当然就收不到值了。若是咱们将注释铺开,则会有2次打印。

运用场景

StateFlow 和LiveData很像,都是只保护一个值,旧的值过来就会将新值掩盖。
适用于告诉状况变化的场景,如下载进展。适用于只重视最新的值的变化。
假如你熟悉LiveData,就能够理解为StateFlow根本能够做到替换LiveData功用。

5. StateFlow 原理一看就会

假如你看懂了SharedFlow原理,那么对StateFlow原理的理解就不在话下了。

emit 进程

    override suspend fun emit(value: T) {
        //value 为StateFlow保护的值,每次emit都会修正它
        this.value = value
    }
    public override var value: T
        get() = NULL.unbox(_state.value)//从state取出
        set(value) { updateState(null, value ?: NULL) }
    private fun updateState(expectedState: Any?, newState: Any): Boolean {
        var curSequence = 0
        var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
        kotlinx.coroutines.internal.synchronized(this) {
            val oldState = _state.value
            if (expectedState != null && oldState != expectedState) return false // CAS support
            //新旧值共同,则无需更新
            if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
            //更新到state里
            _state.value = newState
            curSequence = sequence
            //...
            curSlots = slots // read current reference to collectors under lock
        }
        while (true) {
            curSlots?.forEach {
                //遍历顾客,修正状况或是将挂起的顾客唤醒
                it?.makePending()
            }
                ...
        }
    }

emit进程便是修正value值的进程,不管是否修正成功,emit函数都会退出,它不会被挂起。

collect 进程

    override suspend fun collect(collector: FlowCollector<T>) {
        //分配slot
        val slot = allocateSlot()
        try {
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            val collectorJob = currentCoroutineContext()[Job]
            var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
            while (true) {
                val newState = _state.value
                collectorJob?.ensureActive()
                //值不相同才调用collect闭包
                if (oldState == null || oldState != newState) {
                    collector.emit(NULL.unbox(newState))
                    oldState = newState
                }
                if (!slot.takePending()) { // try fast-path without suspending first
                    //挂起协程
                    slot.awaitPending() // only suspend for new values when needed
                }
            }
        } finally {
            freeSlot(slot)
        }
    }

StateFlow 也有slot,叫做StateFlowSlot,它比SharedFlowSlot简略多了,由于一直只需求保护一个值,所以不需求index。里边有个成员变量_state,该值既能够是顾客协程当时的状况,也能够表明协程体。
当表明为协程体时,说明此刻顾客被挂起了,等到出产者经过emit唤醒该协程。

Kotlin SharedFlow&StateFlow 热流到底有多热?

6. StateFlow/SharedFlow/LiveData 差异与运用

  1. StateFlow 是SharedFlow特例
  2. SharedFlow 多用于事情告诉,StateFlow/LiveData多用于状况变化
  3. StateFlow 有默认值,LiveData没有,StateFlow.collect闭包可在子线程履行,LiveData.observe需求在主线程监听,StateFlow没有相关生命周期,LiveData相关了生命周期,StateFlow防抖,LiveData不防抖等等。

跟着本篇的完结,Kotlin协程系列也告一段落了,接下来将要点放在协程工程架构实践上,敬请期待。

以上为Flow背压和线程切换的全部内容,下篇将剖析Flow的暖流。
本文根据Kotlin 1.5.3,文中完好Demo请点击

您若喜欢,请点赞、重视、收藏,您的鼓舞是我行进的动力

继续更新中,和我一同稳扎稳打系统、深入学习Android/Kotlin

1、Android各种Context的宿世此生
2、Android DecorView 必知必会
3、Window/WindowManager 不行不知之事
4、View Measure/Layout/Draw 真理解了
5、Android事情分发全套服务
6、Android invalidate/postInvalidate/requestLayout 彻底厘清
7、Android Window 怎么确认巨细/onMeasure()屡次履行原因
8、Android事情驱动Handler-Message-Looper解析
9、Android 键盘一招搞定
10、Android 各种坐标彻底明晰
11、Android Activity/Window/View 的background
12、Android Activity创立到View的显现过
13、Android IPC 系列
14、Android 存储系列
15、Java 并发系列不再疑问
16、Java 线程池系列
17、Android Jetpack 前置根底系列
18、Android Jetpack 易学易懂系列
19、Kotlin 轻松入门系列
20、Kotlin 协程系列全面解读