Flink ExecuteGraph构建源码解析

news/2024/7/27 7:58:13/文章来源:https://blog.csdn.net/gwc791224/article/details/136538387

文章目录

  • 前言
  • ExecutionGraph中的主要抽象概念
  • 源码核心代码入口
  • 源码核心流程:


前言

在这里插入图片描述

JobGraph构建过程中分析了JobGraph的构建过程,本文分析ExecutionGraph的构建过程。JobManager(JobMaster) 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph是JobGraph 的并行化版本,是调度层最核心的数据结构。


ExecutionGraph中的主要抽象概念

1、ExecutionJobVertex:和JobGraph中的JobVertex一一对应。每一个ExecutionJobVertex都有
和并发度一样多的 ExecutionVertex。
2、ExecutionVertex:表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输
出是IntermediateResultPartition。
3、IntermediateResult:和JobGraph中的IntermediateDataSet一一对应。一个
IntermediateResult包含多个IntermediateResultPartition,其个数等于该operator的并发度。
4、IntermediateResultPartition:表示ExecutionVertex的一个输出分区,producer是
ExecutionVertex,consumer是若干个ExecutionEdge。
5、ExecutionEdge:表示ExecutionVertex的输入,source是IntermediateResultPartition,
target是ExecutionVertex。source和target都只能是一个。
6、Execution:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下
ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过
ExecutionAttemptID 来唯一标识。JM和TM之间关于 task 的部署和 task status 的更新都是通过
ExecutionAttemptID 来确定消息接受者。

源码核心代码入口

ExecutionGraph executioinGraph = SchedulerBase.createAndRestoreExecutionGraph(completedCheckpointStore,checkpointsCleaner,checkpointIdCounter,initializationTimestamp,mainThreadExecutor,jobStatusListener,vertexParallelismStore);

在 SchedulerBase 这个类的内部,有两个成员变量:一个是 JobGraph,一个是 ExecutioinGraph
在创建 SchedulerBase 的子类:DefaultScheduler 的实例对象的时候,会在 SchedulerBase 的构造
方法中去生成 ExecutionGraph。

源码核心流程:

DefaultExecutionGraphFactory.createAndRestoreExecutionGraph()
ExecutionGraph newExecutionGraph = createExecutionGraph(...)
DefaultExecutionGraphBuilder.buildGraph(jobGraph, ....)
// 创建 ExecutionGraph 对象
executionGraph = (prior != null) ? prior : new ExecutionGraph(...)
// 生成 JobGraph 的 JSON 表达形式
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
// 重点,从 JobGraph 构建 ExecutionGraph
executionGraph.attachJobGraph(sortedTopology);
// 遍历 JobVertex 执行并行化生成 ExecutioinVertex
for(JobVertex jobVertex : topologiallySorted) {// 每一个 JobVertex 对应到一个 ExecutionJobVertexExecutionJobVertex ejv = new ExecutionJobVertex(jobGraph,jobVertex);ejv.connectToPredecessors(this.intermediateResults);List<JobEdge> inputs = jobVertex.getInputs();for(int num = 0; num < inputs.size(); num++) {JobEdge edge = inputs.get(num);IntermediateResult ires =intermediateDataSets.get(edgeID);this.inputs.add(ires);// 根据并行度来设置 ExecutionVertexfor(int i = 0; i < parallelism; i++) {ExecutionVertex ev = taskVertices[i];ev.connectSource(num, ires, edge,consumerIndex);}}
}

DefaultExecutionGraphBuilder 详细代码如下:

public class DefaultExecutionGraphBuilder {public static DefaultExecutionGraph buildGraph(JobGraph jobGraph,Configuration jobManagerConfig,ScheduledExecutorService futureExecutor,Executor ioExecutor)  {final String jobName = jobGraph.getName();final JobID jobId = jobGraph.getJobID();final JobInformation jobInformation = new JobInformation(... );// create a new execution graph, if none exists so farfinal DefaultExecutionGraph executionGraph;executionGraph = new DefaultExecutionGraph( ....);// set the basic propertiesexecutionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));// initialize the vertices that have a master initialization hook// file output formats create directories here, input formats create splitsfor (JobVertex vertex : jobGraph.getVertices()) {String executableClass = vertex.getInvokableClassName();vertex.initializeOnMaster(new SimpleInitializeOnMasterContext(classLoader,vertexParallelismStore.getParallelismInfo(vertex.getID()).getParallelism()));}// topologically sort the job vertices and attach the graph to the existing oneList<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();executionGraph.attachJobGraph(sortedTopology);// configure the state checkpointingif (isDynamicGraph) {// dynamic graph does not support checkpointing so we skip itlog.warn("Skip setting up checkpointing for a job with dynamic graph.");} else if (isCheckpointingEnabled(jobGraph)) {JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();// load the state backend from the application settingsfinal StateBackend applicationConfiguredBackend;final SerializedValue<StateBackend> serializedAppConfigured =snapshotSettings.getDefaultStateBackend();if (serializedAppConfigured == null) {applicationConfiguredBackend = null;} else {try {applicationConfiguredBackend =serializedAppConfigured.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId, "Could not deserialize application-defined state backend.", e);}}final StateBackend rootBackend =StateBackendLoader.fromApplicationOrConfigOrDefault(applicationConfiguredBackend,snapshotSettings.isChangelogStateBackendEnabled(),jobManagerConfig,classLoader,log);// load the checkpoint storage from the application settingsfinal CheckpointStorage applicationConfiguredStorage;final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =snapshotSettings.getDefaultCheckpointStorage();if (serializedAppConfiguredStorage == null) {applicationConfiguredStorage = null;} else {applicationConfiguredStorage =                           serializedAppConfiguredStorage.deserializeValue(classLoader);final CheckpointStorage rootStorage;try {rootStorage =CheckpointStorageLoader.load(applicationConfiguredStorage,null,rootBackend,jobManagerConfig,classLoader,log);} catch (IllegalConfigurationException | DynamicCodeLoadingException e) {throw new JobExecutionException(jobId, "Could not instantiate configured checkpoint storage", e);}// instantiate the user-defined checkpoint hooksfinal SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =snapshotSettings.getMasterHooks();final List<MasterTriggerRestoreHook<?>> hooks;if (serializedHooks == null) {hooks = Collections.emptyList();} else {final MasterTriggerRestoreHook.Factory[] hookFactories;try {hookFactories = serializedHooks.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);}final Thread thread = Thread.currentThread();final ClassLoader originalClassLoader = thread.getContextClassLoader();thread.setContextClassLoader(classLoader);try {hooks = new ArrayList<>(hookFactories.length);for (MasterTriggerRestoreHook.Factory factory : hookFactories) {hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));}} finally {thread.setContextClassLoader(originalClassLoader);}}final CheckpointCoordinatorConfiguration chkConfig =snapshotSettings.getCheckpointCoordinatorConfiguration();String changelogStorage = jobManagerConfig.getString(STATE_CHANGE_LOG_STORAGE);executionGraph.enableCheckpointing(chkConfig,hooks,checkpointIdCounter,completedCheckpointStore,rootBackend,rootStorage,checkpointStatsTrackerFactory.get(),checkpointsCleaner,jobManagerConfig.getString(STATE_CHANGE_LOG_STORAGE));}return executionGraph;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_997512.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++前置声明的学习

【C】C中前置声明的应用与陷阱_前置生命如何使用-CSDN博客 首先&#xff0c;这样写会报错&#xff1a; #pragma once #include "A.h" class B {A a; public:B(void);~B(void); };#include "B.h" B::B(void) { }B::~B(void) { } #pragma once #include &…

URL?后参数有特殊字符问题

前端对于URL的参数不做处理 不处理、用URLDecoder.decode()处理、用URLEncoder.encode()处理、用URLEncoder.encode()处理后再用URLDecoder.decode()处理 结果 前端对于URL的参数用encodeURIComponent(‘XF-OPPZZD-26*316’)处理 结果 前端不处理有&字符时 结果会把后…

前端网络请求异步处理——Promise使用记录

Promise是ES6中新增的一个处理复杂异步请求的工具&#xff0c;其主要形式为&#xff1a; const baseUrl http://localhost:80 export const $request (param {}) > {console.log(请求参数, param)return new Promise((resolve, reject) > {wx.request({url: baseUrl …

海外服务器被DDOS攻击了该怎么办

在当今全球化的时代&#xff0c;越来越多的企业和组织选择将业务拓展至海外市场。然而&#xff0c;随着业务的扩大和网络的延伸&#xff0c;也面临着来自不同地区的网络威胁和攻击风险。如果您的海外服务器遭受了DDOS攻击&#xff0c;以下是一些应对措施&#xff1a; 一、立即断…

【Redis】Redis的应用场景

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;Redis ⛺️稳中求进&#xff0c;晒太阳 Redis的应用场景&#xff1a; 限流 要求10s内只能访问一次 RequestMapping("xian")public String xianLiu(String sign){String sign1 …

力扣刷题

文章目录 1. 双指针1.1 两数之和1.2 三数之和1.3 盛最多水的容器1.4 接雨水 2. 字串2.1 滑动窗口最大值 3. 动态规划4. 多维动态规划4.1 最长回文字串 1. 双指针 1.1 两数之和 思路&#xff1a;因为是有序数组&#xff0c; 1.2 三数之和 题目要求不能重复 思路&#xff1a;三…

简明固体物理--晶体的形成与晶体结构的描述

简明固体物理-国防科技大学 chapter 1 Formation of Crystal Contents and roadmapQuantum Mechanics and atomic structureElectronsOld quantum theoryMethod of Quantum MechanicsDistributing functions of micro-particles BindingCrystal structure and typical crystal…

YOLOv9(2):YOLOv9网络结构

1. 前言 本文仅以官方提供的yolov9.yaml来进行简要讲解。 讲解之前&#xff0c;还是要做一些简单的铺垫。 Slice层不做任何的操作&#xff0c;纯粹是做一个占位层。这样一来&#xff0c;在parse_model时&#xff0c;ch[n]可表示第n层的输出通道。 Detect和DDetect主要区别还…

Java开发从入门到精通(一):Java的基础语法进阶

Java大数据开发和安全开发 &#xff08;一&#xff09;Java注释符1.1 单行注释 //1.2 多行注释 /* */1.3 文档注释 /** */1.4 各种注释区别1.5 注释的特点1.5 注释的快捷键 &#xff08;二&#xff09;Java的字面量&#xff08;三&#xff09;Java的变量3.1 认识变量3.2 为什么…

例行性工作(at,crontab)

目录 单一执行的例行性工作at 语法 选项 时间格式 at的工作文件存放目录 at工作的日志文件 实例 命令总结&#xff1a; 循环执行的例行性工作crond 语法 选项 crontab工作调度对应的系统服务 crontab工作的日志文件 用户定义计划任务的文件所在目录 动态查看 crontab文件格式 文…

集合拆分Lists.partition的使用

集合拆分Lists.partition的使用 集合拆分Lists.partition的使用 需要的包 import com.google.common.collect.Lists;引入maven依赖 <dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>21.0</…

真Unity-Editor二次开发-ScriptableObject 可自定义UI界面

关于ScriptablObject自定义 作为官方指定的&#xff0c;曾经我也吐槽过ScriptableObject很鸡肋&#xff0c;个人曾经也是强烈反对在项目中使用&#xff0c;但直到我今天看到下面这个代码&#xff0c;菜发现其实只是自己太菜鸡而已 --------------不想多写什么 -------------…

Rust组织管理,箱Crate包Package和模块module定义和区别,use关键字作用

Rust 组织管理 任何一门编程语言如果不能组织代码都是难以深入的&#xff0c;几乎没有一个软件产品是由一个源文件编译而成的。 本教程到目前为止所有的程序都是在一个文件中编写的&#xff0c;主要是为了方便学习 Rust 语言的语法和概念。 对于一个工程来讲&#xff0c;组织…

NineData与OceanBase完成产品兼容认证,共筑企业级数据库新生态

近日&#xff0c;云原生智能数据管理平台 NineData 和北京奥星贝斯科技有限公司的 OceanBase 数据库完成产品兼容互认证。经过严格的联合测试&#xff0c;双方软件完全相互兼容、功能完善、整体运行稳定且性能表现优异。 此次 NineData 与 OceanBase 完成产品兼容认证&#xf…

Manz高压清洗机S11-028GCH-High Quality Cleaner 操作使用说明492页

Manz高压清洗机S11-028GCH-High Quality Cleaner 操作使用说明492页

理解 JSON 和 Form-data 的区别

在讨论现代网络开发与API设计的语境下&#xff0c;理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里&#xff0c;特别值得关注的是两种主流数据格式&#xff1a;JSON与Form-data。尽管它们的终极目标一致&#xff0c;即数据传输的高效性和可靠性&#xff0c;但…

wps没保存关闭了怎么恢复数据?数据恢复这样做

WPS文件已成为我们不可或缺的一部分。从撰写报告、制作表格到展示演讲&#xff0c;WPS系列软件为我们提供了极大的便利。然而正如任何电子设备都可能遇到的问题一样&#xff0c;WPS文件有时也可能出现损坏的情况&#xff0c;这无疑给我们的工作带来了不小的困扰。 那么当WPS文件…

RPC——远程过程调用

一、RPC介绍 1.1 概述 RPC&#xff08;Remote Procedure Call Protocol&#xff09; 远程过程调用协议。RPC是一种通过网络从远程计算机程序上请求服务&#xff0c;不需要了解底层网络技术的协议。RPC主要作用就是不同的服务间方法调用就像本地调用一样便捷。 1.2 RPC框架 …

【项目笔记】java微服务:黑马头条(day02)

文章目录 app端文章查看&#xff0c;静态化freemarker,分布式文件系统minIO1)文章列表加载1.1)需求分析1.2)表结构分析1.3)导入文章数据库1.3.1)导入数据库1.3.2)导入对应的实体类 1.4)实现思路1.5)接口定义1.6)功能实现1.6.1)&#xff1a;导入heima-leadnews-article微服务&am…

JEDEC标准介绍及JESD22全套下载

JEDEC标准 作为半导体相关的行业的从业者&#xff0c;或多或少会接触到JEDEC标准。标准对硬件系统的设计、应用、验证&#xff0c;调试等有着至关重要的作用。 JEDEC&#xff08;全称为 Joint Electron Device Engineering Council&#xff09;是一个电子组件工程标准制定组织…