【实时数仓】在Hbase建立维度表、保存维度数据到Hbase、保存业务数据到kafka主题

news/2024/5/8 11:38:20/文章来源:https://blog.csdn.net/weixin_43923463/article/details/128226822

文章目录

  • 一 分流Sink之建立维度表到HBase(Phoenix)
    • 1 拼接建表语句
      • (1)定义配置常量类
      • (2)引入依赖
      • (3)hbase-site.xml
      • (4)在phoenix中执行
      • (5)增加代码
        • a TableProcessFunction
        • b checkTable
      • (6)测试
    • 2 过滤字段
      • (1)代码编写
      • (2)测试
      • (3)总结
  • 二 分流Sink之保存维度数据到HBase(Phoenix)
    • 1 程序执行流程
    • 2 DimSink
    • 3 BaseDBApp
    • 4 测试
  • 三 分流Sink之保存业务数据到Kafka主题
    • 1 BaseDBApp
    • 2 MyKafkaUtil
    • 3 测试
    • 4 总结
  • 四 总结
  • 五 附录:完整代码
    • 0 BaseDBApp
    • 1 MyKafkaUtil
    • 2 GmallConfig
    • 3 TableProcess
    • 4 MyDeserializationSchemaFunction
    • 5 TableProcessFunction
    • 6 DimSink

一 分流Sink之建立维度表到HBase(Phoenix)

1 拼接建表语句

如果读取到的配置信息是维度数据,提前在hbase中通过Phoenix创建维度表。

(1)定义配置常量类

定义一个项目中常用的配置常量类GmallConfig。

package com.hzy.gmall.realtime.common;/*** 实时数仓中的常量类*/
public class GmallConfig {public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop101,hadoop102,hadoop103:2181";
}

(2)引入依赖

<!--commons-beanutils是Apache开源组织提供的用于操作JAVA BEAN的工具包。
使用commons-beanutils,可以很方便的对bean对象的属性进行操作-->
<dependency><groupId>commons-beanutils</groupId><artifactId>commons-beanutils</artifactId><version>1.9.3</version>
</dependency><dependency><groupId>org.apache.phoenix</groupId><artifactId>phoenix-spark</artifactId><version>5.0.0-HBase-2.0</version><exclusions><exclusion><groupId>org.glassfish</groupId><artifactId>javax.el</artifactId></exclusion></exclusions>
</dependency>

(3)hbase-site.xml

因为要用单独的schema,所以在Idea程序中加入hbase-site.xml。

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?><configuration><property><name>hbase.rootdir</name><value>hdfs://hadoop101:8020/hbase</value></property><property><name>hbase.cluster.distributed</name><value>true</value></property><property><name>hbase.zookeeper.quorum</name><value>hadoop101,hadoop102,hadoop103</value></property><property><name>hbase.unsafe.stream.capability.enforce</name><value>false</value></property><property><name>hbase.wal.provider</name><value>filesystem</value></property><property><name>phoenix.schema.isNamespaceMappingEnabled</name><value>true</value></property><property><name>phoenix.schema.mapSystemTablesToNamespace</name><value>true</value></property></configuration>

注意:为了开启hbase的namespace和phoenix的schema的映射,在程序中需要加这个配置文件,另外在linux服务上,也需要在hbase以及phoenix的hbase-site.xml配置文件中,加上以上后两个配置,并使用xsync进行同步

/opt/module/hbase-2.0.5/conf  注意分发
/opt/module/phoenix-5.0.0/bin

重启Hbase服务

(4)在phoenix中执行

create schema GMALL2022_REALTIME;
# 在hbase查看是否创建成功
cd /opt/module/hbase-2.0.5/bin/
hbase shell
list_namespace

(5)增加代码

a TableProcessFunction

