Clickhouse实战处理(一)集成引擎和Distributed引擎之集成Hive

news/2024/7/22 12:19:21/文章来源:https://blog.csdn.net/sheep8521/article/details/139088108

一、集成引擎

集成引擎集成第三方的存储和系统来读写数据,ClickHouse本身不存储数据。
集成引擎包含:

  1. Kafka
  2. MySQL
  3. ODBC
  4. JDBC
  5. HDF

Kafka引擎

1、Kafka引擎介绍

SELECT查询对于读取消息并不是很有用(除了调试),因为每个消息只能读取一次。
通常,将该引擎结合物化视图一起使用,使用方法:
(1)、使用Kafka引擎创建一个Kafka的消费者,并将其视为一个数据流。
(2)、创建所需结构的表。
(3)、创建一个物化视图,该视图转换来自引擎的数据并将其放入上一步创建的表中。
当物化视图添加至该引擎,它将会在后台收集数据。这就允许你从Kafka持续接收消息并使用
SELECT将数据转换为所需的格式。它们不直接从Kafka中读取数据,而是接收新记录,以block为
单位, 这样就可以写入具有不同详细信息级别的多个表(分组聚合或无聚合)中。
为了提高性能,接收到的消息将被分组为大小为max_insert_block_size的block(块)。如果block没
有在stream_flush_interval_ms时间内形成,则不管block的完整性如何,数据都将刷新到表中。
Kafka引擎
要停止接收topic数据或更改转换逻辑,需detach物化视图。
DETACH TABLE consumer;
ATTACH MATERIALIZED VIEW consumer;
如果要使用ALTER更改目标表,建议禁用物化视图,以避免目标表和该视图中的数据之间出现差异。

2、kafka的参数和示例

Kafka引擎结合Kafka使用,可实现订阅或发布数据流。
指定表引擎:
ENGINE = Kafka()
SETTINGS
kafka_broker_list = ‘host:port’,
kafka_topic_list = ‘topic1,topic2,…’,
kafka_group_name = ‘group_name’,
kafka_format = ‘data_format’,
kafka_row_delimiter = ‘delimiter_symbol’,
kafka_schema = ‘’,
kafka_num_consumers = N,
kafka_skip_broken_messages = N

必选参数:
kafka_broker_list :以逗号分隔的brokers列表。
kafka_topic_list :以逗号分隔的kafka的topic列表。
kafka_group_name: Kafka消费组。
kafka_format :消息的格式,例如JSONEachRow。
可选参数:
kafka_row_delimiter :行之间的分隔符。
kafka_schema :按需定义schema,例如Cap'n Proto格式需指定。
kafka_num_consumers :消费者数量,默认1,最多不超过
Topic的分区数。
kafka_skip_broken_messages :每个block中,Kafka的消息解析器容忍schema不兼容消息的数量。默认值:0

创建Kafka引擎表示例
示例1:
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka(‘localhost:9092’,
‘topic’, ‘group1’, ‘JSONEachRow’);

示例2:
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS
kafka_broker_list =
‘localhost:9092’,
kafka_topic_list =
‘topic’,
kafka_group_name =
‘group1’,
kafka_format =
‘JSONEachRow’,

示例3:
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka(‘localhost:9092’, ‘topic’, ‘group1’)
SETTINGS kafka_format =
‘JSONEachRow’,
kafka_num_consumers = 4;

3、Kafka的扩展配置
Kafka引擎支持使用ClickHouse配置文件扩展配置。用户可以使用两个配置key, 全局的kafka和topic级别的kafka_*。首先应用全局配置,然后应用topic级别的配置。

cgrp smallest 250 100000 有关可能的配置选项的列表,参见librdkafka配置,链接: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 。 ClickHouse配置中使用下划线(_)代替点,例如,check.crcs=true 将配置为 true

MySQL引擎

1、MySQL引擎介绍

MySQL引擎可实现对MySQL数据库的表执行插入和查询操作。ClickHouse表结构可以不同于原始的MySQL表结构。
列名应当与原始MySQL表中的列名相同,但可以按任意顺序使用其中的一些列。列的数据类型可能与原始的MySQL表中的列类型不同,ClickHouse尝试进行数据类型转换。

