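CoroutineContext is the context of a coroutine, and it appears in many places.
1. The first parameter of the CoroutineScope.launch extension function is a CoroutineContext, and its default value is EmptyCoroutineContext.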
- public fun CoroutineScope.launch(
- context: CoroutineContext = EmptyCoroutineContext,
- start: CoroutineStart = CoroutineStart.DEFAULT,
- block: suspend CoroutineScope.() -> Unit
- ): Job {
- ......
- }
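To see what the context parameter does in practice, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. The helper name interceptorsSeenByChildren is made up for illustration. It compares a child launched with the default EmptyCoroutineContext against a child launched with an explicit dispatcher.

```kotlin
import kotlin.coroutines.ContinuationInterceptor
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (inherited?, overridden?) for two child coroutines.
fun interceptorsSeenByChildren(): Pair<Boolean, Boolean> = runBlocking {
    val parentInterceptor = coroutineContext[ContinuationInterceptor]
    var inherited = false
    var overridden = false

    // No context argument: EmptyCoroutineContext adds nothing, so the child
    // keeps the dispatcher of the enclosing scope.
    launch {
        inherited = coroutineContext[ContinuationInterceptor] === parentInterceptor
    }.join()

    // Explicit context argument: the element passed to launch wins.
    launch(Dispatchers.Default) {
        overridden = coroutineContext[ContinuationInterceptor] === Dispatchers.Default
    }.join()

    inherited to overridden
}

fun main() {
    println(interceptorsSeenByChildren()) // (true, true)
}
```

The sketch looks the dispatcher up under the ContinuationInterceptor key because that is the key every dispatcher is stored under.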
2. The withContext() function also takes a CoroutineContext as a parameter.
- public suspend fun <T> withContext(
- context: CoroutineContext,
- block: suspend CoroutineScope.() -> T
- ): T {
- ......
- }
- suspend fun getUserInfo(): String {
- printCoroutine("Before IO Context")
- withContext(Dispatchers.IO) {
- printCoroutine("In IO Context")
- delay(1000L)
- }
- printCoroutine("After IO Context")
- return "David"
- }
-
- runBlocking {
- val userInfo = getUserInfo()
- printCoroutine(userInfo)
- }
-
- fun printCoroutine(any: Any?) {
- println("" + any + ";Thread:" + Thread.currentThread().name)
- }
-
-
- Log:
-
- Before IO Context;Thread:main @coroutine#1
- In IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
- After IO Context;Thread:main @coroutine#1
- David;Thread:main @coroutine#1
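Once withContext() is given Dispatchers.IO, the code inside the lambda is dispatched to the DefaultDispatcher thread pool, while all the code outside it still runs on the main thread.
If Dispatchers.IO is passed to runBlocking instead, all of the code runs in the DefaultDispatcher thread pool.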
- runBlocking(Dispatchers.IO) {
- val userInfo = getUserInfo()
- printCoroutine(userInfo)
- }
-
-
- suspend fun getUserInfo(): String {
- printCoroutine("Before IO Context")
- withContext(Dispatchers.IO) {
- printCoroutine("In IO Context")
- delay(1000L)
- }
- printCoroutine("After IO Context")
- return "David"
- }
-
-
- Log:
-
- Before IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
- In IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
- After IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
- David;Thread:DefaultDispatcher-worker-1 @coroutine#1
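The signature of withContext() also shows that it returns the result of its block. The following minimal sketch, assuming kotlinx.coroutines is available, uses that to shorten a function like getUserInfo(). The name loadUserName is hypothetical.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical variant of getUserInfo(): the last expression of the lambda
// becomes the return value of withContext, and therefore of the function.
suspend fun loadUserName(): String = withContext(Dispatchers.IO) {
    delay(100L) // stands in for a slow IO call
    "David"
}

fun main() = runBlocking {
    println(loadUserName()) // David
}
```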
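3. Built-in Dispatchers
Dispatchers.Main is only meaningful on UI platforms, for example the main thread on Android.
Dispatchers.Unconfined is not confined to any particular thread, so the coroutine may run on any thread.
Dispatchers.Default is a thread pool for CPU-intensive tasks. In general, its thread count matches the number of CPU cores on the machine, with a minimum of 2.
Dispatchers.IO is a thread pool for IO-intensive tasks. The number of threads can be configured through the kotlinx.coroutines.io.parallelism system property; by default the limit is 64 or the number of cores, whichever is larger.
Note: under the hood, Dispatchers.IO may reuse threads from Dispatchers.Default.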
- suspend fun getUserInfo(): String {
- printCoroutine("Before IO Context")
- withContext(Dispatchers.IO) {
- printCoroutine("In IO Context")
- delay(1000L)
- }
- printCoroutine("After IO Context")
- return "David"
- }
-
- runBlocking(Dispatchers.Default) {
- val userInfo = getUserInfo()
- printCoroutine(userInfo)
- }
-
- Log:
- Before IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
- In IO Context;Thread:DefaultDispatcher-worker-2 @coroutine#1
- After IO Context;Thread:DefaultDispatcher-worker-2 @coroutine#1
- David;Thread:DefaultDispatcher-worker-2 @coroutine#1
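When the Dispatchers.Default thread pool has spare threads, Dispatchers.IO can reuse them. That reuse explains the log above: "In IO Context" is printed from a DefaultDispatcher worker thread rather than from a separate IO pool.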
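Dispatchers.Unconfined is the one built-in dispatcher without a log in these notes. As a rough sketch, assuming kotlinx.coroutines on the JVM, the hypothetical helper below records the thread name before and after a suspension point. The exact name of the resuming thread depends on the library version, so no output is shown.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: returns the thread names seen before and after a suspension point.
fun unconfinedThreads(): Pair<String, String> = runBlocking {
    withContext(Dispatchers.Unconfined) {
        val before = Thread.currentThread().name // still the caller's thread
        delay(100L)                              // suspension point
        val after = Thread.currentThread().name  // whichever thread resumed the coroutine
        before to after
    }
}

fun main() {
    val (before, after) = unconfinedThreads()
    println("before delay: $before")
    println("after delay:  $after")
}
```

The two names are expected to differ, because an unconfined coroutine continues on whatever thread resumes it instead of being dispatched back.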
4. Using a custom Dispatcher
- runBlocking(mySingleDispatcher) {
- val userInfo = getUserInfo()
- printCoroutine(userInfo)
- }
-
-
- val mySingleDispatcher = Executors.newSingleThreadExecutor {
- Thread(it, "MySingleThread").apply {
- isDaemon = true
- }
- }.asCoroutineDispatcher()
-
- suspend fun getUserInfo(): String {
- printCoroutine("Before IO Context")
- withContext(Dispatchers.IO) {
- printCoroutine("In IO Context")
- delay(1000L)
- }
- printCoroutine("After IO Context")
- return "David"
- }
-
-
-
- Log:
-
- Before IO Context;Thread:MySingleThread @coroutine#1
- In IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
- After IO Context;Thread:MySingleThread @coroutine#1
- David;Thread:MySingleThread @coroutine#1
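The asCoroutineDispatcher() extension function creates a Dispatcher. At bottom, a Dispatcher is still backed by threads, and coroutines run on top of threads. When the custom mySingleDispatcher is passed to runBlocking, it has only one underlying thread, so only "In IO Context" runs in the DefaultDispatcher thread pool, and all the other code runs on mySingleDispatcher.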
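A dispatcher created from an executor owns that executor's thread, so it is worth closing it when it is no longer needed. The following is a minimal sketch, assuming kotlinx.coroutines on the JVM; the helper name threadNameOnSingleDispatcher is made up for illustration.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: runs a block on a one-thread dispatcher and shuts the
// underlying executor down afterwards.
fun threadNameOnSingleDispatcher(): String {
    val dispatcher = Executors.newSingleThreadExecutor { runnable ->
        Thread(runnable, "MySingleThread").apply { isDaemon = true }
    }.asCoroutineDispatcher()

    // The executor-backed dispatcher is closeable, so use {} closes it (and the
    // executor behind it) once the block finishes.
    return dispatcher.use { single ->
        runBlocking {
            withContext(single) { Thread.currentThread().name }
        }
    }
}

fun main() {
    // Starts with "MySingleThread"; debug mode may append a coroutine suffix.
    println(threadNameOnSingleDispatcher())
}
```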
- public fun CoroutineScope.launch(
- context: CoroutineContext = EmptyCoroutineContext,
- start: CoroutineStart = CoroutineStart.DEFAULT,
- block: suspend CoroutineScope.() -> Unit
- ): Job {
- ......
- }
-
- public interface CoroutineScope {
-
- public val coroutineContext: CoroutineContext
- }
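CoroutineScope is an interface with exactly one member, coroutineContext, of type CoroutineContext. So CoroutineScope is only a thin wrapper around CoroutineContext, and its core capabilities all come from CoroutineContext.
The purpose of CoroutineScope: it makes it easy to control coroutines in bulk.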
- runBlocking {
- val scope = CoroutineScope(Job())
- scope.launch {
- printCoroutine("First start!")
- delay(1000L)
- printCoroutine("First End!")
- }
-
- scope.launch {
- printCoroutine("Second start!")
- delay(1000L)
- printCoroutine("Second End!")
- }
-
- scope.launch {
- printCoroutine("Third start!")
- delay(1000L)
- printCoroutine("Third End!")
- }
-
- delay(500L)
- scope.cancel()
- delay(1000L)
- }
-
- Log:
-
- First start!;Thread:DefaultDispatcher-worker-1 @coroutine#2
- Second start!;Thread:DefaultDispatcher-worker-2 @coroutine#3
- Third start!;Thread:DefaultDispatcher-worker-3 @coroutine#4
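The log stops after the three "start" lines because scope.cancel() cancels every coroutine launched from the scope. A minimal sketch, assuming kotlinx.coroutines, that checks this through the Job handles instead of the log; cancelAllChildren is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: launches three coroutines in one scope, cancels the
// scope, and reports whether each child ended up cancelled.
fun cancelAllChildren(): List<Boolean> = runBlocking {
    val scope = CoroutineScope(Job())
    val jobs = List(3) { index ->
        scope.launch {
            delay(1000L)
            println("Child $index finished") // never reached
        }
    }

    delay(100L)
    scope.cancel()  // one call cancels every coroutine started from the scope
    jobs.joinAll()  // wait until the cancellation has completed
    jobs.map { it.isCancelled }
}

fun main() {
    println(cancelAllChildren()) // [true, true, true]
}
```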
Job extends CoroutineContext.Element, and CoroutineContext.Element extends CoroutineContext, so Job indirectly extends CoroutineContext. In other words, a Job is itself a CoroutineContext.
- public interface Job : CoroutineContext.Element {
- }
-
- public interface CoroutineContext {
- public interface Element : CoroutineContext {
- }
- }
The interface design of CoroutineContext itself:
- @SinceKotlin("1.3")
- public interface CoroutineContext {
-
- public operator fun <E : Element> get(key: Key<E>): E?
-
- public fun <R> fold(initial: R, operation: (R, Element) -> R): R
-
-
- public operator fun plus(context: CoroutineContext): CoroutineContext =
- if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
- context.fold(this) { acc, element ->
- val removed = acc.minusKey(element.key)
- if (removed === EmptyCoroutineContext) element else {
- // make sure interceptor is always last in the context (and thus is fast to get when present)
- val interceptor = removed[ContinuationInterceptor]
- if (interceptor == null) CombinedContext(removed, element) else {
- val left = removed.minusKey(ContinuationInterceptor)
- if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
- CombinedContext(CombinedContext(left, element), interceptor)
- }
- }
- }
-
-
- public fun minusKey(key: Key<*>): CoroutineContext
-
-
- public interface Key<E : Element>
-
-
- public interface Element : CoroutineContext {
-
-
- public val key: Key<*>
-
- public override operator fun <E : Element> get(key: Key<E>): E? =
- @Suppress("UNCHECKED_CAST")
- if (this.key == key) this as E else null
-
- public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
- operation(initial, this)
-
- public override fun minusKey(key: Key<*>): CoroutineContext =
- if (this.key == key) EmptyCoroutineContext else this
- }
- }
Judging by get(), plus(), minusKey(), and fold(), the interface design of CoroutineContext closely resembles a Map, and a CoroutineContext can be used much like one.
- runBlocking {
- val scope = CoroutineScope(Job() + mySingleDispatcher)
- scope.launch {
- printCoroutine(coroutineContext[CoroutineDispatcher] == mySingleDispatcher)
- delay(1000L)
- printCoroutine("First end!")
- }
-
- delay(500L)
- scope.cancel()
- delay(1000L)
- }
-
- Log:
-
- true;Thread:MySingleThread @coroutine#2
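The CoroutineScope is created with "Job() + mySingleDispatcher". This works because get() and plus() on CoroutineContext are declared as operator functions, which enables the [] and + syntax.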
-
- public operator fun <E : Element> get(key: Key<E>): E?
-
- public operator fun plus(context: CoroutineContext): CoroutineContext
Once the scope is created, every coroutine subsequently launched from it runs on the mySingleDispatcher thread.
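The Map-like behaviour of get(), plus(), minusKey(), and fold() can be tried out with the standard library alone. The sketch below defines two hypothetical elements, UserId and RequestTag, plus a made-up extension elementCount(); none of these names come from kotlinx.coroutines.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Two hypothetical elements. The companion object of each class serves as its key.
class UserId(val id: Int) : AbstractCoroutineContextElement(UserId) {
    companion object Key : CoroutineContext.Key<UserId>
}

class RequestTag(val tag: String) : AbstractCoroutineContextElement(RequestTag) {
    companion object Key : CoroutineContext.Key<RequestTag>
}

// Counts elements via fold(), the closest thing to iterating over a Map.
fun CoroutineContext.elementCount(): Int = fold(0) { count, _ -> count + 1 }

fun main() {
    val context = UserId(1) + RequestTag("a")   // plus() acts like put()
    println(context[UserId]?.id)                // get() by key: 1
    println(context.elementCount())             // 2

    val replaced = context + UserId(2)          // same key: the new element replaces the old one
    println(replaced[UserId]?.id)               // 2
    println(replaced.elementCount())            // 2

    val removed = replaced.minusKey(UserId)     // minusKey() acts like remove()
    println(removed[UserId])                    // null
    println(removed.minusKey(RequestTag) === EmptyCoroutineContext) // true
}
```

Unlike a Map, each key is tied to an element type, so context[UserId] is statically typed as UserId? without a cast.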
-
- public actual object Dispatchers {
-
- public actual val Default: CoroutineDispatcher = DefaultScheduler
-
- public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
-
- public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
-
- public val IO: CoroutineDispatcher = DefaultIoScheduler
-
- public fun shutdown() { }
- }
-
- public abstract class CoroutineDispatcher :
- AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {}
-
- public interface ContinuationInterceptor : CoroutineContext.Element {}
Dispatchers is an object singleton whose members are of type CoroutineDispatcher. CoroutineDispatcher implements ContinuationInterceptor, and that interface in turn extends CoroutineContext.Element. So a Dispatcher really is a CoroutineContext.
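The type hierarchy above has a few directly observable consequences. Here is a small sketch, assuming kotlinx.coroutines is available; the helper name dispatcherIsContextElement is hypothetical.

```kotlin
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job

// Hypothetical helper: checks three consequences of the type hierarchy.
fun dispatcherIsContextElement(): List<Boolean> {
    // A dispatcher can be assigned wherever a CoroutineContext is expected.
    val asContext: CoroutineContext = Dispatchers.Default

    // Inside a combined context, the dispatcher is stored under the
    // ContinuationInterceptor key.
    val combined = Job() + Dispatchers.Default

    return listOf(
        asContext is CoroutineContext.Element,
        Dispatchers.Default.key === ContinuationInterceptor,
        combined[ContinuationInterceptor] === Dispatchers.Default
    )
}

fun main() {
    println(dispatcherIsContextElement()) // [true, true, true]
}
```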
- runBlocking {
- val scope = CoroutineScope(Job() + mySingleDispatcher)
- scope.launch(CoroutineName("MyFirstCoroutine!")) {
- printCoroutine(coroutineContext[CoroutineDispatcher] == mySingleDispatcher)
- delay(1000L)
- printCoroutine("First end!")
- }
-
- delay(500L)
- scope.cancel()
- delay(1000L)
- }
-
- true;Thread:MySingleThread @MyFirstCoroutine!#2
When launch is called with CoroutineName("MyFirstCoroutine!") as the coroutine's name, the output contains "@MyFirstCoroutine!#2".
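The "@name#id" suffix in the thread name only appears when the kotlinx.coroutines debug mode is on (for example with the -Dkotlinx.coroutines.debug JVM option). The name itself is always available from the context, as this minimal sketch shows; readCoroutineName is a made-up helper.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reads the name from the context instead of the thread name.
fun readCoroutineName(): String? = runBlocking {
    var name: String? = null
    launch(CoroutineName("MyFirstCoroutine!")) {
        // CoroutineName is stored under its own key, like any other element.
        name = coroutineContext[CoroutineName]?.name
    }.join()
    name
}

fun main() {
    println(readCoroutineName()) // MyFirstCoroutine!
}
```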
CoroutineExceptionHandler lets you define a custom exception handler through its handleException() method.
-
-
- public interface CoroutineExceptionHandler : CoroutineContext.Element {
-
- public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
-
- public fun handleException(context: CoroutineContext, exception: Throwable)
- }
- runBlocking {
- val myExceptionHandler = CoroutineExceptionHandler{_,throwable->
- println("Catch exception: $throwable")
- }
- val scope = CoroutineScope(Job()+ mySingleDispatcher)
- val job = scope.launch(myExceptionHandler) {
- val s: String? = null
- s!!.length
- }
- job.join()
- }
-
- Catch exception: java.lang.NullPointerException
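Because CoroutineExceptionHandler is a context element, it can also be combined into the scope's context with +, just like a Job or a Dispatcher. The following is a minimal sketch under that assumption; captureUncaughtException is a hypothetical helper, and the exception type and message are chosen only for illustration.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the exception that reached the handler, if any.
fun captureUncaughtException(): Throwable? = runBlocking {
    val caught = AtomicReference<Throwable?>(null)
    val handler = CoroutineExceptionHandler { _, throwable -> caught.set(throwable) }

    // The handler is part of the scope's context, so every coroutine launched
    // directly from this scope inherits it.
    val scope = CoroutineScope(Job() + handler)
    scope.launch {
        throw IllegalStateException("boom")
    }.join()

    caught.get()
}

fun main() {
    println("Caught: ${captureUncaughtException()}")
}
```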