前言

Flow是谷歌官方供给的一套根据kotlin协程呼应式编程模型,与咱们熟知的RxJava运用起来类似,但相比较起来,Flow运用起来愈加简略,并且Flow是效果在协程内,能够跟协程的生命周期绑定在一起,线程切换起来也比RxJava灵活,所以,咱们学习Flow也跟学习RxJava相同,先从操作符开端学

操作符

创立单个Flow

runBlocking {
    flow {
        emit(System.currentTimeMillis())
        delay(1000)
        emit(System.currentTimeMillis())
        delay(1000)
        emit(System.currentTimeMillis())
    }.collect {
        println(it)
    }
}

collect是一个终端操作符,用来上流传送过来的数据,这儿创立了一个Flow,每距离一秒发送一个时刻戳,然后在终端将时刻戳打印出来,咱们得到成果

 I  1672563578934
 I  1672563579935
 I  1672563580938

创立多个Flow

runBlocking {
    flowOf(
        "小明阳了",
        "小强阳了",
        "小红阳了"
    ).collect{
        println(it)
    }
}

咱们运用flowOf来发送一组Flow,差异于创立单个flow,这儿不需求调用emit这样的发送函数,只需调用了终端操作符collect,flowOf里边的数据就都会发送出来,但也不能在数据之间做其他操作,比方调用delay函数,咱们运转一下得到成果如下

 I  小明阳了
 I  小强阳了
 I  小红阳了

集合转化为Flow

runBlocking {
    listOf("abc","123","666").asFlow()
        .collect{
            println(it)
        }
}

咱们运用asFlow操作符,来将一个集合转化为一组Flow并发送,上述运转成果为

 I  abc
 I  123
 I  666

在回调办法中发送Flow

平常开发中咱们经常会遇到回调函数,比方点击个按钮在点击事情中做些操作,或许监听一个输入框,监听用户输入的内容并把内容输出,在这些案例中,咱们发现假如继续用上文学到的Flow创立办法,然后在回调办法中调用emit发送函数,会呈现一段提示

Kotlin系列之认识一下Flow
意思是挂起函数有必要要在一个协程效果域里边才干调用,那遇到这个状况咱们该怎样做呢?咱们运用另一种创立办法,创立一个callbackFlow,咱们将上述比方改一下变成如下姿态

GlobalScope.launch(Dispatchers.Main) {
    callbackFlow {
        bindingView.callbackEdit.addTextChangedListener(object : TextWatcher {
            override fun beforeTextChanged(p0: CharSequence?, p1: Int, p2: Int, p3: Int) {}
            override fun onTextChanged(p0: CharSequence?, p1: Int, p2: Int, p3: Int) {}
            override fun afterTextChanged(p0: Editable?) {
                trySend(p0)
            }
        })
        awaitClose { }
    }.collect{
        println(it)
    }
}

咱们看到将创立Flow的办法从flow{}变成callbackFlow,内部发送函数从emit改成trySend,然后注意的是,运用callbackFlow有必要要在终究调用awaitClose{}封闭资源,否则程序会闪退,咱们运转一遍上述程序,出来这样一个界面

Kotlin系列之认识一下Flow
咱们在编辑框中顺次输入a,b,c

Kotlin系列之认识一下Flow
咱们看下操控台打印出来的信息

 I  a
 I  ab
 I  abc

能够看到跟每次输入完今后的内容共同

终端操作符collectLatest

这个操作符跟之前遇到的collect相同,也是终端操作符,差异在于它承受最新一条数据,假如新的数据来的时分,上一条数据未处理完,则撤销上一条数据,是处理背压现象的一种操作符,咱们用代码举个比方

runBlocking {
    var start = System.currentTimeMillis()
    flow {
        emit(1)
        delay(1000)
        emit(2)
        delay(500)
        emit(3)
        delay(600)
        emit(4)
    }.collectLatest {
        delay(1000)
        var end = System.currentTimeMillis()
        println("经过${end - start},收到$it")
    }
}

