作者:Elye

原文链接:medium.com/mobile-app-…

上一年年初的时候,我写了篇文章Kotlin Flow a much better version of Sequence?, 介绍了Flow,并与Sequence进行了比较。

现在除了Flow之外,咱们还有ChannelFlow和CallbackFlow。它们与Flow有什么不同呢?为了了解它,让咱们首先开端研讨根本的Flow原理。

Flow

首先先看下面的代码,发送从1到5的数值。

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            println("Emitting $i")
            emit(i)
        }
    }
        .collect { value ->
            delay(100)
            println("Consuming $value")
        }
}

代码履行成果如下:

Emitting 1
Consuming 1
Emitting 2
Consuming 2
Emitting 3
Consuming 3
Emitting 4
Consuming 4
Emitting 5
Consuming 5

咱们能够看到,每个emit都有必要鄙人一个emit调用之前被耗费掉。这是由于没有缓冲区(buffer)来存储额外的emit,因此每个emit都有必要排队,如下图所示:

[译]轻松学习Kotlin的Flow、ChannelFlow和CallbackFlow

没有任何通道的情况下,emit(发送)和consume(耗费)是同步进行的。这在咱们想保证在前一个事件被耗费之前不emit的情况下是好的。可是,它会减慢emit(发送)进程。

添加缓冲区(Buffer)

为了使emit(发送)速度更快,咱们能够给它添加一个缓冲区(运用buffer办法)。咱们先试试0缓冲,代码如下:

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            println("Emitting $i")
            emit(i)
        }
    }.buffer(0)
        .collect { value ->
            delay(100)
            println("Consuming $value")
        }
}

履行成果如下:

Emitting 1
Emitting 2
Consuming 1
Emitting 3
Consuming 2
Emitting 4
Consuming 3
Emitting 5
Consuming 4
Consuming 5

虽然是0缓冲区,可是添加的缓冲区能够给缓冲区添加一个emit事件,如下图所示:

[译]轻松学习Kotlin的Flow、ChannelFlow和CallbackFlow

假如有 1 个缓冲区,成果会怎样?履行成果成果如下

Emitting 1
Emitting 2
Emitting 3
Consuming 1
Emitting 4
Consuming 2
Emitting 5
Consuming 3
Consuming 4
Consuming 5

这将允许在consume(消费)之前的第一个事件前完结3个emit(发送)。

[译]轻松学习Kotlin的Flow、ChannelFlow和CallbackFlow

运用.buffer(2),其履行成果如下

Emitting 1
Emitting 2
Emitting 3
Emitting 4
Consuming 1
Emitting 5
Consuming 2
Consuming 3
Consuming 4
Consuming 5

[译]轻松学习Kotlin的Flow、ChannelFlow和CallbackFlow

运用.buffer(3),咱们能够直接完结5个事件的emit。其成果如下

Emitting 1
Emitting 2
Emitting 3
Emitting 4
Emitting 5
Consuming 1
Consuming 2
Consuming 3
Consuming 4
Consuming 5

[译]轻松学习Kotlin的Flow、ChannelFlow和CallbackFlow

除了Buffer之外,还有Conflate和CollectLatest,这篇文章Kotlin Flow Buffer is like A Fashion Adoption 对此进行了说明

ChannelFlow

上面咱们现已了解了Flow的缓冲区,现在让咱们看看Flow和ChannelFlow之间有什么区别吧。

ChannelFlow 是自带Buffer的Flow

履行如下代码:

fun main() = runBlocking {
    channelFlow {
        for (i in 1..5) {
            println("Emitting $i")
            send(i)
        }
    }
        .collect { value ->
            delay(100)
            println("Consuming $value")
        }
}

成果如下:

Emitting 1
Emitting 2
Emitting 3
Emitting 4
Emitting 5
Consuming 1
Consuming 2
Consuming 3
Consuming 4
Consuming 5

这看起来像运用了.buffer(3)的Flow。可是ChannelFlow有更多缓存区,由于默许情况下它有64个缓冲区。你也能够经过buffer办法来设置缓冲区的大小

