• Kotlin 协程 - 协程调度器 CoroutineDispatcher


    一、概念

    协程必须运行在一个线程上,所以要指定调度器。是一个抽象类,Dispatcher是一个标准库中帮我们封装了切换线程的帮助类,可以调度协程在哪类线程上执行。创建协程时,上下文如果没有指定也没有继承到调度器,则会添加一个默认调度器(调度器通过 ContinuationInterceptor 延续体拦截器实现的)。通过Dispatchers调度,而不是Thread因为不是单纯指定线程。

    二、模式

    • 由于子协程会继承父协程的上下文,在父协程上指定调度器模式后子协程默认使用这个模式。
    • IO 和 DEFAULT 模式共享同一线程池,重用线程起到优化(DEFAULT 切换到 IO 大概率停留在同一线程上),两者对线程数量限制是独立的不会让对方饥饿。最大限度一起使用的话,默认同时活跃的线程数为 64+CPU数。
    Dispatcher.Main运行于主线程,在 Android 中就是 UI 线程,用来处理一些 UI 交互的轻量级任务。

    调用 suspend 函数

    调用 UI 函数

    更新 LiveData

    Dispatcher.Main.immediate协程的调度是有成本的,当我们已经处在主线程时,开启一个调度到主线程的子协程,会经历挂起等待恢复,这是不必要的开销,甚至队列很长会导致数据延迟显示(例如 ViewModelScope 就处在Android默认的主线程中,因此上下文中的调度器使用了这个),此时指定为 immediate 就只会在需要的时候调度,否则直接执行。函数被withContext包装在Dispatcher.Main上运行时使用。
    Dispatcher.IO

    运行于线程池,专为IO阻塞型任务进行了优化。最大线程数为64个,只要没超过且没有空闲线程就一直可开辟新线程执行新任务。

    数据库

    文件读写

    网络处理

    Dispatcher.Default

    运行于线程池,专为CPU密集型计算任务进行了优化。最大线程数为CPU核心个数(但不少于2个),若全在忙碌时新任务无法得到执行。

    数组排序

    Json解析

    处理差异判断

    计算Bitmap

    Dispatcher.Unconfined不改变线程,在启动它的线程执行,在恢复它的线程执行。调度成本最低性能最好,但有处在主线程上调用了阻塞操作的风险。当不需要关心协程在哪个线程上被挂起时使用。

    三、限制线程数 limitedParallelism()

    1.6版本引入。

    • 对于Default模式:当有一个开销很大的任务,可能会导致其它使用相同调度器的协程抢不到线程执行权,这个时候就可以用来限制该协程的线程使用数量。
    • 对于IO模式:当有一个开销很大的任务,可能会导致阻塞太多线程让其它任务暂停等待,突破默认64个线程的限制加速执行(不显著)。
    • 传参将线程限制为1,解决多线程并发修改数据的同步问题。但如果阻塞了它,其它操作都要等待。
    public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher
    1. suspend fun main(): Unit = coroutineScope {
    2. //使用默认IO模式
    3. launch {
    4. printTime(Dispatchers.IO) //打印:Dispatchers.IO 花费了: 2038/
    5. }
    6. //使用limitedParallelism增加线程
    7. launch {
    8. val dispatcher = Dispatchers.IO.limitedParallelism(100)
    9. printTime(dispatcher) //打印:LimitedDispatcher@1cc12797 花费了: 1037
    10. }
    11. }
    12. suspend fun printTime(dispatcher: CoroutineDispatcher) {
    13. val time = measureTimeMillis {
    14. coroutineScope {
    15. repeat(100) {
    16. launch(dispatcher) {
    17. Thread.sleep(1000)
    18. }
    19. }
    20. }
    21. }
    22. println("$dispatcher 花费了: $time")
    23. }

    四、多线程并发问题

    创建10个协程,每个协程执行 i++ 1000次,预期结果 i=10000。

    4.1 避免使用共享变量

    1. fun main() = runBlocking {
    2. val deferreds = mutableListOfInt>>()
    3. repeat(10) {
    4. val deferred = async(Dispatchers.Default) {
    5. var i = 0
    6. repeat(1000) { i++ }
    7. return@async i
    8. }
    9. deferreds.add(deferred)
    10. }
    11. var result = 0
    12. deferreds.forEach {
    13. result += it.await()
    14. }
    15. println("i = $result")
    16. }
    17. 打印:i = 10000,耗时:77

    4.2 使用Java方法(不推荐)

    可以使用 Synchronized、Lock、Atomic。由于是线程模型下的阻塞方式,不支持调用挂起函数,会影响协程挂起特性。

    4.2.1 使用同步锁

    1. fun main() = runBlocking {
    2. val start = System.currentTimeMillis()
    3. var i = 0
    4. val jobs = mutableListOf()
    5. @Synchronized
    6. fun add() { i++ }
    7. repeat(10) {
    8. val job = launch(Dispatchers.Default) {
    9. repeat(1000) { add() }
    10. }
    11. jobs.add(job)
    12. }
    13. jobs.joinAll()
    14. println("i = $i,耗时:${System.currentTimeMillis() - start}")
    15. }
    16. //打印:i = 10000,耗时:71

    4.2.2 使用同步代码块

    1. fun main() = runBlocking {
    2. val start = System.currentTimeMillis()
    3. val lock = Any()
    4. var i = 0
    5. val jobs = mutableListOf()
    6. repeat(10) {
    7. val job = launch(Dispatchers.Default) {
    8. repeat(1000) {
    9. synchronized(lock) { i++ }
    10. }
    11. }
    12. jobs.add(job)
    13. }
    14. jobs.joinAll()
    15. println("i = $i,耗时:${System.currentTimeMillis() - start}")
    16. }
    17. //打印:i = 10000,耗时:73

    4.2.3 使用可重入锁 ReentrantLock

    1. fun main() = runBlocking {
    2. val start = System.currentTimeMillis()
    3. val lock = ReentrantLock()
    4. var i = 0
    5. val jobs = mutableListOf()
    6. repeat(10) {
    7. val job = launch(Dispatchers.Default) {
    8. repeat(1000) {
    9. lock.lock()
    10. i++
    11. lock.unlock()
    12. }
    13. }
    14. jobs.add(job)
    15. }
    16. jobs.joinAll()
    17. println("i = $i,耗时:${System.currentTimeMillis() - start}")
    18. }
    19. //打印:i = 10000,耗时:83

    4.2.4 使用 AtomicInteger 保证原子性

    1. fun main() = runBlocking {
    2. val start = System.currentTimeMillis()
    3. var i = AtomicInteger(0)
    4. val jobs = mutableListOf()
    5. repeat(10) {
    6. val job = launch(Dispatchers.Default) {
    7. repeat(1000) { i.incrementAndGet() }
    8. }
    9. jobs.add(job)
    10. }
    11. jobs.joinAll()
    12. println("i = $i,耗时:${System.currentTimeMillis() - start}")
    13. }
    14. //打印:i = 10000,耗时:89

    4.3  使用单线程(不推荐)

    在没有 limitedParallelism() 的 1.6 版本以前就是这样做的,该方式的问题是容易忘记使用 close() 关闭,以及可能会抵消的使用该线程池(将未使用的线程保持活跃状态却不与其它服务共享这些线程)。

    1. fun main() = runBlocking {
    2. val start = System.currentTimeMillis()
    3. val mySingleDispatcher = Executors.newSingleThreadExecutor {
    4. Thread(it, "我的线程").apply { isDaemon = true }
    5. }.asCoroutineDispatcher()
    6. var i = 0
    7. val jobs = mutableListOf()
    8. repeat(10) {
    9. val job = launch(mySingleDispatcher) {
    10. repeat(1000) { i++ }
    11. }
    12. jobs.add(job)
    13. }
    14. jobs.joinAll()
    15. println("i = $i,耗时:${System.currentTimeMillis() - start}")
    16. }
    17. //打印:i = 10000,耗时:64

    4.4  使用 Mutex

    Java方式不支持调用挂起函数,同步锁是阻塞式的会影响协程特性,为此 Kotlin 提供了非阻塞式锁Mutex。使用 mutex.lock() 和 mutex.unlock() 包裹需要同步的计算逻辑就可以实现多线程同步了,但由于包裹内容可能出现的异常使得 unlock() 无法被执行,写在 finally{} 中会很繁琐,因此提供了扩展函数 mutex.withLock{ },本质就是在 finally{ } 中调用了 unlock()。

    public suspend inline fun Mutex.withLock(owner: Any? = null, action: () -> T): T {
        lock(owner)
        try {
            return action()
        } finally {          // 注意,这里并没有 catch 代码块,所以不会捕获异常
            unlock(owner)
        }
    }
    1. fun main() = runBlocking {
    2. val start = System.currentTimeMillis()
    3. var i = 0
    4. val mutex = Mutex()
    5. //使用方式一
    6. mutex.lock()
    7. // try {
    8. // repeat(10000) { i++ }
    9. // } catch (e: Exception) {
    10. // e.printStackTrace()
    11. // } finally {
    12. // mutex.unlock()
    13. // }
    14. //使用方式二
    15. mutex.withLock {
    16. try {
    17. repeat(10000) { i++ }
    18. } catch (e: Exception) {
    19. e.printStackTrace()
    20. }
    21. }
    22. println("i = $i,耗时:${System.currentTimeMillis() - start}")
    23. }
    24. //方式一打印:i = 10000,耗时:17
    25. //方式二打印:i = 10000,耗时:17

     4.5 使用 Actor

    Actor是一个并发同步模型,本质是基于Channel管道消息实现的。

    1. sealed class Msg {
    2. object AddMsg : Msg()
    3. class ResultMsg(val result: CompletableDeferred<Int>) : Msg()
    4. }
    5. @OptIn(ObsoleteCoroutinesApi::class)
    6. fun main() = runBlocking {
    7. val start = System.currentTimeMillis()
    8. val actor = actor {
    9. var i = 0
    10. for (msg in channel) {
    11. when (msg) {
    12. is Msg.AddMsg -> i++
    13. is Msg.ResultMsg -> msg.result.complete(i)
    14. }
    15. }
    16. }
    17. val jobs = mutableListOf()
    18. repeat(10) {
    19. val job = launch {
    20. repeat(1000) {
    21. actor.send(Msg.AddMsg)
    22. }
    23. }
    24. jobs.add(job)
    25. }
    26. jobs.joinAll()
    27. val deferred = CompletableDeferred<Int>()
    28. actor.send(Msg.ResultMsg(deferred))
    29. val result = deferred.await()
    30. actor.close()
    31. println("i = $result,耗时:${System.currentTimeMillis() - start}")
    32. }
    33. //打印:i = 10000,耗时:167

     4.6 使用Semaphore

    Semaphore是协程中的信号量 ,指定通行的数量为1就可以保证并发的数量为1。

    1. fun main() = runBlocking {
    2. var count = 0
    3. val semaphore = Semaphore(1)
    4. repeat(1000) {
    5. GlobalScope.launch {
    6. semaphore.withPermit { count++ }
    7. }
    8. }.joinAll()
    9. println(count)
    10. }
  • 相关阅读:
    学生成绩管理系统?
    收银台——Web自动化测试
    windows11下运行swin-transformer算法
    ASP.Net Core创建MVC项目上传多个文件(流方式)
    【Java笔试强训】Day2(OR62 倒置字符串,排序子序列)
    【考研】数据结构——特殊矩阵的压缩存储(含真题)
    docker 基本命令
    wireshark抓包工具的使用
    【物联网】NB-IoT
    2023-05-20:go语言的slice和rust语言的Vec的扩容流程是什么?
  • 原文地址:https://blog.csdn.net/HugMua/article/details/132797687