• Kotlin协程Flow浅析


    Kotlin协程中的Flow主要用于处理复杂的异步数据,以一种”流“的方式,从上到下依次处理,和RxJava的处理方式类型,但是比后者更加强大。

    Flow基本概念

    Flow中基本上有三个概念,即 发送方,处理中间层,接收方,可以类比水利发电站中的上游,发电站,下游的概念, 数据从上游开始发送”流淌“至中间站被”处理“了一下,又流淌到了下游。

    示例代码如下

    flow {         // 发送方、上游
        emit(1)    // 挂起函数,发送数据
        emit(2)
        emit(3)
        emit(4)
        emit(5)
    }
    .filter { it > 2 }  // 中转站,处理数据
    .map { it * 2 }
    .take(2)
    .collect{           // 接收方,下游
        println(it)
    }
    输出内容:
    6
    8
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    通过上面代码我们可以看到,基于一种链式调用api的方式,流式的进行处理数据还是很棒的,接下来具体看一下上面的组成:

    • flow{},是个高阶函数,主要用于创建一个新的Flow。在其Lambda函数内部使用了emit()挂起函数进行发送数据。
    • filter{}、map{}、take{},属于中间处理层,也是中间数据处理的操作符,Flow最大的优势,就是它的操作符跟集合操作符高度一致。只要会用List、Sequence,那么就可以快速上手 Flow 的操作符。
    • collect{},下游接收方,也成为终止操作符,它的作用其实只有一个:终止Flow数据流,并且接收这些数据。

    其他创建Flow的方式还是flowOf()函数,示例代码如下

    fun main() = runBlocking{aassssssssaaaaaaaas
        flowOf(1,2,3,4,5).filter { it > 2 }
            .map { it * 2 }
            .take(2)
            .collect{
                println("flowof: $it")
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    我们在看一下list集合的操作示例

    listOf(1,2,3,4,5).filter { it > 2 }
            .map { it * 2 }
            .take(2)
            .forEach{
                println("listof: $it")
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    通过以上对比发现,两者的基本操作几乎一致,Kotlin也提供了两者相互转换的API,Flow.toList()、List.asFlow()这两个扩展函数,让数据在 List、Flow 之间来回转换,示例代码如下:

    //flow 转list
        flowOf(1,2,3)
            .toList()
            .filter { it > 1 }
            .map { it * 2 }
            .take(2)
            .forEach{
                println(it)
            }
        // list 转 flow
        listOf(1,2,3).asFlow()
            .filter { it > 2 }
            .map { it * 2 }
            .take(2)
            .collect{
                println(it)
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    Flow生命周期

    虽然从上面操作看和集合类型,但是Flow还是有些特殊操作符的,毕竟它是协程的一部分,和Channel不同,Flow是有生命周期的,只是以操作符的形式回调而已,比如onStart、onCompletion这两个中间操作符。

    flowOf(1,2,3,4,5,6)
            .filter {
                println("filter: $it")
                it > 3
            }
            .map {
                println("map: $it")
                it * 2
            }
            .take(2)
            .onStart { println("onStart") }
            .collect{
                println("collect: $it")
            }
    输出内容:
    onStart
    filter: 1
    filter: 2
    filter: 3
    filter: 4
    map: 4
    collect: 8
    filter: 5
    map: 5
    collect: 10
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    我们可以看到onStart,它的作用是注册一个监听事件:当 flow 启动以后,它就会被回调。

    和filter、map、take这些中间操作符不同,他们的顺序会影响数据的处理结果,这也很好理解;onStart和位置没有关系,它本质上是一个回调,不是一个数据处理的中间站。同样的还有数据处理完成的回调onCompletion。

    flowOf(1,2,3,4,5,6)
            .filter {
                println("filter: $it")
                it > 3
            }
            .map {
                println("map: $it")
                it * 2
            }
            .take(2)
            .onStart { println("onStart") }
            .onCompletion { println("onCompletion") }
            .collect{
                println("collect: $it")
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    Flow中onCompletion{} 在面对以下三种情况时都会进行回调:

    • 1,Flow 正常执行完毕
    • 2,Flow 当中出现异常
    • 3,Flow 被取消。

    处理异常

    在数据流的处理过程中,很难保证不出现问题,那么出现异常之后再该怎么处理呢?

    • 对于发生在上游、中间操作这两个阶段的异常,我们可以直接使用 catch 这个操作符来进行捕获和进一步处理。
    • 对于发生在下游,使用try-catch,把collect{}当中可能出现问题的代码包裹起来进行捕获处理。
    上游或者中间异常使用catch
    fun main() = runBlocking{
        val flow = flow {
            emit(1)
            emit(2)
            throw IllegalStateException()
            emit(3)
        }
    
        flow.map { it * 2 }
            .catch { println("catch: $it") }
            .collect{
                println("collect: $it")
            }
    }
    输出:
    collect: 2
    collect: 4
    catch: java.lang.IllegalStateException
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    catch 这个操作符的作用是和它的位置强相关的,catch 的作用域,仅限于catch的上游。换句话说,发生在 catch 上游的异常,才会被捕获,发生在 catch 下游的异常,则不会被捕获。

    val flow = flow {
            emit(1)
            emit(2)
            throw IllegalStateException()
            emit(3)
        }
    
        flow.map { it * 2 }
            .catch { println("catch: $it") }
            .filter { it / 0 > 1 } // catch之后发生异常
            .collect{
                println("collect: $it")
        }
    输出内容:
    Exception in thread "main" java.lang.ArithmeticException: / by zero
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    下游使用try-catch
    flowOf(1,2,3)
            .onCompletion { println("onCompletion $it") }
            .collect{
                try {
                    println("collect: $it")
                    throw IllegalStateException();
                }catch (e: Exception){
                    println("catch $e")
                }
            }
    输出:
    collect: 1
    catch java.lang.IllegalStateException
    collect: 2
    catch java.lang.IllegalStateException
    collect: 3
    catch java.lang.IllegalStateException
    onCompletion null
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    切换执行线程

    Flow适合处理复杂的异步任务,大多数情况下耗时任务放在子线程或线程池中处理,对于UI任务放在主线程中进行。

    在Flow中可以使用flowOn操作符实现上述场景中的线程切换。

    flowOf(1,2,3,4,5)
            .filter {
                logX("filter: $it")
                it > 2 }
            .flowOn(Dispatchers.IO) // 切换线程
            .collect{
                logX("collect: $it")
            }
    输出内容:
    ================================
    filter: 1
    Thread:DefaultDispatcher-worker-1
    ================================
    ================================
    filter: 2
    Thread:DefaultDispatcher-worker-1
    ================================
    ================================
    filter: 3
    Thread:DefaultDispatcher-worker-1
    ================================
    ================================
    filter: 4
    Thread:DefaultDispatcher-worker-1
    ================================
    ================================
    filter: 5
    Thread:DefaultDispatcher-worker-1
    ================================
    ================================
    collect: 3
    Thread:main
    ================================
    ================================
    collect: 4
    Thread:main
    ================================
    ================================
    collect: 5
    Thread:main
    ================================
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    flowOn 操作符也是和它的位置强相关的。作用域限于它的上游。在上面的代码中,flowOn 的上游,就是 flowOf{}、filter{} 当中的代码,所以,它们的代码全都运行在 DefaultDispatcher 这个线程池当中。只有collect{}当中的代码是运行在 main 线程当中的。

    终止操作符

    Flow 里面,最常见的终止操作符就是collect。除此之外,还有一些从集合中借鉴过来的操作符,也是Flow的终止操作符。比如 first()、single()、fold{}、reduce{},本质上来说说当我们尝试将 Flow 转换成集合的时候,已经不属于Flow的API,也不属于协程的范畴了,它本身也就意味着 Flow 数据流的终止。

    "冷的数据流"从何而来

    在上面文章《Kotlin协程Channel浅析》中,我们认识到Channel是”热数据流“,随时准备好,随用随取,就像海底捞里的服务员。

    现在我们看下Flow和Channel的区别

    val flow = flow {
            (1..4).forEach{
                println("Flow发送前:$it")
                emit(it)
                println("Flow发送后: $it")
            }
        }
    
        val channel: ReceiveChannel = produce {
            (1..4).forEach{
                println("Channel发送前: $it")
                send(it)
                println("Channel发送后: $it")
            }
        }
        
    输出内容:
    Channel发送前: 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    Flow中的逻辑并未执行,因此我们可以这样类比,Channel之所以被认为是“热”的原因,是因为不管有没有接收方,发送方都会工作。那么对应的,Flow被认为是“冷”的原因,就是因为只有调用终止操作符之后,Flow才会开始工作。

    除此之外,Flow一次处理一条数据,是个”懒家伙“。

        val flow = flow {
            (3..6).forEach {
                println("Flow发送前:$it")
                emit(it)
                println("Flow发送后: $it")
            }
        }.filter {
            println("filter: $it")
            it > 3
        }.map {
            println("map: $it")
            it * 2
        }.collect {
            println("结果collect: $it")
        }
    输出内容:
    Flow发送前:3
    filter: 3
    Flow发送后: 3
    Flow发送前:4
    filter: 4
    map: 4
    结果collect: 8
    Flow发送后: 4
    Flow发送前:5
    filter: 5
    map: 5
    结果collect: 10
    Flow发送后: 5
    Flow发送前:6
    filter: 6
    map: 6
    结果collect: 12
    Flow发送后: 6
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    相比于满面春风,热情服务的Channel,Flow更像个冷漠的家伙,你不找他,他不搭理你。

    • Channel,响应速度快,但数据可能是旧的,占用资源
    • Flow,响应速度慢,但数据是最新的,节省资源

    Flow也可以是”热“的,你知道吗?

  • 相关阅读:
    发展前景好、薪资高,计算机行业成为许多人改变命运的首选!
    Iceberg 数据治理及查询加速实践
    LCP 51.烹饪料理
    【脑与认知科学】【n-back游戏】
    Linux服务器搭建超简易跳板机连接阿里云服务器
    微信小程序自定义方法submitPwd(e){}传入的e有什么作用
    前端食堂技术周刊第 46 期:Chrome 三方 cookie 计划、npm 引入更多安全增强功能、Awesome Bun
    VUE2安装初始化步骤(2022)
    【LeetCode】矩阵模拟相关题目汇总
    Cell 重磅丨不依赖泛素蛋白酶降解途径的新型 PROTAC - MCE
  • 原文地址:https://blog.csdn.net/wayne214/article/details/128061135