在Kotlin中,Flow是一种异步数据流处理的声明式编程东西。它是一种冷流(cold stream)的概念,类似于 RxJava 中的 Observable,但有一些重要的差异。

本文基于Kotlin 1.8.20,协程 1.7.1

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")

最初

简单的运用入门在前面的文章中咱们有所介绍,具体请看:Flow流常识整理

本次咱们首要介绍Flow常用的操作符运用和原理,下面先看下操作符表格,了解下每个操作符的作用。

asFlow() 将Rang或许List等转化为Flow
map() 映射一次Flow发射的数据,只能改换发射的值
transform() 类似于Map,可是它能够操控Flow的发射
take() 操控Flow发射的数量,比如原始emit 3个,take(1)能够限制只发射1个
combine() 组合两个Flow,组合出来的Flow数据取决于最大的那个
zip() 组合两个Flow,组合出来的Flow数据取决去最小的那个
collectLatest() 接纳最新的数据
buffer() 操控Flow缓冲区的巨细,可平衡发送者和接受者之间的速度差异
conflate() 参加缓冲区,并且只接纳最新的数据
flatMapConcat() 转化原始Flow发射出来的数据,并且转化出来的是别的一个Flow,它连接了原始和新的Flow
flatMapMerge() 它和flatMapConcat()作用是共同的,差异在于它并不能确保时序
flatMapLatest() 也是转化原始Flow,并且只接受原始Flow最新的数据
flowOn() 限制Flow发射的协程域

asFlow()

asFlow()能够将数组、列表和Lambda函数直接转化为一个Flow,比如咱们我发射列表中的所有数据,不必在flowOf{}循环去emit每一个数据,直接运用asFlow()方法即可。

fun main() = runBlocking {
    createAsFlow().collect {
        println("collect asFlow $it")
    }
    createListAsFlow().collect {
        println("collect listAsFlow $it")
    }
}
// 将一个Rang目标转化成Flow目标
fun createAsFlow() = (0..2).asFlow()
// 将一个List目标转化成Flow目标
fun createListAsFlow() = listOf(1, 2, 3).asFlow()
# 
collect asFlow 0
collect asFlow 1
collect asFlow 2
collect listAsFlow 1
collect listAsFlow 2
collect listAsFlow 3

map()

map的意思是映射,它能够将Flow的原始数据映射成咱们所需求的数据格式,它只能够改动发射出来的数据,比如咱们将收集到的整数转化成字符串,就能够运用map()

fun main() = runBlocking {
    (0..2).asFlow().map { "Map一下 $it" }.collect {
        println("map collect $it")
    }
}
#
map collect Map一下 0
map collect Map一下 1
map collect Map一下 2

Kotlin中Flow操作符解析大全

transform()

transform的意思是转化、改换,它能够将原始的Flow改换成别的一个Flow,它和map相同的是都能够改动Flow发射的数据,可是它的不同点在于它所改换的目标是Flow,而map映射的目标是流发射出来的数据,下面咱们看下transform的具体用法和作用

fun main() = runBlocking {
    (0..1).asFlow().transform {
        emit(it + 1)
    }.collect {
        println("transform collect $it")
    }
}
#
transform collect 1
transform collect 2

Kotlin中Flow操作符解析大全

示例代码中中标红的当地就能够看出,transform能够改换原始流,将原始流的数据转化成咱们需求的数据再次发射emit出去

take()

take便是拿走、带走的意思,在Flow中它的作用是操控了接纳的数量,比如说原始流要发射4个数据,可是咱们只想要流中的前两个数据,那么能够直接运用take(2)来操控此数量

fun main() = runBlocking {
    (0..3).asFlow().take(2)
        .collect {
            println("collect take flow: $it")
        }
}
#
collect take flow: 0
collect take flow: 1

combine()

combine是结合的意思,在Flow中它能够将两个流发射出来的数据结合在一起

