协程系列文章:
- 一个小故事讲明白进程、线程、Kotlin 协程到底啥关系?
- 少年,你可知 Kotlin 协程最初的样子?
- 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇)
- 讲真,Kotlin 协程的挂起/恢复没那么神秘(原理篇)
- Kotlin 协程调度切换线程是时候解开真相了
- Kotlin 协程之线程池探索之旅(与Java线程池PK)
- Kotlin 协程之取消与异常处理探索之旅(上)
- Kotlin 协程之取消与异常处理探索之旅(下)
- 来,跟我一起撸Kotlin runBlocking/launch/join/async/delay 原理&使用
- 继续来,同我一起撸Kotlin Channel 深水区
- Kotlin 协程 Select:看我如何多路复用
- Kotlin Sequence 是时候派上用场了
- Kotlin Flow 背压和线程切换竟然如此相似
- Kotlin Flow啊,你将流向何方?
- Kotlin SharedFlow&StateFlow 热流到底有多热?
之前文章都是分析单个协程的原理、特性以及使用,本篇文章将着重分析协程间的通信方式。
通过本篇文章,你将了解到:
- Channel的引入及简单使用
- Channel的原理
- Channel四种类型深入解析
- produce/actor的使用与原理
先来看一个简单的通信Demo:
fun testChannel() {
//协程1
var deferred= GlobalScope.async {
//假装在加工数据
Thread.sleep(2000)
"Hello fishforest"
}
//协程2
GlobalScope.launch {
var result = deferred.await()
println("get result from coroutine1: $result")
}
}
如上,协程2拿到了协程1的值,这就是一次简单的协程间通信过程。
现在需求变了,协程1一直在生产数据,协程2也需要不断地从中取数据,此时靠async/await 配合无能为力了。当然,我们很容易想到的方案是:
共享一个变量,这个变量可以是个队列。
于是Demo改造如下:
fun testChannel2() {
//阻塞队列
var queue = ArrayBlockingQueue<String>(5)
//协程1
GlobalScope.launch {
var count = 0
while (true) {
//假装在加工数据
Thread.sleep(1000)
queue.put("fish ${count++}")
}
}
//协程2
GlobalScope.launch {
while (true) {
Thread.sleep(1000)
println("get result from coroutine1:${queue.take()}")
}
}
}
通过阻塞队列,当协程2取数据时,如果队列是空,那么等待协程1往队列里放数据;当协程1放数据时,如果队列满了,那么等待协程2从队列里取出数据。如此,就是简单的协程通信。
看似美好,实际上此处有个很大的漏洞:
队列满/队列空 时,此时等待动作阻塞的是线程,而我们知道协程的挂起并不阻塞线程,因此此种方式并没有利用到协程的优势。
我们期望协程发现队列满/空时将自己挂起等待,此时就引入了Channel。
同样的需求,我们用Channel 实现:
fun testChannel3() {
//定义Channel
var channel = Channel<String>()
//协程1
GlobalScope.launch {
var count = 0
while (true) {
//假装在加工数据
Thread.sleep(1000)
var sendStr = "fish ${count++}"
println("send $sendStr")
channel.send("$sendStr")
}
}
//协程2
GlobalScope.launch {
while (true) {
Thread.sleep(1000)
println("receive:${channel.receive()}")
}
}
}
与之前的实现方案相比,仅仅只是将队列换成了Channel,可以看出,Channel 和队列比较类似,而Channel的send/recevie 函数并没有阻塞线程,仅仅只是挂起了协程。
查看打印结果:
你可能发现了端倪:发送者和接收者是成对出现的,难道Channel的内部实现不是队列?
要想解开这个谜题,最好的方法是从源码入手深究其原理。
先从Channel 构造开始:
#Channel.kt
public fun <E> Channel(
//Channel 容量/叫做Channel类型更合理一些
capacity: Int = Channel.RENDEZVOUS,
//缓冲区满后,发送端的处理方式
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
//信息没有传递出去时的回调
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
when (capacity) {
//默认是约会模式
Channel.RENDEZVOUS -> {
//默认挂起
if (onBufferOverflow == BufferOverflow.SUSPEND)
RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
else
ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
}
//...
}
此处的Channel() 并不是构造函数,而是顶层函数,Kotlin里有很多伪装为构造函数的顶层函数。该顶层函数默认构造并返回RendezvousChannel类型的Channel。
RendezvousChannel 类本身很简单,就重写了一些属性,它继承自AbstractChannel。
重点在AbstractChannel/AbstractSendChannel及其子类里。
AbstractSendChannel 里有个很重要的成员变量:
protected val queue = LockFreeLinkedListHead()
LockFreeLinkedListHead 继承自LockFreeLinkedListNode,而这个Node 我们在分析Kotlin 协程之取消与异常处理探索之旅(上) 有提及过,此处再拎出来说说。
先看其定义:
#LockFreeLinkedList.kt
public actual open class LockFreeLinkedListNode {
//后驱指针
private val _next = atomic<Any>(this) // Node | Removed | OpDescriptor
//前驱指针
private val _prev = atomic(this)
//...
}
很典型的一个链表结构,并且是无锁链表,意思是它的插入/删除是无需上锁的,核心是使用了CAS。
回到Channel里的成员变量queue,初始的链表结构如下:
可以看出,当前节点的_next、_prev分别指向自己。
当往链表里面添加Node时,形成如下结构:
链表头为固定节点,通过它构造了双向循环链表。
AbstractSendChannel 里的queue 就是个链表头,通过它我们可以快速找到链表里的第1个节点(_next 指向的节点),也可以快速找到链表的最后一个节点(_prev指向的节点)。
于是形成了一个队列结构,每次往队列里放入数据,就放到链表的尾部,每次从队列里取数据,就从链表头后的第一个节点取。
send 分析
#AbstractChannel.kt
public final override suspend fun send(element: E) {
//快速判断是否可以放入queue 队列
//若能成功,则直接返回
if (offerInternal(element) === kotlinx.coroutines.channels.OFFER_SUCCESS) return
//不能退出,则挂起协程
return sendSuspend(element)
}
protected open fun offerInternal(element: E): Any {
while (true) {
//先找到队列第一个Node节点,如果存在并且是Receive 类型,说明有接收者在等待
val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
//给接收者协程赋值
val token = receive.tryResumeReceive(element, null)
if (token != null) {
kotlinx.coroutines.assert { token === RESUME_TOKEN }
//重新恢复接收者协程
receive.completeResumeReceive(element)
//返回插入的结果
return receive.offerResult
}
}
}
private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont ->
//suspendCancellableCoroutineReusable 里有挂起协程的逻辑
loop@ while (true) {
if (isFullImpl) {
//构造SendElement,它是Node类型
val send = if (onUndeliveredElement == null)
//SendElement 有两个成员变量:1是具体的值,2是当前协程的封装体cont
SendElement(element, cont) else
SendElementWithUndeliveredHandler(element, cont, onUndeliveredElement)
//将Element 加入到队列尾部
val enqueueResult = enqueueSend(send)
//插入成功,则返回
}
}
}
用图表示以上流程:
接收者协程被恢复后,重新调度执行协程,而传入的值即为send发送的值,最终recevie返回的即是send过来的值。
对协程的挂起有疑惑请移步:讲真,Kotlin 协程的挂起没那么神秘(原理篇)
receive 分析
与send流程类似,就不贴代码了,仅用图表示:
可以看出,send/receive 通过判断queue的状态来决定是否挂起当前协程,而queue里的Node 又分为三种类型:
综合以上得出:
在RENDEZVOUS类型(默认类型)下,发送者协程需要等待接收者就位了(到队列里等待)才会继续往下走。同样的,接收者协程需要等待发送者就位了(到队列里等待)才会继续往下走。因此,形成的现象是发送者/接收者成对出现。
成对出现的场景,我们称RENDEZVOUS 为约会类型。
前面的分析是基于约会类型,实际上Channel还有其它类型,通过其构造过程可看出:
#AbstractChannel.kt
public fun <E> Channel(
capacity: Int = Channel.RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
when (capacity) {
//约会类型
Channel.RENDEZVOUS -> {
if (onBufferOverflow == BufferOverflow.SUSPEND)
RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
else
//转为缓冲类型
ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
}
//混合类型
Channel.CONFLATED -> {
//此种类型下必须是挂起模式
require(onBufferOverflow == BufferOverflow.SUSPEND) {
"CONFLATED capacity cannot be used with non-default onBufferOverflow"
}
ConflatedChannel(onUndeliveredElement)
}
//无限制类型
Channel.UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
//缓冲类型
Channel.BUFFERED -> ArrayChannel( // uses default capacity with SUSPEND
if (onBufferOverflow == BufferOverflow.SUSPEND) Channel.CHANNEL_DEFAULT_CAPACITY else 1,
onBufferOverflow, onUndeliveredElement
)
//没有指定具体类型是以上4种内的组合
}
先看CONFLATED(混合)类型。
ConflatedChannel 继承自AbstractChannel,有个成员变量:value。
重点来看其重写的函数:offerInternal与pollInternal,分别对应send与receive的逻辑。
send 分析
#ConflatedChannel.kt
protected override fun offerInternal(element: E): Any {
var receive: ReceiveOrClosed? = null
//先上锁
lock.withLock {
//如果value 为空,也就是之前没有发送过,说明可能有接收者在等待。
if (value === kotlinx.coroutines.channels.EMPTY) {
loop@ while(true) {
//尝试取出接收者
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
if (receive is Closed) {
//是关闭Node,直接返回
return receive!!
}
//赋值给接收者协程
val token = receive!!.tryResumeReceive(element, null)
if (token != null) {
//跳出锁
return@withLock
}
}
}
//更新发送值到value里
updateValueLocked(element)?.let { throw it }
//成功插入
return OFFER_SUCCESS
}
//如果找到接收者,则恢复接收者协程
receive!!.completeResumeReceive(element)
return receive!!.offerResult
}
receive 分析
#ConflatedChannel.kt
protected override fun pollInternal(): Any? {
var result: Any? = null
//上锁
lock.withLock {
//如果value 为空,说明没数据,取数据失败
if (value === kotlinx.coroutines.channels.EMPTY) return closedForSend ?: POLL_FAILED
//从value 里取数据
result = value
//恢复到无数据状态
value = EMPTY
}
return result
}
由此可见:
在 CONFLATED类型下,发送者无需等待接收者就位,它可以一直更新数据。
此为缓冲类型,与其它类型最大的不同之处在于它内部有数据缓冲区。
ArrayChannel 继承自AbstractChannel,其成员变量:
//数据缓冲区
private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8)).apply { fill(EMPTY) }
重点函数还是offerInternal与pollInternal。
send 分析
#ArrayChannel.kt
protected override fun offerInternal(element: E): Any {
var receive: ReceiveOrClosed? = null
lock.withLock {
//size 为buffer 当前的实际存储数据的个数
val size = this.size.value
//更新size,此处根据发送策略,有可能会直接退出
updateBufferSize(size)?.let { return it }
if (size == 0) {
//当前缓冲区没有数据
loop@ while (true) {
//查看是否有接收者等待
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
//给接收者协程赋值
val token = receive!!.tryResumeReceive(element, null)
if (token != null) {
//缓冲区数量不变
this.size.value = size
return@withLock
}
}
}
//加入到缓冲队列
enqueueElement(size, element)
//插入成功
return OFFER_SUCCESS
}
//恢复接收者协程
receive!!.completeResumeReceive(element)
return receive!!.offerResult
}
private fun updateBufferSize(currentSize: Int): Symbol? {
if (currentSize < capacity) {
//还可以继续存放数据
size.value = currentSize + 1 // tentatively put it into the buffer
return null // proceed
}
//缓冲区满
return when (onBufferOverflow) {
//协程需要挂起
BufferOverflow.SUSPEND -> OFFER_FAILED
//舍弃最新数据,相当于发送永远是成功的
BufferOverflow.DROP_LATEST -> OFFER_SUCCESS
//舍弃旧的数据,发送继续走下面的流程
BufferOverflow.DROP_OLDEST -> null // proceed, will drop oldest in enqueueElement
}
}
receive 分析
receive 过程与send类似,就不贴源码了,直接上图对比:
可以看出,对于发送者来说:
先将数据放入数据缓冲队列,当缓冲区满后才会考虑是否需要挂起发送者协程。同样的,对于接收者来说,先从缓冲队列取数据,当缓冲区没数据时才会挂起自身。
此类型为无限制类型,网上一些文章将此与BUFFERED类型类比,并归为“无限缓冲类型”,该说法是否正确,接下来一步步印证。
同样的,LinkedListChannel继承自AbstractChannel。
重点函数还是offerInternal与pollInternal。
send 分析
#LinkedListChannel.kt
protected override fun offerInternal(element: E): Any {
while (true) {
//快速查找是否有接收者等待
val result = super.offerInternal(element)
when {
//找到接收者,插入算是成功
result === OFFER_SUCCESS -> return OFFER_SUCCESS
//没找到
result === OFFER_FAILED -> {
//加入到协程缓冲队列
when (val sendResult = sendBuffered(element)) {
null -> return OFFER_SUCCESS
is Closed<*> -> return sendResult
}
}
}
}
}
protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
//将SendBuffered 加入到queue里(队尾)
queue.addLastIfPrev(AbstractSendChannel.SendBuffered(element)) { prev ->
if (prev is ReceiveOrClosed<*>) return@sendBuffered prev
true
}
//添加成功
return null
}
receive 分析
receive 过程完全依靠父类AbstractChannel完成,此处就不再赘述,用图表示:
可以看出:
此类型下,发送者不会挂起,会一直往队列里存放数据,理论上是可以无限制存放的。与BUFFERED类型不同的是,UNLIMITED 缓冲数据使用的是queue,它是链表。而BUFFERED 缓冲数据使用的是数组。
对于接收者来说,只有一种逻辑:
有数据则消费数据,没数据则挂起等待。
通过上面的分析,我们知道接收者有可能会阻塞,怎样才能让接收者知道数据已经发送完毕了呢?
答案是:Channel.close()。
当调用该函数时,会往queue里加入Closed节点,当send/receive 取出该节点时就知道Channel关闭了。
你说,能不能不手动调用该函数呢?刚好,produce可以解决该问题:
fun testProduce() {
//返回接收者
var receiveChannel = GlobalScope.produce<String> {
//
for (x in 1..5) {
var sendStr = "fish $x"
println("send $sendStr")
send("$sendStr")
}
}
//接收数据
GlobalScope.launch {
while (true) {
println("job2 receive:${receiveChannel.receive()}")
}
println("job2 end")
}
GlobalScope.launch {
while (true) {
println("job3 receive:${receiveChannel.receive()}")
}
println("job3 end")
}
}
produce 函数返回Channel,在produce的协程体里可以发送数据,而通过返回的Channel,其它协程可以接收数据。当produce协程执行完毕后,将会主动调用close关闭Channel,其它Receive的Channel就会有感知,从而退出挂起状态。
这是一个典型的单生产者–多消费者的模型。
反之单消费者–多生产者的模型如下:
fun testActor() {
//返回发送者
var sendChannel = GlobalScope.actor {
//
for (x in 1..5) {
println("job1 receive:${receive()}")
}
println("actor end")
}
//发送者
GlobalScope.launch {
sendChannel.send("send from job2")
}
GlobalScope.launch {
sendChannel.send("send from job3")
}
}
produce和actor内部创建了RENDEZVOUS 类型的Channel,它们返回的Channel以及协程体里的Channel都是委托这个内部的Channel来完成功能的,并且Channel绑定了协程的生命周期,当协程取消时将会取消Channel。(由于篇幅原因,就不展开细说了,有兴趣可以自行阅读源码或是留言)。
下篇将着重分析Flow以及与LiveData的PK。
本文基于Kotlin 1.5.3,文中完整Demo请点击