Using ignoreDelete in JDBC Data Writes
This section applies to MRS 3.3.0 or later.
Scenarios
When FlinkSQL writes data through JDBC, records in the DELETE and UPDATE_BEFORE states can be filtered out so that they are not written to the target database.
How to Use
Add the filter.record.enabled and filter.row-kinds parameters to the WITH clause of the JDBC Connector Sink stream table.
- filter.record.enabled specifies whether record filtering is enabled. The default value is false.
- filter.row-kinds specifies the row kinds to filter out. The default value is UPDATE_BEFORE, DELETE.
The following is a SQL example:
CREATE TABLE user_score (
  idx varchar(20),
  user_id varchar(20),
  score bigint
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-qk',
  'properties.bootstrap.servers' = 'xxxx:21005',
  'properties.group.id' = 'test_qk',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);
CREATE TABLE mysql_output (
  idx varchar(20),
  user_id varchar(20),
  all_score bigint,
  PRIMARY KEY(idx, user_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://IP address of the MySQL server:MySQL server port/mysql',
  'table-name' = 'customer_t1',
  'username' = 'username', --Username for connecting the MySQL database
  'password' = 'password', --Password for connecting the MySQL database
  'filter.record.enabled' = 'true',
  'filter.row-kinds' = 'UPDATE_BEFORE'
);
insert into mysql_output
select idx, user_id, sum(score) as all_score
from user_score
group by idx, user_id;
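To make the effect of the two parameters easier to picture, the following Python sketch models the changelog that a GROUP BY sum such as the one in the SQL example produces, and then drops the configured row kinds. It is a simplified illustration only, not the connector's implementation: the function names aggregate_changelog and filter_records are hypothetical, and the assumption is that a record is discarded when filtering is enabled and its row kind is listed in filter.row-kinds.

```python
# Row kinds as named in Flink's changelog model.
INSERT = "INSERT"
UPDATE_BEFORE = "UPDATE_BEFORE"
UPDATE_AFTER = "UPDATE_AFTER"
DELETE = "DELETE"


def aggregate_changelog(rows):
    """Hypothetical model of the changelog emitted by
    `select idx, user_id, sum(score) ... group by idx, user_id`.

    The first row for a key yields an INSERT; every later row yields an
    UPDATE_BEFORE (old sum) followed by an UPDATE_AFTER (new sum).
    """
    totals = {}
    changelog = []
    for idx, user_id, score in rows:
        key = (idx, user_id)
        if key in totals:
            changelog.append((UPDATE_BEFORE, idx, user_id, totals[key]))
            totals[key] += score
            changelog.append((UPDATE_AFTER, idx, user_id, totals[key]))
        else:
            totals[key] = score
            changelog.append((INSERT, idx, user_id, score))
    return changelog


def filter_records(changelog, enabled=False, row_kinds=(UPDATE_BEFORE, DELETE)):
    """Hypothetical model of filter.record.enabled / filter.row-kinds.

    The defaults mirror the documented ones: filtering is off, and the
    row kinds to drop are UPDATE_BEFORE and DELETE.
    """
    if not enabled:
        return list(changelog)
    dropped = set(row_kinds)
    return [record for record in changelog if record[0] not in dropped]


if __name__ == "__main__":
    source_rows = [("1", "u1", 10), ("1", "u1", 5)]
    changelog = aggregate_changelog(source_rows)
    # Same settings as the SQL example: enabled, dropping only UPDATE_BEFORE.
    for record in filter_records(changelog, enabled=True, row_kinds=(UPDATE_BEFORE,)):
        print(record)
```

With the settings from the SQL example, only the UPDATE_BEFORE record is removed in this model; a DELETE record would still pass through, because DELETE is not listed in filter.row-kinds there.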