本文是对发布在 Medium 上的文章 Kotlin Coroutines Flow in a nutshell 的翻译。
请注意,虽然本翻译按照
CC BY-SA 4.0分发,但没有原作者的授权,我本质上不享有翻译权。所以如果想转载本文,你首先应该取得原作者的同意。
在我的上一篇文章中,我阐明了
RxJava 的工作原理。很长一段时间以来,在 Android 中,RxJava
是用于处理流和多线程的事实标准。但现在可以选择受 Google 推荐的,来自
JetBrains 的 Coroutines(协程)。
虽然很多开发者对 Coroutines 仍存疑虑,但新项目很少依赖于
RxJava。Flow 替代了它的位置。
本文关于……
在本文中,我会告诉你 Flow
如何工作。包含使用的基本准则和生命周期(lifecycle)管理,也就是说我们在讨论分配(Dispatching)。
本文的目标读者即包括第一次尝试理解 Flow
的人,也包括有一定经验的协程使用者。
链式调用
CoroutineScope(context = Dispatchers.Main.immediate).launch() {
doAction()
flowOf("Hey")
.onEach { doAction() }
.map { it.length }
.onStart { doAction() }
.flowOn(Dispatchers.Default)
.flatMapMerge {
doAction()
flowOf(1)
.flowOn(Dispatchers.Main)
.onEach { doAction() }
}
.flowOn(Dispatchers.IO)
.collect {
doAction()
}
}
}我们的目标是搞清楚每个动作实际的效果,以何顺序调用,在哪个线程上执行。
基础和生命周期
Flow 的定义只有两个接口:
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
public fun interface FlowCollector<in T> {
public suspend fun emit(value: T)
}这两个接口是消费者-生产者模式(Consumer & Producer pattern)的基础。
在继续之前,我强烈建议你查看基于这两个接口编写的流 API 示例。
每个 Flow 链代表一组特定操作。每次操作会创建新的
Flow 对象,同时也会存储先前调用的 Flow
实例的引用。在调用 collet
方法之前,运算不会开始(冷流)。
Flow 的生命周期历经 5 个重要阶段:
启动 ⬇️
一个协程在使用特定 Dispatcher 的 CoroutineScope 上启动。 之后:Flow 创建、操作收集 & 数据发射。最终结果将在指定的 Dispatcher 上处理。
Flow 创建 ⬇️
在当前线程,运算从上到下创建。(类似于 Builder 模式。)
操作收集 ⬆️
自下而上进行,每个操作符收集上一个。(译注:也就是创建对上一个操作的引用)
数据发射(Emission)⬇️
数据发射开始于所有操作被成功收集,并最终调用了
collect时。数据从上往下依次执行操作。取消或完成 整个链死亡 😵
链执行完毕或取消
让我们仔细看看每个阶段。
启动
// Scope 启动
val job = CoroutineScope(Dispatchers.Main.immediate).launch {
doAction()
/*.../*
}一目了然。我们创建了一个在主线程上运行的协程作用域。doAction()
方法也在这个协程上启动。
Scope 返回了 Job,可以用于管理生命周期。(例如,调用
cancel() 停止全部工作。)
Immediate Dispatcher(调度器)的作用
译注:这里是 Android Only 的内容,对其他平台不一定适用。非 Android 开发者跳过此节没有影响。
在 Android 中,切换到主线程的唯一方式是使用
Handler/Looper/MessageQueue 链。
这个逻辑隐藏在 HandlerContext 里,同时这也是
Dispatcher.Main 的隐藏逻辑。
// 使用 Looper.mainLooper() 创建 handler
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!handler.post(block)) {
cancelOnRejection(context, block)
}
}假设我们已经在主线程了,但我们仍尝试用 handler.post
切换。
在这种情况下代码不会立刻执行,因为可能会影响用户体验,比如,造成屏幕闪烁。
代码必须等待 MessageQueue
上的其他命令完成。Dispatcher.Main.immediate
主要作用是跳过该队列并立即执行。
Dispatcher 有一个 isDispatchNeeded
方法以解决问题。在 HandlerContext 中,该方法这样实现:
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}Dispatchers.Main.immediate 在
HandlerContext 创建新实例,它将
invokeImmediately 设为 true。因此,主线程的
Looper 将始终与当前线程的 Looper
比较,从而防止对 handler.post 的非必要调用。
Flow 创建
我们所写的 Flow 的第一个链是
flowOf("hey"),在底层,可以看到显式创建了 Flow
的实例,并将值存在 lambda 中。lambda 将会在收集阶段被调用。
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
public fun <T> flowOf(value: T): Flow<T> = flow {
emit(value)
}
internal inline fun <T> unsafeFlow(crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}之后,onEach 方法也会以同样的方式创建。
总之,该拓展函数同样会保留对先前 Flow 的引用。
直到 collect()
的所有其他运算符都这样创建,同时不执行任何真正的运算。
在创建阶段链的行为:
flowOf("Hey")→ 缓存传递的值
onEach { doAction() }→ 缓存 lambda 并在发射阶段执行
map {...}→ 通过映射缓存 lambda
onStart { doAction() }→ 缓存 lambda 并在收集阶段执行
flowOn(Dispatchers.Default)→ 缓存赋值到该 Dispatcher 的操作
flatMapMerge {...}→ 缓存 lambda,并在发射阶段执行
flowOn(Dispatchers.IO)→ 缓存赋值到该 Dispatcher 的操作
最终,每个操作符,除了第一个,都保留对先前操作的引用,构成一个
LinkedList
创建操作将会在主线程执行。

收集
该过程自下而上进行,并在终端(terminal)操作符调用后立即开始。
Flow 中的终端操作:
collcet()first()toList()toSet()reduce()fold()
当我们调用 collect 时,不收集整条链,而只收集上方的
flowOn。
然后 flowOn 调用 collect
和自身之间的引用操作符,在这个例子里是
flatMapMerge。这就是为何操作符保留对上游引用的原因。
在收集阶段链的行为:
flowOn(Dispatchers.IO)→ 在 IO Dispatcher 上创建新的协程,改变创建的协程上下文 → 调用上游的 collect
flatMapMerge {...}→ 调用上游的 collectflowOn(Dispatchers.Default)→ 在 Default Dispatcher 上创建新的协程,改变创建的协程上下文 → 调用上游的 collect
onStart { doAction() }→ 在 Default Dispatcher 上执行操作 → 调用上游的 collect
map {...}→ 调用上游的 collect
onEach {...}→ 调用上游的 collect

在所有操作之中,只有 onStart 和 flowOn
被执行。
线程切换了两次:第一次到 IO,另一次是
Default。
也就是说 flowOn 将被执行多次,并创建一些协程实例。
⚠️ 然而,flowOn 并不总在底层创建新协程。看下面的例子:
CoroutineScope(Dispatchers.Main.immediate).launch {
flowOf("Hey")
.onStart { doAction() }
.flowOn(Dispatchers.IO)
.onStart { doAction() }
.flowOn(Dispatchers.IO)
.onStart { doAction() }
.flowOn(Dispatchers.IO)
.collect()
}我们有意地写了多次切换到同样 Dispatcher 的 flowOn。
结果
// onStart1
_____________________________________
Job: ProducerCoroutine{Active}@53f45ab)
Thread: DefaultDispatcher-worker-1,5,main
// onStart2
_____________________________________
Job: ProducerCoroutine{Active}@53f45ab)
Thread: DefaultDispatcher-worker-1,5,main
// onStart3
_____________________________________
Job: ProducerCoroutine{Active}@53f45ab)
Thread: DefaultDispatcher-worker-1,5,main如你所见,协程实例只被创建了一次,并且绑定了一个线程。
发射
一旦到达没有对其他 Flow 引用的
Flow,发射过程就开始了。从根 Flow 到最低。
flowOf("Hey")→ 发射
hey,Default DispatcheronEach { doAction() }→ 执行操作,Default Dispatcher
map {...}→ 映射,Default Dispatcher
onStart { doAction() }→ 发射
3,Default DispatcherflowOn(Dispathers.Default)→ 发射
3,Default DispatcherflatMapMerge { ... }这个操作比较棘手,回想一下它的内容:
// ... flatMapMerge { doAction() flowOf(1) .flowOn(Dispatchers.Main) .onEach { doAction() } }在
flatMapMerge里面的链也会经历创建、收集、发射。之后,最终的值会被发射到下流。
请注意,onEach 将在 IO Dispatcher 上执行。(在块执行之前恢复。)
与 RxJava 不同的是,Kotlin Flow 有上下文保留(Context Preservation)的概念,保证了上层流的上下文不会影响到下层流。
flowOn(Dispatchers.IO)→ 发射
1,IO Dispatchercollect→ 在主线程调用收集器,尽管协程上下文在上游中被改变

结论
collcet()方法被挂起,这迫使我们提前决定在哪个上下文中处理我们的链的结果。所有操作从上到下创建,从下到上收集。然后被一个
LinkedList组织起来。发射过程从根 Flow 开始,从上到下执行。一些操作可能在收集阶段执行。例如,写入多少值,
onStart就执行多少次。flowOn创建一个新的协程,在参数中传递 Dispatcher 并更改上下文。(但是,如果我们有多个 flowOn 使用同一个调度程序,则实际上只会创建一个协程。)它只影响上游,从而保证符合上下文保留原则。
协程的创建和上下文的改变都可以在收集和发射两个阶段执行。
一个线程可能被多个协程共用。 如果你写了
flowOn,如果当前上下文不同,肯定会创建一个新的协程。但是,不能保证线程不同。flatMapMerge/flatMapConcat仅在父链数据发射期间启动链。在根 Flow 收集过程中不执行任何操作。
在我的下一篇文章中,我将展示协程的底层,以及比常规线程更高效的原因。
敬请关注!
感谢 @kost.maksym 对内容的审阅。
