Using Temporal Join to Associate the Latest Partition of a Dimension Table
Function
A partitioned table that changes over time can be read as an unbounded stream. If each partition contains the complete data of one version, the partition can be treated as a version of a temporal table, and only that partition's data is retained. In processing-time temporal joins, Flink can automatically track and load the latest partition (version) of such a table.
Which partition counts as the latest is determined by the streaming-source.partition-order parameter.
This is the most common use case for using Hive tables as dimension tables in Flink streaming applications.
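As a quick reference, the table options behind this behavior look roughly as follows. This is a minimal sketch rather than the example job below: the table and column names (fact_stream, hive_dim, id, attr) are illustrative placeholders, and the interval value is an assumption you would tune for your own workload.
-- Minimal sketch of a processing-time temporal join that always uses the latest Hive partition.
-- 'streaming-source.enable'            : read the Hive table as a stream
-- 'streaming-source.partition.include' : 'latest' loads only the newest partition
-- 'streaming-source.monitor-interval'  : how often to check for new partitions
-- 'streaming-source.partition-order'   : 'partition-name' (default), 'create-time', or 'partition-time'
-- fact_stream must declare a processing-time attribute, e.g. proctime AS PROCTIME().
SELECT f.id, d.attr
FROM fact_stream AS f
LEFT JOIN hive_dim
  /*+ OPTIONS('streaming-source.enable' = 'true',
              'streaming-source.partition.include' = 'latest',
              'streaming-source.monitor-interval' = '1 h',
              'streaming-source.partition-order' = 'partition-name') */
  FOR SYSTEM_TIME AS OF f.proctime AS d
  ON f.id = d.id;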
Caveats
Using a temporal join to associate the latest partition of a dimension table is only supported in Flink STREAMING mode.
Example
The following example shows a classic business pipeline where the dimension table comes from Hive and is updated once a day through batch processing or Flink jobs. The Kafka stream comes from real-time online business data or logs and needs to be joined with the dimension table to expand the stream.
- Create a Hive OBS external table using Spark SQL and insert data.
CREATE TABLE IF NOT EXISTS dimension_hive_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP,
  update_user STRING
)
PARTITIONED BY (create_time STRING)
STORED AS PARQUET
LOCATION 'obs://demo/spark.db/dimension_hive_table';
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_11', 'product_name_11', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_12', 'product_name_12', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_13', 'product_name_13', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_14', 'product_name_14', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_15', 'product_name_15', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_16', 'product_name_16', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_17', 'product_name_17', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_18', 'product_name_18', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_19', 'product_name_19', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_10', 'product_name_10', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10');
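- (Optional) Before starting the Flink job, you can verify in Spark SQL that the partition and its rows were written. This check is only a suggestion and not part of the original pipeline.
-- Confirm that partition create_time='create_time_1' exists and count its rows.
SHOW PARTITIONS dimension_hive_table;
SELECT create_time, COUNT(*) AS row_cnt FROM dimension_hive_table GROUP BY create_time;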
- Create a Flink OpenSource SQL job. Enter the following job script and submit the job. The job reads data from Kafka, joins it with the Hive dimension table to widen the stream, and writes the result to a Print sink.
Change the placeholder values in the following script (such as TOPIC, KafkaIP:PORT, and GroupId) as needed.
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'default-database' = 'demo',
  'hive-conf-dir' = '/opt/flink/conf'
);

USE CATALOG myhive;

CREATE TABLE IF NOT EXISTS ordersSource (
  product_id STRING,
  user_name STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'TOPIC',
  'properties.bootstrap.servers' = 'KafkaIP:PORT,KafkaIP:PORT,KafkaIP:PORT',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE IF NOT EXISTS print (
  product_id STRING,
  user_name STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP,
  update_user STRING,
  create_time STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO print
SELECT
  orders.product_id,
  orders.user_name,
  dim.product_name,
  dim.unit_price,
  dim.pv_count,
  dim.like_count,
  dim.comment_count,
  dim.update_time,
  dim.update_user,
  dim.create_time
FROM ordersSource orders
LEFT JOIN dimension_hive_table
  /*+ OPTIONS('streaming-source.enable' = 'true',
              'streaming-source.partition.include' = 'latest',
              'streaming-source.monitor-interval' = '10 min') */
  FOR SYSTEM_TIME AS OF orders.proctime AS dim
  ON orders.product_id = dim.product_id;
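Instead of repeating these options in a SQL hint for every query, the Flink Hive connector can also pick them up from the Hive table's properties. The following Spark SQL statement is a sketch of that alternative; the values simply mirror the hint used above, and whether you manage them as table properties or as hints is a deployment choice.
-- Sketch: store the streaming options as Hive table properties instead of a per-query hint.
ALTER TABLE dimension_hive_table SET TBLPROPERTIES (
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '10 min'
);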
- Connect to the Kafka cluster and send the following test data to the source topic:
{"product_id": "product_id_11", "user_name": "name11"} {"product_id": "product_id_12", "user_name": "name12"}
- View the data in the Print result table.
+I[product_id_11, name11, product_name_11, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_1]
+I[product_id_12, name12, product_name_12, 2.3456, 200, 100, 40, 2023-11-24T18:10:58, update_user_2, create_time_1]
- Insert data into a new partition of the Hive dimension table using Spark SQL to simulate a daily update.
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_21', 'product_name_21', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_22', 'product_name_22', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_23', 'product_name_23', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_24', 'product_name_24', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_25', 'product_name_25', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_26', 'product_name_26', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_27', 'product_name_27', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_28', 'product_name_28', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_29', 'product_name_29', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_20', 'product_name_20', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10');
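- (Optional) You can confirm in Spark SQL that the new partition exists. The Flink job rescans the table at the configured monitor interval (10 minutes in this example), so it can take up to that long before the new partition is loaded. This check is only a suggestion and not part of the original pipeline.
-- Both partitions should now be listed; the Flink job keeps only the latest one (create_time='create_time_2').
SHOW PARTITIONS dimension_hive_table;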
- Connect to the Kafka cluster and send the following test data to the source topic. This record matches a key that exists only in the previous partition create_time='create_time_1':
{"product_id": "product_id_13", "user_name": "name13"}
- View the data in the Print result table. Because the dimension table keeps only the latest partition, the data of the previous partition create_time='create_time_1' is no longer visible to the join, so the dimension columns are null.
+I[product_id_13, name13, null, null, null, null, null, null, null, null]
- Connect to the Kafka cluster and send the following test data to the source topic. This record matches a key in the latest partition create_time='create_time_2':
{"product_id": "product_id_21", "user_name": "name21"}
- View the data in the Print result table. The Hive dimension table retains the data of the latest partition with create_time='create_time_2'.
+I[product_id_21, name21, product_name_21, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_2]