【Flink】基本转换算子使用之fliter、flatMap,键控流转换算子和分布式转换算子

news/2024/5/15 11:52:34/文章来源:https://blog.csdn.net/weixin_43923463/article/details/127960141

文章目录

  • 一 Flink DataStream API
    • 1 基本转换算子的使用
      • (1)fliter
        • a 使用匿名类实现
        • b 使用外部类函数实现
        • b 使用flatMap实现
      • (2)flatMap
        • a 使用匿名类实现
        • b 使用匿名函数实现
    • 2 键控流转换算子
      • (1) keyBy
      • (2)滚动聚合
      • (3)reduce
      • (4)案例
    • 3 分布式转换算子
      • (1)Random
      • (2)Round-Robin
      • (3)Rescale
      • (4)Broadcast(常用)
      • (5)Global
      • (6)Custom

一 Flink DataStream API

1 基本转换算子的使用

基本转换算子的定义:作用在数据流中的每一条单独的数据上的算子。基本转换算子会针对流中的每一个单独的事件做处理,也就是说每一个输入数据会产生一个输出数据。单值转换,数据的分割,数据的过滤,都是基本转换操作的典型例子。

(1)fliter

在这里插入图片描述

a 使用匿名类实现

public class Example3 {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Event> stream = env.addSource(new ClickSource());stream.filter(r -> r.user.equals("Marry")).print();stream.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event event) throws Exception {return event.user.equals("Marry");}}).print();env.execute();}

b 使用外部类函数实现

stream.filter(new MyFilter()).print();
public static class MyFilter implements FilterFunction<Event>{@Overridepublic boolean filter(Event event) throws Exception {return event.user.equals("Marry");}
}

b 使用flatMap实现

stream.flatMap(new FlatMapFunction<Event, Event>() {@Overridepublic void flatMap(Event event, Collector<Event> collector) throws Exception {if(event.user.equals("Marry"))collector.collect(event);}}).print();

输入一条元素,输出1个结果使用map,输出0 或 1 个结果使用filter,针对每一个条数据输出0 1 或者多个结果使用flatmap。

(2)flatMap

a 使用匿名类实现

public class Example4 {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 将white直接输出,将black复制,将gray过滤DataStreamSource<String> stream = env.fromElements("white", "black", "gray");// 当每一条数据进入到flatMap算子时,就会触发flatMap的调用// 在flink中程序只是定义了一个有向无环图,需要事件去驱动它的运行stream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {if(s.equals("white")){collector.collect(s);}else if(s.equals("black")){collector.collect(s);collector.collect(s);}}}).print();env.execute();}
}

b 使用匿名函数实现

stream.flatMap((String s,Collector<String> collector) -> {if(s.equals("white")){collector.collect(s);}else if(s.equals("black")) {collector.collect(s);collector.collect(s);}})// Collector<String>会被擦除.returns(Types.STRING).print();

2 键控流转换算子

很多流处理程序的一个基本要求就是要能对数据进行分组,分组后的数据共享某一个相同的属性。DataStream API 提供了一个叫做 KeyedStream 的抽象,此抽象会从逻辑上对 DataStream 进行分区,分区后的数据拥有同样的 Key 值,分区后的流互不相关。

针对 KeyedStream 的状态转换操作可以读取数据或者写入数据到当前事件 Key 所对应的状态中。这表明拥有同样 Key 的所有事件都可以访问同样的状态,也就是说所以这些事件可以一起处理。KeyedStream 可以使用 map,flatMap 和 filter 算子来处理。

在这里插入图片描述

DataStream KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。从逻辑上分区去对这些数据进行处理,物理上的位置无关紧要。不过最终同一个Key中的数据一定在一个任务槽中,这样会出现数据倾斜的问题。

(1) keyBy

keyBy 通过指定 key 来将 DataStream 转换成 KeyedStream。基于不同的 key,流中的事件将被分配到不同的分区中去。所有具有相同 key 的事件将会在接下来的操作符的同一个子任务槽中进行处理。拥有不同 key 的事件可以在同一个任务中处理。但是算子只能访问当前事件的 key 所对应的状态。keyBy() 方法接收一个参数,这个参数指定了 key 或者 keys,有很多不同的方法来指定 key。

