• Kotlin 中的协程 flow


    一、Flow概述

    Flow 具有异步挂起 suspend 响应式编程,可以使用挂起函数来异步生产和消费事件,Flow 的设计灵感也来源于响应式流以及其各种实现。

    二、Flow 的生产和消费

    1. suspend fun test1() {
    2. flow<Int> {
    3. (0..4).forEach {
    4. emit(it)//生产者发送数据
    5. }
    6. }.collect {
    7. println(it)
    8. }
    9. }

    flow {} 函数创建了一个冷数据流 Flow ,通过 emit 来发射数据,然后通过 collect 函数来收集这些数据。但是因为 collect 是挂起函数,挂起函数的调用又必须在另一个挂起函数或者协程作用域中。此时就需要我们使用协程来执行。

    1. fun main() {
    2. runBlocking {
    3. test1()
    4. }
    5. }

    三、Flow线程切换:FlowOn

    1. findViewById(R.id.textView).setOnClickListener() {
    2. lifecycleScope.launch {
    3. flow1()
    4. }
    5. }
    6. private suspend fun flow1() {
    7. flow<Int> {
    8. (0..4).forEach {
    9. Log.i("TAG", "flow:${currentCoroutineContext()}")
    10. emit(it)//生产者发送数据
    11. }
    12. }.collect {
    13. Log.i("TAG", "collect:${currentCoroutineContext()} it:$it")
    14. }
    15. }
    lifecycleScope.launch 默认是主线程执行的,按照协程的执行原理,我们可以确定上面例子中所有的执行操作都是在主线程上:

    flow:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate]
    collect:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate] it:0
    flow:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate]
    collect:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate] it:1
    flow:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate]
    collect:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate] it:2
    flow:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate]
    collect:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate] it:3
    flow:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate]
    collect:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate] it:4

    当我们调用 flowOn切换线程时

    1. private suspend fun flow1() {
    2. flow<Int> {
    3. (0..2).forEach {
    4. Log.i("TAG", "flow:${currentCoroutineContext()}")
    5. emit(it)//生产者发送数据
    6. }
    7. }.flowOn(Dispatchers.IO)
    8. .collect {
    9. Log.i("TAG", "collect:${currentCoroutineContext()} it:$it")
    10. }
    11. }

    可以看到 flow 代码块中的执行已经切换到另外一个线程执行。但是 collect 中的代码依然执行在主线程上。

    输出: 

    1. flow:[ProducerCoroutine{Active}@9064722, Dispatchers.IO]
    2. flow:[ProducerCoroutine{Active}@9064722, Dispatchers.IO]
    3. collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:0
    4. collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:1
    5. collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:2

    增加map,看看

    1. private suspend fun flow1() {
    2. flow<Int> {
    3. (0..2).forEach {
    4. Log.i("TAG", "flow:${currentCoroutineContext()}")
    5. emit(it)//生产者发送数据
    6. }
    7. }.flowOn(Dispatchers.IO)
    8. .map {
    9. Log.i("TAG", "map:${currentCoroutineContext()}")
    10. it
    11. }
    12. .collect {
    13. Log.i("TAG", "collect:${currentCoroutineContext()} it:$it")
    14. }
    15. }

    输出:

    1. flow:[ProducerCoroutine{Active}@9064722, Dispatchers.IO]
    2. flow:[ProducerCoroutine{Active}@9064722, Dispatchers.IO]
    3. map:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate]
    4. collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:0
    5. map:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate]
    6. collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:1
    7. map:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate]
    8. collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:2

     

    总结:

    1、flowOn 可以将执行此流的上下文更改为指定的上下文。

    2、flowOn可以进行组合使用。

    3、flowOn只影响前面没有自己上下文的操作符。已经有上下文的操作符不受后面 flowOn影响。

    4、不管 flowOn 如何切换线程, collect 始终是运行在调用它的协程调度器上。

    四、操作符

    1、过度操作符/流程操作符:onStart -> onEach -> onCompletion

    1))onStart: 在上游流启动之前被调用

    2)onEach:在上游流的每个值被下游发出之前调用。

    3)onCompletion:在流程完成或者取消后调用,并将取消异常或失败作为操作的原因参数传递。

    1. private suspend fun flow2(){
    2. flow {
    3. Log.d("TAG","flow")
    4. emit(1)
    5. }.onStart {
    6. Log.d("TAG","onStart")
    7. }.onEach {
    8. Log.d("TAG","onEach")
    9. }.onCompletion {
    10. Log.d("TAG","onCompletion")
    11. }.collect {
    12. Log.d("TAG","collect")
    13. }
    14. }

    输出:

    onStart
    flow
    onEach
    collect
    onCompletion

    2、异常操作符

    1)catch

    1. private suspend fun flow3() {
    2. flow {
    3. Log.d("TAG", "flow")
    4. emit(1)
    5. throw NullPointerException("空指针")
    6. }.onStart {
    7. Log.d("TAG", "onStart")
    8. }.onEach {
    9. Log.d("TAG", "onEach")
    10. }.catch {
    11. Log.e("TAG", "catch $it")
    12. }.onCompletion {
    13. Log.d("TAG", "onCompletion")
    14. }.collect {
    15. Log.d("TAG", "collect")
    16. }
    17. }

    输出:

    onStart
    flow
    onEach
    collect

    onCompletion
    catch java.lang.NullPointerException: 空指针 

    2)retry

     

    1. private fun flow13() {
    2. var index = 0
    3. lifecycleScope.launch {
    4. flow {
    5. if (index < 2) {
    6. index++
    7. Log.e("TAG", "出现错误:$index")
    8. throw RuntimeException("runtime exception index $index")
    9. }
    10. emit(100)
    11. }.retry(2).catch {
    12. Log.e("TAG", "catch: $it")
    13. }.collect {
    14. Log.d("TAG", "collect $it")
    15. }
    16. }
    17. }

    输出

    出现错误:1
    出现错误:2
    collect 100

    3)retryWhen

    1. private fun flow14() {
    2. var index = 0
    3. lifecycleScope.launch {
    4. flow {
    5. if (index < 2) {
    6. index++
    7. Log.e("TAG", "出现错误:$index")
    8. throw RuntimeException("runtime exception index $index")
    9. }
    10. emit(100)
    11. }.retryWhen{ cause, attempt ->
    12. Log.e("TAG","cause is $cause,attempt is $attempt")
    13. cause is RuntimeException
    14. }.catch {
    15. Log.e("TAG", "catch: $it")
    16. }.collect {
    17. Log.d("TAG", "collect $it")
    18. }
    19. }
    20. }

     出现错误:1
    cause is java.lang.RuntimeException: runtime exception index 1,attempt is 0
    出现错误:2
    cause is java.lang.RuntimeException: runtime exception index 2,attempt is 1
    collect 100

    3、转换操作符

    1)transform

    1. private fun flow4() {
    2. lifecycleScope.launch {
    3. (1..3).asFlow().transform {
    4. emit(it)
    5. emit("transform $it")
    6. }.collect {
    7. println("collect: $it")
    8. }
    9. }
    10. }

    transfrom 操作符任意值任意此,其他转换操作符都是基于 transform 进行扩展。比如:可以在执行长时间运行的异步请求之前,发射一个字符串并跟踪这个响应。

    输出:

     collect: 1
    collect: transform 1
    collect: 2
    collect: transform 2
    collect: 3
    collect: transform 3

    2)map  

    数据转换操作符

    1. private fun flow5() {
    2. lifecycleScope.launch {
    3. flow {
    4. emit(1)
    5. }.map {
    6. Log.d("TAG", "第一次转换")
    7. it * 5
    8. }.map {
    9. Log.d("TAG", "第二次转换")
    10. "map $it"
    11. }.collect {
    12. Log.d("TAG", "最终转换后值:$it")
    13. }
    14. }
    15. }

     输出:

    第一次转换
    第二次转换
    最终转换后值:map 5

    3)fliter

    fliter 操作符主要是对数据进行一个过滤,返回仅包含与给定匹配的原始流的值的流。

    fliter 还有很多同类型操作符,如:filterNot / filterIsInstance / filterNotNull

    1. private fun flow6() {
    2. lifecycleScope.launch {
    3. (1..3).asFlow().filter {
    4. it < 2
    5. }.collect {
    6. println("it:$it")
    7. }
    8. }
    9. }

    输出:

     it:1

    4)zip

    zip 操作符用于组合两个流中的相关值,与 RxJava 中的 zip 功能一样;

    1. private fun flow7() {
    2. val flow1 = (1..3).asFlow()
    3. val flow2 = flowOf("one", "two", "three")
    4. lifecycleScope.launch {
    5. flow2.zip(flow1) { value1, value2 ->
    6. "$value1:$value2"
    7. }.collect {
    8. Log.d("TAG", "collect:$it")
    9. }
    10. }
    11. }

    输出:

    collect:one:1
    collect:two:2
    collect:three:3

    4、限制操作符

    1)take 

    take 操作符返回包含第一个计数元素的流,当发射次数大于等于 count 的值时,通过抛出异常来取消执行。

    1. private fun flow8() {
    2. lifecycleScope.launch {
    3. (1..3).asFlow().take(2)
    4. .collect {
    5. Log.d("TAG", "it:$it")
    6. }
    7. }
    8. }

     输出:

    it:1
    it:2

    2)takeWhile

    takeWhile 操作符与 filter 类似,不过他是当遇到条件判断为 false 的时候,将会中断后续的操作。

    1. private fun flow9() {
    2. lifecycleScope.launch {
    3. flowOf(1, 1, 2, 3, 1, 4).map {
    4. delay(100)
    5. it
    6. }.takeWhile {
    7. it == 1
    8. }.collect {
    9. Log.d("TAG", "it:$it")
    10. }
    11. }
    12. }

    输出:

    it:1
    it:1 

    3)drop

    drop 操作符与 take 相反,它是丢弃掉指定的 count 数量后执行后续的流。

    1. private fun flow10() {
    2. lifecycleScope.launch {
    3. (1..3).asFlow().drop(2)
    4. .collect {
    5. Log.d("TAG", "it:$it")
    6. }
    7. }
    8. }

     输出:

    it:3

    5、末端流操作符

     collect 是最基础的末端操作符,基本上每一个例子当中我们都是使用 collect。

    1)toList

    toList 操作符是将我们的流转换成一个List集合

    1. private fun flow11() {
    2. lifecycleScope.launch {
    3. val list = (1..5).asFlow().toList()
    4. Log.d("TAG", "toList:$list")
    5. }
    6. }

     输出:

    toList:[1, 2, 3, 4, 5]

    6、Flow的缓冲

    1. private fun flow12() {
    2. lifecycleScope.launch {
    3. val time = measureTimeMillis {
    4. (1..3).asFlow().map {
    5. delay(100)
    6. it
    7. }.buffer().collect {
    8. delay(300)
    9. Log.d("TAG", "it:$it")
    10. }
    11. }
    12. Log.d("TAG","collected in $time ms")
    13. }
    14. }

    输出: 

    it:1
    it:2
    it:3
    collected in 1060 ms 

    参考:

    Kotlin协程之Flow使用 - 掘金 

  • 相关阅读:
    Linux检查端口nmap
    微服务:高性能网关 ShenYu简介
    2、ARM处理器概论
    制作一个简单HTML游戏网页(HTML+CSS)仿龙之谷网络游戏官网
    基于Tensorflow、Keras实现Stable Diffusion
    sed实现修改最后一次匹配
    windows 安装配置GO开发环境
    MATLAB算法实战应用案例精讲-【集成算法】集成学习模型Blending(附Python实现代码)
    wpf devexpress添加TreeListControl到项目
    睿趣科技:现在做抖音网店卖啥好
  • 原文地址:https://blog.csdn.net/u010498248/article/details/132870231