FlinkSQL之Flink SQL Join二三事

news/2024/4/29 17:24:24/文章来源:https://blog.csdn.net/wen811651208/article/details/136977267

​ Flink SQL支持对动态表进行复杂而灵活的连接操作。 为了处理不同的场景,需要多种查询语义,因此有几种不同类型的 Join。默认情况下,joins 的顺序是没有优化的。表的 join 顺序是在 FROM 从句指定的。可以通过把更新频率最低的表放在第一个、频率最高的放在最后这种方式来微调 join 查询的性能。需要确保表的顺序不会产生笛卡尔积,因为不支持这样的操作并且会导致查询失败。

​ Flink Join根据输入源形式不同可以分为双流Join维表Join其他Join多种形式,下面根据大类分别介绍各自特点。

一 双流JOIN

​ 在正式进入FlinkSQL Join场景研究之前,首先我们先介绍一下在FlinkSQL场景下常见的Kafka数据流分类。截止到Flink1.18为止,目前常见的Kafka数据流包括不含键更新的普通Kafka数据流(即Kafka SQL Connector数据流)和包含键更新的Kafka数据流(即Upsert-Kafka SQL Connector数据流)两种。

1 Regular Join

​ Regular join 是最通用的 join 类型。在这种 join 下,join 两侧表的任何新记录或变更都是可见的,并会影响整个 join 的结果。对于流式查询,regular join 的语法是最灵活的,允许任何类型的更新(插入、更新、删除)输入表。 然而,这种操作具有重要的操作意义:Flink 需要将 Join 输入的两边数据永远保持在状态中。 因此,计算查询结果所需的状态可能会无限增长,这取决于所有输入表的输入数据量。你可以提供一个合适的状态 time-to-live (TTL) 配置来防止状态过大。注意:这样做可能会影响查询的正确性。

​ 左右两边流数据都能驱动join,左侧流新加入数据会和右侧流状态中所有匹配记录join上;同理,右侧流新增数据会和左侧流所有匹配记录join上,外连接不会等待,即使Join不上也会即及时输出,待对侧数据到来通过回撤修复数据。

  • Inner Join

    根据 join 限制条件返回一个简单的笛卡尔积。目前只支持 equi-joins,即:至少有一个等值条件。不支持任意的 cross join 和 theta join。

    select t1.order_id    as order_id,t2.product_id  as product_id,t1.create_time as create_time
    from tbl_order t1 
    join tbl_order_product t2 on t1.order_id = t2.order_id 
    ;
    

    Inner join不会产生回撤流,source端可以是Kafka SQL Connector也可以试Upsert-kafka SQL Connector,也可以是混合模式,sink端理论均可以是Kafka Connector,但如果输入端有重复输入,输出端可以设置成Upsert-Kafka SQL Connector接收数据。Upsert-Kafka SQL Connector注意设置主键。

  • outer join

    返回所有符合条件的笛卡尔积(即:所有通过 join 条件连接的行),加上所有外表没有匹配到的行。Flink 支持 LEFT、RIGHT 和 FULL outer joins。目前只支持 equi-joins,即:至少有一个等值条件。不支持任意的 cross join 和 theta join。

    select t1.order_id    as order_id,t2.product_id  as product_id,t1.create_time as create_time
    from tbl_order t1 
    left join tbl_order_product t2 on t1.order_id = t2.order_id 
    ;select t1.order_id    as order_id,t2.product_id  as product_id,t1.create_time as create_time
    from tbl_order t1 
    right join tbl_order_product t2 on t1.order_id = t2.order_id 
    ;select t1.order_id    as order_id,t2.product_id  as product_id,t1.create_time as create_time
    from tbl_order t1 
    full join tbl_order_product t2 on t1.order_id = t2.order_id 
    ;
    

    Outer Join会产生回撤流,source端可以是Kafka SQL Connector也可以是Upsert-kafka SQL Connector,也可以是混合模式,sink端理仅支持设置成Upsert-Kafka SQL Connector接收数据。Upsert-Kafka SQL Connector注意设置主键。

  • Regular Join总结应用模式如下(a代表Append-Only流,u代表Upsert-Kafka流):

    • a join a => a|u

    • u join u => a|u

    • a join u => a|u

    • a left join a => u

    • u left join u => u

    • a left join u => u

2 Interval Join

​ 返回一个符合 join 条件和时间限制的简单笛卡尔积。Interval join 需要至少一个 equi-join 条件和一个 join 两边都包含的时间限定 join 条件。范围判断可以定义成就像一个条件(<, <=, >=, >),也可以是一个 BETWEEN 条件,或者两边表的一个相同类型(即:处理时间 或 事件时间)的时间属性 的等式判断。