2、MySQL表引擎参数:

ENGINE = MySQL(‘host:port’, ‘database’, ‘table’, ‘user’, ‘password’[, replace_query,
‘on_duplicate_clause’]);
引擎参数:

  1. host:port :MySQL server的地址。
  2. database :MySQL数据库名称。
  3. table : MySQL表名。
  4. user : MySQL用户名。
  5. password : MySQL用户密码。
  6. replace_query :将INSERT INTO查询转换为REPLACE INTO查询的标识。如果replace_query=1,查询将被替换。
  7. on_duplicate_clause : 将on_duplicate_clause的表达式添加到INSERT查询中。

例如:插入1个数据: INSERT INTO t (c1,c2) VALUES (‘a’, 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1,
on_duplicate_clause表达式为: UPDATE c2 = c2 + 1。
如果使用on_duplicate_clause,需设置参数replace_query=0。如果同时传递replace_query=1和on_duplicate_clause,ClickHouse将抛出异常。

MySQL引擎的样例

1、在MySQL创建表和插入数据
DROP TABLE test.test ;
create table test.test (id INT  NOT NULL AUTO_INCREMENT,cnt INT,PRIMARY KEY (id)
);insert into test.test (id, cnt) VALUES (1,2);
select * from test.test;
-----如果mysql插入1条数据。
INSERT INTO test (id,cnt) VALUES (2, 3) ON DUPLICATE KEY UPDATE cnt = cnt + 1;   
关于DUPLICATE KEY UPDATE c2 = c2 + 1,它是MySQL中INSERT INTO语句的一种变体。当使用INSERT INTO语句插入数据时,如果插入的数据与已有数据的主键或唯一索引重复,则会触发错误。但是使用DUPLICATE KEY UPDATE c2 = c2 + 1可以在重复时更新该行数据。2. 在ClickHouse中创建MySQL引擎的表DROP TABLE mysql_table_dup;
CREATE TABLE mysql_table_dup
(id Int32,cnt Int32
)
ENGINE = MySQL('127.0.0.1:3306', 'test', 'test', 'root', '123456', 0, 'UPDATE cnt=cnt+1'); 

JDBC引擎

1、JDBC引擎介绍

ClickHouse通过JDBC引擎连接到外部数据库, 如MySQL、Oracle、PostgreSQL等。
ClickHouse使用一个单独的项目clickhouse-jdbc-bridge, 其作为一个守护进程运行。记得要启动服务: java -jar ./clickhouse-jdbc-bridge-1.0.1.jar --driver-path /root/jdbc/lib/
其中/root/jdbc/lib为数据库驱动包所在的目录
clickhouse-jdbc-bridge项目链接:https://github.com/ClickHouse/clickhouse-jdbc-bridge

2、表引擎参数:

ENGINE = JDBC(dbms_uri, external_database, external_table)

  1. dbms_uri :
    外部DBMS的URI:jdbc:<driver_name>://<host_name>:/?user=&password=。
    例如:jdbc:mysql://localhost:3306/?user=root&password=root 。
  2. external_database : 外部DBMS的数据库。
  3. external_table : 外部数据库的表名

Hive引擎

Hive引擎允许对HDFS Hive表执行 SELECT 查询。

创建ck表
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(name1 [type1] [ALIAS expr1],name2 [type2] [ALIAS expr2],...
) ENGINE = Hive('thrift://host:port', 'database', 'table');
PARTITION BY expr 
;thrift://host:port — Hive Metastore 地址
database — 远程数据库名.
table — 远程数据表名. 
  1. 列名应该与原来的Hive表相同,但你可以使用这些列中的一些,并以任何顺序,你也可以使用一些从其他列计算的别名列。
  2. 列类型与原Hive表的列类型保持一致。
  3. “Partition by expression”应与原Hive表保持一致,“Partition by expression”中的列应在表结构中。

二、Distributed引擎

Distributed引擎,也就是分布式引擎,本身不存储数据,但可以在多个服务器上进行分布式查询。读是自动并行的。读取时,远程服务器表的索引(如果存在)会被使用。

create table dis_table(id UInt16, name String)
Distributed(cluster_name, database, table, [sharding_key])参数解析:
cluster_name:服务器配置文件中的集群名,在/etc/metrika.xml中配置的。具体配置见前文。
database:数据库名。
table:表名。
sharding_key:数据分片键。 
  1. 本地表:通常以_local为后缀进行命名。本地表是承接数据的载体,可以使用非Distributed的任意表引擎,一张本地表对应了一个数据分片
  2. 分布式表:分布式表只能使用Distribute表引擎,它与本地表形成一对多的映射关系,日后将通过分布式表代理操作多张本地表

有了分布式表之后,我们就可以向分布式表中插入数据,那么分布式表会根据配置的sharding_key将数据写入到不同的节点分片中。

分片键sharding_key:
分片键要求返回一个整形类型的取值,包括Int系列和UInt系列,有如下几种情况:Distributed(cluster,database,table,userid)   --可以是一个具体的整形列字段
Distributed(cluster,database,table,rand())   --可以按照随机数划分
Distributed(cluster,database,table,intHash64(userid))   --可以按照某个整形列进行散列值划分

注意:如果不声明分片键,那么分布式表只能包含一个分片,这意味着只能映射一张本地表,否则,在写入数据时将会报错。如果分布式表只包含一个分片,也就失去了分布式的意义,所以虽然分片键是选填参数,但是通常都会按照业务规则进行设置。

2、Hive2CK(分布式查询)

1、创建local表和分布式表 :
create table adm.local表 on CLUSTER CK集群名称 (字段1           String,字段2           String,字段3           String,dt              int
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{分片变量参数}/adm/hive表名_local','{副本变量参数}')
order by (dept_id)
PARTITION BY (dt)
SETTINGS storage_policy = 'data_data1'
;create table adm.ck表名(一般和hive表名一样) on CLUSTER cluster_5shards_2replicas (字段1                 String,字段2               String,字段3             String,dt                      int
)
ENGINE = Distributed(CK集群名称,adm,local表,rand())
;

2、数据同步工具和conf配置文件

批量数据使用数据同步工具(datax、waterdrop)进行同步。
Waterdrop 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上。
1)使用 Waterdrop前请先准备好Spark和Java运行环境。java版本为jdk1.8。
2)一个完整的Waterdrop配置包含spark, input, filter, output,

