流数据湖平台Apache Paimon(二)集成 Flink 引擎

news/2024/4/29 23:27:29/文章来源:https://blog.csdn.net/xianyu120/article/details/132001087

文章目录

  • 第2章 集成 Flink 引擎
    • 2.1 环境准备
      • 2.1.1 安装 Flink
      • 2.1.2 上传 jar 包
      • 2.1.3 启动 Hadoop
      • 2.1.4 启动 sql-client
    • 2.2 Catalog
      • 2.2.1 文件系统
      • 2.2.2 Hive Catalog
      • 2.2.3 sql 初始化文件
    • 2.3 DDL
      • 2.3.1 建表
      • 2.3.2 修改表
    • 2.4 DML
      • 2.4.1 插入数据
      • 2.4.2 覆盖数据
      • 2.4.3 更新数据
      • 2.4.4 删除数据
      • 2.4.5 Merge Into
    • 2.5 DQL查询表
      • 2.5.1 批量查询
      • 2.5.2 流式查询
      • 2.5.3 查询优化
    • 2.6 系统表
      • 2.6.1 快照表 Snapshots Table
      • 2.6.2 模式表 Schemas Table
      • 2.6.3 选项表 Options Table
      • 2.6.4 审计日志表 Audit log Table
      • 2.6.5 文件表 Files Table
      • 2.6.6 标签表 Tags Table
    • 2.7 维表Join
    • 2.8 CDC集成
      • 2.8.1 MySQL
      • 2.8.2 Kafka
      • 2.8.3 支持的schema变更

第2章 集成 Flink 引擎

Paimon目前支持Flink 1.17, 1.16, 1.15 和 1.14。本课程使用Flink 1.17.0。

2.1 环境准备

环境准备

2.1.1 安装 Flink

1)上传并解压Flink安装包

tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/

2)配置环境变量

sudo vim /etc/profile.d/my_env.sh

export HADOOP_CLASSPATH=hadoop classpath

source /etc/profile.d/my_env.sh

2.1.2 上传 jar 包

1)下载并上传Paimon的jar包

jar包下载地址:https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.17/0.5-SNAPSHOT/

2)拷贝paimon的jar包到flink的lib目录下

cp paimon-flink-1.17-0.5-20230703.002437-67.jar /opt/module/flink-1.17.0/lib

2.1.3 启动 Hadoop

(略)

2.1.4 启动 sql-client

1)修改flink-conf.yaml配置

vim /opt/module/flink-1.16.0/conf/flink-conf.yaml

#解决中文乱码,1.17之前参数是env.java.opts

env.java.opts.all: -Dfile.encoding=UTF-8

classloader.check-leaked-classloader: false

taskmanager.numberOfTaskSlots: 4

execution.checkpointing.interval: 10s

state.backend: rocksdb

state.checkpoints.dir: hdfs://hadoop102:8020/ckps

state.backend.incremental: true

2)启动 Flink集群

(1)解决依赖问题

cp /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar /opt/module/flink-1.17.0/lib/

(2)这里以 Yarn-Session模式为例

/opt/module/flink-1.17.0/bin/yarn-session.sh -d

3)启动Flink的sql-client

/opt/module/flink-1.17.0/bin/sql-client.sh -s yarn-session

img

4)设置结果显示模式

SET ‘sql-client.execution.result-mode’ = ‘tableau’;

2.2 Catalog

Paimon Catalog可以持久化元数据,当前支持两种类型的metastore:

文件系统(默认):将元数据和表文件存储在文件系统中。

hive:在 hive metastore中存储元数据。用户可以直接从 Hive 访问表。

2.2.1 文件系统

CREATE CATALOG fs_catalog WITH (

‘type’ = ‘paimon’,

‘warehouse’ = ‘hdfs://hadoop102:8020/paimon/fs’

);

USE CATALOG fs_catalog;

2.2.2 Hive Catalog

通过使用Hive Catalog,对Catalog的更改将直接影响相应的hive metastore。在此类Catalog中创建的表也可以直接从 Hive 访问。

要使用 Hive Catalog,数据库名称、表名称和字段名称应小写。

1)上传 hive-connector

将flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar上川到Flink的lib目录下

2)重启yarn-session集群

3)启动hive的metastore服务

nohup hive --service metastore &

4)创建Hive Catalog

CREATE CATALOG hive_catalog WITH ('type' = 'paimon','metastore' = 'hive','uri' = 'thrift://hadoop102:9083','hive-conf-dir' = '/opt/module/hive/conf','warehouse' = 'hdfs://hadoop102:8020/paimon/hive');

USE CATALOG hive_catalog;

5)注意事项

使用hive Catalog通过alter table更改不兼容的列类型时,参见 HIVE-17832。需要配置

vim /opt/module/hive/conf/hive-site.xml;

  <property>​    <name>hive.metastore.disallow.incompatible.col.type.changes</name>​    <value>false</value></property>

上述配置需要在hive-site.xml中配置,且hive metastore服务需要重启。

如果使用的是 Hive3,请禁用 Hive ACID:

hive.strict.managed.tables=false

hive.create.as.insert.only=false

metastore.create.as.acid=false

2.2.3 sql 初始化文件

1)创建初始化sql文件

vim conf/sql-client-init.sql

CREATE CATALOG fs_catalog WITH ('type' = 'paimon','warehouse' = 'hdfs://hadoop102:8020/paimon/fs');CREATE CATALOG hive_catalog WITH ('type' = 'paimon','metastore' = 'hive','uri' = 'thrift://hadoop102:9083','hive-conf-dir' = '/opt/module/hive/conf','warehouse' = 'hdfs://hadoop102:8020/paimon/hive');USE CATALOG hive_catalog;SET 'sql-client.execution.result-mode' = 'tableau';

2)启动sql-client时,指定该sql初始化文件

bin/sql-client.sh -s yarn-session -i conf/sql-client-init.sql

3)查看catalog

show catalogs;

show current catalog;

2.3 DDL

2.3.1 建表

2.3.1.1 管理表

在 Paimon Catalog中创建的表就是Paimon的管理表,由Catalog管理。当表从Catalog中删除时,其表文件也将被删除,类似于Hive的内部表。

1)创建表

CREATE TABLE test (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED);

2)创建分区表

CREATE TABLE test_p (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED) PARTITIONED BY (dt, hh);

通过配置partition.expiration-time,可以自动删除过期的分区。

如果定义了主键,则分区字段必须是主键的子集。

可以定义以下三类字段为分区字段:

创建时间(推荐):创建时间通常是不可变的,因此您可以放心地将其视为分区字段并将其添加到主键中。

事件时间:事件时间是原表中的一个字段。对于CDC数据来说,比如从MySQL CDC同步的表或者Paimon生成的Changelogs,它们都是完整的CDC数据,包括UPDATE_BEFORE记录,即使你声明了包含分区字段的主键,也能达到独特的效果。

CDC op_ts:不能定义为分区字段,无法知道之前的记录时间戳。

3)Create Table As