​ 下面列举了一些有效的 interval join 时间条件:

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

​ 对于流式查询,对比 regular join,interval join 只支持有时间属性的Append-Only表。 由于时间属性是递增的,Flink 从状态中移除旧值也不会影响结果的正确性,即interval join会根据间隔自动维护状态大小,不丢弃状态也不会让状态无限增长。

  • Inner join

    select * 
    from tbl_order t1 
    join tbl_shopment t2 on t1.order_id = t2.order_id and t1.create_time between t2.create_time - interval '4' hour and t2.create_time
    ;
    

    ​ 输入源只支持Kafka SQL Connector,不支持任何一方回撤流,这也可以理解,因为Interval Join是有时间属性参与Join的。输出数据可以是Kafka SQL Connector也可以试Upsert-kafka SQL Connector。Upsert-kafka SQL Connector要注意键设计。

  • Outer join

    select * 
    from tbl_order t1 
    left join tbl_shopment t2 on t1.order_id = t2.order_id and t1.create_time between t2.create_time - interval '4' hour and t2.create_time
    ;select * 
    from tbl_order t1 
    right join tbl_shopment t2 on t1.order_id = t2.order_id and t1.create_time between t2.create_time - interval '4' hour and t2.create_time
    ;select * 
    from tbl_order t1 
    full join tbl_shopment t2 on t1.order_id = t2.order_id and t1.create_time between t2.create_time - interval '4' hour and t2.create_time
    ;
    

    ​ 输入端仅至此Kafka SQL Connector,不支持任何一方回撤流,这也可以理解,因为Interval Join是有时间属性参与Outer Join的。输出数据可以是Kafka SQL Connector也可以试Upsert-kafka SQL Connector。Upsert-kafka SQL Connector要注意键设计。

  • 注意点

    • 测试要配置并行度为1,否则右表关联不上数据因为水位线识别不到会而不超时输出;

      executionEnvironment.setParallelism(1);
      
    • left join右表关联不上输出条件

      • 右表关联数据出现触发输出
      • 超时触发器输出关联不上数据
  • Interval Join总结应用模式如下(a代表Append-Only流,u代表Upsert-Kafka流):

    • a join a => a|u

    • a left join a => a|u

3 Temporal Join(Snapshot Join)

​ 时态表(Temporal table)是一个随时间变化的表:在 Flink 中被称为动态表。时态表中的行与一个或多个时间段相关联,所有 Flink 中的表都是时态的(Temporal)。 时态表包含一个或多个版本的表快照,它可以是一个变化的历史表,跟踪变化(例如,数据库变化日志,包含所有快照)或一个变化的维度表,也可以是一个将变更物化的维表(例如,存放最终快照的数据表)。

  • Inner join

    select t1.order_id    as order_id,t1.user_id     as user_id,t2.user_name   as user_name,t1.create_time as create_time
    from tbl_order t1 
    join tbl_user for system_time as of t1.create_time t2 on t1.user_id = t2.user_id 
    ;
    

    特点:

    • 左右两边事件时间属性,标识两侧流join场景,如果处理时间请参考Lookup join;
    • 只支持event-time,如果是processing-time那么就变成join最新版本数据,同Lookup Join;
    • 左表支持append流和upsert流;
    • 右表只支持upsert流;
    • 输出可以是append流或者upsert流;
    • 左表触发计算,右表更新不触发计算;
    • 设置超时时间:tableEnvironment.getConfig().set("table.exec.source.idle-timeout","3s");
  • Left join

    select t1.order_id    as order_id,t1.user_id     as user_id,t2.user_name   as user_name,t1.create_time as create_time
    from tbl_order t1 
    left join tbl_user for system_time as of t1.create_time t2 on t1.user_id = t2.user_id 
    ;
    

    特点:

    • 左右两边事件时间属性,标识两侧流join场景,如果处理时间请参考Lookup join;
    • 只支持event-time,如果是processing-time那么就变成join最新版本数据,同Lookup Join;
    • 左表支持append流和upsert流;
    • 右表只支持upsert流;
    • 输出可以是append流或者upsert流;
    • 左表触发计算,右表更新不触发计算;
    • 设置超时时间:tableEnvironment.getConfig().set("table.exec.source.idle-timeout","3s");
  • Snapshot Join总结应用模式如下(a代表Append-Only流,u代表Upsert-Kafka流):

    • a join u => a|u

    • u join u => u

    • a left join u => a|u

    • u left join u => u