这儿上游先发送1,然后等候1秒再发送2,再等候0.5秒,然后发送个3,等候0.6秒,再发送4,下流收到数据今后,会先等候1秒,然后再处理数据,咱们经过collectLatest特性不难推算出,发送1今后,下流等候1秒去处理数据1,这个时分,上游也过了1秒,然后发送数据2,在下流等候1秒的进程中上游过了0.5秒又发送了数据3,所以数据2被撤销,发送数据3今后,在下流经过1秒的等候时刻,上游过了0.6秒又发送了数据4,所以数据3相同也撤销,数据4能够被处理,所以只要1跟4能够被下流承受到并处理掉,咱们运转一遍看下成果是不是也相同

 I  经过1002,收到1
 I  经过3107,收到4

咱们看到的确是收到1跟4,并且收到4的时分,总共也就过去了3秒,说明数据2跟3在下流的处理操作是被撤销掉的,并没有把他们delay的时刻算上去

map操作符

这个操作符咱们应该都很了解,无论是在RxJava仍是kotlin自带的操作符中都有遇到过,效果便是改动上游数据,咱们举个比方看看

runBlocking {
    flowOf(
        "小红喝可乐","小强喝可乐","小明喝可乐"
    ).map {
        var exp = it
        if(it.startsWith("小强")){
            exp = "小强喝咖啡"
        }
        exp
    }.collect{
        println(it)
    }
}

这儿上游发送三条数据,别离是小红喝可乐,小强喝可乐,小明喝可乐,然后经过map操作符的时分,将小强的数据改成小强喝咖啡,咱们看下终究打印成果

 I  小红喝可乐
 I  小强喝咖啡
 I  小明喝可乐

能够看到小强那段数据改动了,map操作符在实践开发进程中,有一个很常见的场景能够运用到,那便是服务端下发的数据模型,与咱们前端需求展现的数据模型不共同,那就需求map在中心做一层转换,变成咱们需求的数据模型。

filter操作符

filter操作符是将不满足条件的数据过滤出去,只保留契合条件的数据,咱们将map的比方改一下

runBlocking {
    flowOf(
        "小红喝可乐","小强喝可乐","小明喝可乐"
    ).filter {
        it.startsWith("小强")
    }.collect{
        println(it)
    }
}

上述比方改成,只输出小强的数据,其他数据都过滤掉,咱们运转下看看成果

 I  小强喝可乐

成果也跟咱们预期的共同,filter操作符在实践开发中可用于将一些服务端下发的数据中,不需求展现给用户的数据过滤掉,比方产品过期,下架等状况

drop操作符

这个操作符入参是一个整数,表明丢掉前n个数据,咱们用一段代码演示下

runBlocking {
    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
        emit(5)
    }.drop(2)
        .collect {
            println(it)
        }
}

这儿发送1至5五个数据,drop入参2,表明丢掉前2个数据,那么很明显终究输出的是3,4,5,咱们运转下看看成果

 I  3
 I  4
 I  5

成果跟咱们预期的共同

dropWhile操作符

这个操作符的效果是,找到第一个不契合条件的值,回来其后边一切的值,若第一个不满足,回来悉数,咱们相同将上述比方改一下

runBlocking {
    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
        emit(5)
    }.dropWhile {
        it < 4
    }.collect {
        println(it)
    }
}

这儿表明,找到第一个满足条件<4的数据,回来其以及之后的一切数据,咱们看下运转成果

 I  4
 I  5

明显只输出4和5,dropWhile在实践开发当中可用在如下场景,比方回来当月一切订单数据,然后要求每一天看到的数据都是从当天开端往后的数据,那么dropWhile就要将当天之前的数据都扫除过滤掉

take操作符

take跟drop相同,也承受一个整型数据,差异是take是只获取前n个数据,咱们用一段代码演示一下

runBlocking {
    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
        emit(5)
    }.take(3).collect {
        println(it)
    }
}

相同也是发送五个数据,咱们在下流用take操作符截取三个数据,那么打印出来的应该是1,2,3,运转下看看成果如何

 I  1
 I  2
 I  3

公然成果便是1,2,3,take操作符在实践开发场景中用途也很广泛,常见的是一个数据列表,在首页只展现这个列表的前几个数据,点击更多跳到下一页才把剩下数据展现出来,这儿前几个数据就能够用take去截取展现

takeWhile操作符

takewhile跟dropWhile用法相同,也是在办法体中参加判别条件,差异在于,takeWhile是找第一个不契合条件的值,取前面的值,若第一个就不契合,回来空,咱们用一段代码演示一下

