更新时间:2025-07-14 GMT+08:00
Flink使用print connector定位数据问题
问题现象
在使用Flink过程中有时会遇到数据处理结果与预想不一致的问题。比如通过与原端对比,有的数据应该删除,但在最终处理结果中并没有被删除。
解决方法
造成这种问题的原因较多,可以借用print connector将Flink处理的数据打印出来,通过查看数据的输出结果来定位问题。
print connector使用方式:
CREATE TABLE KafkaSource ( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv' ); CREATE TABLE PrintSink( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'print' ); Insert into PrintSink select * from KafkaSource;
案例说明
在使用CDC读取MySQL数据写入其他数据库时,发现本来应该删除的数据还存在,导致出现数据问题。可以使用print connector定位问题。
操作步骤
- 样例数据。
32 26 33 16 35 16
name_table 表数据
32 zw 33 ls 35 ww
- 样例SQL:
CREATE TABLE age_table( id int, `age` int, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '8.5.193.13', 'port' = '3306', 'username' = 'root', 'password' = '', 'database-name' = 'test', 'table-name' = 'age_table' ); CREATE TABLE name_table( id int, `name` string, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '8.5.193.13', 'port' = '3306', 'username' = 'root', 'password' = '', 'database-name' = 'test', 'table-name' = 'name_table' ); CREATE TABLE PrintSink( id int, `age` int, `name` string, currentdata timestamp, PRIMARY KEY(id) NOT ENFORCED ) WITH ('connector' = 'print'); insert into PrintSink select a.id * 10 as id, a.`age`, b.`name`, now() from age_table a join name_table b on a.id = b.id;
- 作业启动时打印如下图所示。
- 执行以下命令,更新表中数据。
update name_table set name='aaaa' where id=32;
- 查看更新后的输出结果,打印数据只有+U而没有-U。经过多次尝试,发现都没有-U数据,怀疑是SinkUpsertMaterializer将-U数据拦截了。
- 后经过对SQL及SinkUpsertMaterializer原理分析发现是SQL中加了now()函数,now()函数每次都会取当前时间,导致进入SinkUpsertMaterializer算子的数据都不一样 ,去除now()函数后正常。
父主题: Flink常见问题