更新时间:2024-11-26 GMT+08:00
FlinkSQL写入JDBC数据支持ignoreDelete
本章节适用于MRS 3.3.0及以后版本。
使用场景
FlinkSQL写入JDBC数据时可以过滤掉DELETE和UPDATE_BEFORE状态的数据。
使用方法
在创建的JDBC Connector Sink流表中添加“filter.record.enabled”和“filter.row-kinds”参数。
- “filter.record.enabled”默认值为“false”。
- “filter.row-kinds”默认值为“UPDATE_BEFORE, DELETE”。
SQL示例如下:
CREATE TABLE user_score ( idx varchar(20), user_id varchar(20), score bigint ) WITH ( 'connector' = 'kafka', 'topic' = 'topic-qk', 'properties.bootstrap.servers' = 'xxxx:21005', 'properties.group.id' = 'test_qk', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv' ); CREATE TABLE mysql_output ( idx varchar(20), user_id varchar(20), all_score bigint, PRIMARY KEY(idx, user_id) NOT ENFORCED ) WITH( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://MySQL的服务器IP:MySQL的服务器端口/mysql', 'table-name' = 'customer_t1', 'username' = 'username', --连接MySQL数据库的用户名 'password' = 'password',--连接MySQL数据库的密码 'filter.record.enabled' = 'true', 'filter.row-kinds' = 'UPDATE_BEFORE' ); insert into mysql_output select idx, user_id, sum(score) as all_score from user_score group by idx, user_id;
父主题: Flink企业级能力增强