【Flink】Flink 处理函数之基本处理函数(一)

news/2024/4/29 13:48:59/文章来源:https://blog.csdn.net/sdut406/article/details/136886394

1. 处理函数介绍

流处理API,无论是基本的转换聚合、还是复杂的窗口操作,都是基于DataStream进行转换的,所以统称为DataStreamAPI,这是Flink编程的核心。

但其实Flink为了更强大的表现力和易用性,Flink本身提供了多层API,DataStreamAPI只是中间一环,如下图所示:在这里插入图片描述
在更底层,Flink可以不定义任何具体的算子(比如 mapfilter,或者 window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)

在处理函数中,操作的就是数据流中最基本的元素:数据事件(event)状态(state)以及时间(time)。这就相当于对流有了完全的控制权。处理函数比较抽象,没有具体的操作,所以对于一些常见的简单应用(比如求和、开窗口)会显得有些麻烦;不过正是因为它不限定具体做什么,所以理论上我们可以做任何事情,实现所有需求。

2. 处理函数的分类

DataStream 在调用一些转换方法之后,有可能生成新的流类型;例如调用.keyBy()之后得到 KeyedStream,进而再调用.window()之后得到 WindowedStream。对于不同类型的流,其实都可以直接调用.process()方法进行自定义处理,这时传入的参数就都叫作处理函数。当然,它们尽管本质相同,都是可以访问状态和时间信息的底层 API,可彼此之间也会有所差异。

Flink 提供了 8 个不同的处理函数:

  • ProcessFunction
    最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。
  • KeyedProcessFunction
    对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于 KeyedStream
  • ProcessWindowFunction
    开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process()时作为参数传入。
  • ProcessAllWindowFunction
    同样是开窗之后的处理函数,基于 AllWindowedStream 调用.process()时作为参数传入。
  • CoProcessFunction
    合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入。
  • ProcessJoinFunction
    间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入。
  • BroadcastProcessFunction
    广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream与一个广播流(BroadcastStream)连接(conncet)之后的产物。
  • KeyedBroadcastProcessFunction
    按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream广播流(BroadcastStream)做连接之后的产物。

2.1 基本处理函数(ProcessFunction)

处理函数主要是定义数据流的转换操作,所以也可以把它归到转换算子中。在Flink 中几乎所有转换算子都提供了对应的函数类接口,处理函数也不例外;它所对应的函数类,就叫作 ProcessFunction

2.1.1 处理函数的功能和使用

转换算子一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。比如Map算子只能获取当前的数据;而想窗口聚合复杂的操作AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式出现)。另外还有富函数类,比如 RichMapFunction,它提供了获取运行时上下文的方法 getRuntimeContext(),可以拿到状态,还有并行度任务名称之类的运行时信息。

但无论那种算子,如果想要访问事件的时间戳,或者当前的水位线信息,都是获取不到的。但是处理函数可以获取,处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)时间戳(timestamp)水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。

处理函数的使用与基本的转换操作类似,只需要直接基于 DataStream 调用.process()方法就可以了。方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑。

stream.process(new MyProcessFunction())

这里 ProcessFunction 不是接口,而是一个抽象类,继承了 AbstractRichFunctionMyProcessFunction 是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。

代码实例:

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env  = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}}));stream.process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {out.collect(value.toString());if (value.getUser().equals("Mary")) {out.collect(value.user + "click " + value.getUrl());} else if (value.getUser().equals("Alice")) {out.collect(value.user);out.collect(value.user);}System.out.println("timestamp:" + ctx.timestamp());System.out.println("watermark:" + ctx.timerService().currentWatermark());System.out.println(getRuntimeContext().getIndexOfThisSubtask());}@Overridepublic void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);}@Overridepublic void close() throws Exception {super.close();}}).print();env.execute();}

运行结果:
在这里插入图片描述

这里第一次的水位线的值其实是个默认值,Long.MIN_VALUE + outOfOrdernessMillis + 1;
在这里插入图片描述

然后每次下一次的水位线都是上一次的timestamp - 1

2.1.2 ProcessFunction 解析

抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型。

内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...
public abstract void processElement(I value, Context ctx, Collector<O> out) 
throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) 
throws Exception {}
...
}
2.1.2.1 抽象方法.processElement()

用于处理元素,定义了处理的核心逻辑。这个方法对流中的每个元素都会调用一次,参数包括三个: 输入数据值 value上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义的。

  • value: 当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致。
  • cts:类型是ProcessFunction中定义的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()

Context 抽象类定义如下:

public abstract class Context {public abstract Long timestamp();public abstract TimerService timerService();public abstract <X> void output(OutputTag<X> outputTag, X value);
}
  • out: “收集器”(类型为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用。

ProcessFunction 可以轻松实现flatMap这样的基本转换功能(当然 mapfilter 更不在话下);而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自定义状态(state)进行处理,这也就能实现聚合操作的功能了。

2.1.2.2 非抽象方法.onTimer()
@Override
public void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);
}

