计算
弹性云服务器 ECS
Flexus云服务
裸金属服务器 BMS
弹性伸缩 AS
镜像服务 IMS
专属主机 DeH
函数工作流 FunctionGraph
云手机服务器 CPH
Huawei Cloud EulerOS
网络
虚拟私有云 VPC
弹性公网IP EIP
虚拟专用网络 VPN
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
VPC终端节点 VPCEP
云连接 CC
企业路由器 ER
企业交换机 ESW
全球加速 GA
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
边缘安全 EdgeSec
威胁检测服务 MTD
CDN与智能边缘
内容分发网络 CDN
CloudPond云服务
智能边缘云 IEC
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
迁移中心 MGC
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
云搜索服务 CSS
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
数据治理中心 DataArts Studio
数据可视化 DLV
数据湖工厂 DLF
湖仓构建 LakeFormation
企业应用
云桌面 Workspace
应用与数据集成平台 ROMA Connect
云解析服务 DNS
专属云
专属计算集群 DCC
IoT物联网
IoT物联网
设备接入 IoTDA
智能边缘平台 IEF
用户服务
账号中心
费用中心
成本中心
资源中心
企业管理
工单管理
国际站常见问题
ICP备案
我的凭证
支持计划
客户运营能力
合作伙伴支持计划
专业服务
区块链
区块链服务 BCS
Web3节点引擎服务 NES
解决方案
SAP
高性能计算 HPC
视频
视频直播 Live
视频点播 VOD
媒体处理 MPC
实时音视频 SparkRTC
数字内容生产线 MetaStudio
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
存储容灾服务 SDRS
高性能弹性文件服务 SFS Turbo
弹性文件服务 SFS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
专属分布式存储服务 DSS
容器
云容器引擎 CCE
容器镜像服务 SWR
应用服务网格 ASM
华为云UCS
云容器实例 CCI
管理与监管
云监控服务 CES
统一身份认证服务 IAM
资源编排服务 RFS
云审计服务 CTS
标签管理服务 TMS
云日志服务 LTS
配置审计 Config
资源访问管理 RAM
消息通知服务 SMN
应用运维管理 AOM
应用性能管理 APM
组织 Organizations
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
应用身份管理服务 OneAccess
数据库
云数据库 RDS
文档数据库服务 DDS
数据管理服务 DAS
数据复制服务 DRS
云数据库 GeminiDB
云数据库 GaussDB
分布式数据库中间件 DDM
数据库和应用迁移 UGO
云数据库 TaurusDB
人工智能
人脸识别服务 FRS
图引擎服务 GES
图像识别 Image
内容审核 Moderation
文字识别 OCR
AI开发平台ModelArts
图像搜索 ImageSearch
对话机器人服务 CBS
华为HiLens
视频智能分析服务 VIAS
语音交互服务 SIS
应用中间件
分布式缓存服务 DCS
API网关 APIG
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
多活高可用服务 MAS
事件网格 EG
企业协同
华为云会议 Meeting
云通信
消息&短信 MSGSMS
云生态
合作伙伴中心
云商店
开发者工具
SDK开发指南
API签名指南
Terraform
华为云命令行工具服务 KooCLI
其他
产品价格详情
系统权限
管理控制台
客户关联华为云合作伙伴须知
消息中心
公共问题
开发与运维
应用管理与运维平台 ServiceStage
软件开发生产线 CodeArts
需求管理 CodeArts Req
部署 CodeArts Deploy
性能测试 CodeArts PerfTest
编译构建 CodeArts Build
流水线 CodeArts Pipeline
制品仓库 CodeArts Artifact
测试计划 CodeArts TestPlan
代码检查 CodeArts Check
代码托管 CodeArts Repo
云应用引擎 CAE
开天aPaaS
云消息服务 KooMessage
云手机服务 KooPhone
云空间服务 KooDrive

dws-client

更新时间:2025-02-26 GMT+08:00

简介

dws-client是一款基于DWS JDBC实现的高性能、便捷入库工具,用户在使用时必须保证JDBC可以连接。其中使用dws-client入库具备如下优势:

  1. dws-client提供对缓存的空间、时间维度的限制,支持攒批提升入库性能,以满足业务在高峰低谷期的入库实时性。
    说明:

    攒批:在实时性要求不严格的场景,对单条数据操作进行缓存,待缓存至多条后批量操作,以提升写入性能。

  2. 支持并发入库。
  3. 内部实现多种高性能入库方式、主键冲突策略,满足各种场景入库需求。
  4. API方式交互,低门槛使用。

引入依赖

dws-client已经发布至maven仓库,可在仓库中选择最新版本使用,链接请参见:https://mvnrepository.com/artifact/com.huaweicloud.dws/dws-client
1
2
3
4
5
<dependency>
   <groupId>com.huaweicloud.dws</groupId>
   <artifactId>dws-client</artifactId>
   <version>${version}</version>
