当,Kotlin Flow与Channel相逢

news/2024/4/27 15:41:57/文章来源:https://blog.csdn.net/wekajava/article/details/130278999

前言

之前的文章已经分析了Flow的相关原理与简单使用,Flow之所以用起来香,Flow便捷的操作符功不可没,而想要熟练使用更复杂的操作符,那么需要厘清Flow和Channel的关系。
本篇文章构成:

image.png

1. Flow与Channel 对比

1.1 Flow核心原理与使用场景

原理

先看最简单的Demo:

    fun test0() {runBlocking {//构造flowval flow = flow {//下游emit("hello world ${Thread.currentThread()}")}//收集flowflow.collect {//下游println("collect:$it ${Thread.currentThread()}")}}}

打印结果:

collect:hello world Thread[main,5,main] Thread[main,5,main]

说明下游和上游运行在同一线程里。

image.png

一个最基本的flow包含如下几个元素:

  1. 操作符,也即是函数
  2. 上游,通过构造操作符创建
  3. 下游,通过末端操作符构建

我们可以类比流在管道里流动:
image.png

上游早就准备好了,只是下游没有发出指令,此时上下游是没有建立起关联的,只有当下游渴了,需要水了才会通知上游放水,这个时候上下游才关联起来,管道就建好了。
因此我们认为Flow是冷流。

更多Flow细节请移步:Kotlin Flow啊,你将流向何方?

使用
基于Flow的特性,通常将其用在提供数据的场景,比如生产数据的模块将生产过程封装到flow的上游里,最终创建了flow对象。
而使用数据的模块就可以通过该flow对象去收集上游的数据,如下:

//提供数据的模块
class StudentInfo {fun getInfoFlow() : Flow<String> {return flow {//假装构造数据Thread.sleep(2000)emit("name=fish age=18")}}
}//消费数据的模块fun test1() {runBlocking {val flow = StudentInfo().getInfoFlow()flow.collect {println("studentInfo:$it")}}}

1.2 Channel核心原理与使用场景

原理
由上可知,Flow比较被动,在没有收集数据之前,上下游是互不感知的,管道并没有建起来。
而现在我们有个场景:

需要将管道提前建起来,在任何时候都可以在上游生产数据,在下游取数据,此时上下游是可以感知的

先看最简单的Demo:

    fun test2() {//提前建立通道/管道val channel = Channel<String>()GlobalScope.launch {//上游放数据(放水)delay(200)val data = "放水啦"println("上游:data=$data ${Thread.currentThread()}")channel.send(data)}GlobalScope.launch {val data = channel.receive()println("下游收到=$data ${Thread.currentThread()}")}}

image.png

一个最基本的Channel包含如下几个元素:

  1. 创建Channel
  2. 往Channel里放数据(生产)
  3. 从Channel里取数据(消费)

image.png

使用
可以看出与Flow不同的是,生产者、消费者都可以往Channel里存放/取出数据,只是能否进行有效的存放,能否成功取出数据需要根据Channel的状态确定。
Channel最大的特点:

  1. 生产者、消费者访问Channel是线程安全的,也就是说不管生产者和消费者在哪个线程,它们都能安全的存取数据
  2. 数据只能被消费一次,上游发送了1条数据,只要有1个下游消费了数据,则其它下游将不会拿到此数据

更多Channel细节请移步:继续来,同我一起撸Kotlin Channel 深水区

2. Flow与Channel 相逢

2.1 Flow切换线程的始末

思考一种场景:需要在flow里进行耗时操作(比如网络请求),外界拿到flow对象后等待收集数据即可。
很容易我们就想到如下写法:

    fun test3() {runBlocking {//构造flowval flow = flow {//下游//模拟耗时thread { Thread.sleep(3000)emit("hello world ${Thread.currentThread()}")}}}}

可惜的是编译不通过:
image.png
因为emit是挂起函数,需要在协程作用域里调用。

当然,添加一个协程作用域也很简单:

    fun test4() {runBlocking {//构造flowval flow = flow {//下游val coroutineScope = CoroutineScope(Job() + Dispatchers.IO)coroutineScope.launch {//模拟耗时,在子线程执行Thread.sleep(3000)emit("hello world ${Thread.currentThread()}")}}flow.collect {println("collect:$it")}}}

编译没有报错,满心欢喜执行,等待3s后,事与愿违:
image.png
意思是"检测到了在另一个线程里发射数据,这种行为不是线程安全的因此被禁止了"。

查看源码发现:
image.png

在emit之前会检测emit所在的协程与collect所在协程是否一致,不一致就抛出异常。
显然在我们上面的Demo里,collect属于runBlocking协程,而emit属于我们新开的协程,当然不一样了。

2.2 ChannelFlow 闪亮登场

2.2.1 自制丐版ChannelFlow

既然是线程安全问题,我们很容易想到使用Channel来解决,在此之前需要对Flow进行封装:

//参数为SendChannel扩展函数
class MyFlow(private val block: suspend SendChannel<String>.() -> Unit) : Flow<String> {//构造Channelprivate val channel = Channel<String>()override suspend fun collect(collector: FlowCollector<String>) {val coroutineScope = CoroutineScope(Job() + Dispatchers.IO)coroutineScope.launch {//启动协程//模拟耗时,在子线程执行Thread.sleep(3000)//把Channel对象传递出去block(channel)}//获取数据val data = channel.receive()//发射collector.emit(data)}
}

如上,重写了Flow的collect函数,当外界调用flow.collect时:

  1. 先启动一个协程
  2. 从channel里读取数据,没有数据则挂起当前协程
  3. 1里的协程执行,调用flow的闭包执行上游逻辑
  4. 拿到数据后进行发射,最终传递到collect的闭包

外界使用flow:

    fun test5() {runBlocking {//构造flowval myFlow = MyFlow {send("hello world emit 线程: ${Thread.currentThread()}")}myFlow.collect {println("下游收到=$it collect 线程: ${Thread.currentThread()}")}}}

最终打印:

下游收到=hello world emit 线程: Thread[DefaultDispatcher-worker-1,5,main] collect 线程: Thread[main,5,main]

可以看出,上游、下游在不同的协程里执行,也在不同的线程里执行。
如此一来就满足了需求。

2.2.2 ChannelFlow 核心原理

上面重写的Flow没有使用泛型,也没有对Channel进行关闭,还有其它的点没有完善。
还好官方已经提供了完善的类和操作符,得益于此我们很容易就完成如上需求。

    fun test6() {runBlocking {//构造flowval channelFlow = channelFlow<String> {send("hello world emit 线程: ${Thread.currentThread()}")}channelFlow.collect {println("下游收到=$it collect 线程: ${Thread.currentThread()}")}}}

接着来简单分析其原理:

#ChannelFlow.kt
private open class ChannelFlowBuilder<T>(//闭包对象private val block: suspend ProducerScope<T>.() -> Unit,context: CoroutineContext = EmptyCoroutineContext,//Channel模式capacity: Int = Channel.BUFFERED,//Buffer满之后的处理方式,此处是挂起onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {//...override suspend fun collectTo(scope: ProducerScope<T>) =//调用闭包block(scope)//...
}public abstract class ChannelFlow<T>(// upstream context@JvmField public val context: CoroutineContext,// buffer capacity between upstream and downstream context@JvmField public val capacity: Int,// buffer overflow strategy@JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {//produceImpl 开启的新协程会调用这internal val collectToFun: suspend (ProducerScope<T>) -> Unitget() = { collectTo(it) }public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =//创建Channel协程,返回Channel对象scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)//重写collect函数override suspend fun collect(collector: FlowCollector<T>): Unit =//开启协程coroutineScope {//发射数据collector.emitAll(produceImpl(this))}
}

produceImpl函数并不耗时,仅仅只是开启了新的协程。
接着来看collector.emitAll:

#Channels.kt
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {ensureActive()var cause: Throwable? = nulltry {//循环从Channel读取数据while (true) {//从Channel获取数据val result = run { channel.receiveCatching() }if (result.isClosed) {//如果Channel关闭了,也就是上游关闭了,则退出循环result.exceptionOrNull()?.let { throw it }break // returns normally when result.closeCause == null}//发射数据emit(result.getOrThrow())}} catch (e: Throwable) {cause = ethrow e} finally {//关闭Channelif (consume) channel.cancelConsumed(cause)}
}

从源码可能无法一眼厘清其流程,老规矩上图就会清晰明了。
image.png

上一小结丐版的实现就是参照channelFlow,若是了解了丐版,再来了解官方豪华版就比较容易。

2.2.3 ChannelFlow 应用场景

查看ChannelFlow衍生的子类:
image.png
这些子类是Flow里各种复杂操作符的基础,如:
buffer、flowOn、flatMapLatest、flatMapMerge等。
因此掌握了ChannelFlow再来看各种操作符就会豁然开朗。

2.3 callbackFlow 拯救你的回调

2.3.1 原理

使用channelFlow {},虽然能够在新的协程里执行闭包,但由于新协程的调度器是使用collect所在协程调度器不够灵活:

    fun test6() {runBlocking {//构造flowval channelFlow = channelFlow<String> {send("hello world emit 线程: ${Thread.currentThread()}")}channelFlow.collect {println("下游收到=$it collect 线程: ${Thread.currentThread()}")}}}

collect所在的协程为runBlocking协程,而send函数虽然在新的协程里,但它的协程调度器使用的是collect协程的,因此send函数与collect函数所运行的线程是同一个线程。
虽然我们可以更改外层的调度器使之运行在不同的线程如:

    fun test6() {GlobalScope.launch {//构造flowval channelFlow = channelFlow<String> {send("hello world emit 线程: ${Thread.currentThread()}")}channelFlow.collect {println("下游收到=$it collect 线程: ${Thread.currentThread()}")}}}

但终归不灵活,从设计的角度来说,Flow(对象)的提供者并不关心使用者在什么样的环境下进行collect操作。

还是以网络请求为例:

fun getName(callback:NetResult<String>) {thread {//假装从网络获取Thread.sleep(2000)callback.onSuc("I'm fish")}
}interface NetResult<T> {fun onSuc(t:T)fun onFail(err:String)
}

如上,存在这样一个网络请求,在子线程里进行网络请求,并通过回调通知外部调用者。
很典型的一个请求回调,该怎么把它封装为Flow呢?尝试用channelFlow进行封装:

    fun test7() {runBlocking {//构造flowval channelFlow = channelFlow {getName(object : NetResult<String> {override fun onSuc(t: String) {println("begin send")trySend("hello world emit 线程: ${Thread.currentThread()}")println("stop send")}override fun onFail(err: String) {}})}channelFlow.collect {println("下游收到=$it collect 线程: ${Thread.currentThread()}")}}}

看似美好,实则却收不到数据,明明"begin send"和"stop send"都打印了,为啥collect闭包里没有打印呢?
getName函数内部开启了线程,因此它本身并不是耗时操作,由此可知channelFlow闭包很快就执行完成了。
由ChannelFlow源码可知:CoroutineScope.produce的闭包执行结束后会关闭Channel:
image.png
既然channel都关闭了,当子线程里回调onSuc并执行trySend并不会再往channel发送数据,collect当然就收不到了。

要解决这个问题也很简单:不让协程关闭channel,换句话说只要协程没有结束,那么channel就不会被关闭。而让协程不结束,最直接的方法就是在协程里调用挂起函数。
刚好,官方也提供了相应的挂起函数:

    fun test7() {runBlocking {//构造flowval channelFlow = channelFlow {getName(object : NetResult<String> {override fun onSuc(t: String) {println("begin send")trySend("hello world emit 线程: ${Thread.currentThread()}")println("stop send")//关闭channel,触发awaitClose闭包执行close()}override fun onFail(err: String) {}})//挂起函数awaitClose {//走到此,channel关闭println("awaitClose")}}channelFlow.collect {println("下游收到=$it collect 线程: ${Thread.currentThread()}")}}}

相较上个Demo而言,增加了2点:

  1. awaitClose 挂起协程,该协程不结束,则channel不被关闭
  2. channel使用完成后需要释放资源,主动调用channel的close函数,该函数最终会触发awaitClose闭包执行,在闭包里做一些释放资源的操作

你可能会说以上用法不太友好,如果不知道有awaitClose这函数,都无法排查为啥没收到数据。
嗯,这官方也考虑到了,那就是callbackFlow。
image.png
可以看出就比channelFlow函数多了个判断:
若是执行了红框部分,说明该协程没有被挂起,则抛出异常提示我们在协程里调用awaitClose函数。

2.3.2 使用

和channelFlow的使用一模一样:

    fun test8() {runBlocking {//构造flowval channelFlow = callbackFlow {getName(object : NetResult<String> {override fun onSuc(t: String) {println("begin send")trySend("hello world emit 线程: ${Thread.currentThread()}")println("stop send")//关闭channel,触发awaitClose闭包执行
//                        close()}override fun onFail(err: String) {}})//挂起函数awaitClose {//走到此,channel关闭println("awaitClose")}}channelFlow.collect {println("下游收到=$it collect 线程: ${Thread.currentThread()}")}}}

有了callbackFlow,我们就可以优雅的将回调转为Flow提供给外部调用者使用。

3. Flow与Channel 互转

3.1 Channel 转 Flow

Flow和Channel相遇,碰撞出了ChannelFlow,ChannelFlow顾名思义,既是Channel也是Flow,因此可以作为中介对Flow与Channel进行转换。

    fun test9() {runBlocking {val channel = Channel<String>()val flow = channel.receiveAsFlow()GlobalScope.launch {flow.collect {println("collect:$it")}}delay(200)channel.send("hello fish")}}

channel通过send,flow通过collect收集。

3.2 Flow 转 Channel

    fun test10() {runBlocking {val flow = flow {emit("hello fish")}val channel = flow.produceIn(this)val data = channel.receive()println("data:$data")}}

flow.produceIn(this) 触发collect操作,进而执行flow闭包,emit将数据放到channel里,最后通过channel.receive()取数据。

下篇将完全解析Flow各种操作符,掌握了ChannelFlow再去看操作符相信你会如虎添翼。

本文基于Kotlin 1.5.3,文中完整实验Demo请点击

您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力

持续更新中,和我一起步步为营系统、深入学习Android/Kotlin

1、Android各种Context的前世今生
2、Android DecorView 必知必会
3、Window/WindowManager 不可不知之事
4、View Measure/Layout/Draw 真明白了
5、Android事件分发全套服务
6、Android invalidate/postInvalidate/requestLayout 彻底厘清
7、Android Window 如何确定大小/onMeasure()多次执行原因
8、Android事件驱动Handler-Message-Looper解析
9、Android 键盘一招搞定
10、Android 各种坐标彻底明了
11、Android Activity/Window/View 的background
12、Android Activity创建到View的显示过
13、Android IPC 系列
14、Android 存储系列
15、Java 并发系列不再疑惑
16、Java 线程池系列
17、Android Jetpack 前置基础系列
18、Android Jetpack 易学易懂系列
19、Kotlin 轻松入门系列
20、Kotlin 协程系列全面解读

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_103164.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

论文实验1、安装tensorflow运行节点嵌入相关方法

还是官方的教程好使 使用 pip 安装 TensorFlow 只有三步 1.安装python&#xff0c;版本太高不行&#xff0c;在推荐版本里选最高的。 2.安装python虚拟环境venv python -m venv --system-site-packages .\venv .\venv\Scripts\activate 3.在虚拟环境里装tensorflow pip…

开发人员应考虑使用 Edge浏览器的 8 个理由

1.无限访问ChatGPT 这是正确的。您可以通过 Bing 访问 GPT-4。但与 2021 年后没有数据的 ChatGPT 不同&#xff0c;必应通过从自己的搜索引擎中提取最新数据来对其进行补偿。 首先&#xff0c;点击Edge 浏览器左上角的Bing 小图标 Bing 具有三个选项卡&#xff1a;Chat、Compo…

VSCode连接远程服务器调试代码详细流程

文章目录 1.远程连接服务器2. 打开项目文件目录3. 配置调试环境 在研究人工智能项目时&#xff0c;很多时候本地机器性能不够&#xff0c;只能把代码拉倒服务器上&#xff0c;然后利用服务器资源来运行代码。遇到问题时需要调试&#xff0c;本文详细介绍利用VScode来调试远程服…

DAB-Deformable-DETR代码学习记录之模型构建

DAB-DETR的作者在Deformable-DETR基础上&#xff0c;将DAB-DETR的思想融入到了Deformable-DETR中&#xff0c;取得了不错的成绩。今天博主通过源码来学习下DAB-Deformable-DETR模型。 首先我么看下Deformable的创新之处&#xff1a; Deformable-DETR创新 多尺度融合 首先便是…

layui 表格中嵌入下拉框被遮挡

1、代码 单元格样式&#xff1a; * 设置下拉框的高度与表格单元相同 */.layui-table-cell {width: 100%;height: 100%;border: 1px;border-color: #F2F2F2;} 表格初始化后的回调&#xff1a; done: function (res, curr, count) {$(".layui-table-body, .layui-tabl…

MC9S12G128开发板—实现按键发送CAN报文指示小车移动功能

实验环境&#xff1a;MC9S12G128开发板 基本功能&#xff1a;控制开发板上的按键&#xff0c;模拟车辆移动的上下左右四个方位&#xff0c;通过can通信告诉上位机界面&#xff0c;车辆轨迹的移动方位。 1. 1939报文发送的示例代码 MC9S12G128开发板1939协议发送can报文数据的…

php+vue 校友交流平台

1.普通用户功能分析 &#xff08;1&#xff09;用户注册&#xff1a;用于注册校友录用户。 &#xff08;2&#xff09;用户登录&#xff1a;供校友录用户登录。 &#xff08;3&#xff09;资料修改&#xff1a;修改当前登录使用者信息。 &#xff08;4&#xff09;…

“量子+生成式AI”!IBM联合生物制药公司Moderna进行疫苗研究

​ &#xff08;图片来源&#xff1a;网络&#xff09; 4月20日&#xff0c;以COVID-19疫苗而闻名的生物技术和制药公司Moderna Inc.表示&#xff0c;宣布正在与IBM公司合作&#xff0c;利用量子计算和生成式人AI探索推进研究mRNA技术的方法。 双方签署了一项协议&#xff0c;允…

【社区图书馆】Fundamentals Of Computer Graphics——The beginning of computer graphics

目录 English 中文 English "Fundamentals Of Computer Graphics" is a classic textbook on computer graphics, also known as the "Tiger Book". It is considered one of the best introductory texts in the field of computer graphics. The book …

022 - C++ 析构函数

上期我们讨论了构造函数。认识了它是什么以及如何使用它。如果你没有看上一期&#xff0c;那么你一定要回去看一下。 今天我们要讨论一下它的“孪生兄弟”&#xff0c;析构函数&#xff0c;它们在某些方面非常相似。 构造函数是你创建一个新的实例对象时运行&#xff0c;而析…

【iOS】AVPlayer 视频播放

视频播放器的类别 iOS开发中不可避免地会遇到音视频播放方面的需求。 常用的音频播放器有 AVAudioPlayer、AVPlayer 等。不同的是&#xff0c;AVAudioPlayer 只支持本地音频的播放&#xff0c;而 AVPlayer 既支持本地音频播放&#xff0c;也支持网络音频播放。 常用的视频播放…

深入理解Javascript事件处理机制

深入理解javascript事件处理机制 前言 在开发web应用程序时&#xff0c;事件处理机制是javascript中至关重要的一部分。许多高级特性&#xff0c;如事件冒泡、事件捕获和事件委托&#xff0c;都是通过事件处理来实现的。熟练掌握这些技术可以帮助我们更好地组织代码、提高代码…

C++篇----类、封装、类访问权限、类实例化

文章目录 一、面向过程和面向对象二、类 一、面向过程和面向对象 c语言是面向过程的编程语言 c是面向对象的编程语言 面向过程&#xff1a;关注过程&#xff0c;对于求解问题的不走&#xff0c;调用函数逐步解决问题 就洗衣服来说&#xff1a;洗衣服需要放水&#xff0c;倒洗衣…

10个必备的建筑可视化3dmax插件

当日复一日地处理项目时&#xff0c;很容易陷入舒适但效率不高的工作流程中。 插件是在不牺牲工作质量的情况下改进和加快工作流程的好方法。 尤其是在建筑可视化时&#xff0c;快节奏的行业往往需要艺术家灵活机智。 在本文中&#xff0c;我们将介绍 10 个最好的 3ds Max 插件…

C语言从入门到精通第11天(数组的基本操作)

数组的基本操作 数组的概念一维数组二维数组 数组的概念 在程序设计中&#xff0c;为了方便处理数据把具有相同类型的若干变量按有序形式集合在一起&#xff0c;这些按序排列的同类数据元素的集合称为数组。 在C语言中&#xff0c;数组属于构造数据类型&#xff0c;一个数组可…

Linux文本处理三大利器Grep、AWK、Sed

写在前面 Linux三剑客是文本处理工具&#xff0c;它们可以帮助我们快速、高效地对文本进行处理。其中包括了grep、awk、以及sed这三个强大的命令行工具。 Linux 三剑客主要作用: grep&#xff0c;它可以根据正则表达式查找相关内容并打印对应的数据。awk&#xff0c;它可以根…

C. Painting the Fence(思维 + 前缀和)

Problem - C - Codeforces You需要油漆一个由n个部分组成的长围栏。不幸的是&#xff0c;它没有被涂漆&#xff0c;所以你决定雇用q名画家来完成这项工作。第i名画家将会油漆所有满足lisxsri的部分x. 不幸的是&#xff0c;你的预算很紧&#xff0c;所以你只能雇用q-2名画家。显…

数据湖Iceberg-简介(1)

文章目录 Iceberg简介概述特性数据存储、计算引擎插件化实时流批一体数据表演化&#xff08;Table Evolution&#xff09;模式演化&#xff08;Schema Evolution&#xff09;分区演化&#xff08;Partition Evolution&#xff09;列顺序演化&#xff08;Sort Order Evolution&a…

itop-3568开发板驱动学习笔记(22)设备树(一)设备树基础

《【北京迅为】itop-3568开发板驱动开发指南.pdf》 学习笔记 文章目录 设备树简介设备树编译设备树语法设备根节点设备子节点节点名称reg 属性#address-cell 和 #size-cells 属性model 属性status 属性compatible 属性aliases 节点chosen 节点device_type 属性自定义属性 设备树…

Linux云服务器的使用,以及运行Python程序

目录 1、使用Linux云服务器的软件 2、Linux系统运行Python程序 3、Linux系统查看包、虚拟环境、安装包等 以下几个深度学习服务器都不错&#xff1a;智星云、AutoDL、恒源云 1、使用Linux云服务器的软件 MobaXterm_Personal 推荐MobaXterm_Personal mobaxterm是一款方便网站…