• Kotlin Coroutine Basics: CoroutineContext


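    I. CoroutineContext, the coroutine context

    CoroutineContext is the context of a coroutine, and it shows up in many places.

    1. The first parameter of the CoroutineScope.launch extension function is a CoroutineContext, and its default value is EmptyCoroutineContext.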

        public fun CoroutineScope.launch(
            context: CoroutineContext = EmptyCoroutineContext,
            start: CoroutineStart = CoroutineStart.DEFAULT,
            block: suspend CoroutineScope.() -> Unit
        ): Job {
            ......
        }

    2. The withContext() function also takes a CoroutineContext as a parameter.

        public suspend fun <T> withContext(
            context: CoroutineContext,
            block: suspend CoroutineScope.() -> T
        ): T {
            ......
        }

        import kotlinx.coroutines.*

        fun main() = runBlocking {
            val userInfo = getUserInfo()
            printCoroutine(userInfo)
        }

        suspend fun getUserInfo(): String {
            printCoroutine("Before IO Context")
            withContext(Dispatchers.IO) {
                printCoroutine("In IO Context")
                delay(1000L)
            }
            printCoroutine("After IO Context")
            return "David"
        }

        // Prints the value together with the name of the current thread.
        fun printCoroutine(any: Any?) {
            println("" + any + ";Thread:" + Thread.currentThread().name)
        }

    Log (the "@coroutine#1" suffix only appears when running with -Dkotlinx.coroutines.debug):

        Before IO Context;Thread:main @coroutine#1
        In IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
        After IO Context;Thread:main @coroutine#1
        David;Thread:main @coroutine#1

    Once withContext() is given Dispatchers.IO, the code inside its lambda is dispatched to the DefaultDispatcher thread pool, while all the code outside it still runs on the main thread.
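
    The signature above also shows that withContext() returns the value of its block. The following is a minimal sketch of that, assuming kotlinx.coroutines is on the classpath; loadUserName is a hypothetical helper that is not part of the original example.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: pretends to load a name on the IO dispatcher.
suspend fun loadUserName(): String =
    withContext(Dispatchers.IO) {
        delay(100L) // stands in for a slow read
        "David"     // the last expression is the value withContext returns
    }

fun main() = runBlocking {
    val name = loadUserName()
    println(name) // prints "David"
}
```

    The caller gets the result back in its own context, so the thread switch stays an implementation detail of loadUserName.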

    If Dispatchers.IO is passed to runBlocking, all of the code runs in the DefaultDispatcher thread pool.

        // printCoroutine() is the helper defined in the previous example.
        fun main() = runBlocking(Dispatchers.IO) {
            val userInfo = getUserInfo()
            printCoroutine(userInfo)
        }

        suspend fun getUserInfo(): String {
            printCoroutine("Before IO Context")
            withContext(Dispatchers.IO) {
                printCoroutine("In IO Context")
                delay(1000L)
            }
            printCoroutine("After IO Context")
            return "David"
        }

    Log:

        Before IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
        In IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
        After IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
        David;Thread:DefaultDispatcher-worker-1 @coroutine#1

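    3. Built-in dispatchers

    Dispatchers.Main is only meaningful on UI platforms, for example the main thread on Android.

    Dispatchers.Unconfined is not confined to any thread: the coroutine starts in the caller's thread and, after a suspension, resumes on whichever thread the suspending function resumed it on.

    Dispatchers.Default is a thread pool for CPU-intensive work. In general its number of threads equals the number of CPU cores on the machine, with a minimum of 2.

    Dispatchers.IO is a thread pool for IO-intensive work. The number of threads can be configured through the system property kotlinx.coroutines.io.parallelism; by default the limit is 64 or the number of cores, whichever is larger.

    Note: under the hood, Dispatchers.IO may reuse threads from Dispatchers.Default.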

        suspend fun getUserInfo(): String {
            printCoroutine("Before IO Context")
            withContext(Dispatchers.IO) {
                printCoroutine("In IO Context")
                delay(1000L)
            }
            printCoroutine("After IO Context")
            return "David"
        }

        // printCoroutine() is the helper defined in the first example.
        fun main() = runBlocking(Dispatchers.Default) {
            val userInfo = getUserInfo()
            printCoroutine(userInfo)
        }

    Log:

        Before IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
        In IO Context;Thread:DefaultDispatcher-worker-2 @coroutine#1
        After IO Context;Thread:DefaultDispatcher-worker-2 @coroutine#1
        David;Thread:DefaultDispatcher-worker-2 @coroutine#1

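    When the Dispatchers.Default thread pool has spare threads, they can be reused by the IO pool. The thread names in the log above are the result of Dispatchers.IO reusing threads from Dispatchers.Default.

    4. Using a custom dispatcher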

        import java.util.concurrent.Executors
        import kotlinx.coroutines.*

        // A dispatcher backed by a single daemon thread named "MySingleThread".
        val mySingleDispatcher = Executors.newSingleThreadExecutor {
            Thread(it, "MySingleThread").apply {
                isDaemon = true
            }
        }.asCoroutineDispatcher()

        // printCoroutine() is the helper defined in the first example.
        fun main() = runBlocking(mySingleDispatcher) {
            val userInfo = getUserInfo()
            printCoroutine(userInfo)
        }

        suspend fun getUserInfo(): String {
            printCoroutine("Before IO Context")
            withContext(Dispatchers.IO) {
                printCoroutine("In IO Context")
                delay(1000L)
            }
            printCoroutine("After IO Context")
            return "David"
        }

    Log:

        Before IO Context;Thread:MySingleThread @coroutine#1
        In IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
        After IO Context;Thread:MySingleThread @coroutine#1
        David;Thread:MySingleThread @coroutine#1

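    The asCoroutineDispatcher() extension function turns the executor into a dispatcher. A dispatcher is ultimately still backed by threads, and coroutines run on top of threads. When the custom mySingleDispatcher is passed to runBlocking, it has only one underlying thread, so only "In IO Context" runs in the DefaultDispatcher thread pool; all the other code runs on mySingleDispatcher.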
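
    A dispatcher created this way owns its executor, so it is usually worth closing it when it is no longer needed. The sketch below shows one way to do that, relying on the fact that the dispatcher returned by asCoroutineDispatcher() on an ExecutorService is Closeable; threadNameOnSingleDispatcher is a hypothetical helper name.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking

// Runs a block on a temporary single-thread dispatcher and returns the thread name.
fun threadNameOnSingleDispatcher(): String =
    Executors.newSingleThreadExecutor { runnable ->
        Thread(runnable, "MySingleThread").apply { isDaemon = true }
    }.asCoroutineDispatcher().use { dispatcher ->
        // use() calls close() on the dispatcher afterwards, which shuts the executor down.
        runBlocking(dispatcher) { Thread.currentThread().name }
    }

fun main() {
    // The name starts with "MySingleThread"; debug mode appends the coroutine name.
    println(threadNameOnSingleDispatcher())
}
```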

    II. CoroutineContext

    1. CoroutineScope

        public fun CoroutineScope.launch(
            context: CoroutineContext = EmptyCoroutineContext,
            start: CoroutineStart = CoroutineStart.DEFAULT,
            block: suspend CoroutineScope.() -> Unit
        ): Job {
            ......
        }

        public interface CoroutineScope {
            public val coroutineContext: CoroutineContext
        }

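    CoroutineScope is an interface with a single member, coroutineContext, of type CoroutineContext. So CoroutineScope is only a thin wrapper around CoroutineContext, and its core capabilities actually come from CoroutineContext.

    What CoroutineScope is for: it makes it convenient to control coroutines in batches.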

        // printCoroutine() is the helper defined in the first example.
        fun main() = runBlocking {
            val scope = CoroutineScope(Job())
            scope.launch {
                printCoroutine("First start!")
                delay(1000L)
                printCoroutine("First End!")
            }
            scope.launch {
                printCoroutine("Second start!")
                delay(1000L)
                printCoroutine("Second End!")
            }
            scope.launch {
                printCoroutine("Third start!")
                delay(1000L)
                printCoroutine("Third End!")
            }
            delay(500L)
            scope.cancel() // cancels all three coroutines before any of them reaches its "End!" line
            delay(1000L)
        }

    Log:

        First start!;Thread:DefaultDispatcher-worker-1 @coroutine#2
        Second start!;Thread:DefaultDispatcher-worker-2 @coroutine#3
        Third start!;Thread:DefaultDispatcher-worker-3 @coroutine#4
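
    The batch cancellation can also be checked directly on the Job objects rather than through the log. This is a minimal sketch under that idea; cancelAll is a hypothetical helper, and the 10-second delay is only a placeholder for long-running work.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Launches `count` children in one scope, cancels the scope, and reports each child's state.
fun cancelAll(count: Int): List<Boolean> = runBlocking {
    val scope = CoroutineScope(Job())
    val jobs = List(count) { scope.launch { delay(10_000L) } }
    scope.cancel() // one call cancels every coroutine launched in the scope
    jobs.joinAll() // wait until the cancellations have completed
    jobs.map { it.isCancelled }
}

fun main() {
    println(cancelAll(3)) // prints [true, true, true]
}
```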

    2. Job and Dispatcher

    Job extends CoroutineContext.Element, and CoroutineContext.Element extends CoroutineContext, so Job indirectly extends CoroutineContext. In other words, a Job is itself a CoroutineContext.

        public interface Job : CoroutineContext.Element {
        }

        public interface CoroutineContext {
            public interface Element : CoroutineContext {
            }
        }

    The interface design of CoroutineContext itself:

        @SinceKotlin("1.3")
        public interface CoroutineContext {
            public operator fun <E : Element> get(key: Key<E>): E?

            public fun <R> fold(initial: R, operation: (R, Element) -> R): R

            public operator fun plus(context: CoroutineContext): CoroutineContext =
                if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
                    context.fold(this) { acc, element ->
                        val removed = acc.minusKey(element.key)
                        if (removed === EmptyCoroutineContext) element else {
                            // make sure interceptor is always last in the context (and thus is fast to get when present)
                            val interceptor = removed[ContinuationInterceptor]
                            if (interceptor == null) CombinedContext(removed, element) else {
                                val left = removed.minusKey(ContinuationInterceptor)
                                if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                                    CombinedContext(CombinedContext(left, element), interceptor)
                            }
                        }
                    }

            public fun minusKey(key: Key<*>): CoroutineContext

            public interface Key<E : Element>

            public interface Element : CoroutineContext {
                public val key: Key<*>

                public override operator fun <E : Element> get(key: Key<E>): E? =
                    @Suppress("UNCHECKED_CAST")
                    if (this.key == key) this as E else null

                public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
                    operation(initial, this)

                public override fun minusKey(key: Key<*>): CoroutineContext =
                    if (this.key == key) EmptyCoroutineContext else this
            }
        }

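    Judging by get(), plus(), minusKey(), and fold(), the interface design of CoroutineContext is very similar to Map: elements are stored under keys, and each key holds at most one element. You can use a CoroutineContext much as you would a Map.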
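
    To make the Map analogy concrete, here is a small sketch that uses only the kotlin.coroutines package from the standard library. UserId, TraceTag, and elementCount are hypothetical names made up for this illustration.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Two hypothetical elements; each companion object serves as that element's Key.
class UserId(val value: Int) : AbstractCoroutineContextElement(UserId) {
    companion object Key : CoroutineContext.Key<UserId>
}

class TraceTag(val value: String) : AbstractCoroutineContextElement(TraceTag) {
    companion object Key : CoroutineContext.Key<TraceTag>
}

// fold() visits every element, much like iterating over the values of a Map.
fun elementCount(context: CoroutineContext): Int =
    context.fold(0) { count, _ -> count + 1 }

fun main() {
    val context = UserId(1) + TraceTag("a")       // plus(): combine two entries
    println(context[UserId]?.value)                // get(): prints 1
    println(elementCount(context))                 // prints 2

    val replaced = context + UserId(2)             // same key: the right-hand element wins
    println(replaced[UserId]?.value)               // prints 2
    println(elementCount(replaced))                // prints 2

    val removed = context.minusKey(TraceTag)       // minusKey(): drop one entry
    println(removed[TraceTag])                     // prints null
    println(elementCount(EmptyCoroutineContext))   // prints 0
}
```

    Unlike a general Map, the key type determines the value type, which is why get() can return a typed element without a cast at the call site.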

        // mySingleDispatcher and printCoroutine() are defined in the earlier examples.
        // CoroutineDispatcher as a context key is experimental, hence the opt-in.
        @OptIn(ExperimentalStdlibApi::class)
        fun main() = runBlocking {
            val scope = CoroutineScope(Job() + mySingleDispatcher)
            scope.launch {
                printCoroutine(coroutineContext[CoroutineDispatcher] == mySingleDispatcher)
                delay(1000L)
                printCoroutine("First end!")
            }
            delay(500L)
            scope.cancel()
            delay(1000L)
        }

    Log:

        true;Thread:MySingleThread @coroutine#2

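    The CoroutineScope is created with "Job() + mySingleDispatcher". This works because CoroutineContext overloads the plus operator:

        public operator fun plus(context: CoroutineContext): CoroutineContext

    Once the scope has been created, every coroutine later launched from it runs on the mySingleDispatcher thread, unless a different dispatcher is passed to launch.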
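
    The context passed to a coroutine builder is combined with the scope's context using the same plus() rule, so an element with the same key replaces the inherited one. The sketch below illustrates this with async, which is used here instead of launch only so that the thread names can be returned; threadNames is a hypothetical helper.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Returns the thread names seen by a coroutine that inherits the scope's dispatcher
// and by one that overrides it.
fun threadNames(): Pair<String, String> =
    Executors.newSingleThreadExecutor { runnable ->
        Thread(runnable, "MySingleThread").apply { isDaemon = true }
    }.asCoroutineDispatcher().use { dispatcher ->
        runBlocking {
            val scope = CoroutineScope(Job() + dispatcher)
            // No context argument: the scope's dispatcher is inherited.
            val inherited = scope.async { Thread.currentThread().name }
            // Dispatchers.Default replaces the scope's dispatcher for this coroutine only.
            val overridden = scope.async(Dispatchers.Default) { Thread.currentThread().name }
            inherited.await() to overridden.await()
        }
    }

fun main() {
    val (inherited, overridden) = threadNames()
    println(inherited)  // starts with "MySingleThread"
    println(overridden) // starts with "DefaultDispatcher-worker"
}
```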

        public actual object Dispatchers {
            public actual val Default: CoroutineDispatcher = DefaultScheduler
            public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
            public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
            public val IO: CoroutineDispatcher = DefaultIoScheduler
            public fun shutdown() { }
        }

        public abstract class CoroutineDispatcher :
            AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {}

        public interface ContinuationInterceptor : CoroutineContext.Element {}

    Dispatchers is an object singleton whose members are of type CoroutineDispatcher. CoroutineDispatcher implements ContinuationInterceptor, an interface that in turn extends CoroutineContext.Element. So a dispatcher really is a CoroutineContext.
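
    Because a dispatcher is stored in the context under the ContinuationInterceptor key, it can be looked up with that key without any experimental opt-in. The following is a small sketch of that lookup; isRunningOn is a hypothetical helper name.

```kotlin
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns true when the dispatcher stored in the current coroutine's context is `expected`.
suspend fun isRunningOn(expected: CoroutineDispatcher): Boolean =
    coroutineContext[ContinuationInterceptor] === expected

fun main() = runBlocking {
    println(withContext(Dispatchers.Default) { isRunningOn(Dispatchers.Default) }) // prints true
    println(withContext(Dispatchers.IO) { isRunningOn(Dispatchers.IO) })           // prints true
    println(withContext(Dispatchers.IO) { isRunningOn(Dispatchers.Default) })      // prints false
}
```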

    3. CoroutineName

        // mySingleDispatcher and printCoroutine() are defined in the earlier examples.
        @OptIn(ExperimentalStdlibApi::class)
        fun main() = runBlocking {
            val scope = CoroutineScope(Job() + mySingleDispatcher)
            scope.launch(CoroutineName("MyFirstCoroutine!")) {
                printCoroutine(coroutineContext[CoroutineDispatcher] == mySingleDispatcher)
                delay(1000L)
                printCoroutine("First end!")
            }
            delay(500L)
            scope.cancel()
            delay(1000L)
        }

    Log:

        true;Thread:MySingleThread @MyFirstCoroutine!#2

    When launch is called with CoroutineName("MyFirstCoroutine!") as the coroutine's name, the output contains "@MyFirstCoroutine!#2".
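
    The name is an ordinary context element, so it can also be read back in code with the CoroutineName key instead of being parsed from the thread name. A minimal sketch, where currentCoroutineName is a hypothetical helper:

```kotlin
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Reads the CoroutineName element from the current context, if one is present.
suspend fun currentCoroutineName(): String? = coroutineContext[CoroutineName]?.name

fun main() = runBlocking {
    println(currentCoroutineName()) // prints null: no name has been set yet
    val named = withContext(CoroutineName("MyFirstCoroutine!")) { currentCoroutineName() }
    println(named)                  // prints MyFirstCoroutine!
}
```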

    4. CoroutineExceptionHandler

    CoroutineExceptionHandler lets you define a custom exception handler through its handleException() method.

        public interface CoroutineExceptionHandler : CoroutineContext.Element {
            public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
            public fun handleException(context: CoroutineContext, exception: Throwable)
        }

        // mySingleDispatcher is defined in the earlier custom dispatcher example.
        fun main() = runBlocking {
            val myExceptionHandler = CoroutineExceptionHandler { _, throwable ->
                println("Catch exception: $throwable")
            }
            val scope = CoroutineScope(Job() + mySingleDispatcher)
            val job = scope.launch(myExceptionHandler) {
                val s: String? = null
                s!!.length // throws NullPointerException
            }
            job.join()
        }

    Log:

        Catch exception: java.lang.NullPointerException
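
    The first parameter of handleException() is the context of the failed coroutine, so the handler can read other elements from it. The sketch below records the coroutine's name together with the exception message; failAndReport and the name "loader" are made up for this illustration, and the handler is installed on a root coroutine started with launch, as in the example above.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Launches a failing root coroutine and returns what the handler observed.
fun failAndReport(): String? = runBlocking {
    var report: String? = null
    val handler = CoroutineExceptionHandler { context, throwable ->
        // `context` belongs to the failed coroutine, so its CoroutineName is available here.
        report = "${context[CoroutineName]?.name}: ${throwable.message}"
    }
    val scope = CoroutineScope(Job() + handler)
    scope.launch(CoroutineName("loader")) {
        throw IllegalStateException("boom")
    }.join() // the handler has run by the time the failed job completes
    report
}

fun main() {
    println(failAndReport()) // prints loader: boom
}
```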

  • Original article: https://blog.csdn.net/zhangying1994/article/details/127123130