Developing a Custom GPSS Client
- 1. What Is GPSS
- 2. Architecture
- 3. Downloading and Installing the Component
- 4. Custom Client
- 4.1 GPSS Batch Data API Service Definition
- 4.2 Setting up a Java Development Environment
- 4.3 Generating the Batch Data API Client Classes
- 4.4 Coding the GPSS Batch Data Client
- 4.4.1 Connect to the GPSS Server
- 4.4.2 Connect to Greenplum Database
- 4.4.3 Retrieve Greenplum schema and table information
- 4.4.4 Prepare a Greenplum table for writing
- 4.4.5 Write data to the Greenplum table
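- 5. Summary
1. What Is GPSS
Greenplum Stream Server (GPSS) is an ETL (extract, transform, load) tool. A GPSS server instance receives streaming data from one or more clients and uses Greenplum Database readable external tables to transform the data and insert it into the target Greenplum table. The data source and data format are specified by the client.
- Greenplum Stream Server includes the gpss command-line utility. Running gpss starts a GPSS instance that waits indefinitely for client data.
- Greenplum Stream Server also includes the gpsscli command-line utility, a client tool for submitting data load jobs to a GPSS instance and managing those jobs.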
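2. Architecture
Greenplum Stream Server is a gRPC server. The GPSS gRPC service definition covers two things: the operations and message formats needed to connect to Greenplum Database and inspect Greenplum metadata, and the operations and message formats needed to write data from a client into a Greenplum Database table. For more on gRPC, see https://grpc.io/docs/
The gpsscli command-line utility is the gRPC client tool for Greenplum Stream Server; it can also be used to operate the Greenplum-Kafka Integration and the Greenplum-Informatica Connector. You can use the GPSS API to develop your own GPSS gRPC client. The Greenplum Stream Server architecture is shown in the figure below:
Greenplum Stream Server processes an ETL task as follows:
- The user starts one or more ETL load jobs through a client application;
- The client application uses the gRPC protocol to submit and start data load jobs on a running GPSS service instance;
- The GPSS service instance submits each load request as a transaction to the master node of the Greenplum cluster, and creates a new external table or reuses an existing one to hold the data;
- The GPSS service instance writes the data submitted by the client directly to the segment nodes of the Greenplum cluster.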
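From the client's side, this flow maps onto the Batch Data API calls defined in section 4.1: Connect, Open, Write, Close, Disconnect. The sketch below is a plain-Java state tracker that shows which call is legal at which point. It is not part of GPSS; the class name `GpssCallOrder` and its states are hypothetical, and allowing several Write calls between Open and Close is an assumption.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical helper: tracks which Batch Data API call is legal next. */
final class GpssCallOrder {
    enum State { DISCONNECTED, CONNECTED, OPEN }
    enum Call { CONNECT, OPEN, WRITE, CLOSE, DISCONNECT }

    private State state = State.DISCONNECTED;

    State state() {
        return state;
    }

    /** Applies a call and moves to the next state, or throws if it is out of order. */
    void apply(Call call) {
        switch (state) {
            case DISCONNECTED:
                require(call, EnumSet.of(Call.CONNECT));
                state = State.CONNECTED;
                break;
            case CONNECTED:
                require(call, EnumSet.of(Call.OPEN, Call.DISCONNECT));
                state = (call == Call.OPEN) ? State.OPEN : State.DISCONNECTED;
                break;
            case OPEN:
                require(call, EnumSet.of(Call.WRITE, Call.CLOSE));
                if (call == Call.CLOSE) {
                    state = State.CONNECTED;   // WRITE keeps the table open
                }
                break;
        }
    }

    private void require(Call call, Set<Call> allowed) {
        if (!allowed.contains(call)) {
            throw new IllegalStateException(call + " is not allowed in state " + state);
        }
    }

    public static void main(String[] args) {
        GpssCallOrder order = new GpssCallOrder();
        Call[] calls = {Call.CONNECT, Call.OPEN, Call.WRITE, Call.WRITE, Call.CLOSE, Call.DISCONNECT};
        for (Call c : calls) {
            order.apply(c);
        }
        System.out.println(order.state()); // prints DISCONNECTED
    }
}
```

A wrapper around the blocking stub could call `apply` before each RPC so that an out-of-order call fails in the client rather than on the server.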
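A Known GPSS Issue
With a single Greenplum Stream Server instance, when multiple clients write to the same table at the same time, the client connections become unstable, so this case needs extra handling. The problem does not occur when only one client writes to a table.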
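One possible form of that extra handling, when all writers live in the same client process, is to serialize loads per target table. The sketch below is an assumed workaround, not something the GPSS documentation prescribes. `TableWriteGate` is a hypothetical name, the table name is a placeholder, and the lock only coordinates threads inside one JVM (separate client processes would need an external lock or a queue).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical helper: lets only one load per target table run at a time. */
final class TableWriteGate {
    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Runs the load while holding the lock for schema.table. */
    <T> T withTable(String schema, String table, Supplier<T> load) {
        ReentrantLock lock = locks.computeIfAbsent(schema + "." + table, k -> new ReentrantLock());
        lock.lock();
        try {
            return load.get();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TableWriteGate gate = new TableWriteGate();
        int[] counter = {0};   // deliberately unsynchronized; the gate protects it
        Runnable job = () -> {
            for (int i = 0; i < 1000; i++) {
                gate.withTable("public", "loans", () -> ++counter[0]);
            }
        };
        Thread a = new Thread(job);
        Thread b = new Thread(job);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(counter[0]); // prints 2000
    }
}
```

In a real client the supplier would wrap the whole Open, Write, Close sequence for one table, so that two loads into the same table never overlap.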
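3. Downloading and Installing the Component
Download the component from the official website, choosing the package that matches the installed database version. This walkthrough uses the gpss-gpdb6-1.6.0-rhel7-x86_64.gppkg package as the example:
-- Query the version information
SELECT "version"()
-- Result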
PostgreSQL 9.4.24
(Greenplum Database 6.13.0 build commit:4f1adf8e247a9685c19ea02bcaddfdc200937ecd Open Source)
on x86_64-unknown-linux-gnu, compiled by gcc (GCC) 6.4.0, 64-bit compiled on Dec 18 2020 22:31:16
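Because the package name carries the Greenplum major version (gpdb6 in this example), a client-side check can read that number from the version() output. This is a minimal sketch using a regular expression; `GreenplumVersion` is a hypothetical helper name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts the Greenplum major version from version() output. */
final class GreenplumVersion {
    private static final Pattern GP =
            Pattern.compile("Greenplum Database (\\d+)\\.(\\d+)\\.(\\d+)");

    /** Returns the major version, for example 6 for "Greenplum Database 6.13.0". */
    static int major(String versionOutput) {
        Matcher m = GP.matcher(versionOutput);
        if (!m.find()) {
            throw new IllegalArgumentException("no Greenplum version in: " + versionOutput);
        }
        return Integer.parseInt(m.group(1));
    }

    public static void main(String[] args) {
        String out = "PostgreSQL 9.4.24 (Greenplum Database 6.13.0 build "
                + "commit:4f1adf8e247a9685c19ea02bcaddfdc200937ecd Open Source)";
        System.out.println("gpdb" + major(out)); // prints gpdb6
    }
}
```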
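GPSS installation is not repeated here; see "Installing, Enabling, and Configuring the GPSS Extension and Starting an Instance".
4. Custom Client
The "Official GPSS Development Documentation" no longer hosts online content for version 1.6 and earlier; a PDF can be downloaded instead [in practice, the online 1.7 documentation matches 1.6].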
4.1 GPSS Batch Data API Service Definition
The GPSS Batch Data API service definition follows. Copy and paste the contents into a file named gpss.proto, and note its location in the file system.
syntax = "proto3";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";

package api;

option java_multiple_files = true;

// Connect service Request message
message ConnectRequest {
  string Host = 1;      // Host address of Greenplum master; must be accessible from gpss server system
  int32 Port = 2;       // Greenplum master port
  string Username = 3;  // User or role name that gpss uses to access Greenplum
  string Password = 4;  // User password
  string DB = 5;        // Database name
  bool UseSSL = 6;      // Use SSL or not; ignored, use the gpss config file to config SSL
}

// Connect service Response message
message Session {
  string ID = 1;  // Id of client connection to gpss
}

// Operation mode
enum Operation {
  Insert = 0;  // Insert all data into table; behavior of duplicate key or data depends upon the constraints of the target table.
  Merge = 1;   // Insert and Update
  Update = 2;  // Update the value of "UpdateColumns" if "MatchColumns" match
  Read = 3;    // Not supported
}

// Required parameters of the Insert operation
message InsertOption {
  repeated string InsertColumns = 1;  // Names of the target table columns the insert operation should update; used in 'INSERT INTO', useful for partial loading
  bool TruncateTable = 2;             // Truncate table before loading?
  int64 ErrorLimitCount = 4;          // Error limit count; used by external table
  int32 ErrorLimitPercentage = 5;     // Error limit percentage; used by external table
}

// Required parameters of the Update operation
message UpdateOption {
  repeated string MatchColumns = 1;   // Names of the target table columns to compare when determining to update or not
  repeated string UpdateColumns = 2;  // Names of the target table columns to update if MatchColumns match
  string Condition = 3;               // Optional additional match condition; SQL syntax and used after the 'WHERE' clause
  int64 ErrorLimitCount = 4;          // Error limit count; used by external table
  int32 ErrorLimitPercentage = 5;     // Error limit percentage; used by external table
}

// Required parameters of the Merge operation
// Merge operation creates a session-level temp table in StagingSchema
message MergeOption {
  repeated string InsertColumns = 1;
  repeated string MatchColumns = 2;
  repeated string UpdateColumns = 3;
  string Condition = 4;
  int64 ErrorLimitCount = 5;
  int32 ErrorLimitPercentage = 6;
}

// Open service Request message
message OpenRequest {
  Session Session = 1;       // Session ID returned by Connect
  string SchemaName = 2;     // Name of the Greenplum Database schema
  string TableName = 3;      // Name of the Greenplum Database table
  string PreSQL = 4;         // SQL to execute before gpss loads the data
  string PostSQL = 5;        // SQL to execute after gpss loads the data
  int32 Timeout = 6;         // Time to wait before aborting the operation (seconds); not supported
  string Encoding = 7;       // Encoding of text data; not supported
  string StagingSchema = 8;  // Schema in which gpss creates external and temp tables; default is to create these tables in the same schema as the target table

  oneof Option {             // Identify the type of write operation to perform
    InsertOption InsertOption = 100;
    UpdateOption UpdateOption = 101;
    MergeOption MergeOption = 102;
  }
}

message DBValue {
  oneof DBType {
    int32 Int32Value = 1;
    int64 Int64Value = 2;
    float Float32Value = 5;
    double Float64Value = 6;
    string StringValue = 7;  // Includes types whose values are presented as string but are not a real string type in Greenplum; for example: macaddr, time with time zone, box, etc.
    bytes BytesValue = 8;
    google.protobuf.Timestamp TimeStampValue = 10;  // Time without timezone
    google.protobuf.NullValue NullValue = 11;
  }
}

message Row {
  repeated DBValue Columns = 1;
}

message RowData {
  bytes Data = 1;  // A single protobuf-encoded Row
}

// Write service Request message
message WriteRequest {
  Session Session = 1;
  repeated RowData Rows = 2;  // The data to load into the target table
}

// Close service Response message
message TransferStats {           // Status of the data load operation
  int64 SuccessCount = 1;         // Number of rows successfully loaded
  int64 ErrorCount = 2;           // Number of error lines if Errorlimit is not reached
  repeated string ErrorRows = 3;  // Number of rows with incorrectly-formatted data; not supported
}

// Close service Request message
message CloseRequest {
  Session session = 1;
  int32 MaxErrorRows = 2;  // -1: returns all, 0: nothing, >0: max rows
  bool Abort = 3;
}

// ListSchema service request message
message ListSchemaRequest {
  Session Session = 1;
}

message Schema {
  string Name = 1;
  string Owner = 2;
}

// ListSchema service response message
message Schemas {
  repeated Schema Schemas = 1;
}

// ListTable service request message
message ListTableRequest {
  Session Session = 1;
  string Schema = 2;  // 'public' is the default if no Schema is provided
}

// DescribeTable service request message
message DescribeTableRequest {
  Session Session = 1;
  string SchemaName = 2;
  string TableName = 3;
}

enum RelationType {
  Table = 0;
  View = 1;
  Index = 2;
  Sequence = 3;
  Special = 4;
  Other = 255;
}

message TableInfo {
  string Name = 1;
  RelationType Type = 2;
}

// ListTable service response message
message Tables {
  repeated TableInfo Tables = 1;
}

// DescribeTable service response message
message Columns {
  repeated ColumnInfo Columns = 1;
}

message ColumnInfo {
  string Name = 1;             // Column name
  string DatabaseType = 2;     // Greenplum data type
  bool HasLength = 3;          // Contains length information?
  int64 Length = 4;            // Length if HasLength is true
  bool HasPrecisionScale = 5;  // Contains precision or scale information?
  int64 Precision = 6;
  int64 Scale = 7;
  bool HasNullable = 8;        // Contains Nullable constraint?
  bool Nullable = 9;
}

service Gpss {
  // Establish a connection to Greenplum Database; returns a Session object
  rpc Connect(ConnectRequest) returns (Session) {}

  // Disconnect, freeing all resources allocated for a session
  rpc Disconnect(Session) returns (google.protobuf.Empty) {}

  // Prepare and open a table for write
  rpc Open(OpenRequest) returns (google.protobuf.Empty) {}

  // Write data to table
  rpc Write(WriteRequest) returns (google.protobuf.Empty) {}

  // Close a write operation
  rpc Close(CloseRequest) returns (TransferStats) {}

  // List all available schemas in a database
  rpc ListSchema(ListSchemaRequest) returns (Schemas) {}

  // List all tables and views in a schema
  rpc ListTable(ListTableRequest) returns (Tables) {}

  // Describe table metadata (column name and column type)
  rpc DescribeTable(DescribeTableRequest) returns (Columns) {}
}
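The comments on the Operation enum describe Merge as "Insert and Update", driven by MatchColumns and UpdateColumns. The sketch below restates that rule against an in-memory list of rows so the two branches are visible. It is only an analogy for the semantics in the comments: GPSS itself works through external and temp tables in the database, and `MergeSketch` with its `id` and `name` columns is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical in-memory model of the Merge operation described in the proto comments. */
final class MergeSketch {

    /** Updates rows whose matchColumns equal an incoming row; inserts the rest. */
    static void merge(List<Map<String, Object>> table,
                      List<Map<String, Object>> incoming,
                      List<String> matchColumns,
                      List<String> updateColumns) {
        for (Map<String, Object> row : incoming) {
            boolean matched = false;
            for (Map<String, Object> existing : table) {
                if (sameKey(existing, row, matchColumns)) {
                    matched = true;
                    for (String col : updateColumns) {
                        existing.put(col, row.get(col));   // Update branch
                    }
                }
            }
            if (!matched) {
                table.add(new HashMap<>(row));             // Insert branch
            }
        }
    }

    private static boolean sameKey(Map<String, Object> a, Map<String, Object> b, List<String> cols) {
        for (String c : cols) {
            if (!Objects.equals(a.get(c), b.get(c))) {
                return false;
            }
        }
        return true;
    }

    /** Builds a two-column row for the demo. */
    static Map<String, Object> row(Object id, Object name) {
        Map<String, Object> m = new HashMap<>();
        m.put("id", id);
        m.put("name", name);
        return m;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> table = new ArrayList<>();
        table.add(row(1, "a"));
        merge(table, Arrays.asList(row(1, "b"), row(2, "c")),
                Arrays.asList("id"), Arrays.asList("name"));
        System.out.println(table.size() + " rows, id 1 -> " + table.get(0).get("name"));
        // prints: 2 rows, id 1 -> b
    }
}
```

Under the same reading, the Update operation would be this function without the insert branch.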
The official documentation gives the mapping between data types:
4.2 Setting up a Java Development Environment
The Java development environment mainly consists of the JDK and the gRPC code generation plugin. Maven is used here as the example; you can also pull from the git address given in the official documentation:
<!-- Versions of the dependencies used for GPSS (inside <properties>) -->
<grpc.version>1.16.1</grpc.version>
<protobuf.version>3.6.1</protobuf.version>
<os.maven.plugin>1.6.2</os.maven.plugin>
<protobuf.maven.plugin>0.6.1</protobuf.maven.plugin>

<!-- Dependencies used for GPSS (inside <dependencies>) -->
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-all</artifactId>
    <version>${grpc.version}</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>${protobuf.version}</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java-util</artifactId>
    <version>${protobuf.version}</version>
</dependency>

<!-- Build extension that sets ${os.detected.classifier} (inside <build><extensions>);
     not shown in the original, assumed from the os.maven.plugin version property -->
<extension>
    <groupId>kr.motd.maven</groupId>
    <artifactId>os-maven-plugin</artifactId>
    <version>${os.maven.plugin}</version>
</extension>

<!-- Plugin used for GPSS (inside <build><plugins>) -->
<plugin>
    <groupId>org.xolstice.maven.plugins</groupId>
    <artifactId>protobuf-maven-plugin</artifactId>
    <version>${protobuf.maven.plugin}</version>
    <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.6.1:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.11.0:exe:${os.detected.classifier}</pluginArtifact>
    </configuration>
    <executions>
        <execution>
            <goals>
                <goal>compile</goal>
                <goal>compile-custom</goal>
            </goals>
        </execution>
    </executions>
</plugin>
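4.3 Generating the Batch Data API Client Classes
The dependencies and plugin from 4.2 are enough to generate the API classes.
4.4 Coding the GPSS Batch Data Client
This is the actual coding stage. Note that after the GPSS extension is installed, the GPSS service must also be started.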
4.4.1 Connect to the GPSS Server
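Configuration class:
@Configuration
@PropertySource(value = "classpath:gpss.properties")
public class GPSSConfiguration {
    // Address of the GPSS server
    @Value("${gpss_host}") public String gpssHost;
    @Value("${gpss_port}") public int gpssPort;
    // Greenplum connection settings read by GPSSManagement.getSession();
    // the original omits these fields, so the property keys below are assumed names
    @Value("${gp_host}") public String gpHost;
    @Value("${gp_port}") public int gpPort;
    @Value("${gp_username}") public String gpUsername;
    @Value("${gp_password}") public String gpPassword;
    @Value("${gp_database}") public String gpDatabase;
}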
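The @PropertySource annotation above expects a gpss.properties file on the classpath that defines at least gpss_host and gpss_port. To show the expected shape of that file, the sketch below parses the same two keys with java.util.Properties, outside Spring. The values (localhost, 5000) and the class name `GpssEndpoint` are placeholders, not values from the original project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical holder for the GPSS server address read from gpss.properties. */
final class GpssEndpoint {
    final String host;
    final int port;

    GpssEndpoint(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Parses properties text that contains gpss_host and gpss_port. */
    static GpssEndpoint parse(String propertiesText) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String host = p.getProperty("gpss_host");
        String port = p.getProperty("gpss_port");
        if (host == null || host.trim().isEmpty() || port == null) {
            throw new IllegalArgumentException("gpss_host and gpss_port are required");
        }
        int portNumber = Integer.parseInt(port.trim());
        if (portNumber < 1 || portNumber > 65535) {
            throw new IllegalArgumentException("gpss_port out of range: " + portNumber);
        }
        return new GpssEndpoint(host.trim(), portNumber);
    }

    public static void main(String[] args) {
        // Placeholder file content; replace with the address of your GPSS instance.
        String text = "gpss_host=localhost\ngpss_port=5000\n";
        GpssEndpoint e = parse(text);
        System.out.println(e.host + ":" + e.port); // prints localhost:5000
    }
}
```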
Connection utility class:
import api.*; // classes generated from gpss.proto (package api)
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;

@Component
public class GPSSManagement {

    @Autowired
    private GPSSConfiguration configuration;

    private ManagedChannel channel = null;
    private GpssGrpc.GpssBlockingStub bStub = null;

    /**
     * Gets the blocking stub. Call this before getSession(),
     * which relies on the stub created here.
     *
     * @return the GpssGrpc.GpssBlockingStub object
     */
    public GpssGrpc.GpssBlockingStub getBlockingStub() {
        channel = ManagedChannelBuilder.forAddress(configuration.gpssHost, configuration.gpssPort)
                .usePlaintext(true)
                .build();
        bStub = GpssGrpc.newBlockingStub(channel);
        return bStub;
    }

    /**
     * Gets a session.
     *
     * @return the Session object
     */
    public Session getSession() {
        ConnectRequest connRequest = ConnectRequest.newBuilder()
                .setHost(configuration.gpHost)
                .setPort(configuration.gpPort)
                .setUsername(configuration.gpUsername)
                .setPassword(configuration.gpPassword)
                .setDB(configuration.gpDatabase)
                .setUseSSL(false)
                .build();
        return bStub.connect(connRequest);
    }

    /**
     * Disconnects a session.
     *
     * @param session the session to disconnect
     */
    public void closeConnection(Session session) {
        bStub.disconnect(session);
    }

    /**
     * Shuts down the channel.
     *
     * @param time how long to wait for shutdown, in seconds (minimum 10)
     * @throws InterruptedException if interrupted while waiting
     */
    public void closeChannel(long time) throws InterruptedException {
        if (time < 10) {
            time = 10L;
        }
        channel.shutdown().awaitTermination(time, TimeUnit.SECONDS);
    }
}
4.4.2 Connect to Greenplum Database
The official code excerpt is shown here; the database connection code is integrated into the GPSSUtil class:
Session mSession = null;
String gpMasterHost = "localhost";
Integer gpMasterPort = 15432;
String gpRoleName = "gpadmin";
String gpPasswd = "changeme";
String dbname = "testdb";

// create a connect request builder
ConnectRequest connReq = ConnectRequest.newBuilder()
    .setHost(gpMasterHost)
    .setPort(gpMasterPort)
    .setUsername(gpRoleName)
    .setPassword(gpPasswd)
    .setDB(dbname)
    .setUseSSL(false)
    .build();

// use the blocking stub to call the Connect service
mSession = bStub.connect(connReq);

// (placeholder) do greenplum stuff here

// use the blocking stub to call the Disconnect service
bStub.disconnect(mSession);
4.4.3 Retrieve Greenplum schema and table information
The official code excerpts are shown here; the code that retrieves schema and table information is integrated into the GPSSUtil class. Listing all schemas:
import java.util.ArrayList;
import java.util.List;

// create a list schema request builder
ListSchemaRequest lsReq = ListSchemaRequest.newBuilder()
    .setSession(mSession)
    .build();

// use the blocking stub to call the ListSchema service
List<Schema> listSchema = bStub.listSchema(lsReq).getSchemasList();

// extract the name of each schema and save in an array
ArrayList<String> schemaNameList = new ArrayList<String>();
for (Schema s : listSchema) {
    schemaNameList.add(s.getName());
}
Listing the tables in a schema:
// use the first schema name returned in the ListSchema code excerpt
String schemaName = schemaNameList.get(0);

// create a list table request builder
ListTableRequest ltReq = ListTableRequest.newBuilder()
    .setSession(mSession)
    .setSchema(schemaName)
    .build();

// use the blocking stub to call the ListTable service
List<TableInfo> tblList = bStub.listTable(ltReq).getTablesList();

// extract the name of each table only and save in an array
ArrayList<String> tblNameList = new ArrayList<String>();
for (TableInfo ti : tblList) {
    if (ti.getTypeValue() == RelationType.Table_VALUE) {
        tblNameList.add(ti.getName());
    }
}
Retrieving the column information of a table [the client code should validate and convert the data according to this column information]:
// the name of the first table returned in the ListTable code excerpt
String tableName = tblNameList.get(0);

// create a describe table request builder
DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
    .setSession(mSession)
    .setSchemaName(schemaName)
    .setTableName(tableName)
    .build();

// use the blocking stub to call the DescribeTable service
List<ColumnInfo> columnList = bStub.describeTable(dtReq).getColumnsList();

// print the name and type of each column
for (ColumnInfo ci : columnList) {
    String colname = ci.getName();
    String dbtype = ci.getDatabaseType();
    // display the column name and type to stdout
    System.out.println("column " + colname + " type: " + dbtype);
}
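DescribeTable returns HasLength/Length and HasNullable/Nullable for each column, which is enough for a basic check before writing. The sketch below builds such a check on those four fields. `ColumnMeta` is a hypothetical stand-in for the generated ColumnInfo class so that the example runs without the gRPC classes, and comparing Length against the number of characters is an assumption about how that field is meant.

```java
/** Hypothetical pre-write validation based on the fields of ColumnInfo. */
final class ColumnCheck {

    /** Stand-in for the generated ColumnInfo message (assumed shape). */
    static final class ColumnMeta {
        final String name;
        final boolean hasLength;
        final long length;
        final boolean hasNullable;
        final boolean nullable;

        ColumnMeta(String name, boolean hasLength, long length,
                   boolean hasNullable, boolean nullable) {
            this.name = name;
            this.hasLength = hasLength;
            this.length = length;
            this.hasNullable = hasNullable;
            this.nullable = nullable;
        }
    }

    /** Returns a problem description, or null when the value fits the column. */
    static String check(ColumnMeta col, Object value) {
        if (value == null) {
            boolean notNull = col.hasNullable && !col.nullable;
            return notNull ? col.name + ": null is not allowed" : null;
        }
        String text = value.toString();
        // Count code points so that supplementary characters count once.
        int chars = text.codePointCount(0, text.length());
        if (col.hasLength && chars > col.length) {
            return col.name + ": " + chars + " characters exceeds length " + col.length;
        }
        return null;
    }

    public static void main(String[] args) {
        ColumnMeta title = new ColumnMeta("loantitle", true, 5, true, false);
        System.out.println(check(title, "mortgage")); // loantitle: 8 characters exceeds length 5
        System.out.println(check(title, null));       // loantitle: null is not allowed
        System.out.println(check(title, "auto"));     // null (no problem found)
    }
}
```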
4.4.4 Prepare a Greenplum table for writing
The official code excerpt is shown here; the code that prepares a table for writing is integrated into the GPSSUtil class:
Integer errLimit = 25;
Integer errPct = 25;

// create an insert option builder
InsertOption iOpt = InsertOption.newBuilder()
    .setErrorLimitCount(errLimit)
    .setErrorLimitPercentage(errPct)
    .setTruncateTable(false)
    .addInsertColumns("loantitle")
    .addInsertColumns("riskscore")
    .addInsertColumns("d2iratio")
    .build();

// create an open request builder
OpenRequest oReq = OpenRequest.newBuilder()
    .setSession(mSession)
    .setSchemaName(schemaName)
    .setTableName(tableName)
    //.setPreSQL("")
    //.setPostSQL("")
    //.setEncoding("")
    .setTimeout(5)
    //.setStagingSchema("")
    .setInsertOption(iOpt)
    .build();

// use the blocking stub to call the Open service; it returns nothing
bStub.open(oReq);

// (placeholder) write data here

// create a close request builder
TransferStats tStats = null;
CloseRequest cReq = CloseRequest.newBuilder()
    .setSession(mSession)
    //.setMaxErrorRows(15)
    //.setAbort(true)
    .build();

// use the blocking stub to call the Close service
tStats = bStub.close(cReq);

// display the result to stdout
System.out.println("CloseRequest tStats: " + tStats.toString());
4.4.5 Write data to the Greenplum table
The official code excerpt is shown here; the code that writes to the table is integrated into the GPSSUtil class [which also contains the merge and update operations]:
// create an array of rows
ArrayList<RowData> rows = new ArrayList<>();
for (int row = 0; row < 2; row++) {
    // create a row builder
    api.Row.Builder builder = api.Row.newBuilder();

    // create builders for each column, in order, and set values - text, int, text
    api.DBValue.Builder colbuilder1 = api.DBValue.newBuilder();
    colbuilder1.setStringValue("xxx");
    builder.addColumns(colbuilder1.build());

    api.DBValue.Builder colbuilder2 = api.DBValue.newBuilder();
    colbuilder2.setInt32Value(77);
    builder.addColumns(colbuilder2.build());

    api.DBValue.Builder colbuilder3 = api.DBValue.newBuilder();
    colbuilder3.setStringValue("yyy");
    builder.addColumns(colbuilder3.build());

    // build the row
    RowData.Builder rowbuilder = RowData.newBuilder().setData(builder.build().toByteString());

    // add the row
    rows.add(rowbuilder.build());
}

// create a write request builder
WriteRequest wReq = WriteRequest.newBuilder()
    .setSession(mSession)
    .addAllRows(rows)
    .build();

// use the blocking stub to call the Write service; it returns nothing
bStub.write(wReq);
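The excerpt sends every row in a single WriteRequest. For larger loads, one option is to split the rows into fixed-size batches and send one WriteRequest per batch between Open and Close. Whether several Write calls per open table suit your GPSS version is an assumption to verify against the official documentation. The helper below shows only the batching step in plain Java; `Batches` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: splits a list of rows into fixed-size batches. */
final class Batches {

    /** Returns consecutive sublists of at most batchSize elements, in order. */
    static <T> List<List<T>> partition(List<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < rows.size(); i += batchSize) {
            int end = Math.min(i + batchSize, rows.size());
            out.add(new ArrayList<>(rows.subList(i, end)));   // copy, so batches are independent
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(partition(Arrays.asList(1, 2, 3, 4, 5), 2));
        // prints [[1, 2], [3, 4], [5]]
    }
}
```

With the generated classes on the classpath, each batch would become the argument of `addAllRows` on its own WriteRequest builder.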
GPSSUtil class code:
import api.*; // classes generated from gpss.proto (package api)
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Component
public class GPSSUtil {

    @Autowired
    private GPSSManagement gpssManagement;

    public void insertData(String insertTableName, List<Map<Object, Object>> data) {
        String schemaName = "public";
        int size = data.size();
        long start = System.currentTimeMillis();
        // get the connection and the session
        GpssGrpc.GpssBlockingStub gpssStub = gpssManagement.getBlockingStub();
        Session session = gpssManagement.getSession();
        // look up the table structure by the target table name
        DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
                .setSession(session)
                .setSchemaName(schemaName)
                .setTableName(insertTableName)
                .build();
        List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();
        List<String> columnNameList = columnList.stream().map(ColumnInfo::getName).collect(Collectors.toList());
        // build the load options and open the request
        // error count limit and error percentage limit
        int errLimit = 20, errPct = 20;
        InsertOption.Builder builderInsert = InsertOption.newBuilder()
                .setErrorLimitCount(errLimit)
                .setErrorLimitPercentage(errPct)
                .setTruncateTable(false);
        // columnNameList.forEach(builderInsert::addInsertColumns);
        columnList.forEach(columnInfo -> {
            builderInsert.addInsertColumns(columnInfo.getName());
            // builderInsert.addExpressions("(jdata->>'" + columnInfo.getName() + "')::" + columnInfo.getDatabaseType());
        });
        InsertOption insertOpt = builderInsert.build();
        // open the request
        OpenRequest openReq = OpenRequest.newBuilder()
                .setSession(session)
                .setSchemaName(schemaName)
                .setTableName(insertTableName)
                // .setEncoding("utf-8")
                .setTimeout(5)
                .setInsertOption(insertOpt)
                .build();
        gpssStub.open(openReq);
        // convert the input data into rows
        List<RowData> rows = new ArrayList<>();
        formatRows(rows, columnList, data);
        // create a write request builder
        WriteRequest writeReq = WriteRequest.newBuilder().setSession(session).addAllRows(rows).build();
        // use the blocking stub to call the Write service; it returns nothing
        gpssStub.write(writeReq);
        // create a close request builder
        TransferStats tStats = null;
        CloseRequest closeReq = CloseRequest.newBuilder().setSession(session).build();
        // use the blocking stub to call the Close service
        tStats = gpssStub.close(closeReq);
        gpssManagement.closeConnection(session);
        try {
            gpssManagement.closeChannel(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        long use = end - start;
        System.out.println("CloseRequest tStats: " + tStats.toString() + " Greenplum load: " + size + " rows, total time: " + use + " ms.");
    }

    public void updateData(String insertTableName, List<Map<Object, Object>> data) {
        String schemaName = "public";
        int size = data.size();
        long start = System.currentTimeMillis();
        // get the connection and the session
        GpssGrpc.GpssBlockingStub gpssStub = gpssManagement.getBlockingStub();
        Session session = gpssManagement.getSession();
        // look up the table structure by the target table name
        DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
                .setSession(session)
                .setSchemaName(schemaName)
                .setTableName(insertTableName)
                .build();
        List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();
        List<String> columnNameList = columnList.stream().map(ColumnInfo::getName).collect(Collectors.toList());
        // build the load options and open the request
        // error count limit and error percentage limit
        int errLimit = 20, errPct = 20;
        UpdateOption.Builder updateOption = UpdateOption.newBuilder()
                .setErrorLimitCount(errLimit)
                .setErrorLimitPercentage(errPct);
        // match on the "id" column and update every other column
        columnNameList.forEach(columnName -> {
            if ("id".equalsIgnoreCase(columnName)) {
                updateOption.addMatchColumns(columnName);
            } else {
                updateOption.addUpdateColumns(columnName);
            }
        });
        UpdateOption updateOpt = updateOption.build();
        // open the request
        OpenRequest openReq = OpenRequest.newBuilder()
                .setSession(session)
                .setSchemaName(schemaName)
                .setTableName(insertTableName)
                .setEncoding("utf-8")
                .setTimeout(5)
                .setUpdateOption(updateOpt)
                .build();
        gpssStub.open(openReq);
        // convert the input data into rows
        List<RowData> rows = new ArrayList<>();
        formatRows(rows, columnList, data);
        // create a write request builder
        WriteRequest writeReq = WriteRequest.newBuilder().setSession(session).addAllRows(rows).build();
        // use the blocking stub to call the Write service; it returns nothing
        gpssStub.write(writeReq);
        // create a close request builder
        TransferStats tStats = null;
        CloseRequest closeReq = CloseRequest.newBuilder().setSession(session).build();
        // use the blocking stub to call the Close service
        tStats = gpssStub.close(closeReq);
        gpssManagement.closeConnection(session);
        try {
            gpssManagement.closeChannel(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        long use = end - start;
        System.out.println("CloseRequest tStats: " + tStats.toString() + " Greenplum load: " + size + " rows, total time: " + use + " ms.");
    }

    public void mergeData(String insertTableName, List<Map<Object, Object>> data) {
        String schemaName = "public";
        int size = data.size();
        long start = System.currentTimeMillis();
        // get the connection and the session
        GpssGrpc.GpssBlockingStub gpssStub = gpssManagement.getBlockingStub();
        Session session = gpssManagement.getSession();
        // look up the table structure by the target table name
        DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
                .setSession(session)
                .setSchemaName(schemaName)
                .setTableName(insertTableName)
                .build();
        List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();
        List<String> columnNameList = columnList.stream().map(ColumnInfo::getName).collect(Collectors.toList());
        // build the load options and open the request
        // error count limit and error percentage limit
        int errLimit = 20, errPct = 20;
        MergeOption.Builder mergeOption = MergeOption.newBuilder()
                .setErrorLimitCount(errLimit)
                .setErrorLimitPercentage(errPct)
                .addMatchColumns("id");
        columnNameList.forEach(columnName -> {
            mergeOption.addInsertColumns(columnName);
            mergeOption.addUpdateColumns(columnName);
        });
        MergeOption mergeOpt = mergeOption.build();
        // open the request
        OpenRequest openReq = OpenRequest.newBuilder()
                .setSession(session)
                .setSchemaName(schemaName)
                .setTableName(insertTableName)
                .setEncoding("utf-8")
                .setTimeout(5)
                .setMergeOption(mergeOpt)
                .build();
        gpssStub.open(openReq);
        // convert the input data into rows
        List<RowData> rows = new ArrayList<>();
        formatRows(rows, columnList, data);
        // create a write request builder
        WriteRequest writeReq = WriteRequest.newBuilder().setSession(session).addAllRows(rows).build();
        // use the blocking stub to call the Write service; it returns nothing
        gpssStub.write(writeReq);
        // create a close request builder
        TransferStats tStats = null;
        CloseRequest closeReq = CloseRequest.newBuilder().setSession(session).build();
        // use the blocking stub to call the Close service
        tStats = gpssStub.close(closeReq);
        gpssManagement.closeConnection(session);
        try {
            gpssManagement.closeChannel(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        long use = end - start;
        System.out.println("CloseRequest tStats: " + tStats.toString() + " Greenplum load: " + size + " rows, total time: " + use + " ms.");
    }

    /**
     * Converts the input data into rows.
     * Only VARCHAR and INT2 columns are handled; a null value is written as "" or -1.
     *
     * @param rows       receives the converted rows
     * @param columnList column names and types of the target table
     * @param data       the data to load
     */
    private void formatRows(List<RowData> rows, List<ColumnInfo> columnList, List<Map<Object, Object>> data) {
        for (Map<Object, Object> insertRow : data) {
            // row builder
            Row.Builder builder = Row.newBuilder();
            // add one value per column, in column order
            columnList.forEach(columnInfo -> {
                String name = columnInfo.getName();
                Object valueObject = insertRow.get(name);
                String databaseType = columnInfo.getDatabaseType();
                if (valueObject != null) {
                    String value = valueObject.toString();
                    if ("VARCHAR".equals(databaseType)) {
                        builder.addColumns(DBValue.newBuilder().setStringValue(value));
                    } else if ("INT2".equalsIgnoreCase(databaseType)) {
                        builder.addColumns(DBValue.newBuilder().setInt32Value(Integer.parseInt(value)));
                    }
                } else {
                    if ("VARCHAR".equals(databaseType)) {
                        builder.addColumns(DBValue.newBuilder().setStringValue(""));
                    } else if ("INT2".equalsIgnoreCase(databaseType)) {
                        builder.addColumns(DBValue.newBuilder().setInt32Value(-1));
                    }
                }
            });
            // build the row
            RowData.Builder rowBuilder = RowData.newBuilder().setData(builder.build().toByteString());
            // add the row
            rows.add(rowBuilder.build());
        }
    }
}
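Note that formatRows handles only VARCHAR and INT2. A column of any other type adds no value to the row, so the row ends up with fewer values than the column list. One possible hardening is a lookup table from type name to converter that fails loudly on an unknown type. The sketch below uses plain Java values instead of the generated DBValue builder so that it runs on its own. `Converters` is a hypothetical name, and the INT4 and INT8 entries are assumed type names that the source does not confirm.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical lookup from Greenplum type name to a value converter. */
final class Converters {
    private static final Map<String, Function<String, Object>> BY_TYPE = new HashMap<>();

    static {
        BY_TYPE.put("VARCHAR", s -> s);
        BY_TYPE.put("INT2", Integer::valueOf);
        BY_TYPE.put("INT4", Integer::valueOf);   // assumed type name
        BY_TYPE.put("INT8", Long::valueOf);      // assumed type name
    }

    /**
     * Converts a raw value for the given column type.
     * Returns null for a null input; a real client could map that to DBValue.NullValue.
     */
    static Object convert(String databaseType, Object raw) {
        Function<String, Object> converter = BY_TYPE.get(databaseType.toUpperCase(Locale.ROOT));
        if (converter == null) {
            throw new IllegalArgumentException("unsupported column type: " + databaseType);
        }
        return raw == null ? null : converter.apply(raw.toString());
    }

    public static void main(String[] args) {
        System.out.println(convert("int2", "77"));      // prints 77
        System.out.println(convert("VARCHAR", "xxx"));  // prints xxx
        try {
            convert("BOX", "(0,0),(1,1)");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());         // prints unsupported column type: BOX
        }
    }
}
```

Failing on an unknown type keeps the number of values per row equal to the number of columns, which the silent skip in formatRows does not guarantee.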
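5. Summary
The official documentation is fairly detailed, but it provides only the core code; a working client still needs a good deal of additional code.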