runBlocking {
    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
        emit(5)
    }.takeWhile { it < 4 }.collect {
        println(it)
    }
}

相同也是发送五个数据,咱们在takewhile中参加条件是小于4的值,那么咱们就看第一个不契合这个条件的值是哪个,明显是4,那么回来的自然便是1,2,3这三个数据,咱们运转下代码看看成果如何

 I  1
 I  2
 I  3

成果也果然如此,takewhile在实践开发进程中的运用场景跟dropWhile的场景比较类似,相同也是获取一个月的订单数据,然后有一个页面需求展现今日之前的历史订单,那么takeWhile就能够把今日之前的历史订单数据给过滤出来

回调操作符onStart,onEach,onCompletion

onStart操作符是在一段数据操作之前调用的办法,onEach是在数据操作进程中调用的办法,onCompletion是一段数据操作结束今后调用的办法,乍一看跟咱们平常的网络恳求的回调很类似,咱们用一段代码演示一下这三个操作符的特性

runBlocking {
    flowOf(
        "小明学语文",
        "小强学数学",
        "小红学物理"
    ).onStart {
        println("开端。。。")
    }.onEach {
        println("改动前。。${it}")
    }.map {
        var exp = it
        if (exp.startsWith("小红")) {
            exp = "在校园,${exp}"
        }
        exp
    }.onCompletion {
        println("结束。。。")
    }.collect {
        println("改动后。。${it}")
    }
}

咱们这儿发送三个数据,”小明学语文”,”小强学数学”,”小红学物理”,然后在中心对小红那条数据,咱们在前面加上”在校园”三个字,不是小红的就不加,这个进程中会在map前加上onEach办法把改动前的数据打印出来,然后在collect中把改动后的数据打印出来,这个进程的开端与结束会别离调用onStart跟onCompletion办法来加上标识符,咱们运转下这段代码看下成果如何

 I  开端。。。
 I  改动前。。小明学语文
 I  改动后。。小明学语文
 I  改动前。。小强学数学
 I  改动后。。小强学数学
 I  改动前。。小红学物理
 I  改动后。。在校园,小红学物理
 I  结束。。。

咱们看到,很明晰的将这个数据改动进程给打印出来了,回调操作符想必咱们也都清楚了,最常用的场景便是在咱们的网络框架中,在恳求的开端,恳求中,以及恳求结束别离做对应的操作,比方加载框的初始化,弹出,以及加载框的封闭

debounce操作符

这个操作符是用来处理数据发送过于频繁而简化上游数据,debounce会传入一个时刻,假如上游数据之间发送的时刻距离小于debounce设置的时刻,那么会撤销最先发送出来的数据,咱们用代码演示一下

runBlocking {
    flow {
        emit(1)
        delay(300)
        emit(2)
        delay(800)
        emit(3)
        emit(4)
        delay(300)
        emit(5)
    }.debounce(600).collect {
        println(it)
    }
}

相同发送五个数据,咱们在1跟2之间间隙300毫秒,在2跟3之间间隙800毫秒,4跟5之间间隙300毫秒,debounce设置600毫秒,意思便是假如数据之间发送距离小于600毫秒的话,那么就撤销发送第一条数据,咱们运转一下看看成果

 I  2
 I  5

咱们看到由于1跟2之间是300毫秒距离,小于600毫秒,所以撤销发送数据1,数据2跟3之间距离是800毫秒,大于600毫秒,所以数据2成功发送,3跟4之间没有时刻距离,所以3也撤销发送,4跟5之间是300毫秒,也小于600毫秒,所以4也撤销发送,剩下数据5由于后边没有数据了,所以数据5也是成功发送,终究咱们看到的成果便是收到了2跟5

这个操作符在咱们查找框里边输入东西查询成果时分很常用,由于咱们打字的速度都比较快,假如每输入一个字就去查询成果,那么等成果出来今后,输入框里边现已不是之前输入的内容了,所以正确的做法也是当输入进程中呈现间隙的时分,再将输入框中的内容拿去恳求成果,这样不管是在流量仍是性能上都是最佳的做法

采样操作符sample

sample操作符会经过一段时刻去上游去取数据,专门针对上游数据量比较庞大,然后并不是每一条数据都比较重要的状况,咱们用一段代码演示一遍

