• 【对比Java学Kotlin】协程-创建和取消



    v1.6 Kotlin 协程全景图:
    在这里插入图片描述

    一、创建协程

    截至 2022.05 月,Java 尚未在正式版中支持协程,而是通过织布机计划实验性的支持了虚拟线程,作用类似于协程。
    在 Java 里面,我们可以通过 Thread 类来创建一个线程:

    
    
    • 1

    Kotlin 中,我们怎么创建一个协程呢?
    有如下方法可以创建协程:runBlocking()、launch()、async()、produce()、actor(),我们一一介绍。

    launch

    我们先来创建一个最简单的协程:

    import kotlinx.coroutines.GlobalScope
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    
    fun main() {
        val job : Job = GlobalScope.launch {
            delay(1000)
            println("World!")
        }
        print("Hello, ")
        Thread.sleep(1500)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    我们逐行解读代码。

    首先是 GlobalScope。GlobalScope 是协程作用域(CoroutineScope)的一种。每个协程必须归属于某个 CoroutineScope,称为“结构性并发”(Structured Concurrency),不仅 Kotlin,Swift 也通过 Task 支持了结构性并发。

    有了 CoroutineScope,我们可以很方便的对作用域下注册的所有协程进行统一管理,比如将其全部取消执行等。除了 GlobalScope,还有为安卓设计的 MainScope 以及用 CoroutineScope.coroutineScope() 构造的自定义作用域。借助 MainScope,我们可以在 Activity 销毁时取消执行协程,从而避免内存泄漏。

    launch 是 CoroutineScope 的扩展函数,其完整的声明是:

    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
    
    • 1
    • 2
    • 3
    • 4
    • 5

    launch() 是最常见的创建协程的方法,返回一个 Job,可以通过 Job.cancel() 来终止该 Job 下所有的协程,其效果跟 CoroutineScope 是一样的,其实 CoroutineScope.cancel() 也是委托给 Job.cancel() 来实现的。
    通过 launch() 我们就启动了一个协程,虽然里面只有两行代码:

        val job : Job = GlobalScope.launch {
            delay(1000)
            println("World!")
        }
    
    • 1
    • 2
    • 3
    • 4

    为了方便组织代码,我们可以将上述两行代码封装成一个方法。注意,如果该方法是耗时方法,则需要在前面加上 suspend 关键字:

    fun main() {
        val job : Job = GlobalScope.launch {
            printDelayedWorld()
        }
        print("Hello, ")
        Thread.sleep(1500)
    }
    
    suspend fun printDelayedWorld() {
        delay(1000)
        println("World!")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    runBlocking

    最后需要注意的是 Thread.sleep(1500) 这行代码:

    fun main() {
        val job : Job = GlobalScope.launch {
            delay(1000)
            println("World!")
        }
        print("Hello, ")
        Thread.sleep(1500)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    这行代码是必现要有的,如果没有这行代码,上面的程序只能打印出 “Hello, ”,而无法打印出“World!”,因为在执行完print("Hello, ")之后,整个进程就结束了,里面的所有线程也就被强制结束了,运行在线程之上的协程也就无法执行了。我们可以将上述代码进行如下改写,也能达到相同的目的:

    fun main() = runBlocking {
        val job : Job = GlobalScope.launch {
            delay(1000)
            println("World!")
        }
        print("Hello, ")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    async & await

    除了 launch(),我们还有其他协程构造器,比如 async()、produce()、actor(),其中 async() 方法不会立即启动一个协程,而是在调用 await() 方法时才会启动,需要注意的是 await() 是同步的,比如下面代码中 println("Completed in $time ms") 必须等待2个 await() 执行完毕后再执行:

    import kotlinx.coroutines.*
    import kotlin.system.measureTimeMillis
    
    fun main() = runBlocking(CoroutineName("main")) {
        val time = measureTimeMillis {
            val one = async(CoroutineName("v1")) { one() }
            val two = async(CoroutineName("v2")) { two() }
            log("launched two async")
            log("the answer is ${one.await() + two.await()}")
        }
    
        println("Completed in $time ms")
    }
    
    suspend fun one(): Int {
        delay(1000)
        log("Computing v1")
        return 19
    }
    
    suspend fun two(): Int {
        delay(500)
        log("Computing v2")
        return 10
    }
    
    fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    运行结果:

    [main]launched two async
    [main]Computing v1
    [main]Computing v1
    [main]the answer is 29
    Completed in 1020 ms
    
    • 1
    • 2
    • 3
    • 4
    • 5

    async() 构造器返回值类型是 DeferredDeferredJob 的子类。

    然后是 delay() 方法。这是一个非阻塞的方法。非阻塞的意思是不会让线程由 Run 状态转入 Block 状态,该线程可以继续执行其他代码。相比之下,Thread.sleep() 则是阻塞方法,会使线程处于阻塞状态,无法继续执行。

    需要注意的是,GlobalScope 的生命周期与应用相同,一般不建议使用,因为很容易因遗忘cancel()操作而导致内存泄漏。可以使用自定义的作用域:

    fun main() = runBlocking {
        coroutineScope {
            launch {}
            launch {}
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    看到 async() 和 await(),让人不免联想到其他语言中的 async 和 await 关键字,比如 JavaScript、Dart、Swift 和 C#。我们简单比较下二者的异同:

    • Kotlin 的 async & await 是以扩展方法的形式存在的,其他语言是以关键字的形式存在;
    • Kotlin 的 await() 返回的是一个 Deferred 类型,这与 Js 的 Promise、Dart 的 Future 和 Swift 的 Task 是类似的,执行到 await 都会挂起。

    二、取消协程

    取消正在运行中的协程的方法有:cancel,withTimeout,抛异常。

    cancel()

    我们可以调用 CoroutineScope.cancel() 来取消该作用域下的所有协程,包括子孙协程。我们在安卓开发中常见的 mainScope 就是采用的这种方式:

      class MyAndroidActivity {
          private val scope = MainScope()
     
          override fun onDestroy() {
              super.onDestroy()
              scope.cancel()
          }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    实际上 CoroutineScope.cancel() 内部调用的是 Job 的 cancel() 实现的:

    /**
     * Cancels this scope, including its job and all its children with an optional cancellation [cause].
     * A cause can be used to specify an error message or to provide other details on
     * a cancellation reason for debugging purposes.
     * Throws [IllegalStateException] if the scope does not have a job in it.
     */
    public fun CoroutineScope.cancel(cause: CancellationException? = null) {
        val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
        job.cancel(cause)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    如注释所述,如果该作用域没有 Job,则抛出 IllegalStateException 异常。

    cancel() 方法执行之后 Job 内部发生了什么呢?具体可以看如下状态机:
    在这里插入图片描述
    在这里插入图片描述

    cancel() 方法只对suspend()方法有效。因为 suspend() 方法执行时会先检查该作用域或Job是否被取消,如果被取消就不再往下执行了。而非 suspend() 方法因为没有这种功能,所以是无法生效的。比如:

    import kotlinx.coroutines.*
    
    fun main() = runBlocking {
        val job = launch(Dispatchers.Default) {
            try {
                println("coroutine started")
                delay(1000)
                println("coroutine completed")
            } catch (e: Exception) {
                e.printStackTrace()
            } finally {
                println("coroutine finally")
            }
        }
    
        delay(500)
    
        println("coroutine canceled")
        job.cancel(CancellationException("i am canceled"))
        job.join()
    
        println("coroutine ended")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    join() 方法的作用是等待所有协程执行完成。
    上述代码的执行结果如下:

    coroutine started
    coroutine canceled
    coroutine finally
    coroutine ended
    java.util.concurrent.CancellationException: i am canceled
    	at coroutine.basics.CancelKt$main$1.invokeSuspend(cancel.kt:24)
    	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    	at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
    	at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:166)
    	at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
    	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
    	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
    	at kotlinx.coroutines.CancellableContinuationImpl.resumeUndispatched(CancellableContinuationImpl.kt:518)
    	at kotlinx.coroutines.EventLoopImplBase$DelayedResumeTask.run(EventLoop.common.kt:489)
    	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
    	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
    	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
    	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
    	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
    	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
    	at coroutine.basics.CancelKt.main(cancel.kt:5)
    	at coroutine.basics.CancelKt.main(cancel.kt)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    协程被取消成功,被取消的时候会抛出 CancellationException。之所以能被取消,是因为协程代码块里面有 delay() 这个 suspend 方法,如果我们把 delay() 换成非 suspend 方法 Thread.sleep(),能被取消成功吗?我们试验下:

    fun main() = runBlocking {
        val job = launch(Dispatchers.Default) {
            try {
                println("coroutine started")
                Thread.sleep(1000) // 注意此处不再是 suspend 方法 delay()
                println("coroutine completed")
            } catch (e: Exception) {
                e.printStackTrace()
            } finally {
                println("coroutine finally")
            }
        }
    
        delay(500)
    
        println("coroutine canceled")
        job.cancel(CancellationException("i am canceled"))
        job.join()
    
        println("coroutine ended")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    输出结果如下:

    coroutine started
    coroutine canceled
    coroutine completed
    coroutine finally
    coroutine ended
    
    • 1
    • 2
    • 3
    • 4
    • 5

    可以看出,协程没有被取消。如果我们想不包含suspend方法的协程被取消,可以手动检查下当前协程的状态,如果 isActive 是 true 才往下执行,否则就结束掉:

    fun main() = runBlocking {
        val job = launch(Dispatchers.Default) {
            try {
                println("coroutine started")
                Thread.sleep(1000)
                if (isActive) {
                    println("coroutine completed")
                }
            } catch (e: Exception) {
                e.printStackTrace()
            } finally {
                println("coroutine finally")
            }
        }
    
        delay(500)
    
        println("coroutine canceled")
        job.cancel(CancellationException("i am canceled"))
        job.join()
    
        println("coroutine ended")
    }
    输出如下:
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    coroutine started
    coroutine canceled
    coroutine finally
    coroutine ended

    coroutine started
    coroutine canceled
    coroutine completed
    coroutine finally
    coroutine ended
    
    • 1
    • 2
    • 3
    • 4
    • 5

    需要注意的是,如果协程已经被取消了,在 finally 代码里面使用 suspend 方法是不会执行的:

    import kotlinx.coroutines.*
    
    fun main() = runBlocking {
        val job = launch(Dispatchers.Default) {
            try {
                println("coroutine started")
                delay(1000)
            } catch (e: Exception) {
                e.printStackTrace()
            } finally {
                println("running finally")
                delay(1000)
                println("coroutine finally")
            }
        }
    
        delay(500)
    
        println("coroutine canceled")
        job.cancelAndJoin()
        println("coroutine ended")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    输出如下:

    coroutine started
    coroutine canceled
    running finally
    coroutine ended
    kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@7ebf7c43
    
    • 1
    • 2
    • 3
    • 4
    • 5

    可以看出对应的这两行代码都没有执行:

                delay(1000)
                println("coroutine finally")
    
    • 1
    • 2

    如果我们想让上述两行代码执行,可以使用 withContext(NonCancellable)

    package coroutine.basics
    
    import kotlinx.coroutines.*
    
    fun main() = runBlocking {
        val job = launch(Dispatchers.Default) {
            try {
                println("coroutine started")
                delay(1000)
            } catch (e: Exception) {
                e.printStackTrace()
            } finally {
                withContext(NonCancellable) {
                    println("running finally")
                    delay(1000)
                    println("coroutine finally")
                }
            }
        }
    
        delay(500)
    
        println("coroutine canceled")
        job.cancelAndJoin()
    
        println("coroutine ended")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    上述代码输出如下:

    coroutine started
    coroutine canceled
    running finally
    kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@70c1cc32
    coroutine finally
    coroutine ended
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    withTimeout

    除了 cancel() 方法,我们还可以使用 withTimeout() 来取消协程:

    fun main() = runBlocking {
        withTimeout(1300L) {
            repeat(1000) { i ->
                println("i am sleeping $i")
                delay(500)
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在过了 1300ms 后就会取消正在运行的协程,上述代码输出如下:

    i am sleeping 0
    i am sleeping 1
    i am sleeping 2
    Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
    	at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:186)
    	at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:156)
    	at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:497)
    	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
    	at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:69)
    	at java.base/java.lang.Thread.run(Thread.java:831)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    withTimeout() 通过抛出 TimeoutCancellationException 的方式来结束协程。TimeoutCancellationException 是 CancellationException 的子类。如果我们想在结束时做些额外的操作比如关闭资源等,可以使用 try-catch(e: TimeoutCancellationException) {}。或者使用 withTimeoutOrNull,该方法返回null而非抛出异常:

    fun main() = runBlocking {
        val ret = withTimeoutOrNull(1300L) {
            repeat(1000) { i ->
                println("i am sleeping $i")
                delay(500)
            }
        }
    
        println("Result is $ret")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    上述代码输出为:

    i am sleeping 0
    i am sleeping 1
    i am sleeping 2
    Result is null
    
    • 1
    • 2
    • 3
    • 4

    需要注意的是,withTimeout 方法是异步的,是和其包裹着的代码块并发执行的,执行的时机可能是任何时候,包括代码块正在或已经执行完成 return 语句等。正因为此,我们在代码块里面打开或者获取需要关闭的资源时,一定要格外小心。比如我们有一个 Resource 类,来监控被打开和关闭的次数,我们用10w个协程模拟对该资源的打开和关闭操作:

    var acquired = 0
    
    class Resource {
        init {
            acquired++
        }
    
        fun close() {
            acquired--
        }
    }
    
    fun main() {
        runBlocking {
            repeat(100_000) {
                launch {
                    val resource = withTimeout(20) {
                        delay(19)
                        Resource()
                    }
    
                    resource.close()
                }
            }
        }
        println(acquired)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    根据执行环境不同,可以适当调整 20 和 19的数值,我们会发现多执行几次后打印的结果并不总是0。注意,上述代码并不会出现线程安全的问题,因为10w的协程都在是主线程中执行的。那么我们怎么避免这个问题呢?可以存储一个资源的引用,然后使用这个引用释放资源:

    fun main() {
        runBlocking {
            repeat(1000_000) {
                launch {
                    var resource: Resource? = null
                    try {
                        withTimeout(20) {
                            delay(19)
                            resource = Resource()
                        }
                    } finally {
                        resource?.close()
                    }
                }
            }
        }
    
        println(acquired)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    这样每次输出都是0了,不会出现资源泄露的情况。

    参考文章

  • 相关阅读:
    Java后端获取当前项目所在路径的方法(适用于Linux、Windows)
    DPDK丢包那些事
    hooks 内部结构
    RHCE-ansible(一)--- 安装ansible、主机清单、sudo提权、特权升级
    【数据结构与算法】深度剖析“八大排序”(上)_ 探寻一些不为人知的细节
    【React】第六部分 生命周期
    .Net_C#面试题(一)
    centos图形化桌面中火狐浏览器无法访问项目页面问题处理
    掌动智能:UI自动化测试工具的重要性和应用
    如何在 uwsgi 配置中传递自定义参数?
  • 原文地址:https://blog.csdn.net/zhaizu/article/details/127134332