• Kotlin 协程二三事:挂起原理


    协程的本质还是线程

    需要知道的是,因为Java虚拟机不支持协程,所以Kotlin的协程跑在Java还是通过线程来实现的。
    通过kotlin编译成的class文件,我们可以看到,协程本质还是通过不同的线程池来实现不同的任务。我们通过Dispatchers类,可以看到协程中出创建的线程池。

    public actual object Dispatchers {
        @JvmStatic
        public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
    
        @JvmStatic
        public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
    
        @JvmStatic
        public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
    
        @JvmStatic
        public val IO: CoroutineDispatcher = DefaultScheduler.IO
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    跟随源码,我们可以看到MainCoroutineDispatcher的实现

    internal class AndroidDispatcherFactory : MainDispatcherFactory {
    
        override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
            HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    由以上代码可以得出,Dispatcher.Main实际上就是一个主线程的Handler,把任务切换到主线程是去执行
    再看一下Default的实现

    private var coroutineScheduler = createScheduler()
    
    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    
    internal class CoroutineScheduler(
        private val corePoolSize: Int,
        private val maxPoolSize: Int,
        private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
        private val schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : Executor,Closeable
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    可以看到Dispatchers.default本质上是一个自定义的线程池。根据应用场景的不同,和Dispatcher.IO适用于不同的工作

    状态机

    协程通过状态机实现协程的挂起与恢复,以下是一段协程代码

        fun onCreate() {
            GlobalScope.launch {
                val result = getContent()
                Log.e("test", "onCreate: result = $result")
            }
        }
    
        suspend fun getContent(): String {
            return withContext(Dispatchers.Default) {
                Thread.sleep(3000)
                "xxx"
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    我们通过tools–>kotlin–>show kotlin bytecode,然后Decompile,可以看到生成的java代码

    BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
    		// 状态机状态的值
             int label;
    
             @Nullable
             public final Object invokeSuspend(@NotNull Object $result) {
             	// 挂起状态
                Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                Object var10000;
                switch(this.label) {
                case 0:
                	// 首次进来,默认为0
                   ResultKt.throwOnFailure($result);
                   GobalTest var4 = GobalTest.this;
                   // 此时更新状态
                   this.label = 1;
                   // 执行挂起函数
                   var10000 = var4.getContent(this);
                   // 如果挂起,直接返回挂起值
                   if (var10000 == var3) {
                      return var3;
                   }
                   break;
                case 1:
                	// 第二次进来时,走这,退出循环
                   ResultKt.throwOnFailure($result);
                   var10000 = $result;
                   break;
                default:
                   throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                }
    
    			// 第二次执行打印语句
                String result = (String)var10000;
                Log.e("test", "onCreate: result = " + result);
                return Unit.INSTANCE;
             }
    
             @NotNull
             public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
                Intrinsics.checkNotNullParameter(completion, "completion");
                Function2 var3 = new <anonymous constructor>(completion);
                return var3;
             }
    
             public final Object invoke(Object var1, Object var2) {
                return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
             }
          }), 3, (Object)null);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    以上是onCreate部分,我们可以看到,成员变量label代表状态机的状态,当函数运行到挂起点的时候,label的值会发生变化,如果没有发生挂起,则函数继续执行,如果发生了挂起,则函数就直接return,直到协程恢复,再次调到invokeSuspend方法,此时lebal已经发生了变化,所以直接执行case 1的逻辑。

    我们再看下getContent()方法

       @Nullable
       public final Object getContent(@NotNull Continuation $completion) {
          return BuildersKt.withContext((CoroutineContext)Dispatchers.getDefault(), (Function2)(new Function2((Continuation)null) {
             int label;
    
             @Nullable
             public final Object invokeSuspend(@NotNull Object var1) {
                Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                switch(this.label) {
                case 0:
                   ResultKt.throwOnFailure(var1);
                   Thread.sleep(3000L);
                   return "xxx";
                default:
                   throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                }
             }
    
             @NotNull
             public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
                Intrinsics.checkNotNullParameter(completion, "completion");
                Function2 var3 = new <anonymous constructor>(completion);
                return var3;
             }
    
             public final Object invoke(Object var1, Object var2) {
                return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
             }
          }), $completion);
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    点进withContext方法,我们可以看到他的返回正是COROUTINE_SUSPENDED

    public suspend fun <T> withContext(
        context: CoroutineContext,
        block: suspend CoroutineScope.() -> T
    ): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
       	...
        coroutine.getResult()
    }
    
        fun getResult(): Any? {
            if (trySuspend()) return COROUTINE_SUSPENDED
            // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
            val state = this.state.unboxState()
            if (state is CompletedExceptionally) throw state.cause
            @Suppress("UNCHECKED_CAST")
            return state as T
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    那么既然挂起了,何时恢复呢,我们看到BaseContinuationImpl这个类,看下resumewith的方法

    internal abstract class BaseContinuationImpl(
        // This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
        // it has a public getter (since even untrusted code is allowed to inspect its call stack).
        // 这个 completion 可能是父协程的协程体
        public val completion: Continuation<Any?>?
    ) : Continuation<Any?>, CoroutineStackFrame, Serializable {
        // This implementation is final. This fact is used to unroll resumeWith recursion.
        public final override fun resumeWith(result: Result<Any?>) {
            // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
            var current = this
            var param = result
            while (true) {
                // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
                // can precisely track what part of suspended callstack was already resumed
                probeCoroutineResumed(current)
                with(current) {
                    val completion = completion!! // fail fast when trying to resume continuation without completion
                    val outcome: Result<Any?> =
                        try {
                        	// 调用协程体
                            val outcome = invokeSuspend(param)
                            // 如果挂起直接返回
                            if (outcome === COROUTINE_SUSPENDED) return
                            Result.success(outcome)
                        } catch (exception: Throwable) {
                            Result.failure(exception)
                        }
                    releaseIntercepted() // this state machine instance is terminating
                    if (completion is BaseContinuationImpl) {
                        // unrolling recursion via loop
                        current = completion
                        param = outcome
                    } else {
                        // top-level completion reached -- invoke and return
                        // 执行恢复逻辑
                        completion.resumeWith(outcome)
                        return
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    对于getContent方法来讲,没有挂起逻辑,所以执行完invokeSuspend方法后,没有return,当再走到completion.resumeWith方法的时候,getContent的逻辑已经执行完了,接下来就是恢复父协程的操作了、
    也就是第二次调用父协程状态机的invokeSuspend方法,此时label已经为1,就执行打印逻辑了

  • 相关阅读:
    ADS127L11采集板系统噪声评估
    状态保持-JWT
    HBase 2.x ---- 整合 Phoenix
    Tips--lib静态库调用外部函数
    【Linux】Linux的常见指令详解(上)
    智能手机主动安全防护系统设计与实现
    【栈和队列OJ】一、有效的括号
    “牛市陷阱?还是回调?是好?还是坏!“
    【atoi函数的介绍以及模拟实现】
    概括专线接入类型,互联网专线接入方式有哪些?
  • 原文地址:https://blog.csdn.net/one1go/article/details/125471654