• Kotlin协程Channel浅析


    结论先行

    Kotlin协程中的Channel用于处理多个数据组合的流,随用随取,时刻准备着,就像自来水一样,打开开关就有水了。

    Channel使用示例

    fun main() = runBlocking {
        logX("开始")
        val channel = Channel {  }
        launch {
            (1..3).forEach{
                channel.send(it)
                logX("发送数据: $it")
            }
            // 关闭channel, 节省资源
            channel.close()
        }
        launch {
            for (i in channel){
                logX("接收数据: $i")
            }
        }
    
        logX("结束")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    示例代码 使用Channel创建了一组int类型的数据流,通过send发送数据,并通过for循环取出channel中的数据,最后channel是一种协程资源,使用结束后应该及时调用close方法关闭,以免浪费不必要的资源。

    Channel的源码

    public fun  Channel(
        capacity: Int = RENDEZVOUS,
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
        onUndeliveredElement: ((E) -> Unit)? = null
    ): Channel =
        when (capacity) {
            RENDEZVOUS -> {}
            CONFLATED -> {}
            UNLIMITED -> {}
            else -> {}
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    可以看到Channel的构造函数包含了三个参数,分别是capacity、onBufferOverflow、onUndeliveredElement.

    首先看capacity,这个参数代表了管道的容量,默认参数是RENDEZVOUS,取值是0,还有其他一些值:

    • UNLIMITED: Int = Int.MAX_VALUE,没有限量
    • CONFLATED: 容量为1,新的覆盖旧的值
    • BUFFERED: 添加缓冲容量,默认值是64,可以通过修改VM参数:kotlinx.coroutines.channels.defaultBuffer,进行修改

    接下来看onBufferOverflow, 顾名思义就是管道容量满了,怎么办?默认是挂起,也就是suspend,一共有三种分别是:
    SUSPNED、DROP_OLDEST以及DROP_LATEST

    public enum class BufferOverflow {
        /**
         * Suspend on buffer overflow.
         */
        SUSPEND,
    
        /**
         * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
         */
        DROP_OLDEST,
    
        /**
         * Drop **the latest** value that is being added to the buffer right now on buffer overflow
         * (so that buffer contents stay the same), do not suspend.
         */
        DROP_LATEST
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • SUSPEND,当管道的容量满了以后,如果发送方还要继续发送,我们就会挂起当前的 send() 方法。由于它是一个挂起函数,所以我们可以以非阻塞的方式,将发送方的执行流程挂起,等管道中有了空闲位置以后再恢复,有点像生产者-消费者模型
    • DROP_OLDEST,顾名思义,就是丢弃最旧的那条数据,然后发送新的数据,有点像LRU算法。
    • DROP_LATEST,丢弃最新的那条数据。这里要注意,这个动作的含义是丢弃当前正准备发送的那条数据,而管道中的内容将维持不变。

    最后一个参数是onUndeliveredElement,从名字看像是没有投递成功的回调,也确实如此,当管道中某些数据没有成功接收时,这个就会被调用。

    综合这个参数使用一下

    fun main() = runBlocking {
        println("开始")
        val channel = Channel(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
            println("onUndeliveredElement = $it")
        }
    
        launch {
            (1..3).forEach{
                channel.send(it)
                println("发送数据: $it")
            }
            // 关闭channel, 节省资源
            channel.close()
        }
        launch {
            for (i in channel){
                println("接收数据: $i")
            }
        }
    
        println("结束")
    }
    
    输出结果如下:
    开始
    结束
    发送数据: 1
    发送数据: 2
    发送数据: 3
    接收数据: 2
    接收数据: 3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    安全的从Channel中取数据

    先看一个例子

    val channel: ReceiveChannel = produce {
            (1..100).forEach{
                send(it)
                println("发送: $it")
            }
        }
    
    while (!channel.isClosedForReceive){
        val i = channel.receive();
        println("接收: $i")
    }
        
    输出报错信息:
    Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    可以看到使用isClosedForReceive判断是否关闭再使用receive方法接收数据,依然会报错,所以不推荐使用这种方式。

    推荐使用上面for循环的方式取数据,还有kotlin推荐的consumeEach方式,看一下示例代码

    val channel: ReceiveChannel = produce {
            (1..100).forEach{
                send(it)
                println("发送: $it")
            }
        }
    channel.consumeEach {
        println("接收:$it")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    所以,当我们想要获取Channel当中的数据时,我们尽量使用 for 循环,或者是channel.consumeEach {},不要直接调用channel.receive()。

    “热的数据流”从何而来?

    先看一下代码

        println("开始")
        val channel = Channel(capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
            println("onUndeliveredElement = $it")
        }
    
        launch {
            (1..3).forEach{
                channel.send(it)
                println("发送数据: $it")
            }
        }
        println("结束")
    }
    输出:
    开始
    结束
    发送数据: 1
    发送数据: 2
    发送数据: 3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    可以看到上述代码中并没有 取channel中的数据,但是发送的代码正常执行了,这种“不管有没有接收方,发送方都会工作”的模式,就是我们将其认定为“热”的原因。

    举个例子,就像去海底捞吃火锅一样,你不需要主动要求服务员加水,服务员看到你的杯子中水少了,会自动给你添加,你只管拿起水杯喝水就行了。

    总的来说,不管接收方是否存在,Channel 的发送方一定会工作。

    Channel能力的来源

    通过源码可以看到Channel只是一个接口,它的能力来源于SendChannel和ReceiveChannel,一个发送管道,一个接收管道,相当于做了一个组合。

    这也是一种良好的设计思想,“对读取开放,对写入封闭”的开闭原则。

    欢迎关注公众号:君伟说,后台回复“技术交流”,邀你进群,一起进步

  • 相关阅读:
    JS中面向对象的程序设计
    2023.09.10 学习周报
    【Python报错】ValueError:optimizer got an empty parameter list 概念理解解决
    汽车以太网协议栈
    Parker驱动器维修COMPAX控制器维修CPX0200H
    【Logback+Spring-Aop】实现全面生态化的全链路日志追踪系统服务插件「Logback-MDC篇」
    【计网】第六章 应用层
    OpenStack 创建虚拟机错误: Host ‘compute1‘ is not mapped to any cell
    go语言常量中的iota
    mapstruct实体转换 转换成不同类型
  • 原文地址:https://blog.csdn.net/wayne214/article/details/127998494