如何把业务代码越写越复杂?(二)| Flow 替换 LiveData 重构数据链路,更加 MVI

引子

上一篇用实战代码演绎了 Feeds 流场景下,业务层代码如何从 “无架构“MVP” 再到 “MVVN”,并使用LiveData承载整个数据链路。这一篇尝用 Kotlin Flow 替换掉 LiveDat线程池原理a,线程池的工作原理看看会发生些什么变化及遇到哪些问题。

若对 Kotl线程池参数配置in Flow 还很陌生,可先阅读下面两篇文章,这将有助于理解本文:

  1. Kotlin 异步 | Flow 应用场景及原理

  2. Kotlin 异步 | Fl复杂度怎么计算的ow 限流的应用场景及原理

业务场景是这样的:从网络拉取 Feeds 流并持久化在数据库中,以便下次启动时可先展示本地数据,待架构师和程序员的区别请求返回后再刷新 Feeds。

使用 LiveData 承架构师载该业务数据链路的架构演进可以点击我是怎么把业务代码越写越复杂的 | MVP – MVVM – Clean Architecture

现援引上一篇的解决方案:

// 实现访问网络和数据库的细节
class NewsRepository(context: Context) {
    // 使用 Retrofit 构建请求访问网络
    private val retrofit = Retrofit.Builder()
            .baseUrl("https://api.apiopen.top")
            .addConverterFactory(MoshiConverterFactory.create())
            // 将返回数据组织成 LiveData
            .addCallAdapterFactory(LiveDataCallAdapterFactory())
            .client(OkHttpClient.Builder().build())
            .build()
    private val newsApi = retrofit.create(NewsApi::class.java)
    private var executor = Executors.newSingleThreadExecutor()
    // 使用 room 访问数据库
    private var newsDatabase = NewsDatabase.getInstance(context)
    private var newsDao = newsDatabase.newsDao()
    // 用于将新闻流传递给上层的 LiveData
    private var newsLiveData = MediatorLiveData<List<News>>()
    fun fetchNewsLiveData(): LiveData<List<News>?> {
        // 从数据库获取新闻
        val localNews = newsDao.queryNews()
        // 从网络获取新闻
        val remoteNews = newsApi.fetchNewsLiveData(mapOf("page" to "1", "count" to "4"))
        .let {
            Transformations.map(it) { response: ApiResponse<NewsBean>? ->
                when (response) {
                    is ApiSuccessResponse -> {
                        val news = response.body.result
                        // 将网络新闻入库
                        news?.let {executor.submit { newsDao.insertAll(it) }}
                        news
                    }
                    else -> null
                }
            }
        }
        // 将数据库和网络响应的 LiveData 合并
        newsLiveData.addSource(localNews) {newsLiveData.value = it}
        newsLiveData.addSource(remoteNews) {newsLiveData.value = it}
        return newsLiveData
    }
}

这是 C架构工程师lean Architecture 中的 Repository,它提供数据访问能力,隐藏了访问了架构图怎么制作网络和数据库的细节。

关于 Clean Architecture 的详细解释可以点击我是怎么把业务代码越写越复杂的 | MVP – MVVM – Clean Ar线程池的七个参数chitecture

为了使用 LiveData 承载整个数据链路,Retrofit 增加了 LiveDataCallAdapterFactory,它使得接口能直接返回 LiveData:

interface NewsApi {
    @POST("/getWangYiNews")
    fun fetchNewsLiveData(@FieldMap map:Map<String,String>):LiveData<ApiResponse<NewsBean>>
}

Room 也支持将数据库查询内容 LiveData 化:

@Dao
interface NewsDao {
    @Query("select * from news")
    fun queryNews(): LiveData<List<News>?>
}

网络 &a线程池参数配置mp; 数据库 Flow 化

数据链路 Flow 化从链路源头开始。

Room 支持以 Flow 形式返回查询结果线程池核心参数

@Dao
interface NewsDao {
    @Query("select * from news")
    fun queryNewsFlow(): Flow<List<News>?>
}

Retrofit 并未支持 Flow 形式的接口返回值,于是在 G复杂度怎么计算的itHub 上找了一遍架构工程师,有是有,但 star 数都很少,不太敢用算法复杂度。正在犹豫之际,看到了下面 retrofit 官方的回复:[Feature Request] Support a复杂度怎么计算的dapter for Kotlin Coroutine Flow Issue #3497 square/re线程池核心参数trofit (github.com)

有人提 issue 希望 retrofit 官方支持接口 Flow 化,但作者回复架构师说网络请求返回的是“一个异步结果”而不是“一串异步结果”,所以suspend就够用了。如果想要将接口 Flow 化,可以这样做:

flow {
  emit(getPosts())
}

作者接着说:“如果有机会重写 RxJava 的 call adapter,可能也不会支持线程池的使用接口 Observable 化。”

醍醐灌顶,立马照做:

interface NewsApi {
    @POST("/getWangYiNews")
    suspend fun fetchNews(@FieldMap map:Map<String,String>): NewsBean
}

将接口定义为suspend方法。查询数据库内容架构师证书也应该这么改:

@Dao
interface NewsDao {
    @Query("select * from news")
    suspend fun queryNewsSuspend(): List<News>
}