// 声明连接对象
private Connection conn;@Override
public void open(Configuration parameters) throws Exception {// 注册驱动Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");// 获取连接conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
// 如果读取到的配置信息是维度数据,提前创建维度表
if (sinkType.equals(TableProcess.SINK_TYPE_HBASE) && "insert".equals(operateType)){checkTable(sinkTable,sinkPk,sinkColumns,sinkExtend);
}// 将配置信息放到状态中
// 拼接key
String key = sourceTable + ":" + operateType;
broadcastState.put(key,tableProcess);

b checkTable

// 在处理配置数据时,提前建立维度表
// create table if not exists 表空间.表名(字段名 数据类型,字段名 数据类型)
private void checkTable(String tableName, String pk, String fields, String ext) throws SQLException {// 对主键进行空值处理if (pk == null){pk = "id";}// 对建表扩展进行空值处理if (ext == null){ext = "";}StringBuilder createSql = new StringBuilder("create table if not exists "+GmallConfig.HBASE_SCHEMA + "." + tableName +"(");String[] fieldArr = fields.split(",");for (int i = 0; i < fieldArr.length; i++) {String field = fieldArr[i];// 判断是否为主键if (field.equals(pk)){createSql.append(field + " varchar primary key ");}else {createSql.append(field + " varchar ");}if(i < fieldArr.length - 1){createSql.append(",");}}createSql.append(")" + ext);System.out.println("phoenix中的建表语句:" + createSql);// 创建数据库操作对象PreparedStatement ps = null;try {ps = conn.prepareStatement(createSql.toString());// 执行sql语句ps.execute();} catch (SQLException e) {e.printStackTrace();throw new RuntimeException("在phoenix中建表失败");} finally {// 释放资源if(ps != null){ps.close();}}
}

(6)测试

启动hadoop(等待安全模式关闭再进行下一步),zookeeper,kafka,hbase,phoenix,maxwell,在phoenix中查看表数据
!tables
select * from GMALL2022_REALTIME.DIM_BASE_TRADEMARK;

2 过滤字段

数据在向下游传递之前,过滤掉不需要的字段,只保留配置表中sink_columns存在的字段。

(1)代码编写

if (tableProcess != null){// 在配置表中找到了该操作对应的配置// 判断是事实数据还是维度数据String sinkTable = tableProcess.getSinkTable();jsonObj.put("sink_table",sinkTable);// 在向下游传递数据之前,将不需要的字段过滤掉// 过滤思路:从配置表中读取保留字段,根据保留字段对data中的属性进行过滤JSONObject dataJsonObj = jsonObj.getJSONObject("data");filterColumns(dataJsonObj,tableProcess.getSinkColumns());String sinkType = tableProcess.getSinkType();if (sinkType.equals(TableProcess.SINK_TYPE_HBASE)){// 维度数据,放到维度侧输出流中ctx.output(dimTag,jsonObj);}else if (sinkType.equals(TableProcess.SINK_TYPE_KAFKA)){// 事实数据,放到主流中out.collect(jsonObj);}
}else {// 在配置表中没有该操作对应的配置System.out.println("No This Key In TableProcess:" + key);
}
// 过滤字段private void filterColumns(JSONObject dataJsonObj, String sinkColumns) {// dataJsonObj : {"tm_name":"aaa","logo_url":"aaa","id":12}// sinkColumns : id,tm_nameString[] columnArr = sinkColumns.split(",");// 将数组转换成集合,以便下面和entrySet进行比较,数组中没有”包含“方法List<String> columnList = Arrays.asList(columnArr);// 获取json中的每一个名值对(KV)Set<Map.Entry<String, Object>> entrySet = dataJsonObj.entrySet();
//        // 获取迭代器
//        Iterator<Map.Entry<String, Object>> it = entrySet.iterator();
//        // 遍历,如果不包含则删除
//        for (;it.hasNext();) {
//            if(!columnList.contains(it.next().getKey())){
//                it.remove();
//            }
//        }entrySet.removeIf(entry -> !columnList.contains(entry.getKey()));}

(2)测试

开启相关环境,在表中添加或者删除数据,查看输出结果。

(3)总结

在这里插入图片描述

动态分流总结:

  • 广播流数据处理,FlinkCDC从MySQL中读取配置信息
    • 获取状态
    • 读取配置**(维度表的创建)**
    • 将配置封装为Map<sourceTable:operateType,TableProcess>放到状态中。
  • 主流数据处理,maxwell从业务数据库中采集到的数据
    • 获取状态
    • 从状态中获取当前处理数据的配置信息(字段过滤)
    • 根据配置信息进行分流(事实与维度)
  • 维度表的创建
    • 拼接建表语句
    • 通过jdbc执行建表语句
    • 测试
      • schema和namespace的映射
      • 拼接sql,空格处理
      • 提前创建schema
  • 字段过滤
    • 获取保留字段,放到List集合中
    • 获取dataJsonObj,转换为EntrySet
    • 根据保留字段判断EntrySet遍历出来的entry,是否保留

二 分流Sink之保存维度数据到HBase(Phoenix)

1 程序执行流程

在这里插入图片描述

DimSink 继承了RichSinkFunction,这个function得分两条时间线。

  • 一条是任务启动时执行open操作(图中紫线),我们可以把连接的初始化工作放在此处一次性执行。
  • 另一条是随着每条数据的到达反复执行invoke()(图中黑线),在这里面我们要实现数据的保存,主要策略就是根据数据组合成sql提交给hbase。

2 DimSink

package com.hzy.gmall.realtime.app.fun;
/*** 将维度侧输出流的数据写到hbase(Phoenix)中*/
public class DimSink extends RichSinkFunction<JSONObject> {private Connection conn;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);}@Overridepublic void invoke(JSONObject jsonObj, Context context) throws Exception {// 上游传递过来的数据格式如下:// {"database":"gmall2022",// "data":{"tm_name":"a","id":13},// "commit":true,// "sink_table":"dim_base_trademark",// "type":"insert",// "table":"base_trademark","// ts":1670131087}// 获取维度表表名String tableName = jsonObj.getString("sink_table");// 获取数据JSONObject dataJsonObj = jsonObj.getJSONObject("data");// 拼接插入语句 upsert into 表空间.表 (a,b,c) values(aa,bb,cc);String upsertSql = genUpsertSql(tableName,dataJsonObj);System.out.println("向phoenix维度表中插入数据的sql:" + upsertSql);PreparedStatement ps = null;try {// 创建数据库操作对象ps = conn.prepareStatement(upsertSql);// 执行sql语句ps.executeUpdate();// 手动提交事务,phoenix的连接实现类不是自动提交事务conn.commit();}catch (SQLException e){e.printStackTrace();throw new RuntimeException("向phoenix维度表中插入数据失败了");} finally {// 释放资源if (ps != null){ps.close();}}}// 拼接插入语句private String genUpsertSql(String tableName, JSONObject dataJsonObj) {// id 10// tm_name zsString upsertSql = "upsert into " + GmallConfig.HBASE_SCHEMA + "."+ tableName +" ("+ StringUtils.join(dataJsonObj.keySet(),",") +") values('"+ StringUtils.join(dataJsonObj.values(),"','")+"')";return upsertSql;}
}

