• Koltin协程:Flow的触发与消费


        本文分析示例代码如下:

    launch(Dispatchers.Main) {
        val task = flow {
            emit(2)
            emit(3)
        }.onEach {
            Log.d("liduo", "$it")
        }
    
        task.collect()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    一.Flow的触发与消费

        在Kotlin协程:Flow基础原理的分析中,流的触发与消费都是同时进行的。每当调用collect方法时,会触发流的执行,并同时在collect方法中对流发出的值进行消费。

        而在协程中,其实还提供了分离流的触发与消费的操作——onEach方法。通过使用onEach方法,可以将原本在collect方法中的消费过程的移动到onEach方法中。这样在构建好一个Flow对象后,不会立刻去执行onEach方法,只有当调用collect方法时,才会真正的去触发流的执行。这样就实现了流的触发与消费的分离。

        接下来,将对onEach方法进行分析。

    1.onEach方法

        onEach方法用于预先构建流的消费过程,只有在触发流的执行后,才会对流进行消费,代码如下:

    public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
        action(value)
        return@transform emit(value)
    }
    
    • 1
    • 2
    • 3
    • 4

        onEach方法是一个Flow接口的扩展方法,返回一个类型为Flow的对象。Flow方法内部通过transform方法实现。

    2.transform方法

        transform方法是onEach方法的核心实现,代码如下:

    public inline fun <T, R> Flow<T>.transform(
        @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
    ): Flow<R> = flow { // 创建Flow对象
        collect { value -> // 触发collect
            return@collect transform(value)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

        transform方法也是Flow接口的扩展方法,同样会返回一个类型为Flow的对象。并且在transform方法内部,首先构建了一个类型为Flow的对象,并且在这个Flow对象的执行体内,调用上游的流的collect来触发消费过程,并通过调用参数transform来实现消费。这个collect方法是一个扩展方法,在Kotlin协程:Flow基础原理分析过,因此不再赘述。

        这就是onEach方法实现触发与消费分离的核心,它将对上游的流的消费过程包裹在了一个新的流内,只有当这个新的流或其下游的流被触发时,才会触发这个新的流自身的执行,从而实现对上游的流的消费。

        接下来分析一下流的消费过程。

    3.collect方法

        collect方法用于触发流的消费,我们这里调用的collect方法,是一个无参数的方法,代码如下:

    public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)
    
    • 1

        这里的无参数collect方法是Flow接口的扩展方法。在无参数collect方法中,调用了另一个有参数的collect方法,这个有参数的collect方法在Kotlin协程:Flow基础原理中提到过,就是Flow接口中定义的方法,并且传入了NopCollecor对象,代码如下:

    internal object NopCollector : FlowCollector<Any?> {
        override suspend fun emit(value: Any?) {
            // 什么都不做
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

        NopCollecor是一个单例类,它实现了FlowCollector接口,但是emit方法为空实现。

        因此,这里会调用onEach方法返回的Flow对象的collect方法,这部分在Kotlin协程:Flow基础原理进行过分析,最后会触发flow方法中的block参数的执行。而这个Flow对象就是transform方法返回的Flow对象。代码如下:

    public inline fun <T, R> Flow<T>.transform(
        @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
    ): Flow<R> = flow { // 创建Flow对象
        collect { value -> // 触发collect
            return@collect transform(value)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

        通过上面的transform方法可以知道,在触发flow方法中的block参数执行后,会调用collect方法。上面提到transform方法是Flow接口的扩展方法,因此这里有会继续调用上游Flow对象的collect方法。这个过程与刚才分析的类似,这里调用的上游的Flow对象,就是我们在示例代码中通过flow方法构建的Flow对象。

        此时,会触发上游flow方法中block参数的执行,并在执行过程中,通过emit方法将值发送到下游。

        接下来,在transform方法中,collect方法的block参数会被会被回调执行,处理上游发送的值。这里又会继续调用transform方法中参数的执行,这部分逻辑在onEach方法中,代码如下:

    public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
        action(value)
        return@transform emit(value)
    }
    
    • 1
    • 2
    • 3
    • 4

        这里会调用参数action的执行,流在这里最终被消费。同时,onEach方法会继续调用emit方法,将上游返回的值再原封不动的传递到下游,交由下游的流处理。

    二.多消费过程的执行

        首先看下面这段代码:

    launch(Dispatchers.Main) {
        val task = flow {
            emit(2)
            emit(3)
        }.onEach {
            Log.d("liduo1", "$it")
        }.onEach {
            Log.d("liduo2", "$it")
        }
    
        task.collect()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

        根据上面的分析,两个onEach方法会按顺序依次执行,打印出liduo1:2、liduo2:2、liduo1:3、liduo2:3。就是因为onEach方法会将上游的值继续向下游发送。

        同样的,还有下面这段代码:

    launch(Dispatchers.Main) {
        val task = flow {
            emit(2)
            emit(3)
        }.onEach {
            Log.d("liduo1", "$it")
        }
    
        task.collect {
            Log.d("liduo2", "$it")
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

        这段代码也会打印出liduo1:2、liduo2:2、liduo1:3、liduo2:3。虽然使用了onEach方法,但也可以调用有参数的collect方法来对上游发送的数据进行最终的处理。

    三.总结

    在这里插入图片描述
        粉线为代码编写顺序,绿线为下游触发上游的调用顺序,红线为上游向下游发送值的调用顺序,蓝线为onEach方法实现的核心。

  • 相关阅读:
    Int数据取byte,Byte数据取bit
    外包干了3年,跳槽后转自动化测试工资是原来的2倍,秘诀原来是......
    P1747 好奇怪的游戏
    使用jmeter进行接口测试
    自媒体助手篇
    【线上问题】Jedis Could not get a resource from the pool
    Rocksdb LSM Tree Compaction策略
    既然有了malloc/free,C++中为什么还需 要new/delete呢?
    python面相对象基础语法
    电脑经常弹出“不支持的硬件”解决办法
  • 原文地址:https://blog.csdn.net/LeeDuoZuiShuai/article/details/126689856