spark是spark相关的配置,可配置的spark参数见: Spark Configuration, 其中master, deploy-mode两个参数不能在这里配置,需要在Waterdrop启动脚本中指定。
input可配置任意的input插件及其参数,具体参数随不同的input插件而变化。
filter可配置任意的filter插件及其参数,具体参数随不同的filter插件而变化。
output可配置任意的output插件及其参数,具体参数随不同的output插件而变化。filter处理完的数据,会发送给output中配置的每个插件。

conf文件如下:

spark {spark.sql.catalogImplementation = "hive"spark.app.name = "hive2ck-"${schema}"."${table_name}"."${dt}spark.executor.instances = 2spark.executor.cores = 2spark.executor.memory = "10g"spark.yarn.maxAppAttempts = 1spark.dynamicAllocation.enabled = "true"spark.dynamicAllocation.maxExecutors = 10
}input {hive {pre_sql = """select 字段1,字段2,字段3,,,,,,from  adm.hive表名where dt='${dt}'----'"""${dt}"'"table_name = "hive表名"}
}filter {
convert {     --这里是要做转化的字段说明,CK不支持string。source_field = "dt",new_type = "integer"
}
}output {clickhouse {host = ${ck_host}":"${ck_http_port}database = ${schema}table = ${table_name}"_local"cluster = "CK集群名称"username = ${ck_user}password = ${ck_passwd}clickhouse.socket_timeout = 100000bulk_size = 500000}
}

3、运行脚本

# client 模式
./bin/start-waterdrop.sh --master yarn --deploy-mode client --config ./config/application.conf# cluster 模式
./bin/start-waterdrop.sh --master yarn --deploy-mode cluster --config ./config/application.conf# cluster 模式下按固定周期从Hive表同步到CK分布表,如果conf文件没有实现准备,也可通过命令执行原表和目标表。不推荐这种。
./waterdrop/bin/start-waterdrop.sh 
--master yarn --deploy-mode cluster \
--config  ${conf_file} \
-i schema=${schema} -i table_name=${table_name} -i month=${month}  -i dt=${dt}  -i hour=${hour} \
-i ck_host="${ck_host}" -i ck_http_port="${ck_http_port}" -i ck_user="${ck_user}" -i ck_passwd="${ck_passwd}"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1052823.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

