
| LiveData 问题 | StateFlow 解决 |
| 粘性事件:按下Button弹出Toast,当配置改变例如屏幕旋转时,页面会销毁后重建,观察者将再次订阅LiveData,此时会再次弹出Toast。 | 一样存在粘性事件问题。(可以使用SharedFlow,它默认回放=0,额外缓存=0)。 |
| 数据不防抖:更新的值和当前值相同,onChange()依然会再次调用。可以使用Transformations的distinctUntilChanger()、SingleLiveEvent解决。 | 会判断更新的值与当前值是否相同,相同则不更新。 |
| 只处理最终值:在一个UI刷新周期内连续postValue()更新数据,只会显示最后一次的内容。我们希望每个更新的每一个值(事件)都会刷新UI,而LiveData会丢弃中间值只处理最终值(状态)。(setValue不存在该情况) | 一样只关注状态,只持有1个最新值。(可使用SharedFlow处理事件) |
| 只在主线程处理:更新值的函数都是在主线程,回调也是在主线程。 | 协程随意切换线程。 |
| 生命周期感知:只在界面活跃状态下(Start和Resume状态)接收通知,非活跃状态更新UI无意义浪费资源。在UI销毁时(Destroy状态)自动取消订阅避免内存泄漏。 | StateFlow执行在协程中,lifecycleScope会在UI销毁时结束协程。launchWhenX会在进入X状态前等待、进入后执行、离开后挂起,如果内部订阅了数据流,挂起只是停止消费数据,没有取消协程是无法阻止被订阅的数据流继续活跃生产数据。repeatOnLifecycle会在离开X状态时取消协程,再次进入X状态再重开协程,即围绕X状态的进出多次重新执行代码。 |
| 没有默认值:创建实例未赋值,被调用会异常。 | 构造必须传入初始化值,null安全,符合页面必须有初始状态的逻辑。 |
| Suspend | Flow | |
| 值的数量 | 一次性异步调用(单值)。 | 数据流(多个值)。 |
| 场景 | 一次性数据,例如文章内容。 | ①在Repository中合并多个数据源,这些数据可能随时发生变化。 ②数据随时变化需要观察,例如Room等。 |

| Flow | StateFlow | SharedFlow | Channel | |
| 类型 | 冷流:数据只能在创建对象的时候定义生产方式。 | 热流:数据可以后期发送到流中。 | ||
| 数据的生产 | 消费时才会生产数据。 | 不消费也会生产数据。 | ||
| 数据的接收 | 完整:收到的是全部发送的数据 | 最新:只持有单个且最新的值,只能拿到订阅之后的数据,适用于状态。 | 历史:可以缓存多个值,可以拿到订阅之前的历史数据,适用于事件。 | |
| 关系 | 独立:多个订阅者彼此之间独立。 | 共享:多个订阅者同时接收,收到的值相同。 | 互斥:多个订阅者轮流接收,收到的值不是同一个值。。 | |
| 关闭 | 流会自动关闭(停止订阅或者数据数据生产完)。 | 构造创建的不会自动关闭,转换的启动模式配置为WhileSubscribed会超时关闭。 | 构造函数创建的不会自动关闭,协程构建器创建的会跟随协程关闭。 | |

