
Using Temporal Join to Associate the Latest Version of a Dimension Table

Function

Hive tables can be read as bounded streams. In this case, the Hive table can only track its latest version at the time the query is executed, and that latest version contains all the data of the Hive table.
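As a minimal sketch of the join pattern (the table and column names here are illustrative and are not created elsewhere on this page), the latest version of a Hive table is joined to a stream with a processing-time temporal join:

    -- Sketch only: kafka_orders must expose a processing-time attribute (proctime),
    -- and hive_dim is a Hive table registered through a Hive catalog.
    SELECT o.order_id, d.product_name
    FROM kafka_orders AS o
    JOIN hive_dim FOR SYSTEM_TIME AS OF o.proctime AS d
      ON o.product_id = d.product_id;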

Caveats

  • Each join subtask needs to keep its own cache of the Hive table. Make sure the Hive table can fit into the memory of a TaskManager task slot.
  • Set a relatively large value for both streaming-source.monitor-interval (when the latest partition is used as the temporal table) and lookup.join.cache.ttl (when all partitions are used as the temporal table); a sketch of both options follows this list. Otherwise, the table is updated and reloaded so frequently that jobs are prone to performance issues.
  • Currently, the whole Hive table is reloaded whenever the cache needs refreshing; there is no way to differentiate new data from old.
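The following sketch (not part of the example on this page) shows one way to attach these options to the dimension table itself through its table properties, using the dimension_hive_table that the Example section below creates; the values are placeholders, and you would pick only the variant that matches your join mode:

    -- Run in Spark SQL or the Hive dialect against the dimension table.
    -- Variant 1: the latest partition is the temporal table; widen the monitor interval.
    ALTER TABLE dimension_hive_table SET TBLPROPERTIES (
      'streaming-source.enable' = 'true',
      'streaming-source.partition.include' = 'latest',
      'streaming-source.monitor-interval' = '12 h'
    );
    -- Variant 2: all partitions (the latest table) form the temporal table; widen the cache TTL.
    -- The Example section below sets the same option with a per-query hint instead.
    ALTER TABLE dimension_hive_table SET TBLPROPERTIES (
      'lookup.join.cache.ttl' = '12 h'
    );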

Parameter Description

When a temporal join is performed against the latest version of a Hive table, the Hive table is cached in the slot memory and each record from the stream is joined against the table by key to decide whether a match is found. Using the latest Hive table as a temporal table does not require any additional configuration. Optionally, you can configure the TTL of the Hive table cache with the following property; after the cache expires, the Hive table is scanned again to load the latest data. A usage sketch follows the table below.

  • Parameter: lookup.join.cache.ttl
  • Default Value: 60 min
  • Data Type: Duration
  • Description: The cache TTL (for example, 10 min) of the build table in a lookup join. By default, the TTL is 60 minutes. This option only takes effect when a bounded Hive table source is looked up; if you use a streaming Hive source as the temporal table, use streaming-source.monitor-interval to configure the data update interval instead.
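As a sketch (reusing the table names defined in the Example later on this page), the option can also be supplied per query with a dynamic table option hint; the Example below uses the same mechanism with a 60-minute TTL:

    -- Sketch: shorten the cache TTL to 30 minutes for this query only.
    SELECT o.product_id, d.product_name
    FROM ordersSource AS o
    JOIN dimension_hive_table /*+ OPTIONS('lookup.join.cache.ttl'='30 min') */
      FOR SYSTEM_TIME AS OF o.proctime AS d
      ON o.product_id = d.product_id;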

Example

The example shows a classic business pipeline: the dimension table comes from Hive and is updated once a day by batch processing or by Flink jobs, while the Kafka stream carries real-time online business data or logs and is joined with the dimension table to enrich the stream.

  1. Create a Hive OBS external table using Spark SQL and insert data.
    CREATE TABLE if not exists dimension_hive_table (
      product_id STRING,
      product_name STRING,
      unit_price DECIMAL(10, 4),
      pv_count BIGINT,
      like_count BIGINT,
      comment_count BIGINT,
      update_time TIMESTAMP,
      update_user STRING
    ) 
    STORED AS PARQUET 
    LOCATION 'obs://demo/spark.db/dimension_hive_table' 
    PARTITIONED BY (
        create_time   STRING
    );
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_11', 'product_name_11', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_12', 'product_name_12', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_13', 'product_name_13', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_14', 'product_name_14', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_15', 'product_name_15', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_16', 'product_name_16', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_17', 'product_name_17', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_18', 'product_name_18', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_19', 'product_name_19', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_10', 'product_name_10', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10');
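    -- Optional check (not part of the original walkthrough): confirm that the rows
    -- landed in partition create_time_1 before submitting the Flink job.
    SELECT product_id, product_name, unit_price, create_time
    FROM dimension_hive_table
    WHERE create_time = 'create_time_1'
    ORDER BY product_id;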
  2. Create a Flink OpenSource SQL job. Enter the following job script and submit the job. The job simulates reading data from Kafka, joins the stream with the Hive dimension table to enrich it, and writes the result to a Print result table.
    Change the parameter values in the following script (such as the Kafka connection addresses, topic, and consumer group) as needed.
    CREATE CATALOG myhive WITH (
        'type' = 'hive',
        'default-database' = 'demo',
        'hive-conf-dir' = '/opt/flink/conf'
    );
    
    USE CATALOG myhive;
    
    CREATE TABLE IF NOT EXISTS ordersSource (
      product_id STRING,
      user_name STRING,
      proctime AS PROCTIME()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'TOPIC',
      'properties.bootstrap.servers' = 'KafkaIP:PORT,KafkaIP:PORT,KafkaIP:PORT',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    CREATE TABLE IF NOT EXISTS print (
      product_id STRING,
      user_name STRING,
      product_name STRING,
      unit_price DECIMAL(10, 4),
      pv_count BIGINT,
      like_count BIGINT,
      comment_count BIGINT,
      update_time TIMESTAMP,
      update_user STRING,
      create_time STRING
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO print
    SELECT
      orders.product_id,
      orders.user_name,
      dim.product_name,
      dim.unit_price,
      dim.pv_count,
      dim.like_count,
      dim.comment_count,
      dim.update_time,
      dim.update_user,
      dim.create_time
    FROM ordersSource AS orders
    LEFT JOIN dimension_hive_table /*+ OPTIONS('lookup.join.cache.ttl'='60 min') */
      FOR SYSTEM_TIME AS OF orders.proctime AS dim
      ON orders.product_id = dim.product_id;
  3. Connect to the Kafka cluster and insert the following test data into the source topic in Kafka:
    {"product_id": "product_id_11", "user_name": "name11"}
    {"product_id": "product_id_12", "user_name": "name12"}
  4. View the data in the Print result table. An optional follow-up that exercises the cache TTL appears after the output.
    +I[product_id_11, name11, product_name_11, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_1]
    +I[product_id_12, name12, product_name_12, 2.3456, 200, 100, 40, 2023-11-24T18:10:58, update_user_2, create_time_1]
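As an optional follow-up (not part of the original walkthrough), you can exercise the cache refresh described in Parameter Description: change the Hive data, then probe again after the TTL set in the job hint (60 minutes) has elapsed. The product ID and values below are made up for illustration.

    -- Run with Spark SQL: add a new dimension row.
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_20', 'product_name_20', 11.2345, 1100, 550, 220, '2023-11-26 02:10:58', 'update_user_20');
    -- Then send {"product_id": "product_id_20", "user_name": "name20"} to the Kafka topic.
    -- A record sent before the cached table is refreshed joins against the old cache, so the
    -- dimension columns come back NULL (because of the LEFT JOIN); once the TTL has elapsed
    -- and the cache is reloaded, the new dimension values appear in the Print output.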