fun main() = runBlocking {
    val stringFlow = listOf("a", "b").asFlow()
    (0..2).asFlow().combine(stringFlow) { origin1, origin2 ->
        "$origin1 - $origin2"
    }.collect {
        println("collect combine flow: $it")
    }
}
# 
collect combine flow: 0 - a
collect combine flow: 1 - b
collect combine flow: 2 - b

Kotlin中Flow操作符解析大全

留意看上面的代码,stringFlow只会发射两个数据,(0..2)会发射三个数据,经过combine()组合只会一共会发射三个数据,它是以最多数据的那个流为主,超出的数据都是和另一个流的最终一个数据来组合

zip()

zip也是组合的意思,它和combine的作用是共同的,都是将两个流数据组合起来,可是终究作用两者有所差异,zip终究的数据量是依据组合前两个流数量最少的那个为主

fun main() = runBlocking {
    createZip1Flow().zip(createZip2Flow()) { i, j ->
        "$i - $j"
    }.collect {
        println("collect zip $it")
    }
}
fun createZip1Flow() = (0..2).asFlow()
fun createZip2Flow() = listOf("a", "b").asFlow()
#
collect zip 0 - a
collect zip 1 - b

zip1Flow自身会发射三个数据,而zip2Flow只会发射两个数据,运用zip之后,咱们终究得到的只有两个数据,zip1Flow中第三个数据会被放弃。

collectLatest()

latest字面意思是最新的、最近的意思,collectLatest联想一下是不是收集最新最近的数据呢?咱们经过代码来验证下此想法

fun main() = runBlocking {
    createLatestFlow().collectLatest {
        println("start collectLatest: $it")
      	// 模仿耗时操作500毫米
        delay(500L)
        println("end collectLatest: $it")
    }
}
fun createLatestFlow() = flow {
    var i = 0
    while (i < 10) {
        emit(i++)
      	// 发射完数据之后延时100毫秒
        delay(100)
    }
}

先看下上面代码,createLatestFlow每次发射完数据之后都会延时100毫秒,并且只发射10次,10次之后中止发射;collectLatest有两处日志打印,start处是延时之前,end是延时之后。其实一开始触摸collectLatest的时分我的预期是每过500毫秒接纳一次最新的数据,然而事实却并非如此,collectLatest的实际情况是接纳到一个数据的时分,延时之前(start处)会得到正常的打印,而延时之后(end处)的打印却不会履行,因为在延时500毫秒之后,它现已接纳到了别的一个或多个新的数据,所以它会扔掉旧数据,也便是end打印不会再履行了,等到流发射完最终一个数据的时分,没有新数据发射了,end会履行最终一个接纳到的数据。

也便是说start处每次接纳到数据的时分都会得到输出,而end只会输出最终一个数据,下面履行看看输出是不是如此

start collectLatest: 0
start collectLatest: 1
start collectLatest: 2
start collectLatest: 3
start collectLatest: 4
start collectLatest: 5
start collectLatest: 6
start collectLatest: 7
start collectLatest: 8
start collectLatest: 9
end collectLatest: 9

经过输出就能够证明咱们上面的剖析的确没错。假如讲collectLatest模仿延时时刻改为50毫秒,小于发射延时呢?作用会是咋样,接纳处理的延时小于发射的延时,在接纳的函数中startend应该是都能够得到正常的输出。

fun main() = runBlocking {
    createLatestFlow().collectLatest {
        println("start collectLatest: $it")
        delay(50L)
        println("end collectLatest: $it")
    }
}
fun createLatestFlow() = flow {
    var i = 0
    while (i < 10) {
        emit(i++)
        delay(100)
    }
}
#
start collectLatest: 0
end collectLatest: 0
start collectLatest: 1
end collectLatest: 1
start collectLatest: 2
end collectLatest: 2
start collectLatest: 3
end collectLatest: 3
start collectLatest: 4
end collectLatest: 4
start collectLatest: 5
end collectLatest: 5
start collectLatest: 6
end collectLatest: 6
start collectLatest: 7
end collectLatest: 7
start collectLatest: 8
end collectLatest: 8
start collectLatest: 9
end collectLatest: 9

