• Kotlin 协程 Select:看我如何多路复用


    前言

    协程系列文章:

    协程通信三剑客:Channel、Select、Flow,上篇已经分析了Channel的深水区,本篇将会重点分析Select的使用及原理。
    通过本篇文章,你将了解到:

    1. Select 的引入
    2. Select 的使用
    3. Invoke函数 的妙用
    4. Select 的原理
    5. Select 注意事项

    1. Select 的引入

    多路数据的选择

    串行执行

    如今的二维码识别应用场景越来越广了,早期应用比较广泛的识别SDK如zxing、zbar,它们各有各的特点,也存在识别不出来的情况,为了将两者优势结合起来,我们想到的方法是同一份二维码图片分别给两者进行识别。
    如下:

        //从zxing 获取二维码信息
        suspend fun getQrcodeInfoFromZxing(bitmap: Bitmap?): String {
            //模拟耗时
            delay(2000)
            return "I'm fish"
        }
    
        //从zbar 获取二维码信息
        suspend fun getQrcodeInfoFromZbar(bitmap: Bitmap?): String {
            delay(1000)
            return "I'm fish"
        }
    
        fun testSelect() {
            runBlocking {
                var bitmap = null
                var starTime = System.currentTimeMillis()
                var qrcoe1 = getQrcodeInfoFromZxing(bitmap)
                var qrcode2 = getQrcodeInfoFromZbar(bitmap)
                println("qrcode1=$qrcoe1 qrcode2=$qrcode2 useTime:${System.currentTimeMillis() - starTime} ms")
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    查看打印,最后花费的时间:

    qrcode1=I’m fish qrcode2=I’m fish useTime:3013 ms

    当然这是串行的方式效率比较低,我们想到了用协程来优化它。

    协程并行执行

    如下:

        fun testSelect1() {
            var bitmap = null;
            var starTime = System.currentTimeMillis()
            var deferredZxing = GlobalScope.async {
                getQrcodeInfoFromZxing(bitmap)
            }
    
            var deferredZbar = GlobalScope.async {
                getQrcodeInfoFromZbar(bitmap)
            }
    
            runBlocking {
                //挂起等待识别结果
                var qrcoe1 = deferredZxing.await()
                //挂起等待识别结果
                var qrcode2 = deferredZbar.await()
                println("qrcode1=$qrcoe1 qrcode2=$qrcode2 useTime:${System.currentTimeMillis() - starTime} ms")
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    查看打印,最后花费的时间:

    qrcode1=I’m fish qrcode2=I’m fish useTime:2084 ms

    可以看出,花费时间明显变少了。
    与上个Demo 相比,虽然识别过程是放在协程里并行执行的,但是在等待识别结果却是串行的。我们引入两个识别库的初衷是哪个识别快就用哪个的结果,为了达成这个目的,传统的方式是:

    同时监听并记录识别结果的返回。

    同时监听多路结果

    如下:

        fun testSelect2() {
            var bitmap = null;
            var starTime = System.currentTimeMillis()
            var deferredZxing = GlobalScope.async {
                getQrcodeInfoFromZxing(bitmap)
            }
    
            var deferredZbar = GlobalScope.async {
                getQrcodeInfoFromZbar(bitmap)
            }
    
            var isEnd = false
            var result: String? = null
            GlobalScope.launch {
                if (!isEnd) {
                    //没有结束,则继续识别
                    var resultTmp = deferredZxing.await()
                    if (!isEnd) {
                        //识别没有结束,说明自己是第一个返回结果的
                        result = resultTmp
                        println("zxing recognize ok useTime:${System.currentTimeMillis() - starTime} ms")
                        //标记识别结束
                        isEnd = true
                    }
                }
            }
    
            GlobalScope.launch {
                if (!isEnd) {
                    var resultTmp = deferredZbar.await()
                    if (!isEnd) {
                        //识别没有结束,说明自己是第一个返回结果的
                        result = resultTmp
                        println("zbar recognize ok useTime:${System.currentTimeMillis() - starTime} ms")
                        isEnd = true
                    }
                }
            }
    
            //检测是否有结果返回
            runBlocking {
                while (!isEnd) {
                    delay(1)
                }
                println("recognize result:$result")
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    通过检测isEnd 标记来判断是否有某个模块返回结果。
    结果如下:

    • zbar recognize ok useTime:1070 ms
    • recognize result:I’m fish

    由于模拟设定的zbar 解析速度快,因此每次都是采纳的是zbar的结果,所花费的时间大幅减少了,该结果符合预期。

    Select 闪亮登场

    虽说上个Demo结果符合预期,但是多了很多额外的代码、多引入了其它协程,并且需要子模块对标记进行赋值(对"isEnd"进行赋值),没有达到解耦的目的。我们希望子模块的任务是单一且闭环的,如果能在一个函数里统一检测结果的返回就好了。
    Select 就是为了解决多路数据的选择而生的。
    来看看它是怎么解决该问题的:

        fun testSelect3() {
            var bitmap = null;
            var starTime = System.currentTimeMillis()
            var deferredZxing = GlobalScope.async {
                getQrcodeInfoFromZxing(bitmap)
            }
            var deferredZbar = GlobalScope.async {
                getQrcodeInfoFromZbar(bitmap)
            }
            runBlocking {
                //通过select 监听zxing、zbar 结果返回
                var result = select<String> {
                    //监听zxing
                    deferredZxing.onAwait {value->
                        //value 为deferredZxing 识别的结果
                        "zxing result $value"
                    }
    
                    //监听zbar
                    deferredZbar.onAwait { value->
                        "zbar result $value"
                    }
                }
    
                //运行到此,说明已经有结果返回
                println("result from $result useTime:${System.currentTimeMillis() - starTime}")
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    结果如下:

    result from zbar result I’m fish useTime:1079

    符合预期,同时可以看出:相比上个Demo,这样写简洁了许多。

    2. Select 的使用

    除了可以监听async的结果,Select 还可以监听Channel的发送方/接收方 数据,我们以监听接收方数据为例:

        fun testSelect4() {
            runBlocking {
                var bitmap = null;
                var starTime = System.currentTimeMillis()
                var receiveChannelZxing = produce {
                    //生产数据
                    var result = getQrcodeInfoFromZxing(bitmap)
                    //发送数据
                    send(result)
                }
    
                var receiveChannelZbar = produce {
                    var result = getQrcodeInfoFromZbar(bitmap)
                    send(result)
                }
    
                var result = select<String> {
                    //监听是否有数据发送过来
                    receiveChannelZxing.onReceive {
                        value->"zxing result $value"
                    }
    
                    receiveChannelZbar.onReceive {
                            value->"zbar result $value"
                    }
                }
    
                println("result from $result useTime:${System.currentTimeMillis() - starTime}")
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    结果如下:

    result from zbar result I’m fish useTime:1028

    不论是async还是Channel,Select 都可以监听它们的数据,从而形成多路复用的效果。

    image.png

    在监听协程里调用select 表达式,表达式{}内声明需要监听的协程的数据,对于select 来说有两种场景:

    1. 没有数据,则select 挂起协程并等待直到其它协程数据准备完成后再次恢复select 所在的协程。
    2. 有数据,则select 正常执行并返回获取的数据。

    3. Invoke函数 的妙用

    在分析Select 原理之前,需要弄明白invoke函数的原理。
    对于Kotlin 类来说,都可以重写其invoke函数。

        operator fun invoke():String {
            return "I'm fish"
        }
    
    • 1
    • 2
    • 3

    如上,重写了SelectDemo里的invoke函数,和普通成员函数一样,我们可以通过对象调用它。

    fun main(args: Array<String>) {
        var selectDemo = SelectDemo()
        var result = selectDemo.invoke()
        println("result:$result")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    当然,可以进一步简化:

    fun main(args: Array<String>) {
        var selectDemo = SelectDemo()
        var result = selectDemo()
        println("result:$result")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这里涉及到了kotlin的语法糖:对象居然可以像函数一样调用。
    作为函数,invoke 当然也可以接收高阶函数作为参数:

        operator fun invoke(block: (Int) -> String): String {
            return block(3)
        }
    
    fun main(args: Array<String>) {
        var selectDemo = SelectDemo()
        var result = selectDemo { age ->
            when (age) {
                3 -> "I'm fish3"
                4 -> "I'm fish4"
                else -> "error"
            }
        }
        println("result:$result")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    因此,当看到对象作为函数调用时,实际上调用的是invoke函数,具体的逻辑需要查看其invoke函数的实现。

    4. Select 的原理

    上篇分析过Channel,因此本篇趁热打铁,通过Select 监听Channel数据的变化来分析其原理,为方便讲解,我们先以监听一个Channel的为例。
    先从select 表达式本身入手。

        fun testSelect5() {
            runBlocking {
                var starTime = System.currentTimeMillis()
                var receiveChannelZxing = produce {
                    //发送数据
                    send("I'm fish")
                }
    
                //确保channel 数据已经send
                delay(1000)
                var result = select<String> {
                    //监听是否有数据发送过来
                    receiveChannelZxing.onReceive { value ->
                        "zxing result $value"
                    }
                }
                println("result from $result useTime:${System.currentTimeMillis() - starTime}")
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    select 是挂起函数,因此协程运行到此有可能被挂起。

    #Select.kt
    public suspend inline fun  select(crossinline builder: SelectBuilder.() -> Unit): R {
        //...
        return suspendCoroutineUninterceptedOrReturn { uCont ->
            //传入父协程体
            val scope = SelectBuilderImpl(uCont)
            try {
                //执行builder
                builder(scope)
            } catch (e: Throwable) {
                scope.handleBuilderException(e)
            }
            //通过返回值判断是否需要挂起协程
            scope.getResult()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    重点看builder(scope),builder 是高阶函数,实际上就是执行了select花括号里的内容,而它里面就是监听数据是否返回。

    receiveChannelZxing.onReceive
    刚开始看的时候势必以为onReceive是个函数,然而它是ReceiveChannel 里的成员变量:

    #Channel.kt
        public val onReceive: SelectClause1<E>
    
    • 1
    • 2

    通过上一节的分析可知,关键是要找到SelectClause1 的invoke的实现。

    #Select.kt
    public interface SelectBuilder<in R> {
        //block 有个入参
        //声明了SelectClause1的扩展函数invoke
        public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)
    }
    
    override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
        //SelectBuilderImpl 实现了 SelectClause1 的invoke函数
        registerSelectClause1(this@SelectBuilderImpl, block)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    再看onReceive 的赋值:

    #AbstractChannel.kt
    final override val onReceive: SelectClause1<E>
        get() = object : SelectClause1<E> {
            @Suppress("UNCHECKED_CAST")
            override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
                registerSelectReceiveMode(select, RECEIVE_THROWS_ON_CLOSE, block as suspend (Any?) -> R)
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    因此,简单总结调用栈如下:

    当调用receiveChannelZxing.onReceive{},实际上调用了SelectClause1.invoke(),而它里面又调用了SelectClause1.registerSelectClause1(),最终调用了AbstractChannel.registerSelectReceiveMode。

    AbstractChannel. registerSelectReceiveMode

    #AbstractChannel.kt
    private fun <R> registerSelectReceiveMode(select: SelectInstance<R>, receiveMode: Int, block: suspend (Any?) -> R) {
        while (true) {
            //如果已经有结果了,则直接返回------->①
            if (select.isSelected) return
            if (isEmptyImpl) {
                //没有发送者在等待,则入队等待,并返回 ------->②
                if (enqueueReceiveSelect(select, block, receiveMode)) return
            } else {
                //直接取出值------->③
                val pollResult = pollSelectInternal(select)
                when {
                    pollResult === ALREADY_SELECTED -> return
                    pollResult === POLL_FAILED -> {} // retry
                    pollResult === RETRY_ATOMIC -> {} // retry
                    //调用block------->④
                    else -> block.tryStartBlockUnintercepted(select, receiveMode, pollResult)
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    分为4个点,接着来一一分析。

    select 同时监听多个值,若是有1个符合要求的数据返回了,那么该isSelected 标记为true,当检测到该标记为true时直接退出。
    结合之前的Demo,zbar 已经识别出结果了,当select 检测zxing的结果时直接返回。

    #AbstractChannel.kt
    private fun <R> enqueueReceiveSelect(
        select: SelectInstance<R>,
        block: suspend (Any?) -> R,
        receiveMode: Int
    ): Boolean {
        //构造为Node元素
        val node = AbstractChannel.ReceiveSelect(this, select, block, receiveMode)
        //添加到Channel队列里
        val result = enqueueReceive(node)
        if (result) select.disposeOnSelect(node)
        return result
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    当select 时,发现Channel里没有数据,说明Channel还没有开始send,因此构造了Node(ReceiveSelect)加入到Channel queue里。当send数据时,会查找queue里是否有接收者等待,若有则调用Node(ReceiveSelect.completeResumeReceive):

    #AbstractChannel.kt
            override fun completeResumeReceive(value: E) {
                block.startCoroutineCancellable(
                    if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value,
                    select.completion,
                    resumeOnCancellationFun(value)
                )
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    block 被调度执行,最后会恢复select 协程的执行。


    取出数据,并尝试恢复send协程。


    在③的基础上,拿到数据后,直接执行block(此时并没有切换线程进行调度)。

    小结一下select 原理:
    image.png

    可以看出:

    select 本身执行并不耗时,若最终没有数据返回则挂起等待,若是有数据返回则不会挂起协程。

    我们从头再捋一下select 配合Channel 的原理:
    image.png

    虽然以Channel为例讲解了select 原理,实际上async等结合select 原理大致差不多,重点都是利用了协程的挂起/恢复做文章。

    5. Select 注意事项

    如果select有多个数据同时到达,select 默认会选择第一个数据,若想要随机选择数据,可做如下处理:

                var result = selectUnbiased<String> {
                    //监听是否有数据发送过来
                    receiveChannelZxing.onReceive { value ->
                        "zxing result $value"
                    }
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    想要知道select 还可以监听哪些数据,可查看该数据是否实现了SelectClauseX(X 表示0、1、2)。

    以上即为Select 的原理及其使用,下篇将会进入协程的精华部分:Flow的运用,该部分内容较多,可能会分几篇分析,敬请期待。

    本文基于Kotlin 1.5.3,文中完整Demo请点击

    您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力

    持续更新中,和我一起步步为营系统、深入学习Android/Kotlin

  • 相关阅读:
    ==和equals的区别
    【ESP32_8266_MQTT篇】
    Mysql:锁
    Ceph入门到静态-deep scrub 深度清理处理
    # 从浅入深 学习 SpringCloud 微服务架构(六)Feign(2)
    继承框架 - 秒杀接口实现
    计算机毕业设计之java+javaweb的外婆家网上订餐平台
    K_A01_001 基于单片机驱动WS2812 点灯流水灯 0-9显示
    数据结构复习题(二)
    Android12开发之窗口模糊功能的实现
  • 原文地址:https://blog.csdn.net/wekajava/article/details/126808287