开篇

flow api 现已渐渐被谷歌列为数据流的首选,能够见到官网的数据流篇都渐渐偏向于flow api,尽管LiveData等数据流类型现已深化开发者观念中,可是flow api也正渐渐的崛起出自己的商场。本篇讲的StateFlow是flow api中的一个更偏向于应用层的api,功用也非常和LiveData相似,那么为什么要出一个和LiveData相似的东西的,因为LiveData天生就引入了生命周期相关的概念,从设计角度出发,其实是耦合了生命周期这部分,所以现在才重整旗鼓,呈现了StateFlow。

接口部分

public interface StateFlow<out T> : SharedFlow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

能够看到StateFlow在SharedFlow上层上添加了一个属性,便是value值,能够被以为当时是当时可观测的值,跟LiveData的value相似。StateFlow愈加推崇的是单体状况,所以差异于一般的flowapi(主要是数据流状况),它的实现制止了一个重置操作。

StateFlowImpl 中
@Suppress("UNCHECKED_CAST")
override fun resetReplayCache() {
    throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
}

还有便是一般的flow属于冷流,冷流这个概念就不再赘述,而flow之所以是冷流,是因为只要在collect的时分间接通过flowcollecter的emit办法去产生数据,实质上数据的改动依靠搜集者,所以才是冷流,具体分析能够下看上一篇文章
/post/705860…

而StateFlow承继于SharedFlow,并且value数值是不依靠于collect办法改动,简略来说,便是搜集者能够改动value数值,可是StateFlow中的value改动并不是只要这一种手法【注意这儿概念】,它还能够直接对value数据改动,如下面比如

class CounterModel {
    private val _counter = MutableStateFlow(0) // private mutable state flow
    val counter = _counter.asStateFlow() // publicly exposed as read-only state flow
    fun inc() {
        _counter.value++
    }
}

所以按照数据划分,它就能够属于暖流,当然冷流暖流实质是一个概念,便于区别即可。

发送者部分

为了便利分析,咱们将数据改动部分称为发送者部分,每个对value进行set操作时都会进入下面办法。

private fun updateState(expectedState: Any?, newState: Any): Boolean {
    var curSequence = 0
    var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
    synchronized(this) {
        val oldState = _state.value
        if (expectedState != null && oldState != expectedState) return false // CAS support
        if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
        _state.value = newState
        curSequence = sequence
        if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
            curSequence++ // make it odd
            sequence = curSequence
        } else {
            // update is already in process, notify it, and return
            sequence = curSequence + 2 // change sequence to notify, keep it odd
            return true // updated
        }
        curSlots = slots // read current reference to collectors under lock
    }
    /*
       Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines.
       Loop until we're done firing all the changes. This is a sort of simple flat combining that
       ensures sequential firing of concurrent updates and avoids the storm of collector resumes
       when updates happen concurrently from many threads.
     */
    while (true) {
        // Benign race on element read from array
        curSlots?.forEach {
            it?.makePending()
        }
        // check if the value was updated again while we were updating the old one
        synchronized(this) {
            if (sequence == curSequence) { // nothing changed, we are done
                sequence = curSequence + 1 // make sequence even again
                return true // done, updated
            }
            // reread everything for the next loop under the lock
            curSequence = sequence
            curSlots = slots
        }
    }
}

其实一段代码下来,除了数据保护部分,便是做了这么一件事,更新value的数据值,并记载当时数值是否是最新数值。和livedata的更新数据部分相似,实质都是保护一个计数器sequence,用来区别当时的value是否是最新。slots用来(下文会讲)记载数据更新状况,总的来说便是当数值发生改动的时分更新数据。

订阅者部分

当咱们调用collect的时分,就会进行数据获取,也便是调用接收的那一部分。