其实若将查询数据库的结果定义为 Flow 的话,每当数据库内容发生增删线程池有哪几种,Flow 的订阅者都会收到通知。相较于“多个异步结果”前端性能优化,当前场景使用“架构图怎么画单个异步结果”更合适。

将访安卓性能优化问数据库及请求网络在 Repository 中转化成流:

class NewsRepo() {
    // 访问网络的 Flow(冷流:此时并未发生网络请求)
    fun remoteNewsFlow(page: Int, count: Int) = 
        suspend { newApi.fetchNews(mapOf("page" to page, "count" to count)) }
            .asFlow() // 将 suspend 代码块转换成流
            .map { newsBean ->
                if (newsBean.code == 200) {
                    // 请求成功,更新缓存
                    if (!newsBean.result.isNullOrEmpty()) {
                        newsDao.deleteAllNews()
                        newsDao.insertAll(newsBean.result.map { it.toNews() })
                        newsBean.result
                    } else {
                        emptyList()
                    }
                } else {
                    throw Exception(newsBean.message)
                }
            }
    // 访问数据库的 Flow(冷流:此时并未发生数据库查询)
    val localNewsOneShotFlow = flow {
        val news = newsDao.queryNewsSuspend()
        val newsList = news.map { it.convert() }// 将数据库数据统一为网络数据
        emit(newsList)
    }
}

在 Flow 数据链路的架构师场景下,Repository 作为数据链路的起点,提供给上层的是“原始的冷流”。

代码中虽然调用了访问网络和查询数据库的方法,排序复杂度但是它们是被定义在“冷流”圈复杂度中的,若未发生订阅行为,就不会执行。订阅行为通常是在界面中进行。

变换 & 合流

当链路用 LiveD架构工程师ata 表达时,访问数据库和网络的操作被定义在一个 Repository 的方法中:

class NewsRepository(context: Context) {
    fun fetchNewsLiveData(): LiveData<List<News>?> {
        // 1.从数据库获取新闻
        val localNews = newsDao.queryNews()
        // 2.从网络获取新闻
        val remoteNews = newsApi.fetchNewsLiveData(mapOf("page" to "1", "count" to "4"))
        // 3.将数据库和网络响应的 LiveData 合并
        newsLiveData.addSource(localNews) {newsLiveData.value = it}
        newsLiveData.addSource(remoteNews) {newsLiveData.value = it}
        return newsLiveData
    }
}

并且它们是串行的,即只有线程池拒绝策略当数据库访问结束后才开始网络请求,最后再将它们通过 Medi性能优化atorLiveData 合流。

而使用流时,数据库和网络操作被定义在不同的流中,这为它们提供了更灵活的合流方式。

串行合流

串行合流的思路是将多个流组织成“嵌套流”,然后将它们“展平”。

拿 L环形复杂度ist 举例线程池的使用L算法复杂度ist.flat()提供了在列表上的展平操作,fl圈复杂度at 即展平,为啥要展平?因环路复杂度为有嵌套,比如List<List<Int>>,即 List 中每个元素还是 List:

val lists = listOf(
    listOf(1,2,3),
    listOf(4,5,6)
)
Log.v("ttaylor","${lists.flatten()}") //[1, 2, 3, 4, 5, 6]
Log.v("ttaylor","${lists.flatMap { it.map { it+1 } }}") //[2, 3, 4, 5, 6, 7]

List.flat() 将两层嵌套结构变成单层结构,而List.flatMap()在展平的同时提供了变换内部 List 的机会。

流也提供了类似的展平方法flatte架构图模板nConcat()

flowOf(
    flow {
        emit(1)
        emit(2)
    },
    flow { emit(3) },
    flow { emit(4) },
).flattenConcat().collect {
    Log.v("ttaylor", "${it}") // 1,2,3,4
}

flattenConc架构师at() 的合流是串行的,即只有消费了前一个流中所有的数据后才会消费后一个流。

在 ViewModel 层对原始数据流进行合流:

// 新闻 ViewModel 持有 repo
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    fun newsFlow(type: Int, count: Int) =
        flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
            .flattenConcat() // 串行合流
            .map { NewsModel(it, false) }
}
// 通过 ViewModelProvider.Factory 定义构建 ViewModel 的细节(注入Repository)
class NewsViewModelFactory(private val newsRepo: NewsRepo) : ViewModelProvider.Factory {
    override fun <T : ViewModel> create(modelClass: Class<T>): T {
        return NewsViewModel(newsRepo) as T
    }
}

在 Repository + Flow 的加持下,View环复杂度Model 变得异常简单,它持有原始数据流并对其进行合流以及变换。

复杂度怎么计算的个原始数据流分别是数据库流和网络流,使用flowOf()环形复杂度它们组织成Flow&l架构图模板t;Flow<News>>嵌套结构,然后调用 flattenConcat() 将它们串行合流并展平变成一个流,即先查询数据库,待查询完毕后才请求网络。合流之后还进行了数据变换,以将网络数据转换为圈复杂度界面数据 NewsModel:

data class NewsModel(
    val news: List<News>, // 新闻列表
    val loading: Boolean, // 是否正在加载
    val errorMessage: String = "" // 错误信息
)

