• kotlin coroutine源码解析之suspend挂起函数原理


    suspend挂起函数

    在idea中写某些协程函数的时候,会有一个绿色箭头图标的出现,如下图:
    在这里插入图片描述
    而且这些方法不放在协程里面写的话,idea编辑器还会报错,如下图:
    在这里插入图片描述
    上面所说的这些方法就是挂起函数,挂起函数必须要在协程中调用,或者在挂起函数中调用;放在挂起函数中调用挂起函数调用,那么说明还是间接在协程中被调用,也就是挂起函数调用需要协程环境。
    在说挂起函数原理之前,先复习下 launch启动过程 所说的三类continuation:

    1. DispatchedContinuation用于分发continuation到指定的线程池中;
    2. ContinuationImpl用于包装launch的lambda代码块作为业务代码代理类;
    3. StandAloneCoroutine协程管理类管理Job生命周期以及协程的状态父子Job关系维护等等。

    join原理

    启动一个launch的时候,返回一个Job对象,这个Job对象对应着上面三个continuation实例,其中我们的业务代码在ContinuationImpl中,例如下面的代码:

      val myScope = CoroutineScope(CoroutineName("name") + IO + Job())
    
      val job = myScope.launch(CoroutineName("job")) {
    
          var job2 : Job? = launch(CoroutineName("job2")) {
              Thread.sleep(20000)
          }
    
          job2?.join()
    
          printlnM("job has run to end")
      }
    
      printlnM("job has launch")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    这段代码编译后的代码如下:

    BuildersKt.launch$default(
       myScope,
       (CoroutineContext)(new CoroutineName("job")),
       (CoroutineStart)null,
       (Function2)(new Function2((Continuation)null) {
           // $FF: synthetic field
           private Object L$0;
           int label;
    
           public final Object invokeSuspend(@NotNull Object $result) {
               Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
               switch(this.label) {
                   case 0:
                       ResultKt.throwOnFailure($result);
                       CoroutineScope $this$launch = (CoroutineScope)this.L$0;
                       //创建job2
                       Job job2 = BuildersKt.launch$default(/*省略。。。省略部分后面贴出*/)
                       
                       this.label = 1;
                       //调用join方法
                       if (job2.join(this) == var5) {
                           return var5;
                       }
                       break;
    
                   case 1:
                       ResultKt.throwOnFailure($result);
                       break;
    
                   default:
                       throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
               }
    
               MainActivity.Companion.printlnM("job has run to end");
               return Unit.INSTANCE;
           }
    	    //省略。。。
       }), 
       2, (Object)null);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    Job job2 = BuildersKt.launch$default(/*省略。。。省略部分后面贴出*/)job2的省略代码后面贴出,不影响代码流程。

    可以看到,我们的业务代码被编译到invokeSuspend()函数中了,是通过switch case语句来运行对应的代码逻辑,
    第一步:case 0执行的是Job job2 = BuildersKt.launch$default(/*省略。。。*/) 也就是逻辑代码里面的创建job2那句代码,然后this.label = 1; ,label设置为1,接着调用job2.join()方法,join方法一般都是返回COROUTINE_SUSPENDED的,那么在这里就返回结束了,除了立马就有返回值的挂起函数返回值,这个if判断就不成立了,直接走下面第三步;
    第二步:label为1了,那么case 1的情况成立,入参设置一下异常,这个入参是上一步传递进来的,没有异常的话这里是空的,break;
    第三步:调用printlnM("job has run to end");打印语句。

    我们可以看到,遇到有函数返回COROUTINE_SUSPENDED的,那么invokeSuspend函数会立刻return结束掉。按照之前 launch启动过程 分析的,launch的lambda逻辑代码被调用的调用链是:

    DispatchedContinuation -> ContinuationImpl(内部调用了invokeSuspend) -> StandAloneCoroutine
    
    • 1

    如果invokeSuspend函数因为返回了COROUTINE_SUSPENDED直接结束掉的话,岂不是这个Job2协程的代码还没有跑完,就结束了?显然是不行的,仔细观察发现,只需要多次调用invokeSuspend方法,label会随着每次调用都会递增或者变动,那么对应的case一定会让所有的代码都执行一遍的。
    可以把这种switch case语句当成是人们常说的状态机模式,在安卓开发中,常用的handler.sendMessage就很类似状态机模式,根据不同的what去处理不同的逻辑,只是协程会将顺序代码,根据case分成一个接着一个连续的代码段。

    这里我们直接去跟踪join源码看是怎么实现的:

    job2.join(this)
    
    public final override suspend fun join() {
        if (!joinInternal()) { // fast-path no wait
            coroutineContext.checkCompletion()
            return // do not suspend
        }
        return joinSuspend() // slow-path wait
    }
    
    private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
        // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
        cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    主要看joinSuspend函数,里面有几个重要的点:

    1. suspendCancellableCoroutine { cont -> 让当前协程在调用该函数处挂起,给当前协程的lambda代码块提供一个CancellableContinuation。
        suspendCoroutineUninterceptedOrReturn { uCont ->
            val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
            /*
             * For non-atomic cancellation we setup parent-child relationship immediately
             * in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
             * properly supports cancellation.
             */
            cancellable.initCancellability()
            block(cancellable)
            cancellable.getResult()
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    首先创建一个CancellableContinuationImpl类型的continuation,入参是uCont.intercepted(),这个uCont是调用的协程,那么就是val job这个协程,uCont.intercepted()也就是 launch启动过程 那章分析的DispatchedContinuation对象,由于val job = myScope.launch(CoroutineName("job")) 这句话已经将DispatchedContinuation生成了,所有会复用之前的对象,代码如下:

        public fun intercepted(): Continuation<Any?> =
            intercepted
                ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                    .also { intercepted = it }
    
    • 1
    • 2
    • 3
    • 4

    那么cancellable对象持有的就是val job的DispatchedContinuation,

    接着在调用 cancellable.initCancellability()

    private fun setupCancellation() {
        //省略
        val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
        parent.start() // make sure the parent is started
        val handle = parent.invokeOnCompletion(
            onCancelling = true,
            handler = ChildContinuation(parent, this).asHandler
        )
        parentHandle = handle
    	//省略
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    看代码,val handle = parent.invokeOnCompletion 之前分析过,是parent和当前job组成父子关系的作用,parent代表val job,当前job就是cancellable对象,意思就是父子Job可以相互取消对方,和之前父子Job关联的分析是差不多的,那么val jobcancellable组成了父子关系。

    在调用 block(cancellable),这个就进入下面2了。

    1. invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler),这个方法是将入参ResumeOnCompletion和当前的JobSupport关联起来,加入到当前Jobsupport.state.list中,asHandler返回一个DisposableHandler然后将这个handler加入到cont的state.list中去,cont也是上面1中变量cancellable,是CancellableContinuationImpl类型的继承自continuation,这样val Job2和新创建的cancellable也组成了父子关系了。

    2. ResumeOnCompletion类型,传入的this,和cont参数,用来恢复cont协程的作用:

    private class ResumeOnCompletion(
        job: Job,
        private val continuation: Continuation<Unit>
    ) : JobNode<Job>(job)  {
        override fun invoke(cause: Throwable?) = continuation.resume(Unit)
        override fun toString() = "ResumeOnCompletion[$continuation]"
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    调用了invoke -> continuation.resume(Unit),这样continuation就会继续执行了。

    经过上面的分析之后,可以看出整个Job树的结构如下:
    在这里插入图片描述

    看图:其中cancellable是可以被job取消的,取消之后,会将cancellable移除掉,cancellable被取消后,会遍历自己的state.list列表调用invoke方法,那么就会调用DIsposableHandle的invoke方法,将自己从job2中移除掉,cancellable就失去了作用。

    结合一下job2的被编译后的代码,Job2编译后的代码如下:

    BuildersKt.launch$default(
    	$this$launch, 
    	(CoroutineContext)(new CoroutineName("job2")),
     	(CoroutineStart)null, 
    	(Function2)(new Function2((Continuation)null) {
          int label;
          @Nullable
          public final Object invokeSuspend(@NotNull Object var1) {
              Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
              switch(this.label) {
                  case 0:
                      ResultKt.throwOnFailure(var1);
                      Thread.sleep(1000L);
                      return Unit.INSTANCE;
                  default:
                      throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
              }
          }
    	  //省略。。。
      }), 
      2, (Object)null);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    invokeSuspend方法被调用后,休眠了1s中,之后结束了,就会继续调用completion的resume方法,这样就会调用到job2对应的

    AbstractCoroutine.resumeWith -> makeCompletingOnce -> 
    notifyHandlers(list: NodeList, cause: Throwable?) -> 
    (ResumeOnCompletion) node.invoke(cause) -> continuation.resume(Unit) ->
    
    • 1
    • 2
    • 3

    进入到CancellableContinuationImpl类中,

    CancellableContinuationImpl.resumeWith -> resumeImpl -> 
    dispatchResume -> dispatch -> dispatcher.dispatch(context, this)
    
    • 1
    • 2

    上面已经分析了dispatcher.dispatch(context, this) dispatcher这个对象是根val job的dispatcher,在val joblaucnh的时候已经创建好了的,此时正在挂起中,所以这里调用dispatch就有下面的调用链:

    BaseContinuationImpl..resumeWith() ->   val outcome = invokeSuspend(param)  ->
    completion.resumeWith(outcome)
    
    • 1
    • 2

    job之前在挂起点结束了,现在又再一次被CancellableContinuationImpl类型的continuation调用,那么job就从这个挂起点被唤醒了。这样就从case 0,运行到case 1了,然后job就结束了。

    await原理

    将上面代码的Join换成await方法,代码如下:

      val myScope = CoroutineScope(CoroutineName("name") + IO + Job())
    
      val job = myScope.launch(CoroutineName("job")) {
    
           var deferred = async (CoroutineName("job2")) {
              Thread.sleep(20000)
          }
    
          job2?.await()
    
          printlnM("job has run to end")
      }
    
      printlnM("job has launch")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    原理其实和Join方法差不多,流程是一样的,只是其中的某些节点的类型不一样,代码如下:

    private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
        val cont = AwaitContinuation(uCont.intercepted(), this)
        
        cont.initCancellability()
     	cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))
        cont.getResult()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    1. 创建一个AwaitContinuation类型的continuation,
    2. 然后调用initCancellability方法,这个在上一节讲过,是cont节点和自己的父节点产生父子关系关联,这个父节点也就是uCont,intercepted() 对象,也就是val job 的协程,
    3. 接着
    cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))
    
    • 1

    这句话在join方法里面看到了,是job2和AwaitContinuation产生父子关系的功能。
    那么我们看下ResumeAwaitOnCompletion类型的节点是怎么实现的吧,

    private class ResumeAwaitOnCompletion<T>(
        private val continuation: CancellableContinuationImpl<T>
    ) : JobNode() {
        override fun invoke(cause: Throwable?) {
            val state = job.state
            if (state is CompletedExceptionally) {
                continuation.resumeWithException(state.cause)
            } else {
                continuation.resume(state.unboxState() as T)
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    和ResumeOnCompletion节点其实差不多,只不过调用resume的时候参数值是await的返回值,所以在挂起点恢复的时候,还带有挂起函数执行完成的返回值;
    而且在出现异常的时候,还可以将异常抛出去给父Job处理,这一点好像比Job功能更完善。
    可以画个图描述一下await的Job树:
    在这里插入图片描述

    job2是通过AwaitContinuation完成挂起点恢复的,这个类也是继承子 CancellableContinuationImpl的,只是覆盖fun getContinuationCancellationCause(parent: Job): Throwable用于获取异常信息的接口。
    其他流程完全和join方法一模一样、

    delay函数

    跟踪一下delay函数:

    delay(1000)
        
    public suspend fun delay(timeMillis: Long) {
        if (timeMillis <= 0) return // don't delay
        return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
            cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
        }
    }
    
    internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    和join,await差不多,首先利用delay调用处,父协程作用域的continuation,创建一个CancellableContinuation,让cancellable节点和父协程产生父子关联,该处返回的挂起值COROUTINE_SUSPENDED让父协程挂起,接着调用cancellable的上下文集合context.delay.scheduleResumeAfterDelay方法,该context是父协程上下文集合,意思就是delay对象是父协程的dispatcher元素,这个元素的成员函数scheduleResumeAfterDelay实现的delay功能。
    为了简单理解,展示下HandlerDIspatcher的实现:

    //HandlerDIspatcher
     override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
         val block = Runnable {
             with(continuation) { resumeUndispatched(Unit) }
         }
         handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
         continuation.invokeOnCancellation { handler.removeCallbacks(block) }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    就是使用线程自带的Handler延迟发送一个消息,这个消息的runnable的实现是,调用CancellableContinuationresumeUndispatched方法,跟踪之后,调用链条是:

    resumeUndispatched -> resumeImpl -> dispatchResume -> dispatch -> resumeUnconfined -> 
    后面调用父协程的invokeSuspend方法了,同join方法是一样的。
    
    • 1
    • 2

    delay的实质,就是父协程挂起后,延迟delay的一段时间(延迟这段时间,是任务加入队列,等待指定的时间后,线程取出任务然后执行,所以不存在阻塞线程的问题),使用父协程的dispatcher恢复父协程继续执行,有如下图所示:
    在这里插入图片描述

    suspend函数

    通过上面的Join和Await的分析,感觉挂起协程之后,似乎需要恢复协程才可以让协程执行完成未完成的代码的。如果Join和Await方法不创建CancellableContinuationImpl这个continuation节点的话,其实val job协程挂起后就结束了,剩下的代码是不会完成,后面的打印语句是不会执行的。这个直觉是正确的,Suspend函数就是有这个作用的,挂起协程之后,需要逻辑代码在挂起函数内部,主动去调用resume和resumeWithException()方法,用于恢复协程。

    suspend函数挂起代码实例:

    fun SuspendFunc() {
        val myScope = CoroutineScope(Dispatchers.IO)
    
        myScope.launch {
            printlnM("suspend func run before")
    
            testRemoteRequest()
    
            printlnM("suspend func run after")
        }
    }
    
    suspend fun testRemoteRequest():String {
        return suspendCancellableCoroutine { cancellableContinuation ->
            getUserInfo(object : Callback {
                override fun success() {
                    printlnM("成功请求用户信息")
                    cancellableContinuation.resume("成功")
                }
                override fun failed() {
                    printlnM("请求用户信息失败")
                    cancellableContinuation.resumeWithException(Exception("失败"))
                }
            })
        }
    }
    
    interface Callback{
        fun success()
        fun failed()
    }
    
    fun getUserInfo(callback: Callback?) {
        try {
            printlnM("getUserInfo")
            Thread.sleep(3000)
            callback?.success()
        } catch (e:Exception) {
            callback?.failed()
            e.printStackTrace()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    调用挂起函数,创建一个挂起点suspendCancellableCoroutine ,我们的逻辑代码写在suspendCancellableCoroutine 的lambda代码块中,内部调用模拟的网络请求,三秒后,主动调用resume方法,让挂起协程恢复执行。

    打印结果如下:

    2022-11-17 02:59:44.336  E/MainActivity: DefaultDispatcher-worker-1 : suspend func run before
    2022-11-17 02:59:44.337  E/MainActivity: DefaultDispatcher-worker-1 : getUserInfo
    2022-11-17 02:59:47.338  E/MainActivity: DefaultDispatcher-worker-1 : 成功请求用户信息
    2022-11-17 02:59:47.339  E/MainActivity: DefaultDispatcher-worker-1 : suspend func run after
    
    • 1
    • 2
    • 3
    • 4

    可以看到协程运行到getUserInfo,中间过了三秒后才继续执行,说明Job被挂起后恢复执行了。

    现在修改代码,让网络请求失败,看看异常是谁来处理的:

    val myScope = CoroutineScope(Dispatchers.IO + CoroutineExceptionHandler { c,e ->
        printlnM("scopeExceptionHandler : " + e.message)
    })
    
    getUserInfo(object : Callback {
        override fun success() {
            printlnM("成功请求用户信息")
            cancellableContinuation.resumeWithException(Exception("失败"))
        }
        override fun failed() {
            printlnM("请求用户信息失败")
            cancellableContinuation.resumeWithException(Exception("失败"))
        }
    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    打印日志如下:

    2022-11-17 03:10:07.952  E/MainActivity: DefaultDispatcher-worker-1 : suspend func run before
    2022-11-17 03:10:07.953  E/MainActivity: DefaultDispatcher-worker-1 : getUserInfo
    2022-11-17 03:10:10.954  E/MainActivity: DefaultDispatcher-worker-1 : 成功请求用户信息
    2022-11-17 03:10:10.955  E/MainActivity: DefaultDispatcher-worker-1 : scopeExceptionHandler : 失败
    
    • 1
    • 2
    • 3
    • 4

    可以看到,协程挂起后,三秒后,网路请求失败,挂起点抛出的异常,被根部的CoroutineExceptionHandler处理了,也就是说挂起函数的异常,也是遵循Job异常处理所说的链路传播。

    根据上面的分析,可以给挂起流程画个时序图,用于展示线程是怎么切换的,怎么恢复协程的,如下图所示:
    在这里插入图片描述

    其中带颜色的方框,代表的是DisptacherContinuation,它持有的线程池执行它持有的的协程,所以可以知道挂起点恢复之后,协程所执行的线程池是保持一致的,对于不同的协程之间,由它继承的Disptahcer决定,或者通过launch传递disptahcer参数覆盖,这样就可以修改协程运行所在的线程了。

    总结

    1. 父JobA的lambda表达式中有挂起函数,协程会在父JobA的挂起点处创建一个CancellableContinuationImpl类型的continuation,这个Cancellable会和父JobA进行父子关联;
    如果挂起函数本身是某个JobB的挂起函数,那么Cancellable还会和JobB组成父子关系,JobB在结束自己的时候,会通知Cancellable自己完成了,Cancellable又会继续通知JobA继续执行lambda的代码块,这样JobA就从挂起点恢复过来了。
    如果挂起函数是由suspendCancellableCoroutine函数完成的,那么需要在lambda代码块中,收到调用resume函数去主动唤起协程。

    2. suspendCancellableCoroutine是挂起函数的重点,这个函数才是挂起函数的实现成为可能。

    3. 由于JobA恢复执行的dispatcher不变,所以JobA的lambda代码在挂起点前后所执行的线程池是一样的,如果是单线程的话,那么线程前后是一样的。而挂起点continuation本身的逻辑代码执行线程由挂起点自己决定。

  • 相关阅读:
    如何在 VMware Workstation 16.2 中安装 Ubuntu 20.04
    Android Compose Bloom 项目实战 (二) : 欢迎页
    「C++小游戏教程」基本技巧(3)——发声函数 Beep()
    leetcode T31:下一排列
    加密算法发展简介
    2024.06.05【读书笔记】丨生物信息学与功能基因组学(第十一章 分子水平的系统发生和进化 第一部分)【AI测试版】
    打造“共富果园” 广东乳源推动茶油全产业链高质量发展
    Oracle/PLSQL: LNNVL Function
    C语言---自定义类型:结构体
    使用Python,dlib进行对象实时追踪
  • 原文地址:https://blog.csdn.net/u012345683/article/details/127895265