• 深入理解Kotlin协程


    本文已同步发表于我的微信公众号,搜索 代码说 即可关注,欢迎与我沟通交流。

    Kotlin协程

    协程由程序自己创建和调度,不需要操作系统调度,所以协程比线程更加轻量。相比于线程的切换,协程切换开销更小,速度更快

    我们知道线程是CPU调度的基本单位,而协程不能独立存在,必须依赖于线程。在Kotlin中,Dispatcher(内部是线程池或者Android主线程)是调度器,可以调度协程运行在一个或多个线程之中。一个线程中可以有多个协程,同一个协程可以运行在一个线程的不同时刻;多个协程可以运行在一个或多个线程的不同时刻。进程、线程、协程三者的关系:
    进程、线程、协程

    • 进程:线程 = 1:N
    • 线程:协程 = 1:N

    协程优势:

    • 更安全的代码kotlin中提供了许多语言功能,避免Java中最常见的null空指针等异常
    • 语法简洁、富有表现力:相比于Javakotlin可以使用更少的代码实现更多的功能。
    • 可互操作:与Java语言无缝互通。即可以在 kotlin 代码中调用Java代码,同时也可以在Java代码中调用kotlin代码。kotlin代码本质上也是通过kotlin编译器编译后生成VM能识别的字节码。
    • 结构化并发使用看似阻塞式的写法来实现异步功能。即可以同步的方式去写异步代码,相比于回调方式大幅简化了后台任务管理,例如网络请求、数据库访问等任务的管理。

    非阻塞式挂起

    协程在常规函数的基础上添加了两项操作,用于处理长时间运行的任务。在 invoke(或call)return之外,协程添加了 suspend非阻塞式挂起 和 resume恢复:

    • suspend 用于暂停执行当前协程,并保存所有局部变量。
    • resume 用于让已挂起的协程从挂起处恢复执行。

    注意suspend 函数不能在普通函数中调用,否则会报Suspend function 'xxx' should be called only from a coroutine or another suspend function 的提示;如需调用 suspend 函数,只能从其他 suspend 函数进行调用,或通过使用协程构建器(例如 launch)来启动新的协程。

    suspend 函数相比于普通函数内部多了一个 Continuation 续体实例(Kotlin转成Java代码之后即可看到),suspend函数中可以调用普通函数,但普通函数却不能调用suspend函数

     suspend fun fetchDocs() {                             // Dispatchers.Main
        val result = get("https://developer.android.com") // Dispatchers.IO for `get`
        show(result)                                      // Dispatchers.Main
    }
    
    suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }
    

    suspend 修饰的函数为非阻塞式挂起函数,何为非阻塞式挂起呢?不同于Java 中的Thread.sleep() 会阻塞当前线程,suspend 修饰的函数当遇到启动子线程操作时,会在切线程时将协程挂起并记录当前挂起点,接着主线程暂停了当前协程,但主线程依然可以去执行协程之外逻辑而不会被阻塞;此时协程中启动的子线程也可以继续执行(相当于兵分两路,互不影响),当子线程中的逻辑执行完毕后,会自动从协程的挂起点恢复,这样就可以继续在主线程往下执行了。整体流程如下:执行suspend函数 -> 启动子线程 -> 函数挂起并记录挂起点 -> 协程暂停(主线程依然是活跃的) -> 子线程执行完毕 -> 协程从挂起点恢复

    如在上面的示例中,get() 在主线程中调用,内部在它启动网络请求之前挂起协程。虽然协程被挂起,但主线程并没有被阻塞,此时如果主线程中收到其他消息事件依然可以去处理(如点击事件等)。当网络请求完成时,get 会恢复已挂起的协程,继续执行show(result) 方法。这里注意一下,挂起函数并不一定会导致协程挂起,只有在发生异步调用时才会挂起

    Kotlin 使用 堆栈帧 管理要运行哪个函数以及所有局部变量。挂起协程时,系统会复制并保存当前的堆栈帧以供稍后使用。恢复时,会将堆栈帧从其保存位置复制回来,然后函数再次开始运行。即使代码可能看起来像普通的顺序阻塞请求,协程也能确保网络请求避免阻塞主线程。

    CPS变换 + Continuation续体 + 状态机

    CPS变换 (Continuation-Passing-Style Transformation) 是一种编程风格, 用来将内部要执行的逻辑封装到一个闭包里面, 然后再返回给调用者。上一节的例子中,协程就是通过传递 Continuation 来控制异步调用流程的:将函数挂起之后执行的逻辑包装起成一个Continuation, 里面包含了挂起点信息,这样当协程恢复时就可以在挂起点继续执行

    Continuation意为续体,只存在于挂起函数中。Kotlin 中当一个普通函数加上suspend 关键字之后,就成为了挂起函数,如:

    private suspend fun suspendFuc() {}
    

    我们对此函数进行反编译,查看对应的Java代码为:

    private final Object suspendFuc(Continuation $completion) {
          return Unit.INSTANCE;
    }
    

    可以看到Java代码中系统帮我们多生成了一个Continuation 类型的参数,看下这个参数内部:

    @SinceKotlin("1.3")
    public interface Continuation {
        /**
         * The context of the coroutine that corresponds to this continuation.
         */
        public val context: CoroutineContext
    
        /**
         * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
         * return value of the last suspension point.
         */
        public fun resumeWith(result: Result)
    }
    

    Continuation 是一个接口类型,表示在返回T类型值的挂起点之后的延续,其中类型T代表的是原来函数的返回值类型。resumeWith 恢复执行相应协程,并传递一个结果作为最后一个挂起点的返回值。

    • 状态机

    上述函数中加入一个delay 函数,delay 函数本身也是一个挂起函数:

        private suspend fun suspendFuc(): String {
            delay(1000)
            return "value"
        }
    

    经过Tools -> Kotlin -> Show Kotlin Bytecode 反编译查看:

       private final Object suspendFuc(Continuation var1) {
          Object $continuation;
          label20: {
             if (var1 instanceof ) {
                $continuation = ()var1;
                if (((()$continuation).label & Integer.MIN_VALUE) != 0) {
                   (()$continuation).label -= Integer.MIN_VALUE;
                   break label20;
                }
             }
              
              //将要执行的Continuation逻辑传入ContinuationImpl中
             $continuation = new ContinuationImpl(var1) {
                // $FF: synthetic field
                Object result;
                int label;
    
                 //invokeSuspend()会在恢复协程挂起点时调用
                @Nullable
                public final Object invokeSuspend(@NotNull Object $result) {
                   this.result = $result;
                   this.label |= Integer.MIN_VALUE;
                   return CoroutineBaseFragment.this.suspendFuc(this);
                }
             };
          }
    
          Object $result = (()$continuation).result;
          //返回此状态意味着函数要被挂起
          Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
          
          // 状态机逻辑,通过label进行分块执行
          switch((()$continuation).label) {
          case 0:
             ResultKt.throwOnFailure($result);
             (()$continuation).label = 1;
             if (DelayKt.delay(1000L, (Continuation)$continuation) == var4) {
                return var4;
             }
             break;
          case 1:
             ResultKt.throwOnFailure($result);
             break;
          default:
             throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
          }
          return "value";
       }
    

    suspend挂起函数经过Kotlin编译器编译之后会进行CPS变换,并且函数里的逻辑会进行分块执行:分块逻辑数量 = 挂起函数数量 + 1,上述函数逻辑中,通过switch(label){} 状态机将逻辑分块执行,首先label = 0,此时会先将label置为1,接着调用了delay挂起函数,返回Intrinsics.COROUTINE_SUSPENDED意味着函数要被挂起;

    当函数从协程挂起点恢复执行时,会调用$continuation#invokeSuspend(Object $result) 方法恢复执行,$result是上一次函数执行的结果,以参数形式带入之后,就可以从挂起点继续执行了。

    此时label变为1ResultKt.throwOnFailure($result)中通过 if (value is Result.Failure) throw value.exception判断上面的分块结果是否有异常,如果有直接抛异常;否则直接break,执行到最后的return "value" 逻辑,这样整个方法的流程就运行完毕了。

    总结:suspend挂起函数的执行流程就是通过CPS变换 + Continuation + 状态机来运转的

    CoroutineContext

    我们知道 Android 中的 Context(子类有Application、Activity、Service)可以获取应用资源,可以启动ActivityService等。CoroutineContext意为协程上下文,其作用可以类比 Context,通过它可以控制协程在哪个线程中执行、捕获协程异常、设置协程名称等。

    public interface CoroutineContext {
        //从该上下文中返回具有给定[key]的元素 或 null 
        public operator fun  get(key: Key): E?
    
        //从initial开始累加上下文中的条目,并从左到右对当前累加器值和上下文中的每个元素应用operation
        public fun  fold(initial: R, operation: (R, Element) -> R): R
    
        //返回一个上下文,其中包含来自此上下文的元素和来自其他上下文context的元素。具有相同键的元素会被覆盖。
        public operator fun plus(context: CoroutineContext): CoroutineContext =
            if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
                context.fold(this) { acc, element ->
                    val removed = acc.minusKey(element.key)
                    if (removed === EmptyCoroutineContext) element else {
                        // make sure interceptor is always last in the context (and thus is fast to get when present)
                        val interceptor = removed[ContinuationInterceptor]
                        if (interceptor == null) CombinedContext(removed, element) else {
                            val left = removed.minusKey(ContinuationInterceptor)
                            if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                                CombinedContext(CombinedContext(left, element), interceptor)
                        }
                    }
                }
    
        //删除带有指定[key]的元素。
        public fun minusKey(key: Key<*>): CoroutineContext
    
        /**
         * Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
         */
        public interface Key
    
        //CoroutineContext的一个元素。协程上下文的一个元素本身就是一个单例上下文。
        public interface Element : CoroutineContext {
            /**
             * A key of this coroutine context element.
             */
            public val key: Key<*>
    
            public override operator fun  get(key: Key): E? =
                @Suppress("UNCHECKED_CAST")
                if (this.key == key) this as E else null
    
            public override fun  fold(initial: R, operation: (R, Element) -> R): R =
                operation(initial, this)
    
            public override fun minusKey(key: Key<*>): CoroutineContext =
                if (this.key == key) EmptyCoroutineContext else this
        }
    }
    

    协程总是会运行在 CoroutineContext 类型的上下文中,其中plus操作符被重写,多个CoroutineContext相加时,会生成一个CombinedContext( 内部可以理解成一个Element都不同的链表);同时 CombinedContext拥有 map索引能力,集合中的每个元素都有一个唯一的 Key

    继承关系

    协程上下文

    Element 定义在CoroutineContext内部,是它的内部接口。CoroutineContext 使用以下元素集定义协程的行为:

    • Job:控制协程的生命周期。
    • CoroutineDispatcher:将工作分派到适当的线程。
    • CoroutineName:协程的名称,可用于调试。
    • CoroutineExceptionHandler:处理未捕获的异常。

    下面就来详细看看几种元素的使用。

    CoroutineContext几种具体实现
    1、Job & SupervisorJob

    Job是协程的句柄。使用launchasync创建的协程都会返回一个Job实例,该实例是对应协程的唯一标识并可以管理协程的生命周期。如:

    val job1 = GlobalScope.launch { }
    val job2 = MainScope().launch { }
    
    override fun onDestroy() {
        super.onDestroy()
        job1.cancel()
        job2.cancel()
    }
    

    注:上面的协程启动方式并不推荐在项目中直接使用,因为生命周期比较长,如果没有主动关闭可能就会产生内存泄漏。推荐在ViewModel中使用viewModelScopeLifecycleOwner中使用lifecycleScope,可以在各自的生命周期中自动关闭协程。

    Job常用的API

      val job = GlobalScope.launch(
          context = Job() + Dispatchers.Main,
          start = CoroutineStart.LAZY) { // 逻辑部分
       }
        
      -  job.start()  // 对应 start = CoroutineStart.LAZY
      -  job.cancelAndJoin()  //cancel() + join()
      -  job.cancel() // 取消
      -  job.isActive // 协程是否存活
      -  job.isCancelled // 协程是否被取消
      -  job.isCompleted //协程是否已经执行完毕
    

    Job 有以下状态:

    StateisActiveisCompletedisCancelled
    New (optional initial state)falsefalsefalse
    Active (default initial state)truefalsefalse
    Completing (transient state)truefalsefalse
    Cancelling (transient state)falsefalsetrue
    Cancelled (final state)falsetruetrue
    Completed (final state)falsetruefalse

    状态流转:

                                              wait children
        +-----+ start  +--------+ complete   +-------------+  finish  +-----------+
        | New | -----> | Active | ---------> | Completing  | -------> | Completed |
        +-----+        +--------+            +-------------+          +-----------+
                         |  cancel / fail       |
                         |     +----------------+
                         |     |
                         V     V
                     +------------+                           finish  +-----------+
                     | Cancelling | --------------------------------> | Cancelled |
                     +------------+                                   +-----------+
    

    SupervisorJob的使用

    val exceptionHandler =
                    CoroutineExceptionHandler { context, throwable -> log("throwable:$throwable") }
    
    GlobalScope.launch(exceptionHandler) {
       //子Job1
        launch(SupervisorJob()) {
            delay(200)
            throw NullPointerException()
         }
       //子Job2
        launch {
            delay(300)
            log("child2 execute successfully")
        }
        //子Job3
        launch {
            delay(400)
            log("child3 execute successfully")
        }
        log("parent execute successfully")
    }
    

    注意子Job1传入的是 SupervisorJob,当发生异常时,兄弟协程(job2/job3)不会被取消;如果是默认配置,那么兄弟协程也会被取消,上述代码执行结果:

    2022-08-30 23:31:30.142  E/TTT: parent execute successfully
    2022-08-30 23:31:30.346  E/TTT: throwable:java.lang.NullPointerException
    2022-08-30 23:31:30.446  E/TTT: child2 execute successfully
    2022-08-30 23:31:30.545  E/TTT: child3 execute successfully
    

    如果将job1中的更换为默认时,执行结果为:

    2022-08-30 23:31:30.142  E/TTT: parent execute successfully
    2022-08-30 23:31:30.346  E/TTT: throwable:java.lang.NullPointerException
    

    可以看到job1中使用默认设置时,当job1发生了异常,兄弟协程也会被取消。注意这里job2/job3可以被取消是因为delay()中有对取消状态的判断,通知job2/job3取消可以类比Thread.interrupt(),interrupt只是通知线程要中断并设置一个中断状态,最终要不要中断还是线程自己说了算。所以如果改成以下代码,即使job1使用默认配置,job3也不会被取消:

    //子Job3
     launch(Dispatchers.IO) {
         Thread.sleep(400)
         log("child3 execute successfully")
     }
    

    job3中没有了对协程状态的判断,所以即使job3被通知要取消协程了,依然会继续执行直到结束。

    SuperviorJob源码浅析

    public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)
    
    public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)
    
    private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
        override fun childCancelled(cause: Throwable): Boolean = false
    }
    

    当子协程 job1/job2/job3 启动时,会和协程本身的 Job 形成父子关系。而当子协程抛异常时,父JobchildCancelled() 方法会被执行,且默认返回的是 true,表示取消 父Job 及其所有 子Job;如果 子Job中使用的是 SuperviorJobchildCancelled() 返回的是 false,表示 父Job及其未发生异常的 子Job都不会受影响。

    2、CoroutineDispatcher
    public actual object Dispatchers {
       @JvmStatic
        public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
        
        @JvmStatic
        public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
        
        @JvmStatic
        public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
        
        @JvmStatic
        public val IO: CoroutineDispatcher = DefaultScheduler.IO
    }
    

    将工作任务分派到适当的线程。

    • Dispatchers.Main:运行在UI线程中。Dispatchers.Main.immediate: 如果在UI线程加载,不会做特殊处理;如果是在子线程,会通过Handler转发到主线程
    • Dispatchers.IO: 执行IO密集型任务,最多提交max(64, CPU核心数)个任务执行。
    • Dispatchers.DEFAULT :执行CPU密集型任务, CoroutineScheduler最多有corePoolSize个线程被创建,corePoolSize的取值为max(2, CPU核心数),即它会尽量的等于CPU核心数
    • Dispatchers.Unconfined:不给协程指定运行的线程,由启动协程的线程决定;但当被挂起后, 会由恢复协程的线程继续执行。内部通过ThreadLocal保存执行协程时对应的线程,用于恢复协程时在取出对应线程并在其继续执行协程。

    withContext

    withContext() 可以在不引入回调的情况下控制任何代码行的线程池,因此可以将其应用于非常小的函数,例如从数据库中读取数据或执行网络请求。一种不错的做法是使用 withContext() 来确保每个函数都是主线程安全的,这意味着,可以从主线程调用每个函数。这样,调用方就从不需要考虑应该使用哪个线程来执行函数了。

    3、CoroutineName

    协程的名称,可用于调试。

    4、CoroutineExceptionHandler
    public interface CoroutineExceptionHandler : CoroutineContext.Element {
      
        public companion object Key : CoroutineContext.Key
    
        //当有未处理的异常发生时,该方法就会执行
        public fun handleException(context: CoroutineContext, exception: Throwable)
    }
    

    CoroutineExceptionHandler当有未捕获的异常时就会触发执行。如果不在顶级协程中设置,那么当有异常发生时会导致程序的crash,可以通过自定义CoroutineExceptionHandler来拦截异常,如下:

    val exceptionHandler =
                    CoroutineExceptionHandler { context, throwable -> log("throwable:$throwable") }
    lifecycleScope.launch(exceptionHandler) { // do something }              
    

    自定义CoroutineExceptionHandler调用的是如下方法:

    public inline fun CoroutineExceptionHandler(crossinline handler: (CoroutineContext, Throwable) -> Unit): CoroutineExceptionHandler =
        object : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler {
            override fun handleException(context: CoroutineContext, exception: Throwable) =
                handler.invoke(context, exception)
        }
    

    所以当发生异常时,会在handleException()中执行我们传入的handler(CoroutineContext, Throwable)方法。

    CoroutineExceptionHandler使用示例

     val exceptionHandler =
                    CoroutineExceptionHandler { context, throwable -> log("parent throwable:$throwable") }
     lifecycleScope.launch(exceptionHandler) {
         log("parent execute start")
         val childExHandler =
                        CoroutineExceptionHandler { context, throwable -> log("child throwable:$throwable") }
         //子协程中如果使用SupervisorJob()、Job(),则异常不会往上传播;否则异常会在顶层协程中处理
         val childJob = launch(childExHandler) {
             delay(1000)
             log("child execute")
             throw  IllegalArgumentException("error occur")
         }
         childJob.join()
         log("parent execute end")
     }
    

    执行结果:

    2022-09-04 00:38:51.792 15415-15415/ E/TTT: parent execute start
    2022-09-04 00:38:52.796 15415-15415/ E/TTT: child execute
    2022-09-04 00:38:52.805 15415-15415/ E/TTT: parent throwable:java.lang.IllegalArgumentException: error occur
    

    可见虽然在子协程中也自定义了CoroutineExceptionHandler,但是最终子协程中的异常还是在顶层的父协程种处理的,如果就想在子协程中处理异常呢?可以在子协程中加上SupervisorJob()、Job(),则异常会在子协程中处理而不会往上传播:

     //其他内容不变
     val childJob = launch(childExHandler + SupervisorJob()) {
         //其他内容不变
     }
    

    执行结果:

    2022-09-04 00:42:48.465 15755-15755/ E/TTT: parent execute start
    2022-09-04 00:42:49.468 15755-15755/ E/TTT: child execute
    2022-09-04 00:42:49.473 15755-15755/ E/TTT: child throwable:java.lang.IllegalArgumentException: error occur
    2022-09-04 00:42:49.474 15755-15755/ E/TTT: parent execute end
    

    可见异常最终是在子协程中处理的,且虽然子协程中发生了异常,父协程依然能执行完毕。

    :如果启动协程的是async,则CoroutineExceptionHandler中的handler方法并不会马上执行,必须调用deffered.await()时才会执行。

    CoroutineScope 协程作用域

    CoroutineScope 会跟踪它使用 launchasync 创建的所有协程。CoroutineScope 本身并不运行协程,但是通过CoroutineScope可以保证对协程进行统一管理,避免发生内存泄漏等,

    常见的CoroutineScope

    • GlobalScope: 全局协程作用域,如果不主动通过job.cancel()关闭,其生命周期与Application一致。
    • MainScope: MainScope的内部:public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main),可以看到MainScope其实是通过SupervisorJob() + Dispatchers.Main自定义的协程作用域,其内部运行在UI线程且内部异常不会往上传播。
    • runBlocking: 顶层函数,和 GlobalScope 不一样,它会阻塞当前线程直到其内部所有子协程执行结束;
    • ktx扩展库中的自定义GlobalScope:典型应用就是在LifecycleOwner(androidX中的FragmentActivity、Fragment都实现了该接口)使用的lifecycleScope,以及ViewModel中使用的ViewModelScope,其内部可以在合适的时机自动关闭协程,从而避免内存泄露的发生。

    参考

    【1】官网:利用 Kotlin 协程提升应用性能
    【2】官网:Android 上的 Kotlin 协程
    【3】Kotlin挂起函数原理
    【4】揭秘Kotlin协程中的CoroutineContext

  • 相关阅读:
    Flink DataStream 体系
    Python3 面向对象
    算法竞赛备赛进阶之状态机模型训练
    HLS学习1:点灯
    2022年8月15日陌陌推荐算法工程师面试题5道|含解
    面向对象编程之断言assert
    plc简单问题求看看对不对
    推荐一个Java学习路线图
    牛客网刷题(三)
    JavaScript中的模块化编程,包括CommonJS和ES6模块的区别。
  • 原文地址:https://blog.csdn.net/u013700502/article/details/125664262