Flink-转换算子

news/2024/4/28 12:22:01/文章来源:https://blog.csdn.net/dafsq/article/details/129695021

 基本转换算子

        map(映射)

        filter(过滤)

        flatMap(扁平映射)

 聚合算子

        keyBy(按键分区)

        简单聚合

        reduce(归约聚合)

UDF介绍 

        函数类

        富函数类


       数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个 DataStream 转换为 新的 DataStream。一个 Flink 程序的核心,其实就是所有的转换操作,它们决 定了处理的业务逻辑。 

 基本转换算子

        map(映射)

        主要用于将数据流中的数据进行转换,形成新 的数据流。

        我们只需要基于 DataStrema 调用 map()方法就可以进行转换处理。方法需要传入的参数是 接口 MapFunction 的实现;返回值类型还是 DataStream,不过泛型(流中的元素类型)可能改 变。

case class Event(user: String, url: String, timestamp: Long)
object test {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//创建当前样例类Event的对象val stream: DataStream[Event] = env.fromElements(Event("张三", "01", 1000L),Event("李四", "04", 2000L),Event("王五", "01", 6000L),Event("赵六", "03", 1000L))//转换需求:提取用户名//实现方式1:使用匿名函数stream.map( _.user).print("方式1:")//实现方式2:实现MapFunction接口stream.map(new UserEX).print("方式2:")//执行env.execute()}//定义实现接口类class UserEX extends MapFunction[Event,String]{override def map(t: Event): String = t.user}
}

        filter(过滤)

        filter()转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤 条件,对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉

        进行 filter()转换之后的新数据流的数据类型与原数据流是相同的。filter()转换需要传入的 参数需要实现 FilterFunction 接口,而 FilterFunction 内要实现 filter()方法,就相当于一个返回 布尔类型的条件表达式。

case class Event(user: String, url: String, timestamp: Long)
object f5 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//创建当前样例类Event的对象val stream: DataStream[Event] = env.fromElements(Event("张三", "01", 1000L),Event("李四", "04", 2000L),Event("王五", "01", 6000L),Event("赵六", "03", 1000L))//过滤出用户名为王五的点击事件//方式1:使用匿名函数stream.filter(_.user == "王五").print("方式1:")//方式2:使用匿名函数stream.filter(new UserF).print("方式2:")//执行env.execute()}//定义实现接口类class UserF extends FilterFunction[Event] {override def filter(t: Event): Boolean = t.user == "王五"}
}

        flatMap(扁平映射)

        flatMap()操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个 一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap()可以认为是“扁平化” (flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分, 再对拆分后的元素做转换处理。

        同 map()一样,flatMap()也可以使用 Lambda 表达式或者 FlatMapFunction 接口实现类的方 式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流的数据类型相同,也 可以不同。

case class Event(user: String, url: String, timestamp: Long)
object f5 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//创建当前样例类Event的对象val stream: DataStream[Event] = env.fromElements(Event("张三", "01", 1000L),Event("李四", "04", 2000L),Event("王五", "01", 6000L),Event("赵六", "03", 1000L))//测试灵活的输出形式:如果点击用户为李四 则输出李四stream.flatMap(new UserF).print()//执行env.execute()}//定义实现接口类class UserF extends FlatMapFunction[Event,String] {override def flatMap(t: Event, collector: Collector[String]): Unit = {//如果当前数据是Mary的点击事件则直接输出userif(t.user == "李四"){collector.collect(t.user)}}}
}

 聚合算子

        keyBy(按键分区)

        对于 Flink 而言,DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合 肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区; 这个操作就是通过 keyBy()来完成的。

        keyBy()是聚合前必须要用到的一个算子。keyBy()通过指定键(key),可以将一条流从逻 辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对 应着任务槽(task slots)。

        基于不同的 key,流中的数据将被分配到不同的分区中去,如图 5-8 所示;这样一来,所 有具有相同的 key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot 中进行处理了。

case class Event(user: String, url: String, timestamp: Long)
object f6 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//创建当前样例类Event的对象val stream: DataStream[Event] = env.fromElements(Event("张三", "01", 1000L),Event("李四", "04", 2000L),Event("王五", "01", 6000L),Event("赵六", "03", 1000L))//按键分区//1、接口实现stream.keyBy( new MyKeyb ).print()//匿名函数实现stream.keyBy( k => k.user)//执行env.execute()}//定义实现接口类class MyKeyb extends  KeySelector[Event,String]{override def getKey(in: Event): String = in.user}
}

        简单聚合

        有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们 内置实现了一些最基本、最简单的聚合 API,主要有以下几种: 

  • sum():在输入流上,对指定的字段做叠加求和的操作。
  • min():在输入流上,对指定的字段求最小值。
  • max():在输入流上,对指定的字段求最大值。
  • minBy():与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计 算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy()则会返回包 含字段最小值的整条数据。 
  • maxBy():与 max()类似,在输入流上针对指定字段求最大值。两者区别与 min()/minBy()完全一致。
