Kotlin 协程是什么?

本文仅仅自己通过研究后,对 Kotlin 协程的了解归纳,如有偏差,还请斧正。

简要归纳:

协程是 Kotlin 提供的一套线程 API 框架,能够很便利的做线程切换。 并且在不必关心线程调度的情况下,能轻松的做并发编程。也能够说协程便是一种并发设计形式。

下面是运用传统线程和协程履行使命:

       Thread{
            //履行耗时使命
        }.start()
        val executors = Executors.newCachedThreadPool()
        executors.execute {
          //履行耗时使命
        }
       GlobalScope.launch(Dispatchers.IO) {
          //履行耗时使命
        }

在实际应用开发中,通常是在主线中去启动子线程履行耗时使命,等耗时使命履行完结,再将成果给主线程,然后改写UI:

       Thread{
            //履行耗时使命
            runOnMainThread { 
                //获取耗时使命成果,改写UI
            }
        }.start()
        val executors = Executors.newCachedThreadPool()
        executors.execute {
            //履行耗时使命
            runOnMainThread {
                //获取耗时使命成果,改写UI
            }
        }
        Observable.unsafeCreate<Unit> {
            //履行耗时使命
        }.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe {
            //获取耗时使命成果,改写UI
        }
        GlobalScope.launch(Dispatchers.Main) {
            val result = withContext(Dispatchers.IO){
                //履行耗时使命
            }
            //直接拿到耗时使命成果,改写UI
            refreshUI(result)
        }

从上面能够看到,运用Java 的 ThreadExecutors 都需求手动去处理线程切换,这样的代码不仅不高雅,并且有一个重要问题,那便是要去处理与生命周期相关的上下文判别,这导致逻辑变复杂,并且容易犯错。

RxJava 是一套高雅的异步处理框架,代码逻辑简化,可读性和可维护性都很高,很好的帮咱们处理线程切换操作。这在 Java 言语环境开发下,是如虎添翼,可是在 Kotlin 言语环境中开发,现在的协程就比 RxJava 更便利,或许说更有优势。

下面看一个 Kotlin 中运用协程的比如:

        GlobalScope.launch(Dispatchers.Main) {
            Log.d("TestCoroutine", "launch start: ${Thread.currentThread()}")
            val numbersTo50Sum = withContext(Dispatchers.IO) {
                //在子线程中履行 1-50 的自然数和
                Log.d("TestCoroutine", "launch:numbersTo50Sum: ${Thread.currentThread()}")
                delay(1000)
                val naturalNumbers = generateSequence(0) { it + 1 }
                val numbersTo50 = naturalNumbers.takeWhile { it <= 50 }
                numbersTo50.sum()
            }
            val numbers50To100Sum = withContext(Dispatchers.IO) {
               //在子线程中履行 51-100 的自然数和
                Log.d("TestCoroutine", "launch:numbers50To100Sum: ${Thread.currentThread()}")
                delay(1000)
                val naturalNumbers = generateSequence(51) { it + 1 }
                val numbers50To100 = naturalNumbers.takeWhile { it in 51..100 }
                numbers50To100.sum()
            }
            val result = numbersTo50Sum + numbers50To100Sum
            Log.d("TestCoroutine", "launch end:result=$result ${Thread.currentThread()}")
        }
        Log.d("TestCoroutine", "Hello World!,${Thread.currentThread()}")
控制台输出成果:
2023-01-02 16:05:45.846 10153-10153/com.wangjiang.example D/TestCoroutine: Hello World!,Thread[main,5,main]
2023-01-02 16:05:48.058 10153-10153/com.wangjiang.example D/TestCoroutine: launch start: Thread[main,5,main]
2023-01-02 16:05:48.059 10153-10322/com.wangjiang.example D/TestCoroutine: launch:numbersTo50Sum: Thread[DefaultDispatcher-worker-1,5,main]
2023-01-02 16:05:49.114 10153-10322/com.wangjiang.example D/TestCoroutine: launch:numbers50To100Sum: Thread[DefaultDispatcher-worker-1,5,main]
2023-01-02 16:05:50.376 10153-10153/com.wangjiang.example D/TestCoroutine: launch end:result=5050 Thread[main,5,main]

