协程系列文章:

  • Kotlin协程之根底运用
  • Kotlin协程之深入了解协程作业原理
  • Kotlin协程之协程撤销与异常处理
  • Kotlin协程之再次读懂协程作业原理-推荐
  • Kotlin协程之Flow作业原理
  • Kotlin协程之一文看懂StateFlow和SharedFlow

概述

Channel 类似于 Java 的 BlockingQueue 堵塞行列,不同之处在于 Channel 供给了挂起的 send() 和 receive() 办法。别的,通道 Channel 可以被封闭标明不再有数据会进入 Channel, 而接纳端可以通过 for 循环取出数据。

Channel 也是出产-顾客形式,这个设计形式在协程中很常见。

基本运用

val channel = Channel<Int>()
// 发送
launch {
    repeat(10) {
        channel.send(it)
        delay(200)
    }
    // 封闭
    channel.close()
}
// 接纳
launch {
    for (i in channel) {
        println("receive: $i")
    }
    // 封闭后
    println("closed")
}

produce 和 actor

produce 和 actor 是 Kotlin 供给的结构出产者与顾客的便捷办法。

其间 produce 办法用来发动一个出产者协程,并回来一个 ReceiveChannel 在其他协程中接纳数据:

// produce 出产协程
val receiveChannel = CoroutineScope(Dispatchers.IO).produce {
    repeat(10) {
        send(it)
        delay(200)
    }
}
// 接纳者 1
launch {
    for (i in receiveChannel) {
        println("receive-1: $i")
    }
}
// 接纳者 2
launch {
    for (i in receiveChannel) {
        println("receive-2: $i")
    }
}

输出:

2022-11-29 10:48:03.045 I/System.out: receive-1: 0
2022-11-29 10:48:03.250 I/System.out: receive-1: 1
2022-11-29 10:48:03.451 I/System.out: receive-2: 2
2022-11-29 10:48:03.654 I/System.out: receive-1: 3
2022-11-29 10:48:03.856 I/System.out: receive-2: 4
2022-11-29 10:48:04.059 I/System.out: receive-1: 5
2022-11-29 10:48:04.262 I/System.out: receive-2: 6
2022-11-29 10:48:04.466 I/System.out: receive-1: 7
2022-11-29 10:48:04.669 I/System.out: receive-2: 8
2022-11-29 10:48:04.871 I/System.out: receive-1: 9

反之也可以用 actor 来发动一个消费协程:

// actor 消费协程
val sendChannel = CoroutineScope(Dispatchers.IO).actor<Int> {
    while (true) {
        println("receive: ${receive()}")
    }
}
// 发送者 1
launch {
    repeat(10) {
        sendChannel.send(it)
        delay(200)
    }
}
// 发送者 2
launch {
    repeat(10) {
        sendChannel.send(it * it)
        delay(200)
    }
}

可以看出 produce 创立的是一个单出产者——多顾客的模型,而 actor 创立的是一个单顾客--多出产者的模型

不过这些相关的 API 要不就是 ExperimentalCoroutinesApi 实验性标记的,要不就是 ObsoleteCoroutinesApi 抛弃标记的,个人感觉暂时没必要运用它们。

Channel 是公正的

发送和接纳操作是公正的,它们遵守先进先出原则。官方也给了一个比如:

data class Ball(var hits: Int)
fun main() = runBlocking {
    val table = Channel<Ball>() // 一个同享的 table(桌子)
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // 率先打出第一个球
    delay(1000) // 延迟 1 秒钟
    coroutineContext.cancelChildren() // 游戏完毕,撤销它们
}
suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // 在循环中接纳球
        ball.hits++
        println("$name $ball")
        delay(300) // 等候一段时间
        table.send(ball) // 将球发送回去
    }
}

由于 ping 协程首要被发动,所以它首要接纳到了球,接着即使 ping 协程在将球发送后会当即开端接纳,可是球还是被 pong 协程接纳了,因为它一直在等候着接纳球:

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

带缓冲的 Channel

前面现已说过 Channel 实践上是一个行列,那它当然也存在一个缓存区以及缓存满后的战略(处理背压之类的问题),在创立 Channel 时可以指定两个相关的参数:

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>

这里的 Channel() 其实并不是结构函数,而是一个顶层函数,它内部会依据不同的入参来创立不同类型的 Channel 实例。其参数含义如下:

  • capacity: Channel 缓存区的容量,默许为 RENDEZVOUS = 0
  • onBufferOverflow: 缓冲区满后发送端的处理战略,默许挂起。当顾客处理数据比出产者出产数据慢时,新出产的数据会存入缓存区,当缓存区满后,出产者再调用 send() 办法会挂起,等候顾客处理数据。

看个小栗子:

// 创立缓存区巨细为 4 的 Channel
val channel = Channel<Int>(4)
// 发送
launch {
    repeat(10) {
        channel.send(it)
        println("send: $it")
        delay(200)
    }
}
// 接纳
launch {
    val channel = viewModel.channel
    for (i in channel) {
        println("receive: $i")
        delay(1000)
    }
}

输出结果:

2022-11-28 17:16:47.905 I/System.out: send: 0
2022-11-28 17:16:47.907 I/System.out: receive: 0
2022-11-28 17:16:48.107 I/System.out: send: 1
2022-11-28 17:16:48.310 I/System.out: send: 2
2022-11-28 17:16:48.512 I/System.out: send: 3
2022-11-28 17:16:48.715 I/System.out: send: 4
2022-11-28 17:16:48.910 I/System.out: receive: 1
2022-11-28 17:16:48.916 I/System.out: send: 5 // 缓存区满了, receive 后才干持续发送
2022-11-28 17:16:49.913 I/System.out: receive: 2
2022-11-28 17:16:49.914 I/System.out: send: 6
2022-11-28 17:16:50.917 I/System.out: receive: 3
2022-11-28 17:16:50.917 I/System.out: send: 7
2022-11-28 17:16:51.920 I/System.out: receive: 4
2022-11-28 17:16:51.920 I/System.out: send: 8
2022-11-28 17:16:52.923 I/System.out: receive: 5
2022-11-28 17:16:52.923 I/System.out: send: 9
2022-11-28 17:16:53.925 I/System.out: receive: 6
2022-11-28 17:16:54.928 I/System.out: receive: 7
2022-11-28 17:16:55.932 I/System.out: receive: 8
2022-11-28 17:16:56.935 I/System.out: receive: 9

Channel 结构类型

这一节来简略看看 Channel 结构的几种类型,为避免内容过于单调,就不深入剖析一些源码细节了。

Channel 结构

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> {
            if (onBufferOverflow == BufferOverflow.SUSPEND)
                RendezvousChannel(onUndeliveredElement)
            else
                ArrayChannel(1, onBufferOverflow, onUndeliveredElement)
        }
        CONFLATED -> {
            require(onBufferOverflow == BufferOverflow.SUSPEND) {
                "CONFLATED capacity cannot be used with non-default onBufferOverflow"
            }
            ConflatedChannel(onUndeliveredElement)
        }
        UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
        BUFFERED -> ArrayChannel( // uses default capacity with SUSPEND
            if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1,
            onBufferOverflow, onUndeliveredElement
        )
        else -> {
            if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
                ConflatedChannel(onUndeliveredElement)
            else
                ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
        }
    }

前面咱们说了 Channel() 并不是结构函数,而是一个顶层函数,它内部会依据不同的入参来创立不同类型的 Channel 实例。咱们看看入参可取的值:

public const val UNLIMITED: Int = Int.MAX_VALUE
public const val RENDEZVOUS: Int = 0
public const val CONFLATED: Int = -1
public const val BUFFERED: Int = -2
public enum class BufferOverflow {
    SUSPEND, DROP_OLDEST, DROP_LATEST
}

其实光看这个结构的进程,以及两个入参的取值,咱们基本上就能知道生成的这个 Channel 实例的体现了。

比如说 UNLIMITED 表示缓存区无限大的管道,它所创立的 Channel 叫 LinkedListChannel; 而 BUFFERED 或指定 capacity 巨细的入参,创立的则是 ArrayChannel 实例,这也正是命名为 LinkedList(链表) 和 Array(数组) 的数据结构一个差异,前者可以视为无限大,后者有固定的容量巨细。

比如说 SUSPEND 表示缓存区满后挂起, DROP_OLDEST 表示缓存区满后会删去缓存区里最旧的那个元素且把当时 send 的数据存入缓存区, DROP_LATEST 表示缓存区满后会删去缓存区里最新的那个元素且把当时 send 的数据存入缓存区。

Channel 类型

上面创立的这四种 Channel 都有一个一同的基类——AbstractChannel,简略看看他们的继承关系:

Kotlin协程之一文看懂Channel管道

在 AbstractSendChannel 中有个重要的成员变量:

protected val queue = LockFreeLinkedListHead()

它是一个循环双向链表,形成了一个行列 queue 结构,send() 数据时存入链表尾部,receive() 数据时就从链表头第一个节点取。至于具体的挂起,恢复等流程,感爱好的可以自己看看源码。

值得一提的是, queue 中的节点类型可以大体分为三种:

  • Send
  • Receive
  • Closed: 当调用 Channel.close() 办法时,会往 queue 行列中加入 Closed 节点,这样当 send or receive 时就知道 Channel 现已封闭了。

别的,对于 ArrayChannel 管道,它有一个成员变量:

private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8)).apply { fill(EMPTY) }

这是一个数组类型,用来完成指定 capacity 的缓存区。可是它的初始巨细不是 capacity, 主要是用来避免一些不必要的内存分配。

总结

Channel 类似于 BlockingQueue 堵塞行列,其不同之处是默许把堵塞行为换成了挂起,这也是协程的一大特性。它的思维是出产-消费形式(观察者形式)。

简略比较一下四种 Channel 类型:

  • RendezvousChannel: 翻译成约会类型,缓存区巨细为0,且指定为 SUSPEND 挂起战略。发送者和接纳者1对1呈现,接纳者没呈现,则发送者 send 会被挂起;发送者没呈现,则接纳者 receive 会被挂起。
  • ConflatedChannel: 混合类型。发送者不会挂起,它只要一个 value 值,会被新的值覆盖掉;如果没有数据,则接纳者会被挂起。
  • LinkedListChannel: 不限缓存区巨细的类型。发送者不会挂起,能一直往行列里存数据;行列无数据时接纳者会被挂起。
  • ArrayChannel: 指定缓存区巨细的类型。当缓存区满时,发送者依据 BufferOverflow 战略来处理(是否挂起);当缓存区空时,接纳者会被挂起。

我的专栏文章

Android视图体系:Android 视图体系相关的底层原了解析,看完定有收获。

Kotlin专栏:Kotlin 学习相关的博客,包含协程, Flow 等。

Android架构学习之路:架构不是一蹴而就的,期望咱们有一天的时候,可以从自己写的代码中找到架构的成就感,而不是干几票就跑路。作业太忙,更新比较慢,大家有爱好的话可以一同学习。

Android实战系列:记录实践开发中遇到和处理的一些问题。

文中内容如有错误欢迎指出,一同进步!更新不易,觉得不错的留个再走哈~

本文正在参与「金石方案 . 瓜分6万现金大奖」