前语

由于Flow相对来说比较复杂,所以笔者独自整理了一篇;Kotlin Flow流是协程的一个特性,它用作呼应式编程结构。Flow流旨在处理异步数据流。它类似于 Kotlin 中的Sequences,也具有呼应式编程的好处。本文笔者经过思考关于Flow的一些问题,来验证是否初步了解掌握Kotlin Flow

Kotlin面试攻略三部曲系列:

  • 编写Kotlin面试攻略三部曲-根底篇
  • 编写Kotlin面试攻略三部曲-协程篇

什么是 Kotlin Flow流?

在Kotlin官网中界说中是这么说的:

在协程中,与仅回来单个值的挂起函数相反,Flow可按次序宣布多个值, 它以协程为根底构建,可供给多个值,从概念上来讲,Flow是可经过异步办法进行核算处理的一组数据序列,所宣布值的类型必须相同。

总结一下,咱们能够将Flow界说为具有多个异步核算值的协程。它是能够异步核算的数据流,用于按次序发送多个值。 宣布的数据必须是同一类型,而且也是按次序宣布的,Flow运用挂起的函数以异步办法消费和出产。

为什么需求Kotlin Flow?

为了使应用程序呼应并进步其性能,异步编程起着非常重要的作用,日常开发中,在协程没有呈现的年月里,能够运用RxJava呼应式编程结构,它是ReactiveX基于Java的扩展,直到Kotlin的呈现和发展。

Kotlin中,相同的功能以Kotlin Flow API 的形式供给。它不仅具有所需的一切运算符和功能,而且还支持挂起函数,这有助于以次序办法履行异步使命,也让代码变得更加简洁和高效。

假如咱们有 RxJava/其他呼应式完结,为什么要创立 Flow?

Flow中,假如咱们想转化数据,用一个map操作符即可,这与RxJava有些不同,RxJava 具有同步的map和异步的flatMapSingle。流程图运算符有一个lambda,它是一个暂停,因而它适用于同步和异步。过滤数据也是如此,Flow只需一个Filter运算符适用于两者。

所以Flow有用减少了在其它呼应式完结中引入的操作符,更加方便简洁,这也是为什么要运用Flow的重要原因。

Kotlin Flow 是怎样作业的?

往常咱们开发都会运用到网络恳求吧,当从服务器恳求数据并运用异步编程来处理该数据时,Flow会在后台线程中异步管理该数据,由于某些进程可能会运转更长时刻来获取数据。一旦搜集器接收并搜集了数据,就会运用回收器视图显现数据。

谱写Kotlin面试指南三部曲-Flow篇

咱们现已知道,Flow是一系列值,它运用挂起函数异步生成和运用值。

Flow流由三个实体组成:

  • 出产者: 用于宣布添加到流中的数据。
  • 中介:它能够修正发送到流中的值
  • 消费者 : 从流中接收值

谱写Kotlin面试指南三部曲-Flow篇

Flow builders 的几种类型?

咱们平时创立一个Flow流一般供给了4种办法

  • flowOf:它用于一组数据的创立Flow流

    flowOf(4, 2, 3, 45, 5)
           .collect {
             print(it)
           }
    ​
    
  • asFlow:这是一个扩展函数,有助于将当时类型转化为Flow流

    (1..5).asFlow()
           .collect {
             print(it)
           }
    
  • flow:这便是官方示例程序标准的创立Flow流的办法

    flow {
           (0..10).forEach {
             emit(it)
           }
         }.collect {
           print(it)
         }
    
  • channelFlow:此构建器运用构建器自身供给的发送元从来创立Flow流。

    @OptIn(ExperimentalCoroutinesApi::class)
         channelFlow {
           (0..10).forEach {
             send(it)
           }
         }.collect {
           print(it)
         }
    

    需求留意的是channelFlow这个Api仍是实验性的,所以正式项目环境中很少用到

Kotlin Flow怎样完结并发?

咱们都知道,Flow是异步的,可是搜集器collect和发射器emit是按次序的办法来作业,怎样说呢?便是当搜集器开端搜集数据,然后发射器发射出数据,直到没有发射器停止。经常有人说,它便是用同步的代码做异步的事情,这个总结挺到位的。当然,为了完结Flow并发,咱们必须从运用单个协程转变为运用多个协程。

Flow对此供给了一些特别的并发运算符,比如说buffer() ,简单运用一个比如

fun main() = runBlocking {
   val time = measureTimeMillis {
     flow {
       for (i in 1..5) {
         delay(100)
         emit(i)
       }
     }
     .buffer()
     .collect { value ->
       delay(300)
       println(value)
     }
   }
   println("Collected in $time ms")
}

