• Kotlin协程 - launch原理 笔记


    一、协程是如何创建的?

    launch、async 可以创建、启动新的协程,那么协程到底是如何创建的?

    1. runBlocking {
    2. println(Thread.currentThread().name)
    3. launch {
    4. println(Thread.currentThread().name)
    5. delay(100L)
    6. }
    7. Thread.sleep(1000L)
    8. }
    9. Log
    10. main @coroutine#1
    11. main @coroutine#2
    12. Process finished with exit code 0

     runBlocking{} 启动了第一个协程,launch{} 启动了第二个协程。

    1.协程启动的基础 API

    1. public fun (suspend () -> T).createCoroutine(
    2. completion: Continuation
    3. ): Continuation<Unit> =
    4. SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
    5. public fun (suspend () -> T).startCoroutine(
    6. completion: Continuation
    7. ) {
    8. createCoroutineUnintercepted(completion).intercepted().resume(Unit)
    9. }

    createCoroutine{}、startCoroutine{}就是 Kotlin 协程当中最基础的两个创建协程的 API。启动协程有三种常见的方式:launch、runBlocking、async。它们其实属于协程中间层提供的 API,而它们的底层都调用了“基础层”的协程 API。

    createCoroutine{}、startCoroutine{}是扩展函数,其扩展接收者类型是一个函数类型:suspend () -> T,代表了“无参数,返回值为 T 的挂起函数或者 Lambda”。而对于函数本身,它们两个都接收一个 Continuation 类型的参数,其中一个函数,还会返回一个 Continuation 类型的返回值。

    1. val block = suspend {
    2. println("Hello")
    3. delay(1000L)
    4. println("World!")
    5. "Result"
    6. }
    7. fun testLaunch2() {
    8. val continuation = object : Continuation {
    9. override val context: CoroutineContext
    10. get() = EmptyCoroutineContext
    11. override fun resumeWith(result: Result<String>) {
    12. println("Result:" + result.getOrNull())
    13. }
    14. }
    15. block.startCoroutine(continuation)
    16. }
    17. fun main() {
    18. testLaunch2()
    19. Thread.sleep(2000L)
    20. }
    21. Log
    22. Hello
    23. World!
    24. Result:Result
    25. Process finished with exit code 0

    类型为 suspend () -> T的函数或者 Lambda 表达式可以用 block.startCoroutine() 来启动协程了。

    Continuation 有两种用法,一种是在实现挂起函数的时候,用于传递挂起函数的执行结果;另一种是在调用挂起函数的时候,以匿名内部类的方式,用于接收挂起函数的执行结果。

    使用 createCoroutine() 这个方法其实上面代码的逻辑:

    1. fun testLaunch3() {
    2. val continuation = object : Continuation {
    3. override val context: CoroutineContext
    4. get() = EmptyCoroutineContext
    5. override fun resumeWith(result: Result<String>) {
    6. println("Result:" + result.getOrNull())
    7. }
    8. }
    9. val coroutinue = block.createCoroutine(continuation)
    10. coroutinue.resume(Unit)
    11. }
    12. val block = suspend {
    13. println("Hello")
    14. delay(1000L)
    15. println("World!")
    16. "Result"
    17. }
    18. fun main() {
    19. testLaunch3()
    20. Thread.sleep(2000L)
    21. }
    22. Log
    23. Hello
    24. World!
    25. Result:Result
    26. Process finished with exit code 0

     createCoroutine() 创建一个协程,先不启动。调用 resume() 才能启动。 createCoroutine()、startCoroutine() 的源代码差别也并不大,只是前者没有调用 resume(),而后者调用了 resume()。startCoroutine() 之所以可以创建并同时启动协程的原因就在于,它在源码中直接调用了 resume(Unit)。

    将 startCoroutine()转换为Java:

    1. package com.example.myapplication.testcoroutinue;
    2. import kotlin.Metadata;
    3. import kotlin.Result;
    4. import kotlin.ResultKt;
    5. import kotlin.Unit;
    6. import kotlin.coroutines.Continuation;
    7. import kotlin.coroutines.ContinuationKt;
    8. import kotlin.coroutines.CoroutineContext;
    9. import kotlin.coroutines.EmptyCoroutineContext;
    10. import kotlin.coroutines.intrinsics.IntrinsicsKt;
    11. import kotlin.jvm.functions.Function1;
    12. import kotlin.jvm.internal.Intrinsics;
    13. import kotlinx.coroutines.DelayKt;
    14. import org.jetbrains.annotations.NotNull;
    15. import org.jetbrains.annotations.Nullable;
    16. @Metadata(
    17. mv = {1, 6, 0},
    18. k = 2,
    19. d1 = {"\u0000\u001e\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0010\u0000\n\u0002\b\u0004\n\u0002\u0010\u0002\n\u0002\b\u0002\u001a\u0006\u0010\b\u001a\u00020\t\u001a\u0006\u0010\n\u001a\u00020\t\",\u0010\u0000\u001a\u0018\b\u0001\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00030\u0002\u0012\u0006\u0012\u0004\u0018\u00010\u00040\u0001ø\u0001\u0000¢\u0006\n\n\u0002\u0010\u0007\u001a\u0004\b\u0005\u0010\u0006\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006\u000b"},
    20. d2 = {"block", "Lkotlin/Function1;", "Lkotlin/coroutines/Continuation;", "", "", "getBlock", "()Lkotlin/jvm/functions/Function1;", "Lkotlin/jvm/functions/Function1;", "main", "", "testLaunch2", "My_Application.app.main"}
    21. )
    22. public final class TestCoroutinue888Kt {
    23. // Kotlin 为 block 变量生成的静态变量
    24. @NotNull
    25. private static final Function1 block;
    26. public static final void main() {
    27. testLaunch2();
    28. Thread.sleep(2000L);
    29. }
    30. // $FF: synthetic method
    31. public static void main(String[] var0) {
    32. main();
    33. }
    34. // Kotlin 为 block 变量生成的静态变量以及方法
    35. @NotNull
    36. public static final Function1 getBlock() {
    37. return block;
    38. }
    39. public static final void testLaunch2() {
    40. //continuation 变量对应的匿名内部类
    41. continuation = new Continuation() {
    42. @NotNull
    43. public CoroutineContext getContext() {
    44. return (CoroutineContext)EmptyCoroutineContext.INSTANCE;
    45. }
    46. public void resumeWith(@NotNull Object result) {
    47. String var2 = "Result:" + (String)(Result.isFailure-impl(result) ? null : result);
    48. System.out.println(var2);
    49. }
    50. };
    51. //block.startCoroutine(continuation) 转换成了ContinuationKt.startCoroutine(block, (Continuation)continuation)
    52. ContinuationKt.startCoroutine(block, (Continuation)continuation);
    53. }
    54. static {
    55. //实现了 Continuation 接口
    56. Function1 var0 = (Function1)(new Function1((Continuation)null) {
    57. int label;
    58. //invokeSuspend()为协程状态机逻辑
    59. @Nullable
    60. public final Object invokeSuspend(@NotNull Object $result) {
    61. Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    62. String var2;
    63. switch(this.label) {
    64. case 0:
    65. ResultKt.throwOnFailure($result);
    66. var2 = "Hello";
    67. System.out.println(var2);
    68. this.label = 1;
    69. if (DelayKt.delay(1000L, this) == var3) {
    70. return var3;
    71. }
    72. break;
    73. case 1:
    74. ResultKt.throwOnFailure($result);
    75. break;
    76. default:
    77. throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    78. }
    79. var2 = "World!";
    80. System.out.println(var2);
    81. return "Result";
    82. }
    83. @NotNull
    84. public final Continuation create(@NotNull Continuation completion) {
    85. Intrinsics.checkNotNullParameter(completion, "completion");
    86. Function1 var2 = new (completion);
    87. return var2;
    88. }
    89. public final Object invoke(Object var1) {
    90. return (()this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
    91. }
    92. });
    93. block = var0;
    94. }
    95. }

     

    1. public fun (suspend () -> T).startCoroutine(
    2. completion: Continuation
    3. ) {
    4. createCoroutineUnintercepted(completion).intercepted().resume(Unit)
    5. }

    在 startCoroutine() 当中,首先会调用 createCoroutineUnintercepted() 方法。

    1. public expect fun (suspend () -> T).createCoroutineUnintercepted(
    2. completion: Continuation
    3. ): Continuation<Unit>

     代码中的 expect,一种声明,由于 Kotlin 是面向多个平台的,具体的实现,就需要在特定的平台实现。

    1. public actual fun (suspend () -> T).createCoroutineUnintercepted(
    2. completion: Continuation
    3. ): Continuation<Unit> {
    4. val probeCompletion = probeCoroutineCreated(completion)
    5. return if (this is BaseContinuationImpl)
    6. create(probeCompletion)
    7. else
    8. createCoroutineFromSuspendFunction(probeCompletion) {
    9. (this as Function1, Any?>).invoke(it)
    10. }
    11. }

      actual,代表了 createCoroutineUnintercepted() 在 JVM 平台的实现。

    createCoroutineUnintercepted() 是一个扩展函数,this代表了 block 变量。(this is BaseContinuationImpl) 条件为ture,就会调用 create(probeCompletion)。

    1. public open fun create(completion: Continuation<*>): Continuation<Unit> {
    2. throw UnsupportedOperationException("create(Continuation) has not been overridden")
    3. }

    在默认情况下,这个 create() 方法是会抛出异常的。

    1. @NotNull
    2. public final Continuation create(@NotNull Continuation completion) {
    3. Intrinsics.checkNotNullParameter(completion, "completion");
    4. Function1 var2 = new (completion);
    5. return var2;
    6. }

    返回了Continuation 对象。

    1. public fun (suspend () -> T).startCoroutine(
    2. completion: Continuation
    3. ) {
    4. createCoroutineUnintercepted(completion).intercepted().resume(Unit)
    5. }

     intercepted() 在 JVM 实现如下:

    1. public actual fun Continuation.intercepted(): Continuation =
    2. (this as? ContinuationImpl)?.intercepted() ?: this

    将 Continuation 强转成了 ContinuationImpl,调用了它的 intercepted()。

    ContinuationImpl 的源代码:

    1. internal abstract class ContinuationImpl(
    2. completion: Continuation?,
    3. private val _context: CoroutineContext?
    4. ) : BaseContinuationImpl(completion) {
    5. @Transient
    6. private var intercepted: Continuation? = null
    7. public fun intercepted(): Continuation =
    8. intercepted
    9. ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
    10. .also { intercepted = it }
    11. }

     

    通过 ContinuationInterceptor,对 Continuation 进行拦截,从而将程序的执行逻辑派发到特定的线程之上。

     resume(Unit):

    1. public fun (suspend () -> T).startCoroutine(
    2. completion: Continuation
    3. ) {
    4. createCoroutineUnintercepted(completion).intercepted().resume(Unit)
    5. }

      resume(Unit),作用其实就相当于启动了协程。

    二、launch 是如何启动协程的?

    1. fun main() {
    2. testLaunch11()
    3. Thread.sleep(2000L)
    4. }
    5. fun testLaunch11() {
    6. val coroutineScope = CoroutineScope(Job())
    7. coroutineScope.launch {
    8. println("Hello")
    9. delay(1000L)
    10. println("World!")
    11. }
    12. }
    13. Log
    14. Hello
    15. World!
    16. Process finished with exit code 0

    转Java

    1. package com.example.myapplication.testcoroutinue;
    2. import kotlin.Metadata;
    3. import kotlin.ResultKt;
    4. import kotlin.Unit;
    5. import kotlin.coroutines.Continuation;
    6. import kotlin.coroutines.CoroutineContext;
    7. import kotlin.coroutines.intrinsics.IntrinsicsKt;
    8. import kotlin.jvm.functions.Function2;
    9. import kotlin.jvm.internal.Intrinsics;
    10. import kotlinx.coroutines.BuildersKt;
    11. import kotlinx.coroutines.CoroutineScope;
    12. import kotlinx.coroutines.CoroutineScopeKt;
    13. import kotlinx.coroutines.CoroutineStart;
    14. import kotlinx.coroutines.DelayKt;
    15. import kotlinx.coroutines.Job;
    16. import kotlinx.coroutines.JobKt;
    17. import org.jetbrains.annotations.NotNull;
    18. import org.jetbrains.annotations.Nullable;
    19. @Metadata(
    20. mv = {1, 6, 0},
    21. k = 2,
    22. d1 = {"\u0000\n\n\u0000\n\u0002\u0010\u0002\n\u0002\b\u0002\u001a\u0006\u0010\u0000\u001a\u00020\u0001\u001a\u0006\u0010\u0002\u001a\u00020\u0001¨\u0006\u0003"},
    23. d2 = {"main", "", "testLaunch11", "My_Application.app.main"}
    24. )
    25. public final class TestCoroutinue999Kt {
    26. public static final void main() {
    27. testLaunch11();
    28. Thread.sleep(2000L);
    29. }
    30. // $FF: synthetic method
    31. public static void main(String[] var0) {
    32. main();
    33. }
    34. public static final void testLaunch11() {
    35. CoroutineScope coroutineScope = CoroutineScopeKt.CoroutineScope((CoroutineContext)JobKt.Job$default((Job)null, 1, (Object)null));
    36. //对应 launch 当中的 Lambda。
    37. BuildersKt.launch$default(coroutineScope, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
    38. int label;
    39. @Nullable
    40. public final Object invokeSuspend(@NotNull Object $result) {
    41. Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    42. String var2;
    43. switch(this.label) {
    44. case 0:
    45. ResultKt.throwOnFailure($result);
    46. var2 = "Hello";
    47. System.out.println(var2);
    48. this.label = 1;
    49. if (DelayKt.delay(1000L, this) == var3) {
    50. return var3;
    51. }
    52. break;
    53. case 1:
    54. ResultKt.throwOnFailure($result);
    55. break;
    56. default:
    57. throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    58. }
    59. var2 = "World!";
    60. System.out.println(var2);
    61. return Unit.INSTANCE;
    62. }
    63. @NotNull
    64. public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
    65. Intrinsics.checkNotNullParameter(completion, "completion");
    66. Function2 var3 = new (completion);
    67. return var3;
    68. }
    69. public final Object invoke(Object var1, Object var2) {
    70. return (()this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
    71. }
    72. }), 3, (Object)null);
    73. }
    74. }

    launch源码

    1. public fun CoroutineScope.launch(
    2. context: CoroutineContext = EmptyCoroutineContext,
    3. start: CoroutineStart = CoroutineStart.DEFAULT,
    4. block: suspend CoroutineScope.() -> Unit
    5. ): Job {
    6. //launch 会根据传入的 CoroutineContext 创建出新的 Context。
    7. val newContext = newCoroutineContext(context)
    8. //launch 会根据传入的启动模式来创建对应的协程对象。这里有两种,一种是标准的,一种是懒加载的。
    9. val coroutine = if (start.isLazy)
    10. LazyStandaloneCoroutine(newContext, block) else
    11. StandaloneCoroutine(newContext, active = true)
    12. //启动协程。
    13. coroutine.start(start, coroutine, block)
    14. return coroutine
    15. }

      coroutine.start() :

    1. public abstract class AbstractCoroutine<in T>(
    2. parentContext: CoroutineContext,
    3. initParentJob: Boolean,
    4. active: Boolean
    5. ) : JobSupport(active), Job, Continuation, CoroutineScope {
    6. public fun start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    7. start(block, receiver, this)
    8. }
    9. }

    AbstractCoroutine.kt 对应协程的抽象逻辑。AbstractCoroutine 的 start() 方法,用于启动协程。 

    1. public enum class CoroutineStart {
    2. public operator fun invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
    3. when (this) {
    4. DEFAULT -> block.startCoroutineCancellable(completion)
    5. ATOMIC -> block.startCoroutine(completion)
    6. UNDISPATCHED -> block.startCoroutineUndispatched(completion)
    7. LAZY -> Unit // will start lazily
    8. }
    9. }

    start(block, receiver, this),进入 CoroutineStart.invoke()。

     invoke() 方法当中,根据 launch 传入的启动模式,以不同的方式启动协程。当启动模式是 ATOMIC 的时候,就会调用 block.startCoroutine(completion)。startCoroutineUndispatched(completion) 和 startCoroutineCancellable(completion),只是在 startCoroutine() 的基础上增加了一些额外的功能而已。前者代表启动协程以后就不会被分发,后者代表启动以后可以响应取消。

    startCoroutineCancellable(completion)

    1. public fun (suspend () -> T).startCoroutineCancellable(completion: Continuation): Unit = runSafely(completion) {
    2. createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
    3. }
    4. public actual fun (suspend () -> T).createCoroutineUnintercepted(
    5. completion: Continuation
    6. ): Continuation<Unit> {
    7. val probeCompletion = probeCoroutineCreated(completion)
    8. return if (this is BaseContinuationImpl)
    9. create(probeCompletion)
    10. else
    11. createCoroutineFromSuspendFunction(probeCompletion) {
    12. (this as Function1, Any?>).invoke(it)
    13. }
    14. }

    startCoroutineCancellable() 的源代码,会调用 createCoroutineUnintercepted(),然后调用 create(probeCompletion),然后最终会调用create() 方法。launch 这个 API,只是对协程的基础元素 startCoroutine() 等方法进行了一些封装而已。

  • 相关阅读:
    若依vue中字典Dict插件的研究
    利用PyTorch训练模型识别数字+英文图片验证码
    做SaaS的程序员们,是时候关注企业架构了
    java计算机毕业设计进出货管理系统源码+mysql数据库+系统+lw文档+部署
    NSTextField如何实现字体居中
    【开源】历史学习网站 JAVA+Vue.js+SpringBoot+MySQL
    万界星空科技MES系统中的车间管理的作用
    Himall商城Web帮助类获得请求客户端的操作系统名称、判断是否是浏览器请求、是否是移动设备请求、判断是否是搜索引擎爬虫请求
    java牛客笔试题练习日记 - 集合篇
    微信小程序获取用户信息
  • 原文地址:https://blog.csdn.net/zhangying1994/article/details/127880236