Updated on 2025-08-15 GMT+08:00

How Do I Use Print Connector to Locate Data Problems in Flink?

Symptom

When using Flink, you may find that data is not processed as expected. For example, some data that should have been deleted still appears in the final output.

Solution

This issue can have various causes. You can use the print connector to print the data processed by Flink and then locate the problem based on the printed data.

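To use the print connector, define a sink table with the print connector and the same schema as the data you want to inspect, and then insert the data into it. For example: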
CREATE TABLE KafkaSource (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);
CREATE TABLE PrintSink(
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'print'
);
Insert into
  PrintSink
select
  *
from
  KafkaSource;
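A job may write to more than one print sink, for example one for the source data and one for the final result. In that case, giving each sink its own prefix makes the printed rows easier to tell apart. The sketch below assumes that your Flink version supports the open-source print connector option 'print-identifier'. The table name PrintSourceCheck and the identifier value kafka-source are hypothetical. Rows are printed to the standard output of the TaskManagers that run the sink. Each row is preceded by the identifier and by its row kind (+I, -U, +U, or -D).

```sql
-- Hypothetical print sink for inspecting the raw Kafka data.
-- 'print-identifier' (assumed to be supported by your Flink version)
-- prefixes every printed row so it can be told apart from other print sinks.
CREATE TABLE PrintSourceCheck (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'print',
  'print-identifier' = 'kafka-source'
);

INSERT INTO PrintSourceCheck SELECT * FROM KafkaSource;
```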

Case Description

When Flink CDC is used to read MySQL data and write it to another database, data that should have been deleted still exists in the target database, resulting in incorrect data. You can use the print connector to locate the fault.

Procedure

  1. Sample data.

    age_table data

    id  age
    32  26
    33  16
    35  16

    name_table data

    id  name
    32  zw
    33  ls
    35  ww
  2. Sample SQL.
    CREATE TABLE age_table(
      id int,
      `age` int,
      PRIMARY KEY(id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '8.5.193.13',
      'port' = '3306',
      'username' = 'root',
      'password' = '',
      'database-name' = 'test',
      'table-name' = 'age_table'
    );
    CREATE TABLE name_table(
      id int,
      `name` string,
      PRIMARY KEY(id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '8.5.193.13',
      'port' = '3306',
      'username' = 'root',
      'password' = '',
      'database-name' = 'test',
      'table-name' = 'name_table'
    );
    CREATE TABLE PrintSink(
      id int,
      `age` int,
      `name` string,
      currentdata timestamp,
      PRIMARY KEY(id) NOT ENFORCED
    ) WITH ('connector' = 'print');
    insert into
      PrintSink
    select
      a.id * 10 as id,
      a.`age`,
      b.`name`,
      now()
    from
      age_table a
      join name_table b on a.id = b.id;
  3. The following figure shows the information printed when a job is started.

  4. Run the following statement in MySQL to update a row in name_table:
    update name_table set name='aaaa' where id=32;
  5. Check the output after the update. The printed data contains +U (UPDATE_AFTER) records but no -U (UPDATE_BEFORE) records, and repeated attempts never produced a -U record. This suggested that the SinkUpsertMaterializer operator was intercepting the -U records.

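  6. Analysis of the SQL and of how SinkUpsertMaterializer works showed that the issue was caused by the now() function in the SQL. SinkUpsertMaterializer keeps the rows it has received for each primary key in state and applies a -U or -D record by removing the stored row that is identical to it. Because now() returns the current time each time it is evaluated, the -U record generated for an update carried a different currentdata value from the row stored earlier. No match was found, so the retraction was dropped. For the same reason, delete records never took effect in the sink, which is why deleted data remained. The issue was resolved after the now() function was removed.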
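Applied to the sample SQL in step 2, the fix amounts to dropping the currentdata column and the now() call. A retraction record then carries exactly the same values as the row it retracts. The following is a minimal sketch that reuses the age_table and name_table definitions from step 2. The sink name PrintSinkFixed is hypothetical and only avoids a clash with the existing PrintSink table.

```sql
-- Same schema as PrintSink in step 2, minus the non-deterministic currentdata column.
CREATE TABLE PrintSinkFixed(
  id int,
  `age` int,
  `name` string,
  PRIMARY KEY(id) NOT ENFORCED
) WITH ('connector' = 'print');

-- No now() call: every output column is derived from the CDC sources,
-- so a -U or -D record matches the row previously emitted for the same key.
insert into
  PrintSinkFixed
select
  a.id * 10 as id,
  a.`age`,
  b.`name`
from
  age_table a
  join name_table b on a.id = b.id;
```

Repeating the update from step 4 against this job should show the old row being retracted (as a -U or -D record) in addition to the new row. A delete such as delete from name_table where id=32; should likewise print a -D record for id 320.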