buffer()

一看到buffer函数,基本就能够联想到缓冲了,也便是经典的背压问题,当发射处流速大于接纳处速度时就会产生背压问题,咱们能够经过上面collectLatest()函数来只接纳最新数据,当然假如咱们想接纳每一个数据时就能够经过buffer来处理了。

下面咱们模仿下接纳处比发射处要慢的场景,看看流的运行是什么样的

fun main() = runBlocking {
    val time = System.currentTimeMillis()
    flow {
        for (i in 1..10) {
            // 模仿每次延时 1 秒发射数据
            delay(1000L)
            emit(i)
        }
    }.onEach {
        println("emit $it, time: ${System.currentTimeMillis() - time}")
    }.collect {
        // 模仿每次接纳的时分都延时 2 秒
        delay(2000L)
        println("buffer flow collect $it, time: ${System.currentTimeMillis() - time}")
    }
}

在上面示例中,咱们经过 delay将接纳的延时设置大于发射延时,看看输出的情况

emit 1, time: 1028
buffer flow collect 1, time: 3033
emit 2, time: 4035
buffer flow collect 2, time: 6040
emit 3, time: 7045
buffer flow collect 3, time: 9045
emit 4, time: 10050
buffer flow collect 4, time: 12055
emit 5, time: 13056
buffer flow collect 5, time: 15057
emit 6, time: 16059
buffer flow collect 6, time: 18061
emit 7, time: 19062
buffer flow collect 7, time: 21066
emit 8, time: 22072
buffer flow collect 8, time: 24074
emit 9, time: 25077
buffer flow collect 9, time: 27079
emit 10, time: 28080
buffer flow collect 10, time: 30081

从输出日志中能够看出,发射+接纳此 10 个数据一共花费了 30s,流的每次发射需求等候接纳处理完成才会发射下一个数据,这就导致了一共花费的时刻额定的长,试想一下假如在事务处理中那么会白白的浪费咱们等候的时刻,此刻咱们就能够经过 buffer()来处理这种问题,接下来咱们再看看加上 buffer()的作用是咋样的

fun main() = runBlocking {
    val time = System.currentTimeMillis()
    flow {
        for (i in 1..10) {
            // 模仿每次延时 1 秒发射数据
            delay(1000L)
            emit(i)
        }
    }.onEach {
        println("emit $it, time: ${System.currentTimeMillis() - time}")
    }.buffer().collect {
        // 模仿每次接纳的时分都延时 2 秒
        delay(2000L)
        println("buffer flow collect $it, time: ${System.currentTimeMillis() - time}")
    }
}

在原有的代码上仅仅加上了 buffer()函数,未改动其它当地,再看看日志输出

emit 1, time: 1051
emit 2, time: 2057
buffer flow collect 1, time: 3057
emit 3, time: 3058
emit 4, time: 4059
buffer flow collect 2, time: 5059
emit 5, time: 5060
emit 6, time: 6065
buffer flow collect 3, time: 7059
emit 7, time: 7066
emit 8, time: 8071
buffer flow collect 4, time: 9060
emit 9, time: 9073
emit 10, time: 10074
buffer flow collect 5, time: 11061
buffer flow collect 6, time: 13065
buffer flow collect 7, time: 15070
buffer flow collect 8, time: 17071
buffer flow collect 9, time: 19072
buffer flow collect 10, time: 21077

从此输出日志中能够看出,发射+接纳此 10 个数据一共花费了 21s,并且能够看出此刻发射数据不再受制于接纳处理完了再发射下一个数据,buffer() Flow提供了一个缓冲区,它将接纳到的数据缓冲起来,这样发射处就不用等候接纳处处理完数据再进行数据的发射,也就能够减少 Flow履行的总体时刻。

