更新时间:2024-10-26 GMT+08:00

dws-client

简介

dws-client是一款基于DWS JDBC实现的高性能、便捷入库工具,用户在使用时必须保证JDBC可以连接。其中使用dws-client入库具备如下优势:

  1. dws-client提供对缓存的空间、时间维度的限制,支持攒批提升入库性能,以满足业务在高峰低谷期的入库实时性。

    攒批:在实时性要求不严格的场景,对单条数据操作进行缓存,待缓存至多条后批量操作,以提升写入性能。

  2. 支持并发入库。
  3. 内部实现多种高性能入库方式、主键冲突策略,满足各种场景入库需求。
  4. API方式交互,低门槛使用。
图1 dws-client交互场景

引入依赖

dws-client已经发布至maven仓库,可在仓库中选择最新版本使用,链接请参见:https://mvnrepository.com/artifact/com.huaweicloud.dws/dws-client
1
2
3
4
5
<dependency>
   <groupId>com.huaweicloud.dws</groupId>
   <artifactId>dws-client</artifactId>
   <version>${version}</version>
</dependency>

场景使用

前提条件:初始化client

以下举例为最简单的方式,只需要数据库的连接配置即可,其余配置均采用默认配置:
1
2
3
4
5
6
7
8
9
public DwsClient getClient(){
        DwsConfig config = DwsConfig
                .builder()
                .withUrl("jdbc:gaussdb://***/gaussdb")
                .withUsername("***")
                .withPassword("****")
                .build();
        return new DwsClient(config);
     }
场景一:表级参数配置
1
2
3
4
5
6
7
return DwsConfig.builder()
                .withUrl(System.getenv("db_url"))
                .withPassword(System.getenv("db_pwd"))
                .withUsername(System.getenv("db_username"))
                .withAutoFlushBatchSize(1000) // 默认攒批1000
                .withTableConfig("test.t_c_batch_size_2", new TableConfig()
                        .withAutoFlushBatchSize(500)); // 对于表test.t_c_batch_size_2 攒批500;

场景二:使用数据库连接执行SQL

该接口主要用于一些特殊业务,在目前支持的功能中无法满足时使用。例如: 数据查询,可以直接使用原生JDBC连接操作数据库。

API参数为一个函数式接口,接口会提供一个数据库连接,返回值可以是任意类型,由业务返回类型决定。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public void sql() throws DwsClientException {
        Integer id = getClient().sql(connection -> {
            try (ResultSet resultSet = connection.createStatement().executeQuery("select id from test.user where name = 'zhangsan'")) {
                if (resultSet.next()) {
                    return resultSet.getInt("id");
                }
            }
            return null;
        });
        System.out.println("zhangsan id = " + id);
     }

场景三:获取表信息

API可以根据一个带schema的表名获取到表结构(会存在缓存),表结构的定义包括:所有列、主键。
1
2
3
public void getTableSchema() throws DwsClientException {
        TableSchema tableSchema = getClient().getTableSchema(TableName.valueOf("test.test"));
     }

场景四:数据入库

数据入库的操作client提供一个write API承载,接口会返回一个Operate接口,可以通过接口操作表字段,提交代表操作完成,交由client执行入库,提交时可以选择同步或者异步,设置字段时可以选择是否在主键冲突时忽略该设置。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public void write() throws DwsClientException {
        getClient().write("test.test")
                .setObject("id", 1)
                .setObject("name", "test")
                // 只有在insert时会生效该设置,主键冲突时不更新
                .setObject("age", 38, true)
                // 异步入库,放入后台缓存即可返回
                //.commit()
                // 同步入库 会等待入库成功返回
                .syncCommit();
     }

场景五:数据删除

删除API和入库API都由Operate承载,但删除时只能且必须设置主键字段,同时忽略更新字段不生效。
public void delete() throws DwsClientException {
        getClient().delete("test.test")
                .setObject("id", 1)
                // 异步入库,放入后台缓存即可返回
                //.commit()
                // 同步入库 会等待入库成功返回
                .syncCommit();
    }
场景六:强制刷新缓存到数据库
public void flush() throws DwsClientException {
        getClient().flush();
    }

场景七:关闭资源

close时会将缓存刷库,同时close后无法再次执行入库、删除、sql等API接口。
public void close() throws IOException {
        getClient().close();
     }

监听数据入库成功的事件

在异步入库的场景,想要知道哪些数据已经入库,则可通过绑定flushSuccess函数接口实现,该接口会在数据库事务提交完成后回调,回调时会将该批入库数据传递给接口。

public DwsClient getClient() {
        DwsConfig config = DwsConfig
                .builder()
                .withUrl("jdbc:postgresql://***/gaussdb")
                .withUsername("***")
                .withPassword("****")
                .onFlushSuccess(records -> {
                    for (Record record : records) {
                        log.info("flush success. value = {}, pk = {}", RecordUtil.toMap(record), RecordUtil.getRecordPrimaryKeyValue(record));
                    }
                })
                .build();
        return new DwsClient(config);
     }

