面试题 | 等待多个并发结果有哪几种方法?

引子

App 开发中,等候多个异步成果的场景许多见,

比方并发地在后台履行若干个运算,待一切运算履行完毕后归总成果。

比方并发地恳求若干个接口,待一切成果回来后改写界面。

比方计算相册页并发加载 20 张图片的耗时。

其实把若干异步使命串行化软件复杂度级联是什么意思最简略的处理办法,即前一个异步使命履行完软件复杂度毕后再履行下一个。但这样就无法运用多核功能嵌套if函数,履行时刻被拉长,此刻的履行总时长 = 一切使命履行时长的和。

若答应使命并发,则履行总时长 = 履行时刻最长使命的耗时。时刻功能得以优软件复杂度化,但随之而来的一个复杂度是:“怎么等候多个异步成果”。

本文会介绍几种处理方案,并将它们运用到不同的事安全教育日务场景,比对一下哪个方案适用于哪个场景。

等候并发网络恳求

kotlin怎么读尔值

假设有如下两个网络恳求:

// 拉取新闻
fun fetchNews() {
    newsApi.fetchNews().enqueue(object : Callback<List<News>> {
        override fun onFailure(call: Call<List<News>>, t: Throwable) { ... }
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { ... }
    })
}
// 拉取广告
fun fetchAd() {
    newsApi.fetchAd().enqueue(object : Callback<List<Ad>> {
        override fun onFailure(call: Call<List<Ad>>, t: Throwable) { ... }
        override fun onResponse(call: Call<List<Ad>>, response: Response<List<Ad>>) { ... }
    })
}

广告需要按必定规矩插入到新闻列表中。

最简略的做法是,先恳求新闻,待其回来后再恳求广告。显然这会增加嵌套if函数用户等候时刻。而且会写出这样的代码:

// 拉取新闻
fun fetchNews() {
    newsApi.fetchNews().enqueue(object : Callback<News> {
        override fun onFailure(call: Call<News>, t: Throwable) { ... }
        override fun onResponse(call: Call<News>, response: Response<News>) {
                // 拉取广告
                newsApi.fetchAd().enqueue(object : Callback<Ad> {
                    override fun onFailure(call: Call<Ad>, t: Throwable) { ... }
                    override fun onResponse(call: Call<Ad>, response: Response<Ad>) { ... }
                })
        }
    })
}

嵌套回调,若再加一个接口,回调层次就会再加一kotlin匿名函数层,不能忍。 用户和程序员的体会都不好,得想办法处理。

第一个想到的方案级联是布尔值:

var isNewsDone = false
var isAdDone = false
var news = emptyList()
var ads = emptyList()
// 拉取新闻
fun fetchNews() {
    newsApi.fetchNews().enqueue(object : Callback<List<News>> {
        override fun onFailure(call: Call<List<News>>, t: Throwable) { 
            isNewsDone = true
            tryRefresh(news, ad)
        }
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { 
            isNewsDone = true
            news = response.body().result
            tryRefresh(news, ad)
        }
    })
}
// 拉取广告
fun fetchAd() {
    newsApi.fetchAd().enqueue(object : Callback<List<Ad>> {
        override fun onFailure(call: Call<List<Ad>>, t: Throwable) { 
            isAdDone = true
            tryRefresh(news, ad)
        }
        override fun onResponse(call: Call<List<Ad>>, response: Response<List<Ad>>) { 
            isAdDone = true
            ads = response.body().result
            tryRefresh(news, ad)
        }
    })
}
// 尝试改写界面(只有当两个恳求都回来时才改写)
fun tryRefresh(news: List<News>, ads: List<Ad>) {
    if(isNewsDone && isAdDone){ //改写界面 }
}

设置两个布尔值分别对应两个恳求是否回来,而且在每个恳求回来时检测两个布尔值,若都为 true 则进行改写界面。

网络库一般会将恳求成功的回调抛到主线程履行,所以这儿没有线程安全问题。但如果不是网络恳求,而是后台使命,此刻需要将布尔值级联声明为volatile保证其可见性,关于 volatile 更安全教育手抄报详细的解说能够点击面试题 | 徒手写一个非堵塞线程安全行列 ConcurrentLi复杂度nkedQu圈复杂度eue?。

这个方案能处理问题,但只适用于并发恳求数量很少的恳求,由于每个恳求都要声明一个布尔值。而且每增加一个恳求都要修改其余恳求的代码,可维护性差。

CountdownLatch

更好的方案是CountDownLatch,它是java.util.concurrent包下的一个类,用来等候多个异步成果,用法如下:

val countdownLatch = CountDownLatch(2)//初始化,等候2个异步成果
var news = emptyList()
var ads = emptyList()
// 拉取新闻
fun fetchNews() {
    newsApi.fetchNews().enqueue(object : Callback<List<News>> {
        override fun onFailure(call: Call<List<News>>, t: Throwable) { 
            countdownLatch.countDown()
        }
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { 
            news = response.body().result
            countdownLatch.countDown()
        }
    })
}
// 拉取广告
fun fetchAd() {
    newsApi.fetchAd().enqueue(object : Callback<List<Ad>> {
        override fun onFailure(call: Call<List<Ad>>, t: Throwable) { 
            countdownLatch.countDown()
        }
        override fun onResponse(call: Call<List<Ad>>, response: Response<List<Ad>>) { 
            ads = response.body().result
            countdownLatch.countDown()
        }
    })
}
// countdownLatch 在新线程中等候
thread { 
    countdownLatch.await() // 堵塞线程等候两个恳求回来
    liveData.postValue() // 抛数据到主线程刷改写界面
}.start()

CountDownLat嵌套if函数ch 在构算法复杂度造时需传入一个数量嵌套查询sql语句,它的安全教育手抄报语义能够理解为一个计数器。countDown() 将计数器减一,而 awaikotlin匿名函数t() 会堵塞当时线时间复杂度程直到计数器为 0 才被唤醒。

该计数器是一个 int 值,或许被多线程拜访,为了保证线程安全,它被声明为 volatile,而且 countDown() 经过 CAS + 自旋的办法将其减一。

关于 CAS 的介绍能够点击面试题 | 徒手写一个非堵塞线程安全行列 ConcurrentLinkedQueue?。

若新增一个接口,只需要将计数器kotlin什么意思的值加一,并在新接口回来时调用 countDown() 即可,可维护性陡增。

协程

Kotlin 是下降复杂度的大安全工程师师,它关于这个问题的处理方案kotlin是什么能够让代码看上去更简略。

在 Kotlin 的国际里异步操作应该被定义为suspend办法,retrofit 就支撑这样的操作,比方:

interface NewsApi {
    @GET("/xxx")
    suspend fun fetchNews(): List<News>
    @GET("/xxx")
    suspend fun fetchAd(): List<Ad>
}

然后在协程中运用async发动异步使命:

scope.launch {
    // 并发地恳求网络
    val newsDefered = async { fetchNews() }
    val adDefered = async { fetchAd() }
    // 等候两个网络恳求回来
    val news = newsDefered.await()
    val ads = adDefered.await()
    // 改写界面
    refreshUi(news, ads)
}

不管是写起来还是读起来,体会都非常好。由于协嵌套函数程把回调干掉了,逻辑不会跳来跳去。

其间的async()是 CoroutineScope 的扩展办法:

// 发动协程,并回来协程履行成果
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    ...
}

async() 和 launch() 唯一的不同是它的回来值是Defered,用于描kotlin怎么读述协程体履行的成果:

public interface Deferred<out T> : Job {
    // 挂起办法: 等候值的核算,但不会堵塞当时线程,核算完成后康复当时协程履行
    public suspend fun await(): T
}

调用async()发动子协程不会级联是串联还是并联挂起外层协程,而是立即回来一个Defe安全生产法rred目标级联输入,直到调用Deferred.awkotlin和javaait()协程的履行才会被挂起。当协程在多个D嵌套查询sql语句ekotlin和java区别ferred目标级联输入上被挂起时,只有当它们都康复后,协程才持续履行。这样就完成了“等候多个并行的异步成果”。

但这样写会问题:当广告拉取抛出反常时,新闻拉取也会被撤销。

