更新时间:2024-11-29 GMT+08:00
FlinkSQL写入JDBC数据支持ignoreDelete
使用场景
FlinkSQL写入JDBC数据时可以过滤掉DELETE和UPDATE_BEFORE状态的数据。
使用方法
在创建的JDBC Connector Sink流表中添加“filter.record.enabled”和“filter.row-kinds”参数。
- “filter.record.enabled”默认值为“false”。
- “filter.row-kinds”默认值为“UPDATE_BEFORE, DELETE”。
SQL示例如下:
CREATE TABLE user_score (
idx varchar(20),
user_id varchar(20),
score bigint
) WITH (
'connector' = 'kafka',
'topic' = 'topic-qk',
'properties.bootstrap.servers' = 'xxxx:21005',
'properties.group.id' = 'test_qk',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv'
);
CREATE TABLE dws_output (
idx varchar(20),
user_id varchar(20),
all_score bigint,
PRIMARY KEY(idx, user_id) NOT ENFORCED
) WITH(
'connector' = 'jdbc',
'driver' = 'com.xxx.gauss200.jdbc.Driver',
'url' = 'jdbc:gaussdb://GaussDB的服务器IP:25308/postgres',
'table-name' = 'customer_t1',
'username' = 'username', --连接GaussDB(DWS)数据库的用户名
'password' = 'password',--连接GaussDB(DWS)数据库的密码
'filter.record.enabled' = 'true',
'filter.row-kinds' = 'UPDATE_BEFORE'
);
insert into
dws_output
select
idx,
user_id,
sum(score) as all_score
from
user_score
group by
idx,
user_id;
父主题: FlinkSQL特性增强