将新闻列手机性能优化表进行这样线程池参数配置包装的目的是实现“唯一可信数据源”,这是 MVI 的关键词之一。关于它的详细介手机性能优化绍可以点击Android 架构最新进展 | MVI = 响应式编程 + 单向数据流 + 唯一可信数据源(该篇和本文同时发布,若链接无法跳转,可能是还未过审,请稍等~)

并行合流

串行合流中网络请求必须等待数据库查询,若两者能并行,则性能就会更好一点。

flattenMerge()方法就用于多流并架构图模板发的场景:

class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    fun newsFlow(type: Int, count: Int) =
        flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
            .flattenMerge() // 并行合流
            .map { NewsModel(it, false) }
}

此时数据库和网络流会并发启动,性能是好了,但也产生了新问题。

每个流生成的数据会合成到一个流中并通知界面刷新。若数据库流先生成圈复杂度数据,让用户先看到缓存新闻,然后网络流再生成数据,用新数据把老数据刷掉。这个流程是符合预期的。但万一数据库抽风了,比网络还慢咋办?这就会发生老数据刷掉新数据的 bug。

解决方案是:当接收到网络流的数据时,就丢弃流上后面的数据。

在 RxJava 中有一个操作符叫takeUntil()就是用来描述这个场景的。

但 Kotlin Flow 并未提供这个方法算法复杂度。。。于是我开架构工程师始在网上找。。。直到我发现了这个官方回复:Flow.transformWhile operator Issue #2065 Kotlin/kotlinx.cor…

官方说不会提供 takeUntil() 方法。因为 Kotlin Flow 的设计原则是“简单”,只提供必要的和高度灵活性的方法,以便自定义。Kotlin Flow 中以transform开头的线程池参数配置方法都是高度灵活的,它们通常用来定义其他操作符。在Kotlin 异步 | Flow 应用场景及原理中分析过Flow架构图模板.transform()方法的灵活性。线程池面试题现在来看下transformWhile()

public fun <T, R> Flow<T>.transformWhile(
    transform: suspend FlowCollector<R>.(value: T) -> Boolean // 这 lambda 带有数据发射能力
): Flow<R> =
    safeFlow {
        // 进行有条件的转发流数据,条件即是 transform
        return@safeFlow collectWhile { value ->
            transform(value)
        }
    }
// 有条件的收集流数据
internal suspend inline fun <T> Flow<T>.collectWhile(
    crossinline predicate: suspend (value: T) -> Boolean
) {
    // 自定义流收集器,描述如何发射数据
    val collector = object : FlowCollector<T> {
        override suspend fun emit(value: T) {
            // 当满足条件时才发射数据,否则丢弃流往后的数据
            if (!predicate(value)) {
                throw AbortFlowException(this)
            }
        }
    }
    try {
        collect(collector)// 收集上游流并通过自定义的方式转发给下游
    } catch (e: AbortFlowException) {
        e.checkOwnership(collector)
    }
}

transformWhile() 的套路依然是拦截转发机制,即新建下游流,它生产数架构图怎么画据的方式是通过收集上环路复杂度游数据,并将数据转发到线程池的七个参数一个带有发射数据能力的 lambda 中,当前这个 lambd算法复杂度a 需要有一个返回值,该线程池原理值决定了是否要终止上游流数据的生产。

现在复杂度怎么计算的的问题转化为,如何让网络流告诉数据库流“我已经生成数据了你歇菜吧~”

“流的通信”,听上去有点高大上,但转念一想,是我把问题架构师工资想复线程池原理杂了。因为网络和数据库流已经在 ViewMode时间复杂度l 层合流了,它们并成一个流了,流动的是List<News>,在这个数据结构上套一层就能实现所谓的“流通信”:

// 新闻流包装类
data class NewsFlowWrapper(
    val news: List<News>,// 新闻列表
    val abort: Boolean // 是否中断流
)

用 NewsFlowWrap架构图怎么制作per 改造下 NewsRepo:

class NewsRepo(context: Context) {
    val localNewsFlow = flow {
        val news = newsDao.queryNewsSuspend()
        val newsList = news.map { it.convert() }
        // 使用 NewsFlowWrapper 包装数据库流
        emit(NewsFlowWrapper(newsList, false))
    }
    fun remoteNewsFlow(page: Int, count: Int) = 
        suspend { newApi.fetchNews(mapOf("page" to page, "count" to count)) }
            .asFlow()
            .map { newsBean ->
                if (newsBean.code == 200) {
                    if (!newsBean.result.isNullOrEmpty()) {
                        newsDao.deleteAllNews()
                        newsDao.insertAll(newsBean.result.map { it.toNews() })
                        // 网络请求成功时,中断流
                        NewsFlowWrapper(newsBean.result, true)
                    } else {
                        NewsFlowWrapper(emptyList(), false)
                    }
                } else {
                    throw Exception(newsBean.message)
                }
            }
}

接着用 transformWh架构图模板ile() 改造一下 ViewModel 层的合流:

class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    fun newsFlow(type: Int, count: Int) =
        flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
            .flattenMerge()
            .transformWhile {
                emit(it.news)// 总是直接转发上游数据 ,直到 abort 为 true
                !it.abort
            }
            .map { NewsModel(it, false) }
}