override suspend fun collect(collector: FlowCollector<T>) {
    val slot = allocateSlot()
    try {
        if (collector is SubscribedFlowCollector) collector.onSubscription()
        val collectorJob = currentCoroutineContext()[Job]
        var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
        // The loop is arranged so that it starts delivering current value without waiting first
        while (true) {
            // Here the coroutine could have waited for a while to be dispatched,
            // so we use the most recent state here to ensure the best possible conflation of stale values
            val newState = _state.value
            // always check for cancellation
            collectorJob?.ensureActive()
            // Conflate value emissions using equality
            if (oldState == null || oldState != newState) {
                collector.emit(NULL.unbox(newState))
                oldState = newState
            }
            // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
            if (!slot.takePending()) { // try fast-path without suspending first
                slot.awaitPending() // only suspend for new values when needed
            }
        }
    } finally {
        freeSlot(slot)
    }
}

因为collect在协程傍边,所以要时刻重视协程的状况collectorJob?.ensureActive()便是之一,那么什么时分才会正在调用咱们collect里的自定义逻辑呢?换个说法来说便是什么时分像LiveData相同收到回调呢?
注意一下上面的这儿

if (oldState == null || oldState != newState) {
                collector.emit(NULL.unbox(newState))
                oldState = newState
            }

和livedata不相同的是,livedata每次设定value都会收到回调,而flow是在新旧状况不相同时,才会进行 collector.emit(NULL.unbox(newState)),进而触发搜集。所以当value被屡次设定同一个值的时分,LiveData会回调屡次,而StateFlow只会调用一次。

关于StateFlowSlot

能够看到不管上面的发送部分或许接收部分都有StateFlowSlot类的影子,
比如

var curSlots: Array<StateFlowSlot?>? = this.slots /

里边保护着一个Array,那么这个是有什么用处呢?
咱们知道flow是运行在协程里边的,简略来说协程实质便是状况机各种回调,那么根据这种环境下,在多线程或许多子协程中,为了保护State的正确更改,里边也要设定自己的状况机进行标识状况的正确改动。

private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
    private val _state = atomic<Any?>(null)
    override fun allocateLocked(flow: StateFlowImpl<*>): Boolean {
        // No need for atomic check & update here, since allocated happens under StateFlow lock
        if (_state.value != null) return false // not free
        _state.value = NONE // allocated
        return true
    }
    override fun freeLocked(flow: StateFlowImpl<*>): Array<Continuation<Unit>?> {
        _state.value = null // free now
        return EMPTY_RESUMES // nothing more to do
    }
    @Suppress("UNCHECKED_CAST")
    fun makePending() {
        _state.loop { state ->
            when {
                state == null -> return // this slot is free - skip it
                state === PENDING -> return // already pending, nothing to do
                state === NONE -> { // mark as pending
                    if (_state.compareAndSet(state, PENDING)) return
                }
                else -> { // must be a suspend continuation state
                    // we must still use CAS here since continuation may get cancelled and free the slot at any time
                    if (_state.compareAndSet(state, NONE)) {
                        (state as CancellableContinuationImpl<Unit>).resume(Unit)
                        return
                    }
                }
            }
        }
    }
    fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state ->
        assert { state !is CancellableContinuationImpl<*> }
        return state === PENDING
    }
    @Suppress("UNCHECKED_CAST")
    suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont ->
        assert { _state.value !is CancellableContinuationImpl<*> } // can be NONE or PENDING
        if (_state.compareAndSet(NONE, cont)) return@sc // installed continuation, waiting for pending
        // CAS failed -- the only possible reason is that it is already in pending state now
        assert { _state.value === PENDING }
        cont.resume(Unit)
    }
}

一个StateFlowSlot里边有以下状况:
一般状况下有如下状况机切换

graph TD
null --> NONE --> PENDING --> 进行新value设定

最终调用的是CancellableContinuationImpl 里边的执行,这儿没什么好讲的,便是使用cas机制保证状况机的切换
值得注意的是,第一个null 和NONE的差异,null代表的是初始状况,设置null是为了防止新new一个StateFlowSlot 和对StateFlowSlot同时进行makePending 导致状况机紊乱,所以才多加了一个初始状况null,简略表示StateFlowSlot刚刚创立,还没来的及去进入状况机更改,而NONE代表着能够进入状况机更改。

总结

到这儿StateFlow现已是根本解说完毕了,有理解过错的地方还望纠正,最好能够重视一下往期的解析:
使用flow api打造一个总线型库
/post/705860…

github地址:github.com/TestPlanB/p… 欢迎star or pr