如之前使用的匿名类方式,针对每一条数据指定key。

	KeyedStream<WordWithCount, String> keyedStream = mappedStream// 第一个泛型:流中元素的泛型// 第二个泛型:key的泛型.keyBy(new KeySelector<WordWithCount, String>() {public String getKey(WordWithCount value) throws Exception {return value.word;}});

只要存在分组,就一定存在聚合,所以提出了滚动聚合的概念。

(2)滚动聚合

滚动聚合算子由 KeyedStream 调用,并生成一个聚合以后的 DataStream,例如:sum,minimum,maximum。一个滚动聚合算子会为每一个观察到的 key 保存一个聚合的值。针对每一个输入事件,算子将会更新保存的聚合结果,并发送一个带有更新后的值的事件到下游算子。滚动聚合不需要用户自定义函数,但需要接受一个参数,这个参数指定了在哪一个字段上面做聚合操作。DataStream API 提供了以下滚动聚合方法。

  • sum():在输入流上对指定的字段做滚动相加操作。
  • min():在输入流上对指定的字段求最小值。
  • max():在输入流上对指定的字段求最大值。
  • minBy():在输入流上针对指定字段求最小值,并返回包含当前观察到的最小值的事件。
  • maxBy():在输入流上针对指定字段求最大值,并返回包含当前观察到的最大值的事件。

滚动聚合算子无法组合起来使用,每次计算只能使用一个单独的滚动聚合算子。

如以下例子按照key进行分组并聚合:

public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Tuple2<Integer, Integer>> stream = env.fromElements(Tuple2.of(1, 3),Tuple2.of(1, 4),Tuple2.of(2, 3));// 逻辑上进行分流KeyedStream<Tuple2<Integer, Integer>, Integer> keyedStream = stream.keyBy(r -> r.f0);// 针对第一个位置进行聚合keyedStream.sum(1).print();env.execute();
}

输出结果也同样体现出滚动聚合的效果:

(1,3)
(1,7)
(2,3)

(3)reduce

reduce 算子是滚动聚合的泛化实现。它将一个 ReduceFunction 应用到了一个 KeyedStream 上面去。reduce 算子将会把每一个输入事件和当前已经 reduce 出来的值做聚合计算。reduce 操作不会改变流的事件类型。输出流数据类型和输入流数据类型是一样的。

reduce 函数可以通过实现接口 ReduceFunction 来创建一个类。ReduceFunction 接口定义了 reduce() 方法,此方法接收两个输入事件,输出一个相同类型的事件。

如下同样可以实现sum的功能。

keyedStream.reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {@Overridepublic Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {return Tuple2.of(value1.f0,value1.f1 + value2.f1);}
}).print();

(4)案例

求整数的平均值。

public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.addSource(new SourceFunction<Integer>() {private boolean running = true;private Random random = new Random();@Overridepublic void run(SourceContext<Integer> ctx) throws Exception {while (running){ctx.collect(random.nextInt(10));Thread.sleep(100);}}@Overridepublic void cancel() {running = false;}}).map(r -> Tuple2.of(r,1)).returns(Types.TUPLE(Types.INT,Types.INT))// reduce必须在keyBy之后使用// 如果想在一条流上直接使用滚动聚合// 将所有数据shuffle到同一个逻辑分区.keyBy(r -> true).reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {@Overridepublic Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {return Tuple2.of(value1.f0 + value2.f0,value1.f1 + value2.f1);}}).map(new MapFunction<Tuple2<Integer, Integer>, Object>() {@Overridepublic Object map(Tuple2<Integer, Integer> value) throws Exception {return (double) value.f0 / value.f1;}}).print();env.execute();
}

总结:滚动聚合的要点在于每一个Key都有自己的累加器(状态变量),一条数据来到处理完成之后就丢弃了,向下游发送的数据是累加器中的数据,这样就不需要将所有的数据都保存下来,节省内存空间,性能高于批处理。