就这样自定义了一个新操作符用于流通信。

在讨论到用 Kotlin Flow 取代 RxJava 的时候,有一种声音说“相比 RxJava,Kotlin Flow 的操作符还很匮乏,有待丰富~”。我倒是线程池创建觉得这是 R线程池创建xJava 的劣势,Kotlin Flow 的优势。RxJava 让人最望而却步的正是因为复杂性,品种繁多的“流”、琳琅满目的操作符、以及 Rx 版的回调地狱。Kotlin Flow 的线程池策略是简单 + 高灵活性。

这样一来,用 Fl电脑性能优化ow 重构的数据链路上,Repository 和 ViewModel 的界限就很清晰了:Repository 提供原始的数据流,以供 View架构图模板Model 用各种自己喜欢的方式进行合流及变换。

异步化

若直接在界面中收集上述新闻流的话,程序会 crash,提示不能在主线程操作数据库。

所有在流中的操作,默认情况下都是执行在主线程池核心参数线程的线程池面试题

将流中的操作异步化架构是什么意思也很简单:

class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    fun newsFlow(type: Int, count: Int) =
        flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
            .flattenMerge()
            .transformWhile {
                emit(it.news)
                !it.abort
            }
            .map { NewsModel(it, false) }
            .flowOn(Dispatchers.IO) // 将所有上游操作都分发到 IO 线程执行
}

在 LiveData 承载数据链路的版本中js性能优化,需自行启动线程池执行数据库操作(网络操作的异步化由OkHttp实现)。

当用 Flow 组织架构师证书数据OKHttp库流和网络流时,只需一个方法就能实现异步化,无疑大大地降低了复杂度

捕获异常

使用catch()可以捕获所有上游抛出的异常:

class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    fun newsFlow(type: Int, count: Int) =
        flowOf(newsRepo.localNewsOneShotFlow, newsRepo.remoteNewsFlow(type, count))
            .flattenMerge()
            .transformWhile {
                emit(it.news)
                !it.abort
            }
            .map { NewsModel(it, false) }
            .flowOn(Dispatchers.IO)
            .catch {
                // 捕获自定义异常并向流发送消息
                if (it is YourException)
                    emit(NewsModel(emptyList(),false,"network error,show old news"))
            }

灵活的是,在线程池拒绝策略捕获异常之后还可以继续向流发送数线程池核心参数据。比如当网络异常时,向界面发送一个带有 errorMessage 的 Model,界面根据此字段决定是否展示错误 to架构a线程池的工作原理st。也可以在这里处理和服务端约定的特殊错误码。

感知生命周期js性能优化

流准备地差不多了,下一步就是让界面收集流并刷新:

class NewsActivity : AppCompatActivity() {
    private val newsViewModel by lazy {
        ViewModelProvider(
            this,
            NewsViewModelFactory(NewsRepo(this))
        )[NewsViewModel::class.java]
    }
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // 收集新闻流并展示
        lifecycleScope.launch { 
            newsViewModel.newsFlow(1, 8).collect { showNews(it) }
        }
    }
    // 这样刷新界面是 MVI 提倡的
    private fun showNews(newsModel: NewsModel) {
        when {
            // 展示 loading
            newsModel.loading -> {
                showLoading()
            }
            newsModel.errorMessage.isEmpty() -> {
                dismissLoading()
                // 将新闻展示在 RecyclerView 上
            }
            // 展示错误提示
            else -> {
                dismissLoading()
                showErrorMessage(newsModel.errorMessage)
            }
        }
    }
}

其中展示/解散 loading 的方法定义如下:

// 展示 loading
fun Activity.showLoading() {
    contentView()?.apply {
        ProgressBar {
            layout_id = "pb"
            layout_width = 50
            layout_height = 50
            layout_gravity = gravity_center
        }
    }
}
// 解散 loading
fun Activity.dismissLoading() {
    val pb = contentView()?.find<ProgressBar>("pb")
    pb?.let { contentView()?.removeView(it) }
}
// 获取 Activity 的 content view
fun Activity.contentView(): FrameLayout? =
    takeIf { !isFinishing && !isDestroyed }?.window?.decorView?.findViewById(android.R.id.content)

展示 loading 即向当前 Activi环路复杂度ty 的 contentView 添加一个子 View,解散 loading 即是移除该子 View。其中使用了 DSL 声明式地构建了界面,详细介绍可以点击A架构图怎么画ndro架构图id性能优化 | 把构建布局用时缩短 20 倍(下)。

这样写会有一个坑,若新闻流因为各种原因迟迟未生成新闻列性能优化的方法表,此时用户切换到另一环复杂度个页面,不久后新闻流有数据了,数据被推到界面,就发生了 crash,因为要刷新的界面已不再前台。

lifecycleSc环复杂度ope

刚才是在lifecycleScope收集新闻流的,它是一个和生命周期对象绑定的协程域:

public val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
    get() = lifecycle.coroutineScope
