dws-client
简介
dws-client是一款基于DWS JDBC实现的高性能、便捷入库工具,用户在使用时必须保证JDBC可以连接。其中使用dws-client入库具备如下优势:
- dws-client提供对缓存的空间、时间维度的限制,支持攒批提升入库性能,以满足业务在高峰低谷期的入库实时性。
攒批:在实时性要求不严格的场景,对单条数据操作进行缓存,待缓存至多条后批量操作,以提升写入性能。
- 支持并发入库。
- 内部实现多种高性能入库方式、主键冲突策略,满足各种场景入库需求。
- API方式交互,低门槛使用。
引入依赖
1 2 3 4 5 |
<dependency> <groupId>com.huaweicloud.dws</groupId> <artifactId>dws-client</artifactId> <version>${version}</version> </dependency> |
场景使用
前提条件:初始化client
1 2 3 4 5 6 7 8 9 |
public DwsClient getClient(){ DwsConfig config = DwsConfig .builder() .withUrl("jdbc:gaussdb://***/gaussdb") .withUsername("***") .withPassword("****") .build(); return new DwsClient(config); } |
1 2 3 4 5 6 7 |
return DwsConfig.builder() .withUrl(System.getenv("db_url")) .withPassword(System.getenv("db_pwd")) .withUsername(System.getenv("db_username")) .withAutoFlushBatchSize(1000) // 默认攒批1000 .withTableConfig("test.t_c_batch_size_2", new TableConfig() .withAutoFlushBatchSize(500)); // 对于表test.t_c_batch_size_2 攒批500; |
场景二:使用数据库连接执行SQL
该接口主要用于一些特殊业务,在目前支持的功能中无法满足时使用。例如: 数据查询,可以直接使用原生JDBC连接操作数据库。
1 2 3 4 5 6 7 8 9 10 11 |
public void sql() throws DwsClientException { Integer id = getClient().sql(connection -> { try (ResultSet resultSet = connection.createStatement().executeQuery("select id from test.user where name = 'zhangsan'")) { if (resultSet.next()) { return resultSet.getInt("id"); } } return null; }); System.out.println("zhangsan id = " + id); } |
场景三:获取表信息
1 2 3 |
public void getTableSchema() throws DwsClientException { TableSchema tableSchema = getClient().getTableSchema(TableName.valueOf("test.test")); } |
场景四:数据入库
1 2 3 4 5 6 7 8 9 10 11 |
public void write() throws DwsClientException { getClient().write("test.test") .setObject("id", 1) .setObject("name", "test") // 只有在insert时会生效该设置,主键冲突时不更新 .setObject("age", 38, true) // 异步入库,放入后台缓存即可返回 //.commit() // 同步入库 会等待入库成功返回 .syncCommit(); } |
场景五:数据删除
public void delete() throws DwsClientException { getClient().delete("test.test") .setObject("id", 1) // 异步入库,放入后台缓存即可返回 //.commit() // 同步入库 会等待入库成功返回 .syncCommit(); }
public void flush() throws DwsClientException { getClient().flush(); }
场景七:关闭资源
public void close() throws IOException { getClient().close(); }
监听数据入库成功的事件
在异步入库的场景,想要知道哪些数据已经入库,则可通过绑定flushSuccess函数接口实现,该接口会在数据库事务提交完成后回调,回调时会将该批入库数据传递给接口。
public DwsClient getClient() { DwsConfig config = DwsConfig .builder() .withUrl("jdbc:postgresql://***/gaussdb") .withUsername("***") .withPassword("****") .onFlushSuccess(records -> { for (Record record : records) { log.info("flush success. value = {}, pk = {}", RecordUtil.toMap(record), RecordUtil.getRecordPrimaryKeyValue(record)); } }) .build(); return new DwsClient(config); }
监听后台异常任务
异步入库数据会由后台任务执行,后台任务执行失败想要感知可通过绑定error函数接口实现,否则只能在下次提交时发生异常错误,由业务感知,如果绑定接口不上报异常,那么该异常将消除,不会再下次提交时报出,否则会以接口异常在下次提交时报出给业务。
public DwsClient getClient() { DwsConfig config = DwsConfig .builder() .withUrl("jdbc:postgresql://***/gaussdb") .withUsername("***") .withPassword("****") .onError((clientException, client) -> { if (clientException instanceof DwsClientRecordException) { DwsClientRecordException recordException = (DwsClientRecordException) clientException; List<Record> records = recordException.getRecords(); List<DwsClientException> exceptions = recordException.getExceptions(); for (int i = 0; i < records.size(); i++) { log.error("pk = {} . error = {}", RecordUtil.getRecordPrimaryKeyValue(records.get(i)), exceptions.get(i)); } } if (clientException.getCode() != ExceptionCode.CONNECTION_ERROR && clientException.getCode() != ExceptionCode.LOCK_ERROR) { throw clientException; } log.error("code = {}", clientException.getCode(), clientException.getOriginal()); return null; }) .build(); return new DwsClient(config); }
异常处理
异常可分为三类:
- InvalidException运行时异常不显示抛出,触发在请求参数无效时。
- DwsClientException对所有异常的封装,包含被解析的code以及原始异常。
- DwsClientRecordException对DwsClientException的扩展,包含写入异常的数据集合以及对应DwsClientException异常。
异常code对照:
public enum ExceptionCode { /** * 无效参数 */ INVALID_CONFIG(1), /** * 连接异常 */ CONNECTION_ERROR(100), /** * 只读 */ READ_ONLY(101), /** * 超时 */ TIMEOUT(102), /** * 连接数过多 */ TOO_MANY_CONNECTIONS(103), /** * 加锁异常 */ LOCK_ERROR(104), /** * 认证失败 */ AUTH_FAIL(201), /** * 已经关闭 */ ALREADY_CLOSE(202), /** * 无权限 */ PERMISSION_DENY(203), SYNTAX_ERROR(204), /** * 内部异常 */ INTERNAL_ERROR(205), /** * 中断异常 */ INTERRUPTED(206), /** * 表未发现 */ TABLE_NOT_FOUND(207), CONSTRAINT_VIOLATION(208), DATA_TYPE_ERROR(209), DATA_VALUE_ERROR(210), /** * 解析不到的异常 */ UNKNOWN_ERROR(500); private final int code; }
详细配置说明
参数 |
说明 |
默认值 |
支持版本 |
---|---|---|---|
url |
dws数据库JDBC连接地址。 |
- |
1.0 |
username |
dws数据库用户名。 |
- |
|
password |
dws数据库用户密码。 |
- |
|
connectionMaxUseTimeSeconds |
连接最大使用时间(秒),超过该时间会强制关闭当前连接并重新获取;使用COPY_MERGE/COPY_UPSERT时会使用临时表,临时表的schema在连接断开时才会清除,主要用于清除该部分数据。 |
3600 |
|
connectionMaxIdleMs |
连接最大空闲时间(毫秒)。 |
60000 |
|
metadataCacheSeconds |
元数据缓存时间(秒),为提升性能,会对理论上不怎么变更的数据,例如表结构,该参数用于设置缓存过期时间。 |
180 |
|
retryBaseTime |
重试时sleep时间 = retryBaseTime * 次数 + (0~retryRandomTime)毫秒,该参数设置时间基数(毫秒)。 |
1000 |
|
retryRandomTime |
retryBaseTime重试时sleep时间 = retryBaseTime * 次数 +(0~retryRandomTime)毫秒,该参数设置重试时的随机数范围,该参数主要用于在死锁场景将两个task执行时间错开(毫秒)。 |
300 |
|
maxFlushRetryTimes |
执行刷库任务时,最大尝试执行次数。 |
3 |
|
autoFlushBatchSize |
后台任务刷库策略:缓存条数大于等于autoFlushBatchSize或者当前时间 - 缓存开始时间大于等于autoFlushMaxIntervalMs,该参数配置缓存最大条数。 |
5000 |
|
autoFlushMaxIntervalMs |
后台任务刷库策略:缓存条数大于等于autoFlushBatchSize或者当前时间 - 缓存开始时间大于等于autoFlushMaxIntervalMs,该参数配置缓存最大时间(毫秒)。 |
3000 |
|
copyWriteBatchSize |
在writeMode设置为AUTO时,在数据量低于copyWriteBatchSize时会使用upsert方式入库,否则根据是否有主键选择copy/copy+ upsert方式入库。 |
6000 |
|
writeMode |
数据写入模式:
|
AUTO |
|
conflictStrategy |
数据库存在主键时的主键冲突策略:
|
INSERT_OR_UPDATE |
|
threadSize |
执行任务时的并发数量,异步任务中以表为维度提交任务,多表之间可并发;对于同一个表存在对字段列数不同的操作,例如在攒批中存在100条操作A B C字段、200条操作A B D字段那么最后会将操作字段一样的归为一类,不同类之间可并发入库,设置该参数可参考这两个场景设置,以提升吞吐。 |
3 |
|
logSwitch |
日志开关,开启后会打印比较详细的过程日志,便于在调试或定位问题时开启。 |
false |
|
logDataTables |
入库时需要打印数据的表,便于在定位问题时对比数据。 |
- |
|
flushSuccessFunction |
数据入库成功后的回调函数。 |
- |
|
errorFunction |
后台任务执行失败的回调函数。 |
- |
|
batchOutWeighRatio |
为提高整体吞吐,当对autoFlushBatchSize要求不是很严格时,可设置该参数,当往buffer中提交数据时buffer中数据量大于batchOutWeighRatio * autoFlushBatchSize时提交线程将会执行提交入库的任务,该参数用于避免业务线程提交任务,尽量使用后台线程执行提交。 |
1 |
|
tableConfig |
对于conflictStrategy、writeMode、copyWriteBatchSize、autoFlushMaxIntervalMs、autoFlushBatchSize、batchOutWeighRatio在多表共用一个client的情况可能需要根据不同表配置不同值,该参数可实现以上参数的表级配置,在未配置的表则生效全局参数。
说明:
注意一旦配置表级参数,其它表级参数也会被设置默认值,必须将其它表级参数也设置上。 |
- |
|
uniqueKeys |
该参数为表级参数必须通过tableConfig配置,该参数用于在表中无主键但是有唯一索引时,在入库时使用该参数指定字段做唯一约束,在update场景中该字段不需要是唯一索引或者主键,但upsert场景必须要唯一索引或主键。 |
- |
1.0.3 |
copyMode |
使用copy入库的格式: CSV:将数据拼接成字符串数据用双引号包裹的CSV格式,其中字段间以逗号分割,数据间以换行分割。使用jdbc copy api入库,该方式性能略低于DELIMITER方式,但比较稳定可靠。 DELIMITER:将数据字段使用copy api入库,其中字符间以0X1E分割,数据间以0X1F分割。该方式要求数据不包含分隔符,如包含将报错不能正常入库,且该方式定义null字符串为null数据,如果数据为null字符串将被设置为null。 |
CSV |
1.0.6 |
caseSensitive |
表字段大小写是否敏感。 |
false |
1.0.7 |
createTempTableMode |
在使用copy merge/upsert时,创建临时表方式:
|
AS |
1.0.7 |
numberAsEpochMsForDatetime |
如果数据库字段是时间类型(date\time\timestamp)并且数据源为数字类型,是否将源数据按毫秒时间戳转换为对应时间类型。
说明:
|
false |
1.0.9 |
stringToDatetimeFormat |
如果数据库字段是时间类型(date\time\timestamp)并且数据源为字符串类型,通过SimpleDateFormat按stringToDatetimeFormat格式转换为日期类型,然后通过日期中的时间戳构造数据库对应类型数据。
说明:
该参数配置即代表开启,如果不需要请勿配置。 |
null |
|
updateAll |
upsert时set字段是否包含主键。 |
true |
1.0.10 |