Flink State 状态管理

news/2024/2/25 20:07:47/文章来源:https://blog.csdn.net/gwc791224/article/details/135574235

文章目录

  • 前言
  • 一、状态分类
  • 二、keyed代码示例
    • ListState
    • MapState
  • 总结


前言

状态在Flink中叫做State,用来保存中间计算结果或者缓存数据。要做到比较好的状态管理,需要考虑以下几点内容:

  • 状态数据的存储和访问
    在Task内部,如何高效地保存状态数据和使用状态数据。
  • 状态数据的备份和恢复
    作业失败是无法避免的,那么就要考虑如何高效地将状态数据保存下来,避免状态备份降低集群的吞吐量,并且在Failover时恢复作业到失败前的状态。
  • 状态数据的划分和动态扩容
    作业在集群内并行执行那么就要思考对于作业的Task而言如何使用统一的方式对状态数据进行切分,在作业修改并行度导致Task数据改变的时候,如何确保正确地恢复。

一、状态分类

State按照是否有Key划分KeyedState和OperatorState两种。按照数据结构不同,flink定义了多种state,分别应用于不同的场景,具体实现如下:ValueState、ListState、MapState、ReducingState、AggregatingState。

  • ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。

  • ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。

  • ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。

  • AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

  • MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。

二、keyed代码示例

更多代码示例请下载Flink State体系剖析以及案例实践

ListState

代码如下:


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** 需求:当接收到的相同 key 的元素个数等于 3个,就计算这些元素的 value 的平均值。* 计算keyed stream中每3个元素的 value 的平均值*/
public class TestKeyedStateMain {public static void main(String[] args) throws  Exception{//获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度env.setParallelism(12);//获取数据源DataStreamSource<Tuple2<Long, Long>> dataStreamSource =env.fromElements(Tuple2.of(1L, 3L),Tuple2.of(1L, 7L),Tuple2.of(2L, 4L),Tuple2.of(1L, 5L),Tuple2.of(2L, 2L),Tuple2.of(2L, 6L));/*** 1L, 3L* 1L, 7L* 1L, 5L** 1L,5.0 double** 2L, 4L* 2L, 2L* 2L, 6L** 2L,4.0 double***/// 输出://(1,5.0)//(2,4.0)dataStreamSource.keyBy(tuple -> tuple.f0) //分组.flatMap(new CountAverageWithListState()).print();env.execute("TestStatefulApi");}
}import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;import java.util.Collections;
import java.util.List;/***  ListState<T> :这个状态为每一个 key 保存集合的值*      get() 获取状态值*      add() / addAll() 更新状态值,将数据放到状态中*      clear() 清除状态*/
public class CountAverageWithListStateextends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {// managed keyed state/*** ValueState : 里面只能存一条元素* ListState : 里面可以存很多数据*/private ListState<Tuple2<Long, Long>> elementsByKey;@Overridepublic void open(Configuration parameters) throws Exception {// 注册状态ListStateDescriptor<Tuple2<Long, Long>> descriptor =new ListStateDescriptor<Tuple2<Long, Long>>("average",  // 状态的名字Types.TUPLE(Types.LONG, Types.LONG)); // 状态存储的数据类型elementsByKey = getRuntimeContext().getListState(descriptor);}@Overridepublic void flatMap(Tuple2<Long, Long> element,Collector<Tuple2<Long, Double>> out) throws Exception {// 拿到当前的 key 的状态值Iterable<Tuple2<Long, Long>> currentState = elementsByKey.get();// 如果状态值还没有初始化,则初始化if (currentState == null) {elementsByKey.addAll(Collections.emptyList());}// 更新状态elementsByKey.add(element);// 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出List<Tuple2<Long, Long>> allElements = Lists.newArrayList(elementsByKey.get());if (allElements.size() == 3) {long count = 0;long sum = 0;for (Tuple2<Long, Long> ele : allElements) {count++;sum += ele.f1;}double avg = (double) sum / count;out.collect(Tuple2.of(element.f0, avg));// 清除状态elementsByKey.clear();}}
}

MapState

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;import java.util.List;
import java.util.UUID;/***  MapState<K, V> :这个状态为每一个 key 保存一个 Map 集合*      put() 将对应的 key 的键值对放到状态中*      values() 拿到 MapState 中所有的 value*      clear() 清除状态*/
public class CountAverageWithMapStateextends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {// managed keyed state//1. MapState :key 是一个唯一的值,value 是接收到的相同的 key 对应的 value 的值/*** MapState:*      Map集合的特点,相同key,会覆盖数据。*/private MapState<String, Long> mapState;@Overridepublic void open(Configuration parameters) throws Exception {// 注册状态MapStateDescriptor<String, Long> descriptor =new MapStateDescriptor<String, Long>("average",  // 状态的名字String.class, Long.class); // 状态存储的数据类型mapState = getRuntimeContext().getMapState(descriptor);}/**** @param element* @param out* @throws Exception*/@Overridepublic void flatMap(Tuple2<Long, Long> element,Collector<Tuple2<Long, Double>> out) throws Exception {mapState.put(UUID.randomUUID().toString(), element.f1); //list// 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出List<Long> allElements = Lists.newArrayList(mapState.values());if (allElements.size() == 3) {long count = 0;long sum = 0;for (Long ele : allElements) {count++;sum += ele;}double avg = (double) sum / count;//out.collect(Tuple2.of(element.f0, avg));// 清除状态mapState.clear();}}
}

总结