在上面的代码中:

  • launch 是一个函数,用于创立协程并将其函数主体的履行分派给相应的调度程序。
  • Dispatchers.MAIN 指示此协程应在为 UI 操作预留的主线程上履行。
  • Dispatchers.IO 指示此协程应在为 I/O 操作预留的线程上履行。
  • withContext(Dispatchers.IO) 将协程的履行操作移至一个 I/O 线程。

从控制台输出成果中,能够看出在核算 1-50 和 51-100 的自然数和的时候,线程是从主线程(Thread[main,5,main])切换到了协程的线程(DefaultDispatcher-worker-1,5,main),这儿核算 1-50 和 51-100 都是同一个子线程。

在这儿有一个重要的现象,代码从逻辑上看起来是同步的,并且启动协程履行使命的时候,没有堵塞主线程持续履行相关操作,并且在协程中的异步使命履行完结之后,又主动切回了主线程。这便是 Kotlin 协程给开发做并发编程带来的优点。这也是有个概念的来历: Kotlin 协程同步非堵塞

同步非堵塞”是真的“同步非堵塞” 吗?下面探求一下其中的猫腻,通过 Android Studio ,查看 .class 文件中的上面一段代码:

      BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getMain(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
         int I$0;
         int label;
         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var10000;
            int numbersTo50Sum;
            label17: {
               Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
               Function2 var10001;
               CoroutineContext var6;
               switch(this.label) {
               case 0:
                  ResultKt.throwOnFailure($result);
                  Log.d("TestCoroutine", "launch start: " + Thread.currentThread());
                  var6 = (CoroutineContext)Dispatchers.getIO();
                  var10001 = (Function2)(new Function2((Continuation)null) {
                     int label;
                     @Nullable
                     public final Object invokeSuspend(@NotNull Object $result) {
                        Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                        switch(this.label) {
                        case 0:
                           ResultKt.throwOnFailure($result);
                           Log.d("TestCoroutine", "launch:numbersTo50Sum: " + Thread.currentThread());
                           this.label = 1;
                           if (DelayKt.delay(1000L, this) == var4) {
                              return var4;
                           }
                           break;
                        case 1:
                           ResultKt.throwOnFailure($result);
                           break;
                        default:
                           throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                        }
                        Sequence naturalNumbers = SequencesKt.generateSequence(Boxing.boxInt(0), (Function1)null.INSTANCE);
                        Sequence numbersTo50 = SequencesKt.takeWhile(naturalNumbers, (Function1)null.INSTANCE);
                        return Boxing.boxInt(SequencesKt.sumOfInt(numbersTo50));
                     }
                     @NotNull
                     public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
                        Intrinsics.checkNotNullParameter(completion, "completion");
                        Function2 var3 = new <anonymous constructor>(completion);
                        return var3;
                     }
                     public final Object invoke(Object var1, Object var2) {
                        return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
                     }
                  });
                  this.label = 1;
                  var10000 = BuildersKt.withContext(var6, var10001, this);
                  if (var10000 == var5) {
                     return var5;
                  }
                  break;
               case 1:
                  ResultKt.throwOnFailure($result);
                  var10000 = $result;
                  break;
               case 2:
                  numbersTo50Sum = this.I$0;
                  ResultKt.throwOnFailure($result);
                  var10000 = $result;
                  break label17;
               default:
                  throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
               }
               numbersTo50Sum = ((Number)var10000).intValue();
               var6 = (CoroutineContext)Dispatchers.getIO();
               var10001 = (Function2)(new Function2((Continuation)null) {
                  int label;
                  @Nullable
                  public final Object invokeSuspend(@NotNull Object $result) {
                     Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                     switch(this.label) {
                     case 0:
                        ResultKt.throwOnFailure($result);
                        Log.d("TestCoroutine", "launch:numbers50To100Sum: " + Thread.currentThread());
                        this.label = 1;
                        if (DelayKt.delay(1000L, this) == var4) {
                           return var4;
                        }
                        break;
                     case 1:
                        ResultKt.throwOnFailure($result);
                        break;
                     default:
                        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                     }
                     Sequence naturalNumbers = SequencesKt.generateSequence(Boxing.boxInt(51), (Function1)null.INSTANCE);
                     Sequence numbers50To100 = SequencesKt.takeWhile(naturalNumbers, (Function1)null.INSTANCE);
                     return Boxing.boxInt(SequencesKt.sumOfInt(numbers50To100));
                  }
                  @NotNull
                  public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
                     Intrinsics.checkNotNullParameter(completion, "completion");
                     Function2 var3 = new <anonymous constructor>(completion);
                     return var3;
                  }
                  public final Object invoke(Object var1, Object var2) {
                     return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
                  }
               });
               this.I$0 = numbersTo50Sum;
               this.label = 2;
               var10000 = BuildersKt.withContext(var6, var10001, this);
               if (var10000 == var5) {
                  return var5;
               }
            }
            int numbers50To100Sum = ((Number)var10000).intValue();
            int result = numbersTo50Sum + numbers50To100Sum;
            Log.d("TestCoroutine", "launch end:result=" + result + ' ' + Thread.currentThread());
            return Unit.INSTANCE;
         }
         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }
         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      }), 2, (Object)null);
      Log.d("TestCoroutine", "Hello World!," + Thread.currentThread());

