FlinkSQL支持使用Processing Time Temporal Join
使用场景
FlinkSQL业务维表数据量较大,TaskManager内存无法承载所有维表数据时,可以使用Processing Time Temporal Join特性,将业务维表数据加载进状态后端。
使用限制
- 该特性仅支持Iceberg Connector类型的维表。
- Lookup算子与PTTJ算子的快照互相不兼容。
- PTTJ算子会根据Join Key对数据去重,所以该特性仅适用于维表数据根据Join Key唯一存在的场景。
- table.exec.state.ttl参数仅对PTTJ算子的Right State生效。
- 本章节仅适用于MRS 3.6.0-LTS及之后版本。
使用方法
在FlinkSQL中添加/*+ LOOKUP('enabled'='false') */,同时配置Flink作业时,在FlinkServer WebUI的Flink作业开发界面添加以下自定义参数开启PTTJ算子能力。可参考创建FlinkServer作业。
- 配置“execution.checkpointing.interval-during-backlog”参数值为“0”。
- 配置“execution.backlog-data-processing.enabled”参数值为“true”。
SQL示例
- Iceberg Connector使用Flip27 Source示例:
CREATE TABLE iceberg_table (id BIGINT, data STRING) WITH ( 'connector' = 'iceberg', 'catalog-name' = 'hive_prod', 'catalog-database' = 'default', 'catalog-table' = 'iceberg_table', 'uri' = '连接MetaStore的URI配置', 'hive.metastore.kerberos.principal' = 'Hive客户端hive-site.xml文件中hive.metastore.kerberos.principal的值' --普通模式集群不需要该参数 ); CREATE TEMPORARY TABLE datagen (id BIGINT, proctime as PROCTIME ()) WITH ('connector' = 'datagen'); CREATE TABLE print WITH ('connector' = 'print') AS SELECT /*+ LOOKUP('enabled'='false') */ * FROM datagen left JOIN iceberg_table /*+ OPTIONS('streaming'='true', 'monitor-interval'='60s', 'starting-strategy'='TABLE_SCAN_THEN_INCREMENTAL')*/ FOR SYSTEM_TIME AS OF datagen.proctime AS t1 ON datagen.id = t1.id; - Iceberg Connector不使用Flip27 Source示例:
set table.exec.iceberg.use-flip27-source=false; CREATE TABLE iceberg_table (id BIGINT, data STRING) WITH ( 'connector' = 'iceberg', 'catalog-name' = 'hive_prod', 'catalog-database' = 'default', 'catalog-table' = 'iceberg_table', 'uri' = '连接MetaStore的URI配置', 'hive.metastore.kerberos.principal' = 'Hive客户端hive-site.xml文件中hive.metastore.kerberos.principal的值' --普通模式集群不需要该参数 ); CREATE TEMPORARY TABLE datagen (id BIGINT, proctime as PROCTIME ()) WITH ('connector' = 'datagen'); CREATE TABLE print WITH ('connector' = 'print') AS SELECT /*+ LOOKUP('enabled'='false') */ * FROM datagen left JOIN iceberg_table /*+ OPTIONS('streaming'='true', 'monitor-interval'='60s')*/ FOR SYSTEM_TIME AS OF datagen.proctime AS t1 ON datagen.id = t1.id;