我正在参与「启航方案」

前语

Kotlin 为我们供给了两种创建“热流”的东西:StateFlowSharedFlow。StateFlow 经常被用来替代 LiveData 充任架构组件运用,所以我们相对了解。其实 StateFlow 仅仅 SharedFlow 的一种特化方式,SharedFlow 的功能更健壮、运用场景更多,这得益于其自带的缓存体系,本文用图解的方法,带我们更形象地了解 SharedFlow 的缓存体系。

创建 SharedFlow 需要运用到 MutableSharedFlow() 方法,我们通过方法的三个参数配备缓存:

fun <T> MutableSharedFlow(
    replay: Int = 0, 
    extraBufferCapacity: Int = 0, 
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

接下来,我们通过时序图的方式介绍这三个要害参数对缓存的影响。正文之前让我们先一致一下用语:

  • Emitter:Flow 数据的出产者,从上游发射数据
  • Subcriber:Flow 数据的顾客,在下游接收数据

replay

当 Subscriber 订阅 SharedFlow 时,有时机接收到之前已发送过的数据,replay 指定了可以收到 subscribe 之前数据的数量。replay 不能为负数,默许值为 0 标明 Subscriber 只能接收到 subscribe 之后 emit 的数据:

一看就懂!图解 Kotlin SharedFlow 缓存体系

上图展示的是 replay = 0 的情况,Subscriber 无法收到 subscribe 之前 emit 的 ❶,只能接收到 ❷ 和 ❸。

当 replay = n ( n > 0)时,SharedFlow 会启用缓存,此时 BufferSize 为 n,意味着可以缓存发射过的最近 n 个数据,并发送给新增的 Subscriber。

一看就懂!图解 Kotlin SharedFlow 缓存体系

上图以 n = 1 为例 :

  1. Emitter 发送 ❶ ,并被 Buffer 缓存
  2. Subscriber 订阅 SharedFlow 后,接收到缓存的 ❶
  3. Emitter 相继发送 ❷ ❸ ,Buffer 缓存的数据相继顺次被更新

在出产者顾客模型中,有时消费的速度赶不及出产,此时要加以操控,要么停止出产,要么丢掉数据。SharedFlow 也相同如此。有时 Subscriber 的处理速度较慢,Buffer 缓存的数据得不到及时处理,当 Buffer 为空时,emit 默许将会被挂起 ( onBufferOverflow = SUSPEND)

一看就懂!图解 Kotlin SharedFlow 缓存体系

上面的图展示了 replay = 1 时 emit 发生 suspend 场景:

  1. Emitter 发送 ❶ 并被缓存
  2. Subscriber 订阅 SharedFlow ,接收 replay 的 ❶ 初步处理
  3. Emitter 发送 ❷ ,缓存数据更新为 ❷ ,因为 Subscriber 对 ❶ 的处理没有结束,❷ 在缓存中没有及时被消费
  4. Emitter 发送 ❸,因为缓存的 ❷ 没有被 Subscriber 消费,emit 发生挂起
  5. Subscriber 初步消费 ❷ ,Buffer 缓存 ❸ , Emitter 可以继续 emit 新数据

留心 SharedFlow 作为一个多播可以有多个 Subscriber,所以上面比如中,❷ 被消费的时间点,取决于终究一个初步处理的 Subscriber。

extraBufferCapacity

extraBufferCapacity 中的 extra 标明 replay-cache 之外为 Buffer 还可以额定追加的缓存。

若 replay = n, extraBufferCapacity = m,则 BufferSize = m + n

extraBufferCapacity 默许为 0,设置 extraBufferCapacity 有助于进步 Emitter 的吞吐量

在上图的根底之上,我们再设置 extraBufferCapacity = 1,作用如下图:

一看就懂!图解 Kotlin SharedFlow 缓存体系

上图中 BufferSize = 1 + 1 = 2 :

  1. Emitter 发送 ❶ 并得到 Subscriber1 的处理 ,❶ 作为 replay 的一个数据被缓存,
  2. Emitter 发送 ❷,Buffer 中 replay-cache 的数据更新为 ❷
  3. Emitter 发送 ❸,Buffer 在存储了 replay 数据 ❷ 之上,作为 extra 又存储了 ❸
  4. Emitter 发送 ❹,此时 Buffer 已没有空余方位,emit 挂起
  5. Subscriber2 订阅 SharedFlow。虽然此时 Buffer 中存有 ❷ ❸ 两个数据,但是因为 replay = 1,所以 Subscriber2 只能收到最近的一个数据 ❸
  6. Subscriber1 处理完 ❶ 后,顺次处理 Buffer 中的下一个数据,初步消费 ❷
  7. 对于 SharedFlow 来说,现已不存在没有消费 ❷ 的 Subscriber,❷ 移除缓存,❹ 的 emit 继续,并进入缓存,此时 Buffer 又有两个数据 ❸ ❹ ,
  8. Subscriber1 处理完 ❷ ,初步消费 ❸
  9. 不存在没有消费 ❸ 的 Subscriber, ❸ 移除缓存。

onBufferOverflow

前面的比如中,当 Buffer 被填满时,emit 会被挂起,这都是建立在 onBufferOverflow 为 SUSPEND 的条件下的。onBufferOverflow 用来指定缓存移除时的战略,除了默许的 SUSPEND,还有两个数据丢掉战略:

  • DROP_LATEST:丢掉最新的数据
  • DROP_OLDEST:丢掉最老的数据

需要特别留心的是,当 BufferSize = 0 时,extraBufferCapacity 只支撑 SUSPEND,其他丢掉战略是无效的。这很好了解,因为 Buffer 中没有数据,所以丢掉无从下手,所以发动丢掉战略的条件是 Buffer 至少有一个缓冲区,且数据被填满

一看就懂!图解 Kotlin SharedFlow 缓存体系

上图展示 DROP_LATEST 的作用。假设 replay = 2,extra = 0

  1. Emitter 发送 ❸ 时,因为 ❶ 现已被消费,所以 Buffer 数据从 ❶❷ 变为 ❷❸
  2. Emitter 发送 ❹ 时,因为 ❷ 还未被消费,Buffer 处于填满情况, ❹ 直接被丢掉
  3. Emitter 发送 ❺ 时,因为 ❷ 现已被费,可以移除缓存,Buffer 数据变为 ❸❺

一看就懂!图解 Kotlin SharedFlow 缓存体系

上图展示了 DROP_OLDEST 的作用,与 DROP_LATEST 比较后非常显着,缓存中永远会贮存最新的两个数据,但是较老的数据不管有没有被消费,都或许会从 Buffer 移除,所以 Subscriber 可以消费其时最新的数据,但是有或许漏掉中间的数据,比如图中漏掉了 ❷

留心:当 extraBufferCapacity 设为 SUSPEND 可以保证 Subscriber 一个不漏的消费掉一切数据,但是会影响 Emitter 的速度;当设置为 DROP_XXX 时,可以保证 emit 调用后当即回来,但是 Subscriber 或许会漏掉部分数据。

假设我们不想让 emit 发生挂起,除了设置 DROP_XXX 之外,还有一个方法就是调用 tryEmit,这是一个非 suspend 版本的 emit

abstract suspend override fun emit(value: T)
abstract fun tryEmit(value: T): Boolean

tryEmit 回来一个 boolean 值,你可以这样判别回来值,当运用 emit 会挂起时,运用 tryEmit 会回来 false,其他情况都是 true。这意味着 tryEmit 回来 false 的条件是 extraBufferCapacity 必须设为 SUSPEND,且 Buffer 中空余方位为 0 。此时运用 tryEmit 的作用等同于 DROP_LATEST。

SharedFlow Buffer

前面介绍的 MutableSharedFlow 的三个参数,其本质都是环绕 SharedFlow 的 Buffer 进行作业的。那么这个 Buffer 详细结构是怎样的呢?

一看就懂!图解 Kotlin SharedFlow 缓存体系

上面这个图是 SharedFlow 源码中关于 Buffer 的注释,这个图形象地告知了我们 Buffer 是一个线性数据结构(就是一个普通的数组 Array<Any?>),但是这个图不能直观反响 Buffer 运行机制。下面通过一个比如,看一下 Buffer 在运行时的详细更新进程:

val sharedFlow = MutableSharedFlow<Int>(
    replay = 2, 
    extraBufferCapacity = 2,
    onBufferOverflow = BufferOverflow.SUSPEND
)
var emitValue = 1
fun main() {
    runBlocking {
        launch {
            sharedFlow.onEach {
                delay(200) // simulate the consume of data
            }.collect()
        }
        repeat(12) {
            sharedFlow.emit(emitValue)
            emitValue++
            delay(50)
        }
    }
}

上面的代码很简单,SharedFlow 的 BufferSize = 2+2 = 4,Emitter 出产的速度大于 Subscriber 消费的速度,所以进程中会呈现 Buffer 的填充和更新,下面依旧用图的方法展示 Buffer 的改动

先看一下代码对应的时序图:

一看就懂!图解 Kotlin SharedFlow 缓存体系

有前面的介绍,相信这个时序图很简单了解,这儿就不再赘述了,下面重点图解一下 Buffer 的内存改动。SharedFlow 的 Buffer 本质上是一个根据 Array 完结的 queue,通过指针移动从往队伍增删元素,避免了元素在实践数组中的移动。这儿要害的指针有三个:

  • head:队伍的 head 指向 Buffer 的第一个有用数据,这是时间上最早进入缓存的数据,在数据被一切的 Subscriber 消费之前不会移除缓存。因此 head 也代表了最慢的 Subscriber 的处理发展
  • replay:Buffer 为 replay-cache 预留空间的其实方位,当有新的 Subscriber 订阅发生时,从此方位初步处理数据。
  • end:新数据进入缓存时的方位,end 这也代表了最快的 Subscriber 的处理发展。

假设 bufferSize 标明其时 Buffer 中存储数据的个数,则我们可知三指针 index 契合如下联系:

  • replay <= head + bufferSize
  • end = head + bufferSize

了解了三指针的意义后,我们再来看上图中的 Buffer 是如何作业的:

一看就懂!图解 Kotlin SharedFlow 缓存体系

终究,总结一下 Buffer 的特征:

  • 根据数组完结,当数组空间不行时进行 2n 的扩容
  • 元素进入数组后的方位保持不变,通过移动指针,决议数据的消费起点
  • 指针移动到数组尾部后,会从头指向头部,数组空间可循环运用