runBlocking {
    flow {
        var num = 1
        while (num < 20){
            emit(num)
            delay(500)
            num++
        }
    }.sample(1000).collect{
        println(it)
    }
}

这儿每距离500毫秒会发送一条数据,从1开端,逐一累加到20,然后咱们采样操作符设定了每过1秒取一次数据,咱们看看终究打印出来的成果如何

 I  2
 I  4
 I  6
 I  8
 I  10
 I  12
 I  14
 I  16
 I  18

能够看到只输出了刚好距离为1秒的数据,其他数据就直接被过滤掉了,sample操作符运用的场景是数据量比较庞大,但并不需求展现一切的状况,比方视频弹幕,每分钟或许弹幕有很多条,可是显现在屏幕上的弹幕就只要几条,终究假如都显现在屏幕上咱们就没必要去看视频了

规约操作符reduce

reduce操作符是具有累加的功用,它跟咱们之前遇到的collect不相同,它回来的是一个累加的成果,咱们用一段代码演示一下这个操作符的特性

runBlocking {
    var result = listOf("he", "llo", ",w", "or", "ld").asFlow()
        .reduce{a,b -> a+b}
    println("输出成果:$result")
}

代码中咱们看到,有一组字符串的数组,它们转换成flow之后进行reduce操作,意思便是对字符串逐一累加,reduce里边的a跟b第一次表明累加的初始两个值,后边则一个表明之前累加的成果,另一个表明接下去需求累加的值,咱们运转一遍上述代码得到成果如下

 I  输出成果:hello,world

规约操作符fold

这个操作符与之前讲的reduce操作符用法基本共同,差异在于fold承受一个初始值,咱们将上述代码更改一下,用fold完成一遍

runBlocking {
    var result = listOf("he", "llo", ",w", "or", "ld").asFlow()
        .fold("Testing..."){a,b -> a+b}
    println("输出成果:$result")
}

咱们在之前输出的成果前面加上了Testing…字样,运转一遍代码得到成果如下

I  输出成果:Testing...hello,world

flatMapConcat

之前咱们讲的都是只操作一个Flow,接下去咱们要讲操作两个或多个Flow,首要咱们先看下flatMapConcat操作符,这个操作符咱们解释一下便是多个Flow依照先后次序串行履行,咱们用一段代码演示一下

runBlocking {
    flowOf(
        "小明",
        "小强",
        "小红"
    ).flatMapConcat {
        val start = System.currentTimeMillis()
        flow {
            when (it) {
                "小明" -> delay(3000)
                "小强" -> delay(2000)
                "小红" -> delay(1000)
            }
            var dis = System.currentTimeMillis() - start
            emit("用时$dis,${it}去上学")
        }
    }.collect {
        println(it)
    }
}

这儿第一个流顺次发送小明,小强,小红,第二个流在小明到来时分中止3秒,小强中止两秒,小红中止1秒,终究打印出来看下它们的履行顺讯以及用时多少

 I  用时3001,小明去上学
 I  用时2000,小强去上学
 I  用时1001,小红去上学

能够看到仍是依照第一个流的发送次序顺次履行,就算小红花的时刻最少,也要等小明跟小强履行结束今后在去履行,这个便是flatMapConcat的特性,适合用在前后使命有强烈依靠联系的状况下,比方获取一个用户信息,有必要拿到这个用户的id,这个id有必要是登录之后才干拿到,所以有必要是先调用完登录接口之后拿到id再去调用用户信息接口,次序不能错

flatMapMerge

这个操作符与flatMapConcat类似,也是多个flow一起履行,差异在于flatMapMerge没有次序要求,哪个先履行完哪个就先输出,咱们将上述比方改一下

runBlocking {
    flowOf(
        "小明",
        "小强",
        "小红"
    ).flatMapMerge {
        val start = System.currentTimeMillis()
        flow {
            when (it) {
                "小明" -> delay(3000)
                "小强" -> delay(2000)
                "小红" -> delay(1000)
            }
            var dis = System.currentTimeMillis() - start
            emit("用时$dis,${it}去上学")
        }
    }.collect {
        println(it)
    }
}

改动不大,只是是将flatMapConcat的方位改成flatMapMerge,咱们看下终究成果有什么差异

 I  用时1001,小红去上学
 I  用时2001,小强去上学
 I  用时3001,小明去上学

