在 Kotlin 协程 Coroutine 中 , 使用 suspend 挂起函数 以异步的方式 返回单个返回值肯定可以实现 , 如果要 以异步的方式 返回多个元素的返回值 , 可以使用如下方案 :
同步调用函数时 , 如果函数耗时太长或者中途有休眠 , 则会阻塞主线程导致 ANR 异常 ;
如果要 以异步方式 返回多个返回值 , 可以在协程中调用挂起函数返回集合 , 但是该方案只能一次性返回多个返回值 , 不能持续不断的 先后 返回 多个 返回值 ;
- // 携程中调用挂起函数返回多个值
- // 调用 " 返回 List 集合的挂起函数 " , 并遍历返回值
- runBlocking {
- listFunction().forEach {
- // 遍历打印集合中的内容
- Log.e(TAG, "$it")
- }
- }
序列可以先后返回多个返回值 , 但是会阻塞线程 ;序列可以先后返回多个返回值 , 但是会阻塞线程 ;
本篇博客中开始引入 Flow 异步流的方式 , 持续性返回多个返回值 ;
调用 flow 构建器 , 可创建 Flow 异步流 , 在该异步流中, 异步地产生指定类型的元素 ;
public fun flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow = SafeFlow(block)
在 flow 异步流构建器中 , 通过调用 FlowCollector#emit 生成一个元素 ; 函数原型如下 :
- /**
- * [FlowCollector]用作流的中间或终端收集器,并表示接收[Flow]发出的值的实体。
- * 该接口通常不应该直接实现,而是在实现自定义操作符时作为[flow]构建器中的接收器使用。
- * 这个接口的实现不是线程安全的。
- */
- public interface FlowCollector<in T> {
-
- /**
- * 收集上游发出的值。
- * 此方法不是线程安全的,不应该并发调用。
- */
- public suspend fun emit(value: T)
- }
调用 Flow#collect 函数, 可以获取在异步流中产生的元素 , 并且该操作是异步操作, 不会阻塞调用线程 ;
- public interface Flow<out T> {
- /**
- * 接收给定的[collector]并[发出][FlowCollector]。向它发射]值。
- * 永远不应该实现或直接使用此方法。
- *
- * 直接实现“Flow”接口的唯一方法是扩展[AbstractFlow]。
- * 要将它收集到特定的收集器,可以使用' collector. emitall (flow) '或' collect{…}的扩展
- * 应该使用。这样的限制确保了上下文保存属性不被侵犯,并防止了大多数情况
- * 与并发性、不一致的流调度程序和取消相关的开发人员错误。
- */
- @InternalCoroutinesApi
- public suspend fun collect(collector: FlowCollector<T>)
- }
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- // 协程中调用挂起函数flowFunction()返回一个 Flow 异步流
- runBlocking{
- // 调用 Flow#collect 函数, 可以获取在Flow异步流中产生的元素值it
- val mFlow: Flow<Int> = flowFunction()
- mFlow.collect(collector = {
- Log.e(TAG," 收集Flow异步流冷流mFlow的协程上下文 : ${Thread.currentThread().name}")
- // 每隔 500ms 即可 获取 Flow异步流中的一个Int 元素
- // 并且该操作是异步操作, 不会阻塞调用线程
- Log.e(TAG, "收集Flow异步流冷流mFlow中的一个个元素it=$it")
- })
- }
- /**
- Flow 异步流冷流mFlow的构建器的上下文 : main
- 收集Flow异步流冷流mFlow的协程上下文 : main
- Flow 异步流冷流mFlow发射元素值i=0
- 收集Flow异步流冷流mFlow中的一个个元素it=0
- Flow 异步流冷流mFlow发射元素值i=1
- 收集Flow异步流冷流mFlow中的一个个元素it=1
- Flow 异步流冷流mFlow发射元素值i=2
- 收集Flow异步流冷流mFlow中的一个个元素it=2
- */
-
-
- }
-
-
- /**
- * 使用 flow 构建器 Flow 异步流
- * 在该异步流中, 异步地产生 Int 元素
- */
-
- suspend fun flowFunction(): Flow<Int>{
- val mFlow : Flow<Int> = flow<Int>(block = {
- Log.e(TAG, "输出接受者对象this=${this}")
- Log.e(TAG, "Flow 异步流冷流mFlow的构建器的上下文 : ${Thread.currentThread().name}")
- for (i in 0..2) {
- // 挂起函数 挂起 500ms
- // 在协程中, 该挂起操作不会阻塞调用线程, 会继续执行其它代码指令
- // 500ms 恢复执行, 继续执行挂起函数之后的后续代码指令
- delay(500)
- // 每隔 500ms 产生一个元素
- // 通过调用 FlowCollector#emit 生成一个元素
- Log.e(TAG,"Flow 异步流冷流mFlow发射元素值i=$i")
- this.emit(i)
- }
- })
- return mFlow
- }
-
- }
① 异步流构建方式 : Flow 异步流是通过 flow 构建器函数 创建的 ;
public fun flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow = SafeFlow(block)
② 构建器可调用挂起函数 : flow 构建器代码块中的代码 , 是可以挂起的 , 可以在其中调用 挂起函数 , 如 kotlinx.coroutines.delay
函数等 ;
- /**
- * 使用 flow 构建器 Flow 异步流
- * 在该异步流中, 异步地产生 Int 元素
- */
-
- suspend fun flowFunction(): Flow<Int>{
- val mFlow : Flow<Int> = flow<Int>(block = {
- Log.e(TAG, "输出接受者对象this=${this}")
- Log.e(TAG, "Flow 异步流冷流mFlow的构建器的上下文 : ${Thread.currentThread().name}")
- for (i in 0..2) {
- // 挂起函数 挂起 500ms
- // 在协程中, 该挂起操作不会阻塞调用线程, 会继续执行其它代码指令
- // 500ms 恢复执行, 继续执行挂起函数之后的后续代码指令
- delay(500)
- // 每隔 500ms 产生一个元素
- // 通过调用 FlowCollector#emit 生成一个元素
- Log.e(TAG,"Flow 异步流冷流mFlow发射元素值i=$i")
- this.emit(i)
- }
- })
- return mFlow
- }
③ suspend 关键字可省略 : 返回值为 Flow 异步流的函数 , 其默认就是 suspend 挂起函数 , suspend 关键字可以省略 , 上述函数中不标注 suspend 也可 ;
- /**
- * 使用 flow 构建器 Flow 异步流
- * 在该异步流中, 异步地产生 Int 元素
- */
-
- // suspend fun flowFunction(): Flow
{ - fun flowFunction(): Flow<Int>{
- val mFlow : Flow<Int> = flow<Int>(block = {
- Log.e(TAG, "输出接受者对象this=${this}")
- Log.e(TAG, "Flow 异步流冷流mFlow的构建器的上下文 : ${Thread.currentThread().name}")
- for (i in 0..2) {
- // 挂起函数 挂起 500ms
- // 在协程中, 该挂起操作不会阻塞调用线程, 会继续执行其它代码指令
- // 500ms 恢复执行, 继续执行挂起函数之后的后续代码指令
- delay(500)
- // 每隔 500ms 产生一个元素
- // 通过调用 FlowCollector#emit 生成一个元素
- Log.e(TAG,"Flow 异步流冷流mFlow发射元素值i=$i")
- this.emit(i)
- }
- })
- return mFlow
- }
-
④ 生成元素 : 在 Flow 异步流中 , 通过调用 FlowCollector#emit
函数生成元素 ;
⑤ 收集元素 : 在 Flow 异步流中 , 通过调用 Flow#collect
函数可以收集 在 Flow 异步流中生成的元素 ;
Android 中主线程不可执行网络相关操作 , 因此只能在 子线程 中下载文件 ,可以在协程中使用 Dispatcher.IO 调度器在子线程下载文件 ,下载文件时需要实时显示下载百分比进度 ,这个进度需要上报给主线程 , 在主线程中更新 UI 显示下载进度 ,在 Flow 异步流中 , 可以 使FlowCollector#emit 向主线程中发送进度值 。在主线程中 , 可以 使用 Flow#collect 函数 收集 Flow 异步流中发射出来的数据 , 如 : 进度 , 捕获的异常 , 下载状态等 ;
完整流程 , 如下图所示 :
Flow 异步流 的 构建器函数 flow 函数 中的 代码 ,在 调用 Flow#collect 函数 时 , 也就是在 Flow 异步流 收集元素时 ,才会 执行 flow 构建器 中的代码 ;这种机制的异步流 称为 冷流 ;
在 flow 构建器的开始位置 , 发射元素 , 在主线程中 Flow#collect 收集元素位置 , 添加日志信息 , 查看日志打印的时机 ;
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- // 协程中调用挂起函数flowFunction()返回一个 Flow 异步流
- runBlocking{
- // 调用 Flow#collect 函数, 可以获取在Flow异步流中产生的元素值it
- val mFlow: Flow<Int> = flowFunction()
- mFlow.collect(collector = {
- Log.e(TAG," 收集Flow异步流冷流mFlow的协程上下文 : ${Thread.currentThread().name}")
- // 每隔 500ms 即可 获取 Flow异步流中的一个Int 元素
- // 并且该操作是异步操作, 不会阻塞调用线程
- Log.e(TAG, "收集Flow异步流冷流mFlow中的一个个元素it=$it")
- })
- }
- /**
- Flow 异步流冷流mFlow的构建器的上下文 : main
- 收集Flow异步流冷流mFlow的协程上下文 : main
- Flow 异步流冷流mFlow发射元素值i=0
- 收集Flow异步流冷流mFlow中的一个个元素it=0
- Flow 异步流冷流mFlow发射元素值i=1
- 收集Flow异步流冷流mFlow中的一个个元素it=1
- Flow 异步流冷流mFlow发射元素值i=2
- 收集Flow异步流冷流mFlow中的一个个元素it=2
- */
-
-
- }
-
-
- /**
- * 使用 flow 构建器 Flow 异步流
- * 在该异步流中, 异步地产生 Int 元素
- */
-
- suspend fun flowFunction(): Flow<Int>{
- val mFlow : Flow<Int> = flow<Int>(block = {
- Log.e(TAG, "输出接受者对象this=${this}")
- Log.e(TAG, "Flow 异步流冷流mFlow的构建器的上下文 : ${Thread.currentThread().name}")
- for (i in 0..2) {
- // 挂起函数 挂起 500ms
- // 在协程中, 该挂起操作不会阻塞调用线程, 会继续执行其它代码指令
- // 500ms 恢复执行, 继续执行挂起函数之后的后续代码指令
- delay(500)
- // 每隔 500ms 产生一个元素
- // 通过调用 FlowCollector#emit 生成一个元素
- Log.e(TAG,"Flow 异步流冷流mFlow发射元素值i=$i")
- this.emit(i)
- }
- })
- return mFlow
- }
-
- }
Flow 流 的 每次调用 Flow#collect 收集元素的操作 , 都是 按照 固定顺序 执行的 , 使用 特殊操作符 可以改变该顺序 ;
Flow 异步流 中的元素 , 按照顺序进行 FlowCollector#emit 发射元素操作 , 则 调用 Flow#collect 收集元素时获取的元素 也是按照顺序获取的 ;
在流的 上游发射元素 到 下游收集元素 的过程中 , 会 使用 过渡操作符 处理每个 FlowCollector#emit 发射出的元素 , 最终才交给 最末端的 操作符 ;
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- // 协程中调用挂起函数返回一个 Flow 异步流
-
- runBlocking{
- // 使用下面的方式asFlow()可以快速构建一个 Flow 流
- //上游发射元素
- val mFlow : Flow<Int> = (0..5).asFlow()
- val filterFlow: Flow<Int> = mFlow.filter (predicate={
- if( it % 2 == 1){
- Log.e(TAG,"Flow 异步流冷流filterFlow发射过滤后符合条件的元素值it=${it}")
- }
- // 奇数才能继续向下流,偶数被过滤掉了
- it % 2 == 1 //Lambda最后一行最为返回值
- })
- filterFlow.collect(collector = {
- Log.e(TAG, "下游收集Flow异步流冷流filterFlow过滤后满足条件的一个个元素it=$it")
- })
- //将Flow异步流冷流filterFlow的元素值进行转换后,返回一个新的Flow流 ,然后对流的元素进行发射
- val mapFlow : Flow
= filterFlow.map<Int,String>(transform={ - // 遍历元素, 将其拼接成字符串
- val str = "学号 : $it"
- Log.e(TAG,"Flow 异步流冷流mapFlow发射转换后的元素值str=$str")
- str //Lambda最后一行最为返回值
- })
- mapFlow.collect(collector = {
- Log.e(TAG, "下游收集Flow异步流冷流mapFlow的一个个元素it=$it")
- })
-
- }
-
-
- /**
- * Flow 异步流冷流filterFlow发射过滤后符合条件的元素值it=1
- *下游收集Flow异步流冷流filterFlow过滤后满足条件的一个个元素it=1
- * Flow 异步流冷流filterFlow发射过滤后符合条件的元素值it=3
- * 下游收集Flow异步流冷流filterFlow过滤后满足条件的一个个元素it=3
- * Flow 异步流冷流filterFlow发射过滤后符合条件的元素值it=5
- * 下游收集Flow异步流冷流filterFlow过滤后满足条件的一个个元素it=5
- *
- * Flow 异步流冷流mapFlow发射转换后的元素值str=学号 : 1
- * 下游收集Flow异步流冷流mapFlow的一个个元素it=学号 : 1
- * Flow 异步流冷流mapFlow发射转换后的元素值str=学号 : 3
- * 下游收集Flow异步流冷流mapFlow的一个个元素it=学号 : 3
- * Flow 异步流冷流mapFlow发射转换后的元素值str=学号 : 5
- * 下游收集Flow异步流冷流mapFlow的一个个元素it=学号 : 5
- *
- * */
-
- }
-
- }
在 flow 流构建器中 , 调用 FlowCollector#emit 函数 发射元素 , 然后在外部 调用 Flow#collect 函数 收集元素 ;
public fun flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow = SafeFlow(block)
- public fun
flowOf(vararg elements: T): Flow = flow { - for (element in elements) {
- emit(element)
- }
- }
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
- runBlocking {
- // 协程中调用挂起函数返回一个 Flow 异步流
- val mFlowOf: Flow<Int> = flowOf(0, 1, 2, 3)
- val mOnEachFlow: Flow<Int> = mFlowOf.onEach {
- // 每次发射元素时调用的代码块
- delay(1000)
- Log.e(TAG,"Flow 异步流冷流mOnEachFlow发射元素值it=$it")
- }
-
- mOnEachFlow.collect(collector = {
- // 每隔 1 秒接收一个元素
- Log.e(TAG, "下游收集Flow异步流冷流mOnEachFlow的一个个元素it=$it")
- })
- }
-
- /**
- * Flow 异步流冷流mOnEachFlow发射元素值it=0
- *下游收集Flow异步流冷流mOnEachFlow的一个个元素it=0
- *
- * Flow 异步流冷流mOnEachFlow发射元素值it=1
- * 下游收集Flow异步流冷流mOnEachFlow的一个个元素it=1
- *
- * Flow 异步流冷流mOnEachFlow发射元素值it=2
- * 下游收集Flow异步流冷流mOnEachFlow的一个个元素it=2
- *
- * Flow 异步流冷流mOnEachFlow发射元素值it=3
- * 下游收集Flow异步流冷流mOnEachFlow的一个个元素it=3
- * */
-
- }
-
- }
- public fun IntRange.asFlow(): Flow<Int> = flow {
- forEach { value ->
- emit(value)
- }}
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- // 协程中调用挂起函数返回一个 Flow 异步流
-
- runBlocking{
- // 使用下面的方式asFlow()可以快速构建一个 Flow 流
- //上游发射元素
- val mFlow : Flow<Int> = (0..5).asFlow()
- val filterFlow: Flow<Int> = mFlow.filter (predicate={
- if( it % 2 == 1){
- Log.e(TAG,"Flow 异步流冷流filterFlow发射过滤后符合条件的元素值it=${it}")
- }
- // 奇数才能继续向下流,偶数被过滤掉了
- it % 2 == 1 //Lambda最后一行最为返回值
- })
- filterFlow.collect(collector = {
- Log.e(TAG, "下游收集Flow异步流冷流filterFlow过滤后满足条件的一个个元素it=$it")
- })
- //将Flow异步流冷流filterFlow的元素值进行转换后,返回一个新的Flow流 ,然后对流的元素进行发射
- val mapFlow : Flow
= filterFlow.map<Int,String>(transform={ - // 遍历元素, 将其拼接成字符串
- val str = "学号 : $it"
- Log.e(TAG,"Flow 异步流冷流mapFlow发射转换后的元素值str=$str")
- str //Lambda最后一行最为返回值
- })
- mapFlow.collect(collector = {
- Log.e(TAG, "下游收集Flow异步流冷流mapFlow的一个个元素it=$it")
- })
-
- }
-
-
- /**
- * Flow 异步流冷流filterFlow发射过滤后符合条件的元素值it=1
- *下游收集Flow异步流冷流filterFlow过滤后满足条件的一个个元素it=1
- * Flow 异步流冷流filterFlow发射过滤后符合条件的元素值it=3
- * 下游收集Flow异步流冷流filterFlow过滤后满足条件的一个个元素it=3
- * Flow 异步流冷流filterFlow发射过滤后符合条件的元素值it=5
- * 下游收集Flow异步流冷流filterFlow过滤后满足条件的一个个元素it=5
- *
- * Flow 异步流冷流mapFlow发射转换后的元素值str=学号 : 1
- * 下游收集Flow异步流冷流mapFlow的一个个元素it=学号 : 1
- * Flow 异步流冷流mapFlow发射转换后的元素值str=学号 : 3
- * 下游收集Flow异步流冷流mapFlow的一个个元素it=学号 : 3
- * Flow 异步流冷流mapFlow发射转换后的元素值str=学号 : 5
- * 下游收集Flow异步流冷流mapFlow的一个个元素it=学号 : 5
- *
- * */
-
- }
-
- }
Flow 异步流 收集元素 的操作 , 一般是在 协程上下文 中进行的 , 如 : 在协程中调用 Flow#collect 函数 , 收集元素 ;
收集元素 时 的 协程上下文 , 会 传递给 发射元素 的 流构建器 , 作为 流构建器的 上下文 ;Flow 异步流 在 收集元素 时 , 才调用 流构建器 中的代码 , 收集元素操作在协程中执行 , 流构建器 也同样在相同的协程中运行 ;
流收集元素 和 发射元素 在相同的协程上下文中 的 属性 , 称为 上下文保存 ;
Flow#collect 函数原型如下 : Flow#collect 函数 由 suspend 关键字修饰 , 该函数是 suspend 挂起函数 , 因此 该函数必须在 协程中调用 ;
- public suspend inline fun
Flow .collect(crossinline action: suspend (value: T) -> Unit): Unit = - collect(object : FlowCollector
{ - override suspend fun emit(value: T) = action(value)
- })
Flow 异步流的 构建器 函数 : 流构建器 不是 suspend 挂起函数 , 可以在普通的线程中运行 , 不必在协程中运行 ;
使用 flow()函数 构建 Flow异步流冷流
public fun flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow = SafeFlow(block)
使用flowOf()函数 构建 Flow异步流冷流
- public fun
flowOf(vararg elements: T): Flow = flow { - for (element in elements) {
- emit(element)
- }
- }
使用asFlow()函数 构建 Flow异步流冷流
- @FlowPreview
- public fun
(() -> T).asFlow(): Flow = flow { - emit(invoke())
- }
代码示例 : 在 流收集 时 和 流构建时 , 分别打印线程名称 , 查看是在哪个线程中执行的 ;
执行结果 : 最终执行时 , 流构建器和流收集 都是在 主线程中执行的 , 这是 由 runBlocking 协程构建器 将 主线程 包装后的 协程 ;
-
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- // 协程中调用挂起函数flowFunction()返回一个 Flow 异步流
- runBlocking{
- // 调用 Flow#collect 函数, 可以获取在Flow异步流中产生的元素值it
- val mFlow: Flow<Int> = flowFunction()
- mFlow.collect(collector = {
- Log.e(TAG," 收集Flow异步流冷流mFlow的协程上下文 : ${Thread.currentThread().name}")
- // 每隔 500ms 即可 获取 Flow异步流中的一个Int 元素
- // 并且该操作是异步操作, 不会阻塞调用线程
- Log.e(TAG, "收集Flow异步流冷流mFlow中的一个个元素it=$it")
- })
- }
- /**
- Flow 异步流冷流mFlow的构建器的上下文 : main
- 收集Flow异步流冷流mFlow的协程上下文 : main
- Flow 异步流冷流mFlow发射元素值i=0
- 收集Flow异步流冷流mFlow中的一个个元素it=0
- Flow 异步流冷流mFlow发射元素值i=1
- 收集Flow异步流冷流mFlow中的一个个元素it=1
- Flow 异步流冷流mFlow发射元素值i=2
- 收集Flow异步流冷流mFlow中的一个个元素it=2
- */
-
- }
-
-
-
- /**
- * 使用 flow 构建器 Flow 异步流
- * 在该异步流中, 异步地产生 Int 元素
- */
-
- suspend fun flowFunction(): Flow<Int>{
- val mFlow : Flow<Int> = flow<Int>(block = {
- Log.e(TAG, "输出接受者对象this=${this}")
- Log.e(TAG, "Flow 异步流冷流mFlow的构建器的上下文 : ${Thread.currentThread().name}")
- for (i in 0..2) {
- // 挂起函数 挂起 500ms
- // 在协程中, 该挂起操作不会阻塞调用线程, 会继续执行其它代码指令
- // 500ms 恢复执行, 继续执行挂起函数之后的后续代码指令
- delay(500)
- // 每隔 500ms 产生一个元素
- // 通过调用 FlowCollector#emit 生成一个元素
- Log.e(TAG,"Flow 异步流冷流mFlow发射元素值i=$i")
- this.emit(i)
- }
- })
- return mFlow
- }
-
-
- }
在上述 流的收集 和 流的发射 都 必须在同一个协程中执行 , 这样并不是我们想要的 ;如 : 下载时 , 想要在后台线程中下载 , 在主线程中更新 UI , 那么对应 Flow 异步流应该是在 后台线程中 发射元素 , 在主线程中 收集元素 ;
使用 flowOn 操作符 , 可以修改 流发射 的协程上下文 , 不必必须在 流收集 的协程上下文中执行 流发射操作 ;
Flow#flowOn 函数原型如下 :
- /**
- * 将此流执行的上下文更改为给定的[context]。
- * 此操作符是可组合的,仅影响前面没有自己上下文的操作符。
- * 这个操作符是上下文保护的:[context] **不会**泄漏到下游流中。
- *
- * 例如:
- *
- * ```
- * withContext(Dispatchers.Main) {
- * val singleValue = intFlow // will be executed on IO if context wasn't specified before
- * .map { ... } // Will be executed in IO
- * .flowOn(Dispatchers.IO)
- * .filter { ... } // Will be executed in Default
- * .flowOn(Dispatchers.Default)
- * .single() // Will be executed in the Main
- * }
- * ```
- *
- * 有关上下文保存的更多说明,请参考[Flow]文档。
- *
- * 如果更改上下文不需要更改,则此操作符保留流的_sequential_性质
- * (调度)[CoroutineDispatcher]。否则,如果需要更改dispatcher,它将进行收集
- * 使用指定[上下文]运行的协同例程中的流发射,并从另一个协同例程中发射它们
- * 使用带有[default][channel]的通道与原始收集器的上下文连接。BUFFERED]缓冲区大小
- * 在两个协程之间,类似于[buffer]操作符,除非显式调用[buffer]操作符
- * 在' flowOn '之前或之后,请求缓冲行为并指定通道大小。
- *
- * 注意,跨不同调度程序操作的流在取消时可能会丢失一些正在运行的元素。
- * 特别是,该操作符确保下游流不会在取消时恢复,即使元素
- * 已经被上游的气流释放出来了。
- *
- * ###算子融合
- *
- * 相邻的[channelFlow]、[flowOn]、[buffer]和[produceIn]的应用是
- * 始终融合,以便只有一个正确配置的通道用于执行。
- *
- * 多个“flowOn”操作符融合到一个具有组合上下文的单一“flowOn”。上下文的要素
- * 第一个' flowOn '操作符自然优先于第二个' flowOn '操作符的元素
- * 当它们具有相同的上下文键时,例如:
- *
- * ```
- * flow.map { ... } // Will be executed in IO
- * .flowOn(Dispatchers.IO) // This one takes precedence
- * .flowOn(Dispatchers.Default)
- * ```
- *
- * 请注意,[SharedFlow]的实例本身没有执行上下文,
- * 所以应用' flowOn '到' SharedFlow '没有效果。参见[SharedFlow]关于Operator Fusion的文档。
- *
- * @throws [IllegalArgumentException] 如果所提供的上下文包含[Job]实例。
- */
- public fun
Flow .flowOn(context: CoroutineContext): Flow { - checkFlowContext(context)
- return when {
- context == EmptyCoroutineContext -> this
- this is FusibleFlow -> fuse(context = context)
- else -> ChannelFlowOperatorImpl(this, context = context)
- }
- }
流发射 在子线程中执行 , 流收集 在 主线程中执行 ;
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- runBlocking{
- // 调用 Flow#collect 函数, 可以获取在Flow异步流中产生的元素值it
- val mFlow: Flow<Int> = flowFunction2()
- mFlow.collect(collector = {
- // 每隔 500ms 即可 获取 Flow异步流中的一个Int 元素
- // 并且该操作是异步操作, 不会阻塞调用线程
- Log.e(TAG, "收集Flow异步流冷流mFlow中的一个个元素it=$it -------- mFlow的协程所在的上下文 : ${Thread.currentThread().name}")
- })
- }
-
- /**
- Flow 异步流冷流mFlow发射元素值i=0 -------- mFlow构建器所在的上下文 : DefaultDispatcher-worker-3
- 收集Flow异步流冷流mFlow中的一个个元素it=0 -------- mFlow的协程所在的上下文 : main
- Flow 异步流冷流mFlow发射元素值i=1 -------- mFlow构建器所在的上下文 : DefaultDispatcher-worker-1
- 收集Flow异步流冷流mFlow中的一个个元素it=1 -------- mFlow的协程所在的上下文 : main
- Flow 异步流冷流mFlow发射元素值i=2 -------- mFlow构建器所在的上下文 : DefaultDispatcher-worker-3
- 收集Flow异步流冷流mFlow中的一个个元素it=2 -------- mFlow的协程所在的上下文 : main
- * */
-
- }
-
- /**
- * 使用 flow 构建器 Flow 异步流
- * 在该异步流中, 异步地产生 Int 元素
- */
- suspend fun flowFunction2(): Flow<Int>{
- val mFlow : Flow<Int> = flow<Int>(block = {
- Log.e(TAG, "输出接受者对象this=${this}")
-
- for (i in 0..2) {
- // 挂起函数 挂起 500ms
- // 在协程中, 该挂起操作不会阻塞调用线程, 会继续执行其它代码指令
- // 500ms 恢复执行, 继续执行挂起函数之后的后续代码指令
- delay(500)
- // 每隔 500ms 产生一个元素
- // 通过调用 FlowCollector#emit 生成一个元素
- Log.e(TAG,"Flow 异步流冷流mFlow发射元素值i=$i -------- mFlow构建器所在的上下文 : ${Thread.currentThread().name}")
- this.emit(i)
- }
- }).flowOn(context=Dispatchers.IO)
- return mFlow
- }
-
-
- }
响应式编程 , 是 基于事件驱动 的 , 在 Flow 流中会产生源源不断的事件 , 就是 发射元素操作 ;拿到 Flow 流后 , 开始 收集元素 , 按照顺序逐个处理产生的事件 ( 元素 ) ;
调用 Flow#launchIn()函数 , 传入 协程作用域 作为参数 , 可以 指定 收集 Flow 流元素 的 协程 ;
我们知道调用 Flow#flowOn() 函数 , 可以 指定 Flow 流发射元素 的 协程 ;
Flow#launchIn() 函数返回值是 Job 对象 , 是 协程任务对象 , 可调用 Job#cancel 函数取消该协程任务 ;
2、Flow#launchIn() 函数原型
- /**
- * 终端流操作符,在[作用域]中[启动][启动]给定流的[收集][收集]。
- * 它是“范围”(scope)的简称。启动{flow.collect()} '。
- *
- * 此操作符通常与[onEach], [onCompletion]和[catch]操作符一起使用,以处理所有发出的值
- * 处理上游流或处理过程中可能发生的异常,例如:
- *
- * ```
- * flow
- * .onEach { value -> updateUi(value) }
- * .onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
- * .catch { cause -> LOG.error("Exception: $cause") }
- * .launchIn(uiScope)
- * ```
- *
- * 注意,[launchIn]的结果值没有被使用,提供的作用域负责取消。
- */
- public fun
Flow .launchIn(scope: CoroutineScope): Job = scope.launch { - collect() // tail-call
- }
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- runBlocking {
- val mFlowEvent: Flow<Int> = flowEvent()
- val mOnEachFlow:Flow<Int> = mFlowEvent.onEach (action={
- // 逐个处理产生的事件
- Log.e(TAG, "收集Flow异步流冷流mFlowEvent中的一个个元素it=$it -------- mFlow的协程所在的上下文 : ${Thread.currentThread().name}")
- })
- // 该 launchIn 函数返回一个 Job 对象
- val job: Job = mOnEachFlow.launchIn(CoroutineScope(Dispatchers.IO)) // 在指定的协程作用域中处理收集元素操作,
- job.join() // 该协程不是 runBlocking 主协程 的子协程, 需要调用 join 等待协程执行完毕
- }
-
- /**
- * Flow 异步流冷流mAsFlow发射元素值it=0 -------- mAsFlow构建器所在的上下文 : DefaultDispatcher-worker-1
- * 收集Flow异步流冷流mFlowEvent中的一个个元素it=0 -------- mFlow的协程所在的上下文 : DefaultDispatcher-worker-3
- *
- * Flow 异步流冷流mAsFlow发射元素值it=1 -------- mAsFlow构建器所在的上下文 : DefaultDispatcher-worker-3
- * 收集Flow异步流冷流mFlowEvent中的一个个元素it=1 -------- mFlow的协程所在的上下文 : DefaultDispatcher-worker-1
- *
- * Flow 异步流冷流mAsFlow发射元素值it=2 -------- mAsFlow构建器所在的上下文 : DefaultDispatcher-worker-1
- * 收集Flow异步流冷流mFlowEvent中的一个个元素it=2 -------- mFlow的协程所在的上下文 : DefaultDispatcher-worker-2
- *
- * Flow 异步流冷流mAsFlow发射元素值it=3 -------- mAsFlow构建器所在的上下文 : DefaultDispatcher-worker-2
- *收集Flow异步流冷流mFlowEvent中的一个个元素it=3 -------- mFlow的协程所在的上下文 : DefaultDispatcher-worker-1
- * */
-
-
- }
-
-
- /**
- * 使用 flow 构建器 Flow 异步流
- * 产生事件的 事件源
- */
-
- suspend fun flowEvent(): Flow<Int>{
- // 将区间转为 Flow 流
- val mAsFlow: Flow<Int> = (0..3).asFlow()
- val mOnEachFlow:Flow<Int> = mAsFlow.onEach(action={
- // 发射元素 ( 产生事件 ) 时挂起 500ms
- delay(500)
- Log.e(TAG,"Flow 异步流冷流mAsFlow发射元素值it=$it -------- mAsFlow构建器所在的上下文 : ${Thread.currentThread().name}")
- })
-
- val mFlowOn:Flow<Int> = mOnEachFlow.flowOn(context=Dispatchers.Default) // 设置发射元素的协程
- return mFlowOn
- }
-
- }
Flow 流的 收集元素 操作 , 是在协程中执行 , 将 协程 取消 , 即可将 Flow 流收集操作 取消 , 也就是 将 Flow 流取消 ;
在 Flow#collect 代码块中 , 执行 Job#cancel 函数 , 即可取消该流收集操作所在的协程 , 进而取消了流 ;
withTimeoutOrNull(2000)
创建一个协程 , 该协程在 3000ms 后自动超时取消 , 同时在其中进行 流收集 的操作也一并取消 ;- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- runBlocking {
-
- // 该协程作用域 2 秒后超时取消
- withTimeoutOrNull(timeMillis=3000,block={
- // 协程中调用挂起函数返回一个 Flow 异步流
- val mFlowEvent: Flow<Int> = flowEvent()
- mFlowEvent.collect(collector={
- Log.e(TAG, "收集Flow异步流冷流mFlowEvent中的一个个元素it=$it -------- mFlowEvent的协程所在的上下文 : ${Thread.currentThread().name}")
- })
- })
- Log.e(TAG,"协程作用域取消")
-
- }
-
- /**Flow 异步流冷流mAsFlow发射元素值it=0 -------- mAsFlow构建器所在的上下文 : DefaultDispatcher-worker-1
- * 收集Flow异步流冷流mFlowEvent中的一个个元素it=0 -------- mFlowEvent的协程所在的上下文 : main
- *
- * Flow 异步流冷流mAsFlow发射元素值it=1 -------- mAsFlow构建器所在的上下文 : DefaultDispatcher-worker-1
- * 收集Flow异步流冷流mFlowEvent中的一个个元素it=1 -------- mFlowEvent的协程所在的上下文 : main
- *
- * Flow 异步流冷流mAsFlow发射元素值it=2 -------- mAsFlow构建器所在的上下文 : DefaultDispatcher-worker-1
- * 收集Flow异步流冷流mFlowEvent中的一个个元素it=2 -------- mFlowEvent的协程所在的上下文 : main
- *
- * Flow 异步流冷流mAsFlow发射元素值it=3 -------- mAsFlow构建器所在的上下文 : DefaultDispatcher-worker-1
- * 收集Flow异步流冷流mFlowEvent中的一个个元素it=3 -------- mFlowEvent的协程所在的上下文 : main
- *
- * Flow 异步流冷流mAsFlow发射元素值it=4 -------- mAsFlow构建器所在的上下文 : DefaultDispatcher-worker-1
- * 收集Flow异步流冷流mFlowEvent中的一个个元素it=4 -------- mFlowEvent的协程所在的上下文 : main
- *
- * 协程作用域取消
- * */
-
- }
-
- /**
- * 使用 flow 构建器 Flow 异步流
- * 产生事件的 事件源
- */
-
- private suspend fun flowEvent(): Flow<Int>{
- // 将区间转为 Flow 流
- val mAsFlow: Flow<Int> = (0..10).asFlow()
- val mOnEachFlow:Flow<Int> = mAsFlow.onEach(action={
- // 发射元素 ( 产生事件 ) 时挂起 500ms
- delay(500)
- Log.e(TAG,"Flow 异步流冷流mAsFlow发射元素值it=$it -------- mAsFlow构建器所在的上下文 : ${Thread.currentThread().name}")
- })
-
- val mFlowOn:Flow<Int> = mOnEachFlow.flowOn(context=Dispatchers.Default) // 设置发射元素的协程
- return mFlowOn
- }
-
-
- }
在 Flow 流构建器 中 , 每次 调用 FlowCollector#emit 发射元素时 ,都会执行一个 ensureActive 检测 , 检测当前的流是否取消 ,因此 , 在 flow 流构建器 中 , 循环执行的 FlowCollector#emit 发射操作 , 是可以取消的 ;
在 Flow#collect 代码块中 , 执行 Job#cancel 函数 , 即可取消该流收集操作所在的协程 , 进而取消了流 ;
- /**
- * 用一个可选的cancel [cause]取消这个作用域,包括它的作业和它的所有子任务。
- * 原因可用于指定错误消息或提供有关的其他详细信息
- * 用于调试目的的取消原因。
- * 如果作用域中没有作业,则抛出[IllegalStateException]。
- */
- public fun CoroutineScope.cancel(cause: CancellationException? = null) {
- val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
- job.cancel(cause)
- }
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- runBlocking {
- val mFlowEvent2 : Flow<Int> = flowEvent2()
- mFlowEvent2.collect(collector={
- Log.e(TAG, "收集Flow异步流冷流mFlowEvent2中的一个个元素it=$it -------- mFlowEvent2的协程所在的上下文 : ${Thread.currentThread().name}")
- if(it==3){
- // 收集到元素 3 时, 取消流
- // 在流中 emit 发射 3 时, 就会自动爆出异常, 停止后续操作
- cancel()
- }
- })
- }
-
- /**
- *收集Flow异步流冷流mFlowEvent2中的一个个元素it=1 -------- mFlowEvent2的协程所在的上下文 : main
- *Flow 异步流冷流mFlow发射元素值i=1 --------mFlow构建器所在的上下文 : main
- *
- * 收集Flow异步流冷流mFlowEvent2中的一个个元素it=2 -------- mFlowEvent2的协程所在的上下文 : main
- * Flow 异步流冷流mFlow发射元素值i=2 --------mFlow构建器所在的上下文 : main
- *
- * 收集Flow异步流冷流mFlowEvent2中的一个个元素it=3 -------- mFlowEvent2的协程所在的上下文 : main
- * Flow 异步流冷流mFlow发射元素值i=3 --------mFlow构建器所在的上下文 : main
- */
-
- }
-
- /**
- * 使用 flow 构建器 Flow 异步流
- */
- private suspend fun flowEvent2(): Flow<Int>{
- val mFlow:Flow<Int> = flow<Int>(block = {
- for(i in 1..6) {
- delay(1000)
- emit(i)
- Log.e(TAG,"Flow 异步流冷流mFlow发射元素值i=$i --------mFlow构建器所在的上下文 : ${Thread.currentThread().name}")
- }
- })
- return mFlow
- }
-
-
-
- }
在 Flow 流中 , 除 FlowCollector#emit 发射元素 之外 ,还有很多其它的 流操作 , 这些操作不会 自动执行 ensureActive 检测 ,因此这里需要我们 手动 进行 流取消检测 ;调用 Flow#cancellable() 函数 , 可以手动设置流取消检测 ;
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- runBlocking {
-
- (0..5).asFlow().cancellable().collect {
- Log.e(TAG, "收集Flow异步流冷流中的一个个元素it=$it --------流的协程所在的上下文 : ${Thread.currentThread().name}")
-
- // 收集到元素 2 时, 协程退出
- if (it == 2) {
- cancel()
- }
- }
- Log.e(TAG,"协程作用域取消")
- }
-
- /**
- * 收集Flow异步流冷流中的一个个元素it=0 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=1 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=2 --------流的协程所在的上下文 : main
- *
- * */
- }
-
- }
" 背压 " 概念 指的是 数据 受到 与 流动方向 一致的压力 ,
数据 生产者 的 生产效率 大于 数据 消费者 的 消费效率 , 就会产生 背压 ;
处理背压问题 , 有 2 种方案 :
以 100 ms间隔发射元素 , 以 200 ms 间隔收集元素 , 发射元素的效率 高于 收集元素的效率, 此时会产生背压 ;
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- runBlocking {
- val delta = measureTimeMillis {
- // 以 200 ms 的间隔收集元素
- // 发射元素的效率 高于 收集元素的效率, 此时会产生背压
- flowEmit().collect{
- delay(200)
- Log.e(TAG, "收集Flow异步流冷流中的一个个元素it=$it --------流的协程所在的上下文 : ${Thread.currentThread().name}")
- }
- }
-
- Log.e(TAG,"Flow异步流冷流收集元素总共耗时 $delta ms")
- }
-
- /**
- *收集Flow异步流冷流中的一个个元素it=0 --------流的协程所在的上下文 : main
- *Flow 异步流冷流mFlow发射元素值i=0 --------mFlow构建器所在的上下文 : main
- 收集Flow异步流冷流中的一个个元素it=1 --------流的协程所在的上下文 : main
- Flow 异步流冷流mFlow发射元素值i=1 --------mFlow构建器所在的上下文 : main
- 收集Flow异步流冷流中的一个个元素it=2 --------流的协程所在的上下文 : main
- Flow 异步流冷流mFlow发射元素值i=2 --------mFlow构建器所在的上下文 : main
- 收集Flow异步流冷流中的一个个元素it=3 --------流的协程所在的上下文 : main
- Flow 异步流冷流mFlow发射元素值i=3 --------mFlow构建器所在的上下文 :
- main收集Flow异步流冷流中的一个个元素it=4 --------流的协程所在的上下文 : main
- Flow 异步流冷流mFlow发射元素值i=4--------mFlow构建器所在的上下文 : main
- main收集Flow异步流冷流中的一个个元素it=5 --------流的协程所在的上下文 : main
- Flow 异步流冷流mFlow发射元素值i=5--------mFlow构建器所在的上下文 : main
- Flow异步流冷流收集元素总共耗时 1815 ms
- * */
-
- }
-
- /**
- * 使用 flow 构建器 Flow 异步流
- */
- suspend fun flowEmit(): Flow<Int> {
- // 以 100 ms 的间隔发射元素
-
- val mFlow:Flow<Int> = flow<Int>(block ={
- for (i in 0..5) {
- delay(100)
- emit(i)
- Log.e(TAG,"Flow 异步流冷流mFlow发射元素值i=$i --------mFlow构建器所在的上下文 : ${Thread.currentThread().name}")
- }
- })
-
- return mFlow
- }
-
-
-
- }
调用 Flow#buffer 函数 , 为 收集元素 添加一个缓冲 , 可以指定缓冲区个数 ;
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- runBlocking {
- val delta = measureTimeMillis {
- // 以 200 ms 的间隔收集元素
- // 发射元素的效率 高于 收集元素的效率, 此时会产生背压
- flowEmit().buffer(10).collect{
- delay(200)
- Log.e(TAG, "收集Flow异步流冷流中的一个个元素it=$it --------流的协程所在的上下文 : ${Thread.currentThread().name}")
- }
- }
-
- Log.e(TAG,"Flow异步流冷流收集元素总共耗时 $delta ms")
- }
-
- /**
- * Flow 异步流冷流mFlow发射元素值i=0 --------mFlow构建器所在的上下文 : main
- *Flow 异步流冷流mFlow发射元素值i=1 --------mFlow构建器所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=0 --------流的协程所在的上下文 : main
- *
- * Flow 异步流冷流mFlow发射元素值i=2 --------mFlow构建器所在的上下文 : main
- * Flow 异步流冷流mFlow发射元素值i=3 --------mFlow构建器所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=1 --------流的协程所在的上下文 : main
- *
- * Flow 异步流冷流mFlow发射元素值i=4 --------mFlow构建器所在的上下文 : main
- * Flow 异步流冷流mFlow发射元素值i=5 --------mFlow构建器所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=2 --------流的协程所在的上下文 : main
- *
- * 收集Flow异步流冷流中的一个个元素it=3 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=4 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=5 --------流的协程所在的上下文 : main
- * Flow异步流冷流收集元素总共耗时 1320 ms
- *
- * */
-
-
- }
-
- /**
- * 使用 flow 构建器 Flow 异步流
- */
- suspend fun flowEmit(): Flow<Int> {
- // 以 100 ms 的间隔发射元素
-
- val mFlow:Flow<Int> = flow<Int>(block ={
- for (i in 0..5) {
- delay(100)
- emit(i)
- Log.e(TAG,"Flow 异步流冷流mFlow发射元素值i=$i --------mFlow构建器所在的上下文 : ${Thread.currentThread().name}")
- }
- })
-
- return mFlow
- }
-
-
-
- }
上述 发射元素 和 收集元素 都是在同一个线程中执行的 , 这两个操作可以并行执行 , 即使用 flowOn 指定收集元素的线程 ;使用 flowOn 更改了协程上下文 , 使得 发射元素 与 收集元素 在不同的线程中并行执行 ;
-
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- runBlocking {
- val delta = measureTimeMillis {
- // 以 200 ms 的间隔收集元素
- // 发射元素的效率 高于 收集元素的效率, 此时会产生背压
- flowEmit().flowOn(Dispatchers.Default).collect{
- delay(200)
- Log.e(TAG, "收集Flow异步流冷流中的一个个元素it=$it --------流的协程所在的上下文 : ${Thread.currentThread().name}")
- }
- }
-
- Log.e(TAG,"Flow异步流冷流收集元素总共耗时 $delta ms")
- }
-
- /**Flow 异步流冷流mFlow发射元素值i=0 --------mFlow构建器所在的上下文 : DefaultDispatcher-worker-1
- * Flow 异步流冷流mFlow发射元素值i=1 --------mFlow构建器所在的上下文 : DefaultDispatcher-worker-1
- * 收集Flow异步流冷流中的一个个元素it=0 --------流的协程所在的上下文 : main
- *
- * Flow 异步流冷流mFlow发射元素值i=2 --------mFlow构建器所在的上下文 : DefaultDispatcher-worker-1
- * Flow 异步流冷流mFlow发射元素值i=3 --------mFlow构建器所在的上下文 : DefaultDispatcher-worker-1
- * 收集Flow异步流冷流中的一个个元素it=1 --------流的协程所在的上下文 : main
- *
- * Flow 异步流冷流mFlow发射元素值i=4 --------mFlow构建器所在的上下文 : DefaultDispatcher-worker-1
- * Flow 异步流冷流mFlow发射元素值i=5 --------mFlow构建器所在的上下文 : DefaultDispatcher-worker-1
- * 收集Flow异步流冷流中的一个个元素it=2 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=3 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=4 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=5 --------流的协程所在的上下文 : main
- * Flow异步流冷流收集元素总共耗时 1316 ms
- * */
-
- }
-
- /**
- * 使用 flow 构建器 Flow 异步流
- */
- suspend fun flowEmit(): Flow<Int> {
- // 以 100 ms 的间隔发射元素
-
- val mFlow:Flow<Int> = flow<Int>(block ={
- for (i in 0..5) {
- delay(100)
- emit(i)
- Log.e(TAG,"Flow 异步流冷流mFlow发射元素值i=$i --------mFlow构建器所在的上下文 : ${Thread.currentThread().name}")
- }
- })
-
- return mFlow
- }
-
-
- }
从提高收集元素效率方向解决背压问题 :
❶调用 Flow#conflate 函数 , 合并发射元素项 , 不对每个值进行单独处理 ;
❷调用 Flow#collectLatest 函数 , 取消并重新发射最后一个元素 , 只关心最后一个结果 , 不关心中间的过程值 ;
发射了 6 个元素 , 但是只接收到了 4个元素 , 接收的元素 2 4 被过滤掉了 ;
-
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- runBlocking {
- val delta = measureTimeMillis {
- // 以 200 ms 的间隔收集元素
- // 发射元素的效率 高于 收集元素的效率, 此时会产生背压
- flowEmit().conflate().collect{
- delay(200)
- Log.e(TAG, "收集Flow异步流冷流中的一个个元素it=$it --------流的协程所在的上下文 : ${Thread.currentThread().name}")
- }
- }
-
- Log.e(TAG,"Flow异步流冷流收集元素总共耗时 $delta ms")
- }
-
- /**
- *Flow 异步流冷流mFlow发射元素值i=0 --------mFlow构建器所在的上下文 : main
- * Flow 异步流冷流mFlow发射元素值i=1 --------mFlow构建器所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=0 --------流的协程所在的上下文 : main
- *
- * Flow 异步流冷流mFlow发射元素值i=2 --------mFlow构建器所在的上下文 : main
- * Flow 异步流冷流mFlow发射元素值i=3 --------mFlow构建器所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=1 --------流的协程所在的上下文 : main
- *
- * Flow 异步流冷流mFlow发射元素值i=4 --------mFlow构建器所在的上下文 : main
- * Flow 异步流冷流mFlow发射元素值i=5 --------mFlow构建器所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=3 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=5 --------流的协程所在的上下文 : main
- *
- * Flow异步流冷流收集元素总共耗时 915 ms
- *
- * */
-
-
- }
-
- /**
- * 使用 flow 构建器 Flow 异步流
- */
- suspend fun flowEmit(): Flow<Int> {
- // 以 100 ms 的间隔发射元素
-
- val mFlow:Flow<Int> = flow<Int>(block ={
- for (i in 0..5) {
- delay(100)
- emit(i)
- Log.e(TAG,"Flow 异步流冷流mFlow发射元素值i=$i --------mFlow构建器所在的上下文 : ${Thread.currentThread().name}")
- }
- })
-
- return mFlow
- }
-
-
- }
只接收了最后一个元素 , 前几个元素没有接收 ;
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- runBlocking {
- val delta = measureTimeMillis {
- // 以 200 ms 的间隔收集元素
- // 发射元素的效率 高于 收集元素的效率, 此时会产生背压
- flowEmit().collectLatest {
- delay(200)
- Log.e(TAG, "收集Flow异步流冷流中的一个个元素it=$it --------流的协程所在的上下文 : ${Thread.currentThread().name}")
- }
- }
-
- Log.e(TAG,"Flow异步流冷流收集元素总共耗时 $delta ms")
- }
-
- /**Flow 异步流冷流mFlow发射元素值i=0 --------mFlow构建器所在的上下文 : main
- * Flow 异步流冷流mFlow发射元素值i=1 --------mFlow构建器所在的上下文 : main
- * Flow 异步流冷流mFlow发射元素值i=2 --------mFlow构建器所在的上下文 : main
- * Flow 异步流冷流mFlow发射元素值i=3 --------mFlow构建器所在的上下文 : main
- * Flow 异步流冷流mFlow发射元素值i=4 --------mFlow构建器所在的上下文 : main
- * Flow 异步流冷流mFlow发射元素值i=5 --------mFlow构建器所在的上下文 : main
- *
- * 收集Flow异步流冷流中的一个个元素it=5 --------流的协程所在的上下文 : main
- * Flow异步流冷流收集元素总共耗时 838 ms
- */
-
-
- }
-
- /**
- * 使用 flow 构建器 Flow 异步流
- */
- suspend fun flowEmit(): Flow<Int> {
- // 以 100 ms 的间隔发射元素
-
- val mFlow:Flow<Int> = flow<Int>(block ={
- for (i in 0..5) {
- delay(100)
- emit(i)
- Log.e(TAG,"Flow 异步流冷流mFlow发射元素值i=$i --------mFlow构建器所在的上下文 : ${Thread.currentThread().name}")
- }
- })
-
- return mFlow
- }
-
-
- }
Flow 操作符主要分类:过渡操作符、限长操作符、末端操作符
过渡操作符 相关概念 :
❶转换流 : 使用 过渡操作符 转换 Flow 流 ;
❷作用位置 : 过渡操作符作用 于 流的上游 , 返回 流的下游 ;
❸非挂起函数 : 过渡操作符 不是挂起函数 , 属于冷操作符 ;
❹运行速度 : 过渡操作符 可以 快速返回 新的 转换流 ;
通过 map 操作符 , 可以操作每个元素 , 将元素转为另外一种类型的元素 ;
- /**
- * 返回一个流,其中包含对原始流的每个值应用给定[transform]函数的结果。
- */
- public inline fun
Flow .map(crossinline transform: suspend (value: T) -> R): Flow = transform { value -> - return@transform emit(transform(value))
- }
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- runBlocking {
- (0..3).asFlow()
- // 通过 map 操作符 将 Flow 中发射的 Int 元素it 转为 字符串
- .map(transform={
- stringConvert(it)
- })
- .collect(collector={
- Log.e(TAG, "收集Flow异步流冷流中的一个个元素it=$it --------流的协程所在的上下文 : ${Thread.currentThread().name}")
- })
- }
-
- /**
- * 收集Flow异步流冷流中的一个个元素it=convert 0 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=convert 1 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=convert 2 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=convert 3 --------流的协程所在的上下文 : main
- * */
-
-
-
- }
-
- // 将 Int 转为 字符串
- suspend fun stringConvert(num: Int): String {
- delay(1000)
- return "convert $num"
- }
-
-
- }
通过 transform 操作符 , 可以操作每个元素 , 可以在单个元素处理时 , 发射多次元素 ;
- /**
- * 将[transform]函数应用到给定流的每个值。
- *
- * ' transform '的接收者是[FlowCollector],因此' transform '是一个
- * 灵活的函数,可以转换发出的元素,跳过它或多次发出它。
- *
- * 该操作符泛化了[filter]和[map]操作符和
- * 可以用作其他操作符的构建块,例如:
- *
- * ```
- * fun Flow
.skipOddAndDuplicateEven(): Flow = transform { value -> - * if (value % 2 == 0) { // Emit only even values, but twice
- * emit(value)
- * emit(value)
- * } // Do nothing if odd
- * }
- * ```
- */
- public inline fun
Flow .transform( - @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
- ): Flow
= flow { // 注意:这里使用的是安全流,因为收集器对每个操作的转换都是公开的 - collect { value ->
- // 没有它,单元将被退回,TCE将不会生效,KT-28938
- return@collect transform(value)
- }
- }
-
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- runBlocking {
- (0..3).asFlow()
- // 通过 map 操作符 将 Flow 中发射的 Int 元素it 转为 字符串
- .transform (transform={
- Log.e(TAG,"接受者对象this=$this,回调的参数it=$it")
- // 在 transform 操作符中发射 2 个元素
- emit(it)
- emit(stringConvert(it))
-
- })
- .collect(collector={
- Log.e(TAG, "收集Flow异步流冷流中的一个个元素it=$it --------流的协程所在的上下文 : ${Thread.currentThread().name}")
- })
- }
-
-
- /**
- *收集Flow异步流冷流中的一个个元素it=0 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=convert 0 --------流的协程所在的上下文 : main
- * 接受者对象this=Continuation at kotlinx.coroutines.flow.internal.SafeCollector,回调的参数it=1
- *
- * 收集Flow异步流冷流中的一个个元素it=1 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=convert 1 --------流的协程所在的上下文 : main
- * 接受者对象this=Continuation at kotlinx.coroutines.flow.internal.SafeCollector,回调的参数it=2
- *
- * 收集Flow异步流冷流中的一个个元素it=2 --------流的协程所在的上下文 : main
- *收集Flow异步流冷流中的一个个元素it=convert 2 --------流的协程所在的上下文 : main
- * 接受者对象this=Continuation at kotlinx.coroutines.flow.internal.SafeCollector,回调的参数it=3
- *
- * 收集Flow异步流冷流中的一个个元素it=3 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=convert 3 --------流的协程所在的上下文 : main
- *
- * */
-
-
-
- }
-
- // 将 Int 转为 字符串
- suspend fun stringConvert(num: Int): String {
- delay(1000)
- return "convert $num"
- }
-
-
-
- }
通过 take 操作符 , 可以选择选取指定个数的发射元素 ;
如 : 在 Flow 流中发射了 4 个元素 , 但是调用了 Flow#take(2) , 只收集其中 2 个元素 ;
- /**
- * 返回包含第一个[count]元素的流。
- * 当[count]元素被消耗时,原始流将被取消。
- * 如果[count]不是正数,抛出[IllegalArgumentException]。
- */
- public fun
Flow .take(count: Int): Flow { - require(count > 0) { "Requested element count $count should be positive" }
- return unsafeFlow {
- var consumed = 0
- try {
- collect { value ->
- // 注意:这个for take不是故意用collectWhile写的。
- // 它首先检查条件,然后对emit或emitAbort进行尾部调用。
- // 这样,正常的执行不需要状态机,只需要终止(emitAbort)。
- // 有关不同方法的比较,请参阅“TakeBenchmark”。
- if (++consumed < count) {
- return@collect emit(value)
- } else {
- return@collect emitAbort(value)
- }
- }
- } catch (e: AbortFlowException) {
- e.checkOwnership(owner = this)
- }
- }
- }
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- runBlocking {
- (0..5).asFlow()
- .take(3)
- .collect(collector={
- Log.e(TAG, "收集Flow异步流冷流中的一个个元素it=$it --------流的协程所在的上下文 : ${Thread.currentThread().name}")
- })
- }
-
- /**
- * 收集Flow异步流冷流中的一个个元素it=0 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=1 --------流的协程所在的上下文 : main
- * 收集Flow异步流冷流中的一个个元素it=2 --------流的协程所在的上下文 : main
- *
- * */
-
- }
-
- }
何为末端(终端)操作符函数? 末端(终端)操作符的后面不可以再接其他操作符函数了,而是只能获取最终的运行结果。
末端操作符 指的是 在 Flow 流最末端 调用 挂起函数 收集元素 的操作符 , 最常见的 末端操作符 就是 collect 操作符 ;
- 收集元素 : collect ;
- 将收集的元素转为集合 : toList , toSet ;
- 收集第一个元素 : first ;
- 发射单个元素 : single ;
- 规约流到单个值 : reduce , fold ;
- /**
- * 终端流操作符,使用提供的[动作]收集给定的流。
- * 如果在收集过程中或在所提供的流中发生任何异常,则此方法将重新抛出此异常。
- *
- * 使用示例:
- *
- * ```
- * val flow = getMyEvents()
- * try {
- * flow.collect { value ->
- * println("Received $value")
- * }
- * println("My events are consumed successfully")
- * } catch (e: Throwable) {
- * println("Exception from the flow: $e")
- * }
- * ```
- */
- public suspend inline fun
Flow .collect(crossinline action: suspend (value: T) -> Unit): Unit = - collect(object : FlowCollector
{ - override suspend fun emit(value: T) = action(value)
- })
我个人认为reduce函数还是比较好理解的,它的基本公式如下:
flow.reduce { acc, value -> acc + value }
其中acc是累积值的意思,value则是当前值的意思。
也就是说,reduce函数会通过参数给我们一个Flow的累积值acc和一个Flow的当前值value ,我们可以在函数体中对它们进行一定的运算,运算的结果会作为下一个累积值继续传递到reduce函数当中。
从第一个元素开始累加值,并对当前累加器值和每个元素应用[操作]。
- /**
- * 从第一个元素开始累加值,并对当前累加器值和每个元素应用[操作]。
- * 如果流为空,抛出[NoSuchElementException]。
- */
- public suspend fun
Flow .reduce(operation: suspend (accumulator: S, value: T) -> S): S { - var accumulator: Any? = NULL
-
- collect { value ->
- accumulator = if (accumulator !== NULL) {
- @Suppress("UNCHECKED_CAST")
- operation(accumulator as S, value)
- } else {
- value
- }
- }
-
- if (accumulator === NULL) throw NoSuchElementException("Empty flow can't be reduced")
- @Suppress("UNCHECKED_CAST")
- return accumulator as S
- }
举一个更加具体点的例子,我们上学时学等差数列都会讲这个故事,高斯的老师让全班同学计算从1加到100的结果。今天我们不需要借助等差数列,只需要借助reduce函数就可以立刻算出结果了:
reduce函数是一个终端操作符函数,它的后面不可以再接其他操作符函数了,而是只能获取最终的运行结果。
-
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- runBlocking {
-
- val mFlow : Flow<Int> = flow(block={
- for (i in (1..100)) {
- emit(i)
- Log.e(TAG,"Flow 异步流冷流mFlow发射元素值i=$i --------mFlow构建器所在的上下文 : ${Thread.currentThread().name}")
- }
- })
- val result:Int = mFlow.reduce(operation={ acc: Int, value:Int->
- acc+value
- })
-
- Log.e(TAG,"返回最终计算结果result=$result")
-
- }
-
-
- /**
- * Flow 异步流冷流mFlow发射元素值i=1 --------mFlow构建器所在的上下文 : main
- * Flow 异步流冷流mFlow发射元素值i=2 --------mFlow构建器所在的上下文 : main
- * ...
- * Flow 异步流冷流mFlow发射元素值i=99 --------mFlow构建器所在的上下文 : main
- * Flow 异步流冷流mFlow发射元素值i=100 --------mFlow构建器所在的上下文 : main
- * 返回最终计算结果result=5050
- * */
-
- }
-
- }
- runBlocking {
- val mAsFlow:Flow<Int> = (1..100)
- .asFlow() //asFlow=flow+for
- val result:Int = mAsFlow.reduce(operation={ acc: Int, value:Int->
- acc+value
- })
- Log.e(TAG,"返回最终计算结果result=$result")
- }
-
- // 返回最终计算结果result=5050
从[initial]值开始累加值,并应用[operation]当前累加器值和每个元素
fold函数和reduce函数基本上是完全类似的,
主要的区别在于,fold函数需要传入一个初始值initial,这个初始值initial会作为首个累积值被传递到fold的函数体当中,它的基本公式如下:
flow.fold(initial) { acc, value -> acc + value }
注意:其实reduce函数和fold函数并不是只能用作数值计算,相反它们可以作用于任何类型的数据。
- /**
- * 从[initial]值开始累加值,并应用[operation]当前累加器值和每个元素
- */
- public suspend inline fun
Flow .fold( - initial: R,
- crossinline operation: suspend (acc: R, value: T) -> R
- ): R {
- var accumulator = initial
- collect { value ->
- accumulator = operation(accumulator, value)
- }
- return accumulator
- }
- class FlowActivity : ComponentActivity() {
-
- override fun onCreate(savedInstanceState: Bundle?){
-
- }
-
- }
终端操作符,等待一个且仅等待一个值发出。
- /**
- * 终端操作符,等待一个且仅等待一个值发出。
- * 为空流抛出[NoSuchElementException],为流抛出[IllegalStateException]
- * 包含多个元素的。
- */
- public suspend fun
Flow .single(): T { - var result: Any? = NULL
- collect { value ->
- require(result === NULL) { "Flow has more than one element" }
- result = value
- }
-
- if (result === NULL) throw NoSuchElementException("Flow is empty")
- return result as T
- }
终端操作符,返回流发出的第一个元素,然后取消流的收集。
- /**
- * 终端操作符,返回流发出的第一个元素,然后取消流的收集。
- * 如果流为空,则抛出[NoSuchElementException]。
- */
- public suspend fun
Flow .first(): T { - var result: Any? = NULL
- collectWhile {
- result = it
- false
- }
- if (result === NULL) throw NoSuchElementException("Expected at least one element")
- return result as T
- }