更新时间:2025-12-10 GMT+08:00
分享

FlinkSQL支持使用Processing Time Temporal Join

使用场景

FlinkSQL业务维表数据量较大,TaskManager内存无法承载所有维表数据时,可以使用Processing Time Temporal Join特性,将业务维表数据加载进状态后端。

使用限制

  • 该特性仅支持Iceberg Connector类型的维表。
  • Lookup算子与PTTJ算子的快照互相不兼容。
  • PTTJ算子会根据Join Key对数据去重,所以该特性仅适用于维表数据根据Join Key唯一存在的场景。
  • table.exec.state.ttl参数仅对PTTJ算子的Right State生效。
  • 本章节仅适用于MRS 3.6.0-LTS及之后版本。

使用方法

在FlinkSQL中添加/*+ LOOKUP('enabled'='false') */,同时配置Flink作业时,在FlinkServer WebUI的Flink作业开发界面添加以下自定义参数开启PTTJ算子能力。可参考创建FlinkServer作业

  • 配置“execution.checkpointing.interval-during-backlog”参数值为“0”。
  • 配置“execution.backlog-data-processing.enabled”参数值为“true”。

SQL示例

  • Iceberg Connector使用Flip27 Source示例:
    CREATE TABLE iceberg_table (id BIGINT, data STRING)
    WITH
      (
        'connector' = 'iceberg',
        'catalog-name' = 'hive_prod',
        'catalog-database' = 'default',
        'catalog-table' = 'iceberg_table',
        'uri' = '连接MetaStore的URI配置',
        'hive.metastore.kerberos.principal' = 'Hive客户端hive-site.xml文件中hive.metastore.kerberos.principal的值' --普通模式集群不需要该参数
      );
    CREATE TEMPORARY TABLE datagen (id BIGINT, proctime as PROCTIME ())
    WITH
      ('connector' = 'datagen');
    CREATE TABLE print
    WITH
      ('connector' = 'print') AS
    SELECT
      /*+ LOOKUP('enabled'='false') */ *
    FROM
      datagen
      left JOIN iceberg_table /*+ OPTIONS('streaming'='true', 'monitor-interval'='60s', 'starting-strategy'='TABLE_SCAN_THEN_INCREMENTAL')*/ FOR SYSTEM_TIME AS OF datagen.proctime AS t1 ON datagen.id = t1.id;
  • Iceberg Connector不使用Flip27 Source示例:
    set table.exec.iceberg.use-flip27-source=false;
    CREATE TABLE iceberg_table (id BIGINT, data STRING)
    WITH
      (
        'connector' = 'iceberg',
        'catalog-name' = 'hive_prod',
        'catalog-database' = 'default',
        'catalog-table' = 'iceberg_table',
        'uri' = '连接MetaStore的URI配置',
        'hive.metastore.kerberos.principal' = 'Hive客户端hive-site.xml文件中hive.metastore.kerberos.principal的值' --普通模式集群不需要该参数
      );
    CREATE TEMPORARY TABLE datagen (id BIGINT, proctime as PROCTIME ())
    WITH
      ('connector' = 'datagen');
    CREATE TABLE print
    WITH
      ('connector' = 'print') AS
    SELECT
      /*+ LOOKUP('enabled'='false') */ *
    FROM
      datagen
      left JOIN iceberg_table /*+ OPTIONS('streaming'='true', 'monitor-interval'='60s')*/ FOR SYSTEM_TIME AS OF datagen.proctime AS t1 ON datagen.id = t1.id;

相关文档