更新时间:2024-11-29 GMT+08:00

FlinkSQL写入JDBC数据支持ignoreDelete

使用场景

FlinkSQL写入JDBC数据时可以过滤掉DELETE和UPDATE_BEFORE状态的数据。

使用方法

在创建的JDBC Connector Sink流表中添加“filter.record.enabled”和“filter.row-kinds”参数。

  • “filter.record.enabled”默认值为“false”。
  • “filter.row-kinds”默认值为“UPDATE_BEFORE, DELETE”。

SQL示例如下:

CREATE TABLE user_score (
  idx varchar(20),
  user_id varchar(20),
  score bigint
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-qk',
  'properties.bootstrap.servers' = 'xxxx:21005',
  'properties.group.id' = 'test_qk',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);
CREATE TABLE dws_output (
  idx varchar(20),
  user_id varchar(20),
  all_score bigint,
  PRIMARY KEY(idx, user_id) NOT ENFORCED
) WITH(
  'connector' = 'jdbc',
  'driver' = 'com.xxx.gauss200.jdbc.Driver',
  'url' = 'jdbc:gaussdb://GaussDB的服务器IP:25308/postgres',
  'table-name' = 'customer_t1',
  'username' = 'username', --连接GaussDB(DWS)数据库的用户名
  'password' = 'password',--连接GaussDB(DWS)数据库的密码
  'filter.record.enabled' = 'true',
  'filter.row-kinds' = 'UPDATE_BEFORE'
);
insert into
  dws_output
select
  idx,
  user_id,
  sum(score) as all_score
from
  user_score
group by
  idx,
  user_id;