• Kotlin 协程二三事:入门


    例子(Retrofit网络请求)

    1. 定义接口
    2. 发起请求
    3. 结果回调
    4. 数据展示

    常规请求

    public interface xxApiService {
        @POST("/xx/queryxx")
        Call<xxBean> queryxx();
    }
    
    • 1
    • 2
    • 3
    • 4
    ServiceFactory.newService(url, xxApiService::class.java)	//封装的Retrofit实例
    						.queryPublishxx()
      					.enqueue(object : Callback<xxBean> {
                    override fun onResponse(call: Call<xxBean>, response: Response<xxBean>) {
                        if (ActivityUtils.checkActivityAlive(view?.getActivity())) {
                            updateUI()
                        }
                    }
    
                    override fun onFailure(call: Call<xxBean>, t: Throwable) {
                    }
                })
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    基于RxJava

    public interface xxApiService {
        @POST("/xx/queryxx")
        Observable<xxBean> queryxx();
    }
    
    • 1
    • 2
    • 3
    • 4
    ServiceFactory.newService(url, xxApiService::class.java)
                .queryxx(body)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(object : SubscriberImpl<xxBean>(
                    RequestOptions.create(view?.getActivity()).loading(true)) {
                    override fun onSuccess(xxBean: xxBean) {
                        super.onSuccess(xxBean)
                      
                        if (ActivityUtils.checkActivityAlive(view?.getActivity())) {
                            updateUI()
                        }
    
                    }
    
                    override fun onFailure(respCode: Int, errMsg: String) {
                        super.onFailure(respCode, errMsg)
                    }
                })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    基于协程

    interface xxApiService {
        @POST("/xx/queryxx")
        suspend fun queryxx(): xxBean
    }
    
    • 1
    • 2
    • 3
    • 4
    viewModelScope.launch {
                try {
                    val xxBean = ServiceFactory.newKTXService(BASE_URL, xxService::class.java).queryxx()
                  updateUI(xxBean)
                  
                  val xxBean = ServiceFactory.newKTXService(BASE_URL, xxApiService::class.java).queryxx()
                  updateUI(xxBean)
                  
                  
                } catch (e: Exception) {
                    e.printStackTrace()
                   
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    为什么使用协程

    1. 异步代码像同步代码一样展示,逻辑更清晰

    协程不会阻塞线程,写法上跟同步代码一致,但是并不是单线程

    1. 解决”回调地狱“,代码更加简洁

    2. 线程切换、事件管理更加方便

    为什么不用RxJava?学习成本高,使用起来也并不如协程方便

    协程是什么

    顾名思义,协程的协就是协作,协程是一种非抢占式的或者说协作式的程序并发调度的实现。程序可以主动挂起或者恢复执行,协程更加轻量级,他的调度在用户态就可以搞定,不需要映射成内核线程这么重的资源。

    协程并不局限于语言,它是一种编程思想,在其他语言也有实现,例:Go

    用户态:程序的运行空间(内核态:Linux内核的运行空间)

    挂起/恢复:执行异步程序与结束(线程的切换与切回)

    轻量级:协程的调度是轻量级,但是在Android上异步操作的实现依旧使用了线程

    协程的引入

        // 标准库
        implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
        // 协程基础库
        implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.1"
        // 协程 Android 库,提供 Android UI 调度器
        implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.5.1'
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    参考地址:https://github.com/Kotlin/kotlinx.coroutines

    协程的上下文与调度器

    我们可以通过GlobalScope.launch来启动一个协程,协程体就是大括号里的部分。协程的代码都是写在协成体里面的

            GlobalScope.launch {
                // do something...
            }
    
    • 1
    • 2
    • 3

    分析launch代码,我们可以看到实际上他调用的是CoroutineScope.launch (/ˌkərʊ:ˈtɪ:n/ ) 扩展方法,因为GlobalScope是CoroutineScope接口的实现类

    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,	// 启动模式
        block: suspend CoroutineScope.() -> Unit	// 不带返回值的函数
    ): Job
    
    • 1
    • 2
    • 3
    • 4
    • 5

    其中context就是上下文,上下文可以携带参数,拦截协程执行等等,一般情况下我们不关心上下文的实现,使用现成的即可。

    上下文一个重要的作用就是线程切换,它包含一个线程调度器CoroutineDispatcher。kotlin提供了四种默认的调度器

    调度器线程特性
    Main主线程(UI线程)UI操作
    Default子线程(线程池)适合 CPU 密集型的任务,比如计算
    IO子线程(线程池)针对磁盘和网络 IO 进行了优化,适合 IO 密集型的任务,比如:读写文件,操作数据库以及网络请求
    Unconfined启动协程是哪个线程就是什么线程

    使用方式

            GlobalScope.launch(Dispatchers.Main) {
              // do something or not...
                GlobalScope.launch(Dispatchers.IO) {
                    // do something or not...
                }
                // do something or not...
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    协程的启动模式

    start就是协程的启动模式,kotlin提供了4种启动模式

    启动模式特性
    DEFAULT立即开始调度
    LAZY在需要的时候调用(start,join,await)
    ATOMIC立即开始调度,在第一个挂起点前不能被取消
    UNDISPATCHED在当前线程立即开始调度,直到遇到第一个挂起点,后面取决于调度器

    使用方式(其他的都是正常启动,所以这里以LAZY举例)

            val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
                // do something...
            }
            job.start()
            // job.join()  // join是挂起函数,需要在协程体中调用,join使用方式与线程相同 
    
    • 1
    • 2
    • 3
    • 4
    • 5

    或者

            val deferred = GlobalScope.async(start = CoroutineStart.LAZY) {
                // do something...
            }
            deferred.await()	// await是挂起函数,同样需要在协程体中调用
    
    • 1
    • 2
    • 3
    • 4

    之前的协程都是串行执行任务,但是假如我们需要并行执行任务,并且需要返回值的时候,就需要用到async、await,如下

            GlobalScope.launch {
                val deferred1 = async {
                    // do something...
                    1
                }
    
                val deferred2 = async {
                    // do something
                    2
                }
              
                Log.e("Test", "result = ${deferred1.await() + deferred2.await()}")
            }
    
    GlobalScope.launch {
                val deferred1 = async {
                    // do something...
                    1
                }
    
                val deferred2 = async {
                    // do something
                    2
                }
     
    val result =   deferred1.await();
      result = result + result;
              
                Log.e("Test", "result = ${deferred1.await() + deferred2.await()}")
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    async的实现基本与launch一样,区别在返回上

    public fun <T> CoroutineScope.async(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> T	// 可携带返回值的函数
    ): Deferred<T>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    async构造函数的block返回值是泛型,launch构造函数的block是Unit,也就是无返回值。Deffrred也是Job的子类,但是它通过await返回协程的返回值。

    如果串行的情况下需要返回值,可以使用withContext

        suspend fun getResult() = withContext(Dispatchers.IO) {
          	// do something...
            1
        }
    
    • 1
    • 2
    • 3
    • 4

    suspend:修饰挂起函数,只能在协程体中执行。如果自己直接用suspend修饰函数不是挂起函数,需要用到kotlin协程框架自带的suspend函数才行

    withContext与async:都可以携带返回值,withContext是串行的,async是并行的。我们一般使用withContext实现上下文的切换

    launch:串行,不携带返回值

    协程的异常处理

    正常情况下,我们处理异常都是通过onFailure回调,那么协程怎么处理呢。先看一个把Callback转协程的例子

    Callback转协程

    suspend fun queryPublishInfoDetail() = suspendCoroutine<Any> { continuation ->
        ServiceFactory.newService(ConfigUrl.BASE_URL, SaleApiService::class.java)
            .queryPublishInfoDetail()
            .enqueue(object : Callback<SalePublishBean> {
                override fun onResponse(call: Call<SalePublishBean>, response: Response<SalePublishBean>) {
                    continuation.resume(response.body)
                }
    
                override fun onFailure(call: Call<SalePublishBean>, t: Throwable) {
                    continuation.resumeWithException(t)
                }
            })
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    suspendCoroutine:是将当前执行流挂起,在适合的时机将协程恢复执行(适合的时机:resume)

    continuation:执行流

    通过try catch捕获作用域内异常

            GlobalScope.launch {
                try {
                    val queryPublishInfoDetail = queryPublishInfoDetail()
                    // do something...
                } catch (e: Exception) {
                    // do something if error...
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    可以看出,协程通过抛出Exception来处理异常,我们可以通过catch住错误来进行异常情况的处理

    全局异常处理CoroutineExceptionHandler

    需要注意的是:无法使用 try-catch 去捕获 launch 和 async 作用域的异常,比如以下代码

            try {
                GlobalScope.launch {
                    throw NullPointerException("hha")
                }
            } catch (e: Exception) {
                Log.e("test", "error = ${e.message}")
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    依然会造成程序的崩溃,所以我们可以通过以下方式进行处理

            GlobalScope.launch(CoroutineExceptionHandler { coroutineContext, throwable ->
                Log.e("test", "error = ${throwable.message}")
            }) {
                throw NullPointerException("hha")
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    协程的作用域

    先看一个例子,来区分出父协程与子协程

            val parentJob = GlobalScope.launch {
              
                val childJob = GlobalScope.launch {
                }
    
                val brotherJob = GlobalScope.launch {
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    协程的作用域中异常的传递是默认是双向的,具体的表现是

    • 父协程发生异常,所有子协程都会取消
    • 子协程发生异常,会导致父协程取消,同同时导致兄弟协程也取消

    这就是协同作用域,除了协同作用域,还有主从作用域、顶级作用域。从列表看一下他们的区别

    作用域类型产生方式异常传播机制
    顶级作用域通过GlobalScope启动,不继承外部作用域不向外部传播全局协程作用域,生命周期和应用一样,一般不使用,或者自定义作用域,及时取消。不受外部作用域影响。如以上代码父子作用域相互之间完全不影响
    协同作用域Job嵌套,coroutineScope获取当前作用域实现双向传播一荣俱荣,一损俱损。当子协程出现问题的时候,直接影响父协程。父协程取消,子协程也会取消
    主从作用域通过supervisorScope启动,与内部子协程主动,与外部协程协同自上而下,单向传播父协程取消,子协程也会取消。子协程出问题,不影响父协程
     GlobalScope.launch {
                supervisorScope {
                    coroutineScope {
                    }
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这里从作用域分析协程的异常处理机制

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UH5274Th-1656228878120)(/Users/miaoxinxin/Desktop/协程入门/未捕获异常处理机制.png)]

    协程的取消机制

    协程通过Cancel来取消

            val job = GlobalScope.launch {
                getUser("one1go")
            }
    
            job.cancel()
    
    • 1
    • 2
    • 3
    • 4
    • 5

    正确的Callback转协程

    这里与之前的并没有什么区别,只是关键字换成suspendCancellableCoroutine,表示协程可以被外部作用域取消

    suspend fun queryPublishInfoDetail() = suspendCancellableCoroutine<Any> { continuation ->
        ServiceFactory.newService(ConfigUrl.BASE_URL, SaleApiService::class.java)
            .queryPublishInfoDetail()
            .enqueue(object : Callback<SalePublishBean> {
                override fun onResponse(call: Call<SalePublishBean>, response: Response<SalePublishBean>) {
                    continuation.resumeWith(response.body)
                }
    
                override fun onFailure(call: Call<SalePublishBean>, t: Throwable) {
                    continuation.resumeWithException(t)
                }
            })
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    其他

    为什么suspend修饰的函数不一定是挂起函数

    看一个例子

        private val TAG = "MainActivity"
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
    
            lifecycleScope.launch {
                test()
            }
        }
    
        suspend fun test(): Any {
            Log.e(TAG, "test: ")
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    通过show kotlin bytecode看下编译后的java代码

       @Nullable
       public final Object test(@NotNull Continuation $completion) {
          Log.e(this.TAG, "test: ");
          return Unit.INSTANCE;
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    以及test的引用

             @Nullable
             public final Object invokeSuspend(@NotNull Object $result) {
                Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                switch(this.label) {
                case 0:
                   ResultKt.throwOnFailure($result);
                   MainActivity var10000 = MainActivity.this;
                   this.label = 1;
                   if (var10000.test(this) == var2) {
                      return var2;
                   }
                   break;
                case 1:
                   ResultKt.throwOnFailure($result);
                   break;
                default:
                   throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                }
    
                return Unit.INSTANCE;
             }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    可以看到通过比较test的返回和IntrinsicsKt.getCOROUTINE_SUSPENDED()是否相同来判断协程是否挂起

    public val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED
    
    • 1

    所以可以得出,我们把函数返回这个值,就可以挂起了

        suspend fun test(): Any {
            Log.e(TAG, "test: ")
            return kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
        }
    
    • 1
    • 2
    • 3
    • 4

    反编译后可得

       @Nullable
       public final Object test(@NotNull Continuation $completion) {
          Log.e(this.TAG, "test: ");
          return IntrinsicsKt.getCOROUTINE_SUSPENDED();
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    但是不建议这么做,因为函数这里只有挂起,没有恢复

    协程通过状态机来控制挂起,从label可以看出,case 0 判断挂起,执行完后会再调invokeSuspend, 此时走到了case 1返回结果。然后就是对结果进行处理的逻辑

    使用带生命周期的协程

    因为GlobalScope是全局协程,所以我们一般不使用它,Kotlin提供了带有生命周期的协程,我们可以使用lifecycleScope、viewModelScope,他们会在界面销毁或者viewmodel的clear时及时的取消

            lifecycleScope.launch {}
            lifecycleScope.launchWhenCreated {  }
            lifecycleScope.launchWhenStarted {  }
            lifecycleScope.launchWhenResumed {  }
    
            viewModelScope.launch{}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    简单看一下lifecycleScope的构造

    public val Lifecycle.coroutineScope: LifecycleCoroutineScope
        get() {
            while (true) {
                val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
                if (existing != null) {
                    return existing
                }
                val newScope = LifecycleCoroutineScopeImpl(
                    this,
                    SupervisorJob() + Dispatchers.Main.immediate
                )
                if (mInternalScopeRef.compareAndSet(null, newScope)) {
                    newScope.register()
                    return newScope
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    他是一个主从作用域并且在主线程,所以他可以在里面直接操作ui,我们执行完网络请求后直接可以更新界面。ViewModelScope是一样的

    基础的网络请求封装

    我在优信拍封装了一个简单的基于协程的网络请求框架,基于Retrofit,在viewmodel和Activity中都可以使用

    fun <T : Any> CoroutineScope.normalRequest(dsl: RequestAction<T>.() -> Unit) {
        val action = RequestAction<T>().apply(dsl)
    
        launch {
            try {
                if (action.showLoading) {
                    ActivityLifecycler.getInstance().list?.let {
                        showProgress(action.loadingCancelable)
                    }
                }
    
                action.request?.invoke()?.let {
                    if (action.showLoading) {
                        cancelProgress()
                    }
    
                    if (checkData(it)) {
                        action.onSuccess?.invoke(it)
                    } else {
                        action.onFail?.invoke(Exception())
                    }
                }
            } catch (e: Exception) {
                if (action.showLoading) {
                    cancelProgress()
                }
                action.onFail?.invoke(e)
            }
        }
    }
    
    fun cancelProgress() {
        val activityList = ActivityLifecycler.getInstance().list
        if (activityList?.isNotEmpty() == true) {
            LoadingDialogUtils.getInstance(activityList[activityList.size - 1]).cancelCommonProgressDialog()
        }
    }
    
    fun showProgress(loadingCancelable: Boolean) {
        val activityList = ActivityLifecycler.getInstance().list
        if (activityList?.isNotEmpty() == true) {
            LoadingDialogUtils.getInstance(activityList[activityList.size - 1]).showCommonProgressDialog(loadingCancelable)
        }
    }
    
    fun <T : Any> checkData(appBaseBean: AppBaseBean<T>): Boolean {
        return when (appBaseBean.code) {
            0 -> true
            1012, 1021 -> {
                // sessionId失效
                LoginManager.getInstance().doLogout()
                showSessionDialog(ActivityLifecycler.getInstance().currentActivity, appBaseBean.msg)
                false
            }
            else -> false
        }
    }
    
    class RequestAction<T> {
    
        var request: (suspend () -> AppBaseBean<T>?)? = null
        var onSuccess: ((data: AppBaseBean<T>?) -> Unit)? = null
        var onFail: ((e: Exception) -> Unit)? = null
        var showLoading: Boolean = true
        var loadingCancelable = false
    
        fun request(block: suspend () -> AppBaseBean<T>?) {
            request = block
        }
    
        fun onSuccess(block: (data: AppBaseBean<T>?) -> Unit) {
            onSuccess = block
        }
    
        fun onFail(block: (e: Exception) -> Unit) {
            onFail = block
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    使用

    viewModelScope.normalRequest<CarSourceListBean> {
    
                request {
                    val params = HashMap<String, String>()
                    params["req"] = mGson.toJson(carSourceRequestModel)
                    params["sessionId"] = UserSettings.instance(BaseApp.getInstance()).sessionId
                    ServiceFactory.newKTXService(CarListConstants.BASE_URL, CarListApiService::class.java)
                            .getCarSourceList(HeaderUtil.getHeaders(params),params)
                }
    
                onSuccess {
                  // do something...
                }
    
                onFail {
                    // do something...
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    总结起来一句话,让异步代码像同步代码一样展示

  • 相关阅读:
    Yocto - 使用Yocto开发嵌入式Linux系统_11 调试Yocto项目
    【js逆向爬虫】-有道翻译js逆向实战
    Cloudera Manager-6.2.0安装文档
    从求职惨败到阿里35w+offer,这位双非硕的经历太励志!
    基于Springboot的漫画网站springboot022
    精神心理科医生:抑郁症正在好转的5种表现
    【nginx代理postgresql】
    数字化助力生产管理:报工与跟踪管理系统
    【数据集的制作】VOC2007数据集格式的转换(voc2yolo)与划分
    删除集合中的指定元素A.discard(B)A.remove(B)
  • 原文地址:https://blog.csdn.net/one1go/article/details/125470487