  1. 是否存在当前处理的 key(current key):operator state 是没有当前 key 的概念,而 keyed
    state 的数值总是与一个 current key 对应。
  2. 存储对象是否 on heap: 目前 operator state backend 仅有一种 on-heap 的实现;而 keyed state
    backend 有 on-heap 和 off-heap(RocksDB)的多种实现。
  3. 是否需要手动声明快照(snapshot)和恢复 (restore) 方法:operator state 需要手动实现
    snapshot 和 restore 方法;而 keyed state 则由 backend 自行实现,对用户透明。
  4. 数据大小:一般而言,我们认为 operator state 的数据规模是比较小的;认为 keyed state 规模是
    相对比较大的。需要注意的是,这是一个经验判断,不是一个绝对的判断区分标准。
    更多内容和代码示例请下载Flink State体系剖析以及案例实践

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_925287.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

电脑/设备网络共享给其他设备上网

文章目录 一、概述二、设置网络共享2.1 电脑可以上网&#xff0c;通过网络共享让其他设备也可以上网2.2 手机如何使用USB数据线共享网络给电脑 一、概述 现在有如下几种情况&#xff1a; 设备本身不能上网&#xff0c;需要通过电脑上网 笔记本WIFI连热点上网&#xff0c;然后…

【贪心】重构字符串

/*** 思路&#xff1a;如果s长度小于2&#xff0c;直接返回s&#xff0c;假设字符串s的长度为n。* n为偶数&#xff0c;如果字符串中的某个字符数量超过 n/2 则肯定会存在相邻的字符。* n为奇数&#xff0c;如果字符串中的某个字符的数量超过 &#xff08;n1&am…

响应式Web开发项目教程(HTML5+CSS3+Bootstrap)第2版 第1章 HTML5+CSS3初体验 项目1-1 三栏布局页面

项目展示 三栏布局是一种常用的网页布局结构。 除了头部区域、底部区域外&#xff0c;中间的区域&#xff08;主体区域&#xff09;划分成了三个栏目&#xff0c;分别是左侧边栏、内容区域和右侧边栏&#xff0c;这三个栏目就构成了三栏布局。当浏览器的宽度发声变化时&#x…

十四.变量、异常处理

变量、异常处理 1.变量1.1系统变量1.1.1系统变量分类1.1.2查看系统变量 1.2用户变量1.2.1用户变量分类1.2.2会话用户变量1.2.3局部变量1.2.4对比会话用户变量与局部变量 补充:MySQL 8.0的新特性—全局变量的持久化 2.定义条件与处理程序2.1案例分析2.2定义条件2.3定义处理程序2…

设计模式-- 3.适配器模式

适配器模式 将一个类的接口转换成客户希望的另外一个接口。使得原本由于接口不兼容而不能一起工作的那些类可以一起工作。 角色和职责 请求者&#xff08;client&#xff09;&#xff1a;客户端角色,需要使用适配器的对象&#xff0c;不需要关心适配器内部的实现&#xff0c;…

坑记(HttpInputMessage)

一、背景知识 public interface HttpInputMessage extends HttpMessage Represents an HTTP input message, consisting of headers and a readable body.Typically implemented by an HTTP request on the server-side, or a response on the client-side.Since: 3.0 Author:…

【JaveWeb教程】(26) Mybatis基础操作(新增、修改、查询、删除) 详细代码示例讲解(最全面)

目录 1. Mybatis基础操作1.1 需求1.2 准备1.3 删除1.3.1 功能实现1.3.2 日志输入1.3.3 预编译SQL1.3.3.1 介绍1.3.3.2 SQL注入1.3.3.3 参数占位符 1.4 新增1.4.1 基本新增1.4.2 主键返回 1.5 更新1.6 查询1.6.1 根据ID查询1.6.2 数据封装1.6.3 条件查询1.6.4 参数名说明 1. Myb…

Redis集群Cluster和分片

1.Cluster集群介绍 背景 Sentinel解决了主从架构故障自动迁移的问题但是master主节点的写能力和存储能力依旧受限使用Redis的集群Cluster就是为了解决单机Redis容量有限的问题&#xff0c;将数据按一定的规则分配到多台机器 什么是集群Cluster 是一组相互独立的、通过告诉网络…

ssm基于Java的药店药品信息管理系统的设计与实现论文

摘 要 传统信息的管理大部分依赖于管理人员的手工登记与管理&#xff0c;然而&#xff0c;随着近些年信息技术的迅猛发展&#xff0c;让许多比较老套的信息管理模式进行了更新迭代&#xff0c;药品信息因为其管理内容繁杂&#xff0c;管理数量繁多导致手工进行处理不能满足广大…

布隆过滤器四种实现(Java,Guava,hutool,Redisson)

1.背景 为预防大量黑客故意发起非法的时间查询请求&#xff0c;造成缓存击穿&#xff0c;建议采用布隆过滤器的方法解决。布隆过滤器通过一个很长的二进制向量和一系列随机映射函数&#xff08;哈希函数&#xff09;来记录与识别某个数据是否在一个集合中。如果数据不在集合中…

Alibaba-> EasyExcel 整理3

1 导入依赖 <!-- easyExcel --><dependency><groupId>com.alibaba</groupId><artifactId>easyexcel</artifactId><version >3.2.1</version><exclusions><exclusion><artifactId>poi-ooxml-schemas</art…

SQL-分页查询and语句执行顺序

&#x1f389;欢迎您来到我的MySQL基础复习专栏 ☆* o(≧▽≦)o *☆哈喽~我是小小恶斯法克&#x1f379; ✨博客主页&#xff1a;小小恶斯法克的博客 &#x1f388;该系列文章专栏&#xff1a;重拾MySQL &#x1f379;文章作者技术和水平很有限&#xff0c;如果文中出现错误&am…

LLVM系列(1): 在微软Visual Studio下编译LLVM

参考链接&#xff1a; Getting Started with the LLVM System using Microsoft Visual Studio — LLVM 18.0.0git documentation 1.安装visualstudio&#xff0c;版本需要大于vs2019 本机环境已安装visual studio2022&#xff0c;省略 2安装Makefile&#xff0c;版本需要大…

【K8s学习】

k8s的简单执行流程&#xff1a; Kubernetes Master&#xff08;API Server、Scheduler等组件&#xff09;负责调度Pod到合适的Node上。 当Pod被调度到某个Node时&#xff0c;该Node上的kubelet代理会收到指令并开始执行Pod的生命周期管理任务&#xff0c;包括创建、监控和终止P…

【Python数据可视化】matplotlib之绘制常用图形:折线图、柱状图(条形图)、饼图和直方图

文章传送门 Python 数据可视化matplotlib之绘制常用图形&#xff1a;折线图、柱状图&#xff08;条形图&#xff09;、饼图和直方图matplotlib之设置坐标&#xff1a;添加坐标轴名字、设置坐标范围、设置主次刻度、坐标轴文字旋转并标出坐标值matplotlib之增加图形内容&#x…

阿里 P7 三面凉凉,kafka Borker 日志持久化没答上来

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱敲代码的小黄&#xff0c;阿里巴巴淘天Java开发工程师&#xff0c;CSDN博客专家&#x1f4d5;系列专栏&#xff1a;Spring源码、Netty源码、Kafka源码、JUC源码、dubbo源码系列&#x1f525;如果感觉博主的文章还不错…

JVM基础(7)——ParNew垃圾回收器

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 学习必须往深处挖&…

市场复盘总结 20240116

仅用于记录当天的市场情况&#xff0c;用于统计交易策略的适用情况&#xff0c;以便程序回测 短线核心&#xff1a;不参与任何级别的调整&#xff0c;采用龙空龙模式 昨日主题投资 连板进级率 18% 二进三&#xff1a; 进级率低 60% 最常用的二种方法&#xff1a; 方法一&am…

【iOS】数据持久化(四)之FMDB基本使用

正如我们前面所看到的&#xff0c;原生SQLite API在使用时还是比较麻烦的&#xff0c;于是&#xff0c;开源社区就出现了一系列将SQLite API进行封装的库&#xff0c;其中FMDB的被大多数人所使用 FMDB和SQLite相比较&#xff0c;SQLite比较原始&#xff0c;操作比较复杂&#…

GPT/GPT4在人工智能,深度学习,编程等领域应用

详情点击链接&#xff1a;GPT/GPT4在人工智能&#xff0c;深度学习&#xff0c;编程等领域应用 一OpenAI 1.最新大模型GPT-4 Turbo 2.最新发布的高级数据分析&#xff0c;AI画图&#xff0c;图像识别&#xff0c;文档API 3.GPT Store 4.从0到1创建自己的GPT应用 5. 模型Ge…