本文已同步发表于我的微信公众号,搜索
代码说
即可关注,欢迎与我沟通交流。
协程由程序自己创建和调度,不需要操作系统调度,所以协程比线程更加轻量。相比于线程的切换,协程切换开销更小,速度更快。
我们知道线程是CPU
调度的基本单位,而协程不能独立存在,必须依赖于线程。在Kotlin
中,Dispatcher(内部是线程池或者Android主线程)
是调度器,可以调度协程运行在一个或多个线程
之中。一个线程中可以有多个协程,同一个协程可以运行在一个线程的不同时刻;多个协程可以运行在一个或多个线程的不同时刻。进程、线程、协程三者的关系:
协程优势:
kotlin
中提供了许多语言功能,避免Java
中最常见的null
空指针等异常Java
,kotlin
可以使用更少的代码实现更多的功能。Java
语言无缝互通。即可以在 kotlin
代码中调用Java
代码,同时也可以在Java
代码中调用kotlin
代码。kotlin
代码本质上也是通过kotlin
编译器编译后生成VM
能识别的字节码。使用看似阻塞式的写法来实现异步功能
。即可以同步的方式去写异步代码,相比于回调方式大幅简化了后台任务管理,例如网络请求、数据库访问等任务的管理。协程在常规函数的基础上添加了两项操作,用于处理长时间运行的任务。在 invoke(或call)
和return
之外,协程添加了 suspend
非阻塞式挂起 和 resume
恢复:
suspend
用于暂停执行当前协程,并保存所有局部变量。resume
用于让已挂起的协程从挂起处恢复执行。注意suspend
函数不能在普通函数中调用,否则会报Suspend function 'xxx' should be called only from a coroutine or another suspend function
的提示;如需调用 suspend
函数,只能从其他 suspend
函数进行调用,或通过使用协程构建器(例如 launch
)来启动新的协程。
suspend
函数相比于普通函数内部多了一个 Continuation
续体实例(Kotlin转成Java代码之后即可看到),suspend
函数中可以调用普通函数,但普通函数却不能调用suspend
函数。
suspend fun fetchDocs() { // Dispatchers.Main
val result = get("https://developer.android.com") // Dispatchers.IO for `get`
show(result) // Dispatchers.Main
}
suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }
suspend
修饰的函数为非阻塞式挂起函数,何为非阻塞式挂起呢?不同于Java
中的Thread.sleep()
会阻塞当前线程,suspend
修饰的函数当遇到启动子线程操作时,会在切线程时将协程挂起并记录当前挂起点,接着主线程暂停了当前协程,但主线程依然可以去执行协程之外逻辑而不会被阻塞;此时协程中启动的子线程也可以继续执行(相当于兵分两路,互不影响),当子线程中的逻辑执行完毕后,会自动从协程的挂起点恢复,这样就可以继续在主线程往下执行了。整体流程如下:执行suspend函数 -> 启动子线程 -> 函数挂起并记录挂起点 -> 协程暂停(主线程依然是活跃的) -> 子线程执行完毕 -> 协程从挂起点恢复
如在上面的示例中,get()
在主线程中调用,内部在它启动网络请求之前挂起协程。虽然协程被挂起,但主线程并没有被阻塞,此时如果主线程中收到其他消息事件依然可以去处理(如点击事件等)。当网络请求完成时,get
会恢复已挂起的协程,继续执行show(result)
方法。这里注意一下,挂起函数并不一定会导致协程挂起,只有在发生异步调用时才会挂起
。
Kotlin
使用 堆栈帧 管理要运行哪个函数以及所有局部变量。挂起协程时,系统会复制并保存当前的堆栈帧以供稍后使用。恢复时,会将堆栈帧从其保存位置复制回来,然后函数再次开始运行。即使代码可能看起来像普通的顺序阻塞请求,协程也能确保网络请求避免阻塞主线程。
CPS变换 (Continuation-Passing-Style Transformation)
是一种编程风格, 用来将内部要执行的逻辑封装到一个闭包里面, 然后再返回给调用者。上一节的例子中,协程就是通过传递 Continuation
来控制异步调用流程的:将函数挂起之后执行的逻辑包装起成一个Continuation
, 里面包含了挂起点信息,这样当协程恢复时就可以在挂起点继续执行。
Continuation
意为续体,只存在于挂起函数中。Kotlin
中当一个普通函数加上suspend
关键字之后,就成为了挂起函数,如:
private suspend fun suspendFuc() {}
我们对此函数进行反编译,查看对应的Java代码为:
private final Object suspendFuc(Continuation $completion) {
return Unit.INSTANCE;
}
可以看到Java
代码中系统帮我们多生成了一个Continuation
类型的参数,看下这个参数内部:
@SinceKotlin("1.3")
public interface Continuation {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result)
}
Continuation
是一个接口类型,表示在返回T类型值的挂起点
之后的延续,其中类型T代表的是原来函数的返回值类型。resumeWith
恢复执行相应协程,并传递一个结果作为最后一个挂起点的返回值。
上述函数中加入一个delay
函数,delay
函数本身也是一个挂起函数:
private suspend fun suspendFuc(): String {
delay(1000)
return "value"
}
经过Tools -> Kotlin -> Show Kotlin Bytecode
反编译查看:
private final Object suspendFuc(Continuation var1) {
Object $continuation;
label20: {
if (var1 instanceof ) {
$continuation = ()var1;
if (((()$continuation).label & Integer.MIN_VALUE) != 0) {
(()$continuation).label -= Integer.MIN_VALUE;
break label20;
}
}
//将要执行的Continuation逻辑传入ContinuationImpl中
$continuation = new ContinuationImpl(var1) {
// $FF: synthetic field
Object result;
int label;
//invokeSuspend()会在恢复协程挂起点时调用
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return CoroutineBaseFragment.this.suspendFuc(this);
}
};
}
Object $result = (()$continuation).result;
//返回此状态意味着函数要被挂起
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
// 状态机逻辑,通过label进行分块执行
switch((()$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
(()$continuation).label = 1;
if (DelayKt.delay(1000L, (Continuation)$continuation) == var4) {
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "value";
}
suspend
挂起函数经过Kotlin
编译器编译之后会进行CPS
变换,并且函数里的逻辑会进行分块执行:分块逻辑数量 = 挂起函数数量 + 1
,上述函数逻辑中,通过switch(label){}
状态机将逻辑分块执行,首先label = 0
,此时会先将label
置为1,接着调用了delay
挂起函数,返回Intrinsics.COROUTINE_SUSPENDED
意味着函数要被挂起;
当函数从协程挂起点恢复执行时,会调用$continuation#invokeSuspend(Object $result)
方法恢复执行,$result
是上一次函数执行的结果,以参数形式带入之后,就可以从挂起点继续执行了。
此时label变为1
,ResultKt.throwOnFailure($result)
中通过 if (value is Result.Failure) throw value.exception
判断上面的分块结果是否有异常,如果有直接抛异常;否则直接break
,执行到最后的return "value"
逻辑,这样整个方法的流程就运行完毕了。
总结:suspend
挂起函数的执行流程就是通过CPS变换 + Continuation + 状态机
来运转的。
我们知道 Android
中的 Context
(子类有Application、Activity、Service
)可以获取应用资源,可以启动Activity
、Service
等。CoroutineContext
意为协程上下文,其作用可以类比 Context
,通过它可以控制协程在哪个线程中执行、捕获协程异常、设置协程名称
等。
public interface CoroutineContext {
//从该上下文中返回具有给定[key]的元素 或 null
public operator fun get(key: Key): E?
//从initial开始累加上下文中的条目,并从左到右对当前累加器值和上下文中的每个元素应用operation
public fun fold(initial: R, operation: (R, Element) -> R): R
//返回一个上下文,其中包含来自此上下文的元素和来自其他上下文context的元素。具有相同键的元素会被覆盖。
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
//删除带有指定[key]的元素。
public fun minusKey(key: Key<*>): CoroutineContext
/**
* Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
*/
public interface Key
//CoroutineContext的一个元素。协程上下文的一个元素本身就是一个单例上下文。
public interface Element : CoroutineContext {
/**
* A key of this coroutine context element.
*/
public val key: Key<*>
public override operator fun get(key: Key): E? =
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null
public override fun fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)
public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
}
}
协程总是会运行在 CoroutineContext
类型的上下文中,其中plus
操作符被重写,多个CoroutineContext
相加时,会生成一个CombinedContext
( 内部可以理解成一个Element
都不同的链表);同时 CombinedContext
拥有 map
索引能力,集合中的每个元素都有一个唯一的 Key
。
Element
定义在CoroutineContext
内部,是它的内部接口。CoroutineContext
使用以下元素集定义协程的行为:
Job
:控制协程的生命周期。CoroutineDispatcher
:将工作分派到适当的线程。CoroutineName
:协程的名称,可用于调试。CoroutineExceptionHandler
:处理未捕获的异常。下面就来详细看看几种元素的使用。
Job
是协程的句柄。使用launch
或async
创建的协程都会返回一个Job
实例,该实例是对应协程的唯一标识并可以管理协程的生命周期。如:
val job1 = GlobalScope.launch { }
val job2 = MainScope().launch { }
override fun onDestroy() {
super.onDestroy()
job1.cancel()
job2.cancel()
}
注:上面的协程启动方式并不推荐在项目中直接使用,因为生命周期比较长,如果没有主动关闭可能就会产生内存泄漏。推荐在ViewModel
中使用viewModelScope
,LifecycleOwner
中使用lifecycleScope
,可以在各自的生命周期中自动关闭协程。
Job
常用的API
:
val job = GlobalScope.launch(
context = Job() + Dispatchers.Main,
start = CoroutineStart.LAZY) { // 逻辑部分
}
- job.start() // 对应 start = CoroutineStart.LAZY
- job.cancelAndJoin() //cancel() + join()
- job.cancel() // 取消
- job.isActive // 协程是否存活
- job.isCancelled // 协程是否被取消
- job.isCompleted //协程是否已经执行完毕
Job
有以下状态:
State | isActive | isCompleted | isCancelled |
---|---|---|---|
New (optional initial state) | false | false | false |
Active (default initial state) | true | false | false |
Completing (transient state) | true | false | false |
Cancelling (transient state) | false | false | true |
Cancelled (final state) | false | true | true |
Completed (final state) | false | true | false |
状态流转:
wait children
+-----+ start +--------+ complete +-------------+ finish +-----------+
| New | -----> | Active | ---------> | Completing | -------> | Completed |
+-----+ +--------+ +-------------+ +-----------+
| cancel / fail |
| +----------------+
| |
V V
+------------+ finish +-----------+
| Cancelling | --------------------------------> | Cancelled |
+------------+ +-----------+
SupervisorJob的使用
val exceptionHandler =
CoroutineExceptionHandler { context, throwable -> log("throwable:$throwable") }
GlobalScope.launch(exceptionHandler) {
//子Job1
launch(SupervisorJob()) {
delay(200)
throw NullPointerException()
}
//子Job2
launch {
delay(300)
log("child2 execute successfully")
}
//子Job3
launch {
delay(400)
log("child3 execute successfully")
}
log("parent execute successfully")
}
注意子Job1
传入的是 SupervisorJob
,当发生异常时,兄弟协程(job2/job3
)不会被取消;如果是默认配置,那么兄弟协程也会被取消,上述代码执行结果:
2022-08-30 23:31:30.142 E/TTT: parent execute successfully
2022-08-30 23:31:30.346 E/TTT: throwable:java.lang.NullPointerException
2022-08-30 23:31:30.446 E/TTT: child2 execute successfully
2022-08-30 23:31:30.545 E/TTT: child3 execute successfully
如果将job1
中的更换为默认时,执行结果为:
2022-08-30 23:31:30.142 E/TTT: parent execute successfully
2022-08-30 23:31:30.346 E/TTT: throwable:java.lang.NullPointerException
可以看到job1
中使用默认设置时,当job1
发生了异常,兄弟协程也会被取消。注意这里job2/job3
可以被取消是因为delay()
中有对取消状态的判断,通知job2/job3取消可以类比Thread.interrupt(),interrupt只是通知线程要中断并设置一个中断状态,最终要不要中断还是线程自己说了算。所以如果改成以下代码,即使job1
使用默认配置,job3
也不会被取消:
//子Job3
launch(Dispatchers.IO) {
Thread.sleep(400)
log("child3 execute successfully")
}
job3
中没有了对协程状态的判断,所以即使job3
被通知要取消协程了,依然会继续执行直到结束。
SuperviorJob源码浅析
public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)
public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)
private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
override fun childCancelled(cause: Throwable): Boolean = false
}
当子协程 job1/job2/job3
启动时,会和协程本身的 Job
形成父子关系。而当子协程抛异常时,父Job
的 childCancelled()
方法会被执行,且默认返回的是 true
,表示取消 父Job
及其所有 子Job
;如果 子Job
中使用的是 SuperviorJob
,childCancelled()
返回的是 false
,表示 父Job
及其未发生异常的 子Job
都不会受影响。
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
@JvmStatic
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
将工作任务分派到适当的线程。
UI
线程中。Dispatchers.Main.immediate
: 如果在UI线程加载,不会做特殊处理;如果是在子线程,会通过Handler
转发到主线程IO密集型
任务,最多提交max(64, CPU核心数)
个任务执行。CPU密集
型任务, CoroutineScheduler
最多有corePoolSize
个线程被创建,corePoolSize
的取值为max(2, CPU核心数)
,即它会尽量的等于CPU
核心数ThreadLocal
保存执行协程时对应的线程,用于恢复协程时在取出对应线程并在其继续执行协程。withContext
withContext()
可以在不引入回调的情况下控制任何代码行的线程池,因此可以将其应用于非常小的函数,例如从数据库中读取数据或执行网络请求。一种不错的做法是使用 withContext()
来确保每个函数都是主线程安全的,这意味着,可以从主线程调用每个函数。这样,调用方就从不需要考虑应该使用哪个线程来执行函数了。
协程的名称,可用于调试。
public interface CoroutineExceptionHandler : CoroutineContext.Element {
public companion object Key : CoroutineContext.Key
//当有未处理的异常发生时,该方法就会执行
public fun handleException(context: CoroutineContext, exception: Throwable)
}
CoroutineExceptionHandler
当有未捕获的异常时就会触发执行。如果不在顶级协程中设置,那么当有异常发生时会导致程序的crash
,可以通过自定义CoroutineExceptionHandler
来拦截异常,如下:
val exceptionHandler =
CoroutineExceptionHandler { context, throwable -> log("throwable:$throwable") }
lifecycleScope.launch(exceptionHandler) { // do something }
自定义CoroutineExceptionHandler
调用的是如下方法:
public inline fun CoroutineExceptionHandler(crossinline handler: (CoroutineContext, Throwable) -> Unit): CoroutineExceptionHandler =
object : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler {
override fun handleException(context: CoroutineContext, exception: Throwable) =
handler.invoke(context, exception)
}
所以当发生异常时,会在handleException()
中执行我们传入的handler(CoroutineContext, Throwable)
方法。
CoroutineExceptionHandler使用示例:
val exceptionHandler =
CoroutineExceptionHandler { context, throwable -> log("parent throwable:$throwable") }
lifecycleScope.launch(exceptionHandler) {
log("parent execute start")
val childExHandler =
CoroutineExceptionHandler { context, throwable -> log("child throwable:$throwable") }
//子协程中如果使用SupervisorJob()、Job(),则异常不会往上传播;否则异常会在顶层协程中处理
val childJob = launch(childExHandler) {
delay(1000)
log("child execute")
throw IllegalArgumentException("error occur")
}
childJob.join()
log("parent execute end")
}
执行结果:
2022-09-04 00:38:51.792 15415-15415/ E/TTT: parent execute start
2022-09-04 00:38:52.796 15415-15415/ E/TTT: child execute
2022-09-04 00:38:52.805 15415-15415/ E/TTT: parent throwable:java.lang.IllegalArgumentException: error occur
可见虽然在子协程中也自定义了CoroutineExceptionHandler
,但是最终子协程中的异常还是在顶层的父协程种处理的,如果就想在子协程中处理异常呢?可以在子协程中加上SupervisorJob()、Job()
,则异常会在子协程中处理而不会往上传播:
//其他内容不变
val childJob = launch(childExHandler + SupervisorJob()) {
//其他内容不变
}
执行结果:
2022-09-04 00:42:48.465 15755-15755/ E/TTT: parent execute start
2022-09-04 00:42:49.468 15755-15755/ E/TTT: child execute
2022-09-04 00:42:49.473 15755-15755/ E/TTT: child throwable:java.lang.IllegalArgumentException: error occur
2022-09-04 00:42:49.474 15755-15755/ E/TTT: parent execute end
可见异常最终是在子协程中处理的,且虽然子协程中发生了异常,父协程依然能执行完毕。
注:如果启动协程的是async
,则CoroutineExceptionHandler
中的handler
方法并不会马上执行,必须调用deffered.await()
时才会执行。
CoroutineScope
会跟踪它使用 launch
或 async
创建的所有协程。CoroutineScope
本身并不运行协程,但是通过CoroutineScope
可以保证对协程进行统一管理,避免发生内存泄漏等,
常见的CoroutineScope
:
job.cancel()
关闭,其生命周期与Application
一致。MainScope
的内部:public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main),可以看到MainScope
其实是通过SupervisorJob() + Dispatchers.Main
自定义的协程作用域,其内部运行在UI线程
且内部异常不会往上传播。GlobalScope
不一样,它会阻塞当前线程直到其内部所有子协程执行结束;LifecycleOwner(androidX中的FragmentActivity、Fragment都实现了该接口)
使用的lifecycleScope
,以及ViewModel
中使用的ViewModelScope
,其内部可以在合适的时机自动关闭协程,从而避免内存泄露的发生。【1】官网:利用 Kotlin 协程提升应用性能
【2】官网:Android 上的 Kotlin 协程
【3】Kotlin挂起函数原理
【4】揭秘Kotlin协程中的CoroutineContext