根据接收端节点数量以及接收方式,可分为三类。
单播 Unicast | 一对一,对单个订阅者发送。 | |
组播 Multicast | 互斥性:一对多,分别对单个订阅者发送。多个订阅者互斥,收到的值不是同一个。 | 同步性:元素只能被一个接收端消费。 |
广播 BroadCast | 共享性:一对多,同时对全体订阅者发送。多个订阅者共享,接收到的值是同一个。 | 并发性:一次发送多处消费。 |
扇入 Fan-in | 指直接调用该模块的上级模块的个数(多个发送端),即多个协程可能会向同一个Channel发送值。扇入大表示模块的复用程序高。 |
扇出 Fan-out | 指该模块直接调用的下级模块的个数(多个接收端),即多个协程可能会从同一个Channel中接收值。扇出大表示模块的复杂度高。 |
出现在Flow之前,现在退居幕后职责单一,仅作为协程间通信的并发安全的缓冲队列而存在。
SendChannel在创建时定义了消费方式外界只能往里发送值,ReceiveChannel在创建时定义了生产方式外界只能从中获取值,Channel继承了它俩既能发送也能接收,根据实际需求暴露不同类型收窄功能。
SendChannel 生产者通道 | send() | public suspend fun send(element: E) 将值发送到Channel,缓冲区已满时将被挂起,直到有旧值被消费腾出空间。如果Channel已经关闭,调用会抛异常 |
trySend() | public fun trySend(element: E): ChannelResult 如果不违反其容量限制,则立即将指定元素添加到此通道,并返回成功结果。否则返回失败或关闭的结果。 | |
close() | public fun close(cause: Throwable? = null): Boolean Channel不会自动停止,导致程序也不会停止。手动调用后会停止发送新值,此时 isClosedForSend() 会返回true。会保证缓冲区里已经发送的值在关闭前被接收到,等所有元素被接收后 isClosedForReceive() 会返回true。具有原子性。 | |
isClosedForSend() | public val isClosedForSend: Boolean 判断通道是否已经关闭,如果关闭,调用 send 会引发异常。 | |
ReceiveChannel 消费者通道 | receive() | public suspend fun receive(): E 当Channel中有值时就从中消费,没有值时将被挂起,直到有新值被发送进来。如果Channel已经关闭,调用会抛异常 |
tryReceive() | public fun tryReceive(): ChannelResult 当Channel中有值时就从中消费,并返回成功结果。否则返回失败或关闭的结果。 | |
receiveCatching() | public suspend fun receiveCatching(): ChannelResult 如果此通道不为空,则从中检索并删除元素,返回成功结果;如果通道为空,则返回失败结果;如果通道关闭,则返回关闭的原因。 | |
isEmpty() | public val isEmpty: Boolean 判断Channel是否为空 | |
isClosedForReceive() | public val isClosedForReceive: Boolean 判断Channel是否已经关闭,如果关闭,调用 receive 会引发异常。 | |
cancel() | public fun cancel(cause: CancellationException? = null) 以可选原因取消接收此Channel的剩余元素。此函数用于关闭Channel并从中删除所有缓冲发送的元素。 | |
iterator() | public operator fun iterator(): ChannelIterator 返回通道的迭代器。 | |
consumeEach() | public suspend inline fun 遍历所有元素并执行给定操作。 |
- fun main() = runBlocking {
- private val _channel = Channel
() - val receiveChannel: ReceiveChannel
= _channel - val sendChannel: SendChannel
= _channel -
- _channel.send("Channel发送")
- sendChannel.send("SendChannel发送")
- delay(1000)
- _channel.consumeEach { println("Channel接收:$it") }
- receiveChannel.consumeEach { println("ReceiveChannel接收:$it") }
- }
创建时在协程构建器 actor() 的 Lambda 中定义了数据的消费方式,返回一个生产者通道 SendChannel,其它协程通过该对象往里发送数据。
public fun context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, // todo: Maybe Channel.DEFAULT here? start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, block: suspend ActorScope ): SendChannel |
- fun main() = runBlocking {
- //创建生产者通道
- val send: SendChannel<Int> = actor {
- for (i in channel) println(i) //定义了消费方式
- }
- //其它协程拿来发送数据
- launch {
- (1..3).forEach { sendChannel.send(it) }
- }
- }
创建时在协程构建器 produce() 的 Lambda 中定义了数据的生产方式,返回一个消费者通道 ReceiveChannel,其它协程通过该对象从中取出数据。
public fun |
- fun main() = runBlocking {
- //创建消费者通道
- val receiveChannel: ReceiveChannel<Int> = produce {
- (1..3).forEach { send(it) } //定义了生产方式
- }
- //其它协程拿来接收数据
- launch {
- for (i in receiveChannel) println(i)
- }
- }
public fun capacity: Int = RENDEZVOUS, //容量 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, //容量溢出后策略 onUndeliveredElement: ((E) -> Unit)? = null //异常回调 ): Channel |
capacity 缓冲区容量 | RENDEZVOUS(默认):或指定为0。缓冲区大小为0且为SUSPEND策略,每次一个值receive和send同步交替,另一方没准备好自己就会挂起。(并发安全) CONFLATED:或指定为1。只有1个值,新值覆盖旧值,send永不挂起,如果没有数据receive会被挂起。此时溢出策略只能设为SUSPEND。 BUFFERED:或指定为-2(默认为64个值)或指为>1的具体值。缓冲区满了后根据溢出策略决定send是否被挂起(挂起直到消费后腾出空间),缓冲区为空时receive会被挂起。 UNLIMITED:无限容量(Int.MAX_VALUE)。send永不挂起,能一直往里发送数据,无数据时receive会被挂起。 |
onBufferOverflow 缓冲区满后溢出策略 | 当容量 >= 0 或容量 == Channel.BUFFERED 时才会触发。 BufferOverflow.SUSPEND满了后send会挂起。 BufferOverflow.DROP_OLDEST丢弃最旧的值,发送新值。 BufferOverflow.DROP_LATEST丢弃新值。 |
onUndeliveredElement 异常回调 | 数据没有被receive时回调,丢弃的值会在这里收到。通常用来关闭由Channel发送的资源(值)。 |
- val rendezvousChannel = Channel
() //约会类型 - val bufferedChannel = Channel
(10) //指定缓存大小类型 - val conflatedChannel = Channel
(Channel.CONFLATED) //混合类型 - val unlimitedChannel = Channel
(Channel.UNLIMITED) //无限缓存大小类型
- fun main() = runBlocking {
- val channel = Channel<Int>(capacity = Channel.CONFLATED) {
- println("onUndeliveredElement: $it")
- }
- launch {
- (1..3).forEach {
- println("send: $it")
- channel.send(it)
- }
- channel.close() //不要忘记关闭
- }
- launch {
- for (i in channel) {
- println("receive: $i")
- }
- }
- println("结束!")
- }
-
- 打印:
- 结束!
- send: 1
- send: 2
- onUndeliveredElement: 1
- send: 3
- onUndeliveredElement: 2
- receive: 3
- fun main(): Unit = runBlocking {
- val channel = Channel
(Channel.UNLIMITED) - repeat(5) { channel.send("$it") }
- val iterator = channel.iterator()
- while (iterator.hasNext()) {
- println("【iterator】${iterator.next()}")
- delay(1000)
- }
- //5个元素打印完后程序没结束,Channel不关闭,后面代码执行不到
- println("这行执行不到")
- }
不会消费掉 Channel 中的值。
- fun main():Unit = runBlocking {
- val channel = Channel
(Channel.UNLIMITED) - repeat(5) { channel.send("$it") }
- for (i in channel) {
- println("【for】$i,")
- delay(1000)
- }
- //5个元素打印完后程序没结束,Channel不关闭,后面代码执行不到
- println("这行执行不到")
- }
会消费掉 Channel 中的值。
- fun main(): Unit = runBlocking {
- val channel = Channel
(Channel.UNLIMITED) - repeat(5) { channel.send("$it") }
- channel.consumeEach {
- println("【consumeEach】$it,")
- }
- //5个元素打印完后程序没结束,Channel不关闭,后面代码执行不到
- println("这行执行不到")
- }
Channel 转成 Flow 是为了使用那些方便的操作符,同时 Flow 的很多功能扩展底层由 Channel 实现(如flowOn、buffer)。
receiveAsFlow() | public fun //只能被collect一次,多次收集抛异常。 |
consumeAsFlow() | public fun //能被collect多次,但一个元素只能被消费一次。 |
BroadcastChannel 在协程1.4版本已被废弃,取而代之的是 SharedFlow(StateFlow是它的特定配置版本)。
SharedFlow只能订阅(消费),FlowCollector只能发送(生产),MutableSharedFlow继承了它俩既能发送也能订阅,根据实际需求暴露不同类型收窄功能。接收会一直监听,通过取消协程来关闭它的收集。
SharedFlow | public interface SharedFlow public val replayCache: List } |
FlowCollector | public fun interface FlowCollector public suspend fun emit(value: T) //发送数据 } |
MutableSharedFlow | public interface MutableSharedFlow // 发射数据(注意这是个挂起函数) override suspend fun emit(value: T) // 尝试发射数据(如果缓存溢出策略是 SUSPEND,则溢出时不会挂起而是返回 false) public fun tryEmit(value: T): Boolean // 活跃订阅者数量,将它设为0生产数据就会停止用来释放资源。 public val subscriptionCount: StateFlow //清空当前回放里的历史记录 public fun resetReplayCache() } |
- fun main() = runBlocking {
- //利用多态暴露不同父类限制功能给外部使用
- private val _mutableSharedFlow = MutableSharedFlow
() - val sharedFlow: SharedFlow
= _mutableSharedFlow //或调用 asSharedFlow() - val flowCollector: FlowCollector
= _mutableSharedFlow -
- launch { _mutableSharedFlow.collect(println("mutableSharedFlow接收:$it")) }
- launch { sharedFlow.collect(println("mutableSharedFlow接收:$it")) }
- delay(1000)
- mutableSharedFlow.emit("mutableSharedFlow发送")
- flowCollector.emit("flowCollector发送")
- }
public fun |
onBufferOverflow 缓存溢出策略 | 只有在重播或缓存容量>0时,才支持两种丢弃模式。只有存在订阅者时才会触发缓存溢出。 |
BufferOverflow.SUSPEND:挂起。 BufferOverflow.DROP_OLDEST:丢弃最旧的。 BufferOverflow.DROP_LATEST:丢弃最新的。 |
- suspend fun method(): Unit = coroutineScope { //让3个launch同时进行
- val shared = MutableSharedFlow<Int>(3) //回放3个数据
- launch {
- for(i in 1..5){
- shared.emit(i)
- println("emmit:$i")
- delay(1000) //1秒更新一个值
- }
- }
- //订阅者甲
- launch { shared.collect{ println("甲:$it") } }
- //订阅者乙5秒后再订阅,也就是值全都更新完了再订阅
- delay(5000)
- launch { shared.collect{ println("乙:$it") } }
- }
-
- 打印:
- emmit:1
- 甲:1
- emmit:2
- 甲:2
- emmit:3
- 甲:3
- emmit:4
- 甲:4
- emmit:5
- 甲:5
- 乙:3 //回放了3个已更新过的数据
- 乙:4
- 乙:5
public fun scope: CoroutineScope, //数据共享时所在的协程作用域 started: SharingStarted, //启动策略 replay: Int = 0 //回放,新订阅时得到几个之前已经发射过的旧值。 ): SharedFlow |
started 启动策略 | SharingStarted.Eagerly | 立即发送数据(直到scope结束)。 |
SharingStarted.Lazily | 在首个订阅者观察时才开始发送数据(当订阅者都没了还是活跃的,直到scope结束)。这保证了第一个订阅者能获得所有值,后续订阅者获得最新replay数量的值。 | |
SharingStarted.WhileSubscribed | 在首个订阅者观察时才开始发送数据,直到最后一个订阅者消失时停止,当又有新订阅者时会再次启动,避免引起资源浪费(例如一直从数据库、传感器中读取数据)。提供了两个配置:
|
StateFlow继承自SharedFlow是一种特殊配置,相当于MutableSharedFlow(1,0, BufferOverflow.DROP_OLDEST),可以使用value属性来访问值,可以当作是用来取代LiveData。
StateFlow | public interface StateFlow // 当前值 public val value: T } |
MutableStateFlow | public interface MutableStateFlow // 当前值 public override var value: T // 比较并设置(通过 equals 对比,如果值发生真实变化返回 true) public fun compareAndSet(expect: T, update: T): Boolean } |
public fun 形参value是默认值。 |
- val state = MutableStateFlow(0) //默认值会被覆盖
- launch {
- for(i in 1..5){
- state.emit(i)
- println("emit:$i")
- delay(1000) //1秒更新一个值
- }
- }
- launch {
- delay(2000) //2秒后开始订阅
- state.collect{ println("collect:$it") }
- }
-
- 打印:
- emit:1
- emit:2
- collect:2 //2秒后开始订阅,不会收到之前已更新的数据
- emit:3
- collect:3
- emit:4
- collect:4
- emit:5
- collect:5
public fun |
public suspend fun 挂起函数版本,不用指定默认值,会挂起直到产出第一个值。 |
Channel | SharedFlow |
必须性:事件不会被丢弃且必须执行。 | 时效性:过期的事件没有意义且不应该被延迟消费。 |
互斥性:挨个对订阅者发送,多个订阅者互斥,收到的值不是同一个。 | 共享性:同时对全体订阅者发送,多个订阅者共享,接收到的值是同一个。 |
同步性:事件只能消费一次。 | 并发性:事件会被多处消费。 |
事件按顺序都要执行到,状态只关心最新值。
SharedFlow | StateFlow | |
类型 | 回放和额外缓存都设置为0的话,无订阅者直接丢弃数据,符合时效性事件特点。 | 仅持有单个且最新的数据。 |
初始值 | 无(发生后才处理,不需要默认值) | 有(UI组件应当一直有一个值来表明其状态) |
回放 | 默认0可配置(新的订阅者不重复处理已发生过的事情,或者需要知道之前发生过的事件) | 1(新的订阅者也应该知道当前状态,会出现粘性事件) |
额外缓冲区 | 默认0可配置 | 0(UI组件只显示最新的值) |
缓存模式 | 默认SUSPEND(等待消费) | DROP_OLDEST(UI组件只显示最新的值) |
发送重复的值 | 会消费(事件都应该被处理)。 | 不消费(防抖,无变化不用处理)。 |