本文涉及源码基于kotlinx-coroutines-core-jvm:1.7.1

kotlin 协成系列文章:

你真的了解kotlin中协程的suspendCoroutine原理吗?

Kotlin Channel系列(一)之读懂Channel每一行源码

kotlin Flow系列之-SharedFlow源码解析

kotlin Flow系列之-StateFlow源码解析

Kotlin Flow系列之-ChannelFlow源码解析之 -操作符 buffer & fuse & flowOn线程切换

<<重视微信公众号”皮克桃在写代码“学习更多常识>>

目标

经过本文的学习你将掌握以下常识:

  1. 对冷流SafeFlow的根本运用
  2. 从经过flow{}函数创立,到collect整个调用流程
  3. SafeFlow中的Safe是什么意思?这儿的安满是指的什么方面的安全

一,初识SafeFlow

一个简略的Flow实例:

flow<Int> {
    emit(1)
   }.collect{
    println(1)
   }

flow()函数:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

flow()函数接收一个带接收者FlowCollectorsuspend Function. 回来一个SafeFlow

//FlowCollector
public fun interface FlowCollector<in T> {
  public suspend fun emit(value: T)
}
​
//SafeFlow
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
  override suspend fun collectSafely(collector: FlowCollector<T>) {
    collector.block()
   }
}

SafeFlow承继了AbstractFlow:

@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
​
  public final override suspend fun collect(collector: FlowCollector<T>) {
    val safeCollector = SafeCollector(collector, coroutineContext)
    try {
      collectSafely(safeCollector)
     } finally {
      safeCollector.releaseIntercepted()
     }
   }
​
  public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
​

AbstractFlow完成了两个接口FlowCancellableFlow<out T>

public interface Flow<out T> {
  public suspend fun collect(collector: FlowCollector<T>)
}
​
internal interface CancellableFlow<out T> : Flow<T>

他们之间的联系类图:

kotlin Flow系列之 -  冷流SafeFlow源码解析之 - Safe在那里?

Flow的思维很简略,便是一个流,分为上游和下流,上游数据出产者,下流数据顾客。上游生成数据,然后把数据交给下流。假如把Flow比作上游,那么FlowCollector便是下流。Flow自己没有出产数据的功能,真实生成数据的是调用flow()函数时传进去的Function : block: suspend FlowCollector<T>.() -> Unit

调用上游Flowcollect办法传入一个FlowColletor目标,这样就把下流和上游对接上,上游开端作业。

Step1:当咱们调用flow()办法,并传入一个Function后,就回来了一个Flow的子类SafeFlow目标,SafeFlow持有Function的引证,这样Flow就能够经过Function来生成数据了。到此完成了上游的创立。

val upStream = flow{
 emit(1)
} 
​
//上面的代码拆开来看,//1先界说一个suspend 的Function类型,真实出产数据的是它
val realProducer : suspend FlowCollector<Int>.() -> Unit = {flowCollector ->
        //生成数据
        val data = 1
    //把数据交给下流                        
    flowCollector.emit(data)
}
​
//2.调用flow函数,传入function,获得了一个Flow目标。
val upStream = flow<Int>(realProducer)
​

Step2:调用Flowcollect()函数,传入一个FlowCollector打通上下流。

upStream.collect{
 println(it)
}
​
//代码拆开//1.创立一个FlowCollector作为下流顾客
val downStream = object : FlowCollector<Int>{
  override suspend fun emit(value: Int) {
      println(value)
   }
}
//2.经过Flow的collect办法打通上下流
upStream.collect(downStream)

二,SafeFlwo创立以及collect调用流程

经过根本认识中,咱们知道了什么是上游,下流,以及怎么把上下流打通。Flow是一个冷流,也能够叫做懒流,其意思便是Flow只要在运用它的时分才会作业。也便是说只调用了Flow.collect()函数,Flow才开端出产作业(创立Flow时传入的Function才开端履行)。上游为每一个下流顾客单独发动(履行Function的的代码)接着上面的代码。咱们进入源码持续分析:

Step1:flow<Int>(realProducer) 经过flow函数传入名为realProducer的Function创立一个流:

//直接实例化了一个SafeFlow目标回来,并把realProducer作为SafeFlow结构函数参数传入
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

发现flow{}函数回来了一个SafeFlow目标:

//block作为SafeFlow的私有特点。这儿的block 便是是realProducer
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
  override suspend fun collectSafely(collector: FlowCollector<T>) {
    collector.block()
   }
}

在实例化SafeFlow目标时,realProducer作为结构函数参数传给了SafeFlow,被SafeFlow的block特点持有。这样就适当与SafeFlow持有了realProducer的引证。realProducer作为一个Function,具有出产数据的能力,也便是说SafeFlow就能够调用realProducer来出产数据了,这就意味着一个具有出产能力的上游被结构好了。

Step2: 调用upStream.collect(collector),打通上下流,upStream 实际上是一个SafeFlowSafeFlow代码中并没有重写collect()办法,持续往上找。找到父类AbstractFlow。发现其重写了collect()办法:

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
 
    //所以upStream.collect(collector)真实调用到了此处,参数便是咱们传进来的downStream
  public final override suspend fun collect(collector: FlowCollector<T>) {
      //downStream作为结构函数参数被SafeCollector持有
      //这儿为什么要用SafeCollector?在西面”何为Safe“小结里边会解说。
    val safeCollector = SafeCollector(collector, coroutineContext)
    try {
        //step3:接通上下流
      collectSafely(safeCollector)
     } finally {
      safeCollector.releaseIntercepted()
     }
   }
  public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

在collect()办法中,发现传进去的downStream目标并没有直接和上游对接,而是被进行了一次包装或许署理,变成了一个SafeCollector目标。由SafeCollector目标和上游Flow对接。Flow中生成的数据交给了SafeCollector。然后在由SafeCollector交给咱们下流也便是downStreamd目标。SafeCollector是FlowCollector的子类,一起它内部有一个colletor的特点,持有了咱们下流的引证:

internal actual class SafeCollector<T> actual constructor(
  @JvmField internal actual val collector: FlowCollector<T>, //指向下流
  @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
 
 
}

到此真实的下流downStream被包装成了一个SafeCollector目标。

Step3:collectSafely(safeCollector)把包装好的下流目标和上游对接上。collectSafely()办法在SafeFlow中被重写:

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    //collectSafely(safeCollector)办法调用了此处,参数为包装后的下流
  override suspend fun collectSafely(collector: FlowCollector<T>) {
      //此处的block便是realProducer。负责出产数据。
      //此处的colector便是包装后的SafeCollector.
      //开端调用block指向的Function,
    collector.block()//适当于 block(collector)
      
   }
}

包装后的SafeCollector作为参数传给了Flow中真实用来出产数据的Function。这样Function内部就能够经过调用SafeCollector的emit办法把数据交给SafeCollector:

val realProducer : suspend FlowCollector<Int>.() -> Unit = { flowCollector ->
        //生成数据
        val data = 1
    //flowCollector实际上便是SafeCollector                       
    flowCollector.emit(data)
}

Step4:flowCollector.emit(data) 出产数据的Function。把数据生成好后,交给了SafeCollector目标:

internal actual class SafeCollector<T> actual constructor(
  @JvmField internal actual val collector: FlowCollector<T>,//指向真实的下流目标
  @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
 
​
    //flowCollector.emit(data)调用该办法。
  override suspend fun emit(value: T) {
    return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
      try {
          //持续调用,先不用管协第一个参数,
        emit(uCont, value)
       } catch (e: Throwable) {
        lastEmissionContext = DownstreamExceptionContext(e, uCont.context)
        throw e
       }
     }
   } 
    //持续调用了此处
  private fun emit(uCont: Continuation<Unit>, value: T): Any? {
    val currentContext = uCont.context
    currentContext.ensureActive()
    
    val previousContext = lastEmissionContext
    if (previousContext !== currentContext) {
      checkContext(currentContext, previousContext, value)
      lastEmissionContext = currentContext
     }
    completion = uCont
      //适当于调用了下流目标的emit办法。collector指向的便是下流真实的目标。
    val result = emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
   
    if (result != COROUTINE_SUSPENDED) {
      completion = null
     }
    return result
   }
}
//emitFun是FlowCollctor的emit办法的一个引证。
private val emitFun =
  FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

