• Kotlin协程:生命周期原理


    一.start方法

        start方法用于启动协程,对于已经启动的协程,再次调用它的start方法是没有意义的。

        默认情况下,协程在创建后会自动启动。而根据上一篇的分析,如果协程按照LAZY模式启动,则需要手动去调用它的start方法。接下来以launch方法为例,分析LAZY启动模式下协程的启动。

        当指定协程的启动模式为LAZY时,在launch方法中会创建一个类型为LazyStandaloneCoroutine的对象,LazyStandaloneCoroutine类的代码如下:

    private class LazyStandaloneCoroutine(
        parentContext: CoroutineContext,
        block: suspend CoroutineScope.() -> Unit
    ) : StandaloneCoroutine(parentContext, active = false) {
        // 创建协程
        private val continuation = block.createCoroutineUnintercepted(this, this)
        
        // 监听start方法调用
        override fun onStart() {
            // 启动协程
            continuation.startCoroutineCancellable(this)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

        LazyStandaloneCoroutine类内部只对协程进行了创建,但并没有启动,只有当手动去调用start方法后才会启动。而调用LazyStandaloneCoroutine类的start方法实际调用的是父类JobSupport的start方法,代码如下:

    public final override fun start(): Boolean {
        // 循环处理状态变化
        loopOnState { state ->
            when (startInternal(state)) {
                FALSE -> return false
                TRUE -> return true
            }
        }
    }
    
    private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
        while (true) {
            block(state)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

        通过上面的代码,可以知道start方法内部调用了startInternal方法,代码如下:

    // 三个返回值
    private const val RETRY = -1 // 重试
    private const val FALSE = 0 // 启动失败
    private const val TRUE = 1 // 启动成功
    
    private fun startInternal(state: Any?): Int {
        when (state) {
            // 一个内部状态,后面会谈到
            is Empty -> {
                // 如果已经启动,返回FALSE
                if (state.isActive) return FALSE
                // 如果更新状态失败,说明当前竞争失败,需要再试一次,返回RETRY
                if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY
                // 核心启动方法
                onStartInternal()
                // 返回TRUE
                return TRUE
            }
            // 一个内部状态,后面会谈到
            is InactiveNodeList -> {
                // 如果更新状态失败,说明当前竞争失败,需要再试一次,返回RETRY
                if (!_state.compareAndSet(state, state.list)) return RETRY
                // 核心启动方法
                onStartInternal()
                // 返回TRUE
                return TRUE
            }
            // 其他状态,肯定已经启动了,返回FALSE
            else -> return FALSE
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

        startInternal内部调用了onStartInternal方法,该方法被AbstractCoroutine类重写,代码如下:

    internal final override fun onStartInternal() {
        onStart()
    }
    
    • 1
    • 2
    • 3

        onStart方法被LazyStandaloneCoroutine类重写,最终会调用startCoroutineCancellable方法来启动协程,代码如下:

    internal fun Continuation<Unit>.startCoroutineCancellable(fatalCompletion: Continuation<*>) =
        runSafely(fatalCompletion) {
            // intercepted:拦截调度
            // resumeCancellableWith:恢复执行
            intercepted().resumeCancellableWith(Result.success(Unit))
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

        startCoroutineCancellable方法内部最终通过resumeCancellableWith方法恢复协程的执行。resumeCancellableWith方法在Kotlin协程:创建、启动、挂起、恢复中进行过详细的分析,这里不再赘述。

    二.JobSupport类

        通过对协程的start方法分析可以发现,几乎所有操作协程生命周期的方法最后都是通过JobSupport类实现的。JobSupport类是协程中一个重要的类,它的内部通过一个复杂的状态机实现协程的状态管理。

        JobSupport类的内部状态的划分与对应的外部状态如下:

    内部状态名内部实现类或对象外部状态备注
    EMPTY_NEmptyNewNew没有监听器
    EMPTY_AEmptyActiveActive没有监听器
    SINGLEJobNodeActive有一个监听器
    SINGLE+JobNodeActive有一个监听器,同时持有一个NodeList对象的引用
    LIST_NInactiveNodeListNew有多个监听器
    LIST_ANodeListActive有多个监听器
    COMPLETINGFinishingCompleting\
    CANCELLINGFinishingCancelling\
    FINAL_CCompletedExceptionallyCancelled\
    FINAL_RCompleted表示执行完成的结果

        对应的状态转移图如下:
    在这里插入图片描述
        在JobSupport类中,通过一个类型为Any?的_state对象来代表当前协程的状态,通过对_state对象类型的判断,就可以对当前的状态进行区分。不同的状态切换,实际就是_state引用不同类型的对象,而对于相同类型的状态,可以通过_state引用的对象的内部变量来作区分,代码如下:

    // 根据参数active决定初始状态
    private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)
    
    • 1
    • 2

        上面提到的JobSupport的内部状态类:Empty、JobNode、InactiveNodeList、NodeList、Finishing,除了CompletedExceptionally,都实现了Incomplete接口,代码如下:

    internal interface Incomplete {
        val isActive: Boolean
        val list: NodeList?
    }
    
    • 1
    • 2
    • 3
    • 4

        Incomplete接口定义了两个重要的常量。第一个是isActive,表示协程是否已经启动。第二个是list,list是一个类型为NodeList的变量,表示一个双向链表的首节点。在协程中,无论是父协程、子协程,还是取消监听器、完成监听器,都会被抽象成一个JobNode对象,添加到对应的NodeList中,当协程执行完毕或异常取消时,就会通过遍历链表的方式,对相关的协程和监听器进行线性化的通知。

    1.EmptyNew与EmptyActive

        EMPTY_NEW和EmptyActive是两个Empty类型的对象,它们的区别在于isActive的取值,同时它们的list都是空的。

    @SharedImmutable
    private val EMPTY_NEW = Empty(false)
    @SharedImmutable
    private val EMPTY_ACTIVE = Empty(true)
    
    private class Empty(override val isActive: Boolean) : Incomplete {
        override val list: NodeList? get() = null
        override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.SINGLE与SINGLE+

        SINGLE与SINGLE+都是JobNode类型的对象,但在代码中并没有像Empty类一样,用不同的对象进行区分。SINGLE与SINGLE+代表已经启动的状态,因此isActive为true。二者的区别在于list参数的取值,SINGLE状态下,参数list为空,而在SINGLE+状态下,参数list引用了一个空的NodeList对象,因此可以将SINGLE+理解为是从SINGLE到LIST_A的过度状态。

        JobNode实现了DisposableHandle接口的dispose方法,用于将自身从链表NodeList中移除。

    internal abstract class JobNode<out J : Job>(
        @JvmField val job: J
    ) : CompletionHandlerBase(), DisposableHandle, Incomplete {
        override val isActive: Boolean get() = true
        override val list: NodeList? get() = null
        override fun dispose() = (job as JobSupport).removeNode(this)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3.LIST_N与LIST_A

        LIST_N是类型为InactiveNodeList的对象,LIST_A是类型为NodeList的对象,二者的区别在于isActive取值不同。同时,由于NodeList类继承了LockFreeLinkedListHead类,因此这两种类型的对象本身是一个链表。参数list为自身。

    internal class NodeList : LockFreeLinkedListHead(), Incomplete {
        override val isActive: Boolean get() = true
        override val list: NodeList get() = this
    
        fun getString(state: String) = buildString {
            append("List{")
            append(state)
            append("}[")
            var first = true
            this@NodeList.forEach<JobNode<*>> { node ->
                if (first) first = false else append(", ")
                append(node)
            }
            append("]")
        }
    
        override fun toString(): String =
            if (DEBUG) getString("Active") else super.toString()
    }
    
    internal class InactiveNodeList(
        override val list: NodeList
    ) : Incomplete {
        override val isActive: Boolean get() = false
        override fun toString(): String = if (DEBUG) list.getString("New") else super.toString()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    4.COMPLETING与CANCELLING

        COMPLETING与CANCELLING是类型为Finishing的对象。当一个Finishing类型的对象的isCompleting为true的时候,就是COMPLETING状态。当一个Finishing类型的对象的rootCause不为空的时候,就是CANCELLING状态。如果rootCause不为空,同时isCompleting为true的时候,如何判断状态?

        其实,这两个状态都是必须要执行的,都是不稳定的过度状态,如果先进入完成状态,则会触发取消,如果先进行了取消,则会触发进入完成状态,最后都是进入到Finial状态。

    // 这里的参数list其实没有意义
    private class Finishing(
        override val list: NodeList,
        isCompleting: Boolean,
        rootCause: Throwable?
    ) : SynchronizedObject(), Incomplete {
        // 协程是否完成
        private val _isCompleting = atomic(isCompleting)
        var isCompleting: Boolean
            get() = _isCompleting.value
            set(value) { _isCompleting.value = value }
    
        // 触发协程取消的根本异常
        private val _rootCause = atomic(rootCause)
        var rootCause: Throwable?
            get() = _rootCause.value
            set(value) { _rootCause.value = value }
    
        // 协程的全局异常,取值可能为null、Throwable、ArrayList、SEALED
        private val _exceptionsHolder = atomic<Any?>(null)
        private var exceptionsHolder: Any?
            get() = _exceptionsHolder.value
            set(value) { _exceptionsHolder.value = value }
    
        // 表示当前状态是否密封,密封表示当前状态已经完成了协程异常的分析。
        // 再次调用cancel方法将返回false,同时不接受再添加异常
        val isSealed: Boolean get() = exceptionsHolder === SEALED
        // 表示协程是否处于取消状态
        val isCancelling: Boolean get() = rootCause != null
        // 与isCancelling状态相反
        override val isActive: Boolean get() = rootCause == null
    
        // 密封当前的状态,并且获取最终的异常列表
        fun sealLocked(proposedException: Throwable?): List<Throwable> {
            // 根据exceptionsHolder的类型进行判断,获取存储异常的列表
            val list = when(val eh = exceptionsHolder) { // volatile read
                null -> allocateList() // 为空,则创建一个的空列表
                is Throwable -> allocateList().also { it.add(eh) } // 如果只有一个异常,则加入到列表中
                is ArrayList<*> -> eh as ArrayList<Throwable> // 已经是列表了,则直接类型转换
                else -> error("State is $eh") // 已经密封了
            }
            // 获取触发取消的根本异常
            val rootCause = this.rootCause // volatile read
            // 不为空,则添加到第一个位置
            rootCause?.let { list.add(0, it) }
            // 额外传入的异常不为空,也不是触发取消的根本异常,则添加到异常列表中
            if (proposedException != null && proposedException != rootCause) list.add(proposedException)
            // 进行密封!!!!
            exceptionsHolder = SEALED
            // 返回异常列表
            return list
        }
    
        // 向状态中添加一个异常`
        fun addExceptionLocked(exception: Throwable) {
            // 获取触发取消的根本异常
            val rootCause = this.rootCause // volatile read
             // 若为空,则作为根本异常保存,并返回
            if (rootCause == null) {
                this.rootCause = exception
                return
            }
            
            // 如果等于根本异常,直接返回
            if (exception === rootCause) return
            // 根据exceptionsHolder的类型进行判断
            when (val eh = exceptionsHolder) { // volatile read
                null -> exceptionsHolder = exception // 为空,则直接保存在exceptionsHolder
                is Throwable -> { // 如果exceptionsHolder之前已经保存了一个异常
                    // 如果两个异常相等,则直接返回
                    if (exception === eh) return
                    // 如果是两个不同的异常,则创建列表,保存两个异常
                    exceptionsHolder = allocateList().apply {
                        // 注意顺序!!
                        add(eh)
                        add(exception)
    
                    }
                }
                // 如果本身已经保存了多个异常,则在添加一个
                is ArrayList<*> -> (eh as ArrayList<Throwable>).add(exception)
                else -> error("State is $eh") // 已经密封了
            }
        }
    
        // 分配一个初始容量为4的列表
        private fun allocateList() = ArrayList<Throwable>(4)
    
        override fun toString(): String =
            "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$exceptionsHolder, list=$list]"
    }
    
    @SharedImmutable
    private val SEALED = Symbol("SEALED")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94

        Finishing类提供了一个密封机制。当Finishing类的状态已经处理完异常时,会对状态进行密封,密封后的状态将不被允许再向其中添加异常。同时,密封后调用cancel方法,会直接返回false。

    5.FINAL_C与FINAL_R

        FINAL_C与FINAL_R是两个最终状态,如果是取消引起的协程结束与完成,则会进入FINAL_C状态,最终对应的是一个CompletedExceptionally类型的对象。如果是正常执行触发的正常或异常的完成,则会进入FINAL_R状态,对应的对象可能为空,可能为执行的返回值,可能为一个异常。

    internal open class CompletedExceptionally(
        @JvmField public val cause: Throwable,
        handled: Boolean = false
    ) {
        private val _handled = atomic(handled)
        // 表示异常是否已经处理
        val handled: Boolean get() = _handled.value
        fun makeHandled(): Boolean = _handled.compareAndSet(false, true)
        override fun toString(): String = "$classSimpleName[$cause]"
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    三.join方法

        join方法用于挂起当前协程,在协程A中调用协程B的join方法后,协程A会等到协程B执行完毕后再恢复执行。join方法的实现在JobSupport类中,代码如下:

    public final override suspend fun join() {
        // 调用joinInternal方法
        if (!joinInternal()) {
            // 对当前协程的上下文进行检查,检查是否是完成状态
            coroutineContext.checkCompletion()
            // 直接返回,不挂起
            return
        }
        // 挂起
        return joinSuspend()
    }
    
    private fun joinInternal(): Boolean {
        // 循环
        loopOnState { state ->
            // 如果当前是FINAL_*状态,返回false
            if (state !is Incomplete) return false
            // 启动协程,返回结果不为RETRY,则返回true
            if (startInternal(state) >= 0) return true
        }
    }
    
    // 检查上下文
    internal fun CoroutineContext.checkCompletion() {
        // 获取上下文的Job对象
        val job = get(Job)
        // 如果job不为空,同时还未启动,则抛出异常
        if (job != null && !job.isActive) throw job.getCancellationException()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

        join方法中最终通过joinSuspend方法,实现挂起执行,代码如下:

    private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
        cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
    }
    
    • 1
    • 2
    • 3

        joinSuspend方法内部调用了suspendCancellableCoroutine方法。

    1.suspendCancellableCoroutine方法

        这个方法用于挂起当前的协程,同时返回一个可取消的续体。代码如下:

    public suspend inline fun <T> suspendCancellableCoroutine(
        crossinline block: (CancellableContinuation<T>) -> Unit
    ): T =
        suspendCoroutineUninterceptedOrReturn { uCont ->// 核心方法,返回一个续体
            // 将返回的续体包装成可取消的续体,这里进行了拦截
            val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
            // 将cancellable绑定到uCont.intercepted()返回的续体上,
            // 这样当外部取消内部会得到通知
            cancellable.initCancellability()
            // 执行任务
            block(cancellable)
            // 获取结果
            cancellable.getResult()
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    1)suspendCoroutineUninterceptedOrReturn方法

        调用该方法会获取当前挂起函数中的续体,并作为参数,去调用block执行。如果需要挂起,则需要在block中返回COROUTINE_SUSPENDED,并且在合适的时间去手动的调用续体的resumeWith方法,来恢复执行。如果不需要挂起,则不可以对续体进行任何操作。

        该方法暴露的续体未经过拦截器拦截,续体将在被调用的线程上去执行。可以手动调用intercepted方法,对续体进行拦截。

        该方法是协程的内部实现,代码不可见。方法声明如下:

    @SinceKotlin("1.3")
    @InlineOnly
    @Suppress("UNUSED_PARAMETER", "RedundantSuspendModifier")
    public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
        // 通知虚拟机,block只能调用一次
        contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
        throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

        suspendCoroutineUninterceptedOrReturn方法图解:
    在这里插入图片描述

    2)CancellableContinuationImpl类

        CancellableContinuationImpl类与JobSupport类类似,内部也是通过状态机实现的。不同之处在于,它只能有取消监听器,而且最多只能有一个。一旦发生取消,会立即回调取消监听器。它的取消监听器在设置后不能取消。

        CancellableContinuationImpl类中有两个状态机,一个是决策状态机,用于控制挂起和恢复,状态保存在全局变量_decision中,代码如下:

    private const val UNDECIDED = 0 // 初始状态
    private const val SUSPENDED = 1 // 挂起状态
    private const val RESUMED = 2 // 恢复状态
    
    private val _decision = atomic(UNDECIDED)
    
    • 1
    • 2
    • 3
    • 4
    • 5

        对应的状态转移图如下:
    在这里插入图片描述
        trySuspend与tryResume方法代码如下:

    private fun trySuspend(): Boolean {
        // 循环
        _decision.loop { decision ->
            when (decision) {
                // 进入挂起状态
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
                RESUMED -> return false
                else -> error("Already suspended")
            }
        }
    }
    
    private fun tryResume(): Boolean {
         // 循环
        _decision.loop { decision ->
            when (decision) {
                // 进入恢复状态
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
                SUSPENDED -> return false
                else -> error("Already resumed")
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

        另一个状态机则类似于JobSupport类中的状态机器,状态保存在全局变量_state中,代码如下:

    private val _state = atomic<Any?>(Active)
    
    • 1

        状态的划分与对应的外部状态如下:

    内部状态名内部实现类或对象外部状态备注
    ACTIVEActiveActive没有监听器
    SINGLE_ACancelHandlerActive有一个取消监听器
    CANCELLEDCancelledContinuationCancelled取消的最终状态
    COMPLETEDCompleted完成状态

        与JobSupport类相比,CancellableContinuationImpl类的状态类更多的是作为标记来表示一个状态,代码如下:

    internal interface NotCompleted
    
    // 实现了NotCompleted接口
    private object Active : NotCompleted {
        override fun toString(): String = "Active"
    }
    
    // 实现了NotCompleted接口
    internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted
    
    // 继承自CompletedExceptionally
    internal class CancelledContinuation(
        continuation: Continuation<*>,
        cause: Throwable?,
        handled: Boolean
    ) : CompletedExceptionally(cause ?: CancellationException("Continuation $continuation was cancelled normally"), handled) {
        private val _resumed = atomic(false)
        fun makeResumed(): Boolean = _resumed.compareAndSet(false, true)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    3)getResult方法

        在suspendCancellableCoroutine方法中,最后调用了CancellableContinuationImpl类的getResult方法,代码如下:

    @PublishedApi
    internal fun getResult(): Any? {
        // 与initCancellability方法相同,监听取消回调
        setupCancellation()
        // 尝试挂起,成功则返回COROUTINE_SUSPENDED挂起
        if (trySuspend()) return COROUTINE_SUSPENDED
        // 获取状态
        val state = this.state
        if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
        // 如果当前的模式是取消模式
        if (resumeMode.isCancellableMode) {
            // 获取父协程
            val job = context[Job]
            // 如果父协程被取消
            if (job != null && !job.isActive) {
                // 获取异常
                val cause = job.getCancellationException()
                // 通知回调取消监听器,并修改对应状态
                cancelCompletedResult(state, cause)
                // 抛出异常
                throw recoverStackTrace(cause, this)
            }
        }
        // 获取结果
        return getSuccessfulResult(state)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

        到这里join方法实现了协程的挂起。

        接下来分析,如果在挂起过程中协程发生了取消,将如何恢复续体的执行?

    2.disposeOnCancellation方法

    private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
        cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
    }
    
    • 1
    • 2
    • 3

        返回到joinSuspend方法中,在获取到续体cont后,首先将续体和当前的Job对象,封装成了ResumeOnCompletion类的对象,接着调用了asHandler方法,转换成了(cause: Throwable?) -> Unit类型的lambda表达式,然后调用了invokeOnCompletion方法,注册协程执行完成时的回调,同时获取了一个DisposableHandle类型的对象,最后,作为参数传入到disposeOnCancellation方法中,代码如下:

    @InternalCoroutinesApi
    public fun CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHandle): Unit =
        invokeOnCancellation(handler = DisposeOnCancel(handle).asHandler)
    
    • 1
    • 2
    • 3

        该方法是一个扩展方法。该方法将传入的参数,进一步封装成了DisposeOnCancel对象,最终作为参数,通过调用CancellableContinuation类的invokeOnCancellation方法,注册监听取消。

        假设在挂起到过程中,协程取消会调用DisposeOnCancel的invoke方法,代码如下:

    private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHandler() {
        override fun invoke(cause: Throwable?) = handle.dispose()
        override fun toString(): String = "DisposeOnCancel[$handle]"
    }
    
    • 1
    • 2
    • 3
    • 4

        DisposeOnCancel类的对象内部会继续调用DisposableHandle对象的dispose方法,这个DisposableHandle是在调用invokeOnCompletion时候返回的,这个对象的实际类型为InvokeOnCompletion,代码如下:

    private class InvokeOnCompletion(
        job: Job,
        private val handler: CompletionHandler
    ) : JobNode<Job>(job)  {
        override fun invoke(cause: Throwable?) = handler.invoke(cause)
        override fun toString() = "InvokeOnCompletion[$classSimpleName@$hexAddress]"
    }
    
    public typealias CompletionHandler = (cause: Throwable?) -> Unit
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

        InvokeOnCompletion类的对象内部又回继续调用CompletionHandler类对象的invoke方法,而这个CompletionHandler对象就是之前传入的ResumeOnCompletion类对象,代码如下:

    private class ResumeOnCompletion(
        job: Job,
        private val continuation: Continuation<Unit>
    ) : JobNode<Job>(job)  {
        override fun invoke(cause: Throwable?) = continuation.resume(Unit)
        override fun toString() = "ResumeOnCompletion[$continuation]"
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

        ResumeOnCompletion类对象的invoke方法,最终调用了续体的reusme方法,恢复了执行。

    四.cancel方法

        cancel方法用于取消一个线程的执行,该方法在JobSupport类中实现,代码如下:

    public override fun cancel(cause: CancellationException?) {
        cancelInternal(cause ?: defaultCancellationException())
    }
    
    @Suppress("NOTHING_TO_INLINE")
    internal inline fun defaultCancellationException(message: String? = null, cause: Throwable? = null) =
        JobCancellationException(message ?: cancellationExceptionMessage(), cause, this)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

        cancel方法内部调用了cancelInternal方法,并传入取消异常,代码如下:

    public open fun cancelInternal(cause: Throwable) {
        cancelImpl(cause)
    }
    
    • 1
    • 2
    • 3

        cancelInternal方法内部调用了cancelImpl方法,代码如下:

    // 如果异常已经被处理,则返回true,否则返回false
    internal fun cancelImpl(cause: Any?): Boolean {
        // 最终状态,默认已经完成
        var finalState: Any? = COMPLETING_ALREADY
        // 如果协程进入Completing状态
        if (onCancelComplete) {
            // 通知子协程进入完成状态
            finalState = cancelMakeCompleting(cause)
            // 如果返回状态为等待子协程完成,则返回true
            if (finalState === COMPLETING_WAITING_CHILDREN) return true
        }
        // 如果状态为已经完成
        if (finalState === COMPLETING_ALREADY) {
            // 则进入Cancelling状态
            finalState = makeCancelling(cause)
        }
        // 返回
        return when {
            finalState === COMPLETING_ALREADY -> true
            finalState === COMPLETING_WAITING_CHILDREN -> true
            finalState === TOO_LATE_TO_CANCEL -> false
            else -> {
                // 其他状态,通过回调通知
                afterCompletion(finalState)
                true
            }
        }
    }
    
    // 结束的四种状态
    // 已经处于完成状态
    @SharedImmutable
    private val COMPLETING_ALREADY = Symbol("COMPLETING_ALREADY")
    // 等待子协程进入完成状态
    @JvmField
    @SharedImmutable
    internal val COMPLETING_WAITING_CHILDREN = Symbol("COMPLETING_WAITING_CHILDREN")
    // 再次重试
    @SharedImmutable
    private val COMPLETING_RETRY = Symbol("COMPLETING_RETRY")
    // 由于Finishing类已经密封,因此无法取消
    @SharedImmutable
    private val TOO_LATE_TO_CANCEL = Symbol("TOO_LATE_TO_CANCEL")
    
    // 该变量返回true,表示当前的协程没有block任务待完成,
    // 应该马上进入Completing状态,等待子协程完成
    internal open val onCancelComplete: Boolean get() = false
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

        接下来,分析makeCancelling方法如何取消子协程,代码如下:

    private fun makeCancelling(cause: Any?): Any? {
        // 局部的异常缓存
        var causeExceptionCache: Throwable? = null
        // 循环
        loopOnState { state ->
            when (state) {
                // 如果是Completing状态或者Cancelling状态
                is Finishing -> {
                    // 获取最根本的异常
                    val notifyRootCause = synchronized(state) {
                        // 如果已经密封,则返回TOO_LATE_TO_CANCEL
                        if (state.isSealed) return TOO_LATE_TO_CANCEL
                        // 是否已经处于Cancelling状态
                        val wasCancelling = state.isCancelling
                        // 如果传入的异常不为空,或者还未处于Cancelling状态
                        if (cause != null || !wasCancelling) {
                            // 则对异常进行包装,并记录到状态中
                            val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                            state.addExceptionLocked(causeException)
                        }
                        // 返回根本异常,如果wasCancelling为false,上面addExceptionLocked
                        // 会把causeException保存到rootCause
                        state.rootCause.takeIf { !wasCancelling }
                    }
                    // 如果根本异常不为空,说明已经进入Cancelling状态,则通知子协程取消
                    notifyRootCause?.let { notifyCancelling(state.list, it) }
                    // 返回COMPLETING_ALREADY
                    return COMPLETING_ALREADY
                }
                // 如果还没有进入Completing状态或者Cancelling状态
                is Incomplete -> {
                    // 获取异常
                    val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                    // 如果处于启动的状态
                    if (state.isActive) {
                        // 调用tryMakeCancelling方法,尝试进入Cancelling状态,
                        // 成功则返回COMPLETING_ALREADY
                        if (tryMakeCancelling(state, causeException)) return COMPLETING_ALREADY
                    } else {// 如果还未启动
                        // 则尝试进入Completing状态,
                        // 封装成CompletedExceptionally对象
                        val finalState = tryMakeCompleting(state, CompletedExceptionally(causeException))
                        // 根据返回状态处理
                        when {
                            finalState === COMPLETING_ALREADY -> error("Cannot happen in $state")
                            finalState === COMPLETING_RETRY -> return@loopOnState // 继续循环
                            else -> return finalState
                        }
                    }
                }
                // 其他状态
                else -> return TOO_LATE_TO_CANCEL
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

        makeCancelling方法调用notifyCancelling方法取消子协程的执行,代码如下:

    private fun notifyCancelling(list: NodeList, cause: Throwable) {
        onCancelling(cause)
        notifyHandlers<JobCancellingNode<*>>(list, cause)
        cancelParent(cause)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

        notifyCancelling中做了三件事,首先是回调onCancelling方法,代码如下:

    protected open fun onCancelling(cause: Throwable?) {}
    
    • 1

        当协程完成或者取消时,就会调用一次这个方法。该方法与调用invokeOnCompletion方法并把参数onCancelling设置为true相似。如果cause为空,说明协程是正常执行结束。如果cause为CancellationException类型的对象,说明协程是正常取消,如果是其他的异常,说明是执行过程中发生错误导致。

        接着,在notifyCancelling方法中调用notifyHandlers方法,取消监听器和子协程,代码如下:

    private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
        var exception: Throwable? = null
        // 遍历调用invoke方法
        list.forEach<T> { node ->
            try {
                node.invoke(cause)
            } catch (ex: Throwable) {
                exception?.apply { addSuppressedThrowable(ex) } ?: run {
                    exception =  CompletionHandlerException("Exception in completion handler $node for $this", ex)
                }
            }
        }
        // 如果取消的过程中发生异常,则调用handleOnCompletionException方法处理
        exception?.let { handleOnCompletionException(it) }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

        如果node是监听器,那它就是(cause: Throwable?) -> Unit类型的lambda表达式,则这时调用invoke方法会直接回调。

        如果node是子协程,那就是一个ChildHandleNode类型的对象,ChildHandleNode对象是在父子协程绑定时返回的对象,后面会提到,它的代码如下:

    internal class ChildHandleNode(
        parent: JobSupport,
        @JvmField val childJob: ChildJob
    ) : JobCancellingNode<JobSupport>(parent), ChildHandle {
        override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
        override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
        override fun toString(): String = "ChildHandle[$childJob]"
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

        调用invoke方法,回调用子协程的parentCancelled方法,该方法代码如下:

    public final override fun parentCancelled(parentJob: ParentJob) {
        cancelImpl(parentJob)
    }
    
    • 1
    • 2
    • 3

        parentCancelled方法也是通过cancelImpl方法实现的。

        最后,在notifyCancelling方法中调用了cancelParent方法取消父协程,代码如下:

    // 该方法返回true,表示父协程将处理异常,返回false,则表示不处理
    private fun cancelParent(cause: Throwable): Boolean {
        // 如果当前的协程是作用域协程,则直接返回true
        if (isScopedCoroutine) return true
    
        // 判断当前的异常是否是正常的取消
        // 如果子协程是正常的取消,父协程不需要取消自身,除非子协程抛出的是其他的异常
        val isCancellation = cause is CancellationException
        // 获取父协程的handler
        val parent = parentHandle
        // 如果当前协程没有父协程,说明他是顶级协程
        if (parent === null || parent === NonDisposableHandle) {
            // 直接返回
            return isCancellation
        }
    
        // 如果不是顶级协程,则调用childCancelled方法取消自身
        return parent.childCancelled(cause) || isCancellation
    }
    
    // 作用域协程是在一个封闭作用域内顺序的执行任务,没有任何的并发
    // 作用域协程会将遇到的任何错误都抛出
    // coroutineScope、withTimeout、runBlocking、ScopeCoroutine都是作用域协程
    protected open val isScopedCoroutine: Boolean get() = false
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

        parent是一个ChildHandle接口指向的对象,该接口最终被上面提到的ChildHandleNode类实现。childCancelled方法最终又会调用JobSupport类的childCancelled方法,代码如下:

    // 该方法返回值表示父协程是否取消自身
    public open fun childCancelled(cause: Throwable): Boolean {
        // 如果是正常取消,则直接返回true
        if (cause is CancellationException) return true
        // 取消自身
        return cancelImpl(cause) && handlesException
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

        childCancelled方法也是通过cancelImpl方法实现的。

        在Kotlin协程:协程的基础与使用中提到的supervisorScope方法不受子协程取消影响,就是重写了这个方法,代码如下:

    public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R {
        contract {
            callsInPlace(block, InvocationKind.EXACTLY_ONCE)
        }
        // 获取续体
        return suspendCoroutineUninterceptedOrReturn { uCont ->
            // 将续体封装成SupervisorCoroutine协程
            val coroutine = SupervisorCoroutine(uCont.context, uCont)
            // 启动协程执行block
            coroutine.startUndispatchedOrReturn(coroutine, block)
        }
    }
    
    private class SupervisorCoroutine<in T>(
        context: CoroutineContext,
        uCont: Continuation<T>
    ) : ScopeCoroutine<T>(context, uCont) {
        // 永远返回false
        override fun childCancelled(cause: Throwable): Boolean = false
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    五.delay方法

        delay方法用于暂停协程的执行,等到指定时间后再恢复协程的执行。delay方法不会挂起线程,同时delay方法导致的挂起是可以被取消中断的。delay方法没有定义在Job接口中,是一个独立的方法,代码如下:

    public suspend fun delay(timeMillis: Long) {
        // 时间小于0,没有意义,直接返回
        if (timeMillis <= 0) return
        // 直接挂起,返回可取消的续体
        return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
            // 如果时间超过Long.MAX_VALUE,相当于一直等待,可以调用awaitCancellation方法
            if (timeMillis < Long.MAX_VALUE) {
                // 核心实现
                cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

        在上面的代码中,如果时间满足要求,会调用suspendCancellableCoroutine方法挂起,挂起后会获取续体cont的上下文context,并从上下文context中获取一个delay对象,代码如下:

    internal val CoroutineContext.delay: Delay get() = 
        get(ContinuationInterceptor) as? Delay ?: DefaultDelay
    
    • 1
    • 2

        delay对象是一个扩展属性,会从上下文中获取拦截器,并强制转换成Delay接口指向的对象。如果转换失败,则使用默认的DefaultDelay对象。

        接下来会调用Delay接口中定义的scheduleResumeAfterDelay方法,在等待指定时间后,恢复协程的执行。

    1.DefaultDelay

        如果使用的是默认的DefaultDelay对象,则对应的调度器是DefaultExecutor,代码如下:

    internal actual val DefaultDelay: Delay = DefaultExecutor
    
    • 1

        DefaultExecutor类继承了EventLoopImplBase类,EventLoopImplBase类就是在Kotlin协程:续体、续体拦截器、调度器中提到的实现了Delay接口的EventLoop。代码如下:

    private const val MAX_DELAY_NS = Long.MAX_VALUE / 2
    
    public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        // 转换成纳秒,保证执行精度
        val timeNanos = delayToNanos(timeMillis)
        // 时间范围判断
        if (timeNanos < MAX_DELAY_NS) {
            // 获取当前时间
            val now = nanoTime()
            // 封装成DelayedResumeTask
            DelayedResumeTask(now + timeNanos, continuation).also { task ->
                // 注册续体取消监听
                continuation.disposeOnCancellation(task)
                // 将任务加入到队列中
                schedule(now, task)
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

        schedule方法内部最核心的执行会将任务加入到延迟队列中。这个延迟队列的内部是用数组实现的堆,延时任务在添加到队列时会按延迟时间排序。

        每次调用EventLoopImplBase类对象的processNextEvent方法时,会优先判断延迟队列有没有任务,如果有,则继续判断延迟队列第一个任务是否满足时间要求,满足则从延迟队列队首取出,加入到任务队列队尾。之后从任务队列队首取出一个任务执行。

    2.HandlerContext

        如果是在主线程执行协程时调用delay方法,则最后会通过HandlerContext调度器实现延迟,代码如下:

    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        // 将恢复续体执行的任务封装成Runnable
        val block = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        // 延迟执行
        handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
        // 监听取消回调
        continuation.invokeOnCancellation { handler.removeCallbacks(block) }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

        最终通过Handler实现的。

    3.永久等待

        如果需要实现永远等待,可以使用awaitCancellation方法,该方法代码如下:

    public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {}
    
    • 1

        该方法直接调用suspendCancellableCoroutine方法挂起协程,只有取消协程时才可以中断。

    六.yield方法

        yield方法用于挂起当前协程的执行,并让出调度当前协程执行的线程,去执行其他的协程,代码如下:

                                       // 获取续体
    public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // 获取续体的上下文
        val context = uCont.context
        // 检查当前协程是否完成
        context.checkCompletion()
        // 获取拦截后的续体,转换成DispatchedContinuation类型的对象,转换失败则返回
        val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
        // 判断是否需要调度
        if (cont.dispatcher.isDispatchNeeded(context)) {
            // 如果需要,则直接执行
            cont.dispatchYield(context, Unit)
        } else { // 如果不需要调度,可能立即去执行或为Unconfined调度器
            // 创建一个YieldContext上下文,来检测Unconfined调度器
            val yieldContext = YieldContext()
            // 检测Unconfined调度器
            cont.dispatchYield(context + yieldContext, Unit)
            // 如果为Unconfined调度器
            if (yieldContext.dispatcherWasUnconfined) {
                // 调用为Unconfined调度器执行,成功则挂起
                return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
            }
            // 如果不是Unconfined调度器,则在dispatchYield方法中已经被调度了
        }
        // 返回挂起
        COROUTINE_SUSPENDED
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    1.YieldContext类与Unconfined类

        YieldContext类是一个上下文对象,用来检测Unconfined调度器是否存在,其内部只有一个变量,代码如下:

    internal class YieldContext : AbstractCoroutineContextElement(Key) {
        companion object Key : CoroutineContext.Key<YieldContext>
    
        // 默认为false
        @JvmField
        var dispatcherWasUnconfined = false
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

        在上面的代码中,当调用DispatchedContinuation类的dispatchYield方法,会调用调度器的dispatchYield方法,根据Kotlin协程:续体、续体拦截器、调度器的分析,dispatchYield方法默认是通过dispatch方法实现的。因此最终会调用Unconfined调度器的dispatch方法。

        Unconfined调度器不会将任务限制到指定的线程上执行。它的dispatch方法根本不会进行调度,所以isDispatchNeeded永远返回false,代码如下:

    internal object Unconfined : CoroutineDispatcher() {
        // 永远返回false
        override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
    
        override fun dispatch(context: CoroutineContext, block: Runnable) {
            // 获取YieldContext
            val yieldContext = context[YieldContext]
            // 若不为空
            if (yieldContext != null) {
                // 设置为true,并返回
                yieldContext.dispatcherWasUnconfined = true
                return
            }
            throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +
                "If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +
                "isDispatchNeeded and dispatch calls.")
        }
        
        override fun toString(): String = "Dispatchers.Unconfined"
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    2.yieldUndispatched方法

        如果Unconfined调度器存在,则会调用yieldUndispatched方法执行,代码如下:

    internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
        executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) {
            run()
        }
    
    • 1
    • 2
    • 3
    • 4

        yieldUndispatched方法是DispatchedContinuation类的一个扩展方法,内部通过executeUnconfined方法实现,代码如下:

    private inline fun DispatchedContinuation<*>.executeUnconfined(
        contState: Any?, mode: Int, doYield: Boolean = false,
        block: () -> Unit
    ): Boolean {
        assert { mode != MODE_UNINITIALIZED }
        // 获取当前线程的EventLoop
        val eventLoop = ThreadLocalEventLoop.eventLoop
        // 如果确实调用了yield方法,但是任务队列是空的,
        // 说明只有我们这一个任务,那么返回false,不用挂起
        if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
        // 如果EventLoop当前还在被Unconfined调度器使用
        return if (eventLoop.isUnconfinedLoopActive) {
            // 保存状态
            _state = contState
            resumeMode = mode
            // 调用EventLoop执行,防止StackOverflow
            eventLoop.dispatchUnconfined(this)
            true // 返回true,表示已经添加到队列了
        } else { // 如果EventLoop不可用
            // 重新启动EventLoop
            runUnconfinedEventLoop(eventLoop, block = block)
            false // 返回false
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

        接下来分析,runUnconfinedEventLoop方法如何重新启动EventLoop,代码如下:

    internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
        eventLoop: EventLoop,
        block: () -> Unit
    ) {
        // 记录使用
        eventLoop.incrementUseCount(unconfined = true)
        try {
            // 先执行任务
            block()
            // 循环分发
            while (true) {
                // 分发执行任务,返回false,说明任务队列空了
                if (!eventLoop.processUnconfinedEvent()) break
            }
        } catch (e: Throwable) {
            // 异常处理
            handleFatalException(e, null)
        } finally {
            // 记录取消使用
            eventLoop.decrementUseCount(unconfined = true)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

        通过runUnconfinedEventLoop方法可以知道,executeUnconfined方法最后在调用runUnconfinedEventLoop后返回false,是因为任务已经执行完毕,不需要挂起。

    3.DispatchedContinuation类

        当协程让出调度执行后,会被加入到队列,之后会再次被执行。根据yieldUndispatched方法,最后会调用DispatchedContinuation类的run方法,DispatchedContinuation类继承自DispatchedTask类,调用DispatchedTask类的run方法,最终会调用DispatchedContinuation类的resumeWith方法,代码如下:

    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        // 如果需要调度,切换回线程
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC
            // 由调度器进行处理
            dispatcher.dispatch(context, this)
        } else { // 如果不需要调度,说明是Unconfined调度器
            // 通过EventLoop执行
            executeUnconfined(state, MODE_ATOMIC) {
                withCoroutineContext(this.context, countOrElement) {
                    // 恢复续体的执行
                    continuation.resumeWith(result)
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

        最后通过调用续体的resumeWith方法恢复执行。

        yield方法图解:
    在这里插入图片描述

  • 相关阅读:
    数据库的功能模块图怎么画,就以我图所示
    LVS集群
    inno Setup 打包Java exe可执行文件和MySQL数据库,无需额外配置实现一键傻瓜式安装
    PostMan如何联调SignalR WebSockets
    Ts interface 和 type 的区别?
    初识 - Linux
    运输层(计算机网络谢希仁第八版)——学习笔记五
    Tomcat运维以及优化
    计算机网络复习总结5
    2 JavaScript in HTML
  • 原文地址:https://blog.csdn.net/LeeDuoZuiShuai/article/details/126202801