虽然上面 .class 文件中的代码比较复杂,可是从大体逻辑能够看出,Kotlin 协程也是通过回调接口来实现异步操作的,这也解释了 Kotlin 协程仅仅让代码逻辑是同步非堵塞,可是实际上并没有,仅仅 Kotlin 编译器为代码做了许多事情,这也是说 Kotlin 协程其实便是一套线程 API 框架的原因。

再看一个上面比如的变种:

        GlobalScope.launch(Dispatchers.Main) {
            Log.d("TestCoroutine", "launch start: ${Thread.currentThread()}")
            val numbersTo50Sum = async {
                withContext(Dispatchers.IO) {
                    Log.d("TestCoroutine", "launch:numbersTo50Sum: ${Thread.currentThread()}")
                    delay(2000)
                    val naturalNumbers = generateSequence(0) { it + 1 }
                    val numbersTo50 = naturalNumbers.takeWhile { it <= 50 }
                    numbersTo50.sum()
                }
            }
            val numbers50To100Sum = async {
                withContext(Dispatchers.IO) {
                    Log.d("TestCoroutine", "launch:numbers50To100Sum: ${Thread.currentThread()}")
                    delay(500)
                    val naturalNumbers = generateSequence(51) { it + 1 }
                    val numbers50To100 = naturalNumbers.takeWhile { it in 51..100 }
                    numbers50To100.sum()
                }
            }
            // 核算 1-50 和 51-100 的自然数和是两个并发操作
            val result = numbersTo50Sum.await() + numbers50To100Sum.await()
            Log.d("TestCoroutine", "launch end:result=$result ${Thread.currentThread()}")
        }
        Log.d("TestCoroutine", "Hello World!,${Thread.currentThread()}")
  控制台输出成果:
2023-01-02 16:32:12.637 13303-13303/com.wangjiang.example D/TestCoroutine: Hello World!,Thread[main,5,main]
2023-01-02 16:32:13.120 13303-13303/com.wangjiang.example D/TestCoroutine: launch start: Thread[main,5,main]
2023-01-02 16:32:14.852 13303-13444/com.wangjiang.example D/TestCoroutine: launch:numbersTo50Sum: Thread[DefaultDispatcher-worker-2,5,main]
2023-01-02 16:32:14.853 13303-13443/com.wangjiang.example D/TestCoroutine: launch:numbers50To100Sum: Thread[DefaultDispatcher-worker-1,5,main]
2023-01-02 16:32:17.462 13303-13303/com.wangjiang.example D/TestCoroutine: launch end:result=5050 Thread[main,5,main]