emitFun 指向了真实的下流的emit办法:

val downStream = object : FlowCollector<Int>{
 
    //emitFun履行了该emit办法
  override suspend fun emit(value: Int) {
   //下流消费上游的数据。
      println(value)
   }
}

到此一个最根底版的Flow的上游下流的创立,出产消费过程就完成了。

kotlin Flow系列之 -  冷流SafeFlow源码解析之 - Safe在那里?

三,何为Safe

在官方的文档中对完成Flow子类(cold flow)提出了两点束缚:

kotlin Flow系列之 -  冷流SafeFlow源码解析之 - Safe在那里?

关于这两点的详细解说,感兴趣的能够自行车查阅官网文档:

第一点 Context preservation:

意思便是说上游发送和下流接收(直接下流)只能运行在同一个CoroutineContext(协成上下文)里边。fowOn是仅有能够切换上游下流在不同context的方法。

al myFlow = flow {
  // GlobalScope.launch { // 制止
  // launch(Dispatchers.IO) { // 制止
  // withContext(CoroutineName("myFlow")) { // 制止
  emit(1) // OK
  coroutineScope { //这个适当于创立了一个作用于,相同的supervisorScope也能够
    emit(2) // OK -- 能够,仍然是同一个协成
  }
}
​

简而言之,flow{}里边的emit有必要是次序发送的,不能并发发送。假如想要完成上下流在不同context里边能够运用channelFlow来创立流。

留意,这儿指的上游和下流在同一个协成上下文里边。在kotlin里边,不要把协成和线程混为一谈,比方:

suspend fun main() {
  val flow = flow<Int> {
    log("emit 1")
    emit(1)
    log("emit 2")
    emit(2)
   }
  flow.collect {
    log("receive $it")
    delay(1000)
   }
  Thread.sleep(1000000)
}
​
//
20:35:45:018[ main ] emit 1
20:35:45:051[ main ] receive 1
20:35:46:071[ kotlinx.coroutines.DefaultExecutor ] emit 2
20:35:46:071[ kotlinx.coroutines.DefaultExecutor ] receive 2

上面代码collect 和 emit 运行的协成的context 为 EmptyCouroutinecContext,emit(1) 和emit(2) 不在同一个线程。

第二点 Exception transparency:

当emit或许emitAll产生反常(也便是下流处理数据时产生反常),上游有必要中止持续发送新的数据,并以反常完毕。流的完成永远不要捕获或许处理下流流中产生的反常。 从完成的视点来看,这意味着永远不要将对 emit 和 emitAll 的调用包裹在 try {…} catch {…} 块中。 流中的反常处理应由 catch 运算符履行,并且旨在仅捕获来自上游流的反常,而传递一切下流反常。 类似地,终端运算符如 collect 会抛出在其代码或上游流中产生的任何未处理的反常,例如:

flow { emitData() }
   .map { computeOne(it) }
   .catch { ... } // 能捕获上游 computeOne 和emitData里边的溢出
   .map { computeTwo(it) }
   .collect { process(it) } //抛出未被处理的溢出,比方process 和 computeTow中产生了未捕获的溢出。

在前面咱们知道了flow{}函数回来了一个SafeFlow,一个安全的流,安满是指的哪方面安全?是线程安全吗?答案不是的。这儿的安全有两点,第一是指emit数据发送次序的安全(制止并发emit),第二下流产生了反常,即便在上游try catch了,也不会持续emit。假如你写的代码让emit和collect不是在同一个coroutine里边,那便是不安全的,就会抛反常。比方说:

  1. 次序发送,制止并发送
