Flink 的算子介绍(下)

news/2024/4/20 6:03:27/文章来源:https://blog.csdn.net/bluedraam_pp/article/details/129199381

上篇博客中,说了一下转化、分组、聚合,此博客接着连接。连接分为下面:

  • union : 将数据类型相同的流合并成一个流。
  • connect: 将数据类型不同的流合并一个流
  • cogroup: 将数据类型不同的流合并成一个流并写到缓存到窗口中。
  • join: 将数据类型不同的流合并成一个流并写到缓存到窗口中,当窗口被触发之后,两边的数据进行笛卡尔积式的计算。
  • interval join : 处理数据的逻辑基本和 join 差不多,多了一点式可以扩大两个流之间的匹配范围,比如,A 是 stream1 的数据,B 是 stream2 的数据,A.timestamp - interval time <= B.timestamp <= A.timestamp + interval time 的数据。
  • broadcast , 广播流,它会将广播流中的所有数据发送到另外一个流中的所有分区中,然后实现计算逻辑,这一特性可以让我们实现关联维表的功能。处理维表关联的其他方案还有异步I/O。这个会单独写一篇博客来讲解。

下面来展示一下所有 join 类型算子的功能。

union 的用法:

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> src1 = env.socketTextStream("127.0.0.1", 6666);DataStreamSource<String> src2 = env.socketTextStream("127.0.0.1", 8888);DataStream<String> union = src1.union(src2);union.print("------");env.execute("test-union");

两个 source 从 socket 中读书数据,数据类型是 String 类型的,然后将两个流 union 起来,连接起来的数据都是一样的。

connect 的用法。当遇到得到两个 topic 中的数据之后,才能计算的情况下,需要使用 connect 将两个 topic 中的数据取出。下面的例子中,模拟了 inner join on 的效果,也就是取交集的效果,使用了 map state 来存储已经到来的数据,当另外一个流中的相关数据到来时,往下发送。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> src1 = env.socketTextStream("127.0.0.1", 6666);DataStreamSource<String> src2 = env.socketTextStream("127.0.0.1", 8888);KeyedStream<Integer, String> intSrc = src1.map(new RichMapFunction<String, Integer>() {@Overridepublic Integer map(String record) throws Exception {return Integer.parseInt(record);}}).keyBy(new KeySelector<Integer, String>() {@Overridepublic String getKey(Integer integer) throws Exception {return integer.toString();}});KeyedStream<String, String> keyedSrc2 = src2.keyBy(x -> x);/*** 模拟 inner join 的逻辑,取交集* */intSrc.connect(keyedSrc2).process(new CoProcessFunction<Integer, String, Tuple2<String,Integer>>() {private ValueState<List<String>> stream1Buffer = null ;private ValueState<List<String>> stream2Buffer = null ;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);ValueStateDescriptor<List<String>> stream1BufferDesc = new ValueStateDescriptor<List<String>>("Stream1Buffer", TypeInformation.of(new TypeHint<List<String>>() {}));ValueStateDescriptor<List<String>> stream2BufferDesc = new ValueStateDescriptor<List<String>>("Stream2Buffer", TypeInformation.of(new TypeHint<List<String>>() {}));stream1Buffer = getRuntimeContext().getState(stream1BufferDesc);stream2Buffer = getRuntimeContext().getState(stream2BufferDesc);}@Overridepublic void processElement1(Integer record, CoProcessFunction<Integer, String, Tuple2<String, Integer>>.Context context, Collector<Tuple2<String, Integer>> collector) throws Exception {join(record.toString() , collector , stream2Buffer , stream1Buffer);}@Overridepublic void processElement2(String record, CoProcessFunction<Integer, String, Tuple2<String, Integer>>.Context context, Collector<Tuple2<String, Integer>> collector) throws Exception {join(record , collector , stream1Buffer, stream2Buffer);}private void join(String record , Collector<Tuple2<String, Integer>> collector , ValueState<List<String>> streamBuffered , ValueState<List<String>> streamOwer) throws IOException {List<String> buffered = streamBuffered.value();if(Objects.isNull(buffered)){buffered = new CopyOnWriteArrayList<>();}int idx = Collections.<String>binarySearch(buffered, record);if(idx>=0){String s = buffered.get(idx);buffered.remove(idx);collector.collect(new Tuple2<String,Integer>(record+" join " + s , 1));}else{buffered.add(record);streamOwer.update(buffered);}}}).print("------");env.execute();

