前语

在前面文章咱们介绍过Channel的运用,Channel主要用于协程间的通讯,相比于Flow,它仍是热的,即不管有没有顾客,它都会往Channel中发射数据,即发射端一向会作业,就和一位热情的服务员相同。

那本篇文章,就来解析一波Channel的原理,看看是怎么完结在协程间通讯的,以及探求”热”的原因。

正文

咱们仍是以简略比方下手,来逐渐剖析。

Channel()顶层函数

咱们创立一个没有缓存容量的Channel,如下:

fun main()  {
    val scope = CoroutineScope(Job())
    //创立管道,都运用默许参数
    val channel = Channel<Int>()
    scope.launch {
        //在一个独自的协程傍边发送管道音讯
        repeat(3)  {
            channel.send(it)
            println("Send: $it")
        }
        channel.close()
    }
    scope.launch {
        //在一个独自的协程傍边接纳管道音讯
        repeat(3) {
            val result = channel.receive()
            println("Receive $result")
        }
    }
    println("end")
    Thread.sleep(2000000L)
}
/*
输出成果:
end
Receive 0
Send: 0
Send: 1
Receive 1
Receive 2
Send: 2
*/

在这儿会发现输出成果是替换履行的,这是由于Channelsendreceive是挂起函数,而默许参数创立的Channel是没有缓存容量的,所以调用完send后,假如没有顾客来消费,就会挂起;同理receive也是如此,这些知识点咱们在之前学习Channel文章时,现已说过这些特性了。

再结合挂起函数的实质,这种替换履行的输出成果,我信任都能理解。本篇文章,就来探索一下,Channel到底是怎么完结的。

和咱们之前剖析的CoroutineScopeJob等相似,Channel()也是一个顶层函数充当结构函数运用的事例,该办法代码如下:

//顶层函数充当结构函数运用
public fun <E> Channel(
    //容量
    capacity: Int = RENDEZVOUS,
    //背压战略
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    //元素投递失利回调
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        //依据容量分类
        RENDEZVOUS -> {
            //默许参数下,所创立的Channel
            if (onBufferOverflow == BufferOverflow.SUSPEND)
                RendezvousChannel(onUndeliveredElement)
            else
                //背压战略对错挂起情况下的完结
                ArrayChannel(1, onBufferOverflow, onUndeliveredElement) 
        }
        CONFLATED -> {
            ...
            ConflatedChannel(onUndeliveredElement)
        }
        UNLIMITED -> LinkedListChannel(onUndeliveredElement) 
        //容量为2,默许也是ArrayChannel
        BUFFERED -> ArrayChannel(
            if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1,
            onBufferOverflow, onUndeliveredElement
        )
        //其他自界说容量
        else -> {
            if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
                ConflatedChannel(onUndeliveredElement) 
            else
                ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
        }
    }

由该顶层函数咱们能够看出,依据咱们所传入的参数不同,会创立不同的Channel实例,比方RendezvousChannelArrayChannel等,咱们等会以默许的RendezvousChannel为例来剖析。

这儿有个小知识点,便是onUndeliveredElement参数,这儿运用函数类型,即契合Kotlin的语法规矩,又不用创立剩余接口。

但是(E) -> Unit这种函数类型是否会形成误解呢?由于究竟丢失的元素能够用这个函数类型表明,那我再界说一个抵达元素的回调呢,是不是也能够界说为(E) -> Unit。为了防止形成这种误解,咱们看看是怎么完结的,咱们看看RendezvousChannel的界说:

internal open class RendezvousChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?) : AbstractChannel<E>(onUndeliveredElement)

会发现这儿参数类型居然是OnUndeliveredElement,这就很容易了解了。这儿难道是界说了接口吗?咱们查看一下:

internal typealias OnUndeliveredElement<E> = (E) -> Unit

能够发现这儿仅仅给类型起了一个别名,经过typealias能够给一些容易形成了解紊乱的函数类型起个姓名,这个小知识点,在实际事务中,仍是蛮有用的。