这是协程的一个默许设定,叫结构化并发,即并发是有结构性的kotlin和java

Java 中线程的并发是没有结构的,所以做如下事情很困难:

  1. 完毕一个线程时,怎么一并完毕它一切的子线程?
  2. 当某个子线程抛出反常时,怎么完毕复杂度和它同一层级的兄弟线程?
  3. 父线程怎么等候一切子线程完毕之后才完毕?

之所以会很困难,是由于 Java 中的线程是没有级联级联系的。而 Kotlin 经过协程域 CoroutineScope 以及协程上下文 CoroutineContext 完成级联联系。

在协程中发动的子协程会承继父协程的协程上下文,除了其间的 Job,一个新的 Job 会被创建并归属于父协程的嵌套结构子 Job安全。经过这套机制,协程和子协程软件复杂度之间有了级联联系,kotlin怎么读就能完成结构化并发。(以后会就结构化并发写一个系列,敬请期待~)

关于 CoroutineContext 内部结构的详细分析能够点击Kotlin 协程 | CoroutineContext 为什么要设计成 indexed s嵌套结构et?

但有些事务场景不需要子使命之间彼此关联,比方当时场景,广告加载失利不应该影响新闻的拉取,大不了不展示广告。为此 kotlin 供给了supervisorScope

scope.launch {
    supervisorScope {
        // 并发地恳求网络
        val newsDefered = async { fetchNews() }
        val adDefered = async { fetchAd() }
        // 等候两个网络恳求回来
        val news = newsDefered.await()
        val ads = adDefered.await()
        // 改写界面
        refreshUi(news, ads)
    }
}

supervisorScope 新建一个协程域承继父亲的协程上时间复杂度下文,但会将其间的 Job 重写为SupervisorJob,它的特点便是孩子的失利不会嵌套分类汇总的操作步骤影响父亲,也不会影响兄弟。

现在广告和新闻加载互不影响,各自抛反常都不会影响对方。但就目前的事务场景来说,理想情况是这样的:“广告加载失利不应该影响kotlin匿名函数新闻的加载。但新闻加载失利应该撤销广告的加载(由于此刻广告也没有展示的机会)”

稍改动下安全生产法代码:

scope.launch {
    supervisorScope {
        // 并发地恳求网络
        val adDefered = async { fetchAd() }
        val newsDefered = async { fetchNews() }
        // 当新闻恳求抛反常时,撤销广告恳求
        newsDefered.invokeOnCompletion { throwable ->
            throwable?.let { adDefered.cancel() }
        }
        // 等候新闻
        val news = try {
            newsDefered.await()
        } catch (e: Exception) {
            emptyList()
        }
        // 等候广告
        val ads = try {
            adDefered.await()
        } catch (e: Exception) {
            emptyList()
        }
        // 改写界面
        refreshUi(news, ads)
    }
}

invokeOnCompletion()安全相当安全教育平台作业登录于注册了一个回调,在异步使命完毕时调用,不管是正常完毕还是因反常而完毕。在该回调中判断,若新闻因反常而完毕则撤销广告使命。

由于新闻和广告使命都或许抛出反常,且 async 发动的异步使命是在调用 await() 时才会抛出反常,所以它应该安全教育手抄报包裹在 try-环路复杂度catch 中。Kotlin 中的 try-catc嵌套分类汇总的操作步骤h 是一个表达式,即是有回来值的。这安全个特性让正常和反常情况的值聚合在一个表达式中。

若不运用 try-catch,程序也不会奔溃,由于 supervisorScope 中反常是不会向上传播的,即子协程的反常不会影kotlin和java区别响兄弟和父亲。但这样就少了反常级联反应名词解释情况嵌套if函数的处理。

若现有代码都安全模式是 Callback 办法的,还能不能享用协程的简洁?

能!Kotlikotlin为什么流行不起来n 供给了suspendCoroutine(),专门用嵌套是什么意思于将回调风格的安全代码转换成级联反应 suspend 办法,以拉取新闻为例:

// Callback 办法
fun fetchNews() {
    newsApi.fetchNews().enqueue(object : Callback<List<News>> {
        override fun onFailure(call: Call<List<News>>, t: Throwable) { ... }
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { ... }
    })
}
// suspend 办法
suspend fun fetchNews() = suspendCoroutine<List<News>> { continuation ->
    newsApi.fetchNews().enqueue(object : Callback<List<News>> {
        override fun onFailure(call: Call<List<News>>, t: Throwable) { 
            continuation.resumeWithException(t)
        }
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { 
            continuation.resume(response.body().result)
        }
    }) 
}

其间的Continuation剩余的核算,从办嵌套分类汇总怎么做法上看,它便是一个回调:

public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>) // 开端剩余的核算
}

每个 suspend 办法被编译成 java 之后,都会在原有办法参数表最终增加一个 Co安全教育平台ntinuation 参数,用于表达这个挂起点之后“剩余的核算”,举个比方:

scope.launch {
    fun1() // 普通办法
    suspendFun1() // 挂起办法 
    // --------------------------
    fun2() // 普通办法
    suspendFun2() // 挂起办法
    // --------------------------
}

kotlin怎么读个协程体中有四个办法,其间两个是挂起办法,每个挂起办法都是一道水平的分割线,分割线下方的代码便是当时履行点相关于整个嵌套分类汇总的操作步骤协程体剩余的核算,这“剩余的核环形复杂度算”会被包装成 Continua嵌套虚拟化tion 并作为参数级联删除传入挂起办法。所以上述代码翻译成 ja劳动复杂度va 就类似于嵌套结构

scope.launch {
    fun1()
    suspendFun1(new Continuation() {
        @override
        public void resumeWith(Result<T> result) {
            fun2()
            suspendFun2(new Continuation() {
                @override
                public void resumeWith(Result<T> result) {
                }
            })
        }
    })
}

所以挂起办法无异于 jakotlin匿名函数va 中带回调的办法,它自然不会堵塞当时线程,它仅仅把协程体中剩余的代码级联是什么意思当成回调,该回调会在将来某个时刻点被履行。经过这种办法,挂起办法主动让出了 cpu 履行权。

题外话

从事务上讲,将 C安全工程师allback 办法改造成挂起式能够下降事务复杂度。举个比方:用户能够经过若干动级联效应作触发拉取新闻,比方初次进入新闻页、下拉改写新闻页、上拉加载更多新闻、切换分区。新闻页有一个埋点,当初次展示某分区时,上报此刻的新闻。

若没有 s安全工程师uspend 办法,代码应该这样写:

// NewsViewModel.kt
fun fetchNews(isFirstLoad: Boolean, isChangeType: Boolean) {
    newsApi.fetchNews().enqueue(object : Callback<List<News>> {
        override fun onFailure(call: Call<List<News>>, t: Throwable) { ... }
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { 
            // 将新闻抛给界面改写
            newsLiveData.value = response.body.result
            // 只有当初次加载或切换分区不时才埋点
            if(isFirstLoad || isChangeType) {
                reportNews(response.body.result)
            }
        }
    })
}
// NewsActivity.kt
// 分区切换监听
tab.setOnTabChangeListener { index ->
    newsViewModel.fetchNews(false, true)
}
// 初次加载新闻
fun init() {
    newsViewModel.fetchNews(true, false)
}
// 下拉改写
refreshLayout.setOnRefreshListener {
    newsViewModel.fetchNews(false, false)
}
// 上拉加载更多
refreshLayout.setOnLoadMoreListener {
    newsViewModel.fetchNews(false, false)
}

由于埋点需要带时间复杂度上新闻列表,所以必须在恳求回来之后上报。安全教育平台作业登录不同事务场景嵌套结构的拉取接口是同一个,所以只能在一致的环形复杂度 onResponse() 中分类评论,分类评论依赖于符号位,不得不为 fetchNews(安全教育平台登录入口) 增加两个参数。

如果将级联菜单拉取新闻的接口改成 sukotlin匿名函数spend 办法就能化解这类复杂度:

// NewsViewModel.kt
suspend fun fetchNews() = suspendCoroutine<List<News>> { continuation ->
    newsApi.fetchNews().enqueue(object : Callback<List<News>> {
        override fun onFailure(call: Call<List<News>>, t: Throwable) { 
            continuation.resumeWithException(t)
        }
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { 
            val news = response.body.result
            newsLiveData.value = news
            continuation.resume(news)
        }
    }) 
}
// NewsActivity.kt
fun initNews() {
    scope.launch {
        val news = viewModel.fetchNews()
        reportNews(news)
    }
}
fun changeNewsType() {
    scope.launch {
        val news = viewModel.fetchNews()
        reportNews(news)
    }
}
fun loadMoreNews() {
    scope.launch { viewModel.fetchNews() }
}
fun refreshNews() {
    scope.launch { viewModel.fetchNews() }
}
newsViewModel.newsLiveData.observe {news ->
    showNews(news)
}

一切界面的改写还是走 Livekotlin语言Data,但拉取新闻级联交换机的办法被改造成挂起之后,也会将新闻列表用类似同步的办法回来,所以能够在相关事务点进行独自空间复杂度埋点。

计算相册加载图片耗时

再经过一个更高并发数的场景比对下各个方案代码上的差异,场景如下:

面试题 | 等候多个并发成果有哪几种办法?

测试并发加载 20 张网络图片的总耗时。该场景下现已无法运用布尔值,由于并发数太多。

Countdow级联输入nLatc安全教育平台h

var start = SystemClock.elapsedRealtime()
var imageUrls = listOf(...)
val countdownLatch = CountDownLatch(imageUrls.size)
// 另起线程等候 CountDownLatch 并输出耗时
scope.launch(Dispatchers.IO) {
    countdownLatch.await()
    Log.d( "test", "time-consume=${SystemClock.elapsedRealtime() - start}" )
}
// 遍历 20 张图片 url
imageUrls.forEach { img ->
        ImageView {// 动态构建 ImageView
            layout_width = 100
            layout_height = 100
            Glide.with(this@GlideActivity)
                .load(img)
                .listener(object : RequestListener<Drawable> {
                    override fun onLoadFailed(
                        e: GlideException?,
                        model: Any?,
                        target: Target<Drawable>?,
                        isFirstResource: Boolean
                    ): Boolean {
                        countdownLatch.countDown() // 加载完一张
                        return false
                    }
                    override fun onResourceReady(
                        resource: Drawable?,
                        model: Any?,
                        target: Target<Drawable>?,
                        dataSource: DataSource?,
                        isFirstResource: Boolean
                    ): Boolean {
                        countdownLatch.countDown() // 加载完一张
                        return false
                    }
                })
               .into(this)
        }
}

协程

var imageUrls = listOf(...)
scope.launch {
    val start = SystemClock.elapsedRealtime()
    // 将每个 url 都变换为一个 Defered
    val defers = imageUrls.map { img ->
            val imageView = ImageView {
                layout_width = 100
                layout_height = 100
            }
            async { imageView.loadImage(img) }
    }
    defers.awaitAll()//等候一切的异步使命完毕
    Log.d( "test", "time-consume=${SystemClock.elapsedRealtime() - start}" )
}
// 将 Callback 办法的加载转换为挂起办法
private suspend fun ImageView.loadImage(img: String) = suspendCoroutine<String> { continuation ->
    Glide.with(this@GlideActivity)
        .load(img)
        .listener(object : RequestListener<Drawable> {
            override fun onLoadFailed(
                e: GlideException?,
                model: Any?,
                target: Target<Drawable>?,
                isFirstResource: Boolean
            ): Boolean {
                continuation.resume("")
                return false
            }
            override fun onResourceReady(
                resource: Drawable?,
                model: Any?,
                target: Target<Drawable>?,
                dataSource: DataSource?,
                isFirstResource: Boolean
            ): Boolean {
                continuation.resume("")
                return false
            }
        })
        .into(this)
}

你更喜爱哪种办法?

参阅

Multiple Concurrent Asynchronous calls using Kotlin coroutines (async-await and sukotlin和java区别s安全教育平台pendCoroutine) | by Priya Sind安全教育平台作业登录kar Shah | MindOrks | Medium

我正在参与技能社区创作者签约方案招募活动,点击链接报名投稿。

发表回复

提供最优质的资源集合

立即查看 了解详情