public val Lifecycle.coroutineScope: LifecycleCoroutineScope
    get() {
        while (true) {
            // 获取现有 lifecycleScope
            val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
            if (existing != null) {
                return existing
            }
            // 若没有现成的,则构建
            val newScope = LifecycleCoroutineScopeImpl(
                this,
                SupervisorJob() + Dispatchers.Main.immediate
            )
            // 并通过 cas + 自旋的方式保证存入 mInternalScopeRef
            if (mInternalScopeRef.compareAndSet(null, newScope)) {
                // 开始观察生命周期变化
                newScope.register()
                return newScope
            }
        }
    }

lifecycl架构图怎么画eScope 是一个LifecycleCoroutineScope实例,并以 Lifecy算法复杂度cle 对象的扩展属性存在。之所以能这样做是因圈复杂度为 Lifecycle 开了后门:

public abstract class Lifecycle {
    // 后门,方便在类的外存取“附加值”
    AtomicReference<Object> mInternalScopeRef = new AtomicReference<>();
}

这种动态时间复杂度为类新增属性的方法,在 Kotlin 源码中很常见,详解可以点击读源码长知识 | 动态扩展类并绑定线程池参数配置生命周期的新方式。

新建 L圈复杂度ifecycleCorout架构ineScope 实例后,会当场调用 register圈复杂度() 方法观察生命周期变化:

internal class LifecycleCoroutineScopeImpl(
    override val lifecycle: Lifecycle,
    override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
    fun register() {
        launch(Dispatchers.Main.immediate) {
            // 开始观察生命周期
            if (lifecycle.currentState >= Lifecycle.State.INITIALIZED) {
                lifecycle.addObserver(this@LifecycleCoroutineScopeImpl)
            } else {
                coroutineContext.cancel()
            }
        }
    }
    override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
        // 当生命周期为 DESTROYED 时,取消观察并取消协程中 job 的执行
        if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
            lifecycle.removeObserver(this)
            coroutineContext.cancel()
        }
    }
}

lifecycleScope.launch() 会立刻启动协程,并在生命周期 DEST空间复杂度ROYED 时架构师和程序员的区别取消协程。

当 Activity 被另一个 Activity 遮挡时并不会 DESTROYED,所以此时若有流数据推过来还是可以更新到界面,并导致 crash。

flowWithLifecycle()

为此官方提供线程池有哪几种flo线程池有哪几种wWithL手机性能优化ifecycle()

public fun <T> Flow<T>.flowWithLifecycle(
    lifecycle: Lifecycle,
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED
): Flow<T> = callbackFlow {
    lifecycle.repeatOnLifecycle(minActiveState) {
        this@flowWithLifecycle.collect {
            send(it)
        }
    }
    close()
}

flo线程池拒绝策略wWithLifecycle() 内部生成了一个架构工程师中间消费者callbackFlow,中间消费者会将上游数据转发给下游,不过是有条件的,只有当生架构图命周期满足要求时才会转发。

其中的 repeatOnLifecycle() 是 Lifecycle 的扩展方法:

public suspend fun Lifecycle.repeatOnLifecycle(
    state: Lifecycle.State,
    block: suspend CoroutineScope.() -> Unit
) { ... }

repea前端性能优化tOnLiOKHttpfecycle() 会在新的协程执行 block,当且仅当生命环形复杂度周期至少达到 state 状态,若生架构图怎么制作命周期未达标,则会取消 block 执行,若再次达标,则再次执行。

让 Flow 感知生命周期的写法如下:只有当生命周期满足要求时架构图,才收集上游并转发线程池的使用给下游,否则取消收集:

class NewsActivity : AppCompatActivity() {
    private val newsViewModel by lazy {
        ViewModelProvider(
            this,
            NewsViewModelFactory(NewsRepo(this))
        )[NewsViewModel::class.java]
    }
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // 以感知生命周期的方式收集新闻流
        lifecycleScope.launch {
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                newsViewModel.newsFlow(1, 8).collect { showNews(it) }
            }
        }
    }
}

嵌套回调出现了,看上去有点复杂。 还好有扩展方法,可以把这些细节隐藏起来:

// 用感知生命周期的方式收集流
fun <T> Flow<T>.collectIn(
    lifecycleOwner: LifecycleOwner,
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
    action: (T) -> Unit
): Job = lifecycleOwner.lifecycleScope.launch {
    flowWithLifecycle(lifecycleOwner.lifecycle, minActiveState).collect(action)
}

然后就可以像这样在界面中收集新闻流:

class NewsActivity : AppCompatActivity() {
    private val newsViewModel by lazy {
        ViewModelProvider(
            this,
            NewsViewModelFactory(NewsRepo(this))
        )[NewsViewModel::class.java]
    }
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        newsViewModel.newsFlow(1, 8).collectIn(this) { showNews(it) }
    }
}

超简洁架构图怎么制作,把 LiveD环复杂度ata 又比下去了~

这个方法需注意调用顺序,当不满足生命周期时,它只js性能优化会取消订阅上游的数据,若下游还有另一线程池的使用流在生成数据,则无法感知生命周架构师期。(封装的collectIn()保证了它是收集数据前的最后一个操作符)

避免重复触发冷流

按照上面的写法,还是有问题。当从新闻界面跳转到另一个界面再返回时,会重新查数据库,架构图怎么画重新请求网络。。。

