• Kotlin协程:Flow基础原理


        本文分析示例代码如下:

    launch(Dispatchers.Main) {
        flow {
            emit(1)
            emit(2)
        }.collect {
            delay(1000)
    
            withContext(Dispatchers.IO) {
                Log.d("liduo", "$it")
            }
    
            Log.d("liduo", "$it")
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    一.Flow的创建

        在协程中,可以通过flow方法创建一个Flow对象,一个Flow对象代表一个冷流。其中参数block是FlowCollector的扩展方法,并且可挂起。代码入下:

    public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
    
    • 1

        FlowCollector是一个接口,用于收集上游的流发出的值,代码如下:

    public interface FlowCollector<in T> {
        // 可挂起,非线程安全
        public suspend fun emit(value: T)
    }
    
    • 1
    • 2
    • 3
    • 4

        调用flow方法,会返回一个Flow接口指向的对象,代码如下:

    public interface Flow<out T> {
       
        @InternalCoroutinesApi
        public suspend fun collect(collector: FlowCollector<T>)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

        这里flow方法的返回对象是一个SafeFlow类型的对象。至此Flow就创建完毕了。

    二.Flow的消费

        在协程中,当需要消费流时,会调用collect方法,触发流的消费,代码如下:

    public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
        collect(object : FlowCollector<T> {
            override suspend fun emit(value: T) = action(value)
        })
    
    • 1
    • 2
    • 3
    • 4

        这里的collect方法不是Flow接口定义的方法,而是Flow的扩展方法,内部创建了一个匿名的FlowCollector对象,并且把action封装到了FlowCollector对象的emit方法中,最后将FlowCollector对象作为参数传入到了另一个collect方法,这个collect方法才是Flow接口定义的方法。

    1.SafeFlow类

        根据上面的分析,Flow对象最后返回的是一个SafeFlow类型的对象。因此,这里调用的另一个collect方法,就是SafeFlow类中的collect方法,代码如下:

    private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
        override suspend fun collectSafely(collector: FlowCollector<T>) {
            collector.block()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

        SafeFlow类继承自AbstractFlow类,类中重写了collectSafely方法。因此调用的collect方法实际上是AbstractFlow类的方法。

    2.AbstractFlow类

        AbstractFlow类是一个抽象类,实现了Flow接口和CancellableFlow接口。实际上CancellableFlow接口继承自Flow接口,因此AbstractFlow类只重写了collect方法,代码如下:

    @FlowPreview
    public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    
         // 核心方法 
        @InternalCoroutinesApi
        public final override suspend fun collect(collector: FlowCollector<T>) {
            // 创建SafeCollector对象,对collector进行包裹
            val safeCollector = SafeCollector(collector, coroutineContext)
            try {
                // 调用collectSafely方法
                collectSafely(safeCollector)
            } finally {
                // 释放拦截的续体
                safeCollector.releaseIntercepted()
            }
        }
        
        public abstract suspend fun collectSafely(collector: FlowCollector<T>)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

        collect方法内部调用了collectSafely方法,collectSafely方法在SafeFlow中被重写。collectSafely方法中会调用flow中的block,并提供一个SafeCollector类的环境。

    3. SafeCollector类

        当flow方法中的代码在执行时,会调用emit方法发射数据,这时由于block执行在SafeCollector类的环境中,因此调用的emit方法是SafeCollector类的方法。

        SafeCollector类实现了FlowCollector接口并且继承自ContinuationImpl类,代码如下:

    internal actual class SafeCollector<T> actual constructor(
        @JvmField internal actual val collector: FlowCollector<T>,
        @JvmField internal actual val collectContext: CoroutineContext
    ) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
        
        ...
        // 保存上下文中元素数量,用于检查上下文是否变化
        @JvmField
        internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
        // 保存上一次的上下文
        private var lastEmissionContext: CoroutineContext? = null
        // 执行结束后的续体
        private var completion: Continuation<Unit>? = null
    
        // 协程上下文
        override val context: CoroutineContext
            get() = completion?.context ?: EmptyCoroutineContext
    
        // 挂起的核心方法
        override fun invokeSuspend(result: Result<Any?>): Any? {
            result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
            completion?.resumeWith(result as Result<Unit>)
            return COROUTINE_SUSPENDED
        }
    
        // 释放拦截的续体
        public actual override fun releaseIntercepted() {
            super.releaseIntercepted()
        }
    
        // 发射数据
        override suspend fun emit(value: T) {
            // 获取当前suspend方法续体
            return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
                try {
                    // 调用重载的方法
                    emit(uCont, value)
                } catch (e: Throwable) {
                     // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
                    lastEmissionContext = DownstreamExceptionElement(e)
                    // 抛出异常
                    throw e
                }
            }
        }
    
        // 重载的emit方法
        private fun emit(uCont: Continuation<Unit>, value: T): Any? {
            // 从续体中获取上下文
            val currentContext = uCont.context
            // 保证当前协程的Job是active的
            currentContext.ensureActive()
            // 获取上次的上下文
            val previousContext = lastEmissionContext
            // 如果前后上下文发生变化
            if (previousContext !== currentContext) {
                // 检查上下文是否发生异常
                checkContext(currentContext, previousContext, value)
            }
            // 保存续体
            completion = uCont
            // 调用emitFun方法,传入collector,value,continuation
            return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
        }
    
        // 检查上下文变化,防止并发
        private fun checkContext(
            currentContext: CoroutineContext,
            previousContext: CoroutineContext?,
            value: T
        ) {
            // 如果上次执行过程中发生了异常
            if (previousContext is DownstreamExceptionElement) {
                // 抛出异常
                exceptionTransparencyViolated(previousContext, value)
            }
            // 检查上下文是否发生变化,如果变化,则抛出异常
            checkContext(currentContext)
            lastEmissionContext = currentContext
        }
        
        // 用于抛出异常
        private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
            error("""
                Flow exception transparency is violated:
                    Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
                    Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
                    For a more detailed explanation, please refer to Flow documentation.
                """.trimIndent())
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

        emit方法最终会调用emitFun方法方法,代码如下:

    private val emitFun =
        FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
    
    • 1
    • 2

        emitFun是一个lambda表达式,它将只有一个参数的emit方法转换成三个参数的方法。emitFun方法在编译时会被编译器处理,反编译后的代码逻辑大致如下:

    @Nullable
    public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
       InlineMarker.mark(0);
       // 核心执行
       Object var10000 = p1.emit(p2, continuation);
       InlineMarker.mark(2);
       InlineMarker.mark(1);
       return var10000;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

        可以看到,emitFun方法内部会调用FlowCollector类对象的emit方法,同时传入value和continuation作为参数。

        而这个FlowCollector类对象就是一开始的collect方法封装的匿名类对象,代码如下:

    public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
        collect(object : FlowCollector<T> {
            override suspend fun emit(value: T) = action(value)
        })
    
    • 1
    • 2
    • 3
    • 4

        调用它的emit方法,会直接调用action的invoke方法,并传入发射的数据,流在这里被最终消费。

        通过上面的分析,可以知道消费的过程是在emit方法中被调用的,如果在消费的过程,没有发生挂起,那么emit方法执行完毕后,会继续执行flow方法里剩下的代码,而如果在消费的过程中发生了挂起,情况会稍有不同。

    4.消费过程中的挂起

        如果消费过程中发生挂起,那么emit方法会返回一个COROUTINE_SUSPENDED对象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED对象后,会挂起当前协程。代码如下:

    override suspend fun emit(value: T) {
        // 获取当前suspend方法续体
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                // 调用重载的方法
                emit(uCont, value)
            } catch (e: Throwable) {
                // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
                lastEmissionContext = DownstreamExceptionElement(e)
                // 抛出异常
                throw e
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

        当消费过程执行完毕时,会通过传入的续体唤起外部协程恢复挂起状态。根据emitFun可以知道,这里传入的续体为this,也就是当前的SafeCollector类对象,代码如下:

    emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    
    • 1

        恢复挂起需要调用续体的resumeWith方法,上面提到SafeCollector类继承自ContinuationImpl类,SafeCollector类中没有重写resumeWith方法,而ContinuationImpl类中也没有重写resumeWith方法,因此实际调用的是ContinuationImpl类的父类BaseContinuationImpl类的resumeWith方法。如下图所示:
    在这里插入图片描述
        在Kotlin协程:创建、启动、挂起、恢复中提到过,调用BaseContinuationImpl类的resumeWith方法,内部会调用invokeSuspend方法,而SafeCollector类重写了invokeSuspend方法,代码如下:

    override fun invokeSuspend(result: Result<Any?>): Any? {
        // 尝试获取异常
        result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
        // 如果没有异常,则恢复flow方法续体的执行
        completion?.resumeWith(result as Result<Unit>)
        // 返回挂起标识,这里挂起的是消费过程
        return COROUTINE_SUSPENDED
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

        在invokeSuspend方法中,会调用resumeWith方法恢复生产过程——flow方法的执行,同时挂起消费过程的执行。全部过程如下图所示:
    在这里插入图片描述

  • 相关阅读:
    你找到“活着”的意义了吗?
    springboot+nodejs+vue工程师售后服务评价管理系统
    服务器为什么会丢包
    Hello
    RocketMQ系列(一) 基本介绍
    Linux系统中Makefile的基本实现
    章鱼网络在 NEARCON23 发布 Octopus 2.0
    期末网页设计作业素材 (民宿 5页 带地图)
    ffmpeg知识点整理
    独立站如何做好社媒营销
  • 原文地址:https://blog.csdn.net/LeeDuoZuiShuai/article/details/126611815