能够看到每个数据的终究用时没有什么差异,可是在履行次序上,小红由于用时最少先履行完,所以第一个输出的便是小红,其次是小强,小明由于用时最多,所以是终究一个输出

flatMapLatest

这个操作符就跟collectLatest相同,便是获取最新的数据,这边便是从第一个流传输到第二个流的时分,假如第一个流里边又有新的数据来了,而之前的流的数据并没有处理完,那么就撤销之前流的数据的操作,履行新的流的数据,咱们用一段代码演示一下

runBlocking {
    flow{
        emit("小强")
        delay(1000)
        emit("小明")
        delay(400)
        emit("小红")
        delay(600)
        emit("小李")
    }.flatMapLatest {
        flow {
            delay(1000)
            emit("${it}从校园回来")
        }
    }.collect{
        println(it)
    }
}

这边第一个流每过一段时刻就会发送一条数据,在第二个流里边,先中止1秒,然后拼接一个字符串在输出,终究打印出成果,咱们看下代码的终究运转成果

 I  小强从校园回来
 I  小李从校园回来

根据flatMapLatest的特性,小强发送出去今后在第二个流里边中止1秒钟在操作,此刻第一个流也过去了1秒才有新的数据发送出来,所以小强是能够输出的,小明在发送出去的时分,只过去了0.4秒又发送了小红,所以小明在第二个流里边并没有得到处理就被撤销了,而小红在发送出去之后,只是过了0.6秒又发送了一个小李,所以小红也没有得到处理就被撤销掉了,终究小李后边由于没有其他数据了,小李终究也被输出

zip

之前讲的都是从一个流传到另一个流,两个流之间是串行运转的,而zip操作符则有点差异,它是将两个流并行运转,一个流里边的数据履行结束,会等候另一个流里边的数据,等都履行结束今后才会输出,咱们举个比方来看下

runBlocking {
    flow {
        emit("a1发送时刻${System.currentTimeMillis()}\n")
        delay(1000)
        emit("a2发送时刻${System.currentTimeMillis()}\n")
        delay(1000)
        emit("a3发送时刻${System.currentTimeMillis()}\n")
    }.zip(flow {
        emit("b1发送时刻${System.currentTimeMillis()}")
        delay(3000)
        emit("b2发送时刻${System.currentTimeMillis()}")
        delay(1000)
        emit("b3发送时刻${System.currentTimeMillis()}")
        delay(1000)
        emit("b4发送时刻${System.currentTimeMillis()}")
    }) { a, b ->
        a + b
    }.collect {
        println("${it}\n-------------------")
    }
}

直接运转看下成果

 I  a1发送时刻1672713340429
 I  b1发送时刻1672713340429
 I  -------------------
 I  a2发送时刻1672713341431
 I  b2发送时刻1672713343430
 I  -------------------
 I  a3发送时刻1672713344432
 I  b3发送时刻1672713344431
 I  -------------------

咱们发现a1跟b1是先被一起输出,接着a2过了1秒今后也被发送出去,但没有被输出,a2在等候b2履行结束,b2在等候3秒今后也被发送出去,这个时分a2与b2才一起被输出,紧接着过了1秒,a3与b3也被输出,此刻由于第一个流里边使命现已都被履行完了,所以b4是不会被输出。zip操作符咱们能够用在,当一个页面有多个接口时分,咱们想要获取多个接口履行结束的回调,来履行一些比方重置改写或许封闭加载框的动作,咱们就能够运用这个操作符来完成

buffer

之前咱们讲了一个collectLatest,它是用来处理背压现象的一种操作符,接下去咱们来讲别的两个处理背压现象的操作符,其间一个便是buffer,这个操作符从字面上就能看出,它是启缓冲效果,如何缓冲呢?它是将上游流速过快,导致下流没来得及处理的一些数据,放在一个缓存堆里边,下流直接从缓存堆里边拿数据,不受上游的流速影响,咱们用代码来加深一下对这个操作符的理解

runBlocking {
    var start = System.currentTimeMillis()
    flow {
        var value = 1
        while (value < 10) {
            emit(value)
            delay(500)
            value++
        }
    }.collect {
        delay(1000)
        var end = System.currentTimeMillis()
        println("收到上游数据--$it--用时${end - start}")
    }
}