在上述比如中,一切的 delay 所花费的时刻是2000ms,当然这是在没有并发操作的状况下,但是经过buffer操作符并发地履行emit,再次序地履行collect函数后,所花费的时刻在1700ms左右,比预期的花费时刻更高效。

那咱们运用缓存区buffer的时分会产生什么呢?如下图所示

谱写Kotlin面试指南三部曲-Flow篇

当搜集器处理宣布的数据时,下一个发射器开端其过程。它不会等待搜集器完结它对从前宣布的数据的处理并回来控制权。这是经过在独自的协程中运转搜集器和发射器来完结的,这与在单个协程中运转的异步次序办法不同。在内部它运用通道Channel将数据发送给接收者。

Kotlin Flow 怎样进行并行调用?

那有小伙伴说了,假如我有很多个长使命想要一起履行呢,也便是并行,Flow能不能做到呢?答案肯定是YES的,为了并行运转长使命,咱们需求运用Flow供给的组合操作符zip,它经过指定的函数将两个流调集的排放组合在一起,并依据此函数的成果为每个组合排放单个项目。

谱写Kotlin面试指南三部曲-Flow篇

让咱们经过示例代码来了解弹珠图:

val flowOne = flowOf(1, 2, 3)
val flowTwo = flowOf("A", "B", "C")
​
flowOne.zip(flowTwo) { intValue, stringValue ->
   "$intValue$stringValue"
}.collect {
   println(it)
}

输出成果如下:

1A
2B
3C

能够看到经过zip紧缩将两个流履行完之后再一起输出成果,不行明晰?那咱们再举一个Android中的真有用例:当咱们想要并行运转两个网络恳求使命并希望在两个使命完结时将两个使命的成果放在一个回调中。

假定咱们有两个长时刻答应的网络恳求使命

  • 长时刻运转的使命恳求一:

    private fun doLongRunningTaskOne(): Flow<String> {
       return flow {
         //开端恳求接口。。。。
         delay(5000)
         emit("One")
       }
    }
    ​
    
  • 长时刻运转的使命恳求二:

    private fun doLongRunningTaskTwo(): Flow<String> {
        return flow {
            //恳求另一个接口。。。
            delay(5000)
            emit("Two")
        }
    }
    
  • 现在,运用zip操作符处理

    fun startLongRunningTask() {
        viewModelScope.launch {
            doLongRunningTaskOne()
                .zip(doLongRunningTaskTwo()) { resultOne, resultTwo ->
                    return@zip resultOne + resultTwo
                }
                .flowOn(Dispatchers.Default)
                .catch { e ->
                    //反常处理
                }
                .collect {
                    //获取成果
                    println(it)
                }
        }
    }
    
  • 成果输出如下:

    OneTwo
    

    在这里,由于咱们运用了zip运算符,它并行运转两个使命,并在两个使命完结时在一个回调中给出两个使命的成果。

    经过运用zip运算符紧缩两个流调集,两个使命并行运转。当两者都完结时,咱们就会得到成果。这样咱们就一次得到了两个流调集的成果。

    Kotlin Flowzip运算符的优点:

  • 并行运转使命。

  • 当一切使命完结时,在单个回调中回来使命成果。

这样咱们就能够在 Kotlin 中运用FlowZip操作符来并行运转使命。

Kotlin Flow中的反常处理?

咱们都知道了Flow是在协程的根底进行构建的,它能够在有或没有反常的状况下完结,假如有反常的话它会直接完结Complete;在Flow中,咱们一般都运用其供给的catch操作符来进行反常的处理。

仍是以往的套路,先来举个比如,看下面的代码,告诉我,它会产生反常导致溃散么? 答案是肯定的

(1..5).asFlow().map {
            //当值为3得时分抛出反常
            check(it != 3) {
                "Value $it"
            }
            it * it
        }.onCompletion {
            Log.d(TAG, "onCompletion")
        }.collect {
            Log.d(TAG, "$it")
        }

输出成果如下:

1
4
onCompletion
Exception in thread "main" java.lang.IllegalStateException: Value 3

是的,它溃散了,间断了发射数据,Flow进入了onCompletion状况

此时咱们再加上catch操作符,它将把反常的句柄信息交给咱们,并不会产生溃散

