协程必须运行在一个线程上,所以要指定调度器。是一个抽象类,Dispatcher是一个标准库中帮我们封装了切换线程的帮助类,可以调度协程在哪类线程上执行。创建协程时,上下文如果没有指定也没有继承到调度器,则会添加一个默认调度器(调度器通过 ContinuationInterceptor 延续体拦截器实现的)。
通过Dispatchers调度,而不是Thread因为不是单纯指定线程。
- 由于子协程会继承父协程的上下文,在父协程上指定调度器模式后子协程默认使用这个模式。
- IO 和 DEFAULT 模式共享同一线程池,重用线程起到优化(DEFAULT 切换到 IO 大概率停留在同一线程上),两者对线程数量限制是独立的不会让对方饥饿。最大限度一起使用的话,默认同时活跃的线程数为 64+CPU数。
Dispatcher.Main | 运行于主线程,在 Android 中就是 UI 线程,用来处理一些 UI 交互的轻量级任务。 | 调用 suspend 函数 调用 UI 函数 更新 LiveData |
Dispatcher.Main.immediate | 协程的调度是有成本的,当我们已经处在主线程时,开启一个调度到主线程的子协程,会经历挂起等待恢复,这是不必要的开销,甚至队列很长会导致数据延迟显示(例如 ViewModelScope 就处在Android默认的主线程中,因此上下文中的调度器使用了这个),此时指定为 immediate 就只会在需要的时候调度,否则直接执行。 | 函数被withContext包装在Dispatcher.Main上运行时使用。 |
Dispatcher.IO | 运行于线程池,专为IO阻塞型任务进行了优化。最大线程数为64个,只要没超过且没有空闲线程就一直可开辟新线程执行新任务。 | 数据库 文件读写 网络处理 |
Dispatcher.Default | 运行于线程池,专为CPU密集型计算任务进行了优化。最大线程数为CPU核心个数(但不少于2个),若全在忙碌时新任务无法得到执行。 | 数组排序 Json解析 处理差异判断 计算Bitmap |
Dispatcher.Unconfined | 不改变线程,在启动它的线程执行,在恢复它的线程执行。调度成本最低性能最好,但有处在主线程上调用了阻塞操作的风险。 | 当不需要关心协程在哪个线程上被挂起时使用。 |
1.6版本引入。
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher |
- suspend fun main(): Unit = coroutineScope {
- //使用默认IO模式
- launch {
- printTime(Dispatchers.IO) //打印:Dispatchers.IO 花费了: 2038/
- }
- //使用limitedParallelism增加线程
- launch {
- val dispatcher = Dispatchers.IO.limitedParallelism(100)
- printTime(dispatcher) //打印:LimitedDispatcher@1cc12797 花费了: 1037
- }
- }
-
- suspend fun printTime(dispatcher: CoroutineDispatcher) {
- val time = measureTimeMillis {
- coroutineScope {
- repeat(100) {
- launch(dispatcher) {
- Thread.sleep(1000)
- }
- }
- }
- }
- println("$dispatcher 花费了: $time")
- }
创建10个协程,每个协程执行 i++ 1000次,预期结果 i=10000。
- fun main() = runBlocking {
- val deferreds = mutableListOf
Int>>() - repeat(10) {
- val deferred = async(Dispatchers.Default) {
- var i = 0
- repeat(1000) { i++ }
- return@async i
- }
- deferreds.add(deferred)
- }
- var result = 0
- deferreds.forEach {
- result += it.await()
- }
- println("i = $result")
- }
- 打印:i = 10000,耗时:77
可以使用 Synchronized、Lock、Atomic。由于是线程模型下的阻塞方式,不支持调用挂起函数,会影响协程挂起特性。
- fun main() = runBlocking {
- val start = System.currentTimeMillis()
- var i = 0
- val jobs = mutableListOf
() -
- @Synchronized
- fun add() { i++ }
-
- repeat(10) {
- val job = launch(Dispatchers.Default) {
- repeat(1000) { add() }
- }
- jobs.add(job)
- }
- jobs.joinAll()
- println("i = $i,耗时:${System.currentTimeMillis() - start}")
- }
- //打印:i = 10000,耗时:71
- fun main() = runBlocking {
- val start = System.currentTimeMillis()
- val lock = Any()
- var i = 0
- val jobs = mutableListOf
() - repeat(10) {
- val job = launch(Dispatchers.Default) {
- repeat(1000) {
- synchronized(lock) { i++ }
- }
- }
- jobs.add(job)
- }
- jobs.joinAll()
- println("i = $i,耗时:${System.currentTimeMillis() - start}")
- }
- //打印:i = 10000,耗时:73
- fun main() = runBlocking {
- val start = System.currentTimeMillis()
- val lock = ReentrantLock()
- var i = 0
- val jobs = mutableListOf
() - repeat(10) {
- val job = launch(Dispatchers.Default) {
- repeat(1000) {
- lock.lock()
- i++
- lock.unlock()
- }
- }
- jobs.add(job)
- }
- jobs.joinAll()
- println("i = $i,耗时:${System.currentTimeMillis() - start}")
- }
- //打印:i = 10000,耗时:83
- fun main() = runBlocking {
- val start = System.currentTimeMillis()
- var i = AtomicInteger(0)
- val jobs = mutableListOf
() - repeat(10) {
- val job = launch(Dispatchers.Default) {
- repeat(1000) { i.incrementAndGet() }
- }
- jobs.add(job)
- }
- jobs.joinAll()
- println("i = $i,耗时:${System.currentTimeMillis() - start}")
- }
- //打印:i = 10000,耗时:89
在没有 limitedParallelism() 的 1.6 版本以前就是这样做的,该方式的问题是容易忘记使用 close() 关闭,以及可能会抵消的使用该线程池(将未使用的线程保持活跃状态却不与其它服务共享这些线程)。
- fun main() = runBlocking {
- val start = System.currentTimeMillis()
- val mySingleDispatcher = Executors.newSingleThreadExecutor {
- Thread(it, "我的线程").apply { isDaemon = true }
- }.asCoroutineDispatcher()
- var i = 0
- val jobs = mutableListOf
() - repeat(10) {
- val job = launch(mySingleDispatcher) {
- repeat(1000) { i++ }
- }
- jobs.add(job)
- }
- jobs.joinAll()
- println("i = $i,耗时:${System.currentTimeMillis() - start}")
- }
- //打印:i = 10000,耗时:64
Java方式不支持调用挂起函数,同步锁是阻塞式的会影响协程特性,为此 Kotlin 提供了非阻塞式锁Mutex。使用 mutex.lock() 和 mutex.unlock() 包裹需要同步的计算逻辑就可以实现多线程同步了,但由于包裹内容可能出现的异常使得 unlock() 无法被执行,写在 finally{} 中会很繁琐,因此提供了扩展函数 mutex.withLock{ },本质就是在 finally{ } 中调用了 unlock()。
public suspend inline fun lock(owner) try { return action() } finally { // 注意,这里并没有 catch 代码块,所以不会捕获异常 unlock(owner) } } |
- fun main() = runBlocking {
- val start = System.currentTimeMillis()
- var i = 0
- val mutex = Mutex()
- //使用方式一
- mutex.lock()
- // try {
- // repeat(10000) { i++ }
- // } catch (e: Exception) {
- // e.printStackTrace()
- // } finally {
- // mutex.unlock()
- // }
- //使用方式二
- mutex.withLock {
- try {
- repeat(10000) { i++ }
- } catch (e: Exception) {
- e.printStackTrace()
- }
- }
- println("i = $i,耗时:${System.currentTimeMillis() - start}")
- }
- //方式一打印:i = 10000,耗时:17
- //方式二打印:i = 10000,耗时:17
Actor是一个并发同步模型,本质是基于Channel管道消息实现的。
- sealed class Msg {
- object AddMsg : Msg()
- class ResultMsg(val result: CompletableDeferred<Int>) : Msg()
- }
-
- @OptIn(ObsoleteCoroutinesApi::class)
- fun main() = runBlocking {
- val start = System.currentTimeMillis()
- val actor = actor
{ - var i = 0
- for (msg in channel) {
- when (msg) {
- is Msg.AddMsg -> i++
- is Msg.ResultMsg -> msg.result.complete(i)
- }
- }
- }
- val jobs = mutableListOf
() - repeat(10) {
- val job = launch {
- repeat(1000) {
- actor.send(Msg.AddMsg)
- }
- }
- jobs.add(job)
- }
- jobs.joinAll()
- val deferred = CompletableDeferred<Int>()
- actor.send(Msg.ResultMsg(deferred))
- val result = deferred.await()
- actor.close()
- println("i = $result,耗时:${System.currentTimeMillis() - start}")
- }
- //打印:i = 10000,耗时:167
Semaphore是协程中的信号量 ,指定通行的数量为1就可以保证并发的数量为1。
- fun main() = runBlocking {
- var count = 0
- val semaphore = Semaphore(1)
- repeat(1000) {
- GlobalScope.launch {
- semaphore.withPermit { count++ }
- }
- }.joinAll()
- println(count)
- }