scala中为什么会出现伪递归:在纯正的函数式编程中是没有循环的,那么如何实现循环的功能呢?使用递归!那么使用递归又带来了一个问题,递归的栈会超过内存,造成内存溢出Stack Overflow,那么伪递归用新来的栈去覆盖原有的栈,栈的深度不变,所以可以使用伪递归来模拟循环,伪递归当中有累加器的存在。

3 分布式转换算子

分区操作对应于之前的“数据交换策略”。这些操作定义了事件如何分配到不同的任务中去。当使用 DataStream API 来编写程序时,系统将自动的选择数据分区策略,然后根据操作符的语义和设置的并行度将数据路由到正确的地方去。

有些时候,当需要在应用程序的层面控制分区策略,或者自定义分区策略时。例如,如果我们知道会发生数据倾斜,那么想要针对数据流做负载均衡,将数据流平均发送到接下来的操作符中去。又或者,应用程序的业务逻辑可能需要一个算子所有的并行任务都需要接收同样的数据。再或者,需要自定义分区策略的时候。

keyBy() 方法不同于分布式转换算子。所有的分布式转换算子将产生 DataStream 数据类型。而 keyBy() 产生的类型是 KeyedStream,它拥有自己的 keyed state。

综上,分布式转换算子可以对数据进行物理分区,也就是说可以将数据分配到不同的任务槽中。

(1)Random

Random随机数据交换由 DataStream.shuffle() 方法实现。shuffle 方法将数据随机的分配到下游算子的并行任务中去,可以将数据分配到不同的任务槽中。

	env.fromElements(1,2,3,4).setParallelism(1).shuffle().print("shuffle: ").setParallelism(2);

运行结果如下:第一任务槽中数据为1和3,第二个任务槽中数据为2和4。

shuffle: :1> 1
shuffle: :1> 3
shuffle: :2> 4
shuffle: :2> 2

(2)Round-Robin

rebalance() 方法使用 Round-Robin 负载均衡算法将输入流平均分配到随后的并行运行的任务中去。

	env.fromElements(1,2,3,4).setParallelism(1).rebalance().print("rebanlance: ").setParallelism(2);

(3)Rescale

rescale()方法使用的也是round-robin算法,但只会将数据发送到接下来的并行运行的任务中的一部分任务中。本质上,当发送者任务数量和接收者任务数量不一样时,rescale分区策略提供了一种轻量级的负载均衡策略。如果接收者任务的数量是发送者任务的数量的倍数时,rescale 操作将会效率更高。

rebalance() 和 rescale() 的根本区别在于任务之间连接的机制不同。rebalance() 将会针对所有发送者任务和所有接收者任务之间建立通信通道,而 rescale() 仅仅针对每一个任务和下游算子的一部分子并行任务之间建立通信通道。

两者的示意图如下:

在这里插入图片描述

(4)Broadcast(常用)

broadcast() 方法将输入流的所有数据复制并发送到下游算子的所有并行任务中去。

	env.fromElements(1,2,3,4).setParallelism(1).broadcast().print("broadcast: ").setParallelism(2);

(5)Global

global() 方法将所有的输入流数据都发送到下游算子的第一个并行任务中去。这个操作需要很谨慎,因为将所有数据发送到同一个 task,将会对应用程序造成很大的压力。

(6)Custom

当 Flink 提供的分区策略都不适用时,我们可以使用 partitionCustom() 方法来自定义分区策略。这个方法接收一个 Partitioner 对象,这个对象需要实现分区逻辑以及定义针对流的哪一个字段或者 key 来进行分区。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_225329.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

中国互联网众筹行业

近些年&#xff0c;中国互联网发展迅速&#xff0c;众筹这种起源于美国的新型互联网金融模式更是一直处于风口浪尖。在“大众创业、万众创新”的背景下&#xff0c;这种低门槛的融资模式也深受欢迎&#xff0c;加上阿里、京东、苏宁三大电商的巨头的相继入场&#xff0c;更令这…

