【Flink系列七】TableAPI和FlinkSQL初体验

news/2024/5/20 1:55:05/文章来源:https://blog.csdn.net/qq_33592535/article/details/134960513

Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL

  • Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。
  •  Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入是连续的(流式)还是有界的(批处理),在两个接口中指定的查询都具有相同的语义,并指定相同的结果。

基本程序结构

import org.apache.flink.table.api.*;
import org.apache.flink.connector.datagen.table.DataGenOptions;// 可以从流中创建表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取文件
DataStream<String> inputStream = env.readTextFile("xxx文件地址")//包装转换
DataStream<SensorReading> dataStream = xx转换;//创建表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//tableEnv
Table dataTable = tableEnv.fromDataSource(dataStream);Table resultTable = dataTable.select("id, temp")
.where("id=1");// 输出数据
tableEnv.toAppendStream(resultTable, Sensor.class);// register the Table dataTable as table "dataTable"
tableEnv.createTemporaryView("dataTable", dataTable);// 执行sql
Table resultTable1 = tableEnv.sqlQuery("select id,temp from dataTable where is = 1");// 输出数据
tableEnv.toAppendStream(resultTable1, Sensor.class);

TableEnvironment

TableAPI的核心概念是TableEnvironment,它主要负责

  • 在内部的 catalog 中注册 Table
  • 注册外部的 catalog
  • 加载可插拔模块
  • 执行 SQL 查询
  • 注册自定义函数 (scalar、table 或 aggregation)
  • DataStream 和 Table 之间的转换(面向 StreamTableEnvironment )

Table 总是与特定的 TableEnvironment 绑定。 不能在同一条查询中使用不同 TableEnvironment 中的表,例如,对它们进行 join 或 union 操作。 TableEnvironment 可以通过静态方法 TableEnvironment.create() 创建。

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode()//.inBatchMode().build();TableEnvironment tEnv = TableEnvironment.create(settings);

或者也可以按程序示例中的方式,从现有的 StreamExecutionEnvironment 创建一个 StreamTableEnvironment 与 DataStream API 互操作。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

在 Catalog 中创建表

---先解释一下什么是Catalog:数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。cataLog详细解释

TableEnvironment 维护着一个由标识符(identifier)创建的表 catalog 的映射。标识符由三个部分组成:catalog 名称、数据库名称以及对象名称。如果 catalog 或者数据库没有指明,就会使用当前默认值

TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");

创建表

在 SQL 的术语中,Table API 的对象对应于视图(虚拟表)。它封装了一个逻辑查询计划。它可以通过以下方法在 catalog 中创建:

// register the Table dataTable as table "dataTable"
tableEnv.createTemporaryView("dataTable", dataTable);// 执行sql
Table resultTable = tableEnv.sqlQuery("select id,temp from dataTable where is = 1");// 输出数据
tableEnv.toAppendStream(resultTable, Sensor.class);

注意: 从传统数据库系统的角度来看,Table 对象与 VIEW 视图非常像。也就是,定义了 Table 的查询是没有被优化的, 而且会被内嵌到另一个引用了这个注册了的 Table的查询中。如果多个查询都引用了同一个注册了的Table,那么它会被内嵌每个查询中并被执行多次, 也就是说注册了的Table的结果不会被共享。

Connector Tables

另外一个方式去创建 TABLE 是通过 connector 声明。Connector 描述了存储表数据的外部系统。存储系统例如 Apache Kafka 或者常规的文件系统都可以通过这种方式来声明。

// Using table descriptors
final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen").schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build()).option(DataGenOptions.ROWS_PER_SECOND, 100).build();tableEnv.createTable("SourceTableA", sourceDescriptor);
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);// Using SQL DDL
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")

扩展表标识符 

表总是通过三元标识符注册,包括 catalog 名、数据库名和表名。

