本文是对 Kotlin Coroutines by Tutorials By Filip Babić and Nishant Srivastava 书本内容的转述和笔记,书本内容是受原版权保护的内容。
Chapter 6: 上下文切换与协程调度器
TL; DR
Work Scheduling 工作调度
TL; DR
Swimming in a Pool of Thread 畅游线程池
TL; DR
Context Switching 上下文切换
在协程 API
中,你不必担心创建你自己的线程或线程池,也不必担心调度多个协程如何执行,以及它们的生命周期。协程
API 有一种特定的方式来交流所有这些信息 ── 通过
ContinuationInterceptors,你通过 Dispatchers
提供这些信息,你将在本章后面了解这些信息。
为了充分理解这些 Dispatchers
是如何工作的,重要的是要理解系统的进程,和线程状态的基本通信模式。这种模式被称为:上下文切换。然而,从单任务系统到多任务系统,其定义各不相同。但在本章中我们将重点讨论多任务。
从本质上讲,当系统切换上下文时,意味着它从一个任务转移到另一个任务,并保存前一个任务的状态,以便以后可以继续进行。这听起来很熟悉?这与
Continuation
在内部所做的非常相似,当它涉及到可挂起功能的挂起点时。嗯,这也是为什么所有的调度器实际上都实现了
ContinuationInterceptor,因为通过拦截
Continuation
的过程,以及它们的执行流程,系统可以挂起和恢复──也就是──切换当前任务或函数的上下文。
但挂起和恢复任务并不是全部,系统还应该能够在单个任务的线程之间进行切换。仔细想想,这两个概念在 Kotlin 协程中是对等的。
如果你需要在后台做一些事情,然后切换到主线程,发布一个值或一些操作的结果,你最终会创建另一个协程,把它推到主线程,然后从内部切换到该协程。这基本上就是上下文切换,另外就是在线程之间进行切换。因此,每个协程中最重要的部分被称为
CoroutineContext,这并不是巧合。
现在你有点明白了系统是如何处理协程的上下文切换的,是时候进入调度环节了
Explaining
ContinuationInterceptors 解析「延续」拦截器
尽管本章提到了
ContinuationInterceptors,但对它们的工作原理可能还是有点不清楚。如果你还记得调用堆栈中的函数和可暂停的函数被调用时的情况的图示:

当你在栈中有多个函数,以及多个「延续」时,你了解到你可以通过将值或异常向下传播,一直返回到主延续。
ContinuationInterceptors
与该函数执行和线程一起工作。每当你启动一个协程,或者用
Dispatcher
调用一个可挂起函数时,你就给了拦截器挂起和恢复协程的继续的能力──执行流(Execution
Flow)。它可以在一个点上拦截值的传播,并将其重定向到另一个协程或任务。
正因为如此,如果你使用 Dispatchers.Default
创建一个协程来获取一些值,然后在其中用 Dispatchers.Main
启动一个新的协程来把它推到主线程上,你将有效地拦截第一个协程的返回值,继续进行第二个协程传入上下文和值,这样你就可以在主线程上做一些工作,然后你完成了两个协程。如果第二个协程出了问题,拦截器会将异常一直向上传播到父协程,取消掉这两个协程。
这种类型的行为是通过包装 Continuation
的过程实现的。每次你用 ContinuationInterceptor
切换上下文时,它都会通过使用 interceptContinuation
来包装前一个 Continuation,从而创建一个新的
Continuation。该函数的签名如下:
abstract fun <T> interceptContinuation(
continuation: Continuation<T>
): Continuation<T>它非常简单,但也非常强大。任何时候,系统都会给一个函数的
Continuation
发出信号,包括一个新的值或一个异常,ContinuationInterceptor
可以接受这个 Continuation,对它做一些工作,最后恢复执行。在
Dispatchers 中,ContinuationInterceptors
所做的工作通常是上下文切换,就是从一个线程池转移到另一个线程池。
Coroutine Dispatcher Types 协程调度器类型
Kotlin 提供了一种简洁的方式,使用 Dispatchers
在协程中交流线程设定。它们是 CoroutineContext.Element
的实现,构成了协程执行时行为的拼图的一部分。在一般的计算中,调度器是一个模块,它将
CPU 的控制权交给调度机制选择的哪个进程来执行。因此,一个
Scheduler 决定哪个进程是下一个要用 CPU
的进程,但是它把这个进程交给一个 Dispatcher,让
Dispatcher
控制这个进程实际使用的资源。这两个模块或机制共同控制着操作系统中的进程。
类似的事情也发生在 CoroutineDispatchers
身上。调度器通过将线程或线程池委托给它们的方式来决定协程如何使用可用资源。一旦你将某个调度器附加到一个协程,它就会被分配到调度器所知道的线程或线程池。
由于它们与线程打交道,协程中的调度器可以是
限制(Confined) 的和
非限制(Unconfined)
的。限制调度器总是依赖于预定义的系统上下文──比如
Dispatchers.Main。无论你使用了多少次 Main
调度器,它都会使协程在主线程上工作。另一方面,无限制调度器没有特定的操作上下文,也不遵循任何严格的规则。它们要么创建新的线程来运行协程,要么将工作推给已经被代码调用了的线程,这使它们变得不可预知。
只有很少一部分预定义的调度器,这些调度器分别是
Dispatchers.Default:启动协程的默认线程策略,局限于父级的上下文,通常是一个 Worker 线程池。Dispatchers.IO:与Default类似,它基于 JVM,并由线程池支持,主要运行 IO 相关的任务。Dispatchers.Main:主线程调度器,与线程相连,对 UI 对象进行操作。Dispatchers.Unconfined:顾名思义,它是无限制的,它将在当前使用它的那个线程上运行。
让我们逐一看一下它们,看看你能用它们做什么。
Default Dispatcher 默认调度器
它被用在协程的基础上,只要你不指定一个调度器,DEFAULT
就会被使用。它使用起来很方便,因为它有一个工作线程池支持,DEFAULT
调度器能处理的任务数总是等于系统的核心数,而且至少是 2
个。因为整个线程机制和线程池都是预先建立的,所以你可以依靠它来完成你的日常工作,脱离你的主线程。
IO Dispatcher IO 调度器
每当你试图处理一些有输入和输出的东西时,比如上传或解密 / 加密文件,你可以使用这个调度器来为你提供方便。也就是说,它与 JVM 绑定,所以如果你使用 Kotlin/JavaScript 或 Kotlin/Native 项目,你将无法使用它。
Main Dispatcher 主调度器
这个调度器与具有某种形式的用户界面的系统联系在一起,如 Android
或可视化的 Java 应用程序。如前所述,它将工作分派给处理 UI
对象的线程。你不能在没有用户界面的情况下使用它,如果你试图在一个不使用
Swing、JavaFX 或不是 Android 应用的项目中调用
Dispatchers.Main,你的代码会崩溃。
它最好是在你获取你需要的数据后,在另一个协程中使用,然后在显示数据前处理所有的逻辑。你只需将数据传回主线程,然后让它呈现。或者更好的是,你可以在主线程上运行协程,桥接到后台,使用
withContext 或
async/await,最终将结果拉回到主线程进行渲染。
Using Dispatchers 使用调度器
假设你有下面的代码:
GlobalScope.launch {
println("This is a coroutine")
}你无法得知任何关于线程或调度的信息,这发生在幕后。让我们回顾一下启动函数的签名:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job第一部分很重要,在这里。它默认使用
EmptyCoroutineContext,除非你指定一个不同的。你已经了解到,上下文定义了协程如何启动,在哪里启动,如何处理错误,以及它的生命周期是什么。EmptyCoroutineContext
没有定义任何错误处理,它没有父级上下文,它使用默认的生命周期管理,而且它没有
CoroutineInterceptor,所以它使用
Dispatchers.Default。
从你所学到的关于调度器的知识来看,这意味着该协程将使用一个预定义的线程池来完成它的工作。如果你想使用任何其他的调度器,只需将其传入以取代
EmptyCoroutineContext
的默认参数。一旦你这么做了,该调度器将被用作协程的上下文,它将决定所有的线程。
接下来,对这个例子做一点改变。不要打印一些假文本,而是使用下面的代码片段,让协程打印它所处的线程。
fun main() {
GlobalScope.launch { println(Thread.currentThread().name) }
Thread.sleep(50)
}如果你运行这个,它将打印出与此类似的东西。
DefaultDispatcher-worker-1这仍然还处于你对默认调度器的了解范围内。你用下面的方法得到同样的结果:
fun main() {
GlobalScope.launch(context = Dispatchers.Default) {
println(Thread.currentThread().name)
}
Thread.sleep(50)
}如果你要传入一个不同的调度器,你会得到不同的结果。一个例子是
Dispatchers.Unconfined。因此,如果你有下面的代码:
fun main() {
GlobalScope.launch(context = Dispatchers.Unconfined) {
println(Thread.currentThread().name)
}
Thread.sleep(50)
}它将打印出 main
作为它的线程。这是因为非受限调度器只是接收了代码运行的任何一个线程,并将协程附加到该线程。但是,如果你不想把你的代码限制在一组约束中,而协程
API 为你提供了这些约束呢?如果你需要的任务需要一个单独的线程呢?
Creating a Work Stealing Executor 创建一个偷工减料执行器
通过标准的协程
API,你也有能力为协程创建新的线程或线程池。这是通过创建一个新的
Executor 来实现的。Executor
是执行指定任务的对象。它们通常与 Runnable
捆绑在一起,因为它们将任务包裹在一个需要执行的 Runnable
中。例如,创建一个 Work Stealing Executor
意味着它将使用所有可用的资源,以实现一定程度的并行性,你可以定义它如下。
fun main() {
val executorDispatcher = Executors
.newWorkStealingPool()
.asCoroutineDispatcher()
GlobalScope.launch(context = executorDispatcher) {
println(Thread.currentThread().name)
}
Thread.sleep(50)
}如果你运行这个,它将打印出与此类似的东西。
ForkJoinPool-1-worker-9Executor 在这里使用所有可用的资源,比如
ForkJoinPool 线程池,来完成你的任务。
一旦它完成了任务,它就可以把占用的资源重新分配给应用程序的其他部分。如果你把该
Executor 的并行性级别定为 4 级,并且你在许多协程中使用该
Executor,只要有工作需要分配,它就会分配资源以实现 4
个并行执行。