object f6 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//创建当前样例类Event的对象val stream: DataStream[Event] = env.fromElements(Event("张三", "01", 1000L),Event("张三", "01", 6000L),Event("李四", "04", 2000L),Event("李四", "05", 6000L),Event("王五", "01", 1000L),Event("赵六", "03", 1000L))//使用 user 作为分组的字段,并计算最大的时间戳stream.keyBy( new MyKeyb ).maxBy("timestamp").print()//执行env.execute()}//定义实现接口类class MyKeyb extends  KeySelector[Event,String]{override def getKey(in: Event): String = in.user}
}

        reduce(归约聚合)

与简单聚合类似,reduce()操作也会将 KeyedStream 转换为 DataStream。它不会改变流的 元素数据类型,所以输出类型和输入类型是一样的。

         调用 KeyedStream 的 reduce()方法时,需要传入一个参数,实现 ReduceFunction 接口。接 口在源码中的定义如下:

case class Event(user: String, url: String, timestamp: Long)
object f6 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//创建当前样例类Event的对象val stream: DataStream[Event] = env.fromElements(Event("张三", "01", 1000L),Event("张三", "01", 6000L),Event("李四", "04", 2000L),Event("李四", "05", 6000L),Event("王五", "01", 1000L),Event("赵六", "03", 1000L))//归约聚合:统计用户点击页面的数量,提取当前最活跃的用户stream.map( d => (d.user,1L)).keyBy(_._1).reduce( new MyS) //统计每个用户的活跃度.keyBy( d => true) //将所有的数据按照同样的key分到同一个组中.reduce((a,b) => if(b._2 > a._2) b else a) //选取当前最活跃的用户.print() //输出结果//执行env.execute()}//定义实现接口类class MyS extends ReduceFunction[(String,Long)]{override def reduce(t: (String, Long), t1: (String, Long)): (String, Long) = (t._1,t._2+t1._2)}
}

UDF介绍 

        函数类

对于大部分操作而言,都需要传入一个用户自定义函数(UDF),实现相关操作的接口, 来完成处理逻辑的定义。Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类, 例如 MapFunction、FilterFunction、ReduceFunction 等。所以最简单直接的方式,就是自定义一个函数类,实现对应的接口。

下面例子实现了 FilterFunction 接口,用来筛选 url 中包含“01”的内容:

object f7 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//创建当前样例类Event的对象val stream: DataStream[Event] = env.fromElements(Event("张三", "01", 1000L),Event("张三", "01", 6000L),Event("李四", "04", 2000L),Event("李四", "05", 6000L),Event("王五", "01", 1000L),Event("赵六", "03", 1000L))//测试自定义函数类UDF的用法:筛选url中包含关键字“01”的事件//1、实现一个自定义的函数类stream.filter( new MyFFun).print()//2、使用匿名类 筛选url中包含关键字“04”的事件stream.filter( new FilterFunction[Event] {override def filter(t: Event): Boolean = t.url.contains("04")}).print()//3、使用匿名函数stream.filter(d => d.url.contains("05"))//执行env.execute()}//定义实现接口类class MyFFun extends FilterFunction[Event]{override def filter(t: Event): Boolean = t.url.contains("01")}
}

        富函数类

        “富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其 Rich 版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、 RichReduceFunction 等。 

        与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能,典型的生命周期方法有:

  • open()方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当 一个算子的实际工作方法例如 map()或者 filter()方法被调用之前,open()会首先被调 用。所以像文件 IO 流的创建,数据库连接的创建,配置文件的读取等等这样一次性 的工作,都适合在 open()方法中完成。
  • close()方法,是生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一 些清理工作。 
object f8 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//创建当前样例类Event的对象val stream: DataStream[Event] = env.fromElements(Event("张三", "01", 1000L),Event("张三", "01", 6000L),Event("李四", "04", 2000L),Event("李四", "05", 6000L),Event("王五", "01", 1000L),Event("赵六", "03", 1000L))//自定义一个RichMapFunction 测试富函数类的功能stream.map(new MyRichMap).print()//执行env.execute()}//定义实现接口类class MyRichMap extends RichMapFunction[Event,Long]{override def open(parameters: Configuration): Unit = {println("索引号为:"+ getRuntimeContext.getIndexOfThisSubtask + "的任务开始")}override def map(in: Event): Long = in.timestamp //输出一个长整型的时间戳override def close(): Unit = {println("索引号为:"+ getRuntimeContext.getIndexOfThisSubtask + "的任务结束")}}
}

         适合使用的场景