</dependency>

核心功能

整体主要以2.X版本为例,说明为兼容方式时表示1.x的。

初始化client

初始化client用于创建一个client实例,以便后续的入库等操作使用。

dws-client支持参数全部在com.huaweicloud.dws.client.config.DwsClientConfigs中记录,其中每一个常量ConfigOp均表示一个参数,底层最终采用map存储,其中ConfigOp中的key表示存储时的key,如果采用配置文件方式也是配置文件的key。

  1. 以下举例为最简单的方式,只需要数据库的连接配置即可,其余配置均采用默认配置。
    public DwsClient getClient() throws Exception {
        DwsConfig config = DwsConfig.of()
            .with(DwsClientConfigs.JDBC_URL, System.getenv("db_url"))
            .with(DwsClientConfigs.JDBC_PASSWORD, System.getenv("db_pwd"))
            .with(DwsClientConfigs.JDBC_USERNAME, System.getenv("db_username"));
        return new DwsClient(config);
    }
  2. 使用配置文件
    新建配置文件client.properties
    dws.client.jdbc.url=jdbc:gaussdb://xxxx:8000/gaussdb
    dws.client.jdbc.password=****
    dws.client.jdbc.username=dbadmin
    配置文件初始化
    public DwsClient getClientByProperties() throws Exception {
        URL resource = this.getClass().getClassLoader().getResource("client.properties");
        DwsConfig config = new DwsConfig(resource.getFile());
        return new DwsClient(config);
    }
  3. 通过map参数
    public DwsClient getClientByMap() throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put(DwsClientConfigs.JDBC_URL.key(), System.getenv("db_url"));
        config.put(DwsClientConfigs.JDBC_PASSWORD.key(), System.getenv("db_pwd"));
        config.put(DwsClientConfigs.JDBC_USERNAME.key(), System.getenv("db_username"));
        return new DwsClient(new DwsConfig(config));
    }
  4. 兼容1.x版本方式
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public DwsClient getClient(){
            DwsConfig config = DwsConfig
                    .builder()
                    .withUrl("jdbc:gaussdb://***/gaussdb")
                    .withUsername("***")
                    .withPassword("****")
                    .build();
            return new DwsClient(config);
         }
    
  5. 配置表级参数
    当client入库多张表时,需要设置不同表的参数配置时可通过表级参数配置,设置时可通过对全局配置withTable("xxx")获取到表级参数构造器,此时会根据此刻的全局参数初始化表级参数,新设置的将覆盖原有参数,build时完成设置并添加到全局参数中,此时接口返回的时全局参数,入下可实现链式调用
    public DwsClient getClientTable() throws Exception {
        DwsConfig config = DwsConfig.of()
            .with(DwsClientConfigs.JDBC_URL, System.getenv("db_url"))
            .with(DwsClientConfigs.JDBC_PASSWORD, System.getenv("db_pwd"))
            .with(DwsClientConfigs.JDBC_USERNAME, System.getenv("db_username"))
            .with(DwsClientConfigs.WRITE_AUTO_FLUSH_BATCH_SIZE, 10000)
            .withTable("test")
            .with(DwsClientConfigs.WRITE_CONFLICT_STRATEGY, ConflictStrategy.INSERT_OR_IGNORE)
            .build()
            .withTable("test1")
            .with(DwsClientConfigs.WRITE_AUTO_FLUSH_BATCH_SIZE, 200)
            .build();
        return new DwsClient(config);
    }
  6. 1.x兼容
    1
    2
    3
    4
    5
    6
    7
    return DwsConfig.builder()
                    .withUrl(System.getenv("db_url"))
                    .withPassword(System.getenv("db_pwd"))
                    .withUsername(System.getenv("db_username"))
                    .withAutoFlushBatchSize(1000) // 默认攒批1000
                    .withTableConfig("test.t_c_batch_size_2", new TableConfig()
                            .withAutoFlushBatchSize(500)); // 对于表test.t_c_batch_size_2 攒批500;
    

使用数据库连接执行SQL

该接口主要用于一些特殊业务,在目前支持的功能中无法满足时使用。例如: 数据查询,可以直接使用原生JDBC连接操作数据库。

API参数为一个函数式接口,接口会提供一个数据库连接,返回值可以是任意类型,由业务返回类型决定。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public void sql() throws DwsClientException {
        Integer id = getClient().sql(connection -> {
            try (ResultSet resultSet = connection.createStatement().executeQuery("select id from test.user where name = 'zhangsan'")) {
                if (resultSet.next()) {
                    return resultSet.getInt("id");
                }
            }
            return null;
        });
        System.out.println("zhangsan id = " + id);
     }

获取表信息

API可以根据一个带schema的表名获取到表结构(会存在缓存),表结构的定义包括:所有列、主键。
1
2
3
public void getTableSchema() throws DwsClientException {
        TableSchema tableSchema = getClient().getTableSchema(TableName.valueOf("test.test"));
     }

