• Kotlin 协程 - 多路复用 select()


    一、概念

            又叫选择表达式,是一个挂起函数,可以同时等待多个挂起结果,只取用最快恢复的那个值(即多种方式获取数据,哪个更快返回结果就用哪个)。

            同时到达 select() 会优先选择先写子表达式,想随机(公平)的话使用 selectUnbiased() 替换 。

            能被选择的都是 SelectClauseN 函数类型。

    public suspend inline fun select(crossinline builder: SelectBuilder.() -> Unit): R 

    public sealed interface SelectBuilder {

            public operator fun SelectClause0.invoke(block: suspend () -> R)
            public operator fun SelectClause1.invoke(block: suspend (Q) -> R)
            public operator fun SelectClause2.invoke(param: P, block: suspend (Q) -> R)
            public operator fun SelectClause2.invoke(block: suspend (Q) -> R): Unit = invoke(null, block)
    }

    SelectClause0对应事件没有返回值。例如 job.onJoin。
    SelectClause1对应事件有返回值。例如 deffered.onAwait 和 channel.onReceive。
    SelectClause2对应事件有返回值。此外还需要一个额外的参数,例如 Channel.onSend() 有两个参数,第一个是 Channel 数据类型的值表示即将发送的值,第二个是发送成功时的回调函数。

    二、使用

    在使用 async() 启动协程的返回类型 Deferred 中,定义了 SelectClause1 函数类型的变量 onAwait,其作用和 await() 一样,只是当其在 select() 中作为子语句时,具有“同时等待看谁最先返回”的效果。同理其它。

    2.1 复用多个 job.onJoin

    1. fun main() = runBlocking<Unit> {
    2. val job1 = launch {
    3. delay(100)
    4. println("job 1")
    5. }
    6. val job2 = launch {
    7. delay(10)
    8. println("job 2")
    9. }
    10. select {
    11. job1.onJoin { println("job 1 更快") }
    12. job2.onJoin { println("job 2 更快") }
    13. }
    14. delay(1000)
    15. }
    16. //打印:
    17. //job 2
    18. //job 2 更快
    19. //job 1

    2.2 复用多个 deffered.onAwait

    public interface Deferred : Job {
            public val onAwait: SelectClause1        //等效await()
    1. fun main() = runBlocking {
    2. val defferedCache = async {
    3. delay(10)
    4. "Cache"
    5. }
    6. val defferedLocal = async {
    7. delay(100)
    8. "Local"
    9. }
    10. val defferedRemote = async {
    11. delay(1000)
    12. "Remote"
    13. }
    14. val result = select {
    15. defferedCache.onAwait { println("最快的是$it") }
    16. defferedLocal.onAwait { println("最快的是$it") }
    17. defferedRemote.onAwait { println("最快的是$it") }
    18. }
    19. delay(2000)
    20. println(result) //打印:最快的是Cache
    21. }

    2.3 复用多个 channel.onReceive

    public interface SendChannel {

            public val onSend: SelectClause2>        //等效send()

    }

    public interface ReceiveChannel {

            public val onReceive: SelectClause1        //等效receive()

            public suspend fun receiveCatching(): ChannelResult        //等效receiveCatching()

    }

    //select() 中的 onReceive() 在已经关闭的通道执行会发生失败,并导致相应的 select() 抛出异常,使用 onReceiveCatching() 在关闭通道时执行特定操作。

    1. suspend fun getDataFromLocal() = withContext(Dispatchers.IO) { "Local" }
    2. suspend fun getDataFromRemote() = withContext(Dispatchers.IO) { "Remote" }
    3. @OptIn(ExperimentalCoroutinesApi::class)
    4. fun main() = runBlocking {
    5. val produceLocal = produce { send(getDataFromLocal()) }
    6. val produceRemote = produce { send(getDataFromRemote()) }
    7. val result = select {
    8. produceLocal.onReceive { it }
    9. produceRemote.onReceive { it }
    10. }
    11. // val result = select {
    12. // produceLocal.onReceiveCatching { it.getOrNull() ?: "Channel已关闭:produceLocal" }
    13. // produceRemote.onReceiveCatching { it.getOrNull() ?: "Channel已关闭:produceRemote " }
    14. // }
    15. println("结果更快的是:$result")
    16. }
  • 相关阅读:
    四旋翼无人机学习第6节--SPL06气压传感器电路分析
    找出多组分辨率中最接近目标值的一组
    20个Golang片段让我不再健忘
    [正式学习java①]——java项目结构,定义类和创建对象,一个标准javabean的书写
    vue2知识点————(父子通信)
    勒索病毒处置
    69-Java常用API:Object、Objects、StringBuilder、Math、System、BigDecimal
    【Pytorch Lighting】第 4 章:Lightning Flash 中的即食模型
    Android HIDL(1) ---- 概述
    rfc7234之http缓存
  • 原文地址:https://blog.csdn.net/HugMua/article/details/132613352