Koltin协程:异步热数据流的设计与使用

news/2024/4/30 1:11:31/文章来源:https://blog.csdn.net/LeeDuoZuiShuai/article/details/126919887

一.异步冷数据流

    在Kotlin协程:协程的基础与使用中,通过使用协程中提供的flow方法可以创建一个Flow对象。这种方法得到的Flow对象实际上是一个异步冷数据流,代码如下:

private suspend fun test() {val flow = flow {emit(1)emit(2)emit(3)emit(4)}GlobalScope.launch {// 触发flow执行flow.collect {Log.d("liduo", "test1: $it")}}GlobalScope.launch {// 再次触发flow执行flow.collect {Log.d("liduo", "test2: $it")}}
}

    在上面的代码中,通过调用flow方法,构建了一个名为flow对象,并对flow对象异步执行了两次。每次都会打印出1、2、3、4,然后结束执行。无论谁在前谁在后,无论执行多少次,得到的结果都是相同的,这就是异步冷数据流的一个特点。

二.异步热数据流

    既然有冷数据流,那就一定有热数据流。在协程中提供了MutableSharedFlow方法来创建异步热数据流。相比于异步冷数据流,异步热数据流一般在类似广播订阅的场景中使用。

1.异步热数据流的设计

    在异步热数据流中,核心接口的继承关系如下图所示:
在这里插入图片描述

1)SharedFlow接口

    SharedFlow接口继承自Flow接口,代码如下:

public interface SharedFlow<out T> : Flow<T> {// 用于保存最近的已经发送的数据public val replayCache: List<T>
}
  • replay缓存:每个SharedFlow类型的对象会将最新发射的数据保存到replayCache中,每一个新的订阅者会先从replayCache中获取数据,然后再获取最新发射的数据。
  • 订阅过程:在SharedFlow中,每个FlowCollecter类型的对象都被称为订阅者。调用SharedFlow类型对象的collect方法会触发订阅。正常情况下,订阅不会自动结束,但订阅者可以取消订阅,当订阅者所在的协程被取消时,订阅过程就会取消。
  • 操作符使用:对于大部分终端操作符,比如:toList方法,当对SharedFlow类型的对象使用这些操作符将永远不会结束或完成变换(toList用于将上游发射的所有数据保存到列表中,并返回列表)。对于部分用于截断流的操作符,比如:take方法,当对SharedFlow类型的对象使用这些操作符可以完成变换(take用于截取指定数量的上游流发射的数据)。当对SharedFlow类型的对象使用flowOn操作符、cancellable操作符,或使用指定参数为RENDEZVOUS的buffer操作符是无效的。
  • SharedFlow并发: SharedFlow中所有的方法都是线程安全的,并且可以在多协程并发的场景中使用且不必额外加锁。
  • 冷流转换热流:对于一个冷流,可以通过调用shareIn方法,转换为一个热流。
  • SharedFlow与BroadcastChannel的区别:从概念上讲,SharedFlow与BroadcastChannel很相似,但二者也有很大的差别,推荐使用SharedFlow,SharedFlow设计的目的就是要在未来替代BroadcastChannel:
    • SharedFlow更简单,不需要实现一堆与Channel相关的接口。
    • SharedFlow支持配置replay缓存与缓存溢出策略。
    • SharedFlow清楚地划分了只读的SharedFlow和可读可写的SharedFlow。
    • SharedFlow不能关闭,也不能表示失败,因此如果需要,所有的错误与完成信号都应该具体化。

2)MutableSharedFlow接口

    MutableSharedFlow接口继承自SharedFlow接口与FlowCollector接口,并在此基础上定义了两个方法与一个常量,代码如下:

public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {// 该方法用于尝试发射一个数据,// 当返回true时表示发射成功,返回false时,表示缓存空间不足,需要挂起。public fun tryEmit(value: T): Boolean// 该常量表示当前SharedFlow的订阅者的数量,// 该常量是一个状态流StateFlow,也是一个热流,当其中数值发生变化时会进行回调通知public val subscriptionCount: StateFlow<Int>// 用于清空replayCache// 在调用该方法之前老的订阅者,可以继续收到replaycache中的缓存数据,// 在调用该方法之后的新的订阅者,只能收到emit方法发射的新数据@ExperimentalCoroutinesApipublic fun resetReplayCache()
}

2.异步热数据流的使用

1)MutableSharedFlow方法

    在协程中,可以通过调用MutableSharedFlow方法创建一个MutableSharedFlow接口指向的对象,代码如下:

public fun <T> MutableSharedFlow(replay: Int = 0,extraBufferCapacity: Int = 0,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {...
}

    其中构造方法中三个参数的含义如下:

  • replay:表示新订阅的接收者可以收到的最近已经发射的数据的数量,默认为0。
  • extraBufferCapacity:表示除replay外,当发射速度大于接收速度时数据可缓存的数量,默认为0。
  • onBufferOverflow:表示当缓存已满,数据即将溢出时的数据的处理策略,默认为SUSPEND。

    当创建MutableSharedFlow类型的对象时,可以通过参数replay确定SharedFlow接口中定义的replayCache的最大容量,通过参数extraBufferCapacity设置一个不包括replay大小的缓存数量。replayCache本质上也是缓存的一部分,因此extraBufferCapacity与replay共同决定了缓存的大小。

    对于处理数据慢的订阅者,可以通过从缓存中获取数据,以此来避免发射者的挂起。缓存的数量大小决定了数据处理快的订阅者与数据处理慢的订阅者之间的延迟程度。

    当使用默认的构造方法创建MutableSharedFlow类型的对象时,它的缓存数量为0。当调用它的emit方法时会直接挂起,直到所有的订阅者都处理完当前emit方法发送的数据,才会恢复emit方法的挂起。如果MutableSharedFlow类型的对象没有订阅者,则调用emit方法会直接返回。

2)使用示例

    代码如下:

private suspend fun test() {// 创建一个热流val flow = MutableSharedFlow<Int>(2, 3, BufferOverflow.SUSPEND)// 启动一个协程,发射数据:1// 由于有缓存,因此会被添加到缓存中,不会挂起GlobalScope.launch {flow.emit(1)}// 将MutableSharedFlow对象转换为SharedFlow对象// SharedFlow对象不能调用emit方法,因此只能用于接收val onlyReadFlow = flow.asSharedFlow()// 接收者1// 启动一个新协程GlobalScope.launch {// 订阅监听,当collect方法触发订阅时,会首先会调onSubscription方法onlyReadFlow.onSubscription {Log.d("liduozuishuai", "test0: ")// 发射数据:3// 向下游发射数据:3,其他接收者收不到emit(3)}.onEach {// 处理接收的数据Log.d("liduozuishuai", "test1: $it")}.collect()}// 接收者2// 启动一个新的协程GlobalScope.launch {// 触发并处理接收的数据onlyReadFlow.collect {Log.d("liduozuishuai", "test2: $it")}}// 发送数据:2GlobalScope.launch {flow.emit(2)}
}

    对于上面的代码,接收者1会依次打印出:3、1、2,接收者2会依次打印出1、2。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_9925.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ESP8266升級SDK到V3.0版本編譯報錯

