StreamTask数据流:StreamTask能力概述、Flink处理网络数据逻辑

news/2024/7/27 8:45:48/文章来源:https://blog.csdn.net/hiliang521/article/details/136485967

文章目录

    • 一. StreamTask核心组件与能力
    • 二. OneInputStreamTask接入网络数据并处理
    • 三. 处理数据
      • 1. StreamElement类别
      • 2. 业务数据处理逻辑
    • 四. 小结

先来看数据是如何经过网络写入下游Task节点并通过算子进行处理的,这里以OneInputStreamTask为例进行说明。

一. StreamTask核心组件与能力

如代码OneInputStreamTask.init()方法包含了初始化StreamTask主要核心组件的逻辑。

OneInputStreamTask
public void init() throws Exception {StreamConfig configuration = getConfiguration();int numberOfInputs = configuration.getNumberOfInputs();if (numberOfInputs > 0) {// 创建CheckpointedInputGateCheckpointedInputGate inputGate = createCheckpointedInputGate();TaskIOMetricGroup taskIOMetricGroup = getEnvironment().getMetricGroup().getIOMetricGroup();taskIOMetricGroup.gauge("checkpointAlignmentTime", inputGate::getAlignmentDurationNanos);// 创建DataOutput组件DataOutput<IN> output = createDataOutput();StreamTaskInput<IN> input = createTaskInput(inputGate, output);// 创建StreamOneInputProcessorinputProcessor = new StreamOneInputProcessor<>(input,output,getCheckpointLock(),operatorChain);}headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
  1. 创建CheckpointedInputGate:CheckpointedInputGate是对InputGate进行封装,实现对CheckpointBarrier对齐的功能。此组件可以接入上游Task实例写入指定InputChannel中的Buffer数据。
  2. 创建DataOutput组件:在StreamTaskInput中会将 接入的数据 通过DataOutput组件输出到算子链的HeaderOperator中。
  3. 创建StreamTaskInput组件:用于接收数据,将InputGate和DataOutput作为内部成员,完成对数据的接入和输出。
  4. 创建StreamOneInputProcessor数据处理器:此组件会被Task线程模型调度并执行,实现周期性地从StreamTaskInput组件中读取数据元素并处理。

小结:

OneInputStreamTask初始化过程中,包括创建StreamTaskInput和DataOutput组件。

 
 
接下来了解StreamTask如何利用StreamTaskInput和DataOutput完成数据元素的接收并发送到算子链中进行处理。

二. OneInputStreamTask接入网络数据并处理

StreamTask.processInput()方法定义了处理数据的主要流程。

  1. 数据最终会通过MailboxProcessor调度与执行
  2. 调用StreamOneInputProcessor.processInput()方法完成数据元素的获取和处理
  3. 调度StreamOneInputProcessor组件,串联并运行StreamTaskInput组件、DataOutput组件和OperatorChain组件,最终完成数据元素的处理操作。
StreamTask.processInput()
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {InputStatus status = inputProcessor.processInput();// 上游如果还有数据,则继续等待执行if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {return;}// 上游如果没有数据,则发送控制消息到控制器if (status == InputStatus.END_OF_INPUT) {controller.allActionsCompleted();return;}CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();jointFuture.thenRun(suspendedDefaultAction::resume);
}

 
接下来详细看StreamOneInputProcessor.processInput()

emitNext():通过StreamTaskNetworkInput接收数据元素,并返回InputStatus判断数据元素是否全部消费完毕。emitNext()会将DataOutput作为参数传递到方法内部,用于将数据元素输出到算子链中

public InputStatus processInput() throws Exception {InputStatus status = input.emitNext(output);if (status == InputStatus.END_OF_INPUT) {synchronized (lock) {operatorChain.endHeadOperatorInput(1);}}return status;
}

StreamTaskNetworkInput.emitNext():处理数据逻辑。


//BufferOrEvent代表数据元素可以是Buffer类型,也可以是事件类型,
//比如CheckpointBarrier、TaskEvent等事件。public InputStatus emitNext(DataOutput<T> output) throws Exception {while (true) {// 从Deserializer中获取数据元素if (currentRecordDeserializer != null) {DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);// 如果DeserializationResult对应的Buffer数据已经被消费,则回收Buffer if (result.isBufferConsumed()) {currentRecordDeserializer.getCurrentBuffer().recycleBuffer();currentRecordDeserializer = null;}// 如果result是完整的数据元素,则调用processElement()方法进行处理if (result.isFullRecord()) {processElement(deserializationDelegate.getInstance(), output);return InputStatus.MORE_AVAILABLE;}}// 从checkpointedInputGate中拉取数据//如果bufferOrEvent为空,则判断checkpointedInputGate是否已经关闭,如果已经关闭了则直接返回END_OF_INPUT状态,否则返回NOTHING_AVAILABLE状态。Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();// 如果有数据则调用processBufferOrEvent()方法进行处理if (bufferOrEvent.isPresent()) {processBufferOrEvent(bufferOrEvent.get());} else {// 如果checkpointedInputGate已关闭,则返回END_OF_INPUTif (checkpointedInputGate.isFinished()) {checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");if (!checkpointedInputGate.isEmpty()) {throw new IllegalStateException("Trailing data in checkpoint barrier handler.");}return InputStatus.END_OF_INPUT;}return InputStatus.NOTHING_AVAILABLE;}}
}

 

三. 处理数据

1. StreamElement类别

StreamElement具体类别有StreamRecord、StreamStatus以及Watermark,其中StreamRecord就是需要处理的业务数据,Watermark则是上游传递下来的Watermark事件。

//StreamTaskNetworkInput.processElement()
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {// StreamRecord类型if (recordOrMark.isRecord()){output.emitRecord(recordOrMark.asRecord());// Watermark类型} else if (recordOrMark.isWatermark()) {statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);// LatencyMarker类型} else if (recordOrMark.isLatencyMarker()) {output.emitLatencyMarker(recordOrMark.asLatencyMarker());// StreamStatus类型} else if (recordOrMark.isStreamStatus()) {statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);} else {throw new UnsupportedOperationException("Unknown type of StreamElement");}
}

 

2. 业务数据处理逻辑

对于业务数据,调用output.emitRecord(recordOrMark.asRecord())方法进行数据元素的输出操作,然后通过DataOutput输出到算子链中进行处理。

如下方法调用operator处理,实际就是在创建StreamTaskNetworkOutput时指定的算子链HeaderOperator

OneInputStreamTask.StreamTaskNetworkOutput.emitRecord()
public void emitRecord(StreamRecord<IN> record) throws Exception {synchronized (lock) {//累加器计算消费数量numRecordsIn.inc();//通过算子链处理operator.setKeyContextElement1(record);operator.processElement(record);}
}

 

四. 小结

Flink从InputGate中拉取数据元素并进行反序列化操作,转换成StreamElement类型后,再调用StreamTaskNetworkOutput.emitRecord()方法将数据元素推送到OperatorChain的HeaderOperator中进行处理。

 
 
《Flink设计与实现:核心原理与源码解析》 – 张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_999672.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Lesson 6 Convolutional Neural Network(CNN)

听课&#xff08;李宏毅老师的&#xff09;笔记&#xff0c;方便梳理框架&#xff0c;以作复习之用。本节课主要讲了CNN的适用范围&#xff0c;整体架构与工作流程&#xff0c;CNN的应用&#xff0c;CNN的缺点以及解决方法。 1. CNN的输入与输出 CNN是专门为了图像而设计的一…

【学习笔记】开源计算机视觉库OPENCV学习方案

本文中&#xff0c;我们试图提供一些学习OpenCV的详细和实用资源&#xff0c;这些资源包括基础知识、进阶技巧以及实践项目&#xff0c;旨在帮助初学者和进阶学习者更好地掌握和使用OpenCV库。 一、学习资源 官方文档&#xff1a;OpenCV的官方文档是学习OpenCV的最佳起点。它包…

OpenHarmony教程指南-自定义通知推送

介绍 本示例主要展示了通知过滤回调管理的功能&#xff0c;使用ohos.notificationManager 接口&#xff0c;进行通知监听回调&#xff0c;决定应用通知是否发送。 效果预览 使用说明 1.在使用本应用时&#xff0c;需安装自定义通知角标应用&#xff1b; 2.在主界面&#xff…

⭐每天一道leetcode:67.二进制求和(简单;模拟过程)

⭐今日份题目 给你两个二进制字符串 a 和 b &#xff0c;以二进制字符串的形式返回它们的和。 示例1 输入:a "11", b "1" 输出&#xff1a;"100" 示例2 输入&#xff1a;a "1010", b "1011" 输出&#xff1a;"…

【❤️算法笔记❤️】-每日一刷-19、删除链表的倒数第 N个结点

文章目录 题目思路解答 题目 给你一个链表&#xff0c;删除链表的倒数第 n 个结点&#xff0c;并且返回链表的头结点。 输入&#xff1a;head [1,2,3,4,5], n 2 输出&#xff1a;[1,2,3,5]示例 2&#xff1a; 输入&#xff1a;head [1], n 1 输出&#xff1a;[]示例 3&…

【数据分享】2000-2022年全国1km分辨率的逐年PM2.5栅格数据(免费获取)

PM2.5作为最主要的空气质量指标&#xff0c;在我们日常研究中非常常用&#xff01;之前我们给大家分享了2013-2022年全国范围逐日的PM2.5栅格数据&#xff08;可查看之前的文章获悉详情&#xff09;&#xff01; 本次我们给大家带来的是2000-2022年全国范围的逐年的PM2.5栅格数…

ubuntu 卸载miniconda3

一开始安装路径错了&#xff0c;需要重新安一次&#xff0c;就一起记录了。 前提是这种方式安装&#xff1a; ubuntu安装miniconda3管理python版本-CSDN博客 删除Miniconda的安装目录 这目录就是你选择安装的时候指定的&#xff0c;如果记不得了,可以这样查看 which conda 这…

07-prometheus的自定义监控-pushgateway工具组件

一、概述 pushgateway用于自定义监控节点、节点中服务的工具&#xff0c;用户可以通过自定义的命令获取数据&#xff0c;并将数据推送给pushgateway中&#xff1b; prometheus服务&#xff0c;从pushgateway中获取监控数据&#xff1b; 二、部署pushgateway 我们可以“随便”找…

Python笔记(四)—— Python函数

4.1 函数的初体验 函数 函数&#xff1a;是组织好的&#xff0c;可重复使用的&#xff0c;用来实现特定功能的代码段 name "itheima" length len(name) print(length) 运行结果&#xff1a; 思考&#xff1a;为什么随时都可以使用len()统计长度 因为&#xff…

数据结构(一)综述

一、常见的数据结构 数据结构优点缺点数组查找快增删慢链表增删快查找慢哈希表增删、查找都快数据散列&#xff0c;对存储空间有浪费栈顶部元素插入和取出快除顶部元素外&#xff0c;存取其他元素都很慢队列顶部元素取出和尾部元素插入快存取其他元素都很慢二叉树增删、查找都快…

CRLF漏洞

CRLF 注入漏洞&#xff0c;是因为 Web 应用没有对用户输入做严格验证&#xff0c;导致攻击者可以输入一些恶意字符。攻击者一旦向请求行或首部中的字段注入恶意的 CRLF&#xff0c;就能注入一些首部字段或报文主体&#xff0c;并在响应中输出&#xff0c;所以又称为 HTTP 响应拆…

深入了解304缓存原理:提升网站性能与加载速度

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

自动驾驶革命:解密端到端背后的数据、算力和AI奇迹

作者 |毫末智行数据智能科学家 贺翔 编辑 |祥威 最近&#xff0c;特斯拉FSD V12的发布引发了业界对端到端自动驾驶的热议&#xff0c;业界纷纷猜测FSD V12的强大能力是如何训练出来的。从马斯克的测试视频可以大致归纳一下FSD V12系统的一些核心特征&#xff1a; 训练数据&am…

DualSPHysics使用FlowTool工具进行后处理,定义的粒子全在domains外,解决办法

可以知道DualSPHysics官方给了后处理工具使用的示例&#xff0c;如下就是官方给的案例&#xff0c;使用FlowTool工具可以计算出在两个实体domain内的粒子数。 然而我自己也定义了2个domains&#xff0c;但是计算出来Tank1和Tank2里边的粒子数一直是空的&#xff0c;粒子全部在…

树莓派4B Ubuntu20.04 Python3.9安装ROS踩坑记录

问题描述 在使用sudo apt-get update命令更新时发现无法引入apt-pkg,使用python3 -c "import apt_pkg"发现无法引入&#xff0c;应该是因为&#xff1a;20.04的系统默认python是3.8&#xff0c;但是我换成了3.9所以没有编译文件&#xff0c;于是使用sudo update-alte…

ArcGIS学习(十三)多源数据下的城市街道功能评估

ArcGIS学习(十三)多源数据下的城市街道功能评估 本任务带来的内容是多元数据下的城市街道功能评估。本任务包括两个关卡: 城市街道空间中观解读 城市街道功能详细评价 首先,我们来看看本任务的分析思路。 1.城市街道空间中观解读 下面我们正式进入第一关的内容一- 城市…

计算机网络——24路由器组成

路由器组成 路由器的结构概况 高层面(非常简化的)通用路由器体系架构 路由&#xff1a;运行路由选择算法&#xff0f;协议 (RIP, OSPF, BGP) - 生成 路由表转发&#xff1a;从输入到输出链路交换数据报 - 根据路由表进行分组的转发 输入端口功能 分布式交换&#xff1a; 根…

概要了解postman、jmeter 、loadRunner

postman还蛮好理解的&#xff0c;后续复习的话着重学习关联接口测试即可&#xff0c;感觉只要用几次就会记住&#xff1a; 1 从接口的响应结果当中提取需要的数据 2 设置成环境变量/全局变量&#xff08;json value check 、set environment para 3写入到下一个接口的请求数据中…

Winform窗体随着屏幕的DPI缩放,会引起窗体变形及字体变形,superTabControl标签字体大小不匹配

一、前言 superTabControl做的浏览器标签(cefsharp)在缩放比例(125%,150%时字体不协调) 物联网浏览器,定制浏览器,多媒体浏览器(支持H264)参考栏目文章即可 二、配置参数 app.manifest参数 dpiAware =true <application xmlns="urn:schemas-microsoft-c…

消息队列及发布订阅

概述 消息的发布、订阅是应用进程进行数据交换的常用方式&#xff0c;对于进程没有角色限制&#xff0c;既可以扮演发布也可以扮演订阅角色&#xff1b;该模式是进程间通信的异步模式。 mq_send函数说明 函数 mq_send() 会将参数 msg_ptr 指向的内容发送给参数mqdes 指向的消…