因为 Repository 提供的数据库和网络流都是“冷流”。冷流只有被收集之后才会生产数据,且冷流是没有地方存数据的,当数据从上游经过若干个中间消费者最后传递给订阅者,数据被展示在架构是什么意思圈复杂度面上,但整个数据链路上没有一个地方把数据存了下了。

又因为使用了repeatOnLifecycle(Lifecycle.State.STARTED),所以从另一个界面返回时,重新订阅了冷流,那它就毫不留情地开始重新生产数据。

SharedFlow

对于这种场景,解决方案是:让冷流共享,即多个订阅也不会触发冷流重新生产数据,最好能让冷流的数据被缓存,这样就能将最新的数据粘性地传递给新订阅者。

Sha线程池原理redFlow由此而生:

class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    fun newsFlow(type: Int, count: Int) =
        flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
            .flattenMerge()
            .transformWhile {
                emit(it.news)
                !it.abort
            }
            .map { NewsModel(it, false) }
            .flowOn(Dispatchers.IO)
            .catch {
                if (it is YourException)
                    emit(NewsModel(emptyList(),false,"network error,show old news"))
            }
            // 将流转换为 SharedFlow
            .shareIn(viewModelScope, SharingStarted.Lazily)
}

使用shareIn()将冷流转换成共享热流:

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,// 启动策略
    replay: Int = 0 // 缓存大小,默认不缓存(非粘性)
): SharedFlow<T> {...}

shareIn 是 Flow 的扩展方法:

  • started参数是启动策略,它决定了上游流的生命周期,Sha架构师证书ringStarted.Lazily适用于当前的场景,即当共享热排序复杂度流有订阅者时才启动上游流,上游流将一直存活着手机性能优化
  • replay环形复杂度数决定了缓存的大线程池有哪几种小,若为1,表示会缓存最新的1个值,当有新订阅者,会将缓存值空间复杂度分发给它,实现粘性效果(同 Live架构图模板Data)。默认为0不缓存。

可以把 SharedFlow 想象成一个中间消费者,它收集上游流的数据并将其推送到热流中,然后将圈复杂度这些数据缓存并分享给所有的下线程池的使用游订阅者。

St架构师和程序员的区别ateFlow

StateFlow 是一个特别的 SharedFlow,环复杂度它是 Kotlin Fl架构ow 中更电脑性能优化像 LiveData 的存在。因为:

  1. Sta架构图怎么画teFl线程池面试题ow 总是会环路复杂度缓存1个最新的数据,上游流产生新数据架构师和程序员的区别后就会覆盖旧值(LiveData 也是)。
  2. StateFlow 持有一个 value 字段,可通过stateFlow.value读取最新值(LiveData 也是)。
  3. StateFlow 是粘性的,会将缓存的最新值分发给新订阅者(LiveData 也是)。
  4. StateFlow 必须有一个初始值(LiveData 不是)。
  5. StateFlow 会过滤重复值,即新值和旧值相同时不更新。(LiveDat线程池的工作原理a 不是)。

可以使用stateIn()重写新闻流:

class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    fun newsFlow(type: Int, count: Int) =
        flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
            .flattenMerge()
            .transformWhile {
                emit(it.news)
                !it.abort
            }
            .map { NewsModel(it, false) }
            .flowOn(Dispatchers.IO)
            .catch {
                if (it is YourException)
                    emit(NewsModel(emptyList(),false,"network error,show old news"))
            }
            // 将流转换为 StateFlow
            .stateIn(viewModelScope, SharingStarted.Lazily, NewsModel(emptyList(), true))
}

stateIn()线程池参数配置 中的第三个参数就前端性能优化是必须有的初始值,当 Rep排序复杂度ository 的原始数据流未生成数据时,初始值就已经推送给了订阅者,界面可以借此展示 loading。

若使用 shareIn排序复杂度(),则可以这样展示 loading:

class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    fun newsFlow(type: Int, count: Int) =
        flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
            .flattenMerge()
            .transformWhile {
                emit(it.news)
                !it.abort
            }
            .map { NewsModel(it, false) }
            .flowOn(Dispatchers.IO)
            .onStart { emit(NewsModel(emptyList(), true)) }// 展示loading
            .catch {
                if (it is YourException)
                    emit(NewsModel(emptyList(),false,"network error,show old news"))
            }
            // 将流转换为 SharedFlow
            .shareIn(viewModelScope, SharingStarted.Lazily)
}

使用onStart(),它会在流被收集时立刻发生一个数据。

到底使用 StateFlow 还是复杂度 SharedFlow?得看场景:

  1. 当需在流以外的架构图怎么制作地方访问流的最新值,则用 Sta安卓性能优化teFlow。
  2. 当需过滤重复值,则用 StateFlow(在 SharedFlow 上用 distinctUntilChangejs性能优化d() 效果相同)。
  3. 在需粘架构师证书性的场景下,则用 StateFlow(将 SharedFlow 的 replay线程池的使用 置为1效果相同)。

我试图找到更多使用 Sta线程池原理teFlow 的理由,但就像你看到的那样,大部分理由都不充分线程池创建。只有第一个场景下,必用 StateFlow性能优化的方法 不可。其他都可用 SharedFlow 代替,而且后者提供了架构师更大的灵活性。

MVI 化

上面的代码已经比较接近 MVI 的模样了。

