• Kotlin协程:续体、续体拦截器、调度器


    一.Continuation

        Continuation接口是协程中最核心的接口,代表着挂起点之后的续体,代码如下:

    public interface Continuation<in T> {
        // 续体的上下文
        public val context: CoroutineContext
    
        // 该方法用于恢复续体的执行
        // result为挂起点执行完成的返回值,T为返回值的类型
        public fun resumeWith(result: Result<T>)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    1.Continuation图解

    在这里插入图片描述

    二.ContinuationInterceptor

        ContinuationInterceptor接口继承自Element接口,是协程中的续体拦截器,代码如下:

    public interface ContinuationInterceptor : CoroutineContext.Element {
        // 拦截器的Key
        companion object Key : CoroutineContext.Key<ContinuationInterceptor>
        
        // 拦截器对续体进行拦截时会调用该方法,并对continuation进行缓存
        // 拦截判断:根据传入的continuation对象与返回的continuation对象是否相同
        public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
        
        // 当interceptContinuation方法拦截的协程执行完毕后,会调用该方法
        public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
            /* do nothing by default */
        }
    
        // get方法多态实现
        public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
            @OptIn(ExperimentalStdlibApi::class)
            if (key is AbstractCoroutineContextKey<*, *>) {
                @Suppress("UNCHECKED_CAST")
                return if (key.isSubKey(this.key)) key.tryCast(this) as? E else null
            }
            @Suppress("UNCHECKED_CAST")
            return if (ContinuationInterceptor === key) this as E else null
        }
    
        // minusKey方法多态实现
        public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext {
            @OptIn(ExperimentalStdlibApi::class)
            if (key is AbstractCoroutineContextKey<*, *>) {
                return if (key.isSubKey(this.key) && key.tryCast(this) != null) EmptyCoroutineContext else this
            }
            return if (ContinuationInterceptor === key) EmptyCoroutineContext else this
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    三.CoroutineDispatcher

        CoroutineDispatcher类继承自AbstractCoroutineContextElement类,实现了ContinuationInterceptor接口,是协程调度器的基类,代码如下:

    public abstract class CoroutineDispatcher :
        AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    
        // ContinuationInterceptor的多态实现,调度器本质上就是拦截器
        @ExperimentalStdlibApi
        public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
            ContinuationInterceptor,
            { it as? CoroutineDispatcher })
            
        // 用于判断调度器是否要调用dispatch方法进行调度,默认为true
        public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
        
        // 调度的核心方法,在这里进行调度,执行block
        public abstract fun dispatch(context: CoroutineContext, block: Runnable)
        
        // 如果调度是由Yield方法触发的,默认通过dispatch方法实现
        @InternalCoroutinesApi
        public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
        
        // ContinuationInterceptor接口的方法,将续体包裹成DispatchedContinuation,并传入当前调度器
        public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
            DispatchedContinuation(this, continuation)
    
        // 释放父协程与子协程的关联。
        @InternalCoroutinesApi
        public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
            (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
        }
    
        // 重载了"+"操作,直接返回others
        // 因为两个调度器相加没有意义,同一个上下文中只能有一个调度器
        // 如果需要加的是调度器对象,则直接替换成最新的,因此直接返回
        public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
    
        override fun toString(): String = "$classSimpleName@$hexAddress"
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    四.EventLoop

        EventLoop类继承自CoroutineDispatcher类,用于协程中任务的分发执行,只在runBlocking方法中和Dispatchers.Unconfined调度器中使用。与Handler中的Looper类似,在创建后会存储在当前线程的ThreadLocal中。EventLoop本身不支持延时执行任务,如果需要可以自行继承EventLoop并实现Delay接口,EventLoop中预留了一部分变量和方法用于延时需求的扩展。

        为什么协程需要EventLoop呢?协程的本质是续体传递,而续体传递的本质是回调,假设在Dispatchers.Unconfined调度下,要连续执行多个suspend方法,就会有多个续体传递,假设suspend方法达到一定数量后,就会造成StackOverflow,进而引起崩溃。同样的,我们知道调用runBlocking会阻塞当前线程,而runBlocking阻塞的原理就是执行“死循环”,因此需要在循环中做任务的分发,去执行内部协程在Dispatchers.Unconfined调度器下加入的任务。

        EventLoop代码如下:

    internal abstract class EventLoop : CoroutineDispatcher() {
        // 用于记录使用当前EventLoop的runBlocking方法和Dispatchers.Unconfined调度器的数量
        private var useCount = 0L
    
        // 表示当前的EventLoop是否被暴露给其他的线程
        // runBlocking会将EventLoop暴露给其他线程
        // 因此,当runBlocking使用时,shared必须为true
        private var shared = false
    
        // Dispatchers.Unconfined调度器的任务执行队列
        private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
        
        // 处理任务队列的下一个任务,该方法只能在EventLoop所在的线程调用
        // 返回值<=0,说明立刻执行下一个任务
        // 返回值>0,说明等待这段时间后,执行下一个任务
        // 返回值为Long.MAX_VALUE,说明队列里没有任务了 
        public open fun processNextEvent(): Long {
            if (!processUnconfinedEvent()) return Long.MAX_VALUE
            return 0
        }
        
        // 队列是否为空
        protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
        
        // 下一个任务多长时间后执行
        protected open val nextTime: Long
            get() {
                val queue = unconfinedQueue ?: return Long.MAX_VALUE
                return if (queue.isEmpty) Long.MAX_VALUE else 0L
            }
         
        // 任务的核心处理方法
        public fun processUnconfinedEvent(): Boolean {
            // 若队列为空,则返回
            val queue = unconfinedQueue ?: return false
            // 从队首取出一个任务,如果为空,则返回
            val task = queue.removeFirstOrNull() ?: return false
            // 执行
            task.run()
            return true
        }
        
        // 表示当前EventLoop是否可以在协程上下文中被调用
        // EventLoop本质上也是协程上下文
        // 如果EventLoop在runBlocking方法中使用,必须返回true
        public open fun shouldBeProcessedFromContext(): Boolean = false
        // 向队列中添加一个任务
        public fun dispatchUnconfined(task: DispatchedTask<*>) {
            // 若队列为空,则创建一个新的队列
            val queue = unconfinedQueue ?:
                ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
            queue.addLast(task)
        }
        // EventLoop当前是否还在被使用
        public val isActive: Boolean
            get() = useCount > 0
        
        // EventLoop当前是否还在被Unconfined调度器使用
        public val isUnconfinedLoopActive: Boolean
            get() = useCount >= delta(unconfined = true)
    
        // 判断队列是否为空
        public val isUnconfinedQueueEmpty: Boolean
            get() = unconfinedQueue?.isEmpty ?: true
         
        // 下面三个方法用于计算使用当前的EventLoop的runBlocking方法和Unconfined调度器的数量
        // useCount是一个64位的数,
        // 它的高32位用于记录Unconfined调度器的数量,低32位用于记录runBlocking方法的数量
        private fun delta(unconfined: Boolean) =
            if (unconfined) (1L shl 32) else 1L
    
        fun incrementUseCount(unconfined: Boolean = false) {
            useCount += delta(unconfined)
            // runBlocking中使用,shared为true
            if (!unconfined) shared = true 
        }
    
        fun decrementUseCount(unconfined: Boolean = false) {
            useCount -= delta(unconfined)
            // 如果EventLoop还在被使用
            if (useCount > 0) return
            assert { useCount == 0L }
            // 如果EventLoop不被使用了,并且在EventLoop中使用过
            if (shared) {
                // 关闭相关资源,并在ThreadLocal中移除
                shutdown()
            }
        }
    
        protected open fun shutdown() {}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

        协程中提供了EventLoopImplBase类,间接继承自EventLoop,实现了Delay接口,用来延时执行任务。同时,协程中还提供单例对象ThreadLocalEventLoop用于EventLoop在ThreadLocal中的存储。

  • 相关阅读:
    【C#语言】DataGridView隔行显示颜色
    软件测试-03缺陷管理
    Cocos2dx 安装运行
    Rust GUI库 egui 的简单应用
    springboot+mybatis实现一对一,一对多
    ubuntu20下安装nginx插件geoip2查询ip信息
    中国34省级行政区及行政区划代码
    我开源了一个Go学习仓库|笔记预览
    公司安防工程简要介绍及系统需求分析
    径向基神经网络RBF:Matlab实现RBF神经网络(含例子及代码)
  • 原文地址:https://blog.csdn.net/LeeDuoZuiShuai/article/details/126083367