Updated on 2024-12-13 GMT+08:00

Using ignoreDelete in JDBC Data Writes

This section applies to MRS 3.3.0 or later.

Scenarios

When FlinkSQL writes data through JDBC, records whose row kind is DELETE or UPDATE_BEFORE can be filtered out so that they are not written to the target database.

How to Use

Add the filter.record.enabled and filter.row-kinds parameters to the WITH clause of the JDBC connector sink table when you create it.

  • The default value of filter.record.enabled is false.
  • The default value of filter.row-kinds is UPDATE_BEFORE, DELETE.

The following is a SQL example:

-- Input table backed by Kafka: CSV-formatted records are read from the
-- 'topic-qk' topic, with the startup mode set to the latest offset.
CREATE TABLE user_score (
  idx     VARCHAR(20),
  user_id VARCHAR(20),
  score   BIGINT
) WITH (
  'connector' = 'kafka',
  -- Kafka client settings
  'properties.bootstrap.servers' = 'xxxx:21005',
  'properties.group.id' = 'test_qk',
  -- Topic, record format, and startup position
  'topic' = 'topic-qk',
  'format' = 'csv',
  'scan.startup.mode' = 'latest-offset'
);
-- JDBC sink table that receives the aggregated scores; rows are written to
-- the MySQL table 'customer_t1' and keyed by (idx, user_id).
CREATE TABLE mysql_output (
  idx       VARCHAR(20),
  user_id   VARCHAR(20),
  all_score BIGINT,
  PRIMARY KEY (idx, user_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  -- Replace the placeholders with the IP address and port of the MySQL server.
  'url' = 'jdbc:mysql://IP address of the MySQL server:MySQL server port/mysql',
  'table-name' = 'customer_t1',
  -- Username for connecting to the MySQL database
  'username' = 'username',
  -- Password for connecting to the MySQL database
  'password' = 'password',
  -- Enable record filtering and filter out UPDATE_BEFORE records on write.
  'filter.record.enabled' = 'true',
  'filter.row-kinds' = 'UPDATE_BEFORE'
);
-- Sum the scores for each (idx, user_id) pair read from user_score and
-- write the totals into mysql_output.
INSERT INTO mysql_output
SELECT
  idx,
  user_id,
  SUM(score) AS all_score
FROM user_score
GROUP BY
  idx,
  user_id;