用户可以指定一个 catalog 和数据库作为 “当前catalog” 和"当前数据库"。有了这些,那么刚刚提到的三元标识符的前两个部分就可以被省略了。如果前两部分的标识符没有指定, 那么会使用当前的 catalog 和当前数据库。用户也可以通过 Table API 或 SQL 切换当前的 catalog 和当前的数据库。

标识符遵循 SQL 标准,因此使用时需要用反引号(`)进行转义。

// 默认
TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");// 注册一个view 命名为 'exampleView' 在 catalog 中命名为 'custom_catalog'
// 默认在 'custom_database'的数据库中 
tableEnv.createTemporaryView("exampleView", table);//注册一个view 命名为 'exampleView' 在 catalog 中命名为 'custom_catalog'
// 指定数据库的名称为 'other_database' 
tableEnv.createTemporaryView("other_database.exampleView", table);// 注册一个view 命名为  'example.View', 在 catalog 中命名为'custom_catalog'
// 默认在 'custom_database'的数据库中 
tableEnv.createTemporaryView("`example.View`", table);// 注册view名为 'exampleView', 指定 catalog命名为'other_catalog'
// 指定数据库的名称为  'other_database' 
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);

查询表

tableApi

Table API 是基于 Table 类的,该类表示一个表(流或批处理),并提供使用关系操作的方法。这些方法返回一个新的 Table 对象,该对象表示对输入 Table 进行关系操作的结果。 一些关系操作由多个方法调用组成,例如 table.groupBy(...).select(),其中 groupBy(...) 指定 table 的分组,而 select(...) 在 table 分组上的投影。

所有流处理和批处理表支持的 Table API 算子--> 文档 Table API 

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section// 注册表// 获取order表
Table orders = tableEnv.from("Orders");
// 从所有的数据中过滤 cCountry=France,然后聚合
Table revenue = orders.filter($("cCountry").isEqual("FRANCE")).groupBy($("cID"), $("cName")).select($("cID"), $("cName"), $("revenue").sum().as("revSum"));// emit or convert Table
// execute query

SQL

Flink SQL 是基于实现了SQL标准的 Apache Calcite 的。SQL 查询由常规字符串指定。

Flink对流处理和批处理表的SQL支持-->文档 SQL 

一个示例:

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section// 注册 Orders table// 执行sql处理
Table revenue = tableEnv.sqlQuery("SELECT cID, cName, SUM(revenue) AS revSum " +"FROM Orders " +"WHERE cCountry = 'FRANCE' " +"GROUP BY cID, cName");// 展示了如何指定一个更新查询,将查询的结果插入到已注册的表中。
// 执行sql获取到的数据 and 提交到 "RevenueFrance"表中
tableEnv.executeSql("INSERT INTO RevenueFrance " +"SELECT cID, cName, SUM(revenue) AS revSum " +"FROM Orders " +"WHERE cCountry = 'FRANCE' " +"GROUP BY cID, cName");

输出表

Table 通过写入 TableSink 输出。TableSink 是一个通用接口,用于支持多种文件格式(如 CSV、Apache Parquet、Apache Avro)、存储系统(如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息队列系统(如 Apache Kafka、RabbitMQ)。

批处理 Table 只能写入 BatchTableSink,而流处理 Table 需要指定写入 AppendStreamTableSinkRetractStreamTableSink 或者 UpsertStreamTableSink

演示如何输出 Table

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section// create an output Table
final Schema schema = Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING()).column("c", DataTypes.BIGINT()).build();tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("filesystem").schema(schema).option("path", "/path/to/file").format(FormatDescriptor.forFormat("csv").option("field-delimiter", "|").build()).build());// compute a result Table using Table API operators and/or SQL queries
Table result = ...//方法 Table.executeInsert(String tableName) 将 Table 发送至已注册的 TableSink。该方法通过名称在 catalog 中查找 TableSink 并确认Table schema 和 TableSink schema 一致。
result.executeInsert("CsvSinkTable");

翻译与执行查询 

不论输入数据源是流式的还是批式的,Table API 和 SQL 查询都会被转换成 DataStream 程序。 查询在内部表示为逻辑查询计划,并被翻译成两个阶段:

  1. 优化逻辑执行计划
  2. 翻译成 DataStream 程序

Table API 或者 SQL 查询在下列情况下会被翻译:

  1. 当 T ableEnvironment.executeSql() 被调用时。该方法是用来执行一个 SQL 语句,一旦该方法被调用, SQL 语句立即被翻译。
  2. 当 Table.executeInsert() 被调用时。该方法是用来将一个表的内容插入到目标表中,一旦该方法被调用, TABLE API 程序立即被翻译。
  3. 当 Table.execute() 被调用时。该方法是用来将一个表的内容收集到本地,一旦该方法被调用, TABLE API 程序立即被翻译。
  4. 当 StatementSet.execute() 被调用时。Table (通过 StatementSet.addInsert() 输出给某个 Sink)和 INSERT 语句 (通过调用 StatementSet.addInsertSql())会先被缓存到 StatementSet 中,StatementSet.execute() 方法被调用时,所有的 sink 会被优化成一张有向无环图。
  5. 当 Table 被转换成 DataStream 时(参阅与 DataStream 集成)。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 StreamExecutionEn vironment.execute() 时被执行。

查询优化

Apache Flink 使用并扩展了 Apache Calcite 来执行复杂的查询优化。 这包括一系列基于规则和成本的优化,例如:

  1.  基于 Apache Calcite 的子查询解相关
  2. 投影剪裁
  3. 分区剪裁
  4. 过滤器下推
  5. 子计划消除重复数据以避免重复计算
  6. 特殊子查询重写,包括两部分:
    1. 将 IN 和 EXISTS 转换为 left semi-joins
    2. 将 NOT IN 和 NOT EXISTS 转换为 left anti-join
  7. 可选 join 重新排序
    1. 通过 table. optimizer.join-reorder-enabled 启用

注意: 当前仅在子查询重写的结合条件下支持 IN / EXISTS / NOT IN / NOT EXISTS。

优化器不仅基于计划,而且还基于可从数据源获得的丰富统计信息以及每个算子(例如 io,cpu,网络和内存)的细粒度成本来做出明智的决策。

解释表

Table API 提供了一种机制来解释计算 Table 的逻辑和优化查询计划。 这是通过 Table.explain() 方法或者 StatementSet.explain() 方法来完成的。Table.explain() 返回一个 Table 的计划。StatementSet.explain() 返回多 sink 计划的结果。它返回一个描述三种计划的字符串:

  1. 关系查询的抽象语法树(the Abstract Syntax Tree),即未优化的逻辑查询计划,
  2. 优化的逻辑查询计划,以及
  3. 物理执行计划。

可以用 TableEnvironment.explainSql() 方法和 TableEnvironment.executeSql() 方法支持执行一个 EXPLAIN 语句获取逻辑和优化查询计划,请参阅 EXPLAIN 页面.

以下代码展示了一个示例以及对给定 Table 使用 Table.explain() 方法的相应输出:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));// explain Table API
Table table1 = tEnv.fromDataStream(stream1, $("count"), $("word"));
Table table2 = tEnv.fromDataStream(stream2, $("count"), $("word"));
Table table = table1.where($("word").like("F%")).unionAll(table2);System.out.println(table.explain());

上述例子的结果是:

== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:  +- LogicalTableScan(table=[[Unregistered_DataStream_1]])
+- LogicalTableScan(table=[[Unregistered_DataStream_2]])== Optimized Physical Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])

参考资料:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/common/#expanding-table-identifiers

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_568831.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024年网络安全(黑客)——自学

1.网络安全是什么 网络安全可以基于攻击和防御视角来分类&#xff0c;我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术&#xff0c;而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 2.网络安全市场 一、是市场需求量高&#xff1b; 二、则是发展相对成熟…

深度学习在人体动作识别领域的应用:开源工具、数据集资源及趋动云GPU算力不可或缺

人体动作识别检测是一种通过使用计算机视觉和深度学习技术&#xff0c;对人体姿态和动作进行实时监测和分析的技术。该技术旨在从图像或视频中提取有关人体姿态、动作和行为的信息&#xff0c;以便更深入地识别和理解人的活动。 人体动作识别检测的基本步骤包括&#xff1a; 数…

使用kibana查看es数据

前提 已安装好es还有kibana&#xff0c;启动es及kibana 修改kibana配置文件 在kibana文件中配置es的地址及索引&#xff0c;我的kibana安装在mac端了 修改配置文件 /usr/local/opt/kibana/config/kibana.yml 重启kibana 配置kibana 下面查询数据 例如查询 traceId 为192…

【TES720D-KIT】基于国内某厂商FMQL20S400全国产化ARM开发套件(核心板+底板)

板卡概述 TES720D-KIT是专门针对我司TES720D&#xff08;基于国内某厂商FMQL20S400的全国产化ARM核心板&#xff09;的一套开发套件&#xff0c;它包含1个TES720D核心板&#xff0c;加上一个TES720D-EXT扩展底板。 FMQL20S400是国内某厂商电子研制的全可编程融合芯片&#xf…

案例044:基于微信小程序的消防隐患在线举报系统

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

Python将已标注的两张图片进行上下拼接并修改、合并其对应的Labelme标注文件(v2.0)

Python将已标注的两张图片进行上下拼接并修改、合并其对应的Labelme标注文件&#xff08;v2.0&#xff09; 前言前提条件相关介绍实验环境上下拼接图片并修改、合并其对应的Labelme标注文件代码实现输出结果 前言 此版代码&#xff0c;相较于Python将已标注的两张图片进行上下拼…

如何使用ArcGIS Pro裁剪影像

对影像进行裁剪是一项比较常规的操作&#xff0c;因为到手的影像可能是多种范围&#xff0c;需要根据自己需求进行裁剪&#xff0c;这里为大家介绍一下ArcGIS Pro中裁剪的方法&#xff0c;希望能对你有所帮助。 数据来源 本教程所使用的数据是从水经微图中下载的影像和行政区…

c语言:指针与数组

目录 使用指针访问数组 使用第一个元素获取数组首地址 使用数组名获取数组首地址 使用指针访问数组等价于下标访问 使用指针访问数组 指针类型的加减运算可以使指针内保存的首地址移动。指针类型加n后。首地址向后移动 n * 步长 字节。 指针类型减n后。首地址向前移动 n *…

26种主流的神经网络偏微分方程求解方法汇总

偏微分方程&#xff08;PDE&#xff09;是数学中一门重要的分支&#xff0c;应用范围广泛涉及自然科学、工程技术、生物学领域等。然而我们都知道&#xff0c;偏微分方程的求解过程异常艰难&#xff0c;如果碰上了特别复杂的&#xff0c;传统的计算方法可能需要数百万个CPU小时…

智能高效|AIRIOT智慧货运管理解决方案

随着全球贸易的增加和消费需求的不断扩大&#xff0c;货运行业面临更大的压力&#xff0c;传统货运行业运输效率低下、信息不透明&#xff0c;往往存在如下的运维问题和管理痛点&#xff1a; 无法实时定位和追踪信息&#xff1a;无法提供实时的货物位置信息&#xff0c;以便随…

解决firefox(火狐)浏览器使用transform: scale导致的border不显示或显示不全的问题;

最近火狐遇到了此问题&#xff0c;查了许久没有解决办法也有说是因为火狐不支持小于1px单位的&#xff0c;也有说火狐浏览器本身的问题&#xff0c;然后也没有解决方案&#xff0c;最后没办法只能用最笨的方法解决。。。。 只针对Firefox使用CSS&#xff0c;使用’-moz-documen…

poe与chatgpt那个功能更强大

在当前的人工智能领域&#xff0c;Poe Al Chat以其卓越的聊天能力和实用的功能&#xff0c;受到了大家的广泛关注和喜爱。本文好为您个绍Poe Al Chat的功能&#xff0c;以及我们国内用户如何进行充值订阅。Poe Al Chat是一个基于OpenAl的GPT模型开发的人工智能聊天工具。它能够…

代立冬:基于Apache Doris+SeaTunnel 实现多源实时数据仓库解决方案探索实践

大家好&#xff0c;我是白鲸开源的联合创始人代立冬&#xff0c;同时担任 Apache DolphinScheduler 的 PMC chair 和 SeaTunnel 的 PMC。作为 Apache Foundation 的成员和孵化器导师&#xff0c;我积极参与推动多个开源项目的发展&#xff0c;帮助它们通过孵化器成长为 Apache …

最简单的pixel刷机和安装面具、lsposed

一 下载手机对应的系统 1&#xff0c;手机usb连接然后重启进入Fastboot模式&#xff1a;adb reboot bootloader2&#xff0c;找到你下载的系统&#xff0c;Windows 系统 直接运行 flash-all.bat上图 &#xff1a;左边就是安卓11和12的系统&#xff0c;右边是对应的手机型号 下…

简记修复改etc下profile失败的补救措施

现象 下午配置环境变量一个小小的失误&#xff0c;把etc文件夹下的profile改崩了&#xff0c;导致很多基本命令都用不了&#xff0c;服务器出现了下面这种情况。 [rootxxxx ~]# vi /etc/profile -bash: vi: command not found [rootxxxxx~]# vi -bash: vi: command not found…

深度学习第5天:GAN生成对抗网络

☁️主页 Nowl &#x1f525;专栏 《深度学习》 &#x1f4d1;君子坐而论道&#xff0c;少年起而行之 ​​ 文章目录 一、GAN1.基本思想2.用途3.模型架构 二、具体任务与代码1.任务介绍2.导入库函数3.生成器与判别器4.预处理5.模型训练6.图片生成7.不同训练轮次的结果对比 一…

关于uniapp X 的最新消息

uni-app x 是什么&#xff1f; uni-app x&#xff0c;是下一代 uni-app&#xff0c;是一个跨平台应用开发引擎。 uni-app x 没有使用js和webview&#xff0c;它基于 uts 语言。在App端&#xff0c;uts在iOS编译为swift、在Android编译为kotlin&#xff0c;完全达到了原生应用的…

GoLong的学习之路,进阶,微服务之序列化协议,Protocol Buffers V3

这章是接上一章&#xff0c;使用RPC包&#xff0c;序列化中没有详细去讲&#xff0c;因为这一块需要看的和学习的地方很多。并且这一块是RPC中可以说是最重要的一块&#xff0c;也是性能的重要影响因子。今天这篇主要会讲其使用方式。 文章目录 Protocol Buffers V3 背景以及概…

【JavaWeb笔记】单选框,结合Servlet

各个部分的作用 jsp部分 form action"..."&#xff1a;表单标签&#xff0c;供用户提交数据。内部的submit点击之后相当于是点action的URL input type"radio"&#xff1a;输入类型为单选框。把name设置为一样的&#xff0c;这样效果上就是单选&#xff…

java飞翔的鸟游戏

A.准备工作 Bird类 Column类 BirdGame类 Ground类 B.中间过程 准备工作&#xff1a; 安装Java开发环境&#xff08;JDK&#xff09;。选择一个集成开发环境&#xff08;IDE&#xff09;&#xff0c;如Eclipse、IntelliJ IDEA或NetBeans。 创建项目&#xff1a; 在IDE中创建一个…