3 BaseDBApp

//TODO 8 将维度侧输出流的数据写到hbase(Phoenix)中
dimDS.addSink(new DimSink());

4 测试

开启必要环境,向base_tradmark表中添加一条数据,查看phoenix是否插入成功。

三 分流Sink之保存业务数据到Kafka主题

1 BaseDBApp

//TODO 9 将主流数据写回kafka的dwd层
realDS.addSink(MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {@Overridepublic ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {return new ProducerRecord<byte[], byte[]>(jsonObj.getString("sink_table"),jsonObj.getJSONObject("data").toJSONString().getBytes());}})
);

2 MyKafkaUtil

package com.hzy.gmall.realtime.utils;
// 获取kafka的生产者
// 这种实现只能保证数据不丢失,不能保证精准一次,只能保证数据不丢失
//    public static FlinkKafkaProducer<String> getKafkaSink(String topic){
//        return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
//    }public static FlinkKafkaProducer<String> getKafkaSink(String topic){Properties props = new Properties();props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");return new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {@Overridepublic ProducerRecord<byte[], byte[]> serialize(String str, @Nullable Long timestamp) {return new ProducerRecord<byte[], byte[]>(topic,str.getBytes());}},props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);}// 获取kafka的生产者public static <T>FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema){Properties props = new Properties();props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");return new FlinkKafkaProducer<T>(DEFAULT_TOPIC,kafkaSerializationSchema,props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);}

3 测试

在base_trademark表中添加一条数据,查看程序输出结果,在phoenix中查看结果。

维度数据::3> {"database":"gmall2022","xid":24993,"data":{"tm_name":"c","id":14},"commit":true,"sink_table":"dim_base_trademark","type":"insert","table":"base_trademark","ts":1670150491}
向phoenix维度表中插入数据的sql:upsert into GMALL2022_REALTIME.dim_base_trademark (tm_name,id) values('c','14')select * from GMALL2022_REALTIME.DIM_BASE_TRADEMARK;

启动kafka消费者,kfkcon.sh dwd_order_info,在order_info中修改一条数据,查看程序输出结果,在kafka中查看结果。

事实数据::3> {"database":"gmall2022","xid":25144,"data":{"delivery_address":"第19大街第35号楼3单元488门","consignee":"苗冰11","original_total_amount":976.00,"coupon_reduce_amount":0.00,"order_status":"1001","total_amount":990.00,"user_id":39,"province_id":16,"feight_fee":14.00,"consignee_tel":"13437571252","id":28782,"activity_reduce_amount":0.00},"old":{"consignee":"苗冰"},"commit":true,"sink_table":"dwd_order_info","type":"update","table":"order_info","ts":1670150618}
kafka中输出内容
{"delivery_address":"第19大街第35号楼3单元488门","consignee":"苗冰11","original_total_amount":976.00,"coupon_reduce_amount":0.00,"order_status":"1001","total_amount":990.00,"user_id":39,"province_id":16,"feight_fee":14.00,"consignee_tel":"13437571252","id":28782,"activity_reduce_amount":0.00}

4 总结

动态分流测试执行流程

  • 需要启动的进程:zookeeper,kafka,maxwell,hdfs,hbase,BaseDBApp

  • 当业务数据发生变化,maxwell会采集变化数据到ods层

  • BaseDBApp从ods层读取到变化数据,作为业务主流

  • BaseDBApp在启动时,会通过FlinkCDC读取配置表,作为广播流

  • 业务流和广播流通过connect进行连接

  • 对连接之后的数据通过process进行处理

    • processElement
    • processBroadcastElement

    具体执行流程见一 2(3)总结

  • 将维度侧输出流的数据写到Hbase中 – DimSink

    • 拼接upsert
    • 执行sql(手动提交事务)
  • 将主流数据写回到kafka的dwd层

    • 重写获取FlinkKafkaProducer的方法,自定义序列化的过程
    • 将主流的数据写到kafka不同的主题中,并且保存精准一次性

