FlinkSQL HBase支持Batch Lookup Join
使用场景
Lookup Join是FlinkSQL的一种数据连接方式,用于执行输入流与位于某个远程数据库(维度表)上的表的连接,对于输入流中的每条记录,都会到维表中进行一次查询,用于获取数据的最新状态。但频繁的I/O操作会影响作业性能,开启Batch Lookup,能够很大程度上减少I/O操作,降低对远程数据库的压力,从而提高Flink作业的吞吐量。
约束与限制
- 本章节仅适用于MRS 3.6.0-LTS及之后版本。
- Batch Lookup会使用内存缓存区,如果设置的batch size太大,可能导致高内存消耗,特别是在异步模式下。
- Batch Lookup使用了Flink的Minibatch机制,所以Batch Lookup和Minibatch不能同时使用。
使用方法
配置Flink作业时,可通过在FlinkServer WebUI的Flink作业开发界面添加自定义参数“table.exec.batch-lookup.enabled”为“true”开启Batch Lookup优化,可参考创建FlinkServer作业。
CREATE TABLE hbaseTable (
-- HBase作为维表
user_id STRING,
f1 ROW < address STRING >,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'hbase_dim_table',
'lookup.batch' = 'true', -- 开启HBase Batch Lookup
'lookup.batch.size' = '1000', --设置每个批次的大小
'zookeeper.quorum' = 'ZooKeeper的quorumpeer实例业务IP1:ZooKeeper客户端端口号,ZooKeeper的quorumpeer实例业务IP2:ZooKeeper客户端端口号'
);
CREATE TABLE datagen (
`user_id` STRING,
`user_name` STRING,
proctime as proctime()
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100000',
'fields.user_id.kind' = 'sequence',
'fields.user_id.start' = '1',
'fields.user_id.end' = '1000000',
'fields.user_name.length' = '15'
);
CREATE TABLE print (
-- Kafka作为Sink表
`user_id` VARCHAR,
`user_name` VARCHAR,
`address` VARCHAR
) WITH ('connector' = 'print');
INSERT INTO
print
SELECT
t.user_id,
t.user_name,
d.address
FROM
datagen as t
JOIN hbaseTable FOR SYSTEM_TIME AS OF t.proctime as d ON t.user_id = d.user_id;
