更新时间:2025-08-01 GMT+08:00
FlinkSQL写入JDBC数据支持ignoreDelete
本章节适用于MRS 3.3.0及以后版本。
使用场景
FlinkSQL写入JDBC数据时可以过滤掉DELETE和UPDATE_BEFORE状态的数据。
使用方法
在创建的JDBC Connector Sink流表中添加“filter.record.enabled”和“filter.row-kinds”参数。

- filter.record.enabled:用于启用记录过滤功能。配置为“true”时启用记录过滤功能,Flink会根据指定的过滤条件过滤记录。
- filter.row-kinds:用于指定需要过滤的行类型。
SQL示例如下:
CREATE TABLE user_score ( idx varchar(20), user_id varchar(20), score bigint ) WITH ( 'connector' = 'kafka', 'topic' = 'topic-qk', 'properties.bootstrap.servers' = 'xxxx:21005', 'properties.group.id' = 'test_qk', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv' ); CREATE TABLE mysql_output ( idx varchar(20), user_id varchar(20), all_score bigint, PRIMARY KEY(idx, user_id) NOT ENFORCED ) WITH( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://MySQL的服务器IP:MySQL的服务器端口/mysql', 'table-name' = 'customer_t1', 'username' = 'username', --连接MySQL数据库的用户名 'password' = 'password',--连接MySQL数据库的密码 'filter.record.enabled' = 'true', --表示启用记录过滤功能 'filter.row-kinds' = 'UPDATE_BEFORE' --表示在写入数据时,过滤掉UPDATE_BEFORE类型的记录 ); insert into mysql_output select idx, user_id, sum(score) as all_score from user_score group by idx, user_id;
父主题: Flink企业级能力增强