編譯報錯信息 bin/libmain2.a(app_main.o): In function `user_uart_wait_tx_fifo_empty: (.irom0.text+0x678): undefined reference to `user_pre_init bin/libmain2.a(app_main.o): In function `system_phy_freq_trace_enable: (.irom0.text+0x6e4): undefined reference t…

SpringCloud基础7——Redis分布式缓存

用于复习快速回顾。 目录 1.Redis持久化 1.1.数据备份文件RDB持久化方案 1.1.1.执行时机 1.1.2.RDB原理 1.1.3.小结&#xff0c;bgsave流程、执行时间、缺点 1.2.追加文件AOF持久化方案 1.2.1.AOF原理 1.2.2.AOF配置 1.2.3.AOF文件重写 1.3.RDB与AOF对比 2.Redis主…

getBean方法源码

一、 三个API // 根据name获取bean Override public Object getBean(String name) throws BeansException {return doGetBean(name, null, null, false); }// 根据name获取bean&#xff0c;如果获取到的bean和指定类型不匹配&#xff0c;则抛出异常 Override public <T>…

C++11 - 8 -智能指针

C11 - 智能指针前言&#xff1a;普通指针&#xff1a;安全隐患&#xff1a;其他函数异常&#xff1a;new函数异常&#xff1a;智能指针&#xff1a;RAII原理&#xff1a;smart_ptr&#xff1a;auto_ptr&#xff1a;管理权转移&#xff1a;优点&#xff1a;缺点&#xff1a;uniq…

报告分享|2022汽车生态营销白皮书

报告链接&#xff1a;http://tecdat.cn/?p28679 不确定性增加&#xff0c;消费意愿在压力下等待释放 今年3月以来&#xff0c;受国际冲突和国内环境双重影响&#xff0c;能源价格大幅上涨&#xff0c;导致全球供应不稳定趋势加深&#xff0c;宏观经济下行压力明显&#xff0…

程序设计竞赛-过了这个村没这个店

文章目录个人经验竞赛简介蓝桥杯天梯赛CCPCICPC其他个人经验 初闻不知曲中意&#xff0c;再闻已是曲中人。 标题无意夸张&#xff0c;但是竞赛生涯的时间真的不长&#xff0c;机会真的错过了就没有了。一般来说&#xff0c;大一打基础&#xff0c;学习编程语言、数据结构和算法…

Moment.js的常用函数、借助vue和Moment.js实现一个简单的时钟

前言 项目中关于时间的处理是挺常见的&#xff0c;虽然之前就知道有Moment.js这个库&#xff0c;但是一直没有接触过。只不过最近同事在项目中使用了&#xff0c;那也只能简单学习一下&#xff0c;不然遇到了完全看不懂。 本文只介绍一下常用的函数&#xff0c;其他内容可以在…

想换工作?那还不赶紧来看看这份面试题

引言 “寒冬”之下&#xff0c;诸如 “Android 凉了”之类的话我已经屡见不鲜了&#xff0c;现在互联网行业的热潮已经褪去&#xff0c;开始恢复冷静&#xff1b;这样一来&#xff0c;互联网公司就会面向大量的开发者们&#xff0c;因此对应的要求只会越来越高&#xff1b;据反…

用纯css实现一个图片拼接九宫格

<style> body{ margin: 0; padding: 0; // 设定居中 display: flex; justify-content: center; align-items: center; height: 100vh; } .container{ width: 300px; height: 300px; display: flex; // 子盒子布局&#xff0c;要让子盒子之间有间隙就把宽高设大一些。 jus…

报告分享|2022年中国机器人产业图谱及云上发展研究报告

报告链接&#xff1a;http://tecdat.cn/?p28681 报告在分析当前我国机器人市场现状与产业图谱的基础上&#xff0c;对人工智能、5G、云计算、边缘计算等新兴技术赋能机器人智能化、轻量化、柔性化发展进行了理性探讨&#xff0c;结合阿里云加速器企业案例探讨了机器人企业的上…

连接查询-mysql详解(五)

上篇文章说了&#xff0c;mysql5.6.6版本之前数据默认在系统表空间&#xff0c;之后默认在独立表空间&#xff0c;innodb因为索引和数据在一个b树&#xff0c;所以两个文件&#xff0c;一个文件结构&#xff0c;一个存数据&#xff0c;myISAM则是三个文件。一个聚簇索引有两个段…

小程序云开发学习笔记

小程序云开发学习笔记 初始化 在app.js里面 小程序一开始就初始化&#xff0c;多次调用只有第一次触发 onLaunch() { console.log("小程序打开"); wx.cloud.init({ env: ayang-8g50ew302a3a6c5a, //云开发id }) } 数据库操作 查询&#xff08;一定要配置数据权限&a…

高等工程数学 —— 第一章 (1)距离与范数

前言 研一生活开始了&#xff0c;看了大家对我之前博客的鼓励让我知道写博客是一件多么有意义的事情。写这些让我遇见许多陌生的有缘人&#xff0c;有老骥伏枥的大叔、也有可爱温暖的学妹…… 这里将高等工程数学的笔记留给不爱吃香菜的月亮&#xff0c;希望这些陪伴过我的微光…

ElasticSearch(四)【高级查询】

四、高级查询 说明 ES中提供了一种强大的检索数据方式&#xff0c;这种检索方式称之为Query DSL&#xff0c;Query DSL是利用Rest API传递JSON格式的请求体&#xff08;Request Body&#xff09;数据与ES进行交互&#xff0c;这种方式的丰富查询语法让ES检索变得更强大&#xf…

Grafana alert预警+钉钉通知

1 Grafana alert预警 如下图所示&#xff0c;主要是前3步&#xff0c;设置alert rules、contact points 、notification policies。alert rules主要设置触发警告的规则&#xff1b;contact points设置通过什么发送预警&#xff0c;如钉钉&#xff1b;notification policies 将…

哲学家干饭问题 C++

哲学家干饭问题 C 哲学家就餐问题可以这样表述&#xff0c;假设有五位哲学家围坐在一张圆形餐桌旁&#xff0c;做以下两件事情之一&#xff1a;吃饭&#xff0c;或者思考。吃东西的时候&#xff0c;他们就停止思考&#xff0c;思考的时候也停止吃东西。餐桌上有五碗意大利面&am…

Vue2.0到3.0的过渡,setup,ref函数,reactive函数,计算属性computed

setup 1、Vue3.0的组件中所有用到的:数据、方法等等&#xff0c;均要配置在setup中&#xff0c;若要使用里面的数据&#xff0c;可以用return将其返回出来 2、若在setup中返回的是一个对象&#xff0c;则对象中的数据、方法、在模板中均可直接使用 例如 <template><di…

[Git] 系列三随意修改提交记录以及一些技巧

[Git] 系列三随意修改提交记录以及一些技巧 Author: Xin Pan Date: 2022.09.17 文章目录[Git] 系列三随意修改提交记录以及一些技巧整理提交记录未知提交号哈希值时怎么办&#xff1f;一些技巧本地栈式提交方法一方法二TagDescribe高级命令总结好了&#xff0c;大概总结好了。…

搭建游戏要选什么样的服务器?

服务器是游戏平台数据传输的重要载体&#xff0c;事关我们游戏创业发展的稳定性、安全性。那么&#xff0c;游戏平台搭建要选什么服务器&#xff1f;有什么参考指标&#xff1f;本文将带领大家一探究竟&#xff01; 首先是“游戏平台搭建要选择什么服务器”&#xff0c;我们可…

论文阅读_对比学习_SimCSE

英文题目&#xff1a;SimCSE: Simple Contrastive Learning of Sentence Embeddings 中文题目&#xff1a;SimSCE&#xff1a;用简单的对比学习提升句嵌入的质量 论文地址&#xff1a;https://export.arxiv.org/pdf/2104.08821.pdf 领域&#xff1a;自然语言处理&#xff0c;对…