更新时间:2025-12-10 GMT+08:00
分享

FlinkSQL Lookup Join支持PROCTIME()语法

本章节仅适用于MRS 3.6.0-LTS及之后版本。

使用场景

Flink Lookup Join语法中要求FOR SYSTEM_TIME AS OF包含proc_time字段,例如FOR SYSTEM_TIME AS OF o.proc_time。现在可以使用PROCTIME()代替表中的proc_time字段,直接使用FOR SYSTEM_TIME AS OF PROCTIME(),表中不需要再定义proc_time字段。子查询也适用。

使用方法

配置Flink作业时,Lookup Join可以直接使用PROCTIME()来代替表中proc_time字段。

SQL示例:

CREATE TABLE kafkasource(
  order_id STRING,
  price DECIMAL(32, 2),
  currency STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.系统域名'
);
CREATE TEMPORARY TABLE customers (id STRING, name STRING, country STRING) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:ip/customerdb',
  'table-name' = 'customers',
  'username' = 'xxxx',
  'password' = 'xxxx'
);
CREATE TABLE print (
  order_id STRING,
  price DECIMAL(32, 2),
  name STRING,
  country STRING
) WITH ('connector' = 'print');
insert into
  print
SELECT
  o.order_id,
  o.price,
  c.name,
  c.country
FROM
  kafkasource as o
  JOIN customers FOR SYSTEM_TIME AS OF PROCTIME() AS c ON o.order_id = c.id;

相关文档