dws-client
简介
dws-client是一款基于DWS JDBC实现的高性能、便捷入库工具,用户在使用时必须保证JDBC可以连接。其中使用dws-client入库具备如下优势:
- dws-client提供对缓存的空间、时间维度的限制,支持攒批提升入库性能,以满足业务在高峰低谷期的入库实时性。
攒批:在实时性要求不严格的场景,对单条数据操作进行缓存,待缓存至多条后批量操作,以提升写入性能。
- 支持并发入库。
- 内部实现多种高性能入库方式、主键冲突策略,满足各种场景入库需求。
- API方式交互,低门槛使用。
引入依赖
1 2 3 4 5 |
<dependency> <groupId>com.huaweicloud.dws</groupId> <artifactId>dws-client</artifactId> <version>${version}</version> </dependency> |
核心功能
整体主要以2.X版本为例,说明为兼容方式时表示1.x的。
初始化client
初始化client用于创建一个client实例,以便后续的入库等操作使用。
dws-client支持参数全部在com.huaweicloud.dws.client.config.DwsClientConfigs中记录,其中每一个常量ConfigOp均表示一个参数,底层最终采用map存储,其中ConfigOp中的key表示存储时的key,如果采用配置文件方式也是配置文件的key。
- 以下举例为最简单的方式,只需要数据库的连接配置即可,其余配置均采用默认配置。
public DwsClient getClient() throws Exception { DwsConfig config = DwsConfig.of() .with(DwsClientConfigs.JDBC_URL, System.getenv("db_url")) .with(DwsClientConfigs.JDBC_PASSWORD, System.getenv("db_pwd")) .with(DwsClientConfigs.JDBC_USERNAME, System.getenv("db_username")); return new DwsClient(config); }
- 使用配置文件
新建配置文件client.properties
dws.client.jdbc.url=jdbc:gaussdb://xxxx:8000/gaussdb dws.client.jdbc.password=**** dws.client.jdbc.username=dbadmin
配置文件初始化public DwsClient getClientByProperties() throws Exception { URL resource = this.getClass().getClassLoader().getResource("client.properties"); DwsConfig config = new DwsConfig(resource.getFile()); return new DwsClient(config); }
- 通过map参数
public DwsClient getClientByMap() throws Exception { Map<String, Object> config = new HashMap<>(); config.put(DwsClientConfigs.JDBC_URL.key(), System.getenv("db_url")); config.put(DwsClientConfigs.JDBC_PASSWORD.key(), System.getenv("db_pwd")); config.put(DwsClientConfigs.JDBC_USERNAME.key(), System.getenv("db_username")); return new DwsClient(new DwsConfig(config)); }
- 兼容1.x版本方式
1 2 3 4 5 6 7 8 9
public DwsClient getClient(){ DwsConfig config = DwsConfig .builder() .withUrl("jdbc:gaussdb://***/gaussdb") .withUsername("***") .withPassword("****") .build(); return new DwsClient(config); }
- 配置表级参数
当client入库多张表时,需要设置不同表的参数配置时可通过表级参数配置,设置时可通过对全局配置withTable("xxx")获取到表级参数构造器,此时会根据此刻的全局参数初始化表级参数,新设置的将覆盖原有参数,build时完成设置并添加到全局参数中,此时接口返回的时全局参数,入下可实现链式调用 public DwsClient getClientTable() throws Exception { DwsConfig config = DwsConfig.of() .with(DwsClientConfigs.JDBC_URL, System.getenv("db_url")) .with(DwsClientConfigs.JDBC_PASSWORD, System.getenv("db_pwd")) .with(DwsClientConfigs.JDBC_USERNAME, System.getenv("db_username")) .with(DwsClientConfigs.WRITE_AUTO_FLUSH_BATCH_SIZE, 10000) .withTable("test") .with(DwsClientConfigs.WRITE_CONFLICT_STRATEGY, ConflictStrategy.INSERT_OR_IGNORE) .build() .withTable("test1") .with(DwsClientConfigs.WRITE_AUTO_FLUSH_BATCH_SIZE, 200) .build(); return new DwsClient(config); }
- 1.x兼容
1 2 3 4 5 6 7
return DwsConfig.builder() .withUrl(System.getenv("db_url")) .withPassword(System.getenv("db_pwd")) .withUsername(System.getenv("db_username")) .withAutoFlushBatchSize(1000) // 默认攒批1000 .withTableConfig("test.t_c_batch_size_2", new TableConfig() .withAutoFlushBatchSize(500)); // 对于表test.t_c_batch_size_2 攒批500;
使用数据库连接执行SQL
该接口主要用于一些特殊业务,在目前支持的功能中无法满足时使用。例如: 数据查询,可以直接使用原生JDBC连接操作数据库。
1 2 3 4 5 6 7 8 9 10 11 |
public void sql() throws DwsClientException { Integer id = getClient().sql(connection -> { try (ResultSet resultSet = connection.createStatement().executeQuery("select id from test.user where name = 'zhangsan'")) { if (resultSet.next()) { return resultSet.getInt("id"); } } return null; }); System.out.println("zhangsan id = " + id); } |
获取表信息
1 2 3 |
public void getTableSchema() throws DwsClientException { TableSchema tableSchema = getClient().getTableSchema(TableName.valueOf("test.test")); } |
数据入库
1 2 3 4 5 6 7 8 9 10 11 |
public void write() throws DwsClientException { getClient().write("test.test") .setObject("id", 1) .setObject("name", "test") // 只有在insert时会生效该设置,主键冲突时不更新 .setObject("age", 38, true) // 异步入库,放入后台缓存即可返回 //.commit() // 同步入库 会等待入库成功返回 .syncCommit(); } |
public void testWrite() throws Exception { try (DwsClient client = getClient()) { client.sql((conn) -> { conn.createStatement().execute("DROP Table IF EXISTS test.dws_client_test;" + "create table test.dws_client_test (id integer, name varchar(10), age int);"); return null; }); TableSchema tableSchema = client.getTableSchema(TableName.valueOf("test.dws_client_test")); log.info("table schema {}", tableSchema); for (int i = 0; i < 100; i++) { Operate operate = client.write(tableSchema) .setObject("id", i) .setObject("name", "name_" + i) .setObject("age", i); operate.commit(); } } }
通过字段索引写client,减少hash计算以缓解客户端cpu压力提升写client吞吐。
public void testWrite() throws Exception { try (DwsClient client = getClient()) { client.sql((conn) -> { conn.createStatement().execute("DROP Table IF EXISTS test.dws_client_test;" + "create table test.dws_client_test (id integer, name varchar(10), age int);"); return null; }); TableSchema tableSchema = client.getTableSchema(TableName.valueOf("test.dws_client_test")); log.info("table schema {}", tableSchema); for (int i = 0; i < 100; i++) { Operate operate = client.write(tableSchema) .setObject(0, i) .setObject(1, "name_" + i) .setObject(2, i); operate.commit(); } } }
数据删除
public void delete() throws DwsClientException { getClient().delete("test.test") .setObject("id", 1) // 异步入库,放入后台缓存即可返回 //.commit() // 同步入库 会等待入库成功返回 .syncCommit(); }
public void flush() throws DwsClientException { getClient().flush(); }
关闭资源
public void close() throws IOException { getClient().close(); }
监听数据入库成功的事件
public DwsClient getClient() throws Exception { DwsConfig config = DwsConfig.of() .with(DwsClientConfigs.JDBC_URL, System.getenv("db_url")) .with(DwsClientConfigs.JDBC_PASSWORD, System.getenv("db_pwd")) .with(DwsClientConfigs.JDBC_USERNAME, System.getenv("db_username")) .onFlushSuccess(records -> { for (Record record : records) { log.info("flush success. value = {}, pk = {}", RecordUtil.toMap(record), RecordUtil.getRecordPrimaryKeyValue(record)); } }); return new DwsClient(config); }
兼容1.x
public DwsClient getClient() { DwsConfig config = DwsConfig .builder() .withUrl("jdbc:postgresql://***/gaussdb") .withUsername("***") .withPassword("****") .onFlushSuccess(records -> { for (Record record : records) { log.info("flush success. value = {}, pk = {}", RecordUtil.toMap(record), RecordUtil.getRecordPrimaryKeyValue(record)); } }) .build(); return new DwsClient(config); }
监听后台异常任务
public DwsClient getClient() throws Exception { DwsConfig config = DwsConfig.of() .with(DwsClientConfigs.JDBC_URL, System.getenv("db_url")) .with(DwsClientConfigs.JDBC_PASSWORD, System.getenv("db_pwd")) .with(DwsClientConfigs.JDBC_USERNAME, System.getenv("db_username")) .onError((clientException, client) -> { if (clientException instanceof DwsClientRecordException) { DwsClientRecordException recordException = (DwsClientRecordException) clientException; List<Record> records = recordException.getRecords(); List<DwsClientException> exceptions = recordException.getExceptions(); for (int i = 0; i < records.size(); i++) { log.error("pk = {} . error = {}", RecordUtil.getRecordPrimaryKeyValue(records.get(i)), exceptions.get(i)); } } if (clientException.getCode() != ExceptionCode.CONNECTION_ERROR && clientException.getCode() != ExceptionCode.LOCK_ERROR) { throw clientException; } log.error("code = {}", clientException.getCode(), clientException.getOriginal()); return null; }); return new DwsClient(config); }
兼容1.x
public DwsClient getClient() { DwsConfig config = DwsConfig .builder() .withUrl("jdbc:postgresql://***/gaussdb") .withUsername("***") .withPassword("****") .onError((clientException, client) -> { if (clientException instanceof DwsClientRecordException) { DwsClientRecordException recordException = (DwsClientRecordException) clientException; List<Record> records = recordException.getRecords(); List<DwsClientException> exceptions = recordException.getExceptions(); for (int i = 0; i < records.size(); i++) { log.error("pk = {} . error = {}", RecordUtil.getRecordPrimaryKeyValue(records.get(i)), exceptions.get(i)); } } if (clientException.getCode() != ExceptionCode.CONNECTION_ERROR && clientException.getCode() != ExceptionCode.LOCK_ERROR) { throw clientException; } log.error("code = {}", clientException.getCode(), clientException.getOriginal()); return null; }) .build(); return new DwsClient(config); }
异常处理
异常可分为三类:
- InvalidException运行时异常不显示抛出,触发在请求参数无效时。
- DwsClientException对所有异常的封装,包含被解析的code以及原始异常。
- DwsClientRecordException对DwsClientException的扩展,包含写入异常的数据集合以及对应DwsClientException异常。
异常code对照:
public enum ExceptionCode { /** * 无效参数 */ INVALID_CONFIG(1), /** * 连接异常 */ CONNECTION_ERROR(100), /** * 只读 */ READ_ONLY(101), /** * 超时 */ TIMEOUT(102), /** * 连接数过多 */ TOO_MANY_CONNECTIONS(103), /** * 加锁异常 */ LOCK_ERROR(104), /** * 认证失败 */ AUTH_FAIL(201), /** * 已经关闭 */ ALREADY_CLOSE(202), /** * 无权限 */ PERMISSION_DENY(203), SYNTAX_ERROR(204), /** * 内部异常 */ INTERNAL_ERROR(205), /** * 中断异常 */ INTERRUPTED(206), /** * 表未发现 */ TABLE_NOT_FOUND(207), CONSTRAINT_VIOLATION(208), DATA_TYPE_ERROR(209), DATA_VALUE_ERROR(210), /** * 解析不到的异常 */ UNKNOWN_ERROR(500); private final int code; }
详细配置说明
列表只包含公开使用的参数,部分参数为内部运行需要,列表中不包含的参数不建议自行配置。
在使用配置文件方式配置时通过key配置参数,value为时间时时间单位支持:
天:d、day(s)
小时:h、hour(s)
分钟:min(s)、m、minute(s)
秒:s、sec(s)、second(s)
毫秒:ms、milli(s)、millisecond(s)
内存参数:
byte: b
kb:k、kb
mb: m、mb
gb:g、gb
参数 |
key |
1.x参数 |
说明 |
默认值 |
---|---|---|---|---|
JDBC_URL |
dws.client.jdbc.url |
url |
dws数据库JDBC连接地址,client强制使用jdbc:gaussdb://***,配置 jdbc:postgresql也会被替换为jdbc:gaussdb,因此使用上两者均可配置且功能一致。 |
- |
JDBC_USERNAME |
dws.client.jdbc.username |
username |
dws数据库用户名。 |
- |
JDBC_PASSWORD |
dws.client.jdbc.password |
password |
dws数据库用户密码。 |
- |
JDBC_CONNECTION_MAX_USE_TIME |
dws.client.jdbc.max.use-time |
connectionMaxUseTimeSeconds |
连接最大使用时间,超过该时间会强制关闭当前连接并重新获取;使用COPY_MERGE/COPY_UPSERT时会使用临时表,临时表的schema在连接断开时才会清除,主要用于清除该部分数据。 |
3600s |
JDBC_CONNECTION_MAX_IDLE |
dws.client.jdbc.max.idle |
connectionMaxIdleMs |
连接最大空闲时间:当无数据时连接一直处于空闲状态,当达到该值时连接被释放 |
60s |
WRITE_PARTITION_POLICY |
dws.client.write.partition-policy |
-- |
partition缓存(表下面的多份缓存)分区策略: DYNAMIC:动态调整,根据资源使用情况动态调整partition数量在WRITE_PARTITION_MIN和WRITE_PARTITION_MAX之间(当前未实现动态调整仅按WRITE_PARTITION_MIN初始化) DN:按照dws集群DN分布策略分布,分布后每一份缓存均在同一个DN中,入库时也会使用内部协议直接将数据通过DN入库,当前策略需要集群额外配置,只有在指导下配置使用,同时表的分布列仅仅支持int*、text类字段作为分布列,切勿自行使用。 |
DYNAMIC |
CACHE_TABLE_METADATA |
dws.client.cache.table-metadata |
metadataCacheSeconds |
表元数据缓存时间,为提升性能,会对理论上不怎么变更的数据,例如表结构,该参数用于设置缓存过期时间;如果系统不考虑支持在线表结构变更,无需配置该参数,以减少对dws集群系统表查询导致的压力,小于等于0表示永不过期 |
-1 1.x为180s |
RETRY_SLEEP_BASE_TIME |
dws.client.retry.sleep-base-time |
retryBaseTime |
重试时sleep时间 = RETRY_SLEEP_BASE_TIME * 次数 + (0~RETRY_SLEEP_RANDOM_TIME),该参数设置时间基数。 |
1000ms |
RETRY_SLEEP_RANDOM_TIME |
dws.client.retry.sleep-random-time |
retryRandomTime |
重试时sleep时间 = RETRY_SLEEP_BASE_TIME * 次数 +(0~RETRY_SLEEP_RANDOM_TIME,该参数设置重试时的随机数范围,该参数主要用于在死锁场景将两个task执行时间错开。 |
300ms |
RETRY_MAX_TIMES |
dws.client.retry.max-times |
maxFlushRetryTimes |
执行刷库任务时,最大尝试执行次数。 |
3 |
WRITE_AUTO_FLUSH_BATCH_SIZE |
dws.client.write.auto-flush-size |
autoFlushBatchSize |
后台任务刷库策略:缓存条数大于等于WRITE_AUTO_FLUSH_BATCH_SIZE或者当前时间 - 缓存开始时间大于等于WRITE_AUTO_FLUSH_MAX_INTERVAL,该参数配置缓存最大条数。 |
30000 |
WRITE_FORCE_FLUSH_BATCH_SIZE |
dws.client.write.force-flush-size |
-- |
强制刷库数据条数,为提高整体吞吐,当业务线程buffer中提交数据时buffer中数据符合自动刷库条件,但是未达到当前参数值,则业务线程依旧写缓存,期望由后台线程负责提交入库以减少业务线程阻塞,若达到当前值则业务线程会将缓存提交执行入库才可返回,此时如果线程池无空闲资源则续等到有空闲资源方可提交成功,通常该参数设置比WRITE_AUTO_FLUSH_BATCH_SIZE略大比较合理。 |
40000 |
WRITE_AUTO_FLUSH_MAX_INTERVAL |
dws.client.write.auto-flush-max-interval |
autoFlushMaxIntervalMs |
后台任务刷库策略:缓存条数大于等于WRITE_AUTO_FLUSH_BATCH_SIZE或者当前时间 - 缓存开始时间大于等于WRITE_AUTO_FLUSH_MAX_INTERVAL,该参数配置缓存最大时间。 |
3s |
WRITE_FIXED_COPY_CACHE_SIZE |
dws.client.write.fixed-copy.cache-size |
-- |
当使用fixed copy模式时,用于流式写入的缓冲队列大小。 |
1000 |
WRITE_USE_COPY_BATCH_SIZE |
dws.client.write.use-copy-size |
copyWriteBatchSize |
在writeMode设置为AUTO时,在数据量低于copyWriteBatchSize时会使用upsert方式入库,否则根据是否有主键选择copy/copy+ upsert方式入库。 |
1000 |
WRITE_BUFFER_ALL_MAX_BYTES |
dws.client.write.buffer.all-max-bytes |
-- |
当整个client实例中攒批数据内存大小达到该大小大小时为避免OOM会执行强制刷库,该参数如果不配置会根据JVM最大堆大小设置,大小为JVM最大内存 * WRITE_BUFFER_JVM_PROCESSORS
说明:
攒批时根据对象类型预估,并非精确值。 |
1G |
WRITE_BUFFER_TABLE_MAX_BYTES |
dws.client.write.buffer.table-max-bytes |
-- |
当一张表最大攒批大小达到该值时为避免oom会将整个表缓存刷库,默认取值WRITE_BUFFER_ALL_MAX_BYTES,如果涉及入库多张表建议合理设置此参数 |
500m |
WRITE_BUFFER_PARTITION_MAX_BYTES |
dws.client.write.buffer.partition-max-bytes |
-- |
每个partition攒批大小达到该值为避免oom,会强制将整个partition缓存刷库,默认初始化为WRITE_BUFFER_TABLE_MAX_BYTES/WRITE_PARTITION_MIN |
200m |
WRITE_BUFFER_JVM_PROCESSORS |
dws.client.write.buffer.jvm-processors |
-- |
若无WRITE_BUFFER_ALL_MAX_BYTES配置时,则初始化为JVM最大内存 * WRITE_BUFFER_JVM_PROCESSORS |
0.4 |
WRITE_PARTITION_MIN |
dws.client.write.partition-min |
-- |
设置同一个表的partition缓存个数,如果WRITE_PARTITION_POLICY为DN时会为DN个数*该参数个,多个partition时数据根据分布列做hash分布到不同partition中,partition直接数据可并非入库。 |
1 |
WRITE_PARTITION_MAX |
dws.client.write.partition-max |
-- |
当WRITE_PARTITION_POLICY为DYNAMIC时根据资源使用情况动态调整partition数量在WRITE_PARTITION_MIN和WRITE_PARTITION_MAX之间(当前未实现) |
1 |
WRITE_FIXED_COPY |
dws.client.write.fixed-copy |
-- |
fixed copy模式,该模式仅仅在WRITE_MODE配置为copy*时生效,开启后第一条数据提交到client时就会建立和数据库的copy流通道,数据会被直接写入数据库IO流中,不在缓存中攒批,为了数据去重重复数据会被放入下个批次中以及重试获得全部数据因此部分场景还是需要将数据写入缓存。 注意: 1、当前模式只支持入库,不支持删除操作,同时要求所有数据入库字段一致 2、当RETRY_MAX_TIMES=1 不使用client的重试并且是纯写入场景此时可以完全不在client攒批直接流式写入dws数据库,以节省客户端内存资源,纯写人场景:无主键表、自增主键(client不设置主键字段的值)、有主键但客户端强制设置WRITE_MODE=copy,由业务保证数据无重复、WRITE_MEMORY_DUPLICATE_REMOVAL=false关闭内存去重 3、此模式此模式下需要一直持有一个连接池资源 |
false |
WRITE_MODE |
dws.client.write.mode |
writeMode |
数据写入模式:
|
AUTO |
WRITE_CONFLICT_STRATEGY |
dws.client.write.update.conflict-policy |
conflictStrategy |
数据库存在主键时的主键冲突策略:
|
INSERT_OR_UPDATE |
WRITE_THREAD_SIZE |
dws.client.write.thread-size |
threadSize |
client内部入库线程池大小,每个线程会占用一个连接因此也是连接池大小,连接池用于执行入库、查询、SQL事件,在入库中缓存达到写入条件时将会提交到线程池中执行入库,如果提交时线程池资源无空闲将会等待空闲后提交,提交成功缓存将会被清空,业务才能继续写缓存,否则将会被阻塞;可通过配置参数WRITE_PARTITION_MIN增加同一表缓存份数,结合线程池大小提升整体入库速度。 注意:在1.x版本中缓存按表维度收集因此同一张表只会有一份缓存,缓存数据需要保证入库的先后顺序和缓存写入顺序一致,该版本中未强制保证一致,在数据库入库速度小于缓存写入速度时会产生并发入库行为,会导致锁冲突或者乱想问题,因此建议设置1。 |
3 |
WRITE_NOT_WRITE |
dws.client.write.disable |
-- |
关闭入库,数据会完整的按照正常入库流程执行客户端所有操作,但是会跳过数据写入数据库这一步,可用于测试客户端排除DWS后的整体链路性能以分析性能问题。 |
false |
WRITE_TABLE_COMPARE_FIELD |
dws.client.write.table.compare-field |
compareField |
用于配置比较字段包含的字段必须满足数据值比当前值小才能入库相当于在入库时加了一个where条件例如: INSERT INTO "test"."compare_test"("id", "age", "update_time") VALUES (?, ?, ?) ON CONFLICT ("id") DO UPDATE SET "id"=EXCLUDED."id", "age"=EXCLUDED."age", "update_time"=EXCLUDED."update_time" WHERE "update_time"< EXCLUDED."update_time" |
|
LOG_SWITCH |
dws.client.log.enable |
logSwitch |
日志开关,开启后会打印比较详细的过程日志,便于在调试或定位问题时开启。 |
false |
onFlushSuccess |
-- |
onFlushSuccess |
数据入库成功后的回调函数。 |
- |
onError |
-- |
onError |
后台任务执行失败的回调函数。 |
- |
WRITE_TABLE_UNIQUE_KEY |
dws.client.write.table.unique-key |
uniqueKeys |
该参数用于在表中无主键但是有唯一索引时,在入库时使用该参数指定字段做唯一约束,在update场景中该字段不需要是唯一索引或者主键,但upsert场景必须要唯一索引或主键。 |
- |
WRITE_COPY_FORMAT |
dws.client.write.copy-format |
copyMode |
使用copy入库的格式: CSV:将数据拼接成字符串数据用双引号包裹的CSV格式,其中字段间以逗号分割,数据间以换行分割。使用jdbc copy api入库,该方式性能略低于DELIMITER方式,但比较稳定可靠。 DELIMITER:将数据字段使用copy api入库,其中字符间以0X1E分割,数据间以0X1F分割。该方式要求数据不包含分隔符,如包含将报错不能正常入库,且该方式定义null字符串为null数据,如果数据为null字符串将被设置为null。 |
CSV |
WRITE_TABLE_FIELD_CASE_SENSITIVE |
dws.client.write.table.case-sensitive |
caseSensitive |
表字段大小写是否敏感。 |
false |
WRITE_TABLE_CREATE_TEMP_MODE |
dws.client.write.table.create-temp-mode |
createTempTableMode |
在使用copy merge/upsert时,创建临时表方式:
|
AS |
WRITE_TABLE_TEMP_TYPE |
dws.client.write.table.temp-type |
-- |
当WRITE_TABLE_CREATE_TEMP_MODE=CUSTOM时配置临时表类型: COMMON:默认类型 VOLATILE: volatile类型,需要内核支持 |
COMMON |
WRITE_TABLE_TEMP_INCLUDING |
dws.client.write.table.temp-including |
-- |
当WRITE_TABLE_CREATE_TEMP_MODE=CUSTOM时配置临时表including参数。 |
including defaults |
WRITE_TABLE_TEMP_WITH |
dws.client.write.table.temp-with |
-- |
当WRITE_TABLE_CREATE_TEMP_MODE=CUSTOM时配置临时表with参数。 |
|
WRITE_TABLE_TEMP_DISTRIBUTE |
dws.client.write.table.temp-distribute |
-- |
当WRITE_TABLE_CREATE_TEMP_MODE=CUSTOM时配置临时表distribute参数。 |
|
WRITE_FORMAT_NUMBER_TO_DATE |
dws.client.write.format.number-to-date |
numberAsEpochMsForDatetime |
如果数据库字段是时间类型(date\time\timestamp)并且数据源为数字类型,是否使用数字转时间功能,转换时会统一转换成标准时间戳(毫秒),然后创建对应类型java对象。
说明:
|
false |
WRITE_FORMAT_INT_TO_DATE_TYPE |
dws.client.write.format.number-to-date.int-type |
当WRITE_FORMAT_NUMBER_TO_DATE=true时,如果数据为int,转换逻辑: days:当作相对于元年的天数转换 seconds:当作相对于元年元年的秒 |
- |
|
WRITE_FORMAT_LONG_TO_DATE_TYPE |
dws.client.write.format.number-to-date.long-type |
当WRITE_FORMAT_NUMBER_TO_DATE=true时,如果数据为long,转换逻辑: days:当作相对于元年的天数转换 seconds:当作相对于元年元年的秒 ms:当作相对于元年元年的毫秒 |
- |
|
WRITE_FORMAT_NUMBER_STRING_TO_DATE |
dws.client.write.format.number-string-date |
-- |
如果数据库字段是时间类型(date\time\timestamp)并且数据源为数字字符串,是否将其转换为数字,转换后可使用WRITE_FORMAT_NUMBER_TO_DATE功能 |
false |
WRITE_FORMAT_STRING_TO_DATE |
dws.client.write.format.string-date |
stringToDatetimeFormat |
如果数据库字段是时间类型(date\time\timestamp)并且数据源为字符串类型,通过SimpleDateFormat按stringToDatetimeFormat格式转换为日期类型,然后通过日期中的时间戳构造数据库对应类型数据。
说明:
该参数配置即代表开启,如果不需要请勿配置。 |
- |
WRITE_FORMAT_DATE_0000 |
dws.client.write.format.date-0000 |
-- |
如果数据库字段是时间类型(date\time\timestamp)并且数据源为字符串 0000-00-00 00:00:00 ,是否转换为时间戳0对应时间 |
false |
WRITE_UPDATE_INCLUDE_PRIMARY_KEY |
dws.client.write.update.include-pk |
updateAll |
upsert时set字段是否包含主键,hstore表在全列更新(set字段包含数据库所有字段)时不需要反查数据库,性能会更好 |
true |
JDBC_AUTO_COMMIT |
dws.client.jdbc.auto-commit |
autoCommit |
入库时使用自动事务。 |
false |
WRITE_FORMAT_STRING_U0000 |
dws.client.write.format.string-u0000 |
-- |
是否删除字符串中的特殊字符\u0000,如果不删除copy会报错,只能使用upsert入库。 |
true |
WRITE_FORMAT_DECIMAL_DEF_TYPE |
dws.client.write.format.decimal-def.type |
-- |
当数据库为NUMERIC或者DECIMAL时,输入为空或者空字符串时填充默认值策略: null:填充为空 zero:填充为0 custom:通过参数WRITE_FORMAT_DECIMAL_DEF_CUSTOM自定义 |
null |
WRITE_FORMAT_DECIMAL_DEF_CUSTOM |
dws.client.write.format.decimal-def.custom |
-- |
WRITE_FORMAT_DECIMAL_DEF_TYPE=custom时,设置自定值。 |
- |
NETWORK_INTERNAL_PRIVATE_IP |
dws.client.network.internal-privateIp |
-- |
WRITE_PARTITION_POLICY=DN时如果客户端程序和dws集群网络不在同一网络(内部通信网络),通过此参数配置网络ip映射格式为:内部IP:外部IP多个使用;分割。 |
- |
TIMEOUT_TASK |
dws.client.timeout.task |
-- |
执行任务的超时时间,包括重试时间。 |
10m |
TIMEOUT_SQL_STATEMENT |
dws.client.timeout.statement |
-- |
SQL执行超时时间。 |
5m |
TRACE_SWITCH |
dws.client.trace.enable |
- |
打印trace记录,日志会记录主要的步骤及耗时,用于分析性能问题。 |
false |
SYSTEM_TIMEZONE |
dws.client.system.timezone |
-- |
用于设置系统的时区以便于在做时间转换时时区正确,改参数会影响整个Java进程的时区,理论上需要和dws时区一致,如果不设置则保持和客户端进程默认时区一致。 |
- |
PRINT_METRICS_NAMES |
dws.client.print-metrics.names |
-- |
用于配置需要打印的指标名称,配置为正则表达式,匹配到的指标就会被打印,可配置.*打印所有指标,确认选哟哪些指标: rps_.*:用于打印每个表的入库速率 write_rpc:整个实例的写入速度 action_submit_wait:提交线程池的等待时间 buffer_size_.*:每次入库攒批数量 action_cost_.*:每次执行入库耗时 addBatch_cost_.*:执行addBatch的用时统计 executeBatch_cost_.*:执行executeBatch的用时统计 buildBuffer_cost_.*:构建copy buffer的用时统计 copy_cost_.*:执行copy的用时统计 |
- |
PRINT_METRICS_PERIOD |
dws.client.print-metrics.names |
-- |
打印指标频率。 |
30s |
2.x删除参数
参数 |
说明 |
删除原因 |
---|---|---|
logDataTables |
入库时需要打印数据的表,便于在定位问题时对比数据。 |
基本无使用场景,开发节点debug更方便,生成也不可能配置此参数。 |
batchOutWeighRatio |
为提高整体吞吐,当对autoFlushBatchSize要求不是很严格时,可设置该参数,当往buffer中提交数据时buffer中数据量大于batchOutWeighRatio * autoFlushBatchSize时提交线程将会执行提交入库的任务,该参数用于避免业务线程提交任务,尽量使用后台线程执行提交。 |
WRITE_FORCE_FLUSH_BATCH_SIZE参数功能一致,更直观使用。 |