本系列专栏/column/7090…

前言

前面文章我们介绍了协程的挂起和恢复,同时说了挂起函数的本质。那协程既然是运行在线程上轻量的Task,那我们是否可以操控这个协程呢,这就是本篇内容重点:Job。

正文

前面文章我们启动协程通过launch的时候,会返回一个协程句柄:Job,那这个Job到底是个什么东西呢。

Job

其实使用async启动协程的时候,返回的是Deferred,我们看一下这个Deferred:

public interface Deferred<out T> : Job {
    public suspend fun await(): T
    public val onAwait: SelectClause1<T>
    public fun getCompleted(): T
    public fun getCompletionExceptionOrNull(): Throwable?
}

会发现这个Deferred的本质就是一个Job对象,只不过多了一个泛型参数T以及几个方法,而其中最重要就是await()方法。

而通过Job对象,我们可以做2件事:

  1. 使用Job监测协程的生命周期状态

  2. 使用Job操控协程

打印协程状态

为了更好的打印出线程的生命周期,我们定义了下面函数:

/**
 * 打印Job的状态信息
 */
fun Job.log() {
    logX("""
        isActive = $isActive
        isCancelled = $isCancelled
        isCompleted = $isCompleted
    """.trimIndent())
}
/**
 * 控制台输出带协程信息的log
 */
fun logX(any: Any?) {
    println("""
================================
$any
Thread:${Thread.currentThread().name}
================================""".trimIndent())
}

这里通过Job的扩展函数,分别打印Job的isActive状态即是否活跃状态,Job的isCancelled状态即是否取消状态和Job的isCompleted状态即是否完成状态,我们来使用下面代码:

fun main() = runBlocking {
    val job = launch {
        delay(1000L)
    }
    job.log()
    job.cancel()
    job.log()
    delay(1500L)
}

上面代码运行结果如下:

协程粉碎计划 | Job的生命周期

可以发现刚开始job是活跃的,但是当调用cancel方法时,job的isCancelled状态返回true,这也符合预期。

监测和操控协程

上面代码中的job.log()其实就是监测协程,而job.cancel()就是在操控协程,而这个操控的动作就是取消协程。

看到这里,我们可以类比Java的线程,我们也可以监控线程的状态,和操作线程。

而除了job.cancel()可以操控协程外,还可以使用job.start()来启动协程任务,这就一般和懒加载配合使用:

fun main() = runBlocking {
    //                  变化在这里
    //                      ↓
    val job = launch(start = CoroutineStart.LAZY) {
        logX("Coroutine start!")
        delay(1000L)
    }
    delay(500L)
    job.log()
    job.start()     // 变化在这里
    job.log()
    delay(500L)
    job.cancel()
    delay(500L)
    job.log()
    delay(2000L)
    logX("Process end!")
}

上面代码我们使用懒加载模式来启动协程,虽然已经delay 500ms了,我们可以预计因为没有调用start(),这个job协程其实是没有启动的,我们来看一下打印:

协程粉碎计划 | Job的生命周期

第一个红框内就是在调用start()前的状态,是不活跃的,当调用start()后,协程#2开始运行,但是运行时间是1000ms,当协程#1中再调用cancel()后,协程#2就变成了取消状态。

协程生命周期

通过上面的实验,我们就可以得出协程的生命周期状态图了,大致如下:

协程粉碎计划 | Job的生命周期

这里可以发现当Job是以懒加载的方式创建的,其状态是New,当以非懒加载的方式创建的,其初始状态是Active。

但是仔细看上面代码会发现一个问题,就是当调用job.canced()后,协程#2的isCancelled和isCompleted都是true,这是因为协程认为由于某种原因取消的协程也是一种结束状态,所以流程图中的New、Active、Completing、Calceling、Calcelled这些状态都素Job内部私有的状态。而对外暴露的isCompleted并不是一一对应的,Job内部的COmpleted和Cancelled都会被认为是isCompleted状态。

join()和invokeOnCompletion

说完协程的取消,我们来看看协程正常执行的流程:

