更新时间:2024-12-11 GMT+08:00
分享

使用Temporal join关联维表的最新版本

功能描述

对于Hive表,我们可以将其作为有界流读出。在这种情况下,Hive表只能在查询时跟踪其最新版本。最新版本的表保留了Hive表的所有数据。

注意事项

  • 每个连接子任务都需要保留自己的Hive表缓存。请确保Hive表可以放入TM任务槽的内存中。
  • 建议为streaming-source.monitor-interval(最新分区作为临时表)或 lookup.join.cache.ttl(所有分区作为临时表)设置一个相对较大的值。否则,作业容易出现性能问题,避免表更新和重新加载过于频繁。
  • 缓存刷新需加载整个Hive表。无法区分新数据和旧数据。

参数说明

在执行与最新的Hive表的时间关联时,Hive表将被缓存到Slot内存中,然后通过键将流中的每条记录与表进行关联,以确定是否找到匹配项。将最新的Hive表用作时间表不需要任何额外的配置。使用以下属性配置Hive表缓存的TTL。在缓存过期后,将重新扫描Hive表以加载最新的数据。

参数

默认值

类型

说明

lookup.join.cache.ttl

60 min

Duration

查找连接中构建表的缓存 TTL(例如 10 分钟)。默认情况下,TTL 为 60 分钟。

该选项仅在查找有界的 hive 表源时有效,如果您使用流式 hive 源作为时态表,请使用 streaming-source.monitor-interval 配置数据更新间隔。

示例

该示例展示了一个经典的业务流水线,维度表来自 Hive,每天通过批处理流水线作业或 Flink 作业更新一次,kafka流来自实时在线业务数据或日志,需要与维度表连接以扩充流。

  1. 使用spark sql 创建 hive obs 外表,并插入数据。
    CREATE TABLE if not exists dimension_hive_table (
      product_id STRING,
      product_name STRING,
      unit_price DECIMAL(10, 4),
      pv_count BIGINT,
      like_count BIGINT,
      comment_count BIGINT,
      update_time TIMESTAMP,
      update_user STRING
    ) 
    STORED AS PARQUET 
    LOCATION 'obs://demo/spark.db/dimension_hive_table' 
    PARTITIONED BY (
        create_time   STRING
    );
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_11', 'product_name_11', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_12', 'product_name_12', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_13', 'product_name_13', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_14', 'product_name_14', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_15', 'product_name_15', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_16', 'product_name_16', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_17', 'product_name_17', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_18', 'product_name_18', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_19', 'product_name_19', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9');
    INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_10', 'product_name_10', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10');
  1. 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业模拟从kafka读取数据,并关联hive维表对数据进行打宽,并输出到print。
    如下脚本中的加粗参数请根据实际环境修改。
    CREATE CATALOG myhive WITH (
        'type' = 'hive' ,
        'default-database' = 'demo',
         'hive-conf-dir' = '/opt/flink/conf'
    );
    
    USE CATALOG myhive;
    
    CREATE TABLE if not exists ordersSource (
      product_id STRING,
      user_name string,
      proctime as Proctime()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'TOPIC',
      'properties.bootstrap.servers' = 'KafkaIP:PROT,KafkaIP:PROT,KafkaIP:PROT',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    create table if not exists print (
      product_id STRING,
      user_name string,
      product_name STRING,
      unit_price DECIMAL(10, 4),
      pv_count BIGINT,
      like_count BIGINT,
      comment_count BIGINT,
      update_time TIMESTAMP,
      update_user STRING,
      create_time   STRING
    ) with (
      'connector' = 'print'
    );
    
    insert into print 
    select 
      orders.product_id,
      orders.user_name,
      dim.product_name,
      dim.unit_price,
      dim.pv_count,
      dim.like_count,
      dim.comment_count,
      dim.update_time,
      dim.update_user,
      dim.create_time
    from ordersSource orders
    left join dimension_hive_table /*+ OPTIONS('lookup.join.cache.ttl'='60 m') */
       for system_time as of orders.proctime as dim on orders.product_id = dim.product_id;
  2. 连接Kafka集群,向Kafka的source topic中插入如下测试数据:
    {"product_id": "product_id_11", "user_name": "name11"}
    {"product_id": "product_id_12", "user_name": "name12"}
  3. 查看print结果表数据。
    +I[product_id_11, name11, product_name_11, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_1]
    +I[product_id_12, name12, product_name_12, 2.3456, 200, 100, 40, 2023-11-24T18:10:58, update_user_2, create_time_1]

相关文档