• Kotlin 协程 (7/7篇) - 在Android中的使用


    一、使用场景

    1.1 LiveData 还是 StateFlow

    LiveData 问题StateFlow 解决
    粘性事件:按下Button弹出Toast,当配置改变例如屏幕旋转时,页面会销毁后重建,观察者将再次订阅LiveData,此时会再次弹出Toast。一样存在粘性事件问题。(可以使用SharedFlow,它默认回放=0,额外缓存=0)。
    数据不防抖:更新的值和当前值相同,onChange()依然会再次调用。可以使用Transformations的distinctUntilChanger()、SingleLiveEvent解决。会判断更新的值与当前值是否相同,相同则不更新。
    只处理最终值:在一个UI刷新周期内连续postValue()更新数据,只会显示最后一次的内容。我们希望每个更新的每一个值(事件)都会刷新UI,而LiveData会丢弃中间值只处理最终值(状态)。(setValue不存在该情况)一样只关注状态,只持有1个最新值。(可使用SharedFlow处理事件)
    只在主线程处理:更新值的函数都是在主线程,回调也是在主线程。协程随意切换线程。
    生命周期感知:只在界面活跃状态下(Start和Resume状态)接收通知,非活跃状态更新UI无意义浪费资源。在UI销毁时(Destroy状态)自动取消订阅避免内存泄漏。StateFlow执行在协程中,lifecycleScope会在UI销毁时结束协程。launchWhenX会在进入X状态前等待、进入后执行、离开后挂起,如果内部订阅了数据流,挂起只是停止消费数据,没有取消协程是无法阻止被订阅的数据流继续活跃生产数据。repeatOnLifecycle会在离开X状态时取消协程,再次进入X状态再重开协程,即围绕X状态的进出多次重新执行代码。
    没有默认值:创建实例未赋值,被调用会异常。构造必须传入初始化值,null安全,符合页面必须有初始状态的逻辑。

    1.2 Suspend 还是 Flow

    SuspendFlow
    值的数量一次性异步调用(单值)。数据流(多个值)。
    场景一次性数据,例如文章内容。

    ①在Repository中合并多个数据源,这些数据可能随时发生变化。

    ②数据随时变化需要观察,例如Room等。

    1.3 哪种数据流 

    FlowStateFlowSharedFlowChannel
    类型冷流:数据只能在创建对象的时候定义生产方式。热流:数据可以后期发送到流中。
    数据的生产消费时才会生产数据。不消费也会生产数据。
    数据的接收完整:收到的是全部发送的数据最新:只持有单个且最新的值,只能拿到订阅之后的数据,适用于状态。历史:可以缓存多个值,可以拿到订阅之前的历史数据,适用于事件。
    关系独立:多个订阅者彼此之间独立。共享:多个订阅者同时接收,收到的值相同。互斥:多个订阅者轮流接收,收到的值不是同一个值。。
    关闭流会自动关闭(停止订阅或者数据数据生产完)。构造创建的不会自动关闭,转换的启动模式配置为WhileSubscribed会超时关闭。构造函数创建的不会自动关闭,协程构建器创建的会跟随协程关闭。

    二、相互转换

    2.1 改造回调API

    2.1.1 Callback → Suspend(suspendCancellableCoroutine) 

    对于已有的调用了回调的函数可以改造成挂起函数,onSuccess()中调用resume()返回数据,onFailure()中调用resumeWithException()返回异常。suspendCoroutine适用于改造SAM回调。

    public suspend inline fun suspendCancellableCoroutine(
        crossinline block: (CancellableContinuation) -> Unit
    ): T 
    1. //1.回调接口
    2. interface NetCallback<T> {
    3. fun onSuccess(t: T)
    4. fun onError(excpption: Exception)
    5. }
    6. //2.在Model中封装回调API(联网获取数据)
    7. fun getData(callback: NetCallback<String>) {
    8. Thread {
    9. Thread.sleep(1000) //模拟联网操作
    10. callback.onSuccess("数据")
    11. callback.onError(Exception("错误"))
    12. }.run()
    13. }
    14. //3.以前在ViewModel中封装功能
    15. fun show() {
    16. getData(object : NetCallback {
    17. override fun onSuccess(t: String) {...}
    18. override fun onError(excpption: Exception) {...}
    19. })
    20. }
    21. //4.现在在ViewModel中封装功能
    22. suspend fun showWithSuspend():String = suspendCancellableCoroutine { cancellableContinuation ->
    23. getData(object : NetCallback {
    24. override fun onSuccess(t: String) { cancellableContinuation.resume(t)}
    25. override fun onError(excpption: Exception) {cancellableContinuation.resumeWithException(excpption)}
    26. })
    27. }

    2.1.2 Callback → Flow(callbackFlow)

    底层使用的sendChannel,默认容量64满了会挂起直到消费出空位,为了避免生产被挂起可以配置为CONFLATED或者UNLIMITED

    fun callbackFlow(block: suspend ProducerScope.() -> Unit): Flow
    send ()发送数据。
    offer ()允许在协程外提交。
    sendBlocking ()尝试用offer,失败则用runBlocking{ send() }阻塞式提交。
    awaitClose ()Flow关闭时执行,用来释放资源(注销回调函数),未调用报错 IllegalStateException。
    1. fun showWithFlow(): Flow<Int> = callbackFlow {
    2. //1.创建Flow并发送值(实现Callback接口)
    3. val callback = object: NetCallback {
    4. override fun onNextValue(value: Int) {
    5. try {
    6. offer(num)
    7. } catch(t: Throwable) {...}
    8. }
    9. override fun onError(ecxeption: Throwable) {
    10. cancel(CancellationException("API发生错误", exception))
    11. }
    12. override fun onCompleted() = close()
    13. }
    14. //2.注册回调(传参使用,并对API进行配置操作)
    15. getData(callback)
    16. //3.取消协程并注销回调(用来释放API资源)
    17. awaitClose {...}
    18. }

    2.2 数据转换到LiveData

    2.2.1 Suspend → LiveData(liveData{ })

    public fun liveData(
        context: CoroutineContext = EmptyCoroutineContext,//区块所执行的协程上下文,默认+Main.immediate。
        timeoutInMs: Long = DEFAULT_TIMEOUT,        //没有观察者后,多少毫秒后取消区块,默认5s。
        @BuilderInference block: suspend LiveDataScope.() -> Unit        //区块在LiveData被观察的时候执行
    ): LiveData
    emit ()提交新值
    emitSource ()

    订阅其它LiveData,从中获取值。

    .latestValue获取最新提交的值
    1. class MyViewModel : ViewModel() {
    2. //直接调用挂起函数赋值
    3. val num1: LiveData<Int> = liveData { emit(getData()) }
    4. //还可以指定线程,单独写耗时操作
    5. val num2: LiveData<Int> = liveData(Dispatchers.IO) {
    6. emit(5)
    7. delay(1000)
    8. emit(3)
    9. }
    10. //从另一个LivaData获取更新结果
    11. val aa = MutableLiveData(10)
    12. val bb = liveData{ emitSource(aa) }
    13. suspend fun getData(): Int = withContext(Dispatchers.IO) { 3 }
    14. }

    2.2.2 Flow → LiveData(asLiveData)

    public fun Flow.asLiveData(
        context: CoroutineContext = EmptyCoroutineContext,//区块所执行的协程上下文,默认+Main.immediate
        timeoutInMs: Long = DEFAULT_TIMEOUT        //没有观察者后,多少毫秒后取消区块,默认5s
    ): LiveData
    1. class MyViewModel : ViewModel() {
    2. val num1: LiveData<Int> = liveData { DataSource().getDataFlow.collect { emit(it) } }
    3. val num2: LiveData<Int> = DataSource().getDataFlow.asLiveData() //简写
    4. }
    5. class DataSource {
    6. val getDataFlow: Flow<Int> = flow { repeat(3) { emit(it) } }
    7. }

    2.3 流之间转换

    2.3.1  Channel → Flow

    public fun ReceiveChannel.receiveAsFlow(): Flow

    流能被多次消费。

    public fun ReceiveChannel.consumeAsFlow(): Flow

    流只能被消费一次。

    1. fun main(): Unit = runBlocking {
    2. val channel = Channel<Int>()
    3. val receiveAsFlow = channel.receiveAsFlow()
    4. val consumeAsFlow = channel.consumeAsFlow()
    5. launch { receiveAsFlow.collect { print("$it,") } }
    6. repeat(3) { channel.send(it) }
    7. channel.close()
    8. }

    2.3.2 Flow → Channel

    public fun Flow.produceIn(
        scope: CoroutineScope
    ): ReceiveChannel
    1. fun main(): Unit = runBlocking {
    2. val receiveChannel = (1..3).asFlow().produceIn(this)
    3. repeat(3) { print("$i,") }
    4. }

    2.3.3 Flow → SharedFlow

    public fun Flow.shareIn(
        scope: CoroutineScope,        //数据共享时所在的协程作用域
        started: SharingStarted,        //启动策略
        replay: Int = 0        //回放,新订阅时得到几个之前已经发射过的旧值。
    ): SharedFlow

    2.3.4 Flow → StateFlow

    public fun Flow.stateIn(
        scope: CoroutineScope,        //数据共享时所在的协程作用域
        started: SharingStarted,        //启动策略
        initialValue: T        //默认值
    ): StateFlow

    public suspend fun Flow.stateIn(scope: CoroutineScope): StateFlow {
        val config = configureSharing(1)
        val result = CompletableDeferred>()
        scope.launchSharingDeferred(config.context, config.upstream, result)
        return result.await()
    }

    挂起函数版本,不用指定默认值,会挂起直到产出第一个值。

    三、View生命周期

    通常需要在UI层收集数据流以便显示更新,当UI进入后台不可见时,数据流也会持续发送不停止(除非手动取消协程)。如使用了基于Channel收集的:CoroutineScope.launch、Flow.launchIn、LifecycleCoroutineScope.launchWhenX,或使用了带缓存操作符的Flow:buffer、conflate、flowIn、shareIn。

    收集单个Flow使用Flow.flowWithLifecyle( )
    收集多个流热流使用Lifecyle.repeatOnLifecyle( )
    不搜集流使用LifecycleScope.launchWhenX( )

    3.1 Flow.flowWithLifecyle( )

    当只有一个Flow需要收集时使用该操作符,内部基于Lifecyle.repeatOnLifecyle()实现。生命周期低于目标状态会取消上游,不影响下游,需要注意调用顺序。

    public fun Flow.flowWithLifecycle(
        lifecycle: Lifecycle,
        minActiveState: Lifecycle.State = Lifecycle.State.STARTED
    ): Flow = callbackFlow {
        lifecycle.repeatOnLifecycle(minActiveState) {
            this@flowWithLifecycle.collect {
                send(it)
            }
        }
        close()
    }

    3.2  Lifecyle.repeatOnLifecyle( )

    1. 在生命周期到达目标状态时,挂起调用它的协程。
    2. 会在离开目标状态时取消子协程,再次进入目标状态会重开子协程,即围绕目标状态的进出多次重新执行代码。
    3. 进入Destroy状态才会恢复调用它的协程。
    public suspend fun Lifecycle.repeatOnLifecycle(
        state: Lifecycle.State,//可选状态:INITIALIZED、CREATED、STARTED、RESUMED、DESTROYED
        block: suspend CoroutineScope.() -> Unit
    )
    1. lifecycleScope.launch {
    2. //会挂起之前的代码
    3. lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
    4. //重复的工作
    5. }
    6. //当进入onDestroy()状态协程会恢复执行后面的代码
    7. }

    3.3 LifecycleScope.launchWhenX( )

    会在进入X状态前等待、进入后执行、离开后挂起。如果内部订阅了数据流,挂起只是停止消费数据,没有取消协程,无法阻止被订阅的数据流继续活跃生产数据,由于会造成资源浪费已被废弃,使用上面的 3.2 替代。

    public fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job
    public fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job
    public fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job

    3.4 配置生产者

    MutableSharedFlow

    MutableStateFlow

    持有数据流实例的对象还在内存中,它们就会保持生产者的活跃状态。控制字段subscriptionCount为0的时候,内部的生产者就会停止。

    Flow.shareIn

    Flow.stateIn

    对形参启动策略配置为WhileSubscriibed()会在没有活跃订阅者时停止内部生产者。而无论是配置成Eagerly还是Lazy只要使用的协程作用域还处于活跃状态内部生产者就会保持活跃。
  • 相关阅读:
    SpringBoot 中到底如何解决跨域问题?
    新点面试题
    序列图怎么画,也就是顺序图
    (LdAiChat、Ai Loading、不墨AI助手、360AI搜索、TIG AI)分析好用的ChatGPT
    React_lazy使用-组件加载前loading做优雅降级
    http https socket rpc grpc有啥区别联系
    内网穿透的应用-如何在Docker中部署MinIO服务并结合内网穿透实现公网访问本地管理界面
    spring中事件的使用方法
    二叉树OJ
    Python环境管理工具virtualenv的安装使用教程(图文详解)
  • 原文地址:https://blog.csdn.net/HugMua/article/details/127128589