fun main() = runBlocking {
    val job = launch(start = CoroutineStart.LAZY) {
        logX("Coroutine start!")
        delay(1000L)
    }
    delay(500L)
    job.log()
    job.start()
    job.log()
    delay(1100L)    // 1
    job.log()
    delay(2000L)    // 2
    logX("Process end!")
}

这里我们为了能正常让job这个协程执行完毕,我们必须在注释1处delay 1100ms才能正确打印处job的状态,然后再delay 2000ms确保都执行完了,然后打印Process end,但是在实际业务中,我们并不知道job的真实运行时间,假如job的运行时间很长,比如把job中的delay改成10000ms,就会出现Process end已经打印了,但是程序并没有执行完成,因为runBlocking会一直阻塞等待job执行完毕。

这种靠猜的方式肯定不可取,那如果能等待和监听协程的结束事件就好了,这里就可以使用**job.join()和invokeOnCompletion{}**来优化,下面代码:

fun main() = runBlocking {
    val job = launch(start = CoroutineStart.LAZY) {
        logX("Coroutine start!")
        download()
        logX("Coroutine end!")
    }
    delay(500L)
    job.log()
    job.start()
    job.log()
    job.invokeOnCompletion {
        job.log() // 协程结束以后就会调用这里的代码
    }
    job.join()      // 等待协程执行完毕
    logX("Process end!")
}
suspend fun download() {
    // 模拟下载任务
    val time = (Random.nextDouble() * 1000).toLong()
    logX("Delay time: = $time")
    delay(time)
}

上面代码就比较符合我们正常的业务了,当协程完成时会执行回调,我们来看一下打印:

协程粉碎计划 | Job的生命周期

这个结果很nice,不过你或许对job.invokeOnCompletion好理解,就是一个回调,那job.join()有上面魔力可以让Process end这行代码在job执行完打印呢,这就是挂起函数。

job.join()其实就是一个挂起函数,它的作用是挂起当前的程序执行流程,等待job中的协程任务执行完毕后,然后再恢复当前的程序执行流程。仔细思考一下前面文章说的挂起函数本质是CallBack,就很好理解了。

思维模型

学习抽象的东西,思维模型很关键,而这里Job是协程的句柄,这句话也有个模型,就类似于”遥控器和空调的关系”

  • 空调遥控器可以监测空调的运行状态;Job可以监测协程的运行状态。

  • 空调遥控器可以操控空调的运行状态;Job也可以简单操控协程的运行状态。

Deferred

前面文章我们说了如果想获取启动协程的运行结果可以使用async来启动协程,而且本篇文章前面也说了Deferred其实就Job的子类,而上面的job.join()方法来获取协程运行结果其实用Deferred就可以轻易解决,代码如下:

fun main() = runBlocking {
    val deferred = async {
        logX("Coroutine start!")
        delay(1000L)
        logX("Coroutine end!")
        "Coroutine result!"
    }
    val result = deferred.await()
    println("Result = $result")
    logX("Process end!")
}

这里在外面协程没有调用任何delay或者join方法,但是打印如下:

协程粉碎计划 | Job的生命周期

await()方法会阻塞协程运行流程,其实这时你大概就可以猜想出这个await其实就是一个挂起函数

public suspend fun await(): T

这也就意味着它会把后续的代码给保存起来,当deferred获取到结果时,再进行执行。

下面有个动图,来显示了这几行代码的运行:

协程粉碎计划 | Job的生命周期

而这个动图所显示的原理和前面使用job.join()是一模一样的,在协程执行完毕之前,后面的协程代码都会被暂时挂起的,等到协程执行完毕后,才继续执行。

总结

通过对Job我们可以监控协程的状态以及简单的操控协程,让协程和线程一样也有各种状态,同时我们知道了几个重要方法:

  • cancel()可以取消协程。
  • start()可以配合懒加载启动协程。
  • join()和协程join类似,会等待协程执行完成,这是一个挂起函数。
  • invokeOnCompletion是一个回调,在协程执行完成时回调。
  • Deferrd也是一个Job,其await()函数也是挂起函数,会等待协程执行完成。