四 总结

DWD的实时计算核心就是数据分流,其次是状态识别。在开发过程中使用了几个灵活度较强算子,比如RichMapFunction, ProcessFunction, RichSinkFunction。 那这几个算子何时会用到,如何进行选择,汇总见下表:

Function可转换结构可过滤数据侧输出open****方法可以使用状态输出至
MapFunctionYesNoNoNoNo下游算子
FilterFunctionNoYesNoNoNo下游算子
RichMapFunctionYesNoNoYesYes下游算子
RichFilterFunctionNoYesNoYesYes下游算子
ProcessFunctionYesYesYesYesYes下游算子
SinkFunctionYesYesNoNoNo外部
RichSinkFunctionYesYesNoYesYes外部

从对比表中能明显看出,Rich系列能功能强大,ProcessFunction功能更强大,但是相对的越全面的算子使用起来也更加繁琐。

五 附录:完整代码

0 BaseDBApp

package com.hzy.gmall.realtime.app.dwd;public class BaseDBApp {public static void main(String[] args) throws Exception {//TODO 1 基本环境准备//流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(4);//        //TODO 2 检查点设置
//        //开启检查点
//        env.enableCheckpointing(5000L,CheckpointingMode.EXACTLY_ONCE);
//        // 设置检查点超时时间
//        env.getCheckpointConfig().setCheckpointTimeout(60000L);
//        // 设置重启策略
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
//        // 设置job取消后,检查点是否保留
//        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        // 设置状态后端 -- 基于内存 or 文件系统 or RocksDB
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));
//        // 指定操作HDFS的用户
//        System.setProperty("HADOOP_USER_NAME","hzy");//TODO 3 从kafka中读取数据//声明消费的主题以及消费者组String topic = "ods_base_db_m";String groupId = "base_db_app_group";// 获取消费者对象FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);// 读取数据,封装成流DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);//TODO 4 对数据类型进行转换 String -> JSONObjectSingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);//TODO 5 简单的ETLSingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.filter(new FilterFunction<JSONObject>() {@Overridepublic boolean filter(JSONObject jsonobj) throws Exception {boolean flag =jsonobj.getString("table") != null &&jsonobj.getString("table").length() > 0 &&jsonobj.getJSONObject("data") != null &&jsonobj.getString("data").length() > 3;return flag;}});
//        filterDS.print("<<<");//TODO 6 使用FlinkCDC读取配置表数据//获取dataSourceProperties props = new Properties();props.setProperty("scan.startup.mode","initial");SourceFunction<String> mySqlSourceFunction = MySQLSource.<String>builder().hostname("hadoop101").port(3306).username("root").password("123456")// 可配置多个库.databaseList("gmall2022_realtime")///可选配置项,如果不指定该参数,则会读取上一个配置中指定的数据库下的所有表的数据//注意:指定的时候需要使用"db.table"的方式.tableList("gmall2022_realtime.table_process").debeziumProperties(props).deserializer(new MyDeserializationSchemaFunction()).build();// 读取数据封装流DataStreamSource<String> mySqlDS = env.addSource(mySqlSourceFunction);// 为了让每一个并行度上处理业务数据的时候,都能使用配置流的数据,需要将配置流广播下去// 想要使用广播状态,状态描述器只能是map,使用map状态存储MapStateDescriptor<String, TableProcess> mapStateDescriptor =new MapStateDescriptor<>("table_process", String.class, TableProcess.class);BroadcastStream<String> broadcastDS = mySqlDS.broadcast(mapStateDescriptor);// 调用非广播流的connect方法,将业务流与配置流进行连接BroadcastConnectedStream<JSONObject, String> connectDS = filterDS.connect(broadcastDS);//TODO 7 动态分流,将维度数据放到维度侧输出流,事实数据放到主流中//声明维度侧输出流的标记OutputTag<JSONObject> dimTag = new OutputTag<JSONObject>("dimTag") {};SingleOutputStreamOperator<JSONObject> realDS = connectDS.process(new TableProcessFunction(dimTag,mapStateDescriptor));// 获取维度侧输出流DataStream<JSONObject> dimDS = realDS.getSideOutput(dimTag);realDS.print("事实数据:");dimDS.print("维度数据:");//TODO 8 将维度侧输出流的数据写到hbase(Phoenix)中dimDS.addSink(new DimSink());//TODO 9 将主流数据写回kafka的dwd层realDS.addSink(MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {@Overridepublic ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {return new ProducerRecord<byte[], byte[]>(jsonObj.getString("sink_table"),jsonObj.getJSONObject("data").toJSONString().getBytes());}}));env.execute();}
}