async 创立了一个协程,它让核算 1-50 和 51-100 的自然数和是两个并发操作。上面控制台输出成果能够看到核算 1-50 的自然数和是在线程 Thread[DefaultDispatcher-worker-2,5,main] 中,而核算 51-100 的自然数和是在另一个线程Thread[DefaultDispatcher-worker-1,5,main]中。

从上面的比如,协程在异步操作,也便是线程切换上:主线程启动子线程履行耗时操作,耗时操作履行完结将成果更新到主线程的过程中,代码逻辑简化,可读性高。

suspend 是什么?

suspend 直译便是:挂起

suspend 是 Kotlin 言语中一个 关键字,用于润饰办法,当润饰办法时,表示这个办法只能被 suspend 润饰的办法调用或许在协程中被调用。

下面看一下将上面代码事例拆分成几个 suspend 办法:

    fun getNumbersTo100Sum() {
        GlobalScope.launch(Dispatchers.Main) {
            Log.d("TestCoroutine", "launch start: ${Thread.currentThread()}")
            val result = calcNumbers1To100Sum()
            Log.d("TestCoroutine", "launch end:result=$result ${Thread.currentThread()}")
        }
        Log.d("TestCoroutine", "Hello World!,${Thread.currentThread()}")
    }
    private suspend fun calcNumbers1To100Sum(): Int {
        return calcNumbersTo50Sum() + calcNumbers50To100Sum()
    }
    private suspend fun calcNumbersTo50Sum(): Int {
        return withContext(Dispatchers.IO) {
            Log.d("TestCoroutine", "launch:numbersTo50Sum: ${Thread.currentThread()}")
            delay(1000)
            val naturalNumbers = generateSequence(0) { it + 1 }
            val numbersTo50 = naturalNumbers.takeWhile { it <= 50 }
            numbersTo50.sum()
        }
    }
    private suspend fun calcNumbers50To100Sum(): Int {
        return withContext(Dispatchers.IO) {
            Log.d("TestCoroutine", "launch:numbers50To100Sum: ${Thread.currentThread()}")
            delay(1000)
            val naturalNumbers = generateSequence(51) { it + 1 }
            val numbers50To100 = naturalNumbers.takeWhile { it in 51..100 }
            numbers50To100.sum()
        }
    }
控制台输出成果:
2023-01-03 14:47:57.047 11349-11349/com.wangjiang.example D/TestCoroutine: Hello World!,Thread[main,5,main]
2023-01-03 14:47:59.311 11349-11349/com.wangjiang.example D/TestCoroutine: launch start: Thread[main,5,main]
2023-01-03 14:47:59.312 11349-11537/com.wangjiang.example D/TestCoroutine: launch:numbersTo50Sum: Thread[DefaultDispatcher-worker-3,5,main]
2023-01-03 14:48:00.336 11349-11535/com.wangjiang.example D/TestCoroutine: launch:numbers50To100Sum: Thread[DefaultDispatcher-worker-1,5,main]
2023-01-03 14:48:01.339 11349-11349/com.wangjiang.example D/TestCoroutine: launch end:result=5050 Thread[main,5,main]

suspend 关键字符号办法时,其实是告知 Kotlin 从协程内调用办法。所以这个“挂起”,并不是说办法或函数被挂起,也不是说线程被挂起

假设一个非 suspend 润饰的办法调用 suspend 润饰的办法会怎么样呢?

  private fun calcNumbersTo100Sum(): Int {
        return calcNumbersTo50Sum() + calcNumbers50To100Sum()
    }

此刻,编译器会提示:

Suspend function 'calcNumbersTo50Sum' should be called only from a coroutine or another suspend function
Suspend function 'calcNumbers50To100' should be called only from a coroutine or another suspend function

