How Do I Use Print Connector to Locate Data Problems in Flink?
Symptom
When you use Flink, data may not be processed as expected. For example, records that should have been deleted still appear in the final output.
Solution
This issue can have many causes. Use the print connector to print the data that Flink processes, and then locate the problem based on the printed records. The following example reads from Kafka and writes every record to a print sink:
CREATE TABLE KafkaSource (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);

CREATE TABLE PrintSink (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'print'
);

INSERT INTO PrintSink SELECT * FROM KafkaSource;
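When the source schema is long or a job has several print sinks, two features of the open-source Flink print connector can shorten the setup and make the output easier to read. The sketch below assumes that the KafkaSource table above already exists and that the platform supports the CREATE TABLE ... LIKE clause and the print-identifier option in the same way as open-source Flink. The table name DebugPrint and the identifier value kafka_debug are hypothetical.

```sql
-- Assumption: KafkaSource is already defined as shown above.
-- LIKE copies the column definitions of KafkaSource; EXCLUDING ALL drops
-- its connector options, so only the print options in WITH apply.
-- 'print-identifier' adds a prefix to every printed line (hypothetical value).
CREATE TABLE DebugPrint
WITH (
  'connector' = 'print',
  'print-identifier' = 'kafka_debug'
)
LIKE KafkaSource (EXCLUDING ALL);

INSERT INTO DebugPrint SELECT * FROM KafkaSource;
```

Each printed row starts with its change type: +I (insert), -U (update-before), +U (update-after), or -D (delete). These markers are what the case below relies on.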
Case Description
When CDC is used to read MySQL data and write it to another database, data that should have been deleted still exists in the target, which causes data errors. You can use the print connector to locate the fault.
Procedure
- Sample data.
age_table data (id, age)
32 26
33 16
35 16
name_table data (id, name)
32 zw
33 ls
35 ww
- Sample SQL.
CREATE TABLE age_table (
  id INT,
  `age` INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '8.5.193.13',
  'port' = '3306',
  'username' = 'root',
  'password' = '',
  'database-name' = 'test',
  'table-name' = 'age_table'
);

CREATE TABLE name_table (
  id INT,
  `name` STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '8.5.193.13',
  'port' = '3306',
  'username' = 'root',
  'password' = '',
  'database-name' = 'test',
  'table-name' = 'name_table'
);

CREATE TABLE PrintSink (
  id INT,
  `age` INT,
  `name` STRING,
  currentdata TIMESTAMP,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'print'
);

INSERT INTO PrintSink
SELECT a.id * 10 AS id, a.`age`, b.`name`, now()
FROM age_table a
JOIN name_table b ON a.id = b.id;
- The following figure shows the information printed when a job is started.
- Run the following command to update data in the table:
update name_table set name='aaaa' where id=32;
- Check the output after the update. The printed data contains only +U (update-after) records and no -U (update-before) records, and repeated attempts give the same result. This suggests that the SinkUpsertMaterializer operator is intercepting the -U records.
- Analysis of the SQL and of how SinkUpsertMaterializer works shows that the now() function in the SELECT list causes the issue. now() returns the current time on every call, so each record that enters the SinkUpsertMaterializer operator differs from the records that entered it earlier, even when all other columns are identical. Removing now() from the SQL resolves the issue.
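The fix described in the last step can be sketched as follows. This is one possible form, assuming that the currentdata column is not needed downstream and can be dropped together with now(). The sink name PrintSinkNoTime is hypothetical; age_table and name_table are the CDC tables defined in the sample SQL.

```sql
-- Hypothetical sink: same as PrintSink, but without the currentdata column.
CREATE TABLE PrintSinkNoTime (
  id INT,
  `age` INT,
  `name` STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'print'
);

-- now() is removed, so every column of a result row is deterministic.
INSERT INTO PrintSinkNoTime
SELECT
  a.id * 10 AS id,
  a.`age`,
  b.`name`
FROM age_table a
JOIN name_table b ON a.id = b.id;
```

With a deterministic SELECT list, the row retracted by an update has the same column values as the row emitted earlier for the same key, so the two can be matched. If a processing timestamp is still required in the target, it would need to be added in a way that does not change between the original row and its retraction; how to do that depends on the sink and is outside this sketch.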