监听后台异常任务

异步入库数据会由后台任务执行,后台任务执行失败想要感知可通过绑定error函数接口实现,否则只能在下次提交时发生异常错误,由业务感知,如果绑定接口不上报异常,那么该异常将消除,不会再下次提交时报出,否则会以接口异常在下次提交时报出给业务。

public DwsClient getClient() {
        DwsConfig config = DwsConfig
                .builder()
                .withUrl("jdbc:postgresql://***/gaussdb")
                .withUsername("***")
                .withPassword("****")
                .onError((clientException, client) -> {
                    if (clientException instanceof DwsClientRecordException) {
                        DwsClientRecordException recordException = (DwsClientRecordException) clientException;
                        List<Record> records = recordException.getRecords();
                        List<DwsClientException> exceptions = recordException.getExceptions();
                        for (int i = 0; i < records.size(); i++) {
                            log.error("pk = {} . error = {}", RecordUtil.getRecordPrimaryKeyValue(records.get(i)), exceptions.get(i));
                        }
                    }
                    if (clientException.getCode() != ExceptionCode.CONNECTION_ERROR && clientException.getCode() != ExceptionCode.LOCK_ERROR) {
                        throw clientException;
                    }
                    log.error("code = {}", clientException.getCode(), clientException.getOriginal());
                    return null;
                })
                .build();
        return new DwsClient(config);
     }

异常处理

异常可分为三类:

  1. InvalidException运行时异常不显示抛出,触发在请求参数无效时。
  2. DwsClientException对所有异常的封装,包含被解析的code以及原始异常。
  3. DwsClientRecordException对DwsClientException的扩展,包含写入异常的数据集合以及对应DwsClientException异常。

异常code对照:

public enum ExceptionCode {
    /**
     * 无效参数
     */
    INVALID_CONFIG(1),

    /**
     * 连接异常
     */
    CONNECTION_ERROR(100),
    /**
     * 只读
     */
    READ_ONLY(101),
    /**
     * 超时
     */
    TIMEOUT(102),
    /**
     * 连接数过多
     */
    TOO_MANY_CONNECTIONS(103),
    /**
     * 加锁异常
     */
    LOCK_ERROR(104),


    /**
     * 认证失败
     */
    AUTH_FAIL(201),
    /**
     * 已经关闭
     */
    ALREADY_CLOSE(202),
    /**
     * 无权限
     */
    PERMISSION_DENY(203),
    SYNTAX_ERROR(204),
    /**
     * 内部异常
     */
    INTERNAL_ERROR(205),
    /**
     * 中断异常
     */
    INTERRUPTED(206),
    /**
     * 表未发现
     */
    TABLE_NOT_FOUND(207),
    CONSTRAINT_VIOLATION(208),
    DATA_TYPE_ERROR(209),
    DATA_VALUE_ERROR(210),

    /**
     * 解析不到的异常
     */
    UNKNOWN_ERROR(500);
    private final int code;

 }

详细配置说明

参数

说明

默认值

支持版本

url

dws数据库JDBC连接地址。

-

1.0

username

dws数据库用户名。

-

password

dws数据库用户密码。

-

connectionMaxUseTimeSeconds

连接最大使用时间(秒),超过该时间会强制关闭当前连接并重新获取;使用COPY_MERGE/COPY_UPSERT时会使用临时表,临时表的schema在连接断开时才会清除,主要用于清除该部分数据。

3600

connectionMaxIdleMs

连接最大空闲时间(毫秒)。

60000

metadataCacheSeconds

元数据缓存时间(秒),为提升性能,会对理论上不怎么变更的数据,例如表结构,该参数用于设置缓存过期时间。

180

retryBaseTime

重试时sleep时间 = retryBaseTime * 次数 + (0~retryRandomTime)毫秒,该参数设置时间基数(毫秒)。

1000

retryRandomTime

retryBaseTime重试时sleep时间 = retryBaseTime * 次数 +(0~retryRandomTime)毫秒,该参数设置重试时的随机数范围,该参数主要用于在死锁场景将两个task执行时间错开(毫秒)。

300

maxFlushRetryTimes

执行刷库任务时,最大尝试执行次数。

3

autoFlushBatchSize

后台任务刷库策略:缓存条数大于等于autoFlushBatchSize或者当前时间 - 缓存开始时间大于等于autoFlushMaxIntervalMs,该参数配置缓存最大条数。

5000

autoFlushMaxIntervalMs

后台任务刷库策略:缓存条数大于等于autoFlushBatchSize或者当前时间 - 缓存开始时间大于等于autoFlushMaxIntervalMs,该参数配置缓存最大时间(毫秒)。

3000

copyWriteBatchSize

在writeMode设置为AUTO时,在数据量低于copyWriteBatchSize时会使用upsert方式入库,否则根据是否有主键选择copy/copy+ upsert方式入库。