回到主线,咱们来剖析RendezvousChannel的承继联系:

//该类承继至AbstractChannel
internal open class RendezvousChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?) : 
    AbstractChannel<E>(onUndeliveredElement)
//承继至AbstractSendChannel类,完结Channel接口
internal abstract class AbstractChannel<E>(
    onUndeliveredElement: OnUndeliveredElement<E>?
) : AbstractSendChannel<E>(onUndeliveredElement), Channel<E>
//完结SendChannel接口
internal abstract class AbstractSendChannel<E>(
    @JvmField protected val onUndeliveredElement: OnUndeliveredElement<E>?
) : SendChannel<E>
//Channel接口,承继至SendChannel和ReceiveChannel接口
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> 

乍一看,这儿的接口和笼统类界说的有点复杂,但是咱们稍微剖析一下,就会发现这样界说挺合理:

  • 首先便是一个最基础的问题,接口和笼统类的差异?

    从面向对象解读来看,以及运用角度来剖析,接口是倾向于束缚公共的功用,或许给一个类添加额外的功用,某个类完结了接口,它就有了一些额外的能力行为。一起束缚了该类,有这些功用。

    比方这儿的SendChannel接口,就表明一个管道发送方,所以它束缚了一些一致操作:sendtrySend等。

    而笼统类,更多的是公共代码的抽取,或许一个笼统事务的根本完结。比方这儿的AbstractChannel<E>就代表传递E类型的笼统管道完结,在里面完结了大多数的公共函数功用。

  • 这儿Channel接口,承继至SendChannelReceiveChannel,即把发送端和接纳端给分开了,依据接口的界说,Channel便是具有发送端和接纳端的管道。

  • 这儿AbstractChannel代表发送方的笼统完结或许公共完结,结构函数的参数能够接纳发送失利的回调处理。

搞理解这几个笼统类,咱们接下来就很好剖析了。

LockFreeLinkedList简析

首先是AbstractChannel,为什么发送端独自需求抽离出一个笼统类呢?这也是由于,发送端的逻辑比较复杂,一起它还也是Channel是线程安全的核心完结点。

AbstractChannel中,有下面一个变量:

internal abstract class AbstractSendChannel<E>(
    @JvmField protected val onUndeliveredElement: OnUndeliveredElement<E>?
) : SendChannel<E> {
    protected val queue = LockFreeLinkedListHead()
    ...

能够发现这是一个queue,即行列,一起它仍是一个线程安全的行列,从LockFreeLinkedList就能够看出,它是一个没有运用锁LockLinkedList

//Head仅仅一个岗兵节点
public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode()
//线程安全的双向链表
public actual open class LockFreeLinkedListNode {
    private val _next = atomic<Any>(this) // Node | Removed | OpDescriptor
    private val _prev = atomic(this) // Node to the left (cannot be marked as removed)
    private val _removedRef = atomic<Removed?>(null) 

关于这个数据结构,这儿不做过多剖析,等后边有时间能够专门研究一下,这个线程安全的数据结构,有如下特色:

  • 它是一个双向链表结构,按理说双向链表的插入能够从头或许尾都是能够的,但是在这儿,界说了插入只能是尾部,即右边;而获取元素,只能从头部,即左面。
  • 它有一个岗兵节点,岗兵节点是不存储数据的,它的next节点是数据节点的头节点,它的pre节点是数据节点的尾节点,当数据节点为空时,仍旧有岗兵节点。
  • 该数据结构中,保存数据运用了atomic,即CAS技术,这样能够保证这个链表的操作是线程安全的。

到这儿,咱们现已知道了在AbstractChannel中存在一个线程安全的双向行列,至于节点保存的数据是什么,后边待会再剖析。

send流程剖析

咱们以文章开端的测试代码为例,当调用send(0)时,完结办法便是AbstractChannel中:

//发送数据
public final override suspend fun send(element: E) {
    // fast path -- try offer non-blocking
    if (offerInternal(element) === OFFER_SUCCESS) return
    // slow-path does suspend or throws exception
    //挂起函数
    return sendSuspend(element)
}

在该办法中,有2个分支,当offerInternal办法回来成果为OFFER_SUCCESS时,就直接return,否则调用挂起发送函数sendSuspend

看到这个offerInternal(element)办法,我信任肯定会立马和前面所说的行列结合起来,由于offer这个单词就归于行列中的一种术语,表明添加的意思,和add相同,但是回来值不相同。

所以咱们能够大致猜出该办法作用:把element添加到行列中,假如添加成功,则直接回来,否则则挂起。咱们来看看offerInternal()办法:

//测验往buffer中添加元素,或许给顾客添加元素
protected open fun offerInternal(element: E): Any {
    while (true) {
        val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
        val token = receive.tryResumeReceive(element, null)
        if (token != null) {
            assert { token === RESUME_TOKEN }
            receive.completeResumeReceive(element)
            return receive.offerResult
        }
    }
}

该办法会往buffer中或许顾客添加数据,会成功回来数据,或许添加失利。

依据前面咱们设置的是默许Channel,是没有buffer的,且没有调用receive,即也没有顾客,所以这儿会直接回来OFFER_FAILED

所以咱们履行流程跳转到sendSuspend:

//send的挂起函数
private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont ->
    loop@ while (true) {
        //buffer是否已满,本例中,是满的
        if (isFullImpl) {
            //封装为SendElement
            val send = if (onUndeliveredElement == null)
                SendElement(element, cont) else
                SendElementWithUndeliveredHandler(element, cont, onUndeliveredElement)
            //入队    
            val enqueueResult = enqueueSend(send)
            when {
                enqueueResult == null -> { // enqueued successfully
                    cont.removeOnCancellation(send)
                    return@sc
                }
                enqueueResult is Closed<*> -> {
                    cont.helpCloseAndResumeWithSendException(element, enqueueResult)
                    return@sc
                }
                enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead
                enqueueResult is Receive<*> -> {} // try to offer instead
                else -> error("enqueueSend returned $enqueueResult")
            }
        }
       ...
    }
}

这便是send的挂起函数办法完结,剖析:

  • 这儿运用suspendCancellableCoroutineReusable挂起函数,和咱们之前所说的suspendCancellableCoroutine{}高阶函数相同,归于能接触到的最底层完结挂起函数的办法了,其间cont便是用来向挂起函数外部传递数据。

  • 在完结体中,首先判断isFullImpl即是否满了,由于本例测试代码的Channel是没有容量的,所以是满的。

  • 然后把elementcont封装为SendElement对象,这儿的element便是咱们之前所发送的0, 而continuation则代表后续的操作。

    这个SendElement类界说如下:

//发送元素
internal open class SendElement<E>(
   override val pollResult: E,
   @JvmField val cont: CancellableContinuation<Unit>
) : Send() {
   override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
       val token = cont.tryResume(Unit, otherOp?.desc) ?: return null
       assert { token === RESUME_TOKEN } // the only other possible result
       // We can call finishPrepare only after successful tryResume, so that only good affected node is saved
       otherOp?.finishPrepare() // finish preparations
       return RESUME_TOKEN
   }
   override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)
   override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
   override fun toString(): String = "$classSimpleName@$hexAddress($pollResult)"
}

从这儿咱们能够看出,这个Element便是把要发送的元素和Continuation给包装起来,而前面所说的双向链表中的元素也便是这种Element

  • 接着调用enqueueSend办法,把上面这个Element入队,依据该办法的回来值界说,这儿会回来null,表明插入成功。
  • 然后当入队成功时,会调用下面代码块:
enqueueResult == null -> { // enqueued successfully
   cont.removeOnCancellation(send)
   return@sc
}

