一. 实用
1. 倒计时
class MainViewModel : ViewModel() {
private val timeFlow = flow {
var time = 0
while (true) {
emit(time)
delay(1000)
time++
}
}
/**
stateIn函数可以将其他的Flow转换成StateFlow。将timeFlow转换成了StateFlow。
stateIn函数接收3个参数,其中第1个参数是作用域,传入viewModelScope即可。第3个参数是初始值,计时器的初始值传入0即可。
第二个参数的解释:生命周期的超时时常,这里写的是5秒的超时
因为横竖屏切换通常很快就能完成,这里我们通过stateIn函数的第2个参数指定了一个5秒的超时时长,那么只要在5秒钟内横竖屏切换完成了,Flow就不会停止工作。
反过来讲,这也使得程序切到后台之后,如果5秒钟之内再回到前台,那么Flow也不会停止工作。但是如果切到后台超过了5秒钟,Flow就会全部停止了。
**/
//
//
val stateFlow =
timeFlow.stateIn(
viewModelScope,
SharingStarted.WhileSubscribed(5000),
0
)
}
UI界面
//repeatOnLifecycle 每次生命周期都会重新执行 并且在Started的时候才会执行
val textView = findViewById<TextView>(R.id.text_view)
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
mainViewModel.stateFlow.collect {
time ->
textView.text = time.toString()
}
}
}
2. StateFlow 和 SharedFlow
class MainViewModel : ViewModel() {
//不会随屏幕的旋转而改变数据 粘性
private val _clickCountFlow = MutableStateFlow(0)
val clickCountFlow = _clickCountFlow.asStateFlow()
fun increaseClickCount() {
//可以直接操作value 热流
_clickCountFlow.value += 1
}
}
class MainViewModel : ViewModel() {
//会随屏幕的旋转而改变
private val _loginFlow = MutableSharedFlow<String>()
val loginFlow = _loginFlow.asSharedFlow()
fun startLogin() {
viewModelScope.launch {
//这里就类似于Flow 冷流 而emit是挂起函数 必须增加viewModelScope 协程作用域
_loginFlow.emit("Login Success")
}
}
}
二. 协程
fun main1(args: Array<String>) = runBlocking {
// this: CoroutineScope
launch {
delay(200L)
println("Task from runBlocking")
}
coroutineScope {
// 创建一个协程作用域 会阻塞 导致最后一行最后打印
launch {
delay(500L)
println("Task from nested launch")
}
delay(100L)
println("Task from coroutine scope") // 这一行会在内嵌 launch 之前输出
}
println("Coroutine scope is over") // 这一行在内嵌 launch 执行完毕后才输出
}
/**输出
* Task from coroutine scope
* Task from runBlocking
* Task from nested launch
* Coroutine scope is over
*/
fun main() = runBlocking {
launch {
doWorld() }
println("Hello,")
}
// 这是你的第一个挂起函数
suspend fun doWorld() {
delay(1000L)
println("World!")
}
/**输出
* Hello,
* World!
*/
取消
fun main()= runBlocking {
val job = launch {
repeat(1000) {
i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // 延迟一段时间
println("main: I'm tired of waiting!")
job.cancel() // 取消该作业
job.join() // 等待作业执行结束 等待cancel结束处理完毕
//job.cancelAndJoin()//等于上面两行
println("main: Now I can quit.")
}
/**
* job: I'm sleeping 0 ...
* job: I'm sleeping 1 ...
* job: I'm sleeping 2 ...
* main: I'm tired of waiting!
* main: Now I can quit.
*/
取消 使用isActive
fun main()= runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (isActive) {
// 在协程激活情况下打印
// 每秒打印消息两次
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${
i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 等待一段时间
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消该作业并等待它结束
println("main: Now I can quit.")
}
/**
* job: I'm sleeping 0 ...
* job: I'm sleeping 1 ...
* job: I'm sleeping 2 ...
* main: I'm tired of waiting!
* main: Now I can quit.
*/
利用try做处理
fun main()= runBlocking {
val job = launch {
try {
repeat(1000) {
i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
}catch (e:Exception){
//独立的协程被取消
//error:StandaloneCoroutine was cancelled
println("error:${
e.message}")
} finally {
//此时协程已经在取消状态了
println("协程的状态:$isActive")
println("job: I'm running finally")
}
}
delay(1300L) // 延迟一段时间
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消该作业并且等待它结束
println("main: Now I can quit.")
}
/**
* job: I'm sleeping 0 ...
* job: I'm sleeping 1 ...
* job: I'm sleeping 2 ...
* main: I'm tired of waiting!
* 协程的状态:false
* job: I'm running finally
* main: Now I can quit.
*/
不可以被取消的协程
fun main()= runBlocking {
val job = launch {
try {
repeat(1000) {
i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
//NonCancellable 取消不掉的
withContext(NonCancellable) {
println("job: I'm running finally")
delay(3000L)
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
//在1300秒内执行完 repeat 再执行finally 因为有不能被取消的协程 所以时间必须消耗完再结束
delay(1300L) // 延迟一段时间
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消该作业并等待它结束
println("main: Now I can quit.")
}
/**
* job: I'm sleeping 0 ...
* job: I'm sleeping 1 ...
* job: I'm sleeping 2 ...
* main: I'm tired of waiting!
* job: I'm running finally
* job: And I've just delayed for 1 sec because I'm non-cancellable
* main: Now I can quit.
*/
取消前获取结果
fun main() = runBlocking {
val result = withTimeoutOrNull(1300L) {
repeat(1000) {
i ->
//repeat(2) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
//再循环结束前获取结果肯定获取的是null 不会得到Done
"Done" // 在它运行得到结果之前取消它
}
println("Result is $result")
}
/**
* I'm sleeping 0 ...
* I'm sleeping 1 ...
* I'm sleeping 2 ...
* Result is null
*/
组合挂起函数
1.同步异步的差别
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // 假设我们在这里做了一些有用的事
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // 假设我们在这里也做了一些有用的事
return 29
}
fun main()= runBlocking {
//同步进行
val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${
one + two}")
}
println("Completed in $time ms")
/**
* The answer is 42
* Completed in 2017 ms
*/
//并发进行 这里快了两倍,因为两个协程并发执行。 请注意,使用协程进行并发总是显式的。
/* val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")*/
/**
* The answer is 42
* Completed in 1015 ms
*/
}
2.惰性用法
fun main()= runBlocking {
val time = measureTimeMillis {
//只有结果通过 await 获取的时候协程才会启动 或者在 Job 的 start 函数调用的时候
val one = async(start = CoroutineStart.LAZY) {
doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) {
doSomethingUsefulTwo() }
// 执行一些计算
one.start() // 启动第一个
two.start() // 启动第二个
println("The answer is ${
one.await() + two.await()}")
}
println("Completed in $time ms")
/**
* The answer is 42
* Completed in 1013 ms
*/
}
/**
* 如果在 concurrentSum 函数内部发生了错误,并且它抛出了一个异常, 所有在作用域中启动的协程都会被取消。
*/
suspend fun concurrentSum(): Int = coroutineScope {
val one = async {
doSomethingUsefulOne() }
val two = async {
doSomethingUsefulTwo() }
one.await() + two.await()
}
fun main()= runBlocking {
val time = measureTimeMillis {
println("The answer is ${
concurrentSum()}")
}
println("Completed in $time ms")
}
3.异常情况
fun main() = runBlocking<Unit> {
try {
failedConcurrentSum()
} catch(e: ArithmeticException) {
println("Computation failed with ArithmeticException")
}
}
suspend fun failedConcurrentSum(): Int = coroutineScope {
val one = async<Int> {
try {
delay(Long.MAX_VALUE) // 模拟一个长时间的运算
42
} finally {
//第二个处理出错 即使等待很长的时间 这里也会被取消
println("First child was cancelled")
}
}
val two = async<Int> {
println("Second child throws an exception")
throw ArithmeticException()
}
one.await() + two.await()
}
三. 协程上下文与调度器
1. Context
fun main() = runBlocking<Unit> {
launch {
// 运行在父协程的上下文中,即 runBlocking 主协程
println("main runBlocking : I'm working in thread ${
Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) {
// 不受限的——将工作在主线程中
println("Unconfined : I'm working in thread ${
Thread.currentThread().name}")
}
launch(Dispatchers.Default) {
// 将会获取默认调度器
println("Default : I'm working in thread ${
Thread.currentThread().name}")
}
launch(newSingleThreadContext("MyOwnThread")) {
// 将使它获得一个新的线程
println("newSingleThreadContext: I'm working in thread ${
Thread.currentThread().name}")
}
}
/**
* 注意顺序
* Unconfined : I'm working in thread main
* Default : I'm working in thread DefaultDispatcher-worker-1
* newSingleThreadContext: I'm working in thread MyOwnThread
* main runBlocking : I'm working in thread main
*/
冷流
fun simple4(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple4()
println("Calling collect...")
flow.collect {
value -> println(value) }
println("Calling collect again...")
//只要收集就会获取到结果
flow.collect {
value -> println(value) }
}
/**
* Calling simple function...
* Calling collect...
* Flow started
* 1
* 2
* 3
* Calling collect again...
* Flow started
* 1
* 2
* 3
*/
超时取消
fun simple5(): Flow<Int> = flow {
for (i in 1..6) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(350) {
// 在 250 毫秒后超时
simple5().collect {
value -> println(value) }
}
println("Done")
}
/**
* Emitting 1
* 1
* Emitting 2
* 2
* Emitting 3
* 3
* Done
*/
构建流
suspend fun performRequest(request: Int): String {
delay(1000) // 模仿长时间运行的异步工作
val response = "response $request"
println("request $request")
return response
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // 一个请求流
.map {
request -> performRequest(request) }
.collect {
response -> println(response) }
}
/**
* request 1
* response 1
* request 2
* response 2
* request 3
* response 3
*/
转换操作符
fun main() = runBlocking<Unit> {
(1..3).asFlow() // 一个请求流
.transform {
request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect {
response -> println(response) }
}
/**
* Making request 1
* request 1
* response 1
* Making request 2
* request 2
* response 2
* Making request 3
* request 3
* response 3
*/
限长操作符
//函数体的执行在发射出第二个数字后停止
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // 只获取前两个
.collect {
value -> println(value) }
}
/**
* 1
* 2
* Finally in numbers
*/
末端流操作符
转化为各种集合,例如 toList 与 toSet。
获取第一个(first)值与确保流发射单个(single)值的操作符。
使用 reduce 与 fold 将流规约到单个值。
fun main() = runBlocking {
val sum = (1..5).asFlow()
.map {
it * it } // 数字 1 至 5 的平方
.reduce {
a, c -> a + c } // 求和(末端操作符)
println(sum)
}
//55
流是连续的
从上游到下游每个过渡操作符都会处理每个发射出的值然后再交给末端操作符。
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
/**
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
**/
协程上下文报错
fun simple6(): Flow<Int> = flow {
// 在流构建器中更改消耗 CPU 代码的上下文的错误方式
//withContext可以改变协程的上下文 导致 和上面flow的不一致 所以报错
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // 假装我们以消耗 CPU 的方式进行计算
emit(i) // 发射下一个值
}
}
}
fun main() = runBlocking<Unit> {
simple6().collect {
value -> println(value) }
}
如何正确使用flowOn修改发射流的上下文
fun log(msg: String) = println("[${
Thread.currentThread().name}] $msg")
fun simple7(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // 假装我们以消耗 CPU 的方式进行计算
log("Emitting $i")
emit(i) // 发射下一个值
}
}.flowOn(Dispatchers.Default) // 在流构建器中改变消耗 CPU 代码上下文的正确方式
fun main() = runBlocking<Unit> {
simple7().collect {
value ->
log("Collected $value")
}
}
/**
* [DefaultDispatcher-worker-1] Emitting 1
* [main] Collected 1
* [DefaultDispatcher-worker-1] Emitting 2
* [main] Collected 2
* [DefaultDispatcher-worker-1] Emitting 3
* [main] Collected 3
*/
缓冲
fun simple8(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 假装我们异步等待了 100 毫秒
emit(i) // 发射下一个值
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple8().collect {
value ->
delay(300) // 假装我们花费 300 毫秒来处理它
println(value)
}
}
println("Collected in $time ms")
}
/**
* 1
* 2
* 3
* Collected in 1238 ms
*/
修改一下 buffer 操作符来并发运行这个 simple 流中发射元素的代码以及收集的代码
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple8()
.buffer() // 缓冲发射项,无需等待
.collect {
value ->
delay(300) // 假装我们花费 300 毫秒来处理它
println(value)
}
}
println("Collected in $time ms")
}
/**
1
2
3
Collected in 1052 ms
*/
合并
- 当流代表部分操作结果或操作状态更新时,可能没有必要处理每个值,而是只处理最新的那个。在本示例中,当收集器处理它们太慢的时候, conflate 操作符可以用于跳过中间值
val time = measureTimeMillis {
simple()
.conflate() // 合并发射项,不对每个值进行处理
.collect {
value ->
delay(300) // 假装我们花费 300 毫秒来处理它
println(value)
}
}
println("Collected in $time ms")
处理最新值
- 取消缓慢的收集器,并在每次发射新值的时候重新启动它。有一组与 xxx 操作符执行相同基本逻辑的 xxxLatest 操作符,但是在新值产生的时候取消执行其块中的代码
val time = measureTimeMillis {
simple()
.collectLatest {
value -> // 取消并重新发射最后一个值
println("Collecting $value")
delay(300) // 假装我们花费 300 毫秒来处理它 导致最后只能收集到第三个
println("Done $value")
}
}
println("Collected in $time ms")
/**
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
**/
zip 操作符用于组合两个流中的相关值
fun main() = runBlocking {
val nums = (1..3).asFlow() // 数字 1..3
val strs = flowOf("one", "two", "three") // 字符串
nums.zip(strs) {
a, b -> "$a -> $b" } // 组合单个字符串
.collect {
println(it) } // 收集并打印
}
/**
* 1 -> one
* 2 -> two
* 3 -> three
*/
combine
可能需要执行计算,这依赖于相应流的最新值,并且每当上游流产生值的时候都需要重新计算
fun main27() = runBlocking {
val nums = (1..3).asFlow().onEach {
delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach {
delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.zip(strs) {
a, b -> "$a -> $b" } // 使用“zip”组合单个字符串
.collect {
value -> // 收集并打印
println("$value at ${
System.currentTimeMillis() - startTime} ms from start")
}
}
/**
* 1 -> one at 421 ms from start
* 2 -> two at 823 ms from start
* 3 -> three at 1233 ms from start
*/
fun main() = runBlocking {
val nums = (1..3).asFlow().onEach {
delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach {
delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.combine(strs) {
a, b -> "$a -> $b" } // 使用“combine”组合单个字符串
.collect {
value -> // 收集并打印
println("$value at ${
System.currentTimeMillis() - startTime} ms from start")
}
}
/**
* 1 -> one at 430 ms from start
* 2 -> one at 637 ms from start
* 2 -> two at 835 ms from start
* 3 -> two at 938 ms from start
* 3 -> three at 1241 ms from start
*/
展平流
flatMapConcat 与 flattenConcat 操作符实现 连接
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待 500 毫秒
emit("$i: Second")
}
fun main() = runBlocking {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach {
delay(100) } // 每 100 毫秒发射一个数字
.flatMapConcat {
requestFlow(it) }
.collect {
value -> // 收集并打印
println("$value at ${
System.currentTimeMillis() - startTime} ms from start")
}
}
/**
* 1: First at 120 ms from start
* 1: Second at 625 ms from start
* 2: First at 730 ms from start
* 2: Second at 1238 ms from start
* 3: First at 1349 ms from start
* 3: Second at 1852 ms from start
*/
flatMapMerge 会顺序调用代码块(本示例中的 { requestFlow(it) }),但是并发收集结果流,相当于执行顺序是首先执行 map { requestFlow(it) } 然后在其返回结果上调用 flattenMerge
fun main() = runBlocking {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow()
.onEach {
delay(100) } // 每 100 毫秒发射一个数字
.flatMapMerge {
requestFlow(it) }
.collect {
value -> // 收集并打印
println("$value at ${
System.currentTimeMillis() - startTime} ms from start")
}
}
/**
* 1: First at 140 ms from start
* 2: First at 243 ms from start
* 3: First at 346 ms from start
* 1: Second at 646 ms from start
* 2: Second at 746 ms from start
* 3: Second at 849 ms from start
*/
flatMapLatest 类似于collectLatest收集最新的
代码略
try cach
fun simple10(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // 发射下一个值
}
}
fun main() = runBlocking<Unit> {
try {
simple10().collect {
value ->
println(value)
check(value <= 1) {
"Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
/**
* Emitting 1
* 1
* Emitting 2
* 2
* Caught java.lang.IllegalStateException: Collected 2
*/
fun simple11(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple11()
.catch {
e -> println("Caught $e") } // 不会捕获下游异常
.collect {
value ->
check(value <= 1) {
"Collected $value" }
println(value)
}
}
其他
fun main() = runBlocking<Unit> {
try {
simple10()
.onCompletion {
println("Done") }//它在流完全收集时调用
.collect {
value ->
if (value == 3) cancel()//取消流
println(value)
}
} finally {
println("Done")//它在结束时调用
}
}
通道
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
// 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
for (x in 1..10) channel.send(x * x)
}
// 这里我们打印了 5 次被接收的整数:
repeat(10) {
println(channel.receive()) }
println("Done!")
}
/**
* 1
* 4
* 9
* 16
* 25
* 36
* 49
* 64
* 81
* 100
* Done!
*/
通过通道生产
fun main() = runBlocking {
val numbers = produceNumbers() // 从 1 开始生成整数
val squares = square(numbers) // 整数求平方
repeat(5) {
println(squares.receive()) // 输出前五个
}
println("Done!") // 至此已完成
coroutineContext.cancelChildren() // 取消子协程
}
/**
* 生产整数
*/
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // 从 1 开始的无限的整数流
}
/**
* 把生产的整数转成平方
*/
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}