EpicDesigner编辑器的源码修改和扩展“数据库”进行数据库字段设置

项目介绍&#xff1a; EpicDesigner是一个功能强大、开箱即用的拖拽式低代码设计器。它基于 Vue3 开发&#xff0c;兼容多套 UI 组件库&#xff0c;除了基础的页面设计功能&#xff0c;EDesigner 还提供了强大的扩展功能&#xff0c;可以让开发者根据自己的需求自由扩展和定制…

颜色空间的选择

1.选择Gamma颜色空间&#xff0c;Web平台或者不支持线性空间&#xff0c;或者追求高饱和度的 2.选择Linear&#xff0c;追求真实光照和物理准确

亚信安慧AntDB数据库采集技术创新:ACC从Java到Go的转型之路

传统的指标采集方法通常使用一些命令行工具&#xff0c;如top、free等来获取系统的性能数据。然而&#xff0c;这种方法存在一些缺点。首先&#xff0c;这些命令行工具输出的数据格式通常是文本形式&#xff0c;需要进行解析和处理才能得到有用的信息&#xff0c;这增加了开发者…

Vulnhub - AI-WEB-1.0靶机教程

目录 站点信息收集 c段扫描 端口扫描 目录扫描 漏洞利用 使用 burp 抓包 查询数据库名 查询数据库下的表 查询表中的字段名 查询字段中的数据 --os-shell 上传一句话木马 下载地址&#xff1a;https://download.vulnhub.com/aiweb/AI-Web-1.0.7z 我们从站点信息收…

OpenHarmony 实战开发——ArkUI中的线程和看门狗机制

一、前言 本文主要分析ArkUI中涉及的线程和看门狗机制。 二、ArkUI中的线程 应用Ability首次创建界面的流程大致如下&#xff1a; 说明&#xff1a; • AceContainer是一个容器类&#xff0c;由前端、任务执行器、资源管理器、渲染管线、视图等聚合而成&#xff0c;提供了生…

nss做题

[NCTF 2018]签到题 1.f12在index.php中找到flag [NSSCTF 2022 Spring Recruit]ezgame 1.在js源码中就有flag [UUCTF 2022 新生赛]websign 1.打开环境后发现ctrlu和右键&#xff0c;f12都被禁用了。两种方法&#xff0c;第一种&#xff1a;禁用js&#xff1b;第二中提前打开…

亚马逊高效广告打法及数据优化,亚马逊高阶广告打法课

课程下载&#xff1a;https://download.csdn.net/download/m0_66047725/89342733 更多资源下载&#xff1a;关注我。 课程内容&#xff1a; 001.1-亚马逊的广告漏斗和A9算法的升级变化.mp4 002.2-流量入口解析和广告的曝光机制.mp4 003.3-标签理论 .mp4 004.4-不同广告类…

GoldenEye-v1(vulnhub)靶机练习实践报告

GoldenEye-v1****靶机练习实践报告 一、安装靶机 靶机是.ova文件&#xff0c;需要用VirtualBox打开&#xff0c;但我习惯于使用VMWare,因此修改靶机文件&#xff0c;使其适用于VMWare打开。 解压ova文件&#xff0c;得到.ovf文件和.vmdk文件。 用记事本打开.ovf文件并修改“…

预训练大模型

参考代码&#xff1a;https://github.com/LlamaFamily/Llama-Chinese

AI大模型在测试中的深度应用与实践案例

文章目录 1. 示例项目背景2. 环境准备3. 代码实现3.1. 自动生成测试用例3.2. 自动化测试脚本3.3. 性能测试3.4. 结果分析 4. 进一步深入4.1. 集成CI/CD管道4.1.1 Jenkins示例 4.2. 详细的负载测试和性能监控4.2.1 Locust示例 4.3. 测试结果分析与报告 5. 进一步集成和优化5.1. …