这儿是一个很简略的上游发送数据,下流承受数据的比方,咱们看到上游每发送一个数据都要中止0.5秒,而下流在接纳到数据后也中止1秒才去处理数据,其实从设计上来讲,咱们不关怀上游如何发送数据,咱们只想要每一秒钟将数据打印出来,然而咱们看下终究成果是不是咱们预期的那样呢

 I  收到上游数据--1--用时1002
 I  收到上游数据--2--用时2504
 I  收到上游数据--3--用时4005
 I  收到上游数据--4--用时5506
 I  收到上游数据--5--用时7008
 I  收到上游数据--6--用时8515
 I  收到上游数据--7--用时10026
 I  收到上游数据--8--用时11530
 I  收到上游数据--9--用时13034

咱们看到了,每一个数据用时都要超越1秒,明显是受到上游流速影响了,那咱们该怎样修改呢,这个时分buffer的效果就体现出来了,咱们把buffer参加到上面的比方中去试试

runBlocking {
    var start = System.currentTimeMillis()
    flow {
        var value = 1
        while (value < 10) {
            emit(value)
            delay(500)
            value++
        }
    }.buffer().collect {
        delay(1000)
        var end = System.currentTimeMillis()
        println("收到上游数据--$it--用时${end - start}")
    }
}

改动不大,便是在上下流之间参加了buffer操作符,效果便是将上游的数据先缓存起来,等下流数据处理完今后,直接从缓存中拿数据,咱们来看下现在运转的成果如何

 I  收到上游数据--1--用时1002
 I  收到上游数据--2--用时2003
 I  收到上游数据--3--用时3003
 I  收到上游数据--4--用时4005
 I  收到上游数据--5--用时5005
 I  收到上游数据--6--用时6006
 I  收到上游数据--7--用时7007
 I  收到上游数据--8--用时8010
 I  收到上游数据--9--用时9014

能够看到,现在的成果才是咱们想要的,每一条数据都是距离1秒才被打印出来

conflate

咱们讲了两个处理背压现象的操作符,现在来讲终究一个,在刚刚的比方中,咱们有没有注意到,咱们每过一秒去缓存里边拿的数据,其完成已不是最新数据了,在一些券商的app中,制作k线图的时分,行情数据就算下发的再快,咱们界面上守时改写制作的数据永远都是最新的,用户并不关怀你几秒前的行情数据终究多少,他们只关怀最新的数据,所以咱们在关于处理上游流速过快的问题上,也只需求关怀最新数据即可,这个时分咱们就要用到另一个操作符conflate,相同咱们将之前的比方改一下

runBlocking {
    val start = System.currentTimeMillis()
    flow {
        var value = 0
        while (value < 20) {
            emit(value)
            delay(500)
            value++
        }
    }.conflate().collect {
        delay(1000)
        var end = System.currentTimeMillis()
        println("收到上流数据--$it--用时${end - start}")
    }
}

这儿咱们将上游发送的数据扩增10个,在下流获取数据之前,咱们加上了conflate操作符,意思是每秒钟只取最新的数据,咱们运转一遍看看成果

 I  收到上流数据--0--用时1002
 I  收到上流数据--1--用时2004
 I  收到上流数据--3--用时3004
 I  收到上流数据--5--用时4005
 I  收到上流数据--7--用时5006
 I  收到上流数据--9--用时6007
 I  收到上流数据--11--用时7010
 I  收到上流数据--13--用时8015
 I  收到上流数据--15--用时9022
 I  收到上流数据--17--用时10025
 I  收到上流数据--19--用时11031

咱们看到每秒钟输出的数据不是连贯的,由于当上一条数据处理完今后,上游又有新的数据来了,所以只会拿最新的数据而不会依照发送次序将老的数据输出,这样处理背压的三个操作符都讲完了,咱们在日常开发中能够按需运用。

冷流与暖流

冷流

  • 一般创立的flow{}都属于冷流
  • 冷流在无订阅者的状况下,不会发生数据
  • 冷流里边生产者跟订阅者属于1对1联系,同一个订阅者多次订阅的时分,有且只要一个生产者对它发送数据

暖流

  • 分为SharedFlow跟StateFlow
  • 暖流是在无订阅者的状况下,也会发生数据
  • 暖流中生产者跟订阅者是一对多联系,即同一个生产者,一起能够由多个订阅者一起订阅
  • SharedFlow可缓存数据,同一份数据可发送给多个订阅者
  • StateFlow每次只发送一条数据的SharedFlow,有初始值