用于定义定时触发的操作,这是一个非常强大、也非常有趣的功能。这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService 来注册的。打个比方,注册定时器(timer)就是设了一个闹钟,到了设定时间就会响;而.onTimer()中定义的,就是闹钟响的时候要做的事。所以它本质上是一个基于时间的“回调”(callback)方法,通过时间的进展来触发;在事件时间语义下就是由水位线(watermark)来触发了。

.processElement()类似,定时方法.onTimer()也有三个参数:时间戳(timestamp)上下文(ctx),以及收集器(out)。这里的timestamp是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。

既然有.onTimer()方法做定时触发,我们用ProcessFunction也可以自定义数据按照时间分组、定时触发计算输出结果;这其实就实现了窗口(window)的功能。所以说 ProcessFunction是真正意义上的终极奥义,用它可以实现一切功能。

处理函数都是基于事件触发的。水位线就如同插入流中的一条数据一样;只不过处理真正的数据事件调用的是.processElement()方法,而处理水位线事件调用的是.onTimer()

.onTimer()方法只是定时器触发时的操作,而定时器(timer)真正的设置需要用到上下文 ctx 中的定时服务。在Flink中,只有“按键分区流”KeyedStream才支持设置定时器的操作,所以之前的代码中并没有使用定时器。所以基于不同类型的流,可以使用不同的处理函数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1028157.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何配置本地ssh连接远程Linux服务器

1.条件 本地操作系统Ubuntu远程服务器&#xff08;Linux都可以&#xff09; 本地如果是Window,其实也一样&#xff0c;但是需要先下载ssh和putty工具&#xff0c;然后操作步骤是一样的 2.生成ssh公私钥对 # 在本地重新生成SSH公私钥对非常简单&#xff0c;在你的命令行终端&a…

DeepMind终结大模型幻觉?标注事实比人类靠谱、还便宜20倍,全开源

ChatGPT狂飙160天&#xff0c;世界已经不是之前的样子。 新建了人工智能中文站https://ai.weoknow.com 每天给大家更新可用的国内可用chatGPT资源​ 发布在https://it.weoknow.com 更多资源欢迎关注 ​ DeepMind 这篇论文一出&#xff0c;人类标注者的饭碗也要被砸了吗&a…

2.3 Mac OS安装Python环境

Mac OS安装Python环境 和 Linux 发行版类似&#xff0c;最新版的 Mac OS X 也会默认自带 Python 2.x。 我们可以在终端&#xff08;Terminal&#xff09;窗口中输入python命令来检测是否安装了 Python 开发环境&#xff0c;以及安装了哪个版本&#xff0c;如下所示&#xff1…

探索生成式AI Agent,让公众自动化触手可及

在科技浪潮的推动下&#xff0c;AI Agent市场正经历深刻变革。Kognitos智能RPA厂商凭借675万美元融资和生成式AI自动化的定位&#xff0c;吸引业界关注。然而&#xff0c;微软早已将ChatGPT融入Power Platform&#xff0c;提供低代码应用开发体验&#xff0c;引领市场。初创公司…

小白入门级教程:R语言lavaan结构方程模型(SEM)

查看原文>>>最新基于R语言lavaan结构方程模型&#xff08;SEM&#xff09;实践技术应用 目录 专题一&#xff1a;R/Rstudio简介及入门 专题二&#xff1a;结构方程模型&#xff08;SEM&#xff09;介绍 专题三&#xff1a; lavaan包讲解及应用案例 专题四&#x…

常用类(String)

目录 字符串相关的类1.1、String类的概述1.2、理解String的不可变性1.3、String不同实例化方式的对比1.4、String不同拼接操作的对比1.4.1、String使用陷阱 1.5、String的常用方法1.6、String与基本数据类型、包装类、char[]、byte[]的转换1.7、StringBuffer和StringBuilder的介…

衰老抑制剂原知因起源金NMN热销,“海弗里克极限”将被打破?

美国著名生物学家列奥纳多 海弗里克 , 在 1961 年研究人类胎儿的细胞群体分裂次数时提出了著名的 " 海弗里克极限 " 理论。该理论认为 , 正常细胞分裂的周期是 2-3 年 , 分裂次数大概是 50 次 , 得出人类的极限寿命高达 150 岁。半个世纪后 , 世界上最长寿的人 , 打…

文献速递:文献速递:基于SAM的医学图像分割--SAM-Med3D

Title 题目 SAM-Med3D 01 文献速递介绍 医学图像分析已成为现代医疗保健不可或缺的基石&#xff0c;辅助诊断、治疗计划和进一步的医学研究]。在这一领域中最重要的挑战之一是精确分割体积医学图像。尽管众多方法在一系列目标上展现了值得称赞的有效性&#xff0c;但现有的…

3月份的倒数第二个周末有感