[译]轻松学习Kotlin的Flow、ChannelFlow和CallbackFlow

需求留意的是: channelFlow.buffer(0) 不等同于flow,而是类似 flow.buffer(0),更详细的解说请参考这篇文章Kotlin’s Channel Flow With Rendezvous Is Not The Same As Kotlin Flow

ChannelFlow 能够进行异步处理(例如 Merge)

假如咱们有下面两个flow

val myAbcFlow = flow { ('A'..'E').forEach {
    delay(50)
    emit(it)
} }
val my123Flow = flow { (1..5).forEach {
    delay(50)
    emit(it)
} }

而且想兼并它们,代码如下:

fun <T> Flow<T>.flowMerge(other: Flow<T>): Flow<T> = flow {
    collect { emit(it) }
    other.collect { emit(it) }
}
my123Flow.flowMerge(myAbcFlow).collect {
    delay(50)
    print("$it ")
}

履行成果如下所示:

1 2 3 4 5 A B C D E

[译]轻松学习Kotlin的Flow、ChannelFlow和CallbackFlow

能够看出,兼并进程是依照顺序来的。collect{ emit(it) } 全部履行完,才会履行other.collect{ emit(it) }

咱们不能运用 launch 来进行异步处理,由于它不是 CoroutineScope 的一部分。代码如下:

fun <T> Flow<T>.flowMerge(other: Flow<T>): Flow<T> = flow {
    launch { // 这是过错的用法 
        collect { emit(it) }
    }
    other.collect { emit(it) }
}

下面的操作也是过错的

fun <T> Flow<T>.flowMerge(other: Flow<T>): Flow<T> = flow {
    CoroutineScope(Dispatchers.Main).launch {
        collect { emit(it) }
    }
    other.collect { emit(it) }
}

这种做法能够编译,可是在运行时会出错

FlowCollector is not thread-safe and concurrent emissions are prohibited.
To mitigate this restriction please use 'channelFlow' builder instead of 'flow'

这种情况下,你就能够运用channelFlow

fun <T> Flow<T>.channelMerge(other: Flow<T>): Flow<T> = channelFlow{
    launch { // THIS IS OKAY
        collect { send(it) }
    }
    other.collect { send(it) }
}

代码履行成果如下

A 1 B 2 C 3 D 4 E 5

[译]轻松学习Kotlin的Flow、ChannelFlow和CallbackFlow

launch { collect { send(it) }}办法不再阻塞,因此other.collect{ send(it) } 能够并行履行。ChannelFlow 加快了履行流程,并让兼并作业交替履行。

ChannelFlow能坚持Open

当咱们运用flow履行下面的代码

flow {
    for (i in 1..5) emit(i) 
}.collect { println(it) }

当履行完结,flow就封闭了。类似的,咱们也能够运用ChannelFlow

channelFlow {
    for (i in 1..5) send(i) 
}.collect { println(it) }

当履行完结时,channelFlow也就封闭了。

假如数据来自另一个来历,而咱们想坚持flow open呢?也就是,数据能够在上面的Channel Flow lambda函数之外持续发送。

[译]轻松学习Kotlin的Flow、ChannelFlow和CallbackFlow

在正常情况下,这是不可能的。flow在履行完后将被封闭。可是,对于ChannelFlow,咱们能够运用的awaitClose()办法,代码如下:

channelFlow {
    for (i in 1..5) send(i)
    awaitClose()
}.collect { println(it) }

这个办法将防止函数在Lamda之后被封闭。为了完成在channelFlow lambda之后发送数据的能力,咱们能够经过传递给一个变量函数将发送函数分配到外面,代码如下所示:

fun main(): Unit = runBlocking {
    var sendData: suspend (data: Int) -> Unit = { }
    var closeChannel: () -> Unit = { }
    launch {
        channelFlow {
            for (i in 1..5) send(i)
            sendData = { data -> send(data) }
            closeChannel = { close() }
            awaitClose { 
                sendData = {}
                closeChannel = {}
            }
        }.collect { println(it) }
    }
    delay(10)
    println("Sending 6")
    sendData(6)
    closeChannel()
}