下面查看 .class 文件中的上面办法 calcNumbers50To100Sum 代码:

   private final Object calcNumbers50To100Sum(Continuation $completion) {
      return BuildersKt.withContext((CoroutineContext)Dispatchers.getIO(), (Function2)(new Function2((Continuation)null) {
         int label;
         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               Log.d("TestCoroutine", "launch:numbers50To100Sum: " + Thread.currentThread());
               this.label = 1;
               if (DelayKt.delay(1000L, this) == var4) {
                  return var4;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
            Sequence naturalNumbers = SequencesKt.generateSequence(Boxing.boxInt(51), (Function1)null.INSTANCE);
            Sequence numbers50To100 = SequencesKt.takeWhile(naturalNumbers, (Function1)null.INSTANCE);
            return Boxing.boxInt(SequencesKt.sumOfInt(numbers50To100));
         }
         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }
         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      }), $completion);
   }

能够看到 private suspend fun calcNumbers50To100Sum() 通过 Kotlin 编译器编译后变成了private final Object calcNumbers50To100Sum(Continuation $completion)suspend 消失了,办法多了一个参数 Continuation $completion,所以 suspend润饰 Kotlin 的办法或函数,编译器会对此办法做特殊处理。

别的,suspend 润饰的办法,也预示着这个办法是耗时办法,告知办法调用者要运用协程。当履行 suspend 办法,也预示着要切换线程,此刻主线程仍然能够持续履行,而协程里面的代码可能被挂起了。

下面再稍为修改 calcNumbers50To100Sum 办法:

   private suspend fun calcNumbers50To100Sum(): Int {
        Log.d("TestCoroutine", "launch:numbers50To100Sum:start: ${Thread.currentThread()}")
        val sum= withContext(Dispatchers.Main) {
            Log.d("TestCoroutine", "launch:numbers50To100Sum: ${Thread.currentThread()}")
            delay(1000)
            val naturalNumbers = generateSequence(51) { it + 1 }
            val numbers50To100 = naturalNumbers.takeWhile { it in 51..100 }
            numbers50To100.sum()
        }
        Log.d("TestCoroutine", "launch:numbers50To100Sum:end: ${Thread.currentThread()}")
        return sum
    }
控制台输出成果:
2023-01-03 15:28:04.349 15131-15131/com.bilibili.studio D/TestCoroutine: Hello World!,Thread[main,5,main]
2023-01-03 15:28:04.803 15131-15131/com.bilibili.studio D/TestCoroutine: launch start: Thread[main,5,main]
2023-01-03 15:28:04.804 15131-15266/com.bilibili.studio D/TestCoroutine: launch:numbersTo50Sum: Thread[DefaultDispatcher-worker-3,5,main]
2023-01-03 15:28:06.695 15131-15131/com.bilibili.studio D/TestCoroutine: launch:numbers50To100Sum:start: Thread[main,5,main]
2023-01-03 15:28:06.696 15131-15131/com.bilibili.studio D/TestCoroutine: launch:numbers50To100Sum: Thread[main,5,main]
2023-01-03 15:28:07.700 15131-15131/com.bilibili.studio D/TestCoroutine: launch:numbers50To100Sum:end: Thread[main,5,main]
2023-01-03 15:28:07.700 15131-15131/com.bilibili.studio D/TestCoroutine: launch end:result=5050 Thread[main,5,main]

主线程不受协程线程的影响。

总结

Kotlin 协程是一套线程 API 框架,在 Kotlin 言语环境下运用它做并发编程比传统 Thread, Executors 和 RxJava 更有优势,代码逻辑上“同步非堵塞“,并且简洁,易阅读和维护。

suspend 是 Kotlin 言语中一个关键字,用于润饰办法,当润饰办法时,该办法只能被 suspend 润饰的办法和协程调用。此刻,也预示着该办法是一个耗时办法,告知调用者需求在协程中运用。

参考文档:

  1. Android 上的 Kotlin 协程
  2. Coroutines guide

下一篇,将研究 Kotlin Flow。