坐在图书馆的那一刻&#xff0c;忽然感觉时间的节奏开始放缓。今天周末因为我们两都有任务需要完成&#xff0c;所以就选了嘉定图书馆&#xff0c;不得不说嘉定新城远香湖附近的图书馆真的很有感觉。然我不经意回想起学校的时光&#xff0c;那是多么美好且短暂的时光。凝视着窗…

创建多节点 k8s 集群

主机IP系统master192.168.2.15ubuntu20.04 x64 2C 4GWorker1192.168.2.16ubuntu20.04 x64 2C 4GWorker1192.168.2.18ubuntu20.04 x64 2C 4G 使用 iterm2 连接四台服务器 command shift i 同时操作 初始化配置 关闭防火墙 systemctl stop firewalld systemctl disable firewa…

Pixelmator Pro:专业级图像编辑,触手可及mac版

Pixelmator Pro是一款功能强大的图像编辑软件&#xff0c;专为Mac操作系统设计。它拥有直观的界面和丰富的工具&#xff0c;能够满足用户各种图像处理需求。 Pixelmator Pro软件获取 首先&#xff0c;Pixelmator Pro支持多种文件格式&#xff0c;包括JPEG、PNG、GIF、BMP、TIF…

springcloud微服务项目,通过gateway+nacos实现灰度发布(系统不停机升级)

一、背景 灰度发布的目的是保证系统的高可用&#xff0c;不停机&#xff0c;提升用户体验。在微服务系统中&#xff0c;原有系统不下线&#xff0c;新版系统与原有系统同时在线&#xff0c;通过访问权重在线实时配置&#xff0c;可以让少量用户先应用新版本功能&#xff0c;如…

2024软件设计师备考讲义——(8)

操作系统 〇、操作系统概述 OS作用、OS特征、OS分类 作用&#xff1a;提高计算机效率&#xff0c;人机交互友好特征&#xff1a;并发性、共享性、虚拟性、不确定性分类&#xff1a;批处理、分时、实时、网络、分布式、微机嵌入式操作系统&#xff1a;微型化、可定制、实时性、可…

Nuxt(组件-基础使用)

1.根目录下新建compoents目录&#xff0c;必须是这个名字 2.封装组件 示例代码如下&#xff08;Header.vue&#xff09;&#xff1a; <template><div><NuxtLink to"/"> 首页 </NuxtLink><NuxtLink to"/about"> 关于 </…

创龙教仪基于瑞芯微3568的ARM Cortex A-55教学实验箱 适用于人工智能 传感器 物联网等领域

适用课程 Cortex-A55 ARM嵌入式实验箱主要用于《ARM 系统开发》、《ARM 应用开发》《物联网通信技术》、《嵌入式系统设计》、《移动互联网技术》、《无线传感器网络》、《物联网设计方法与应用》、《人工智能》等课程。 适用专业 Cortex-A55 ARM嵌入式实验箱主要面向电子信…

20232831 2023-2024-2 《网络攻防实践》第4次作业

目录 20232831 2023-2024-2 《网络攻防实践》第4次作业1.实验内容2.实验过程&#xff08;1&#xff09;ARP缓存欺骗攻击&#xff08;2&#xff09;ICMP重定向攻击&#xff08;3&#xff09;SYN Flood攻击&#xff08;4&#xff09;TCP RST攻击&#xff08;5&#xff09;TCP会话…

vue3 渲染一个后端返回的图片字段渲染、table表格内放置图片

一、后端直接返回图片url 当图片字段接口直接返回的是图片url&#xff0c;可以直接放到img标签上 <img v-if"thumbLoader" class"r-image-loader-thumb" :src"resUrl" /> 二、当图片字段接口直接返回的是图片Id 那么就需要去拼一下图片…

正则表达式 vs. 字符串处理:解析优势与劣势

title: 正则表达式 vs. 字符串处理&#xff1a;解析优势与劣势 date: 2024/3/27 15:58:40 updated: 2024/3/27 15:58:40 tags: 正则起源正则原理模式匹配优劣分析文本处理性能比较编程应用 1. 正则表达式起源与演变 正则表达式&#xff08;Regular Expression&#xff09;最早…

【媒体邀约】选择媒体公关公司邀约媒体有哪些优势

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 选择媒体公关公司邀约媒体具有以下优势&#xff1a; 丰富的媒体资源&#xff1a;媒体公关公司通常与各大主流媒体、行业媒体、网络媒体等有着长期合作关系&#xff0c;拥有丰富的媒体资…

鸿蒙OS开发实例:【工具类封装-页面路由】

import common from ohos.app.ability.common; import router from ohos.router 封装app内的页面之间跳转、app与app之间的跳转工具类 【使用要求】 DevEco Studio 3.1.1 Release api 9 【使用示例】 import MyRouterUtil from ../common/utils/MyRouterUtil MyRouterUtil…