GPSS【实践 01】Developing a Greenplum Streaming Server Client 自定义GPSS客户端开发实例

news/2024/5/1 17:22:06/文章来源:https://blog.csdn.net/weixin_39168541/article/details/124950435

自定义GPSS客户端开发流程

    • 1.GPSS是什么
    • 2.架构
    • 3.组件下载安装
    • 4.自定义客户端
      • 4.1 GPSS Batch Data API Service Definition
      • 4.2 Setting up a Java Development Environment
      • 4.3 Generating the Batch Data API Client Classes
      • 4.4 Coding the GPSS Batch Data Client
        • 4.4.1 Connect to the GPSS Server
        • 4.4.2 Connect to Greenplum Database
        • 4.4.3 Retrieve Greenplum schema and table information
        • 4.4.4 Prepare a Greenplum table for writing
        • 4.4.5 Write data to the Greenplum table
    • 5.总结

1.GPSS是什么

Greenplum Stream Server (GPSS)是一个ETL(提取、转换、加载)工具。GPSS服务器的一个实例从一个或多个客户机接收流数据,使用Greenplum数据库可读的外部表将数据转换并插入到目标Greenplum表中。数据源和数据格式是特定于客户机的。数据源和数据格式由客户端指定。

  • Greenplum Stream Server包括gpss命令行工具。运行gpss时,会启动一个gpss实例,此实例无限期地等待客户端数据。
  • Greenplum Stream Server还包括gpsscli命令行工具,这是一个客户端工具,用于向GPSS实例提交数据加载作业并管理这些作业。

2.架构

Greenplum Stream Server是一个gRPC服务器。GPSS gRPC服务定义的内容包括:连接到Greenplum数据库和检查Greenplum元数据所需的操作和消息格式;数据从客户端写入greenplum数据库表所需的操作和消息格式。有关gRPC内容参考:https://grpc.io/docs/

gpsscli命令行工具是Greenplum Stream Server的gRPC客户端工具,也可以用于操作Greenplum-Kafka 集成和Greenplum-Informatica连接器。可以使用GPSS API开发自己的GPSS gRPC客户端。Greenplum Stream Server架构如下图:
在这里插入图片描述

Greenplum Stream Server 处理ETL任务的执行流程如下所示:

  • 用户通过客户端应用程序启动一个或多个ETL加载作业;
  • 客户端应用程序使用gRPC协议向正在运行的GPSS服务实例提交和启动数据加载作业;
  • GPSS服务实例将每个加载请求事务提交给Greenplum集群的Master节点,并创建或者重用已存在外部表来存储数据。
  • GPSS服务实例将客户端提交的数据直接写到Greenplum集群Segment节点中。

在这里插入图片描述

GPSS存在的问题
同一个Greenplum Stream Server实例,当有多个客户端同时向一张表写入数据时,客户端会出现连接不稳定情况,这里需要额外做处理。一个客户端向一张表写入数据,不会存在该问题。

3.组件下载安装

组件官网下载《地址》可根据安装的数据库版本进行下载,本次以gpss-gpdb6-1.6.0-rhel7-x86_64.gppkg安装包为例进行说明:

-- 版本信息查询
SELECT "version"()
-- 结果数据
PostgreSQL 9.4.24 
(Greenplum Database 6.13.0 build commit:4f1adf8e247a9685c19ea02bcaddfdc200937ecd Open Source) 
on x86_64-unknown-linux-gnu, compiled by gcc (GCC) 6.4.0, 64-bit compiled on Dec 18 2020 22:31:16

在这里插入图片描述

GPSS安装这里不再赘述,可查看《GPSS扩展安装启用配置启动实例》。

4.自定义客户端

《官方GPSS开发文档》已不再提供1.6及以下版本的在线数据,可下载pdf格式查看【实测在线1.7跟1.6是一样的】:

在这里插入图片描述

4.1 GPSS Batch Data API Service Definition

GPSS批处理数据API服务定义如下。将内容复制/粘贴到一个名为gpss的文件中。并注意文件系统的位置。

