Updated on 2024-05-07 GMT+08:00

Using Temporal Join to Associate the Latest Partition of a Dimension Table

Function

A partitioned table that changes over time can be read as an unbounded stream. If each partition contains the complete data set for one version, each partition can be treated as a version of the temporal table, and the temporal table retains the data of that partition. In processing-time temporal joins, Flink can automatically track the latest partition (version) of the temporal table.

The streaming-source.partition-order parameter determines which partition (version) is considered the latest.

This is the most common way to use a Hive table as a dimension table in a Flink streaming application.
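
As a minimal sketch of how the partition order can be set, the query below passes streaming-source.partition-order through a dynamic table option hint. It assumes the ordersSource and dimension_hive_table tables defined in the example on this page, and the selected columns are only illustrative. In open-source Flink 1.15, the accepted values are partition-name (the default), create-time, and partition-time; whether your DLI environment behaves identically is an assumption.

```sql
-- Sketch only: table and column names come from the example on this page.
-- 'partition-name' compares partition names alphabetically, so the partition
-- create_time='create_time_2' is treated as newer than 'create_time_1'.
SELECT
  orders.product_id,
  dim.product_name,
  dim.create_time
FROM ordersSource AS orders
LEFT JOIN dimension_hive_table /*+ OPTIONS(
    'streaming-source.enable' = 'true',
    'streaming-source.partition.include' = 'latest',
    'streaming-source.partition-order' = 'partition-name',
    'streaming-source.monitor-interval' = '10 m') */
  FOR SYSTEM_TIME AS OF orders.proctime AS dim
  ON orders.product_id = dim.product_id;
```

Because partition-name ordering is a string comparison, a partition value such as create_time_10 would sort before create_time_2. Zero-padded or date-style partition values avoid this.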

Caveats

Using a temporal join to associate the latest partition of a dimension table is supported only in Flink STREAMING mode.
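
In open-source Flink, the SQL client selects the runtime mode through the execution.runtime-mode option. The statement below is a sketch under that assumption. This page does not state whether a DLI job script accepts SET statements, so treat it as illustrative rather than as a required step.

```sql
-- Assumption: open-source Flink SQL client syntax (Flink 1.13 and later).
-- Streaming is the mode required by the temporal join described on this page.
SET 'execution.runtime-mode' = 'streaming';
```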

Example

The following example shows a typical business pipeline. The dimension table is stored in Hive and is updated once a day by a batch job or a Flink job. The Kafka stream carries real-time online business data or logs and is joined with the dimension table to enrich each record.

  1. Create a Hive OBS external table using Spark SQL and insert data.
    CREATE TABLE if not exists dimension_hive_table (
      product_id STRING,
      product_name STRING,
      unit_price DECIMAL(10, 4),
      pv_count BIGINT,
      like_count BIGINT,
      comment_count BIGINT,
      update_time TIMESTAMP,
      update_user STRING
    ) 
    STORED AS PARQUET 
    LOCATION 'obs://demo/spark.db/dimension_hive_table' 
    PARTITIONED BY (
        create_time   STRING
    );
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_11', 'product_name_11', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_12', 'product_name_12', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_13', 'product_name_13', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_14', 'product_name_14', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_15', 'product_name_15', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_16', 'product_name_16', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_17', 'product_name_17', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_18', 'product_name_18', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_19', 'product_name_19', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_10', 'product_name_10', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10');
  1. Create a Flink OpenSource SQL job. Enter the following job script and submit the job. The job reads data from Kafka, joins it with the Hive dimension table to enrich each record, and writes the result to Print.
    Change parameter values such as the Kafka topic, bootstrap servers, and consumer group ID as needed in the following script.
    CREATE CATALOG myhive WITH (
        'type' = 'hive',
        'default-database' = 'demo',
        'hive-conf-dir' = '/opt/flink/conf'
    );
    
    USE CATALOG myhive;
    
    CREATE TABLE if not exists ordersSource (
      product_id STRING,
      user_name string,
      proctime as Proctime()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'TOPIC',
      'properties.bootstrap.servers' = 'KafkaIP:PORT,KafkaIP:PORT,KafkaIP:PORT',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    create table if not exists print (
      product_id STRING,
      user_name string,
      product_name STRING,
      unit_price DECIMAL(10, 4),
      pv_count BIGINT,
      like_count BIGINT,
      comment_count BIGINT,
      update_time TIMESTAMP,
      update_user STRING,
      create_time   STRING
    ) with (
      'connector' = 'print'
    );
    
    insert into print 
    select 
      orders.product_id,
      orders.user_name,
      dim.product_name,
      dim.unit_price,
      dim.pv_count,
      dim.like_count,
      dim.comment_count,
      dim.update_time,
      dim.update_user,
      dim.create_time
    from ordersSource orders
    left join dimension_hive_table /*+ OPTIONS('streaming-source.enable'='true',
       'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '10 m') */
       for system_time as of orders.proctime as dim on orders.product_id = dim.product_id;
  2. Connect to the Kafka cluster and insert the following test data into the source topic in Kafka:
    {"product_id": "product_id_11", "user_name": "name11"}
    {"product_id": "product_id_12", "user_name": "name12"}
  3. View the data in the Print result table.
    +I[product_id_11, name11, product_name_11, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_1]
    +I[product_id_12, name12, product_name_12, 2.3456, 200, 100, 40, 2023-11-24T18:10:58, update_user_2, create_time_1]
  4. Simulate inserting new partition data into the Hive dimension table.
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_21', 'product_name_21', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_22', 'product_name_22', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_23', 'product_name_23', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_24', 'product_name_24', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_25', 'product_name_25', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_26', 'product_name_26', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_27', 'product_name_27', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_28', 'product_name_28', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_29', 'product_name_29', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_20', 'product_name_20', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10');
  5. Connect to the Kafka cluster and insert the following test data into the source topic. The product ID in this record exists only in the previous partition, create_time='create_time_1':
    {"product_id": "product_id_13", "user_name": "name13"}
  6. View the data in the Print result table. The dimension fields are null because the temporal table now holds only the latest partition. The data of the previous partition, create_time='create_time_1', is no longer available to the join.
    +I[product_id_13, name13, null, null, null, null, null, null, null, null]
  7. Connect to the Kafka cluster and insert the following test data into the source topic. The product ID in this record exists in the latest partition, create_time='create_time_2':
    {"product_id": "product_id_21", "user_name": "name21"}
  8. View the data in the Print result table. The temporal table holds the data of the latest partition, create_time='create_time_2', so the record is joined with its dimension data.
    +I[product_id_21, name21, product_name_21, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_2]
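
The job script in this example passes the streaming-source options as a query hint. As an alternative sketch, the same options can be stored as table properties so that the join needs no hint. This assumes the first statement is run in Spark SQL against the dimension_hive_table created in step 1, and that Flink reads these properties through the Hive catalog as open-source Flink does. Neither assumption is confirmed by this page.

```sql
-- Run in Spark SQL (assumption: the table from step 1 already exists).
-- Stores the same options that the example passes through the OPTIONS hint.
ALTER TABLE dimension_hive_table SET TBLPROPERTIES (
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '10 m'
);

-- Run in the Flink OpenSource SQL job, after USE CATALOG myhive.
-- The join carries no hint; only a few columns are selected for brevity.
SELECT
  orders.product_id,
  orders.user_name,
  dim.product_name,
  dim.create_time
FROM ordersSource AS orders
LEFT JOIN dimension_hive_table
  FOR SYSTEM_TIME AS OF orders.proctime AS dim
  ON orders.product_id = dim.product_id;
```

Table properties apply to every Flink query that reads the table, so other jobs would also treat it as a streaming source. The hint used in the job script limits the options to a single query, which is likely the safer choice when the table is shared.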