咱们现在能够在ChannelFlow lambda之后运用sendData发送数据。咱们还能够稍后封闭通道,运用closeChannel办法,它将在lambda中调用close()。

ChannelFlow 运用非挂起(suspend)函数

当咱们运用flow时,咱们有必要运用emit,它是一个suspend的办法

public suspend fun emit(value: T)

在 ChannelFlow中,咱们运用send办法替代emit办法,send也是一个suspend办法

public suspend fun send(element: E)

假如约束了咱们运用非suspend办法,在ChannelFlow中,咱们能够运用trySend

public fun trySend(element: E): ChannelResult<Unit>

trySend办法允许咱们在不需求suspend的情况下发送数据。

fun main(): Unit = runBlocking {
    var sendData: (data: Int) -> Unit = { } // Not suspending
    var closeChannel: () -> Unit = { }
    launch {
        channelFlow {
            for (i in 1..5) trySend(i)
            sendData = { data -> trySend(data) }
            closeChannel = { close() }
            awaitClose {
                sendData = {}
                closeChannel = {}
            }
        }.collect { println(it) }
    }
    delay(10)
    println("Sending 6")
    sendData(6)
    closeChannel()
    sendData(7)
}

留意,trySend在流被封闭的情况下运用ChannelResult回来状况,而不是溃散。除了trySend办法外,还有trySendBlocking办法。假如缓冲区已满,trySend 将不发送数据,并履行下一次 trySend。假如缓冲区已满,trySendBlocking 将不会发送数据,而是等待缓冲区可用并再次 trySendBlocking。

public fun <E> SendChannel<E>.trySendBlocking(element: E)
: ChannelResult<Unit> {
    trySend(element).onSuccess { return ChannelResult.success(Unit) }
    return runBlocking {
        val r = runCatching { send(element) }
        if (r.isSuccess) ChannelResult.success(Unit)
        else ChannelResult.closed(r.exceptionOrNull())
    }
}

CallbackFlow

当咱们在ChannelFlow里边运用awaitClose()时,咱们是期望设置其lambda的回调。为了保证咱们记住运用awaitClose(),咱们能够运用CallbackFlow替代channelFlow,CallbackFlow本质上是channelFlow,CallbackFlow强制要求运用awaitClose()。

假如咱们不在CallbackFlow运用awaitClose(),你会得到一个运行时过错,即

Exception in thread "main" java.lang.IllegalStateException: 'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block.
Otherwise, a callback/listener may leak in case of external cancellation.

更多见解,请查看下面的文章。Keep Your Kotlin Flow Alive and Listening With CallbackFlow

咱们还需求运用flow吗?

最开端,我认为flow只能完成channelFlow的部分功用,channelFlow是flow的一个超集。然后,我了解到flow默许情况下的行为与channelFlow略有不同,正如这篇文章所描绘的那样Kotlin’s Channel Flow With Rendezvous Is Not The Same As Kotlin Flow

除此以外,与channelFlow相比,flow的重量也更轻。如下代码,假如咱们仅仅比较emitsend

fun main() = runBlocking {
    val x = 10000000
    val time = measureNanoTime {
        channelFlow {
            for (i in 1..x) {
                send(i)
            }
        }.collect {}
    }
    println(time)
}
fun main() = runBlocking {
    val x = 10000000
    val time = measureNanoTime {
        flow {
            for (i in 1..x) {
                emit(i)
            }
        }.collect {}
    }
    println(time)
}

成果如下图

[译]轻松学习Kotlin的Flow、ChannelFlow和CallbackFlow

咱们能够显着看到,flow在默许情况下比channelFlow的重量更轻。所以,除非你需求 channelFlow 的花哨功用,否则请运用 Flow。

总结

1. 假如flow满意你的要求,请运用flow

2. 假如你需求在flow里边运用缓冲和异步,运用ChannelFlow

3. 假如你需求在flow外部发送数据,运用带有 awaitClose 的 CallbackFlow。