更新时间:2025-07-14 GMT+08:00

Flink使用print connector定位数据问题

问题现象

在使用Flink过程中有时会遇到数据处理结果与预想不一致的问题。比如通过与原端对比,有的数据应该删除,但在最终处理结果中并没有被删除。

解决方法

造成这种问题的原因较多,可以借用print connector将Flink处理的数据打印出来,通过查看数据的输出结果来定位问题。

print connector使用方式:
CREATE TABLE KafkaSource (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);
CREATE TABLE PrintSink(
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'print'
);
Insert into
  PrintSink
select
  *
from
  KafkaSource;

案例说明

在使用CDC读取MySQL数据写入其他数据库时,发现本来应该删除的数据还存在,导致出现数据问题。可以使用print connector定位问题。

操作步骤

  1. 样例数据。

    age_table 表数据

    32 26
    33 16
    35 16

    name_table 表数据

    32 zw
    33 ls
    35 ww
  2. 样例SQL:
    CREATE TABLE age_table(
      id int,
      `age` int,
      PRIMARY KEY(id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '8.5.193.13',
      'port' = '3306',
      'username' = 'root',
      'password' = '',
      'database-name' = 'test',
      'table-name' = 'age_table'
    );
    CREATE TABLE name_table(
      id int,
      `name` string,
      PRIMARY KEY(id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '8.5.193.13',
      'port' = '3306',
      'username' = 'root',
      'password' = '',
      'database-name' = 'test',
      'table-name' = 'name_table'
    );
    CREATE TABLE PrintSink(
      id int,
      `age` int,
      `name` string,
      currentdata timestamp,
      PRIMARY KEY(id) NOT ENFORCED
    ) WITH ('connector' = 'print');
    insert into
      PrintSink
    select
      a.id * 10 as id,
      a.`age`,
      b.`name`,
      now()
    from
      age_table a
      join name_table b on a.id = b.id;
  3. 作业启动时打印如下图所示。

  4. 执行以下命令,更新表中数据。
    update name_table set name='aaaa' where id=32;
  5. 查看更新后的输出结果,打印数据只有+U而没有-U。经过多次尝试,发现都没有-U数据,怀疑是SinkUpsertMaterializer将-U数据拦截了。

  6. 后经过对SQL及SinkUpsertMaterializer原理分析发现是SQL中加了now()函数,now()函数每次都会取当前时间,导致进入SinkUpsertMaterializer算子的数据都不一样 ,去除now()函数后正常。