launch、async 可以创建、启动新的协程,那么协程到底是如何创建的?
- runBlocking {
- println(Thread.currentThread().name)
- launch {
- println(Thread.currentThread().name)
- delay(100L)
- }
- Thread.sleep(1000L)
- }
-
-
- Log
- main @coroutine#1
- main @coroutine#2
-
- Process finished with exit code 0
runBlocking{} 启动了第一个协程,launch{} 启动了第二个协程。
-
-
- public fun
(suspend () -> T).createCoroutine( - completion: Continuation
- ): Continuation<Unit> =
- SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
-
- public fun
(suspend () -> T).startCoroutine( - completion: Continuation
- ) {
- createCoroutineUnintercepted(completion).intercepted().resume(Unit)
- }
createCoroutine{}、startCoroutine{}就是 Kotlin 协程当中最基础的两个创建协程的 API。启动协程有三种常见的方式:launch、runBlocking、async。它们其实属于协程中间层提供的 API,而它们的底层都调用了“基础层”的协程 API。
createCoroutine{}、startCoroutine{}是扩展函数,其扩展接收者类型是一个函数类型:suspend () -> T,代表了“无参数,返回值为 T 的挂起函数或者 Lambda”。而对于函数本身,它们两个都接收一个 Continuation
- val block = suspend {
- println("Hello")
- delay(1000L)
- println("World!")
- "Result"
- }
-
- fun testLaunch2() {
- val continuation = object : Continuation
{ - override val context: CoroutineContext
- get() = EmptyCoroutineContext
-
- override fun resumeWith(result: Result<String>) {
- println("Result:" + result.getOrNull())
- }
- }
- block.startCoroutine(continuation)
- }
-
- fun main() {
- testLaunch2()
- Thread.sleep(2000L)
- }
-
- Log
- Hello
- World!
- Result:Result
-
- Process finished with exit code 0
类型为 suspend () -> T的函数或者 Lambda 表达式可以用 block.startCoroutine() 来启动协程了。
Continuation 有两种用法,一种是在实现挂起函数的时候,用于传递挂起函数的执行结果;另一种是在调用挂起函数的时候,以匿名内部类的方式,用于接收挂起函数的执行结果。
使用 createCoroutine() 这个方法其实上面代码的逻辑:
- fun testLaunch3() {
- val continuation = object : Continuation
{ - override val context: CoroutineContext
- get() = EmptyCoroutineContext
-
- override fun resumeWith(result: Result<String>) {
- println("Result:" + result.getOrNull())
- }
- }
- val coroutinue = block.createCoroutine(continuation)
- coroutinue.resume(Unit)
- }
-
-
- val block = suspend {
- println("Hello")
- delay(1000L)
- println("World!")
- "Result"
- }
-
- fun main() {
- testLaunch3()
- Thread.sleep(2000L)
- }
-
- Log
- Hello
- World!
- Result:Result
-
- Process finished with exit code 0
createCoroutine() 创建一个协程,先不启动。调用 resume() 才能启动。 createCoroutine()、startCoroutine() 的源代码差别也并不大,只是前者没有调用 resume(),而后者调用了 resume()。startCoroutine() 之所以可以创建并同时启动协程的原因就在于,它在源码中直接调用了 resume(Unit)。
将 startCoroutine()转换为Java:
- package com.example.myapplication.testcoroutinue;
-
- import kotlin.Metadata;
- import kotlin.Result;
- import kotlin.ResultKt;
- import kotlin.Unit;
- import kotlin.coroutines.Continuation;
- import kotlin.coroutines.ContinuationKt;
- import kotlin.coroutines.CoroutineContext;
- import kotlin.coroutines.EmptyCoroutineContext;
- import kotlin.coroutines.intrinsics.IntrinsicsKt;
- import kotlin.jvm.functions.Function1;
- import kotlin.jvm.internal.Intrinsics;
- import kotlinx.coroutines.DelayKt;
- import org.jetbrains.annotations.NotNull;
- import org.jetbrains.annotations.Nullable;
-
- @Metadata(
- mv = {1, 6, 0},
- k = 2,
- d1 = {"\u0000\u001e\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0010\u0000\n\u0002\b\u0004\n\u0002\u0010\u0002\n\u0002\b\u0002\u001a\u0006\u0010\b\u001a\u00020\t\u001a\u0006\u0010\n\u001a\u00020\t\",\u0010\u0000\u001a\u0018\b\u0001\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00030\u0002\u0012\u0006\u0012\u0004\u0018\u00010\u00040\u0001ø\u0001\u0000¢\u0006\n\n\u0002\u0010\u0007\u001a\u0004\b\u0005\u0010\u0006\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006\u000b"},
- d2 = {"block", "Lkotlin/Function1;", "Lkotlin/coroutines/Continuation;", "", "", "getBlock", "()Lkotlin/jvm/functions/Function1;", "Lkotlin/jvm/functions/Function1;", "main", "", "testLaunch2", "My_Application.app.main"}
- )
- public final class TestCoroutinue888Kt {
- // Kotlin 为 block 变量生成的静态变量
- @NotNull
- private static final Function1 block;
-
- public static final void main() {
- testLaunch2();
- Thread.sleep(2000L);
- }
-
- // $FF: synthetic method
- public static void main(String[] var0) {
- main();
- }
-
- // Kotlin 为 block 变量生成的静态变量以及方法
-
- @NotNull
- public static final Function1 getBlock() {
- return block;
- }
-
- public static final void testLaunch2() {
- //continuation 变量对应的匿名内部类
-
continuation = new Continuation() { - @NotNull
- public CoroutineContext getContext() {
- return (CoroutineContext)EmptyCoroutineContext.INSTANCE;
- }
-
- public void resumeWith(@NotNull Object result) {
- String var2 = "Result:" + (String)(Result.isFailure-impl(result) ? null : result);
- System.out.println(var2);
- }
- };
- //block.startCoroutine(continuation) 转换成了ContinuationKt.startCoroutine(block, (Continuation)continuation)
- ContinuationKt.startCoroutine(block, (Continuation)continuation);
- }
-
- static {
- //实现了 Continuation 接口
- Function1 var0 = (Function1)(new Function1((Continuation)null) {
- int label;
- //invokeSuspend()为协程状态机逻辑
-
- @Nullable
- public final Object invokeSuspend(@NotNull Object $result) {
- Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
- String var2;
- switch(this.label) {
- case 0:
- ResultKt.throwOnFailure($result);
- var2 = "Hello";
- System.out.println(var2);
- this.label = 1;
- if (DelayKt.delay(1000L, this) == var3) {
- return var3;
- }
- break;
- case 1:
- ResultKt.throwOnFailure($result);
- break;
- default:
- throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
- }
-
- var2 = "World!";
- System.out.println(var2);
- return "Result";
- }
-
- @NotNull
- public final Continuation create(@NotNull Continuation completion) {
- Intrinsics.checkNotNullParameter(completion, "completion");
- Function1 var2 = new
(completion); - return var2;
- }
-
- public final Object invoke(Object var1) {
- return ((
)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE); - }
- });
- block = var0;
- }
- }
-
-
-
- public fun
(suspend () -> T).startCoroutine( - completion: Continuation
- ) {
- createCoroutineUnintercepted(completion).intercepted().resume(Unit)
- }
在 startCoroutine() 当中,首先会调用 createCoroutineUnintercepted() 方法。
-
- public expect fun
(suspend () -> T).createCoroutineUnintercepted( - completion: Continuation
- ): Continuation<Unit>
代码中的 expect,一种声明,由于 Kotlin 是面向多个平台的,具体的实现,就需要在特定的平台实现。
-
- public actual fun
(suspend () -> T).createCoroutineUnintercepted( - completion: Continuation
- ): Continuation<Unit> {
- val probeCompletion = probeCoroutineCreated(completion)
- return if (this is BaseContinuationImpl)
- create(probeCompletion)
- else
- createCoroutineFromSuspendFunction(probeCompletion) {
- (this as Function1
, Any?>).invoke(it) - }
- }
actual,代表了 createCoroutineUnintercepted() 在 JVM 平台的实现。
createCoroutineUnintercepted() 是一个扩展函数,this代表了 block 变量。(this is BaseContinuationImpl) 条件为ture,就会调用 create(probeCompletion)。
-
-
-
- public open fun create(completion: Continuation<*>): Continuation<Unit> {
- throw UnsupportedOperationException("create(Continuation) has not been overridden")
- }
在默认情况下,这个 create() 方法是会抛出异常的。
-
- @NotNull
- public final Continuation create(@NotNull Continuation completion) {
- Intrinsics.checkNotNullParameter(completion, "completion");
- Function1 var2 = new
(completion); - return var2;
- }
返回了Continuation 对象。
-
-
- public fun
(suspend () -> T).startCoroutine( - completion: Continuation
- ) {
-
- createCoroutineUnintercepted(completion).intercepted().resume(Unit)
- }
intercepted() 在 JVM 实现如下:
-
-
-
- public actual fun
Continuation .intercepted(): Continuation = - (this as? ContinuationImpl)?.intercepted() ?: this
将 Continuation 强转成了 ContinuationImpl,调用了它的 intercepted()。
ContinuationImpl 的源代码:
-
-
- internal abstract class ContinuationImpl(
- completion: Continuation
?, - private val _context: CoroutineContext?
- ) : BaseContinuationImpl(completion) {
-
- @Transient
- private var intercepted: Continuation
? = null -
- public fun intercepted(): Continuation
= - intercepted
- ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
- .also { intercepted = it }
- }
通过 ContinuationInterceptor,对 Continuation 进行拦截,从而将程序的执行逻辑派发到特定的线程之上。
resume(Unit):
-
-
- public fun
(suspend () -> T).startCoroutine( - completion: Continuation
- ) {
-
- createCoroutineUnintercepted(completion).intercepted().resume(Unit)
- }
resume(Unit),作用其实就相当于启动了协程。
- fun main() {
- testLaunch11()
- Thread.sleep(2000L)
- }
-
- fun testLaunch11() {
- val coroutineScope = CoroutineScope(Job())
- coroutineScope.launch {
- println("Hello")
- delay(1000L)
- println("World!")
- }
- }
-
- Log
- Hello
- World!
-
- Process finished with exit code 0
转Java
- package com.example.myapplication.testcoroutinue;
-
- import kotlin.Metadata;
- import kotlin.ResultKt;
- import kotlin.Unit;
- import kotlin.coroutines.Continuation;
- import kotlin.coroutines.CoroutineContext;
- import kotlin.coroutines.intrinsics.IntrinsicsKt;
- import kotlin.jvm.functions.Function2;
- import kotlin.jvm.internal.Intrinsics;
- import kotlinx.coroutines.BuildersKt;
- import kotlinx.coroutines.CoroutineScope;
- import kotlinx.coroutines.CoroutineScopeKt;
- import kotlinx.coroutines.CoroutineStart;
- import kotlinx.coroutines.DelayKt;
- import kotlinx.coroutines.Job;
- import kotlinx.coroutines.JobKt;
- import org.jetbrains.annotations.NotNull;
- import org.jetbrains.annotations.Nullable;
-
- @Metadata(
- mv = {1, 6, 0},
- k = 2,
- d1 = {"\u0000\n\n\u0000\n\u0002\u0010\u0002\n\u0002\b\u0002\u001a\u0006\u0010\u0000\u001a\u00020\u0001\u001a\u0006\u0010\u0002\u001a\u00020\u0001¨\u0006\u0003"},
- d2 = {"main", "", "testLaunch11", "My_Application.app.main"}
- )
- public final class TestCoroutinue999Kt {
- public static final void main() {
- testLaunch11();
- Thread.sleep(2000L);
- }
-
- // $FF: synthetic method
- public static void main(String[] var0) {
- main();
- }
-
- public static final void testLaunch11() {
- CoroutineScope coroutineScope = CoroutineScopeKt.CoroutineScope((CoroutineContext)JobKt.Job$default((Job)null, 1, (Object)null));
- //对应 launch 当中的 Lambda。
- BuildersKt.launch$default(coroutineScope, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
- int label;
-
- @Nullable
- public final Object invokeSuspend(@NotNull Object $result) {
- Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
- String var2;
- switch(this.label) {
- case 0:
- ResultKt.throwOnFailure($result);
- var2 = "Hello";
- System.out.println(var2);
- this.label = 1;
- if (DelayKt.delay(1000L, this) == var3) {
- return var3;
- }
- break;
- case 1:
- ResultKt.throwOnFailure($result);
- break;
- default:
- throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
- }
-
- var2 = "World!";
- System.out.println(var2);
- return Unit.INSTANCE;
- }
-
- @NotNull
- public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
- Intrinsics.checkNotNullParameter(completion, "completion");
- Function2 var3 = new
(completion); - return var3;
- }
-
- public final Object invoke(Object var1, Object var2) {
- return ((
)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); - }
- }), 3, (Object)null);
- }
- }
launch源码
-
- public fun CoroutineScope.launch(
- context: CoroutineContext = EmptyCoroutineContext,
- start: CoroutineStart = CoroutineStart.DEFAULT,
- block: suspend CoroutineScope.() -> Unit
- ): Job {
- //launch 会根据传入的 CoroutineContext 创建出新的 Context。
- val newContext = newCoroutineContext(context)
- //launch 会根据传入的启动模式来创建对应的协程对象。这里有两种,一种是标准的,一种是懒加载的。
- val coroutine = if (start.isLazy)
- LazyStandaloneCoroutine(newContext, block) else
- StandaloneCoroutine(newContext, active = true)
- //启动协程。
- coroutine.start(start, coroutine, block)
- return coroutine
- }
coroutine.start() :
-
- public abstract class AbstractCoroutine<in T>(
- parentContext: CoroutineContext,
- initParentJob: Boolean,
- active: Boolean
- ) : JobSupport(active), Job, Continuation
, CoroutineScope { -
-
-
- public fun
start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { - start(block, receiver, this)
- }
- }
AbstractCoroutine.kt 对应协程的抽象逻辑。AbstractCoroutine 的 start() 方法,用于启动协程。
-
- public enum class CoroutineStart {
- public operator fun
invoke(block: suspend () -> T, completion: Continuation<T>): Unit = - when (this) {
- DEFAULT -> block.startCoroutineCancellable(completion)
- ATOMIC -> block.startCoroutine(completion)
- UNDISPATCHED -> block.startCoroutineUndispatched(completion)
- LAZY -> Unit // will start lazily
- }
- }
start(block, receiver, this),进入 CoroutineStart.invoke()。
invoke() 方法当中,根据 launch 传入的启动模式,以不同的方式启动协程。当启动模式是 ATOMIC 的时候,就会调用 block.startCoroutine(completion)。startCoroutineUndispatched(completion) 和 startCoroutineCancellable(completion),只是在 startCoroutine() 的基础上增加了一些额外的功能而已。前者代表启动协程以后就不会被分发,后者代表启动以后可以响应取消。
startCoroutineCancellable(completion)
-
- public fun
(suspend () -> T).startCoroutineCancellable(completion: Continuation): Unit = runSafely(completion) { -
- createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
- }
-
- public actual fun
(suspend () -> T).createCoroutineUnintercepted( - completion: Continuation
- ): Continuation<Unit> {
- val probeCompletion = probeCoroutineCreated(completion)
-
- return if (this is BaseContinuationImpl)
-
- create(probeCompletion)
- else
- createCoroutineFromSuspendFunction(probeCompletion) {
- (this as Function1
, Any?>).invoke(it) - }
- }
startCoroutineCancellable() 的源代码,会调用 createCoroutineUnintercepted(),然后调用 create(probeCompletion),然后最终会调用create() 方法。launch 这个 API,只是对协程的基础元素 startCoroutine() 等方法进行了一些封装而已。