buffer()有一个可选的参数capacity,它表示了缓冲区的巨细,默认为Channel.BUFFERED也便是 64

  • UNLIMITED表示不限制缓冲区的巨细,可为无限大,可是此形式占用的内存大,慎用;
  • RENDEZVOUS表示缓冲区巨细为 0,发射处在遇到接纳处处理数据的时分,有必要等候接纳处处理完了才会发射新的数据;
  • BUFFERD是默认的形式,表示缓冲区巨细为固定的 64;
  • CONFLATED这个形式表示每次接纳的时分只会取最新的那个数据,之前未来得及处理的数据都直接扔掉不再处理。

conflate()

conflate字面意思是兼并,可是看源码其实便是调用buffer(CONFLATED)函数,用于增加缓冲区并且只处理最新的数据,此处就不再过多解析。

flatMapConcat()

flatMapConcat()包括下面介绍的 flatMapMerge()flatMapLatest()函数都是转化原始流数据的作用,并且在转化之后做了额定的操作,

咱们先看下flatMapConcat()接纳的参数:transform: suspend (value: T) -> Flow<R>,参数是一个 Lambda,并且返回的是一个 Flow目标,也便是说它将原始流的数据接纳到之后再转化成了一个新的流,终究将新的流数据发射出去,下面咱们来实践看看作用

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    createFlatMapFlow().flatMapConcat {
      	// 这儿需求返回一个 Flow 目标,咱们创建了一个新的 Flow,并且原封不动的将原始流数据再发射出去
        flow {
            emit(it)
        }
    }.collect {
        println("collect flatMapConcat $it")
    }
}
fun createFlatMapFlow() = flow {
    (3 downTo 1).forEach {
        emit(it)
    }
}
#
collect flatMapConcat 3
collect flatMapConcat 2
collect flatMapConcat 1

看到上面代码是不是有种疑问,这函数有啥用啊,原始的流不要,新建一个流替换它发射数据?这儿大家细心的看一下,flatMapConcat是对原始流的每一个数据都回新建一个流,然后发射,试想一种场景,先查询一个年级的所有班级信息,然后再查询每个班级的学生信息,这种场景是不是特别合适flatMapConcat操作符,它先接纳每个班级的信息,然后对班级中每个学生信息再进行查询

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    fetchClassInfo().flatMapConcat {
        fetchStudent(it)
    }.collect {}
}
fun fetchClassInfo() = flow {
    emit(""/*班级信息*/)
}
fun fetchStudent(classInfo: String) = flow {
    emit(""/*学生信息*/)
}

flatMapMerge()

假如把上面flatMapConcat()换成flatMapMerge(),咱们会发现输出的日志居然如出一辙,的确如此两者在这方面的作用是共同的,文章最初的表格中咱们也提到了,flatMapMerge()的差异在于时序上面不能确保原始流的发射次序,下面咱们经过模仿耗时操作来看下两者在时序上面的差异

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    createFlatMergeFlow().flatMapConcat {
        flow {
            // 模仿耗时操作,依据原始流数据*100 毫秒,原始流发射数据为3,2,1
            kotlinx.coroutines.delay(it * 100L)
            emit(it)
        }
    }.collect {
        println("collect flatMapConcat $it")
    }
    createFlatMergeFlow().flatMapMerge {
        flow {
            // 模仿耗时操作,依据原始流数据*100 毫秒,原始流发射数据为3,2,1
            kotlinx.coroutines.delay(it * 100L)
            emit(it)
        }
    }.collect {
        println("collect createFlatMergeFlow $it")
    }
}
fun createFlatMergeFlow() = flow {
    (3 downTo 1).forEach {
        emit(it)
    }
}