join 的用法,JoinFunction 接口中,一次处理两个流中个一条数据,而且是笛卡尔积的方式发送给此接口计算。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);WatermarkStrategy<String> ws = WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner((String data , long ts )->{return Long.parseLong(data.split(",")[2]);});KeySelector<String, String> keySelector = new KeySelector<String, String>() {@Overridepublic String getKey(String s) throws Exception {return s.split(",")[0];}};SingleOutputStreamOperator<String> src1 = env.socketTextStream("127.0.0.1", 6666).assignTimestampsAndWatermarks(ws);SingleOutputStreamOperator<String> src2 = env.socketTextStream("127.0.0.1", 8888).assignTimestampsAndWatermarks(ws);src1.join(src2).where(keySelector).equalTo(keySelector).window(TumblingEventTimeWindows.of(Time.seconds(2))).apply(new JoinFunction<String, String, String>() {@Overridepublic String join(String s, String s2) throws Exception {return s.concat(":").concat(s2);}}).print("----");env.execute();

interval join 的用法,当发送测试数据的时候,会比上面的 join 早触发 1 秒,因我我设置了 interval 是 1 秒

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);WatermarkStrategy<String> ws = WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner((String data ,long ts )->{return Long.parseLong(data.split(",")[2]);});KeyedStream<String, String> src1 = env.socketTextStream("127.0.0.1", 6666).assignTimestampsAndWatermarks(ws).keyBy(new KeySelector<String, String>() {@Overridepublic String getKey(String value) throws Exception {return value.split(",")[0];}});KeyedStream<String, String> src2 = env.socketTextStream("127.0.0.1", 8888).assignTimestampsAndWatermarks(ws).keyBy(new KeySelector<String, String>() {@Overridepublic String getKey(String value) throws Exception {return value.split(",")[0];}});src1.intervalJoin(src2).between(Time.seconds(-1) , Time.seconds(1)).upperBoundExclusive().lowerBoundExclusive().process(new ProcessJoinFunction<String, String, String>() {@Overridepublic void processElement(String left, String right, Context ctx, Collector<String> out) throws Exception {out.collect(left + "-->" + right);}}).print();env.execute("test-interval-join");

coGroup 算子的用法,我使用的 tumbling time window ,时间使用的 eventtime ,当数据的最大时间戳到达了窗口的最大时间,则窗口被触发,执行 RichCoGroupFunction 接口中的计算。这里我使用了 forBoundedOutOfOrderness 的 watermark ,它里面的参数是 1 ,所有会比正常的窗口晚触发 1 秒。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);WatermarkStrategy<String> ws = WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner((String data , long ts )->{return Long.parseLong(data.split(",")[1]);});SingleOutputStreamOperator<String> src1 = env.socketTextStream("127.0.0.1", 6666).assignTimestampsAndWatermarks(ws);SingleOutputStreamOperator<String> src2 = env.socketTextStream("127.0.0.1", 8888).assignTimestampsAndWatermarks(ws);src1.keyBy(new KeySelector<String, String>() {@Overridepublic String getKey(String s) throws Exception {return s.split(",")[0] ;}}).coGroup(src2.keyBy(new KeySelector<String, String>() {@Overridepublic String getKey(String s) throws Exception {return s.split(",")[0] ;}})).where(new KeySelector<String, String>() {@Overridepublic String getKey(String src1Data) throws Exception {return src1Data.split(",")[0] ;}}).equalTo(new KeySelector<String, String>() {@Overridepublic String getKey(String src2Data) throws Exception {return src2Data.split(",")[0] ;}}).window(TumblingEventTimeWindows.of(Time.seconds(2))).apply(new RichCoGroupFunction<String, String, String>() {@Overridepublic void coGroup(Iterable<String> first, Iterable<String> second, Collector<String> collector) throws Exception {String a = "" ;String b = "" ;for(String e : first){a+=e;}for(String e : second){b+=e;}collector.collect(a + ":" + b);}}).print("-----");env.execute();

