在看 flow 相关的源码,总是看到 FusibleFlow、ChannelFlow、ChannelFlowOperatorImpl 等内容,所以把 Channel 相关内容代码都过一遍

// flow 的操作符
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    checkFlowContext(context)
    return when {
        context == EmptyCoroutineContext -> this
        this is FusibleFlow -> fuse(context = context)
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}

Channel是什么

Kotlin 版的 BlockingQueue, 我的个人理解是支撑在协程环境下的运用的队列数据结构

根据 capacityonBufferOverflow 获取不同 Channel 实例

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> {
            if (onBufferOverflow == BufferOverflow.SUSPEND)
                // step1.1: 溢出战略为挂起的时分,无须缓存区,挂起就好
                RendezvousChannel(onUndeliveredElement)
            else
                // step1.2: 溢出丢掉战略,设置一个缓存区寄存
                ArrayChannel(1, onBufferOverflow, onUndeliveredElement)
        }
        // step2: 设置兼并容量形式,由于会自动扔掉最老的数据, 其实我感觉这儿只生效 DROP_OLDEST, 要么不判别,要么判别全
        CONFLATED -> {
            require(onBufferOverflow == BufferOverflow.SUSPEND) {
                "CONFLATED capacity cannot be used with non-default onBufferOverflow"
            }
            ConflatedChannel(onUndeliveredElement)
        }
        // step3: 设置无限制形式,底层运用链表完结,由于容量无限制,相对应的溢出也不存在了
        UNLIMITED -> LinkedListChannel(onUndeliveredElement) 
        // step4: 设置缓存形式,假如为挂起战略则容量为64(默许), 反之容量为1,由于剩余2种溢出战略不是丢掉最新的就是丢掉最老的,所以容量1足够
        BUFFERED -> ArrayChannel(
            if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1,
            onBufferOverflow, onUndeliveredElement
        )
        // step5: 设置指定容量的的时分,创立 ArrayChannel
        else -> {
            if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
                step5.1: 与 CONFLATED相同直接复用
                ConflatedChannel(onUndeliveredElement) 
            else
                ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
        }
    }
  • Tips: 其实容量还有一个特别值为 OPTIONAL_CHANNEL, 是 ChannelFlow 内部运用的,代表可选择是否运用Channel,即能不必就不必;场景为假如非 ContinuationInterceptor 上下文切换 而且 无buffer 的时分,不需要 Channel 的介入
class ChannelFlowOperator{
    // ...
    override suspend fun collect(collector: FlowCollector<T>) {
        // step1: 假如channel是可选的且没有buffer(flowOn/flowWith操作符)
        if (capacity == Channel.OPTIONAL_CHANNEL) {
            val collectContext = coroutineContext
            val newContext = collectContext + context 
            // step2: 没有上下文产生改变,则开始原始流搜集
            if (newContext == collectContext)
                return flowCollect(collector)
            // step3: 不需要改变协程的线程环境
            if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])
                return collectWithContextUndispatched(collector, newContext)
        }
        // step4: 创立 channel 开始搜集
        super.collect(collector)
    }
    // ...
}

Channel 转成 Flow: 意图让 Channel 具备 Flow 的运算符

Channel 转换 flow 有2种完结方法,实质是同一种完结方法,仅仅是否可以被多次 collect

  1. ReceiveChannel#consumeAsFlow(): 只能被 collect 一次,多次 collect 时分会抛反常;
  2. ReceiveChannel#receiveAsFlow: 能被 collect 多次,但 Channel 中的一条消息只能被一个 collector 所消费, 俗称扇形消费
public fun <T> ReceiveChannel<T>.receiveAsFlow(): Flow<T> = ChannelAsFlow(this, consume = false)
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = ChannelAsFlow(this, consume = true)
private class ChannelAsFlow<T>(
    private val channel: ReceiveChannel<T>,
    private val consume: Boolean,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.OPTIONAL_CHANNEL,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
    private val consumed = atomic(false)
    private fun markConsumed() {
        if (consume) {
            check(!consumed.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" }
        }
    }
    // ...
    override suspend fun collect(collector: FlowCollector<T>) {
        // step1: 从 receiveAsFlow 或许 consumeAsFlow 构造可知,只会进入此分支
        if (capacity == Channel.OPTIONAL_CHANNEL) {
            // step2: 检查是否消费过
            markConsumed()
            // step3: 把 channel 的交给 collector 搜集
            collector.emitAllImpl(channel, consume) 
        } else {
            super.collect(collector) 
        }
    }
    // ... 
}
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
    //step4: 检查协程是否正常
    ensureActive()
    var cause: Throwable? = null
    try {
        while (true) {
            // step5: 获取 channel 中队头 => 由这儿知道当时值被此 collector 消费,其他 collector 就获取不到了
            val result = run { channel.receiveCatching() }
            // step6: channel关闭的,则跳出循环
            if (result.isClosed) {
                result.exceptionOrNull()?.let { throw it }
                break
            }
            // step7: 未关闭,弹出获取的值
            emit(result.getOrThrow())
        }
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        // step8: 撤销 channel
        if (consume) channel.cancelConsumed(cause)
    }
}