(1..5).asFlow().map {
            check(it != 3) {
                "Value $it"
            }
            it * it
        }.onCompletion {
            Log.d(TAG, "onCompletion")
        }.catch { e ->
            Log.e(TAG, "Caught is $e")
        }.collect {
            Log.d(TAG, "$it")
        }

成果将是:

1
4
onCompletion
Caught java.lang.IllegalStateException: Value 3

它记载了反常信息,能够看到,无论加不加catch操作符,Flow发射数据都会间断,而且直接进入OnComplete状况,这和协程的反常处理基本是一致的,原来如此,差点忘了,Flow是以协程的根底构建,归于包含联系。

如此,咱们知道了Flow是怎样处理反常的,咱们能够将catch应用于单个流并在呈现任何过错时宣布数据

假如我有一个长时刻使命有反常的使命,Flow怎样进行重试?

Rxjava一样,在进行使命重试的时分,Kotlin Flow供给了相应的操作符,它们别离是

  • retrywhen
  • retry

在大多数状况下,这两个操作符能够互换运用,它们自身并没有什么太大的差异,下面咱们别离来看看吧

retrywhen

先用力的撇一眼Kotlin Flow源码中retryWhen的界说

fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T>

咱们先开端运用这个操作符

.retryWhen { cause, attempt ->
}

能够看到它回来了两个参数,它们别离是

  • causeThrowable一切过错和反常的基类。
  • attemptattempt是代表当时测验的次数,它从零开端

这样以来,当咱们在使命中呈现了反常,咱们将收到当时反常和测验次数,这个时分就需求咱们依据需求条件进行判别是否进行重试了,只需满意了必定的条件才能够重试,如此咱们能够这么做

.retryWhen { cause, attempt ->
    if (cause is IOException && attempt < 3) {
        delay(2000)
        return@retryWhen true
    } else {
        return@retryWhen false
    }
}
retry

仍是老规矩,咱们再用力瞥一眼retry的源码界说

fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
    require(retries > 0) { "Expected positive amount of retries, but had $retries" }
    return retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
}

只用关心一点,它实践上是在retryWhen的根底上调用,这也是为什么这两个操作符能够相互替换

retry函数是默许参数的,假如不设置次数,它将运用Long.MAX_VALUE,这样以来它将会不断重试,直到使命完结停止。所以咱们能够这么写:

.retry(retries = 3) { cause ->
    if (cause is IOException) {
        delay(2000)
        return@retry true
    } else {
        return@retry false
    }
}

说说Cold Flow冷流 和Hot Flow暖流?

说起Flow能够分为冷流和暖流,结合官网和网络上的一些解释,其实别离能够归纳成一句话

  • 冷流便是不消费,不出产,屡次消费,屡次出产,1对1联系

  • 暖流便是只需创立,有无消费,都出产,一对多联系

    你说什么,不是特别明晰,那笔者用一个表来展现冷流和暖流的差异吧

冷流 暖流
它仅在有搜集器的时分才有数据 即使没有搜集器,它也会宣布数据
它不存储数据 它能够存储数据
它不能有多个搜集器 它答应有多个搜集器

Cold Flow中,在多个搜集器的状况下,完好的流程将从每个搜集器的开端而开端,履行使命并将值发送到它们相应的搜集器。这就像1对1的映射。1个搜集器的1个出产流量。这意味着冷流不能有多个搜集器,由于它将为每个搜集器创立一个新流。

Hot Flow中,在多个搜集器的状况下,流将持续宣布值,搜集器从它们开端搜集的地方获取值。这就像 1 对 N 映射,N 个搜集器对应1个出产流量,这意味着一个暖流能够有多个搜集器。

如此咱们写些伪代码来辅佐说明下吧:

  • 冷流示例

假定咱们有一个Cold Flow,它以 1 秒的距离宣布 1 到 5

fun getNumbersColdFlow():ColdFlow<Int> {
    return someColdFlow {
        (1..5).forEach {
            delay(1000)
            emit(it)
        }
    }
}

咱们创立两个搜集器进行搜集:

val numbersColdFlow = getNumbersColdFlow()
numbersColdFlow
    .collect {
        println("1st Collector: $it")
    }
delay(2500)
numbersColdFlow
    .collect {
        println("2nd Collector: $it")
    }

那么输出将是:

1st Collector: 1
1st Collector: 2
1st Collector: 3
1st Collector: 4
1st Collector: 5
2nd Collector: 1
2nd Collector: 2
2nd Collector: 3
2nd Collector: 4
2nd Collector: 5

咱们这两个搜集器从一开端就会得到一切的值,会自始至终悉数一点不落的拿到

  • 暖流示例