SharedFlow

咱们先来看下SharedFlow哪里体现出可缓存数据,哪里体现出一份数据可发送给多个订阅者,咱们创立SharedFlow的办法有两种,一种是经过MutableSharedFlow的结构办法,另一种是经过一般Flow调用shareIn来创立,咱们首要看下shareIn办法,由于shareIn内部其实就用到了MutableSharedFlow的结构办法生成SharedFlow

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    val config = configureSharing(replay)
    val shared = MutableSharedFlow<T>(
        replay = replay,
        extraBufferCapacity = config.extraBufferCapacity,
        onBufferOverflow = config.onBufferOverflow
    )
    @Suppress("UNCHECKED_CAST")
    val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
    return ReadonlySharedFlow(shared, job)
}

咱们看到shareIn接纳三个参数,第一个参数便是协程的效果域,没啥说的,第二个参数是操控同享流开端与结束的战略,它总共有三个选项

  • Eagerly 同享数据立马发送,并且永不停止
  • Lazily 同享数据直到呈现第一个订阅者的时分才开端发送,并且永不停止
  • WhileSubscribed 第一个订阅者呈现时分开端发送数据,并默许在终究一个订阅者消失时分停止发送数据,数据缓存也是默许永远存在,WhileSubscribed有两个参数,stopTimeoutMillis表明终究一个订阅者消失时分停止发送数据的延迟时刻,默许为0,第二个参数为replayExpirationMillis,表明缓存数据的过期时刻,时刻一到就会重置缓存池 第三个参数replay为缓存数量,表明发送到缓存池里边的数据个数

咱们用一段代码示例来进一步讲解下SharedFlow

var num = 0
var numFlow = flow {
    num++
    emit("第${num}条数据 ${System.currentTimeMillis()}")
    emit("第${num + 1}条数据 ${System.currentTimeMillis()}")
    emit("第${num + 2}条数据 ${System.currentTimeMillis()}")
}.shareIn(lifecycleScope, SharingStarted.Eagerly, 2)
bindingView.sharedflowBtn.setOnClickListener {
    runBlocking {
        launch {
            numFlow.collect {
                println("${System.currentTimeMillis()} 订阅者1收到 $it")
            }
        }
        launch {
            numFlow.collect {
                println("${System.currentTimeMillis()} 订阅者2收到 $it")
            }
        }
    }
}

咱们创立一个一般流,流里边发送三条数据,每一条数据都有它的下标值以及发送时刻,随后调用shareIn来将这个一般流转化成SharedFlow,shareIn里边的战略参数选用Eagerly,表明马上发送,缓存数量设置为2,表明只在缓存中写入两条数据,咱们又新增一个按钮,点击时分生成两个订阅者,每个订阅者都打印出接纳到数据的时刻以及数据内容,咱们运转一遍上述代码,看看点击按钮今后的输出内容

 I  1672719702780 订阅者1收到 第2条数据 1672719685184
 I  1672719702781 订阅者1收到 第3条数据 1672719685184
 I  1672719702781 订阅者2收到 第2条数据 1672719685184
 I  1672719702781 订阅者2收到 第3条数据 1672719685184

咱们看到两个订阅者收到的数据彻底相同,表明同一份数据多个订阅者同享,打印出来的成果里边,前面的时刻略大于后边的时刻,表明数据现已在订阅者呈现之前就现已发送出来,只是保存在缓存池里边,当订阅者呈现之后,才从缓存池中拿出数据,别的,订阅者只接纳到第2,3条数据,第一条数据没有了,从而也证明了replay参数设置起了效果。咱们再将shareIn的第二个参数改成Lazily,其他的都不变,试试看运转出来的成果有什么不相同的地方

 I  1672722732294 订阅者1收到 第1条数据 1672722732293
 I  1672722732294 订阅者1收到 第2条数据 1672722732294
 I  1672722732294 订阅者1收到 第3条数据 1672722732294
 I  1672722732294 订阅者2收到 第2条数据 1672722732294
 I  1672722732294 订阅者2收到 第3条数据 1672722732294