//加入说代码能够这样写,那就会存在下流先收到数据3。实际上这样写了会抛出一个反常:
val flow<Int>{
 emit(1)
 launch(Dispatchers.IO){
  //TODO 做一些复杂的计算,需求销号必定的时刻
  emit(2)
  }
 emit(3)
}

kotlin Flow系列之 -  冷流SafeFlow源码解析之 - Safe在那里?

这个反常信息里边也说的比较了解

Flow invariant is violated: //违反了流的不可改变的特性
//查看到了emit在其他协成被调用(意思便是不是collect函数地点的协成)
Emission from another coroutine is detected.
//emit被调用在StandaloneCoroutine这个Job里边。emit希望的是调用在BlockingCoroutine这个Job里边。
//launch发动的协成的Job是StandaloneCoroutine
//collec函数运行在runBlockiing代码块里边,因而Job为BlockingCoroutine
//在抛出反常的代码里边是经过 === 来比较两个Job的,所以你就算把collect调用写在相同的launch也是不可的。
//比方上面代码在collect调用的当地也加一个launch。这样Job也是StandaloneCoroutine,可是他们是不是同一个目标
//虽然是同一个类。两个Job的 === 为false.
Child of StandaloneCoroutine{Active}@4dd11322, expected child of BlockingCoroutine{Active}@2302b8ed.
​
​
//FlowCollector不是线程安全的,并且制止并发emit。上面emit(2)和emit(3)就适当于并发履行
FlowCollector is not thread-safe and concurrent emissions are prohibited.
//要想不受此限制,请运用“channelFlow”构建器而不是“flow”。
To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
​
。

Tips: 关于”channelFlow“的相关解说,不出意外下一篇博客便是它了。感兴趣的先重视,第一时刻收到告诉,不过想搞搞懂channelFlow函数的条件是先要搞懂ChannelFlow这个类。关于ChannelFlow的解说,我已经发布了,大家能够自行查看学习,连接在文章顶部。

  1. 下流产生反常,上游try catch后,也不能持续发送。
suspend fun main() {
  val flow = flow<Int> {
    emit(1)
    try {
      emit(2)
     } catch (e: Exception) {
      println(e)
     }
    println("我还能持续履行,可是不能emit")
    emit(3) //会抛出反常
   }
  flow.collect {
    println(it)
    if (it == 2) {
      throw NullPointerException("ddd")
     }
   }
​
  Unit
}

kotlin Flow系列之 -  冷流SafeFlow源码解析之 - Safe在那里?

输出的溢出信息仍是比较直观简单了解的,接不做翻译解说了。

知道了Safe是何物后,那SafeFlow是经过什么来完成的呢?从前第二节的调用流程里边,咱们知道调用SafeFlow的collect函数后,会把咱们下流的FlowCollector包装进一个SafeCollector。这个SafeCollector便是用来确保”safe“的要害。

SafeCollector源码

SafeCollector不光完成了FlowCollector接口,相同它还承继了ContinuationImpl,至于CoroutineStackFrame这个接口里边的函数编译器会完成,用来在debugger时获取一个stack frame,咱们不用去关怀它,对咱们来首CoroutineStackFrame没啥用。

SafeCollector为什么要完成ContinuationImpl呢?