表可以通过查询的结果创建和填充,例如,我们有一个这样的sql: CREATE TABLE table_b AS SELECT id, name FORM table_a, 生成的表table_b将相当于创建表并插入数据以下语句:CREATE TABLE table_b(id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a;

使用CREATE TABLE AS SELECT时我们可以指定主键或分区。

CREATE TABLE test1(

user_id BIGINT,

item_id BIGINT

);

CREATE TABLE test2 AS SELECT * FROM test1;

– 指定分区

CREATE TABLE test2_p WITH (‘partition’ = ‘dt’) AS SELECT * FROM test_p;

– 指定配置

CREATE TABLE test3(

​ user_id BIGINT,

​ item_id BIGINT

) WITH (‘file.format’ = ‘orc’);

CREATE TABLE test3_op WITH (‘file.format’ = ‘parquet’) AS SELECT * FROM test3;

– 指定主键

CREATE TABLE test_pk WITH (‘primary-key’ = ‘dt,hh’) AS SELECT * FROM test;

– 指定主键和分区

CREATE TABLE test_all WITH (‘primary-key’ = ‘dt,hh’, ‘partition’ = ‘dt’) AS SELECT * FROM test_p;

4)Create Table Like

创建与另一个表具有相同schema、分区和表属性的表。

CREATE TABLE test_ctl LIKE test;

5)表属性

用户可以指定表属性来启用Paimon的功能或提高Paimon的性能。有关此类属性的完整列表,请参阅配置: https://paimon.apache.org/docs/master/maintenance/configurations/。

CREATE TABLE tbl(user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED) PARTITIONED BY (dt, hh) WITH ('bucket' = '2','bucket-key' = 'user_id');

2.3.1.2 外部表

外部表由Catalog记录但不管理。如果删除外部表,其表文件不会被删除,类似于Hive的外部表。

Paimon 外部表可以在任何Catalog中使用。如果您不想创建Paimon Catalog而只想读/写表,则可以考虑外部表。

CREATE TABLE ex (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED) WITH ('connector' = 'paimon','path' = 'hdfs://hadoop102:8020/paimon/external/ex','auto-create' = 'true' );

2.3.1.3 临时表

仅 Flink 支持临时表。与外部表一样,临时表只是记录,但不由当前 Flink SQL 会话管理。如果临时表被删除,其资源将不会被删除。当 Flink SQL 会话关闭时,临时表也会被删除。与外部表的区别在于,临时表在Paimon Catalog中创建。

如果想将Paimon Catalog与其他表一起使用,但不想将它们存储在其他Catalog中,可以创建临时表。

USE CATALOG hive_catalog;CREATE TEMPORARY TABLE temp (k INT,v STRING) WITH ('connector' = 'filesystem','path' = 'hdfs://hadoop102:8020/temp.csv','format' = 'csv');

2.3.2 修改表

2.3.2.1 修改表

1)更改/添加表属性

ALTER TABLE test SET (

‘write-buffer-size’ = ‘256 MB’

);

2)重命名表名称

ALTER TABLE test1 RENAME TO test_new;

3)删除表属性

ALTER TABLE test RESET (‘write-buffer-size’);

2.3.2.2 修改列

1)添加新列

ALTER TABLE test ADD (c1 INT, c2 STRING);

2)重命名列名称

ALTER TABLE test RENAME c1 TO c0;

3)删除列

ALTER TABLE test DROP (c0, c2);

4)更改列的可为空性

CREATE TABLE test_null(

id INT PRIMARY KEY NOT ENFORCED,

coupon_info FLOAT NOT NULL

);

– 列coupon_info修改成允许为null

ALTER TABLE test_null MODIFY coupon_info FLOAT;

– 列coupon_info修改成不允许为null

– 如果表中已经有null值, 修改之前先设置如下参数删除null值

SET ‘table.exec.sink.not-null-enforcer’ = ‘DROP’;

ALTER TABLE test_null MODIFY coupon_info FLOAT NOT NULL;

5)更改列注释

ALTER TABLE test MODIFY user_id BIGINT COMMENT ‘user id’;

6)添加列位置

ALTER TABLE test ADD a INT FIRST;

ALTER TABLE test ADD b INT AFTER a;

7)更改列位置

ALTER TABLE test MODIFY b INT FIRST;

ALTER TABLE test MODIFY a INT AFTER user_id;

8)更改列类型

ALTER TABLE test MODIFY a DOUBLE;

2.3.2.3 修改水印

1)添加水印

CREATE TABLE test_wm (

id INT,

name STRING,

ts BIGINT

);

ALTER TABLE test_wm ADD(

et AS to_timestamp_ltz(ts,3),

WATERMARK FOR et AS et - INTERVAL ‘1’ SECOND

);

2)更改水印

ALTER TABLE test_wm MODIFY WATERMARK FOR et AS et - INTERVAL ‘2’ SECOND;

3)去掉水印

ALTER TABLE test_wm DROP WATERMARK;

2.4 DML

2.4.1 插入数据

INSERT 语句向表中插入新行或覆盖表中的现有数据。插入的行可以由值表达式或查询结果指定,跟标准的sql语法一致。

INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query }

part_spec

可选,指定分区的键值对列表,多个用逗号分隔。可以使用类型文字(例如,date’2019-01-02’)。

语法: PARTITION (分区列名称 = 分区列值 [ , … ] )

column_list

可选,指定以逗号分隔的字段列表。

语法:(col_name1 [,column_name2, …])

所有指定的列都应该存在于表中,并且不能相互重复。它包括除静态分区列之外的所有列。字段列表的大小应与 VALUES 子句或查询中的数据大小完全相同。

value_expr

指定要插入的值。可以插入显式指定的值或 NULL。必须使用逗号分隔子句中的每个值。可以指定多于一组的值来插入多行。

语法:VALUES ( { 值 | NULL } [ , … ] ) [ , ( … ) ]

目前,Flink 不支持直接使用 NULL,因此需要将 NULL 转换为实际数据类型值,比如“CAST (NULL AS STRING)”

注意:将 Nullable 字段写入 Not-null 字段

不能将另一个表的可为空列插入到一个表的非空列中。Flink可以使用COALESCE函数来处理,比如A表的key1是not null,B表的key2是nullable:

INSERT INTO A key1 SELECT COALESCE(key2, ) FROM B

案例:

INSERT INTO test VALUES(1,1,‘order’,‘2023-07-01’,‘1’), (2,2,‘pay’,‘2023-07-01’,‘2’);

INSERT INTO test_p PARTITION(dt=‘2023-07-01’,hh=‘1’) VALUES(3,3, ‘pv’);

– 执行模式区分流、批

INSERT INTO test_p SELECT * from test;

Paimon支持在sink阶段通过partition和bucket对数据进行shuffle。

2.4.2 覆盖数据

覆盖数据只支持batch模式。默认情况下,流式读取将忽略 INSERT OVERWRITE 生成的提交。如果你想读取OVERWRITE的提交,你可以配置streaming-read-overwrite。

RESET ‘execution.checkpointing.interval’;

SET ‘execution.runtime-mode’ = ‘batch’;

1)覆盖未分区的表

INSERT OVERWRITE test VALUES(3,3,‘pay’,‘2023-07-01’,‘2’);

2)覆盖分区表

对于分区表,Paimon默认的覆盖模式是动态分区覆盖(即Paimon只删除insert overwrite数据中出现的分区)。您可以配置动态分区覆盖来更改它。

INSERT OVERWRITE test_p SELECT * from test;

覆盖指定分区:

INSERT OVERWRITE test_p PARTITION (dt = ‘2023-07-01’, hh = ‘2’) SELECT user_id,item_id,behavior from test;

3)清空表

可以使用 INSERT OVERWRITE 通过插入空值来清除表(关闭动态分区覆盖)。

INSERT OVERWRITE test_p/*+ OPTIONS(‘dynamic-partition-overwrite’=‘false’) */ SELECT * FROM test_p WHERE false;

2.4.3 更新数据

目前,Paimon 在 Flink 1.17 及后续版本中支持使用 UPDATE 更新记录。您可以在Flink的批处理模式下执行UPDATE。