1 MyKafkaUtil

package com.hzy.gmall.realtime.utils;
/*** 操作kafka工具类*/
public class MyKafkaUtil {private static final String KAFKA_SERVER = "hadoop101:9092,hadoop102:9092,hadoop103:9092";private static final String DEFAULT_TOPIC = "default_topic";// 获取kafka的消费者public static FlinkKafkaConsumer<String> getKafkaSource(String topic,String groupId){Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);// 定义消费者组props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),props);}// 获取kafka的生产者// 这种实现只能保证数据不丢失,不能保证精准一次,只能保证数据不丢失
//    public static FlinkKafkaProducer<String> getKafkaSink(String topic){
//        return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
//    }public static FlinkKafkaProducer<String> getKafkaSink(String topic){Properties props = new Properties();props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");return new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {@Overridepublic ProducerRecord<byte[], byte[]> serialize(String str, @Nullable Long timestamp) {return new ProducerRecord<byte[], byte[]>(topic,str.getBytes());}},props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);}// 获取kafka的生产者public static <T>FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema){Properties props = new Properties();props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");return new FlinkKafkaProducer<T>(DEFAULT_TOPIC,kafkaSerializationSchema,props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);}
}

2 GmallConfig

package com.hzy.gmall.realtime.common;/*** 实时数仓中的常量类*/
public class GmallConfig {public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop101,hadoop102,hadoop103:2181";
}

3 TableProcess

package com.hzy.gmall.realtime.beans;import lombok.Data;
@Data
public class TableProcess {//动态分流Sink常量   改为小写和脚本一致public static final String SINK_TYPE_HBASE = "hbase";public static final String SINK_TYPE_KAFKA = "kafka";public static final String SINK_TYPE_CK = "clickhouse";//来源表String sourceTable;//操作类型 insert,update,deleteString operateType;//输出类型 hbase kafkaString sinkType;//输出表(主题)String sinkTable;//输出字段String sinkColumns;//主键字段String sinkPk;//建表扩展String sinkExtend;
}

4 MyDeserializationSchemaFunction

package com.hzy.gmall.realtime.app.fun;public class MyDeserializationSchemaFunction  implements DebeziumDeserializationSchema<String> {// 反序列化@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {// 导入的是org.apache.kafka.connnect.data包Struct valueStruct = (Struct) sourceRecord.value();// 获取数据的来源Struct afterStruct = valueStruct.getStruct("after");// 获取数据库和表名的来源Struct sourceStruct = valueStruct.getStruct("source");// 获取数据库String database = sourceStruct.getString("db");// 获取表名String table = sourceStruct.getString("table");// 获取操作类型
//        String op = valueStruct.getString("op");String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();if(type.equals("create")){type = "insert";}JSONObject jsonObj = new JSONObject();jsonObj.put("database",database);jsonObj.put("table",table);jsonObj.put("type",type);// 获取影响的数据// 删除时,afterStruct为空JSONObject dataJsonObj = new JSONObject();if (afterStruct != null){// schema获取源数据的格式,fields获取里面的各个元素for (Field field : afterStruct.schema().fields()) {String fieldName = field.name();Object fieldValue = afterStruct.get(field);dataJsonObj.put(fieldName,fieldValue);}}// 删除操作会使得data属性不为空,但size为0jsonObj.put("data",dataJsonObj);// 向下游发送数据collector.collect(jsonObj.toJSONString()) ;}// 指定类型@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}
}

5 TableProcessFunction