MVI 有三个关键词:响应式架构图怎么画编程 + 单向数据流 + 唯一可信数据源。

关于 MVI 的剖析可以点击Android 架构最新进展 | MVI = 响应式线程池创建编程 + 单向数据流 + 唯一可信数据源 –

现援引“单向数据流”图片如下:

如何把业务代码越写越复杂?(二)| Flow 替换 LiveData 重构数据链路,更加 MVI

界面产生的数据叫事件(意图)Inten线程池面试题t,它流向 ViewModel,经加工后转环形复杂度换成 状态State供界面刷新。

sealed class FeedsIntent {
    // Feeds 初始化
    data class InitIntent(val type: Int, val count: Int) : FeedsIntent()
    // Feeds 加载更多
    data class MorePageIntent(val timestamp: Long, val count: Int) : FeedsIntent()
    // 删除某个帖子
    data class RemoveIntent(val id: Long) : FeedsIntent()
}

原本界面发起的事件是通过 ViewModel 的一个架构师证书方法调用传递的。为了线程池核心参数使用响应式编程形成数据流空间复杂度,得把函数调用用“数线程池原理据”的形式包装起来。复杂度怎么计算的

事件产生自界面,所以事件流理所当然在界面组织:

class StateFlowActivity : AppCompatActivity() {
    private val refreshLayout: RefreshLayout
    // 在界面层组织事件流
    private val intents by lazy {
        merge(
            // 加载 Feeds 首页事件
            flowOf(FeedsIntent.InitIntent(1, 5))
            // 加载更多 Feeds 事件
            loadMoreFeedsFlow()
        )
    }
    private fun loadMoreFeedsFlow(): Flow<FeedsIntent> = callbackFlow {
        refreshLayout.setOnRefreshListener {
            trySend(FeedsIntent.MorePageIntent)
        }
        awaitClose()
    }
}

上述代码包含了两个事件,分别是加载首前端性能优化页和加载更多,它俩都被组织成流,并使用 m线程池原理erge 进行合流,merge 会将每个 Flow 中时间复杂度的数据合起来并发地转发到一个新的流上。

当流被订阅后,加载首页线程池安卓性能优化事件会立刻产生并无条件的分发给下游,而加载更多事件需等待上拉动作发生时才会生成。

class StateFlowActivity : AppCompatActivity() {
    private val newsViewModel by lazy {
        ViewModelProvider(
            this,
            NewsViewModelFactory(NewsRepo(this))
        )[NewsViewModel::class.java]
    }
    private val intents by lazy {
        merge(
            flowOf(FeedsIntent.InitIntent(1, 5))
            loadMoreFeedsFlow()
        )
    }
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // 订阅事件流,目的将事件传递给 ViewModel
        intents
            .onEach(newsViewModel::send) // .onEach { newsViewModel.send(it) } 效果一样
            .launchIn(lifecycleScope)
    }
}

在 onCreate() 订阅事件流,每产生一个事件都会调用 NewsViewModel.send() 方法将事件传递给 ViewModel。其中::用于将一个方法变前端性能优化为 lambda,方法就可以作为参数传给另一个方排序复杂度法,以简化代排序复杂度码。

NewsViewModel.send() 方法定义如下:

class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    // 用于接收界面事件的共享流
    private val _feedsIntent = MutableSharedFlow<FeedsIntent>()
    // 界面事件唯一入口,向流中发送事件
    fun send(intent: FeedsIntent) {
        viewModelScope.launch { _feedsIntent.emit(intent) }
    }
}

现在界面事件已经以数据流Flow<线程池拒绝策略;FeedsInte架构nt&圈复杂度gt;的方式流入了 ViewModel,下一步就是在流上进行数据变换,即流入的是 Intent,流出的是 State。遂定义一个将F圈复杂度low<FeedsIntent>转化成Flow<NewsSta线程池的七个参数te>的扩展方法:

class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    // 将事件转换成状态(NewsState即是上面的NewsModel,换了个名字而已)
    private fun Flow<FeedsIntent>.toNewsStateFlow(): Flow<NewsState> = merge(
        // 加载首页事件处理
        filterIsInstance<FeedsIntent.InitIntent>()
            .flatMapConcat { it.toFetchInitPageFlow() },
        // 删除帖子事件处理
        filterIsInstance<FeedsIntent.RemoveIntent>()
            .flatMapConcat { ... },
        // 加载更多事件处理
        filterIsInstance<FeedsIntent.MorePageIntent>()
            .flatMapConcat { ... }
    )
}

每一个上游的FeedsIntent都会在这里被转换成一个Flow<NewsState>,就形成了Flow<Flow<NewsState>>这样的空间复杂度结构,然后用 flatMapConcat() 将其展平变成Flow<NewsState&js性能优化gt;

由于有多种事件,架构师遂使用 filterIsInstan算法复杂度ce() 按线程池的七个参数事件类型过滤,实电脑性能优化现了事件分流,即是用流的方式写 if-else。

其中toFetchInitPageFlow()描述了如何将加载首页事件转换成Flow<New环形复杂度sState&架构工程师gt;架构工程师

