Flink主要有两种基础类型的状态:operator state。

news/2024/4/27 3:55:31/文章来源:https://blog.csdn.net/qq_42496461/article/details/130308784

Flink主要有两种基础类型的状态:keyed state 和operator state。
Operator State
对于Operator State(或者non-keyed state),每个operator state绑定到一个并行operator实例上。在Flink中,Kafka Connector是一个使用Operator State的很好的例子。每个并行Kafka消费者实例维护一个主题分区和偏移的map作为它的Operator State。
当并行度被修改时,Operator State接口支持在并行operator实例上重新分配状态。进行这种重新分配可以有不同的方案。
Raw and Managed State
Keyed State 和 Operator State 有两种形式: managed和raw。
Managed State表示数据结构由Flink runtime控制,例如内部哈希表或者RocksDB。例如,“ValueState”,“ListState”等等。Flink的runtime层会编码State并将其写入checkpoint中。
Raw State是操作算子保存在它的数据结构中的state。当进行checkpoint时,它只写入字节序列到checkpoint中。Flink并不知道状态的数据结构,并且只能看到raw字节。
所有的数据流函数都可以使用managed state,但是raw state接口只可以在操作算子的实现类中使用。推荐使用managed state(而不是raw state),因为使用managed state,当并行度变化时,Flink可以自动的重新分布状态,也可以做更好的内存管理。
注意 如果你的managed state需要自定义序列化逻辑,请参见managed state的自定义序列化以确保未来的兼容性。Flink默认的序列化不需要特殊处理。

managed non-keyed state
可以通过实现CheckpointedFunction或者ListCheckpointed接口,来使用managed non-keyed状态。

1.CheckpointedFunction
CheckpointedFunction接口通过不同的重新分配方案提供对non-keyed状态的访问。它需要实现两种方法:
void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;
每当必须执行checkpoint时,都会调用snapshotState()。对应的initializeState()在每次初始化用户定义的函数时调用,可以是在函数第一次初始化时调用,也可以是在函数实际从较早的checkpoint恢复时调用。因此,initializeState()不仅是初始化不同类型状态的地方,也是状态恢复逻辑实现地方。

目前,支持List样式的管理操作状态。状态是一个可序列化对象的列表,彼此独立,因此在重新扫描时能够进行重新分区。换句话说,这些对象是可以重新分区no-keyed状态的最佳粒度。根据状态访问方法的不同,定义了以下重分区方案:

Even-split redistribution:每个操作算子返回一个状态元素列表。逻辑上串联起所有的列表就是状态元素完整列表。在恢复/重新分区时,该列表会均分成算子实例个数个子列表。每个操作算子实例获取一个子列表,该子列表可以是空的,也可以包含一个或多个元素。例如,如果并行度为1,则操作算子的检查点状态包含元素element1和element2。当并行度增加到2时,element1可能会出现在算子实例0中,而element2会出现在算子实例1中。
Union redistribution: 每个操作算子返回一个状态元素列表。整个状态在逻辑上是串联起所有列表。在恢复/重新分发时,每个操作算子都获得状态元素的完整列表。
下面是一个有状态的SinkFunction,在讲数据元素写入外部存储之前使用CheckpointedFunction来缓存元素。主要是用来验证event-split充分布list状态。