数据入库

数据入库的操作client提供一个write API承载,接口会返回一个Operate接口,可以通过接口操作表字段,提交代表操作完成,交由client执行入库,提交时可以选择同步或者异步,设置字段时可以选择是否在主键冲突时忽略该设置。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public void write() throws DwsClientException {
        getClient().write("test.test")
                .setObject("id", 1)
                .setObject("name", "test")
                // 只有在insert时会生效该设置,主键冲突时不更新
                .setObject("age", 38, true)
                // 异步入库,放入后台缓存即可返回
                //.commit()
                // 同步入库 会等待入库成功返回
                .syncCommit();
     }
对于表结构不做变化的,通过全局schema减少数据库元数据的查询,以减少DWS的压力并提升入库性能。
public void testWrite() throws Exception {
    try (DwsClient client = getClient()) {
        client.sql((conn) -> {
            conn.createStatement().execute("DROP Table IF EXISTS test.dws_client_test;"
                + "create table  test.dws_client_test (id integer, name varchar(10), age int);");
            return null;
        });
        TableSchema tableSchema = client.getTableSchema(TableName.valueOf("test.dws_client_test"));
        log.info("table schema {}", tableSchema);
        for (int i = 0; i < 100; i++) {
            Operate operate = client.write(tableSchema)
                .setObject("id", i)
                .setObject("name", "name_" + i)
                .setObject("age", i);
            operate.commit();
        }
    }
}

通过字段索引写client,减少hash计算以缓解客户端cpu压力提升写client吞吐。

字段的下标即为数据库字段的顺序,也是tableSchema.getColumns()集合中的顺序。
public void testWrite() throws Exception {
    try (DwsClient client = getClient()) {
        client.sql((conn) -> {
            conn.createStatement().execute("DROP Table IF EXISTS test.dws_client_test;"
                + "create table  test.dws_client_test (id integer, name varchar(10), age int);");
            return null;
        });
        TableSchema tableSchema = client.getTableSchema(TableName.valueOf("test.dws_client_test"));
        log.info("table schema {}", tableSchema);
        for (int i = 0; i < 100; i++) {
            Operate operate = client.write(tableSchema)
                .setObject(0, i)
                .setObject(1, "name_" + i)
                .setObject(2, i);
            operate.commit();
        }
    }
}

数据删除

删除API和入库API都由Operate承载,但删除时只能且必须设置主键字段,同时忽略更新字段不生效。
public void delete() throws DwsClientException {
        getClient().delete("test.test")
                .setObject("id", 1)
                // 异步入库,放入后台缓存即可返回
                //.commit()
                // 同步入库 会等待入库成功返回
                .syncCommit();
    }
强制刷新缓存到数据库
public void flush() throws DwsClientException {
        getClient().flush();
    }

关闭资源

close时会将缓存刷库,同时close后无法再次执行入库、删除、sql等API接口。
public void close() throws IOException {
        getClient().close();
     }

监听数据入库成功的事件

在异步入库的场景,想要知道哪些数据已经入库,则可通过绑定flushSuccess函数接口实现,该接口会在数据库事务提交完成后回调,回调时会将该批入库数据传递给接口。
public DwsClient getClient() throws Exception {
    DwsConfig config = DwsConfig.of()
        .with(DwsClientConfigs.JDBC_URL, System.getenv("db_url"))
        .with(DwsClientConfigs.JDBC_PASSWORD, System.getenv("db_pwd"))
        .with(DwsClientConfigs.JDBC_USERNAME, System.getenv("db_username"))
        .onFlushSuccess(records -> {
            for (Record record : records) {
                log.info("flush success. value = {}, pk = {}", RecordUtil.toMap(record), RecordUtil.getRecordPrimaryKeyValue(record));
            }
        });
    return new DwsClient(config);
}

兼容1.x

public DwsClient getClient() {
        DwsConfig config = DwsConfig
                .builder()
                .withUrl("jdbc:postgresql://***/gaussdb")
                .withUsername("***")
                .withPassword("****")
                .onFlushSuccess(records -> {
                    for (Record record : records) {
                        log.info("flush success. value = {}, pk = {}", RecordUtil.toMap(record), RecordUtil.getRecordPrimaryKeyValue(record));
                    }
                })
                .build();
        return new DwsClient(config);
     }

监听后台异常任务

