截至 2022.05 月,Java 尚未在正式版中支持协程,而是通过织布机计划实验性的支持了虚拟线程,作用类似于协程。
在 Java 里面,我们可以通过 Thread 类来创建一个线程:
在 Kotlin 中,我们怎么创建一个协程呢?
有如下方法可以创建协程:runBlocking()、launch()、async()、produce()、actor(),我们一一介绍。
我们先来创建一个最简单的协程:
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
fun main() {
val job : Job = GlobalScope.launch {
delay(1000)
println("World!")
}
print("Hello, ")
Thread.sleep(1500)
}
我们逐行解读代码。
首先是 GlobalScope。GlobalScope 是协程作用域(CoroutineScope)的一种。每个协程必须归属于某个 CoroutineScope,称为“结构性并发”(Structured Concurrency),不仅 Kotlin,Swift 也通过 Task 支持了结构性并发。
有了 CoroutineScope,我们可以很方便的对作用域下注册的所有协程进行统一管理,比如将其全部取消执行等。除了 GlobalScope,还有为安卓设计的 MainScope 以及用 CoroutineScope.coroutineScope() 构造的自定义作用域。借助 MainScope,我们可以在 Activity 销毁时取消执行协程,从而避免内存泄漏。
launch 是 CoroutineScope 的扩展函数,其完整的声明是:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
launch() 是最常见的创建协程的方法,返回一个 Job,可以通过 Job.cancel() 来终止该 Job 下所有的协程,其效果跟 CoroutineScope 是一样的,其实 CoroutineScope.cancel() 也是委托给 Job.cancel() 来实现的。
通过 launch() 我们就启动了一个协程,虽然里面只有两行代码:
val job : Job = GlobalScope.launch {
delay(1000)
println("World!")
}
为了方便组织代码,我们可以将上述两行代码封装成一个方法。注意,如果该方法是耗时方法,则需要在前面加上 suspend 关键字:
fun main() {
val job : Job = GlobalScope.launch {
printDelayedWorld()
}
print("Hello, ")
Thread.sleep(1500)
}
suspend fun printDelayedWorld() {
delay(1000)
println("World!")
}
最后需要注意的是 Thread.sleep(1500)
这行代码:
fun main() {
val job : Job = GlobalScope.launch {
delay(1000)
println("World!")
}
print("Hello, ")
Thread.sleep(1500)
}
这行代码是必现要有的,如果没有这行代码,上面的程序只能打印出 “Hello, ”,而无法打印出“World!”,因为在执行完print("Hello, ")
之后,整个进程就结束了,里面的所有线程也就被强制结束了,运行在线程之上的协程也就无法执行了。我们可以将上述代码进行如下改写,也能达到相同的目的:
fun main() = runBlocking {
val job : Job = GlobalScope.launch {
delay(1000)
println("World!")
}
print("Hello, ")
}
除了 launch(),我们还有其他协程构造器,比如 async()、produce()、actor(),其中 async() 方法不会立即启动一个协程,而是在调用 await() 方法时才会启动,需要注意的是 await() 是同步的,比如下面代码中 println("Completed in $time ms")
必须等待2个 await() 执行完毕后再执行:
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis
fun main() = runBlocking(CoroutineName("main")) {
val time = measureTimeMillis {
val one = async(CoroutineName("v1")) { one() }
val two = async(CoroutineName("v2")) { two() }
log("launched two async")
log("the answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
suspend fun one(): Int {
delay(1000)
log("Computing v1")
return 19
}
suspend fun two(): Int {
delay(500)
log("Computing v2")
return 10
}
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
运行结果:
[main]launched two async
[main]Computing v1
[main]Computing v1
[main]the answer is 29
Completed in 1020 ms
async()
构造器返回值类型是 Deferred
,Deferred
是 Job
的子类。
然后是 delay() 方法。这是一个非阻塞的方法。非阻塞的意思是不会让线程由 Run 状态转入 Block 状态,该线程可以继续执行其他代码。相比之下,Thread.sleep() 则是阻塞方法,会使线程处于阻塞状态,无法继续执行。
需要注意的是,GlobalScope 的生命周期与应用相同,一般不建议使用,因为很容易因遗忘cancel()操作而导致内存泄漏。可以使用自定义的作用域:
fun main() = runBlocking {
coroutineScope {
launch {}
launch {}
}
}
看到 async() 和 await(),让人不免联想到其他语言中的 async 和 await 关键字,比如 JavaScript、Dart、Swift 和 C#。我们简单比较下二者的异同:
取消正在运行中的协程的方法有:cancel,withTimeout,抛异常。
我们可以调用 CoroutineScope.cancel() 来取消该作用域下的所有协程,包括子孙协程。我们在安卓开发中常见的 mainScope 就是采用的这种方式:
class MyAndroidActivity {
private val scope = MainScope()
override fun onDestroy() {
super.onDestroy()
scope.cancel()
}
}
实际上 CoroutineScope.cancel() 内部调用的是 Job 的 cancel() 实现的:
/**
* Cancels this scope, including its job and all its children with an optional cancellation [cause].
* A cause can be used to specify an error message or to provide other details on
* a cancellation reason for debugging purposes.
* Throws [IllegalStateException] if the scope does not have a job in it.
*/
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
job.cancel(cause)
}
如注释所述,如果该作用域没有 Job,则抛出 IllegalStateException 异常。
cancel() 方法执行之后 Job 内部发生了什么呢?具体可以看如下状态机:
cancel() 方法只对suspend()方法有效。因为 suspend() 方法执行时会先检查该作用域或Job是否被取消,如果被取消就不再往下执行了。而非 suspend() 方法因为没有这种功能,所以是无法生效的。比如:
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch(Dispatchers.Default) {
try {
println("coroutine started")
delay(1000)
println("coroutine completed")
} catch (e: Exception) {
e.printStackTrace()
} finally {
println("coroutine finally")
}
}
delay(500)
println("coroutine canceled")
job.cancel(CancellationException("i am canceled"))
job.join()
println("coroutine ended")
}
join() 方法的作用是等待所有协程执行完成。
上述代码的执行结果如下:
coroutine started
coroutine canceled
coroutine finally
coroutine ended
java.util.concurrent.CancellationException: i am canceled
at coroutine.basics.CancelKt$main$1.invokeSuspend(cancel.kt:24)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:166)
at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
at kotlinx.coroutines.CancellableContinuationImpl.resumeUndispatched(CancellableContinuationImpl.kt:518)
at kotlinx.coroutines.EventLoopImplBase$DelayedResumeTask.run(EventLoop.common.kt:489)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at coroutine.basics.CancelKt.main(cancel.kt:5)
at coroutine.basics.CancelKt.main(cancel.kt)
协程被取消成功,被取消的时候会抛出 CancellationException。之所以能被取消,是因为协程代码块里面有 delay() 这个 suspend 方法,如果我们把 delay() 换成非 suspend 方法 Thread.sleep(),能被取消成功吗?我们试验下:
fun main() = runBlocking {
val job = launch(Dispatchers.Default) {
try {
println("coroutine started")
Thread.sleep(1000) // 注意此处不再是 suspend 方法 delay()
println("coroutine completed")
} catch (e: Exception) {
e.printStackTrace()
} finally {
println("coroutine finally")
}
}
delay(500)
println("coroutine canceled")
job.cancel(CancellationException("i am canceled"))
job.join()
println("coroutine ended")
}
输出结果如下:
coroutine started
coroutine canceled
coroutine completed
coroutine finally
coroutine ended
可以看出,协程没有被取消。如果我们想不包含suspend方法的协程被取消,可以手动检查下当前协程的状态,如果 isActive 是 true 才往下执行,否则就结束掉:
fun main() = runBlocking {
val job = launch(Dispatchers.Default) {
try {
println("coroutine started")
Thread.sleep(1000)
if (isActive) {
println("coroutine completed")
}
} catch (e: Exception) {
e.printStackTrace()
} finally {
println("coroutine finally")
}
}
delay(500)
println("coroutine canceled")
job.cancel(CancellationException("i am canceled"))
job.join()
println("coroutine ended")
}
输出如下:
coroutine started
coroutine canceled
coroutine finally
coroutine ended
coroutine started
coroutine canceled
coroutine completed
coroutine finally
coroutine ended
需要注意的是,如果协程已经被取消了,在 finally 代码里面使用 suspend 方法是不会执行的:
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch(Dispatchers.Default) {
try {
println("coroutine started")
delay(1000)
} catch (e: Exception) {
e.printStackTrace()
} finally {
println("running finally")
delay(1000)
println("coroutine finally")
}
}
delay(500)
println("coroutine canceled")
job.cancelAndJoin()
println("coroutine ended")
}
输出如下:
coroutine started
coroutine canceled
running finally
coroutine ended
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@7ebf7c43
可以看出对应的这两行代码都没有执行:
delay(1000)
println("coroutine finally")
如果我们想让上述两行代码执行,可以使用 withContext(NonCancellable)
:
package coroutine.basics
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch(Dispatchers.Default) {
try {
println("coroutine started")
delay(1000)
} catch (e: Exception) {
e.printStackTrace()
} finally {
withContext(NonCancellable) {
println("running finally")
delay(1000)
println("coroutine finally")
}
}
}
delay(500)
println("coroutine canceled")
job.cancelAndJoin()
println("coroutine ended")
}
上述代码输出如下:
coroutine started
coroutine canceled
running finally
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@70c1cc32
coroutine finally
coroutine ended
除了 cancel() 方法,我们还可以使用 withTimeout() 来取消协程:
fun main() = runBlocking {
withTimeout(1300L) {
repeat(1000) { i ->
println("i am sleeping $i")
delay(500)
}
}
}
在过了 1300ms 后就会取消正在运行的协程,上述代码输出如下:
i am sleeping 0
i am sleeping 1
i am sleeping 2
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:186)
at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:156)
at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:497)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:69)
at java.base/java.lang.Thread.run(Thread.java:831)
withTimeout() 通过抛出 TimeoutCancellationException 的方式来结束协程。TimeoutCancellationException 是 CancellationException 的子类。如果我们想在结束时做些额外的操作比如关闭资源等,可以使用 try-catch(e: TimeoutCancellationException) {}。或者使用 withTimeoutOrNull
,该方法返回null而非抛出异常:
fun main() = runBlocking {
val ret = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("i am sleeping $i")
delay(500)
}
}
println("Result is $ret")
}
上述代码输出为:
i am sleeping 0
i am sleeping 1
i am sleeping 2
Result is null
需要注意的是,withTimeout
方法是异步的,是和其包裹着的代码块并发执行的,执行的时机可能是任何时候,包括代码块正在或已经执行完成 return 语句等。正因为此,我们在代码块里面打开或者获取需要关闭的资源时,一定要格外小心。比如我们有一个 Resource 类,来监控被打开和关闭的次数,我们用10w个协程模拟对该资源的打开和关闭操作:
var acquired = 0
class Resource {
init {
acquired++
}
fun close() {
acquired--
}
}
fun main() {
runBlocking {
repeat(100_000) {
launch {
val resource = withTimeout(20) {
delay(19)
Resource()
}
resource.close()
}
}
}
println(acquired)
}
根据执行环境不同,可以适当调整 20 和 19的数值,我们会发现多执行几次后打印的结果并不总是0。注意,上述代码并不会出现线程安全的问题,因为10w的协程都在是主线程中执行的。那么我们怎么避免这个问题呢?可以存储一个资源的引用,然后使用这个引用释放资源:
fun main() {
runBlocking {
repeat(1000_000) {
launch {
var resource: Resource? = null
try {
withTimeout(20) {
delay(19)
resource = Resource()
}
} finally {
resource?.close()
}
}
}
}
println(acquired)
}
这样每次输出都是0了,不会出现资源泄露的情况。