internal actual class SafeCollector<T> actual constructor(
    //这使咱们下流的collector
  @JvmField internal actual val collector: FlowCollector<T>,
    //这是咱们下流调用SafeFlow collect函数地点的协成的context
  @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
​
  override val callerFrame: CoroutineStackFrame? get() = completion as? CoroutineStackFrame
  override fun getStackTraceElement(): StackTraceElement? = null@JvmField //计算context中包括的Element个数,是为了后边比照emit和collect两个调用的context是否一样。
  internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
​
  //用来保存上一次emit的context(每一次emit后就会把context保存在该特点中),或许假如下流处理数据产生反常了
    //就把emit调用的的context包装成一个DownstreamExceptionContext保存在特点中。
  private var lastEmissionContext: CoroutineContext? = null
  // 上游的续体,由于flow{}里边的代码块会封装成一个续体(ContinuationImpl的子类)。当在flow{}里边
    //被挂起后,调用completion.resume后能接着持续下一次emit。
  private var completion: Continuation<Unit>? = null/*
   * This property is accessed in two places:
   * * ContinuationImpl invokes this in its `releaseIntercepted` as `context[ContinuationInterceptor]!!`
   * * When we are within a callee, it is used to create its continuation object with this collector as completion
   */
    /**
     * 这个特点会在两个当地被调用:
     * 1. ContinuationImpl中调用 releaseIntercepted的时分在里边会context[ContinuationInterceptor]!!
     * 2. 当SafeCollector作为一个被调用者时分,这个context作为创立continuation的completion.
     *  比方说下流collect{} 里边调用了其他挂起函数,比方delay。在delay中就会创立一个SafeContinuation,
     *  SafeContinuation的结构函数参数需求调用 SafeCollector.intercepted()。在intercepted()里边
     *  就会调用SafeCollector的context[ContinuationInterceptor].interceptContinuation()。
     *  正由于这样,delay挂起康复后,康复后的代码才干持续履行在本来的协成的context环境里边。
     */
  override val context: CoroutineContext
    get() = lastEmissionContext ?: EmptyCoroutineContext
 
 
    //SafeCollector作为ContinuationImpl的子类,当flow{}里边代用SafeCollector的emit函数,SafeCollector
    //的emit里边持续调用下流collector的emit函数,假如下流collector的emit(也便是colelct代码块)被挂起了,那么
  //flow{}里边的emit也被挂起了(SafeCollector的emit被挂起了),当下流collect{}里边挂起被康复后,履行完colect
 //代码块后就调用SafeCollector的resume函数,在BaseContinuationImpl的resumeWith里边会调用SafeCollector的
 //invokeSupend,在其里边调用completion.resume就能上flow{}里边持续emit。
  override fun invokeSuspend(result: Result<Any?>): Any {
      //假如下流collect{}里边产生了反常(这个反常必定是被try catch了,不然直接就泡出来了,也不会走到这儿)
      //调用了con.rresumeWithException(),这儿con便是SafeCollector,那么result里边就会包括反常信息。
      //result.onFailure里边就会把反常信息和context封装到DownstreamExceptionContext里边然后赋值
      //lastEmissionContext。这样上游再想持续emit时就会抛出反常。在SafeCollector的emit函数里边
      //回去查看lastEmissionContext,假如lastEmissionContext是DownstreamExceptionContext
      //就会抛出反常。
    result.onFailure { lastEmissionContext = DownstreamExceptionContext(it, context) }
      //康复flow{}里边的挂起,持续emit
    completion?.resumeWith(result as Result<Unit>)
      //回来COROUTINE_SUSPENDED是为了让BaseContinuationImpl的resumeWith里边调用了invokeSuspend后
      //就直接return掉。也能够说让是SafeCollector这个ContinuationImpl的子类挂起了,假如不回来
      //回来COROUTINE_SUSPENDED,在BaseContinuationImpl的resumeWith里边就会持续履行releaseIntercepted
      //函数。
      //SafeCollector这个ContinuationImpl的子类,那他也是一个协成。它只要在Flow履行完成后,这个协成
      //才干完毕,调用releaseIntercepted。因而假如此处不回来COROUTINE_SUSPENDED就会导致Flow下一次emit挂起
      //被康复后调用BaseContinuationImpl的resumeWith->releaseIntercepted 或许Flow履行完成后
      //在调用releaseIntercepted的时分就会抛出反常。
   
    return COROUTINE_SUSPENDED
   }
​
  // 每一个协成履行完后都需求releaseIntercepted
  public actual override fun releaseIntercepted() {
    super.releaseIntercepted()
   }
​
    /**
     * emit 的回来值为suspendCoroutineUninterceptedOrReturn的回来值。
     * suspendCoroutineUninterceptedOrReturn的回来值为 try 表达式的回来值。
     *  假如对这个不了解的能够看文章顶部列出的“[你真的了解kotlin中协程的suspendCoroutine原理吗?]
     * 这篇文章。
     * try 里边的emit 便是把把上游来的数据交给下流collector,并把SafeCollector作为协成续体传入到下流。
     * 因而假如下流collect{}里边被挂起了emit就会回来COROUTINE_SUSPENDED,这样flow{} 里边的emit也就被挂起了。
     *
     * 下流collect{}里边挂起康复后,履行完collect{}里边的代码后就会调用SafeCollector的invokeSuspend
     * 让上游flow{}中挂起的emit(也便是SafeCollector的emit)康复,持续履行,持续emit.
     */
  override suspend fun emit(value: T) {
   
    return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
      try {
          //调用内部自己的emit
        emit(uCont, value) 
       } catch (e: Throwable) {
        //假如下流collect{} 里边产生了未捕获的反常,这儿把反常捕获,然后把context 和
          //反常信息封装到DownstreamExceptionContext里边,赋值给lastEmissionContext。
          //这样假如上游还想持续emit,由于lastEmissionContext为DownstreamExceptionContext
          //类型,里边记载了下流抛出了反常,就会抛出反常。
          //这个反常是在exceptionTransparencyViolated里边抛出。
        lastEmissionContext = DownstreamExceptionContext(e, uCont.context)
        throw e
       }
     }
   }
    /**
     * 1,这个emit里边会对上游flow{}里边调用emit时所在的协成的context和下流collect函数所在的协成的context
     * 经过checkContext函数做比较,假如发现emit 和 collect 不是同一个协成,那就会抛出反常。
     * 2,假如下流抛collect{}里边抛出了未捕获的溢出,也会在 checkContext的时分调用
     * exceptionTransparencyViolated抛出反常。阻止上游flow{}里边持续发送。
     */
  private fun emit(uCont: Continuation<Unit>, value: T): Any? {
      //上游flow{}里边调用emit时所在的协成的context
    val currentContext = uCont.context
      //相应协成取消。也便是说在collect上游数据的过程中假如外部协成被取消了。flow 也要中止持续发送。
    currentContext.ensureActive()
   
    // 要么是上一次emit时所在的协成的context,要么是DownstreamExceptionContext(代表上一次emit数据,下流
      // 产生了未捕获的反常)。
    val previousContext = lastEmissionContext
   
    /**
     * 假如两次的context 不持平。比方: 
     * flow {
     *  emit(1)
     *  launch{
     *    emit(2)
     *  }
     * }
     * 两次emit的context 必定就不持平了。(原因1)
     * 或许前一次emit时下流抛反常了,那也不持平。(原因2)
     * 第一次emit lastEmissionContext = null。因而也不持平(原因3)
     */
    if (previousContext !== currentContext) {
        //假如两次不持平,就再做进一步查看,看看是什么原因导致的。
      checkContext(currentContext, previousContext, value)
        //记载本次emit的context,存入lastEmissionContext中。
      lastEmissionContext = currentContext
     }
      //记载flow{}代码块的协成续体,当下流collect遇到挂起,挂起康复履行完collect{}里边的代码后,需求
      //经过completion来让上游flow{}里边持续履行。
    completion = uCont
      //emitFun指的是下流Collector的emit函数。这儿为什么没有直接调用collector.emit。而是经过函数引证的
      //方法,由于直接调用就没办法把协成续体(SafeCollector)传过去了。
    val result = emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    /*
     * If the callee hasn't suspended, that means that it won't (it's forbidden) call 'resumeWith` (-> `invokeSuspend`)
     * and we don't have to retain a strong reference to it to avoid memory leaks.
     */
      //假如下流
    if (result != COROUTINE_SUSPENDED) {
      completion = null
     }
    return result
   }
    //进一步查看是哪一种原因导致两次context不持平的
  private fun checkContext(
    currentContext: CoroutineContext,
    previousContext: CoroutineContext?,
    value: T
   ) { 
      //假如是前一次emit时下流抛出了未捕获的溢出
    if (previousContext is DownstreamExceptionContext) {
        //遵从官网提出的两个束缚中的 Eexception Transparency,
        //exceptionTransparencyViolated里边直接抛出反常,完毕Flow。
      exceptionTransparencyViolated(previousContext, value)
     }
      //假如不是下流抛出反常的,那就遍历context里边的每一个Element进行比较。
    checkContext(currentContext)
   }
    //假如下流抛出反常,持续在emit,exceptionTransparencyViolated里边直接抛出反常,完毕Flow.
  private fun exceptionTransparencyViolated(exception: DownstreamExceptionContext, value: Any?) {
    /*
     * Exception transparency ensures that if a `collect` block or any intermediate operator
     * throws an exception, then no more values will be received by it.
     * For example, the following code:
     * ```
     * 上面的意思便是 Exception transparency 这个束缚确保了假如collect{}代码块或许任何中心操作符里边
     * 抛出了反常,下流就不应该再接收到任何数据。
     * 比方:
     * val flow = flow {
     *   emit(1)
     *   try {
     *      emit(2)
     *   } catch (e: Exception) { //中心操作符或许collect代码块里边抛出了反常
     *      emit(3) //持续发送
     *   }
     * }
     * // Collector
     * flow.collect { value ->
     *   if (value == 2) { //让collect 代码块连抛出一个反常
     *     throw CancellationException("No more elements required, received enough")
     *   } else {
     *     println("Collected $value")
     *   }
     * }
     * ```
     * is expected to print "Collected 1" and then "No more elements required, received enough" 
     * exception,
     * 希望的是打印 “ollected 1” 和 反常信息“and then "No more elements required, received enough" 
     * exception”
     * but if exception transparency wasn't enforced, "Collected 1" and "Collected 3" would be
     * printed instead.
     * 可是假如 exception transparency这个束缚没有被强制履行的话,就会打印 “Collected 1” 和 “Collected 3”
     */
      
      //因而为了确保 exception transparency的束缚被应用,所以当collect里边抛出反常后,再想持续emit就
      //直接抛出反常,终止Flow.
    error("""
         //Flow的exception transparency被违反了
      Flow exception transparency is violated:
           //上一次emit已经抛出了${exception.e},又查看到了持续emit ${value}
        Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
        //在ty 的catch里边emit 是制止的。  假如你想要做到下流抛出反常后,还想持续发送就用
        //catch操作符代替。
        Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
           //更多的解说查阅 catch操作符的文档。
           //catch操作符的源码在Errors.kt文件里边
        For a more detailed explanation, please refer to Flow documentation.
      """.trimIndent())
   }
}
​
​
//crrrentContext 为 本次emit时的context
internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) {
  //fold 遍历context 里边的每一个Element,
    // result为遍历完后count的值,用于和 SafeCollector中的collectContextSize进行比照。假如两个
    //值都不持平,阐明两个context里边的Element个数不一样。那便是产生了context切换。
  val result = currentContext.fold(0) fold@{ count, element ->
    val key = element.key //拿到第一个element的Key。
     
    //用这个Key去collectContext里边取Element。                   
    val collectElement = collectContext[key]
    
     //Job代表了这个协成的completion。每次创立一个协成,都需求为其创立一个Job类型的Continuation
     //作为其completion。key != Job,意思便是其他非Job类型的Element的处理情况。
     //比方CoroutineName,Dispatchers.xxx 都不是Job类型                   
    if (key !== Job) {
        //假如两个element 不持平。回来Int.MIN_VALUE
      return@fold if (element !== collectElement)
          Int.MIN_VALUE
      else
          //持平就加一。
          count + 1
     }
    /**
     * note : 假如不了解上面代码,那就需求你自行去恶补一下协成相关常识。
     */
     
    //假如Element为Job类型                    
    val collectJob = collectElement as Job? // collectContext中的Job
    
    // 拿到currentContext中的Job                   
    val emissionParentJob = (element as Job).transitiveCoroutineParent(collectJob)
    /*
     * Code like
     * ```
     * coroutineScope {
     *   launch {
     *     emit(1)
     *   }
     *
     *   launch {
     *     emit(2)
     *   }
     * }
     * ```
     * 上面的代码写法是错的,会抛反常。
     * is prohibited because 'emit' is not thread-safe by default. Use 'channelFlow' instead if
     * you need concurrent emission
     * or want to switch context dynamically (e.g. with `withContext`).
     *
     * 假如你想要并发emit,意思便是在flow{}中开启子协成emit数据,或许想经过withContext来切换context
     * 那么请运用 channelFlow来代替。
     * Note that collecting from another coroutine is allowed, e.g.:
     * ```
     * //以下下代码出现在flow{}里边是能够的
     * coroutineScope {
     *   //虽然这个Channel的send是在一个新的协成里边
     *   val channel = produce {
     *     collect { value ->
     *       send(value)
     *     }
     *   }
     *   //可是从channe中receive 数据仍是产生在和collect{}同一个协成里边。
     *   //并且coroutineScope也没有切换context。
     *   channel.consumeEach { value ->
     *     emit(value)
     *   }
     * }
     * ```
     * is a completely valid.
     */
     //假如连个Job不持平,必然产生了context切换。直接抛出反常                   
    if (emissionParentJob !== collectJob) {
      error(
        "Flow invariant is violated:\n" +
            "\t\tEmission from another coroutine is detected.\n" +
            "\t\tChild of $emissionParentJob, expected child of $collectJob.\n" +
            "\t\tFlowCollector is not thread-safe and concurrent emissions are           prohibited.\n" +
            "\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
       )
     }
​
    /*
     * If collect job is null (-> EmptyCoroutineContext, probably run from `suspend fun main`),
     * then invariant is maintained(common transitive parent is "null"),
     *  but count check will fail, so just do not count job context element when
     * flow is collected from EmptyCoroutineContext
     * 针对collectJob == null,也便是 collect 函数被调用在”suspend fun main“的情况,
     *  那就直接回来count。不然两个Job持平时对其加一,
     * 由于在计算collectContextSize的时分,collectJob不为空时是算了一个数的。因而这儿也需求加一。
     */
    if (collectJob == null) count else count + 1
   }
    //假如context中element个数不持平,那也阐明产生了context切换。抛出反常
  if (result != collectContextSize的) {
    error(
      "Flow invariant is violated:\n" +
          "\t\tFlow was collected in $collectContext,\n" +
          "\t\tbut emission happened in $currentContext.\n" +
          "\t\tPlease refer to 'flow' documentation or use 'flowOn' instead"
     )
   }
}
​
​
//
internal tailrec fun Job?.transitiveCoroutineParent(collectJob: Job?): Job? {
    //假如currentContext没有Job。回来null。比方currentContext = EmptyCoroutineContext。
  if (this === null) return null
    //假如currentContext和collectContext中的两个Job是同一个目标,直接回来this.
  if (this === collectJob) return this
    //假如两个Job不是同一个目标。一起this 也不是ScopeCoroutine类型,意思便是在flow中产生了context
    //切换,那也就意味着两个Job不持平。
  if (this !is ScopeCoroutine<*>) return this
 
    //假如上面都不是,那就阐明this 是ScopeCoroutine类型,比方在flow{} 里边运用了
    //coroutineScope 或许supervisorScope,这种是支撑的,那就拿到this.parent,也是Job,持续递归调用
    //再进行比较。假如在flow{}里边没有发动新协成或许切换context, tiis作为一个ScopeCoroutine类型,它的
    //parent递归时是能回来一个collectJob持平(===)的目标的。
    //当让this 不是ScopeCoroutinel类型时,
  return parent.transitiveCoroutineParent(collectJob)
}

Flow的catch操作直接从代码里边点进步假如看不到源码,你能够试试双击Shift,查找“Errors.kt”(必定要把后缀带上)

结语:本次发布比较充忙,没有来得及回顾,可能存在错别字和表述不清楚的当地,欢迎大家留言指出,谢谢

kotlin Flow系列之 -  冷流SafeFlow源码解析之 - Safe在那里?