文章目录
- 一 分流Sink之建立维度表到HBase(Phoenix)
- 1 拼接建表语句
- (1)定义配置常量类
- (2)引入依赖
- (3)hbase-site.xml
- (4)在phoenix中执行
- (5)增加代码
- a TableProcessFunction
- b checkTable
- (6)测试
- 2 过滤字段
- (1)代码编写
- (2)测试
- (3)总结
- 二 分流Sink之保存维度数据到HBase(Phoenix)
- 1 程序执行流程
- 2 DimSink
- 3 BaseDBApp
- 4 测试
- 三 分流Sink之保存业务数据到Kafka主题
- 1 BaseDBApp
- 2 MyKafkaUtil
- 3 测试
- 4 总结
- 四 总结
- 五 附录:完整代码
- 0 BaseDBApp
- 1 MyKafkaUtil
- 2 GmallConfig
- 3 TableProcess
- 4 MyDeserializationSchemaFunction
- 5 TableProcessFunction
- 6 DimSink
一 分流Sink之建立维度表到HBase(Phoenix)
1 拼接建表语句
如果读取到的配置信息是维度数据,提前在hbase中通过Phoenix创建维度表。
(1)定义配置常量类
定义一个项目中常用的配置常量类GmallConfig。
package com.hzy.gmall.realtime.common;/*** 实时数仓中的常量类*/
public class GmallConfig {public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop101,hadoop102,hadoop103:2181";
}
(2)引入依赖
<!--commons-beanutils是Apache开源组织提供的用于操作JAVA BEAN的工具包。
使用commons-beanutils,可以很方便的对bean对象的属性进行操作-->
<dependency><groupId>commons-beanutils</groupId><artifactId>commons-beanutils</artifactId><version>1.9.3</version>
</dependency><dependency><groupId>org.apache.phoenix</groupId><artifactId>phoenix-spark</artifactId><version>5.0.0-HBase-2.0</version><exclusions><exclusion><groupId>org.glassfish</groupId><artifactId>javax.el</artifactId></exclusion></exclusions>
</dependency>
(3)hbase-site.xml
因为要用单独的schema,所以在Idea程序中加入hbase-site.xml。
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?><configuration><property><name>hbase.rootdir</name><value>hdfs://hadoop101:8020/hbase</value></property><property><name>hbase.cluster.distributed</name><value>true</value></property><property><name>hbase.zookeeper.quorum</name><value>hadoop101,hadoop102,hadoop103</value></property><property><name>hbase.unsafe.stream.capability.enforce</name><value>false</value></property><property><name>hbase.wal.provider</name><value>filesystem</value></property><property><name>phoenix.schema.isNamespaceMappingEnabled</name><value>true</value></property><property><name>phoenix.schema.mapSystemTablesToNamespace</name><value>true</value></property></configuration>
注意:为了开启hbase的namespace和phoenix的schema的映射,在程序中需要加这个配置文件,另外在linux服务上,也需要在hbase以及phoenix的hbase-site.xml配置文件中,加上以上后两个配置,并使用xsync进行同步。
/opt/module/hbase-2.0.5/conf 注意分发
/opt/module/phoenix-5.0.0/bin
重启Hbase服务
(4)在phoenix中执行
create schema GMALL2022_REALTIME;
# 在hbase查看是否创建成功
cd /opt/module/hbase-2.0.5/bin/
hbase shell
list_namespace
(5)增加代码
a TableProcessFunction
// 声明连接对象
private Connection conn;@Override
public void open(Configuration parameters) throws Exception {// 注册驱动Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");// 获取连接conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
// 如果读取到的配置信息是维度数据,提前创建维度表
if (sinkType.equals(TableProcess.SINK_TYPE_HBASE) && "insert".equals(operateType)){checkTable(sinkTable,sinkPk,sinkColumns,sinkExtend);
}// 将配置信息放到状态中
// 拼接key
String key = sourceTable + ":" + operateType;
broadcastState.put(key,tableProcess);
b checkTable
// 在处理配置数据时,提前建立维度表
// create table if not exists 表空间.表名(字段名 数据类型,字段名 数据类型)
private void checkTable(String tableName, String pk, String fields, String ext) throws SQLException {// 对主键进行空值处理if (pk == null){pk = "id";}// 对建表扩展进行空值处理if (ext == null){ext = "";}StringBuilder createSql = new StringBuilder("create table if not exists "+GmallConfig.HBASE_SCHEMA + "." + tableName +"(");String[] fieldArr = fields.split(",");for (int i = 0; i < fieldArr.length; i++) {String field = fieldArr[i];// 判断是否为主键if (field.equals(pk)){createSql.append(field + " varchar primary key ");}else {createSql.append(field + " varchar ");}if(i < fieldArr.length - 1){createSql.append(",");}}createSql.append(")" + ext);System.out.println("phoenix中的建表语句:" + createSql);// 创建数据库操作对象PreparedStatement ps = null;try {ps = conn.prepareStatement(createSql.toString());// 执行sql语句ps.execute();} catch (SQLException e) {e.printStackTrace();throw new RuntimeException("在phoenix中建表失败");} finally {// 释放资源if(ps != null){ps.close();}}
}
(6)测试
启动hadoop(等待安全模式关闭再进行下一步),zookeeper,kafka,hbase,phoenix,maxwell,在phoenix中查看表数据
!tables
select * from GMALL2022_REALTIME.DIM_BASE_TRADEMARK;
2 过滤字段
数据在向下游传递之前,过滤掉不需要的字段,只保留配置表中sink_columns存在的字段。
(1)代码编写
if (tableProcess != null){// 在配置表中找到了该操作对应的配置// 判断是事实数据还是维度数据String sinkTable = tableProcess.getSinkTable();jsonObj.put("sink_table",sinkTable);// 在向下游传递数据之前,将不需要的字段过滤掉// 过滤思路:从配置表中读取保留字段,根据保留字段对data中的属性进行过滤JSONObject dataJsonObj = jsonObj.getJSONObject("data");filterColumns(dataJsonObj,tableProcess.getSinkColumns());String sinkType = tableProcess.getSinkType();if (sinkType.equals(TableProcess.SINK_TYPE_HBASE)){// 维度数据,放到维度侧输出流中ctx.output(dimTag,jsonObj);}else if (sinkType.equals(TableProcess.SINK_TYPE_KAFKA)){// 事实数据,放到主流中out.collect(jsonObj);}
}else {// 在配置表中没有该操作对应的配置System.out.println("No This Key In TableProcess:" + key);
}
// 过滤字段private void filterColumns(JSONObject dataJsonObj, String sinkColumns) {// dataJsonObj : {"tm_name":"aaa","logo_url":"aaa","id":12}// sinkColumns : id,tm_nameString[] columnArr = sinkColumns.split(",");// 将数组转换成集合,以便下面和entrySet进行比较,数组中没有”包含“方法List<String> columnList = Arrays.asList(columnArr);// 获取json中的每一个名值对(KV)Set<Map.Entry<String, Object>> entrySet = dataJsonObj.entrySet();
// // 获取迭代器
// Iterator<Map.Entry<String, Object>> it = entrySet.iterator();
// // 遍历,如果不包含则删除
// for (;it.hasNext();) {
// if(!columnList.contains(it.next().getKey())){
// it.remove();
// }
// }entrySet.removeIf(entry -> !columnList.contains(entry.getKey()));}
(2)测试
开启相关环境,在表中添加或者删除数据,查看输出结果。
(3)总结
动态分流总结:
- 广播流数据处理,FlinkCDC从MySQL中读取配置信息
- 获取状态
- 读取配置**(维度表的创建)**
- 将配置封装为
Map<sourceTable:operateType,TableProcess>
放到状态中。
- 主流数据处理,maxwell从业务数据库中采集到的数据
- 获取状态
- 从状态中获取当前处理数据的配置信息(字段过滤)
- 根据配置信息进行分流(事实与维度)
- 维度表的创建
- 拼接建表语句
- 通过jdbc执行建表语句
- 测试
- schema和namespace的映射
- 拼接sql,空格处理
- 提前创建schema
- 字段过滤
- 获取保留字段,放到List集合中
- 获取dataJsonObj,转换为EntrySet
- 根据保留字段判断EntrySet遍历出来的entry,是否保留
二 分流Sink之保存维度数据到HBase(Phoenix)
1 程序执行流程
DimSink 继承了RichSinkFunction,这个function得分两条时间线。
- 一条是任务启动时执行open操作(图中紫线),我们可以把连接的初始化工作放在此处一次性执行。
- 另一条是随着每条数据的到达反复执行invoke()(图中黑线),在这里面我们要实现数据的保存,主要策略就是根据数据组合成sql提交给hbase。
2 DimSink
package com.hzy.gmall.realtime.app.fun;
/*** 将维度侧输出流的数据写到hbase(Phoenix)中*/
public class DimSink extends RichSinkFunction<JSONObject> {private Connection conn;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);}@Overridepublic void invoke(JSONObject jsonObj, Context context) throws Exception {// 上游传递过来的数据格式如下:// {"database":"gmall2022",// "data":{"tm_name":"a","id":13},// "commit":true,// "sink_table":"dim_base_trademark",// "type":"insert",// "table":"base_trademark","// ts":1670131087}// 获取维度表表名String tableName = jsonObj.getString("sink_table");// 获取数据JSONObject dataJsonObj = jsonObj.getJSONObject("data");// 拼接插入语句 upsert into 表空间.表 (a,b,c) values(aa,bb,cc);String upsertSql = genUpsertSql(tableName,dataJsonObj);System.out.println("向phoenix维度表中插入数据的sql:" + upsertSql);PreparedStatement ps = null;try {// 创建数据库操作对象ps = conn.prepareStatement(upsertSql);// 执行sql语句ps.executeUpdate();// 手动提交事务,phoenix的连接实现类不是自动提交事务conn.commit();}catch (SQLException e){e.printStackTrace();throw new RuntimeException("向phoenix维度表中插入数据失败了");} finally {// 释放资源if (ps != null){ps.close();}}}// 拼接插入语句private String genUpsertSql(String tableName, JSONObject dataJsonObj) {// id 10// tm_name zsString upsertSql = "upsert into " + GmallConfig.HBASE_SCHEMA + "."+ tableName +" ("+ StringUtils.join(dataJsonObj.keySet(),",") +") values('"+ StringUtils.join(dataJsonObj.values(),"','")+"')";return upsertSql;}
}
3 BaseDBApp
//TODO 8 将维度侧输出流的数据写到hbase(Phoenix)中
dimDS.addSink(new DimSink());
4 测试
开启必要环境,向base_tradmark表中添加一条数据,查看phoenix是否插入成功。
三 分流Sink之保存业务数据到Kafka主题
1 BaseDBApp
//TODO 9 将主流数据写回kafka的dwd层
realDS.addSink(MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {@Overridepublic ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {return new ProducerRecord<byte[], byte[]>(jsonObj.getString("sink_table"),jsonObj.getJSONObject("data").toJSONString().getBytes());}})
);
2 MyKafkaUtil
package com.hzy.gmall.realtime.utils;
// 获取kafka的生产者
// 这种实现只能保证数据不丢失,不能保证精准一次,只能保证数据不丢失
// public static FlinkKafkaProducer<String> getKafkaSink(String topic){
// return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
// }public static FlinkKafkaProducer<String> getKafkaSink(String topic){Properties props = new Properties();props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");return new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {@Overridepublic ProducerRecord<byte[], byte[]> serialize(String str, @Nullable Long timestamp) {return new ProducerRecord<byte[], byte[]>(topic,str.getBytes());}},props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);}// 获取kafka的生产者public static <T>FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema){Properties props = new Properties();props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");return new FlinkKafkaProducer<T>(DEFAULT_TOPIC,kafkaSerializationSchema,props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);}
3 测试
在base_trademark表中添加一条数据,查看程序输出结果,在phoenix中查看结果。
维度数据::3> {"database":"gmall2022","xid":24993,"data":{"tm_name":"c","id":14},"commit":true,"sink_table":"dim_base_trademark","type":"insert","table":"base_trademark","ts":1670150491}
向phoenix维度表中插入数据的sql:upsert into GMALL2022_REALTIME.dim_base_trademark (tm_name,id) values('c','14')select * from GMALL2022_REALTIME.DIM_BASE_TRADEMARK;
启动kafka消费者,kfkcon.sh dwd_order_info
,在order_info中修改一条数据,查看程序输出结果,在kafka中查看结果。
事实数据::3> {"database":"gmall2022","xid":25144,"data":{"delivery_address":"第19大街第35号楼3单元488门","consignee":"苗冰11","original_total_amount":976.00,"coupon_reduce_amount":0.00,"order_status":"1001","total_amount":990.00,"user_id":39,"province_id":16,"feight_fee":14.00,"consignee_tel":"13437571252","id":28782,"activity_reduce_amount":0.00},"old":{"consignee":"苗冰"},"commit":true,"sink_table":"dwd_order_info","type":"update","table":"order_info","ts":1670150618}
kafka中输出内容
{"delivery_address":"第19大街第35号楼3单元488门","consignee":"苗冰11","original_total_amount":976.00,"coupon_reduce_amount":0.00,"order_status":"1001","total_amount":990.00,"user_id":39,"province_id":16,"feight_fee":14.00,"consignee_tel":"13437571252","id":28782,"activity_reduce_amount":0.00}
4 总结
动态分流测试执行流程
-
需要启动的进程:zookeeper,kafka,maxwell,hdfs,hbase,BaseDBApp
-
当业务数据发生变化,maxwell会采集变化数据到ods层
-
BaseDBApp从ods层读取到变化数据,作为业务主流
-
BaseDBApp在启动时,会通过FlinkCDC读取配置表,作为广播流
-
业务流和广播流通过connect进行连接
-
对连接之后的数据通过process进行处理
- processElement
- processBroadcastElement
具体执行流程见一 2(3)总结
-
将维度侧输出流的数据写到Hbase中 – DimSink
- 拼接upsert
- 执行sql(手动提交事务)
-
将主流数据写回到kafka的dwd层
- 重写获取FlinkKafkaProducer的方法,自定义序列化的过程
- 将主流的数据写到kafka不同的主题中,并且保存精准一次性
四 总结
DWD的实时计算核心就是数据分流,其次是状态识别。在开发过程中使用了几个灵活度较强算子,比如RichMapFunction, ProcessFunction, RichSinkFunction。 那这几个算子何时会用到,如何进行选择,汇总见下表:
Function | 可转换结构 | 可过滤数据 | 侧输出 | open****方法 | 可以使用状态 | 输出至 |
---|---|---|---|---|---|---|
MapFunction | Yes | No | No | No | No | 下游算子 |
FilterFunction | No | Yes | No | No | No | 下游算子 |
RichMapFunction | Yes | No | No | Yes | Yes | 下游算子 |
RichFilterFunction | No | Yes | No | Yes | Yes | 下游算子 |
ProcessFunction | Yes | Yes | Yes | Yes | Yes | 下游算子 |
SinkFunction | Yes | Yes | No | No | No | 外部 |
RichSinkFunction | Yes | Yes | No | Yes | Yes | 外部 |
从对比表中能明显看出,Rich系列能功能强大,ProcessFunction功能更强大,但是相对的越全面的算子使用起来也更加繁琐。
五 附录:完整代码
0 BaseDBApp
package com.hzy.gmall.realtime.app.dwd;public class BaseDBApp {public static void main(String[] args) throws Exception {//TODO 1 基本环境准备//流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(4);// //TODO 2 检查点设置
// //开启检查点
// env.enableCheckpointing(5000L,CheckpointingMode.EXACTLY_ONCE);
// // 设置检查点超时时间
// env.getCheckpointConfig().setCheckpointTimeout(60000L);
// // 设置重启策略
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
// // 设置job取消后,检查点是否保留
// env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// // 设置状态后端 -- 基于内存 or 文件系统 or RocksDB
// env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));
// // 指定操作HDFS的用户
// System.setProperty("HADOOP_USER_NAME","hzy");//TODO 3 从kafka中读取数据//声明消费的主题以及消费者组String topic = "ods_base_db_m";String groupId = "base_db_app_group";// 获取消费者对象FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);// 读取数据,封装成流DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);//TODO 4 对数据类型进行转换 String -> JSONObjectSingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);//TODO 5 简单的ETLSingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.filter(new FilterFunction<JSONObject>() {@Overridepublic boolean filter(JSONObject jsonobj) throws Exception {boolean flag =jsonobj.getString("table") != null &&jsonobj.getString("table").length() > 0 &&jsonobj.getJSONObject("data") != null &&jsonobj.getString("data").length() > 3;return flag;}});
// filterDS.print("<<<");//TODO 6 使用FlinkCDC读取配置表数据//获取dataSourceProperties props = new Properties();props.setProperty("scan.startup.mode","initial");SourceFunction<String> mySqlSourceFunction = MySQLSource.<String>builder().hostname("hadoop101").port(3306).username("root").password("123456")// 可配置多个库.databaseList("gmall2022_realtime")///可选配置项,如果不指定该参数,则会读取上一个配置中指定的数据库下的所有表的数据//注意:指定的时候需要使用"db.table"的方式.tableList("gmall2022_realtime.table_process").debeziumProperties(props).deserializer(new MyDeserializationSchemaFunction()).build();// 读取数据封装流DataStreamSource<String> mySqlDS = env.addSource(mySqlSourceFunction);// 为了让每一个并行度上处理业务数据的时候,都能使用配置流的数据,需要将配置流广播下去// 想要使用广播状态,状态描述器只能是map,使用map状态存储MapStateDescriptor<String, TableProcess> mapStateDescriptor =new MapStateDescriptor<>("table_process", String.class, TableProcess.class);BroadcastStream<String> broadcastDS = mySqlDS.broadcast(mapStateDescriptor);// 调用非广播流的connect方法,将业务流与配置流进行连接BroadcastConnectedStream<JSONObject, String> connectDS = filterDS.connect(broadcastDS);//TODO 7 动态分流,将维度数据放到维度侧输出流,事实数据放到主流中//声明维度侧输出流的标记OutputTag<JSONObject> dimTag = new OutputTag<JSONObject>("dimTag") {};SingleOutputStreamOperator<JSONObject> realDS = connectDS.process(new TableProcessFunction(dimTag,mapStateDescriptor));// 获取维度侧输出流DataStream<JSONObject> dimDS = realDS.getSideOutput(dimTag);realDS.print("事实数据:");dimDS.print("维度数据:");//TODO 8 将维度侧输出流的数据写到hbase(Phoenix)中dimDS.addSink(new DimSink());//TODO 9 将主流数据写回kafka的dwd层realDS.addSink(MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {@Overridepublic ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {return new ProducerRecord<byte[], byte[]>(jsonObj.getString("sink_table"),jsonObj.getJSONObject("data").toJSONString().getBytes());}}));env.execute();}
}
1 MyKafkaUtil
package com.hzy.gmall.realtime.utils;
/*** 操作kafka工具类*/
public class MyKafkaUtil {private static final String KAFKA_SERVER = "hadoop101:9092,hadoop102:9092,hadoop103:9092";private static final String DEFAULT_TOPIC = "default_topic";// 获取kafka的消费者public static FlinkKafkaConsumer<String> getKafkaSource(String topic,String groupId){Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);// 定义消费者组props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),props);}// 获取kafka的生产者// 这种实现只能保证数据不丢失,不能保证精准一次,只能保证数据不丢失
// public static FlinkKafkaProducer<String> getKafkaSink(String topic){
// return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
// }public static FlinkKafkaProducer<String> getKafkaSink(String topic){Properties props = new Properties();props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");return new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {@Overridepublic ProducerRecord<byte[], byte[]> serialize(String str, @Nullable Long timestamp) {return new ProducerRecord<byte[], byte[]>(topic,str.getBytes());}},props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);}// 获取kafka的生产者public static <T>FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema){Properties props = new Properties();props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");return new FlinkKafkaProducer<T>(DEFAULT_TOPIC,kafkaSerializationSchema,props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);}
}
2 GmallConfig
package com.hzy.gmall.realtime.common;/*** 实时数仓中的常量类*/
public class GmallConfig {public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop101,hadoop102,hadoop103:2181";
}
3 TableProcess
package com.hzy.gmall.realtime.beans;import lombok.Data;
@Data
public class TableProcess {//动态分流Sink常量 改为小写和脚本一致public static final String SINK_TYPE_HBASE = "hbase";public static final String SINK_TYPE_KAFKA = "kafka";public static final String SINK_TYPE_CK = "clickhouse";//来源表String sourceTable;//操作类型 insert,update,deleteString operateType;//输出类型 hbase kafkaString sinkType;//输出表(主题)String sinkTable;//输出字段String sinkColumns;//主键字段String sinkPk;//建表扩展String sinkExtend;
}
4 MyDeserializationSchemaFunction
package com.hzy.gmall.realtime.app.fun;public class MyDeserializationSchemaFunction implements DebeziumDeserializationSchema<String> {// 反序列化@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {// 导入的是org.apache.kafka.connnect.data包Struct valueStruct = (Struct) sourceRecord.value();// 获取数据的来源Struct afterStruct = valueStruct.getStruct("after");// 获取数据库和表名的来源Struct sourceStruct = valueStruct.getStruct("source");// 获取数据库String database = sourceStruct.getString("db");// 获取表名String table = sourceStruct.getString("table");// 获取操作类型
// String op = valueStruct.getString("op");String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();if(type.equals("create")){type = "insert";}JSONObject jsonObj = new JSONObject();jsonObj.put("database",database);jsonObj.put("table",table);jsonObj.put("type",type);// 获取影响的数据// 删除时,afterStruct为空JSONObject dataJsonObj = new JSONObject();if (afterStruct != null){// schema获取源数据的格式,fields获取里面的各个元素for (Field field : afterStruct.schema().fields()) {String fieldName = field.name();Object fieldValue = afterStruct.get(field);dataJsonObj.put(fieldName,fieldValue);}}// 删除操作会使得data属性不为空,但size为0jsonObj.put("data",dataJsonObj);// 向下游发送数据collector.collect(jsonObj.toJSONString()) ;}// 指定类型@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}
}
5 TableProcessFunction
package com.hzy.gmall.realtime.app.fun;/*** 实现动态分流* 目前流中有两条流中的数据,使用以下两个方法分别进行处理*/
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {// 声明维度侧输出流标签private OutputTag<JSONObject> dimTag;// 声明广播状态描述器private MapStateDescriptor<String, TableProcess> mapStateDescriptor;// 声明连接对象private Connection conn;@Overridepublic void open(Configuration parameters) throws Exception {// 注册驱动Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");// 获取连接conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);}public TableProcessFunction(OutputTag<JSONObject> dimTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {this.dimTag = dimTag;this.mapStateDescriptor = mapStateDescriptor;}// 处理业务流中的数据,maxwell从业务数据库中采集到的数据@Overridepublic void processElement(JSONObject jsonObj, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {String table = jsonObj.getString("table");String type = jsonObj.getString("type");// 在使用maxwell处理历史数据的时候,类型是bootstrap-insert,修复为insertif (type.equals("bootstrap-insert")){type = "insert";jsonObj.put("type",type);}// 拼接keyString key = table + ":" + type;// 获取状态ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);// 从状态中获取配置信息TableProcess tableProcess = broadcastState.get(key);if (tableProcess != null){// 在配置表中找到了该操作对应的配置// 判断是事实数据还是维度数据String sinkTable = tableProcess.getSinkTable();jsonObj.put("sink_table",sinkTable);// 在向下游传递数据之前,将不需要的字段过滤掉// 过滤思路:从配置表中读取保留字段,根据保留字段对data中的属性进行过滤JSONObject dataJsonObj = jsonObj.getJSONObject("data");filterColumns(dataJsonObj,tableProcess.getSinkColumns());String sinkType = tableProcess.getSinkType();if (sinkType.equals(TableProcess.SINK_TYPE_HBASE)){// 维度数据,放到维度侧输出流中ctx.output(dimTag,jsonObj);}else if (sinkType.equals(TableProcess.SINK_TYPE_KAFKA)){// 事实数据,放到主流中out.collect(jsonObj);}}else {// 在配置表中没有该操作对应的配置System.out.println("No This Key In TableProcess:" + key);}}// 过滤字段private void filterColumns(JSONObject dataJsonObj, String sinkColumns) {// dataJsonObj : {"tm_name":"aaa","logo_url":"aaa","id":12}// sinkColumns : id,tm_nameString[] columnArr = sinkColumns.split(",");// 将数组转换成集合,以便下面和entrySet进行比较,数组中没有”包含“方法List<String> columnList = Arrays.asList(columnArr);// 获取json中的每一个名值对(KV)Set<Map.Entry<String, Object>> entrySet = dataJsonObj.entrySet();
// // 获取迭代器
// Iterator<Map.Entry<String, Object>> it = entrySet.iterator();
// // 遍历,如果不包含则删除
// for (;it.hasNext();) {
// if(!columnList.contains(it.next().getKey())){
// it.remove();
// }
// }entrySet.removeIf(entry -> !columnList.contains(entry.getKey()));}// 处理广播流中的数据,FlinkCDC从MySQL中读取配置信息@Overridepublic void processBroadcastElement(String jsonStr, Context ctx, Collector<JSONObject> out) throws Exception {// 获取状态BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);// 将json格式字符串转换为JSON对象JSONObject jsonObj = JSONObject.parseObject(jsonStr);// 获取配置表中的一条配置信息// parseObject:将json格式字符串转化为json格式对象// 第二个参数为将json字符串转化为何种格式的对象TableProcess tableProcess = JSONObject.parseObject(jsonObj.getString("data"), TableProcess.class);// 业务数据库表名String sourceTable = tableProcess.getSourceTable();// 操作类型String operateType = tableProcess.getOperateType();// 数据类型 hbase -- 维度数据 kafka -- 事实数据String sinkType = tableProcess.getSinkType();// 指定输出目的地String sinkTable = tableProcess.getSinkTable();// 主键String sinkPk = tableProcess.getSinkPk();// 指定保留字段(列)String sinkColumns = tableProcess.getSinkColumns();// 指定建表扩展语句String sinkExtend = tableProcess.getSinkExtend();// 如果读取到的配置信息是维度数据,提前创建维度表if (sinkType.equals(TableProcess.SINK_TYPE_HBASE) && "insert".equals(operateType)){checkTable(sinkTable,sinkPk,sinkColumns,sinkExtend);}// 将配置信息放到状态中// 拼接keyString key = sourceTable + ":" + operateType;broadcastState.put(key,tableProcess);}// 在处理配置数据时,提前建立维度表// create table if not exists 表空间.表名(字段名 数据类型,字段名 数据类型)private void checkTable(String tableName, String pk, String fields, String ext) throws SQLException {// 对主键进行空值处理if (pk == null){pk = "id";}// 对建表扩展进行空值处理if (ext == null){ext = "";}StringBuilder createSql = new StringBuilder("create table if not exists "+GmallConfig.HBASE_SCHEMA + "." + tableName +"(");String[] fieldArr = fields.split(",");for (int i = 0; i < fieldArr.length; i++) {String field = fieldArr[i];// 判断是否为主键if (field.equals(pk)){createSql.append(field + " varchar primary key ");}else {createSql.append(field + " varchar ");}if(i < fieldArr.length - 1){createSql.append(",");}}createSql.append(")" + ext);System.out.println("phoenix中的建表语句:" + createSql);// 创建数据库操作对象PreparedStatement ps = null;try {ps = conn.prepareStatement(createSql.toString());// 执行sql语句ps.execute();} catch (SQLException e) {e.printStackTrace();throw new RuntimeException("在phoenix中建表失败");} finally {// 释放资源if(ps != null){ps.close();}}}
}
6 DimSink
package com.hzy.gmall.realtime.app.fun;/*** 将维度侧输出流的数据写到hbase(Phoenix)中*/
public class DimSink extends RichSinkFunction<JSONObject> {private Connection conn;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);}@Overridepublic void invoke(JSONObject jsonObj, Context context) throws Exception {// 上游传递过来的数据格式如下:// {"database":"gmall2022",// "data":{"tm_name":"a","id":13},// "commit":true,// "sink_table":"dim_base_trademark",// "type":"insert",// "table":"base_trademark","// ts":1670131087}// 获取维度表表名String tableName = jsonObj.getString("sink_table");// 获取数据JSONObject dataJsonObj = jsonObj.getJSONObject("data");// 拼接插入语句 upsert into 表空间.表 (a,b,c) values(aa,bb,cc);String upsertSql = genUpsertSql(tableName,dataJsonObj);System.out.println("向phoenix维度表中插入数据的sql:" + upsertSql);PreparedStatement ps = null;try {// 创建数据库操作对象ps = conn.prepareStatement(upsertSql);// 执行sql语句ps.executeUpdate();// 手动提交事务,phoenix的连接实现类不是自动提交事务conn.commit();}catch (SQLException e){e.printStackTrace();throw new RuntimeException("向phoenix维度表中插入数据失败了");} finally {// 释放资源if (ps != null){ps.close();}}}// 拼接插入语句private String genUpsertSql(String tableName, JSONObject dataJsonObj) {// id 10// tm_name zsString upsertSql = "upsert into " + GmallConfig.HBASE_SCHEMA + "."+ tableName +" ("+ StringUtils.join(dataJsonObj.keySet(),",") +") values('"+ StringUtils.join(dataJsonObj.values(),"','")+"')";return upsertSql;}
}