下面的例子是一个有状态的SinkFunction,该sink会在数据发送到外部存储之前缓存数据元素。该例子是机遇均分重分布来实现的:
public class BufferingSink
implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction {

private final int threshold;private transient ListState<Tuple2<String, Integer>> checkpointedState;private List<Tuple2<String, Integer>> bufferedElements;public BufferingSink(int threshold) {this.threshold = threshold;this.bufferedElements = new ArrayList<>();
}@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {bufferedElements.add(value);if (bufferedElements.size() == threshold) {for (Tuple2<String, Integer> element: bufferedElements) {// send it to the sink}bufferedElements.clear();}
}@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.clear();for (Tuple2<String, Integer> element : bufferedElements) {checkpointedState.add(element);}
}@Override
public void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor<Tuple2<String, Integer>> descriptor =new ListStateDescriptor<>("buffered-elements",TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));checkpointedState = context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {for (Tuple2<String, Integer> element : checkpointedState.get()) {bufferedElements.add(element);}}
}

}
initializeState方法以FunctionInitializationContext作为参数。用于初始化non-keyed状态“containers”。这是ListState类型的容器,其中non-keyed状态对象将在checkpoint上存储。
留意状态是如何初始化的,类似于keyed状态,使用一个StateDescriptor,其中包含状态名和关于状态持有的值的类型的信息:

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
“buffered-elements”,
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

状态访问方法的命名约定包含其重分区模式及其状态结构。例如,要在还原时使用具有union重分区方案的list state,使用getUnionListState(descriptor)访问状态。如果方法名不包含重分区模式,例如getListState(descriptor),它仅仅意味着将使用均分重分区模式(Even-split redistribution)。
在初始化container之后,我们使用上下文的isrestore()方法检查失败后是否正在恢复。如果是true,即正在恢复,则执行恢复逻辑。
如修改后的BufferingSink代码所示,状态初始化期间恢复的数据保存在一个ListState变量中,以备将来在snapshotState()中使用。在那里,ListState将清除前一个检查点包含的所有对象,然后被我们想要检查的新选项填满。
另外,keyed状态也可以在initializeState()方法中初始化。可以使用FunctionInitializationContext来完成。
2.ListCheckpointed
ListCheckpointed接口是CheckpointedFunction的一个有限制的变体,它只支持列表样式的状态,在恢复时使用均分重分区方案。它还需要实现两种方法:
List snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List state) throws Exception;
在snapshotState()上,操作应该向检查点返回一个对象列表,而restoreState()必须在恢复时处理这个列表。如果状态不可重分区,则始终可以在snapshotState()中返回Collections.singletonList(MY_STATE)。

有状态的源函数(Stateful Source Functions)
与其他操作符相比,有状态源需要更多的关注。为了更新状态和输出集合的原子性(用于故障/恢复上的精确一次语义),用户需要从源上下文获取一个锁。

public static class CounterSource
extends RichParallelSourceFunction
implements ListCheckpointed {

/**  current offset for exactly once semantics */
private Long offset;/** flag for job cancellation */
private volatile boolean isRunning = true;@Override
public void run(SourceContext<Long> ctx) {final Object lock = ctx.getCheckpointLock();while (isRunning) {// output and state update are atomicsynchronized (lock) {ctx.collect(offset);offset += 1;}}
}@Override
public void cancel() {isRunning = false;
}@Override
public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {return Collections.singletonList(offset);
}@Override
public void restoreState(List<Long> state) {for (Long s : state)offset = s;
}

}
当Flink完全确认检查点时,一些操作可能需要这些信息来与外部世界进行通信。在本例中,请参见org.apache.flink.runtime.state.CheckpointListener接口。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_103245.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java每日一练(20230425)

目录 1. 乘积最大子数组 &#x1f31f;&#x1f31f; 2. 插入区间 &#x1f31f;&#x1f31f; 3. 删除有序数组中的重复项 II &#x1f31f;&#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Golang每日一练 专栏 Python每日一练 专栏 C/C每日一练 专栏…

CSGO搬砖,每天1-2小时,23年最强副业非它莫属(内附操作流程)

自从我学会了CSGO搬运&#xff0c;我发现生活也有了不小的改变&#xff0c;多了一份收入&#xff0c;生活质量也就提高了一份。 其实刚接触CSGO&#xff0c;我压根就不相信这么能挣钱&#xff0c;因为在印象中&#xff0c;游戏供玩家娱乐竞技的&#xff0c;作为我这种技术渣渣…

直播系统开发中如何优化API接口的并发

概述 在直播系统中&#xff0c;API接口并发的优化是非常重要的&#xff0c;因为它可以提高系统的稳定性和性能。本文将介绍一些优化API接口并发的方法。 理解API接口并发 在直播系统中&#xff0c;API接口是用于处理客户端请求的关键组件。由于许多客户端同时连接到系统&…

HTTP1.1(十二)Cookie的格式与约束

一 Cookie的格式与约束 ① Cookies是什么 1) cookie是我们在前端编程中经常使用的概念2) 使用cookie利用浏览器帮助我们保存客户的相关状态信息,保存用户已经做了什么事情3) 重点和难点[1]、cookie的工作原理[2]、cookie的限制是什么[3]、session又是怎样与cookie关联起来 …