假定咱们有一个Hot Flow,它相同以1秒的距离宣布1到5

fun getNumbersHotFlow(): HotFlow<Int> {
    return someHotflow {
        (1..5).forEach {
            delay(1000)
            emit(it)
        }
    }
}

现在咱们持续创立两个搜集器进行搜集:

val numbersHotFlow = getNumbersHotFlow()
numbersHotFlow
    .collect {
        println("1st Collector: $it")
    }
delay(2500)
numbersHotFlow
    .collect {
        println("2nd Collector: $it")
    }

输出将是

1st Collector: 1
1st Collector: 2
1st Collector: 3
1st Collector: 4
1st Collector: 5
2nd Collector: 3
2nd Collector: 4
2nd Collector: 5

能够看到,第一个搜集器将会接收到一切的值,由于第二个搜集器将会2500毫秒之后开端搜集,所以它只会搜集那些2500毫秒之后的值。

StateFlow 和 SharedFlow 之间的差异?

咱们现已知道什么是冷流和暖流了,而StateFlowSharedFlow都归于暖流,下面笔者列出一张表来说明StateFlowSharedFlow之间的差异

StateFlow SharedFlow
暖流 暖流
需求一个初始值并在搜集器开端搜集时当即宣布它 不需求初始值,因而默许状况下不宣布任何值
写法:val stateFlow = MutableStateFlow(0) 写法:val sharedFlow = MutableSharedFlow()
仅宣布终究一个已知值 能够装备为运用重播运算符宣布许多以前的值
它有value特点,能够查看当时值,保存了一个值的历史记载,咱们能够直接获取而无需搜集 它没有 value 特点
它不会宣布接连的重复值。只需当它与前一项不一起,它才会宣布值 它宣布一切值而且不关心与前一项的差异,它还宣布接连的重复值
类似于 LiveData,除了 Android 组件的生命周期感知。咱们应该运用带有 StateFlow 的 repeatOnLifecycle 规模来为其添加生命周期感知,然后它就会变得与 LiveData 彻底一样 和LiveData不同

咱们别离举个相应的比如看看:

  • StateFlow示例

    假定咱们去记载小明的一次数学考试成果, 一开端的成果是个鸭蛋,代码如下:

    val gradeStateFlow = MutableStateFlow(0)
    

    这个时分教师再去搜集这个成果

    gradeStateFlow.collect {
        println(it)
    }
    

    一旦开端搜集了,那么咱们就会得到,原来小明考了个鸭蛋,回去应该得挨骂了

    0
    

    这是由于小明的初始分数便是0,它会当即宣布

    后来教师发现原来自己算错分了,小明的分数原来是10分,所以就持续设置这个值,但又不小心输了两遍进去

    gradeStateFlow.value = 10
    gradeStateFlow.value = 10
    

    此时终究只会得到一个值,那便是10分,由于它不会宣布接连的重复值

    10
    

    小明回到家了,妈妈问他成果怎样样,小明怕被打没有告诉她,所以妈妈就亲自打电话问了教师,这个时分,咱们就创立了一个新的搜集器,用来发送小明的终究成果

    gradeStateFlow.collect {
        println(it)
    }
    

    这个时分妈妈现已知道了小明终究只考了10分,免不了一顿叱骂

    10
    

    由于StateFlow存储终究一个值并在新的搜集器中开端搜集时会当即宣布它

  • SharedFlow示例

第二天,小明的妈妈让小明去买些生果回来,于是乎,小明来到了楼下的生果店,开端选些生果并放入自己的购物篮中

 ```kotlin
 val fruitSharedFlow = MutableSharedFlow<String>()
 fruitSharedFlow.collect {
     println(it)
 }
 ```

这个时分是不会得到任何东西的,小明还没开端购买生果呢,这个时分他拿了1斤苹果,1斤香蕉和1斤葡萄,后边发现葡萄不行又拿了1斤

```kotlin
fruitSharedFlow.emit("1斤苹果")
fruitSharedFlow.emit("1斤香蕉")
fruitSharedFlow.emit("1斤葡萄")
fruitSharedFlow.emit("1斤葡萄")
```

终究得到的成果便是

1斤苹果
1斤香蕉
1斤葡萄
1斤葡萄

能够看到咱们能够得到接连的重复值,现在小明买完回到家,将购物篮的生果都交给了妈妈,这时分小明再看购物篮的时分,里面现已空空如也了,他不会得到任何成果,由于SharedFlow不存储终究一个值

fruitSharedFlow.collect {
    println(it)
}