broadcast 广播流的功能演示,下面的例子是官方文档中的例子,很简单的例子,维表关联有一个数据预加载的问题,可以将维表中的数据加载到类的本地变量中,也可以在广播流中给那些没有关联到维表的数据打标记,然后在后面的算子中将打过标记的数据发送到测流中,进行处理。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> src1 = env.socketTextStream("127.0.0.1", 6666);DataStreamSource<String> stringDataStreamSource = env.fromElements("green,good", "blue,excellant", "purple,2", "red,4");MapStateDescriptor<String,String> mapDesc = new MapStateDescriptor<String,String>("rule" ,String.class,String.class);BroadcastStream<String> broadcast = stringDataStreamSource.broadcast(mapDesc);src1.connect(broadcast).process(new BroadcastProcessFunction<String,String,String>(){private final MapStateDescriptor<String,String> mapRule =  new MapStateDescriptor<String, String>("rule",String.class , String.class);@Overridepublic void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {String s = ctx.getBroadcastState(mapRule).get(value);out.collect("out:"+s);}@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {ctx.getBroadcastState(mapRule).put(value.split(",")[0],value.split(",")[1]);}}).print("------");env.execute();

打完收工。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_73831.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

浏览器输入www.baidu.com后执行的全部过程

日升时奋斗&#xff0c;日落时自省 <1>URL输入 URL称为 : 统一资源定位符,用于定位互联网上的资源,也就是平常提起的"网址" 地址栏输入网址之后按下回车,浏览器会对输入的信息进行评判 (1)检查输入的内容是否是是一个合法的网址连接(非法地址不行) (2)合法的…

Python Unittest框架

1、unittest简介 unittest是Python自带的单元测试框架,具备编写用例、组织用例、执行用例、输出报告等自动化框架的条件,主要适用于单元测试,可以用来作自动化测试框架的用例组织执行框架。 2、unittest框架的特性: 提供用例组织与执行:当测试用例只有几条的时候可以不考虑…

Nginx 02篇——Nginx基本配置与参数说明篇

Nginx 02篇——Nginx基本配置与参数说明篇前言-默认配置文件1. 前言——关于nginx1.1 关于nginx1. 2 Nginx 01篇——Nginx安装2. Nginx 配置文件结构2.1 Nginx 安装后的默认文件2.2 Nginx 的三大组成部分3. 配置参说明-1——整个配置3.1 配置说明3.2 参考4. 配置说明-2—详细说…

postgres 源码解析51 LWLock轻量锁--2

本篇将着重讲解LWLock涉及的主要API工作流程与实现原理&#xff0c;相关基础知识见回顾&#xff1a;postgres 源码解析50 LWLock轻量锁–1 API介绍 函数API功能CreateLWLocks分配LWLocks所需的内存并进行初始化LWLockNewTrancheId分配新的Tranche ID,供用户使用Extension模块…

结构效度分析流程

结构效度分析流程如下图 一、结构效度的意义 效度分析在学术研究中非常常见&#xff0c;结构效度是为了分析“从量表获得的结果与设计该量表时所假定的理论之间的符合程度”。简单来讲&#xff0c;在研究者设计量表之初&#xff0c;一般会预设好几个维度&#xff0c;在经过因子…

kafka入门到精通

文章目录一、kafka概述&#xff1f;1.定义1.2消息队列1.2.1 传统消息队列的使用场景1.2.2 消息队列好处1.2.3 消息队列两种模式1.3 kafka基础架构二、kafka快速入门1.1使用docker-compose安装kafka1.2测试访问kafka-manager1.3 查看kafka版本号1.4 查看zookeeper版本号1.5 扩展…

python学习之OpenCV-Python模块的部分应用示例(生成素描图和动漫图)

文章目录前言一、图片转灰度二、对图片进行二值化处理三、对图片去除噪点四、调整图片透明度五、生成素描滤镜效果图&#xff08;方法结合应用&#xff09;六、生成动漫卡通滤镜效果图&#xff08;方法结合应用&#xff09;总结前言 OpenCV 是一个图像和视频处理库&#xff0c…

掌握饮食健康:了解你的宏量营养素摄入

谷禾健康 // 俗话说“病从口入”&#xff0c;我们的健康状况很大一部分取决于饮食。而食物基本上是由各种营养素构成的。 宏量营养素是人体大量需要的必需营养成分。宏量营养素指的是“三大”营养素&#xff1a;蛋白质、脂肪和碳水化合物&#xff0c;它们是我们饮食中的关键。 …

【JavaScript】基本语法大全

前言&#xff1a; 大家好&#xff0c;我是程序猿爱打拳。在学习C和Java这样的后端编程语言后&#xff0c;我们大概率会学习一些关于前端的语言如HTMLJavaScript。又因为前后端基本语法有些许不同&#xff0c;因此我整理出来。今天给大家讲解的是JS中的数据类型、运算符、选择结…

