
Using ignoreDelete When Flink SQL Writes Data to JDBC

This section applies to MRS 3.3.0 or later.

Scenarios

When Flink SQL writes data to a JDBC sink, records in the DELETE and UPDATE_BEFORE states can be filtered out instead of being applied to the target table.

How to Use

Add the filter.record.enabled and filter.row-kinds parameters to the WITH clause of an existing JDBC connector sink table.

  • filter.record.enabled: Whether to enable record filtering. If this parameter is set to true, record filtering is enabled and Flink filters out records whose row kind matches the specified criteria before writing them to the sink.
  • filter.row-kinds: Row kinds to be filtered out, for example DELETE or UPDATE_BEFORE. A minimal fragment using both parameters is shown below.
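For example, to ignore DELETE records, add the two parameters to the sink table's WITH clause. The fragment below is a minimal sketch; the remaining JDBC connector options are omitted here and appear in the complete example that follows:

  'filter.record.enabled' = 'true',  -- Enables record filtering.
  'filter.row-kinds' = 'DELETE'      -- Filters out DELETE records before they are written to the JDBC sink.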

The following is a complete SQL example:

CREATE TABLE user_score (
  idx varchar(20),
  user_id varchar(20),
  score bigint
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-qk',
  'properties.bootstrap.servers' = 'xxxx:21005',
  'properties.group.id' = 'test_qk',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);
CREATE TABLE mysql_output (
  idx varchar(20),
  user_id varchar(20),
  all_score bigint,
  PRIMARY KEY(idx, user_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://MySQL server IP address:MySQL server port/mysql',
  'table-name' = 'customer_t1',
  'username' = 'username', -- Username for connecting to the MySQL database
  'password' = 'password', -- Password for connecting to the MySQL database
  'filter.record.enabled' = 'true', -- Enables record filtering.
  'filter.row-kinds' = 'UPDATE_BEFORE' -- Filters out UPDATE_BEFORE records when writing data.
);
INSERT INTO
  mysql_output
SELECT
  idx,
  user_id,
  SUM(score) AS all_score
FROM
  user_score
GROUP BY
  idx,
  user_id;
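
The JDBC sink writes to an existing MySQL table. A minimal definition of customer_t1 that matches the Flink schema above could look like the following; the column types and sizes are assumptions and should be adjusted to your environment:

CREATE TABLE customer_t1 (
  idx VARCHAR(20) NOT NULL,
  user_id VARCHAR(20) NOT NULL,
  all_score BIGINT,
  PRIMARY KEY (idx, user_id)  -- Same key as the Flink sink table; used for upsert writes.
);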