class MyFlatMap extends RichFlatMapFunction[IN,OUT]{override def open(parameters: Configuration): Unit = {// 做一些初始化工作// 例如建立一个和 MySQL 的连接}override def flatMap(value: IN, out: Collector[OUT]): Unit = {// 对数据库进行读写}override def close(): Unit = {// 清理工作,关闭和 MySQL 数据库的连接。}
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_274494.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Neodynamic EPLPrinter SDK 2.0 for .NET Crack

Neodynamic EPLPrinter Emulator SDK for .NET Standard V2.0 添加对 FK&#xff08;删除表单&#xff09;、FR&#xff08;检索表单&#xff09;和 FS&#xff08;存储表单&#xff09;表单相关命令的支持。 21月 2023&#xff0c; 10 - 34&#xff1a;<>新版本 特征…

如何在24小时内让你的网站跻身谷歌前列?

在当今互联网时代&#xff0c;拥有一个排名靠前的网站对于企业来说非常重要&#xff0c;因为这意味着更多的流量和更高的曝光率。 而谷歌&#xff08;Google&#xff09;是全球最受欢迎的搜索引擎之一&#xff0c;因此在谷歌的搜索结果中排名靠前非常重要。 那么如何在24小时…

tomcat服务器前端部署【Tomcat Manager、思路分析】

问题描述 当前需要我进行前端代码的部署&#xff0c;但是我忘记了这个系统对应的部署位置&#xff0c;但是隐约记得好像是通过tomcat部署的。 然后当时为了方便部署&#xff0c;我们打开了Tomcat Manager 以下是基于Tomcat Manager的&#xff0c;没有打开的需要前往tomcat下载…

详解:企业知识管理的制作步骤!

随着信息技术的快速发展&#xff0c;企业面临着海量的信息和知识&#xff0c;如何管理和利用这些信息和知识&#xff0c;已经成为企业发展的重要问题。知识管理是一种管理方法和技术&#xff0c;旨在帮助企业有效地管理和利用知识资产&#xff0c;提高企业的创新能力和竞争力。…

【CSS】浮动 ② ( 浮动语法简介 | 文字环绕效果 | 左浮动 | 右浮动 )

文章目录一、浮动语法简介1、语法说明2、没有浮动的效果3、左浮动的效果4、右浮动的效果5、右浮动 外边距效果二、完整代码示例一、浮动语法简介 1、语法说明 为 元素 设置了 浮动 CSS 属性 , 可以实现 : 元素标签 不再受 标准流 控制 ; ( 块级元素 , 行内元素 , 行内块元素 …

【嵌入式Linux学习笔记】platform设备驱动和input子系统

对于Linux这种庞大的操作系统&#xff0c;代码重用性非常重要&#xff0c;所以需要有相关的机制来提升效率&#xff0c;去除重复无意义的代码&#xff0c;尤其是对于驱动程序&#xff0c;所以就有了platform和INPUT子系统这两种工作机制。 学习视频地址&#xff1a;【正点原子…

【JavaSE】泛型中的通配符

文章目录1. 概述2. 上界通配符 < ? extends E>3. 下界通配符 < ? super E>3. &#xff1f;和 T 的区别1. 概述 Java 泛型&#xff08;generics&#xff09;是 JDK 5 中引入的一个新特性, 泛型提供了编译时类型安全检测机制&#xff0c;该机制允许开发者在编译时…

【QT神奇Bug】中文乱码、括号乱码、冒号乱码【2023.03.22】

&#x1f60d;Qt乱码疑难杂症解决方案 Solved by Yang Naifen. &#x1f4fa;视频讲解地址&#xff1a;【Qt疑难杂症之乱码-哔哩哔哩】 https://b23.tv/83MmXru 附言&#xff1a;解决这个bug按照我当前的薪资&#xff0c;至少四百RMB。都是工农阶级的工友&#xff0c;有bug一…

本地调试Java程序时只对部分接口忽略代理

场景 今天有位朋友问了个问题&#xff0c;在本地IDE开发工具调试代码的时候&#xff0c;怎么不动代码的情况只针对部分API走proxy&#xff0c;因为他们的代码只需要在本地调试的时候才要用到Proxy&#xff0c;而平时都是部署在云上&#xff0c;是用不到Proxy的&#xff0c;所以…

JDBC基础,介绍了简单的连接数据库,并通过在后端写SQL语句对数据库进行基本的增删查改操作

一、JDBC基础 跟数据库连接&#xff0c;并且可以对数据库里面的数据通过SQL语句进行处理等操作。 1.1 JDBC JDBC是SUN公司的&#xff0c;所以要按照他们的规范来&#xff0c;因为MYSQL和Oracle都是SUN公司的。三个产品都是一个公司的&#xff0c;一般不会出现兼容性不好的问…

智能手机2023:高端前攻、中端后守

配图来自Canva可画 沉寂许久的行业&#xff0c;终于在疫情之后迎来了久违的舞台&#xff0c;MWC线下展会三年来第一次召开。2月27日至3月2日&#xff0c;2023年世界移动通讯大会如期在巴塞罗那举行&#xff0c;国内一众手机厂商们纷纷登台亮相、大秀肌肉。与以往相比&#xff…

Rocketmq-Mqtt 开发实例

一、RocketMQ MQTT 概览传统的消息队列MQ主要应用于服务&#xff08;端&#xff09;之间的消息通信&#xff0c;比如电商领域的交易消息、支付消息、物流消息等等。然而在消息这个大类下&#xff0c;还有一个非常重要且常见的消息领域&#xff0c;即IoT类终端设备消息。近些年&…

Tomcat源码:启动类Bootstrap与Catalina的加载

参考资料&#xff1a; 《Tomcat源码解析系列&#xff08;一&#xff09;Bootstrap》 《Tomcat源码解析系列&#xff08;二&#xff09;Catalina》 《Tomcat - 启动过程&#xff1a;初始化和启动流程》 《Tomcat - 启动过程:类加载机制详解》 《Tomcat - 启动过程:Catalina…

不用科学上网,免费的GPT-4 IDE工具Cursor保姆级使用教程

大家好&#xff0c;我是可乐。 过去的一周&#xff0c;真是疯狂的一周。 GPT-4 震撼发布&#xff0c;拥有了多模态能力&#xff0c;不仅能和GPT3一样进行文字对话&#xff0c;还能读懂图片&#xff1b; 然后斯坦福大学发布 Alpaca 7 B&#xff0c;性能匹敌 GPT-3.5&#xff…

易基因:PIWI/piRNA在人癌症中的表观遗传调控机制(DNA甲基化+m6A+组蛋白修饰)|综述

大家好&#xff0c;这里是专注表观组学十余年&#xff0c;领跑多组学科研服务的易基因。2023年03月07日&#xff0c;南华大学衡阳医学院李二毛团队在《Molecular Cancer》杂志发表了题为“The epigenetic regulatory mechanism of PIWI/piRNAs in human cancers”的综述文章&am…

数据处理时代,绕不开的数据分析

数据分析的出现是因为人类难以理解海量数据所呈现出来的信息&#xff0c;不能从中找到相应的规律来对现实中的事物进行对应&#xff0c;我们都知道数据有很高的价值&#xff0c;但不能利用的价值&#xff0c;没有任何意义。 为了解决这一问题&#xff0c;数据分析在长期的数据…

Golang每日一练(leetDay0012)

目录 34. 查找元素首末位置 Find-first-and-last-position-of-element-in-sorted-array &#x1f31f;&#x1f31f; 35. 搜索插入位置 Search Insert Position &#x1f31f; 36. 有效的数独 Valid Sudoku &#x1f31f;&#x1f31f; &#x1f31f; 每日一练刷题专栏 …

[vue问题]Uncaught SyntaxError: Not available in legacy mode

[vue问题]Uncaught SyntaxError: Not available in legacy mode问题描述问题分析解决方案直接回退vue-i18n的版本解决错误提示的问题问题描述 Uncaught SyntaxError: Not available in legacy modeat Object.createCompileError (message-compiler.cjs.js?af13:58:1)at creat…

GTC 2023 | 「皮衣刀客」黄仁勋畅谈 AI Top 5,科学计算、生成式 AI、Omniverse 榜上有名

内容一览&#xff1a;北京时间 3 月 21 日 23:00&#xff0c;英伟达创始人兼 CEO 黄仁勋在 GTC 2023 上发表主题演讲&#xff0c;介绍了生成式 AI、元宇宙、大语言模型、云计算等领域最新进展。 关键词&#xff1a;英伟达 黄仁勋 GTC 2023 「Don’t Miss This Defining Momen…

辉煌优配|沪指震荡涨0.25%,建筑、家居等板块拉升,数字经济概念活跃

22日早盘&#xff0c;两市股指盘中强势上扬&#xff0c;接近午盘涨幅收窄&#xff1b;两市半日成交近6000亿元&#xff0c;北向资金小幅净流出。 到午间收盘&#xff0c;沪指涨0.25%报3263.85点&#xff0c;深成指涨0.39%&#xff0c;创业板指微跌0.01%&#xff0c;两市合计成交…