更新时间:2025-12-10 GMT+08:00
分享

FlinkSQL HBase支持Batch Lookup Join

使用场景

Lookup Join是FlinkSQL的一种数据连接方式,用于执行输入流与位于某个远程数据库(维度表)上的表的连接,对于输入流中的每条记录,都会到维表中进行一次查询,用于获取数据的最新状态。但频繁的I/O操作会影响作业性能,开启Batch Lookup,能够很大程度上减少I/O操作,降低对远程数据库的压力,从而提高Flink作业的吞吐量。

约束与限制

  • 本章节仅适用于MRS 3.6.0-LTS及之后版本。
  • Batch Lookup会使用内存缓存区,如果设置的batch size太大,可能导致高内存消耗,特别是在异步模式下。
  • Batch Lookup使用了Flink的Minibatch机制,所以Batch Lookup和Minibatch不能同时使用。

使用方法

配置Flink作业时,可通过在FlinkServer WebUI的Flink作业开发界面添加自定义参数“table.exec.batch-lookup.enabled”为“true”开启Batch Lookup优化,可参考创建FlinkServer作业

SQL示例:
CREATE TABLE hbaseTable (
  -- HBase作为维表
  user_id STRING,
  f1 ROW < address STRING >,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'hbase_dim_table',
  'lookup.batch' = 'true',  -- 开启HBase Batch Lookup
  'lookup.batch.size' = '1000',  --设置每个批次的大小
  'zookeeper.quorum' = 'ZooKeeper的quorumpeer实例业务IP1:ZooKeeper客户端端口号,ZooKeeper的quorumpeer实例业务IP2:ZooKeeper客户端端口号'
);
CREATE TABLE datagen (
  `user_id` STRING,
  `user_name` STRING,
  proctime as proctime()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100000',
  'fields.user_id.kind' = 'sequence',
  'fields.user_id.start' = '1',
  'fields.user_id.end' = '1000000',
  'fields.user_name.length' = '15'
);
CREATE TABLE print (
  -- Kafka作为Sink表
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `address` VARCHAR
) WITH ('connector' = 'print');
INSERT INTO
  print
SELECT
  t.user_id,
  t.user_name,
  d.address
FROM
  datagen as t
  JOIN hbaseTable FOR SYSTEM_TIME AS OF t.proctime as d ON t.user_id = d.user_id;
  • ZooKeeper的quorumpeer实例业务IP:

    ZooKeeper服务所有quorumpeer实例业务IP。登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper > 实例”,可查看所有quorumpeer实例所在主机业务IP地址。

  • ZooKeeper客户端端口号:

    登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值。

相关文档