BroadcastChannel: 广播Channel(现已被 StatedFlowSharedFlow 替换)

  1. 当容量 capacity 设置为 Conflated, 实际会生成 ConflatedBroadcastChannel, 相当于溢出的时分每次丢掉最老的,而且内部只要一个状况存储值, 等价于 StatedFlow

  2. 当容量 capacity 设置为大于0的时分,实际生成 ArrayBroadcastChannel, 彻底被 SharedFlow 功用替代,而且还多了 replay 功用

ChannelFlow: 意图扩展Flow功用,比方切换协程履行上下文、增加buffer等

FusibleFlow: 意图是提供一个融合方法,可以返回一个进行本身上下文的切换容量溢出战略装备后的流

public interface FusibleFlow<T> : Flow<T> {
    public fun fuse(
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = Channel.OPTIONAL_CHANNEL,
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
    ): Flow<T>
}

ChannelFlow: 继承与FusibleFlow

ChannelFlow 其实算 Channel 场景的一个应用;Channel 本身提供了 capacityonBufferOverflow 装备,可以装备当作背压来运用;一起 Channel 本身也是一个出产消费者模型的数据结构,可以在其他协程出产,在 collector 协程进行消费;所以 Kotlin 官方运用 Channel 来完结 flowOnbuffer 流操作符的完结

则最初所贴的代码就好理解了

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    checkFlowContext(context)
    return when {
        // step1: 假如没有上下文切换,则返回本身
        context == EmptyCoroutineContext -> this
        // step2: 假如本身支撑切换的话,则调用本身方法进行切换
        this is FusibleFlow -> fuse(context = context)
        // step3: ChannelFlowOperatorImpl 继承 ChannelFlow,ChannelFlow 继承 FusibleFlow,走到此分支,则代表是一个普通流,运用 ChanelFlowOperatorImpl 进行包装切换上下文
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}

ChannelFlowOperatorImpl 源码解析: 从 ChannelFlowOperatorImpl#collect 为起点

internal class ChannelFlowOperatorImpl<T>(
    flow: Flow<T>,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.OPTIONAL_CHANNEL,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
    internal val collectToFun: suspend (ProducerScope<T>) -> Unit
        get() = { collectTo(it) }
    // Tips: 从上述 flowOn 可知, 不会对流进行无限包装,则返回原始流即可
    override fun dropChannelOperators(): Flow<T> = flow
    // step1: 开始搜集内容
    override suspend fun collect(collector: FlowCollector<T>): Unit =
        coroutineScope {
           val channel: ReceiveChannel<T> = produceImpl(this)
            // step10: 传入的 collector 接受 channel 传过来的内容
            collector.emitAll(channel)
        }
    public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
         // step2: 发动一个新的协程去履行 collectToFun,即 collectTo
        scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
    protected override suspend fun collectTo(scope: ProducerScope<T>) =
        // step8: 创立 collector 进行初始流搜集
        flowCollect(SendingCollector(scope))
    override suspend fun flowCollect(collector: FlowCollector<T>) =
        flow.collect(collector)
}
public class SendingCollector<T>(
    private val channel: SendChannel<T>
) : FlowCollector<T> {
    // step9: 把初始流的内容发射到 channel 中
    override suspend fun emit(value: T): Unit = channel.send(value)
}
internal fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    // step3: 按照容量和溢出战略装备channel
    val channel = Channel<E>(capacity, onBufferOverflow)
    // step4: 取得新的协程环境
    val newContext = newCoroutineContext(context)
    // step5: channel 和 协程上下文
    val coroutine = ProducerCoroutine(newContext, channel)
    // step6: 注册协程完结回调
    if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
    // step7: 发动协程,会履行 ChannelFlowOperator#collectToFun => ChannelFlowOperator#collectTo
    coroutine.start(start, coroutine, block)
    return coroutine
}