Using Temporal Join to Associate the Latest Version of a Dimension Table
Function
A Hive table can be read as a bounded stream. In this case, the table can only track its latest version at the time it is queried. The latest version contains all the data in the Hive table.
Caveats
- Each joining subtask needs to keep its own cache of the Hive table. Make sure the Hive table can fit into the memory of a TaskManager (TM) task slot.
- You are advised to set a relatively large value for either streaming-source.monitor-interval (latest partition as temporal table) or lookup.join.cache.ttl (all partitions as temporal table). Otherwise, jobs are prone to performance issues because the table is updated and reloaded too frequently.
- Currently we simply load the whole Hive table whenever the cache needs refreshing. There's no way to differentiate new data from the old.
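To show where each of the two intervals mentioned above would be set, here is a minimal sketch. It assumes the ordersSource and dimension_hive_table tables from the Example section of this page, and the 12 h values are illustrative assumptions rather than recommendations. The streaming-source.* option names are those of the open-source Flink Hive connector; whether every one of them is available in your environment is an assumption to verify.

```sql
-- All partitions as the temporal table (bounded read):
-- the cached copy is fully reloaded once the TTL expires.
SELECT o.product_id, d.product_name
FROM ordersSource AS o
LEFT JOIN dimension_hive_table
  /*+ OPTIONS('lookup.join.cache.ttl' = '12 h') */
  FOR SYSTEM_TIME AS OF o.proctime AS d
  ON o.product_id = d.product_id;

-- Latest partition as the temporal table (streaming read):
-- the job checks for a newer partition once per monitor interval.
SELECT o.product_id, d.product_name
FROM ordersSource AS o
LEFT JOIN dimension_hive_table
  /*+ OPTIONS(
        'streaming-source.enable' = 'true',
        'streaming-source.partition.include' = 'latest',
        'streaming-source.partition-order' = 'partition-name',
        'streaming-source.monitor-interval' = '12 h') */
  FOR SYSTEM_TIME AS OF o.proctime AS d
  ON o.product_id = d.product_id;
```

In both variants a larger interval means fewer full reloads, at the cost of the joined data being staler for longer.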
Parameter Description
When a temporal join is performed against the latest Hive table, the Hive table is cached in slot memory, and each record from the stream is joined against the cached table by key to decide whether a match is found. Using the latest Hive table as a temporal table does not require any additional configuration. Optionally, you can configure the TTL of the Hive table cache with the following property. After the cache expires, the Hive table is scanned again to load the latest data.
Parameter | Default Value | Data Type | Description
---|---|---|---
lookup.join.cache.ttl | 60 min | Duration | The cache TTL (for example, 10 min) for the build table in a lookup join. The default TTL is 60 minutes. This option takes effect only when a bounded Hive table source is looked up. If you are using a streaming Hive source as the temporal table, use streaming-source.monitor-interval to configure the data update interval instead.
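The TTL can also be stored with the table instead of being passed in each query. The following is a minimal sketch, assuming that the Hive connector reads table properties from the metastore as connector options (as the open-source Flink Hive connector does), and that the statement is run in Spark SQL against the dimension_hive_table table used in the Example section of this page. The 12 h value is illustrative only.

```sql
-- Assumption: properties stored in the Hive metastore are picked up
-- as connector options when the table is used in a Flink job.
ALTER TABLE dimension_hive_table
SET TBLPROPERTIES ('lookup.join.cache.ttl' = '12 h');
```

An OPTIONS hint on an individual query can still supply a different value for that query.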
Example
The example shows a classic business pipeline where the dimension table comes from Hive and is updated once a day through batch processing or Flink jobs. The Kafka stream comes from real-time online business data or logs and needs to be joined with the dimension table to expand the stream.
- Create a Hive OBS external table using Spark SQL and insert data.
CREATE TABLE IF NOT EXISTS dimension_hive_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP,
  update_user STRING
)
STORED AS PARQUET
LOCATION 'obs://demo/spark.db/dimension_hive_table'
PARTITIONED BY (
  create_time STRING
);

INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_11', 'product_name_11', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_12', 'product_name_12', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_13', 'product_name_13', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_14', 'product_name_14', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_15', 'product_name_15', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_16', 'product_name_16', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_17', 'product_name_17', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_18', 'product_name_18', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_19', 'product_name_19', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_10', 'product_name_10', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10');
- Create a Flink OpenSource SQL job. Enter the following job script and submit the job. This job simulates reading data from Kafka, performs a join with a Hive dimension table to denormalize the data, and outputs it to Print.
Replace the placeholder values in the following script (for example, the Kafka topic, broker addresses, and consumer group ID) as needed.
-- Register the Hive catalog that holds the dimension table.
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'default-database' = 'demo',
  'hive-conf-dir' = '/opt/flink/conf'
);

USE CATALOG myhive;

-- Kafka source with a processing-time attribute for the temporal join.
CREATE TABLE IF NOT EXISTS ordersSource (
  product_id STRING,
  user_name STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'TOPIC',
  'properties.bootstrap.servers' = 'KafkaIP:PORT,KafkaIP:PORT,KafkaIP:PORT',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- Print sink for the enriched records.
CREATE TABLE IF NOT EXISTS print (
  product_id STRING,
  user_name STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP,
  update_user STRING,
  create_time STRING
) WITH (
  'connector' = 'print'
);

-- Join each order against the cached Hive table; the cache is reloaded every 60 minutes.
INSERT INTO print
SELECT
  orders.product_id,
  orders.user_name,
  dim.product_name,
  dim.unit_price,
  dim.pv_count,
  dim.like_count,
  dim.comment_count,
  dim.update_time,
  dim.update_user,
  dim.create_time
FROM ordersSource orders
LEFT JOIN dimension_hive_table /*+ OPTIONS('lookup.join.cache.ttl'='60 m') */
  FOR SYSTEM_TIME AS OF orders.proctime AS dim
  ON orders.product_id = dim.product_id;
- Connect to the Kafka cluster and insert the following test data into the source topic in Kafka:
{"product_id": "product_id_11", "user_name": "name11"}
{"product_id": "product_id_12", "user_name": "name12"}
- View the data in the Print result table.
+I[product_id_11, name11, product_name_11, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_1]
+I[product_id_12, name12, product_name_12, 2.3456, 200, 100, 40, 2023-11-24T18:10:58, update_user_2, create_time_1]