Kotlin学习:5.2、异步数据流 Flow

news/2024/4/26 8:28:23/文章来源:https://blog.csdn.net/weixin_35691921/article/details/126709158

Flow

  • 一、Flow
    • 1、Flow是什么东西?
    • 2、实现功能
    • 3、特点
    • 4、冷流和热流
    • 5、流的连续性
    • 6、流的构建器
    • 7、流的上下文
    • 8、指定流所在协程
    • 9、流的取消
      • 9.1、超时取消
      • 9.2、主动取消
      • 9.3、密集型任务的取消
    • 10、背压和优化
      • 10.1、buffer 操作符
      • 10.2、 flowOn
      • 10.3、conflate 操作符
      • 10.4、collectLatest 操作符
  • 二、操作符
    • 1、变换操作符
      • 1.1、buffer (缓存)
      • 1.2、map (变换)
        • 1.2.1、map
        • 1.2.2、mapNotNull (不空的下发)
        • 1.2.3、mapLatest
      • 1.3、transform (一转多)
      • 1.4、reduce (累*加减乘除)
      • 1.5、fold(累*加减乘除 and 拼接)
      • 1.6、flatMapConcat (有序变换)
      • 1.7、flatMapMerge (无序变换)
      • 1.8、flatMapLatest (截留)
    • 2、过滤型操作符
      • 2.1、take (截留)
        • 2.1.2、takeWhile
      • 2.2、filter(满足条件下发)
        • 2.2.2、filterNotNull (不空的下发)
        • 2.2.3、filterNot(符合条件的值将被丢弃)
        • 2.2.4、filterInstance (筛选符合类型的值)
      • 2.3、skip 和 drop(跳过)
        • 2.3.2、dropWhile
      • 2.4、distinctUntilChanged (过滤重复)
        • 2.4.2、distinctUntilChangedBy
      • 2.5、single (判断是否一个事件)
      • 2.6、first (截留第一个事件)
      • 2.7、debounce (防抖动)
      • 2.8、conflate
      • 2.9、sample (周期采样)
    • 3、组合型操作符
      • 3.1、count (计数)
      • 3.2、zip (合并元素)
      • 3.3、combine(合并元素)
      • 3.4、merge (合并成流)
      • 3.5、flattenConcat (展平流)
      • 3.6、flattenMerge(展平流)
    • 4、异常操作符
      • 4.1、catch (拦截异常)
      • 4.2、retry (重试)
        • 4.2.2、retryWhen
      • 4.3、withTimeout (超时)
    • 5、辅助操作符
      • 5.1、onXXX
      • 5.2、delay (延时)
      • 5.3、measureTimeMillis (计时)
  • 参考地址

一、Flow

1、Flow是什么东西?

Flow 是有点类似 RxJava 的 Observable

都有冷流和热流之分;
都有流式构建结构;
都包含 map、filter 等操作符。

区别于Observable,Flow可以配合挂起函数使用

2、实现功能

异步返回多个值

可以实现下载功能等,Observable 下发数组时可以实现什么功能,他就能实现什么功能
在这里插入图片描述
当文件下载时,对应的后台下载进度,就可以通过Flow里面的emit发送数据,通过collect接收对应的数据。

转:https://blog.csdn.net/qq_30382601/article/details/121825461

3、特点

  • flow{…}块中的代码可以挂起
  • 使用flow,suspend修饰符可以省略
  • 流使用emit函数发射值
  • 流使用collect的函数收集值
  • flow类似冷流,flow中代码直到流被收集(调用collect)的时候才运行,类似lazy,什么时候用,什么时候执行。
  • 流的连续性:流收集都是按顺序收集的
  • flowOn可更改流发射的上下文,即可以指定在主线程或子线程中执行
  • 与之相对的是热流,我们即将介绍的 StateFlow 和 SharedFlow 是热流,在垃圾回收之前,都是存在内存之中,并且处于活跃状态的。

转:https://blog.csdn.net/zx_android/article/details/122744370

4、冷流和热流

  • 冷流
    冷流类似冷启动,代码在被用到才会执行,如你需要使用的数据在网络,需要先请求网络才能得到数据
    Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。

  • 热流
    热流类似热启动,代码在用到之前已经准备好,如你请求过网络,数据已经缓存在本地,你只需直接使用即可

5、流的连续性

流的连续性:流收集都是按顺序收集的

在这里插入图片描述

6、流的构建器

