Updated on 2024-11-29 GMT+08:00
Using ignoreDelete in JDBC Data Writes
Scenarios
When Flink SQL writes data to a JDBC sink, records in the DELETE and UPDATE_BEFORE states can be filtered out.
How to Use
Add the filter.record.enabled and filter.row-kinds parameters to the WITH clause of the JDBC connector sink table that you have created.
- The default value of filter.record.enabled is false.
- The default value of filter.row-kinds is UPDATE_BEFORE, DELETE.
The following is a SQL example:
CREATE TABLE user_score (
idx varchar(20),
user_id varchar(20),
score bigint
) WITH (
'connector' = 'kafka',
'topic' = 'topic-qk',
'properties.bootstrap.servers' = 'xxxx:21005',
'properties.group.id' = 'test_qk',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv'
);
CREATE TABLE dws_output (
idx varchar(20),
user_id varchar(20),
all_score bigint,
PRIMARY KEY(idx, user_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'driver' = 'com.xxx.gauss200.jdbc.Driver',
'url' = 'jdbc:gaussdb://IP address of the GaussDB server:25308/postgres',
'table-name' = 'customer_t1',
'username' = 'username',--Username for logging in to the GaussDB(DWS) database
'password' = 'password',--Password for logging in to the GaussDB(DWS) database
'filter.record.enabled' = 'true',--Enable row-kind filtering before records are written to the sink
'filter.row-kinds' = 'UPDATE_BEFORE'--Row kinds to filter out. UPDATE_BEFORE records are not written to GaussDB(DWS).
);
insert into
dws_output
select
idx,
user_id,
sum(score) as all_score
from
user_score
group by
idx,
user_id;
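If both UPDATE_BEFORE and DELETE records need to be dropped, the row kinds can be listed together in filter.row-kinds. The following is a minimal variant of the dws_output sink above, assuming the comma-separated list format shown in the default value of filter.row-kinds:
CREATE TABLE dws_output (
idx varchar(20),
user_id varchar(20),
all_score bigint,
PRIMARY KEY(idx, user_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'driver' = 'com.xxx.gauss200.jdbc.Driver',
'url' = 'jdbc:gaussdb://IP address of the GaussDB server:25308/postgres',
'table-name' = 'customer_t1',
'username' = 'username',--Username for logging in to the GaussDB(DWS) database
'password' = 'password',--Password for logging in to the GaussDB(DWS) database
'filter.record.enabled' = 'true',--Enable row-kind filtering before records are written to the sink
'filter.row-kinds' = 'UPDATE_BEFORE, DELETE'--Neither UPDATE_BEFORE nor DELETE records are written to GaussDB(DWS)
);
Because UPDATE_BEFORE, DELETE is already the default value of filter.row-kinds, setting only 'filter.record.enabled' = 'true' should have the same effect; the option is listed explicitly here only for clarity.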
Parent topic: Enhancements to Flink SQL