4 Window Join

​ 窗口关联就是增加时间维度到关联条件中。在此过程中,窗口关联将两个流中在同一窗口且符合 join 条件的元素 join 起来。窗口关联的语义和DataStream window join相同。

​ 在流式查询中,与其他连续表上的关联不同,窗口关联不产生中间结果,只在窗口结束产生一个最终的结果。另外,窗口关联会清除不需要的中间状态。

​ 通常,窗口关联和窗口表值函数一起使用。而且,窗口关联可以在其他基于窗口表值函数的操作后使用,例如窗口聚合,窗口 Top-N和窗口关联。

​ 目前,窗口关联需要在 join on 条件中包含两个输入表的 window_start 等值条件和 window_end 等值条件。

​ 窗口关联支持 INNER/LEFT/RIGHT/FULL OUTER/ANTI/SEMI JOIN。

  • 语法

    select ...
    from l [left|right|full outer] join r -- l and r are relations applied windowing TVF
    on l.window_start = r.window_start and l.window_end = r.window_end and ...
    
  • 注意

    • 当前版本窗口Join必须同时指定window_start和window_end等值条件

    • 窗口Join不支持源是upsert流的情况

  • 限制

    • Join 子句的限制

    ​ 目前,窗口关联需要在 join on 条件中包含两个输入表的 window_start 等值条件和 window_end 等值条件。未来,如果是滚动或滑动窗口,只需要在 join on 条件中包含窗口开始相等即可。

    • 输入的窗口表值函数的限制

    ​ 目前,关联的左右两边必须使用相同的窗口表值函数。这个规则在未来可以扩展,比如:滚动和滑动窗口在窗口大小相同的情况下 join。

    • 窗口表值函数之后直接使用窗口关联的限制

    ​ 目前窗口关联支持作用在滚动(TUMBLE)、滑动(HOP)和累积(CUMULATE)窗口表值函数之上,但是还不支持会话窗口(SESSION)。

  • Snapshot Join总结应用模式如下(a代表Append-Only流,u代表Upsert-Kafka流):

    • a join a => a|u

    • a left join a => a|u

二 维表JOIN

5 Lookup Join(processing-time temporal join)

​ lookup join 通常用于使用从外部系统查询的数据来丰富表。join 要求一个表具有处理时间属性,另一个表由查找源连接器(lookup source connnector)支持。通常使用基于处理时间的流表与外部版本表(例如 mysql、hbase)的最新版本相关联(即processing-time temporal join 常常用在使用外部系统来丰富流的数据)。

​ 通过定义一个处理时间属性,这个 join 总是返回最新的值。可以将 build side 中被查找的表想象成一个存储所有记录简单的 HashMap<K,V>。 这种 join 的强大之处在于,当无法在 Flink 中将表具体化为动态表时,它允许 Flink 直接针对外部系统工作。

​ Join操作由流端触发,当新增一个流数据,会查询外部DB映射,获取数据补全后发出结果数据。

  • inner join

    select t1.order_id    as order_id,t1.user_id     as user_id,t2.user_name   as user_name,t1.create_time as create_time
    from tbl_order t1 
    join tbl_user for system_time as of t1.create_time t2 on t1.user_id = t2.user_id 
    ;
    

    特点:

    • Lookup join只支持inner join和left join;
    • 源必须声明处理时间,即row_time as proctime(),如果源声明为事件时间,那么要走Snapshot join方式;
    • 源支持kafka和upsert-kafka连接器
    • 输出支持kafka和upsert-kafka连接器
    • 查询外部表注意使用异步IO/Cache特性优化外表查询性能
  • Left join

    select t1.order_id    as order_id,t1.user_id     as user_id,t2.user_name   as user_name,t1.create_time as create_time
    from tbl_order t1 
    left join tbl_user for system_time as of t1.create_time t2 on t1.user_id = t2.user_id 
    ;
    

    特点:

    • Lookup join只支持inner join和left join;
    • 源必须声明处理时间,即row_time as proctime(),如果源声明为事件时间,那么要走Snapshot join方式;
    • 源支持kafka和upsert-kafka连接器
    • 输出支持kafka和upsert-kafka连接器
    • 查询外部表注意使用异步IO/Cache特性优化外表查询性能
  • Lookup Join总结应用模式如下(a代表Append-Only流,s代表外表静态表):

    • a join s => a|u

    • u join s => a|u

    • a left join s => a|u

    • u left join s => a|u

三 其他JOIN

6 Array Expansion

