Using ignoreDelete in JDBC Data Writes by Flink SQL
This section applies to MRS 3.3.0 or later.
Scenarios
When Flink SQL writes data through the JDBC connector, records whose row kind is DELETE or UPDATE_BEFORE can be filtered out so that they are not written to the target database.
How to Use
Add the filter.record.enabled and filter.row-kinds parameters to the WITH clause of the JDBC connector sink table when you create it.
- filter.record.enabled: Whether to enable record filtering. If this parameter is set to true, Flink filters out records whose row kind matches the value of filter.row-kinds before writing to the sink.
- filter.row-kinds: Row kind of the records to be filtered out, for example, UPDATE_BEFORE or DELETE.
The following is a SQL example:
CREATE TABLE user_score (
  idx varchar(20),
  user_id varchar(20),
  score bigint
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-qk',
  'properties.bootstrap.servers' = 'xxxx:21005',
  'properties.group.id' = 'test_qk',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);

CREATE TABLE mysql_output (
  idx varchar(20),
  user_id varchar(20),
  all_score bigint,
  PRIMARY KEY(idx, user_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://MySQL server IP address:MySQL server port/mysql',
  'table-name' = 'customer_t1',
  'username' = 'username',                -- Username for connecting to the MySQL database
  'password' = 'password',                -- Password for connecting to the MySQL database
  'filter.record.enabled' = 'true',       -- Enables the record filtering function.
  'filter.row-kinds' = 'UPDATE_BEFORE'    -- Filters out UPDATE_BEFORE records during data writes.
);

INSERT INTO mysql_output
SELECT idx, user_id, sum(score) AS all_score
FROM user_score
GROUP BY idx, user_id;
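The example above filters UPDATE_BEFORE records. As a variation, the sink could be configured to drop DELETE records instead, which is the case the section title refers to. The following is only a sketch: it assumes that filter.row-kinds accepts 'DELETE' in the same form as 'UPDATE_BEFORE', and the table name mysql_output_ignore_delete is hypothetical. The connection placeholders are the same as in the example above.

```sql
-- Sketch only: same JDBC sink definition as above, but dropping DELETE records.
-- Assumption: 'DELETE' is a valid value for filter.row-kinds.
CREATE TABLE mysql_output_ignore_delete (   -- hypothetical table name
  idx varchar(20),
  user_id varchar(20),
  all_score bigint,
  PRIMARY KEY(idx, user_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://MySQL server IP address:MySQL server port/mysql',
  'table-name' = 'customer_t1',
  'username' = 'username',
  'password' = 'password',
  'filter.record.enabled' = 'true',    -- Record filtering must be enabled for filter.row-kinds to apply.
  'filter.row-kinds' = 'DELETE'        -- Rows deleted upstream are not deleted from the MySQL table.
);
```

With such a configuration, rows that already exist in the target MySQL table would remain there even if the upstream changelog later retracts them, so this fits cases where the sink is meant to keep historical results.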