syntax = "proto3";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";package api;option java_multiple_files = true;// Connect service Request message
message ConnectRequest {string Host = 1;      // Host address of Greenplum master; must be accessible from gpss server systemint32 Port = 2;       // Greenplum master portstring Username = 3;  // User or role name that gpss uses to access Greenplum string Password = 4;  // User passwordstring DB = 5;        // Database namebool UseSSL = 6;      // Use SSL or not; ignored, use the gpss config file to config SSL
}// Connect service Response message
message Session {string ID = 1;  // Id of client connection to gpss
}// Operation mode
enum Operation {Insert = 0;  // Insert all data into table; behavior of duplicate key or data depends upon the constraints of the target table.Merge = 1;   // Insert and UpdateUpdate = 2;  // Update the value of "UpdateColumns" if "MatchColumns" matchRead = 3;    // Not supported
}// Required parameters of the Insert operation
message InsertOption {repeated string InsertColumns = 1;    // Names of the target table columns the insert operation should update; used in 'INSERT INTO', useful for partial loadingbool TruncateTable = 2;               // Truncate table before loading?int64 ErrorLimitCount = 4;            // Error limit count; used by external tableint32 ErrorLimitPercentage = 5;       // Error limit percentage; used by external table
}// Required parameters of the Update operation
message UpdateOption {repeated string MatchColumns = 1;     // Names of the target table columns to compare when determining to update or notrepeated string UpdateColumns = 2;    // Names of the target table columns to update if MatchColumns matchstring Condition = 3;                 // Optional additional match condition; SQL syntax and used after the 'WHERE' clauseint64 ErrorLimitCount = 4;            // Error limit count; used by external tableint32 ErrorLimitPercentage = 5;       // Error limit percentage; used by external table
}// Required parameters of the Merge operation
// Merge operation creates a session-level temp table in StagingSchema
message MergeOption {repeated string InsertColumns = 1;repeated string MatchColumns = 2;repeated string UpdateColumns = 3;string Condition = 4;int64 ErrorLimitCount = 5;int32 ErrorLimitPercentage = 6;
}// Open service Request message
message OpenRequest {Session Session = 1;      // Session ID returned by Connectstring SchemaName = 2;    // Name of the Greenplum Database schemastring TableName = 3;     // Name of the Greenplum Database tablestring PreSQL = 4;        // SQL to execute before gpss loads the datastring PostSQL = 5;       // SQL to execute after gpss loads the dataint32 Timeout = 6;        // Time to wait before aborting the operation (seconds); not supportedstring Encoding = 7;      // Encoding of text data; not supportedstring StagingSchema = 8; // Schema in which gpss creates external and temp tables; default is to create these tables in the same schema as the target tableoneof Option {            // Identify the type of write operation to performInsertOption InsertOption = 100;UpdateOption UpdateOption = 101;MergeOption MergeOption = 102;}
}message DBValue {oneof DBType {int32 Int32Value = 1;int64 Int64Value = 2;float Float32Value = 5;double Float64Value = 6;string StringValue = 7;  // Includes types whose values are presented as string but are not a real string type in Greenplum; for example: macaddr, time with time zone, box, etc.bytes BytesValue = 8;google.protobuf.Timestamp TimeStampValue = 10;  // Time without timezonegoogle.protobuf.NullValue NullValue = 11;}
}message Row {repeated DBValue Columns = 1;
}message RowData {    bytes Data = 1;     // A single protobuf-encoded Row
}// Write service Request message
message WriteRequest {Session Session = 1;repeated RowData Rows = 2;     // The data to load into the target table
}// Close service Response message
message TransferStats {          // Status of the data load operationint64 SuccessCount = 1;        // Number of rows successfully loadedint64 ErrorCount = 2;          // Number of error lines if Errorlimit is not reachedrepeated string ErrorRows = 3; // Number of rows with incorrectly-formatted data; not supported
}// Close service Request message
message CloseRequest {Session session = 1;int32 MaxErrorRows = 2;        // -1: returns all, 0: nothing, >0: max rowsbool Abort = 3;
}// ListSchema service request message
message ListSchemaRequest {Session Session = 1;
}message Schema {string Name = 1;string Owner = 2;
}// ListSchema service response message
message Schemas {repeated Schema Schemas = 1;
}// ListTable service request message
message ListTableRequest {Session Session = 1;string Schema = 2;    // 'public' is the default if no Schema is provided
}// DescribeTable service request message
message DescribeTableRequest {Session Session = 1;string SchemaName = 2;string TableName = 3;
}enum RelationType {Table = 0;View = 1;Index = 2;Sequence = 3;Special = 4;Other = 255;
}message TableInfo {string Name = 1;RelationType Type = 2;
}// ListTable service response message
message Tables {repeated TableInfo Tables = 1;
}// DescribeTable service response message
message Columns {repeated ColumnInfo Columns = 1;
}message ColumnInfo {string Name = 1;            // Column namestring DatabaseType = 2;    // Greenplum data typebool HasLength = 3;         // Contains length information?int64 Length = 4;           // Length if HasLength is truebool HasPrecisionScale = 5; // Contains precision or scale information?int64 Precision = 6;int64 Scale = 7;bool HasNullable = 8;       // Contains Nullable constraint?bool Nullable = 9;
}service Gpss {// Establish a connection to Greenplum Database; returns a Session objectrpc Connect(ConnectRequest) returns (Session) {}// Disconnect, freeing all resources allocated for a sessionrpc Disconnect(Session) returns (google.protobuf.Empty) {}// Prepare and open a table for writerpc Open(OpenRequest) returns(google.protobuf.Empty) {}// Write data to tablerpc Write(WriteRequest) returns(google.protobuf.Empty) {}// Close a write operationrpc Close(CloseRequest) returns(TransferStats) {}// List all available schemas in a databaserpc ListSchema(ListSchemaRequest) returns (Schemas) {}// List all tables and views in a schemarpc ListTable(ListTableRequest) returns (Tables) {}// Decribe table metadata(column name and column type)rpc DescribeTable(DescribeTableRequest) returns (Columns) {}
}

官方给出了数据类型的对应关系:

在这里插入图片描述

4.2 Setting up a Java Development Environment

Java 开发环境主要是 JDK 和 gRPC 的代码生成插件,这里以 Maven 为例,也可以从官方文档给出的 git 地址进行拉取:

        <!--以下为 GPSS 使用依赖的版本--><grpc.version>1.16.1</grpc.version><protobuf.version>3.6.1</protobuf.version><os.maven.plugin>1.6.2</os.maven.plugin><protobuf.maven.plugin>0.6.1</protobuf.maven.plugin><!--以下为 GPSS 使用的依赖--><dependency><groupId>io.grpc</groupId><artifactId>grpc-all</artifactId><version>${grpc.version}</version></dependency><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>${protobuf.version}</version></dependency><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java-util</artifactId><version>${protobuf.version}</version></dependency><!--以下为 GPSS 使用的插件--><plugin><groupId>org.xolstice.maven.plugins</groupId><artifactId>protobuf-maven-plugin</artifactId><version>${protobuf.maven.plugin}</version><configuration><protocArtifact>com.google.protobuf:protoc:3.6.1:exe:${os.detected.classifier}</protocArtifact><pluginId>grpc-java</pluginId><pluginArtifact>io.grpc:protoc-gen-grpc-java:1.11.0:exe:${os.detected.classifier}</pluginArtifact></configuration><executions><execution><goals><goal>compile</goal><goal>compile-custom</goal></goals></execution></executions></plugin>

4.3 Generating the Batch Data API Client Classes

使用4.2的依赖和插件即可生成api类:

在这里插入图片描述

4.4 Coding the GPSS Batch Data Client

这里是实际的编程阶段,要注意的是GPSS插件安装之后要启动服务。

4.4.1 Connect to the GPSS Server

配置类:

@Configuration
@PropertySource(value = "classpath:gpss.properties")
public class GPSSConfiguration {@Value("${gpss_host}")public String gpssHost;@Value("${gpss_port}")public int gpssPort;
}

连接工具类:

@Component
public class GPSSManagement {@Autowiredprivate GPSSConfiguration configuration;private ManagedChannel channel = null;private GpssGrpc.GpssBlockingStub bStub = null;/*** 获取bStub** @return GpssGrpc.GpssBlockingStub 对象*/public GpssGrpc.GpssBlockingStub getBlockingStub() {channel = ManagedChannelBuilder.forAddress(configuration.gpssHost, configuration.gpssPort).usePlaintext(true).build();bStub = GpssGrpc.newBlockingStub(channel);return bStub;}/*** 获取session** @return Session对象*/public Session getSession() {ConnectRequest connRequest = ConnectRequest.newBuilder().setHost(configuration.gpHost).setPort(configuration.gpPort).setUsername(configuration.gpUsername).setPassword(configuration.gpPassword).setDB(configuration.gpDatabase).setUseSSL(false).build();return bStub.connect(connRequest);}/*** 断开连接** @param session 要断开的session对象*/public void closeConnection(Session session) {bStub.disconnect(session);}/*** 断开通道** @param time 断开等待时间* @throws InterruptedException 可能出现的异常*/public void closeChannel(long time) throws InterruptedException {if (time < 10) {time = 10L;}channel.shutdown().awaitTermination(time, TimeUnit.SECONDS);}
}

4.4.2 Connect to Greenplum Database

这里贴出官方的代码片段,连接数据库的代码会整合到GPSSUtill类内:

Session mSession = null;
String gpMasterHost = "localhost";
Integer gpMasterPort = 15432;
String gpRoleName = "gpadmin";
String gpPasswd = "changeme";
String dbname = "testdb";// create a connect request builder
ConnectRequest connReq = ConnectRequest.newBuilder().setHost(gpMasterHost).setPort(gpMasterPort).setUsername(gpRoleName).setPassword(gpPasswd).setDB(dbname).setUseSSL(false).build();// use the blocking stub to call the Connect service
mSession = bStub.connect(connReq);// (placeholder) do greenplum stuff here// use the blocking stub to call the Disconnect service
bStub.disconnect(mSession);

4.4.3 Retrieve Greenplum schema and table information

这里贴出官方的代码片段,获取数据库schema和表信息的代码会整合到GPSSUtill类内,获取全部schema:

import java.util.ArrayList;
import java.util.List;// create a list schema request builder
ListSchemaRequest lsReq = ListSchemaRequest.newBuilder().setSession(mSession).build();// use the blocking stub to call the ListSchema service
List<Schema> listSchema = bStub.listSchema(lsReq).getSchemasList();// extract the name of each schema and save in an array
ArrayList<String> schemaNameList = new ArrayList<String>();
for(Schema s : listSchema) {schemaNameList.add(s.getName());
} 

获取schema下的表:

// use the first schema name returned in the ListSchema code excerpt
String schemaName = schemaNameList.get(0);// create a list table request builder
ListTableRequest ltReq = ListTableRequest.newBuilder().setSession(mSession).setSchema(schemaName).build();// use the blocking stub to call the ListTable service
List<TableInfo> tblList = bStub.listTable(ltReq).getTablesList();// extract the name of each table only and save in an array
ArrayList<String> tblNameList = new ArrayList<String>();
for(TableInfo ti : tblList) {if(ti.getTypeValue() == RelationType.Table_VALUE) {tblNameList.add(ti.getName());}
}

获取表的字段信息【代码内要根据字段信息对数据进行校验和格式转换】:

// the name of the first table returned in the ListTable code excerpt
String tableName = tblNameList.get(0);// create a describe table request builder
DescribeTableRequest dtReq = DescribeTableRequest.newBuilder().setSession(mSession).setSchemaName(schemaName).setTableName(tableName).build();// use the blocking stub to call the DescribeTable service
List<ColumnInfo> columnList = bStub.describeTable(dtReq).getColumnsList();// print the name and type of each column
for(ColumnInfo ci : columnList) {String colname = ci.getName();String dbtype = ci.getDatabaseType();// display the column name and type to stdoutSystem.out.println( "column " + colname + " type: " + dbtype );
}

4.4.4 Prepare a Greenplum table for writing

这里贴出官方的代码片段,写入表准备的代码会整合到GPSSUtill类内:

Integer errLimit = 25;
Integer errPct = 25;
// create an insert option builder
InsertOption iOpt = InsertOption.newBuilder().setErrorLimitCount(errLimit).setErrorLimitPercentage(errPct).setTruncateTable(false).addInsertColumns("loantitle").addInsertColumns("riskscore").addInsertColumns("d2iratio").build();// create an open request builder
OpenRequest oReq = OpenRequest.newBuilder().setSession(mSession).setSchemaName(schemaName).setTableName(tableName)//.setPreSQL("")//.setPostSQL("")//.setEncoding("").setTimeout(5)//.setStagingSchema("").setInsertOption(iOpt).build();// use the blocking stub to call the Open service; it returns nothing
bStub.open(oReq);// (placeholder) write data here// create a close request builder
TransferStats tStats = null;
CloseRequest cReq = CloseRequest.newBuilder().setSession(mSession)//.setMaxErrorRows(15)//.setAbort(true).build();// use the blocking stub to call the Close service
tStats = bStub.close(cReq);// display the result to stdout
System.out.println( "CloseRequest tStats: " + tStats.toString() );

4.4.5 Write data to the Greenplum table

这里贴出官方的代码片段,写入表的代码会整合到GPSSUtill类内【包含meger和update操作代码】:

// create an array of rows
ArrayList<RowData> rows = new ArrayList<>();
for (int row = 0; row < 2; row++) {// create a row builderapi.Row.Builder builder = api.Row.newBuilder();// create builders for each column, in order, and set values - text, int, textapi.DBValue.Builder colbuilder1 = api.DBValue.newBuilder();colbuilder1.setStringValue("xxx");builder.addColumns(colbuilder1.build());api.DBValue.Builder colbuilder2 = api.DBValue.newBuilder();colbuilder2.setInt32Value(77);builder.addColumns(colbuilder2.build());api.DBValue.Builder colbuilder3 = api.DBValue.newBuilder();colbuilder3.setStringValue("yyy");builder.addColumns(colbuilder3.build());// build the rowRowData.Builder rowbuilder = RowData.newBuilder().setData(builder.build().toByteString());// add the rowrows.add(rowbuilder.build());
}// create a write request builder
WriteRequest wReq = WriteRequest.newBuilder().setSession(mSession).addAllRows(rows).build();// use the blocking stub to call the Write service; it returns nothing
bStub.write(wReq);

GPSSUtil类代码:

@Component
public class GPSSUtil {@Autowiredprivate GPSSManagement gpssManagement;public void insertData(String insertTableName, List<Map<Object, Object>> data) {String schemaName = "public";int size = data.size();long start = System.currentTimeMillis();// 获取连接及sessionGpssGrpc.GpssBlockingStub gpssStub = gpssManagement.getBlockingStub();Session session = gpssManagement.getSession();// 根据入库表名称获取表结构DescribeTableRequest dtReq = DescribeTableRequest.newBuilder().setSession(session).setSchemaName(schemaName).setTableName(insertTableName).build();List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();List<String> columnNameList = columnList.stream().map(ColumnInfo::getName).collect(Collectors.toList());// 封装入库信息并开启请求// 错误个数,错误百分比int errLimit = 20, errPct = 20;InsertOption.Builder builderInsert = InsertOption.newBuilder().setErrorLimitCount(errLimit).setErrorLimitPercentage(errPct).setTruncateTable(false);//        columnNameList.forEach(builderInsert::addInsertColumns);columnList.forEach(columnInfo -> {builderInsert.addInsertColumns(columnInfo.getName());
//            builderInsert.addExpressions("(jdata->>'" + columnInfo.getName() + "')::" + columnInfo.getDatabaseType());});InsertOption insertOpt = builderInsert.build();// 开启请求OpenRequest openReq = OpenRequest.newBuilder().setSession(session).setSchemaName(schemaName).setTableName(insertTableName)
//                .setEncoding("utf-8").setTimeout(5).setInsertOption(insertOpt).build();gpssStub.open(openReq);// 格式化对象List<RowData> rows = new ArrayList<>();formatRows(rows, columnList, data);// create a write request builderWriteRequest writeReq = WriteRequest.newBuilder().setSession(session).addAllRows(rows).build();//use the blocking stub to call the write service it returns nothinggpssStub.write(writeReq);// create a close request builderTransferStats tStats = null;CloseRequest closeReq = CloseRequest.newBuilder().setSession(session).build();// use the blocking stub to call the  Close servicetStats = gpssStub.close(closeReq);gpssManagement.closeConnection(session);try {gpssManagement.closeChannel(20L);} catch (InterruptedException e) {e.printStackTrace();}long end = System.currentTimeMillis();long use = end - start;System.out.println("CloseRequest tStats: " + tStats.toString() + "GP数据库入库:" + "-" + size + "条数据总耗时:" + use + "毫秒。");}public void updateData(String insertTableName, List<Map<Object, Object>> data) {String schemaName = "public";int size = data.size();long start = System.currentTimeMillis();// 获取连接及sessionGpssGrpc.GpssBlockingStub gpssStub = gpssManagement.getBlockingStub();Session session = gpssManagement.getSession();// 根据入库表名称获取表结构DescribeTableRequest dtReq = DescribeTableRequest.newBuilder().setSession(session).setSchemaName(schemaName).setTableName(insertTableName).build();List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();List<String> columnNameList = columnList.stream().map(ColumnInfo::getName).collect(Collectors.toList());// 封装入库信息并开启请求// 错误个数,错误百分比int errLimit = 20, errPct = 20;UpdateOption.Builder updateOption = UpdateOption.newBuilder().setErrorLimitCount(errLimit).setErrorLimitPercentage(errPct);columnNameList.forEach(columnName -> {if ("id".equalsIgnoreCase(columnName)) {updateOption.addMatchColumns(columnName);} else {updateOption.addUpdateColumns(columnName);}});UpdateOption updateOpt = updateOption.build();// 开启请求OpenRequest openReq = OpenRequest.newBuilder().setSession(session).setSchemaName(schemaName).setTableName(insertTableName).setEncoding("utf-8").setTimeout(5).setUpdateOption(updateOpt).build();gpssStub.open(openReq);// 格式化对象List<RowData> rows = new ArrayList<>();formatRows(rows, columnList, data);// create a write request builderWriteRequest writeReq = WriteRequest.newBuilder().setSession(session).addAllRows(rows).build();//use the blocking stub to call the write service it returns nothinggpssStub.write(writeReq);// create a close request builderTransferStats tStats = null;CloseRequest closeReq = CloseRequest.newBuilder().setSession(session).build();// use the blocking stub to call the  Close servicetStats = gpssStub.close(closeReq);gpssManagement.closeConnection(session);try {gpssManagement.closeChannel(20L);} catch (InterruptedException e) {e.printStackTrace();}long end = System.currentTimeMillis();long use = end - start;System.out.println("CloseRequest tStats: " + tStats.toString() + "GP数据库入库:" + "-" + size + "条数据总耗时:" + use + "毫秒。");}public void mergeData(String insertTableName, List<Map<Object, Object>> data) {String schemaName = "public";int size = data.size();long start = System.currentTimeMillis();// 获取连接及sessionGpssGrpc.GpssBlockingStub gpssStub = gpssManagement.getBlockingStub();Session session = gpssManagement.getSession();// 根据入库表名称获取表结构DescribeTableRequest dtReq = DescribeTableRequest.newBuilder().setSession(session).setSchemaName(schemaName).setTableName(insertTableName).build();List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();List<String> columnNameList = columnList.stream().map(ColumnInfo::getName).collect(Collectors.toList());// 封装入库信息并开启请求// 错误个数,错误百分比int errLimit = 20, errPct = 20;MergeOption.Builder mergeOption = MergeOption.newBuilder().setErrorLimitCount(errLimit).setErrorLimitPercentage(errPct).addMatchColumns("id");columnNameList.forEach(columnName -> {mergeOption.addInsertColumns(columnName);mergeOption.addUpdateColumns(columnName);});MergeOption mergeOpt = mergeOption.build();// 开启请求OpenRequest openReq = OpenRequest.newBuilder().setSession(session).setSchemaName(schemaName).setTableName(insertTableName).setEncoding("utf-8").setTimeout(5).setMergeOption(mergeOpt).build();gpssStub.open(openReq);// 格式化对象List<RowData> rows = new ArrayList<>();formatRows(rows, columnList, data);// create a write request builderWriteRequest writeReq = WriteRequest.newBuilder().setSession(session).addAllRows(rows).build();//use the blocking stub to call the write service it returns nothinggpssStub.write(writeReq);// create a close request builderTransferStats tStats = null;CloseRequest closeReq = CloseRequest.newBuilder().setSession(session).build();// use the blocking stub to call the  Close servicetStats = gpssStub.close(closeReq);gpssManagement.closeConnection(session);try {gpssManagement.closeChannel(20L);} catch (InterruptedException e) {e.printStackTrace();}long end = System.currentTimeMillis();long use = end - start;System.out.println("CloseRequest tStats: " + tStats.toString() + "GP数据库入库:" + "-" + size + "条数据总耗时:" + use + "毫秒。");}/*** 格式化数据** @param rows       格式化后的数据* @param columnList 目标表的字段名称和类型* @param data       入库数据*/private void formatRows(List<RowData> rows, List<ColumnInfo> columnList, List<Map<Object, Object>> data) {// addColumnsfor (Map<Object, Object> insertRow : data) {// builderRow.Builder builder = Row.newBuilder();// formatcolumnList.forEach(columnInfo -> {String name = columnInfo.getName();Object valueObject = insertRow.get(name);String databaseType = columnInfo.getDatabaseType();if (valueObject != null) {String value = valueObject.toString();if ("VARCHAR".equals(databaseType)) {builder.addColumns(DBValue.newBuilder().setStringValue(value));} else if ("INT2".equalsIgnoreCase(databaseType)) {builder.addColumns(DBValue.newBuilder().setInt32Value(Integer.parseInt(value)));}} else {if ("VARCHAR".equals(databaseType)) {builder.addColumns(DBValue.newBuilder().setStringValue(""));} else if ("INT2".equalsIgnoreCase(databaseType)) {builder.addColumns(DBValue.newBuilder().setInt32Value(-1));}}});// builder the rowRowData.Builder rowBuilder = RowData.newBuilder().setData(builder.build().toByteString());// add the rowrows.add(rowBuilder.build());}}
}

5.总结

官方文档还算是比较详细的,但是仅给出核心代码,实际使用时要写的代码还是挺多的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_96754.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

精准关键词获取-行业搜索词分析

SEO关键词的收集通常可以通过以下几种方法&#xff1a; 根据市场价值、搜索词竞争性和企业实际产品特征进行筛选&#xff1a;确定您的关键词列表之前&#xff0c;建议先进行市场分析&#xff0c;了解您的竞争对手、行业状况和目标受众等信息&#xff0c;以更好的了解所需的特定…

为何ChatGPT如此擅长编造故事?

“幻觉”——人工智能中的一个偏见性术语 AI聊天机器人(如OpenAI的ChatGPT)依赖于一种称为“大型语言模型”(LLM)的人工智能来生成它们的响应。LLM是一种计算机程序&#xff0c;经过数百万文本源的训练&#xff0c;可以阅读并生成“自然语言”文本语言&#xff0c;就像人类自然…

HTTP协议概述 | 简析HTTP请求流程 | HTTP8种请求方法

目录 &#x1f30f; HTTP的简单介绍 何为HTTP HTTP1.0与HTTP1.1 &#x1f30f; HTTP的请求方法 1、OPTIONS 2、HEAD 3、GET 4、POST 5、PUT 6、DELETE 7、TRACE 8、CONNECT &#x1f30f; HTTP的工作原理 &#x1f30f; HTTP请求/响应的步骤 1、客户端连接到Web…

【Linux】用户命令(创建,修改,切换,删除,密码)

目录 1.创建 查看用户信息 查看id 2.修改 修改用户名 修改用户uid 操作前&#xff1a; 操作后 修改组名 操作前&#xff1a; 操作后: 修改组id 操作前&#xff1a; 操作后&#xff1a; 操作前&#xff1a; 操作后: 3.切换用户 4.删除 操作前&#xff1a; 操作…

LeetCode:376. 摆动序列——说什么贪心和动规~

&#x1f34e;道阻且长&#xff0c;行则将至。&#x1f353; &#x1f33b;算法&#xff0c;不如说它是一种思考方式&#x1f340;算法专栏&#xff1a; &#x1f449;&#x1f3fb;123 一、&#x1f331;376. 摆动序列 题目描述&#xff1a;如果连续数字之间的差严格地在正数和…

php7类型约束,严格模式

在PHP7之前&#xff0c;函数和类方法不需要声明变量类型 &#xff0c;任何数据都可以被传递和返回&#xff0c;导致几乎大部分的调用操作都要判断返回的数据类型是否合格。 为了解决这个问题&#xff0c;PHP7引入了类型声明。 目前有两类变量可以声明类型&#xff1a; 形参&a…

拼多多运营中需要采集淘宝天猫京东平台商品详情页面数据上架拼多多店铺,如何使用技术封装接口实现

业务背景&#xff1a;电商平台趋势&#xff0c;平台化。大家可以看到大的电商都开始有自己的平台&#xff0c;其实这个道理很清楚&#xff0c;就是因为这是充分利用自己的流量、自己的商品和服务大效益化的一个过程&#xff0c;因为有平台&#xff0c;可以利用全社会的资源弥补…

RPC调用框架简单介绍

一.Thrift Apache Doris目前使用的RPC调度框架。Thrift是一款基于CS&#xff08;client -server&#xff09;架构的RPC通信框架&#xff0c;开发人员可以根据定义Thrift的IDL(interface decription language)文件来定义数据结构和服务接口&#xff0c;灵活性高&#xff0c;支持…

项目5:实现数据字典的上传下载

项目5&#xff1a;实现数据字典的上传下载 1.什么是数据字典&#xff1f;如何设计&#xff1f; 2.业务流程逻辑 3.数据库表的设计 4.实现上传下载逻辑&#xff08;前端&#xff09; 5.实现上传逻辑&#xff08;后端&#xff09; 6.实现下载依赖&#xff08;后端&#xff…

代码随想录Day49

今天继续学习动规解决完全背包问题。 322.零钱兑换 给你一个整数数组 coins &#xff0c;表示不同面额的硬币&#xff1b;以及一个整数 amount &#xff0c;表示总金额。 计算并返回可以凑成总金额所需的最少的硬币个数 。如果没有任何一种硬币组合能组成总金额&#xff0c;…

vuex中的 mapState, mapMutations

vuex中的 mapState&#xff0c; mapMutations Start 今天使用vuex的过程中&#xff0c;遇到 mapState&#xff0c; mapMutations 这么两个函数&#xff0c;今天学习一下这两个函数。 本文介绍的vuex基于 vuex3.0 1. 官方文档说明 1.1 mapState 官方解释 点击这里&#xff1…

【JUC进阶】详解synchronized锁升级

文章目录1. synchronized概述2. synchronized 的实现原理2.1 Java对象组成2.2 Monitor2.3 从字节码角度看synchronized3. 锁升级3.1 偏向锁3.2 轻量级锁1. synchronized概述 synchronized是一个悲观锁&#xff0c;可以实现线程同步&#xff0c;在多线程的环境下&#xff0c;需…

DIN35电压电流转频率单位脉冲输出信号变换器集电极开路隔离变送器

主要特性 将直流电压或电流信号转换成单位脉冲信号。 精度等级&#xff1a;0.1 级、0.2 级。产品出厂前已检验校正&#xff0c;用户可以直接使用。 国际标准信号输入:0-5V/0-10V/1-5V 等电压信号,0-10mA/0-20mA/4-20mA 等电流信号。 输出标准信号&#xff1a;0-5KHz/0-…

Flink CDC 在京东的探索与实践

摘要&#xff1a;本文整理自京东资深技术专家韩飞&#xff0c;在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为四个部分&#xff1a; 京东自研 CDC 介绍京东场景的 Flink CDC 优化业务案例未来规划点击查看直播回放和演讲 PPT 一、京东自研 CDC 介绍 京东自研…

小白学Pytorch系列- -torch.distributions API Distributions (1)

小白学Pytorch系列- -torch.distributions API Distributions (1) 分布包包含可参数化的概率分布和抽样函数。这允许构造用于优化的随机计算图和随机梯度估计器。这个包通常遵循TensorFlow分发包的设计。 不可能通过随机样本直接反向传播。但是&#xff0c;有两种主要方法可以…

tomcat中出现RFC7230和RFC3986问题解析

问题截图 问题分析 出现上述问题&#xff0c;是因为各版本tomcat中对特殊字符和请求路径中携带中文参数而产生的错误提示。 解决办法 1、调整tomcat版本 tomcat 7.0.76之前的版本不会出现类似问题 2、tomcat9之前&#xff0c;修改tomcat目录底下的/conf/catalina.properti…

chapter-5 数据库设计

以下课程来源于MOOC学习—原课程请见&#xff1a;数据库原理与应用 考研复习 引言 设计的时候: 我们为什么不能设计成R&#xff08;学号&#xff0c;课程号&#xff0c;姓名&#xff0c;所咋系&#xff0c;系主任&#xff0c;成绩&#xff09;&#xff1f; 因为存在数据冗余…

C++算法初级7——二分查找

C算法初级7——二分查找 文章目录C算法初级7——二分查找在升序的数组上进行二分查找总结应用范围应用二分查找的原理&#xff1a;每次排除掉一半答案&#xff0c;使可能的答案区间快速缩小。 二分查找的时间复杂度&#xff1a;O(log n)&#xff0c;因为每次询问会使可行区间的…

appium+python自动化测试启动app

一、部署环境 1、依次下载安装以下工具&#xff0c;并配置环境变量&#xff1a; android sdk Nodejs appium appium-doctor Appium-Python-Client pycharm64 ps:安装包下载和配置环境变量的操作步骤跟着网上各路大神的帖子一步一步做就好了&#xff0c;没啥难度 二、连…

Machine Learning-Ex4(吴恩达课后习题)Neural Networks Learning

目录 1. Neural Networks 1.1 Visualizing the data 1.2 Model representation 1.3 Feedforward and cost function 1.4 Regularized cost function 2. Backpropagation 2.1 Sigmoid gradient 2.2 Random initialization 2.3 Backpropagation 2.4 Gradient Checking…