现在经过两个实践中的比如咱们就能够了解下面几点

  • StateFlowSharedFlow的一种,StateFlowSharedFlow的特化

  • StateFlow 实质是一个具有固定 replay = 1 的SharedFlow,并增加了一些内容。这意味着新搜集器在开端搜集后将当即取得当时状况初始值。怎样说呢,咱们仍是用一段伪代码看看吧

    StateFlow = SharedFlow
                .withInitialValue(initialValue)
                .replay(count=1)
                .distinctUntilChanged()
    

    实践开发中,咱们是能够运用SharedFlow获取StateFlow行为

    val sharedFlow = MutableSharedFlow<Int>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    sharedFlow.emit(0) // initial value
    val stateFlow = sharedFlow.distinctUntilChanged()
    

    事实上,这是让咱们能够自界说归于咱们的StateFlow,比如说需求存储终究的两个值,咱们就能够依据咱们的用例履行replay = 2

什么时分需求用StateFlow/SharedFlow?

运用StateFlow的状况

仍是运用实践的比如来说明,咱们有一个用例,便是经过网络从后台用户信息接口获取用户列表并将其显现再UI中

现在咱们的ViewModel中有一个StateFlow

val usersStateFlow = MutableStateFlow<UiState<List<User>>>(UiState.Loading)

Activity中创立搜集器

usersStateFlow.collect {
​
}

现在,只需咱们进入到Activity中,就会开端订阅搜集,将搜集以下内容:

usersStateFlow:作为StateFlow加载状况选用初始值并当即宣布

当咱们的viewModel从网络中获取数据时,它将数据设置到 usersStateFlow。

usersStateFlow.value = UiState.Success(usersFromNetwork)

咱们的Activity中的搜集器将获取用户的数据并将其显现在 UI 中

试想一下,假如方向改动了呢,ViewModel将被保存,但咱们在 Activity 中的搜集器将从头订阅搜集,由于StateFlow保存终究一个值,咱们能够直接获取之前从网络设置的用户列表,这个时分优势在于我不需求在从头恳求网络接口了

这个时分,咱们测验运用SharedFlow代替StateFlow看看

val usersSharedFlow = MutableSharedFlow<UiState<List<User>>>()
usersSharedFlow.collect {
​
}

现在,只需咱们翻开Activity,Activity就会订阅搜集。由于运用了SharedFlow,因而不会在此处搜集任何内容。

当咱们的 ViewModel从网络中获取数据时。它将数据设置为usersSharedFlow

usersSharedFlow.emit(UiState.Success(usersFromNetwork))

此时由于方向改动,ViewModel将被保存,咱们在Activity中的搜集器将从头订阅搜集。由于运用不存储任何数据的SharedFlow,因而不会在此处搜集任何内容。咱们将不得不进行新的网络恳求。

所以在这种状况下咱们应该运用StateFlow而不是SharedFlow,为了避免不必要的频繁网络恳求调用

运用SharedFlow的状况

假定咱们现在履行一项使命,假如该使命失利就必须显现SnackBar提示

在咱们的ViewModel中创立一个SharedFlow

val showSnackbarSharedFlow = MutableSharedFlow<Boolean>()

相同的,咱们的Activity中有一个搜集器

showSnackbarSharedFlow.collect {
​
}

现在,只需咱们翻开Activity,Activity就会订阅搜集。由于运用了 SharedFlow,因而不会在此处搜集任何内容

然后,当咱们的viewModel发动使命并失利的时分,它会将showSnackbarSharedFlows设置为true,表示咱们需求显现Snackbar

showSnackbarSharedFlow.emit(true)

假如方向改动,ViewModel将被保存,咱们在 Activity 中的搜集器将从头订阅搜集。此处不会搜集任何内容,由于SharedFlow不保存终究一个值。那很好。咱们不该该在方向改动时再次显现Snackbar

此时咱们测验运用StateFlow代替SharedFlow,可是假如方向改动了,ViewModel将被保存,咱们在 Activity 中的搜集器将从头订阅搜集,由于StateFlow保存终究一个值。它将再次显现 Snackbar,那这样就欠好,咱们不该该在方向改动时再次显现Snackbar

所以,在这种状况下,咱们应该运用SharedFlow而不是StateFlow

具体什么时分运用StateFlow,什么时分ShardFlow,还请大家结合自身项目和需求的特点,挑选适宜的即可

参考资料

  • Kotlin Flow面试笔记
  • Kotlin协程之一文看懂StateFlow和SharedFlow
  • Kotlin Flow学习终极攻略