咱们看到成果稍微有一点不相同,首要订阅者1彻底接纳到了一切三条数据,证明缓存只在数据发送一次今后才会保存进去,订阅者2也证明了这一点,其次发送数据和接纳数据的时刻彻底相同,证明了Lazily是在订阅者呈现之后才干发送数据,没有订阅者数据不会发送。

咱们现在对SharedFlow有了必定的认识了,咱们再看看StateFlow

StateFlow

StateFlow能够理解为缓存池大小为1的SharedFlow,StateFlow生成的办法也有两种,一种是MutableStateFlow的结构办法,办法参数只要一个initvalue,初始默许值,表明没有数据时分默许发送的一条数据,类似于咱们页面上的空态页,另一种办法是调用stateIn办法,来将一般Flow转换成StateFlow,stateIn的入参也有三个,前两个同shareIn相同,第三个是默许值initValue,咱们来用一段代码演示一遍

var numFlow = flow {
    emit("喜羊羊")
    emit("美羊羊")
}.stateIn(lifecycleScope, SharingStarted.Lazily, "灰太狼")
bindingView.stateflowBtn.setOnClickListener {
    runBlocking {
        launch {
            numFlow.collect {
                println("订阅者1获取值 $it")
            }
        }
        launch {
            numFlow.collect {
                println("订阅者2获取值 $it")
            }
        }
    }
}

相同也有一个一般流,连续发送两个数据别离是喜羊羊和美羊羊,然后调用stateIn办法转换成StateFlow,stateIn的战略参数选用Lazily,默许值为灰太狼,相同也有一个按钮,点击之后生成两个订阅者,都输出接纳到的内容,咱们运转一遍代码看看成果

 I  订阅者1获取值 美羊羊
 I  订阅者2获取值 美羊羊

很明显,作为缓存池大小只要1的StateFlow来说,每一次只发送一条数据,所以订阅者收到的数据是最新一条数据也便是美羊羊,那这个时分咱们就有个疑惑了,咱们设置的默许值灰太狼什么时分呈现呢?咱们是否还记得刚刚举的空态页的比方,咱们都知道一般性空态页是在恳求接口等候数据回来的进程中在屏幕上起到占位的效果,那咱们也模拟一个等候数据的进程,很简略,在emit函数之前加上delay函数,时刻咱们就定3秒,代码咱们就改成了如下姿态

var numFlow = flow {
    delay(3000)
    emit("喜羊羊")
    emit("美羊羊")
}.stateIn(lifecycleScope, SharingStarted.Lazily, "灰太狼")
bindingView.stateflowBtn.setOnClickListener {
    GlobalScope.launch(Dispatchers.Main) {
        launch {
            numFlow.collect {
                println("订阅者1获取值 $it")
            }
        }
        launch {
            numFlow.collect {
                println("订阅者2获取值 $it")
            }
        }
    }
}

从头运转一下代码后咱们点击按钮,输出的成果就变成了

 I  订阅者1获取值 灰太狼
 I  订阅者2获取值 灰太狼
 I  订阅者1获取值 美羊羊
 I  订阅者2获取值 美羊羊

在等候3秒的进程中默许值就被发送出来了,后边3秒往后真实的数据美羊羊也被发送了出来,简略的模拟了一个在恳求数据进程中从空态页到真实数据展现在屏幕上的进程

Flow与LiveData

记得Flow刚被推出来的那一段时刻,经常会看到一些文章或许谈论有提到LiveData将要被替代,那LiveData终究会不会被替代呢,咱们仍是来总结下LiveData的优缺点

优点

  1. 责任单一,首要效果便是在主线程更新UI,上手简略,可满足于现在大部分需求
  2. 现在国内还有不少项目以java为主,而Flow是kotlin言语在协程环境下运用的工具,所以LiveData关于纯java项目来说仍是必不可少的

缺点

  1. 在异步线程中运用postValue简单丢失数据
  2. LiveData会发生比方数据倒灌的问题

总结

个人感觉在今后的技能领域里边,一切新出来的技能并不是以替代谁为目的而推出来的,而是在这个日趋杂乱的事务环境中,让咱们开发者多一个技能方案的挑选,挑选合适自己以及项目的方案,比方LiveData的责任单一上手简单,Flow丰厚的操作符,以及配合协程让事务功用完成起来更简单,后边也会尝试着用Flow替代RxJava写一个网络恳求框架,也算是对Flow的一个真实实践