前面几篇已经介绍了Flow
的一些基本用法,及其背后的Channel
。
这是Kotlin协程系列的第四篇文章。
本篇将继续尝试以RxJava
使用者的角度,探索Flow
中更多进阶功能,以满足更多的使用场景。
Kotlin协程系列相关文章导航
Kotlin Flow上手指南(三)SharedFlow与StateFlow (本篇)
Kotlin版本 : 1.5.31
Coroutine 版本 : 1.5.2
以下正文。
SharedFlow
熟悉RxJava
的人应该对其中的Subject
有所印象,与于Observale
这类需要调用subscribe
订阅后才发送数据的数据流不同。
Subject
是热流,并且同时具有发送数据与接收数据的功能,既是生产者也是消费者。
而在Flow
中,同样也有对应功能实现——SharedFlow
相比普通Flow
,SharedFlow
意为可共享的数据流。
- 对于同一个数据流,可以允许有多个订阅者共享。
- 不调用
collect
收集数据,也会开始发送数据。 - 允许缓存历史数据
- 发送数据函数都是线程安全的。
SharedFlow
本身的定义很简单,只比Flow
多个历史数据缓存的集合,只允许订阅数据。
就如同Kotlin中List
与MutableList
,一个仅表示可读,一个表示可读可写的类型。
ShardFlow
同样也有一个可变类型的版本MutableShardFlow
,定义了发送数据功能。
创建
首先来看MutableSharedFlow
是如何创建的。
-
replay
表示历史元素缓存区容量。
- 能够将最新的数据缓存到集合内,当历史缓存区满后,会移除最早的元素。
- 当在新消费者订阅了该数据流,会先将历史缓存区元素依次发送给新的消费者,然后才发送新元素。
-
extraBufferCapacity
表示除历史缓存区外的额外缓存区容量,用于扩充内部整体缓存容量。
-
onBufferOverflow
缓存区背压策略,默认是熟悉的
BufferOverflow.SUSPEND
,当额外缓冲区满后,挂起emit
函数,暂停发送数据。只有在
replay
和extraBufferCapacity
均不为0时才支持其他背压策略。
简单使用
不同于Flow
与ChannelFlow
中需要利用FlowCollector
或ProducerScope
来发送数据,由于MutableSharedFlow
本身就拥有发送数据功能,使其用法更接近于日常使用MutableList
。
fun test() = runBlocking{
val sharedFlow = MutableSharedFlow<String>()
//假设处于另一个类,异步发送数据
val produce = launch(Dispatchers.IO){
for (i in 0..50){
sharedFlow.emit("data$i")
delay(50)
}
}
}
复制代码
虽然此时并没有消费者订阅,但依旧会执行发送数据操作,只是目前没有设置历史缓存,所有数据都被"抛弃"了。
通常我们并不希望在消费者订阅端能够发送数据,只允许外部进行数据流订阅,此时就需要调用asSharedFlow
函数,将可变的MutableSharedFlow
转化为只读的SharedFlow
。
fun test() = runBlocking{
...
//模拟在外部调用
val readOnlySharedFlow = sharedFlow.asSharedFlow()
val scope = CoroutineScope(SupervisorJob())
delay(100)
val job1 = scope.launch { //消费者单独一个协程
readOnlySharedFlow.map {
delay(100)
"$it receive 1"
}
.collect {
println("collect1 result : $it")
}
}
delay(1000)
job1.cancel() //注意要关闭消费者所在的协程
}
collect1 result : data3 receive 1
collect1 result : data4 receive 1
collect1 result : data5 receive 1
collect1 result : data6 receive 1
collect1 result : data7 receive 1
collect1 result : data8 receive 1
collect1 result : data9 receive 1
collect1 result : data10 receive 1
collect1 result : data11 receive 1
复制代码
我们模拟一个外部消费者,这里延迟100ms再订阅数据,就只接收到从订阅后开始发送的所有后续数据。
SharedFlow
作为Flow
的子类,自然也能够使用Flow
的中间操作符。
这等效于RxJava
中的PublishSubject
。
如果在SharedFlow
创建时设置replay
属性,比如设置为2,就会缓存最新的两个值,此时运行结果就变成了:
val sharedFlow = MutableSharedFlow<String>(replay = 2)
collect1 result : data1 receive 1
collect1 result : data2 receive 1
collect1 result : data3 receive 1
collect1 result : data4 receive 1
collect1 result : data5 receive 1
collect1 result : data6 receive 1
collect1 result : data7 receive 1
collect1 result : data8 receive 1
复制代码
原本在订阅前发送的两个值也被消费者收集到了,等效于RxJava
的ReplayRelay.createWithSize<String>(2)
。
SharedFlow
是热流,而collect
是个挂起函数,会一直等待上游数据,不论上游是否发送数据。所以对于
SharedFlow
需要注意消费者所在的协程内,后续任务是不会执行的。
fun test() = runBlocking{
...
delay(200)
val job2 = scope.launch { //消费者单独一个协程
readOnlySharedFlow.map {"$it receive 2"}
.collect{println("collect2 result : $it")}
}
delay(1000)
job1.cancel()
job2.cancel()
}
复制代码
如果再新增一个消费者,其就会继续接收上游新发送的数据,直到消费者所在协程被关闭。
collect1 result : data3 receive 1
collect2 result : data5 receive 2
collect1 result : data4 receive 1
collect2 result : data6 receive 2
...
collect2 result : data13 receive 2
collect1 result : data12 receive 1
collect2 result : data14 receive 2
collect1 result : data13 receive 1
复制代码
但如果在job2
的消费者中主动抛出异常:
readOnlySharedFlow.map {
if (it == "data6") throw Exception("test Exception")
"$it receive 2"
}
collect1 result : data3 receive 1
collect2 result : data5 receive 2
collect1 result : data4 receive 1
test Exception
java.lang.Exception: test Exception
...
复制代码
在消费者中出现了未捕获异常,此时根据消费者运行所在协程的Job
类型有两种情况
- 如果协程作用域(父协程)的context是
Job
,则抛出异常无法捕获,如果像是测试程序中使用runBlocking
会直接抛出异常,程序崩溃。- 如果协程作用域(父协程)的context是
SupervisorJob
,则只会影响到消费者所在的协程,其他消费者接收数据不受影响。
所有消费者抛出的异常并不会影响上游共享数据流,当然所有SharedFlow
的订阅者最好都利用catch
操作符捕获住异常,也可在协程内设置CoroutineExceptionHandler
进行捕获。
冷流转热流
在RxJava
中,允许将Subject
的热流通过toFlowable
函数转化为Flowable
类型的冷流,但反过来将冷流转化为热流的功能,却似乎并没有提供。
而在ShardFlow
中就提供了冷流转热流的函数——shareIn
。
这里started
表示新创建的共享数据流的启动与停止策略。
-
Eagerly
立即开始发送数据源。并且消费端永远收集数据,只会收到历史缓存和后续新数据,直到所在协程取消。
-
Lazily
等待出现第一个消费者订阅后,才开始发送数据源。保证第一个消费者能收到所有数据,但后续消费者只能收到历史缓存和后续数据。
消费者会永远等待收集数据,直到所在协程取消
-
WhileSubscribed
可以说是
Lazily
策略的进阶版,同样是等待第一个消费者订阅后,才开始发送数据源。但其可以配置在最后一个订阅者关闭后,共享数据流上游停止的时间(默认为立即停止),与历史数据缓存清空时间(默认为永远保留)。
public fun WhileSubscribed( stopTimeoutMillis: Long = 0, //上游数据流延迟结束,ms replayExpirationMillis: Long = Long.MAX_VALUE //缓冲数据清空延迟,ms ): SharingStarted 复制代码
利用shareIn
以及后文介绍的stateIn
,即可将消耗一次资源从数据源获取数据的Flow
数据流,转化为SharedFlow
或StateFlow
,实现一对多的事件分发,并减少多次调用资源的损耗。
需要注意,在使用
shareIn
每次都会创建一个新SharedFlow
实例,并且该实例会一直保留在内存中,直到被垃圾回收。所以最好减少转换流的执行次数,不要在函数内每次都调用这类函数。
更多关于SharingStarted
的使用场景,可以参见shareIn 和 stateIn 使用须知
SharedFlow实现分析
在上一篇提到的ChannelFlow
内部是通过Channel
实现线程安全的多协程通信,那么SharedFlow
的内部实现又是怎么样的呢?
通过MutableSharedFlow
工厂函数创建的SharedFlow
,内部实际是创建了SharedFlowImpl
对象,内部直接使用数组缓存数据。
发送数据
具体实现看起来有点多,我们先来看看MutableSharedFlow
的函数emit
与tryEmit
是如何实现发送数据的。
在tryEmit
内部通过synchronized
加锁了,发送数据是线程安全的。
历史缓存
那么当replay
不为0时,所谓的历史数据缓存又是什么呢?继续深入到tryEmitLocked
。
可以看到,实际上添加新元素到缓存数组是在enqueueLocked
函数内。
每次在数组添加新元素时,会检查缓存数组容量,默认会先创建容量为2的数组,当数组容量满后,会以当前容量的2倍进行扩容。
而历史缓存数据则是每次调用都在这个缓存数组buffer
中取出最新的replay
个数元素的组成一个List
。
当然,缓存数组buffer
也并不可能无限制缓存数据。
在tryEmitLocked
中,如果已缓存的数量bufferSize
超出允许保留的总缓存容量bufferCapacity
,会调用dropOldestLocked
函数抛弃最早发送的数据缓存。
默认不设置历史缓存时,缓存区容量
bufferCapacity
是Int.MAX_VALUE
。
dropOldestLocked
内部抛弃数据只是将数组索引的元素置空,缓存数组总长度不变,继续添加元素还是需要进行数组扩容。
收集数据
在Flow
中进行数据订阅,自然是找collect
函数的实现。
内部逻辑实现很清晰,通过分配绑定一个Slot
,在循环中尝试从缓存数组中取值并通过emit
函数发送到下游。
SharedFlow
的collect
内是个无限循环,会一直尝试从缓存中取值,所以collect
会一直处于挂起状态,直到所在协程关闭。
而这个Slot
则是种收集器状态工具,与在数据流消费者进行绑定,并记录当前分发数据的在缓存数组的索引。
在SharedFlow
的父类AbstractSharedFlow
内,利用数组缓存了这些收集器状态记录工具,并实现了allocateSlot
函数,用于分配绑定到收集器。
重新绑定收集器时,会将
Solt
内记录的下游重置为最新值索引的前replay
个元素索引,用于确保下游会先由历史数据开始接收数据。
继续回到从缓存数组中取出元素的tryTakeValue
,依旧是synchronized
加锁取值,并将待发送元素在缓存数组的索引+1。
如果此时没有需要发送的元素,就会调用挂起函数awaitValue
,进入挂起状态,等待缓存数组中添加数据。
小结
SharedFlow
的出现,意味着一对多数据流传递成为可能,而且还能享受到Flow
操作符带来的便利。
不过SharedFlow
内部并不是基于Channel
,而是基于数组+synchronized
。
相比内部基于Channel
的ChannelFlow
:
-
两者都是线程安全的,
Channel
使用ReentrantLock
+CAS,而SharedFlow
使用synchronized
。 -
Channel
内部是无锁双向链表结构,每个节点都是CAS引用类型,SharedFlow
则是数组结构,但允许存在一个从数组中截取的历史缓存集合。 -
SharedFlow
是热流,允许多个消费者订阅同一个数据流;ChannelFlow
是冷流,只允许单个消费者订阅。 -
由于
SharedFlow
在订阅前就运行发送数据,可能存在部分数据遗漏的问题。而ChannelFlow
只在的下游订阅后才开始发送数据,默认能接收到所有数据。 -
两者的内部无限循环的侧重点不一样。
ChannelFlow
从生产者角度一直循环等待Channel
发送完数据,提供了close
函数主动关闭Channel
通道。SharedFlow
是在消费者角度一直循环,等待取出上游发送的数据,并将关闭操作完全交由所在协程管理。
虽说事件总线的概念由于过渡滥用令人相当厌恶,不过SharedFlow
对于这类场景就和以前的RxBus
、EventBus
那样好用,并且还是线程安全的。
严格规范限制其使用作用范围,限制为特定场景的专用事件分发机制,比如从定位数据只取出一次数据,分发给所有订阅者的场景,还是不错的选择。
StateFlow
当需要多个消费者都只订阅到最新的一个值,并接收后续传递的所有值,同时还能不需要订阅也能直接获取最新值时,在RxJava
时代我们还有BehaviorSubject
的选项,亦或是后来职责更加单一的LiveData
。
而在有了Flow
之后,官方提供了另一个更便捷的类——StateFlow
,来处理这种场景。
StateFlow
实际上是SharedFlow
的子类,同样也拥有只读与可读可写的两种类型,StateFlow
与MutableStateFlow
。
从接口定义上,StateFlow
与LiveData
的定义可谓是非常相似。
相同点:
- 都允许多个消费者
- 都有只读与可变类型
- 永远只保存一个状态值
- 同样支持
DataBinding
(StateFlow
需要新版本才支持)
与LiveData
不同的是:
- 强制要求初始默认值
- 支持CAS模式赋值
- 默认支持防抖过滤
- value的空安全校验
Flow
丰富的异步数据流操作- 默认没有
Lifecycle
支持,flow
的collect
是挂起函数,会一直等待数据流传递数据 - 线程安全,
LiveData
的postValue
虽然也可在异步使用,但会导致数据丢失。
LiveData
除了对于Lifecycle
的支持,StateFlow
基本都是处于全面碾压的态势。
使用
作为SharedFlow
的子类,StateFlow
在使用上与其父类基本相同。
同样是利用同名工厂函数的进行创建,只是相比SharedFlow
,StateFlow
必须设置默认初始值。
而且MutableStateFlow
是无法配置缓冲区的,或者说固定永远只有一个,只会缓存最新的值。
同时我们需要屏蔽外部发送污染数据,只对外部提供只读属性的StateFlow
,此时就需要asStateFlow
。
fun test() = runBlocking{
val stateFlow = MutableStateFlow(1)
val readOnlyStateFlow = stateFlow.asStateFlow()
//模拟外部立即订阅数据
val job0 = launch {
readOnlyStateFlow.collect { println("collect0 : $it") }
}
delay(50)
//模拟在另一个类发送数据
launch {
for (i in 1..3){
println("wait emit $i")
stateFlow.emit(i)
delay(50)
}
}
//模拟启动页面,在新页面订阅
delay(200)
val job1 = launch {
readOnlyStateFlow.collect{ println("collect1 : $it") }
}
val job2 = launch {
readOnlyStateFlow.collect{ println("collect2 : $it") }
}
println("get value : ${readOnlyStateFlow.value}")
delay(200)
job0.cancel()
job1.cancel()
job2.cancel()
}
collect0 : 1
wait emit 1
wait emit 2
collect0 : 2
wait emit 3
collect0 : 3
get value : 3
collect1 : 3
collect2 : 3
复制代码
可以看到,在没有发送数据时订阅,会先接收默认值。
而新发送的数据后,由于第一个值与原有值相同,直接被过滤掉了。
后续新添加的订阅者能够接收到的就只有最新的值。
StateFlow
订阅者所在的协程,最好使用独立协程,collect
会一直挂起,协程内的后续操作不会执行
冷流转换热流
StateFlow
同样也有由Flow
冷流转化为热流的操作符函数stateIn
。
与shareIn
函数的区别只是必须设置默认值,stateIn
转化的共享数据流只缓存一个最新值。
StateFlow实现分析
StateFlow
内部并没有SharedFlow
的缓存数组,只是用atomic
引用类型的状态值,永远只保留一个最新的值。
由于只保存了一个值,可以通过对value
对象进行取值与赋值操作。
发送数据
所有发送数据操作tryEmit
与emit
都是调用setValue
操作,并最终调用updateState
函数进行CAS状态赋值。
函数看着比较长,其内部很巧妙的根据一个sequence
更新序号与synchronized
加锁,配合无限循环,只允许在更新序号为偶数才正常进行更新流程,并最终更新序号为奇数。
如果本身更新序号就为奇数,则表示已经执行过更新流程,直接跳过后续流程。
收集器状态
StateFlowImpl
的父类也是AbstractSharedFlow
,不同于SharedFlow
,这里的消费者状态工具是AbstractSharedFlowSlot
的另一个实现——StateFlowSlot
。
这个Slot
内部同样是atomic
CAS引用类型,其允许有四种状态
- null - 表示已经空闲释放,可以分配给消费者收集器
- NONE - 表示已经分配给消费者接收器
- PENDING - 表示上游已更新新值,待发送给收集器
- CancellableContinuationImpl - 表示收集器已挂起在等待上游数据
在StateFlow
更新状态值的流程中,会遍历所有已分配的Solt
,调用makePending
尝试将所有已分配的Solt
状态CAS更新为PENDING
,使消费端准备好接收数据。
收集数据
Flow
数据流消费端收集数据,自然还是使用collect
。
在消费者订阅函数,如SharedFlow
相同,会一直循环等待上游数据。
仅在第一次订阅时,
StateFlow
会立即发送最新数据。随后每次发送数据前都会进行重复过滤,并进行空安全检查。
随后也是同样的调用awaitPending
函数,创建新协程并进入挂起状态,直到上游重新传递数据将该协程恢复。
关联生命周期
早在RxJava
时代,如果直接在视图中调用subscribe
订阅数据流,若是视图生命周期处于后台状态时,接收数据更新就可能会出现不符合预期的情况。
所以在LiveData
出现后,在视图层的数据订阅都逐渐移交给LiveData
了,而RxJava
逐渐退居到数据层进行数据逻辑处理。
现在的Flow
在视图内调用collect
订阅数据流自然也会存在与RxJava
相同的问题。
因此我们需要让Flow
在视图生命周期处于后台时,不对数据流进行订阅处理、不传递数据到消费端。
LifecycleCoroutineScope
在早期,Lifecycle
库提供了拓展属性coroutineScope
。
作为视图专用的LifecycleCoroutineScope
协程作用域,会在视图销毁时取消协程作用域,其中拥有多个launchWith
系列函数。
public val Lifecycle.coroutineScope: LifecycleCoroutineScope
public abstract class LifecycleCoroutineScope internal constructor() : CoroutineScope {
...
public fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenCreated(block)
}
public fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenStarted(block)
}
public fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenResumed(block)
}
...
}
复制代码
比如launchWhenStarted
函数,会将Flow
数据流消费端所在的协程,函数执行限定在小于Lifecycle.State.STARTED
状态下。(图片来自网络)
也即是消费者端所在协程内的collect
函数内部逻辑,只会在视图处于onStart
生命周期后才恢复执行,而处于后台的生命周期会被暂时挂起。
但对于Flow
数据流来说,即便消费端的处理逻辑挂起了,生产端数据源依然还在执行,尤其是某些数据源一直运行的场景可能造成不必要的资源损耗,官方目前也并不推荐使用这些函数。
repeatOnLifecycle
随着Lifecycle-runtime-ktx
库更新至2.4.0
版本,Lifecycle
提供了一个新的拓展函数repeatOnLifecycle
。
看着代码实现很多,内部逻辑其实很简单,切换CoroutineContext
到UI主线程,在进入允许的生命周期状态时,启动协程,订阅数据流。在超出设定的生命周期状态后,关闭协程,取消订阅
与其他方式的比较:(图片来自官方)
对于共享数据流的StateFlow
来说,每次订阅都只会获取最新的值,这也更接近LiveData
的使用逻辑。
在
repeatOnLifecycle
函数出现后,官方也开始计划删除launchWith
系列函数。
关于repeatOnLifecycle
的设计原因可以参见设计 repeatOnLifecycle API 背后的故事。
除此之外,Lifecycle
库还提供了一个Flow
的中间操作符flowWithLifecycle
,利用callbackFlow
来内部调用repeatOnLifecycle
。
而callbackFlow
内部实际上是基于Channel
实现的,对于上游数据流具有缓冲区的作用。
- 对于单个
Flow
数据流的生命周期控制,flowWithLifecycle
操作符可以很好解决样板代码。- 如果需要同时控制多个
Flow
数据流的生命周期,还是推荐使用repeatOnLifecycle
避免重复创建Channel
。- 冷流
Flow
不推荐直接使用flowWithLifecycle
,避免多次创建新的数据源。
于是Flow
数据流就可以在Activity
或Fragment
中很方便的绑定视图生命周期
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
//模拟从viewModel开放出来的状态更新
viewModel.readOnlyStateFlow
//在onStart开启订阅上游数据流,onPause取消订阅
.flowWithLifecycle(lifecycle,Lifecycle.State.STARTED)
.onEach { //do something }
.launchIn(lifecycleScope) //运行在主线程的协程作用域,在视图销毁时自动取消作用域
}
复制代码
总结
至此,作为Kotlin Flow
的最后一部分拼图——共享数据流也就此集齐了。
SharedFlow
作为允许保留历史缓存并且只能收到新数据的存在,对于一对多事件分发的场景是个很好的选择。StateFlow
则是与原本LiveData
的定位重合,永远只持有最新数据,更适用于处理状态更新。
配合repeatOnLifecycle
限制视图生命周期订阅,StateFlow
可以完全替代LiveData
,更新视图的状态显示,同时支持粘性数据。
而原本需要封装LiveData
才能处理的不需要粘性的单次执行事件,只需要将replay
设置为0,SharedFlow
就能很好的承担这个职责。
当然,如果不需要Flow
数据流操作与线程安全的需求,像LiveData
这样职责单一的类,承担视图状态更新也还是不错的选择,简单也意味着不容易出错,便于维护。
毕竟StateFlow
到底还是要依靠Kotlin协程来实现,LiveData
利用observer
能直接订阅状态还是比较方便的。
总的来说,随着Kotlin Flow
的出现,从数据源的逻辑处理到视图层的状态与事件订阅,都有了很好的新选择。
也唯有熟悉与理解其背后运作机制,才能更好在合适的场景中灵活运用。