90年三本程序员,8年5跳,年薪4万变92万……

很多时候&#xff0c;虽然跳槽可能带来降薪的结果&#xff0c;但依然有很多人认为跳槽可以涨薪。近日&#xff0c;看到一则帖子。 发帖的楼主表示&#xff0c;自己8年5跳&#xff0c;年薪4万到92万&#xff0c;现在环沪上海各一套房&#xff0c;再干5年码农&#xff0c;就可以…

【Vue】学习笔记-初始化脚手架

初始化脚手架 初始化脚手架说明具体步骤脚手架文件结构 初始化脚手架 说明 Vue脚手架是vue官方提供的标准化开发工具&#xff08;开发平台&#xff09;最新版本是4.x文档Vue CLI 具体步骤 如果下载缓慢请配置npm淘宝镜像 npm config set registry http://registry.npm.taoba…

【移动端网页布局】流式布局案例 ② ( 实现顶部固定定位提示栏 | 布局元素百分比设置 | 列表样式设置 | 默认样式设置 )

文章目录 一、样式测量及核心要点1、样式测量2、高度设定3、列表项设置4、设置每个元素的百分比宽度5、设置图像宽度 二、核心代码编写1、HTML 标签结构2、CSS 样式3、展示效果 三、完整代码示例1、HTML 标签结构2、CSS 样式3、展示效果 一、样式测量及核心要点 1、样式测量 京…

【ChatGPT】如何让 ChatGPT 不再频繁报错,获取更加稳定的体验?

文章目录 一、问题描述二、方案1&#xff1a;使用 OpenAI API Key 来访问 ChatGPT三、方案2&#xff1a;安装 Chrome 插件3.1 介绍3.2 安装步骤3.2.1 插件 & 脚本安装3.2.2 解读功能 一、问题描述 最近一段时间&#xff0c;相信大家都发现了 ChatGPT 一个问题&#xff0c;…

Unity音量滑块沿弧形移动

一、音量滑块的移动 1、滑块在滑动的时候&#xff0c;其运动轨迹沿着大圆的弧边展开 2、滑块不能无限滑动&#xff0c;而是两端各有一个挡块&#xff0c;移动到挡块位置&#xff0c;则不能往下移动&#xff0c;但可以折回 3、鼠标悬停滑块时&#xff0c;给出音量值和操作提示 …

前端 百度地图绘制路线加上图片

使用百度官方示例的方法根据起终点经纬度查询驾车路线但是只是一个线路 <template><div class"transportInfo"><div id"mapcontainer" class"map">11</div><div class"collapse"><el-collapse v-mo…

克隆Linux系统(centos)

克隆前得保证你有一台Linux系统的虚拟机了。 如果没有&#xff0c;可以参考这篇文章&#xff1a; 安装VMware虚拟机、Linux系统&#xff08;CentOS7&#xff09;_何苏三月的博客-CSDN博客 按照示意图一步一步执行即可。 克隆前先关闭运行的虚拟机系统。 然后右键已安装的虚拟…

【图像抠图】【深度学习】Ubuntu18.04下GFM官方代码Pytorch实现