如下三种为冷流构建器

  1. flow{emit} .collect{}
  2. flowOf(***).collect{}
  3. (***).asFlow().collect{}
    @Testfun `test flow builder`() = runBlocking<Unit> {flowOf("one", "two", "three").onEach { delay(1000) }.collect { value ->println(value)}(1..3).asFlow().collect { value ->println(value)}flow<Int> {for (i in 11..13) {delay(1000) //假装在一些重要的事情emit(i) //发射,产生一个元素}}.collect { value ->println(value)}}

7、流的上下文

flowOn (多用于切线程)

流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存。

    fun simpleFlow3() = flow<Int> {println("Flow started ${Thread.currentThread().name}")for (i in 1..3) {delay(1000)emit(i)}}@Testfun `test flow context`() = runBlocking<Unit> {simpleFlow3().collect { value -> println("Collected $value ${Thread.currentThread().name}") }}

如下:流的发射和接收在一个协程内

Flow started Test worker @coroutine#1
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1

flow{…}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发生(emit)

如下这种写法不被允许

    fun simpleFlow4() = flow<Int> {withContext(Dispatchers.Default) {println("Flow started ${Thread.currentThread().name}")for (i in 1..3) {delay(1000)emit(i)}}}

那么如何切换协程上下文呢?
flowOn操作符,该函数用于更改流发射的上下文

    fun simpleFlow5() = flow<Int> {println("Flow started ${Thread.currentThread().name}")for (i in 1..3) {delay(1000)emit(i)}}.flowOn(Dispatchers.Default)@Testfun `test flow context`() = runBlocking<Unit> {simpleFlow5().collect { value -> println("Collected $value ${Thread.currentThread().name}") }}

如下:切换上下文成功

Flow started DefaultDispatcher-worker-2 @coroutine#2
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1

8、指定流所在协程

launchIn 用于指定协程作用域通知flow执行

使用 launchIn 替换 collect 在单独的协程中启动收集流

  • 指定协程
    //事件源private fun events() = (1..3).asFlow().onEach { delay(100) }.flowOn(Dispatchers.Default)@Testfun `test flow launch`() = runBlocking<Unit> {val job = events().onEach { event -> println("Event: $event ${Thread.currentThread().name}") }
//                .collect {}.launchIn(CoroutineScope(Dispatchers.IO)) //这里使用另一个上下文传入Flow
//                .launchIn(this)//这里使用当前上下文传入Flowjob.join()}

打印:

Event: 1 DefaultDispatcher-worker-2 @coroutine#2
Event: 2 DefaultDispatcher-worker-1 @coroutine#2
Event: 3 DefaultDispatcher-worker-3 @coroutine#2
  • 也可以指定当前协程中执行
    @Testfun `test flow launch`() = runBlocking<Unit> {val job = events().onEach { event -> println("Event: $event ${Thread.currentThread().name}") }
//                .collect {}
//            .launchIn(CoroutineScope(Dispatchers.IO)) //这里使用另一个上下文传入Flow.launchIn(this)//这里使用当前上下文传入Flow//        job.join()}
Event: 1 Test worker @coroutine#2
Event: 2 Test worker @coroutine#2
Event: 3 Test worker @coroutine#2

9、流的取消

流采用和协程同样的协作取消。流可以在挂起函数的挂起的时候取消。

9.1、超时取消

withTimeoutOrNull 不能取消密集型任务

    fun simpleFlow6() = flow<Int> {for (i in 1..300) {delay(1000)emit(i)println("Emitting $i")}}@Testfun `test cancel flow`() = runBlocking<Unit> {withTimeoutOrNull(2500) {simpleFlow6().collect { value -> println(value) }}println("Done")}

9.2、主动取消

cancel

    @Testfun `test cancel flow `() = runBlocking<Unit> {simpleFlow6().collect { value ->if (value == 3) {cancel()}println(value)}println("Done")}

9.3、密集型任务的取消

密集型任务需要流的取消检测

cancel + cancellable

    @Testfun `test cancel flow check`() = runBlocking<Unit> {(1..5).asFlow().cancellable().collect { value ->println(value)if (value == 3) cancel()println("cancel check ${coroutineContext[Job]?.isActive}")}}

10、背压和优化

  • 什么是背压?

在这里插入图片描述

生产者生产的效率大于消费者消费的效率,元素积压

例,演示背压

     fun simpleFlow8() = flow<Int> {for (i in 1..10) {// emit 上面这段代码在collect之前执行delay(100)emit(i) // 调用collect// emit下面这段代码在 collect 之后执行println("Emitting $i ${Thread.currentThread().name}")}}@Testfun `test flow back pressure`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().collect { value ->delay(200)   //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Collected 1 Test worker @coroutine#1
Emitting 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Emitting 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#1
Collected 4 Test worker @coroutine#1
Emitting 4 Test worker @coroutine#1
Collected 5 Test worker @coroutine#1
Emitting 5 Test worker @coroutine#1
Collected 6 Test worker @coroutine#1
Emitting 6 Test worker @coroutine#1
Collected 7 Test worker @coroutine#1
Emitting 7 Test worker @coroutine#1
Collected 8 Test worker @coroutine#1
Emitting 8 Test worker @coroutine#1
Collected 9 Test worker @coroutine#1
Emitting 9 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Emitting 10 Test worker @coroutine#1
Collected in 3169 ms
  • 如何解决背压?

通过缓存进行性能优化

10.1、buffer 操作符

并发运行流中发射元素的代码

注意:for (i in 1…10) 这里用的是 1到 10,原因是 for循环 有耗时问题,通过打印时间戳在 for (i in 1…x) 上下,发现 for (i in 1…x) 这行代码有时耗时超过200毫秒,目前不知是何问题,特此记录,为方便对比优化时长,使用1到10.

    @Testfun `test flow back pressure buffer`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().buffer(10) //缓存发射事件.collect { value ->delay(200)   //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Collected 1 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#2
Emitting 4 Test worker @coroutine#2
Collected 2 Test worker @coroutine#1
Emitting 5 Test worker @coroutine#2
Emitting 6 Test worker @coroutine#2
Collected 3 Test worker @coroutine#1
Emitting 7 Test worker @coroutine#2
Emitting 8 Test worker @coroutine#2
Collected 4 Test worker @coroutine#1
Emitting 9 Test worker @coroutine#2
Emitting 10 Test worker @coroutine#2
Collected 5 Test worker @coroutine#1
Collected 6 Test worker @coroutine#1
Collected 7 Test worker @coroutine#1
Collected 8 Test worker @coroutine#1
Collected 9 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Collected in 2398 ms

在这里插入图片描述

10.2、 flowOn

flowOn(),修改流上下文,达到异步处理的效果,从而优化背压

    @Testfun `test flow back pressure flowOn`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().flowOn(Dispatchers.IO).collect { value ->delay(200)   //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 DefaultDispatcher-worker-1 @coroutine#2
Emitting 2 DefaultDispatcher-worker-1 @coroutine#2
Collected 1 Test worker @coroutine#1
Emitting 3 DefaultDispatcher-worker-1 @coroutine#2
Emitting 4 DefaultDispatcher-worker-1 @coroutine#2
Collected 2 Test worker @coroutine#1
Emitting 5 DefaultDispatcher-worker-1 @coroutine#2
Emitting 6 DefaultDispatcher-worker-1 @coroutine#2
Collected 3 Test worker @coroutine#1
Emitting 7 DefaultDispatcher-worker-1 @coroutine#2
Emitting 8 DefaultDispatcher-worker-1 @coroutine#2
Collected 4 Test worker @coroutine#1
Emitting 9 DefaultDispatcher-worker-1 @coroutine#2
Emitting 10 DefaultDispatcher-worker-1 @coroutine#2
Collected 5 Test worker @coroutine#1
Collected 6 Test worker @coroutine#1
Collected 7 Test worker @coroutine#1
Collected 8 Test worker @coroutine#1
Collected 9 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Collected in 2385 ms

10.3、conflate 操作符

conflate(),合并发射项,处理最新的值,不对每个值进行处理;

    @Testfun `test flow back pressure conflate`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().conflate().collect { value ->delay(200)   //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Collected 1 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#2
Emitting 4 Test worker @coroutine#2
Collected 2 Test worker @coroutine#1
Emitting 5 Test worker @coroutine#2
Emitting 6 Test worker @coroutine#2
Collected 4 Test worker @coroutine#1
Emitting 7 Test worker @coroutine#2
Emitting 8 Test worker @coroutine#2
Collected 6 Test worker @coroutine#1
Emitting 9 Test worker @coroutine#2
Emitting 10 Test worker @coroutine#2
Collected 8 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Collected in 1554 ms

10.4、collectLatest 操作符

collectLatest(),取消并重新发射最后一个值

    @Testfun `test flow back pressure collectLatest`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().collectLatest { value ->delay(200)   //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Emitting 3 Test worker @coroutine#2
Emitting 4 Test worker @coroutine#2
Emitting 5 Test worker @coroutine#2
Emitting 6 Test worker @coroutine#2
Emitting 7 Test worker @coroutine#2
Emitting 8 Test worker @coroutine#2
Emitting 9 Test worker @coroutine#2
Emitting 10 Test worker @coroutine#2
Collected 10 Test worker @coroutine#12
Collected in 1648 ms

二、操作符

1、变换操作符

1.1、buffer (缓存)

上面背压有栗子

1.2、map (变换)

1.2.1、map

map 是变换元素

data class Student(var name: String, var age: Int)
    private suspend fun performRequest(age: Int): Student {delay(500)return Student("这是name", age)}@Testfun `test map flow operator`() = runBlocking<Unit> {(1..3).asFlow().map { request -> performRequest(request) }.collect { value -> println(value) }}
Student(name=这是name, age=1)
Student(name=这是name, age=2)
Student(name=这是name, age=3)

1.2.2、mapNotNull (不空的下发)

    @Testfun `test mapNotNull flow operator`() = runBlocking<Unit> {flow {emit(1)emit(3)emit(2)}.mapNotNull { request ->if (1 == request) {null} else {Student("这是name", request)}}.collect { value -> println(value) }}
Student(name=这是name, age=3)
Student(name=这是name, age=2)

1.2.3、mapLatest

当有新值发送时,如果上个转换还没结束,会取消掉,用法同map

    @Testfun `test mapLatest flow operator`() = runBlocking<Unit> {flow {emit(1)emit(2)emit(3)}.mapLatest {if (2 == it) delay(100L)"it is $it"}.collect {println(it)}}

1.3、transform (一转多)

    @Testfun `test transform flow operator`() = runBlocking<Unit> {(1..3).asFlow().transform { request ->emit("Making request $request")emit(performRequest(request))}.collect { value -> println(value) }}
Making request 1
Student(name=这是name, age=1)
Making request 2
Student(name=这是name, age=2)
Making request 3
Student(name=这是name, age=3)

1.4、reduce (累*加减乘除)

    @Testfun `test reduce operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.reduce { accumulator, value -> accumulator + value })}

1.5、fold(累*加减乘除 and 拼接)

    @Testfun `test fold + operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.fold(3) { accumulator, value -> accumulator + value })}
17
    @Testfun `test fold - operator`() = runBlocking<Unit> {println(flow<Int> {emit(2)emit(3)}.fold(18) { accumulator, value -> accumulator - value })}
13
    @Testfun `test fold multiply by operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)}.fold(3) { accumulator, value -> accumulator * value })}
18
    @Testfun `test fold devide operator`() = runBlocking<Unit> {println(flow<Int> {emit(2)emit(3)}.fold(18) { accumulator, value -> accumulator / value })}
3
  • 拼接
    @Testfun `test fold joint operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(2)emit(3)}.fold("拼接") { accumulator, value -> return@fold "$accumulator =+= $value" })}
拼接 =+= 1 =+= 2 =+= 3

1.6、flatMapConcat (有序变换)

元素会变换完,以流的形式继续下发,并且某个元素需要耗时,它后面的元素会等待。

    @Testfun `test flatMapConcat operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.flatMapConcat { num ->flow {if (3==num){delay(200)}emit("num: $num")}}.collect {println("value -> $it")}}
value -> num: 1
value -> num: 2
value -> num: 3
value -> num: 4
value -> num: 5

1.7、flatMapMerge (无序变换)

元素会变换完,以流的形式继续下发,并且某个元素需要耗时,它后面的元素不会等待。

    @Testfun `test flatMapMerge operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.flatMapMerge() { num ->flow {if (3==num){delay(200)}emit("num: $num")}}.collect {println("value -> $it")}}
value -> num: 1
value -> num: 2
value -> num: 4
value -> num: 3
value -> num: 5

1.8、flatMapLatest (截留)

快速执行的事件都正常下发,
当有新值发送时,如果上个转换还没结束,会上取消掉上一个,直接下发新值。

    @Testfun `test flatMapLatest operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.flatMapLatest() { num ->flow {if (3 == num) {delay(200)}emit("num: $num")emit("num2: $num")}}.collect {println("value -> $it")}}
value -> num: 1
value -> num2: 1
value -> num: 2
value -> num2: 2
value -> num: 4
value -> num2: 4
value -> num: 5
value -> num2: 5

2、过滤型操作符

2.1、take (截留)

跟Rxjava一样

    fun numbers() = flow<Int> {try {emit(1)emit(2)println("This line will not execute")emit(3)} finally {println("Finally in numbers")}}@Testfun `test limit length operator`() = runBlocking<Unit> {//take(2),表示 当计数元素被消耗时,原始流被取消numbers().take(2).collect { value -> println(value) }}
1
2
Finally in numbers

2.1.2、takeWhile

找到第一个不满足条件的值,发送它之前的值,和dropWhile相反

    @Testfun `test takeWhile operator`() = runBlocking<Unit> {flow<Int> {emit(2)emit(1)emit(3)emit(4)emit(1)}.takeWhile { it < 2 }.collect { value -> println(value) }}

如上什么也不会输出;

    @Testfun `test takeWhile operator`() = runBlocking<Unit> {flow<Int> {emit(1)emit(2)emit(3)emit(4)emit(1)}.takeWhile { it < 2 }.collect { value -> println(value) }}

会输出 1

2.2、filter(满足条件下发)

跟Rxjava一样

    @Testfun `test filter operator`() = runBlocking<Unit> {numbers().filter {it == 2}.collect { value -> println(value) }}

2.2.2、filterNotNull (不空的下发)

    @Testfun `test filterNotNull flow operator`() = runBlocking<Unit> {flow {emit(1)emit(3)emit(null)emit(2)}.filterNotNull ().collect { value -> println(value) }}
1
3
2

2.2.3、filterNot(符合条件的值将被丢弃)

筛选不符合条件的值,相当于filter取反

    @Testfun `test filterNot operator`() = runBlocking<Unit> {flow<Int> {emit(1)emit(2)emit(3)}.filterNot {it > 2}.collect { value -> println(value) }}
1
2

2.2.4、filterInstance (筛选符合类型的值)

对标rxjava中的ofType

筛选符合类型的值(不符合类型的值将被丢弃)

    @Testfun `test filterInstance operator`() = runBlocking<Unit> {flow<Any> {emit(1)emit("2")emit(3)emit("str")}.filterIsInstance<String>().collect { value -> println(value) }}
2
str

2.3、skip 和 drop(跳过)

在这里插入图片描述

    @Testfun `test skip operator`() = runBlocking<Unit> {numbers().drop(2).collect { value -> println(value) }}

输出

3

2.3.2、dropWhile

找到第一个不满足条件的值,继续发送它和它之后的值

    @Testfun `test dropWhile operator`() = runBlocking<Unit> {numbers().dropWhile { it <= 2 }.collect { value -> println(value) }}
This line will not execute
3
Finally in numbers

2.4、distinctUntilChanged (过滤重复)

    @Testfun `test distinctUntilChanged operator`() = runBlocking<Unit> {flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.distinctUntilChanged().collect { value -> println(value) }}

2.4.2、distinctUntilChangedBy

判断两个连续值是否重复,可以设置是否丢弃重复值。
去重规则有点复杂,没完全懂

    @Testfun `test distinctUntilChangedBy operator`() = runBlocking<Unit> {flowOf(Student(name = "Jack", age = 11),Student(name = "Tom", age = 10),Student(name = "Jack", age = 12),Student(name = "Jack", age = 13),Student(name = "Tom", age = 11)).distinctUntilChangedBy { it.name == "Jack" }.collect { //第三个Stu将被丢弃println(it.toString())}}
Student(name=Jack, age=11)
Student(name=Tom, age=10)
Student(name=Jack, age=12)
Student(name=Tom, age=11)

2.5、single (判断是否一个事件)

用于确保 flow 输出值唯一。若只有一个值,则可以正常执行,若输出的值不止只有一个的时候,就会抛出异常:

    @Testfun `test single operator`() = runBlocking<Unit> {try {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.single())} catch (e: Exception) {println("e =$e")}}

如果一个事件,就正常执行;否则异常。

e =java.lang.IllegalArgumentException: Flow has more than one element

2.6、first (截留第一个事件)

    @Testfun `test first operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.first())}
1

2.7、debounce (防抖动)

    @Testfun `test debounce operator`() = runBlocking<Unit> {flowOf(Student(name = "Jack", age = 11),Student(name = "Tom", age = 10),Student(name = "Jack", age = 12),Student(name = "Jack", age = 13),Student(name = "Tom", age = 11)).onEach {if (it.name == "Jack" && it.age == 13)delay(500)}.debounce(500).collect { //第三个Stu将被丢弃println(it.toString())}}
Student(name=Jack, age=12)
Student(name=Tom, age=11)

2.8、conflate

见 10.3、conflate

仅保留最新值, 内部就是 buffer(CONFLATED``)

2.9、sample (周期采样)

固定周期采样 ,给定一个时间周期,保留周期内最后发出的值,其他的值将被丢弃

sample操作符与debounce操作符有点像,但是却限制了一个周期性时间,sample操作符获取的是一个周期内的最新的数据,可以理解为debounce操作符增加了周期的限制。

    @Testfun `test sample operator`() = runBlocking<Unit> {flow {repeat(10) {delay(50)emit(it)}}.sample(100).collect {println(it)}}
0
2
4
6
8

3、组合型操作符

3.1、count (计数)

    @Testfun `test count operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.count())}

3.2、zip (合并元素)

跟Rxjava一样

    @Testfun `test zip operator`() = runBlocking<Unit> {val nameFlow = mutableListOf("小红", "小黑").asFlow()val numFlow = (1..3).asFlow()nameFlow.zip(numFlow) { string, num ->"$string$num"}.collect {println("value -> $it")}}

3.3、combine(合并元素)

    @Testfun `test combine operator`() = runBlocking<Unit> {val nameFlow = mutableListOf("小红", "小黑").asFlow()val numFlow = (1..3).asFlow()nameFlow.combine(numFlow) { string, num ->"$string$num"}.collect {println("value -> $it")}}
value -> 小红:1
value -> 小黑:2
value -> 小黑:3

3.4、merge (合并成流)

merge 是将两个flow合并起来,将每个值依次发出来

    @Testfun `test merge operator`() = runBlocking<Unit> {val flow1 = listOf(1, 2).asFlow()val flow2 = listOf("one", "two", "three").asFlow()merge(flow1, flow2).collect { value -> println(value) }}
1
2
one
two
three

3.5、flattenConcat (展平流)

展平操作符 flattenConcat 以顺序方式将给定的流展开为单个流,通俗点讲,减少层级 ,感觉和merge这么像呢,这个不太理解啥用

    @Testfun `test flattenConcat operator`() = runBlocking<Unit> {val flow1 = listOf(1, 2).asFlow()val flow2 = listOf("one", "two", "three").asFlow()val flow3 = listOf("x", "xx", "xxx").asFlow()flowOf(flow1, flow2, flow3).flattenConcat().collect { value -> println(value) }}
1
2
one
two
three
x
xx
xxx

3.6、flattenMerge(展平流)

flattenMerge 作用和 flattenConcat 一样,但是可以设置并发收集流的数量

    @Testfun `test flattenMerge operator`() = runBlocking<Unit> {val flow1 = listOf(1, 2).asFlow()val flow2 = listOf("one", "two", "three").asFlow()val flow3 = listOf("x", "xx", "xxx").asFlow()flowOf(flow1, flow2, flow3).flattenMerge(2).collect { value -> println(value) }}
1
2
one
two
three
x
xx
xxx

4、异常操作符

4.1、catch (拦截异常)

对标rxjava 中的 onErrorResumeNext

Exception、Throwable、Error 都会拦截

    @Testfun `test catch operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.onEach { if (2 == it) throw NullPointerException() }.catch {emit(110)println("e == $it")}.collect {println("value -> $it")}}
    @Testfun `test catch operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.onEach { if (2 == it)
//                throw Exception("测试 异常")
//                throw Throwable("测试 异常")throw Error("测试 错误")}.catch {emit(110)println("e == $it")}.collect {println("value -> $it")}}
value -> 1
value -> 110
e == java.lang.Error: 测试 错误

4.2、retry (重试)

所有异常错误都拦截

  • 拦截次数
    @Testfun `test retry operator`() = runBlocking<Unit> {flow<Any> {emit(1)emit(2)throw Exception("异常")emit(3)}.retry(2).catch { emit(110) }.collect { value -> println(value) }}
  • 拦截条件
    @Testfun `test retry 2 operator`() = runBlocking<Unit> {flow<Any> {emit(1)emit(2)throw Error("异常")emit(3)}.retry { it.message == "异常" }.catch { emit(110) }.collect { value -> println(value) }}

如上,满足拦截条件,所以会一直打印日志

1
2
1
2
1
2
1
2
1
... 不杀死程序一直打印

4.2.2、retryWhen

4.3、withTimeout (超时)

    @Testfun `test retry 2 operator`() = runBlocking<Unit> {withTimeout(2500) {flow<Any> {emit(1)throw Error("异常")}.retry { it.message == "异常" }.catch { emit(110) }.collect { value -> println(value) }}}

输出:

1
1
... 好多个
1
1
1Timed out waiting for 2500 ms
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2500 ms(Coroutine boundary)at com.yoshin.kt.kotlindemo20220713.ExampleUnitTest$test retry 2 operator$1.invokeSuspend(ExampleUnitTest.kt:928)
Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2500 msat app//kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:184)at app//kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:154)at app//kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:508)at app//kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:284)at app//kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:108)at java.base@11.0.13/java.lang.Thread.run(Thread.java:834)

5、辅助操作符

5.1、onXXX

onXXX 的方法包含

onCompletion 流完成时调用
onStart 流开始时调用
onEach 元素下发时调用,每次下发都调用

对比rxjava 中:
onCompletion == doOnComplete
onStart == doOnSubscribe 或者 doOnLifecycle
onEach == doNext

    @Testfun `test do operator`() = runBlocking<Unit> {(1..5).asFlow().onCompletion { println(" onCompletion == $it ") }.onStart { println(" onStart ") }.onEach { println(" onEach == $it ") }.collect {println("value -> $it")}}

5.2、delay (延时)

延时

    private fun events() = (1..3).asFlow().onEach { delay(100) }.flowOn(Dispatchers.Default)

5.3、measureTimeMillis (计时)

测量代码用时

    @Testfun `test flow back pressure`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().collect { value ->delay(200)   //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}

参考地址

笔记大部分内容来自动脑学院的文章和视频

动脑学院
:https://blog.csdn.net/qq_30382601/article/details/121825461

Kotlin 之 协程(三)Flow异步流
:https://blog.csdn.net/zx_android/article/details/122744370

Android Kotlin之Flow数据流:https://blog.csdn.net/u013700502/article/details/120526170

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_75483.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

同为(TOWE)防雷产品助力福建移动南平分公司防雷改造

01 公司简介中国移动通信集团福建有限公司南平分公司属于福建移动地级分公司&#xff0c;所属行业为电信、广播电视和卫星传输服务。现已建成覆盖范围广、业务品种多、通信质量高的综合通信网络&#xff0c;具备行业领先的经营管理制度。移动通信大楼的综合防雷及地接系统&…

Fedora系统安装KubeVela

话不多说直接看命令 Docker安装 Vela安装需要先安装Docker sudo yum -y install docker只需这行命令便可以自动添加 yum和dnf理论上都能成功&#xff0c;但是很看网速&#xff0c;&#xff0c;&#xff0c;实践证明yum是最好的。 如果发生报错mirrors trieds大概率就是网速超…

Kubernetes06:Controller (Deployment无状态应用)

Kubernetes06:Controller 1、什么是controller 管理和运行容器的对象&#xff0c;是一个物理概念 在集群上管理和运行容器的对象 2、Pod和Controller之间的关系 Pod是通过controller来实现应用的运维 比如伸缩、滚动升级等等操作Pod和Controller之间通过 label 标签建立关系…

Java 常用 API

文章目录一、Math二、System三、Object1. toString() 方法2. equals() 方法四、Arrays1. 冒泡排序2. Arrays 常用方法五、基本类型包装类1. Integer2. int 和 String 相互转换3. 字符串中数据排序4. 自动装箱和拆箱六、日期类1. Date2. SimpleDateFormat3. Calendar4. 二月天一…

来面试阿里测开工程师,HR问我未来3-5年规划,我给HR画个大饼。

在面试的过程中是不是经常被面试官问未来几年的职业规划?你会答吗&#xff1f;是不是经常脑袋里一片空白&#xff0c;未来规划&#xff1f;我只是想赚更多的钱啊&#xff0c;哈哈哈&#xff0c;今天我来教大家&#xff0c;如何给面试官画一个大饼&#xff0c;让他吃的不亦乐乎…

C++ STL:迭代器 Iterator

文章目录1、迭代器的类型2、traitsiterator_traitstype_traits泛化的指针&#xff0c;容器与算法的桥梁。提供一种方法&#xff0c;按照一定顺序访问一个聚合对象中各个元素&#xff0c;而又不暴露该对象的内部表示。既能对容器进行遍历&#xff0c;又可以对外隐藏容器的底层实…

数据库多主键in查询组合篇(sqlserver特殊)

此篇介绍的是oracle、mysql、sqlserver、达梦、人大金仓、南大通用数据库的单主键和复合主键select in的查询总结。 Mysql Select id,name from t_db_task where (id,name) in((915,Oracle内到外全表同步),(916,Oracle外到内全表同步),(921,Oracle外到内的触发同步)); selec…

ElasticSearch 学习笔记总结(三)

文章目录一、ES 相关名词 专业介绍二、ES 系统架构三、ES 创建分片副本 和 elasticsearch-head插件四、ES 故障转移五、ES 应对故障六、ES 路由计算 和 分片控制七、ES集群 数据写流程八、ES集群 数据读流程九、ES集群 更新流程 和 批量操作十、ES 相关重要 概念 和 名词十一、…

Java9之HttpClientAPI实战详解

Java9 之 HttpClientAPI 实战详解 前言 相信关注 java9 的小伙伴们都知道 java9 版本内置模块提供了 Http 功能&#xff0c;当然并不是说之前 jdk 之前并不支持&#xff0c;那么这次更新又多了什么呢&#xff1f;或者是解决了什么问题&#xff1f; 说明 自 JDK 1.0 以来&…

mac安装 Termius

1.下载安装包 链接: https://pan.baidu.com/s/1f5xmvYnVehCkMUD291SbsA?pwdy43k 提取码: y43k 2.打开系统偏好设置 -> 安全性与隐私 -> 通用&#xff0c;勾选“任何来源” 显示文件损坏的情况下执行下面操作 3.打开terminal终端 3.1 输入&#xff1a;sudo spctl --m…

“来源可靠、程序规范、要素合规”与“四性”

《从技术可行性的视角看电子档案的“四性”》一文中已经明确&#xff0c;笔者认为的电子档案“四性”是指“真实性、完整性、可用性和安全性”。而《从特斯拉“刹车失灵”事件看电子档案的法定要求》一文中&#xff0c;笔者对于“来源可靠、程序规范、要素合规”的解读如下&…

解决windows安装wxPython安装失败、速度过慢及PyCharm上wx包爆红问题

网上关于wxPython安装失败&#xff0c;安装速度过慢&#xff0c;以及安装成功后PyCharm中import wx仍然爆红的文章有很多&#xff0c;也特别杂&#xff0c;解决起来特别困难&#xff0c;今天在这里对问题的处理进行一个整合&#xff0c;希望能帮助到大家。 安装wxPython这里运用…

MySQL表的增删查改(基础)

gitee:博客中的所有操作整合新增语法:insert [into] table_name values(value_list)[案例] 创建一个学生表进行数据插入1.1单行数据全列插入[提示]我们可以想在记事本上写下命令,让后复制到数据库客户端,这样可以在出错的时候进行快速修改.同时为了美观和明了,我们可以进行适当…

计算机的发展

个人简介&#xff1a;云计算网络运维专业人员&#xff0c;了解运维知识&#xff0c;掌握TCP/IP协议&#xff0c;每天分享网络运维知识与技能。个人爱好: 编程&#xff0c;打篮球&#xff0c;计算机知识个人名言&#xff1a;海不辞水&#xff0c;故能成其大&#xff1b;山不辞石…

低代码开发平台选型必看指南

低代码开发是近年来逐渐兴起的一种新型软件开发方式。它通过封装常见的软件开发流程和代码&#xff0c;使得非专业的开发者也能够轻松创建复杂的应用程序。这种开发方式已经受到了许多企业的青睐&#xff0c;成为提高生产效率、降低开发成本的一种有效途径。 低代码开发的核心…

docker部署zabbix6.2.7+grafana

目录 1、下载docker 2、下载相关镜像文件 3、创建一个供zabbix系统使用的网络环境 4、创建一个供mysql数据库存放文件的目录 5、启动mysql容器 6、为zabbix-server创建一个持久卷 7、启动zabbix-server容器 8、创建语言存放目录 9、启动zabbix-web容器 10、启动zabbix…

【解锁技能】学会Python条件语句的终极指南!

文章目录前言一. python条件语句的介绍1.1 什么是条件语句1.2 条件语句的语法1.3 关于内置函数bool()二. 分支语句之单分支三. 多分支语句3.1 二分支语句3.2 多分支语句3.3 嵌套循环总结前言 &#x1f3e0;个人主页&#xff1a;欢迎访问 沐风晓月的博客 &#x1f9d1;个人简介&…

EPICS synApps介绍

一、synApps是什么&#xff1f; 1&#xff09; 一个用于同步束线用户的EPICS模块集合。 2&#xff09; EPICS模块 alive, autosave, busy, calc, camac, caputRecorder, dac128V, delaygen, dxp, ip, ip330, ipUnidig, love, mca, measComp, modbus, motor, optics, quadEM,…

【蓝桥杯选拔赛真题38】python目标值判断 青少年组蓝桥杯python 选拔赛STEMA比赛真题解析

目录 python目标值判断 一、题目要求 1、编程实现 2、输入输出 二、解题思路

【牛客刷题专栏】0x0E:JZ6 从尾到头打印链表(C语言编程题)

前言 个人推荐在牛客网刷题(点击可以跳转)&#xff0c;它登陆后会保存刷题记录进度&#xff0c;重新登录时写过的题目代码不会丢失。个人刷题练习系列专栏&#xff1a;个人CSDN牛客刷题专栏。 题目来自&#xff1a;牛客/题库 / 在线编程 / 剑指offer&#xff1a; 目录前言问题…