更新时间:2025-12-10 GMT+08:00
分享

Flink作业大小表Join能力增强

本章节适用于MRS 3.3.0及以后版本。

Flink作业大小表Join

Flink作业双流Join时存在大小表数据,通过内核broadcast策略确保小表数据发送到Join的task中,通过rebalance策略将大表数据打散到Join中,提高Flink SQL易用性,增强作业稳定性。

图1 Flink作业大小表Join

在使用Flink SQL时,该特性通过hints方法指定Join的左表或右表为广播表,另一张表为rebalance表,SQL语句示例如下,分别以A\C作为小表示例:

  • 以A表作为广播表
    • 使用Join方式
      SELECT /*+ BROADCAST(A) */ a2, b2 FROM A JOIN B ON a1 = b1
    • 使用Where方式
      SELECT /*+ BROADCAST(A) */ a2, b2 FROM A, B WHERE a1 = b1
  • 以A和C表作为广播表
    SELECT /*+ BROADCAST(A, C) */ a2, b2, c2 FROM A JOIN B ON a1 = b1 JOIN C ON a1 = c1
  • 支持通过“/*+ BROADCAST(smallTable1, smallTable2) */”方式使用该特性,兼容开源双流Join逻辑。
  • 不支持开源双流Join和该特性的切换,因为该特性会将数据广播到每个Join算子。
  • 不支持LEFT JOIN时小表为左表,RIGHT JOIN时小表为右表。

这种方式存在一个问题:如果大表发生更新,在Join操作中无法保证数据的有序性,因为大表采用的是Round-Robin分区策略,数据可能被分配到不同的分区中。

指定字段值进行Hash分区(仅MRS 3.6.0-LTS及之后版本支持)

FlinkSQL支持通过指定字段值进行Hash分区来解决大表Join时的无序性问题,这种方式大表可以根据指定字段进行Hash分区。如果指定的Hash字段是主键,那么当大表以主键进行更新时,新数据将与之前的数据进入同一个分区,从而在Join操作中保证数据的有序性。

约束与限制

  • 当指定的Hash字段是主键时,可以保证数据的有序性,其他情况下无法保证数据的有序性。
  • 需要保证指定的Hash字段没有倾斜问题。

使用方法

在FlinkSQL中添加 /*+ BROADCAST(s),HASH_BY_KEY('l'='user_id') */,其中BROADCAST(s)表示s表是广播表,HASH_BY_KEY('l'='user_id')表示l表将以user_id字段作为Hash字段。

SELECT /*+ BROADCAST(A), HASH_BY_KEY('B'='b2') */ a2, b2 FROM A JOIN B ON a1 = b1 /* 表A广播,表 B 按列b2 hash 分布 */ 
SELECT /*+ BROADCAST(A), HASH_BY_KEY('B'='b2, b3') */ a2, b2 FROM A JOIN B ON a1 = b1 /* 也支持多个字段进行hash,但b3如果不存在于B表,则会报错 */

SQL示例:

CREATE TABLE large_table (
  user_id INT,
  id INT,
  u_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
 'connector' = 'hudi',
 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
 'hoodie.datasource.write.recordkey.field' = 'user_id ',
 'read.tasks' = '1',
 'read.streaming.enabled' = 'true',
 'read.streaming.check-interval' = '5',
 'read.streaming.start-commit' = 'earliest'
);
CREATE TABLE small_table(
  id INT,
  age int
) WITH (
  'connector' = 'kafka',
  'topic' = 'small_table',
  'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);
SELECT
  /*+ BROADCAST(s),HASH_BY_KEY('l'='user_id') */
  l.user_id,
  s.id,
  l.u_name,
  s.age
FROM
  large_table l
  JOIN small_table s ON l.id = s.id;

Flink作业大小表Join去重

在双流关联的业务模型中,关联算子接收到其中一个流发送的大量重复数据,则会导致下游算子需要处理大量重复数据,影响作业性能。

如A表字段(P1,A1,A2)使用如下方式关联B表字段(P1,B1,B2,B3)生成C的场景中,B表信息发生大量更新,但是B中的所需字段没有更新,在该关联中仅用到了B表的B1和B2字段,对于B表,每个记录更新只更新B3字段,B1和B2不更新,因此当B表更新,可以忽略更新后的数据。

select  A.A1,B.B1,B.B2 from A join B on A.P1=B.P1

为解决如上问题可通过使用hint单独为左表(duplicate.left)或右表(duplicate.right)设置去重:

  • 格式
    • 为左表设置去重
       /*+ OPTIONS('duplicate.left'='true')*/
    • 为右表设置去重
       /*+ OPTIONS('duplicate.right'='true')*/
    • 同时为左表和右表设置去重
       /*+ OPTIONS('duplicate.left'='true','duplicate.right'='true')*/
  • 在SQL语句中配置
    如同时为左表“user_info”和右表“user_score”设置去重。
    CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
      'connector' = 'kafka',
      'topic' = 'user_info_001',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    CREATE table print(
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `score` INT
    ) WITH ('connector' = 'print');
    CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
      'connector' = 'kafka',
      'topic' = 'user_score_001',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    INSERT INTO
      print
    SELECT
      t.user_id,
      t.user_name,
      d.score
    FROM
      user_info as t
      JOIN 
      -- 为左表和右表设置去重
      user_score /*+ OPTIONS('duplicate.left'='true','duplicate.right'='true')*/ as d ON t.user_id = d.user_id;
    Kafka Broker实例IP地址及端口号说明:
    • 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
    • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
    • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

      登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

相关文档