定论先行

Kotlin协程中的Channel用于处理多个数据组合的流,随用随取,时间准备着,就像自来水一样,翻开开关就有水了。

Channel运用示例

fun main() = runBlocking {
    logX("开端")
    val channel = Channel<Int> {  }
    launch {
        (1..3).forEach{
            channel.send(it)
            logX("发送数据: $it")
        }
        // 封闭channel, 节约资源
        channel.close()
    }
    launch {
        for (i in channel){
            logX("接纳数据: $i")
        }
    }
    logX("完毕")
}

示例代码 运用Channel创建了一组int类型的数据流,经过send发送数据,并经过for循环取出channel中的数据,最终channel是一种协程资源,运用完毕后应该及时调用close办法封闭,避免糟蹋不必要的资源。

Channel的源码

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> {}
        CONFLATED -> {}
        UNLIMITED -> {}
        else -> {}
    }

能够看到Channel的结构函数包含了三个参数,分别是capacity、onBufferOverflow、onUndeliveredElement.

首先看capacity,这个参数代表了管道的容量,默许参数是RENDEZVOUS,取值是0,还有其他一些值:

  • UNLIMITED: Int = Int.MAX_VALUE,没有定量
  • CONFLATED: 容量为1,新的覆盖旧的值
  • BUFFERED: 增加缓冲容量,默许值是64,能够经过修正VM参数:kotlinx.coroutines.channels.defaultBuffer,进行修正

接下来看onBufferOverflow, 望文生义便是管道容量满了,怎么办?默许是挂起,也便是suspend,一共有三种分别是:
SUSPNED、DROP_OLDEST以及DROP_LATEST

public enum class BufferOverflow {
    /**
     * Suspend on buffer overflow.
     */
    SUSPEND,
    /**
     * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
     */
    DROP_OLDEST,
    /**
     * Drop **the latest** value that is being added to the buffer right now on buffer overflow
     * (so that buffer contents stay the same), do not suspend.
     */
    DROP_LATEST
}
  • SUSPEND,当管道的容量满了今后,如果发送方还要持续发送,咱们就会挂起当时的 send() 办法。由于它是一个挂起函数,所以咱们能够以非阻塞的方法,将发送方的履行流程挂起,等管道中有了闲暇方位今后再康复,有点像生产者-顾客模型
  • DROP_OLDEST,望文生义,便是丢掉最旧的那条数据,然后发送新的数据,有点像LRU算法
  • DROP_LATEST,丢掉最新的那条数据。这儿要注意,这个动作的意义是丢掉当时正准备发送的那条数据,而管道中的内容将保持不变。

最终一个参数是onUndeliveredElement,从姓名看像是没有投递成功的回调,也确实如此,当管道中某些数据没有成功接纳时,这个就会被调用。

归纳这个参数运用一下

fun main() = runBlocking {
    println("开端")
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
        println("onUndeliveredElement = $it")
    }
    launch {
        (1..3).forEach{
            channel.send(it)
            println("发送数据: $it")
        }
        // 封闭channel, 节约资源
        channel.close()
    }
    launch {
        for (i in channel){
            println("接纳数据: $i")
        }
    }
    println("完毕")
}
输出成果如下:
开端
完毕
发送数据: 1
发送数据: 2
发送数据: 3
接纳数据: 2
接纳数据: 3

安全的从Channel中取数据

先看一个比如

val channel: ReceiveChannel<Int> = produce {
        (1..100).forEach{
            send(it)
            println("发送: $it")
        }
    }
while (!channel.isClosedForReceive){
    val i = channel.receive();
    println("接纳: $i")
}
输出报错信息:
Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed

能够看到运用isClosedForReceive判断是否封闭再运用receive办法接纳数据,依然会报错,所以不引荐运用这种方法。

引荐运用上面for循环的方法取数据,还有kotlin引荐的consumeEach方法,看一下示例代码

val channel: ReceiveChannel<Int> = produce {
        (1..100).forEach{
            send(it)
            println("发送: $it")
        }
    }
channel.consumeEach {
    println("接纳:$it")
}

所以,当咱们想要获取Channel当中的数据时,咱们尽量运用 for 循环,或者是channel.consumeEach {},不要直接调用channel.receive()。

“热的数据流”从何而来?

先看一下代码

    println("开端")
    val channel = Channel<Int>(capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
        println("onUndeliveredElement = $it")
    }
    launch {
        (1..3).forEach{
            channel.send(it)
            println("发送数据: $it")
        }
    }
    println("完毕")
}
输出:
开端
完毕
发送数据: 1
发送数据: 2
发送数据: 3

能够看到上述代码中并没有 取channel中的数据,可是发送的代码正常履行了,这种“不论有没有接纳方,发送方都会作业”的模式,便是咱们将其认定为“热”的原因。

举个比如,就像去海底捞吃火锅一样,你不需要自动要求服务员加水,服务员看到你的杯子中水少了,会自动给你增加,你只管拿起水杯喝水就行了。

总的来说,不论接纳方是否存在,Channel 的发送方一定会作业。

Channel能力的来历

经过源码能够看到Channel只是一个接口,它的能力来历于SendChannel和ReceiveChannel,一个发送管道,一个接纳管道,相当于做了一个组合。

这也是一种杰出的规划思维,“对读取敞开,对写入封闭”的开闭原则。