• Kotlin协程:MutableSharedFlow的实现原理


    一.MutableSharedFlow接口的实现

    1.MutableSharedFlow方法

        在Koltin协程:异步热数据流的设计与使用中,提到了可以通过MutableSharedFlow方法创建一个MutableSharedFlow接口指向的对象,代码如下:

    @Suppress("FunctionName", "UNCHECKED_CAST")
    public fun <T> MutableSharedFlow(
        replay: Int = 0,
        extraBufferCapacity: Int = 0,
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
    ): MutableSharedFlow<T> {
        // 参数检查
        require(replay >= 0) { "replay cannot be negative, but was $replay" }
        require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
        require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
            "replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
        }
        
        // 相加计算缓存容量
        val bufferCapacity0 = replay + extraBufferCapacity
        // 如果缓存容量小于0,则设置缓存容量为Int类型的最大值
        val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0
        // 创建一个SharedFlowImpl类型的对象并返回
        return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

        在MutableSharedFlow方法中,首先将参数replay与参数extraBufferCapacity相加,计算缓存容量,这与上一篇里提到的“参数replay与参数extraBufferCapacity共同决定缓存最大容量”的设计思想一致。接着判断缓存容量是否溢出,即小于0,如果小于零则将缓存容量设置为Int类型的最大值。最后根据参数,创建并返回一个SharedFlowImpl类型的对象。

    二.SharedFlowImpl类

        SharedFlowImpl类是MutableSharedFlow接口的核心实现,它的继承关系如下图所示:
    在这里插入图片描述

    • AbstractSharedFlow类:提供了对订阅者进行管理的方法。
    • CancellableFlow接口:用于标记SharedFlowImpl类型的Flow对象是可取消的。
    • MutableSharedFlow接口:表示SharedFlowImpl类型的Flow对象是一个热流。
    • FusibleFlow接口:表示SharedFlowImpl类型的Flow对象是可融合的。

    1.发射数据的管理

        在SharedFlowImpl类中,维护了一个缓存数组,用于保存emit方法发射数据,数据缓存数组分成了buffered values和queued emitters两部分,它的结构如下所示:
    在这里插入图片描述

    • buffered values:表示当前缓存数据的大小,最大容量为SharedFlowImp类构造方法中bufferCapacity。buffered values由extraBuffer和replayCache两部分构成:
      • replayCache的最大容量由MutableSharedFlow方法中参数replay决定。
      • extraBuffer的最大容量由MutableSharedFlow方法中参数extraBufferCapacity决定。
    • queued emitters:通常情况下,当调用emit方法发射数据时,如果缓存数组的buffered values未达到最大容量,则发射的数据将保存到缓存中,并立即返回emit方法。如果缓存数组的buffered values已达到最大容量,则调用emit方法的协程会被立即挂起,并且它的续体和数据会被封装成一个Emitter类型的对象,保存到缓存数组的queued emitters中。
    • 数据缓存的移动:假设上图中,当buffered values中位置为0的数据被所有的订阅者都处理后,buffered values会前移动一位。这时,queued emitters中位置为7的Emitter类型的对象就会被“拆箱”,将其中保存的数据存放到位置7,同时恢复其中保存的emit方法所在续体的执行。之后,位置7将作为buffered values的一部分。

        为了实现上述模型的运行,在SharedFlowImpl类中使用了很多的全局变量,代码如下:

    private class SharedFlowImpl<T>(
        private val replay: Int, // replayCache的最大容量
        private val bufferCapacity: Int, // buffered values的最大容量
        private val onBufferOverflow: BufferOverflow // 溢出策略
    ) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
        // 缓存数组,用于保存emit方法发射的数据,在需要时进行初始化
        private var buffer: Array<Any?>? = null
        // 新的订阅者从replayCache中获取数据的起始位置
        private var replayIndex = 0L
        // 当前所有的订阅者从缓存数组中获取的数据中,对应位置最小的索引
        // 如果没有订阅者,则minCollectorIndex的值等于replayIndex
        private var minCollectorIndex = 0L
        // 缓存数组中buffered values缓存数据的数量
        private var bufferSize = 0
        // 缓存数组中queued emitters缓存数据的数量
        private var queueSize = 0
    
        // 当前缓存数组的起始位置
        private val head: Long get() = minOf(minCollectorIndex, replayIndex)
        // 当前缓存数组中replayCache缓存数据的数量
        private val replaySize: Int get() = (head + bufferSize - replayIndex).toInt()
        // 当前缓存数组中已经缓存的数据的数量
        private val totalSize: Int get() = bufferSize + queueSize
        // 当前缓存数组中buffered values的最末尾位置索引的后一位
        private val bufferEndIndex: Long get() = head + bufferSize
        // 当前数组中queued emitters的最末尾位置索引的后一位
        private val queueEndIndex: Long get() = head + bufferSize + queueSize
    
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

        上面代码中的全局变量对应到数组中的位置如下图所示:
    在这里插入图片描述

    2.订阅者的管理

        在SharedFlowImpl中,AbstractSharedFlow类与AbstractSharedFlowSlot类实现了对订阅者的管理,这两个类都是抽象类。在AbstractSharedFlow类中维护了一个订阅者数组,数组中每一个元素都是一个AbstractSharedFlowSlot类型的对象。

    1)AbstractSharedFlowSlot类与SharedFlowSlot类

        在AbstractSharedFlowSlot类中,定义了allocateLocked方法与freeLocked方法,用于实现订阅者数组中AbstractSharedFlowSlot类型对象的复用,代码如下:

    internal abstract class AbstractSharedFlowSlot<F> {
        // 用于新订阅者申请使用当前AbstractSharedFlowSlot类型的对象
        // 返回true代表申请成功,返回false代表申请失败
        abstract fun allocateLocked(flow: F): Boolean
        // 用于订阅者释放当前使用的AbstractSharedFlowSlot类型得到对象,
        // 并以数组的形式返回待恢复的续体
        abstract fun freeLocked(flow: F): Array<Continuation<Unit>?>
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

        在SharedFlowImpl中,当有新的订阅者出现时,会为它在订阅者数组中分配一个类型为SharedFlowSlot的对象。SharedFlowSlot类继承自AbstractSharedFlowSlot类,代码如下:

    private class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
        // 表示将要在处理的数据在数组中的索引
        // 如果为-1,表示当前可用
        @JvmField
        var index = -1L
    
        // 用来保存等待新数据发送的订阅者的续体
        @JvmField
        var cont: Continuation<Unit>? = null
    
        // 重写
        override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {
            // 如果已经被其他订阅者使用,则返回false
            if (index >= 0) return false
            // 走到这里说明没有被其他订阅者使用,分配成功
            // 获取当前的新订阅者应该从缓存数组获取数据的初始位置
            index = flow.updateNewCollectorIndexLocked()
            return true
        }
    
        // 重写
        override fun freeLocked(flow: SharedFlowImpl<*>): Array<Continuation<Unit>?> {
            assert { index >= 0 }
            val oldIndex = index
            // 清除索引,表示可用
            index = -1L
            // 清除续体
            cont = null
            return flow.updateCollectorIndexLocked(oldIndex)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

        SharedFlowSlot类在AbstractSharedFlowSlot类的基础上,加入了全局变量index和cont。

        当index大于等于0时,表示订阅者应该从缓存数组index对应的位置中获取数据,而当index小于0时,则表示当前SharedFlowSlot类型的对象没有被任何订阅者使用。

        当订阅者处理完应该处理的所有数据时,订阅者所在的协程会被挂起,它的续体就会被保存在全局变量cont中。

    2)AbstractSharedFlow类

        AbstractSharedFlow类内部维护了一个订阅者数组,并管理订阅者数组中AbstractSharedFlowSlot类型的对象。

        AbstractSharedFlow类定义了两个抽象方法:createSlot方法与createSlotArray方法,createSlot方法用于创建一个类型为AbstractSharedFlowSlot的对象,createSlotArray方法用于创建一个泛型AbstractSharedFlowSlot的数组,代码如下:

    internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
        @Suppress("UNCHECKED_CAST")
        // 存放订阅者的数组,在必要时进行初始化
        protected var slots: Array<S?>? = null
            private set
        // 用于记录订阅者的数量
        protected var nCollectors = 0
            private set
        // 用于保存在订阅者数组中查找空位时,下一次要查找的位置
        private var nextIndex = 0
        // 订阅者数量的状态流,当订阅者数量发生变化时,会进行回调
        private var _subscriptionCount: MutableStateFlow<Int>? = null    
        val subscriptionCount: StateFlow<Int>
            // 加锁
            get() = synchronized(this) {
                // 对_subscriptionCount初始化,nCollectors为初始值
                _subscriptionCount ?: MutableStateFlow(nCollectors).also {
                    _subscriptionCount = it
                }
            }
    
        ...
        // 创建一个类型为AbstractSharedFlowSlot的对象
        protected abstract fun createSlot(): S
        // createSlotArray方法用于创建一个泛型AbstractSharedFlowSlot的数组
        protected abstract fun createSlotArray(size: Int): Array<S?>
        
        ...    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

        AbstractSharedFlow类继承自SynchronizedObject类,SynchronizedObject类实际是Any类的别名,代码如下:

    @InternalCoroutinesApi
    public actual typealias SynchronizedObject = Any
    
    • 1
    • 2
    a)allocateSlot方法

        allocateSlot方法用于为新的订阅者在订阅者数组中分配一个可使用的AbstractSharedFlowSlot类型对象,代码如下:

    internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
        ...
         
        @Suppress("UNCHECKED_CAST")
        protected fun allocateSlot(): S {
            // 在锁外创建一个状态流
            var subscriptionCount: MutableStateFlow<Int>? = null
            // 加锁
            val slot = synchronized(this) {
                // 对订阅者数组进行判断处理
                val slots = when (val curSlots = slots) {
                    // 为空则初始化,初始化大小为2,并保存到全局变量
                    null -> createSlotArray(2).also { slots = it }
                    // 如果容量已满,则进行扩容,扩容前大小为扩容后大小的2倍,并保存到全局变量
                    else -> if (nCollectors >= curSlots.size) {
                        curSlots.copyOf(2 * curSlots.size).also { slots = it }
                    } else {
                        curSlots
                    }
                }
                
                // 获取全局变量
                var index = nextIndex
                var slot: S
                
                // 遍历
                while (true) {
                    // 获取index位置的AbstractSharedFlowSlot类型的对象,
                    // 如果为空,则调用createSlot方法创建一个,并保存到订阅者数组中
                    slot = slots[index] ?: createSlot().also { slots[index] = it }
                    // 自增
                    index++
                    // 如果遍历到数组的最后一个元素,则从头开始
                    if (index >= slots.size) index = 0
                    // 尝试对AbstractSharedFlowSlot类型的对象分配订阅者,
                    // 分配成功则跳出循环
                    if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break
                }
                // 走到这里说明已经分配成功
                
                // 将下一次要遍历的位置保存到全局变量
                nextIndex = index
                // 订阅者数量自增
                nCollectors++
                // 获取全局变量
                subscriptionCount = _subscriptionCount
                // 返回分配的AbstractSharedFlowSlot类型的对象
                slot
            }
            // 订阅者状态流增加1,此方法会触发回调通知
            subscriptionCount?.increment(1)
            // 返回
            return slot
        }
        
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    b)freeSlot方法

        freeSlot方法用于释放已分配给订阅者的AbstractSharedFlowSlot类型的对象,代码如下:

    internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
        ...
    
        @Suppress("UNCHECKED_CAST")
        protected fun freeSlot(slot: S) {
            // 在锁外创建一个状态流
            var subscriptionCount: MutableStateFlow<Int>? = null
            // 加锁
            val resumes = synchronized(this) {
                // 订阅者数量自减
                nCollectors--
                // 获取全局变量
                subscriptionCount = _subscriptionCount
                // 如果订阅者为0,说明订阅者数组里没有订阅者,则下一次从头开始
                if (nCollectors == 0) nextIndex = 0
                // 释放已分配的AbstractSharedFlowSlot类型对象
                (slot as AbstractSharedFlowSlot<Any>).freeLocked(this)
            }
            
            // 对释放后返回的续体进行遍历,恢复续体
            for (cont in resumes) cont?.resume(Unit)
            // 订阅者状态流减1,此方法会触发回调通知
            subscriptionCount?.increment(-1)
        }
    
        // 用于遍历订阅者数组
        protected inline fun forEachSlotLocked(block: (S) -> Unit) {
            // 如果没有订阅者,则直接返回
            if (nCollectors == 0) return
            // 遍历订阅者数组
            slots?.forEach { slot ->
                if (slot != null) block(slot)
            }
        }
        
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    3.数据的接收

        当调用SharedFlow类型对象的collect方法,会触发订阅过程,接收emit方法发送的数据,这部分在
    SharedFlowImpl中实现,代码如下:

    @Suppress("UNCHECKED_CAST")
    override suspend fun collect(collector: FlowCollector<T>) {
        // 为当前的订阅者分配一个SharedFlowSlot类型的对象
        val slot = allocateSlot()
        try {
            // 如果collector类型为SubscribedFlowCollector,
            // 说明订阅者监听了订阅过程的启动,则先回调
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            // 获取订阅者所在的协程
            val collectorJob = currentCoroutineContext()[Job]
            // 死循环
            while (true) {
                var newValue: Any?
                // 死循环
                while (true) {
                    // 从缓存数组中获取数据
                    newValue = tryTakeValue(slot)
                    // 如果获取数据成功,则跳出循环
                    if (newValue !== NO_VALUE) break
                    // 走到这里,说明获取数据失败,
                    // 挂起订阅者所在协程,等待新数据的到来
                    awaitValue(slot)
                }
                // 走到这里,说明已经获取到了数据
                // 判断订阅者所在协程是否是存活的,如果不是则抛出异常
                collectorJob?.ensureActive()
                // 进行类型转换,并向下游发射数据
                collector.emit(newValue as T)
            }
        } finally {
            // 释放已分配的SharedFlowSlot类型的对象
            freeSlot(slot)
        }
    }
    
    
    @SharedImmutable
    @JvmField
    internal val NO_VALUE = Symbol("NO_VALUE")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    1)数据的获取

        在collect方法中,通过tryTakeValue方法获取数据,代码如下:

    private fun tryTakeValue(slot: SharedFlowSlot): Any? {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        // 加锁
        val value = synchronized(this) {
            // 从slot中获取index
            // index表示当前应该从缓存数组的index位置中获取数据
            val index = tryPeekLocked(slot)
            // 如果index小于0,说明没有数据
            if (index < 0) {
                // 返回空数据标识
                NO_VALUE
            } else { // 如果有数据
                // 获取当前的slot的index
                val oldIndex = slot.index
                // 从缓存数组的index处获取数据
                val newValue = getPeekedValueLockedAt(index)
                // 计算下一次获取数据的位置,并保存到slot中
                slot.index = index + 1
                // 更新缓存数组的位置,并获取缓存数组与订阅者数组中可恢复的续体
                resumes = updateCollectorIndexLocked(oldIndex)
                // 返回获取的数据
                newValue
            }
        }
        // 遍历,恢复续体
        for (resume in resumes) resume?.resume(Unit)
        // 返回获取的数据
        return value
    }
    
    @JvmField
    @SharedImmutable
    internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    a)数据获取策略

        在tryTakeValue方法,获取数据之前,首先会调用tryPeekLocked方法,判断数据所在的位置是否符合要求,代码如下:

    private fun tryPeekLocked(slot: SharedFlowSlot): Long {
        // 从slot中获取index
        val index = slot.index
        // 如果是在buffered values中获取,则直接返回
        if (index < bufferEndIndex) return index
        
        // 走到这里说明是要在queued emitters中获取,
        // 如果buffered values的最大容量大于0,则返回-1
        // 在buffered values可以存在的情况下,禁止发射者和订阅者接触
        if (bufferCapacity > 0) return -1L
        
        // 走到这里说明要在queued emitters中获取,同时buffered values的最大容量为0
        // 这种情况缓存数组只能有queued emitters,
        // 因此,只能处理queued emitters中的第一个Emitter类型的对象
        // 如果当前订阅者想要处理下一个Emitter类型的对象,则返回-1
        if (index > head) return -1L
        
        // 走到这里说明要在queued emitters中获取,同时buffered values的最大容量为0
        // 并且要获取当前的正在处理的Emmiter类型的对象
        // 如果queued emitters为空,说明当前没有Emmiter类型的对象,则返回-1
        if (queueSize == 0) return -1L
        // 满足上述要求,返回index
        return index
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

        在允许bufferd values存在的情况下,只能从bufferd values获取数据。在不允许bufferd values存在的情况下,只能处理queued emitters的第一个Emitter类型的对象。

    b)获取数据

        如果数据所在的位置符合要求,则会调用getPeekedValueLockedAt方法获取数据,代码如下:

    private fun getPeekedValueLockedAt(index: Long): Any? =
        // 从缓存数组中index位置获取数据
        when (val item = buffer!!.getBufferAt(index)) {
            // 如果是Emitter类型的,则进行拆箱,获取数据
            is Emitter -> item.value
            // 直接返回
            else -> item
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

        Emitter类是SharedFlowImpl类的内部类,用于在挂起调用emit方法所在的协程后,对emit方法发射的数据及挂起后的续体进行封装,代码如下:

    private class Emitter(
        @JvmField val flow: SharedFlowImpl<*>,
        @JvmField var index: Long, // 当前对象在缓存数组中的位置
        @JvmField val value: Any?,// emit方法发射的数据
        @JvmField val cont: Continuation<Unit> // 挂起的续体
    ) : DisposableHandle {
        override fun dispose() = flow.cancelEmitter(this)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2)订阅者协程的挂起

        在collect方法中,当订阅者无数据可获取时,则会调用awaitValue方法,挂起订阅者所在的协程,代码如下:

    private suspend fun awaitValue(slot: SharedFlowSlot): Unit = 
      // 直接挂起订阅者所在的协程
      suspendCancellableCoroutine { cont ->
        // 加锁
        synchronized(this) lock@{
            // 再次检查当前的index是否满足要求
            val index = tryPeekLocked(slot)
            // 如果确实不满足要求
            if (index < 0) {
                // 保存续体到slot中
                slot.cont = cont
            } else { // 如果再次检查发现index这时满足要求
                // 则恢复挂起,并返回
                cont.resume(Unit)
                return@lock
            }
            // 保存续体到slot中
            slot.cont = cont
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    4.数据的发射

        当需要发射数据时,会调用SharedFlowImpl类的emit方法,代码如下:

    override suspend fun emit(value: T) {
        // 首先尝试调用不需要挂起的tryEmit方法,如果发射成功,则返回
        if (tryEmit(value)) return
        
        // 走到这里说明需要挂起,则调用emitSuspend方法
        emitSuspend(value)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    1)以不挂起的方式发射数据

        SharedFlowImpl类中实现了MutableSharedFlow接口中tryEmit方法,用于以不挂起的方式发射数据,代码如下:

    override fun tryEmit(value: T): Boolean {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        // 加锁
        val emitted = synchronized(this) {
            // 尝试发射数据,如果发射成功
            if (tryEmitLocked(value)) {
                // 收集已经挂起的订阅者的续体
                resumes = findSlotsToResumeLocked(resumes)
                // 返回true
                true
            } else { // 发射失败
                // 返回false
                false
            }
        }
        // 唤起挂起的订阅者
        for (cont in resumes) cont?.resume(Unit)
        // 返回结果
        return emitted
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

        tryEmit方法中通过tryEmitLocked方法尝试对数据进行发射,如果发射成功,会调用findSlotsToResumeLocked方法收集已经挂起的订阅者的续体,并唤醒订阅者去接收消费数据。

        tryEmitLocked方法代码如下:

    @Suppress("UNCHECKED_CAST")
    private fun tryEmitLocked(value: T): Boolean {
        // 如果当前没有订阅者,则调用tryEmitNoCollectorsLocked处理,并返回
        // 该方法永远返回true
        if (nCollectors == 0) return tryEmitNoCollectorsLocked(value)
        // 如果当前有订阅者,同时buffered values已达到最大容量
        if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
            // 根据溢出策略进行判断
            when (onBufferOverflow) {
                // 如果是挂起,则返回fasle
                BufferOverflow.SUSPEND -> return false
                // 如果是丢掉最新的数据,则返回true
                BufferOverflow.DROP_LATEST -> return true
                // 如果是丢掉最旧的数据,则暂不作处理
                BufferOverflow.DROP_OLDEST -> {}
            }
        }
        
        // 走到这里,有两种情况:
        // 情况1:buffered values还可以继续添加数据
        // 情况2:buffered values已达到最大容量,同时溢出策略为DROP_OLDEST
        
        // 将数据加入到缓存数组中
        // 这里因为tryEmit方法不会挂起emit方法所在的协程,
        // 所以value没有被封装成Emitter类型的对象
        enqueueLocked(value)
        // buffered values的数据数量加1
        bufferSize++
        // 如果buffered values的数据数量超过最大容量的限制,
        // 说明此时为情况2,则调用dropOldestLocked方法,丢弃最旧的数据
        if (bufferSize > bufferCapacity) dropOldestLocked()
        // 如果replayCache中数据的数量超过了最大容量
        if (replaySize > replay) {
            // 更新replayIndex的值,replayIndex向前移动一位
            updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
        }
        // 返回true
        return true
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

        buffered values已达到最大容量,同时溢出策略为DROP_OLDEST情况下数据发射图解
    在这里插入图片描述
        在tryEmitLocked方法中,如果当前没有订阅者时,会调用tryEmitNoCollectorsLocked方法,代码如下:

    // 如果当前没有订阅者,会调用tryEmitNoCollectorsLocked方法
    private fun tryEmitNoCollectorsLocked(value: T): Boolean {
        assert { nCollectors == 0 }
        // 如果不允许有replayCache,则不处理,直接返回true
        if (replay == 0) return true
        // 走到这里说明可以有replayCache
        // 加入到缓存数组中
        enqueueLocked(value)
        // buffered values的数据数量加1
        bufferSize++
        // 如果buffered values的数据数量超过了replayCache的最大容量
        // 则丢弃最旧的数据
        // 因为新订阅者只会从replayCache中取数据,
        // 如果没有订阅者,buffered values的数据数量超过replayCache的最大容量没有意义
        if (bufferSize > replay) dropOldestLocked()
        // 重新计算minCollectorIndex
        minCollectorIndex = head + bufferSize
        // 返回true
        return true
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

        在tryEmitNoCollectorsLocked方法中,如果发现当前buffered values的数据数量超过了replayCache的最大容量,则会丢弃最旧的数据,保持buffered values中数据的数量最大为replay。因为当有新的订阅者出现时,首先会从replayCache中获取数据,因此在buffered values中,replayCache前的数据只对已经订阅的订阅者有用,而此时又没有订阅者,因此缓存超过replayCache最大容量的数据只会占用更多内存,是没有意义的。

        通过对tryEmitLocked方法与tryEmitNoCollectorsLocked方法的分析,可以知道数据的发射最终都调用了enqueueLocked方法,代码如下:

    private fun enqueueLocked(item: Any?) {
        // 获取当前缓存数组中缓存的数量
        val curSize = totalSize
        // 判断
        val buffer = when (val curBuffer = buffer) {
            // 缓存数组为空,则进行初始化,初始化容量为2
            null -> growBuffer(null, 0, 2)
            // 如果超过了当前缓存数组的最大容量,则进行扩容,新的缓存数组的容量为之前的2倍
            // growBuffer方法会把原来缓存数组的数据填充到新的缓存数组中
            else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize,curBuffer.size * 2) else curBuffer
        }
        // 将要发射的数据填充的缓存数组的head + curSize位置
        buffer.setBufferAt(head + curSize, item)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

        enqueueLocked方法内部将要发射的数据填充的缓存数组的顺序位置,最终完成了数据发射的过程。通过分析可以知道,数据发射的实质就是将数据添加到缓存数组中。

    2)以挂起的方式发射数据

        SharedFlowImpl类中实现了MutableSharedFlow接口中emit方法。在上面分析的emit方法中,首先会尝试通过tryEmit方法发射数据,如果发射失败,说明发射过程需要挂起,这时会调用emitSuspend方法,代码如下:

    private suspend fun emitSuspend(value: T) = 
      // 直接挂起emit方法所在的协程,获取续体
      suspendCancellableCoroutine<Unit> sc@{ cont ->
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        // 加锁
        val emitter = synchronized(this) lock@{
            // 这里再次尝试以tryEmit的方式发射数据
            if (tryEmitLocked(value)) {
                // 如果发射成功,则恢复续体的执行
                cont.resume(Unit)
                // 收集已经挂起的订阅者的续体
                resumes = findSlotsToResumeLocked(resumes)
                // 返回
                return@lock null
            }
            // 将续体、待发射的数据等封装成Emitter类型的对象
            Emitter(this, head + totalSize, value, cont).also {
                // 加入到缓存数组中
                enqueueLocked(it)
                // queued emitters的数据的数量加1
                queueSize++
                // 如果buffered values的最大容量为0,即不存在
                // 则收集已经挂起的订阅者的续体,保存到局部变量resumes中
                if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
            }
        }
        // emitter对象监听emit方法所在协程的取消
        // 发生取消时会调用emitter对象的dispose方法
        emitter?.let { cont.disposeOnCancellation(it) }
        // 遍历,唤起挂起的订阅者
        for (cont in resumes) cont?.resume(Unit)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    3)唤醒挂起的订阅者

        无论是在tryEmit方法,还是在emit方法,当发射数据成功后,都会调用findSlotsToResumeLocked方法,获取已经挂起的订阅者的续体,然后恢复订阅者所在协程的执行,代码如下:

    private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> {
        // 引用参数中的续体数组
        var resumes: Array<Continuation<Unit>?> = resumesIn
        // 用于记录需要恢复的续体的数量
        var resumeCount = resumesIn.size
        // 遍历订阅者数组
        forEachSlotLocked loop@{ slot ->
            // 获取续体,如果续体为空,说明对应订阅者的协程没有挂起,本次循环返回
            val cont = slot.cont ?: return@loop
            // 判断slot中index是否符合要求
            // 如果不符合要求,则本次循环返回
            if (tryPeekLocked(slot) < 0) return@loop
            // 如果需要恢复的续体的数量超过续体数组的容量,则进行扩容
            // 新的续体数组的容量是之前续体数组容量的2倍
            if (resumeCount >= resumes.size) resumes = resumes.copyOf(maxOf(2, 2 * resumes.size))
            // 保存续体到续体数组中
            resumes[resumeCount++] = cont
            // 清空slot中保存的续体
            slot.cont = null
        }
        // 返回收集完的续体数组
        return resumes
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    5.新订阅者获取缓存数据

        SharedFlowImpl类实现了SharedFlow接口,重写了其中的常量replayCache,当有新订阅者出现时,如果replayCache存在,并且有缓存数据,则优先从replayCache中获取,代码如下:

    override val replayCache: List<T>
        // 只能获取,不能设置,加锁
        get() = synchronized(this) {
            // 获取当前replayCache中缓存数据的数量
            val replaySize = this.replaySize
            // 如果数量为0,则返回一个空列表
            if (replaySize == 0) return emptyList()
            // 若数量不为0,则根据容量创建一个列表
            val result = ArrayList<T>(replaySize)
            // 获取缓存数组
            val buffer = buffer!!
            // 遍历replayCache,将数据进行类型转换,并添加到列表中
            @Suppress("UNCHECKED_CAST")
            for (i in 0 until replaySize) result += buffer.getBufferAt(replayIndex + i) as T
            // 返回列表
            result
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    6.热流的融合

        SharedFlowImpl类实现了FusibleFlow接口,重写了其中的fuse方法,代码如下:

    // 调用了fuseSharedFlow方法实现
    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
        fuseSharedFlow(context, capacity, onBufferOverflow)
    
    ...
    
    internal fun <T>  SharedFlow<T>.fuseSharedFlow(
        context: CoroutineContext,
        capacity: Int,
        onBufferOverflow: BufferOverflow
    ): Flow<T> {
        // 如果容量为0或默认值,同时溢出策略为SUSPEND
        if ((capacity == Channel.RENDEZVOUS || capacity == Channel.OPTIONAL_CHANNEL) && onBufferOverflow == BufferOverflow.SUSPEND) {
            // 返回自身
            return this
        }
        // 其他情况,将当前的SharedFlow对象包装成ChannelFlowOperatorImpl类型的对象
        return ChannelFlowOperatorImpl(this, context, capacity, onBufferOverflow)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

        在Kotlin协程:Flow的融合、Channel容量、溢出策略中提到过,当对类型为SharedFlowImpl的对象使用某些操作符时,会触发fuse方法的执行。fuse方法默认的容量为OPTIONAL_CHANNEL,默认的溢出策略为SUSPEND,返回自身,因此融合后还是SharedFlowImpl类型的对象。

        如果容量为RENDEZVOUS,同时溢出策略为SUSPEND时,也会返回自身。RENDEZVOUS表示容量为0,无论SharedFlowImpl类型的对象的buffered values最大容量是否为0,在外面再套一层RENDEZVOUS是没有意义的。

        其他情况下,SharedFlowImpl类型的对象会被封装成一个类型为ChannelFlowOperatorImpl的对象,根据Kotlin协程:flowOn与线程切换讲过的,之后向下游发射的数据会通过Channel来发送。

    7.只读热流

        调用MutableSharedFlow方法,可以得到一个类型为MutableSharedFlow的对象。通过这个对象,我们可以调用它的collect方法来订阅接收,也可以调用它的emit方法来发射数据。但大多数的时候,我们需要统一数据的发射过程,因此需要对外暴露一个只可以调用collect方法订阅而不能调用emit方法发射的对象,而不是直接暴露MutableSharedFlow类型的对象。

        根据上面代码的介绍,订阅的过程实际上是从缓存数组中读取数据的,而发射的过程实际上是向缓存数据中写数据,因此如果一个流只能调用collect方法而不能调用emit方法,这种流这是一种只读流。

        事实上,根据在Koltin协程:异步热数据流的设计与使用中对接口的分析可以发现,MutableSharedFlow接口继承了FlowCollector接口和SharedFlow接口,emit方法定义在FlowCollector中。SharedFlow接口继承了Flow接口,collect方法定义在Flow接口中。因此只要将MutableSharedFlow接口指向的对象转换为SharedFlow接口指向的对象就可以将读写流转换为只读流。

        在代码中,对MutableSharedFlow类型的对象调用asSharedFlow方法恰好可以实现将读写流转换为只读流,代码如下:

    // 该方法调用了ReadonlySharedFlow方法,返回一个类型为SharedFlow的对象
    public fun <T> MutableSharedFlow<T>.asSharedFlow(): SharedFlow<T> =
        // 传入当前的MutableSharedFlow类型的对象
        ReadonlySharedFlow(this)
    
    // 实现了FusibleFlow接口,
    // 实现了SharedFlow接口,并且使用上一步传入的MutableSharedFlow类型的对象作为代理
    private class ReadonlySharedFlow<T>(
        flow: SharedFlow<T>
    ) : SharedFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
        // 用于流融合,也是通过fuseSharedFlow方法实现
        override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
            fuseSharedFlow(context, capacity, onBufferOverflow)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
  • 相关阅读:
    Sentinel服务保护
    Centos7原生hadoop环境,搭建Impala集群和负载均衡配置
    华为Mate系列回归,高端市场洗牌开始
    Python 多进程编程《*》:shared_memory 模块
    06.Oracle数据备份与恢复
    如何开发自己的npm依赖包,开发-本地调试-打包发布到自己的镜像库
    TensorFlow入门(二十三、退化学习率)
    深度强化学习极简入门(十一)——策略梯度方法REINFORCE【附代码】
    FPGA图像处理学习——人脸检测
    图论学习笔记 - 树链剖分
  • 原文地址:https://blog.csdn.net/LeeDuoZuiShuai/article/details/126922041