excel里如何将数据分组转置?

这个表格怎样转换为下表&#xff1f;按照国家来分组&#xff0c;把不同年份对应的不同序列值进行转置&#xff1f;&#xff1f; 这演示用数据透视表就完成这个数据转换。 1.创建数据透视表 选中数据中任意单元格&#xff0c;点击插入选项卡&#xff0c;数据透视表&#xff0c;…

【数学建模】碎纸片的拼接复原

2013高教社杯全国大学生数学建模竞赛B题 问题一模型一模型二条件设立思路 问题求解 问题一 已知 d i d_i di​为第 i i i张图片图片的像素矩阵 已知 d i d_i di​都是 n ∗ m n*m n∗m二维矩阵 假设有 N N N张图片 模型一 我们认为对应位置像素匹配为 d i [ j ] [ 1 ] d k…

clocking wizard IP核通过AXI4-Lite接口实现动态重新配置应用实例

在最近的FPGA应用中&#xff0c;应用到了基于Zynq 7000的Uart串口设计&#xff0c;为了让串口的时钟更精确&#xff0c;采用了外部时钟模式&#xff0c;如下图所示。外部时钟连接到了Clocking Wizard IP核的输出端。 在串口通信时&#xff0c;发现串口有错码出现。例如&#xf…

leetcode124 二叉树中的最大路径和-dp

题目 二叉树中的 路径 被定义为一条节点序列&#xff0c;序列中每对相邻节点之间都存在一条边。同一个节点在一条路径序列中 至多出现一次 。该路径 至少包含一个 节点&#xff0c;且不一定经过根节点。 路径和 是路径中各节点值的总和。 给你一个二叉树的根节点 root &…

C++模板方法模式

文章目录 1. 定义抽象基类&#xff08;Abstract Class&#xff09;2. 实现具体子类&#xff08;Concrete Class&#xff09;3. 使用模板方法模板方法模式的优点模板方法模式的应用场景注意事项实现示例抽象类&#xff08;模板&#xff09;具体实现类客户端代码 总结 模板方法模…

Elasticsearch (ES) (上万字)详细学习总结

一、认识ES 二、ES相关安装和部署(elasticsearch 、kbana、ik) 这部分的内容可以查看我之前写的Docker常用部署这篇文章 三、Mapping映射 3.1 Mapping映射属性 3.2 索引库操作 3.2.1 遵循Restful规范说明 3.2.2 具体使用方式说明 3.2.3增删改查示例 #创建 PUT /heima {&q…

【Java面试】四、MySQL篇(上)

文章目录 1、定位慢查询2、慢查询的原因分析3、索引3.1 数据结构选用&#xff1a;二叉树 & 红黑树3.2 数据结构选用&#xff1a;B树 4、聚簇索引、非聚簇索引、回表查询4.1 聚簇索引、非聚簇索引4.2 回表查询 5、覆盖索引、超大分页优化5.1 覆盖索引5.2 超大分页处理 6、索…

存储+调优:存储-memcached

存储调优&#xff1a;存储-memcached 什么是memcached? 高性能的分布式内存缓存服务器。通过缓存数据库的查询结果&#xff0c;减少数据库访问次数&#xff0c;以提高动态Web应用的速度、提高可扩展性。 在memcached中存什么&#xff1f; 尽快被保存 访问频率高 1.数据保…

【Unity】Unity项目转抖音小游戏(三)资源分包,抖音云CDN

业务需求&#xff0c;开始接触一下抖音小游戏相关的内容&#xff0c;开发过程中记录一下流程。 使用资源分包可以优化游戏启动速度&#xff0c;是抖音小游戏推荐的一种方式&#xff0c;抖音云也提供存放资源的CDN服务 抖音云官方文档&#xff1a;https://developer.open-douyi…

R可视化:另类的箱线图

介绍 方格状态的箱线图 加载R包 knitr::opts_chunk$set(echo TRUE, message FALSE, warning FALSE) library(patternplot) library(png) library(ggplot2) library(gridExtra)rm(list ls()) options(stringsAsFactors F)导入数据 data <- read.csv(system.file(&qu…