只有主键表支持此功能。不支持更新主键。

MergeEngine 需要deduplicate或partial-update才能支持此功能。(默认deduplicate)

UPDATE test SET item_id = 4, behavior = ‘pv’ WHERE user_id = 3;

2.4.4 删除数据

从表中删除(Flink 1.17):

只有写入模式设置为change-log的表支持此功能。(有主键默认就是change-log)

如果表有主键,MergeEngine需要为deduplicate。(默认deduplicate)

DELETE FROM test WHERE user_id = 3;

2.4.5 Merge Into

通过merge into实现行级更新,只有主键表支持此功能。该操作不会产生 UPDATE_BEFORE,因此不建议设置 ‘changelog-producer’ = ‘input’。

merge-into 操作使用“upsert”语义而不是“update”,这意味着如果该行存在,则执行更新,否则执行插入。

1)语法说明:

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \merge-into \--warehouse <warehouse-path> \--database <database-name> \--table <target-table> \[--target-as <target-table-alias>] \--source-table <source-table-name> \[--source-sql <sql> ...]\--on <merge-condition> \--merge-actions <matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete> \--matched-upsert-condition <matched-condition> \--matched-upsert-set <upsert-changes> \--matched-delete-condition <matched-condition> \--not-matched-insert-condition <not-matched-condition> \--not-matched-insert-values <insert-values> \--not-matched-by-source-upsert-condition <not-matched-by-source-condition> \--not-matched-by-source-upsert-set <not-matched-upsert-changes> \--not-matched-by-source-delete-condition <not-matched-by-source-condition> \[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]--source-sql <sql> 可以传递sql来配置环境并在运行时创建源表。

“match”的说明:

(1)matched:更改的行来自目标表,每个行都可以根据条件匹配源表行(source ∩ target):

合并条件(–on)

匹配条件(–matched-xxx-condition)

(2)not-matched:更改的行来自源表,并且根据条件所有行都不能与任何目标表的行匹配(source – target):

合并条件(–on)

不匹配条件(–not-matched-xxx-condition):不能使用目标表的列来构造条件表达式。

(3)not-matched-by-source:更改的行来自目标表,并且基于条件所有行都不能与任何源表的行匹配(target – source):

合并条件(–on)

源不匹配条件(–not-matched-by-source-xxx-condition):不能使用源表的列来构造条件表达式。

2)案例实操

需要用到paimon-flink-action-xxxx.jar,上传:

cp paimon-flink-action-0.5-20230703.002437-53.jar /opt/module/flink-1.17.0/opt

下载地址:

https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-action/0.5-SNAPSHOT/

(1)准备测试表:

use catalog hive_catalog;create database test;use test;CREATE TABLE ws1 (id INT,ts BIGINT,vc INT,PRIMARY KEY (id) NOT ENFORCED);INSERT INTO ws1 VALUES(1,1,1),(2,2,2),(3,3,3);CREATE TABLE ws_t (id INT,ts BIGINT,vc INT,PRIMARY KEY (id) NOT ENFORCED);INSERT INTO ws_t VALUES(2,2,2),(3,3,3),(4,4,4),(5,5,5);

(2)案例一: ws_t与ws1匹配id,将ws_t中ts>2的vc改为10,ts<=2的删除

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

merge-into \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table ws_t \

–source-table test.ws1 \

–on “ws_t.id = ws1.id” \

–merge-actions matched-upsert,matched-delete \

–matched-upsert-condition “ws_t.ts > 2” \

–matched-upsert-set “vc = 10” \

–matched-delete-condition “ws_t.ts <= 2”

(3)案例二: ws_t与ws1匹配id,匹配上的将ws_t中vc加10,ws1中没匹配上的插入ws_t中

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

merge-into \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table ws_t \

–source-table test.ws1 \

–on “ws_t.id = ws1.id” \

–merge-actions matched-upsert,not-matched-insert \

–matched-upsert-set “vc = ws_t.vc + 10” \

–not-matched-insert-values “*”

(4)案例三: ws_t与ws1匹配id,ws_t中没匹配上的,ts大于4则vc加20,ts=4则删除

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

merge-into \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table ws_t \

–source-table test.ws1 \

–on “ws_t.id = ws1.id” \

–merge-actions not-matched-by-source-upsert,not-matched-by-source-delete \

–not-matched-by-source-upsert-condition “ws_t.ts > 4” \

–not-matched-by-source-upsert-set “vc = ws_t.vc + 20” \

–not-matched-by-source-delete-condition " ws_t.ts = 4"

(5)案例四: 使用–source-sql创建新catalog下的源表,匹配ws_t的id,没匹配上的插入ws_t

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

merge-into \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table ws_t \

–source-sql “CREATE CATALOG fs2 WITH (‘type’ = ‘paimon’,‘warehouse’ = ‘hdfs://hadoop102:8020/paimon/fs2’)” \

–source-sql “CREATE DATABASE IF NOT EXISTS fs2.test” \

–source-sql “CREATE TEMPORARY VIEW fs2.test.ws2 AS SELECT id+10 as id,ts,vc FROM test.ws1” \

–source-table fs2.test.ws2 \

–on “ws_t.id = ws2. id” \

–merge-actions not-matched-insert\

–not-matched-insert-values “*”

2.5 DQL查询表

2.5.1 批量查询

就像所有其他表一样,Paimon 表可以使用 SELECT 语句进行查询。

Paimon的批量读取返回表快照中的所有数据。默认情况下,批量读取返回最新快照。

在sql-client中,设置执行模式为批即可:

RESET ‘execution.checkpointing.interval’;

SET ‘execution.runtime-mode’ = ‘batch’;

2.5.1.1 时间旅行

1)读取指定id的快照

SELECT * FROM ws_t /*+ OPTIONS(‘scan.snapshot-id’ = ‘1’) */;

SELECT * FROM ws_t /*+ OPTIONS(‘scan.snapshot-id’ = ‘2’) */;

2)读取指定时间戳的快照

– 查看快照信息

SELECT * FROM ws_t&snapshots;

SELECT * FROM ws_t /*+ OPTIONS(‘scan.timestamp-millis’ = ‘1688369660841’) */;

3)读取指定标签

SELECT * FROM ws_t /*+ OPTIONS(‘scan.tag-name’ = ‘my-tag’) */;

2.5.1.2 增量查询

读取开始快照(不包括)和结束快照之间的增量更改。例如,“3,5”表示快照 3 和快照 5 之间的更改:

SELECT * FROM ws_t /*+ OPTIONS(‘incremental-between’ = ‘3,5’) */;

在batch模式中,不返回DELETE记录,因此-D的记录将被删除。如果你想查看DELETE记录,可以查询audit_log表:

SELECT * FROM ws_t$audit_log /*+ OPTIONS(‘incremental-between’ = ‘3,5’) */;

2.5.2 流式查询

默认情况下,Streaming read 在第一次启动时会生成表上的最新快照,并继续读取最新的更改。

SET ‘execution.checkpointing.interval’=‘30s’;

SET ‘execution.runtime-mode’ = ‘streaming’;

也可以从最新读取,设置扫描模式:

SELECT * FROM ws_t /*+ OPTIONS(‘scan.mode’ = ‘latest’) */

2.5.2.1 时间旅行

如果只想处理今天及以后的数据,则可以使用分区过滤器来实现:

SELECT * FROM test_p WHERE dt > ‘2023-07-01’

如果不是分区表,或者无法按分区筛选,可以使用时间旅行的流读取。