​ 对于输入的包含数组列的单行数据,返回给定数组中每个元素的新行,拆分后的数据除解析数组元素外,其他元素与原始行数据一致。

selectorder_id,order_tag,tag
from tbl_order_source cross join unnest(order_tag) as t(tag)
;

特征:

  • 输入数据可以是Append或者Upsert
  • 输出数据可以是Append或者Upsert

7 Table Function

​ 将表与表函数的结果联接。左侧(外部)表的每一行都与表函数的相应调用产生的所有行相连接。用户自定义表函数必须在使用前注册。

​ 对于是inner join,如果表函数调用返回一个空结果,那么左表的这行数据将不会输出。对于left join,如果表函数调用返回了一个空结果,则保留相应的行,并用空值填充未关联到的结果。当前,针对 lateral table 的 left outer join 需要 ON 子句中有一个固定的 TRUE 连接条件。

select order_id,order_tag,tag
from tbl_order_source
left join lateral table(table_func(order_tag)) t(tag) on true
;

特征:

  • 输入数据可以是Append或者Upsert
  • 输出数据可以是Append或者Upsert

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1027830.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据库审计和安全

互联网、云计算、物联网等新技术的应用&#xff0c;数据安全面临前所未有的挑战!我国信息安全已从终端安全、网络安全&#xff0c;发展到数据安全建设阶段。数据安全的核心是对“数据”全方位的安全防护&#xff0c;其产品及解决方案直接涉及国家和企业的核心机密 核心数据库存…

win10微软拼音输入法 - bug - 在PATH变量为空的情况下,无法输入中文

文章目录 win10微软拼音输入法 - bug - 在PATH变量为空的情况下&#xff0c;无法输入中文概述笔记实验前提条件100%可以重现 - 无法使用win10拼音输入法输入中文替代的输入法软件备注END win10微软拼音输入法 - bug - 在PATH变量为空的情况下&#xff0c;无法输入中文 概述 在…

ES6学习之路:迭代器Iterator和生成器Generator

迭代器 一、知识背景 什么是迭代器 迭代器就是在一个数据集合中不断取出数据的过程迭代和遍历的区别 遍历是把所有数据都取出迭代器注重的是依次取出数据&#xff0c;它不会在意有多少数据&#xff0c;也不会保证能够取出多少或者能够把数据都取完。比如斐波那契额数列&#…

linux nginx配置ssl, 实现https+ip访问

mkdir sslZhengShu openssl req -newkey rsa:2048 -nodes -keyout ca.key -out ca.csr openssl x509 -req -days 365 -in ca.csr -signkey ca.key -out ca.crt openssl genrsa -out server.key 2048 openssl req -new -key server.key -out server.csr 和之前输入一样即可 …

Python基本运算

1.逻辑运算符 第四行会有黄色的下划线是因为这个不是系统推荐的写法&#xff0c;系统推荐的是第五行的链式比较&#xff1b; 2.短路求值 对于and而言&#xff0c;左边的语句是false&#xff0c;那么整体一定是false,右边的表达式就不会进行计算&#xff1b; 对于or而言&…

FTP 文件传输服务

FTP连接 控制连接&#xff1a;TCP 21&#xff0c;用于发送FTP命令信息 数据连接&#xff1a;TCP 20&#xff0c;用于上传、下载数据 数据连接的建立类型&#xff1a; 主动模式&#xff1a;服务端从 20 端口主动向客户端发起连接 被动模式&#xff1a;服务端在指定范围…

平台介绍-搭建赛事运营平台(3)

上文介绍了品牌隔离的基本原理&#xff0c;就是通过不同的前端和微服务来实现。但是确实很多功能是类似的&#xff0c;所以从编程角度还是有些管理手段的。 前端部分&#xff1a;前端部分没有什么特别手段&#xff0c;就是两个独立的项目工程&#xff0c;分别维护。相同的部分复…

神策数据参与制定首份 SDK 网络安全国家标准

国家市场监督管理总局、国家标准化管理委员会发布中华人民共和国国家标准公告&#xff08;2023 年第 13 号&#xff09;&#xff0c;全国信息安全标准化技术委员会归口的 3 项国家标准正式发布。其中&#xff0c;首份 SDK 国家标准《信息安全技术 移动互联网应用程序&#xff0…

2核4G服务器租用价格表,阿里云/腾讯云/华为云/京东云

