在Android开发中,Kotlin最终编译为Java的字节码。众所周知,Java中只有进程和线程的概念,并没有协程的概念。那么什么是协程?为什么我们需要协程?
协程,又称微线程。协程不像线程和进程那样,需要进行系统内核的上下文切换,协程的上下文切换由开发人员来决定。
概念过于抽象,我们举一个例子。
我们都知道多线程,当需要执行多项任务的时候,会采用多线程并发执行。拿Android开发中的网络请求来说,假如每个网络请求彼此独立不相干,那我们可以每个网络请求可以单独开启一个线程来执行。
当有1个网络请求,我们开启1个线程。
当有10个网络请求,我们开启10个线程。
当有100个网络请求,我们开启100个线程。
当有1000个网络请求,我们开启1000个线程。
…
隐隐感觉有哪里不对。
虽然理论上多线程并发执行会带来效率上的提高,但是系统内核就那么多,太多的线程并发执行只会拥堵在那里,并不会带来效率上的提升,并且会带来额外的线程切换开销。不仅如此,每个线程都会带来额外的内存开销。所以我们不能无限制的创建线程。
协程的出现刚好可以解决上述2个问题。协程运行在线程之上,当一个协程执行完成后,可以选择主动让出,让另一个协程运行在当前线程之上。协程并没有增加线程数量,只是在线程的基础上通过分时复用的方式运行多个协程。
有人会提出异议,协程听起来和线程池没有啥区别啊?
不错,笔者认为,Kotlin中的协程本质上与线程池并无区别,Kotlin中协程本质上是一套易用的线程池API,带来的是可读性的提升。
但是这种可读性的提升帮助是巨大的。协程有效解决了回调地狱问题。
假设有下面的场景,要先进行登录,然后才能获取位置,然后才能获取用户信息。每一次都必须异步执行,那我们不使用协程的写法如下:
fun main() {
login {
getLocation {
getUserInfo {
//通过一层层回调拿到真正结果
}
}
}
}
fun login(result: (Boolean) -> Unit) {
result.invoke(true)
}
fun getLocation(result: (Any) -> Unit) {
result.invoke(true)
}
fun getUserInfo(result: (Any) -> Unit) {
result.invoke(true)
}
使用协程之后,可以发现原来使用回调来实现的逻辑,现在变成了顺序执行,代码可读性好了很多。
fun main() {
GlobalScope.launch {
val loginResult = login()
val location = getLocation()
val user = getUserInfo()
}
}
suspend fun login(): Boolean {
//Dispatchers.IO表示使用IO调度器
return GlobalScope.async(Dispatchers.IO) { true }.await()
}
suspend fun getLocation(): Any {
//Dispatchers.IO表示使用Main调度器
return GlobalScope.async(Dispatchers.Main) { Any() }.await()
}
suspend fun getUserInfo():Any {
return GlobalScope.async(Dispatchers.Default) { Any() }.await()
}
在创建协程时候,需要指定调度器。协程的调度器用于确定执行协程的目标载体,即运行在哪个线程,包含在一个或者多个线程中。协程调度器可以将协程的执行操作限制在特定线程上,也可以将其分派在线程池中,或者让它无线执行。系统预制的调度器有三种:
篇幅原因,这里我们选用Dispatchers.Default看一下源码实现:
核心部分有三个位置,第一个是参数的定义
public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
public constructor(
corePoolSize: Int = CORE_POOL_SIZE, //Default调度器会向线程池一样,定义核心线程,默认是2个
maxPoolSize: Int = MAX_POOL_SIZE, //定义做大线程
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
xxxxxx
}
第二个位置是CoroutineScheduler类的dispatch部分
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
trackTask() // this is needed for virtual time support
//将我们要执行的任务 block 封装为一个Task
val task = createTask(block, taskContext)
// try to submit the task to the local queue and act depending on the result
//找到要执行这个任务的worker,Worker可以被理解为Thread
val currentWorker = currentWorker()
//将任务提交到队列中执行
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
//xxxxx
}
第三个位置是Worker的runWorker方法
private fun runWorker() {
var rescanned = false
while (!isTerminated && state != WorkerState.TERMINATED) {
val task = findTask(mayHaveLocalTasks)
// Task found. Execute and repeat
if (task != null) {
rescanned = false
minDelayUntilStealableTaskNs = 0L
//这里会依次从队首获取到当前要执行的任务, 进行执行
executeTask(task)
continue
} else {
mayHaveLocalTasks = false
}
}
从上述例子中,我们可以看到系统预置的Default调度器是怎么执行的,我们可以仿照Default调度器来实现自己的调度器。
比如说有以下场景,我希望我的任务都在子线程中顺序执行,那我们可以定义一个线程数为1的调度器,如下。
我们定义一个线程池,实现调度器中的dispatch方法,直接在单线程池中执行即可。
object SingleDispatcher : ExecutorCoroutineDispatcher() {
private val myExecutor: Executor by lazy {
Executors.newSingleThreadExecutor()
}
override val executor: Executor
get() = myExecutor
override fun close() {
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
executor.execute(block)
}
}
或者我们参考Default实现,规定最大线程数即可。
@OptIn(InternalCoroutinesApi::class)
object NewSingleDispatcher
: ExperimentalCoroutineDispatcher(
corePoolSize = 1,
maxPoolSize = 1,
idleWorkerKeepAliveNs = 60 * 1000 * 1000,
"NewSingleDisapthcer"
)
使用办法如下
suspend fun login(): Boolean {
//Dispatchers.IO表示使用IO调度器
return GlobalScope.async(SingleDispatcher) { true }.await()
}