package com.hzy.gmall.realtime.app.fun;/*** 实现动态分流* 目前流中有两条流中的数据,使用以下两个方法分别进行处理*/
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {// 声明维度侧输出流标签private OutputTag<JSONObject> dimTag;// 声明广播状态描述器private MapStateDescriptor<String, TableProcess> mapStateDescriptor;// 声明连接对象private Connection conn;@Overridepublic void open(Configuration parameters) throws Exception {// 注册驱动Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");// 获取连接conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);}public TableProcessFunction(OutputTag<JSONObject> dimTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {this.dimTag = dimTag;this.mapStateDescriptor = mapStateDescriptor;}// 处理业务流中的数据,maxwell从业务数据库中采集到的数据@Overridepublic void processElement(JSONObject jsonObj, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {String table = jsonObj.getString("table");String type = jsonObj.getString("type");// 在使用maxwell处理历史数据的时候,类型是bootstrap-insert,修复为insertif (type.equals("bootstrap-insert")){type = "insert";jsonObj.put("type",type);}// 拼接keyString key = table  + ":" + type;// 获取状态ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);// 从状态中获取配置信息TableProcess tableProcess = broadcastState.get(key);if (tableProcess != null){// 在配置表中找到了该操作对应的配置// 判断是事实数据还是维度数据String sinkTable = tableProcess.getSinkTable();jsonObj.put("sink_table",sinkTable);// 在向下游传递数据之前,将不需要的字段过滤掉// 过滤思路:从配置表中读取保留字段,根据保留字段对data中的属性进行过滤JSONObject dataJsonObj = jsonObj.getJSONObject("data");filterColumns(dataJsonObj,tableProcess.getSinkColumns());String sinkType = tableProcess.getSinkType();if (sinkType.equals(TableProcess.SINK_TYPE_HBASE)){// 维度数据,放到维度侧输出流中ctx.output(dimTag,jsonObj);}else if (sinkType.equals(TableProcess.SINK_TYPE_KAFKA)){// 事实数据,放到主流中out.collect(jsonObj);}}else {// 在配置表中没有该操作对应的配置System.out.println("No This Key In TableProcess:" + key);}}// 过滤字段private void filterColumns(JSONObject dataJsonObj, String sinkColumns) {// dataJsonObj : {"tm_name":"aaa","logo_url":"aaa","id":12}// sinkColumns : id,tm_nameString[] columnArr = sinkColumns.split(",");// 将数组转换成集合,以便下面和entrySet进行比较,数组中没有”包含“方法List<String> columnList = Arrays.asList(columnArr);// 获取json中的每一个名值对(KV)Set<Map.Entry<String, Object>> entrySet = dataJsonObj.entrySet();
//        // 获取迭代器
//        Iterator<Map.Entry<String, Object>> it = entrySet.iterator();
//        // 遍历,如果不包含则删除
//        for (;it.hasNext();) {
//            if(!columnList.contains(it.next().getKey())){
//                it.remove();
//            }
//        }entrySet.removeIf(entry -> !columnList.contains(entry.getKey()));}// 处理广播流中的数据,FlinkCDC从MySQL中读取配置信息@Overridepublic void processBroadcastElement(String jsonStr, Context ctx, Collector<JSONObject> out) throws Exception {// 获取状态BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);// 将json格式字符串转换为JSON对象JSONObject jsonObj = JSONObject.parseObject(jsonStr);// 获取配置表中的一条配置信息// parseObject:将json格式字符串转化为json格式对象// 第二个参数为将json字符串转化为何种格式的对象TableProcess tableProcess = JSONObject.parseObject(jsonObj.getString("data"), TableProcess.class);// 业务数据库表名String sourceTable = tableProcess.getSourceTable();// 操作类型String operateType = tableProcess.getOperateType();// 数据类型 hbase -- 维度数据    kafka -- 事实数据String sinkType = tableProcess.getSinkType();// 指定输出目的地String sinkTable = tableProcess.getSinkTable();// 主键String sinkPk = tableProcess.getSinkPk();// 指定保留字段(列)String sinkColumns = tableProcess.getSinkColumns();// 指定建表扩展语句String sinkExtend = tableProcess.getSinkExtend();// 如果读取到的配置信息是维度数据,提前创建维度表if (sinkType.equals(TableProcess.SINK_TYPE_HBASE) && "insert".equals(operateType)){checkTable(sinkTable,sinkPk,sinkColumns,sinkExtend);}// 将配置信息放到状态中// 拼接keyString key = sourceTable + ":" + operateType;broadcastState.put(key,tableProcess);}// 在处理配置数据时,提前建立维度表// create table if not exists 表空间.表名(字段名 数据类型,字段名 数据类型)private void checkTable(String tableName, String pk, String fields, String ext) throws SQLException {// 对主键进行空值处理if (pk == null){pk = "id";}// 对建表扩展进行空值处理if (ext == null){ext = "";}StringBuilder createSql = new StringBuilder("create table if not exists "+GmallConfig.HBASE_SCHEMA + "." + tableName +"(");String[] fieldArr = fields.split(",");for (int i = 0; i < fieldArr.length; i++) {String field = fieldArr[i];// 判断是否为主键if (field.equals(pk)){createSql.append(field + " varchar primary key ");}else {createSql.append(field + " varchar ");}if(i < fieldArr.length - 1){createSql.append(",");}}createSql.append(")" + ext);System.out.println("phoenix中的建表语句:" + createSql);// 创建数据库操作对象PreparedStatement ps = null;try {ps = conn.prepareStatement(createSql.toString());// 执行sql语句ps.execute();} catch (SQLException e) {e.printStackTrace();throw new RuntimeException("在phoenix中建表失败");} finally {// 释放资源if(ps != null){ps.close();}}}
}

6 DimSink