对于已有的调用了回调的函数可以改造成挂起函数,onSuccess()中调用resume()返回数据,onFailure()中调用resumeWithException()返回异常。suspendCoroutine适用于改造SAM回调。
| public suspend inline fun crossinline block: (CancellableContinuation ): T |
- //1.回调接口
- interface NetCallback<T> {
- fun onSuccess(t: T)
- fun onError(excpption: Exception)
- }
-
- //2.在Model中封装回调API(联网获取数据)
- fun getData(callback: NetCallback<String>) {
- Thread {
- Thread.sleep(1000) //模拟联网操作
- callback.onSuccess("数据")
- callback.onError(Exception("错误"))
- }.run()
- }
-
- //3.以前在ViewModel中封装功能
- fun show() {
- getData(object : NetCallback
{ - override fun onSuccess(t: String) {...}
- override fun onError(excpption: Exception) {...}
- })
- }
-
- //4.现在在ViewModel中封装功能
- suspend fun showWithSuspend():String = suspendCancellableCoroutine { cancellableContinuation ->
- getData(object : NetCallback
{ - override fun onSuccess(t: String) { cancellableContinuation.resume(t)}
- override fun onError(excpption: Exception) {cancellableContinuation.resumeWithException(excpption)}
- })
- }
底层使用的sendChannel,默认容量64满了会挂起直到消费出空位,为了避免生产被挂起可以配置为CONFLATED或者UNLIMITED。
| fun |
| send () | 发送数据。 |
| offer () | 允许在协程外提交。 |
| sendBlocking () | 尝试用offer,失败则用runBlocking{ send() }阻塞式提交。 |
| awaitClose () | Flow关闭时执行,用来释放资源(注销回调函数),未调用报错 IllegalStateException。 |
- fun showWithFlow(): Flow<Int> = callbackFlow {
- //1.创建Flow并发送值(实现Callback接口)
- val callback = object: NetCallback {
- override fun onNextValue(value: Int) {
- try {
- offer(num)
- } catch(t: Throwable) {...}
- }
- override fun onError(ecxeption: Throwable) {
- cancel(CancellationException("API发生错误", exception))
- }
- override fun onCompleted() = close()
- }
- //2.注册回调(传参使用,并对API进行配置操作)
- getData(callback)
- //3.取消协程并注销回调(用来释放API资源)
- awaitClose {...}
- }
| public fun context: CoroutineContext = EmptyCoroutineContext,//区块所执行的协程上下文,默认+Main.immediate。 timeoutInMs: Long = DEFAULT_TIMEOUT, //没有观察者后,多少毫秒后取消区块,默认5s。 @BuilderInference block: suspend LiveDataScope ): LiveData |
| emit () | 提交新值 |
| emitSource () | 订阅其它LiveData,从中获取值。 |
| .latestValue | 获取最新提交的值 |
- class MyViewModel : ViewModel() {
- //直接调用挂起函数赋值
- val num1: LiveData<Int> = liveData { emit(getData()) }
- //还可以指定线程,单独写耗时操作
- val num2: LiveData<Int> = liveData(Dispatchers.IO) {
- emit(5)
- delay(1000)
- emit(3)
- }
- //从另一个LivaData获取更新结果
- val aa = MutableLiveData(10)
- val bb = liveData{ emitSource(aa) }
- suspend fun getData(): Int = withContext(Dispatchers.IO) { 3 }
- }
| public fun context: CoroutineContext = EmptyCoroutineContext,//区块所执行的协程上下文,默认+Main.immediate timeoutInMs: Long = DEFAULT_TIMEOUT //没有观察者后,多少毫秒后取消区块,默认5s ): LiveData |
- class MyViewModel : ViewModel() {
- val num1: LiveData<Int> = liveData { DataSource().getDataFlow.collect { emit(it) } }
- val num2: LiveData<Int> = DataSource().getDataFlow.asLiveData() //简写
- }
-
- class DataSource {
- val getDataFlow: Flow<Int> = flow { repeat(3) { emit(it) } }
- }
| public fun 流能被多次消费。 |
| public fun 流只能被消费一次。 |
- fun main(): Unit = runBlocking {
- val channel = Channel<Int>()
- val receiveAsFlow = channel.receiveAsFlow()
- val consumeAsFlow = channel.consumeAsFlow()
- launch { receiveAsFlow.collect { print("$it,") } }
- repeat(3) { channel.send(it) }
- channel.close()
- }
| public fun scope: CoroutineScope ): ReceiveChannel |
- fun main(): Unit = runBlocking {
- val receiveChannel = (1..3).asFlow().produceIn(this)
- repeat(3) { print("$i,") }
- }
| public fun scope: CoroutineScope, //数据共享时所在的协程作用域 started: SharingStarted, //启动策略 replay: Int = 0 //回放,新订阅时得到几个之前已经发射过的旧值。 ): SharedFlow |
| public fun scope: CoroutineScope, //数据共享时所在的协程作用域 started: SharingStarted, //启动策略 initialValue: T //默认值 ): StateFlow |
| public suspend fun 挂起函数版本,不用指定默认值,会挂起直到产出第一个值。 |
通常需要在UI层收集数据流以便显示更新,当UI进入后台不可见时,数据流也会持续发送不停止(除非手动取消协程)。如使用了基于Channel收集的:CoroutineScope.launch、Flow.launchIn、LifecycleCoroutineScope.launchWhenX,或使用了带缓存操作符的Flow:buffer、conflate、flowIn、shareIn。
| 收集单个Flow使用 | Flow.flowWithLifecyle( ) |
| 收集多个流、热流使用 | Lifecyle.repeatOnLifecyle( ) |
| 不搜集流使用 | LifecycleScope.launchWhenX( ) |
当只有一个Flow需要收集时使用该操作符,内部基于Lifecyle.repeatOnLifecyle()实现。生命周期低于目标状态会取消上游,不影响下游,需要注意调用顺序。
| public fun |
- 在生命周期到达目标状态时,挂起调用它的协程。
- 会在离开目标状态时取消子协程,再次进入目标状态会重开子协程,即围绕目标状态的进出多次重新执行代码。
- 进入Destroy状态才会恢复调用它的协程。
| public suspend fun Lifecycle.repeatOnLifecycle( state: Lifecycle.State,//可选状态:INITIALIZED、CREATED、STARTED、RESUMED、DESTROYED block: suspend CoroutineScope.() -> Unit ) |
- lifecycleScope.launch {
- //会挂起之前的代码
- lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
- //重复的工作
- }
- //当进入onDestroy()状态协程会恢复执行后面的代码
- }
会在进入X状态前等待、进入后执行、离开后挂起。如果内部订阅了数据流,挂起只是停止消费数据,没有取消协程,无法阻止被订阅的数据流继续活跃生产数据,由于会造成资源浪费已被废弃,使用上面的 3.2 替代。
| public fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job public fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job public fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job |
| MutableSharedFlow MutableStateFlow | 持有数据流实例的对象还在内存中,它们就会保持生产者的活跃状态。控制字段subscriptionCount为0的时候,内部的生产者就会停止。 |
| Flow.shareIn Flow.stateIn | 对形参启动策略配置为WhileSubscriibed()会在没有活跃订阅者时停止内部生产者。而无论是配置成Eagerly还是Lazy只要使用的协程作用域还处于活跃状态内部生产者就会保持活跃。 |