【图像抠图】【深度学习】Ubuntu18.04下GFM官方代码Pytorch实现 提示:最近开始在【图像抠图】方面进行研究,记录相关知识点,分享学习中遇到的问题已经解决的方法。 文章目录 【图像抠图】【深度学习】Ubuntu18.04下GFM官方代码Pytorch实现前言数据集说明1.AM-2k【自然动物】2.B…

Ubuntu更新软件下载更新与移除

目录 一、更新软件源 二、下载与安装软件 三、如何移除软件 四、Ubuntu商店下载软件 一、更新软件源 更新Ubuntu软件源的操作步骤&#xff0c;更新软件源的目的就是&#xff0c;将在Ubuntu官网的软件源更改到本地&#xff0c;也就是国内的软件源&#xff0c;这样的话下载安…

HTML+CSS+JS 学习笔记(三)———Javascript(下)

&#x1f331;博客主页&#xff1a;大寄一场. &#x1f331;系列专栏&#xff1a;前端 &#x1f331;往期回顾&#xff1a;HTMLCSSJS 学习笔记&#xff08;三&#xff09;———Javascript(上) &#x1f618;博客制作不易欢迎各位&#x1f44d;点赞⭐收藏➕关注 目录 JavaScrip…

ES6 新特性的let--const 解构赋值--模板字符串--对象相关新特性--箭头函数--综合代码示例

目录 ES6 新特性 ES6 基本介绍 ES6 是什么? let 声明变量 演示 let 的基本使用 注意事项和使用细节 代码演示 : const 声明常量/只读变量 应用实例 注意事项和使用细节 解构赋值 基本介绍 应用实例-数组解构 应用实例-对象解构 模板字符串 基本介绍 应用实例…

一文带你学会如何写一份糟糕透顶的简历

我们每个人几乎都会面对找工作这件事&#xff0c;而找工作或者说求职首先就是要写一份简历。今天狗哥将以一个不同的视角带你写一份无与伦比&#xff0c;糟糕透顶的求职简历&#xff0c;说实话&#xff0c;其实几年前&#xff0c;我就是这么写的。 目录 1. 文件名 2. 基本信…

OpenAI ChatGPT 能取代多少程序员的工作?导致失业吗?

阅读原文&#xff1a;https://bysocket.com/openai-chatgpt-vs-developer/ ChatGPT 能取代多少程序员的工作&#xff1f;导致我们程序员失业吗&#xff1f;这是一个很好的话题&#xff0c;我这里分享下&#xff1a; 一、ChatGPT 是什么&#xff1f;有什么作用 ChatGPT是一种…

关于 OpenShift(OKD) 网络 Service、Routes的一些笔记

写在前面 参加考试&#xff0c;分享一些学习 OpenShift 的笔记博文内容为 OpenShift 网络相关组件 Service、Routes 很浅的一些认识学习环境为 openshift v3 的版本&#xff0c;有些旧这里如果专门学习 openshift &#xff0c;建议学习 v4 版本理解不足小伙伴帮忙指正 傍晚时分…

开源 AI 辅助编程工具 AutoDev 现已上架 Jetbrains 插件市场

我们非常高兴地宣布 AutoDev v0.2.0 的发布&#xff01;AutoDev 是一款强大的 AI 辅助编程工具&#xff0c;可以与 Jetbrains 系列 IDE 无缝集成&#xff08;VS Code 支持正在开发中&#xff09;。通过与需求管理系统&#xff08;如 Github Issue 等&#xff09;直接对接&#…

Vue收集表单数据学习笔记

收集表单数据 v-model双向数据绑定&#xff0c;收集的是input框的value&#xff0c;单选按钮不存在value&#xff0c;就像代码中的男女选项&#xff0c;即使绑定性别v-model“sex”&#xff0c;控制台依然不能接收性别的值&#xff0c;因为没有value值&#xff0c;&#xff0c…