FlinkSQL Lookup Join支持PROCTIME()语法
本章节仅适用于MRS 3.6.0-LTS及之后版本。
使用场景
Flink Lookup Join语法中要求FOR SYSTEM_TIME AS OF包含proc_time字段,例如FOR SYSTEM_TIME AS OF o.proc_time。现在可以使用PROCTIME()代替表中的proc_time字段,直接使用FOR SYSTEM_TIME AS OF PROCTIME(),表中不需要再定义proc_time字段。子查询也适用。
使用方法
配置Flink作业时,Lookup Join可以直接使用PROCTIME()来代替表中proc_time字段。
SQL示例:
CREATE TABLE kafkasource(
order_id STRING,
price DECIMAL(32, 2),
currency STRING
) WITH (
'connector' = 'kafka',
'topic' = 'test_source',
'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv',
'properties.sasl.kerberos.service.name' = 'kafka',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.系统域名'
);
CREATE TEMPORARY TABLE customers (id STRING, name STRING, country STRING) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://mysqlhost:ip/customerdb',
'table-name' = 'customers',
'username' = 'xxxx',
'password' = 'xxxx'
);
CREATE TABLE print (
order_id STRING,
price DECIMAL(32, 2),
name STRING,
country STRING
) WITH ('connector' = 'print');
insert into
print
SELECT
o.order_id,
o.price,
c.name,
c.country
FROM
kafkasource as o
JOIN customers FOR SYSTEM_TIME AS OF PROCTIME() AS c ON o.order_id = c.id;