• Kotlin 协程 (6/6篇) - 跨协程间通信 Channel


    一、概念

    1.1 通信模式

    根据接收端节点数量以及接收方式,可分为三类。

    单播 Unicast一对一,对单个订阅者发送。
    组播 Multicast互斥性:一对多,分别对单个订阅者发送。多个订阅者互斥,收到的值不是同一个。同步性:元素只能被一个接收端消费。
    广播 BroadCast共享性:一对多,同时对全体订阅者发送。多个订阅者共享,接收到的值是同一个。并发性:一次发送多处消费。

    1.2 扇入扇出

    扇入 Fan-in指直接调用该模块的上级模块的个数(多个发送端),即多个协程可能会向同一个Channel发送值。扇入大表示模块的复用程序高。
    扇出 Fan-out指该模块直接调用的下级模块的个数(多个接收端),即多个协程可能会从同一个Channel中接收值。扇出大表示模块的复杂度高。

    二、组播(通道、队列)

            出现在Flow之前,现在退居幕后职责单一,仅作为协程间通信的并发安全的缓冲队列而存在。

            SendChannel在创建时定义了消费方式外界只能往里发送值,ReceiveChannel在创建时定义了生产方式外界只能从中获取值,Channel继承了它俩既能发送也能接收,根据实际需求暴露不同类型收窄功能。

    • 非阻塞:类似于 Java 中的 BlockingQueue 队列,不同的是 put() 和 take() 读取写入数据是阻塞的,而 Channel 中的 send() 和 receive() 是挂起的。
    • 同步性:每个值只能被众多订阅者中的一个消费。
    • 并发安全:没有检测到 receive() 的话 send() 就会挂起不会发送值(默认模式 RENDEZVOUS)。
    • 公平性:在多个协程中发送或接收(多线程竞争)遵循先进先出(FIFO即队列)。

    SendChannel

    生产者通道

    send()

    public suspend fun send(element: E)

    将值发送到Channel,缓冲区已满时将被挂起,直到有旧值被消费腾出空间。如果Channel已经关闭,调用会抛异常ClosedSendChannelException

    trySend()

    public fun trySend(element: E): ChannelResult

    如果不违反其容量限制,则立即将指定元素添加到此通道,并返回成功结果。否则返回失败或关闭的结果。

    close()

    public fun close(cause: Throwable? = null): Boolean

    Channel不会自动停止,导致程序也不会停止。手动调用后会停止发送新值,此时 isClosedForSend() 会返回true。会保证缓冲区里已经发送的值在关闭前被接收到,等所有元素被接收后 isClosedForReceive() 会返回true。具有原子性

    isClosedForSend()

    public val isClosedForSend: Boolean

    判断通道是否已经关闭,如果关闭,调用 send 会引发异常。

    ReceiveChannel

    消费者通道

    receive()

    public suspend fun receive(): E

    当Channel中有值时就从中消费,没有值时将被挂起,直到有新值被发送进来。如果Channel已经关闭,调用会抛异常ClosedReceiveChannelException

    tryReceive()

    public fun tryReceive(): ChannelResult

    当Channel中有值时就从中消费,并返回成功结果。否则返回失败或关闭的结果。

    receiveCatching()

    public suspend fun receiveCatching(): ChannelResult

    如果此通道不为空,则从中检索并删除元素,返回成功结果;如果通道为空,则返回失败结果;如果通道关闭,则返回关闭的原因。

    isEmpty()

    public val isEmpty: Boolean

    判断Channel是否为空

    isClosedForReceive()

    public val isClosedForReceive: Boolean

    判断Channel是否已经关闭,如果关闭,调用 receive 会引发异常。

    cancel()

    public fun cancel(cause: CancellationException? = null)

    以可选原因取消接收此Channel的剩余元素。此函数用于关闭Channel并从中删除所有缓冲发送的元素。

    iterator()

    public operator fun iterator(): ChannelIterator

    返回通道的迭代器。

    consumeEach()

    public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit): Unit

    遍历所有元素并执行给定操作。

    • send() 和 receive():挂起函数。当接收时,Channel中没有元素,协程将被挂起直到元素可用。当发送时,Channel容量达到阈值,协程将被挂起直到容量可用。
    • trySend() 和 tryReceive():普通函数。从非挂起函数中发送或接收元素,操作是即时的并返回 ChannelResult 对象,包含了有关操作成功或失败的信息。
    • close() 和 cancel():当创建的是 Channel 类型的时候,两个都可以用来关闭,close() 会接收完缓冲区中的值,cancel() 直接关闭。通道使用完一定要关闭,否则程序不会停止造成资源浪费。
    • produce() 和 actor() 被定义成协程构建器(因此会在异常、完成、取消时自动关闭Channel),同 launch、async 一样作为 CoroutineScope 的扩展函数。
    1. fun main() = runBlocking {
    2. private val _channel = Channel()
    3. val receiveChannel: ReceiveChannel = _channel
    4. val sendChannel: SendChannel = _channel
    5. _channel.send("Channel发送")
    6. sendChannel.send("SendChannel发送")
    7. delay(1000)
    8. _channel.consumeEach { println("Channel接收:$it") }
    9. receiveChannel.consumeEach { println("ReceiveChannel接收:$it") }
    10. }

    2.1 创建

    2.1.1 actor() 创建 SendChannel

    创建时在协程构建器 actor() 的 Lambda 中定义了数据的消费方式,返回一个生产者通道 SendChannel,其它协程通过该对象往里发送数据。

    public fun CoroutineScope.actor(
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = 0, // todo: Maybe Channel.DEFAULT here?
        start: CoroutineStart = CoroutineStart.DEFAULT,
        onCompletion: CompletionHandler? = null,
        block: suspend ActorScope.() -> Unit
    ): SendChannel
    1. fun main() = runBlocking {
    2. //创建生产者通道
    3. val send: SendChannel<Int> = actor {
    4. for (i in channel) println(i) //定义了消费方式
    5. }
    6. //其它协程拿来发送数据
    7. launch {
    8. (1..3).forEach { sendChannel.send(it) }
    9. }
    10. }

    2.1.2 produce() 创建 ReceiveChannel

    创建时在协程构建器 produce() 的 Lambda 中定义了数据的生产方式,返回一个消费者通道 ReceiveChannel,其它协程通过该对象从中取出数据。

    public fun CoroutineScope.produce(
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = 0,
        @BuilderInference block: suspend ProducerScope.() -> Unit
    ): ReceiveChannel

    1. fun main() = runBlocking {
    2. //创建消费者通道
    3. val receiveChannel: ReceiveChannel<Int> = produce {
    4. (1..3).forEach { send(it) } //定义了生产方式
    5. }
    6. //其它协程拿来接收数据
    7. launch {
    8. for (i in receiveChannel) println(i)
    9. }
    10. }

    2.1.3 构造创建 Channel

    public fun Channel(
        capacity: Int = RENDEZVOUS,        //容量
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,        //容量溢出后策略
        onUndeliveredElement: ((E) -> Unit)? = null        //异常回调
    ): Channel

    capacity

    缓冲区容量

    RENDEZVOUS(默认):或指定为0。缓冲区大小为0且为SUSPEND策略,每次一个值receive和send同步交替,另一方没准备好自己就会挂起。(并发安全)

    CONFLATED:或指定为1。只有1个值,新值覆盖旧值,send永不挂起,如果没有数据receive会被挂起。此时溢出策略只能设为SUSPEND。

    BUFFERED:或指定为-2(默认为64个值)或指为>1的具体值。缓冲区满了后根据溢出策略决定send是否被挂起(挂起直到消费后腾出空间),缓冲区为空时receive会被挂起。

    UNLIMITED:无限容量(Int.MAX_VALUE)。send永不挂起,能一直往里发送数据,无数据时receive会被挂起。

    onBufferOverflow

    缓冲区满后溢出策略

    当容量 >= 0 或容量 == Channel.BUFFERED 时才会触发。

    BufferOverflow.SUSPEND满了后send会挂起。

    BufferOverflow.DROP_OLDEST丢弃最旧的值,发送新值。

    BufferOverflow.DROP_LATEST丢弃新值。

    onUndeliveredElement

    异常回调

    数据没有被receive时回调,丢弃的值会在这里收到。通常用来关闭由Channel发送的资源(值)。
    1. val rendezvousChannel = Channel() //约会类型
    2. val bufferedChannel = Channel(10) //指定缓存大小类型
    3. val conflatedChannel = Channel(Channel.CONFLATED) //混合类型
    4. val unlimitedChannel = Channel(Channel.UNLIMITED) //无限缓存大小类型
    1. fun main() = runBlocking {
    2. val channel = Channel<Int>(capacity = Channel.CONFLATED) {
    3. println("onUndeliveredElement: $it")
    4. }
    5. launch {
    6. (1..3).forEach {
    7. println("send: $it")
    8. channel.send(it)
    9. }
    10. channel.close() //不要忘记关闭
    11. }
    12. launch {
    13. for (i in channel) {
    14. println("receive: $i")
    15. }
    16. }
    17. println("结束!")
    18. }
    19. 打印:
    20. 结束!
    21. send: 1
    22. send: 2
    23. onUndeliveredElement: 1
    24. send: 3
    25. onUndeliveredElement: 2
    26. receive: 3

    2.2 遍历元素

    2.2.1 迭代器 iterate

    1. fun main(): Unit = runBlocking {
    2. val channel = Channel(Channel.UNLIMITED)
    3. repeat(5) { channel.send("$it") }
    4. val iterator = channel.iterator()
    5. while (iterator.hasNext()) {
    6. println("【iterator】${iterator.next()}")
    7. delay(1000)
    8. }
    9. //5个元素打印完后程序没结束,Channel不关闭,后面代码执行不到
    10. println("这行执行不到")
    11. }

    2.2.2 for循环

    不会消费掉 Channel 中的值。

    1. fun main():Unit = runBlocking {
    2. val channel = Channel(Channel.UNLIMITED)
    3. repeat(5) { channel.send("$it") }
    4. for (i in channel) {
    5. println("【for】$i,")
    6. delay(1000)
    7. }
    8. //5个元素打印完后程序没结束,Channel不关闭,后面代码执行不到
    9. println("这行执行不到")
    10. }

    2.2.3 consumeEach( )

    会消费掉 Channel 中的值。

    1. fun main(): Unit = runBlocking {
    2. val channel = Channel(Channel.UNLIMITED)
    3. repeat(5) { channel.send("$it") }
    4. channel.consumeEach {
    5. println("【consumeEach】$it,")
    6. }
    7. //5个元素打印完后程序没结束,Channel不关闭,后面代码执行不到
    8. println("这行执行不到")
    9. }

    2.3 转换

    Channel 转成 Flow 是为了使用那些方便的操作符,同时 Flow 的很多功能扩展底层由 Channel 实现(如flowOn、buffer)。

    receiveAsFlow()

    public fun ReceiveChannel.receiveAsFlow(): Flow = ChannelAsFlow(this, consume = false)

    //只能被collect一次,多次收集抛异常。

    consumeAsFlow()

    public fun ReceiveChannel.consumeAsFlow(): Flow = ChannelAsFlow(this, consume = true)

    //能被collect多次,但一个元素只能被消费一次。

    三、广播

    BroadcastChannel 在协程1.4版本已被废弃,取而代之的是 SharedFlow(StateFlow是它的特定配置版本)。

    3.1 SharedFlow

    SharedFlow只能订阅(消费),FlowCollector只能发送(生产),MutableSharedFlow继承了它俩既能发送也能订阅,根据实际需求暴露不同类型收窄功能。接收会一直监听,通过取消协程来关闭它的收集。

    SharedFlowpublic interface SharedFlow : Flow {
        public val replayCache: List        //缓存的重放数据的快照
    }
    FlowCollectorpublic fun interface FlowCollector {
        public suspend fun emit(value: T)        //发送数据
    }
    MutableSharedFlowpublic interface MutableSharedFlow : SharedFlow, FlowCollector {
        // 发射数据(注意这是个挂起函数)
        override suspend fun emit(value: T)
        // 尝试发射数据(如果缓存溢出策略是 SUSPEND,则溢出时不会挂起而是返回 false)
        public fun tryEmit(value: T): Boolean
        // 活跃订阅者数量,将它设为0生产数据就会停止用来释放资源。
        public val subscriptionCount: StateFlow
        //清空当前回放里的历史记录
        public fun resetReplayCache()
    }
    1. fun main() = runBlocking {
    2. //利用多态暴露不同父类限制功能给外部使用
    3. private val _mutableSharedFlow = MutableSharedFlow()
    4. val sharedFlow: SharedFlow = _mutableSharedFlow //或调用 asSharedFlow()
    5. val flowCollector: FlowCollector = _mutableSharedFlow
    6. launch { _mutableSharedFlow.collect(println("mutableSharedFlow接收:$it")) }
    7. launch { sharedFlow.collect(println("mutableSharedFlow接收:$it")) }
    8. delay(1000)
    9. mutableSharedFlow.emit("mutableSharedFlow发送")
    10. flowCollector.emit("flowCollector发送")
    11. }

    3.1.1 通过构造创建

    public fun MutableSharedFlow(
        replay: Int = 0,        //回放,观察者订阅后能得到几个订阅前已经发出过的旧值。
        extraBufferCapacity: Int = 0,        //额外缓存容量,加上replay才是总共的缓存数量。
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND        //缓存溢出策略。
    ): MutableSharedFlow

    onBufferOverflow

    缓存溢出策略

    只有在重播或缓存容量>0时,才支持两种丢弃模式。只有存在订阅者时才会触发缓存溢出。

    BufferOverflow.SUSPEND:挂起。

    BufferOverflow.DROP_OLDEST:丢弃最旧的。

    BufferOverflow.DROP_LATEST:丢弃最新的。

    1. suspend fun method(): Unit = coroutineScope { //让3个launch同时进行
    2. val shared = MutableSharedFlow<Int>(3) //回放3个数据
    3. launch {
    4. for(i in 1..5){
    5. shared.emit(i)
    6. println("emmit:$i")
    7. delay(1000) //1秒更新一个值
    8. }
    9. }
    10. //订阅者甲
    11. launch { shared.collect{ println("甲:$it") } }
    12. //订阅者乙5秒后再订阅,也就是值全都更新完了再订阅
    13. delay(5000)
    14. launch { shared.collect{ println("乙:$it") } }
    15. }
    16. 打印:
    17. emmit:1
    18. 甲:1
    19. emmit:2
    20. 甲:2
    21. emmit:3
    22. 甲:3
    23. emmit:4
    24. 甲:4
    25. emmit:5
    26. 甲:5
    27. 乙:3 //回放了3个已更新过的数据
    28. 乙:4
    29. 乙:5

    3.1.2 转换 Flow → SharedFlow

    public fun Flow.shareIn(
        scope: CoroutineScope,        //数据共享时所在的协程作用域
        started: SharingStarted,        //启动策略
        replay: Int = 0        //回放,新订阅时得到几个之前已经发射过的旧值。
    ): SharedFlow

    started

    启动策略

    SharingStarted.Eagerly立即发送数据(直到scope结束)。
    SharingStarted.Lazily在首个订阅者观察时才开始发送数据(当订阅者都没了还是活跃的,直到scope结束)。这保证了第一个订阅者能获得所有值,后续订阅者获得最新replay数量的值。
    SharingStarted.WhileSubscribed

    在首个订阅者观察时才开始发送数据,直到最后一个订阅者消失时停止,当又有新订阅者时会再次启动,避免引起资源浪费(例如一直从数据库、传感器中读取数据)。提供了两个配置:

    • stopTimeoutMillis:超时时间。最后一个订阅者消失后,数据流继续活跃多久用于等待新订阅者,默认值0表示立刻停止。避免订阅者都消失后就马上关闭数据流(例如不想UI有那么几秒不再监听就停止)。
    • replayExpirationMillis:回放过期时间。数据流停止后,保留回放数据的超时时间,默认值 Long.MAX_VALUE 表示永久保存。

    3.2 StateFlow

    StateFlow继承自SharedFlow是一种特殊配置,相当于MutableSharedFlow(1,0, BufferOverflow.DROP_OLDEST),可以使用value属性来访问值,可以当作是用来取代LiveData。

    • 必须传入默认值:Null安全。
    • 回放个数1+额外缓冲区大小0:只持有1个值。
    • 缓存策略DROP_OLDEST:只持有最新值(新值会替换旧值)。
    • 数据防抖:即仅在新值内容发生变化才会消费。
    StateFlowpublic interface StateFlow : SharedFlow {
        // 当前值
        public val value: T
    }
    MutableStateFlowpublic interface MutableStateFlow : StateFlow, MutableSharedFlow {
        // 当前值
        public override var value: T
        // 比较并设置(通过 equals 对比,如果值发生真实变化返回 true)
        public fun compareAndSet(expect: T, update: T): Boolean
    }

    3.2.1 通过构造创建

    public fun MutableStateFlow(value: T): MutableStateFlow = StateFlowImpl(value ?: NULL)

    形参value是默认值。

    1. val state = MutableStateFlow(0) //默认值会被覆盖
    2. launch {
    3. for(i in 1..5){
    4. state.emit(i)
    5. println("emit:$i")
    6. delay(1000) //1秒更新一个值
    7. }
    8. }
    9. launch {
    10. delay(2000) //2秒后开始订阅
    11. state.collect{ println("collect:$it") }
    12. }
    13. 打印:
    14. emit:1
    15. emit:2
    16. collect:2 //2秒后开始订阅,不会收到之前已更新的数据
    17. emit:3
    18. collect:3
    19. emit:4
    20. collect:4
    21. emit:5
    22. collect:5

    3.2.2 转换 Flow → StateFlow

    public fun Flow.stateIn(
        scope: CoroutineScope,        //数据共享时所在的协程作用域
        started: SharingStarted,        //启动策略
        initialValue: T        //默认值
    ): StateFlow

    public suspend fun Flow.stateIn(scope: CoroutineScope): StateFlow {
        val config = configureSharing(1)
        val result = CompletableDeferred>()
        scope.launchSharingDeferred(config.context, config.upstream, result)
        return result.await()
    }

    挂起函数版本,不用指定默认值,会挂起直到产出第一个值。

    四、区别及选择

    4.1 组播(Channel) or 广播(SharedFlow)

    ChannelSharedFlow
    必须性:事件不会被丢弃且必须执行。时效性:过期的事件没有意义且不应该被延迟消费。
    互斥性:挨个对订阅者发送,多个订阅者互斥,收到的值不是同一个。共享性:同时对全体订阅者发送,多个订阅者共享,接收到的值是同一个。
    同步性:事件只能消费一次。并发性:事件会被多处消费。

    4.2 事件(Event) or 状态(State)

    事件按顺序都要执行到,状态只关心最新值。

    SharedFlowStateFlow
    类型回放和额外缓存都设置为0的话,无订阅者直接丢弃数据,符合时效性事件特点。仅持有单个且最新的数据。
    初始值无(发生后才处理,不需要默认值)有(UI组件应当一直有一个值来表明其状态)
    回放

    默认0可配置(新的订阅者不重复处理已发生过的事情,或者需要知道之前发生过的事件)

    1(新的订阅者也应该知道当前状态,会出现粘性事件)

    额外缓冲区默认0可配置

    0(UI组件只显示最新的值)

    缓存模式默认SUSPEND(等待消费)DROP_OLDEST(UI组件只显示最新的值)
    发送重复的值会消费(事件都应该被处理)。

    不消费(防抖,无变化不用处理)。

  • 相关阅读:
    android: android:onClick=“@{() -> listener.onItemClick(viewModel)}“
    【Java 基础篇】Java transient 关键字详解:对象序列化与非序列化字段
    Swift中实现用户输入防抖动的两种方法
    [.NET学习] EFCore学习之旅 -3 一些其他的迁移命令
    wps excel js编程
    详解MES系统在质检管理中的多角度应用
    Spring中是否可以存在两个相同ID的bean
    redis缓存问题(数据库一致性,穿透,雪崩,击穿)
    java毕业设计——基于java+Eclipse的扫雷游戏设计与实现(毕业论文+程序源码)——扫雷游戏
    uniapp之路由中携带参数跳转
  • 原文地址:https://blog.csdn.net/HugMua/article/details/126722843