这儿先是给cont设置了一个监听:

//给CancellableContinuation设置监听
internal fun CancellableContinuation<*>.removeOnCancellation(node: LockFreeLinkedListNode) =
    invokeOnCancellation(handler = RemoveOnCancel(node).asHandler)
//当Continuation被撤销时,节点主动从行列中remove掉
private class RemoveOnCancel(private val node: LockFreeLinkedListNode) : BeforeResumeCancelHandler() {
    override fun invoke(cause: Throwable?) { node.remove() }
    override fun toString() = "RemoveOnCancel[$node]"
}

这个监听作用便是当Continuation履行完结或许被撤销时,该节点能够从双向行列中被移除。

然后便是return@sc,这儿是不是很疑惑呢?在曾经咱们完结挂起函数时,都是经过continuationresume办法来传递挂起函数的值,一起也是康复的过程,这儿居然没有康复。那这个挂起函数该什么时候康复呢?Channel是怎么来康复的呢?

receive流程剖析

咱们接着剖析,其实便是当调用receive()的时候。

receive()的完结,依据前面剖析便是在AbstractChannel中:

//接纳办法的完结
public final override suspend fun receive(): E {
    // fast path -- try poll non-blocking
    val result = pollInternal()
    @Suppress("UNCHECKED_CAST")
    if (result !== POLL_FAILED && result !is Closed<*>) return result as E
    // slow-path does suspend
    return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)
}

这儿同样是相似的逻辑,首先是pollInternal办法,这儿的poll同样和offer相同,归于行列的术语,有轮询的意思,和remove相似的意思,所以该办法便是从行列中取出元素,咱们来看看完结:

//测验从buffer或许发送端中取出元素
protected open fun pollInternal(): Any? {
    while (true) {
        //取出SendElement
        val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
        //注释1
        val token = send.tryResumeSend(null)
        if (token != null) {
            assert { token === RESUME_TOKEN }
            //注释2
            send.completeResumeSend()
            return send.pollResult
        }
        // too late, already cancelled, but we removed it from the queue and need to notify on undelivered element
        send.undeliveredElement()
    }
}

依据前面咱们send的流程,这时能够成功取出咱们之前入队的SendElement对象,然后调用注释2处的send.completeResumeSend()办法:

override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)

这儿会调用continuationcompleteResume办法,这儿就需求结合前面文章所说的原理了,其实这个continuation便是状态机,它会回调CancellableContinuationImpl中的completeResume:

override fun completeResume(token: Any) {
    assert { token === RESUME_TOKEN }
    dispatchResume(resumeMode)
}

而该类的承继联系:

internal open class CancellableContinuationImpl<in T>(
    final override val delegate: Continuation<T>,
    resumeMode: Int
) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame 

这儿相关的类,咱们在线程调度那篇文章中有所提及,这儿的dispatchResume:

private fun dispatchResume(mode: Int) {
    if (tryResume()) return // completed before getResult invocation -- bail out
    // otherwise, getResult has already commenced, i.e. completed later or in other thread
    dispatch(mode)
}
internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
    ...
        if (dispatcher.isDispatchNeeded(context)) {
            dispatcher.dispatch(context, this)
        }
        ...
}

这儿终究会调用dispatcher.dispatch()办法,而这个咱们在之前调度器文章说过,这个最终会在Java线程池上履行,从而开端状态机。

既然该状态机康复了,也便是前面send流程中的挂起也康复了。

send挂起函数康复后,再经过

return send.pollResult

就能够获取咱们之前发送的值0了。

同样的,当pollInternal办法中,无法pollSendElement,则会调用receiveSuspend挂起办法:

private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendCancellableCoroutineReusable sc@ { cont ->
    val receive = if (onUndeliveredElement == null)
        ReceiveElement(cont as CancellableContinuation<Any?>, receiveMode) else
        ReceiveElementWithUndeliveredHandler(cont as CancellableContinuation<Any?>, receiveMode, onUndeliveredElement)
    while (true) {
        if (enqueueReceive(receive)) {
            removeReceiveOnCancel(cont, receive)
            return@sc
        }
        // hm... something is not right. try to poll
        val result = pollInternal()
        if (result is Closed<*>) {
            receive.resumeReceiveClosed(result)
            return@sc
        }
        if (result !== POLL_FAILED) {
            cont.resume(receive.resumeValue(result as E), receive.resumeOnCancellationFun(result as E))
            return@sc
        }
    }
}

send相似,这儿也会封装为ReceiveElement,一起入队到行列中,等待着send办法来康复这个协程。

“热”的探求

剖析完默许的Channel的发送和接纳,咱们来探求一下为什么Channel是热的。

这儿所说的热是由于Channel会在不管有没有接纳者的情况下,都会履行发送端的操作,当战略为Suspend时,它会一向持续到管道容量满。

这儿咱们仍是拿之前文章的比方:

fun main() = runBlocking {
    //创立管道 val channel = produce(capacity = 10) { 
        (1 .. 3).forEach { 
            send(it) 
            logX("Send $it") 
            } 
      } 
logX("end") }

这儿尽管没有调用receive办法,即没有顾客,send仍旧会履行,也便是”热”的。

依据前面所说的Channel()顶层函数源码,这儿容量为10,战略不变,终究会创立出ArrayChannel实例。

该类界说:

internal open class ArrayChannel<E>(
    /**
     * Buffer capacity.
     */
    private val capacity: Int,
    private val onBufferOverflow: BufferOverflow,
    onUndeliveredElement: OnUndeliveredElement<E>?
) : AbstractChannel<E>(onUndeliveredElement)

这儿同样是AbstractChannel的子类,所以send办法仍是仍旧:

public final override suspend fun send(element: E) {
    // fast path -- try offer non-blocking
    if (offerInternal(element) === OFFER_SUCCESS) return
    // slow-path does suspend or throws exception
    return sendSuspend(element)
}

仍是先测验往行列中offer数据,当无法offer时,履行挂起;但是这儿的offerInternal办法在ArrayChannel中被重写了:

//ArrayChannel中的办法
protected override fun offerInternal(element: E): Any {
    //接纳者
    var receive: ReceiveOrClosed<E>? = null
    //当多个线程都一起调用该办法时,为了容量安全,这儿进行加锁
    lock.withLock {
        //元素个数
        val size = this.size.value
        //发送现已关闭,直接回来
        closedForSend?.let { return it }
        // update size before checking queue (!!!)
        //在入队之前,更新管道容量,当元素小于管道容量,回来null
        //只有管道中的元素个数,大于管道容量时,该办法才会return
        //依据战略,会回来挂起或许丢弃或许失利等
        updateBufferSize(size)?.let { return it }
        ...
        //容量没满时,把元素入队
        enqueueElement(size, element)
        //回来入队成功
        return OFFER_SUCCESS
    }
    ...
}

在这儿咱们能够发现,不管有没有接纳者的情况下,当咱们屡次调用send办法,当行列没满时,在这儿都会回来OFFER_SUCCESS,即发送端现已在作业了,所以也便是咱们所说的的作用。

总结

Channel作为线程安全的管道,能够在协程之间通讯,一起能够完结替换履行的作用,经过本篇文章学习,我信任现已知道其原因了。小小总结一下:

  • Channel接口在设计时就十分奇妙,充分利用了接口和笼统,把发送端和接纳端能力分开,这个值得咱们学习。
  • Channel的线程安全原因是发送端保护了一个线程安全的双向行列:LockFreeLinkedList,咱们把值和continutaion封装为SendElement/ReceiveElement保存其间,这样就保证了线程安全。
  • Channel的发送和接纳挂起函数的康复机遇,是经过行列中的continuation操控,在CancellableContinuationImpl进行直接康复,而不是咱们常见的调用resumeWith办法。