前言

自从jetbrains公司提出Kotlin协程用来处理异步线程问题,而且衍生出来了Flow作为呼应式结构,引来了很多Android开发者的喜爱;而现在比较稳定的呼应式库当属Rxjava,这样以来意图就很明显了,旨在用Kotlin协程来逐渐代替掉Rxjava;

仔细考虑下,真的可以彻底代替掉Rxjava么,它的复杂性和多样化的操作符,而协程的许多API仍然是实验性的,现在为止,跟着kt不断地进行版别迭代,越来越趋于稳定,对此我不能妄下断语;当然Rxjava无疑也是一个十分优秀的结构,值得咱们不断深入考虑,可是跟着协程的呈现,就个人而言我会更喜爱运用协程来作为满意日常开发的异步处理方案。

协程的本质和Rxjava是天壤之别的,所以直接拿它们进行比照是比较棘手的;换一种思路,本文咱们从日常开发中的异步问题动身,分别调查协程与Rxjava是如何供给相应的处理方案,依次来进行比对,讨论下 Kotlin协程是否真的足以取代Rxjava 这个论题吧

流类型的比较

现在咱们来看下Rxjava供给的流类型有哪些,咱们可以运用的根本流类型操作符如下图所示

日常思考,目前Kotlin协程能完全取代Rxjava吗

它们的根本完结鄙人文会提及到,这儿咱们简略来评论下在协程中是怎样界说这些流操作符的

  • Single<T>其实便是一个回来不可空值的suspend函数

  • Maybe<T>恰好相反,是一个回来可空的supspend函数

  • Completable不会发送事情,所以在协程中便是一个不回来任何东西的简略挂起函数

  • 关于ObservableFlowable,两者都可以发射多个事情,不同在于前者是没有背压管理的,后者才有,而他们在协程中咱们可以直接运用Flow来完结,在异步数据流中按顺序宣布值,所以只需求一个回来当时Data数据类型的Flow<T>

    值得留意的是,该函数自身是不需求supsend修饰符的,因为Flow是冷流,在进行搜集\订阅之前是不会发射数据,只要在collect的时分才需求协程作用域中履行。为什么说Flow足以代替ObservableFlowable原因在与它处理背压(backpressure)的办法。这自然而然来源于协程中的设计与理念,不需求一些奇妙设计的处理方案来处理显示背压,Flow中一切Api根本上都带有suspend修复符,它也成为了处理背压的要害先生。其意图便是在不堵塞线程的情况下暂停调用者的履行,因而,当Flow<T>在同一个协程中发射和搜集的时分,假如搜集器跟不上数据流,它可以简略地暂停元素的发射,直到它准备好接纳更多。

流类型比较的根本完结

好的小伙伴们,上文咱们简略用协程写出Rxjava的几个根本流类型,现在让咱们用几个具体的实例来看看他们的不同之处吧

Completable —- 异步使命完结没有成果,或许会抛出错误

Rxjava中,咱们运用Completable.create去创立,里边的CompletableEmitter中有onComplete表明完结的办法和一个onError传递反常的办法,如下代码所示

//completable in Rxjava
  fun completableRequest(): Completable {
    return Completable.create { emitter->
      try {
        emitter.onComplete()
       }catch (e:Exception) {
        emitter.onError(e)
       }
     }
   }
  fun main() {
    completableRequest()
       .subscribe {
        println("I,am done")
        println()
       }
   }

在协程傍边,咱们对应的便是调用一个不回来任何内容的挂起函数(returns Unit),就类似于咱们调用一个一般函数相同

 fun completableCoroutine() = runBlocking {
    try {
      delay(500L)
      println("I am done")
     } catch (e: Exception) {
      println("Got an exception")
     }
   }

留意不要在出产环境代码运用runBlocking,你应该有一个适宜的CoroutineScope,因为是测验代码本文都将运用runBlocking来辅助阐明测验场景

Single —- 必须回来或抛出错误的异步使命

RxJava 中,咱们运用一个Single ,它里边有一个onSuccess传递回来值的办法和一个onError传递反常的办法。

```kotlin
/**
 * Single in RxJava
 */
fun main() {
  singleResult()
     .subscribe(
       { result -> println(result) },
       { println("Got an exception") }
     )
}
fun singleResult(): Single<String> {
  return Single.create { emitter ->
    try {
      // process a request
      emitter.onSuccess("Some result")
     } catch (e: Exception) {
      emitter.onError(e)
     }
   }
​
```

而在协程中,咱们调用一个回来非空值的挂起函数:

/**
 * Single equivalent in coroutines
 */
fun main() = runBlocking {
  try {
    val result = getResult()
    println(result)
   } catch (e: Exception) {
    println("Got an exception")
   }
}
suspend fun getResult(): String {
  // process a request
  delay(100)
  return "Some result"
}
Maybe — 或许回来成果或抛出错误的异步使命

