定论先行
Kotlin协程中的Channel用于处理多个数据组合的流,随用随取,时间准备着,就像自来水一样,翻开开关就有水了。
Channel运用示例
fun main() = runBlocking {
logX("开端")
val channel = Channel<Int> { }
launch {
(1..3).forEach{
channel.send(it)
logX("发送数据: $it")
}
// 封闭channel, 节约资源
channel.close()
}
launch {
for (i in channel){
logX("接纳数据: $i")
}
}
logX("完毕")
}
示例代码 运用Channel创建了一组int类型的数据流,经过send发送数据,并经过for循环取出channel中的数据,最终channel是一种协程资源,运用完毕后应该及时调用close办法封闭,避免糟蹋不必要的资源。
Channel的源码
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
when (capacity) {
RENDEZVOUS -> {}
CONFLATED -> {}
UNLIMITED -> {}
else -> {}
}
能够看到Channel的结构函数包含了三个参数,分别是capacity、onBufferOverflow、onUndeliveredElement.
首先看capacity,这个参数代表了管道的容量,默许参数是RENDEZVOUS,取值是0,还有其他一些值:
- UNLIMITED: Int = Int.MAX_VALUE,没有定量
- CONFLATED: 容量为1,新的覆盖旧的值
- BUFFERED: 增加缓冲容量,默许值是64,能够经过修正VM参数:kotlinx.coroutines.channels.defaultBuffer,进行修正
接下来看onBufferOverflow, 望文生义便是管道容量满了,怎么办?默许是挂起,也便是suspend,一共有三种分别是:
SUSPNED、DROP_OLDEST以及DROP_LATEST
public enum class BufferOverflow {
/**
* Suspend on buffer overflow.
*/
SUSPEND,
/**
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
*/
DROP_OLDEST,
/**
* Drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
*/
DROP_LATEST
}
- SUSPEND,当管道的容量满了今后,如果发送方还要持续发送,咱们就会挂起当时的 send() 办法。由于它是一个挂起函数,所以咱们能够以非阻塞的方法,将发送方的履行流程挂起,等管道中有了闲暇方位今后再康复,有点像生产者-顾客模型
- DROP_OLDEST,望文生义,便是丢掉最旧的那条数据,然后发送新的数据,有点像LRU算法。
- DROP_LATEST,丢掉最新的那条数据。这儿要注意,这个动作的意义是丢掉当时正准备发送的那条数据,而管道中的内容将保持不变。
最终一个参数是onUndeliveredElement,从姓名看像是没有投递成功的回调,也确实如此,当管道中某些数据没有成功接纳时,这个就会被调用。
归纳这个参数运用一下
fun main() = runBlocking {
println("开端")
val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
println("onUndeliveredElement = $it")
}
launch {
(1..3).forEach{
channel.send(it)
println("发送数据: $it")
}
// 封闭channel, 节约资源
channel.close()
}
launch {
for (i in channel){
println("接纳数据: $i")
}
}
println("完毕")
}
输出成果如下:
开端
完毕
发送数据: 1
发送数据: 2
发送数据: 3
接纳数据: 2
接纳数据: 3
安全的从Channel中取数据
先看一个比如
val channel: ReceiveChannel<Int> = produce {
(1..100).forEach{
send(it)
println("发送: $it")
}
}
while (!channel.isClosedForReceive){
val i = channel.receive();
println("接纳: $i")
}
输出报错信息:
Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
能够看到运用isClosedForReceive判断是否封闭再运用receive办法接纳数据,依然会报错,所以不引荐运用这种方法。
引荐运用上面for循环的方法取数据,还有kotlin引荐的consumeEach方法,看一下示例代码
val channel: ReceiveChannel<Int> = produce {
(1..100).forEach{
send(it)
println("发送: $it")
}
}
channel.consumeEach {
println("接纳:$it")
}
所以,当咱们想要获取Channel当中的数据时,咱们尽量运用 for 循环,或者是channel.consumeEach {},不要直接调用channel.receive()。
“热的数据流”从何而来?
先看一下代码
println("开端")
val channel = Channel<Int>(capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
println("onUndeliveredElement = $it")
}
launch {
(1..3).forEach{
channel.send(it)
println("发送数据: $it")
}
}
println("完毕")
}
输出:
开端
完毕
发送数据: 1
发送数据: 2
发送数据: 3
能够看到上述代码中并没有 取channel中的数据,可是发送的代码正常履行了,这种“不论有没有接纳方,发送方都会作业”的模式,便是咱们将其认定为“热”的原因。
举个比如,就像去海底捞吃火锅一样,你不需要自动要求服务员加水,服务员看到你的杯子中水少了,会自动给你增加,你只管拿起水杯喝水就行了。
总的来说,不论接纳方是否存在,Channel 的发送方一定会作业。
Channel能力的来历
经过源码能够看到Channel只是一个接口,它的能力来历于SendChannel和ReceiveChannel,一个发送管道,一个接纳管道,相当于做了一个组合。
这也是一种杰出的规划思维,“对读取敞开,对写入封闭”的开闭原则。