1)从指定快照id开始读取变更数据

SELECT * FROM ws_t /*+ OPTIONS(‘scan.snapshot-id’ = ‘1’) */;

2)从指定时间戳开始读取

SELECT * FROM ws_t /*+ OPTIONS(‘scan.timestamp-millis’ = ‘1688369660841’) */;

3)第一次启动时读取指定快照数据,并继续读取变化

SELECT * FROM ws_t /*+ OPTIONS(‘scan.mode’=‘from-snapshot-full’,‘scan.snapshot-id’ = ‘3’) */;

2.5.2.2 Consumer ID

1)优点

在流式读取表时指定consumer-id,这是一个实验性功能。

当流读取Paimon表时,下一个快照id将被记录到文件系统中。这有几个优点:

当之前的作业停止后,新启动的作业可以继续消耗之前的进度,而不需要从状态恢复。新的读取将从消费者文件中找到的下一个快照 ID 开始读取。

在判断一个快照是否过期时,Paimon会查看文件系统中该表的所有消费者,如果还有消费者依赖这个快照,那么这个快照就不会因为过期而被删除。

当没有水印定义时,Paimon表会将快照中的水印传递到下游Paimon表,这意味着您可以跟踪整个管道的水印进度。

注意:消费者将防止快照过期。可以指定“consumer.expiration-time”来管理消费者的生命周期。

2)案例演示

指定consumer-id开始流式查询:

SELECT * FROM ws_t /*+ OPTIONS(‘consumer-id’ = ‘atguigu’) */;

停掉原先的流式查询,插入数据:

insert into ws_t values(6,6,6);

再次指定consumer-id流式查询:

SELECT * FROM ws_t /*+ OPTIONS(‘consumer-id’ = ‘atguigu’) */;

2.5.3 查询优化

强烈建议在查询时指定分区和主键过滤器,这将加快查询的数据跳过速度。

可以加速数据跳跃的过滤函数有:

=

<

<=

=

IN (…)

LIKE ‘abc%’

IS NULL

Paimon会按主键对数据进行排序,从而加快点查询和范围查询的速度。使用复合主键时,查询过滤器最好形成主键的最左边前缀,以获得良好的加速效果。

CREATE TABLE orders (

catalog_id BIGINT,

order_id BIGINT,

…,

PRIMARY KEY (catalog_id, order_id) NOT ENFORCED – composite primary key

)

通过为主键最左边的前缀指定范围过滤器,查询获得了很好的加速。

SELECT * FROM orders WHERE catalog_id=1025;

SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;

SELECT * FROM orders

WHERE catalog_id=1025jkjkjk

AND order_id>2035 AND order_id<6000;

下面例子的过滤器不能很好地加速查询:

SELECT * FROM orders WHERE order_id=29495;

SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;

2.6 系统表

系统表包含有关每个表的元数据和信息,例如创建的快照和使用的选项。用户可以通过批量查询访问系统表。

2.6.1 快照表 Snapshots Table

通过snapshots表可以查询表的快照历史信息,包括快照中发生的记录数。

SELECT * FROM ws_t$snapshots;

通过查询快照表,可以了解该表的提交和过期信息以及数据的时间旅行。

2.6.2 模式表 Schemas Table

通过schemas表可以查询该表的历史schema。

SELECT * FROM ws_t$schemas;

可以连接快照表和模式表以获取给定快照的字段。

SELECT s.snapshot_id, t.schema_id, t.fields

FROM ws_t s n a p s h o t s s J O I N w s t snapshots s JOIN ws_t snapshotssJOINwstschemas t

ON s.schema_id=t.schema_id where s.snapshot_id=3;

2.6.3 选项表 Options Table

可以通过选项表查询DDL中指定的表的选项信息。未显示的选项将是默认值。

SELECT * FROM ws_t$options;

2.6.4 审计日志表 Audit log Table

如果需要审计表的changelog,可以使用audit_log系统表。通过audit_log表,获取表增量数据时可以获取rowkind列。您可以利用该栏目进行过滤等操作来完成审核。

rowkind 有四个值:

+I:插入操作。

-U:使用更新行的先前内容进行更新操作。

+U:使用更新行的新内容进行更新操作。

-D:删除操作。

SELECT * FROM ws_t$audit_log;

2.6.5 文件表 Files Table

可以查询特定快照表的文件。

– 查询最新快照的文件

SELECT * FROM ws_t$files;

– 查询指定快照的文件

SELECT * FROM ws_t$files /*+ OPTIONS(‘scan.snapshot-id’=‘1’) */;

2.6.6 标签表 Tags Table

通过tags表可以查询表的标签历史信息,包括基于哪些快照进行标签以及快照的一些历史信息。您还可以通过名称获取所有标签名称和时间旅行到特定标签的数据。

SELECT * FROM ws_t$tags;

2.7 维表Join

Paimon支持Lookup Join语法,它用于从 Paimon 查询的数据来补充维度字段。要求一个表具有处理时间属性,而另一个表由查找源连接器支持。

Paimon 支持 Flink 中具有主键的表和append-only的表查找联接。以下示例说明了此功能。

USE CATALOG fs_catalog;

CREATE TABLE customers (

id INT PRIMARY KEY NOT ENFORCED,

name STRING,

country STRING,

zip STRING

);

INSERT INTO customers VALUES(1,‘zs’,‘ch’,‘123’),(2,‘ls’,‘ch’,‘456’), (3,‘ww’,‘ch’,‘789’);

CREATE TEMPORARY TABLE Orders (

order_id INT,

total INT,

customer_id INT,

proc_time AS PROCTIME()

) WITH (

‘connector’ = ‘datagen’,

‘rows-per-second’=‘1’,

‘fields.order_id.kind’=‘sequence’,

‘fields.order_id.start’=‘1’,

‘fields.order_id.end’=‘1000000’,

‘fields.total.kind’=‘random’,

‘fields.total.min’=‘1’,

‘fields.total.max’=‘1000’,

‘fields.customer_id.kind’=‘random’,

‘fields.customer_id.min’=‘1’,

‘fields.customer_id.max’=‘3’

);

SELECT o.order_id, o.total, c.country, c.zip

FROM Orders AS o

JOIN customers

FOR SYSTEM_TIME AS OF o.proc_time AS c

ON o.customer_id = c.id;

Lookup Join算子会在本地维护一个RocksDB缓存并实时拉取表的最新更新。查找连接运算符只会提取必要的数据,因此您的过滤条件对于性能非常重要。

如果Orders(主表)的记录Join缺失,因为customers(查找表)对应的数据还没有准备好。可以考虑使用Flink的Delayed Retry Strategy For Lookup。

2.8 CDC集成

Paimon 支持多种通过模式演化将数据提取到 Paimon 表中的方法。这意味着添加的列会实时同步到Paimon表中,并且不会为此重新启动同步作业。

目前支持以下同步方式:

MySQL同步表:将MySQL中的一张或多张表同步到一张Paimon表中。

MySQL同步数据库:将整个MySQL数据库同步到一个Paimon数据库中。

API同步表:将您的自定义DataStream输入同步到一张Paimon表中。

Kafka同步表:将一个Kafka topic的表同步到一张Paimon表中。

Kafka同步数据库:将一个包含多表的Kafka主题或多个各包含一表的主题同步到一个Paimon数据库中。

2.8.1 MySQL

添加Flink CDC 连接器。

cp flink-sql-connector-mysql-cdc-2.4.0.jar /opt/module/flink-1.17.0/lib

重启yarn-session集群和sql-client。

2.8.1.1 同步表