RxJava 中,咱们运用一个Maybe. 它里边有一个onSuccess传递回来值的办法onComplete,一个在没有值的情况下宣布完结信号的办法,以及一个onError传递反常的办法。

/**
 * Maybe in RxJava
 */
fun main() {
  maybeResult()
     .subscribe(
       { result -> println(result) },
       { println("Got an exception") },
       { println("Completed without a value!") }
     )
}
fun maybeResult(): Maybe<String> {
  return Maybe.create { emitter ->
    try {
      // process a request
      if (Random.nextBoolean()) {
        emitter.onSuccess("Some value")
       } else {
        emitter.onComplete()
       }
     } catch (e: Exception) {
      emitter.onError(e)
     }
   }
}

在协程中,咱们调用一个回来可空值得挂起函数

/**
 * Maybe equivalent in coroutines
 */
fun main() = runBlocking {
  try {
    val result = getNullableResult()
    if (result != null) {
      println(result)
     } else {
      println("Completed without a value!")
     }
   } catch (e: Exception) {
    println("Got an exception")
   }
}
suspend fun getNullableResult(): String? {
  // process a request
  delay(100)
  return if (Random.nextBoolean()) {
    "Some value"
   } else {
    null
   }
}
0..N事情的异步流

因为在Rxjava中,FlowableObservable都是归于0..N事情的异步流,可是Observable几乎没有做相应的背压管理,所以这儿咱们首要以Flowable为比如,onNext宣布下一个流值的办法,一个onComplete表明流完结的办法,以及一个onError传递反常的办法。

/**
 * Flowable in RxJava
 */
fun main() {
  flowableValues()
     .subscribe(
       { value -> println(value) },
       { println("Got an exception") },
       { println("I'm done") }
     )
}
fun flowableValues(): Flowable<Int> {
  val flowableEmitter = { emitter: FlowableEmitter<Int> ->
    try {
      for (i in 1..10) {
        emitter.onNext(i)
       }
     } catch (e: Exception) {
      emitter.onError(e)
     } finally {
      emitter.onComplete()
     }
   }
  return Flowable.create(flowableEmitter, BackpressureStrategy.BUFFER)
}

在协程中,咱们只是创立一个Flow就可以完结这个办法

/**
 * Flow in Kotlin
 */
fun main() = runBlocking {
  try {
    eventFlow().collect { value ->
      println(value)
     }
    println("I'm done")
   } catch (e: Exception) {
    println("Got an exception")
   }
}
fun eventFlow() = flow {
  for (i in 1..10) {
    emit(i)
   }
}

在惯用的 Kotlin 中,创立上述流程的办法之一是:fun eventFlow() = (1..10).asFlow()

如上面这些代码所见,咱们根本可以运用协程涵盖Rxjava一切的首要根本用法,此外,协程的设计允许咱们运用一切规范的Kotlin功用编写典型的顺序代码 ,它还消除了对onCompleteonError回调的需求。咱们可以像在一般代码中那样捕获错误或设置协程反常处理程序。而且,考虑到当挂起函数完结时,协程持续按顺序履行,咱们可以鄙人一行持续编写咱们的“完结逻辑”。

值得留意的是,当咱们进行调用collect搜集的时分也是如此,在搜集完一切元素后才会履行下一行代码

eventFlow().collect { value ->
  println(value)
}
println("I'm done")

Flow搜集完一切元素后,才会调用打印I'm done

操作符的比较

总所周知,Rxjava的首要优势在于它拥有十分多的操作符,根本上可以应对日常开发中呈现的各种情况,因为它品种特别繁多又比较难记忆,这儿我只简略举些常见的操作符进行比较

COMPLETABLE,SINGLE, MAYBE

这儿需求着重的是,在RxjavaCompletable,SingleMaybe都有许多相同的操作符,然而在协程中任何类型的操作符其实都是多余的,咱们以Single中的map()简略操作符为例来看下:

/**
 * Maps Single<String> to
 * Single<User> synchronously
 */
fun main() {
  getUsername()
     .map { username ->
      User(username)
     }
     .subscribe(
       { user -> println(user) },
       { println("Got an exception") }
     )
}

map作为Rxjava中最常用的操作符,获取一个值并将其转化为另一个值,可是在协程中咱们不需求.map()操作符就可以完结这种操作

fun main() = runBlocking {
  try {
    val username = getUsername() // suspend fun
    val user = User(username)
    println(user)
   } catch (e: Exception) {
    println("Got an exception")
   }
}

运用suspend挂起函数可以挂起当时函数,当履行完毕后在按顺序履行接下来的代码

Flow操作符与Rxjava操作符

