做结果表
格式语法
SQL语法格式可能在不同Flink环境下有细微差异,具体以事件环境格式为准,with后面的参数名称及参数值以此文档为准。
1 2 3 4 5 6 7 8 9 10 11 12 |
create table dwsSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '' ); |
Flink SQL配置参数
Flink SQL中设置的PRIMARY KEY将自动映射到dws-client中的uniqueKeys。参数跟随client版本发布,参数功能与client一致,以下参数说明表示为最新参数。
参数 |
说明 |
默认值 |
---|---|---|
connector |
flink框架区分connector参数,固定为dws。 |
- |
url |
数据库连接地址。 |
- |
username |
配置连接用户。 |
- |
password |
配置密码。 |
- |
tableName |
对应dws表。 |
- |
参数 |
说明 |
默认值 |
---|---|---|
connectionSize |
初始dws-client时的并发数量。 |
1 |
connectionMaxUseTimeSeconds |
连接创建多少秒后强制释放(单位:秒)。 |
3600(一小时) |
connectionMaxIdleMs |
连接最大空闲时间,超过后将释放(单位:毫秒)。 |
60000(一分钟) |
参数 |
说明 |
默认值 |
---|---|---|
conflictStrategy |
有主键表数据写入时主键冲突策略:
|
update |
writeMode |
入库方式:
|
auto |
maxFlushRetryTimes |
在入库时最大尝试次数,次数内执行成功则不抛出异常,每次重试间隔为 1秒 * 次数。 |
3 |
autoFlushBatchSize |
自动刷库的批大小(攒批大小)。 |
5000 |
autoFlushMaxInterval |
自动刷库的最大间隔时间(攒批时长)。 |
5s |
copyWriteBatchSize |
在“writeMode == auto”下,使用copy的批大小。 |
5000 |
ignoreDelete |
忽略flink任务中的delete。 |
false (1.0.10前默认true) |
ignoreNullWhenUpdate |
是否忽略flink中字段值为null的更新,只有在“conflictStrategy == update”时有效。 |
false |
metadataCacheSeconds |
系统中对元数据的最大缓存时间,例如表定义信息(单位秒)。 |
180 |
copyMode |
copy入库格式:
|
CSV |
createTempTableMode |
创建临时表方式:
|
AS |
numberAsEpochMsForDatetime |
如果数据库为时间类型数据源为数字类型,是否将数据当成时间戳转换为对应时间类型。 |
false |
stringToDatetimeFormat |
如果数据库为时间类型数据源为字符串类型,按该格式转换为时间类型,该参数配置即开启。 |
null |
sink.parallelism |
flink系统参数用于设置sink并发数量。 |
跟随上游算子 |
printDataPk |
是否在connector接收到数据时打印数据主键,用于排查问题。 |
false |
ignoreUpdateBefore |
忽略flink任务中的update_before,在大表局部更新时该参数一定打开,否则有update时会导致数据的其它列被设置为null,因为会先删除再写入数据。 |
true |
示例
该示例是从kafka数据源中读取数据,写入DWS结果表中,并指定攒批时间不超过10秒,每批数据最大30000条,其具体步骤如下:
- 在GaussDB(DWS)数据库中创建表public.dws_order:
1 2 3 4 5 6 7 8 9 10 11
create table public.dws_order( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR );
- 消费Kafka中order_test topic中的数据作为数据源,public.dws_order作为结果表,Kafka数据为JSON格式,并且字段名称和数据库字段名称一一对应:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'order_test', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE dwsSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://DWSAddress:DWSPort/DWSdbName', 'tableName' = 'dws_order', 'username' = 'DWSUserName', 'password' = 'DWSPassword', 'autoFlushMaxInterval' = '10s', 'autoFlushBatchSize' = '30000' ); insert into dwsSink select * from kafkaSource;
- 给Kafka写入测试数据:
1
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
- 等10秒后在GaussDB(DWS)表中查询结果:
1
select * from dws_order
结果如下:
常见问题
- Q:writeMode参数设置什么值比较合适?
A:根据业务场景分update(只更新存在的数据)和upsert(对于同一主键数据如果存在就更新,不存在就新增一条数据)两个类型,推荐直接使用auto方式即可,该方式下会根据数据量的大小自动选择,如果数据量较大会增大攒批参数autoFlushBatchSize,即可提升入库性能。
- Q:autoFlushBatchSize和autoFlushMaxInterval怎么设置比较合适?
A:autoFlushBatchSize参数用于设置最大攒批条数,autoFlushMaxInterval参数用于设置最大攒批间隔,两个参数分别从时间和空间维度管控攒批。
- Q: 遇到数据库死锁了怎么办?
- 行锁:该场景通常为同一主键数据的并发更新造成行锁,该情况可以通过对数据做key by解决,key by必须根据数据库主键做,保证同一个主键数据会在同一个并发中,破坏掉并发更新的条件,无法造成死锁。Flink SQL做key by需要Flink本身支持,对于DLI/MRS均能实现,如MRS flink通过增加参数“key-by-before-sink=true”可实现key by。具体怎么使用以实现方为准,对于无法使用的建议使用API方式入库。
- 分布式死锁:该场景通常为列存表的并发更新造成分布式死锁,暂无法解决,建议使用行存或者hstore。