Development Rules
Ensure Data Accuracy When Aggregating Updated Data
When aggregating updated data, you need to select a proper solution. Otherwise, the aggregation result can be incorrect.
Create table t1( id int, partid int, value int ); select partid,sum(value) from t1 group by partid;
- First batch of data: [1,1,10],[2,1,11],[3,2,8]
- Second batch of data: [2,1,12] //Update the record whose ID is 2.
Error result: [1,33], [2,8] //Updated data (ID=2) cannot be identified.
Aggregation result: [1,22], [2,8] //The result is correct because update is identified.
There are three ways to identify whether the data is updated:
- Using the state backend
The state backend stores all raw data. The new arriving data is determined as the updated data based on the status. Then, the Flink aggregation withdrawal function is used to update the aggregation result data.
Advantage: The aggregation accuracy is ensured. This solution has no requirement on data which is easy to use.
Disadvantage: When there is a large amount of data, large state backend storage is required.
- Using data in CDC format
The update operation record of CDC data contains both original data and updated data. The previous aggregation result is removed based on original data, and the latest calculation result is updated based on updated data.
Advantage: Large state backend storage is not required. The overall compute resource pressure is lower than that of the state backend solution.
Disadvantage: This solution depends on the CDC format. Typically, data is captured by a CDC collection tool and sent to Kafka, and then Flink reads Kafka data for calculation.
- Using changelog data
The changelog format is similar to the CDC format. The only difference is that the CDC format records original and updated data in one row, and the changelog format stores updated data in two rows. One row is used to delete the original data, and the other row stores insertion operation records of updated data. Flink deletes the aggregation result based on the updated data and inserts the calculation result based on the updated data. Changelog can be implemented based on Hudi tables. Data in CDC format can be converted into changelog data and stored in the log files of Hudi MOR tables. Changelog data of Hudi can also be generated based on state backends.
Advantage: Aggregation consistency of updated data can be ensured based on data lake storage.
Disadvantage:
- Only the log file of a Hudi MOR table contains changelog data. If upstream data is stacked when Flink job calculation is delayed and the log file is cleared, changelog data will be lost. You need to retain more versions and properly configure resources for Flink jobs to make the data stacking period shorter than the clearance period.
- The generation of changelogs based on the state backend also depends on the state backend. Generally, the TTL is configured for the state backend and is not retained permanently. In this scenario, the update operation is arbitrary and there is no update period limit. For example, to update data of the last month, set TTL to a value greater than one month. To update all data, set TTL to permanent (do not do so for large tables).
- Currently, the MOR table of changelogs can only be compacted by the Flink engine. Spark engine is not available for compaction.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot