聊一聊kotlin协程“初级”api

Kotlin协程已经出来很久了,相信咱们都有不同程度的用上了,由于最近处理的需求有遇到协程相关,因而今天来聊一Kotlin协程的“初级”api,首要初级api并不是它真的很“初级”,而是kotlin协程库中的基础api,咱们一般开发用的,其实都是经过初级api进行封装的高档函数,本章会经过初级api的组合,完结一个自界说的async await 函数(下文也会介绍kotlin 高档api的async await),触及的初级api有startCoroutineContinuationInterceptor

startCoroutine

咱们知道,一个suspend关键字润饰的函数,只能在协程体中履行,伴随着suspend 关键字,kotlin coroutine common库(平台无关)也提供出来一个api,用于直接经过suspend 润饰的函数直接启动一个协程,它便是startCoroutine

@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <R, T> (suspend R.() -> T).startCoroutine(
    作为Receiver
    receiver: R,
    当时协程结束时的回调
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)
}

能够看到,它的Receiver是(suspend R.() -> T),即是一个suspend润饰的函数,那么这个有什么效果呢?咱们知道,在一般函数中无法调起suspend函数(由于一般函数没有隐含的Continuation目标,这儿咱们不在这章讲,能够参阅kotlin协程的资料)

聊一聊Kotlin协程
可是一般函数是能够调起一个以suspend函数作为Receiver的函数(本质也是一个一般函数)

聊一聊Kotlin协程
其间startCoroutine便是其间一个,本质便是咱们直接从外部提供了一个Continuation,一起调用了resume办法,去进入到了协程的国际


startCoroutine完结
createCoroutineUnintercepted(completion).intercepted().resume(Unit)

这个原理咱们就不细讲下去原理,之前也有写过相关的文章。经过这种调用,咱们其实就能够完结在一般的函数环境,敞开一个协程环境(即带有了Continuation),进而调用其他的suspend函数。

ContinuationInterceptor

咱们都知道阻拦器的概念,那么kotlin协程也有,便是ContinuationInterceptor,它提供以AOP的办法,让外部在resume(协程康复)前后进行自界说的阻拦操作,比如高档api中的Diapatcher便是。当然什么是resume协程康复呢,可能读者有点懵,咱们还是以上图中出现的mySuspendFunc举例子

mySuspendFunc是一个suspned函数
::mySuspendFunc.startCoroutine(object : Continuation<Unit> {
    override val context: CoroutineContext
        get() = EmptyCoroutineContext
    override fun resumeWith(result: Result<Unit>) {
    }
})

它其实等价于

val continuation = ::mySuspendFunc.createCoroutine(object :Continuation<Unit>{
    override val context: CoroutineContext
        get() = EmptyCoroutineContext
    override fun resumeWith(result: Result<Unit>) {
        Log.e("hello","当时协程履行完结的回调")
    }
})
continuation.resume(Unit)

startCoroutine办法就相当于创立了一个Continuation目标,并调用了resume。创立Continuation可经过createCoroutine办法,回来一个Continuation,假如咱们不调用resume办法,那么它其实什么也不会履行,只有调用了resume等履行办法之后,才会履行到后续的协程体(这个也是协程内部完结,感兴趣能够看看之前文章)

而咱们的阻拦器,就相当于在continuation.resume前后,能够增加自己的逻辑。咱们能够经过承继ContinuationInterceptor,完结自己的阻拦器逻辑,其间需求复写的办法是interceptContinuation办法,用于回来一个自己界说的Continuation目标,而咱们能够在这个Continuation的resumeWith办法里边(当调用了resume之后,会履行到resumeWith办法),进行前后打印/其他自界说操作(比如切换线程

class ClassInterceptor() :ContinuationInterceptor {
    override val key = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =MyContinuation(continuation)
}
class MyContinuation<T>(private val continuation: Continuation<T>):Continuation<T> by continuation{
    override fun resumeWith(result: Result<T>) {
        Log.e("hello","MyContinuation start ${result.getOrThrow()}")
        continuation.resumeWith(result)
        Log.e("hello","MyContinuation end ")
    }
}

其间的key是ContinuationInterceptor,协程内部会在每次协程康复的时分,经过coroutineContext取出key为ContinuationInterceptor的阻拦器,进行阻拦调用,当然这也是kotlin协程内部完结,这儿简单提一下。

实战

kotlin协程api中的 async await

咱们来看一下kotlon Coroutine 的高档api async await用法

CoroutineScope(Dispatchers.Main).launch {
    val block = async(Dispatchers.IO) {
        // 堵塞的事项
    }
    // 处理其他主线程的业务
    // 此刻有必要需求async的成果时,则可经过await()进行获取
    val result =  block.await()
}

咱们能够经过async办法,在其他线程中处理其他堵塞业务,当主线程有必要要用async的成果的时分,就能够经过await等候,这儿假如成果回来了,则直接获取值,不然就等候async履行完结。这是Coroutine提供给咱们的高档api,能够将使命简单分层而不需求过多的回调处理。

经过startCoroutine与ContinuationInterceptor完结自界说的 async await

咱们能够参阅其他言语的async,或者Dart的异步办法调用,都有相似这种办法进行线程调用

async {
    val result = await {
        suspend 函数
    }
    消费result
}

await在async效果域里边,一起获取到result后再进行消费,async能够直接在一般函数调用,而不需求在协程体内,下面咱们来完结一下这个做法。

首要咱们想要限定await函数只能在async的效果域才干使用,那么首要咱们就要界说出来一个Receiver,咱们能够在Receiver里边界说出自己想要露出的办法

interface AsyncScope {
    fun myFunc(){
    }
}
fun async(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend AsyncScope.() -> Unit
) {
    // 这个有两个效果 1.充任receiver 2.completion,接回收调
    val completion = AsyncStub(context)
    block.startCoroutine(completion, completion)
}
留意这个类,resumeWith 只会跟startCoroutine的这个协程绑定关系,跟await的协程没有关系
class AsyncStub(override val context: CoroutineContext = EmptyCoroutineContext) :
    Continuation<Unit>, AsyncScope {
    override fun resumeWith(result: Result<Unit>) {
        // 这个是干嘛的 == > 完结的回调
        Log.e("hello","AsyncStub resumeWith ${Thread.currentThread().id} ${result.getOrThrow()}")
    }
}

上面咱们界说出来一个async函数,一起界说出来了一个AsyncStub的类,它有两个用途,第一个是为了充任Receiver,用于标准后续的await函数只能在这个Receiver效果域中调用,第二个效果是startCoroutine函数有必要要传入一个参数completion,是为了收到当时协程结束的回调resumeWith中能够得到当时协程体结束回调的信息

await办法里边
suspend fun<T> AsyncScope.await(block:() -> T) = suspendCoroutine<T> {
    // 自界说的Receiver函数
    myFunc()
    Thread{
         切换线程履行await中的办法
        it.resumeWith(Result.success(block()))
    }.start()
}

在await中,其实是一个扩展函数,咱们能够调用任何在AsyncScope中界说的办法,一起这儿咱们模仿了一下线程切换的操作(Dispatcher的完结,这儿不选用Dispatcher便是想让咱们知道其实Dispatcher.IO也是这样完结的),在子线程中调用it.resumeWith(Result.success(block())),用于回来所需求的信息

经过上面定的办法,咱们能够完结

async {
    val result = await {
        suspend 函数
    }
    消费result
}

这种调用办法,可是这儿引来了一个问题,由于咱们在await函数中实践将操作切换到了子线程,咱们想要将消费result的动作切换至主线程怎么办呢?又或者是加入咱们希望获取成果前做一些调整怎么办呢?别急,咱们这儿预留了一个CoroutineContext函数,咱们能够在外部传入一个CoroutineContext

public interface ContinuationInterceptor : CoroutineContext.Element
而CoroutineContext.Element又是承继于CoroutineContext
CoroutineContext.Element:CoroutineContext

而咱们的阻拦器,正是CoroutineContext的子类,咱们把上文的ClassInterceptor修正一下


class ClassInterceptor() : ContinuationInterceptor {
    override val key = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        MyContinuation(continuation)
}
class MyContinuation<T>(private val continuation: Continuation<T>) :
    Continuation<T> by continuation {
    private val handler = Handler(Looper.getMainLooper())
    override fun resumeWith(result: Result<T>) {
        Log.e("hello", "MyContinuation start ${result.getOrThrow()}")
        handler.post {
            continuation.resumeWith(Result.success(自界说内容))
        }
        Log.e("hello", "MyContinuation end ")
    }
}

一起把async默许参数CoroutineContext完结一下即可

fun async(
    context: CoroutineContext = ClassInterceptor(),
    block: suspend AsyncScope.() -> Unit
) {
    // 这个有两个效果 1.充任receiver 2.completion,接回收调
    val completion = AsyncStub(context)
    block.startCoroutine(completion, completion)
}

尔后咱们就能够直接经过,完美完结了一个类js协程的调用,一起具备了主动切换线程的能力

async {
    val result = await {
        test()
    }
    Log.e("hello", "result is $result   ${Looper.myLooper() == Looper.getMainLooper()}")
}

成果

  E  start
  E  MyContinuation start kotlin.Unit
  E  MyContinuation end 
  E  end 
  E  履行堵塞函数 test 1923
  E  MyContinuation start 自界说内容数值
  E  MyContinuation end 
  E  result is 自界说内容的数值   true
  E  AsyncStub resumeWith 2 kotlin.Unit

最终,这儿需求留意的是,为什么阻拦器回调了两次,由于咱们async的时分敞开了一个协程,一起await的时分也敞开了一个,因而是两个。AsyncStub只回调了一次,是由于AsyncStub被当作complete参数传入了async敞开的协程block.startCoroutine,因而仅仅async中的协程结束才会被回调。

聊一聊Kotlin协程

本章代码


class ClassInterceptor() : ContinuationInterceptor {
    override val key = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        MyContinuation(continuation)
}
class MyContinuation<T>(private val continuation: Continuation<T>) :
    Continuation<T> by continuation {
    private val handler = Handler(Looper.getMainLooper())
    override fun resumeWith(result: Result<T>) {
        Log.e("hello", "MyContinuation start ${result.getOrThrow()}")
        handler.post {
            continuation.resumeWith(Result.success(6 as T))
        }
        Log.e("hello", "MyContinuation end ")
    }
}
interface AsyncScope {
    fun myFunc(){
    }
}
fun async(
    context: CoroutineContext = ClassInterceptor(),
    block: suspend AsyncScope.() -> Unit
) {
    // 这个有两个效果 1.充任receiver 2.completion,接回收调
    val completion = AsyncStub(context)
    block.startCoroutine(completion, completion)
}
class AsyncStub(override val context: CoroutineContext = EmptyCoroutineContext) :
    Continuation<Unit>, AsyncScope {
    override fun resumeWith(result: Result<Unit>) {
        // 这个是干嘛的 == > 完结的回调
        Log.e("hello","AsyncStub resumeWith ${Thread.currentThread().id} ${result.getOrThrow()}")
    }
}
suspend fun<T> AsyncScope.await(block:() -> T) = suspendCoroutine<T> {
    myFunc()
    Thread{
        it.resumeWith(Result.success(block()))
    }.start()
}
模仿堵塞
fun test(): Int {
    Thread.sleep(5000)
    Log.e("hello", "履行堵塞函数 test ${Thread.currentThread().id}")
    return 5
}
async {
    val result = await {
        test()
    }
    Log.e("hello", "result is $result   ${Looper.myLooper() == Looper.getMainLooper()}")
}

最终

咱们经过协程的初级api,完结了一个与官方库不同版本的async await,一起也希望经过对初级api的规划,也能对Coroutine官方库的高档api的完结有必定的了解。

本文正在参加「金石方案 . 分割6万现金大奖」