package com.hzy.gmall.realtime.app.fun;/*** 将维度侧输出流的数据写到hbase(Phoenix)中*/
public class DimSink extends RichSinkFunction<JSONObject> {private Connection conn;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);}@Overridepublic void invoke(JSONObject jsonObj, Context context) throws Exception {// 上游传递过来的数据格式如下:// {"database":"gmall2022",// "data":{"tm_name":"a","id":13},// "commit":true,// "sink_table":"dim_base_trademark",// "type":"insert",// "table":"base_trademark","// ts":1670131087}// 获取维度表表名String tableName = jsonObj.getString("sink_table");// 获取数据JSONObject dataJsonObj = jsonObj.getJSONObject("data");// 拼接插入语句 upsert into 表空间.表 (a,b,c) values(aa,bb,cc);String upsertSql = genUpsertSql(tableName,dataJsonObj);System.out.println("向phoenix维度表中插入数据的sql:" + upsertSql);PreparedStatement ps = null;try {// 创建数据库操作对象ps = conn.prepareStatement(upsertSql);// 执行sql语句ps.executeUpdate();// 手动提交事务,phoenix的连接实现类不是自动提交事务conn.commit();}catch (SQLException e){e.printStackTrace();throw new RuntimeException("向phoenix维度表中插入数据失败了");} finally {// 释放资源if (ps != null){ps.close();}}}// 拼接插入语句private String genUpsertSql(String tableName, JSONObject dataJsonObj) {// id 10// tm_name zsString upsertSql = "upsert into " + GmallConfig.HBASE_SCHEMA + "."+ tableName +" ("+ StringUtils.join(dataJsonObj.keySet(),",") +") values('"+ StringUtils.join(dataJsonObj.values(),"','")+"')";return upsertSql;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_236308.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用Python写一个模拟qq聊天小程序的代码实例

前言 今天小编就为大家分享一篇关于用Python写一个模拟qq聊天小程序的代码实例&#xff0c;小编觉得内容挺不错的&#xff0c;现在分享给大家&#xff0c;具有很好的参考价值&#xff0c;需要的朋友一起跟随小编来看看吧 Python 超简单的聊天程序 客户端: 服务器: 模拟qq聊…

张驰咨询:快速提高流程效率的5个关键精益生产工具

精益&#xff0c;又称“精益制造”或“精益生产”&#xff0c;注重通过消除浪费、消除缺陷&#xff0c;实现客户价值最大化。精益工具是关于理解过程&#xff0c;发现浪费&#xff0c;防止错误和记录你所做的事情。 让我们来看看流程改进中使用的五种精益工具&#xff0c;它们…

对 CSS 工程化的理解

CSS 工程化是为了解决以下问题&#xff1a; 宏观设计&#xff1a;CSS 代码如何组织、如何拆分、模块结构怎样设计&#xff1f;编码优化&#xff1a;怎样写出更好的 CSS&#xff1f;构建&#xff1a;如何处理我的 CSS&#xff0c;才能让它的打包结果最优&#xff1f;可维护性&a…

ReplicaSet和Deployment

ReplicaSet和Deployment 写在前面 语雀原文阅读效果更佳&#xff1a;198 ReplicaSet和Deployment 语雀 《198 ReplicaSet和Deployment》 1、ReplicaSet 假如我们现在有一个 Pod 正在提供线上的服务&#xff0c;我们来想想一下我们可能会遇到的一些场景&#xff1a; 某次运营…

计算机毕业设计django基于python大学生多媒体学习系统

项目介绍 随着计算机多媒体技术的发展和网络的普及。采用当前流行的B/S模式以及3层架构的设计思想通过Python技术来开发此系统的目的是建立一个配合网络环境的大学生多媒体学习系统的平台,这样可以有效地解决数据学习系统混乱的局面。 本文首先介绍了大学生多媒体学习系统的发…

加密与认证技术

加密与认证技术密码技术概述密码算法与密码体制的基本概念加密算法与解密算法秘钥的作用什么是密码密钥长度对称密码体系对称加密的基本概念典型的对称加密算法DES加密算法3DES加密算法非对称密码体系非对称加密基本概念密码技术概述 密码技术是保证网络安全的核心技术之一&am…

Jmeter(十六):jmeter场景的运行架构(本地运行和远程运行)配置远程负载机

jmeter场景的运行架构(本地运行和远程运行) 运行方式&#xff1a; GUI运行&#xff1a;通过图形界面方式运行&#xff0c;该运行方式的可视化界面及监听器动态展示 结果都比较消耗负载机资源&#xff0c;建议大并发时不用&#xff0c;一般进行脚本调试&#xff1b; 命令行运行…

Jina 开箱即用的云原生多模态系统解决方案

Jina 是一个基于云原生的多模态应用框架&#xff0c;开发者使用 Jina 可以轻松构建、部署和维护高性能的云原生应用。你可能认为这些都只是空泛的营销口号&#xff0c;甚至产生疑问&#xff0c;到底什么是云原生&#xff1f;对构建多模态应用有什么帮助&#xff1f;它是否只是 …

2022年总结:道阻且长,行则将至

前言 今年是第四个年头写总结了&#xff0c;直到这个时候&#xff0c;我仍未想出今年的标题是什么。 2019年总结&#xff0c;平凡的我仍在平凡的生活 2020年总结&#xff0c;所有努力只为一份期待 2021年总结&#xff1a;前路有光&#xff0c;初心莫忘 如果非得用一句话来…

Spark 3.0 - 12.ML GBDT 梯度提升树理论与实战

目录 一.引言 二.GBDT 理论 1.集成学习 2.分类 & 回归问题 3.梯度提升 4.GBDT 生成 三.GBDT 实战 1.数据准备 2.构建 GBDT Pipeline 3.预测与评估 四.总结 一.引言 关于决策树前面已经介绍了常规决策树与随机森林两种类型的知识&#xff0c;本文主要介绍梯度提…

微服务调用工具

微服务调用工具目录概述需求&#xff1a;设计思路实现思路分析1.A2.B3.C参考资料和推荐阅读Survive by day and develop by night. talk for import biz , show your perfect code,full busy&#xff0c;skip hardness,make a better result,wait for change,challenge Survive…

华为二面,原来是我对自动化测试的理解太肤浅了..

如何使用Python实现自动化测试 如果你入职一家新的公司&#xff0c;领导让你开展自动化测试&#xff0c;作为一个新人&#xff0c;你肯定会手忙脚乱&#xff0c;你会如何落地自动化测试呢&#xff1f;资深测试架构师沉醉将告诉你如何落地自动kan化测试&#xff0c;本次话题主要…

事业编招聘:南方科技大学附属实验学校2022年·面向应届毕业生招聘在编教师公告

南方科技大学是在中国高等教育改革发展背景下创建的一所高起点公办创新型大学&#xff0c;2022年2月14日&#xff0c;教育部等三部委公布第二轮“双一流”建设高校及建设学科名单&#xff0c;南方科技大学入选“双一流”建设高校名单。 南方科技大学附属实验学校&#xff0c;地…

大学生静态HTML网页源码 我的校园网页设计成品 学校班级网页制作模板 web课程设计 dreamweaver网页作业

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

Jenkins持续集成项目搭建与实践——基于Python Selenium自动化测试(自由风格)

&#x1f4cc; 博客主页&#xff1a; 程序员二黑 &#x1f4cc; 专注于软件测试领域相关技术实践和思考&#xff0c;持续分享自动化软件测试开发干货知识&#xff01; &#x1f4cc; 公号同名&#xff0c;欢迎加入我的测试交流群&#xff0c;我们一起交流学习&#xff01; 目录…

艾美捷CpG ODN——ODN 1720 (TLRGRADE)说明书

艾美捷CpG ODN系列——ODN 1720 (TLRGRADE)&#xff1a;具有硫代磷酸酯骨架的GpC寡脱氧核苷酸。 艾美捷CpG ODN 丨ODN 1720 (TLRGRADE)化学性质&#xff1a; 序列&#xff1a;5-tccatgagcttcctgatgct-3&#xff08;小写字母表示硫代磷酸酯键&#xff09;。 MW&#xff1a;638…

MySQL -2 指令

客户端SQL指令记录&#xff1a; -- 针对 数据库和针对数据表 &#xff08;一&#xff09;数据库 1. 查看当前所有数据库&#xff1a;show databases; 2. 创建数据库&#xff1a;create database 数据库名 DEFAULT CHARSET utf8 COLLATE utf8_general_ci; 3. 删除数据库&#…

微信公众号开发——实现用户微信网页授权流程

&#x1f60a; 作者&#xff1a; 一恍过去&#x1f496; 主页&#xff1a; https://blog.csdn.net/zhuocailing3390&#x1f38a; 社区&#xff1a; Java技术栈交流&#x1f389; 主题&#xff1a; 微信公众号开发——实现用户微信网页授权流程⏱️ 创作时间&#xff1a; …

哈希表及其与Java类集的关系

目录 1.哈希表的概念 2.哈希冲突 3.如何避免哈希冲突? 3.1哈希函数设计 3.2 负载因子的调节 4.解决哈希冲突 4.1闭散列 4.1.1线性探测 4.1.2二次探测 4.2开散列(哈希桶) 5.HashMap 6.HashSet 1.哈希表的概念 假设有一组数据,要让你去搜索其中的一个关键码,这种场…

嵌入式软件工程师技能树——Linux应用编程+网络编程+驱动开发+操作系统+计算机网络

文章目录Linux驱动开发1、Linux内核组成2、用户空间与内核的通讯方式有哪些&#xff1f;3、系统调用read/write流程4、内核态用户态的区别5、bootloader内核 根文件的关系6、BootLoader的作用7、BootLoader两个启动阶段1、汇编实现&#xff0c;完成依赖于CPU体系架构的设置&…