1)语法说明

<FLINK_HOME>/bin/flink run \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

mysql-sync-table

–warehouse \

–database \

–table \

[–partition-keys ] \

[–primary-keys ] \

[–computed-column <‘column-name=expr-name(args[, …])’> [–computed-column …]] \

[–mysql-conf [–mysql-conf …]] \

[–catalog-conf [–catalog-conf …]] \

[–table-conf [–table-conf …]]

参数说明:

配置描述
–warehousePaimon仓库路径。
–databasePaimon Catalog中的数据库名称。
–tablePaimon 表名称。
–partition-keysPaimon 表的分区键。如果有多个分区键,请用逗号连接,例如“dt,hh,mm”。
–primary-keysPaimon 表的主键。如果有多个主键,请用逗号连接,例如“buyer_id,seller_id”。
–computed-column计算列的定义。参数字段来自 MySQL 表字段名称。
–mysql-confFlink CDC MySQL 源表的配置。每个配置都应以“key=value”的格式指定。主机名、用户名、密码、数据库名和表名是必需配置,其他是可选配置。
–catalog-confPaimon Catalog的配置。每个配置都应以“key=value”的格式指定。
–table-confPaimon 表sink的配置。每个配置都应以“key=value”的格式指定。

如果指定的 Paimon 表不存在,此操作将自动创建该表。其schema将从所有指定的 MySQL 表派生。如果 Paimon 表已存在,则其schema将与所有指定 MySQL 表的schema进行比较。

2)案例实操

(1)MySQL一张表同步到Paimon一张表

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

mysql-sync-table \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table order_info_cdc \

–primary-keys id \

–mysql-conf hostname=hadoop102 \

–mysql-conf username=root \

–mysql-conf password=000000 \

–mysql-conf database-name=gmall \

–mysql-conf table-name=‘order_info’ \

–catalog-conf metastore=hive \

–catalog-conf uri=thrift://hadoop102:9083 \

–table-conf bucket=4 \

–table-conf changelog-producer=input \

–table-conf sink.parallelism=4

(2)MySQL多张表同步到Paimon一张表

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

mysql-sync-table \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table order_cdc \

–primary-keys id \

–mysql-conf hostname=hadoop102 \

–mysql-conf username=root \

–mysql-conf password=000000 \

–mysql-conf database-name=gmall \

–mysql-conf table-name=‘order_.*’ \

–catalog-conf metastore=hive \

–catalog-conf uri=thrift://hadoop102:9083 \

–table-conf bucket=4 \

–table-conf changelog-producer=input \

–table-conf sink.parallelism=4

2.8.1.2 同步数据库

1)语法说明

<FLINK_HOME>/bin/flink run \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

mysql-sync-database

–warehouse \

–database \

[–ignore-incompatible <true/false>] \

[–table-prefix ] \

[–table-suffix ] \

[–including-tables <mysql-table-name|name-regular-expr>] \

[–excluding-tables <mysql-table-name|name-regular-expr>] \

[–mysql-conf [–mysql-conf …]] \

[–catalog-conf [–catalog-conf …]] \

[–table-conf [–table-conf …]]

参数说明:

配置描述
–warehousePaimon仓库路径。
–databasePaimon Catalog中的数据库名称。
–ignore-incompatible默认为 false,在这种情况下,如果 Paimon 中存在 MySQL 表名,并且它们的 schema 不兼容,则会抛出异常。您可以显式将其指定为 true 以忽略不兼容的表和异常。
–table-prefix所有需要同步的Paimon表的前缀。例如,如果您希望所有同步表都以“ods_”作为前缀,则可以指定“–table-prefix ods_”。
–table-suffix所有需要同步的Paimon表的后缀。用法与“–table-prefix”相同。
–including-tables用于指定要同步哪些源表。您必须使用“|”分隔多个表,例如:‘a|b|c’。支持正则表达式,例如指定“–include-tables test|paimon.*”表示同步表’test’和所有表都以“paimon”开头。
–excluding-tables用于指定哪些源表不同步。用法与“–include-tables”相同。如果同时指定了“-- except-tables”,则“-- except-tables”的优先级高于“–include-tables”。
–mysql-confFlink CDC MySQL源表的配置。每个配置都应以“key=value”的格式指定。主机名、用户名、密码、数据库名和表名是必需配置,其他是可选配置。
–catalog-confPaimon Catalog的配置。每个配置都应以“key=value”的格式指定。
–table-confPaimon 表sink的配置。每个配置都应以“key=value”的格式指定。

只有具有主键的表才会被同步。

对于每个需要同步的MySQL表,如果对应的Paimon表不存在,该操作会自动创建该表。其schema将从所有指定的 MySQL 表派生。如果 Paimon 表已存在,则其schema将与所有指定 MySQL 表的schema进行比较。

2)案例实操

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

mysql-sync-database \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table-prefix “ods_” \

–table-suffix “_cdc” \

–mysql-conf hostname=hadoop102 \

–mysql-conf username=root \

–mysql-conf password=000000 \

–mysql-conf database-name=gmall \

–catalog-conf metastore=hive \

–catalog-conf uri=thrift://hadoop102:9083 \

–table-conf bucket=4 \

–table-conf changelog-producer=input \

–table-conf sink.parallelism=4 \

–including-tables ‘user_info|order_info|activity_rule’

3)同步数据库下新添加的表

首先假设 Flink 作业正在同步数据库 source_db 下的表 [product、user、address]。提交作业的命令如下所示:

<FLINK_HOME>/bin/flink run \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

mysql-sync-database \

–warehouse hdfs:///path/to/warehouse \

–database test_db \

–mysql-conf hostname=127.0.0.1 \

–mysql-conf username=root \

–mysql-conf password=123456 \

–mysql-conf database-name=source_db \

–catalog-conf metastore=hive \

–catalog-conf uri=thrift://hive-metastore:9083 \

–table-conf bucket=4 \

–table-conf changelog-producer=input \

–table-conf sink.parallelism=4 \

–including-tables ‘product|user|address’

稍后,我们希望作业也同步包含历史数据的表 [order, custom]。我们可以通过从作业的先前快照中恢复并从而重用作业的现有状态来实现这一点。恢复的作业将首先对新添加的表进行快照,然后自动从之前的位置继续读取变更日志。

从以前的快照恢复并添加新表进行同步的命令如下所示:

<FLINK_HOME>/bin/flink run \

–fromSavepoint savepointPath \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

mysql-sync-database \

–warehouse hdfs:///path/to/warehouse \

–database test_db \

–mysql-conf hostname=127.0.0.1 \

–mysql-conf username=root \

–mysql-conf password=123456 \

–mysql-conf database-name=source_db \

–catalog-conf metastore=hive \

–catalog-conf uri=thrift://hive-metastore:9083 \

–table-conf bucket=4 \

–including-tables ‘product|user|address|order|custom’

2.8.2 Kafka

Flink 提供了几种 Kafka CDC 格式:canal-json、debezium-json、ogg-json、maxwell-json。如果 Kafka 主题中的消息是使用更改数据捕获 (CDC) 工具从另一个数据库捕获的更改事件,则您可以使用 Paimon Kafka CDC。将解析后的INSERT、UPDATE、DELETE消息写入到paimon表中。Paimon官网列出支持的格式如下:

img

添加Kafka连接器:

cp flink-sql-connector-kafka-1.17.0.jar /opt/module/flink-1.17.0/lib

重启yarn-session集群和sql-client。

2.8.2.1 同步表

1)语法说明