IMS各网元的主要功能

文章目录用户注册时&#xff1a; 手机发出一个注册消息到他所在的拜访地的P。 比如&#xff0c;他是山西太原的用户&#xff0c;他这时候到了北京&#xff0c;那么这个时候&#xff0c;他要注册到IMS网络里面的话&#xff0c;这个P-CSCF就是北京的P-CSCF&#xff0c;这个北京的…

CAS号:376364-38-4,rCRAMP (rat)

rCRAMP (rat) 是一种大鼠组织蛋白酶相关的抗菌肽&#xff0c;有助于大鼠脑肽/蛋白质提取物的抗菌活性。rCRAMP (rat) 是大鼠中枢神经系统先天免疫系统的关键参与者。rCRAMP (rat) is the rat cathelin-related antimicrobial peptide. rCRAMP (rat) contributes to the antibac…

Kotlin 开发Android app(十一):Android控件RecyclerView

Android 中的控件非常的丰富&#xff0c;我们会陆陆续续的进行介绍&#xff0c;从第九节开始&#xff0c;关于Kotlin 的语法特性就差不多结束&#xff0c;后面如果有发现需要说明的语法&#xff0c;再进行相关的补充。 在Android的控件中&#xff0c;RecyclerView算是一个大控…

从 Uber 数据泄露事件我们可以学到什么?

Uber 数据泄露始于一名黑客从暗网市场购买属于一名 Uber 员工的被盗凭证。最初尝试使用这些凭据连接到 Uber 的网络失败&#xff0c;因为该帐户受 MFA 保护。为了克服这一安全障碍&#xff0c;黑客通过 What’s App 联系了 Uber 员工&#xff0c;并假装是 Uber 的安全人员&…

OA系统,有效提升企业办公效率落实执行力

企业管理的成功将最终取决于企业的执行情况&#xff0c;只要有良好的经营管理&#xff0c;管理系统&#xff0c;一个好的领导者&#xff0c;充分调动员工的积极性&#xff0c;将能最大限度的管理执行力。 OA协同办公系统提供了工作流和协同工作互补结合。工作流程严格规定了工作…

大数据面试题(四):Yarn核心高频面试题

文章目录 Yarn核心高频面试题 一、简述Hadoop1与Hadoop2的架构异同 二、为什么会产生yarn&#xff0c;它解决了什么问题&#xff0c;有什么优势&#xff1f; 三、HDFS的数据压缩算法&#xff1f;及每种算法的应用场景&#xff1f; 1、gzip压缩 2、Bzip2压缩 3、Lzo压缩 …

为什么 NGINX 的 reload 不是热加载?

作者&#xff1a;刘维 这段时间在 Reddit 看到一个讨论&#xff0c;为什么 NGINX 不支持热加载&#xff1f;乍看之下很反常识&#xff0c;作为世界第一大 Web 服务器&#xff0c;不支持热加载&#xff1f;难道大家都在使用的 nginx -s reload 命令都用错了&#xff1f; 带着这个…

数据治理系列:数仓建模之数仓主题与主题域

背景&#xff1a; 数据仓库之父 Bill Inmon 将数据仓库描述为一个面向主题的、集成的、稳定的、反应历史变化的数据集合&#xff0c;用于支持管理者的决策过程。 从上面的引言里面&#xff0c;我们其实可以知道主题在数仓建设里面绝对是很重要的一环&#xff0c;这的确是的。…

【计算机网络】HTTP/HTTPS协议基础知识汇总

目录 1.URL&#xff1a; 2.HTTP协议&#xff1a; 2.1抓包工具&#xff08;这里用fiddler&#xff09;&#xff1a; 2.2请求和响应的格式&#xff1a; 2.3方法的介绍&#xff1a; 2.4请求报头&#xff08;header&#xff09;&#xff1a; 2.5状态码&#xff1a; 2.6响应…

antd——使用a-tree组件实现 检索+自动展开+自定义增删改查功能——技能提升

