需要知道的是,因为Java虚拟机不支持协程,所以Kotlin的协程跑在Java还是通过线程来实现的。
通过kotlin编译成的class文件,我们可以看到,协程本质还是通过不同的线程池来实现不同的任务。我们通过Dispatchers类,可以看到协程中出创建的线程池。
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
@JvmStatic
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
跟随源码,我们可以看到MainCoroutineDispatcher的实现
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
}
由以上代码可以得出,Dispatcher.Main实际上就是一个主线程的Handler,把任务切换到主线程是去执行
再看一下Default的实现
private var coroutineScheduler = createScheduler()
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
internal class CoroutineScheduler(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
private val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor,Closeable
可以看到Dispatchers.default本质上是一个自定义的线程池。根据应用场景的不同,和Dispatcher.IO适用于不同的工作
协程通过状态机实现协程的挂起与恢复,以下是一段协程代码
fun onCreate() {
GlobalScope.launch {
val result = getContent()
Log.e("test", "onCreate: result = $result")
}
}
suspend fun getContent(): String {
return withContext(Dispatchers.Default) {
Thread.sleep(3000)
"xxx"
}
}
我们通过tools–>kotlin–>show kotlin bytecode,然后Decompile,可以看到生成的java代码
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
// 状态机状态的值
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
// 挂起状态
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
switch(this.label) {
case 0:
// 首次进来,默认为0
ResultKt.throwOnFailure($result);
GobalTest var4 = GobalTest.this;
// 此时更新状态
this.label = 1;
// 执行挂起函数
var10000 = var4.getContent(this);
// 如果挂起,直接返回挂起值
if (var10000 == var3) {
return var3;
}
break;
case 1:
// 第二次进来时,走这,退出循环
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
// 第二次执行打印语句
String result = (String)var10000;
Log.e("test", "onCreate: result = " + result);
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
以上是onCreate部分,我们可以看到,成员变量label代表状态机的状态,当函数运行到挂起点的时候,label的值会发生变化,如果没有发生挂起,则函数继续执行,如果发生了挂起,则函数就直接return,直到协程恢复,再次调到invokeSuspend方法,此时lebal已经发生了变化,所以直接执行case 1的逻辑。
我们再看下getContent()方法
@Nullable
public final Object getContent(@NotNull Continuation $completion) {
return BuildersKt.withContext((CoroutineContext)Dispatchers.getDefault(), (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object var1) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure(var1);
Thread.sleep(3000L);
return "xxx";
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), $completion);
}
点进withContext方法,我们可以看到他的返回正是COROUTINE_SUSPENDED
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
...
coroutine.getResult()
}
fun getResult(): Any? {
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state.unboxState()
if (state is CompletedExceptionally) throw state.cause
@Suppress("UNCHECKED_CAST")
return state as T
}
那么既然挂起了,何时恢复呢,我们看到BaseContinuationImpl这个类,看下resumewith的方法
internal abstract class BaseContinuationImpl(
// This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
// it has a public getter (since even untrusted code is allowed to inspect its call stack).
// 这个 completion 可能是父协程的协程体
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
// 调用协程体
val outcome = invokeSuspend(param)
// 如果挂起直接返回
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
// 执行恢复逻辑
completion.resumeWith(outcome)
return
}
}
}
}
}
对于getContent方法来讲,没有挂起逻辑,所以执行完invokeSuspend方法后,没有return,当再走到completion.resumeWith方法的时候,getContent的逻辑已经执行完了,接下来就是恢复父协程的操作了、
也就是第二次调用父协程状态机的invokeSuspend方法,此时label已经为1,就执行打印逻辑了