将 Kafka 的一个主题中的一张或多张表同步到一张 Paimon 表中。

<FLINK_HOME>/bin/flink run \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

kafka-sync-table

–warehouse \

–database \

–table \

[–partition-keys ] \

[–primary-keys ] \

[–computed-column <‘column-name=expr-name(args[, …])’> [–computed-column …]] \

[–kafka-conf [–kafka-conf …]] \

[–catalog-conf [–catalog-conf …]] \

[–table-conf [–table-conf …]]

参数说明

配置描述
–warehousePaimon仓库路径。
–databasePaimon Catalog中的数据库名称。
–tablePaimon 表名称。
–partition-keysPaimon 表的分区键。如果有多个分区键,请用逗号连接,例如“dt,hh,mm”。
–primary-keysPaimon 表的主键。如果有多个主键,请用逗号连接,例如“buyer_id,seller_id”。
–computed-column计算列的定义。参数字段来自 Kafka 主题的表字段名称。
–kafka-confFlink Kafka 源的配置。每个配置都应以“key=value”的格式指定。 properties.bootstrap.serverstopicproperties.group.idvalue.format 是必需配置,其他配置是可选的。
–catalog-confPaimon Catalog的配置。每个配置都应以“key=value”的格式指定。
–table-confPaimon 表sink的配置。每个配置都应以“key=value”的格式指定。

如果您指定的 Paimon 表不存在,此操作将自动创建该表。它的schema将从所有指定的Kafka topic的表中派生出来,它从topic中获取最早的非DDL数据解析schema。如果 Paimon 表已存在,则其schema将与所有指定 Kafka 主题表的schema进行比较。

2)案例实操

(1)准备数据(canal-json格式)

为了方便,直接将canal格式的数据插入topic里(user_info单表数据):

kafka-console-producer.sh --broker-list hadoop102:9092 --topic paimon_canal

#插入数据如下:

{“data”:[{“id”:“6”,“login_name”:“t7dk2h”,“nick_name”:“冰冰11”,“passwd”:null,“name”:“淳于冰”,“phone_num”:“13178654378”,“email”:“t7dk2h@263.net”,“head_img”:null,“user_level”:“1”,“birthday”:“1997-12-08”,“gender”:null,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689150607000,“id”:1,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“冰冰”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151566836,“type”:“UPDATE”}

{“data”:[{“id”:“7”,“login_name”:“vihcj30p1”,“nick_name”:“豪心22”,“passwd”:null,“name”:“魏豪心”,“phone_num”:“13956932645”,“email”:“vihcj30p1@live.com”,“head_img”:null,“user_level”:“1”,“birthday”:“1991-06-07”,“gender”:“M”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151623000,“id”:2,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“豪心”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151623139,“type”:“UPDATE”}

{“data”:[{“id”:“8”,“login_name”:“02r2ahx”,“nick_name”:“卿卿33”,“passwd”:null,“name”:“穆卿”,“phone_num”:“13412413361”,“email”:“02r2ahx@sohu.com”,“head_img”:null,“user_level”:“1”,“birthday”:“2001-07-08”,“gender”:“F”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151626000,“id”:3,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“卿卿”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151626863,“type”:“UPDATE”}

{“data”:[{“id”:“9”,“login_name”:“mjhrxnu”,“nick_name”:“武新44”,“passwd”:null,“name”:“罗武新”,“phone_num”:“13617856358”,“email”:“mjhrxnu@yahoo.com”,“head_img”:null,“user_level”:“1”,“birthday”:“2001-08-08”,“gender”:null,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151630000,“id”:4,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“武新”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151630781,“type”:“UPDATE”}

{“data”:[{“id”:“10”,“login_name”:“kwua2155”,“nick_name”:“纨纨55”,“passwd”:null,“name”:“姜纨”,“phone_num”:“13742843828”,“email”:“kwua2155@163.net”,“head_img”:null,“user_level”:“3”,“birthday”:“1997-11-08”,“gender”:“F”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151633000,“id”:5,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“纨纨”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151633697,“type”:“UPDATE”}

(2)从一个 Kafka 主题(包含单表数据)同步到 Paimon表

bin/flink run \/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \kafka-sync-table \--warehouse hdfs://hadoop102:8020/paimon/hive \--database test \--table kafka_user_info_cdc \--primary-keys id \--kafka-conf properties.bootstrap.servers=hadoop102:9092 \--kafka-conf topic=paimon_canal \--kafka-conf properties.group.id=atguigu \--kafka-conf scan.startup.mode=earliest-offset \--kafka-conf value.format=canal-json \--catalog-conf metastore=hive \--catalog-conf uri=thrift://hadoop102:9083 \--table-conf bucket=4 \--table-conf changelog-producer=input \--table-conf sink.parallelism=4

2.8.2.2 同步数据库

1)语法说明

将多个主题或一个主题同步到一个 Paimon 数据库中。

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \kafka-sync-database--warehouse <warehouse-path> \--database <database-name> \[--schema-init-max-read <int>] \[--ignore-incompatible <true/false>] \[--table-prefix <paimon-table-prefix>] \[--table-suffix <paimon-table-suffix>] \[--including-tables <table-name|name-regular-expr>] \[--excluding-tables <table-name|name-regular-expr>] \[--kafka-conf <kafka-source-conf> [--kafka-conf <kafka-source-conf> ...]] \[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]

参数说明:

配置描述
–warehouseThe path to Paimon warehouse.通往派蒙仓库的道路。
–databasePaimon 目录中的数据库名称。
–schema-init-max-read如果您的表全部来自某个Topic,您可以设置该参数来初始化需要同步的表数量。默认值为 1000。
–ignore-incompatible默认为 false,在这种情况下,如果 Paimon 中存在 MySQL 表名,并且它们的 schema 不兼容,则会抛出异常。您可以显式将其指定为 true 以忽略不兼容的表和异常。
–table-prefix所有需要同步的Paimon表的前缀。例如,如果您希望所有同步表都以“ods_”作为前缀,则可以指定“–table-prefix ods_”。
–table-suffix所有需要同步的Paimon表的后缀。用法与“–table-prefix”相同。
–including-tables用于指定要同步哪些源表。您必须使用“|”分隔多个表。因为“|”为特殊字符,需要逗号,例如:‘a|b|c’。支持正则表达式,例如指定“–include-tables test|paimon.*”表示同步表’test’和所有表都以“paimon”开头。
–excluding-tables用于指定哪些源表不同步。用法与“–include-tables”相同。如果同时指定了“-- except-tables”,则“-- except-tables”的优先级高于“–include-tables”。
–kafka-confFlink Kafka 源的配置。每个配置都应以“key=value”的格式指定。 properties.bootstrap.serverstopicproperties.group.idvalue.format 是必需配置,其他配置是可选的。有关完整配置列表,请参阅其文档。
–catalog-confPaimon 目录的配置。每个配置都应以“key=value”的格式指定。请参阅此处以获取目录配置的完整列表。
–table-confPaimon 餐桌水槽的配置。每个配置都应以“key=value”的格式指定。请参阅此处了解表配置的完整列表。

只有具有主键的表才会被同步。

对于每个要同步的Kafka主题的表,如果对应的Paimon表不存在,该操作将自动创建该表。它的schema将从所有指定的Kafka topic的表中派生出来,它从topic中获取最早的非DDL数据解析schema。如果 Paimon 表已存在,则其schema将与所有指定 Kafka 主题表的schema进行比较。

2)案例实操

(1)准备数据(canal-json格式)

为了方便,直接将canal格式的数据插入topic里(user_info和spu_info多表数据):

kafka-console-producer.sh --broker-list hadoop102:9092 --topic paimon_canal_2

#插入数据如下(注意不要有空行):

{“data”:[{“id”:“6”,“login_name”:“t7dk2h”,“nick_name”:“冰冰11”,“passwd”:null,“name”:“淳于冰”,“phone_num”:“13178654378”,“email”:“t7dk2h@263.net”,“head_img”:null,“user_level”:“1”,“birthday”:“1997-12-08”,“gender”:null,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689150607000,“id”:1,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“冰冰”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151566836,“type”:“UPDATE”}

{“data”:[{“id”:“7”,“login_name”:“vihcj30p1”,“nick_name”:“豪心22”,“passwd”:null,“name”:“魏豪心”,“phone_num”:“13956932645”,“email”:“vihcj30p1@live.com”,“head_img”:null,“user_level”:“1”,“birthday”:“1991-06-07”,“gender”:“M”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151623000,“id”:2,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“豪心”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151623139,“type”:“UPDATE”}

{“data”:[{“id”:“8”,“login_name”:“02r2ahx”,“nick_name”:“卿卿33”,“passwd”:null,“name”:“穆卿”,“phone_num”:“13412413361”,“email”:“02r2ahx@sohu.com”,“head_img”:null,“user_level”:“1”,“birthday”:“2001-07-08”,“gender”:“F”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151626000,“id”:3,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“卿卿”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151626863,“type”:“UPDATE”}

{“data”:[{“id”:“9”,“login_name”:“mjhrxnu”,“nick_name”:“武新44”,“passwd”:null,“name”:“罗武新”,“phone_num”:“13617856358”,“email”:“mjhrxnu@yahoo.com”,“head_img”:null,“user_level”:“1”,“birthday”:“2001-08-08”,“gender”:null,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151630000,“id”:4,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“武新”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151630781,“type”:“UPDATE”}

{“data”:[{“id”:“10”,“login_name”:“kwua2155”,“nick_name”:“纨纨55”,“passwd”:null,“name”:“姜纨”,“phone_num”:“13742843828”,“email”:“kwua2155@163.net”,“head_img”:null,“user_level”:“3”,“birthday”:“1997-11-08”,“gender”:“F”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151633000,“id”:5,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“纨纨”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151633697,“type”:“UPDATE”}

{“data”:[{“id”:“12”,“spu_name”:“华为智慧屏 4K全面屏智能电视机1”,“description”:“华为智慧屏 4K全面屏智能电视机”,“category3_id”:“86”,“tm_id”:“3”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151648000,“id”:6,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“华为智慧屏 4K全面屏智能电视机”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151648872,“type”:“UPDATE”}

{“data”:[{“id”:“3”,“spu_name”:“Apple iPhone 13”,“description”:“Apple iPhone 13”,“category3_id”:“61”,“tm_id”:“2”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151661000,“id”:7,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“Apple iPhone 12”,“description”:“Apple iPhone 12”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151661828,“type”:“UPDATE”}

{“data”:[{“id”:“4”,“spu_name”:“HUAWEI P50”,“description”:“HUAWEI P50”,“category3_id”:“61”,“tm_id”:“3”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151669000,“id”:8,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“HUAWEI P40”,“description”:“HUAWEI P40”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151669966,“type”:“UPDATE”}

{“data”:[{“id”:“1”,“spu_name”:“小米12sultra”,“description”:“小米12”,“category3_id”:“61”,“tm_id”:“1”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151700000,“id”:9,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“description”:“小米10”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151700998,“type”:“UPDATE”}

再准备一个只包含spu_info单表数据的Topic:

kafka-console-producer.sh --broker-list hadoop102:9092 --topic paimon_canal_1

#插入数据如下:

{“data”:[{“id”:“12”,“spu_name”:“华为智慧屏 4K全面屏智能电视机1”,“description”:“华为智慧屏 4K全面屏智能电视机”,“category3_id”:“86”,“tm_id”:“3”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151648000,“id”:6,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“华为智慧屏 4K全面屏智能电视机”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151648872,“type”:“UPDATE”}

{“data”:[{“id”:“3”,“spu_name”:“Apple iPhone 13”,“description”:“Apple iPhone 13”,“category3_id”:“61”,“tm_id”:“2”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151661000,“id”:7,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“Apple iPhone 12”,“description”:“Apple iPhone 12”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151661828,“type”:“UPDATE”}

{“data”:[{“id”:“4”,“spu_name”:“HUAWEI P50”,“description”:“HUAWEI P50”,“category3_id”:“61”,“tm_id”:“3”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151669000,“id”:8,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“HUAWEI P40”,“description”:“HUAWEI P40”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151669966,“type”:“UPDATE”}

{“data”:[{“id”:“1”,“spu_name”:“小米12sultra”,“description”:“小米12”,“category3_id”:“61”,“tm_id”:“1”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151700000,“id”:9,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“description”:“小米10”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151700998,“type”:“UPDATE”}

(2)从一个 Kafka 主题(包含多表数据)同步到 Paimon 数据库

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

kafka-sync-database \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table-prefix “t1_” \

–table-suffix “_cdc” \

–schema-init-max-read 500 \

–kafka-conf properties.bootstrap.servers=hadoop102:9092 \

–kafka-conf topic=paimon_canal_2 \

–kafka-conf properties.group.id=atguigu \

–kafka-conf scan.startup.mode=earliest-offset \

–kafka-conf value.format=canal-json \

–catalog-conf metastore=hive \

–catalog-conf uri=thrift://hadoop102:9083 \

–table-conf bucket=4 \

–table-conf changelog-producer=input \

–table-conf sink.parallelism=4

从多个 Kafka 主题同步到 Paimon 数据库

bin/flink run \

/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \

kafka-sync-database \

–warehouse hdfs://hadoop102:8020/paimon/hive \

–database test \

–table-prefix “t2_” \

–table-suffix “_cdc” \

–kafka-conf properties.bootstrap.servers=hadoop102:9092 \

–kafka-conf topic=“paimon_canal;paimon_canal_1” \

–kafka-conf properties.group.id=atguigu \

–kafka-conf scan.startup.mode=earliest-offset \

–kafka-conf value.format=canal-json \

–catalog-conf metastore=hive \

–catalog-conf uri=thrift://hadoop102:9083 \

–table-conf bucket=4 \

–table-conf changelog-producer=input \

–table-conf sink.parallelism=4

2.8.3 支持的schema变更

cdc 集成支持有限的schema变更。目前,框架无法删除列,因此 DROP 的行为将被忽略,RENAME 将添加新列。当前支持的架构更改包括:

(1)添加列。

(2)更改列类型:

从字符串类型(char、varchar、text)更改为长度更长的另一种字符串类型,

从二进制类型(binary、varbinary、blob)更改为长度更长的另一种二进制类型,

从整数类型(tinyint、smallint、int、bigint)更改为范围更广的另一种整数类型,

从浮点类型(float、double)更改为范围更宽的另一种浮点类型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_337286.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PHP使用Redis实战实录1:宝塔环境搭建、6379端口配置、Redis服务启动失败解决方案

宝塔环境搭建、6379端口配置、Redis服务启动失败解决方案 前言一、Redis安装部署1.安装Redis2.php安装Redis扩展3.启动Redis 二、避坑指南1.6379端口配置2.Redis服务启动&#xff08;1&#xff09;Redis服务启动失败&#xff08;2&#xff09;Redis启动日志排查&#xff08;3&a…

JS-----数据结构与算法(2)

目录 三. 栈结构 1.认识栈结构 2. 封装栈结构 3. 应用 3-1 十进制转二进制 3-2 进制转换法 四. 队列 1.队列是什么&#xff1f; 2.队列的封装 3. 队列的应用-击鼓传花 4. 双端队列 5.判断是否为回文 三. 栈结构 1.认识栈结构 栈&#xff08;stack&#xff09;又…

7.29训练总结

CodeForces - 1609E 这种使得整个串不包含子串’abc’的题目&#xff0c;发现可以用线段树维护 #include<bits/stdc.h> using namespace std; const int maxn1e55; #define lson now<<1 #define rson now<<1|1 struct seg {int a,b,c;int ab,bc,abc; }tr[m…

2023 年还推荐报计算机专业吗?

计算机科学是一个很好的专业&#xff0c;因为它由各种课程组成&#xff0c;为学生在成熟和新兴专业就业做好准备。以下是一些通常属于计算机科学专业的课程&#xff1a; 基本编程介绍了用于构建和维护数字架构和基础设施的编程语言和标准。 微积分为制定高级计算和设计概念提供…

eclipse 最新版没有navigator视图如何解决

使用project exploere视图可以显示类似navigator视图 1.显示project exploere视图 window---->show view --->project exploere 2.project exploere视图转换为类似navigator视图 第一步&#xff1a;点击视图右上角三个点或者倒三角&#xff0c;点击fiters and custom…

蓝图节点编辑器

打印字符串 第02章 蓝图结构 03 -注释和重新路由_哔哩哔哩_bilibili 第02章 蓝图结构 04 - 变量_哔哩哔哩_bilibili 第03章 蓝图简易门 01 - 箱子碰撞_哔哩哔哩_bilibili 第03章 蓝图简易门 02 - 静态Mesh和箭头_哔哩哔哩_bilibili 第03章 蓝图简易门 03 - 设置相对旋转节点_哔…

rocketmq rsqldb 简单记录

GitHub 地址 https://github.com/alibaba/rsqldb/tree/main&#xff0c;是和目前stream sql化看齐的Rocketmq的sql&#xff0c;类似还有kafka的sqlDB 和flink sql。 目前版本0.2 &#xff0c;主要提供rest模式调用&#xff0c;controller类为public class RsqlController支持的…

6G内存运行Llama2-Chinese-7B-chat模型

6G内存运行Llama2-Chinese-7B-chat模型 Llama2-Chinese中文社区 第一步&#xff1a; 从huggingface下载 Llama2-Chinese-7b-Chat-GGML模型放到本地的某一目录。 第二步&#xff1a; 执行python程序 git clone https://github.com/Rayrtfr/llama2-webui.gitcd llama2-web…

儿童居家健身好伙伴,小莫计数摸高训练器

现在的孩子们的越来越不喜欢运动了&#xff0c;总是爱玩手机游戏&#xff0c;对他们的身体健康非常不好&#xff0c;作为家长&#xff0c;我们希望能够给孩子提供更多的运动机会&#xff0c;有必要每天准备一些能让他们活动活动手脚的小游戏&#xff0c;让他们每天有足够的运动…

Pytorch个人学习记录总结 10

目录 优化器 优化器 官方文档地址&#xff1a;torch.optimhttps://pytorch.org/docs/stable/optim.html Debug过程中查看的grad所在的位置&#xff1a; model --> Protected Atributes --> _modules --> ‘model’ --> Protected Atributes --> _modules -…

【matlab】机器人工具箱快速上手-动力学仿真(代码直接复制可用)

动力学代码&#xff0c;按需修改参数 各关节力矩-关节变量的关系曲线&#xff1a; %%%%%%%%SCARA机器人仿真模型 l[0.457 0.325]; L(1) Link(d,0,a,l(1),alpha,0,standard,qlim,[-130 130]*pi/180);%连杆1 L(2)Link(d,0,a,l(2),alpha,pi,standard,qlim,[-145 145]*pi/180);%连…

小学期笔记——天天酷跑1

文件快照&#xff08;File snapshot&#xff09;通常是指对文件系统中某个特定时间点的文件或文件夹的快照或副本。它记录了文件或文件夹在某一时刻的状态&#xff0c;包括文件的内容、属性、权限、位置等信息。 文件快照通常用于数据备份、恢复和版本控制等目的。通过捕捉文件…

关于c++中虚函数和虚函数表的创建时机问题

以这段代码为例。 #include <iostream>using namespace std;class Parent { public:Parent(){}virtual void func1() {};virtual void func2() {}; };class Child :public Parent { public:Child():n(0),Parent(){cout << "Child()" << endl;}vir…

【网络原理】 (1) (应用层 传输层 UDP协议 TCP协议 TCP协议段格式 TCP内部工作机制 确认应答 超时重传 连接管理)

文章目录 应用层传输层UDP协议TCP协议TCP协议段格式TCP内部工作机制确认应答超时重传 网络原理部分我们主要学习TCP/IP协议栈这里的关键协议(TCP 和 IP),按照四层分别介绍.(物理层,我们不涉及). 应用层 我们需要学会自定义一个应用层协议. 自定义协议的原因? 当前的软件(应用…

轮趣科技教育版ros小车键盘控制运动

我之前买的ros小车是单独买的底板&#xff0c;以为随便一个树莓派就可以&#xff0c;因为我以前有一个树莓派3B&#xff0c;后来买了单独的小车之后&#xff0c;发现只能使用树莓派4B&#xff0c;然后又单独买了一个树莓派4B&#xff0c;给装上镜像&#xff0c;安装ros-melodic…

基于因果关系知识库的因果事件图谱构建、文本预处理、因果事件抽取、事件融合等

项目设计集合&#xff08;人工智能方向&#xff09;&#xff1a;助力新人快速实战掌握技能、自主完成项目设计升级&#xff0c;提升自身的硬实力&#xff08;不仅限NLP、知识图谱、计算机视觉等领域&#xff09;&#xff1a;汇总有意义的项目设计集合&#xff0c;助力新人快速实…

IntersectionObserver实现小程序长列表优化

IntersectionObserver实现小程序长列表优化 关于 IntersectionObserver 思路 这里以一屏数据为单位【一个分页的10条数据&#xff0c;最好大于视口高度】&#xff0c; 监听每一屏数据和视口的相交比例&#xff0c;即用户能不能看到它 只将可视范围的数据渲染到页面上&#x…

基于注解的 SpringMVC

SpringMVC SpringMVC使用SpringMVC的两个配置EnableWebMVC 和 ACWACSpringMVC执行流程接收请求参数Postman 发包工具&#xff08;&#xff09;get 请求---简单类型数据&#xff08;基本数据类型和String&#xff09;get 请求---对象类型数据get 请求---数组类型get 请求 --- 集…

Codeforces Round 886 (Div. 4)F题解

文章目录 [We Were Both Children](https://codeforces.com/contest/1850/problem/F)问题建模问题分析1.分析到达的点与跳跃距离的关系2.方法1倍数法累计每个点所能达到的青蛙数代码 方法2试除法累计每个点能到达的青蛙数代码 We Were Both Children 问题建模 给定n个青蛙每次…

自动驾驶感知系统--惯性导航定位系统

惯性导航定位 惯性是所有质量体本身的基本属性&#xff0c;所以建立在牛顿定律基础上的惯性导航系统&#xff08;Inertial Navigation System,INS&#xff09;(简称惯导系统)不与外界发生任何光电联系&#xff0c;仅靠系统本身就能对车辆进行连续的三维定位和三维定向。卫星导…