• Kotlin的协程:flow


    flow 介绍

    之前介绍的启动协程方法,比如 launch、async 都是协程的单次启动。如果有复杂场景,比如发送多个数据,就需要使用 flow 数据流。在 flow 中,数据如水流一样经过上游发送,中间站处理,下游接收。

    创建 flow

    创建 flow 有 3 种方式:

    1. flow{}
    2. flowOf()
    3. asFlow()

    flow

    flow{} 中使用 emit 发送数据。

    fun flowEmit() = runBlocking {
        flow {
            emit(1)
            emit(2)
            emit(3)
            emit(4)
            emit(5)
        }
            .filter {
                it > 2
            }
            .map {
                it * 2
            }
            .take(2)
            .collect {
                // 6 8
                println(it)
            }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    flowOf

    flowOf() 可以将指定的一串数据转换为 flow,接收可变参数。

    fun flowOfFun() = runBlocking {
        flowOf(1, 2, 3, 4, 5)
            .filter { it > 2 }
            .map { it * 2 }
            .take(2)
            .collect {
                // 6 8
                println(it)
            }
    
        listOf(1, 2, 3, 4, 5)
            .filter { it > 2 }
            .map { it * 2 }
            .take(2)
            .forEach {
                // 6 8
                println(it)
            }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    asFlow

    asFlow() 可以将 List 集合转换为 flow。toList() 可以将 flow 转换为 List 集合。

    fun flow2list() = runBlocking {
        flowOf(1, 2, 3, 4, 5)
            // flow to list
            .toList()
            .filter { it > 2 }
            .map { it * 2 }
            .take(2)
            .forEach {
                println(it)
            }
    
        listOf(1, 2, 3, 4, 5)
            // list as flow
            .asFlow()
            .filter { it > 2 }
            .map { it * 2 }
            .take(2)
            .collect {
                println(it)
            }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    中间操作符

    创建 flow 之后使用中间操作符处理 flow 的每一个数据。flow 的中间操作符和 list 集合的操作符非常类似。
    常用中间操作符:

    1. filter
    2. map
    3. take

    filter

    filter 传入判断条件,条件满足时过滤数据,否则不将数据流向下游。

    map

    map 传入映射函数,将每个数据传入映射函数,得到结果继续传入下游。

    take

    take 传入非负整数 n,取前 n 个数据传入下游。

    fun flowEmit() = runBlocking {
        flow {
            emit(1)
            emit(2)
            emit(3)
            emit(4)
            emit(5)
        }
            .filter {
                it > 2
            }
            .map {
                it * 2
            }
            .take(2)
            .collect {
                // 6 8
                println(it)
            }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    终止操作符

    collect 是 flow 的终止操作符,收集每一个数据经过中间操作符后的最终结果,表示 flow 流的终止,后面不能再调用中间操作符。

    除了 collect,还有一些其他的终止操作符,first、single、fold、reduce。

    collect

    返回所有元素,结束 flow。

    fun flowEmit() = runBlocking {
        flow {
            emit(1)
            emit(2)
            emit(3)
            emit(4)
            emit(5)
        }
            .filter {
                it > 2
            }
            .map {
                it * 2
            }
            .take(2)
            .collect {
                // 6 8
                println(it)
            }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    first

    返回第一个元素,结束 flow。

    fun flowFirst() = runBlocking {
        val first = flow {
            emit(1)
            emit(2)
            emit(3)
            emit(4)
            emit(5)
        }
            .filter {
                it > 2
            }
            .map {
                it * 2
            }
            .take(2)
            .first()
        // 6
        println(first)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    single

    返回唯一元素,结束flow。不能多于一个,也不能一个没有。

    fun flowSingle() = runBlocking {
        val single = flow {
            emit(3)
        }
            .filter {
                it > 2
            }
            .map {
                it * 2
            }
            .take(2)
            .single()
        // 6
        println(single)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    fold

    折叠所有元素。指定一个函数和初始值,对每一个元素反复执行函数,返回最后的结果。

    fun flowFold() = runBlocking {
        val fold = flow {
            emit(1)
            emit(2)
            emit(3)
            emit(4)
            emit(5)
        }.filter {
            it > 2
        }.map {
            it * 2
        }.take(2)
            .fold(0) { acc, value ->
                acc + value
            }
    
        // 14
        println(fold)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    reduce

    reduce 和 fold 很类似,reduce 没有初始值。

    fun flowReduce() = runBlocking {
        val reduce = flow {
            emit(1)
            emit(2)
            emit(3)
            emit(4)
            emit(5)
        }.filter {
            it > 2
        }.map {
            it * 2
        }.take(2)
            .reduce { acc, value ->
                acc + value
            }
        // 14
        println(reduce)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    first、single、fold、reduce 本质都是封装了 collect ,因此它们都是终止操作符。

    生命周期

    onStart

    onStart 是 flow 的开始生命周期回调。onStart 的执行时机和它在 flow 位置无关。

    对比下面两个方法,onStart 都会在第一时间回调。

    fun onStartFun() = runBlocking {
        flowOf(1, 2, 3, 4, 5)
            .filter {
                println("filter: $it")
                it > 2
            }
            .map {
                println("map: $it")
                it * 2
            }
            .take(2)
            .onStart {
                println("onStart")
            }
            .collect {
                println("collect: $it")
            }
    }
    
    fun onStartFun2() = runBlocking {
        flowOf(1, 2, 3, 4, 5)
            .take(2)
            .filter {
                println("filter: $it")
                it > 2
            }
            .map {
                println("map: $it")
                it * 2
            }
            .onStart {
                println("onStart")
            }
            .collect {
                println("collect: $it")
            }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    onComplete

    flow 执行完后回调 onComplete。onComplete 的执行时机和它在 flow 的位置无关。

    flow 正常执行完回调 onComplete。

    fun onCompleteFun() = runBlocking {
        flowOf(1, 2, 3, 4, 5)
            .onCompletion {
                println("onCompletion")
            }
            .filter {
                println("filter: $it")
                it > 2
            }
            .take(2)
            .collect {
                println("collect: $it")
            }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    flow 执行被取消,或者 flow 执行中出现异常。

    fun cancelOnCompleteFun() = runBlocking {
        launch {
            flow {
                emit(1)
                emit(2)
                emit(3)
            }
                // collect: 1
                //collect: 2
                //cancel
                //onCompletion first: kotlinx.coroutines.JobCancellationException
                .onCompletion {
                    println("onCompletion first: $it")
                }
                .collect {
                    println("collect: $it")
                    if (it == 2) {
                        // cancel flow
                        cancel()
                        println("cancel")
                    }
                }
        }
    
        delay(1000)
    
        flowOf(4, 5, 6)
    //    collect: 4
    //    onCompletion second: java.lang.IllegalStateException
            .onCompletion {
                println("onCompletion second: $it")
            }
            .collect {
                println("collect: $it")
                throw IllegalStateException()
            }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    异常处理

    flow 的异常处理可以分为上游异常和下游异常。上游异常指创建 flow 或者中间操作符发生的异常。下游异常指终止操作符 collect 发生的异常。

    上游异常

    上游异常可以用 catch 函数捕获异常。catch 函数和它的位置相关,只能捕获 catch 上游的异常。

    fun flowCatch() = runBlocking {
        val flow = flow {
            emit(1)
            emit(2)
            throw IllegalStateException()
            emit(3)
        }
        flow.map {
            it * 2
        }.catch {
            println("catch: $it")
        }.collect {
            println(it)
        }
        //    2
        //    4
        //    catch: java.lang.IllegalStateException
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    下游异常

    下游异常不能用 catch 函数,需要在 collect 的作用域用 try-catch 捕获。

    catch 函数无法捕获下游的 filter 除 0 异常。

    fun flowCatchDownStream() = runBlocking {
        val flow = flow {
            emit(1)
            emit(2)
            emit(3)
        }
    
        flow.map {
            it * 2
        }.catch {
            println("catch: $it")
        }.filter {
            it / 0 > 1
        }.collect {
            println(it)
        }
        // Exception in thread "main" java.lang.ArithmeticException: / by zero
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    使用 try-catch 捕获 collect 的异常。

    fun flowTryCatch() = runBlocking {
        flowOf(4, 5, 6)
            .onCompletion {
                println("onCompletion second: $it")
            }
            .collect {
                try {
                    println("collect: $it")
                    throw IllegalStateException()
                } catch (e: Exception) {
                    println("catch $e")
                }
            }
    //    collect: 4
    //    catch java.lang.IllegalStateException
    //            collect: 5
    //    catch java.lang.IllegalStateException
    //            collect: 6
    //    catch java.lang.IllegalStateException
    //            onCompletion second: null
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    线程切换

    flowOn

    flowOn 可以指定上游所有操作符运行的线程,和它的位置相关。

    collect 运行在 main 线程,上游运行在 IO 线程,指定 DefaultDispatcher。

    fun flowOn() = runBlocking {
        val flow = flow {
            logX("Start")
            emit(1)
            logX("Emit: 1")
            emit(2)
            logX("Emit: 2")
            emit(3)
            logX("Emit: 3")
        }
    
        flow.filter {
            logX("Filter: $it")
            it > 2
        }
            .flowOn(Dispatchers.IO)
            .collect {
                logX("Collect: $it")
            }
    //    ================================
    //    Start
    //    Thread:DefaultDispatcher-worker-1, time:1666096501866
    //    ================================
    //    ================================
    //    Filter: 1
    //    Thread:DefaultDispatcher-worker-1, time:1666096501917
    //    ================================
    //    ================================
    //    Emit: 1
    //    Thread:DefaultDispatcher-worker-1, time:1666096501917
    //    ================================
    //    ================================
    //    Filter: 2
    //    Thread:DefaultDispatcher-worker-1, time:1666096501917
    //    ================================
    //    ================================
    //    Emit: 2
    //    Thread:DefaultDispatcher-worker-1, time:1666096501917
    //    ================================
    //    ================================
    //    Filter: 3
    //    Thread:DefaultDispatcher-worker-1, time:1666096501917
    //    ================================
    //    ================================
    //    Emit: 3
    //    Thread:DefaultDispatcher-worker-1, time:1666096501917
    //    ================================
    //    ================================
    //    Collect: 3
    //    Thread:main, time:1666096501917
    //    ================================
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    flowOn 在 filter 之前,emit 执行在 IO 线程,filter 和 collect 执行在 main 线程。

    fun flowOnIO() = runBlocking {
        val flow = flow {
            logX("Start")
            emit(1)
            logX("Emit: 1")
        }
    
        flow.flowOn(Dispatchers.IO)
            .filter {
                logX("Filter: $it")
                it > 0
            }
            .collect {
                logX("Collect: $it")
            }
    
    //    ================================
    //    Start
    //    Thread:DefaultDispatcher-worker-1, time:1666165816908
    //    ================================
    //    ================================
    //    Emit: 1
    //    Thread:DefaultDispatcher-worker-1, time:1666165816942
    //    ================================
    //    ================================
    //    Filter: 1
    //    Thread:main, time:1666165816944
    //    ================================
    //    ================================
    //    Collect: 1
    //    Thread:main, time:1666165816944
    //    ================================
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    因为 flowOn 只能用于上游,在 collect 可以用 withContext 切换线程,但不建议这么用。

    collect 运行在 DefaultDispatcher,其他运行在 main 线程。

    fun flowWithContext() = runBlocking {
        val flow = flow {
            logX("Start")
            emit(1)
            logX("Emit: 1")
        }
    
        flow.filter {
            logX("Filter: $it")
            it > 0
        }
            .collect {
                // 不建议
                withContext(Dispatchers.IO) {
                    logX("Collect: $it")
                }
            }
    
    //    ================================
    //    Start
    //    Thread:main, time:1666167319244
    //    ================================
    //    ================================
    //    Filter: 1
    //    Thread:main, time:1666167319297
    //    ================================
    //    ================================
    //    Collect: 1
    //    Thread:DefaultDispatcher-worker-2, time:1666167319311
    //    ================================
    //    ================================
    //    Emit: 1
    //    Thread:main, time:1666167319312
    //    ================================
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    flow 的 emit、filter、collect 都运行在 DefaultDispatcher。

    fun flowWithContextAll() = runBlocking {
        val flow = flow {
            logX("Start")
            emit(1)
            logX("Emit: 1")
        }
    
        // 不建议
        withContext(Dispatchers.IO) {
            flow.filter {
                logX("Filter: $it")
                it > 0
            }.collect {
                logX("Collect: $it")
            }
        }
    //    ================================
    //    Start
    //    Thread:DefaultDispatcher-worker-1, time:1666167769589
    //    ================================
    //    ================================
    //    Filter: 1
    //    Thread:DefaultDispatcher-worker-1, time:1666167769645
    //    ================================
    //    ================================
    //    Collect: 1
    //    Thread:DefaultDispatcher-worker-1, time:1666167769645
    //    ================================
    //    ================================
    //    Emit: 1
    //    Thread:DefaultDispatcher-worker-1, time:1666167769645
    //    ================================
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    launchIn

    flow 提供了 launchIn 函数指定在哪个线程执行。launchIn 运行在指定的 CoroutineScope。

    flowOn 之前的运行在 Dispatchers.IO,下游运行在 launchIn 指定的 scope。

    fun flowLaunchIn() = runBlocking {
        val flow = flow {
            logX("Start")
            emit(1)
            logX("Emit: 1")
        }
    
        val scope = CoroutineScope(mySingleDispatcher)
    
        flow.flowOn(Dispatchers.IO)
            .filter {
                logX("Filter: $it")
                it > 0
            }.onEach {
                logX("Collect: $it")
            }.launchIn(scope)
    
        delay(100L)
    
    //    ================================
    //    Start
    //    Thread:DefaultDispatcher-worker-1, time:1666168824669
    //    ================================
    //    ================================
    //    Emit: 1
    //    Thread:DefaultDispatcher-worker-1, time:1666168824704
    //    ================================
    //    ================================
    //    Filter: 1
    //    Thread:mySingleThread, time:1666168824706
    //    ================================
    //    ================================
    //    Collect: 1
    //    Thread:mySingleThread, time:1666168824706
    //    ================================
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    launchIn 调用了 scope 的 launch,然后执行 collect。相当于终止操作符。

    public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
        collect() // tail-call
    }
    
    • 1
    • 2
    • 3

    flow 是冷的

    flow 是冷的,只有接收者存在的情况下才会发送数据。如果不调用 collect,emit 不会执行。相反 channel 是热的,不管有没有接收者都会发送。

    flow 的 emit 未执行。

    fun flowCold() = runBlocking {
        val flow = flow {
            (1..3).forEach {
                println("Before send $it")
                emit(it)
                println("Send $it")
            }
        }
    
        val channel = produce<Int>(capacity = 0) {
            (1..3).forEach {
                println("Before send $it")
                send(it)
                println("Send $it")
            }
        }
    
        println("end")
    //    end
    //    Before send 1
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    总结

    flow 是 kotlin 提供的解决复杂异步场景的方案。

    1. flow 由创建、中间操作符、终止操作符三个部分组成。
    2. flow 的生命周期可以分为 onStart 和 onComplete,与它们在 flow 的位置无关。
    3. flow 的异常处理使用 catch。catch 与位置相关。
    4. flow 的线程切换使用 flowOn 和 launchIn。flowOn 控制上游,launchIn 控制全局。
    5. flow 是冷的,只有存在接收者它才会开始执行。
  • 相关阅读:
    js楼层导航点击跳转偶尔不生效bug
    [netcore] ASP.NET Core 中间件
    问题:conda删除虚拟环境,报错no package names supplied
    [stm32]——uc/OS-III多任务程序
    MQ系列13:消息大量堆积如何为解决
    vueRouter个人笔记
    ESP32网络开发实例-WebSocket服务器
    C/S架构学习之使用poll实现TCP中型并发服务器
    数据结构和算法八股与手撕
    PaddleOCR训练手写文字识别模型
  • 原文地址:https://blog.csdn.net/caoshen2014/article/details/127412493