异步入库数据会由后台任务执行,后台任务执行失败想要感知可通过绑定error函数接口实现,否则只能在下次提交时发生异常错误,由业务感知,如果绑定接口不上报异常,那么该异常将消除,不会在下次提交时报出,否则会以接口异常在下次提交时报出给业务。
public DwsClient getClient() throws Exception {
    DwsConfig config = DwsConfig.of()
        .with(DwsClientConfigs.JDBC_URL, System.getenv("db_url"))
        .with(DwsClientConfigs.JDBC_PASSWORD, System.getenv("db_pwd"))
        .with(DwsClientConfigs.JDBC_USERNAME, System.getenv("db_username"))
        .onError((clientException, client) -> {
            if (clientException instanceof DwsClientRecordException) {
                DwsClientRecordException recordException = (DwsClientRecordException) clientException;
                List<Record> records = recordException.getRecords();
                List<DwsClientException> exceptions = recordException.getExceptions();
                for (int i = 0; i < records.size(); i++) {
                    log.error("pk = {} . error = {}", RecordUtil.getRecordPrimaryKeyValue(records.get(i)), exceptions.get(i));
                }
            }
            if (clientException.getCode() != ExceptionCode.CONNECTION_ERROR && clientException.getCode() != ExceptionCode.LOCK_ERROR) {
                throw clientException;
            }
            log.error("code = {}", clientException.getCode(), clientException.getOriginal());
            return null;
        });
    return new DwsClient(config);
}

兼容1.x

public DwsClient getClient() {
        DwsConfig config = DwsConfig
                .builder()
                .withUrl("jdbc:postgresql://***/gaussdb")
                .withUsername("***")
                .withPassword("****")
                .onError((clientException, client) -> {
                    if (clientException instanceof DwsClientRecordException) {
                        DwsClientRecordException recordException = (DwsClientRecordException) clientException;
                        List<Record> records = recordException.getRecords();
                        List<DwsClientException> exceptions = recordException.getExceptions();
                        for (int i = 0; i < records.size(); i++) {
                            log.error("pk = {} . error = {}", RecordUtil.getRecordPrimaryKeyValue(records.get(i)), exceptions.get(i));
                        }
                    }
                    if (clientException.getCode() != ExceptionCode.CONNECTION_ERROR && clientException.getCode() != ExceptionCode.LOCK_ERROR) {
                        throw clientException;
                    }
                    log.error("code = {}", clientException.getCode(), clientException.getOriginal());
                    return null;
                })
                .build();
        return new DwsClient(config);
     }

异常处理

异常可分为三类:

  1. InvalidException运行时异常不显示抛出,触发在请求参数无效时。
  2. DwsClientException对所有异常的封装,包含被解析的code以及原始异常。
  3. DwsClientRecordException对DwsClientException的扩展,包含写入异常的数据集合以及对应DwsClientException异常。

异常code对照:

public enum ExceptionCode {
    /**
     * 无效参数
     */
    INVALID_CONFIG(1),

    /**
     * 连接异常
     */
    CONNECTION_ERROR(100),
    /**
     * 只读
     */
    READ_ONLY(101),
    /**
     * 超时
     */
    TIMEOUT(102),
    /**
     * 连接数过多
     */
    TOO_MANY_CONNECTIONS(103),
    /**
     * 加锁异常
     */
    LOCK_ERROR(104),


    /**
     * 认证失败
     */
    AUTH_FAIL(201),
    /**
     * 已经关闭
     */
    ALREADY_CLOSE(202),
    /**
     * 无权限
     */
    PERMISSION_DENY(203),
    SYNTAX_ERROR(204),
    /**
     * 内部异常
     */
    INTERNAL_ERROR(205),
    /**
     * 中断异常
     */
    INTERRUPTED(206),
    /**
     * 表未发现
     */
    TABLE_NOT_FOUND(207),
    CONSTRAINT_VIOLATION(208),
    DATA_TYPE_ERROR(209),
    DATA_VALUE_ERROR(210),

    /**
     * 解析不到的异常
     */
    UNKNOWN_ERROR(500);
    private final int code;

 }

详细配置说明

列表只包含公开使用的参数,部分参数为内部运行需要,列表中不包含的参数不建议自行配置。

在使用配置文件方式配置时通过key配置参数,value为时间单位,支持:

天:d、day(s)

小时:h、hour(s)

分钟:min(s)、m、minute(s)

秒:s、sec(s)、second(s)

毫秒:ms、milli(s)、millisecond(s)

内存参数:

byte: b

kb:k、kb

mb: m、mb

gb:g、gb

参数

key

1.x参数

说明

默认值

JDBC_URL

dws.client.jdbc.url

url

dws数据库JDBC连接地址,client强制使用jdbc:gaussdb://***,配置

jdbc:postgresql也会被替换为jdbc:gaussdb,因此使用上两者均可配置且功能一致。

-

JDBC_USERNAME

dws.client.jdbc.username

username

dws数据库用户名。

-

JDBC_PASSWORD

dws.client.jdbc.password

password

dws数据库用户密码。

-

JDBC_CONNECTION_MAX_USE_TIME

dws.client.jdbc.max.use-time

connectionMaxUseTimeSeconds

连接最大使用时间,超过该时间会强制关闭当前连接并重新获取;使用COPY_MERGE/COPY_UPSERT时会使用临时表,临时表的schema在连接断开时才会清除,主要用于清除该部分数据。

