做结果表
格式语法
SQL语法格式可能在不同Flink环境下有细微差异,具体以事件环境格式为准,with后面的参数名称及参数值以此文档为准。
1 2 3 4 5 6 7 8 9 10 11 12 | create table dwsSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '' ); |
Flink SQL配置参数
Flink SQL中设置的PRIMARY KEY将自动映射到dws-client中的uniqueKeys。参数跟随client版本发布,参数功能与client一致,以下参数说明表示为最新参数。
| 参数 | 说明 | 默认值 |
|---|---|---|
| connector | flink框架区分connector参数,固定为dws。 | - |
| url | 数据库连接地址。 | - |
| username | 配置连接用户。 | - |
| password | 配置密码。 | - |
| tableName | 对应dws表,默认会取public模式下的表,如果非public模式则需要显示指定,格式为:schema.tableName。 | - |
| 参数 | 说明 | 默认值 |
|---|---|---|
| connectionSize | 初始dws-client时的并发数量。 | 1 |
| connectionMaxUseTimeSeconds | 连接创建多少秒后强制释放(单位:秒)。 | 3600(一小时) |
| connectionMaxIdleMs | 连接最大空闲时间,超过后将释放(单位:毫秒)。 | 60000(一分钟) |
当dws-client为2.x版本,参数全量支持在Flink SQL通过key方式配置。下表参数为兼容1.x版本参数,当同时配置2.x和1.x参数时,2.x版本的参数值生效。
| 参数 | 说明 | 默认值 |
|---|---|---|
| conflictStrategy | 有主键表数据写入时主键冲突策略:
| update |
| writeMode | 入库方式:
| auto |
| maxFlushRetryTimes | 在入库时最大尝试次数,次数内执行成功则不抛出异常,每次重试间隔为 1秒 * 次数。 | 3 |
| autoFlushBatchSize | 自动刷库的批大小(攒批大小)。 | 5000 |
| autoFlushMaxInterval | 自动刷库的最大间隔时间(攒批时长)。 | 5s |
| copyWriteBatchSize | 在“writeMode == auto”下,使用copy的批大小。 | 1000(2.0.0.6版本及以前默认为5000) |
| metadataCacheSeconds | 系统中对元数据的最大缓存时间,例如表定义信息(单位秒)。 | 180 |
| copyMode | copy入库格式:
| CSV |
| createTempTableMode | 创建临时表方式:
| AS |
| numberAsEpochMsForDatetime | 如果数据库为时间类型数据源为数字类型,是否将数据当成时间戳转换为对应时间类型。 | false |
| stringToDatetimeFormat | 如果数据库为时间类型数据源为字符串类型,按该格式转换为时间类型,该参数配置即开启。 | null |
| updateAll | upsert时set字段是否包含主键,hstore表在全列更新(set字段包含数据库所有字段)时不需要反查数据库,性能会更好。 | true |
| 参数 | 说明 | 默认值 |
|---|---|---|
| ignoreDelete | 忽略flink任务中的delete。 | false (1.0.10前默认true) |
| ignoreNullWhenUpdate | 是否忽略flink中字段值为null的更新,只有在“conflictStrategy == update”时有效。 | false |
| sink.parallelism | flink系统参数用于设置sink并发数量。 | 跟随上游算子 |
| printDataPk | 是否在connector接收到数据时打印数据主键,用于排查问题。 | false |
| ignoreUpdateBefore | 忽略flink任务中的update_before,在大表局部更新时该参数一定打开,否则有update时会导致数据的其它列被设置为null,因为会先删除再写入数据。 | true |
使用Flink SQL直连DN入库
该能力依赖flink sql DISTRIBUTE BY能力,mrs有提供此能力,具体请参见Flink SQL语法增强。
connector提供udf函数可根据分布列值计算出下游并发结合flink sql DISTRIBUTE BY能力实现将数据按DN分区能力,示例:
- 需要在SQL中引入UDF。
CREATE temporary FUNCTION dn_hash AS 'com.huaweicloud.dws.connectors.flink.partition.DnHashFunction';
- 正常写Source SQL。
CREATE TABLE users ( id BIGINT, name STRING, age INT, text STRING, created_at TIMESTAMP(3), updated_at TIMESTAMP(3) ) WITH ( 'connector' = 'datagen', 'fields.id.kind' = 'sequence', 'fields.id.start' = '1', 'fields.id.end' = '1000', 'fields.name.length' = '10', 'fields.age.min' = '18', 'fields.age.max' = '60', 'fields.text.length' = '5' ) - Sink表定义SQL中需要新增一个字段并且要求int类型值用于接收UDF计算的结果,示例中叫dn_hash。
create table dws_users ( dn_hash int, id BIGINT, name STRING, age INT, text STRING, created_at TIMESTAMP(3), updated_at TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'dws', 'url' = '%s', 'tableName' = 'test.users', 'username' = '%s', 'autoFlushBatchSize' = '50000', 'password' = '%s' ) - Insert into sql使用 udf获取数据下游算子信息,同时使用DISTRIBUTE BY对返回结果做数据分区,数据就会按照udf返回信息到下游指定并行度。
insert into dws_users select /*+ DISTRIBUTEBY('dn_hash') */ dn_hash('test.users',10,1024, id) as dn_hash, * from users
UDF函数DnHashFunction参数说明
参数格式
dn_hash('dws表名',sink并行度,最大并行度,dws作为分布列的数据在源数据的字段名称{1,})
参数说明
- 使用时上游并行度必须不多于sink并行度,DnHashFunction同样是通过进程内获取sink 算子初始化的dws client实例获取到的表元数据,如果当前进程无sink算子就会导致无法获取client实例。
- 使用后会增加一个hash算子,如果链路有多个算子处理业务,当执行hash算子后不可以再有改变数据分区的算子,否则数据会被再次分区就不能到达指定sink算子。
- 最大并行度默认flink自动调整的,算法中需要使用,因此自动调整的无法使用,必须通过参数设置固定并把设置值作为UDF的参数,可以通过参数pipeline.max-parallelism设置或者jar方式通过API设置:
StreamExecutionEnvironment evn = StreamExecutionEnvironment.getExecutionEnvironment(); evn.setParallelism(1); evn.setMaxParallelism(1024);
- 如果分布列包含多个字段,分布列的字段顺序需要保持和DWS一致,分布列支持的字段类型和dws client一致参考参数WRITE_PARTITION_POLICY,使用功能同样需要额外配置,不可自行使用。
示例
该示例是从kafka数据源中读取数据,写入DWS结果表中,并指定攒批时间不超过10秒,每批数据最大30000条,其具体步骤如下:
- 在DWS数据库中创建表public.dws_order:
1 2 3 4 5 6 7 8 9 10 11
create table public.dws_order( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR );
- 消费Kafka中order_test topic中的数据作为数据源,public.dws_order作为结果表,Kafka数据为JSON格式,并且字段名称和数据库字段名称一一对应:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'order_test', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE dwsSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://DWSAddress:DWSPort/DWSdbName', 'tableName' = 'dws_order', 'username' = 'DWSUserName', 'password' = 'password', 'autoFlushMaxInterval' = '10s', 'autoFlushBatchSize' = '30000' ); insert into dwsSink select * from kafkaSource;
- 给Kafka写入测试数据:
1{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
- 等10秒后在DWS表中查询结果:
1select * from dws_order
结果如下:

使用Flink SQL直连DN基于内核动态库
- 使用约束:
- DN端口监听
默认情况下,管控面部署集群的时候只会让DN的端口监听在eth2网卡,CN端口会在eth1、eth2上监听。客户端可以通过eth1网卡在同一个VPC下连接或者给eth1绑定公网IP访问,eth2为内部通信网卡只用作集群节点间通信,模型如下图所示:

对于正常用户来说客户只能通过eth1网卡连接集群,因此要支持用户连接必须将DN的端口监听在eth1网卡,需要在每个节点上执行脚本:
eth1IP=`ifconfig eth1|grep -w inet|awk '{print $2}'` path=`ps ux |grep -w gaussdb|grep -E "primary0|standby1"|awk '{print $15}'` for p in `echo $path` ; do echo $p sed -i "s/isten_addresses = '/isten_addresses = '$eth1IP,/g" /var/chroot/$p/postgresql.conf done
- 上述配置一定要重启才会生效。
- 如果客户端在资源租户下和集群在同一个VPC下则无需此设置。
- 安全组配置
此约束针对云上集群。查询DWS集群对应的DN端口号(云上环境一般为40000),默认云上的集群没有放开DN端口的安全组,添加一条规则进行放通。

- 防火墙配置
默认防火墙不开放DN端口,放开防火墙可参考如下脚本,脚本执行需要root用户(具体脚本执行需要适配下自身环境,确保放通DN端口):
path=`ps -ef |grep -w gaussdb|grep -E "primary0|standby1"|awk '{print $12}'` for p in `echo $path` ; do port=`grep port /var/chroot/$p/postgresql.conf|head -n 1|awk '{print $3}' `sed -i "13a -A INPUT -p tcp -m tcp --dport $port -j ACCEPT" /etc/sysconfig/iptables done service iptables restart - 开启白名单
DWS集群DN上白名单默认只对集群内部放开,在其他节点无法访问,因此在管控面部署中需要放开该DN端口的白名单,可执行以下命令:
gs_guc set -Z datanode -I all -N all -h "host all all 0.0.0.0/0 sha256"
- DN端口监听
- 使用方式
在目标表的Flink SQL中添加以下两个关键的配置即可开启直连DN:
- dws.client.write.partition-policy 指定为DN后,即表示开启直连DN。
- dws.client.network.internal-privateIp 配置DN节点的IP映射,需要配置所有DN节点。internalIp为DN节点的内部IP,privateIp是客户端可以访问通的IP,格式为两个IP中间以冒号分割,不同DN之间以分号分割。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
create table dwsSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '', 'dws.client.write.partition-policy' = 'DN', 'dws.client.network.internal-privateIp' = 'internalIp:privateIp;internalIp:privateIp;......' );
- 场景限制
- 当前直连DN特性仅支持表分布列字段为int、bigint、varchar、text类型,其他类型会抛出异常。
- 当前直连DN特性仅支持SQL_ASCII、UTF-8编码格式,其他类型会抛出异常。
- 当前直连DN特性仅支持Hash类型的分布表,其他分布类型不会报错,会回退成连CN入库场景。
- 当前直连DN特性不支持有超长字符截断或非法字符替换的配置的场景。
具体来说,当DWS集群开启td_compatible_truncation,或者配置了load_auto_truncation、change_illegal_char后会回退成CN入库场景。
- 当前直连DN特性不支持数据同步期间DWS库执行以下操作:
- 修改分布列。
- 执行数据同步表的DDL语句。
- 修改DN信息,如扩缩容等。
- 直连DN特性对比直连CN的场景,主要适用于CN并发度高、资源使用率较高、达到CN性能瓶颈后的场景,此时开启直连DN性能提升较大,有明显优势。在CN未达到资源瓶颈时,由于本身MPP架构,与直连CN性能基本持平。
- 直连DN使用的动态库,引入了ICU组件,需要依赖gcc、c++组件。如果没有,需要在环境上提前安装好,并且CXXABI的版本需要>=1.3.8,GLIBC的版本需要>=3.4.21。
常见问题
- Q:writeMode参数设置什么值比较合适?
A:根据业务场景分update(只更新存在的数据)和upsert(对于同一主键数据如果存在就更新,不存在就新增一条数据)两个类型,推荐直接使用auto方式即可,该方式下会根据数据量的大小自动选择,如果数据量较大会增大攒批参数autoFlushBatchSize,即可提升入库性能。
具体来说,当使用auto作为writeMode时(默认情况下copy的数据攒批大小copyWriteBatchSize为1000),低于copyWriteBatchSize时会走upsert,高于copyWriteBatchSize时分两种情况:有主键时走copy到临时表+upsert,无主键时直接copy到目标表。
- Q:autoFlushBatchSize和autoFlushMaxInterval怎么设置比较合适? A:autoFlushBatchSize参数用于设置最大攒批条数,autoFlushMaxInterval参数用于设置最大攒批间隔,两个参数分别从时间和空间维度管控攒批。
- Q:遇到数据库死锁了怎么办?
- 行锁:该场景通常为同一主键数据的并发更新造成行锁,该情况可以通过对数据做key by解决,key by必须根据数据库主键做,保证同一个主键数据会在同一个并发中,破坏掉并发更新的条件,无法造成死锁。Flink SQL做key by需要Flink本身支持,对于DLI/MRS均能实现,如MRS flink通过增加参数“key-by-before-sink=true”可实现key by。具体怎么使用以实现方为准,对于无法使用的建议使用API方式入库。
- 分布式死锁:该场景通常为列存表的并发更新造成分布式死锁,暂无法解决,建议使用行存或者hstore。
- Q:遇到Flink的taskManager中有入库异常,但是flink ui页面上没有展示怎么办?
A:此为Flink的同步机制导致。Flink通常只感知sink算子直接抛出的异常,不会感知sink算子内部启动其他子线程抛出的异常。但是这个不影响Flink自身的重试策略。
- Q:ignoreNullWhenUpdate这个参数什么情况下使用?
A:ignoreNullWhenUpdate参数通常是为了应对部分列更新的业务而引入。基于部分列更新,入库SQL需要根据数据特征定制化SQL(即upsert时只针对有值的列进行更新),否则会存在数据覆盖的问题,导致数据不一致。但是引入该参数后可能会导致以下问题:
- 分组过多。由于配置该参数后攒的一批数据通常存在多种数据特征,所以需要对不同的数据特征的数据进行划分,每个划分后的小组执行独立的SQL进行入库。基于此情形,可能存在极端情况,即原表为大宽表,且数据特征较散,则势必会导致分组过多,导致一批中入库SQL频繁执行,没有达到大批量执行的目的。
- 部分列更新upsert性能差。此为GaussDB内核侧的性能基线—部分列upsert要比全列upsert慢3倍以上。
如果遇到上述性能问题,则需要业务侧进行数据整改,或者接受不开此参数的影响。
- Q:涉及忽略数据前后空格的场景怎么办?
A:忽略前后空格场景可以使用Flink SQL中的trim函数,如 insert into sink select trim(a),trim(b) from source; 即可达到业务效果。
- Q:涉及源端数据(如MySQL)中的varchar字段中携带有\0,同步后发现丢失怎么办?
A:丢失是正常现象,和DWS-connector工具无关。
\0一般会被认为是字符中的终止符,可能存在截断、丢失等情况。同时部分CDC工具(如MySQL-CDC工具)中具有反序列化的逻辑,会将\0字符转义成NULL,从而导致丢失。
- 规避手段1:使用其他数据类型,如varbinary,避免使用varchar,即可解决此问题。
- 规避手段2:使用Stream API方式启动作业,添加自定义的反序列化器实现此逻辑。
- 业务侧也可对此种特殊数据进行整改,避免varchar字段中携带此种数据进行入库。
- Q:当sink业务表无主键,同时指定writeMode为copy时,若此时发生故障,触发Flink重试后会导致数据重复的问题,该如何解决?
A:Flink故障后重试会基于上一次成功的Checkpoint点位进行恢复,即source算子会将消费位点回退到Checkpoint中记录的位置,那么必然会导致有一部分数据重复进行入库(上一次Checkpoint成功时间到故障发生时间这一时间范围内)。如果此时业务表无主键,那么就会再次插入一份数据,导致数据重复,或者说导致数据不一致的产生。
- 解决方案1:使用Upsert语义。配合业务表主键使用,即可达到写入结果一致的效果。
- 解决方案2:开启sink端的事务,部分数据源支持两阶段提交,即Flink先预导入数据,当sink算子的Checkpoint成功后再提交事务。但此方案dws-connector暂未支持。