6000

writeMode

数据写入模式:

  • AUTO:

    数据量低于copyWriteBatchSize使用UPSERT方式入库,否则使用COPY_UPSERT方式入库。

  • COPY_MERGE:
    • 有主键使用copy+merge入库。
    • 无主键使用copy入库。
  • COPY_UPSERT:
    • 无主键使用copy入库。
    • 有主键使用copy + upsert入库。
  • UPSERT:
    • 无主键使用insert into。
    • 有主键使用upsert入库。
  • UPDATE:
    • 使用update where语法更新数据,若原表无主键可选择指定uniqueKeys,指定字段不要求必须是唯一索引,但非唯一索引可能会影响性能。
  • COPY_UPDATE:
    • 数据先通过copy方式入库到临时表,通过临时表加速使用update from where方式更新目标数据。
  • UPDATE_AUTO:
    • 批量小于copyWriteBatchSize使用UPDATE,否则使用COPY_UPDATE。

AUTO

conflictStrategy

数据库存在主键时的主键冲突策略:

  • INSERT_OR_IGNORE:主键冲突时忽略新数据。
  • INSERT_OR_UPDATE:主键冲突时使用新的数据列更新原数据对应列。
  • INSERT_OR_REPLACE:主键冲突时使用新数据替换原数据,数据库中新数据不包含列设置为null,对于全列更新和INSERT_OR_UPDATE无差异。

INSERT_OR_UPDATE

threadSize

执行任务时的并发数量,异步任务中以表为维度提交任务,多表之间可并发;对于同一个表存在对字段列数不同的操作,例如在攒批中存在100条操作A B C字段、200条操作A B D字段那么最后会将操作字段一样的归为一类,不同类之间可并发入库,设置该参数可参考这两个场景设置,以提升吞吐。

3

logSwitch

日志开关,开启后会打印比较详细的过程日志,便于在调试或定位问题时开启。

false

logDataTables

入库时需要打印数据的表,便于在定位问题时对比数据。

-

flushSuccessFunction

数据入库成功后的回调函数。

-

errorFunction

后台任务执行失败的回调函数。

-

batchOutWeighRatio

为提高整体吞吐,当对autoFlushBatchSize要求不是很严格时,可设置该参数,当往buffer中提交数据时buffer中数据量大于batchOutWeighRatio * autoFlushBatchSize时提交线程将会执行提交入库的任务,该参数用于避免业务线程提交任务,尽量使用后台线程执行提交。

1

tableConfig

对于conflictStrategy、writeMode、copyWriteBatchSize、autoFlushMaxIntervalMs、autoFlushBatchSize、batchOutWeighRatio在多表共用一个client的情况可能需要根据不同表配置不同值,该参数可实现以上参数的表级配置,在未配置的表则生效全局参数。

说明:

注意一旦配置表级参数,其它表级参数也会被设置默认值,必须将其它表级参数也设置上。

-

uniqueKeys

该参数为表级参数必须通过tableConfig配置,该参数用于在表中无主键但是有唯一索引时,在入库时使用该参数指定字段做唯一约束,在update场景中该字段不需要是唯一索引或者主键,但upsert场景必须要唯一索引或主键。

-

1.0.3

copyMode

使用copy入库的格式:

CSV:将数据拼接成字符串数据用双引号包裹的CSV格式,其中字段间以逗号分割,数据间以换行分割。使用jdbc copy api入库,该方式性能略低于DELIMITER方式,但比较稳定可靠。

DELIMITER:将数据字段使用copy api入库,其中字符间以0X1E分割,数据间以0X1F分割。该方式要求数据不包含分隔符,如包含将报错不能正常入库,且该方式定义null字符串为null数据,如果数据为null字符串将被设置为null。

CSV

1.0.6

caseSensitive

表字段大小写是否敏感。

false

1.0.7

createTempTableMode

在使用copy merge/upsert时,创建临时表方式:

  • AS:使用create temp table *** as select * from *** as方式创建,该方式支持表中带自增字段的使用,但性能略低。
  • LIKE:使用create temp table *** like方式创建,该方式不支持表中带自增字段。

AS

1.0.7

numberAsEpochMsForDatetime

如果数据库字段是时间类型(date\time\timestamp)并且数据源为数字类型,是否将源数据按毫秒时间戳转换为对应时间类型。

说明:
  • 在copy入库场景该参数不生效。
  • 在此版本前该参数为开启状态,且如果数据是数字类型字符串也将视为时间戳。

false

1.0.9

stringToDatetimeFormat

如果数据库字段是时间类型(date\time\timestamp)并且数据源为字符串类型,通过SimpleDateFormat按stringToDatetimeFormat格式转换为日期类型,然后通过日期中的时间戳构造数据库对应类型数据。

说明:

该参数配置即代表开启,如果不需要请勿配置。

null

updateAll

upsert时set字段是否包含主键。

true

1.0.10