当前最新2核4G云服务器多少钱&#xff1f;165元一年&#xff0c;30元3个月。阿里云2核4G服务器165元一年&#xff0c;30元3个月、腾讯云2核4G5M服务器165元一年、京东云2核4G云主机126元1年&#xff0c;华为云也提供2核4G配置云服务器。阿腾云atengyun.com整理2024年最新云服务…

【NLP笔记】预训练+Prompt Tuning新范式之LLM时代(GPT3...)

文章目录 概述GPT3 【参考链接】 一张图总结大语言模型的技术分类、现状和开源情况 大语言模型LLM微调技术&#xff1a;Prompt Tuning A Survey of Large Language ModelsThe Practical Guides for Large Language ModelsGPT3&#xff1a;Language Models are Few-Shot Learner…

行存储与列存储:大数据存储方案的选择与优缺点分析

随着大数据时代的来临&#xff0c;数据的规模和复杂性呈指数级增长&#xff0c;传统的关系数据库已经不再适应这一巨大的存储量和计算要求。在大数据存储领域&#xff0c;行存储和列存储成为两种备受关注的存储方案。本文将探讨行存储和列存储的定义、优缺点&#xff0c;并结合…

python pytz是什么

pytz模块常用于时区的转换&#xff0c;常常配合datetime一起使用。我们知道datetime除了data方法生成的时间是没有时区概念&#xff0c;其他如time、datetime等都是有时区概念&#xff0c;即指定了tzinfo信息。 >>> import datetime >>> datetime.datetime.n…

骗子查询系统源码

源码简介 小权云黑管理系统 V1.0 功能如下&#xff1a; 1.添加骗子&#xff0c;查询骗子 2.可添加团队后台方便审核用 3.在线反馈留言系统 4.前台提交骗子&#xff0c;后台需要审核才能过 5.后台使用光年UI界面 6.新增导航列表&#xff0c;可给网站添加导航友链 7.可添加云黑类…

C语言运算符和表达式——增1和减1运算符

目录 增1和减1运算符 一元运算符 前缀增1/减1运算符 后缀增1/减1运算符 前缀与后缀对变量和表达式的影响 稍微复杂一点的例子 增1和减1运算符的优缺点 增1和减1运算符 增1运算符&#xff08;Increment&#xff09; *使变量的值增加1个单位 减1运算符&#xff08;Decre…

量化交易软件开发定制的步骤

量化交易软件的定制开发是一个复杂而精细的过程&#xff0c;需要经过一系列步骤来确保最终交付的软件符合客户的需求并具有高度的可靠性和效率。以下是量化交易软件开发定制的主要步骤&#xff1a; 1. 需求分析与规划 在开始开发之前&#xff0c;首先需要与客户深入沟通&…

【使用matlab绘制音频数据的时域图和频域图】

使用matlab绘制音频数据的时域图和频域图 虚拟的数据集见附件 一、读取数据并设置参数 close all;clear all;colordef black 设置参数 filedir D:\Projects\MATLAB\data name 2024-03-28.txt % disp(filedir);Fs 8192; %采样率&#xff0c;即单位时间的样本个数&#xff…

电脑如何更新AMD独立显卡驱动?安装官方驱动的方法来了!

前言 有小伙伴在电脑上安装了独立显卡之后&#xff0c;总会用驱动人生或者驱动精灵等软件给独立显卡安装驱动。这种安装方法并不能说是错的&#xff0c;反正能用就行。 安装官方驱动的办法其实很简单&#xff0c;现在独立显卡一共就那么几家&#xff0c;最常见的显卡就是Nvidi…

Java基于微信小程序的校园订餐小程序的实现,附源码和数据库

博主介绍&#xff1a;✌Java徐师兄、7年大厂程序员经历。全网粉丝13w、csdn博客专家、掘金/华为云等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3fb; 不…

今日早报 每日精选15条新闻简报 每天一分钟 知晓天下事 3月29日,星期五

每天一分钟&#xff0c;知晓天下事&#xff01; 2024年3月29日 星期五 农历二月二十 1、 网络表演&#xff08;直播与短视频&#xff09;运营团体标准发布&#xff1a;应建立举报处置机制。 2、 商务部&#xff1a;中国决定终止对澳大利亚进口葡萄酒征收反倾销税和反补贴税。…

八大技术趋势案例(虚拟现实增强现实)

科技巨变,未来已来,八大技术趋势引领数字化时代。信息技术的迅猛发展,深刻改变了我们的生活、工作和生产方式。人工智能、物联网、云计算、大数据、虚拟现实、增强现实、区块链、量子计算等新兴技术在各行各业得到广泛应用,为各个领域带来了新的活力和变革。 为了更好地了解…