上述代码咱们原始流的发射数据应该是 3,2,1,然后经过flatMapConcatflatMapMerge内部相同的逻辑来转化流的发射,以此延时 300ms、200ms、100ms,假如两者都能确保原始流的时序,那么是不是转化之后仍是应该都输出3,2,1呢?运行看下具体的输出

collect flatMapConcat 3
collect flatMapConcat 2
collect flatMapConcat 1
collect createFlatMergeFlow 1
collect createFlatMergeFlow 2
collect createFlatMergeFlow 3

经过日志就能够清楚的知道,在延时的作用下flatMapConcat依旧能够确保原始流的发射次序,然而flatMapMerge却改动了原始流的发射次序,延时少的先得到输出,延时越长发射越慢,这便是两者在时序上的差异。

flatMapLatest()

flatMapLatest其实便是在转化原始流的操作了结合了collectLatest的特征,它只接纳原始流最新的数据,并且将最新数据转化成别的一个流。

这儿就简单看个示例

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    createFlatMapLatestFlow().flatMapLatest {
        flow {
            println("start flatMapLatest: $it")
            delay(300L)
            println("end flatMapLatest: $it")
            emit(it)
        }
    }.collect {
        println("collect flatMapLatest: $it")
    }
}
fun createFlatMapLatestFlow(): Flow<Int> {
    return flow {
        (0..10).forEach {
            delay(100L)
            emit(it)
        }
    }
}
#
start flatMapLatest: 0
start flatMapLatest: 1
start flatMapLatest: 2
start flatMapLatest: 3
start flatMapLatest: 4
start flatMapLatest: 5
start flatMapLatest: 6
start flatMapLatest: 7
start flatMapLatest: 8
start flatMapLatest: 9
start flatMapLatest: 10
end flatMapLatest: 10
collect flatMapLatest: 10

原始流每次延时 100ms 发射数据,然后在flatMapLatest()中模仿了 300ms 的延时操作,300ms 之前的日志每次都是能够得到输出,300ms 之后只有最新的数据 10 才能够得到输出,并且在最新的收集中也只有最新的数据 10,看到这儿是不是便是类似flatMapConcat结合了collectLatest两者的特性,仍是比较简单理解的。

flowOn()

flowOn()函数首要便是用来指定流发射时的协程环境,假如咱们想指定发射的操作在子线程中,无需经过thread或许 CoroutinesScope来切换线程,只需求调用 flowOn(Dispatchers.Default/IO)来切换即可,下面经过示例来看看实际作用

fun main() = runBlocking {
    createFlowOn().collect {
        println("flowOn collect $it, thread ${Thread.currentThread().name}")
    }
}
fun createFlowOn() = flow {
    for (i in 0..2) {
        emit(i)
        println("emit thread ${Thread.currentThread().name}")
    }
}
#
flowOn collect 0, thread main
emit thread main
flowOn collect 1, thread main
emit thread main
flowOn collect 2, thread main
emit thread main

此刻咱们未调用 flowOn()函数来切换线程,经过日志能够直观的看出,Flow的发射和接纳是在同一个线程中,也便是main线程中,然后咱们再给它指定个 IO协程环境看看

fun main() = runBlocking {
    createFlowOn().collect {
        println("flowOn collect $it, thread ${Thread.currentThread().name}")
    }
}
fun createFlowOn() = flow {
    for (i in 0..2) {
        emit(i)
        println("emit thread ${Thread.currentThread().name}")
    }
}.flowOn(Dispatchers.IO)
#
emit thread DefaultDispatcher-worker-1
emit thread DefaultDispatcher-worker-1
emit thread DefaultDispatcher-worker-1
flowOn collect 0, thread main
flowOn collect 1, thread main
flowOn collect 2, thread main

加上 flowOn(Dispatcher.IO)之后,Flow的发射处就变成了子线程,接纳依旧在主线程中,这样它的作用就很明显了。

到此为止, Flow的大部分操作符就介绍结束了,假如文章中有什么你觉得不对的当地帮助指出,谢谢!