作者:Elye
原文链接:medium.com/mobile-app-…
上一年年初的时候,我写了篇文章Kotlin Flow a much better version of Sequence?, 介绍了Flow,并与Sequence进行了比较。
现在除了Flow之外,咱们还有ChannelFlow和CallbackFlow。它们与Flow有什么不同呢?为了了解它,让咱们首先开端研讨根本的Flow原理。
Flow
首先先看下面的代码,发送从1到5的数值。
fun main() = runBlocking {
flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}
.collect { value ->
delay(100)
println("Consuming $value")
}
}
代码履行成果如下:
Emitting 1
Consuming 1
Emitting 2
Consuming 2
Emitting 3
Consuming 3
Emitting 4
Consuming 4
Emitting 5
Consuming 5
咱们能够看到,每个emit都有必要鄙人一个emit调用之前被耗费掉。这是由于没有缓冲区(buffer)来存储额外的emit,因此每个emit都有必要排队,如下图所示:
没有任何通道的情况下,emit(发送)和consume(耗费)是同步进行的。这在咱们想保证在前一个事件被耗费之前不emit的情况下是好的。可是,它会减慢emit(发送)进程。
添加缓冲区(Buffer)
为了使emit(发送)速度更快,咱们能够给它添加一个缓冲区(运用buffer办法)。咱们先试试0缓冲,代码如下:
fun main() = runBlocking {
flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}.buffer(0)
.collect { value ->
delay(100)
println("Consuming $value")
}
}
履行成果如下:
Emitting 1
Emitting 2
Consuming 1
Emitting 3
Consuming 2
Emitting 4
Consuming 3
Emitting 5
Consuming 4
Consuming 5
虽然是0缓冲区,可是添加的缓冲区能够给缓冲区添加一个emit事件,如下图所示:
假如有 1 个缓冲区,成果会怎样?履行成果成果如下
Emitting 1
Emitting 2
Emitting 3
Consuming 1
Emitting 4
Consuming 2
Emitting 5
Consuming 3
Consuming 4
Consuming 5
这将允许在consume(消费)之前的第一个事件前完结3个emit(发送)。
运用.buffer(2),其履行成果如下
Emitting 1
Emitting 2
Emitting 3
Emitting 4
Consuming 1
Emitting 5
Consuming 2
Consuming 3
Consuming 4
Consuming 5
运用.buffer(3),咱们能够直接完结5个事件的emit。其成果如下
Emitting 1
Emitting 2
Emitting 3
Emitting 4
Emitting 5
Consuming 1
Consuming 2
Consuming 3
Consuming 4
Consuming 5
除了Buffer之外,还有Conflate和CollectLatest,这篇文章Kotlin Flow Buffer is like A Fashion Adoption 对此进行了说明
ChannelFlow
上面咱们现已了解了Flow的缓冲区,现在让咱们看看Flow和ChannelFlow之间有什么区别吧。
ChannelFlow 是自带Buffer的Flow
履行如下代码:
fun main() = runBlocking {
channelFlow {
for (i in 1..5) {
println("Emitting $i")
send(i)
}
}
.collect { value ->
delay(100)
println("Consuming $value")
}
}
成果如下:
Emitting 1
Emitting 2
Emitting 3
Emitting 4
Emitting 5
Consuming 1
Consuming 2
Consuming 3
Consuming 4
Consuming 5
这看起来像运用了.buffer(3)的Flow。可是ChannelFlow有更多缓存区,由于默许情况下它有64个缓冲区。你也能够经过buffer
办法来设置缓冲区的大小
需求留意的是: channelFlow.buffer(0) 不等同于flow,而是类似 flow.buffer(0),更详细的解说请参考这篇文章Kotlin’s Channel Flow With Rendezvous Is Not The Same As Kotlin Flow
ChannelFlow 能够进行异步处理(例如 Merge)
假如咱们有下面两个flow
val myAbcFlow = flow { ('A'..'E').forEach {
delay(50)
emit(it)
} }
val my123Flow = flow { (1..5).forEach {
delay(50)
emit(it)
} }
而且想兼并它们,代码如下:
fun <T> Flow<T>.flowMerge(other: Flow<T>): Flow<T> = flow {
collect { emit(it) }
other.collect { emit(it) }
}
my123Flow.flowMerge(myAbcFlow).collect {
delay(50)
print("$it ")
}
履行成果如下所示:
1 2 3 4 5 A B C D E
能够看出,兼并进程是依照顺序来的。collect{ emit(it) }
全部履行完,才会履行other.collect{ emit(it) }
。
咱们不能运用 launch 来进行异步处理,由于它不是 CoroutineScope 的一部分。代码如下:
fun <T> Flow<T>.flowMerge(other: Flow<T>): Flow<T> = flow {
launch { // 这是过错的用法
collect { emit(it) }
}
other.collect { emit(it) }
}
下面的操作也是过错的
fun <T> Flow<T>.flowMerge(other: Flow<T>): Flow<T> = flow {
CoroutineScope(Dispatchers.Main).launch {
collect { emit(it) }
}
other.collect { emit(it) }
}
这种做法能够编译,可是在运行时会出错
FlowCollector is not thread-safe and concurrent emissions are prohibited.
To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
这种情况下,你就能够运用channelFlow
fun <T> Flow<T>.channelMerge(other: Flow<T>): Flow<T> = channelFlow{
launch { // THIS IS OKAY
collect { send(it) }
}
other.collect { send(it) }
}
代码履行成果如下
A 1 B 2 C 3 D 4 E 5
launch { collect { send(it) }}
办法不再阻塞,因此other.collect{ send(it) }
能够并行履行。ChannelFlow 加快了履行流程,并让兼并作业交替履行。
ChannelFlow能坚持Open
当咱们运用flow履行下面的代码
flow {
for (i in 1..5) emit(i)
}.collect { println(it) }
当履行完结,flow就封闭了。类似的,咱们也能够运用ChannelFlow
channelFlow {
for (i in 1..5) send(i)
}.collect { println(it) }
当履行完结时,channelFlow也就封闭了。
假如数据来自另一个来历,而咱们想坚持flow open呢?也就是,数据能够在上面的Channel Flow lambda函数之外持续发送。
在正常情况下,这是不可能的。flow在履行完后将被封闭。可是,对于ChannelFlow,咱们能够运用的awaitClose()
办法,代码如下:
channelFlow {
for (i in 1..5) send(i)
awaitClose()
}.collect { println(it) }
这个办法将防止函数在Lamda之后被封闭。为了完成在channelFlow lambda之后发送数据的能力,咱们能够经过传递给一个变量函数将发送函数分配到外面,代码如下所示:
fun main(): Unit = runBlocking {
var sendData: suspend (data: Int) -> Unit = { }
var closeChannel: () -> Unit = { }
launch {
channelFlow {
for (i in 1..5) send(i)
sendData = { data -> send(data) }
closeChannel = { close() }
awaitClose {
sendData = {}
closeChannel = {}
}
}.collect { println(it) }
}
delay(10)
println("Sending 6")
sendData(6)
closeChannel()
}
咱们现在能够在ChannelFlow
lambda之后运用sendData
发送数据。咱们还能够稍后封闭通道,运用closeChannel
办法,它将在lambda中调用close()。
ChannelFlow 运用非挂起(suspend)函数
当咱们运用flow时,咱们有必要运用emit,它是一个suspend的办法
public suspend fun emit(value: T)
在 ChannelFlow中,咱们运用send
办法替代emit
办法,send
也是一个suspend办法
public suspend fun send(element: E)
假如约束了咱们运用非suspend办法,在ChannelFlow中,咱们能够运用trySend
public fun trySend(element: E): ChannelResult<Unit>
trySend
办法允许咱们在不需求suspend的情况下发送数据。
fun main(): Unit = runBlocking {
var sendData: (data: Int) -> Unit = { } // Not suspending
var closeChannel: () -> Unit = { }
launch {
channelFlow {
for (i in 1..5) trySend(i)
sendData = { data -> trySend(data) }
closeChannel = { close() }
awaitClose {
sendData = {}
closeChannel = {}
}
}.collect { println(it) }
}
delay(10)
println("Sending 6")
sendData(6)
closeChannel()
sendData(7)
}
留意,trySend
在流被封闭的情况下运用ChannelResult回来状况,而不是溃散。除了trySend
办法外,还有trySendBlocking
办法。假如缓冲区已满,trySend 将不发送数据,并履行下一次 trySend。假如缓冲区已满,trySendBlocking 将不会发送数据,而是等待缓冲区可用并再次 trySendBlocking。
public fun <E> SendChannel<E>.trySendBlocking(element: E)
: ChannelResult<Unit> {
trySend(element).onSuccess { return ChannelResult.success(Unit) }
return runBlocking {
val r = runCatching { send(element) }
if (r.isSuccess) ChannelResult.success(Unit)
else ChannelResult.closed(r.exceptionOrNull())
}
}
CallbackFlow
当咱们在ChannelFlow里边运用awaitClose()时,咱们是期望设置其lambda的回调。为了保证咱们记住运用awaitClose(),咱们能够运用CallbackFlow替代channelFlow,CallbackFlow本质上是channelFlow,CallbackFlow强制要求运用awaitClose()。
假如咱们不在CallbackFlow运用awaitClose(),你会得到一个运行时过错,即
Exception in thread "main" java.lang.IllegalStateException: 'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block.
Otherwise, a callback/listener may leak in case of external cancellation.
更多见解,请查看下面的文章。Keep Your Kotlin Flow Alive and Listening With CallbackFlow
咱们还需求运用flow吗?
最开端,我认为flow只能完成channelFlow的部分功用,channelFlow是flow的一个超集。然后,我了解到flow默许情况下的行为与channelFlow略有不同,正如这篇文章所描绘的那样Kotlin’s Channel Flow With Rendezvous Is Not The Same As Kotlin Flow
除此以外,与channelFlow相比,flow的重量也更轻。如下代码,假如咱们仅仅比较emit
和send
fun main() = runBlocking {
val x = 10000000
val time = measureNanoTime {
channelFlow {
for (i in 1..x) {
send(i)
}
}.collect {}
}
println(time)
}
fun main() = runBlocking {
val x = 10000000
val time = measureNanoTime {
flow {
for (i in 1..x) {
emit(i)
}
}.collect {}
}
println(time)
}
成果如下图
咱们能够显着看到,flow在默许情况下比channelFlow的重量更轻。所以,除非你需求 channelFlow 的花哨功用,否则请运用 Flow。
总结
1. 假如flow满意你的要求,请运用flow
2. 假如你需求在flow里边运用缓冲和异步,运用ChannelFlow
3. 假如你需求在flow外部发送数据,运用带有 awaitClose 的 CallbackFlow。