【华为OD机试模拟题】用 C++ 实现 - 最低位排序(2023.Q1)

最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 货币单位换算(2023.Q1) 【华为OD机试模拟题】用 C++ 实现 - 选座位(2023.Q1) 【华为OD机试模拟题】用 C++ 实现 - 停车场最大距离(2023.Q1) 【华为OD机试模拟题】用 C++ 实现 - 重组字符串(2023.Q1) 【华为OD机试模…

Eth-trunk :LACP模式链路聚合实战

Eth-trunk : LACP模式链路聚合实战 需求描述 PC1和PC3数据vlan10 &#xff0c;网段为192.168.10.0 /24PC2和PC4数据vlan20 &#xff0c;网段为192.168.20.0 /24确保设备之间互联互通&#xff0c;使用最大互联带宽并没有环路确保相同网段的PC可以互通判断交换机之间的每个端口…

ros下用kinectv2运行orbslam2

目录 前提 创建工作空间 orbslam2源码配置、测试&#xff1a; 配置usb_cam ROS功能包 配置kinect 前提 vim 、 cmake 、 git 、 gcc 、 g 这些一般都装了 主要是Pangolin 、 OpenCV 、 Eigen的安装 18.04建议Pangolin0.5 创建工作空间 我们在主目录下创建一个catkin_…

Node 10.0.8.6:9003 is unknown to cluster

解决方案解决方案一解决方案一 ① 概念介绍 公网ip&#xff1a;就是任意两台连接了互联网的电脑可以互相ping ip,能够通的ip 内网ip&#xff1a;只是在内网中使用无法与外网连接的ip ②问题背景 在腾讯云上搭建的一个redis集群&#xff0c;集群启动后 可以看到启动节点…

TX Text Control .NET Server for ASP.NET 31.0 SP2 CRK

用于 ASP.NET 31.0 SP2 的 TX 文本控件 .NET 服务器 用于 ASP.NET 的 TX 文本控件 .NET 服务器 TX Text Control Server for ASP.NET 是用于 Web 应用程序或服务的服务器端组件。它是一个完全可编程的 ASP.NET 文字处理器引擎&#xff0c;提供了广泛的文字处理功能。使用 TX Te…

C++中的内存管理

文章目录前言1.C中内存空间的划分2.C内存管理方式1.对内置类型的处理2.对自定义类型的处理3.new和delete实现原理4.定位new3.总结1. malloc/free和new/delete的区别2. 内存泄漏前言 C中的内存空间划分和C语言是很像的&#xff0c;基本上区别不大。但是因C中&#xff0c;引入了…

davis2016评估教程

DAVIS 2016是VOS任务中的一个经典的benchmark&#xff0c;但是一些VOT的算法有时候也可以预测mask&#xff0c;所以也会在上面测一测性能&#xff0c;本次就随手记录一下自己评测的过程&#xff0c;有需要的小伙伴可以往下看。 DAVIS 2016数据集官方项目网站&#xff1a;https:…

TCP四次挥手

TCP 四次挥手过程是怎样的&#xff1f; TCP 断开连接是通过四次挥手方式。 双方都可以主动断开连接&#xff0c;断开连接后主机中的「资源」将被释放&#xff0c;四次挥手的过程如下图&#xff1a; 客户端打算关闭连接&#xff0c;此时会发送一个 TCP 首部 FIN 标志位被置为 1…

node报错

记录bug:运行 npx -p storybook/cli sb init 时报错gyp info spawn C:\Program Files\Microsoft Visual Studio\2022\Community\MSBuild\Current\Bin\MSBuild.exegyp info spawn args [gyp info spawn args build/binding.sln,gyp info spawn args /nologo,gyp info spawn args…

prometheus + alterManager + 飞书通知,实现服务宕机监控告警;实测可用

架构设计图 最终效果图 项目准备 xml依赖 <!-- 监控相关 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>io.…

消息队列--Kafka

Kafka简介集群部署配置Kafka测试Kafka1.Kafka简介 数据缓冲队列。同时提高了可扩展性。具有峰值处理能力&#xff0c;使用消息队列能够使关键组件顶住突发的访问压力&#xff0c;而不会因为突发的超负荷的请求而完全崩溃。 Kafka是一个分布式、支持分区的&#xff08;partition…