• 将回调函数转为Flow


    一、前言

    在kotlin中,语言将程序进行了结构化处理,提高了可读性,对于旧的程序逻辑也提供了转换操作,这里记录下如何将回调转为Flow流,优化程序结构
    
    • 1

    二、代码示例

    1、callbackFlow

    这里演示callbackFlow的使用方式。callbackFlow属于多次回调可以重复触发,由于内容不是使用Channel进行通信,所以可以使用Channel的相关函数。

       interface Listener{
            fun listener()
            fun end()
        }
    inner class TouchModel{
            private var listener: Listener ?= null
            fun registerListener(sourceListener: Listener){
                listener = sourceListener
            }
            fun unregisterListener(){
                listener = null
            }
    
            fun emit(){
                listener?.listener()
            }
            fun end(){
                listener?.end()
            }
        }
       @Test
        fun test(){
            val model = TouchModel()
            runBlocking {
    
                val flow = flowFrom(model)
    
                flow.onEach {
                    println("YM--->流:$it")
                }.launchIn(this)
                delay(1000)
                model.emit()
                delay(1000)
                model.emit()
                delay(1000)
                model.emit()
                delay(1000)
                println("YM--->流即将结束")
                model.end()
                delay(1000)
    
            }
        }
        //callbackFlow属于多次回调可以重复触发,由于内容不是使用Channel进行通信,所以可以使用Channel的相关函数
        fun flowFrom(model: TouchModel): Flow<Int> = callbackFlow {
            var count = 0
            val callback = object : Listener{
                override fun listener() {
    //  为了避免阻塞,channel可以配置缓冲通道,这个暂时不知道怎么处理
    //                trySend(count)//这两种方式都行
                        trySendBlocking(count)
                            .onFailure { throwable ->
                                // Downstream has been cancelled or failed, can log here
                            }
                        count++
                }
    
                override fun end() {
                    //当执行结束后可以使用以下方式关闭channel,或者抛出异常,该参数可选,
    //                channel.close(IllegalStateException("这个状态不对"))
    //                close(IllegalStateException("这个状态不对"))
    //                channel.close() 等同于  close()
                    println("YM--->Channel关闭")
                    close()
                }
            }
            model.registerListener(callback)
            //因为是冷流,所以需要使用awaitClose进行挂起阻塞
            awaitClose {
                //关闭注册
                println("YM--->解除注册")
                model.unregisterListener()
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    2、suspendCancellableCoroutine

    如果对于单次回调的的话。可以使用suspendCancellableCoroutine进行处理。示例代码如下:

          interface Listener{
            fun listener()
            fun end()
        }
    
        inner class TouchModel{
            private var listener: Listener ?= null
            fun registerListener(sourceListener: Listener){
                listener = sourceListener
            }
            fun unregisterListener(){
                listener = null
            }
    
            fun emit(){
                listener?.listener()
            }
            fun end(){
                listener?.end()
            }
        }
           @Test
        fun test(){
                  val model = TouchModel()
            runBlocking {
    //            val flow = flowFrom(model)
                val job = async {
                    val flow = awaitCallback(model)
                    println("YM--->流:$flow")
                }
    //            delay(1000)
    //            model.emit()
                delay(1000)
                println("YM--->流即将结束")
                model.end()
    //            job.cancel()//该流是可以撤销的,假若里面任务还没有结束,这个任务可以直接撤销
                delay(1000)
            }
        }
      suspend fun awaitCallback(model: TouchModel): Int = suspendCancellableCoroutine { continuation ->
            val callback = object : Listener { // Implementation of some callback interface
                override fun listener() {
                    continuation.resume(0){//协程恢复时候使用
                        continuation.resumeWithException(it)
                    }
    //                continuation.resumeWithException(cause)
           println("YM---->isActive:${continuation.isActive}--->isCancel:${continuation.isCancelled}")
                }
    
                override fun end() {
                    continuation.cancel()
                }
            }
            // Register callback with an API
            model.registerListener(callback)
            // Remove callback on cancellation
            continuation.invokeOnCancellation {
                println("YM---->挂起关闭")
                model.unregisterListener()
            }
            // At this point the coroutine is suspended by suspendCancellableCoroutine until callback fires
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    可以看到,执行一次就直接终止了,需要注意的是倘若任务没有执行完,直接进行continuation.cancel()。那么就会执行continuation.invokeOnCancellation函数。倘若,已经执行完再次进行continuation.cancel()。则不会执行continuation.invokeOnCancellation

    3、CompletableDeferred

    这个也可以监听将回调函数进行转换,如下:

    class CompletableDeferredTest {
        val response = CompletableDeferred<Int>()
        @Test
        fun test(){
            request(response)
            runBlocking {
                val result = response.await()
                println("YM---->结果:${result}")
    //            response.cancel() //如果在结果返回前执行撤销,那么就会触发CompletableDeferred.invokeOnCompletion()函数
                delay(4000)
            }
        }
    
         fun request(rep: CompletableDeferred<Int>){
    
             Thread{//这里用线程而不用协程主要是想证明这个函数不需要协程环境就可以执行
                 Thread.sleep(1000)//延迟两秒模拟请求
                 rep.complete(2)
             }.start()
    //         rep.completeExceptionally(IllegalStateException("非法状态异常"))//这个可以抛出异常
             rep.invokeOnCompletion {
                 if (rep.isCancelled) {
                     println("Call cancelled")
                 }
             }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    三、参考链接

    1. callbackFlow

    2. 使用更为安全的方式收集 Android UI 数据流

    3. Kotlin–suspendCancellableCoroutine和suspendCoroutine的区别及使用_Th.one的博客-CSDN博客

    4. CoroutineScope - CompletableDeferred cancellation

  • 相关阅读:
    三步,金蝶K3的数据可视化了
    npm排错记录
    α微管蛋白研究丨SYSY α微管蛋白抗体案例分析
    trustZone学习
    电商小程序实战教程-订单管理
    STM32学习笔记:驱动SPI外设读写FLASH
    【线性代数】MIT Linear Algebra Lecture 4: Factorization into A = LU
    x64内核实验3-页机制的研究(1)
    SpringBoot整合Minio文件存储
    汇编语言与接口技术笔记(持续更新)
  • 原文地址:https://blog.csdn.net/Mr_Tony/article/details/126119816