之前写后台管理系统时&#xff0c;遇到一个下面的需求&#xff0c;下面是最终完成的效果图。 实现的功能有&#xff1a; 1. 下拉 选择不同的类型——就是一个普通的select组件&#xff0c;下面并不做介绍 2. 通过关键字可以进行tree树形结构的筛选&#xff0c;然后将筛选后的…

Python_数据容器_元组tuple

一、元组tuple定义 为什么需要元组 列表是可以修改的&#xff0c;如果想要传递的信息不被篡改&#xff0c;列表就不适合了 元组和列表一样&#xff0c;都是可以封装多个不同类型的元素在内 最大的不同点在于&#xff1a; 元祖一旦定义完成&#xff0c;就不可修改 所以&am…

LabVIEW使用Desktop Execution Trace工具包

LabVIEW使用Desktop Execution Trace工具包 可以使用桌面执行跟踪工具包来调试和优化大型LabVIEW应用程序&#xff0c;包括具有多个循环的应用程序、客户端-服务器架构、动态加载VI等。该工具包从本地或远程计算机桌面上运行的应用程序捕获执行事件&#xff0c;并在表窗格中显…

聊一聊如何截获 C# 程序产生的日志

一&#xff1a;背景 1.讲故事 前段时间分析了一个dump&#xff0c;一顿操作之后&#xff0c;我希望用外力来阻止程序内部对某一个com组件的调用&#xff0c;对&#xff0c;就是想借助外力实现&#xff0c;如果用 windbg 的话&#xff0c;可以说非常轻松&#xff0c;但现实情况…

48种数据分析可视化图表

可视化对于数据分析师来说可能不是最重要的&#xff0c;重要的是你分析或挖掘出来的结果是否有效。在这基础之上就需要通过可视化恰当完整的表达见解。这里又有区别了&#xff1a;实用性和美观性哪个更重要&#xff1f;要我说实用性是第一位的&#xff0c;能用一个元素表达最好…

MySQL数据库:2、MySQL的下载与安装、基本使用、系统服务制作

一、MySQL简介 ​ MySQL是一种关系型数据库管理系统&#xff0c;关系数据库将数据保存在不同的表中&#xff0c;而不是将所有数据放在一个大仓库内&#xff0c;这样就增加了速度并提高了灵活性。 二、安装与下载 1、下载流程 1、访问官方&#xff08;www.mysql.com&#xf…

查询利器—索引

目录 索引的优缺点 常见索引分类 MySQL数据操作的宏观过程 认识磁盘 正式理解索引结构 采用B树的原因 聚簇索引与非聚簇索引 索引操作 索引创建原则 索引的优缺点 优点&#xff1a;提高一个海量数据的检索速度 缺点&#xff1a;查询速度的提高是以插入、更新、删除…

二进制逻辑运算和基本门电路

目录 基本门电路很重要&#xff0c;做内存扩展片选译码的时候会常用 一&#xff1a;逻辑非&#xff08;按位取反&#xff09; not 二&#xff1a;逻辑乘&#xff08;逻辑与&#xff09;按位求“与” 有零出零 and 三&#xff1a;逻辑或&#xff08;逻辑加) 有1出1 …

Unity VR 开发教程: Oculus 一体机开发 (一) 环境配置(基于 Oculus Integration v46)

文章目录&#x1f4d5;教程说明&#x1f4d5;安装 Unity 时需要添加的模块&#x1f4d5;设置 Unity 的 Build Settings&#x1f4d5;导入 Oculus Integration&#x1f4d5;设置 Project Settings⭐通用设置⭐Rendering 设置⭐Identification 设置⭐Configuration 设置⭐XR Plug…

【爬虫进阶】易班登录加密逆向

目录前言分析代码过程結果完整代码前言 demo比较简单&#xff0c;逆向难点&#xff1a;rsa加密&#xff0c;图片验证码 分析 我们模拟登录&#xff0c;请求一下 红框内是我们提交的参数&#xff0c;password看上去应该是rsa加密&#xff0c;captcha是验证码&#xff0c;key…