3600s

JDBC_CONNECTION_MAX_IDLE

dws.client.jdbc.max.idle

connectionMaxIdleMs

连接最大空闲时间:当无数据时连接一直处于空闲状态,当达到该值时连接被释放

60s

WRITE_PARTITION_POLICY

dws.client.write.partition-policy

--

partition缓存(表下面的多份缓存)分区策略:

DYNAMIC:动态调整,根据资源使用情况动态调整partition数量在WRITE_PARTITION_MIN和WRITE_PARTITION_MAX之间(当前未实现动态调整仅按WRITE_PARTITION_MIN初始化)

DN:按照dws集群DN分布策略分布,分布后每一份缓存均在同一个DN中,入库时也会使用内部协议直接将数据通过DN入库,当前策略需要集群额外配置,只有在指导下配置使用,同时表的分布列仅支持int*、text类字段作为分布列,切勿自行使用。

DYNAMIC

CACHE_TABLE_METADATA

dws.client.cache.table-metadata

metadataCacheSeconds

表元数据缓存时间,为提升性能,会对理论上不怎么变更的数据,例如表结构,该参数用于设置缓存过期时间;如果系统不考虑支持在线表结构变更,无需配置该参数,以减少对dws集群系统表查询导致的压力,小于等于0表示永不过期

-1

1.x为180s

RETRY_SLEEP_BASE_TIME

dws.client.retry.sleep-base-time

retryBaseTime

重试时sleep时间 = RETRY_SLEEP_BASE_TIME * 次数 + (0~RETRY_SLEEP_RANDOM_TIME),该参数设置时间基数。

1000ms

RETRY_SLEEP_RANDOM_TIME

dws.client.retry.sleep-random-time

retryRandomTime