现在让咱们看看Flow中有哪些操作符,它们与Rxjava相比有什么不同,因为篇幅原因,这儿我简略比较下日常开发中最常用的操作符

map()

关于map操作符,Flow中也具有相同的操作符

/**
 * Maps Flow<String> to Flow<User>
 */
fun main() = runBlocking {
  usernameFlow()
     .map { username ->
      User(username)
     }
     .collect { user ->
      println(user)
     }
}

Flow中的map操作符 适当于Rxjava做了必定的简化处理,这是它的一个首要优势,可以看下它的源码

fun <T, R> Flow<T>.map(transform: suspend (T) -> R): Flow<R> = flow {
  collect { value -> emit(transform(value)) }
}

是不是十分简略,只是从头创立一个新的flow,它从从上游搜集值transform并在当时函数应用后宣布这些值;事实上大多数Flow的操作符都是这样作业的,不需求遵循严格的协议;关于大多数应用场景,规范Flow操作符就已经足够了,当然编写自界说操作符也是十分简略容易的;相关于Rxjava,假如想要编写自界说操作符,你必须十分了解Rxjava

Reactive Streams协议

flatmap()

另外,在Rxjava中咱们经常运用的操作符还有flatmap(),同时还有很多种变体,例如.flatMapSingle()flatMapObservable(),flatMapIterable()等,简略来说,在Rxjava中咱们假如需求对一个值进行同步转化,就运用map,进行异步转化的时分就需求运用flatMap();对此,Flow进行同步或者异步转化的时分不需求不同的操作符,只是运用map就足够了,因为它们都有supsend挂起函数进行修饰,不必担心同步性

可以看下在Rxjava中的示例

fun compareFlatMap() {
  getUsernames() //Flowable<String>
     .flatMapSingle { username ->
      getUserFromNetwork(username) // Single<User>
     }
     .subscribe(
       { user -> println(user) },
       { println("Got an exception") }
     )
}

好的,咱们运用Flow来转化下上述的这一段代码,只需求运用map就可以以任何办法进行转化值,如下代码所示:

  runBlocking {
    flow {
      emit(User("Jacky"))
     }.map {
      getUserFromName(it) //suspend
     }.collect {
      println(it)
     }
   }
​
  suspend fun getUserFromName(user: User): String {
    return user.userName
   }

实际上运用Flow中的map操作符,就可以将上游流宣布的值转化为新流,然后将一切流扁平化为一个,这和flatMap的功用几乎可以到达相同的作用

filter()

关于filter操作符,咱们在Rxjava中并没有直接的办法进行异步过滤,这需求咱们自己编写代码来进行过滤判别,如下所示

fun getUsernames(): Flowable<String> {
  val flowableEmitter = { emitter: FlowableEmitter<String> ->
    emitter.onNext("Jacky")
   }
  return Flowable.create(flowableEmitter, BackpressureStrategy.BUFFER)
}
​
fun isCorrectUserName(userName: String): Single<Boolean> {
  return Single.create { emitter ->
    runCatching {
      //姓名判别....
      if (userName.isNotEmpty()) {
        emitter.onSuccess(true)
       } else {
        emitter.onSuccess(false)
       }
     }.onFailure {
      emitter.onError(it)
     }
   }
}
​
fun compareFilter() {
  getUsernames()//Flowable<String>
     .flatMapSingle { userName ->
      isCorrectUserName(userName)
         .flatMap { isCorrect ->
          if (isCorrect) {
            Single.just(userName)
           } else {
            Single.never()
           }
         }
     }.subscribe {
      println(it)
     }
​
}

乍一看,是不是感觉有点麻烦,事实上这的确需求咱们运用些小手法才能到达意图;而在Flow中,咱们可以轻松地根据同步和异步调用过滤流

runBlocking {
    userNameFlow().filter { user ->
      isCorrectName(user.userName)
     }.collect { user->
      println(user)
     }
   }
​
suspend fun isCorrectName(userName: String): Boolean {
  return userName.isNotEmpty()
}
​

结语

因为篇幅原因,Rxjava和协程都是一个十分巨大的考虑论题,它们之间的不同比较可以永久进行下去;事实上,在Kotlin协程被广泛运用之前,Rxjava作为项目中首要的异步处理方案,以至于到现在作业上还有很多项目用着Rxjava, 所以即便切换到Kotlin协程之后,还有适当长一段时间还在用着Rxjava;这并不代表Rxjava不够好,而是协程让代码变得更易读,更易于运用;

暂时先告一段落了,事实上证明协程的确可以满意咱们日常开发的首要需求,下次将会对Rxjava中的背压和之前所评论的Flow背压问题进行比较讨论,还有十分多的东西要学,共勉!!!!

本文首要内容译至 -> www.javaadvent.com/2021/12/are…