• 自定义Kotlin协程调度器


    一、Kotlin协程

    在Android开发中,Kotlin最终编译为Java的字节码。众所周知,Java中只有进程和线程的概念,并没有协程的概念。那么什么是协程?为什么我们需要协程?

    协程,又称微线程。协程不像线程和进程那样,需要进行系统内核的上下文切换,协程的上下文切换由开发人员来决定。

    概念过于抽象,我们举一个例子。

    我们都知道多线程,当需要执行多项任务的时候,会采用多线程并发执行。拿Android开发中的网络请求来说,假如每个网络请求彼此独立不相干,那我们可以每个网络请求可以单独开启一个线程来执行。

    当有1个网络请求,我们开启1个线程。

    当有10个网络请求,我们开启10个线程。

    当有100个网络请求,我们开启100个线程。

    当有1000个网络请求,我们开启1000个线程。

    隐隐感觉有哪里不对。
    虽然理论上多线程并发执行会带来效率上的提高,但是系统内核就那么多,太多的线程并发执行只会拥堵在那里,并不会带来效率上的提升,并且会带来额外的线程切换开销。不仅如此,每个线程都会带来额外的内存开销。所以我们不能无限制的创建线程。
    在这里插入图片描述

    协程的出现刚好可以解决上述2个问题。协程运行在线程之上,当一个协程执行完成后,可以选择主动让出,让另一个协程运行在当前线程之上。协程并没有增加线程数量,只是在线程的基础上通过分时复用的方式运行多个协程。

    有人会提出异议,协程听起来和线程池没有啥区别啊?

    不错,笔者认为,Kotlin中的协程本质上与线程池并无区别,Kotlin中协程本质上是一套易用的线程池API,带来的是可读性的提升。

    但是这种可读性的提升帮助是巨大的。协程有效解决了回调地狱问题。

    假设有下面的场景,要先进行登录,然后才能获取位置,然后才能获取用户信息。每一次都必须异步执行,那我们不使用协程的写法如下:

    fun main() {
        login {
            getLocation {
                getUserInfo {
                    //通过一层层回调拿到真正结果
                }
            }
        }
    }
    
    
    fun login(result: (Boolean) -> Unit) {
        result.invoke(true)
    }
    
    fun getLocation(result: (Any) -> Unit) {
        result.invoke(true)
    }
    
    fun getUserInfo(result: (Any) -> Unit) {
        result.invoke(true)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    使用协程之后,可以发现原来使用回调来实现的逻辑,现在变成了顺序执行,代码可读性好了很多。

    fun main() {
        GlobalScope.launch {
            val loginResult = login()
            val location = getLocation()
            val user = getUserInfo()
        }
    }
    
    suspend fun login(): Boolean {
        //Dispatchers.IO表示使用IO调度器
        return GlobalScope.async(Dispatchers.IO) { true }.await()
    }
    
    suspend fun getLocation(): Any {
        //Dispatchers.IO表示使用Main调度器
        return GlobalScope.async(Dispatchers.Main) { Any() }.await()
    }
    
    suspend fun getUserInfo():Any {
        return GlobalScope.async(Dispatchers.Default) { Any() }.await()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    二、Kotlin预置的调度器

    在创建协程时候,需要指定调度器。协程的调度器用于确定执行协程的目标载体,即运行在哪个线程,包含在一个或者多个线程中。协程调度器可以将协程的执行操作限制在特定线程上,也可以将其分派在线程池中,或者让它无线执行。系统预制的调度器有三种:

    • Dispatchers.Default:默认的调度器,当不指定调度器时,使用该调度器。此调度程序经过专门优化,适合在主线程之外执行占用大量CPU资源的工作,如对列表排序,对JSON解析。
    • Dispatchers.Main:此调度可以在Android主线程上运行协程。此调度只能用于与界面交互和执行快速工作。如Android界面框架操作,更新LiveData对象等。
    • Dispatchers.IO:此调度适合在主线程之外执行磁盘或者网络IO。如使用ROOM组件、从文件中读取数据或向文件中写入数据,以及运行任何网络操作。
    • Dispatchers.Unconfined:对执行协程的线程不做限制,可以直接在当前调度器所在的线程上执行。

    篇幅原因,这里我们选用Dispatchers.Default看一下源码实现:
    核心部分有三个位置,第一个是参数的定义

    public open class ExperimentalCoroutineDispatcher(
        private val corePoolSize: Int,
        private val maxPoolSize: Int,
        private val idleWorkerKeepAliveNs: Long,
        private val schedulerName: String = "CoroutineScheduler"
    ) : ExecutorCoroutineDispatcher() {
        public constructor(
            corePoolSize: Int = CORE_POOL_SIZE, //Default调度器会向线程池一样,定义核心线程,默认是2个
            maxPoolSize: Int = MAX_POOL_SIZE, //定义做大线程
            schedulerName: String = DEFAULT_SCHEDULER_NAME
        ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
    xxxxxx
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    第二个位置是CoroutineScheduler类的dispatch部分

    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask() // this is needed for virtual time support
        //将我们要执行的任务 block 封装为一个Task
        val task = createTask(block, taskContext)
        // try to submit the task to the local queue and act depending on the result
        //找到要执行这个任务的worker,Worker可以被理解为Thread
        val currentWorker = currentWorker()
        //将任务提交到队列中执行
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        //xxxxx
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    第三个位置是Worker的runWorker方法

    
    private fun runWorker() {
        var rescanned = false
        while (!isTerminated && state != WorkerState.TERMINATED) {
            val task = findTask(mayHaveLocalTasks)
            // Task found. Execute and repeat
            if (task != null) {
                rescanned = false
                minDelayUntilStealableTaskNs = 0L
                //这里会依次从队首获取到当前要执行的任务, 进行执行
                executeTask(task)
                continue
            } else {
                mayHaveLocalTasks = false
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    三、Kotlin自定义调度器

    从上述例子中,我们可以看到系统预置的Default调度器是怎么执行的,我们可以仿照Default调度器来实现自己的调度器。

    比如说有以下场景,我希望我的任务都在子线程中顺序执行,那我们可以定义一个线程数为1的调度器,如下。

    我们定义一个线程池,实现调度器中的dispatch方法,直接在单线程池中执行即可。

    object SingleDispatcher : ExecutorCoroutineDispatcher() {
        private val myExecutor: Executor by lazy {
            Executors.newSingleThreadExecutor()
        }
        override val executor: Executor
            get() = myExecutor
    
        override fun close() {
        }
    
        override fun dispatch(context: CoroutineContext, block: Runnable) {
            executor.execute(block)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    或者我们参考Default实现,规定最大线程数即可。

    @OptIn(InternalCoroutinesApi::class)
    object NewSingleDispatcher
        : ExperimentalCoroutineDispatcher(
        corePoolSize = 1, 
        maxPoolSize = 1, 
        idleWorkerKeepAliveNs = 60 * 1000 * 1000, 
        "NewSingleDisapthcer"
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    使用办法如下

    suspend fun login(): Boolean {
        //Dispatchers.IO表示使用IO调度器
        return GlobalScope.async(SingleDispatcher) { true }.await()
    }
    
    • 1
    • 2
    • 3
    • 4
  • 相关阅读:
    ElasticSearch 索引设计
    【数据结构】二叉树详解(1)
    Android SVGA动画
    【EMQX】2.1.2 为什么选择EMQ X
    【原型详解】JavaScript原型链:深入了解Prototype,超级详细!!!
    Kafka之Broker原理
    6、流程控制语句
    马尔科夫链、PCV及贝叶斯动图详解
    反向传播不香了?解读 Hinton 大佬的 Forward-Forward 算法
    硅谷来信:Google、Facebook员工的“成长型思维”
  • 原文地址:https://blog.csdn.net/weixin_43662090/article/details/126320892