Updated on 2024-11-29 GMT+08:00

Using ignoreDelete in JDBC Data Writes

Scenarios

When Flink SQL writes data through the JDBC connector, records whose row kind is DELETE or UPDATE_BEFORE can be filtered out.

How to Use

Add the filter.record.enabled and filter.row-kinds parameters to the WITH clause of the JDBC connector sink table when you create it.

  • The default value of filter.record.enabled is false.
  • The default value of filter.row-kinds is UPDATE_BEFORE, DELETE.

The following is a SQL example:

-- Kafka-backed table that the INSERT statement below reads from.
-- Each record is a CSV line carrying (idx, user_id, score).
CREATE TABLE user_score (
  idx     VARCHAR(20),
  user_id VARCHAR(20),
  score   BIGINT
) WITH (
  -- Connector type and record format
  'connector' = 'kafka',
  'format' = 'csv',
  -- Topic and consumption start position
  'topic' = 'topic-qk',
  'scan.startup.mode' = 'latest-offset',
  -- Kafka client properties ('xxxx' is a placeholder for the broker host)
  'properties.bootstrap.servers' = 'xxxx:21005',
  'properties.group.id' = 'test_qk'
);
-- JDBC sink table that receives the aggregated score per (idx, user_id).
-- The primary key is declared NOT ENFORCED on (idx, user_id).
CREATE TABLE dws_output (
  idx varchar(20),
  user_id varchar(20),
  all_score bigint,
  PRIMARY KEY(idx, user_id) NOT ENFORCED
) WITH(
  'connector' = 'jdbc',
  'driver' = 'com.xxx.gauss200.jdbc.Driver',
  -- Replace the placeholder text with the real server address; the URL must not
  -- contain leading or trailing spaces inside the quotes.
  'url' = 'jdbc:gaussdb://IP address of the GaussDB server:25308/postgres',
  'table-name' = 'customer_t1',
  'username' = 'username', -- Username for logging in to the GaussDB(DWS) database
  'password' = 'password', -- Password for logging in to the GaussDB(DWS) database
  'filter.record.enabled' = 'true', -- Enables record filtering (the default is false)
  'filter.row-kinds' = 'UPDATE_BEFORE' -- Row kinds to filter out (the default is UPDATE_BEFORE, DELETE)
);
-- Continuously aggregate the total score per (idx, user_id) from user_score
-- and write the result to dws_output.
INSERT INTO dws_output
SELECT
  idx,
  user_id,
  SUM(score) AS all_score
FROM user_score
GROUP BY idx, user_id;