// NewsViewModel.kt
private fun FeedsIntent.InitIntent.toFetchInitPageFlow() =
    flowOf(
        newsRepo.localNewsOneShotFlow,
        newsRepo.remoteNewsFlow(this.type, this.count)
    )
        .flattenMerge()
        .transformWhile {
            emit(it.news)
            !it.abort
        }
        .map { NewsState(it, false) }
        .onStart { emit(NewsState(emptyList(), true)) }
        .catch {
            if (it is SSLHandshakeException)
                emit(
                    NewsState(
                        emptyList(),
                        false,
                        "network error,show old news"
                    )
                )
        }

转化的方法即是拉取数据库以及网络(就是把之前定义好的数据库网络合流拿过来)。复杂度

是时候把事件流以及它的变换操作合起来了:

class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    // 事件流
    private val _feedsIntent = MutableSharedFlow<FeedsIntent>()
    // 状态流
    val newsState =
        _feedsIntent
            .toNewsStateFlow() // 将事件流转换成状态流
            .flowOn(Dispatchers.IO) // 异步地进行变换操作
            .shareIn(viewModelScope, SharingStarted.Eagerly) // 将流转换成共享流以供界面订阅
}

最后界面观察状态流:

class StateFlowActivity : AppCompatActivity() {
    private val newsViewModel by lazy {
        ViewModelProvider(
            this,
            NewsViewModelFactory(NewsRepo(this))
        )[NewsViewModel::class.java]
    }
    // 组织界面事件
    private val intents by lazy {
        merge(
            flowOf(FeedsIntent.InitIntent(1, 5))
            loadMoreFeedsFlow()
        )
    }
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // 数据流起点:发送事件
        intents
            .onEach(newsViewModel::send)
            .launchIn(lifecycleScope)
        // 数据流终点:消费状态
        newsViewModel.newsState
            .collectIn(this) { showNews(it) }
    }
    // 渲染界面
    private fun showNews(newsModel: NewsState) {
        when {
            newsModel.loading -> {
                showLoading()
            }
            newsModel.errorMessage.isEmpty() -> {
                dismissLoading()
                newsAdapter.news = newsModel.news
                rvNews.adapter = newsAdapter
            }
            else -> {
                dismissLoading()
                tv.text = newsModel.errorMessage
            }
        }
    }
}

LiveData vs Flow

LiveData 面试题库、解答、源码分析 这里详尽地分析了 LiveData 的原理及使用过程中的坑。

对于承载数据来说,Kotlin Flow 相较于 LiveData 只能说有过之而无不及:

  1. LiveData 不能方便地支持异步化。
  2. LiveData 粘性问题的解决方案虽然很多,但用起来都很变扭。
  3. LiveDat性能优化的方法a 可能发生数据丢失的情况。
  4. LiveData 的数据变性能优化换能力远远不如 Flow。
  5. LiveData 多数据源的合流能力远远不如 Flow。

除此之外,Flow 还有一点非常吸引人,那就是 简洁,Flow前端性能优化 可以用及其轻松简单的方式实现复杂的效果,代码的复杂度斗降,可读性斗升。更重要的是,这是大势所趋,还在犹豫什么~

参考

Substit算法复杂度uting LiveData: StateFlow or SharedFlow? | ProAndroidDev

A safer way to collect flows from Android UIs | by Manuel Vivo | Android Developers | Medium

Room Flow. Coroutines support in Room has been… | by Florina Muntenescu | Android Developers | Medium

kotlinx.coroutines线程池拒绝策略/flow.md at …线程池原理

Things to know about Flow’s shareIn and stateIn ope架构师rators | by Manuel Vivo | Android Developers | Medium

Migrating from LiveData to Kotlin’s Flow | by J圈复杂度ose Alcrreca | Android Developers | Medium

Exceptions in Kotlin Fjs性能优化lows. Kotlin Flow can complete normally or… | by架构工程师 Roman安卓性能优化 Elizarov | Medium

Flow.transformWhile operator Issue #空间复杂度2065 Kotlin/kotlinx.cor…

Mer架构图ging kotlin flows – Stack Ov空间复杂度erflow

JakeWharton/retrofit2-kotlin-coroutines-adapter: A Retrofit 2 adapter for Kotlin coroutine’s Deferr性能优化的方法ed type. (github.com)

Model-View-Intent Design Pattern on Android – xizzh复杂度u

推荐阅读

如何“好好利用多态”写出又臭又长又难以维护的代码?

Kotlin 异步 | Flow 限流的应用场景及原理

Kotlin 异步 | Flow 应架构师用场景及原理

Kotlin 协程 | Coroutine环形复杂度Context 为什么要设计成 indexed set?(一)

Kotlin 源码 | 降低线程池的使用代码复杂度的法宝

Kotlin 基础 | 望文生义的 Kotlin 集合操作

Kotlin 基础 | 拒绝语法噪音

Kotlin 基础 | 委托及其应用

Kotlin 进阶 | 不变型、协变、逆变

Kotlin 实战 | 用语法糖干掉形状 xml 文件

我是怎么把业务代码越写越复杂的 | MVP – MVVM – Clean Architecture

Android 架构最新进展 | MVI = 响架构图怎么制作应式编程 + 单向数据流 + 唯一可信数据源

发表评论

提供最优质的资源集合

立即查看 了解详情