Flink SQL使用Catalog消费Kafka时,多个Source读取同一主题解决方案

news/2024/5/18 17:00:19/文章来源:https://blog.csdn.net/miaoge_miaoge/article/details/127325856

一、Catalog定义

        Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的

二、Catalog在Flink中实现讲解

2.1Catalog源码定义

        Catalog的定义在org.apache.flink.table.catalog.Catalog,可以看到它是一个接口,具体如下:

public interface Catalog {''''
}

        Catalog包含了主要的方法:包括获取数据库信、表、视图、表列信息、函数Function、分区Partition等信息的增删查改。

2.1Catalog继承关系

        下图为Catalog接口继承图
Catalog继承关系

        可以看到,Catalog的子类为AbstractCatalog,它有3个实现类,GenericInMemoryCatalog,HiveCatalog,AbstractJdbclog,也就是它可以把Catalog信息保存在内存、Hive、Jdbc(目前仅支持Postgre)中。默认的是保存在内存中。

三、Catalog信息保存在Hive测试

        要验证多个source读取同一张Kafka表如何处理,我们把Catalog信息保存在MySQL中进行测试。
首先是创建Catalog

3.1创建Hive Catalog

        启动SQL客户端

./sql-client.sh

        创建 Hive Catalog


启动Flink sqlclient$FLINK_HOME/bin/sql-client.sh创建CATALOG的语法为:create catalog hive_catalog with ('type' = 'hive','default-database' = 'catalogtest','hive-conf-dir' = '/opt/hive/apache-hive-3.1.2-bin/conf/'
);
其中hive-conf-dir为本地hive的配置文件路,即$HIVE_HOME/confUSE CATALOG hive_catalog ;

3.2 创建Kafka主题,并且往其中写数据

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
往Kafka队列中写入数据,格式为:string,int>abs,12
>saz,43
.......

3.3在Flink创建Kafka表

CREATE TABLE mykafka (name String, age int) with ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'test','connector.properties.bootstrap.servers' = 'localhost:9092','connector.properties.group.id' = 'testGroup','format.type' = 'csv','update-mode' = 'append'
);
其中testGroup为默认的groupid

四、测试数据

4.1 第一组消费者使用已经注册的表去消费

select * from mykafka;

4.2第二组消费者使用已经注册的表去消费

select *  from mykafka /*+ OPTIONS('connector.properties.group.id' = 'testGroup111') */;

        这里使用了hint语法,可以直接消费数据,如果找不到表,需要创建一下catalog(flink官网有默认的catalog设置)

4.3停止第二组消费者,继续往kafka中写数据,再重启第二组消费者

        首先暂停第二组消费者,然后往Kafka主题中发送数据,接着写入数据后,再重新消费,会观察到数据是从消费停止后开始消费的,这也就说明了这2组消费者是独立的。

五、结论

        Flink的Catalog支持存储在内存(默认)、Hive、JDBC(仅支持Postgre)中,如果注册完一张Kafka的表默认已经指定了groupId,如果要通过不同的groupId来消费数据,可以通过hint语法指定groupId消费
        除此之外,还可以通过修改源码的方式,在使用时指定groupId来读取。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_23416.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

apollo在虚拟机下部署遇到的坑

目录问题描述解决方法编译问题总结问题描述 ​   其实在虚拟机下部署apollo网上是有线程教程的。可以参考在虚拟机上安装运行百度Apollo 6.0,Apollo 6.0 安装完全指南。我依靠这两个指南准备部署的是apollo 7.0,事实证明虽然版本不同,但部…

1、6边距复合属性

提示:文章写完后,padding可以有到四个值。 1、语法: div{ padding:“50px”; padding:“5px 10px”; padding:“5px 10px 20px”; padding:“5…

flex竖排列元素排列方向

flex竖排列元素排列方向一、flex-direction: (元素排列方向) ※ flex-direction:row (横向从左到右排列==左对齐)※ flex-direction:row-reverse (与row 相反)※ flex-direction:column (从上往下排列==顶对齐)※ flex-direction:column-reverse (与column 相反) 二…

基于导频的信道估计实现

目录 零、前言 一、为什么要信道估计 二、导频的概念 (1)为什么要有导频 (2)导频在信道估计中作用 (3)关于导频序列的补充 三、最小二乘法估计 (1)LS信道估计算法分析 &…

24.登录form的显示

1.概括 本次博客的代码就不自己去写输入框了,直接去引用element官网中的就好。 具体如何实现可以去订阅Vue专栏中的最后两节课噢!!!!!!2.操作方法 打开element官网 https://element.eleme.cn/…

TRC丨艾美捷TRC 那非那韦亚砜说明书

艾美捷TRC Nelfinavir Sulfoxide 是 Nelfinavir Mesylate (N389750) 的杂质。Nelfinavir USP 相关化合物 A。 艾美捷TRC 那非那韦亚砜化学性质: 目录号N389770 化学名称那非那韦亚砜 同义词(3S,4aS,8aS)-N-(1,1-二甲基乙基)十氢-2-[(2R,3R)-2-羟基-3-[(3-羟基-2-…

合宙AIR32F103CBT6刷回CMSIS-DAP固件以及刷ST-LINK V2-1固件方法

合宙AIR32F103CBT6刷回CMSIS DAP固件以及刷ST-LINK V2-1固件方法📌官方介绍文档:https://wiki.luatos.com/chips/air32f103/index.html📍原理图:https://cdn.openluat-luatcommunity.openluat.com/attachment/20220605164915340_AIR32CBT6.pd…

从零备战蓝桥杯——动态规划(递推篇)

双非刷leetcode备战2023年蓝桥杯,qwq加油吧,无论结果如何总会有收获!一起加油,我是跟着英雄哥的那个思维导图刷leetcode的,大家也可以看看所有涉及到的题目用leetcode搜索就可以哦,因为避让添加外链,一起加…

简历石沉大海?来围观月薪 20k 的软件测试工程师真实简历...

​前言:面试的重要性 在互联网公司,你面试的时候能拿到多少 k 薪资,基本上决定了你未来 1-2 年的工资,这个非常现实。软件测试工程师在企业中俩内年想涨工资非常难的,就算有涨,涨幅也不大。当然不排除你待…

前置句与倒装句练习题

1. 特殊语序:前置 1.All the information you need I am putting in the post today. 2.Any item in our catelogue we can supply and deliver 3.How she got the gun through customs they never found out. 4.The kitchen we are planning to redecorate in the…

Day25Linux获取命令帮助,压缩与解压缩,vim编辑器使用,Linux系统下载软件,通过yum方式安装软件

命令字的帮助信息的查询 rm -fr fdisk -l ls ls -l ls -出现许多.开头的文件隐藏文件 Linux命令字格式 命令字 [选项] 命令字 [选项] 文件或目录 ls哪些选项? 1.如何查看一个命令字的帮助手册? man man ls 按q退出 ls -a显示隐藏文件 ls -l显示文件的详…

Chap4 循环结构 学习总结 第五小组

1、为什么需要循环?: 在 c语言中需要重复执行某些操作时,需要用到循环结构 2、循环的三个语句: for循环、while循环、do-while循环。 下列是while循环和for循环的流程图3、三种循环语句的表达式: (1)while(进入循环条件)循环体语句; (2)do {循环体语句;}while(进…

LVS负载均衡—DR模式

内容预知 1.DR模式的特点 2.LVS-DR中的ARP问题 2.1 问题一:VIP地址相同导致响应冲突 问题原因: 解决方法: 2.2 问题二:返回报文时源地址使用VIP,导致网关设备的ARP缓存表紊乱 问题原因: 解决方法&…

GitHub爆火,一份从零到1「架构师成长手册」,原来成为架构师也有捷径

架构师】我想应该没有哪个程序员会陌生了吧,作为一个程序员技术追求的里程碑,有多少程序员想转型架构师而不得门路,其实架构师比较抽象的拆解能力就两方面技术项目足够的技术栈深度和广度再加上足够的项目经验其实是完全可以驾驭架构师的岗位…

QFramework v1.0 使用指南 架构篇:05. 引入 Utility

05. 引入 Utility 在这一篇,我们来支持 CounterApp 的存储功能。 其代码也非常简单,只需要修改一部分 Model 的代码即可,如下: // 定义一个 Model 对象public class CounterAppModel : AbstractModel{private int mCount;public…

爬虫学习(01):了解爬虫超文本传输协议的理解

一、爬虫入门二、web请求过程(百度为例)2.1 页面渲染1. 服务器渲染 -> 数据直接在页面源代码里能搜到2. 前端JS渲染 -> 数据在页面源代码里搜不到三、浏览器工具的使用(重点)1. Elements2. Console3. Source4. Network四、超文本传输协议请求:响应:https协议加密方法(三种…

常见的网络安全风险有哪些?

常见的网络安全风险: 1、勒索软件 勒索软件(Ransomware,又称勒索病毒)是一种恶意软件,它的工作方式基本与计算机病毒类似,不过跟一般的计算机病毒不同,它们不会直接地破坏数据,而是将数据进行加密锁定&am…

搭建云上博客

安装apache: yum -y install httpd mod_ssl mod_perl mod_auth_mysql httpd -v systemctl start httpd.service Firefox ESR浏览器的址栏中,访问http://ECS公网地址。 安装MariaDB数据库: yum install -y mariadb-server systemctl start mariadb systemctl …

Day33、JavaScript

1、JavaScript 1.1、JavaScript组成 1.2、什么是ECMAScript 1)ECMAScript是一种语法标准 语法、变量和数据类型、运算符、逻辑控制语句、关键字、保留字、对象 2)编码遵循ECMAScript标准 1.3、什么是BOM 1)BOM:Browser Object Mod…

leetcode 474一和零

一和零 动态规划(01背包,三级数组) 和经典的背包问题只有一种容量不同,这道题有两种容量,即选取的字符串子集中的 0 和 1 的数量上限。 经典的背包问题可以使用二维动态规划求解,两个维度分别是物品和容量…