重试时sleep时间 = RETRY_SLEEP_BASE_TIME * 次数 +(0~RETRY_SLEEP_RANDOM_TIME,该参数设置重试时的随机数范围,该参数主要用于在死锁场景将两个task执行时间错开。

300ms

RETRY_MAX_TIMES

dws.client.retry.max-times

maxFlushRetryTimes

执行刷库任务时,最大尝试执行次数。

3

WRITE_AUTO_FLUSH_BATCH_SIZE

dws.client.write.auto-flush-size

autoFlushBatchSize

后台任务刷库策略:缓存条数大于等于WRITE_AUTO_FLUSH_BATCH_SIZE或者当前时间 - 缓存开始时间大于等于WRITE_AUTO_FLUSH_MAX_INTERVAL,该参数配置缓存最大条数。

30000

WRITE_FORCE_FLUSH_BATCH_SIZE

dws.client.write.force-flush-size

--

强制刷库数据条数,为提高整体吞吐,当业务线程buffer中提交数据时buffer中数据符合自动刷库条件,但是未达到当前参数值,则业务线程依旧写缓存,期望由后台线程负责提交入库以减少业务线程阻塞,若达到当前值则业务线程会将缓存提交执行入库才可返回,此时如果线程池无空闲资源则需等到有空闲资源方可提交成功,通常该参数设置比WRITE_AUTO_FLUSH_BATCH_SIZE略大比较合理。

40000

WRITE_AUTO_FLUSH_MAX_INTERVAL

dws.client.write.auto-flush-max-interval

autoFlushMaxIntervalMs

后台任务刷库策略:缓存条数大于等于WRITE_AUTO_FLUSH_BATCH_SIZE或者当前时间 - 缓存开始时间大于等于WRITE_AUTO_FLUSH_MAX_INTERVAL,该参数配置缓存最大时间。

3s

WRITE_FIXED_COPY_CACHE_SIZE

dws.client.write.fixed-copy.cache-size

--

当使用fixed copy模式时,用于流式写入的缓冲队列大小。

1000

WRITE_USE_COPY_BATCH_SIZE

dws.client.write.use-copy-size

copyWriteBatchSize

在writeMode设置为AUTO时,在数据量低于copyWriteBatchSize时会使用upsert方式入库,否则根据是否有主键选择copy/copy+ upsert方式入库。

1000

WRITE_BUFFER_ALL_MAX_BYTES

dws.client.write.buffer.all-max-bytes

--

当整个client实例中攒批数据内存大小达到该参数设置大小时为避免OOM会执行强制刷库,该参数如果不配置会根据JVM最大堆大小设置,大小为JVM最大内存 * WRITE_BUFFER_JVM_PROCESSORS

说明:

攒批时根据对象类型预估,并非精确值。

1G

WRITE_BUFFER_TABLE_MAX_BYTES

dws.client.write.buffer.table-max-bytes

--

当一张表最大攒批大小达到该值时为避免oom会将整个表缓存刷库,默认取值WRITE_BUFFER_ALL_MAX_BYTES,如果涉及入库多张表建议合理设置此参数。

500m

WRITE_BUFFER_PARTITION_MAX_BYTES

dws.client.write.buffer.partition-max-bytes

--

每个partition攒批大小达到该值为避免OOM,会强制将整个partition缓存刷库,默认初始化为WRITE_BUFFER_TABLE_MAX_BYTES/WRITE_PARTITION_MIN。

200m

WRITE_BUFFER_JVM_PROCESSORS

dws.client.write.buffer.jvm-processors

--

若无WRITE_BUFFER_ALL_MAX_BYTES配置时,则初始化为JVM最大内存 * WRITE_BUFFER_JVM_PROCESSORS。

0.4

WRITE_PARTITION_MIN

dws.client.write.partition-min

--

设置同一个表的partition缓存个数,如果WRITE_PARTITION_POLICY为DN时会为DN个数*该参数个,多个partition时数据根据分布列做hash分布到不同partition中,partition直接数据可并非入库。

1

WRITE_PARTITION_MAX

dws.client.write.partition-max

--

当WRITE_PARTITION_POLICY为DYNAMIC时根据资源使用情况动态调整partition数量在WRITE_PARTITION_MIN和WRITE_PARTITION_MAX之间(当前未实现)。

1

WRITE_FIXED_COPY

dws.client.write.fixed-copy

--

fixed copy模式,该模式仅在WRITE_MODE配置为copy*时生效,开启后第一条数据提交到client时就会建立和数据库的copy流通道,数据会被直接写入数据库IO流中,不在缓存中攒批,为了数据去重重复数据会被放入下个批次中以及重试获得全部数据因此部分场景还是需要将数据写入缓存。

注意:

1、当前模式只支持入库,不支持删除操作,同时要求所有数据入库字段一致

2、当RETRY_MAX_TIMES=1 不使用client的重试并且是纯写入场景此时可以完全不在client攒批直接流式写入dws数据库,以节省客户端内存资源,纯写人场景:无主键表、自增主键(client不设置主键字段的值)、有主键但客户端强制设置WRITE_MODE=copy,由业务保证数据无重复、WRITE_MEMORY_DUPLICATE_REMOVAL=false关闭内存去重

3、此模式下需要一直持有一个连接池资源。

false

WRITE_MODE

dws.client.write.mode

writeMode

数据写入模式:

  • AUTO:

    数据量低于copyWriteBatchSize使用UPSERT方式入库,否则使用COPY_UPSERT方式入库。

  • COPY_MERGE:
    • 有主键使用copy+merge入库。
    • 无主键使用copy入库。
  • COPY_UPSERT:
    • 无主键使用copy入库。
    • 有主键使用copy + upsert入库。
  • UPSERT:
    • 无主键使用insert into。
    • 有主键使用upsert入库。
  • UPDATE:
    • 使用update where语法更新数据,若原表无主键可选择指定uniqueKeys,指定字段不要求必须是唯一索引,但非唯一索引可能会影响性能。
  • COPY_UPDATE:
    • 数据先通过copy方式入库到临时表,通过临时表加速使用update from where方式更新目标数据。
  • UPDATE_AUTO:
    • 批量小于copyWriteBatchSize使用UPDATE,否则使用COPY_UPDATE。

AUTO

WRITE_CONFLICT_STRATEGY

dws.client.write.update.conflict-policy

conflictStrategy

数据库存在主键时的主键冲突策略:

  • INSERT_OR_IGNORE:主键冲突时忽略新数据。
  • INSERT_OR_UPDATE:主键冲突时使用新的数据列更新原数据对应列。
  • INSERT_OR_REPLACE:主键冲突时使用新数据替换原数据,数据库中新数据不包含列设置为null,对于全列更新和INSERT_OR_UPDATE无差异。

INSERT_OR_UPDATE

WRITE_THREAD_SIZE

dws.client.write.thread-size

threadSize

client内部入库线程池大小,每个线程会占用一个连接因此也是连接池大小,连接池用于执行入库、查询、SQL事件,在入库中缓存达到写入条件时将会提交到线程池中执行入库,如果提交时线程池资源无空闲将会等待空闲后提交,提交成功缓存将会被清空,业务才能继续写缓存,否则将会被阻塞;可通过配置参数WRITE_PARTITION_MIN增加同一表缓存份数,结合线程池大小提升整体入库速度。

注意:在1.x版本中缓存按表维度收集因此同一张表只会有一份缓存,缓存数据需要保证入库的先后顺序和缓存写入顺序一致,该版本中未强制保证一致,在数据库入库速度小于缓存写入速度时会产生并发入库行为,会导致锁冲突或者乱想问题,因此建议设置1。

3

WRITE_NOT_WRITE

dws.client.write.disable

--

关闭入库,数据会完整的按照正常入库流程执行客户端所有操作,但是会跳过数据写入数据库这一步,可用于测试客户端排除DWS后的整体链路性能以分析性能问题。

false

WRITE_TABLE_COMPARE_FIELD

dws.client.write.table.compare-field

compareField

用于配置比较字段包含的字段必须满足数据值比当前值小才能入库相当于在入库时加了一个where条件例如:

INSERT INTO "test"."compare_test"("id", "age", "update_time") VALUES (?, ?, ?) ON CONFLICT ("id") DO UPDATE SET "id"=EXCLUDED."id", "age"=EXCLUDED."age", "update_time"=EXCLUDED."update_time" WHERE "update_time"< EXCLUDED."update_time"

-

LOG_SWITCH

dws.client.log.enable

logSwitch

日志开关,开启后会打印比较详细的过程日志,便于在调试或定位问题时开启。

false

onFlushSuccess

--

onFlushSuccess

数据入库成功后的回调函数。

-

onError

--

onError

后台任务执行失败的回调函数。

-

WRITE_TABLE_UNIQUE_KEY

dws.client.write.table.unique-key

uniqueKeys

该参数用于在表中无主键但是有唯一索引时,在入库时使用该参数指定字段做唯一约束,在update场景中该字段不需要是唯一索引或者主键,但upsert场景必须要唯一索引或主键。

-

WRITE_COPY_FORMAT

dws.client.write.copy-format

copyMode

使用copy入库的格式:

CSV:将数据拼接成字符串数据用双引号包裹的CSV格式,其中字段间以逗号分割,数据间以换行分割。使用jdbc copy api入库,该方式性能略低于DELIMITER方式,但比较稳定可靠。

DELIMITER:将数据字段使用copy api入库,其中字符间以0X1E分割,数据间以0X1F分割。该方式要求数据不包含分隔符,如包含将报错不能正常入库,且该方式定义null字符串为null数据,如果数据为null字符串将被设置为null。

CSV

WRITE_TABLE_FIELD_CASE_SENSITIVE

dws.client.write.table.case-sensitive

caseSensitive

表字段大小写是否敏感。

false

WRITE_TABLE_CREATE_TEMP_MODE

dws.client.write.table.create-temp-mode

createTempTableMode

在使用copy merge/upsert时,创建临时表方式:

  • AS:使用create temp table *** as select * from *** as方式创建,该方式支持表中带自增字段的使用,但性能略低。
  • LIKE:使用create temp table *** like方式创建,该方式不支持表中带自增字等临时表不支持类型的字段。
  • CUSTOM(仅2.x支持):自定义临时表的创建,创建时整体使用like方式,创建SQL可通过多个字段影响:create WRITE_TABLE_TEMP_TYPE temp table name_tmp like name WRITE_TABLE_TEMP_INCLUDING with ( WRITE_TABLE_TEMP_WITH ) DISTRIBUTE BY WRITE_TABLE_TEMP_DISTRIBUTE

AS

WRITE_TABLE_TEMP_TYPE

dws.client.write.table.temp-type

--

当WRITE_TABLE_CREATE_TEMP_MODE=CUSTOM时配置临时表类型:

COMMON:默认类型。

VOLATILE: volatile类型,需要内核支持。

COMMON

WRITE_TABLE_TEMP_INCLUDING

dws.client.write.table.temp-including

--

当WRITE_TABLE_CREATE_TEMP_MODE=CUSTOM时配置临时表including参数。

including defaults

WRITE_TABLE_TEMP_WITH

dws.client.write.table.temp-with

--

当WRITE_TABLE_CREATE_TEMP_MODE=CUSTOM时配置临时表with参数。

-

WRITE_TABLE_TEMP_DISTRIBUTE

dws.client.write.table.temp-distribute

--

当WRITE_TABLE_CREATE_TEMP_MODE=CUSTOM时配置临时表distribute参数。

-

WRITE_FORMAT_NUMBER_TO_DATE

dws.client.write.format.number-to-date

numberAsEpochMsForDatetime

如果数据库字段是时间类型(date\time\timestamp)并且数据源为数字类型,是否使用数字转时间功能,转换时会统一转换成标准时间戳(毫秒),然后创建对应类型java对象。

说明:
  • 1.x在copy入库场景该参数不生效。
  • 1.0.9前该参数为开启状态,且如果数据是数字类型字符串也将视为时间戳。

false

WRITE_FORMAT_INT_TO_DATE_TYPE

dws.client.write.format.number-to-date.int-type

--

当WRITE_FORMAT_NUMBER_TO_DATE=true时,如果数据为int,转换逻辑:

days:当作相对于元年的天数转换。

seconds:当作相对于元年的秒。

-

WRITE_FORMAT_LONG_TO_DATE_TYPE

dws.client.write.format.number-to-date.long-type

--

当WRITE_FORMAT_NUMBER_TO_DATE=true时,如果数据为long,转换逻辑:

days:当作相对于元年的天数转换。

seconds:当作相对于元年的秒。

ms:当作相对于元年的毫秒。

-

WRITE_FORMAT_NUMBER_STRING_TO_DATE

dws.client.write.format.number-string-date

--

如果数据库字段是时间类型(date\time\timestamp)并且数据源为数字字符串,是否将其转换为数字,转换后可使用WRITE_FORMAT_NUMBER_TO_DATE功能。

false

WRITE_FORMAT_STRING_TO_DATE

dws.client.write.format.string-date

stringToDatetimeFormat

如果数据库字段是时间类型(date\time\timestamp)并且数据源为字符串类型,通过SimpleDateFormat按stringToDatetimeFormat格式转换为日期类型,然后通过日期中的时间戳构造数据库对应类型数据。

说明:

该参数配置即代表开启,如果不需要请勿配置。

-

WRITE_FORMAT_DATE_0000

dws.client.write.format.date-0000

--

如果数据库字段是时间类型(date\time\timestamp)并且数据源为字符串 0000-00-00 00:00:00 ,是否转换为时间戳0对应时间。

false

WRITE_UPDATE_INCLUDE_PRIMARY_KEY

dws.client.write.update.include-pk

updateAll

upsert时set字段是否包含主键,hstore表在全列更新(set字段包含数据库所有字段)时不需要反查数据库,性能会更好。

true

JDBC_AUTO_COMMIT

dws.client.jdbc.auto-commit

autoCommit

入库时使用自动事务。

false

WRITE_FORMAT_STRING_U0000

dws.client.write.format.string-u0000

--

是否删除字符串中的特殊字符\u0000,如果不删除copy会报错,只能使用upsert入库。

true

WRITE_FORMAT_DECIMAL_DEF_TYPE

dws.client.write.format.decimal-def.type

--

当数据库为NUMERIC或者DECIMAL时,输入为空或者空字符串时填充默认值策略:

null:填充为空。

zero:填充为0。

custom:通过参数WRITE_FORMAT_DECIMAL_DEF_CUSTOM自定义。

null

WRITE_FORMAT_DECIMAL_DEF_CUSTOM

dws.client.write.format.decimal-def.custom

--

WRITE_FORMAT_DECIMAL_DEF_TYPE=custom时,设置自定值。

-

NETWORK_INTERNAL_PRIVATE_IP

dws.client.network.internal-privateIp

--

WRITE_PARTITION_POLICY=DN时如果客户端程序和dws集群网络不在同一网络(内部通信网络),通过此参数配置网络ip映射格式为:内部IP:外部IP多个使用;分割。

-

TIMEOUT_TASK

dws.client.timeout.task

--

执行任务的超时时间,包括重试时间。

10m

TIMEOUT_SQL_STATEMENT

dws.client.timeout.statement

--

SQL执行超时时间。

5m

TRACE_SWITCH

dws.client.trace.enable

-

打印trace记录,日志会记录主要的步骤及耗时,用于分析性能问题。

false

SYSTEM_TIMEZONE

dws.client.system.timezone

--

用于设置系统的时区以便于在做时间转换时时区正确,该参数会影响整个Java进程的时区,理论上需要和dws时区一致,如果不设置则保持和客户端进程默认时区一致。

-

PRINT_METRICS_NAMES

dws.client.print-metrics.names

--

用于配置需要打印的指标名称,配置为正则表达式,匹配到的指标就会被打印,可配置.*打印所有指标,确认选哟哪些指标:

rps_.*:用于打印每个表的入库速率

write_rpc:整个实例的写入速度

action_submit_wait:提交线程池的等待时间

buffer_size_.*:每次入库攒批数量

action_cost_.*:每次执行入库耗时

addBatch_cost_.*:执行addBatch的用时统计

executeBatch_cost_.*:执行executeBatch的用时统计

buildBuffer_cost_.*:构建copy buffer的用时统计

copy_cost_.*:执行copy的用时统计

-

PRINT_METRICS_PERIOD

dws.client.print-metrics.names

--

打印指标频率。

30s

2.x删除参数

参数

说明

删除原因

logDataTables

入库时需要打印数据的表,便于在定位问题时对比数据。

基本无使用场景,开发节点debug更方便,生成也不可能配置此参数。

batchOutWeighRatio

为提高整体吞吐,当对autoFlushBatchSize要求不是很严格时,可设置该参数,当往buffer中提交数据时buffer中数据量大于batchOutWeighRatio * autoFlushBatchSize时提交线程将会执行提交入库的任务,该参数用于避免业务线程提交任务,尽量使用后台线程执行提交。

WRITE_FORCE_FLUSH_BATCH_SIZE参数功能一致,更直观使用。

我们使用cookie来确保您的高速浏览体验。继续浏览本站,即表示您同意我们使用cookie。 详情

文档反馈

文档反馈

意见反馈

0/500

标记内容

同时提交标记内容